View Javadoc
1   package org.apache.commons.jcs3.auxiliary.disk.block;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.File;
23  import java.io.IOException;
24  import java.nio.ByteBuffer;
25  import java.nio.channels.FileChannel;
26  import java.nio.file.StandardOpenOption;
27  import java.util.concurrent.ConcurrentLinkedQueue;
28  import java.util.concurrent.atomic.AtomicInteger;
29  import java.util.concurrent.atomic.AtomicLong;
30  
31  import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
32  import org.apache.commons.jcs3.log.Log;
33  import org.apache.commons.jcs3.log.LogManager;
34  import org.apache.commons.jcs3.utils.serialization.StandardSerializer;
35  
36  /**
37   * This class manages reading an writing data to disk. When asked to write a value, it returns a
38   * block array. It can read an object from the block numbers in a byte array.
39   */
40  public class BlockDisk implements AutoCloseable
41  {
42      /** The logger */
43      private static final Log log = LogManager.getLog(BlockDisk.class);
44  
45      /** The size of the header that indicates the amount of data stored in an occupied block. */
46      public static final byte HEADER_SIZE_BYTES = 4;
47      // N.B. 4 bytes is the size used for ByteBuffer.putInt(int value) and ByteBuffer.getInt()
48  
49      /** defaults to 4kb */
50      private static final int DEFAULT_BLOCK_SIZE_BYTES = 4 * 1024;
51  
52      /** Size of the blocks */
53      private final int blockSizeBytes;
54  
55      /**
56       * the total number of blocks that have been used. If there are no free, we will use this to
57       * calculate the position of the next block.
58       */
59      private final AtomicInteger numberOfBlocks = new AtomicInteger(0);
60  
61      /** Empty blocks that can be reused. */
62      private final ConcurrentLinkedQueue<Integer> emptyBlocks = new ConcurrentLinkedQueue<>();
63  
64      /** The serializer. */
65      private final IElementSerializer elementSerializer;
66  
67      /** Location of the spot on disk */
68      private final String filepath;
69  
70      /** File channel for multiple concurrent reads and writes */
71      private final FileChannel fc;
72  
73      /** How many bytes have we put to disk */
74      private final AtomicLong putBytes = new AtomicLong();
75  
76      /** How many items have we put to disk */
77      private final AtomicLong putCount = new AtomicLong();
78  
79      /**
80       * Constructor for the Disk object
81       * <p>
82       * @param file
83       * @param elementSerializer
84       * @throws IOException
85       */
86      public BlockDisk(final File file, final IElementSerializer elementSerializer)
87          throws IOException
88      {
89          this(file, DEFAULT_BLOCK_SIZE_BYTES, elementSerializer);
90      }
91  
92      /**
93       * Creates the file and set the block size in bytes.
94       * <p>
95       * @param file
96       * @param blockSizeBytes
97       * @throws IOException
98       */
99      public BlockDisk(final File file, final int blockSizeBytes)
100         throws IOException
101     {
102         this(file, blockSizeBytes, new StandardSerializer());
103     }
104 
105     /**
106      * Creates the file and set the block size in bytes.
107      * <p>
108      * @param file
109      * @param blockSizeBytes
110      * @param elementSerializer
111      * @throws IOException
112      */
113     public BlockDisk(final File file, final int blockSizeBytes, final IElementSerializer elementSerializer)
114         throws IOException
115     {
116         this.filepath = file.getAbsolutePath();
117         this.fc = FileChannel.open(file.toPath(),
118                 StandardOpenOption.CREATE,
119                 StandardOpenOption.READ,
120                 StandardOpenOption.WRITE);
121         this.numberOfBlocks.set((int) Math.ceil(1f * this.fc.size() / blockSizeBytes));
122 
123         log.info("Constructing BlockDisk, blockSizeBytes [{0}]", blockSizeBytes);
124 
125         this.blockSizeBytes = blockSizeBytes;
126         this.elementSerializer = elementSerializer;
127     }
128 
129     /**
130      * Allocate a given number of blocks from the available set
131      *
132      * @param numBlocksNeeded
133      * @return an array of allocated blocks
134      */
135     private int[] allocateBlocks(final int numBlocksNeeded)
136     {
137         assert numBlocksNeeded >= 1;
138 
139         final int[] blocks = new int[numBlocksNeeded];
140         // get them from the empty list or take the next one
141         for (int i = 0; i < numBlocksNeeded; i++)
142         {
143             Integer emptyBlock = emptyBlocks.poll();
144             if (emptyBlock == null)
145             {
146                 emptyBlock = Integer.valueOf(numberOfBlocks.getAndIncrement());
147             }
148             blocks[i] = emptyBlock.intValue();
149         }
150 
151         return blocks;
152     }
153 
154     /**
155      * This writes an object to disk and returns the blocks it was stored in.
156      * <p>
157      * The program flow is as follows:
158      * <ol>
159      * <li>Serialize the object.</li>
160      * <li>Determine the number of blocks needed.</li>
161      * <li>Look for free blocks in the emptyBlock list.</li>
162      * <li>If there were not enough in the empty list. Take the nextBlock and increment it.</li>
163      * <li>If the data will not fit in one block, create sub arrays.</li>
164      * <li>Write the subarrays to disk.</li>
165      * <li>If the process fails we should decrement the block count if we took from it.</li>
166      * </ol>
167      * @param object
168      * @return the blocks we used.
169      * @throws IOException
170      */
171     protected <T> int[] write(final T object)
172         throws IOException
173     {
174         // serialize the object
175         final byte[] data = elementSerializer.serialize(object);
176 
177         log.debug("write, total pre-chunking data.length = {0}", data.length);
178 
179         this.putBytes.addAndGet(data.length);
180         this.putCount.incrementAndGet();
181 
182         // figure out how many blocks we need.
183         final int numBlocksNeeded = calculateTheNumberOfBlocksNeeded(data);
184 
185         log.debug("numBlocksNeeded = {0}", numBlocksNeeded);
186 
187         // allocate blocks
188         final int[] blocks = allocateBlocks(numBlocksNeeded);
189 
190         int offset = 0;
191         final int maxChunkSize = blockSizeBytes - HEADER_SIZE_BYTES;
192         final ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE_BYTES);
193         final ByteBuffer dataBuffer = ByteBuffer.wrap(data);
194 
195         for (int i = 0; i < numBlocksNeeded; i++)
196         {
197             headerBuffer.clear();
198             final int length = Math.min(maxChunkSize, data.length - offset);
199             headerBuffer.putInt(length);
200             headerBuffer.flip();
201 
202             dataBuffer.position(offset).limit(offset + length);
203             final ByteBuffer slice = dataBuffer.slice();
204 
205             final long position = calculateByteOffsetForBlockAsLong(blocks[i]);
206             // write the header
207             int written = fc.write(headerBuffer, position);
208             assert written == HEADER_SIZE_BYTES;
209 
210             //write the data
211             written = fc.write(slice, position + HEADER_SIZE_BYTES);
212             assert written == length;
213 
214             offset += length;
215         }
216 
217         //fc.force(false);
218 
219         return blocks;
220     }
221 
222     /**
223      * Return the amount to put in each block. Fill them all the way, minus the header.
224      * <p>
225      * @param complete
226      * @param numBlocksNeeded
227      * @return byte[][]
228      */
229     protected byte[][] getBlockChunks(final byte[] complete, final int numBlocksNeeded)
230     {
231         final byte[][] chunks = new byte[numBlocksNeeded][];
232 
233         if (numBlocksNeeded == 1)
234         {
235             chunks[0] = complete;
236         }
237         else
238         {
239             final int maxChunkSize = this.blockSizeBytes - HEADER_SIZE_BYTES;
240             final int totalBytes = complete.length;
241             int totalUsed = 0;
242             for (short i = 0; i < numBlocksNeeded; i++)
243             {
244                 // use the max that can be written to a block or whatever is left in the original
245                 // array
246                 final int chunkSize = Math.min(maxChunkSize, totalBytes - totalUsed);
247                 final byte[] chunk = new byte[chunkSize];
248                 // copy from the used position to the chunk size on the complete array to the chunk
249                 // array.
250                 System.arraycopy(complete, totalUsed, chunk, 0, chunkSize);
251                 chunks[i] = chunk;
252                 totalUsed += chunkSize;
253             }
254         }
255 
256         return chunks;
257     }
258 
259     /**
260      * Reads an object that is located in the specified blocks.
261      * <p>
262      * @param blockNumbers
263      * @return the object instance
264      * @throws IOException
265      * @throws ClassNotFoundException
266      */
267     protected <T> T read(final int[] blockNumbers)
268         throws IOException, ClassNotFoundException
269     {
270         final ByteBuffer data;
271 
272         if (blockNumbers.length == 1)
273         {
274             data = readBlock(blockNumbers[0]);
275         }
276         else
277         {
278             data = ByteBuffer.allocate(blockNumbers.length * getBlockSizeBytes());
279             // get all the blocks into data
280             for (short i = 0; i < blockNumbers.length; i++)
281             {
282                 final ByteBuffer chunk = readBlock(blockNumbers[i]);
283                 data.put(chunk);
284             }
285 
286             data.flip();
287         }
288 
289         log.debug("read, total post combination data.length = {0}", () -> data.limit());
290 
291         return elementSerializer.deSerialize(data.array(), null);
292     }
293 
294     /**
295      * This reads the occupied data in a block.
296      * <p>
297      * The first four bytes of the record should tell us how long it is. The data is read into a
298      * byte array and then an object is constructed from the byte array.
299      * <p>
300      * @return byte[]
301      * @param block
302      * @throws IOException
303      */
304     private ByteBuffer readBlock(final int block)
305         throws IOException
306     {
307         int datalen = 0;
308 
309         String message = null;
310         boolean corrupted = false;
311         final long fileLength = fc.size();
312 
313         final long position = calculateByteOffsetForBlockAsLong(block);
314 //        if (position > fileLength)
315 //        {
316 //            corrupted = true;
317 //            message = "Record " + position + " starts past EOF.";
318 //        }
319 //        else
320         {
321             final ByteBuffer datalength = ByteBuffer.allocate(HEADER_SIZE_BYTES);
322             fc.read(datalength, position);
323             datalength.flip();
324             datalen = datalength.getInt();
325             if (position + datalen > fileLength)
326             {
327                 corrupted = true;
328                 message = "Record " + position + " exceeds file length.";
329             }
330         }
331 
332         if (corrupted)
333         {
334             log.warn("\n The file is corrupt: \n {0}", message);
335             throw new IOException("The File Is Corrupt, need to reset");
336         }
337 
338         final ByteBuffer data = ByteBuffer.allocate(datalen);
339         fc.read(data, position + HEADER_SIZE_BYTES);
340         data.flip();
341 
342         return data;
343     }
344 
345     /**
346      * Add these blocks to the emptyBlock list.
347      * <p>
348      * @param blocksToFree
349      */
350     protected void freeBlocks(final int[] blocksToFree)
351     {
352         if (blocksToFree != null)
353         {
354             for (short i = 0; i < blocksToFree.length; i++)
355             {
356                 emptyBlocks.offer(Integer.valueOf(blocksToFree[i]));
357             }
358         }
359     }
360 
361     /**
362      * Calculates the file offset for a particular block.
363      * <p>
364      * @param block number
365      * @return the byte offset for this block in the file as a long
366      * @since 2.0
367      */
368     protected long calculateByteOffsetForBlockAsLong(final int block)
369     {
370         return (long) block * blockSizeBytes;
371     }
372 
373     /**
374      * The number of blocks needed.
375      * <p>
376      * @param data
377      * @return the number of blocks needed to store the byte array
378      */
379     protected int calculateTheNumberOfBlocksNeeded(final byte[] data)
380     {
381         final int dataLength = data.length;
382 
383         final int oneBlock = blockSizeBytes - HEADER_SIZE_BYTES;
384 
385         // takes care of 0 = HEADER_SIZE_BYTES + blockSizeBytes
386         if (dataLength <= oneBlock)
387         {
388             return 1;
389         }
390 
391         int dividend = dataLength / oneBlock;
392 
393         if (dataLength % oneBlock != 0)
394         {
395             dividend++;
396         }
397         return dividend;
398     }
399 
400     /**
401      * Returns the file length.
402      * <p>
403      * @return the size of the file.
404      * @throws IOException
405      */
406     protected long length()
407         throws IOException
408     {
409         return fc.size();
410     }
411 
412     /**
413      * Closes the file.
414      * <p>
415      * @throws IOException
416      */
417     @Override
418     public void close()
419         throws IOException
420     {
421         this.numberOfBlocks.set(0);
422         this.emptyBlocks.clear();
423         fc.close();
424     }
425 
426     /**
427      * Resets the file.
428      * <p>
429      * @throws IOException
430      */
431     protected synchronized void reset()
432         throws IOException
433     {
434         this.numberOfBlocks.set(0);
435         this.emptyBlocks.clear();
436         fc.truncate(0);
437         fc.force(true);
438     }
439 
440     /**
441      * @return Returns the numberOfBlocks.
442      */
443     protected int getNumberOfBlocks()
444     {
445         return numberOfBlocks.get();
446     }
447 
448     /**
449      * @return Returns the blockSizeBytes.
450      */
451     protected int getBlockSizeBytes()
452     {
453         return blockSizeBytes;
454     }
455 
456     /**
457      * @return Returns the average size of the an element inserted.
458      */
459     protected long getAveragePutSizeBytes()
460     {
461         final long count = this.putCount.get();
462 
463         if (count == 0)
464         {
465             return 0;
466         }
467         return this.putBytes.get() / count;
468     }
469 
470     /**
471      * @return Returns the number of empty blocks.
472      */
473     protected int getEmptyBlocks()
474     {
475         return this.emptyBlocks.size();
476     }
477 
478     /**
479      * For debugging only.
480      * <p>
481      * @return String with details.
482      */
483     @Override
484     public String toString()
485     {
486         final StringBuilder buf = new StringBuilder();
487         buf.append("\nBlock Disk ");
488         buf.append("\n  Filepath [" + filepath + "]");
489         buf.append("\n  NumberOfBlocks [" + this.numberOfBlocks.get() + "]");
490         buf.append("\n  BlockSizeBytes [" + this.blockSizeBytes + "]");
491         buf.append("\n  Put Bytes [" + this.putBytes + "]");
492         buf.append("\n  Put Count [" + this.putCount + "]");
493         buf.append("\n  Average Size [" + getAveragePutSizeBytes() + "]");
494         buf.append("\n  Empty Blocks [" + this.getEmptyBlocks() + "]");
495         try
496         {
497             buf.append("\n  Length [" + length() + "]");
498         }
499         catch (final IOException e)
500         {
501             // swallow
502         }
503         return buf.toString();
504     }
505 
506     /**
507      * This is used for debugging.
508      * <p>
509      * @return the file path.
510      */
511     protected String getFilePath()
512     {
513         return filepath;
514     }
515 }