1
2
3
4
5
6
7
8
9
10 package org.apache.commons.messenger;
11
12 import java.util.HashMap;
13 import java.util.Map;
14
15 import javax.jms.Destination;
16 import javax.jms.JMSException;
17 import javax.jms.MessageConsumer;
18 import javax.jms.MessageProducer;
19 import javax.jms.Queue;
20 import javax.jms.QueueRequestor;
21 import javax.jms.QueueSession;
22 import javax.jms.Session;
23 import javax.jms.Topic;
24 import javax.jms.TopicRequestor;
25 import javax.jms.TopicSession;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29
30
31
32
33
34
35 public class MessengerSession {
36
37 private static final Log log = LogFactory.getLog(MessengerSupport.class);
38
39
40
41
42
43 private Session session;
44
45
46 private Session listenerSession;
47
48
49 private MessageConsumer replyToConsumer;
50
51
52 private SessionFactory sessionFactory;
53
54
55 private Map requestorsMap;
56
57
58 private Destination replyToDestination;
59
60
61 private MessengerSupport messenger;
62
63
64 private MessageProducer producer;
65
66 public MessengerSession(MessengerSupport messenger, SessionFactory sessionFactory) {
67 this.messenger = messenger;
68 this.sessionFactory = sessionFactory;
69 }
70
71 public SessionFactory getSessionFactory() {
72 return sessionFactory;
73 }
74
75
76
77
78 public void close() throws JMSException {
79 if (producer != null) {
80 producer.close();
81 producer = null;
82 }
83
84 if (session != null) {
85 session.close();
86 session = null;
87 }
88 if (listenerSession != null) {
89 listenerSession.close();
90 listenerSession = null;
91 }
92 }
93
94
95
96
97
98 public Session getSession() throws JMSException {
99 if (session == null) {
100 session = createSession();
101 }
102 return session;
103 }
104
105
106
107
108 public Session getListenerSession() throws JMSException {
109 if (listenerSession == null) {
110 listenerSession = createSession();
111 }
112 return listenerSession;
113 }
114
115
116
117
118 public MessageConsumer getReplyToConsumer() throws JMSException {
119 return replyToConsumer;
120 }
121
122 public void setReplyToConsumer(MessageConsumer replyToConsumer) {
123 this.replyToConsumer = replyToConsumer;
124 }
125
126
127
128
129 public MessageProducer getMessageProducer(Destination destination) throws JMSException {
130 if (producer == null) {
131 producer = messenger.createMessageProducer(this, null);
132 }
133 return producer;
134 }
135
136
137
138
139
140 protected Destination getReplyToDestination() throws JMSException {
141 if (replyToDestination == null) {
142 replyToDestination = createTemporaryDestination();
143 }
144 return replyToDestination;
145 }
146
147
148
149
150 protected void setReplyToDestination(Destination replyToDestination) throws JMSException {
151 this.replyToDestination = replyToDestination;
152 }
153
154
155
156
157 public TopicRequestor getTopicRequestor(TopicSession session, Topic destination) throws JMSException {
158 if (messenger.isCacheRequestors()) {
159 TopicRequestor requestor = (TopicRequestor) getRequestorsMap().get(destination);
160 if (requestor == null) {
161 requestor = new TopicRequestor(session, destination);
162 getRequestorsMap().put(destination, requestor);
163 }
164 return requestor;
165 }
166 else {
167 return new TopicRequestor(session, destination);
168 }
169 }
170
171
172
173
174 public QueueRequestor getQueueRequestor(QueueSession session, Queue destination) throws JMSException {
175 if (messenger.isCacheRequestors()) {
176 QueueRequestor requestor = (QueueRequestor) getRequestorsMap().get(destination);
177 if (requestor == null) {
178 requestor = new QueueRequestor(session, destination);
179 getRequestorsMap().put(destination, requestor);
180 }
181 return requestor;
182 }
183 else {
184 return new QueueRequestor(session, destination);
185 }
186 }
187
188 public boolean isTopic() throws JMSException {
189 return getSessionFactory().isTopic();
190 }
191
192
193
194
195
196
197
198 protected Session createSession() throws JMSException {
199 Session answer = getSessionFactory().createSession(messenger.getConnection());
200 log.info("Created JMS session: " + answer);
201 return answer;
202 }
203
204
205
206
207 protected Destination createTemporaryDestination() throws JMSException {
208 if (isTopic()) {
209 TopicSession topicSession = (TopicSession) session;
210 return topicSession.createTemporaryTopic();
211 }
212 else {
213 QueueSession queueSession = (QueueSession) session;
214 return queueSession.createTemporaryQueue();
215 }
216 }
217
218
219
220
221
222 protected Map getRequestorsMap() {
223 if (requestorsMap == null) {
224 requestorsMap = new HashMap();
225 }
226 return requestorsMap;
227 }
228 }