001 /* 002 * Copyright (C) The Apache Software Foundation. All rights reserved. 003 * 004 * This software is published under the terms of the Apache Software License 005 * version 1.1, a copy of which has been included with this distribution in 006 * the LICENSE file. 007 * 008 * $Id: XAConsumerThread.java 155459 2005-02-26 13:24:44Z dirkv $ 009 */ 010 package org.apache.commons.messagelet; 011 012 import javax.jms.MessageListener; 013 import javax.transaction.SystemException; 014 import javax.transaction.Transaction; 015 import javax.transaction.TransactionManager; 016 import javax.transaction.xa.XAResource; 017 018 import org.apache.commons.logging.Log; 019 import org.apache.commons.logging.LogFactory; 020 import org.apache.commons.messenger.Messenger; 021 import org.apache.commons.messenger.XACapable; 022 import org.apache.commons.messenger.XACapableAdapter; 023 024 /** 025 * <p><code>XAConsumerThread</code> is a thread which will perform XA processing 026 * of messages 027 * 028 * @author damon.hamacha 029 * @author <a href="mailto:jstrachan@apache.org">James Strachan</a> 030 * @version $Revision: 155459 $ 031 */ 032 public class XAConsumerThread extends ConsumerThread { 033 034 /** Logger */ 035 private static final Log log = LogFactory.getLog(XAConsumerThread.class); 036 037 private TransactionManager transctionManager; 038 private Transaction transaction; 039 040 public XAConsumerThread() { 041 setName("XAConsumer" + getName()); 042 } 043 044 /** 045 * @return the TransactionManager to be used 046 * @throws SystemException 047 */ 048 public TransactionManager getTransactionManager() throws SystemException { 049 if (transctionManager == null) { 050 transctionManager = createTransactionManager(); 051 } 052 return transctionManager; 053 } 054 055 /** 056 * Sets the transaction manager to be used 057 * 058 * @param transctionManager the transaction manager to be used 059 */ 060 public void setTransactionManager(TransactionManager transctionManager) { 061 this.transctionManager = transctionManager; 062 } 063 064 // Implementation methods 065 //------------------------------------------------------------------------- 066 067 /** 068 * Factory method to create a TransactionManager via some mechanism. 069 * By default this mechanism will lookup in JNDI 070 */ 071 protected TransactionManager createTransactionManager() throws SystemException { 072 return null; 073 } 074 075 076 /** 077 * Enlists any resources with the current transaction. 078 * Typically the input Messenger's Session will always be 079 * enlisted. Then if the current MessageListener implements XACapable 080 * then any of its resources will also be enlisted. 081 * 082 * @param transaction the transaction to enlist resources with 083 * @throws Exception if the enlistment fails for whatever reason 084 */ 085 protected void enlist(Transaction transaction) throws Exception { 086 XACapable xaCapable = getXACapable( getMessenger() ); 087 xaCapable.enlistResources(transaction); 088 089 MessageListener listener = getListener(); 090 if (listener instanceof XACapable) { 091 xaCapable = (XACapable) listener; 092 xaCapable.enlistResources(transaction); 093 } 094 095 if (listener instanceof BridgeMDO) { 096 BridgeMDO bridge = (BridgeMDO) listener; 097 xaCapable = getXACapable( bridge.getOutputMessenger() ); 098 xaCapable.enlistResources(transaction); 099 } 100 } 101 102 103 104 /** 105 * Delists any resources from the current transaction. 106 * This includes the current input Messenger's Session as well 107 * as any resources used by the MessageListener if it implements 108 * XACapable 109 * 110 * @param transaction 111 * @param flag is the flag used by JTA when delisting resources. 112 * It is either XAResource.TMSUCCESS, XAResource.TMSUSPEND, or XAResource.TMFAIL 113 * @throws Exception 114 */ 115 protected void delist(Transaction transaction, int flag) throws Exception { 116 XACapable xaCapable = getXACapable( getMessenger() ); 117 xaCapable.delistResources(transaction, flag); 118 119 MessageListener listener = getListener(); 120 if (listener instanceof XACapable) { 121 xaCapable = (XACapable) listener; 122 xaCapable.delistResources(transaction, flag); 123 } 124 125 if (listener instanceof BridgeMDO) { 126 BridgeMDO bridge = (BridgeMDO) listener; 127 xaCapable = getXACapable( bridge.getOutputMessenger() ); 128 xaCapable.delistResources(transaction, flag); 129 } 130 } 131 132 /** 133 * Strategy method to represent the code required to start 134 * a transaction. 135 */ 136 protected void startTransaction() throws Exception { 137 getTransactionManager().begin(); 138 transaction = getTransactionManager().getTransaction(); 139 140 enlist(transaction); 141 } 142 143 /** 144 * Strategy method to represent the code required to commit 145 * a transaction. 146 */ 147 protected void commitTransaction() throws Exception { 148 delist(transaction, XAResource.TMSUCCESS); 149 try { 150 transaction.commit(); 151 } 152 catch (Exception e) { 153 log.error("Caught exception while committing txn: " + e, e); 154 transaction.setRollbackOnly(); 155 throw e; 156 } 157 } 158 159 /** 160 * Strategy method to represent the code required to rollback 161 * a transaction. 162 */ 163 protected void rollbackTransaction() throws Exception { 164 delist(transaction, XAResource.TMFAIL); 165 transaction.rollback(); 166 } 167 168 /** 169 * Strategy method to represent the code required to cancel 170 * a transaction. 171 * This is called when a message is not received. 172 */ 173 protected void cancelTransaction() throws Exception { 174 delist(transaction, XAResource.TMFAIL); 175 transaction.rollback(); 176 } 177 178 /** 179 * @return an XACapable for the given Messenger 180 */ 181 protected XACapable getXACapable(Messenger messenger) { 182 if (messenger instanceof XACapable) { 183 return (XACapable) messenger; 184 } 185 return new XACapableAdapter(messenger); 186 } 187 }