001package org.apache.commons.jcs3.auxiliary.disk;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *   http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Collections;
025import java.util.HashSet;
026import java.util.Map;
027import java.util.Set;
028import java.util.concurrent.ConcurrentHashMap;
029import java.util.concurrent.atomic.AtomicBoolean;
030import java.util.concurrent.locks.ReentrantReadWriteLock;
031
032import org.apache.commons.jcs3.auxiliary.AbstractAuxiliaryCacheEventLogging;
033import org.apache.commons.jcs3.auxiliary.AuxiliaryCache;
034import org.apache.commons.jcs3.auxiliary.disk.behavior.IDiskCacheAttributes;
035import org.apache.commons.jcs3.engine.CacheEventQueueFactory;
036import org.apache.commons.jcs3.engine.CacheInfo;
037import org.apache.commons.jcs3.engine.CacheStatus;
038import org.apache.commons.jcs3.engine.behavior.ICache;
039import org.apache.commons.jcs3.engine.behavior.ICacheElement;
040import org.apache.commons.jcs3.engine.behavior.ICacheEventQueue;
041import org.apache.commons.jcs3.engine.behavior.ICacheListener;
042import org.apache.commons.jcs3.engine.stats.StatElement;
043import org.apache.commons.jcs3.engine.stats.Stats;
044import org.apache.commons.jcs3.engine.stats.behavior.IStatElement;
045import org.apache.commons.jcs3.engine.stats.behavior.IStats;
046import org.apache.commons.jcs3.log.Log;
047import org.apache.commons.jcs3.log.LogManager;
048import org.apache.commons.jcs3.utils.struct.LRUMap;
049
050/**
051 * Abstract class providing a base implementation of a disk cache, which can be easily extended to
052 * implement a disk cache for a specific persistence mechanism.
053 *
054 * When implementing the abstract methods note that while this base class handles most things, it
055 * does not acquire or release any locks. Implementations should do so as necessary. This is mainly
056 * done to minimize the time spent in critical sections.
057 *
058 * Error handling in this class needs to be addressed. Currently if an exception is thrown by the
059 * persistence mechanism, this class destroys the event queue. Should it also destroy purgatory?
060 * Should it dispose itself?
061 */
062public abstract class AbstractDiskCache<K, V>
063    extends AbstractAuxiliaryCacheEventLogging<K, V>
064{
065    /** The logger */
066    private static final Log log = LogManager.getLog( AbstractDiskCache.class );
067
068    /** Generic disk cache attributes */
069    private final IDiskCacheAttributes diskCacheAttributes;
070
071    /**
072     * Map where elements are stored between being added to this cache and actually spooled to disk.
073     * This allows puts to the disk cache to return quickly, and the more expensive operation of
074     * serializing the elements to persistent storage queued for later.
075     *
076     * If the elements are pulled into the memory cache while the are still in purgatory, writing to
077     * disk can be canceled.
078     */
079    private Map<K, PurgatoryElement<K, V>> purgatory;
080
081    /**
082     * The CacheEventQueue where changes will be queued for asynchronous updating of the persistent
083     * storage.
084     */
085    private final ICacheEventQueue<K, V> cacheEventQueue;
086
087    /**
088     * Indicates whether the cache is 'alive': initialized, but not yet disposed. Child classes must
089     * set this to true.
090     */
091    private final AtomicBoolean alive = new AtomicBoolean();
092
093    /** Every cache will have a name, subclasses must set this when they are initialized. */
094    private final String cacheName;
095
096    /** DEBUG: Keeps a count of the number of purgatory hits for debug messages */
097    private int purgHits;
098
099    /**
100     * We lock here, so that we cannot get an update after a remove all. an individual removal locks
101     * the item.
102     */
103    private final ReentrantReadWriteLock removeAllLock = new ReentrantReadWriteLock();
104
105    // ----------------------------------------------------------- constructors
106
107    /**
108     * Construct the abstract disk cache, create event queues and purgatory. Child classes should
109     * set the alive flag to true after they are initialized.
110     *
111     * @param attr
112     */
113    protected AbstractDiskCache( final IDiskCacheAttributes attr )
114    {
115        this.diskCacheAttributes = attr;
116        this.cacheName = attr.getCacheName();
117
118        // create queue
119        final CacheEventQueueFactory<K, V> fact = new CacheEventQueueFactory<>();
120        this.cacheEventQueue = fact.createCacheEventQueue(
121                new MyCacheListener(), CacheInfo.listenerId, cacheName,
122                   diskCacheAttributes.getEventQueuePoolName(),
123                   diskCacheAttributes.getEventQueueType() );
124
125        // create purgatory
126        initPurgatory();
127    }
128
129    /**
130     * @return true if the cache is alive
131     */
132    public boolean isAlive()
133    {
134        return alive.get();
135    }
136
137    /**
138     * @param alive set the alive status
139     */
140    public void setAlive(final boolean alive)
141    {
142        this.alive.set(alive);
143    }
144
145    /**
146     * Purgatory size of -1 means to use a HashMap with no size limit. Anything greater will use an
147     * LRU map of some sort.
148     *
149     * TODO Currently setting this to 0 will cause nothing to be put to disk, since it will assume
150     *       that if an item is not in purgatory, then it must have been plucked. We should make 0
151     *       work, a way to not use purgatory.
152     */
153    private void initPurgatory()
154    {
155        // we need this so we can stop the updates from happening after a
156        // remove all
157        removeAllLock.writeLock().lock();
158
159        try
160        {
161            synchronized (this)
162            {
163                if ( diskCacheAttributes.getMaxPurgatorySize() >= 0 )
164                {
165                    purgatory = Collections.synchronizedMap(
166                            new LRUMap<>( diskCacheAttributes.getMaxPurgatorySize()));
167                }
168                else
169                {
170                    purgatory = new ConcurrentHashMap<>();
171                }
172            }
173        }
174        finally
175        {
176            removeAllLock.writeLock().unlock();
177        }
178    }
179
180    // ------------------------------------------------------- interface ICache
181
182    /**
183     * Adds the provided element to the cache. Element will be added to purgatory, and then queued
184     * for later writing to the serialized storage mechanism.
185     *
186     * An update results in a put event being created. The put event will call the handlePut method
187     * defined here. The handlePut method calls the implemented doPut on the child.
188     *
189     * @param cacheElement
190     * @throws IOException
191     * @see org.apache.commons.jcs3.engine.behavior.ICache#update
192     */
193    @Override
194    public final void update( final ICacheElement<K, V> cacheElement )
195        throws IOException
196    {
197        log.debug( "Putting element in purgatory, cacheName: {0}, key: {1}",
198                () -> cacheName, cacheElement::getKey);
199
200        try
201        {
202            // Wrap the CacheElement in a PurgatoryElement
203            final PurgatoryElement<K, V> pe = new PurgatoryElement<>( cacheElement );
204
205            // Indicates the element is eligible to be spooled to disk,
206            // this will remain true unless the item is pulled back into
207            // memory.
208            pe.setSpoolable( true );
209
210            // Add the element to purgatory
211            purgatory.put( pe.getKey(), pe );
212
213            // Queue element for serialization
214            cacheEventQueue.addPutEvent( pe );
215        }
216        catch ( final IOException ex )
217        {
218            log.error( "Problem adding put event to queue.", ex );
219            cacheEventQueue.destroy();
220        }
221    }
222
223    /**
224     * Check to see if the item is in purgatory. If so, return it. If not, check to see if we have
225     * it on disk.
226     *
227     * @param key
228     * @return ICacheElement&lt;K, V&gt; or null
229     * @see AuxiliaryCache#get
230     */
231    @Override
232    public final ICacheElement<K, V> get( final K key )
233    {
234        // If not alive, always return null.
235        if (!alive.get())
236        {
237            log.debug( "get was called, but the disk cache is not alive." );
238            return null;
239        }
240
241        PurgatoryElement<K, V> pe = purgatory.get( key );
242
243        // If the element was found in purgatory
244        if ( pe != null )
245        {
246            purgHits++;
247
248            if ( purgHits % 100 == 0 )
249            {
250                log.debug( "Purgatory hits = {0}", purgHits );
251            }
252
253            // Since the element will go back to the memory cache, we could set
254            // spoolable to false, which will prevent the queue listener from
255            // serializing the element. This would not match the disk cache
256            // behavior and the behavior of other auxiliaries. Gets never remove
257            // items from auxiliaries.
258            // Beyond consistency, the items should stay in purgatory and get
259            // spooled since the mem cache may be set to 0. If an item is
260            // active, it will keep getting put into purgatory and removed. The
261            // CompositeCache now does not put an item to memory from disk if
262            // the size is 0.
263            // Do not set spoolable to false. Just let it go to disk. This
264            // will allow the memory size = 0 setting to work well.
265
266            log.debug( "Found element in purgatory, cacheName: {0}, key: {1}",
267                    cacheName, key );
268
269            return pe.getCacheElement();
270        }
271
272        // If we reach this point, element was not found in purgatory, so get
273        // it from the cache.
274        try
275        {
276            return doGet( key );
277        }
278        catch (final IOException e)
279        {
280            log.error( e );
281            cacheEventQueue.destroy();
282        }
283
284        return null;
285    }
286
287    /**
288     * Gets items from the cache matching the given pattern. Items from memory will replace those
289     * from remote sources.
290     *
291     * This only works with string keys. It's too expensive to do a toString on every key.
292     *
293     * Auxiliaries will do their best to handle simple expressions. For instance, the JDBC disk
294     * cache will convert * to % and . to _
295     *
296     * @param pattern
297     * @return a map of K key to ICacheElement&lt;K, V&gt; element, or an empty map if there is no
298     *         data matching the pattern.
299     * @throws IOException
300     */
301    @Override
302    public Map<K, ICacheElement<K, V>> getMatching( final String pattern )
303        throws IOException
304    {
305        // this avoids locking purgatory, but it uses more memory
306        Set<K> keyArray = new HashSet<>(purgatory.keySet());
307
308        final Set<K> matchingKeys = getKeyMatcher().getMatchingKeysFromArray(pattern, keyArray);
309
310        // call getMultiple with the set
311        final Map<K, ICacheElement<K, V>> result = processGetMultiple( matchingKeys );
312
313        // Get the keys from disk
314        final Map<K, ICacheElement<K, V>> diskMatches = doGetMatching( pattern );
315
316        result.putAll( diskMatches );
317
318        return result;
319    }
320
321    /**
322     * The keys in the cache.
323     *
324     * @see org.apache.commons.jcs3.auxiliary.AuxiliaryCache#getKeySet()
325     */
326    @Override
327    public abstract Set<K> getKeySet() throws IOException;
328
329    /**
330     * Removes are not queued. A call to remove is immediate.
331     *
332     * @param key
333     * @return whether the item was present to be removed.
334     * @throws IOException
335     * @see org.apache.commons.jcs3.engine.behavior.ICache#remove
336     */
337    @Override
338    public final boolean remove( final K key )
339        throws IOException
340    {
341        // I'm getting the object, so I can lock on the element
342        // Remove element from purgatory if it is there
343        PurgatoryElement<K, V> pe = purgatory.remove( key );
344        boolean present;
345
346        if ( pe != null )
347        {
348            synchronized ( pe.getCacheElement() )
349            {
350                // no way to remove from queue, just make sure it doesn't get on
351                // disk and then removed right afterwards
352                pe.setSpoolable( false );
353
354                // Remove from persistent store immediately
355                present = doRemove( key );
356            }
357        }
358        else
359        {
360            // Remove from persistent store immediately
361            present = doRemove( key );
362        }
363
364        return present;
365    }
366
367    /**
368     * @throws IOException
369     * @see org.apache.commons.jcs3.engine.behavior.ICache#removeAll
370     */
371    @Override
372    public final void removeAll()
373        throws IOException
374    {
375        if ( this.diskCacheAttributes.isAllowRemoveAll() )
376        {
377            // Replace purgatory with a new empty hashtable
378            initPurgatory();
379
380            // Remove all from persistent store immediately
381            doRemoveAll();
382        }
383        else
384        {
385            log.info( "RemoveAll was requested but the request was not "
386                    + "fulfilled: allowRemoveAll is set to false." );
387        }
388    }
389
390    /**
391     * Adds a dispose request to the disk cache.
392     *
393     * Disposal proceeds in several steps.
394     * <ol>
395     * <li>Prior to this call the Composite cache dumped the memory into the disk cache. If it is
396     * large then we need to wait for the event queue to finish.</li>
397     * <li>Wait until the event queue is empty of until the configured ShutdownSpoolTimeLimit is
398     * reached.</li>
399     * <li>Call doDispose on the concrete impl.</li>
400     * </ol>
401     * @throws IOException
402     */
403    @Override
404    public final void dispose()
405        throws IOException
406    {
407        // wait up to 60 seconds for dispose and then quit if not done.
408        long shutdownSpoolTime = this.diskCacheAttributes.getShutdownSpoolTimeLimit() * 1000L;
409
410        while (!cacheEventQueue.isEmpty() && shutdownSpoolTime > 0)
411        {
412            try
413            {
414                Thread.sleep(100);
415                shutdownSpoolTime -= 100;
416            }
417            catch ( final InterruptedException e )
418            {
419                break;
420            }
421        }
422
423        if (shutdownSpoolTime <= 0)
424        {
425            log.info( "No longer waiting for event queue to finish: {0}",
426                    cacheEventQueue::getStatistics);
427        }
428
429        log.info( "In dispose, destroying event queue." );
430        // This stops the processor thread.
431        cacheEventQueue.destroy();
432
433        // Invoke any implementation specific disposal code
434        // need to handle the disposal first.
435        doDispose();
436
437        alive.set(false);
438    }
439
440    /**
441     * @return the region name.
442     * @see ICache#getCacheName
443     */
444    @Override
445    public String getCacheName()
446    {
447        return cacheName;
448    }
449
450    /**
451     * Gets basic stats for the abstract disk cache.
452     *
453     * @return String
454     */
455    @Override
456    public String getStats()
457    {
458        return getStatistics().toString();
459    }
460
461    /**
462     * Returns semi-structured data.
463     *
464     * @see org.apache.commons.jcs3.auxiliary.AuxiliaryCache#getStatistics()
465     */
466    @Override
467    public IStats getStatistics()
468    {
469        final IStats stats = new Stats();
470        stats.setTypeName( "Abstract Disk Cache" );
471
472        final ArrayList<IStatElement<?>> elems = new ArrayList<>();
473
474        elems.add(new StatElement<>( "Purgatory Hits", Integer.valueOf(purgHits) ) );
475        elems.add(new StatElement<>( "Purgatory Size", Integer.valueOf(purgatory.size()) ) );
476
477        // get the stats from the event queue too
478        final IStats eqStats = this.cacheEventQueue.getStatistics();
479        elems.addAll(eqStats.getStatElements());
480
481        stats.setStatElements( elems );
482
483        return stats;
484    }
485
486    /**
487     * @return the status -- alive or disposed from CacheConstants
488     * @see ICache#getStatus
489     */
490    @Override
491    public CacheStatus getStatus()
492    {
493        return alive.get() ? CacheStatus.ALIVE : CacheStatus.DISPOSED;
494    }
495
496    /**
497     * Size cannot be determined without knowledge of the cache implementation, so subclasses will
498     * need to implement this method.
499     *
500     * @return the number of items.
501     * @see ICache#getSize
502     */
503    @Override
504    public abstract int getSize();
505
506    /**
507     * @see org.apache.commons.jcs3.engine.behavior.ICacheType#getCacheType
508     * @return Always returns DISK_CACHE since subclasses should all be of that type.
509     */
510    @Override
511    public CacheType getCacheType()
512    {
513        return CacheType.DISK_CACHE;
514    }
515
516    /**
517     * Cache that implements the CacheListener interface, and calls appropriate methods in its
518     * parent class.
519     */
520    protected class MyCacheListener
521        implements ICacheListener<K, V>
522    {
523        /** Id of the listener */
524        private long listenerId;
525
526        /**
527         * @return cacheElement.getElementAttributes();
528         * @throws IOException
529         * @see ICacheListener#getListenerId
530         */
531        @Override
532        public long getListenerId()
533            throws IOException
534        {
535            return this.listenerId;
536        }
537
538        /**
539         * @param id
540         * @throws IOException
541         * @see ICacheListener#setListenerId
542         */
543        @Override
544        public void setListenerId( final long id )
545            throws IOException
546        {
547            this.listenerId = id;
548        }
549
550        /**
551         * @param element
552         * @throws IOException
553         * @see ICacheListener#handlePut NOTE: This checks if the element is a puratory element and
554         *      behaves differently depending. However since we have control over how elements are
555         *      added to the cache event queue, that may not be needed ( they are always
556         *      PurgatoryElements ).
557         */
558        @Override
559        public void handlePut( ICacheElement<K, V> element )
560            throws IOException
561        {
562            if (alive.get())
563            {
564                // If the element is a PurgatoryElement<K, V> we must check to see
565                // if it is still spoolable, and remove it from purgatory.
566                if ( element instanceof PurgatoryElement )
567                {
568                    final PurgatoryElement<K, V> pe = (PurgatoryElement<K, V>) element;
569
570                    synchronized ( pe.getCacheElement() )
571                    {
572                        // TODO consider a timeout.
573                        // we need this so that we can have multiple update
574                        // threads and still have removeAll requests come in that
575                        // always win
576                        removeAllLock.readLock().lock();
577
578                        try
579                        {
580                            // If the element has already been removed from
581                            // purgatory do nothing
582                            if (!purgatory.containsKey(pe.getKey()))
583                            {
584                                return;
585                            }
586
587                            element = pe.getCacheElement();
588
589                            // If the element is still eligible, spool it.
590                            if ( pe.isSpoolable() )
591                            {
592                                doUpdate( element );
593                            }
594                        }
595                        finally
596                        {
597                            removeAllLock.readLock().unlock();
598                        }
599
600                        // After the update has completed, it is safe to
601                        // remove the element from purgatory.
602                        purgatory.remove( element.getKey() );
603                    }
604                }
605                else
606                {
607                    // call the child's implementation
608                    doUpdate( element );
609                }
610            }
611            else
612            {
613                /*
614                 * The cache is not alive, hence the element should be removed from purgatory. All
615                 * elements should be removed eventually. Perhaps, the alive check should have been
616                 * done before it went in the queue. This block handles the case where the disk
617                 * cache fails during normal operations.
618                 */
619                purgatory.remove( element.getKey() );
620            }
621        }
622
623        /**
624         * @param cacheName
625         * @param key
626         * @throws IOException
627         * @see ICacheListener#handleRemove
628         */
629        @Override
630        public void handleRemove( final String cacheName, final K key )
631            throws IOException
632        {
633            if (alive.get() && doRemove( key ) )
634            {
635                log.debug( "Element removed, key: " + key );
636            }
637        }
638
639        /**
640         * @param cacheName
641         * @throws IOException
642         * @see ICacheListener#handleRemoveAll
643         */
644        @Override
645        public void handleRemoveAll( final String cacheName )
646            throws IOException
647        {
648            if (alive.get())
649            {
650                doRemoveAll();
651            }
652        }
653
654        /**
655         * @param cacheName
656         * @throws IOException
657         * @see ICacheListener#handleDispose
658         */
659        @Override
660        public void handleDispose( final String cacheName )
661            throws IOException
662        {
663            if (alive.get())
664            {
665                doDispose();
666            }
667        }
668    }
669
670    /**
671     * Before the event logging layer, the subclasses implemented the do* methods. Now the do*
672     * methods call the *WithEventLogging method on the super. The *WithEventLogging methods call
673     * the abstract process* methods. The children implement the process methods.
674     *
675     * ex. doGet calls getWithEventLogging, which calls processGet
676     */
677
678    /**
679     * Get a value from the persistent store.
680     *
681     * Before the event logging layer, the subclasses implemented the do* methods. Now the do*
682     * methods call the *EventLogging method on the super. The *WithEventLogging methods call the
683     * abstract process* methods. The children implement the process methods.
684     *
685     * @param key Key to locate value for.
686     * @return An object matching key, or null.
687     * @throws IOException
688     */
689    protected final ICacheElement<K, V> doGet( final K key )
690        throws IOException
691    {
692        return super.getWithEventLogging( key );
693    }
694
695    /**
696     * Get a value from the persistent store.
697     *
698     * Before the event logging layer, the subclasses implemented the do* methods. Now the do*
699     * methods call the *EventLogging method on the super. The *WithEventLogging methods call the
700     * abstract process* methods. The children implement the process methods.
701     *
702     * @param pattern Used to match keys.
703     * @return A map of matches..
704     * @throws IOException
705     */
706    protected final Map<K, ICacheElement<K, V>> doGetMatching( final String pattern )
707        throws IOException
708    {
709        return super.getMatchingWithEventLogging( pattern );
710    }
711
712    /**
713     * Add a cache element to the persistent store.
714     *
715     * Before the event logging layer, the subclasses implemented the do* methods. Now the do*
716     * methods call the *EventLogging method on the super. The *WithEventLogging methods call the
717     * abstract process* methods. The children implement the process methods.
718     *
719     * @param cacheElement
720     * @throws IOException
721     */
722    protected final void doUpdate( final ICacheElement<K, V> cacheElement )
723        throws IOException
724    {
725        super.updateWithEventLogging( cacheElement );
726    }
727
728    /**
729     * Remove an object from the persistent store if found.
730     *
731     * Before the event logging layer, the subclasses implemented the do* methods. Now the do*
732     * methods call the *EventLogging method on the super. The *WithEventLogging methods call the
733     * abstract process* methods. The children implement the process methods.
734     *
735     * @param key Key of object to remove.
736     * @return whether or no the item was present when removed
737     * @throws IOException
738     */
739    protected final boolean doRemove( final K key )
740        throws IOException
741    {
742        return super.removeWithEventLogging( key );
743    }
744
745    /**
746     * Remove all objects from the persistent store.
747     *
748     * Before the event logging layer, the subclasses implemented the do* methods. Now the do*
749     * methods call the *EventLogging method on the super. The *WithEventLogging methods call the
750     * abstract process* methods. The children implement the process methods.
751     *
752     * @throws IOException
753     */
754    protected final void doRemoveAll()
755        throws IOException
756    {
757        super.removeAllWithEventLogging();
758    }
759
760    /**
761     * Dispose of the persistent store. Note that disposal of purgatory and setting alive to false
762     * does NOT need to be done by this method.
763     *
764     * Before the event logging layer, the subclasses implemented the do* methods. Now the do*
765     * methods call the *EventLogging method on the super. The *WithEventLogging methods call the
766     * abstract process* methods. The children implement the process methods.
767     *
768     * @throws IOException
769     */
770    protected final void doDispose()
771        throws IOException
772    {
773        super.disposeWithEventLogging();
774    }
775
776    /**
777     * Gets the extra info for the event log.
778     *
779     * @return disk location
780     */
781    @Override
782    public String getEventLoggingExtraInfo()
783    {
784        return getDiskLocation();
785    }
786
787    /**
788     * This is used by the event logging.
789     *
790     * @return the location of the disk, either path or ip.
791     */
792    protected abstract String getDiskLocation();
793}