View Javadoc
1   /*
2    * #%L
3    * ReplyHandler.java - mongodb-async-driver - Allanbank Consulting, Inc.
4    * %%
5    * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6    * %%
7    * Licensed under the Apache License, Version 2.0 (the "License");
8    * you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   * 
11   *      http://www.apache.org/licenses/LICENSE-2.0
12   * 
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   * #L%
19   */
20  
21  package com.allanbank.mongodb.client.callback;
22  
23  import java.util.concurrent.Executor;
24  import java.util.concurrent.RejectedExecutionException;
25  
26  import com.allanbank.mongodb.client.message.Reply;
27  
28  /**
29   * ReplyHandler provides the capability to properly handle the replies to a
30   * callback.
31   * 
32   * @copyright 2012-2014, Allanbank Consulting, Inc., All Rights Reserved
33   */
34  public class ReplyHandler implements Runnable {
35  
36      /** The socket that we are receiving for. */
37      private static final ThreadLocal<Receiver> ourReceiver = new ThreadLocal<Receiver>();
38  
39      /**
40       * Raise an error on the callback, if any. Will execute the request on a
41       * background thread if provided.
42       * 
43       * @param exception
44       *            The thrown exception.
45       * @param replyCallback
46       *            The callback for the reply to the message.
47       * @param executor
48       *            The executor to use for the back-grounding the reply handling.
49       */
50      public static void raiseError(final Throwable exception,
51              final ReplyCallback replyCallback, final Executor executor) {
52          if (replyCallback != null) {
53              if (executor != null) {
54                  try {
55                      executor.execute(new ReplyHandler(replyCallback, exception));
56                  }
57                  catch (final RejectedExecutionException rej) {
58                      // Run on this thread.
59                      replyCallback.exception(exception);
60                  }
61              }
62              else {
63                  replyCallback.exception(exception);
64              }
65          }
66      }
67  
68      /**
69       * Updates to set the reply for the callback, if any.
70       * 
71       * @param receiver
72       *            The socket receiving the message.
73       * @param reply
74       *            The reply.
75       * @param replyCallback
76       *            The callback for the reply to the message.
77       * @param executor
78       *            The executor to use for the back-grounding the reply handling.
79       */
80      public static void reply(final Receiver receiver, final Reply reply,
81              final ReplyCallback replyCallback, final Executor executor) {
82          if (replyCallback != null) {
83              // We know the FutureCallback will not block or take long to process
84              // so just use this thread in that case.
85              final boolean lightWeight = replyCallback.isLightWeight();
86              if (!lightWeight && (executor != null)) {
87                  try {
88                      executor.execute(new ReplyHandler(replyCallback, reply));
89                  }
90                  catch (final RejectedExecutionException rej) {
91                      // Run on this thread.
92                      run(receiver, reply, replyCallback);
93                  }
94              }
95              else {
96                  run(receiver, reply, replyCallback);
97              }
98          }
99      }
100 
101     /**
102      * If there is a pending reply tries to process that reply.
103      */
104     public static void tryReceive() {
105         final Receiver receiver = ourReceiver.get();
106 
107         if (receiver != null) {
108             receiver.tryReceive();
109         }
110     }
111 
112     /**
113      * Runs the callback on the current thread.
114      * 
115      * @param receiver
116      *            The receiver to be run.
117      * @param reply
118      *            The reply to the provide to the callback.
119      * @param replyCallback
120      *            The reply callback.
121      */
122     private static void run(final Receiver receiver, final Reply reply,
123             final ReplyCallback replyCallback) {
124         final Receiver before = ourReceiver.get();
125         try {
126             ourReceiver.set(receiver);
127             replyCallback.callback(reply);
128         }
129         finally {
130             ourReceiver.set(before);
131         }
132     }
133 
134     /** The exception raised from processing the message. */
135     private final Throwable myError;
136 
137     /** The reply to the message. */
138     private final Reply myReply;
139 
140     /** The callback for the reply to the message. */
141     private final ReplyCallback myReplyCallback;
142 
143     /**
144      * Creates a new ReplyHandler.
145      * 
146      * @param replyCallback
147      *            The callback for the message.
148      * @param reply
149      *            The reply.
150      */
151     public ReplyHandler(final ReplyCallback replyCallback, final Reply reply) {
152         super();
153         myReplyCallback = replyCallback;
154         myReply = reply;
155         myError = null;
156     }
157 
158     /**
159      * Creates a new ReplyHandler.
160      * 
161      * @param replyCallback
162      *            The callback for the message.
163      * @param exception
164      *            The thrown exception.
165      */
166     public ReplyHandler(final ReplyCallback replyCallback,
167             final Throwable exception) {
168         super();
169         myReplyCallback = replyCallback;
170         myError = exception;
171         myReply = null;
172     }
173 
174     /**
175      * {@inheritDoc}
176      * <p>
177      * Overridden to process the callback response.
178      * </p>
179      */
180     @Override
181     public void run() {
182         if (myReply != null) {
183             reply(null, myReply, myReplyCallback, null);
184         }
185         else if (myError != null) {
186             raiseError(myError, myReplyCallback, null);
187         }
188     }
189 }