001 /*
002 * Copyright (C) The Apache Software Foundation. All rights reserved.
003 *
004 * This software is published under the terms of the Apache Software License
005 * version 1.1, a copy of which has been included with this distribution in
006 * the LICENSE file.
007 *
008 * $Id: ConsumerThread.java 155459 2005-02-26 13:24:44Z dirkv $
009 */
010 package org.apache.commons.messagelet;
011
012 import javax.jms.Destination;
013 import javax.jms.JMSException;
014 import javax.jms.Message;
015 import javax.jms.MessageConsumer;
016 import javax.jms.MessageListener;
017
018 import org.apache.commons.logging.Log;
019 import org.apache.commons.logging.LogFactory;
020 import org.apache.commons.messenger.Messenger;
021
022 /**
023 * <p><code>ConsumerThread</code> is a thread which will repeatedly consume JMS messages
024 * using a receive() method on Messenger and then process the message.
025 * This class is a good base class when implementing some kind of transactional processing of
026 * JMS messages
027 *
028 * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
029 * @version $Revision: 155459 $
030 */
031 public class ConsumerThread extends Thread {
032
033 /** Logger */
034 private static final Log log = LogFactory.getLog(ConsumerThread.class);
035
036
037 private MessageConsumer consumer;
038 private Messenger messenger;
039 private Destination destination;
040 private String selector;
041 private MessageListener listener;
042 private boolean shouldStop;
043
044 public ConsumerThread() {
045 setName("Consumer" + getName());
046 }
047
048
049 /**
050 * Starts all the JMS connections and consumes JMS messages,
051 * passing them onto the MessageListener and Message Driven Objects
052 */
053 public void run() {
054 if (log.isDebugEnabled()) {
055 log.debug( "Starting consumer thread: " + getName());
056 }
057 try {
058 startConsumer();
059 }
060 catch (JMSException e) {
061 log.error("Failed to start consumer thread: " + e, e);
062 setShouldStop(true);
063 }
064
065 while (! isShouldStop()) {
066 try {
067 startTransaction();
068 }
069 catch (Exception e) {
070 log.error("Caught exception trying to start transaction. This thread will terminate. Reason: " + e, e);
071 break;
072 }
073
074 try {
075 Message message = receive();
076
077 if (log.isTraceEnabled()) {
078 log.trace( "Found: " + message );
079 }
080
081 if (message != null) {
082 processMessage(message);
083 commitTransaction();
084 }
085 else {
086 cancelTransaction();
087 }
088 }
089 catch (Exception e) {
090 rollbackTransaction(e);
091 }
092 }
093
094 try {
095 stopConsumer();
096 }
097 catch (JMSException e) {
098 log.error("Failed to stop consuming messages: " + e, e);
099 }
100 }
101
102 // Properties
103 //-------------------------------------------------------------------------
104
105 /**
106 * Returns the destination.
107 * @return Destination
108 */
109 public Destination getDestination() {
110 return destination;
111 }
112
113 /**
114 * Returns the listener.
115 * @return MessageListener
116 */
117 public MessageListener getListener() {
118 return listener;
119 }
120
121 /**
122 * Returns the messenger.
123 * @return Messenger
124 */
125 public Messenger getMessenger() {
126 return messenger;
127 }
128
129 /**
130 * Returns the selector.
131 * @return String
132 */
133 public String getSelector() {
134 return selector;
135 }
136
137 /**
138 * Returns the shouldStop.
139 * @return boolean
140 */
141 public boolean isShouldStop() {
142 return shouldStop;
143 }
144
145 /**
146 * Sets the destination.
147 * @param destination The destination to set
148 */
149 public void setDestination(Destination destination) {
150 this.destination = destination;
151 }
152
153 /**
154 * Sets the listener.
155 * @param listener The listener to set
156 */
157 public void setListener(MessageListener listener) {
158 this.listener = listener;
159 }
160
161 /**
162 * Sets the messenger.
163 * @param messenger The messenger to set
164 */
165 public void setMessenger(Messenger messenger) {
166 this.messenger = messenger;
167 }
168
169 /**
170 * Sets the selector.
171 * @param selector The selector to set
172 */
173 public void setSelector(String selector) {
174 this.selector = selector;
175 }
176
177 /**
178 * Sets the shouldStop.
179 * @param shouldStop The shouldStop to set
180 */
181 public void setShouldStop(boolean shouldStop) {
182 this.shouldStop = shouldStop;
183 }
184
185 // Implementation methods
186 //-------------------------------------------------------------------------
187
188 /**
189 * Starts consuming messages
190 */
191 protected void startConsumer() throws JMSException {
192 consumer = createConsumer();
193 }
194
195 /**
196 * Stops consuming messages
197 */
198 protected void stopConsumer() throws JMSException {
199 consumer.close();
200 }
201
202 /**
203 * Factory method to create a new MessageConsumer
204 */
205 protected MessageConsumer createConsumer() throws JMSException {
206 String selector = getSelector();
207 if (selector != null) {
208 return getMessenger().createConsumer(getDestination(), selector);
209 }
210 else {
211 return getMessenger().createConsumer(getDestination());
212 }
213 }
214
215 /**
216 * Strategy method to consume a message using a receive() kind of method.
217 * @return the message or null if a message could not be found after waiting for
218 * some period of time.
219 */
220 private Message receive() throws JMSException {
221 return getConsumer().receive();
222 }
223
224 /**
225 * Strategy method to process a given message.
226 * By default this will just invoke the MessageListener
227 */
228 protected void processMessage(Message message) throws JMSException {
229 MessageListener listener = getListener();
230 if (listener != null) {
231 listener.onMessage(message);
232 }
233 }
234
235
236 /**
237 * Strategy method to represent the code required to start
238 * a transaction.
239 */
240 protected void startTransaction() throws Exception {
241 }
242
243 /**
244 * Strategy method to represent the code required to commit
245 * a transaction.
246 */
247 protected void commitTransaction() throws Exception {
248 }
249
250 /**
251 * Strategy method to represent the code required to rollback
252 * a transaction.
253 */
254 protected void rollbackTransaction(Exception e) {
255 }
256
257 /**
258 * Strategy method to represent the code required to cancel
259 * a transaction.
260 * This is called when a message is not received.
261 */
262 protected void cancelTransaction() throws Exception {
263 }
264
265
266 /**
267 * @erturn the consumer of messages
268 */
269 protected MessageConsumer getConsumer() {
270 return consumer;
271 }
272 }