001package org.apache.commons.jcs3.auxiliary.lateral.socket.tcp;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *   http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import java.io.IOException;
023import java.net.InetSocketAddress;
024import java.nio.channels.AsynchronousSocketChannel;
025import java.util.concurrent.ExecutionException;
026import java.util.concurrent.Future;
027import java.util.concurrent.TimeUnit;
028import java.util.concurrent.TimeoutException;
029import java.util.concurrent.locks.Lock;
030import java.util.concurrent.locks.ReentrantLock;
031
032import org.apache.commons.jcs3.auxiliary.lateral.LateralElementDescriptor;
033import org.apache.commons.jcs3.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
034import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
035import org.apache.commons.jcs3.log.Log;
036import org.apache.commons.jcs3.log.LogManager;
037import org.apache.commons.jcs3.utils.serialization.StandardSerializer;
038
039/**
040 * This class is based on the log4j SocketAppender class. I'm using a different repair structure, so
041 * it is significantly different.
042 */
043public class LateralTCPSender
044{
045    /** The logger */
046    private static final Log log = LogManager.getLog( LateralTCPSender.class );
047
048    /** Config */
049    private final int socketOpenTimeOut;
050    private final int socketSoTimeOut;
051
052    /** The serializer. */
053    private final IElementSerializer serializer;
054
055    /** The client connection with the server. */
056    private AsynchronousSocketChannel client;
057
058    /** how many messages sent */
059    private int sendCnt;
060
061    /** Use to synchronize multiple threads that may be trying to get. */
062    private final Lock lock = new ReentrantLock(true);
063
064    /**
065     * Constructor for the LateralTCPSender object.
066     * <p>
067     * @param lca
068     * @throws IOException
069     * @deprecated Specify serializer
070     */
071    @Deprecated
072    public LateralTCPSender( final ITCPLateralCacheAttributes lca )
073        throws IOException
074    {
075        this(lca, new StandardSerializer());
076    }
077
078    /**
079     * Constructor for the LateralTCPSender object.
080     * <p>
081     * @param lca the configuration object
082     * @param serializer the serializer to use when sending
083     * @throws IOException
084     * @since 3.1
085     */
086    public LateralTCPSender( final ITCPLateralCacheAttributes lca, final IElementSerializer serializer )
087        throws IOException
088    {
089        this.socketOpenTimeOut = lca.getOpenTimeOut();
090        this.socketSoTimeOut = lca.getSocketTimeOut();
091
092        this.serializer = serializer;
093
094        final String p1 = lca.getTcpServer();
095        if ( p1 == null )
096        {
097            throw new IOException( "Invalid server (null)" );
098        }
099
100        final int colonPosition = p1.lastIndexOf(':');
101
102        if ( colonPosition < 0 )
103        {
104            throw new IOException( "Invalid address [" + p1 + "]" );
105        }
106
107        final String h2 = p1.substring( 0, colonPosition );
108        final int po = Integer.parseInt( p1.substring( colonPosition + 1 ) );
109        log.debug( "h2 = {0}, po = {1}", h2, po );
110
111        if ( h2.isEmpty() )
112        {
113            throw new IOException( "Cannot connect to invalid address [" + h2 + ":" + po + "]" );
114        }
115
116        init( h2, po );
117    }
118
119    /**
120     * Creates a connection to a TCP server.
121     * <p>
122     * @param host
123     * @param port
124     * @throws IOException
125     */
126    protected void init( final String host, final int port )
127        throws IOException
128    {
129        log.info( "Attempting connection to [{0}:{1}]", host, port );
130
131        try
132        {
133            client = AsynchronousSocketChannel.open();
134            InetSocketAddress hostAddress = new InetSocketAddress(host, port);
135            Future<Void> future = client.connect(hostAddress);
136
137            future.get(this.socketOpenTimeOut, TimeUnit.MILLISECONDS);
138        }
139        catch (final IOException | InterruptedException | ExecutionException | TimeoutException ioe)
140        {
141            throw new IOException( "Cannot connect to " + host + ":" + port, ioe );
142        }
143    }
144
145    /**
146     * Sends commands to the lateral cache listener.
147     * <p>
148     * @param led
149     * @throws IOException
150     */
151    public <K, V> void send( final LateralElementDescriptor<K, V> led )
152        throws IOException
153    {
154        sendCnt++;
155        if ( log.isInfoEnabled() && sendCnt % 100 == 0 )
156        {
157            log.info( "Send Count {0} = {1}", client.getRemoteAddress(), sendCnt );
158        }
159
160        log.debug( "sending LateralElementDescriptor" );
161
162        if ( led == null )
163        {
164            return;
165        }
166
167        lock.lock();
168        try
169        {
170            serializer.serializeTo(led, client, socketSoTimeOut);
171        }
172        finally
173        {
174            lock.unlock();
175        }
176    }
177
178    /**
179     * Sends commands to the lateral cache listener and gets a response. I'm afraid that we could
180     * get into a pretty bad blocking situation here. This needs work. I just wanted to get some
181     * form of get working. However, get is not recommended for performance reasons. If you have 10
182     * laterals, then you have to make 10 failed gets to find out none of the caches have the item.
183     * <p>
184     * @param led
185     * @return ICacheElement
186     * @throws IOException
187     */
188    public <K, V> Object sendAndReceive( final LateralElementDescriptor<K, V> led )
189        throws IOException
190    {
191        if ( led == null )
192        {
193            return null;
194        }
195
196        // Synchronized to insure that the get requests to server from this
197        // sender and the responses are processed in order, else you could
198        // return the wrong item from the cache.
199        // This is a big block of code. May need to re-think this strategy.
200        // This may not be necessary.
201        // Normal puts, etc to laterals do not have to be synchronized.
202        Object response = null;
203
204        lock.lock();
205        try
206        {
207            // write object to listener
208            send(led);
209            response = serializer.deSerializeFrom(client, socketSoTimeOut, null);
210        }
211        catch ( final IOException | ClassNotFoundException ioe )
212        {
213            final String message = "Could not open channel to " +
214                client.getRemoteAddress() + " SoTimeout [" + socketSoTimeOut +
215                "] Connected [" + client.isOpen() + "]";
216            log.error( message, ioe );
217            throw new IOException(message, ioe);
218        }
219        finally
220        {
221            lock.unlock();
222        }
223
224        return response;
225    }
226
227    /**
228     * Closes connection used by all LateralTCPSenders for this lateral connection. Dispose request
229     * should come into the facade and be sent to all lateral cache services. The lateral cache
230     * service will then call this method.
231     * <p>
232     * @throws IOException
233     */
234    public void dispose()
235        throws IOException
236    {
237        log.info( "Dispose called" );
238        client.close();
239    }
240}