AsyncCacheWriter.java

  1. /*
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *   http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing,
  13.  * software distributed under the License is distributed on an
  14.  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15.  * KIND, either express or implied.  See the License for the
  16.  * specific language governing permissions and limitations
  17.  * under the License.
  18.  */
  19. package org.apache.commons.jcs3.jcache.extras.writer;

  20. import javax.cache.Cache;
  21. import javax.cache.configuration.Factory;
  22. import javax.cache.integration.CacheWriter;
  23. import javax.cache.integration.CacheWriterException;
  24. import java.io.Closeable;
  25. import java.io.IOException;
  26. import java.util.Collection;
  27. import java.util.List;
  28. import java.util.concurrent.ExecutorService;
  29. import java.util.concurrent.Executors;
  30. import java.util.concurrent.ThreadFactory;
  31. import java.util.concurrent.atomic.AtomicInteger;
  32. import java.util.logging.Level;
  33. import java.util.logging.Logger;

  34. public class AsyncCacheWriter<K, V> implements CacheWriter<K, V>, Closeable, Factory<CacheWriter<K, V>>
  35. {
  36.     private static final Logger LOGGER = Logger.getLogger(AsyncCacheWriter.class.getName());

  37.     private final CacheWriter<K, V> writer;
  38.     private final ExecutorService pool;

  39.     public AsyncCacheWriter(final CacheWriter<K, V> delegate, final int poolSize)
  40.     {
  41.         writer = delegate;
  42.         pool = Executors.newFixedThreadPool(
  43.                 poolSize, new DaemonThreadFactory(delegate.getClass().getName() + "-" + delegate.hashCode() + "-"));
  44.     }

  45.     @Override
  46.     public void write(final Cache.Entry<? extends K, ? extends V> entry) throws CacheWriterException
  47.     {
  48.         pool.submit(new ExceptionProtectionRunnable()
  49.         {
  50.             @Override
  51.             public void doRun()
  52.             {
  53.                 writer.write(entry);
  54.             }
  55.         });
  56.     }

  57.     @Override
  58.     public void writeAll(final Collection<Cache.Entry<? extends K, ? extends V>> entries) throws CacheWriterException
  59.     {
  60.         pool.submit(new ExceptionProtectionRunnable()
  61.         {
  62.             @Override
  63.             public void doRun()
  64.             {
  65.                 writer.writeAll(entries);
  66.             }
  67.         });
  68.     }

  69.     @Override
  70.     public void delete(final Object key) throws CacheWriterException
  71.     {
  72.         pool.submit(new ExceptionProtectionRunnable()
  73.         {
  74.             @Override
  75.             public void doRun()
  76.             {
  77.                 writer.delete(key);
  78.             }
  79.         });
  80.     }

  81.     @Override
  82.     public void deleteAll(final Collection<?> keys) throws CacheWriterException
  83.     {
  84.         pool.submit(new ExceptionProtectionRunnable()
  85.         {
  86.             @Override
  87.             public void doRun()
  88.             {
  89.                 writer.deleteAll(keys);
  90.             }
  91.         });
  92.     }

  93.     @Override
  94.     public void close() throws IOException
  95.     {
  96.         final List<Runnable> runnables = pool.shutdownNow();
  97.         for (final Runnable r : runnables)
  98.         {
  99.             r.run();
  100.         }
  101.     }

  102.     @Override
  103.     public CacheWriter<K, V> create()
  104.     {
  105.         return this;
  106.     }

  107.     // avoid dep on impl
  108.     private static class DaemonThreadFactory implements ThreadFactory
  109.     {
  110.         private final AtomicInteger index = new AtomicInteger(1);
  111.         private final String prefix;

  112.         public DaemonThreadFactory(final String prefix)
  113.         {
  114.             this.prefix = prefix;
  115.         }

  116.         @Override
  117.         public Thread newThread( final Runnable runner )
  118.         {
  119.             final Thread t = new Thread( runner );
  120.             t.setName(prefix + index.getAndIncrement());
  121.             t.setDaemon(true);
  122.             return t;
  123.         }
  124.     }

  125.     private static abstract class ExceptionProtectionRunnable implements Runnable
  126.     {
  127.         @Override
  128.         public void run()
  129.         {
  130.             try
  131.             {
  132.                 doRun();
  133.             }
  134.             catch (final Exception e)
  135.             {
  136.                 LOGGER.log(Level.SEVERE, e.getMessage(), e);
  137.             }
  138.         }

  139.         protected abstract void doRun();
  140.     }
  141. }