1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.commons.io.jmh;
18
19 import java.util.concurrent.BlockingQueue;
20 import java.util.concurrent.LinkedBlockingQueue;
21 import java.util.concurrent.TimeUnit;
22
23 import org.apache.commons.io.input.QueueInputStream;
24 import org.apache.commons.io.output.QueueOutputStream;
25 import org.apache.commons.lang3.RandomUtils;
26 import org.openjdk.jmh.annotations.Benchmark;
27 import org.openjdk.jmh.annotations.BenchmarkMode;
28 import org.openjdk.jmh.annotations.Group;
29 import org.openjdk.jmh.annotations.Mode;
30 import org.openjdk.jmh.annotations.OutputTimeUnit;
31 import org.openjdk.jmh.annotations.Scope;
32 import org.openjdk.jmh.annotations.State;
33 import org.openjdk.jmh.infra.Blackhole;
34
35
36
37
38 @BenchmarkMode(Mode.SampleTime)
39 @OutputTimeUnit(TimeUnit.MILLISECONDS)
40 @State(Scope.Group)
41 public class QueueStreamBenchmark {
42
43 private static final int CAPACITY = 1024 * 1024;
44 private static final int BUFFER_SIZE = 1024;
45
46 private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(CAPACITY);
47 private final QueueInputStream inputStream = QueueInputStream.builder()
48 .setBlockingQueue(queue)
49 .get();
50 private final QueueOutputStream outputStream = inputStream.newQueueOutputStream();
51
52 private final byte[] input = RandomUtils.insecure().randomBytes(CAPACITY);
53 private final byte[] output = new byte[BUFFER_SIZE];
54
55 @Benchmark
56 @Group("streams")
57 public void input(final Blackhole bh) throws Exception {
58 int received = 0;
59 while (received < CAPACITY) {
60 final int len = inputStream.read(output, 0, BUFFER_SIZE);
61 bh.consume(output);
62 received += len;
63 }
64 }
65
66 @Benchmark
67 @Group("streams")
68 public void output() throws Exception {
69 int sent = 0;
70 while (sent < CAPACITY) {
71 final int len = Math.min(CAPACITY - sent, BUFFER_SIZE);
72 outputStream.write(input, sent, len);
73 sent += len;
74 }
75 }
76 }