View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.commons.pipeline.driver;
19  
20  
21  import org.apache.commons.pipeline.Stage;
22  import org.apache.commons.pipeline.StageContext;
23  import org.apache.commons.pipeline.StageDriver;
24  import org.apache.commons.pipeline.StageDriverFactory;
25  import org.apache.commons.pipeline.util.BlockingQueueFactory;
26  
27  /**
28   * This factory is used to create {@link ThreadPoolStageDriver} instances configured
29   * to run specific stages.
30   */
31  public class ThreadPoolStageDriverFactory implements StageDriverFactory {
32      
33      private int numThreads = 1;
34      
35      /** Creates a new instance of ThreadPoolStageDriverFactory */
36      public ThreadPoolStageDriverFactory() {
37      }
38      
39      /**
40       * Creates the new {@link ThreadPoolStageDriver} based upon the configuration
41       * of this factory instance
42       * @param stage The stage to be run by the newly created driver
43       * @param context The context in which the stage will be run
44       * @return the newly created driver
45       */
46      public StageDriver createStageDriver(Stage stage, StageContext context) {
47          try {
48              return new ThreadPoolStageDriver(stage, context, queueFactory.createQueue(), timeout, faultTolerance, numThreads);
49          } catch (Exception e) {
50              throw new IllegalStateException("Instantiation of driver failed due to illegal factory state.", e);
51          }
52      }
53      
54      /**
55       * Holds value of property queueFactory.
56       */
57      private BlockingQueueFactory<?> queueFactory = new BlockingQueueFactory.LinkedBlockingQueueFactory();
58  
59      /**
60       * Getter for property queueFactory.
61       * @return Value of property queueFactory.
62       */
63      public BlockingQueueFactory<?> getQueueFactory() {
64          return this.queueFactory;
65      }
66  
67      /**
68       * Setter for property queueFactory.
69       * @param queueFactory New value of property queueFactory.
70       */
71      public void setQueueFactory(BlockingQueueFactory<?> queueFactory) {
72          this.queueFactory = queueFactory;
73      }    
74      
75      /**
76       * Holds value of property timeout.
77       */
78      private long timeout = 500;
79      
80      /**
81       * Timeout for wait to ensure deadlock cannot occur on thread termination.
82       * Default is 500
83       * @return Value of property timeout.
84       */
85      public long getTimeout() {
86          return this.timeout;
87      }
88      
89      /**
90       * Setter for property timeout.
91       * @param timeout New value of property timeout.
92       */
93      public void setTimeout(long timeout) {
94          this.timeout = timeout;
95      }
96      
97      /**
98       * Holds value of property faultTolerance.
99       */
100     private FaultTolerance faultTolerance = FaultTolerance.NONE;
101     
102     /**
103      * Getter for property faultTolerance. See {@link FaultTolerance} for valid values
104      * and enumation meanings.
105      * @return Value of property faultTolerance.
106      */
107     public FaultTolerance getFaultTolerance() {
108         return this.faultTolerance;
109     }
110     
111     /**
112      * Setter for property faultTolerance.
113      *
114      * @param faultTolerance New value of property faultTolerance.
115      */
116     public void setFaultTolerance(FaultTolerance faultTolerance) {
117         this.faultTolerance = faultTolerance;
118     }
119     
120     /**
121      * Convenience setter for property faultTolerance for use by Digester.
122      *
123      * @param level New value of property level ("ALL","CHECKED", or "NONE").
124      */
125     public void setFaultToleranceLevel(String level) {
126         this.faultTolerance = FaultTolerance.valueOf(level);
127     }
128     
129     /**
130      * Returns the number of threads that will be allocated to the thread
131      * pool of a driver created by this factory.
132      */
133     public int getNumThreads() {
134         return numThreads;
135     }
136     
137     /**
138      * Sets the number of threads that will be allocated to the thread
139      * pool of a driver created by this factory.
140      */
141     public void setNumThreads(int numThreads) {
142         this.numThreads = numThreads;
143     }
144     
145 }