/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.commons.transaction.memory;

import java.io.PrintWriter;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.Collections;

import org.apache.commons.transaction.locking.ReadWriteLock;
import org.apache.commons.transaction.util.LoggerFacade;
import org.apache.commons.transaction.util.PrintWriterLogger;

/**
 * Wrapper that adds transactional control to all kinds of maps that implement the {@link Map} interface. By using
 * a naive optimistic transaction control this wrapper has better isolation than {@link TransactionalMapWrapper}, but
 * may also fail to commit.
 *
 * <br>
 * Start a transaction by calling {@link #startTransaction()}. Then perform the normal actions on the map and
 * finally either call {@link #commitTransaction()} to make your changes permanent or {@link #rollbackTransaction()} to
 * undo them.
 * <br>
 * <em>Caution:</em> Do not modify values retrieved by {@link #get(Object)} as this will circumvent the transactional mechanism.
 * Rather clone the value or copy it in a way you see fit and store it back using {@link #put(Object, Object)}.
 * <br>
 * <em>Note:</em> This wrapper guarantees isolation level <code>SERIALIZABLE</code>.
 * <br>
 * <em>Caution:</em> This implementation might be slow when large amounts of data are changed in a transaction, as many
 * references will need to be copied around.
 * <br>
 * <em>Interruption:</em> All map operations of a transaction wait for an internal commit lock. When the calling thread
 * is interrupted while waiting, the operation is abandoned (see the individual methods of {@link CopyingTxContext})
 * and the interrupt status of the thread is restored so that callers can detect the interruption.
 *
 * @version $Id: OptimisticMapWrapper.java 493628 2007-01-07 01:42:48Z joerg $
 * @see TransactionalMapWrapper
 * @see PessimisticMapWrapper
 */
public class OptimisticMapWrapper extends TransactionalMapWrapper {

    /** Timeout in milliseconds handed to the commit lock when acquiring it for writing during commit. */
    protected static final int COMMIT_TIMEOUT = 1000 * 60; // 1 minute

    /** Timeout in milliseconds handed to the commit lock when acquiring it for reading during map access. */
    protected static final int ACCESS_TIMEOUT = 1000 * 30; // 30 seconds

    /** Synchronized set of the {@link CopyingTxContext}s of all transactions that are started but not yet finished. */
    protected Set activeTransactions;

    /** Logger handed to the commit lock. */
    protected LoggerFacade logger;

    /** Lock acquired for writing by a committing transaction and for reading by every other map access. */
    protected ReadWriteLock commitLock;

    /**
     * Creates a new optimistic transactional map wrapper. Temporary maps and sets to store transactional
     * data will be instances of {@link java.util.HashMap} and {@link java.util.HashSet}.
     *
     * @param wrapped map to be wrapped
     */
    public OptimisticMapWrapper(Map wrapped) {
        this(wrapped, new HashMapFactory(), new HashSetFactory());
    }

    /**
     * Creates a new optimistic transactional map wrapper. Temporary maps and sets to store transactional
     * data will be created and disposed using {@link MapFactory} and {@link SetFactory}.
     * Logging goes to {@link System#out}.
     *
     * @param wrapped map to be wrapped
     * @param mapFactory factory for temporary maps
     * @param setFactory factory for temporary sets
     */
    public OptimisticMapWrapper(Map wrapped, MapFactory mapFactory, SetFactory setFactory) {
        this(wrapped, mapFactory, setFactory, new PrintWriterLogger(new PrintWriter(System.out),
                OptimisticMapWrapper.class.getName(), false));
    }

    /**
     * Creates a new optimistic transactional map wrapper. Temporary maps and sets to store transactional
     * data will be created and disposed using {@link MapFactory} and {@link SetFactory}.
     *
     * @param wrapped map to be wrapped
     * @param mapFactory factory for temporary maps
     * @param setFactory factory for temporary sets
     * @param logger
     *            generic logger used for all kinds of logging
     */
    public OptimisticMapWrapper(Map wrapped, MapFactory mapFactory, SetFactory setFactory, LoggerFacade logger) {
        super(wrapped, mapFactory, setFactory);
        activeTransactions = Collections.synchronizedSet(new HashSet());
        this.logger = logger;
        commitLock = new ReadWriteLock("COMMIT", logger);
    }

    /**
     * Starts a new transaction, registers it as active and associates it with the current thread.
     *
     * @throws IllegalStateException if the current thread is already associated with a transaction
     */
    public void startTransaction() {
        if (getActiveTx() != null) {
            throw new IllegalStateException(
                    "Active thread " + Thread.currentThread() + " already associated with a transaction!");
        }
        CopyingTxContext context = new CopyingTxContext();
        activeTransactions.add(context);
        setActiveTx(context);
    }

    /**
     * Rolls back the transaction associated with the current thread and removes it from the set of active
     * transactions. The removal also happens when the superclass rollback fails, so that a failed rollback
     * does not leave a stale context behind that would keep receiving copies of other transactions' changes.
     */
    public void rollbackTransaction() {
        TxContext txContext = getActiveTx();
        try {
            super.rollbackTransaction();
        } finally {
            if (txContext != null) {
                activeTransactions.remove(txContext);
            }
        }
    }

    /**
     * Commits the transaction associated with the current thread after checking for conflicts.
     * Same as {@link #commitTransaction(boolean) commitTransaction(false)}.
     *
     * @throws ConflictException if a conflict was detected or the thread was interrupted while waiting
     *             for the commit lock
     */
    public void commitTransaction() throws ConflictException {
        commitTransaction(false);
    }

    /**
     * Commits the transaction associated with the current thread.
     *
     * @param force <code>true</code> to skip the conflict check and commit unconditionally
     * @throws ConflictException if <code>force</code> is <code>false</code> and {@link #checkForConflicts()}
     *             reports a conflicting key, or if the thread was interrupted while waiting for the commit
     *             lock (the interrupt status is restored in that case)
     * @throws IllegalStateException if the current thread has no transaction or the transaction is marked
     *             for rollback
     */
    public void commitTransaction(boolean force) throws ConflictException {
        TxContext txContext = getActiveTx();

        if (txContext == null) {
            throw new IllegalStateException(
                    "Active thread " + Thread.currentThread() + " not associated with a transaction!");
        }

        if (txContext.status == STATUS_MARKED_ROLLBACK) {
            throw new IllegalStateException("Active thread " + Thread.currentThread() + " is marked for rollback!");
        }

        try {
            // in this final commit phase we need to be the only one accessing the map
            // to make sure no one adds an entry after we checked for conflicts
            commitLock.acquireWrite(txContext, COMMIT_TIMEOUT);

            if (!force) {
                Object conflictKey = checkForConflicts();
                if (conflictKey != null) {
                    throw new ConflictException(conflictKey);
                }
            }

            // remove ourselves first so that we do not copy our own changes to ourselves
            activeTransactions.remove(txContext);
            copyChangesToConcurrentTransactions();
            super.commitTransaction();

        } catch (InterruptedException e) {
            // keep the interruption visible to the caller instead of silently clearing it
            Thread.currentThread().interrupt();
            // XXX a bit dirty ;)
            throw new ConflictException(e);
        } finally {
            commitLock.release(txContext);
        }
    }

    // TODO: Shouldn't we return a collection rather than a single key here?
    /**
     * Checks whether any key changed by the current thread's transaction has also been changed by another
     * transaction that committed since this one started.
     *
     * @return one conflicting key, or <code>null</code> if there is no conflict
     * @throws IllegalStateException if the current thread is not associated with a transaction
     */
    public Object checkForConflicts() {
        CopyingTxContext txContext = (CopyingTxContext) getActiveTx();

        if (txContext == null) {
            throw new IllegalStateException(
                    "Active thread " + Thread.currentThread() + " not associated with a transaction!");
        }

        Set keys = txContext.changedKeys();
        Set externalKeys = txContext.externalChangedKeys();

        for (Iterator it2 = keys.iterator(); it2.hasNext();) {
            Object key = it2.next();
            if (externalKeys.contains(key)) {
                return key;
            }
        }
        return null;
    }

    /**
     * Records, in every other active transaction, the current content of the wrapped map for all keys the
     * committing transaction of the current thread is about to change. It is called from
     * {@link #commitTransaction(boolean)} before the superclass commit, so the other transactions keep
     * seeing the values from before this commit.
     */
    protected void copyChangesToConcurrentTransactions() {
        CopyingTxContext thisTxContext = (CopyingTxContext) getActiveTx();

        // manual synchronization is required for iterating over a synchronized set
        synchronized (activeTransactions) {
            for (Iterator it = activeTransactions.iterator(); it.hasNext();) {
                CopyingTxContext otherTxContext = (CopyingTxContext) it.next();

                // no need to copy data if the other transaction does not access global map anyway
                if (otherTxContext.cleared) {
                    continue;
                }

                if (thisTxContext.cleared) {
                    // we will clear everything, so we have to copy everything before
                    otherTxContext.externalChanges.putAll(wrapped);
                } else {
                    // no need to check if we have already copied everything
                    for (Iterator it2 = thisTxContext.changes.entrySet().iterator(); it2.hasNext();) {
                        Map.Entry entry = (Map.Entry) it2.next();
                        Object value = wrapped.get(entry.getKey());
                        if (value != null) {
                            // undo change
                            otherTxContext.externalChanges.put(entry.getKey(), value);
                        } else {
                            // undo add
                            otherTxContext.externalDeletes.add(entry.getKey());
                        }
                    }

                    for (Iterator it2 = thisTxContext.deletes.iterator(); it2.hasNext();) {
                        // undo delete
                        Object key = it2.next();
                        Object value = wrapped.get(key);
                        otherTxContext.externalChanges.put(key, value);
                    }
                }
            }
        }
    }

    /**
     * Transaction context that additionally keeps track of changes committed by other transactions while
     * this one is active. Every operation acquires the commit lock for reading first. If the calling thread
     * is interrupted while waiting for the lock, the operation is not performed and the interrupt status of
     * the thread is restored.
     */
    public class CopyingTxContext extends TxContext {

        /** Values the wrapped map held for keys that another transaction changed or deleted on commit. */
        protected Map externalChanges;

        /*
         * NOTE(review): nothing in this class ever puts entries into externalAdds; values of externally
         * deleted keys are stored in externalChanges instead. keys() and size() only consult externalAdds,
         * so they may disagree with get() for such keys - confirm against TransactionalMapWrapper.
         */
        protected Map externalAdds;

        /** Keys that were absent from the wrapped map before another transaction added them on commit. */
        protected Set externalDeletes;

        /**
         * Creates the context and its temporary collections using the factories of the enclosing wrapper.
         */
        protected CopyingTxContext() {
            super();
            externalChanges = mapFactory.createMap();
            externalDeletes = setFactory.createSet();
            externalAdds = mapFactory.createMap();
        }

        /**
         * @return a new set with all keys recorded as changed, added or deleted by other transactions
         */
        protected Set externalChangedKeys() {
            Set keySet = new HashSet();
            keySet.addAll(externalDeletes);
            keySet.addAll(externalChanges.keySet());
            keySet.addAll(externalAdds.keySet());
            return keySet;
        }

        /**
         * @return a new set with all keys changed, added or deleted by this transaction
         */
        protected Set changedKeys() {
            Set keySet = new HashSet();
            keySet.addAll(deletes);
            keySet.addAll(changes.keySet());
            keySet.addAll(adds.keySet());
            return keySet;
        }

        /**
         * @return the keys visible to this transaction, or <code>null</code> if the thread was interrupted
         *         while waiting for the commit lock
         */
        protected Set keys() {
            try {
                commitLock.acquireRead(this, ACCESS_TIMEOUT);
                Set keySet = super.keys();
                keySet.removeAll(externalDeletes);
                keySet.addAll(externalAdds.keySet());
                return keySet;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            } finally {
                commitLock.release(this);
            }
        }

        /**
         * Looks up a value as seen by this transaction: own changes first, then the values recorded from
         * before other transactions committed, then the wrapped map.
         *
         * @param key key to look up
         * @return the value visible to this transaction, or <code>null</code> if there is none or the thread
         *         was interrupted while waiting for the commit lock
         */
        protected Object get(Object key) {
            try {
                commitLock.acquireRead(this, ACCESS_TIMEOUT);

                if (deletes.contains(key)) {
                    // reflects that entry has been deleted in this tx
                    return null;
                }

                Object changed = changes.get(key);
                if (changed != null) {
                    return changed;
                }

                Object added = adds.get(key);
                if (added != null) {
                    return added;
                }

                if (cleared) {
                    return null;
                } else {
                    if (externalDeletes.contains(key)) {
                        // entry was added by another transaction that committed after this one
                        // started, so it must stay invisible to this tx
                        return null;
                    }

                    changed = externalChanges.get(key);
                    if (changed != null) {
                        return changed;
                    }

                    added = externalAdds.get(key);
                    if (added != null) {
                        return added;
                    }

                    // not modified in this tx
                    return wrapped.get(key);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            } finally {
                commitLock.release(this);
            }
        }

        /**
         * Stores a value in this transaction. Nothing is stored if the thread is interrupted while waiting
         * for the commit lock.
         *
         * @param key key to store the value under
         * @param value value to store
         */
        protected void put(Object key, Object value) {
            try {
                commitLock.acquireRead(this, ACCESS_TIMEOUT);
                super.put(key, value);
            } catch (InterruptedException e) {
                // the put did not happen; restore the flag so the caller can find out
                Thread.currentThread().interrupt();
            } finally {
                commitLock.release(this);
            }
        }

        /**
         * Removes a key in this transaction. Nothing is removed if the thread is interrupted while waiting
         * for the commit lock.
         *
         * @param key key to remove
         */
        protected void remove(Object key) {
            try {
                commitLock.acquireRead(this, ACCESS_TIMEOUT);
                super.remove(key);
            } catch (InterruptedException e) {
                // the remove did not happen; restore the flag so the caller can find out
                Thread.currentThread().interrupt();
            } finally {
                commitLock.release(this);
            }
        }

        /**
         * @return the superclass size minus the number of external deletes plus the number of external
         *         adds, or <code>-1</code> if the thread was interrupted while waiting for the commit lock
         */
        protected int size() {
            try {
                commitLock.acquireRead(this, ACCESS_TIMEOUT);
                int size = super.size();

                size -= externalDeletes.size();
                size += externalAdds.size();

                return size;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return -1;
            } finally {
                commitLock.release(this);
            }
        }

        /**
         * Clears the map as seen by this transaction and forgets all recorded external changes. Nothing is
         * cleared if the thread is interrupted while waiting for the commit lock.
         */
        protected void clear() {
            try {
                commitLock.acquireRead(this, ACCESS_TIMEOUT);
                super.clear();
                externalDeletes.clear();
                externalChanges.clear();
                externalAdds.clear();
            } catch (InterruptedException e) {
                // the clear did not happen; restore the flag so the caller can find out
                Thread.currentThread().interrupt();
            } finally {
                commitLock.release(this);
            }
        }

        /**
         * Delegates to the superclass merge while holding the commit lock for reading. Nothing is merged if
         * the thread is interrupted while waiting for the commit lock.
         */
        protected void merge() {
            try {
                commitLock.acquireRead(this, ACCESS_TIMEOUT);
                super.merge();
            } catch (InterruptedException e) {
                // the merge did not happen; restore the flag so the caller can find out
                Thread.currentThread().interrupt();
            } finally {
                commitLock.release(this);
            }
        }

        /**
         * Disposes the temporary collections of this context through the factories that created them.
         * Nothing is disposed if the thread is interrupted while waiting for the commit lock.
         */
        protected void dispose() {
            try {
                commitLock.acquireRead(this, ACCESS_TIMEOUT);
                super.dispose();
                setFactory.disposeSet(externalDeletes);
                externalDeletes = null;
                mapFactory.disposeMap(externalChanges);
                externalChanges = null;
                mapFactory.disposeMap(externalAdds);
                externalAdds = null;
            } catch (InterruptedException e) {
                // the dispose did not happen; restore the flag so the caller can find out
                Thread.currentThread().interrupt();
            } finally {
                commitLock.release(this);
            }
        }

        /**
         * Removes this context from the set of active transactions before it is garbage collected.
         */
        protected void finalize() throws Throwable {
            activeTransactions.remove(this);
            super.finalize();
        }
    }
}