001 /* 002 * Copyright 1999-2004 The Apache Software Foundation. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 package org.apache.commons.messenger; 017 018 import javax.jms.Connection; 019 import javax.jms.ConnectionFactory; 020 import javax.jms.Destination; 021 import javax.jms.JMSException; 022 import javax.jms.Message; 023 import javax.jms.MessageConsumer; 024 import javax.jms.MessageListener; 025 import javax.jms.MessageProducer; 026 import javax.jms.Queue; 027 import javax.jms.QueueSender; 028 import javax.jms.QueueSession; 029 import javax.jms.ServerSessionPool; 030 import javax.jms.Session; 031 import javax.jms.Topic; 032 import javax.jms.TopicPublisher; 033 import javax.jms.TopicSession; 034 import javax.naming.Context; 035 036 import org.apache.commons.logging.Log; 037 import org.apache.commons.logging.LogFactory; 038 039 /** <p><code>DefaultMessenger</code> is the default implementation of 040 * Messenger which uses a {@link ThreadLocal} variable 041 * to keep the JMS Session that should be used for a given calling thread.</p> 042 * 043 * @author <a href="mailto:jstrachan@apache.org">James Strachan</a> 044 * @version $Revision: 155459 $ 045 */ 046 public class DefaultMessenger extends MessengerSupport { 047 048 private static final boolean SHARE_CONNECTION = true; 049 050 /** Logger */ 051 private static final Log log = LogFactory.getLog(DefaultMessenger.class); 052 053 /** the MessengerSession for each thread */ 054 private ThreadLocal messengerSessionPool = new ThreadLocal(); 055 056 /** the SessionFactory used to create new JMS sessions */ 057 private SessionFactory sessionFactory; 058 059 public DefaultMessenger() { 060 } 061 062 /** Returns the SessionFactory used to create new JMS sessions */ 063 public SessionFactory getSessionFactory() throws JMSException { 064 if (sessionFactory == null) { 065 sessionFactory = createSessionFactory(); 066 } 067 return sessionFactory; 068 } 069 070 /** Sets the SessionFactory used to create new JMS sessions */ 071 public void setSessionFactory(SessionFactory sessionFactory) { 072 this.sessionFactory = sessionFactory; 073 } 074 075 public Connection getConnection() throws JMSException { 076 return getSessionFactory().getConnection(); 077 } 078 079 public ServerSessionPool createServerSessionPool(MessageListener messageListener, int maxThreads) 080 throws JMSException { 081 return getSessionFactory().createServerSessionPool(messageListener, maxThreads); 082 } 083 084 public synchronized void close() throws JMSException { 085 /** note that only the current session is terminated */ 086 MessengerSession session = getMessengerSession(); 087 088 // clear all the pools... 089 messengerSessionPool = new ThreadLocal(); 090 091 session.close(); 092 getSessionFactory().close(); 093 } 094 095 public Session getSession() throws JMSException { 096 return getMessengerSession().getSession(); 097 } 098 099 public Session getAsyncSession() throws JMSException { 100 return getMessengerSession().getListenerSession(); 101 } 102 103 public Message call(Destination destination, Message message) throws JMSException { 104 MessengerSession session = borrowMessengerSession(); 105 try { 106 Destination replyTo = getReplyToDestination(); 107 message.setJMSReplyTo(replyTo); 108 109 // NOTE - we could consider adding a correlation ID per request so that we can ignore 110 // any cruft or old messages that are sent onto our inbound queue. 111 // 112 // Though that does mean that we must then rely on the inbound message having 113 // the right correlationID. Though at least this strategy would mean 114 // that we could have a single consumer on a temporary queue for all threads 115 // and use correlation IDs to dispatch to the corrent thread 116 // 117 // Maybe this should be a configurable strategy 118 119 MessageProducer producer = session.getMessageProducer(destination); 120 MessageConsumer consumer = getReplyToConsumer(); 121 122 if (session.isTopic()) { 123 ((TopicPublisher) producer).publish((Topic) destination, message); 124 } 125 else { 126 ((QueueSender) producer).send((Queue) destination, message); 127 } 128 Message response = consumer.receive(); 129 if (response == null) { 130 // we could have timed out so lets trash the temporary destination 131 // so that the next call() method will use a new destination to avoid 132 // the response for this call() coming back on later call() invokcations 133 clearReplyToDestination(); 134 } 135 return response; 136 } 137 finally { 138 returnMessengerSession(session); 139 } 140 } 141 142 public Message call(Destination destination, Message message, long timeoutMillis) throws JMSException { 143 MessengerSession session = borrowMessengerSession(); 144 try { 145 Destination replyTo = getReplyToDestination(); 146 message.setJMSReplyTo(replyTo); 147 148 // NOTE - we could consider adding a correlation ID per request so that we can ignore 149 // any cruft or old messages that are sent onto our inbound queue. 150 // 151 // Though that does mean that we must then rely on the inbound message having 152 // the right correlationID. Though at least this strategy would mean 153 // that we could have a single consumer on a temporary queue for all threads 154 // and use correlation IDs to dispatch to the corrent thread 155 // 156 // Maybe this should be a configurable strategy 157 158 MessageProducer producer = session.getMessageProducer(destination); 159 160 MessageConsumer consumer = getReplyToConsumer(); 161 if (session.isTopic()) { 162 ((TopicPublisher) producer).publish((Topic) destination, message); 163 } 164 else { 165 ((QueueSender) producer).send((Queue) destination, message); 166 } 167 Message response = consumer.receive(timeoutMillis); 168 if (response == null) { 169 // we could have timed out so lets trash the temporary destination 170 // so that the next call() method will use a new destination to avoid 171 // the response for this call() coming back on later call() invokcations 172 clearReplyToDestination(); 173 } 174 return response; 175 } 176 finally { 177 returnMessengerSession(session); 178 } 179 } 180 181 // Implementation methods 182 //------------------------------------------------------------------------- 183 protected boolean isTopic(Connection connection) throws JMSException { 184 return getSessionFactory().isTopic(); 185 } 186 187 protected boolean isTopic(ConnectionFactory factory) throws JMSException { 188 return getSessionFactory().isTopic(); 189 } 190 191 /** 192 * @return the MessageConsumer for this threads temporary destination 193 * which is cached for the duration of this process. 194 */ 195 protected MessageConsumer getReplyToConsumer() throws JMSException { 196 MessengerSession messengerSession = getMessengerSession(); 197 MessageConsumer consumer = messengerSession.getReplyToConsumer(); 198 synchronized ( messengerSession ) { 199 if (consumer == null) { 200 consumer = createMessageConsumer(messengerSession, messengerSession.getSession(), messengerSession.getReplyToDestination()); 201 messengerSession.setReplyToConsumer(consumer); 202 } 203 } 204 return consumer; 205 } 206 207 /** 208 * Clears the temporary destination used to receive reply-to messages 209 * which will lazily force a new destination and consumer to be created next 210 * time a call() method is invoked. 211 */ 212 protected void clearReplyToDestination() throws JMSException { 213 MessengerSession messengerSession = getMessengerSession(); 214 215 messengerSession.setReplyToDestination(null); 216 MessageConsumer consumer = messengerSession.getReplyToConsumer(); 217 if (consumer != null) { 218 messengerSession.setReplyToConsumer(null); 219 220 // ensure that everything is nullified first before we close 221 // just in case an exception occurs 222 consumer.close(); 223 } 224 } 225 226 protected Destination getReplyToDestination() throws JMSException { 227 return getMessengerSession().getReplyToDestination(); 228 } 229 230 protected MessengerSession getMessengerSession() throws JMSException { 231 return borrowMessengerSession(); 232 } 233 234 protected MessengerSession borrowMessengerSession() throws JMSException { 235 MessengerSession answer = (MessengerSession) messengerSessionPool.get(); 236 if (answer == null) { 237 answer = createMessengerSession(); 238 messengerSessionPool.set(answer); 239 } 240 return answer; 241 } 242 243 protected void returnMessengerSession(MessengerSession session) { 244 } 245 246 /** 247 * Factory method to create a new MessengerSession 248 */ 249 protected MessengerSession createMessengerSession() throws JMSException { 250 return new MessengerSession(this, getSessionFactory()); 251 } 252 253 /** Factory method to create a SessionFactory. 254 * Derived classes could override this method to create the SessionFactory 255 * from a well known place 256 */ 257 protected SessionFactory createSessionFactory() throws JMSException { 258 throw new JMSException("No SessionFactory configured for this Messenger. Cannot create a new JMS Session"); 259 } 260 261 public Queue getQueue(QueueSession session, String subject) throws JMSException { 262 // XXXX: might want to cache 263 Context ctx = null; 264 JNDISessionFactory factory = null; 265 266 Queue queue = null; 267 if (isJndiDestinations()) { 268 try { 269 factory = (JNDISessionFactory) getSessionFactory(); 270 ctx = factory.getContext(); 271 queue = (Queue) ctx.lookup(subject); 272 } 273 catch (Exception e) { 274 log.error("Unable to lookup subject: " + subject + ". Exception: " + e, e); 275 } 276 } 277 else { 278 // XXXX: might want to cache 279 queue = session.createQueue(subject); 280 } 281 return queue; 282 } 283 284 public Topic getTopic(TopicSession session, String subject) throws JMSException { 285 // XXXX: might want to cache 286 Context ctx = null; 287 JNDISessionFactory factory = null; 288 289 Topic topic = null; 290 if (isJndiDestinations()) { 291 try { 292 factory = (JNDISessionFactory) getSessionFactory(); 293 ctx = factory.getContext(); 294 topic = (Topic) ctx.lookup(subject); 295 } 296 catch (Exception e) { 297 log.error("Unable to lookup subject: " + subject + ". Exception: " + e, e); 298 } 299 } 300 else { 301 topic = session.createTopic(subject); 302 } 303 return topic; 304 } 305 306 }