001package org.apache.commons.jcs.auxiliary.remote;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *   http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import java.io.IOException;
023import java.rmi.UnmarshalException;
024import java.util.ArrayList;
025import java.util.Collections;
026import java.util.HashMap;
027import java.util.Map;
028import java.util.Set;
029
030import org.apache.commons.jcs.auxiliary.AbstractAuxiliaryCache;
031import org.apache.commons.jcs.auxiliary.AuxiliaryCacheAttributes;
032import org.apache.commons.jcs.auxiliary.remote.behavior.IRemoteCacheClient;
033import org.apache.commons.jcs.engine.CacheAdaptor;
034import org.apache.commons.jcs.engine.CacheEventQueueFactory;
035import org.apache.commons.jcs.engine.CacheStatus;
036import org.apache.commons.jcs.engine.behavior.ICacheElement;
037import org.apache.commons.jcs.engine.behavior.ICacheEventQueue;
038import org.apache.commons.jcs.engine.behavior.ICacheServiceNonLocal;
039import org.apache.commons.jcs.engine.stats.StatElement;
040import org.apache.commons.jcs.engine.stats.Stats;
041import org.apache.commons.jcs.engine.stats.behavior.IStatElement;
042import org.apache.commons.jcs.engine.stats.behavior.IStats;
043import org.apache.commons.logging.Log;
044import org.apache.commons.logging.LogFactory;
045
046/**
047 * The RemoteCacheNoWait wraps the RemoteCacheClient. The client holds a handle on the
048 * RemoteCacheService.
049 * <p>
050 * Used to queue up update requests to the underlying cache. These requests will be processed in
051 * their order of arrival via the cache event queue processor.
052 * <p>
053 * Typically errors will be handled down stream. We only need to kill the queue if an error makes it
054 * to this level from the queue. That can only happen if the queue is damaged, since the events are
055 * Processed asynchronously.
056 * <p>
057 * There is no reason to create a queue on startup if the remote is not healthy.
058 * <p>
059 * If the remote cache encounters an error it will zombie--create a balking facade for the service.
060 * The Zombie will queue up items until the connection is restored. An alternative way to accomplish
061 * the same thing would be to stop, not destroy the queue at this level. That way items would be
062 * added to the queue and then when the connection is restored, we could start the worker threads
063 * again. This is a better long term solution, but it requires some significant changes to the
064 * complicated worker queues.
065 */
066public class RemoteCacheNoWait<K, V>
067    extends AbstractAuxiliaryCache<K, V>
068{
069    /** log instance */
070    private static final Log log = LogFactory.getLog( RemoteCacheNoWait.class );
071
072    /** The remote cache client */
073    private final IRemoteCacheClient<K, V> remoteCacheClient;
074
075    /** Event queue for queuing up calls like put and remove. */
076    private ICacheEventQueue<K, V> cacheEventQueue;
077
078    /** how many times get has been called. */
079    private int getCount = 0;
080
081    /** how many times getMatching has been called. */
082    private int getMatchingCount = 0;
083
084    /** how many times getMultiple has been called. */
085    private int getMultipleCount = 0;
086
087    /** how many times remove has been called. */
088    private int removeCount = 0;
089
090    /** how many times put has been called. */
091    private int putCount = 0;
092
093    /**
094     * Constructs with the given remote cache, and fires up an event queue for asynchronous
095     * processing.
096     * <p>
097     * @param cache
098     */
099    public RemoteCacheNoWait( IRemoteCacheClient<K, V> cache )
100    {
101        remoteCacheClient = cache;
102        this.cacheEventQueue = createCacheEventQueue(cache);
103
104        if ( remoteCacheClient.getStatus() == CacheStatus.ERROR )
105        {
106            cacheEventQueue.destroy();
107        }
108    }
109
110    /**
111     * Create a cache event queue from the parameters of the remote client
112     * @param client the remote client
113     */
114    private ICacheEventQueue<K, V> createCacheEventQueue( IRemoteCacheClient<K, V> client )
115    {
116        CacheEventQueueFactory<K, V> factory = new CacheEventQueueFactory<K, V>();
117        ICacheEventQueue<K, V> ceq = factory.createCacheEventQueue(
118            new CacheAdaptor<K, V>( client ),
119            client.getListenerId(),
120            client.getCacheName(),
121            client.getAuxiliaryCacheAttributes().getEventQueuePoolName(),
122            client.getAuxiliaryCacheAttributes().getEventQueueType() );
123        return ceq;
124    }
125
126    /**
127     * Adds a put event to the queue.
128     * <p>
129     * @param element
130     * @throws IOException
131     */
132    @Override
133    public void update( ICacheElement<K, V> element )
134        throws IOException
135    {
136        putCount++;
137        try
138        {
139            cacheEventQueue.addPutEvent( element );
140        }
141        catch ( IOException e )
142        {
143            log.error( "Problem adding putEvent to queue.", e );
144            cacheEventQueue.destroy();
145            throw e;
146        }
147    }
148
149    /**
150     * Synchronously reads from the remote cache.
151     * <p>
152     * @param key
153     * @return element from the remote cache, or null if not present
154     * @throws IOException
155     */
156    @Override
157    public ICacheElement<K, V> get( K key )
158        throws IOException
159    {
160        getCount++;
161        try
162        {
163            return remoteCacheClient.get( key );
164        }
165        catch ( UnmarshalException ue )
166        {
167            if ( log.isDebugEnabled() )
168            {
169                log.debug( "Retrying the get owing to UnmarshalException." );
170            }
171
172            try
173            {
174                return remoteCacheClient.get( key );
175            }
176            catch ( IOException ex )
177            {
178                if ( log.isInfoEnabled() )
179                {
180                    log.info( "Failed in retrying the get for the second time. " + ex.getMessage() );
181                }
182            }
183        }
184        catch ( IOException ex )
185        {
186            // We don't want to destroy the queue on a get failure.
187            // The RemoteCache will Zombie and queue.
188            // Since get does not use the queue, I don't want to kill the queue.
189            throw ex;
190        }
191
192        return null;
193    }
194
195    /**
196     * @param pattern
197     * @return Map
198     * @throws IOException
199     *
200     */
201    @Override
202    public Map<K, ICacheElement<K, V>> getMatching( String pattern )
203        throws IOException
204    {
205        getMatchingCount++;
206        try
207        {
208            return remoteCacheClient.getMatching( pattern );
209        }
210        catch ( UnmarshalException ue )
211        {
212            if ( log.isDebugEnabled() )
213            {
214                log.debug( "Retrying the getMatching owing to UnmarshalException." );
215            }
216
217            try
218            {
219                return remoteCacheClient.getMatching( pattern );
220            }
221            catch ( IOException ex )
222            {
223                if ( log.isInfoEnabled() )
224                {
225                    log.info( "Failed in retrying the getMatching for the second time. " + ex.getMessage() );
226                }
227            }
228        }
229        catch ( IOException ex )
230        {
231            // We don't want to destroy the queue on a get failure.
232            // The RemoteCache will Zombie and queue.
233            // Since get does not use the queue, I don't want to kill the queue.
234            throw ex;
235        }
236
237        return Collections.emptyMap();
238    }
239
240    /**
241     * Gets multiple items from the cache based on the given set of keys. Sends the getMultiple
242     * request on to the server rather than looping through the requested keys.
243     * <p>
244     * @param keys
245     * @return a map of K key to ICacheElement&lt;K, V&gt; element, or an empty map if there is no
246     *         data in cache for any of these keys
247     * @throws IOException
248     */
249    @Override
250    public Map<K, ICacheElement<K, V>> getMultiple( Set<K> keys )
251        throws IOException
252    {
253        getMultipleCount++;
254        try
255        {
256            return remoteCacheClient.getMultiple( keys );
257        }
258        catch ( UnmarshalException ue )
259        {
260            if ( log.isDebugEnabled() )
261            {
262                log.debug( "Retrying the getMultiple owing to UnmarshalException..." );
263            }
264
265            try
266            {
267                return remoteCacheClient.getMultiple( keys );
268            }
269            catch ( IOException ex )
270            {
271                if ( log.isInfoEnabled() )
272                {
273                    log.info( "Failed in retrying the getMultiple for the second time. " + ex.getMessage() );
274                }
275            }
276        }
277        catch ( IOException ex )
278        {
279            // We don't want to destroy the queue on a get failure.
280            // The RemoteCache will Zombie and queue.
281            // Since get does not use the queue, I don't want to kill the queue.
282            throw ex;
283        }
284
285        return new HashMap<K, ICacheElement<K, V>>();
286    }
287
288    /**
289     * Return the keys in this cache.
290     * <p>
291     * @see org.apache.commons.jcs.auxiliary.AuxiliaryCache#getKeySet()
292     */
293    @Override
294    public Set<K> getKeySet() throws IOException
295    {
296        return remoteCacheClient.getKeySet();
297    }
298
299    /**
300     * Adds a remove request to the remote cache.
301     * <p>
302     * @param key
303     * @return if this was successful
304     * @throws IOException
305     */
306    @Override
307    public boolean remove( K key )
308        throws IOException
309    {
310        removeCount++;
311        try
312        {
313            cacheEventQueue.addRemoveEvent( key );
314        }
315        catch ( IOException e )
316        {
317            log.error( "Problem adding RemoveEvent to queue.", e );
318            cacheEventQueue.destroy();
319            throw e;
320        }
321        return false;
322    }
323
324    /**
325     * Adds a removeAll request to the remote cache.
326     * <p>
327     * @throws IOException
328     */
329    @Override
330    public void removeAll()
331        throws IOException
332    {
333        try
334        {
335            cacheEventQueue.addRemoveAllEvent();
336        }
337        catch ( IOException e )
338        {
339            log.error( "Problem adding RemoveAllEvent to queue.", e );
340            cacheEventQueue.destroy();
341            throw e;
342        }
343    }
344
345    /** Adds a dispose request to the remote cache. */
346    @Override
347    public void dispose()
348    {
349        try
350        {
351            cacheEventQueue.addDisposeEvent();
352        }
353        catch ( IOException e )
354        {
355            log.error( "Problem adding DisposeEvent to queue.", e );
356            cacheEventQueue.destroy();
357        }
358    }
359
360    /**
361     * No remote invocation.
362     * <p>
363     * @return The size value
364     */
365    @Override
366    public int getSize()
367    {
368        return remoteCacheClient.getSize();
369    }
370
371    /**
372     * No remote invocation.
373     * <p>
374     * @return The cacheType value
375     */
376    @Override
377    public CacheType getCacheType()
378    {
379        return CacheType.REMOTE_CACHE;
380    }
381
382    /**
383     * Returns the asyn cache status. An error status indicates either the remote connection is not
384     * available, or the asyn queue has been unexpectedly destroyed. No remote invocation.
385     * <p>
386     * @return The status value
387     */
388    @Override
389    public CacheStatus getStatus()
390    {
391        return cacheEventQueue.isWorking() ? remoteCacheClient.getStatus() : CacheStatus.ERROR;
392    }
393
394    /**
395     * Gets the cacheName attribute of the RemoteCacheNoWait object
396     * <p>
397     * @return The cacheName value
398     */
399    @Override
400    public String getCacheName()
401    {
402        return remoteCacheClient.getCacheName();
403    }
404
405    /**
406     * Replaces the remote cache service handle with the given handle and reset the event queue by
407     * starting up a new instance.
408     * <p>
409     * @param remote
410     */
411    public void fixCache( ICacheServiceNonLocal<?, ?> remote )
412    {
413        remoteCacheClient.fixCache( remote );
414        resetEventQ();
415    }
416
417    /**
418     * Resets the event q by first destroying the existing one and starting up new one.
419     * <p>
420     * There may be no good reason to kill the existing queue. We will sometimes need to set a new
421     * listener id, so we should create a new queue. We should let the old queue drain. If we were
422     * Connected to the failover, it would be best to finish sending items.
423     */
424    public void resetEventQ()
425    {
426        ICacheEventQueue<K, V> previousQueue = cacheEventQueue;
427
428        this.cacheEventQueue = createCacheEventQueue(this.remoteCacheClient);
429
430        if ( previousQueue.isWorking() )
431        {
432            // we don't expect anything, it would have all gone to the zombie
433            if ( log.isInfoEnabled() )
434            {
435                log.info( "resetEventQ, previous queue has [" + previousQueue.size() + "] items queued up." );
436            }
437            previousQueue.destroy();
438        }
439    }
440
441    /**
442     * This is temporary. It allows the manager to get the lister.
443     * <p>
444     * @return the instance of the remote cache client used by this object
445     */
446    protected IRemoteCacheClient<K, V> getRemoteCache()
447    {
448        return remoteCacheClient;
449    }
450
451    /**
452     * @return Returns the AuxiliaryCacheAttributes.
453     */
454    @Override
455    public AuxiliaryCacheAttributes getAuxiliaryCacheAttributes()
456    {
457        return remoteCacheClient.getAuxiliaryCacheAttributes();
458    }
459
460    /**
461     * This is for testing only. It allows you to take a look at the event queue.
462     * <p>
463     * @return ICacheEventQueue
464     */
465    protected ICacheEventQueue<K, V> getCacheEventQueue()
466    {
467        return this.cacheEventQueue;
468    }
469
470    /**
471     * Returns the stats and the cache.toString().
472     * <p>
473     * @see java.lang.Object#toString()
474     */
475    @Override
476    public String toString()
477    {
478        return getStats() + "\n" + remoteCacheClient.toString();
479    }
480
481    /**
482     * Returns the statistics in String form.
483     * <p>
484     * @return String
485     */
486    @Override
487    public String getStats()
488    {
489        return getStatistics().toString();
490    }
491
492    /**
493     * @return statistics about this communication
494     */
495    @Override
496    public IStats getStatistics()
497    {
498        IStats stats = new Stats();
499        stats.setTypeName( "Remote Cache No Wait" );
500
501        ArrayList<IStatElement<?>> elems = new ArrayList<IStatElement<?>>();
502
503        elems.add(new StatElement<CacheStatus>( "Status", getStatus() ) );
504
505        // get the stats from the cache queue too
506        IStats cStats = this.remoteCacheClient.getStatistics();
507        if ( cStats != null )
508        {
509            elems.addAll(cStats.getStatElements());
510        }
511
512        // get the stats from the event queue too
513        IStats eqStats = this.cacheEventQueue.getStatistics();
514        elems.addAll(eqStats.getStatElements());
515
516        elems.add(new StatElement<Integer>( "Get Count", Integer.valueOf(this.getCount) ) );
517        elems.add(new StatElement<Integer>( "GetMatching Count", Integer.valueOf(this.getMatchingCount) ) );
518        elems.add(new StatElement<Integer>( "GetMultiple Count", Integer.valueOf(this.getMultipleCount) ) );
519        elems.add(new StatElement<Integer>( "Remove Count", Integer.valueOf(this.removeCount) ) );
520        elems.add(new StatElement<Integer>( "Put Count", Integer.valueOf(this.putCount) ) );
521
522        stats.setStatElements( elems );
523
524        return stats;
525    }
526
527    /**
528     * this won't be called since we don't do ICache logging here.
529     * <p>
530     * @return String
531     */
532    @Override
533    public String getEventLoggingExtraInfo()
534    {
535        return "Remote Cache No Wait";
536    }
537}