View Javadoc
1   /*
2    * #%L
3    * Sequence.java - mongodb-async-driver - Allanbank Consulting, Inc.
4    * %%
5    * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6    * %%
7    * Licensed under the Apache License, Version 2.0 (the "License");
8    * you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   * 
11   *      http://www.apache.org/licenses/LICENSE-2.0
12   * 
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   * #L%
19   */
20  
21  package com.allanbank.mongodb.client.connection.socket;
22  
23  import java.util.SortedMap;
24  import java.util.TreeMap;
25  import java.util.concurrent.atomic.AtomicInteger;
26  import java.util.concurrent.atomic.AtomicLongArray;
27  import java.util.concurrent.locks.Condition;
28  import java.util.concurrent.locks.Lock;
29  import java.util.concurrent.locks.ReentrantLock;
30  
31  import com.allanbank.mongodb.LockType;
32  import com.allanbank.mongodb.client.message.PendingMessageQueue;
33  
34  /**
35   * Sequence provides the ability to synchronize the access to the socket's
36   * output stream. The thread starts by reserving a position via the reserve
37   * method and then prepares to send the messages. It will then wait for its turn
38   * to send and finally release the sequence.
39   * <p>
40   * We use an array of longs to avoid false sharing.
41   * </p>
42   * 
43   * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
44   *         mutated in incompatible ways between any two releases of the driver.
45   * @copyright 2013, Allanbank Consulting, Inc., All Rights Reserved
46   */
47  /* package */class Sequence {
48  
49      /** The offset of the release value. */
50      private static final int RELEASE_OFFSET = 15 + 7;
51  
52      /** The offset of the reserve value. */
53      private static final int RESERVE_OFFSET = 7;
54  
55      /** Amount of time to spin/yield before waiting. Set to 1/2 millisecond. */
56      private static final long YIELD_TIME_NS = PendingMessageQueue.YIELD_TIME_NS;
57  
58      /** The condition used when there are waiters. */
59      private final Condition myCondition;
60  
61      /** The mutex used with the sequence. */
62      private final Lock myLock;
63  
64      /** The lock type to use with the sequence. */
65      private final LockType myLockType;
66  
67      /** The atomic array of long values. */
68      private final AtomicLongArray myPaddedValue = new AtomicLongArray(30);
69  
70      /**
71       * The map of waiters and the condition they are waiting on to avoid the
72       * thundering herd. Only access while holding the {@link #myLock lock}.
73       */
74      private final SortedMap<Long, Condition> myWaiters;
75  
76      /** Tracks how many threads are waiting for a message or a space to open. */
77      private final AtomicInteger myWaiting;
78  
79      /**
80       * Create a sequence with a specified initial value.
81       * 
82       * @param initialValue
83       *            The initial value for this sequence.
84       */
85      public Sequence(final long initialValue) {
86          this(initialValue, LockType.MUTEX);
87      }
88  
89      /**
90       * Create a sequence with a specified initial value.
91       * 
92       * @param initialValue
93       *            The initial value for this sequence.
94       * @param lockType
95       *            The lock type to use with the sequence.
96       */
97      public Sequence(final long initialValue, final LockType lockType) {
98          myPaddedValue.set(RESERVE_OFFSET, initialValue);
99          myPaddedValue.set(RELEASE_OFFSET, initialValue);
100 
101         myLockType = lockType;
102 
103         myLock = new ReentrantLock(true);
104         myCondition = myLock.newCondition();
105         myWaiting = new AtomicInteger(0);
106         myWaiters = new TreeMap<Long, Condition>();
107     }
108 
109     /**
110      * Returns an estimate of the number of waiters.
111      * 
112      * @return The waiters.
113      */
114     public int getWaitersCount() {
115         final long reserve = myPaddedValue.get(RESERVE_OFFSET);
116         final long release = myPaddedValue.get(RELEASE_OFFSET);
117 
118         return (int) (release - reserve);
119     }
120 
121     /**
122      * Returns true if the sequence is idle (reserve == release).
123      * 
124      * @return True if the sequence is idle.
125      */
126     public boolean isIdle() {
127         final long reserve = myPaddedValue.get(RESERVE_OFFSET);
128         final long release = myPaddedValue.get(RELEASE_OFFSET);
129         return (reserve == release);
130     }
131 
132     /**
133      * Checks if there is a waiter for the sequence to be released.
134      * 
135      * @param expectedReserve
136      *            The expected value for the reserve if there is no waiter.
137      * @return True if there is a waiter (e.g., the reserve has advanced).
138      */
139     public boolean noWaiter(final long expectedReserve) {
140         final long reserve = myPaddedValue.get(RESERVE_OFFSET);
141 
142         return (reserve == expectedReserve);
143     }
144 
145     /**
146      * Release the position in the sequence.
147      * 
148      * @param expectedValue
149      *            The expected/reserved value for the sequence.
150      * @param newValue
151      *            The new value for the sequence.
152      */
153     public void release(final long expectedValue, final long newValue) {
154         while (!compareAndSetRelease(expectedValue, newValue)) {
155             // Let another thread make progress - should not really spin if we
156             // did a waitFor.
157             Thread.yield();
158         }
159         notifyWaiters();
160     }
161 
162     /**
163      * Reserves a spot in the sequence for the messages to be sent.
164      * 
165      * @param numberOfMessages
166      *            The number of messages to be sent.
167      * @return The current value of the sequence.
168      */
169     public long reserve(final long numberOfMessages) {
170         long current;
171         long next;
172 
173         do {
174             current = myPaddedValue.get(RESERVE_OFFSET);
175             next = current + numberOfMessages;
176         }
177         while (!compareAndSetReserve(current, next));
178 
179         return current;
180     }
181 
182     /**
183      * Waits for the reserved sequence to be released.
184      * 
185      * @param wanted
186      *            The sequence to wait to be released.
187      */
188     public void waitFor(final long wanted) {
189         long releaseValue = myPaddedValue.get(RELEASE_OFFSET);
190         while (releaseValue != wanted) {
191             if (myLockType == LockType.LOW_LATENCY_SPIN) {
192                 long now = System.nanoTime();
193                 final long yeildDeadline = now + YIELD_TIME_NS;
194 
195                 releaseValue = myPaddedValue.get(RELEASE_OFFSET);
196                 while ((now < yeildDeadline) && (releaseValue != wanted)) {
197                     // Let another thread make progress.
198                     Thread.yield();
199                     now = System.nanoTime();
200                     releaseValue = myPaddedValue.get(RELEASE_OFFSET);
201                 }
202             }
203 
204             // Block.
205             if (releaseValue != wanted) {
206                 final Long key = Long.valueOf(wanted);
207                 Condition localCondition = myCondition;
208                 try {
209                     final int waitCount = myWaiting.incrementAndGet();
210                     myLock.lock();
211 
212                     // Second tier try for FindBugs to see the unlock.
213                     try {
214                         // Check for more than 1 waiter. If so stand in line via
215                         // the waiters map. (This will wake threads in the order
216                         // they should be processed.)
217                         if (waitCount > 1) {
218                             localCondition = myLock.newCondition();
219                             myWaiters.put(key, localCondition);
220                         }
221 
222                         releaseValue = myPaddedValue.get(RELEASE_OFFSET);
223                         while (releaseValue != wanted) {
224                             localCondition.awaitUninterruptibly();
225                             releaseValue = myPaddedValue.get(RELEASE_OFFSET);
226                         }
227                     }
228                     finally {
229                         if (localCondition != myCondition) {
230                             myWaiters.remove(key);
231                         }
232                     }
233                 }
234                 finally {
235                     myLock.unlock();
236                     myWaiting.decrementAndGet();
237                 }
238             }
239         }
240     }
241 
242     /**
243      * Perform a compare and set operation on the sequence release position.
244      * 
245      * @param expectedValue
246      *            The expected current value.
247      * @param newValue
248      *            The value to update to.
249      * @return true if the operation succeeds, false otherwise.
250      */
251     private boolean compareAndSetRelease(final long expectedValue,
252             final long newValue) {
253         return myPaddedValue.compareAndSet(RELEASE_OFFSET, expectedValue,
254                 newValue);
255     }
256 
257     /**
258      * Perform a compare and set operation on the sequence reserve position.
259      * 
260      * @param expectedValue
261      *            The expected current value.
262      * @param newValue
263      *            The value to update to.
264      * @return true if the operation succeeds, false otherwise.
265      */
266     private boolean compareAndSetReserve(final long expectedValue,
267             final long newValue) {
268         return myPaddedValue.compareAndSet(RESERVE_OFFSET, expectedValue,
269                 newValue);
270     }
271 
272     /**
273      * Notifies the waiting threads that the state of the sequence has changed.
274      */
275     private void notifyWaiters() {
276         if (myWaiting.get() > 0) {
277             try {
278                 myLock.lock();
279 
280                 // Wake the reused condition.
281                 myCondition.signalAll();
282 
283                 // Wake up the condition with the lowest wanted.
284                 // No Thundering Herd!
285                 if (!myWaiters.isEmpty()) {
286                     myWaiters.get(myWaiters.firstKey()).signalAll();
287                 }
288             }
289             finally {
290                 myLock.unlock();
291             }
292         }
293     }
294 }