View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.transaction.memory;
18  
19  import java.io.PrintWriter;
20  import java.util.HashSet;
21  import java.util.Iterator;
22  import java.util.Map;
23  import java.util.Set;
24  import java.util.Collections;
25  
26  import org.apache.commons.transaction.locking.ReadWriteLock;
27  import org.apache.commons.transaction.util.LoggerFacade;
28  import org.apache.commons.transaction.util.PrintWriterLogger;
29  
30  /**
31   * Wrapper that adds transactional control to all kinds of maps that implement the {@link Map} interface. By using
32   * a naive optimistic transaction control this wrapper has better isolation than {@link TransactionalMapWrapper}, but
33   * may also fail to commit. 
34   *  
35   * <br>
36   * Start a transaction by calling {@link #startTransaction()}. Then perform the normal actions on the map and
37   * finally either call {@link #commitTransaction()} to make your changes permanent or {@link #rollbackTransaction()} to
38   * undo them.
39   * <br>
40   * <em>Caution:</em> Do not modify values retrieved by {@link #get(Object)} as this will circumvent the transactional mechanism.
41   * Rather clone the value or copy it in a way you see fit and store it back using {@link #put(Object, Object)}.
42   * <br>
43   * <em>Note:</em> This wrapper guarantees isolation level <code>SERIALIZABLE</code>.
44   * <br>
45   * <em>Caution:</em> This implementation might be slow when large amounts of data is changed in a transaction as much references will need to be copied around.
46   * 
47   * @version $Id: OptimisticMapWrapper.java 493628 2007-01-07 01:42:48Z joerg $
48   * @see TransactionalMapWrapper
49   * @see PessimisticMapWrapper
50   */
51  public class OptimisticMapWrapper extends TransactionalMapWrapper {
52  
53      protected static final int COMMIT_TIMEOUT = 1000 * 60; // 1 minute
54      protected static final int ACCESS_TIMEOUT = 1000 * 30; // 30 seconds
55  
56      protected Set activeTransactions;
57  
58      protected LoggerFacade logger;
59  
60      protected ReadWriteLock commitLock;
61  
62      /**
63       * Creates a new optimistic transactional map wrapper. Temporary maps and sets to store transactional
64       * data will be instances of {@link java.util.HashMap} and {@link java.util.HashSet}. 
65       * 
66       * @param wrapped map to be wrapped
67       */
68      public OptimisticMapWrapper(Map wrapped) {
69          this(wrapped, new HashMapFactory(), new HashSetFactory());
70      }
71  
72      /**
73       * Creates a new optimistic transactional map wrapper. Temporary maps and sets to store transactional
74       * data will be created and disposed using {@link MapFactory} and {@link SetFactory}.
75       * 
76       * @param wrapped map to be wrapped
77       * @param mapFactory factory for temporary maps
78       * @param setFactory factory for temporary sets
79       */
80      public OptimisticMapWrapper(Map wrapped, MapFactory mapFactory, SetFactory setFactory) {
81          this(wrapped, mapFactory, setFactory, new PrintWriterLogger(new PrintWriter(System.out),
82                  OptimisticMapWrapper.class.getName(), false));
83      }
84  
85      /**
86       * Creates a new optimistic transactional map wrapper. Temporary maps and sets to store transactional
87       * data will be created and disposed using {@link MapFactory} and {@link SetFactory}.
88       * 
89       * @param wrapped map to be wrapped
90       * @param mapFactory factory for temporary maps
91       * @param setFactory factory for temporary sets
92       * @param logger
93       *            generic logger used for all kinds of logging
94       */
95      public OptimisticMapWrapper(Map wrapped, MapFactory mapFactory, SetFactory setFactory, LoggerFacade logger) {
96          super(wrapped, mapFactory, setFactory);
97          activeTransactions = Collections.synchronizedSet(new HashSet());
98          this.logger = logger;
99          commitLock = new ReadWriteLock("COMMIT", logger);
100     }
101 
102     public void startTransaction() {
103         if (getActiveTx() != null) {
104             throw new IllegalStateException(
105                 "Active thread " + Thread.currentThread() + " already associated with a transaction!");
106         }
107         CopyingTxContext context = new CopyingTxContext();
108         activeTransactions.add(context);
109         setActiveTx(context);
110     }
111 
112     public void rollbackTransaction() {
113         TxContext txContext = getActiveTx();
114         super.rollbackTransaction();
115         activeTransactions.remove(txContext);
116     }
117 
118     public void commitTransaction() throws ConflictException {
119         commitTransaction(false);
120     }
121 
122     public void commitTransaction(boolean force) throws ConflictException {
123         TxContext txContext = getActiveTx();
124         
125         if (txContext == null) {
126             throw new IllegalStateException(
127                 "Active thread " + Thread.currentThread() + " not associated with a transaction!");
128         }
129 
130         if (txContext.status == STATUS_MARKED_ROLLBACK) {
131             throw new IllegalStateException("Active thread " + Thread.currentThread() + " is marked for rollback!");
132         }
133         
134         try {
135             // in this final commit phase we need to be the only one access the map
136             // to make sure no one adds an entry after we checked for conflicts
137             commitLock.acquireWrite(txContext, COMMIT_TIMEOUT);
138 
139             if (!force) {
140                 Object conflictKey = checkForConflicts();
141                 if (conflictKey != null) {
142                     throw new ConflictException(conflictKey);
143                 }
144             }
145     
146             activeTransactions.remove(txContext);
147             copyChangesToConcurrentTransactions();
148             super.commitTransaction();
149             
150         } catch (InterruptedException e) {
151             // XXX a bit dirty ;)
152             throw new ConflictException(e);
153         } finally {
154             commitLock.release(txContext);
155         }
156     }
157 
158     // TODO: Shouldn't we return a collection rather than a single key here?
159     public Object checkForConflicts() {
160         CopyingTxContext txContext = (CopyingTxContext) getActiveTx();
161 
162         Set keys = txContext.changedKeys();
163         Set externalKeys = txContext.externalChangedKeys();
164 
165         for (Iterator it2 = keys.iterator(); it2.hasNext();) {
166             Object key = it2.next();
167             if (externalKeys.contains(key)) {
168                 return key;
169             }
170         }
171         return null;
172     }
173 
174     protected void copyChangesToConcurrentTransactions() {
175         CopyingTxContext thisTxContext = (CopyingTxContext) getActiveTx();
176 
177         synchronized (activeTransactions) {
178             for (Iterator it = activeTransactions.iterator(); it.hasNext();) {
179                 CopyingTxContext otherTxContext = (CopyingTxContext) it.next();
180 
181                 // no need to copy data if the other transaction does not access global map anyway
182                 if (otherTxContext.cleared)
183                     continue;
184 
185                 if (thisTxContext.cleared) {
186                     // we will clear everything, so we have to copy everything before
187                     otherTxContext.externalChanges.putAll(wrapped);
188                 } else // no need to check if we have already copied everthing
189                 {
190                     for (Iterator it2 = thisTxContext.changes.entrySet().iterator(); it2.hasNext();) {
191                         Map.Entry entry = (Map.Entry) it2.next();
192                         Object value = wrapped.get(entry.getKey());
193                         if (value != null) {
194                             // undo change
195                             otherTxContext.externalChanges.put(entry.getKey(), value);
196                         } else {
197                             // undo add
198                             otherTxContext.externalDeletes.add(entry.getKey());
199                         }
200                     }
201 
202                     for (Iterator it2 = thisTxContext.deletes.iterator(); it2.hasNext();) {
203                         // undo delete
204                         Object key = it2.next();
205                         Object value = wrapped.get(key);
206                         otherTxContext.externalChanges.put(key, value);
207                     }
208                 }
209             }
210         }
211     }
212 
213     public class CopyingTxContext extends TxContext {
214         protected Map externalChanges;
215         protected Map externalAdds;
216         protected Set externalDeletes;
217 
218         protected CopyingTxContext() {
219             super();
220             externalChanges = mapFactory.createMap();
221             externalDeletes = setFactory.createSet();
222             externalAdds = mapFactory.createMap();
223         }
224 
225         protected Set externalChangedKeys() {
226             Set keySet = new HashSet();
227             keySet.addAll(externalDeletes);
228             keySet.addAll(externalChanges.keySet());
229             keySet.addAll(externalAdds.keySet());
230             return keySet;
231         }
232 
233         protected Set changedKeys() {
234             Set keySet = new HashSet();
235             keySet.addAll(deletes);
236             keySet.addAll(changes.keySet());
237             keySet.addAll(adds.keySet());
238             return keySet;
239         }
240 
241         protected Set keys() {
242             try {
243                 commitLock.acquireRead(this, ACCESS_TIMEOUT);
244                 Set keySet = super.keys();
245                 keySet.removeAll(externalDeletes);
246                 keySet.addAll(externalAdds.keySet());
247                 return keySet;
248             } catch (InterruptedException e) {
249                 return null;
250             } finally {
251                 commitLock.release(this);
252             }
253         }
254 
255         protected Object get(Object key) {
256             try {
257                 commitLock.acquireRead(this, ACCESS_TIMEOUT);
258 
259                 if (deletes.contains(key)) {
260                     // reflects that entry has been deleted in this tx 
261                     return null;
262                 }
263     
264                 Object changed = changes.get(key);
265                 if (changed != null) {
266                     return changed;
267                 }
268     
269                 Object added = adds.get(key);
270                 if (added != null) {
271                     return added;
272                 }
273     
274                 if (cleared) {
275                     return null;
276                 } else {
277                     if (externalDeletes.contains(key)) {
278                         // reflects that entry has been deleted in this tx 
279                         return null;
280                     }
281     
282                     changed = externalChanges.get(key);
283                     if (changed != null) {
284                         return changed;
285                     }
286     
287                     added = externalAdds.get(key);
288                     if (added != null) {
289                         return added;
290                     }
291     
292                     // not modified in this tx
293                     return wrapped.get(key);
294                 }
295             } catch (InterruptedException e) {
296                 return null;
297             } finally {
298                 commitLock.release(this);
299             }
300         }
301 
302         protected void put(Object key, Object value) {
303             try {
304                 commitLock.acquireRead(this, ACCESS_TIMEOUT);
305                 super.put(key, value);
306             } catch (InterruptedException e) {
307             } finally {
308                 commitLock.release(this);
309             }
310         }
311 
312         protected void remove(Object key) {
313             try {
314                 commitLock.acquireRead(this, ACCESS_TIMEOUT);
315                 super.remove(key);
316             } catch (InterruptedException e) {
317             } finally {
318                 commitLock.release(this);
319             }
320         }
321 
322         protected int size() {
323             try {
324                 commitLock.acquireRead(this, ACCESS_TIMEOUT);
325                 int size = super.size();
326     
327                 size -= externalDeletes.size();
328                 size += externalAdds.size();
329     
330                 return size;
331             } catch (InterruptedException e) {
332                 return -1;
333             } finally {
334                 commitLock.release(this);
335             }
336         }
337 
338         protected void clear() {
339             try {
340                 commitLock.acquireRead(this, ACCESS_TIMEOUT);
341                 super.clear();
342                 externalDeletes.clear();
343                 externalChanges.clear();
344                 externalAdds.clear();
345             } catch (InterruptedException e) {
346             } finally {
347                 commitLock.release(this);
348             }
349         }
350 
351         protected void merge() {
352             try {
353                 commitLock.acquireRead(this, ACCESS_TIMEOUT);
354                 super.merge();
355             } catch (InterruptedException e) {
356             } finally {
357                 commitLock.release(this);
358             }
359         }
360 
361         protected void dispose() {
362             try {
363                 commitLock.acquireRead(this, ACCESS_TIMEOUT);
364                 super.dispose();
365                 setFactory.disposeSet(externalDeletes);
366                 externalDeletes = null;
367                 mapFactory.disposeMap(externalChanges);
368                 externalChanges = null;
369                 mapFactory.disposeMap(externalAdds);
370                 externalAdds = null;
371             } catch (InterruptedException e) {
372             } finally {
373                 commitLock.release(this);
374             }
375         }
376 
377         protected void finalize() throws Throwable {
378             activeTransactions.remove(this);
379             super.finalize();
380         }
381     }
382 }