View Javadoc
1   package org.apache.commons.jcs3.auxiliary.lateral.socket.tcp;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.nio.channels.AsynchronousSocketChannel;
25  import java.util.concurrent.ExecutionException;
26  import java.util.concurrent.Future;
27  import java.util.concurrent.TimeUnit;
28  import java.util.concurrent.TimeoutException;
29  import java.util.concurrent.locks.Lock;
30  import java.util.concurrent.locks.ReentrantLock;
31  
32  import org.apache.commons.jcs3.auxiliary.lateral.LateralElementDescriptor;
33  import org.apache.commons.jcs3.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
34  import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
35  import org.apache.commons.jcs3.log.Log;
36  import org.apache.commons.jcs3.log.LogManager;
37  import org.apache.commons.jcs3.utils.serialization.StandardSerializer;
38  
39  /**
40   * This class is based on the log4j SocketAppender class. I'm using a different repair structure, so
41   * it is significantly different.
42   */
43  public class LateralTCPSender
44  {
45      /** The logger */
46      private static final Log log = LogManager.getLog( LateralTCPSender.class );
47  
48      /** Config */
49      private final int socketOpenTimeOut;
50      private final int socketSoTimeOut;
51  
52      /** The serializer. */
53      private final IElementSerializer serializer;
54  
55      /** The client connection with the server. */
56      private AsynchronousSocketChannel client;
57  
58      /** how many messages sent */
59      private int sendCnt;
60  
61      /** Use to synchronize multiple threads that may be trying to get. */
62      private final Lock lock = new ReentrantLock(true);
63  
64      /**
65       * Constructor for the LateralTCPSender object.
66       * <p>
67       * @param lca
68       * @throws IOException
69       * @deprecated Specify serializer
70       */
71      @Deprecated
72      public LateralTCPSender( final ITCPLateralCacheAttributes lca )
73          throws IOException
74      {
75          this(lca, new StandardSerializer());
76      }
77  
78      /**
79       * Constructor for the LateralTCPSender object.
80       * <p>
81       * @param lca the configuration object
82       * @param serializer the serializer to use when sending
83       * @throws IOException
84       * @since 3.1
85       */
86      public LateralTCPSender( final ITCPLateralCacheAttributes lca, final IElementSerializer serializer )
87          throws IOException
88      {
89          this.socketOpenTimeOut = lca.getOpenTimeOut();
90          this.socketSoTimeOut = lca.getSocketTimeOut();
91  
92          this.serializer = serializer;
93  
94          final String p1 = lca.getTcpServer();
95          if ( p1 == null )
96          {
97              throw new IOException( "Invalid server (null)" );
98          }
99  
100         final int colonPosition = p1.lastIndexOf(':');
101 
102         if ( colonPosition < 0 )
103         {
104             throw new IOException( "Invalid address [" + p1 + "]" );
105         }
106 
107         final String h2 = p1.substring( 0, colonPosition );
108         final int po = Integer.parseInt( p1.substring( colonPosition + 1 ) );
109         log.debug( "h2 = {0}, po = {1}", h2, po );
110 
111         if ( h2.isEmpty() )
112         {
113             throw new IOException( "Cannot connect to invalid address [" + h2 + ":" + po + "]" );
114         }
115 
116         init( h2, po );
117     }
118 
119     /**
120      * Creates a connection to a TCP server.
121      * <p>
122      * @param host
123      * @param port
124      * @throws IOException
125      */
126     protected void init( final String host, final int port )
127         throws IOException
128     {
129         log.info( "Attempting connection to [{0}:{1}]", host, port );
130 
131         try
132         {
133             client = AsynchronousSocketChannel.open();
134             InetSocketAddress hostAddress = new InetSocketAddress(host, port);
135             Future<Void> future = client.connect(hostAddress);
136 
137             future.get(this.socketOpenTimeOut, TimeUnit.MILLISECONDS);
138         }
139         catch (final IOException | InterruptedException | ExecutionException | TimeoutException ioe)
140         {
141             throw new IOException( "Cannot connect to " + host + ":" + port, ioe );
142         }
143     }
144 
145     /**
146      * Sends commands to the lateral cache listener.
147      * <p>
148      * @param led
149      * @throws IOException
150      */
151     public <K, V> void send( final LateralElementDescriptor<K, V> led )
152         throws IOException
153     {
154         sendCnt++;
155         if ( log.isInfoEnabled() && sendCnt % 100 == 0 )
156         {
157             log.info( "Send Count {0} = {1}", client.getRemoteAddress(), sendCnt );
158         }
159 
160         log.debug( "sending LateralElementDescriptor" );
161 
162         if ( led == null )
163         {
164             return;
165         }
166 
167         lock.lock();
168         try
169         {
170             serializer.serializeTo(led, client, socketSoTimeOut);
171         }
172         finally
173         {
174             lock.unlock();
175         }
176     }
177 
178     /**
179      * Sends commands to the lateral cache listener and gets a response. I'm afraid that we could
180      * get into a pretty bad blocking situation here. This needs work. I just wanted to get some
181      * form of get working. However, get is not recommended for performance reasons. If you have 10
182      * laterals, then you have to make 10 failed gets to find out none of the caches have the item.
183      * <p>
184      * @param led
185      * @return ICacheElement
186      * @throws IOException
187      */
188     public <K, V> Object sendAndReceive( final LateralElementDescriptor<K, V> led )
189         throws IOException
190     {
191         if ( led == null )
192         {
193             return null;
194         }
195 
196         // Synchronized to insure that the get requests to server from this
197         // sender and the responses are processed in order, else you could
198         // return the wrong item from the cache.
199         // This is a big block of code. May need to re-think this strategy.
200         // This may not be necessary.
201         // Normal puts, etc to laterals do not have to be synchronized.
202         Object response = null;
203 
204         lock.lock();
205         try
206         {
207             // write object to listener
208             send(led);
209             response = serializer.deSerializeFrom(client, socketSoTimeOut, null);
210         }
211         catch ( final IOException | ClassNotFoundException ioe )
212         {
213             final String message = "Could not open channel to " +
214                 client.getRemoteAddress() + " SoTimeout [" + socketSoTimeOut +
215                 "] Connected [" + client.isOpen() + "]";
216             log.error( message, ioe );
217             throw new IOException(message, ioe);
218         }
219         finally
220         {
221             lock.unlock();
222         }
223 
224         return response;
225     }
226 
227     /**
228      * Closes connection used by all LateralTCPSenders for this lateral connection. Dispose request
229      * should come into the facade and be sent to all lateral cache services. The lateral cache
230      * service will then call this method.
231      * <p>
232      * @throws IOException
233      */
234     public void dispose()
235         throws IOException
236     {
237         log.info( "Dispose called" );
238         client.close();
239     }
240 }