/*
 * Copyright (C) The Apache Software Foundation. All rights reserved.
 *
 * This software is published under the terms of the Apache Software License
 * version 1.1, a copy of which has been included with this distribution in
 * the LICENSE file.
 *
 * $Id: BridgeMDO.java 155459 2005-02-26 13:24:44Z dirkv $
 */
package org.apache.commons.messagelet;

import java.util.Enumeration;

import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageEOFException;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.servlet.ServletException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.messenger.Messenger;


/** <p><code>BridgeMDO</code> is an MDO which implements a JMS bridge
 * from one JMS destination and connection to another.
 * This allows messages to be consumed on one destination and sent to
 * another JMS destination, using possibly different JMS providers.
 * For example this can be used to bridge from SpiritWave to MQSeries.
 * </p>
 * <p>
 * This class is a useful base class to other possible bridge implementations
 * such as 2 phase commit bridges or bridges with some complex transformation etc.
 * This class has a number of Factory and Strategy methods to further customize
 * the acknowledgement and transaction management, the message construction,
 * transformation and how to handle message headers etc.
 * </p>
 *
 * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
 * @version $Revision: 155459 $
 */
public class BridgeMDO extends MessengerMDO {

    /** Logger */
    private static final Log log = LogFactory.getLog(BridgeMDO.class);

    /** the buffer size used when none (or a non-positive one) is configured */
    private static final int DEFAULT_BUFFER_SIZE = 32 * 1024;

    /** the Messenger used to output messages */
    private Messenger outputMessenger;

    /** the Destination output messages will be sent to */
    private Destination outputDestination;

    /** the name of the messenger to use for output */
    private String outputConnection;

    /** the name of the destination to use */
    private String outputSubject;

    /** the buffer size used for BytesMessage copying */
    private int bufferSize = DEFAULT_BUFFER_SIZE;

    /** should this MDO work in transacted mode */
    private boolean transacted = false;


    public BridgeMDO() {
    }

    /**
     * Checks that both an input and an output Messenger are available, that
     * they agree on their transacted mode (which is then stored in this
     * bridge) and that an output destination can be resolved.
     *
     * @throws ServletException if a Messenger is missing, the transacted modes
     *  differ, no output destination is available or a JMSException occurs
     */
    public void init() throws ServletException {
        try {
            Messenger input = getMessenger();
            Messenger output = getOutputMessenger();

            if ( input == null ) {
                throw new ServletException( "No input Messenger is defined for this Bridge" );
            }
            if ( output == null ) {
                throw new ServletException( "No output Messenger is defined for this Bridge" );
            }

            // enable transacted mode
            boolean tran1 = input.getSessionFactory().isTransacted();
            boolean tran2 = output.getSessionFactory().isTransacted();

            if ( tran1 != tran2 ) {
                throw new ServletException(
                    "Both the input and output Messenger must have the same transacted mode. "
                    + "Input is: " + tran1 + " output is: " + tran2
                );
            }
            transacted = tran1;

            // The acknowledge mode (e.g. Session.CLIENT_ACKNOWLEDGE) is not set here;
            // it should be specified in the Messenger.xml file.

            validateOutputDestination();
        }
        catch (JMSException e) {
            String text = "Caught exception trying to configure the transacted, client acknowledge modes of the JMS connections";
            log.error( text + ": " + e, e );
            throw new ServletException( text + ": " + e, e );
        }
    }

    // MessageListener interface
    //-------------------------------------------------------------------------

    /**
     * Converts the incoming message into an output message, sends it to the
     * output destination, acknowledges the messages and commits. Any exception
     * is logged and followed by a rollback; nothing is rethrown.
     *
     * @param message the message consumed from the input destination
     */
    public void onMessage(Message message) {
        try {
            Message outputMessage = createOutputMessage(message);
            if ( outputMessage != null ) {
                Destination destination = getOutputDestination();

                if ( log.isDebugEnabled() ) {
                    log.debug( "Sending message to: " + destination );
                }

                getOutputMessenger().send( destination, outputMessage );
            }
            acknowledge(message);
            // a derived class may return null from createOutputMessage() to
            // drop a message; there is then nothing to acknowledge on the output side
            if ( outputMessage != null ) {
                acknowledge(outputMessage);
            }
            commit();
        }
        catch (Exception e) {
            log.error("Could not send message due to exception", e);
            rollback();
        }
    }


    // Properties
    //-------------------------------------------------------------------------

    /**
     * @return true if this MDO should work in transacted mode
     */
    public boolean isTransacted() {
        return transacted;
    }

    /**
     * Sets whether this MDO should work in transacted mode.
     * Note that {@link #init()} overwrites this value with the transacted
     * mode of the input Messenger.
     */
    public void setTransacted(boolean transacted) {
        this.transacted = transacted;
    }


    /**
     * @return the connection name (messenger instance) used to output
     *  messages, or null if none has been set
     */
    public String getOutputConnection() {
        return outputConnection;
    }

    /**
     * Sets the connection name (messenger instance) to use
     * to output messages
     */
    public void setOutputConnection(String outputConnection) {
        this.outputConnection = outputConnection;
    }

    /**
     * @return the subject (i.e. destination name) messages are sent to,
     *  or null if none has been set
     */
    public String getOutputSubject() {
        return outputSubject;
    }

    /**
     * Sets the subject (i.e. destination name) to send messages to
     */
    public void setOutputSubject(String outputSubject) {
        this.outputSubject = outputSubject;
    }

    /**
     * Gets the Messenger used to output messages. If none has been set it is
     * looked up by the output connection name or, when no name is configured,
     * defaults to the input Messenger. The result is cached.
     */
    public Messenger getOutputMessenger() throws JMSException {
        if ( outputMessenger == null ) {
            String name = getOutputConnection();
            if ( name != null ) {
                outputMessenger = getMessengerManager().getMessenger( name );
            }
            else {
                // default to the input messenger
                outputMessenger = getMessenger();
            }
        }
        return outputMessenger;
    }

    /**
     * Sets the Messenger used to output messages
     */
    public void setOutputMessenger(Messenger outputMessenger) {
        this.outputMessenger = outputMessenger;
    }

    /**
     * Gets the Destination output messages will be sent to. If none has been
     * set it is resolved from the output subject via the output Messenger and
     * then cached.
     *
     * @throws JMSException if no output subject is defined or the lookup fails
     */
    public Destination getOutputDestination() throws JMSException {
        if ( outputDestination == null ) {
            String subject = getOutputSubject();
            if ( subject == null ) {
                throw new JMSException( "A bridge must have an outputSubject defined!" );
            }
            outputDestination = getOutputMessenger().getDestination( subject );
        }
        return outputDestination;
    }

    /**
     * Sets the Destination output messages will be sent to
     */
    public void setOutputDestination(Destination outputDestination) {
        this.outputDestination = outputDestination;
    }

    /**
     * Gets the buffer size used for BytesMessage copying
     */
    public int getBufferSize() {
        return bufferSize;
    }

    /**
     * Sets the buffer size used for BytesMessage copying. A value that is
     * zero or negative is stored as given but the default size is used
     * when copying.
     */
    public void setBufferSize(int bufferSize) {
        this.bufferSize = bufferSize;
    }


    // Implementation methods
    //-------------------------------------------------------------------------

    /**
     * Strategy method to perform a commit() on both the incoming Messenger and the
     * output Messenger. Does nothing unless this bridge is transacted. The output
     * Messenger is committed first; when both are the same instance it is
     * committed only once.
     */
    protected void commit() throws JMSException {
        if ( transacted ) {
            Messenger output = getOutputMessenger();
            Messenger input = getMessenger();

            if ( output != input ) {
                output.commit();
            }
            input.commit();
        }
    }

    /**
     * Strategy method to perform a rollback() on both the incoming Messenger and the
     * output Messenger. Does nothing unless this bridge is transacted. Failures are
     * logged rather than thrown, so a failure rolling back the output Messenger
     * does not prevent the rollback of the input Messenger.
     */
    protected void rollback() {
        if ( transacted ) {
            try {
                Messenger output = getOutputMessenger();
                Messenger input = getMessenger();

                if ( output != input ) {
                    output.rollback();
                }
            }
            catch (Exception e) {
                log.error( "Caught exception rolling back the output messenger: " + e, e );
            }

            try {
                getMessenger().rollback();
            }
            catch (Exception e) {
                log.error( "Caught exception rolling back the input messenger: " + e, e );
            }
        }
    }


    /**
     * Factory method to create an output message given an input message.
     * Dispatches on the JMS message type to the matching factory method, falls
     * back to an empty plain Message for other types, and finally calls
     * {@link #processMessageHeaders(Message, Message)}.
     * Derived classes could override this method to perform any kind of
     * Message transformation.
     */
    protected Message createOutputMessage(Message inputMessage) throws JMSException {
        Message outputMessage = null;

        if ( inputMessage instanceof TextMessage ) {
            outputMessage = createOutputTextMessage( (TextMessage) inputMessage );
        }
        else if ( inputMessage instanceof ObjectMessage ) {
            outputMessage = createOutputObjectMessage( (ObjectMessage) inputMessage );
        }
        else if ( inputMessage instanceof MapMessage ) {
            outputMessage = createOutputMapMessage( (MapMessage) inputMessage );
        }
        else if ( inputMessage instanceof BytesMessage ) {
            outputMessage = createOutputBytesMessage( (BytesMessage) inputMessage );
        }
        else if ( inputMessage instanceof StreamMessage ) {
            outputMessage = createOutputStreamMessage( (StreamMessage) inputMessage );
        }
        else {
            outputMessage = getOutputMessenger().createMessage();
        }

        processMessageHeaders(inputMessage, outputMessage);

        return outputMessage;
    }

    /**
     * Factory method to create ObjectMessage
     * Derived classes could override this method to perform any kind of
     * Message transformation.
     */
    protected ObjectMessage createOutputObjectMessage(ObjectMessage inputMessage) throws JMSException {
        return getOutputMessenger().createObjectMessage( inputMessage.getObject() );
    }

    /**
     * Factory method to create TextMessage
     * Derived classes could override this method to perform any kind of
     * Message transformation.
     */
    protected TextMessage createOutputTextMessage(TextMessage inputMessage) throws JMSException {
        return getOutputMessenger().createTextMessage( inputMessage.getText() );
    }

    /**
     * Factory method to create MapMessage
     * Derived classes could override this method to perform any kind of
     * Message transformation.
     */
    protected MapMessage createOutputMapMessage(MapMessage inputMessage) throws JMSException {
        MapMessage answer = getOutputMessenger().createMapMessage();

        // copy across all values
        for ( Enumeration e = inputMessage.getMapNames(); e.hasMoreElements(); ) {
            String name = (String) e.nextElement();
            Object value = inputMessage.getObject( name );
            answer.setObject( name, value );
        }
        return answer;
    }

    /**
     * Factory method to create BytesMessage
     * Derived classes could override this method to perform any kind of
     * Message transformation.
     */
    protected BytesMessage createOutputBytesMessage(BytesMessage inputMessage) throws JMSException {
        BytesMessage answer = getOutputMessenger().createBytesMessage();

        // a zero length buffer would copy nothing and a negative length cannot
        // be allocated, so fall back to the default size in those cases
        int length = ( bufferSize > 0 ) ? bufferSize : DEFAULT_BUFFER_SIZE;

        // copy across all data
        byte[] buffer = new byte[length];
        while ( true ) {
            int size = inputMessage.readBytes( buffer );
            if ( size <= 0 ) {
                break;
            }
            answer.writeBytes( buffer, 0, size );
            if ( size < buffer.length ) {
                // a short read means the end of the body has been reached
                break;
            }
        }
        return answer;
    }

    /**
     * Factory method to create StreamMessage
     * Derived classes could override this method to perform any kind of
     * Message transformation.
     */
    protected StreamMessage createOutputStreamMessage(StreamMessage inputMessage) throws JMSException {
        StreamMessage answer = getOutputMessenger().createStreamMessage();

        // A StreamMessage is a sequence of typed fields, so it is copied field
        // by field; readBytes() would only be valid when the next field is a byte array.
        while ( true ) {
            Object value;
            try {
                value = inputMessage.readObject();
            }
            catch (MessageEOFException endOfStream) {
                // the JMS API signals the end of the stream with this exception
                break;
            }
            answer.writeObject( value );
        }
        return answer;
    }



    /**
     * Strategy method to add any headers required on the output message.
     * Derived classes could override this method to perform any kind of
     * header processing, such as copying the correlation ID, copying all
     * headers or adding some new custom headers etc.
     * This implementation does nothing.
     */
    protected void processMessageHeaders(Message inputMessage, Message outputMessage) throws JMSException {
    }

    /**
     * Strategy method to allow different derived classes to acknowledge
     * messages differently, such as to disable acknowledgements
     */
    protected void acknowledge(Message message) throws JMSException {
        message.acknowledge();
    }

    /**
     * Validates that there is a valid output destination that we can use.
     * Derivations of this class could use multiple destinations
     */
    protected void validateOutputDestination() throws JMSException, ServletException {
        if ( getOutputDestination() == null ) {
            throw new ServletException( "No output Destination is defined for this Bridge" );
        }
    }
}