001 /* 002 * Copyright (C) The Apache Software Foundation. All rights reserved. 003 * 004 * This software is published under the terms of the Apache Software License 005 * version 1.1, a copy of which has been included with this distribution in 006 * the LICENSE file. 007 * 008 * $Id: ConsumerTask.java 155459 2005-02-26 13:24:44Z dirkv $ 009 */ 010 package org.apache.commons.messenger.task; 011 012 import java.io.File; 013 import java.io.FileWriter; 014 import java.io.IOException; 015 016 import javax.jms.Destination; 017 import javax.jms.JMSException; 018 import javax.jms.Message; 019 import javax.jms.TextMessage; 020 021 import org.apache.commons.messenger.Messenger; 022 import org.apache.commons.messenger.MessengerManager; 023 import org.apache.tools.ant.BuildException; 024 import org.apache.tools.ant.Project; 025 import org.apache.tools.ant.Task; 026 027 /** 028 * <p><code>ConsumerTask</code> is an Ant task which will 029 * publish all of the given text files as a JMS Text Message 030 * using a given JMS Connection (Messenger) and a Destination 031 * 032 * @author <a href="mailto:jstrachan@apache.org">James Strachan</a> 033 * @version $Revision: 155459 $ 034 */ 035 public class ConsumerTask extends Task { 036 037 private Messenger messenger; 038 private String messengerName; 039 private Destination destination; 040 private String subject; 041 private MessengerManager messengerManager; 042 043 /** the number of messages to receive */ 044 private int count; 045 046 /** the output directory */ 047 private File dir = new File("."); 048 049 // Properties 050 //------------------------------------------------------------------------- 051 052 053 /** 054 * Sets the output directory 055 */ 056 public void setDir(File dir) { 057 this.dir = dir; 058 } 059 060 public Messenger getMessenger() throws JMSException { 061 if ( messenger == null ) { 062 messenger = getMessengerManager().getMessenger( getMessengerName() ); 063 } 064 return messenger; 065 } 066 067 /** Sets the Messenger to be used */ 068 public void setMessenger(Messenger messenger) { 069 this.messenger = messenger; 070 } 071 072 /** Getter for property messengerName. 073 * @return Value of property messengerName. 074 */ 075 public String getMessengerName() { 076 return messengerName; 077 } 078 079 /** Setter for property messengerName. 080 * @param messengerName New value of property messengerName. 081 */ 082 public void setMessengerName(String messengerName) { 083 this.messengerName = messengerName; 084 } 085 086 /** Getter for property destination. 087 * @return Value of property destination. 088 */ 089 public Destination getDestination() throws JMSException { 090 if ( destination == null ) { 091 destination = getMessenger().getDestination( getSubject() ); 092 } 093 return destination; 094 } 095 096 /** Setter for property destination. 097 * @param destination New value of property destination. 098 */ 099 public void setDestination(Destination destination) { 100 this.destination = destination; 101 } 102 103 /** Getter for property subject. 104 * @return Value of property subject. 105 */ 106 public String getSubject() { 107 return subject; 108 } 109 110 /** Setter for property subject. 111 * @param subject New value of property subject. 112 */ 113 public void setSubject(String subject) { 114 this.subject = subject; 115 } 116 117 118 /** Getter for property messengerManager. 119 * @return Value of property messengerManager. 120 */ 121 public MessengerManager getMessengerManager() { 122 return messengerManager; 123 } 124 125 /** Setter for property messengerManager. 126 * @param messengerManager New value of property messengerManager. 127 */ 128 public void setMessengerManager(MessengerManager messengerManager) { 129 this.messengerManager = messengerManager; 130 } 131 132 /** 133 * Sets the URI of the Messenger.xml configuration document to use 134 * to configure the messengers to use for this task. 135 */ 136 public void setConfiguration(String uri) throws JMSException { 137 setMessengerManager( MessengerManager.load( uri ) ); 138 } 139 140 /** 141 * @return the number of messages to receive. 142 * A number less than or equal to 0 will receive messages forever 143 */ 144 public int getCount() { 145 return count; 146 } 147 148 /** 149 * Setter for the number of messages to receive. 150 * A number less than or equal to 0 will receive messages forever 151 */ 152 public void setCount(int count) { 153 this.count = count; 154 } 155 156 157 // Task interface 158 //------------------------------------------------------------------------- 159 160 /** 161 * Performs the copy operation. 162 */ 163 public void execute() throws BuildException { 164 try { 165 Messenger messenger = getMessenger(); 166 if ( messenger == null ) { 167 throw new BuildException("Must specify a valid Messenger", location ); 168 } 169 Destination destination = getDestination(); 170 if ( destination == null ) { 171 throw new BuildException("Must specify a valid JMS Destination", location ); 172 } 173 174 if ( count > 0 ) { 175 log( "Will wait until I receive: " + count + " messages and will write to directory: " + dir ); 176 177 for ( int i = 0; i < count; i++ ) { 178 Message message = messenger.receive( destination ); 179 processMessage( message ); 180 } 181 182 log( "Finished." ); 183 } 184 else { 185 log( "Infinite loop. Will write to directory: " + dir ); 186 187 while (true) { 188 Message message = messenger.receive( destination ); 189 processMessage( message ); 190 } 191 } 192 } 193 catch (IOException e) { 194 log( "Caught exception: " + e, Project.MSG_ERR ); 195 throw new BuildException(e, location); 196 } 197 catch (JMSException e) { 198 log( "Caught exception: " + e, Project.MSG_ERR ); 199 throw new BuildException(e, location); 200 } 201 finally { 202 try { 203 // close the JMS connection to release any background threads 204 messenger.close(); 205 } 206 catch (Exception e) { 207 // ignore close exceptions 208 } 209 } 210 } 211 212 /** 213 * Processes a given message 214 */ 215 protected void processMessage(Message message) throws IOException, JMSException { 216 log( "Received message to: " + message ); 217 218 String text = null; 219 if ( message instanceof TextMessage ) { 220 TextMessage textMessage = (TextMessage) message; 221 text = textMessage.toString(); 222 } 223 else { 224 // #### bit of a hack!!! 225 // ideally we need an XML format for message persistence 226 text = message.toString(); 227 } 228 processMessageText(text); 229 } 230 231 /** 232 * Writes the given text to a file 233 */ 234 protected void processMessageText(String text) throws IOException { 235 FileWriter writer = new FileWriter( dir ); 236 writer.write ( text ); 237 writer.close(); 238 } 239 } 240 241