001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 */ 019 020 package org.apache.commons.pipeline.stage; 021 022 import java.util.EventObject; 023 import java.util.Map; 024 import java.util.Queue; 025 import java.util.Set; 026 import java.util.TreeMap; 027 import java.util.TreeSet; 028 import org.apache.commons.pipeline.StageContext; 029 import org.apache.commons.pipeline.StageEventListener; 030 import org.apache.commons.pipeline.StageException; 031 import org.apache.commons.pipeline.event.KeyAvailableEvent; 032 import org.apache.commons.pipeline.util.KeyFactory; 033 import org.apache.commons.pipeline.util.QueueFactory; 034 035 /** 036 * 037 * @author kjn 038 */ 039 public class KeyWaitBufferStage extends BaseStage implements StageEventListener { 040 041 private Set<Object> receivedKeys = new TreeSet<Object>(); 042 private Map<Object,Queue<Object>> buffers = new TreeMap<Object,Queue<Object>>(); 043 044 /** Creates a new instance of KeyWaitBufferStage */ 045 public KeyWaitBufferStage() { 046 } 047 048 public void notify(EventObject ev) { 049 if (ev instanceof KeyAvailableEvent) { 050 KeyAvailableEvent e = (KeyAvailableEvent) ev; 051 synchronized(receivedKeys) { 052 receivedKeys.add(e.getKey()); 053 } 054 055 //at this point, we know that no more objects will be added to 056 //the pending queue for the key, so we can remove and empty it. 057 if (buffers.containsKey(e.getKey())) { 058 for (Object obj : buffers.remove(e.getKey())) this.emit(obj); 059 } 060 } 061 } 062 063 public void init(StageContext context) { 064 super.init(context); 065 context.registerListener(this); 066 } 067 068 public void process(Object obj) throws StageException { 069 Object key = keyFactory.generateKey(obj); 070 synchronized(receivedKeys) { 071 if (!receivedKeys.contains(key)) { 072 //store the object in a pending queue. 073 if (!buffers.containsKey(key)) buffers.put(key, queueFactory.createQueue()); 074 buffers.get(key).add(obj); 075 return; 076 } 077 } 078 079 this.emit(obj); 080 } 081 082 /** 083 * Holds value of property keyFactory. 084 */ 085 private KeyFactory<Object,? extends Object> keyFactory; 086 087 /** 088 * Getter for property keyFactory. 089 * @return Value of property keyFactory. 090 */ 091 public KeyFactory<Object,? extends Object> getKeyFactory() { 092 return this.keyFactory; 093 } 094 095 /** 096 * Setter for property keyFactory. 097 * @param keyFactory New value of property keyFactory. 098 */ 099 public void setKeyFactory(KeyFactory<Object,? extends Object> keyFactory) { 100 this.keyFactory = keyFactory; 101 } 102 103 /** 104 * Holds value of property queueFactory. 105 */ 106 private QueueFactory<Object> queueFactory; 107 108 /** 109 * Getter for property queueFactory. 110 * @return Value of property queueFactory. 111 */ 112 public QueueFactory<Object> getQueueFactory() { 113 return this.queueFactory; 114 } 115 116 /** 117 * Setter for property queueFactory. 118 * @param queueFactory New value of property queueFactory. 119 */ 120 public void setQueueFactory(QueueFactory<Object> queueFactory) { 121 this.queueFactory = queueFactory; 122 } 123 }