001 /*
002 * Copyright 1999-2004 The Apache Software Foundation.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016 package org.apache.commons.messenger;
017
018 import java.util.LinkedList;
019
020 import javax.jms.Connection;
021 import javax.jms.ConnectionFactory;
022 import javax.jms.Destination;
023 import javax.jms.JMSException;
024 import javax.jms.Message;
025 import javax.jms.MessageConsumer;
026 import javax.jms.MessageListener;
027 import javax.jms.MessageProducer;
028 import javax.jms.Queue;
029 import javax.jms.QueueSender;
030 import javax.jms.ServerSessionPool;
031 import javax.jms.Session;
032 import javax.jms.Topic;
033 import javax.jms.TopicPublisher;
034
035 import org.apache.commons.logging.Log;
036 import org.apache.commons.logging.LogFactory;
037
038 /** <p><code>SimpleMessenger</code> is an implementation of
039 * Messenger which uses a single JMS Session for sending
040 * to keep the JMS Session that should be used for a given calling thread.</p>
041 *
042 * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
043 * @version $Revision: 155459 $
044 */
045 public class SimpleMessenger extends MessengerSupport {
046
047 /** Logger */
048 private static final Log log = LogFactory.getLog(SimpleMessenger.class);
049
050 /** the SessionFactory used to create new JMS sessions */
051 private SessionFactory sessionFactory;
052
053 /** pool of MessengerSession instances */
054 private LinkedList pool = new LinkedList();
055
056 /** thread local data for RPCs */
057 private ThreadLocal threadLocalData = new ThreadLocal();
058
059 private static int count;
060
061 public SimpleMessenger() {
062 }
063
064 /** Returns the SessionFactory used to create new JMS sessions */
065 public SessionFactory getSessionFactory() throws JMSException {
066 if (sessionFactory == null) {
067 sessionFactory = createSessionFactory();
068 }
069 return sessionFactory;
070 }
071
072 /** Sets the SessionFactory used to create new JMS sessions */
073 public void setSessionFactory(SessionFactory sessionFactory) {
074 this.sessionFactory = sessionFactory;
075 }
076
077 public Connection getConnection() throws JMSException {
078 return getSessionFactory().getConnection();
079 }
080
081 public ServerSessionPool createServerSessionPool(MessageListener messageListener, int maxThreads)
082 throws JMSException {
083 return getSessionFactory().createServerSessionPool(messageListener, maxThreads);
084 }
085
086 public synchronized void close() throws JMSException {
087 while (! pool.isEmpty()) {
088 MessengerSession session = (MessengerSession) pool.removeFirst();
089 session.close();
090 }
091
092 getSessionFactory().close();
093 }
094
095 public Session getSession() throws JMSException {
096 throw new UnsupportedOperationException("Not supported by this Messenger. Please use borrowSession() and returnSession() instead");
097 }
098
099 public Session getAsyncSession() throws JMSException {
100 throw new UnsupportedOperationException("Not supported by this Messenger. Please use borrowSession() and returnSession() instead");
101 }
102
103 public Message call(Destination destination, Message message) throws JMSException {
104 ThreadLocalData data = null;
105 MessengerSession messengerSession = borrowMessengerSession();
106 try {
107 data = getThreadLocalData(messengerSession.getSession());
108 Destination replyTo = data.destination;
109 message.setJMSReplyTo(replyTo);
110 }
111 finally {
112 returnMessengerSession(messengerSession);
113 }
114
115 log.info("Sending message to destination: " + destination);
116
117
118 send(destination, message);
119
120 messengerSession = borrowMessengerSession();
121 try {
122
123 // MessageProducer producer = messengerSession.getMessageProducer(destination);
124 // if (messengerSession.isTopic()) {
125 // ((TopicPublisher) producer).publish((Topic) destination, message);
126 // }
127 // else {
128 // ((QueueSender) producer).send((Queue) destination, message);
129 // }
130 //
131 log.info("Message sent - now waiting for a response...");
132
133 MessageConsumer consumer = data.consumer;
134 Message response = consumer.receive();
135 // Message response = null;
136 if (response == null) {
137 // we could have timed out so lets trash the temporary destination
138 // so that the next call() method will use a new destination to avoid
139 // the response for this call() coming back on later call() invokcations
140 data.clear();
141 }
142 return response;
143 }
144 finally {
145 returnMessengerSession(messengerSession);
146 }
147 }
148
149 public Message call(Destination destination, Message message, long timeoutMillis) throws JMSException {
150 MessengerSession messengerSession = borrowMessengerSession();
151 try {
152 ThreadLocalData data = getThreadLocalData(messengerSession.getSession());
153 Destination replyTo = data.destination;
154 message.setJMSReplyTo(replyTo);
155
156 MessageProducer producer = messengerSession.getMessageProducer(destination);
157
158 MessageConsumer consumer = data.consumer;
159
160 if (messengerSession.isTopic()) {
161 ((TopicPublisher) producer).publish((Topic) destination, message);
162 }
163 else {
164 ((QueueSender) producer).send((Queue) destination, message);
165 }
166 Message response = consumer.receive(timeoutMillis);
167 if (response == null) {
168 // we could have timed out so lets trash the temporary destination
169 // so that the next call() method will use a new destination to avoid
170 // the response for this call() coming back on later call() invokcations
171 data.clear();
172 }
173 return response;
174 }
175 finally {
176 returnMessengerSession(messengerSession);
177 }
178 }
179
180
181 /**
182 * @return the local thread data
183 */
184 protected ThreadLocalData getThreadLocalData(Session session) throws JMSException {
185 ThreadLocalData data = (ThreadLocalData) threadLocalData.get();
186 if (data == null) {
187 data = new ThreadLocalData();
188 threadLocalData.set(data);
189 }
190 if (data.destination == null) {
191 data.destination = createTemporaryDestination();
192 }
193 if (data.consumer == null) {
194 data.consumer = this.createConsumer(data.destination);
195 }
196 return data;
197 }
198
199 // Implementation methods
200 //-------------------------------------------------------------------------
201 protected static class ThreadLocalData {
202 public MessageConsumer consumer;
203 public Destination destination;
204
205 public void clear() throws JMSException {
206 destination = null;
207 consumer.close();
208 }
209 }
210
211 protected boolean isTopic(Connection connection) throws JMSException {
212 return getSessionFactory().isTopic();
213 }
214
215 protected boolean isTopic(ConnectionFactory factory) throws JMSException {
216 return getSessionFactory().isTopic();
217 }
218
219
220
221 /**
222 * Factory method to create a new MessengerSession
223 */
224 protected MessengerSession createMessengerSession() throws JMSException {
225 MessengerSession answer = new MessengerSession(this, getSessionFactory());
226 if (log.isDebugEnabled()) {
227 log.debug("Created MessengerSession: " + ++count + " value: " + answer);
228 }
229 return answer;
230 }
231
232 /** Factory method to create a SessionFactory.
233 * Derived classes could override this method to create the SessionFactory
234 * from a well known place
235 */
236 protected SessionFactory createSessionFactory() throws JMSException {
237 throw new JMSException("No SessionFactory configured for this Messenger. Cannot create a new JMS Session");
238 }
239
240 protected synchronized MessengerSession borrowMessengerSession() throws JMSException {
241 MessengerSession answer = null;
242 if (pool.isEmpty()) {
243 answer = createMessengerSession();
244 }
245 else {
246 answer = (MessengerSession) pool.removeFirst();
247 }
248 if (log.isDebugEnabled()) {
249 log.debug("#### Borrowing messenger session: " + answer);
250 }
251 return answer;
252 }
253
254 protected synchronized void returnMessengerSession(MessengerSession session) {
255 if (log.isDebugEnabled()) {
256 log.debug("#### Returning messenger session: " + session);
257 }
258 pool.addLast(session);
259 }
260
261 }