View Javadoc
1   /*
2    * #%L
3    * FutureCallback.java - mongodb-async-driver - Allanbank Consulting, Inc.
4    * %%
5    * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6    * %%
7    * Licensed under the Apache License, Version 2.0 (the "License");
8    * you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   * 
11   *      http://www.apache.org/licenses/LICENSE-2.0
12   * 
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   * #L%
19   */
20  package com.allanbank.mongodb.client;
21  
22  import java.util.concurrent.CancellationException;
23  import java.util.concurrent.ExecutionException;
24  import java.util.concurrent.Executor;
25  import java.util.concurrent.Future;
26  import java.util.concurrent.TimeUnit;
27  import java.util.concurrent.TimeoutException;
28  import java.util.concurrent.atomic.AtomicReference;
29  import java.util.concurrent.locks.AbstractQueuedSynchronizer;
30  
31  import com.allanbank.mongodb.Callback;
32  import com.allanbank.mongodb.ListenableFuture;
33  import com.allanbank.mongodb.LockType;
34  import com.allanbank.mongodb.client.callback.ReplyHandler;
35  import com.allanbank.mongodb.util.Assertions;
36  import com.allanbank.mongodb.util.log.Log;
37  import com.allanbank.mongodb.util.log.LogFactory;
38  
39  /**
40   * Implementation of a {@link Callback} and {@link ListenableFuture} interfaces.
41   * Used to convert a {@link Callback} into a {@link ListenableFuture} for
42   * returning to callers.
43   * 
44   * @param <V>
45   *            The type for the set value.
46   * 
47   * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
48   *         mutated in incompatible ways between any two releases of the driver.
49   * @copyright 2011-2014, Allanbank Consulting, Inc., All Rights Reserved
50   */
51  public class FutureCallback<V> implements ListenableFuture<V>, Callback<V> {
52  
53      /** Logger to log exceptions caught when running myPendingListeners. */
54      public static final Log LOG = LogFactory.getLog(FutureCallback.class);
55  
56      /** Amount of time to spin before yielding. Set to 1/100 of a millisecond. */
57      public static final long SPIN_TIME_NS = TimeUnit.MILLISECONDS.toNanos(1) / 100;
58  
59      /** Number of times to spin before trying something different. */
60      private static final int SPIN_ITERATIONS = 10000;
61  
62      /** Amount of time to spin/yield before waiting. Set to 1/2 millisecond. */
63      private static final long YIELD_TIME_NS = TimeUnit.MILLISECONDS.toNanos(1) >> 1;
64  
65      /** The type of lock to use when waiting for the future to be fulfilled. */
66      private final LockType myLockType;
67  
68      /** The runnable, executor pairs to execute within a singly linked list. */
69      private AtomicReference<PendingListener> myPendingListeners;
70  
71      /** Synchronization control for this Future. */
72      private final Sync<V> mySync;
73  
74      /**
75       * Create a new FutureCallback.
76       */
77      public FutureCallback() {
78          this(LockType.MUTEX);
79      }
80  
81      /**
82       * Create a new FutureCallback.
83       * 
84       * @param lockType
85       *            The type of lock to use when waiting for the future to be
86       *            fulfilled.
87       */
88      public FutureCallback(final LockType lockType) {
89          mySync = new Sync<V>();
90          myLockType = lockType;
91          myPendingListeners = new AtomicReference<PendingListener>(null);
92      }
93  
94      /**
95       * {@inheritDoc}
96       */
97      @Override
98      public void addListener(final Runnable runnable, final Executor executor) {
99          Assertions.assertNotNull(runnable, "Runnable is null.");
100         Assertions.assertNotNull(executor, "Executor is null.");
101 
102         if (!isDone()) {
103             PendingListener existing = myPendingListeners.get();
104             PendingListener listener = new PendingListener(runnable, executor,
105                     existing);
106 
107             while (!myPendingListeners.compareAndSet(existing, listener)) {
108                 existing = myPendingListeners.get();
109                 listener = new PendingListener(runnable, executor, existing);
110             }
111 
112             if (isDone()) {
113                 execute();
114             }
115         }
116         else {
117             // Run the executor.
118             execute(executor, runnable);
119         }
120     }
121 
122     /**
123      * {@inheritDoc}
124      * <p>
125      * Sets the value for the future and triggers any pending {@link #get} to
126      * return.
127      * </p>
128      * 
129      * @see Callback#callback
130      */
131     @Override
132     public void callback(final V result) {
133         final boolean set = mySync.set(result);
134         if (set) {
135             execute();
136         }
137     }
138 
139     /**
140      * {@inheritDoc}
141      * <p>
142      * If not cancelled and the callback has not completed then cancels the
143      * future, triggers the return of any pending {@link #get()} and returns
144      * true. Otherwise returns false. This does not stop the related MongoDB
145      * invocation.
146      * </p>
147      * 
148      * @see Future#cancel(boolean)
149      */
150     @Override
151     public boolean cancel(final boolean mayInterruptIfRunning) {
152         if (!mySync.cancel(mayInterruptIfRunning)) {
153             return false;
154         }
155         execute();
156 
157         return true;
158     }
159 
160     /**
161      * {@inheritDoc}
162      * <p>
163      * Sets the exception for the future and triggers any pending {@link #get}
164      * to throw a {@link ExecutionException}.
165      * </p>
166      * 
167      * @see Callback#exception
168      */
169     @Override
170     public void exception(final Throwable thrown) {
171         Assertions.assertNotNull(thrown, "Cannot set a null exception.");
172 
173         final boolean set = mySync.setException(thrown);
174         if (set) {
175             execute();
176         }
177     }
178 
179     /**
180      * {@inheritDoc}
181      * <p>
182      * Returns the value set via the {@link Callback}.
183      * </p>
184      * 
185      * @see Future#get()
186      */
187     @Override
188     public V get() throws InterruptedException, ExecutionException {
189 
190         if (myLockType == LockType.LOW_LATENCY_SPIN) {
191             long now = 0;
192             long spinDeadline = 1;
193             long yeildDeadline = 1;
194             while ((now < yeildDeadline) && !isDone()) {
195                 for (int i = 0; (i < SPIN_ITERATIONS) && !isDone(); ++i) {
196                     // Hard spin...
197                 }
198 
199                 if (!isDone()) {
200                     // Pause?
201                     now = System.nanoTime();
202                     if (spinDeadline == 1) {
203                         spinDeadline = now + SPIN_TIME_NS;
204                         yeildDeadline = now + YIELD_TIME_NS;
205                         // First time yield to allow other threads to do their
206                         // work...
207                         Thread.yield();
208                     }
209                     else {
210                         if ((spinDeadline < now) && (now < yeildDeadline)) {
211                             Thread.yield();
212                         }
213                     }
214                 }
215             }
216         }
217 
218         final long shortPause = TimeUnit.MILLISECONDS.toNanos(10);
219         while (true) {
220             try {
221                 // Either the value is available and the get() will not block
222                 // or we have spun for long enough and it is time to block.
223                 return mySync.get(shortPause);
224             }
225             catch (final TimeoutException te) {
226                 ReplyHandler.tryReceive();
227             }
228         }
229     }
230 
231     /**
232      * {@inheritDoc}
233      */
234     @Override
235     public V get(final long timeout, final TimeUnit unit)
236             throws InterruptedException, TimeoutException, ExecutionException {
237         long now = System.nanoTime();
238         final long deadline = now + unit.toNanos(timeout);
239         final long shortPause = TimeUnit.MILLISECONDS.toNanos(10);
240         while (true) {
241             try {
242                 // Wait for the result.
243                 return mySync.get(Math.min((deadline - now), shortPause));
244             }
245             catch (final TimeoutException te) {
246                 // Check if we should receive.
247                 now = System.nanoTime();
248                 if (now < deadline) {
249                     ReplyHandler.tryReceive();
250                 }
251                 else {
252                     throw te;
253                 }
254             }
255         }
256     }
257 
258     /**
259      * {@inheritDoc}
260      * <p>
261      * Returns true if {@link #cancel(boolean)} has been called.
262      * </p>
263      * 
264      * @see Future#isCancelled()
265      */
266     @Override
267     public boolean isCancelled() {
268         return mySync.isCancelled();
269     }
270 
271     /**
272      * {@inheritDoc}
273      * <p>
274      * True if a value has been set via the the {@link Callback} interface or
275      * the {@link Future} has been {@link #cancel(boolean) cancelled}.
276      * </p>
277      * 
278      * @see Future#isDone()
279      */
280     @Override
281     public boolean isDone() {
282         return mySync.isDone();
283     }
284 
285     /**
286      * Runs this execution list, executing all existing pairs.
287      * <p>
288      * All callers of this method will drain the list of listeners.
289      * </p>
290      */
291     protected void execute() {
292         PendingListener toRun;
293         PendingListener next;
294 
295         // Keep running until the list is exhausted.
296         do {
297 
298             // Pick the next item from the list.
299             do {
300                 toRun = myPendingListeners.get();
301                 next = (toRun != null) ? toRun.myNext : null;
302             }
303             while (!myPendingListeners.compareAndSet(toRun, next));
304 
305             // Run this item - if it exists.
306             if (toRun != null) {
307                 execute(toRun.myExecutor, toRun.myRunnable);
308             }
309         }
310         while (toRun != null);
311     }
312 
313     /**
314      * Execute the {@link Runnable} with the {@link Executor} suppressing
315      * exceptions.
316      * 
317      * @param executor
318      *            The executor to use.
319      * @param runnable
320      *            The {@link Runnable} to execute.
321      */
322     private void execute(final Executor executor, final Runnable runnable) {
323         try {
324             executor.execute(runnable);
325         }
326         catch (final RuntimeException e) {
327             LOG.error(e, "Exception running a FutureListener's runnable {} "
328                     + "with executor {}", runnable, executor);
329         }
330     }
331 
332     /**
333      * PendingListener an immutable element in the list of listeners.
334      * 
335      * @copyright 2013, Allanbank Consulting, Inc., All Rights Reserved
336      */
337     /* package */static final class PendingListener {
338 
339         /** The executor to use to run the {@link Runnable}. */
340         /* package */final Executor myExecutor;
341 
342         /** The next item to execute. */
343         /* package */final PendingListener myNext;
344 
345         /** The listener's {@link Runnable}. */
346         /* package */final Runnable myRunnable;
347 
348         /**
349          * Creates a new PendingListener.
350          * 
351          * @param runnable
352          *            The listener's {@link Runnable}.
353          * @param executor
354          *            The executor to use to run the {@link Runnable}.
355          * @param next
356          *            The next item to execute.
357          */
358         /* package */PendingListener(final Runnable runnable,
359                 final Executor executor, final PendingListener next) {
360             myRunnable = runnable;
361             myExecutor = executor;
362             myNext = next;
363         }
364     }
365 
366     /**
367      * Tracks the state of the Future via the {@link AbstractQueuedSynchronizer}
368      * model. The state starts in the {@link #RUNNING} state. The first thread
369      * to complete the future moves the state to the {@link #COMPLETING} state,
370      * sets the value and then sets the appropriate final state.
371      * 
372      * @param <V>
373      *            The type of value for the future.
374      */
375     /* package */static final class Sync<V> extends AbstractQueuedSynchronizer {
376 
377         /** State to represent the future was canceled. */
378         /* package */static final int CANCELED = 4;
379 
380         /** State to represent the value has been set. */
381         /* package */static final int COMPLETED = 2;
382 
383         /** State set while the value is being set. */
384         /* package */static final int COMPLETING = 1;
385 
386         /** State to represent the future was interrupted. */
387         /* package */static final int INTERRUPTED = 8;
388 
389         /** The initial running state. */
390         /* package */static final int RUNNING = 0;
391 
392         /** The unused value passed to {@link #acquire(int)} methods. */
393         /* package */static final int UNUSED = -1;
394 
395         /** Serialization version of the class. */
396         private static final long serialVersionUID = -9189950787072982459L;
397 
398         /** The exception for the future. */
399         private Throwable myException;
400 
401         /** The value set in the future. */
402         private V myValue;
403 
404         /**
405          * Creates a new Sync.
406          */
407         /* package */Sync() {
408             myValue = null;
409             myException = null;
410         }
411 
412         /**
413          * Acquisition succeeds if we are done, otherwise fail.
414          */
415         @Override
416         protected int tryAcquireShared(final int ignored) {
417             if (isDone()) {
418                 return 1;
419             }
420             return -1;
421         }
422 
423         /**
424          * We always allow a release to finish. We define it to represent that a
425          * state transition completed.
426          */
427         @Override
428         protected boolean tryReleaseShared(final int finalState) {
429             setState(finalState);
430             return true;
431         }
432 
433         /**
434          * Move to the CANCELED or INTERRUPTED state.
435          * 
436          * @param interrupt
437          *            If we are interrupted.
438          * @return If the cancel worked / won.
439          */
440         /* package */boolean cancel(final boolean interrupt) {
441             return complete(null, null, interrupt ? INTERRUPTED : CANCELED);
442         }
443 
444         /**
445          * Blocks until the future {@link #complete(Object, Throwable, int)
446          * completes}.
447          * 
448          * @return The value set for the future.
449          * @throws CancellationException
450          *             If the future was canceled.
451          * @throws ExecutionException
452          *             If the future failed due to an exception.
453          * @throws InterruptedException
454          *             If these call is interrupted.
455          */
456         /* package */V get() throws CancellationException, ExecutionException,
457                 InterruptedException {
458 
459             // Acquire the shared lock allowing interruption.
460             acquireSharedInterruptibly(UNUSED);
461 
462             return getValue();
463         }
464 
465         /**
466          * Blocks until the future {@link #complete(Object, Throwable, int)
467          * completes} or the number of nano-seconds expires.
468          * 
469          * @param nanos
470          *            The number of nano-seconds to wait.
471          * @return The value set for the future.
472          * @throws TimeoutException
473          *             If this call time expires.
474          * @throws CancellationException
475          *             If the future was canceled.
476          * @throws ExecutionException
477          *             If the future failed due to an exception.
478          * @throws InterruptedException
479          *             If these call is interrupted.
480          */
481         /* package */V get(final long nanos) throws TimeoutException,
482                 CancellationException, ExecutionException, InterruptedException {
483 
484             // Attempt to acquire the shared lock with a timeout.
485             if (!tryAcquireSharedNanos(UNUSED, nanos)) {
486                 throw new TimeoutException("Timeout waiting for task.");
487             }
488 
489             return getValue();
490         }
491 
492         /**
493          * Checks if the state is {@link #CANCELED} or {@link #INTERRUPTED}.
494          * 
495          * @return True if the future state is {@link #CANCELED} or
496          *         {@link #INTERRUPTED}.
497          */
498         /* package */boolean isCancelled() {
499             return (getState() & (CANCELED | INTERRUPTED)) != 0;
500         }
501 
502         /**
503          * Checks if the state is {@link #COMPLETED}, {@link #CANCELED} or
504          * {@link #INTERRUPTED}.
505          * 
506          * @return True if the future state is {@link #COMPLETED},
507          *         {@link #CANCELED} or {@link #INTERRUPTED}.
508          */
509         /* package */boolean isDone() {
510             return (getState() & (COMPLETED | CANCELED | INTERRUPTED)) != 0;
511         }
512 
513         /**
514          * Move to the COMPLETED state and set the value.
515          * 
516          * @param value
517          *            The value to set.
518          * @return If the set worked / won.
519          */
520         /* package */boolean set(final V value) {
521             return complete(value, null, COMPLETED);
522         }
523 
524         /**
525          * Move to the COMPLETED state and set the exception value.
526          * 
527          * @param thrown
528          *            The exception to set.
529          * @return If the set worked / won.
530          */
531         /* package */boolean setException(final Throwable thrown) {
532             return complete(null, thrown, COMPLETED);
533         }
534 
535         /**
536          * Completes the future.
537          * 
538          * @param value
539          *            The value to set as the result of the future.
540          * @param thrown
541          *            the exception to set as the result of the future.
542          * @param finalState
543          *            the state to transition to.
544          * @return Returns true if the completion was successful / won.
545          */
546         private boolean complete(final V value, final Throwable thrown,
547                 final int finalState) {
548 
549             // Move to COMPLETING to see if we are the first to complete.
550             final boolean won = compareAndSetState(RUNNING, COMPLETING);
551             if (won) {
552                 this.myValue = value;
553                 this.myException = ((finalState & (CANCELED | INTERRUPTED)) != 0) ? new CancellationException(
554                         "Future was canceled.") : thrown;
555 
556                 // Release all of the waiting threads.
557                 releaseShared(finalState);
558             }
559             else if (getState() == COMPLETING) {
560                 // Block until done.
561                 acquireShared(UNUSED);
562             }
563 
564             return won;
565         }
566 
567         /**
568          * Implementation to get the future's value.
569          * 
570          * @return The value set for the future.
571          * @throws CancellationException
572          *             If the future was canceled.
573          * @throws ExecutionException
574          *             If the future failed due to an exception.
575          */
576         private V getValue() throws CancellationException, ExecutionException {
577             final int state = getState();
578             switch (state) {
579             case COMPLETED:
580                 if (myException != null) {
581                     throw new ExecutionException(myException);
582                 }
583                 return myValue;
584 
585             case CANCELED:
586             case INTERRUPTED:
587                 final CancellationException cancellation = new CancellationException(
588                         "Future was canceled.");
589                 cancellation.initCause(myException);
590 
591                 throw cancellation;
592 
593             default:
594                 throw new IllegalStateException("Sync in invalid state: "
595                         + state);
596             }
597         }
598     }
599 
600 }