View Javadoc
1   package org.apache.commons.jcs.auxiliary.disk.block;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.ByteArrayOutputStream;
23  import java.io.File;
24  import java.io.IOException;
25  import java.io.RandomAccessFile;
26  import java.io.Serializable;
27  import java.nio.ByteBuffer;
28  import java.nio.channels.FileChannel;
29  import java.util.concurrent.ConcurrentLinkedQueue;
30  import java.util.concurrent.atomic.AtomicInteger;
31  import java.util.concurrent.atomic.AtomicLong;
32  
33  import org.apache.commons.jcs.engine.behavior.IElementSerializer;
34  import org.apache.commons.jcs.utils.serialization.StandardSerializer;
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  
38  /**
39   * This class manages reading an writing data to disk. When asked to write a value, it returns a
40   * block array. It can read an object from the block numbers in a byte array.
41   * <p>
42   * @author Aaron Smuts
43   */
44  public class BlockDisk
45  {
46      /** The logger */
47      private static final Log log = LogFactory.getLog( BlockDisk.class );
48  
49      /** The size of the header that indicates the amount of data stored in an occupied block. */
50      public static final byte HEADER_SIZE_BYTES = 4;
51      // N.B. 4 bytes is the size used for ByteBuffer.putInt(int value) and ByteBuffer.getInt()
52  
53      /** defaults to 4kb */
54      private static final int DEFAULT_BLOCK_SIZE_BYTES = 4 * 1024;
55  
56      /** Size of the blocks */
57      private final int blockSizeBytes;
58  
59      /**
60       * the total number of blocks that have been used. If there are no free, we will use this to
61       * calculate the position of the next block.
62       */
63      private final AtomicInteger numberOfBlocks = new AtomicInteger(0);
64  
65      /** Empty blocks that can be reused. */
66      private final ConcurrentLinkedQueue<Integer> emptyBlocks = new ConcurrentLinkedQueue<Integer>();
67  
68      /** The serializer. */
69      private final IElementSerializer elementSerializer;
70  
71      /** Location of the spot on disk */
72      private final String filepath;
73  
74      /** File channel for multiple concurrent reads and writes */
75      private final FileChannel fc;
76  
77      /** How many bytes have we put to disk */
78      private final AtomicLong putBytes = new AtomicLong(0);
79  
80      /** How many items have we put to disk */
81      private final AtomicLong putCount = new AtomicLong(0);
82  
83      /**
84       * Constructor for the Disk object
85       * <p>
86       * @param file
87       * @param elementSerializer
88       * @throws IOException
89       */
90      public BlockDisk( File file, IElementSerializer elementSerializer )
91          throws IOException
92      {
93          this( file, DEFAULT_BLOCK_SIZE_BYTES, elementSerializer );
94      }
95  
96      /**
97       * Creates the file and set the block size in bytes.
98       * <p>
99       * @param file
100      * @param blockSizeBytes
101      * @throws IOException
102      */
103     public BlockDisk( File file, int blockSizeBytes )
104         throws IOException
105     {
106         this( file, blockSizeBytes, new StandardSerializer() );
107     }
108 
109     /**
110      * Creates the file and set the block size in bytes.
111      * <p>
112      * @param file
113      * @param blockSizeBytes
114      * @param elementSerializer
115      * @throws IOException
116      */
117     public BlockDisk( File file, int blockSizeBytes, IElementSerializer elementSerializer )
118         throws IOException
119     {
120         this.filepath = file.getAbsolutePath();
121         RandomAccessFile raf = new RandomAccessFile( filepath, "rw" );
122         this.fc = raf.getChannel();
123         this.numberOfBlocks.set((int) Math.ceil(1f * this.fc.size() / blockSizeBytes));
124 
125         if ( log.isInfoEnabled() )
126         {
127             log.info( "Constructing BlockDisk, blockSizeBytes [" + blockSizeBytes + "]" );
128         }
129 
130         this.blockSizeBytes = blockSizeBytes;
131         this.elementSerializer = elementSerializer;
132     }
133 
134     /**
135      * Allocate a given number of blocks from the available set
136      *
137      * @param numBlocksNeeded
138      * @return an array of allocated blocks
139      */
140     private int[] allocateBlocks(int numBlocksNeeded)
141     {
142         assert numBlocksNeeded >= 1;
143 
144         int[] blocks = new int[numBlocksNeeded];
145         // get them from the empty list or take the next one
146         for (int i = 0; i < numBlocksNeeded; i++)
147         {
148             Integer emptyBlock = emptyBlocks.poll();
149             if (emptyBlock == null)
150             {
151                 emptyBlock = Integer.valueOf(numberOfBlocks.getAndIncrement());
152             }
153             blocks[i] = emptyBlock.intValue();
154         }
155 
156         return blocks;
157     }
158 
159     /**
160      * This writes an object to disk and returns the blocks it was stored in.
161      * <p>
162      * The program flow is as follows:
163      * <ol>
164      * <li>Serialize the object.</li>
165      * <li>Determine the number of blocks needed.</li>
166      * <li>Look for free blocks in the emptyBlock list.</li>
167      * <li>If there were not enough in the empty list. Take the nextBlock and increment it.</li>
168      * <li>If the data will not fit in one block, create sub arrays.</li>
169      * <li>Write the subarrays to disk.</li>
170      * <li>If the process fails we should decrement the block count if we took from it.</li>
171      * </ol>
172      * @param object
173      * @return the blocks we used.
174      * @throws IOException
175      */
176     protected int[] write( Serializable object )
177         throws IOException
178     {
179         // serialize the object
180         byte[] data = elementSerializer.serialize(object);
181 
182         if ( log.isDebugEnabled() )
183         {
184             log.debug( "write, total pre-chunking data.length = " + data.length );
185         }
186 
187         this.putBytes.addAndGet(data.length);
188         this.putCount.incrementAndGet();
189 
190         // figure out how many blocks we need.
191         int numBlocksNeeded = calculateTheNumberOfBlocksNeeded(data);
192 
193         if ( log.isDebugEnabled() )
194         {
195             log.debug( "numBlocksNeeded = " + numBlocksNeeded );
196         }
197 
198         // allocate blocks
199         int[] blocks = allocateBlocks(numBlocksNeeded);
200 
201         int offset = 0;
202         final int maxChunkSize = blockSizeBytes - HEADER_SIZE_BYTES;
203         ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE_BYTES);
204 
205         for (int i = 0; i < numBlocksNeeded; i++)
206         {
207             headerBuffer.clear();
208             int length = Math.min(maxChunkSize, data.length - offset);
209             headerBuffer.putInt(length);
210 
211             ByteBuffer dataBuffer = ByteBuffer.wrap(data, offset, length);
212 
213             long position = calculateByteOffsetForBlockAsLong(blocks[i]);
214             // write the header
215             headerBuffer.flip();
216             int written = fc.write(headerBuffer, position);
217             assert written == HEADER_SIZE_BYTES;
218 
219             //write the data
220             written = fc.write(dataBuffer, position + HEADER_SIZE_BYTES);
221             assert written == length;
222 
223             offset += length;
224         }
225 
226         //fc.force(false);
227 
228         return blocks;
229     }
230 
231     /**
232      * Return the amount to put in each block. Fill them all the way, minus the header.
233      * <p>
234      * @param complete
235      * @param numBlocksNeeded
236      * @return byte[][]
237      */
238     protected byte[][] getBlockChunks( byte[] complete, int numBlocksNeeded )
239     {
240         byte[][] chunks = new byte[numBlocksNeeded][];
241 
242         if ( numBlocksNeeded == 1 )
243         {
244             chunks[0] = complete;
245         }
246         else
247         {
248             int maxChunkSize = this.blockSizeBytes - HEADER_SIZE_BYTES;
249             int totalBytes = complete.length;
250             int totalUsed = 0;
251             for ( short i = 0; i < numBlocksNeeded; i++ )
252             {
253                 // use the max that can be written to a block or whatever is left in the original
254                 // array
255                 int chunkSize = Math.min( maxChunkSize, totalBytes - totalUsed );
256                 byte[] chunk = new byte[chunkSize];
257                 // copy from the used position to the chunk size on the complete array to the chunk
258                 // array.
259                 System.arraycopy( complete, totalUsed, chunk, 0, chunkSize );
260                 chunks[i] = chunk;
261                 totalUsed += chunkSize;
262             }
263         }
264 
265         return chunks;
266     }
267 
268     /**
269      * Reads an object that is located in the specified blocks.
270      * <p>
271      * @param blockNumbers
272      * @return Serializable
273      * @throws IOException
274      * @throws ClassNotFoundException
275      */
276     protected <T extends Serializable> T read( int[] blockNumbers )
277         throws IOException, ClassNotFoundException
278     {
279         byte[] data = null;
280 
281         if ( blockNumbers.length == 1 )
282         {
283             data = readBlock( blockNumbers[0] );
284         }
285         else
286         {
287             ByteArrayOutputStream baos = new ByteArrayOutputStream(getBlockSizeBytes());
288             // get all the blocks into data
289             for ( short i = 0; i < blockNumbers.length; i++ )
290             {
291                 byte[] chunk = readBlock( blockNumbers[i] );
292                 baos.write(chunk);
293             }
294 
295             data = baos.toByteArray();
296             baos.close();
297         }
298 
299         if ( log.isDebugEnabled() )
300         {
301             log.debug( "read, total post combination data.length = " + data.length );
302         }
303 
304         return elementSerializer.deSerialize( data, null );
305     }
306 
307     /**
308      * This reads the occupied data in a block.
309      * <p>
310      * The first four bytes of the record should tell us how long it is. The data is read into a
311      * byte array and then an object is constructed from the byte array.
312      * <p>
313      * @return byte[]
314      * @param block
315      * @throws IOException
316      */
317     private byte[] readBlock( int block )
318         throws IOException
319     {
320         int datalen = 0;
321 
322         String message = null;
323         boolean corrupted = false;
324         long fileLength = fc.size();
325 
326         long position = calculateByteOffsetForBlockAsLong( block );
327 //        if ( position > fileLength )
328 //        {
329 //            corrupted = true;
330 //            message = "Record " + position + " starts past EOF.";
331 //        }
332 //        else
333         {
334             ByteBuffer datalength = ByteBuffer.allocate(HEADER_SIZE_BYTES);
335             fc.read(datalength, position);
336             datalength.flip();
337             datalen = datalength.getInt();
338             if ( position + datalen > fileLength )
339             {
340                 corrupted = true;
341                 message = "Record " + position + " exceeds file length.";
342             }
343         }
344 
345         if ( corrupted )
346         {
347             log.warn( "\n The file is corrupt: " + "\n " + message );
348             throw new IOException( "The File Is Corrupt, need to reset" );
349         }
350 
351         ByteBuffer data = ByteBuffer.allocate(datalen);
352         fc.read(data, position + HEADER_SIZE_BYTES);
353         data.flip();
354 
355         return data.array();
356     }
357 
358     /**
359      * Add these blocks to the emptyBlock list.
360      * <p>
361      * @param blocksToFree
362      */
363     protected void freeBlocks( int[] blocksToFree )
364     {
365         if ( blocksToFree != null )
366         {
367             for ( short i = 0; i < blocksToFree.length; i++ )
368             {
369                 emptyBlocks.offer( Integer.valueOf( blocksToFree[i] ) );
370             }
371         }
372     }
373 
374     /**
375      * Calculates the file offset for a particular block.
376      * <p>
377      * @param block number
378      * @return the byte offset for this block in the file as an int; may overflow
379      * @deprecated (since 2.0) use {@link #calculateByteOffsetForBlockAsLong(int)} instead
380      */
381     @Deprecated
382     protected int calculateByteOffsetForBlock( int block )
383     {
384         return block * blockSizeBytes;
385     }
386 
387     /**
388      * Calculates the file offset for a particular block.
389      * <p>
390      * @param block number
391      * @return the byte offset for this block in the file as a long
392      * @since 2.0
393      */
394     protected long calculateByteOffsetForBlockAsLong( int block )
395     {
396         return (long) block * blockSizeBytes;
397     }
398 
399     /**
400      * The number of blocks needed.
401      * <p>
402      * @param data
403      * @return the number of blocks needed to store the byte array
404      */
405     protected int calculateTheNumberOfBlocksNeeded( byte[] data )
406     {
407         int dataLength = data.length;
408 
409         int oneBlock = blockSizeBytes - HEADER_SIZE_BYTES;
410 
411         // takes care of 0 = HEADER_SIZE_BYTES + blockSizeBytes
412         if ( dataLength <= oneBlock )
413         {
414             return 1;
415         }
416 
417         int dividend = dataLength / oneBlock;
418 
419         if ( dataLength % oneBlock != 0 )
420         {
421             dividend++;
422         }
423         return dividend;
424     }
425 
426     /**
427      * Returns the file length.
428      * <p>
429      * @return the size of the file.
430      * @throws IOException
431      */
432     protected long length()
433         throws IOException
434     {
435         return fc.size();
436     }
437 
438     /**
439      * Closes the file.
440      * <p>
441      * @throws IOException
442      */
443     protected void close()
444         throws IOException
445     {
446         fc.close();
447     }
448 
449     /**
450      * Resets the file.
451      * <p>
452      * @throws IOException
453      */
454     protected synchronized void reset()
455         throws IOException
456     {
457         this.numberOfBlocks.set(0);
458         this.emptyBlocks.clear();
459         fc.truncate(0);
460         fc.force(true);
461     }
462 
463     /**
464      * @return Returns the numberOfBlocks.
465      */
466     protected int getNumberOfBlocks()
467     {
468         return numberOfBlocks.get();
469     }
470 
471     /**
472      * @return Returns the blockSizeBytes.
473      */
474     protected int getBlockSizeBytes()
475     {
476         return blockSizeBytes;
477     }
478 
479     /**
480      * @return Returns the average size of the an element inserted.
481      */
482     protected long getAveragePutSizeBytes()
483     {
484         long count = this.putCount.get();
485 
486         if (count == 0 )
487         {
488             return 0;
489         }
490         return this.putBytes.get() / count;
491     }
492 
493     /**
494      * @return Returns the number of empty blocks.
495      */
496     protected int getEmptyBlocks()
497     {
498         return this.emptyBlocks.size();
499     }
500 
501     /**
502      * For debugging only.
503      * <p>
504      * @return String with details.
505      */
506     @Override
507     public String toString()
508     {
509         StringBuilder buf = new StringBuilder();
510         buf.append( "\nBlock Disk " );
511         buf.append( "\n  Filepath [" + filepath + "]" );
512         buf.append( "\n  NumberOfBlocks [" + this.numberOfBlocks.get() + "]" );
513         buf.append( "\n  BlockSizeBytes [" + this.blockSizeBytes + "]" );
514         buf.append( "\n  Put Bytes [" + this.putBytes + "]" );
515         buf.append( "\n  Put Count [" + this.putCount + "]" );
516         buf.append( "\n  Average Size [" + getAveragePutSizeBytes() + "]" );
517         buf.append( "\n  Empty Blocks [" + this.getEmptyBlocks() + "]" );
518         try
519         {
520             buf.append( "\n  Length [" + length() + "]" );
521         }
522         catch ( IOException e )
523         {
524             // swallow
525         }
526         return buf.toString();
527     }
528 
529     /**
530      * This is used for debugging.
531      * <p>
532      * @return the file path.
533      */
534     protected String getFilePath()
535     {
536         return filepath;
537     }
538 }