MultiReadOnlySeekableByteChannel.java

  1. /*
  2.  * Licensed to the Apache Software Foundation (ASF) under one or more
  3.  * contributor license agreements.  See the NOTICE file distributed with
  4.  * this work for additional information regarding copyright ownership.
  5.  * The ASF licenses this file to You under the Apache License, Version 2.0
  6.  * (the "License"); you may not use this file except in compliance with
  7.  * the License.  You may obtain a copy of the License at
  8.  *
  9.  *   http://www.apache.org/licenses/LICENSE-2.0
  10.  *
  11.  * Unless required by applicable law or agreed to in writing, software
  12.  * distributed under the License is distributed on an "AS IS" BASIS,
  13.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14.  * See the License for the specific language governing permissions and
  15.  * limitations under the License.
  16.  */

  17. package org.apache.commons.compress.utils;

  18. import java.io.File;
  19. import java.io.IOException;
  20. import java.nio.ByteBuffer;
  21. import java.nio.channels.ClosedChannelException;
  22. import java.nio.channels.NonWritableChannelException;
  23. import java.nio.channels.SeekableByteChannel;
  24. import java.nio.file.Files;
  25. import java.nio.file.Path;
  26. import java.nio.file.StandardOpenOption;
  27. import java.util.ArrayList;
  28. import java.util.Arrays;
  29. import java.util.Collections;
  30. import java.util.List;
  31. import java.util.Objects;

  32. /**
  33.  * Implements a read-only {@link SeekableByteChannel} that concatenates a collection of other {@link SeekableByteChannel}s.
  34.  * <p>
  35.  * This is a lose port of <a href=
  36.  * "https://github.com/frugalmechanic/fm-common/blob/master/jvm/src/main/scala/fm/common/MultiReadOnlySeekableByteChannel.scala">
  37.  * MultiReadOnlySeekableByteChannel</a>
  38.  * by Tim Underwood.
  39.  * </p>
  40.  *
  41.  * @since 1.19
  42.  */
  43. public class MultiReadOnlySeekableByteChannel implements SeekableByteChannel {

  44.     private static final Path[] EMPTY_PATH_ARRAY = {};

  45.     /**
  46.      * Concatenates the given files.
  47.      *
  48.      * @param files the files to concatenate
  49.      * @throws NullPointerException if files is null
  50.      * @throws IOException          if opening a channel for one of the files fails
  51.      * @return SeekableByteChannel that concatenates all provided files
  52.      */
  53.     public static SeekableByteChannel forFiles(final File... files) throws IOException {
  54.         final List<Path> paths = new ArrayList<>();
  55.         for (final File f : Objects.requireNonNull(files, "files")) {
  56.             paths.add(f.toPath());
  57.         }
  58.         return forPaths(paths.toArray(EMPTY_PATH_ARRAY));
  59.     }

  60.     /**
  61.      * Concatenates the given file paths.
  62.      *
  63.      * @param paths the file paths to concatenate, note that the LAST FILE of files should be the LAST SEGMENT(.zip) and these files should be added in correct
  64.      *              order (e.g.: .z01, .z02... .z99, .zip)
  65.      * @return SeekableByteChannel that concatenates all provided files
  66.      * @throws NullPointerException if files is null
  67.      * @throws IOException          if opening a channel for one of the files fails
  68.      * @throws IOException          if the first channel doesn't seem to hold the beginning of a split archive
  69.      * @since 1.22
  70.      */
  71.     public static SeekableByteChannel forPaths(final Path... paths) throws IOException {
  72.         final List<SeekableByteChannel> channels = new ArrayList<>();
  73.         for (final Path path : Objects.requireNonNull(paths, "paths")) {
  74.             channels.add(Files.newByteChannel(path, StandardOpenOption.READ));
  75.         }
  76.         if (channels.size() == 1) {
  77.             return channels.get(0);
  78.         }
  79.         return new MultiReadOnlySeekableByteChannel(channels);
  80.     }

  81.     /**
  82.      * Concatenates the given channels.
  83.      *
  84.      * @param channels the channels to concatenate
  85.      * @throws NullPointerException if channels is null
  86.      * @return SeekableByteChannel that concatenates all provided channels
  87.      */
  88.     public static SeekableByteChannel forSeekableByteChannels(final SeekableByteChannel... channels) {
  89.         if (Objects.requireNonNull(channels, "channels").length == 1) {
  90.             return channels[0];
  91.         }
  92.         return new MultiReadOnlySeekableByteChannel(Arrays.asList(channels));
  93.     }

  94.     private final List<SeekableByteChannel> channelList;

  95.     private long globalPosition;

  96.     private int currentChannelIdx;

  97.     /**
  98.      * Concatenates the given channels.
  99.      *
  100.      * @param channels the channels to concatenate
  101.      * @throws NullPointerException if channels is null
  102.      */
  103.     public MultiReadOnlySeekableByteChannel(final List<SeekableByteChannel> channels) {
  104.         this.channelList = Collections.unmodifiableList(new ArrayList<>(Objects.requireNonNull(channels, "channels")));
  105.     }

  106.     @Override
  107.     public void close() throws IOException {
  108.         IOException first = null;
  109.         for (final SeekableByteChannel ch : channelList) {
  110.             try {
  111.                 ch.close();
  112.             } catch (final IOException ex) {
  113.                 if (first == null) {
  114.                     first = ex;
  115.                 }
  116.             }
  117.         }
  118.         if (first != null) {
  119.             throw new IOException("failed to close wrapped channel", first);
  120.         }
  121.     }

  122.     @Override
  123.     public boolean isOpen() {
  124.         return channelList.stream().allMatch(SeekableByteChannel::isOpen);
  125.     }

  126.     /**
  127.      * Gets this channel's position.
  128.      * <p>
  129.      * This method violates the contract of {@link SeekableByteChannel#position()} as it will not throw any exception when invoked on a closed channel. Instead
  130.      * it will return the position the channel had when close has been called.
  131.      * </p>
  132.      */
  133.     @Override
  134.     public long position() {
  135.         return globalPosition;
  136.     }

  137.     @Override
  138.     public synchronized SeekableByteChannel position(final long newPosition) throws IOException {
  139.         if (newPosition < 0) {
  140.             throw new IllegalArgumentException("Negative position: " + newPosition);
  141.         }
  142.         if (!isOpen()) {
  143.             throw new ClosedChannelException();
  144.         }
  145.         globalPosition = newPosition;
  146.         long pos = newPosition;
  147.         for (int i = 0; i < channelList.size(); i++) {
  148.             final SeekableByteChannel currentChannel = channelList.get(i);
  149.             final long size = currentChannel.size();

  150.             final long newChannelPos;
  151.             if (pos == -1L) {
  152.                 // Position is already set for the correct channel,
  153.                 // the rest of the channels get reset to 0
  154.                 newChannelPos = 0;
  155.             } else if (pos <= size) {
  156.                 // This channel is where we want to be
  157.                 currentChannelIdx = i;
  158.                 final long tmp = pos;
  159.                 pos = -1L; // Mark pos as already being set
  160.                 newChannelPos = tmp;
  161.             } else {
  162.                 // newPosition is past this channel. Set channel
  163.                 // position to the end and substract channel size from
  164.                 // pos
  165.                 pos -= size;
  166.                 newChannelPos = size;
  167.             }
  168.             currentChannel.position(newChannelPos);
  169.         }
  170.         return this;
  171.     }

  172.     /**
  173.      * Sets the position based on the given channel number and relative offset
  174.      *
  175.      * @param channelNumber  the channel number
  176.      * @param relativeOffset the relative offset in the corresponding channel
  177.      * @return global position of all channels as if they are a single channel
  178.      * @throws IOException if positioning fails
  179.      */
  180.     public synchronized SeekableByteChannel position(final long channelNumber, final long relativeOffset) throws IOException {
  181.         if (!isOpen()) {
  182.             throw new ClosedChannelException();
  183.         }
  184.         long globalPosition = relativeOffset;
  185.         for (int i = 0; i < channelNumber; i++) {
  186.             globalPosition += channelList.get(i).size();
  187.         }

  188.         return position(globalPosition);
  189.     }

  190.     @Override
  191.     public synchronized int read(final ByteBuffer dst) throws IOException {
  192.         if (!isOpen()) {
  193.             throw new ClosedChannelException();
  194.         }
  195.         if (!dst.hasRemaining()) {
  196.             return 0;
  197.         }

  198.         int totalBytesRead = 0;
  199.         while (dst.hasRemaining() && currentChannelIdx < channelList.size()) {
  200.             final SeekableByteChannel currentChannel = channelList.get(currentChannelIdx);
  201.             final int newBytesRead = currentChannel.read(dst);
  202.             if (newBytesRead == -1) {
  203.                 // EOF for this channel -- advance to next channel idx
  204.                 currentChannelIdx += 1;
  205.                 continue;
  206.             }
  207.             if (currentChannel.position() >= currentChannel.size()) {
  208.                 // we are at the end of the current channel
  209.                 currentChannelIdx++;
  210.             }
  211.             totalBytesRead += newBytesRead;
  212.         }
  213.         if (totalBytesRead > 0) {
  214.             globalPosition += totalBytesRead;
  215.             return totalBytesRead;
  216.         }
  217.         return -1;
  218.     }

  219.     @Override
  220.     public long size() throws IOException {
  221.         if (!isOpen()) {
  222.             throw new ClosedChannelException();
  223.         }
  224.         long acc = 0;
  225.         for (final SeekableByteChannel ch : channelList) {
  226.             acc += ch.size();
  227.         }
  228.         return acc;
  229.     }

  230.     /**
  231.      * @throws NonWritableChannelException since this implementation is read-only.
  232.      */
  233.     @Override
  234.     public SeekableByteChannel truncate(final long size) {
  235.         throw new NonWritableChannelException();
  236.     }

  237.     /**
  238.      * @throws NonWritableChannelException since this implementation is read-only.
  239.      */
  240.     @Override
  241.     public int write(final ByteBuffer src) {
  242.         throw new NonWritableChannelException();
  243.     }

  244. }