001package org.apache.commons.jcs.auxiliary.disk.block;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *   http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import java.io.ByteArrayOutputStream;
023import java.io.File;
024import java.io.IOException;
025import java.io.RandomAccessFile;
026import java.io.Serializable;
027import java.nio.ByteBuffer;
028import java.nio.channels.FileChannel;
029import java.util.concurrent.ConcurrentLinkedQueue;
030import java.util.concurrent.atomic.AtomicInteger;
031import java.util.concurrent.atomic.AtomicLong;
032
033import org.apache.commons.jcs.engine.behavior.IElementSerializer;
034import org.apache.commons.jcs.utils.serialization.StandardSerializer;
035import org.apache.commons.logging.Log;
036import org.apache.commons.logging.LogFactory;
037
038/**
039 * This class manages reading an writing data to disk. When asked to write a value, it returns a
040 * block array. It can read an object from the block numbers in a byte array.
041 * <p>
042 * @author Aaron Smuts
043 */
044public class BlockDisk
045{
046    /** The logger */
047    private static final Log log = LogFactory.getLog( BlockDisk.class );
048
049    /** The size of the header that indicates the amount of data stored in an occupied block. */
050    public static final byte HEADER_SIZE_BYTES = 4;
051    // N.B. 4 bytes is the size used for ByteBuffer.putInt(int value) and ByteBuffer.getInt()
052
053    /** defaults to 4kb */
054    private static final int DEFAULT_BLOCK_SIZE_BYTES = 4 * 1024;
055
056    /** Size of the blocks */
057    private final int blockSizeBytes;
058
059    /**
060     * the total number of blocks that have been used. If there are no free, we will use this to
061     * calculate the position of the next block.
062     */
063    private final AtomicInteger numberOfBlocks = new AtomicInteger(0);
064
065    /** Empty blocks that can be reused. */
066    private final ConcurrentLinkedQueue<Integer> emptyBlocks = new ConcurrentLinkedQueue<Integer>();
067
068    /** The serializer. */
069    private final IElementSerializer elementSerializer;
070
071    /** Location of the spot on disk */
072    private final String filepath;
073
074    /** File channel for multiple concurrent reads and writes */
075    private final FileChannel fc;
076
077    /** How many bytes have we put to disk */
078    private final AtomicLong putBytes = new AtomicLong(0);
079
080    /** How many items have we put to disk */
081    private final AtomicLong putCount = new AtomicLong(0);
082
083    /**
084     * Constructor for the Disk object
085     * <p>
086     * @param file
087     * @param elementSerializer
088     * @throws IOException
089     */
090    public BlockDisk( File file, IElementSerializer elementSerializer )
091        throws IOException
092    {
093        this( file, DEFAULT_BLOCK_SIZE_BYTES, elementSerializer );
094    }
095
096    /**
097     * Creates the file and set the block size in bytes.
098     * <p>
099     * @param file
100     * @param blockSizeBytes
101     * @throws IOException
102     */
103    public BlockDisk( File file, int blockSizeBytes )
104        throws IOException
105    {
106        this( file, blockSizeBytes, new StandardSerializer() );
107    }
108
109    /**
110     * Creates the file and set the block size in bytes.
111     * <p>
112     * @param file
113     * @param blockSizeBytes
114     * @param elementSerializer
115     * @throws IOException
116     */
117    public BlockDisk( File file, int blockSizeBytes, IElementSerializer elementSerializer )
118        throws IOException
119    {
120        this.filepath = file.getAbsolutePath();
121        RandomAccessFile raf = new RandomAccessFile( filepath, "rw" );
122        this.fc = raf.getChannel();
123        this.numberOfBlocks.set((int) Math.ceil(1f * this.fc.size() / blockSizeBytes));
124
125        if ( log.isInfoEnabled() )
126        {
127            log.info( "Constructing BlockDisk, blockSizeBytes [" + blockSizeBytes + "]" );
128        }
129
130        this.blockSizeBytes = blockSizeBytes;
131        this.elementSerializer = elementSerializer;
132    }
133
134    /**
135     * Allocate a given number of blocks from the available set
136     *
137     * @param numBlocksNeeded
138     * @return an array of allocated blocks
139     */
140    private int[] allocateBlocks(int numBlocksNeeded)
141    {
142        assert numBlocksNeeded >= 1;
143
144        int[] blocks = new int[numBlocksNeeded];
145        // get them from the empty list or take the next one
146        for (int i = 0; i < numBlocksNeeded; i++)
147        {
148            Integer emptyBlock = emptyBlocks.poll();
149            if (emptyBlock == null)
150            {
151                emptyBlock = Integer.valueOf(numberOfBlocks.getAndIncrement());
152            }
153            blocks[i] = emptyBlock.intValue();
154        }
155
156        return blocks;
157    }
158
159    /**
160     * This writes an object to disk and returns the blocks it was stored in.
161     * <p>
162     * The program flow is as follows:
163     * <ol>
164     * <li>Serialize the object.</li>
165     * <li>Determine the number of blocks needed.</li>
166     * <li>Look for free blocks in the emptyBlock list.</li>
167     * <li>If there were not enough in the empty list. Take the nextBlock and increment it.</li>
168     * <li>If the data will not fit in one block, create sub arrays.</li>
169     * <li>Write the subarrays to disk.</li>
170     * <li>If the process fails we should decrement the block count if we took from it.</li>
171     * </ol>
172     * @param object
173     * @return the blocks we used.
174     * @throws IOException
175     */
176    protected int[] write( Serializable object )
177        throws IOException
178    {
179        // serialize the object
180        byte[] data = elementSerializer.serialize(object);
181
182        if ( log.isDebugEnabled() )
183        {
184            log.debug( "write, total pre-chunking data.length = " + data.length );
185        }
186
187        this.putBytes.addAndGet(data.length);
188        this.putCount.incrementAndGet();
189
190        // figure out how many blocks we need.
191        int numBlocksNeeded = calculateTheNumberOfBlocksNeeded(data);
192
193        if ( log.isDebugEnabled() )
194        {
195            log.debug( "numBlocksNeeded = " + numBlocksNeeded );
196        }
197
198        // allocate blocks
199        int[] blocks = allocateBlocks(numBlocksNeeded);
200
201        int offset = 0;
202        final int maxChunkSize = blockSizeBytes - HEADER_SIZE_BYTES;
203        ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE_BYTES);
204
205        for (int i = 0; i < numBlocksNeeded; i++)
206        {
207            headerBuffer.clear();
208            int length = Math.min(maxChunkSize, data.length - offset);
209            headerBuffer.putInt(length);
210
211            ByteBuffer dataBuffer = ByteBuffer.wrap(data, offset, length);
212
213            long position = calculateByteOffsetForBlockAsLong(blocks[i]);
214            // write the header
215            headerBuffer.flip();
216            int written = fc.write(headerBuffer, position);
217            assert written == HEADER_SIZE_BYTES;
218
219            //write the data
220            written = fc.write(dataBuffer, position + HEADER_SIZE_BYTES);
221            assert written == length;
222
223            offset += length;
224        }
225
226        //fc.force(false);
227
228        return blocks;
229    }
230
231    /**
232     * Return the amount to put in each block. Fill them all the way, minus the header.
233     * <p>
234     * @param complete
235     * @param numBlocksNeeded
236     * @return byte[][]
237     */
238    protected byte[][] getBlockChunks( byte[] complete, int numBlocksNeeded )
239    {
240        byte[][] chunks = new byte[numBlocksNeeded][];
241
242        if ( numBlocksNeeded == 1 )
243        {
244            chunks[0] = complete;
245        }
246        else
247        {
248            int maxChunkSize = this.blockSizeBytes - HEADER_SIZE_BYTES;
249            int totalBytes = complete.length;
250            int totalUsed = 0;
251            for ( short i = 0; i < numBlocksNeeded; i++ )
252            {
253                // use the max that can be written to a block or whatever is left in the original
254                // array
255                int chunkSize = Math.min( maxChunkSize, totalBytes - totalUsed );
256                byte[] chunk = new byte[chunkSize];
257                // copy from the used position to the chunk size on the complete array to the chunk
258                // array.
259                System.arraycopy( complete, totalUsed, chunk, 0, chunkSize );
260                chunks[i] = chunk;
261                totalUsed += chunkSize;
262            }
263        }
264
265        return chunks;
266    }
267
268    /**
269     * Reads an object that is located in the specified blocks.
270     * <p>
271     * @param blockNumbers
272     * @return Serializable
273     * @throws IOException
274     * @throws ClassNotFoundException
275     */
276    protected <T extends Serializable> T read( int[] blockNumbers )
277        throws IOException, ClassNotFoundException
278    {
279        byte[] data = null;
280
281        if ( blockNumbers.length == 1 )
282        {
283            data = readBlock( blockNumbers[0] );
284        }
285        else
286        {
287            ByteArrayOutputStream baos = new ByteArrayOutputStream(getBlockSizeBytes());
288            // get all the blocks into data
289            for ( short i = 0; i < blockNumbers.length; i++ )
290            {
291                byte[] chunk = readBlock( blockNumbers[i] );
292                baos.write(chunk);
293            }
294
295            data = baos.toByteArray();
296            baos.close();
297        }
298
299        if ( log.isDebugEnabled() )
300        {
301            log.debug( "read, total post combination data.length = " + data.length );
302        }
303
304        return elementSerializer.deSerialize( data, null );
305    }
306
307    /**
308     * This reads the occupied data in a block.
309     * <p>
310     * The first four bytes of the record should tell us how long it is. The data is read into a
311     * byte array and then an object is constructed from the byte array.
312     * <p>
313     * @return byte[]
314     * @param block
315     * @throws IOException
316     */
317    private byte[] readBlock( int block )
318        throws IOException
319    {
320        int datalen = 0;
321
322        String message = null;
323        boolean corrupted = false;
324        long fileLength = fc.size();
325
326        long position = calculateByteOffsetForBlockAsLong( block );
327//        if ( position > fileLength )
328//        {
329//            corrupted = true;
330//            message = "Record " + position + " starts past EOF.";
331//        }
332//        else
333        {
334            ByteBuffer datalength = ByteBuffer.allocate(HEADER_SIZE_BYTES);
335            fc.read(datalength, position);
336            datalength.flip();
337            datalen = datalength.getInt();
338            if ( position + datalen > fileLength )
339            {
340                corrupted = true;
341                message = "Record " + position + " exceeds file length.";
342            }
343        }
344
345        if ( corrupted )
346        {
347            log.warn( "\n The file is corrupt: " + "\n " + message );
348            throw new IOException( "The File Is Corrupt, need to reset" );
349        }
350
351        ByteBuffer data = ByteBuffer.allocate(datalen);
352        fc.read(data, position + HEADER_SIZE_BYTES);
353        data.flip();
354
355        return data.array();
356    }
357
358    /**
359     * Add these blocks to the emptyBlock list.
360     * <p>
361     * @param blocksToFree
362     */
363    protected void freeBlocks( int[] blocksToFree )
364    {
365        if ( blocksToFree != null )
366        {
367            for ( short i = 0; i < blocksToFree.length; i++ )
368            {
369                emptyBlocks.offer( Integer.valueOf( blocksToFree[i] ) );
370            }
371        }
372    }
373
374    /**
375     * Calculates the file offset for a particular block.
376     * <p>
377     * @param block number
378     * @return the byte offset for this block in the file as an int; may overflow
379     * @deprecated (since 2.0) use {@link #calculateByteOffsetForBlockAsLong(int)} instead
380     */
381    @Deprecated
382    protected int calculateByteOffsetForBlock( int block )
383    {
384        return block * blockSizeBytes;
385    }
386
387    /**
388     * Calculates the file offset for a particular block.
389     * <p>
390     * @param block number
391     * @return the byte offset for this block in the file as a long
392     * @since 2.0
393     */
394    protected long calculateByteOffsetForBlockAsLong( int block )
395    {
396        return (long) block * blockSizeBytes;
397    }
398
399    /**
400     * The number of blocks needed.
401     * <p>
402     * @param data
403     * @return the number of blocks needed to store the byte array
404     */
405    protected int calculateTheNumberOfBlocksNeeded( byte[] data )
406    {
407        int dataLength = data.length;
408
409        int oneBlock = blockSizeBytes - HEADER_SIZE_BYTES;
410
411        // takes care of 0 = HEADER_SIZE_BYTES + blockSizeBytes
412        if ( dataLength <= oneBlock )
413        {
414            return 1;
415        }
416
417        int dividend = dataLength / oneBlock;
418
419        if ( dataLength % oneBlock != 0 )
420        {
421            dividend++;
422        }
423        return dividend;
424    }
425
426    /**
427     * Returns the file length.
428     * <p>
429     * @return the size of the file.
430     * @throws IOException
431     */
432    protected long length()
433        throws IOException
434    {
435        return fc.size();
436    }
437
438    /**
439     * Closes the file.
440     * <p>
441     * @throws IOException
442     */
443    protected void close()
444        throws IOException
445    {
446        fc.close();
447    }
448
449    /**
450     * Resets the file.
451     * <p>
452     * @throws IOException
453     */
454    protected synchronized void reset()
455        throws IOException
456    {
457        this.numberOfBlocks.set(0);
458        this.emptyBlocks.clear();
459        fc.truncate(0);
460        fc.force(true);
461    }
462
463    /**
464     * @return Returns the numberOfBlocks.
465     */
466    protected int getNumberOfBlocks()
467    {
468        return numberOfBlocks.get();
469    }
470
471    /**
472     * @return Returns the blockSizeBytes.
473     */
474    protected int getBlockSizeBytes()
475    {
476        return blockSizeBytes;
477    }
478
479    /**
480     * @return Returns the average size of the an element inserted.
481     */
482    protected long getAveragePutSizeBytes()
483    {
484        long count = this.putCount.get();
485
486        if (count == 0 )
487        {
488            return 0;
489        }
490        return this.putBytes.get() / count;
491    }
492
493    /**
494     * @return Returns the number of empty blocks.
495     */
496    protected int getEmptyBlocks()
497    {
498        return this.emptyBlocks.size();
499    }
500
501    /**
502     * For debugging only.
503     * <p>
504     * @return String with details.
505     */
506    @Override
507    public String toString()
508    {
509        StringBuilder buf = new StringBuilder();
510        buf.append( "\nBlock Disk " );
511        buf.append( "\n  Filepath [" + filepath + "]" );
512        buf.append( "\n  NumberOfBlocks [" + this.numberOfBlocks.get() + "]" );
513        buf.append( "\n  BlockSizeBytes [" + this.blockSizeBytes + "]" );
514        buf.append( "\n  Put Bytes [" + this.putBytes + "]" );
515        buf.append( "\n  Put Count [" + this.putCount + "]" );
516        buf.append( "\n  Average Size [" + getAveragePutSizeBytes() + "]" );
517        buf.append( "\n  Empty Blocks [" + this.getEmptyBlocks() + "]" );
518        try
519        {
520            buf.append( "\n  Length [" + length() + "]" );
521        }
522        catch ( IOException e )
523        {
524            // swallow
525        }
526        return buf.toString();
527    }
528
529    /**
530     * This is used for debugging.
531     * <p>
532     * @return the file path.
533     */
534    protected String getFilePath()
535    {
536        return filepath;
537    }
538}