001 /*
002 * Copyright (C) The Apache Software Foundation. All rights reserved.
003 *
004 * This software is published under the terms of the Apache Software License
005 * version 1.1, a copy of which has been included with this distribution in
006 * the LICENSE file.
007 *
008 * $Id: MessengerSession.java 155459 2005-02-26 13:24:44Z dirkv $
009 */
010 package org.apache.commons.messenger;
011
012 import java.util.HashMap;
013 import java.util.Map;
014
015 import javax.jms.Destination;
016 import javax.jms.JMSException;
017 import javax.jms.MessageConsumer;
018 import javax.jms.MessageProducer;
019 import javax.jms.Queue;
020 import javax.jms.QueueRequestor;
021 import javax.jms.QueueSession;
022 import javax.jms.Session;
023 import javax.jms.Topic;
024 import javax.jms.TopicRequestor;
025 import javax.jms.TopicSession;
026
027 import org.apache.commons.logging.Log;
028 import org.apache.commons.logging.LogFactory;
029
030 /** <p><code>MessengerSession</code> represents all the local information for a single thread.</p>
031 *
032 * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
033 * @version $Revision: 155459 $
034 */
035 public class MessengerSession {
036
037 private static final Log log = LogFactory.getLog(MessengerSupport.class);
038
039 /** @todo should have ack mode for sending and consuming */
040
041
042 /** the JMS Session for this thread */
043 private Session session;
044
045 /** the JMS Listener (async subscription) Session for this thread */
046 private Session listenerSession;
047
048 /** the MessageConsumer for this threads reply to destination */
049 private MessageConsumer replyToConsumer;
050
051 /** The factory used to create each thread's JMS Session */
052 private SessionFactory sessionFactory;
053
054 /** An optional cache of requestors */
055 private Map requestorsMap;
056
057 /** The inbox which is used for the call() methods */
058 private Destination replyToDestination;
059
060 /** The current messenger to which I'm connected */
061 private MessengerSupport messenger;
062
063 /** The producer used to send messages using this session */
064 private MessageProducer producer;
065
066 public MessengerSession(MessengerSupport messenger, SessionFactory sessionFactory) {
067 this.messenger = messenger;
068 this.sessionFactory = sessionFactory;
069 }
070
071 public SessionFactory getSessionFactory() {
072 return sessionFactory;
073 }
074
075 /**
076 * Closes any sessions or producers open
077 */
078 public void close() throws JMSException {
079 if (producer != null) {
080 producer.close();
081 producer = null;
082 }
083
084 if (session != null) {
085 session.close();
086 session = null;
087 }
088 if (listenerSession != null) {
089 listenerSession.close();
090 listenerSession = null;
091 }
092 }
093
094
095 /**
096 * @return the JMS Session for this thread for synchronous mode
097 */
098 public Session getSession() throws JMSException {
099 if (session == null) {
100 session = createSession();
101 }
102 return session;
103 }
104
105 /**
106 * @return the JMS Session for this thread for asynchronous mode
107 */
108 public Session getListenerSession() throws JMSException {
109 if (listenerSession == null) {
110 listenerSession = createSession();
111 }
112 return listenerSession;
113 }
114
115 /**
116 * @return the MessageConsumer for the ReplyTo Destination for this thread
117 */
118 public MessageConsumer getReplyToConsumer() throws JMSException {
119 return replyToConsumer;
120 }
121
122 public void setReplyToConsumer(MessageConsumer replyToConsumer) {
123 this.replyToConsumer = replyToConsumer;
124 }
125
126 /**
127 * @return the MessageProducer for the given destination.
128 */
129 public MessageProducer getMessageProducer(Destination destination) throws JMSException {
130 if (producer == null) {
131 producer = messenger.createMessageProducer(this, null);
132 }
133 return producer;
134 }
135
136 /**
137 * @return the reply to destination (a temporary queue)
138 * used to reply to this thread and session
139 */
140 protected Destination getReplyToDestination() throws JMSException {
141 if (replyToDestination == null) {
142 replyToDestination = createTemporaryDestination();
143 }
144 return replyToDestination;
145 }
146
147 /**
148 * Sets the reply to destination to use
149 */
150 protected void setReplyToDestination(Destination replyToDestination) throws JMSException {
151 this.replyToDestination = replyToDestination;
152 }
153
154 /**
155 * @return either a cached TopicRequestor or creates a new one
156 */
157 public TopicRequestor getTopicRequestor(TopicSession session, Topic destination) throws JMSException {
158 if (messenger.isCacheRequestors()) {
159 TopicRequestor requestor = (TopicRequestor) getRequestorsMap().get(destination);
160 if (requestor == null) {
161 requestor = new TopicRequestor(session, destination);
162 getRequestorsMap().put(destination, requestor);
163 }
164 return requestor;
165 }
166 else {
167 return new TopicRequestor(session, destination);
168 }
169 }
170
171 /**
172 * @return either a cached QueueRequestor or creates a new one
173 */
174 public QueueRequestor getQueueRequestor(QueueSession session, Queue destination) throws JMSException {
175 if (messenger.isCacheRequestors()) {
176 QueueRequestor requestor = (QueueRequestor) getRequestorsMap().get(destination);
177 if (requestor == null) {
178 requestor = new QueueRequestor(session, destination);
179 getRequestorsMap().put(destination, requestor);
180 }
181 return requestor;
182 }
183 else {
184 return new QueueRequestor(session, destination);
185 }
186 }
187
188 public boolean isTopic() throws JMSException {
189 return getSessionFactory().isTopic();
190 }
191
192
193
194
195 /**
196 * Factory method to create a new JMS Session
197 */
198 protected Session createSession() throws JMSException {
199 Session answer = getSessionFactory().createSession(messenger.getConnection());
200 log.info("Created JMS session: " + answer);
201 return answer;
202 }
203
204 /**
205 * Factory method to create a new temporary destination
206 */
207 protected Destination createTemporaryDestination() throws JMSException {
208 if (isTopic()) {
209 TopicSession topicSession = (TopicSession) session;
210 return topicSession.createTemporaryTopic();
211 }
212 else {
213 QueueSession queueSession = (QueueSession) session;
214 return queueSession.createTemporaryQueue();
215 }
216 }
217
218 /**
219 * @return the map of requestors, indexed by destination.
220 * The Map will be lazily constructed
221 */
222 protected Map getRequestorsMap() {
223 if (requestorsMap == null) {
224 requestorsMap = new HashMap();
225 }
226 return requestorsMap;
227 }
228 }