View Javadoc

1   /*
2    * Copyright (C) The Apache Software Foundation. All rights reserved.
3    *
4    * This software is published under the terms of the Apache Software License
5    * version 1.1, a copy of which has been included with this distribution in
6    * the LICENSE file.
7    * 
8    * $Id: SubscriptionManager.java 155459 2005-02-26 13:24:44Z dirkv $
9    */
10  package org.apache.commons.messagelet;
11  
12  import java.util.Iterator;
13  
14  import javax.jms.Destination;
15  import javax.jms.JMSException;
16  import javax.jms.MessageListener;
17  import javax.servlet.ServletContext;
18  import javax.servlet.ServletException;
19  
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.apache.commons.messagelet.model.Subscription;
23  import org.apache.commons.messagelet.model.SubscriptionList;
24  import org.apache.commons.messenger.Messenger;
25  import org.apache.commons.messenger.MessengerManager;
26  import org.apache.commons.messenger.tool.StopWatchMessageListener;
27  
28  /** 
29   * <p><code>SubscriptionManager</code> is a simple command line program that will
30   * create a number of subscriptions and consume messages using just regular 
31   * MDO and MessageListener classes.
32   *
33   * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
34   * @version $Revision: 155459 $
35   */
36  public class SubscriptionManager {
37  
38      /** Logger */
39      private static final Log log = LogFactory.getLog(SubscriptionManager.class);
40  
41      /** The JMS connections */    
42      private MessengerManager manager;
43  
44      /** The JMS Subscriptions */
45      private SubscriptionList subscriptionList;
46  
47      /** The context passed into MDOs */
48      private ServletContext servletContext;
49          
50      /** Should we use a stopwatch to output performance metrics */
51      private boolean useStopWatch = false;
52  
53      
54      public SubscriptionManager() {
55      }
56  
57  
58      protected void subscribe() throws JMSException, ServletException {
59          for (Iterator iter = getSubscriptionList().getSubscriptions().iterator(); iter.hasNext(); ) {
60              Subscription subscription = (Subscription) iter.next();
61              subscribe( subscription );
62          }
63      }
64      
65      public void subscribe( Subscription subscription ) throws JMSException, ServletException{
66          String name = subscription.getConnection();
67          Messenger messenger = getMessenger( name );
68          if ( messenger == null ) {
69              throw new JMSException( "No such Messenger called: " + name + " for subscription: " + subscription );
70          }
71          String subject = subscription.getSubject();
72          if ( subject == null || subject.length() == 0 ) {
73              throw new JMSException( "No destination defined for subscription: " + subscription );
74          }
75          
76          Destination destination = messenger.getDestination( subject );
77          if ( destination == null ) {
78              throw new JMSException( "No destination could be found for name: " + subject + " for subscription: " + subscription );
79          }
80  
81          MessageListener listener = subscription.getMessageListener();
82          if ( listener == null ) {
83              throw new JMSException( "No MessageListener is defined for subscription: " + subscription );
84          }
85          
86          // if its an MDO the initialise it!
87          if ( listener instanceof MessageDrivenObject ) {
88              MessageDrivenObject mdo = (MessageDrivenObject) listener;
89              if ( mdo instanceof MessengerMDO ) {
90                  MessengerMDO messengerMDO = (MessengerMDO) mdo;
91                  messengerMDO.setMessenger( messenger );
92                  messengerMDO.setMessengerManager( getMessengerManager() );
93              }
94              mdo.init( getServletContext() );
95          }
96  
97          listener = wrapInStopWatch( listener );
98  
99          String selector = subscription.getSelector();
100 
101         ConsumerThread thread = subscription.getConsumerThread();
102         if (thread != null) {
103             log.info( "Subscribing to messenger: " + name + " destination: " + subject + " selector: " + selector + " with: " + thread );
104             
105             thread.setMessenger(messenger);
106             thread.setDestination(destination);
107             thread.setSelector(selector);
108             thread.setListener(listener);
109             thread.start();
110         }
111         else {
112             if ( selector != null && selector.length() > 0 ) {
113                 log.info( "Subscribing to messenger: " + name + " destination: " + subject + " selector: " + selector );
114                 
115                 messenger.addListener( destination, selector, listener );
116             }
117             else {
118                 log.info( "Subscribing to messenger: " + name + " destination: " + subject );
119                 
120                 messenger.addListener( destination, listener );
121             }
122             
123             log.info( "Subscribed with listener: " + listener );
124         }
125     }
126 
127 
128     
129     public void unsubscribe() throws JMSException, ServletException {
130         SubscriptionList list = getSubscriptionList();
131         if ( list != null ) {
132             for (Iterator iter = list.getSubscriptions().iterator(); iter.hasNext(); ) {
133                 Subscription subscription = (Subscription) iter.next();
134                 unsubscribe( subscription );
135             }
136         }
137     }
138     
139     public void unsubscribe( Subscription subscription ) throws JMSException, ServletException {
140         // lets unsubscribe first
141         String name = subscription.getConnection();
142         Messenger messenger = getMessenger( name );
143         
144         MessageListener listener = subscription.getMessageListener();
145         if ( messenger != null && listener != null ) {
146             Destination destination = null;        
147             String subject = subscription.getSubject();
148             if ( subject == null || subject.length() == 0 ) {
149                 log.error( "No destination defined for subscription: " + subscription );
150             }
151             else {
152                 try {
153                     destination = messenger.getDestination( subject );
154                     if ( destination == null ) {
155                         log.error( "No destination could be found for name: " + subject + " for subscription: " + subscription );
156                     }
157                 }
158                 catch (JMSException e) {
159                     log.error( "Could not create destination for name: " + subject + " for subscription: " + subscription, e );
160                 }
161             }
162             if ( destination != null ) {
163                 try {
164                     String selector = subscription.getSelector();
165                     if ( selector != null && selector.length() > 0 ) {
166                         messenger.removeListener( destination, selector, listener );
167                     }
168                     else {
169                         messenger.removeListener( destination, listener );
170                     }
171                 }
172                 catch (JMSException e) {
173                     log.error( "Could not unsubscribe to destination:" + destination + " for subscription: " + subscription, e );
174                 }
175             }
176         }
177         
178         // now lets destrory the MBO
179         if ( listener instanceof MessageDrivenObject ) {
180             MessageDrivenObject mdo = (MessageDrivenObject) listener;
181             mdo.destroy();
182         }
183     }
184     
185     
186     // Properties
187     //-------------------------------------------------------------------------    
188 
189     public MessengerManager getMessengerManager() throws JMSException {
190         return manager;
191     }
192     
193     public void setMessengerManager(MessengerManager manager) {
194         this.manager = manager;
195     }
196 
197     /**
198      * Returns the subscriptionList.
199      * @return SubscriptionList
200      */
201     public SubscriptionList getSubscriptionList() {
202         return subscriptionList;
203     }
204 
205     /**
206      * Sets the subscriptionList.
207      * @param subscriptionList The subscriptionList to set
208      */
209     public void setSubscriptionList(SubscriptionList subscriptionList) {
210         this.subscriptionList = subscriptionList;
211     }
212     
213     /**
214      * Returns the servletContext.
215      * @return ServletContext
216      */
217     public ServletContext getServletContext() {
218         return servletContext;
219     }
220 
221     /**
222      * Sets the servletContext.
223      * @param servletContext The servletContext to set
224      */
225     public void setServletContext(ServletContext servletContext) {
226         this.servletContext = servletContext;
227     }
228     
229     // Implementation methods
230     //-------------------------------------------------------------------------    
231     /**
232      * Allows the MessageListener to be wrapped inside a stop watch message listener if required 
233      */
234     protected MessageListener wrapInStopWatch( MessageListener listener ) {
235         if ( useStopWatch ) {
236             return new StopWatchMessageListener( listener );
237         }
238         return listener;
239     }
240 
241     protected Messenger getMessenger(String name) throws JMSException {
242         return getMessengerManager().getMessenger( name );
243     }
244 
245 }