View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.collections4.bloomfilter;
18  
19  import static org.junit.jupiter.api.Assertions.assertArrayEquals;
20  import static org.junit.jupiter.api.Assertions.assertEquals;
21  import static org.junit.jupiter.api.Assertions.assertFalse;
22  import static org.junit.jupiter.api.Assertions.assertTrue;
23  
24  import java.util.ArrayList;
25  import java.util.LinkedList;
26  import java.util.List;
27  import java.util.concurrent.TimeUnit;
28  import java.util.function.Consumer;
29  import java.util.function.Predicate;
30  
31  import org.apache.commons.collections4.bloomfilter.LayerManager.Cleanup;
32  import org.apache.commons.collections4.bloomfilter.LayerManager.ExtendCheck;
33  import org.junit.jupiter.api.Test;
34  
35  public class LayeredBloomFilterTest extends AbstractBloomFilterTest<LayeredBloomFilter> {
36  
37      /**
38       * A Predicate that advances after a quantum of time.
39       */
40      static class AdvanceOnTimeQuanta implements Predicate<LayerManager> {
41          long quanta;
42  
43          AdvanceOnTimeQuanta(long quanta, TimeUnit unit) {
44              this.quanta = unit.toMillis(quanta);
45          }
46  
47          @Override
48          public boolean test(LayerManager lm) {
49              // can not use getTarget() as it causes recursion.
50              TimestampedBloomFilter bf = (TimestampedBloomFilter) lm.get(lm.getDepth() - 1);
51              return bf.timestamp + quanta < System.currentTimeMillis();
52          }
53      }
54  
55      /**
56       * A Consumer that cleans the list based on how long each filters has been in
57       * the list.
58       */
59      static class CleanByTime implements Consumer<LinkedList<BloomFilter>> {
60          long elapsedTime;
61  
62          CleanByTime(long duration, TimeUnit unit) {
63              elapsedTime = unit.toMillis(duration);
64          }
65  
66          @Override
67          public void accept(LinkedList<BloomFilter> t) {
68              long min = System.currentTimeMillis() - elapsedTime;
69              while (!t.isEmpty() && ((TimestampedBloomFilter) t.getFirst()).getTimestamp() < min) {
70                  TimestampedBloomFilter bf = (TimestampedBloomFilter) t.getFirst();
71                  dbgInstrument.add(String.format("Removing old entry: T:%s (Aged: %s) \n", bf.getTimestamp(),
72                          (min - bf.getTimestamp())));
73                  t.removeFirst();
74              }
75          }
76      }
77  
78      /**
79       * A Bloomfilter implementation that tracks the creation time.
80       */
81      static class TimestampedBloomFilter extends WrappedBloomFilter {
82          final long timestamp;
83  
84          TimestampedBloomFilter(BloomFilter bf) {
85              super(bf);
86              this.timestamp = System.currentTimeMillis();
87          }
88  
89          public long getTimestamp() {
90              return timestamp;
91          }
92      }
93  
94      // ***example of instrumentation ***
95      private static List<String> dbgInstrument = new ArrayList<>();
96  
97      /**
98       * Creates a LayeredBloomFilter that retains enclosed filters for
99       * {@code duration} and limits the contents of each enclosed filter to a time
100      * {@code quanta}. This filter uses the timestamped Bloom filter internally.
101      *
102      * @param shape    The shape of the Bloom filters.
103      * @param duration The length of time to keep filters in the list.
104      * @param dUnit    The unit of time to apply to duration.
105      * @param quanta   The quantization factor for each filter. Individual filters
106      *                 will span at most this much time.
107      * @param qUnit    the unit of time to apply to quanta.
108      * @return LayeredBloomFilter with the above properties.
109      */
110     static LayeredBloomFilter createTimedLayeredFilter(Shape shape, long duration, TimeUnit dUnit, long quanta,
111             TimeUnit qUnit) {
112         LayerManager layerManager = LayerManager.builder()
113                 .setSupplier(() -> new TimestampedBloomFilter(new SimpleBloomFilter(shape)))
114                 .setCleanup(Cleanup.removeEmptyTarget().andThen(new CleanByTime(duration, dUnit)))
115                 .setExtendCheck(new AdvanceOnTimeQuanta(quanta, qUnit)
116                         .or(LayerManager.ExtendCheck.advanceOnSaturation(shape.estimateMaxN())))
117                 .build();
118         return new LayeredBloomFilter(shape, layerManager);
119     }
120 
121     // instrumentation to record timestamps in dbgInstrument list
122     private Predicate<BloomFilter> dbg = (bf) -> {
123         TimestampedBloomFilter tbf = (TimestampedBloomFilter) bf;
124         long ts = System.currentTimeMillis();
125         dbgInstrument.add(String.format("T:%s (Elapsed:%s)- EstN:%s (Card:%s)\n", tbf.timestamp, ts - tbf.timestamp,
126                 tbf.estimateN(), tbf.cardinality()));
127         return true;
128     };
129     // *** end of instrumentation ***
130 
131     @Override
132     protected LayeredBloomFilter createEmptyFilter(Shape shape) {
133         return LayeredBloomFilter.fixed(shape, 10);
134     }
135 
136     protected BloomFilter makeFilter(Hasher h) {
137         BloomFilter bf = new SparseBloomFilter(getTestShape());
138         bf.merge(h);
139         return bf;
140     }
141 
142     protected BloomFilter makeFilter(IndexProducer p) {
143         BloomFilter bf = new SparseBloomFilter(getTestShape());
144         bf.merge(p);
145         return bf;
146     }
147 
148     protected BloomFilter makeFilter(int... values) {
149         return makeFilter(IndexProducer.fromIndexArray(values));
150     }
151 
152     private LayeredBloomFilter setupFindTest() {
153         LayeredBloomFilter filter = LayeredBloomFilter.fixed(getTestShape(), 10);
154         filter.merge(TestingHashers.FROM1);
155         filter.merge(TestingHashers.FROM11);
156         filter.merge(new IncrementingHasher(11, 2));
157         filter.merge(TestingHashers.populateFromHashersFrom1AndFrom11(new SimpleBloomFilter(getTestShape())));
158         return filter;
159     }
160 
161     @Override
162     @Test
163     public void testCardinalityAndIsEmpty() {
164         LayerManager layerManager = LayerManager.builder().setExtendCheck(ExtendCheck.neverAdvance())
165                 .setSupplier(() -> new SimpleBloomFilter(getTestShape())).build();
166         testCardinalityAndIsEmpty(new LayeredBloomFilter(getTestShape(), layerManager));
167     }
168 
169     /**
170      * Tests that the estimated union calculations are correct.
171      */
172     @Test
173     public final void testEstimateUnionCrossTypes() {
174         final BloomFilter bf = createFilter(getTestShape(), TestingHashers.FROM1);
175         final BloomFilter bf2 = new DefaultBloomFilterTest.SparseDefaultBloomFilter(getTestShape());
176         bf2.merge(TestingHashers.FROM11);
177 
178         assertEquals(2, bf.estimateUnion(bf2));
179         assertEquals(2, bf2.estimateUnion(bf));
180     }
181 
182     // ***** TESTS THAT CHECK LAYERED PROCESSING ******
183 
184     @Test
185     public void testExpiration() throws InterruptedException {
186         // this test uses the instrumentation noted above to track changes for debugging
187         // purposes.
188 
189         // list of timestamps that are expected to be expired.
190         List<Long> lst = new ArrayList<>();
191         Shape shape = Shape.fromNM(4, 64);
192 
193         // create a filter that removes filters that are 4 seconds old
194         // and quantises time to 1 second intervals.
195         LayeredBloomFilter underTest = createTimedLayeredFilter(shape, 600, TimeUnit.MILLISECONDS, 150,
196                 TimeUnit.MILLISECONDS);
197 
198         for (int i = 0; i < 10; i++) {
199             underTest.merge(TestingHashers.randomHasher());
200         }
201         underTest.forEachBloomFilter(dbg.and(x -> lst.add(((TimestampedBloomFilter) x).timestamp)));
202         assertTrue(underTest.getDepth() > 1);
203 
204         Thread.sleep(300);
205         for (int i = 0; i < 10; i++) {
206             underTest.merge(TestingHashers.randomHasher());
207         }
208         dbgInstrument.add("=== AFTER 300 milliseconds ====\n");
209         underTest.forEachBloomFilter(dbg);
210 
211         Thread.sleep(150);
212         for (int i = 0; i < 10; i++) {
213             underTest.merge(TestingHashers.randomHasher());
214         }
215         dbgInstrument.add("=== AFTER 450 milliseconds ====\n");
216         underTest.forEachBloomFilter(dbg);
217 
218         // sleep 200 milliseconds to ensure we cross the 600 millisecond boundary
219         Thread.sleep(200);
220         underTest.merge(TestingHashers.randomHasher());
221         dbgInstrument.add("=== AFTER 600 milliseconds ====\n");
222         assertTrue(underTest.forEachBloomFilter(dbg.and(x -> !lst.contains(((TimestampedBloomFilter) x).timestamp))),
223                 "Found filter that should have been deleted: " + dbgInstrument.get(dbgInstrument.size() - 1));
224     }
225     @Test
226     public void testFindBitMapProducer() {
227         LayeredBloomFilter filter = setupFindTest();
228 
229         IndexProducer idxProducer = TestingHashers.FROM1.indices(getTestShape());
230         BitMapProducer producer = BitMapProducer.fromIndexProducer(idxProducer, getTestShape().getNumberOfBits());
231 
232         int[] expected = {0, 3};
233         int[] result = filter.find(producer);
234         assertArrayEquals(expected, result);
235 
236         expected = new int[]{1, 3};
237         idxProducer = TestingHashers.FROM11.indices(getTestShape());
238         producer = BitMapProducer.fromIndexProducer(idxProducer, getTestShape().getNumberOfBits());
239         result = filter.find(producer);
240         assertArrayEquals(expected, result);
241     }
242 
243     @Test
244     public void testFindBloomFilter() {
245         LayeredBloomFilter filter = setupFindTest();
246         int[] expected = {0, 3};
247         int[] result = filter.find(TestingHashers.FROM1);
248         assertArrayEquals(expected, result);
249         expected = new int[] {1, 3};
250         result = filter.find(TestingHashers.FROM11);
251         assertArrayEquals(expected, result);
252     }
253 
254     @Test
255     public void testFindIndexProducer() {
256         IndexProducer producer = TestingHashers.FROM1.indices(getTestShape());
257         LayeredBloomFilter filter = setupFindTest();
258 
259         int[] expected = {0, 3};
260         int[] result = filter.find(producer);
261         assertArrayEquals(expected, result);
262 
263         expected = new int[] {1, 3};
264         producer = TestingHashers.FROM11.indices(getTestShape());
265         result = filter.find(producer);
266         assertArrayEquals(expected, result);
267     }
268 
269     @Test
270     public final void testGetLayer() {
271         BloomFilter bf = new SimpleBloomFilter(getTestShape());
272         bf.merge(TestingHashers.FROM11);
273         LayeredBloomFilter filter = LayeredBloomFilter.fixed(getTestShape(), 10);
274         filter.merge(TestingHashers.FROM1);
275         filter.merge(TestingHashers.FROM11);
276         filter.merge(new IncrementingHasher(11, 2));
277         filter.merge(TestingHashers.populateFromHashersFrom1AndFrom11(new SimpleBloomFilter(getTestShape())));
278         assertArrayEquals(bf.asBitMapArray(), filter.get(1).asBitMapArray());
279     }
280 
281     @Test
282     public void testMultipleFilters() {
283         LayeredBloomFilter filter = LayeredBloomFilter.fixed(getTestShape(), 10);
284         filter.merge(TestingHashers.FROM1);
285         filter.merge(TestingHashers.FROM11);
286         assertEquals(2, filter.getDepth());
287         assertTrue(filter.contains(makeFilter(TestingHashers.FROM1)));
288         assertTrue(filter.contains(makeFilter(TestingHashers.FROM11)));
289         BloomFilter t1 = makeFilter(6, 7, 17, 18, 19);
290         assertFalse(filter.contains(t1));
291         assertFalse(filter.copy().contains(t1));
292         assertTrue(filter.flatten().contains(t1));
293     }
294 
295     @Test
296     public final void testNext() {
297         LayerManager layerManager = LayerManager.builder().setSupplier(() -> new SimpleBloomFilter(getTestShape()))
298                 .build();
299 
300         LayeredBloomFilter filter = new LayeredBloomFilter(getTestShape(), layerManager);
301         filter.merge(TestingHashers.FROM1);
302         filter.merge(TestingHashers.FROM11);
303         assertEquals(1, filter.getDepth());
304         filter.next();
305         filter.merge(new IncrementingHasher(11, 2));
306         assertEquals(2, filter.getDepth());
307         assertTrue(filter.get(0).contains(TestingHashers.FROM1));
308         assertTrue(filter.get(0).contains(TestingHashers.FROM11));
309         assertFalse(filter.get(0).contains(new IncrementingHasher(11, 2)));
310         assertFalse(filter.get(1).contains(TestingHashers.FROM1));
311         assertFalse(filter.get(1).contains(TestingHashers.FROM11));
312         assertTrue(filter.get(1).contains(new IncrementingHasher(11, 2)));
313     }
314 }