View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.transaction.file;
18  
19  import java.io.BufferedReader;
20  import java.io.BufferedWriter;
21  import java.io.File;
22  import java.io.FileInputStream;
23  import java.io.FileNotFoundException;
24  import java.io.FileOutputStream;
25  import java.io.IOException;
26  import java.io.InputStream;
27  import java.io.InputStreamReader;
28  import java.io.OutputStream;
29  import java.io.OutputStreamWriter;
30  import java.util.ArrayList;
31  import java.util.Collection;
32  import java.util.HashMap;
33  import java.util.List;
34  import java.util.Map;
35  import java.util.Iterator;
36  import java.util.Collections;
37  
38  import org.apache.commons.transaction.locking.GenericLock;
39  import org.apache.commons.transaction.locking.GenericLockManager;
40  import org.apache.commons.transaction.locking.LockException;
41  import org.apache.commons.transaction.locking.LockManager2;
42  import org.apache.commons.transaction.util.FileHelper;
43  import org.apache.commons.transaction.util.LoggerFacade;
44  
45  /**
46   * A resource manager for streamable objects stored in a file system.
47   * 
48   * It is intended for developer and "out of the box" use. 
49   * It is <em>not</em> intended to be a real alternative for
50   * a full blown DMBS (of course it can not be compared to a RDBMS at all).
51   * 
52   * Major features:<br>
53   * <ul>
54   * <li>Transactions performed with this class more or less comform to the widely accepted ACID properties
55   * <li>Reading should be as fast as from the ordinary file system (at the cost of a bit slower commits) 
56   * </ul>
57   * 
58   * Compared to a "real" DBMS major limitations are (in order of assumed severity):<br>
59   * <ul>
60   * <li>Number of simultaneously open resources is limited to the number of available file descriptors
61   * <li>It does not scale a bit
62   * <li>Pessimistic transaction and locking scheme
63   * <li>Isolation level currently is restricted to <em>read committed</em> and <em>repeated read</em> (which is not that bad)
64   * </ul>
65   * 
66   * <em>Important</em>: If possible you should have the work and store directory located in the 
67   * same file system. If not, you might get additional problems, as there are:
68   * <ul>
69   * <li>On commit it might be necessay to copy files instead of rename/relink them. This may lead to time consuming, 
70   * overly blocking commit phases and higher risk of corrupted files
71   * <li>Prepare phase might be too permissive, no check for sufficient memory on store file system is possible
72   * </ul> 
73   * 
74   * General limitations include:<br>
75   * <ul>
76   * <li>Due to lack of synchronization on the transaction context level, every transaction may only be
77   * accessed by a <em>single thread</em> throughout its full life. 
78   * This means it is forbidden for a thread that has not started a transaction 
79   * to perform any operations inside this transaction. However, threads associated
80   * with different transactions can safely access these methods concurrently.
81   * Reasons for the lack of synchronization are improved performance and simplicity (of the code of this class).
82   * <li>There is no dedicated class for a transaction. Having such a class would be better practice and 
83   * make certain actions more intuitive.
84   * <li>Resource identifiers need a reasonsable string representation obtainable by <code>toString</code>.
85   * More specifically, they will have to resolve to a <em>valid</em> file path that does note denote a directory. 
86   * If it does, you might be able to create it, but not to read or write anything 
87   * from resp. to it. Valid string representations of a resource idenfier are 
88   * for example "file" "/root" or "hjfhdfhuhuhsdufhdsufhdsufhdfuhdfduhduhduhdu". 
89   * Invalid are for example "/" or "/root/". Invalid on some file systems are for example "c:" or "file://huhu".
90   * <li>As there are no active processes inside this RM and it shares its threads with the application,
91   * control over transactions is limited to points where the application calls the RM. 
92   * In particular, this disables <em>active</em> termination of transactions upon timeout.
93   * <li>There is no notion of a connection to this file manager. This means you can not connect from hosts other than
94   * local and you will get problems when plugging this store into a J2EE store using connectors. 
95   * <li>Methods should throw more specific exceptions
96   * </ul>
97   * 
98   * <p><em>Caution</em>:<br>
99   * The <code>txId</code> passed to many methods as an identifier for the
100  * transaction concerned will function as a key in a <code>HashMap</code>.
101  * Thus assure that <code>equals</code> and <code>hashCode</code> are both
102  * properly implemented and match each other.</p>
103  *  
104  * <p><em>Caution</em>:<br>
105  * You will have to guarantee that no other process will access neither
106  * the store or the working dir concurrently to this <code>FileResourceManager</code>.</p>
107  * 
108  * <p><em>Special Caution</em>:<br>
109  * Be very careful not to have two instances of <code>FileResourceManager</code>
110  * working in the same store and/or working dir.
111  *   
112  * @version $Id: FileResourceManager.java 573315 2007-09-06 16:28:42Z ozeigermann $
113  */
114 public class FileResourceManager implements ResourceManager, ResourceManagerErrorCodes {
115 
116     // reflects the natural isolation level of this store
117     protected static final int NATIVE_ISOLATION_LEVEL = ISOLATION_LEVEL_REPEATABLE_READ;
118     protected static final int DEFAULT_ISOLATION_LEVEL = NATIVE_ISOLATION_LEVEL;
119 
120     protected static final int NO_LOCK = 0;
121     protected static final int LOCK_ACCESS = NO_LOCK + 1;
122     protected static final int LOCK_SHARED = NO_LOCK + 2;
123     protected static final int LOCK_EXCLUSIVE = NO_LOCK + 3;
124     protected static final int LOCK_COMMIT = NO_LOCK + 4;
125 
126     protected static final int OPERATION_MODE_STOPPED = 0;
127     protected static final int OPERATION_MODE_STOPPING = 1;
128     protected static final int OPERATION_MODE_STARTED = 2;
129     protected static final int OPERATION_MODE_STARTING = 3;
130     protected static final int OPERATION_MODE_RECOVERING = 4;
131 
132     protected static final String DEFAULT_PARAMETER_ENCODING = "ISO-8859-15";
133 
134     protected static final int DEFAULT_TIMEOUT_MSECS = 5000;
135     protected static final int DEFAULT_COMMIT_TIMEOUT_FACTOR = 2;
136 
137     protected static final String WORK_CHANGE_DIR = "change";
138     protected static final String WORK_DELETE_DIR = "delete";
139 
140     protected static final String CONTEXT_FILE = "transaction.log";
141 
142     /*
143      * --- Static helper methods ---
144      *
145      *  
146      */
147 
148     protected static void applyDeletes(File removeDir, File targetDir, File rootDir)
149             throws IOException {
150         if (removeDir.isDirectory() && targetDir.isDirectory()) {
151             File[] files = removeDir.listFiles();
152             for (int i = 0; i < files.length; i++) {
153                 File removeFile = files[i];
154                 File targetFile = new File(targetDir, removeFile.getName());
155                 if (removeFile.isFile()) {
156                     if (targetFile.exists()) {
157                         if (!targetFile.delete()) {
158                             throw new IOException("Could not delete file " + removeFile.getName()
159                                     + " in directory targetDir");
160                         }
161                     }
162                     // indicate, this has been done
163                     removeFile.delete();
164                 } else {
165                     applyDeletes(removeFile, targetFile, rootDir);
166                 }
167             }
168             // delete empty target directories, except root dir
169             if (!targetDir.equals(rootDir) && targetDir.list().length == 0) {
170                 targetDir.delete();
171             }
172         }
173     }
174     
175     /*
176      * --- object members ---
177      * 
178      *  
179      */
180 
181     protected String workDir;
182     protected String storeDir;
183     protected boolean cleanUp = true;
184     protected boolean dirty = false;
185     protected int operationMode = OPERATION_MODE_STOPPED;
186     protected long defaultTimeout = DEFAULT_TIMEOUT_MSECS;
187     protected boolean debug;
188 
189     protected LoggerFacade logger;
190 
191     protected Map globalTransactions;
192     protected List globalOpenResources;
193     protected LockManager2 lockManager;
194 
195     protected ResourceIdToPathMapper idMapper = null;
196     protected TransactionIdToPathMapper txIdMapper = null;
197 
198     protected int idCnt = 0;
199 
200     /*
201      * --- ctor and general getter / setter methods ---
202      *
203      *  
204      */
205 
206     /**
207      * Creates a new resource manager operation on the specified directories.
208      * 
209      * @param storeDir directory where main data should go after commit
210      * @param workDir directory where transactions store temporary data
211      * @param urlEncodePath if set to <code>true</code> encodes all paths to allow for any kind of characters
212      * @param logger the logger to be used by this store  
213      */
214     public FileResourceManager(String storeDir, String workDir, boolean urlEncodePath, LoggerFacade logger) {
215         this(storeDir, workDir, urlEncodePath, logger, false);
216     }
217 
218     /**
219      * Creates a new resource manager operation on the specified directories.
220      * 
221      * @param storeDir directory where main data should go after commit
222      * @param workDir directory where transactions store temporary data 
223      * @param urlEncodePath if set to <code>true</code> encodes all paths to allow for any kind of characters
224      * @param logger the logger to be used by this store
225      * @param debug if set to <code>true</code> logs all locking information to "transaction.log" for debugging inspection 
226      */
227     public FileResourceManager(
228         String storeDir,
229         String workDir,
230         boolean urlEncodePath,
231         LoggerFacade logger,
232         boolean debug) {
233         this(storeDir, workDir, urlEncodePath ? new URLEncodeIdMapper() : null, new NoOpTransactionIdToPathMapper(), logger, debug);
234     }
235 
236     /**
237      * Creates a new resource manager operation on the specified directories.
238      * This constructor is reintroduced for backwards API compatibility and is used by jakarta-slide.
239      *
240      * @param storeDir directory where main data should go after commit
241      * @param workDir directory where transactions store temporary data
242      * @param idMapper mapper for resourceId to path
243      * @param logger the logger to be used by this store
244      * @param debug if set to <code>true</code> logs all locking information to "transaction.log" for debugging inspection
245      */
246     public FileResourceManager(
247         String storeDir,
248         String workDir,
249         ResourceIdToPathMapper idMapper,
250         LoggerFacade logger,
251         boolean debug) {
252         this(storeDir, workDir, idMapper, new NoOpTransactionIdToPathMapper(), logger, debug);
253     }
254     /**
255      * Creates a new resource manager operation on the specified directories.
256      * 
257      * @param storeDir directory where main data should go after commit
258      * @param workDir directory where transactions store temporary data 
259      * @param idMapper mapper for resourceId to path
260      * @param txIdMapper mapper for transaction id to path
261      * @param logger the logger to be used by this store
262      * @param debug if set to <code>true</code> logs all locking information to "transaction.log" for debugging inspection 
263      */
264     public FileResourceManager(
265         String storeDir,
266         String workDir,
267         ResourceIdToPathMapper idMapper,
268         TransactionIdToPathMapper txIdMapper,
269         LoggerFacade logger,
270         boolean debug) {
271         this.workDir = workDir;
272         this.storeDir = storeDir;
273         this.idMapper = idMapper;
274         this.txIdMapper = txIdMapper;
275         this.logger = logger;
276         this.debug = debug;
277     }
278 
279     /**
280      * Gets the store directory.
281      * 
282      * @return the store directory
283      * @see #FileResourceManager(String, String, boolean, LoggerFacade)
284      * @see #FileResourceManager(String, String, boolean, LoggerFacade, boolean)
285      */
286     public String getStoreDir() {
287         return storeDir;
288     }
289 
290     /**
291      * Gets the working directory.
292      * 
293      * @return the work directory
294      * @see #FileResourceManager(String, String, boolean, LoggerFacade)
295      * @see #FileResourceManager(String, String, boolean, LoggerFacade, boolean)
296      */
297     public String getWorkDir() {
298         return workDir;
299     }
300 
301     /**
302      * Gets the logger used by this resource manager. 
303      * 
304      * @return used logger 
305      */
306     public LoggerFacade getLogger() {
307         return logger;
308     }
309 
310     /*
311      * --- public methods of interface ResourceManager ---
312      *
313      *  
314      */
315 
316     public boolean lockResource(Object resourceId, Object txId) throws ResourceManagerException {
317         lockResource(resourceId, txId, false);
318         // XXX will never return false as it will either throw or return true
319         return true;
320     }
321 
322     public boolean lockResource(Object resourceId, Object txId, boolean shared) throws ResourceManagerException {
323         lockResource(resourceId, txId, shared, true, Long.MAX_VALUE, true);
324         // XXX will never return false as it will either throw or return true
325         return true;
326     }
327 
328     public boolean lockResource(
329         Object resourceId,
330         Object txId,
331         boolean shared,
332         boolean wait,
333         long timeoutMSecs,
334         boolean reentrant)
335         throws ResourceManagerException {
336 
337         TransactionContext context = (shared ? txInitialSaneCheck(txId) : txInitialSaneCheckForWriting(txId));
338         assureNotMarkedForRollback(context);
339         fileInitialSaneCheck(txId, resourceId);
340 
341         // XXX allows locking of non existent resources (e.g. to prepare a create)
342         int level = (shared ? getSharedLockLevel(context) : LOCK_EXCLUSIVE);
343         try {
344             lockManager.lock(txId, resourceId, level, reentrant, Math.min(timeoutMSecs,
345                     context.timeoutMSecs));
346             // XXX will never return false as it will either throw or return true
347             return true;
348         } catch (LockException e) {
349             switch (e.getCode()) {
350             case LockException.CODE_INTERRUPTED:
351                 throw new ResourceManagerException("Could not get lock for resource at '"
352                         + resourceId + "'", ERR_NO_LOCK, txId);
353             case LockException.CODE_TIMED_OUT:
354                 throw new ResourceManagerException("Lock timed out for resource at '" + resourceId
355                         + "'", ERR_NO_LOCK, txId);
356             case LockException.CODE_DEADLOCK_VICTIM:
357                 throw new ResourceManagerException("Deadlock victim resource at '" + resourceId
358                         + "'", ERR_DEAD_LOCK, txId);
359             default :
360                 throw new ResourceManagerException("Locking exception for resource at '" + resourceId
361                         + "'", ERR_DEAD_LOCK, txId);
362             }
363         }
364     }
365 
366     public int getDefaultIsolationLevel() {
367         return DEFAULT_ISOLATION_LEVEL;
368     }
369 
370     public int[] getSupportedIsolationLevels() throws ResourceManagerException {
371         return new int[] { ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_REPEATABLE_READ };
372     }
373 
374     public boolean isIsolationLevelSupported(int level) throws ResourceManagerException {
375         return (level == ISOLATION_LEVEL_READ_COMMITTED || level == ISOLATION_LEVEL_REPEATABLE_READ);
376     }
377 
378     /**
379      * Gets the default transaction timeout in <em>milliseconds</em>.
380      */
381     public long getDefaultTransactionTimeout() {
382         return defaultTimeout;
383     }
384 
385     /**
386      * Sets the default transaction timeout.
387      * 
388      * @param timeout timeout in <em>milliseconds</em>
389      */
390     public void setDefaultTransactionTimeout(long timeout) {
391         defaultTimeout = timeout;
392     }
393 
394     public long getTransactionTimeout(Object txId) throws ResourceManagerException {
395         assureRMReady();
396         long msecs = 0;
397         TransactionContext context = getContext(txId);
398         if (context == null) {
399             msecs = getDefaultTransactionTimeout();
400         } else {
401             msecs = context.timeoutMSecs;
402         }
403         return msecs;
404     }
405 
406     public void setTransactionTimeout(Object txId, long mSecs) throws ResourceManagerException {
407         assureRMReady();
408         TransactionContext context = getContext(txId);
409         if (context != null) {
410             context.timeoutMSecs = mSecs;
411         } else {
412             throw new ResourceManagerException(ERR_NO_TX, txId);
413         }
414     }
415 
416     public int getIsolationLevel(Object txId) throws ResourceManagerException {
417         assureRMReady();
418         TransactionContext context = getContext(txId);
419         if (context == null) {
420             return DEFAULT_ISOLATION_LEVEL;
421         } else {
422             return context.isolationLevel;
423         }
424     }
425 
426     public void setIsolationLevel(Object txId, int level) throws ResourceManagerException {
427         assureRMReady();
428         TransactionContext context = getContext(txId);
429         if (context != null) {
430             if (level != ISOLATION_LEVEL_READ_COMMITTED || level != ISOLATION_LEVEL_REPEATABLE_READ) {
431                 context.isolationLevel = level;
432             } else {
433                 throw new ResourceManagerException(ERR_ISOLATION_LEVEL_UNSUPPORTED, txId);
434             }
435         } else {
436             throw new ResourceManagerException(ERR_NO_TX, txId);
437         }
438     }
439 
440     public synchronized void start() throws ResourceManagerSystemException {
441 
442         logger.logInfo("Starting RM at '" + storeDir + "' / '" + workDir + "'");
443 
444         operationMode = OPERATION_MODE_STARTING;
445 
446         globalTransactions = Collections.synchronizedMap(new HashMap());
447         lockManager = new GenericLockManager(LOCK_COMMIT, logger);
448         globalOpenResources = Collections.synchronizedList(new ArrayList());
449 
450         recover();
451         sync();
452 
453         operationMode = OPERATION_MODE_STARTED;
454 
455         if (dirty) {
456             logger.logWarning("Started RM, but in dirty mode only (Recovery of pending transactions failed)");
457         } else {
458             logger.logInfo("Started RM");
459         }
460 
461     }
462 
463     public synchronized boolean stop(int mode) throws ResourceManagerSystemException {
464         return stop(mode, getDefaultTransactionTimeout() * DEFAULT_COMMIT_TIMEOUT_FACTOR);
465     }
466 
467     public synchronized boolean stop(int mode, long timeOut) throws ResourceManagerSystemException {
468 
469         logger.logInfo("Stopping RM at '" + storeDir + "' / '" + workDir + "'");
470 
471         operationMode = OPERATION_MODE_STOPPING;
472 
473         sync();
474         boolean success = shutdown(mode, timeOut);
475 
476         releaseGlobalOpenResources();
477 
478         if (success) {
479             operationMode = OPERATION_MODE_STOPPED;
480             logger.logInfo("Stopped RM");
481         } else {
482             logger.logWarning("Failed to stop RM");
483         }
484 
485         return success;
486     }
487 
488     public synchronized boolean recover() throws ResourceManagerSystemException {
489         if (operationMode != OPERATION_MODE_STARTED && operationMode != OPERATION_MODE_STARTING) {
490             throw new ResourceManagerSystemException(
491                 ERR_SYSTEM,
492                 "Recovery is possible in started or starting resource manager only");
493         }
494         int oldMode = operationMode;
495         operationMode = OPERATION_MODE_RECOVERING;
496 
497         recoverContexts();
498         if (globalTransactions.size() > 0) {
499             logger.logInfo("Recovering pending transactions");
500         }
501 
502         dirty = !rollBackOrForward();
503 
504         operationMode = oldMode;
505         return dirty;
506     }
507 
508     public int getTransactionState(Object txId) throws ResourceManagerException {
509         TransactionContext context = getContext(txId);
510 
511         if (context == null) {
512             return STATUS_NO_TRANSACTION;
513         } else {
514             return context.status;
515         }
516 
517     }
518 
519     public void startTransaction(Object txId) throws ResourceManagerException {
520 
521         if (logger.isFineEnabled()) logger.logFine("Starting Tx " + txId);
522 
523         assureStarted(); // can only start a new transaction when not already stopping
524         if (txId == null || txIdMapper.getPathForId(txId).length() == 0) {
525             throw new ResourceManagerException(ERR_TXID_INVALID, txId);
526         }
527 
528         // be sure we are the only ones who create this tx 
529         synchronized (globalTransactions) {
530             TransactionContext context = getContext(txId);
531 
532             if (context != null) {
533                 throw new ResourceManagerException(ERR_DUP_TX, txId);
534             }
535 
536             context = new TransactionContext(txId);
537             context.init();
538             globalTransactions.put(txId, context);
539 
540         }
541     }
542 
543     public void markTransactionForRollback(Object txId) throws ResourceManagerException {
544         assureRMReady();
545         TransactionContext context = txInitialSaneCheckForWriting(txId);
546         try {
547             context.status = STATUS_MARKED_ROLLBACK;
548             context.saveState();
549         } finally {
550             // be very sure to free locks and resources, as application might crash or otherwise forget to roll this tx back
551             context.finalCleanUp();
552         }
553     }
554 
555     public int prepareTransaction(Object txId) throws ResourceManagerException {
556         assureRMReady();
557         // do not allow any further writing or commit or rollback when db is corrupt
558         if (dirty) {
559             throw new ResourceManagerSystemException(
560                 "Database is set to dirty, this *may* mean it is corrupt. No modifications are allowed until a recovery run has been performed!",
561                 ERR_SYSTEM,
562                 txId);
563         }
564 
565         if (txId == null) {
566             throw new ResourceManagerException(ERR_TXID_INVALID, txId);
567         }
568 
569         TransactionContext context = getContext(txId);
570 
571         if (context == null) {
572             return PREPARE_FAILURE;
573         }
574 
575         synchronized (context) {
576 
577             sync();
578 
579             if (context.status != STATUS_ACTIVE) {
580                 context.status = STATUS_MARKED_ROLLBACK;
581                 context.saveState();
582                 return PREPARE_FAILURE;
583             }
584 
585             if (logger.isFineEnabled()) logger.logFine("Preparing Tx " + txId);
586 
587             int prepareStatus = PREPARE_FAILURE;
588 
589             context.status = STATUS_PREPARING;
590             context.saveState();
591             // do all checks as early as possible
592             context.closeResources();
593             if (context.readOnly) {
594                 prepareStatus = PREPARE_SUCCESS_READONLY;
595             } else {
596                 // do all checks as early as possible
597                 try {
598                     context.upgradeLockToCommit();
599                 } catch (ResourceManagerException rme) {
600                     // if this did not work, mark it for roll back as early as possible
601                     markTransactionForRollback(txId);
602                     throw rme;
603                 }
604                 prepareStatus = PREPARE_SUCCESS;
605             }
606             context.status = STATUS_PREPARED;
607             context.saveState();
608             if (logger.isFineEnabled()) logger.logFine("Prepared Tx " + txId);
609 
610             return prepareStatus;
611         }
612     }
613 
614     public void rollbackTransaction(Object txId) throws ResourceManagerException {
615         assureRMReady();
616         TransactionContext context = txInitialSaneCheckForWriting(txId);
617         // needing synchronization in order not to interfer with shutdown thread
618         synchronized (context) {
619             try {
620 
621                 if (logger.isFineEnabled()) logger.logFine("Rolling back Tx " + txId);
622 
623                 context.status = STATUS_ROLLING_BACK;
624                 context.saveState();
625                 context.rollback();
626                 if (logger.isFineEnabled()) logger.logFine("All resources successfully removed for tx" + txId);
627                 context.status = STATUS_ROLLEDBACK;
628                 context.saveState();
629                 globalTransactions.remove(txId);
630                 context.cleanUp();
631 
632                 if (logger.isFineEnabled()) logger.logFine("Rolled back Tx " + txId);
633 
634                 // any system or runtime exceptions or errors thrown in rollback means we are in deep trouble, set the dirty flag
635             } catch (Error e) {
636                 setDirty(txId, e);
637                 throw e;
638             } catch (RuntimeException e) {
639                 setDirty(txId, e);
640                 throw e;
641             } catch (ResourceManagerSystemException e) {
642                 setDirty(txId, e);
643                 throw e;
644             } finally {
645                 context.finalCleanUp();
646                 // tell shutdown thread this tx is finished
647                 context.notifyFinish();
648             }
649         }
650     }
651 
652     public void commitTransaction(Object txId) throws ResourceManagerException {
653         assureRMReady();
654         TransactionContext context = txInitialSaneCheckForWriting(txId);
655         assureNotMarkedForRollback(context);
656 
657         // needing synchronization in order not to interfer with shutdown thread
658         synchronized (context) {
659             try {
660 
661                 if (logger.isFineEnabled()) logger.logFine("Committing Tx " + txId);
662 
663                 context.status = STATUS_COMMITTING;
664                 context.saveState();
665                 context.commit();
666                 if (logger.isFineEnabled()) logger.logFine("All resources successfully moved for tx" + txId);
667                 context.status = STATUS_COMMITTED;
668                 context.saveState();
669                 globalTransactions.remove(txId);
670                 context.cleanUp();
671 
672                 if (logger.isFineEnabled()) logger.logFine("Committed Tx " + txId);
673 
674                 // any system or runtime exceptions or errors thrown in rollback means we are in deep trouble, set the dirty flag
675             } catch (Error e) {
676                 setDirty(txId, e);
677                 throw e;
678             } catch (RuntimeException e) {
679                 setDirty(txId, e);
680                 throw e;
681             } catch (ResourceManagerSystemException e) {
682                 setDirty(txId, e);
683                 throw e;
684                 // like "could not upgrade lock"
685             } catch (ResourceManagerException e) {
686                 logger.logWarning("Could not commit tx " + txId + ", rolling back instead", e);
687                 rollbackTransaction(txId);
688             } finally {
689                 context.finalCleanUp();
690                 // tell shutdown thread this tx is finished
691                 context.notifyFinish();
692             }
693         }
694     }
695 
696     public boolean resourceExists(Object resourceId) throws ResourceManagerException {
697         // create temporary light weight tx
698         Object txId;
699         TransactionContext context;
700         synchronized (globalTransactions) {
701             txId = generatedUniqueTxId();
702             if (logger.isFinerEnabled())
703                 logger.logFiner("Creating temporary light weight tx " + txId + " to check for exists");
704             context = new TransactionContext(txId);
705             context.isLightWeight = true;
706             // XXX higher isolation might be needed to make sure upgrade to commit lock always works
707             context.isolationLevel = ISOLATION_LEVEL_READ_COMMITTED;
708             // context.isolationLevel = ISOLATION_LEVEL_REPEATABLE_READ;
709             globalTransactions.put(txId, context);
710         }
711 
712         boolean exists = resourceExists(txId, resourceId);
713 
714         context.freeLocks();
715         globalTransactions.remove(txId);
716         if (logger.isFinerEnabled())
717             logger.logFiner("Removing temporary light weight tx " + txId);
718 
719         return exists;
720     }
721 
722     public boolean resourceExists(Object txId, Object resourceId) throws ResourceManagerException {
723         lockResource(resourceId, txId, true);
724         return (getPathForRead(txId, resourceId) != null);
725     }
726 
727     public void deleteResource(Object txId, Object resourceId) throws ResourceManagerException {
728         deleteResource(txId, resourceId, true);
729     }
730 
731     public void deleteResource(Object txId, Object resourceId, boolean assureOnly) throws ResourceManagerException {
732 
733         if (logger.isFineEnabled()) logger.logFine(txId + " deleting " + resourceId);
734 
735         lockResource(resourceId, txId, false);
736 
737         if (getPathForRead(txId, resourceId) == null) {
738             if (assureOnly) {
739                 return;
740             }
741             throw new ResourceManagerException("No such resource at '" + resourceId + "'", ERR_NO_SUCH_RESOURCE, txId);
742         }
743         String txDeletePath = getDeletePath(txId, resourceId);
744         String mainPath = getMainPath(resourceId);
745         try {
746             getContext(txId).readOnly = false;
747 
748             // first undo change / create when there was one
749             undoScheduledChangeOrCreate(txId, resourceId);
750 
751             // if there still is a file in main store, we need to schedule
752             // a delete additionally
753             if (FileHelper.fileExists(mainPath)) {
754                 FileHelper.createFile(txDeletePath);
755             }
756         } catch (IOException e) {
757             throw new ResourceManagerSystemException(
758                 "Can not delete resource at '" + resourceId + "'",
759                 ERR_SYSTEM,
760                 txId,
761                 e);
762         }
763     }
764 
765     public void createResource(Object txId, Object resourceId) throws ResourceManagerException {
766         createResource(txId, resourceId, true);
767     }
768 
769     public void createResource(Object txId, Object resourceId, boolean assureOnly) throws ResourceManagerException {
770 
771         if (logger.isFineEnabled()) logger.logFine(txId + " creating " + resourceId);
772 
773         lockResource(resourceId, txId, false);
774 
775         if (getPathForRead(txId, resourceId) != null) {
776             if (assureOnly) {
777                 return;
778             }
779             throw new ResourceManagerException(
780                 "Resource at '" + resourceId + "', already exists",
781                 ERR_RESOURCE_EXISTS,
782                 txId);
783         }
784 
785         String txChangePath = getChangePath(txId, resourceId);
786         try {
787             getContext(txId).readOnly = false;
788             
789             // creation means either undoing a delete or actually scheduling a create
790             if (!undoScheduledDelete(txId, resourceId)) {
791                 FileHelper.createFile(txChangePath);
792             }
793 
794         } catch (IOException e) {
795             throw new ResourceManagerSystemException(
796                 "Can not create resource at '" + resourceId + "'",
797                 ERR_SYSTEM,
798                 txId,
799                 e);
800         }
801     }
802 
803     public void copyResource(Object txId, Object fromResourceId, Object toResourceId, boolean overwrite) throws ResourceManagerException {
804         if (logger.isFineEnabled()) logger.logFine(txId + " copying " + fromResourceId + " to " + toResourceId);
805 
806         lockResource(fromResourceId, txId, true);
807         lockResource(toResourceId, txId, false);
808 
809         if (resourceExists(txId, toResourceId) && !overwrite) {
810             throw new ResourceManagerException(
811                 "Resource at '" + toResourceId + "' already exists",
812                 ERR_RESOURCE_EXISTS,
813                 txId);
814         }
815 
816         InputStream fromResourceStream = null;
817         OutputStream toResourceStream = null;
818         try {
819             fromResourceStream = readResource(txId, fromResourceId);
820             toResourceStream = writeResource(txId, toResourceId);
821             FileHelper.copy(fromResourceStream, toResourceStream);
822         } catch (IOException e) {
823             throw new ResourceManagerException(ERR_SYSTEM, txId, e);
824         } finally {
825             closeOpenResource(fromResourceStream);
826             closeOpenResource(toResourceStream);
827         }
828     }
829 
830     public void moveResource(Object txId, Object fromResourceId, Object toResourceId, boolean overwrite) throws ResourceManagerException {
831         if (logger.isFineEnabled()) logger.logFine(txId + " moving " + fromResourceId + " to " + toResourceId);
832 
833         lockResource(fromResourceId, txId, false);
834         lockResource(toResourceId, txId, false);
835 
836         copyResource(txId, fromResourceId, toResourceId, overwrite);
837 
838         deleteResource(txId, fromResourceId, false);
839     }
840 
841     public InputStream readResource(Object resourceId) throws ResourceManagerException {
842         // create temporary light weight tx
843         Object txId;
844         synchronized (globalTransactions) {
845             txId = generatedUniqueTxId();
846             if (logger.isFinerEnabled())
847                 logger.logFiner("Creating temporary light weight tx " + txId + " for reading");
848             TransactionContext context = new TransactionContext(txId);
849             context.isLightWeight = true;
850             // XXX higher isolation might be needed to make sure upgrade to commit lock always works
851             context.isolationLevel = ISOLATION_LEVEL_READ_COMMITTED;
852             // context.isolationLevel = ISOLATION_LEVEL_REPEATABLE_READ;
853             globalTransactions.put(txId, context);
854         }
855 
856         InputStream is = readResource(txId, resourceId);
857         return is;
858     }
859 
860     public InputStream readResource(Object txId, Object resourceId) throws ResourceManagerException {
861 
862         if (logger.isFineEnabled()) logger.logFine(txId + " reading " + resourceId);
863 
864         lockResource(resourceId, txId, true);
865 
866         String resourcePath = getPathForRead(txId, resourceId);
867         if (resourcePath == null) {
868             throw new ResourceManagerException("No such resource at '" + resourceId + "'", ERR_NO_SUCH_RESOURCE, txId);
869         }
870 
871         File file = new File(resourcePath);
872         try {
873             FileInputStream stream = new FileInputStream(file);
874             getContext(txId).registerResource(stream);
875             return new InputStreamWrapper(stream, txId, resourceId);
876         } catch (FileNotFoundException e) {
877             throw new ResourceManagerSystemException("File '" + resourcePath + "' does not exist", ERR_SYSTEM, txId);
878         }
879     }
880 
881     public OutputStream writeResource(Object txId, Object resourceId) throws ResourceManagerException {
882         return writeResource(txId, resourceId, false);
883     }
884 
885     public OutputStream writeResource(Object txId, Object resourceId, boolean append) throws ResourceManagerException {
886 
887         if (logger.isFineEnabled()) logger.logFine(txId + " writing " + resourceId);
888 
889         lockResource(resourceId, txId, false);
890 
891         if (append) {
892             String mainPath = getMainPath(resourceId);
893             String txChangePath = getChangePath(txId, resourceId);
894             String txDeletePath = getDeletePath(txId, resourceId);
895 
896             boolean changeExists = FileHelper.fileExists(txChangePath);
897             boolean deleteExists = FileHelper.fileExists(txDeletePath);
898             boolean mainExists = FileHelper.fileExists(mainPath);
899 
900             if (mainExists && !changeExists && !deleteExists) {
901                 // the read and the write path for resourceId will be different!
902                 copyResource(txId, resourceId, resourceId, true);
903             }
904         }
905 
906         String resourcePath = getPathForWrite(txId, resourceId);
907 
908         try {
909             FileOutputStream stream = new FileOutputStream(resourcePath, append);
910             TransactionContext context = getContext(txId);
911             context.registerResource(stream);
912             context.readOnly = false;
913             return stream;
914         } catch (FileNotFoundException e) {
915             throw new ResourceManagerSystemException("File '" + resourcePath + "' does not exist", ERR_SYSTEM, txId);
916         }
917     }
918 
919     /*
920      * --- additional public methods complementing implementation of interfaces ---
921      *
922      *  
923      */
924 
925     /**
926      * Resets the store by deleting work <em>and</em> store directory.
927      */
928     public synchronized void reset() {
929         FileHelper.removeRec(new File(storeDir));
930         FileHelper.removeRec(new File(workDir));
931         new File(storeDir).mkdirs();
932         new File(workDir).mkdirs();
933     }
934 
935     /**
936      * Synchronizes persistent data with caches. Is implemented with an empty
937      * body, but called by other methods relying on synchronization. Subclasses
938      * that utilize caching must implement this method reasonably.
939      * 
940      * @throws ResourceManagerSystemException if anything fatal hapened during synchonization
941      */
942     public synchronized void sync() throws ResourceManagerSystemException {
943     }
944 
945     /**
946      * Generates a transaction identifier unique to this resource manager. To do so
947      * it requires this resource manager to be started.
948      * 
949      * @return generated transaction identifier
950      * @throws ResourceManagerSystemException if this resource manager has not been started, yet
951      */
952     public String generatedUniqueTxId() throws ResourceManagerSystemException {
953         assureRMReady();
954         String txId;
955         synchronized (globalTransactions) {
956             do {
957                 txId = Long.toHexString(System.currentTimeMillis()) + "-"
958                         + Integer.toHexString(idCnt++);
959                 // XXX busy loop
960             } while (getContext(txId) != null);
961         }
962         return txId;
963     }
964 
965     /*
966      * --- sane checks ---
967      *
968      *  
969      */
970 
971     protected void fileInitialSaneCheck(Object txId, Object path) throws ResourceManagerException {
972         if (path == null || path.toString().length() == 0) {
973             throw new ResourceManagerException(ERR_RESOURCEID_INVALID, txId);
974         }
975     }
976 
977     protected void assureStarted() throws ResourceManagerSystemException {
978         if (operationMode != OPERATION_MODE_STARTED) {
979             throw new ResourceManagerSystemException("Resource Manager Service not started", ERR_SYSTEM, null);
980         }
981     }
982 
983     protected void assureRMReady() throws ResourceManagerSystemException {
984         if (operationMode != OPERATION_MODE_STARTED && operationMode != OPERATION_MODE_STOPPING) {
985             throw new ResourceManagerSystemException("Resource Manager Service not ready", ERR_SYSTEM, null);
986         }
987     }
988 
989     protected void assureNotMarkedForRollback(TransactionContext context) throws ResourceManagerException {
990         if (context.status == STATUS_MARKED_ROLLBACK) {
991             throw new ResourceManagerException(ERR_MARKED_FOR_ROLLBACK, context.txId);
992         }
993     }
994 
995     protected TransactionContext txInitialSaneCheckForWriting(Object txId) throws ResourceManagerException {
996         assureRMReady();
997         // do not allow any further writing or commit or rollback when db is corrupt
998         if (dirty) {
999             throw new ResourceManagerSystemException(
1000                 "Database is set to dirty, this *may* mean it is corrupt. No modifications are allowed until a recovery run has been performed!",
1001                 ERR_SYSTEM,
1002                 txId);
1003         }
1004         return txInitialSaneCheck(txId);
1005     }
1006 
1007     protected TransactionContext txInitialSaneCheck(Object txId) throws ResourceManagerException {
1008         assureRMReady();
1009         if (txId == null) {
1010             throw new ResourceManagerException(ERR_TXID_INVALID, txId);
1011         }
1012 
1013         TransactionContext context = getContext(txId);
1014 
1015         if (context == null) {
1016             throw new ResourceManagerException(ERR_NO_TX, txId);
1017         }
1018 
1019         return context;
1020     }
1021 
1022     /*
1023      * --- General Helpers ---
1024      *
1025      *  
1026      */
1027 
1028     protected TransactionContext getContext(Object txId) {
1029         return (TransactionContext) globalTransactions.get(txId);
1030     }
1031 
1032     protected String assureLeadingSlash(Object pathObject) {
1033         String path = "";
1034         if (pathObject != null) {
1035             if (idMapper != null) {
1036                 path = idMapper.getPathForId(pathObject);
1037             } else {
1038                 path = pathObject.toString();
1039             }
1040             if (path.length() > 0 && path.charAt(0) != '/' && path.charAt(0) != '\\') {
1041                 path = "/" + path;
1042             }
1043         }
1044         return path;
1045     }
1046 
1047     protected String getMainPath(Object path) {
1048         StringBuffer buf = new StringBuffer(storeDir.length() + path.toString().length() + 5);
1049         buf.append(storeDir).append(assureLeadingSlash(path));
1050         return buf.toString();
1051     }
1052 
1053     protected String getTransactionBaseDir(Object txId) {
1054         return workDir + '/' + txIdMapper.getPathForId(txId);
1055     }
1056 
1057     protected String getChangePath(Object txId, Object path) {
1058         String txBaseDir = getTransactionBaseDir(txId);
1059         StringBuffer buf = new StringBuffer(txBaseDir.length() + path.toString().length()
1060                 + WORK_CHANGE_DIR.length() + 5);
1061         buf.append(txBaseDir).append('/').append(WORK_CHANGE_DIR).append(assureLeadingSlash(path));
1062         return buf.toString();
1063     }
1064 
1065     protected String getDeletePath(Object txId, Object path) {
1066         String txBaseDir = getTransactionBaseDir(txId);
1067         StringBuffer buf = new StringBuffer(txBaseDir.length() + path.toString().length()
1068                 + WORK_DELETE_DIR.length() + 5);
1069         buf.append(txBaseDir).append('/').append(WORK_DELETE_DIR).append(assureLeadingSlash(path));
1070         return buf.toString();
1071     }
1072 
1073     protected boolean undoScheduledDelete(Object txId, Object resourceId) throws ResourceManagerException {
1074         String txDeletePath = getDeletePath(txId, resourceId);
1075         File deleteFile = new File(txDeletePath);
1076         if (deleteFile.exists()) {
1077             if (!deleteFile.delete()) {
1078                 throw new ResourceManagerSystemException(
1079                     "Failed to undo delete of '" + resourceId + "'",
1080                     ERR_SYSTEM,
1081                     txId);
1082             }
1083             return true;
1084         }
1085         return false;
1086     }
1087 
1088     protected boolean undoScheduledChangeOrCreate(Object txId, Object resourceId) throws ResourceManagerException {
1089         String txChangePath = getChangePath(txId, resourceId);
1090         File changeFile = new File(txChangePath);
1091         if (changeFile.exists()) {
1092             if (!changeFile.delete()) {
1093                 throw new ResourceManagerSystemException(
1094                     "Failed to undo change / create of '" + resourceId + "'",
1095                     ERR_SYSTEM,
1096                     txId);
1097             }
1098             return true;
1099         }
1100         return false;
1101     }
1102 
1103     protected String getPathForWrite(Object txId, Object resourceId) throws ResourceManagerException {
1104         try {
1105             // when we want to write, be sure to write to a local copy
1106             String txChangePath = getChangePath(txId, resourceId);
1107             if (!FileHelper.fileExists(txChangePath)) {
1108                 FileHelper.createFile(txChangePath);
1109             }
1110             return txChangePath;
1111         } catch (IOException e) {
1112             throw new ResourceManagerSystemException(
1113                 "Can not write to resource at '" + resourceId + "'",
1114                 ERR_SYSTEM,
1115                 txId,
1116                 e);
1117         }
1118     }
1119 
1120     protected String getPathForRead(Object txId, Object resourceId) throws ResourceManagerException {
1121 
1122         String mainPath = getMainPath(resourceId);
1123         String txChangePath = getChangePath(txId, resourceId);
1124         String txDeletePath = getDeletePath(txId, resourceId);
1125 
1126         // now, this gets a bit complicated:
1127 
1128         boolean changeExists = FileHelper.fileExists(txChangePath);
1129         boolean deleteExists = FileHelper.fileExists(txDeletePath);
1130         boolean mainExists = FileHelper.fileExists(mainPath);
1131         boolean resourceIsDir =
1132             ((mainExists && new File(mainPath).isDirectory())
1133                 || (changeExists && new File(txChangePath).isDirectory()));
1134         if (resourceIsDir) {
1135             logger.logWarning("Resource at '" + resourceId + "' maps to directory");
1136         }
1137 
1138         // first do some sane checks
1139 
1140         // this may never be, two cases are possible, both disallowing to have a delete together with a change
1141         // 1. first there was a change, than a delete -> at least delete file exists (when there is a file in main store)
1142         // 2. first there was a delete, than a change -> only change file exists
1143         if (!resourceIsDir && changeExists && deleteExists) {
1144             throw new ResourceManagerSystemException(
1145                 "Inconsistent delete and change combination for resource at '" + resourceId + "'",
1146                 ERR_TX_INCONSISTENT,
1147                 txId);
1148         }
1149 
1150         // you should not have been allowed to delete a file that does not exist at all
1151         if (deleteExists && !mainExists) {
1152             throw new ResourceManagerSystemException(
1153                 "Inconsistent delete for resource at '" + resourceId + "'",
1154                 ERR_TX_INCONSISTENT,
1155                 txId);
1156         }
1157 
1158         if (changeExists) {
1159             return txChangePath;
1160         } else if (mainExists && !deleteExists) {
1161             return mainPath;
1162         } else {
1163             return null;
1164         }
1165     }
1166 
1167     /*
1168      * --- Locking Helpers ---
1169      *
1170      *  
1171      */
1172 
1173     protected int getSharedLockLevel(TransactionContext context) throws ResourceManagerException {
1174         if (context.isolationLevel == ISOLATION_LEVEL_READ_COMMITTED
1175             || context.isolationLevel == ISOLATION_LEVEL_READ_UNCOMMITTED) {
1176             return LOCK_ACCESS;
1177         } else if (
1178             context.isolationLevel == ISOLATION_LEVEL_REPEATABLE_READ
1179                 || context.isolationLevel == ISOLATION_LEVEL_SERIALIZABLE) {
1180             return LOCK_SHARED;
1181         } else {
1182             return LOCK_ACCESS;
1183         }
1184     }
1185 
1186     /*
1187      * --- Resource Management ---
1188      *
1189      *  
1190      */
1191 
1192     protected void registerOpenResource(Object openResource) {
1193         if (logger.isFinerEnabled())
1194             logger.logFiner("Registering open resource " + openResource);
1195         globalOpenResources.add(openResource);
1196     }
1197 
1198     protected void releaseGlobalOpenResources() {
1199         ArrayList copy;
1200         synchronized (globalOpenResources) {
1201             // XXX need to copy in order to allow removal in releaseOpenResource  
1202             copy = new ArrayList(globalOpenResources);
1203             for (Iterator it = copy.iterator(); it.hasNext();) {
1204                 Object stream = it.next();
1205                 closeOpenResource(stream);
1206             }
1207         }
1208     }
1209 
1210     protected void closeOpenResource(Object openResource) {
1211         if (logger.isFinerEnabled()) logger.logFiner("Releasing resource " + openResource);
1212         globalOpenResources.remove(openResource);
1213         if (openResource instanceof InputStream) {
1214             InputStream is = (InputStream) openResource;
1215             try {
1216                 is.close();
1217             } catch (IOException e) {
1218                 // do not care, as it might have been closed somewhere else, before 
1219             }
1220         } else if (openResource instanceof OutputStream) {
1221             OutputStream os = (OutputStream) openResource;
1222             try {
1223                 os.close();
1224             } catch (IOException e) {
1225                 // do not care, as it might have been closed somewhere else, before 
1226             }
1227         }
1228     }
1229 
1230     /*
1231      * --- Recovery / Shutdown Support ---
1232      *
1233      *  
1234      */
1235 
1236     protected boolean rollBackOrForward() {
1237         boolean allCool = true;
1238 
1239         synchronized (globalTransactions) {
1240             ArrayList contexts = new ArrayList(globalTransactions.values());
1241             for (Iterator it = contexts.iterator(); it.hasNext();) {
1242                 TransactionContext context = (TransactionContext) it.next();
1243                 if (context.status == STATUS_COMMITTING) {
1244                     // roll forward
1245                     logger.logInfo("Rolling forward " + context.txId);
1246 
1247                     try {
1248                         context.commit();
1249                         context.status = STATUS_COMMITTED;
1250                         context.saveState();
1251                         globalTransactions.remove(context.txId);
1252                         context.cleanUp();
1253                     } catch (ResourceManagerException e) {
1254                         // this is not good, but what can we do now?
1255                         allCool = false;
1256                         logger.logSevere("Rolling forward of " + context.txId + " failed", e);
1257                     }
1258                 } else if (context.status == STATUS_COMMITTED) {
1259                     logger.logInfo("Cleaning already commited " + context.txId);
1260                     globalTransactions.remove(context.txId);
1261                     try {
1262                         context.cleanUp();
1263                     } catch (ResourceManagerException e) {
1264                         // this is not good, but what can we do now?
1265                         allCool = false;
1266                         logger.logWarning("Cleaning of " + context.txId + " failed", e);
1267                     }
1268                 } else {
1269                     // in all other cases roll back and warn when not rollback was explicitely selected for tx
1270                     if (context.status != STATUS_ROLLING_BACK
1271                         && context.status != STATUS_ROLLEDBACK
1272                         && context.status != STATUS_MARKED_ROLLBACK) {
1273                         logger.logWarning("Irregularly rolling back " + context.txId);
1274                     } else {
1275                         logger.logInfo("Rolling back " + context.txId);
1276                     }
1277                     try {
1278                         context.rollback();
1279                         context.status = STATUS_ROLLEDBACK;
1280                         context.saveState();
1281                         globalTransactions.remove(context.txId);
1282                         context.cleanUp();
1283                     } catch (ResourceManagerException e) {
1284                         logger.logWarning("Rolling back of " + context.txId + " failed", e);
1285                     }
1286                 }
1287             }
1288 
1289         }
1290         return allCool;
1291     }
1292 
1293     protected void recoverContexts() {
1294         File dir = new File(workDir);
1295         File[] files = dir.listFiles();
1296         if (files == null)
1297             return;
1298         for (int i = 0; i < files.length; i++) {
1299             File file = files[i];
1300             Object txId = txIdMapper.getIdForPath(file.getName());
1301             // recover all transactions we do not already know
1302             if (!globalTransactions.containsKey(txId)) {
1303 
1304                 logger.logInfo("Recovering " + txId);
1305                 TransactionContext context;
1306                 try {
1307                     context = new TransactionContext(txId);
1308                     context.recoverState();
1309                     globalTransactions.put(txId, context);
1310                 } catch (ResourceManagerException e) {
1311                     // this is not good, but the best we get, just log as warning
1312                     logger.logWarning("Recovering of " + txId + " failed");
1313                 }
1314             }
1315         }
1316     }
1317 
1318     protected boolean waitForAllTxToStop(long timeoutMSecs) {
1319         long startTime = System.currentTimeMillis();
1320 
1321         // be sure not to lock globalTransactions for too long, as we need to give
1322         // txs the chance to complete (otherwise deadlocks are very likely to occur)
1323         // instead iterate over a copy as we can be sure no new txs will be registered
1324         // after operation level has been set to stopping
1325 
1326         Collection transactionsToStop;
1327         synchronized (globalTransactions) {
1328             transactionsToStop = new ArrayList(globalTransactions.values());
1329         }
1330         for (Iterator it = transactionsToStop.iterator(); it.hasNext();) {
1331             long remainingTimeout = startTime - System.currentTimeMillis() + timeoutMSecs;
1332 
1333             if (remainingTimeout <= 0) {
1334                 return false;
1335             }
1336 
1337             TransactionContext context = (TransactionContext) it.next();
1338             synchronized (context) {
1339                 if (!context.finished) {
1340                     logger.logInfo(
1341                         "Waiting for tx " + context.txId + " to finish for " + remainingTimeout + " milli seconds");
1342                 }
1343                 while (!context.finished && remainingTimeout > 0) {
1344                     try {
1345                         context.wait(remainingTimeout);
1346                     } catch (InterruptedException e) {
1347                         return false;
1348                     }
1349                     remainingTimeout = startTime - System.currentTimeMillis() + timeoutMSecs;
1350                 }
1351                 if (context.finished) {
1352                     logger.logInfo("Tx " + context.txId + " finished");
1353                 } else {
1354                     logger.logWarning("Tx " + context.txId + " failed to finish in given time");
1355                 }
1356             }
1357         }
1358 
1359         return (globalTransactions.size() == 0);
1360     }
1361 
1362     protected boolean shutdown(int mode, long timeoutMSecs) {
1363         switch (mode) {
1364             case SHUTDOWN_MODE_NORMAL :
1365                 return waitForAllTxToStop(timeoutMSecs);
1366             case SHUTDOWN_MODE_ROLLBACK :
1367                 return rollBackOrForward();
1368             case SHUTDOWN_MODE_KILL :
1369                 return true;
1370             default :
1371                 return false;
1372         }
1373     }
1374 
1375     protected void setDirty(Object txId, Throwable t) {
1376         logger.logSevere(
1377             "Fatal error during critical commit/rollback of transaction " + txId + ", setting database to dirty.",
1378             t);
1379         dirty = true;
1380     }
1381 
1382     /**
1383      * Inner class to hold the complete context, i.e. all information needed, for a transaction.
1384      * 
1385      */
1386     protected class TransactionContext {
1387 
1388         protected Object txId;
1389         protected int status = STATUS_ACTIVE;
1390         protected int isolationLevel = DEFAULT_ISOLATION_LEVEL;
1391         protected long timeoutMSecs = getDefaultTransactionTimeout();
1392         protected long startTime;
1393         protected long commitTime = -1L;
1394         protected boolean isLightWeight = false;
1395         protected boolean readOnly = true;
1396         protected boolean finished = false;
1397 
1398         // list of streams participating in this tx
1399         private List openResources = new ArrayList();
1400 
1401         public TransactionContext(Object txId) throws ResourceManagerException {
1402             this.txId = txId;
1403             startTime = System.currentTimeMillis();
1404         }
1405 
1406         public long getRemainingTimeout() {
1407             long now = System.currentTimeMillis();
1408             return (startTime - now + timeoutMSecs);
1409         }
1410 
1411         public synchronized void init() throws ResourceManagerException {
1412             String baseDir = getTransactionBaseDir(txId);
1413             String changeDir = baseDir + "/" + WORK_CHANGE_DIR;
1414             String deleteDir = baseDir + "/" + WORK_DELETE_DIR;
1415 
1416             new File(changeDir).mkdirs();
1417             new File(deleteDir).mkdirs();
1418 
1419             saveState();
1420         }
1421 
1422         public synchronized void rollback() throws ResourceManagerException {
1423             closeResources();
1424             freeLocks();
1425         }
1426 
1427         public synchronized void commit() throws ResourceManagerException {
1428             String baseDir = getTransactionBaseDir(txId);
1429             String changeDir = baseDir + "/" + WORK_CHANGE_DIR;
1430             String deleteDir = baseDir + "/" + WORK_DELETE_DIR;
1431 
1432             closeResources();
1433             upgradeLockToCommit();
1434             try {
1435                 applyDeletes(new File(deleteDir), new File(storeDir), new File(storeDir));
1436                 FileHelper.moveRec(new File(changeDir), new File(storeDir));
1437             } catch (IOException e) {
1438                 throw new ResourceManagerSystemException("Commit failed", ERR_SYSTEM, txId, e);
1439             }
1440             freeLocks();
1441             commitTime = System.currentTimeMillis();
1442         }
1443 
1444         public synchronized void notifyFinish() {
1445             finished = true;
1446             notifyAll();
1447         }
1448 
1449         public synchronized void cleanUp() throws ResourceManagerException {
1450             if (!cleanUp)
1451                 return; // XXX for debugging only
1452             boolean clean = true;
1453             Exception cleanException = null;
1454             String baseDir = getTransactionBaseDir(txId);
1455             FileHelper.removeRec(new File(baseDir));
1456             if (!clean) {
1457                 throw new ResourceManagerSystemException(
1458                     "Clean up failed due to unreleasable lock",
1459                     ERR_SYSTEM,
1460                     txId,
1461                     cleanException);
1462             }
1463         }
1464 
1465         public synchronized void finalCleanUp() throws ResourceManagerException {
1466             closeResources();
1467             freeLocks();
1468         }
1469 
1470         public synchronized void upgradeLockToCommit() throws ResourceManagerException {
1471             for (Iterator it =  lockManager.getAll(txId).iterator(); it.hasNext();) {
1472                 GenericLock lock = (GenericLock) it.next();
1473                 // only upgrade if we had write access
1474                 if (lock.getLockLevel(txId) == LOCK_EXCLUSIVE) {
1475                     try {
1476                         // in case of deadlocks, make failure of non-committing tx more likely
1477                         if (!lock
1478                             .acquire(
1479                                 txId,
1480                                 LOCK_COMMIT,
1481                                 true,
1482                                 true,
1483                                 getDefaultTransactionTimeout() * DEFAULT_COMMIT_TIMEOUT_FACTOR)) {
1484                             throw new ResourceManagerException(
1485                                 "Could not upgrade to commit lock for resource at '"
1486                                     + lock.getResourceId().toString()
1487                                     + "'",
1488                                 ERR_NO_LOCK,
1489                                 txId);
1490                         }
1491                     } catch (InterruptedException e) {
1492                         throw new ResourceManagerSystemException(ERR_SYSTEM, txId, e);
1493                     }
1494                 }
1495 
1496             }
1497         }
1498 
1499         public synchronized void freeLocks() {
1500             lockManager.releaseAll(txId);
1501         }
1502 
1503         public synchronized void closeResources() {
1504             synchronized (globalOpenResources) {
1505                 for (Iterator it = openResources.iterator(); it.hasNext();) {
1506                     Object stream = it.next();
1507                     closeOpenResource(stream);
1508                 }
1509             }
1510         }
1511 
1512         public synchronized void registerResource(Object openResource) {
1513             synchronized (globalOpenResources) {
1514                 registerOpenResource(openResource);
1515                 openResources.add(openResource);
1516             }
1517         }
1518 
1519         public synchronized void saveState() throws ResourceManagerException {
1520             String statePath = getTransactionBaseDir(txId) + "/" + CONTEXT_FILE;
1521             File file = new File(statePath);
1522             BufferedWriter writer = null;
1523             try {
1524                 OutputStream os = new FileOutputStream(file);
1525                 writer = new BufferedWriter(new OutputStreamWriter(os, DEFAULT_PARAMETER_ENCODING));
1526                 writer.write(toString());
1527             } catch (FileNotFoundException e) {
1528                 String msg = "Saving status information to '" + statePath + "' failed! Could not create file";
1529                 logger.logSevere(msg, e);
1530                 throw new ResourceManagerSystemException(msg, ERR_SYSTEM, txId, e);
1531             } catch (IOException e) {
1532                 String msg = "Saving status information to '" + statePath + "' failed";
1533                 logger.logSevere(msg, e);
1534                 throw new ResourceManagerSystemException(msg, ERR_SYSTEM, txId, e);
1535             } finally {
1536                 if (writer != null) {
1537                     try {
1538                         writer.close();
1539                     } catch (IOException e) {
1540                     }
1541 
1542                 }
1543             }
1544         }
1545 
1546         public synchronized void recoverState() throws ResourceManagerException {
1547             String statePath = getTransactionBaseDir(txId) + "/" + CONTEXT_FILE;
1548             File file = new File(statePath);
1549             BufferedReader reader = null;
1550             try {
1551                 InputStream is = new FileInputStream(file);
1552 
1553                 reader = new BufferedReader(new InputStreamReader(is, DEFAULT_PARAMETER_ENCODING));
1554                 txId = reader.readLine();
1555                 status = Integer.parseInt(reader.readLine());
1556                 isolationLevel = Integer.parseInt(reader.readLine());
1557                 timeoutMSecs = Long.parseLong(reader.readLine());
1558                 startTime = Long.parseLong(reader.readLine());
1559             } catch (FileNotFoundException e) {
1560                 String msg = "Recovering status information from '" + statePath + "' failed! Could not find file";
1561                 logger.logSevere(msg, e);
1562                 throw new ResourceManagerSystemException(msg, ERR_SYSTEM, txId);
1563             } catch (IOException e) {
1564                 String msg = "Recovering status information from '" + statePath + "' failed";
1565                 logger.logSevere(msg, e);
1566                 throw new ResourceManagerSystemException(msg, ERR_SYSTEM, txId, e);
1567             } catch (Throwable t) {
1568                 String msg = "Recovering status information from '" + statePath + "' failed";
1569                 logger.logSevere(msg, t);
1570                 throw new ResourceManagerSystemException(msg, ERR_SYSTEM, txId, t);
1571             } finally {
1572                 if (reader != null) {
1573                     try {
1574                         reader.close();
1575                     } catch (IOException e) {
1576                     }
1577 
1578                 }
1579             }
1580         }
1581 
1582         public synchronized String toString() {
1583             StringBuffer buf = new StringBuffer();
1584             buf.append(txId).append('\n');
1585             buf.append(Integer.toString(status)).append('\n');
1586             buf.append(Integer.toString(isolationLevel)).append('\n');
1587             buf.append(Long.toString(timeoutMSecs)).append('\n');
1588             buf.append(Long.toString(startTime)).append('\n');
1589             if (debug) {
1590                 buf.append("----- Lock Debug Info -----\n");
1591 
1592                 for (Iterator it = lockManager.getAll(txId).iterator(); it.hasNext();) {
1593                     GenericLock lock = (GenericLock) it.next();
1594                     buf.append(lock.toString()+"\n");
1595                 }
1596 
1597             }
1598             return buf.toString();
1599         }
1600 
1601     }
1602 
1603     private class InputStreamWrapper extends InputStream {
1604         private InputStream is;
1605         private Object txId;
1606         private Object resourceId;
1607 
1608         public InputStreamWrapper(InputStream is, Object txId, Object resourceId) {
1609             this.is = is;
1610             this.txId = txId;
1611             this.resourceId = resourceId;
1612         }
1613 
1614         public int read() throws IOException {
1615             return is.read();
1616         }
1617 
1618         public int read(byte b[]) throws IOException {
1619             return is.read(b);
1620         }
1621 
1622         public int read(byte b[], int off, int len) throws IOException {
1623             return is.read(b, off, len);
1624         }
1625 
1626         public int available() throws IOException {
1627             return is.available();
1628         }
1629 
1630         public void close() throws IOException {
1631             try {
1632                 is.close();
1633             } finally {
1634                 TransactionContext context;
1635                 synchronized (globalTransactions) {
1636                     context = getContext(txId);
1637                     if (context == null) {
1638                         return;
1639                     }
1640                 }
1641                 synchronized (context) {
1642                     if (context.isLightWeight) {
1643                         if (logger.isFinerEnabled())
1644                             logger.logFiner("Upon close of resource removing temporary light weight tx " + txId);
1645                         context.freeLocks();
1646                         globalTransactions.remove(txId);
1647                     } else {
1648                         // release access lock in order to allow other transactions to commit
1649                         if (lockManager.getLevel(txId, resourceId) == LOCK_ACCESS) {
1650                             if (logger.isFinerEnabled()) {
1651                                 logger.logFiner("Upon close of resource releasing access lock for tx " + txId + " on resource at " + resourceId);
1652                             }
1653                             lockManager.release(txId, resourceId);
1654                         }
1655                     }
1656                 }
1657             }
1658         }
1659 
1660         public void mark(int readlimit) {
1661             is.mark(readlimit);
1662         }
1663 
1664         public void reset() throws IOException {
1665             is.reset();
1666         }
1667 
1668         public boolean markSupported() {
1669             return is.markSupported();
1670 
1671         }
1672 
1673     }
1674 
1675 }