001 /*
002 * Copyright 1999-2004 The Apache Software Foundation.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016 package org.apache.commons.messenger;
017
018 import javax.jms.Connection;
019 import javax.jms.ConnectionFactory;
020 import javax.jms.Destination;
021 import javax.jms.JMSException;
022 import javax.jms.Message;
023 import javax.jms.MessageConsumer;
024 import javax.jms.MessageListener;
025 import javax.jms.MessageProducer;
026 import javax.jms.Queue;
027 import javax.jms.QueueSender;
028 import javax.jms.QueueSession;
029 import javax.jms.ServerSessionPool;
030 import javax.jms.Session;
031 import javax.jms.Topic;
032 import javax.jms.TopicPublisher;
033 import javax.jms.TopicSession;
034 import javax.naming.Context;
035
036 import org.apache.commons.logging.Log;
037 import org.apache.commons.logging.LogFactory;
038
039 /** <p><code>DefaultMessenger</code> is the default implementation of
040 * Messenger which uses a {@link ThreadLocal} variable
041 * to keep the JMS Session that should be used for a given calling thread.</p>
042 *
043 * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
044 * @version $Revision: 155459 $
045 */
046 public class DefaultMessenger extends MessengerSupport {
047
048 private static final boolean SHARE_CONNECTION = true;
049
050 /** Logger */
051 private static final Log log = LogFactory.getLog(DefaultMessenger.class);
052
053 /** the MessengerSession for each thread */
054 private ThreadLocal messengerSessionPool = new ThreadLocal();
055
056 /** the SessionFactory used to create new JMS sessions */
057 private SessionFactory sessionFactory;
058
059 public DefaultMessenger() {
060 }
061
062 /** Returns the SessionFactory used to create new JMS sessions */
063 public SessionFactory getSessionFactory() throws JMSException {
064 if (sessionFactory == null) {
065 sessionFactory = createSessionFactory();
066 }
067 return sessionFactory;
068 }
069
070 /** Sets the SessionFactory used to create new JMS sessions */
071 public void setSessionFactory(SessionFactory sessionFactory) {
072 this.sessionFactory = sessionFactory;
073 }
074
075 public Connection getConnection() throws JMSException {
076 return getSessionFactory().getConnection();
077 }
078
079 public ServerSessionPool createServerSessionPool(MessageListener messageListener, int maxThreads)
080 throws JMSException {
081 return getSessionFactory().createServerSessionPool(messageListener, maxThreads);
082 }
083
084 public synchronized void close() throws JMSException {
085 /** note that only the current session is terminated */
086 MessengerSession session = getMessengerSession();
087
088 // clear all the pools...
089 messengerSessionPool = new ThreadLocal();
090
091 session.close();
092 getSessionFactory().close();
093 }
094
095 public Session getSession() throws JMSException {
096 return getMessengerSession().getSession();
097 }
098
099 public Session getAsyncSession() throws JMSException {
100 return getMessengerSession().getListenerSession();
101 }
102
103 public Message call(Destination destination, Message message) throws JMSException {
104 MessengerSession session = borrowMessengerSession();
105 try {
106 Destination replyTo = getReplyToDestination();
107 message.setJMSReplyTo(replyTo);
108
109 // NOTE - we could consider adding a correlation ID per request so that we can ignore
110 // any cruft or old messages that are sent onto our inbound queue.
111 //
112 // Though that does mean that we must then rely on the inbound message having
113 // the right correlationID. Though at least this strategy would mean
114 // that we could have a single consumer on a temporary queue for all threads
115 // and use correlation IDs to dispatch to the corrent thread
116 //
117 // Maybe this should be a configurable strategy
118
119 MessageProducer producer = session.getMessageProducer(destination);
120 MessageConsumer consumer = getReplyToConsumer();
121
122 if (session.isTopic()) {
123 ((TopicPublisher) producer).publish((Topic) destination, message);
124 }
125 else {
126 ((QueueSender) producer).send((Queue) destination, message);
127 }
128 Message response = consumer.receive();
129 if (response == null) {
130 // we could have timed out so lets trash the temporary destination
131 // so that the next call() method will use a new destination to avoid
132 // the response for this call() coming back on later call() invokcations
133 clearReplyToDestination();
134 }
135 return response;
136 }
137 finally {
138 returnMessengerSession(session);
139 }
140 }
141
142 public Message call(Destination destination, Message message, long timeoutMillis) throws JMSException {
143 MessengerSession session = borrowMessengerSession();
144 try {
145 Destination replyTo = getReplyToDestination();
146 message.setJMSReplyTo(replyTo);
147
148 // NOTE - we could consider adding a correlation ID per request so that we can ignore
149 // any cruft or old messages that are sent onto our inbound queue.
150 //
151 // Though that does mean that we must then rely on the inbound message having
152 // the right correlationID. Though at least this strategy would mean
153 // that we could have a single consumer on a temporary queue for all threads
154 // and use correlation IDs to dispatch to the corrent thread
155 //
156 // Maybe this should be a configurable strategy
157
158 MessageProducer producer = session.getMessageProducer(destination);
159
160 MessageConsumer consumer = getReplyToConsumer();
161 if (session.isTopic()) {
162 ((TopicPublisher) producer).publish((Topic) destination, message);
163 }
164 else {
165 ((QueueSender) producer).send((Queue) destination, message);
166 }
167 Message response = consumer.receive(timeoutMillis);
168 if (response == null) {
169 // we could have timed out so lets trash the temporary destination
170 // so that the next call() method will use a new destination to avoid
171 // the response for this call() coming back on later call() invokcations
172 clearReplyToDestination();
173 }
174 return response;
175 }
176 finally {
177 returnMessengerSession(session);
178 }
179 }
180
181 // Implementation methods
182 //-------------------------------------------------------------------------
183 protected boolean isTopic(Connection connection) throws JMSException {
184 return getSessionFactory().isTopic();
185 }
186
187 protected boolean isTopic(ConnectionFactory factory) throws JMSException {
188 return getSessionFactory().isTopic();
189 }
190
191 /**
192 * @return the MessageConsumer for this threads temporary destination
193 * which is cached for the duration of this process.
194 */
195 protected MessageConsumer getReplyToConsumer() throws JMSException {
196 MessengerSession messengerSession = getMessengerSession();
197 MessageConsumer consumer = messengerSession.getReplyToConsumer();
198 synchronized ( messengerSession ) {
199 if (consumer == null) {
200 consumer = createMessageConsumer(messengerSession, messengerSession.getSession(), messengerSession.getReplyToDestination());
201 messengerSession.setReplyToConsumer(consumer);
202 }
203 }
204 return consumer;
205 }
206
207 /**
208 * Clears the temporary destination used to receive reply-to messages
209 * which will lazily force a new destination and consumer to be created next
210 * time a call() method is invoked.
211 */
212 protected void clearReplyToDestination() throws JMSException {
213 MessengerSession messengerSession = getMessengerSession();
214
215 messengerSession.setReplyToDestination(null);
216 MessageConsumer consumer = messengerSession.getReplyToConsumer();
217 if (consumer != null) {
218 messengerSession.setReplyToConsumer(null);
219
220 // ensure that everything is nullified first before we close
221 // just in case an exception occurs
222 consumer.close();
223 }
224 }
225
226 protected Destination getReplyToDestination() throws JMSException {
227 return getMessengerSession().getReplyToDestination();
228 }
229
230 protected MessengerSession getMessengerSession() throws JMSException {
231 return borrowMessengerSession();
232 }
233
234 protected MessengerSession borrowMessengerSession() throws JMSException {
235 MessengerSession answer = (MessengerSession) messengerSessionPool.get();
236 if (answer == null) {
237 answer = createMessengerSession();
238 messengerSessionPool.set(answer);
239 }
240 return answer;
241 }
242
243 protected void returnMessengerSession(MessengerSession session) {
244 }
245
246 /**
247 * Factory method to create a new MessengerSession
248 */
249 protected MessengerSession createMessengerSession() throws JMSException {
250 return new MessengerSession(this, getSessionFactory());
251 }
252
253 /** Factory method to create a SessionFactory.
254 * Derived classes could override this method to create the SessionFactory
255 * from a well known place
256 */
257 protected SessionFactory createSessionFactory() throws JMSException {
258 throw new JMSException("No SessionFactory configured for this Messenger. Cannot create a new JMS Session");
259 }
260
261 public Queue getQueue(QueueSession session, String subject) throws JMSException {
262 // XXXX: might want to cache
263 Context ctx = null;
264 JNDISessionFactory factory = null;
265
266 Queue queue = null;
267 if (isJndiDestinations()) {
268 try {
269 factory = (JNDISessionFactory) getSessionFactory();
270 ctx = factory.getContext();
271 queue = (Queue) ctx.lookup(subject);
272 }
273 catch (Exception e) {
274 log.error("Unable to lookup subject: " + subject + ". Exception: " + e, e);
275 }
276 }
277 else {
278 // XXXX: might want to cache
279 queue = session.createQueue(subject);
280 }
281 return queue;
282 }
283
284 public Topic getTopic(TopicSession session, String subject) throws JMSException {
285 // XXXX: might want to cache
286 Context ctx = null;
287 JNDISessionFactory factory = null;
288
289 Topic topic = null;
290 if (isJndiDestinations()) {
291 try {
292 factory = (JNDISessionFactory) getSessionFactory();
293 ctx = factory.getContext();
294 topic = (Topic) ctx.lookup(subject);
295 }
296 catch (Exception e) {
297 log.error("Unable to lookup subject: " + subject + ". Exception: " + e, e);
298 }
299 }
300 else {
301 topic = session.createTopic(subject);
302 }
303 return topic;
304 }
305
306 }