001    /*
002     * Copyright (C) The Apache Software Foundation. All rights reserved.
003     *
004     * This software is published under the terms of the Apache Software License
005     * version 1.1, a copy of which has been included with this distribution in
006     * the LICENSE file.
007     * 
008     * $Id: ConsumerThread.java 155459 2005-02-26 13:24:44Z dirkv $
009     */
010    package org.apache.commons.messagelet;
011    
012    import javax.jms.Destination;
013    import javax.jms.JMSException;
014    import javax.jms.Message;
015    import javax.jms.MessageConsumer;
016    import javax.jms.MessageListener;
017    
018    import org.apache.commons.logging.Log;
019    import org.apache.commons.logging.LogFactory;
020    import org.apache.commons.messenger.Messenger;
021    
022    /** 
023     * <p><code>ConsumerThread</code> is a thread which will repeatedly consume JMS messages
024     * using a receive() method on Messenger and then process the message.
025     * This class is a good base class when implementing some kind of transactional processing of 
026     * JMS messages
027     *
028     * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
029     * @version $Revision: 155459 $
030     */
031    public class ConsumerThread extends Thread {
032    
033        /** Logger */
034        private static final Log log = LogFactory.getLog(ConsumerThread.class);
035    
036    
037        private MessageConsumer consumer;
038        private Messenger messenger;
039        private Destination destination;
040        private String selector;
041        private MessageListener listener;
042        private boolean shouldStop;
043        
044        public ConsumerThread() {
045            setName("Consumer" + getName());
046        }
047    
048    
049        /**
050         * Starts all the JMS connections and consumes JMS messages, 
051         * passing them onto the MessageListener and Message Driven Objects
052         */
053        public void run() {
054            if (log.isDebugEnabled()) {
055                log.debug( "Starting consumer thread: " + getName());
056            }
057            try {
058                startConsumer();
059            }
060            catch (JMSException e) {
061                log.error("Failed to start consumer thread: " + e, e);
062                setShouldStop(true);
063            }
064            
065            while (! isShouldStop()) {
066                    try {
067                        startTransaction();
068                    }
069                    catch (Exception e) {
070                            log.error("Caught exception trying to start transaction. This thread will terminate. Reason: " + e, e);
071                            break;
072                    }
073    
074                try {
075                    Message message = receive();
076    
077                    if (log.isTraceEnabled()) {
078                        log.trace( "Found: " + message );
079                    }
080                    
081                    if (message != null) {
082                        processMessage(message);
083                        commitTransaction();
084                    }
085                    else {
086                        cancelTransaction();
087                    }
088                }
089                catch (Exception e) {
090                    rollbackTransaction(e);
091                }
092            }
093            
094            try {
095                stopConsumer();
096            }
097            catch (JMSException e) {
098                log.error("Failed to stop consuming messages: " + e, e);
099            }
100        }
101        
102        // Properties
103        //-------------------------------------------------------------------------    
104    
105        /**
106         * Returns the destination.
107         * @return Destination
108         */
109        public Destination getDestination() {
110            return destination;
111        }
112    
113        /**
114         * Returns the listener.
115         * @return MessageListener
116         */
117        public MessageListener getListener() {
118            return listener;
119        }
120    
121        /**
122         * Returns the messenger.
123         * @return Messenger
124         */
125        public Messenger getMessenger() {
126            return messenger;
127        }
128    
129        /**
130         * Returns the selector.
131         * @return String
132         */
133        public String getSelector() {
134            return selector;
135        }
136    
137        /**
138         * Returns the shouldStop.
139         * @return boolean
140         */
141        public boolean isShouldStop() {
142            return shouldStop;
143        }
144    
145        /**
146         * Sets the destination.
147         * @param destination The destination to set
148         */
149        public void setDestination(Destination destination) {
150            this.destination = destination;
151        }
152    
153        /**
154         * Sets the listener.
155         * @param listener The listener to set
156         */
157        public void setListener(MessageListener listener) {
158            this.listener = listener;
159        }
160    
161        /**
162         * Sets the messenger.
163         * @param messenger The messenger to set
164         */
165        public void setMessenger(Messenger messenger) {
166            this.messenger = messenger;
167        }
168    
169        /**
170         * Sets the selector.
171         * @param selector The selector to set
172         */
173        public void setSelector(String selector) {
174            this.selector = selector;
175        }
176    
177        /**
178         * Sets the shouldStop.
179         * @param shouldStop The shouldStop to set
180         */
181        public void setShouldStop(boolean shouldStop) {
182            this.shouldStop = shouldStop;
183        }
184    
185        // Implementation methods
186        //-------------------------------------------------------------------------    
187        
188        /**
189         * Starts consuming messages        
190         */    
191        protected void startConsumer() throws JMSException {
192            consumer = createConsumer();
193        }
194    
195        /**
196         * Stops consuming messages        
197         */    
198        protected void stopConsumer() throws JMSException {
199            consumer.close();
200        }
201    
202        /**
203         * Factory method to create a new MessageConsumer 
204         */
205        protected MessageConsumer createConsumer() throws JMSException {
206            String selector = getSelector();
207            if (selector != null) {
208                return getMessenger().createConsumer(getDestination(), selector);
209            }
210            else {
211                return getMessenger().createConsumer(getDestination());
212            }
213        }
214    
215        /**
216         * Strategy method to consume a message using a receive() kind of method.
217         * @return the message or null if a message could not be found after waiting for
218         * some period of time.
219         */
220        private Message receive() throws JMSException {
221            return getConsumer().receive();
222        }    
223        
224        /**
225         * Strategy method to process a given message. 
226         * By default this will just invoke the MessageListener
227         */
228        protected void processMessage(Message message) throws JMSException {
229            MessageListener listener = getListener();
230            if (listener != null) {
231                listener.onMessage(message);
232            }
233        }
234    
235    
236        /**
237         * Strategy method to represent the code required to start
238         * a transaction.
239         */
240        protected void startTransaction() throws Exception {
241        }
242    
243        /**
244         * Strategy method to represent the code required to commit
245         * a transaction.
246         */
247        protected void commitTransaction() throws Exception {
248        }
249    
250        /**
251         * Strategy method to represent the code required to rollback
252         * a transaction.
253         */
254        protected void rollbackTransaction(Exception e) {
255        }
256    
257        /**
258         * Strategy method to represent the code required to cancel
259         * a transaction. 
260         * This is called when a message is not received.
261         */
262        protected void cancelTransaction() throws Exception {
263        }
264    
265    
266        /**
267         * @erturn the consumer of messages 
268         */
269        protected MessageConsumer getConsumer() {
270            return consumer;
271        }
272    }