001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing,
013     * software distributed under the License is distributed on an
014     * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015     * KIND, either express or implied.  See the License for the
016     * specific language governing permissions and limitations
017     * under the License.    
018     */ 
019    
020    package org.apache.commons.pipeline.stage;
021    
022    import java.util.EventObject;
023    import java.util.Map;
024    import java.util.Queue;
025    import java.util.Set;
026    import java.util.TreeMap;
027    import java.util.TreeSet;
028    import org.apache.commons.pipeline.StageContext;
029    import org.apache.commons.pipeline.StageEventListener;
030    import org.apache.commons.pipeline.StageException;
031    import org.apache.commons.pipeline.event.KeyAvailableEvent;
032    import org.apache.commons.pipeline.util.KeyFactory;
033    import org.apache.commons.pipeline.util.QueueFactory;
034    
035    /**
036     *
037     * @author kjn
038     */
039    public class KeyWaitBufferStage extends BaseStage implements StageEventListener {
040        
041        private Set<Object> receivedKeys = new TreeSet<Object>();
042        private Map<Object,Queue<Object>> buffers = new TreeMap<Object,Queue<Object>>();
043        
044        /** Creates a new instance of KeyWaitBufferStage */
045        public KeyWaitBufferStage() {
046        }
047        
048        public void notify(EventObject ev) {
049            if (ev instanceof KeyAvailableEvent) {
050                KeyAvailableEvent e = (KeyAvailableEvent) ev;
051                synchronized(receivedKeys) {
052                    receivedKeys.add(e.getKey());
053                }
054                
055                //at this point, we know that no more objects will be added to
056                //the pending queue for the key, so we can remove and empty it.
057                if (buffers.containsKey(e.getKey())) {
058                    for (Object obj : buffers.remove(e.getKey())) this.emit(obj);
059                }
060            }
061        }
062        
063        public void init(StageContext context) {
064            super.init(context);
065            context.registerListener(this);
066        }
067        
068        public void process(Object obj) throws StageException {
069            Object key = keyFactory.generateKey(obj);
070            synchronized(receivedKeys) {
071                if (!receivedKeys.contains(key)) {
072                    //store the object in a pending queue.
073                    if (!buffers.containsKey(key)) buffers.put(key, queueFactory.createQueue());
074                    buffers.get(key).add(obj);
075                    return;
076                }
077            }
078            
079            this.emit(obj);
080        }
081    
082        /**
083         * Holds value of property keyFactory.
084         */
085        private KeyFactory<Object,? extends Object> keyFactory;
086    
087        /**
088         * Getter for property keyFactory.
089         * @return Value of property keyFactory.
090         */
091        public KeyFactory<Object,? extends Object> getKeyFactory() {
092            return this.keyFactory;
093        }
094    
095        /**
096         * Setter for property keyFactory.
097         * @param keyFactory New value of property keyFactory.
098         */
099        public void setKeyFactory(KeyFactory<Object,? extends Object> keyFactory) {
100            this.keyFactory = keyFactory;
101        }
102    
103        /**
104         * Holds value of property queueFactory.
105         */
106        private QueueFactory<Object> queueFactory;
107    
108        /**
109         * Getter for property queueFactory.
110         * @return Value of property queueFactory.
111         */
112        public QueueFactory<Object> getQueueFactory() {
113            return this.queueFactory;
114        }
115    
116        /**
117         * Setter for property queueFactory.
118         * @param queueFactory New value of property queueFactory.
119         */
120        public void setQueueFactory(QueueFactory<Object> queueFactory) {
121            this.queueFactory = queueFactory;
122        }
123    }