001 /* 002 * Copyright (C) The Apache Software Foundation. All rights reserved. 003 * 004 * This software is published under the terms of the Apache Software License 005 * version 1.1, a copy of which has been included with this distribution in 006 * the LICENSE file. 007 * 008 * $Id: MessengerSession.java 155459 2005-02-26 13:24:44Z dirkv $ 009 */ 010 package org.apache.commons.messenger; 011 012 import java.util.HashMap; 013 import java.util.Map; 014 015 import javax.jms.Destination; 016 import javax.jms.JMSException; 017 import javax.jms.MessageConsumer; 018 import javax.jms.MessageProducer; 019 import javax.jms.Queue; 020 import javax.jms.QueueRequestor; 021 import javax.jms.QueueSession; 022 import javax.jms.Session; 023 import javax.jms.Topic; 024 import javax.jms.TopicRequestor; 025 import javax.jms.TopicSession; 026 027 import org.apache.commons.logging.Log; 028 import org.apache.commons.logging.LogFactory; 029 030 /** <p><code>MessengerSession</code> represents all the local information for a single thread.</p> 031 * 032 * @author <a href="mailto:jstrachan@apache.org">James Strachan</a> 033 * @version $Revision: 155459 $ 034 */ 035 public class MessengerSession { 036 037 private static final Log log = LogFactory.getLog(MessengerSupport.class); 038 039 /** @todo should have ack mode for sending and consuming */ 040 041 042 /** the JMS Session for this thread */ 043 private Session session; 044 045 /** the JMS Listener (async subscription) Session for this thread */ 046 private Session listenerSession; 047 048 /** the MessageConsumer for this threads reply to destination */ 049 private MessageConsumer replyToConsumer; 050 051 /** The factory used to create each thread's JMS Session */ 052 private SessionFactory sessionFactory; 053 054 /** An optional cache of requestors */ 055 private Map requestorsMap; 056 057 /** The inbox which is used for the call() methods */ 058 private Destination replyToDestination; 059 060 /** The current messenger to which I'm connected */ 061 private MessengerSupport messenger; 062 063 /** The producer used to send messages using this session */ 064 private MessageProducer producer; 065 066 public MessengerSession(MessengerSupport messenger, SessionFactory sessionFactory) { 067 this.messenger = messenger; 068 this.sessionFactory = sessionFactory; 069 } 070 071 public SessionFactory getSessionFactory() { 072 return sessionFactory; 073 } 074 075 /** 076 * Closes any sessions or producers open 077 */ 078 public void close() throws JMSException { 079 if (producer != null) { 080 producer.close(); 081 producer = null; 082 } 083 084 if (session != null) { 085 session.close(); 086 session = null; 087 } 088 if (listenerSession != null) { 089 listenerSession.close(); 090 listenerSession = null; 091 } 092 } 093 094 095 /** 096 * @return the JMS Session for this thread for synchronous mode 097 */ 098 public Session getSession() throws JMSException { 099 if (session == null) { 100 session = createSession(); 101 } 102 return session; 103 } 104 105 /** 106 * @return the JMS Session for this thread for asynchronous mode 107 */ 108 public Session getListenerSession() throws JMSException { 109 if (listenerSession == null) { 110 listenerSession = createSession(); 111 } 112 return listenerSession; 113 } 114 115 /** 116 * @return the MessageConsumer for the ReplyTo Destination for this thread 117 */ 118 public MessageConsumer getReplyToConsumer() throws JMSException { 119 return replyToConsumer; 120 } 121 122 public void setReplyToConsumer(MessageConsumer replyToConsumer) { 123 this.replyToConsumer = replyToConsumer; 124 } 125 126 /** 127 * @return the MessageProducer for the given destination. 128 */ 129 public MessageProducer getMessageProducer(Destination destination) throws JMSException { 130 if (producer == null) { 131 producer = messenger.createMessageProducer(this, null); 132 } 133 return producer; 134 } 135 136 /** 137 * @return the reply to destination (a temporary queue) 138 * used to reply to this thread and session 139 */ 140 protected Destination getReplyToDestination() throws JMSException { 141 if (replyToDestination == null) { 142 replyToDestination = createTemporaryDestination(); 143 } 144 return replyToDestination; 145 } 146 147 /** 148 * Sets the reply to destination to use 149 */ 150 protected void setReplyToDestination(Destination replyToDestination) throws JMSException { 151 this.replyToDestination = replyToDestination; 152 } 153 154 /** 155 * @return either a cached TopicRequestor or creates a new one 156 */ 157 public TopicRequestor getTopicRequestor(TopicSession session, Topic destination) throws JMSException { 158 if (messenger.isCacheRequestors()) { 159 TopicRequestor requestor = (TopicRequestor) getRequestorsMap().get(destination); 160 if (requestor == null) { 161 requestor = new TopicRequestor(session, destination); 162 getRequestorsMap().put(destination, requestor); 163 } 164 return requestor; 165 } 166 else { 167 return new TopicRequestor(session, destination); 168 } 169 } 170 171 /** 172 * @return either a cached QueueRequestor or creates a new one 173 */ 174 public QueueRequestor getQueueRequestor(QueueSession session, Queue destination) throws JMSException { 175 if (messenger.isCacheRequestors()) { 176 QueueRequestor requestor = (QueueRequestor) getRequestorsMap().get(destination); 177 if (requestor == null) { 178 requestor = new QueueRequestor(session, destination); 179 getRequestorsMap().put(destination, requestor); 180 } 181 return requestor; 182 } 183 else { 184 return new QueueRequestor(session, destination); 185 } 186 } 187 188 public boolean isTopic() throws JMSException { 189 return getSessionFactory().isTopic(); 190 } 191 192 193 194 195 /** 196 * Factory method to create a new JMS Session 197 */ 198 protected Session createSession() throws JMSException { 199 Session answer = getSessionFactory().createSession(messenger.getConnection()); 200 log.info("Created JMS session: " + answer); 201 return answer; 202 } 203 204 /** 205 * Factory method to create a new temporary destination 206 */ 207 protected Destination createTemporaryDestination() throws JMSException { 208 if (isTopic()) { 209 TopicSession topicSession = (TopicSession) session; 210 return topicSession.createTemporaryTopic(); 211 } 212 else { 213 QueueSession queueSession = (QueueSession) session; 214 return queueSession.createTemporaryQueue(); 215 } 216 } 217 218 /** 219 * @return the map of requestors, indexed by destination. 220 * The Map will be lazily constructed 221 */ 222 protected Map getRequestorsMap() { 223 if (requestorsMap == null) { 224 requestorsMap = new HashMap(); 225 } 226 return requestorsMap; 227 } 228 }