1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.commons.pipeline.driver;
19
20
21 import org.apache.commons.pipeline.Stage;
22 import org.apache.commons.pipeline.StageContext;
23 import org.apache.commons.pipeline.StageDriver;
24 import org.apache.commons.pipeline.StageDriverFactory;
25 import org.apache.commons.pipeline.util.BlockingQueueFactory;
26
27 /**
28 * This factory is used to create {@link ThreadPoolStageDriver} instances configured
29 * to run specific stages.
30 */
31 public class ThreadPoolStageDriverFactory implements StageDriverFactory {
32
33 private int numThreads = 1;
34
35 /** Creates a new instance of ThreadPoolStageDriverFactory */
36 public ThreadPoolStageDriverFactory() {
37 }
38
39 /**
40 * Creates the new {@link ThreadPoolStageDriver} based upon the configuration
41 * of this factory instance
42 * @param stage The stage to be run by the newly created driver
43 * @param context The context in which the stage will be run
44 * @return the newly created driver
45 */
46 public StageDriver createStageDriver(Stage stage, StageContext context) {
47 try {
48 return new ThreadPoolStageDriver(stage, context, queueFactory.createQueue(), timeout, faultTolerance, numThreads);
49 } catch (Exception e) {
50 throw new IllegalStateException("Instantiation of driver failed due to illegal factory state.", e);
51 }
52 }
53
54 /**
55 * Holds value of property queueFactory.
56 */
57 private BlockingQueueFactory<?> queueFactory = new BlockingQueueFactory.LinkedBlockingQueueFactory();
58
59 /**
60 * Getter for property queueFactory.
61 * @return Value of property queueFactory.
62 */
63 public BlockingQueueFactory<?> getQueueFactory() {
64 return this.queueFactory;
65 }
66
67 /**
68 * Setter for property queueFactory.
69 * @param queueFactory New value of property queueFactory.
70 */
71 public void setQueueFactory(BlockingQueueFactory<?> queueFactory) {
72 this.queueFactory = queueFactory;
73 }
74
75 /**
76 * Holds value of property timeout.
77 */
78 private long timeout = 500;
79
80 /**
81 * Timeout for wait to ensure deadlock cannot occur on thread termination.
82 * Default is 500
83 * @return Value of property timeout.
84 */
85 public long getTimeout() {
86 return this.timeout;
87 }
88
89 /**
90 * Setter for property timeout.
91 * @param timeout New value of property timeout.
92 */
93 public void setTimeout(long timeout) {
94 this.timeout = timeout;
95 }
96
97 /**
98 * Holds value of property faultTolerance.
99 */
100 private FaultTolerance faultTolerance = FaultTolerance.NONE;
101
102 /**
103 * Getter for property faultTolerance. See {@link FaultTolerance} for valid values
104 * and enumation meanings.
105 * @return Value of property faultTolerance.
106 */
107 public FaultTolerance getFaultTolerance() {
108 return this.faultTolerance;
109 }
110
111 /**
112 * Setter for property faultTolerance.
113 *
114 * @param faultTolerance New value of property faultTolerance.
115 */
116 public void setFaultTolerance(FaultTolerance faultTolerance) {
117 this.faultTolerance = faultTolerance;
118 }
119
120 /**
121 * Convenience setter for property faultTolerance for use by Digester.
122 *
123 * @param level New value of property level ("ALL","CHECKED", or "NONE").
124 */
125 public void setFaultToleranceLevel(String level) {
126 this.faultTolerance = FaultTolerance.valueOf(level);
127 }
128
129 /**
130 * Returns the number of threads that will be allocated to the thread
131 * pool of a driver created by this factory.
132 */
133 public int getNumThreads() {
134 return numThreads;
135 }
136
137 /**
138 * Sets the number of threads that will be allocated to the thread
139 * pool of a driver created by this factory.
140 */
141 public void setNumThreads(int numThreads) {
142 this.numThreads = numThreads;
143 }
144
145 }