1 /*
2 * Copyright (C) The Apache Software Foundation. All rights reserved.
3 *
4 * This software is published under the terms of the Apache Software License
5 * version 1.1, a copy of which has been included with this distribution in
6 * the LICENSE file.
7 *
8 * $Id: ConsumerThread.java 155459 2005-02-26 13:24:44Z dirkv $
9 */
10 package org.apache.commons.messagelet;
11
12 import javax.jms.Destination;
13 import javax.jms.JMSException;
14 import javax.jms.Message;
15 import javax.jms.MessageConsumer;
16 import javax.jms.MessageListener;
17
18 import org.apache.commons.logging.Log;
19 import org.apache.commons.logging.LogFactory;
20 import org.apache.commons.messenger.Messenger;
21
22 /**
23 * <p><code>ConsumerThread</code> is a thread which will repeatedly consume JMS messages
24 * using a receive() method on Messenger and then process the message.
25 * This class is a good base class when implementing some kind of transactional processing of
26 * JMS messages
27 *
28 * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
29 * @version $Revision: 155459 $
30 */
31 public class ConsumerThread extends Thread {
32
33 /** Logger */
34 private static final Log log = LogFactory.getLog(ConsumerThread.class);
35
36
37 private MessageConsumer consumer;
38 private Messenger messenger;
39 private Destination destination;
40 private String selector;
41 private MessageListener listener;
42 private boolean shouldStop;
43
44 public ConsumerThread() {
45 setName("Consumer" + getName());
46 }
47
48
49 /**
50 * Starts all the JMS connections and consumes JMS messages,
51 * passing them onto the MessageListener and Message Driven Objects
52 */
53 public void run() {
54 if (log.isDebugEnabled()) {
55 log.debug( "Starting consumer thread: " + getName());
56 }
57 try {
58 startConsumer();
59 }
60 catch (JMSException e) {
61 log.error("Failed to start consumer thread: " + e, e);
62 setShouldStop(true);
63 }
64
65 while (! isShouldStop()) {
66 try {
67 startTransaction();
68 }
69 catch (Exception e) {
70 log.error("Caught exception trying to start transaction. This thread will terminate. Reason: " + e, e);
71 break;
72 }
73
74 try {
75 Message message = receive();
76
77 if (log.isTraceEnabled()) {
78 log.trace( "Found: " + message );
79 }
80
81 if (message != null) {
82 processMessage(message);
83 commitTransaction();
84 }
85 else {
86 cancelTransaction();
87 }
88 }
89 catch (Exception e) {
90 rollbackTransaction(e);
91 }
92 }
93
94 try {
95 stopConsumer();
96 }
97 catch (JMSException e) {
98 log.error("Failed to stop consuming messages: " + e, e);
99 }
100 }
101
102 // Properties
103 //-------------------------------------------------------------------------
104
105 /**
106 * Returns the destination.
107 * @return Destination
108 */
109 public Destination getDestination() {
110 return destination;
111 }
112
113 /**
114 * Returns the listener.
115 * @return MessageListener
116 */
117 public MessageListener getListener() {
118 return listener;
119 }
120
121 /**
122 * Returns the messenger.
123 * @return Messenger
124 */
125 public Messenger getMessenger() {
126 return messenger;
127 }
128
129 /**
130 * Returns the selector.
131 * @return String
132 */
133 public String getSelector() {
134 return selector;
135 }
136
137 /**
138 * Returns the shouldStop.
139 * @return boolean
140 */
141 public boolean isShouldStop() {
142 return shouldStop;
143 }
144
145 /**
146 * Sets the destination.
147 * @param destination The destination to set
148 */
149 public void setDestination(Destination destination) {
150 this.destination = destination;
151 }
152
153 /**
154 * Sets the listener.
155 * @param listener The listener to set
156 */
157 public void setListener(MessageListener listener) {
158 this.listener = listener;
159 }
160
161 /**
162 * Sets the messenger.
163 * @param messenger The messenger to set
164 */
165 public void setMessenger(Messenger messenger) {
166 this.messenger = messenger;
167 }
168
169 /**
170 * Sets the selector.
171 * @param selector The selector to set
172 */
173 public void setSelector(String selector) {
174 this.selector = selector;
175 }
176
177 /**
178 * Sets the shouldStop.
179 * @param shouldStop The shouldStop to set
180 */
181 public void setShouldStop(boolean shouldStop) {
182 this.shouldStop = shouldStop;
183 }
184
185 // Implementation methods
186 //-------------------------------------------------------------------------
187
188 /**
189 * Starts consuming messages
190 */
191 protected void startConsumer() throws JMSException {
192 consumer = createConsumer();
193 }
194
195 /**
196 * Stops consuming messages
197 */
198 protected void stopConsumer() throws JMSException {
199 consumer.close();
200 }
201
202 /**
203 * Factory method to create a new MessageConsumer
204 */
205 protected MessageConsumer createConsumer() throws JMSException {
206 String selector = getSelector();
207 if (selector != null) {
208 return getMessenger().createConsumer(getDestination(), selector);
209 }
210 else {
211 return getMessenger().createConsumer(getDestination());
212 }
213 }
214
215 /**
216 * Strategy method to consume a message using a receive() kind of method.
217 * @return the message or null if a message could not be found after waiting for
218 * some period of time.
219 */
220 private Message receive() throws JMSException {
221 return getConsumer().receive();
222 }
223
224 /**
225 * Strategy method to process a given message.
226 * By default this will just invoke the MessageListener
227 */
228 protected void processMessage(Message message) throws JMSException {
229 MessageListener listener = getListener();
230 if (listener != null) {
231 listener.onMessage(message);
232 }
233 }
234
235
236 /**
237 * Strategy method to represent the code required to start
238 * a transaction.
239 */
240 protected void startTransaction() throws Exception {
241 }
242
243 /**
244 * Strategy method to represent the code required to commit
245 * a transaction.
246 */
247 protected void commitTransaction() throws Exception {
248 }
249
250 /**
251 * Strategy method to represent the code required to rollback
252 * a transaction.
253 */
254 protected void rollbackTransaction(Exception e) {
255 }
256
257 /**
258 * Strategy method to represent the code required to cancel
259 * a transaction.
260 * This is called when a message is not received.
261 */
262 protected void cancelTransaction() throws Exception {
263 }
264
265
266 /**
267 * @erturn the consumer of messages
268 */
269 protected MessageConsumer getConsumer() {
270 return consumer;
271 }
272 }