Coverage Report - com.allanbank.mongodb.client.connection.socket.TwoThreadSocketConnection
 
Classes in this File Line Coverage Branch Coverage Complexity
TwoThreadSocketConnection
87%
50/57
81%
13/16
3
TwoThreadSocketConnection$SendRunnable
93%
44/47
91%
11/12
3
 
 1  
 /*
 2  
  * #%L
 3  
  * TwoThreadSocketConnection.java - mongodb-async-driver - Allanbank Consulting, Inc.
 4  
  * %%
 5  
  * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
 6  
  * %%
 7  
  * Licensed under the Apache License, Version 2.0 (the "License");
 8  
  * you may not use this file except in compliance with the License.
 9  
  * You may obtain a copy of the License at
 10  
  * 
 11  
  *      http://www.apache.org/licenses/LICENSE-2.0
 12  
  * 
 13  
  * Unless required by applicable law or agreed to in writing, software
 14  
  * distributed under the License is distributed on an "AS IS" BASIS,
 15  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 16  
  * See the License for the specific language governing permissions and
 17  
  * limitations under the License.
 18  
  * #L%
 19  
  */
 20  
 package com.allanbank.mongodb.client.connection.socket;
 21  
 
 22  
 import java.io.IOException;
 23  
 import java.net.SocketException;
 24  
 
 25  
 import com.allanbank.mongodb.MongoClientConfiguration;
 26  
 import com.allanbank.mongodb.MongoDbException;
 27  
 import com.allanbank.mongodb.bson.io.BufferingBsonOutputStream;
 28  
 import com.allanbank.mongodb.bson.io.RandomAccessOutputStream;
 29  
 import com.allanbank.mongodb.bson.io.StringDecoderCache;
 30  
 import com.allanbank.mongodb.bson.io.StringEncoderCache;
 31  
 import com.allanbank.mongodb.client.Message;
 32  
 import com.allanbank.mongodb.client.callback.AddressAware;
 33  
 import com.allanbank.mongodb.client.callback.ReplyCallback;
 34  
 import com.allanbank.mongodb.client.message.BuildInfo;
 35  
 import com.allanbank.mongodb.client.message.PendingMessage;
 36  
 import com.allanbank.mongodb.client.message.PendingMessageQueue;
 37  
 import com.allanbank.mongodb.client.state.Server;
 38  
 import com.allanbank.mongodb.client.state.ServerUpdateCallback;
 39  
 import com.allanbank.mongodb.util.IOUtils;
 40  
 
 41  
 /**
 42  
  * Provides a blocking Socket based connection to a MongoDB server.
 43  
  * <p>
 44  
  * This version uses a pair of threads (1 send and 1 receive) to handle the
 45  
  * messages going to and from MongoDB.
 46  
  * </p>
 47  
  * <p>
 48  
  * This implementation was the default for the driver through the 1.2.3 release.
 49  
  * It is still used by the driver for communication sockets that are not know to
 50  
  * be standard Java Sockets as it performs better when the communication path
 51  
  * does not have built in buffering of messages.
 52  
  * </p>
 53  
  * 
 54  
  * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
 55  
  *         mutated in incompatible ways between any two releases of the driver.
 56  
  * @copyright 2011-2014, Allanbank Consulting, Inc., All Rights Reserved
 57  
  */
 58  
 public class TwoThreadSocketConnection extends AbstractSocketConnection {
 59  
 
 60  
     /** The writer for BSON documents. */
 61  
     protected final BufferingBsonOutputStream myBsonOut;
 62  
 
 63  
     /** The queue of messages to be sent. */
 64  
     protected final PendingMessageQueue myToSendQueue;
 65  
 
 66  
     /** The thread receiving replies. */
 67  
     private final Thread myReceiver;
 68  
 
 69  
     /** The thread sending messages. */
 70  
     private final Thread mySender;
 71  
 
 72  
     /**
 73  
      * Creates a new SocketConnection to a MongoDB server.
 74  
      * 
 75  
      * @param server
 76  
      *            The MongoDB server to connect to.
 77  
      * @param config
 78  
      *            The configuration for the Connection to the MongoDB server.
 79  
      * @throws SocketException
 80  
      *             On a failure connecting to the MongoDB server.
 81  
      * @throws IOException
 82  
      *             On a failure to read or write data to the MongoDB server.
 83  
      */
 84  
     public TwoThreadSocketConnection(final Server server,
 85  
             final MongoClientConfiguration config) throws SocketException,
 86  
             IOException {
 87  39
         this(server, config, new StringEncoderCache(), new StringDecoderCache());
 88  36
     }
 89  
 
 90  
     /**
 91  
      * Creates a new SocketConnection to a MongoDB server.
 92  
      * 
 93  
      * @param server
 94  
      *            The MongoDB server to connect to.
 95  
      * @param config
 96  
      *            The configuration for the Connection to the MongoDB server.
 97  
      * @param encoderCache
 98  
      *            Cache used for encoding strings.
 99  
      * @param decoderCache
 100  
      *            Cache used for decoding strings.
 101  
      * @throws SocketException
 102  
      *             On a failure connecting to the MongoDB server.
 103  
      * @throws IOException
 104  
      *             On a failure to read or write data to the MongoDB server.
 105  
      */
 106  
     public TwoThreadSocketConnection(final Server server,
 107  
             final MongoClientConfiguration config,
 108  
             final StringEncoderCache encoderCache,
 109  
             final StringDecoderCache decoderCache) throws SocketException,
 110  
             IOException {
 111  40
         super(server, config, encoderCache, decoderCache);
 112  
 
 113  37
         myBsonOut = new BufferingBsonOutputStream(new RandomAccessOutputStream(
 114  
                 encoderCache));
 115  
 
 116  37
         myToSendQueue = new PendingMessageQueue(
 117  
                 config.getMaxPendingOperationsPerConnection(),
 118  
                 config.getLockType());
 119  
 
 120  37
         myReceiver = config.getThreadFactory().newThread(
 121  
                 new ReceiveRunnable(this));
 122  37
         myReceiver.setDaemon(true);
 123  37
         myReceiver.setName("MongoDB " + mySocket.getLocalPort() + "<--"
 124  
                 + myServer.getCanonicalName());
 125  
 
 126  37
         mySender = config.getThreadFactory().newThread(new SendRunnable());
 127  37
         mySender.setDaemon(true);
 128  37
         mySender.setName("MongoDB " + mySocket.getLocalPort() + "-->"
 129  
                 + myServer.getCanonicalName());
 130  37
     }
 131  
 
 132  
     /**
 133  
      * {@inheritDoc}
 134  
      */
 135  
     @Override
 136  
     public void close() throws IOException {
 137  150
         final boolean wasOpen = myOpen.get();
 138  150
         myOpen.set(false);
 139  
 
 140  150
         mySender.interrupt();
 141  150
         myReceiver.interrupt();
 142  
 
 143  
         try {
 144  150
             if (Thread.currentThread() != mySender) {
 145  113
                 mySender.join();
 146  
             }
 147  
         }
 148  13
         catch (final InterruptedException ie) {
 149  
             // Ignore.
 150  
         }
 151  
         finally {
 152  
             // Now that output is shutdown. Close up the socket. This
 153  
             // Triggers the receiver to close if the interrupt didn't work.
 154  150
             myOutput.close();
 155  150
             myInput.close();
 156  150
             mySocket.close();
 157  150
         }
 158  
 
 159  
         try {
 160  150
             if (Thread.currentThread() != myReceiver) {
 161  76
                 myReceiver.join();
 162  
             }
 163  
         }
 164  37
         catch (final InterruptedException ie) {
 165  
             // Ignore.
 166  113
         }
 167  
 
 168  150
         myEventSupport.firePropertyChange(OPEN_PROP_NAME, wasOpen, false);
 169  150
     }
 170  
 
 171  
     /**
 172  
      * {@inheritDoc}
 173  
      */
 174  
     @Override
 175  
     public int getPendingCount() {
 176  2
         return super.getPendingCount() + myToSendQueue.size();
 177  
     }
 178  
 
 179  
     /**
 180  
      * {@inheritDoc}
 181  
      * <p>
 182  
      * True if the send and pending queues are empty.
 183  
      * </p>
 184  
      */
 185  
     @Override
 186  
     public boolean isIdle() {
 187  10
         return super.isIdle() && myToSendQueue.isEmpty();
 188  
     }
 189  
 
 190  
     /**
 191  
      * {@inheritDoc}
 192  
      * <p>
 193  
      * Notifies the appropriate messages of the error.
 194  
      * </p>
 195  
      */
 196  
     @Override
 197  
     public void raiseErrors(final MongoDbException exception) {
 198  1
         final PendingMessage message = new PendingMessage();
 199  2
         while (myToSendQueue.poll(message)) {
 200  1
             raiseError(exception, message.getReplyCallback());
 201  
         }
 202  
 
 203  1
         super.raiseErrors(exception);
 204  1
     }
 205  
 
 206  
     /**
 207  
      * {@inheritDoc}
 208  
      */
 209  
     @Override
 210  
     public void send(final Message message1, final Message message2,
 211  
             final ReplyCallback replyCallback) throws MongoDbException {
 212  
 
 213  1
         validate(message1, message2);
 214  
 
 215  1
         if (replyCallback instanceof AddressAware) {
 216  0
             ((AddressAware) replyCallback).setAddress(myServer
 217  
                     .getCanonicalName());
 218  
         }
 219  
 
 220  
         try {
 221  1
             myToSendQueue.put(message1, null, message2, replyCallback);
 222  
         }
 223  0
         catch (final InterruptedException e) {
 224  0
             throw new MongoDbException(e);
 225  1
         }
 226  1
     }
 227  
 
 228  
     /**
 229  
      * {@inheritDoc}
 230  
      */
 231  
     @Override
 232  
     public void send(final Message message, final ReplyCallback replyCallback)
 233  
             throws MongoDbException {
 234  
 
 235  34
         validate(message, null);
 236  
 
 237  34
         if (replyCallback instanceof AddressAware) {
 238  0
             ((AddressAware) replyCallback).setAddress(myServer
 239  
                     .getCanonicalName());
 240  
         }
 241  
 
 242  
         try {
 243  34
             myToSendQueue.put(message, replyCallback);
 244  
         }
 245  0
         catch (final InterruptedException e) {
 246  0
             throw new MongoDbException(e);
 247  34
         }
 248  34
     }
 249  
 
 250  
     /**
 251  
      * Starts the connections read and write threads.
 252  
      */
 253  
     @Override
 254  
     public void start() {
 255  37
         myReceiver.start();
 256  37
         mySender.start();
 257  
 
 258  37
         if (myServer.needBuildInfo()) {
 259  0
             send(new BuildInfo(), new ServerUpdateCallback(myServer));
 260  
         }
 261  37
     }
 262  
 
 263  
     /**
 264  
      * Runnable to push data out over the MongoDB connection.
 265  
      * 
 266  
      * @copyright 2011, Allanbank Consulting, Inc., All Rights Reserved
 267  
      */
 268  37
     protected class SendRunnable implements Runnable {
 269  
 
 270  
         /** Tracks if there are messages in the buffer that need to be flushed. */
 271  37
         private boolean myNeedToFlush = false;
 272  
 
 273  
         /** The {@link PendingMessage} used for the local cached copy. */
 274  37
         private final PendingMessage myPendingMessage = new PendingMessage();
 275  
 
 276  
         /**
 277  
          * {@inheritDoc}
 278  
          * <p>
 279  
          * Overridden to pull messages off the
 280  
          * {@link TwoThreadSocketConnection#myToSendQueue} and push them into
 281  
          * the socket connection. If <code>null</code> is ever received from a
 282  
          * poll of the queue then the socket connection is flushed and blocking
 283  
          * call is made to the queue.
 284  
          * </p>
 285  
          * 
 286  
          * @see Runnable#run()
 287  
          */
 288  
         @Override
 289  
         public void run() {
 290  37
             boolean sawError = false;
 291  
             try {
 292  135
                 while (myOpen.get() && !sawError) {
 293  
                     try {
 294  98
                         sendOne();
 295  
                     }
 296  33
                     catch (final InterruptedException ie) {
 297  
                         // Handled by loop but if we have a message, need to
 298  
                         // tell him something bad happened (but we shouldn't).
 299  33
                         raiseError(ie, myPendingMessage.getReplyCallback());
 300  
                     }
 301  1
                     catch (final IOException ioe) {
 302  1
                         myLog.warn(ioe, "I/O Error sending a message.");
 303  1
                         raiseError(ioe, myPendingMessage.getReplyCallback());
 304  1
                         sawError = true;
 305  
                     }
 306  1
                     catch (final RuntimeException re) {
 307  1
                         myLog.warn(re, "Runtime error sending a message.");
 308  1
                         raiseError(re, myPendingMessage.getReplyCallback());
 309  1
                         sawError = true;
 310  
                     }
 311  1
                     catch (final Error error) {
 312  1
                         myLog.error(error, "Error sending a message.");
 313  1
                         raiseError(error, myPendingMessage.getReplyCallback());
 314  1
                         sawError = true;
 315  
                     }
 316  
                     finally {
 317  98
                         myPendingMessage.clear();
 318  98
                     }
 319  
                 }
 320  
             }
 321  
             finally {
 322  
                 // This may/will fail because we are dying.
 323  0
                 try {
 324  37
                     if (myOpen.get()) {
 325  3
                         doFlush();
 326  
                     }
 327  
                 }
 328  0
                 catch (final IOException ioe) {
 329  0
                     myLog.warn(ioe, "I/O Error on final flush of messages.");
 330  
                 }
 331  
                 finally {
 332  
                     // Make sure we get shutdown completely.
 333  37
                     IOUtils.close(TwoThreadSocketConnection.this);
 334  37
                 }
 335  37
             }
 336  37
         }
 337  
 
 338  
         /**
 339  
          * Flushes the messages in the buffer and clears the need-to-flush flag.
 340  
          * 
 341  
          * @throws IOException
 342  
          *             On a failure flushing the messages.
 343  
          */
 344  
         protected final void doFlush() throws IOException {
 345  33
             if (myNeedToFlush) {
 346  33
                 flush();
 347  33
                 myNeedToFlush = false;
 348  
             }
 349  33
         }
 350  
 
 351  
         /**
 352  
          * Sends a single message.
 353  
          * 
 354  
          * @throws InterruptedException
 355  
          *             If the thread is interrupted waiting for a message to
 356  
          *             send.
 357  
          * @throws IOException
 358  
          *             On a failure sending the message.
 359  
          */
 360  
         protected final void sendOne() throws InterruptedException, IOException {
 361  98
             boolean took = false;
 362  98
             if (myNeedToFlush) {
 363  31
                 took = myToSendQueue.poll(myPendingMessage);
 364  
             }
 365  
             else {
 366  67
                 myToSendQueue.take(myPendingMessage);
 367  34
                 took = true;
 368  
             }
 369  
 
 370  65
             if (took) {
 371  35
                 myNeedToFlush = true;
 372  
 
 373  35
                 myPendingMessage.getMessage().write(
 374  
                         myPendingMessage.getMessageId(), myBsonOut);
 375  
 
 376  32
                 send(myPendingMessage, myBsonOut.getOutput());
 377  
 
 378  
                 // We have handed the message off. Not our problem any more.
 379  
                 // We could legitimately do this before the send but in the case
 380  
                 // of an I/O error the send's exception is more meaningful then
 381  
                 // the receivers generic "Didn't get a reply".
 382  32
                 myPendingMessage.clear();
 383  
             }
 384  
             else {
 385  30
                 doFlush();
 386  
             }
 387  62
         }
 388  
     }
 389  
 }