001 /* 002 * Copyright (C) The Apache Software Foundation. All rights reserved. 003 * 004 * This software is published under the terms of the Apache Software License 005 * version 1.1, a copy of which has been included with this distribution in 006 * the LICENSE file. 007 * 008 * $Id: MessengerSupport.java 155459 2005-02-26 13:24:44Z dirkv $ 009 */ 010 package org.apache.commons.messenger; 011 012 import java.io.Serializable; 013 import java.util.HashMap; 014 import java.util.Map; 015 016 import javax.jms.BytesMessage; 017 import javax.jms.Connection; 018 import javax.jms.ConnectionConsumer; 019 import javax.jms.ConnectionFactory; 020 import javax.jms.DeliveryMode; 021 import javax.jms.Destination; 022 import javax.jms.JMSException; 023 import javax.jms.MapMessage; 024 import javax.jms.Message; 025 import javax.jms.MessageConsumer; 026 import javax.jms.MessageListener; 027 import javax.jms.MessageProducer; 028 import javax.jms.ObjectMessage; 029 import javax.jms.Queue; 030 import javax.jms.QueueBrowser; 031 import javax.jms.QueueConnection; 032 import javax.jms.QueueSender; 033 import javax.jms.QueueSession; 034 import javax.jms.ServerSessionPool; 035 import javax.jms.Session; 036 import javax.jms.StreamMessage; 037 import javax.jms.TextMessage; 038 import javax.jms.Topic; 039 import javax.jms.TopicConnection; 040 import javax.jms.TopicPublisher; 041 import javax.jms.TopicSession; 042 043 import org.apache.commons.logging.Log; 044 import org.apache.commons.logging.LogFactory; 045 046 /** <p><code>MessengerSupport</code> is an abstract base class which implements 047 * most of the functionality of Messenger. Derivations need to specify the 048 * connection and session creation and the pooling strategy.</p> 049 * 050 * @author <a href="mailto:jstrachan@apache.org">James Strachan</a> 051 * @version $Revision: 155459 $ 052 */ 053 public abstract class MessengerSupport implements Messenger { 054 055 /** Logger */ 056 private static final Log log = LogFactory.getLog(MessengerSupport.class); 057 private static final Log destinationLog = LogFactory.getLog("org.apache.commons.messenger.destination"); 058 059 private static final boolean CACHE_REQUESTOR = true; 060 061 /** The name of the Messenger */ 062 private String name; 063 064 /** 065 * Whether Queue's and Topic's are looked up using JNDI (true) 066 * or wether they should be created on the fly 067 */ 068 private boolean jndiDestinations; 069 070 /** are topic subscribers durable? */ 071 private boolean durable; 072 073 /** the delivery mode used by default when sending messages */ 074 private int deliveryMode = Message.DEFAULT_DELIVERY_MODE; 075 076 /** the durable name used for durable topic based subscriptions */ 077 private String durableName; 078 079 /** 080 * whether local messages are ignored when topic based subscription is used 081 * with a message selector 082 */ 083 private boolean noLocal; 084 085 /** Should we cache the requestor object per thread? */ 086 private boolean cacheRequestors; 087 088 /** A Map of ListenerKey objects to MessageConsumer objects */ 089 private Map listeners = new HashMap(); 090 091 /** whether MessageProducer instances should be cached or not */ 092 private boolean cacheProducers = true; 093 094 public MessengerSupport() { 095 } 096 097 public String toString() { 098 try { 099 MessengerSession session = borrowMessengerSession(); 100 String answer = super.toString() + " session: " + session.toString(); 101 returnMessengerSession(session); 102 return answer; 103 } 104 catch (Exception e) { 105 return super.toString() + " session: " + e.toString(); 106 } 107 } 108 109 public Destination getDestination(String subject) throws JMSException { 110 MessengerSession messengerSession = borrowMessengerSession(); 111 try { 112 boolean debug = destinationLog.isInfoEnabled(); 113 Session session = messengerSession.getSession(); 114 if (messengerSession.isTopic()) { 115 if (debug) { 116 destinationLog.info("Using topic: " + subject); 117 } 118 return getTopic((TopicSession) session, subject); 119 } 120 else { 121 if (debug) { 122 destinationLog.info("Using queue: " + subject); 123 } 124 return getQueue((QueueSession) session, subject); 125 } 126 } 127 finally { 128 returnMessengerSession(messengerSession); 129 } 130 } 131 132 public Destination createTemporaryDestination() throws JMSException { 133 MessengerSession messengerSession = borrowMessengerSession(); 134 try { 135 Session session = messengerSession.getSession(); 136 if (messengerSession.isTopic()) { 137 TopicSession topicSession = (TopicSession) session; 138 return topicSession.createTemporaryTopic(); 139 } 140 else { 141 QueueSession queueSession = (QueueSession) session; 142 return queueSession.createTemporaryQueue(); 143 } 144 } 145 finally { 146 returnMessengerSession(messengerSession); 147 } 148 } 149 150 public void send(Destination destination, Message message) throws JMSException { 151 MessengerSession session = borrowMessengerSession(); 152 try { 153 MessageProducer producer = session.getMessageProducer(destination); 154 if (session.isTopic()) { 155 ((TopicPublisher) producer).publish((Topic) destination, message); 156 } 157 else { 158 ((QueueSender) producer).send((Queue) destination, message); 159 } 160 } 161 finally { 162 returnMessengerSession(session); 163 } 164 } 165 166 public Message receive(Destination destination) throws JMSException { 167 MessengerSession session = borrowMessengerSession(); 168 MessageConsumer consumer = null; 169 try { 170 consumer = borrowMessageConsumer(session, session.getSession(), destination); 171 return consumer.receive(); 172 } 173 finally { 174 returnMessageConsumer(consumer); 175 returnMessengerSession(session); 176 } 177 } 178 179 public Message receive(Destination destination, String selector) throws JMSException { 180 MessengerSession session = borrowMessengerSession(); 181 MessageConsumer consumer = null; 182 try { 183 consumer = borrowMessageConsumer(session, session.getSession(), destination, selector); 184 return consumer.receive(); 185 } 186 finally { 187 returnMessageConsumer(consumer); 188 returnMessengerSession(session); 189 } 190 } 191 192 public Message receive(Destination destination, long timeoutMillis) throws JMSException { 193 MessengerSession session = borrowMessengerSession(); 194 MessageConsumer consumer = null; 195 try { 196 consumer = borrowMessageConsumer(session, session.getSession(), destination); 197 return consumer.receive(timeoutMillis); 198 } 199 finally { 200 returnMessageConsumer(consumer); 201 returnMessengerSession(session); 202 } 203 } 204 205 public Message receive(Destination destination, String selector, long timeoutMillis) throws JMSException { 206 MessengerSession session = borrowMessengerSession(); 207 MessageConsumer consumer = null; 208 try { 209 consumer = borrowMessageConsumer(session, session.getSession(), destination, selector); 210 return consumer.receive(timeoutMillis); 211 } 212 finally { 213 returnMessageConsumer(consumer); 214 returnMessengerSession(session); 215 } 216 } 217 218 public Message receiveNoWait(Destination destination) throws JMSException { 219 MessengerSession session = borrowMessengerSession(); 220 MessageConsumer consumer = null; 221 try { 222 consumer = borrowMessageConsumer(session, session.getSession(), destination); 223 return consumer.receiveNoWait(); 224 } 225 finally { 226 returnMessageConsumer(consumer); 227 returnMessengerSession(session); 228 } 229 } 230 231 public Message receiveNoWait(Destination destination, String selector) throws JMSException { 232 MessengerSession session = borrowMessengerSession(); 233 MessageConsumer consumer = null; 234 try { 235 consumer = borrowMessageConsumer(session, session.getSession(), destination, selector); 236 return consumer.receiveNoWait(); 237 } 238 finally { 239 returnMessageConsumer(consumer); 240 returnMessengerSession(session); 241 } 242 } 243 244 public MessageConsumer createConsumer(Destination destination) throws JMSException { 245 MessengerSession session = borrowMessengerSession(); 246 try { 247 return createMessageConsumer(session, session.getSession(), destination); 248 } 249 finally { 250 returnMessengerSession(session); 251 } 252 } 253 254 public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException { 255 MessengerSession session = borrowMessengerSession(); 256 try { 257 return createMessageConsumer(session, session.getSession(), destination, selector); 258 } 259 finally { 260 returnMessengerSession(session); 261 } 262 } 263 264 public void run() { 265 // don't return sessions which throw an exception 266 try { 267 MessengerSession session = borrowMessengerSession(); 268 session.getSession().run(); 269 returnMessengerSession(session); 270 } 271 catch (JMSException e) { 272 // ### ignore 273 } 274 } 275 276 public ConnectionConsumer createConnectionConsumer( 277 Destination destination, 278 ServerSessionPool sessionPool, 279 int maxMessages) 280 throws JMSException { 281 return createConnectionConsumer(destination, null, sessionPool, maxMessages); 282 } 283 284 public ConnectionConsumer createConnectionConsumer( 285 Destination destination, 286 String selector, 287 ServerSessionPool sessionPool, 288 int maxMessages) 289 throws JMSException { 290 Connection connection = getConnection(); 291 if (isTopic(connection)) { 292 TopicConnection topicConnection = (TopicConnection) connection; 293 if (isDurable()) { 294 return topicConnection.createDurableConnectionConsumer( 295 (Topic) destination, 296 getDurableName(), 297 selector, 298 sessionPool, 299 maxMessages); 300 } 301 else { 302 return topicConnection.createConnectionConsumer( 303 (Topic) destination, 304 selector, 305 sessionPool, 306 maxMessages); 307 } 308 } 309 else { 310 QueueConnection queueConnection = (QueueConnection) connection; 311 return queueConnection.createConnectionConsumer((Queue) destination, selector, sessionPool, maxMessages); 312 } 313 } 314 315 public abstract Connection getConnection() throws JMSException; 316 317 // Listener API 318 //------------------------------------------------------------------------- 319 public void addListener(Destination destination, MessageListener listener) throws JMSException { 320 if (listener instanceof MessengerListener) { 321 MessengerListener messengerListener = (MessengerListener) listener; 322 messengerListener.setMessenger(this); 323 } 324 MessengerSession session = borrowMessengerSession(); 325 try { 326 MessageConsumer consumer = createMessageConsumer(session, session.getListenerSession(), destination); 327 consumer.setMessageListener(listener); 328 ListenerKey key = new ListenerKey(destination, listener); 329 listeners.put(key, consumer); 330 } 331 finally { 332 returnMessengerSession(session); 333 } 334 } 335 336 public void addListener(Destination destination, String selector, MessageListener listener) throws JMSException { 337 338 if (listener instanceof MessengerListener) { 339 MessengerListener messengerListener = (MessengerListener) listener; 340 messengerListener.setMessenger(this); 341 } 342 MessengerSession session = borrowMessengerSession(); 343 try { 344 MessageConsumer consumer = createMessageConsumer(session, session.getListenerSession(), destination, selector); 345 consumer.setMessageListener(listener); 346 ListenerKey key = new ListenerKey(destination, listener, selector); 347 listeners.put(key, consumer); 348 } 349 finally { 350 returnMessengerSession(session); 351 } 352 } 353 354 public void removeListener(Destination destination, MessageListener listener) throws JMSException { 355 ListenerKey key = new ListenerKey(destination, listener); 356 MessageConsumer consumer = (MessageConsumer) listeners.remove(key); 357 if (consumer == null) { 358 throw new JMSException("The given listener object has not been added for the given destination"); 359 } 360 consumer.close(); 361 } 362 363 public void removeListener(Destination destination, String selector, MessageListener listener) 364 throws JMSException { 365 366 ListenerKey key = new ListenerKey(destination, listener, selector); 367 MessageConsumer consumer = (MessageConsumer) listeners.remove(key); 368 if (consumer == null) { 369 throw new JMSException("The given listener object has not been added for the given destination and selector"); 370 } 371 consumer.close(); 372 } 373 374 // Message factory methods 375 //------------------------------------------------------------------------- 376 public BytesMessage createBytesMessage() throws JMSException { 377 MessengerSession session = borrowMessengerSession(); 378 try { 379 return session.getSession().createBytesMessage(); 380 } 381 finally { 382 returnMessengerSession(session); 383 } 384 } 385 386 public MapMessage createMapMessage() throws JMSException { 387 MessengerSession session = borrowMessengerSession(); 388 try { 389 return session.getSession().createMapMessage(); 390 } 391 finally { 392 returnMessengerSession(session); 393 } 394 } 395 396 public Message createMessage() throws JMSException { 397 MessengerSession session = borrowMessengerSession(); 398 try { 399 return session.getSession().createMessage(); 400 } 401 finally { 402 returnMessengerSession(session); 403 } 404 } 405 406 public ObjectMessage createObjectMessage() throws JMSException { 407 MessengerSession session = borrowMessengerSession(); 408 try { 409 return session.getSession().createObjectMessage(); 410 } 411 finally { 412 returnMessengerSession(session); 413 } 414 } 415 416 public ObjectMessage createObjectMessage(Serializable object) throws JMSException { 417 MessengerSession session = borrowMessengerSession(); 418 try { 419 return session.getSession().createObjectMessage(object); 420 } 421 finally { 422 returnMessengerSession(session); 423 } 424 } 425 426 public StreamMessage createStreamMessage() throws JMSException { 427 MessengerSession session = borrowMessengerSession(); 428 try { 429 return session.getSession().createStreamMessage(); 430 } 431 finally { 432 returnMessengerSession(session); 433 } 434 } 435 436 public TextMessage createTextMessage() throws JMSException { 437 MessengerSession session = borrowMessengerSession(); 438 try { 439 return session.getSession().createTextMessage(); 440 } 441 finally { 442 returnMessengerSession(session); 443 } 444 } 445 446 public TextMessage createTextMessage(String text) throws JMSException { 447 MessengerSession session = borrowMessengerSession(); 448 try { 449 return session.getSession().createTextMessage(text); 450 } 451 finally { 452 returnMessengerSession(session); 453 } 454 } 455 456 public void commit() throws JMSException { 457 MessengerSession session = borrowMessengerSession(); 458 try { 459 session.getSession().commit(); 460 } 461 finally { 462 returnMessengerSession(session); 463 } 464 } 465 466 public void rollback() throws JMSException { 467 MessengerSession session = borrowMessengerSession(); 468 try { 469 session.getSession().rollback(); 470 } 471 finally { 472 returnMessengerSession(session); 473 } 474 } 475 476 public void close() throws JMSException { 477 getSessionFactory().close(); 478 } 479 480 /** 481 * Creates a browser on the given Queue 482 */ 483 public QueueBrowser createBrowser(Destination destination) throws JMSException { 484 MessengerSession session = borrowMessengerSession(); 485 QueueBrowser browser = null; 486 try { 487 return createBrowser(session, destination); 488 } 489 finally { 490 returnMessengerSession(session); 491 } 492 } 493 494 /** Get the producer's default delivery mode. */ 495 public int getDeliveryMode(Destination destination) throws JMSException { 496 MessengerSession session = borrowMessengerSession(); 497 int deliveryMode = 0; 498 try { 499 MessageProducer producer = session.getMessageProducer(destination); 500 deliveryMode = producer.getDeliveryMode(); 501 } 502 finally { 503 returnMessengerSession(session); 504 } 505 return deliveryMode; 506 } 507 508 /** Set the producer's default delivery mode. */ 509 public void setDeliveryMode(Destination destination, int deliveryMode) throws JMSException { 510 MessengerSession session = borrowMessengerSession(); 511 MessageProducer producer = null; 512 try { 513 producer = session.getMessageProducer(destination); 514 producer.setDeliveryMode(deliveryMode); 515 } 516 finally { 517 returnMessengerSession(session); 518 } 519 } 520 521 /** Get the producer's default priority. */ 522 public int getPriority(Destination destination) throws JMSException { 523 MessengerSession session = borrowMessengerSession(); 524 MessageProducer producer = null; 525 int priority = 0; 526 try { 527 producer = session.getMessageProducer(destination); 528 priority = producer.getPriority(); 529 } 530 finally { 531 532 returnMessengerSession(session); 533 } 534 return priority; 535 } 536 537 /** Set the producer's default priority. */ 538 public void setPriority(Destination destination, int priority) throws JMSException { 539 MessengerSession session = borrowMessengerSession(); 540 MessageProducer producer = null; 541 try { 542 producer = session.getMessageProducer(destination); 543 producer.setPriority(priority); 544 } 545 finally { 546 returnMessengerSession(session); 547 } 548 } 549 550 /** Get the producer's default delivery mode. */ 551 public long getTimeToLive(Destination destination) throws JMSException { 552 MessengerSession session = borrowMessengerSession(); 553 long timeToLive = 0; 554 try { 555 MessageProducer producer = session.getMessageProducer(destination); 556 timeToLive = producer.getTimeToLive(); 557 } 558 finally { 559 returnMessengerSession(session); 560 } 561 return timeToLive; 562 } 563 564 /** <p>Set the default length of time in milliseconds from its dispatch time that 565 * a produced message should be retained by the message system.</p> 566 */ 567 public void setTimeToLive(Destination destination, long timeToLive) throws JMSException { 568 MessengerSession session = borrowMessengerSession(); 569 try { 570 MessageProducer producer = session.getMessageProducer(destination); 571 producer.setTimeToLive(timeToLive); 572 } 573 finally { 574 returnMessengerSession(session); 575 } 576 } 577 578 /** Get an indication of whether message timestamps are disabled. */ 579 public boolean getDisableMessageTimestamp(Destination destination) throws JMSException { 580 MessengerSession session = borrowMessengerSession(); 581 boolean value = false; 582 try { 583 MessageProducer producer = session.getMessageProducer(destination); 584 value = producer.getDisableMessageTimestamp(); 585 } 586 finally { 587 returnMessengerSession(session); 588 } 589 return value; 590 } 591 592 /** Set whether message timestamps are disabled. */ 593 public void setDisableMessageTimestamp(Destination destination, boolean value) throws JMSException { 594 MessengerSession session = borrowMessengerSession(); 595 try { 596 MessageProducer producer = session.getMessageProducer(destination); 597 producer.setDisableMessageTimestamp(value); 598 } 599 finally { 600 returnMessengerSession(session); 601 } 602 } 603 604 /** Extends the send capability to send by specifying additional options. */ 605 public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) 606 throws JMSException { 607 MessengerSession session = borrowMessengerSession(); 608 try { 609 MessageProducer producer = session.getMessageProducer(destination); 610 if (session.isTopic()) { 611 ((TopicPublisher) producer).publish((Topic) destination, message, deliveryMode, priority, timeToLive); 612 } 613 else { 614 ((QueueSender) producer).send((Queue) destination, message, deliveryMode, priority, timeToLive); 615 } 616 } 617 finally { 618 returnMessengerSession(session); 619 } 620 } 621 622 /** Get an indication of whether message IDs are disabled. */ 623 public boolean getDisableMessageID(Destination destination) throws JMSException { 624 MessengerSession session = borrowMessengerSession(); 625 boolean value = false; 626 try { 627 MessageProducer producer = session.getMessageProducer(destination); 628 value = producer.getDisableMessageID(); 629 } 630 finally { 631 returnMessengerSession(session); 632 } 633 return value; 634 } 635 636 /** Set whether message IDs are disabled. */ 637 public void setDisableMessageID(Destination destination, boolean value) throws JMSException { 638 MessengerSession session = borrowMessengerSession(); 639 try { 640 MessageProducer producer = session.getMessageProducer(destination); 641 producer.setDisableMessageID(value); 642 } 643 finally { 644 returnMessengerSession(session); 645 } 646 } 647 648 // Properties 649 //------------------------------------------------------------------------- 650 /** Gets the name that this Messenger is called in a MessengerManager */ 651 public String getName() { 652 return name; 653 } 654 655 /** Sets the name that this Messenger is called in a MessengerManager */ 656 public void setName(String name) { 657 this.name = name; 658 } 659 660 /** Setter for jndiDestinations */ 661 public void setJndiDestinations(boolean jndiDestinations) { 662 this.jndiDestinations = jndiDestinations; 663 } 664 665 /** Getter for jndiDestinations */ 666 public boolean isJndiDestinations() { 667 return jndiDestinations; 668 } 669 670 /** Gets whether topic subscribers are durable or not */ 671 public boolean isDurable() { 672 return durable; 673 } 674 675 /** Sets whether topic subscribers are durable or not */ 676 public void setDurable(boolean durable) { 677 this.durable = durable; 678 } 679 680 /** Gets whether we should cache the requestor object per thread? */ 681 public boolean isCacheRequestors() { 682 return cacheRequestors; 683 } 684 685 /** Sets whether we should cache the requestor object per thread? */ 686 public void setCacheRequestors(boolean cacheRequestors) { 687 this.cacheRequestors = cacheRequestors; 688 } 689 690 /** @return the durable name used for durable topic based subscriptions */ 691 public String getDurableName() { 692 return durableName; 693 } 694 695 /** Sets the durable name used for durable topic based subscriptions */ 696 public void setDurableName(String durableName) { 697 this.durableName = durableName; 698 } 699 700 /** 701 * Gets whether local messages are ignored when topic based subscription is used 702 * with a message selector 703 */ 704 public boolean isNoLocal() { 705 return noLocal; 706 } 707 708 /** 709 * Sets whether local messages are ignored when topic based subscription is used 710 * with a message selector 711 */ 712 public void setNoLocal(boolean noLocal) { 713 this.noLocal = noLocal; 714 } 715 716 /** Gets whether MessageProducer instances should be cached or not, which defaults to true */ 717 public boolean isCacheProducers() { 718 return cacheProducers; 719 } 720 721 /** Sets whether MessageProducer instances should be cached or not, which defaults to true */ 722 public void setCacheProducers(boolean cacheProducers) { 723 this.cacheProducers = cacheProducers; 724 } 725 726 /** 727 * Returns the delivery mode used on messages sent via this Messenger 728 * @return int 729 */ 730 public int getDeliveryMode() { 731 return deliveryMode; 732 } 733 734 /** 735 * Sets the delivery mode used on messages sent via this Messenger 736 * @param deliveryMode The deliveryMode to set 737 */ 738 public void setDeliveryMode(int deliveryMode) { 739 this.deliveryMode = deliveryMode; 740 } 741 742 /** 743 * Sets whether message delivery should be persistent or not 744 * 745 * @param persistentDelivery 746 */ 747 public void setPersistentDelivery(boolean persistentDelivery) { 748 if (persistentDelivery) { 749 setDeliveryMode(DeliveryMode.PERSISTENT); 750 } 751 else { 752 setDeliveryMode(DeliveryMode.NON_PERSISTENT); 753 } 754 } 755 756 // Implementation methods 757 //------------------------------------------------------------------------- 758 759 protected abstract MessengerSession borrowMessengerSession() throws JMSException; 760 761 protected abstract void returnMessengerSession(MessengerSession session); 762 763 protected abstract boolean isTopic(Connection connection) throws JMSException; 764 765 protected abstract boolean isTopic(ConnectionFactory factory) throws JMSException; 766 767 /** @return a newly created message producer for the given session and destination */ 768 protected MessageProducer createMessageProducer(MessengerSession messengerSession, Destination destination) throws JMSException { 769 770 MessageProducer answer = null; 771 Session session = messengerSession.getSession(); 772 if (messengerSession.isTopic()) { 773 TopicSession topicSession = (TopicSession) session; 774 answer = topicSession.createPublisher((Topic) destination); 775 } 776 else { 777 QueueSession queueSession = (QueueSession) session; 778 answer = queueSession.createSender((Queue) destination); 779 } 780 781 // configure the MessageProducer 782 if (deliveryMode != Message.DEFAULT_DELIVERY_MODE) { 783 answer.setDeliveryMode(deliveryMode); 784 } 785 return answer; 786 } 787 788 /** @return a MessageConsumer for the given session and destination */ 789 protected MessageConsumer borrowMessageConsumer(MessengerSession messengerSession, Session session, Destination destination) 790 throws JMSException { 791 792 MessageConsumer consumer = createMessageConsumer(messengerSession, session, destination); 793 794 if (log.isDebugEnabled()) { 795 log.debug("Created new consumer: " + consumer + " on destination: " + destination); 796 } 797 798 return consumer; 799 } 800 801 /** @return a MessageConsumer for the given session, destination and selector */ 802 protected MessageConsumer borrowMessageConsumer(MessengerSession messengerSession, Session session, Destination destination, String selector) 803 throws JMSException { 804 805 MessageConsumer consumer = createMessageConsumer(messengerSession, session, destination, selector); 806 807 if (log.isDebugEnabled()) { 808 log.debug( 809 "Created new consumer: " + consumer + " on destination: " + destination + " selector: " + selector); 810 } 811 812 return consumer; 813 } 814 815 /** 816 * Returns a message consumer back to the pool. 817 * By default this method will close message consumers though we should 818 * be able to cache then 819 */ 820 protected void returnMessageConsumer(MessageConsumer messageConsumer) throws JMSException { 821 if (log.isDebugEnabled()) { 822 log.debug("Closing consumer: " + messageConsumer); 823 } 824 825 if (messageConsumer != null) { 826 messageConsumer.close(); 827 } 828 } 829 830 /** @return a new MessageConsumer for the given session and destination */ 831 protected MessageConsumer createMessageConsumer(MessengerSession messengerSession, Session session, Destination destination) 832 throws JMSException { 833 if (messengerSession.isTopic()) { 834 TopicSession topicSession = (TopicSession) session; 835 if (isDurable()) { 836 return topicSession.createDurableSubscriber((Topic) destination, getDurableName(), null, isNoLocal()); 837 } 838 else { 839 return topicSession.createSubscriber((Topic) destination, null, isNoLocal()); 840 } 841 } 842 else { 843 QueueSession queueSession = (QueueSession) session; 844 return queueSession.createReceiver((Queue) destination); 845 } 846 } 847 848 /** @return a new MessageConsumer for the given session, destination and selector */ 849 protected MessageConsumer createMessageConsumer( 850 MessengerSession messengerSession, 851 Session session, 852 Destination destination, 853 String selector) 854 throws JMSException { 855 if (messengerSession.isTopic()) { 856 TopicSession topicSession = (TopicSession) session; 857 if (isDurable()) { 858 return topicSession.createDurableSubscriber( 859 (Topic) destination, 860 getDurableName(), 861 selector, 862 isNoLocal()); 863 } 864 else { 865 return topicSession.createSubscriber((Topic) destination, selector, isNoLocal()); 866 } 867 } 868 else { 869 QueueSession queueSession = (QueueSession) session; 870 return queueSession.createReceiver((Queue) destination, selector); 871 } 872 } 873 874 /** @return a new QueueBrowser for the given session and destination */ 875 protected QueueBrowser createBrowser(MessengerSession session, Destination destination) throws JMSException { 876 if (session.isTopic()) { 877 return null; 878 } 879 else { 880 QueueSession queueSession = (QueueSession) session.getSession(); 881 return queueSession.createBrowser((Queue) destination); 882 } 883 } 884 885 protected Queue getQueue(QueueSession session, String subject) throws JMSException { 886 // XXXX: might want to cache 887 return session.createQueue(subject); 888 } 889 890 protected Topic getTopic(TopicSession session, String subject) throws JMSException { 891 // XXXX: might want to cache 892 return session.createTopic(subject); 893 } 894 }