View Javadoc

1   /*
2    * Copyright (C) The Apache Software Foundation. All rights reserved.
3    *
4    * This software is published under the terms of the Apache Software License
5    * version 1.1, a copy of which has been included with this distribution in
6    * the LICENSE file.
7    *
8    * $Id: MessengerSupport.java 155459 2005-02-26 13:24:44Z dirkv $
9    */
10  package org.apache.commons.messenger;
11  
12  import java.io.Serializable;
13  import java.util.HashMap;
14  import java.util.Map;
15  
16  import javax.jms.BytesMessage;
17  import javax.jms.Connection;
18  import javax.jms.ConnectionConsumer;
19  import javax.jms.ConnectionFactory;
20  import javax.jms.DeliveryMode;
21  import javax.jms.Destination;
22  import javax.jms.JMSException;
23  import javax.jms.MapMessage;
24  import javax.jms.Message;
25  import javax.jms.MessageConsumer;
26  import javax.jms.MessageListener;
27  import javax.jms.MessageProducer;
28  import javax.jms.ObjectMessage;
29  import javax.jms.Queue;
30  import javax.jms.QueueBrowser;
31  import javax.jms.QueueConnection;
32  import javax.jms.QueueSender;
33  import javax.jms.QueueSession;
34  import javax.jms.ServerSessionPool;
35  import javax.jms.Session;
36  import javax.jms.StreamMessage;
37  import javax.jms.TextMessage;
38  import javax.jms.Topic;
39  import javax.jms.TopicConnection;
40  import javax.jms.TopicPublisher;
41  import javax.jms.TopicSession;
42  
43  import org.apache.commons.logging.Log;
44  import org.apache.commons.logging.LogFactory;
45  
46  /** <p><code>MessengerSupport</code> is an abstract base class which implements
47    * most of the functionality of Messenger. Derivations need to specify the
48    * connection and session creation and the pooling strategy.</p>
49    *
50    * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
51    * @version $Revision: 155459 $
52    */
53  public abstract class MessengerSupport implements Messenger {
54  
55      /** Logger */
56      private static final Log log = LogFactory.getLog(MessengerSupport.class);
57      private static final Log destinationLog = LogFactory.getLog("org.apache.commons.messenger.destination");
58  
59      private static final boolean CACHE_REQUESTOR = true;
60  
61      /** The name of the Messenger */
62      private String name;
63  
64      /** 
65       * Whether Queue's and Topic's are looked up using JNDI (true)
66       * or wether they should be created on the fly 
67       */
68      private boolean jndiDestinations;
69  
70      /** are topic subscribers durable? */
71      private boolean durable;
72  
73      /** the delivery mode used by default when sending messages */
74      private int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
75  
76      /** the durable name used for durable topic based subscriptions */
77      private String durableName;
78  
79      /** 
80       * whether local messages are ignored when topic based subscription is used
81       * with a message selector 
82       */
83      private boolean noLocal;
84  
85      /** Should we cache the requestor object per thread? */
86      private boolean cacheRequestors;
87  
88      /** A Map of ListenerKey objects to MessageConsumer objects */
89      private Map listeners = new HashMap();
90  
91      /** whether MessageProducer instances should be cached or not */
92      private boolean cacheProducers = true;
93  
94      public MessengerSupport() {
95      }
96  
97      public String toString() {
98          try {
99              MessengerSession session = borrowMessengerSession();
100             String answer = super.toString() + " session: " + session.toString();
101             returnMessengerSession(session);
102             return answer;
103         }
104         catch (Exception e) {
105             return super.toString() + " session: " + e.toString();
106         }
107     }
108 
109     public Destination getDestination(String subject) throws JMSException {
110         MessengerSession messengerSession = borrowMessengerSession();
111         try {
112             boolean debug = destinationLog.isInfoEnabled();
113             Session session = messengerSession.getSession();
114             if (messengerSession.isTopic()) {
115                 if (debug) {
116                     destinationLog.info("Using topic: " + subject);
117                 }
118                 return getTopic((TopicSession) session, subject);
119             }
120             else {
121                 if (debug) {
122                     destinationLog.info("Using queue: " + subject);
123                 }
124                 return getQueue((QueueSession) session, subject);
125             }
126         }
127         finally {
128             returnMessengerSession(messengerSession);
129         }
130     }
131 
132     public Destination createTemporaryDestination() throws JMSException {
133         MessengerSession messengerSession = borrowMessengerSession();
134         try {
135             Session session = messengerSession.getSession();
136             if (messengerSession.isTopic()) {
137                 TopicSession topicSession = (TopicSession) session;
138                 return topicSession.createTemporaryTopic();
139             }
140             else {
141                 QueueSession queueSession = (QueueSession) session;
142                 return queueSession.createTemporaryQueue();
143             }
144         }
145         finally {
146             returnMessengerSession(messengerSession);
147         }
148     }
149 
150     public void send(Destination destination, Message message) throws JMSException {
151         MessengerSession session = borrowMessengerSession();
152         try {
153             MessageProducer producer = session.getMessageProducer(destination);
154             if (session.isTopic()) {
155                 ((TopicPublisher) producer).publish((Topic) destination, message);
156             }
157             else {
158                 ((QueueSender) producer).send((Queue) destination, message);
159             }
160         }
161         finally {
162             returnMessengerSession(session);
163         }
164     }
165 
166     public Message receive(Destination destination) throws JMSException {
167         MessengerSession session = borrowMessengerSession();
168         MessageConsumer consumer = null;
169         try {
170             consumer = borrowMessageConsumer(session, session.getSession(), destination);
171             return consumer.receive();
172         }
173         finally {
174             returnMessageConsumer(consumer);
175             returnMessengerSession(session);
176         }
177     }
178 
179     public Message receive(Destination destination, String selector) throws JMSException {
180         MessengerSession session = borrowMessengerSession();
181         MessageConsumer consumer = null;
182         try {
183             consumer = borrowMessageConsumer(session, session.getSession(), destination, selector);
184             return consumer.receive();
185         }
186         finally {
187             returnMessageConsumer(consumer);
188             returnMessengerSession(session);
189         }
190     }
191 
192     public Message receive(Destination destination, long timeoutMillis) throws JMSException {
193         MessengerSession session = borrowMessengerSession();
194         MessageConsumer consumer = null;
195         try {
196             consumer = borrowMessageConsumer(session, session.getSession(), destination);
197             return consumer.receive(timeoutMillis);
198         }
199         finally {
200             returnMessageConsumer(consumer);
201             returnMessengerSession(session);
202         }
203     }
204 
205     public Message receive(Destination destination, String selector, long timeoutMillis) throws JMSException {
206         MessengerSession session = borrowMessengerSession();
207         MessageConsumer consumer = null;
208         try {
209             consumer = borrowMessageConsumer(session, session.getSession(), destination, selector);
210             return consumer.receive(timeoutMillis);
211         }
212         finally {
213             returnMessageConsumer(consumer);
214             returnMessengerSession(session);
215         }
216     }
217 
218     public Message receiveNoWait(Destination destination) throws JMSException {
219         MessengerSession session = borrowMessengerSession();
220         MessageConsumer consumer = null;
221         try {
222             consumer = borrowMessageConsumer(session, session.getSession(), destination);
223             return consumer.receiveNoWait();
224         }
225         finally {
226             returnMessageConsumer(consumer);
227             returnMessengerSession(session);
228         }
229     }
230 
231     public Message receiveNoWait(Destination destination, String selector) throws JMSException {
232         MessengerSession session = borrowMessengerSession();
233         MessageConsumer consumer = null;
234         try {
235             consumer = borrowMessageConsumer(session, session.getSession(), destination, selector);
236             return consumer.receiveNoWait();
237         }
238         finally {
239             returnMessageConsumer(consumer);
240             returnMessengerSession(session);
241         }
242     }
243 
244     public MessageConsumer createConsumer(Destination destination) throws JMSException {
245         MessengerSession session = borrowMessengerSession();
246         try {
247             return createMessageConsumer(session, session.getSession(), destination);
248         }
249         finally {
250             returnMessengerSession(session);
251         }
252     }
253 
254     public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
255         MessengerSession session = borrowMessengerSession();
256         try {
257             return createMessageConsumer(session, session.getSession(), destination, selector);
258         }
259         finally {
260             returnMessengerSession(session);
261         }
262     }
263 
264     public void run() {
265         // don't return sessions which throw an exception
266         try {
267             MessengerSession session = borrowMessengerSession();
268             session.getSession().run();
269             returnMessengerSession(session);
270         }
271         catch (JMSException e) {
272             // ### ignore
273         }
274     }
275 
276     public ConnectionConsumer createConnectionConsumer(
277         Destination destination,
278         ServerSessionPool sessionPool,
279         int maxMessages)
280         throws JMSException {
281         return createConnectionConsumer(destination, null, sessionPool, maxMessages);
282     }
283 
284     public ConnectionConsumer createConnectionConsumer(
285         Destination destination,
286         String selector,
287         ServerSessionPool sessionPool,
288         int maxMessages)
289         throws JMSException {
290         Connection connection = getConnection();
291         if (isTopic(connection)) {
292             TopicConnection topicConnection = (TopicConnection) connection;
293             if (isDurable()) {
294                 return topicConnection.createDurableConnectionConsumer(
295                     (Topic) destination,
296                     getDurableName(),
297                     selector,
298                     sessionPool,
299                     maxMessages);
300             }
301             else {
302                 return topicConnection.createConnectionConsumer(
303                     (Topic) destination,
304                     selector,
305                     sessionPool,
306                     maxMessages);
307             }
308         }
309         else {
310             QueueConnection queueConnection = (QueueConnection) connection;
311             return queueConnection.createConnectionConsumer((Queue) destination, selector, sessionPool, maxMessages);
312         }
313     }
314 
315     public abstract Connection getConnection() throws JMSException;
316 
317     // Listener API
318     //-------------------------------------------------------------------------
319     public void addListener(Destination destination, MessageListener listener) throws JMSException {
320         if (listener instanceof MessengerListener) {
321             MessengerListener messengerListener = (MessengerListener) listener;
322             messengerListener.setMessenger(this);
323         }
324         MessengerSession session = borrowMessengerSession();
325         try {
326             MessageConsumer consumer = createMessageConsumer(session, session.getListenerSession(), destination);
327             consumer.setMessageListener(listener);
328             ListenerKey key = new ListenerKey(destination, listener);
329             listeners.put(key, consumer);
330         }
331         finally {
332             returnMessengerSession(session);
333         }
334     }
335 
336     public void addListener(Destination destination, String selector, MessageListener listener) throws JMSException {
337 
338         if (listener instanceof MessengerListener) {
339             MessengerListener messengerListener = (MessengerListener) listener;
340             messengerListener.setMessenger(this);
341         }
342         MessengerSession session = borrowMessengerSession();
343         try {
344             MessageConsumer consumer = createMessageConsumer(session, session.getListenerSession(), destination, selector);
345             consumer.setMessageListener(listener);
346             ListenerKey key = new ListenerKey(destination, listener, selector);
347             listeners.put(key, consumer);
348         }
349         finally {
350             returnMessengerSession(session);
351         }
352     }
353 
354     public void removeListener(Destination destination, MessageListener listener) throws JMSException {
355         ListenerKey key = new ListenerKey(destination, listener);
356         MessageConsumer consumer = (MessageConsumer) listeners.remove(key);
357         if (consumer == null) {
358             throw new JMSException("The given listener object has not been added for the given destination");
359         }
360         consumer.close();
361     }
362 
363     public void removeListener(Destination destination, String selector, MessageListener listener)
364         throws JMSException {
365 
366         ListenerKey key = new ListenerKey(destination, listener, selector);
367         MessageConsumer consumer = (MessageConsumer) listeners.remove(key);
368         if (consumer == null) {
369             throw new JMSException("The given listener object has not been added for the given destination and selector");
370         }
371         consumer.close();
372     }
373 
374     // Message factory methods
375     //-------------------------------------------------------------------------
376     public BytesMessage createBytesMessage() throws JMSException {
377         MessengerSession session = borrowMessengerSession();
378         try {
379             return session.getSession().createBytesMessage();
380         }
381         finally {
382             returnMessengerSession(session);
383         }
384     }
385 
386     public MapMessage createMapMessage() throws JMSException {
387         MessengerSession session = borrowMessengerSession();
388         try {
389             return session.getSession().createMapMessage();
390         }
391         finally {
392             returnMessengerSession(session);
393         }
394     }
395 
396     public Message createMessage() throws JMSException {
397         MessengerSession session = borrowMessengerSession();
398         try {
399             return session.getSession().createMessage();
400         }
401         finally {
402             returnMessengerSession(session);
403         }
404     }
405 
406     public ObjectMessage createObjectMessage() throws JMSException {
407         MessengerSession session = borrowMessengerSession();
408         try {
409             return session.getSession().createObjectMessage();
410         }
411         finally {
412             returnMessengerSession(session);
413         }
414     }
415 
416     public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
417         MessengerSession session = borrowMessengerSession();
418         try {
419             return session.getSession().createObjectMessage(object);
420         }
421         finally {
422             returnMessengerSession(session);
423         }
424     }
425 
426     public StreamMessage createStreamMessage() throws JMSException {
427         MessengerSession session = borrowMessengerSession();
428         try {
429             return session.getSession().createStreamMessage();
430         }
431         finally {
432             returnMessengerSession(session);
433         }
434     }
435 
436     public TextMessage createTextMessage() throws JMSException {
437         MessengerSession session = borrowMessengerSession();
438         try {
439             return session.getSession().createTextMessage();
440         }
441         finally {
442             returnMessengerSession(session);
443         }
444     }
445 
446     public TextMessage createTextMessage(String text) throws JMSException {
447         MessengerSession session = borrowMessengerSession();
448         try {
449             return session.getSession().createTextMessage(text);
450         }
451         finally {
452             returnMessengerSession(session);
453         }
454     }
455 
456     public void commit() throws JMSException {
457         MessengerSession session = borrowMessengerSession();
458         try {
459             session.getSession().commit();
460         }
461         finally {
462             returnMessengerSession(session);
463         }
464     }
465 
466     public void rollback() throws JMSException {
467         MessengerSession session = borrowMessengerSession();
468         try {
469             session.getSession().rollback();
470         }
471         finally {
472             returnMessengerSession(session);
473         }
474     }
475 
476     public void close() throws JMSException {
477         getSessionFactory().close();
478     }
479 
480     /**
481      * Creates a browser on the given Queue
482      */
483     public QueueBrowser createBrowser(Destination destination) throws JMSException {
484         MessengerSession session = borrowMessengerSession();
485         QueueBrowser browser = null;
486         try {
487             return createBrowser(session, destination);
488         }
489         finally {
490             returnMessengerSession(session);
491         }
492     }
493 
494     /** Get the producer's default delivery mode. */
495     public int getDeliveryMode(Destination destination) throws JMSException {
496         MessengerSession session = borrowMessengerSession();
497         int deliveryMode = 0;
498         try {
499             MessageProducer producer = session.getMessageProducer(destination);
500             deliveryMode = producer.getDeliveryMode();
501         }
502         finally {
503             returnMessengerSession(session);
504         }
505         return deliveryMode;
506     }
507 
508     /** Set the producer's default delivery mode. */
509     public void setDeliveryMode(Destination destination, int deliveryMode) throws JMSException {
510         MessengerSession session = borrowMessengerSession();
511         MessageProducer producer = null;
512         try {
513             producer = session.getMessageProducer(destination);
514             producer.setDeliveryMode(deliveryMode);
515         }
516         finally {
517             returnMessengerSession(session);
518         }
519     }
520 
521     /**  Get the producer's default priority. */
522     public int getPriority(Destination destination) throws JMSException {
523         MessengerSession session = borrowMessengerSession();
524         MessageProducer producer = null;
525         int priority = 0;
526         try {
527             producer = session.getMessageProducer(destination);
528             priority = producer.getPriority();
529         }
530         finally {
531 
532             returnMessengerSession(session);
533         }
534         return priority;
535     }
536 
537     /**   Set the producer's default priority. */
538     public void setPriority(Destination destination, int priority) throws JMSException {
539         MessengerSession session = borrowMessengerSession();
540         MessageProducer producer = null;
541         try {
542             producer = session.getMessageProducer(destination);
543             producer.setPriority(priority);
544         }
545         finally {
546             returnMessengerSession(session);
547         }
548     }
549 
550     /**  Get the producer's default delivery mode. */
551     public long getTimeToLive(Destination destination) throws JMSException {
552         MessengerSession session = borrowMessengerSession();
553         long timeToLive = 0;
554         try {
555             MessageProducer producer = session.getMessageProducer(destination);
556             timeToLive = producer.getTimeToLive();
557         }
558         finally {
559             returnMessengerSession(session);
560         }
561         return timeToLive;
562     }
563 
564     /**  <p>Set the default length of time in milliseconds from its dispatch time that
565      *   a produced message should be retained by the message system.</p>
566      */
567     public void setTimeToLive(Destination destination, long timeToLive) throws JMSException {
568         MessengerSession session = borrowMessengerSession();
569         try {
570             MessageProducer producer = session.getMessageProducer(destination);
571             producer.setTimeToLive(timeToLive);
572         }
573         finally {
574             returnMessengerSession(session);
575         }
576     }
577 
578     /** Get an indication of whether message timestamps are disabled. */
579     public boolean getDisableMessageTimestamp(Destination destination) throws JMSException {
580         MessengerSession session = borrowMessengerSession();
581         boolean value = false;
582         try {
583             MessageProducer producer = session.getMessageProducer(destination);
584             value = producer.getDisableMessageTimestamp();
585         }
586         finally {
587             returnMessengerSession(session);
588         }
589         return value;
590     }
591     
592     /** Set whether message timestamps are disabled. */
593     public void setDisableMessageTimestamp(Destination destination, boolean value) throws JMSException {
594         MessengerSession session = borrowMessengerSession();
595         try {
596             MessageProducer producer = session.getMessageProducer(destination);
597             producer.setDisableMessageTimestamp(value);
598         }
599         finally {
600             returnMessengerSession(session);
601         }
602     }
603 
604     /** Extends the send capability to send by specifying additional options. */
605     public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
606         throws JMSException {
607         MessengerSession session = borrowMessengerSession();
608         try {
609             MessageProducer producer = session.getMessageProducer(destination);
610             if (session.isTopic()) {
611                 ((TopicPublisher) producer).publish((Topic) destination, message, deliveryMode, priority, timeToLive);
612             }
613             else {
614                 ((QueueSender) producer).send((Queue) destination, message, deliveryMode, priority, timeToLive);
615             }
616         }
617         finally {
618             returnMessengerSession(session);
619         }
620     }
621 
622     /**  Get an indication of whether message IDs are disabled. */
623     public boolean getDisableMessageID(Destination destination) throws JMSException {
624         MessengerSession session = borrowMessengerSession();
625         boolean value = false;
626         try {
627             MessageProducer producer = session.getMessageProducer(destination);
628             value = producer.getDisableMessageID();
629         }
630         finally {
631             returnMessengerSession(session);
632         }
633         return value;
634     }
635 
636     /** Set whether message IDs are disabled. */
637     public void setDisableMessageID(Destination destination, boolean value) throws JMSException {
638         MessengerSession session = borrowMessengerSession();
639         try {
640             MessageProducer producer = session.getMessageProducer(destination);
641             producer.setDisableMessageID(value);
642         }
643         finally {
644             returnMessengerSession(session);
645         }
646     }
647 
648     // Properties
649     //-------------------------------------------------------------------------
650     /** Gets the name that this Messenger is called in a MessengerManager */
651     public String getName() {
652         return name;
653     }
654 
655     /** Sets the name that this Messenger is called in a MessengerManager */
656     public void setName(String name) {
657         this.name = name;
658     }
659 
660     /** Setter for jndiDestinations */
661     public void setJndiDestinations(boolean jndiDestinations) {
662         this.jndiDestinations = jndiDestinations;
663     }
664 
665     /** Getter for jndiDestinations */
666     public boolean isJndiDestinations() {
667         return jndiDestinations;
668     }
669 
670     /** Gets whether topic subscribers are durable or not */
671     public boolean isDurable() {
672         return durable;
673     }
674 
675     /** Sets whether topic subscribers are durable or not */
676     public void setDurable(boolean durable) {
677         this.durable = durable;
678     }
679 
680     /** Gets whether we should cache the requestor object per thread? */
681     public boolean isCacheRequestors() {
682         return cacheRequestors;
683     }
684 
685     /** Sets whether we should cache the requestor object per thread? */
686     public void setCacheRequestors(boolean cacheRequestors) {
687         this.cacheRequestors = cacheRequestors;
688     }
689 
690     /** @return the durable name used for durable topic based subscriptions */
691     public String getDurableName() {
692         return durableName;
693     }
694 
695     /** Sets the durable name used for durable topic based subscriptions */
696     public void setDurableName(String durableName) {
697         this.durableName = durableName;
698     }
699 
700     /** 
701      * Gets whether local messages are ignored when topic based subscription is used
702      * with a message selector 
703      */
704     public boolean isNoLocal() {
705         return noLocal;
706     }
707 
708     /** 
709      * Sets whether local messages are ignored when topic based subscription is used
710      * with a message selector 
711      */
712     public void setNoLocal(boolean noLocal) {
713         this.noLocal = noLocal;
714     }
715 
716     /** Gets whether MessageProducer instances should be cached or not, which defaults to true */
717     public boolean isCacheProducers() {
718         return cacheProducers;
719     }
720 
721     /** Sets whether MessageProducer instances should be cached or not, which defaults to true */
722     public void setCacheProducers(boolean cacheProducers) {
723         this.cacheProducers = cacheProducers;
724     }
725 
726     /**
727      * Returns the delivery mode used on messages sent via this Messenger
728      * @return int
729      */
730     public int getDeliveryMode() {
731         return deliveryMode;
732     }
733 
734     /**
735      * Sets the delivery mode used on messages sent via this Messenger
736      * @param deliveryMode The deliveryMode to set
737      */
738     public void setDeliveryMode(int deliveryMode) {
739         this.deliveryMode = deliveryMode;
740     }
741 
742     /**
743      * Sets whether message delivery should be persistent or not
744      * 
745      * @param persistentDelivery
746      */
747     public void setPersistentDelivery(boolean persistentDelivery) {
748         if (persistentDelivery) {
749             setDeliveryMode(DeliveryMode.PERSISTENT);
750         }
751         else {
752             setDeliveryMode(DeliveryMode.NON_PERSISTENT);
753         }
754     }
755 
756     // Implementation methods
757     //-------------------------------------------------------------------------
758 
759     protected abstract MessengerSession borrowMessengerSession() throws JMSException;
760 
761     protected abstract void returnMessengerSession(MessengerSession session);
762 
763     protected abstract boolean isTopic(Connection connection) throws JMSException;
764 
765     protected abstract boolean isTopic(ConnectionFactory factory) throws JMSException;
766 
767     /** @return a newly created message producer for the given session and destination */
768     protected MessageProducer createMessageProducer(MessengerSession messengerSession, Destination destination) throws JMSException {
769 
770         MessageProducer answer = null;
771         Session session = messengerSession.getSession();
772         if (messengerSession.isTopic()) {
773             TopicSession topicSession = (TopicSession) session;
774             answer = topicSession.createPublisher((Topic) destination);
775         }
776         else {
777             QueueSession queueSession = (QueueSession) session;
778             answer = queueSession.createSender((Queue) destination);
779         }
780 
781         // configure the MessageProducer
782         if (deliveryMode != Message.DEFAULT_DELIVERY_MODE) {
783             answer.setDeliveryMode(deliveryMode);
784         }
785         return answer;
786     }
787 
788     /** @return a MessageConsumer for the given session and destination */
789     protected MessageConsumer borrowMessageConsumer(MessengerSession messengerSession, Session session, Destination destination)
790         throws JMSException {
791 
792         MessageConsumer consumer = createMessageConsumer(messengerSession, session, destination);
793 
794         if (log.isDebugEnabled()) {
795             log.debug("Created new consumer: " + consumer + " on destination: " + destination);
796         }
797 
798         return consumer;
799     }
800 
801     /** @return a MessageConsumer for the given session, destination and selector */
802     protected MessageConsumer borrowMessageConsumer(MessengerSession messengerSession, Session session, Destination destination, String selector)
803         throws JMSException {
804 
805         MessageConsumer consumer = createMessageConsumer(messengerSession, session, destination, selector);
806 
807         if (log.isDebugEnabled()) {
808             log.debug(
809                 "Created new consumer: " + consumer + " on destination: " + destination + " selector: " + selector);
810         }
811 
812         return consumer;
813     }
814 
815     /** 
816      * Returns a message consumer back to the pool. 
817      * By default this method will close message consumers though we should
818      * be able to cache then
819      */
820     protected void returnMessageConsumer(MessageConsumer messageConsumer) throws JMSException {
821         if (log.isDebugEnabled()) {
822             log.debug("Closing consumer: " + messageConsumer);
823         }
824 
825         if (messageConsumer != null) {
826             messageConsumer.close();
827         }
828     }
829 
830     /** @return a new MessageConsumer for the given session and destination */
831     protected MessageConsumer createMessageConsumer(MessengerSession messengerSession, Session session, Destination destination)
832         throws JMSException {
833         if (messengerSession.isTopic()) {
834             TopicSession topicSession = (TopicSession) session;
835             if (isDurable()) {
836                 return topicSession.createDurableSubscriber((Topic) destination, getDurableName(), null, isNoLocal());
837             }
838             else {
839                 return topicSession.createSubscriber((Topic) destination, null, isNoLocal());
840             }
841         }
842         else {
843             QueueSession queueSession = (QueueSession) session;
844             return queueSession.createReceiver((Queue) destination);
845         }
846     }
847 
848     /** @return a new MessageConsumer for the given session, destination and selector */
849     protected MessageConsumer createMessageConsumer(
850         MessengerSession messengerSession,
851         Session session,
852         Destination destination,
853         String selector)
854         throws JMSException {
855         if (messengerSession.isTopic()) {
856             TopicSession topicSession = (TopicSession) session;
857             if (isDurable()) {
858                 return topicSession.createDurableSubscriber(
859                     (Topic) destination,
860                     getDurableName(),
861                     selector,
862                     isNoLocal());
863             }
864             else {
865                 return topicSession.createSubscriber((Topic) destination, selector, isNoLocal());
866             }
867         }
868         else {
869             QueueSession queueSession = (QueueSession) session;
870             return queueSession.createReceiver((Queue) destination, selector);
871         }
872     }
873 
874     /** @return a new QueueBrowser for the given session and destination */
875     protected QueueBrowser createBrowser(MessengerSession session, Destination destination) throws JMSException {
876         if (session.isTopic()) {
877             return null;
878         }
879         else {
880             QueueSession queueSession = (QueueSession) session.getSession();
881             return queueSession.createBrowser((Queue) destination);
882         }
883     }
884 
885     protected Queue getQueue(QueueSession session, String subject) throws JMSException {
886         // XXXX: might want to cache
887         return session.createQueue(subject);
888     }
889 
890     protected Topic getTopic(TopicSession session, String subject) throws JMSException {
891         // XXXX: might want to cache
892         return session.createTopic(subject);
893     }
894 }