001package org.apache.commons.jcs3.auxiliary.lateral;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *   http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import java.io.IOException;
023import java.rmi.UnmarshalException;
024import java.util.ArrayList;
025import java.util.Collections;
026import java.util.HashMap;
027import java.util.Map;
028import java.util.Map.Entry;
029import java.util.Set;
030import java.util.stream.Collectors;
031
032import org.apache.commons.jcs3.auxiliary.AbstractAuxiliaryCache;
033import org.apache.commons.jcs3.auxiliary.lateral.behavior.ILateralCacheAttributes;
034import org.apache.commons.jcs3.engine.CacheAdaptor;
035import org.apache.commons.jcs3.engine.CacheEventQueueFactory;
036import org.apache.commons.jcs3.engine.CacheInfo;
037import org.apache.commons.jcs3.engine.CacheStatus;
038import org.apache.commons.jcs3.engine.behavior.ICacheElement;
039import org.apache.commons.jcs3.engine.behavior.ICacheEventQueue;
040import org.apache.commons.jcs3.engine.behavior.ICacheServiceNonLocal;
041import org.apache.commons.jcs3.engine.stats.StatElement;
042import org.apache.commons.jcs3.engine.stats.Stats;
043import org.apache.commons.jcs3.engine.stats.behavior.IStatElement;
044import org.apache.commons.jcs3.engine.stats.behavior.IStats;
045import org.apache.commons.jcs3.log.Log;
046import org.apache.commons.jcs3.log.LogManager;
047
048/**
049 * Used to queue up update requests to the underlying cache. These requests will be processed in
050 * their order of arrival via the cache event queue processor.
051 */
052public class LateralCacheNoWait<K, V>
053    extends AbstractAuxiliaryCache<K, V>
054{
055    /** The logger. */
056    private static final Log log = LogManager.getLog( LateralCacheNoWait.class );
057
058    /** The cache */
059    private final LateralCache<K, V> cache;
060
061    /** Identify this object */
062    private String identityKey;
063
064    /** The event queue */
065    private ICacheEventQueue<K, V> eventQueue;
066
067    /** times get called */
068    private int getCount;
069
070    /** times remove called */
071    private int removeCount;
072
073    /** times put called */
074    private int putCount;
075
076    /**
077     * Constructs with the given lateral cache, and fires up an event queue for asynchronous
078     * processing.
079     * <p>
080     * @param cache
081     */
082    public LateralCacheNoWait( final LateralCache<K, V> cache )
083    {
084        this.cache = cache;
085        this.identityKey = cache.getCacheName();
086        this.setCacheEventLogger(cache.getCacheEventLogger());
087        this.setElementSerializer(cache.getElementSerializer());
088
089        log.debug( "Constructing LateralCacheNoWait, LateralCache = [{0}]", cache );
090
091        final CacheEventQueueFactory<K, V> fact = new CacheEventQueueFactory<>();
092        this.eventQueue = fact.createCacheEventQueue( new CacheAdaptor<>( cache ),
093                CacheInfo.listenerId, cache.getCacheName(),
094                getAuxiliaryCacheAttributes().getEventQueuePoolName(),
095                getAuxiliaryCacheAttributes().getEventQueueType() );
096
097        // need each no wait to handle each of its real updates and removes,
098        // since there may
099        // be more than one per cache? alternative is to have the cache
100        // perform updates using a different method that specifies the listener
101        // this.q = new CacheEventQueue(new CacheAdaptor(this),
102        // LateralCacheInfo.listenerId, cache.getCacheName());
103        if ( cache.getStatus() == CacheStatus.ERROR )
104        {
105            eventQueue.destroy();
106        }
107    }
108
109    /**
110     * The identifying key to this no wait
111     *
112     * @return the identity key
113     * @since 3.1
114     */
115    public String getIdentityKey()
116    {
117        return identityKey;
118    }
119
120    /**
121     * Set the identifying key to this no wait
122     *
123     * @param identityKey the identityKey to set
124     * @since 3.1
125     */
126    public void setIdentityKey(String identityKey)
127    {
128        this.identityKey = identityKey;
129    }
130
131    /**
132     * @param ce
133     * @throws IOException
134     */
135    @Override
136    public void update( final ICacheElement<K, V> ce )
137        throws IOException
138    {
139        putCount++;
140        try
141        {
142            eventQueue.addPutEvent( ce );
143        }
144        catch ( final IOException ex )
145        {
146            log.error( ex );
147            eventQueue.destroy();
148        }
149    }
150
151    /**
152     * Synchronously reads from the lateral cache.
153     * <p>
154     * @param key
155     * @return ICacheElement&lt;K, V&gt; if found, else null
156     */
157    @Override
158    public ICacheElement<K, V> get( final K key )
159    {
160        getCount++;
161        if ( this.getStatus() != CacheStatus.ERROR )
162        {
163            try
164            {
165                return cache.get( key );
166            }
167            catch ( final UnmarshalException ue )
168            {
169                log.debug( "Retrying the get owing to UnmarshalException..." );
170                try
171                {
172                    return cache.get( key );
173                }
174                catch ( final IOException ex )
175                {
176                    log.error( "Failed in retrying the get for the second time." );
177                    eventQueue.destroy();
178                }
179            }
180            catch ( final IOException ex )
181            {
182                eventQueue.destroy();
183            }
184        }
185        return null;
186    }
187
188    /**
189     * Gets multiple items from the cache based on the given set of keys.
190     * <p>
191     * @param keys
192     * @return a map of K key to ICacheElement&lt;K, V&gt; element, or an empty map if there is no
193     *         data in cache for any of these keys
194     */
195    @Override
196    public Map<K, ICacheElement<K, V>> getMultiple(final Set<K> keys)
197    {
198        if ( keys != null && !keys.isEmpty() )
199        {
200            return keys.stream()
201                .collect(Collectors.toMap(
202                        key -> key,
203                        this::get)).entrySet().stream()
204                    .filter(entry -> entry.getValue() != null)
205                    .collect(Collectors.toMap(
206                            Entry::getKey,
207                            Entry::getValue));
208        }
209
210        return new HashMap<>();
211    }
212
213    /**
214     * Synchronously reads from the lateral cache.
215     * <p>
216     * @param pattern
217     * @return ICacheElement&lt;K, V&gt; if found, else empty
218     */
219    @Override
220    public Map<K, ICacheElement<K, V>> getMatching(final String pattern)
221    {
222        getCount++;
223        if ( this.getStatus() != CacheStatus.ERROR )
224        {
225            try
226            {
227                return cache.getMatching( pattern );
228            }
229            catch ( final UnmarshalException ue )
230            {
231                log.debug( "Retrying the get owing to UnmarshalException." );
232                try
233                {
234                    return cache.getMatching( pattern );
235                }
236                catch ( final IOException ex )
237                {
238                    log.error( "Failed in retrying the get for the second time." );
239                    eventQueue.destroy();
240                }
241            }
242            catch ( final IOException ex )
243            {
244                eventQueue.destroy();
245            }
246        }
247        return Collections.emptyMap();
248    }
249
250    /**
251     * Return the keys in this cache.
252     * <p>
253     * @see org.apache.commons.jcs3.auxiliary.AuxiliaryCache#getKeySet()
254     */
255    @Override
256    public Set<K> getKeySet() throws IOException
257    {
258        try
259        {
260            return cache.getKeySet();
261        }
262        catch ( final IOException ex )
263        {
264            log.error( ex );
265            eventQueue.destroy();
266        }
267        return Collections.emptySet();
268    }
269
270    /**
271     * Adds a remove request to the lateral cache.
272     * <p>
273     * @param key
274     * @return always false
275     */
276    @Override
277    public boolean remove( final K key )
278    {
279        removeCount++;
280        try
281        {
282            eventQueue.addRemoveEvent( key );
283        }
284        catch ( final IOException ex )
285        {
286            log.error( ex );
287            eventQueue.destroy();
288        }
289        return false;
290    }
291
292    /** Adds a removeAll request to the lateral cache. */
293    @Override
294    public void removeAll()
295    {
296        try
297        {
298            eventQueue.addRemoveAllEvent();
299        }
300        catch ( final IOException ex )
301        {
302            log.error( ex );
303            eventQueue.destroy();
304        }
305    }
306
307    /** Adds a dispose request to the lateral cache. */
308    @Override
309    public void dispose()
310    {
311        try
312        {
313            eventQueue.addDisposeEvent();
314        }
315        catch ( final IOException ex )
316        {
317            log.error( ex );
318            eventQueue.destroy();
319        }
320    }
321
322    /**
323     * No lateral invocation.
324     * <p>
325     * @return The size value
326     */
327    @Override
328    public int getSize()
329    {
330        return cache.getSize();
331    }
332
333    /**
334     * No lateral invocation.
335     * <p>
336     * @return The cacheType value
337     */
338    @Override
339    public CacheType getCacheType()
340    {
341        return cache.getCacheType();
342    }
343
344    /**
345     * Returns the async cache status. An error status indicates either the lateral connection is not
346     * available, or the asyn queue has been unexpectedly destroyed. No lateral invocation.
347     * <p>
348     * @return The status value
349     */
350    @Override
351    public CacheStatus getStatus()
352    {
353        return eventQueue.isWorking() ? cache.getStatus() : CacheStatus.ERROR;
354    }
355
356    /**
357     * Gets the cacheName attribute of the LateralCacheNoWait object
358     * <p>
359     * @return The cacheName value
360     */
361    @Override
362    public String getCacheName()
363    {
364        return cache.getCacheName();
365    }
366
367    /**
368     * Replaces the lateral cache service handle with the given handle and reset the queue by
369     * starting up a new instance.
370     * <p>
371     * @param lateral
372     */
373    public void fixCache( final ICacheServiceNonLocal<K, V> lateral )
374    {
375        cache.fixCache( lateral );
376        resetEventQ();
377    }
378
379    /**
380     * Resets the event q by first destroying the existing one and starting up new one.
381     */
382    public void resetEventQ()
383    {
384        if ( eventQueue.isWorking() )
385        {
386            eventQueue.destroy();
387        }
388        final CacheEventQueueFactory<K, V> fact = new CacheEventQueueFactory<>();
389        this.eventQueue = fact.createCacheEventQueue( new CacheAdaptor<>( cache ),
390                CacheInfo.listenerId, cache.getCacheName(),
391                getAuxiliaryCacheAttributes().getEventQueuePoolName(),
392                getAuxiliaryCacheAttributes().getEventQueueType() );
393    }
394
395    /**
396     * @return Returns the AuxiliaryCacheAttributes.
397     */
398    @Override
399    public ILateralCacheAttributes getAuxiliaryCacheAttributes()
400    {
401        return cache.getAuxiliaryCacheAttributes();
402    }
403
404    /**
405     * getStats
406     * @return String
407     */
408    @Override
409    public String getStats()
410    {
411        return getStatistics().toString();
412    }
413
414    /**
415     * this won't be called since we don't do ICache logging here.
416     * <p>
417     * @return String
418     */
419    @Override
420    public String getEventLoggingExtraInfo()
421    {
422        return "Lateral Cache No Wait";
423    }
424
425    /**
426     * @return statistics about this communication
427     */
428    @Override
429    public IStats getStatistics()
430    {
431        final IStats stats = new Stats();
432        stats.setTypeName( "Lateral Cache No Wait" );
433
434        // get the stats from the event queue too
435        final IStats eqStats = this.eventQueue.getStatistics();
436        final ArrayList<IStatElement<?>> elems = new ArrayList<>(eqStats.getStatElements());
437
438        elems.add(new StatElement<>( "Get Count", Integer.valueOf(this.getCount) ) );
439        elems.add(new StatElement<>( "Remove Count", Integer.valueOf(this.removeCount) ) );
440        elems.add(new StatElement<>( "Put Count", Integer.valueOf(this.putCount) ) );
441        elems.add(new StatElement<>( "Attributes", cache.getAuxiliaryCacheAttributes() ) );
442
443        stats.setStatElements( elems );
444
445        return stats;
446    }
447
448    /**
449     * @return debugging info.
450     */
451    @Override
452    public String toString()
453    {
454        final StringBuilder buf = new StringBuilder();
455        buf.append( " LateralCacheNoWait " );
456        buf.append( " Status = " + this.getStatus() );
457        buf.append( " cache = [" + cache.toString() + "]" );
458        return buf.toString();
459    }
460}