View Javadoc
1   /*
2    * #%L
3    * TwoThreadSocketConnection.java - mongodb-async-driver - Allanbank Consulting, Inc.
4    * %%
5    * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6    * %%
7    * Licensed under the Apache License, Version 2.0 (the "License");
8    * you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   * 
11   *      http://www.apache.org/licenses/LICENSE-2.0
12   * 
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   * #L%
19   */
20  package com.allanbank.mongodb.client.connection.socket;
21  
22  import java.io.IOException;
23  import java.net.SocketException;
24  
25  import com.allanbank.mongodb.MongoClientConfiguration;
26  import com.allanbank.mongodb.MongoDbException;
27  import com.allanbank.mongodb.bson.io.BufferingBsonOutputStream;
28  import com.allanbank.mongodb.bson.io.RandomAccessOutputStream;
29  import com.allanbank.mongodb.bson.io.StringDecoderCache;
30  import com.allanbank.mongodb.bson.io.StringEncoderCache;
31  import com.allanbank.mongodb.client.Message;
32  import com.allanbank.mongodb.client.callback.AddressAware;
33  import com.allanbank.mongodb.client.callback.ReplyCallback;
34  import com.allanbank.mongodb.client.message.BuildInfo;
35  import com.allanbank.mongodb.client.message.PendingMessage;
36  import com.allanbank.mongodb.client.message.PendingMessageQueue;
37  import com.allanbank.mongodb.client.state.Server;
38  import com.allanbank.mongodb.client.state.ServerUpdateCallback;
39  import com.allanbank.mongodb.util.IOUtils;
40  
41  /**
42   * Provides a blocking Socket based connection to a MongoDB server.
43   * <p>
44   * This version uses a pair of threads (1 send and 1 receive) to handle the
45   * messages going to and from MongoDB.
46   * </p>
47   * <p>
48   * This implementation was the default for the driver through the 1.2.3 release.
49   * It is still used by the driver for communication sockets that are not know to
50   * be standard Java Sockets as it performs better when the communication path
51   * does not have built in buffering of messages.
52   * </p>
53   * 
54   * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
55   *         mutated in incompatible ways between any two releases of the driver.
56   * @copyright 2011-2014, Allanbank Consulting, Inc., All Rights Reserved
57   */
58  public class TwoThreadSocketConnection extends AbstractSocketConnection {
59  
60      /** The writer for BSON documents. */
61      protected final BufferingBsonOutputStream myBsonOut;
62  
63      /** The queue of messages to be sent. */
64      protected final PendingMessageQueue myToSendQueue;
65  
66      /** The thread receiving replies. */
67      private final Thread myReceiver;
68  
69      /** The thread sending messages. */
70      private final Thread mySender;
71  
72      /**
73       * Creates a new SocketConnection to a MongoDB server.
74       * 
75       * @param server
76       *            The MongoDB server to connect to.
77       * @param config
78       *            The configuration for the Connection to the MongoDB server.
79       * @throws SocketException
80       *             On a failure connecting to the MongoDB server.
81       * @throws IOException
82       *             On a failure to read or write data to the MongoDB server.
83       */
84      public TwoThreadSocketConnection(final Server server,
85              final MongoClientConfiguration config) throws SocketException,
86              IOException {
87          this(server, config, new StringEncoderCache(), new StringDecoderCache());
88      }
89  
90      /**
91       * Creates a new SocketConnection to a MongoDB server.
92       * 
93       * @param server
94       *            The MongoDB server to connect to.
95       * @param config
96       *            The configuration for the Connection to the MongoDB server.
97       * @param encoderCache
98       *            Cache used for encoding strings.
99       * @param decoderCache
100      *            Cache used for decoding strings.
101      * @throws SocketException
102      *             On a failure connecting to the MongoDB server.
103      * @throws IOException
104      *             On a failure to read or write data to the MongoDB server.
105      */
106     public TwoThreadSocketConnection(final Server server,
107             final MongoClientConfiguration config,
108             final StringEncoderCache encoderCache,
109             final StringDecoderCache decoderCache) throws SocketException,
110             IOException {
111         super(server, config, encoderCache, decoderCache);
112 
113         myBsonOut = new BufferingBsonOutputStream(new RandomAccessOutputStream(
114                 encoderCache));
115 
116         myToSendQueue = new PendingMessageQueue(
117                 config.getMaxPendingOperationsPerConnection(),
118                 config.getLockType());
119 
120         myReceiver = config.getThreadFactory().newThread(
121                 new ReceiveRunnable(this));
122         myReceiver.setDaemon(true);
123         myReceiver.setName("MongoDB " + mySocket.getLocalPort() + "<--"
124                 + myServer.getCanonicalName());
125 
126         mySender = config.getThreadFactory().newThread(new SendRunnable());
127         mySender.setDaemon(true);
128         mySender.setName("MongoDB " + mySocket.getLocalPort() + "-->"
129                 + myServer.getCanonicalName());
130     }
131 
132     /**
133      * {@inheritDoc}
134      */
135     @Override
136     public void close() throws IOException {
137         final boolean wasOpen = myOpen.get();
138         myOpen.set(false);
139 
140         mySender.interrupt();
141         myReceiver.interrupt();
142 
143         try {
144             if (Thread.currentThread() != mySender) {
145                 mySender.join();
146             }
147         }
148         catch (final InterruptedException ie) {
149             // Ignore.
150         }
151         finally {
152             // Now that output is shutdown. Close up the socket. This
153             // Triggers the receiver to close if the interrupt didn't work.
154             myOutput.close();
155             myInput.close();
156             mySocket.close();
157         }
158 
159         try {
160             if (Thread.currentThread() != myReceiver) {
161                 myReceiver.join();
162             }
163         }
164         catch (final InterruptedException ie) {
165             // Ignore.
166         }
167 
168         myEventSupport.firePropertyChange(OPEN_PROP_NAME, wasOpen, false);
169     }
170 
171     /**
172      * {@inheritDoc}
173      */
174     @Override
175     public int getPendingCount() {
176         return super.getPendingCount() + myToSendQueue.size();
177     }
178 
179     /**
180      * {@inheritDoc}
181      * <p>
182      * True if the send and pending queues are empty.
183      * </p>
184      */
185     @Override
186     public boolean isIdle() {
187         return super.isIdle() && myToSendQueue.isEmpty();
188     }
189 
190     /**
191      * {@inheritDoc}
192      * <p>
193      * Notifies the appropriate messages of the error.
194      * </p>
195      */
196     @Override
197     public void raiseErrors(final MongoDbException exception) {
198         final PendingMessage message = new PendingMessage();
199         while (myToSendQueue.poll(message)) {
200             raiseError(exception, message.getReplyCallback());
201         }
202 
203         super.raiseErrors(exception);
204     }
205 
206     /**
207      * {@inheritDoc}
208      */
209     @Override
210     public void send(final Message message1, final Message message2,
211             final ReplyCallback replyCallback) throws MongoDbException {
212 
213         validate(message1, message2);
214 
215         if (replyCallback instanceof AddressAware) {
216             ((AddressAware) replyCallback).setAddress(myServer
217                     .getCanonicalName());
218         }
219 
220         try {
221             myToSendQueue.put(message1, null, message2, replyCallback);
222         }
223         catch (final InterruptedException e) {
224             throw new MongoDbException(e);
225         }
226     }
227 
228     /**
229      * {@inheritDoc}
230      */
231     @Override
232     public void send(final Message message, final ReplyCallback replyCallback)
233             throws MongoDbException {
234 
235         validate(message, null);
236 
237         if (replyCallback instanceof AddressAware) {
238             ((AddressAware) replyCallback).setAddress(myServer
239                     .getCanonicalName());
240         }
241 
242         try {
243             myToSendQueue.put(message, replyCallback);
244         }
245         catch (final InterruptedException e) {
246             throw new MongoDbException(e);
247         }
248     }
249 
250     /**
251      * Starts the connections read and write threads.
252      */
253     @Override
254     public void start() {
255         myReceiver.start();
256         mySender.start();
257 
258         if (myServer.needBuildInfo()) {
259             send(new BuildInfo(), new ServerUpdateCallback(myServer));
260         }
261     }
262 
263     /**
264      * Runnable to push data out over the MongoDB connection.
265      * 
266      * @copyright 2011, Allanbank Consulting, Inc., All Rights Reserved
267      */
268     protected class SendRunnable implements Runnable {
269 
270         /** Tracks if there are messages in the buffer that need to be flushed. */
271         private boolean myNeedToFlush = false;
272 
273         /** The {@link PendingMessage} used for the local cached copy. */
274         private final PendingMessage myPendingMessage = new PendingMessage();
275 
276         /**
277          * {@inheritDoc}
278          * <p>
279          * Overridden to pull messages off the
280          * {@link TwoThreadSocketConnection#myToSendQueue} and push them into
281          * the socket connection. If <code>null</code> is ever received from a
282          * poll of the queue then the socket connection is flushed and blocking
283          * call is made to the queue.
284          * </p>
285          * 
286          * @see Runnable#run()
287          */
288         @Override
289         public void run() {
290             boolean sawError = false;
291             try {
292                 while (myOpen.get() && !sawError) {
293                     try {
294                         sendOne();
295                     }
296                     catch (final InterruptedException ie) {
297                         // Handled by loop but if we have a message, need to
298                         // tell him something bad happened (but we shouldn't).
299                         raiseError(ie, myPendingMessage.getReplyCallback());
300                     }
301                     catch (final IOException ioe) {
302                         myLog.warn(ioe, "I/O Error sending a message.");
303                         raiseError(ioe, myPendingMessage.getReplyCallback());
304                         sawError = true;
305                     }
306                     catch (final RuntimeException re) {
307                         myLog.warn(re, "Runtime error sending a message.");
308                         raiseError(re, myPendingMessage.getReplyCallback());
309                         sawError = true;
310                     }
311                     catch (final Error error) {
312                         myLog.error(error, "Error sending a message.");
313                         raiseError(error, myPendingMessage.getReplyCallback());
314                         sawError = true;
315                     }
316                     finally {
317                         myPendingMessage.clear();
318                     }
319                 }
320             }
321             finally {
322                 // This may/will fail because we are dying.
323                 try {
324                     if (myOpen.get()) {
325                         doFlush();
326                     }
327                 }
328                 catch (final IOException ioe) {
329                     myLog.warn(ioe, "I/O Error on final flush of messages.");
330                 }
331                 finally {
332                     // Make sure we get shutdown completely.
333                     IOUtils.close(TwoThreadSocketConnection.this);
334                 }
335             }
336         }
337 
338         /**
339          * Flushes the messages in the buffer and clears the need-to-flush flag.
340          * 
341          * @throws IOException
342          *             On a failure flushing the messages.
343          */
344         protected final void doFlush() throws IOException {
345             if (myNeedToFlush) {
346                 flush();
347                 myNeedToFlush = false;
348             }
349         }
350 
351         /**
352          * Sends a single message.
353          * 
354          * @throws InterruptedException
355          *             If the thread is interrupted waiting for a message to
356          *             send.
357          * @throws IOException
358          *             On a failure sending the message.
359          */
360         protected final void sendOne() throws InterruptedException, IOException {
361             boolean took = false;
362             if (myNeedToFlush) {
363                 took = myToSendQueue.poll(myPendingMessage);
364             }
365             else {
366                 myToSendQueue.take(myPendingMessage);
367                 took = true;
368             }
369 
370             if (took) {
371                 myNeedToFlush = true;
372 
373                 myPendingMessage.getMessage().write(
374                         myPendingMessage.getMessageId(), myBsonOut);
375 
376                 send(myPendingMessage, myBsonOut.getOutput());
377 
378                 // We have handed the message off. Not our problem any more.
379                 // We could legitimately do this before the send but in the case
380                 // of an I/O error the send's exception is more meaningful then
381                 // the receivers generic "Didn't get a reply".
382                 myPendingMessage.clear();
383             }
384             else {
385                 doFlush();
386             }
387         }
388     }
389 }