001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.commons.transaction.file; 018 019import java.io.BufferedReader; 020import java.io.BufferedWriter; 021import java.io.File; 022import java.io.FileInputStream; 023import java.io.FileNotFoundException; 024import java.io.FileOutputStream; 025import java.io.IOException; 026import java.io.InputStream; 027import java.io.InputStreamReader; 028import java.io.OutputStream; 029import java.io.OutputStreamWriter; 030import java.util.ArrayList; 031import java.util.Collection; 032import java.util.HashMap; 033import java.util.List; 034import java.util.Map; 035import java.util.Iterator; 036import java.util.Collections; 037 038import org.apache.commons.transaction.locking.GenericLock; 039import org.apache.commons.transaction.locking.GenericLockManager; 040import org.apache.commons.transaction.locking.LockException; 041import org.apache.commons.transaction.locking.LockManager2; 042import org.apache.commons.transaction.util.FileHelper; 043import org.apache.commons.transaction.util.LoggerFacade; 044 045/** 046 * A resource manager for streamable objects stored in a file system. 047 * 048 * It is intended for developer and "out of the box" use. 049 * It is <em>not</em> intended to be a real alternative for 050 * a full blown DMBS (of course it can not be compared to a RDBMS at all). 051 * 052 * Major features:<br> 053 * <ul> 054 * <li>Transactions performed with this class more or less comform to the widely accepted ACID properties 055 * <li>Reading should be as fast as from the ordinary file system (at the cost of a bit slower commits) 056 * </ul> 057 * 058 * Compared to a "real" DBMS major limitations are (in order of assumed severity):<br> 059 * <ul> 060 * <li>Number of simultaneously open resources is limited to the number of available file descriptors 061 * <li>It does not scale a bit 062 * <li>Pessimistic transaction and locking scheme 063 * <li>Isolation level currently is restricted to <em>read committed</em> and <em>repeated read</em> (which is not that bad) 064 * </ul> 065 * 066 * <em>Important</em>: If possible you should have the work and store directory located in the 067 * same file system. If not, you might get additional problems, as there are: 068 * <ul> 069 * <li>On commit it might be necessay to copy files instead of rename/relink them. This may lead to time consuming, 070 * overly blocking commit phases and higher risk of corrupted files 071 * <li>Prepare phase might be too permissive, no check for sufficient memory on store file system is possible 072 * </ul> 073 * 074 * General limitations include:<br> 075 * <ul> 076 * <li>Due to lack of synchronization on the transaction context level, every transaction may only be 077 * accessed by a <em>single thread</em> throughout its full life. 078 * This means it is forbidden for a thread that has not started a transaction 079 * to perform any operations inside this transaction. However, threads associated 080 * with different transactions can safely access these methods concurrently. 081 * Reasons for the lack of synchronization are improved performance and simplicity (of the code of this class). 082 * <li>There is no dedicated class for a transaction. Having such a class would be better practice and 083 * make certain actions more intuitive. 084 * <li>Resource identifiers need a reasonsable string representation obtainable by <code>toString</code>. 085 * More specifically, they will have to resolve to a <em>valid</em> file path that does note denote a directory. 086 * If it does, you might be able to create it, but not to read or write anything 087 * from resp. to it. Valid string representations of a resource idenfier are 088 * for example "file" "/root" or "hjfhdfhuhuhsdufhdsufhdsufhdfuhdfduhduhduhdu". 089 * Invalid are for example "/" or "/root/". Invalid on some file systems are for example "c:" or "file://huhu". 090 * <li>As there are no active processes inside this RM and it shares its threads with the application, 091 * control over transactions is limited to points where the application calls the RM. 092 * In particular, this disables <em>active</em> termination of transactions upon timeout. 093 * <li>There is no notion of a connection to this file manager. This means you can not connect from hosts other than 094 * local and you will get problems when plugging this store into a J2EE store using connectors. 095 * <li>Methods should throw more specific exceptions 096 * </ul> 097 * 098 * <p><em>Caution</em>:<br> 099 * The <code>txId</code> passed to many methods as an identifier for the 100 * transaction concerned will function as a key in a <code>HashMap</code>. 101 * Thus assure that <code>equals</code> and <code>hashCode</code> are both 102 * properly implemented and match each other.</p> 103 * 104 * <p><em>Caution</em>:<br> 105 * You will have to guarantee that no other process will access neither 106 * the store or the working dir concurrently to this <code>FileResourceManager</code>.</p> 107 * 108 * <p><em>Special Caution</em>:<br> 109 * Be very careful not to have two instances of <code>FileResourceManager</code> 110 * working in the same store and/or working dir. 111 * 112 * @version $Id: FileResourceManager.java 573315 2007-09-06 16:28:42Z ozeigermann $ 113 */ 114public class FileResourceManager implements ResourceManager, ResourceManagerErrorCodes { 115 116 // reflects the natural isolation level of this store 117 protected static final int NATIVE_ISOLATION_LEVEL = ISOLATION_LEVEL_REPEATABLE_READ; 118 protected static final int DEFAULT_ISOLATION_LEVEL = NATIVE_ISOLATION_LEVEL; 119 120 protected static final int NO_LOCK = 0; 121 protected static final int LOCK_ACCESS = NO_LOCK + 1; 122 protected static final int LOCK_SHARED = NO_LOCK + 2; 123 protected static final int LOCK_EXCLUSIVE = NO_LOCK + 3; 124 protected static final int LOCK_COMMIT = NO_LOCK + 4; 125 126 protected static final int OPERATION_MODE_STOPPED = 0; 127 protected static final int OPERATION_MODE_STOPPING = 1; 128 protected static final int OPERATION_MODE_STARTED = 2; 129 protected static final int OPERATION_MODE_STARTING = 3; 130 protected static final int OPERATION_MODE_RECOVERING = 4; 131 132 protected static final String DEFAULT_PARAMETER_ENCODING = "ISO-8859-15"; 133 134 protected static final int DEFAULT_TIMEOUT_MSECS = 5000; 135 protected static final int DEFAULT_COMMIT_TIMEOUT_FACTOR = 2; 136 137 protected static final String WORK_CHANGE_DIR = "change"; 138 protected static final String WORK_DELETE_DIR = "delete"; 139 140 protected static final String CONTEXT_FILE = "transaction.log"; 141 142 /* 143 * --- Static helper methods --- 144 * 145 * 146 */ 147 148 protected static void applyDeletes(File removeDir, File targetDir, File rootDir) 149 throws IOException { 150 if (removeDir.isDirectory() && targetDir.isDirectory()) { 151 File[] files = removeDir.listFiles(); 152 for (int i = 0; i < files.length; i++) { 153 File removeFile = files[i]; 154 File targetFile = new File(targetDir, removeFile.getName()); 155 if (removeFile.isFile()) { 156 if (targetFile.exists()) { 157 if (!targetFile.delete()) { 158 throw new IOException("Could not delete file " + removeFile.getName() 159 + " in directory targetDir"); 160 } 161 } 162 // indicate, this has been done 163 removeFile.delete(); 164 } else { 165 applyDeletes(removeFile, targetFile, rootDir); 166 } 167 } 168 // delete empty target directories, except root dir 169 if (!targetDir.equals(rootDir) && targetDir.list().length == 0) { 170 targetDir.delete(); 171 } 172 } 173 } 174 175 /* 176 * --- object members --- 177 * 178 * 179 */ 180 181 protected String workDir; 182 protected String storeDir; 183 protected boolean cleanUp = true; 184 protected boolean dirty = false; 185 protected int operationMode = OPERATION_MODE_STOPPED; 186 protected long defaultTimeout = DEFAULT_TIMEOUT_MSECS; 187 protected boolean debug; 188 189 protected LoggerFacade logger; 190 191 protected Map globalTransactions; 192 protected List globalOpenResources; 193 protected LockManager2 lockManager; 194 195 protected ResourceIdToPathMapper idMapper = null; 196 protected TransactionIdToPathMapper txIdMapper = null; 197 198 protected int idCnt = 0; 199 200 /* 201 * --- ctor and general getter / setter methods --- 202 * 203 * 204 */ 205 206 /** 207 * Creates a new resource manager operation on the specified directories. 208 * 209 * @param storeDir directory where main data should go after commit 210 * @param workDir directory where transactions store temporary data 211 * @param urlEncodePath if set to <code>true</code> encodes all paths to allow for any kind of characters 212 * @param logger the logger to be used by this store 213 */ 214 public FileResourceManager(String storeDir, String workDir, boolean urlEncodePath, LoggerFacade logger) { 215 this(storeDir, workDir, urlEncodePath, logger, false); 216 } 217 218 /** 219 * Creates a new resource manager operation on the specified directories. 220 * 221 * @param storeDir directory where main data should go after commit 222 * @param workDir directory where transactions store temporary data 223 * @param urlEncodePath if set to <code>true</code> encodes all paths to allow for any kind of characters 224 * @param logger the logger to be used by this store 225 * @param debug if set to <code>true</code> logs all locking information to "transaction.log" for debugging inspection 226 */ 227 public FileResourceManager( 228 String storeDir, 229 String workDir, 230 boolean urlEncodePath, 231 LoggerFacade logger, 232 boolean debug) { 233 this(storeDir, workDir, urlEncodePath ? new URLEncodeIdMapper() : null, new NoOpTransactionIdToPathMapper(), logger, debug); 234 } 235 236 /** 237 * Creates a new resource manager operation on the specified directories. 238 * This constructor is reintroduced for backwards API compatibility and is used by jakarta-slide. 239 * 240 * @param storeDir directory where main data should go after commit 241 * @param workDir directory where transactions store temporary data 242 * @param idMapper mapper for resourceId to path 243 * @param logger the logger to be used by this store 244 * @param debug if set to <code>true</code> logs all locking information to "transaction.log" for debugging inspection 245 */ 246 public FileResourceManager( 247 String storeDir, 248 String workDir, 249 ResourceIdToPathMapper idMapper, 250 LoggerFacade logger, 251 boolean debug) { 252 this(storeDir, workDir, idMapper, new NoOpTransactionIdToPathMapper(), logger, debug); 253 } 254 /** 255 * Creates a new resource manager operation on the specified directories. 256 * 257 * @param storeDir directory where main data should go after commit 258 * @param workDir directory where transactions store temporary data 259 * @param idMapper mapper for resourceId to path 260 * @param txIdMapper mapper for transaction id to path 261 * @param logger the logger to be used by this store 262 * @param debug if set to <code>true</code> logs all locking information to "transaction.log" for debugging inspection 263 */ 264 public FileResourceManager( 265 String storeDir, 266 String workDir, 267 ResourceIdToPathMapper idMapper, 268 TransactionIdToPathMapper txIdMapper, 269 LoggerFacade logger, 270 boolean debug) { 271 this.workDir = workDir; 272 this.storeDir = storeDir; 273 this.idMapper = idMapper; 274 this.txIdMapper = txIdMapper; 275 this.logger = logger; 276 this.debug = debug; 277 } 278 279 /** 280 * Gets the store directory. 281 * 282 * @return the store directory 283 * @see #FileResourceManager(String, String, boolean, LoggerFacade) 284 * @see #FileResourceManager(String, String, boolean, LoggerFacade, boolean) 285 */ 286 public String getStoreDir() { 287 return storeDir; 288 } 289 290 /** 291 * Gets the working directory. 292 * 293 * @return the work directory 294 * @see #FileResourceManager(String, String, boolean, LoggerFacade) 295 * @see #FileResourceManager(String, String, boolean, LoggerFacade, boolean) 296 */ 297 public String getWorkDir() { 298 return workDir; 299 } 300 301 /** 302 * Gets the logger used by this resource manager. 303 * 304 * @return used logger 305 */ 306 public LoggerFacade getLogger() { 307 return logger; 308 } 309 310 /* 311 * --- public methods of interface ResourceManager --- 312 * 313 * 314 */ 315 316 public boolean lockResource(Object resourceId, Object txId) throws ResourceManagerException { 317 lockResource(resourceId, txId, false); 318 // XXX will never return false as it will either throw or return true 319 return true; 320 } 321 322 public boolean lockResource(Object resourceId, Object txId, boolean shared) throws ResourceManagerException { 323 lockResource(resourceId, txId, shared, true, Long.MAX_VALUE, true); 324 // XXX will never return false as it will either throw or return true 325 return true; 326 } 327 328 public boolean lockResource( 329 Object resourceId, 330 Object txId, 331 boolean shared, 332 boolean wait, 333 long timeoutMSecs, 334 boolean reentrant) 335 throws ResourceManagerException { 336 337 TransactionContext context = (shared ? txInitialSaneCheck(txId) : txInitialSaneCheckForWriting(txId)); 338 assureNotMarkedForRollback(context); 339 fileInitialSaneCheck(txId, resourceId); 340 341 // XXX allows locking of non existent resources (e.g. to prepare a create) 342 int level = (shared ? getSharedLockLevel(context) : LOCK_EXCLUSIVE); 343 try { 344 lockManager.lock(txId, resourceId, level, reentrant, Math.min(timeoutMSecs, 345 context.timeoutMSecs)); 346 // XXX will never return false as it will either throw or return true 347 return true; 348 } catch (LockException e) { 349 switch (e.getCode()) { 350 case LockException.CODE_INTERRUPTED: 351 throw new ResourceManagerException("Could not get lock for resource at '" 352 + resourceId + "'", ERR_NO_LOCK, txId); 353 case LockException.CODE_TIMED_OUT: 354 throw new ResourceManagerException("Lock timed out for resource at '" + resourceId 355 + "'", ERR_NO_LOCK, txId); 356 case LockException.CODE_DEADLOCK_VICTIM: 357 throw new ResourceManagerException("Deadlock victim resource at '" + resourceId 358 + "'", ERR_DEAD_LOCK, txId); 359 default : 360 throw new ResourceManagerException("Locking exception for resource at '" + resourceId 361 + "'", ERR_DEAD_LOCK, txId); 362 } 363 } 364 } 365 366 public int getDefaultIsolationLevel() { 367 return DEFAULT_ISOLATION_LEVEL; 368 } 369 370 public int[] getSupportedIsolationLevels() throws ResourceManagerException { 371 return new int[] { ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_REPEATABLE_READ }; 372 } 373 374 public boolean isIsolationLevelSupported(int level) throws ResourceManagerException { 375 return (level == ISOLATION_LEVEL_READ_COMMITTED || level == ISOLATION_LEVEL_REPEATABLE_READ); 376 } 377 378 /** 379 * Gets the default transaction timeout in <em>milliseconds</em>. 380 */ 381 public long getDefaultTransactionTimeout() { 382 return defaultTimeout; 383 } 384 385 /** 386 * Sets the default transaction timeout. 387 * 388 * @param timeout timeout in <em>milliseconds</em> 389 */ 390 public void setDefaultTransactionTimeout(long timeout) { 391 defaultTimeout = timeout; 392 } 393 394 public long getTransactionTimeout(Object txId) throws ResourceManagerException { 395 assureRMReady(); 396 long msecs = 0; 397 TransactionContext context = getContext(txId); 398 if (context == null) { 399 msecs = getDefaultTransactionTimeout(); 400 } else { 401 msecs = context.timeoutMSecs; 402 } 403 return msecs; 404 } 405 406 public void setTransactionTimeout(Object txId, long mSecs) throws ResourceManagerException { 407 assureRMReady(); 408 TransactionContext context = getContext(txId); 409 if (context != null) { 410 context.timeoutMSecs = mSecs; 411 } else { 412 throw new ResourceManagerException(ERR_NO_TX, txId); 413 } 414 } 415 416 public int getIsolationLevel(Object txId) throws ResourceManagerException { 417 assureRMReady(); 418 TransactionContext context = getContext(txId); 419 if (context == null) { 420 return DEFAULT_ISOLATION_LEVEL; 421 } else { 422 return context.isolationLevel; 423 } 424 } 425 426 public void setIsolationLevel(Object txId, int level) throws ResourceManagerException { 427 assureRMReady(); 428 TransactionContext context = getContext(txId); 429 if (context != null) { 430 if (level != ISOLATION_LEVEL_READ_COMMITTED || level != ISOLATION_LEVEL_REPEATABLE_READ) { 431 context.isolationLevel = level; 432 } else { 433 throw new ResourceManagerException(ERR_ISOLATION_LEVEL_UNSUPPORTED, txId); 434 } 435 } else { 436 throw new ResourceManagerException(ERR_NO_TX, txId); 437 } 438 } 439 440 public synchronized void start() throws ResourceManagerSystemException { 441 442 logger.logInfo("Starting RM at '" + storeDir + "' / '" + workDir + "'"); 443 444 operationMode = OPERATION_MODE_STARTING; 445 446 globalTransactions = Collections.synchronizedMap(new HashMap()); 447 lockManager = new GenericLockManager(LOCK_COMMIT, logger); 448 globalOpenResources = Collections.synchronizedList(new ArrayList()); 449 450 recover(); 451 sync(); 452 453 operationMode = OPERATION_MODE_STARTED; 454 455 if (dirty) { 456 logger.logWarning("Started RM, but in dirty mode only (Recovery of pending transactions failed)"); 457 } else { 458 logger.logInfo("Started RM"); 459 } 460 461 } 462 463 public synchronized boolean stop(int mode) throws ResourceManagerSystemException { 464 return stop(mode, getDefaultTransactionTimeout() * DEFAULT_COMMIT_TIMEOUT_FACTOR); 465 } 466 467 public synchronized boolean stop(int mode, long timeOut) throws ResourceManagerSystemException { 468 469 logger.logInfo("Stopping RM at '" + storeDir + "' / '" + workDir + "'"); 470 471 operationMode = OPERATION_MODE_STOPPING; 472 473 sync(); 474 boolean success = shutdown(mode, timeOut); 475 476 releaseGlobalOpenResources(); 477 478 if (success) { 479 operationMode = OPERATION_MODE_STOPPED; 480 logger.logInfo("Stopped RM"); 481 } else { 482 logger.logWarning("Failed to stop RM"); 483 } 484 485 return success; 486 } 487 488 public synchronized boolean recover() throws ResourceManagerSystemException { 489 if (operationMode != OPERATION_MODE_STARTED && operationMode != OPERATION_MODE_STARTING) { 490 throw new ResourceManagerSystemException( 491 ERR_SYSTEM, 492 "Recovery is possible in started or starting resource manager only"); 493 } 494 int oldMode = operationMode; 495 operationMode = OPERATION_MODE_RECOVERING; 496 497 recoverContexts(); 498 if (globalTransactions.size() > 0) { 499 logger.logInfo("Recovering pending transactions"); 500 } 501 502 dirty = !rollBackOrForward(); 503 504 operationMode = oldMode; 505 return dirty; 506 } 507 508 public int getTransactionState(Object txId) throws ResourceManagerException { 509 TransactionContext context = getContext(txId); 510 511 if (context == null) { 512 return STATUS_NO_TRANSACTION; 513 } else { 514 return context.status; 515 } 516 517 } 518 519 public void startTransaction(Object txId) throws ResourceManagerException { 520 521 if (logger.isFineEnabled()) logger.logFine("Starting Tx " + txId); 522 523 assureStarted(); // can only start a new transaction when not already stopping 524 if (txId == null || txIdMapper.getPathForId(txId).length() == 0) { 525 throw new ResourceManagerException(ERR_TXID_INVALID, txId); 526 } 527 528 // be sure we are the only ones who create this tx 529 synchronized (globalTransactions) { 530 TransactionContext context = getContext(txId); 531 532 if (context != null) { 533 throw new ResourceManagerException(ERR_DUP_TX, txId); 534 } 535 536 context = new TransactionContext(txId); 537 context.init(); 538 globalTransactions.put(txId, context); 539 540 } 541 } 542 543 public void markTransactionForRollback(Object txId) throws ResourceManagerException { 544 assureRMReady(); 545 TransactionContext context = txInitialSaneCheckForWriting(txId); 546 try { 547 context.status = STATUS_MARKED_ROLLBACK; 548 context.saveState(); 549 } finally { 550 // be very sure to free locks and resources, as application might crash or otherwise forget to roll this tx back 551 context.finalCleanUp(); 552 } 553 } 554 555 public int prepareTransaction(Object txId) throws ResourceManagerException { 556 assureRMReady(); 557 // do not allow any further writing or commit or rollback when db is corrupt 558 if (dirty) { 559 throw new ResourceManagerSystemException( 560 "Database is set to dirty, this *may* mean it is corrupt. No modifications are allowed until a recovery run has been performed!", 561 ERR_SYSTEM, 562 txId); 563 } 564 565 if (txId == null) { 566 throw new ResourceManagerException(ERR_TXID_INVALID, txId); 567 } 568 569 TransactionContext context = getContext(txId); 570 571 if (context == null) { 572 return PREPARE_FAILURE; 573 } 574 575 synchronized (context) { 576 577 sync(); 578 579 if (context.status != STATUS_ACTIVE) { 580 context.status = STATUS_MARKED_ROLLBACK; 581 context.saveState(); 582 return PREPARE_FAILURE; 583 } 584 585 if (logger.isFineEnabled()) logger.logFine("Preparing Tx " + txId); 586 587 int prepareStatus = PREPARE_FAILURE; 588 589 context.status = STATUS_PREPARING; 590 context.saveState(); 591 // do all checks as early as possible 592 context.closeResources(); 593 if (context.readOnly) { 594 prepareStatus = PREPARE_SUCCESS_READONLY; 595 } else { 596 // do all checks as early as possible 597 try { 598 context.upgradeLockToCommit(); 599 } catch (ResourceManagerException rme) { 600 // if this did not work, mark it for roll back as early as possible 601 markTransactionForRollback(txId); 602 throw rme; 603 } 604 prepareStatus = PREPARE_SUCCESS; 605 } 606 context.status = STATUS_PREPARED; 607 context.saveState(); 608 if (logger.isFineEnabled()) logger.logFine("Prepared Tx " + txId); 609 610 return prepareStatus; 611 } 612 } 613 614 public void rollbackTransaction(Object txId) throws ResourceManagerException { 615 assureRMReady(); 616 TransactionContext context = txInitialSaneCheckForWriting(txId); 617 // needing synchronization in order not to interfer with shutdown thread 618 synchronized (context) { 619 try { 620 621 if (logger.isFineEnabled()) logger.logFine("Rolling back Tx " + txId); 622 623 context.status = STATUS_ROLLING_BACK; 624 context.saveState(); 625 context.rollback(); 626 if (logger.isFineEnabled()) logger.logFine("All resources successfully removed for tx" + txId); 627 context.status = STATUS_ROLLEDBACK; 628 context.saveState(); 629 globalTransactions.remove(txId); 630 context.cleanUp(); 631 632 if (logger.isFineEnabled()) logger.logFine("Rolled back Tx " + txId); 633 634 // any system or runtime exceptions or errors thrown in rollback means we are in deep trouble, set the dirty flag 635 } catch (Error e) { 636 setDirty(txId, e); 637 throw e; 638 } catch (RuntimeException e) { 639 setDirty(txId, e); 640 throw e; 641 } catch (ResourceManagerSystemException e) { 642 setDirty(txId, e); 643 throw e; 644 } finally { 645 context.finalCleanUp(); 646 // tell shutdown thread this tx is finished 647 context.notifyFinish(); 648 } 649 } 650 } 651 652 public void commitTransaction(Object txId) throws ResourceManagerException { 653 assureRMReady(); 654 TransactionContext context = txInitialSaneCheckForWriting(txId); 655 assureNotMarkedForRollback(context); 656 657 // needing synchronization in order not to interfer with shutdown thread 658 synchronized (context) { 659 try { 660 661 if (logger.isFineEnabled()) logger.logFine("Committing Tx " + txId); 662 663 context.status = STATUS_COMMITTING; 664 context.saveState(); 665 context.commit(); 666 if (logger.isFineEnabled()) logger.logFine("All resources successfully moved for tx" + txId); 667 context.status = STATUS_COMMITTED; 668 context.saveState(); 669 globalTransactions.remove(txId); 670 context.cleanUp(); 671 672 if (logger.isFineEnabled()) logger.logFine("Committed Tx " + txId); 673 674 // any system or runtime exceptions or errors thrown in rollback means we are in deep trouble, set the dirty flag 675 } catch (Error e) { 676 setDirty(txId, e); 677 throw e; 678 } catch (RuntimeException e) { 679 setDirty(txId, e); 680 throw e; 681 } catch (ResourceManagerSystemException e) { 682 setDirty(txId, e); 683 throw e; 684 // like "could not upgrade lock" 685 } catch (ResourceManagerException e) { 686 logger.logWarning("Could not commit tx " + txId + ", rolling back instead", e); 687 rollbackTransaction(txId); 688 } finally { 689 context.finalCleanUp(); 690 // tell shutdown thread this tx is finished 691 context.notifyFinish(); 692 } 693 } 694 } 695 696 public boolean resourceExists(Object resourceId) throws ResourceManagerException { 697 // create temporary light weight tx 698 Object txId; 699 TransactionContext context; 700 synchronized (globalTransactions) { 701 txId = generatedUniqueTxId(); 702 if (logger.isFinerEnabled()) 703 logger.logFiner("Creating temporary light weight tx " + txId + " to check for exists"); 704 context = new TransactionContext(txId); 705 context.isLightWeight = true; 706 // XXX higher isolation might be needed to make sure upgrade to commit lock always works 707 context.isolationLevel = ISOLATION_LEVEL_READ_COMMITTED; 708 // context.isolationLevel = ISOLATION_LEVEL_REPEATABLE_READ; 709 globalTransactions.put(txId, context); 710 } 711 712 boolean exists = resourceExists(txId, resourceId); 713 714 context.freeLocks(); 715 globalTransactions.remove(txId); 716 if (logger.isFinerEnabled()) 717 logger.logFiner("Removing temporary light weight tx " + txId); 718 719 return exists; 720 } 721 722 public boolean resourceExists(Object txId, Object resourceId) throws ResourceManagerException { 723 lockResource(resourceId, txId, true); 724 return (getPathForRead(txId, resourceId) != null); 725 } 726 727 public void deleteResource(Object txId, Object resourceId) throws ResourceManagerException { 728 deleteResource(txId, resourceId, true); 729 } 730 731 public void deleteResource(Object txId, Object resourceId, boolean assureOnly) throws ResourceManagerException { 732 733 if (logger.isFineEnabled()) logger.logFine(txId + " deleting " + resourceId); 734 735 lockResource(resourceId, txId, false); 736 737 if (getPathForRead(txId, resourceId) == null) { 738 if (assureOnly) { 739 return; 740 } 741 throw new ResourceManagerException("No such resource at '" + resourceId + "'", ERR_NO_SUCH_RESOURCE, txId); 742 } 743 String txDeletePath = getDeletePath(txId, resourceId); 744 String mainPath = getMainPath(resourceId); 745 try { 746 getContext(txId).readOnly = false; 747 748 // first undo change / create when there was one 749 undoScheduledChangeOrCreate(txId, resourceId); 750 751 // if there still is a file in main store, we need to schedule 752 // a delete additionally 753 if (FileHelper.fileExists(mainPath)) { 754 FileHelper.createFile(txDeletePath); 755 } 756 } catch (IOException e) { 757 throw new ResourceManagerSystemException( 758 "Can not delete resource at '" + resourceId + "'", 759 ERR_SYSTEM, 760 txId, 761 e); 762 } 763 } 764 765 public void createResource(Object txId, Object resourceId) throws ResourceManagerException { 766 createResource(txId, resourceId, true); 767 } 768 769 public void createResource(Object txId, Object resourceId, boolean assureOnly) throws ResourceManagerException { 770 771 if (logger.isFineEnabled()) logger.logFine(txId + " creating " + resourceId); 772 773 lockResource(resourceId, txId, false); 774 775 if (getPathForRead(txId, resourceId) != null) { 776 if (assureOnly) { 777 return; 778 } 779 throw new ResourceManagerException( 780 "Resource at '" + resourceId + "', already exists", 781 ERR_RESOURCE_EXISTS, 782 txId); 783 } 784 785 String txChangePath = getChangePath(txId, resourceId); 786 try { 787 getContext(txId).readOnly = false; 788 789 // creation means either undoing a delete or actually scheduling a create 790 if (!undoScheduledDelete(txId, resourceId)) { 791 FileHelper.createFile(txChangePath); 792 } 793 794 } catch (IOException e) { 795 throw new ResourceManagerSystemException( 796 "Can not create resource at '" + resourceId + "'", 797 ERR_SYSTEM, 798 txId, 799 e); 800 } 801 } 802 803 public void copyResource(Object txId, Object fromResourceId, Object toResourceId, boolean overwrite) throws ResourceManagerException { 804 if (logger.isFineEnabled()) logger.logFine(txId + " copying " + fromResourceId + " to " + toResourceId); 805 806 lockResource(fromResourceId, txId, true); 807 lockResource(toResourceId, txId, false); 808 809 if (resourceExists(txId, toResourceId) && !overwrite) { 810 throw new ResourceManagerException( 811 "Resource at '" + toResourceId + "' already exists", 812 ERR_RESOURCE_EXISTS, 813 txId); 814 } 815 816 InputStream fromResourceStream = null; 817 OutputStream toResourceStream = null; 818 try { 819 fromResourceStream = readResource(txId, fromResourceId); 820 toResourceStream = writeResource(txId, toResourceId); 821 FileHelper.copy(fromResourceStream, toResourceStream); 822 } catch (IOException e) { 823 throw new ResourceManagerException(ERR_SYSTEM, txId, e); 824 } finally { 825 closeOpenResource(fromResourceStream); 826 closeOpenResource(toResourceStream); 827 } 828 } 829 830 public void moveResource(Object txId, Object fromResourceId, Object toResourceId, boolean overwrite) throws ResourceManagerException { 831 if (logger.isFineEnabled()) logger.logFine(txId + " moving " + fromResourceId + " to " + toResourceId); 832 833 lockResource(fromResourceId, txId, false); 834 lockResource(toResourceId, txId, false); 835 836 copyResource(txId, fromResourceId, toResourceId, overwrite); 837 838 deleteResource(txId, fromResourceId, false); 839 } 840 841 public InputStream readResource(Object resourceId) throws ResourceManagerException { 842 // create temporary light weight tx 843 Object txId; 844 synchronized (globalTransactions) { 845 txId = generatedUniqueTxId(); 846 if (logger.isFinerEnabled()) 847 logger.logFiner("Creating temporary light weight tx " + txId + " for reading"); 848 TransactionContext context = new TransactionContext(txId); 849 context.isLightWeight = true; 850 // XXX higher isolation might be needed to make sure upgrade to commit lock always works 851 context.isolationLevel = ISOLATION_LEVEL_READ_COMMITTED; 852 // context.isolationLevel = ISOLATION_LEVEL_REPEATABLE_READ; 853 globalTransactions.put(txId, context); 854 } 855 856 InputStream is = readResource(txId, resourceId); 857 return is; 858 } 859 860 public InputStream readResource(Object txId, Object resourceId) throws ResourceManagerException { 861 862 if (logger.isFineEnabled()) logger.logFine(txId + " reading " + resourceId); 863 864 lockResource(resourceId, txId, true); 865 866 String resourcePath = getPathForRead(txId, resourceId); 867 if (resourcePath == null) { 868 throw new ResourceManagerException("No such resource at '" + resourceId + "'", ERR_NO_SUCH_RESOURCE, txId); 869 } 870 871 File file = new File(resourcePath); 872 try { 873 FileInputStream stream = new FileInputStream(file); 874 getContext(txId).registerResource(stream); 875 return new InputStreamWrapper(stream, txId, resourceId); 876 } catch (FileNotFoundException e) { 877 throw new ResourceManagerSystemException("File '" + resourcePath + "' does not exist", ERR_SYSTEM, txId); 878 } 879 } 880 881 public OutputStream writeResource(Object txId, Object resourceId) throws ResourceManagerException { 882 return writeResource(txId, resourceId, false); 883 } 884 885 public OutputStream writeResource(Object txId, Object resourceId, boolean append) throws ResourceManagerException { 886 887 if (logger.isFineEnabled()) logger.logFine(txId + " writing " + resourceId); 888 889 lockResource(resourceId, txId, false); 890 891 if (append) { 892 String mainPath = getMainPath(resourceId); 893 String txChangePath = getChangePath(txId, resourceId); 894 String txDeletePath = getDeletePath(txId, resourceId); 895 896 boolean changeExists = FileHelper.fileExists(txChangePath); 897 boolean deleteExists = FileHelper.fileExists(txDeletePath); 898 boolean mainExists = FileHelper.fileExists(mainPath); 899 900 if (mainExists && !changeExists && !deleteExists) { 901 // the read and the write path for resourceId will be different! 902 copyResource(txId, resourceId, resourceId, true); 903 } 904 } 905 906 String resourcePath = getPathForWrite(txId, resourceId); 907 908 try { 909 FileOutputStream stream = new FileOutputStream(resourcePath, append); 910 TransactionContext context = getContext(txId); 911 context.registerResource(stream); 912 context.readOnly = false; 913 return stream; 914 } catch (FileNotFoundException e) { 915 throw new ResourceManagerSystemException("File '" + resourcePath + "' does not exist", ERR_SYSTEM, txId); 916 } 917 } 918 919 /* 920 * --- additional public methods complementing implementation of interfaces --- 921 * 922 * 923 */ 924 925 /** 926 * Resets the store by deleting work <em>and</em> store directory. 927 */ 928 public synchronized void reset() { 929 FileHelper.removeRec(new File(storeDir)); 930 FileHelper.removeRec(new File(workDir)); 931 new File(storeDir).mkdirs(); 932 new File(workDir).mkdirs(); 933 } 934 935 /** 936 * Synchronizes persistent data with caches. Is implemented with an empty 937 * body, but called by other methods relying on synchronization. Subclasses 938 * that utilize caching must implement this method reasonably. 939 * 940 * @throws ResourceManagerSystemException if anything fatal hapened during synchonization 941 */ 942 public synchronized void sync() throws ResourceManagerSystemException { 943 } 944 945 /** 946 * Generates a transaction identifier unique to this resource manager. To do so 947 * it requires this resource manager to be started. 948 * 949 * @return generated transaction identifier 950 * @throws ResourceManagerSystemException if this resource manager has not been started, yet 951 */ 952 public String generatedUniqueTxId() throws ResourceManagerSystemException { 953 assureRMReady(); 954 String txId; 955 synchronized (globalTransactions) { 956 do { 957 txId = Long.toHexString(System.currentTimeMillis()) + "-" 958 + Integer.toHexString(idCnt++); 959 // XXX busy loop 960 } while (getContext(txId) != null); 961 } 962 return txId; 963 } 964 965 /* 966 * --- sane checks --- 967 * 968 * 969 */ 970 971 protected void fileInitialSaneCheck(Object txId, Object path) throws ResourceManagerException { 972 if (path == null || path.toString().length() == 0) { 973 throw new ResourceManagerException(ERR_RESOURCEID_INVALID, txId); 974 } 975 } 976 977 protected void assureStarted() throws ResourceManagerSystemException { 978 if (operationMode != OPERATION_MODE_STARTED) { 979 throw new ResourceManagerSystemException("Resource Manager Service not started", ERR_SYSTEM, null); 980 } 981 } 982 983 protected void assureRMReady() throws ResourceManagerSystemException { 984 if (operationMode != OPERATION_MODE_STARTED && operationMode != OPERATION_MODE_STOPPING) { 985 throw new ResourceManagerSystemException("Resource Manager Service not ready", ERR_SYSTEM, null); 986 } 987 } 988 989 protected void assureNotMarkedForRollback(TransactionContext context) throws ResourceManagerException { 990 if (context.status == STATUS_MARKED_ROLLBACK) { 991 throw new ResourceManagerException(ERR_MARKED_FOR_ROLLBACK, context.txId); 992 } 993 } 994 995 protected TransactionContext txInitialSaneCheckForWriting(Object txId) throws ResourceManagerException { 996 assureRMReady(); 997 // do not allow any further writing or commit or rollback when db is corrupt 998 if (dirty) { 999 throw new ResourceManagerSystemException( 1000 "Database is set to dirty, this *may* mean it is corrupt. No modifications are allowed until a recovery run has been performed!", 1001 ERR_SYSTEM, 1002 txId); 1003 } 1004 return txInitialSaneCheck(txId); 1005 } 1006 1007 protected TransactionContext txInitialSaneCheck(Object txId) throws ResourceManagerException { 1008 assureRMReady(); 1009 if (txId == null) { 1010 throw new ResourceManagerException(ERR_TXID_INVALID, txId); 1011 } 1012 1013 TransactionContext context = getContext(txId); 1014 1015 if (context == null) { 1016 throw new ResourceManagerException(ERR_NO_TX, txId); 1017 } 1018 1019 return context; 1020 } 1021 1022 /* 1023 * --- General Helpers --- 1024 * 1025 * 1026 */ 1027 1028 protected TransactionContext getContext(Object txId) { 1029 return (TransactionContext) globalTransactions.get(txId); 1030 } 1031 1032 protected String assureLeadingSlash(Object pathObject) { 1033 String path = ""; 1034 if (pathObject != null) { 1035 if (idMapper != null) { 1036 path = idMapper.getPathForId(pathObject); 1037 } else { 1038 path = pathObject.toString(); 1039 } 1040 if (path.length() > 0 && path.charAt(0) != '/' && path.charAt(0) != '\\') { 1041 path = "/" + path; 1042 } 1043 } 1044 return path; 1045 } 1046 1047 protected String getMainPath(Object path) { 1048 StringBuffer buf = new StringBuffer(storeDir.length() + path.toString().length() + 5); 1049 buf.append(storeDir).append(assureLeadingSlash(path)); 1050 return buf.toString(); 1051 } 1052 1053 protected String getTransactionBaseDir(Object txId) { 1054 return workDir + '/' + txIdMapper.getPathForId(txId); 1055 } 1056 1057 protected String getChangePath(Object txId, Object path) { 1058 String txBaseDir = getTransactionBaseDir(txId); 1059 StringBuffer buf = new StringBuffer(txBaseDir.length() + path.toString().length() 1060 + WORK_CHANGE_DIR.length() + 5); 1061 buf.append(txBaseDir).append('/').append(WORK_CHANGE_DIR).append(assureLeadingSlash(path)); 1062 return buf.toString(); 1063 } 1064 1065 protected String getDeletePath(Object txId, Object path) { 1066 String txBaseDir = getTransactionBaseDir(txId); 1067 StringBuffer buf = new StringBuffer(txBaseDir.length() + path.toString().length() 1068 + WORK_DELETE_DIR.length() + 5); 1069 buf.append(txBaseDir).append('/').append(WORK_DELETE_DIR).append(assureLeadingSlash(path)); 1070 return buf.toString(); 1071 } 1072 1073 protected boolean undoScheduledDelete(Object txId, Object resourceId) throws ResourceManagerException { 1074 String txDeletePath = getDeletePath(txId, resourceId); 1075 File deleteFile = new File(txDeletePath); 1076 if (deleteFile.exists()) { 1077 if (!deleteFile.delete()) { 1078 throw new ResourceManagerSystemException( 1079 "Failed to undo delete of '" + resourceId + "'", 1080 ERR_SYSTEM, 1081 txId); 1082 } 1083 return true; 1084 } 1085 return false; 1086 } 1087 1088 protected boolean undoScheduledChangeOrCreate(Object txId, Object resourceId) throws ResourceManagerException { 1089 String txChangePath = getChangePath(txId, resourceId); 1090 File changeFile = new File(txChangePath); 1091 if (changeFile.exists()) { 1092 if (!changeFile.delete()) { 1093 throw new ResourceManagerSystemException( 1094 "Failed to undo change / create of '" + resourceId + "'", 1095 ERR_SYSTEM, 1096 txId); 1097 } 1098 return true; 1099 } 1100 return false; 1101 } 1102 1103 protected String getPathForWrite(Object txId, Object resourceId) throws ResourceManagerException { 1104 try { 1105 // when we want to write, be sure to write to a local copy 1106 String txChangePath = getChangePath(txId, resourceId); 1107 if (!FileHelper.fileExists(txChangePath)) { 1108 FileHelper.createFile(txChangePath); 1109 } 1110 return txChangePath; 1111 } catch (IOException e) { 1112 throw new ResourceManagerSystemException( 1113 "Can not write to resource at '" + resourceId + "'", 1114 ERR_SYSTEM, 1115 txId, 1116 e); 1117 } 1118 } 1119 1120 protected String getPathForRead(Object txId, Object resourceId) throws ResourceManagerException { 1121 1122 String mainPath = getMainPath(resourceId); 1123 String txChangePath = getChangePath(txId, resourceId); 1124 String txDeletePath = getDeletePath(txId, resourceId); 1125 1126 // now, this gets a bit complicated: 1127 1128 boolean changeExists = FileHelper.fileExists(txChangePath); 1129 boolean deleteExists = FileHelper.fileExists(txDeletePath); 1130 boolean mainExists = FileHelper.fileExists(mainPath); 1131 boolean resourceIsDir = 1132 ((mainExists && new File(mainPath).isDirectory()) 1133 || (changeExists && new File(txChangePath).isDirectory())); 1134 if (resourceIsDir) { 1135 logger.logWarning("Resource at '" + resourceId + "' maps to directory"); 1136 } 1137 1138 // first do some sane checks 1139 1140 // this may never be, two cases are possible, both disallowing to have a delete together with a change 1141 // 1. first there was a change, than a delete -> at least delete file exists (when there is a file in main store) 1142 // 2. first there was a delete, than a change -> only change file exists 1143 if (!resourceIsDir && changeExists && deleteExists) { 1144 throw new ResourceManagerSystemException( 1145 "Inconsistent delete and change combination for resource at '" + resourceId + "'", 1146 ERR_TX_INCONSISTENT, 1147 txId); 1148 } 1149 1150 // you should not have been allowed to delete a file that does not exist at all 1151 if (deleteExists && !mainExists) { 1152 throw new ResourceManagerSystemException( 1153 "Inconsistent delete for resource at '" + resourceId + "'", 1154 ERR_TX_INCONSISTENT, 1155 txId); 1156 } 1157 1158 if (changeExists) { 1159 return txChangePath; 1160 } else if (mainExists && !deleteExists) { 1161 return mainPath; 1162 } else { 1163 return null; 1164 } 1165 } 1166 1167 /* 1168 * --- Locking Helpers --- 1169 * 1170 * 1171 */ 1172 1173 protected int getSharedLockLevel(TransactionContext context) throws ResourceManagerException { 1174 if (context.isolationLevel == ISOLATION_LEVEL_READ_COMMITTED 1175 || context.isolationLevel == ISOLATION_LEVEL_READ_UNCOMMITTED) { 1176 return LOCK_ACCESS; 1177 } else if ( 1178 context.isolationLevel == ISOLATION_LEVEL_REPEATABLE_READ 1179 || context.isolationLevel == ISOLATION_LEVEL_SERIALIZABLE) { 1180 return LOCK_SHARED; 1181 } else { 1182 return LOCK_ACCESS; 1183 } 1184 } 1185 1186 /* 1187 * --- Resource Management --- 1188 * 1189 * 1190 */ 1191 1192 protected void registerOpenResource(Object openResource) { 1193 if (logger.isFinerEnabled()) 1194 logger.logFiner("Registering open resource " + openResource); 1195 globalOpenResources.add(openResource); 1196 } 1197 1198 protected void releaseGlobalOpenResources() { 1199 ArrayList copy; 1200 synchronized (globalOpenResources) { 1201 // XXX need to copy in order to allow removal in releaseOpenResource 1202 copy = new ArrayList(globalOpenResources); 1203 for (Iterator it = copy.iterator(); it.hasNext();) { 1204 Object stream = it.next(); 1205 closeOpenResource(stream); 1206 } 1207 } 1208 } 1209 1210 protected void closeOpenResource(Object openResource) { 1211 if (logger.isFinerEnabled()) logger.logFiner("Releasing resource " + openResource); 1212 globalOpenResources.remove(openResource); 1213 if (openResource instanceof InputStream) { 1214 InputStream is = (InputStream) openResource; 1215 try { 1216 is.close(); 1217 } catch (IOException e) { 1218 // do not care, as it might have been closed somewhere else, before 1219 } 1220 } else if (openResource instanceof OutputStream) { 1221 OutputStream os = (OutputStream) openResource; 1222 try { 1223 os.close(); 1224 } catch (IOException e) { 1225 // do not care, as it might have been closed somewhere else, before 1226 } 1227 } 1228 } 1229 1230 /* 1231 * --- Recovery / Shutdown Support --- 1232 * 1233 * 1234 */ 1235 1236 protected boolean rollBackOrForward() { 1237 boolean allCool = true; 1238 1239 synchronized (globalTransactions) { 1240 ArrayList contexts = new ArrayList(globalTransactions.values()); 1241 for (Iterator it = contexts.iterator(); it.hasNext();) { 1242 TransactionContext context = (TransactionContext) it.next(); 1243 if (context.status == STATUS_COMMITTING) { 1244 // roll forward 1245 logger.logInfo("Rolling forward " + context.txId); 1246 1247 try { 1248 context.commit(); 1249 context.status = STATUS_COMMITTED; 1250 context.saveState(); 1251 globalTransactions.remove(context.txId); 1252 context.cleanUp(); 1253 } catch (ResourceManagerException e) { 1254 // this is not good, but what can we do now? 1255 allCool = false; 1256 logger.logSevere("Rolling forward of " + context.txId + " failed", e); 1257 } 1258 } else if (context.status == STATUS_COMMITTED) { 1259 logger.logInfo("Cleaning already commited " + context.txId); 1260 globalTransactions.remove(context.txId); 1261 try { 1262 context.cleanUp(); 1263 } catch (ResourceManagerException e) { 1264 // this is not good, but what can we do now? 1265 allCool = false; 1266 logger.logWarning("Cleaning of " + context.txId + " failed", e); 1267 } 1268 } else { 1269 // in all other cases roll back and warn when not rollback was explicitely selected for tx 1270 if (context.status != STATUS_ROLLING_BACK 1271 && context.status != STATUS_ROLLEDBACK 1272 && context.status != STATUS_MARKED_ROLLBACK) { 1273 logger.logWarning("Irregularly rolling back " + context.txId); 1274 } else { 1275 logger.logInfo("Rolling back " + context.txId); 1276 } 1277 try { 1278 context.rollback(); 1279 context.status = STATUS_ROLLEDBACK; 1280 context.saveState(); 1281 globalTransactions.remove(context.txId); 1282 context.cleanUp(); 1283 } catch (ResourceManagerException e) { 1284 logger.logWarning("Rolling back of " + context.txId + " failed", e); 1285 } 1286 } 1287 } 1288 1289 } 1290 return allCool; 1291 } 1292 1293 protected void recoverContexts() { 1294 File dir = new File(workDir); 1295 File[] files = dir.listFiles(); 1296 if (files == null) 1297 return; 1298 for (int i = 0; i < files.length; i++) { 1299 File file = files[i]; 1300 Object txId = txIdMapper.getIdForPath(file.getName()); 1301 // recover all transactions we do not already know 1302 if (!globalTransactions.containsKey(txId)) { 1303 1304 logger.logInfo("Recovering " + txId); 1305 TransactionContext context; 1306 try { 1307 context = new TransactionContext(txId); 1308 context.recoverState(); 1309 globalTransactions.put(txId, context); 1310 } catch (ResourceManagerException e) { 1311 // this is not good, but the best we get, just log as warning 1312 logger.logWarning("Recovering of " + txId + " failed"); 1313 } 1314 } 1315 } 1316 } 1317 1318 protected boolean waitForAllTxToStop(long timeoutMSecs) { 1319 long startTime = System.currentTimeMillis(); 1320 1321 // be sure not to lock globalTransactions for too long, as we need to give 1322 // txs the chance to complete (otherwise deadlocks are very likely to occur) 1323 // instead iterate over a copy as we can be sure no new txs will be registered 1324 // after operation level has been set to stopping 1325 1326 Collection transactionsToStop; 1327 synchronized (globalTransactions) { 1328 transactionsToStop = new ArrayList(globalTransactions.values()); 1329 } 1330 for (Iterator it = transactionsToStop.iterator(); it.hasNext();) { 1331 long remainingTimeout = startTime - System.currentTimeMillis() + timeoutMSecs; 1332 1333 if (remainingTimeout <= 0) { 1334 return false; 1335 } 1336 1337 TransactionContext context = (TransactionContext) it.next(); 1338 synchronized (context) { 1339 if (!context.finished) { 1340 logger.logInfo( 1341 "Waiting for tx " + context.txId + " to finish for " + remainingTimeout + " milli seconds"); 1342 } 1343 while (!context.finished && remainingTimeout > 0) { 1344 try { 1345 context.wait(remainingTimeout); 1346 } catch (InterruptedException e) { 1347 return false; 1348 } 1349 remainingTimeout = startTime - System.currentTimeMillis() + timeoutMSecs; 1350 } 1351 if (context.finished) { 1352 logger.logInfo("Tx " + context.txId + " finished"); 1353 } else { 1354 logger.logWarning("Tx " + context.txId + " failed to finish in given time"); 1355 } 1356 } 1357 } 1358 1359 return (globalTransactions.size() == 0); 1360 } 1361 1362 protected boolean shutdown(int mode, long timeoutMSecs) { 1363 switch (mode) { 1364 case SHUTDOWN_MODE_NORMAL : 1365 return waitForAllTxToStop(timeoutMSecs); 1366 case SHUTDOWN_MODE_ROLLBACK : 1367 return rollBackOrForward(); 1368 case SHUTDOWN_MODE_KILL : 1369 return true; 1370 default : 1371 return false; 1372 } 1373 } 1374 1375 protected void setDirty(Object txId, Throwable t) { 1376 logger.logSevere( 1377 "Fatal error during critical commit/rollback of transaction " + txId + ", setting database to dirty.", 1378 t); 1379 dirty = true; 1380 } 1381 1382 /** 1383 * Inner class to hold the complete context, i.e. all information needed, for a transaction. 1384 * 1385 */ 1386 protected class TransactionContext { 1387 1388 protected Object txId; 1389 protected int status = STATUS_ACTIVE; 1390 protected int isolationLevel = DEFAULT_ISOLATION_LEVEL; 1391 protected long timeoutMSecs = getDefaultTransactionTimeout(); 1392 protected long startTime; 1393 protected long commitTime = -1L; 1394 protected boolean isLightWeight = false; 1395 protected boolean readOnly = true; 1396 protected boolean finished = false; 1397 1398 // list of streams participating in this tx 1399 private List openResources = new ArrayList(); 1400 1401 public TransactionContext(Object txId) throws ResourceManagerException { 1402 this.txId = txId; 1403 startTime = System.currentTimeMillis(); 1404 } 1405 1406 public long getRemainingTimeout() { 1407 long now = System.currentTimeMillis(); 1408 return (startTime - now + timeoutMSecs); 1409 } 1410 1411 public synchronized void init() throws ResourceManagerException { 1412 String baseDir = getTransactionBaseDir(txId); 1413 String changeDir = baseDir + "/" + WORK_CHANGE_DIR; 1414 String deleteDir = baseDir + "/" + WORK_DELETE_DIR; 1415 1416 new File(changeDir).mkdirs(); 1417 new File(deleteDir).mkdirs(); 1418 1419 saveState(); 1420 } 1421 1422 public synchronized void rollback() throws ResourceManagerException { 1423 closeResources(); 1424 freeLocks(); 1425 } 1426 1427 public synchronized void commit() throws ResourceManagerException { 1428 String baseDir = getTransactionBaseDir(txId); 1429 String changeDir = baseDir + "/" + WORK_CHANGE_DIR; 1430 String deleteDir = baseDir + "/" + WORK_DELETE_DIR; 1431 1432 closeResources(); 1433 upgradeLockToCommit(); 1434 try { 1435 applyDeletes(new File(deleteDir), new File(storeDir), new File(storeDir)); 1436 FileHelper.moveRec(new File(changeDir), new File(storeDir)); 1437 } catch (IOException e) { 1438 throw new ResourceManagerSystemException("Commit failed", ERR_SYSTEM, txId, e); 1439 } 1440 freeLocks(); 1441 commitTime = System.currentTimeMillis(); 1442 } 1443 1444 public synchronized void notifyFinish() { 1445 finished = true; 1446 notifyAll(); 1447 } 1448 1449 public synchronized void cleanUp() throws ResourceManagerException { 1450 if (!cleanUp) 1451 return; // XXX for debugging only 1452 boolean clean = true; 1453 Exception cleanException = null; 1454 String baseDir = getTransactionBaseDir(txId); 1455 FileHelper.removeRec(new File(baseDir)); 1456 if (!clean) { 1457 throw new ResourceManagerSystemException( 1458 "Clean up failed due to unreleasable lock", 1459 ERR_SYSTEM, 1460 txId, 1461 cleanException); 1462 } 1463 } 1464 1465 public synchronized void finalCleanUp() throws ResourceManagerException { 1466 closeResources(); 1467 freeLocks(); 1468 } 1469 1470 public synchronized void upgradeLockToCommit() throws ResourceManagerException { 1471 for (Iterator it = lockManager.getAll(txId).iterator(); it.hasNext();) { 1472 GenericLock lock = (GenericLock) it.next(); 1473 // only upgrade if we had write access 1474 if (lock.getLockLevel(txId) == LOCK_EXCLUSIVE) { 1475 try { 1476 // in case of deadlocks, make failure of non-committing tx more likely 1477 if (!lock 1478 .acquire( 1479 txId, 1480 LOCK_COMMIT, 1481 true, 1482 true, 1483 getDefaultTransactionTimeout() * DEFAULT_COMMIT_TIMEOUT_FACTOR)) { 1484 throw new ResourceManagerException( 1485 "Could not upgrade to commit lock for resource at '" 1486 + lock.getResourceId().toString() 1487 + "'", 1488 ERR_NO_LOCK, 1489 txId); 1490 } 1491 } catch (InterruptedException e) { 1492 throw new ResourceManagerSystemException(ERR_SYSTEM, txId, e); 1493 } 1494 } 1495 1496 } 1497 } 1498 1499 public synchronized void freeLocks() { 1500 lockManager.releaseAll(txId); 1501 } 1502 1503 public synchronized void closeResources() { 1504 synchronized (globalOpenResources) { 1505 for (Iterator it = openResources.iterator(); it.hasNext();) { 1506 Object stream = it.next(); 1507 closeOpenResource(stream); 1508 } 1509 } 1510 } 1511 1512 public synchronized void registerResource(Object openResource) { 1513 synchronized (globalOpenResources) { 1514 registerOpenResource(openResource); 1515 openResources.add(openResource); 1516 } 1517 } 1518 1519 public synchronized void saveState() throws ResourceManagerException { 1520 String statePath = getTransactionBaseDir(txId) + "/" + CONTEXT_FILE; 1521 File file = new File(statePath); 1522 BufferedWriter writer = null; 1523 try { 1524 OutputStream os = new FileOutputStream(file); 1525 writer = new BufferedWriter(new OutputStreamWriter(os, DEFAULT_PARAMETER_ENCODING)); 1526 writer.write(toString()); 1527 } catch (FileNotFoundException e) { 1528 String msg = "Saving status information to '" + statePath + "' failed! Could not create file"; 1529 logger.logSevere(msg, e); 1530 throw new ResourceManagerSystemException(msg, ERR_SYSTEM, txId, e); 1531 } catch (IOException e) { 1532 String msg = "Saving status information to '" + statePath + "' failed"; 1533 logger.logSevere(msg, e); 1534 throw new ResourceManagerSystemException(msg, ERR_SYSTEM, txId, e); 1535 } finally { 1536 if (writer != null) { 1537 try { 1538 writer.close(); 1539 } catch (IOException e) { 1540 } 1541 1542 } 1543 } 1544 } 1545 1546 public synchronized void recoverState() throws ResourceManagerException { 1547 String statePath = getTransactionBaseDir(txId) + "/" + CONTEXT_FILE; 1548 File file = new File(statePath); 1549 BufferedReader reader = null; 1550 try { 1551 InputStream is = new FileInputStream(file); 1552 1553 reader = new BufferedReader(new InputStreamReader(is, DEFAULT_PARAMETER_ENCODING)); 1554 txId = reader.readLine(); 1555 status = Integer.parseInt(reader.readLine()); 1556 isolationLevel = Integer.parseInt(reader.readLine()); 1557 timeoutMSecs = Long.parseLong(reader.readLine()); 1558 startTime = Long.parseLong(reader.readLine()); 1559 } catch (FileNotFoundException e) { 1560 String msg = "Recovering status information from '" + statePath + "' failed! Could not find file"; 1561 logger.logSevere(msg, e); 1562 throw new ResourceManagerSystemException(msg, ERR_SYSTEM, txId); 1563 } catch (IOException e) { 1564 String msg = "Recovering status information from '" + statePath + "' failed"; 1565 logger.logSevere(msg, e); 1566 throw new ResourceManagerSystemException(msg, ERR_SYSTEM, txId, e); 1567 } catch (Throwable t) { 1568 String msg = "Recovering status information from '" + statePath + "' failed"; 1569 logger.logSevere(msg, t); 1570 throw new ResourceManagerSystemException(msg, ERR_SYSTEM, txId, t); 1571 } finally { 1572 if (reader != null) { 1573 try { 1574 reader.close(); 1575 } catch (IOException e) { 1576 } 1577 1578 } 1579 } 1580 } 1581 1582 public synchronized String toString() { 1583 StringBuffer buf = new StringBuffer(); 1584 buf.append(txId).append('\n'); 1585 buf.append(Integer.toString(status)).append('\n'); 1586 buf.append(Integer.toString(isolationLevel)).append('\n'); 1587 buf.append(Long.toString(timeoutMSecs)).append('\n'); 1588 buf.append(Long.toString(startTime)).append('\n'); 1589 if (debug) { 1590 buf.append("----- Lock Debug Info -----\n"); 1591 1592 for (Iterator it = lockManager.getAll(txId).iterator(); it.hasNext();) { 1593 GenericLock lock = (GenericLock) it.next(); 1594 buf.append(lock.toString()+"\n"); 1595 } 1596 1597 } 1598 return buf.toString(); 1599 } 1600 1601 } 1602 1603 private class InputStreamWrapper extends InputStream { 1604 private InputStream is; 1605 private Object txId; 1606 private Object resourceId; 1607 1608 public InputStreamWrapper(InputStream is, Object txId, Object resourceId) { 1609 this.is = is; 1610 this.txId = txId; 1611 this.resourceId = resourceId; 1612 } 1613 1614 public int read() throws IOException { 1615 return is.read(); 1616 } 1617 1618 public int read(byte b[]) throws IOException { 1619 return is.read(b); 1620 } 1621 1622 public int read(byte b[], int off, int len) throws IOException { 1623 return is.read(b, off, len); 1624 } 1625 1626 public int available() throws IOException { 1627 return is.available(); 1628 } 1629 1630 public void close() throws IOException { 1631 try { 1632 is.close(); 1633 } finally { 1634 TransactionContext context; 1635 synchronized (globalTransactions) { 1636 context = getContext(txId); 1637 if (context == null) { 1638 return; 1639 } 1640 } 1641 synchronized (context) { 1642 if (context.isLightWeight) { 1643 if (logger.isFinerEnabled()) 1644 logger.logFiner("Upon close of resource removing temporary light weight tx " + txId); 1645 context.freeLocks(); 1646 globalTransactions.remove(txId); 1647 } else { 1648 // release access lock in order to allow other transactions to commit 1649 if (lockManager.getLevel(txId, resourceId) == LOCK_ACCESS) { 1650 if (logger.isFinerEnabled()) { 1651 logger.logFiner("Upon close of resource releasing access lock for tx " + txId + " on resource at " + resourceId); 1652 } 1653 lockManager.release(txId, resourceId); 1654 } 1655 } 1656 } 1657 } 1658 } 1659 1660 public void mark(int readlimit) { 1661 is.mark(readlimit); 1662 } 1663 1664 public void reset() throws IOException { 1665 is.reset(); 1666 } 1667 1668 public boolean markSupported() { 1669 return is.markSupported(); 1670 1671 } 1672 1673 } 1674 1675}