Coverage Report - com.allanbank.mongodb.client.connection.socket.Sequence
 
Classes in this File Line Coverage Branch Coverage Complexity
Sequence
100%
68/68
100%
28/28
2.091
 
 1  
 /*
 2  
  * #%L
 3  
  * Sequence.java - mongodb-async-driver - Allanbank Consulting, Inc.
 4  
  * %%
 5  
  * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
 6  
  * %%
 7  
  * Licensed under the Apache License, Version 2.0 (the "License");
 8  
  * you may not use this file except in compliance with the License.
 9  
  * You may obtain a copy of the License at
 10  
  * 
 11  
  *      http://www.apache.org/licenses/LICENSE-2.0
 12  
  * 
 13  
  * Unless required by applicable law or agreed to in writing, software
 14  
  * distributed under the License is distributed on an "AS IS" BASIS,
 15  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 16  
  * See the License for the specific language governing permissions and
 17  
  * limitations under the License.
 18  
  * #L%
 19  
  */
 20  
 
 21  
 package com.allanbank.mongodb.client.connection.socket;
 22  
 
 23  
 import java.util.SortedMap;
 24  
 import java.util.TreeMap;
 25  
 import java.util.concurrent.atomic.AtomicInteger;
 26  
 import java.util.concurrent.atomic.AtomicLongArray;
 27  
 import java.util.concurrent.locks.Condition;
 28  
 import java.util.concurrent.locks.Lock;
 29  
 import java.util.concurrent.locks.ReentrantLock;
 30  
 
 31  
 import com.allanbank.mongodb.LockType;
 32  
 import com.allanbank.mongodb.client.message.PendingMessageQueue;
 33  
 
 34  
 /**
 35  
  * Sequence provides the ability to synchronize the access to the socket's
 36  
  * output stream. The thread starts by reserving a position via the reserve
 37  
  * method and then prepares to send the messages. It will then wait for its turn
 38  
  * to send and finally release the sequence.
 39  
  * <p>
 40  
  * We use an array of longs to avoid false sharing.
 41  
  * </p>
 42  
  * 
 43  
  * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
 44  
  *         mutated in incompatible ways between any two releases of the driver.
 45  
  * @copyright 2013, Allanbank Consulting, Inc., All Rights Reserved
 46  
  */
 47  
 /* package */class Sequence {
 48  
 
 49  
     /** The offset of the release value. */
 50  
     private static final int RELEASE_OFFSET = 15 + 7;
 51  
 
 52  
     /** The offset of the reserve value. */
 53  
     private static final int RESERVE_OFFSET = 7;
 54  
 
 55  
     /** Amount of time to spin/yield before waiting. Set to 1/2 millisecond. */
 56  1
     private static final long YIELD_TIME_NS = PendingMessageQueue.YIELD_TIME_NS;
 57  
 
 58  
     /** The condition used when there are waiters. */
 59  
     private final Condition myCondition;
 60  
 
 61  
     /** The mutex used with the sequence. */
 62  
     private final Lock myLock;
 63  
 
 64  
     /** The lock type to use with the sequence. */
 65  
     private final LockType myLockType;
 66  
 
 67  
     /** The atomic array of long values. */
 68  145
     private final AtomicLongArray myPaddedValue = new AtomicLongArray(30);
 69  
 
 70  
     /**
 71  
      * The map of waiters and the condition they are waiting on to avoid the
 72  
      * thundering herd. Only access while holding the {@link #myLock lock}.
 73  
      */
 74  
     private final SortedMap<Long, Condition> myWaiters;
 75  
 
 76  
     /** Tracks how many threads are waiting for a message or a space to open. */
 77  
     private final AtomicInteger myWaiting;
 78  
 
 79  
     /**
 80  
      * Create a sequence with a specified initial value.
 81  
      * 
 82  
      * @param initialValue
 83  
      *            The initial value for this sequence.
 84  
      */
 85  
     public Sequence(final long initialValue) {
 86  6
         this(initialValue, LockType.MUTEX);
 87  6
     }
 88  
 
 89  
     /**
 90  
      * Create a sequence with a specified initial value.
 91  
      * 
 92  
      * @param initialValue
 93  
      *            The initial value for this sequence.
 94  
      * @param lockType
 95  
      *            The lock type to use with the sequence.
 96  
      */
 97  145
     public Sequence(final long initialValue, final LockType lockType) {
 98  145
         myPaddedValue.set(RESERVE_OFFSET, initialValue);
 99  145
         myPaddedValue.set(RELEASE_OFFSET, initialValue);
 100  
 
 101  145
         myLockType = lockType;
 102  
 
 103  145
         myLock = new ReentrantLock(true);
 104  145
         myCondition = myLock.newCondition();
 105  145
         myWaiting = new AtomicInteger(0);
 106  145
         myWaiters = new TreeMap<Long, Condition>();
 107  145
     }
 108  
 
 109  
     /**
 110  
      * Returns an estimate of the number of waiters.
 111  
      * 
 112  
      * @return The waiters.
 113  
      */
 114  
     public int getWaitersCount() {
 115  3
         final long reserve = myPaddedValue.get(RESERVE_OFFSET);
 116  3
         final long release = myPaddedValue.get(RELEASE_OFFSET);
 117  
 
 118  3
         return (int) (release - reserve);
 119  
     }
 120  
 
 121  
     /**
 122  
      * Returns true if the sequence is idle (reserve == release).
 123  
      * 
 124  
      * @return True if the sequence is idle.
 125  
      */
 126  
     public boolean isIdle() {
 127  20
         final long reserve = myPaddedValue.get(RESERVE_OFFSET);
 128  20
         final long release = myPaddedValue.get(RELEASE_OFFSET);
 129  20
         return (reserve == release);
 130  
     }
 131  
 
 132  
     /**
 133  
      * Checks if there is a waiter for the sequence to be released.
 134  
      * 
 135  
      * @param expectedReserve
 136  
      *            The expected value for the reserve if there is no waiter.
 137  
      * @return True if there is a waiter (e.g., the reserve has advanced).
 138  
      */
 139  
     public boolean noWaiter(final long expectedReserve) {
 140  297
         final long reserve = myPaddedValue.get(RESERVE_OFFSET);
 141  
 
 142  297
         return (reserve == expectedReserve);
 143  
     }
 144  
 
 145  
     /**
 146  
      * Release the position in the sequence.
 147  
      * 
 148  
      * @param expectedValue
 149  
      *            The expected/reserved value for the sequence.
 150  
      * @param newValue
 151  
      *            The new value for the sequence.
 152  
      */
 153  
     public void release(final long expectedValue, final long newValue) {
 154  3669765
         while (!compareAndSetRelease(expectedValue, newValue)) {
 155  
             // Let another thread make progress - should not really spin if we
 156  
             // did a waitFor.
 157  4121865
             Thread.yield();
 158  
         }
 159  21796
         notifyWaiters();
 160  21792
     }
 161  
 
 162  
     /**
 163  
      * Reserves a spot in the sequence for the messages to be sent.
 164  
      * 
 165  
      * @param numberOfMessages
 166  
      *            The number of messages to be sent.
 167  
      * @return The current value of the sequence.
 168  
      */
 169  
     public long reserve(final long numberOfMessages) {
 170  
         long current;
 171  
         long next;
 172  
 
 173  
         do {
 174  21807
             current = myPaddedValue.get(RESERVE_OFFSET);
 175  21808
             next = current + numberOfMessages;
 176  
         }
 177  21808
         while (!compareAndSetReserve(current, next));
 178  
 
 179  21797
         return current;
 180  
     }
 181  
 
 182  
     /**
 183  
      * Waits for the reserved sequence to be released.
 184  
      * 
 185  
      * @param wanted
 186  
      *            The sequence to wait to be released.
 187  
      */
 188  
     public void waitFor(final long wanted) {
 189  21746
         long releaseValue = myPaddedValue.get(RELEASE_OFFSET);
 190  42615
         while (releaseValue != wanted) {
 191  20900
             if (myLockType == LockType.LOW_LATENCY_SPIN) {
 192  10390
                 long now = System.nanoTime();
 193  10391
                 final long yeildDeadline = now + YIELD_TIME_NS;
 194  
 
 195  10391
                 releaseValue = myPaddedValue.get(RELEASE_OFFSET);
 196  4693377
                 while ((now < yeildDeadline) && (releaseValue != wanted)) {
 197  
                     // Let another thread make progress.
 198  4774639
                     Thread.yield();
 199  4610042
                     now = System.nanoTime();
 200  4628857
                     releaseValue = myPaddedValue.get(RELEASE_OFFSET);
 201  
                 }
 202  
             }
 203  
 
 204  
             // Block.
 205  20900
             if (releaseValue != wanted) {
 206  20506
                 final Long key = Long.valueOf(wanted);
 207  20501
                 Condition localCondition = myCondition;
 208  
                 try {
 209  20503
                     final int waitCount = myWaiting.incrementAndGet();
 210  20506
                     myLock.lock();
 211  
 
 212  
                     // Second tier try for FindBugs to see the unlock.
 213  
                     try {
 214  
                         // Check for more than 1 waiter. If so stand in line via
 215  
                         // the waiters map. (This will wake threads in the order
 216  
                         // they should be processed.)
 217  20506
                         if (waitCount > 1) {
 218  20501
                             localCondition = myLock.newCondition();
 219  20501
                             myWaiters.put(key, localCondition);
 220  
                         }
 221  
 
 222  20506
                         releaseValue = myPaddedValue.get(RELEASE_OFFSET);
 223  40552
                         while (releaseValue != wanted) {
 224  20046
                             localCondition.awaitUninterruptibly();
 225  20046
                             releaseValue = myPaddedValue.get(RELEASE_OFFSET);
 226  
                         }
 227  
                     }
 228  
                     finally {
 229  20506
                         if (localCondition != myCondition) {
 230  20501
                             myWaiters.remove(key);
 231  
                         }
 232  
                     }
 233  
                 }
 234  
                 finally {
 235  20506
                     myLock.unlock();
 236  20506
                     myWaiting.decrementAndGet();
 237  20506
                 }
 238  20506
             }
 239  
         }
 240  21747
     }
 241  
 
 242  
     /**
 243  
      * Perform a compare and set operation on the sequence release position.
 244  
      * 
 245  
      * @param expectedValue
 246  
      *            The expected current value.
 247  
      * @param newValue
 248  
      *            The value to update to.
 249  
      * @return true if the operation succeeds, false otherwise.
 250  
      */
 251  
     private boolean compareAndSetRelease(final long expectedValue,
 252  
             final long newValue) {
 253  3752329
         return myPaddedValue.compareAndSet(RELEASE_OFFSET, expectedValue,
 254  
                 newValue);
 255  
     }
 256  
 
 257  
     /**
 258  
      * Perform a compare and set operation on the sequence reserve position.
 259  
      * 
 260  
      * @param expectedValue
 261  
      *            The expected current value.
 262  
      * @param newValue
 263  
      *            The value to update to.
 264  
      * @return true if the operation succeeds, false otherwise.
 265  
      */
 266  
     private boolean compareAndSetReserve(final long expectedValue,
 267  
             final long newValue) {
 268  21808
         return myPaddedValue.compareAndSet(RESERVE_OFFSET, expectedValue,
 269  
                 newValue);
 270  
     }
 271  
 
 272  
     /**
 273  
      * Notifies the waiting threads that the state of the sequence has changed.
 274  
      */
 275  
     private void notifyWaiters() {
 276  21793
         if (myWaiting.get() > 0) {
 277  
             try {
 278  20543
                 myLock.lock();
 279  
 
 280  
                 // Wake the reused condition.
 281  20544
                 myCondition.signalAll();
 282  
 
 283  
                 // Wake up the condition with the lowest wanted.
 284  
                 // No Thundering Herd!
 285  20544
                 if (!myWaiters.isEmpty()) {
 286  20073
                     myWaiters.get(myWaiters.firstKey()).signalAll();
 287  
                 }
 288  
             }
 289  
             finally {
 290  20544
                 myLock.unlock();
 291  20544
             }
 292  
         }
 293  21795
     }
 294  
 }