package org.apache.commons.jcs3.auxiliary.lateral.socket.tcp;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.commons.jcs3.auxiliary.lateral.LateralCommand;
import org.apache.commons.jcs3.auxiliary.lateral.LateralElementDescriptor;
import org.apache.commons.jcs3.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
import org.apache.commons.jcs3.engine.CacheElement;
import org.apache.commons.jcs3.engine.CacheInfo;
import org.apache.commons.jcs3.engine.behavior.ICacheElement;
import org.apache.commons.jcs3.engine.behavior.ICacheServiceNonLocal;
import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
import org.apache.commons.jcs3.log.Log;
import org.apache.commons.jcs3.log.LogManager;
import org.apache.commons.jcs3.utils.serialization.StandardSerializer;

/**
 * A lateral cache service implementation. Does not implement getGroupKey
 * <p>
 * Every operation is wrapped in a {@link LateralElementDescriptor} and handed to a
 * {@link LateralTCPSender}. Whether puts and gets are actually issued is controlled by the
 * {@code allowPut}, {@code allowGet} and {@code issueRemoveOnPut} settings read from the
 * configuration at construction time.
 * <p>
 * TODO: Remove generics
 */
public class LateralTCPService<K, V>
    implements ICacheServiceNonLocal<K, V>
{
    /** The logger. */
    private static final Log log = LogManager.getLog( LateralTCPService.class );

    /** special configuration: whether put requests may be sent */
    private final boolean allowPut;

    /** special configuration: whether get requests may be sent */
    private final boolean allowGet;

    /** special configuration: send a remove instead of the item when an update is requested */
    private final boolean issueRemoveOnPut;

    /** Sends to another lateral. */
    private final LateralTCPSender sender;

    /** use the vmid by default */
    private long listenerId = CacheInfo.listenerId;

    /**
     * Constructor for the LateralTCPService object, using a {@link StandardSerializer}.
     * <p>
     * @param lca ITCPLateralCacheAttributes the configuration object
     * @throws IOException if the sender cannot be created
     *
     * @deprecated Specify serializer
     */
    @Deprecated
    public LateralTCPService( final ITCPLateralCacheAttributes lca )
        throws IOException
    {
        this( lca, new StandardSerializer() );
    }

    /**
     * Constructor for the LateralTCPService object
     * <p>
     * @param lca ITCPLateralCacheAttributes the configuration object
     * @param serializer the serializer to use when sending
     * @throws IOException if the sender cannot be created; the failure is logged (message only)
     *         and rethrown
     * @since 3.1
     */
    public LateralTCPService( final ITCPLateralCacheAttributes lca, final IElementSerializer serializer )
        throws IOException
    {
        this.allowGet = lca.isAllowGet();
        this.allowPut = lca.isAllowPut();
        this.issueRemoveOnPut = lca.isIssueRemoveOnPut();

        try
        {
            sender = new LateralTCPSender( lca, serializer );

            log.debug( "Created sender to [{0}]", lca::getTcpServer);
        }
        catch ( final IOException e )
        {
            // This gets thrown over and over in recovery mode.
            // The stack trace isn't useful here, so only the message is logged.
            log.error( "Could not create sender to [{0}] -- {1}", lca::getTcpServer, e::getMessage);
            throw e;
        }
    }

    /**
     * Updates the item using the default listener id.
     * <p>
     * @param item the element to send
     * @throws IOException propagated from the two-argument update
     */
    @Override
    public void update( final ICacheElement<K, V> item )
        throws IOException
    {
        update( item, getListenerId() );
    }

    /**
     * If put is allowed, we will issue a put. If issue remove on put is configured, we will issue a
     * remove. Either way, we create a lateral element descriptor, which is essentially a JCS TCP
     * packet. It describes what operation the receiver should take when it gets the packet.
     * <p>
     * If neither put nor remove-on-put is enabled, nothing is sent. Note that remove-on-put takes
     * precedence over put when both are enabled.
     * <p>
     * @param item the element to send (or whose key is sent with a remove)
     * @param requesterId the id placed in the descriptor
     * @throws IOException propagated from the sender
     * @see org.apache.commons.jcs3.engine.behavior.ICacheServiceNonLocal#update(org.apache.commons.jcs3.engine.behavior.ICacheElement,
     *      long)
     */
    @Override
    public void update( final ICacheElement<K, V> item, final long requesterId )
        throws IOException
    {
        // if we can't remove on put, and we can't put then return
        if ( !this.allowPut && !this.issueRemoveOnPut )
        {
            return;
        }

        // if we shouldn't remove on put, then put
        if ( !this.issueRemoveOnPut )
        {
            final LateralElementDescriptor<K, V> led =
                new LateralElementDescriptor<>( item, LateralCommand.UPDATE, requesterId );
            sender.send( led );
            return;
        }

        // else issue a remove with the hash code for remove check
        // on the other end, this will be a server config option
        log.debug( "Issuing a remove for a put" );

        // set the value to null so we don't send the item
        final CacheElement<K, V> ce = new CacheElement<>( item.getCacheName(), item.getKey(), null );
        final LateralElementDescriptor<K, V> led =
            new LateralElementDescriptor<>( ce, LateralCommand.REMOVE, requesterId );

        // An element may legitimately carry a null value; in that case leave the
        // descriptor's hash code at whatever it was initialized to instead of failing.
        final V value = item.getVal();
        if ( value != null )
        {
            led.valHashCode = value.hashCode();
        }
        sender.send( led );
    }

    /**
     * Uses the default listener id and calls the next remove method.
     * <p>
     * @see org.apache.commons.jcs3.engine.behavior.ICacheService#remove(String, Object)
     */
    @Override
    public void remove( final String cacheName, final K key )
        throws IOException
    {
        remove( cacheName, key, getListenerId() );
    }

    /**
     * Wraps the key in a LateralElementDescriptor and sends a remove command.
     * <p>
     * @see org.apache.commons.jcs3.engine.behavior.ICacheServiceNonLocal#remove(String, Object, long)
     */
    @Override
    public void remove( final String cacheName, final K key, final long requesterId )
        throws IOException
    {
        final CacheElement<K, V> ce = new CacheElement<>( cacheName, key, null );
        final LateralElementDescriptor<K, V> led =
            new LateralElementDescriptor<>( ce, LateralCommand.REMOVE, requesterId );
        sender.send( led );
    }

    /**
     * Does nothing.
     * <p>
     * @throws IOException never thrown by this implementation
     */
    @Override
    public void release()
        throws IOException
    {
        // nothing needs to be done
    }

    /**
     * Will close the connection by disposing the sender.
     * <p>
     * @param cacheName not used; the sender is disposed regardless of the name given
     * @throws IOException propagated from the sender
     */
    @Override
    public void dispose( final String cacheName )
        throws IOException
    {
        sender.dispose();
    }

    /**
     * Gets an element using the default listener id.
     * <p>
     * @param cacheName the name of the cache region
     * @param key the key to look up
     * @return {@code ICacheElement<K, V>} if found.
     * @throws IOException propagated from the three-argument get
     */
    @Override
    public ICacheElement<K, V> get( final String cacheName, final K key )
        throws IOException
    {
        return get( cacheName, key, getListenerId() );
    }

    /**
     * If get is allowed, we will issue a get request.
     * <p>
     * @param cacheName the name of the cache region
     * @param key the key to look up
     * @param requesterId currently not sent with the request
     * @return {@code ICacheElement<K, V>} if found; {@code null} if get is not allowed or the
     *         sender returned {@code null}.
     * @throws IOException propagated from the sender
     */
    @Override
    public ICacheElement<K, V> get( final String cacheName, final K key, final long requesterId )
        throws IOException
    {
        // if get is not allowed, nothing needs to be done
        if ( !this.allowGet )
        {
            return null;
        }

        final CacheElement<K, V> ce = new CacheElement<>( cacheName, key, null );
        final LateralElementDescriptor<K, V> led =
            new LateralElementDescriptor<>( ce, LateralCommand.GET );
        // TODO: requesterId is not yet set on the descriptor for GET requests

        @SuppressWarnings("unchecked") // Need to cast from Object
        final ICacheElement<K, V> response = (ICacheElement<K, V>) sender.sendAndReceive( led );
        return response;
    }

    /**
     * If allow get is true, we will issue a getmatching query.
     * <p>
     * @param cacheName the name of the cache region
     * @param pattern the key pattern to match
     * @return a map of K key to {@code ICacheElement<K, V>} element, or an empty map if there is no
     *         data in cache matching the pattern.
     * @throws IOException propagated from the three-argument getMatching
     */
    @Override
    public Map<K, ICacheElement<K, V>> getMatching( final String cacheName, final String pattern )
        throws IOException
    {
        return getMatching( cacheName, pattern, getListenerId() );
    }

    /**
     * If allow get is true, we will issue a getmatching query.
     * <p>
     * @param cacheName the name of the cache region
     * @param pattern the key pattern to match
     * @param requesterId - our identity; currently not sent with the request
     * @return a map of K key to {@code ICacheElement<K, V>} element, or an empty map if get is not
     *         allowed or there is no data in cache matching the pattern. Never {@code null}.
     * @throws IOException propagated from the sender
     */
    @Override
    public Map<K, ICacheElement<K, V>> getMatching( final String cacheName, final String pattern, final long requesterId )
        throws IOException
    {
        // if get is not allowed, honor the documented contract and return an empty map
        if ( !this.allowGet )
        {
            return Collections.emptyMap();
        }

        final CacheElement<String, String> ce = new CacheElement<>( cacheName, pattern, null );
        final LateralElementDescriptor<String, String> led =
            new LateralElementDescriptor<>( ce, LateralCommand.GET_MATCHING );
        // TODO: requesterId is not yet set on the descriptor for GET_MATCHING requests

        final Object response = sender.sendAndReceive( led );
        if ( response == null )
        {
            return Collections.emptyMap();
        }

        @SuppressWarnings("unchecked") // Need to cast from Object
        final Map<K, ICacheElement<K, V>> matches = (Map<K, ICacheElement<K, V>>) response;
        return matches;
    }

    /**
     * Gets multiple items from the cache based on the given set of keys.
     * <p>
     * @param cacheName the name of the cache region
     * @param keys the keys to look up
     * @return a map of K key to {@code ICacheElement<K, V>} element, or an empty map if there is no
     *         data in cache for any of these keys
     * @throws IOException propagated from the three-argument getMultiple
     */
    @Override
    public Map<K, ICacheElement<K, V>> getMultiple( final String cacheName, final Set<K> keys )
        throws IOException
    {
        return getMultiple( cacheName, keys, getListenerId() );
    }

    /**
     * This issues a separate get for each item.
     * <p>
     * TODO We should change this. It should issue one request.
     * <p>
     * @param cacheName the name of the cache region
     * @param keys the keys to look up; may be {@code null} or empty
     * @param requesterId passed through to each individual get
     * @return a map of K key to {@code ICacheElement<K, V>} element, or an empty map if there is no
     *         data in cache for any of these keys
     * @throws IOException propagated from the individual gets
     */
    @Override
    public Map<K, ICacheElement<K, V>> getMultiple( final String cacheName, final Set<K> keys, final long requesterId )
        throws IOException
    {
        final Map<K, ICacheElement<K, V>> elements = new HashMap<>();

        if ( keys != null )
        {
            for ( final K key : keys )
            {
                final ICacheElement<K, V> element = get( cacheName, key, requesterId );

                // only keys that produced an element end up in the result
                if ( element != null )
                {
                    elements.put( key, element );
                }
            }
        }
        return elements;
    }

    /**
     * Return the keys in this cache.
     * <p>
     * Unlike the get methods, this request is sent regardless of the allowGet setting.
     * <p>
     * @param cacheName the name of the cache region
     * @return the key set received from the sender, or {@code null} if the sender returned
     *         {@code null}
     * @throws IOException propagated from the sender
     * @see org.apache.commons.jcs3.auxiliary.AuxiliaryCache#getKeySet()
     */
    @Override
    public Set<K> getKeySet( final String cacheName ) throws IOException
    {
        final CacheElement<String, String> ce = new CacheElement<>( cacheName, null, null );
        final LateralElementDescriptor<String, String> led =
            new LateralElementDescriptor<>( ce, LateralCommand.GET_KEYSET );
        // TODO: requesterId is not yet set on the descriptor for GET_KEYSET requests

        final Object response = sender.sendAndReceive( led );

        // a null response is passed through unchanged (casting null is a no-op)
        @SuppressWarnings("unchecked") // Need cast from Object
        final Set<K> keys = (Set<K>) response;
        return keys;
    }

    /**
     * Removes all elements of the region using the default listener id.
     * <p>
     * @param cacheName the name of the cache region
     * @throws IOException propagated from the two-argument removeAll
     */
    @Override
    public void removeAll( final String cacheName )
        throws IOException
    {
        removeAll( cacheName, getListenerId() );
    }

    /**
     * Sends a remove-all command for the region. The descriptor carries the placeholder key
     * "ALL".
     * <p>
     * @param cacheName the name of the cache region
     * @param requesterId the id placed in the descriptor
     * @throws IOException propagated from the sender
     */
    @Override
    public void removeAll( final String cacheName, final long requesterId )
        throws IOException
    {
        final CacheElement<String, String> ce = new CacheElement<>( cacheName, "ALL", null );
        final LateralElementDescriptor<String, String> led =
            new LateralElementDescriptor<>( ce, LateralCommand.REMOVEALL, requesterId );
        sender.send( led );
    }

    /**
     * Test: reads lines from standard input and sends each one as a cache element with cache
     * name and key "test" until end of input is reached.
     * <p>
     * @param args not used
     *
     * @deprecated Use unit tests
     */
    @Deprecated
    public static void main( final String[] args )
    {
        try
        {
            final LateralTCPSender sender = new LateralTCPSender( new TCPLateralCacheAttributes() );

            try
            {
                final BufferedReader br =
                    new BufferedReader( new InputStreamReader( System.in, StandardCharsets.UTF_8 ) );

                // process user input till done (readLine returns null at end of input)
                while ( true )
                {
                    System.out.println( "enter message:" );
                    final String message = br.readLine();

                    if ( message == null )
                    {
                        break;
                    }

                    final CacheElement<String, String> ce = new CacheElement<>( "test", "test", message );
                    final LateralElementDescriptor<String, String> led = new LateralElementDescriptor<>( ce );
                    sender.send( led );
                }
            }
            finally
            {
                // release the connection even if reading or sending failed
                sender.dispose();
            }
        }
        catch ( final IOException e )
        {
            System.out.println( e.toString() );
        }
    }

    /**
     * @param listenerId The listenerId to set.
     */
    protected void setListenerId( final long listenerId )
    {
        this.listenerId = listenerId;
    }

    /**
     * @return Returns the listenerId.
     */
    protected long getListenerId()
    {
        return listenerId;
    }
}