1 package org.apache.commons.jcs3.engine;
2
3 /*
4 * Licensed to the Apache Software Foundation (ASF) under one
5 * or more contributor license agreements. See the NOTICE file
6 * distributed with this work for additional information
7 * regarding copyright ownership. The ASF licenses this file
8 * to you under the Apache License, Version 2.0 (the
9 * "License"); you may not use this file except in compliance
10 * with the License. You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing,
15 * software distributed under the License is distributed on an
16 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17 * KIND, either express or implied. See the License for the
18 * specific language governing permissions and limitations
19 * under the License.
20 */
21
22 import java.io.IOException;
23 import java.util.Collections;
24 import java.util.Set;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ConcurrentMap;
27 import java.util.concurrent.CopyOnWriteArraySet;
28
29 import org.apache.commons.jcs3.engine.behavior.ICacheListener;
30 import org.apache.commons.jcs3.engine.behavior.ICacheObserver;
31 import org.apache.commons.jcs3.log.Log;
32 import org.apache.commons.jcs3.log.LogManager;
33
34 /**
35 * Intercepts the requests to the underlying ICacheObserver object so that the listeners can be
36 * recorded locally for remote connection recovery purposes. (Durable subscription like those in JMS
37 * is not implemented at this stage for it can be too expensive.)
38 */
39 public class CacheWatchRepairable
40 implements ICacheObserver
41 {
42 /** The logger */
43 private static final Log log = LogManager.getLog( CacheWatchRepairable.class );
44
45 /** the underlying ICacheObserver. */
46 private ICacheObserver cacheWatch;
47
48 /** Map of cache regions. */
49 private final ConcurrentMap<String, Set<ICacheListener<?, ?>>> cacheMap =
50 new ConcurrentHashMap<>();
51
52 /**
53 * Replaces the underlying cache watch service and re-attaches all existing listeners to the new
54 * cache watch.
55 * <p>
56 * @param cacheWatch The new cacheWatch value
57 */
58 public void setCacheWatch( final ICacheObserver cacheWatch )
59 {
60 this.cacheWatch = cacheWatch;
61 cacheMap.forEach((cacheName, value) -> value.forEach(listener -> {
62 try
63 {
64 log.info( "Adding listener to cache watch. ICacheListener = "
65 + "{0} | ICacheObserver = {1}", listener, cacheWatch );
66 cacheWatch.addCacheListener( cacheName, listener );
67 }
68 catch ( final IOException ex )
69 {
70 log.error( "Problem adding listener. ICacheListener = {0} | "
71 + "ICacheObserver = {1}", listener, cacheWatch, ex );
72 }
73 }));
74 }
75
76 /**
77 * Adds a feature to the CacheListener attribute of the CacheWatchRepairable object
78 * <p>
79 * @param cacheName The feature to be added to the CacheListener attribute
80 * @param obj The feature to be added to the CacheListener attribute
81 * @throws IOException
82 */
83 @Override
84 public <K, V> void addCacheListener( final String cacheName, final ICacheListener<K, V> obj )
85 throws IOException
86 {
87 // Record the added cache listener locally, regardless of whether the
88 // remote add-listener operation succeeds or fails.
89 cacheMap.computeIfAbsent(cacheName, key -> new CopyOnWriteArraySet<>(Collections.singletonList(obj)));
90
91 log.info( "Adding listener to cache watch. ICacheListener = {0} | "
92 + "ICacheObserver = {1} | cacheName = {2}", obj, cacheWatch,
93 cacheName );
94 cacheWatch.addCacheListener( cacheName, obj );
95 }
96
97 /**
98 * Adds a feature to the CacheListener attribute of the CacheWatchRepairable object
99 * <p>
100 * @param obj The feature to be added to the CacheListener attribute
101 * @throws IOException
102 */
103 @Override
104 public <K, V> void addCacheListener( final ICacheListener<K, V> obj )
105 throws IOException
106 {
107 // Record the added cache listener locally, regardless of whether the
108 // remote add-listener operation succeeds or fails.
109 cacheMap.values().forEach(set -> set.add(obj));
110
111 log.info( "Adding listener to cache watch. ICacheListener = {0} | "
112 + "ICacheObserver = {1}", obj, cacheWatch );
113 cacheWatch.addCacheListener( obj );
114 }
115
116 /**
117 * Tell the server to release us.
118 * <p>
119 * @param cacheName
120 * @param obj
121 * @throws IOException
122 */
123 @Override
124 public <K, V> void removeCacheListener( final String cacheName, final ICacheListener<K, V> obj )
125 throws IOException
126 {
127 log.info( "removeCacheListener, cacheName [{0}]", cacheName );
128 // Record the removal locally, regardless of whether the remote
129 // remove-listener operation succeeds or fails.
130 final Set<ICacheListener<?, ?>> listenerSet = cacheMap.get( cacheName );
131 if ( listenerSet != null )
132 {
133 listenerSet.remove( obj );
134 }
135 cacheWatch.removeCacheListener( cacheName, obj );
136 }
137
138 /**
139 * @param obj
140 * @throws IOException
141 */
142 @Override
143 public <K, V> void removeCacheListener( final ICacheListener<K, V> obj )
144 throws IOException
145 {
146 log.info( "removeCacheListener, ICacheListener [{0}]", obj );
147
148 // Record the removal locally, regardless of whether the remote
149 // remove-listener operation succeeds or fails.
150 cacheMap.values().forEach(set -> {
151 log.debug("Before removing [{0}] the listenerSet = {1}", obj, set);
152 set.remove( obj );
153 });
154 cacheWatch.removeCacheListener( obj );
155 }
156 }