View Javadoc

1   package org.apache.jcs.auxiliary.lateral.socket.tcp;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.BufferedReader;
23  import java.io.IOException;
24  import java.io.InputStreamReader;
25  import java.io.Serializable;
26  import java.util.Collections;
27  import java.util.HashMap;
28  import java.util.Map;
29  import java.util.Set;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.jcs.auxiliary.lateral.LateralCacheInfo;
34  import org.apache.jcs.auxiliary.lateral.LateralElementDescriptor;
35  import org.apache.jcs.auxiliary.lateral.behavior.ILateralCacheObserver;
36  import org.apache.jcs.auxiliary.lateral.behavior.ILateralCacheService;
37  import org.apache.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
38  import org.apache.jcs.engine.CacheElement;
39  import org.apache.jcs.engine.behavior.ICacheElement;
40  import org.apache.jcs.engine.behavior.ICacheListener;
41  
42  /**
43   * A lateral cache service implementation. Does not implement getGroupKey
44   */
45  public class LateralTCPService
46      implements ILateralCacheService, ILateralCacheObserver
47  {
48      /** The logger. */
49      private final static Log log = LogFactory.getLog( LateralTCPService.class );
50  
51      /** special configuration */
52      private ITCPLateralCacheAttributes tcpLateralCacheAttributes;
53  
54      /** Sends to another lateral. */
55      private LateralTCPSender sender;
56  
57      /** use the vmid by default */
58      private long listenerId = LateralCacheInfo.listenerId;
59  
60      /**
61       * Constructor for the LateralTCPService object
62       * <p>
63       * @param lca ITCPLateralCacheAttributes
64       * @exception IOException
65       */
66      public LateralTCPService( ITCPLateralCacheAttributes lca )
67          throws IOException
68      {
69          this.setTcpLateralCacheAttributes( lca );
70          try
71          {
72              log.debug( "creating sender, attributes = " + getTcpLateralCacheAttributes() );
73  
74              sender = new LateralTCPSender( lca );
75  
76              if ( log.isInfoEnabled() )
77              {
78                  log.debug( "Created sender to [" + lca.getTcpServer() + "]" );
79              }
80          }
81          catch ( IOException e )
82          {
83              // log.error( "Could not create sender", e );
84              // This gets thrown over and over in recovery mode.
85              // The stack trace isn't useful here.
86              log.error( "Could not create sender to [" + lca.getTcpServer() + "] -- " + e.getMessage() );
87  
88              throw e;
89          }
90      }
91  
92      /**
93       * @param item
94       * @throws IOException
95       */
96      public void update( ICacheElement item )
97          throws IOException
98      {
99          update( item, getListenerId() );
100     }
101 
102     /**
103      * If put is allowed, we will issue a put. If issue put on remove is configured, we will issue a
104      * remove. Either way, we create a lateral element descriptor, which is essentially a JCS TCP
105      * packet. It describes what operation the receiver should take when it gets the packet.
106      * <p>
107      * @see org.apache.jcs.auxiliary.lateral.behavior.ILateralCacheService#update(org.apache.jcs.engine.behavior.ICacheElement,
108      *      long)
109      */
110     public void update( ICacheElement item, long requesterId )
111         throws IOException
112     {
113         // if we don't allow put, see if we should remove on put
114         if ( !this.getTcpLateralCacheAttributes().isAllowPut() )
115         {
116             // if we can't remove on put, and we can't put then return
117             if ( !this.getTcpLateralCacheAttributes().isIssueRemoveOnPut() )
118             {
119                 return;
120             }
121         }
122 
123         // if we shouldn't remove on put, then put
124         if ( !this.getTcpLateralCacheAttributes().isIssueRemoveOnPut() )
125         {
126             LateralElementDescriptor led = new LateralElementDescriptor( item );
127             led.requesterId = requesterId;
128             led.command = LateralElementDescriptor.UPDATE;
129             sender.send( led );
130         }
131         // else issue a remove with the hashcode for remove check on
132         // on the other end, this will be a server config option
133         else
134         {
135             if ( log.isDebugEnabled() )
136             {
137                 log.debug( "Issuing a remove for a put" );
138             }
139             // set the value to null so we don't send the item
140             CacheElement ce = new CacheElement( item.getCacheName(), item.getKey(), null );
141             LateralElementDescriptor led = new LateralElementDescriptor( ce );
142             led.requesterId = requesterId;
143             led.command = LateralElementDescriptor.REMOVE;
144             led.valHashCode = item.getVal().hashCode();
145             sender.send( led );
146         }
147     }
148 
149     /**
150      * Uses the default listener id and calls the next remove method.
151      * <p>
152      * @see org.apache.jcs.engine.behavior.ICacheService#remove(java.lang.String,
153      *      java.io.Serializable)
154      */
155     public void remove( String cacheName, Serializable key )
156         throws IOException
157     {
158         remove( cacheName, key, getListenerId() );
159     }
160 
161     /**
162      * Wraps the key in a LateralElementDescriptor.
163      * <p>
164      * @see org.apache.jcs.auxiliary.lateral.behavior.ILateralCacheService#remove(java.lang.String,
165      *      java.io.Serializable, long)
166      */
167     public void remove( String cacheName, Serializable key, long requesterId )
168         throws IOException
169     {
170         CacheElement ce = new CacheElement( cacheName, key, null );
171         LateralElementDescriptor led = new LateralElementDescriptor( ce );
172         led.requesterId = requesterId;
173         led.command = LateralElementDescriptor.REMOVE;
174         sender.send( led );
175     }
176 
177     /**
178      * Does nothing.
179      * <p>
180      * @throws IOException
181      */
182     public void release()
183         throws IOException
184     {
185         // nothing needs to be done
186     }
187 
188     /**
189      * Will close the connection.
190      * <p>
191      * @param cacheName
192      * @throws IOException
193      */
194     public void dispose( String cacheName )
195         throws IOException
196     {
197         sender.dispose( cacheName );
198     }
199 
200     /**
201      * The service does not get via this method, so this return null.
202      * <p>
203      * @param key
204      * @return always null.
205      * @throws IOException
206      */
207     public Serializable get( String key )
208         throws IOException
209     {
210         if ( log.isDebugEnabled() )
211         {
212             log.debug( "balking at get for key [" + key + "]" );
213         }
214         // p( "junk get" );
215         // return get( cattr.cacheName, key, true );
216         return null;
217         // nothing needs to be done
218     }
219 
220     /**
221      * @param cacheName
222      * @param key
223      * @return ICacheElement if found.
224      * @throws IOException
225      */
226     public ICacheElement get( String cacheName, Serializable key )
227         throws IOException
228     {
229         return get( cacheName, key, getListenerId() );
230     }
231 
232     /**
233      * If get is allowed, we will issues a get request.
234      * <p>
235      * @param cacheName
236      * @param key
237      * @param requesterId
238      * @return ICacheElement if found.
239      * @throws IOException
240      */
241     public ICacheElement get( String cacheName, Serializable key, long requesterId )
242         throws IOException
243     {
244         // if get is not allowed return
245         if ( this.getTcpLateralCacheAttributes().isAllowGet() )
246         {
247             CacheElement ce = new CacheElement( cacheName, key, null );
248             LateralElementDescriptor led = new LateralElementDescriptor( ce );
249             // led.requesterId = requesterId; // later
250             led.command = LateralElementDescriptor.GET;
251             Object response = sender.sendAndReceive( led );
252             if ( response != null )
253             {
254                 return (ICacheElement) response;
255             }
256             return null;
257         }
258         else
259         {
260             // nothing needs to be done
261             return null;
262         }
263     }
264 
265     /**
266      * If allow get is true, we will issue a getmatching query.
267      * <p>
268      * @param cacheName
269      * @param pattern
270      * @return a map of Serializable key to ICacheElement element, or an empty map if there is no
271      *         data in cache matching the pattern.
272      * @throws IOException
273      */
274     public Map<Serializable, ICacheElement> getMatching( String cacheName, String pattern )
275         throws IOException
276     {
277         return getMatching( cacheName, pattern, getListenerId() );
278     }
279 
280     /**
281      * If allow get is true, we will issue a getmatching query.
282      * <p>
283      * @param cacheName
284      * @param pattern
285      * @param requesterId - our identity
286      * @return a map of Serializable key to ICacheElement element, or an empty map if there is no
287      *         data in cache matching the pattern.
288      * @throws IOException
289      */
290     @SuppressWarnings("unchecked")
291     public Map<Serializable, ICacheElement> getMatching( String cacheName, String pattern, long requesterId )
292         throws IOException
293     {
294         // if get is not allowed return
295         if ( this.getTcpLateralCacheAttributes().isAllowGet() )
296         {
297             CacheElement ce = new CacheElement( cacheName, pattern, null );
298             LateralElementDescriptor led = new LateralElementDescriptor( ce );
299             // led.requesterId = requesterId; // later
300             led.command = LateralElementDescriptor.GET_MATCHING;
301 
302             Object response = sender.sendAndReceive( led );
303             if ( response != null )
304             {
305                 return (Map<Serializable, ICacheElement>) response;
306             }
307             return Collections.emptyMap();
308         }
309         else
310         {
311             // nothing needs to be done
312             return null;
313         }
314     }
315 
316     /**
317      * Gets multiple items from the cache based on the given set of keys.
318      * <p>
319      * @param cacheName
320      * @param keys
321      * @return a map of Serializable key to ICacheElement element, or an empty map if there is no
322      *         data in cache for any of these keys
323      * @throws IOException
324      */
325     public Map<Serializable, ICacheElement> getMultiple( String cacheName, Set<Serializable> keys )
326         throws IOException
327     {
328         return getMultiple( cacheName, keys, getListenerId() );
329     }
330 
331     /**
332      * This issues a separate get for each item.
333      * <p>
334      * TODO We should change this. It should issue one request.
335      * <p>
336      * @param cacheName
337      * @param keys
338      * @param requesterId
339      * @return a map of Serializable key to ICacheElement element, or an empty map if there is no
340      *         data in cache for any of these keys
341      * @throws IOException
342      */
343     public Map<Serializable, ICacheElement> getMultiple( String cacheName, Set<Serializable> keys, long requesterId )
344         throws IOException
345     {
346         Map<Serializable, ICacheElement> elements = new HashMap<Serializable, ICacheElement>();
347 
348         if ( keys != null && !keys.isEmpty() )
349         {
350             for (Serializable key : keys)
351             {
352                 ICacheElement element = get( cacheName, key );
353 
354                 if ( element != null )
355                 {
356                     elements.put( key, element );
357                 }
358             }
359         }
360         return elements;
361     }
362 
363     /**
364      * Gets the set of keys of objects currently in the group throws UnsupportedOperationException
365      * <p>
366      * @param cacheName
367      * @param group
368      * @return Set
369      */
370     public Set<Serializable> getGroupKeys( String cacheName, String group )
371     {
372         throw new UnsupportedOperationException( "Groups not implemented." );
373         // return null;
374     }
375 
376     /**
377      * @param cacheName
378      * @throws IOException
379      */
380     public void removeAll( String cacheName )
381         throws IOException
382     {
383         removeAll( cacheName, getListenerId() );
384     }
385 
386     /**
387      * @param cacheName
388      * @param requesterId
389      * @throws IOException
390      */
391     public void removeAll( String cacheName, long requesterId )
392         throws IOException
393     {
394         CacheElement ce = new CacheElement( cacheName, "ALL", null );
395         LateralElementDescriptor led = new LateralElementDescriptor( ce );
396         led.requesterId = requesterId;
397         led.command = LateralElementDescriptor.REMOVEALL;
398         sender.send( led );
399     }
400 
401     /**
402      * @param args
403      */
404     public static void main( String args[] )
405     {
406         try
407         {
408             LateralTCPSender sender = new LateralTCPSender( new TCPLateralCacheAttributes() );
409 
410             // process user input till done
411             boolean notDone = true;
412             String message = null;
413             // wait to dispose
414             BufferedReader br = new BufferedReader( new InputStreamReader( System.in ) );
415 
416             while ( notDone )
417             {
418                 System.out.println( "enter mesage:" );
419                 message = br.readLine();
420 
421                 if (message == null)
422                 {
423                     notDone = false;
424                     continue;
425                 }
426 
427                 CacheElement ce = new CacheElement( "test", "test", message );
428                 LateralElementDescriptor led = new LateralElementDescriptor( ce );
429                 sender.send( led );
430             }
431         }
432         catch ( IOException e )
433         {
434             System.out.println( e.toString() );
435         }
436     }
437 
438     // ILateralCacheObserver methods, do nothing here since
439     // the connection is not registered, the udp service is
440     // is not registered.
441 
442     /**
443      * @param cacheName
444      * @param obj
445      * @throws IOException
446      */
447     public void addCacheListener( String cacheName, ICacheListener obj )
448         throws IOException
449     {
450         // Empty
451     }
452 
453     /**
454      * @param obj
455      * @throws IOException
456      */
457     public void addCacheListener( ICacheListener obj )
458         throws IOException
459     {
460         // Empty
461     }
462 
463     /**
464      * @param cacheName
465      * @param obj
466      * @throws IOException
467      */
468     public void removeCacheListener( String cacheName, ICacheListener obj )
469         throws IOException
470     {
471         // Empty
472     }
473 
474     /**
475      * @param obj
476      * @throws IOException
477      */
478     public void removeCacheListener( ICacheListener obj )
479         throws IOException
480     {
481         // Empty
482     }
483 
484     /**
485      * @param listernId The listernId to set.
486      */
487     protected void setListenerId( long listernId )
488     {
489         this.listenerId = listernId;
490     }
491 
492     /**
493      * @return Returns the listernId.
494      */
495     protected long getListenerId()
496     {
497         return listenerId;
498     }
499 
500     /**
501      * @param tcpLateralCacheAttributes The tcpLateralCacheAttributes to set.
502      */
503     public void setTcpLateralCacheAttributes( ITCPLateralCacheAttributes tcpLateralCacheAttributes )
504     {
505         this.tcpLateralCacheAttributes = tcpLateralCacheAttributes;
506     }
507 
508     /**
509      * @return Returns the tcpLateralCacheAttributes.
510      */
511     public ITCPLateralCacheAttributes getTcpLateralCacheAttributes()
512     {
513         return tcpLateralCacheAttributes;
514     }
515 }