1 package org.apache.commons.jcs.auxiliary.remote;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.HashMap;
25 import java.util.Map;
26 import java.util.Set;
27 import java.util.concurrent.Callable;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.Future;
30 import java.util.concurrent.ThreadPoolExecutor;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.TimeoutException;
33
34 import org.apache.commons.jcs.auxiliary.AbstractAuxiliaryCacheEventLogging;
35 import org.apache.commons.jcs.auxiliary.AuxiliaryCacheAttributes;
36 import org.apache.commons.jcs.auxiliary.remote.behavior.IRemoteCacheAttributes;
37 import org.apache.commons.jcs.auxiliary.remote.behavior.IRemoteCacheClient;
38 import org.apache.commons.jcs.auxiliary.remote.behavior.IRemoteCacheListener;
39 import org.apache.commons.jcs.auxiliary.remote.server.behavior.RemoteType;
40 import org.apache.commons.jcs.engine.CacheStatus;
41 import org.apache.commons.jcs.engine.ZombieCacheServiceNonLocal;
42 import org.apache.commons.jcs.engine.behavior.ICacheElement;
43 import org.apache.commons.jcs.engine.behavior.ICacheElementSerialized;
44 import org.apache.commons.jcs.engine.behavior.ICacheServiceNonLocal;
45 import org.apache.commons.jcs.engine.behavior.IZombie;
46 import org.apache.commons.jcs.engine.logging.behavior.ICacheEventLogger;
47 import org.apache.commons.jcs.engine.stats.StatElement;
48 import org.apache.commons.jcs.engine.stats.Stats;
49 import org.apache.commons.jcs.engine.stats.behavior.IStatElement;
50 import org.apache.commons.jcs.engine.stats.behavior.IStats;
51 import org.apache.commons.jcs.utils.serialization.SerializationConversionUtil;
52 import org.apache.commons.jcs.utils.threadpool.ThreadPoolManager;
53 import org.apache.commons.logging.Log;
54 import org.apache.commons.logging.LogFactory;
55
56
57 public abstract class AbstractRemoteAuxiliaryCache<K, V>
58 extends AbstractAuxiliaryCacheEventLogging<K, V>
59 implements IRemoteCacheClient<K, V>
60 {
61
62 private static final Log log = LogFactory.getLog( AbstractRemoteAuxiliaryCache.class );
63
64
65
66
67
68 private ICacheServiceNonLocal<K, V> remoteCacheService;
69
70
71 protected final String cacheName;
72
73
74 private IRemoteCacheListener<K, V> remoteCacheListener;
75
76
77 private IRemoteCacheAttributes remoteCacheAttributes;
78
79
80 private ThreadPoolExecutor pool = null;
81
82
83 private boolean usePoolForGet = false;
84
85
86
87
88
89
90
91
/**
 * Builds the client from its attributes, the remote service and the listener.
 * <p>
 * If the configured get timeout is greater than zero, the thread pool named in the attributes is
 * looked up; when a pool is returned, gets are routed through {@link #getUsingPool(Object)}.
 *
 * @param cattr attributes of this client; also the source of the cache name
 * @param remote service that operations are sent to
 * @param listener listener associated with this client
 */
public AbstractRemoteAuxiliaryCache( IRemoteCacheAttributes cattr, ICacheServiceNonLocal<K, V> remote,
                                     IRemoteCacheListener<K, V> listener )
{
    this.setRemoteCacheAttributes( cattr );
    this.cacheName = cattr.getCacheName();
    this.setRemoteCacheService( remote );
    this.setRemoteCacheListener( listener );

    if ( log.isDebugEnabled() )
    {
        log.debug( "Construct> cacheName=" + cattr.getCacheName() );
        log.debug( "irca = " + getRemoteCacheAttributes() );
        log.debug( "remote = " + remote );
        log.debug( "listener = " + listener );
        log.debug( "GetTimeoutMillis() = " + getRemoteCacheAttributes().getGetTimeoutMillis() );
    }

    // Without a positive timeout there is nothing to bound, so no pool is needed.
    if ( getRemoteCacheAttributes().getGetTimeoutMillis() <= 0 )
    {
        return;
    }

    pool = ThreadPoolManager.getInstance().getPool( getRemoteCacheAttributes().getThreadPoolName() );
    if ( log.isDebugEnabled() )
    {
        log.debug( "Thread Pool = " + pool );
    }
    // usePoolForGet still holds its default (false) here, so this only ever switches it on.
    usePoolForGet = ( pool != null );
}
127
128
129
130
131
132
133 @Override
134 protected void processDispose()
135 throws IOException
136 {
137 if ( log.isInfoEnabled() )
138 {
139 log.info( "Disposing of remote cache." );
140 }
141 try
142 {
143 if ( getRemoteCacheListener() != null )
144 {
145 getRemoteCacheListener().dispose();
146 }
147 }
148 catch ( Exception ex )
149 {
150 log.error( "Couldn't dispose", ex );
151 handleException( ex, "Failed to dispose [" + cacheName + "]", ICacheEventLogger.DISPOSE_EVENT );
152 }
153 }
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168 @Override
169 protected ICacheElement<K, V> processGet( K key )
170 throws IOException
171 {
172 ICacheElement<K, V> retVal = null;
173 try
174 {
175 if ( usePoolForGet )
176 {
177 retVal = getUsingPool( key );
178 }
179 else
180 {
181 retVal = getRemoteCacheService().get( cacheName, key, getListenerId() );
182 }
183
184
185 if ( retVal instanceof ICacheElementSerialized )
186 {
187
188
189
190 if ( this.getRemoteCacheAttributes().getRemoteType() != RemoteType.CLUSTER )
191 {
192 retVal = SerializationConversionUtil.getDeSerializedCacheElement( (ICacheElementSerialized<K, V>) retVal,
193 super.getElementSerializer() );
194 }
195 }
196 }
197 catch ( Exception ex )
198 {
199 handleException( ex, "Failed to get [" + key + "] from [" + cacheName + "]", ICacheEventLogger.GET_EVENT );
200 }
201 return retVal;
202 }
203
204
205
206
207
208
209
210
211 public ICacheElement<K, V> getUsingPool( final K key )
212 throws IOException
213 {
214 int timeout = getRemoteCacheAttributes().getGetTimeoutMillis();
215
216 try
217 {
218 Callable<ICacheElement<K, V>> command = new Callable<ICacheElement<K, V>>()
219 {
220 @Override
221 public ICacheElement<K, V> call()
222 throws IOException
223 {
224 return getRemoteCacheService().get( cacheName, key, getListenerId() );
225 }
226 };
227
228
229 Future<ICacheElement<K, V>> future = pool.submit(command);
230
231
232 ICacheElement<K, V> ice = future.get(timeout, TimeUnit.MILLISECONDS);
233
234 if ( log.isDebugEnabled() )
235 {
236 if ( ice == null )
237 {
238 log.debug( "nothing found in remote cache" );
239 }
240 else
241 {
242 log.debug( "found item in remote cache" );
243 }
244 }
245 return ice;
246 }
247 catch ( TimeoutException te )
248 {
249 log.warn( "TimeoutException, Get Request timed out after " + timeout );
250 throw new IOException( "Get Request timed out after " + timeout );
251 }
252 catch ( InterruptedException ex )
253 {
254 log.warn( "InterruptedException, Get Request timed out after " + timeout );
255 throw new IOException( "Get Request timed out after " + timeout );
256 }
257 catch (ExecutionException ex)
258 {
259
260 log.error( "ExecutionException, Assuming an IO exception thrown in the background.", ex );
261 throw new IOException( "Get Request timed out after " + timeout );
262 }
263 }
264
265
266
267
268
269
270
271
272 @Override
273 public Map<K, ICacheElement<K, V>> processGetMatching( String pattern )
274 throws IOException
275 {
276 Map<K, ICacheElement<K, V>> results = new HashMap<K, ICacheElement<K, V>>();
277 try
278 {
279 Map<K, ICacheElement<K, V>> rawResults = getRemoteCacheService().getMatching( cacheName, pattern, getListenerId() );
280
281
282 if ( rawResults != null )
283 {
284 for (Map.Entry<K, ICacheElement<K, V>> entry : rawResults.entrySet())
285 {
286 ICacheElement<K, V> unwrappedResult = null;
287 if ( entry.getValue() instanceof ICacheElementSerialized )
288 {
289
290
291
292 if ( this.getRemoteCacheAttributes().getRemoteType() != RemoteType.CLUSTER )
293 {
294 unwrappedResult = SerializationConversionUtil
295 .getDeSerializedCacheElement( (ICacheElementSerialized<K, V>) entry.getValue(),
296 super.getElementSerializer() );
297 }
298 }
299 else
300 {
301 unwrappedResult = entry.getValue();
302 }
303 results.put( entry.getKey(), unwrappedResult );
304 }
305 }
306 }
307 catch ( Exception ex )
308 {
309 handleException( ex, "Failed to getMatching [" + pattern + "] from [" + cacheName + "]",
310 ICacheEventLogger.GET_EVENT );
311 }
312 return results;
313 }
314
315
316
317
318
319
320
321
322
323 @Override
324 protected Map<K, ICacheElement<K, V>> processGetMultiple( Set<K> keys )
325 throws IOException
326 {
327 Map<K, ICacheElement<K, V>> elements = new HashMap<K, ICacheElement<K, V>>();
328 if ( keys != null && !keys.isEmpty() )
329 {
330 for (K key : keys)
331 {
332 ICacheElement<K, V> element = get( key );
333
334 if ( element != null )
335 {
336 elements.put( key, element );
337 }
338 }
339 }
340 return elements;
341 }
342
343
344
345
346
347
348
349
350
351 @Override
352 protected boolean processRemove( K key )
353 throws IOException
354 {
355 if ( !this.getRemoteCacheAttributes().getGetOnly() )
356 {
357 if ( log.isDebugEnabled() )
358 {
359 log.debug( "remove> key=" + key );
360 }
361 try
362 {
363 getRemoteCacheService().remove( cacheName, key, getListenerId() );
364 }
365 catch ( Exception ex )
366 {
367 handleException( ex, "Failed to remove " + key + " from " + cacheName, ICacheEventLogger.REMOVE_EVENT );
368 }
369 return true;
370 }
371 return false;
372 }
373
374
375
376
377
378
379
380 @Override
381 protected void processRemoveAll()
382 throws IOException
383 {
384 if ( !this.getRemoteCacheAttributes().getGetOnly() )
385 {
386 try
387 {
388 getRemoteCacheService().removeAll( cacheName, getListenerId() );
389 }
390 catch ( Exception ex )
391 {
392 handleException( ex, "Failed to remove all from " + cacheName, ICacheEventLogger.REMOVEALL_EVENT );
393 }
394 }
395 }
396
397
398
399
400
401
402
403
404
405 @Override
406 protected void processUpdate( ICacheElement<K, V> ce )
407 throws IOException
408 {
409 if ( !getRemoteCacheAttributes().getGetOnly() )
410 {
411 ICacheElementSerialized<K, V> serialized = null;
412 try
413 {
414 if ( log.isDebugEnabled() )
415 {
416 log.debug( "sending item to remote server" );
417 }
418
419
420
421 serialized = SerializationConversionUtil.getSerializedCacheElement( ce, super.getElementSerializer() );
422
423 remoteCacheService.update( serialized, getListenerId() );
424 }
425 catch ( NullPointerException npe )
426 {
427 log.error( "npe for ce = " + ce + "ce.attr = " + ce.getElementAttributes(), npe );
428 }
429 catch ( Exception ex )
430 {
431
432 handleException( ex, "Failed to put [" + ce.getKey() + "] to " + ce.getCacheName(),
433 ICacheEventLogger.UPDATE_EVENT );
434 }
435 }
436 else
437 {
438 if ( log.isDebugEnabled() )
439 {
440 log.debug( "get only mode, not sending to remote server" );
441 }
442 }
443 }
444
445
446
447
448
449
450 @Override
451 public Set<K> getKeySet()
452 throws IOException
453 {
454 return getRemoteCacheService().getKeySet(cacheName);
455 }
456
457
458
459
460
461
462
463 @Override
464 public IRemoteCacheListener<K, V> getListener()
465 {
466 return getRemoteCacheListener();
467 }
468
469
470
471
472
473
474
475
476 public void setListenerId( long id )
477 {
478 if ( getRemoteCacheListener() != null )
479 {
480 try
481 {
482 getRemoteCacheListener().setListenerId( id );
483
484 if ( log.isDebugEnabled() )
485 {
486 log.debug( "set listenerId = " + id );
487 }
488 }
489 catch ( Exception e )
490 {
491 log.error( "Problem setting listenerId", e );
492 }
493 }
494 }
495
496
497
498
499
500
501 @Override
502 public long getListenerId()
503 {
504 if ( getRemoteCacheListener() != null )
505 {
506 try
507 {
508 if ( log.isDebugEnabled() )
509 {
510 log.debug( "get listenerId = " + getRemoteCacheListener().getListenerId() );
511 }
512 return getRemoteCacheListener().getListenerId();
513 }
514 catch ( Exception e )
515 {
516 log.error( "Problem getting listenerId", e );
517 }
518 }
519 return -1;
520 }
521
522
523
524
525
526 @Override
527 public int getSize()
528 {
529 return 0;
530 }
531
532
533
534
535
536
537
538
539
540 protected abstract void handleException( Exception ex, String msg, String eventName )
541 throws IOException;
542
543
544
545
546
547
548 @Override
549 public String getStats()
550 {
551 return getStatistics().toString();
552 }
553
554
555
556
557 @Override
558 public IStats getStatistics()
559 {
560 IStats stats = new Stats();
561 stats.setTypeName( "AbstractRemoteAuxiliaryCache" );
562
563 ArrayList<IStatElement<?>> elems = new ArrayList<IStatElement<?>>();
564
565 elems.add(new StatElement<String>( "Remote Type", this.getRemoteCacheAttributes().getRemoteTypeName() ) );
566
567
568
569
570
571
572 elems.add(new StatElement<Boolean>( "UsePoolForGet", Boolean.valueOf(usePoolForGet) ) );
573
574 if ( pool != null )
575 {
576 elems.add(new StatElement<Integer>( "Pool Size", Integer.valueOf(pool.getPoolSize()) ) );
577 elems.add(new StatElement<Integer>( "Maximum Pool Size", Integer.valueOf(pool.getMaximumPoolSize()) ) );
578 }
579
580 if ( getRemoteCacheService() instanceof ZombieCacheServiceNonLocal )
581 {
582 elems.add(new StatElement<Integer>( "Zombie Queue Size",
583 Integer.valueOf(( (ZombieCacheServiceNonLocal<K, V>) getRemoteCacheService() ).getQueueSize()) ) );
584 }
585
586 stats.setStatElements( elems );
587
588 return stats;
589 }
590
591
592
593
594
595
596 @Override
597 public CacheStatus getStatus()
598 {
599 return getRemoteCacheService() instanceof IZombie ? CacheStatus.ERROR : CacheStatus.ALIVE;
600 }
601
602
603
604
605
606
607
608 @Override
609 public void fixCache( ICacheServiceNonLocal<?, ?> restoredRemote )
610 {
611 @SuppressWarnings("unchecked")
612 ICacheServiceNonLocal<K, V> remote = (ICacheServiceNonLocal<K, V>)restoredRemote;
613 ICacheServiceNonLocal<K, V> prevRemote = getRemoteCacheService();
614 if ( prevRemote instanceof ZombieCacheServiceNonLocal )
615 {
616 ZombieCacheServiceNonLocal<K, V> zombie = (ZombieCacheServiceNonLocal<K, V>) prevRemote;
617 setRemoteCacheService( remote );
618 try
619 {
620 zombie.propagateEvents( remote );
621 }
622 catch ( Exception e )
623 {
624 try
625 {
626 handleException( e, "Problem propagating events from Zombie Queue to new Remote Service.",
627 "fixCache" );
628 }
629 catch ( IOException e1 )
630 {
631
632 }
633 }
634 }
635 else
636 {
637 setRemoteCacheService( remote );
638 }
639 }
640
641
642
643
644
645
646 @Override
647 public CacheType getCacheType()
648 {
649 return CacheType.REMOTE_CACHE;
650 }
651
652
653
654
655
656
657 @Override
658 public String getCacheName()
659 {
660 return cacheName;
661 }
662
663
664
665
666 protected void setRemoteCacheService( ICacheServiceNonLocal<K, V> remote )
667 {
668 this.remoteCacheService = remote;
669 }
670
671
672
673
674 protected ICacheServiceNonLocal<K, V> getRemoteCacheService()
675 {
676 return remoteCacheService;
677 }
678
679
680
681
682 @Override
683 public AuxiliaryCacheAttributes getAuxiliaryCacheAttributes()
684 {
685 return getRemoteCacheAttributes();
686 }
687
688
689
690
691 protected void setRemoteCacheAttributes( IRemoteCacheAttributes remoteCacheAttributes )
692 {
693 this.remoteCacheAttributes = remoteCacheAttributes;
694 }
695
696
697
698
699 protected IRemoteCacheAttributes getRemoteCacheAttributes()
700 {
701 return remoteCacheAttributes;
702 }
703
704
705
706
707 protected void setRemoteCacheListener( IRemoteCacheListener<K, V> remoteCacheListener )
708 {
709 this.remoteCacheListener = remoteCacheListener;
710 }
711
712
713
714
715 protected IRemoteCacheListener<K, V> getRemoteCacheListener()
716 {
717 return remoteCacheListener;
718 }
719 }