1 package org.apache.jcs.auxiliary.remote;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.IOException;
23 import java.io.Serializable;
24 import java.util.ArrayList;
25 import java.util.HashMap;
26 import java.util.Map;
27 import java.util.Set;
28 import java.util.concurrent.Callable;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.Future;
31 import java.util.concurrent.ThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.TimeoutException;
34
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.jcs.auxiliary.AbstractAuxiliaryCacheEventLogging;
38 import org.apache.jcs.auxiliary.AuxiliaryCacheAttributes;
39 import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheAttributes;
40 import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheClient;
41 import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheListener;
42 import org.apache.jcs.auxiliary.remote.behavior.IRemoteCacheService;
43 import org.apache.jcs.engine.CacheConstants;
44 import org.apache.jcs.engine.behavior.ICacheElement;
45 import org.apache.jcs.engine.behavior.ICacheElementSerialized;
46 import org.apache.jcs.engine.behavior.IZombie;
47 import org.apache.jcs.engine.logging.behavior.ICacheEventLogger;
48 import org.apache.jcs.engine.stats.StatElement;
49 import org.apache.jcs.engine.stats.Stats;
50 import org.apache.jcs.engine.stats.behavior.IStatElement;
51 import org.apache.jcs.engine.stats.behavior.IStats;
52 import org.apache.jcs.utils.serialization.SerializationConversionUtil;
53 import org.apache.jcs.utils.threadpool.ThreadPoolManager;
54
55
56 public abstract class AbstractRemoteAuxiliaryCache
57 extends AbstractAuxiliaryCacheEventLogging
58 implements IRemoteCacheClient
59 {
60
/** Serialization version identifier for this class. */
private static final long serialVersionUID = -5329231850422826461L;

/** Logger shared by all instances of this class. */
private final static Log log = LogFactory.getLog( AbstractRemoteAuxiliaryCache.class );

/**
 * Remote service that cache operations are sent to. It is set in the constructor and
 * swapped by {@code fixCache}.
 */
private IRemoteCacheService remoteCacheService;

/** Name of the cache, copied from the attributes in the constructor. */
protected final String cacheName;

/** Listener whose id accompanies remote requests; the code tolerates it being null. */
private IRemoteCacheListener remoteCacheListener;

/** Configuration of this remote cache client (remote type, get-only flag, get timeout, pool name). */
private IRemoteCacheAttributes remoteCacheAttributes;

/** Thread pool used for timed gets; only looked up when the configured get timeout is positive. */
private ThreadPoolExecutor pool = null;

/** True when gets are executed through {@link #pool} so that they can time out. */
private boolean usePoolForGet = false;
86
87
88
89
90
91
92
93
/**
 * Stores the attributes, remote service and listener, then decides whether gets should be
 * run through a thread pool.
 * <p>
 * A pool is looked up by the configured thread pool name only when the configured get
 * timeout is greater than zero; pooled gets are enabled only if that lookup yields a pool.
 *
 * @param cattr attributes for this client; also supplies the cache name
 * @param remote remote cache service the client talks to
 * @param listener listener associated with this client
 */
public AbstractRemoteAuxiliaryCache( IRemoteCacheAttributes cattr, IRemoteCacheService remote,
                                     IRemoteCacheListener listener )
{
    this.setRemoteCacheAttributes( cattr );
    this.cacheName = cattr.getCacheName();
    this.setRemoteCacheService( remote );
    this.setRemoteCacheListener( listener );

    // Dump the construction parameters in one go when debugging.
    if ( log.isDebugEnabled() )
    {
        log.debug( "Construct> cacheName=" + cattr.getCacheName() );
        log.debug( "irca = " + getRemoteCacheAttributes() );
        log.debug( "remote = " + remote );
        log.debug( "listener = " + listener );
        log.debug( "GetTimeoutMillis() = " + getRemoteCacheAttributes().getGetTimeoutMillis() );
    }

    // Without a positive timeout there is no reason to involve a pool.
    if ( getRemoteCacheAttributes().getGetTimeoutMillis() <= 0 )
    {
        return;
    }

    pool = ThreadPoolManager.getInstance().getPool( getRemoteCacheAttributes().getThreadPoolName() );
    if ( log.isDebugEnabled() )
    {
        log.debug( "Thread Pool = " + pool );
    }
    usePoolForGet = ( pool != null );
}
129
130
131
132
133
134
135 @Override
136 protected void processDispose()
137 throws IOException
138 {
139 if ( log.isInfoEnabled() )
140 {
141 log.info( "Disposing of remote cache." );
142 }
143 try
144 {
145 if ( getRemoteCacheListener() != null )
146 {
147 getRemoteCacheListener().dispose();
148 }
149 }
150 catch ( Exception ex )
151 {
152 log.error( "Couldn't dispose", ex );
153 handleException( ex, "Failed to dispose [" + cacheName + "]", ICacheEventLogger.DISPOSE_EVENT );
154 }
155 }
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170 @Override
171 protected ICacheElement processGet( Serializable key )
172 throws IOException
173 {
174 ICacheElement retVal = null;
175 try
176 {
177 if ( usePoolForGet )
178 {
179 retVal = getUsingPool( key );
180 }
181 else
182 {
183 retVal = getRemoteCacheService().get( cacheName, key, getListenerId() );
184 }
185
186
187 if ( retVal != null && retVal instanceof ICacheElementSerialized )
188 {
189
190
191
192 if ( this.getRemoteCacheAttributes().getRemoteType() != IRemoteCacheAttributes.CLUSTER )
193 {
194 retVal = SerializationConversionUtil.getDeSerializedCacheElement( (ICacheElementSerialized) retVal,
195 this.elementSerializer );
196 }
197 }
198 }
199 catch ( Exception ex )
200 {
201 handleException( ex, "Failed to get [" + key + "] from [" + cacheName + "]", ICacheEventLogger.GET_EVENT );
202 }
203 return retVal;
204 }
205
206
207
208
209
210
211
212
213 public ICacheElement getUsingPool( final Serializable key )
214 throws IOException
215 {
216 int timeout = getRemoteCacheAttributes().getGetTimeoutMillis();
217
218 try
219 {
220 Callable<ICacheElement> command = new Callable<ICacheElement>()
221 {
222 public ICacheElement call()
223 throws IOException
224 {
225 return getRemoteCacheService().get( cacheName, key, getListenerId() );
226 }
227 };
228
229
230 Future<ICacheElement> future = pool.submit(command);
231
232
233 ICacheElement ice = future.get(timeout, TimeUnit.MILLISECONDS);
234
235 if ( log.isDebugEnabled() )
236 {
237 if ( ice == null )
238 {
239 log.debug( "nothing found in remote cache" );
240 }
241 else
242 {
243 log.debug( "found item in remote cache" );
244 }
245 }
246 return ice;
247 }
248 catch ( TimeoutException te )
249 {
250 log.warn( "TimeoutException, Get Request timed out after " + timeout );
251 throw new IOException( "Get Request timed out after " + timeout );
252 }
253 catch ( InterruptedException ex )
254 {
255 log.warn( "InterruptedException, Get Request timed out after " + timeout );
256 throw new IOException( "Get Request timed out after " + timeout );
257 }
258 catch (ExecutionException ex)
259 {
260
261 log.error( "ExecutionException, Assuming an IO exception thrown in the background.", ex );
262 throw new IOException( "Get Request timed out after " + timeout );
263 }
264 }
265
266
267
268
269
270
271
272
273 @Override
274 public Map<Serializable, ICacheElement> processGetMatching( String pattern )
275 throws IOException
276 {
277 Map<Serializable, ICacheElement> results = new HashMap<Serializable, ICacheElement>();
278 try
279 {
280 Map<Serializable, ICacheElement> rawResults = getRemoteCacheService().getMatching( cacheName, pattern, getListenerId() );
281
282
283 if ( rawResults != null )
284 {
285 for (Map.Entry<Serializable, ICacheElement> entry : rawResults.entrySet())
286 {
287 ICacheElement unwrappedResult = null;
288 if ( entry.getValue() instanceof ICacheElementSerialized )
289 {
290
291
292
293 if ( this.getRemoteCacheAttributes().getRemoteType() != IRemoteCacheAttributes.CLUSTER )
294 {
295 unwrappedResult = SerializationConversionUtil
296 .getDeSerializedCacheElement( (ICacheElementSerialized) entry.getValue(),
297 this.elementSerializer );
298 }
299 }
300 else
301 {
302 unwrappedResult = entry.getValue();
303 }
304 results.put( entry.getKey(), unwrappedResult );
305 }
306 }
307 }
308 catch ( Exception ex )
309 {
310 handleException( ex, "Failed to getMatching [" + pattern + "] from [" + cacheName + "]",
311 ICacheEventLogger.GET_EVENT );
312 }
313 return results;
314 }
315
316
317
318
319
320
321
322
323
324 @Override
325 protected Map<Serializable, ICacheElement> processGetMultiple( Set<Serializable> keys )
326 throws IOException
327 {
328 Map<Serializable, ICacheElement> elements = new HashMap<Serializable, ICacheElement>();
329 if ( keys != null && !keys.isEmpty() )
330 {
331 for (Serializable key : keys)
332 {
333 ICacheElement element = get( key );
334
335 if ( element != null )
336 {
337 elements.put( key, element );
338 }
339 }
340 }
341 return elements;
342 }
343
344
345
346
347
348
349
350
351
352 @Override
353 protected boolean processRemove( Serializable key )
354 throws IOException
355 {
356 if ( !this.getRemoteCacheAttributes().getGetOnly() )
357 {
358 if ( log.isDebugEnabled() )
359 {
360 log.debug( "remove> key=" + key );
361 }
362 try
363 {
364 getRemoteCacheService().remove( cacheName, key, getListenerId() );
365 }
366 catch ( Exception ex )
367 {
368 handleException( ex, "Failed to remove " + key + " from " + cacheName, ICacheEventLogger.REMOVE_EVENT );
369 }
370 return true;
371 }
372 return false;
373 }
374
375
376
377
378
379
380
381 @Override
382 protected void processRemoveAll()
383 throws IOException
384 {
385 if ( !this.getRemoteCacheAttributes().getGetOnly() )
386 {
387 try
388 {
389 getRemoteCacheService().removeAll( cacheName, getListenerId() );
390 }
391 catch ( Exception ex )
392 {
393 handleException( ex, "Failed to remove all from " + cacheName, ICacheEventLogger.REMOVEALL_EVENT );
394 }
395 }
396 }
397
398
399
400
401
402
403
404
405
406 @Override
407 protected void processUpdate( ICacheElement ce )
408 throws IOException
409 {
410 if ( !getRemoteCacheAttributes().getGetOnly() )
411 {
412 ICacheElementSerialized serialized = null;
413 try
414 {
415 if ( log.isDebugEnabled() )
416 {
417 log.debug( "sending item to remote server" );
418 }
419
420
421
422 serialized = SerializationConversionUtil.getSerializedCacheElement( ce, this.elementSerializer );
423
424 remoteCacheService.update( serialized, getListenerId() );
425 }
426 catch ( NullPointerException npe )
427 {
428 log.error( "npe for ce = " + ce + "ce.attr = " + ce.getElementAttributes(), npe );
429 }
430 catch ( Exception ex )
431 {
432
433 handleException( ex, "Failed to put [" + ce.getKey() + "] to " + ce.getCacheName(),
434 ICacheEventLogger.UPDATE_EVENT );
435 }
436 }
437 else
438 {
439 if ( log.isDebugEnabled() )
440 {
441 log.debug( "get only mode, not sending to remote server" );
442 }
443 }
444 }
445
446
447
448
449
450
451
452
453
454 public Set<Serializable> getGroupKeys( String groupName )
455 throws java.rmi.RemoteException, IOException
456 {
457 return getRemoteCacheService().getGroupKeys( cacheName, groupName );
458 }
459
460
461
462
463
464
465
466 public IRemoteCacheListener getListener()
467 {
468 return getRemoteCacheListener();
469 }
470
471
472
473
474
475
476
477
478 public void setListenerId( long id )
479 {
480 if ( getRemoteCacheListener() != null )
481 {
482 try
483 {
484 getRemoteCacheListener().setListenerId( id );
485
486 if ( log.isDebugEnabled() )
487 {
488 log.debug( "set listenerId = " + id );
489 }
490 }
491 catch ( Exception e )
492 {
493 log.error( "Problem setting listenerId", e );
494 }
495 }
496 }
497
498
499
500
501
502
503 public long getListenerId()
504 {
505 if ( getRemoteCacheListener() != null )
506 {
507 try
508 {
509 if ( log.isDebugEnabled() )
510 {
511 log.debug( "get listenerId = " + getRemoteCacheListener().getListenerId() );
512 }
513 return getRemoteCacheListener().getListenerId();
514 }
515 catch ( Exception e )
516 {
517 log.error( "Problem getting listenerId", e );
518 }
519 }
520 return -1;
521 }
522
523
524
525
526
527 public int getSize()
528 {
529 return 0;
530 }
531
532
533
534
535
536
537
538
539
/**
 * Hook for subclasses to decide what happens when a remote operation fails. The methods in
 * this class call it from their catch blocks and continue normally if it returns.
 *
 * @param ex the exception that was caught
 * @param msg description of the operation that failed
 * @param eventName name of the event being processed (for example one of the
 *        {@code ICacheEventLogger} event constants)
 * @throws IOException if the implementation decides the failure should propagate
 */
protected abstract void handleException( Exception ex, String msg, String eventName )
throws IOException;
542
543
544
545
546
547
548 public String getStats()
549 {
550 return getStatistics().toString();
551 }
552
553
554
555
556 public IStats getStatistics()
557 {
558 IStats stats = new Stats();
559 stats.setTypeName( "AbstractRemoteAuxiliaryCache" );
560
561 ArrayList<IStatElement> elems = new ArrayList<IStatElement>();
562
563 IStatElement se = null;
564
565 se = new StatElement();
566 se.setName( "Remote Type" );
567 se.setData( this.getRemoteCacheAttributes().getRemoteTypeName() + "" );
568 elems.add( se );
569
570 if ( this.getRemoteCacheAttributes().getRemoteType() == IRemoteCacheAttributes.CLUSTER )
571 {
572
573 }
574
575
576
577 se = new StatElement();
578 se.setName( "UsePoolForGet" );
579 se.setData( "" + usePoolForGet );
580 elems.add( se );
581
582 if ( pool != null )
583 {
584 se = new StatElement();
585 se.setName( "Pool Size" );
586 se.setData( "" + pool.getPoolSize() );
587 elems.add( se );
588
589 se = new StatElement();
590 se.setName( "Maximum Pool Size" );
591 se.setData( "" + pool.getMaximumPoolSize() );
592 elems.add( se );
593 }
594
595 if ( getRemoteCacheService() instanceof ZombieRemoteCacheService )
596 {
597 se = new StatElement();
598 se.setName( "Zombie Queue Size" );
599 se.setData( "" + ( (ZombieRemoteCacheService) getRemoteCacheService() ).getQueueSize() );
600 elems.add( se );
601 }
602
603
604 IStatElement[] ses = elems.toArray( new StatElement[0] );
605 stats.setStatElements( ses );
606
607 return stats;
608 }
609
610
611
612
613
614
615 public int getStatus()
616 {
617 return getRemoteCacheService() instanceof IZombie ? CacheConstants.STATUS_ERROR : CacheConstants.STATUS_ALIVE;
618 }
619
620
621
622
623
624
625
626 public void fixCache( IRemoteCacheService restoredRemote )
627 {
628 if ( getRemoteCacheService() != null && getRemoteCacheService() instanceof ZombieRemoteCacheService )
629 {
630 ZombieRemoteCacheService zombie = (ZombieRemoteCacheService) getRemoteCacheService();
631 setRemoteCacheService( restoredRemote );
632 try
633 {
634 zombie.propagateEvents( restoredRemote );
635 }
636 catch ( Exception e )
637 {
638 try
639 {
640 handleException( e, "Problem propagating events from Zombie Queue to new Remote Service.",
641 "fixCache" );
642 }
643 catch ( IOException e1 )
644 {
645
646 }
647 }
648 }
649 else
650 {
651 setRemoteCacheService( restoredRemote );
652 }
653 }
654
655
656
657
658
659
660 public int getCacheType()
661 {
662 return REMOTE_CACHE;
663 }
664
665
666
667
668
669
670 public String getCacheName()
671 {
672 return cacheName;
673 }
674
675
676
677
678 protected void setRemoteCacheService( IRemoteCacheService remote )
679 {
680 this.remoteCacheService = remote;
681 }
682
683
684
685
686 protected IRemoteCacheService getRemoteCacheService()
687 {
688 return remoteCacheService;
689 }
690
691
692
693
694 public AuxiliaryCacheAttributes getAuxiliaryCacheAttributes()
695 {
696 return getRemoteCacheAttributes();
697 }
698
699
700
701
702 protected void setRemoteCacheAttributes( IRemoteCacheAttributes remoteCacheAttributes )
703 {
704 this.remoteCacheAttributes = remoteCacheAttributes;
705 }
706
707
708
709
710 protected IRemoteCacheAttributes getRemoteCacheAttributes()
711 {
712 return remoteCacheAttributes;
713 }
714
715
716
717
718 protected void setRemoteCacheListener( IRemoteCacheListener remoteCacheListener )
719 {
720 this.remoteCacheListener = remoteCacheListener;
721 }
722
723
724
725
726 protected IRemoteCacheListener getRemoteCacheListener()
727 {
728 return remoteCacheListener;
729 }
730 }