View Javadoc

1   package org.apache.jcs.auxiliary.lateral.socket.tcp;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.BufferedReader;
23  import java.io.IOException;
24  import java.io.InputStreamReader;
25  import java.io.Serializable;
26  import java.util.Collections;
27  import java.util.HashMap;
28  import java.util.Map;
29  import java.util.Set;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.jcs.auxiliary.lateral.LateralCacheInfo;
34  import org.apache.jcs.auxiliary.lateral.LateralCommand;
35  import org.apache.jcs.auxiliary.lateral.LateralElementDescriptor;
36  import org.apache.jcs.auxiliary.lateral.behavior.ILateralCacheObserver;
37  import org.apache.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
38  import org.apache.jcs.engine.CacheElement;
39  import org.apache.jcs.engine.behavior.ICacheElement;
40  import org.apache.jcs.engine.behavior.ICacheListener;
41  import org.apache.jcs.engine.behavior.ICacheServiceNonLocal;
42  
43  /**
44   * A lateral cache service implementation. Does not implement getGroupKey
45   */
46  public class LateralTCPService<K extends Serializable, V extends Serializable>
47      implements ICacheServiceNonLocal<K, V>, ILateralCacheObserver
48  {
49      /** The logger. */
50      private final static Log log = LogFactory.getLog( LateralTCPService.class );
51  
52      /** special configuration */
53      private ITCPLateralCacheAttributes tcpLateralCacheAttributes;
54  
55      /** Sends to another lateral. */
56      private LateralTCPSender sender;
57  
58      /** use the vmid by default */
59      private long listenerId = LateralCacheInfo.listenerId;
60  
61      /**
62       * Constructor for the LateralTCPService object
63       * <p>
64       * @param lca ITCPLateralCacheAttributes
65       * @exception IOException
66       */
67      public LateralTCPService( ITCPLateralCacheAttributes lca )
68          throws IOException
69      {
70          this.setTcpLateralCacheAttributes( lca );
71          try
72          {
73              log.debug( "creating sender, attributes = " + getTcpLateralCacheAttributes() );
74  
75              sender = new LateralTCPSender( lca );
76  
77              if ( log.isInfoEnabled() )
78              {
79                  log.debug( "Created sender to [" + lca.getTcpServer() + "]" );
80              }
81          }
82          catch ( IOException e )
83          {
84              // log.error( "Could not create sender", e );
85              // This gets thrown over and over in recovery mode.
86              // The stack trace isn't useful here.
87              log.error( "Could not create sender to [" + lca.getTcpServer() + "] -- " + e.getMessage() );
88  
89              throw e;
90          }
91      }
92  
93      /**
94       * @param item
95       * @throws IOException
96       */
97      public void update( ICacheElement<K, V> item )
98          throws IOException
99      {
100         update( item, getListenerId() );
101     }
102 
103     /**
104      * If put is allowed, we will issue a put. If issue put on remove is configured, we will issue a
105      * remove. Either way, we create a lateral element descriptor, which is essentially a JCS TCP
106      * packet. It describes what operation the receiver should take when it gets the packet.
107      * <p>
108      * @see org.apache.jcs.auxiliary.lateral.behavior.ICacheServiceNonLocal#update(org.apache.jcs.engine.behavior.ICacheElement,
109      *      long)
110      */
111     public void update( ICacheElement<K, V> item, long requesterId )
112         throws IOException
113     {
114         // if we don't allow put, see if we should remove on put
115         if ( !this.getTcpLateralCacheAttributes().isAllowPut() )
116         {
117             // if we can't remove on put, and we can't put then return
118             if ( !this.getTcpLateralCacheAttributes().isIssueRemoveOnPut() )
119             {
120                 return;
121             }
122         }
123 
124         // if we shouldn't remove on put, then put
125         if ( !this.getTcpLateralCacheAttributes().isIssueRemoveOnPut() )
126         {
127             LateralElementDescriptor<K, V> led = new LateralElementDescriptor<K, V>( item );
128             led.requesterId = requesterId;
129             led.command = LateralCommand.UPDATE;
130             sender.send( led );
131         }
132         // else issue a remove with the hashcode for remove check on
133         // on the other end, this will be a server config option
134         else
135         {
136             if ( log.isDebugEnabled() )
137             {
138                 log.debug( "Issuing a remove for a put" );
139             }
140             // set the value to null so we don't send the item
141             CacheElement<K, V> ce = new CacheElement<K, V>( item.getCacheName(), item.getKey(), null );
142             LateralElementDescriptor<K, V> led = new LateralElementDescriptor<K, V>( ce );
143             led.requesterId = requesterId;
144             led.command = LateralCommand.REMOVE;
145             led.valHashCode = item.getVal().hashCode();
146             sender.send( led );
147         }
148     }
149 
150     /**
151      * Uses the default listener id and calls the next remove method.
152      * <p>
153      * @see org.apache.jcs.engine.behavior.ICacheService#remove(java.lang.String,
154      *      java.io.Serializable)
155      */
156     public void remove( String cacheName, K key )
157         throws IOException
158     {
159         remove( cacheName, key, getListenerId() );
160     }
161 
162     /**
163      * Wraps the key in a LateralElementDescriptor.
164      * <p>
165      * @see org.apache.jcs.auxiliary.lateral.behavior.ICacheServiceNonLocal#remove(java.lang.String,
166      *      java.io.Serializable, long)
167      */
168     public void remove( String cacheName, K key, long requesterId )
169         throws IOException
170     {
171         CacheElement<K, V> ce = new CacheElement<K, V>( cacheName, key, null );
172         LateralElementDescriptor<K, V> led = new LateralElementDescriptor<K, V>( ce );
173         led.requesterId = requesterId;
174         led.command = LateralCommand.REMOVE;
175         sender.send( led );
176     }
177 
178     /**
179      * Does nothing.
180      * <p>
181      * @throws IOException
182      */
183     public void release()
184         throws IOException
185     {
186         // nothing needs to be done
187     }
188 
189     /**
190      * Will close the connection.
191      * <p>
192      * @param cacheName
193      * @throws IOException
194      */
195     public void dispose( String cacheName )
196         throws IOException
197     {
198         sender.dispose( cacheName );
199     }
200 
201     /**
202      * The service does not get via this method, so this return null.
203      * <p>
204      * @param key
205      * @return always null.
206      * @throws IOException
207      */
208     public Serializable get( String key )
209         throws IOException
210     {
211         if ( log.isDebugEnabled() )
212         {
213             log.debug( "balking at get for key [" + key + "]" );
214         }
215         // p( "junk get" );
216         // return get( cattr.cacheName, key, true );
217         return null;
218         // nothing needs to be done
219     }
220 
221     /**
222      * @param cacheName
223      * @param key
224      * @return ICacheElement<K, V> if found.
225      * @throws IOException
226      */
227     public ICacheElement<K, V> get( String cacheName, K key )
228         throws IOException
229     {
230         return get( cacheName, key, getListenerId() );
231     }
232 
233     /**
234      * If get is allowed, we will issues a get request.
235      * <p>
236      * @param cacheName
237      * @param key
238      * @param requesterId
239      * @return ICacheElement<K, V> if found.
240      * @throws IOException
241      */
242     public ICacheElement<K, V> get( String cacheName, K key, long requesterId )
243         throws IOException
244     {
245         // if get is not allowed return
246         if ( this.getTcpLateralCacheAttributes().isAllowGet() )
247         {
248             CacheElement<K, V> ce = new CacheElement<K, V>( cacheName, key, null );
249             LateralElementDescriptor<K, V> led = new LateralElementDescriptor<K, V>( ce );
250             // led.requesterId = requesterId; // later
251             led.command = LateralCommand.GET;
252             @SuppressWarnings("unchecked") // Need to cast from Object
253             ICacheElement<K, V> response = (ICacheElement<K, V>)sender.sendAndReceive( led );
254             if ( response != null )
255             {
256                 return response;
257             }
258             return null;
259         }
260         else
261         {
262             // nothing needs to be done
263             return null;
264         }
265     }
266 
267     /**
268      * If allow get is true, we will issue a getmatching query.
269      * <p>
270      * @param cacheName
271      * @param pattern
272      * @return a map of K key to ICacheElement<K, V> element, or an empty map if there is no
273      *         data in cache matching the pattern.
274      * @throws IOException
275      */
276     public Map<K, ICacheElement<K, V>> getMatching( String cacheName, String pattern )
277         throws IOException
278     {
279         return getMatching( cacheName, pattern, getListenerId() );
280     }
281 
282     /**
283      * If allow get is true, we will issue a getmatching query.
284      * <p>
285      * @param cacheName
286      * @param pattern
287      * @param requesterId - our identity
288      * @return a map of K key to ICacheElement<K, V> element, or an empty map if there is no
289      *         data in cache matching the pattern.
290      * @throws IOException
291      */
292     @SuppressWarnings("unchecked") // Need to cast from Object
293     public Map<K, ICacheElement<K, V>> getMatching( String cacheName, String pattern, long requesterId )
294         throws IOException
295     {
296         // if get is not allowed return
297         if ( this.getTcpLateralCacheAttributes().isAllowGet() )
298         {
299             CacheElement<String, String> ce = new CacheElement<String, String>( cacheName, pattern, null );
300             LateralElementDescriptor<String, String> led = new LateralElementDescriptor<String, String>( ce );
301             // led.requesterId = requesterId; // later
302             led.command = LateralCommand.GET_MATCHING;
303 
304             Object response = sender.sendAndReceive( led );
305             if ( response != null )
306             {
307                 return (Map<K, ICacheElement<K, V>>) response;
308             }
309             return Collections.emptyMap();
310         }
311         else
312         {
313             // nothing needs to be done
314             return null;
315         }
316     }
317 
318     /**
319      * Gets multiple items from the cache based on the given set of keys.
320      * <p>
321      * @param cacheName
322      * @param keys
323      * @return a map of K key to ICacheElement<K, V> element, or an empty map if there is no
324      *         data in cache for any of these keys
325      * @throws IOException
326      */
327     public Map<K, ICacheElement<K, V>> getMultiple( String cacheName, Set<K> keys )
328         throws IOException
329     {
330         return getMultiple( cacheName, keys, getListenerId() );
331     }
332 
333     /**
334      * This issues a separate get for each item.
335      * <p>
336      * TODO We should change this. It should issue one request.
337      * <p>
338      * @param cacheName
339      * @param keys
340      * @param requesterId
341      * @return a map of K key to ICacheElement<K, V> element, or an empty map if there is no
342      *         data in cache for any of these keys
343      * @throws IOException
344      */
345     public Map<K, ICacheElement<K, V>> getMultiple( String cacheName, Set<K> keys, long requesterId )
346         throws IOException
347     {
348         Map<K, ICacheElement<K, V>> elements = new HashMap<K, ICacheElement<K, V>>();
349 
350         if ( keys != null && !keys.isEmpty() )
351         {
352             for (K key : keys)
353             {
354                 ICacheElement<K, V> element = get( cacheName, key );
355 
356                 if ( element != null )
357                 {
358                     elements.put( key, element );
359                 }
360             }
361         }
362         return elements;
363     }
364 
365     /**
366      * Gets the set of keys of objects currently in the group
367      * <p>
368      * @param cacheName
369      * @param group
370      * @return Set
371      */
372     @SuppressWarnings("unchecked") // Need cast from Object
373     public Set<K> getGroupKeys( String cacheName, String group )
374         throws IOException
375     {
376         CacheElement<String, String> ce = new CacheElement<String, String>(cacheName, group, null);
377         LateralElementDescriptor<String, String> led = new LateralElementDescriptor<String, String>(ce);
378         // led.requesterId = requesterId; // later
379         led.command = LateralCommand.GET_GROUP_KEYS;
380         Object response = sender.sendAndReceive(led);
381         if (response != null)
382         {
383             return (Set<K>) response;
384         }
385 
386         return null;
387     }
388 
389     /**
390      * Gets the set of groups currently in the cache throws
391      * UnsupportedOperationException
392      * <p>
393      *
394      * @param cacheName
395      * @return Set
396      */
397     @SuppressWarnings("unchecked") // Need cast from Object
398     public Set<String> getGroupNames(String cacheName)
399         throws IOException
400     {
401         CacheElement<String, String> ce = new CacheElement<String, String>(cacheName, null, null);
402         LateralElementDescriptor<String, String> led = new LateralElementDescriptor<String, String>(ce);
403         // led.requesterId = requesterId; // later
404         led.command = LateralCommand.GET_GROUP_NAMES;
405         Object response = sender.sendAndReceive(led);
406         if (response != null)
407         {
408             return (Set<String>) response;
409         }
410 
411         return null;
412     }
413 
414     /**
415      * @param cacheName
416      * @throws IOException
417      */
418     public void removeAll( String cacheName )
419         throws IOException
420     {
421         removeAll( cacheName, getListenerId() );
422     }
423 
424     /**
425      * @param cacheName
426      * @param requesterId
427      * @throws IOException
428      */
429     public void removeAll( String cacheName, long requesterId )
430         throws IOException
431     {
432         CacheElement<String, String> ce = new CacheElement<String, String>( cacheName, "ALL", null );
433         LateralElementDescriptor<String, String> led = new LateralElementDescriptor<String, String>( ce );
434         led.requesterId = requesterId;
435         led.command = LateralCommand.REMOVEALL;
436         sender.send( led );
437     }
438 
439     /**
440      * @param args
441      */
442     public static void main( String args[] )
443     {
444         try
445         {
446             LateralTCPSender sender = new LateralTCPSender( new TCPLateralCacheAttributes() );
447 
448             // process user input till done
449             boolean notDone = true;
450             String message = null;
451             // wait to dispose
452             BufferedReader br = new BufferedReader( new InputStreamReader( System.in ) );
453 
454             while ( notDone )
455             {
456                 System.out.println( "enter mesage:" );
457                 message = br.readLine();
458 
459                 if (message == null)
460                 {
461                     notDone = false;
462                     continue;
463                 }
464 
465                 CacheElement<String, String> ce = new CacheElement<String, String>( "test", "test", message );
466                 LateralElementDescriptor<String, String> led = new LateralElementDescriptor<String, String>( ce );
467                 sender.send( led );
468             }
469         }
470         catch ( IOException e )
471         {
472             System.out.println( e.toString() );
473         }
474     }
475 
476     // ILateralCacheObserver methods, do nothing here since
477     // the connection is not registered, the udp service is
478     // is not registered.
479 
480     /**
481      * @param cacheName
482      * @param obj
483      * @throws IOException
484      */
485     public <KK extends Serializable, VV extends Serializable> void addCacheListener( String cacheName, ICacheListener<KK, VV> obj )
486         throws IOException
487     {
488         // Empty
489     }
490 
491     /**
492      * @param obj
493      * @throws IOException
494      */
495     public <KK extends Serializable, VV extends Serializable> void addCacheListener( ICacheListener<KK, VV> obj )
496         throws IOException
497     {
498         // Empty
499     }
500 
501     /**
502      * @param cacheName
503      * @param obj
504      * @throws IOException
505      */
506     public <KK extends Serializable, VV extends Serializable> void removeCacheListener( String cacheName, ICacheListener<KK, VV> obj )
507         throws IOException
508     {
509         // Empty
510     }
511 
512     /**
513      * @param obj
514      * @throws IOException
515      */
516     public <KK extends Serializable, VV extends Serializable> void removeCacheListener( ICacheListener<KK, VV> obj )
517         throws IOException
518     {
519         // Empty
520     }
521 
522     /**
523      * @param listernId The listernId to set.
524      */
525     protected void setListenerId( long listernId )
526     {
527         this.listenerId = listernId;
528     }
529 
530     /**
531      * @return Returns the listernId.
532      */
533     protected long getListenerId()
534     {
535         return listenerId;
536     }
537 
538     /**
539      * @param tcpLateralCacheAttributes The tcpLateralCacheAttributes to set.
540      */
541     public void setTcpLateralCacheAttributes( ITCPLateralCacheAttributes tcpLateralCacheAttributes )
542     {
543         this.tcpLateralCacheAttributes = tcpLateralCacheAttributes;
544     }
545 
546     /**
547      * @return Returns the tcpLateralCacheAttributes.
548      */
549     public ITCPLateralCacheAttributes getTcpLateralCacheAttributes()
550     {
551         return tcpLateralCacheAttributes;
552     }
553 
554 
555 }