Coverage Report - com.allanbank.mongodb.client.connection.sharded.ShardedConnection
 
Classes in this File Line Coverage Branch Coverage Complexity
ShardedConnection
75%
36/48
50%
11/22
3
 
 1  
 /*
 2  
  * #%L
 3  
  * ShardedConnection.java - mongodb-async-driver - Allanbank Consulting, Inc.
 4  
  * %%
 5  
  * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
 6  
  * %%
 7  
  * Licensed under the Apache License, Version 2.0 (the "License");
 8  
  * you may not use this file except in compliance with the License.
 9  
  * You may obtain a copy of the License at
 10  
  * 
 11  
  *      http://www.apache.org/licenses/LICENSE-2.0
 12  
  * 
 13  
  * Unless required by applicable law or agreed to in writing, software
 14  
  * distributed under the License is distributed on an "AS IS" BASIS,
 15  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 16  
  * See the License for the specific language governing permissions and
 17  
  * limitations under the License.
 18  
  * #L%
 19  
  */
 20  
 
 21  
 package com.allanbank.mongodb.client.connection.sharded;
 22  
 
 23  
 import java.io.IOException;
 24  
 import java.util.Collections;
 25  
 import java.util.List;
 26  
 
 27  
 import com.allanbank.mongodb.MongoClientConfiguration;
 28  
 import com.allanbank.mongodb.MongoDbException;
 29  
 import com.allanbank.mongodb.ReadPreference;
 30  
 import com.allanbank.mongodb.client.Message;
 31  
 import com.allanbank.mongodb.client.connection.Connection;
 32  
 import com.allanbank.mongodb.client.connection.proxy.AbstractProxyMultipleConnection;
 33  
 import com.allanbank.mongodb.client.connection.proxy.ConnectionInfo;
 34  
 import com.allanbank.mongodb.client.connection.proxy.ProxiedConnectionFactory;
 35  
 import com.allanbank.mongodb.client.state.Cluster;
 36  
 import com.allanbank.mongodb.client.state.Server;
 37  
 import com.allanbank.mongodb.client.state.ServerSelector;
 38  
 import com.allanbank.mongodb.util.log.Log;
 39  
 import com.allanbank.mongodb.util.log.LogFactory;
 40  
 
 41  
 /**
 42  
  * Provides a {@link Connection} implementation for connecting to a sharded
 43  
  * environment via mongos servers.
 44  
  * 
 45  
  * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
 46  
  *         mutated in incompatible ways between any two releases of the driver.
 47  
  * @copyright 2011-2013, Allanbank Consulting, Inc., All Rights Reserved
 48  
  */
 49  0
 public class ShardedConnection extends AbstractProxyMultipleConnection<Server> {
 50  
 
 51  
     /** The logger for the {@link ShardedConnection}. */
 52  1
     private static final Log LOG = LogFactory.getLog(ShardedConnection.class);
 53  
 
 54  
     /** The selector for the server when we need to reconnect. */
 55  
     private final ServerSelector mySelector;
 56  
 
 57  
     /**
 58  
      * Creates a new {@link ShardedConnection}.
 59  
      * 
 60  
      * @param proxiedConnection
 61  
      *            The connection being proxied.
 62  
      * @param server
 63  
      *            The primary server this connection is connected to.
 64  
      * @param cluster
 65  
      *            The state of the cluster for finding secondary connections.
 66  
      * @param selector
 67  
      *            The selector for servers when we need to reconnect.
 68  
      * @param factory
 69  
      *            The connection factory for opening secondary connections.
 70  
      * @param config
 71  
      *            The MongoDB client configuration.
 72  
      */
 73  
     public ShardedConnection(final Connection proxiedConnection,
 74  
             final Server server, final Cluster cluster,
 75  
             final ServerSelector selector,
 76  
             final ProxiedConnectionFactory factory,
 77  
             final MongoClientConfiguration config) {
 78  20
         super(proxiedConnection, server, cluster, factory, config);
 79  
 
 80  20
         mySelector = selector;
 81  20
     }
 82  
 
 83  
     /**
 84  
      * {@inheritDoc}
 85  
      * <p>
 86  
      * Overridden to return the canonical name of the primary.
 87  
      * </p>
 88  
      */
 89  
     @Override
 90  
     public String getServerName() {
 91  1
         if (myMainKey != null) {
 92  1
             return myMainKey.getCanonicalName();
 93  
         }
 94  0
         return "UNKNOWN";
 95  
     }
 96  
 
 97  
     /**
 98  
      * {@inheritDoc}
 99  
      * <p>
 100  
      * Overridden to create a connection to the server.
 101  
      * </p>
 102  
      */
 103  
     @Override
 104  
     protected Connection connect(final Server server) {
 105  2
         Connection conn = null;
 106  
         try {
 107  2
             conn = myFactory.connect(server, myConfig);
 108  
 
 109  2
             conn = cacheConnection(server, conn);
 110  
         }
 111  0
         catch (final IOException e) {
 112  0
             LOG.info(e, "Could not connect to the server '{}': {}",
 113  
                     server.getCanonicalName(), e.getMessage());
 114  2
         }
 115  2
         return conn;
 116  
     }
 117  
 
 118  
     /**
 119  
      * {@inheritDoc}
 120  
      * <p>
 121  
      * Overridden for testing access.
 122  
      * </p>
 123  
      */
 124  
     @Override
 125  
     protected Connection connection(final Server server) {
 126  1
         LOG.debug("Lookup connection for server: {}", server.getCanonicalName());
 127  1
         return super.connection(server);
 128  
     }
 129  
 
 130  
     /**
 131  
      * Locates the set of servers that can be used to send the specified
 132  
      * messages. This method will attempt to connect to the primary server if
 133  
      * there is not a current connection to the primary.
 134  
      * 
 135  
      * @param message1
 136  
      *            The first message to send.
 137  
      * @param message2
 138  
      *            The second message to send. May be <code>null</code>.
 139  
      * @return The servers that can be used.
 140  
      * @throws MongoDbException
 141  
      *             On a failure to locate a server that all messages can be sent
 142  
      *             to.
 143  
      */
 144  
     @Override
 145  
     protected List<Server> findPotentialKeys(final Message message1,
 146  
             final Message message2) throws MongoDbException {
 147  4
         List<Server> servers = resolveServerReadPreference(message1, message2);
 148  
 
 149  4
         if (servers.isEmpty()) {
 150  
             // If we get here and a reconnect is in progress then
 151  
             // block for the reconnect. Once the reconnect is complete, try
 152  
             // again.
 153  0
             if (myMainKey == null) {
 154  
                 // Wait for a reconnect.
 155  0
                 final ConnectionInfo<Server> newConnInfo = reconnectMain();
 156  0
                 if (newConnInfo != null) {
 157  0
                     updateMain(newConnInfo);
 158  0
                     servers = resolveServerReadPreference(message1, message2);
 159  
                 }
 160  
             }
 161  
 
 162  0
             if (servers.isEmpty()) {
 163  0
                 throw createReconnectFailure(message1, message2);
 164  
             }
 165  
         }
 166  
 
 167  4
         return servers;
 168  
     }
 169  
 
 170  
     /**
 171  
      * {@inheritDoc}
 172  
      * <p>
 173  
      * Overridden to return the string {@code Sharded}.
 174  
      * </p>
 175  
      */
 176  
     @Override
 177  
     protected String getConnectionType() {
 178  3
         return "Sharded";
 179  
     }
 180  
 
 181  
     /**
 182  
      * {@inheritDoc}
 183  
      * <p>
 184  
      * Overridden creates a connection back to the primary server.
 185  
      * </p>
 186  
      */
 187  
     @Override
 188  
     protected ConnectionInfo<Server> reconnectMain() {
 189  2
         for (final Server server : mySelector.pickServers()) {
 190  
             try {
 191  3
                 final Connection conn = myFactory.connect(server, myConfig);
 192  
 
 193  2
                 return new ConnectionInfo<Server>(conn, server);
 194  
             }
 195  1
             catch (final IOException ioe) {
 196  
                 // Ignored. Will return null.
 197  1
                 LOG.debug(ioe, "Could not connect to '{}': {}",
 198  
                         server.getCanonicalName(), ioe.getMessage());
 199  
             }
 200  1
         }
 201  0
         return null;
 202  
     }
 203  
 
 204  
     /**
 205  
      * Locates the set of servers that can be used to send the specified
 206  
      * messages.
 207  
      * 
 208  
      * @param message1
 209  
      *            The first message to send.
 210  
      * @param message2
 211  
      *            The second message to send. May be <code>null</code>.
 212  
      * @return The servers that can be used.
 213  
      */
 214  
     private List<Server> resolveServerReadPreference(final Message message1,
 215  
             final Message message2) {
 216  
 
 217  4
         List<Server> servers = Collections.emptyList();
 218  
 
 219  4
         final Server main = myMainKey;
 220  4
         if (main != null) {
 221  4
             servers = Collections.singletonList(main);
 222  
         }
 223  
 
 224  4
         if (message1 != null) {
 225  4
             ReadPreference pref = message1.getReadPreference();
 226  4
             if (pref.getServer() != null) {
 227  1
                 servers = Collections.singletonList(myCluster.get(pref
 228  
                         .getServer()));
 229  
             }
 230  3
             else if (message2 != null) {
 231  2
                 pref = message2.getReadPreference();
 232  2
                 if (pref.getServer() != null) {
 233  1
                     servers = Collections.singletonList(myCluster.get(pref
 234  
                             .getServer()));
 235  
                 }
 236  
             }
 237  
         }
 238  4
         return servers;
 239  
     }
 240  
 }