1
2
3
4
5
6
7
8
9
10 package org.apache.commons.messagelet;
11
12 import java.util.Iterator;
13
14 import javax.jms.Destination;
15 import javax.jms.JMSException;
16 import javax.jms.MessageListener;
17 import javax.servlet.ServletContext;
18 import javax.servlet.ServletException;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.apache.commons.messagelet.model.Subscription;
23 import org.apache.commons.messagelet.model.SubscriptionList;
24 import org.apache.commons.messenger.Messenger;
25 import org.apache.commons.messenger.MessengerManager;
26 import org.apache.commons.messenger.tool.StopWatchMessageListener;
27
28
29
30
31
32
33
34
35
36 public class SubscriptionManager {
37
38
39 private static final Log log = LogFactory.getLog(SubscriptionManager.class);
40
41
42 private MessengerManager manager;
43
44
45 private SubscriptionList subscriptionList;
46
47
48 private ServletContext servletContext;
49
50
51 private boolean useStopWatch = false;
52
53
54 public SubscriptionManager() {
55 }
56
57
58 protected void subscribe() throws JMSException, ServletException {
59 for (Iterator iter = getSubscriptionList().getSubscriptions().iterator(); iter.hasNext(); ) {
60 Subscription subscription = (Subscription) iter.next();
61 subscribe( subscription );
62 }
63 }
64
65 public void subscribe( Subscription subscription ) throws JMSException, ServletException{
66 String name = subscription.getConnection();
67 Messenger messenger = getMessenger( name );
68 if ( messenger == null ) {
69 throw new JMSException( "No such Messenger called: " + name + " for subscription: " + subscription );
70 }
71 String subject = subscription.getSubject();
72 if ( subject == null || subject.length() == 0 ) {
73 throw new JMSException( "No destination defined for subscription: " + subscription );
74 }
75
76 Destination destination = messenger.getDestination( subject );
77 if ( destination == null ) {
78 throw new JMSException( "No destination could be found for name: " + subject + " for subscription: " + subscription );
79 }
80
81 MessageListener listener = subscription.getMessageListener();
82 if ( listener == null ) {
83 throw new JMSException( "No MessageListener is defined for subscription: " + subscription );
84 }
85
86
87 if ( listener instanceof MessageDrivenObject ) {
88 MessageDrivenObject mdo = (MessageDrivenObject) listener;
89 if ( mdo instanceof MessengerMDO ) {
90 MessengerMDO messengerMDO = (MessengerMDO) mdo;
91 messengerMDO.setMessenger( messenger );
92 messengerMDO.setMessengerManager( getMessengerManager() );
93 }
94 mdo.init( getServletContext() );
95 }
96
97 listener = wrapInStopWatch( listener );
98
99 String selector = subscription.getSelector();
100
101 ConsumerThread thread = subscription.getConsumerThread();
102 if (thread != null) {
103 log.info( "Subscribing to messenger: " + name + " destination: " + subject + " selector: " + selector + " with: " + thread );
104
105 thread.setMessenger(messenger);
106 thread.setDestination(destination);
107 thread.setSelector(selector);
108 thread.setListener(listener);
109 thread.start();
110 }
111 else {
112 if ( selector != null && selector.length() > 0 ) {
113 log.info( "Subscribing to messenger: " + name + " destination: " + subject + " selector: " + selector );
114
115 messenger.addListener( destination, selector, listener );
116 }
117 else {
118 log.info( "Subscribing to messenger: " + name + " destination: " + subject );
119
120 messenger.addListener( destination, listener );
121 }
122
123 log.info( "Subscribed with listener: " + listener );
124 }
125 }
126
127
128
129 public void unsubscribe() throws JMSException, ServletException {
130 SubscriptionList list = getSubscriptionList();
131 if ( list != null ) {
132 for (Iterator iter = list.getSubscriptions().iterator(); iter.hasNext(); ) {
133 Subscription subscription = (Subscription) iter.next();
134 unsubscribe( subscription );
135 }
136 }
137 }
138
139 public void unsubscribe( Subscription subscription ) throws JMSException, ServletException {
140
141 String name = subscription.getConnection();
142 Messenger messenger = getMessenger( name );
143
144 MessageListener listener = subscription.getMessageListener();
145 if ( messenger != null && listener != null ) {
146 Destination destination = null;
147 String subject = subscription.getSubject();
148 if ( subject == null || subject.length() == 0 ) {
149 log.error( "No destination defined for subscription: " + subscription );
150 }
151 else {
152 try {
153 destination = messenger.getDestination( subject );
154 if ( destination == null ) {
155 log.error( "No destination could be found for name: " + subject + " for subscription: " + subscription );
156 }
157 }
158 catch (JMSException e) {
159 log.error( "Could not create destination for name: " + subject + " for subscription: " + subscription, e );
160 }
161 }
162 if ( destination != null ) {
163 try {
164 String selector = subscription.getSelector();
165 if ( selector != null && selector.length() > 0 ) {
166 messenger.removeListener( destination, selector, listener );
167 }
168 else {
169 messenger.removeListener( destination, listener );
170 }
171 }
172 catch (JMSException e) {
173 log.error( "Could not unsubscribe to destination:" + destination + " for subscription: " + subscription, e );
174 }
175 }
176 }
177
178
179 if ( listener instanceof MessageDrivenObject ) {
180 MessageDrivenObject mdo = (MessageDrivenObject) listener;
181 mdo.destroy();
182 }
183 }
184
185
186
187
188
189 public MessengerManager getMessengerManager() throws JMSException {
190 return manager;
191 }
192
193 public void setMessengerManager(MessengerManager manager) {
194 this.manager = manager;
195 }
196
197
198
199
200
201 public SubscriptionList getSubscriptionList() {
202 return subscriptionList;
203 }
204
205
206
207
208
209 public void setSubscriptionList(SubscriptionList subscriptionList) {
210 this.subscriptionList = subscriptionList;
211 }
212
213
214
215
216
217 public ServletContext getServletContext() {
218 return servletContext;
219 }
220
221
222
223
224
225 public void setServletContext(ServletContext servletContext) {
226 this.servletContext = servletContext;
227 }
228
229
230
231
232
233
234 protected MessageListener wrapInStopWatch( MessageListener listener ) {
235 if ( useStopWatch ) {
236 return new StopWatchMessageListener( listener );
237 }
238 return listener;
239 }
240
241 protected Messenger getMessenger(String name) throws JMSException {
242 return getMessengerManager().getMessenger( name );
243 }
244
245 }