Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
Sequence |
|
| 2.090909090909091;2.091 |
1 | /* | |
2 | * #%L | |
3 | * Sequence.java - mongodb-async-driver - Allanbank Consulting, Inc. | |
4 | * %% | |
5 | * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc. | |
6 | * %% | |
7 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
8 | * you may not use this file except in compliance with the License. | |
9 | * You may obtain a copy of the License at | |
10 | * | |
11 | * http://www.apache.org/licenses/LICENSE-2.0 | |
12 | * | |
13 | * Unless required by applicable law or agreed to in writing, software | |
14 | * distributed under the License is distributed on an "AS IS" BASIS, | |
15 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
16 | * See the License for the specific language governing permissions and | |
17 | * limitations under the License. | |
18 | * #L% | |
19 | */ | |
20 | ||
21 | package com.allanbank.mongodb.client.connection.socket; | |
22 | ||
23 | import java.util.SortedMap; | |
24 | import java.util.TreeMap; | |
25 | import java.util.concurrent.atomic.AtomicInteger; | |
26 | import java.util.concurrent.atomic.AtomicLongArray; | |
27 | import java.util.concurrent.locks.Condition; | |
28 | import java.util.concurrent.locks.Lock; | |
29 | import java.util.concurrent.locks.ReentrantLock; | |
30 | ||
31 | import com.allanbank.mongodb.LockType; | |
32 | import com.allanbank.mongodb.client.message.PendingMessageQueue; | |
33 | ||
34 | /** | |
35 | * Sequence provides the ability to synchronize the access to the socket's | |
36 | * output stream. The thread starts by reserving a position via the reserve | |
37 | * method and then prepares to send the messages. It will then wait for its turn | |
38 | * to send and finally release the sequence. | |
39 | * <p> | |
40 | * We use an array of longs to avoid false sharing. | |
41 | * </p> | |
42 | * | |
43 | * @api.no This class is <b>NOT</b> part of the drivers API. This class may be | |
44 | * mutated in incompatible ways between any two releases of the driver. | |
45 | * @copyright 2013, Allanbank Consulting, Inc., All Rights Reserved | |
46 | */ | |
47 | /* package */class Sequence { | |
48 | ||
49 | /** The offset of the release value. */ | |
50 | private static final int RELEASE_OFFSET = 15 + 7; | |
51 | ||
52 | /** The offset of the reserve value. */ | |
53 | private static final int RESERVE_OFFSET = 7; | |
54 | ||
55 | /** Amount of time to spin/yield before waiting. Set to 1/2 millisecond. */ | |
56 | 1 | private static final long YIELD_TIME_NS = PendingMessageQueue.YIELD_TIME_NS; |
57 | ||
58 | /** The condition used when there are waiters. */ | |
59 | private final Condition myCondition; | |
60 | ||
61 | /** The mutex used with the sequence. */ | |
62 | private final Lock myLock; | |
63 | ||
64 | /** The lock type to use with the sequence. */ | |
65 | private final LockType myLockType; | |
66 | ||
67 | /** The atomic array of long values. */ | |
68 | 145 | private final AtomicLongArray myPaddedValue = new AtomicLongArray(30); |
69 | ||
70 | /** | |
71 | * The map of waiters and the condition they are waiting on to avoid the | |
72 | * thundering herd. Only access while holding the {@link #myLock lock}. | |
73 | */ | |
74 | private final SortedMap<Long, Condition> myWaiters; | |
75 | ||
76 | /** Tracks how many threads are waiting for a message or a space to open. */ | |
77 | private final AtomicInteger myWaiting; | |
78 | ||
79 | /** | |
80 | * Create a sequence with a specified initial value. | |
81 | * | |
82 | * @param initialValue | |
83 | * The initial value for this sequence. | |
84 | */ | |
85 | public Sequence(final long initialValue) { | |
86 | 6 | this(initialValue, LockType.MUTEX); |
87 | 6 | } |
88 | ||
89 | /** | |
90 | * Create a sequence with a specified initial value. | |
91 | * | |
92 | * @param initialValue | |
93 | * The initial value for this sequence. | |
94 | * @param lockType | |
95 | * The lock type to use with the sequence. | |
96 | */ | |
97 | 145 | public Sequence(final long initialValue, final LockType lockType) { |
98 | 145 | myPaddedValue.set(RESERVE_OFFSET, initialValue); |
99 | 145 | myPaddedValue.set(RELEASE_OFFSET, initialValue); |
100 | ||
101 | 145 | myLockType = lockType; |
102 | ||
103 | 145 | myLock = new ReentrantLock(true); |
104 | 145 | myCondition = myLock.newCondition(); |
105 | 145 | myWaiting = new AtomicInteger(0); |
106 | 145 | myWaiters = new TreeMap<Long, Condition>(); |
107 | 145 | } |
108 | ||
109 | /** | |
110 | * Returns an estimate of the number of waiters. | |
111 | * | |
112 | * @return The waiters. | |
113 | */ | |
114 | public int getWaitersCount() { | |
115 | 3 | final long reserve = myPaddedValue.get(RESERVE_OFFSET); |
116 | 3 | final long release = myPaddedValue.get(RELEASE_OFFSET); |
117 | ||
118 | 3 | return (int) (release - reserve); |
119 | } | |
120 | ||
121 | /** | |
122 | * Returns true if the sequence is idle (reserve == release). | |
123 | * | |
124 | * @return True if the sequence is idle. | |
125 | */ | |
126 | public boolean isIdle() { | |
127 | 20 | final long reserve = myPaddedValue.get(RESERVE_OFFSET); |
128 | 20 | final long release = myPaddedValue.get(RELEASE_OFFSET); |
129 | 20 | return (reserve == release); |
130 | } | |
131 | ||
132 | /** | |
133 | * Checks if there is a waiter for the sequence to be released. | |
134 | * | |
135 | * @param expectedReserve | |
136 | * The expected value for the reserve if there is no waiter. | |
137 | * @return True if there is a waiter (e.g., the reserve has advanced). | |
138 | */ | |
139 | public boolean noWaiter(final long expectedReserve) { | |
140 | 297 | final long reserve = myPaddedValue.get(RESERVE_OFFSET); |
141 | ||
142 | 297 | return (reserve == expectedReserve); |
143 | } | |
144 | ||
145 | /** | |
146 | * Release the position in the sequence. | |
147 | * | |
148 | * @param expectedValue | |
149 | * The expected/reserved value for the sequence. | |
150 | * @param newValue | |
151 | * The new value for the sequence. | |
152 | */ | |
153 | public void release(final long expectedValue, final long newValue) { | |
154 | 3669765 | while (!compareAndSetRelease(expectedValue, newValue)) { |
155 | // Let another thread make progress - should not really spin if we | |
156 | // did a waitFor. | |
157 | 4121865 | Thread.yield(); |
158 | } | |
159 | 21796 | notifyWaiters(); |
160 | 21792 | } |
161 | ||
162 | /** | |
163 | * Reserves a spot in the sequence for the messages to be sent. | |
164 | * | |
165 | * @param numberOfMessages | |
166 | * The number of messages to be sent. | |
167 | * @return The current value of the sequence. | |
168 | */ | |
169 | public long reserve(final long numberOfMessages) { | |
170 | long current; | |
171 | long next; | |
172 | ||
173 | do { | |
174 | 21807 | current = myPaddedValue.get(RESERVE_OFFSET); |
175 | 21808 | next = current + numberOfMessages; |
176 | } | |
177 | 21808 | while (!compareAndSetReserve(current, next)); |
178 | ||
179 | 21797 | return current; |
180 | } | |
181 | ||
182 | /** | |
183 | * Waits for the reserved sequence to be released. | |
184 | * | |
185 | * @param wanted | |
186 | * The sequence to wait to be released. | |
187 | */ | |
188 | public void waitFor(final long wanted) { | |
189 | 21746 | long releaseValue = myPaddedValue.get(RELEASE_OFFSET); |
190 | 42615 | while (releaseValue != wanted) { |
191 | 20900 | if (myLockType == LockType.LOW_LATENCY_SPIN) { |
192 | 10390 | long now = System.nanoTime(); |
193 | 10391 | final long yeildDeadline = now + YIELD_TIME_NS; |
194 | ||
195 | 10391 | releaseValue = myPaddedValue.get(RELEASE_OFFSET); |
196 | 4693377 | while ((now < yeildDeadline) && (releaseValue != wanted)) { |
197 | // Let another thread make progress. | |
198 | 4774639 | Thread.yield(); |
199 | 4610042 | now = System.nanoTime(); |
200 | 4628857 | releaseValue = myPaddedValue.get(RELEASE_OFFSET); |
201 | } | |
202 | } | |
203 | ||
204 | // Block. | |
205 | 20900 | if (releaseValue != wanted) { |
206 | 20506 | final Long key = Long.valueOf(wanted); |
207 | 20501 | Condition localCondition = myCondition; |
208 | try { | |
209 | 20503 | final int waitCount = myWaiting.incrementAndGet(); |
210 | 20506 | myLock.lock(); |
211 | ||
212 | // Second tier try for FindBugs to see the unlock. | |
213 | try { | |
214 | // Check for more than 1 waiter. If so stand in line via | |
215 | // the waiters map. (This will wake threads in the order | |
216 | // they should be processed.) | |
217 | 20506 | if (waitCount > 1) { |
218 | 20501 | localCondition = myLock.newCondition(); |
219 | 20501 | myWaiters.put(key, localCondition); |
220 | } | |
221 | ||
222 | 20506 | releaseValue = myPaddedValue.get(RELEASE_OFFSET); |
223 | 40552 | while (releaseValue != wanted) { |
224 | 20046 | localCondition.awaitUninterruptibly(); |
225 | 20046 | releaseValue = myPaddedValue.get(RELEASE_OFFSET); |
226 | } | |
227 | } | |
228 | finally { | |
229 | 20506 | if (localCondition != myCondition) { |
230 | 20501 | myWaiters.remove(key); |
231 | } | |
232 | } | |
233 | } | |
234 | finally { | |
235 | 20506 | myLock.unlock(); |
236 | 20506 | myWaiting.decrementAndGet(); |
237 | 20506 | } |
238 | 20506 | } |
239 | } | |
240 | 21747 | } |
241 | ||
242 | /** | |
243 | * Perform a compare and set operation on the sequence release position. | |
244 | * | |
245 | * @param expectedValue | |
246 | * The expected current value. | |
247 | * @param newValue | |
248 | * The value to update to. | |
249 | * @return true if the operation succeeds, false otherwise. | |
250 | */ | |
251 | private boolean compareAndSetRelease(final long expectedValue, | |
252 | final long newValue) { | |
253 | 3752329 | return myPaddedValue.compareAndSet(RELEASE_OFFSET, expectedValue, |
254 | newValue); | |
255 | } | |
256 | ||
257 | /** | |
258 | * Perform a compare and set operation on the sequence reserve position. | |
259 | * | |
260 | * @param expectedValue | |
261 | * The expected current value. | |
262 | * @param newValue | |
263 | * The value to update to. | |
264 | * @return true if the operation succeeds, false otherwise. | |
265 | */ | |
266 | private boolean compareAndSetReserve(final long expectedValue, | |
267 | final long newValue) { | |
268 | 21808 | return myPaddedValue.compareAndSet(RESERVE_OFFSET, expectedValue, |
269 | newValue); | |
270 | } | |
271 | ||
272 | /** | |
273 | * Notifies the waiting threads that the state of the sequence has changed. | |
274 | */ | |
275 | private void notifyWaiters() { | |
276 | 21793 | if (myWaiting.get() > 0) { |
277 | try { | |
278 | 20543 | myLock.lock(); |
279 | ||
280 | // Wake the reused condition. | |
281 | 20544 | myCondition.signalAll(); |
282 | ||
283 | // Wake up the condition with the lowest wanted. | |
284 | // No Thundering Herd! | |
285 | 20544 | if (!myWaiters.isEmpty()) { |
286 | 20073 | myWaiters.get(myWaiters.firstKey()).signalAll(); |
287 | } | |
288 | } | |
289 | finally { | |
290 | 20544 | myLock.unlock(); |
291 | 20544 | } |
292 | } | |
293 | 21795 | } |
294 | } |