001package org.apache.commons.jcs3.auxiliary.disk.block; 002 003/* 004 * Licensed to the Apache Software Foundation (ASF) under one 005 * or more contributor license agreements. See the NOTICE file 006 * distributed with this work for additional information 007 * regarding copyright ownership. The ASF licenses this file 008 * to you under the Apache License, Version 2.0 (the 009 * "License"); you may not use this file except in compliance 010 * with the License. You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, 015 * software distributed under the License is distributed on an 016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 017 * KIND, either express or implied. See the License for the 018 * specific language governing permissions and limitations 019 * under the License. 020 */ 021 022import java.io.File; 023import java.io.IOException; 024import java.nio.ByteBuffer; 025import java.nio.channels.FileChannel; 026import java.nio.file.StandardOpenOption; 027import java.util.concurrent.ConcurrentLinkedQueue; 028import java.util.concurrent.atomic.AtomicInteger; 029import java.util.concurrent.atomic.AtomicLong; 030 031import org.apache.commons.jcs3.engine.behavior.IElementSerializer; 032import org.apache.commons.jcs3.log.Log; 033import org.apache.commons.jcs3.log.LogManager; 034import org.apache.commons.jcs3.utils.serialization.StandardSerializer; 035 036/** 037 * This class manages reading an writing data to disk. When asked to write a value, it returns a 038 * block array. It can read an object from the block numbers in a byte array. 039 */ 040public class BlockDisk implements AutoCloseable 041{ 042 /** The logger */ 043 private static final Log log = LogManager.getLog(BlockDisk.class); 044 045 /** The size of the header that indicates the amount of data stored in an occupied block. */ 046 public static final byte HEADER_SIZE_BYTES = 4; 047 // N.B. 4 bytes is the size used for ByteBuffer.putInt(int value) and ByteBuffer.getInt() 048 049 /** defaults to 4kb */ 050 private static final int DEFAULT_BLOCK_SIZE_BYTES = 4 * 1024; 051 052 /** Size of the blocks */ 053 private final int blockSizeBytes; 054 055 /** 056 * the total number of blocks that have been used. If there are no free, we will use this to 057 * calculate the position of the next block. 058 */ 059 private final AtomicInteger numberOfBlocks = new AtomicInteger(0); 060 061 /** Empty blocks that can be reused. */ 062 private final ConcurrentLinkedQueue<Integer> emptyBlocks = new ConcurrentLinkedQueue<>(); 063 064 /** The serializer. */ 065 private final IElementSerializer elementSerializer; 066 067 /** Location of the spot on disk */ 068 private final String filepath; 069 070 /** File channel for multiple concurrent reads and writes */ 071 private final FileChannel fc; 072 073 /** How many bytes have we put to disk */ 074 private final AtomicLong putBytes = new AtomicLong(); 075 076 /** How many items have we put to disk */ 077 private final AtomicLong putCount = new AtomicLong(); 078 079 /** 080 * Constructor for the Disk object 081 * <p> 082 * @param file 083 * @param elementSerializer 084 * @throws IOException 085 */ 086 public BlockDisk(final File file, final IElementSerializer elementSerializer) 087 throws IOException 088 { 089 this(file, DEFAULT_BLOCK_SIZE_BYTES, elementSerializer); 090 } 091 092 /** 093 * Creates the file and set the block size in bytes. 094 * <p> 095 * @param file 096 * @param blockSizeBytes 097 * @throws IOException 098 */ 099 public BlockDisk(final File file, final int blockSizeBytes) 100 throws IOException 101 { 102 this(file, blockSizeBytes, new StandardSerializer()); 103 } 104 105 /** 106 * Creates the file and set the block size in bytes. 107 * <p> 108 * @param file 109 * @param blockSizeBytes 110 * @param elementSerializer 111 * @throws IOException 112 */ 113 public BlockDisk(final File file, final int blockSizeBytes, final IElementSerializer elementSerializer) 114 throws IOException 115 { 116 this.filepath = file.getAbsolutePath(); 117 this.fc = FileChannel.open(file.toPath(), 118 StandardOpenOption.CREATE, 119 StandardOpenOption.READ, 120 StandardOpenOption.WRITE); 121 this.numberOfBlocks.set((int) Math.ceil(1f * this.fc.size() / blockSizeBytes)); 122 123 log.info("Constructing BlockDisk, blockSizeBytes [{0}]", blockSizeBytes); 124 125 this.blockSizeBytes = blockSizeBytes; 126 this.elementSerializer = elementSerializer; 127 } 128 129 /** 130 * Allocate a given number of blocks from the available set 131 * 132 * @param numBlocksNeeded 133 * @return an array of allocated blocks 134 */ 135 private int[] allocateBlocks(final int numBlocksNeeded) 136 { 137 assert numBlocksNeeded >= 1; 138 139 final int[] blocks = new int[numBlocksNeeded]; 140 // get them from the empty list or take the next one 141 for (int i = 0; i < numBlocksNeeded; i++) 142 { 143 Integer emptyBlock = emptyBlocks.poll(); 144 if (emptyBlock == null) 145 { 146 emptyBlock = Integer.valueOf(numberOfBlocks.getAndIncrement()); 147 } 148 blocks[i] = emptyBlock.intValue(); 149 } 150 151 return blocks; 152 } 153 154 /** 155 * This writes an object to disk and returns the blocks it was stored in. 156 * <p> 157 * The program flow is as follows: 158 * <ol> 159 * <li>Serialize the object.</li> 160 * <li>Determine the number of blocks needed.</li> 161 * <li>Look for free blocks in the emptyBlock list.</li> 162 * <li>If there were not enough in the empty list. Take the nextBlock and increment it.</li> 163 * <li>If the data will not fit in one block, create sub arrays.</li> 164 * <li>Write the subarrays to disk.</li> 165 * <li>If the process fails we should decrement the block count if we took from it.</li> 166 * </ol> 167 * @param object 168 * @return the blocks we used. 169 * @throws IOException 170 */ 171 protected <T> int[] write(final T object) 172 throws IOException 173 { 174 // serialize the object 175 final byte[] data = elementSerializer.serialize(object); 176 177 log.debug("write, total pre-chunking data.length = {0}", data.length); 178 179 this.putBytes.addAndGet(data.length); 180 this.putCount.incrementAndGet(); 181 182 // figure out how many blocks we need. 183 final int numBlocksNeeded = calculateTheNumberOfBlocksNeeded(data); 184 185 log.debug("numBlocksNeeded = {0}", numBlocksNeeded); 186 187 // allocate blocks 188 final int[] blocks = allocateBlocks(numBlocksNeeded); 189 190 int offset = 0; 191 final int maxChunkSize = blockSizeBytes - HEADER_SIZE_BYTES; 192 final ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE_BYTES); 193 final ByteBuffer dataBuffer = ByteBuffer.wrap(data); 194 195 for (int i = 0; i < numBlocksNeeded; i++) 196 { 197 headerBuffer.clear(); 198 final int length = Math.min(maxChunkSize, data.length - offset); 199 headerBuffer.putInt(length); 200 headerBuffer.flip(); 201 202 dataBuffer.position(offset).limit(offset + length); 203 final ByteBuffer slice = dataBuffer.slice(); 204 205 final long position = calculateByteOffsetForBlockAsLong(blocks[i]); 206 // write the header 207 int written = fc.write(headerBuffer, position); 208 assert written == HEADER_SIZE_BYTES; 209 210 //write the data 211 written = fc.write(slice, position + HEADER_SIZE_BYTES); 212 assert written == length; 213 214 offset += length; 215 } 216 217 //fc.force(false); 218 219 return blocks; 220 } 221 222 /** 223 * Return the amount to put in each block. Fill them all the way, minus the header. 224 * <p> 225 * @param complete 226 * @param numBlocksNeeded 227 * @return byte[][] 228 */ 229 protected byte[][] getBlockChunks(final byte[] complete, final int numBlocksNeeded) 230 { 231 final byte[][] chunks = new byte[numBlocksNeeded][]; 232 233 if (numBlocksNeeded == 1) 234 { 235 chunks[0] = complete; 236 } 237 else 238 { 239 final int maxChunkSize = this.blockSizeBytes - HEADER_SIZE_BYTES; 240 final int totalBytes = complete.length; 241 int totalUsed = 0; 242 for (short i = 0; i < numBlocksNeeded; i++) 243 { 244 // use the max that can be written to a block or whatever is left in the original 245 // array 246 final int chunkSize = Math.min(maxChunkSize, totalBytes - totalUsed); 247 final byte[] chunk = new byte[chunkSize]; 248 // copy from the used position to the chunk size on the complete array to the chunk 249 // array. 250 System.arraycopy(complete, totalUsed, chunk, 0, chunkSize); 251 chunks[i] = chunk; 252 totalUsed += chunkSize; 253 } 254 } 255 256 return chunks; 257 } 258 259 /** 260 * Reads an object that is located in the specified blocks. 261 * <p> 262 * @param blockNumbers 263 * @return the object instance 264 * @throws IOException 265 * @throws ClassNotFoundException 266 */ 267 protected <T> T read(final int[] blockNumbers) 268 throws IOException, ClassNotFoundException 269 { 270 final ByteBuffer data; 271 272 if (blockNumbers.length == 1) 273 { 274 data = readBlock(blockNumbers[0]); 275 } 276 else 277 { 278 data = ByteBuffer.allocate(blockNumbers.length * getBlockSizeBytes()); 279 // get all the blocks into data 280 for (short i = 0; i < blockNumbers.length; i++) 281 { 282 final ByteBuffer chunk = readBlock(blockNumbers[i]); 283 data.put(chunk); 284 } 285 286 data.flip(); 287 } 288 289 log.debug("read, total post combination data.length = {0}", () -> data.limit()); 290 291 return elementSerializer.deSerialize(data.array(), null); 292 } 293 294 /** 295 * This reads the occupied data in a block. 296 * <p> 297 * The first four bytes of the record should tell us how long it is. The data is read into a 298 * byte array and then an object is constructed from the byte array. 299 * <p> 300 * @return byte[] 301 * @param block 302 * @throws IOException 303 */ 304 private ByteBuffer readBlock(final int block) 305 throws IOException 306 { 307 int datalen = 0; 308 309 String message = null; 310 boolean corrupted = false; 311 final long fileLength = fc.size(); 312 313 final long position = calculateByteOffsetForBlockAsLong(block); 314// if (position > fileLength) 315// { 316// corrupted = true; 317// message = "Record " + position + " starts past EOF."; 318// } 319// else 320 { 321 final ByteBuffer datalength = ByteBuffer.allocate(HEADER_SIZE_BYTES); 322 fc.read(datalength, position); 323 datalength.flip(); 324 datalen = datalength.getInt(); 325 if (position + datalen > fileLength) 326 { 327 corrupted = true; 328 message = "Record " + position + " exceeds file length."; 329 } 330 } 331 332 if (corrupted) 333 { 334 log.warn("\n The file is corrupt: \n {0}", message); 335 throw new IOException("The File Is Corrupt, need to reset"); 336 } 337 338 final ByteBuffer data = ByteBuffer.allocate(datalen); 339 fc.read(data, position + HEADER_SIZE_BYTES); 340 data.flip(); 341 342 return data; 343 } 344 345 /** 346 * Add these blocks to the emptyBlock list. 347 * <p> 348 * @param blocksToFree 349 */ 350 protected void freeBlocks(final int[] blocksToFree) 351 { 352 if (blocksToFree != null) 353 { 354 for (short i = 0; i < blocksToFree.length; i++) 355 { 356 emptyBlocks.offer(Integer.valueOf(blocksToFree[i])); 357 } 358 } 359 } 360 361 /** 362 * Calculates the file offset for a particular block. 363 * <p> 364 * @param block number 365 * @return the byte offset for this block in the file as a long 366 * @since 2.0 367 */ 368 protected long calculateByteOffsetForBlockAsLong(final int block) 369 { 370 return (long) block * blockSizeBytes; 371 } 372 373 /** 374 * The number of blocks needed. 375 * <p> 376 * @param data 377 * @return the number of blocks needed to store the byte array 378 */ 379 protected int calculateTheNumberOfBlocksNeeded(final byte[] data) 380 { 381 final int dataLength = data.length; 382 383 final int oneBlock = blockSizeBytes - HEADER_SIZE_BYTES; 384 385 // takes care of 0 = HEADER_SIZE_BYTES + blockSizeBytes 386 if (dataLength <= oneBlock) 387 { 388 return 1; 389 } 390 391 int dividend = dataLength / oneBlock; 392 393 if (dataLength % oneBlock != 0) 394 { 395 dividend++; 396 } 397 return dividend; 398 } 399 400 /** 401 * Returns the file length. 402 * <p> 403 * @return the size of the file. 404 * @throws IOException 405 */ 406 protected long length() 407 throws IOException 408 { 409 return fc.size(); 410 } 411 412 /** 413 * Closes the file. 414 * <p> 415 * @throws IOException 416 */ 417 @Override 418 public void close() 419 throws IOException 420 { 421 this.numberOfBlocks.set(0); 422 this.emptyBlocks.clear(); 423 fc.close(); 424 } 425 426 /** 427 * Resets the file. 428 * <p> 429 * @throws IOException 430 */ 431 protected synchronized void reset() 432 throws IOException 433 { 434 this.numberOfBlocks.set(0); 435 this.emptyBlocks.clear(); 436 fc.truncate(0); 437 fc.force(true); 438 } 439 440 /** 441 * @return Returns the numberOfBlocks. 442 */ 443 protected int getNumberOfBlocks() 444 { 445 return numberOfBlocks.get(); 446 } 447 448 /** 449 * @return Returns the blockSizeBytes. 450 */ 451 protected int getBlockSizeBytes() 452 { 453 return blockSizeBytes; 454 } 455 456 /** 457 * @return Returns the average size of the an element inserted. 458 */ 459 protected long getAveragePutSizeBytes() 460 { 461 final long count = this.putCount.get(); 462 463 if (count == 0) 464 { 465 return 0; 466 } 467 return this.putBytes.get() / count; 468 } 469 470 /** 471 * @return Returns the number of empty blocks. 472 */ 473 protected int getEmptyBlocks() 474 { 475 return this.emptyBlocks.size(); 476 } 477 478 /** 479 * For debugging only. 480 * <p> 481 * @return String with details. 482 */ 483 @Override 484 public String toString() 485 { 486 final StringBuilder buf = new StringBuilder(); 487 buf.append("\nBlock Disk "); 488 buf.append("\n Filepath [" + filepath + "]"); 489 buf.append("\n NumberOfBlocks [" + this.numberOfBlocks.get() + "]"); 490 buf.append("\n BlockSizeBytes [" + this.blockSizeBytes + "]"); 491 buf.append("\n Put Bytes [" + this.putBytes + "]"); 492 buf.append("\n Put Count [" + this.putCount + "]"); 493 buf.append("\n Average Size [" + getAveragePutSizeBytes() + "]"); 494 buf.append("\n Empty Blocks [" + this.getEmptyBlocks() + "]"); 495 try 496 { 497 buf.append("\n Length [" + length() + "]"); 498 } 499 catch (final IOException e) 500 { 501 // swallow 502 } 503 return buf.toString(); 504 } 505 506 /** 507 * This is used for debugging. 508 * <p> 509 * @return the file path. 510 */ 511 protected String getFilePath() 512 { 513 return filepath; 514 } 515}