001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *     http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.commons.pipeline.driver;
019    
020    
021    import org.apache.commons.pipeline.Stage;
022    import org.apache.commons.pipeline.StageContext;
023    import org.apache.commons.pipeline.StageDriver;
024    import org.apache.commons.pipeline.StageDriverFactory;
025    import org.apache.commons.pipeline.util.BlockingQueueFactory;
026    
027    /**
028     * This factory is used to create {@link ThreadPoolStageDriver} instances configured
029     * to run specific stages.
030     */
031    public class ThreadPoolStageDriverFactory implements StageDriverFactory {
032        
033        private int numThreads = 1;
034        
035        /** Creates a new instance of ThreadPoolStageDriverFactory */
036        public ThreadPoolStageDriverFactory() {
037        }
038        
039        /**
040         * Creates the new {@link ThreadPoolStageDriver} based upon the configuration
041         * of this factory instance
042         * @param stage The stage to be run by the newly created driver
043         * @param context The context in which the stage will be run
044         * @return the newly created driver
045         */
046        public StageDriver createStageDriver(Stage stage, StageContext context) {
047            try {
048                return new ThreadPoolStageDriver(stage, context, queueFactory.createQueue(), timeout, faultTolerance, numThreads);
049            } catch (Exception e) {
050                throw new IllegalStateException("Instantiation of driver failed due to illegal factory state.", e);
051            }
052        }
053        
054        /**
055         * Holds value of property queueFactory.
056         */
057        private BlockingQueueFactory<?> queueFactory = new BlockingQueueFactory.LinkedBlockingQueueFactory();
058    
059        /**
060         * Getter for property queueFactory.
061         * @return Value of property queueFactory.
062         */
063        public BlockingQueueFactory<?> getQueueFactory() {
064            return this.queueFactory;
065        }
066    
067        /**
068         * Setter for property queueFactory.
069         * @param queueFactory New value of property queueFactory.
070         */
071        public void setQueueFactory(BlockingQueueFactory<?> queueFactory) {
072            this.queueFactory = queueFactory;
073        }    
074        
075        /**
076         * Holds value of property timeout.
077         */
078        private long timeout = 500;
079        
080        /**
081         * Timeout for wait to ensure deadlock cannot occur on thread termination.
082         * Default is 500
083         * @return Value of property timeout.
084         */
085        public long getTimeout() {
086            return this.timeout;
087        }
088        
089        /**
090         * Setter for property timeout.
091         * @param timeout New value of property timeout.
092         */
093        public void setTimeout(long timeout) {
094            this.timeout = timeout;
095        }
096        
097        /**
098         * Holds value of property faultTolerance.
099         */
100        private FaultTolerance faultTolerance = FaultTolerance.NONE;
101        
102        /**
103         * Getter for property faultTolerance. See {@link FaultTolerance} for valid values
104         * and enumation meanings.
105         * @return Value of property faultTolerance.
106         */
107        public FaultTolerance getFaultTolerance() {
108            return this.faultTolerance;
109        }
110        
111        /**
112         * Setter for property faultTolerance.
113         *
114         * @param faultTolerance New value of property faultTolerance.
115         */
116        public void setFaultTolerance(FaultTolerance faultTolerance) {
117            this.faultTolerance = faultTolerance;
118        }
119        
120        /**
121         * Convenience setter for property faultTolerance for use by Digester.
122         *
123         * @param level New value of property level ("ALL","CHECKED", or "NONE").
124         */
125        public void setFaultToleranceLevel(String level) {
126            this.faultTolerance = FaultTolerance.valueOf(level);
127        }
128        
129        /**
130         * Returns the number of threads that will be allocated to the thread
131         * pool of a driver created by this factory.
132         */
133        public int getNumThreads() {
134            return numThreads;
135        }
136        
137        /**
138         * Sets the number of threads that will be allocated to the thread
139         * pool of a driver created by this factory.
140         */
141        public void setNumThreads(int numThreads) {
142            this.numThreads = numThreads;
143        }
144        
145    }