View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      https://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.io.jmh;
18  
19  import java.util.concurrent.BlockingQueue;
20  import java.util.concurrent.LinkedBlockingQueue;
21  import java.util.concurrent.TimeUnit;
22  
23  import org.apache.commons.io.input.QueueInputStream;
24  import org.apache.commons.io.output.QueueOutputStream;
25  import org.apache.commons.lang3.RandomUtils;
26  import org.openjdk.jmh.annotations.Benchmark;
27  import org.openjdk.jmh.annotations.BenchmarkMode;
28  import org.openjdk.jmh.annotations.Group;
29  import org.openjdk.jmh.annotations.Mode;
30  import org.openjdk.jmh.annotations.OutputTimeUnit;
31  import org.openjdk.jmh.annotations.Scope;
32  import org.openjdk.jmh.annotations.State;
33  import org.openjdk.jmh.infra.Blackhole;
34  
35  /**
36   * Measures the amount of time to push 1 MiB to a {@link QueueOutputStream} and read it using a {@link QueueInputStream}
37   */
38  @BenchmarkMode(Mode.SampleTime)
39  @OutputTimeUnit(TimeUnit.MILLISECONDS)
40  @State(Scope.Group)
41  public class QueueStreamBenchmark {
42  
43      private static final int CAPACITY = 1024 * 1024;
44      private static final int BUFFER_SIZE = 1024;
45  
46      private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(CAPACITY);
47      private final QueueInputStream inputStream = QueueInputStream.builder()
48              .setBlockingQueue(queue)
49              .get();
50      private final QueueOutputStream outputStream = inputStream.newQueueOutputStream();
51  
52      private final byte[] input = RandomUtils.insecure().randomBytes(CAPACITY);
53      private final byte[] output = new byte[BUFFER_SIZE];
54  
55      @Benchmark
56      @Group("streams")
57      public void input(final Blackhole bh) throws Exception {
58          int received = 0;
59          while (received < CAPACITY) {
60              final int len = inputStream.read(output, 0, BUFFER_SIZE);
61              bh.consume(output);
62              received += len;
63          }
64      }
65  
66      @Benchmark
67      @Group("streams")
68      public void output() throws Exception {
69          int sent = 0;
70          while (sent < CAPACITY) {
71              final int len = Math.min(CAPACITY - sent, BUFFER_SIZE);
72              outputStream.write(input, sent, len);
73              sent += len;
74          }
75      }
76  }