001 /* 002 * Copyright (C) The Apache Software Foundation. All rights reserved. 003 * 004 * This software is published under the terms of the Apache Software License 005 * version 1.1, a copy of which has been included with this distribution in 006 * the LICENSE file. 007 * 008 * $Id: SubscriptionManager.java 155459 2005-02-26 13:24:44Z dirkv $ 009 */ 010 package org.apache.commons.messagelet; 011 012 import java.util.Iterator; 013 014 import javax.jms.Destination; 015 import javax.jms.JMSException; 016 import javax.jms.MessageListener; 017 import javax.servlet.ServletContext; 018 import javax.servlet.ServletException; 019 020 import org.apache.commons.logging.Log; 021 import org.apache.commons.logging.LogFactory; 022 import org.apache.commons.messagelet.model.Subscription; 023 import org.apache.commons.messagelet.model.SubscriptionList; 024 import org.apache.commons.messenger.Messenger; 025 import org.apache.commons.messenger.MessengerManager; 026 import org.apache.commons.messenger.tool.StopWatchMessageListener; 027 028 /** 029 * <p><code>SubscriptionManager</code> is a simple command line program that will 030 * create a number of subscriptions and consume messages using just regular 031 * MDO and MessageListener classes. 032 * 033 * @author <a href="mailto:jstrachan@apache.org">James Strachan</a> 034 * @version $Revision: 155459 $ 035 */ 036 public class SubscriptionManager { 037 038 /** Logger */ 039 private static final Log log = LogFactory.getLog(SubscriptionManager.class); 040 041 /** The JMS connections */ 042 private MessengerManager manager; 043 044 /** The JMS Subscriptions */ 045 private SubscriptionList subscriptionList; 046 047 /** The context passed into MDOs */ 048 private ServletContext servletContext; 049 050 /** Should we use a stopwatch to output performance metrics */ 051 private boolean useStopWatch = false; 052 053 054 public SubscriptionManager() { 055 } 056 057 058 protected void subscribe() throws JMSException, ServletException { 059 for (Iterator iter = getSubscriptionList().getSubscriptions().iterator(); iter.hasNext(); ) { 060 Subscription subscription = (Subscription) iter.next(); 061 subscribe( subscription ); 062 } 063 } 064 065 public void subscribe( Subscription subscription ) throws JMSException, ServletException{ 066 String name = subscription.getConnection(); 067 Messenger messenger = getMessenger( name ); 068 if ( messenger == null ) { 069 throw new JMSException( "No such Messenger called: " + name + " for subscription: " + subscription ); 070 } 071 String subject = subscription.getSubject(); 072 if ( subject == null || subject.length() == 0 ) { 073 throw new JMSException( "No destination defined for subscription: " + subscription ); 074 } 075 076 Destination destination = messenger.getDestination( subject ); 077 if ( destination == null ) { 078 throw new JMSException( "No destination could be found for name: " + subject + " for subscription: " + subscription ); 079 } 080 081 MessageListener listener = subscription.getMessageListener(); 082 if ( listener == null ) { 083 throw new JMSException( "No MessageListener is defined for subscription: " + subscription ); 084 } 085 086 // if its an MDO the initialise it! 087 if ( listener instanceof MessageDrivenObject ) { 088 MessageDrivenObject mdo = (MessageDrivenObject) listener; 089 if ( mdo instanceof MessengerMDO ) { 090 MessengerMDO messengerMDO = (MessengerMDO) mdo; 091 messengerMDO.setMessenger( messenger ); 092 messengerMDO.setMessengerManager( getMessengerManager() ); 093 } 094 mdo.init( getServletContext() ); 095 } 096 097 listener = wrapInStopWatch( listener ); 098 099 String selector = subscription.getSelector(); 100 101 ConsumerThread thread = subscription.getConsumerThread(); 102 if (thread != null) { 103 log.info( "Subscribing to messenger: " + name + " destination: " + subject + " selector: " + selector + " with: " + thread ); 104 105 thread.setMessenger(messenger); 106 thread.setDestination(destination); 107 thread.setSelector(selector); 108 thread.setListener(listener); 109 thread.start(); 110 } 111 else { 112 if ( selector != null && selector.length() > 0 ) { 113 log.info( "Subscribing to messenger: " + name + " destination: " + subject + " selector: " + selector ); 114 115 messenger.addListener( destination, selector, listener ); 116 } 117 else { 118 log.info( "Subscribing to messenger: " + name + " destination: " + subject ); 119 120 messenger.addListener( destination, listener ); 121 } 122 123 log.info( "Subscribed with listener: " + listener ); 124 } 125 } 126 127 128 129 public void unsubscribe() throws JMSException, ServletException { 130 SubscriptionList list = getSubscriptionList(); 131 if ( list != null ) { 132 for (Iterator iter = list.getSubscriptions().iterator(); iter.hasNext(); ) { 133 Subscription subscription = (Subscription) iter.next(); 134 unsubscribe( subscription ); 135 } 136 } 137 } 138 139 public void unsubscribe( Subscription subscription ) throws JMSException, ServletException { 140 // lets unsubscribe first 141 String name = subscription.getConnection(); 142 Messenger messenger = getMessenger( name ); 143 144 MessageListener listener = subscription.getMessageListener(); 145 if ( messenger != null && listener != null ) { 146 Destination destination = null; 147 String subject = subscription.getSubject(); 148 if ( subject == null || subject.length() == 0 ) { 149 log.error( "No destination defined for subscription: " + subscription ); 150 } 151 else { 152 try { 153 destination = messenger.getDestination( subject ); 154 if ( destination == null ) { 155 log.error( "No destination could be found for name: " + subject + " for subscription: " + subscription ); 156 } 157 } 158 catch (JMSException e) { 159 log.error( "Could not create destination for name: " + subject + " for subscription: " + subscription, e ); 160 } 161 } 162 if ( destination != null ) { 163 try { 164 String selector = subscription.getSelector(); 165 if ( selector != null && selector.length() > 0 ) { 166 messenger.removeListener( destination, selector, listener ); 167 } 168 else { 169 messenger.removeListener( destination, listener ); 170 } 171 } 172 catch (JMSException e) { 173 log.error( "Could not unsubscribe to destination:" + destination + " for subscription: " + subscription, e ); 174 } 175 } 176 } 177 178 // now lets destrory the MBO 179 if ( listener instanceof MessageDrivenObject ) { 180 MessageDrivenObject mdo = (MessageDrivenObject) listener; 181 mdo.destroy(); 182 } 183 } 184 185 186 // Properties 187 //------------------------------------------------------------------------- 188 189 public MessengerManager getMessengerManager() throws JMSException { 190 return manager; 191 } 192 193 public void setMessengerManager(MessengerManager manager) { 194 this.manager = manager; 195 } 196 197 /** 198 * Returns the subscriptionList. 199 * @return SubscriptionList 200 */ 201 public SubscriptionList getSubscriptionList() { 202 return subscriptionList; 203 } 204 205 /** 206 * Sets the subscriptionList. 207 * @param subscriptionList The subscriptionList to set 208 */ 209 public void setSubscriptionList(SubscriptionList subscriptionList) { 210 this.subscriptionList = subscriptionList; 211 } 212 213 /** 214 * Returns the servletContext. 215 * @return ServletContext 216 */ 217 public ServletContext getServletContext() { 218 return servletContext; 219 } 220 221 /** 222 * Sets the servletContext. 223 * @param servletContext The servletContext to set 224 */ 225 public void setServletContext(ServletContext servletContext) { 226 this.servletContext = servletContext; 227 } 228 229 // Implementation methods 230 //------------------------------------------------------------------------- 231 /** 232 * Allows the MessageListener to be wrapped inside a stop watch message listener if required 233 */ 234 protected MessageListener wrapInStopWatch( MessageListener listener ) { 235 if ( useStopWatch ) { 236 return new StopWatchMessageListener( listener ); 237 } 238 return listener; 239 } 240 241 protected Messenger getMessenger(String name) throws JMSException { 242 return getMessengerManager().getMessenger( name ); 243 } 244 245 }