001package org.apache.commons.jcs3.auxiliary.disk.block;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *   http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import java.io.File;
023import java.io.IOException;
024import java.nio.ByteBuffer;
025import java.nio.channels.FileChannel;
026import java.nio.file.StandardOpenOption;
027import java.util.concurrent.ConcurrentLinkedQueue;
028import java.util.concurrent.atomic.AtomicInteger;
029import java.util.concurrent.atomic.AtomicLong;
030
031import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
032import org.apache.commons.jcs3.log.Log;
033import org.apache.commons.jcs3.log.LogManager;
034import org.apache.commons.jcs3.utils.serialization.StandardSerializer;
035
036/**
037 * This class manages reading an writing data to disk. When asked to write a value, it returns a
038 * block array. It can read an object from the block numbers in a byte array.
039 */
040public class BlockDisk implements AutoCloseable
041{
042    /** The logger */
043    private static final Log log = LogManager.getLog(BlockDisk.class);
044
045    /** The size of the header that indicates the amount of data stored in an occupied block. */
046    public static final byte HEADER_SIZE_BYTES = 4;
047    // N.B. 4 bytes is the size used for ByteBuffer.putInt(int value) and ByteBuffer.getInt()
048
049    /** defaults to 4kb */
050    private static final int DEFAULT_BLOCK_SIZE_BYTES = 4 * 1024;
051
052    /** Size of the blocks */
053    private final int blockSizeBytes;
054
055    /**
056     * the total number of blocks that have been used. If there are no free, we will use this to
057     * calculate the position of the next block.
058     */
059    private final AtomicInteger numberOfBlocks = new AtomicInteger(0);
060
061    /** Empty blocks that can be reused. */
062    private final ConcurrentLinkedQueue<Integer> emptyBlocks = new ConcurrentLinkedQueue<>();
063
064    /** The serializer. */
065    private final IElementSerializer elementSerializer;
066
067    /** Location of the spot on disk */
068    private final String filepath;
069
070    /** File channel for multiple concurrent reads and writes */
071    private final FileChannel fc;
072
073    /** How many bytes have we put to disk */
074    private final AtomicLong putBytes = new AtomicLong();
075
076    /** How many items have we put to disk */
077    private final AtomicLong putCount = new AtomicLong();
078
079    /**
080     * Constructor for the Disk object
081     * <p>
082     * @param file
083     * @param elementSerializer
084     * @throws IOException
085     */
086    public BlockDisk(final File file, final IElementSerializer elementSerializer)
087        throws IOException
088    {
089        this(file, DEFAULT_BLOCK_SIZE_BYTES, elementSerializer);
090    }
091
092    /**
093     * Creates the file and set the block size in bytes.
094     * <p>
095     * @param file
096     * @param blockSizeBytes
097     * @throws IOException
098     */
099    public BlockDisk(final File file, final int blockSizeBytes)
100        throws IOException
101    {
102        this(file, blockSizeBytes, new StandardSerializer());
103    }
104
105    /**
106     * Creates the file and set the block size in bytes.
107     * <p>
108     * @param file
109     * @param blockSizeBytes
110     * @param elementSerializer
111     * @throws IOException
112     */
113    public BlockDisk(final File file, final int blockSizeBytes, final IElementSerializer elementSerializer)
114        throws IOException
115    {
116        this.filepath = file.getAbsolutePath();
117        this.fc = FileChannel.open(file.toPath(),
118                StandardOpenOption.CREATE,
119                StandardOpenOption.READ,
120                StandardOpenOption.WRITE);
121        this.numberOfBlocks.set((int) Math.ceil(1f * this.fc.size() / blockSizeBytes));
122
123        log.info("Constructing BlockDisk, blockSizeBytes [{0}]", blockSizeBytes);
124
125        this.blockSizeBytes = blockSizeBytes;
126        this.elementSerializer = elementSerializer;
127    }
128
129    /**
130     * Allocate a given number of blocks from the available set
131     *
132     * @param numBlocksNeeded
133     * @return an array of allocated blocks
134     */
135    private int[] allocateBlocks(final int numBlocksNeeded)
136    {
137        assert numBlocksNeeded >= 1;
138
139        final int[] blocks = new int[numBlocksNeeded];
140        // get them from the empty list or take the next one
141        for (int i = 0; i < numBlocksNeeded; i++)
142        {
143            Integer emptyBlock = emptyBlocks.poll();
144            if (emptyBlock == null)
145            {
146                emptyBlock = Integer.valueOf(numberOfBlocks.getAndIncrement());
147            }
148            blocks[i] = emptyBlock.intValue();
149        }
150
151        return blocks;
152    }
153
154    /**
155     * This writes an object to disk and returns the blocks it was stored in.
156     * <p>
157     * The program flow is as follows:
158     * <ol>
159     * <li>Serialize the object.</li>
160     * <li>Determine the number of blocks needed.</li>
161     * <li>Look for free blocks in the emptyBlock list.</li>
162     * <li>If there were not enough in the empty list. Take the nextBlock and increment it.</li>
163     * <li>If the data will not fit in one block, create sub arrays.</li>
164     * <li>Write the subarrays to disk.</li>
165     * <li>If the process fails we should decrement the block count if we took from it.</li>
166     * </ol>
167     * @param object
168     * @return the blocks we used.
169     * @throws IOException
170     */
171    protected <T> int[] write(final T object)
172        throws IOException
173    {
174        // serialize the object
175        final byte[] data = elementSerializer.serialize(object);
176
177        log.debug("write, total pre-chunking data.length = {0}", data.length);
178
179        this.putBytes.addAndGet(data.length);
180        this.putCount.incrementAndGet();
181
182        // figure out how many blocks we need.
183        final int numBlocksNeeded = calculateTheNumberOfBlocksNeeded(data);
184
185        log.debug("numBlocksNeeded = {0}", numBlocksNeeded);
186
187        // allocate blocks
188        final int[] blocks = allocateBlocks(numBlocksNeeded);
189
190        int offset = 0;
191        final int maxChunkSize = blockSizeBytes - HEADER_SIZE_BYTES;
192        final ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE_BYTES);
193        final ByteBuffer dataBuffer = ByteBuffer.wrap(data);
194
195        for (int i = 0; i < numBlocksNeeded; i++)
196        {
197            headerBuffer.clear();
198            final int length = Math.min(maxChunkSize, data.length - offset);
199            headerBuffer.putInt(length);
200            headerBuffer.flip();
201
202            dataBuffer.position(offset).limit(offset + length);
203            final ByteBuffer slice = dataBuffer.slice();
204
205            final long position = calculateByteOffsetForBlockAsLong(blocks[i]);
206            // write the header
207            int written = fc.write(headerBuffer, position);
208            assert written == HEADER_SIZE_BYTES;
209
210            //write the data
211            written = fc.write(slice, position + HEADER_SIZE_BYTES);
212            assert written == length;
213
214            offset += length;
215        }
216
217        //fc.force(false);
218
219        return blocks;
220    }
221
222    /**
223     * Return the amount to put in each block. Fill them all the way, minus the header.
224     * <p>
225     * @param complete
226     * @param numBlocksNeeded
227     * @return byte[][]
228     */
229    protected byte[][] getBlockChunks(final byte[] complete, final int numBlocksNeeded)
230    {
231        final byte[][] chunks = new byte[numBlocksNeeded][];
232
233        if (numBlocksNeeded == 1)
234        {
235            chunks[0] = complete;
236        }
237        else
238        {
239            final int maxChunkSize = this.blockSizeBytes - HEADER_SIZE_BYTES;
240            final int totalBytes = complete.length;
241            int totalUsed = 0;
242            for (short i = 0; i < numBlocksNeeded; i++)
243            {
244                // use the max that can be written to a block or whatever is left in the original
245                // array
246                final int chunkSize = Math.min(maxChunkSize, totalBytes - totalUsed);
247                final byte[] chunk = new byte[chunkSize];
248                // copy from the used position to the chunk size on the complete array to the chunk
249                // array.
250                System.arraycopy(complete, totalUsed, chunk, 0, chunkSize);
251                chunks[i] = chunk;
252                totalUsed += chunkSize;
253            }
254        }
255
256        return chunks;
257    }
258
259    /**
260     * Reads an object that is located in the specified blocks.
261     * <p>
262     * @param blockNumbers
263     * @return the object instance
264     * @throws IOException
265     * @throws ClassNotFoundException
266     */
267    protected <T> T read(final int[] blockNumbers)
268        throws IOException, ClassNotFoundException
269    {
270        final ByteBuffer data;
271
272        if (blockNumbers.length == 1)
273        {
274            data = readBlock(blockNumbers[0]);
275        }
276        else
277        {
278            data = ByteBuffer.allocate(blockNumbers.length * getBlockSizeBytes());
279            // get all the blocks into data
280            for (short i = 0; i < blockNumbers.length; i++)
281            {
282                final ByteBuffer chunk = readBlock(blockNumbers[i]);
283                data.put(chunk);
284            }
285
286            data.flip();
287        }
288
289        log.debug("read, total post combination data.length = {0}", () -> data.limit());
290
291        return elementSerializer.deSerialize(data.array(), null);
292    }
293
294    /**
295     * This reads the occupied data in a block.
296     * <p>
297     * The first four bytes of the record should tell us how long it is. The data is read into a
298     * byte array and then an object is constructed from the byte array.
299     * <p>
300     * @return byte[]
301     * @param block
302     * @throws IOException
303     */
304    private ByteBuffer readBlock(final int block)
305        throws IOException
306    {
307        int datalen = 0;
308
309        String message = null;
310        boolean corrupted = false;
311        final long fileLength = fc.size();
312
313        final long position = calculateByteOffsetForBlockAsLong(block);
314//        if (position > fileLength)
315//        {
316//            corrupted = true;
317//            message = "Record " + position + " starts past EOF.";
318//        }
319//        else
320        {
321            final ByteBuffer datalength = ByteBuffer.allocate(HEADER_SIZE_BYTES);
322            fc.read(datalength, position);
323            datalength.flip();
324            datalen = datalength.getInt();
325            if (position + datalen > fileLength)
326            {
327                corrupted = true;
328                message = "Record " + position + " exceeds file length.";
329            }
330        }
331
332        if (corrupted)
333        {
334            log.warn("\n The file is corrupt: \n {0}", message);
335            throw new IOException("The File Is Corrupt, need to reset");
336        }
337
338        final ByteBuffer data = ByteBuffer.allocate(datalen);
339        fc.read(data, position + HEADER_SIZE_BYTES);
340        data.flip();
341
342        return data;
343    }
344
345    /**
346     * Add these blocks to the emptyBlock list.
347     * <p>
348     * @param blocksToFree
349     */
350    protected void freeBlocks(final int[] blocksToFree)
351    {
352        if (blocksToFree != null)
353        {
354            for (short i = 0; i < blocksToFree.length; i++)
355            {
356                emptyBlocks.offer(Integer.valueOf(blocksToFree[i]));
357            }
358        }
359    }
360
361    /**
362     * Calculates the file offset for a particular block.
363     * <p>
364     * @param block number
365     * @return the byte offset for this block in the file as a long
366     * @since 2.0
367     */
368    protected long calculateByteOffsetForBlockAsLong(final int block)
369    {
370        return (long) block * blockSizeBytes;
371    }
372
373    /**
374     * The number of blocks needed.
375     * <p>
376     * @param data
377     * @return the number of blocks needed to store the byte array
378     */
379    protected int calculateTheNumberOfBlocksNeeded(final byte[] data)
380    {
381        final int dataLength = data.length;
382
383        final int oneBlock = blockSizeBytes - HEADER_SIZE_BYTES;
384
385        // takes care of 0 = HEADER_SIZE_BYTES + blockSizeBytes
386        if (dataLength <= oneBlock)
387        {
388            return 1;
389        }
390
391        int dividend = dataLength / oneBlock;
392
393        if (dataLength % oneBlock != 0)
394        {
395            dividend++;
396        }
397        return dividend;
398    }
399
400    /**
401     * Returns the file length.
402     * <p>
403     * @return the size of the file.
404     * @throws IOException
405     */
406    protected long length()
407        throws IOException
408    {
409        return fc.size();
410    }
411
412    /**
413     * Closes the file.
414     * <p>
415     * @throws IOException
416     */
417    @Override
418    public void close()
419        throws IOException
420    {
421        this.numberOfBlocks.set(0);
422        this.emptyBlocks.clear();
423        fc.close();
424    }
425
426    /**
427     * Resets the file.
428     * <p>
429     * @throws IOException
430     */
431    protected synchronized void reset()
432        throws IOException
433    {
434        this.numberOfBlocks.set(0);
435        this.emptyBlocks.clear();
436        fc.truncate(0);
437        fc.force(true);
438    }
439
440    /**
441     * @return Returns the numberOfBlocks.
442     */
443    protected int getNumberOfBlocks()
444    {
445        return numberOfBlocks.get();
446    }
447
448    /**
449     * @return Returns the blockSizeBytes.
450     */
451    protected int getBlockSizeBytes()
452    {
453        return blockSizeBytes;
454    }
455
456    /**
457     * @return Returns the average size of the an element inserted.
458     */
459    protected long getAveragePutSizeBytes()
460    {
461        final long count = this.putCount.get();
462
463        if (count == 0)
464        {
465            return 0;
466        }
467        return this.putBytes.get() / count;
468    }
469
470    /**
471     * @return Returns the number of empty blocks.
472     */
473    protected int getEmptyBlocks()
474    {
475        return this.emptyBlocks.size();
476    }
477
478    /**
479     * For debugging only.
480     * <p>
481     * @return String with details.
482     */
483    @Override
484    public String toString()
485    {
486        final StringBuilder buf = new StringBuilder();
487        buf.append("\nBlock Disk ");
488        buf.append("\n  Filepath [" + filepath + "]");
489        buf.append("\n  NumberOfBlocks [" + this.numberOfBlocks.get() + "]");
490        buf.append("\n  BlockSizeBytes [" + this.blockSizeBytes + "]");
491        buf.append("\n  Put Bytes [" + this.putBytes + "]");
492        buf.append("\n  Put Count [" + this.putCount + "]");
493        buf.append("\n  Average Size [" + getAveragePutSizeBytes() + "]");
494        buf.append("\n  Empty Blocks [" + this.getEmptyBlocks() + "]");
495        try
496        {
497            buf.append("\n  Length [" + length() + "]");
498        }
499        catch (final IOException e)
500        {
501            // swallow
502        }
503        return buf.toString();
504    }
505
506    /**
507     * This is used for debugging.
508     * <p>
509     * @return the file path.
510     */
511    protected String getFilePath()
512    {
513        return filepath;
514    }
515}