View Javadoc

1   /*
2    * Copyright 1999-2004 The Apache Software Foundation.
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.apache.commons.messenger;
17  
18  import javax.jms.Connection;
19  import javax.jms.ConnectionFactory;
20  import javax.jms.Destination;
21  import javax.jms.JMSException;
22  import javax.jms.Message;
23  import javax.jms.MessageConsumer;
24  import javax.jms.MessageListener;
25  import javax.jms.MessageProducer;
26  import javax.jms.Queue;
27  import javax.jms.QueueSender;
28  import javax.jms.QueueSession;
29  import javax.jms.ServerSessionPool;
30  import javax.jms.Session;
31  import javax.jms.Topic;
32  import javax.jms.TopicPublisher;
33  import javax.jms.TopicSession;
34  import javax.naming.Context;
35  
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  
39  /** <p><code>DefaultMessenger</code> is the default implementation of
40    * Messenger which uses a {@link ThreadLocal} variable
41    * to keep the JMS Session that should be used for a given calling thread.</p>
42    *
43    * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
44    * @version $Revision: 155459 $
45    */
46  public class DefaultMessenger extends MessengerSupport {
47  
48      private static final boolean SHARE_CONNECTION = true;
49  
50      /** Logger */
51      private static final Log log = LogFactory.getLog(DefaultMessenger.class);
52  
53      /** the MessengerSession for each thread */
54      private ThreadLocal messengerSessionPool = new ThreadLocal();
55  
56      /** the SessionFactory used to create new JMS sessions */
57      private SessionFactory sessionFactory;
58  
59      public DefaultMessenger() {
60      }
61  
62      /** Returns the SessionFactory used to create new JMS sessions */
63      public SessionFactory getSessionFactory() throws JMSException {
64          if (sessionFactory == null) {
65              sessionFactory = createSessionFactory();
66          }
67          return sessionFactory;
68      }
69  
70      /** Sets the SessionFactory used to create new JMS sessions */
71      public void setSessionFactory(SessionFactory sessionFactory) {
72          this.sessionFactory = sessionFactory;
73      }
74  
75      public Connection getConnection() throws JMSException {
76          return getSessionFactory().getConnection();
77      }
78  
79      public ServerSessionPool createServerSessionPool(MessageListener messageListener, int maxThreads)
80          throws JMSException {
81          return getSessionFactory().createServerSessionPool(messageListener, maxThreads);
82      }
83  
84      public synchronized void close() throws JMSException {
85          /** note that only the current session is terminated */
86          MessengerSession session = getMessengerSession();
87  
88          // clear all the pools...
89          messengerSessionPool = new ThreadLocal();
90  
91          session.close();
92          getSessionFactory().close();
93      }
94  
95      public Session getSession() throws JMSException {
96          return getMessengerSession().getSession();
97      }
98  
99      public Session getAsyncSession() throws JMSException {
100         return getMessengerSession().getListenerSession();
101     }
102 
103     public Message call(Destination destination, Message message) throws JMSException {
104         MessengerSession session = borrowMessengerSession();
105         try {
106             Destination replyTo = getReplyToDestination();
107             message.setJMSReplyTo(replyTo);
108 
109             // NOTE - we could consider adding a correlation ID per request so that we can ignore
110             // any cruft or old messages that are sent onto our inbound queue.
111             //
112             // Though that does mean that we must then rely on the inbound message having
113             // the right correlationID. Though at least this strategy would mean
114             // that we could have a single consumer on a temporary queue for all threads
115             // and use correlation IDs to dispatch to the corrent thread
116             //
117             // Maybe this should be a configurable strategy
118 
119             MessageProducer producer = session.getMessageProducer(destination);
120             MessageConsumer consumer = getReplyToConsumer();
121 
122             if (session.isTopic()) {
123                 ((TopicPublisher) producer).publish((Topic) destination, message);
124             }
125             else {
126                 ((QueueSender) producer).send((Queue) destination, message);
127             }
128             Message response = consumer.receive();
129             if (response == null) {
130                 // we could have timed out so lets trash the temporary destination
131                 // so that the next call() method will use a new destination to avoid
132                 // the response for this call() coming back on later call() invokcations
133                 clearReplyToDestination();
134             }
135             return response;
136         }
137         finally {
138             returnMessengerSession(session);
139         }
140     }
141 
142     public Message call(Destination destination, Message message, long timeoutMillis) throws JMSException {
143         MessengerSession session = borrowMessengerSession();
144         try {
145             Destination replyTo = getReplyToDestination();
146             message.setJMSReplyTo(replyTo);
147 
148             // NOTE - we could consider adding a correlation ID per request so that we can ignore
149             // any cruft or old messages that are sent onto our inbound queue.
150             //
151             // Though that does mean that we must then rely on the inbound message having
152             // the right correlationID. Though at least this strategy would mean
153             // that we could have a single consumer on a temporary queue for all threads
154             // and use correlation IDs to dispatch to the corrent thread
155             //
156             // Maybe this should be a configurable strategy
157 
158             MessageProducer producer = session.getMessageProducer(destination);
159 
160             MessageConsumer consumer = getReplyToConsumer();
161             if (session.isTopic()) {
162                 ((TopicPublisher) producer).publish((Topic) destination, message);
163             }
164             else {
165                 ((QueueSender) producer).send((Queue) destination, message);
166             }
167             Message response = consumer.receive(timeoutMillis);
168             if (response == null) {
169                 // we could have timed out so lets trash the temporary destination
170                 // so that the next call() method will use a new destination to avoid
171                 // the response for this call() coming back on later call() invokcations
172                 clearReplyToDestination();
173             }
174             return response;
175         }
176         finally {
177             returnMessengerSession(session);
178         }
179     }
180 
181     // Implementation methods
182     //-------------------------------------------------------------------------
183     protected boolean isTopic(Connection connection) throws JMSException {
184         return getSessionFactory().isTopic();
185     }
186 
187     protected boolean isTopic(ConnectionFactory factory) throws JMSException {
188         return getSessionFactory().isTopic();
189     }
190 
191     /**
192      * @return the MessageConsumer for this threads temporary destination
193      * which is cached for the duration of this process.
194      */
195     protected MessageConsumer getReplyToConsumer() throws JMSException {
196         MessengerSession messengerSession = getMessengerSession();
197         MessageConsumer consumer = messengerSession.getReplyToConsumer();
198         synchronized ( messengerSession ) {
199             if (consumer == null) {
200                 consumer = createMessageConsumer(messengerSession, messengerSession.getSession(), messengerSession.getReplyToDestination());
201                 messengerSession.setReplyToConsumer(consumer);
202             }
203         }
204         return consumer;
205     }
206 
207     /**
208      * Clears the temporary destination used to receive reply-to messages
209      * which will lazily force a new destination and consumer to be created next
210      * time a call() method is invoked.
211      */
212     protected void clearReplyToDestination() throws JMSException {
213         MessengerSession messengerSession = getMessengerSession();
214 
215         messengerSession.setReplyToDestination(null);
216         MessageConsumer consumer = messengerSession.getReplyToConsumer();
217         if (consumer != null) {
218             messengerSession.setReplyToConsumer(null);
219 
220             // ensure that everything is nullified first before we close
221             // just in case an exception occurs
222             consumer.close();
223         }
224     }
225 
226     protected Destination getReplyToDestination() throws JMSException {
227         return getMessengerSession().getReplyToDestination();
228     }
229 
230     protected MessengerSession getMessengerSession() throws JMSException {
231         return borrowMessengerSession();
232     }
233 
234     protected MessengerSession borrowMessengerSession() throws JMSException {
235         MessengerSession answer = (MessengerSession) messengerSessionPool.get();
236         if (answer == null) {
237             answer = createMessengerSession();
238             messengerSessionPool.set(answer);
239         }
240         return answer;
241     }
242 
243     protected void returnMessengerSession(MessengerSession session) {
244     }
245 
246     /**
247      * Factory method to create a new MessengerSession
248      */
249     protected MessengerSession createMessengerSession() throws JMSException {
250         return new MessengerSession(this, getSessionFactory());
251     }
252 
253     /** Factory method to create a SessionFactory.
254       * Derived classes could override this method to create the SessionFactory
255       * from a well known place
256       */
257     protected SessionFactory createSessionFactory() throws JMSException {
258         throw new JMSException("No SessionFactory configured for this Messenger. Cannot create a new JMS Session");
259     }
260 
261     public Queue getQueue(QueueSession session, String subject) throws JMSException {
262         // XXXX: might want to cache
263         Context ctx = null;
264         JNDISessionFactory factory = null;
265         
266         Queue queue = null;
267         if (isJndiDestinations()) {
268             try {
269                 factory = (JNDISessionFactory) getSessionFactory();
270                 ctx = factory.getContext();
271                 queue = (Queue) ctx.lookup(subject);
272             }
273             catch (Exception e) {
274                 log.error("Unable to lookup subject: " + subject + ". Exception: " + e, e);
275             }
276         }
277         else {
278             // XXXX: might want to cache
279             queue = session.createQueue(subject);
280         }
281         return queue;
282     }
283 
284     public Topic getTopic(TopicSession session, String subject) throws JMSException {
285         // XXXX: might want to cache
286         Context ctx = null;
287         JNDISessionFactory factory = null;
288         
289         Topic topic = null;
290         if (isJndiDestinations()) {
291             try {
292                 factory = (JNDISessionFactory) getSessionFactory();
293                 ctx = factory.getContext();
294                 topic = (Topic) ctx.lookup(subject);
295             }
296             catch (Exception e) {
297                 log.error("Unable to lookup subject: " + subject + ". Exception: " + e, e);
298             }
299         }
300         else {
301             topic = session.createTopic(subject);
302         }
303         return topic;
304     }
305 
306 }