View Javadoc

1   package org.apache.jcs.auxiliary.disk.block;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.ByteArrayOutputStream;
23  import java.io.File;
24  import java.io.FileNotFoundException;
25  import java.io.IOException;
26  import java.io.RandomAccessFile;
27  import java.io.Serializable;
28  import java.nio.ByteBuffer;
29  import java.nio.channels.FileChannel;
30  import java.util.concurrent.atomic.AtomicInteger;
31  import java.util.concurrent.atomic.AtomicLong;
32  
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.jcs.engine.behavior.IElementSerializer;
36  import org.apache.jcs.utils.serialization.StandardSerializer;
37  import org.apache.jcs.utils.struct.SingleLinkedList;
38  
39  /**
40   * This class manages reading an writing data to disk. When asked to write a value, it returns a
41   * block array. It can read an object from the block numbers in a byte array.
42   * <p>
43   * @author Aaron Smuts
44   */
45  public class BlockDisk
46  {
47      /** The logger */
48      private static final Log log = LogFactory.getLog( BlockDisk.class );
49  
50      /** The size of the header that indicates the amount of data stored in an occupied block. */
51      public static final byte HEADER_SIZE_BYTES = 4;
52  
53      /** defaults to 4kb */
54      private static final int DEFAULT_BLOCK_SIZE_BYTES = 4 * 1024;
55  
56      /** Size of the blocks */
57      private int blockSizeBytes = DEFAULT_BLOCK_SIZE_BYTES;
58  
59      /**
60       * the total number of blocks that have been used. If there are no free, we will use this to
61       * calculate the position of the next block.
62       */
63      private final AtomicInteger numberOfBlocks = new AtomicInteger(0);
64  
65      /** Empty blocks that can be reused. */
66      private final SingleLinkedList<Integer> emptyBlocks = new SingleLinkedList<Integer>();
67  
68      /** The serializer. Uses a standard serializer by default. */
69      protected IElementSerializer elementSerializer = new StandardSerializer();
70  
71      /** Location of the spot on disk */
72      private final String filepath;
73  
74      /** File channel for multiple concurrent reads and writes */
75      private final FileChannel fc;
76  
77      /** How many bytes have we put to disk */
78      private final AtomicLong putBytes = new AtomicLong(0);
79  
80      /** How many items have we put to disk */
81      private final AtomicLong putCount = new AtomicLong(0);
82  
83      /**
84       * Constructor for the Disk object
85       * <p>
86       * @param file
87       * @param elementSerializer
88       * @exception FileNotFoundException
89       */
90      public BlockDisk( File file, IElementSerializer elementSerializer )
91          throws FileNotFoundException
92      {
93          this( file, DEFAULT_BLOCK_SIZE_BYTES, elementSerializer );
94      }
95  
96      /**
97       * Creates the file and set the block size in bytes.
98       * <p>
99       * @param file
100      * @param blockSizeBytes
101      * @throws FileNotFoundException
102      */
103     public BlockDisk( File file, int blockSizeBytes )
104         throws FileNotFoundException
105     {
106         this( file, blockSizeBytes, new StandardSerializer() );
107     }
108 
109     /**
110      * Creates the file and set the block size in bytes.
111      * <p>
112      * @param file
113      * @param blockSizeBytes
114      * @param elementSerializer
115      * @throws FileNotFoundException
116      */
117     public BlockDisk( File file, int blockSizeBytes, IElementSerializer elementSerializer )
118         throws FileNotFoundException
119     {
120         this.filepath = file.getAbsolutePath();
121         RandomAccessFile raf = new RandomAccessFile( filepath, "rw" );
122         this.fc = raf.getChannel();
123 
124         if ( log.isInfoEnabled() )
125         {
126             log.info( "Constructing BlockDisk, blockSizeBytes [" + blockSizeBytes + "]" );
127         }
128         this.blockSizeBytes = blockSizeBytes;
129         this.elementSerializer = elementSerializer;
130     }
131 
132     /**
133      * This writes an object to disk and returns the blocks it was stored in.
134      * <p>
135      * The program flow is as follows:
136      * <ol>
137      * <li>Serialize the object.</li>
138      * <li>Determine the number of blocks needed.</li>
139      * <li>Look for free blocks in the emptyBlock list.</li>
140      * <li>If there were not enough in the empty list. Take the nextBlock and increment it.</li>
141      * <li>If the data will not fit in one block, create sub arrays.</li>
142      * <li>Write the subarrays to disk.</li>
143      * <li>If the process fails we should decrement the block count if we took from it.</li>
144      * </ol>
145      * @param object
146      * @return the blocks we used.
147      * @throws IOException
148      */
149     protected int[] write( Serializable object )
150         throws IOException
151     {
152         // serialize the object
153         byte[] data = elementSerializer.serialize( object );
154 
155         if ( log.isDebugEnabled() )
156         {
157             log.debug( "write, total pre-chunking data.length = " + data.length );
158         }
159 
160         this.putBytes.addAndGet(data.length);
161         this.putCount.incrementAndGet();
162 
163         // figure out how many blocks we need.
164         int numBlocksNeeded = calculateTheNumberOfBlocksNeeded( data );
165         if ( log.isDebugEnabled() )
166         {
167             log.debug( "numBlocksNeeded = " + numBlocksNeeded );
168         }
169 
170         int[] blocks = new int[numBlocksNeeded];
171 
172         // get them from the empty list or take the next one
173         for ( int i = 0; i < numBlocksNeeded; i++ )
174         {
175             Integer emptyBlock = emptyBlocks.takeFirst();
176             if ( emptyBlock != null )
177             {
178                 blocks[i] = emptyBlock.intValue();
179             }
180             else
181             {
182                 blocks[i] = this.numberOfBlocks.getAndIncrement();
183             }
184         }
185 
186         // get the individual sub arrays.
187         byte[][] chunks = getBlockChunks( data, numBlocksNeeded );
188 
189         // write the blocks
190         for ( int i = 0; i < numBlocksNeeded; i++ )
191         {
192             int position = calculateByteOffsetForBlock( blocks[i] );
193             write( position, chunks[i] );
194         }
195 
196         return blocks;
197     }
198 
199     /**
200      * Return the amount to put in each block. Fill them all the way, minus the header.
201      * <p>
202      * @param complete
203      * @param numBlocksNeeded
204      * @return byte[][]
205      */
206     protected byte[][] getBlockChunks( byte[] complete, int numBlocksNeeded )
207     {
208         byte[][] chunks = new byte[numBlocksNeeded][];
209 
210         if ( numBlocksNeeded == 1 )
211         {
212             chunks[0] = complete;
213         }
214         else
215         {
216             int maxChunkSize = this.blockSizeBytes - HEADER_SIZE_BYTES;
217             int totalBytes = complete.length;
218             int totalUsed = 0;
219             for ( short i = 0; i < numBlocksNeeded; i++ )
220             {
221                 // use the max that can be written to a block or whatever is left in the original
222                 // array
223                 int chunkSize = Math.min( maxChunkSize, totalBytes - totalUsed );
224                 byte[] chunk = new byte[chunkSize];
225                 // copy from the used position to the chunk size on the complete array to the chunk
226                 // array.
227                 System.arraycopy( complete, totalUsed, chunk, 0, chunkSize );
228                 chunks[i] = chunk;
229                 totalUsed += chunkSize;
230             }
231         }
232 
233         return chunks;
234     }
235 
236     /**
237      * Writes the given byte array to the Disk at the specified position.
238      * <p>
239      * @param position
240      * @param data
241      * @return true if we wrote successfully
242      * @throws IOException
243      */
244     private boolean write( long position, byte[] data )
245         throws IOException
246     {
247         ByteBuffer buffer = ByteBuffer.allocate(HEADER_SIZE_BYTES + data.length);
248         buffer.putInt(data.length);
249         buffer.put(data);
250         buffer.flip();
251         int written = fc.write(buffer, position);
252 
253         return written == data.length;
254     }
255 
256     /**
257      * Reads an object that is located in the specified blocks.
258      * <p>
259      * @param blockNumbers
260      * @return Serializable
261      * @throws IOException
262      * @throws ClassNotFoundException
263      */
264     protected Serializable read( int[] blockNumbers )
265         throws IOException, ClassNotFoundException
266     {
267         byte[] data = null;
268 
269         if ( blockNumbers.length == 1 )
270         {
271             data = readBlock( blockNumbers[0] );
272         }
273         else
274         {
275             ByteArrayOutputStream baos = new ByteArrayOutputStream(getBlockSizeBytes());
276             // get all the blocks into data
277             for ( short i = 0; i < blockNumbers.length; i++ )
278             {
279                 byte[] chunk = readBlock( blockNumbers[i] );
280                 baos.write(chunk);
281             }
282 
283             data = baos.toByteArray();
284             baos.close();
285         }
286 
287         if ( log.isDebugEnabled() )
288         {
289             log.debug( "read, total post combination data.length = " + data.length );
290         }
291 
292         return (Serializable) elementSerializer.deSerialize( data );
293     }
294 
295     /**
296      * This reads the occupied data in a block.
297      * <p>
298      * The first four bytes of the record should tell us how long it is. The data is read into a
299      * byte array and then an object is constructed from the byte array.
300      * <p>
301      * @return byte[]
302      * @param block
303      * @throws IOException
304      */
305     private byte[] readBlock( int block )
306         throws IOException
307     {
308         int datalen = 0;
309 
310         String message = null;
311         boolean corrupted = false;
312         long fileLength = fc.size();
313 
314         int position = calculateByteOffsetForBlock( block );
315         if ( position > fileLength )
316         {
317             corrupted = true;
318             message = "Record " + position + " starts past EOF.";
319         }
320         else
321         {
322             ByteBuffer datalength = ByteBuffer.allocate(HEADER_SIZE_BYTES);
323             fc.read(datalength, position);
324             datalength.flip();
325             datalen = datalength.getInt();
326             if ( position + datalen > fileLength )
327             {
328                 corrupted = true;
329                 message = "Record " + position + " exceeds file length.";
330             }
331         }
332 
333         if ( corrupted )
334         {
335             log.warn( "\n The file is corrupt: " + "\n " + message );
336             throw new IOException( "The File Is Corrupt, need to reset" );
337         }
338 
339         ByteBuffer data = ByteBuffer.allocate(datalen);
340         fc.read(data, position + HEADER_SIZE_BYTES);
341         data.flip();
342 
343         return data.array();
344     }
345 
346     /**
347      * Add these blocks to the emptyBlock list.
348      * <p>
349      * @param blocksToFree
350      */
351     protected void freeBlocks( int[] blocksToFree )
352     {
353         if ( blocksToFree != null )
354         {
355             for ( short i = 0; i < blocksToFree.length; i++ )
356             {
357                 emptyBlocks.addLast( Integer.valueOf( blocksToFree[i] ) );
358             }
359         }
360     }
361 
362     /**
363      * Calculates the file offset for a particular block.
364      * <p>
365      * @param block
366      * @return the offset for this block
367      */
368     protected int calculateByteOffsetForBlock( int block )
369     {
370         return block * blockSizeBytes;
371     }
372 
373     /**
374      * The number of blocks needed.
375      * <p>
376      * @param data
377      * @return the number of blocks needed to store the byte array
378      */
379     protected int calculateTheNumberOfBlocksNeeded( byte[] data )
380     {
381         int dataLength = data.length;
382 
383         int oneBlock = blockSizeBytes - HEADER_SIZE_BYTES;
384 
385         // takes care of 0 = HEADER_SIZE_BYTES + blockSizeBytes
386         if ( dataLength <= oneBlock )
387         {
388             return 1;
389         }
390 
391         int dividend = dataLength / oneBlock;
392 
393         if ( dataLength % oneBlock != 0 )
394         {
395             dividend++;
396         }
397         return dividend;
398     }
399 
400     /**
401      * Returns the file length.
402      * <p>
403      * @return the size of the file.
404      * @exception IOException
405      */
406     protected long length()
407         throws IOException
408     {
409         return fc.size();
410     }
411 
412     /**
413      * Closes the file.
414      * <p>
415      * @exception IOException
416      */
417     protected void close()
418         throws IOException
419     {
420         fc.close();
421     }
422 
423     /**
424      * Resets the file.
425      * <p>
426      * @exception IOException
427      */
428     protected void reset()
429         throws IOException
430     {
431         this.numberOfBlocks.set(0);
432         this.emptyBlocks.clear();
433         fc.truncate(0);
434         fc.force(true);
435     }
436 
437     /**
438      * @return Returns the numberOfBlocks.
439      */
440     protected int getNumberOfBlocks()
441     {
442         return numberOfBlocks.get();
443     }
444 
445     /**
446      * @return Returns the blockSizeBytes.
447      */
448     protected int getBlockSizeBytes()
449     {
450         return blockSizeBytes;
451     }
452 
453     /**
454      * @return Returns the average size of the an element inserted.
455      */
456     protected long getAveragePutSizeBytes()
457     {
458         long count = this.putCount.get();
459 
460         if (count == 0 )
461         {
462             return 0;
463         }
464         return this.putBytes.get() / count;
465     }
466 
467     /**
468      * @return Returns the number of empty blocks.
469      */
470     protected int getEmptyBlocks()
471     {
472         return this.emptyBlocks.size();
473     }
474 
475     /**
476      * For debugging only.
477      * <p>
478      * @return String with details.
479      */
480     @Override
481     public String toString()
482     {
483         StringBuffer buf = new StringBuffer();
484         buf.append( "\nBlock Disk " );
485         buf.append( "\n  Filepath [" + filepath + "]" );
486         buf.append( "\n  NumberOfBlocks [" + this.numberOfBlocks.get() + "]" );
487         buf.append( "\n  BlockSizeBytes [" + this.blockSizeBytes + "]" );
488         buf.append( "\n  Put Bytes [" + this.putBytes + "]" );
489         buf.append( "\n  Put Count [" + this.putCount + "]" );
490         buf.append( "\n  Average Size [" + getAveragePutSizeBytes() + "]" );
491         buf.append( "\n  Empty Blocks [" + this.getEmptyBlocks() + "]" );
492         try
493         {
494             buf.append( "\n  Length [" + length() + "]" );
495         }
496         catch ( IOException e )
497         {
498             // swallow
499         }
500         return buf.toString();
501     }
502 
503     /**
504      * This is used for debugging.
505      * <p>
506      * @return the file path.
507      */
508     protected String getFilePath()
509     {
510         return filepath;
511     }
512 }