001package org.apache.commons.jcs.auxiliary.disk.block; 002 003/* 004 * Licensed to the Apache Software Foundation (ASF) under one 005 * or more contributor license agreements. See the NOTICE file 006 * distributed with this work for additional information 007 * regarding copyright ownership. The ASF licenses this file 008 * to you under the Apache License, Version 2.0 (the 009 * "License"); you may not use this file except in compliance 010 * with the License. You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, 015 * software distributed under the License is distributed on an 016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 017 * KIND, either express or implied. See the License for the 018 * specific language governing permissions and limitations 019 * under the License. 020 */ 021 022import java.io.ByteArrayOutputStream; 023import java.io.File; 024import java.io.IOException; 025import java.io.RandomAccessFile; 026import java.io.Serializable; 027import java.nio.ByteBuffer; 028import java.nio.channels.FileChannel; 029import java.util.concurrent.ConcurrentLinkedQueue; 030import java.util.concurrent.atomic.AtomicInteger; 031import java.util.concurrent.atomic.AtomicLong; 032 033import org.apache.commons.jcs.engine.behavior.IElementSerializer; 034import org.apache.commons.jcs.utils.serialization.StandardSerializer; 035import org.apache.commons.logging.Log; 036import org.apache.commons.logging.LogFactory; 037 038/** 039 * This class manages reading an writing data to disk. When asked to write a value, it returns a 040 * block array. It can read an object from the block numbers in a byte array. 041 * <p> 042 * @author Aaron Smuts 043 */ 044public class BlockDisk 045{ 046 /** The logger */ 047 private static final Log log = LogFactory.getLog( BlockDisk.class ); 048 049 /** The size of the header that indicates the amount of data stored in an occupied block. */ 050 public static final byte HEADER_SIZE_BYTES = 4; 051 // N.B. 4 bytes is the size used for ByteBuffer.putInt(int value) and ByteBuffer.getInt() 052 053 /** defaults to 4kb */ 054 private static final int DEFAULT_BLOCK_SIZE_BYTES = 4 * 1024; 055 056 /** Size of the blocks */ 057 private final int blockSizeBytes; 058 059 /** 060 * the total number of blocks that have been used. If there are no free, we will use this to 061 * calculate the position of the next block. 062 */ 063 private final AtomicInteger numberOfBlocks = new AtomicInteger(0); 064 065 /** Empty blocks that can be reused. */ 066 private final ConcurrentLinkedQueue<Integer> emptyBlocks = new ConcurrentLinkedQueue<Integer>(); 067 068 /** The serializer. */ 069 private final IElementSerializer elementSerializer; 070 071 /** Location of the spot on disk */ 072 private final String filepath; 073 074 /** File channel for multiple concurrent reads and writes */ 075 private final FileChannel fc; 076 077 /** How many bytes have we put to disk */ 078 private final AtomicLong putBytes = new AtomicLong(0); 079 080 /** How many items have we put to disk */ 081 private final AtomicLong putCount = new AtomicLong(0); 082 083 /** 084 * Constructor for the Disk object 085 * <p> 086 * @param file 087 * @param elementSerializer 088 * @throws IOException 089 */ 090 public BlockDisk( File file, IElementSerializer elementSerializer ) 091 throws IOException 092 { 093 this( file, DEFAULT_BLOCK_SIZE_BYTES, elementSerializer ); 094 } 095 096 /** 097 * Creates the file and set the block size in bytes. 098 * <p> 099 * @param file 100 * @param blockSizeBytes 101 * @throws IOException 102 */ 103 public BlockDisk( File file, int blockSizeBytes ) 104 throws IOException 105 { 106 this( file, blockSizeBytes, new StandardSerializer() ); 107 } 108 109 /** 110 * Creates the file and set the block size in bytes. 111 * <p> 112 * @param file 113 * @param blockSizeBytes 114 * @param elementSerializer 115 * @throws IOException 116 */ 117 public BlockDisk( File file, int blockSizeBytes, IElementSerializer elementSerializer ) 118 throws IOException 119 { 120 this.filepath = file.getAbsolutePath(); 121 RandomAccessFile raf = new RandomAccessFile( filepath, "rw" ); 122 this.fc = raf.getChannel(); 123 this.numberOfBlocks.set((int) Math.ceil(1f * this.fc.size() / blockSizeBytes)); 124 125 if ( log.isInfoEnabled() ) 126 { 127 log.info( "Constructing BlockDisk, blockSizeBytes [" + blockSizeBytes + "]" ); 128 } 129 130 this.blockSizeBytes = blockSizeBytes; 131 this.elementSerializer = elementSerializer; 132 } 133 134 /** 135 * Allocate a given number of blocks from the available set 136 * 137 * @param numBlocksNeeded 138 * @return an array of allocated blocks 139 */ 140 private int[] allocateBlocks(int numBlocksNeeded) 141 { 142 assert numBlocksNeeded >= 1; 143 144 int[] blocks = new int[numBlocksNeeded]; 145 // get them from the empty list or take the next one 146 for (int i = 0; i < numBlocksNeeded; i++) 147 { 148 Integer emptyBlock = emptyBlocks.poll(); 149 if (emptyBlock == null) 150 { 151 emptyBlock = Integer.valueOf(numberOfBlocks.getAndIncrement()); 152 } 153 blocks[i] = emptyBlock.intValue(); 154 } 155 156 return blocks; 157 } 158 159 /** 160 * This writes an object to disk and returns the blocks it was stored in. 161 * <p> 162 * The program flow is as follows: 163 * <ol> 164 * <li>Serialize the object.</li> 165 * <li>Determine the number of blocks needed.</li> 166 * <li>Look for free blocks in the emptyBlock list.</li> 167 * <li>If there were not enough in the empty list. Take the nextBlock and increment it.</li> 168 * <li>If the data will not fit in one block, create sub arrays.</li> 169 * <li>Write the subarrays to disk.</li> 170 * <li>If the process fails we should decrement the block count if we took from it.</li> 171 * </ol> 172 * @param object 173 * @return the blocks we used. 174 * @throws IOException 175 */ 176 protected int[] write( Serializable object ) 177 throws IOException 178 { 179 // serialize the object 180 byte[] data = elementSerializer.serialize(object); 181 182 if ( log.isDebugEnabled() ) 183 { 184 log.debug( "write, total pre-chunking data.length = " + data.length ); 185 } 186 187 this.putBytes.addAndGet(data.length); 188 this.putCount.incrementAndGet(); 189 190 // figure out how many blocks we need. 191 int numBlocksNeeded = calculateTheNumberOfBlocksNeeded(data); 192 193 if ( log.isDebugEnabled() ) 194 { 195 log.debug( "numBlocksNeeded = " + numBlocksNeeded ); 196 } 197 198 // allocate blocks 199 int[] blocks = allocateBlocks(numBlocksNeeded); 200 201 int offset = 0; 202 final int maxChunkSize = blockSizeBytes - HEADER_SIZE_BYTES; 203 ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE_BYTES); 204 205 for (int i = 0; i < numBlocksNeeded; i++) 206 { 207 headerBuffer.clear(); 208 int length = Math.min(maxChunkSize, data.length - offset); 209 headerBuffer.putInt(length); 210 211 ByteBuffer dataBuffer = ByteBuffer.wrap(data, offset, length); 212 213 long position = calculateByteOffsetForBlockAsLong(blocks[i]); 214 // write the header 215 headerBuffer.flip(); 216 int written = fc.write(headerBuffer, position); 217 assert written == HEADER_SIZE_BYTES; 218 219 //write the data 220 written = fc.write(dataBuffer, position + HEADER_SIZE_BYTES); 221 assert written == length; 222 223 offset += length; 224 } 225 226 //fc.force(false); 227 228 return blocks; 229 } 230 231 /** 232 * Return the amount to put in each block. Fill them all the way, minus the header. 233 * <p> 234 * @param complete 235 * @param numBlocksNeeded 236 * @return byte[][] 237 */ 238 protected byte[][] getBlockChunks( byte[] complete, int numBlocksNeeded ) 239 { 240 byte[][] chunks = new byte[numBlocksNeeded][]; 241 242 if ( numBlocksNeeded == 1 ) 243 { 244 chunks[0] = complete; 245 } 246 else 247 { 248 int maxChunkSize = this.blockSizeBytes - HEADER_SIZE_BYTES; 249 int totalBytes = complete.length; 250 int totalUsed = 0; 251 for ( short i = 0; i < numBlocksNeeded; i++ ) 252 { 253 // use the max that can be written to a block or whatever is left in the original 254 // array 255 int chunkSize = Math.min( maxChunkSize, totalBytes - totalUsed ); 256 byte[] chunk = new byte[chunkSize]; 257 // copy from the used position to the chunk size on the complete array to the chunk 258 // array. 259 System.arraycopy( complete, totalUsed, chunk, 0, chunkSize ); 260 chunks[i] = chunk; 261 totalUsed += chunkSize; 262 } 263 } 264 265 return chunks; 266 } 267 268 /** 269 * Reads an object that is located in the specified blocks. 270 * <p> 271 * @param blockNumbers 272 * @return Serializable 273 * @throws IOException 274 * @throws ClassNotFoundException 275 */ 276 protected <T extends Serializable> T read( int[] blockNumbers ) 277 throws IOException, ClassNotFoundException 278 { 279 byte[] data = null; 280 281 if ( blockNumbers.length == 1 ) 282 { 283 data = readBlock( blockNumbers[0] ); 284 } 285 else 286 { 287 ByteArrayOutputStream baos = new ByteArrayOutputStream(getBlockSizeBytes()); 288 // get all the blocks into data 289 for ( short i = 0; i < blockNumbers.length; i++ ) 290 { 291 byte[] chunk = readBlock( blockNumbers[i] ); 292 baos.write(chunk); 293 } 294 295 data = baos.toByteArray(); 296 baos.close(); 297 } 298 299 if ( log.isDebugEnabled() ) 300 { 301 log.debug( "read, total post combination data.length = " + data.length ); 302 } 303 304 return elementSerializer.deSerialize( data, null ); 305 } 306 307 /** 308 * This reads the occupied data in a block. 309 * <p> 310 * The first four bytes of the record should tell us how long it is. The data is read into a 311 * byte array and then an object is constructed from the byte array. 312 * <p> 313 * @return byte[] 314 * @param block 315 * @throws IOException 316 */ 317 private byte[] readBlock( int block ) 318 throws IOException 319 { 320 int datalen = 0; 321 322 String message = null; 323 boolean corrupted = false; 324 long fileLength = fc.size(); 325 326 long position = calculateByteOffsetForBlockAsLong( block ); 327// if ( position > fileLength ) 328// { 329// corrupted = true; 330// message = "Record " + position + " starts past EOF."; 331// } 332// else 333 { 334 ByteBuffer datalength = ByteBuffer.allocate(HEADER_SIZE_BYTES); 335 fc.read(datalength, position); 336 datalength.flip(); 337 datalen = datalength.getInt(); 338 if ( position + datalen > fileLength ) 339 { 340 corrupted = true; 341 message = "Record " + position + " exceeds file length."; 342 } 343 } 344 345 if ( corrupted ) 346 { 347 log.warn( "\n The file is corrupt: " + "\n " + message ); 348 throw new IOException( "The File Is Corrupt, need to reset" ); 349 } 350 351 ByteBuffer data = ByteBuffer.allocate(datalen); 352 fc.read(data, position + HEADER_SIZE_BYTES); 353 data.flip(); 354 355 return data.array(); 356 } 357 358 /** 359 * Add these blocks to the emptyBlock list. 360 * <p> 361 * @param blocksToFree 362 */ 363 protected void freeBlocks( int[] blocksToFree ) 364 { 365 if ( blocksToFree != null ) 366 { 367 for ( short i = 0; i < blocksToFree.length; i++ ) 368 { 369 emptyBlocks.offer( Integer.valueOf( blocksToFree[i] ) ); 370 } 371 } 372 } 373 374 /** 375 * Calculates the file offset for a particular block. 376 * <p> 377 * @param block number 378 * @return the byte offset for this block in the file as an int; may overflow 379 * @deprecated (since 2.0) use {@link #calculateByteOffsetForBlockAsLong(int)} instead 380 */ 381 @Deprecated 382 protected int calculateByteOffsetForBlock( int block ) 383 { 384 return block * blockSizeBytes; 385 } 386 387 /** 388 * Calculates the file offset for a particular block. 389 * <p> 390 * @param block number 391 * @return the byte offset for this block in the file as a long 392 * @since 2.0 393 */ 394 protected long calculateByteOffsetForBlockAsLong( int block ) 395 { 396 return (long) block * blockSizeBytes; 397 } 398 399 /** 400 * The number of blocks needed. 401 * <p> 402 * @param data 403 * @return the number of blocks needed to store the byte array 404 */ 405 protected int calculateTheNumberOfBlocksNeeded( byte[] data ) 406 { 407 int dataLength = data.length; 408 409 int oneBlock = blockSizeBytes - HEADER_SIZE_BYTES; 410 411 // takes care of 0 = HEADER_SIZE_BYTES + blockSizeBytes 412 if ( dataLength <= oneBlock ) 413 { 414 return 1; 415 } 416 417 int dividend = dataLength / oneBlock; 418 419 if ( dataLength % oneBlock != 0 ) 420 { 421 dividend++; 422 } 423 return dividend; 424 } 425 426 /** 427 * Returns the file length. 428 * <p> 429 * @return the size of the file. 430 * @throws IOException 431 */ 432 protected long length() 433 throws IOException 434 { 435 return fc.size(); 436 } 437 438 /** 439 * Closes the file. 440 * <p> 441 * @throws IOException 442 */ 443 protected void close() 444 throws IOException 445 { 446 fc.close(); 447 } 448 449 /** 450 * Resets the file. 451 * <p> 452 * @throws IOException 453 */ 454 protected synchronized void reset() 455 throws IOException 456 { 457 this.numberOfBlocks.set(0); 458 this.emptyBlocks.clear(); 459 fc.truncate(0); 460 fc.force(true); 461 } 462 463 /** 464 * @return Returns the numberOfBlocks. 465 */ 466 protected int getNumberOfBlocks() 467 { 468 return numberOfBlocks.get(); 469 } 470 471 /** 472 * @return Returns the blockSizeBytes. 473 */ 474 protected int getBlockSizeBytes() 475 { 476 return blockSizeBytes; 477 } 478 479 /** 480 * @return Returns the average size of the an element inserted. 481 */ 482 protected long getAveragePutSizeBytes() 483 { 484 long count = this.putCount.get(); 485 486 if (count == 0 ) 487 { 488 return 0; 489 } 490 return this.putBytes.get() / count; 491 } 492 493 /** 494 * @return Returns the number of empty blocks. 495 */ 496 protected int getEmptyBlocks() 497 { 498 return this.emptyBlocks.size(); 499 } 500 501 /** 502 * For debugging only. 503 * <p> 504 * @return String with details. 505 */ 506 @Override 507 public String toString() 508 { 509 StringBuilder buf = new StringBuilder(); 510 buf.append( "\nBlock Disk " ); 511 buf.append( "\n Filepath [" + filepath + "]" ); 512 buf.append( "\n NumberOfBlocks [" + this.numberOfBlocks.get() + "]" ); 513 buf.append( "\n BlockSizeBytes [" + this.blockSizeBytes + "]" ); 514 buf.append( "\n Put Bytes [" + this.putBytes + "]" ); 515 buf.append( "\n Put Count [" + this.putCount + "]" ); 516 buf.append( "\n Average Size [" + getAveragePutSizeBytes() + "]" ); 517 buf.append( "\n Empty Blocks [" + this.getEmptyBlocks() + "]" ); 518 try 519 { 520 buf.append( "\n Length [" + length() + "]" ); 521 } 522 catch ( IOException e ) 523 { 524 // swallow 525 } 526 return buf.toString(); 527 } 528 529 /** 530 * This is used for debugging. 531 * <p> 532 * @return the file path. 533 */ 534 protected String getFilePath() 535 { 536 return filepath; 537 } 538}