View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.transaction.locking;
18  
19  import java.util.Collection;
20  import java.util.HashSet;
21  import java.util.Set;
22  import java.util.Map.Entry;
23  import java.util.concurrent.ConcurrentHashMap;
24  import java.util.concurrent.TimeUnit;
25  import java.util.concurrent.locks.Lock;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.commons.transaction.locking.LockException.Code;
30  import org.apache.commons.transaction.locking.locks.ResourceRWLock;
31  
32  /**
33   * Advanced read/write lock implementation of a {@link LockManager} based on
34   * {@link ResourceRWLock}.
35   * 
36   * <p>
37   * <em>Note</em>: This implementation performs deadlock detection.
38   * 
39   * <p>
40   * This implementation is <em>thread-safe</em>.
41   */
42  public class RWLockManager<K, M> extends AbstractLockManager<K, M> implements LockManager<K, M> {
43  
44      private Log log = LogFactory.getLog(getClass());
45  
46      protected ConcurrentHashMap<KeyEntry<K, M>, ResourceRWLock> allLocks = new ConcurrentHashMap<KeyEntry<K, M>, ResourceRWLock>();
47  
48      private long absolutePrewaitTime = -1;
49  
50      private long prewaitTimeDivisor = 10;
51  
52      protected void release() {
53          Set<Lock> locks = locksForThreads.get(Thread.currentThread());
54          // graceful reaction...
55          if (locks == null) {
56              return;
57          }
58  
59          // first release all locks
60          for (Lock lock : locks) {
61              lock.unlock();
62          }
63  
64          // then remove all locks that are no longer needed to avoid out of
65          // memory
66          removeUnsuedLocks();
67  
68          locksForThreads.remove(Thread.currentThread());
69      }
70  
71      protected void removeUnsuedLocks() {
72          Set<Entry<KeyEntry<K, M>, ResourceRWLock>> locksToCheck = allLocks.entrySet();
73          for (Entry<KeyEntry<K, M>, ResourceRWLock> entry : locksToCheck) {
74              KeyEntry<K, M> keyEntry = entry.getKey();
75              ResourceRWLock lock = entry.getValue();
76  
77              // remove lock if no other thread holds a it
78              if (lock.isUnacquired()) {
79                  // only remove if no one else has modified it in the meantime
80                  if (allLocks.remove(keyEntry, lock)) {
81                      log.debug("Completely removing unused lock" + lock);
82                  }
83              }
84          }
85      }
86  
87      protected ResourceRWLock create(String name) {
88          return new ResourceRWLock(name);
89      }
90  
91      protected boolean tryLockInternal(M resourceManager, K key, boolean exclusive, long time,
92              TimeUnit unit) throws LockException {
93          reportTimeout(Thread.currentThread());
94  
95          KeyEntry<K, M> entry = new KeyEntry<K, M>(key, resourceManager);
96  
97          String resourceName = entry.toString();
98  
99          ResourceRWLock rwlock = create(resourceName);
100         ResourceRWLock existingLock = allLocks.putIfAbsent(entry, rwlock);
101         if (existingLock != null)
102             rwlock = existingLock;
103         Set<Lock> locksForThisThread = locksForThreads.get(Thread.currentThread());
104         if (locksForThisThread == null) {
105             throw new IllegalStateException("lock() can only be called after startWork()");
106         }
107 
108         Lock lock = exclusive ? rwlock.writeLock() : rwlock.readLock();
109 
110         boolean locked;
111         if (time == 0) {
112             locked = lock.tryLock();
113         } else {
114             // we need to have this lock request registered as an additional
115             // waiter as it will not be among the queued threads at the time we
116             // do the deadlock check
117             rwlock.registerWaiter();
118             try {
119                 locked = doTrickyYetEfficientLockOnlyIfThisCanNotCauseADeadlock(lock, unit
120                         .toMillis(time));
121             } finally {
122                 rwlock.unregisterWaiter();
123             }
124         }
125         if (locked)
126             locksForThisThread.add(lock);
127 
128         return locked;
129     }
130 
131     protected boolean doTrickyYetEfficientLockOnlyIfThisCanNotCauseADeadlock(Lock lock,
132             long timeMsecs) throws LockException {
133 
134         // This algorithm is devided into three parts:
135         // Note: We can be interrupted most of the time
136         //
137         // (I) prewait:
138         // Wait a fraktion of the time to see if we can acquire
139         // the lock in short time. If we can all is good and we exit
140         // signalling success. If not we need to get into a more resource
141         // consuming phase.
142         //
143         // (II) clearing of timed out threads / deadlock detection:
144         // As we have not been able to acquire the lock, yet, maybe there is
145         // deadlock. Clear all threads already timed out and afterwards
146         // check for a deadlock state. If there is one report it with an
147         // exception. If not we enter the final phase.
148         // 
149         // (III) real wait:
150         // Everything is under control, we were just a little bit too
151         // impatient. So wait for the remaining time and see if the can get
152         // the lock
153         // 
154 
155         try {
156             boolean locked;
157 
158             // (I) prewait
159 
160             long startTime = System.currentTimeMillis();
161 
162             long preWaitTime = getPrewaitTime(timeMsecs);
163             locked = lock.tryLock(preWaitTime, TimeUnit.MILLISECONDS);
164             if (locked)
165                 return true;
166 
167             // (II) deadlock detect
168             cancelAllTimedOut();
169             detectDeadlock(Thread.currentThread(), new HashSet<Thread>());
170 
171             // (III) real wait
172             long now = System.currentTimeMillis();
173             long remainingWaitTime = timeMsecs - (now - startTime);
174             if (remainingWaitTime < 0)
175                 return false;
176 
177             locked = lock.tryLock(remainingWaitTime, TimeUnit.MILLISECONDS);
178             return locked;
179         } catch (InterruptedException e) {
180             throw new LockException(Code.INTERRUPTED);
181         }
182 
183     }
184 
185     long getPrewaitTime(long timeMsecs) {
186         if (absolutePrewaitTime != -1)
187             return absolutePrewaitTime;
188         return timeMsecs / prewaitTimeDivisor;
189     }
190 
191     protected void detectDeadlock(Thread thread, Set<Thread> path) {
192         path.add(thread);
193         // these are our locks
194         // Note: No need to make a copy as we can be sure to iterate on our
195         // private
196         // version, as this is a CopyOnWriteArraySet!
197         Set<Lock> locks = locksForThreads.get(thread);
198         // check is necessary as the possibly offending thread might have ended
199         // before this check completes
200         if (locks != null) {
201             for (Lock lock : locks) {
202                 // these are the ones waiting for one of our locks
203                 // and if they wait, they wait because of me!
204                 Collection<Thread> conflicts = ((ResourceRWLock.InnerLock) lock)
205                         .getResourceRWLock().getQueuedThreads();
206                 for (Thread conflictThread : conflicts) {
207                     // this means, we have found a cycle in the wait graph
208                     if (path.contains(conflictThread)) {
209                         String message = "Cycle found involving " + formatPath(path);
210                         throw new LockException(message, LockException.Code.WOULD_DEADLOCK);
211                     } else {
212                         detectDeadlock(conflictThread, path);
213                     }
214                 }
215             }
216         }
217         path.remove(thread);
218     }
219 
220     private String formatPath(Set<Thread> path) {
221         StringBuffer buf = new StringBuffer();
222         for (Thread thread : path) {
223             buf.append(thread.getName()).append("->");
224         }
225         return buf.toString();
226     }
227 
228     protected void cancelAllTimedOut() {
229         Set<Thread> threads = effectiveGlobalTimeouts.keySet();
230         for (Thread thread : threads) {
231             if (hasTimedOut(thread)) {
232                 // TODO #1: We need to record this thread has timed out to
233                 // produce
234                 // a meaningful exception when it tries to continue its work
235                 // TODO #2: If would be even better if we could actively release
236                 // its locks, but only the thread that acquired a lock can
237                 // release it. An extended implementation of ReentrantLock would
238                 // help.
239                 thread.interrupt();
240             }
241 
242         }
243     }
244 
245     public long getAbsolutePrewaitTime() {
246         return absolutePrewaitTime;
247     }
248 
249     public void setAbsolutePrewaitTime(long absolutePrewaitTime) {
250         this.absolutePrewaitTime = absolutePrewaitTime;
251     }
252 
253     public long getPrewaitTimeDivisor() {
254         return prewaitTimeDivisor;
255     }
256 
257     public void setPrewaitTimeDivisor(long prewaitTimeDivisor) {
258         this.prewaitTimeDivisor = prewaitTimeDivisor;
259     }
260 
261 }