View Javadoc

1   /*
2    * Copyright (C) The Apache Software Foundation. All rights reserved.
3    *
4    * This software is published under the terms of the Apache Software License
5    * version 1.1, a copy of which has been included with this distribution in
6    * the LICENSE file.
7    * 
8    * $Id: ConsumerThread.java 155459 2005-02-26 13:24:44Z dirkv $
9    */
10  package org.apache.commons.messagelet;
11  
12  import javax.jms.Destination;
13  import javax.jms.JMSException;
14  import javax.jms.Message;
15  import javax.jms.MessageConsumer;
16  import javax.jms.MessageListener;
17  
18  import org.apache.commons.logging.Log;
19  import org.apache.commons.logging.LogFactory;
20  import org.apache.commons.messenger.Messenger;
21  
22  /** 
23   * <p><code>ConsumerThread</code> is a thread which will repeatedly consume JMS messages
24   * using a receive() method on Messenger and then process the message.
25   * This class is a good base class when implementing some kind of transactional processing of 
26   * JMS messages
27   *
28   * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
29   * @version $Revision: 155459 $
30   */
31  public class ConsumerThread extends Thread {
32  
33      /** Logger */
34      private static final Log log = LogFactory.getLog(ConsumerThread.class);
35  
36  
37      private MessageConsumer consumer;
38      private Messenger messenger;
39      private Destination destination;
40      private String selector;
41      private MessageListener listener;
42      private boolean shouldStop;
43      
44      public ConsumerThread() {
45          setName("Consumer" + getName());
46      }
47  
48  
49      /**
50       * Starts all the JMS connections and consumes JMS messages, 
51       * passing them onto the MessageListener and Message Driven Objects
52       */
53      public void run() {
54          if (log.isDebugEnabled()) {
55              log.debug( "Starting consumer thread: " + getName());
56          }
57          try {
58              startConsumer();
59          }
60          catch (JMSException e) {
61              log.error("Failed to start consumer thread: " + e, e);
62              setShouldStop(true);
63          }
64          
65          while (! isShouldStop()) {
66          	try {
67  	            startTransaction();
68          	}
69          	catch (Exception e) {
70          		log.error("Caught exception trying to start transaction. This thread will terminate. Reason: " + e, e);
71          		break;
72          	}
73  
74              try {
75                  Message message = receive();
76  
77                  if (log.isTraceEnabled()) {
78                      log.trace( "Found: " + message );
79                  }
80                  
81                  if (message != null) {
82                      processMessage(message);
83                      commitTransaction();
84                  }
85                  else {
86                      cancelTransaction();
87                  }
88              }
89              catch (Exception e) {
90                  rollbackTransaction(e);
91              }
92          }
93          
94          try {
95              stopConsumer();
96          }
97          catch (JMSException e) {
98              log.error("Failed to stop consuming messages: " + e, e);
99          }
100     }
101     
102     // Properties
103     //-------------------------------------------------------------------------    
104 
105     /**
106      * Returns the destination.
107      * @return Destination
108      */
109     public Destination getDestination() {
110         return destination;
111     }
112 
113     /**
114      * Returns the listener.
115      * @return MessageListener
116      */
117     public MessageListener getListener() {
118         return listener;
119     }
120 
121     /**
122      * Returns the messenger.
123      * @return Messenger
124      */
125     public Messenger getMessenger() {
126         return messenger;
127     }
128 
129     /**
130      * Returns the selector.
131      * @return String
132      */
133     public String getSelector() {
134         return selector;
135     }
136 
137     /**
138      * Returns the shouldStop.
139      * @return boolean
140      */
141     public boolean isShouldStop() {
142         return shouldStop;
143     }
144 
145     /**
146      * Sets the destination.
147      * @param destination The destination to set
148      */
149     public void setDestination(Destination destination) {
150         this.destination = destination;
151     }
152 
153     /**
154      * Sets the listener.
155      * @param listener The listener to set
156      */
157     public void setListener(MessageListener listener) {
158         this.listener = listener;
159     }
160 
161     /**
162      * Sets the messenger.
163      * @param messenger The messenger to set
164      */
165     public void setMessenger(Messenger messenger) {
166         this.messenger = messenger;
167     }
168 
169     /**
170      * Sets the selector.
171      * @param selector The selector to set
172      */
173     public void setSelector(String selector) {
174         this.selector = selector;
175     }
176 
177     /**
178      * Sets the shouldStop.
179      * @param shouldStop The shouldStop to set
180      */
181     public void setShouldStop(boolean shouldStop) {
182         this.shouldStop = shouldStop;
183     }
184 
185     // Implementation methods
186     //-------------------------------------------------------------------------    
187     
188     /**
189      * Starts consuming messages        
190      */    
191     protected void startConsumer() throws JMSException {
192         consumer = createConsumer();
193     }
194 
195     /**
196      * Stops consuming messages        
197      */    
198     protected void stopConsumer() throws JMSException {
199         consumer.close();
200     }
201 
202     /**
203      * Factory method to create a new MessageConsumer 
204      */
205     protected MessageConsumer createConsumer() throws JMSException {
206         String selector = getSelector();
207         if (selector != null) {
208             return getMessenger().createConsumer(getDestination(), selector);
209         }
210         else {
211             return getMessenger().createConsumer(getDestination());
212         }
213     }
214 
215     /**
216      * Strategy method to consume a message using a receive() kind of method.
217      * @return the message or null if a message could not be found after waiting for
218      * some period of time.
219      */
220     private Message receive() throws JMSException {
221         return getConsumer().receive();
222     }    
223     
224     /**
225      * Strategy method to process a given message. 
226      * By default this will just invoke the MessageListener
227      */
228     protected void processMessage(Message message) throws JMSException {
229         MessageListener listener = getListener();
230         if (listener != null) {
231             listener.onMessage(message);
232         }
233     }
234 
235 
236     /**
237      * Strategy method to represent the code required to start
238      * a transaction.
239      */
240     protected void startTransaction() throws Exception {
241     }
242 
243     /**
244      * Strategy method to represent the code required to commit
245      * a transaction.
246      */
247     protected void commitTransaction() throws Exception {
248     }
249 
250     /**
251      * Strategy method to represent the code required to rollback
252      * a transaction.
253      */
254     protected void rollbackTransaction(Exception e) {
255     }
256 
257     /**
258      * Strategy method to represent the code required to cancel
259      * a transaction. 
260      * This is called when a message is not received.
261      */
262     protected void cancelTransaction() throws Exception {
263     }
264 
265 
266     /**
267      * @erturn the consumer of messages 
268      */
269     protected MessageConsumer getConsumer() {
270         return consumer;
271     }
272 }