001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied. See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019
020 package org.apache.commons.pipeline.config;
021
022 import java.net.URL;
023 import java.util.ResourceBundle;
024
025 import org.apache.commons.logging.Log;
026 import org.apache.commons.logging.LogFactory;
027 import org.apache.commons.pipeline.AbstractLoggingTestCase;
028 import org.apache.commons.pipeline.Pipeline;
029 import org.apache.commons.pipeline.PipelineFactory;
030 import org.apache.commons.pipeline.Stage;
031 import org.apache.commons.pipeline.StageEventListener;
032 import org.apache.commons.pipeline.listener.ObjectProcessedEventCounter;
033 import org.apache.commons.pipeline.stage.KeyWaitBufferStage;
034 import org.apache.commons.pipeline.testFramework.TestFeeder;
035 import org.apache.commons.pipeline.util.BlockingQueueFactory;
036
037 /**
038 * Test cases for the DigesterPipelineFactory.
039 */
040 public class DigesterPipelineFactoryTest extends AbstractLoggingTestCase {
041 private ResourceBundle testResources = ResourceBundle.getBundle("TestResources");
042 private String keyBase = "test.DigesterPipelineFactoryTest";
043
044 public DigesterPipelineFactoryTest(String testName) {
045 super(testName);
046 }
047
048 public static junit.framework.Test suite() {
049 junit.framework.TestSuite suite = new junit.framework.TestSuite(DigesterPipelineFactoryTest.class);
050
051 return suite;
052 }
053
054 public void testCreatePipeline() throws Exception {
055 Log log = LogFactory.getLog(this.getClass());
056 URL confURL = this.getClass().getClassLoader().getResource(testResources.getString(keyBase + ".configFile"));
057 PipelineFactory factory = new DigesterPipelineFactory(confURL);
058
059 Pipeline pipeline = factory.createPipeline();
060 TestFeeder terminalFeeder = new TestFeeder();
061 pipeline.setTerminalFeeder(terminalFeeder);
062 assertNotNull("Pipeline exists.", pipeline);
063
064 int i = 0;
065 for (Stage stage : pipeline.getStages()) {
066 assertNotNull("Stage is not null.", stage);
067 assertEquals(stage.getClass(), Class.forName(testResources.getString(keyBase + ".stage" + i + ".class")));
068 i++;
069
070 if (stage instanceof KeyWaitBufferStage) {
071 assertTrue(((KeyWaitBufferStage) stage).getQueueFactory() instanceof BlockingQueueFactory.ArrayBlockingQueueFactory);
072 }
073 }
074
075 pipeline.run();
076
077 boolean eventsRecorded = false;
078 assertTrue("Pipeline has at least one listener.", pipeline.getRegisteredListeners().size() > 0);
079 for (StageEventListener l : pipeline.getRegisteredListeners()) {
080 if (l instanceof ObjectProcessedEventCounter) {
081 log.info(((ObjectProcessedEventCounter) l).getCounts().size() + " event sources found.");
082 assertTrue("No events were recorded by the ObjectProcessedEventListener.", ((ObjectProcessedEventCounter) l).getCounts().size() > 0);
083 eventsRecorded = true;
084 }
085 }
086 assertTrue("Events were not raised and properly recorded by the listener.", eventsRecorded);
087
088 assertNotNull(pipeline.getEnv("testDate"));
089 assertEquals("Hello, World!", pipeline.getEnv("testEnvVar"));
090 assertTrue("Terminal feeder did not receive any data.", terminalFeeder.receivedValues.size() > 0);
091 }
092 }