1
2
3
4
5
6
7
8
9
10 package org.apache.commons.messenger;
11
12 import java.io.Serializable;
13 import java.util.HashMap;
14 import java.util.Map;
15
16 import javax.jms.BytesMessage;
17 import javax.jms.Connection;
18 import javax.jms.ConnectionConsumer;
19 import javax.jms.ConnectionFactory;
20 import javax.jms.DeliveryMode;
21 import javax.jms.Destination;
22 import javax.jms.JMSException;
23 import javax.jms.MapMessage;
24 import javax.jms.Message;
25 import javax.jms.MessageConsumer;
26 import javax.jms.MessageListener;
27 import javax.jms.MessageProducer;
28 import javax.jms.ObjectMessage;
29 import javax.jms.Queue;
30 import javax.jms.QueueBrowser;
31 import javax.jms.QueueConnection;
32 import javax.jms.QueueSender;
33 import javax.jms.QueueSession;
34 import javax.jms.ServerSessionPool;
35 import javax.jms.Session;
36 import javax.jms.StreamMessage;
37 import javax.jms.TextMessage;
38 import javax.jms.Topic;
39 import javax.jms.TopicConnection;
40 import javax.jms.TopicPublisher;
41 import javax.jms.TopicSession;
42
43 import org.apache.commons.logging.Log;
44 import org.apache.commons.logging.LogFactory;
45
46
47
48
49
50
51
52
53 public abstract class MessengerSupport implements Messenger {
54
55
56 private static final Log log = LogFactory.getLog(MessengerSupport.class);
57 private static final Log destinationLog = LogFactory.getLog("org.apache.commons.messenger.destination");
58
59 private static final boolean CACHE_REQUESTOR = true;
60
61
62 private String name;
63
64
65
66
67
68 private boolean jndiDestinations;
69
70
71 private boolean durable;
72
73
74 private int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
75
76
77 private String durableName;
78
79
80
81
82
83 private boolean noLocal;
84
85
86 private boolean cacheRequestors;
87
88
89 private Map listeners = new HashMap();
90
91
92 private boolean cacheProducers = true;
93
94 public MessengerSupport() {
95 }
96
97 public String toString() {
98 try {
99 MessengerSession session = borrowMessengerSession();
100 String answer = super.toString() + " session: " + session.toString();
101 returnMessengerSession(session);
102 return answer;
103 }
104 catch (Exception e) {
105 return super.toString() + " session: " + e.toString();
106 }
107 }
108
109 public Destination getDestination(String subject) throws JMSException {
110 MessengerSession messengerSession = borrowMessengerSession();
111 try {
112 boolean debug = destinationLog.isInfoEnabled();
113 Session session = messengerSession.getSession();
114 if (messengerSession.isTopic()) {
115 if (debug) {
116 destinationLog.info("Using topic: " + subject);
117 }
118 return getTopic((TopicSession) session, subject);
119 }
120 else {
121 if (debug) {
122 destinationLog.info("Using queue: " + subject);
123 }
124 return getQueue((QueueSession) session, subject);
125 }
126 }
127 finally {
128 returnMessengerSession(messengerSession);
129 }
130 }
131
132 public Destination createTemporaryDestination() throws JMSException {
133 MessengerSession messengerSession = borrowMessengerSession();
134 try {
135 Session session = messengerSession.getSession();
136 if (messengerSession.isTopic()) {
137 TopicSession topicSession = (TopicSession) session;
138 return topicSession.createTemporaryTopic();
139 }
140 else {
141 QueueSession queueSession = (QueueSession) session;
142 return queueSession.createTemporaryQueue();
143 }
144 }
145 finally {
146 returnMessengerSession(messengerSession);
147 }
148 }
149
150 public void send(Destination destination, Message message) throws JMSException {
151 MessengerSession session = borrowMessengerSession();
152 try {
153 MessageProducer producer = session.getMessageProducer(destination);
154 if (session.isTopic()) {
155 ((TopicPublisher) producer).publish((Topic) destination, message);
156 }
157 else {
158 ((QueueSender) producer).send((Queue) destination, message);
159 }
160 }
161 finally {
162 returnMessengerSession(session);
163 }
164 }
165
166 public Message receive(Destination destination) throws JMSException {
167 MessengerSession session = borrowMessengerSession();
168 MessageConsumer consumer = null;
169 try {
170 consumer = borrowMessageConsumer(session, session.getSession(), destination);
171 return consumer.receive();
172 }
173 finally {
174 returnMessageConsumer(consumer);
175 returnMessengerSession(session);
176 }
177 }
178
179 public Message receive(Destination destination, String selector) throws JMSException {
180 MessengerSession session = borrowMessengerSession();
181 MessageConsumer consumer = null;
182 try {
183 consumer = borrowMessageConsumer(session, session.getSession(), destination, selector);
184 return consumer.receive();
185 }
186 finally {
187 returnMessageConsumer(consumer);
188 returnMessengerSession(session);
189 }
190 }
191
192 public Message receive(Destination destination, long timeoutMillis) throws JMSException {
193 MessengerSession session = borrowMessengerSession();
194 MessageConsumer consumer = null;
195 try {
196 consumer = borrowMessageConsumer(session, session.getSession(), destination);
197 return consumer.receive(timeoutMillis);
198 }
199 finally {
200 returnMessageConsumer(consumer);
201 returnMessengerSession(session);
202 }
203 }
204
205 public Message receive(Destination destination, String selector, long timeoutMillis) throws JMSException {
206 MessengerSession session = borrowMessengerSession();
207 MessageConsumer consumer = null;
208 try {
209 consumer = borrowMessageConsumer(session, session.getSession(), destination, selector);
210 return consumer.receive(timeoutMillis);
211 }
212 finally {
213 returnMessageConsumer(consumer);
214 returnMessengerSession(session);
215 }
216 }
217
218 public Message receiveNoWait(Destination destination) throws JMSException {
219 MessengerSession session = borrowMessengerSession();
220 MessageConsumer consumer = null;
221 try {
222 consumer = borrowMessageConsumer(session, session.getSession(), destination);
223 return consumer.receiveNoWait();
224 }
225 finally {
226 returnMessageConsumer(consumer);
227 returnMessengerSession(session);
228 }
229 }
230
231 public Message receiveNoWait(Destination destination, String selector) throws JMSException {
232 MessengerSession session = borrowMessengerSession();
233 MessageConsumer consumer = null;
234 try {
235 consumer = borrowMessageConsumer(session, session.getSession(), destination, selector);
236 return consumer.receiveNoWait();
237 }
238 finally {
239 returnMessageConsumer(consumer);
240 returnMessengerSession(session);
241 }
242 }
243
244 public MessageConsumer createConsumer(Destination destination) throws JMSException {
245 MessengerSession session = borrowMessengerSession();
246 try {
247 return createMessageConsumer(session, session.getSession(), destination);
248 }
249 finally {
250 returnMessengerSession(session);
251 }
252 }
253
254 public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
255 MessengerSession session = borrowMessengerSession();
256 try {
257 return createMessageConsumer(session, session.getSession(), destination, selector);
258 }
259 finally {
260 returnMessengerSession(session);
261 }
262 }
263
264 public void run() {
265
266 try {
267 MessengerSession session = borrowMessengerSession();
268 session.getSession().run();
269 returnMessengerSession(session);
270 }
271 catch (JMSException e) {
272
273 }
274 }
275
276 public ConnectionConsumer createConnectionConsumer(
277 Destination destination,
278 ServerSessionPool sessionPool,
279 int maxMessages)
280 throws JMSException {
281 return createConnectionConsumer(destination, null, sessionPool, maxMessages);
282 }
283
284 public ConnectionConsumer createConnectionConsumer(
285 Destination destination,
286 String selector,
287 ServerSessionPool sessionPool,
288 int maxMessages)
289 throws JMSException {
290 Connection connection = getConnection();
291 if (isTopic(connection)) {
292 TopicConnection topicConnection = (TopicConnection) connection;
293 if (isDurable()) {
294 return topicConnection.createDurableConnectionConsumer(
295 (Topic) destination,
296 getDurableName(),
297 selector,
298 sessionPool,
299 maxMessages);
300 }
301 else {
302 return topicConnection.createConnectionConsumer(
303 (Topic) destination,
304 selector,
305 sessionPool,
306 maxMessages);
307 }
308 }
309 else {
310 QueueConnection queueConnection = (QueueConnection) connection;
311 return queueConnection.createConnectionConsumer((Queue) destination, selector, sessionPool, maxMessages);
312 }
313 }
314
315 public abstract Connection getConnection() throws JMSException;
316
317
318
319 public void addListener(Destination destination, MessageListener listener) throws JMSException {
320 if (listener instanceof MessengerListener) {
321 MessengerListener messengerListener = (MessengerListener) listener;
322 messengerListener.setMessenger(this);
323 }
324 MessengerSession session = borrowMessengerSession();
325 try {
326 MessageConsumer consumer = createMessageConsumer(session, session.getListenerSession(), destination);
327 consumer.setMessageListener(listener);
328 ListenerKey key = new ListenerKey(destination, listener);
329 listeners.put(key, consumer);
330 }
331 finally {
332 returnMessengerSession(session);
333 }
334 }
335
336 public void addListener(Destination destination, String selector, MessageListener listener) throws JMSException {
337
338 if (listener instanceof MessengerListener) {
339 MessengerListener messengerListener = (MessengerListener) listener;
340 messengerListener.setMessenger(this);
341 }
342 MessengerSession session = borrowMessengerSession();
343 try {
344 MessageConsumer consumer = createMessageConsumer(session, session.getListenerSession(), destination, selector);
345 consumer.setMessageListener(listener);
346 ListenerKey key = new ListenerKey(destination, listener, selector);
347 listeners.put(key, consumer);
348 }
349 finally {
350 returnMessengerSession(session);
351 }
352 }
353
354 public void removeListener(Destination destination, MessageListener listener) throws JMSException {
355 ListenerKey key = new ListenerKey(destination, listener);
356 MessageConsumer consumer = (MessageConsumer) listeners.remove(key);
357 if (consumer == null) {
358 throw new JMSException("The given listener object has not been added for the given destination");
359 }
360 consumer.close();
361 }
362
363 public void removeListener(Destination destination, String selector, MessageListener listener)
364 throws JMSException {
365
366 ListenerKey key = new ListenerKey(destination, listener, selector);
367 MessageConsumer consumer = (MessageConsumer) listeners.remove(key);
368 if (consumer == null) {
369 throw new JMSException("The given listener object has not been added for the given destination and selector");
370 }
371 consumer.close();
372 }
373
374
375
376 public BytesMessage createBytesMessage() throws JMSException {
377 MessengerSession session = borrowMessengerSession();
378 try {
379 return session.getSession().createBytesMessage();
380 }
381 finally {
382 returnMessengerSession(session);
383 }
384 }
385
386 public MapMessage createMapMessage() throws JMSException {
387 MessengerSession session = borrowMessengerSession();
388 try {
389 return session.getSession().createMapMessage();
390 }
391 finally {
392 returnMessengerSession(session);
393 }
394 }
395
396 public Message createMessage() throws JMSException {
397 MessengerSession session = borrowMessengerSession();
398 try {
399 return session.getSession().createMessage();
400 }
401 finally {
402 returnMessengerSession(session);
403 }
404 }
405
406 public ObjectMessage createObjectMessage() throws JMSException {
407 MessengerSession session = borrowMessengerSession();
408 try {
409 return session.getSession().createObjectMessage();
410 }
411 finally {
412 returnMessengerSession(session);
413 }
414 }
415
416 public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
417 MessengerSession session = borrowMessengerSession();
418 try {
419 return session.getSession().createObjectMessage(object);
420 }
421 finally {
422 returnMessengerSession(session);
423 }
424 }
425
426 public StreamMessage createStreamMessage() throws JMSException {
427 MessengerSession session = borrowMessengerSession();
428 try {
429 return session.getSession().createStreamMessage();
430 }
431 finally {
432 returnMessengerSession(session);
433 }
434 }
435
436 public TextMessage createTextMessage() throws JMSException {
437 MessengerSession session = borrowMessengerSession();
438 try {
439 return session.getSession().createTextMessage();
440 }
441 finally {
442 returnMessengerSession(session);
443 }
444 }
445
446 public TextMessage createTextMessage(String text) throws JMSException {
447 MessengerSession session = borrowMessengerSession();
448 try {
449 return session.getSession().createTextMessage(text);
450 }
451 finally {
452 returnMessengerSession(session);
453 }
454 }
455
456 public void commit() throws JMSException {
457 MessengerSession session = borrowMessengerSession();
458 try {
459 session.getSession().commit();
460 }
461 finally {
462 returnMessengerSession(session);
463 }
464 }
465
466 public void rollback() throws JMSException {
467 MessengerSession session = borrowMessengerSession();
468 try {
469 session.getSession().rollback();
470 }
471 finally {
472 returnMessengerSession(session);
473 }
474 }
475
476 public void close() throws JMSException {
477 getSessionFactory().close();
478 }
479
480
481
482
483 public QueueBrowser createBrowser(Destination destination) throws JMSException {
484 MessengerSession session = borrowMessengerSession();
485 QueueBrowser browser = null;
486 try {
487 return createBrowser(session, destination);
488 }
489 finally {
490 returnMessengerSession(session);
491 }
492 }
493
494
495 public int getDeliveryMode(Destination destination) throws JMSException {
496 MessengerSession session = borrowMessengerSession();
497 int deliveryMode = 0;
498 try {
499 MessageProducer producer = session.getMessageProducer(destination);
500 deliveryMode = producer.getDeliveryMode();
501 }
502 finally {
503 returnMessengerSession(session);
504 }
505 return deliveryMode;
506 }
507
508
509 public void setDeliveryMode(Destination destination, int deliveryMode) throws JMSException {
510 MessengerSession session = borrowMessengerSession();
511 MessageProducer producer = null;
512 try {
513 producer = session.getMessageProducer(destination);
514 producer.setDeliveryMode(deliveryMode);
515 }
516 finally {
517 returnMessengerSession(session);
518 }
519 }
520
521
522 public int getPriority(Destination destination) throws JMSException {
523 MessengerSession session = borrowMessengerSession();
524 MessageProducer producer = null;
525 int priority = 0;
526 try {
527 producer = session.getMessageProducer(destination);
528 priority = producer.getPriority();
529 }
530 finally {
531
532 returnMessengerSession(session);
533 }
534 return priority;
535 }
536
537
538 public void setPriority(Destination destination, int priority) throws JMSException {
539 MessengerSession session = borrowMessengerSession();
540 MessageProducer producer = null;
541 try {
542 producer = session.getMessageProducer(destination);
543 producer.setPriority(priority);
544 }
545 finally {
546 returnMessengerSession(session);
547 }
548 }
549
550
551 public long getTimeToLive(Destination destination) throws JMSException {
552 MessengerSession session = borrowMessengerSession();
553 long timeToLive = 0;
554 try {
555 MessageProducer producer = session.getMessageProducer(destination);
556 timeToLive = producer.getTimeToLive();
557 }
558 finally {
559 returnMessengerSession(session);
560 }
561 return timeToLive;
562 }
563
564
565
566
567 public void setTimeToLive(Destination destination, long timeToLive) throws JMSException {
568 MessengerSession session = borrowMessengerSession();
569 try {
570 MessageProducer producer = session.getMessageProducer(destination);
571 producer.setTimeToLive(timeToLive);
572 }
573 finally {
574 returnMessengerSession(session);
575 }
576 }
577
578
579 public boolean getDisableMessageTimestamp(Destination destination) throws JMSException {
580 MessengerSession session = borrowMessengerSession();
581 boolean value = false;
582 try {
583 MessageProducer producer = session.getMessageProducer(destination);
584 value = producer.getDisableMessageTimestamp();
585 }
586 finally {
587 returnMessengerSession(session);
588 }
589 return value;
590 }
591
592
593 public void setDisableMessageTimestamp(Destination destination, boolean value) throws JMSException {
594 MessengerSession session = borrowMessengerSession();
595 try {
596 MessageProducer producer = session.getMessageProducer(destination);
597 producer.setDisableMessageTimestamp(value);
598 }
599 finally {
600 returnMessengerSession(session);
601 }
602 }
603
604
605 public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
606 throws JMSException {
607 MessengerSession session = borrowMessengerSession();
608 try {
609 MessageProducer producer = session.getMessageProducer(destination);
610 if (session.isTopic()) {
611 ((TopicPublisher) producer).publish((Topic) destination, message, deliveryMode, priority, timeToLive);
612 }
613 else {
614 ((QueueSender) producer).send((Queue) destination, message, deliveryMode, priority, timeToLive);
615 }
616 }
617 finally {
618 returnMessengerSession(session);
619 }
620 }
621
622
623 public boolean getDisableMessageID(Destination destination) throws JMSException {
624 MessengerSession session = borrowMessengerSession();
625 boolean value = false;
626 try {
627 MessageProducer producer = session.getMessageProducer(destination);
628 value = producer.getDisableMessageID();
629 }
630 finally {
631 returnMessengerSession(session);
632 }
633 return value;
634 }
635
636
637 public void setDisableMessageID(Destination destination, boolean value) throws JMSException {
638 MessengerSession session = borrowMessengerSession();
639 try {
640 MessageProducer producer = session.getMessageProducer(destination);
641 producer.setDisableMessageID(value);
642 }
643 finally {
644 returnMessengerSession(session);
645 }
646 }
647
648
649
650
651 public String getName() {
652 return name;
653 }
654
655
656 public void setName(String name) {
657 this.name = name;
658 }
659
660
661 public void setJndiDestinations(boolean jndiDestinations) {
662 this.jndiDestinations = jndiDestinations;
663 }
664
665
666 public boolean isJndiDestinations() {
667 return jndiDestinations;
668 }
669
670
671 public boolean isDurable() {
672 return durable;
673 }
674
675
676 public void setDurable(boolean durable) {
677 this.durable = durable;
678 }
679
680
681 public boolean isCacheRequestors() {
682 return cacheRequestors;
683 }
684
685
686 public void setCacheRequestors(boolean cacheRequestors) {
687 this.cacheRequestors = cacheRequestors;
688 }
689
690
691 public String getDurableName() {
692 return durableName;
693 }
694
695
696 public void setDurableName(String durableName) {
697 this.durableName = durableName;
698 }
699
700
701
702
703
704 public boolean isNoLocal() {
705 return noLocal;
706 }
707
708
709
710
711
712 public void setNoLocal(boolean noLocal) {
713 this.noLocal = noLocal;
714 }
715
716
717 public boolean isCacheProducers() {
718 return cacheProducers;
719 }
720
721
722 public void setCacheProducers(boolean cacheProducers) {
723 this.cacheProducers = cacheProducers;
724 }
725
726
727
728
729
730 public int getDeliveryMode() {
731 return deliveryMode;
732 }
733
734
735
736
737
738 public void setDeliveryMode(int deliveryMode) {
739 this.deliveryMode = deliveryMode;
740 }
741
742
743
744
745
746
747 public void setPersistentDelivery(boolean persistentDelivery) {
748 if (persistentDelivery) {
749 setDeliveryMode(DeliveryMode.PERSISTENT);
750 }
751 else {
752 setDeliveryMode(DeliveryMode.NON_PERSISTENT);
753 }
754 }
755
756
757
758
759 protected abstract MessengerSession borrowMessengerSession() throws JMSException;
760
761 protected abstract void returnMessengerSession(MessengerSession session);
762
763 protected abstract boolean isTopic(Connection connection) throws JMSException;
764
765 protected abstract boolean isTopic(ConnectionFactory factory) throws JMSException;
766
767
768 protected MessageProducer createMessageProducer(MessengerSession messengerSession, Destination destination) throws JMSException {
769
770 MessageProducer answer = null;
771 Session session = messengerSession.getSession();
772 if (messengerSession.isTopic()) {
773 TopicSession topicSession = (TopicSession) session;
774 answer = topicSession.createPublisher((Topic) destination);
775 }
776 else {
777 QueueSession queueSession = (QueueSession) session;
778 answer = queueSession.createSender((Queue) destination);
779 }
780
781
782 if (deliveryMode != Message.DEFAULT_DELIVERY_MODE) {
783 answer.setDeliveryMode(deliveryMode);
784 }
785 return answer;
786 }
787
788
789 protected MessageConsumer borrowMessageConsumer(MessengerSession messengerSession, Session session, Destination destination)
790 throws JMSException {
791
792 MessageConsumer consumer = createMessageConsumer(messengerSession, session, destination);
793
794 if (log.isDebugEnabled()) {
795 log.debug("Created new consumer: " + consumer + " on destination: " + destination);
796 }
797
798 return consumer;
799 }
800
801
802 protected MessageConsumer borrowMessageConsumer(MessengerSession messengerSession, Session session, Destination destination, String selector)
803 throws JMSException {
804
805 MessageConsumer consumer = createMessageConsumer(messengerSession, session, destination, selector);
806
807 if (log.isDebugEnabled()) {
808 log.debug(
809 "Created new consumer: " + consumer + " on destination: " + destination + " selector: " + selector);
810 }
811
812 return consumer;
813 }
814
815
816
817
818
819
820 protected void returnMessageConsumer(MessageConsumer messageConsumer) throws JMSException {
821 if (log.isDebugEnabled()) {
822 log.debug("Closing consumer: " + messageConsumer);
823 }
824
825 if (messageConsumer != null) {
826 messageConsumer.close();
827 }
828 }
829
830
831 protected MessageConsumer createMessageConsumer(MessengerSession messengerSession, Session session, Destination destination)
832 throws JMSException {
833 if (messengerSession.isTopic()) {
834 TopicSession topicSession = (TopicSession) session;
835 if (isDurable()) {
836 return topicSession.createDurableSubscriber((Topic) destination, getDurableName(), null, isNoLocal());
837 }
838 else {
839 return topicSession.createSubscriber((Topic) destination, null, isNoLocal());
840 }
841 }
842 else {
843 QueueSession queueSession = (QueueSession) session;
844 return queueSession.createReceiver((Queue) destination);
845 }
846 }
847
848
849 protected MessageConsumer createMessageConsumer(
850 MessengerSession messengerSession,
851 Session session,
852 Destination destination,
853 String selector)
854 throws JMSException {
855 if (messengerSession.isTopic()) {
856 TopicSession topicSession = (TopicSession) session;
857 if (isDurable()) {
858 return topicSession.createDurableSubscriber(
859 (Topic) destination,
860 getDurableName(),
861 selector,
862 isNoLocal());
863 }
864 else {
865 return topicSession.createSubscriber((Topic) destination, selector, isNoLocal());
866 }
867 }
868 else {
869 QueueSession queueSession = (QueueSession) session;
870 return queueSession.createReceiver((Queue) destination, selector);
871 }
872 }
873
874
875 protected QueueBrowser createBrowser(MessengerSession session, Destination destination) throws JMSException {
876 if (session.isTopic()) {
877 return null;
878 }
879 else {
880 QueueSession queueSession = (QueueSession) session.getSession();
881 return queueSession.createBrowser((Queue) destination);
882 }
883 }
884
885 protected Queue getQueue(QueueSession session, String subject) throws JMSException {
886
887 return session.createQueue(subject);
888 }
889
890 protected Topic getTopic(TopicSession session, String subject) throws JMSException {
891
892 return session.createTopic(subject);
893 }
894 }