001    /*
002     * Copyright (C) The Apache Software Foundation. All rights reserved.
003     *
004     * This software is published under the terms of the Apache Software License
005     * version 1.1, a copy of which has been included with this distribution in
006     * the LICENSE file.
007     * 
008     * $Id: BridgeMDO.java 155459 2005-02-26 13:24:44Z dirkv $
009     */
010    package org.apache.commons.messagelet;
011    
012    import java.util.Enumeration;
013    
014    import javax.jms.BytesMessage;
015    import javax.jms.Destination;
016    import javax.jms.JMSException;
017    import javax.jms.MapMessage;
018    import javax.jms.Message;
019    import javax.jms.ObjectMessage;
020    import javax.jms.StreamMessage;
021    import javax.jms.TextMessage;
022    import javax.servlet.ServletException;
023    
024    import org.apache.commons.logging.Log;
025    import org.apache.commons.logging.LogFactory;
026    import org.apache.commons.messenger.Messenger;
027    
028    
029    /** <p><code>BridgeMDO</code> is an MDO which implements a JMS bridge
030     * from one JMS destination and connection to another.
031     * This allows messages to be consumed on one destination and sent to 
032     * another JMS destination, using possibly different JMS providers.
033     * For example this can be used to bridge from SpiritWave to MQSeries. 
034     * </p>
035     * <p>
036     * This class is a useful base class to other possible bridge implementations
037     * such as 2 phase commit bridges or bridges with some complex transformation etc.
038     * This class has a number of Factory and Strategy methods to further customize
039     * the acknowledgement and transaction management, the message construction, 
040     * transformation and how to handle message headers etc.
041     * </p>
042     *
043     * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
044     * @version $Revision: 155459 $
045     */
046    public class BridgeMDO extends MessengerMDO {
047    
048        /** Logger */
049        private static final Log log = LogFactory.getLog(BridgeMDO.class);
050      
051        /** the Messenger used to output messages */
052        private Messenger outputMessenger;
053          
054        /** the Destination output messages will be sent to */
055        private Destination outputDestination;
056    
057        /** the name of the messenger to use for output */      
058        private String outputConnection;
059        
060        /** the name of the destination to use */
061        private String outputSubject;
062        
063        /** the buffer size used for ByteMessage and StreamMessage copying */
064        private int bufferSize = 32 * 1024;
065    
066        /** should this MDO work in transacted mode */
067        private boolean transacted = false;
068        
069        
070        public BridgeMDO() {
071        }
072        
073        public void init() throws ServletException {
074            try {
075                Messenger messenger = getMessenger();
076                Messenger outputMessenger = getOutputMessenger();
077                
078                if ( messenger == null ) {
079                    throw new ServletException( "No input Messenger is defined for this Bridge" );
080                }
081                if ( outputMessenger == null ) {
082                    throw new ServletException( "No output Messenger is defined for this Bridge" );
083                }
084                
085                // enable transacted mode 
086                boolean tran1 = messenger.getSessionFactory().isTransacted();
087                boolean tran2 = outputMessenger.getSessionFactory().isTransacted();
088                
089                if ( tran1 != tran2 ) {
090                    throw new ServletException( 
091                        "Both the input and output Messenger must have the same transacted mode. "
092                        + "Input is: " + tran1 + " output is: " + tran2 
093                    );
094                }
095                transacted = tran1;
096                
097                // use client acknowledgement
098                
099                // ### This should be specified in the Messenger.xml file
100                //messenger.getSessionFactory().setAcknowledgeMode( Session.CLIENT_ACKNOWLEDGE );     
101                //outputMessenger.getSessionFactory().setAcknowledgeMode( Session.CLIENT_ACKNOWLEDGE );     
102    
103                validateOutputDestination();
104                
105            }
106            catch (JMSException e) {
107                log.error( "Caught exception trying to configure the transacted, client acknowledge modes of the JMS connections" );
108                log.error( "Caught: " + e, e);
109                throw new ServletException( "Caught exception trying to configure the transacted, client acknowledge modes of the JMS connections" + e, e);
110            }
111        }
112        
113        // MessageListener interface
114        //-------------------------------------------------------------------------
115        public void onMessage(Message message) {
116            Messenger messenger = getMessenger();
117            
118            try {
119                Message outputMessage = createOutputMessage(message);
120                if ( outputMessage != null ) {
121                    Destination destination = getOutputDestination();
122                    
123                    if ( log.isDebugEnabled() ) {
124                        log.debug( "Sending message to: " + destination );
125                    }
126                    
127                    getOutputMessenger().send( destination, outputMessage );
128                }
129                acknowledge(message);
130                acknowledge(outputMessage);
131                commit();
132            }
133            catch (Exception e) {
134                log.error("Could not send message due to exception", e);
135                rollback();
136            }
137        }
138        
139        
140        // Properties
141        //-------------------------------------------------------------------------
142        
143        /** 
144         * @return true if this MDO should work in transacted mode
145         */
146        public boolean isTransacted() {
147            return transacted;
148        }
149    
150        /**
151         * Sets whether this MDO should work in transacted mode
152         */    
153        public void setTransacted(boolean transacted) {
154            this.transacted = transacted;
155        }
156        
157            
158        public String getOutputConnection() {
159            return outputConnection;
160        }
161    
162        /**
163         * Sets the connection name (messenger instance) to use
164         * to output messages
165         */    
166        public void setOutputConnection(String outputConnection) {
167            this.outputConnection = outputConnection;
168        }
169        
170        public String getOutputSubject() {
171            return outputSubject;
172        }
173        
174        /** 
175         * Sets the subject (i.e. destination name) to send messages to
176         */
177        public void setOutputSubject(String outputSubject) {
178            this.outputSubject = outputSubject;
179        }
180        
181        /**
182         * Gets the Messenger used to output messages 
183         */
184        public Messenger getOutputMessenger() throws JMSException {
185            if ( outputMessenger == null ) {
186                String name = getOutputConnection();
187                if ( name != null ) {
188                    outputMessenger = getMessengerManager().getMessenger( name );
189                }
190                else {
191                    // default to the input messenger
192                    outputMessenger = getMessenger();
193                }
194            }
195            return outputMessenger;
196        }
197        
198        /**
199         * Sets the Messenger used to output messages 
200         */
201        public void setOutputMessenger(Messenger outputMessenger) {
202            this.outputMessenger = outputMessenger;
203        }
204        
205        /**
206         * Gets the Destination output messages will be sent to
207         */
208        public Destination getOutputDestination() throws JMSException {
209            if ( outputDestination == null ) {
210                String subject = getOutputSubject();
211                if ( subject == null ) {
212                    throw new JMSException( "A bridge must have an outputSubject defined!" );
213                }
214                outputDestination = getOutputMessenger().getDestination( subject );
215            }
216            return outputDestination;
217        }
218        
219        /**
220         * Sets the Destination output messages will be sent to
221         */
222        public void setOutputDestination(Destination outputDestination) {
223            this.outputDestination = outputDestination;
224        }
225        
226        /**
227         * Gets the buffer size used for ByteMessage and StreamMessage copying
228         */
229        public int getBufferSize() {
230            return bufferSize;
231        }
232    
233        /**
234         * Sets the buffer size used for ByteMessage and StreamMessage copying
235         */
236        public void setBufferSize(int bufferSize) {
237            this.bufferSize = bufferSize;
238        }
239        
240        
241        // Implementation methods
242        //-------------------------------------------------------------------------
243    
244        /**
245         * Strategy method to perform a commit() on both the incoming Messenger and the
246         * output Messenger.
247         */
248        protected void commit() throws JMSException {
249            if ( transacted ) {
250                Messenger outputMessenger = getOutputMessenger();
251                Messenger inputMessenger = getMessenger();
252                
253                if ( outputMessenger != inputMessenger ) {
254                    outputMessenger.commit();
255                }
256                inputMessenger.commit();
257            }
258        }
259    
260        /**
261         * Strategy method to perform a rollback() on both the incoming Messenger and the
262         * output Messenger.
263         */
264        protected void rollback() {
265            if ( transacted ) {
266                try {
267                    Messenger outputMessenger = getOutputMessenger();
268                    Messenger inputMessenger = getMessenger();
269                    
270                    if ( outputMessenger != inputMessenger ) {
271                            outputMessenger.rollback();
272                    }
273                }
274                catch (Exception e) {
275                    log.error( "Caught exception rolling back the output messenger: " + e, e );
276                }
277                
278                try {
279                    getMessenger().rollback();
280                }
281                catch (Exception e) {
282                    log.error( "Caught exception rolling back the input messenger: " + e, e );
283                }
284            }
285        }
286    
287        
288        /**
289         * Factory method to create an output message given an input message.
290         * Derived classes could override this method to perform any kind of 
291         * Message transformation.
292         */
293        protected Message createOutputMessage(Message inputMessage) throws JMSException {
294            Message outputMessage = null;
295            
296            if ( inputMessage instanceof TextMessage ) {
297                outputMessage = createOutputTextMessage( (TextMessage) inputMessage );
298            }
299            else if ( inputMessage instanceof ObjectMessage ) {
300                outputMessage = createOutputObjectMessage( (ObjectMessage) inputMessage );
301            }
302            else if ( inputMessage instanceof MapMessage ) {
303                outputMessage = createOutputMapMessage( (MapMessage) inputMessage );
304            }
305            else if ( inputMessage instanceof BytesMessage ) {
306                outputMessage = createOutputBytesMessage( (BytesMessage) inputMessage );
307            }
308            else if ( inputMessage instanceof StreamMessage ) {
309                outputMessage = createOutputStreamMessage( (StreamMessage) inputMessage );
310            }
311            else {
312                outputMessage = getOutputMessenger().createMessage();
313            }
314            
315            processMessageHeaders(inputMessage, outputMessage);
316            
317            return outputMessage;
318        }
319            
320        /**
321         * Factory method to create ObjectMessage 
322         * Derived classes could override this method to perform any kind of 
323         * Message transformation.
324         */
325        protected ObjectMessage createOutputObjectMessage(ObjectMessage inputMessage) throws JMSException {
326            return getOutputMessenger().createObjectMessage( inputMessage.getObject() );
327        }
328        
329        /**
330         * Factory method to create TextMessage 
331         * Derived classes could override this method to perform any kind of 
332         * Message transformation.
333         */
334        protected TextMessage createOutputTextMessage(TextMessage inputMessage) throws JMSException {
335            return getOutputMessenger().createTextMessage( inputMessage.getText() );
336        }
337        
338        /**
339         * Factory method to create MapMessage 
340         * Derived classes could override this method to perform any kind of 
341         * Message transformation.
342         */
343        protected MapMessage createOutputMapMessage(MapMessage inputMessage) throws JMSException {
344            MapMessage answer = getOutputMessenger().createMapMessage();
345            
346            // copy across all values
347            for ( Enumeration e = inputMessage.getMapNames(); e.hasMoreElements(); ) {
348                String name = (String) e.nextElement();
349                Object value = inputMessage.getObject( name );
350                answer.setObject( name, value );
351            }
352            return answer;
353        }
354        
355        /**
356         * Factory method to create BytesMessage 
357         * Derived classes could override this method to perform any kind of 
358         * Message transformation.
359         */
360        protected BytesMessage createOutputBytesMessage(BytesMessage inputMessage) throws JMSException {
361            BytesMessage answer = getOutputMessenger().createBytesMessage();
362            
363            // copy across all data
364            byte[] buffer = new byte[bufferSize];
365            while (true ) {
366                int size = inputMessage.readBytes( buffer );
367                if ( size <= 0 ) {
368                    break;
369                }
370                answer.writeBytes( buffer, 0, size );
371                if ( size < bufferSize ) {
372                    break;
373                }
374            }
375            return answer;
376        }
377        
378        /**
379         * Factory method to create StreamMessage 
380         * Derived classes could override this method to perform any kind of 
381         * Message transformation.
382         */
383        protected StreamMessage createOutputStreamMessage(StreamMessage inputMessage) throws JMSException {
384            StreamMessage answer = getOutputMessenger().createStreamMessage();
385            
386            // copy across all data
387            byte[] buffer = new byte[bufferSize];
388            while (true ) {
389                int size = inputMessage.readBytes( buffer );
390                if ( size <= 0 ) {
391                    break;
392                }
393                answer.writeBytes( buffer, 0, size );
394                if ( size < bufferSize ) {
395                    break;
396                }
397            }
398            return answer;
399        }
400        
401        
402        
403        /**
404         * Strategy method to add any headers required on the output message.
405         * Derived classes could override this method to perform any kind of 
406         * header processing, such as copying the correlation ID, copying all
407         * headers or adding some new custom headers etc.
408         */
409        protected void processMessageHeaders(Message inputMessage, Message outputMessage) throws JMSException {
410        }
411         
412        /**
413         * Strategy method to allow different derived classes to acknowledge
414         * messages differently, such as to disable acknowledgements
415         */
416        protected void acknowledge(Message message) throws JMSException {
417            message.acknowledge();
418        }            
419        
420        /**
421         * Validates that there is a valid output destintation that we can use.
422         * Derivations of this class could use multiple destinations
423         */
424        protected void validateOutputDestination() throws JMSException, ServletException {
425            if ( getOutputDestination() == null ) {
426                throw new ServletException( "No output Destination is defined for this Bridge" );
427            }
428        }
429    }
430    
431