View Javadoc

1   /*
2    * Copyright (C) The Apache Software Foundation. All rights reserved.
3    *
4    * This software is published under the terms of the Apache Software License
5    * version 1.1, a copy of which has been included with this distribution in
6    * the LICENSE file.
7    * 
8    * $Id: BridgeMDO.java 155459 2005-02-26 13:24:44Z dirkv $
9    */
10  package org.apache.commons.messagelet;
11  
import java.util.Enumeration;

import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageEOFException;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.servlet.ServletException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.messenger.Messenger;
27  
28  
29  /** <p><code>BridgeMDO</code> is an MDO which implements a JMS bridge
30   * from one JMS destination and connection to another.
31   * This allows messages to be consumed on one destination and sent to 
32   * another JMS destination, using possibly different JMS providers.
33   * For example this can be used to bridge from SpiritWave to MQSeries. 
34   * </p>
35   * <p>
36   * This class is a useful base class to other possible bridge implementations
37   * such as 2 phase commit bridges or bridges with some complex transformation etc.
38   * This class has a number of Factory and Strategy methods to further customize
39   * the acknowledgement and transaction management, the message construction, 
40   * transformation and how to handle message headers etc.
41   * </p>
42   *
43   * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
44   * @version $Revision: 155459 $
45   */
46  public class BridgeMDO extends MessengerMDO {
47  
48      /** Logger */
49      private static final Log log = LogFactory.getLog(BridgeMDO.class);
50    
51      /** the Messenger used to output messages */
52      private Messenger outputMessenger;
53        
54      /** the Destination output messages will be sent to */
55      private Destination outputDestination;
56  
57      /** the name of the messenger to use for output */      
58      private String outputConnection;
59      
60      /** the name of the destination to use */
61      private String outputSubject;
62      
63      /** the buffer size used for ByteMessage and StreamMessage copying */
64      private int bufferSize = 32 * 1024;
65  
66      /** should this MDO work in transacted mode */
67      private boolean transacted = false;
68      
69      
70      public BridgeMDO() {
71      }
72      
73      public void init() throws ServletException {
74          try {
75              Messenger messenger = getMessenger();
76              Messenger outputMessenger = getOutputMessenger();
77              
78              if ( messenger == null ) {
79                  throw new ServletException( "No input Messenger is defined for this Bridge" );
80              }
81              if ( outputMessenger == null ) {
82                  throw new ServletException( "No output Messenger is defined for this Bridge" );
83              }
84              
85              // enable transacted mode 
86              boolean tran1 = messenger.getSessionFactory().isTransacted();
87              boolean tran2 = outputMessenger.getSessionFactory().isTransacted();
88              
89              if ( tran1 != tran2 ) {
90                  throw new ServletException( 
91                      "Both the input and output Messenger must have the same transacted mode. "
92                      + "Input is: " + tran1 + " output is: " + tran2 
93                  );
94              }
95              transacted = tran1;
96              
97              // use client acknowledgement
98              
99              // ### This should be specified in the Messenger.xml file
100             //messenger.getSessionFactory().setAcknowledgeMode( Session.CLIENT_ACKNOWLEDGE );     
101             //outputMessenger.getSessionFactory().setAcknowledgeMode( Session.CLIENT_ACKNOWLEDGE );     
102 
103             validateOutputDestination();
104             
105         }
106         catch (JMSException e) {
107             log.error( "Caught exception trying to configure the transacted, client acknowledge modes of the JMS connections" );
108             log.error( "Caught: " + e, e);
109             throw new ServletException( "Caught exception trying to configure the transacted, client acknowledge modes of the JMS connections" + e, e);
110         }
111     }
112     
113     // MessageListener interface
114     //-------------------------------------------------------------------------
115     public void onMessage(Message message) {
116         Messenger messenger = getMessenger();
117         
118         try {
119             Message outputMessage = createOutputMessage(message);
120             if ( outputMessage != null ) {
121                 Destination destination = getOutputDestination();
122                 
123                 if ( log.isDebugEnabled() ) {
124                     log.debug( "Sending message to: " + destination );
125                 }
126                 
127                 getOutputMessenger().send( destination, outputMessage );
128             }
129             acknowledge(message);
130             acknowledge(outputMessage);
131             commit();
132         }
133         catch (Exception e) {
134             log.error("Could not send message due to exception", e);
135             rollback();
136         }
137     }
138     
139     
140     // Properties
141     //-------------------------------------------------------------------------
142     
143     /** 
144      * @return true if this MDO should work in transacted mode
145      */
146     public boolean isTransacted() {
147         return transacted;
148     }
149 
150     /**
151      * Sets whether this MDO should work in transacted mode
152      */    
153     public void setTransacted(boolean transacted) {
154         this.transacted = transacted;
155     }
156     
157         
158     public String getOutputConnection() {
159         return outputConnection;
160     }
161 
162     /**
163      * Sets the connection name (messenger instance) to use
164      * to output messages
165      */    
166     public void setOutputConnection(String outputConnection) {
167         this.outputConnection = outputConnection;
168     }
169     
170     public String getOutputSubject() {
171         return outputSubject;
172     }
173     
174     /** 
175      * Sets the subject (i.e. destination name) to send messages to
176      */
177     public void setOutputSubject(String outputSubject) {
178         this.outputSubject = outputSubject;
179     }
180     
181     /**
182      * Gets the Messenger used to output messages 
183      */
184     public Messenger getOutputMessenger() throws JMSException {
185         if ( outputMessenger == null ) {
186             String name = getOutputConnection();
187             if ( name != null ) {
188                 outputMessenger = getMessengerManager().getMessenger( name );
189             }
190             else {
191                 // default to the input messenger
192                 outputMessenger = getMessenger();
193             }
194         }
195         return outputMessenger;
196     }
197     
198     /**
199      * Sets the Messenger used to output messages 
200      */
201     public void setOutputMessenger(Messenger outputMessenger) {
202         this.outputMessenger = outputMessenger;
203     }
204     
205     /**
206      * Gets the Destination output messages will be sent to
207      */
208     public Destination getOutputDestination() throws JMSException {
209         if ( outputDestination == null ) {
210             String subject = getOutputSubject();
211             if ( subject == null ) {
212                 throw new JMSException( "A bridge must have an outputSubject defined!" );
213             }
214             outputDestination = getOutputMessenger().getDestination( subject );
215         }
216         return outputDestination;
217     }
218     
219     /**
220      * Sets the Destination output messages will be sent to
221      */
222     public void setOutputDestination(Destination outputDestination) {
223         this.outputDestination = outputDestination;
224     }
225     
226     /**
227      * Gets the buffer size used for ByteMessage and StreamMessage copying
228      */
229     public int getBufferSize() {
230         return bufferSize;
231     }
232 
233     /**
234      * Sets the buffer size used for ByteMessage and StreamMessage copying
235      */
236     public void setBufferSize(int bufferSize) {
237         this.bufferSize = bufferSize;
238     }
239     
240     
241     // Implementation methods
242     //-------------------------------------------------------------------------
243 
244     /**
245      * Strategy method to perform a commit() on both the incoming Messenger and the
246      * output Messenger.
247      */
248     protected void commit() throws JMSException {
249         if ( transacted ) {
250             Messenger outputMessenger = getOutputMessenger();
251             Messenger inputMessenger = getMessenger();
252             
253             if ( outputMessenger != inputMessenger ) {
254                 outputMessenger.commit();
255             }
256             inputMessenger.commit();
257         }
258     }
259 
260     /**
261      * Strategy method to perform a rollback() on both the incoming Messenger and the
262      * output Messenger.
263      */
264     protected void rollback() {
265         if ( transacted ) {
266             try {
267                 Messenger outputMessenger = getOutputMessenger();
268                 Messenger inputMessenger = getMessenger();
269                 
270                 if ( outputMessenger != inputMessenger ) {
271                         outputMessenger.rollback();
272                 }
273             }
274             catch (Exception e) {
275                 log.error( "Caught exception rolling back the output messenger: " + e, e );
276             }
277             
278             try {
279                 getMessenger().rollback();
280             }
281             catch (Exception e) {
282                 log.error( "Caught exception rolling back the input messenger: " + e, e );
283             }
284         }
285     }
286 
287     
288     /**
289      * Factory method to create an output message given an input message.
290      * Derived classes could override this method to perform any kind of 
291      * Message transformation.
292      */
293     protected Message createOutputMessage(Message inputMessage) throws JMSException {
294         Message outputMessage = null;
295         
296         if ( inputMessage instanceof TextMessage ) {
297             outputMessage = createOutputTextMessage( (TextMessage) inputMessage );
298         }
299         else if ( inputMessage instanceof ObjectMessage ) {
300             outputMessage = createOutputObjectMessage( (ObjectMessage) inputMessage );
301         }
302         else if ( inputMessage instanceof MapMessage ) {
303             outputMessage = createOutputMapMessage( (MapMessage) inputMessage );
304         }
305         else if ( inputMessage instanceof BytesMessage ) {
306             outputMessage = createOutputBytesMessage( (BytesMessage) inputMessage );
307         }
308         else if ( inputMessage instanceof StreamMessage ) {
309             outputMessage = createOutputStreamMessage( (StreamMessage) inputMessage );
310         }
311         else {
312             outputMessage = getOutputMessenger().createMessage();
313         }
314         
315         processMessageHeaders(inputMessage, outputMessage);
316         
317         return outputMessage;
318     }
319         
320     /**
321      * Factory method to create ObjectMessage 
322      * Derived classes could override this method to perform any kind of 
323      * Message transformation.
324      */
325     protected ObjectMessage createOutputObjectMessage(ObjectMessage inputMessage) throws JMSException {
326         return getOutputMessenger().createObjectMessage( inputMessage.getObject() );
327     }
328     
329     /**
330      * Factory method to create TextMessage 
331      * Derived classes could override this method to perform any kind of 
332      * Message transformation.
333      */
334     protected TextMessage createOutputTextMessage(TextMessage inputMessage) throws JMSException {
335         return getOutputMessenger().createTextMessage( inputMessage.getText() );
336     }
337     
338     /**
339      * Factory method to create MapMessage 
340      * Derived classes could override this method to perform any kind of 
341      * Message transformation.
342      */
343     protected MapMessage createOutputMapMessage(MapMessage inputMessage) throws JMSException {
344         MapMessage answer = getOutputMessenger().createMapMessage();
345         
346         // copy across all values
347         for ( Enumeration e = inputMessage.getMapNames(); e.hasMoreElements(); ) {
348             String name = (String) e.nextElement();
349             Object value = inputMessage.getObject( name );
350             answer.setObject( name, value );
351         }
352         return answer;
353     }
354     
355     /**
356      * Factory method to create BytesMessage 
357      * Derived classes could override this method to perform any kind of 
358      * Message transformation.
359      */
360     protected BytesMessage createOutputBytesMessage(BytesMessage inputMessage) throws JMSException {
361         BytesMessage answer = getOutputMessenger().createBytesMessage();
362         
363         // copy across all data
364         byte[] buffer = new byte[bufferSize];
365         while (true ) {
366             int size = inputMessage.readBytes( buffer );
367             if ( size <= 0 ) {
368                 break;
369             }
370             answer.writeBytes( buffer, 0, size );
371             if ( size < bufferSize ) {
372                 break;
373             }
374         }
375         return answer;
376     }
377     
378     /**
379      * Factory method to create StreamMessage 
380      * Derived classes could override this method to perform any kind of 
381      * Message transformation.
382      */
383     protected StreamMessage createOutputStreamMessage(StreamMessage inputMessage) throws JMSException {
384         StreamMessage answer = getOutputMessenger().createStreamMessage();
385         
386         // copy across all data
387         byte[] buffer = new byte[bufferSize];
388         while (true ) {
389             int size = inputMessage.readBytes( buffer );
390             if ( size <= 0 ) {
391                 break;
392             }
393             answer.writeBytes( buffer, 0, size );
394             if ( size < bufferSize ) {
395                 break;
396             }
397         }
398         return answer;
399     }
400     
401     
402     
403     /**
404      * Strategy method to add any headers required on the output message.
405      * Derived classes could override this method to perform any kind of 
406      * header processing, such as copying the correlation ID, copying all
407      * headers or adding some new custom headers etc.
408      */
409     protected void processMessageHeaders(Message inputMessage, Message outputMessage) throws JMSException {
410     }
411      
412     /**
413      * Strategy method to allow different derived classes to acknowledge
414      * messages differently, such as to disable acknowledgements
415      */
416     protected void acknowledge(Message message) throws JMSException {
417         message.acknowledge();
418     }            
419     
420     /**
421      * Validates that there is a valid output destintation that we can use.
422      * Derivations of this class could use multiple destinations
423      */
424     protected void validateOutputDestination() throws JMSException, ServletException {
425         if ( getOutputDestination() == null ) {
426             throw new ServletException( "No output Destination is defined for this Bridge" );
427         }
428     }
429 }
430 
431