001 /*
002 * Copyright (C) The Apache Software Foundation. All rights reserved.
003 *
004 * This software is published under the terms of the Apache Software License
005 * version 1.1, a copy of which has been included with this distribution in
006 * the LICENSE file.
007 *
008 * $Id: SubscriptionManager.java 155459 2005-02-26 13:24:44Z dirkv $
009 */
010 package org.apache.commons.messagelet;
011
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.Map;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageListener;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.messagelet.model.Subscription;
import org.apache.commons.messagelet.model.SubscriptionList;
import org.apache.commons.messenger.Messenger;
import org.apache.commons.messenger.MessengerManager;
import org.apache.commons.messenger.tool.StopWatchMessageListener;
027
028 /**
029 * <p><code>SubscriptionManager</code> is a simple command line program that will
030 * create a number of subscriptions and consume messages using just regular
031 * MDO and MessageListener classes.
032 *
033 * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
034 * @version $Revision: 155459 $
035 */
036 public class SubscriptionManager {
037
038 /** Logger */
039 private static final Log log = LogFactory.getLog(SubscriptionManager.class);
040
041 /** The JMS connections */
042 private MessengerManager manager;
043
044 /** The JMS Subscriptions */
045 private SubscriptionList subscriptionList;
046
047 /** The context passed into MDOs */
048 private ServletContext servletContext;
049
050 /** Should we use a stopwatch to output performance metrics */
051 private boolean useStopWatch = false;
052
053
054 public SubscriptionManager() {
055 }
056
057
058 protected void subscribe() throws JMSException, ServletException {
059 for (Iterator iter = getSubscriptionList().getSubscriptions().iterator(); iter.hasNext(); ) {
060 Subscription subscription = (Subscription) iter.next();
061 subscribe( subscription );
062 }
063 }
064
065 public void subscribe( Subscription subscription ) throws JMSException, ServletException{
066 String name = subscription.getConnection();
067 Messenger messenger = getMessenger( name );
068 if ( messenger == null ) {
069 throw new JMSException( "No such Messenger called: " + name + " for subscription: " + subscription );
070 }
071 String subject = subscription.getSubject();
072 if ( subject == null || subject.length() == 0 ) {
073 throw new JMSException( "No destination defined for subscription: " + subscription );
074 }
075
076 Destination destination = messenger.getDestination( subject );
077 if ( destination == null ) {
078 throw new JMSException( "No destination could be found for name: " + subject + " for subscription: " + subscription );
079 }
080
081 MessageListener listener = subscription.getMessageListener();
082 if ( listener == null ) {
083 throw new JMSException( "No MessageListener is defined for subscription: " + subscription );
084 }
085
086 // if its an MDO the initialise it!
087 if ( listener instanceof MessageDrivenObject ) {
088 MessageDrivenObject mdo = (MessageDrivenObject) listener;
089 if ( mdo instanceof MessengerMDO ) {
090 MessengerMDO messengerMDO = (MessengerMDO) mdo;
091 messengerMDO.setMessenger( messenger );
092 messengerMDO.setMessengerManager( getMessengerManager() );
093 }
094 mdo.init( getServletContext() );
095 }
096
097 listener = wrapInStopWatch( listener );
098
099 String selector = subscription.getSelector();
100
101 ConsumerThread thread = subscription.getConsumerThread();
102 if (thread != null) {
103 log.info( "Subscribing to messenger: " + name + " destination: " + subject + " selector: " + selector + " with: " + thread );
104
105 thread.setMessenger(messenger);
106 thread.setDestination(destination);
107 thread.setSelector(selector);
108 thread.setListener(listener);
109 thread.start();
110 }
111 else {
112 if ( selector != null && selector.length() > 0 ) {
113 log.info( "Subscribing to messenger: " + name + " destination: " + subject + " selector: " + selector );
114
115 messenger.addListener( destination, selector, listener );
116 }
117 else {
118 log.info( "Subscribing to messenger: " + name + " destination: " + subject );
119
120 messenger.addListener( destination, listener );
121 }
122
123 log.info( "Subscribed with listener: " + listener );
124 }
125 }
126
127
128
129 public void unsubscribe() throws JMSException, ServletException {
130 SubscriptionList list = getSubscriptionList();
131 if ( list != null ) {
132 for (Iterator iter = list.getSubscriptions().iterator(); iter.hasNext(); ) {
133 Subscription subscription = (Subscription) iter.next();
134 unsubscribe( subscription );
135 }
136 }
137 }
138
139 public void unsubscribe( Subscription subscription ) throws JMSException, ServletException {
140 // lets unsubscribe first
141 String name = subscription.getConnection();
142 Messenger messenger = getMessenger( name );
143
144 MessageListener listener = subscription.getMessageListener();
145 if ( messenger != null && listener != null ) {
146 Destination destination = null;
147 String subject = subscription.getSubject();
148 if ( subject == null || subject.length() == 0 ) {
149 log.error( "No destination defined for subscription: " + subscription );
150 }
151 else {
152 try {
153 destination = messenger.getDestination( subject );
154 if ( destination == null ) {
155 log.error( "No destination could be found for name: " + subject + " for subscription: " + subscription );
156 }
157 }
158 catch (JMSException e) {
159 log.error( "Could not create destination for name: " + subject + " for subscription: " + subscription, e );
160 }
161 }
162 if ( destination != null ) {
163 try {
164 String selector = subscription.getSelector();
165 if ( selector != null && selector.length() > 0 ) {
166 messenger.removeListener( destination, selector, listener );
167 }
168 else {
169 messenger.removeListener( destination, listener );
170 }
171 }
172 catch (JMSException e) {
173 log.error( "Could not unsubscribe to destination:" + destination + " for subscription: " + subscription, e );
174 }
175 }
176 }
177
178 // now lets destrory the MBO
179 if ( listener instanceof MessageDrivenObject ) {
180 MessageDrivenObject mdo = (MessageDrivenObject) listener;
181 mdo.destroy();
182 }
183 }
184
185
186 // Properties
187 //-------------------------------------------------------------------------
188
189 public MessengerManager getMessengerManager() throws JMSException {
190 return manager;
191 }
192
193 public void setMessengerManager(MessengerManager manager) {
194 this.manager = manager;
195 }
196
197 /**
198 * Returns the subscriptionList.
199 * @return SubscriptionList
200 */
201 public SubscriptionList getSubscriptionList() {
202 return subscriptionList;
203 }
204
205 /**
206 * Sets the subscriptionList.
207 * @param subscriptionList The subscriptionList to set
208 */
209 public void setSubscriptionList(SubscriptionList subscriptionList) {
210 this.subscriptionList = subscriptionList;
211 }
212
213 /**
214 * Returns the servletContext.
215 * @return ServletContext
216 */
217 public ServletContext getServletContext() {
218 return servletContext;
219 }
220
221 /**
222 * Sets the servletContext.
223 * @param servletContext The servletContext to set
224 */
225 public void setServletContext(ServletContext servletContext) {
226 this.servletContext = servletContext;
227 }
228
229 // Implementation methods
230 //-------------------------------------------------------------------------
231 /**
232 * Allows the MessageListener to be wrapped inside a stop watch message listener if required
233 */
234 protected MessageListener wrapInStopWatch( MessageListener listener ) {
235 if ( useStopWatch ) {
236 return new StopWatchMessageListener( listener );
237 }
238 return listener;
239 }
240
241 protected Messenger getMessenger(String name) throws JMSException {
242 return getMessengerManager().getMessenger( name );
243 }
244
245 }