package org.apache.commons.jcs.auxiliary.lateral.socket.tcp;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;

import org.apache.commons.jcs.auxiliary.lateral.LateralElementDescriptor;
import org.apache.commons.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
import org.apache.commons.jcs.io.ObjectInputStreamClassLoaderAware;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Sends {@link LateralElementDescriptor} messages to a lateral cache listener over a single TCP
 * socket that is opened in the constructor and closed by {@link #dispose()}.
 * <p>
 * This class is based on the log4j SocketAppender class. I'm using a different repair structure, so
 * it is significantly different.
 * <p>
 * Writes to the shared output stream are serialized on an internal lock, so several threads may
 * call {@link #send} and {@link #sendAndReceive} on the same instance.
 */
public class LateralTCPSender
{
    /** The logger */
    private static final Log log = LogFactory.getLog( LateralTCPSender.class );

    /** Config: timeout in milliseconds passed to {@link Socket#connect}. */
    private final int socketOpenTimeOut;

    /** Config: read timeout in milliseconds passed to {@link Socket#setSoTimeout}. */
    private final int socketSoTimeOut;

    /** The stream from the server connection. */
    private ObjectOutputStream oos;

    /** The socket connection with the server. */
    private Socket socket;

    /** how many messages sent; incremented in {@link #send} outside of any lock. */
    private int sendCnt = 0;

    /** Use to synchronize multiple threads that may be trying to get. */
    private final Object getLock = new int[0];

    /**
     * Constructor for the LateralTCPSender object. Parses the configured <code>host:port</code>
     * server string and opens the connection through {@link #init(String, int)}.
     * <p>
     * @param lca configuration supplying the server address and the socket timeouts
     * @throws IOException if the server string is null, has no <code>:</code> separator, has a
     *             non-numeric port or an empty host, or if the connection cannot be opened
     */
    public LateralTCPSender( ITCPLateralCacheAttributes lca )
        throws IOException
    {
        this.socketOpenTimeOut = lca.getOpenTimeOut();
        this.socketSoTimeOut = lca.getSocketTimeOut();

        String tcpServer = lca.getTcpServer();
        if ( tcpServer == null )
        {
            throw new IOException( "Invalid server (null)" );
        }

        // Without this check a missing separator surfaced as a StringIndexOutOfBoundsException
        // instead of the IOException this constructor declares.
        int separator = tcpServer.indexOf( ":" );
        if ( separator < 0 )
        {
            throw new IOException( "Invalid address [" + tcpServer + "], expected host:port" );
        }

        String h2 = tcpServer.substring( 0, separator );
        int po;
        try
        {
            po = Integer.parseInt( tcpServer.substring( separator + 1 ) );
        }
        catch ( NumberFormatException nfe )
        {
            throw new IOException( "Invalid port in address [" + tcpServer + "]", nfe );
        }

        if ( log.isDebugEnabled() )
        {
            log.debug( "h2 = " + h2 );
            log.debug( "po = " + po );
        }

        if ( h2.length() == 0 )
        {
            throw new IOException( "Cannot connect to invalid address [" + h2 + ":" + po + "]" );
        }

        init( h2, po );
    }

    /**
     * Creates a connection to a TCP server and wraps its output in an {@link ObjectOutputStream}.
     * If any step fails the socket is closed before the exception is propagated.
     * <p>
     * @param host host name or address to connect to
     * @param port TCP port to connect to
     * @throws IOException if the connection cannot be established or configured
     */
    protected void init( String host, int port )
        throws IOException
    {
        try
        {
            if ( log.isInfoEnabled() )
            {
                log.info( "Attempting connection to [" + host + "]" );
            }

            // have time out socket open do this for us
            try
            {
                socket = new Socket();
                socket.connect( new InetSocketAddress( host, port ), this.socketOpenTimeOut );
            }
            catch ( IOException ioe )
            {
                // Close quietly so a failing close() cannot hide the real connect error.
                closeSocketQuietly();
                throw new IOException( "Cannot connect to " + host + ":" + port, ioe );
            }

            try
            {
                socket.setSoTimeout( socketSoTimeOut );
                synchronized ( this )
                {
                    oos = new ObjectOutputStream( socket.getOutputStream() );
                }
            }
            catch ( IOException ioe )
            {
                // The socket is already connected at this point; do not leak it.
                closeSocketQuietly();
                throw ioe;
            }
        }
        catch ( IOException e )
        {
            log.debug( "Could not connect to [" + host + "]. Exception is " + e );
            throw e;
        }
    }

    /**
     * Closes the socket, if any, logging instead of propagating a failure of the close itself.
     */
    private void closeSocketQuietly()
    {
        if ( socket == null )
        {
            return;
        }

        try
        {
            socket.close();
        }
        catch ( IOException ignored )
        {
            // Best effort cleanup on an error path; the caller reports the original failure.
            log.debug( "Problem closing socket " + socket + ". Exception is " + ignored );
        }
    }

    /**
     * Sends commands to the lateral cache listener. A null descriptor is counted but not sent.
     * <p>
     * @param led the descriptor to write to the listener; ignored if null
     * @throws IOException if no output stream is available or the write fails
     */
    public <K, V> void send( LateralElementDescriptor<K, V> led )
        throws IOException
    {
        sendCnt++;
        if ( log.isInfoEnabled() && sendCnt % 100 == 0 )
        {
            log.info( "Send Count (port " + socket.getPort() + ") = " + sendCnt );
        }

        if ( log.isDebugEnabled() )
        {
            log.debug( "sending LateralElementDescriptor" );
        }

        if ( led == null )
        {
            return;
        }

        if ( oos == null )
        {
            throw new IOException( "No remote connection is available for LateralTCPSender." );
        }

        // Serialize writers so that objects from different threads are not interleaved.
        synchronized ( this.getLock )
        {
            oos.writeUnshared( led );
            oos.flush();
        }
    }

    /**
     * Sends commands to the lateral cache listener and gets a response. I'm afraid that we could
     * get into a pretty bad blocking situation here. This needs work. I just wanted to get some
     * form of get working. However, get is not recommended for performance reasons. If you have 10
     * laterals, then you have to make 10 failed gets to find out none of the caches have the item.
     * <p>
     * @param led the request to write to the listener; if null nothing is sent
     * @return the object read back from the listener, or null if <code>led</code> is null or the
     *         response could not be deserialized
     * @throws IOException if no output stream is available, or if writing the request or reading
     *             the response fails at the I/O level
     */
    public <K, V> Object sendAndReceive( LateralElementDescriptor<K, V> led )
        throws IOException
    {
        if ( led == null )
        {
            return null;
        }

        if ( oos == null )
        {
            throw new IOException( "No remote connection is available for LateralTCPSender." );
        }

        Object response = null;

        // Synchronized to insure that the get requests to server from this
        // sender and the responses are processed in order, else you could
        // return the wrong item from the cache.
        // This is a big block of code. May need to re-think this strategy.
        // This may not be necessary.
        // Normal puts, etc to laterals do not have to be synchronized.
        synchronized ( this.getLock )
        {
            try
            {
                // clean up input stream, nothing should be there yet.
                if ( socket.getInputStream().available() > 0 )
                {
                    socket.getInputStream().read( new byte[socket.getInputStream().available()] );
                }
            }
            catch ( IOException ioe )
            {
                log.error( "Problem cleaning socket before send " + socket, ioe );
                throw ioe;
            }

            // write object to listener
            oos.writeUnshared( led );
            oos.flush();

            try
            {
                socket.setSoTimeout( socketSoTimeOut );
                // Deliberately not closed: closing a stream obtained from
                // Socket.getInputStream() closes the socket itself, which would make this
                // sender unusable after the first response. The socket is released in dispose().
                ObjectInputStream ois = new ObjectInputStreamClassLoaderAware( socket.getInputStream(), null );
                response = ois.readObject();
            }
            catch ( IOException ioe )
            {
                // Report the configured timeout; socket.getSoTimeout() can itself throw on a
                // broken socket and would then replace the exception being reported.
                String message = "Could not open ObjectInputStream to " + socket +
                    " SoTimeout [" + socketSoTimeOut +
                    "] Connected [" + socket.isConnected() + "]";
                log.error( message, ioe );
                throw ioe;
            }
            catch ( Exception e )
            {
                // Best effort: an undeserializable response is logged and reported as null.
                log.error( "Could not read response from " + socket, e );
            }
        }

        return response;
    }

    /**
     * Closes connection used by all LateralTCPSenders for this lateral connection. Dispose request
     * should come into the facade and be sent to all lateral cache services. The lateral cache
     * service will then call this method.
     * <p>
     * @throws IOException if closing the output stream or the socket fails
     */
    public void dispose()
        throws IOException
    {
        if ( log.isInfoEnabled() )
        {
            log.info( "Dispose called" );
        }

        // WILL CLOSE CONNECTION USED BY ALL
        try
        {
            if ( oos != null )
            {
                oos.close();
            }
        }
        finally
        {
            // Always release the socket, even when closing (and flushing) the stream failed.
            if ( socket != null )
            {
                socket.close();
            }
        }
    }
}