001    /*
002     * Copyright (C) The Apache Software Foundation. All rights reserved.
003     *
004     * This software is published under the terms of the Apache Software License
005     * version 1.1, a copy of which has been included with this distribution in
006     * the LICENSE file.
007     * 
008     * $Id: MessengerSession.java 155459 2005-02-26 13:24:44Z dirkv $
009     */
010    package org.apache.commons.messenger;
011    
012    import java.util.HashMap;
013    import java.util.Map;
014    
015    import javax.jms.Destination;
016    import javax.jms.JMSException;
017    import javax.jms.MessageConsumer;
018    import javax.jms.MessageProducer;
019    import javax.jms.Queue;
020    import javax.jms.QueueRequestor;
021    import javax.jms.QueueSession;
022    import javax.jms.Session;
023    import javax.jms.Topic;
024    import javax.jms.TopicRequestor;
025    import javax.jms.TopicSession;
026    
027    import org.apache.commons.logging.Log;
028    import org.apache.commons.logging.LogFactory;
029    
030    /** <p><code>MessengerSession</code> represents all the local information for a single thread.</p>
031      *
032      * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
033      * @version $Revision: 155459 $
034      */
035    public class MessengerSession {
036    
037        private static final Log log = LogFactory.getLog(MessengerSupport.class);
038    
039        /** @todo should have ack mode for sending and consuming */
040    
041    
042        /** the JMS Session for this thread */
043        private Session session;
044    
045        /** the JMS Listener (async subscription) Session for this thread */
046        private Session listenerSession;
047    
048        /** the MessageConsumer for this threads reply to destination */
049        private MessageConsumer replyToConsumer;
050    
051        /** The factory used to create each thread's JMS Session */
052        private SessionFactory sessionFactory;
053    
054        /** An optional cache of requestors */
055        private Map requestorsMap;
056    
057        /** The inbox which is used for the call() methods */
058        private Destination replyToDestination;
059    
060        /** The current messenger to which I'm connected */
061        private MessengerSupport messenger;
062    
063        /** The producer used to send messages using this session */
064        private MessageProducer producer;
065    
066        public MessengerSession(MessengerSupport messenger, SessionFactory sessionFactory) {
067            this.messenger = messenger;
068            this.sessionFactory = sessionFactory;
069        }
070    
071        public SessionFactory getSessionFactory() {
072            return sessionFactory;
073        }
074    
075        /**
076         * Closes any sessions or producers open
077         */
078        public void close() throws JMSException {
079            if (producer != null) {
080                producer.close();
081                producer = null;
082            }
083    
084            if (session != null) {
085                session.close();
086                session = null;
087            }
088            if (listenerSession != null) {
089                listenerSession.close();
090                listenerSession = null;
091            }
092        }
093        
094        
095        /** 
096         * @return the JMS Session for this thread for synchronous mode 
097         */
098        public Session getSession() throws JMSException {
099            if (session == null) {
100                session = createSession();
101            }
102            return session;
103        }
104    
105        /** 
106         * @return the JMS Session for this thread for asynchronous mode 
107         */
108        public Session getListenerSession() throws JMSException {
109            if (listenerSession == null) {
110                listenerSession = createSession();
111            }
112            return listenerSession;
113        }
114     
115        /** 
116         * @return the MessageConsumer for the ReplyTo Destination for this thread
117         */
118        public MessageConsumer getReplyToConsumer() throws JMSException {
119            return replyToConsumer;
120        }
121    
122        public void setReplyToConsumer(MessageConsumer replyToConsumer) {
123            this.replyToConsumer = replyToConsumer;
124        }
125    
126        /**
127         * @return the MessageProducer for the given destination.
128         */
129        public MessageProducer getMessageProducer(Destination destination) throws JMSException {
130            if (producer == null) {
131                producer = messenger.createMessageProducer(this, null);
132            }
133            return producer;
134        }
135    
136        /** 
137         * @return the reply to destination (a temporary queue) 
138         * used to reply to this thread and session
139         */
140        protected Destination getReplyToDestination() throws JMSException {
141            if (replyToDestination == null) {
142                replyToDestination = createTemporaryDestination();
143            }
144            return replyToDestination;
145        }
146    
147        /** 
148         * Sets the reply to destination to use
149         */
150        protected void setReplyToDestination(Destination replyToDestination) throws JMSException {
151            this.replyToDestination = replyToDestination;
152        }
153    
154        /**
155         * @return either a cached TopicRequestor or creates a new one
156         */
157        public TopicRequestor getTopicRequestor(TopicSession session, Topic destination) throws JMSException {
158            if (messenger.isCacheRequestors()) {
159                TopicRequestor requestor = (TopicRequestor) getRequestorsMap().get(destination);
160                if (requestor == null) {
161                    requestor = new TopicRequestor(session, destination);
162                    getRequestorsMap().put(destination, requestor);
163                }
164                return requestor;
165            }
166            else {
167                return new TopicRequestor(session, destination);
168            }
169        }
170    
171        /**
172         * @return either a cached QueueRequestor or creates a new one
173         */
174        public QueueRequestor getQueueRequestor(QueueSession session, Queue destination) throws JMSException {
175            if (messenger.isCacheRequestors()) {
176                QueueRequestor requestor = (QueueRequestor) getRequestorsMap().get(destination);
177                if (requestor == null) {
178                    requestor = new QueueRequestor(session, destination);
179                    getRequestorsMap().put(destination, requestor);
180                }
181                return requestor;
182            }
183            else {
184                return new QueueRequestor(session, destination);
185            }
186        }
187    
188        public boolean isTopic() throws JMSException {
189            return getSessionFactory().isTopic();
190        }
191    
192    
193    
194    
195        /** 
196         * Factory method to create a new JMS Session 
197         */
198        protected Session createSession() throws JMSException {
199            Session answer = getSessionFactory().createSession(messenger.getConnection());
200            log.info("Created JMS session: " + answer);
201            return answer;
202        }
203    
204        /**
205         * Factory method to create a new temporary destination
206         */
207        protected Destination createTemporaryDestination() throws JMSException {
208            if (isTopic()) {
209                TopicSession topicSession = (TopicSession) session;
210                return topicSession.createTemporaryTopic();
211            }
212            else {
213                QueueSession queueSession = (QueueSession) session;
214                return queueSession.createTemporaryQueue();
215            }
216        }
217    
218        /** 
219         * @return the map of requestors, indexed by destination.
220         *  The Map will be lazily constructed
221         */
222        protected Map getRequestorsMap() {
223            if (requestorsMap == null) {
224                requestorsMap = new HashMap();
225            }
226            return requestorsMap;
227        }
228    }