001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *   http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.commons.jcs.jcache.extras.writer;
020
import javax.cache.Cache;
import javax.cache.configuration.Factory;
import javax.cache.integration.CacheWriter;
import javax.cache.integration.CacheWriterException;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
035
036public class AsyncCacheWriter<K, V> implements CacheWriter<K, V>, Closeable, Factory<CacheWriter<K, V>>
037{
038    private static final Logger LOGGER = Logger.getLogger(AsyncCacheWriter.class.getName());
039
040    private final CacheWriter<K, V> writer;
041    private final ExecutorService pool;
042
043    public AsyncCacheWriter(final CacheWriter<K, V> delegate, final int poolSize)
044    {
045        writer = delegate;
046        pool = Executors.newFixedThreadPool(
047                poolSize, new DaemonThreadFactory(delegate.getClass().getName() + "-" + delegate.hashCode() + "-"));
048    }
049
050    @Override
051    public void write(final Cache.Entry<? extends K, ? extends V> entry) throws CacheWriterException
052    {
053        pool.submit(new ExceptionProtectionRunnable()
054        {
055            @Override
056            public void doRun()
057            {
058                writer.write(entry);
059            }
060        });
061    }
062
063    @Override
064    public void writeAll(final Collection<Cache.Entry<? extends K, ? extends V>> entries) throws CacheWriterException
065    {
066        pool.submit(new ExceptionProtectionRunnable()
067        {
068            @Override
069            public void doRun()
070            {
071                writer.writeAll(entries);
072            }
073        });
074    }
075
076    @Override
077    public void delete(final Object key) throws CacheWriterException
078    {
079        pool.submit(new ExceptionProtectionRunnable()
080        {
081            @Override
082            public void doRun()
083            {
084                writer.delete(key);
085            }
086        });
087    }
088
089    @Override
090    public void deleteAll(final Collection<?> keys) throws CacheWriterException
091    {
092        pool.submit(new ExceptionProtectionRunnable()
093        {
094            @Override
095            public void doRun()
096            {
097                writer.deleteAll(keys);
098            }
099        });
100    }
101
102    @Override
103    public void close() throws IOException
104    {
105        final List<Runnable> runnables = pool.shutdownNow();
106        for (final Runnable r : runnables)
107        {
108            r.run();
109        }
110    }
111
112    @Override
113    public CacheWriter<K, V> create()
114    {
115        return this;
116    }
117
118    // avoid dep on impl
119    private static class DaemonThreadFactory implements ThreadFactory
120    {
121        private final AtomicInteger index = new AtomicInteger(1);
122        private final String prefix;
123
124        public DaemonThreadFactory(final String prefix)
125        {
126            this.prefix = prefix;
127        }
128
129        @Override
130        public Thread newThread( final Runnable runner )
131        {
132            final Thread t = new Thread( runner );
133            t.setName(prefix + index.getAndIncrement());
134            t.setDaemon(true);
135            return t;
136        }
137    }
138
139    private static abstract class ExceptionProtectionRunnable implements Runnable
140    {
141        @Override
142        public void run()
143        {
144            try
145            {
146                doRun();
147            }
148            catch (final Exception e)
149            {
150                LOGGER.log(Level.SEVERE, e.getMessage(), e);
151            }
152        }
153
154        protected abstract void doRun();
155    }
156}