001package org.apache.commons.jcs.auxiliary.remote;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *   http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.HashMap;
025import java.util.Map;
026import java.util.Set;
027import java.util.concurrent.Callable;
028import java.util.concurrent.ExecutionException;
029import java.util.concurrent.Future;
030import java.util.concurrent.ThreadPoolExecutor;
031import java.util.concurrent.TimeUnit;
032import java.util.concurrent.TimeoutException;
033
034import org.apache.commons.jcs.auxiliary.AbstractAuxiliaryCacheEventLogging;
035import org.apache.commons.jcs.auxiliary.AuxiliaryCacheAttributes;
036import org.apache.commons.jcs.auxiliary.remote.behavior.IRemoteCacheAttributes;
037import org.apache.commons.jcs.auxiliary.remote.behavior.IRemoteCacheClient;
038import org.apache.commons.jcs.auxiliary.remote.behavior.IRemoteCacheListener;
039import org.apache.commons.jcs.auxiliary.remote.server.behavior.RemoteType;
040import org.apache.commons.jcs.engine.CacheStatus;
041import org.apache.commons.jcs.engine.ZombieCacheServiceNonLocal;
042import org.apache.commons.jcs.engine.behavior.ICacheElement;
043import org.apache.commons.jcs.engine.behavior.ICacheElementSerialized;
044import org.apache.commons.jcs.engine.behavior.ICacheServiceNonLocal;
045import org.apache.commons.jcs.engine.behavior.IZombie;
046import org.apache.commons.jcs.engine.logging.behavior.ICacheEventLogger;
047import org.apache.commons.jcs.engine.stats.StatElement;
048import org.apache.commons.jcs.engine.stats.Stats;
049import org.apache.commons.jcs.engine.stats.behavior.IStatElement;
050import org.apache.commons.jcs.engine.stats.behavior.IStats;
051import org.apache.commons.jcs.utils.serialization.SerializationConversionUtil;
052import org.apache.commons.jcs.utils.threadpool.ThreadPoolManager;
053import org.apache.commons.logging.Log;
054import org.apache.commons.logging.LogFactory;
055
/**
 * Abstract base for remote caches. It factors out the functionality that the remote cache
 * implementations have in common so that it can be reused.
 */
057public abstract class AbstractRemoteAuxiliaryCache<K, V>
058    extends AbstractAuxiliaryCacheEventLogging<K, V>
059    implements IRemoteCacheClient<K, V>
060{
061    /** The logger. */
062    private static final Log log = LogFactory.getLog( AbstractRemoteAuxiliaryCache.class );
063
    /**
     * This does the work. In an RMI instance, it will be a remote reference. In an HTTP remote
     * cache, it will be an HTTP client. In zombie mode, it is replaced with a balking facade.
     */
068    private ICacheServiceNonLocal<K, V> remoteCacheService;
069
070    /** The cacheName */
071    protected final String cacheName;
072
073    /** The listener. This can be null. */
074    private IRemoteCacheListener<K, V> remoteCacheListener;
075
076    /** The configuration values. TODO, we'll need a base here. */
077    private IRemoteCacheAttributes remoteCacheAttributes;
078
079    /** A thread pool for gets if configured. */
080    private ThreadPoolExecutor pool = null;
081
082    /** Should we get asynchronously using a pool. */
083    private boolean usePoolForGet = false;
084
085    /**
086     * Creates the base.
087     * <p>
088     * @param cattr
089     * @param remote
090     * @param listener
091     */
092    public AbstractRemoteAuxiliaryCache( IRemoteCacheAttributes cattr, ICacheServiceNonLocal<K, V> remote,
093                                         IRemoteCacheListener<K, V> listener )
094    {
095        this.setRemoteCacheAttributes( cattr );
096        this.cacheName = cattr.getCacheName();
097        this.setRemoteCacheService( remote );
098        this.setRemoteCacheListener( listener );
099
100        if ( log.isDebugEnabled() )
101        {
102            log.debug( "Construct> cacheName=" + cattr.getCacheName() );
103            log.debug( "irca = " + getRemoteCacheAttributes() );
104            log.debug( "remote = " + remote );
105            log.debug( "listener = " + listener );
106        }
107
108        // use a pool if it is greater than 0
109        if ( log.isDebugEnabled() )
110        {
111            log.debug( "GetTimeoutMillis() = " + getRemoteCacheAttributes().getGetTimeoutMillis() );
112        }
113
114        if ( getRemoteCacheAttributes().getGetTimeoutMillis() > 0 )
115        {
116            pool = ThreadPoolManager.getInstance().getPool( getRemoteCacheAttributes().getThreadPoolName() );
117            if ( log.isDebugEnabled() )
118            {
119                log.debug( "Thread Pool = " + pool );
120            }
121            if ( pool != null )
122            {
123                usePoolForGet = true;
124            }
125        }
126    }
127
128    /**
129     * Synchronously dispose the remote cache; if failed, replace the remote handle with a zombie.
130     * <p>
131     * @throws IOException
132     */
133    @Override
134    protected void processDispose()
135        throws IOException
136    {
137        if ( log.isInfoEnabled() )
138        {
139            log.info( "Disposing of remote cache." );
140        }
141        try
142        {
143            if ( getRemoteCacheListener() != null )
144            {
145                getRemoteCacheListener().dispose();
146            }
147        }
148        catch ( Exception ex )
149        {
150            log.error( "Couldn't dispose", ex );
151            handleException( ex, "Failed to dispose [" + cacheName + "]", ICacheEventLogger.DISPOSE_EVENT );
152        }
153    }
154
155    /**
156     * Synchronously get from the remote cache; if failed, replace the remote handle with a zombie.
157     * <p>
158     * Use threadpool to timeout if a value is set for GetTimeoutMillis
159     * <p>
160     * If we are a cluster client, we need to leave the Element in its serialized form. Cluster
161     * clients cannot deserialize objects. Cluster clients get ICacheElementSerialized objects from
162     * other remote servers.
163     * <p>
164     * @param key
165     * @return ICacheElement, a wrapper around the key, value, and attributes
166     * @throws IOException
167     */
168    @Override
169    protected ICacheElement<K, V> processGet( K key )
170        throws IOException
171    {
172        ICacheElement<K, V> retVal = null;
173        try
174        {
175            if ( usePoolForGet )
176            {
177                retVal = getUsingPool( key );
178            }
179            else
180            {
181                retVal = getRemoteCacheService().get( cacheName, key, getListenerId() );
182            }
183
184            // Eventually the instance of will not be necessary.
185            if ( retVal instanceof ICacheElementSerialized )
186            {
187                // Never try to deserialize if you are a cluster client. Cluster
188                // clients are merely intra-remote cache communicators. Remote caches are assumed
189                // to have no ability to deserialize the objects.
190                if ( this.getRemoteCacheAttributes().getRemoteType() != RemoteType.CLUSTER )
191                {
192                    retVal = SerializationConversionUtil.getDeSerializedCacheElement( (ICacheElementSerialized<K, V>) retVal,
193                            super.getElementSerializer() );
194                }
195            }
196        }
197        catch ( Exception ex )
198        {
199            handleException( ex, "Failed to get [" + key + "] from [" + cacheName + "]", ICacheEventLogger.GET_EVENT );
200        }
201        return retVal;
202    }
203
204    /**
205     * This allows gets to timeout in case of remote server machine shutdown.
206     * <p>
207     * @param key
208     * @return ICacheElement
209     * @throws IOException
210     */
211    public ICacheElement<K, V> getUsingPool( final K key )
212        throws IOException
213    {
214        int timeout = getRemoteCacheAttributes().getGetTimeoutMillis();
215
216        try
217        {
218            Callable<ICacheElement<K, V>> command = new Callable<ICacheElement<K, V>>()
219            {
220                @Override
221                public ICacheElement<K, V> call()
222                    throws IOException
223                {
224                    return getRemoteCacheService().get( cacheName, key, getListenerId() );
225                }
226            };
227
228            // execute using the pool
229            Future<ICacheElement<K, V>> future = pool.submit(command);
230
231            // used timed get in order to timeout
232            ICacheElement<K, V> ice = future.get(timeout, TimeUnit.MILLISECONDS);
233
234            if ( log.isDebugEnabled() )
235            {
236                if ( ice == null )
237                {
238                    log.debug( "nothing found in remote cache" );
239                }
240                else
241                {
242                    log.debug( "found item in remote cache" );
243                }
244            }
245            return ice;
246        }
247        catch ( TimeoutException te )
248        {
249            log.warn( "TimeoutException, Get Request timed out after " + timeout );
250            throw new IOException( "Get Request timed out after " + timeout );
251        }
252        catch ( InterruptedException ex )
253        {
254            log.warn( "InterruptedException, Get Request timed out after " + timeout );
255            throw new IOException( "Get Request timed out after " + timeout );
256        }
257        catch (ExecutionException ex)
258        {
259            // assume that this is an IOException thrown by the callable.
260            log.error( "ExecutionException, Assuming an IO exception thrown in the background.", ex );
261            throw new IOException( "Get Request timed out after " + timeout );
262        }
263    }
264
265    /**
266     * Calls get matching on the server. Each entry in the result is unwrapped.
267     * <p>
268     * @param pattern
269     * @return Map
270     * @throws IOException
271     */
272    @Override
273    public Map<K, ICacheElement<K, V>> processGetMatching( String pattern )
274        throws IOException
275    {
276        Map<K, ICacheElement<K, V>> results = new HashMap<K, ICacheElement<K, V>>();
277        try
278        {
279            Map<K, ICacheElement<K, V>> rawResults = getRemoteCacheService().getMatching( cacheName, pattern, getListenerId() );
280
281            // Eventually the instance of will not be necessary.
282            if ( rawResults != null )
283            {
284                for (Map.Entry<K, ICacheElement<K, V>> entry : rawResults.entrySet())
285                {
286                    ICacheElement<K, V> unwrappedResult = null;
287                    if ( entry.getValue() instanceof ICacheElementSerialized )
288                    {
289                        // Never try to deserialize if you are a cluster client. Cluster
290                        // clients are merely intra-remote cache communicators. Remote caches are assumed
291                        // to have no ability to deserialize the objects.
292                        if ( this.getRemoteCacheAttributes().getRemoteType() != RemoteType.CLUSTER )
293                        {
294                            unwrappedResult = SerializationConversionUtil
295                                .getDeSerializedCacheElement( (ICacheElementSerialized<K, V>) entry.getValue(),
296                                        super.getElementSerializer() );
297                        }
298                    }
299                    else
300                    {
301                        unwrappedResult = entry.getValue();
302                    }
303                    results.put( entry.getKey(), unwrappedResult );
304                }
305            }
306        }
307        catch ( Exception ex )
308        {
309            handleException( ex, "Failed to getMatching [" + pattern + "] from [" + cacheName + "]",
310                             ICacheEventLogger.GET_EVENT );
311        }
312        return results;
313    }
314
315    /**
316     * Gets multiple items from the cache based on the given set of keys.
317     * <p>
318     * @param keys
319     * @return a map of K key to ICacheElement&lt;K, V&gt; element, or an empty map if there is no
320     *         data in cache for any of these keys
321     * @throws IOException
322     */
323    @Override
324    protected Map<K, ICacheElement<K, V>> processGetMultiple( Set<K> keys )
325        throws IOException
326    {
327        Map<K, ICacheElement<K, V>> elements = new HashMap<K, ICacheElement<K, V>>();
328        if ( keys != null && !keys.isEmpty() )
329        {
330            for (K key : keys)
331            {
332                ICacheElement<K, V> element = get( key );
333
334                if ( element != null )
335                {
336                    elements.put( key, element );
337                }
338            }
339        }
340        return elements;
341    }
342
343    /**
344     * Synchronously remove from the remote cache; if failed, replace the remote handle with a
345     * zombie.
346     * <p>
347     * @param key
348     * @return boolean, whether or not the item was removed
349     * @throws IOException
350     */
351    @Override
352    protected boolean processRemove( K key )
353        throws IOException
354    {
355        if ( !this.getRemoteCacheAttributes().getGetOnly() )
356        {
357            if ( log.isDebugEnabled() )
358            {
359                log.debug( "remove> key=" + key );
360            }
361            try
362            {
363                getRemoteCacheService().remove( cacheName, key, getListenerId() );
364            }
365            catch ( Exception ex )
366            {
367                handleException( ex, "Failed to remove " + key + " from " + cacheName, ICacheEventLogger.REMOVE_EVENT );
368            }
369            return true;
370        }
371        return false;
372    }
373
374    /**
375     * Synchronously removeAll from the remote cache; if failed, replace the remote handle with a
376     * zombie.
377     * <p>
378     * @throws IOException
379     */
380    @Override
381    protected void processRemoveAll()
382        throws IOException
383    {
384        if ( !this.getRemoteCacheAttributes().getGetOnly() )
385        {
386            try
387            {
388                getRemoteCacheService().removeAll( cacheName, getListenerId() );
389            }
390            catch ( Exception ex )
391            {
392                handleException( ex, "Failed to remove all from " + cacheName, ICacheEventLogger.REMOVEALL_EVENT );
393            }
394        }
395    }
396
397    /**
398     * Serializes the object and then calls update on the remote server with the byte array. The
399     * byte array is wrapped in a ICacheElementSerialized. This allows the remote server to operate
400     * without any knowledge of caches classes.
401     * <p>
402     * @param ce
403     * @throws IOException
404     */
405    @Override
406    protected void processUpdate( ICacheElement<K, V> ce )
407        throws IOException
408    {
409        if ( !getRemoteCacheAttributes().getGetOnly() )
410        {
411            ICacheElementSerialized<K, V> serialized = null;
412            try
413            {
414                if ( log.isDebugEnabled() )
415                {
416                    log.debug( "sending item to remote server" );
417                }
418
419                // convert so we don't have to know about the object on the
420                // other end.
421                serialized = SerializationConversionUtil.getSerializedCacheElement( ce, super.getElementSerializer() );
422
423                remoteCacheService.update( serialized, getListenerId() );
424            }
425            catch ( NullPointerException npe )
426            {
427                log.error( "npe for ce = " + ce + "ce.attr = " + ce.getElementAttributes(), npe );
428            }
429            catch ( Exception ex )
430            {
431                // event queue will wait and retry
432                handleException( ex, "Failed to put [" + ce.getKey() + "] to " + ce.getCacheName(),
433                                 ICacheEventLogger.UPDATE_EVENT );
434            }
435        }
436        else
437        {
438            if ( log.isDebugEnabled() )
439            {
440                log.debug( "get only mode, not sending to remote server" );
441            }
442        }
443    }
444
445    /**
446     * Return the keys in this cache.
447     * <p>
448     * @see org.apache.commons.jcs.auxiliary.AuxiliaryCache#getKeySet()
449     */
450    @Override
451    public Set<K> getKeySet()
452        throws IOException
453    {
454        return getRemoteCacheService().getKeySet(cacheName);
455    }
456
    /**
     * Allows other members of this package to access the listener. This is mainly needed for
     * deregistering a listener.
     * <p>
     * Public counterpart of the protected {@link #getRemoteCacheListener()}, to which it delegates.
     * <p>
     * @return IRemoteCacheListener, the listener for this remote server; can be null
     */
    @Override
    public IRemoteCacheListener<K, V> getListener()
    {
        return getRemoteCacheListener();
    }
468
469    /**
470     * let the remote cache set a listener_id. Since there is only one listener for all the regions
471     * and every region gets registered? the id shouldn't be set if it isn't zero. If it is we
472     * assume that it is a reconnect.
473     * <p>
474     * @param id The new listenerId value
475     */
476    public void setListenerId( long id )
477    {
478        if ( getRemoteCacheListener() != null )
479        {
480            try
481            {
482                getRemoteCacheListener().setListenerId( id );
483
484                if ( log.isDebugEnabled() )
485                {
486                    log.debug( "set listenerId = " + id );
487                }
488            }
489            catch ( Exception e )
490            {
491                log.error( "Problem setting listenerId", e );
492            }
493        }
494    }
495
496    /**
497     * Gets the listenerId attribute of the RemoteCacheListener object
498     * <p>
499     * @return The listenerId value
500     */
501    @Override
502    public long getListenerId()
503    {
504        if ( getRemoteCacheListener() != null )
505        {
506            try
507            {
508                if ( log.isDebugEnabled() )
509                {
510                    log.debug( "get listenerId = " + getRemoteCacheListener().getListenerId() );
511                }
512                return getRemoteCacheListener().getListenerId();
513            }
514            catch ( Exception e )
515            {
516                log.error( "Problem getting listenerId", e );
517            }
518        }
519        return -1;
520    }
521
    /**
     * Reports the cache size. This implementation does not query the remote service and always
     * returns 0.
     * <p>
     * @return The size value; always 0 here
     */
    @Override
    public int getSize()
    {
        return 0;
    }
531
    /**
     * Custom exception handling to be supplied by the child classes. This should be used to
     * initiate failover.
     * <p>
     * The operations of this class call it from their catch blocks with the caught exception, a
     * descriptive message and the name of the event that failed.
     * <p>
     * NOTE(review): a comment in fixCache states that the handler always throws; this cannot be
     * confirmed from this class and should be checked against the implementations.
     * <p>
     * @param ex the exception that was caught
     * @param msg a description of the operation that failed
     * @param eventName the name of the event that failed
     * @throws IOException as decided by the implementation
     */
    protected abstract void handleException( Exception ex, String msg, String eventName )
        throws IOException;
542
543    /**
544     * Gets the stats attribute of the RemoteCache object.
545     * <p>
546     * @return The stats value
547     */
548    @Override
549    public String getStats()
550    {
551        return getStatistics().toString();
552    }
553
554    /**
555     * @return IStats object
556     */
557    @Override
558    public IStats getStatistics()
559    {
560        IStats stats = new Stats();
561        stats.setTypeName( "AbstractRemoteAuxiliaryCache" );
562
563        ArrayList<IStatElement<?>> elems = new ArrayList<IStatElement<?>>();
564
565        elems.add(new StatElement<String>( "Remote Type", this.getRemoteCacheAttributes().getRemoteTypeName() ) );
566
567//      if ( this.getRemoteCacheAttributes().getRemoteType() == RemoteType.CLUSTER )
568//      {
569//          // something cluster specific
570//      }
571
572        elems.add(new StatElement<Boolean>( "UsePoolForGet", Boolean.valueOf(usePoolForGet) ) );
573
574        if ( pool != null )
575        {
576            elems.add(new StatElement<Integer>( "Pool Size", Integer.valueOf(pool.getPoolSize()) ) );
577            elems.add(new StatElement<Integer>( "Maximum Pool Size", Integer.valueOf(pool.getMaximumPoolSize()) ) );
578        }
579
580        if ( getRemoteCacheService() instanceof ZombieCacheServiceNonLocal )
581        {
582            elems.add(new StatElement<Integer>( "Zombie Queue Size",
583                    Integer.valueOf(( (ZombieCacheServiceNonLocal<K, V>) getRemoteCacheService() ).getQueueSize()) ) );
584        }
585
586        stats.setStatElements( elems );
587
588        return stats;
589    }
590
591    /**
592     * Returns the cache status. An error status indicates the remote connection is not available.
593     * <p>
594     * @return The status value
595     */
596    @Override
597    public CacheStatus getStatus()
598    {
599        return getRemoteCacheService() instanceof IZombie ? CacheStatus.ERROR : CacheStatus.ALIVE;
600    }
601
602    /**
603     * Replaces the current remote cache service handle with the given handle. If the current remote
604     * is a Zombie, then it propagates any events that are queued to the restored service.
605     * <p>
606     * @param restoredRemote ICacheServiceNonLocal -- the remote server or proxy to the remote server
607     */
608    @Override
609    public void fixCache( ICacheServiceNonLocal<?, ?> restoredRemote )
610    {
611        @SuppressWarnings("unchecked") // Don't know how to do this properly
612        ICacheServiceNonLocal<K, V> remote = (ICacheServiceNonLocal<K, V>)restoredRemote;
613        ICacheServiceNonLocal<K, V> prevRemote = getRemoteCacheService();
614        if ( prevRemote instanceof ZombieCacheServiceNonLocal )
615        {
616            ZombieCacheServiceNonLocal<K, V> zombie = (ZombieCacheServiceNonLocal<K, V>) prevRemote;
617            setRemoteCacheService( remote );
618            try
619            {
620                zombie.propagateEvents( remote );
621            }
622            catch ( Exception e )
623            {
624                try
625                {
626                    handleException( e, "Problem propagating events from Zombie Queue to new Remote Service.",
627                                     "fixCache" );
628                }
629                catch ( IOException e1 )
630                {
631                    // swallow, since this is just expected kick back.  Handle always throws
632                }
633            }
634        }
635        else
636        {
637            setRemoteCacheService( remote );
638        }
639    }
640
641
    /**
     * Gets the cacheType attribute of this cache.
     * <p>
     * @return The cacheType value; always REMOTE_CACHE
     */
    @Override
    public CacheType getCacheType()
    {
        return CacheType.REMOTE_CACHE;
    }
651
    /**
     * Gets the cacheName attribute of this cache. The name is taken from the cache attributes in
     * the constructor and never changes afterwards.
     * <p>
     * @return The cacheName value
     */
    @Override
    public String getCacheName()
    {
        return cacheName;
    }
662
    /**
     * Sets the handle that is used to talk to the remote cache. It is called by the constructor
     * and by fixCache when a restored service replaces the current one.
     * <p>
     * @param remote the remote to set
     */
    protected void setRemoteCacheService( ICacheServiceNonLocal<K, V> remote )
    {
        this.remoteCacheService = remote;
    }
670
    /**
     * Gets the handle that is currently used to talk to the remote cache.
     * <p>
     * @return the remote
     */
    protected ICacheServiceNonLocal<K, V> getRemoteCacheService()
    {
        return remoteCacheService;
    }
678
    /**
     * Gets the configuration of this cache. This is the same object that
     * {@link #getRemoteCacheAttributes()} returns, exposed under the more general type.
     * <p>
     * @return Returns the AuxiliaryCacheAttributes.
     */
    @Override
    public AuxiliaryCacheAttributes getAuxiliaryCacheAttributes()
    {
        return getRemoteCacheAttributes();
    }
687
    /**
     * Sets the configuration values of this cache. It is called by the constructor.
     * <p>
     * @param remoteCacheAttributes the remoteCacheAttributes to set
     */
    protected void setRemoteCacheAttributes( IRemoteCacheAttributes remoteCacheAttributes )
    {
        this.remoteCacheAttributes = remoteCacheAttributes;
    }
695
    /**
     * Gets the configuration values of this cache.
     * <p>
     * @return the remoteCacheAttributes
     */
    protected IRemoteCacheAttributes getRemoteCacheAttributes()
    {
        return remoteCacheAttributes;
    }
703
    /**
     * Sets the listener of this cache. It is called by the constructor.
     * <p>
     * @param remoteCacheListener the remoteCacheListener to set; can be null
     */
    protected void setRemoteCacheListener( IRemoteCacheListener<K, V> remoteCacheListener )
    {
        this.remoteCacheListener = remoteCacheListener;
    }
711
    /**
     * Gets the listener of this cache.
     * <p>
     * @return the remoteCacheListener; can be null
     */
    protected IRemoteCacheListener<K, V> getRemoteCacheListener()
    {
        return remoteCacheListener;
    }
719}