1
2
3
4
5
6
7
8
9
10 package org.apache.commons.messagelet;
11
12 import java.util.Enumeration;
13
14 import javax.jms.BytesMessage;
15 import javax.jms.Destination;
16 import javax.jms.JMSException;
17 import javax.jms.MapMessage;
18 import javax.jms.Message;
19 import javax.jms.ObjectMessage;
20 import javax.jms.StreamMessage;
21 import javax.jms.TextMessage;
22 import javax.servlet.ServletException;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.commons.messenger.Messenger;
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46 public class BridgeMDO extends MessengerMDO {
47
48
49 private static final Log log = LogFactory.getLog(BridgeMDO.class);
50
51
52 private Messenger outputMessenger;
53
54
55 private Destination outputDestination;
56
57
58 private String outputConnection;
59
60
61 private String outputSubject;
62
63
64 private int bufferSize = 32 * 1024;
65
66
67 private boolean transacted = false;
68
69
70 public BridgeMDO() {
71 }
72
73 public void init() throws ServletException {
74 try {
75 Messenger messenger = getMessenger();
76 Messenger outputMessenger = getOutputMessenger();
77
78 if ( messenger == null ) {
79 throw new ServletException( "No input Messenger is defined for this Bridge" );
80 }
81 if ( outputMessenger == null ) {
82 throw new ServletException( "No output Messenger is defined for this Bridge" );
83 }
84
85
86 boolean tran1 = messenger.getSessionFactory().isTransacted();
87 boolean tran2 = outputMessenger.getSessionFactory().isTransacted();
88
89 if ( tran1 != tran2 ) {
90 throw new ServletException(
91 "Both the input and output Messenger must have the same transacted mode. "
92 + "Input is: " + tran1 + " output is: " + tran2
93 );
94 }
95 transacted = tran1;
96
97
98
99
100
101
102
103 validateOutputDestination();
104
105 }
106 catch (JMSException e) {
107 log.error( "Caught exception trying to configure the transacted, client acknowledge modes of the JMS connections" );
108 log.error( "Caught: " + e, e);
109 throw new ServletException( "Caught exception trying to configure the transacted, client acknowledge modes of the JMS connections" + e, e);
110 }
111 }
112
113
114
115 public void onMessage(Message message) {
116 Messenger messenger = getMessenger();
117
118 try {
119 Message outputMessage = createOutputMessage(message);
120 if ( outputMessage != null ) {
121 Destination destination = getOutputDestination();
122
123 if ( log.isDebugEnabled() ) {
124 log.debug( "Sending message to: " + destination );
125 }
126
127 getOutputMessenger().send( destination, outputMessage );
128 }
129 acknowledge(message);
130 acknowledge(outputMessage);
131 commit();
132 }
133 catch (Exception e) {
134 log.error("Could not send message due to exception", e);
135 rollback();
136 }
137 }
138
139
140
141
142
143
144
145
146 public boolean isTransacted() {
147 return transacted;
148 }
149
150
151
152
153 public void setTransacted(boolean transacted) {
154 this.transacted = transacted;
155 }
156
157
158 public String getOutputConnection() {
159 return outputConnection;
160 }
161
162
163
164
165
166 public void setOutputConnection(String outputConnection) {
167 this.outputConnection = outputConnection;
168 }
169
170 public String getOutputSubject() {
171 return outputSubject;
172 }
173
174
175
176
177 public void setOutputSubject(String outputSubject) {
178 this.outputSubject = outputSubject;
179 }
180
181
182
183
184 public Messenger getOutputMessenger() throws JMSException {
185 if ( outputMessenger == null ) {
186 String name = getOutputConnection();
187 if ( name != null ) {
188 outputMessenger = getMessengerManager().getMessenger( name );
189 }
190 else {
191
192 outputMessenger = getMessenger();
193 }
194 }
195 return outputMessenger;
196 }
197
198
199
200
201 public void setOutputMessenger(Messenger outputMessenger) {
202 this.outputMessenger = outputMessenger;
203 }
204
205
206
207
208 public Destination getOutputDestination() throws JMSException {
209 if ( outputDestination == null ) {
210 String subject = getOutputSubject();
211 if ( subject == null ) {
212 throw new JMSException( "A bridge must have an outputSubject defined!" );
213 }
214 outputDestination = getOutputMessenger().getDestination( subject );
215 }
216 return outputDestination;
217 }
218
219
220
221
222 public void setOutputDestination(Destination outputDestination) {
223 this.outputDestination = outputDestination;
224 }
225
226
227
228
229 public int getBufferSize() {
230 return bufferSize;
231 }
232
233
234
235
236 public void setBufferSize(int bufferSize) {
237 this.bufferSize = bufferSize;
238 }
239
240
241
242
243
244
245
246
247
248 protected void commit() throws JMSException {
249 if ( transacted ) {
250 Messenger outputMessenger = getOutputMessenger();
251 Messenger inputMessenger = getMessenger();
252
253 if ( outputMessenger != inputMessenger ) {
254 outputMessenger.commit();
255 }
256 inputMessenger.commit();
257 }
258 }
259
260
261
262
263
264 protected void rollback() {
265 if ( transacted ) {
266 try {
267 Messenger outputMessenger = getOutputMessenger();
268 Messenger inputMessenger = getMessenger();
269
270 if ( outputMessenger != inputMessenger ) {
271 outputMessenger.rollback();
272 }
273 }
274 catch (Exception e) {
275 log.error( "Caught exception rolling back the output messenger: " + e, e );
276 }
277
278 try {
279 getMessenger().rollback();
280 }
281 catch (Exception e) {
282 log.error( "Caught exception rolling back the input messenger: " + e, e );
283 }
284 }
285 }
286
287
288
289
290
291
292
293 protected Message createOutputMessage(Message inputMessage) throws JMSException {
294 Message outputMessage = null;
295
296 if ( inputMessage instanceof TextMessage ) {
297 outputMessage = createOutputTextMessage( (TextMessage) inputMessage );
298 }
299 else if ( inputMessage instanceof ObjectMessage ) {
300 outputMessage = createOutputObjectMessage( (ObjectMessage) inputMessage );
301 }
302 else if ( inputMessage instanceof MapMessage ) {
303 outputMessage = createOutputMapMessage( (MapMessage) inputMessage );
304 }
305 else if ( inputMessage instanceof BytesMessage ) {
306 outputMessage = createOutputBytesMessage( (BytesMessage) inputMessage );
307 }
308 else if ( inputMessage instanceof StreamMessage ) {
309 outputMessage = createOutputStreamMessage( (StreamMessage) inputMessage );
310 }
311 else {
312 outputMessage = getOutputMessenger().createMessage();
313 }
314
315 processMessageHeaders(inputMessage, outputMessage);
316
317 return outputMessage;
318 }
319
320
321
322
323
324
325 protected ObjectMessage createOutputObjectMessage(ObjectMessage inputMessage) throws JMSException {
326 return getOutputMessenger().createObjectMessage( inputMessage.getObject() );
327 }
328
329
330
331
332
333
334 protected TextMessage createOutputTextMessage(TextMessage inputMessage) throws JMSException {
335 return getOutputMessenger().createTextMessage( inputMessage.getText() );
336 }
337
338
339
340
341
342
343 protected MapMessage createOutputMapMessage(MapMessage inputMessage) throws JMSException {
344 MapMessage answer = getOutputMessenger().createMapMessage();
345
346
347 for ( Enumeration e = inputMessage.getMapNames(); e.hasMoreElements(); ) {
348 String name = (String) e.nextElement();
349 Object value = inputMessage.getObject( name );
350 answer.setObject( name, value );
351 }
352 return answer;
353 }
354
355
356
357
358
359
360 protected BytesMessage createOutputBytesMessage(BytesMessage inputMessage) throws JMSException {
361 BytesMessage answer = getOutputMessenger().createBytesMessage();
362
363
364 byte[] buffer = new byte[bufferSize];
365 while (true ) {
366 int size = inputMessage.readBytes( buffer );
367 if ( size <= 0 ) {
368 break;
369 }
370 answer.writeBytes( buffer, 0, size );
371 if ( size < bufferSize ) {
372 break;
373 }
374 }
375 return answer;
376 }
377
378
379
380
381
382
383 protected StreamMessage createOutputStreamMessage(StreamMessage inputMessage) throws JMSException {
384 StreamMessage answer = getOutputMessenger().createStreamMessage();
385
386
387 byte[] buffer = new byte[bufferSize];
388 while (true ) {
389 int size = inputMessage.readBytes( buffer );
390 if ( size <= 0 ) {
391 break;
392 }
393 answer.writeBytes( buffer, 0, size );
394 if ( size < bufferSize ) {
395 break;
396 }
397 }
398 return answer;
399 }
400
401
402
403
404
405
406
407
408
409 protected void processMessageHeaders(Message inputMessage, Message outputMessage) throws JMSException {
410 }
411
412
413
414
415
416 protected void acknowledge(Message message) throws JMSException {
417 message.acknowledge();
418 }
419
420
421
422
423
424 protected void validateOutputDestination() throws JMSException, ServletException {
425 if ( getOutputDestination() == null ) {
426 throw new ServletException( "No output Destination is defined for this Bridge" );
427 }
428 }
429 }
430
431