View Javadoc

1   /*
2    * Copyright (C) The Apache Software Foundation. All rights reserved.
3    *
4    * This software is published under the terms of the Apache Software License
5    * version 1.1, a copy of which has been included with this distribution in
6    * the LICENSE file.
7    * 
8    * $Id: MessengerSession.java 155459 2005-02-26 13:24:44Z dirkv $
9    */
10  package org.apache.commons.messenger;
11  
12  import java.util.HashMap;
13  import java.util.Map;
14  
15  import javax.jms.Destination;
16  import javax.jms.JMSException;
17  import javax.jms.MessageConsumer;
18  import javax.jms.MessageProducer;
19  import javax.jms.Queue;
20  import javax.jms.QueueRequestor;
21  import javax.jms.QueueSession;
22  import javax.jms.Session;
23  import javax.jms.Topic;
24  import javax.jms.TopicRequestor;
25  import javax.jms.TopicSession;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  
30  /** <p><code>MessengerSession</code> represents all the local information for a single thread.</p>
31    *
32    * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
33    * @version $Revision: 155459 $
34    */
35  public class MessengerSession {
36  
37      private static final Log log = LogFactory.getLog(MessengerSupport.class);
38  
39      /** @todo should have ack mode for sending and consuming */
40  
41  
42      /** the JMS Session for this thread */
43      private Session session;
44  
45      /** the JMS Listener (async subscription) Session for this thread */
46      private Session listenerSession;
47  
48      /** the MessageConsumer for this threads reply to destination */
49      private MessageConsumer replyToConsumer;
50  
51      /** The factory used to create each thread's JMS Session */
52      private SessionFactory sessionFactory;
53  
54      /** An optional cache of requestors */
55      private Map requestorsMap;
56  
57      /** The inbox which is used for the call() methods */
58      private Destination replyToDestination;
59  
60      /** The current messenger to which I'm connected */
61      private MessengerSupport messenger;
62  
63      /** The producer used to send messages using this session */
64      private MessageProducer producer;
65  
66      public MessengerSession(MessengerSupport messenger, SessionFactory sessionFactory) {
67          this.messenger = messenger;
68          this.sessionFactory = sessionFactory;
69      }
70  
71      public SessionFactory getSessionFactory() {
72          return sessionFactory;
73      }
74  
75      /**
76       * Closes any sessions or producers open
77       */
78      public void close() throws JMSException {
79          if (producer != null) {
80              producer.close();
81              producer = null;
82          }
83  
84          if (session != null) {
85              session.close();
86              session = null;
87          }
88          if (listenerSession != null) {
89              listenerSession.close();
90              listenerSession = null;
91          }
92      }
93      
94      
95      /** 
96       * @return the JMS Session for this thread for synchronous mode 
97       */
98      public Session getSession() throws JMSException {
99          if (session == null) {
100             session = createSession();
101         }
102         return session;
103     }
104 
105     /** 
106      * @return the JMS Session for this thread for asynchronous mode 
107      */
108     public Session getListenerSession() throws JMSException {
109         if (listenerSession == null) {
110             listenerSession = createSession();
111         }
112         return listenerSession;
113     }
114  
115     /** 
116      * @return the MessageConsumer for the ReplyTo Destination for this thread
117      */
118     public MessageConsumer getReplyToConsumer() throws JMSException {
119         return replyToConsumer;
120     }
121 
122     public void setReplyToConsumer(MessageConsumer replyToConsumer) {
123         this.replyToConsumer = replyToConsumer;
124     }
125 
126     /**
127      * @return the MessageProducer for the given destination.
128      */
129     public MessageProducer getMessageProducer(Destination destination) throws JMSException {
130         if (producer == null) {
131             producer = messenger.createMessageProducer(this, null);
132         }
133         return producer;
134     }
135 
136     /** 
137      * @return the reply to destination (a temporary queue) 
138      * used to reply to this thread and session
139      */
140     protected Destination getReplyToDestination() throws JMSException {
141         if (replyToDestination == null) {
142             replyToDestination = createTemporaryDestination();
143         }
144         return replyToDestination;
145     }
146 
147     /** 
148      * Sets the reply to destination to use
149      */
150     protected void setReplyToDestination(Destination replyToDestination) throws JMSException {
151         this.replyToDestination = replyToDestination;
152     }
153 
154     /**
155      * @return either a cached TopicRequestor or creates a new one
156      */
157     public TopicRequestor getTopicRequestor(TopicSession session, Topic destination) throws JMSException {
158         if (messenger.isCacheRequestors()) {
159             TopicRequestor requestor = (TopicRequestor) getRequestorsMap().get(destination);
160             if (requestor == null) {
161                 requestor = new TopicRequestor(session, destination);
162                 getRequestorsMap().put(destination, requestor);
163             }
164             return requestor;
165         }
166         else {
167             return new TopicRequestor(session, destination);
168         }
169     }
170 
171     /**
172      * @return either a cached QueueRequestor or creates a new one
173      */
174     public QueueRequestor getQueueRequestor(QueueSession session, Queue destination) throws JMSException {
175         if (messenger.isCacheRequestors()) {
176             QueueRequestor requestor = (QueueRequestor) getRequestorsMap().get(destination);
177             if (requestor == null) {
178                 requestor = new QueueRequestor(session, destination);
179                 getRequestorsMap().put(destination, requestor);
180             }
181             return requestor;
182         }
183         else {
184             return new QueueRequestor(session, destination);
185         }
186     }
187 
188     public boolean isTopic() throws JMSException {
189         return getSessionFactory().isTopic();
190     }
191 
192 
193 
194 
195     /** 
196      * Factory method to create a new JMS Session 
197      */
198     protected Session createSession() throws JMSException {
199         Session answer = getSessionFactory().createSession(messenger.getConnection());
200         log.info("Created JMS session: " + answer);
201         return answer;
202     }
203 
204     /**
205      * Factory method to create a new temporary destination
206      */
207     protected Destination createTemporaryDestination() throws JMSException {
208         if (isTopic()) {
209             TopicSession topicSession = (TopicSession) session;
210             return topicSession.createTemporaryTopic();
211         }
212         else {
213             QueueSession queueSession = (QueueSession) session;
214             return queueSession.createTemporaryQueue();
215         }
216     }
217 
218     /** 
219      * @return the map of requestors, indexed by destination.
220      *  The Map will be lazily constructed
221      */
222     protected Map getRequestorsMap() {
223         if (requestorsMap == null) {
224             requestorsMap = new HashMap();
225         }
226         return requestorsMap;
227     }
228 }