001 /* 002 * Copyright (C) The Apache Software Foundation. All rights reserved. 003 * 004 * This software is published under the terms of the Apache Software License 005 * version 1.1, a copy of which has been included with this distribution in 006 * the LICENSE file. 007 * 008 * $Id: ConsumerThread.java 155459 2005-02-26 13:24:44Z dirkv $ 009 */ 010 package org.apache.commons.messagelet; 011 012 import javax.jms.Destination; 013 import javax.jms.JMSException; 014 import javax.jms.Message; 015 import javax.jms.MessageConsumer; 016 import javax.jms.MessageListener; 017 018 import org.apache.commons.logging.Log; 019 import org.apache.commons.logging.LogFactory; 020 import org.apache.commons.messenger.Messenger; 021 022 /** 023 * <p><code>ConsumerThread</code> is a thread which will repeatedly consume JMS messages 024 * using a receive() method on Messenger and then process the message. 025 * This class is a good base class when implementing some kind of transactional processing of 026 * JMS messages 027 * 028 * @author <a href="mailto:jstrachan@apache.org">James Strachan</a> 029 * @version $Revision: 155459 $ 030 */ 031 public class ConsumerThread extends Thread { 032 033 /** Logger */ 034 private static final Log log = LogFactory.getLog(ConsumerThread.class); 035 036 037 private MessageConsumer consumer; 038 private Messenger messenger; 039 private Destination destination; 040 private String selector; 041 private MessageListener listener; 042 private boolean shouldStop; 043 044 public ConsumerThread() { 045 setName("Consumer" + getName()); 046 } 047 048 049 /** 050 * Starts all the JMS connections and consumes JMS messages, 051 * passing them onto the MessageListener and Message Driven Objects 052 */ 053 public void run() { 054 if (log.isDebugEnabled()) { 055 log.debug( "Starting consumer thread: " + getName()); 056 } 057 try { 058 startConsumer(); 059 } 060 catch (JMSException e) { 061 log.error("Failed to start consumer thread: " + e, e); 062 setShouldStop(true); 063 } 064 065 while (! isShouldStop()) { 066 try { 067 startTransaction(); 068 } 069 catch (Exception e) { 070 log.error("Caught exception trying to start transaction. This thread will terminate. Reason: " + e, e); 071 break; 072 } 073 074 try { 075 Message message = receive(); 076 077 if (log.isTraceEnabled()) { 078 log.trace( "Found: " + message ); 079 } 080 081 if (message != null) { 082 processMessage(message); 083 commitTransaction(); 084 } 085 else { 086 cancelTransaction(); 087 } 088 } 089 catch (Exception e) { 090 rollbackTransaction(e); 091 } 092 } 093 094 try { 095 stopConsumer(); 096 } 097 catch (JMSException e) { 098 log.error("Failed to stop consuming messages: " + e, e); 099 } 100 } 101 102 // Properties 103 //------------------------------------------------------------------------- 104 105 /** 106 * Returns the destination. 107 * @return Destination 108 */ 109 public Destination getDestination() { 110 return destination; 111 } 112 113 /** 114 * Returns the listener. 115 * @return MessageListener 116 */ 117 public MessageListener getListener() { 118 return listener; 119 } 120 121 /** 122 * Returns the messenger. 123 * @return Messenger 124 */ 125 public Messenger getMessenger() { 126 return messenger; 127 } 128 129 /** 130 * Returns the selector. 131 * @return String 132 */ 133 public String getSelector() { 134 return selector; 135 } 136 137 /** 138 * Returns the shouldStop. 139 * @return boolean 140 */ 141 public boolean isShouldStop() { 142 return shouldStop; 143 } 144 145 /** 146 * Sets the destination. 147 * @param destination The destination to set 148 */ 149 public void setDestination(Destination destination) { 150 this.destination = destination; 151 } 152 153 /** 154 * Sets the listener. 155 * @param listener The listener to set 156 */ 157 public void setListener(MessageListener listener) { 158 this.listener = listener; 159 } 160 161 /** 162 * Sets the messenger. 163 * @param messenger The messenger to set 164 */ 165 public void setMessenger(Messenger messenger) { 166 this.messenger = messenger; 167 } 168 169 /** 170 * Sets the selector. 171 * @param selector The selector to set 172 */ 173 public void setSelector(String selector) { 174 this.selector = selector; 175 } 176 177 /** 178 * Sets the shouldStop. 179 * @param shouldStop The shouldStop to set 180 */ 181 public void setShouldStop(boolean shouldStop) { 182 this.shouldStop = shouldStop; 183 } 184 185 // Implementation methods 186 //------------------------------------------------------------------------- 187 188 /** 189 * Starts consuming messages 190 */ 191 protected void startConsumer() throws JMSException { 192 consumer = createConsumer(); 193 } 194 195 /** 196 * Stops consuming messages 197 */ 198 protected void stopConsumer() throws JMSException { 199 consumer.close(); 200 } 201 202 /** 203 * Factory method to create a new MessageConsumer 204 */ 205 protected MessageConsumer createConsumer() throws JMSException { 206 String selector = getSelector(); 207 if (selector != null) { 208 return getMessenger().createConsumer(getDestination(), selector); 209 } 210 else { 211 return getMessenger().createConsumer(getDestination()); 212 } 213 } 214 215 /** 216 * Strategy method to consume a message using a receive() kind of method. 217 * @return the message or null if a message could not be found after waiting for 218 * some period of time. 219 */ 220 private Message receive() throws JMSException { 221 return getConsumer().receive(); 222 } 223 224 /** 225 * Strategy method to process a given message. 226 * By default this will just invoke the MessageListener 227 */ 228 protected void processMessage(Message message) throws JMSException { 229 MessageListener listener = getListener(); 230 if (listener != null) { 231 listener.onMessage(message); 232 } 233 } 234 235 236 /** 237 * Strategy method to represent the code required to start 238 * a transaction. 239 */ 240 protected void startTransaction() throws Exception { 241 } 242 243 /** 244 * Strategy method to represent the code required to commit 245 * a transaction. 246 */ 247 protected void commitTransaction() throws Exception { 248 } 249 250 /** 251 * Strategy method to represent the code required to rollback 252 * a transaction. 253 */ 254 protected void rollbackTransaction(Exception e) { 255 } 256 257 /** 258 * Strategy method to represent the code required to cancel 259 * a transaction. 260 * This is called when a message is not received. 261 */ 262 protected void cancelTransaction() throws Exception { 263 } 264 265 266 /** 267 * @erturn the consumer of messages 268 */ 269 protected MessageConsumer getConsumer() { 270 return consumer; 271 } 272 }