| Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
| ShardedConnectionFactory |
|
| 2.0526315789473686;2.053 | ||||
| ShardedConnectionFactory$BootstrapState |
|
| 2.0526315789473686;2.053 |
| 1 | /* | |
| 2 | * #%L | |
| 3 | * ShardedConnectionFactory.java - mongodb-async-driver - Allanbank Consulting, Inc. | |
| 4 | * %% | |
| 5 | * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc. | |
| 6 | * %% | |
| 7 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
| 8 | * you may not use this file except in compliance with the License. | |
| 9 | * You may obtain a copy of the License at | |
| 10 | * | |
| 11 | * http://www.apache.org/licenses/LICENSE-2.0 | |
| 12 | * | |
| 13 | * Unless required by applicable law or agreed to in writing, software | |
| 14 | * distributed under the License is distributed on an "AS IS" BASIS, | |
| 15 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| 16 | * See the License for the specific language governing permissions and | |
| 17 | * limitations under the License. | |
| 18 | * #L% | |
| 19 | */ | |
| 20 | package com.allanbank.mongodb.client.connection.sharded; | |
| 21 | ||
| 22 | import java.io.IOException; | |
| 23 | import java.net.InetSocketAddress; | |
| 24 | import java.util.List; | |
| 25 | import java.util.concurrent.ExecutionException; | |
| 26 | import java.util.logging.Level; | |
| 27 | ||
| 28 | import com.allanbank.mongodb.MongoClientConfiguration; | |
| 29 | import com.allanbank.mongodb.MongoDbException; | |
| 30 | import com.allanbank.mongodb.ReadPreference; | |
| 31 | import com.allanbank.mongodb.bson.Document; | |
| 32 | import com.allanbank.mongodb.bson.Element; | |
| 33 | import com.allanbank.mongodb.bson.element.StringElement; | |
| 34 | import com.allanbank.mongodb.builder.Find; | |
| 35 | import com.allanbank.mongodb.client.ClusterStats; | |
| 36 | import com.allanbank.mongodb.client.ClusterType; | |
| 37 | import com.allanbank.mongodb.client.Message; | |
| 38 | import com.allanbank.mongodb.client.callback.FutureReplyCallback; | |
| 39 | import com.allanbank.mongodb.client.connection.Connection; | |
| 40 | import com.allanbank.mongodb.client.connection.ConnectionFactory; | |
| 41 | import com.allanbank.mongodb.client.connection.ReconnectStrategy; | |
| 42 | import com.allanbank.mongodb.client.connection.proxy.ProxiedConnectionFactory; | |
| 43 | import com.allanbank.mongodb.client.message.GetMore; | |
| 44 | import com.allanbank.mongodb.client.message.Query; | |
| 45 | import com.allanbank.mongodb.client.message.Reply; | |
| 46 | import com.allanbank.mongodb.client.state.Cluster; | |
| 47 | import com.allanbank.mongodb.client.state.ClusterPinger; | |
| 48 | import com.allanbank.mongodb.client.state.LatencyServerSelector; | |
| 49 | import com.allanbank.mongodb.client.state.Server; | |
| 50 | import com.allanbank.mongodb.client.state.ServerSelector; | |
| 51 | import com.allanbank.mongodb.util.IOUtils; | |
| 52 | import com.allanbank.mongodb.util.log.Log; | |
| 53 | import com.allanbank.mongodb.util.log.LogFactory; | |
| 54 | ||
| 55 | /** | |
| 56 | * Provides the ability to create connections to a shard configuration via | |
| 57 | * mongos servers. | |
| 58 | * | |
| 59 | * @api.no This class is <b>NOT</b> part of the driver's API. This class may be | |
| 60 | * mutated in incompatible ways between any two releases of the driver. | |
| 61 | * @copyright 2011-2014, Allanbank Consulting, Inc., All Rights Reserved | |
| 62 | */ | |
| 63 | public class ShardedConnectionFactory implements ConnectionFactory { | |
| 64 | ||
| 65 | /** The logger for the {@link ShardedConnectionFactory}. */ | |
| 66 | 1 | protected static final Log LOG = LogFactory |
| 67 | .getLog(ShardedConnectionFactory.class); | |
| 68 | ||
| 69 | /** The state of the cluster. */ | |
| 70 | protected final Cluster myCluster; | |
| 71 | ||
| 72 | /** The MongoDB client configuration. */ | |
| 73 | protected final MongoClientConfiguration myConfig; | |
| 74 | ||
| 75 | /** The factory to create proxied connections. */ | |
| 76 | protected final ProxiedConnectionFactory myConnectionFactory; | |
| 77 | ||
| 78 | /** Pings the servers in the cluster collecting latency and tags. */ | |
| 79 | protected final ClusterPinger myPinger; | |
| 80 | ||
| 81 | /** The selector for the mongos instance to use. */ | |
| 82 | protected final ServerSelector mySelector; | |
| 83 | ||
| 84 | /** | |
| 85 | * Creates a new {@link ShardedConnectionFactory}. | |
| 86 | * | |
| 87 | * @param factory | |
| 88 | * The factory to create proxied connections. | |
| 89 | * @param config | |
| 90 | * The initial configuration. | |
| 91 | */ | |
| 92 | public ShardedConnectionFactory(final ProxiedConnectionFactory factory, | |
| 93 | 16 | final MongoClientConfiguration config) { |
| 94 | 16 | myConnectionFactory = factory; |
| 95 | 16 | myConfig = config; |
| 96 | 16 | myCluster = createCluster(config); |
| 97 | 16 | mySelector = createSelector(); |
| 98 | 16 | myPinger = createClusterPinger(factory, config); |
| 99 | ||
| 100 | // Add all of the servers to the cluster. | |
| 101 | 16 | for (final InetSocketAddress address : config.getServerAddresses()) { |
| 102 | 16 | myCluster.add(address); |
| 103 | 16 | } |
| 104 | ||
| 105 | 16 | bootstrap(); |
| 106 | 16 | } |
| 107 | ||
| 108 | /** | |
| 109 | * Finds the mongos servers. | |
| 110 | */ | |
| 111 | public void bootstrap() { | |
| 112 | 16 | final BootstrapState state = createBootstrapState(); |
| 113 | 16 | if (!state.done()) { |
| 114 | 14 | for (final Server addr : myCluster.getServers()) { |
| 115 | 13 | Connection conn = null; |
| 116 | try { | |
| 117 | // Send the request... | |
| 118 | 13 | conn = myConnectionFactory.connect(addr, myConfig); |
| 119 | ||
| 120 | 12 | update(state, conn); |
| 121 | ||
| 122 | 8 | if (state.done()) { |
| 123 | break; | |
| 124 | } | |
| 125 | } | |
| 126 | 1 | catch (final IOException ioe) { |
| 127 | 1 | LOG.warn(ioe, "I/O error during sharded bootstrap to {}.", |
| 128 | addr); | |
| 129 | } | |
| 130 | 2 | catch (final MongoDbException me) { |
| 131 | 2 | LOG.warn(me, |
| 132 | "MongoDB error during sharded bootstrap to {}.", | |
| 133 | addr); | |
| 134 | } | |
| 135 | 1 | catch (final InterruptedException e) { |
| 136 | 1 | LOG.warn(e, "Interrupted during sharded bootstrap to {}.", |
| 137 | addr); | |
| 138 | } | |
| 139 | 1 | catch (final ExecutionException e) { |
| 140 | 1 | LOG.warn(e, "Error during sharded bootstrap to {}.", addr); |
| 141 | } | |
| 142 | finally { | |
| 143 | 10 | IOUtils.close(conn, Level.WARNING, |
| 144 | "I/O error shutting down sharded bootstrap connection to " | |
| 145 | + addr + "."); | |
| 146 | 8 | } |
| 147 | 8 | } |
| 148 | } | |
| 149 | ||
| 150 | // Last thing is to start the ping of servers. This will get the tags | |
| 151 | // and latencies updated. | |
| 152 | 16 | myPinger.initialSweep(myCluster); |
| 153 | 16 | myPinger.start(); |
| 154 | 16 | } |
| 155 | ||
| 156 | /** | |
| 157 | * {@inheritDoc} | |
| 158 | * <p> | |
| 159 | * Overridden to close the {@link ClusterPinger} and the | |
| 160 | * {@link ProxiedConnectionFactory}. | |
| 161 | * </p> | |
| 162 | */ | |
| 163 | @Override | |
| 164 | public void close() { | |
| 165 | 17 | IOUtils.close(myPinger); |
| 166 | 17 | IOUtils.close(myConnectionFactory); |
| 167 | 17 | } |
| 168 | ||
| 169 | /** | |
| 170 | * Creates a new connection to one of the sharded cluster's mongos servers. | |
| 171 | * | |
| 172 | * @see ConnectionFactory#connect() | |
| 173 | */ | |
| 174 | @Override | |
| 175 | public Connection connect() throws IOException { | |
| 176 | 5 | IOException lastError = null; |
| 177 | 5 | for (final Server server : mySelector.pickServers()) { |
| 178 | try { | |
| 179 | 3 | final Connection primaryConn = myConnectionFactory.connect( |
| 180 | server, myConfig); | |
| 181 | ||
| 182 | 2 | return wrap(primaryConn, server); |
| 183 | } | |
| 184 | 1 | catch (final IOException e) { |
| 185 | 1 | lastError = e; |
| 186 | } | |
| 187 | 1 | } |
| 188 | ||
| 189 | 3 | if (lastError != null) { |
| 190 | 1 | throw lastError; |
| 191 | } | |
| 192 | ||
| 193 | 2 | throw new IOException( |
| 194 | "Could not determine a shard server to connect to."); | |
| 195 | } | |
| 196 | ||
| 197 | /** | |
| 198 | * {@inheritDoc} | |
| 199 | * <p> | |
| 200 | * Overridden to return the {@link Cluster}. | |
| 201 | * </p> | |
| 202 | */ | |
| 203 | @Override | |
| 204 | public ClusterStats getClusterStats() { | |
| 205 | 0 | return myCluster; |
| 206 | } | |
| 207 | ||
| 208 | /** | |
| 209 | * {@inheritDoc} | |
| 210 | * <p> | |
| 211 | * Overridden to return {@link ClusterType#SHARDED} cluster type. | |
| 212 | * </p> | |
| 213 | */ | |
| 214 | @Override | |
| 215 | public ClusterType getClusterType() { | |
| 216 | 2 | return ClusterType.SHARDED; |
| 217 | } | |
| 218 | ||
| 219 | /** | |
| 220 | * {@inheritDoc} | |
| 221 | * <p> | |
| 222 | * Overridden to return the delegate's strategy but replace its state and | |
| 223 | * selector with our own. | |
| 224 | * </p> | |
| 225 | */ | |
| 226 | @Override | |
| 227 | public ReconnectStrategy getReconnectStrategy() { | |
| 228 | 2 | final ReconnectStrategy delegates = myConnectionFactory |
| 229 | .getReconnectStrategy(); | |
| 230 | ||
| 231 | 2 | delegates.setState(myCluster); |
| 232 | 2 | delegates.setSelector(mySelector); |
| 233 | 2 | delegates.setConnectionFactory(myConnectionFactory); |
| 234 | ||
| 235 | 2 | return delegates; |
| 236 | } | |
| 237 | ||
| 238 | /** | |
| 239 | * Creates a new {@link BootstrapState}. | |
| 240 | * | |
| 241 | * @return The {@link BootstrapState} to track state of loading the cluster | |
| 242 | * information. | |
| 243 | */ | |
| 244 | protected BootstrapState createBootstrapState() { | |
| 245 | 16 | return new BootstrapState(!myConfig.isAutoDiscoverServers()); |
| 246 | } | |
| 247 | ||
| 248 | /** | |
| 249 | * Creates a {@link Cluster} object to track the state of the servers across | |
| 250 | * the cluster. | |
| 251 | * | |
| 252 | * @param config | |
| 253 | * The configuration for the cluster. | |
| 254 | * @return The {@link Cluster} to track the servers across the cluster. | |
| 255 | */ | |
| 256 | protected Cluster createCluster(final MongoClientConfiguration config) { | |
| 257 | 16 | return new Cluster(config, ClusterType.SHARDED); |
| 258 | } | |
| 259 | ||
| 260 | /** | |
| 261 | * Creates a {@link ClusterPinger} object to periodically update the status | |
| 262 | * of the servers. | |
| 263 | * | |
| 264 | * @param factory | |
| 265 | * The factory for creating the connections to the servers. | |
| 266 | * @param config | |
| 267 | * The configuration for the client. | |
| 268 | * | |
| 269 | * @return The {@link ClusterPinger} object to periodically update the | |
| 270 | * status of the servers. | |
| 271 | */ | |
| 272 | protected ClusterPinger createClusterPinger( | |
| 273 | final ProxiedConnectionFactory factory, | |
| 274 | final MongoClientConfiguration config) { | |
| 275 | 16 | return new ClusterPinger(myCluster, factory, config); |
| 276 | } | |
| 277 | ||
| 278 | /** | |
| 279 | * Creates a {@link ServerSelector} object to select the (presumed) optimal | |
| 280 | * server to handle a request. | |
| 281 | * <p> | |
| 282 | * For a sharded cluster this defaults to the {@link LatencyServerSelector}. | |
| 283 | * </p> | |
| 284 | * | |
| 285 | * @return The {@link ServerSelector} object to select the (presumed) | |
| 286 | * optimal server to handle a request. | |
| 287 | */ | |
| 288 | protected ServerSelector createSelector() { | |
| 289 | 16 | return new LatencyServerSelector(myCluster, true); |
| 290 | } | |
| 291 | ||
| 292 | /** | |
| 293 | * Performs a find on the <tt>config</tt> database's <tt>mongos</tt> | |
| 294 | * collection to return the id for all of the mongos servers in the cluster. | |
| 295 | * <p> | |
| 296 | * A single mongos entry looks like: <blockquote> | |
| 297 | * | |
| 298 | * <pre> | |
| 299 | * <code> | |
| 300 | * { | |
| 301 | * "_id" : "mongos.example.com:27017", | |
| 302 | * "ping" : ISODate("2011-12-05T23:54:03.122Z"), | |
| 303 | * "up" : 330 | |
| 304 | * } | |
| 305 | * </code> | |
| 306 | * </pre> | |
| 307 | * | |
| 308 | * </blockquote> | |
| 309 | * | |
| 310 | * @param conn | |
| 311 | * The connection to request from. | |
| 312 | * @return True if at least one mongos server has been located. | |
| 313 | * @throws ExecutionException | |
| 314 | * On a failure to recover the response from the server. | |
| 315 | * @throws InterruptedException | |
| 316 | * On a failure to receive a response from the server. | |
| 317 | */ | |
| 318 | protected boolean findMongosServers(final Connection conn) | |
| 319 | throws InterruptedException, ExecutionException { | |
| 320 | 12 | boolean found = false; |
| 321 | ||
| 322 | // Create a query to pull all of the mongos servers out of the | |
| 323 | // config database. | |
| 324 | 12 | Message message = new Query("config", "mongos", Find.ALL, |
| 325 | /* fields= */null, /* batchSize= */0, | |
| 326 | /* limit= */0, /* numberToSkip= */0, /* tailable= */false, | |
| 327 | ReadPreference.PRIMARY, /* noCursorTimeout= */false, | |
| 328 | /* awaitData= */false, /* exhaust= */false, /* partial= */ | |
| 329 | false); | |
| 330 | ||
| 331 | 20 | while (message != null) { |
| 332 | // Send the request... | |
| 333 | 12 | final FutureReplyCallback future = new FutureReplyCallback(); |
| 334 | 12 | conn.send(message, future); |
| 335 | ||
| 336 | // Don't send it again. | |
| 337 | 10 | message = null; |
| 338 | ||
| 339 | // Receive the response. | |
| 340 | 10 | final Reply reply = future.get(); |
| 341 | ||
| 342 | // Validate and pull out the response information. | |
| 343 | 8 | final List<Document> docs = reply.getResults(); |
| 344 | 8 | for (final Document doc : docs) { |
| 345 | 11 | final Element idElem = doc.get("_id"); |
| 346 | 11 | if (idElem instanceof StringElement) { |
| 347 | 9 | final StringElement id = (StringElement) idElem; |
| 348 | ||
| 349 | 9 | myCluster.add(id.getValue()); |
| 350 | 9 | found = true; |
| 351 | ||
| 352 | 9 | LOG.debug("Adding shard mongos: {}", id.getValue()); |
| 353 | } | |
| 354 | 11 | } |
| 355 | ||
| 356 | // Cursor? | |
| 357 | 8 | if (reply.getCursorId() != 0) { |
| 358 | // Send a GetMore. | |
| 359 | 0 | message = new GetMore("config", "mongos", reply.getCursorId(), |
| 360 | 0, ReadPreference.PRIMARY); | |
| 361 | } | |
| 362 | 8 | } |
| 363 | ||
| 364 | 8 | return found; |
| 365 | } | |
| 366 | ||
| 367 | /** | |
| 368 | * Returns the {@link Cluster} tracking the state of the servers. | |
| 369 | * | |
| 370 | * @return The {@link Cluster} tracking the state of the servers. | |
| 371 | */ | |
| 372 | protected Cluster getCluster() { | |
| 373 | 3 | return myCluster; |
| 374 | } | |
| 375 | ||
| 376 | /** | |
| 377 | * Queries for the addresses for the {@code mongos} servers via the | |
| 378 | * {@link #findMongosServers(Connection)} method. | |
| 379 | * | |
| 380 | * @param state | |
| 381 | * The state of the bootstrap to be updated. | |
| 382 | * @param conn | |
| 383 | * The connection to use to locate the {@code mongos} servers | |
| 384 | * @throws InterruptedException | |
| 385 | * On a failure to wait for the reply to the query due to the | |
| 386 | * thread being interrupted. | |
| 387 | * @throws ExecutionException | |
| 388 | * On a failure to execute the query. | |
| 389 | */ | |
| 390 | protected void update(final BootstrapState state, final Connection conn) | |
| 391 | throws InterruptedException, ExecutionException { | |
| 392 | 12 | if (state.isMongosFound() || findMongosServers(conn)) { |
| 393 | 5 | state.setMongosFound(true); |
| 394 | } | |
| 395 | 8 | } |
| 396 | ||
| 397 | /** | |
| 398 | * Wraps the connection in a shard-aware connection. | |
| 399 | * | |
| 400 | * @param primaryConn | |
| 401 | * The primary shard connection. | |
| 402 | * @param server | |
| 403 | * The server the connection is connected to. | |
| 404 | * @return The wrapped connection. | |
| 405 | */ | |
| 406 | protected Connection wrap(final Connection primaryConn, final Server server) { | |
| 407 | 2 | return new ShardedConnection(primaryConn, server, myCluster, |
| 408 | mySelector, myConnectionFactory, myConfig); | |
| 409 | } | |
| 410 | ||
| 411 | /** | |
| 412 | * BootstrapState provides the ability to track the state of the bootstrap | |
| 413 | * for the sharded cluster. | |
| 414 | * | |
| 415 | * @copyright 2013-2014, Allanbank Consulting, Inc., All Rights Reserved | |
| 416 | */ | |
| 417 | protected static class BootstrapState { | |
| 418 | /** Tracks if the {@code mongos} servers have been located. */ | |
| 419 | private boolean myMongosFound; | |
| 420 | ||
| 421 | /** | |
| 422 | * Creates a new BootstrapState. | |
| 423 | * | |
| 424 | * @param mongosFound | |
| 425 | * Initial state: true if the {@code mongos} servers need not be located. | |
| 426 | */ | |
| 427 | 16 | protected BootstrapState(final boolean mongosFound) { |
| 428 | 16 | myMongosFound = mongosFound; |
| 429 | 16 | } |
| 430 | ||
| 431 | /** | |
| 432 | * Indicates when the bootstrap is complete. | |
| 433 | * <p> | |
| 434 | * This method returns true if auto discovery is turned off or (if on) | |
| 435 | * when all of the {@code mongos} servers have been located. | |
| 436 | * | |
| 437 | * @return True once the boot strap is complete. | |
| 438 | */ | |
| 439 | public boolean done() { | |
| 440 | 24 | return myMongosFound; |
| 441 | } | |
| 442 | ||
| 443 | /** | |
| 444 | * Returns true if the {@code mongos} servers have been found, false | |
| 445 | * otherwise. | |
| 446 | * | |
| 447 | * @return True if the {@code mongos} servers have been found, false | |
| 448 | * otherwise. | |
| 449 | */ | |
| 450 | public boolean isMongosFound() { | |
| 451 | 12 | return myMongosFound; |
| 452 | } | |
| 453 | ||
| 454 | /** | |
| 455 | * Sets if the {@code mongos} servers have been found. | |
| 456 | * | |
| 457 | * @param mongosFound | |
| 458 | * If true, the {@code mongos} servers have been found, false | |
| 459 | * otherwise. | |
| 460 | */ | |
| 461 | public void setMongosFound(final boolean mongosFound) { | |
| 462 | 5 | myMongosFound = mongosFound; |
| 463 | 5 | } |
| 464 | } | |
| 465 | } |