Coverage Report - com.allanbank.mongodb.client.FutureCallback
 
Classes in this File Line Coverage Branch Coverage Complexity
FutureCallback
98%
78/79
92%
37/40
2.8
FutureCallback$PendingListener
100%
5/5
N/A
2.8
FutureCallback$Sync
88%
32/36
90%
19/21
2.8
 
 1  
 /*
 2  
  * #%L
 3  
  * FutureCallback.java - mongodb-async-driver - Allanbank Consulting, Inc.
 4  
  * %%
 5  
  * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
 6  
  * %%
 7  
  * Licensed under the Apache License, Version 2.0 (the "License");
 8  
  * you may not use this file except in compliance with the License.
 9  
  * You may obtain a copy of the License at
 10  
  * 
 11  
  *      http://www.apache.org/licenses/LICENSE-2.0
 12  
  * 
 13  
  * Unless required by applicable law or agreed to in writing, software
 14  
  * distributed under the License is distributed on an "AS IS" BASIS,
 15  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 16  
  * See the License for the specific language governing permissions and
 17  
  * limitations under the License.
 18  
  * #L%
 19  
  */
 20  
 package com.allanbank.mongodb.client;
 21  
 
 22  
 import java.util.concurrent.CancellationException;
 23  
 import java.util.concurrent.ExecutionException;
 24  
 import java.util.concurrent.Executor;
 25  
 import java.util.concurrent.Future;
 26  
 import java.util.concurrent.TimeUnit;
 27  
 import java.util.concurrent.TimeoutException;
 28  
 import java.util.concurrent.atomic.AtomicReference;
 29  
 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
 30  
 
 31  
 import com.allanbank.mongodb.Callback;
 32  
 import com.allanbank.mongodb.ListenableFuture;
 33  
 import com.allanbank.mongodb.LockType;
 34  
 import com.allanbank.mongodb.client.callback.ReplyHandler;
 35  
 import com.allanbank.mongodb.util.Assertions;
 36  
 import com.allanbank.mongodb.util.log.Log;
 37  
 import com.allanbank.mongodb.util.log.LogFactory;
 38  
 
 39  
 /**
 40  
  * Implementation of the {@link Callback} and {@link ListenableFuture} interfaces.
 41  
  * Used to convert a {@link Callback} into a {@link ListenableFuture} for
 42  
  * returning to callers.
 43  
  * 
 44  
  * @param <V>
 45  
  *            The type for the set value.
 46  
  * 
 47  
  * @api.no This class is <b>NOT</b> part of the driver's API. This class may be
 48  
  *         mutated in incompatible ways between any two releases of the driver.
 49  
  * @copyright 2011-2014, Allanbank Consulting, Inc., All Rights Reserved
 50  
  */
 51  
 public class FutureCallback<V> implements ListenableFuture<V>, Callback<V> {
 52  
 
 53  
     /** Logger to log exceptions caught when running myPendingListeners. */
 54  1
     public static final Log LOG = LogFactory.getLog(FutureCallback.class);
 55  
 
 56  
     /** Amount of time to spin before yielding. Set to 1/100 of a millisecond. */
 57  1
     public static final long SPIN_TIME_NS = TimeUnit.MILLISECONDS.toNanos(1) / 100;
 58  
 
 59  
     /** Number of times to spin before trying something different. */
 60  
     private static final int SPIN_ITERATIONS = 10000;
 61  
 
 62  
     /** Amount of time to spin/yield before waiting. Set to 1/2 millisecond. */
 63  1
     private static final long YIELD_TIME_NS = TimeUnit.MILLISECONDS.toNanos(1) >> 1;
 64  
 
 65  
     /** The type of lock to use when waiting for the future to be fulfilled. */
 66  
     private final LockType myLockType;
 67  
 
 68  
     /** The runnable, executor pairs to execute within a singly linked list. */
 69  
     private AtomicReference<PendingListener> myPendingListeners;
 70  
 
 71  
     /** Synchronization control for this Future. */
 72  
     private final Sync<V> mySync;
 73  
 
 74  
     /**
 75  
      * Create a new FutureCallback.
 76  
      */
 77  
     public FutureCallback() {
 78  1001011
         this(LockType.MUTEX);
 79  1001011
     }
 80  
 
 81  
     /**
 82  
      * Create a new FutureCallback.
 83  
      * 
 84  
      * @param lockType
 85  
      *            The type of lock to use when waiting for the future to be
 86  
      *            fulfilled.
 87  
      */
 88  1001196
     public FutureCallback(final LockType lockType) {
 89  1001196
         mySync = new Sync<V>();
 90  1001196
         myLockType = lockType;
 91  1001196
         myPendingListeners = new AtomicReference<PendingListener>(null);
 92  1001196
     }
 93  
 
 94  
     /**
 95  
      * {@inheritDoc}
 96  
      */
 97  
     @Override
 98  
     public void addListener(final Runnable runnable, final Executor executor) {
 99  102
         Assertions.assertNotNull(runnable, "Runnable is null.");
 100  101
         Assertions.assertNotNull(executor, "Executor is null.");
 101  
 
 102  102
         if (!isDone()) {
 103  27
             PendingListener existing = myPendingListeners.get();
 104  27
             PendingListener listener = new PendingListener(runnable, executor,
 105  
                     existing);
 106  
 
 107  37
             while (!myPendingListeners.compareAndSet(existing, listener)) {
 108  9
                 existing = myPendingListeners.get();
 109  9
                 listener = new PendingListener(runnable, executor, existing);
 110  
             }
 111  
 
 112  28
             if (isDone()) {
 113  0
                 execute();
 114  
             }
 115  28
         }
 116  
         else {
 117  
             // Already done, so run the listener on its executor now.
 118  75
             execute(executor, runnable);
 119  
         }
 120  99
     }
 121  
 
 122  
     /**
 123  
      * {@inheritDoc}
 124  
      * <p>
 125  
      * Sets the value for the future and triggers any pending {@link #get} to
 126  
      * return.
 127  
      * </p>
 128  
      * 
 129  
      * @see Callback#callback
 130  
      */
 131  
     @Override
 132  
     public void callback(final V result) {
 133  1001035
         final boolean set = mySync.set(result);
 134  1001035
         if (set) {
 135  1001034
             execute();
 136  
         }
 137  1001035
     }
 138  
 
 139  
     /**
 140  
      * {@inheritDoc}
 141  
      * <p>
 142  
      * If not cancelled and the callback has not completed then cancels the
 143  
      * future, triggers the return of any pending {@link #get()} and returns
 144  
      * true. Otherwise returns false. This does not stop the related MongoDB
 145  
      * invocation.
 146  
      * </p>
 147  
      * 
 148  
      * @see Future#cancel(boolean)
 149  
      */
 150  
     @Override
 151  
     public boolean cancel(final boolean mayInterruptIfRunning) {
 152  103
         if (!mySync.cancel(mayInterruptIfRunning)) {
 153  101
             return false;
 154  
         }
 155  4
         execute();
 156  
 
 157  4
         return true;
 158  
     }
 159  
 
 160  
     /**
 161  
      * {@inheritDoc}
 162  
      * <p>
 163  
      * Sets the exception for the future and triggers any pending {@link #get}
 164  
      * to throw a {@link ExecutionException}.
 165  
      * </p>
 166  
      * 
 167  
      * @see Callback#exception
 168  
      */
 169  
     @Override
 170  
     public void exception(final Throwable thrown) {
 171  63
         Assertions.assertNotNull(thrown, "Cannot set a null exception.");
 172  
 
 173  63
         final boolean set = mySync.setException(thrown);
 174  63
         if (set) {
 175  61
             execute();
 176  
         }
 177  63
     }
 178  
 
 179  
     /**
 180  
      * {@inheritDoc}
 181  
      * <p>
 182  
      * Returns the value set via the {@link Callback}.
 183  
      * </p>
 184  
      * 
 185  
      * @see Future#get()
 186  
      */
 187  
     @Override
 188  
     public V get() throws InterruptedException, ExecutionException {
 189  
 
 190  1000846
         if (myLockType == LockType.LOW_LATENCY_SPIN) {
 191  2
             long now = 0;
 192  2
             long spinDeadline = 1;
 193  2
             long yeildDeadline = 1;
 194  7
             while ((now < yeildDeadline) && !isDone()) {
 195  5
                 for (int i = 0; (i < SPIN_ITERATIONS) && !isDone(); ++i) {
 196  
                     // Hard spin...
 197  
                 }
 198  
 
 199  5
                 if (!isDone()) {
 200  
                     // Pause?
 201  4
                     now = System.nanoTime();
 202  4
                     if (spinDeadline == 1) {
 203  2
                         spinDeadline = now + SPIN_TIME_NS;
 204  2
                         yeildDeadline = now + YIELD_TIME_NS;
 205  
                         // First time yield to allow other threads to do their
 206  
                         // work...
 207  2
                         Thread.yield();
 208  
                     }
 209  
                     else {
 210  2
                         if ((spinDeadline < now) && (now < yeildDeadline)) {
 211  1
                             Thread.yield();
 212  
                         }
 213  
                     }
 214  
                 }
 215  
             }
 216  
         }
 217  
 
 218  1000846
         final long shortPause = TimeUnit.MILLISECONDS.toNanos(10);
 219  
         while (true) {
 220  
             try {
 221  
                 // Either the value is available and the get() will not block
 222  
                 // or we have spun for long enough and it is time to block.
 223  1000855
                 return mySync.get(shortPause);
 224  
             }
 225  9
             catch (final TimeoutException te) {
 226  9
                 ReplyHandler.tryReceive();
 227  9
             }
 228  
         }
 229  
     }
 230  
 
 231  
     /**
 232  
      * {@inheritDoc}
 233  
      */
 234  
     @Override
 235  
     public V get(final long timeout, final TimeUnit unit)
 236  
             throws InterruptedException, TimeoutException, ExecutionException {
 237  116
         long now = System.nanoTime();
 238  116
         final long deadline = now + unit.toNanos(timeout);
 239  116
         final long shortPause = TimeUnit.MILLISECONDS.toNanos(10);
 240  
         while (true) {
 241  
             try {
 242  
                 // Wait for the result.
 243  266
                 return mySync.get(Math.min((deadline - now), shortPause));
 244  
             }
 245  153
             catch (final TimeoutException te) {
 246  
                 // Check if we should receive.
 247  153
                 now = System.nanoTime();
 248  153
                 if (now < deadline) {
 249  150
                     ReplyHandler.tryReceive();
 250  
                 }
 251  
                 else {
 252  3
                     throw te;
 253  
                 }
 254  150
             }
 255  
         }
 256  
     }
 257  
 
 258  
     /**
 259  
      * {@inheritDoc}
 260  
      * <p>
 261  
      * Returns true if {@link #cancel(boolean)} has been called.
 262  
      * </p>
 263  
      * 
 264  
      * @see Future#isCancelled()
 265  
      */
 266  
     @Override
 267  
     public boolean isCancelled() {
 268  5
         return mySync.isCancelled();
 269  
     }
 270  
 
 271  
     /**
 272  
      * {@inheritDoc}
 273  
      * <p>
 274  
      * True if a value has been set via the {@link Callback} interface or
 275  
      * the {@link Future} has been {@link #cancel(boolean) cancelled}.
 276  
      * </p>
 277  
      * 
 278  
      * @see Future#isDone()
 279  
      */
 280  
     @Override
 281  
     public boolean isDone() {
 282  49432
         return mySync.isDone();
 283  
     }
 284  
 
 285  
     /**
 286  
      * Runs this execution list, executing all existing pairs.
 287  
      * <p>
 288  
      * All callers of this method will drain the list of listeners.
 289  
      * </p>
 290  
      */
 291  
     protected void execute() {
 292  
         PendingListener toRun;
 293  
         PendingListener next;
 294  
 
 295  
         // Keep running until the list is exhausted.
 296  
         do {
 297  
 
 298  
             // Pick the next item from the list.
 299  
             do {
 300  1001127
                 toRun = myPendingListeners.get();
 301  1001127
                 next = (toRun != null) ? toRun.myNext : null;
 302  
             }
 303  1001127
             while (!myPendingListeners.compareAndSet(toRun, next));
 304  
 
 305  
             // Run this item - if it exists.
 306  1001127
             if (toRun != null) {
 307  28
                 execute(toRun.myExecutor, toRun.myRunnable);
 308  
             }
 309  
         }
 310  1001127
         while (toRun != null);
 311  1001099
     }
 312  
 
 313  
     /**
 314  
      * Execute the {@link Runnable} with the {@link Executor} suppressing
 315  
      * exceptions.
 316  
      * 
 317  
      * @param executor
 318  
      *            The executor to use.
 319  
      * @param runnable
 320  
      *            The {@link Runnable} to execute.
 321  
      */
 322  
     private void execute(final Executor executor, final Runnable runnable) {
 323  
         try {
 324  103
             executor.execute(runnable);
 325  
         }
 326  1
         catch (final RuntimeException e) {
 327  1
             LOG.error(e, "Exception running a FutureListener's runnable {} "
 328  
                     + "with executor {}", runnable, executor);
 329  96
         }
 330  99
     }
 331  
 
 332  
     /**
 333  
      * PendingListener is an immutable element in the list of listeners.
 334  
      * 
 335  
      * @copyright 2013, Allanbank Consulting, Inc., All Rights Reserved
 336  
      */
 337  
     /* package */static final class PendingListener {
 338  
 
 339  
         /** The executor to use to run the {@link Runnable}. */
 340  
         /* package */final Executor myExecutor;
 341  
 
 342  
         /** The next item to execute. */
 343  
         /* package */final PendingListener myNext;
 344  
 
 345  
         /** The listener's {@link Runnable}. */
 346  
         /* package */final Runnable myRunnable;
 347  
 
 348  
         /**
 349  
          * Creates a new PendingListener.
 350  
          * 
 351  
          * @param runnable
 352  
          *            The listener's {@link Runnable}.
 353  
          * @param executor
 354  
          *            The executor to use to run the {@link Runnable}.
 355  
          * @param next
 356  
          *            The next item to execute.
 357  
          */
 358  
         /* package */PendingListener(final Runnable runnable,
 359  37
                 final Executor executor, final PendingListener next) {
 360  37
             myRunnable = runnable;
 361  37
             myExecutor = executor;
 362  37
             myNext = next;
 363  37
         }
 364  
     }
 365  
 
 366  
     /**
 367  
      * Tracks the state of the Future via the {@link AbstractQueuedSynchronizer}
 368  
      * model. The state starts in the {@link #RUNNING} state. The first thread
 369  
      * to complete the future moves the state to the {@link #COMPLETING} state,
 370  
      * sets the value and then sets the appropriate final state.
 371  
      * 
 372  
      * @param <V>
 373  
      *            The type of value for the future.
 374  
      */
 375  
     /* package */static final class Sync<V> extends AbstractQueuedSynchronizer {
 376  
 
 377  
         /** State to represent the future was canceled. */
 378  
         /* package */static final int CANCELED = 4;
 379  
 
 380  
         /** State to represent the value has been set. */
 381  
         /* package */static final int COMPLETED = 2;
 382  
 
 383  
         /** State set while the value is being set. */
 384  
         /* package */static final int COMPLETING = 1;
 385  
 
 386  
         /** State to represent the future was interrupted. */
 387  
         /* package */static final int INTERRUPTED = 8;
 388  
 
 389  
         /** The initial running state. */
 390  
         /* package */static final int RUNNING = 0;
 391  
 
 392  
         /** The unused value passed to {@link #acquire(int)} methods. */
 393  
         /* package */static final int UNUSED = -1;
 394  
 
 395  
         /** Serialization version of the class. */
 396  
         private static final long serialVersionUID = -9189950787072982459L;
 397  
 
 398  
         /** The exception for the future. */
 399  
         private Throwable myException;
 400  
 
 401  
         /** The value set in the future. */
 402  
         private V myValue;
 403  
 
 404  
         /**
 405  
          * Creates a new Sync.
 406  
          */
 407  1001196
         /* package */Sync() {
 408  1001196
             myValue = null;
 409  1001196
             myException = null;
 410  1001196
         }
 411  
 
 412  
         /**
 413  
          * Acquisition succeeds if we are done, otherwise fail.
 414  
          */
 415  
         @Override
 416  
         protected int tryAcquireShared(final int ignored) {
 417  1001695
             if (isDone()) {
 418  1000948
                 return 1;
 419  
             }
 420  747
             return -1;
 421  
         }
 422  
 
 423  
         /**
 424  
          * We always allow a release to finish. We define it to represent that a
 425  
          * state transition completed.
 426  
          */
 427  
         @Override
 428  
         protected boolean tryReleaseShared(final int finalState) {
 429  1001099
             setState(finalState);
 430  1001099
             return true;
 431  
         }
 432  
 
 433  
         /**
 434  
          * Move to the CANCELED or INTERRUPTED state.
 435  
          * 
 436  
          * @param interrupt
 437  
          *            If we are interrupted.
 438  
          * @return If the cancel worked / won.
 439  
          */
 440  
         /* package */boolean cancel(final boolean interrupt) {
 441  105
             return complete(null, null, interrupt ? INTERRUPTED : CANCELED);
 442  
         }
 443  
 
 444  
         /**
 445  
          * Blocks until the future {@link #complete(Object, Throwable, int)
 446  
          * completes}.
 447  
          * 
 448  
          * @return The value set for the future.
 449  
          * @throws CancellationException
 450  
          *             If the future was canceled.
 451  
          * @throws ExecutionException
 452  
          *             If the future failed due to an exception.
 453  
          * @throws InterruptedException
 454  
          *             If this call is interrupted.
 455  
          */
 456  
         /* package */V get() throws CancellationException, ExecutionException,
 457  
                 InterruptedException {
 458  
 
 459  
             // Acquire the shared lock allowing interruption.
 460  0
             acquireSharedInterruptibly(UNUSED);
 461  
 
 462  0
             return getValue();
 463  
         }
 464  
 
 465  
         /**
 466  
          * Blocks until the future {@link #complete(Object, Throwable, int)
 467  
          * completes} or the number of nano-seconds expires.
 468  
          * 
 469  
          * @param nanos
 470  
          *            The number of nano-seconds to wait.
 471  
          * @return The value set for the future.
 472  
          * @throws TimeoutException
 473  
          *             If this call time expires.
 474  
          * @throws CancellationException
 475  
          *             If the future was canceled.
 476  
          * @throws ExecutionException
 477  
          *             If the future failed due to an exception.
 478  
          * @throws InterruptedException
 479  
          *             If this call is interrupted.
 480  
          */
 481  
         /* package */V get(final long nanos) throws TimeoutException,
 482  
                 CancellationException, ExecutionException, InterruptedException {
 483  
 
 484  
             // Attempt to acquire the shared lock with a timeout.
 485  1001121
             if (!tryAcquireSharedNanos(UNUSED, nanos)) {
 486  162
                 throw new TimeoutException("Timeout waiting for task.");
 487  
             }
 488  
 
 489  1000948
             return getValue();
 490  
         }
 491  
 
 492  
         /**
 493  
          * Checks if the state is {@link #CANCELED} or {@link #INTERRUPTED}.
 494  
          * 
 495  
          * @return True if the future state is {@link #CANCELED} or
 496  
          *         {@link #INTERRUPTED}.
 497  
          */
 498  
         /* package */boolean isCancelled() {
 499  5
             return (getState() & (CANCELED | INTERRUPTED)) != 0;
 500  
         }
 501  
 
 502  
         /**
 503  
          * Checks if the state is {@link #COMPLETED}, {@link #CANCELED} or
 504  
          * {@link #INTERRUPTED}.
 505  
          * 
 506  
          * @return True if the future state is {@link #COMPLETED},
 507  
          *         {@link #CANCELED} or {@link #INTERRUPTED}.
 508  
          */
 509  
         /* package */boolean isDone() {
 510  1051128
             return (getState() & (COMPLETED | CANCELED | INTERRUPTED)) != 0;
 511  
         }
 512  
 
 513  
         /**
 514  
          * Move to the COMPLETED state and set the value.
 515  
          * 
 516  
          * @param value
 517  
          *            The value to set.
 518  
          * @return If the set worked / won.
 519  
          */
 520  
         /* package */boolean set(final V value) {
 521  1001035
             return complete(value, null, COMPLETED);
 522  
         }
 523  
 
 524  
         /**
 525  
          * Move to the COMPLETED state and set the exception value.
 526  
          * 
 527  
          * @param thrown
 528  
          *            The exception to set.
 529  
          * @return If the set worked / won.
 530  
          */
 531  
         /* package */boolean setException(final Throwable thrown) {
 532  63
             return complete(null, thrown, COMPLETED);
 533  
         }
 534  
 
 535  
         /**
 536  
          * Completes the future.
 537  
          * 
 538  
          * @param value
 539  
          *            The value to set as the result of the future.
 540  
          * @param thrown
 541  
          *            the exception to set as the result of the future.
 542  
          * @param finalState
 543  
          *            the state to transition to.
 544  
          * @return Returns true if the completion was successful / won.
 545  
          */
 546  
         private boolean complete(final V value, final Throwable thrown,
 547  
                 final int finalState) {
 548  
 
 549  
             // Move to COMPLETING to see if we are the first to complete.
 550  1001203
             final boolean won = compareAndSetState(RUNNING, COMPLETING);
 551  1001203
             if (won) {
 552  1001099
                 this.myValue = value;
 553  1001099
                 this.myException = ((finalState & (CANCELED | INTERRUPTED)) != 0) ? new CancellationException(
 554  
                         "Future was canceled.") : thrown;
 555  
 
 556  
                 // Release all of the waiting threads.
 557  1001099
                 releaseShared(finalState);
 558  
             }
 559  104
             else if (getState() == COMPLETING) {
 560  
                 // Block until done.
 561  0
                 acquireShared(UNUSED);
 562  
             }
 563  
 
 564  1001202
             return won;
 565  
         }
 566  
 
 567  
         /**
 568  
          * Implementation to get the future's value.
 569  
          * 
 570  
          * @return The value set for the future.
 571  
          * @throws CancellationException
 572  
          *             If the future was canceled.
 573  
          * @throws ExecutionException
 574  
          *             If the future failed due to an exception.
 575  
          */
 576  
         private V getValue() throws CancellationException, ExecutionException {
 577  1000948
             final int state = getState();
 578  1000948
             switch (state) {
 579  
             case COMPLETED:
 580  1000944
                 if (myException != null) {
 581  35
                     throw new ExecutionException(myException);
 582  
                 }
 583  1000909
                 return myValue;
 584  
 
 585  
             case CANCELED:
 586  
             case INTERRUPTED:
 587  4
                 final CancellationException cancellation = new CancellationException(
 588  
                         "Future was canceled.");
 589  4
                 cancellation.initCause(myException);
 590  
 
 591  4
                 throw cancellation;
 592  
 
 593  
             default:
 594  0
                 throw new IllegalStateException("Sync in invalid state: "
 595  
                         + state);
 596  
             }
 597  
         }
 598  
     }
 599  
 
 600  
 }