Coverage Report - com.allanbank.mongodb.client.message.PendingMessageQueue
 
Classes in this File Line Coverage Branch Coverage Complexity
PendingMessageQueue
91%
168/184
88%
80/90
3.579
 
 1  
 /*
 2  
  * #%L
 3  
  * PendingMessageQueue.java - mongodb-async-driver - Allanbank Consulting, Inc.
 4  
  * %%
 5  
  * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
 6  
  * %%
 7  
  * Licensed under the Apache License, Version 2.0 (the "License");
 8  
  * you may not use this file except in compliance with the License.
 9  
  * You may obtain a copy of the License at
 10  
  * 
 11  
  *      http://www.apache.org/licenses/LICENSE-2.0
 12  
  * 
 13  
  * Unless required by applicable law or agreed to in writing, software
 14  
  * distributed under the License is distributed on an "AS IS" BASIS,
 15  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 16  
  * See the License for the specific language governing permissions and
 17  
  * limitations under the License.
 18  
  * #L%
 19  
  */
 20  
 package com.allanbank.mongodb.client.message;
 21  
 
 22  
 import java.util.List;
 23  
 import java.util.concurrent.TimeUnit;
 24  
 import java.util.concurrent.atomic.AtomicInteger;
 25  
 import java.util.concurrent.locks.Condition;
 26  
 import java.util.concurrent.locks.Lock;
 27  
 import java.util.concurrent.locks.ReentrantLock;
 28  
 
 29  
 import com.allanbank.mongodb.LockType;
 30  
 import com.allanbank.mongodb.client.Message;
 31  
 import com.allanbank.mongodb.client.callback.ReplyCallback;
 32  
 
 33  
 /**
 34  
  * PendingMessageQueue provides an optimized queue for pending messages inspired
 35  
  * by the Disruptor project.
 36  
  * <p>
 37  
  * To reduce thread contention the queue uses a set of integer values to track
 38  
  * the position of the ready messages (the last message that is ready to be
 39  
  * read), reserve (the first message that can be reserved to be written to), and
 40  
  * the take (the next (first) message to be read). For an infinite queue the
 41  
  * following invariant holds: <blockquote>
 42  
  * 
 43  
  * <pre>
 44  
  * <code>
 45  
  * take &lt;= readyBefore &lt;= reserve
 46  
  * </code>
 47  
  * </pre>
 48  
  * 
 49  
  * </blockquote> To make handling a limited size queue easier the size of the
 50  
  * queue is forced to a power of 2 no greater than {@value #MAX_SIZE}. The roll over can
 51  
  * then be handled with a simple mask operation.
 52  
  * </p>
 53  
  * <p>
 54  
  * Rather than allocate a pending message per request we use an array of
 55  
  * pre-allocated PendingMessages and copy the data into and out of the objects.
 56  
  * This has a net positive effect on object allocation and garbage collection
 57  
  * time at the cost of a longer initialization.
 58  
  * </p>
 59  
  * <p>
 60  
  * Lastly, this queue assumes there is a single consumer of messages. This is
 61  
  * true for the driver's use case but don't copy the code and expect it to work
 62  
  * with multiple consumers. The consumer should use the following basic
 63  
  * structure: <blockquote>
 64  
  * 
 65  
  * <pre>
 66  
  * <code>
 67  
  * PendingMessage pm = new {@link PendingMessage}();
 68  
  * 
 69  
  * queue.take(pm); // Blocks.
 70  
  * // Handle the message.
 71  
  * 
 72  
  * // or
 73  
  * 
 74  
  * if( queue.poll(pm) ) { // Non-blocking.
 75  
  *    // Handle The Message.
 76  
  * }
 77  
  * </code>
 78  
  * </pre>
 79  
  * 
 80  
  * </blockquote>
 81  
  * </p>
 82  
  * <p>
 83  
  * <b>Warning: </b> This class has been carefully tuned for the driver's use
 84  
  * case. Changes should be carefully benchmarked and tested. Comments have been
 85  
  * embedded in the source indicating attempted changes and reverts. Due to its
 86  
  * position in the driver subtle changes in this class can cause large changes
 87  
  * in the performance of the driver.
 88  
  * </p>
 89  
  * 
 90  
  * @see <a href="http://code.google.com/p/disruptor/">Disruptor Project</a>
 91  
  * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
 92  
  *         mutated in incompatible ways between any two releases of the driver.
 93  
  * @copyright 2012-2014, Allanbank Consulting, Inc., All Rights Reserved
 94  
  */
 95  
 public final class PendingMessageQueue {
 96  
 
 97  
     /** The mask for constraining the size of the message id. */
 98  
     public static final long MAX_MESSAGE_ID_MASK = 0x0FFFFFFF;
 99  
 
 100  
     /**
 101  
      * The maximum size of the queue. This is currently 2^20 but must be at most
 102  
      * 2^30 to ensure masking works.
 103  
      */
 104  
     public static final int MAX_SIZE = (1 << 20);
 105  
 
 106  
     /** Amount of time to spin before yielding. Set to 1/100 of a millisecond. */
 107  1
     public static final long SPIN_TIME_NS = TimeUnit.MILLISECONDS.toNanos(1) / 100;
 108  
 
 109  
     /** Amount of time to spin/yield before waiting. Set to 1/2 millisecond. */
 110  1
     public static final long YIELD_TIME_NS = TimeUnit.MILLISECONDS.toNanos(1) >> 1;
 111  
 
 112  
     /** Number of times to spin before trying something different. */
 113  
     private static final int SPIN_ITERATIONS = 10000;
 114  
 
 115  
     /** The condition used with the queue being full or empty. */
 116  
     private final Condition myCondition;
 117  
 
 118  
     /** The mutex used with the queue. */
 119  
     private final Lock myLock;
 120  
 
 121  
     /** The lock type to use with the queue. */
 122  
     private final LockType myLockType;
 123  
 
 124  
     /** Tracks how many times we have looped through the ring buffer. */
 125  
     private final AtomicInteger myLooped;
 126  
 
 127  
     /** The mask being used. */
 128  
     private final int myMask;
 129  
 
 130  
     /** The queue of pending messages. */
 131  
     private final PendingMessage[] myQueue;
 132  
 
 133  
     /**
 134  
      * The position of the last message that is ready to be taken.
 135  
      * <p>
 136  
      * When ({@link #myReadyBeforePosition} == {@link #myTakePosition}) the
 137  
      * queue is empty.
 138  
      * </p>
 139  
      */
 140  
     private final AtomicInteger myReadyBeforePosition;
 141  
 
 142  
     /**
 143  
      * The position of the next message that can be reserved.
 144  
      * <p>
 145  
      * When ({@link #myReservePosition} == ({@link #myTakePosition} - 1)) the
 146  
      * queue is full.
 147  
      * </p>
 148  
      */
 149  
     private final AtomicInteger myReservePosition;
 150  
 
 151  
     /**
 152  
      * The position of the next message that can be taken.
 153  
      * <p>
 154  
      * When ({@link #myReservePosition} == ({@link #myTakePosition} - 1)) the
 155  
      * queue is full.
 156  
      * </p>
 157  
      * <p>
 158  
      * When ({@link #myReadyBeforePosition} == {@link #myTakePosition}) the
 159  
      * queue is empty.
 160  
      * </p>
 161  
      */
 162  
     private volatile int myTakePosition;
 163  
 
 164  
     /** Tracks how many threads are waiting for a message or a space to open. */
 165  
     private final AtomicInteger myWaiting;
 166  
 
 167  
     /**
 168  
      * Creates a new PendingMessageQueue.
 169  
      * 
 170  
      * @param size
 171  
      *            The size of the queue to create.
 172  
      * @param lockType
 173  
      *            The lock type to use with the queue.
 174  
      */
 175  254
     public PendingMessageQueue(final int size, final LockType lockType) {
 176  254
         int power = size;
 177  254
         if (MAX_SIZE < size) {
 178  2
             power = MAX_SIZE;
 179  
         }
 180  252
         else if (Integer.bitCount(size) != 1) {
 181  
             // Find the next larger power of 2.
 182  2
             power = 1;
 183  22
             while ((power < size) && (power != 0)) {
 184  20
                 power <<= 1;
 185  
             }
 186  
         }
 187  
 
 188  254
         myLockType = lockType;
 189  254
         myQueue = new PendingMessage[power];
 190  2347774
         for (int i = 0; i < myQueue.length; ++i) {
 191  2347520
             myQueue[i] = new PendingMessage(0, null);
 192  
         }
 193  254
         myMask = (power - 1);
 194  
 
 195  254
         myLooped = new AtomicInteger(0);
 196  254
         myTakePosition = -1;
 197  254
         myReadyBeforePosition = new AtomicInteger(0);
 198  254
         myReservePosition = new AtomicInteger(0);
 199  254
         myWaiting = new AtomicInteger(0);
 200  
 
 201  254
         myLock = new ReentrantLock();
 202  254
         myCondition = myLock.newCondition();
 203  254
     }
 204  
 
 205  
     /**
 206  
      * Returns the capacity of the queue: one less than the length of the backing array.
 207  
      * 
 208  
      * @return The size of the queue.
 209  
      */
 210  
     public int capacity() {
 211  54260
         return myQueue.length - 1;
 212  
     }
 213  
 
 214  
     /**
 215  
      * Drains the list of pending messages into the provided list.
 216  
      * 
 217  
      * @param pending
 218  
      *            The list to add all of the pending messages to.
 219  
      */
 220  
     public void drainTo(final List<PendingMessage> pending) {
 221  2
         PendingMessage pm = new PendingMessage();
 222  2048
         while (poll(pm)) {
 223  2046
             pending.add(pm);
 224  2046
             pm = new PendingMessage();
 225  
         }
 226  2
     }
 227  
 
 228  
     /**
 229  
      * Returns true if the queue is empty, i.e., the next take position is the
 230  
      * ready before position.
 231  
      * 
 232  
      * @return If the queue is empty.
 233  
      */
 234  
     public boolean isEmpty() {
 235  10467
         final int take = myTakePosition;
 236  10467
         final int readyBefore = myReadyBeforePosition.get();
 237  
 
 238  10467
         return (readyBefore == take) || (take < 0);
 239  
     }
 240  
 
 241  
     /**
 242  
      * Puts a message onto the queue. This method will not block waiting for a
 243  
      * space to add the message.
 244  
      * 
 245  
      * @param message
 246  
      *            The message to add.
 247  
      * @param replyCallback
 248  
      *            The callback for the message to add.
 249  
      * @return True if the message was added, false otherwise.
 250  
      */
 251  
     public boolean offer(final Message message,
 252  
             final ReplyCallback replyCallback) {
 253  
 
 254  0
         final int loop = myLooped.get();
 255  0
         final int reserve = offer();
 256  0
         if (reserve < 0) {
 257  0
             return false;
 258  
         }
 259  
 
 260  0
         final int messageid = toMessageId(loop, reserve);
 261  
 
 262  0
         myQueue[reserve].set(messageid, message, replyCallback);
 263  
 
 264  0
         markReady(reserve);
 265  
 
 266  0
         return true;
 267  
     }
 268  
 
 269  
     /**
 270  
      * Puts a message onto the queue. This method will not block waiting for a
 271  
      * space to add the message.
 272  
      * 
 273  
      * @param pendingMessage
 274  
      *            The message to add.
 275  
      * @return True if the message was added, false otherwise.
 276  
      */
 277  
     public boolean offer(final PendingMessage pendingMessage) {
 278  8464
         final int reserve = offer();
 279  8464
         if (reserve < 0) {
 280  2
             return false;
 281  
         }
 282  
 
 283  8462
         myQueue[reserve].set(pendingMessage);
 284  
 
 285  8462
         markReady(reserve);
 286  
 
 287  8462
         return true;
 288  
     }
 289  
 
 290  
     /**
 291  
      * Returns the next message from the queue without blocking. <blockquote>
 292  
      * 
 293  
      * <pre>
 294  
      * <code>
 295  
      * PendingMessage pm = new PendingMessage();
 296  
      * if( queue.poll(pm) ) {
 297  
      *    // Handle the message copied into pm.
 298  
      * }
 299  
      * </code>
 300  
      * </pre>
 301  
      * 
 302  
      * </blockquote>
 303  
      * 
 304  
      * @param copyOut
 305  
      *            The {@link PendingMessage} to copy the pending message into.
 306  
      * @return True if the pending message was updated.
 307  
      */
 308  
     public boolean poll(final PendingMessage copyOut) {
 309  344965415
         boolean result = false;
 310  344965415
         final int take = myTakePosition;
 311  344965415
         if ((myReadyBeforePosition.get() != take) && (take >= 0)) { // Empty,
 312  
             // Not
 313  
             // started?
 314  484240
             copyOut.set(myQueue[take]);
 315  484240
             myQueue[take].clear();
 316  484240
             result = true;
 317  
 
 318  484240
             myTakePosition = increment(take);
 319  484240
             notifyWaiters(false);
 320  
         }
 321  
 
 322  344965415
         return result;
 323  
     }
 324  
 
 325  
     /**
 326  
      * Puts a message onto the queue. This method will block waiting for a space
 327  
      * to add the message.
 328  
      * 
 329  
      * @param message
 330  
      *            The message to add.
 331  
      * @param replyCallback
 332  
      *            The callback for the message to add.
 333  
      * 
 334  
      * @throws InterruptedException
 335  
      *             If the thread is interrupted while waiting for the message.
 336  
      *             If thrown the message will not have been enqueued.
 337  
      */
 338  
     public void put(final Message message, final ReplyCallback replyCallback)
 339  
             throws InterruptedException {
 340  
 
 341  24586
         int loop = myLooped.get();
 342  24586
         int reserve = offer();
 343  24586
         if (reserve < 0) {
 344  
 
 345  
             // Spinning here appears to slow things down.
 346  
 
 347  
             // Block.
 348  
             try {
 349  2
                 myWaiting.incrementAndGet();
 350  2
                 myLock.lock();
 351  
 
 352  2
                 loop = myLooped.get();
 353  2
                 reserve = offer();
 354  2
                 while (reserve < 0) {
 355  2
                     myCondition.await();
 356  0
                     loop = myLooped.get();
 357  0
                     reserve = offer();
 358  
                 }
 359  
             }
 360  
             finally {
 361  2
                 myLock.unlock();
 362  2
                 myWaiting.decrementAndGet();
 363  0
             }
 364  
         }
 365  
 
 366  24584
         final int messageid = toMessageId(loop, reserve);
 367  
 
 368  24584
         myQueue[reserve].set(messageid, message, replyCallback);
 369  
 
 370  24584
         markReady(reserve);
 371  24584
     }
 372  
 
 373  
     /**
 374  
      * Puts two messages onto the queue. This method will block waiting for a
 375  
      * space to add the messages but ensures the messages are in sequence in the
 376  
      * queue.
 377  
      * 
 378  
      * @param message
 379  
      *            The first message to add.
 380  
      * @param replyCallback
 381  
      *            The callback for the first message to add.
 382  
      * @param message2
 383  
      *            The second message to add.
 384  
      * @param replyCallback2
 385  
      *            The callback for the second message to add.
 386  
      * 
 387  
      * @throws InterruptedException
 388  
      *             If the thread is interrupted while waiting for the message.
 389  
      *             If thrown neither message will have been enqueued.
 390  
      */
 391  
     public void put(final Message message, final ReplyCallback replyCallback,
 392  
             final Message message2, final ReplyCallback replyCallback2)
 393  
             throws InterruptedException {
 394  3073
         int loop = myLooped.get();
 395  3073
         int reserve = offer2();
 396  3073
         if (reserve < 0) {
 397  
 
 398  
             // Spinning here appears to slow things down.
 399  
             try {
 400  2
                 myWaiting.incrementAndGet();
 401  2
                 myLock.lock();
 402  
 
 403  2
                 loop = myLooped.get();
 404  2
                 reserve = offer2();
 405  2
                 while (reserve < 0) {
 406  2
                     myCondition.await();
 407  0
                     loop = myLooped.get();
 408  0
                     reserve = offer2();
 409  
                 }
 410  
             }
 411  
             finally {
 412  2
                 myLock.unlock();
 413  2
                 myWaiting.decrementAndGet();
 414  0
             }
 415  
         }
 416  
 
 417  
         // Use reserve + 1 for the second message id since it may have looped
 418  
         // and then the math does not work out causing messageId2 to be lower
 419  
         // than
 420  
         // messageId1, which is bad.
 421  3071
         final int messageId1 = toMessageId(loop, reserve);
 422  3071
         final int messageId2 = toMessageId(loop, reserve + 1);
 423  
 
 424  3071
         final int second = increment(reserve);
 425  3071
         myQueue[reserve].set(messageId1, message, replyCallback);
 426  3071
         myQueue[second].set(messageId2, message2, replyCallback2);
 427  
 
 428  3071
         markReady2(reserve);
 429  3071
     }
 430  
 
 431  
     /**
 432  
      * Puts a message onto the queue. This method will block waiting for a space
 433  
      * to add the message.
 434  
      * 
 435  
      * @param pendingMessage
 436  
      *            The message to add.
 437  
      * 
 438  
      * @throws InterruptedException
 439  
      *             If the thread is interrupted while waiting for the message.
 440  
      *             If thrown the message will not have been enqueued.
 441  
      */
 442  
     public void put(final PendingMessage pendingMessage)
 443  
             throws InterruptedException {
 444  452738
         int reserve = offer();
 445  453154
         if (reserve < 0) {
 446  
 
 447  
             // Spinning here appears to slow things down.
 448  
 
 449  
             try {
 450  16162
                 myWaiting.incrementAndGet();
 451  16170
                 myLock.lock();
 452  
 
 453  16178
                 reserve = offer();
 454  199767
                 while (reserve < 0) {
 455  183591
                     myCondition.await();
 456  183589
                     reserve = offer();
 457  
                 }
 458  
             }
 459  
             finally {
 460  16178
                 myLock.unlock();
 461  16178
                 myWaiting.decrementAndGet();
 462  16176
             }
 463  
         }
 464  
 
 465  453234
         myQueue[reserve].set(pendingMessage);
 466  
 
 467  453222
         markReady(reserve);
 468  453188
     }
 469  
 
 470  
     /**
 471  
      * Returns the number of messages in the queue.
 472  
      * 
 473  
      * @return The number of messages in the queue.
 474  
      */
 475  
     public int size() {
 476  11267
         final int take = myTakePosition;
 477  11267
         final int ready = myReadyBeforePosition.get();
 478  
 
 479  11267
         if (take < 0) {
 480  3
             return 0;
 481  
         }
 482  11264
         else if (take <= ready) {
 483  11256
             return (ready - take);
 484  
         }
 485  
 
 486  8
         return (myQueue.length - take) + ready;
 487  
     }
 488  
 
 489  
     /**
 490  
      * Returns the next message from the queue and will block waiting for a
 491  
      * message.
 492  
      * 
 493  
      * @param copyOut
 494  
      *            The {@link PendingMessage} to copy the pending message into.
 495  
      * @throws InterruptedException
 496  
      *             If the thread is interrupted while waiting for the message.
 497  
      */
 498  
     public void take(final PendingMessage copyOut) throws InterruptedException {
 499  477875
         if (!poll(copyOut)) {
 500  
 
 501  
             // Spin/yield loop.
 502  87099
             if (myLockType == LockType.LOW_LATENCY_SPIN) {
 503  59623
                 long now = 0;
 504  59623
                 long spinDeadline = 1;
 505  59623
                 long yeildDeadline = 1;
 506  86217
                 while (now < yeildDeadline) {
 507  344454520
                     for (int i = 0; i < SPIN_ITERATIONS; ++i) {
 508  344427926
                         if (poll(copyOut)) {
 509  58587
                             return;
 510  
                         }
 511  
                     }
 512  
 
 513  
                     // Pause?
 514  26594
                     now = System.nanoTime();
 515  26594
                     if (spinDeadline == 1) {
 516  10009
                         spinDeadline = now + SPIN_TIME_NS;
 517  10009
                         yeildDeadline = now + YIELD_TIME_NS;
 518  
                         // First time free pass.
 519  
                     }
 520  
                     else {
 521  16585
                         if ((spinDeadline < now) && (now < yeildDeadline)) {
 522  15549
                             Thread.yield();
 523  
                         }
 524  
                     }
 525  
                 }
 526  
             }
 527  
 
 528  
             // Block.
 529  
             try {
 530  28512
                 myWaiting.incrementAndGet();
 531  28512
                 myLock.lock();
 532  
 
 533  52998
                 while (!poll(copyOut)) {
 534  24533
                     myCondition.await();
 535  
                 }
 536  
             }
 537  
             finally {
 538  28512
                 myLock.unlock();
 539  28512
                 myWaiting.decrementAndGet();
 540  28465
             }
 541  
         }
 542  419241
     }
 543  
 
 544  
     /**
 545  
      * Increments the index handling roll-over.
 546  
      * 
 547  
      * @param index
 548  
      *            The value to increment.
 549  
      * @return The incremented value.
 550  
      */
 551  
     protected int increment(final int index) {
 552  1571997
         return ((index + 1) & myMask);
 553  
     }
 554  
 
 555  
     /**
 556  
      * Marks the position as ready by incrementing the ready position to the
 557  
      * provided position. This method uses a spin lock assuming any other
 558  
      * threads will increment the ready position quickly to the position just
 559  
      * before {@code index}.
 560  
      * 
 561  
      * @param index
 562  
      *            The index of the ready message.
 563  
      */
 564  
     protected void markReady(final int index) {
 565  486227
         final int after = increment(index);
 566  
 
 567  488498
         while (!myReadyBeforePosition.compareAndSet(index, after)) {
 568  
             // Spinning here slows things down because we know that the other
 569  
             // thread should be runnable. Always Yield.
 570  2470
             Thread.yield();
 571  
         }
 572  
 
 573  
         // Pull take position into the queue.
 574  486231
         if ((index == 0) && (myTakePosition == -1)) {
 575  174
             myTakePosition = index;
 576  
         }
 577  
 
 578  486232
         notifyWaiters(false);
 579  486223
     }
 580  
 
 581  
     /**
 582  
      * Marks the position and the next position as ready by incrementing the
 583  
      * ready position to the provided position + 1. This method uses a spin lock
 584  
      * assuming any other threads will increment the ready position quickly to
 585  
      * the position just before {@code index}.
 586  
      * 
 587  
      * @param index
 588  
      *            The index of the ready message.
 589  
      */
 590  
     protected void markReady2(final int index) {
 591  3071
         final int after = increment(index);
 592  3071
         final int twoAfter = increment(after);
 593  
 
 594  3071
         while (!myReadyBeforePosition.compareAndSet(index, twoAfter)) {
 595  
             // Just keep swimming...
 596  0
             Thread.yield();
 597  
         }
 598  
 
 599  
         // Pull take position into the queue.
 600  3071
         if ((index == 0) && (myTakePosition == -1)) {
 601  3
             myTakePosition = index;
 602  
         }
 603  
 
 604  
         // If someone is waiting let them know we created two messages.
 605  3071
         notifyWaiters(true);
 606  3071
     }
 607  
 
 608  
     /**
 609  
      * Notifies the waiting threads that the state of the queue has changed.
 610  
      * 
 611  
      * @param all
 612  
      *            If true then all threads will be woken. Otherwise only a
 613  
      *            single thread is woken.
 614  
      */
 615  
     protected void notifyWaiters(final boolean all) {
 616  966117
         if (myWaiting.get() > 0) {
 617  
             try {
 618  523124
                 myLock.lock();
 619  523382
                 if (all) {
 620  1
                     myCondition.signalAll();
 621  
                 }
 622  
                 else {
 623  523381
                     myCondition.signal();
 624  
                 }
 625  
             }
 626  
             finally {
 627  523382
                 myLock.unlock();
 628  523283
             }
 629  
         }
 630  967782
     }
 631  
 
 632  
     /**
 633  
      * Checks if there is room for another message. If so returns the index of
 634  
      * the message to update. If not return a value less than zero.
 635  
      * 
 636  
      * @return The position of the message that can be updated or a value of
 637  
      *         less than zero if the queue is full.
 638  
      */
 639  
     protected int offer() {
 640  684936
         int result = -1;
 641  684997
         final int reserve = myReservePosition.get();
 642  685368
         final int next = increment(reserve);
 643  685428
         if ((myTakePosition != next) /* Full? */
 644  
                 && myReservePosition.compareAndSet(reserve, next)) {
 645  
 
 646  
             // Got a slot.
 647  486280
             result = reserve;
 648  
 
 649  
             // Check if we looped.
 650  486282
             if (next < reserve) {
 651  1754
                 myLooped.incrementAndGet();
 652  
             }
 653  
         }
 654  685680
         return result;
 655  
     }
 656  
 
 657  
     /**
 658  
      * Checks if there is room for another two messages. If so returns the
 659  
      * index of the first message to update. If not return a value less than
 660  
      * zero.
 661  
      * 
 662  
      * @return The position of the first message that can be updated or a value
 663  
      *         of less than zero if the queue is full.
 664  
      */
 665  
     protected int offer2() {
 666  3075
         int result = -1;
 667  3075
         final int reserve = myReservePosition.get();
 668  3075
         final int first = increment(reserve);
 669  3075
         final int second = increment(first);
 670  3075
         final int take = myTakePosition;
 671  3075
         if ((take != first) && (take != second) /* Full? */
 672  
                 && myReservePosition.compareAndSet(reserve, second)) {
 673  
 
 674  
             // Got two slots. Return the first.
 675  3071
             result = reserve;
 676  
 
 677  
             // Check if we looped.
 678  3071
             if (second < reserve) {
 679  4
                 myLooped.incrementAndGet();
 680  
             }
 681  
         }
 682  3075
         return result;
 683  
     }
 684  
 
 685  
     /**
 686  
      * Computes a new message id based on the current loop and reserve spot in
 687  
      * the queue.
 688  
      * 
 689  
      * @param loop
 690  
      *            The number of times the reserve position has looped over the queue.
 691  
      * @param reserve
 692  
      *            The reserved position in the queue. This can be a virtual
 693  
      *            position.
 694  
      * @return The message id to use.
 695  
      */
 696  
     private int toMessageId(final int loop, final long reserve) {
 697  30726
         final long loopOffset = (((long) loop) * myQueue.length);
 698  30726
         if (loopOffset > MAX_MESSAGE_ID_MASK) {
 699  0
             myLooped.compareAndSet(loop, 0);
 700  
         }
 701  
         // Add an extra 1 so the first value is 1 instead of zero.
 702  30726
         return (int) ((loopOffset + reserve) & MAX_MESSAGE_ID_MASK) + 1;
 703  
     }
 704  
 }