001    /*
002     * Copyright 1999-2004 The Apache Software Foundation.
003     * 
004     * Licensed under the Apache License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     * 
008     *      http://www.apache.org/licenses/LICENSE-2.0
009     * 
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    package org.apache.commons.messenger;
017    
018    import javax.jms.Connection;
019    import javax.jms.ConnectionFactory;
020    import javax.jms.Destination;
021    import javax.jms.JMSException;
022    import javax.jms.Message;
023    import javax.jms.MessageConsumer;
024    import javax.jms.MessageListener;
025    import javax.jms.MessageProducer;
026    import javax.jms.Queue;
027    import javax.jms.QueueSender;
028    import javax.jms.QueueSession;
029    import javax.jms.ServerSessionPool;
030    import javax.jms.Session;
031    import javax.jms.Topic;
032    import javax.jms.TopicPublisher;
033    import javax.jms.TopicSession;
034    import javax.naming.Context;
035    
036    import org.apache.commons.logging.Log;
037    import org.apache.commons.logging.LogFactory;
038    
039    /** <p><code>DefaultMessenger</code> is the default implementation of
040      * Messenger which uses a {@link ThreadLocal} variable
041      * to keep the JMS Session that should be used for a given calling thread.</p>
042      *
043      * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
044      * @version $Revision: 155459 $
045      */
046    public class DefaultMessenger extends MessengerSupport {
047    
048        private static final boolean SHARE_CONNECTION = true;
049    
050        /** Logger */
051        private static final Log log = LogFactory.getLog(DefaultMessenger.class);
052    
053        /** the MessengerSession for each thread */
054        private ThreadLocal messengerSessionPool = new ThreadLocal();
055    
056        /** the SessionFactory used to create new JMS sessions */
057        private SessionFactory sessionFactory;
058    
059        public DefaultMessenger() {
060        }
061    
062        /** Returns the SessionFactory used to create new JMS sessions */
063        public SessionFactory getSessionFactory() throws JMSException {
064            if (sessionFactory == null) {
065                sessionFactory = createSessionFactory();
066            }
067            return sessionFactory;
068        }
069    
070        /** Sets the SessionFactory used to create new JMS sessions */
071        public void setSessionFactory(SessionFactory sessionFactory) {
072            this.sessionFactory = sessionFactory;
073        }
074    
075        public Connection getConnection() throws JMSException {
076            return getSessionFactory().getConnection();
077        }
078    
079        public ServerSessionPool createServerSessionPool(MessageListener messageListener, int maxThreads)
080            throws JMSException {
081            return getSessionFactory().createServerSessionPool(messageListener, maxThreads);
082        }
083    
084        public synchronized void close() throws JMSException {
085            /** note that only the current session is terminated */
086            MessengerSession session = getMessengerSession();
087    
088            // clear all the pools...
089            messengerSessionPool = new ThreadLocal();
090    
091            session.close();
092            getSessionFactory().close();
093        }
094    
095        public Session getSession() throws JMSException {
096            return getMessengerSession().getSession();
097        }
098    
099        public Session getAsyncSession() throws JMSException {
100            return getMessengerSession().getListenerSession();
101        }
102    
103        public Message call(Destination destination, Message message) throws JMSException {
104            MessengerSession session = borrowMessengerSession();
105            try {
106                Destination replyTo = getReplyToDestination();
107                message.setJMSReplyTo(replyTo);
108    
109                // NOTE - we could consider adding a correlation ID per request so that we can ignore
110                // any cruft or old messages that are sent onto our inbound queue.
111                //
112                // Though that does mean that we must then rely on the inbound message having
113                // the right correlationID. Though at least this strategy would mean
114                // that we could have a single consumer on a temporary queue for all threads
115                // and use correlation IDs to dispatch to the corrent thread
116                //
117                // Maybe this should be a configurable strategy
118    
119                MessageProducer producer = session.getMessageProducer(destination);
120                MessageConsumer consumer = getReplyToConsumer();
121    
122                if (session.isTopic()) {
123                    ((TopicPublisher) producer).publish((Topic) destination, message);
124                }
125                else {
126                    ((QueueSender) producer).send((Queue) destination, message);
127                }
128                Message response = consumer.receive();
129                if (response == null) {
130                    // we could have timed out so lets trash the temporary destination
131                    // so that the next call() method will use a new destination to avoid
132                    // the response for this call() coming back on later call() invokcations
133                    clearReplyToDestination();
134                }
135                return response;
136            }
137            finally {
138                returnMessengerSession(session);
139            }
140        }
141    
142        public Message call(Destination destination, Message message, long timeoutMillis) throws JMSException {
143            MessengerSession session = borrowMessengerSession();
144            try {
145                Destination replyTo = getReplyToDestination();
146                message.setJMSReplyTo(replyTo);
147    
148                // NOTE - we could consider adding a correlation ID per request so that we can ignore
149                // any cruft or old messages that are sent onto our inbound queue.
150                //
151                // Though that does mean that we must then rely on the inbound message having
152                // the right correlationID. Though at least this strategy would mean
153                // that we could have a single consumer on a temporary queue for all threads
154                // and use correlation IDs to dispatch to the corrent thread
155                //
156                // Maybe this should be a configurable strategy
157    
158                MessageProducer producer = session.getMessageProducer(destination);
159    
160                MessageConsumer consumer = getReplyToConsumer();
161                if (session.isTopic()) {
162                    ((TopicPublisher) producer).publish((Topic) destination, message);
163                }
164                else {
165                    ((QueueSender) producer).send((Queue) destination, message);
166                }
167                Message response = consumer.receive(timeoutMillis);
168                if (response == null) {
169                    // we could have timed out so lets trash the temporary destination
170                    // so that the next call() method will use a new destination to avoid
171                    // the response for this call() coming back on later call() invokcations
172                    clearReplyToDestination();
173                }
174                return response;
175            }
176            finally {
177                returnMessengerSession(session);
178            }
179        }
180    
181        // Implementation methods
182        //-------------------------------------------------------------------------
183        protected boolean isTopic(Connection connection) throws JMSException {
184            return getSessionFactory().isTopic();
185        }
186    
187        protected boolean isTopic(ConnectionFactory factory) throws JMSException {
188            return getSessionFactory().isTopic();
189        }
190    
191        /**
192         * @return the MessageConsumer for this threads temporary destination
193         * which is cached for the duration of this process.
194         */
195        protected MessageConsumer getReplyToConsumer() throws JMSException {
196            MessengerSession messengerSession = getMessengerSession();
197            MessageConsumer consumer = messengerSession.getReplyToConsumer();
198            synchronized ( messengerSession ) {
199                if (consumer == null) {
200                    consumer = createMessageConsumer(messengerSession, messengerSession.getSession(), messengerSession.getReplyToDestination());
201                    messengerSession.setReplyToConsumer(consumer);
202                }
203            }
204            return consumer;
205        }
206    
207        /**
208         * Clears the temporary destination used to receive reply-to messages
209         * which will lazily force a new destination and consumer to be created next
210         * time a call() method is invoked.
211         */
212        protected void clearReplyToDestination() throws JMSException {
213            MessengerSession messengerSession = getMessengerSession();
214    
215            messengerSession.setReplyToDestination(null);
216            MessageConsumer consumer = messengerSession.getReplyToConsumer();
217            if (consumer != null) {
218                messengerSession.setReplyToConsumer(null);
219    
220                // ensure that everything is nullified first before we close
221                // just in case an exception occurs
222                consumer.close();
223            }
224        }
225    
226        protected Destination getReplyToDestination() throws JMSException {
227            return getMessengerSession().getReplyToDestination();
228        }
229    
230        protected MessengerSession getMessengerSession() throws JMSException {
231            return borrowMessengerSession();
232        }
233    
234        protected MessengerSession borrowMessengerSession() throws JMSException {
235            MessengerSession answer = (MessengerSession) messengerSessionPool.get();
236            if (answer == null) {
237                answer = createMessengerSession();
238                messengerSessionPool.set(answer);
239            }
240            return answer;
241        }
242    
243        protected void returnMessengerSession(MessengerSession session) {
244        }
245    
246        /**
247         * Factory method to create a new MessengerSession
248         */
249        protected MessengerSession createMessengerSession() throws JMSException {
250            return new MessengerSession(this, getSessionFactory());
251        }
252    
253        /** Factory method to create a SessionFactory.
254          * Derived classes could override this method to create the SessionFactory
255          * from a well known place
256          */
257        protected SessionFactory createSessionFactory() throws JMSException {
258            throw new JMSException("No SessionFactory configured for this Messenger. Cannot create a new JMS Session");
259        }
260    
261        public Queue getQueue(QueueSession session, String subject) throws JMSException {
262            // XXXX: might want to cache
263            Context ctx = null;
264            JNDISessionFactory factory = null;
265            
266            Queue queue = null;
267            if (isJndiDestinations()) {
268                try {
269                    factory = (JNDISessionFactory) getSessionFactory();
270                    ctx = factory.getContext();
271                    queue = (Queue) ctx.lookup(subject);
272                }
273                catch (Exception e) {
274                    log.error("Unable to lookup subject: " + subject + ". Exception: " + e, e);
275                }
276            }
277            else {
278                // XXXX: might want to cache
279                queue = session.createQueue(subject);
280            }
281            return queue;
282        }
283    
284        public Topic getTopic(TopicSession session, String subject) throws JMSException {
285            // XXXX: might want to cache
286            Context ctx = null;
287            JNDISessionFactory factory = null;
288            
289            Topic topic = null;
290            if (isJndiDestinations()) {
291                try {
292                    factory = (JNDISessionFactory) getSessionFactory();
293                    ctx = factory.getContext();
294                    topic = (Topic) ctx.lookup(subject);
295                }
296                catch (Exception e) {
297                    log.error("Unable to lookup subject: " + subject + ". Exception: " + e, e);
298                }
299            }
300            else {
301                topic = session.createTopic(subject);
302            }
303            return topic;
304        }
305    
306    }