1   /*
2    * Copyright (C) The Apache Software Foundation. All rights reserved.
3    *
4    * This software is published under the terms of the Apache Software License
5    * version 1.1, a copy of which has been included with this distribution in
6    * the LICENSE file.
7    * 
8    * $Id: TestMessenger.java 155459 2005-02-26 13:24:44Z dirkv $
9    */
10  package org.apache.commons.messenger;
11  
12  import java.util.ArrayList;
13  import java.util.List;
14  
15  import javax.jms.Destination;
16  import javax.jms.Message;
17  import javax.jms.MessageListener;
18  import javax.jms.TextMessage;
19  
20  import junit.framework.Test;
21  import junit.framework.TestCase;
22  import junit.framework.TestSuite;
23  import junit.textui.TestRunner;
24  
25  /** Test harness for Messenger
26    *
27    * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
28    * @version $Revision: 155459 $
29    */
30  public class TestMessenger extends TestCase {
31  
32      boolean useAsyncStuff = true;
33  
34      protected static boolean verbose = true;
35      protected List failures = new ArrayList();
36  
37      protected String topicName = getClass().getName() + ".Topic";
38      protected String queueName = getClass().getName() + ".Queue";
39  
40      protected String topicMessageText = "This is the text of a topic message";
41      protected String queueMessageText = "This is the text of a queue message";
42  
43      protected long waitTime = 2 * 1000;
44  
45      protected volatile boolean receivedQueueMessage;
46      protected volatile boolean receivedTopicMessage;
47  
48      public static Test suite() {
49          return new TestSuite(TestMessenger.class);
50      }
51  
52      public static void main(String[] args) {
53          if (args.length > 0) {
54              if (args[0].startsWith("-v")) {
55                  verbose = true;
56              }
57          }
58          TestRunner.run(suite());
59      }
60  
61      public TestMessenger(String testName) {
62          super(testName);
63      }
64  
65      public void testSendTopic() throws Exception {
66          log("############### testing send");
67  
68          Messenger messenger = MessengerManager.get("topic");
69          Destination topic = messenger.getDestination(topicName);
70  
71          flushDestination(messenger, topic);
72  
73          Thread thread = new Thread() {
74              public void run() {
75                  try {
76                      receiveTopicMessage();
77                  }
78                  catch (Exception e) {
79                      failures.add(e);
80                  }
81              }
82          };
83          thread.start();
84  
85          log("sleeping to let the receive thread start");
86  
87          Thread.sleep(waitTime);
88  
89          log("creating the message");
90  
91          TextMessage message = messenger.createTextMessage(topicMessageText);
92  
93          log("sending topic message");
94  
95          messenger.send(topic, message);
96  
97          log("sleeping");
98  
99          Thread.sleep(waitTime);
100 
101         assertTrue("Have received the topic message", receivedTopicMessage);
102     }
103 
104     public void testSendQueue() throws Exception {
105         log("############### testing send queue");
106 
107         Messenger messenger = MessengerManager.get("queue");
108         Destination queue = messenger.getDestination(queueName);
109 
110         flushDestination(messenger, queue);
111 
112         Thread thread = new Thread() {
113             public void run() {
114                 try {
115                     receiveQueueMessage();
116                 }
117                 catch (Exception e) {
118                     failures.add(e);
119                 }
120             }
121         };
122         thread.start();
123 
124         log("sleeping to let the receive thread start");
125 
126         Thread.sleep(waitTime);
127 
128         log("creating the message");
129 
130         TextMessage message = messenger.createTextMessage(queueMessageText);
131 
132         log("sending queue message");
133 
134         messenger.send(queue, message);
135 
136         log("sleeping");
137 
138         Thread.sleep(waitTime);
139 
140         assertTrue("Have received the queue message", receivedQueueMessage);
141     }
142 
143     public void testRpc() throws Exception {
144         log("############### testing RPC");
145 
146         final Messenger messenger = MessengerManager.get("queue");
147         final Destination destination = messenger.getDestination(queueName);
148 
149         flushDestination(messenger, destination);
150 
151         if (useAsyncStuff) {
152             log("Adding listneer to queue: " + destination);
153             messenger.addListener(destination, new MessageListener() {
154                 public void onMessage(Message message) {
155                     sendReply(messenger, message);
156                 }
157 
158             });
159         }
160         else {
161             Thread thread = new Thread() {
162                 public void run() {
163                     try {
164                         log("blocking receive on queue: " + destination);
165                         Message message = messenger.receive(destination);
166                         sendReply(messenger, message);
167                     }
168                     catch (Exception e) {
169                         failures.add(e);
170                     }
171                 }
172             };
173             thread.start();
174 
175         }
176 
177         log("sleeping to let the receive thread start");
178 
179         Thread.sleep(waitTime);
180 
181         log("creating the message");
182 
183         TextMessage message = messenger.createTextMessage(queueMessageText);
184 
185         log("sending queue message");
186 
187         Message answer = messenger.call(destination, message);
188 
189 //        Message answer = null;
190 //        messenger.send(destination, message);
191         
192         
193         assertTrue("Have received the reply message", answer != null);
194     }
195 
196     protected void sendReply(Messenger messenger, Message message) {
197         log("Received request: " + message);
198 
199         TextMessage textMessage = (TextMessage) message;
200 
201         try {
202             TextMessage reply = messenger.createTextMessage(textMessage.getText());
203             messenger.send(message.getJMSReplyTo(), reply);
204         }
205         catch (Exception e) {
206             failures.add(e);
207         }
208     }
209 
210     protected void setUp() throws Exception {
211     }
212 
213     protected void flushDestination(Messenger messenger, Destination destination) throws Exception {
214         log("Clearing messenger destination: " + destination);
215 
216         // lets remove any existing messages
217         while (true) {
218             Message m = messenger.receiveNoWait(destination);
219             if (m != null) {
220                 log("Ignoring message: " + m);
221             }
222             else {
223                 break;
224             }
225         }
226 
227         log("Cleared messenger destination: " + destination);
228     }
229 
230     protected void receiveTopicMessage() throws Exception {
231         Messenger messenger = MessengerManager.get("topic");
232         Destination topic = messenger.getDestination(topicName);
233 
234         log("Calling receive() on topic");
235 
236         TextMessage message = (TextMessage) messenger.receive(topic);
237         assertEquals("Topic message text", topicMessageText, message.getText());
238 
239         log("Found topic message: " + message.getText());
240 
241         receivedTopicMessage = true;
242     }
243 
244     protected void receiveQueueMessage() throws Exception {
245         Messenger messenger = MessengerManager.get("queue");
246         Destination queue = messenger.getDestination(queueName);
247 
248         log("Calling receive() on queue");
249 
250         TextMessage message = (TextMessage) messenger.receive(queue);
251         assertEquals("Queue message text", queueMessageText, message.getText());
252 
253         log("Found queue message: " + message.getText());
254 
255         receivedQueueMessage = true;
256     }
257 
258     protected void log(String text) {
259         if (verbose) {
260             System.out.println(text);
261         }
262     }
263 }