001 /* 002 * Copyright 1999-2004 The Apache Software Foundation. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 package org.apache.commons.messenger; 017 018 import java.util.LinkedList; 019 020 import javax.jms.Connection; 021 import javax.jms.ConnectionFactory; 022 import javax.jms.Destination; 023 import javax.jms.JMSException; 024 import javax.jms.Message; 025 import javax.jms.MessageConsumer; 026 import javax.jms.MessageListener; 027 import javax.jms.MessageProducer; 028 import javax.jms.Queue; 029 import javax.jms.QueueSender; 030 import javax.jms.ServerSessionPool; 031 import javax.jms.Session; 032 import javax.jms.Topic; 033 import javax.jms.TopicPublisher; 034 035 import org.apache.commons.logging.Log; 036 import org.apache.commons.logging.LogFactory; 037 038 /** <p><code>SimpleMessenger</code> is an implementation of 039 * Messenger which uses a single JMS Session for sending 040 * to keep the JMS Session that should be used for a given calling thread.</p> 041 * 042 * @author <a href="mailto:jstrachan@apache.org">James Strachan</a> 043 * @version $Revision: 155459 $ 044 */ 045 public class SimpleMessenger extends MessengerSupport { 046 047 /** Logger */ 048 private static final Log log = LogFactory.getLog(SimpleMessenger.class); 049 050 /** the SessionFactory used to create new JMS sessions */ 051 private SessionFactory sessionFactory; 052 053 /** pool of MessengerSession instances */ 054 private LinkedList pool = new LinkedList(); 055 056 /** thread local data for RPCs */ 057 private ThreadLocal threadLocalData = new ThreadLocal(); 058 059 private static int count; 060 061 public SimpleMessenger() { 062 } 063 064 /** Returns the SessionFactory used to create new JMS sessions */ 065 public SessionFactory getSessionFactory() throws JMSException { 066 if (sessionFactory == null) { 067 sessionFactory = createSessionFactory(); 068 } 069 return sessionFactory; 070 } 071 072 /** Sets the SessionFactory used to create new JMS sessions */ 073 public void setSessionFactory(SessionFactory sessionFactory) { 074 this.sessionFactory = sessionFactory; 075 } 076 077 public Connection getConnection() throws JMSException { 078 return getSessionFactory().getConnection(); 079 } 080 081 public ServerSessionPool createServerSessionPool(MessageListener messageListener, int maxThreads) 082 throws JMSException { 083 return getSessionFactory().createServerSessionPool(messageListener, maxThreads); 084 } 085 086 public synchronized void close() throws JMSException { 087 while (! pool.isEmpty()) { 088 MessengerSession session = (MessengerSession) pool.removeFirst(); 089 session.close(); 090 } 091 092 getSessionFactory().close(); 093 } 094 095 public Session getSession() throws JMSException { 096 throw new UnsupportedOperationException("Not supported by this Messenger. Please use borrowSession() and returnSession() instead"); 097 } 098 099 public Session getAsyncSession() throws JMSException { 100 throw new UnsupportedOperationException("Not supported by this Messenger. Please use borrowSession() and returnSession() instead"); 101 } 102 103 public Message call(Destination destination, Message message) throws JMSException { 104 ThreadLocalData data = null; 105 MessengerSession messengerSession = borrowMessengerSession(); 106 try { 107 data = getThreadLocalData(messengerSession.getSession()); 108 Destination replyTo = data.destination; 109 message.setJMSReplyTo(replyTo); 110 } 111 finally { 112 returnMessengerSession(messengerSession); 113 } 114 115 log.info("Sending message to destination: " + destination); 116 117 118 send(destination, message); 119 120 messengerSession = borrowMessengerSession(); 121 try { 122 123 // MessageProducer producer = messengerSession.getMessageProducer(destination); 124 // if (messengerSession.isTopic()) { 125 // ((TopicPublisher) producer).publish((Topic) destination, message); 126 // } 127 // else { 128 // ((QueueSender) producer).send((Queue) destination, message); 129 // } 130 // 131 log.info("Message sent - now waiting for a response..."); 132 133 MessageConsumer consumer = data.consumer; 134 Message response = consumer.receive(); 135 // Message response = null; 136 if (response == null) { 137 // we could have timed out so lets trash the temporary destination 138 // so that the next call() method will use a new destination to avoid 139 // the response for this call() coming back on later call() invokcations 140 data.clear(); 141 } 142 return response; 143 } 144 finally { 145 returnMessengerSession(messengerSession); 146 } 147 } 148 149 public Message call(Destination destination, Message message, long timeoutMillis) throws JMSException { 150 MessengerSession messengerSession = borrowMessengerSession(); 151 try { 152 ThreadLocalData data = getThreadLocalData(messengerSession.getSession()); 153 Destination replyTo = data.destination; 154 message.setJMSReplyTo(replyTo); 155 156 MessageProducer producer = messengerSession.getMessageProducer(destination); 157 158 MessageConsumer consumer = data.consumer; 159 160 if (messengerSession.isTopic()) { 161 ((TopicPublisher) producer).publish((Topic) destination, message); 162 } 163 else { 164 ((QueueSender) producer).send((Queue) destination, message); 165 } 166 Message response = consumer.receive(timeoutMillis); 167 if (response == null) { 168 // we could have timed out so lets trash the temporary destination 169 // so that the next call() method will use a new destination to avoid 170 // the response for this call() coming back on later call() invokcations 171 data.clear(); 172 } 173 return response; 174 } 175 finally { 176 returnMessengerSession(messengerSession); 177 } 178 } 179 180 181 /** 182 * @return the local thread data 183 */ 184 protected ThreadLocalData getThreadLocalData(Session session) throws JMSException { 185 ThreadLocalData data = (ThreadLocalData) threadLocalData.get(); 186 if (data == null) { 187 data = new ThreadLocalData(); 188 threadLocalData.set(data); 189 } 190 if (data.destination == null) { 191 data.destination = createTemporaryDestination(); 192 } 193 if (data.consumer == null) { 194 data.consumer = this.createConsumer(data.destination); 195 } 196 return data; 197 } 198 199 // Implementation methods 200 //------------------------------------------------------------------------- 201 protected static class ThreadLocalData { 202 public MessageConsumer consumer; 203 public Destination destination; 204 205 public void clear() throws JMSException { 206 destination = null; 207 consumer.close(); 208 } 209 } 210 211 protected boolean isTopic(Connection connection) throws JMSException { 212 return getSessionFactory().isTopic(); 213 } 214 215 protected boolean isTopic(ConnectionFactory factory) throws JMSException { 216 return getSessionFactory().isTopic(); 217 } 218 219 220 221 /** 222 * Factory method to create a new MessengerSession 223 */ 224 protected MessengerSession createMessengerSession() throws JMSException { 225 MessengerSession answer = new MessengerSession(this, getSessionFactory()); 226 if (log.isDebugEnabled()) { 227 log.debug("Created MessengerSession: " + ++count + " value: " + answer); 228 } 229 return answer; 230 } 231 232 /** Factory method to create a SessionFactory. 233 * Derived classes could override this method to create the SessionFactory 234 * from a well known place 235 */ 236 protected SessionFactory createSessionFactory() throws JMSException { 237 throw new JMSException("No SessionFactory configured for this Messenger. Cannot create a new JMS Session"); 238 } 239 240 protected synchronized MessengerSession borrowMessengerSession() throws JMSException { 241 MessengerSession answer = null; 242 if (pool.isEmpty()) { 243 answer = createMessengerSession(); 244 } 245 else { 246 answer = (MessengerSession) pool.removeFirst(); 247 } 248 if (log.isDebugEnabled()) { 249 log.debug("#### Borrowing messenger session: " + answer); 250 } 251 return answer; 252 } 253 254 protected synchronized void returnMessengerSession(MessengerSession session) { 255 if (log.isDebugEnabled()) { 256 log.debug("#### Returning messenger session: " + session); 257 } 258 pool.addLast(session); 259 } 260 261 }