Coverage Report - com.allanbank.mongodb.client.ClientImpl
 
Classes in this File Line Coverage Branch Coverage Complexity
ClientImpl
90%
168/186
93%
86/92
3.792
ClientImpl$ConnectionListener
100%
6/6
100%
4/4
3.792
 
 1  
 /*
 2  
  * #%L
 3  
  * ClientImpl.java - mongodb-async-driver - Allanbank Consulting, Inc.
 4  
  * %%
 5  
  * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
 6  
  * %%
 7  
  * Licensed under the Apache License, Version 2.0 (the "License");
 8  
  * you may not use this file except in compliance with the License.
 9  
  * You may obtain a copy of the License at
 10  
  * 
 11  
  *      http://www.apache.org/licenses/LICENSE-2.0
 12  
  * 
 13  
  * Unless required by applicable law or agreed to in writing, software
 14  
  * distributed under the License is distributed on an "AS IS" BASIS,
 15  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 16  
  * See the License for the specific language governing permissions and
 17  
  * limitations under the License.
 18  
  * #L%
 19  
  */
 20  
 package com.allanbank.mongodb.client;
 21  
 
 22  
 import java.beans.PropertyChangeEvent;
 23  
 import java.beans.PropertyChangeListener;
 24  
 import java.io.Closeable;
 25  
 import java.io.IOException;
 26  
 import java.lang.reflect.Constructor;
 27  
 import java.util.ArrayList;
 28  
 import java.util.List;
 29  
 import java.util.concurrent.BlockingQueue;
 30  
 import java.util.concurrent.CopyOnWriteArrayList;
 31  
 import java.util.concurrent.LinkedBlockingQueue;
 32  
 import java.util.concurrent.TimeUnit;
 33  
 import java.util.concurrent.atomic.AtomicLong;
 34  
 
 35  
 import com.allanbank.mongodb.Durability;
 36  
 import com.allanbank.mongodb.MongoClientConfiguration;
 37  
 import com.allanbank.mongodb.MongoCursorControl;
 38  
 import com.allanbank.mongodb.MongoDbException;
 39  
 import com.allanbank.mongodb.MongoIterator;
 40  
 import com.allanbank.mongodb.ReadPreference;
 41  
 import com.allanbank.mongodb.StreamCallback;
 42  
 import com.allanbank.mongodb.bson.Document;
 43  
 import com.allanbank.mongodb.bson.DocumentAssignable;
 44  
 import com.allanbank.mongodb.bson.NumericElement;
 45  
 import com.allanbank.mongodb.bson.element.StringElement;
 46  
 import com.allanbank.mongodb.client.callback.CursorStreamingCallback;
 47  
 import com.allanbank.mongodb.client.connection.Connection;
 48  
 import com.allanbank.mongodb.client.connection.ConnectionFactory;
 49  
 import com.allanbank.mongodb.client.connection.ReconnectStrategy;
 50  
 import com.allanbank.mongodb.client.connection.bootstrap.BootstrapConnectionFactory;
 51  
 import com.allanbank.mongodb.client.state.Cluster;
 52  
 import com.allanbank.mongodb.error.CannotConnectException;
 53  
 import com.allanbank.mongodb.error.ConnectionLostException;
 54  
 import com.allanbank.mongodb.util.IOUtils;
 55  
 import com.allanbank.mongodb.util.log.Log;
 56  
 import com.allanbank.mongodb.util.log.LogFactory;
 57  
 
 58  
 /**
 59  
  * Implementation of the internal {@link Client} interface which all requests to
 60  
  * the MongoDB servers pass.
 61  
  * 
 62  
  * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
 63  
  *         mutated in incompatible ways between any two releases of the driver.
 64  
  * @copyright 2011-2013, Allanbank Consulting, Inc., All Rights Reserved
 65  
  */
 66  
 public class ClientImpl extends AbstractClient {
 67  
 
 68  
     /** The logger for the {@link ClientImpl}. */
 69  1
     protected static final Log LOG = LogFactory.getLog(ClientImpl.class);
 70  
 
 71  
     /**
 72  
      * Resolves the bootstrap connection factory to use.
 73  
      * 
 74  
      * @param config
 75  
      *            The client's configuration.
 76  
      * @return The connection factory for connecting to the cluster.
 77  
      */
 78  
     protected static ConnectionFactory resolveBootstrap(
 79  
             final MongoClientConfiguration config) {
 80  
         ConnectionFactory result;
 81  
         try {
 82  
             final String name = "com.allanbank.mongodb.extensions.bootstrap.ExtensionsBootstrapConnectionFactory";
 83  12
             final Class<?> clazz = Class.forName(name);
 84  0
             final Constructor<?> constructor = clazz
 85  
                     .getConstructor(MongoClientConfiguration.class);
 86  
 
 87  0
             result = (ConnectionFactory) constructor.newInstance(config);
 88  
         }
 89  
         // Too many exceptions.
 90  0
         catch (final RuntimeException e) {
 91  0
             throw e;
 92  
         }
 93  12
         catch (final Exception e) {
 94  12
             result = new BootstrapConnectionFactory(config);
 95  0
         }
 96  
 
 97  12
         return result;
 98  
     }
 99  
 
 100  
     /** Counter for the number of reconnects currently being attempted. */
 101  
     private int myActiveReconnects;
 102  
 
 103  
     /** The configuration for interacting with MongoDB. */
 104  
     private final MongoClientConfiguration myConfig;
 105  
 
 106  
     /** Factory for creating connections to MongoDB. */
 107  
     private final ConnectionFactory myConnectionFactory;
 108  
 
 109  
     /** The listener for changes to the state of connections. */
 110  
     private final PropertyChangeListener myConnectionListener;
 111  
 
 112  
     /** The set of open connections. */
 113  
     private final List<Connection> myConnections;
 114  
 
 115  
     /** The connections that have been asked to shut down and are waiting to close. */
 116  
     private final BlockingQueue<Connection> myConnectionsToClose;
 117  
 
 118  
     /** The sequence of the connection that was last used. */
 119  62
     private final AtomicLong myNextConnectionSequence = new AtomicLong(0);
 120  
 
 121  
     /**
 122  
      * Create a new ClientImpl.
 123  
      * 
 124  
      * @param config
 125  
      *            The configuration for interacting with MongoDB.
 126  
      */
 127  
     public ClientImpl(final MongoClientConfiguration config) {
 128  12
         this(config, resolveBootstrap(config));
 129  12
     }
 130  
 
 131  
     /**
 132  
      * Create a new ClientImpl.
 133  
      * 
 134  
      * @param config
 135  
      *            The configuration for interacting with MongoDB.
 136  
      * @param connectionFactory
 137  
      *            The source of connection for the client.
 138  
      */
 139  
     public ClientImpl(final MongoClientConfiguration config,
 140  62
             final ConnectionFactory connectionFactory) {
 141  62
         myConfig = config;
 142  62
         myConnectionFactory = connectionFactory;
 143  62
         myConnections = new CopyOnWriteArrayList<Connection>();
 144  62
         myConnectionsToClose = new LinkedBlockingQueue<Connection>();
 145  62
         myConnectionListener = new ConnectionListener();
 146  62
         myActiveReconnects = 0;
 147  62
     }
 148  
 
 149  
     /**
 150  
      * {@inheritDoc}
 151  
      * <p>
 152  
      * Overridden to close all of the open connections.
 153  
      * </p>
 154  
      * 
 155  
      * @see Closeable#close()
 156  
      */
 157  
     @Override
 158  
     public void close() {
 159  
         // Stop any more messages.
 160  10
         super.close();
 161  
 
 162  14
         while (!myConnections.isEmpty()) {
 163  
             try {
 164  4
                 final Connection conn = myConnections.remove(0);
 165  4
                 myConnectionsToClose.add(conn);
 166  4
                 conn.shutdown(false);
 167  
             }
 168  0
             catch (final ArrayIndexOutOfBoundsException aiob) {
 169  
                 // There is a race between the isEmpty() and the remove we can't
 170  
                 // avoid. Next check if isEmpty() will bounce us out of the
 171  
                 // loop.
 172  0
                 aiob.getCause(); // Shhhh - PMD.
 173  4
             }
 174  
         }
 175  
 
 176  
         // Work off the connections to close until they are all closed.
 177  10
         final List<Connection> conns = new ArrayList<Connection>(
 178  
                 myConnectionsToClose);
 179  10
         for (final Connection conn : conns) {
 180  4
             conn.waitForClosed(myConfig.getReadTimeout(), TimeUnit.MILLISECONDS);
 181  4
             if (conn.isOpen()) {
 182  
                 // Force the connection to close.
 183  3
                 close(conn);
 184  
             }
 185  4
         }
 186  
 
 187  
         // Shutdown the connections factory.
 188  10
         IOUtils.close(myConnectionFactory);
 189  10
     }
 190  
 
 191  
     /**
 192  
      * {@inheritDoc}
 193  
      * <p>
 194  
      * Overridden to return the {@link Cluster}.
 195  
      * </p>
 196  
      */
 197  
     @Override
 198  
     public ClusterStats getClusterStats() {
 199  0
         return myConnectionFactory.getClusterStats();
 200  
     }
 201  
 
 202  
     /**
 203  
      * {@inheritDoc}
 204  
      * <p>
 205  
      * Overridden to return the {@link ClusterType} of the
 206  
      * {@link ConnectionFactory}.
 207  
      * </p>
 208  
      */
 209  
     @Override
 210  
     public ClusterType getClusterType() {
 211  2
         return myConnectionFactory.getClusterType();
 212  
     }
 213  
 
 214  
     /**
 215  
      * {@inheritDoc}
 216  
      * <p>
 217  
      * Overridden to return the configuration used when the client was
 218  
      * constructed.
 219  
      * </p>
 220  
      */
 221  
     @Override
 222  
     public MongoClientConfiguration getConfig() {
 223  6
         return myConfig;
 224  
     }
 225  
 
 226  
     /**
 227  
      * Returns the current number of open connections.
 228  
      * 
 229  
      * @return The current number of open connections.
 230  
      */
 231  
     public int getConnectionCount() {
 232  2
         return myConnections.size();
 233  
     }
 234  
 
 235  
     /**
 236  
      * {@inheritDoc}
 237  
      * <p>
 238  
      * Overridden to return the configurations default durability.
 239  
      * </p>
 240  
      * 
 241  
      * @see Client#getDefaultDurability()
 242  
      */
 243  
     @Override
 244  
     public Durability getDefaultDurability() {
 245  4
         return myConfig.getDefaultDurability();
 246  
     }
 247  
 
 248  
     /**
 249  
      * {@inheritDoc}
 250  
      * <p>
 251  
      * Overridden to return the configurations default read preference.
 252  
      * </p>
 253  
      * 
 254  
      * @see Client#getDefaultReadPreference()
 255  
      */
 256  
     @Override
 257  
     public ReadPreference getDefaultReadPreference() {
 258  1
         return myConfig.getDefaultReadPreference();
 259  
     }
 260  
 
 261  
     /**
 262  
      * Returns true if the document looks like a cursor restart document. e.g.,
 263  
      * one that is created by {@link MongoIteratorImpl#asDocument()}.
 264  
      * 
 265  
      * @param doc
 266  
      *            The potential cursor document.
 267  
      * @return True if the document looks like it was created by
 268  
      *         {@link MongoIteratorImpl#asDocument()}.
 269  
      */
 270  
     public boolean isCursorDocument(final Document doc) {
 271  48
         return (doc.getElements().size() == 5)
 272  
                 && (doc.get(StringElement.class,
 273  
                         MongoCursorControl.NAME_SPACE_FIELD) != null)
 274  
                 && (doc.get(NumericElement.class,
 275  
                         MongoCursorControl.CURSOR_ID_FIELD) != null)
 276  
                 && (doc.get(StringElement.class,
 277  
                         MongoCursorControl.SERVER_FIELD) != null)
 278  
                 && (doc.get(NumericElement.class,
 279  
                         MongoCursorControl.BATCH_SIZE_FIELD) != null)
 280  
                 && (doc.get(NumericElement.class,
 281  
                         MongoCursorControl.LIMIT_FIELD) != null);
 282  
     }
 283  
 
 284  
     /**
 285  
      * {@inheritDoc}
 286  
      */
 287  
     @Override
 288  
     public MongoIterator<Document> restart(
 289  
             final DocumentAssignable cursorDocument)
 290  
             throws IllegalArgumentException {
 291  24
         final Document cursorDoc = cursorDocument.asDocument();
 292  
 
 293  24
         if (isCursorDocument(cursorDoc)) {
 294  2
             final MongoIteratorImpl iter = new MongoIteratorImpl(cursorDoc,
 295  
                     this);
 296  2
             iter.restart();
 297  
 
 298  2
             return iter;
 299  
         }
 300  
 
 301  22
         throw new IllegalArgumentException(
 302  
                 "Cannot restart without a well formed cursor document: "
 303  
                         + cursorDoc);
 304  
     }
 305  
 
 306  
     /**
 307  
      * {@inheritDoc}
 308  
      */
 309  
     @Override
 310  
     public MongoCursorControl restart(final StreamCallback<Document> results,
 311  
             final DocumentAssignable cursorDocument)
 312  
             throws IllegalArgumentException {
 313  24
         final Document cursorDoc = cursorDocument.asDocument();
 314  
 
 315  24
         if (isCursorDocument(cursorDoc)) {
 316  2
             final CursorStreamingCallback cb = new CursorStreamingCallback(
 317  
                     this, cursorDoc, results);
 318  2
             cb.restart();
 319  
 
 320  2
             return cb;
 321  
         }
 322  22
         throw new IllegalArgumentException(
 323  
                 "Cannot restart without a well formed cursor document: "
 324  
                         + cursorDoc);
 325  
     }
 326  
 
 327  
     /**
 328  
      * {@inheritDoc}
 329  
      * <p>
 330  
      * Tries to locate a connection that can quickly dispatch the message to a
 331  
      * MongoDB server. The basic metrics for determining if a connection is idle
 332  
      * is to look at the number of messages waiting to be sent. The basic logic
 333  
      * for finding a connection is:
 334  
      * <ol>
 335  
      * <li>Look at the current connection and the next connection. If either is
 336  
      * idle, use it.</li>
 337  
      * <li>If there are no idle connections determine the maximum number of
 338  
      * allowed connections and if there are fewer that the maximum allowed then
 339  
      * take the connection creation lock, create a new connection, use it, and
 340  
      * add to the set of available connections and release the lock.</li>
 341  
      * <li>Neither of the above works then increment the connection index and
 342  
      * use the previous or next connection based on which has the fewest pending
 343  
      * connections.</li>
 344  
      * <ol>
 345  
      */
 346  
     @Override
 347  
     protected Connection findConnection(final Message message1,
 348  
             final Message message2) throws MongoDbException {
 349  
         // Make sure we shrink connections when the max changes.
 350  61
         final int limit = Math.max(1, myConfig.getMaxConnectionCount());
 351  61
         if (limit < myConnections.size()) {
 352  2
             synchronized (myConnectionFactory) {
 353  
                 // Mark the connections as persona non grata.
 354  4
                 while (limit < myConnections.size()) {
 355  
                     try {
 356  2
                         final Connection conn = myConnections.remove(0);
 357  2
                         myConnectionsToClose.add(conn);
 358  2
                         conn.shutdown(false);
 359  
                     }
 360  0
                     catch (final ArrayIndexOutOfBoundsException aiob) {
 361  
                         // Race between the size() and remove(0).
 362  
                         // Next loop should resolve.
 363  0
                         aiob.getCause(); // Shhhh - PMD.
 364  2
                     }
 365  
                 }
 366  2
             }
 367  
         }
 368  
 
 369  
         // Locate a connection to use.
 370  61
         final Connection conn = searchConnection(message1, message2, true);
 371  
 
 372  61
         if (conn == null) {
 373  4
             throw new CannotConnectException(
 374  
                     "Could not create a connection to the server.");
 375  
         }
 376  
 
 377  57
         return conn;
 378  
     }
 379  
 
 380  
     /**
 381  
      * Tries to reconnect previously open {@link Connection}s. If a connection
 382  
      * was being closed then cleans up the remaining state.
 383  
      * 
 384  
      * @param connection
 385  
      *            The connection that was closed.
 386  
      */
 387  
     protected void handleConnectionClosed(final Connection connection) {
 388  
         // Look for the connection in the "active" set first.
 389  9
         if (myConnections.contains(connection)) {
 390  
             // Is it a graceful shutdown?
 391  6
             if (connection.isShuttingDown() && myConnections.remove(connection)) {
 392  
 
 393  1
                 if (myConnections.size() < myConfig.getMinConnectionCount()) {
 394  0
                     LOG.debug(
 395  
                             "MongoDB Connection closed: {}. Will try to reconnect.",
 396  
                             connection);
 397  0
                     reconnect(connection);
 398  
                 }
 399  
                 else {
 400  1
                     LOG.info("MongoDB Connection closed: {}", connection);
 401  1
                     connection
 402  
                             .removePropertyChangeListener(myConnectionListener);
 403  1
                     connection.raiseErrors(new ConnectionLostException(
 404  
                             "Connection shutdown."));
 405  
                 }
 406  
             }
 407  
             else {
 408  
                 // Attempt a reconnect.
 409  5
                 LOG.info("Unexpected MongoDB Connection closed: " + connection
 410  
                         + ". Will try to reconnect.");
 411  5
                 reconnect(connection);
 412  
             }
 413  
         }
 414  3
         else if (myConnectionsToClose.remove(connection)) {
 415  2
             LOG.debug("MongoDB Connection closed: {}", connection);
 416  2
             connection.removePropertyChangeListener(myConnectionListener);
 417  
         }
 418  
         else {
 419  1
             LOG.info("Unknown MongoDB Connection closed: {}", connection);
 420  1
             connection.removePropertyChangeListener(myConnectionListener);
 421  
         }
 422  9
     }
 423  
 
 424  
     /**
 425  
      * Runs the reconnect logic for the connection.
 426  
      * 
 427  
      * @param connection
 428  
      *            The connection to reconnect.
 429  
      */
 430  
     protected void reconnect(final Connection connection) {
 431  5
         final ReconnectStrategy strategy = myConnectionFactory
 432  
                 .getReconnectStrategy();
 433  
 
 434  
         try {
 435  5
             synchronized (this) {
 436  5
                 myActiveReconnects += 1;
 437  5
             }
 438  
 
 439  5
             final Connection newConnection = strategy.reconnect(connection);
 440  5
             if (newConnection != null) {
 441  
                 // Get the new connection in the rotation.
 442  3
                 myConnections.add(newConnection);
 443  3
                 newConnection.addPropertyChangeListener(myConnectionListener);
 444  
             }
 445  
         }
 446  
         finally {
 447  5
             myConnections.remove(connection);
 448  5
             connection.removePropertyChangeListener(myConnectionListener);
 449  
 
 450  
             // Raise errors for all of the pending messages - there is no way to
 451  
             // know their state of flight between here and the server.
 452  5
             final MongoDbException exception = new ConnectionLostException(
 453  
                     "Connection lost to MongoDB: " + connection);
 454  5
             connection.raiseErrors(exception);
 455  
 
 456  5
             synchronized (this) {
 457  5
                 myActiveReconnects -= 1;
 458  5
                 notifyAll();
 459  5
             }
 460  5
         }
 461  5
     }
 462  
 
 463  
     /**
 464  
      * Searches for a connection to use.
 465  
      * <p>
 466  
      * Tries to locate a connection that can quickly dispatch the message to a
 467  
      * MongoDB server. The basic metrics for determining if a connection is idle
 468  
      * is to look at the number of messages waiting to be sent. The basic logic
 469  
      * for finding a connection is:
 470  
      * <ol>
 471  
      * <li>Look at the current connection and the next connection. If either is
 472  
      * idle, use it.</li>
 473  
      * <li>If there are no idle connections determine the maximum number of
 474  
      * allowed connections and if there are fewer that the maximum allowed then
 475  
      * take the connection creation lock, create a new connection, use it, and
 476  
      * add to the set of available connections and release the lock.</li>
 477  
      * <li>Neither of the above works then increment the connection index and
 478  
      * use the previous or next connection based on which has the fewest pending
 479  
      * connections.</li>
 480  
      * <ol>
 481  
      * 
 482  
      * @param message1
 483  
      *            The first message that will be sent. The connection return
 484  
      *            should be compatible with all of the messages
 485  
      *            {@link ReadPreference}.
 486  
      * @param message2
 487  
      *            The second message that will be sent. The connection return
 488  
      *            should be compatible with all of the messages
 489  
      *            {@link ReadPreference}. May be <code>null</code>.
 490  
      * @param waitForReconnect
 491  
      *            If true then the search will block while there is an active
 492  
      *            reconnect attempt.
 493  
      * 
 494  
      * @return The {@link Connection} to send a message on.
 495  
      * @throws MongoDbException
 496  
      *             In the case of an error finding a {@link Connection}.
 497  
      */
 498  
     protected Connection searchConnection(final Message message1,
 499  
             final Message message2, final boolean waitForReconnect)
 500  
             throws MongoDbException {
 501  
         // Locate a connection to use.
 502  63
         Connection conn = findIdleConnection();
 503  63
         if (conn == null) {
 504  59
             conn = tryCreateConnection();
 505  59
             if (conn == null) {
 506  12
                 conn = findMostIdleConnection();
 507  12
                 if ((conn == null) && waitForReconnect) {
 508  5
                     conn = waitForReconnect(message1, message2);
 509  
                 }
 510  
             }
 511  
         }
 512  
 
 513  63
         return conn;
 514  
     }
 515  
 
 516  
     /**
 517  
      * Silently closes the connection.
 518  
      * 
 519  
      * @param conn
 520  
      *            The connection to close.
 521  
      */
 522  
     private void close(final Connection conn) {
 523  
         try {
 524  3
             conn.close();
 525  
         }
 526  1
         catch (final IOException ioe) {
 527  1
             LOG.warn(ioe, "Error closing connection to MongoDB: {}", conn);
 528  
         }
 529  
         finally {
 530  3
             myConnections.remove(conn);
 531  3
             myConnectionsToClose.remove(conn);
 532  
 
 533  3
             conn.removePropertyChangeListener(myConnectionListener);
 534  3
         }
 535  3
     }
 536  
 
 537  
     /**
 538  
      * Tries to find an idle connection to use from the current and next
 539  
      * connection..
 540  
      * 
 541  
      * @return The idle connection, if found.
 542  
      */
 543  
     private Connection findIdleConnection() {
 544  63
         if (!myConnections.isEmpty()) {
 545  
             // Only get() here to try and reuse idle connections.
 546  27
             final long connSequence = myNextConnectionSequence.get();
 547  59
             for (int loop = 0; loop < Math.min(2, myConnections.size()); ++loop) {
 548  
 
 549  
                 // Cast to a long to make sure the Math.abs() works for
 550  
                 // Integer.MIN_VALUE
 551  36
                 final long sequence = Math.abs(connSequence + loop);
 552  36
                 final int size = myConnections.size();
 553  36
                 final int index = (int) (sequence % size);
 554  
                 try {
 555  36
                     final Connection conn = myConnections.get(index);
 556  36
                     if (conn.isAvailable() && (conn.getPendingCount() == 0)) {
 557  4
                         return conn;
 558  
                     }
 559  
                 }
 560  0
                 catch (final ArrayIndexOutOfBoundsException aiob) {
 561  
                     // Race between the size and get and someone closing a
 562  
                     // connection. Next loop should fix.
 563  0
                     aiob.getCause(); // Shhh - PMD.
 564  32
                 }
 565  
             }
 566  
         }
 567  
 
 568  59
         return null;
 569  
     }
 570  
 
 571  
     /**
 572  
      * Locates the most idle connection to use from the current and next
 573  
      * connection.
 574  
      * 
 575  
      * @return The most idle connection.
 576  
      */
 577  
     private Connection findMostIdleConnection() {
 578  12
         if (!myConnections.isEmpty()) {
 579  10
             final long next = (myConnections.size() <= 1) ? 1
 580  
                     : myNextConnectionSequence.incrementAndGet();
 581  10
             final long previous = next - 1;
 582  
 
 583  10
             Connection previousConn = null;
 584  10
             Connection nextConn = null;
 585  20
             while ((previousConn == null) || (nextConn == null)) {
 586  
                 try {
 587  10
                     final int size = myConnections.size();
 588  10
                     previousConn = myConnections.get((int) (previous % size));
 589  10
                     nextConn = myConnections.get((int) (next % size));
 590  
                 }
 591  0
                 catch (final ArrayIndexOutOfBoundsException aiob) {
 592  
                     // Race between the size and get.
 593  
                     // Next loop should fix.
 594  0
                     aiob.getCause(); // Shhh - PMD.
 595  10
                 }
 596  
             }
 597  
 
 598  10
             if (previousConn == nextConn) {
 599  6
                 if (previousConn.isAvailable()) {
 600  3
                     return previousConn;
 601  
                 }
 602  
             }
 603  4
             else if (previousConn.isAvailable()) {
 604  3
                 if (nextConn.isAvailable()) {
 605  3
                     if (previousConn.getPendingCount() < nextConn
 606  
                             .getPendingCount()) {
 607  1
                         return previousConn;
 608  
                     }
 609  2
                     return nextConn;
 610  
                 }
 611  
             }
 612  1
             else if (nextConn.isAvailable()) {
 613  0
                 return nextConn;
 614  
             }
 615  
         }
 616  
 
 617  6
         return null;
 618  
     }
 619  
 
 620  
     /**
 621  
      * Tries to create a new connection.
 622  
      * 
 623  
      * @return The created connection or null if a connection could not be
 624  
      *         created by policy or error.
 625  
      */
 626  
     private Connection tryCreateConnection() {
 627  59
         if (myConnections.size() < myConfig.getMaxConnectionCount()) {
 628  49
             synchronized (myConnectionFactory) {
 629  49
                 final int limit = Math.max(1, myConfig.getMaxConnectionCount());
 630  49
                 if (myConnections.size() < limit) {
 631  
                     try {
 632  49
                         final Connection conn = myConnectionFactory.connect();
 633  
 
 634  47
                         myConnections.add(conn);
 635  
 
 636  
                         // Add a listener for if the connection is closed.
 637  47
                         conn.addPropertyChangeListener(myConnectionListener);
 638  
 
 639  47
                         return conn;
 640  
                     }
 641  2
                     catch (final IOException ioe) {
 642  2
                         LOG.warn(ioe, "Could not create a connection.");
 643  
                     }
 644  
                 }
 645  2
             }
 646  
         }
 647  
 
 648  12
         return null;
 649  
     }
 650  
 
 651  
     /**
 652  
      * Checks if there is an active reconnect attempt on-going. If so waits for
 653  
      * it to finish (with a timeout) and then searches for a connection again.
 654  
      * 
 655  
      * @param message1
 656  
      *            The first message that will be sent. The connection return
 657  
      *            should be compatible with all of the messages
 658  
      *            {@link ReadPreference}.
 659  
      * @param message2
 660  
      *            The second message that will be sent. The connection return
 661  
      *            should be compatible with all of the messages
 662  
      *            {@link ReadPreference}. May be <code>null</code>.
 663  
      * @return The connection found after waiting or <code>null</code> if there
 664  
      *         was no active reconnect or there was still no connection.
 665  
      */
 666  
     private Connection waitForReconnect(final Message message1,
 667  
             final Message message2) {
 668  5
         Connection conn = null;
 669  5
         boolean wasReconnecting = false;
 670  5
         synchronized (this) {
 671  5
             wasReconnecting = (0 < myActiveReconnects);
 672  5
             if (wasReconnecting) {
 673  2
                 long now = System.currentTimeMillis();
 674  2
                 final long deadline = (myConfig.getReconnectTimeout() <= 0) ? Long.MAX_VALUE
 675  
                         : now + myConfig.getReconnectTimeout();
 676  
 
 677  4
                 while ((now < deadline) && (0 < myActiveReconnects)) {
 678  
                     try {
 679  2
                         LOG.debug("Waiting for reconnect to MongoDB.");
 680  2
                         wait(deadline - now);
 681  
 
 682  2
                         now = System.currentTimeMillis();
 683  
                     }
 684  0
                     catch (final InterruptedException e) {
 685  
                         // Ignored - Handled by the loop.
 686  2
                     }
 687  
                 }
 688  
             }
 689  5
         }
 690  
 
 691  5
         if (wasReconnecting) {
 692  
             // Look again now that we may have reconnected.
 693  2
             conn = searchConnection(message1, message2, false);
 694  
         }
 695  5
         return conn;
 696  
     }
 697  
 
 698  
     /**
 699  
      * ConnectionListener provides the call back for events occurring on a
 700  
      * connection.
 701  
      * 
 702  
      * @copyright 2012-2013, Allanbank Consulting, Inc., All Rights Reserved
 703  
      */
 704  
     protected class ConnectionListener implements PropertyChangeListener {
 705  
 
 706  
         /**
 707  
          * Creates a new ConnectionListener.
 708  
          */
 709  62
         public ConnectionListener() {
 710  62
             super();
 711  62
         }
 712  
 
 713  
         /**
 714  
          * {@inheritDoc}
 715  
          * <p>
 716  
          * Overridden to try reconnecting a connection that has closed.
 717  
          * </p>
 718  
          */
 719  
         @Override
 720  
         public void propertyChange(final PropertyChangeEvent event) {
 721  6
             if (Connection.OPEN_PROP_NAME.equals(event.getPropertyName())
 722  
                     && Boolean.FALSE.equals(event.getNewValue())) {
 723  3
                 handleConnectionClosed((Connection) event.getSource());
 724  
             }
 725  6
         }
 726  
     }
 727  
 }