1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.apache.commons.messenger;
17
18 import javax.jms.Connection;
19 import javax.jms.ConnectionFactory;
20 import javax.jms.Destination;
21 import javax.jms.JMSException;
22 import javax.jms.Message;
23 import javax.jms.MessageConsumer;
24 import javax.jms.MessageListener;
25 import javax.jms.MessageProducer;
26 import javax.jms.Queue;
27 import javax.jms.QueueSender;
28 import javax.jms.QueueSession;
29 import javax.jms.ServerSessionPool;
30 import javax.jms.Session;
31 import javax.jms.Topic;
32 import javax.jms.TopicPublisher;
33 import javax.jms.TopicSession;
34 import javax.naming.Context;
35
36 import org.apache.commons.logging.Log;
37 import org.apache.commons.logging.LogFactory;
38
39
40
41
42
43
44
45
46 public class DefaultMessenger extends MessengerSupport {
47
48 private static final boolean SHARE_CONNECTION = true;
49
50
51 private static final Log log = LogFactory.getLog(DefaultMessenger.class);
52
53
54 private ThreadLocal messengerSessionPool = new ThreadLocal();
55
56
57 private SessionFactory sessionFactory;
58
59 public DefaultMessenger() {
60 }
61
62
63 public SessionFactory getSessionFactory() throws JMSException {
64 if (sessionFactory == null) {
65 sessionFactory = createSessionFactory();
66 }
67 return sessionFactory;
68 }
69
70
71 public void setSessionFactory(SessionFactory sessionFactory) {
72 this.sessionFactory = sessionFactory;
73 }
74
75 public Connection getConnection() throws JMSException {
76 return getSessionFactory().getConnection();
77 }
78
79 public ServerSessionPool createServerSessionPool(MessageListener messageListener, int maxThreads)
80 throws JMSException {
81 return getSessionFactory().createServerSessionPool(messageListener, maxThreads);
82 }
83
84 public synchronized void close() throws JMSException {
85
86 MessengerSession session = getMessengerSession();
87
88
89 messengerSessionPool = new ThreadLocal();
90
91 session.close();
92 getSessionFactory().close();
93 }
94
95 public Session getSession() throws JMSException {
96 return getMessengerSession().getSession();
97 }
98
99 public Session getAsyncSession() throws JMSException {
100 return getMessengerSession().getListenerSession();
101 }
102
103 public Message call(Destination destination, Message message) throws JMSException {
104 MessengerSession session = borrowMessengerSession();
105 try {
106 Destination replyTo = getReplyToDestination();
107 message.setJMSReplyTo(replyTo);
108
109
110
111
112
113
114
115
116
117
118
119 MessageProducer producer = session.getMessageProducer(destination);
120 MessageConsumer consumer = getReplyToConsumer();
121
122 if (session.isTopic()) {
123 ((TopicPublisher) producer).publish((Topic) destination, message);
124 }
125 else {
126 ((QueueSender) producer).send((Queue) destination, message);
127 }
128 Message response = consumer.receive();
129 if (response == null) {
130
131
132
133 clearReplyToDestination();
134 }
135 return response;
136 }
137 finally {
138 returnMessengerSession(session);
139 }
140 }
141
142 public Message call(Destination destination, Message message, long timeoutMillis) throws JMSException {
143 MessengerSession session = borrowMessengerSession();
144 try {
145 Destination replyTo = getReplyToDestination();
146 message.setJMSReplyTo(replyTo);
147
148
149
150
151
152
153
154
155
156
157
158 MessageProducer producer = session.getMessageProducer(destination);
159
160 MessageConsumer consumer = getReplyToConsumer();
161 if (session.isTopic()) {
162 ((TopicPublisher) producer).publish((Topic) destination, message);
163 }
164 else {
165 ((QueueSender) producer).send((Queue) destination, message);
166 }
167 Message response = consumer.receive(timeoutMillis);
168 if (response == null) {
169
170
171
172 clearReplyToDestination();
173 }
174 return response;
175 }
176 finally {
177 returnMessengerSession(session);
178 }
179 }
180
181
182
183 protected boolean isTopic(Connection connection) throws JMSException {
184 return getSessionFactory().isTopic();
185 }
186
187 protected boolean isTopic(ConnectionFactory factory) throws JMSException {
188 return getSessionFactory().isTopic();
189 }
190
191
192
193
194
195 protected MessageConsumer getReplyToConsumer() throws JMSException {
196 MessengerSession messengerSession = getMessengerSession();
197 MessageConsumer consumer = messengerSession.getReplyToConsumer();
198 synchronized ( messengerSession ) {
199 if (consumer == null) {
200 consumer = createMessageConsumer(messengerSession, messengerSession.getSession(), messengerSession.getReplyToDestination());
201 messengerSession.setReplyToConsumer(consumer);
202 }
203 }
204 return consumer;
205 }
206
207
208
209
210
211
212 protected void clearReplyToDestination() throws JMSException {
213 MessengerSession messengerSession = getMessengerSession();
214
215 messengerSession.setReplyToDestination(null);
216 MessageConsumer consumer = messengerSession.getReplyToConsumer();
217 if (consumer != null) {
218 messengerSession.setReplyToConsumer(null);
219
220
221
222 consumer.close();
223 }
224 }
225
226 protected Destination getReplyToDestination() throws JMSException {
227 return getMessengerSession().getReplyToDestination();
228 }
229
230 protected MessengerSession getMessengerSession() throws JMSException {
231 return borrowMessengerSession();
232 }
233
234 protected MessengerSession borrowMessengerSession() throws JMSException {
235 MessengerSession answer = (MessengerSession) messengerSessionPool.get();
236 if (answer == null) {
237 answer = createMessengerSession();
238 messengerSessionPool.set(answer);
239 }
240 return answer;
241 }
242
243 protected void returnMessengerSession(MessengerSession session) {
244 }
245
246
247
248
249 protected MessengerSession createMessengerSession() throws JMSException {
250 return new MessengerSession(this, getSessionFactory());
251 }
252
253
254
255
256
257 protected SessionFactory createSessionFactory() throws JMSException {
258 throw new JMSException("No SessionFactory configured for this Messenger. Cannot create a new JMS Session");
259 }
260
261 public Queue getQueue(QueueSession session, String subject) throws JMSException {
262
263 Context ctx = null;
264 JNDISessionFactory factory = null;
265
266 Queue queue = null;
267 if (isJndiDestinations()) {
268 try {
269 factory = (JNDISessionFactory) getSessionFactory();
270 ctx = factory.getContext();
271 queue = (Queue) ctx.lookup(subject);
272 }
273 catch (Exception e) {
274 log.error("Unable to lookup subject: " + subject + ". Exception: " + e, e);
275 }
276 }
277 else {
278
279 queue = session.createQueue(subject);
280 }
281 return queue;
282 }
283
284 public Topic getTopic(TopicSession session, String subject) throws JMSException {
285
286 Context ctx = null;
287 JNDISessionFactory factory = null;
288
289 Topic topic = null;
290 if (isJndiDestinations()) {
291 try {
292 factory = (JNDISessionFactory) getSessionFactory();
293 ctx = factory.getContext();
294 topic = (Topic) ctx.lookup(subject);
295 }
296 catch (Exception e) {
297 log.error("Unable to lookup subject: " + subject + ". Exception: " + e, e);
298 }
299 }
300 else {
301 topic = session.createTopic(subject);
302 }
303 return topic;
304 }
305
306 }