001    package org.apache.jcs.auxiliary.lateral;
002    
003    /*
004     * Licensed to the Apache Software Foundation (ASF) under one
005     * or more contributor license agreements.  See the NOTICE file
006     * distributed with this work for additional information
007     * regarding copyright ownership.  The ASF licenses this file
008     * to you under the Apache License, Version 2.0 (the
009     * "License"); you may not use this file except in compliance
010     * with the License.  You may obtain a copy of the License at
011     *
012     *   http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing,
015     * software distributed under the License is distributed on an
016     * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017     * KIND, either express or implied.  See the License for the
018     * specific language governing permissions and limitations
019     * under the License.
020     */
021    
022    import java.io.IOException;
023    import java.io.Serializable;
024    import java.util.Collections;
025    import java.util.HashMap;
026    import java.util.Map;
027    import java.util.Set;
028    
029    import org.apache.commons.logging.Log;
030    import org.apache.commons.logging.LogFactory;
031    import org.apache.jcs.auxiliary.AbstractAuxiliaryCacheEventLogging;
032    import org.apache.jcs.auxiliary.AuxiliaryCacheAttributes;
033    import org.apache.jcs.auxiliary.lateral.behavior.ILateralCacheAttributes;
034    import org.apache.jcs.engine.CacheStatus;
035    import org.apache.jcs.engine.ZombieCacheServiceNonLocal;
036    import org.apache.jcs.engine.behavior.ICacheElement;
037    import org.apache.jcs.engine.behavior.ICacheServiceNonLocal;
038    import org.apache.jcs.engine.behavior.IZombie;
039    import org.apache.jcs.engine.stats.Stats;
040    import org.apache.jcs.engine.stats.behavior.IStats;
041    
042    /**
043     * Lateral distributor. Returns null on get by default. Net search not implemented.
044     */
045    public class LateralCache<K extends Serializable, V extends Serializable>
046        extends AbstractAuxiliaryCacheEventLogging<K, V>
047    {
048        /** Don't change. */
049        private static final long serialVersionUID = 6274549256562382782L;
050    
051        /** The logger. */
052        private final static Log log = LogFactory.getLog( LateralCache.class );
053    
054        /** generalize this, use another interface */
055        private final ILateralCacheAttributes lateralCacheAttribures;
056    
057        /** The region name */
058        final String cacheName;
059    
060        /** either http, socket.udp, or socket.tcp can set in config */
061        private ICacheServiceNonLocal<K, V> lateralCacheService;
062    
063        /** Monitors the connection. */
064        private LateralCacheMonitor monitor;
065    
066        /**
067         * Constructor for the LateralCache object
068         * <p>
069         * @param cattr
070         * @param lateral
071         * @param monitor
072         */
073        public LateralCache( ILateralCacheAttributes cattr, ICacheServiceNonLocal<K, V> lateral, LateralCacheMonitor monitor )
074        {
075            this.cacheName = cattr.getCacheName();
076            this.lateralCacheAttribures = cattr;
077            this.lateralCacheService = lateral;
078            this.monitor = monitor;
079        }
080    
081        /**
082         * Constructor for the LateralCache object
083         * <p>
084         * @param cattr
085         */
086        public LateralCache( ILateralCacheAttributes cattr )
087        {
088            this.cacheName = cattr.getCacheName();
089            this.lateralCacheAttribures = cattr;
090        }
091    
092        /**
093         * Update lateral.
094         * <p>
095         * @param ce
096         * @throws IOException
097         */
098        @Override
099        protected void processUpdate( ICacheElement<K, V> ce )
100            throws IOException
101        {
102            try
103            {
104                if ( log.isDebugEnabled() )
105                {
106                    log.debug( "update: lateral = [" + lateralCacheService + "], " + "LateralCacheInfo.listenerId = "
107                        + LateralCacheInfo.listenerId );
108                }
109                lateralCacheService.update( ce, LateralCacheInfo.listenerId );
110            }
111            catch ( NullPointerException npe )
112            {
113                log.error( "Failure updating lateral. lateral = " + lateralCacheService, npe );
114                handleException( npe, "Failed to put [" + ce.getKey() + "] to " + ce.getCacheName() + "@" + lateralCacheAttribures );
115                return;
116            }
117            catch ( Exception ex )
118            {
119                handleException( ex, "Failed to put [" + ce.getKey() + "] to " + ce.getCacheName() + "@" + lateralCacheAttribures );
120            }
121        }
122    
123        /**
124         * The performance costs are too great. It is not recommended that you enable lateral gets.
125         * <p>
126         * @param key
127         * @return ICacheElement<K, V> or null
128         * @throws IOException
129         */
130        @Override
131        protected ICacheElement<K, V> processGet( K key )
132            throws IOException
133        {
134            ICacheElement<K, V> obj = null;
135    
136            if ( this.lateralCacheAttribures.getPutOnlyMode() )
137            {
138                return null;
139            }
140            try
141            {
142                obj = lateralCacheService.get( cacheName, key );
143            }
144            catch ( Exception e )
145            {
146                log.error( e );
147                handleException( e, "Failed to get [" + key + "] from " + lateralCacheAttribures.getCacheName() + "@" + lateralCacheAttribures );
148            }
149            return obj;
150        }
151    
152        /**
153         * @param pattern
154         * @return A map of K key to ICacheElement<K, V> element, or an empty map if there is no
155         *         data in cache for any of these keys
156         * @throws IOException
157         */
158        @Override
159        protected Map<K, ICacheElement<K, V>> processGetMatching( String pattern )
160            throws IOException
161        {
162            if ( this.lateralCacheAttribures.getPutOnlyMode() )
163            {
164                return Collections.emptyMap();
165            }
166            try
167            {
168                return lateralCacheService.getMatching( cacheName, pattern );
169            }
170            catch ( IOException e )
171            {
172                log.error( e );
173                handleException( e, "Failed to getMatching [" + pattern + "] from " + lateralCacheAttribures.getCacheName() + "@" + lateralCacheAttribures );
174                return Collections.emptyMap();
175            }
176        }
177    
178        /**
179         * Gets multiple items from the cache based on the given set of keys.
180         * <p>
181         * @param keys
182         * @return a map of K key to ICacheElement<K, V> element, or an empty map if there is no
183         *         data in cache for any of these keys
184         * @throws IOException
185         */
186        @Override
187        protected Map<K, ICacheElement<K, V>> processGetMultiple( Set<K> keys )
188            throws IOException
189        {
190            Map<K, ICacheElement<K, V>> elements = new HashMap<K, ICacheElement<K, V>>();
191    
192            if ( keys != null && !keys.isEmpty() )
193            {
194                for (K key : keys)
195                {
196                    ICacheElement<K, V> element = get( key );
197    
198                    if ( element != null )
199                    {
200                        elements.put( key, element );
201                    }
202                }
203            }
204    
205            return elements;
206        }
207    
208        /**
209         * Gets the set of keys of objects currently in the group.
210         * <p>
211         * @param group
212         * @return a Set of group keys.
213         * @throws IOException
214         */
215        public Set<K> getGroupKeys( String groupName )
216            throws IOException
217        {
218            try
219            {
220                return lateralCacheService.getGroupKeys( cacheName, groupName );
221            }
222            catch ( Exception ex )
223            {
224                handleException( ex, "Failed to remove groupName [" + groupName + "] from " + lateralCacheAttribures.getCacheName() + "@"
225                    + lateralCacheAttribures );
226            }
227            return Collections.emptySet();
228        }
229    
230        /**
231         * Gets the set of group names in the cache
232         * <p>
233         * @return a Set of group names.
234         * @throws IOException
235         */
236        public Set<String> getGroupNames()
237            throws IOException
238        {
239            try
240            {
241                return lateralCacheService.getGroupNames( cacheName );
242            }
243            catch ( Exception ex )
244            {
245                handleException( ex, "Failed to get group names from " + lateralCacheAttribures.getCacheName() + "@"
246                    + lateralCacheAttribures );
247            }
248            return Collections.emptySet();
249        }
250    
251        /**
252         * Synchronously remove from the remote cache; if failed, replace the remote handle with a
253         * zombie.
254         * <p>
255         * @param key
256         * @return false always
257         * @throws IOException
258         */
259        @Override
260        protected boolean processRemove( K key )
261            throws IOException
262        {
263            if ( log.isDebugEnabled() )
264            {
265                log.debug( "removing key:" + key );
266            }
267    
268            try
269            {
270                lateralCacheService.remove( cacheName, key, LateralCacheInfo.listenerId );
271            }
272            catch ( Exception ex )
273            {
274                handleException( ex, "Failed to remove " + key + " from " + lateralCacheAttribures.getCacheName() + "@" + lateralCacheAttribures );
275            }
276            return false;
277        }
278    
279        /**
280         * Synchronously removeAll from the remote cache; if failed, replace the remote handle with a
281         * zombie.
282         * <p>
283         * @throws IOException
284         */
285        @Override
286        protected void processRemoveAll()
287            throws IOException
288        {
289            try
290            {
291                lateralCacheService.removeAll( cacheName, LateralCacheInfo.listenerId );
292            }
293            catch ( Exception ex )
294            {
295                handleException( ex, "Failed to remove all from " + lateralCacheAttribures.getCacheName() + "@" + lateralCacheAttribures );
296            }
297        }
298    
299        /**
300         * Synchronously dispose the cache. Not sure we want this.
301         * <p>
302         * @throws IOException
303         */
304        @Override
305        protected void processDispose()
306            throws IOException
307        {
308            log.debug( "Disposing of lateral cache" );
309    
310            ///* HELP: This section did nothing but generate compilation warnings.
311            // TODO: may limit this functionality. It is dangerous.
312            // asmuts -- Added functionality to help with warnings. I'm not getting
313            // any.
314            try
315            {
316                lateralCacheService.dispose( this.lateralCacheAttribures.getCacheName() );
317                // Should remove connection
318            }
319            catch ( Exception ex )
320            {
321                log.error( "Couldn't dispose", ex );
322                handleException( ex, "Failed to dispose " + lateralCacheAttribures.getCacheName() );
323            }
324        }
325    
326        /**
327         * Returns the cache status.
328         * <p>
329         * @return The status value
330         */
331        public CacheStatus getStatus()
332        {
333            return this.lateralCacheService instanceof IZombie ? CacheStatus.ERROR : CacheStatus.ALIVE;
334        }
335    
336        /**
337         * Returns the current cache size.
338         * <p>
339         * @return The size value
340         */
341        public int getSize()
342        {
343            return 0;
344        }
345    
346        /**
347         * Gets the cacheType attribute of the LateralCache object
348         * <p>
349         * @return The cacheType value
350         */
351        public CacheType getCacheType()
352        {
353            return CacheType.LATERAL_CACHE;
354        }
355    
356        /**
357         * Gets the cacheName attribute of the LateralCache object
358         * <p>
359         * @return The cacheName value
360         */
361        public String getCacheName()
362        {
363            return cacheName;
364        }
365    
366        /**
367         * Not yet sure what to do here.
368         * <p>
369         * @param ex
370         * @param msg
371         * @throws IOException
372         */
373        private void handleException( Exception ex, String msg )
374            throws IOException
375        {
376            log.error( "Disabling lateral cache due to error " + msg, ex );
377    
378            lateralCacheService = new ZombieCacheServiceNonLocal<K, V>( lateralCacheAttribures.getZombieQueueMaxSize() );
379            // may want to flush if region specifies
380            // Notify the cache monitor about the error, and kick off the recovery
381            // process.
382            monitor.notifyError();
383    
384            // could stop the net search if it is built and try to reconnect?
385            if ( ex instanceof IOException )
386            {
387                throw (IOException) ex;
388            }
389            throw new IOException( ex.getMessage() );
390        }
391    
392        /**
393         * Replaces the current remote cache service handle with the given handle.
394         * <p>
395         * @param restoredLateral
396         */
397        public void fixCache( ICacheServiceNonLocal<K, V> restoredLateral )
398        {
399            if ( this.lateralCacheService != null && this.lateralCacheService instanceof ZombieCacheServiceNonLocal )
400            {
401                ZombieCacheServiceNonLocal<K, V> zombie = (ZombieCacheServiceNonLocal<K, V>) this.lateralCacheService;
402                this.lateralCacheService = restoredLateral;
403                try
404                {
405                    zombie.propagateEvents( restoredLateral );
406                }
407                catch ( Exception e )
408                {
409                    try
410                    {
411                        handleException( e, "Problem propagating events from Zombie Queue to new Lateral Service." );
412                    }
413                    catch ( IOException e1 )
414                    {
415                        // swallow, since this is just expected kick back.  Handle always throws
416                    }
417                }
418            }
419            else
420            {
421                this.lateralCacheService = restoredLateral;
422            }
423        }
424    
425        /**
426         * getStats
427         * <p>
428         * @return String
429         */
430        public String getStats()
431        {
432            return "";
433        }
434    
435        /**
436         * @return Returns the AuxiliaryCacheAttributes.
437         */
438        public AuxiliaryCacheAttributes getAuxiliaryCacheAttributes()
439        {
440            return lateralCacheAttribures;
441        }
442    
443        /**
444         * @return debugging data.
445         */
446        @Override
447        public String toString()
448        {
449            StringBuffer buf = new StringBuffer();
450            buf.append( "\n LateralCache " );
451            buf.append( "\n Cache Name [" + lateralCacheAttribures.getCacheName() + "]" );
452            buf.append( "\n cattr =  [" + lateralCacheAttribures + "]" );
453            return buf.toString();
454        }
455    
456        /**
457         * @return extra data.
458         */
459        @Override
460        public String getEventLoggingExtraInfo()
461        {
462            return null;
463        }
464    
465        /**
466         * The NoWait on top does not call out to here yet.
467         * <p>
468         * @return almost nothing
469         */
470        public IStats getStatistics()
471        {
472            IStats stats = new Stats();
473            stats.setTypeName( "LateralCache" );
474            return stats;
475        }
476    }