1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.commons.jcs.jcache;
20
21 import org.apache.commons.jcs.engine.CacheElement;
22 import org.apache.commons.jcs.engine.ElementAttributes;
23 import org.apache.commons.jcs.engine.behavior.ICacheElement;
24 import org.apache.commons.jcs.engine.behavior.IElementAttributes;
25 import org.apache.commons.jcs.engine.behavior.IElementSerializer;
26 import org.apache.commons.jcs.jcache.jmx.JCSCacheMXBean;
27 import org.apache.commons.jcs.jcache.jmx.JCSCacheStatisticsMXBean;
28 import org.apache.commons.jcs.jcache.jmx.JMXs;
29 import org.apache.commons.jcs.jcache.proxy.ExceptionWrapperHandler;
30 import org.apache.commons.jcs.jcache.thread.DaemonThreadFactory;
31 import org.apache.commons.jcs.utils.serialization.StandardSerializer;
32
33 import javax.cache.Cache;
34 import javax.cache.CacheException;
35 import javax.cache.CacheManager;
36 import javax.cache.configuration.CacheEntryListenerConfiguration;
37 import javax.cache.configuration.Configuration;
38 import javax.cache.configuration.Factory;
39 import javax.cache.event.CacheEntryEvent;
40 import javax.cache.event.EventType;
41 import javax.cache.expiry.Duration;
42 import javax.cache.expiry.EternalExpiryPolicy;
43 import javax.cache.expiry.ExpiryPolicy;
44 import javax.cache.integration.CacheLoader;
45 import javax.cache.integration.CacheLoaderException;
46 import javax.cache.integration.CacheWriter;
47 import javax.cache.integration.CacheWriterException;
48 import javax.cache.integration.CompletionListener;
49 import javax.cache.processor.EntryProcessor;
50 import javax.cache.processor.EntryProcessorException;
51 import javax.cache.processor.EntryProcessorResult;
52 import javax.management.ObjectName;
53 import java.io.Closeable;
54 import java.io.IOException;
55 import java.util.Arrays;
56 import java.util.HashMap;
57 import java.util.HashSet;
58 import java.util.Iterator;
59 import java.util.Map;
60 import java.util.Properties;
61 import java.util.Set;
62 import java.util.concurrent.ConcurrentHashMap;
63 import java.util.concurrent.ConcurrentMap;
64 import java.util.concurrent.ExecutorService;
65 import java.util.concurrent.Executors;
66
67 import static org.apache.commons.jcs.jcache.Asserts.assertNotNull;
68 import static org.apache.commons.jcs.jcache.serialization.Serializations.copy;
69
70
71 public class JCSCache<K, V> implements Cache<K, V>
72 {
73 private final ExpiryAwareCache<K, V> delegate;
74 private final JCSCachingManager manager;
75 private final JCSConfiguration<K, V> config;
76 private final CacheLoader<K, V> loader;
77 private final CacheWriter<? super K, ? super V> writer;
78 private final ExpiryPolicy expiryPolicy;
79 private final ObjectName cacheConfigObjectName;
80 private final ObjectName cacheStatsObjectName;
81 private final String name;
82 private volatile boolean closed = false;
83 private final Map<CacheEntryListenerConfiguration<K, V>, JCSListener<K, V>> listeners = new ConcurrentHashMap<CacheEntryListenerConfiguration<K, V>, JCSListener<K, V>>();
84 private final Statistics statistics = new Statistics();
85 private final ExecutorService pool;
86 private final IElementSerializer serializer;
87
88
89 public JCSCache(final ClassLoader classLoader, final JCSCachingManager mgr,
90 final String cacheName, final JCSConfiguration<K, V> configuration,
91 final Properties properties, final ExpiryAwareCache<K, V> cache)
92 {
93 manager = mgr;
94
95 name = cacheName;
96
97 delegate = cache;
98 if (delegate.getElementAttributes() == null)
99 {
100 delegate.setElementAttributes(new ElementAttributes());
101 }
102 delegate.getElementAttributes().addElementEventHandler(new EvictionListener(statistics));
103
104 config = configuration;
105
106 final int poolSize = Integer.parseInt(property(properties, cacheName, "pool.size", "3"));
107 final DaemonThreadFactory threadFactory = new DaemonThreadFactory("JCS-JCache-" + cacheName + "-");
108 pool = poolSize > 0 ? Executors.newFixedThreadPool(poolSize, threadFactory) : Executors.newCachedThreadPool(threadFactory);
109
110 try
111 {
112 serializer = IElementSerializer.class.cast(classLoader.loadClass(property(properties, "serializer", cacheName, StandardSerializer.class.getName())).newInstance());
113 }
114 catch (final Exception e)
115 {
116 throw new IllegalArgumentException(e);
117 }
118
119 final Factory<CacheLoader<K, V>> cacheLoaderFactory = configuration.getCacheLoaderFactory();
120 if (cacheLoaderFactory == null)
121 {
122 loader = NoLoader.INSTANCE;
123 }
124 else
125 {
126 loader = ExceptionWrapperHandler
127 .newProxy(classLoader, cacheLoaderFactory.create(), CacheLoaderException.class, CacheLoader.class);
128 }
129
130 final Factory<CacheWriter<? super K, ? super V>> cacheWriterFactory = configuration.getCacheWriterFactory();
131 if (cacheWriterFactory == null)
132 {
133 writer = NoWriter.INSTANCE;
134 }
135 else
136 {
137 writer = ExceptionWrapperHandler
138 .newProxy(classLoader, cacheWriterFactory.create(), CacheWriterException.class, CacheWriter.class);
139 }
140
141 final Factory<ExpiryPolicy> expiryPolicyFactory = configuration.getExpiryPolicyFactory();
142 if (expiryPolicyFactory == null)
143 {
144 expiryPolicy = new EternalExpiryPolicy();
145 }
146 else
147 {
148 expiryPolicy = expiryPolicyFactory.create();
149 }
150
151 for (final CacheEntryListenerConfiguration<K, V> listener : config.getCacheEntryListenerConfigurations())
152 {
153 listeners.put(listener, new JCSListener<K, V>(listener));
154 }
155 delegate.init(this, listeners);
156
157 statistics.setActive(config.isStatisticsEnabled());
158
159 final String mgrStr = manager.getURI().toString().replaceAll(",|:|=|\n", ".");
160 final String cacheStr = name.replaceAll(",|:|=|\n", ".");
161 try
162 {
163 cacheConfigObjectName = new ObjectName("javax.cache:type=CacheConfiguration,"
164 + "CacheManager=" + mgrStr + "," + "Cache=" + cacheStr);
165 cacheStatsObjectName = new ObjectName("javax.cache:type=CacheStatistics,"
166 + "CacheManager=" + mgrStr + "," + "Cache=" + cacheStr);
167 }
168 catch (final Exception e)
169 {
170 throw new IllegalArgumentException(e);
171 }
172 if (config.isManagementEnabled())
173 {
174 JMXs.register(cacheConfigObjectName, new JCSCacheMXBean<K, V>(this));
175 }
176 if (config.isStatisticsEnabled())
177 {
178 JMXs.register(cacheStatsObjectName, new JCSCacheStatisticsMXBean(statistics));
179 }
180 }
181
182 private static String property(final Properties properties, final String cacheName, final String name, final String defaultValue)
183 {
184 return properties.getProperty(cacheName + "." + name, properties.getProperty(name, defaultValue));
185 }
186
187 private void assertNotClosed()
188 {
189 if (isClosed())
190 {
191 throw new IllegalStateException("cache closed");
192 }
193 }
194
195 @Override
196 public V get(final K key)
197 {
198 assertNotClosed();
199 assertNotNull(key, "key");
200 final long getStart = Times.now(false);
201 return doGetControllingExpiry(getStart, key, true, false, false, true);
202 }
203
204 private V doLoad(final K key, final boolean update, final long now, final boolean propagateLoadException)
205 {
206 V v = null;
207 try
208 {
209 v = loader.load(key);
210 }
211 catch (final CacheLoaderException e)
212 {
213 if (propagateLoadException)
214 {
215 throw e;
216 }
217 }
218 if (v != null)
219 {
220 final Duration duration = update ? expiryPolicy.getExpiryForUpdate() : expiryPolicy.getExpiryForCreation();
221 if (isNotZero(duration))
222 {
223 final IElementAttributes clone = delegate.getElementAttributes().clone();
224 if (ElementAttributes.class.isInstance(clone))
225 {
226 ElementAttributes.class.cast(clone).setCreateTime();
227 }
228 final ICacheElement<K, V> element = updateElement(key, v, duration, clone);
229 try
230 {
231 delegate.update(element);
232 }
233 catch (final IOException e)
234 {
235 throw new CacheException(e);
236 }
237 }
238 }
239 return v;
240 }
241
242 private ICacheElement<K, V> updateElement(final K key, final V v, final Duration duration, final IElementAttributes attrs)
243 {
244 final ICacheElement<K, V> element = new CacheElement<K, V>(name, key, v);
245 if (duration != null)
246 {
247 attrs.setTimeFactorForMilliseconds(1);
248 final boolean eternal = duration.isEternal();
249 attrs.setIsEternal(eternal);
250 if (!eternal)
251 {
252 attrs.setLastAccessTimeNow();
253 }
254
255 }
256 element.setElementAttributes(attrs);
257 return element;
258 }
259
260 private void touch(final K key, final ICacheElement<K, V> element)
261 {
262 if (config.isStoreByValue())
263 {
264 final K copy = copy(serializer, manager.getClassLoader(), key);
265 try
266 {
267 delegate.update(new CacheElement<K, V>(name, copy, element.getVal(), element.getElementAttributes()));
268 }
269 catch (final IOException e)
270 {
271 throw new CacheException(e);
272 }
273 }
274 }
275
276 @Override
277 public Map<K, V> getAll(final Set<? extends K> keys)
278 {
279 assertNotClosed();
280 for (final K k : keys)
281 {
282 assertNotNull(k, "key");
283 }
284
285 final long now = Times.now(false);
286 final Map<K, V> result = new HashMap<K, V>();
287 for (final K key : keys) {
288 assertNotNull(key, "key");
289
290 final ICacheElement<K, V> elt = delegate.get(key);
291 V val = elt != null ? elt.getVal() : null;
292 if (val == null && config.isReadThrough())
293 {
294 val = doLoad(key, false, now, false);
295 if (val != null)
296 {
297 result.put(key, val);
298 }
299 }
300 else if (elt != null)
301 {
302 final Duration expiryForAccess = expiryPolicy.getExpiryForAccess();
303 if (isNotZero(expiryForAccess))
304 {
305 touch(key, elt);
306 result.put(key, val);
307 }
308 else
309 {
310 forceExpires(key);
311 }
312 }
313 }
314 return result;
315 }
316
317 @Override
318 public boolean containsKey(final K key)
319 {
320 assertNotClosed();
321 assertNotNull(key, "key");
322 return delegate.get(key) != null;
323 }
324
325 @Override
326 public void put(final K key, final V rawValue)
327 {
328 assertNotClosed();
329 assertNotNull(key, "key");
330 assertNotNull(rawValue, "value");
331
332 final ICacheElement<K, V> oldElt = delegate.get(key);
333 final V old = oldElt != null ? oldElt.getVal() : null;
334
335 final boolean storeByValue = config.isStoreByValue();
336 final V value = storeByValue ? copy(serializer, manager.getClassLoader(), rawValue) : rawValue;
337
338 final boolean created = old == null;
339 final Duration duration = created ? expiryPolicy.getExpiryForCreation() : expiryPolicy.getExpiryForUpdate();
340 if (isNotZero(duration))
341 {
342 final boolean statisticsEnabled = config.isStatisticsEnabled();
343 final long start = Times.now(false);
344
345 final K jcsKey = storeByValue ? copy(serializer, manager.getClassLoader(), key) : key;
346 final ICacheElement<K, V> element = updateElement(
347 jcsKey, value, created ? null : duration,
348 oldElt != null ? oldElt.getElementAttributes() : delegate.getElementAttributes().clone());
349 if (created && duration != null) {
350 final IElementAttributes copy = element.getElementAttributes();
351 copy.setTimeFactorForMilliseconds(1);
352 final boolean eternal = duration.isEternal();
353 copy.setIsEternal(eternal);
354 if (ElementAttributes.class.isInstance(copy)) {
355 ElementAttributes.class.cast(copy).setCreateTime();
356 }
357 if (!eternal)
358 {
359 copy.setIsEternal(false);
360 if (duration == expiryPolicy.getExpiryForAccess())
361 {
362 element.getElementAttributes().setIdleTime(duration.getTimeUnit().toMillis(duration.getDurationAmount()));
363 }
364 else
365 {
366 element.getElementAttributes().setMaxLife(duration.getTimeUnit().toMillis(duration.getDurationAmount()));
367 }
368 }
369 element.setElementAttributes(copy);
370 }
371 writer.write(new JCSEntry<K, V>(jcsKey, value));
372 try
373 {
374 delegate.update(element);
375 }
376 catch (final IOException e)
377 {
378 throw new CacheException(e);
379 }
380 for (final JCSListener<K, V> listener : listeners.values())
381 {
382 if (created)
383 {
384 listener.onCreated(Arrays.<CacheEntryEvent<? extends K, ? extends V>> asList(new JCSCacheEntryEvent<K, V>(this,
385 EventType.CREATED, null, key, value)));
386 }
387 else
388 {
389 listener.onUpdated(Arrays.<CacheEntryEvent<? extends K, ? extends V>> asList(new JCSCacheEntryEvent<K, V>(this,
390 EventType.UPDATED, old, key, value)));
391 }
392 }
393
394 if (statisticsEnabled)
395 {
396 statistics.increasePuts(1);
397 statistics.addPutTime(System.currentTimeMillis() - start);
398 }
399 }
400 else
401 {
402 if (!created)
403 {
404 forceExpires(key);
405 }
406 }
407 }
408
409 private static boolean isNotZero(final Duration duration)
410 {
411 return duration == null || !duration.isZero();
412 }
413
414 private void forceExpires(final K cacheKey)
415 {
416 final ICacheElement<K, V> elt = delegate.get(cacheKey);
417 delegate.remove(cacheKey);
418 for (final JCSListener<K, V> listener : listeners.values())
419 {
420 listener.onExpired(Arrays.<CacheEntryEvent<? extends K, ? extends V>> asList(new JCSCacheEntryEvent<K, V>(this,
421 EventType.REMOVED, null, cacheKey, elt.getVal())));
422 }
423 }
424
425 @Override
426 public V getAndPut(final K key, final V value)
427 {
428 assertNotClosed();
429 assertNotNull(key, "key");
430 assertNotNull(value, "value");
431 final long getStart = Times.now(false);
432 final V v = doGetControllingExpiry(getStart, key, false, false, true, false);
433 put(key, value);
434 return v;
435 }
436
437 @Override
438 public void putAll(final Map<? extends K, ? extends V> map)
439 {
440 assertNotClosed();
441 final TempStateCacheView<K, V> view = new TempStateCacheView<K, V>(this);
442 for (final Map.Entry<? extends K, ? extends V> e : map.entrySet())
443 {
444 view.put(e.getKey(), e.getValue());
445 }
446 view.merge();
447 }
448
449 @Override
450 public boolean putIfAbsent(final K key, final V value)
451 {
452 if (!containsKey(key))
453 {
454 put(key, value);
455 return true;
456 }
457 return false;
458 }
459
460 @Override
461 public boolean remove(final K key)
462 {
463 assertNotClosed();
464 assertNotNull(key, "key");
465
466 final boolean statisticsEnabled = config.isStatisticsEnabled();
467 final long start = Times.now(!statisticsEnabled);
468
469 writer.delete(key);
470 final K cacheKey = key;
471
472 final ICacheElement<K, V> v = delegate.get(cacheKey);
473 delegate.remove(cacheKey);
474
475 final V value = v != null && v.getVal() != null ? v.getVal() : null;
476 boolean remove = v != null;
477 for (final JCSListener<K, V> listener : listeners.values())
478 {
479 listener.onRemoved(Arrays.<CacheEntryEvent<? extends K, ? extends V>> asList(new JCSCacheEntryEvent<K, V>(this,
480 EventType.REMOVED, null, key, value)));
481 }
482 if (remove && statisticsEnabled)
483 {
484 statistics.increaseRemovals(1);
485 statistics.addRemoveTime(Times.now(false) - start);
486 }
487 return remove;
488 }
489
490 @Override
491 public boolean remove(final K key, final V oldValue)
492 {
493 assertNotClosed();
494 assertNotNull(key, "key");
495 assertNotNull(oldValue, "oldValue");
496 final long getStart = Times.now(false);
497 final V v = doGetControllingExpiry(getStart, key, false, false, false, false);
498 if (oldValue.equals(v))
499 {
500 remove(key);
501 return true;
502 }
503 else if (v != null)
504 {
505
506 expiryPolicy.getExpiryForAccess();
507 }
508 return false;
509 }
510
511 @Override
512 public V getAndRemove(final K key)
513 {
514 assertNotClosed();
515 assertNotNull(key, "key");
516 final long getStart = Times.now(false);
517 final V v = doGetControllingExpiry(getStart, key, false, false, true, false);
518 remove(key);
519 return v;
520 }
521
522 private V doGetControllingExpiry(final long getStart, final K key, final boolean updateAcess, final boolean forceDoLoad, final boolean skipLoad,
523 final boolean propagateLoadException)
524 {
525 final boolean statisticsEnabled = config.isStatisticsEnabled();
526 final ICacheElement<K, V> elt = delegate.get(key);
527 V v = elt != null ? elt.getVal() : null;
528 if (v == null && (config.isReadThrough() || forceDoLoad))
529 {
530 if (!skipLoad)
531 {
532 v = doLoad(key, false, getStart, propagateLoadException);
533 }
534 }
535 else if (statisticsEnabled)
536 {
537 if (v != null)
538 {
539 statistics.increaseHits(1);
540 }
541 else
542 {
543 statistics.increaseMisses(1);
544 }
545 }
546
547 if (updateAcess && elt != null)
548 {
549 final Duration expiryForAccess = expiryPolicy.getExpiryForAccess();
550 if (!isNotZero(expiryForAccess))
551 {
552 forceExpires(key);
553 }
554 else if (expiryForAccess != null && (!elt.getElementAttributes().getIsEternal() || !expiryForAccess.isEternal()))
555 {
556 try
557 {
558 delegate.update(updateElement(key, elt.getVal(), expiryForAccess, elt.getElementAttributes()));
559 }
560 catch (final IOException e)
561 {
562 throw new CacheException(e);
563 }
564 }
565 }
566 if (statisticsEnabled && v != null)
567 {
568 statistics.addGetTime(Times.now(false) - getStart);
569 }
570 return v;
571 }
572
573 @Override
574 public boolean replace(final K key, final V oldValue, final V newValue)
575 {
576 assertNotClosed();
577 assertNotNull(key, "key");
578 assertNotNull(oldValue, "oldValue");
579 assertNotNull(newValue, "newValue");
580 final boolean statisticsEnabled = config.isStatisticsEnabled();
581 final ICacheElement<K, V> elt = delegate.get(key);
582 if (elt != null)
583 {
584 V value = elt.getVal();
585 if (value != null && statisticsEnabled)
586 {
587 statistics.increaseHits(1);
588 }
589 if (value == null && config.isReadThrough())
590 {
591 value = doLoad(key, false, Times.now(false), false);
592 }
593 if (value != null && value.equals(oldValue))
594 {
595 put(key, newValue);
596 return true;
597 }
598 else if (value != null)
599 {
600 final Duration expiryForAccess = expiryPolicy.getExpiryForAccess();
601 if (expiryForAccess != null && (!elt.getElementAttributes().getIsEternal() || !expiryForAccess.isEternal()))
602 {
603 try
604 {
605 delegate.update(updateElement(key, elt.getVal(), expiryForAccess, elt.getElementAttributes()));
606 }
607 catch (final IOException e)
608 {
609 throw new CacheException(e);
610 }
611 }
612 }
613 }
614 else if (statisticsEnabled)
615 {
616 statistics.increaseMisses(1);
617 }
618 return false;
619 }
620
621 @Override
622 public boolean replace(final K key, final V value)
623 {
624 assertNotClosed();
625 assertNotNull(key, "key");
626 assertNotNull(value, "value");
627 boolean statisticsEnabled = config.isStatisticsEnabled();
628 if (containsKey(key))
629 {
630 if (statisticsEnabled)
631 {
632 statistics.increaseHits(1);
633 }
634 put(key, value);
635 return true;
636 }
637 else if (statisticsEnabled)
638 {
639 statistics.increaseMisses(1);
640 }
641 return false;
642 }
643
644 @Override
645 public V getAndReplace(final K key, final V value)
646 {
647 assertNotClosed();
648 assertNotNull(key, "key");
649 assertNotNull(value, "value");
650
651 final boolean statisticsEnabled = config.isStatisticsEnabled();
652
653 final ICacheElement<K, V> elt = delegate.get(key);
654 if (elt != null)
655 {
656 V oldValue = elt.getVal();
657 if (oldValue == null && config.isReadThrough())
658 {
659 oldValue = doLoad(key, false, Times.now(false), false);
660 }
661 else if (statisticsEnabled)
662 {
663 statistics.increaseHits(1);
664 }
665 put(key, value);
666 return oldValue;
667 }
668 else if (statisticsEnabled)
669 {
670 statistics.increaseMisses(1);
671 }
672 return null;
673 }
674
675 @Override
676 public void removeAll(final Set<? extends K> keys)
677 {
678 assertNotClosed();
679 assertNotNull(keys, "keys");
680 for (final K k : keys)
681 {
682 remove(k);
683 }
684 }
685
686 @Override
687 public void removeAll()
688 {
689 assertNotClosed();
690 for (final K k : delegate.getKeySet())
691 {
692 remove(k);
693 }
694 }
695
696 @Override
697 public void clear()
698 {
699 assertNotClosed();
700 try
701 {
702 delegate.removeAll();
703 }
704 catch (final IOException e)
705 {
706 throw new CacheException(e);
707 }
708 }
709
710 @Override
711 public <C2 extends Configuration<K, V>> C2 getConfiguration(final Class<C2> clazz)
712 {
713 assertNotClosed();
714 return clazz.cast(config);
715 }
716
717 @Override
718 public void loadAll(final Set<? extends K> keys, final boolean replaceExistingValues, final CompletionListener completionListener)
719 {
720 assertNotClosed();
721 assertNotNull(keys, "keys");
722 for (final K k : keys)
723 {
724 assertNotNull(k, "a key");
725 }
726 pool.submit(new Runnable()
727 {
728 @Override
729 public void run()
730 {
731 doLoadAll(keys, replaceExistingValues, completionListener);
732 }
733 });
734 }
735
736 private void doLoadAll(final Set<? extends K> keys, final boolean replaceExistingValues, final CompletionListener completionListener)
737 {
738 try
739 {
740 final long now = Times.now(false);
741 for (final K k : keys)
742 {
743 if (replaceExistingValues)
744 {
745 doLoad(k, containsKey(k), now, completionListener != null);
746 continue;
747 }
748 else if (containsKey(k))
749 {
750 continue;
751 }
752 doGetControllingExpiry(now, k, true, true, false, completionListener != null);
753 }
754 }
755 catch (final RuntimeException e)
756 {
757 if (completionListener != null)
758 {
759 completionListener.onException(e);
760 return;
761 }
762 }
763 if (completionListener != null)
764 {
765 completionListener.onCompletion();
766 }
767 }
768
769 @Override
770 public <T> T invoke(final K key, final EntryProcessor<K, V, T> entryProcessor, final Object... arguments) throws EntryProcessorException
771 {
772 final TempStateCacheView<K, V> view = new TempStateCacheView<K, V>(this);
773 final T t = doInvoke(view, key, entryProcessor, arguments);
774 view.merge();
775 return t;
776 }
777
778 private <T> T doInvoke(final TempStateCacheView<K, V> view, final K key, final EntryProcessor<K, V, T> entryProcessor,
779 final Object... arguments)
780 {
781 assertNotClosed();
782 assertNotNull(entryProcessor, "entryProcessor");
783 assertNotNull(key, "key");
784 try
785 {
786 if (config.isStatisticsEnabled())
787 {
788 if (containsKey(key))
789 {
790 statistics.increaseHits(1);
791 }
792 else
793 {
794 statistics.increaseMisses(1);
795 }
796 }
797 return entryProcessor.process(new JCSMutableEntry<K, V>(view, key), arguments);
798 }
799 catch (final Exception ex)
800 {
801 return throwEntryProcessorException(ex);
802 }
803 }
804
805 private static <T> T throwEntryProcessorException(final Exception ex)
806 {
807 if (EntryProcessorException.class.isInstance(ex))
808 {
809 throw EntryProcessorException.class.cast(ex);
810 }
811 throw new EntryProcessorException(ex);
812 }
813
814 @Override
815 public <T> Map<K, EntryProcessorResult<T>> invokeAll(final Set<? extends K> keys, final EntryProcessor<K, V, T> entryProcessor,
816 final Object... arguments)
817 {
818 assertNotClosed();
819 assertNotNull(entryProcessor, "entryProcessor");
820 final Map<K, EntryProcessorResult<T>> results = new HashMap<K, EntryProcessorResult<T>>();
821 for (final K k : keys)
822 {
823 try
824 {
825 final T invoke = invoke(k, entryProcessor, arguments);
826 if (invoke != null)
827 {
828 results.put(k, new EntryProcessorResult<T>()
829 {
830 @Override
831 public T get() throws EntryProcessorException
832 {
833 return invoke;
834 }
835 });
836 }
837 }
838 catch (final Exception e)
839 {
840 results.put(k, new EntryProcessorResult<T>()
841 {
842 @Override
843 public T get() throws EntryProcessorException
844 {
845 return throwEntryProcessorException(e);
846 }
847 });
848 }
849 }
850 return results;
851 }
852
853 @Override
854 public void registerCacheEntryListener(final CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration)
855 {
856 assertNotClosed();
857 if (listeners.containsKey(cacheEntryListenerConfiguration))
858 {
859 throw new IllegalArgumentException(cacheEntryListenerConfiguration + " already registered");
860 }
861 listeners.put(cacheEntryListenerConfiguration, new JCSListener<K, V>(cacheEntryListenerConfiguration));
862 config.addListener(cacheEntryListenerConfiguration);
863 }
864
865 @Override
866 public void deregisterCacheEntryListener(final CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration)
867 {
868 assertNotClosed();
869 listeners.remove(cacheEntryListenerConfiguration);
870 config.removeListener(cacheEntryListenerConfiguration);
871 }
872
873 @Override
874 public Iterator<Entry<K, V>> iterator()
875 {
876 assertNotClosed();
877 final Iterator<K> keys = new HashSet<K>(delegate.getKeySet()).iterator();
878 return new Iterator<Entry<K, V>>()
879 {
880 private K lastKey = null;
881
882 @Override
883 public boolean hasNext()
884 {
885 return keys.hasNext();
886 }
887
888 @Override
889 public Entry<K, V> next()
890 {
891 lastKey = keys.next();
892 return new JCSEntry<K, V>(lastKey, get(lastKey));
893 }
894
895 @Override
896 public void remove()
897 {
898 if (isClosed() || lastKey == null)
899 {
900 throw new IllegalStateException(isClosed() ? "cache closed" : "call next() before remove()");
901 }
902 JCSCache.this.remove(lastKey);
903 }
904 };
905 }
906
907 @Override
908 public String getName()
909 {
910 assertNotClosed();
911 return name;
912 }
913
914 @Override
915 public CacheManager getCacheManager()
916 {
917 assertNotClosed();
918 return manager;
919 }
920
921 @Override
922 public synchronized void close()
923 {
924 if (isClosed())
925 {
926 return;
927 }
928
929 for (final Runnable task : pool.shutdownNow()) {
930 task.run();
931 }
932
933 manager.release(getName());
934 closed = true;
935 close(loader);
936 close(writer);
937 close(expiryPolicy);
938 for (final JCSListener<K, V> listener : listeners.values())
939 {
940 close(listener);
941 }
942 listeners.clear();
943 JMXs.unregister(cacheConfigObjectName);
944 JMXs.unregister(cacheStatsObjectName);
945 try
946 {
947 delegate.removeAll();
948 }
949 catch (final IOException e)
950 {
951 throw new CacheException(e);
952 }
953 }
954
955 private static void close(final Object potentiallyCloseable)
956 {
957 if (Closeable.class.isInstance(potentiallyCloseable))
958 {
959 Closeable.class.cast(potentiallyCloseable);
960 }
961 }
962
963 @Override
964 public boolean isClosed()
965 {
966 return closed;
967 }
968
969 @Override
970 public <T> T unwrap(final Class<T> clazz)
971 {
972 assertNotClosed();
973 if (clazz.isInstance(this))
974 {
975 return clazz.cast(this);
976 }
977 if (clazz.isAssignableFrom(Map.class) || clazz.isAssignableFrom(ConcurrentMap.class))
978 {
979 return clazz.cast(delegate);
980 }
981 throw new IllegalArgumentException(clazz.getName() + " not supported in unwrap");
982 }
983
984 public Statistics getStatistics()
985 {
986 return statistics;
987 }
988
989 public void enableManagement()
990 {
991 config.managementEnabled();
992 JMXs.register(cacheConfigObjectName, new JCSCacheMXBean<K, V>(this));
993 }
994
995 public void disableManagement()
996 {
997 config.managementDisabled();
998 JMXs.unregister(cacheConfigObjectName);
999 }
1000
1001 public void enableStatistics()
1002 {
1003 config.statisticsEnabled();
1004 statistics.setActive(true);
1005 JMXs.register(cacheStatsObjectName, new JCSCacheStatisticsMXBean(statistics));
1006 }
1007
1008 public void disableStatistics()
1009 {
1010 config.statisticsDisabled();
1011 statistics.setActive(false);
1012 JMXs.unregister(cacheStatsObjectName);
1013 }
1014 }