1
2
3
4
5
6
7
8
9
10 package org.apache.commons.messenger;
11
12 import java.util.ArrayList;
13 import java.util.List;
14
15 import javax.jms.Destination;
16 import javax.jms.Message;
17 import javax.jms.MessageListener;
18 import javax.jms.TextMessage;
19
20 import junit.framework.Test;
21 import junit.framework.TestCase;
22 import junit.framework.TestSuite;
23 import junit.textui.TestRunner;
24
25
26
27
28
29
30 public class TestMessenger extends TestCase {
31
32 boolean useAsyncStuff = true;
33
34 protected static boolean verbose = true;
35 protected List failures = new ArrayList();
36
37 protected String topicName = getClass().getName() + ".Topic";
38 protected String queueName = getClass().getName() + ".Queue";
39
40 protected String topicMessageText = "This is the text of a topic message";
41 protected String queueMessageText = "This is the text of a queue message";
42
43 protected long waitTime = 2 * 1000;
44
45 protected volatile boolean receivedQueueMessage;
46 protected volatile boolean receivedTopicMessage;
47
48 public static Test suite() {
49 return new TestSuite(TestMessenger.class);
50 }
51
52 public static void main(String[] args) {
53 if (args.length > 0) {
54 if (args[0].startsWith("-v")) {
55 verbose = true;
56 }
57 }
58 TestRunner.run(suite());
59 }
60
61 public TestMessenger(String testName) {
62 super(testName);
63 }
64
65 public void testSendTopic() throws Exception {
66 log("############### testing send");
67
68 Messenger messenger = MessengerManager.get("topic");
69 Destination topic = messenger.getDestination(topicName);
70
71 flushDestination(messenger, topic);
72
73 Thread thread = new Thread() {
74 public void run() {
75 try {
76 receiveTopicMessage();
77 }
78 catch (Exception e) {
79 failures.add(e);
80 }
81 }
82 };
83 thread.start();
84
85 log("sleeping to let the receive thread start");
86
87 Thread.sleep(waitTime);
88
89 log("creating the message");
90
91 TextMessage message = messenger.createTextMessage(topicMessageText);
92
93 log("sending topic message");
94
95 messenger.send(topic, message);
96
97 log("sleeping");
98
99 Thread.sleep(waitTime);
100
101 assertTrue("Have received the topic message", receivedTopicMessage);
102 }
103
104 public void testSendQueue() throws Exception {
105 log("############### testing send queue");
106
107 Messenger messenger = MessengerManager.get("queue");
108 Destination queue = messenger.getDestination(queueName);
109
110 flushDestination(messenger, queue);
111
112 Thread thread = new Thread() {
113 public void run() {
114 try {
115 receiveQueueMessage();
116 }
117 catch (Exception e) {
118 failures.add(e);
119 }
120 }
121 };
122 thread.start();
123
124 log("sleeping to let the receive thread start");
125
126 Thread.sleep(waitTime);
127
128 log("creating the message");
129
130 TextMessage message = messenger.createTextMessage(queueMessageText);
131
132 log("sending queue message");
133
134 messenger.send(queue, message);
135
136 log("sleeping");
137
138 Thread.sleep(waitTime);
139
140 assertTrue("Have received the queue message", receivedQueueMessage);
141 }
142
143 public void testRpc() throws Exception {
144 log("############### testing RPC");
145
146 final Messenger messenger = MessengerManager.get("queue");
147 final Destination destination = messenger.getDestination(queueName);
148
149 flushDestination(messenger, destination);
150
151 if (useAsyncStuff) {
152 log("Adding listneer to queue: " + destination);
153 messenger.addListener(destination, new MessageListener() {
154 public void onMessage(Message message) {
155 sendReply(messenger, message);
156 }
157
158 });
159 }
160 else {
161 Thread thread = new Thread() {
162 public void run() {
163 try {
164 log("blocking receive on queue: " + destination);
165 Message message = messenger.receive(destination);
166 sendReply(messenger, message);
167 }
168 catch (Exception e) {
169 failures.add(e);
170 }
171 }
172 };
173 thread.start();
174
175 }
176
177 log("sleeping to let the receive thread start");
178
179 Thread.sleep(waitTime);
180
181 log("creating the message");
182
183 TextMessage message = messenger.createTextMessage(queueMessageText);
184
185 log("sending queue message");
186
187 Message answer = messenger.call(destination, message);
188
189
190
191
192
193 assertTrue("Have received the reply message", answer != null);
194 }
195
196 protected void sendReply(Messenger messenger, Message message) {
197 log("Received request: " + message);
198
199 TextMessage textMessage = (TextMessage) message;
200
201 try {
202 TextMessage reply = messenger.createTextMessage(textMessage.getText());
203 messenger.send(message.getJMSReplyTo(), reply);
204 }
205 catch (Exception e) {
206 failures.add(e);
207 }
208 }
209
210 protected void setUp() throws Exception {
211 }
212
213 protected void flushDestination(Messenger messenger, Destination destination) throws Exception {
214 log("Clearing messenger destination: " + destination);
215
216
217 while (true) {
218 Message m = messenger.receiveNoWait(destination);
219 if (m != null) {
220 log("Ignoring message: " + m);
221 }
222 else {
223 break;
224 }
225 }
226
227 log("Cleared messenger destination: " + destination);
228 }
229
230 protected void receiveTopicMessage() throws Exception {
231 Messenger messenger = MessengerManager.get("topic");
232 Destination topic = messenger.getDestination(topicName);
233
234 log("Calling receive() on topic");
235
236 TextMessage message = (TextMessage) messenger.receive(topic);
237 assertEquals("Topic message text", topicMessageText, message.getText());
238
239 log("Found topic message: " + message.getText());
240
241 receivedTopicMessage = true;
242 }
243
244 protected void receiveQueueMessage() throws Exception {
245 Messenger messenger = MessengerManager.get("queue");
246 Destination queue = messenger.getDestination(queueName);
247
248 log("Calling receive() on queue");
249
250 TextMessage message = (TextMessage) messenger.receive(queue);
251 assertEquals("Queue message text", queueMessageText, message.getText());
252
253 log("Found queue message: " + message.getText());
254
255 receivedQueueMessage = true;
256 }
257
258 protected void log(String text) {
259 if (verbose) {
260 System.out.println(text);
261 }
262 }
263 }