View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.collections4.bloomfilter;
18  
19  import static org.junit.jupiter.api.Assertions.assertArrayEquals;
20  import static org.junit.jupiter.api.Assertions.assertEquals;
21  import static org.junit.jupiter.api.Assertions.assertFalse;
22  import static org.junit.jupiter.api.Assertions.assertTrue;
23  
24  import java.time.Duration;
25  import java.time.Instant;
26  import java.util.ArrayList;
27  import java.util.Deque;
28  import java.util.Iterator;
29  import java.util.List;
30  import java.util.function.Consumer;
31  import java.util.function.Predicate;
32  import java.util.function.Supplier;
33  
34  import org.apache.commons.collections4.bloomfilter.LayerManager.Cleanup;
35  import org.apache.commons.collections4.bloomfilter.LayerManager.ExtendCheck;
36  import org.junit.jupiter.api.Test;
37  
38  public class LayeredBloomFilterTest extends AbstractBloomFilterTest<LayeredBloomFilter<?>> {
39  
40      /**
41       * A Predicate that advances after a quantum of time.
42       */
43      static class AdvanceOnTimeQuanta<T extends BloomFilter<T>> implements Predicate<LayerManager<TimestampedBloomFilter<T>>> {
44          Duration quanta;
45  
46          AdvanceOnTimeQuanta(final Duration quanta) {
47              this.quanta = quanta;
48          }
49  
50          @Override
51          public boolean test(final LayerManager<TimestampedBloomFilter<T>> layerManager) {
52              // cannot use getTarget() as it causes recursion.
53              return layerManager.last().getTimestamp().plus(quanta).isBefore(Instant.now());
54          }
55      }
56  
57      /**
58       * A Consumer that cleans the list based on how long each filters has been in
59       * the list.
60       */
61      static class CleanByTime<T extends TimestampedBloomFilter> implements Consumer<List<T>> {
62          Duration elapsedTime;
63  
64          CleanByTime(final Duration elapsedTime) {
65              this.elapsedTime = elapsedTime;
66          }
67  
68          @Override
69          public void accept(final List<T> t) {
70              final Instant min = Instant.now().minus(elapsedTime);
71              final Iterator<T> iter = t.iterator();
72              while (iter.hasNext()) {
73                  final TimestampedBloomFilter bf = iter.next();
74                  if (bf.getTimestamp().isAfter(min) || bf.getTimestamp().equals(min)) {
75                      return;
76                  }
77                  dbgInstrument.add(String.format("Removing old entry: T:%s (Aged: %s) \n", bf.getTimestamp(), Duration.between(bf.getTimestamp(), min)));
78                  iter.remove();
79              }
80          }
81      }
82  
83      static class NumberedBloomFilter extends WrappedBloomFilter<NumberedBloomFilter, SimpleBloomFilter> {
84  
85          int value;
86          int sequence;
87  
88          NumberedBloomFilter(final Shape shape, final int value, final int sequence) {
89              super(new SimpleBloomFilter(shape));
90              this.value = value;
91              this.sequence = sequence;
92          }
93  
94          @Override
95          public NumberedBloomFilter copy() {
96              return new NumberedBloomFilter(getShape(), value, sequence);
97          }
98      }
99  
100     /**
101      * A Bloom filter implementation that tracks the creation time.
102      */
103     public static class TimestampedBloomFilter<T extends BloomFilter<T>> extends WrappedBloomFilter<TimestampedBloomFilter<T>, T> {
104 
105         private final Instant timestamp;
106 
107         TimestampedBloomFilter(final T bf) {
108             this(bf, Instant.now());
109         }
110 
111         TimestampedBloomFilter(final T bf, final Instant timestamp) {
112             super(bf);
113             this.timestamp = timestamp;
114         }
115 
116         @Override
117         public TimestampedBloomFilter<T> copy() {
118             return new TimestampedBloomFilter<>(getWrapped().copy(), timestamp);
119         }
120 
121         public Instant getTimestamp() {
122             return timestamp;
123         }
124     }
125 
126     // ***example of instrumentation ***
127     private static final List<String> dbgInstrument = new ArrayList<>();
128 
129     /**
130      * Creates a LayeredBloomFilter that retains enclosed filters for
131      * {@code duration} and limits the contents of each enclosed filter to a time
132      * {@code quanta}. This filter uses the timestamped Bloom filter internally.
133      *
134      * @param shape    The shape of the Bloom filters.
135      * @param duration The length of time to keep filters in the list.
136      * @param quanta   The quantization factor for each filter. Individual filters
137      *                 will span at most this much time.
138      * @return LayeredBloomFilter with the above properties.
139      */
140     static LayeredBloomFilter<TimestampedBloomFilter<SimpleBloomFilter>> createTimedLayeredFilter(final Shape shape, final Duration duration, final Duration quanta) {
141         final LayerManager.Builder<TimestampedBloomFilter<SimpleBloomFilter>> builder = LayerManager.builder();
142         final Consumer<Deque<TimestampedBloomFilter<SimpleBloomFilter>>> cleanup = Cleanup.removeEmptyTarget().andThen(new CleanByTime(duration));
143         final LayerManager<TimestampedBloomFilter<SimpleBloomFilter>> layerManager = builder
144                 .setSupplier(() -> new TimestampedBloomFilter<>(new SimpleBloomFilter(shape)))
145                 .setCleanup(cleanup)
146                 .setExtendCheck(new AdvanceOnTimeQuanta(quanta)
147                         .or(LayerManager.ExtendCheck.advanceOnSaturation(shape.estimateMaxN())))
148                 .get();
149         return new LayeredBloomFilter<>(shape, layerManager);
150     }
151 
152     /**
153      * Creates a fixed size layered bloom filter that adds new filters to the list,
154      * but never merges them. List will never exceed maxDepth. As additional filters
155      * are added earlier filters are removed.  Uses SimpleBloomFilters.
156      *
157      * @param shape    The shape for the enclosed Bloom filters.
158      * @param maxDepth The maximum depth of layers.
159      * @return An empty layered Bloom filter of the specified shape and depth.
160      */
161     public static LayeredBloomFilter<SimpleBloomFilter> fixed(final Shape shape, final int maxDepth) {
162         return fixed(shape, maxDepth, () -> new SimpleBloomFilter(shape));
163     }
164 
165     /**
166      * Creates a fixed size layered bloom filter that adds new filters to the list,
167      * but never merges them. List will never exceed maxDepth. As additional filters
168      * are added earlier filters are removed.
169      *
170      * @param shape    The shape for the enclosed Bloom filters.
171      * @param maxDepth The maximum depth of layers.
172      * @param supplier A supplier of the Bloom filters to create layers with.
173      * @return An empty layered Bloom filter of the specified shape and depth.
174      */
175     public static <T extends BloomFilter<T>> LayeredBloomFilter<T> fixed(final Shape shape, final int maxDepth, final Supplier<T> supplier) {
176         final LayerManager.Builder<T> builder = LayerManager.builder();
177         builder.setExtendCheck(LayerManager.ExtendCheck.advanceOnPopulated())
178                 .setCleanup(LayerManager.Cleanup.onMaxSize(maxDepth)).setSupplier(supplier);
179         return new LayeredBloomFilter<>(shape, builder.get());
180     }
181 
182     // instrumentation to record timestamps in dbgInstrument list
183     private final Predicate<BloomFilter> dbg = bf -> {
184         final TimestampedBloomFilter tbf = (TimestampedBloomFilter) bf;
185         final Instant ts = Instant.now();
186         dbgInstrument.add(String.format("T:%s (Elapsed:%s)- EstN:%s (Card:%s)\n", tbf.timestamp, Duration.between(tbf.timestamp, ts),
187                 tbf.estimateN(), tbf.cardinality()));
188         return true;
189     };
190     // *** end of instrumentation ***
191 
192     @Override
193     protected LayeredBloomFilter<SimpleBloomFilter> createEmptyFilter(final Shape shape) {
194         return LayeredBloomFilterTest.fixed(shape, 10);
195     }
196 
197     protected BloomFilter makeFilter(final Hasher h) {
198         final BloomFilter bf = new SparseBloomFilter(getTestShape());
199         bf.merge(h);
200         return bf;
201     }
202 
203     protected BloomFilter makeFilter(final IndexExtractor p) {
204         final BloomFilter bf = new SparseBloomFilter(getTestShape());
205         bf.merge(p);
206         return bf;
207     }
208 
209     protected BloomFilter makeFilter(final int... values) {
210         return makeFilter(IndexExtractor.fromIndexArray(values));
211     }
212 
213     private LayeredBloomFilter<SimpleBloomFilter> setupFindTest() {
214         final LayeredBloomFilter<SimpleBloomFilter> filter = LayeredBloomFilterTest.fixed(getTestShape(), 10);
215         filter.merge(TestingHashers.FROM1);
216         filter.merge(TestingHashers.FROM11);
217         filter.merge(new IncrementingHasher(11, 2));
218         filter.merge(TestingHashers.populateFromHashersFrom1AndFrom11(new SimpleBloomFilter(getTestShape())));
219         return filter;
220     }
221 
222     @Override
223     @Test
224     public void testCardinalityAndIsEmpty() {
225         final LayerManager<SimpleBloomFilter> layerManager = LayerManager.<SimpleBloomFilter>builder().setExtendCheck(ExtendCheck.neverAdvance())
226                 .setSupplier(() -> new SimpleBloomFilter(getTestShape())).get();
227         testCardinalityAndIsEmpty(new LayeredBloomFilter<>(getTestShape(), layerManager));
228     }
229 
230     // ***** TESTS THAT CHECK LAYERED PROCESSING ******
231 
232     @Test
233     public void testCleanup() {
234         final int[] sequence = {1};
235         final LayerManager<NumberedBloomFilter> layerManager = LayerManager.<NumberedBloomFilter>builder()
236                 .setSupplier(() -> new NumberedBloomFilter(getTestShape(), 3, sequence[0]++))
237                 .setExtendCheck(ExtendCheck.neverAdvance())
238                 .setCleanup(ll -> ll.removeIf(f -> f.value-- == 0)).get();
239         final LayeredBloomFilter<NumberedBloomFilter> underTest = new LayeredBloomFilter<>(getTestShape(), layerManager);
240         assertEquals(1, underTest.getDepth());
241         underTest.merge(TestingHashers.randomHasher());
242         underTest.cleanup(); // first count == 2
243         assertEquals(1, underTest.getDepth());
244         underTest.next(); // first count == 1
245         assertEquals(2, underTest.getDepth());
246         underTest.merge(TestingHashers.randomHasher());
247         underTest.cleanup(); // first count == 0
248         NumberedBloomFilter f = underTest.get(0);
249         assertEquals(1, f.sequence);
250 
251         assertEquals(2, underTest.getDepth());
252         underTest.cleanup(); // should be removed ; second is now 1st with value 1
253         assertEquals(1, underTest.getDepth());
254         f = underTest.get(0);
255         assertEquals(2, f.sequence);
256 
257         underTest.cleanup(); // first count == 0
258         underTest.cleanup(); // should be removed.  But there is always at least one
259         assertEquals(1, underTest.getDepth());
260         f = underTest.get(0);
261         assertEquals(3, f.sequence);  // it is a new one.
262     }
263     /**
264      * Tests that the estimated union calculations are correct.
265      */
266     @Test
267     public final void testEstimateUnionCrossTypes() {
268         final BloomFilter bf = createFilter(getTestShape(), TestingHashers.FROM1);
269         final BloomFilter bf2 = new DefaultBloomFilterTest.SparseDefaultBloomFilter(getTestShape());
270         bf2.merge(TestingHashers.FROM11);
271 
272         assertEquals(2, bf.estimateUnion(bf2));
273         assertEquals(2, bf2.estimateUnion(bf));
274     }
275 
276     @Test
277     public void testExpiration() throws InterruptedException {
278         // this test uses the instrumentation noted above to track changes for debugging
279         // purposes.
280 
281         // list of timestamps that are expected to be expired.
282         final List<Instant> lst = new ArrayList<>();
283         final Shape shape = Shape.fromNM(4, 64);
284 
285         // create a filter that removes filters that are 4 seconds old
286         // and quantises time to 1 second intervals.
287         final LayeredBloomFilter<TimestampedBloomFilter<SimpleBloomFilter>> underTest = createTimedLayeredFilter(shape, Duration.ofMillis(600),
288                 Duration.ofMillis(150));
289 
290         for (int i = 0; i < 10; i++) {
291             underTest.merge(TestingHashers.randomHasher());
292         }
293         underTest.processBloomFilters(dbg.and(x -> lst.add(((TimestampedBloomFilter) x).timestamp)));
294         assertTrue(underTest.getDepth() > 1);
295 
296         Thread.sleep(300);
297         for (int i = 0; i < 10; i++) {
298             underTest.merge(TestingHashers.randomHasher());
299         }
300         dbgInstrument.add("=== AFTER 300 milliseconds ====\n");
301         underTest.processBloomFilters(dbg);
302 
303         Thread.sleep(150);
304         for (int i = 0; i < 10; i++) {
305             underTest.merge(TestingHashers.randomHasher());
306         }
307         dbgInstrument.add("=== AFTER 450 milliseconds ====\n");
308         underTest.processBloomFilters(dbg);
309 
310         // sleep 200 milliseconds to ensure we cross the 600 millisecond boundary
311         Thread.sleep(200);
312         underTest.merge(TestingHashers.randomHasher());
313         dbgInstrument.add("=== AFTER 600 milliseconds ====\n");
314         assertTrue(underTest.processBloomFilters(dbg.and(x -> !lst.contains(((TimestampedBloomFilter) x).timestamp))),
315                 "Found filter that should have been deleted: " + dbgInstrument.get(dbgInstrument.size() - 1));
316     }
317 
318     @Test
319     public void testFindBitMapExtractor() {
320         final LayeredBloomFilter<SimpleBloomFilter> filter = setupFindTest();
321 
322         IndexExtractor indexExtractor = TestingHashers.FROM1.indices(getTestShape());
323         BitMapExtractor bitMapExtractor = BitMapExtractor.fromIndexExtractor(indexExtractor, getTestShape().getNumberOfBits());
324 
325         int[] expected = {0, 3};
326         int[] result = filter.find(bitMapExtractor);
327         assertArrayEquals(expected, result);
328 
329         expected = new int[]{1, 3};
330         indexExtractor = TestingHashers.FROM11.indices(getTestShape());
331         bitMapExtractor = BitMapExtractor.fromIndexExtractor(indexExtractor, getTestShape().getNumberOfBits());
332         result = filter.find(bitMapExtractor);
333         assertArrayEquals(expected, result);
334     }
335 
336     @Test
337     public void testFindBloomFilter() {
338         final LayeredBloomFilter<SimpleBloomFilter> filter = setupFindTest();
339         int[] expected = {0, 3};
340         int[] result = filter.find(TestingHashers.FROM1);
341         assertArrayEquals(expected, result);
342         expected = new int[] {1, 3};
343         result = filter.find(TestingHashers.FROM11);
344         assertArrayEquals(expected, result);
345     }
346 
347     @Test
348     public void testFindIndexExtractor() {
349         IndexExtractor indexExtractor = TestingHashers.FROM1.indices(getTestShape());
350         final LayeredBloomFilter<SimpleBloomFilter> filter = setupFindTest();
351 
352         int[] expected = {0, 3};
353         int[] result = filter.find(indexExtractor);
354         assertArrayEquals(expected, result);
355 
356         expected = new int[] {1, 3};
357         indexExtractor = TestingHashers.FROM11.indices(getTestShape());
358         result = filter.find(indexExtractor);
359         assertArrayEquals(expected, result);
360     }
361 
362     @Test
363     public final void testGetLayer() {
364         final BloomFilter bf = new SimpleBloomFilter(getTestShape());
365         bf.merge(TestingHashers.FROM11);
366         final LayeredBloomFilter<SimpleBloomFilter> filter = LayeredBloomFilterTest.fixed(getTestShape(), 10);
367         filter.merge(TestingHashers.FROM1);
368         filter.merge(TestingHashers.FROM11);
369         filter.merge(new IncrementingHasher(11, 2));
370         filter.merge(TestingHashers.populateFromHashersFrom1AndFrom11(new SimpleBloomFilter(getTestShape())));
371         assertArrayEquals(bf.asBitMapArray(), filter.get(1).asBitMapArray());
372     }
373 
374     @Test
375     public void testMultipleFilters() {
376         final LayeredBloomFilter<SimpleBloomFilter> filter = LayeredBloomFilterTest.fixed(getTestShape(), 10);
377         filter.merge(TestingHashers.FROM1);
378         filter.merge(TestingHashers.FROM11);
379         assertEquals(2, filter.getDepth());
380         assertTrue(filter.contains(makeFilter(TestingHashers.FROM1)));
381         assertTrue(filter.contains(makeFilter(TestingHashers.FROM11)));
382         final BloomFilter t1 = makeFilter(6, 7, 17, 18, 19);
383         assertFalse(filter.contains(t1));
384         assertFalse(filter.copy().contains(t1));
385         assertTrue(filter.flatten().contains(t1));
386     }
387 
388     @Test
389     public final void testNext() {
390         final LayerManager<SimpleBloomFilter> layerManager = LayerManager.<SimpleBloomFilter>builder().setSupplier(() -> new SimpleBloomFilter(getTestShape())).get();
391         final LayeredBloomFilter<SimpleBloomFilter> filter = new LayeredBloomFilter<>(getTestShape(), layerManager);
392         filter.merge(TestingHashers.FROM1);
393         filter.merge(TestingHashers.FROM11);
394         assertEquals(1, filter.getDepth());
395         filter.next();
396         filter.merge(new IncrementingHasher(11, 2));
397         assertEquals(2, filter.getDepth());
398         assertTrue(filter.get(0).contains(TestingHashers.FROM1));
399         assertTrue(filter.get(0).contains(TestingHashers.FROM11));
400         assertFalse(filter.get(0).contains(new IncrementingHasher(11, 2)));
401         assertFalse(filter.get(1).contains(TestingHashers.FROM1));
402         assertFalse(filter.get(1).contains(TestingHashers.FROM11));
403         assertTrue(filter.get(1).contains(new IncrementingHasher(11, 2)));
404     }
405 }