1 package org.apache.jcs.auxiliary.disk.block;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.ByteArrayOutputStream;
23 import java.io.File;
24 import java.io.FileNotFoundException;
25 import java.io.IOException;
26 import java.io.RandomAccessFile;
27 import java.io.Serializable;
28 import java.nio.ByteBuffer;
29 import java.nio.channels.FileChannel;
30 import java.util.concurrent.atomic.AtomicInteger;
31 import java.util.concurrent.atomic.AtomicLong;
32
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.jcs.engine.behavior.IElementSerializer;
36 import org.apache.jcs.utils.serialization.StandardSerializer;
37 import org.apache.jcs.utils.struct.SingleLinkedList;
38
39
40
41
42
43
44
45 public class BlockDisk
46 {
47
48 private static final Log log = LogFactory.getLog( BlockDisk.class );
49
50
51 public static final byte HEADER_SIZE_BYTES = 4;
52
53
54 private static final int DEFAULT_BLOCK_SIZE_BYTES = 4 * 1024;
55
56
57 private int blockSizeBytes = DEFAULT_BLOCK_SIZE_BYTES;
58
59
60
61
62
63 private final AtomicInteger numberOfBlocks = new AtomicInteger(0);
64
65
66 private final SingleLinkedList<Integer> emptyBlocks = new SingleLinkedList<Integer>();
67
68
69 protected IElementSerializer elementSerializer = new StandardSerializer();
70
71
72 private final String filepath;
73
74
75 private final FileChannel fc;
76
77
78 private final AtomicLong putBytes = new AtomicLong(0);
79
80
81 private final AtomicLong putCount = new AtomicLong(0);
82
83
84
85
86
87
88
89
90 public BlockDisk( File file, IElementSerializer elementSerializer )
91 throws FileNotFoundException
92 {
93 this( file, DEFAULT_BLOCK_SIZE_BYTES, elementSerializer );
94 }
95
96
97
98
99
100
101
102
103 public BlockDisk( File file, int blockSizeBytes )
104 throws FileNotFoundException
105 {
106 this( file, blockSizeBytes, new StandardSerializer() );
107 }
108
109
110
111
112
113
114
115
116
117 public BlockDisk( File file, int blockSizeBytes, IElementSerializer elementSerializer )
118 throws FileNotFoundException
119 {
120 this.filepath = file.getAbsolutePath();
121 RandomAccessFile raf = new RandomAccessFile( filepath, "rw" );
122 this.fc = raf.getChannel();
123
124 if ( log.isInfoEnabled() )
125 {
126 log.info( "Constructing BlockDisk, blockSizeBytes [" + blockSizeBytes + "]" );
127 }
128 this.blockSizeBytes = blockSizeBytes;
129 this.elementSerializer = elementSerializer;
130 }
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149 protected int[] write( Serializable object )
150 throws IOException
151 {
152
153 byte[] data = elementSerializer.serialize( object );
154
155 if ( log.isDebugEnabled() )
156 {
157 log.debug( "write, total pre-chunking data.length = " + data.length );
158 }
159
160 this.putBytes.addAndGet(data.length);
161 this.putCount.incrementAndGet();
162
163
164 int numBlocksNeeded = calculateTheNumberOfBlocksNeeded( data );
165 if ( log.isDebugEnabled() )
166 {
167 log.debug( "numBlocksNeeded = " + numBlocksNeeded );
168 }
169
170 int[] blocks = new int[numBlocksNeeded];
171
172
173 for ( int i = 0; i < numBlocksNeeded; i++ )
174 {
175 Integer emptyBlock = emptyBlocks.takeFirst();
176 if ( emptyBlock != null )
177 {
178 blocks[i] = emptyBlock.intValue();
179 }
180 else
181 {
182 blocks[i] = this.numberOfBlocks.getAndIncrement();
183 }
184 }
185
186
187 byte[][] chunks = getBlockChunks( data, numBlocksNeeded );
188
189
190 for ( int i = 0; i < numBlocksNeeded; i++ )
191 {
192 int position = calculateByteOffsetForBlock( blocks[i] );
193 write( position, chunks[i] );
194 }
195
196 return blocks;
197 }
198
199
200
201
202
203
204
205
206 protected byte[][] getBlockChunks( byte[] complete, int numBlocksNeeded )
207 {
208 byte[][] chunks = new byte[numBlocksNeeded][];
209
210 if ( numBlocksNeeded == 1 )
211 {
212 chunks[0] = complete;
213 }
214 else
215 {
216 int maxChunkSize = this.blockSizeBytes - HEADER_SIZE_BYTES;
217 int totalBytes = complete.length;
218 int totalUsed = 0;
219 for ( short i = 0; i < numBlocksNeeded; i++ )
220 {
221
222
223 int chunkSize = Math.min( maxChunkSize, totalBytes - totalUsed );
224 byte[] chunk = new byte[chunkSize];
225
226
227 System.arraycopy( complete, totalUsed, chunk, 0, chunkSize );
228 chunks[i] = chunk;
229 totalUsed += chunkSize;
230 }
231 }
232
233 return chunks;
234 }
235
236
237
238
239
240
241
242
243
244 private boolean write( long position, byte[] data )
245 throws IOException
246 {
247 ByteBuffer buffer = ByteBuffer.allocate(HEADER_SIZE_BYTES + data.length);
248 buffer.putInt(data.length);
249 buffer.put(data);
250 buffer.flip();
251 int written = fc.write(buffer, position);
252
253 return written == data.length;
254 }
255
256
257
258
259
260
261
262
263
264 protected Serializable read( int[] blockNumbers )
265 throws IOException, ClassNotFoundException
266 {
267 byte[] data = null;
268
269 if ( blockNumbers.length == 1 )
270 {
271 data = readBlock( blockNumbers[0] );
272 }
273 else
274 {
275 ByteArrayOutputStream baos = new ByteArrayOutputStream(getBlockSizeBytes());
276
277 for ( short i = 0; i < blockNumbers.length; i++ )
278 {
279 byte[] chunk = readBlock( blockNumbers[i] );
280 baos.write(chunk);
281 }
282
283 data = baos.toByteArray();
284 baos.close();
285 }
286
287 if ( log.isDebugEnabled() )
288 {
289 log.debug( "read, total post combination data.length = " + data.length );
290 }
291
292 return (Serializable) elementSerializer.deSerialize( data );
293 }
294
295
296
297
298
299
300
301
302
303
304
305 private byte[] readBlock( int block )
306 throws IOException
307 {
308 int datalen = 0;
309
310 String message = null;
311 boolean corrupted = false;
312 long fileLength = fc.size();
313
314 int position = calculateByteOffsetForBlock( block );
315 if ( position > fileLength )
316 {
317 corrupted = true;
318 message = "Record " + position + " starts past EOF.";
319 }
320 else
321 {
322 ByteBuffer datalength = ByteBuffer.allocate(HEADER_SIZE_BYTES);
323 fc.read(datalength, position);
324 datalength.flip();
325 datalen = datalength.getInt();
326 if ( position + datalen > fileLength )
327 {
328 corrupted = true;
329 message = "Record " + position + " exceeds file length.";
330 }
331 }
332
333 if ( corrupted )
334 {
335 log.warn( "\n The file is corrupt: " + "\n " + message );
336 throw new IOException( "The File Is Corrupt, need to reset" );
337 }
338
339 ByteBuffer data = ByteBuffer.allocate(datalen);
340 fc.read(data, position + HEADER_SIZE_BYTES);
341 data.flip();
342
343 return data.array();
344 }
345
346
347
348
349
350
351 protected void freeBlocks( int[] blocksToFree )
352 {
353 if ( blocksToFree != null )
354 {
355 for ( short i = 0; i < blocksToFree.length; i++ )
356 {
357 emptyBlocks.addLast( Integer.valueOf( blocksToFree[i] ) );
358 }
359 }
360 }
361
362
363
364
365
366
367
368 protected int calculateByteOffsetForBlock( int block )
369 {
370 return block * blockSizeBytes;
371 }
372
373
374
375
376
377
378
379 protected int calculateTheNumberOfBlocksNeeded( byte[] data )
380 {
381 int dataLength = data.length;
382
383 int oneBlock = blockSizeBytes - HEADER_SIZE_BYTES;
384
385
386 if ( dataLength <= oneBlock )
387 {
388 return 1;
389 }
390
391 int dividend = dataLength / oneBlock;
392
393 if ( dataLength % oneBlock != 0 )
394 {
395 dividend++;
396 }
397 return dividend;
398 }
399
400
401
402
403
404
405
406 protected long length()
407 throws IOException
408 {
409 return fc.size();
410 }
411
412
413
414
415
416
417 protected void close()
418 throws IOException
419 {
420 fc.close();
421 }
422
423
424
425
426
427
428 protected void reset()
429 throws IOException
430 {
431 this.numberOfBlocks.set(0);
432 this.emptyBlocks.clear();
433 fc.truncate(0);
434 fc.force(true);
435 }
436
437
438
439
440 protected int getNumberOfBlocks()
441 {
442 return numberOfBlocks.get();
443 }
444
445
446
447
448 protected int getBlockSizeBytes()
449 {
450 return blockSizeBytes;
451 }
452
453
454
455
456 protected long getAveragePutSizeBytes()
457 {
458 long count = this.putCount.get();
459
460 if (count == 0 )
461 {
462 return 0;
463 }
464 return this.putBytes.get() / count;
465 }
466
467
468
469
470 protected int getEmptyBlocks()
471 {
472 return this.emptyBlocks.size();
473 }
474
475
476
477
478
479
480 @Override
481 public String toString()
482 {
483 StringBuffer buf = new StringBuffer();
484 buf.append( "\nBlock Disk " );
485 buf.append( "\n Filepath [" + filepath + "]" );
486 buf.append( "\n NumberOfBlocks [" + this.numberOfBlocks.get() + "]" );
487 buf.append( "\n BlockSizeBytes [" + this.blockSizeBytes + "]" );
488 buf.append( "\n Put Bytes [" + this.putBytes + "]" );
489 buf.append( "\n Put Count [" + this.putCount + "]" );
490 buf.append( "\n Average Size [" + getAveragePutSizeBytes() + "]" );
491 buf.append( "\n Empty Blocks [" + this.getEmptyBlocks() + "]" );
492 try
493 {
494 buf.append( "\n Length [" + length() + "]" );
495 }
496 catch ( IOException e )
497 {
498
499 }
500 return buf.toString();
501 }
502
503
504
505
506
507
508 protected String getFilePath()
509 {
510 return filepath;
511 }
512 }