1 package org.apache.commons.jcs.auxiliary.disk.block;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.ByteArrayOutputStream;
23 import java.io.File;
24 import java.io.IOException;
25 import java.io.RandomAccessFile;
26 import java.io.Serializable;
27 import java.nio.ByteBuffer;
28 import java.nio.channels.FileChannel;
29 import java.util.concurrent.ConcurrentLinkedQueue;
30 import java.util.concurrent.atomic.AtomicInteger;
31 import java.util.concurrent.atomic.AtomicLong;
32
33 import org.apache.commons.jcs.engine.behavior.IElementSerializer;
34 import org.apache.commons.jcs.utils.serialization.StandardSerializer;
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37
38
39
40
41
42
43
44 public class BlockDisk
45 {
46
47 private static final Log log = LogFactory.getLog( BlockDisk.class );
48
49
50 public static final byte HEADER_SIZE_BYTES = 4;
51
52
53
54 private static final int DEFAULT_BLOCK_SIZE_BYTES = 4 * 1024;
55
56
57 private final int blockSizeBytes;
58
59
60
61
62
63 private final AtomicInteger numberOfBlocks = new AtomicInteger(0);
64
65
66 private final ConcurrentLinkedQueue<Integer> emptyBlocks = new ConcurrentLinkedQueue<Integer>();
67
68
69 private final IElementSerializer elementSerializer;
70
71
72 private final String filepath;
73
74
75 private final FileChannel fc;
76
77
78 private final AtomicLong putBytes = new AtomicLong(0);
79
80
81 private final AtomicLong putCount = new AtomicLong(0);
82
83
84
85
86
87
88
89
90 public BlockDisk( File file, IElementSerializer elementSerializer )
91 throws IOException
92 {
93 this( file, DEFAULT_BLOCK_SIZE_BYTES, elementSerializer );
94 }
95
96
97
98
99
100
101
102
103 public BlockDisk( File file, int blockSizeBytes )
104 throws IOException
105 {
106 this( file, blockSizeBytes, new StandardSerializer() );
107 }
108
109
110
111
112
113
114
115
116
117 public BlockDisk( File file, int blockSizeBytes, IElementSerializer elementSerializer )
118 throws IOException
119 {
120 this.filepath = file.getAbsolutePath();
121 RandomAccessFile raf = new RandomAccessFile( filepath, "rw" );
122 this.fc = raf.getChannel();
123 this.numberOfBlocks.set((int) Math.ceil(1f * this.fc.size() / blockSizeBytes));
124
125 if ( log.isInfoEnabled() )
126 {
127 log.info( "Constructing BlockDisk, blockSizeBytes [" + blockSizeBytes + "]" );
128 }
129
130 this.blockSizeBytes = blockSizeBytes;
131 this.elementSerializer = elementSerializer;
132 }
133
134
135
136
137
138
139
140 private int[] allocateBlocks(int numBlocksNeeded)
141 {
142 assert numBlocksNeeded >= 1;
143
144 int[] blocks = new int[numBlocksNeeded];
145
146 for (int i = 0; i < numBlocksNeeded; i++)
147 {
148 Integer emptyBlock = emptyBlocks.poll();
149 if (emptyBlock == null)
150 {
151 emptyBlock = Integer.valueOf(numberOfBlocks.getAndIncrement());
152 }
153 blocks[i] = emptyBlock.intValue();
154 }
155
156 return blocks;
157 }
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176 protected int[] write( Serializable object )
177 throws IOException
178 {
179
180 byte[] data = elementSerializer.serialize(object);
181
182 if ( log.isDebugEnabled() )
183 {
184 log.debug( "write, total pre-chunking data.length = " + data.length );
185 }
186
187 this.putBytes.addAndGet(data.length);
188 this.putCount.incrementAndGet();
189
190
191 int numBlocksNeeded = calculateTheNumberOfBlocksNeeded(data);
192
193 if ( log.isDebugEnabled() )
194 {
195 log.debug( "numBlocksNeeded = " + numBlocksNeeded );
196 }
197
198
199 int[] blocks = allocateBlocks(numBlocksNeeded);
200
201 int offset = 0;
202 final int maxChunkSize = blockSizeBytes - HEADER_SIZE_BYTES;
203 ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE_BYTES);
204
205 for (int i = 0; i < numBlocksNeeded; i++)
206 {
207 headerBuffer.clear();
208 int length = Math.min(maxChunkSize, data.length - offset);
209 headerBuffer.putInt(length);
210
211 ByteBuffer dataBuffer = ByteBuffer.wrap(data, offset, length);
212
213 long position = calculateByteOffsetForBlockAsLong(blocks[i]);
214
215 headerBuffer.flip();
216 int written = fc.write(headerBuffer, position);
217 assert written == HEADER_SIZE_BYTES;
218
219
220 written = fc.write(dataBuffer, position + HEADER_SIZE_BYTES);
221 assert written == length;
222
223 offset += length;
224 }
225
226
227
228 return blocks;
229 }
230
231
232
233
234
235
236
237
238 protected byte[][] getBlockChunks( byte[] complete, int numBlocksNeeded )
239 {
240 byte[][] chunks = new byte[numBlocksNeeded][];
241
242 if ( numBlocksNeeded == 1 )
243 {
244 chunks[0] = complete;
245 }
246 else
247 {
248 int maxChunkSize = this.blockSizeBytes - HEADER_SIZE_BYTES;
249 int totalBytes = complete.length;
250 int totalUsed = 0;
251 for ( short i = 0; i < numBlocksNeeded; i++ )
252 {
253
254
255 int chunkSize = Math.min( maxChunkSize, totalBytes - totalUsed );
256 byte[] chunk = new byte[chunkSize];
257
258
259 System.arraycopy( complete, totalUsed, chunk, 0, chunkSize );
260 chunks[i] = chunk;
261 totalUsed += chunkSize;
262 }
263 }
264
265 return chunks;
266 }
267
268
269
270
271
272
273
274
275
276 protected <T extends Serializable> T read( int[] blockNumbers )
277 throws IOException, ClassNotFoundException
278 {
279 byte[] data = null;
280
281 if ( blockNumbers.length == 1 )
282 {
283 data = readBlock( blockNumbers[0] );
284 }
285 else
286 {
287 ByteArrayOutputStream baos = new ByteArrayOutputStream(getBlockSizeBytes());
288
289 for ( short i = 0; i < blockNumbers.length; i++ )
290 {
291 byte[] chunk = readBlock( blockNumbers[i] );
292 baos.write(chunk);
293 }
294
295 data = baos.toByteArray();
296 baos.close();
297 }
298
299 if ( log.isDebugEnabled() )
300 {
301 log.debug( "read, total post combination data.length = " + data.length );
302 }
303
304 return elementSerializer.deSerialize( data, null );
305 }
306
307
308
309
310
311
312
313
314
315
316
317 private byte[] readBlock( int block )
318 throws IOException
319 {
320 int datalen = 0;
321
322 String message = null;
323 boolean corrupted = false;
324 long fileLength = fc.size();
325
326 long position = calculateByteOffsetForBlockAsLong( block );
327
328
329
330
331
332
333 {
334 ByteBuffer datalength = ByteBuffer.allocate(HEADER_SIZE_BYTES);
335 fc.read(datalength, position);
336 datalength.flip();
337 datalen = datalength.getInt();
338 if ( position + datalen > fileLength )
339 {
340 corrupted = true;
341 message = "Record " + position + " exceeds file length.";
342 }
343 }
344
345 if ( corrupted )
346 {
347 log.warn( "\n The file is corrupt: " + "\n " + message );
348 throw new IOException( "The File Is Corrupt, need to reset" );
349 }
350
351 ByteBuffer data = ByteBuffer.allocate(datalen);
352 fc.read(data, position + HEADER_SIZE_BYTES);
353 data.flip();
354
355 return data.array();
356 }
357
358
359
360
361
362
363 protected void freeBlocks( int[] blocksToFree )
364 {
365 if ( blocksToFree != null )
366 {
367 for ( short i = 0; i < blocksToFree.length; i++ )
368 {
369 emptyBlocks.offer( Integer.valueOf( blocksToFree[i] ) );
370 }
371 }
372 }
373
374
375
376
377
378
379
380
381 @Deprecated
382 protected int calculateByteOffsetForBlock( int block )
383 {
384 return block * blockSizeBytes;
385 }
386
387
388
389
390
391
392
393
394 protected long calculateByteOffsetForBlockAsLong( int block )
395 {
396 return (long) block * blockSizeBytes;
397 }
398
399
400
401
402
403
404
405 protected int calculateTheNumberOfBlocksNeeded( byte[] data )
406 {
407 int dataLength = data.length;
408
409 int oneBlock = blockSizeBytes - HEADER_SIZE_BYTES;
410
411
412 if ( dataLength <= oneBlock )
413 {
414 return 1;
415 }
416
417 int dividend = dataLength / oneBlock;
418
419 if ( dataLength % oneBlock != 0 )
420 {
421 dividend++;
422 }
423 return dividend;
424 }
425
426
427
428
429
430
431
432 protected long length()
433 throws IOException
434 {
435 return fc.size();
436 }
437
438
439
440
441
442
443 protected void close()
444 throws IOException
445 {
446 fc.close();
447 }
448
449
450
451
452
453
454 protected synchronized void reset()
455 throws IOException
456 {
457 this.numberOfBlocks.set(0);
458 this.emptyBlocks.clear();
459 fc.truncate(0);
460 fc.force(true);
461 }
462
463
464
465
466 protected int getNumberOfBlocks()
467 {
468 return numberOfBlocks.get();
469 }
470
471
472
473
474 protected int getBlockSizeBytes()
475 {
476 return blockSizeBytes;
477 }
478
479
480
481
482 protected long getAveragePutSizeBytes()
483 {
484 long count = this.putCount.get();
485
486 if (count == 0 )
487 {
488 return 0;
489 }
490 return this.putBytes.get() / count;
491 }
492
493
494
495
496 protected int getEmptyBlocks()
497 {
498 return this.emptyBlocks.size();
499 }
500
501
502
503
504
505
506 @Override
507 public String toString()
508 {
509 StringBuilder buf = new StringBuilder();
510 buf.append( "\nBlock Disk " );
511 buf.append( "\n Filepath [" + filepath + "]" );
512 buf.append( "\n NumberOfBlocks [" + this.numberOfBlocks.get() + "]" );
513 buf.append( "\n BlockSizeBytes [" + this.blockSizeBytes + "]" );
514 buf.append( "\n Put Bytes [" + this.putBytes + "]" );
515 buf.append( "\n Put Count [" + this.putCount + "]" );
516 buf.append( "\n Average Size [" + getAveragePutSizeBytes() + "]" );
517 buf.append( "\n Empty Blocks [" + this.getEmptyBlocks() + "]" );
518 try
519 {
520 buf.append( "\n Length [" + length() + "]" );
521 }
522 catch ( IOException e )
523 {
524
525 }
526 return buf.toString();
527 }
528
529
530
531
532
533
534 protected String getFilePath()
535 {
536 return filepath;
537 }
538 }