001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018 package org.apache.commons.pipeline.stage;
019
020 import java.util.regex.Pattern;
021
022 import junit.framework.Test;
023 import junit.framework.TestCase;
024 import junit.framework.TestSuite;
025
026 import org.apache.commons.pipeline.driver.SynchronousStageDriverFactory;
027 import org.apache.commons.pipeline.event.ObjectProcessedEvent;
028 import org.apache.commons.pipeline.listener.ObjectProcessedEventCounter;
029 import org.apache.commons.pipeline.testFramework.TestStage;
030 import org.apache.commons.pipeline.testFramework.TestFeeder;
031 import org.apache.commons.pipeline.StageException;
032
033 /**
034 * Test cases for the extended base stage. Since it is abstract these tests have
035 * to use a subclass. In order to prevent dependency problems this subclass is defined
036 * as a private class.
037 */
038 public class ExtendedBaseStageTest extends AbstractStageTest {
039 TestStage stage = null;
040
041 public ExtendedBaseStageTest(String testName) {
042 super(testName);
043 }
044
045 protected void setUp() throws Exception {
046 super.setUp();
047 stage = new TestStage();
048 this.init(stage);
049 }
050
051 public void testPreprocess() throws Exception {
052 stage.preprocess();
053 assertTrue("Preprocess call failed to call inner method", stage.preprocessed);
054 assertEquals("Incorrectly reporting objects recieved", 0,stage.getObjectsReceived());
055 assertEquals("Incorrectly reporting objects emitted", 0,stage.getTotalEmits());
056 }
057
058 public void testPostprocess() throws Exception {
059 stage.preprocess();
060 stage.postprocess();
061 assertTrue("Postprocess call failed to call inner method", stage.postprocessed);
062 assertEquals("Incorrectly reporting objects recieved", 0,stage.getObjectsReceived());
063 assertEquals("Incorrectly reporting objects emitted", 0,stage.getTotalEmits());
064 }
065
066 /**
067 * Verify the emit statistics are correctly reported.
068 */
069 public void testEmits() throws Exception {
070 // Redefine the process method.
071 stage = defineStage(new TestStage() {
072 @Override
073 public void innerProcess(Object obj) {
074 super.emit(obj);
075 }
076 });
077 Object value = new Object();
078 stage.process(value);
079 // Verify output.
080 assertEquals("Stage did not emit value as expected",
081 value, testFeeder.receivedValues.get(0));
082 // Verify stats correctly recorded.
083 assertEquals("Incorrectly reporting objects recieved", 1,stage.getObjectsReceived());
084 assertEquals("Incorrectly reporting objects emitted", 1,stage.getTotalEmits());
085 // Make sure the stats message is correct.
086 String message = stage.getStatusMessage();
087 assertMatches("Total objects received:1", message);
088 assertMatches("Total objects emitted:1 \\(100\\.000%\\)", message);
089 }
090
091 /**
092 * Verify the statistics are correctly reported when nothing is emitted
093 */
094 public void testNonEmit() throws Exception {
095 stage.preprocess();
096 Object value = new Object();
097 stage.process(value);
098 // Verify output.
099 assertTrue("Stage emitted something unexpectedly", testFeeder.receivedValues.isEmpty());
100 // Verify stats correctly recorded.
101 assertEquals("Incorrectly reporting objects recieved", 1,stage.getObjectsReceived());
102 assertEquals("Incorrectly reporting objects emitted", 0,stage.getTotalEmits());
103 // Make sure the stats message is correct.
104 String message = stage.getStatusMessage();
105 assertMatches("Total objects received:1", message);
106 // No % display on 0 emits.
107 assertMatches("Total objects emitted:0", message);
108 }
109
110 public void testProcessingTimeStats() throws Exception {
111 // Redefine the process method.
112 stage = defineStage(new TestStage() {
113 @Override
114 public void innerProcess(Object obj) {
115 try {
116 Thread.sleep(33);
117 } catch (InterruptedException e) {
118 // Ignored.
119 }
120 super.emit(obj);
121 }
122 });
123 Object value = new Object();
124 stage.process(value);
125 // Verify stats correctly recorded.
126 assertEquals("Incorrectly reporting objects recieved", 1,stage.getObjectsReceived());
127 assertEquals("Incorrectly reporting objects emitted", 1,stage.getTotalEmits());
128 // Make sure the stats message is correct.
129 String message = stage.getStatusMessage();
130 assertMatches("% time working:100\\.000", message);
131 assertMatches("% time blocking:0\\.000", message);
132 // Should be between 0.032 and 0.034 seconds. Allow up to 0.099 for busy systems
133 assertMatches("Total net processing time \\(sec\\):0\\.0[0-9]{2}",
134 message);
135 assertMatches("Total gross processing time \\(sec\\):0\\.0[0-9]{2}",
136 message);
137 assertMatches("Average net processing time \\(sec/obj\\):0\\.0[0-9]{2}",
138 message);
139 assertMatches("Average gross processing time \\(sec/obj\\):0\\.0[0-9]{2}",
140 message);
141 // Moving window display
142 assertMatches("Average gross processing time in last 1 \\(sec/obj\\):0\\.0[0-9]{2}",
143 message);
144 }
145
146 public void testBranchStatistics() throws Exception {
147 // In order to log the stats we need some blocking. Redefine one of the two branches
148 // used below to block.
149 testContext.branchFeeders.put("A",new TestFeeder() {
150 @Override
151 public void feed(Object obj) {
152 try {
153 Thread.sleep(20);
154 } catch (InterruptedException e) {
155 // Ignored.
156 }
157 super.feed(obj);
158 }
159 });
160 // Redefine the process method.
161 stage = defineStage(new TestStage() {
162 @Override
163 public void innerProcess(Object obj) {
164 // Hald to one queue, half to another.
165 if (getObjectsReceived()%2 == 0) {
166 super.emit("A", obj);
167 } else {
168 super.emit("B", obj);
169 }
170 }
171 });
172 stage.setCollectBranchStats(true);
173 stage.process("Object 1");
174 stage.process("Object 2");
175 // Verify overall stats correctly recorded.
176 assertEquals("Incorrectly reporting objects recieved", 2,stage.getObjectsReceived());
177 assertEquals("Incorrectly reporting objects emitted", 2,stage.getTotalEmits());
178 // Check the stats.
179 String message = stage.getStatusMessage();
180 // Should be 100, maybe 99 in the off chance of a very busy system.
181 assertMatches("% branch A:[0-9]{2,3}\\.[0-9]{3}", message);
182 assertMatches("% branch B:0\\.000", message);
183 }
184
185 public void testStatusInterval() {
186 stage.setStatusInterval(500L);
187 assertEquals("Failed to set value correctly", 500L, stage.getStatusInterval().longValue());
188 }
189
190 public void testStatusBatchSize() {
191 stage.setStatusBatchSize(500);
192 assertEquals("Failed to set value correctly", 500, stage.getStatusBatchSize().longValue());
193 }
194
195 public void testCollectBranchStats() {
196 stage.setCollectBranchStats(true);
197 assertTrue("Failed to set value correctly", stage.getCollectBranchStats());
198 }
199
200 public void testCurrentStatWindowSize() {
201 stage.setCurrentStatWindowSize(200);
202 assertEquals("Failed to set value correctly", 200, stage.getCurrentStatWindowSize().intValue());
203 }
204
205 public void testJmxEnabled() {
206 stage.setJmxEnabled(true);
207 assertTrue("Failed to set value correctly", stage.isJmxEnabled());
208 stage.setJmxEnabled(false);
209 assertFalse("Failed to set value correctly", stage.isJmxEnabled());
210 }
211
212 public void testStageName() {
213 stage.setStageName("foo");
214 assertEquals("Failed to set value correctly", "foo", stage.getStageName());
215 }
216
217 private void assertMatches(String pattern, String string) {
218 assertMatches("Failed to match expected pattern '" + pattern + "' in \n" + string,
219 pattern, string);
220 }
221
222 private void assertMatches(String message, String pattern, String string) {
223 Pattern pat = Pattern.compile(".*" + pattern + ".*", Pattern.DOTALL|Pattern.CASE_INSENSITIVE);
224 assertTrue(message, pat.matcher(string).matches());
225 }
226
227 /**
228 * Make sure the preprocessing steps are always completed.
229 */
230 private TestStage defineStage(TestStage stage) throws StageException {
231 this.init(stage);
232 stage.preprocess();
233 return stage;
234 }
235
236 /**
237 * Private stub stage to record some information.
238 */
239 private class TestStage extends ExtendedBaseStage {
240 boolean preprocessed = false;
241 boolean postprocessed = false;
242
243 public void innerPreprocess() {
244 preprocessed = true;
245 }
246
247 public void innerPostprocess() {
248 postprocessed = true;
249 }
250
251 public void innerProcess(Object obj) {
252 // Overridden by some tests. For others this is a NO-OP
253 }
254
255 public String status() {
256 return null;
257 }
258 }
259 }