1 package org.apache.jcs.auxiliary.remote;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.IOException;
23 import java.io.Serializable;
24 import java.util.ArrayList;
25 import java.util.HashMap;
26 import java.util.Map;
27 import java.util.Set;
28 import java.util.concurrent.Callable;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.Future;
31 import java.util.concurrent.ThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.TimeoutException;
34
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.jcs.auxiliary.AbstractAuxiliaryCacheEventLogging;
38 import org.apache.jcs.auxiliary.AuxiliaryCacheAttributes;
39 import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheAttributes;
40 import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheClient;
41 import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheListener;
42 import org.apache.jcs.auxiliary.remote.server.behavior.RemoteType;
43 import org.apache.jcs.engine.CacheStatus;
44 import org.apache.jcs.engine.ZombieCacheServiceNonLocal;
45 import org.apache.jcs.engine.behavior.ICacheElement;
46 import org.apache.jcs.engine.behavior.ICacheElementSerialized;
47 import org.apache.jcs.engine.behavior.ICacheServiceNonLocal;
48 import org.apache.jcs.engine.behavior.IZombie;
49 import org.apache.jcs.engine.logging.behavior.ICacheEventLogger;
50 import org.apache.jcs.engine.stats.StatElement;
51 import org.apache.jcs.engine.stats.Stats;
52 import org.apache.jcs.engine.stats.behavior.IStatElement;
53 import org.apache.jcs.engine.stats.behavior.IStats;
54 import org.apache.jcs.utils.serialization.SerializationConversionUtil;
55 import org.apache.jcs.utils.threadpool.ThreadPoolManager;
56
57
58 public abstract class AbstractRemoteAuxiliaryCache<K extends Serializable, V extends Serializable>
59 extends AbstractAuxiliaryCacheEventLogging<K, V>
60 implements IRemoteCacheClient<K, V>
61 {
62
63 private static final long serialVersionUID = -5329231850422826461L;
64
65
66 private final static Log log = LogFactory.getLog( AbstractRemoteAuxiliaryCache.class );
67
68
69
70
71
72 private ICacheServiceNonLocal<K, V> remoteCacheService;
73
74
75 protected final String cacheName;
76
77
78 private IRemoteCacheListener<K, V> remoteCacheListener;
79
80
81 private IRemoteCacheAttributes remoteCacheAttributes;
82
83
84 private ThreadPoolExecutor pool = null;
85
86
87 private boolean usePoolForGet = false;
88
89
90
91
92
93
94
95
96 public AbstractRemoteAuxiliaryCache( IRemoteCacheAttributes cattr, ICacheServiceNonLocal<K, V> remote,
97 IRemoteCacheListener<K, V> listener )
98 {
99 this.setRemoteCacheAttributes( cattr );
100 this.cacheName = cattr.getCacheName();
101 this.setRemoteCacheService( remote );
102 this.setRemoteCacheListener( listener );
103
104 if ( log.isDebugEnabled() )
105 {
106 log.debug( "Construct> cacheName=" + cattr.getCacheName() );
107 log.debug( "irca = " + getRemoteCacheAttributes() );
108 log.debug( "remote = " + remote );
109 log.debug( "listener = " + listener );
110 }
111
112
113 if ( log.isDebugEnabled() )
114 {
115 log.debug( "GetTimeoutMillis() = " + getRemoteCacheAttributes().getGetTimeoutMillis() );
116 }
117
118 if ( getRemoteCacheAttributes().getGetTimeoutMillis() > 0 )
119 {
120 pool = ThreadPoolManager.getInstance().getPool( getRemoteCacheAttributes().getThreadPoolName() );
121 if ( log.isDebugEnabled() )
122 {
123 log.debug( "Thread Pool = " + pool );
124 }
125 if ( pool != null )
126 {
127 usePoolForGet = true;
128 }
129 }
130 }
131
132
133
134
135
136
137 @Override
138 protected void processDispose()
139 throws IOException
140 {
141 if ( log.isInfoEnabled() )
142 {
143 log.info( "Disposing of remote cache." );
144 }
145 try
146 {
147 if ( getRemoteCacheListener() != null )
148 {
149 getRemoteCacheListener().dispose();
150 }
151 }
152 catch ( Exception ex )
153 {
154 log.error( "Couldn't dispose", ex );
155 handleException( ex, "Failed to dispose [" + cacheName + "]", ICacheEventLogger.DISPOSE_EVENT );
156 }
157 }
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172 @Override
173 protected ICacheElement<K, V> processGet( K key )
174 throws IOException
175 {
176 ICacheElement<K, V> retVal = null;
177 try
178 {
179 if ( usePoolForGet )
180 {
181 retVal = getUsingPool( key );
182 }
183 else
184 {
185 retVal = getRemoteCacheService().get( cacheName, key, getListenerId() );
186 }
187
188
189 if ( retVal != null && retVal instanceof ICacheElementSerialized )
190 {
191
192
193
194 if ( this.getRemoteCacheAttributes().getRemoteType() != RemoteType.CLUSTER )
195 {
196 retVal = SerializationConversionUtil.getDeSerializedCacheElement( (ICacheElementSerialized<K, V>) retVal,
197 this.elementSerializer );
198 }
199 }
200 }
201 catch ( Exception ex )
202 {
203 handleException( ex, "Failed to get [" + key + "] from [" + cacheName + "]", ICacheEventLogger.GET_EVENT );
204 }
205 return retVal;
206 }
207
208
209
210
211
212
213
214
215 public ICacheElement<K, V> getUsingPool( final K key )
216 throws IOException
217 {
218 int timeout = getRemoteCacheAttributes().getGetTimeoutMillis();
219
220 try
221 {
222 Callable<ICacheElement<K, V>> command = new Callable<ICacheElement<K, V>>()
223 {
224 public ICacheElement<K, V> call()
225 throws IOException
226 {
227 return getRemoteCacheService().get( cacheName, key, getListenerId() );
228 }
229 };
230
231
232 Future<ICacheElement<K, V>> future = pool.submit(command);
233
234
235 ICacheElement<K, V> ice = future.get(timeout, TimeUnit.MILLISECONDS);
236
237 if ( log.isDebugEnabled() )
238 {
239 if ( ice == null )
240 {
241 log.debug( "nothing found in remote cache" );
242 }
243 else
244 {
245 log.debug( "found item in remote cache" );
246 }
247 }
248 return ice;
249 }
250 catch ( TimeoutException te )
251 {
252 log.warn( "TimeoutException, Get Request timed out after " + timeout );
253 throw new IOException( "Get Request timed out after " + timeout );
254 }
255 catch ( InterruptedException ex )
256 {
257 log.warn( "InterruptedException, Get Request timed out after " + timeout );
258 throw new IOException( "Get Request timed out after " + timeout );
259 }
260 catch (ExecutionException ex)
261 {
262
263 log.error( "ExecutionException, Assuming an IO exception thrown in the background.", ex );
264 throw new IOException( "Get Request timed out after " + timeout );
265 }
266 }
267
268
269
270
271
272
273
274
275 @Override
276 public Map<K, ICacheElement<K, V>> processGetMatching( String pattern )
277 throws IOException
278 {
279 Map<K, ICacheElement<K, V>> results = new HashMap<K, ICacheElement<K, V>>();
280 try
281 {
282 Map<K, ICacheElement<K, V>> rawResults = getRemoteCacheService().getMatching( cacheName, pattern, getListenerId() );
283
284
285 if ( rawResults != null )
286 {
287 for (Map.Entry<K, ICacheElement<K, V>> entry : rawResults.entrySet())
288 {
289 ICacheElement<K, V> unwrappedResult = null;
290 if ( entry.getValue() instanceof ICacheElementSerialized )
291 {
292
293
294
295 if ( this.getRemoteCacheAttributes().getRemoteType() != RemoteType.CLUSTER )
296 {
297 unwrappedResult = SerializationConversionUtil
298 .getDeSerializedCacheElement( (ICacheElementSerialized<K, V>) entry.getValue(),
299 this.elementSerializer );
300 }
301 }
302 else
303 {
304 unwrappedResult = entry.getValue();
305 }
306 results.put( entry.getKey(), unwrappedResult );
307 }
308 }
309 }
310 catch ( Exception ex )
311 {
312 handleException( ex, "Failed to getMatching [" + pattern + "] from [" + cacheName + "]",
313 ICacheEventLogger.GET_EVENT );
314 }
315 return results;
316 }
317
318
319
320
321
322
323
324
325
326 @Override
327 protected Map<K, ICacheElement<K, V>> processGetMultiple( Set<K> keys )
328 throws IOException
329 {
330 Map<K, ICacheElement<K, V>> elements = new HashMap<K, ICacheElement<K, V>>();
331 if ( keys != null && !keys.isEmpty() )
332 {
333 for (K key : keys)
334 {
335 ICacheElement<K, V> element = get( key );
336
337 if ( element != null )
338 {
339 elements.put( key, element );
340 }
341 }
342 }
343 return elements;
344 }
345
346
347
348
349
350
351
352
353
354 @Override
355 protected boolean processRemove( K key )
356 throws IOException
357 {
358 if ( !this.getRemoteCacheAttributes().getGetOnly() )
359 {
360 if ( log.isDebugEnabled() )
361 {
362 log.debug( "remove> key=" + key );
363 }
364 try
365 {
366 getRemoteCacheService().remove( cacheName, key, getListenerId() );
367 }
368 catch ( Exception ex )
369 {
370 handleException( ex, "Failed to remove " + key + " from " + cacheName, ICacheEventLogger.REMOVE_EVENT );
371 }
372 return true;
373 }
374 return false;
375 }
376
377
378
379
380
381
382
383 @Override
384 protected void processRemoveAll()
385 throws IOException
386 {
387 if ( !this.getRemoteCacheAttributes().getGetOnly() )
388 {
389 try
390 {
391 getRemoteCacheService().removeAll( cacheName, getListenerId() );
392 }
393 catch ( Exception ex )
394 {
395 handleException( ex, "Failed to remove all from " + cacheName, ICacheEventLogger.REMOVEALL_EVENT );
396 }
397 }
398 }
399
400
401
402
403
404
405
406
407
408 @Override
409 protected void processUpdate( ICacheElement<K, V> ce )
410 throws IOException
411 {
412 if ( !getRemoteCacheAttributes().getGetOnly() )
413 {
414 ICacheElementSerialized<K, V> serialized = null;
415 try
416 {
417 if ( log.isDebugEnabled() )
418 {
419 log.debug( "sending item to remote server" );
420 }
421
422
423
424 serialized = SerializationConversionUtil.getSerializedCacheElement( ce, this.elementSerializer );
425
426 remoteCacheService.update( serialized, getListenerId() );
427 }
428 catch ( NullPointerException npe )
429 {
430 log.error( "npe for ce = " + ce + "ce.attr = " + ce.getElementAttributes(), npe );
431 }
432 catch ( Exception ex )
433 {
434
435 handleException( ex, "Failed to put [" + ce.getKey() + "] to " + ce.getCacheName(),
436 ICacheEventLogger.UPDATE_EVENT );
437 }
438 }
439 else
440 {
441 if ( log.isDebugEnabled() )
442 {
443 log.debug( "get only mode, not sending to remote server" );
444 }
445 }
446 }
447
448
449
450
451
452
453
454
455
456 public Set<K> getGroupKeys( String groupName )
457 throws java.rmi.RemoteException, IOException
458 {
459 return getRemoteCacheService().getGroupKeys( cacheName, groupName );
460 }
461
462
463
464
465
466
467
468
469 public Set<String> getGroupNames()
470 throws java.rmi.RemoteException, IOException
471 {
472 return getRemoteCacheService().getGroupNames( cacheName );
473 }
474
475
476
477
478
479
480
481 public IRemoteCacheListener<K, V> getListener()
482 {
483 return getRemoteCacheListener();
484 }
485
486
487
488
489
490
491
492
493 public void setListenerId( long id )
494 {
495 if ( getRemoteCacheListener() != null )
496 {
497 try
498 {
499 getRemoteCacheListener().setListenerId( id );
500
501 if ( log.isDebugEnabled() )
502 {
503 log.debug( "set listenerId = " + id );
504 }
505 }
506 catch ( Exception e )
507 {
508 log.error( "Problem setting listenerId", e );
509 }
510 }
511 }
512
513
514
515
516
517
518 public long getListenerId()
519 {
520 if ( getRemoteCacheListener() != null )
521 {
522 try
523 {
524 if ( log.isDebugEnabled() )
525 {
526 log.debug( "get listenerId = " + getRemoteCacheListener().getListenerId() );
527 }
528 return getRemoteCacheListener().getListenerId();
529 }
530 catch ( Exception e )
531 {
532 log.error( "Problem getting listenerId", e );
533 }
534 }
535 return -1;
536 }
537
538
539
540
541
542 public int getSize()
543 {
544 return 0;
545 }
546
547
548
549
550
551
552
553
554
555 protected abstract void handleException( Exception ex, String msg, String eventName )
556 throws IOException;
557
558
559
560
561
562
563 public String getStats()
564 {
565 return getStatistics().toString();
566 }
567
568
569
570
571 public IStats getStatistics()
572 {
573 IStats stats = new Stats();
574 stats.setTypeName( "AbstractRemoteAuxiliaryCache" );
575
576 ArrayList<IStatElement> elems = new ArrayList<IStatElement>();
577
578 IStatElement se = null;
579
580 se = new StatElement();
581 se.setName( "Remote Type" );
582 se.setData( this.getRemoteCacheAttributes().getRemoteTypeName() + "" );
583 elems.add( se );
584
585 if ( this.getRemoteCacheAttributes().getRemoteType() == RemoteType.CLUSTER )
586 {
587
588 }
589
590
591
592 se = new StatElement();
593 se.setName( "UsePoolForGet" );
594 se.setData( "" + usePoolForGet );
595 elems.add( se );
596
597 if ( pool != null )
598 {
599 se = new StatElement();
600 se.setName( "Pool Size" );
601 se.setData( "" + pool.getPoolSize() );
602 elems.add( se );
603
604 se = new StatElement();
605 se.setName( "Maximum Pool Size" );
606 se.setData( "" + pool.getMaximumPoolSize() );
607 elems.add( se );
608 }
609
610 if ( getRemoteCacheService() instanceof ZombieCacheServiceNonLocal )
611 {
612 se = new StatElement();
613 se.setName( "Zombie Queue Size" );
614 se.setData( "" + ( (ZombieCacheServiceNonLocal<K, V>) getRemoteCacheService() ).getQueueSize() );
615 elems.add( se );
616 }
617
618
619 IStatElement[] ses = elems.toArray( new StatElement[0] );
620 stats.setStatElements( ses );
621
622 return stats;
623 }
624
625
626
627
628
629
630 public CacheStatus getStatus()
631 {
632 return getRemoteCacheService() instanceof IZombie ? CacheStatus.ERROR : CacheStatus.ALIVE;
633 }
634
635
636
637
638
639
640
641 public void fixCache( ICacheServiceNonLocal<?, ?> restoredRemote )
642 {
643 @SuppressWarnings("unchecked")
644 ICacheServiceNonLocal<K, V> remote = (ICacheServiceNonLocal<K, V>)restoredRemote;
645 if ( getRemoteCacheService() != null && getRemoteCacheService() instanceof ZombieCacheServiceNonLocal )
646 {
647 ZombieCacheServiceNonLocal<K, V> zombie = (ZombieCacheServiceNonLocal<K, V>) getRemoteCacheService();
648 setRemoteCacheService( remote );
649 try
650 {
651 zombie.propagateEvents( remote );
652 }
653 catch ( Exception e )
654 {
655 try
656 {
657 handleException( e, "Problem propagating events from Zombie Queue to new Remote Service.",
658 "fixCache" );
659 }
660 catch ( IOException e1 )
661 {
662
663 }
664 }
665 }
666 else
667 {
668 setRemoteCacheService( remote );
669 }
670 }
671
672
673
674
675
676
677 public CacheType getCacheType()
678 {
679 return CacheType.REMOTE_CACHE;
680 }
681
682
683
684
685
686
687 public String getCacheName()
688 {
689 return cacheName;
690 }
691
692
693
694
695 protected void setRemoteCacheService( ICacheServiceNonLocal<K, V> remote )
696 {
697 this.remoteCacheService = remote;
698 }
699
700
701
702
703 protected ICacheServiceNonLocal<K, V> getRemoteCacheService()
704 {
705 return remoteCacheService;
706 }
707
708
709
710
711 public AuxiliaryCacheAttributes getAuxiliaryCacheAttributes()
712 {
713 return getRemoteCacheAttributes();
714 }
715
716
717
718
719 protected void setRemoteCacheAttributes( IRemoteCacheAttributes remoteCacheAttributes )
720 {
721 this.remoteCacheAttributes = remoteCacheAttributes;
722 }
723
724
725
726
727 protected IRemoteCacheAttributes getRemoteCacheAttributes()
728 {
729 return remoteCacheAttributes;
730 }
731
732
733
734
735 protected void setRemoteCacheListener( IRemoteCacheListener<K, V> remoteCacheListener )
736 {
737 this.remoteCacheListener = remoteCacheListener;
738 }
739
740
741
742
743 protected IRemoteCacheListener<K, V> getRemoteCacheListener()
744 {
745 return remoteCacheListener;
746 }
747 }