001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing,
013     * software distributed under the License is distributed on an
014     * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015     * KIND, either express or implied.  See the License for the
016     * specific language governing permissions and limitations
017     * under the License.    
018     */ 
019    
020    
021    package org.apache.commons.pipeline.driver;
022    
023    import java.util.concurrent.BlockingQueue;
024    import java.util.concurrent.LinkedBlockingQueue;
025    import org.apache.commons.pipeline.Stage;
026    import org.apache.commons.pipeline.StageContext;
027    import org.apache.commons.pipeline.StageDriver;
028    import org.apache.commons.pipeline.StageDriverFactory;
029    import org.apache.commons.pipeline.util.BlockingQueueFactory;
030    
031    /**
032     * This factory is used to create DedicatedThreadStageDriver instances configured
033     * to run specific stages.
034     *
035     */
036    public class DedicatedThreadStageDriverFactory implements StageDriverFactory {
037        
038        /** Creates a new instance of DedicatedThreadStageDriverFactory */
039        public DedicatedThreadStageDriverFactory() {
040        }
041        
042        /**
043         * Creates the new {@link DedicatedThreadStageDriver} based upon the configuration
044         * of this factory instance
045         * @param stage The stage to be run by the newly created driver
046         * @param context The context in which the stage will be run
047         * @return the newly created driver
048         */
049        public StageDriver createStageDriver(Stage stage, StageContext context) {
050            try {
051                return new DedicatedThreadStageDriver(stage, context, queueFactory.createQueue(), timeout, faultTolerance);
052            } catch (Exception e) {
053                throw new IllegalStateException("Instantiation of driver failed due to illegal factory state.", e);
054            }
055        }
056            
057        /**
058         * Holds value of property timeout.
059         */
060        private long timeout = 500;
061        
062        /**
063         * Timeout (in milliseconds) for queue polling to ensure deadlock cannot 
064         * occur on thread termination. Default value is 500 ms.
065         * @return Value of property timeout.
066         */
067        public long getTimeout() {
068            return this.timeout;
069        }
070        
071        /**
072         * Setter for property timeout.
073         * @param timeout New value of property timeout.
074         */
075        public void setTimeout(long timeout) {
076            this.timeout = timeout;
077        }
078        
079        /**
080         * Holds value of property faultTolerance.
081         */
082        private FaultTolerance faultTolerance = FaultTolerance.NONE;
083        
084        /**
085         * Getter for property faultTolerance. See {@link FaultTolerance} for valid values
086         * and enumation meanings.
087         * @return Value of property faultTolerance.
088         */
089        public FaultTolerance getFaultTolerance() {
090            return this.faultTolerance;
091        }
092        
093        /**
094         * Setter for property faultTolerance.
095         * 
096         * @param faultTolerance New value of property faultTolerance.
097         */
098        public void setFaultTolerance(FaultTolerance faultTolerance) {
099            this.faultTolerance = faultTolerance;
100        }
101        
102        /**
103         * Convenience setter for property faultTolerance for use by Digester.
104         * 
105         * @param level New value of property level ("ALL","CHECKED", or "NONE").
106         */
107        public void setFaultToleranceLevel(String level) {
108            this.faultTolerance = FaultTolerance.valueOf(level);
109        }
110    
111        /**
112         * Holds value of property queueFactory.
113         */
114        private BlockingQueueFactory<?> queueFactory = new BlockingQueueFactory.LinkedBlockingQueueFactory();
115    
116        /**
117         * Getter for property queueFactory.
118         * @return Value of property queueFactory.
119         */
120        public BlockingQueueFactory<?> getQueueFactory() {
121            return this.queueFactory;
122        }
123    
124        /**
125         * Setter for property queueFactory.
126         * @param queueFactory New value of property queueFactory.
127         */
128        public void setQueueFactory(BlockingQueueFactory<?> queueFactory) {
129            this.queueFactory = queueFactory;
130        }    
131    }