001package org.apache.commons.jcs3.auxiliary.lateral.socket.tcp; 002 003/* 004 * Licensed to the Apache Software Foundation (ASF) under one 005 * or more contributor license agreements. See the NOTICE file 006 * distributed with this work for additional information 007 * regarding copyright ownership. The ASF licenses this file 008 * to you under the Apache License, Version 2.0 (the 009 * "License"); you may not use this file except in compliance 010 * with the License. You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, 015 * software distributed under the License is distributed on an 016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 017 * KIND, either express or implied. See the License for the 018 * specific language governing permissions and limitations 019 * under the License. 020 */ 021 022import java.io.IOException; 023import java.net.InetSocketAddress; 024import java.nio.channels.AsynchronousSocketChannel; 025import java.util.concurrent.ExecutionException; 026import java.util.concurrent.Future; 027import java.util.concurrent.TimeUnit; 028import java.util.concurrent.TimeoutException; 029import java.util.concurrent.locks.Lock; 030import java.util.concurrent.locks.ReentrantLock; 031 032import org.apache.commons.jcs3.auxiliary.lateral.LateralElementDescriptor; 033import org.apache.commons.jcs3.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes; 034import org.apache.commons.jcs3.engine.behavior.IElementSerializer; 035import org.apache.commons.jcs3.log.Log; 036import org.apache.commons.jcs3.log.LogManager; 037import org.apache.commons.jcs3.utils.serialization.StandardSerializer; 038 039/** 040 * This class is based on the log4j SocketAppender class. I'm using a different repair structure, so 041 * it is significantly different. 042 */ 043public class LateralTCPSender 044{ 045 /** The logger */ 046 private static final Log log = LogManager.getLog( LateralTCPSender.class ); 047 048 /** Config */ 049 private final int socketOpenTimeOut; 050 private final int socketSoTimeOut; 051 052 /** The serializer. */ 053 private final IElementSerializer serializer; 054 055 /** The client connection with the server. */ 056 private AsynchronousSocketChannel client; 057 058 /** how many messages sent */ 059 private int sendCnt; 060 061 /** Use to synchronize multiple threads that may be trying to get. */ 062 private final Lock lock = new ReentrantLock(true); 063 064 /** 065 * Constructor for the LateralTCPSender object. 066 * <p> 067 * @param lca 068 * @throws IOException 069 * @deprecated Specify serializer 070 */ 071 @Deprecated 072 public LateralTCPSender( final ITCPLateralCacheAttributes lca ) 073 throws IOException 074 { 075 this(lca, new StandardSerializer()); 076 } 077 078 /** 079 * Constructor for the LateralTCPSender object. 080 * <p> 081 * @param lca the configuration object 082 * @param serializer the serializer to use when sending 083 * @throws IOException 084 * @since 3.1 085 */ 086 public LateralTCPSender( final ITCPLateralCacheAttributes lca, final IElementSerializer serializer ) 087 throws IOException 088 { 089 this.socketOpenTimeOut = lca.getOpenTimeOut(); 090 this.socketSoTimeOut = lca.getSocketTimeOut(); 091 092 this.serializer = serializer; 093 094 final String p1 = lca.getTcpServer(); 095 if ( p1 == null ) 096 { 097 throw new IOException( "Invalid server (null)" ); 098 } 099 100 final int colonPosition = p1.lastIndexOf(':'); 101 102 if ( colonPosition < 0 ) 103 { 104 throw new IOException( "Invalid address [" + p1 + "]" ); 105 } 106 107 final String h2 = p1.substring( 0, colonPosition ); 108 final int po = Integer.parseInt( p1.substring( colonPosition + 1 ) ); 109 log.debug( "h2 = {0}, po = {1}", h2, po ); 110 111 if ( h2.isEmpty() ) 112 { 113 throw new IOException( "Cannot connect to invalid address [" + h2 + ":" + po + "]" ); 114 } 115 116 init( h2, po ); 117 } 118 119 /** 120 * Creates a connection to a TCP server. 121 * <p> 122 * @param host 123 * @param port 124 * @throws IOException 125 */ 126 protected void init( final String host, final int port ) 127 throws IOException 128 { 129 log.info( "Attempting connection to [{0}:{1}]", host, port ); 130 131 try 132 { 133 client = AsynchronousSocketChannel.open(); 134 InetSocketAddress hostAddress = new InetSocketAddress(host, port); 135 Future<Void> future = client.connect(hostAddress); 136 137 future.get(this.socketOpenTimeOut, TimeUnit.MILLISECONDS); 138 } 139 catch (final IOException | InterruptedException | ExecutionException | TimeoutException ioe) 140 { 141 throw new IOException( "Cannot connect to " + host + ":" + port, ioe ); 142 } 143 } 144 145 /** 146 * Sends commands to the lateral cache listener. 147 * <p> 148 * @param led 149 * @throws IOException 150 */ 151 public <K, V> void send( final LateralElementDescriptor<K, V> led ) 152 throws IOException 153 { 154 sendCnt++; 155 if ( log.isInfoEnabled() && sendCnt % 100 == 0 ) 156 { 157 log.info( "Send Count {0} = {1}", client.getRemoteAddress(), sendCnt ); 158 } 159 160 log.debug( "sending LateralElementDescriptor" ); 161 162 if ( led == null ) 163 { 164 return; 165 } 166 167 lock.lock(); 168 try 169 { 170 serializer.serializeTo(led, client, socketSoTimeOut); 171 } 172 finally 173 { 174 lock.unlock(); 175 } 176 } 177 178 /** 179 * Sends commands to the lateral cache listener and gets a response. I'm afraid that we could 180 * get into a pretty bad blocking situation here. This needs work. I just wanted to get some 181 * form of get working. However, get is not recommended for performance reasons. If you have 10 182 * laterals, then you have to make 10 failed gets to find out none of the caches have the item. 183 * <p> 184 * @param led 185 * @return ICacheElement 186 * @throws IOException 187 */ 188 public <K, V> Object sendAndReceive( final LateralElementDescriptor<K, V> led ) 189 throws IOException 190 { 191 if ( led == null ) 192 { 193 return null; 194 } 195 196 // Synchronized to insure that the get requests to server from this 197 // sender and the responses are processed in order, else you could 198 // return the wrong item from the cache. 199 // This is a big block of code. May need to re-think this strategy. 200 // This may not be necessary. 201 // Normal puts, etc to laterals do not have to be synchronized. 202 Object response = null; 203 204 lock.lock(); 205 try 206 { 207 // write object to listener 208 send(led); 209 response = serializer.deSerializeFrom(client, socketSoTimeOut, null); 210 } 211 catch ( final IOException | ClassNotFoundException ioe ) 212 { 213 final String message = "Could not open channel to " + 214 client.getRemoteAddress() + " SoTimeout [" + socketSoTimeOut + 215 "] Connected [" + client.isOpen() + "]"; 216 log.error( message, ioe ); 217 throw new IOException(message, ioe); 218 } 219 finally 220 { 221 lock.unlock(); 222 } 223 224 return response; 225 } 226 227 /** 228 * Closes connection used by all LateralTCPSenders for this lateral connection. Dispose request 229 * should come into the facade and be sent to all lateral cache services. The lateral cache 230 * service will then call this method. 231 * <p> 232 * @throws IOException 233 */ 234 public void dispose() 235 throws IOException 236 { 237 log.info( "Dispose called" ); 238 client.close(); 239 } 240}