001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018 package org.apache.commons.pipeline.driver;
019
020
021 import org.apache.commons.pipeline.Stage;
022 import org.apache.commons.pipeline.StageContext;
023 import org.apache.commons.pipeline.StageDriver;
024 import org.apache.commons.pipeline.StageDriverFactory;
025 import org.apache.commons.pipeline.util.BlockingQueueFactory;
026
027 /**
028 * This factory is used to create {@link ThreadPoolStageDriver} instances configured
029 * to run specific stages.
030 */
031 public class ThreadPoolStageDriverFactory implements StageDriverFactory {
032
033 private int numThreads = 1;
034
035 /** Creates a new instance of ThreadPoolStageDriverFactory */
036 public ThreadPoolStageDriverFactory() {
037 }
038
039 /**
040 * Creates the new {@link ThreadPoolStageDriver} based upon the configuration
041 * of this factory instance
042 * @param stage The stage to be run by the newly created driver
043 * @param context The context in which the stage will be run
044 * @return the newly created driver
045 */
046 public StageDriver createStageDriver(Stage stage, StageContext context) {
047 try {
048 return new ThreadPoolStageDriver(stage, context, queueFactory.createQueue(), timeout, faultTolerance, numThreads);
049 } catch (Exception e) {
050 throw new IllegalStateException("Instantiation of driver failed due to illegal factory state.", e);
051 }
052 }
053
054 /**
055 * Holds value of property queueFactory.
056 */
057 private BlockingQueueFactory<?> queueFactory = new BlockingQueueFactory.LinkedBlockingQueueFactory();
058
059 /**
060 * Getter for property queueFactory.
061 * @return Value of property queueFactory.
062 */
063 public BlockingQueueFactory<?> getQueueFactory() {
064 return this.queueFactory;
065 }
066
067 /**
068 * Setter for property queueFactory.
069 * @param queueFactory New value of property queueFactory.
070 */
071 public void setQueueFactory(BlockingQueueFactory<?> queueFactory) {
072 this.queueFactory = queueFactory;
073 }
074
075 /**
076 * Holds value of property timeout.
077 */
078 private long timeout = 500;
079
080 /**
081 * Timeout for wait to ensure deadlock cannot occur on thread termination.
082 * Default is 500
083 * @return Value of property timeout.
084 */
085 public long getTimeout() {
086 return this.timeout;
087 }
088
089 /**
090 * Setter for property timeout.
091 * @param timeout New value of property timeout.
092 */
093 public void setTimeout(long timeout) {
094 this.timeout = timeout;
095 }
096
097 /**
098 * Holds value of property faultTolerance.
099 */
100 private FaultTolerance faultTolerance = FaultTolerance.NONE;
101
102 /**
103 * Getter for property faultTolerance. See {@link FaultTolerance} for valid values
104 * and enumation meanings.
105 * @return Value of property faultTolerance.
106 */
107 public FaultTolerance getFaultTolerance() {
108 return this.faultTolerance;
109 }
110
111 /**
112 * Setter for property faultTolerance.
113 *
114 * @param faultTolerance New value of property faultTolerance.
115 */
116 public void setFaultTolerance(FaultTolerance faultTolerance) {
117 this.faultTolerance = faultTolerance;
118 }
119
120 /**
121 * Convenience setter for property faultTolerance for use by Digester.
122 *
123 * @param level New value of property level ("ALL","CHECKED", or "NONE").
124 */
125 public void setFaultToleranceLevel(String level) {
126 this.faultTolerance = FaultTolerance.valueOf(level);
127 }
128
129 /**
130 * Returns the number of threads that will be allocated to the thread
131 * pool of a driver created by this factory.
132 */
133 public int getNumThreads() {
134 return numThreads;
135 }
136
137 /**
138 * Sets the number of threads that will be allocated to the thread
139 * pool of a driver created by this factory.
140 */
141 public void setNumThreads(int numThreads) {
142 this.numThreads = numThreads;
143 }
144
145 }