Coverage Report - com.allanbank.mongodb.client.callback.ReplyHandler
 
Classes in this File Line Coverage Branch Coverage Complexity
ReplyHandler
90%
39/43
87%
14/16
2.429
 
 1  
 /*
 2  
  * #%L
 3  
  * ReplyHandler.java - mongodb-async-driver - Allanbank Consulting, Inc.
 4  
  * %%
 5  
  * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
 6  
  * %%
 7  
  * Licensed under the Apache License, Version 2.0 (the "License");
 8  
  * you may not use this file except in compliance with the License.
 9  
  * You may obtain a copy of the License at
 10  
  * 
 11  
  *      http://www.apache.org/licenses/LICENSE-2.0
 12  
  * 
 13  
  * Unless required by applicable law or agreed to in writing, software
 14  
  * distributed under the License is distributed on an "AS IS" BASIS,
 15  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 16  
  * See the License for the specific language governing permissions and
 17  
  * limitations under the License.
 18  
  * #L%
 19  
  */
 20  
 
 21  
 package com.allanbank.mongodb.client.callback;
 22  
 
 23  
 import java.util.concurrent.Executor;
 24  
 import java.util.concurrent.RejectedExecutionException;
 25  
 
 26  
 import com.allanbank.mongodb.client.message.Reply;
 27  
 
 28  
 /**
 29  
  * ReplyHandler provides the capability to properly handle the replies to a
 30  
  * callback.
 31  
  * 
 32  
  * @copyright 2012-2014, Allanbank Consulting, Inc., All Rights Reserved
 33  
  */
 34  
 public class ReplyHandler implements Runnable {
 35  
 
 36  
     /** The socket that we are receiving for. */
 37  1
     private static final ThreadLocal<Receiver> ourReceiver = new ThreadLocal<Receiver>();
 38  
 
 39  
     /**
 40  
      * Raise an error on the callback, if any. Will execute the request on a
 41  
      * background thread if an executor is provided.
 42  
      * 
 43  
      * @param exception
 44  
      *            The thrown exception.
 45  
      * @param replyCallback
 46  
      *            The callback for the reply to the message.
 47  
      * @param executor
 48  
      *            The executor to use for back-grounding the reply handling.
 49  
      */
 50  
     public static void raiseError(final Throwable exception,
 51  
             final ReplyCallback replyCallback, final Executor executor) {
 52  106
         if (replyCallback != null) {
 53  66
             if (executor != null) {
 54  
                 try {
 55  3
                     executor.execute(new ReplyHandler(replyCallback, exception));
 56  
                 }
 57  1
                 catch (final RejectedExecutionException rej) {
 58  
                     // Run on this thread.
 59  1
                     replyCallback.exception(exception);
 60  3
                 }
 61  
             }
 62  
             else {
 63  63
                 replyCallback.exception(exception);
 64  
             }
 65  
         }
 66  106
     }
 67  
 
 68  
     /**
 69  
      * Delivers the reply to the callback, if any.
 70  
      * 
 71  
      * @param receiver
 72  
      *            The socket receiving the message.
 73  
      * @param reply
 74  
      *            The reply.
 75  
      * @param replyCallback
 76  
      *            The callback for the reply to the message.
 77  
      * @param executor
 78  
      *            The executor to use for back-grounding the reply handling.
 79  
      */
 80  
     public static void reply(final Receiver receiver, final Reply reply,
 81  
             final ReplyCallback replyCallback, final Executor executor) {
 82  219
         if (replyCallback != null) {
 83  
             // We know the FutureCallback will not block or take long to process
 84  
             // so just use this thread in that case.
 85  216
             final boolean lightWeight = replyCallback.isLightWeight();
 86  216
             if (!lightWeight && (executor != null)) {
 87  
                 try {
 88  1
                     executor.execute(new ReplyHandler(replyCallback, reply));
 89  
                 }
 90  0
                 catch (final RejectedExecutionException rej) {
 91  
                     // Run on this thread.
 92  0
                     run(receiver, reply, replyCallback);
 93  1
                 }
 94  
             }
 95  
             else {
 96  215
                 run(receiver, reply, replyCallback);
 97  
             }
 98  
         }
 99  219
     }
 100  
 
 101  
     /**
 102  
      * If there is a pending reply tries to process that reply.
 103  
      */
 104  
     public static void tryReceive() {
 105  159
         final Receiver receiver = ourReceiver.get();
 106  
 
 107  159
         if (receiver != null) {
 108  0
             receiver.tryReceive();
 109  
         }
 110  159
     }
 111  
 
 112  
     /**
 113  
      * Runs the callback on the current thread.
 114  
      * 
 115  
      * @param receiver
 116  
      *            The receiver to associate with this thread while the callback runs.
 117  
      * @param reply
 118  
      *            The reply to provide to the callback.
 119  
      * @param replyCallback
 120  
      *            The reply callback.
 121  
      */
 122  
     private static void run(final Receiver receiver, final Reply reply,
 123  
             final ReplyCallback replyCallback) {
 124  215
         final Receiver before = ourReceiver.get();
 125  
         try {
 126  215
             ourReceiver.set(receiver);
 127  215
             replyCallback.callback(reply);
 128  
         }
 129  
         finally {
 130  215
             ourReceiver.set(before);
 131  215
         }
 132  215
     }
 133  
 
 134  
     /** The exception raised from processing the message. */
 135  
     private final Throwable myError;
 136  
 
 137  
     /** The reply to the message. */
 138  
     private final Reply myReply;
 139  
 
 140  
     /** The callback for the reply to the message. */
 141  
     private final ReplyCallback myReplyCallback;
 142  
 
 143  
     /**
 144  
      * Creates a new ReplyHandler.
 145  
      * 
 146  
      * @param replyCallback
 147  
      *            The callback for the message.
 148  
      * @param reply
 149  
      *            The reply.
 150  
      */
 151  
     public ReplyHandler(final ReplyCallback replyCallback, final Reply reply) {
 152  1
         super();
 153  1
         myReplyCallback = replyCallback;
 154  1
         myReply = reply;
 155  1
         myError = null;
 156  1
     }
 157  
 
 158  
     /**
 159  
      * Creates a new ReplyHandler.
 160  
      * 
 161  
      * @param replyCallback
 162  
      *            The callback for the message.
 163  
      * @param exception
 164  
      *            The thrown exception.
 165  
      */
 166  
     public ReplyHandler(final ReplyCallback replyCallback,
 167  
             final Throwable exception) {
 168  3
         super();
 169  3
         myReplyCallback = replyCallback;
 170  3
         myError = exception;
 171  3
         myReply = null;
 172  3
     }
 173  
 
 174  
     /**
 175  
      * {@inheritDoc}
 176  
      * <p>
 177  
      * Overridden to process the callback response.
 178  
      * </p>
 179  
      */
 180  
     @Override
 181  
     public void run() {
 182  2
         if (myReply != null) {
 183  0
             reply(null, myReply, myReplyCallback, null);
 184  
         }
 185  2
         else if (myError != null) {
 186  1
             raiseError(myError, myReplyCallback, null);
 187  
         }
 188  2
     }
 189  
 }