1 package org.apache.jcs.auxiliary.disk.block;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.ByteArrayOutputStream;
23 import java.io.File;
24 import java.io.FileNotFoundException;
25 import java.io.IOException;
26 import java.io.RandomAccessFile;
27 import java.io.Serializable;
28 import java.nio.ByteBuffer;
29 import java.nio.channels.FileChannel;
30 import java.util.concurrent.atomic.AtomicInteger;
31 import java.util.concurrent.atomic.AtomicLong;
32
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.jcs.engine.behavior.IElementSerializer;
36 import org.apache.jcs.utils.serialization.StandardSerializer;
37 import org.apache.jcs.utils.struct.SingleLinkedList;
38
39
40
41
42
43
44
45 public class BlockDisk
46 {
47
48 private static final Log log = LogFactory.getLog( BlockDisk.class );
49
50
51 public static final byte HEADER_SIZE_BYTES = 4;
52
53
54 private static final int DEFAULT_BLOCK_SIZE_BYTES = 4 * 1024;
55
56
57 private int blockSizeBytes;
58
59
60
61
62
63 private final AtomicInteger numberOfBlocks = new AtomicInteger(0);
64
65
66 private final SingleLinkedList<Integer> emptyBlocks = new SingleLinkedList<Integer>();
67
68
69 protected IElementSerializer elementSerializer;
70
71
72 private final String filepath;
73
74
75 private final FileChannel fc;
76
77
78 private final AtomicLong putBytes = new AtomicLong(0);
79
80
81 private final AtomicLong putCount = new AtomicLong(0);
82
83
84
85
86
87
88
89
90 public BlockDisk( File file, IElementSerializer elementSerializer )
91 throws FileNotFoundException
92 {
93 this( file, DEFAULT_BLOCK_SIZE_BYTES, elementSerializer );
94 }
95
96
97
98
99
100
101
102
103 public BlockDisk( File file, int blockSizeBytes )
104 throws FileNotFoundException
105 {
106 this( file, blockSizeBytes, new StandardSerializer() );
107 }
108
109
110
111
112
113
114
115
116
117 public BlockDisk( File file, int blockSizeBytes, IElementSerializer elementSerializer )
118 throws FileNotFoundException
119 {
120 this.filepath = file.getAbsolutePath();
121 RandomAccessFile raf = new RandomAccessFile( filepath, "rw" );
122 this.fc = raf.getChannel();
123
124 if ( log.isInfoEnabled() )
125 {
126 log.info( "Constructing BlockDisk, blockSizeBytes [" + blockSizeBytes + "]" );
127 }
128
129 this.blockSizeBytes = blockSizeBytes;
130 this.elementSerializer = elementSerializer;
131 }
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150 protected int[] write( Serializable object )
151 throws IOException
152 {
153
154 byte[] data = elementSerializer.serialize( object );
155
156 if ( log.isDebugEnabled() )
157 {
158 log.debug( "write, total pre-chunking data.length = " + data.length );
159 }
160
161 this.putBytes.addAndGet(data.length);
162 this.putCount.incrementAndGet();
163
164
165 int numBlocksNeeded = calculateTheNumberOfBlocksNeeded( data );
166 if ( log.isDebugEnabled() )
167 {
168 log.debug( "numBlocksNeeded = " + numBlocksNeeded );
169 }
170
171 int[] blocks = new int[numBlocksNeeded];
172
173
174 for ( int i = 0; i < numBlocksNeeded; i++ )
175 {
176 Integer emptyBlock = emptyBlocks.takeFirst();
177 if ( emptyBlock != null )
178 {
179 blocks[i] = emptyBlock.intValue();
180 }
181 else
182 {
183 blocks[i] = this.numberOfBlocks.getAndIncrement();
184 }
185 }
186
187
188 byte[][] chunks = getBlockChunks( data, numBlocksNeeded );
189
190
191 for ( int i = 0; i < numBlocksNeeded; i++ )
192 {
193 int position = calculateByteOffsetForBlock( blocks[i] );
194 write( position, chunks[i] );
195 }
196
197 return blocks;
198 }
199
200
201
202
203
204
205
206
207 protected byte[][] getBlockChunks( byte[] complete, int numBlocksNeeded )
208 {
209 byte[][] chunks = new byte[numBlocksNeeded][];
210
211 if ( numBlocksNeeded == 1 )
212 {
213 chunks[0] = complete;
214 }
215 else
216 {
217 int maxChunkSize = this.blockSizeBytes - HEADER_SIZE_BYTES;
218 int totalBytes = complete.length;
219 int totalUsed = 0;
220 for ( short i = 0; i < numBlocksNeeded; i++ )
221 {
222
223
224 int chunkSize = Math.min( maxChunkSize, totalBytes - totalUsed );
225 byte[] chunk = new byte[chunkSize];
226
227
228 System.arraycopy( complete, totalUsed, chunk, 0, chunkSize );
229 chunks[i] = chunk;
230 totalUsed += chunkSize;
231 }
232 }
233
234 return chunks;
235 }
236
237
238
239
240
241
242
243
244
245 private boolean write( long position, byte[] data )
246 throws IOException
247 {
248 ByteBuffer buffer = ByteBuffer.allocate(HEADER_SIZE_BYTES + data.length);
249 buffer.putInt(data.length);
250 buffer.put(data);
251 buffer.flip();
252 int written = fc.write(buffer, position);
253 fc.force(true);
254
255 return written == data.length;
256 }
257
258
259
260
261
262
263
264
265
266 protected <T extends Serializable> T read( int[] blockNumbers )
267 throws IOException, ClassNotFoundException
268 {
269 byte[] data = null;
270
271 if ( blockNumbers.length == 1 )
272 {
273 data = readBlock( blockNumbers[0] );
274 }
275 else
276 {
277 ByteArrayOutputStream baos = new ByteArrayOutputStream(getBlockSizeBytes());
278
279 for ( short i = 0; i < blockNumbers.length; i++ )
280 {
281 byte[] chunk = readBlock( blockNumbers[i] );
282 baos.write(chunk);
283 }
284
285 data = baos.toByteArray();
286 baos.close();
287 }
288
289 if ( log.isDebugEnabled() )
290 {
291 log.debug( "read, total post combination data.length = " + data.length );
292 }
293
294 return elementSerializer.deSerialize( data );
295 }
296
297
298
299
300
301
302
303
304
305
306
307 private byte[] readBlock( int block )
308 throws IOException
309 {
310 int datalen = 0;
311
312 String message = null;
313 boolean corrupted = false;
314 long fileLength = fc.size();
315
316 int position = calculateByteOffsetForBlock( block );
317
318
319
320
321
322
323 {
324 ByteBuffer datalength = ByteBuffer.allocate(HEADER_SIZE_BYTES);
325 fc.read(datalength, position);
326 datalength.flip();
327 datalen = datalength.getInt();
328 if ( position + datalen > fileLength )
329 {
330 corrupted = true;
331 message = "Record " + position + " exceeds file length.";
332 }
333 }
334
335 if ( corrupted )
336 {
337 log.warn( "\n The file is corrupt: " + "\n " + message );
338 throw new IOException( "The File Is Corrupt, need to reset" );
339 }
340
341 ByteBuffer data = ByteBuffer.allocate(datalen);
342 fc.read(data, position + HEADER_SIZE_BYTES);
343 data.flip();
344
345 return data.array();
346 }
347
348
349
350
351
352
353 protected void freeBlocks( int[] blocksToFree )
354 {
355 if ( blocksToFree != null )
356 {
357 for ( short i = 0; i < blocksToFree.length; i++ )
358 {
359 emptyBlocks.addLast( Integer.valueOf( blocksToFree[i] ) );
360 }
361 }
362 }
363
364
365
366
367
368
369
370 protected int calculateByteOffsetForBlock( int block )
371 {
372 return block * blockSizeBytes;
373 }
374
375
376
377
378
379
380
381 protected int calculateTheNumberOfBlocksNeeded( byte[] data )
382 {
383 int dataLength = data.length;
384
385 int oneBlock = blockSizeBytes - HEADER_SIZE_BYTES;
386
387
388 if ( dataLength <= oneBlock )
389 {
390 return 1;
391 }
392
393 int dividend = dataLength / oneBlock;
394
395 if ( dataLength % oneBlock != 0 )
396 {
397 dividend++;
398 }
399 return dividend;
400 }
401
402
403
404
405
406
407
408 protected long length()
409 throws IOException
410 {
411 return fc.size();
412 }
413
414
415
416
417
418
419 protected void close()
420 throws IOException
421 {
422 fc.close();
423 }
424
425
426
427
428
429
430 protected synchronized void reset()
431 throws IOException
432 {
433 this.numberOfBlocks.set(0);
434 this.emptyBlocks.clear();
435 fc.truncate(0);
436 fc.force(true);
437 }
438
439
440
441
442 protected int getNumberOfBlocks()
443 {
444 return numberOfBlocks.get();
445 }
446
447
448
449
450 protected int getBlockSizeBytes()
451 {
452 return blockSizeBytes;
453 }
454
455
456
457
458 protected long getAveragePutSizeBytes()
459 {
460 long count = this.putCount.get();
461
462 if (count == 0 )
463 {
464 return 0;
465 }
466 return this.putBytes.get() / count;
467 }
468
469
470
471
472 protected int getEmptyBlocks()
473 {
474 return this.emptyBlocks.size();
475 }
476
477
478
479
480
481
482 @Override
483 public String toString()
484 {
485 StringBuffer buf = new StringBuffer();
486 buf.append( "\nBlock Disk " );
487 buf.append( "\n Filepath [" + filepath + "]" );
488 buf.append( "\n NumberOfBlocks [" + this.numberOfBlocks.get() + "]" );
489 buf.append( "\n BlockSizeBytes [" + this.blockSizeBytes + "]" );
490 buf.append( "\n Put Bytes [" + this.putBytes + "]" );
491 buf.append( "\n Put Count [" + this.putCount + "]" );
492 buf.append( "\n Average Size [" + getAveragePutSizeBytes() + "]" );
493 buf.append( "\n Empty Blocks [" + this.getEmptyBlocks() + "]" );
494 try
495 {
496 buf.append( "\n Length [" + length() + "]" );
497 }
498 catch ( IOException e )
499 {
500
501 }
502 return buf.toString();
503 }
504
505
506
507
508
509
510 protected String getFilePath()
511 {
512 return filepath;
513 }
514 }