View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    * 
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   * 
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.    
18   */ 
19  
20  package org.apache.commons.pipeline.stage;
21  
22  import java.util.EventObject;
23  import java.util.Map;
24  import java.util.Queue;
25  import java.util.Set;
26  import java.util.TreeMap;
27  import java.util.TreeSet;
28  import org.apache.commons.pipeline.StageContext;
29  import org.apache.commons.pipeline.StageEventListener;
30  import org.apache.commons.pipeline.StageException;
31  import org.apache.commons.pipeline.event.KeyAvailableEvent;
32  import org.apache.commons.pipeline.util.KeyFactory;
33  import org.apache.commons.pipeline.util.QueueFactory;
34  
35  /**
36   *
37   * @author kjn
38   */
39  public class KeyWaitBufferStage extends BaseStage implements StageEventListener {
40      
41      private Set<Object> receivedKeys = new TreeSet<Object>();
42      private Map<Object,Queue<Object>> buffers = new TreeMap<Object,Queue<Object>>();
43      
44      /** Creates a new instance of KeyWaitBufferStage */
45      public KeyWaitBufferStage() {
46      }
47      
48      public void notify(EventObject ev) {
49          if (ev instanceof KeyAvailableEvent) {
50              KeyAvailableEvent e = (KeyAvailableEvent) ev;
51              synchronized(receivedKeys) {
52                  receivedKeys.add(e.getKey());
53              }
54              
55              //at this point, we know that no more objects will be added to
56              //the pending queue for the key, so we can remove and empty it.
57              if (buffers.containsKey(e.getKey())) {
58                  for (Object obj : buffers.remove(e.getKey())) this.emit(obj);
59              }
60          }
61      }
62      
63      public void init(StageContext context) {
64          super.init(context);
65          context.registerListener(this);
66      }
67      
68      public void process(Object obj) throws StageException {
69          Object key = keyFactory.generateKey(obj);
70          synchronized(receivedKeys) {
71              if (!receivedKeys.contains(key)) {
72                  //store the object in a pending queue.
73                  if (!buffers.containsKey(key)) buffers.put(key, queueFactory.createQueue());
74                  buffers.get(key).add(obj);
75                  return;
76              }
77          }
78          
79          this.emit(obj);
80      }
81  
82      /**
83       * Holds value of property keyFactory.
84       */
85      private KeyFactory<Object,? extends Object> keyFactory;
86  
87      /**
88       * Getter for property keyFactory.
89       * @return Value of property keyFactory.
90       */
91      public KeyFactory<Object,? extends Object> getKeyFactory() {
92          return this.keyFactory;
93      }
94  
95      /**
96       * Setter for property keyFactory.
97       * @param keyFactory New value of property keyFactory.
98       */
99      public void setKeyFactory(KeyFactory<Object,? extends Object> keyFactory) {
100         this.keyFactory = keyFactory;
101     }
102 
103     /**
104      * Holds value of property queueFactory.
105      */
106     private QueueFactory<Object> queueFactory;
107 
108     /**
109      * Getter for property queueFactory.
110      * @return Value of property queueFactory.
111      */
112     public QueueFactory<Object> getQueueFactory() {
113         return this.queueFactory;
114     }
115 
116     /**
117      * Setter for property queueFactory.
118      * @param queueFactory New value of property queueFactory.
119      */
120     public void setQueueFactory(QueueFactory<Object> queueFactory) {
121         this.queueFactory = queueFactory;
122     }
123 }