001package org.apache.commons.jcs3.auxiliary.lateral.socket.tcp;
002
003/*
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *   http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied.  See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 */
021
022import java.io.BufferedReader;
023import java.io.IOException;
024import java.io.InputStreamReader;
025import java.nio.charset.StandardCharsets;
026import java.util.Collections;
027import java.util.HashMap;
028import java.util.Map;
029import java.util.Set;
030
031import org.apache.commons.jcs3.auxiliary.lateral.LateralCommand;
032import org.apache.commons.jcs3.auxiliary.lateral.LateralElementDescriptor;
033import org.apache.commons.jcs3.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
034import org.apache.commons.jcs3.engine.CacheElement;
035import org.apache.commons.jcs3.engine.CacheInfo;
036import org.apache.commons.jcs3.engine.behavior.ICacheElement;
037import org.apache.commons.jcs3.engine.behavior.ICacheServiceNonLocal;
038import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
039import org.apache.commons.jcs3.log.Log;
040import org.apache.commons.jcs3.log.LogManager;
041import org.apache.commons.jcs3.utils.serialization.StandardSerializer;
042
043/**
044 * A lateral cache service implementation. Does not implement getGroupKey
045 * TODO: Remove generics
046 */
047public class LateralTCPService<K, V>
048    implements ICacheServiceNonLocal<K, V>
049{
050    /** The logger. */
051    private static final Log log = LogManager.getLog( LateralTCPService.class );
052
053    /** special configuration */
054    private final boolean allowPut;
055    private final boolean allowGet;
056    private final boolean issueRemoveOnPut;
057
058    /** Sends to another lateral. */
059    private final LateralTCPSender sender;
060
061    /** use the vmid by default */
062    private long listenerId = CacheInfo.listenerId;
063
064    /**
065     * Constructor for the LateralTCPService object
066     * <p>
067     * @param lca ITCPLateralCacheAttributes the configuration object
068     * @throws IOException
069     *
070     * @deprecated Specify serializer
071     */
072    @Deprecated
073    public LateralTCPService( final ITCPLateralCacheAttributes lca )
074        throws IOException
075    {
076        this(lca, new StandardSerializer());
077    }
078
079    /**
080     * Constructor for the LateralTCPService object
081     * <p>
082     * @param lca ITCPLateralCacheAttributes the configuration object
083     * @param serializer the serializer to use when sending
084     * @throws IOException
085     * @since 3.1
086     */
087    public LateralTCPService( final ITCPLateralCacheAttributes lca, final IElementSerializer serializer )
088        throws IOException
089    {
090        this.allowGet = lca.isAllowGet();
091        this.allowPut = lca.isAllowPut();
092        this.issueRemoveOnPut = lca.isIssueRemoveOnPut();
093
094        try
095        {
096            sender = new LateralTCPSender( lca, serializer );
097
098            log.debug( "Created sender to [{0}]", lca::getTcpServer);
099        }
100        catch ( final IOException e )
101        {
102            // log.error( "Could not create sender", e );
103            // This gets thrown over and over in recovery mode.
104            // The stack trace isn't useful here.
105            log.error( "Could not create sender to [{0}] -- {1}", lca::getTcpServer, e::getMessage);
106            throw e;
107        }
108    }
109
110    /**
111     * @param item
112     * @throws IOException
113     */
114    @Override
115    public void update( final ICacheElement<K, V> item )
116        throws IOException
117    {
118        update( item, getListenerId() );
119    }
120
121    /**
122     * If put is allowed, we will issue a put. If issue put on remove is configured, we will issue a
123     * remove. Either way, we create a lateral element descriptor, which is essentially a JCS TCP
124     * packet. It describes what operation the receiver should take when it gets the packet.
125     * <p>
126     * @see org.apache.commons.jcs3.engine.behavior.ICacheServiceNonLocal#update(org.apache.commons.jcs3.engine.behavior.ICacheElement,
127     *      long)
128     */
129    @Override
130    public void update( final ICacheElement<K, V> item, final long requesterId )
131        throws IOException
132    {
133        // if we don't allow put, see if we should remove on put
134        if ( !this.allowPut &&
135            // if we can't remove on put, and we can't put then return
136            !this.issueRemoveOnPut )
137        {
138            return;
139        }
140
141        // if we shouldn't remove on put, then put
142        if ( !this.issueRemoveOnPut )
143        {
144            final LateralElementDescriptor<K, V> led =
145                    new LateralElementDescriptor<>(item, LateralCommand.UPDATE, requesterId);
146            sender.send( led );
147        }
148        // else issue a remove with the hash code for remove check on
149        // on the other end, this will be a server config option
150        else
151        {
152            log.debug( "Issuing a remove for a put" );
153
154            // set the value to null so we don't send the item
155            final CacheElement<K, V> ce = new CacheElement<>( item.getCacheName(), item.getKey(), null );
156            final LateralElementDescriptor<K, V> led =
157                    new LateralElementDescriptor<>(ce, LateralCommand.REMOVE, requesterId);
158            led.valHashCode = item.getVal().hashCode();
159            sender.send( led );
160        }
161    }
162
163    /**
164     * Uses the default listener id and calls the next remove method.
165     * <p>
166     * @see org.apache.commons.jcs3.engine.behavior.ICacheService#remove(String, Object)
167     */
168    @Override
169    public void remove( final String cacheName, final K key )
170        throws IOException
171    {
172        remove( cacheName, key, getListenerId() );
173    }
174
175    /**
176     * Wraps the key in a LateralElementDescriptor.
177     * <p>
178     * @see org.apache.commons.jcs3.engine.behavior.ICacheServiceNonLocal#remove(String, Object, long)
179     */
180    @Override
181    public void remove( final String cacheName, final K key, final long requesterId )
182        throws IOException
183    {
184        final CacheElement<K, V> ce = new CacheElement<>( cacheName, key, null );
185        final LateralElementDescriptor<K, V> led =
186                new LateralElementDescriptor<>(ce, LateralCommand.REMOVE, requesterId);
187        sender.send( led );
188    }
189
190    /**
191     * Does nothing.
192     * <p>
193     * @throws IOException
194     */
195    @Override
196    public void release()
197        throws IOException
198    {
199        // nothing needs to be done
200    }
201
202    /**
203     * Will close the connection.
204     * <p>
205     * @param cacheName
206     * @throws IOException
207     */
208    @Override
209    public void dispose( final String cacheName )
210        throws IOException
211    {
212        sender.dispose();
213    }
214
215    /**
216     * @param cacheName
217     * @param key
218     * @return ICacheElement&lt;K, V&gt; if found.
219     * @throws IOException
220     */
221    @Override
222    public ICacheElement<K, V> get( final String cacheName, final K key )
223        throws IOException
224    {
225        return get( cacheName, key, getListenerId() );
226    }
227
228    /**
229     * If get is allowed, we will issues a get request.
230     * <p>
231     * @param cacheName
232     * @param key
233     * @param requesterId
234     * @return ICacheElement&lt;K, V&gt; if found.
235     * @throws IOException
236     */
237    @Override
238    public ICacheElement<K, V> get( final String cacheName, final K key, final long requesterId )
239        throws IOException
240    {
241        // if get is not allowed return
242        if ( this.allowGet )
243        {
244            final CacheElement<K, V> ce = new CacheElement<>( cacheName, key, null );
245            final LateralElementDescriptor<K, V> led =
246                    new LateralElementDescriptor<>(ce, LateralCommand.GET);
247            // led.requesterId = requesterId; // later
248            @SuppressWarnings("unchecked") // Need to cast from Object
249            final
250            ICacheElement<K, V> response = (ICacheElement<K, V>)sender.sendAndReceive( led );
251            return response;
252        }
253        // nothing needs to be done
254        return null;
255    }
256
257    /**
258     * If allow get is true, we will issue a getmatching query.
259     * <p>
260     * @param cacheName
261     * @param pattern
262     * @return a map of K key to ICacheElement&lt;K, V&gt; element, or an empty map if there is no
263     *         data in cache matching the pattern.
264     * @throws IOException
265     */
266    @Override
267    public Map<K, ICacheElement<K, V>> getMatching( final String cacheName, final String pattern )
268        throws IOException
269    {
270        return getMatching( cacheName, pattern, getListenerId() );
271    }
272
273    /**
274     * If allow get is true, we will issue a getmatching query.
275     * <p>
276     * @param cacheName
277     * @param pattern
278     * @param requesterId - our identity
279     * @return a map of K key to ICacheElement&lt;K, V&gt; element, or an empty map if there is no
280     *         data in cache matching the pattern.
281     * @throws IOException
282     */
283    @Override
284    @SuppressWarnings("unchecked") // Need to cast from Object
285    public Map<K, ICacheElement<K, V>> getMatching( final String cacheName, final String pattern, final long requesterId )
286        throws IOException
287    {
288        // if get is not allowed return
289        if ( !this.allowGet ) {
290            // nothing needs to be done
291            return null;
292        }
293        final CacheElement<String, String> ce = new CacheElement<>( cacheName, pattern, null );
294        final LateralElementDescriptor<String, String> led =
295                new LateralElementDescriptor<>(ce, LateralCommand.GET_MATCHING);
296        // led.requesterId = requesterId; // later
297
298        final Object response = sender.sendAndReceive( led );
299        if ( response != null )
300        {
301            return (Map<K, ICacheElement<K, V>>) response;
302        }
303        return Collections.emptyMap();
304    }
305
306    /**
307     * Gets multiple items from the cache based on the given set of keys.
308     * <p>
309     * @param cacheName
310     * @param keys
311     * @return a map of K key to ICacheElement&lt;K, V&gt; element, or an empty map if there is no
312     *         data in cache for any of these keys
313     * @throws IOException
314     */
315    @Override
316    public Map<K, ICacheElement<K, V>> getMultiple( final String cacheName, final Set<K> keys )
317        throws IOException
318    {
319        return getMultiple( cacheName, keys, getListenerId() );
320    }
321
322    /**
323     * This issues a separate get for each item.
324     * <p>
325     * TODO We should change this. It should issue one request.
326     * <p>
327     * @param cacheName
328     * @param keys
329     * @param requesterId
330     * @return a map of K key to ICacheElement&lt;K, V&gt; element, or an empty map if there is no
331     *         data in cache for any of these keys
332     * @throws IOException
333     */
334    @Override
335    public Map<K, ICacheElement<K, V>> getMultiple( final String cacheName, final Set<K> keys, final long requesterId )
336        throws IOException
337    {
338        final Map<K, ICacheElement<K, V>> elements = new HashMap<>();
339
340        if ( keys != null && !keys.isEmpty() )
341        {
342            for (final K key : keys)
343            {
344                final ICacheElement<K, V> element = get( cacheName, key, requesterId );
345
346                if ( element != null )
347                {
348                    elements.put( key, element );
349                }
350            }
351        }
352        return elements;
353    }
354
355    /**
356     * Return the keys in this cache.
357     * <p>
358     * @param cacheName the name of the cache region
359     * @see org.apache.commons.jcs3.auxiliary.AuxiliaryCache#getKeySet()
360     */
361    @Override
362    @SuppressWarnings("unchecked") // Need cast from Object
363    public Set<K> getKeySet(final String cacheName) throws IOException
364    {
365        final CacheElement<String, String> ce = new CacheElement<>(cacheName, null, null);
366        final LateralElementDescriptor<String, String> led =
367                new LateralElementDescriptor<>(ce, LateralCommand.GET_KEYSET);
368        // led.requesterId = requesterId; // later
369        final Object response = sender.sendAndReceive(led);
370        if (response != null)
371        {
372            return (Set<K>) response;
373        }
374
375        return null;
376    }
377
378    /**
379     * @param cacheName
380     * @throws IOException
381     */
382    @Override
383    public void removeAll( final String cacheName )
384        throws IOException
385    {
386        removeAll( cacheName, getListenerId() );
387    }
388
389    /**
390     * @param cacheName
391     * @param requesterId
392     * @throws IOException
393     */
394    @Override
395    public void removeAll( final String cacheName, final long requesterId )
396        throws IOException
397    {
398        final CacheElement<String, String> ce = new CacheElement<>( cacheName, "ALL", null );
399        final LateralElementDescriptor<String, String> led =
400                new LateralElementDescriptor<>(ce, LateralCommand.REMOVEALL, requesterId);
401        sender.send( led );
402    }
403
404    /**
405     * Test
406     * @param args
407     *
408     * @deprecated Use unit tests
409     */
410    @Deprecated
411    public static void main( final String args[] )
412    {
413        try
414        {
415            final LateralTCPSender sender = new LateralTCPSender( new TCPLateralCacheAttributes() );
416
417            // process user input till done
418            boolean notDone = true;
419            String message = null;
420            // wait to dispose
421            final BufferedReader br = new BufferedReader(new InputStreamReader(System.in, StandardCharsets.UTF_8));
422
423            while ( notDone )
424            {
425                System.out.println( "enter message:" );
426                message = br.readLine();
427
428                if (message == null)
429                {
430                    notDone = false;
431                    continue;
432                }
433
434                final CacheElement<String, String> ce = new CacheElement<>( "test", "test", message );
435                final LateralElementDescriptor<String, String> led = new LateralElementDescriptor<>( ce );
436                sender.send( led );
437            }
438        }
439        catch ( final IOException e )
440        {
441            System.out.println( e.toString() );
442        }
443    }
444
445    /**
446     * @param listernId The listernId to set.
447     */
448    protected void setListenerId( final long listernId )
449    {
450        this.listenerId = listernId;
451    }
452
453    /**
454     * @return Returns the listernId.
455     */
456    protected long getListenerId()
457    {
458        return listenerId;
459    }
460}