001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *     http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.commons.pipeline.stage;
019    
020    import java.util.regex.Pattern;
021    
022    import junit.framework.Test;
023    import junit.framework.TestCase;
024    import junit.framework.TestSuite;
025    
026    import org.apache.commons.pipeline.driver.SynchronousStageDriverFactory;
027    import org.apache.commons.pipeline.event.ObjectProcessedEvent;
028    import org.apache.commons.pipeline.listener.ObjectProcessedEventCounter;
029    import org.apache.commons.pipeline.testFramework.TestStage;
030    import org.apache.commons.pipeline.testFramework.TestFeeder;
031    import org.apache.commons.pipeline.StageException;
032    
033    /**
034     * Test cases for the extended base stage. Since it is abstract these tests have
035     * to use a subclass. In order to prevent dependency problems this subclass is defined 
036     * as a private class.
037     */
038    public class ExtendedBaseStageTest extends AbstractStageTest {
039        TestStage stage = null;
040    
041        public ExtendedBaseStageTest(String testName) {
042            super(testName);
043        }
044    
045        protected void setUp() throws Exception {
046            super.setUp();
047            stage = new TestStage();
048            this.init(stage);
049        }
050    
051        public void testPreprocess() throws Exception {
052            stage.preprocess();
053            assertTrue("Preprocess call failed to call inner method", stage.preprocessed);
054            assertEquals("Incorrectly reporting objects recieved", 0,stage.getObjectsReceived());
055            assertEquals("Incorrectly reporting objects emitted", 0,stage.getTotalEmits());
056        }
057    
058        public void testPostprocess() throws Exception {
059            stage.preprocess();
060            stage.postprocess();
061            assertTrue("Postprocess call failed to call inner method", stage.postprocessed);
062            assertEquals("Incorrectly reporting objects recieved", 0,stage.getObjectsReceived());
063            assertEquals("Incorrectly reporting objects emitted", 0,stage.getTotalEmits());
064        }
065    
066        /**
067         * Verify the emit statistics are correctly reported.
068         */
069        public void testEmits() throws Exception {
070            // Redefine the process method.
071            stage = defineStage(new TestStage() {
072                @Override
073                public void innerProcess(Object obj) {
074                    super.emit(obj);
075                }
076            });
077            Object value = new Object();
078            stage.process(value);
079            // Verify output.
080            assertEquals("Stage did not emit value as expected",
081                         value, testFeeder.receivedValues.get(0));
082            // Verify stats correctly recorded.
083            assertEquals("Incorrectly reporting objects recieved", 1,stage.getObjectsReceived());
084            assertEquals("Incorrectly reporting objects emitted", 1,stage.getTotalEmits());
085            // Make sure the stats message is correct.
086            String message = stage.getStatusMessage();
087            assertMatches("Total objects received:1", message);
088            assertMatches("Total objects emitted:1 \\(100\\.000%\\)", message);
089        }
090    
091        /**
092         * Verify the statistics are correctly reported when nothing is emitted
093         */
094        public void testNonEmit() throws Exception {
095            stage.preprocess();
096            Object value = new Object();
097            stage.process(value);
098            // Verify output.
099            assertTrue("Stage emitted something unexpectedly", testFeeder.receivedValues.isEmpty());
100            // Verify stats correctly recorded.
101            assertEquals("Incorrectly reporting objects recieved", 1,stage.getObjectsReceived());
102            assertEquals("Incorrectly reporting objects emitted", 0,stage.getTotalEmits());
103            // Make sure the stats message is correct.
104            String message = stage.getStatusMessage();
105            assertMatches("Total objects received:1", message);
106            // No % display on 0 emits.
107            assertMatches("Total objects emitted:0", message);
108        }
109    
110        public void testProcessingTimeStats() throws Exception {
111            // Redefine the process method.
112            stage = defineStage(new TestStage() {
113                @Override
114                public void innerProcess(Object obj) {
115                    try {
116                        Thread.sleep(33);
117                    } catch (InterruptedException e) {
118                        // Ignored.
119                    }
120                    super.emit(obj);
121                }
122            });
123            Object value = new Object();
124            stage.process(value);
125            // Verify stats correctly recorded.
126            assertEquals("Incorrectly reporting objects recieved", 1,stage.getObjectsReceived());
127            assertEquals("Incorrectly reporting objects emitted", 1,stage.getTotalEmits());
128            // Make sure the stats message is correct.
129            String message = stage.getStatusMessage();
130            assertMatches("% time working:100\\.000", message);
131            assertMatches("% time blocking:0\\.000", message);
132            // Should be between 0.032 and 0.034 seconds. Allow up to 0.099 for busy systems
133            assertMatches("Total net processing time \\(sec\\):0\\.0[0-9]{2}",
134                          message);
135            assertMatches("Total gross processing time \\(sec\\):0\\.0[0-9]{2}",
136                          message);
137            assertMatches("Average net processing time \\(sec/obj\\):0\\.0[0-9]{2}",
138                          message);
139            assertMatches("Average gross processing time \\(sec/obj\\):0\\.0[0-9]{2}",
140                          message);
141            // Moving window display
142            assertMatches("Average gross processing time in last 1 \\(sec/obj\\):0\\.0[0-9]{2}",
143                          message);
144        }
145    
146        public void testBranchStatistics() throws Exception {
147            // In order to log the stats we need some blocking. Redefine one of the two branches
148            // used below to block.
149            testContext.branchFeeders.put("A",new TestFeeder() {
150                @Override
151                public void feed(Object obj) {
152                    try {
153                        Thread.sleep(20);
154                    } catch (InterruptedException e) {
155                        // Ignored.
156                    }
157                    super.feed(obj);
158                }
159            });
160            // Redefine the process method.
161            stage = defineStage(new TestStage() {
162                @Override
163                public void innerProcess(Object obj) {
164                    // Hald to one queue, half to another.
165                    if (getObjectsReceived()%2 == 0) {
166                        super.emit("A", obj);
167                    } else {
168                        super.emit("B", obj);
169                    }
170                }
171            });
172            stage.setCollectBranchStats(true);
173            stage.process("Object 1");
174            stage.process("Object 2");
175            // Verify overall stats correctly recorded.
176            assertEquals("Incorrectly reporting objects recieved", 2,stage.getObjectsReceived());
177            assertEquals("Incorrectly reporting objects emitted", 2,stage.getTotalEmits());
178            // Check the stats.
179            String message = stage.getStatusMessage();
180            // Should be 100, maybe 99 in the off chance of a very busy system.
181            assertMatches("% branch A:[0-9]{2,3}\\.[0-9]{3}", message);
182            assertMatches("% branch B:0\\.000", message);
183        }
184    
185        public void testStatusInterval() {
186            stage.setStatusInterval(500L);
187            assertEquals("Failed to set value correctly", 500L, stage.getStatusInterval().longValue());
188        }
189    
190        public void testStatusBatchSize() {
191            stage.setStatusBatchSize(500);
192            assertEquals("Failed to set value correctly", 500, stage.getStatusBatchSize().longValue());
193        }
194    
195        public void testCollectBranchStats() {
196            stage.setCollectBranchStats(true);
197            assertTrue("Failed to set value correctly", stage.getCollectBranchStats());
198        }
199    
200        public void testCurrentStatWindowSize() {
201            stage.setCurrentStatWindowSize(200);
202            assertEquals("Failed to set value correctly", 200, stage.getCurrentStatWindowSize().intValue());
203        }
204    
205        public void testJmxEnabled() {
206            stage.setJmxEnabled(true);
207            assertTrue("Failed to set value correctly", stage.isJmxEnabled());
208            stage.setJmxEnabled(false);
209            assertFalse("Failed to set value correctly", stage.isJmxEnabled());
210        }
211    
212        public void testStageName() {
213            stage.setStageName("foo");
214            assertEquals("Failed to set value correctly", "foo", stage.getStageName());
215        }
216    
217        private void assertMatches(String pattern, String string) {
218            assertMatches("Failed to match expected pattern '" + pattern + "' in \n" + string,
219                          pattern, string);
220        }
221    
222        private void assertMatches(String message, String pattern, String string) {
223            Pattern pat = Pattern.compile(".*" + pattern + ".*", Pattern.DOTALL|Pattern.CASE_INSENSITIVE);
224            assertTrue(message, pat.matcher(string).matches());
225        }
226    
227        /**
228         * Make sure the preprocessing steps are always completed. 
229         */
230        private TestStage defineStage(TestStage stage) throws StageException {
231            this.init(stage);
232            stage.preprocess();
233            return stage;
234        }
235    
236        /**
237         * Private stub stage to record some information.
238         */
239        private class TestStage extends ExtendedBaseStage {
240            boolean preprocessed = false;
241            boolean postprocessed = false;
242    
243            public void innerPreprocess() {
244                preprocessed = true;
245            }
246    
247            public void innerPostprocess() {
248                postprocessed = true;
249            }
250    
251            public void innerProcess(Object obj) {
252                // Overridden by some tests. For others this is a NO-OP
253            }
254    
255            public String status() {
256                return null;
257            }
258        }
259    }