Coverage Report - com.allanbank.mongodb.client.connection.rs.ReplicaSetConnectionFactory
 
Classes in this File Line Coverage Branch Coverage Complexity
ReplicaSetConnectionFactory
96%
82/85
100%
20/20
3
 
 1  
 /*
 2  
  * #%L
 3  
  * ReplicaSetConnectionFactory.java - mongodb-async-driver - Allanbank Consulting, Inc.
 4  
  * %%
 5  
  * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
 6  
  * %%
 7  
  * Licensed under the Apache License, Version 2.0 (the "License");
 8  
  * you may not use this file except in compliance with the License.
 9  
  * You may obtain a copy of the License at
 10  
  * 
 11  
  *      http://www.apache.org/licenses/LICENSE-2.0
 12  
  * 
 13  
  * Unless required by applicable law or agreed to in writing, software
 14  
  * distributed under the License is distributed on an "AS IS" BASIS,
 15  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 16  
  * See the License for the specific language governing permissions and
 17  
  * limitations under the License.
 18  
  * #L%
 19  
  */
 20  
 
 21  
 package com.allanbank.mongodb.client.connection.rs;
 22  
 
 23  
 import java.io.IOException;
 24  
 import java.net.InetSocketAddress;
 25  
 import java.util.Collections;
 26  
 import java.util.List;
 27  
 import java.util.concurrent.ExecutionException;
 28  
 import java.util.logging.Level;
 29  
 
 30  
 import com.allanbank.mongodb.MongoClientConfiguration;
 31  
 import com.allanbank.mongodb.MongoDbException;
 32  
 import com.allanbank.mongodb.bson.Document;
 33  
 import com.allanbank.mongodb.bson.element.StringElement;
 34  
 import com.allanbank.mongodb.client.ClusterStats;
 35  
 import com.allanbank.mongodb.client.ClusterType;
 36  
 import com.allanbank.mongodb.client.callback.FutureReplyCallback;
 37  
 import com.allanbank.mongodb.client.connection.Connection;
 38  
 import com.allanbank.mongodb.client.connection.ConnectionFactory;
 39  
 import com.allanbank.mongodb.client.connection.ReconnectStrategy;
 40  
 import com.allanbank.mongodb.client.connection.proxy.ProxiedConnectionFactory;
 41  
 import com.allanbank.mongodb.client.message.IsMaster;
 42  
 import com.allanbank.mongodb.client.message.Reply;
 43  
 import com.allanbank.mongodb.client.state.Cluster;
 44  
 import com.allanbank.mongodb.client.state.ClusterPinger;
 45  
 import com.allanbank.mongodb.client.state.LatencyServerSelector;
 46  
 import com.allanbank.mongodb.client.state.Server;
 47  
 import com.allanbank.mongodb.client.state.ServerUpdateCallback;
 48  
 import com.allanbank.mongodb.util.IOUtils;
 49  
 import com.allanbank.mongodb.util.log.Log;
 50  
 import com.allanbank.mongodb.util.log.LogFactory;
 51  
 
 52  
 /**
 53  
  * Provides the ability to create connections to a replica-set environment.
 54  
  * 
 55  
  * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
 56  
  *         mutated in incompatible ways between any two releases of the driver.
 57  
  * @copyright 2011-2014, Allanbank Consulting, Inc., All Rights Reserved
 58  
  */
 59  
 public class ReplicaSetConnectionFactory implements ConnectionFactory {
 60  
 
 61  
     /** The logger for the {@link ReplicaSetConnectionFactory}. */
 62  1
     protected static final Log LOG = LogFactory
 63  
             .getLog(ReplicaSetConnectionFactory.class);
 64  
 
 65  
     /** The factory to create proxied connections. */
 66  
     protected final ProxiedConnectionFactory myConnectionFactory;
 67  
 
 68  
     /** The state of the cluster. */
 69  
     private final Cluster myCluster;
 70  
 
 71  
     /** The MongoDB client configuration. */
 72  
     private final MongoClientConfiguration myConfig;
 73  
 
 74  
     /** Pings the servers in the cluster collecting latency and tags. */
 75  
     private final ClusterPinger myPinger;
 76  
 
 77  
     /** The strategy for reconnecting/finding the primary. */
 78  
     private final ReplicaSetReconnectStrategy myStrategy;
 79  
 
 80  
     /**
 81  
      * Creates a new {@link ReplicaSetConnectionFactory}.
 82  
      * 
 83  
      * @param factory
 84  
      *            The factory to create proxied connections.
 85  
      * @param config
 86  
      *            The MongoDB client configuration.
 87  
      */
 88  
     public ReplicaSetConnectionFactory(final ProxiedConnectionFactory factory,
 89  23
             final MongoClientConfiguration config) {
 90  23
         myConnectionFactory = factory;
 91  23
         myConfig = config;
 92  23
         myCluster = new Cluster(config, ClusterType.REPLICA_SET);
 93  23
         myPinger = new ClusterPinger(myCluster, factory, config);
 94  
 
 95  23
         myStrategy = new ReplicaSetReconnectStrategy();
 96  23
         myStrategy.setConfig(myConfig);
 97  23
         myStrategy.setConnectionFactory(myConnectionFactory);
 98  23
         myStrategy.setState(myCluster);
 99  23
         myStrategy.setSelector(new LatencyServerSelector(myCluster, false));
 100  
 
 101  
         // Bootstrap the state off of one of the servers.
 102  23
         bootstrap();
 103  23
     }
 104  
 
 105  
     /**
 106  
      * Finds the primary member of the replica set.
 107  
      */
 108  
     public void bootstrap() {
 109  
         // To fill in the list of servers.
 110  23
         locatePrimary();
 111  
 
 112  
         // Last thing is to start the ping of servers. This will
 113  
         // locate the primary, and get the tags and latencies updated.
 114  23
         myPinger.initialSweep(myCluster);
 115  23
         myPinger.start();
 116  23
     }
 117  
 
 118  
     /**
 119  
      * {@inheritDoc}
 120  
      * <p>
 121  
      * Overridden to close the cluster state and the
 122  
      * {@link ProxiedConnectionFactory}.
 123  
      * </p>
 124  
      */
 125  
     @Override
 126  
     public void close() {
 127  24
         IOUtils.close(myPinger);
 128  24
         IOUtils.close(myConnectionFactory);
 129  24
     }
 130  
 
 131  
     /**
 132  
      * Creates a new connection to the replica set.
 133  
      * 
 134  
      * @see ConnectionFactory#connect()
 135  
      */
 136  
     @Override
 137  
     public Connection connect() throws IOException {
 138  
 
 139  
         // Try to find the primary.
 140  9
         List<Server> writableServers = myCluster.getWritableServers();
 141  44
         for (int i = 0; i < 10; ++i) {
 142  41
             servers: for (final Server primary : writableServers) {
 143  12
                 Connection primaryConn = null;
 144  
                 try {
 145  12
                     primaryConn = myConnectionFactory
 146  
                             .connect(primary, myConfig);
 147  
 
 148  11
                     if (isWritable(primary, primaryConn)) {
 149  
 
 150  6
                         final ReplicaSetConnection rsConnection = new ReplicaSetConnection(
 151  
                                 primaryConn, primary, myCluster,
 152  
                                 myConnectionFactory, myConfig, myStrategy);
 153  
 
 154  6
                         primaryConn = null;
 155  
 
 156  6
                         return rsConnection;
 157  
                     }
 158  
 
 159  
                     break servers;
 160  
                 }
 161  1
                 catch (final IOException e) {
 162  1
                     LOG.debug(e, "Error connecting to presumptive primary: {}",
 163  
                             e.getMessage());
 164  
                 }
 165  
                 finally {
 166  7
                     IOUtils.close(primaryConn);
 167  1
                 }
 168  1
             }
 169  
 
 170  
             // Update the stale state.
 171  35
             writableServers = locatePrimary();
 172  
         }
 173  
 
 174  
         // Don't throw an error here.
 175  
         // Might be doing a secondary query which means we don't need the
 176  
         // primary.
 177  3
         return new ReplicaSetConnection(null, null, myCluster,
 178  
                 myConnectionFactory, myConfig, myStrategy);
 179  
     }
 180  
 
 181  
     /**
 182  
      * {@inheritDoc}
 183  
      * <p>
 184  
      * Overridden to return the {@link Cluster}.
 185  
      * </p>
 186  
      */
 187  
     @Override
 188  
     public ClusterStats getClusterStats() {
 189  0
         return myCluster;
 190  
     }
 191  
 
 192  
     /**
 193  
      * {@inheritDoc}
 194  
      * <p>
 195  
      * Overridden to return {@link ClusterType#REPLICA_SET} cluster type.
 196  
      * </p>
 197  
      */
 198  
     @Override
 199  
     public ClusterType getClusterType() {
 200  2
         return ClusterType.REPLICA_SET;
 201  
     }
 202  
 
 203  
     /**
 204  
      * {@inheritDoc}
 205  
      * <p>
 206  
      * Overridden to return a replica set {@link ReconnectStrategy}.
 207  
      * </p>
 208  
      */
 209  
     @Override
 210  
     public ReconnectStrategy getReconnectStrategy() {
 211  6
         return myStrategy;
 212  
     }
 213  
 
 214  
     /**
 215  
      * Returns the clusterState value.
 216  
      * 
 217  
      * @return The clusterState value.
 218  
      */
 219  
     protected Cluster getCluster() {
 220  18
         return myCluster;
 221  
     }
 222  
 
 223  
     /**
 224  
      * Determines if the connection is to the primary member of the cluster.
 225  
      * 
 226  
      * @param server
 227  
      *            The server connected to.
 228  
      * @param connection
 229  
      *            The connection to test.
 230  
      * @return True if the connection is to the primary member of the
 231  
      *         cluster/replica set.
 232  
      */
 233  
     protected boolean isWritable(final Server server,
 234  
             final Connection connection) {
 235  
 
 236  
         try {
 237  11
             final ServerUpdateCallback replyCallback = new ServerUpdateCallback(
 238  
                     server);
 239  11
             connection.send(new IsMaster(), replyCallback);
 240  
 
 241  11
             final Reply reply = replyCallback.get();
 242  9
             final List<Document> results = reply.getResults();
 243  9
             if (!results.isEmpty()) {
 244  8
                 final Document doc = results.get(0);
 245  
 
 246  
                 // Get the name of the primary server.
 247  8
                 final StringElement primaryName = doc.get(StringElement.class,
 248  
                         "primary");
 249  8
                 if (primaryName != null) {
 250  7
                     return (primaryName.getValue().equals(connection
 251  
                             .getServerName()));
 252  
                 }
 253  
             }
 254  
         }
 255  1
         catch (final InterruptedException e) {
 256  
             // Just ignore the reply.
 257  1
             LOG.debug(e, "Failure testing if a connection is writable: {}",
 258  
                     e.getMessage());
 259  
         }
 260  1
         catch (final ExecutionException e) {
 261  
             // Just ignore the reply.
 262  1
             LOG.debug(e, "Failure testing if a connection is writable: {}",
 263  
                     e.getMessage());
 264  3
         }
 265  4
         return false;
 266  
     }
 267  
 
 268  
     /**
 269  
      * Locates the primary server in the cluster.
 270  
      * 
 271  
      * @return The list of primary servers.
 272  
      */
 273  
     protected List<Server> locatePrimary() {
 274  58
         for (final InetSocketAddress addr : myConfig.getServerAddresses()) {
 275  46
             Connection conn = null;
 276  46
             final FutureReplyCallback future = new FutureReplyCallback();
 277  
             try {
 278  46
                 final Server server = myCluster.add(addr);
 279  
 
 280  46
                 conn = myConnectionFactory.connect(server, myConfig);
 281  
 
 282  24
                 conn.send(new IsMaster(), future);
 283  
 
 284  22
                 final Reply reply = future.get();
 285  20
                 final List<Document> results = reply.getResults();
 286  20
                 if (!results.isEmpty()) {
 287  19
                     final Document doc = results.get(0);
 288  
 
 289  
                     // Replica Sets MUST connect to the primary server.
 290  
                     // See if we can add the other servers also.
 291  19
                     if (myConfig.isAutoDiscoverServers()) {
 292  
                         // Pull them all in.
 293  17
                         final List<StringElement> hosts = doc.find(
 294  
                                 StringElement.class, "hosts", ".*");
 295  17
                         for (final StringElement host : hosts) {
 296  29
                             myCluster.add(host.getValue());
 297  29
                         }
 298  
                     }
 299  
 
 300  
                     // Add and mark the primary as writable.
 301  19
                     final StringElement primary = doc.findFirst(
 302  
                             StringElement.class, "primary");
 303  19
                     if (primary != null) {
 304  17
                         return Collections.singletonList(myCluster.add(primary
 305  
                                 .getValue()));
 306  
                     }
 307  
                 }
 308  
             }
 309  22
             catch (final IOException ioe) {
 310  22
                 LOG.warn(ioe, "I/O error during replica-set bootstrap to {}.",
 311  
                         addr);
 312  
             }
 313  2
             catch (final MongoDbException me) {
 314  2
                 LOG.warn(me,
 315  
                         "MongoDB error during replica-set bootstrap to {}.",
 316  
                         addr);
 317  
             }
 318  2
             catch (final InterruptedException e) {
 319  2
                 LOG.warn(e, "Interrupted during replica-set bootstrap to {}.",
 320  
                         addr);
 321  
             }
 322  0
             catch (final ExecutionException e) {
 323  0
                 LOG.warn(e, "Error during replica-set bootstrap to {}.", addr);
 324  
             }
 325  
             finally {
 326  43
                 IOUtils.close(conn, Level.WARNING,
 327  
                         "I/O error shutting down replica-set bootstrap connection to "
 328  
                                 + addr + ".");
 329  29
             }
 330  29
         }
 331  41
         return Collections.emptyList();
 332  
     }
 333  
 }