001package org.apache.commons.jcs.auxiliary.lateral.socket.tcp; 002 003/* 004 * Licensed to the Apache Software Foundation (ASF) under one 005 * or more contributor license agreements. See the NOTICE file 006 * distributed with this work for additional information 007 * regarding copyright ownership. The ASF licenses this file 008 * to you under the Apache License, Version 2.0 (the 009 * "License"); you may not use this file except in compliance 010 * with the License. You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, 015 * software distributed under the License is distributed on an 016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 017 * KIND, either express or implied. See the License for the 018 * specific language governing permissions and limitations 019 * under the License. 020 */ 021 022import java.io.BufferedReader; 023import java.io.IOException; 024import java.io.InputStreamReader; 025import java.util.Collections; 026import java.util.HashMap; 027import java.util.Map; 028import java.util.Set; 029 030import org.apache.commons.jcs.auxiliary.lateral.LateralCommand; 031import org.apache.commons.jcs.auxiliary.lateral.LateralElementDescriptor; 032import org.apache.commons.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes; 033import org.apache.commons.jcs.engine.CacheElement; 034import org.apache.commons.jcs.engine.CacheInfo; 035import org.apache.commons.jcs.engine.behavior.ICacheElement; 036import org.apache.commons.jcs.engine.behavior.ICacheServiceNonLocal; 037import org.apache.commons.logging.Log; 038import org.apache.commons.logging.LogFactory; 039 040/** 041 * A lateral cache service implementation. Does not implement getGroupKey 042 * TODO: Remove generics 043 */ 044public class LateralTCPService<K, V> 045 implements ICacheServiceNonLocal<K, V> 046{ 047 /** The logger. */ 048 private static final Log log = LogFactory.getLog( LateralTCPService.class ); 049 050 /** special configuration */ 051 private boolean allowPut; 052 private boolean allowGet; 053 private boolean issueRemoveOnPut; 054 055 /** Sends to another lateral. */ 056 private LateralTCPSender sender; 057 058 /** use the vmid by default */ 059 private long listenerId = CacheInfo.listenerId; 060 061 /** 062 * Constructor for the LateralTCPService object 063 * <p> 064 * @param lca ITCPLateralCacheAttributes 065 * @throws IOException 066 */ 067 public LateralTCPService( ITCPLateralCacheAttributes lca ) 068 throws IOException 069 { 070 this.allowGet = lca.isAllowGet(); 071 this.allowPut = lca.isAllowPut(); 072 this.issueRemoveOnPut = lca.isIssueRemoveOnPut(); 073 074 try 075 { 076 sender = new LateralTCPSender( lca ); 077 078 if ( log.isInfoEnabled() ) 079 { 080 log.debug( "Created sender to [" + lca.getTcpServer() + "]" ); 081 } 082 } 083 catch ( IOException e ) 084 { 085 // log.error( "Could not create sender", e ); 086 // This gets thrown over and over in recovery mode. 087 // The stack trace isn't useful here. 088 log.error( "Could not create sender to [" + lca.getTcpServer() + "] -- " + e.getMessage() ); 089 090 throw e; 091 } 092 } 093 094 /** 095 * @param item 096 * @throws IOException 097 */ 098 @Override 099 public void update( ICacheElement<K, V> item ) 100 throws IOException 101 { 102 update( item, getListenerId() ); 103 } 104 105 /** 106 * If put is allowed, we will issue a put. If issue put on remove is configured, we will issue a 107 * remove. Either way, we create a lateral element descriptor, which is essentially a JCS TCP 108 * packet. It describes what operation the receiver should take when it gets the packet. 109 * <p> 110 * @see org.apache.commons.jcs.engine.behavior.ICacheServiceNonLocal#update(org.apache.commons.jcs.engine.behavior.ICacheElement, 111 * long) 112 */ 113 @Override 114 public void update( ICacheElement<K, V> item, long requesterId ) 115 throws IOException 116 { 117 // if we don't allow put, see if we should remove on put 118 if ( !this.allowPut && 119 // if we can't remove on put, and we can't put then return 120 !this.issueRemoveOnPut ) 121 { 122 return; 123 } 124 125 // if we shouldn't remove on put, then put 126 if ( !this.issueRemoveOnPut ) 127 { 128 LateralElementDescriptor<K, V> led = new LateralElementDescriptor<K, V>( item ); 129 led.requesterId = requesterId; 130 led.command = LateralCommand.UPDATE; 131 sender.send( led ); 132 } 133 // else issue a remove with the hashcode for remove check on 134 // on the other end, this will be a server config option 135 else 136 { 137 if ( log.isDebugEnabled() ) 138 { 139 log.debug( "Issuing a remove for a put" ); 140 } 141 // set the value to null so we don't send the item 142 CacheElement<K, V> ce = new CacheElement<K, V>( item.getCacheName(), item.getKey(), null ); 143 LateralElementDescriptor<K, V> led = new LateralElementDescriptor<K, V>( ce ); 144 led.requesterId = requesterId; 145 led.command = LateralCommand.REMOVE; 146 led.valHashCode = item.getVal().hashCode(); 147 sender.send( led ); 148 } 149 } 150 151 /** 152 * Uses the default listener id and calls the next remove method. 153 * <p> 154 * @see org.apache.commons.jcs.engine.behavior.ICacheService#remove(String, Object) 155 */ 156 @Override 157 public void remove( String cacheName, K key ) 158 throws IOException 159 { 160 remove( cacheName, key, getListenerId() ); 161 } 162 163 /** 164 * Wraps the key in a LateralElementDescriptor. 165 * <p> 166 * @see org.apache.commons.jcs.engine.behavior.ICacheServiceNonLocal#remove(String, Object, long) 167 */ 168 @Override 169 public void remove( String cacheName, K key, long requesterId ) 170 throws IOException 171 { 172 CacheElement<K, V> ce = new CacheElement<K, V>( cacheName, key, null ); 173 LateralElementDescriptor<K, V> led = new LateralElementDescriptor<K, V>( ce ); 174 led.requesterId = requesterId; 175 led.command = LateralCommand.REMOVE; 176 sender.send( led ); 177 } 178 179 /** 180 * Does nothing. 181 * <p> 182 * @throws IOException 183 */ 184 @Override 185 public void release() 186 throws IOException 187 { 188 // nothing needs to be done 189 } 190 191 /** 192 * Will close the connection. 193 * <p> 194 * @param cacheName 195 * @throws IOException 196 */ 197 @Override 198 public void dispose( String cacheName ) 199 throws IOException 200 { 201 sender.dispose(); 202 } 203 204 /** 205 * @param cacheName 206 * @param key 207 * @return ICacheElement<K, V> if found. 208 * @throws IOException 209 */ 210 @Override 211 public ICacheElement<K, V> get( String cacheName, K key ) 212 throws IOException 213 { 214 return get( cacheName, key, getListenerId() ); 215 } 216 217 /** 218 * If get is allowed, we will issues a get request. 219 * <p> 220 * @param cacheName 221 * @param key 222 * @param requesterId 223 * @return ICacheElement<K, V> if found. 224 * @throws IOException 225 */ 226 @Override 227 public ICacheElement<K, V> get( String cacheName, K key, long requesterId ) 228 throws IOException 229 { 230 // if get is not allowed return 231 if ( this.allowGet ) 232 { 233 CacheElement<K, V> ce = new CacheElement<K, V>( cacheName, key, null ); 234 LateralElementDescriptor<K, V> led = new LateralElementDescriptor<K, V>( ce ); 235 // led.requesterId = requesterId; // later 236 led.command = LateralCommand.GET; 237 @SuppressWarnings("unchecked") // Need to cast from Object 238 ICacheElement<K, V> response = (ICacheElement<K, V>)sender.sendAndReceive( led ); 239 if ( response != null ) 240 { 241 return response; 242 } 243 return null; 244 } 245 else 246 { 247 // nothing needs to be done 248 return null; 249 } 250 } 251 252 /** 253 * If allow get is true, we will issue a getmatching query. 254 * <p> 255 * @param cacheName 256 * @param pattern 257 * @return a map of K key to ICacheElement<K, V> element, or an empty map if there is no 258 * data in cache matching the pattern. 259 * @throws IOException 260 */ 261 @Override 262 public Map<K, ICacheElement<K, V>> getMatching( String cacheName, String pattern ) 263 throws IOException 264 { 265 return getMatching( cacheName, pattern, getListenerId() ); 266 } 267 268 /** 269 * If allow get is true, we will issue a getmatching query. 270 * <p> 271 * @param cacheName 272 * @param pattern 273 * @param requesterId - our identity 274 * @return a map of K key to ICacheElement<K, V> element, or an empty map if there is no 275 * data in cache matching the pattern. 276 * @throws IOException 277 */ 278 @Override 279 @SuppressWarnings("unchecked") // Need to cast from Object 280 public Map<K, ICacheElement<K, V>> getMatching( String cacheName, String pattern, long requesterId ) 281 throws IOException 282 { 283 // if get is not allowed return 284 if ( this.allowGet ) 285 { 286 CacheElement<String, String> ce = new CacheElement<String, String>( cacheName, pattern, null ); 287 LateralElementDescriptor<String, String> led = new LateralElementDescriptor<String, String>( ce ); 288 // led.requesterId = requesterId; // later 289 led.command = LateralCommand.GET_MATCHING; 290 291 Object response = sender.sendAndReceive( led ); 292 if ( response != null ) 293 { 294 return (Map<K, ICacheElement<K, V>>) response; 295 } 296 return Collections.emptyMap(); 297 } 298 else 299 { 300 // nothing needs to be done 301 return null; 302 } 303 } 304 305 /** 306 * Gets multiple items from the cache based on the given set of keys. 307 * <p> 308 * @param cacheName 309 * @param keys 310 * @return a map of K key to ICacheElement<K, V> element, or an empty map if there is no 311 * data in cache for any of these keys 312 * @throws IOException 313 */ 314 @Override 315 public Map<K, ICacheElement<K, V>> getMultiple( String cacheName, Set<K> keys ) 316 throws IOException 317 { 318 return getMultiple( cacheName, keys, getListenerId() ); 319 } 320 321 /** 322 * This issues a separate get for each item. 323 * <p> 324 * TODO We should change this. It should issue one request. 325 * <p> 326 * @param cacheName 327 * @param keys 328 * @param requesterId 329 * @return a map of K key to ICacheElement<K, V> element, or an empty map if there is no 330 * data in cache for any of these keys 331 * @throws IOException 332 */ 333 @Override 334 public Map<K, ICacheElement<K, V>> getMultiple( String cacheName, Set<K> keys, long requesterId ) 335 throws IOException 336 { 337 Map<K, ICacheElement<K, V>> elements = new HashMap<K, ICacheElement<K, V>>(); 338 339 if ( keys != null && !keys.isEmpty() ) 340 { 341 for (K key : keys) 342 { 343 ICacheElement<K, V> element = get( cacheName, key ); 344 345 if ( element != null ) 346 { 347 elements.put( key, element ); 348 } 349 } 350 } 351 return elements; 352 } 353 354 /** 355 * Return the keys in this cache. 356 * <p> 357 * @param cacheName the name of the cache region 358 * @see org.apache.commons.jcs.auxiliary.AuxiliaryCache#getKeySet() 359 */ 360 @Override 361 @SuppressWarnings("unchecked") // Need cast from Object 362 public Set<K> getKeySet(String cacheName) throws IOException 363 { 364 CacheElement<String, String> ce = new CacheElement<String, String>(cacheName, null, null); 365 LateralElementDescriptor<String, String> led = new LateralElementDescriptor<String, String>(ce); 366 // led.requesterId = requesterId; // later 367 led.command = LateralCommand.GET_KEYSET; 368 Object response = sender.sendAndReceive(led); 369 if (response != null) 370 { 371 return (Set<K>) response; 372 } 373 374 return null; 375 } 376 377 /** 378 * @param cacheName 379 * @throws IOException 380 */ 381 @Override 382 public void removeAll( String cacheName ) 383 throws IOException 384 { 385 removeAll( cacheName, getListenerId() ); 386 } 387 388 /** 389 * @param cacheName 390 * @param requesterId 391 * @throws IOException 392 */ 393 @Override 394 public void removeAll( String cacheName, long requesterId ) 395 throws IOException 396 { 397 CacheElement<String, String> ce = new CacheElement<String, String>( cacheName, "ALL", null ); 398 LateralElementDescriptor<String, String> led = new LateralElementDescriptor<String, String>( ce ); 399 led.requesterId = requesterId; 400 led.command = LateralCommand.REMOVEALL; 401 sender.send( led ); 402 } 403 404 /** 405 * @param args 406 */ 407 public static void main( String args[] ) 408 { 409 try 410 { 411 LateralTCPSender sender = new LateralTCPSender( new TCPLateralCacheAttributes() ); 412 413 // process user input till done 414 boolean notDone = true; 415 String message = null; 416 // wait to dispose 417 BufferedReader br = new BufferedReader( new InputStreamReader( System.in, "UTF-8" ) ); 418 419 while ( notDone ) 420 { 421 System.out.println( "enter message:" ); 422 message = br.readLine(); 423 424 if (message == null) 425 { 426 notDone = false; 427 continue; 428 } 429 430 CacheElement<String, String> ce = new CacheElement<String, String>( "test", "test", message ); 431 LateralElementDescriptor<String, String> led = new LateralElementDescriptor<String, String>( ce ); 432 sender.send( led ); 433 } 434 } 435 catch ( IOException e ) 436 { 437 System.out.println( e.toString() ); 438 } 439 } 440 441 /** 442 * @param listernId The listernId to set. 443 */ 444 protected void setListenerId( long listernId ) 445 { 446 this.listenerId = listernId; 447 } 448 449 /** 450 * @return Returns the listernId. 451 */ 452 protected long getListenerId() 453 { 454 return listenerId; 455 } 456}