View Javadoc
1   package org.apache.commons.jcs3.auxiliary.disk.indexed;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.File;
23  import java.io.IOException;
24  import java.nio.ByteBuffer;
25  import java.nio.channels.FileChannel;
26  import java.nio.file.StandardOpenOption;
27  
28  import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
29  import org.apache.commons.jcs3.log.Log;
30  import org.apache.commons.jcs3.log.LogManager;
31  
32  /** Provides thread safe access to the underlying random access file. */
33  public class IndexedDisk implements AutoCloseable
34  {
35      /** The size of the header that indicates the amount of data stored in an occupied block. */
36      public static final byte HEADER_SIZE_BYTES = 4;
37  
38      /** The serializer. */
39      private final IElementSerializer elementSerializer;
40  
41      /** The logger */
42      private static final Log log = LogManager.getLog(IndexedDisk.class);
43  
44      /** The path to the log directory. */
45      private final String filepath;
46  
47      /** The data file. */
48      private final FileChannel fc;
49  
50      /**
51       * Constructor for the Disk object
52       * <p>
53       * @param file
54       * @param elementSerializer
55       * @throws IOException
56       */
57      public IndexedDisk(final File file, final IElementSerializer elementSerializer)
58          throws IOException
59      {
60          this.filepath = file.getAbsolutePath();
61          this.elementSerializer = elementSerializer;
62          this.fc = FileChannel.open(file.toPath(),
63                  StandardOpenOption.CREATE,
64                  StandardOpenOption.READ,
65                  StandardOpenOption.WRITE);
66      }
67  
68      /**
69       * This reads an object from the given starting position on the file.
70       * <p>
71       * The first four bytes of the record should tell us how long it is. The data is read into a byte
72       * array and then an object is constructed from the byte array.
73       * <p>
74       * @return Serializable
75       * @param ded
76       * @throws IOException
77       * @throws ClassNotFoundException
78       */
79      protected <T> T readObject(final IndexedDiskElementDescriptor ded)
80          throws IOException, ClassNotFoundException
81      {
82          String message = null;
83          boolean corrupted = false;
84          final long fileLength = fc.size();
85          if (ded.pos > fileLength)
86          {
87              corrupted = true;
88              message = "Record " + ded + " starts past EOF.";
89          }
90          else
91          {
92              final ByteBuffer datalength = ByteBuffer.allocate(HEADER_SIZE_BYTES);
93              fc.read(datalength, ded.pos);
94              datalength.flip();
95              final int datalen = datalength.getInt();
96              if (ded.len != datalen)
97              {
98                  corrupted = true;
99                  message = "Record " + ded + " does not match data length on disk (" + datalen + ")";
100             }
101             else if (ded.pos + ded.len > fileLength)
102             {
103                 corrupted = true;
104                 message = "Record " + ded + " exceeds file length.";
105             }
106         }
107 
108         if (corrupted)
109         {
110             log.warn("\n The file is corrupt: \n {0}", message);
111             throw new IOException("The File Is Corrupt, need to reset");
112         }
113 
114         final ByteBuffer data = ByteBuffer.allocate(ded.len);
115         fc.read(data, ded.pos + HEADER_SIZE_BYTES);
116         data.flip();
117 
118         return elementSerializer.deSerialize(data.array(), null);
119     }
120 
121     /**
122      * Moves the data stored from one position to another. The descriptor's position is updated.
123      * <p>
124      * @param ded
125      * @param newPosition
126      * @throws IOException
127      */
128     protected void move(final IndexedDiskElementDescriptor ded, final long newPosition)
129         throws IOException
130     {
131         final ByteBuffer datalength = ByteBuffer.allocate(HEADER_SIZE_BYTES);
132         fc.read(datalength, ded.pos);
133         datalength.flip();
134         final int length = datalength.getInt();
135 
136         if (length != ded.len)
137         {
138             throw new IOException("Mismatched memory and disk length (" + length + ") for " + ded);
139         }
140 
141         // TODO: more checks?
142 
143         long readPos = ded.pos;
144         long writePos = newPosition;
145 
146         // header len + data len
147         int remaining = HEADER_SIZE_BYTES + length;
148         final ByteBuffer buffer = ByteBuffer.allocate(16384);
149 
150         while (remaining > 0)
151         {
152             // chunk it
153             final int chunkSize = Math.min(remaining, buffer.capacity());
154             buffer.limit(chunkSize);
155             fc.read(buffer, readPos);
156             buffer.flip();
157             fc.write(buffer, writePos);
158             buffer.clear();
159 
160             writePos += chunkSize;
161             readPos += chunkSize;
162             remaining -= chunkSize;
163         }
164 
165         ded.pos = newPosition;
166     }
167 
168     /**
169      * Writes the given byte array to the Disk at the specified position.
170      * <p>
171      * @param data
172      * @param ded
173      * @return true if we wrote successfully
174      * @throws IOException
175      */
176     protected boolean write(final IndexedDiskElementDescriptor ded, final byte[] data)
177         throws IOException
178     {
179         final long pos = ded.pos;
180         if (log.isTraceEnabled())
181         {
182             log.trace("write> pos={0}", pos);
183             log.trace("{0} -- data.length = {1}", fc, data.length);
184         }
185 
186         if (data.length != ded.len)
187         {
188             throw new IOException("Mismatched descriptor and data lengths");
189         }
190 
191         final ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE_BYTES);
192         headerBuffer.putInt(data.length);
193         // write the header
194         headerBuffer.flip();
195         int written = fc.write(headerBuffer, pos);
196         assert written == HEADER_SIZE_BYTES;
197 
198         //write the data
199         final ByteBuffer dataBuffer = ByteBuffer.wrap(data);
200         written = fc.write(dataBuffer, pos + HEADER_SIZE_BYTES);
201 
202         return written == data.length;
203     }
204 
205     /**
206      * Serializes the object and write it out to the given position.
207      * <p>
208      * TODO: make this take a ded as well.
209      * @param obj
210      * @param pos
211      * @throws IOException
212      */
213     protected <T> void writeObject(final T obj, final long pos)
214         throws IOException
215     {
216         final byte[] data = elementSerializer.serialize(obj);
217         write(new IndexedDiskElementDescriptor(pos, data.length), data);
218     }
219 
220     /**
221      * Returns the raf length.
222      *
223      * @return the length of the file.
224      * @throws IOException If an I/O error occurs.
225      */
226     protected long length()
227         throws IOException
228     {
229         return fc.size();
230     }
231 
232     /**
233      * Closes the raf.
234      * <p>
235      * @throws IOException
236      */
237     @Override
238     public void close()
239         throws IOException
240     {
241         fc.close();
242     }
243 
244     /**
245      * Sets the raf to empty.
246      * <p>
247      * @throws IOException
248      */
249     protected synchronized void reset()
250         throws IOException
251     {
252         log.debug("Resetting Indexed File [{0}]", filepath);
253         fc.truncate(0);
254         fc.force(true);
255     }
256 
257     /**
258      * Truncates the file to a given length.
259      * <p>
260      * @param length the new length of the file
261      * @throws IOException
262      */
263     protected void truncate(final long length)
264         throws IOException
265     {
266         log.info("Truncating file [{0}] to {1}", filepath, length);
267         fc.truncate(length);
268     }
269 
270     /**
271      * This is used for debugging.
272      * <p>
273      * @return the file path.
274      */
275     protected String getFilePath()
276     {
277         return filepath;
278     }
279 
280     /**
281      * Tests if the length is 0.
282      * @return true if the if the length is 0.
283      * @throws IOException If an I/O error occurs.
284      * @since 3.1
285      */
286     protected boolean isEmpty() throws IOException
287     {
288         return length() == 0;
289     }
290 }