View Javadoc

1   package org.apache.jcs.auxiliary.disk.block;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.ByteArrayOutputStream;
23  import java.io.File;
24  import java.io.FileNotFoundException;
25  import java.io.IOException;
26  import java.io.RandomAccessFile;
27  import java.io.Serializable;
28  import java.nio.ByteBuffer;
29  import java.nio.channels.FileChannel;
30  import java.util.concurrent.atomic.AtomicInteger;
31  import java.util.concurrent.atomic.AtomicLong;
32  
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.jcs.engine.behavior.IElementSerializer;
36  import org.apache.jcs.utils.serialization.StandardSerializer;
37  import org.apache.jcs.utils.struct.SingleLinkedList;
38  
39  /**
40   * This class manages reading and writing data to disk. When asked to write a value, it returns the
41   * numbers of the blocks it used. It can read an object back from an array of those block numbers.
42   * <p>
43   * @author Aaron Smuts
44   */
45  public class BlockDisk
46  {
47      /** The logger */
48      private static final Log log = LogFactory.getLog( BlockDisk.class );
49  
50      /** The size of the header that indicates the amount of data stored in an occupied block. */
51      public static final byte HEADER_SIZE_BYTES = 4;
52  
53      /** defaults to 4kb */
54      private static final int DEFAULT_BLOCK_SIZE_BYTES = 4 * 1024;
55  
56      /** Size of the blocks */
57      private int blockSizeBytes;
58  
59      /**
60       * the total number of blocks that have been used. If there are no free, we will use this to
61       * calculate the position of the next block.
62       */
63      private final AtomicInteger numberOfBlocks = new AtomicInteger(0);
64  
65      /** Empty blocks that can be reused. */
66      private final SingleLinkedList<Integer> emptyBlocks = new SingleLinkedList<Integer>();
67  
68      /** The serializer. */
69      protected IElementSerializer elementSerializer;
70  
71      /** Location of the spot on disk */
72      private final String filepath;
73  
74      /** File channel for multiple concurrent reads and writes */
75      private final FileChannel fc;
76  
77      /** How many bytes have we put to disk */
78      private final AtomicLong putBytes = new AtomicLong(0);
79  
80      /** How many items have we put to disk */
81      private final AtomicLong putCount = new AtomicLong(0);
82  
83      /**
84       * Constructor for the Disk object
85       * <p>
86       * @param file
87       * @param elementSerializer
88       * @exception FileNotFoundException
89       */
90      public BlockDisk( File file, IElementSerializer elementSerializer )
91          throws FileNotFoundException
92      {
93          this( file, DEFAULT_BLOCK_SIZE_BYTES, elementSerializer );
94      }
95  
96      /**
97       * Creates the file and set the block size in bytes.
98       * <p>
99       * @param file
100      * @param blockSizeBytes
101      * @throws FileNotFoundException
102      */
103     public BlockDisk( File file, int blockSizeBytes )
104         throws FileNotFoundException
105     {
106         this( file, blockSizeBytes, new StandardSerializer() );
107     }
108 
109     /**
110      * Creates the file and set the block size in bytes.
111      * <p>
112      * @param file
113      * @param blockSizeBytes
114      * @param elementSerializer
115      * @throws FileNotFoundException
116      */
117     public BlockDisk( File file, int blockSizeBytes, IElementSerializer elementSerializer )
118         throws FileNotFoundException
119     {
120         this.filepath = file.getAbsolutePath();
121         RandomAccessFile raf = new RandomAccessFile( filepath, "rw" );
122         this.fc = raf.getChannel();
123 
124         if ( log.isInfoEnabled() )
125         {
126             log.info( "Constructing BlockDisk, blockSizeBytes [" + blockSizeBytes + "]" );
127         }
128 
129         this.blockSizeBytes = blockSizeBytes;
130         this.elementSerializer = elementSerializer;
131     }
132 
133     /**
134      * This writes an object to disk and returns the blocks it was stored in.
135      * <p>
136      * The program flow is as follows:
137      * <ol>
138      * <li>Serialize the object.</li>
139      * <li>Determine the number of blocks needed.</li>
140      * <li>Look for free blocks in the emptyBlock list.</li>
141      * <li>If there were not enough in the empty list. Take the nextBlock and increment it.</li>
142      * <li>If the data will not fit in one block, create sub arrays.</li>
143      * <li>Write the subarrays to disk.</li>
144      * <li>If the process fails we should decrement the block count if we took from it.</li>
145      * </ol>
146      * @param object
147      * @return the blocks we used.
148      * @throws IOException
149      */
150     protected int[] write( Serializable object )
151         throws IOException
152     {
153         // serialize the object
154         byte[] data = elementSerializer.serialize( object );
155 
156         if ( log.isDebugEnabled() )
157         {
158             log.debug( "write, total pre-chunking data.length = " + data.length );
159         }
160 
161         this.putBytes.addAndGet(data.length);
162         this.putCount.incrementAndGet();
163 
164         // figure out how many blocks we need.
165         int numBlocksNeeded = calculateTheNumberOfBlocksNeeded( data );
166         if ( log.isDebugEnabled() )
167         {
168             log.debug( "numBlocksNeeded = " + numBlocksNeeded );
169         }
170 
171         int[] blocks = new int[numBlocksNeeded];
172 
173         // get them from the empty list or take the next one
174         for ( int i = 0; i < numBlocksNeeded; i++ )
175         {
176             Integer emptyBlock = emptyBlocks.takeFirst();
177             if ( emptyBlock != null )
178             {
179                 blocks[i] = emptyBlock.intValue();
180             }
181             else
182             {
183                 blocks[i] = this.numberOfBlocks.getAndIncrement();
184             }
185         }
186 
187         // get the individual sub arrays.
188         byte[][] chunks = getBlockChunks( data, numBlocksNeeded );
189 
190         // write the blocks
191         for ( int i = 0; i < numBlocksNeeded; i++ )
192         {
193             int position = calculateByteOffsetForBlock( blocks[i] );
194             write( position, chunks[i] );
195         }
196 
197         return blocks;
198     }
199 
200     /**
201      * Return the amount to put in each block. Fill them all the way, minus the header.
202      * <p>
203      * @param complete
204      * @param numBlocksNeeded
205      * @return byte[][]
206      */
207     protected byte[][] getBlockChunks( byte[] complete, int numBlocksNeeded )
208     {
209         byte[][] chunks = new byte[numBlocksNeeded][];
210 
211         if ( numBlocksNeeded == 1 )
212         {
213             chunks[0] = complete;
214         }
215         else
216         {
217             int maxChunkSize = this.blockSizeBytes - HEADER_SIZE_BYTES;
218             int totalBytes = complete.length;
219             int totalUsed = 0;
220             for ( short i = 0; i < numBlocksNeeded; i++ )
221             {
222                 // use the max that can be written to a block or whatever is left in the original
223                 // array
224                 int chunkSize = Math.min( maxChunkSize, totalBytes - totalUsed );
225                 byte[] chunk = new byte[chunkSize];
226                 // copy from the used position to the chunk size on the complete array to the chunk
227                 // array.
228                 System.arraycopy( complete, totalUsed, chunk, 0, chunkSize );
229                 chunks[i] = chunk;
230                 totalUsed += chunkSize;
231             }
232         }
233 
234         return chunks;
235     }
236 
237     /**
238      * Writes the given byte array to the Disk at the specified position.
239      * <p>
240      * @param position
241      * @param data
242      * @return true if we wrote successfully
243      * @throws IOException
244      */
245     private boolean write( long position, byte[] data )
246         throws IOException
247     {
248         ByteBuffer buffer = ByteBuffer.allocate(HEADER_SIZE_BYTES + data.length);
249         buffer.putInt(data.length);
250         buffer.put(data);
251         buffer.flip();
252         int written = fc.write(buffer, position);
253         fc.force(true);
254 
255         return written == data.length;
256     }
257 
258     /**
259      * Reads an object that is located in the specified blocks.
260      * <p>
261      * @param blockNumbers
262      * @return Serializable
263      * @throws IOException
264      * @throws ClassNotFoundException
265      */
266     protected <T extends Serializable> T read( int[] blockNumbers )
267         throws IOException, ClassNotFoundException
268     {
269         byte[] data = null;
270 
271         if ( blockNumbers.length == 1 )
272         {
273             data = readBlock( blockNumbers[0] );
274         }
275         else
276         {
277             ByteArrayOutputStream baos = new ByteArrayOutputStream(getBlockSizeBytes());
278             // get all the blocks into data
279             for ( short i = 0; i < blockNumbers.length; i++ )
280             {
281                 byte[] chunk = readBlock( blockNumbers[i] );
282                 baos.write(chunk);
283             }
284 
285             data = baos.toByteArray();
286             baos.close();
287         }
288 
289         if ( log.isDebugEnabled() )
290         {
291             log.debug( "read, total post combination data.length = " + data.length );
292         }
293 
294         return elementSerializer.deSerialize( data );
295     }
296 
297     /**
298      * This reads the occupied data in a block.
299      * <p>
300      * The first four bytes of the record should tell us how long it is. The data is read into a
301      * byte array and then an object is constructed from the byte array.
302      * <p>
303      * @return byte[]
304      * @param block
305      * @throws IOException
306      */
307     private byte[] readBlock( int block )
308         throws IOException
309     {
310         int datalen = 0;
311 
312         String message = null;
313         boolean corrupted = false;
314         long fileLength = fc.size();
315 
316         int position = calculateByteOffsetForBlock( block );
317 //        if ( position > fileLength )
318 //        {
319 //            corrupted = true;
320 //            message = "Record " + position + " starts past EOF.";
321 //        }
322 //        else
323         {
324             ByteBuffer datalength = ByteBuffer.allocate(HEADER_SIZE_BYTES);
325             fc.read(datalength, position);
326             datalength.flip();
327             datalen = datalength.getInt();
328             if ( position + datalen > fileLength )
329             {
330                 corrupted = true;
331                 message = "Record " + position + " exceeds file length.";
332             }
333         }
334 
335         if ( corrupted )
336         {
337             log.warn( "\n The file is corrupt: " + "\n " + message );
338             throw new IOException( "The File Is Corrupt, need to reset" );
339         }
340 
341         ByteBuffer data = ByteBuffer.allocate(datalen);
342         fc.read(data, position + HEADER_SIZE_BYTES);
343         data.flip();
344 
345         return data.array();
346     }
347 
348     /**
349      * Add these blocks to the emptyBlock list.
350      * <p>
351      * @param blocksToFree
352      */
353     protected void freeBlocks( int[] blocksToFree )
354     {
355         if ( blocksToFree != null )
356         {
357             for ( short i = 0; i < blocksToFree.length; i++ )
358             {
359                 emptyBlocks.addLast( Integer.valueOf( blocksToFree[i] ) );
360             }
361         }
362     }
363 
364     /**
365      * Calculates the file offset for a particular block.
366      * <p>
367      * @param block
368      * @return the offset for this block
369      */
370     protected int calculateByteOffsetForBlock( int block )
371     {
372         return block * blockSizeBytes;
373     }
374 
375     /**
376      * The number of blocks needed.
377      * <p>
378      * @param data
379      * @return the number of blocks needed to store the byte array
380      */
381     protected int calculateTheNumberOfBlocksNeeded( byte[] data )
382     {
383         int dataLength = data.length;
384 
385         int oneBlock = blockSizeBytes - HEADER_SIZE_BYTES;
386 
387         // takes care of 0 = HEADER_SIZE_BYTES + blockSizeBytes
388         if ( dataLength <= oneBlock )
389         {
390             return 1;
391         }
392 
393         int dividend = dataLength / oneBlock;
394 
395         if ( dataLength % oneBlock != 0 )
396         {
397             dividend++;
398         }
399         return dividend;
400     }
401 
402     /**
403      * Returns the file length.
404      * <p>
405      * @return the size of the file.
406      * @exception IOException
407      */
408     protected long length()
409         throws IOException
410     {
411         return fc.size();
412     }
413 
414     /**
415      * Closes the file.
416      * <p>
417      * @exception IOException
418      */
419     protected void close()
420         throws IOException
421     {
422         fc.close();
423     }
424 
425     /**
426      * Resets the file.
427      * <p>
428      * @exception IOException
429      */
430     protected synchronized void reset()
431         throws IOException
432     {
433         this.numberOfBlocks.set(0);
434         this.emptyBlocks.clear();
435         fc.truncate(0);
436         fc.force(true);
437     }
438 
439     /**
440      * @return Returns the numberOfBlocks.
441      */
442     protected int getNumberOfBlocks()
443     {
444         return numberOfBlocks.get();
445     }
446 
447     /**
448      * @return Returns the blockSizeBytes.
449      */
450     protected int getBlockSizeBytes()
451     {
452         return blockSizeBytes;
453     }
454 
455     /**
456      * @return Returns the average size of the an element inserted.
457      */
458     protected long getAveragePutSizeBytes()
459     {
460         long count = this.putCount.get();
461 
462         if (count == 0 )
463         {
464             return 0;
465         }
466         return this.putBytes.get() / count;
467     }
468 
469     /**
470      * @return Returns the number of empty blocks.
471      */
472     protected int getEmptyBlocks()
473     {
474         return this.emptyBlocks.size();
475     }
476 
477     /**
478      * For debugging only.
479      * <p>
480      * @return String with details.
481      */
482     @Override
483     public String toString()
484     {
485         StringBuffer buf = new StringBuffer();
486         buf.append( "\nBlock Disk " );
487         buf.append( "\n  Filepath [" + filepath + "]" );
488         buf.append( "\n  NumberOfBlocks [" + this.numberOfBlocks.get() + "]" );
489         buf.append( "\n  BlockSizeBytes [" + this.blockSizeBytes + "]" );
490         buf.append( "\n  Put Bytes [" + this.putBytes + "]" );
491         buf.append( "\n  Put Count [" + this.putCount + "]" );
492         buf.append( "\n  Average Size [" + getAveragePutSizeBytes() + "]" );
493         buf.append( "\n  Empty Blocks [" + this.getEmptyBlocks() + "]" );
494         try
495         {
496             buf.append( "\n  Length [" + length() + "]" );
497         }
498         catch ( IOException e )
499         {
500             // swallow
501         }
502         return buf.toString();
503     }
504 
505     /**
506      * This is used for debugging.
507      * <p>
508      * @return the file path.
509      */
510     protected String getFilePath()
511     {
512         return filepath;
513     }
514 }