001    /*
002     * Copyright 1999-2004 The Apache Software Foundation.
003     * 
004     * Licensed under the Apache License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     * 
008     *      http://www.apache.org/licenses/LICENSE-2.0
009     * 
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    package org.apache.commons.messenger;
017    
018    import java.util.LinkedList;
019    
020    import javax.jms.Connection;
021    import javax.jms.ConnectionFactory;
022    import javax.jms.Destination;
023    import javax.jms.JMSException;
024    import javax.jms.Message;
025    import javax.jms.MessageConsumer;
026    import javax.jms.MessageListener;
027    import javax.jms.MessageProducer;
028    import javax.jms.Queue;
029    import javax.jms.QueueSender;
030    import javax.jms.ServerSessionPool;
031    import javax.jms.Session;
032    import javax.jms.Topic;
033    import javax.jms.TopicPublisher;
034    
035    import org.apache.commons.logging.Log;
036    import org.apache.commons.logging.LogFactory;
037    
038    /** <p><code>SimpleMessenger</code> is an implementation of
039      * Messenger which uses a single JMS Session for sending
040      * to keep the JMS Session that should be used for a given calling thread.</p>
041      *
042      * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
043      * @version $Revision: 155459 $
044      */
045    public class SimpleMessenger extends MessengerSupport {
046    
047        /** Logger */
048        private static final Log log = LogFactory.getLog(SimpleMessenger.class);
049    
050        /** the SessionFactory used to create new JMS sessions */
051        private SessionFactory sessionFactory;
052    
053        /** pool of MessengerSession instances */
054        private LinkedList pool = new LinkedList();
055    
056        /** thread local data for RPCs */    
057        private ThreadLocal threadLocalData = new ThreadLocal();
058    
059        private static int count;
060    
061        public SimpleMessenger() {
062        }
063    
064        /** Returns the SessionFactory used to create new JMS sessions */
065        public SessionFactory getSessionFactory() throws JMSException {
066            if (sessionFactory == null) {
067                sessionFactory = createSessionFactory();
068            }
069            return sessionFactory;
070        }
071    
072        /** Sets the SessionFactory used to create new JMS sessions */
073        public void setSessionFactory(SessionFactory sessionFactory) {
074            this.sessionFactory = sessionFactory;
075        }
076    
077        public Connection getConnection() throws JMSException {
078            return getSessionFactory().getConnection();
079        }
080    
081        public ServerSessionPool createServerSessionPool(MessageListener messageListener, int maxThreads)
082            throws JMSException {
083            return getSessionFactory().createServerSessionPool(messageListener, maxThreads);
084        }
085    
086        public synchronized void close() throws JMSException {
087            while (! pool.isEmpty()) {
088                MessengerSession session = (MessengerSession) pool.removeFirst();
089                session.close();
090            }
091    
092            getSessionFactory().close();
093        }
094    
095        public Session getSession() throws JMSException {
096             throw new UnsupportedOperationException("Not supported by this Messenger. Please use borrowSession() and returnSession() instead");
097        }
098    
099        public Session getAsyncSession() throws JMSException {
100            throw new UnsupportedOperationException("Not supported by this Messenger. Please use borrowSession() and returnSession() instead");
101        }
102    
103        public Message call(Destination destination, Message message) throws JMSException {
104            ThreadLocalData data = null;
105            MessengerSession messengerSession = borrowMessengerSession();
106            try {
107                data = getThreadLocalData(messengerSession.getSession());
108                Destination replyTo = data.destination;
109                message.setJMSReplyTo(replyTo);
110            }
111            finally {
112                returnMessengerSession(messengerSession);
113            }
114     
115            log.info("Sending message to destination: " + destination);
116    
117        
118            send(destination, message);
119     
120            messengerSession = borrowMessengerSession();
121            try {    
122                
123    //            MessageProducer producer = messengerSession.getMessageProducer(destination);
124    //            if (messengerSession.isTopic()) {
125    //                ((TopicPublisher) producer).publish((Topic) destination, message);
126    //            }
127    //            else {
128    //                ((QueueSender) producer).send((Queue) destination, message);
129    //            }
130    // 
131                log.info("Message sent - now waiting for a response...");
132           
133                MessageConsumer consumer = data.consumer;
134                Message response = consumer.receive();
135    //            Message response = null;
136                if (response == null) {
137                    // we could have timed out so lets trash the temporary destination
138                    // so that the next call() method will use a new destination to avoid
139                    // the response for this call() coming back on later call() invokcations
140                    data.clear();
141                }
142                return response;
143            }
144            finally {
145                returnMessengerSession(messengerSession);
146            }
147        }
148    
149        public Message call(Destination destination, Message message, long timeoutMillis) throws JMSException {
150            MessengerSession messengerSession = borrowMessengerSession();
151            try {
152                ThreadLocalData data = getThreadLocalData(messengerSession.getSession());
153                Destination replyTo = data.destination;
154                message.setJMSReplyTo(replyTo);
155    
156                MessageProducer producer = messengerSession.getMessageProducer(destination);
157                
158                MessageConsumer consumer = data.consumer;
159    
160                if (messengerSession.isTopic()) {
161                    ((TopicPublisher) producer).publish((Topic) destination, message);
162                }
163                else {
164                    ((QueueSender) producer).send((Queue) destination, message);
165                }
166                Message response = consumer.receive(timeoutMillis);
167                if (response == null) {
168                    // we could have timed out so lets trash the temporary destination
169                    // so that the next call() method will use a new destination to avoid
170                    // the response for this call() coming back on later call() invokcations
171                    data.clear();
172                }
173                return response;
174            }
175            finally {
176                returnMessengerSession(messengerSession);
177            }
178         }
179    
180    
181        /**
182         * @return the local thread data
183         */
184        protected ThreadLocalData getThreadLocalData(Session session) throws JMSException {
185            ThreadLocalData data = (ThreadLocalData) threadLocalData.get();
186            if (data == null) {
187                data = new ThreadLocalData();
188                threadLocalData.set(data);
189            }
190            if (data.destination == null) {
191                data.destination = createTemporaryDestination();
192            }
193            if (data.consumer == null) {
194                data.consumer = this.createConsumer(data.destination);
195            }
196            return data;
197        }
198    
199        // Implementation methods
200        //-------------------------------------------------------------------------
201        protected static class ThreadLocalData {
202            public MessageConsumer consumer;
203            public Destination destination;
204    
205            public void clear() throws JMSException {
206                destination = null;
207                consumer.close();
208            }
209        }
210    
211        protected boolean isTopic(Connection connection) throws JMSException {
212            return getSessionFactory().isTopic();
213        }
214    
215        protected boolean isTopic(ConnectionFactory factory) throws JMSException {
216            return getSessionFactory().isTopic();
217        }
218    
219    
220    
221        /**
222         * Factory method to create a new MessengerSession
223         */
224        protected MessengerSession createMessengerSession() throws JMSException {
225            MessengerSession answer =  new MessengerSession(this, getSessionFactory());
226            if (log.isDebugEnabled()) {
227                log.debug("Created MessengerSession: " + ++count + " value: " + answer);
228            }
229            return answer;
230        }
231    
232        /** Factory method to create a SessionFactory.
233          * Derived classes could override this method to create the SessionFactory
234          * from a well known place
235          */
236        protected SessionFactory createSessionFactory() throws JMSException {
237            throw new JMSException("No SessionFactory configured for this Messenger. Cannot create a new JMS Session");
238        }
239        
240        protected synchronized MessengerSession borrowMessengerSession() throws JMSException {
241            MessengerSession answer = null;
242            if (pool.isEmpty()) {
243                answer = createMessengerSession();
244            }
245            else {
246                answer = (MessengerSession) pool.removeFirst();
247            }
248            if (log.isDebugEnabled()) {
249                log.debug("#### Borrowing messenger session: " + answer);
250            }
251            return answer;
252        }
253        
254        protected synchronized void returnMessengerSession(MessengerSession session) {
255            if (log.isDebugEnabled()) {
256                log.debug("#### Returning messenger session: " + session);
257            }
258            pool.addLast(session);
259        }
260    
261    }