001    /*
002     * Copyright (C) The Apache Software Foundation. All rights reserved.
003     *
004     * This software is published under the terms of the Apache Software License
005     * version 1.1, a copy of which has been included with this distribution in
006     * the LICENSE file.
007     * 
008     * $Id: SubscriptionManager.java 155459 2005-02-26 13:24:44Z dirkv $
009     */
010    package org.apache.commons.messagelet;
011    
012    import java.util.Iterator;
013    
014    import javax.jms.Destination;
015    import javax.jms.JMSException;
016    import javax.jms.MessageListener;
017    import javax.servlet.ServletContext;
018    import javax.servlet.ServletException;
019    
020    import org.apache.commons.logging.Log;
021    import org.apache.commons.logging.LogFactory;
022    import org.apache.commons.messagelet.model.Subscription;
023    import org.apache.commons.messagelet.model.SubscriptionList;
024    import org.apache.commons.messenger.Messenger;
025    import org.apache.commons.messenger.MessengerManager;
026    import org.apache.commons.messenger.tool.StopWatchMessageListener;
027    
028    /** 
029     * <p><code>SubscriptionManager</code> is a simple command line program that will
030     * create a number of subscriptions and consume messages using just regular 
031     * MDO and MessageListener classes.
032     *
033     * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
034     * @version $Revision: 155459 $
035     */
036    public class SubscriptionManager {
037    
038        /** Logger */
039        private static final Log log = LogFactory.getLog(SubscriptionManager.class);
040    
041        /** The JMS connections */    
042        private MessengerManager manager;
043    
044        /** The JMS Subscriptions */
045        private SubscriptionList subscriptionList;
046    
047        /** The context passed into MDOs */
048        private ServletContext servletContext;
049            
050        /** Should we use a stopwatch to output performance metrics */
051        private boolean useStopWatch = false;
052    
053        
054        public SubscriptionManager() {
055        }
056    
057    
058        protected void subscribe() throws JMSException, ServletException {
059            for (Iterator iter = getSubscriptionList().getSubscriptions().iterator(); iter.hasNext(); ) {
060                Subscription subscription = (Subscription) iter.next();
061                subscribe( subscription );
062            }
063        }
064        
065        public void subscribe( Subscription subscription ) throws JMSException, ServletException{
066            String name = subscription.getConnection();
067            Messenger messenger = getMessenger( name );
068            if ( messenger == null ) {
069                throw new JMSException( "No such Messenger called: " + name + " for subscription: " + subscription );
070            }
071            String subject = subscription.getSubject();
072            if ( subject == null || subject.length() == 0 ) {
073                throw new JMSException( "No destination defined for subscription: " + subscription );
074            }
075            
076            Destination destination = messenger.getDestination( subject );
077            if ( destination == null ) {
078                throw new JMSException( "No destination could be found for name: " + subject + " for subscription: " + subscription );
079            }
080    
081            MessageListener listener = subscription.getMessageListener();
082            if ( listener == null ) {
083                throw new JMSException( "No MessageListener is defined for subscription: " + subscription );
084            }
085            
086            // if its an MDO the initialise it!
087            if ( listener instanceof MessageDrivenObject ) {
088                MessageDrivenObject mdo = (MessageDrivenObject) listener;
089                if ( mdo instanceof MessengerMDO ) {
090                    MessengerMDO messengerMDO = (MessengerMDO) mdo;
091                    messengerMDO.setMessenger( messenger );
092                    messengerMDO.setMessengerManager( getMessengerManager() );
093                }
094                mdo.init( getServletContext() );
095            }
096    
097            listener = wrapInStopWatch( listener );
098    
099            String selector = subscription.getSelector();
100    
101            ConsumerThread thread = subscription.getConsumerThread();
102            if (thread != null) {
103                log.info( "Subscribing to messenger: " + name + " destination: " + subject + " selector: " + selector + " with: " + thread );
104                
105                thread.setMessenger(messenger);
106                thread.setDestination(destination);
107                thread.setSelector(selector);
108                thread.setListener(listener);
109                thread.start();
110            }
111            else {
112                if ( selector != null && selector.length() > 0 ) {
113                    log.info( "Subscribing to messenger: " + name + " destination: " + subject + " selector: " + selector );
114                    
115                    messenger.addListener( destination, selector, listener );
116                }
117                else {
118                    log.info( "Subscribing to messenger: " + name + " destination: " + subject );
119                    
120                    messenger.addListener( destination, listener );
121                }
122                
123                log.info( "Subscribed with listener: " + listener );
124            }
125        }
126    
127    
128        
129        public void unsubscribe() throws JMSException, ServletException {
130            SubscriptionList list = getSubscriptionList();
131            if ( list != null ) {
132                for (Iterator iter = list.getSubscriptions().iterator(); iter.hasNext(); ) {
133                    Subscription subscription = (Subscription) iter.next();
134                    unsubscribe( subscription );
135                }
136            }
137        }
138        
139        public void unsubscribe( Subscription subscription ) throws JMSException, ServletException {
140            // lets unsubscribe first
141            String name = subscription.getConnection();
142            Messenger messenger = getMessenger( name );
143            
144            MessageListener listener = subscription.getMessageListener();
145            if ( messenger != null && listener != null ) {
146                Destination destination = null;        
147                String subject = subscription.getSubject();
148                if ( subject == null || subject.length() == 0 ) {
149                    log.error( "No destination defined for subscription: " + subscription );
150                }
151                else {
152                    try {
153                        destination = messenger.getDestination( subject );
154                        if ( destination == null ) {
155                            log.error( "No destination could be found for name: " + subject + " for subscription: " + subscription );
156                        }
157                    }
158                    catch (JMSException e) {
159                        log.error( "Could not create destination for name: " + subject + " for subscription: " + subscription, e );
160                    }
161                }
162                if ( destination != null ) {
163                    try {
164                        String selector = subscription.getSelector();
165                        if ( selector != null && selector.length() > 0 ) {
166                            messenger.removeListener( destination, selector, listener );
167                        }
168                        else {
169                            messenger.removeListener( destination, listener );
170                        }
171                    }
172                    catch (JMSException e) {
173                        log.error( "Could not unsubscribe to destination:" + destination + " for subscription: " + subscription, e );
174                    }
175                }
176            }
177            
178            // now lets destrory the MBO
179            if ( listener instanceof MessageDrivenObject ) {
180                MessageDrivenObject mdo = (MessageDrivenObject) listener;
181                mdo.destroy();
182            }
183        }
184        
185        
186        // Properties
187        //-------------------------------------------------------------------------    
188    
189        public MessengerManager getMessengerManager() throws JMSException {
190            return manager;
191        }
192        
193        public void setMessengerManager(MessengerManager manager) {
194            this.manager = manager;
195        }
196    
197        /**
198         * Returns the subscriptionList.
199         * @return SubscriptionList
200         */
201        public SubscriptionList getSubscriptionList() {
202            return subscriptionList;
203        }
204    
205        /**
206         * Sets the subscriptionList.
207         * @param subscriptionList The subscriptionList to set
208         */
209        public void setSubscriptionList(SubscriptionList subscriptionList) {
210            this.subscriptionList = subscriptionList;
211        }
212        
213        /**
214         * Returns the servletContext.
215         * @return ServletContext
216         */
217        public ServletContext getServletContext() {
218            return servletContext;
219        }
220    
221        /**
222         * Sets the servletContext.
223         * @param servletContext The servletContext to set
224         */
225        public void setServletContext(ServletContext servletContext) {
226            this.servletContext = servletContext;
227        }
228        
229        // Implementation methods
230        //-------------------------------------------------------------------------    
231        /**
232         * Allows the MessageListener to be wrapped inside a stop watch message listener if required 
233         */
234        protected MessageListener wrapInStopWatch( MessageListener listener ) {
235            if ( useStopWatch ) {
236                return new StopWatchMessageListener( listener );
237            }
238            return listener;
239        }
240    
241        protected Messenger getMessenger(String name) throws JMSException {
242            return getMessengerManager().getMessenger( name );
243        }
244    
245    }