View Javadoc
1   package org.apache.commons.jcs.auxiliary.lateral.socket.tcp;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.BufferedReader;
23  import java.io.IOException;
24  import java.io.InputStreamReader;
25  import java.util.Collections;
26  import java.util.HashMap;
27  import java.util.Map;
28  import java.util.Set;
29  
30  import org.apache.commons.jcs.auxiliary.lateral.LateralCommand;
31  import org.apache.commons.jcs.auxiliary.lateral.LateralElementDescriptor;
32  import org.apache.commons.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
33  import org.apache.commons.jcs.engine.CacheElement;
34  import org.apache.commons.jcs.engine.CacheInfo;
35  import org.apache.commons.jcs.engine.behavior.ICacheElement;
36  import org.apache.commons.jcs.engine.behavior.ICacheServiceNonLocal;
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  
40  /**
41   * A lateral cache service implementation. Does not implement getGroupKey
42   * TODO: Remove generics
43   */
44  public class LateralTCPService<K, V>
45      implements ICacheServiceNonLocal<K, V>
46  {
47      /** The logger. */
48      private static final Log log = LogFactory.getLog( LateralTCPService.class );
49  
50      /** special configuration */
51      private boolean allowPut;
52      private boolean allowGet;
53      private boolean issueRemoveOnPut;
54  
55      /** Sends to another lateral. */
56      private LateralTCPSender sender;
57  
58      /** use the vmid by default */
59      private long listenerId = CacheInfo.listenerId;
60  
61      /**
62       * Constructor for the LateralTCPService object
63       * <p>
64       * @param lca ITCPLateralCacheAttributes
65       * @throws IOException
66       */
67      public LateralTCPService( ITCPLateralCacheAttributes lca )
68          throws IOException
69      {
70          this.allowGet = lca.isAllowGet();
71          this.allowPut = lca.isAllowPut();
72          this.issueRemoveOnPut = lca.isIssueRemoveOnPut();
73  
74          try
75          {
76              sender = new LateralTCPSender( lca );
77  
78              if ( log.isInfoEnabled() )
79              {
80                  log.debug( "Created sender to [" + lca.getTcpServer() + "]" );
81              }
82          }
83          catch ( IOException e )
84          {
85              // log.error( "Could not create sender", e );
86              // This gets thrown over and over in recovery mode.
87              // The stack trace isn't useful here.
88              log.error( "Could not create sender to [" + lca.getTcpServer() + "] -- " + e.getMessage() );
89  
90              throw e;
91          }
92      }
93  
94      /**
95       * @param item
96       * @throws IOException
97       */
98      @Override
99      public void update( ICacheElement<K, V> item )
100         throws IOException
101     {
102         update( item, getListenerId() );
103     }
104 
105     /**
106      * If put is allowed, we will issue a put. If issue put on remove is configured, we will issue a
107      * remove. Either way, we create a lateral element descriptor, which is essentially a JCS TCP
108      * packet. It describes what operation the receiver should take when it gets the packet.
109      * <p>
110      * @see org.apache.commons.jcs.engine.behavior.ICacheServiceNonLocal#update(org.apache.commons.jcs.engine.behavior.ICacheElement,
111      *      long)
112      */
113     @Override
114     public void update( ICacheElement<K, V> item, long requesterId )
115         throws IOException
116     {
117         // if we don't allow put, see if we should remove on put
118         if ( !this.allowPut &&
119             // if we can't remove on put, and we can't put then return
120             !this.issueRemoveOnPut )
121         {
122             return;
123         }
124 
125         // if we shouldn't remove on put, then put
126         if ( !this.issueRemoveOnPut )
127         {
128             LateralElementDescriptor<K, V> led = new LateralElementDescriptor<K, V>( item );
129             led.requesterId = requesterId;
130             led.command = LateralCommand.UPDATE;
131             sender.send( led );
132         }
133         // else issue a remove with the hashcode for remove check on
134         // on the other end, this will be a server config option
135         else
136         {
137             if ( log.isDebugEnabled() )
138             {
139                 log.debug( "Issuing a remove for a put" );
140             }
141             // set the value to null so we don't send the item
142             CacheElement<K, V> ce = new CacheElement<K, V>( item.getCacheName(), item.getKey(), null );
143             LateralElementDescriptor<K, V> led = new LateralElementDescriptor<K, V>( ce );
144             led.requesterId = requesterId;
145             led.command = LateralCommand.REMOVE;
146             led.valHashCode = item.getVal().hashCode();
147             sender.send( led );
148         }
149     }
150 
151     /**
152      * Uses the default listener id and calls the next remove method.
153      * <p>
154      * @see org.apache.commons.jcs.engine.behavior.ICacheService#remove(String, Object)
155      */
156     @Override
157     public void remove( String cacheName, K key )
158         throws IOException
159     {
160         remove( cacheName, key, getListenerId() );
161     }
162 
163     /**
164      * Wraps the key in a LateralElementDescriptor.
165      * <p>
166      * @see org.apache.commons.jcs.engine.behavior.ICacheServiceNonLocal#remove(String, Object, long)
167      */
168     @Override
169     public void remove( String cacheName, K key, long requesterId )
170         throws IOException
171     {
172         CacheElement<K, V> ce = new CacheElement<K, V>( cacheName, key, null );
173         LateralElementDescriptor<K, V> led = new LateralElementDescriptor<K, V>( ce );
174         led.requesterId = requesterId;
175         led.command = LateralCommand.REMOVE;
176         sender.send( led );
177     }
178 
179     /**
180      * Does nothing.
181      * <p>
182      * @throws IOException
183      */
184     @Override
185     public void release()
186         throws IOException
187     {
188         // nothing needs to be done
189     }
190 
191     /**
192      * Will close the connection.
193      * <p>
194      * @param cacheName
195      * @throws IOException
196      */
197     @Override
198     public void dispose( String cacheName )
199         throws IOException
200     {
201         sender.dispose();
202     }
203 
204     /**
205      * @param cacheName
206      * @param key
207      * @return ICacheElement&lt;K, V&gt; if found.
208      * @throws IOException
209      */
210     @Override
211     public ICacheElement<K, V> get( String cacheName, K key )
212         throws IOException
213     {
214         return get( cacheName, key, getListenerId() );
215     }
216 
217     /**
218      * If get is allowed, we will issues a get request.
219      * <p>
220      * @param cacheName
221      * @param key
222      * @param requesterId
223      * @return ICacheElement&lt;K, V&gt; if found.
224      * @throws IOException
225      */
226     @Override
227     public ICacheElement<K, V> get( String cacheName, K key, long requesterId )
228         throws IOException
229     {
230         // if get is not allowed return
231         if ( this.allowGet )
232         {
233             CacheElement<K, V> ce = new CacheElement<K, V>( cacheName, key, null );
234             LateralElementDescriptor<K, V> led = new LateralElementDescriptor<K, V>( ce );
235             // led.requesterId = requesterId; // later
236             led.command = LateralCommand.GET;
237             @SuppressWarnings("unchecked") // Need to cast from Object
238             ICacheElement<K, V> response = (ICacheElement<K, V>)sender.sendAndReceive( led );
239             if ( response != null )
240             {
241                 return response;
242             }
243             return null;
244         }
245         else
246         {
247             // nothing needs to be done
248             return null;
249         }
250     }
251 
252     /**
253      * If allow get is true, we will issue a getmatching query.
254      * <p>
255      * @param cacheName
256      * @param pattern
257      * @return a map of K key to ICacheElement&lt;K, V&gt; element, or an empty map if there is no
258      *         data in cache matching the pattern.
259      * @throws IOException
260      */
261     @Override
262     public Map<K, ICacheElement<K, V>> getMatching( String cacheName, String pattern )
263         throws IOException
264     {
265         return getMatching( cacheName, pattern, getListenerId() );
266     }
267 
268     /**
269      * If allow get is true, we will issue a getmatching query.
270      * <p>
271      * @param cacheName
272      * @param pattern
273      * @param requesterId - our identity
274      * @return a map of K key to ICacheElement&lt;K, V&gt; element, or an empty map if there is no
275      *         data in cache matching the pattern.
276      * @throws IOException
277      */
278     @Override
279     @SuppressWarnings("unchecked") // Need to cast from Object
280     public Map<K, ICacheElement<K, V>> getMatching( String cacheName, String pattern, long requesterId )
281         throws IOException
282     {
283         // if get is not allowed return
284         if ( this.allowGet )
285         {
286             CacheElement<String, String> ce = new CacheElement<String, String>( cacheName, pattern, null );
287             LateralElementDescriptor<String, String> led = new LateralElementDescriptor<String, String>( ce );
288             // led.requesterId = requesterId; // later
289             led.command = LateralCommand.GET_MATCHING;
290 
291             Object response = sender.sendAndReceive( led );
292             if ( response != null )
293             {
294                 return (Map<K, ICacheElement<K, V>>) response;
295             }
296             return Collections.emptyMap();
297         }
298         else
299         {
300             // nothing needs to be done
301             return null;
302         }
303     }
304 
305     /**
306      * Gets multiple items from the cache based on the given set of keys.
307      * <p>
308      * @param cacheName
309      * @param keys
310      * @return a map of K key to ICacheElement&lt;K, V&gt; element, or an empty map if there is no
311      *         data in cache for any of these keys
312      * @throws IOException
313      */
314     @Override
315     public Map<K, ICacheElement<K, V>> getMultiple( String cacheName, Set<K> keys )
316         throws IOException
317     {
318         return getMultiple( cacheName, keys, getListenerId() );
319     }
320 
321     /**
322      * This issues a separate get for each item.
323      * <p>
324      * TODO We should change this. It should issue one request.
325      * <p>
326      * @param cacheName
327      * @param keys
328      * @param requesterId
329      * @return a map of K key to ICacheElement&lt;K, V&gt; element, or an empty map if there is no
330      *         data in cache for any of these keys
331      * @throws IOException
332      */
333     @Override
334     public Map<K, ICacheElement<K, V>> getMultiple( String cacheName, Set<K> keys, long requesterId )
335         throws IOException
336     {
337         Map<K, ICacheElement<K, V>> elements = new HashMap<K, ICacheElement<K, V>>();
338 
339         if ( keys != null && !keys.isEmpty() )
340         {
341             for (K key : keys)
342             {
343                 ICacheElement<K, V> element = get( cacheName, key );
344 
345                 if ( element != null )
346                 {
347                     elements.put( key, element );
348                 }
349             }
350         }
351         return elements;
352     }
353 
354     /**
355      * Return the keys in this cache.
356      * <p>
357      * @param cacheName the name of the cache region
358      * @see org.apache.commons.jcs.auxiliary.AuxiliaryCache#getKeySet()
359      */
360     @Override
361     @SuppressWarnings("unchecked") // Need cast from Object
362     public Set<K> getKeySet(String cacheName) throws IOException
363     {
364         CacheElement<String, String> ce = new CacheElement<String, String>(cacheName, null, null);
365         LateralElementDescriptor<String, String> led = new LateralElementDescriptor<String, String>(ce);
366         // led.requesterId = requesterId; // later
367         led.command = LateralCommand.GET_KEYSET;
368         Object response = sender.sendAndReceive(led);
369         if (response != null)
370         {
371             return (Set<K>) response;
372         }
373 
374         return null;
375     }
376 
377     /**
378      * @param cacheName
379      * @throws IOException
380      */
381     @Override
382     public void removeAll( String cacheName )
383         throws IOException
384     {
385         removeAll( cacheName, getListenerId() );
386     }
387 
388     /**
389      * @param cacheName
390      * @param requesterId
391      * @throws IOException
392      */
393     @Override
394     public void removeAll( String cacheName, long requesterId )
395         throws IOException
396     {
397         CacheElement<String, String> ce = new CacheElement<String, String>( cacheName, "ALL", null );
398         LateralElementDescriptor<String, String> led = new LateralElementDescriptor<String, String>( ce );
399         led.requesterId = requesterId;
400         led.command = LateralCommand.REMOVEALL;
401         sender.send( led );
402     }
403 
404     /**
405      * @param args
406      */
407     public static void main( String args[] )
408     {
409         try
410         {
411             LateralTCPSender sender = new LateralTCPSender( new TCPLateralCacheAttributes() );
412 
413             // process user input till done
414             boolean notDone = true;
415             String message = null;
416             // wait to dispose
417             BufferedReader br = new BufferedReader( new InputStreamReader( System.in, "UTF-8" ) );
418 
419             while ( notDone )
420             {
421                 System.out.println( "enter message:" );
422                 message = br.readLine();
423 
424                 if (message == null)
425                 {
426                     notDone = false;
427                     continue;
428                 }
429 
430                 CacheElement<String, String> ce = new CacheElement<String, String>( "test", "test", message );
431                 LateralElementDescriptor<String, String> led = new LateralElementDescriptor<String, String>( ce );
432                 sender.send( led );
433             }
434         }
435         catch ( IOException e )
436         {
437             System.out.println( e.toString() );
438         }
439     }
440 
441     /**
442      * @param listernId The listernId to set.
443      */
444     protected void setListenerId( long listernId )
445     {
446         this.listenerId = listernId;
447     }
448 
449     /**
450      * @return Returns the listernId.
451      */
452     protected long getListenerId()
453     {
454         return listenerId;
455     }
456 }