1 /*
2 * #%L
3 * Sequence.java - mongodb-async-driver - Allanbank Consulting, Inc.
4 * %%
5 * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6 * %%
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * #L%
19 */
20
21 package com.allanbank.mongodb.client.connection.socket;
22
23 import java.util.SortedMap;
24 import java.util.TreeMap;
25 import java.util.concurrent.atomic.AtomicInteger;
26 import java.util.concurrent.atomic.AtomicLongArray;
27 import java.util.concurrent.locks.Condition;
28 import java.util.concurrent.locks.Lock;
29 import java.util.concurrent.locks.ReentrantLock;
30
31 import com.allanbank.mongodb.LockType;
32 import com.allanbank.mongodb.client.message.PendingMessageQueue;
33
34 /**
35 * Sequence provides the ability to synchronize the access to the socket's
36 * output stream. The thread starts by reserving a position via the reserve
37 * method and then prepares to send the messages. It will then wait for its turn
38 * to send and finally release the sequence.
39 * <p>
40 * We use an array of longs to avoid false sharing.
41 * </p>
42 *
43 * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
44 * mutated in incompatible ways between any two releases of the driver.
45 * @copyright 2013, Allanbank Consulting, Inc., All Rights Reserved
46 */
47 /* package */class Sequence {
48
49 /** The offset of the release value. */
50 private static final int RELEASE_OFFSET = 15 + 7;
51
52 /** The offset of the reserve value. */
53 private static final int RESERVE_OFFSET = 7;
54
55 /** Amount of time to spin/yield before waiting. Set to 1/2 millisecond. */
56 private static final long YIELD_TIME_NS = PendingMessageQueue.YIELD_TIME_NS;
57
58 /** The condition used when there are waiters. */
59 private final Condition myCondition;
60
61 /** The mutex used with the sequence. */
62 private final Lock myLock;
63
64 /** The lock type to use with the sequence. */
65 private final LockType myLockType;
66
67 /** The atomic array of long values. */
68 private final AtomicLongArray myPaddedValue = new AtomicLongArray(30);
69
70 /**
71 * The map of waiters and the condition they are waiting on to avoid the
72 * thundering herd. Only access while holding the {@link #myLock lock}.
73 */
74 private final SortedMap<Long, Condition> myWaiters;
75
76 /** Tracks how many threads are waiting for a message or a space to open. */
77 private final AtomicInteger myWaiting;
78
79 /**
80 * Create a sequence with a specified initial value.
81 *
82 * @param initialValue
83 * The initial value for this sequence.
84 */
85 public Sequence(final long initialValue) {
86 this(initialValue, LockType.MUTEX);
87 }
88
89 /**
90 * Create a sequence with a specified initial value.
91 *
92 * @param initialValue
93 * The initial value for this sequence.
94 * @param lockType
95 * The lock type to use with the sequence.
96 */
97 public Sequence(final long initialValue, final LockType lockType) {
98 myPaddedValue.set(RESERVE_OFFSET, initialValue);
99 myPaddedValue.set(RELEASE_OFFSET, initialValue);
100
101 myLockType = lockType;
102
103 myLock = new ReentrantLock(true);
104 myCondition = myLock.newCondition();
105 myWaiting = new AtomicInteger(0);
106 myWaiters = new TreeMap<Long, Condition>();
107 }
108
109 /**
110 * Returns an estimate of the number of waiters.
111 *
112 * @return The waiters.
113 */
114 public int getWaitersCount() {
115 final long reserve = myPaddedValue.get(RESERVE_OFFSET);
116 final long release = myPaddedValue.get(RELEASE_OFFSET);
117
118 return (int) (release - reserve);
119 }
120
121 /**
122 * Returns true if the sequence is idle (reserve == release).
123 *
124 * @return True if the sequence is idle.
125 */
126 public boolean isIdle() {
127 final long reserve = myPaddedValue.get(RESERVE_OFFSET);
128 final long release = myPaddedValue.get(RELEASE_OFFSET);
129 return (reserve == release);
130 }
131
132 /**
133 * Checks if there is a waiter for the sequence to be released.
134 *
135 * @param expectedReserve
136 * The expected value for the reserve if there is no waiter.
137 * @return True if there is a waiter (e.g., the reserve has advanced).
138 */
139 public boolean noWaiter(final long expectedReserve) {
140 final long reserve = myPaddedValue.get(RESERVE_OFFSET);
141
142 return (reserve == expectedReserve);
143 }
144
145 /**
146 * Release the position in the sequence.
147 *
148 * @param expectedValue
149 * The expected/reserved value for the sequence.
150 * @param newValue
151 * The new value for the sequence.
152 */
153 public void release(final long expectedValue, final long newValue) {
154 while (!compareAndSetRelease(expectedValue, newValue)) {
155 // Let another thread make progress - should not really spin if we
156 // did a waitFor.
157 Thread.yield();
158 }
159 notifyWaiters();
160 }
161
162 /**
163 * Reserves a spot in the sequence for the messages to be sent.
164 *
165 * @param numberOfMessages
166 * The number of messages to be sent.
167 * @return The current value of the sequence.
168 */
169 public long reserve(final long numberOfMessages) {
170 long current;
171 long next;
172
173 do {
174 current = myPaddedValue.get(RESERVE_OFFSET);
175 next = current + numberOfMessages;
176 }
177 while (!compareAndSetReserve(current, next));
178
179 return current;
180 }
181
182 /**
183 * Waits for the reserved sequence to be released.
184 *
185 * @param wanted
186 * The sequence to wait to be released.
187 */
188 public void waitFor(final long wanted) {
189 long releaseValue = myPaddedValue.get(RELEASE_OFFSET);
190 while (releaseValue != wanted) {
191 if (myLockType == LockType.LOW_LATENCY_SPIN) {
192 long now = System.nanoTime();
193 final long yeildDeadline = now + YIELD_TIME_NS;
194
195 releaseValue = myPaddedValue.get(RELEASE_OFFSET);
196 while ((now < yeildDeadline) && (releaseValue != wanted)) {
197 // Let another thread make progress.
198 Thread.yield();
199 now = System.nanoTime();
200 releaseValue = myPaddedValue.get(RELEASE_OFFSET);
201 }
202 }
203
204 // Block.
205 if (releaseValue != wanted) {
206 final Long key = Long.valueOf(wanted);
207 Condition localCondition = myCondition;
208 try {
209 final int waitCount = myWaiting.incrementAndGet();
210 myLock.lock();
211
212 // Second tier try for FindBugs to see the unlock.
213 try {
214 // Check for more than 1 waiter. If so stand in line via
215 // the waiters map. (This will wake threads in the order
216 // they should be processed.)
217 if (waitCount > 1) {
218 localCondition = myLock.newCondition();
219 myWaiters.put(key, localCondition);
220 }
221
222 releaseValue = myPaddedValue.get(RELEASE_OFFSET);
223 while (releaseValue != wanted) {
224 localCondition.awaitUninterruptibly();
225 releaseValue = myPaddedValue.get(RELEASE_OFFSET);
226 }
227 }
228 finally {
229 if (localCondition != myCondition) {
230 myWaiters.remove(key);
231 }
232 }
233 }
234 finally {
235 myLock.unlock();
236 myWaiting.decrementAndGet();
237 }
238 }
239 }
240 }
241
242 /**
243 * Perform a compare and set operation on the sequence release position.
244 *
245 * @param expectedValue
246 * The expected current value.
247 * @param newValue
248 * The value to update to.
249 * @return true if the operation succeeds, false otherwise.
250 */
251 private boolean compareAndSetRelease(final long expectedValue,
252 final long newValue) {
253 return myPaddedValue.compareAndSet(RELEASE_OFFSET, expectedValue,
254 newValue);
255 }
256
257 /**
258 * Perform a compare and set operation on the sequence reserve position.
259 *
260 * @param expectedValue
261 * The expected current value.
262 * @param newValue
263 * The value to update to.
264 * @return true if the operation succeeds, false otherwise.
265 */
266 private boolean compareAndSetReserve(final long expectedValue,
267 final long newValue) {
268 return myPaddedValue.compareAndSet(RESERVE_OFFSET, expectedValue,
269 newValue);
270 }
271
272 /**
273 * Notifies the waiting threads that the state of the sequence has changed.
274 */
275 private void notifyWaiters() {
276 if (myWaiting.get() > 0) {
277 try {
278 myLock.lock();
279
280 // Wake the reused condition.
281 myCondition.signalAll();
282
283 // Wake up the condition with the lowest wanted.
284 // No Thundering Herd!
285 if (!myWaiters.isEmpty()) {
286 myWaiters.get(myWaiters.firstKey()).signalAll();
287 }
288 }
289 finally {
290 myLock.unlock();
291 }
292 }
293 }
294 }