001 /*
002 * Copyright (C) The Apache Software Foundation. All rights reserved.
003 *
004 * This software is published under the terms of the Apache Software License
005 * version 1.1, a copy of which has been included with this distribution in
006 * the LICENSE file.
007 *
008 * $Id: BridgeMDO.java 155459 2005-02-26 13:24:44Z dirkv $
009 */
010 package org.apache.commons.messagelet;
011
012 import java.util.Enumeration;
013
014 import javax.jms.BytesMessage;
015 import javax.jms.Destination;
016 import javax.jms.JMSException;
017 import javax.jms.MapMessage;
018 import javax.jms.Message;
019 import javax.jms.ObjectMessage;
020 import javax.jms.StreamMessage;
021 import javax.jms.TextMessage;
022 import javax.servlet.ServletException;
023
024 import org.apache.commons.logging.Log;
025 import org.apache.commons.logging.LogFactory;
026 import org.apache.commons.messenger.Messenger;
027
028
029 /** <p><code>BridgeMDO</code> is an MDO which implements a JMS bridge
030 * from one JMS destination and connection to another.
031 * This allows messages to be consumed on one destination and sent to
032 * another JMS destination, using possibly different JMS providers.
033 * For example this can be used to bridge from SpiritWave to MQSeries.
034 * </p>
035 * <p>
036 * This class is a useful base class to other possible bridge implementations
037 * such as 2 phase commit bridges or bridges with some complex transformation etc.
038 * This class has a number of Factory and Strategy methods to further customize
039 * the acknowledgement and transaction management, the message construction,
040 * transformation and how to handle message headers etc.
041 * </p>
042 *
043 * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
044 * @version $Revision: 155459 $
045 */
046 public class BridgeMDO extends MessengerMDO {
047
048 /** Logger */
049 private static final Log log = LogFactory.getLog(BridgeMDO.class);
050
051 /** the Messenger used to output messages */
052 private Messenger outputMessenger;
053
054 /** the Destination output messages will be sent to */
055 private Destination outputDestination;
056
057 /** the name of the messenger to use for output */
058 private String outputConnection;
059
060 /** the name of the destination to use */
061 private String outputSubject;
062
063 /** the buffer size used for ByteMessage and StreamMessage copying */
064 private int bufferSize = 32 * 1024;
065
066 /** should this MDO work in transacted mode */
067 private boolean transacted = false;
068
069
070 public BridgeMDO() {
071 }
072
073 public void init() throws ServletException {
074 try {
075 Messenger messenger = getMessenger();
076 Messenger outputMessenger = getOutputMessenger();
077
078 if ( messenger == null ) {
079 throw new ServletException( "No input Messenger is defined for this Bridge" );
080 }
081 if ( outputMessenger == null ) {
082 throw new ServletException( "No output Messenger is defined for this Bridge" );
083 }
084
085 // enable transacted mode
086 boolean tran1 = messenger.getSessionFactory().isTransacted();
087 boolean tran2 = outputMessenger.getSessionFactory().isTransacted();
088
089 if ( tran1 != tran2 ) {
090 throw new ServletException(
091 "Both the input and output Messenger must have the same transacted mode. "
092 + "Input is: " + tran1 + " output is: " + tran2
093 );
094 }
095 transacted = tran1;
096
097 // use client acknowledgement
098
099 // ### This should be specified in the Messenger.xml file
100 //messenger.getSessionFactory().setAcknowledgeMode( Session.CLIENT_ACKNOWLEDGE );
101 //outputMessenger.getSessionFactory().setAcknowledgeMode( Session.CLIENT_ACKNOWLEDGE );
102
103 validateOutputDestination();
104
105 }
106 catch (JMSException e) {
107 log.error( "Caught exception trying to configure the transacted, client acknowledge modes of the JMS connections" );
108 log.error( "Caught: " + e, e);
109 throw new ServletException( "Caught exception trying to configure the transacted, client acknowledge modes of the JMS connections" + e, e);
110 }
111 }
112
113 // MessageListener interface
114 //-------------------------------------------------------------------------
115 public void onMessage(Message message) {
116 Messenger messenger = getMessenger();
117
118 try {
119 Message outputMessage = createOutputMessage(message);
120 if ( outputMessage != null ) {
121 Destination destination = getOutputDestination();
122
123 if ( log.isDebugEnabled() ) {
124 log.debug( "Sending message to: " + destination );
125 }
126
127 getOutputMessenger().send( destination, outputMessage );
128 }
129 acknowledge(message);
130 acknowledge(outputMessage);
131 commit();
132 }
133 catch (Exception e) {
134 log.error("Could not send message due to exception", e);
135 rollback();
136 }
137 }
138
139
140 // Properties
141 //-------------------------------------------------------------------------
142
143 /**
144 * @return true if this MDO should work in transacted mode
145 */
146 public boolean isTransacted() {
147 return transacted;
148 }
149
150 /**
151 * Sets whether this MDO should work in transacted mode
152 */
153 public void setTransacted(boolean transacted) {
154 this.transacted = transacted;
155 }
156
157
158 public String getOutputConnection() {
159 return outputConnection;
160 }
161
162 /**
163 * Sets the connection name (messenger instance) to use
164 * to output messages
165 */
166 public void setOutputConnection(String outputConnection) {
167 this.outputConnection = outputConnection;
168 }
169
170 public String getOutputSubject() {
171 return outputSubject;
172 }
173
174 /**
175 * Sets the subject (i.e. destination name) to send messages to
176 */
177 public void setOutputSubject(String outputSubject) {
178 this.outputSubject = outputSubject;
179 }
180
181 /**
182 * Gets the Messenger used to output messages
183 */
184 public Messenger getOutputMessenger() throws JMSException {
185 if ( outputMessenger == null ) {
186 String name = getOutputConnection();
187 if ( name != null ) {
188 outputMessenger = getMessengerManager().getMessenger( name );
189 }
190 else {
191 // default to the input messenger
192 outputMessenger = getMessenger();
193 }
194 }
195 return outputMessenger;
196 }
197
198 /**
199 * Sets the Messenger used to output messages
200 */
201 public void setOutputMessenger(Messenger outputMessenger) {
202 this.outputMessenger = outputMessenger;
203 }
204
205 /**
206 * Gets the Destination output messages will be sent to
207 */
208 public Destination getOutputDestination() throws JMSException {
209 if ( outputDestination == null ) {
210 String subject = getOutputSubject();
211 if ( subject == null ) {
212 throw new JMSException( "A bridge must have an outputSubject defined!" );
213 }
214 outputDestination = getOutputMessenger().getDestination( subject );
215 }
216 return outputDestination;
217 }
218
219 /**
220 * Sets the Destination output messages will be sent to
221 */
222 public void setOutputDestination(Destination outputDestination) {
223 this.outputDestination = outputDestination;
224 }
225
226 /**
227 * Gets the buffer size used for ByteMessage and StreamMessage copying
228 */
229 public int getBufferSize() {
230 return bufferSize;
231 }
232
233 /**
234 * Sets the buffer size used for ByteMessage and StreamMessage copying
235 */
236 public void setBufferSize(int bufferSize) {
237 this.bufferSize = bufferSize;
238 }
239
240
241 // Implementation methods
242 //-------------------------------------------------------------------------
243
244 /**
245 * Strategy method to perform a commit() on both the incoming Messenger and the
246 * output Messenger.
247 */
248 protected void commit() throws JMSException {
249 if ( transacted ) {
250 Messenger outputMessenger = getOutputMessenger();
251 Messenger inputMessenger = getMessenger();
252
253 if ( outputMessenger != inputMessenger ) {
254 outputMessenger.commit();
255 }
256 inputMessenger.commit();
257 }
258 }
259
260 /**
261 * Strategy method to perform a rollback() on both the incoming Messenger and the
262 * output Messenger.
263 */
264 protected void rollback() {
265 if ( transacted ) {
266 try {
267 Messenger outputMessenger = getOutputMessenger();
268 Messenger inputMessenger = getMessenger();
269
270 if ( outputMessenger != inputMessenger ) {
271 outputMessenger.rollback();
272 }
273 }
274 catch (Exception e) {
275 log.error( "Caught exception rolling back the output messenger: " + e, e );
276 }
277
278 try {
279 getMessenger().rollback();
280 }
281 catch (Exception e) {
282 log.error( "Caught exception rolling back the input messenger: " + e, e );
283 }
284 }
285 }
286
287
288 /**
289 * Factory method to create an output message given an input message.
290 * Derived classes could override this method to perform any kind of
291 * Message transformation.
292 */
293 protected Message createOutputMessage(Message inputMessage) throws JMSException {
294 Message outputMessage = null;
295
296 if ( inputMessage instanceof TextMessage ) {
297 outputMessage = createOutputTextMessage( (TextMessage) inputMessage );
298 }
299 else if ( inputMessage instanceof ObjectMessage ) {
300 outputMessage = createOutputObjectMessage( (ObjectMessage) inputMessage );
301 }
302 else if ( inputMessage instanceof MapMessage ) {
303 outputMessage = createOutputMapMessage( (MapMessage) inputMessage );
304 }
305 else if ( inputMessage instanceof BytesMessage ) {
306 outputMessage = createOutputBytesMessage( (BytesMessage) inputMessage );
307 }
308 else if ( inputMessage instanceof StreamMessage ) {
309 outputMessage = createOutputStreamMessage( (StreamMessage) inputMessage );
310 }
311 else {
312 outputMessage = getOutputMessenger().createMessage();
313 }
314
315 processMessageHeaders(inputMessage, outputMessage);
316
317 return outputMessage;
318 }
319
320 /**
321 * Factory method to create ObjectMessage
322 * Derived classes could override this method to perform any kind of
323 * Message transformation.
324 */
325 protected ObjectMessage createOutputObjectMessage(ObjectMessage inputMessage) throws JMSException {
326 return getOutputMessenger().createObjectMessage( inputMessage.getObject() );
327 }
328
329 /**
330 * Factory method to create TextMessage
331 * Derived classes could override this method to perform any kind of
332 * Message transformation.
333 */
334 protected TextMessage createOutputTextMessage(TextMessage inputMessage) throws JMSException {
335 return getOutputMessenger().createTextMessage( inputMessage.getText() );
336 }
337
338 /**
339 * Factory method to create MapMessage
340 * Derived classes could override this method to perform any kind of
341 * Message transformation.
342 */
343 protected MapMessage createOutputMapMessage(MapMessage inputMessage) throws JMSException {
344 MapMessage answer = getOutputMessenger().createMapMessage();
345
346 // copy across all values
347 for ( Enumeration e = inputMessage.getMapNames(); e.hasMoreElements(); ) {
348 String name = (String) e.nextElement();
349 Object value = inputMessage.getObject( name );
350 answer.setObject( name, value );
351 }
352 return answer;
353 }
354
355 /**
356 * Factory method to create BytesMessage
357 * Derived classes could override this method to perform any kind of
358 * Message transformation.
359 */
360 protected BytesMessage createOutputBytesMessage(BytesMessage inputMessage) throws JMSException {
361 BytesMessage answer = getOutputMessenger().createBytesMessage();
362
363 // copy across all data
364 byte[] buffer = new byte[bufferSize];
365 while (true ) {
366 int size = inputMessage.readBytes( buffer );
367 if ( size <= 0 ) {
368 break;
369 }
370 answer.writeBytes( buffer, 0, size );
371 if ( size < bufferSize ) {
372 break;
373 }
374 }
375 return answer;
376 }
377
378 /**
379 * Factory method to create StreamMessage
380 * Derived classes could override this method to perform any kind of
381 * Message transformation.
382 */
383 protected StreamMessage createOutputStreamMessage(StreamMessage inputMessage) throws JMSException {
384 StreamMessage answer = getOutputMessenger().createStreamMessage();
385
386 // copy across all data
387 byte[] buffer = new byte[bufferSize];
388 while (true ) {
389 int size = inputMessage.readBytes( buffer );
390 if ( size <= 0 ) {
391 break;
392 }
393 answer.writeBytes( buffer, 0, size );
394 if ( size < bufferSize ) {
395 break;
396 }
397 }
398 return answer;
399 }
400
401
402
403 /**
404 * Strategy method to add any headers required on the output message.
405 * Derived classes could override this method to perform any kind of
406 * header processing, such as copying the correlation ID, copying all
407 * headers or adding some new custom headers etc.
408 */
409 protected void processMessageHeaders(Message inputMessage, Message outputMessage) throws JMSException {
410 }
411
412 /**
413 * Strategy method to allow different derived classes to acknowledge
414 * messages differently, such as to disable acknowledgements
415 */
416 protected void acknowledge(Message message) throws JMSException {
417 message.acknowledge();
418 }
419
420 /**
421 * Validates that there is a valid output destintation that we can use.
422 * Derivations of this class could use multiple destinations
423 */
424 protected void validateOutputDestination() throws JMSException, ServletException {
425 if ( getOutputDestination() == null ) {
426 throw new ServletException( "No output Destination is defined for this Bridge" );
427 }
428 }
429 }
430
431