View Javadoc
1   package org.apache.commons.jcs3.auxiliary.lateral.socket.tcp;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.BufferedReader;
23  import java.io.IOException;
24  import java.io.InputStreamReader;
25  import java.nio.charset.StandardCharsets;
26  import java.util.Collections;
27  import java.util.HashMap;
28  import java.util.Map;
29  import java.util.Set;
30  
31  import org.apache.commons.jcs3.auxiliary.lateral.LateralCommand;
32  import org.apache.commons.jcs3.auxiliary.lateral.LateralElementDescriptor;
33  import org.apache.commons.jcs3.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
34  import org.apache.commons.jcs3.engine.CacheElement;
35  import org.apache.commons.jcs3.engine.CacheInfo;
36  import org.apache.commons.jcs3.engine.behavior.ICacheElement;
37  import org.apache.commons.jcs3.engine.behavior.ICacheServiceNonLocal;
38  import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
39  import org.apache.commons.jcs3.log.Log;
40  import org.apache.commons.jcs3.log.LogManager;
41  import org.apache.commons.jcs3.utils.serialization.StandardSerializer;
42  
43  /**
44   * A lateral cache service implementation. Does not implement getGroupKey
45   * TODO: Remove generics
46   */
47  public class LateralTCPService<K, V>
48      implements ICacheServiceNonLocal<K, V>
49  {
50      /** The logger. */
51      private static final Log log = LogManager.getLog( LateralTCPService.class );
52  
53      /** special configuration */
54      private final boolean allowPut;
55      private final boolean allowGet;
56      private final boolean issueRemoveOnPut;
57  
58      /** Sends to another lateral. */
59      private final LateralTCPSender sender;
60  
61      /** use the vmid by default */
62      private long listenerId = CacheInfo.listenerId;
63  
64      /**
65       * Constructor for the LateralTCPService object
66       * <p>
67       * @param lca ITCPLateralCacheAttributes the configuration object
68       * @throws IOException
69       *
70       * @deprecated Specify serializer
71       */
72      @Deprecated
73      public LateralTCPService( final ITCPLateralCacheAttributes lca )
74          throws IOException
75      {
76          this(lca, new StandardSerializer());
77      }
78  
79      /**
80       * Constructor for the LateralTCPService object
81       * <p>
82       * @param lca ITCPLateralCacheAttributes the configuration object
83       * @param serializer the serializer to use when sending
84       * @throws IOException
85       * @since 3.1
86       */
87      public LateralTCPService( final ITCPLateralCacheAttributes lca, final IElementSerializer serializer )
88          throws IOException
89      {
90          this.allowGet = lca.isAllowGet();
91          this.allowPut = lca.isAllowPut();
92          this.issueRemoveOnPut = lca.isIssueRemoveOnPut();
93  
94          try
95          {
96              sender = new LateralTCPSender( lca, serializer );
97  
98              log.debug( "Created sender to [{0}]", lca::getTcpServer);
99          }
100         catch ( final IOException e )
101         {
102             // log.error( "Could not create sender", e );
103             // This gets thrown over and over in recovery mode.
104             // The stack trace isn't useful here.
105             log.error( "Could not create sender to [{0}] -- {1}", lca::getTcpServer, e::getMessage);
106             throw e;
107         }
108     }
109 
110     /**
111      * @param item
112      * @throws IOException
113      */
114     @Override
115     public void update( final ICacheElement<K, V> item )
116         throws IOException
117     {
118         update( item, getListenerId() );
119     }
120 
121     /**
122      * If put is allowed, we will issue a put. If issue put on remove is configured, we will issue a
123      * remove. Either way, we create a lateral element descriptor, which is essentially a JCS TCP
124      * packet. It describes what operation the receiver should take when it gets the packet.
125      * <p>
126      * @see org.apache.commons.jcs3.engine.behavior.ICacheServiceNonLocal#update(org.apache.commons.jcs3.engine.behavior.ICacheElement,
127      *      long)
128      */
129     @Override
130     public void update( final ICacheElement<K, V> item, final long requesterId )
131         throws IOException
132     {
133         // if we don't allow put, see if we should remove on put
134         if ( !this.allowPut &&
135             // if we can't remove on put, and we can't put then return
136             !this.issueRemoveOnPut )
137         {
138             return;
139         }
140 
141         // if we shouldn't remove on put, then put
142         if ( !this.issueRemoveOnPut )
143         {
144             final LateralElementDescriptor<K, V> led =
145                     new LateralElementDescriptor<>(item, LateralCommand.UPDATE, requesterId);
146             sender.send( led );
147         }
148         // else issue a remove with the hash code for remove check on
149         // on the other end, this will be a server config option
150         else
151         {
152             log.debug( "Issuing a remove for a put" );
153 
154             // set the value to null so we don't send the item
155             final CacheElement<K, V> ce = new CacheElement<>( item.getCacheName(), item.getKey(), null );
156             final LateralElementDescriptor<K, V> led =
157                     new LateralElementDescriptor<>(ce, LateralCommand.REMOVE, requesterId);
158             led.valHashCode = item.getVal().hashCode();
159             sender.send( led );
160         }
161     }
162 
163     /**
164      * Uses the default listener id and calls the next remove method.
165      * <p>
166      * @see org.apache.commons.jcs3.engine.behavior.ICacheService#remove(String, Object)
167      */
168     @Override
169     public void remove( final String cacheName, final K key )
170         throws IOException
171     {
172         remove( cacheName, key, getListenerId() );
173     }
174 
175     /**
176      * Wraps the key in a LateralElementDescriptor.
177      * <p>
178      * @see org.apache.commons.jcs3.engine.behavior.ICacheServiceNonLocal#remove(String, Object, long)
179      */
180     @Override
181     public void remove( final String cacheName, final K key, final long requesterId )
182         throws IOException
183     {
184         final CacheElement<K, V> ce = new CacheElement<>( cacheName, key, null );
185         final LateralElementDescriptor<K, V> led =
186                 new LateralElementDescriptor<>(ce, LateralCommand.REMOVE, requesterId);
187         sender.send( led );
188     }
189 
190     /**
191      * Does nothing.
192      * <p>
193      * @throws IOException
194      */
195     @Override
196     public void release()
197         throws IOException
198     {
199         // nothing needs to be done
200     }
201 
202     /**
203      * Will close the connection.
204      * <p>
205      * @param cacheName
206      * @throws IOException
207      */
208     @Override
209     public void dispose( final String cacheName )
210         throws IOException
211     {
212         sender.dispose();
213     }
214 
215     /**
216      * @param cacheName
217      * @param key
218      * @return ICacheElement&lt;K, V&gt; if found.
219      * @throws IOException
220      */
221     @Override
222     public ICacheElement<K, V> get( final String cacheName, final K key )
223         throws IOException
224     {
225         return get( cacheName, key, getListenerId() );
226     }
227 
228     /**
229      * If get is allowed, we will issues a get request.
230      * <p>
231      * @param cacheName
232      * @param key
233      * @param requesterId
234      * @return ICacheElement&lt;K, V&gt; if found.
235      * @throws IOException
236      */
237     @Override
238     public ICacheElement<K, V> get( final String cacheName, final K key, final long requesterId )
239         throws IOException
240     {
241         // if get is not allowed return
242         if ( this.allowGet )
243         {
244             final CacheElement<K, V> ce = new CacheElement<>( cacheName, key, null );
245             final LateralElementDescriptor<K, V> led =
246                     new LateralElementDescriptor<>(ce, LateralCommand.GET);
247             // led.requesterId = requesterId; // later
248             @SuppressWarnings("unchecked") // Need to cast from Object
249             final
250             ICacheElement<K, V> response = (ICacheElement<K, V>)sender.sendAndReceive( led );
251             return response;
252         }
253         // nothing needs to be done
254         return null;
255     }
256 
257     /**
258      * If allow get is true, we will issue a getmatching query.
259      * <p>
260      * @param cacheName
261      * @param pattern
262      * @return a map of K key to ICacheElement&lt;K, V&gt; element, or an empty map if there is no
263      *         data in cache matching the pattern.
264      * @throws IOException
265      */
266     @Override
267     public Map<K, ICacheElement<K, V>> getMatching( final String cacheName, final String pattern )
268         throws IOException
269     {
270         return getMatching( cacheName, pattern, getListenerId() );
271     }
272 
273     /**
274      * If allow get is true, we will issue a getmatching query.
275      * <p>
276      * @param cacheName
277      * @param pattern
278      * @param requesterId - our identity
279      * @return a map of K key to ICacheElement&lt;K, V&gt; element, or an empty map if there is no
280      *         data in cache matching the pattern.
281      * @throws IOException
282      */
283     @Override
284     @SuppressWarnings("unchecked") // Need to cast from Object
285     public Map<K, ICacheElement<K, V>> getMatching( final String cacheName, final String pattern, final long requesterId )
286         throws IOException
287     {
288         // if get is not allowed return
289         if ( !this.allowGet ) {
290             // nothing needs to be done
291             return null;
292         }
293         final CacheElement<String, String> ce = new CacheElement<>( cacheName, pattern, null );
294         final LateralElementDescriptor<String, String> led =
295                 new LateralElementDescriptor<>(ce, LateralCommand.GET_MATCHING);
296         // led.requesterId = requesterId; // later
297 
298         final Object response = sender.sendAndReceive( led );
299         if ( response != null )
300         {
301             return (Map<K, ICacheElement<K, V>>) response;
302         }
303         return Collections.emptyMap();
304     }
305 
306     /**
307      * Gets multiple items from the cache based on the given set of keys.
308      * <p>
309      * @param cacheName
310      * @param keys
311      * @return a map of K key to ICacheElement&lt;K, V&gt; element, or an empty map if there is no
312      *         data in cache for any of these keys
313      * @throws IOException
314      */
315     @Override
316     public Map<K, ICacheElement<K, V>> getMultiple( final String cacheName, final Set<K> keys )
317         throws IOException
318     {
319         return getMultiple( cacheName, keys, getListenerId() );
320     }
321 
322     /**
323      * This issues a separate get for each item.
324      * <p>
325      * TODO We should change this. It should issue one request.
326      * <p>
327      * @param cacheName
328      * @param keys
329      * @param requesterId
330      * @return a map of K key to ICacheElement&lt;K, V&gt; element, or an empty map if there is no
331      *         data in cache for any of these keys
332      * @throws IOException
333      */
334     @Override
335     public Map<K, ICacheElement<K, V>> getMultiple( final String cacheName, final Set<K> keys, final long requesterId )
336         throws IOException
337     {
338         final Map<K, ICacheElement<K, V>> elements = new HashMap<>();
339 
340         if ( keys != null && !keys.isEmpty() )
341         {
342             for (final K key : keys)
343             {
344                 final ICacheElement<K, V> element = get( cacheName, key, requesterId );
345 
346                 if ( element != null )
347                 {
348                     elements.put( key, element );
349                 }
350             }
351         }
352         return elements;
353     }
354 
355     /**
356      * Return the keys in this cache.
357      * <p>
358      * @param cacheName the name of the cache region
359      * @see org.apache.commons.jcs3.auxiliary.AuxiliaryCache#getKeySet()
360      */
361     @Override
362     @SuppressWarnings("unchecked") // Need cast from Object
363     public Set<K> getKeySet(final String cacheName) throws IOException
364     {
365         final CacheElement<String, String> ce = new CacheElement<>(cacheName, null, null);
366         final LateralElementDescriptor<String, String> led =
367                 new LateralElementDescriptor<>(ce, LateralCommand.GET_KEYSET);
368         // led.requesterId = requesterId; // later
369         final Object response = sender.sendAndReceive(led);
370         if (response != null)
371         {
372             return (Set<K>) response;
373         }
374 
375         return null;
376     }
377 
378     /**
379      * @param cacheName
380      * @throws IOException
381      */
382     @Override
383     public void removeAll( final String cacheName )
384         throws IOException
385     {
386         removeAll( cacheName, getListenerId() );
387     }
388 
389     /**
390      * @param cacheName
391      * @param requesterId
392      * @throws IOException
393      */
394     @Override
395     public void removeAll( final String cacheName, final long requesterId )
396         throws IOException
397     {
398         final CacheElement<String, String> ce = new CacheElement<>( cacheName, "ALL", null );
399         final LateralElementDescriptor<String, String> led =
400                 new LateralElementDescriptor<>(ce, LateralCommand.REMOVEALL, requesterId);
401         sender.send( led );
402     }
403 
404     /**
405      * Test
406      * @param args
407      *
408      * @deprecated Use unit tests
409      */
410     @Deprecated
411     public static void main( final String args[] )
412     {
413         try
414         {
415             final LateralTCPSender sender = new LateralTCPSender( new TCPLateralCacheAttributes() );
416 
417             // process user input till done
418             boolean notDone = true;
419             String message = null;
420             // wait to dispose
421             final BufferedReader br = new BufferedReader(new InputStreamReader(System.in, StandardCharsets.UTF_8));
422 
423             while ( notDone )
424             {
425                 System.out.println( "enter message:" );
426                 message = br.readLine();
427 
428                 if (message == null)
429                 {
430                     notDone = false;
431                     continue;
432                 }
433 
434                 final CacheElement<String, String> ce = new CacheElement<>( "test", "test", message );
435                 final LateralElementDescriptor<String, String> led = new LateralElementDescriptor<>( ce );
436                 sender.send( led );
437             }
438         }
439         catch ( final IOException e )
440         {
441             System.out.println( e.toString() );
442         }
443     }
444 
445     /**
446      * @param listernId The listernId to set.
447      */
448     protected void setListenerId( final long listernId )
449     {
450         this.listenerId = listernId;
451     }
452 
453     /**
454      * @return Returns the listernId.
455      */
456     protected long getListenerId()
457     {
458         return listenerId;
459     }
460 }