001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied. See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019
020 package org.apache.commons.pipeline.stage;
021
022 import java.util.EventObject;
023 import java.util.Map;
024 import java.util.Queue;
025 import java.util.Set;
026 import java.util.TreeMap;
027 import java.util.TreeSet;
028 import org.apache.commons.pipeline.StageContext;
029 import org.apache.commons.pipeline.StageEventListener;
030 import org.apache.commons.pipeline.StageException;
031 import org.apache.commons.pipeline.event.KeyAvailableEvent;
032 import org.apache.commons.pipeline.util.KeyFactory;
033 import org.apache.commons.pipeline.util.QueueFactory;
034
035 /**
036 *
037 * @author kjn
038 */
039 public class KeyWaitBufferStage extends BaseStage implements StageEventListener {
040
041 private Set<Object> receivedKeys = new TreeSet<Object>();
042 private Map<Object,Queue<Object>> buffers = new TreeMap<Object,Queue<Object>>();
043
044 /** Creates a new instance of KeyWaitBufferStage */
045 public KeyWaitBufferStage() {
046 }
047
048 public void notify(EventObject ev) {
049 if (ev instanceof KeyAvailableEvent) {
050 KeyAvailableEvent e = (KeyAvailableEvent) ev;
051 synchronized(receivedKeys) {
052 receivedKeys.add(e.getKey());
053 }
054
055 //at this point, we know that no more objects will be added to
056 //the pending queue for the key, so we can remove and empty it.
057 if (buffers.containsKey(e.getKey())) {
058 for (Object obj : buffers.remove(e.getKey())) this.emit(obj);
059 }
060 }
061 }
062
063 public void init(StageContext context) {
064 super.init(context);
065 context.registerListener(this);
066 }
067
068 public void process(Object obj) throws StageException {
069 Object key = keyFactory.generateKey(obj);
070 synchronized(receivedKeys) {
071 if (!receivedKeys.contains(key)) {
072 //store the object in a pending queue.
073 if (!buffers.containsKey(key)) buffers.put(key, queueFactory.createQueue());
074 buffers.get(key).add(obj);
075 return;
076 }
077 }
078
079 this.emit(obj);
080 }
081
082 /**
083 * Holds value of property keyFactory.
084 */
085 private KeyFactory<Object,? extends Object> keyFactory;
086
087 /**
088 * Getter for property keyFactory.
089 * @return Value of property keyFactory.
090 */
091 public KeyFactory<Object,? extends Object> getKeyFactory() {
092 return this.keyFactory;
093 }
094
095 /**
096 * Setter for property keyFactory.
097 * @param keyFactory New value of property keyFactory.
098 */
099 public void setKeyFactory(KeyFactory<Object,? extends Object> keyFactory) {
100 this.keyFactory = keyFactory;
101 }
102
103 /**
104 * Holds value of property queueFactory.
105 */
106 private QueueFactory<Object> queueFactory;
107
108 /**
109 * Getter for property queueFactory.
110 * @return Value of property queueFactory.
111 */
112 public QueueFactory<Object> getQueueFactory() {
113 return this.queueFactory;
114 }
115
116 /**
117 * Setter for property queueFactory.
118 * @param queueFactory New value of property queueFactory.
119 */
120 public void setQueueFactory(QueueFactory<Object> queueFactory) {
121 this.queueFactory = queueFactory;
122 }
123 }