001    /*
002     * Copyright (C) The Apache Software Foundation. All rights reserved.
003     *
004     * This software is published under the terms of the Apache Software License
005     * version 1.1, a copy of which has been included with this distribution in
006     * the LICENSE file.
007     *
008     * $Id: MessengerSupport.java 155459 2005-02-26 13:24:44Z dirkv $
009     */
010    package org.apache.commons.messenger;
011    
012    import java.io.Serializable;
013    import java.util.HashMap;
014    import java.util.Map;
015    
016    import javax.jms.BytesMessage;
017    import javax.jms.Connection;
018    import javax.jms.ConnectionConsumer;
019    import javax.jms.ConnectionFactory;
020    import javax.jms.DeliveryMode;
021    import javax.jms.Destination;
022    import javax.jms.JMSException;
023    import javax.jms.MapMessage;
024    import javax.jms.Message;
025    import javax.jms.MessageConsumer;
026    import javax.jms.MessageListener;
027    import javax.jms.MessageProducer;
028    import javax.jms.ObjectMessage;
029    import javax.jms.Queue;
030    import javax.jms.QueueBrowser;
031    import javax.jms.QueueConnection;
032    import javax.jms.QueueSender;
033    import javax.jms.QueueSession;
034    import javax.jms.ServerSessionPool;
035    import javax.jms.Session;
036    import javax.jms.StreamMessage;
037    import javax.jms.TextMessage;
038    import javax.jms.Topic;
039    import javax.jms.TopicConnection;
040    import javax.jms.TopicPublisher;
041    import javax.jms.TopicSession;
042    
043    import org.apache.commons.logging.Log;
044    import org.apache.commons.logging.LogFactory;
045    
046    /** <p><code>MessengerSupport</code> is an abstract base class which implements
047      * most of the functionality of Messenger. Derivations need to specify the
048      * connection and session creation and the pooling strategy.</p>
049      *
050      * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
051      * @version $Revision: 155459 $
052      */
053    public abstract class MessengerSupport implements Messenger {
054    
    /** Logger for this class. */
    private static final Log log = LogFactory.getLog(MessengerSupport.class);
    /** Separate log category, used by getDestination() to report the topic or queue name in use. */
    private static final Log destinationLog = LogFactory.getLog("org.apache.commons.messenger.destination");

    // NOTE(review): not referenced in the visible part of this class - confirm it is still used
    private static final boolean CACHE_REQUESTOR = true;

    /** The name of the Messenger */
    private String name;

    /** 
     * Whether Queues and Topics are looked up using JNDI (true)
     * or whether they should be created on the fly 
     */
    private boolean jndiDestinations;

    /** Whether topic subscribers are durable */
    private boolean durable;

    /** The delivery mode used by default when sending messages */
    private int deliveryMode = Message.DEFAULT_DELIVERY_MODE;

    /** The durable name used for durable topic based subscriptions */
    private String durableName;

    /** 
     * Whether local messages are ignored when topic based subscription is used
     * with a message selector 
     */
    private boolean noLocal;

    /** Whether the requestor object should be cached per thread */
    private boolean cacheRequestors;

    /**
     * A Map of ListenerKey objects to MessageConsumer objects.
     * Entries are added by addListener() and removed by removeListener();
     * the map is a plain HashMap and is accessed without synchronization here.
     */
    private Map listeners = new HashMap();

    /** Whether MessageProducer instances should be cached or not */
    private boolean cacheProducers = true;
093    
094        public MessengerSupport() {
095        }
096    
097        public String toString() {
098            try {
099                MessengerSession session = borrowMessengerSession();
100                String answer = super.toString() + " session: " + session.toString();
101                returnMessengerSession(session);
102                return answer;
103            }
104            catch (Exception e) {
105                return super.toString() + " session: " + e.toString();
106            }
107        }
108    
109        public Destination getDestination(String subject) throws JMSException {
110            MessengerSession messengerSession = borrowMessengerSession();
111            try {
112                boolean debug = destinationLog.isInfoEnabled();
113                Session session = messengerSession.getSession();
114                if (messengerSession.isTopic()) {
115                    if (debug) {
116                        destinationLog.info("Using topic: " + subject);
117                    }
118                    return getTopic((TopicSession) session, subject);
119                }
120                else {
121                    if (debug) {
122                        destinationLog.info("Using queue: " + subject);
123                    }
124                    return getQueue((QueueSession) session, subject);
125                }
126            }
127            finally {
128                returnMessengerSession(messengerSession);
129            }
130        }
131    
132        public Destination createTemporaryDestination() throws JMSException {
133            MessengerSession messengerSession = borrowMessengerSession();
134            try {
135                Session session = messengerSession.getSession();
136                if (messengerSession.isTopic()) {
137                    TopicSession topicSession = (TopicSession) session;
138                    return topicSession.createTemporaryTopic();
139                }
140                else {
141                    QueueSession queueSession = (QueueSession) session;
142                    return queueSession.createTemporaryQueue();
143                }
144            }
145            finally {
146                returnMessengerSession(messengerSession);
147            }
148        }
149    
150        public void send(Destination destination, Message message) throws JMSException {
151            MessengerSession session = borrowMessengerSession();
152            try {
153                MessageProducer producer = session.getMessageProducer(destination);
154                if (session.isTopic()) {
155                    ((TopicPublisher) producer).publish((Topic) destination, message);
156                }
157                else {
158                    ((QueueSender) producer).send((Queue) destination, message);
159                }
160            }
161            finally {
162                returnMessengerSession(session);
163            }
164        }
165    
166        public Message receive(Destination destination) throws JMSException {
167            MessengerSession session = borrowMessengerSession();
168            MessageConsumer consumer = null;
169            try {
170                consumer = borrowMessageConsumer(session, session.getSession(), destination);
171                return consumer.receive();
172            }
173            finally {
174                returnMessageConsumer(consumer);
175                returnMessengerSession(session);
176            }
177        }
178    
179        public Message receive(Destination destination, String selector) throws JMSException {
180            MessengerSession session = borrowMessengerSession();
181            MessageConsumer consumer = null;
182            try {
183                consumer = borrowMessageConsumer(session, session.getSession(), destination, selector);
184                return consumer.receive();
185            }
186            finally {
187                returnMessageConsumer(consumer);
188                returnMessengerSession(session);
189            }
190        }
191    
192        public Message receive(Destination destination, long timeoutMillis) throws JMSException {
193            MessengerSession session = borrowMessengerSession();
194            MessageConsumer consumer = null;
195            try {
196                consumer = borrowMessageConsumer(session, session.getSession(), destination);
197                return consumer.receive(timeoutMillis);
198            }
199            finally {
200                returnMessageConsumer(consumer);
201                returnMessengerSession(session);
202            }
203        }
204    
205        public Message receive(Destination destination, String selector, long timeoutMillis) throws JMSException {
206            MessengerSession session = borrowMessengerSession();
207            MessageConsumer consumer = null;
208            try {
209                consumer = borrowMessageConsumer(session, session.getSession(), destination, selector);
210                return consumer.receive(timeoutMillis);
211            }
212            finally {
213                returnMessageConsumer(consumer);
214                returnMessengerSession(session);
215            }
216        }
217    
218        public Message receiveNoWait(Destination destination) throws JMSException {
219            MessengerSession session = borrowMessengerSession();
220            MessageConsumer consumer = null;
221            try {
222                consumer = borrowMessageConsumer(session, session.getSession(), destination);
223                return consumer.receiveNoWait();
224            }
225            finally {
226                returnMessageConsumer(consumer);
227                returnMessengerSession(session);
228            }
229        }
230    
231        public Message receiveNoWait(Destination destination, String selector) throws JMSException {
232            MessengerSession session = borrowMessengerSession();
233            MessageConsumer consumer = null;
234            try {
235                consumer = borrowMessageConsumer(session, session.getSession(), destination, selector);
236                return consumer.receiveNoWait();
237            }
238            finally {
239                returnMessageConsumer(consumer);
240                returnMessengerSession(session);
241            }
242        }
243    
244        public MessageConsumer createConsumer(Destination destination) throws JMSException {
245            MessengerSession session = borrowMessengerSession();
246            try {
247                return createMessageConsumer(session, session.getSession(), destination);
248            }
249            finally {
250                returnMessengerSession(session);
251            }
252        }
253    
254        public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
255            MessengerSession session = borrowMessengerSession();
256            try {
257                return createMessageConsumer(session, session.getSession(), destination, selector);
258            }
259            finally {
260                returnMessengerSession(session);
261            }
262        }
263    
264        public void run() {
265            // don't return sessions which throw an exception
266            try {
267                MessengerSession session = borrowMessengerSession();
268                session.getSession().run();
269                returnMessengerSession(session);
270            }
271            catch (JMSException e) {
272                // ### ignore
273            }
274        }
275    
276        public ConnectionConsumer createConnectionConsumer(
277            Destination destination,
278            ServerSessionPool sessionPool,
279            int maxMessages)
280            throws JMSException {
281            return createConnectionConsumer(destination, null, sessionPool, maxMessages);
282        }
283    
284        public ConnectionConsumer createConnectionConsumer(
285            Destination destination,
286            String selector,
287            ServerSessionPool sessionPool,
288            int maxMessages)
289            throws JMSException {
290            Connection connection = getConnection();
291            if (isTopic(connection)) {
292                TopicConnection topicConnection = (TopicConnection) connection;
293                if (isDurable()) {
294                    return topicConnection.createDurableConnectionConsumer(
295                        (Topic) destination,
296                        getDurableName(),
297                        selector,
298                        sessionPool,
299                        maxMessages);
300                }
301                else {
302                    return topicConnection.createConnectionConsumer(
303                        (Topic) destination,
304                        selector,
305                        sessionPool,
306                        maxMessages);
307                }
308            }
309            else {
310                QueueConnection queueConnection = (QueueConnection) connection;
311                return queueConnection.createConnectionConsumer((Queue) destination, selector, sessionPool, maxMessages);
312            }
313        }
314    
    /**
     * Supplies the JMS connection of this messenger; implemented by
     * derivations. In the visible part of this class it is used by
     * <code>createConnectionConsumer</code>.
     *
     * @return the connection to use
     * @throws JMSException declared for implementations that cannot provide a connection
     */
    public abstract Connection getConnection() throws JMSException;
316    
317        // Listener API
318        //-------------------------------------------------------------------------
319        public void addListener(Destination destination, MessageListener listener) throws JMSException {
320            if (listener instanceof MessengerListener) {
321                MessengerListener messengerListener = (MessengerListener) listener;
322                messengerListener.setMessenger(this);
323            }
324            MessengerSession session = borrowMessengerSession();
325            try {
326                MessageConsumer consumer = createMessageConsumer(session, session.getListenerSession(), destination);
327                consumer.setMessageListener(listener);
328                ListenerKey key = new ListenerKey(destination, listener);
329                listeners.put(key, consumer);
330            }
331            finally {
332                returnMessengerSession(session);
333            }
334        }
335    
336        public void addListener(Destination destination, String selector, MessageListener listener) throws JMSException {
337    
338            if (listener instanceof MessengerListener) {
339                MessengerListener messengerListener = (MessengerListener) listener;
340                messengerListener.setMessenger(this);
341            }
342            MessengerSession session = borrowMessengerSession();
343            try {
344                MessageConsumer consumer = createMessageConsumer(session, session.getListenerSession(), destination, selector);
345                consumer.setMessageListener(listener);
346                ListenerKey key = new ListenerKey(destination, listener, selector);
347                listeners.put(key, consumer);
348            }
349            finally {
350                returnMessengerSession(session);
351            }
352        }
353    
354        public void removeListener(Destination destination, MessageListener listener) throws JMSException {
355            ListenerKey key = new ListenerKey(destination, listener);
356            MessageConsumer consumer = (MessageConsumer) listeners.remove(key);
357            if (consumer == null) {
358                throw new JMSException("The given listener object has not been added for the given destination");
359            }
360            consumer.close();
361        }
362    
363        public void removeListener(Destination destination, String selector, MessageListener listener)
364            throws JMSException {
365    
366            ListenerKey key = new ListenerKey(destination, listener, selector);
367            MessageConsumer consumer = (MessageConsumer) listeners.remove(key);
368            if (consumer == null) {
369                throw new JMSException("The given listener object has not been added for the given destination and selector");
370            }
371            consumer.close();
372        }
373    
374        // Message factory methods
375        //-------------------------------------------------------------------------
376        public BytesMessage createBytesMessage() throws JMSException {
377            MessengerSession session = borrowMessengerSession();
378            try {
379                return session.getSession().createBytesMessage();
380            }
381            finally {
382                returnMessengerSession(session);
383            }
384        }
385    
386        public MapMessage createMapMessage() throws JMSException {
387            MessengerSession session = borrowMessengerSession();
388            try {
389                return session.getSession().createMapMessage();
390            }
391            finally {
392                returnMessengerSession(session);
393            }
394        }
395    
396        public Message createMessage() throws JMSException {
397            MessengerSession session = borrowMessengerSession();
398            try {
399                return session.getSession().createMessage();
400            }
401            finally {
402                returnMessengerSession(session);
403            }
404        }
405    
406        public ObjectMessage createObjectMessage() throws JMSException {
407            MessengerSession session = borrowMessengerSession();
408            try {
409                return session.getSession().createObjectMessage();
410            }
411            finally {
412                returnMessengerSession(session);
413            }
414        }
415    
416        public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
417            MessengerSession session = borrowMessengerSession();
418            try {
419                return session.getSession().createObjectMessage(object);
420            }
421            finally {
422                returnMessengerSession(session);
423            }
424        }
425    
426        public StreamMessage createStreamMessage() throws JMSException {
427            MessengerSession session = borrowMessengerSession();
428            try {
429                return session.getSession().createStreamMessage();
430            }
431            finally {
432                returnMessengerSession(session);
433            }
434        }
435    
436        public TextMessage createTextMessage() throws JMSException {
437            MessengerSession session = borrowMessengerSession();
438            try {
439                return session.getSession().createTextMessage();
440            }
441            finally {
442                returnMessengerSession(session);
443            }
444        }
445    
446        public TextMessage createTextMessage(String text) throws JMSException {
447            MessengerSession session = borrowMessengerSession();
448            try {
449                return session.getSession().createTextMessage(text);
450            }
451            finally {
452                returnMessengerSession(session);
453            }
454        }
455    
456        public void commit() throws JMSException {
457            MessengerSession session = borrowMessengerSession();
458            try {
459                session.getSession().commit();
460            }
461            finally {
462                returnMessengerSession(session);
463            }
464        }
465    
466        public void rollback() throws JMSException {
467            MessengerSession session = borrowMessengerSession();
468            try {
469                session.getSession().rollback();
470            }
471            finally {
472                returnMessengerSession(session);
473            }
474        }
475    
    /**
     * Closes this messenger by closing the object returned from
     * <code>getSessionFactory()</code>.
     * <p>
     * NOTE(review): consumers registered through <code>addListener</code> are
     * neither closed nor removed from the listener map here; presumably
     * closing the factory takes care of them - confirm.
     *
     * @throws JMSException propagated from the close call
     */
    public void close() throws JMSException {
        getSessionFactory().close();
    }
479    
480        /**
481         * Creates a browser on the given Queue
482         */
483        public QueueBrowser createBrowser(Destination destination) throws JMSException {
484            MessengerSession session = borrowMessengerSession();
485            QueueBrowser browser = null;
486            try {
487                return createBrowser(session, destination);
488            }
489            finally {
490                returnMessengerSession(session);
491            }
492        }
493    
494        /** Get the producer's default delivery mode. */
495        public int getDeliveryMode(Destination destination) throws JMSException {
496            MessengerSession session = borrowMessengerSession();
497            int deliveryMode = 0;
498            try {
499                MessageProducer producer = session.getMessageProducer(destination);
500                deliveryMode = producer.getDeliveryMode();
501            }
502            finally {
503                returnMessengerSession(session);
504            }
505            return deliveryMode;
506        }
507    
508        /** Set the producer's default delivery mode. */
509        public void setDeliveryMode(Destination destination, int deliveryMode) throws JMSException {
510            MessengerSession session = borrowMessengerSession();
511            MessageProducer producer = null;
512            try {
513                producer = session.getMessageProducer(destination);
514                producer.setDeliveryMode(deliveryMode);
515            }
516            finally {
517                returnMessengerSession(session);
518            }
519        }
520    
521        /**  Get the producer's default priority. */
522        public int getPriority(Destination destination) throws JMSException {
523            MessengerSession session = borrowMessengerSession();
524            MessageProducer producer = null;
525            int priority = 0;
526            try {
527                producer = session.getMessageProducer(destination);
528                priority = producer.getPriority();
529            }
530            finally {
531    
532                returnMessengerSession(session);
533            }
534            return priority;
535        }
536    
537        /**   Set the producer's default priority. */
538        public void setPriority(Destination destination, int priority) throws JMSException {
539            MessengerSession session = borrowMessengerSession();
540            MessageProducer producer = null;
541            try {
542                producer = session.getMessageProducer(destination);
543                producer.setPriority(priority);
544            }
545            finally {
546                returnMessengerSession(session);
547            }
548        }
549    
    /** Get the producer's default time to live. */
551        public long getTimeToLive(Destination destination) throws JMSException {
552            MessengerSession session = borrowMessengerSession();
553            long timeToLive = 0;
554            try {
555                MessageProducer producer = session.getMessageProducer(destination);
556                timeToLive = producer.getTimeToLive();
557            }
558            finally {
559                returnMessengerSession(session);
560            }
561            return timeToLive;
562        }
563    
564        /**  <p>Set the default length of time in milliseconds from its dispatch time that
565         *   a produced message should be retained by the message system.</p>
566         */
567        public void setTimeToLive(Destination destination, long timeToLive) throws JMSException {
568            MessengerSession session = borrowMessengerSession();
569            try {
570                MessageProducer producer = session.getMessageProducer(destination);
571                producer.setTimeToLive(timeToLive);
572            }
573            finally {
574                returnMessengerSession(session);
575            }
576        }
577    
578        /** Get an indication of whether message timestamps are disabled. */
579        public boolean getDisableMessageTimestamp(Destination destination) throws JMSException {
580            MessengerSession session = borrowMessengerSession();
581            boolean value = false;
582            try {
583                MessageProducer producer = session.getMessageProducer(destination);
584                value = producer.getDisableMessageTimestamp();
585            }
586            finally {
587                returnMessengerSession(session);
588            }
589            return value;
590        }
591        
592        /** Set whether message timestamps are disabled. */
593        public void setDisableMessageTimestamp(Destination destination, boolean value) throws JMSException {
594            MessengerSession session = borrowMessengerSession();
595            try {
596                MessageProducer producer = session.getMessageProducer(destination);
597                producer.setDisableMessageTimestamp(value);
598            }
599            finally {
600                returnMessengerSession(session);
601            }
602        }
603    
604        /** Extends the send capability to send by specifying additional options. */
605        public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
606            throws JMSException {
607            MessengerSession session = borrowMessengerSession();
608            try {
609                MessageProducer producer = session.getMessageProducer(destination);
610                if (session.isTopic()) {
611                    ((TopicPublisher) producer).publish((Topic) destination, message, deliveryMode, priority, timeToLive);
612                }
613                else {
614                    ((QueueSender) producer).send((Queue) destination, message, deliveryMode, priority, timeToLive);
615                }
616            }
617            finally {
618                returnMessengerSession(session);
619            }
620        }
621    
622        /**  Get an indication of whether message IDs are disabled. */
623        public boolean getDisableMessageID(Destination destination) throws JMSException {
624            MessengerSession session = borrowMessengerSession();
625            boolean value = false;
626            try {
627                MessageProducer producer = session.getMessageProducer(destination);
628                value = producer.getDisableMessageID();
629            }
630            finally {
631                returnMessengerSession(session);
632            }
633            return value;
634        }
635    
636        /** Set whether message IDs are disabled. */
637        public void setDisableMessageID(Destination destination, boolean value) throws JMSException {
638            MessengerSession session = borrowMessengerSession();
639            try {
640                MessageProducer producer = session.getMessageProducer(destination);
641                producer.setDisableMessageID(value);
642            }
643            finally {
644                returnMessengerSession(session);
645            }
646        }
647    
648        // Properties
649        //-------------------------------------------------------------------------
650        /** Gets the name that this Messenger is called in a MessengerManager */
651        public String getName() {
652            return name;
653        }
654    
655        /** Sets the name that this Messenger is called in a MessengerManager */
    public void setName(String name) {
        // stored as given; no validation is applied, so null is kept as null
        this.name = name;
    }
659    
660        /** Setter for jndiDestinations */
    public void setJndiDestinations(boolean jndiDestinations) {
        // stored as given; in the visible part of this class the flag is only
        // exposed again through isJndiDestinations()
        this.jndiDestinations = jndiDestinations;
    }
664    
665        /** Getter for jndiDestinations */
666        public boolean isJndiDestinations() {
667            return jndiDestinations;
668        }
669    
670        /** Gets whether topic subscribers are durable or not */
671        public boolean isDurable() {
672            return durable;
673        }
674    
675        /** Sets whether topic subscribers are durable or not */
    public void setDurable(boolean durable) {
        // consulted through isDurable() when a topic connection consumer is created
        this.durable = durable;
    }
679    
    /** Gets whether the requestor object should be cached per thread */
681        public boolean isCacheRequestors() {
682            return cacheRequestors;
683        }
684    
    /** Sets whether the requestor object should be cached per thread */
    public void setCacheRequestors(boolean cacheRequestors) {
        // stored as given; in the visible part of this class the flag is only
        // exposed again through isCacheRequestors()
        this.cacheRequestors = cacheRequestors;
    }
689    
690        /** @return the durable name used for durable topic based subscriptions */
691        public String getDurableName() {
692            return durableName;
693        }
694    
695        /** Sets the durable name used for durable topic based subscriptions */
    public void setDurableName(String durableName) {
        // read through getDurableName() when a durable topic connection consumer is created
        this.durableName = durableName;
    }
699    
700        /** 
701         * Gets whether local messages are ignored when topic based subscription is used
702         * with a message selector 
703         */
704        public boolean isNoLocal() {
705            return noLocal;
706        }
707    
708        /** 
709         * Sets whether local messages are ignored when topic based subscription is used
710         * with a message selector 
711         */
    public void setNoLocal(boolean noLocal) {
        // stored as given; in the visible part of this class the flag is only
        // exposed again through isNoLocal()
        this.noLocal = noLocal;
    }
715    
716        /** Gets whether MessageProducer instances should be cached or not, which defaults to true */
717        public boolean isCacheProducers() {
718            return cacheProducers;
719        }
720    
721        /** Sets whether MessageProducer instances should be cached or not, which defaults to true */
    public void setCacheProducers(boolean cacheProducers) {
        // replaces the field's initial value of true; in the visible part of
        // this class the flag is only exposed again through isCacheProducers()
        this.cacheProducers = cacheProducers;
    }
725    
726        /**
727         * Returns the delivery mode used on messages sent via this Messenger
728         * @return int
729         */
730        public int getDeliveryMode() {
731            return deliveryMode;
732        }
733    
734        /**
735         * Sets the delivery mode used on messages sent via this Messenger
736         * @param deliveryMode The deliveryMode to set
737         */
    public void setDeliveryMode(int deliveryMode) {
        // createMessageProducer() applies this value to new producers when it
        // differs from Message.DEFAULT_DELIVERY_MODE
        this.deliveryMode = deliveryMode;
    }
741    
742        /**
743         * Sets whether message delivery should be persistent or not
744         * 
745         * @param persistentDelivery
746         */
747        public void setPersistentDelivery(boolean persistentDelivery) {
748            if (persistentDelivery) {
749                setDeliveryMode(DeliveryMode.PERSISTENT);
750            }
751            else {
752                setDeliveryMode(DeliveryMode.NON_PERSISTENT);
753            }
754        }
755    
756        // Implementation methods
757        //-------------------------------------------------------------------------
758    
    /**
     * Obtains a messenger session for one operation; implemented by
     * derivations. Callers in this class hand the session back through
     * <code>returnMessengerSession</code> once the operation is done.
     *
     * @return the session to use
     * @throws JMSException declared for implementations that cannot provide a session
     */
    protected abstract MessengerSession borrowMessengerSession() throws JMSException;
760    
    /**
     * Hands back a session obtained from <code>borrowMessengerSession</code>;
     * implemented by derivations. Declares no checked exception, which lets
     * callers in this class invoke it from <code>finally</code> blocks.
     *
     * @param session the session that is no longer needed by the caller
     */
    protected abstract void returnMessengerSession(MessengerSession session);
762    
    /**
     * Tells whether the connection is topic based; implemented by
     * derivations. <code>createConnectionConsumer</code> uses the answer to
     * decide between casting to <code>TopicConnection</code> and
     * <code>QueueConnection</code>.
     *
     * @param connection the connection to inspect
     * @return true when the connection is to be treated as a topic connection
     * @throws JMSException declared for implementations that fail to decide
     */
    protected abstract boolean isTopic(Connection connection) throws JMSException;
764    
    /**
     * Tells whether the connection factory is topic based; implemented by
     * derivations.
     * NOTE(review): not called in the visible part of this class - presumably
     * used by derivations when creating connections; confirm.
     *
     * @param factory the connection factory to inspect
     * @return true when the factory is to be treated as a topic connection factory
     * @throws JMSException declared for implementations that fail to decide
     */
    protected abstract boolean isTopic(ConnectionFactory factory) throws JMSException;
766    
767        /** @return a newly created message producer for the given session and destination */
768        protected MessageProducer createMessageProducer(MessengerSession messengerSession, Destination destination) throws JMSException {
769    
770            MessageProducer answer = null;
771            Session session = messengerSession.getSession();
772            if (messengerSession.isTopic()) {
773                TopicSession topicSession = (TopicSession) session;
774                answer = topicSession.createPublisher((Topic) destination);
775            }
776            else {
777                QueueSession queueSession = (QueueSession) session;
778                answer = queueSession.createSender((Queue) destination);
779            }
780    
781            // configure the MessageProducer
782            if (deliveryMode != Message.DEFAULT_DELIVERY_MODE) {
783                answer.setDeliveryMode(deliveryMode);
784            }
785            return answer;
786        }
787    
788        /** @return a MessageConsumer for the given session and destination */
789        protected MessageConsumer borrowMessageConsumer(MessengerSession messengerSession, Session session, Destination destination)
790            throws JMSException {
791    
792            MessageConsumer consumer = createMessageConsumer(messengerSession, session, destination);
793    
794            if (log.isDebugEnabled()) {
795                log.debug("Created new consumer: " + consumer + " on destination: " + destination);
796            }
797    
798            return consumer;
799        }
800    
801        /** @return a MessageConsumer for the given session, destination and selector */
802        protected MessageConsumer borrowMessageConsumer(MessengerSession messengerSession, Session session, Destination destination, String selector)
803            throws JMSException {
804    
805            MessageConsumer consumer = createMessageConsumer(messengerSession, session, destination, selector);
806    
807            if (log.isDebugEnabled()) {
808                log.debug(
809                    "Created new consumer: " + consumer + " on destination: " + destination + " selector: " + selector);
810            }
811    
812            return consumer;
813        }
814    
815        /** 
816         * Returns a message consumer back to the pool. 
817         * By default this method will close message consumers though we should
818         * be able to cache then
819         */
820        protected void returnMessageConsumer(MessageConsumer messageConsumer) throws JMSException {
821            if (log.isDebugEnabled()) {
822                log.debug("Closing consumer: " + messageConsumer);
823            }
824    
825            if (messageConsumer != null) {
826                messageConsumer.close();
827            }
828        }
829    
830        /** @return a new MessageConsumer for the given session and destination */
831        protected MessageConsumer createMessageConsumer(MessengerSession messengerSession, Session session, Destination destination)
832            throws JMSException {
833            if (messengerSession.isTopic()) {
834                TopicSession topicSession = (TopicSession) session;
835                if (isDurable()) {
836                    return topicSession.createDurableSubscriber((Topic) destination, getDurableName(), null, isNoLocal());
837                }
838                else {
839                    return topicSession.createSubscriber((Topic) destination, null, isNoLocal());
840                }
841            }
842            else {
843                QueueSession queueSession = (QueueSession) session;
844                return queueSession.createReceiver((Queue) destination);
845            }
846        }
847    
848        /** @return a new MessageConsumer for the given session, destination and selector */
849        protected MessageConsumer createMessageConsumer(
850            MessengerSession messengerSession,
851            Session session,
852            Destination destination,
853            String selector)
854            throws JMSException {
855            if (messengerSession.isTopic()) {
856                TopicSession topicSession = (TopicSession) session;
857                if (isDurable()) {
858                    return topicSession.createDurableSubscriber(
859                        (Topic) destination,
860                        getDurableName(),
861                        selector,
862                        isNoLocal());
863                }
864                else {
865                    return topicSession.createSubscriber((Topic) destination, selector, isNoLocal());
866                }
867            }
868            else {
869                QueueSession queueSession = (QueueSession) session;
870                return queueSession.createReceiver((Queue) destination, selector);
871            }
872        }
873    
874        /** @return a new QueueBrowser for the given session and destination */
875        protected QueueBrowser createBrowser(MessengerSession session, Destination destination) throws JMSException {
876            if (session.isTopic()) {
877                return null;
878            }
879            else {
880                QueueSession queueSession = (QueueSession) session.getSession();
881                return queueSession.createBrowser((Queue) destination);
882            }
883        }
884    
885        protected Queue getQueue(QueueSession session, String subject) throws JMSException {
886            // XXXX: might want to cache
887            return session.createQueue(subject);
888        }
889    
890        protected Topic getTopic(TopicSession session, String subject) throws JMSException {
891            // XXXX: might want to cache
892            return session.createTopic(subject);
893        }
894    }