1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.commons.transaction.locking;
18
19 import java.util.Collection;
20 import java.util.HashSet;
21 import java.util.Set;
22 import java.util.Map.Entry;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.locks.Lock;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.commons.transaction.locking.LockException.Code;
30 import org.apache.commons.transaction.locking.locks.ResourceRWLock;
31
32 /**
33 * Advanced read/write lock implementation of a {@link LockManager} based on
34 * {@link ResourceRWLock}.
35 *
36 * <p>
37 * <em>Note</em>: This implementation performs deadlock detection.
38 *
39 * <p>
40 * This implementation is <em>thread-safe</em>.
41 */
42 public class RWLockManager<K, M> extends AbstractLockManager<K, M> implements LockManager<K, M> {
43
44 private Log log = LogFactory.getLog(getClass());
45
46 protected ConcurrentHashMap<KeyEntry<K, M>, ResourceRWLock> allLocks = new ConcurrentHashMap<KeyEntry<K, M>, ResourceRWLock>();
47
48 private long absolutePrewaitTime = -1;
49
50 private long prewaitTimeDivisor = 10;
51
52 protected void release() {
53 Set<Lock> locks = locksForThreads.get(Thread.currentThread());
54
55 if (locks == null) {
56 return;
57 }
58
59
60 for (Lock lock : locks) {
61 lock.unlock();
62 }
63
64
65
66 removeUnsuedLocks();
67
68 locksForThreads.remove(Thread.currentThread());
69 }
70
71 protected void removeUnsuedLocks() {
72 Set<Entry<KeyEntry<K, M>, ResourceRWLock>> locksToCheck = allLocks.entrySet();
73 for (Entry<KeyEntry<K, M>, ResourceRWLock> entry : locksToCheck) {
74 KeyEntry<K, M> keyEntry = entry.getKey();
75 ResourceRWLock lock = entry.getValue();
76
77
78 if (lock.isUnacquired()) {
79
80 if (allLocks.remove(keyEntry, lock)) {
81 log.debug("Completely removing unused lock" + lock);
82 }
83 }
84 }
85 }
86
87 protected ResourceRWLock create(String name) {
88 return new ResourceRWLock(name);
89 }
90
91 protected boolean tryLockInternal(M resourceManager, K key, boolean exclusive, long time,
92 TimeUnit unit) throws LockException {
93 reportTimeout(Thread.currentThread());
94
95 KeyEntry<K, M> entry = new KeyEntry<K, M>(key, resourceManager);
96
97 String resourceName = entry.toString();
98
99 ResourceRWLock rwlock = create(resourceName);
100 ResourceRWLock existingLock = allLocks.putIfAbsent(entry, rwlock);
101 if (existingLock != null)
102 rwlock = existingLock;
103 Set<Lock> locksForThisThread = locksForThreads.get(Thread.currentThread());
104 if (locksForThisThread == null) {
105 throw new IllegalStateException("lock() can only be called after startWork()");
106 }
107
108 Lock lock = exclusive ? rwlock.writeLock() : rwlock.readLock();
109
110 boolean locked;
111 if (time == 0) {
112 locked = lock.tryLock();
113 } else {
114
115
116
117 rwlock.registerWaiter();
118 try {
119 locked = doTrickyYetEfficientLockOnlyIfThisCanNotCauseADeadlock(lock, unit
120 .toMillis(time));
121 } finally {
122 rwlock.unregisterWaiter();
123 }
124 }
125 if (locked)
126 locksForThisThread.add(lock);
127
128 return locked;
129 }
130
131 protected boolean doTrickyYetEfficientLockOnlyIfThisCanNotCauseADeadlock(Lock lock,
132 long timeMsecs) throws LockException {
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155 try {
156 boolean locked;
157
158
159
160 long startTime = System.currentTimeMillis();
161
162 long preWaitTime = getPrewaitTime(timeMsecs);
163 locked = lock.tryLock(preWaitTime, TimeUnit.MILLISECONDS);
164 if (locked)
165 return true;
166
167
168 cancelAllTimedOut();
169 detectDeadlock(Thread.currentThread(), new HashSet<Thread>());
170
171
172 long now = System.currentTimeMillis();
173 long remainingWaitTime = timeMsecs - (now - startTime);
174 if (remainingWaitTime < 0)
175 return false;
176
177 locked = lock.tryLock(remainingWaitTime, TimeUnit.MILLISECONDS);
178 return locked;
179 } catch (InterruptedException e) {
180 throw new LockException(Code.INTERRUPTED);
181 }
182
183 }
184
185 long getPrewaitTime(long timeMsecs) {
186 if (absolutePrewaitTime != -1)
187 return absolutePrewaitTime;
188 return timeMsecs / prewaitTimeDivisor;
189 }
190
191 protected void detectDeadlock(Thread thread, Set<Thread> path) {
192 path.add(thread);
193
194
195
196
197 Set<Lock> locks = locksForThreads.get(thread);
198
199
200 if (locks != null) {
201 for (Lock lock : locks) {
202
203
204 Collection<Thread> conflicts = ((ResourceRWLock.InnerLock) lock)
205 .getResourceRWLock().getQueuedThreads();
206 for (Thread conflictThread : conflicts) {
207
208 if (path.contains(conflictThread)) {
209 String message = "Cycle found involving " + formatPath(path);
210 throw new LockException(message, LockException.Code.WOULD_DEADLOCK);
211 } else {
212 detectDeadlock(conflictThread, path);
213 }
214 }
215 }
216 }
217 path.remove(thread);
218 }
219
220 private String formatPath(Set<Thread> path) {
221 StringBuffer buf = new StringBuffer();
222 for (Thread thread : path) {
223 buf.append(thread.getName()).append("->");
224 }
225 return buf.toString();
226 }
227
228 protected void cancelAllTimedOut() {
229 Set<Thread> threads = effectiveGlobalTimeouts.keySet();
230 for (Thread thread : threads) {
231 if (hasTimedOut(thread)) {
232
233
234
235
236
237
238
239 thread.interrupt();
240 }
241
242 }
243 }
244
245 public long getAbsolutePrewaitTime() {
246 return absolutePrewaitTime;
247 }
248
249 public void setAbsolutePrewaitTime(long absolutePrewaitTime) {
250 this.absolutePrewaitTime = absolutePrewaitTime;
251 }
252
253 public long getPrewaitTimeDivisor() {
254 return prewaitTimeDivisor;
255 }
256
257 public void setPrewaitTimeDivisor(long prewaitTimeDivisor) {
258 this.prewaitTimeDivisor = prewaitTimeDivisor;
259 }
260
261 }