001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.commons.transaction.memory;
018
019import java.io.PrintWriter;
020import java.util.HashSet;
021import java.util.Iterator;
022import java.util.Map;
023import java.util.Set;
024import java.util.Collections;
025
026import org.apache.commons.transaction.locking.ReadWriteLock;
027import org.apache.commons.transaction.util.LoggerFacade;
028import org.apache.commons.transaction.util.PrintWriterLogger;
029
030/**
031 * Wrapper that adds transactional control to all kinds of maps that implement the {@link Map} interface. By using
032 * a naive optimistic transaction control this wrapper has better isolation than {@link TransactionalMapWrapper}, but
033 * may also fail to commit. 
034 *  
035 * <br>
036 * Start a transaction by calling {@link #startTransaction()}. Then perform the normal actions on the map and
037 * finally either call {@link #commitTransaction()} to make your changes permanent or {@link #rollbackTransaction()} to
038 * undo them.
039 * <br>
040 * <em>Caution:</em> Do not modify values retrieved by {@link #get(Object)} as this will circumvent the transactional mechanism.
041 * Rather clone the value or copy it in a way you see fit and store it back using {@link #put(Object, Object)}.
042 * <br>
043 * <em>Note:</em> This wrapper guarantees isolation level <code>SERIALIZABLE</code>.
044 * <br>
045 * <em>Caution:</em> This implementation might be slow when large amounts of data is changed in a transaction as much references will need to be copied around.
046 * 
047 * @version $Id: OptimisticMapWrapper.java 493628 2007-01-07 01:42:48Z joerg $
048 * @see TransactionalMapWrapper
049 * @see PessimisticMapWrapper
050 */
051public class OptimisticMapWrapper extends TransactionalMapWrapper {
052
053    protected static final int COMMIT_TIMEOUT = 1000 * 60; // 1 minute
054    protected static final int ACCESS_TIMEOUT = 1000 * 30; // 30 seconds
055
056    protected Set activeTransactions;
057
058    protected LoggerFacade logger;
059
060    protected ReadWriteLock commitLock;
061
062    /**
063     * Creates a new optimistic transactional map wrapper. Temporary maps and sets to store transactional
064     * data will be instances of {@link java.util.HashMap} and {@link java.util.HashSet}. 
065     * 
066     * @param wrapped map to be wrapped
067     */
068    public OptimisticMapWrapper(Map wrapped) {
069        this(wrapped, new HashMapFactory(), new HashSetFactory());
070    }
071
072    /**
073     * Creates a new optimistic transactional map wrapper. Temporary maps and sets to store transactional
074     * data will be created and disposed using {@link MapFactory} and {@link SetFactory}.
075     * 
076     * @param wrapped map to be wrapped
077     * @param mapFactory factory for temporary maps
078     * @param setFactory factory for temporary sets
079     */
080    public OptimisticMapWrapper(Map wrapped, MapFactory mapFactory, SetFactory setFactory) {
081        this(wrapped, mapFactory, setFactory, new PrintWriterLogger(new PrintWriter(System.out),
082                OptimisticMapWrapper.class.getName(), false));
083    }
084
085    /**
086     * Creates a new optimistic transactional map wrapper. Temporary maps and sets to store transactional
087     * data will be created and disposed using {@link MapFactory} and {@link SetFactory}.
088     * 
089     * @param wrapped map to be wrapped
090     * @param mapFactory factory for temporary maps
091     * @param setFactory factory for temporary sets
092     * @param logger
093     *            generic logger used for all kinds of logging
094     */
095    public OptimisticMapWrapper(Map wrapped, MapFactory mapFactory, SetFactory setFactory, LoggerFacade logger) {
096        super(wrapped, mapFactory, setFactory);
097        activeTransactions = Collections.synchronizedSet(new HashSet());
098        this.logger = logger;
099        commitLock = new ReadWriteLock("COMMIT", logger);
100    }
101
102    public void startTransaction() {
103        if (getActiveTx() != null) {
104            throw new IllegalStateException(
105                "Active thread " + Thread.currentThread() + " already associated with a transaction!");
106        }
107        CopyingTxContext context = new CopyingTxContext();
108        activeTransactions.add(context);
109        setActiveTx(context);
110    }
111
112    public void rollbackTransaction() {
113        TxContext txContext = getActiveTx();
114        super.rollbackTransaction();
115        activeTransactions.remove(txContext);
116    }
117
118    public void commitTransaction() throws ConflictException {
119        commitTransaction(false);
120    }
121
122    public void commitTransaction(boolean force) throws ConflictException {
123        TxContext txContext = getActiveTx();
124        
125        if (txContext == null) {
126            throw new IllegalStateException(
127                "Active thread " + Thread.currentThread() + " not associated with a transaction!");
128        }
129
130        if (txContext.status == STATUS_MARKED_ROLLBACK) {
131            throw new IllegalStateException("Active thread " + Thread.currentThread() + " is marked for rollback!");
132        }
133        
134        try {
135            // in this final commit phase we need to be the only one access the map
136            // to make sure no one adds an entry after we checked for conflicts
137            commitLock.acquireWrite(txContext, COMMIT_TIMEOUT);
138
139            if (!force) {
140                Object conflictKey = checkForConflicts();
141                if (conflictKey != null) {
142                    throw new ConflictException(conflictKey);
143                }
144            }
145    
146            activeTransactions.remove(txContext);
147            copyChangesToConcurrentTransactions();
148            super.commitTransaction();
149            
150        } catch (InterruptedException e) {
151            // XXX a bit dirty ;)
152            throw new ConflictException(e);
153        } finally {
154            commitLock.release(txContext);
155        }
156    }
157
158    // TODO: Shouldn't we return a collection rather than a single key here?
159    public Object checkForConflicts() {
160        CopyingTxContext txContext = (CopyingTxContext) getActiveTx();
161
162        Set keys = txContext.changedKeys();
163        Set externalKeys = txContext.externalChangedKeys();
164
165        for (Iterator it2 = keys.iterator(); it2.hasNext();) {
166            Object key = it2.next();
167            if (externalKeys.contains(key)) {
168                return key;
169            }
170        }
171        return null;
172    }
173
174    protected void copyChangesToConcurrentTransactions() {
175        CopyingTxContext thisTxContext = (CopyingTxContext) getActiveTx();
176
177        synchronized (activeTransactions) {
178            for (Iterator it = activeTransactions.iterator(); it.hasNext();) {
179                CopyingTxContext otherTxContext = (CopyingTxContext) it.next();
180
181                // no need to copy data if the other transaction does not access global map anyway
182                if (otherTxContext.cleared)
183                    continue;
184
185                if (thisTxContext.cleared) {
186                    // we will clear everything, so we have to copy everything before
187                    otherTxContext.externalChanges.putAll(wrapped);
188                } else // no need to check if we have already copied everthing
189                {
190                    for (Iterator it2 = thisTxContext.changes.entrySet().iterator(); it2.hasNext();) {
191                        Map.Entry entry = (Map.Entry) it2.next();
192                        Object value = wrapped.get(entry.getKey());
193                        if (value != null) {
194                            // undo change
195                            otherTxContext.externalChanges.put(entry.getKey(), value);
196                        } else {
197                            // undo add
198                            otherTxContext.externalDeletes.add(entry.getKey());
199                        }
200                    }
201
202                    for (Iterator it2 = thisTxContext.deletes.iterator(); it2.hasNext();) {
203                        // undo delete
204                        Object key = it2.next();
205                        Object value = wrapped.get(key);
206                        otherTxContext.externalChanges.put(key, value);
207                    }
208                }
209            }
210        }
211    }
212
213    public class CopyingTxContext extends TxContext {
214        protected Map externalChanges;
215        protected Map externalAdds;
216        protected Set externalDeletes;
217
218        protected CopyingTxContext() {
219            super();
220            externalChanges = mapFactory.createMap();
221            externalDeletes = setFactory.createSet();
222            externalAdds = mapFactory.createMap();
223        }
224
225        protected Set externalChangedKeys() {
226            Set keySet = new HashSet();
227            keySet.addAll(externalDeletes);
228            keySet.addAll(externalChanges.keySet());
229            keySet.addAll(externalAdds.keySet());
230            return keySet;
231        }
232
233        protected Set changedKeys() {
234            Set keySet = new HashSet();
235            keySet.addAll(deletes);
236            keySet.addAll(changes.keySet());
237            keySet.addAll(adds.keySet());
238            return keySet;
239        }
240
241        protected Set keys() {
242            try {
243                commitLock.acquireRead(this, ACCESS_TIMEOUT);
244                Set keySet = super.keys();
245                keySet.removeAll(externalDeletes);
246                keySet.addAll(externalAdds.keySet());
247                return keySet;
248            } catch (InterruptedException e) {
249                return null;
250            } finally {
251                commitLock.release(this);
252            }
253        }
254
255        protected Object get(Object key) {
256            try {
257                commitLock.acquireRead(this, ACCESS_TIMEOUT);
258
259                if (deletes.contains(key)) {
260                    // reflects that entry has been deleted in this tx 
261                    return null;
262                }
263    
264                Object changed = changes.get(key);
265                if (changed != null) {
266                    return changed;
267                }
268    
269                Object added = adds.get(key);
270                if (added != null) {
271                    return added;
272                }
273    
274                if (cleared) {
275                    return null;
276                } else {
277                    if (externalDeletes.contains(key)) {
278                        // reflects that entry has been deleted in this tx 
279                        return null;
280                    }
281    
282                    changed = externalChanges.get(key);
283                    if (changed != null) {
284                        return changed;
285                    }
286    
287                    added = externalAdds.get(key);
288                    if (added != null) {
289                        return added;
290                    }
291    
292                    // not modified in this tx
293                    return wrapped.get(key);
294                }
295            } catch (InterruptedException e) {
296                return null;
297            } finally {
298                commitLock.release(this);
299            }
300        }
301
302        protected void put(Object key, Object value) {
303            try {
304                commitLock.acquireRead(this, ACCESS_TIMEOUT);
305                super.put(key, value);
306            } catch (InterruptedException e) {
307            } finally {
308                commitLock.release(this);
309            }
310        }
311
312        protected void remove(Object key) {
313            try {
314                commitLock.acquireRead(this, ACCESS_TIMEOUT);
315                super.remove(key);
316            } catch (InterruptedException e) {
317            } finally {
318                commitLock.release(this);
319            }
320        }
321
322        protected int size() {
323            try {
324                commitLock.acquireRead(this, ACCESS_TIMEOUT);
325                int size = super.size();
326    
327                size -= externalDeletes.size();
328                size += externalAdds.size();
329    
330                return size;
331            } catch (InterruptedException e) {
332                return -1;
333            } finally {
334                commitLock.release(this);
335            }
336        }
337
338        protected void clear() {
339            try {
340                commitLock.acquireRead(this, ACCESS_TIMEOUT);
341                super.clear();
342                externalDeletes.clear();
343                externalChanges.clear();
344                externalAdds.clear();
345            } catch (InterruptedException e) {
346            } finally {
347                commitLock.release(this);
348            }
349        }
350
351        protected void merge() {
352            try {
353                commitLock.acquireRead(this, ACCESS_TIMEOUT);
354                super.merge();
355            } catch (InterruptedException e) {
356            } finally {
357                commitLock.release(this);
358            }
359        }
360
361        protected void dispose() {
362            try {
363                commitLock.acquireRead(this, ACCESS_TIMEOUT);
364                super.dispose();
365                setFactory.disposeSet(externalDeletes);
366                externalDeletes = null;
367                mapFactory.disposeMap(externalChanges);
368                externalChanges = null;
369                mapFactory.disposeMap(externalAdds);
370                externalAdds = null;
371            } catch (InterruptedException e) {
372            } finally {
373                commitLock.release(this);
374            }
375        }
376
377        protected void finalize() throws Throwable {
378            activeTransactions.remove(this);
379            super.finalize();
380        }
381    }
382}