001package org.apache.commons.jcs3.auxiliary.remote;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *   http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.HashMap;
025import java.util.Map;
026import java.util.Set;
027import java.util.concurrent.Callable;
028import java.util.concurrent.ExecutionException;
029import java.util.concurrent.ExecutorService;
030import java.util.concurrent.Future;
031import java.util.concurrent.TimeUnit;
032import java.util.concurrent.TimeoutException;
033
034import org.apache.commons.jcs3.auxiliary.AbstractAuxiliaryCacheEventLogging;
035import org.apache.commons.jcs3.auxiliary.AuxiliaryCacheAttributes;
036import org.apache.commons.jcs3.auxiliary.remote.behavior.IRemoteCacheAttributes;
037import org.apache.commons.jcs3.auxiliary.remote.behavior.IRemoteCacheClient;
038import org.apache.commons.jcs3.auxiliary.remote.behavior.IRemoteCacheListener;
039import org.apache.commons.jcs3.auxiliary.remote.server.behavior.RemoteType;
040import org.apache.commons.jcs3.engine.CacheStatus;
041import org.apache.commons.jcs3.engine.ZombieCacheServiceNonLocal;
042import org.apache.commons.jcs3.engine.behavior.ICacheElement;
043import org.apache.commons.jcs3.engine.behavior.ICacheElementSerialized;
044import org.apache.commons.jcs3.engine.behavior.ICacheServiceNonLocal;
045import org.apache.commons.jcs3.engine.behavior.IZombie;
046import org.apache.commons.jcs3.engine.logging.behavior.ICacheEventLogger;
047import org.apache.commons.jcs3.engine.stats.StatElement;
048import org.apache.commons.jcs3.engine.stats.Stats;
049import org.apache.commons.jcs3.engine.stats.behavior.IStatElement;
050import org.apache.commons.jcs3.engine.stats.behavior.IStats;
051import org.apache.commons.jcs3.log.Log;
052import org.apache.commons.jcs3.log.LogManager;
053import org.apache.commons.jcs3.utils.serialization.SerializationConversionUtil;
054import org.apache.commons.jcs3.utils.threadpool.ThreadPoolManager;
055
056/** Abstract base for remote caches. I'm trying to break out and reuse common functionality. */
057public abstract class AbstractRemoteAuxiliaryCache<K, V>
058    extends AbstractAuxiliaryCacheEventLogging<K, V>
059    implements IRemoteCacheClient<K, V>
060{
061    /** The logger. */
062    private static final Log log = LogManager.getLog( AbstractRemoteAuxiliaryCache.class );
063
    /**
     * This does the work. In an RMI instance, it will be a remote reference. In an http remote
     * cache it will be an http client. In zombie mode it is replaced with a balking facade.
     */
068    private ICacheServiceNonLocal<K, V> remoteCacheService;
069
070    /** The cacheName */
071    protected final String cacheName;
072
073    /** The listener. This can be null. */
074    private IRemoteCacheListener<K, V> remoteCacheListener;
075
076    /** The configuration values. TODO, we'll need a base here. */
077    private IRemoteCacheAttributes remoteCacheAttributes;
078
079    /** A thread pool for gets if configured. */
080    private ExecutorService pool;
081
082    /** Should we get asynchronously using a pool. */
083    private boolean usePoolForGet;
084
085    /**
086     * Creates the base.
087     * <p>
088     * @param cattr
089     * @param remote
090     * @param listener
091     */
092    public AbstractRemoteAuxiliaryCache( final IRemoteCacheAttributes cattr, final ICacheServiceNonLocal<K, V> remote,
093                                         final IRemoteCacheListener<K, V> listener )
094    {
095        this.setRemoteCacheAttributes( cattr );
096        this.cacheName = cattr.getCacheName();
097        this.setRemoteCacheService( remote );
098        this.setRemoteCacheListener( listener );
099
100        if ( log.isDebugEnabled() )
101        {
102            log.debug( "Construct> cacheName={0}", cattr::getCacheName);
103            log.debug( "irca = {0}", this::getRemoteCacheAttributes);
104            log.debug( "remote = {0}", remote );
105            log.debug( "listener = {0}", listener );
106        }
107
108        // use a pool if it is greater than 0
109        log.debug( "GetTimeoutMillis() = {0}",
110                () -> getRemoteCacheAttributes().getGetTimeoutMillis() );
111
112        if ( getRemoteCacheAttributes().getGetTimeoutMillis() > 0 )
113        {
114            pool = ThreadPoolManager.getInstance().getExecutorService( getRemoteCacheAttributes().getThreadPoolName() );
115            log.debug( "Thread Pool = {0}", pool );
116            usePoolForGet = true;
117        }
118    }
119
120    /**
121     * Synchronously dispose the remote cache; if failed, replace the remote handle with a zombie.
122     * <p>
123     * @throws IOException
124     */
125    @Override
126    protected void processDispose()
127        throws IOException
128    {
129        log.info( "Disposing of remote cache." );
130        try
131        {
132            if ( getRemoteCacheListener() != null )
133            {
134                getRemoteCacheListener().dispose();
135            }
136        }
137        catch ( final IOException ex )
138        {
139            log.error( "Couldn't dispose", ex );
140            handleException( ex, "Failed to dispose [" + cacheName + "]", ICacheEventLogger.DISPOSE_EVENT );
141        }
142    }
143
144    /**
145     * Synchronously get from the remote cache; if failed, replace the remote handle with a zombie.
146     * <p>
147     * Use threadpool to timeout if a value is set for GetTimeoutMillis
148     * <p>
149     * If we are a cluster client, we need to leave the Element in its serialized form. Cluster
150     * clients cannot deserialize objects. Cluster clients get ICacheElementSerialized objects from
151     * other remote servers.
152     * <p>
153     * @param key
154     * @return ICacheElement, a wrapper around the key, value, and attributes
155     * @throws IOException
156     */
157    @Override
158    protected ICacheElement<K, V> processGet( final K key )
159        throws IOException
160    {
161        ICacheElement<K, V> retVal = null;
162        try
163        {
164            if ( usePoolForGet )
165            {
166                retVal = getUsingPool( key );
167            }
168            else
169            {
170                retVal = getRemoteCacheService().get( cacheName, key, getListenerId() );
171            }
172
173            // Eventually the instance of will not be necessary.
174            // Never try to deserialize if you are a cluster client. Cluster
175            // clients are merely intra-remote cache communicators. Remote caches are assumed
176            // to have no ability to deserialize the objects.
177            if (retVal instanceof ICacheElementSerialized && this.getRemoteCacheAttributes().getRemoteType() != RemoteType.CLUSTER)
178            {
179                retVal = SerializationConversionUtil.getDeSerializedCacheElement( (ICacheElementSerialized<K, V>) retVal,
180                        super.getElementSerializer() );
181            }
182        }
183        catch ( final IOException | ClassNotFoundException ex )
184        {
185            handleException( ex, "Failed to get [" + key + "] from [" + cacheName + "]", ICacheEventLogger.GET_EVENT );
186        }
187        return retVal;
188    }
189
190    /**
191     * This allows gets to timeout in case of remote server machine shutdown.
192     * <p>
193     * @param key
194     * @return ICacheElement
195     * @throws IOException
196     */
197    public ICacheElement<K, V> getUsingPool( final K key )
198        throws IOException
199    {
200        final int timeout = getRemoteCacheAttributes().getGetTimeoutMillis();
201
202        try
203        {
204            final Callable<ICacheElement<K, V>> command = () -> getRemoteCacheService().get( cacheName, key, getListenerId() );
205
206            // execute using the pool
207            final Future<ICacheElement<K, V>> future = pool.submit(command);
208
209            // used timed get in order to timeout
210            final ICacheElement<K, V> ice = future.get(timeout, TimeUnit.MILLISECONDS);
211
212            if ( ice == null )
213            {
214                log.debug( "nothing found in remote cache" );
215            }
216            else
217            {
218                log.debug( "found item in remote cache" );
219            }
220            return ice;
221        }
222        catch ( final TimeoutException te )
223        {
224            log.warn( "TimeoutException, Get Request timed out after {0}", timeout );
225            throw new IOException( "Get Request timed out after " + timeout );
226        }
227        catch ( final InterruptedException ex )
228        {
229            log.warn( "InterruptedException, Get Request timed out after {0}", timeout );
230            throw new IOException( "Get Request timed out after " + timeout );
231        }
232        catch (final ExecutionException ex)
233        {
234            // assume that this is an IOException thrown by the callable.
235            log.error( "ExecutionException, Assuming an IO exception thrown in the background.", ex );
236            throw new IOException( "Get Request timed out after " + timeout );
237        }
238    }
239
240    /**
241     * Calls get matching on the server. Each entry in the result is unwrapped.
242     * <p>
243     * @param pattern
244     * @return Map
245     * @throws IOException
246     */
247    @Override
248    public Map<K, ICacheElement<K, V>> processGetMatching( final String pattern )
249        throws IOException
250    {
251        final Map<K, ICacheElement<K, V>> results = new HashMap<>();
252        try
253        {
254            final Map<K, ICacheElement<K, V>> rawResults = getRemoteCacheService().getMatching( cacheName, pattern, getListenerId() );
255
256            // Eventually the instance of will not be necessary.
257            if ( rawResults != null )
258            {
259                for (final Map.Entry<K, ICacheElement<K, V>> entry : rawResults.entrySet())
260                {
261                    ICacheElement<K, V> unwrappedResult = null;
262                    if ( entry.getValue() instanceof ICacheElementSerialized )
263                    {
264                        // Never try to deserialize if you are a cluster client. Cluster
265                        // clients are merely intra-remote cache communicators. Remote caches are assumed
266                        // to have no ability to deserialize the objects.
267                        if ( this.getRemoteCacheAttributes().getRemoteType() != RemoteType.CLUSTER )
268                        {
269                            unwrappedResult = SerializationConversionUtil
270                                .getDeSerializedCacheElement( (ICacheElementSerialized<K, V>) entry.getValue(),
271                                        super.getElementSerializer() );
272                        }
273                    }
274                    else
275                    {
276                        unwrappedResult = entry.getValue();
277                    }
278                    results.put( entry.getKey(), unwrappedResult );
279                }
280            }
281        }
282        catch ( final IOException | ClassNotFoundException ex )
283        {
284            handleException( ex, "Failed to getMatching [" + pattern + "] from [" + cacheName + "]",
285                             ICacheEventLogger.GET_EVENT );
286        }
287        return results;
288    }
289
290    /**
291     * Synchronously remove from the remote cache; if failed, replace the remote handle with a
292     * zombie.
293     * <p>
294     * @param key
295     * @return boolean, whether or not the item was removed
296     * @throws IOException
297     */
298    @Override
299    protected boolean processRemove( final K key )
300        throws IOException
301    {
302        if ( !this.getRemoteCacheAttributes().getGetOnly() )
303        {
304            log.debug( "remove> key={0}", key );
305            try
306            {
307                getRemoteCacheService().remove( cacheName, key, getListenerId() );
308            }
309            catch ( final IOException ex )
310            {
311                handleException( ex, "Failed to remove " + key + " from " + cacheName, ICacheEventLogger.REMOVE_EVENT );
312            }
313            return true;
314        }
315        return false;
316    }
317
318    /**
319     * Synchronously removeAll from the remote cache; if failed, replace the remote handle with a
320     * zombie.
321     * <p>
322     * @throws IOException
323     */
324    @Override
325    protected void processRemoveAll()
326        throws IOException
327    {
328        if ( !this.getRemoteCacheAttributes().getGetOnly() )
329        {
330            try
331            {
332                getRemoteCacheService().removeAll( cacheName, getListenerId() );
333            }
334            catch ( final IOException ex )
335            {
336                handleException( ex, "Failed to remove all from " + cacheName, ICacheEventLogger.REMOVEALL_EVENT );
337            }
338        }
339    }
340
341    /**
342     * Serializes the object and then calls update on the remote server with the byte array. The
343     * byte array is wrapped in a ICacheElementSerialized. This allows the remote server to operate
344     * without any knowledge of caches classes.
345     * <p>
346     * @param ce
347     * @throws IOException
348     */
349    @Override
350    protected void processUpdate( final ICacheElement<K, V> ce )
351        throws IOException
352    {
353        if ( !getRemoteCacheAttributes().getGetOnly() )
354        {
355            try
356            {
357                log.debug( "sending item to remote server" );
358
359                // convert so we don't have to know about the object on the
360                // other end.
361                ICacheElementSerialized<K, V> serialized = SerializationConversionUtil.getSerializedCacheElement( ce, super.getElementSerializer() );
362
363                remoteCacheService.update( serialized, getListenerId() );
364            }
365            catch ( final IOException ex )
366            {
367                // event queue will wait and retry
368                handleException( ex, "Failed to put [" + ce.getKey() + "] to " + ce.getCacheName(),
369                                 ICacheEventLogger.UPDATE_EVENT );
370            }
371        }
372        else
373        {
374            log.debug( "get only mode, not sending to remote server" );
375        }
376    }
377
378    /**
379     * Return the keys in this cache.
380     * <p>
381     * @see org.apache.commons.jcs3.auxiliary.AuxiliaryCache#getKeySet()
382     */
383    @Override
384    public Set<K> getKeySet()
385        throws IOException
386    {
387        return getRemoteCacheService().getKeySet(cacheName);
388    }
389
390    /**
391     * Allows other member of this package to access the listener. This is mainly needed for
392     * deregistering a listener.
393     * <p>
394     * @return IRemoteCacheListener, the listener for this remote server
395     */
396    @Override
397    public IRemoteCacheListener<K, V> getListener()
398    {
399        return getRemoteCacheListener();
400    }
401
402    /**
403     * let the remote cache set a listener_id. Since there is only one listener for all the regions
404     * and every region gets registered? the id shouldn't be set if it isn't zero. If it is we
405     * assume that it is a reconnect.
406     * <p>
407     * @param id The new listenerId value
408     */
409    public void setListenerId( final long id )
410    {
411        if ( getRemoteCacheListener() != null )
412        {
413            try
414            {
415                getRemoteCacheListener().setListenerId( id );
416
417                log.debug( "set listenerId = {0}", id );
418            }
419            catch ( final IOException e )
420            {
421                log.error( "Problem setting listenerId", e );
422            }
423        }
424    }
425
426    /**
427     * Gets the listenerId attribute of the RemoteCacheListener object
428     * <p>
429     * @return The listenerId value
430     */
431    @Override
432    public long getListenerId()
433    {
434        if ( getRemoteCacheListener() != null )
435        {
436            try
437            {
438                log.debug( "get listenerId = {0}", getRemoteCacheListener().getListenerId() );
439                return getRemoteCacheListener().getListenerId();
440            }
441            catch ( final IOException e )
442            {
443                log.error( "Problem getting listenerId", e );
444            }
445        }
446        return -1;
447    }
448
    /**
     * Returns the current cache size.
     * <p>
     * This implementation does not ask the remote service; it always reports zero.
     * <p>
     * @return The size value, always 0 here
     */
    @Override
    public int getSize()
    {
        return 0;
    }
458
    /**
     * Hook for exception handling that is specific to the concrete remote cache. This should be
     * used to initiate failover.
     * <p>
     * The operations of this class call it whenever a remote call fails, passing the caught
     * exception, a message describing the failed operation and the name of the event.
     * NOTE(review): fixCache assumes that implementations always rethrow - confirm in subclasses.
     * <p>
     * @param ex the exception that was caught
     * @param msg a description of the operation that failed
     * @param eventName the name of the event during which the failure happened
     * @throws IOException if the implementation decides to propagate the failure
     */
    protected abstract void handleException( Exception ex, String msg, String eventName )
        throws IOException;
469
470    /**
471     * Gets the stats attribute of the RemoteCache object.
472     * <p>
473     * @return The stats value
474     */
475    @Override
476    public String getStats()
477    {
478        return getStatistics().toString();
479    }
480
481    /**
482     * @return IStats object
483     */
484    @Override
485    public IStats getStatistics()
486    {
487        final IStats stats = new Stats();
488        stats.setTypeName( "AbstractRemoteAuxiliaryCache" );
489
490        final ArrayList<IStatElement<?>> elems = new ArrayList<>();
491
492        elems.add(new StatElement<>( "Remote Type", this.getRemoteCacheAttributes().getRemoteTypeName() ) );
493
494//      if ( this.getRemoteCacheAttributes().getRemoteType() == RemoteType.CLUSTER )
495//      {
496//          // something cluster specific
497//      }
498
499        elems.add(new StatElement<>( "UsePoolForGet", Boolean.valueOf(usePoolForGet) ) );
500
501        if ( pool != null )
502        {
503            elems.add(new StatElement<>( "Pool", pool ) );
504        }
505
506        if ( getRemoteCacheService() instanceof ZombieCacheServiceNonLocal )
507        {
508            elems.add(new StatElement<>( "Zombie Queue Size",
509                    Integer.valueOf(( (ZombieCacheServiceNonLocal<K, V>) getRemoteCacheService() ).getQueueSize()) ) );
510        }
511
512        stats.setStatElements( elems );
513
514        return stats;
515    }
516
517    /**
518     * Returns the cache status. An error status indicates the remote connection is not available.
519     * <p>
520     * @return The status value
521     */
522    @Override
523    public CacheStatus getStatus()
524    {
525        return getRemoteCacheService() instanceof IZombie ? CacheStatus.ERROR : CacheStatus.ALIVE;
526    }
527
528    /**
529     * Replaces the current remote cache service handle with the given handle. If the current remote
530     * is a Zombie, then it propagates any events that are queued to the restored service.
531     * <p>
532     * @param restoredRemote ICacheServiceNonLocal -- the remote server or proxy to the remote server
533     */
534    @Override
535    public void fixCache( final ICacheServiceNonLocal<?, ?> restoredRemote )
536    {
537        @SuppressWarnings("unchecked") // Don't know how to do this properly
538        final
539        ICacheServiceNonLocal<K, V> remote = (ICacheServiceNonLocal<K, V>)restoredRemote;
540        final ICacheServiceNonLocal<K, V> prevRemote = getRemoteCacheService();
541        if ( prevRemote instanceof ZombieCacheServiceNonLocal )
542        {
543            final ZombieCacheServiceNonLocal<K, V> zombie = (ZombieCacheServiceNonLocal<K, V>) prevRemote;
544            setRemoteCacheService( remote );
545            try
546            {
547                zombie.propagateEvents( remote );
548            }
549            catch ( final Exception e )
550            {
551                try
552                {
553                    handleException( e, "Problem propagating events from Zombie Queue to new Remote Service.",
554                                     "fixCache" );
555                }
556                catch ( final IOException e1 )
557                {
558                    // swallow, since this is just expected kick back.  Handle always throws
559                }
560            }
561        }
562        else
563        {
564            setRemoteCacheService( remote );
565        }
566    }
567
568
    /**
     * Gets the cacheType attribute of the RemoteCache object.
     * <p>
     * @return The cacheType value, always CacheType.REMOTE_CACHE
     */
    @Override
    public CacheType getCacheType()
    {
        return CacheType.REMOTE_CACHE;
    }
578
579    /**
580     * Gets the cacheName attribute of the RemoteCache object.
581     * <p>
582     * @return The cacheName value
583     */
584    @Override
585    public String getCacheName()
586    {
587        return cacheName;
588    }
589
    /**
     * Replaces the service handle that is used for all remote operations.
     * <p>
     * @param remote the remote to set
     */
    protected void setRemoteCacheService( final ICacheServiceNonLocal<K, V> remote )
    {
        this.remoteCacheService = remote;
    }
597
598    /**
599     * @return the remote
600     */
601    protected ICacheServiceNonLocal<K, V> getRemoteCacheService()
602    {
603        return remoteCacheService;
604    }
605
606    /**
607     * @return Returns the AuxiliaryCacheAttributes.
608     */
609    @Override
610    public AuxiliaryCacheAttributes getAuxiliaryCacheAttributes()
611    {
612        return getRemoteCacheAttributes();
613    }
614
    /**
     * Replaces the configuration values of this cache.
     * <p>
     * @param remoteCacheAttributes the remoteCacheAttributes to set
     */
    protected void setRemoteCacheAttributes( final IRemoteCacheAttributes remoteCacheAttributes )
    {
        this.remoteCacheAttributes = remoteCacheAttributes;
    }
622
623    /**
624     * @return the remoteCacheAttributes
625     */
626    protected IRemoteCacheAttributes getRemoteCacheAttributes()
627    {
628        return remoteCacheAttributes;
629    }
630
    /**
     * Replaces the listener of this cache.
     * <p>
     * @param remoteCacheListener the remoteCacheListener to set, may be null
     */
    protected void setRemoteCacheListener( final IRemoteCacheListener<K, V> remoteCacheListener )
    {
        this.remoteCacheListener = remoteCacheListener;
    }
638
639    /**
640     * @return the remoteCacheListener
641     */
642    protected IRemoteCacheListener<K, V> getRemoteCacheListener()
643    {
644        return remoteCacheListener;
645    }
646}