MultiReadOnlySeekableByteChannel.java
- /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.apache.commons.compress.utils;
- import java.io.File;
- import java.io.IOException;
- import java.nio.ByteBuffer;
- import java.nio.channels.ClosedChannelException;
- import java.nio.channels.NonWritableChannelException;
- import java.nio.channels.SeekableByteChannel;
- import java.nio.file.Files;
- import java.nio.file.Path;
- import java.nio.file.StandardOpenOption;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.Collections;
- import java.util.List;
- import java.util.Objects;
- /**
- * Implements a read-only {@link SeekableByteChannel} that concatenates a collection of other {@link SeekableByteChannel}s.
- * <p>
- * This is a lose port of <a href=
- * "https://github.com/frugalmechanic/fm-common/blob/master/jvm/src/main/scala/fm/common/MultiReadOnlySeekableByteChannel.scala">
- * MultiReadOnlySeekableByteChannel</a>
- * by Tim Underwood.
- * </p>
- *
- * @since 1.19
- */
- public class MultiReadOnlySeekableByteChannel implements SeekableByteChannel {
- private static final Path[] EMPTY_PATH_ARRAY = {};
- /**
- * Concatenates the given files.
- *
- * @param files the files to concatenate
- * @throws NullPointerException if files is null
- * @throws IOException if opening a channel for one of the files fails
- * @return SeekableByteChannel that concatenates all provided files
- */
- public static SeekableByteChannel forFiles(final File... files) throws IOException {
- final List<Path> paths = new ArrayList<>();
- for (final File f : Objects.requireNonNull(files, "files")) {
- paths.add(f.toPath());
- }
- return forPaths(paths.toArray(EMPTY_PATH_ARRAY));
- }
- /**
- * Concatenates the given file paths.
- *
- * @param paths the file paths to concatenate, note that the LAST FILE of files should be the LAST SEGMENT(.zip) and these files should be added in correct
- * order (e.g.: .z01, .z02... .z99, .zip)
- * @return SeekableByteChannel that concatenates all provided files
- * @throws NullPointerException if files is null
- * @throws IOException if opening a channel for one of the files fails
- * @throws IOException if the first channel doesn't seem to hold the beginning of a split archive
- * @since 1.22
- */
- public static SeekableByteChannel forPaths(final Path... paths) throws IOException {
- final List<SeekableByteChannel> channels = new ArrayList<>();
- for (final Path path : Objects.requireNonNull(paths, "paths")) {
- channels.add(Files.newByteChannel(path, StandardOpenOption.READ));
- }
- if (channels.size() == 1) {
- return channels.get(0);
- }
- return new MultiReadOnlySeekableByteChannel(channels);
- }
- /**
- * Concatenates the given channels.
- *
- * @param channels the channels to concatenate
- * @throws NullPointerException if channels is null
- * @return SeekableByteChannel that concatenates all provided channels
- */
- public static SeekableByteChannel forSeekableByteChannels(final SeekableByteChannel... channels) {
- if (Objects.requireNonNull(channels, "channels").length == 1) {
- return channels[0];
- }
- return new MultiReadOnlySeekableByteChannel(Arrays.asList(channels));
- }
- private final List<SeekableByteChannel> channelList;
- private long globalPosition;
- private int currentChannelIdx;
- /**
- * Concatenates the given channels.
- *
- * @param channels the channels to concatenate
- * @throws NullPointerException if channels is null
- */
- public MultiReadOnlySeekableByteChannel(final List<SeekableByteChannel> channels) {
- this.channelList = Collections.unmodifiableList(new ArrayList<>(Objects.requireNonNull(channels, "channels")));
- }
- @Override
- public void close() throws IOException {
- IOException first = null;
- for (final SeekableByteChannel ch : channelList) {
- try {
- ch.close();
- } catch (final IOException ex) {
- if (first == null) {
- first = ex;
- }
- }
- }
- if (first != null) {
- throw new IOException("failed to close wrapped channel", first);
- }
- }
- @Override
- public boolean isOpen() {
- return channelList.stream().allMatch(SeekableByteChannel::isOpen);
- }
- /**
- * Gets this channel's position.
- * <p>
- * This method violates the contract of {@link SeekableByteChannel#position()} as it will not throw any exception when invoked on a closed channel. Instead
- * it will return the position the channel had when close has been called.
- * </p>
- */
- @Override
- public long position() {
- return globalPosition;
- }
- @Override
- public synchronized SeekableByteChannel position(final long newPosition) throws IOException {
- if (newPosition < 0) {
- throw new IllegalArgumentException("Negative position: " + newPosition);
- }
- if (!isOpen()) {
- throw new ClosedChannelException();
- }
- globalPosition = newPosition;
- long pos = newPosition;
- for (int i = 0; i < channelList.size(); i++) {
- final SeekableByteChannel currentChannel = channelList.get(i);
- final long size = currentChannel.size();
- final long newChannelPos;
- if (pos == -1L) {
- // Position is already set for the correct channel,
- // the rest of the channels get reset to 0
- newChannelPos = 0;
- } else if (pos <= size) {
- // This channel is where we want to be
- currentChannelIdx = i;
- final long tmp = pos;
- pos = -1L; // Mark pos as already being set
- newChannelPos = tmp;
- } else {
- // newPosition is past this channel. Set channel
- // position to the end and substract channel size from
- // pos
- pos -= size;
- newChannelPos = size;
- }
- currentChannel.position(newChannelPos);
- }
- return this;
- }
- /**
- * Sets the position based on the given channel number and relative offset
- *
- * @param channelNumber the channel number
- * @param relativeOffset the relative offset in the corresponding channel
- * @return global position of all channels as if they are a single channel
- * @throws IOException if positioning fails
- */
- public synchronized SeekableByteChannel position(final long channelNumber, final long relativeOffset) throws IOException {
- if (!isOpen()) {
- throw new ClosedChannelException();
- }
- long globalPosition = relativeOffset;
- for (int i = 0; i < channelNumber; i++) {
- globalPosition += channelList.get(i).size();
- }
- return position(globalPosition);
- }
- @Override
- public synchronized int read(final ByteBuffer dst) throws IOException {
- if (!isOpen()) {
- throw new ClosedChannelException();
- }
- if (!dst.hasRemaining()) {
- return 0;
- }
- int totalBytesRead = 0;
- while (dst.hasRemaining() && currentChannelIdx < channelList.size()) {
- final SeekableByteChannel currentChannel = channelList.get(currentChannelIdx);
- final int newBytesRead = currentChannel.read(dst);
- if (newBytesRead == -1) {
- // EOF for this channel -- advance to next channel idx
- currentChannelIdx += 1;
- continue;
- }
- if (currentChannel.position() >= currentChannel.size()) {
- // we are at the end of the current channel
- currentChannelIdx++;
- }
- totalBytesRead += newBytesRead;
- }
- if (totalBytesRead > 0) {
- globalPosition += totalBytesRead;
- return totalBytesRead;
- }
- return -1;
- }
- @Override
- public long size() throws IOException {
- if (!isOpen()) {
- throw new ClosedChannelException();
- }
- long acc = 0;
- for (final SeekableByteChannel ch : channelList) {
- acc += ch.size();
- }
- return acc;
- }
- /**
- * @throws NonWritableChannelException since this implementation is read-only.
- */
- @Override
- public SeekableByteChannel truncate(final long size) {
- throw new NonWritableChannelException();
- }
- /**
- * @throws NonWritableChannelException since this implementation is read-only.
- */
- @Override
- public int write(final ByteBuffer src) {
- throw new NonWritableChannelException();
- }
- }