View Javadoc
1   package org.apache.commons.jcs3.auxiliary.disk;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.HashSet;
26  import java.util.Map;
27  import java.util.Set;
28  import java.util.concurrent.ConcurrentHashMap;
29  import java.util.concurrent.atomic.AtomicBoolean;
30  import java.util.concurrent.locks.ReentrantReadWriteLock;
31  
32  import org.apache.commons.jcs3.auxiliary.AbstractAuxiliaryCacheEventLogging;
33  import org.apache.commons.jcs3.auxiliary.AuxiliaryCache;
34  import org.apache.commons.jcs3.auxiliary.disk.behavior.IDiskCacheAttributes;
35  import org.apache.commons.jcs3.engine.CacheEventQueueFactory;
36  import org.apache.commons.jcs3.engine.CacheInfo;
37  import org.apache.commons.jcs3.engine.CacheStatus;
38  import org.apache.commons.jcs3.engine.behavior.ICache;
39  import org.apache.commons.jcs3.engine.behavior.ICacheElement;
40  import org.apache.commons.jcs3.engine.behavior.ICacheEventQueue;
41  import org.apache.commons.jcs3.engine.behavior.ICacheListener;
42  import org.apache.commons.jcs3.engine.stats.StatElement;
43  import org.apache.commons.jcs3.engine.stats.Stats;
44  import org.apache.commons.jcs3.engine.stats.behavior.IStatElement;
45  import org.apache.commons.jcs3.engine.stats.behavior.IStats;
46  import org.apache.commons.jcs3.log.Log;
47  import org.apache.commons.jcs3.log.LogManager;
48  import org.apache.commons.jcs3.utils.struct.LRUMap;
49  
50  /**
51   * Abstract class providing a base implementation of a disk cache, which can be easily extended to
52   * implement a disk cache for a specific persistence mechanism.
53   *
54   * When implementing the abstract methods note that while this base class handles most things, it
55   * does not acquire or release any locks. Implementations should do so as necessary. This is mainly
56   * done to minimize the time spent in critical sections.
57   *
58   * Error handling in this class needs to be addressed. Currently if an exception is thrown by the
59   * persistence mechanism, this class destroys the event queue. Should it also destroy purgatory?
60   * Should it dispose itself?
61   */
62  public abstract class AbstractDiskCache<K, V>
63      extends AbstractAuxiliaryCacheEventLogging<K, V>
64  {
65      /** The logger */
66      private static final Log log = LogManager.getLog( AbstractDiskCache.class );
67  
68      /** Generic disk cache attributes, as passed to the constructor */
69      private final IDiskCacheAttributes diskCacheAttributes;
70  
71      /**
72       * Map where elements are stored between being added to this cache and actually spooled to disk.
73       * This allows puts to the disk cache to return quickly, with the more expensive operation of
74       * serializing the elements to persistent storage queued for later.
75       *
76       * If the elements are pulled into the memory cache while they are still in purgatory, writing to
77       * disk can be canceled. Not final: initPurgatory() replaces the map with a fresh, empty one.
78       */
79      private Map<K, PurgatoryElement<K, V>> purgatory;
80  
81      /**
82       * The CacheEventQueue where changes will be queued for asynchronous updating of the persistent
83       * storage. Created in the constructor with a MyCacheListener as its listener.
84       */
85      private final ICacheEventQueue<K, V> cacheEventQueue;
86  
87      /**
88       * Indicates whether the cache is 'alive': initialized, but not yet disposed. Starts out false;
89       * child classes must set this to true (see setAlive). dispose() sets it back to false.
90       */
91      private final AtomicBoolean alive = new AtomicBoolean();
92  
93      /** Name of this cache; read from the attributes in the constructor and never changed. */
94      private final String cacheName;
95  
96      /** Count of purgatory hits; plain int bumped in get(), shown in debug logs and getStatistics() */
97      private int purgHits;
98  
99      /**
100      * Purgatory is (re)initialized under the write lock and queued puts spool under the read lock,
101      * so an update cannot land in the middle of a remove all. An individual removal locks the item.
102      */
103     private final ReentrantReadWriteLock removeAllLock = new ReentrantReadWriteLock();
104 
105     // ----------------------------------------------------------- constructors
106 
/**
 * Sets up the parts shared by all disk caches: keeps the attributes and the cache name,
 * creates the event queue (with a {@link MyCacheListener} as its listener) and initializes
 * purgatory. The alive flag is not touched here; child classes should set it to true once
 * their own initialization is done.
 *
 * @param attr the disk cache attributes; supplies the cache name and the event queue
 *        pool name and type
 */
protected AbstractDiskCache( final IDiskCacheAttributes attr )
{
    this.diskCacheAttributes = attr;
    this.cacheName = attr.getCacheName();

    // Event queue: put events are delivered to the listener defined in this class.
    final CacheEventQueueFactory<K, V> queueFactory = new CacheEventQueueFactory<>();
    final MyCacheListener queueListener = new MyCacheListener();
    this.cacheEventQueue = queueFactory.createCacheEventQueue(
            queueListener,
            CacheInfo.listenerId,
            this.cacheName,
            attr.getEventQueuePoolName(),
            attr.getEventQueueType() );

    // Purgatory: the holding area for elements not yet spooled.
    initPurgatory();
}
128 
129     /**
130      * @return true if the cache is alive
131      */
132     public boolean isAlive()
133     {
134         return alive.get();
135     }
136 
137     /**
138      * @param alive set the alive status
139      */
140     public void setAlive(final boolean alive)
141     {
142         this.alive.set(alive);
143     }
144 
145     /**
146      * Purgatory size of -1 means to use a HashMap with no size limit. Anything greater will use an
147      * LRU map of some sort.
148      *
149      * TODO Currently setting this to 0 will cause nothing to be put to disk, since it will assume
150      *       that if an item is not in purgatory, then it must have been plucked. We should make 0
151      *       work, a way to not use purgatory.
152      */
153     private void initPurgatory()
154     {
155         // we need this so we can stop the updates from happening after a
156         // remove all
157         removeAllLock.writeLock().lock();
158 
159         try
160         {
161             synchronized (this)
162             {
163                 if ( diskCacheAttributes.getMaxPurgatorySize() >= 0 )
164                 {
165                     purgatory = Collections.synchronizedMap(
166                             new LRUMap<>( diskCacheAttributes.getMaxPurgatorySize()));
167                 }
168                 else
169                 {
170                     purgatory = new ConcurrentHashMap<>();
171                 }
172             }
173         }
174         finally
175         {
176             removeAllLock.writeLock().unlock();
177         }
178     }
179 
180     // ------------------------------------------------------- interface ICache
181 
182     /**
183      * Adds the provided element to the cache. Element will be added to purgatory, and then queued
184      * for later writing to the serialized storage mechanism.
185      *
186      * An update results in a put event being created. The put event will call the handlePut method
187      * defined here. The handlePut method calls the implemented doPut on the child.
188      *
189      * @param cacheElement
190      * @throws IOException
191      * @see org.apache.commons.jcs3.engine.behavior.ICache#update
192      */
193     @Override
194     public final void update( final ICacheElement<K, V> cacheElement )
195         throws IOException
196     {
197         log.debug( "Putting element in purgatory, cacheName: {0}, key: {1}",
198                 () -> cacheName, cacheElement::getKey);
199 
200         try
201         {
202             // Wrap the CacheElement in a PurgatoryElement
203             final PurgatoryElement<K, V> pe = new PurgatoryElement<>( cacheElement );
204 
205             // Indicates the element is eligible to be spooled to disk,
206             // this will remain true unless the item is pulled back into
207             // memory.
208             pe.setSpoolable( true );
209 
210             // Add the element to purgatory
211             purgatory.put( pe.getKey(), pe );
212 
213             // Queue element for serialization
214             cacheEventQueue.addPutEvent( pe );
215         }
216         catch ( final IOException ex )
217         {
218             log.error( "Problem adding put event to queue.", ex );
219             cacheEventQueue.destroy();
220         }
221     }
222 
223     /**
224      * Check to see if the item is in purgatory. If so, return it. If not, check to see if we have
225      * it on disk.
226      *
227      * @param key
228      * @return ICacheElement&lt;K, V&gt; or null
229      * @see AuxiliaryCache#get
230      */
231     @Override
232     public final ICacheElement<K, V> get( final K key )
233     {
234         // If not alive, always return null.
235         if (!alive.get())
236         {
237             log.debug( "get was called, but the disk cache is not alive." );
238             return null;
239         }
240 
241         PurgatoryElement<K, V> pe = purgatory.get( key );
242 
243         // If the element was found in purgatory
244         if ( pe != null )
245         {
246             purgHits++;
247 
248             if ( purgHits % 100 == 0 )
249             {
250                 log.debug( "Purgatory hits = {0}", purgHits );
251             }
252 
253             // Since the element will go back to the memory cache, we could set
254             // spoolable to false, which will prevent the queue listener from
255             // serializing the element. This would not match the disk cache
256             // behavior and the behavior of other auxiliaries. Gets never remove
257             // items from auxiliaries.
258             // Beyond consistency, the items should stay in purgatory and get
259             // spooled since the mem cache may be set to 0. If an item is
260             // active, it will keep getting put into purgatory and removed. The
261             // CompositeCache now does not put an item to memory from disk if
262             // the size is 0.
263             // Do not set spoolable to false. Just let it go to disk. This
264             // will allow the memory size = 0 setting to work well.
265 
266             log.debug( "Found element in purgatory, cacheName: {0}, key: {1}",
267                     cacheName, key );
268 
269             return pe.getCacheElement();
270         }
271 
272         // If we reach this point, element was not found in purgatory, so get
273         // it from the cache.
274         try
275         {
276             return doGet( key );
277         }
278         catch (final IOException e)
279         {
280             log.error( e );
281             cacheEventQueue.destroy();
282         }
283 
284         return null;
285     }
286 
287     /**
288      * Gets items from the cache matching the given pattern. Items from memory will replace those
289      * from remote sources.
290      *
291      * This only works with string keys. It's too expensive to do a toString on every key.
292      *
293      * Auxiliaries will do their best to handle simple expressions. For instance, the JDBC disk
294      * cache will convert * to % and . to _
295      *
296      * @param pattern
297      * @return a map of K key to ICacheElement&lt;K, V&gt; element, or an empty map if there is no
298      *         data matching the pattern.
299      * @throws IOException
300      */
301     @Override
302     public Map<K, ICacheElement<K, V>> getMatching( final String pattern )
303         throws IOException
304     {
305         // this avoids locking purgatory, but it uses more memory
306         Set<K> keyArray = new HashSet<>(purgatory.keySet());
307 
308         final Set<K> matchingKeys = getKeyMatcher().getMatchingKeysFromArray(pattern, keyArray);
309 
310         // call getMultiple with the set
311         final Map<K, ICacheElement<K, V>> result = processGetMultiple( matchingKeys );
312 
313         // Get the keys from disk
314         final Map<K, ICacheElement<K, V>> diskMatches = doGetMatching( pattern );
315 
316         result.putAll( diskMatches );
317 
318         return result;
319     }
320 
321     /**
322      * The keys in the cache. Abstract here: each concrete disk cache must supply the key set.
323      *
324      * @see org.apache.commons.jcs3.auxiliary.AuxiliaryCache#getKeySet()
325      */
326     @Override
327     public abstract Set<K> getKeySet() throws IOException;
328 
329     /**
330      * Removes are not queued. A call to remove is immediate.
331      *
332      * @param key
333      * @return whether the item was present to be removed.
334      * @throws IOException
335      * @see org.apache.commons.jcs3.engine.behavior.ICache#remove
336      */
337     @Override
338     public final boolean remove( final K key )
339         throws IOException
340     {
341         // I'm getting the object, so I can lock on the element
342         // Remove element from purgatory if it is there
343         PurgatoryElement<K, V> pe = purgatory.remove( key );
344         boolean present;
345 
346         if ( pe != null )
347         {
348             synchronized ( pe.getCacheElement() )
349             {
350                 // no way to remove from queue, just make sure it doesn't get on
351                 // disk and then removed right afterwards
352                 pe.setSpoolable( false );
353 
354                 // Remove from persistent store immediately
355                 present = doRemove( key );
356             }
357         }
358         else
359         {
360             // Remove from persistent store immediately
361             present = doRemove( key );
362         }
363 
364         return present;
365     }
366 
367     /**
368      * @throws IOException
369      * @see org.apache.commons.jcs3.engine.behavior.ICache#removeAll
370      */
371     @Override
372     public final void removeAll()
373         throws IOException
374     {
375         if ( this.diskCacheAttributes.isAllowRemoveAll() )
376         {
377             // Replace purgatory with a new empty hashtable
378             initPurgatory();
379 
380             // Remove all from persistent store immediately
381             doRemoveAll();
382         }
383         else
384         {
385             log.info( "RemoveAll was requested but the request was not "
386                     + "fulfilled: allowRemoveAll is set to false." );
387         }
388     }
389 
390     /**
391      * Adds a dispose request to the disk cache.
392      *
393      * Disposal proceeds in several steps.
394      * <ol>
395      * <li>Prior to this call the Composite cache dumped the memory into the disk cache. If it is
396      * large then we need to wait for the event queue to finish.</li>
397      * <li>Wait until the event queue is empty of until the configured ShutdownSpoolTimeLimit is
398      * reached.</li>
399      * <li>Call doDispose on the concrete impl.</li>
400      * </ol>
401      * @throws IOException
402      */
403     @Override
404     public final void dispose()
405         throws IOException
406     {
407         // wait up to 60 seconds for dispose and then quit if not done.
408         long shutdownSpoolTime = this.diskCacheAttributes.getShutdownSpoolTimeLimit() * 1000L;
409 
410         while (!cacheEventQueue.isEmpty() && shutdownSpoolTime > 0)
411         {
412             try
413             {
414                 Thread.sleep(100);
415                 shutdownSpoolTime -= 100;
416             }
417             catch ( final InterruptedException e )
418             {
419                 break;
420             }
421         }
422 
423         if (shutdownSpoolTime <= 0)
424         {
425             log.info( "No longer waiting for event queue to finish: {0}",
426                     cacheEventQueue::getStatistics);
427         }
428 
429         log.info( "In dispose, destroying event queue." );
430         // This stops the processor thread.
431         cacheEventQueue.destroy();
432 
433         // Invoke any implementation specific disposal code
434         // need to handle the disposal first.
435         doDispose();
436 
437         alive.set(false);
438     }
439 
440     /**
441      * @return the region name.
442      * @see ICache#getCacheName
443      */
444     @Override
445     public String getCacheName()
446     {
447         return cacheName;
448     }
449 
450     /**
451      * Gets basic stats for the abstract disk cache.
452      *
453      * @return String
454      */
455     @Override
456     public String getStats()
457     {
458         return getStatistics().toString();
459     }
460 
461     /**
462      * Returns semi-structured data.
463      *
464      * @see org.apache.commons.jcs3.auxiliary.AuxiliaryCache#getStatistics()
465      */
466     @Override
467     public IStats getStatistics()
468     {
469         final IStats stats = new Stats();
470         stats.setTypeName( "Abstract Disk Cache" );
471 
472         final ArrayList<IStatElement<?>> elems = new ArrayList<>();
473 
474         elems.add(new StatElement<>( "Purgatory Hits", Integer.valueOf(purgHits) ) );
475         elems.add(new StatElement<>( "Purgatory Size", Integer.valueOf(purgatory.size()) ) );
476 
477         // get the stats from the event queue too
478         final IStats eqStats = this.cacheEventQueue.getStatistics();
479         elems.addAll(eqStats.getStatElements());
480 
481         stats.setStatElements( elems );
482 
483         return stats;
484     }
485 
486     /**
487      * @return the status -- alive or disposed from CacheConstants
488      * @see ICache#getStatus
489      */
490     @Override
491     public CacheStatus getStatus()
492     {
493         return alive.get() ? CacheStatus.ALIVE : CacheStatus.DISPOSED;
494     }
495 
496     /**
497      * Size cannot be determined without knowledge of the cache implementation, so subclasses will
498      * need to implement this method.
499      * NOTE(review): whether elements still waiting in purgatory are counted is up to the subclass.
500      * @return the number of items, as counted by the implementing subclass.
501      * @see ICache#getSize
502      */
503     @Override
504     public abstract int getSize();
505 
506     /**
507      * @see org.apache.commons.jcs3.engine.behavior.ICacheType#getCacheType
508      * @return Always returns DISK_CACHE since subclasses should all be of that type.
509      */
510     @Override
511     public CacheType getCacheType()
512     {
513         return CacheType.DISK_CACHE;
514     }
515 
516     /**
517      * Cache that implements the CacheListener interface, and calls appropriate methods in its
518      * parent class.
519      */
520     protected class MyCacheListener
521         implements ICacheListener<K, V>
522     {
523         /** Id of the listener */
524         private long listenerId;
525 
526         /**
527          * @return cacheElement.getElementAttributes();
528          * @throws IOException
529          * @see ICacheListener#getListenerId
530          */
531         @Override
532         public long getListenerId()
533             throws IOException
534         {
535             return this.listenerId;
536         }
537 
538         /**
539          * @param id
540          * @throws IOException
541          * @see ICacheListener#setListenerId
542          */
543         @Override
544         public void setListenerId( final long id )
545             throws IOException
546         {
547             this.listenerId = id;
548         }
549 
550         /**
551          * @param element
552          * @throws IOException
553          * @see ICacheListener#handlePut NOTE: This checks if the element is a puratory element and
554          *      behaves differently depending. However since we have control over how elements are
555          *      added to the cache event queue, that may not be needed ( they are always
556          *      PurgatoryElements ).
557          */
558         @Override
559         public void handlePut( ICacheElement<K, V> element )
560             throws IOException
561         {
562             if (alive.get())
563             {
564                 // If the element is a PurgatoryElement<K, V> we must check to see
565                 // if it is still spoolable, and remove it from purgatory.
566                 if ( element instanceof PurgatoryElement )
567                 {
568                     final PurgatoryElement<K, V> pe = (PurgatoryElement<K, V>) element;
569 
570                     synchronized ( pe.getCacheElement() )
571                     {
572                         // TODO consider a timeout.
573                         // we need this so that we can have multiple update
574                         // threads and still have removeAll requests come in that
575                         // always win
576                         removeAllLock.readLock().lock();
577 
578                         try
579                         {
580                             // If the element has already been removed from
581                             // purgatory do nothing
582                             if (!purgatory.containsKey(pe.getKey()))
583                             {
584                                 return;
585                             }
586 
587                             element = pe.getCacheElement();
588 
589                             // If the element is still eligible, spool it.
590                             if ( pe.isSpoolable() )
591                             {
592                                 doUpdate( element );
593                             }
594                         }
595                         finally
596                         {
597                             removeAllLock.readLock().unlock();
598                         }
599 
600                         // After the update has completed, it is safe to
601                         // remove the element from purgatory.
602                         purgatory.remove( element.getKey() );
603                     }
604                 }
605                 else
606                 {
607                     // call the child's implementation
608                     doUpdate( element );
609                 }
610             }
611             else
612             {
613                 /*
614                  * The cache is not alive, hence the element should be removed from purgatory. All
615                  * elements should be removed eventually. Perhaps, the alive check should have been
616                  * done before it went in the queue. This block handles the case where the disk
617                  * cache fails during normal operations.
618                  */
619                 purgatory.remove( element.getKey() );
620             }
621         }
622 
623         /**
624          * @param cacheName
625          * @param key
626          * @throws IOException
627          * @see ICacheListener#handleRemove
628          */
629         @Override
630         public void handleRemove( final String cacheName, final K key )
631             throws IOException
632         {
633             if (alive.get() && doRemove( key ) )
634             {
635                 log.debug( "Element removed, key: " + key );
636             }
637         }
638 
639         /**
640          * @param cacheName
641          * @throws IOException
642          * @see ICacheListener#handleRemoveAll
643          */
644         @Override
645         public void handleRemoveAll( final String cacheName )
646             throws IOException
647         {
648             if (alive.get())
649             {
650                 doRemoveAll();
651             }
652         }
653 
654         /**
655          * @param cacheName
656          * @throws IOException
657          * @see ICacheListener#handleDispose
658          */
659         @Override
660         public void handleDispose( final String cacheName )
661             throws IOException
662         {
663             if (alive.get())
664             {
665                 doDispose();
666             }
667         }
668     }
669 
670     /**
671      * Before the event logging layer, the subclasses implemented the do* methods. Now the do*
672      * methods call the *WithEventLogging method on the super. The *WithEventLogging methods call
673      * the abstract process* methods. The children implement the process methods.
674      *
675      * ex. doGet calls getWithEventLogging, which calls processGet
676      */
677 
678     /**
679      * Get a value from the persistent store.
680      *
681      * Before the event logging layer, the subclasses implemented the do* methods. Now the do*
682      * methods call the *EventLogging method on the super. The *WithEventLogging methods call the
683      * abstract process* methods. The children implement the process methods.
684      *
685      * @param key Key to locate value for.
686      * @return An object matching key, or null.
687      * @throws IOException
688      */
689     protected final ICacheElement<K, V> doGet( final K key )
690         throws IOException
691     {
692         return super.getWithEventLogging( key );
693     }
694 
695     /**
696      * Get a value from the persistent store.
697      *
698      * Before the event logging layer, the subclasses implemented the do* methods. Now the do*
699      * methods call the *EventLogging method on the super. The *WithEventLogging methods call the
700      * abstract process* methods. The children implement the process methods.
701      *
702      * @param pattern Used to match keys.
703      * @return A map of matches..
704      * @throws IOException
705      */
706     protected final Map<K, ICacheElement<K, V>> doGetMatching( final String pattern )
707         throws IOException
708     {
709         return super.getMatchingWithEventLogging( pattern );
710     }
711 
712     /**
713      * Add a cache element to the persistent store.
714      *
715      * Before the event logging layer, the subclasses implemented the do* methods. Now the do*
716      * methods call the *EventLogging method on the super. The *WithEventLogging methods call the
717      * abstract process* methods. The children implement the process methods.
718      *
719      * @param cacheElement
720      * @throws IOException
721      */
722     protected final void doUpdate( final ICacheElement<K, V> cacheElement )
723         throws IOException
724     {
725         super.updateWithEventLogging( cacheElement );
726     }
727 
728     /**
729      * Remove an object from the persistent store if found.
730      *
731      * Before the event logging layer, the subclasses implemented the do* methods. Now the do*
732      * methods call the *EventLogging method on the super. The *WithEventLogging methods call the
733      * abstract process* methods. The children implement the process methods.
734      *
735      * @param key Key of object to remove.
736      * @return whether or no the item was present when removed
737      * @throws IOException
738      */
739     protected final boolean doRemove( final K key )
740         throws IOException
741     {
742         return super.removeWithEventLogging( key );
743     }
744 
745     /**
746      * Remove all objects from the persistent store.
747      *
748      * Before the event logging layer, the subclasses implemented the do* methods. Now the do*
749      * methods call the *EventLogging method on the super. The *WithEventLogging methods call the
750      * abstract process* methods. The children implement the process methods.
751      *
752      * @throws IOException
753      */
754     protected final void doRemoveAll()
755         throws IOException
756     {
757         super.removeAllWithEventLogging();
758     }
759 
760     /**
761      * Dispose of the persistent store. Note that disposal of purgatory and setting alive to false
762      * does NOT need to be done by this method.
763      *
764      * Before the event logging layer, the subclasses implemented the do* methods. Now the do*
765      * methods call the *EventLogging method on the super. The *WithEventLogging methods call the
766      * abstract process* methods. The children implement the process methods.
767      *
768      * @throws IOException
769      */
770     protected final void doDispose()
771         throws IOException
772     {
773         super.disposeWithEventLogging();
774     }
775 
776     /**
777      * Gets the extra info for the event log.
778      *
779      * @return disk location
780      */
781     @Override
782     public String getEventLoggingExtraInfo()
783     {
784         return getDiskLocation();
785     }
786 
787     /**
788      * This is used by the event logging: getEventLoggingExtraInfo() returns this value as is.
789      *
790      * @return the location of the disk, either path or ip.
791      */
792     protected abstract String getDiskLocation();
793 }