View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.commons.jcs.jcache;
20  
21  import org.apache.commons.jcs.engine.CacheElement;
22  import org.apache.commons.jcs.engine.ElementAttributes;
23  import org.apache.commons.jcs.engine.behavior.ICacheElement;
24  import org.apache.commons.jcs.engine.behavior.IElementAttributes;
25  import org.apache.commons.jcs.engine.behavior.IElementSerializer;
26  import org.apache.commons.jcs.jcache.jmx.JCSCacheMXBean;
27  import org.apache.commons.jcs.jcache.jmx.JCSCacheStatisticsMXBean;
28  import org.apache.commons.jcs.jcache.jmx.JMXs;
29  import org.apache.commons.jcs.jcache.proxy.ExceptionWrapperHandler;
30  import org.apache.commons.jcs.jcache.thread.DaemonThreadFactory;
31  import org.apache.commons.jcs.utils.serialization.StandardSerializer;
32  
33  import javax.cache.Cache;
34  import javax.cache.CacheException;
35  import javax.cache.CacheManager;
36  import javax.cache.configuration.CacheEntryListenerConfiguration;
37  import javax.cache.configuration.Configuration;
38  import javax.cache.configuration.Factory;
39  import javax.cache.event.CacheEntryEvent;
40  import javax.cache.event.EventType;
41  import javax.cache.expiry.Duration;
42  import javax.cache.expiry.EternalExpiryPolicy;
43  import javax.cache.expiry.ExpiryPolicy;
44  import javax.cache.integration.CacheLoader;
45  import javax.cache.integration.CacheLoaderException;
46  import javax.cache.integration.CacheWriter;
47  import javax.cache.integration.CacheWriterException;
48  import javax.cache.integration.CompletionListener;
49  import javax.cache.processor.EntryProcessor;
50  import javax.cache.processor.EntryProcessorException;
51  import javax.cache.processor.EntryProcessorResult;
52  import javax.management.ObjectName;
53  import java.io.Closeable;
54  import java.io.IOException;
55  import java.util.Arrays;
56  import java.util.HashMap;
57  import java.util.HashSet;
58  import java.util.Iterator;
59  import java.util.Map;
60  import java.util.Properties;
61  import java.util.Set;
62  import java.util.concurrent.ConcurrentHashMap;
63  import java.util.concurrent.ConcurrentMap;
64  import java.util.concurrent.ExecutorService;
65  import java.util.concurrent.Executors;
66  
67  import static org.apache.commons.jcs.jcache.Asserts.assertNotNull;
68  import static org.apache.commons.jcs.jcache.serialization.Serializations.copy;
69  
70  // TODO: configure serializer
71  public class JCSCache<K, V> implements Cache<K, V>
72  {
73      private final ExpiryAwareCache<K, V> delegate; // JCS-side cache that get/update/remove calls are forwarded to
74      private final JCSCachingManager manager; // supplies the URI used in the JMX names and the class loader used for copies
75      private final JCSConfiguration<K, V> config; // JCache configuration (read-through, store-by-value, statistics, management)
76      private final CacheLoader<K, V> loader; // NoLoader.INSTANCE when no loader factory is configured
77      private final CacheWriter<? super K, ? super V> writer; // NoWriter.INSTANCE when no writer factory is configured
78      private final ExpiryPolicy expiryPolicy; // EternalExpiryPolicy when no expiry policy factory is configured
79      private final ObjectName cacheConfigObjectName; // JMX name under which the configuration MXBean is registered
80      private final ObjectName cacheStatsObjectName; // JMX name under which the statistics MXBean is registered
81      private final String name; // cache name, also used as the JCS cache name of created elements
82      private volatile boolean closed = false;
83      private final Map<CacheEntryListenerConfiguration<K, V>, JCSListener<K, V>> listeners = new ConcurrentHashMap<CacheEntryListenerConfiguration<K, V>, JCSListener<K, V>>(); // one JCSListener per registered listener configuration
84      private final Statistics statistics = new Statistics(); // only updated by this class when config.isStatisticsEnabled()
85      private final ExecutorService pool; // runs the asynchronous loadAll tasks
86      private final IElementSerializer serializer; // using json/xml should work as well -> don't force Serializable
87  
88  
89      public JCSCache(final ClassLoader classLoader, final JCSCachingManager mgr,
90                      final String cacheName, final JCSConfiguration<K, V> configuration,
91                      final Properties properties, final ExpiryAwareCache<K, V> cache)
92      {
93          manager = mgr;
94  
95          name = cacheName;
96  
97          delegate = cache;
98          if (delegate.getElementAttributes() == null)
99          {
100             delegate.setElementAttributes(new ElementAttributes());
101         }
102         delegate.getElementAttributes().addElementEventHandler(new EvictionListener(statistics));
103 
104         config = configuration;
105 
106         final int poolSize = Integer.parseInt(property(properties, cacheName, "pool.size", "3"));
107         final DaemonThreadFactory threadFactory = new DaemonThreadFactory("JCS-JCache-" + cacheName + "-");
108         pool = poolSize > 0 ? Executors.newFixedThreadPool(poolSize, threadFactory) : Executors.newCachedThreadPool(threadFactory);
109 
110         try
111         {
112             serializer = IElementSerializer.class.cast(classLoader.loadClass(property(properties, "serializer", cacheName, StandardSerializer.class.getName())).newInstance());
113         }
114         catch (final Exception e)
115         {
116             throw new IllegalArgumentException(e);
117         }
118 
119         final Factory<CacheLoader<K, V>> cacheLoaderFactory = configuration.getCacheLoaderFactory();
120         if (cacheLoaderFactory == null)
121         {
122             loader = NoLoader.INSTANCE;
123         }
124         else
125         {
126             loader = ExceptionWrapperHandler
127                     .newProxy(classLoader, cacheLoaderFactory.create(), CacheLoaderException.class, CacheLoader.class);
128         }
129 
130         final Factory<CacheWriter<? super K, ? super V>> cacheWriterFactory = configuration.getCacheWriterFactory();
131         if (cacheWriterFactory == null)
132         {
133             writer = NoWriter.INSTANCE;
134         }
135         else
136         {
137             writer = ExceptionWrapperHandler
138                     .newProxy(classLoader, cacheWriterFactory.create(), CacheWriterException.class, CacheWriter.class);
139         }
140 
141         final Factory<ExpiryPolicy> expiryPolicyFactory = configuration.getExpiryPolicyFactory();
142         if (expiryPolicyFactory == null)
143         {
144             expiryPolicy = new EternalExpiryPolicy();
145         }
146         else
147         {
148             expiryPolicy = expiryPolicyFactory.create();
149         }
150 
151         for (final CacheEntryListenerConfiguration<K, V> listener : config.getCacheEntryListenerConfigurations())
152         {
153             listeners.put(listener, new JCSListener<K, V>(listener));
154         }
155         delegate.init(this, listeners);
156 
157         statistics.setActive(config.isStatisticsEnabled());
158 
159         final String mgrStr = manager.getURI().toString().replaceAll(",|:|=|\n", ".");
160         final String cacheStr = name.replaceAll(",|:|=|\n", ".");
161         try
162         {
163             cacheConfigObjectName = new ObjectName("javax.cache:type=CacheConfiguration,"
164                     + "CacheManager=" + mgrStr + "," + "Cache=" + cacheStr);
165             cacheStatsObjectName = new ObjectName("javax.cache:type=CacheStatistics,"
166                     + "CacheManager=" + mgrStr + "," + "Cache=" + cacheStr);
167         }
168         catch (final Exception e)
169         {
170             throw new IllegalArgumentException(e);
171         }
172         if (config.isManagementEnabled())
173         {
174             JMXs.register(cacheConfigObjectName, new JCSCacheMXBean<K, V>(this));
175         }
176         if (config.isStatisticsEnabled())
177         {
178             JMXs.register(cacheStatsObjectName, new JCSCacheStatisticsMXBean(statistics));
179         }
180     }
181 
182     private static String property(final Properties properties, final String cacheName, final String name, final String defaultValue)
183     {
184         return properties.getProperty(cacheName + "." + name, properties.getProperty(name, defaultValue));
185     }
186 
187     private void assertNotClosed()
188     {
189         if (isClosed())
190         {
191             throw new IllegalStateException("cache closed");
192         }
193     }
194 
195     @Override
196     public V get(final K key)
197     {
198         assertNotClosed();
199         assertNotNull(key, "key");
200         final long getStart = Times.now(false);
201         return doGetControllingExpiry(getStart, key, true, false, false, true);
202     }
203 
204     private V doLoad(final K key, final boolean update, final long now, final boolean propagateLoadException)
205     {
206         V v = null;
207         try
208         {
209             v = loader.load(key);
210         }
211         catch (final CacheLoaderException e)
212         {
213             if (propagateLoadException)
214             {
215                 throw e;
216             }
217         }
218         if (v != null)
219         {
220             final Duration duration = update ? expiryPolicy.getExpiryForUpdate() : expiryPolicy.getExpiryForCreation();
221             if (isNotZero(duration))
222             {
223                 final IElementAttributes clone = delegate.getElementAttributes().clone();
224                 if (ElementAttributes.class.isInstance(clone))
225                 {
226                     ElementAttributes.class.cast(clone).setCreateTime();
227                 }
228                 final ICacheElement<K, V> element = updateElement(key, v, duration, clone);
229                 try
230                 {
231                     delegate.update(element);
232                 }
233                 catch (final IOException e)
234                 {
235                     throw new CacheException(e);
236                 }
237             }
238         }
239         return v;
240     }
241 
242     private ICacheElement<K, V> updateElement(final K key, final V v, final Duration duration, final IElementAttributes attrs)
243     {
244         final ICacheElement<K, V> element = new CacheElement<K, V>(name, key, v);
245         if (duration != null)
246         {
247             attrs.setTimeFactorForMilliseconds(1);
248             final boolean eternal = duration.isEternal();
249             attrs.setIsEternal(eternal);
250             if (!eternal)
251             {
252                 attrs.setLastAccessTimeNow();
253             }
254             // MaxLife = -1 to use IdleTime excepted if jcache.ccf asked for something else
255         }
256         element.setElementAttributes(attrs);
257         return element;
258     }
259 
260     private void touch(final K key, final ICacheElement<K, V> element)
261     {
262         if (config.isStoreByValue())
263         {
264             final K copy = copy(serializer, manager.getClassLoader(), key);
265             try
266             {
267                 delegate.update(new CacheElement<K, V>(name, copy, element.getVal(), element.getElementAttributes()));
268             }
269             catch (final IOException e)
270             {
271                 throw new CacheException(e);
272             }
273         }
274     }
275 
276     @Override
277     public Map<K, V> getAll(final Set<? extends K> keys)
278     {
279         assertNotClosed();
280         for (final K k : keys)
281         {
282             assertNotNull(k, "key");
283         }
284 
285         final long now = Times.now(false);
286         final Map<K, V> result = new HashMap<K, V>();
287         for (final K key : keys) {
288             assertNotNull(key, "key");
289 
290             final ICacheElement<K, V> elt = delegate.get(key);
291             V val = elt != null ? elt.getVal() : null;
292             if (val == null && config.isReadThrough())
293             {
294                 val = doLoad(key, false, now, false);
295                 if (val != null)
296                 {
297                     result.put(key, val);
298                 }
299             }
300             else if (elt != null)
301             {
302                 final Duration expiryForAccess = expiryPolicy.getExpiryForAccess();
303                 if (isNotZero(expiryForAccess))
304                 {
305                     touch(key, elt);
306                     result.put(key, val);
307                 }
308                 else
309                 {
310                     forceExpires(key);
311                 }
312             }
313         }
314         return result;
315     }
316 
317     @Override
318     public boolean containsKey(final K key)
319     {
320         assertNotClosed();
321         assertNotNull(key, "key");
322         return delegate.get(key) != null;
323     }
324 
325     @Override
326     public void put(final K key, final V rawValue)
327     {
328         assertNotClosed();
329         assertNotNull(key, "key");
330         assertNotNull(rawValue, "value");
331 
332         final ICacheElement<K, V> oldElt = delegate.get(key);
333         final V old = oldElt != null ? oldElt.getVal() : null;
334 
335         final boolean storeByValue = config.isStoreByValue();
336         final V value = storeByValue ? copy(serializer, manager.getClassLoader(), rawValue) : rawValue;
337 
338         final boolean created = old == null;
339         final Duration duration = created ? expiryPolicy.getExpiryForCreation() : expiryPolicy.getExpiryForUpdate();
340         if (isNotZero(duration))
341         {
342             final boolean statisticsEnabled = config.isStatisticsEnabled();
343             final long start = Times.now(false);
344 
345             final K jcsKey = storeByValue ? copy(serializer, manager.getClassLoader(), key) : key;
346             final ICacheElement<K, V> element = updateElement( // reuse it to create basic structure
347                     jcsKey, value, created ? null : duration,
348                     oldElt != null ? oldElt.getElementAttributes() : delegate.getElementAttributes().clone());
349             if (created && duration != null) { // set maxLife
350                 final IElementAttributes copy = element.getElementAttributes();
351                 copy.setTimeFactorForMilliseconds(1);
352                 final boolean eternal = duration.isEternal();
353                 copy.setIsEternal(eternal);
354                 if (ElementAttributes.class.isInstance(copy)) {
355                     ElementAttributes.class.cast(copy).setCreateTime();
356                 }
357                 if (!eternal)
358                 {
359                     copy.setIsEternal(false);
360                     if (duration == expiryPolicy.getExpiryForAccess())
361                     {
362                         element.getElementAttributes().setIdleTime(duration.getTimeUnit().toMillis(duration.getDurationAmount()));
363                     }
364                     else
365                         {
366                         element.getElementAttributes().setMaxLife(duration.getTimeUnit().toMillis(duration.getDurationAmount()));
367                     }
368                 }
369                 element.setElementAttributes(copy);
370             }
371             writer.write(new JCSEntry<K, V>(jcsKey, value));
372             try
373             {
374                 delegate.update(element);
375             }
376             catch (final IOException e)
377             {
378                 throw new CacheException(e);
379             }
380             for (final JCSListener<K, V> listener : listeners.values())
381             {
382                 if (created)
383                 {
384                     listener.onCreated(Arrays.<CacheEntryEvent<? extends K, ? extends V>> asList(new JCSCacheEntryEvent<K, V>(this,
385                             EventType.CREATED, null, key, value)));
386                 }
387                 else
388                 {
389                     listener.onUpdated(Arrays.<CacheEntryEvent<? extends K, ? extends V>> asList(new JCSCacheEntryEvent<K, V>(this,
390                             EventType.UPDATED, old, key, value)));
391                 }
392             }
393 
394             if (statisticsEnabled)
395             {
396                 statistics.increasePuts(1);
397                 statistics.addPutTime(System.currentTimeMillis() - start);
398             }
399         }
400         else
401         {
402             if (!created)
403             {
404                 forceExpires(key);
405             }
406         }
407     }
408 
409     private static boolean isNotZero(final Duration duration)
410     {
411         return duration == null || !duration.isZero();
412     }
413 
414     private void forceExpires(final K cacheKey)
415     {
416         final ICacheElement<K, V> elt = delegate.get(cacheKey);
417         delegate.remove(cacheKey);
418         for (final JCSListener<K, V> listener : listeners.values())
419         {
420             listener.onExpired(Arrays.<CacheEntryEvent<? extends K, ? extends V>> asList(new JCSCacheEntryEvent<K, V>(this,
421                     EventType.REMOVED, null, cacheKey, elt.getVal())));
422         }
423     }
424 
425     @Override
426     public V getAndPut(final K key, final V value)
427     {
428         assertNotClosed();
429         assertNotNull(key, "key");
430         assertNotNull(value, "value");
431         final long getStart = Times.now(false);
432         final V v = doGetControllingExpiry(getStart, key, false, false, true, false);
433         put(key, value);
434         return v;
435     }
436 
437     @Override
438     public void putAll(final Map<? extends K, ? extends V> map)
439     {
440         assertNotClosed();
441         final TempStateCacheView<K, V> view = new TempStateCacheView<K, V>(this);
442         for (final Map.Entry<? extends K, ? extends V> e : map.entrySet())
443         {
444             view.put(e.getKey(), e.getValue());
445         }
446         view.merge();
447     }
448 
449     @Override
450     public boolean putIfAbsent(final K key, final V value)
451     {
452         if (!containsKey(key))
453         {
454             put(key, value);
455             return true;
456         }
457         return false;
458     }
459 
460     @Override
461     public boolean remove(final K key)
462     {
463         assertNotClosed();
464         assertNotNull(key, "key");
465 
466         final boolean statisticsEnabled = config.isStatisticsEnabled();
467         final long start = Times.now(!statisticsEnabled);
468 
469         writer.delete(key);
470         final K cacheKey = key;
471 
472         final ICacheElement<K, V> v = delegate.get(cacheKey);
473         delegate.remove(cacheKey);
474 
475         final V value = v != null && v.getVal() != null ? v.getVal() : null;
476         boolean remove = v != null;
477         for (final JCSListener<K, V> listener : listeners.values())
478         {
479             listener.onRemoved(Arrays.<CacheEntryEvent<? extends K, ? extends V>> asList(new JCSCacheEntryEvent<K, V>(this,
480                     EventType.REMOVED, null, key, value)));
481         }
482         if (remove && statisticsEnabled)
483         {
484             statistics.increaseRemovals(1);
485             statistics.addRemoveTime(Times.now(false) - start);
486         }
487         return remove;
488     }
489 
490     @Override
491     public boolean remove(final K key, final V oldValue)
492     {
493         assertNotClosed();
494         assertNotNull(key, "key");
495         assertNotNull(oldValue, "oldValue");
496         final long getStart = Times.now(false);
497         final V v = doGetControllingExpiry(getStart, key, false, false, false, false);
498         if (oldValue.equals(v))
499         {
500             remove(key);
501             return true;
502         }
503         else if (v != null)
504         {
505             // weird but just for stats to be right (org.jsr107.tck.expiry.CacheExpiryTest.removeSpecifiedEntryShouldNotCallExpiryPolicyMethods())
506             expiryPolicy.getExpiryForAccess();
507         }
508         return false;
509     }
510 
511     @Override
512     public V getAndRemove(final K key)
513     {
514         assertNotClosed();
515         assertNotNull(key, "key");
516         final long getStart = Times.now(false);
517         final V v = doGetControllingExpiry(getStart, key, false, false, true, false);
518         remove(key);
519         return v;
520     }
521 
522     private V doGetControllingExpiry(final long getStart, final K key, final boolean updateAcess, final boolean forceDoLoad, final boolean skipLoad,
523             final boolean propagateLoadException)
524     {
525         final boolean statisticsEnabled = config.isStatisticsEnabled();
526         final ICacheElement<K, V> elt = delegate.get(key);
527         V v = elt != null ? elt.getVal() : null;
528         if (v == null && (config.isReadThrough() || forceDoLoad))
529         {
530             if (!skipLoad)
531             {
532                 v = doLoad(key, false, getStart, propagateLoadException);
533             }
534         }
535         else if (statisticsEnabled)
536         {
537             if (v != null)
538             {
539                 statistics.increaseHits(1);
540             }
541             else
542             {
543                 statistics.increaseMisses(1);
544             }
545         }
546 
547         if (updateAcess && elt != null)
548         {
549             final Duration expiryForAccess = expiryPolicy.getExpiryForAccess();
550             if (!isNotZero(expiryForAccess))
551             {
552                 forceExpires(key);
553             }
554             else if (expiryForAccess != null && (!elt.getElementAttributes().getIsEternal() || !expiryForAccess.isEternal()))
555             {
556                 try
557                 {
558                     delegate.update(updateElement(key, elt.getVal(), expiryForAccess, elt.getElementAttributes()));
559                 }
560                 catch (final IOException e)
561                 {
562                     throw new CacheException(e);
563                 }
564             }
565         }
566         if (statisticsEnabled && v != null)
567         {
568             statistics.addGetTime(Times.now(false) - getStart);
569         }
570         return v;
571     }
572 
573     @Override
574     public boolean replace(final K key, final V oldValue, final V newValue)
575     {
576         assertNotClosed();
577         assertNotNull(key, "key");
578         assertNotNull(oldValue, "oldValue");
579         assertNotNull(newValue, "newValue");
580         final boolean statisticsEnabled = config.isStatisticsEnabled();
581         final ICacheElement<K, V> elt = delegate.get(key);
582         if (elt != null)
583         {
584             V value = elt.getVal();
585             if (value != null && statisticsEnabled)
586             {
587                 statistics.increaseHits(1);
588             }
589             if (value == null && config.isReadThrough())
590             {
591                 value = doLoad(key, false, Times.now(false), false);
592             }
593             if (value != null && value.equals(oldValue))
594             {
595                 put(key, newValue);
596                 return true;
597             }
598             else if (value != null)
599             {
600                 final Duration expiryForAccess = expiryPolicy.getExpiryForAccess();
601                 if (expiryForAccess != null && (!elt.getElementAttributes().getIsEternal() || !expiryForAccess.isEternal()))
602                 {
603                     try
604                     {
605                         delegate.update(updateElement(key, elt.getVal(), expiryForAccess, elt.getElementAttributes()));
606                     }
607                     catch (final IOException e)
608                     {
609                         throw new CacheException(e);
610                     }
611                 }
612             }
613         }
614         else if (statisticsEnabled)
615         {
616             statistics.increaseMisses(1);
617         }
618         return false;
619     }
620 
621     @Override
622     public boolean replace(final K key, final V value)
623     {
624         assertNotClosed();
625         assertNotNull(key, "key");
626         assertNotNull(value, "value");
627         boolean statisticsEnabled = config.isStatisticsEnabled();
628         if (containsKey(key))
629         {
630             if (statisticsEnabled)
631             {
632                 statistics.increaseHits(1);
633             }
634             put(key, value);
635             return true;
636         }
637         else if (statisticsEnabled)
638         {
639             statistics.increaseMisses(1);
640         }
641         return false;
642     }
643 
644     @Override
645     public V getAndReplace(final K key, final V value)
646     {
647         assertNotClosed();
648         assertNotNull(key, "key");
649         assertNotNull(value, "value");
650 
651         final boolean statisticsEnabled = config.isStatisticsEnabled();
652 
653         final ICacheElement<K, V> elt = delegate.get(key);
654         if (elt != null)
655         {
656             V oldValue = elt.getVal();
657             if (oldValue == null && config.isReadThrough())
658             {
659                 oldValue = doLoad(key, false, Times.now(false), false);
660             }
661             else if (statisticsEnabled)
662             {
663                 statistics.increaseHits(1);
664             }
665             put(key, value);
666             return oldValue;
667         }
668         else if (statisticsEnabled)
669         {
670             statistics.increaseMisses(1);
671         }
672         return null;
673     }
674 
675     @Override
676     public void removeAll(final Set<? extends K> keys)
677     {
678         assertNotClosed();
679         assertNotNull(keys, "keys");
680         for (final K k : keys)
681         {
682             remove(k);
683         }
684     }
685 
686     @Override
687     public void removeAll()
688     {
689         assertNotClosed();
690         for (final K k : delegate.getKeySet())
691         {
692             remove(k);
693         }
694     }
695 
696     @Override
697     public void clear()
698     {
699         assertNotClosed();
700         try
701         {
702             delegate.removeAll();
703         }
704         catch (final IOException e)
705         {
706             throw new CacheException(e);
707         }
708     }
709 
710     @Override
711     public <C2 extends Configuration<K, V>> C2 getConfiguration(final Class<C2> clazz)
712     {
713         assertNotClosed();
714         return clazz.cast(config);
715     }
716 
717     @Override
718     public void loadAll(final Set<? extends K> keys, final boolean replaceExistingValues, final CompletionListener completionListener)
719     {
720         assertNotClosed();
721         assertNotNull(keys, "keys");
722         for (final K k : keys)
723         {
724             assertNotNull(k, "a key");
725         }
726         pool.submit(new Runnable()
727         {
728             @Override
729             public void run()
730             {
731                 doLoadAll(keys, replaceExistingValues, completionListener);
732             }
733         });
734     }
735 
736     private void doLoadAll(final Set<? extends K> keys, final boolean replaceExistingValues, final CompletionListener completionListener)
737     {
738         try
739         {
740             final long now = Times.now(false);
741             for (final K k : keys)
742             {
743                 if (replaceExistingValues)
744                 {
745                     doLoad(k, containsKey(k), now, completionListener != null);
746                     continue;
747                 }
748                 else if (containsKey(k))
749                 {
750                     continue;
751                 }
752                 doGetControllingExpiry(now, k, true, true, false, completionListener != null);
753             }
754         }
755         catch (final RuntimeException e)
756         {
757             if (completionListener != null)
758             {
759                 completionListener.onException(e);
760                 return;
761             }
762         }
763         if (completionListener != null)
764         {
765             completionListener.onCompletion();
766         }
767     }
768 
769     @Override
770     public <T> T invoke(final K key, final EntryProcessor<K, V, T> entryProcessor, final Object... arguments) throws EntryProcessorException
771     {
772         final TempStateCacheView<K, V> view = new TempStateCacheView<K, V>(this);
773         final T t = doInvoke(view, key, entryProcessor, arguments);
774         view.merge();
775         return t;
776     }
777 
778     private <T> T doInvoke(final TempStateCacheView<K, V> view, final K key, final EntryProcessor<K, V, T> entryProcessor,
779             final Object... arguments)
780     {
781         assertNotClosed();
782         assertNotNull(entryProcessor, "entryProcessor");
783         assertNotNull(key, "key");
784         try
785         {
786             if (config.isStatisticsEnabled())
787             {
788                 if (containsKey(key))
789                 {
790                     statistics.increaseHits(1);
791                 }
792                 else
793                 {
794                     statistics.increaseMisses(1);
795                 }
796             }
797             return entryProcessor.process(new JCSMutableEntry<K, V>(view, key), arguments);
798         }
799         catch (final Exception ex)
800         {
801             return throwEntryProcessorException(ex);
802         }
803     }
804 
805     private static <T> T throwEntryProcessorException(final Exception ex)
806     {
807         if (EntryProcessorException.class.isInstance(ex))
808         {
809             throw EntryProcessorException.class.cast(ex);
810         }
811         throw new EntryProcessorException(ex);
812     }
813 
814     @Override
815     public <T> Map<K, EntryProcessorResult<T>> invokeAll(final Set<? extends K> keys, final EntryProcessor<K, V, T> entryProcessor,
816             final Object... arguments)
817     {
818         assertNotClosed();
819         assertNotNull(entryProcessor, "entryProcessor");
820         final Map<K, EntryProcessorResult<T>> results = new HashMap<K, EntryProcessorResult<T>>();
821         for (final K k : keys)
822         {
823             try
824             {
825                 final T invoke = invoke(k, entryProcessor, arguments);
826                 if (invoke != null)
827                 {
828                     results.put(k, new EntryProcessorResult<T>()
829                     {
830                         @Override
831                         public T get() throws EntryProcessorException
832                         {
833                             return invoke;
834                         }
835                     });
836                 }
837             }
838             catch (final Exception e)
839             {
840                 results.put(k, new EntryProcessorResult<T>()
841                 {
842                     @Override
843                     public T get() throws EntryProcessorException
844                     {
845                         return throwEntryProcessorException(e);
846                     }
847                 });
848             }
849         }
850         return results;
851     }
852 
853     @Override
854     public void registerCacheEntryListener(final CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration)
855     {
856         assertNotClosed();
857         if (listeners.containsKey(cacheEntryListenerConfiguration))
858         {
859             throw new IllegalArgumentException(cacheEntryListenerConfiguration + " already registered");
860         }
861         listeners.put(cacheEntryListenerConfiguration, new JCSListener<K, V>(cacheEntryListenerConfiguration));
862         config.addListener(cacheEntryListenerConfiguration);
863     }
864 
865     @Override
866     public void deregisterCacheEntryListener(final CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration)
867     {
868         assertNotClosed();
869         listeners.remove(cacheEntryListenerConfiguration);
870         config.removeListener(cacheEntryListenerConfiguration);
871     }
872 
873     @Override
874     public Iterator<Entry<K, V>> iterator()
875     {
876         assertNotClosed();
877         final Iterator<K> keys = new HashSet<K>(delegate.getKeySet()).iterator();
878         return new Iterator<Entry<K, V>>()
879         {
880             private K lastKey = null;
881 
882             @Override
883             public boolean hasNext()
884             {
885                 return keys.hasNext();
886             }
887 
888             @Override
889             public Entry<K, V> next()
890             {
891                 lastKey = keys.next();
892                 return new JCSEntry<K, V>(lastKey, get(lastKey));
893             }
894 
895             @Override
896             public void remove()
897             {
898                 if (isClosed() || lastKey == null)
899                 {
900                     throw new IllegalStateException(isClosed() ? "cache closed" : "call next() before remove()");
901                 }
902                 JCSCache.this.remove(lastKey);
903             }
904         };
905     }
906 
907     @Override
908     public String getName()
909     {
910         assertNotClosed();
911         return name;
912     }
913 
914     @Override
915     public CacheManager getCacheManager()
916     {
917         assertNotClosed();
918         return manager;
919     }
920 
921     @Override
922     public synchronized void close()
923     {
924         if (isClosed())
925         {
926             return;
927         }
928 
929         for (final Runnable task : pool.shutdownNow()) {
930             task.run();
931         }
932 
933         manager.release(getName());
934         closed = true;
935         close(loader);
936         close(writer);
937         close(expiryPolicy);
938         for (final JCSListener<K, V> listener : listeners.values())
939         {
940             close(listener);
941         }
942         listeners.clear();
943         JMXs.unregister(cacheConfigObjectName);
944         JMXs.unregister(cacheStatsObjectName);
945         try
946         {
947             delegate.removeAll();
948         }
949         catch (final IOException e)
950         {
951             throw new CacheException(e);
952         }
953     }
954 
955     private static void close(final Object potentiallyCloseable)
956     {
957         if (Closeable.class.isInstance(potentiallyCloseable))
958         {
959             Closeable.class.cast(potentiallyCloseable);
960         }
961     }
962 
963     @Override
964     public boolean isClosed()
965     {
966         return closed;
967     }
968 
969     @Override
970     public <T> T unwrap(final Class<T> clazz)
971     {
972         assertNotClosed();
973         if (clazz.isInstance(this))
974         {
975             return clazz.cast(this);
976         }
977         if (clazz.isAssignableFrom(Map.class) || clazz.isAssignableFrom(ConcurrentMap.class))
978         {
979             return clazz.cast(delegate);
980         }
981         throw new IllegalArgumentException(clazz.getName() + " not supported in unwrap");
982     }
983 
984     public Statistics getStatistics()
985     {
986         return statistics;
987     }
988 
989     public void enableManagement()
990     {
991         config.managementEnabled();
992         JMXs.register(cacheConfigObjectName, new JCSCacheMXBean<K, V>(this));
993     }
994 
995     public void disableManagement()
996     {
997         config.managementDisabled();
998         JMXs.unregister(cacheConfigObjectName);
999     }
1000 
1001     public void enableStatistics()
1002     {
1003         config.statisticsEnabled();
1004         statistics.setActive(true);
1005         JMXs.register(cacheStatsObjectName, new JCSCacheStatisticsMXBean(statistics));
1006     }
1007 
1008     public void disableStatistics()
1009     {
1010         config.statisticsDisabled();
1011         statistics.setActive(false);
1012         JMXs.unregister(cacheStatsObjectName);
1013     }
1014 }