Coverage Report - com.allanbank.mongodb.client.connection.socket.AbstractSocketConnection
 
Classes in this File Line Coverage Branch Coverage Complexity
AbstractSocketConnection
94%
210/223
90%
68/75
2.844
AbstractSocketConnection$1
100%
1/1
N/A
2.844
 
 1  
 /*
 2  
  * #%L
 3  
  * AbstractSocketConnection.java - mongodb-async-driver - Allanbank Consulting, Inc.
 4  
  * %%
 5  
  * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
 6  
  * %%
 7  
  * Licensed under the Apache License, Version 2.0 (the "License");
 8  
  * you may not use this file except in compliance with the License.
 9  
  * You may obtain a copy of the License at
 10  
  * 
 11  
  *      http://www.apache.org/licenses/LICENSE-2.0
 12  
  * 
 13  
  * Unless required by applicable law or agreed to in writing, software
 14  
  * distributed under the License is distributed on an "AS IS" BASIS,
 15  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 16  
  * See the License for the specific language governing permissions and
 17  
  * limitations under the License.
 18  
  * #L%
 19  
  */
 20  
 
 21  
 package com.allanbank.mongodb.client.connection.socket;
 22  
 
 23  
 import java.beans.PropertyChangeListener;
 24  
 import java.beans.PropertyChangeSupport;
 25  
 import java.io.BufferedOutputStream;
 26  
 import java.io.EOFException;
 27  
 import java.io.IOException;
 28  
 import java.io.InputStream;
 29  
 import java.io.InterruptedIOException;
 30  
 import java.io.StreamCorruptedException;
 31  
 import java.net.InetSocketAddress;
 32  
 import java.net.Socket;
 33  
 import java.net.SocketException;
 34  
 import java.net.SocketTimeoutException;
 35  
 import java.util.concurrent.Executor;
 36  
 import java.util.concurrent.TimeUnit;
 37  
 import java.util.concurrent.atomic.AtomicBoolean;
 38  
 import java.util.concurrent.atomic.AtomicInteger;
 39  
 
 40  
 import javax.net.SocketFactory;
 41  
 
 42  
 import com.allanbank.mongodb.MongoClientConfiguration;
 43  
 import com.allanbank.mongodb.MongoDbException;
 44  
 import com.allanbank.mongodb.Version;
 45  
 import com.allanbank.mongodb.bson.io.BsonInputStream;
 46  
 import com.allanbank.mongodb.bson.io.RandomAccessOutputStream;
 47  
 import com.allanbank.mongodb.bson.io.StringDecoderCache;
 48  
 import com.allanbank.mongodb.bson.io.StringEncoderCache;
 49  
 import com.allanbank.mongodb.client.Message;
 50  
 import com.allanbank.mongodb.client.Operation;
 51  
 import com.allanbank.mongodb.client.VersionRange;
 52  
 import com.allanbank.mongodb.client.callback.NoOpCallback;
 53  
 import com.allanbank.mongodb.client.callback.Receiver;
 54  
 import com.allanbank.mongodb.client.callback.ReplyCallback;
 55  
 import com.allanbank.mongodb.client.callback.ReplyHandler;
 56  
 import com.allanbank.mongodb.client.connection.Connection;
 57  
 import com.allanbank.mongodb.client.connection.SocketConnectionListener;
 58  
 import com.allanbank.mongodb.client.message.Delete;
 59  
 import com.allanbank.mongodb.client.message.GetMore;
 60  
 import com.allanbank.mongodb.client.message.Header;
 61  
 import com.allanbank.mongodb.client.message.Insert;
 62  
 import com.allanbank.mongodb.client.message.IsMaster;
 63  
 import com.allanbank.mongodb.client.message.KillCursors;
 64  
 import com.allanbank.mongodb.client.message.PendingMessage;
 65  
 import com.allanbank.mongodb.client.message.PendingMessageQueue;
 66  
 import com.allanbank.mongodb.client.message.Query;
 67  
 import com.allanbank.mongodb.client.message.Reply;
 68  
 import com.allanbank.mongodb.client.message.Update;
 69  
 import com.allanbank.mongodb.client.state.Server;
 70  
 import com.allanbank.mongodb.error.ConnectionLostException;
 71  
 import com.allanbank.mongodb.error.DocumentToLargeException;
 72  
 import com.allanbank.mongodb.error.ServerVersionException;
 73  
 import com.allanbank.mongodb.util.IOUtils;
 74  
 import com.allanbank.mongodb.util.log.Log;
 75  
 import com.allanbank.mongodb.util.log.LogFactory;
 76  
 
 77  
 /**
 78  
  * AbstractSocketConnection provides the basic functionality for a socket
 79  
  * connection that passes messages between the sender and receiver.
 80  
  * 
 81  
  * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
 82  
  *         mutated in incompatible ways between any two releases of the driver.
 83  
  * @copyright 2013-2014, Allanbank Consulting, Inc., All Rights Reserved
 84  
  */
 85  
 public abstract class AbstractSocketConnection implements Connection, Receiver {
 86  
 
 87  
     /** The length of the message header in bytes. */
 88  
     public static final int HEADER_LENGTH = 16;
 89  
 
 90  
     /** The writer for BSON documents. Shares this objects {@link #myInput}. */
 91  
     protected final BsonInputStream myBsonIn;
 92  
 
 93  
     /** The connections configuration. */
 94  
     protected final MongoClientConfiguration myConfig;
 95  
 
 96  
     /** The cache for the encoding of strings. */
 97  
     protected final StringEncoderCache myEncoderCache;
 98  
 
 99  
     /** Support for emitting property change events. */
 100  
     protected final PropertyChangeSupport myEventSupport;
 101  
 
 102  
     /** The executor for the responses. */
 103  
     protected final Executor myExecutor;
 104  
 
 105  
     /** The buffered input stream. */
 106  
     protected final InputStream myInput;
 107  
 
 108  
     /** The logger for the connection. */
 109  
     protected final Log myLog;
 110  
 
 111  
     /** Holds if the connection is open. */
 112  
     protected final AtomicBoolean myOpen;
 113  
 
 114  
     /** The buffered output stream. */
 115  
     protected final BufferedOutputStream myOutput;
 116  
 
 117  
     /** The queue of messages sent but waiting for a reply. */
 118  
     protected final PendingMessageQueue myPendingQueue;
 119  
 
 120  
     /** The open socket. */
 121  
     protected final Server myServer;
 122  
 
 123  
     /** Set to true when the connection should be gracefully closed. */
 124  
     protected final AtomicBoolean myShutdown;
 125  
 
 126  
     /** The open socket. */
 127  
     protected final Socket mySocket;
 128  
 
 129  
     /** Tracks the number of sequential read timeouts. */
 130  204
     private int myIdleTicks = 0;
 131  
 
 132  
     /** The {@link PendingMessage} used for the local cached copy. */
 133  204
     private final PendingMessage myPendingMessage = new PendingMessage();
 134  
 
 135  
     /** Set to true when the sender discovers they are the receive thread. */
 136  204
     private final AtomicInteger myReaderNeedsToFlush = new AtomicInteger(0);
 137  
 
 138  
     /**
 139  
      * Creates a new AbstractSocketConnection.
 140  
      * 
 141  
      * @param server
 142  
      *            The MongoDB server to connect to.
 143  
      * @param config
 144  
      *            The configuration for the Connection to the MongoDB server.
 145  
      * @param encoderCache
 146  
      *            Cache used for encoding strings.
 147  
      * @param decoderCache
 148  
      *            Cache used for decoding strings.
 149  
      * @throws SocketException
 150  
      *             On a failure connecting to the MongoDB server.
 151  
      * @throws IOException
 152  
      *             On a failure to read or write data to the MongoDB server.
 153  
      */
 154  
     public AbstractSocketConnection(final Server server,
 155  
             final MongoClientConfiguration config,
 156  
             final StringEncoderCache encoderCache,
 157  
             final StringDecoderCache decoderCache) throws SocketException,
 158  
             IOException {
 159  204
         super();
 160  
 
 161  204
         myServer = server;
 162  204
         myConfig = config;
 163  204
         myEncoderCache = encoderCache;
 164  
 
 165  204
         myLog = LogFactory.getLog(getClass());
 166  
 
 167  204
         myExecutor = config.getExecutor();
 168  204
         myEventSupport = new PropertyChangeSupport(this);
 169  204
         myOpen = new AtomicBoolean(false);
 170  204
         myShutdown = new AtomicBoolean(false);
 171  
 
 172  204
         mySocket = openSocket(server, config);
 173  176
         updateSocketWithOptions(config);
 174  
 
 175  173
         myOpen.set(true);
 176  
 
 177  173
         myInput = mySocket.getInputStream();
 178  173
         myBsonIn = new BsonInputStream(myInput, decoderCache);
 179  
 
 180  
         // Careful with the size of the buffer here. Seems Java likes to call
 181  
         // madvise(..., MADV_DONTNEED) for buffers over a certain size.
 182  
         // Net effect is that the performance of the system goes down the
 183  
         // drain. Some numbers using the
 184  
         // UnixDomainSocketAccepatanceTest.testMultiFetchiterator
 185  
         // 1M ==> More than a minute...
 186  
         // 512K ==> 24 seconds
 187  
         // 256K ==> 16.9 sec.
 188  
         // 128K ==> 17 sec.
 189  
         // 64K ==> 17 sec.
 190  
         // 32K ==> 16.5 sec.
 191  
         // Based on those numbers we set the buffer to 32K as larger does not
 192  
         // improve performance.
 193  173
         myOutput = new BufferedOutputStream(mySocket.getOutputStream(),
 194  
                 32 * 1024);
 195  
 
 196  173
         myPendingQueue = new PendingMessageQueue(
 197  
                 config.getMaxPendingOperationsPerConnection(),
 198  
                 config.getLockType());
 199  173
     }
 200  
 
 201  
     /**
 202  
      * {@inheritDoc}
 203  
      * <p>
 204  
      * Overridden to add the listener to this connection.
 205  
      * </p>
 206  
      */
 207  
     @Override
 208  
     public void addPropertyChangeListener(final PropertyChangeListener listener) {
 209  13
         myEventSupport.addPropertyChangeListener(listener);
 210  13
     }
 211  
 
 212  
     /**
 213  
      * {@inheritDoc}
 214  
      */
 215  
     @Override
 216  
     public void flush() throws IOException {
 217  367
         myReaderNeedsToFlush.set(0);
 218  367
         myOutput.flush();
 219  366
     }
 220  
 
 221  
     /**
 222  
      * {@inheritDoc}
 223  
      */
 224  
     @Override
 225  
     public int getPendingCount() {
 226  5
         return myPendingQueue.size();
 227  
     }
 228  
 
 229  
     /**
 230  
      * {@inheritDoc}
 231  
      * <p>
 232  
      * Overridden to returns the server's name.
 233  
      * </p>
 234  
      */
 235  
     @Override
 236  
     public String getServerName() {
 237  9
         return myServer.getCanonicalName();
 238  
     }
 239  
 
 240  
     /**
 241  
      * {@inheritDoc}
 242  
      * <p>
 243  
      * True if the connection is open and not shutting down.
 244  
      * </p>
 245  
      */
 246  
     @Override
 247  
     public boolean isAvailable() {
 248  37
         return isOpen() && !isShuttingDown();
 249  
     }
 250  
 
 251  
     /**
 252  
      * {@inheritDoc}
 253  
      * <p>
 254  
      * True if the send and pending queues are empty.
 255  
      * </p>
 256  
      */
 257  
     @Override
 258  
     public boolean isIdle() {
 259  100
         return myPendingQueue.isEmpty();
 260  
     }
 261  
 
 262  
     /**
 263  
      * {@inheritDoc}
 264  
      * <p>
 265  
      * True if the connection has not been closed.
 266  
      * </p>
 267  
      */
 268  
     @Override
 269  
     public boolean isOpen() {
 270  1001
         return myOpen.get();
 271  
     }
 272  
 
 273  
     /**
 274  
      * {@inheritDoc}
 275  
      */
 276  
     @Override
 277  
     public boolean isShuttingDown() {
 278  265
         return myShutdown.get();
 279  
     }
 280  
 
 281  
     /**
 282  
      * {@inheritDoc}
 283  
      * <p>
 284  
      * Notifies the appropriate messages of the error.
 285  
      * </p>
 286  
      */
 287  
     @Override
 288  
     public void raiseErrors(final MongoDbException exception) {
 289  3
         final PendingMessage message = new PendingMessage();
 290  
 
 291  4
         while (myPendingQueue.poll(message)) {
 292  1
             raiseError(exception, message.getReplyCallback());
 293  
         }
 294  3
     }
 295  
 
 296  
     /**
 297  
      * {@inheritDoc}
 298  
      * <p>
 299  
      * Overridden to remove the listener from this connection.
 300  
      * </p>
 301  
      */
 302  
     @Override
 303  
     public void removePropertyChangeListener(
 304  
             final PropertyChangeListener listener) {
 305  14
         myEventSupport.removePropertyChangeListener(listener);
 306  14
     }
 307  
 
 308  
     /**
 309  
      * {@inheritDoc}
 310  
      * <p>
 311  
      * Overridden to mark the socket as shutting down and tickles the sender to
 312  
      * make sure that happens as soon as possible.
 313  
      * </p>
 314  
      */
 315  
     @Override
 316  
     public void shutdown(final boolean force) {
 317  
         // Mark
 318  49
         myShutdown.set(true);
 319  
 
 320  49
         if (force) {
 321  9
             IOUtils.close(this);
 322  
         }
 323  
         else {
 324  40
             if (isOpen()) {
 325  
                 // Force a message with a callback to wake the receiver up.
 326  39
                 send(new IsMaster(), new NoOpCallback());
 327  
             }
 328  
         }
 329  49
     }
 330  
 
 331  
     /**
 332  
      * Starts the connection.
 333  
      */
 334  
     public abstract void start();
 335  
 
 336  
     /**
 337  
      * Stops the socket connection by calling {@link #shutdown(boolean)
 338  
      * shutdown(false)}.
 339  
      */
 340  
     public void stop() {
 341  2
         shutdown(false);
 342  2
     }
 343  
 
 344  
     /**
 345  
      * {@inheritDoc}
 346  
      * <p>
 347  
      * Overridden to return the socket information.
 348  
      * </p>
 349  
      */
 350  
     @Override
 351  
     public String toString() {
 352  4
         return "MongoDB(" + mySocket.getLocalPort() + "-->"
 353  
                 + mySocket.getRemoteSocketAddress() + ")";
 354  
     }
 355  
 
 356  
     /**
 357  
      * {@inheritDoc}
 358  
      * <p>
 359  
      * If there is a pending flush then flushes.
 360  
      * </p>
 361  
      * <p>
 362  
      * If there is any available data then does a single receive.
 363  
      * </p>
 364  
      */
 365  
     @Override
 366  
     public void tryReceive() {
 367  
         try {
 368  0
             doReceiverFlush();
 369  
 
 370  0
             if ((myBsonIn.available() > 0) || (myInput.available() > 0)) {
 371  0
                 doReceiveOne();
 372  
             }
 373  
         }
 374  0
         catch (final IOException error) {
 375  0
             myLog.info(
 376  
                     "Received an error when checking for pending messages: {}.",
 377  
                     error.getMessage());
 378  0
         }
 379  0
     }
 380  
 
 381  
     /**
 382  
      * {@inheritDoc}
 383  
      * <p>
 384  
      * Waits for the connections pending queues to empty.
 385  
      * </p>
 386  
      */
 387  
     @Override
 388  
     public void waitForClosed(final int timeout, final TimeUnit timeoutUnits) {
 389  13
         long now = System.currentTimeMillis();
 390  13
         final long deadline = now + timeoutUnits.toMillis(timeout);
 391  
 
 392  344
         while (isOpen() && (now < deadline)) {
 393  
             try {
 394  
                 // A slow spin loop.
 395  331
                 TimeUnit.MILLISECONDS.sleep(10);
 396  
             }
 397  1
             catch (final InterruptedException e) {
 398  
                 // Ignore.
 399  1
                 e.hashCode();
 400  330
             }
 401  331
             now = System.currentTimeMillis();
 402  
         }
 403  13
     }
 404  
 
 405  
     /**
 406  
      * Receives a single message from the connection.
 407  
      * 
 408  
      * @return The {@link Message} received.
 409  
      * @throws MongoDbException
 410  
      *             On an error receiving the message.
 411  
      */
 412  
     protected Message doReceive() throws MongoDbException {
 413  
         try {
 414  
             int length;
 415  
             try {
 416  385
                 length = readIntSuppressTimeoutOnNonFirstByte();
 417  
             }
 418  13
             catch (final SocketTimeoutException ok) {
 419  
                 // This is OK. We check if we are still running and come right
 420  
                 // back.
 421  13
                 return null;
 422  228
             }
 423  
 
 424  228
             myBsonIn.prefetch(length - 4);
 425  
 
 426  220
             final int requestId = myBsonIn.readInt();
 427  220
             final int responseId = myBsonIn.readInt();
 428  220
             final int opCode = myBsonIn.readInt();
 429  
 
 430  220
             final Operation op = Operation.fromCode(opCode);
 431  220
             if (op == null) {
 432  
                 // Huh? Dazed and confused
 433  1
                 throw new MongoDbException("Unexpected operation read '"
 434  
                         + opCode + "'.");
 435  
             }
 436  
 
 437  219
             final Header header = new Header(length, requestId, responseId, op);
 438  219
             Message message = null;
 439  219
             switch (op) {
 440  
             case REPLY:
 441  212
                 message = new Reply(header, myBsonIn);
 442  212
                 break;
 443  
             case QUERY:
 444  1
                 message = new Query(header, myBsonIn);
 445  1
                 break;
 446  
             case UPDATE:
 447  1
                 message = new Update(myBsonIn);
 448  1
                 break;
 449  
             case INSERT:
 450  2
                 message = new Insert(header, myBsonIn);
 451  1
                 break;
 452  
             case GET_MORE:
 453  1
                 message = new GetMore(myBsonIn);
 454  1
                 break;
 455  
             case DELETE:
 456  1
                 message = new Delete(myBsonIn);
 457  1
                 break;
 458  
             case KILL_CURSORS:
 459  1
                 message = new KillCursors(myBsonIn);
 460  
                 break;
 461  
             }
 462  
 
 463  218
             myServer.incrementRepliesReceived();
 464  
 
 465  218
             return message;
 466  
         }
 467  
 
 468  153
         catch (final IOException ioe) {
 469  153
             final MongoDbException error = new ConnectionLostException(ioe);
 470  
 
 471  153
             shutdown(error, (ioe instanceof InterruptedIOException));
 472  
 
 473  153
             throw error;
 474  
         }
 475  
     }
 476  
 
 477  
     /**
 478  
      * Receives and process a single message.
 479  
      */
 480  
     protected void doReceiveOne() {
 481  
 
 482  385
         doReceiverFlush();
 483  
 
 484  385
         final Message received = doReceive();
 485  231
         if (received instanceof Reply) {
 486  212
             myIdleTicks = 0;
 487  212
             final Reply reply = (Reply) received;
 488  212
             final int replyId = reply.getResponseToId();
 489  212
             boolean took = false;
 490  
 
 491  
             // Keep polling the pending queue until we get to
 492  
             // message based on a matching replyId.
 493  
             try {
 494  212
                 took = myPendingQueue.poll(myPendingMessage);
 495  215
                 while (took && (myPendingMessage.getMessageId() != replyId)) {
 496  
 
 497  3
                     final MongoDbException noReply = new MongoDbException(
 498  
                             "No reply received.");
 499  
 
 500  
                     // Note that this message will not get a reply.
 501  3
                     raiseError(noReply, myPendingMessage.getReplyCallback());
 502  
 
 503  
                     // Keep looking.
 504  3
                     took = myPendingQueue.poll(myPendingMessage);
 505  3
                 }
 506  
 
 507  212
                 if (took) {
 508  
                     // Must be the pending message's reply.
 509  211
                     reply(reply, myPendingMessage);
 510  
                 }
 511  
                 else {
 512  1
                     myLog.warn("Could not find the callback for reply '{}'.",
 513  
                             +replyId);
 514  
                 }
 515  
             }
 516  
             finally {
 517  212
                 myPendingMessage.clear();
 518  212
             }
 519  212
         }
 520  19
         else if (received != null) {
 521  6
             myLog.warn("Received a non-Reply message: {}.", received);
 522  6
             shutdown(new ConnectionLostException(new StreamCorruptedException(
 523  
                     "Received a non-Reply message: " + received)), false);
 524  
         }
 525  
         else {
 526  13
             myIdleTicks += 1;
 527  
 
 528  13
             if (myConfig.getMaxIdleTickCount() <= myIdleTicks) {
 529  
                 // Shutdown the connection., nicely.
 530  1
                 shutdown(false);
 531  
             }
 532  
         }
 533  231
     }
 534  
 
 535  
     /**
 536  
      * Sends a single message to the connection.
 537  
      * 
 538  
      * @param messageId
 539  
      *            The id to use for the message.
 540  
      * @param message
 541  
      *            The message to send.
 542  
      * @throws IOException
 543  
      *             On a failure sending the message.
 544  
      */
 545  
     protected void doSend(final int messageId,
 546  
             final RandomAccessOutputStream message) throws IOException {
 547  324
         message.writeTo(myOutput);
 548  324
         message.reset();
 549  
 
 550  324
         myServer.incrementMessagesSent();
 551  324
     }
 552  
 
 553  
     /**
 554  
      * Should be called when the send of a message happens on the receive
 555  
      * thread. The sender should not flush the {@link #myOutput}. Instead the
 556  
      * receive thread will {@link #flush()} once it has consumed all of the
 557  
      * pending messages to be received.
 558  
      */
 559  
     protected void markReaderNeedsToFlush() {
 560  2
         myReaderNeedsToFlush.incrementAndGet();
 561  2
     }
 562  
 
 563  
     /**
 564  
      * Updates to raise an error on the callback, if any.
 565  
      * 
 566  
      * @param exception
 567  
      *            The thrown exception.
 568  
      * @param replyCallback
 569  
      *            The callback for the reply to the message.
 570  
      */
 571  
     protected void raiseError(final Throwable exception,
 572  
             final ReplyCallback replyCallback) {
 573  101
         ReplyHandler.raiseError(exception, replyCallback, myExecutor);
 574  101
     }
 575  
 
 576  
     /**
 577  
      * Reads a little-endian 4 byte signed integer from the stream.
 578  
      * 
 579  
      * @return The integer value.
 580  
      * @throws EOFException
 581  
      *             On insufficient data for the integer.
 582  
      * @throws IOException
 583  
      *             On a failure reading the integer.
 584  
      */
 585  
     protected int readIntSuppressTimeoutOnNonFirstByte() throws EOFException,
 586  
             IOException {
 587  385
         int read = 0;
 588  385
         int eofCheck = 0;
 589  385
         int result = 0;
 590  
 
 591  385
         read = myBsonIn.read();
 592  234
         eofCheck |= read;
 593  234
         result += (read << 0);
 594  
 
 595  933
         for (int i = Byte.SIZE; i < Integer.SIZE; i += Byte.SIZE) {
 596  
             try {
 597  700
                 read = myBsonIn.read();
 598  
             }
 599  1
             catch (final SocketTimeoutException ste) {
 600  
                 // Bad - Only the first byte should timeout.
 601  1
                 throw new IOException(ste);
 602  699
             }
 603  699
             eofCheck |= read;
 604  699
             result += (read << i);
 605  
         }
 606  
 
 607  233
         if (eofCheck < 0) {
 608  5
             throw new EOFException("Remote connection closed: "
 609  
                     + mySocket.getRemoteSocketAddress());
 610  
         }
 611  228
         return result;
 612  
     }
 613  
 
 614  
     /**
 615  
      * Updates to set the reply for the callback, if any.
 616  
      * 
 617  
      * @param reply
 618  
      *            The reply.
 619  
      * @param pendingMessage
 620  
      *            The pending message.
 621  
      */
 622  
     protected void reply(final Reply reply, final PendingMessage pendingMessage) {
 623  
 
 624  213
         final long latency = pendingMessage.latency();
 625  
 
 626  
         // Add the latency.
 627  213
         if (latency > 0) {
 628  212
             myServer.updateAverageLatency(latency);
 629  
         }
 630  
 
 631  213
         final ReplyCallback callback = pendingMessage.getReplyCallback();
 632  213
         ReplyHandler.reply(this, reply, callback, myExecutor);
 633  213
     }
 634  
 
 635  
     /**
 636  
      * Sends a single message.
 637  
      * 
 638  
      * @param pendingMessage
 639  
      *            The message to be sent.
 640  
      * @param message
 641  
      *            The message that has already been encoded/serialized. This may
 642  
      *            be <code>null</code> in which case the message is streamed to
 643  
      *            the socket.
 644  
      * @throws InterruptedException
 645  
      *             If the thread is interrupted waiting for a message to send.
 646  
      * @throws IOException
 647  
      *             On a failure sending the message.
 648  
      */
 649  
     protected final void send(final PendingMessage pendingMessage,
 650  
             final RandomAccessOutputStream message)
 651  
             throws InterruptedException, IOException {
 652  
 
 653  324
         final int messageId = pendingMessage.getMessageId();
 654  
 
 655  
         // Mark the timestamp.
 656  324
         pendingMessage.timestampNow();
 657  
 
 658  
         // Make sure the message is on the queue before the
 659  
         // message is sent to ensure the receive thread can
 660  
         // assume an empty pending queue means that there is
 661  
         // no message for the reply.
 662  324
         if ((pendingMessage.getReplyCallback() != null)
 663  
                 && !myPendingQueue.offer(pendingMessage)) {
 664  
             // Push what we have out before blocking.
 665  0
             flush();
 666  0
             myPendingQueue.put(pendingMessage);
 667  
         }
 668  
 
 669  324
         doSend(messageId, message);
 670  
 
 671  
         // If shutting down then flush after each message.
 672  324
         if (myShutdown.get()) {
 673  40
             flush();
 674  
         }
 675  323
     }
 676  
 
 677  
     /**
 678  
      * Shutsdown the connection on an error.
 679  
      * 
 680  
      * @param error
 681  
      *            The error causing the shutdown.
 682  
      * @param receiveError
 683  
      *            If true then the socket experienced a receive error.
 684  
      */
 685  
     protected void shutdown(final MongoDbException error,
 686  
             final boolean receiveError) {
 687  160
         if (receiveError) {
 688  8
             myServer.connectionTerminated();
 689  
         }
 690  
 
 691  
         // Have to assume all of the requests have failed that are pending.
 692  160
         final PendingMessage message = new PendingMessage();
 693  216
         while (myPendingQueue.poll(message)) {
 694  56
             raiseError(error, message.getReplyCallback());
 695  
         }
 696  
 
 697  160
         closeQuietly();
 698  160
     }
 699  
 
 700  
     /**
 701  
      * Updates the socket with the configuration's socket options.
 702  
      * 
 703  
      * @param config
 704  
      *            The configuration to apply.
 705  
      * @throws SocketException
 706  
      *             On a failure setting the socket options.
 707  
      */
 708  
     protected void updateSocketWithOptions(final MongoClientConfiguration config)
 709  
             throws SocketException {
 710  176
         mySocket.setKeepAlive(config.isUsingSoKeepalive());
 711  174
         mySocket.setSoTimeout(config.getReadTimeout());
 712  
         try {
 713  174
             mySocket.setTcpNoDelay(true);
 714  
         }
 715  2
         catch (final SocketException seIgnored) {
 716  
             // The junixsocket implementation does not support TCP_NO_DELAY,
 717  
             // which makes sense but it throws an exception instead of silently
 718  
             // ignoring - ignore it here.
 719  2
             if (!"AFUNIXSocketException".equals(seIgnored.getClass()
 720  
                     .getSimpleName())) {
 721  1
                 throw seIgnored;
 722  
             }
 723  172
         }
 724  173
         mySocket.setPerformancePreferences(1, 5, 6);
 725  173
     }
 726  
 
 727  
     /**
 728  
      * Ensures that the documents in the message do not exceed the maximum size
 729  
      * allowed by MongoDB.
 730  
      * 
 731  
      * @param message1
 732  
      *            The message to be sent to the server.
 733  
      * @param message2
 734  
      *            The second message to be sent to the server.
 735  
      * @throws DocumentToLargeException
 736  
      *             On a message being too large.
 737  
      * @throws ServerVersionException
 738  
      *             If one of the messages cannot be sent to the server version.
 739  
      */
 740  
     protected void validate(final Message message1, final Message message2)
 741  
             throws DocumentToLargeException, ServerVersionException {
 742  
 
 743  333
         final Version serverVersion = myServer.getVersion();
 744  333
         final int maxBsonSize = myServer.getMaxBsonObjectSize();
 745  
 
 746  333
         message1.validateSize(maxBsonSize);
 747  332
         validateVersion(message1, serverVersion);
 748  
 
 749  331
         if (message2 != null) {
 750  3
             message2.validateSize(maxBsonSize);
 751  2
             validateVersion(message1, serverVersion);
 752  
         }
 753  330
     }
 754  
 
 755  
     /**
 756  
      * Closes the connection to the server without allowing an exception to be
 757  
      * thrown.
 758  
      */
 759  
     private void closeQuietly() {
 760  
         try {
 761  160
             close();
 762  
         }
 763  0
         catch (final IOException e) {
 764  0
             myLog.warn(e, "I/O exception trying to shutdown the connection.");
 765  160
         }
 766  160
     }
 767  
 
 768  
     /**
 769  
      * Check if the handler for a message dropped data in the send buffer that
 770  
      * it did not flush to avoid a deadlock with the server. If so then flush
 771  
      * that message.
 772  
      */
 773  
     private void doReceiverFlush() {
 774  
         try {
 775  385
             final int unflushedMessages = myReaderNeedsToFlush.get();
 776  385
             if ((unflushedMessages != 0)
 777  
                     && (myPendingQueue.size() <= unflushedMessages)) {
 778  2
                 flush();
 779  
             }
 780  
         }
 781  0
         catch (final IOException ignored) {
 782  0
             myLog.warn("Error flushing data to the server: "
 783  
                     + ignored.getMessage());
 784  385
         }
 785  385
     }
 786  
 
 787  
     /**
 788  
      * Tries to open a connection to the server.
 789  
      * 
 790  
      * @param server
 791  
      *            The server to open the connection to.
 792  
      * @param config
 793  
      *            The configuration for attempting to open the connection.
 794  
      * @return The opened {@link Socket}.
 795  
      * @throws IOException
 796  
      *             On a failure opening a connection to the server.
 797  
      */
 798  
     private Socket openSocket(final Server server,
 799  
             final MongoClientConfiguration config) throws IOException {
 800  204
         final SocketFactory factory = config.getSocketFactory();
 801  
 
 802  204
         IOException last = null;
 803  204
         Socket socket = null;
 804  204
         for (final InetSocketAddress address : myServer.getAddresses()) {
 805  
             try {
 806  
 
 807  219
                 socket = factory.createSocket();
 808  218
                 socket.connect(address, config.getConnectTimeout());
 809  
 
 810  
                 // If the factory wants to know about the connection then let it
 811  
                 // know first.
 812  178
                 if (factory instanceof SocketConnectionListener) {
 813  4
                     ((SocketConnectionListener) factory).connected(address,
 814  
                             socket);
 815  
                 }
 816  
 
 817  
                 // Let the server know the working connection.
 818  176
                 server.connectionOpened(address);
 819  
 
 820  176
                 last = null;
 821  176
                 break;
 822  
             }
 823  43
             catch (final IOException error) {
 824  43
                 last = error;
 825  
                 try {
 826  43
                     if (socket != null) {
 827  42
                         socket.close();
 828  
                     }
 829  
                 }
 830  3
                 catch (final IOException ignore) {
 831  3
                     myLog.info(
 832  
                             "Could not close the defunct socket connection: {}",
 833  
                             socket);
 834  40
                 }
 835  
             }
 836  
 
 837  43
         }
 838  204
         if (last != null) {
 839  28
             server.connectFailed();
 840  28
             throw last;
 841  
         }
 842  
 
 843  176
         return socket;
 844  
     }
 845  
 
 846  
     /**
 847  
      * Validates that the server we are about to send the message to knows how
 848  
      * to handle the message.
 849  
      * 
 850  
      * @param message
 851  
      *            The message to be sent.
 852  
      * @param serverVersion
 853  
      *            The server version.
 854  
      * @throws ServerVersionException
 855  
      *             If the messages cannot be sent to the server version.
 856  
      */
 857  
     private void validateVersion(final Message message,
 858  
             final Version serverVersion) throws ServerVersionException {
 859  334
         final VersionRange range = message.getRequiredVersionRange();
 860  334
         if ((range != null) && !range.contains(serverVersion)) {
 861  1
             throw new ServerVersionException(message.getOperationName(), range,
 862  
                     serverVersion, message);
 863  
         }
 864  333
     }
 865  
 }