1
2
3
4
5
6
7
8
9
10 package org.apache.commons.messagelet;
11
12 import javax.jms.Destination;
13 import javax.jms.JMSException;
14 import javax.jms.Message;
15 import javax.jms.MessageConsumer;
16 import javax.jms.MessageListener;
17
18 import org.apache.commons.logging.Log;
19 import org.apache.commons.logging.LogFactory;
20 import org.apache.commons.messenger.Messenger;
21
22
23
24
25
26
27
28
29
30
31 public class ConsumerThread extends Thread {
32
33
34 private static final Log log = LogFactory.getLog(ConsumerThread.class);
35
36
37 private MessageConsumer consumer;
38 private Messenger messenger;
39 private Destination destination;
40 private String selector;
41 private MessageListener listener;
42 private boolean shouldStop;
43
44 public ConsumerThread() {
45 setName("Consumer" + getName());
46 }
47
48
49
50
51
52
53 public void run() {
54 if (log.isDebugEnabled()) {
55 log.debug( "Starting consumer thread: " + getName());
56 }
57 try {
58 startConsumer();
59 }
60 catch (JMSException e) {
61 log.error("Failed to start consumer thread: " + e, e);
62 setShouldStop(true);
63 }
64
65 while (! isShouldStop()) {
66 try {
67 startTransaction();
68 }
69 catch (Exception e) {
70 log.error("Caught exception trying to start transaction. This thread will terminate. Reason: " + e, e);
71 break;
72 }
73
74 try {
75 Message message = receive();
76
77 if (log.isTraceEnabled()) {
78 log.trace( "Found: " + message );
79 }
80
81 if (message != null) {
82 processMessage(message);
83 commitTransaction();
84 }
85 else {
86 cancelTransaction();
87 }
88 }
89 catch (Exception e) {
90 rollbackTransaction(e);
91 }
92 }
93
94 try {
95 stopConsumer();
96 }
97 catch (JMSException e) {
98 log.error("Failed to stop consuming messages: " + e, e);
99 }
100 }
101
102
103
104
105
106
107
108
109 public Destination getDestination() {
110 return destination;
111 }
112
113
114
115
116
117 public MessageListener getListener() {
118 return listener;
119 }
120
121
122
123
124
125 public Messenger getMessenger() {
126 return messenger;
127 }
128
129
130
131
132
133 public String getSelector() {
134 return selector;
135 }
136
137
138
139
140
141 public boolean isShouldStop() {
142 return shouldStop;
143 }
144
145
146
147
148
149 public void setDestination(Destination destination) {
150 this.destination = destination;
151 }
152
153
154
155
156
157 public void setListener(MessageListener listener) {
158 this.listener = listener;
159 }
160
161
162
163
164
165 public void setMessenger(Messenger messenger) {
166 this.messenger = messenger;
167 }
168
169
170
171
172
173 public void setSelector(String selector) {
174 this.selector = selector;
175 }
176
177
178
179
180
181 public void setShouldStop(boolean shouldStop) {
182 this.shouldStop = shouldStop;
183 }
184
185
186
187
188
189
190
191 protected void startConsumer() throws JMSException {
192 consumer = createConsumer();
193 }
194
195
196
197
198 protected void stopConsumer() throws JMSException {
199 consumer.close();
200 }
201
202
203
204
205 protected MessageConsumer createConsumer() throws JMSException {
206 String selector = getSelector();
207 if (selector != null) {
208 return getMessenger().createConsumer(getDestination(), selector);
209 }
210 else {
211 return getMessenger().createConsumer(getDestination());
212 }
213 }
214
215
216
217
218
219
220 private Message receive() throws JMSException {
221 return getConsumer().receive();
222 }
223
224
225
226
227
228 protected void processMessage(Message message) throws JMSException {
229 MessageListener listener = getListener();
230 if (listener != null) {
231 listener.onMessage(message);
232 }
233 }
234
235
236
237
238
239
240 protected void startTransaction() throws Exception {
241 }
242
243
244
245
246
247 protected void commitTransaction() throws Exception {
248 }
249
250
251
252
253
254 protected void rollbackTransaction(Exception e) {
255 }
256
257
258
259
260
261
262 protected void cancelTransaction() throws Exception {
263 }
264
265
266
267
268
269 protected MessageConsumer getConsumer() {
270 return consumer;
271 }
272 }