View Javadoc
1   /*
2    * #%L
3    * PendingMessageQueue.java - mongodb-async-driver - Allanbank Consulting, Inc.
4    * %%
5    * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6    * %%
7    * Licensed under the Apache License, Version 2.0 (the "License");
8    * you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   * 
11   *      http://www.apache.org/licenses/LICENSE-2.0
12   * 
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   * #L%
19   */
20  package com.allanbank.mongodb.client.message;
21  
22  import java.util.List;
23  import java.util.concurrent.TimeUnit;
24  import java.util.concurrent.atomic.AtomicInteger;
25  import java.util.concurrent.locks.Condition;
26  import java.util.concurrent.locks.Lock;
27  import java.util.concurrent.locks.ReentrantLock;
28  
29  import com.allanbank.mongodb.LockType;
30  import com.allanbank.mongodb.client.Message;
31  import com.allanbank.mongodb.client.callback.ReplyCallback;
32  
33  /**
34   * PendingMessageQueue provides an optimized queue for pending messages inspired
35   * by the Disruptor project.
36   * <p>
37   * To reduce thread contention the queue uses a set of integer values to track
38   * the position of the ready messages (the last message that is ready to be
39   * read), reserve (the first message that can be reserved to be written to), and
40   * the take (the next (first) message to be read). For an infinite queue the
41   * following invariant holds: <blockquote>
42   * 
43   * <pre>
44   * <code>
45   * take &lt; readyBefore &lt;= reserve
46   * </code>
47   * </pre>
48   * 
49   * </blockquote> To make handling a limited size queue easier the size of the
50   * queue is forced to a power of 2 no larger than {@value #MAX_SIZE}. The roll
51   * over can then be handled with a simple mask operation.
52   * </p>
53   * <p>
54   * Rather than allocate a pending message per request we use an array of
55   * pre-allocated PendingMessages and copy the data into and out of the objects.
56   * This has a net positive effect on object allocation and garbage collection
57   * time at the cost of a longer initialization.
58   * </p>
59   * <p>
60   * Lastly, This queue assumes there is a single consumer of messages. This is
61   * true for the driver's use case but don't copy the code and expect it to work
62   * with multiple consumers. The consumer should use the following basic
63   * structure: <blockquote>
64   * 
65   * <pre>
66   * <code>
67   * PendingMessage pm = new {@link PendingMessage}();
68   * 
69   * queue.take(pm); // Blocks.
70   * // Handle the message.
71   * 
72   * // or
73   * 
74   * if( queue.poll(pm) ) { // Non-blocking.
75   *    // Handle The Message.
76   * }
77   * </code>
78   * </pre>
79   * 
80   * </blockquote>
81   * </p>
82   * <p>
83   * <b>Warning: </b> This class has been carefully tuned for the driver's use
84   * case. Changes should be carefully bench marked and tested. Comments have been
85   * embedded in the source indicating attempted changes and reverts. Due to its
86   * position in the driver subtle changes in this class can cause large changes
87   * in the performance of the driver.
88   * </p>
89   * 
90   * @see <a href="http://code.google.com/p/disruptor/">Disruptor Project</a>
91   * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
92   *         mutated in incompatible ways between any two releases of the driver.
93   * @copyright 2012-2014, Allanbank Consulting, Inc., All Rights Reserved
94   */
95  public final class PendingMessageQueue {
96  
    /**
     * The mask for constraining the size of the message id. Applied to the
     * computed id before one is added to it, so ids start at 1.
     */
    public static final long MAX_MESSAGE_ID_MASK = 0x0FFFFFFF;

    /**
     * The maximum size of the queue. This is currently 2^20 but must be at most
     * 2^30 to ensure masking works.
     */
    public static final int MAX_SIZE = (1 << 20);

    /** Amount of time to spin before yielding. Set to 1/100 of a millisecond. */
    public static final long SPIN_TIME_NS = TimeUnit.MILLISECONDS.toNanos(1) / 100;

    /** Amount of time to spin/yield before waiting. Set to 1/2 millisecond. */
    public static final long YIELD_TIME_NS = TimeUnit.MILLISECONDS.toNanos(1) >> 1;

    /**
     * Number of times to spin (poll attempts) before trying something
     * different, e.g., checking the clock.
     */
    private static final int SPIN_ITERATIONS = 10000;

    /** The condition used with the queue being full or empty. */
    private final Condition myCondition;

    /** The mutex used with the queue. Guards {@link #myCondition}. */
    private final Lock myLock;

    /**
     * The lock type to use with the queue. Only consulted by the blocking take
     * to decide if it should spin before blocking.
     */
    private final LockType myLockType;

    /**
     * Tracks how many times we have looped through the ring buffer. Used as the
     * high order part of the generated message ids.
     */
    private final AtomicInteger myLooped;

    /** The mask being used to roll positions over: the queue length minus one. */
    private final int myMask;

    /** The queue of pending messages. All entries are pre-allocated. */
    private final PendingMessage[] myQueue;

    /**
     * The position just after the last message that is ready to be taken,
     * i.e., all messages before this position are ready.
     * <p>
     * When ({@link #myReadyBeforePosition} == {@link #myTakePosition}) the
     * queue is empty.
     * </p>
     */
    private final AtomicInteger myReadyBeforePosition;

    /**
     * The position of the next message that can be reserved.
     * <p>
     * When ({@link #myReservePosition} == ({@link #myTakePosition} - 1)) the
     * queue is full.
     * </p>
     */
    private final AtomicInteger myReservePosition;

    /**
     * The position of the next message that can be taken. Starts at -1 and is
     * pulled to zero once the message in the first slot has been marked ready.
     * <p>
     * When ({@link #myReservePosition} == ({@link #myTakePosition} - 1)) the
     * queue is full.
     * </p>
     * <p>
     * When ({@link #myReadyBeforePosition} == {@link #myTakePosition}) the
     * queue is empty.
     * </p>
     */
    private volatile int myTakePosition;

    /** Tracks how many threads are waiting for a message or a space to open. */
    private final AtomicInteger myWaiting;
166 
167     /**
168      * Creates a new PendingMessageQueue.
169      * 
170      * @param size
171      *            The size of the queue to create.
172      * @param lockType
173      *            The lock type to use with the queue.
174      */
175     public PendingMessageQueue(final int size, final LockType lockType) {
176         int power = size;
177         if (MAX_SIZE < size) {
178             power = MAX_SIZE;
179         }
180         else if (Integer.bitCount(size) != 1) {
181             // Find the next larger power of 2.
182             power = 1;
183             while ((power < size) && (power != 0)) {
184                 power <<= 1;
185             }
186         }
187 
188         myLockType = lockType;
189         myQueue = new PendingMessage[power];
190         for (int i = 0; i < myQueue.length; ++i) {
191             myQueue[i] = new PendingMessage(0, null);
192         }
193         myMask = (power - 1);
194 
195         myLooped = new AtomicInteger(0);
196         myTakePosition = -1;
197         myReadyBeforePosition = new AtomicInteger(0);
198         myReservePosition = new AtomicInteger(0);
199         myWaiting = new AtomicInteger(0);
200 
201         myLock = new ReentrantLock();
202         myCondition = myLock.newCondition();
203     }
204 
205     /**
206      * Returns the size of the queue.
207      * 
208      * @return The size of the queue.
209      */
210     public int capacity() {
211         return myQueue.length - 1;
212     }
213 
214     /**
215      * Drains the list of pending messages into the provided list.
216      * 
217      * @param pending
218      *            The list to add all of the pending messages to.
219      */
220     public void drainTo(final List<PendingMessage> pending) {
221         PendingMessage pm = new PendingMessage();
222         while (poll(pm)) {
223             pending.add(pm);
224             pm = new PendingMessage();
225         }
226     }
227 
228     /**
229      * Returns true if the queue is empty. e.g., the next take position is the
230      * read before position.
231      * 
232      * @return If the queue is empty.
233      */
234     public boolean isEmpty() {
235         final int take = myTakePosition;
236         final int readyBefore = myReadyBeforePosition.get();
237 
238         return (readyBefore == take) || (take < 0);
239     }
240 
241     /**
242      * Puts a message onto the queue. This method will not block waiting for a
243      * space to add the message.
244      * 
245      * @param message
246      *            The message to add.
247      * @param replyCallback
248      *            The callback for the message to add.
249      * @return True if the message was added, false otherwise.
250      */
251     public boolean offer(final Message message,
252             final ReplyCallback replyCallback) {
253 
254         final int loop = myLooped.get();
255         final int reserve = offer();
256         if (reserve < 0) {
257             return false;
258         }
259 
260         final int messageid = toMessageId(loop, reserve);
261 
262         myQueue[reserve].set(messageid, message, replyCallback);
263 
264         markReady(reserve);
265 
266         return true;
267     }
268 
269     /**
270      * Puts a message onto the queue. This method will not block waiting for a
271      * space to add the message.
272      * 
273      * @param pendingMessage
274      *            The message to add.
275      * @return True if the message was added, false otherwise.
276      */
277     public boolean offer(final PendingMessage pendingMessage) {
278         final int reserve = offer();
279         if (reserve < 0) {
280             return false;
281         }
282 
283         myQueue[reserve].set(pendingMessage);
284 
285         markReady(reserve);
286 
287         return true;
288     }
289 
290     /**
291      * Returns the next message from the queue without blocking. <blockquote>
292      * 
293      * <pre>
294      * <code>
295      * PendingMessage pm = new PendingMessage();
296      * if( queue.poll(pm) } {
297      *    // Handle the message copied into pm.
298      * }
299      * </code>
300      * </pre>
301      * 
302      * </blockquote>
303      * 
304      * @param copyOut
305      *            The {@link PendingMessage} to copy the pending message into.
306      * @return True if the pending message was updated.
307      */
308     public boolean poll(final PendingMessage copyOut) {
309         boolean result = false;
310         final int take = myTakePosition;
311         if ((myReadyBeforePosition.get() != take) && (take >= 0)) { // Empty,
312             // Not
313             // started?
314             copyOut.set(myQueue[take]);
315             myQueue[take].clear();
316             result = true;
317 
318             myTakePosition = increment(take);
319             notifyWaiters(false);
320         }
321 
322         return result;
323     }
324 
325     /**
326      * Puts a message onto the queue. This method will block waiting for a space
327      * to add the message.
328      * 
329      * @param message
330      *            The message to add.
331      * @param replyCallback
332      *            The callback for the message to add.
333      * 
334      * @throws InterruptedException
335      *             If the thread is interrupted while waiting for the message.
336      *             If thrown the message will not have been enqueued.
337      */
338     public void put(final Message message, final ReplyCallback replyCallback)
339             throws InterruptedException {
340 
341         int loop = myLooped.get();
342         int reserve = offer();
343         if (reserve < 0) {
344 
345             // Spinning here appears to slow things down.
346 
347             // Block.
348             try {
349                 myWaiting.incrementAndGet();
350                 myLock.lock();
351 
352                 loop = myLooped.get();
353                 reserve = offer();
354                 while (reserve < 0) {
355                     myCondition.await();
356                     loop = myLooped.get();
357                     reserve = offer();
358                 }
359             }
360             finally {
361                 myLock.unlock();
362                 myWaiting.decrementAndGet();
363             }
364         }
365 
366         final int messageid = toMessageId(loop, reserve);
367 
368         myQueue[reserve].set(messageid, message, replyCallback);
369 
370         markReady(reserve);
371     }
372 
373     /**
374      * Puts two messages onto the queue. This method will block waiting for a
375      * space to add the messages but ensures the messages are in sequence in the
376      * queue.
377      * 
378      * @param message
379      *            The first message to add.
380      * @param replyCallback
381      *            The callback for the first message to add.
382      * @param message2
383      *            The second message to add.
384      * @param replyCallback2
385      *            The callback for the second message to add.
386      * 
387      * @throws InterruptedException
388      *             If the thread is interrupted while waiting for the message.
389      *             If thrown neither message will have been enqueued.
390      */
391     public void put(final Message message, final ReplyCallback replyCallback,
392             final Message message2, final ReplyCallback replyCallback2)
393             throws InterruptedException {
394         int loop = myLooped.get();
395         int reserve = offer2();
396         if (reserve < 0) {
397 
398             // Spinning here appears to slow things down.
399             try {
400                 myWaiting.incrementAndGet();
401                 myLock.lock();
402 
403                 loop = myLooped.get();
404                 reserve = offer2();
405                 while (reserve < 0) {
406                     myCondition.await();
407                     loop = myLooped.get();
408                     reserve = offer2();
409                 }
410             }
411             finally {
412                 myLock.unlock();
413                 myWaiting.decrementAndGet();
414             }
415         }
416 
417         // Use reserve + 1 for the second message id since it may have looped
418         // and then the math does not work out causing messageId2 to be lower
419         // than
420         // messageId1, which is bad.
421         final int messageId1 = toMessageId(loop, reserve);
422         final int messageId2 = toMessageId(loop, reserve + 1);
423 
424         final int second = increment(reserve);
425         myQueue[reserve].set(messageId1, message, replyCallback);
426         myQueue[second].set(messageId2, message2, replyCallback2);
427 
428         markReady2(reserve);
429     }
430 
431     /**
432      * Puts a message onto the queue. This method will block waiting for a space
433      * to add the message.
434      * 
435      * @param pendingMessage
436      *            The message to add.
437      * 
438      * @throws InterruptedException
439      *             If the thread is interrupted while waiting for the message.
440      *             If thrown the message will not have been enqueued.
441      */
442     public void put(final PendingMessage pendingMessage)
443             throws InterruptedException {
444         int reserve = offer();
445         if (reserve < 0) {
446 
447             // Spinning here appears to slow things down.
448 
449             try {
450                 myWaiting.incrementAndGet();
451                 myLock.lock();
452 
453                 reserve = offer();
454                 while (reserve < 0) {
455                     myCondition.await();
456                     reserve = offer();
457                 }
458             }
459             finally {
460                 myLock.unlock();
461                 myWaiting.decrementAndGet();
462             }
463         }
464 
465         myQueue[reserve].set(pendingMessage);
466 
467         markReady(reserve);
468     }
469 
470     /**
471      * Returns the number of messages in the queue.
472      * 
473      * @return The number of messages in the queue.
474      */
475     public int size() {
476         final int take = myTakePosition;
477         final int ready = myReadyBeforePosition.get();
478 
479         if (take < 0) {
480             return 0;
481         }
482         else if (take <= ready) {
483             return (ready - take);
484         }
485 
486         return (myQueue.length - take) + ready;
487     }
488 
489     /**
490      * Returns the next message from the queue and will block waiting for a
491      * message.
492      * 
493      * @param copyOut
494      *            The {@link PendingMessage} to copy the pending message into.
495      * @throws InterruptedException
496      *             If the thread is interrupted while waiting for the message.
497      */
498     public void take(final PendingMessage copyOut) throws InterruptedException {
499         if (!poll(copyOut)) {
500 
501             // Spin/yeild loop.
502             if (myLockType == LockType.LOW_LATENCY_SPIN) {
503                 long now = 0;
504                 long spinDeadline = 1;
505                 long yeildDeadline = 1;
506                 while (now < yeildDeadline) {
507                     for (int i = 0; i < SPIN_ITERATIONS; ++i) {
508                         if (poll(copyOut)) {
509                             return;
510                         }
511                     }
512 
513                     // Pause?
514                     now = System.nanoTime();
515                     if (spinDeadline == 1) {
516                         spinDeadline = now + SPIN_TIME_NS;
517                         yeildDeadline = now + YIELD_TIME_NS;
518                         // First time free pass.
519                     }
520                     else {
521                         if ((spinDeadline < now) && (now < yeildDeadline)) {
522                             Thread.yield();
523                         }
524                     }
525                 }
526             }
527 
528             // Block.
529             try {
530                 myWaiting.incrementAndGet();
531                 myLock.lock();
532 
533                 while (!poll(copyOut)) {
534                     myCondition.await();
535                 }
536             }
537             finally {
538                 myLock.unlock();
539                 myWaiting.decrementAndGet();
540             }
541         }
542     }
543 
544     /**
545      * Increments the index handling roll-over.
546      * 
547      * @param index
548      *            The value to increment.
549      * @return The incremented value.
550      */
551     protected int increment(final int index) {
552         return ((index + 1) & myMask);
553     }
554 
555     /**
556      * Marks the position as ready by incrementing the ready position to the
557      * provided position. This method uses a spin lock assuming any other
558      * threads will increment the ready position quickly to the position just
559      * before {@code index}.
560      * 
561      * @param index
562      *            The index of the ready message.
563      */
564     protected void markReady(final int index) {
565         final int after = increment(index);
566 
567         while (!myReadyBeforePosition.compareAndSet(index, after)) {
568             // Spinning here slows things down because we know that the other
569             // thread should be runnable. Always Yield.
570             Thread.yield();
571         }
572 
573         // Pull take position into the queue.
574         if ((index == 0) && (myTakePosition == -1)) {
575             myTakePosition = index;
576         }
577 
578         notifyWaiters(false);
579     }
580 
581     /**
582      * Marks the position and the next position as ready by incrementing the
583      * ready position to the provided position + 1. This method uses a spin lock
584      * assuming any other threads will increment the ready position quickly to
585      * the position just before {@code index}.
586      * 
587      * @param index
588      *            The index of the ready message.
589      */
590     protected void markReady2(final int index) {
591         final int after = increment(index);
592         final int twoAfter = increment(after);
593 
594         while (!myReadyBeforePosition.compareAndSet(index, twoAfter)) {
595             // Just keep swimming...
596             Thread.yield();
597         }
598 
599         // Pull take position into the queue.
600         if ((index == 0) && (myTakePosition == -1)) {
601             myTakePosition = index;
602         }
603 
604         // If someone is waiting let them know we created two messages.
605         notifyWaiters(true);
606     }
607 
608     /**
609      * Notifies the waiting threads that the state of the queue has changed.
610      * 
611      * @param all
612      *            If true then all threads will be woken. Otherwise only a
613      *            single thread is woken.
614      */
615     protected void notifyWaiters(final boolean all) {
616         if (myWaiting.get() > 0) {
617             try {
618                 myLock.lock();
619                 if (all) {
620                     myCondition.signalAll();
621                 }
622                 else {
623                     myCondition.signal();
624                 }
625             }
626             finally {
627                 myLock.unlock();
628             }
629         }
630     }
631 
632     /**
633      * Checks if there is remove for another message. If so returns the index of
634      * the message to update. If not return a value less then zero.
635      * 
636      * @return The position of the message that can be updated or a value of
637      *         less than zero if the queue is full.
638      */
639     protected int offer() {
640         int result = -1;
641         final int reserve = myReservePosition.get();
642         final int next = increment(reserve);
643         if ((myTakePosition != next) /* Full? */
644                 && myReservePosition.compareAndSet(reserve, next)) {
645 
646             // Got a slot.
647             result = reserve;
648 
649             // Check if we looped.
650             if (next < reserve) {
651                 myLooped.incrementAndGet();
652             }
653         }
654         return result;
655     }
656 
657     /**
658      * Checks if there is remove for another two message. If so returns the
659      * index of the first message to update. If not return a value less then
660      * zero.
661      * 
662      * @return The position of the first message that can be updated or a value
663      *         of less than zero if the queue is full.
664      */
665     protected int offer2() {
666         int result = -1;
667         final int reserve = myReservePosition.get();
668         final int first = increment(reserve);
669         final int second = increment(first);
670         final int take = myTakePosition;
671         if ((take != first) && (take != second) /* Full? */
672                 && myReservePosition.compareAndSet(reserve, second)) {
673 
674             // Got two slots. Return the first.
675             result = reserve;
676 
677             // Check if we looped.
678             if (second < reserve) {
679                 myLooped.incrementAndGet();
680             }
681         }
682         return result;
683     }
684 
685     /**
686      * Computes a new message id based on the current loop and reserve spot in
687      * the queue.
688      * 
689      * @param loop
690      *            The number of time the queue has looped over the queue.
691      * @param reserve
692      *            The reserved position in the queue. This can be a virtual
693      *            postion.
694      * @return The message id to use.
695      */
696     private int toMessageId(final int loop, final long reserve) {
697         final long loopOffset = (((long) loop) * myQueue.length);
698         if (loopOffset > MAX_MESSAGE_ID_MASK) {
699             myLooped.compareAndSet(loop, 0);
700         }
701         // Add an extra 1 so the first value is 1 instead of zero.
702         return (int) ((loopOffset + reserve) & MAX_MESSAGE_ID_MASK) + 1;
703     }
704 }