001package org.apache.commons.jcs3.engine.behavior;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *   http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import java.io.EOFException;
023import java.io.IOException;
024import java.io.InputStream;
025import java.io.OutputStream;
026import java.nio.ByteBuffer;
027import java.nio.channels.AsynchronousByteChannel;
028import java.nio.channels.ReadableByteChannel;
029import java.nio.channels.WritableByteChannel;
030import java.util.concurrent.ExecutionException;
031import java.util.concurrent.Future;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.TimeoutException;
034
035/**
036 * Defines the behavior for cache element serializers. This layer of abstraction allows us to plug
037 * in different serialization mechanisms, such as a compressing standard serializer.
038 */
039public interface IElementSerializer
040{
041    /**
042     * Turns an object into a byte array.
043     *
044     * @param <T> the type of the object
045     * @param obj the object to serialize
046     * @return byte[] a byte array containing the serialized object
047     * @throws IOException if serialization fails
048     */
049    <T> byte[] serialize( T obj )
050        throws IOException;
051
052    /**
053     * Turns a byte array into an object.
054     *
055     * @param bytes data bytes
056     * @param loader class loader to use
057     * @return Object
058     * @throws IOException if de-serialization fails
059     * @throws ClassNotFoundException thrown if we don't know the object.
060     */
061    <T> T deSerialize( byte[] bytes, ClassLoader loader )
062        throws IOException, ClassNotFoundException;
063
064    /**
065     * Convenience method to write serialized object into a stream.
066     * The stream data will be prepended with a four-byte length prefix.
067     *
068     * @param <T> the type of the object
069     * @param obj the object to serialize
070     * @param os the output stream
071     * @return the number of bytes written
072     * @throws IOException if serialization or writing fails
073     * @since 3.1
074     */
075    default <T> int serializeTo(T obj, OutputStream os)
076        throws IOException
077    {
078        final byte[] serialized = serialize(obj);
079        final ByteBuffer buffer = ByteBuffer.allocate(4 + serialized.length);
080        buffer.putInt(serialized.length);
081        buffer.put(serialized);
082        buffer.flip();
083
084        os.write(buffer.array());
085        return buffer.capacity();
086    }
087
088    /**
089     * Convenience method to write serialized object into a channel.
090     * The stream data will be prepended with a four-byte length prefix.
091     *
092     * @param <T> the type of the object
093     * @param obj the object to serialize
094     * @param oc the output channel
095     * @return the number of bytes written
096     * @throws IOException if serialization or writing fails
097     * @since 3.1
098     */
099    default <T> int serializeTo(T obj, WritableByteChannel oc)
100        throws IOException
101    {
102        final byte[] serialized = serialize(obj);
103        final ByteBuffer buffer = ByteBuffer.allocate(4 + serialized.length);
104        buffer.putInt(serialized.length);
105        buffer.put(serialized);
106        buffer.flip();
107
108        int count = 0;
109        while (buffer.hasRemaining())
110        {
111            count += oc.write(buffer);
112        }
113        return count;
114    }
115
116    /**
117     * Convenience method to write serialized object into an
118     * asynchronous channel.
119     * The stream data will be prepended with a four-byte length prefix.
120     *
121     * @param <T> the type of the object
122     * @param obj the object to serialize
123     * @param oc the output channel
124     * @param writeTimeoutMs the write timeout im milliseconds
125     * @return the number of bytes written
126     * @throws IOException if serialization or writing fails
127     * @since 3.1
128     */
129    default <T> int serializeTo(T obj, AsynchronousByteChannel oc, int writeTimeoutMs)
130        throws IOException
131    {
132        final byte[] serialized = serialize(obj);
133        final ByteBuffer buffer = ByteBuffer.allocate(4 + serialized.length);
134        buffer.putInt(serialized.length);
135        buffer.put(serialized);
136        buffer.flip();
137
138        int count = 0;
139        while (buffer.hasRemaining())
140        {
141            Future<Integer> bytesWritten = oc.write(buffer);
142            try
143            {
144                count += bytesWritten.get(writeTimeoutMs, TimeUnit.MILLISECONDS);
145            }
146            catch (InterruptedException | ExecutionException | TimeoutException e)
147            {
148                throw new IOException("Write timeout exceeded " + writeTimeoutMs, e);
149            }
150        }
151
152        return count;
153    }
154
155    /**
156     * Convenience method to read serialized object from a stream.
157     * The method expects to find a four-byte length prefix in the
158     * stream data.
159     *
160     * @param <T> the type of the object
161     * @param is the input stream
162     * @param loader class loader to use
163     * @throws IOException if serialization or reading fails
164     * @throws ClassNotFoundException thrown if we don't know the object.
165     * @since 3.1
166     */
167    default <T> T deSerializeFrom(InputStream is, ClassLoader loader)
168        throws IOException, ClassNotFoundException
169    {
170        final byte[] bufferSize = new byte[4];
171        int read = is.read(bufferSize);
172        if (read < 0)
173        {
174            throw new EOFException("End of stream reached");
175        }
176        assert read == bufferSize.length;
177        ByteBuffer size = ByteBuffer.wrap(bufferSize);
178
179        byte[] serialized = new byte[size.getInt()];
180        read = is.read(serialized);
181        assert read == serialized.length;
182
183        return deSerialize(serialized, loader);
184    }
185
186    /**
187     * Convenience method to read serialized object from a channel.
188     * The method expects to find a four-byte length prefix in the
189     * stream data.
190     *
191     * @param <T> the type of the object
192     * @param ic the input channel
193     * @param loader class loader to use
194     * @throws IOException if serialization or reading fails
195     * @throws ClassNotFoundException thrown if we don't know the object.
196     * @since 3.1
197     */
198    default <T> T deSerializeFrom(ReadableByteChannel ic, ClassLoader loader)
199        throws IOException, ClassNotFoundException
200    {
201        final ByteBuffer bufferSize = ByteBuffer.allocate(4);
202        int read = ic.read(bufferSize);
203        if (read < 0)
204        {
205            throw new EOFException("End of stream reached (length)");
206        }
207        assert read == bufferSize.capacity();
208        bufferSize.flip();
209
210        final ByteBuffer serialized = ByteBuffer.allocate(bufferSize.getInt());
211        while (serialized.remaining() > 0)
212        {
213            read = ic.read(serialized);
214            if (read < 0)
215            {
216                throw new EOFException("End of stream reached (object)");
217            }
218        }
219        serialized.flip();
220
221        return deSerialize(serialized.array(), loader);
222    }
223
224    /**
225     * Convenience method to read serialized object from an
226     * asynchronous channel.
227     * The method expects to find a four-byte length prefix in the
228     * stream data.
229     *
230     * @param <T> the type of the object
231     * @param ic the input channel
232     * @param readTimeoutMs the read timeout in milliseconds
233     * @param loader class loader to use
234     * @throws IOException if serialization or reading fails
235     * @throws ClassNotFoundException thrown if we don't know the object.
236     * @since 3.1
237     */
238    default <T> T deSerializeFrom(AsynchronousByteChannel ic, int readTimeoutMs, ClassLoader loader)
239        throws IOException, ClassNotFoundException
240    {
241        final ByteBuffer bufferSize = ByteBuffer.allocate(4);
242        Future<Integer> readFuture = ic.read(bufferSize);
243
244        try
245        {
246            int read = readFuture.get(readTimeoutMs, TimeUnit.MILLISECONDS);
247            if (read < 0)
248            {
249                throw new EOFException("End of stream reached (length)");
250            }
251            assert read == bufferSize.capacity();
252        }
253        catch (InterruptedException | ExecutionException | TimeoutException e)
254        {
255            throw new IOException("Read timeout exceeded (length)" + readTimeoutMs, e);
256        }
257
258        bufferSize.flip();
259
260        final ByteBuffer serialized = ByteBuffer.allocate(bufferSize.getInt());
261        while (serialized.remaining() > 0)
262        {
263            readFuture = ic.read(serialized);
264            try
265            {
266                int read = readFuture.get(readTimeoutMs, TimeUnit.MILLISECONDS);
267                if (read < 0)
268                {
269                    throw new EOFException("End of stream reached (object)");
270                }
271            }
272            catch (InterruptedException | ExecutionException | TimeoutException e)
273            {
274                throw new IOException("Read timeout exceeded (object)" + readTimeoutMs, e);
275            }
276        }
277
278        serialized.flip();
279
280        return deSerialize(serialized.array(), loader);
281    }
282}