View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   https://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  
20  package org.apache.commons.compress.utils;
21  
22  import java.io.File;
23  import java.io.IOException;
24  import java.nio.ByteBuffer;
25  import java.nio.channels.ClosedChannelException;
26  import java.nio.channels.NonWritableChannelException;
27  import java.nio.channels.SeekableByteChannel;
28  import java.nio.file.Files;
29  import java.nio.file.Path;
30  import java.nio.file.StandardOpenOption;
31  import java.util.ArrayList;
32  import java.util.Arrays;
33  import java.util.Collections;
34  import java.util.List;
35  import java.util.Objects;
36  
37  /**
38   * Implements a read-only {@link SeekableByteChannel} that concatenates a collection of other {@link SeekableByteChannel}s.
39   * <p>
40   * This is a lose port of <a href=
41   * "https://github.com/frugalmechanic/fm-common/blob/master/jvm/src/main/scala/fm/common/MultiReadOnlySeekableByteChannel.scala">
42   * MultiReadOnlySeekableByteChannel</a>
43   * by Tim Underwood.
44   * </p>
45   *
46   * @since 1.19
47   */
48  public class MultiReadOnlySeekableByteChannel implements SeekableByteChannel {
49  
50      private static final Path[] EMPTY_PATH_ARRAY = {};
51  
52      /**
53       * Concatenates the given files.
54       *
55       * @param files the files to concatenate
56       * @throws NullPointerException if files is null
57       * @throws IOException          if opening a channel for one of the files fails
58       * @return SeekableByteChannel that concatenates all provided files
59       */
60      public static SeekableByteChannel forFiles(final File... files) throws IOException {
61          final List<Path> paths = new ArrayList<>();
62          for (final File f : Objects.requireNonNull(files, "files")) {
63              paths.add(f.toPath());
64          }
65          return forPaths(paths.toArray(EMPTY_PATH_ARRAY));
66      }
67  
68      /**
69       * Concatenates the given file paths.
70       *
71       * @param paths the file paths to concatenate, note that the LAST FILE of files should be the LAST SEGMENT(.zip) and these files should be added in correct
72       *              order (for example: .z01, .z02... .z99, .zip)
73       * @return SeekableByteChannel that concatenates all provided files
74       * @throws NullPointerException if files is null
75       * @throws IOException          if opening a channel for one of the files fails
76       * @throws IOException          if the first channel doesn't seem to hold the beginning of a split archive
77       * @since 1.22
78       */
79      public static SeekableByteChannel forPaths(final Path... paths) throws IOException {
80          final List<SeekableByteChannel> channels = new ArrayList<>();
81          for (final Path path : Objects.requireNonNull(paths, "paths")) {
82              channels.add(Files.newByteChannel(path, StandardOpenOption.READ));
83          }
84          if (channels.size() == 1) {
85              return channels.get(0);
86          }
87          return new MultiReadOnlySeekableByteChannel(channels);
88      }
89  
90      /**
91       * Concatenates the given channels.
92       *
93       * @param channels the channels to concatenate
94       * @throws NullPointerException if channels is null
95       * @return SeekableByteChannel that concatenates all provided channels
96       */
97      public static SeekableByteChannel forSeekableByteChannels(final SeekableByteChannel... channels) {
98          if (Objects.requireNonNull(channels, "channels").length == 1) {
99              return channels[0];
100         }
101         return new MultiReadOnlySeekableByteChannel(Arrays.asList(channels));
102     }
103 
104     private final List<SeekableByteChannel> channelList;
105 
106     private long globalPosition;
107 
108     private int currentChannelIdx;
109 
110     /**
111      * Concatenates the given channels.
112      *
113      * @param channels the channels to concatenate
114      * @throws NullPointerException if channels is null
115      */
116     public MultiReadOnlySeekableByteChannel(final List<SeekableByteChannel> channels) {
117         this.channelList = Collections.unmodifiableList(new ArrayList<>(Objects.requireNonNull(channels, "channels")));
118     }
119 
120     @Override
121     public void close() throws IOException {
122         IOException first = null;
123         for (final SeekableByteChannel ch : channelList) {
124             try {
125                 ch.close();
126             } catch (final IOException ex) {
127                 if (first == null) {
128                     first = ex;
129                 }
130             }
131         }
132         if (first != null) {
133             throw new IOException("failed to close wrapped channel", first);
134         }
135     }
136 
137     @Override
138     public boolean isOpen() {
139         return channelList.stream().allMatch(SeekableByteChannel::isOpen);
140     }
141 
142     /**
143      * Gets this channel's position.
144      * <p>
145      * This method violates the contract of {@link SeekableByteChannel#position()} as it will not throw any exception when invoked on a closed channel. Instead
146      * it will return the position the channel had when close has been called.
147      * </p>
148      */
149     @Override
150     public long position() {
151         return globalPosition;
152     }
153 
154     @Override
155     public synchronized SeekableByteChannel position(final long newPosition) throws IOException {
156         if (newPosition < 0) {
157             throw new IllegalArgumentException("Negative position: " + newPosition);
158         }
159         if (!isOpen()) {
160             throw new ClosedChannelException();
161         }
162         globalPosition = newPosition;
163         long pos = newPosition;
164         for (int i = 0; i < channelList.size(); i++) {
165             final SeekableByteChannel currentChannel = channelList.get(i);
166             final long size = currentChannel.size();
167 
168             final long newChannelPos;
169             if (pos == -1L) {
170                 // Position is already set for the correct channel,
171                 // the rest of the channels get reset to 0
172                 newChannelPos = 0;
173             } else if (pos <= size) {
174                 // This channel is where we want to be
175                 currentChannelIdx = i;
176                 final long tmp = pos;
177                 pos = -1L; // Mark pos as already being set
178                 newChannelPos = tmp;
179             } else {
180                 // newPosition is past this channel. Set channel
181                 // position to the end and substract channel size from
182                 // pos
183                 pos -= size;
184                 newChannelPos = size;
185             }
186             currentChannel.position(newChannelPos);
187         }
188         return this;
189     }
190 
191     /**
192      * Sets the position based on the given channel number and relative offset
193      *
194      * @param channelNumber  the channel number
195      * @param relativeOffset the relative offset in the corresponding channel
196      * @return global position of all channels as if they are a single channel
197      * @throws IOException if positioning fails
198      */
199     public synchronized SeekableByteChannel position(final long channelNumber, final long relativeOffset) throws IOException {
200         if (!isOpen()) {
201             throw new ClosedChannelException();
202         }
203         long globalPosition = relativeOffset;
204         for (int i = 0; i < channelNumber; i++) {
205             globalPosition += channelList.get(i).size();
206         }
207 
208         return position(globalPosition);
209     }
210 
211     @Override
212     public synchronized int read(final ByteBuffer dst) throws IOException {
213         if (!isOpen()) {
214             throw new ClosedChannelException();
215         }
216         if (!dst.hasRemaining()) {
217             return 0;
218         }
219 
220         int totalBytesRead = 0;
221         while (dst.hasRemaining() && currentChannelIdx < channelList.size()) {
222             final SeekableByteChannel currentChannel = channelList.get(currentChannelIdx);
223             final int newBytesRead = currentChannel.read(dst);
224             if (newBytesRead == -1) {
225                 // EOF for this channel -- advance to next channel idx
226                 currentChannelIdx += 1;
227                 continue;
228             }
229             if (currentChannel.position() >= currentChannel.size()) {
230                 // we are at the end of the current channel
231                 currentChannelIdx++;
232             }
233             totalBytesRead += newBytesRead;
234         }
235         if (totalBytesRead > 0) {
236             globalPosition += totalBytesRead;
237             return totalBytesRead;
238         }
239         return -1;
240     }
241 
242     @Override
243     public long size() throws IOException {
244         if (!isOpen()) {
245             throw new ClosedChannelException();
246         }
247         long acc = 0;
248         for (final SeekableByteChannel ch : channelList) {
249             acc += ch.size();
250         }
251         return acc;
252     }
253 
254     /**
255      * @throws NonWritableChannelException since this implementation is read-only.
256      */
257     @Override
258     public SeekableByteChannel truncate(final long size) {
259         throw new NonWritableChannelException();
260     }
261 
262     /**
263      * @throws NonWritableChannelException since this implementation is read-only.
264      */
265     @Override
266     public int write(final ByteBuffer src) {
267         throw new NonWritableChannelException();
268     }
269 
270 }