1 /*
2 * #%L
3 * ShardedConnectionFactory.java - mongodb-async-driver - Allanbank Consulting, Inc.
4 * %%
5 * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6 * %%
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * #L%
19 */
20 package com.allanbank.mongodb.client.connection.sharded;
21
22 import java.io.IOException;
23 import java.net.InetSocketAddress;
24 import java.util.List;
25 import java.util.concurrent.ExecutionException;
26 import java.util.logging.Level;
27
28 import com.allanbank.mongodb.MongoClientConfiguration;
29 import com.allanbank.mongodb.MongoDbException;
30 import com.allanbank.mongodb.ReadPreference;
31 import com.allanbank.mongodb.bson.Document;
32 import com.allanbank.mongodb.bson.Element;
33 import com.allanbank.mongodb.bson.element.StringElement;
34 import com.allanbank.mongodb.builder.Find;
35 import com.allanbank.mongodb.client.ClusterStats;
36 import com.allanbank.mongodb.client.ClusterType;
37 import com.allanbank.mongodb.client.Message;
38 import com.allanbank.mongodb.client.callback.FutureReplyCallback;
39 import com.allanbank.mongodb.client.connection.Connection;
40 import com.allanbank.mongodb.client.connection.ConnectionFactory;
41 import com.allanbank.mongodb.client.connection.ReconnectStrategy;
42 import com.allanbank.mongodb.client.connection.proxy.ProxiedConnectionFactory;
43 import com.allanbank.mongodb.client.message.GetMore;
44 import com.allanbank.mongodb.client.message.Query;
45 import com.allanbank.mongodb.client.message.Reply;
46 import com.allanbank.mongodb.client.state.Cluster;
47 import com.allanbank.mongodb.client.state.ClusterPinger;
48 import com.allanbank.mongodb.client.state.LatencyServerSelector;
49 import com.allanbank.mongodb.client.state.Server;
50 import com.allanbank.mongodb.client.state.ServerSelector;
51 import com.allanbank.mongodb.util.IOUtils;
52 import com.allanbank.mongodb.util.log.Log;
53 import com.allanbank.mongodb.util.log.LogFactory;
54
55 /**
56 * Provides the ability to create connections to a shard configuration via
57 * mongos servers.
58 *
59 * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
60 * mutated in incompatible ways between any two releases of the driver.
61 * @copyright 2011-2014, Allanbank Consulting, Inc., All Rights Reserved
62 */
63 public class ShardedConnectionFactory implements ConnectionFactory {
64
65 /** The logger for the {@link ShardedConnectionFactory}. */
66 protected static final Log LOG = LogFactory
67 .getLog(ShardedConnectionFactory.class);
68
69 /** The state of the cluster. */
70 protected final Cluster myCluster;
71
72 /** The MongoDB client configuration. */
73 protected final MongoClientConfiguration myConfig;
74
75 /** The factory to create proxied connections. */
76 protected final ProxiedConnectionFactory myConnectionFactory;
77
78 /** Pings the servers in the cluster collecting latency and tags. */
79 protected final ClusterPinger myPinger;
80
81 /** The selector for the mongos instance to use. */
82 protected final ServerSelector mySelector;
83
84 /**
85 * Creates a new {@link ShardedConnectionFactory}.
86 *
87 * @param factory
88 * The factory to create proxied connections.
89 * @param config
90 * The initial configuration.
91 */
92 public ShardedConnectionFactory(final ProxiedConnectionFactory factory,
93 final MongoClientConfiguration config) {
94 myConnectionFactory = factory;
95 myConfig = config;
96 myCluster = createCluster(config);
97 mySelector = createSelector();
98 myPinger = createClusterPinger(factory, config);
99
100 // Add all of the servers to the cluster.
101 for (final InetSocketAddress address : config.getServerAddresses()) {
102 myCluster.add(address);
103 }
104
105 bootstrap();
106 }
107
108 /**
109 * Finds the mongos servers.
110 */
111 public void bootstrap() {
112 final BootstrapState state = createBootstrapState();
113 if (!state.done()) {
114 for (final Server addr : myCluster.getServers()) {
115 Connection conn = null;
116 try {
117 // Send the request...
118 conn = myConnectionFactory.connect(addr, myConfig);
119
120 update(state, conn);
121
122 if (state.done()) {
123 break;
124 }
125 }
126 catch (final IOException ioe) {
127 LOG.warn(ioe, "I/O error during sharded bootstrap to {}.",
128 addr);
129 }
130 catch (final MongoDbException me) {
131 LOG.warn(me,
132 "MongoDB error during sharded bootstrap to {}.",
133 addr);
134 }
135 catch (final InterruptedException e) {
136 LOG.warn(e, "Interrupted during sharded bootstrap to {}.",
137 addr);
138 }
139 catch (final ExecutionException e) {
140 LOG.warn(e, "Error during sharded bootstrap to {}.", addr);
141 }
142 finally {
143 IOUtils.close(conn, Level.WARNING,
144 "I/O error shutting down sharded bootstrap connection to "
145 + addr + ".");
146 }
147 }
148 }
149
150 // Last thing is to start the ping of servers. This will get the tags
151 // and latencies updated.
152 myPinger.initialSweep(myCluster);
153 myPinger.start();
154 }
155
156 /**
157 * {@inheritDoc}
158 * <p>
159 * Overridden to close the cluster state and the
160 * {@link ProxiedConnectionFactory}.
161 * </p>
162 */
163 @Override
164 public void close() {
165 IOUtils.close(myPinger);
166 IOUtils.close(myConnectionFactory);
167 }
168
169 /**
170 * Creates a new connection to the shared mongos servers.
171 *
172 * @see ConnectionFactory#connect()
173 */
174 @Override
175 public Connection connect() throws IOException {
176 IOException lastError = null;
177 for (final Server server : mySelector.pickServers()) {
178 try {
179 final Connection primaryConn = myConnectionFactory.connect(
180 server, myConfig);
181
182 return wrap(primaryConn, server);
183 }
184 catch (final IOException e) {
185 lastError = e;
186 }
187 }
188
189 if (lastError != null) {
190 throw lastError;
191 }
192
193 throw new IOException(
194 "Could not determine a shard server to connect to.");
195 }
196
197 /**
198 * {@inheritDoc}
199 * <p>
200 * Overridden to return the {@link Cluster}.
201 * </p>
202 */
203 @Override
204 public ClusterStats getClusterStats() {
205 return myCluster;
206 }
207
208 /**
209 * {@inheritDoc}
210 * <p>
211 * Overridden to return {@link ClusterType#SHARDED} cluster type.
212 * </p>
213 */
214 @Override
215 public ClusterType getClusterType() {
216 return ClusterType.SHARDED;
217 }
218
219 /**
220 * {@inheritDoc}
221 * <p>
222 * Overridden to return the delegates strategy but replace his state and
223 * selector with our own.
224 * </p>
225 */
226 @Override
227 public ReconnectStrategy getReconnectStrategy() {
228 final ReconnectStrategy delegates = myConnectionFactory
229 .getReconnectStrategy();
230
231 delegates.setState(myCluster);
232 delegates.setSelector(mySelector);
233 delegates.setConnectionFactory(myConnectionFactory);
234
235 return delegates;
236 }
237
238 /**
239 * Creates a new {@link BootstrapState}.
240 *
241 * @return The {@link BootstrapState} to track state of loading the cluster
242 * information.
243 */
244 protected BootstrapState createBootstrapState() {
245 return new BootstrapState(!myConfig.isAutoDiscoverServers());
246 }
247
248 /**
249 * Creates a {@link Cluster} object to track the state of the servers across
250 * the cluster.
251 *
252 * @param config
253 * The configuration for the cluster.
254 * @return The {@link Cluster} to track the servers across the cluster.
255 */
256 protected Cluster createCluster(final MongoClientConfiguration config) {
257 return new Cluster(config, ClusterType.SHARDED);
258 }
259
260 /**
261 * Creates a {@link ClusterPinger} object to periodically update the status
262 * of the servers.
263 *
264 * @param factory
265 * The factory for creating the connections to the servers.
266 * @param config
267 * The configuration for the client.
268 *
269 * @return The {@link ClusterPinger} object to periodically update the
270 * status of the servers.
271 */
272 protected ClusterPinger createClusterPinger(
273 final ProxiedConnectionFactory factory,
274 final MongoClientConfiguration config) {
275 return new ClusterPinger(myCluster, factory, config);
276 }
277
278 /**
279 * Creates a {@link ServerSelector} object to select the (presumed) optimal
280 * server to handle a request.
281 * <p>
282 * For a sharded cluster this defaults to the {@link LatencyServerSelector}.
283 * </p>
284 *
285 * @return The {@link ServerSelector} object to select the (presumed)
286 * optimal server to handle a request.
287 */
288 protected ServerSelector createSelector() {
289 return new LatencyServerSelector(myCluster, true);
290 }
291
292 /**
293 * Performs a find on the <tt>config</tt> database's <tt>mongos</tt>
294 * collection to return the id for all of the mongos servers in the cluster.
295 * <p>
296 * A single mongos entry looks like: <blockquote>
297 *
298 * <pre>
299 * <code>
300 * {
301 * "_id" : "mongos.example.com:27017",
302 * "ping" : ISODate("2011-12-05T23:54:03.122Z"),
303 * "up" : 330
304 * }
305 * </code>
306 * </pre>
307 *
308 * </blockquote>
309 *
310 * @param conn
311 * The connection to request from.
312 * @return True if the configuration servers have been determined.
313 * @throws ExecutionException
314 * On a failure to recover the response from the server.
315 * @throws InterruptedException
316 * On a failure to receive a response from the server.
317 */
318 protected boolean findMongosServers(final Connection conn)
319 throws InterruptedException, ExecutionException {
320 boolean found = false;
321
322 // Create a query to pull all of the mongos servers out of the
323 // config database.
324 Message message = new Query("config", "mongos", Find.ALL,
325 /* fields= */null, /* batchSize= */0,
326 /* limit= */0, /* numberToSkip= */0, /* tailable= */false,
327 ReadPreference.PRIMARY, /* noCursorTimeout= */false,
328 /* awaitData= */false, /* exhaust= */false, /* partial= */
329 false);
330
331 while (message != null) {
332 // Send the request...
333 final FutureReplyCallback future = new FutureReplyCallback();
334 conn.send(message, future);
335
336 // Don's send it again.
337 message = null;
338
339 // Receive the response.
340 final Reply reply = future.get();
341
342 // Validate and pull out the response information.
343 final List<Document> docs = reply.getResults();
344 for (final Document doc : docs) {
345 final Element idElem = doc.get("_id");
346 if (idElem instanceof StringElement) {
347 final StringElement id = (StringElement) idElem;
348
349 myCluster.add(id.getValue());
350 found = true;
351
352 LOG.debug("Adding shard mongos: {}", id.getValue());
353 }
354 }
355
356 // Cursor?
357 if (reply.getCursorId() != 0) {
358 // Send a GetMore.
359 message = new GetMore("config", "mongos", reply.getCursorId(),
360 0, ReadPreference.PRIMARY);
361 }
362 }
363
364 return found;
365 }
366
367 /**
368 * Returns the clusterState value.
369 *
370 * @return The clusterState value.
371 */
372 protected Cluster getCluster() {
373 return myCluster;
374 }
375
376 /**
377 * Queries for the addresses for the {@code mongos} servers via the
378 * {@link #findMongosServers(Connection)} method.
379 *
380 * @param state
381 * The state of the bootstrap to be updated.
382 * @param conn
383 * The connection to use to locate the {@code mongos} servers
384 * @throws InterruptedException
385 * On a failure to wait for the reply to the query due to the
386 * thread being interrupted.
387 * @throws ExecutionException
388 * On a failure to execute the query.
389 */
390 protected void update(final BootstrapState state, final Connection conn)
391 throws InterruptedException, ExecutionException {
392 if (state.isMongosFound() || findMongosServers(conn)) {
393 state.setMongosFound(true);
394 }
395 }
396
397 /**
398 * Wraps the connection in a shard-aware connection.
399 *
400 * @param primaryConn
401 * The primary shard connection.
402 * @param server
403 * The server the connection is connected to.
404 * @return The wrapped connection.
405 */
406 protected Connection wrap(final Connection primaryConn, final Server server) {
407 return new ShardedConnection(primaryConn, server, myCluster,
408 mySelector, myConnectionFactory, myConfig);
409 }
410
411 /**
412 * BootstrapState provides the ability to track the state of the bootstrap
413 * for the sharded cluster.
414 *
415 * @copyright 2013-2014, Allanbank Consulting, Inc., All Rights Reserved
416 */
417 protected static class BootstrapState {
418 /** Tracks if the {@code mongos} servers have been located. */
419 private boolean myMongosFound;
420
421 /**
422 * Creates a new BootstrapState.
423 *
424 * @param mongosFound
425 * Initials if we should look for the {@code mongos} servers.
426 */
427 protected BootstrapState(final boolean mongosFound) {
428 myMongosFound = mongosFound;
429 }
430
431 /**
432 * Indicates when the bootstrap is complete.
433 * <p>
434 * This method returns true if auto discovery is turned off or (if on)
435 * when all of the {@code mongos} servers have been located.
436 *
437 * @return True once the boot strap is complete.
438 */
439 public boolean done() {
440 return myMongosFound;
441 }
442
443 /**
444 * Returns true if the {@code mongos} servers have been found, false
445 * otherwise.
446 *
447 * @return True if the {@code mongos} servers have been found, false
448 * otherwise.
449 */
450 public boolean isMongosFound() {
451 return myMongosFound;
452 }
453
454 /**
455 * Sets if the the {@code mongos} servers have been found.
456 *
457 * @param mongosFound
458 * If true, the {@code mongos} servers have been found, false
459 * otherwise.
460 */
461 public void setMongosFound(final boolean mongosFound) {
462 myMongosFound = mongosFound;
463 }
464 }
465 }