001package org.apache.commons.jcs.auxiliary.lateral.socket.tcp;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *   http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import java.io.BufferedReader;
023import java.io.IOException;
024import java.io.InputStreamReader;
025import java.util.Collections;
026import java.util.HashMap;
027import java.util.Map;
028import java.util.Set;
029
030import org.apache.commons.jcs.auxiliary.lateral.LateralCommand;
031import org.apache.commons.jcs.auxiliary.lateral.LateralElementDescriptor;
032import org.apache.commons.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
033import org.apache.commons.jcs.engine.CacheElement;
034import org.apache.commons.jcs.engine.CacheInfo;
035import org.apache.commons.jcs.engine.behavior.ICacheElement;
036import org.apache.commons.jcs.engine.behavior.ICacheServiceNonLocal;
037import org.apache.commons.logging.Log;
038import org.apache.commons.logging.LogFactory;
039
040/**
041 * A lateral cache service implementation. Does not implement getGroupKey
042 * TODO: Remove generics
043 */
044public class LateralTCPService<K, V>
045    implements ICacheServiceNonLocal<K, V>
046{
047    /** The logger. */
048    private static final Log log = LogFactory.getLog( LateralTCPService.class );
049
050    /** special configuration */
051    private boolean allowPut;
052    private boolean allowGet;
053    private boolean issueRemoveOnPut;
054
055    /** Sends to another lateral. */
056    private LateralTCPSender sender;
057
058    /** use the vmid by default */
059    private long listenerId = CacheInfo.listenerId;
060
061    /**
062     * Constructor for the LateralTCPService object
063     * <p>
064     * @param lca ITCPLateralCacheAttributes
065     * @throws IOException
066     */
067    public LateralTCPService( ITCPLateralCacheAttributes lca )
068        throws IOException
069    {
070        this.allowGet = lca.isAllowGet();
071        this.allowPut = lca.isAllowPut();
072        this.issueRemoveOnPut = lca.isIssueRemoveOnPut();
073
074        try
075        {
076            sender = new LateralTCPSender( lca );
077
078            if ( log.isInfoEnabled() )
079            {
080                log.debug( "Created sender to [" + lca.getTcpServer() + "]" );
081            }
082        }
083        catch ( IOException e )
084        {
085            // log.error( "Could not create sender", e );
086            // This gets thrown over and over in recovery mode.
087            // The stack trace isn't useful here.
088            log.error( "Could not create sender to [" + lca.getTcpServer() + "] -- " + e.getMessage() );
089
090            throw e;
091        }
092    }
093
094    /**
095     * @param item
096     * @throws IOException
097     */
098    @Override
099    public void update( ICacheElement<K, V> item )
100        throws IOException
101    {
102        update( item, getListenerId() );
103    }
104
105    /**
106     * If put is allowed, we will issue a put. If issue put on remove is configured, we will issue a
107     * remove. Either way, we create a lateral element descriptor, which is essentially a JCS TCP
108     * packet. It describes what operation the receiver should take when it gets the packet.
109     * <p>
110     * @see org.apache.commons.jcs.engine.behavior.ICacheServiceNonLocal#update(org.apache.commons.jcs.engine.behavior.ICacheElement,
111     *      long)
112     */
113    @Override
114    public void update( ICacheElement<K, V> item, long requesterId )
115        throws IOException
116    {
117        // if we don't allow put, see if we should remove on put
118        if ( !this.allowPut &&
119            // if we can't remove on put, and we can't put then return
120            !this.issueRemoveOnPut )
121        {
122            return;
123        }
124
125        // if we shouldn't remove on put, then put
126        if ( !this.issueRemoveOnPut )
127        {
128            LateralElementDescriptor<K, V> led = new LateralElementDescriptor<K, V>( item );
129            led.requesterId = requesterId;
130            led.command = LateralCommand.UPDATE;
131            sender.send( led );
132        }
133        // else issue a remove with the hashcode for remove check on
134        // on the other end, this will be a server config option
135        else
136        {
137            if ( log.isDebugEnabled() )
138            {
139                log.debug( "Issuing a remove for a put" );
140            }
141            // set the value to null so we don't send the item
142            CacheElement<K, V> ce = new CacheElement<K, V>( item.getCacheName(), item.getKey(), null );
143            LateralElementDescriptor<K, V> led = new LateralElementDescriptor<K, V>( ce );
144            led.requesterId = requesterId;
145            led.command = LateralCommand.REMOVE;
146            led.valHashCode = item.getVal().hashCode();
147            sender.send( led );
148        }
149    }
150
151    /**
152     * Uses the default listener id and calls the next remove method.
153     * <p>
154     * @see org.apache.commons.jcs.engine.behavior.ICacheService#remove(String, Object)
155     */
156    @Override
157    public void remove( String cacheName, K key )
158        throws IOException
159    {
160        remove( cacheName, key, getListenerId() );
161    }
162
163    /**
164     * Wraps the key in a LateralElementDescriptor.
165     * <p>
166     * @see org.apache.commons.jcs.engine.behavior.ICacheServiceNonLocal#remove(String, Object, long)
167     */
168    @Override
169    public void remove( String cacheName, K key, long requesterId )
170        throws IOException
171    {
172        CacheElement<K, V> ce = new CacheElement<K, V>( cacheName, key, null );
173        LateralElementDescriptor<K, V> led = new LateralElementDescriptor<K, V>( ce );
174        led.requesterId = requesterId;
175        led.command = LateralCommand.REMOVE;
176        sender.send( led );
177    }
178
179    /**
180     * Does nothing.
181     * <p>
182     * @throws IOException
183     */
184    @Override
185    public void release()
186        throws IOException
187    {
188        // nothing needs to be done
189    }
190
191    /**
192     * Will close the connection.
193     * <p>
194     * @param cacheName
195     * @throws IOException
196     */
197    @Override
198    public void dispose( String cacheName )
199        throws IOException
200    {
201        sender.dispose();
202    }
203
204    /**
205     * @param cacheName
206     * @param key
207     * @return ICacheElement&lt;K, V&gt; if found.
208     * @throws IOException
209     */
210    @Override
211    public ICacheElement<K, V> get( String cacheName, K key )
212        throws IOException
213    {
214        return get( cacheName, key, getListenerId() );
215    }
216
217    /**
218     * If get is allowed, we will issues a get request.
219     * <p>
220     * @param cacheName
221     * @param key
222     * @param requesterId
223     * @return ICacheElement&lt;K, V&gt; if found.
224     * @throws IOException
225     */
226    @Override
227    public ICacheElement<K, V> get( String cacheName, K key, long requesterId )
228        throws IOException
229    {
230        // if get is not allowed return
231        if ( this.allowGet )
232        {
233            CacheElement<K, V> ce = new CacheElement<K, V>( cacheName, key, null );
234            LateralElementDescriptor<K, V> led = new LateralElementDescriptor<K, V>( ce );
235            // led.requesterId = requesterId; // later
236            led.command = LateralCommand.GET;
237            @SuppressWarnings("unchecked") // Need to cast from Object
238            ICacheElement<K, V> response = (ICacheElement<K, V>)sender.sendAndReceive( led );
239            if ( response != null )
240            {
241                return response;
242            }
243            return null;
244        }
245        else
246        {
247            // nothing needs to be done
248            return null;
249        }
250    }
251
252    /**
253     * If allow get is true, we will issue a getmatching query.
254     * <p>
255     * @param cacheName
256     * @param pattern
257     * @return a map of K key to ICacheElement&lt;K, V&gt; element, or an empty map if there is no
258     *         data in cache matching the pattern.
259     * @throws IOException
260     */
261    @Override
262    public Map<K, ICacheElement<K, V>> getMatching( String cacheName, String pattern )
263        throws IOException
264    {
265        return getMatching( cacheName, pattern, getListenerId() );
266    }
267
268    /**
269     * If allow get is true, we will issue a getmatching query.
270     * <p>
271     * @param cacheName
272     * @param pattern
273     * @param requesterId - our identity
274     * @return a map of K key to ICacheElement&lt;K, V&gt; element, or an empty map if there is no
275     *         data in cache matching the pattern.
276     * @throws IOException
277     */
278    @Override
279    @SuppressWarnings("unchecked") // Need to cast from Object
280    public Map<K, ICacheElement<K, V>> getMatching( String cacheName, String pattern, long requesterId )
281        throws IOException
282    {
283        // if get is not allowed return
284        if ( this.allowGet )
285        {
286            CacheElement<String, String> ce = new CacheElement<String, String>( cacheName, pattern, null );
287            LateralElementDescriptor<String, String> led = new LateralElementDescriptor<String, String>( ce );
288            // led.requesterId = requesterId; // later
289            led.command = LateralCommand.GET_MATCHING;
290
291            Object response = sender.sendAndReceive( led );
292            if ( response != null )
293            {
294                return (Map<K, ICacheElement<K, V>>) response;
295            }
296            return Collections.emptyMap();
297        }
298        else
299        {
300            // nothing needs to be done
301            return null;
302        }
303    }
304
305    /**
306     * Gets multiple items from the cache based on the given set of keys.
307     * <p>
308     * @param cacheName
309     * @param keys
310     * @return a map of K key to ICacheElement&lt;K, V&gt; element, or an empty map if there is no
311     *         data in cache for any of these keys
312     * @throws IOException
313     */
314    @Override
315    public Map<K, ICacheElement<K, V>> getMultiple( String cacheName, Set<K> keys )
316        throws IOException
317    {
318        return getMultiple( cacheName, keys, getListenerId() );
319    }
320
321    /**
322     * This issues a separate get for each item.
323     * <p>
324     * TODO We should change this. It should issue one request.
325     * <p>
326     * @param cacheName
327     * @param keys
328     * @param requesterId
329     * @return a map of K key to ICacheElement&lt;K, V&gt; element, or an empty map if there is no
330     *         data in cache for any of these keys
331     * @throws IOException
332     */
333    @Override
334    public Map<K, ICacheElement<K, V>> getMultiple( String cacheName, Set<K> keys, long requesterId )
335        throws IOException
336    {
337        Map<K, ICacheElement<K, V>> elements = new HashMap<K, ICacheElement<K, V>>();
338
339        if ( keys != null && !keys.isEmpty() )
340        {
341            for (K key : keys)
342            {
343                ICacheElement<K, V> element = get( cacheName, key );
344
345                if ( element != null )
346                {
347                    elements.put( key, element );
348                }
349            }
350        }
351        return elements;
352    }
353
354    /**
355     * Return the keys in this cache.
356     * <p>
357     * @param cacheName the name of the cache region
358     * @see org.apache.commons.jcs.auxiliary.AuxiliaryCache#getKeySet()
359     */
360    @Override
361    @SuppressWarnings("unchecked") // Need cast from Object
362    public Set<K> getKeySet(String cacheName) throws IOException
363    {
364        CacheElement<String, String> ce = new CacheElement<String, String>(cacheName, null, null);
365        LateralElementDescriptor<String, String> led = new LateralElementDescriptor<String, String>(ce);
366        // led.requesterId = requesterId; // later
367        led.command = LateralCommand.GET_KEYSET;
368        Object response = sender.sendAndReceive(led);
369        if (response != null)
370        {
371            return (Set<K>) response;
372        }
373
374        return null;
375    }
376
377    /**
378     * @param cacheName
379     * @throws IOException
380     */
381    @Override
382    public void removeAll( String cacheName )
383        throws IOException
384    {
385        removeAll( cacheName, getListenerId() );
386    }
387
388    /**
389     * @param cacheName
390     * @param requesterId
391     * @throws IOException
392     */
393    @Override
394    public void removeAll( String cacheName, long requesterId )
395        throws IOException
396    {
397        CacheElement<String, String> ce = new CacheElement<String, String>( cacheName, "ALL", null );
398        LateralElementDescriptor<String, String> led = new LateralElementDescriptor<String, String>( ce );
399        led.requesterId = requesterId;
400        led.command = LateralCommand.REMOVEALL;
401        sender.send( led );
402    }
403
404    /**
405     * @param args
406     */
407    public static void main( String args[] )
408    {
409        try
410        {
411            LateralTCPSender sender = new LateralTCPSender( new TCPLateralCacheAttributes() );
412
413            // process user input till done
414            boolean notDone = true;
415            String message = null;
416            // wait to dispose
417            BufferedReader br = new BufferedReader( new InputStreamReader( System.in, "UTF-8" ) );
418
419            while ( notDone )
420            {
421                System.out.println( "enter message:" );
422                message = br.readLine();
423
424                if (message == null)
425                {
426                    notDone = false;
427                    continue;
428                }
429
430                CacheElement<String, String> ce = new CacheElement<String, String>( "test", "test", message );
431                LateralElementDescriptor<String, String> led = new LateralElementDescriptor<String, String>( ce );
432                sender.send( led );
433            }
434        }
435        catch ( IOException e )
436        {
437            System.out.println( e.toString() );
438        }
439    }
440
441    /**
442     * @param listernId The listernId to set.
443     */
444    protected void setListenerId( long listernId )
445    {
446        this.listenerId = listernId;
447    }
448
449    /**
450     * @return Returns the listernId.
451     */
452    protected long getListenerId()
453    {
454        return listenerId;
455    }
456}