View Javadoc
1   /*
2    * #%L
3    * ShardedConnectionFactory.java - mongodb-async-driver - Allanbank Consulting, Inc.
4    * %%
5    * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6    * %%
7    * Licensed under the Apache License, Version 2.0 (the "License");
8    * you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   * 
11   *      http://www.apache.org/licenses/LICENSE-2.0
12   * 
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   * #L%
19   */
20  package com.allanbank.mongodb.client.connection.sharded;
21  
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.util.List;
25  import java.util.concurrent.ExecutionException;
26  import java.util.logging.Level;
27  
28  import com.allanbank.mongodb.MongoClientConfiguration;
29  import com.allanbank.mongodb.MongoDbException;
30  import com.allanbank.mongodb.ReadPreference;
31  import com.allanbank.mongodb.bson.Document;
32  import com.allanbank.mongodb.bson.Element;
33  import com.allanbank.mongodb.bson.element.StringElement;
34  import com.allanbank.mongodb.builder.Find;
35  import com.allanbank.mongodb.client.ClusterStats;
36  import com.allanbank.mongodb.client.ClusterType;
37  import com.allanbank.mongodb.client.Message;
38  import com.allanbank.mongodb.client.callback.FutureReplyCallback;
39  import com.allanbank.mongodb.client.connection.Connection;
40  import com.allanbank.mongodb.client.connection.ConnectionFactory;
41  import com.allanbank.mongodb.client.connection.ReconnectStrategy;
42  import com.allanbank.mongodb.client.connection.proxy.ProxiedConnectionFactory;
43  import com.allanbank.mongodb.client.message.GetMore;
44  import com.allanbank.mongodb.client.message.Query;
45  import com.allanbank.mongodb.client.message.Reply;
46  import com.allanbank.mongodb.client.state.Cluster;
47  import com.allanbank.mongodb.client.state.ClusterPinger;
48  import com.allanbank.mongodb.client.state.LatencyServerSelector;
49  import com.allanbank.mongodb.client.state.Server;
50  import com.allanbank.mongodb.client.state.ServerSelector;
51  import com.allanbank.mongodb.util.IOUtils;
52  import com.allanbank.mongodb.util.log.Log;
53  import com.allanbank.mongodb.util.log.LogFactory;
54  
55  /**
56   * Provides the ability to create connections to a shard configuration via
57   * mongos servers.
58   * 
59   * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
60   *         mutated in incompatible ways between any two releases of the driver.
61   * @copyright 2011-2014, Allanbank Consulting, Inc., All Rights Reserved
62   */
63  public class ShardedConnectionFactory implements ConnectionFactory {
64  
65      /** The logger for the {@link ShardedConnectionFactory}. */
66      protected static final Log LOG = LogFactory
67              .getLog(ShardedConnectionFactory.class);
68  
69      /** The state of the cluster. */
70      protected final Cluster myCluster;
71  
72      /** The MongoDB client configuration. */
73      protected final MongoClientConfiguration myConfig;
74  
75      /** The factory to create proxied connections. */
76      protected final ProxiedConnectionFactory myConnectionFactory;
77  
78      /** Pings the servers in the cluster collecting latency and tags. */
79      protected final ClusterPinger myPinger;
80  
81      /** The selector for the mongos instance to use. */
82      protected final ServerSelector mySelector;
83  
84      /**
85       * Creates a new {@link ShardedConnectionFactory}.
86       * 
87       * @param factory
88       *            The factory to create proxied connections.
89       * @param config
90       *            The initial configuration.
91       */
92      public ShardedConnectionFactory(final ProxiedConnectionFactory factory,
93              final MongoClientConfiguration config) {
94          myConnectionFactory = factory;
95          myConfig = config;
96          myCluster = createCluster(config);
97          mySelector = createSelector();
98          myPinger = createClusterPinger(factory, config);
99  
100         // Add all of the servers to the cluster.
101         for (final InetSocketAddress address : config.getServerAddresses()) {
102             myCluster.add(address);
103         }
104 
105         bootstrap();
106     }
107 
108     /**
109      * Finds the mongos servers.
110      */
111     public void bootstrap() {
112         final BootstrapState state = createBootstrapState();
113         if (!state.done()) {
114             for (final Server addr : myCluster.getServers()) {
115                 Connection conn = null;
116                 try {
117                     // Send the request...
118                     conn = myConnectionFactory.connect(addr, myConfig);
119 
120                     update(state, conn);
121 
122                     if (state.done()) {
123                         break;
124                     }
125                 }
126                 catch (final IOException ioe) {
127                     LOG.warn(ioe, "I/O error during sharded bootstrap to {}.",
128                             addr);
129                 }
130                 catch (final MongoDbException me) {
131                     LOG.warn(me,
132                             "MongoDB error during sharded bootstrap to {}.",
133                             addr);
134                 }
135                 catch (final InterruptedException e) {
136                     LOG.warn(e, "Interrupted during sharded bootstrap to {}.",
137                             addr);
138                 }
139                 catch (final ExecutionException e) {
140                     LOG.warn(e, "Error during sharded bootstrap to {}.", addr);
141                 }
142                 finally {
143                     IOUtils.close(conn, Level.WARNING,
144                             "I/O error shutting down sharded bootstrap connection to "
145                                     + addr + ".");
146                 }
147             }
148         }
149 
150         // Last thing is to start the ping of servers. This will get the tags
151         // and latencies updated.
152         myPinger.initialSweep(myCluster);
153         myPinger.start();
154     }
155 
156     /**
157      * {@inheritDoc}
158      * <p>
159      * Overridden to close the cluster state and the
160      * {@link ProxiedConnectionFactory}.
161      * </p>
162      */
163     @Override
164     public void close() {
165         IOUtils.close(myPinger);
166         IOUtils.close(myConnectionFactory);
167     }
168 
169     /**
170      * Creates a new connection to the shared mongos servers.
171      * 
172      * @see ConnectionFactory#connect()
173      */
174     @Override
175     public Connection connect() throws IOException {
176         IOException lastError = null;
177         for (final Server server : mySelector.pickServers()) {
178             try {
179                 final Connection primaryConn = myConnectionFactory.connect(
180                         server, myConfig);
181 
182                 return wrap(primaryConn, server);
183             }
184             catch (final IOException e) {
185                 lastError = e;
186             }
187         }
188 
189         if (lastError != null) {
190             throw lastError;
191         }
192 
193         throw new IOException(
194                 "Could not determine a shard server to connect to.");
195     }
196 
197     /**
198      * {@inheritDoc}
199      * <p>
200      * Overridden to return the {@link Cluster}.
201      * </p>
202      */
203     @Override
204     public ClusterStats getClusterStats() {
205         return myCluster;
206     }
207 
208     /**
209      * {@inheritDoc}
210      * <p>
211      * Overridden to return {@link ClusterType#SHARDED} cluster type.
212      * </p>
213      */
214     @Override
215     public ClusterType getClusterType() {
216         return ClusterType.SHARDED;
217     }
218 
219     /**
220      * {@inheritDoc}
221      * <p>
222      * Overridden to return the delegates strategy but replace his state and
223      * selector with our own.
224      * </p>
225      */
226     @Override
227     public ReconnectStrategy getReconnectStrategy() {
228         final ReconnectStrategy delegates = myConnectionFactory
229                 .getReconnectStrategy();
230 
231         delegates.setState(myCluster);
232         delegates.setSelector(mySelector);
233         delegates.setConnectionFactory(myConnectionFactory);
234 
235         return delegates;
236     }
237 
238     /**
239      * Creates a new {@link BootstrapState}.
240      * 
241      * @return The {@link BootstrapState} to track state of loading the cluster
242      *         information.
243      */
244     protected BootstrapState createBootstrapState() {
245         return new BootstrapState(!myConfig.isAutoDiscoverServers());
246     }
247 
248     /**
249      * Creates a {@link Cluster} object to track the state of the servers across
250      * the cluster.
251      * 
252      * @param config
253      *            The configuration for the cluster.
254      * @return The {@link Cluster} to track the servers across the cluster.
255      */
256     protected Cluster createCluster(final MongoClientConfiguration config) {
257         return new Cluster(config, ClusterType.SHARDED);
258     }
259 
260     /**
261      * Creates a {@link ClusterPinger} object to periodically update the status
262      * of the servers.
263      * 
264      * @param factory
265      *            The factory for creating the connections to the servers.
266      * @param config
267      *            The configuration for the client.
268      * 
269      * @return The {@link ClusterPinger} object to periodically update the
270      *         status of the servers.
271      */
272     protected ClusterPinger createClusterPinger(
273             final ProxiedConnectionFactory factory,
274             final MongoClientConfiguration config) {
275         return new ClusterPinger(myCluster, factory, config);
276     }
277 
278     /**
279      * Creates a {@link ServerSelector} object to select the (presumed) optimal
280      * server to handle a request.
281      * <p>
282      * For a sharded cluster this defaults to the {@link LatencyServerSelector}.
283      * </p>
284      * 
285      * @return The {@link ServerSelector} object to select the (presumed)
286      *         optimal server to handle a request.
287      */
288     protected ServerSelector createSelector() {
289         return new LatencyServerSelector(myCluster, true);
290     }
291 
292     /**
293      * Performs a find on the <tt>config</tt> database's <tt>mongos</tt>
294      * collection to return the id for all of the mongos servers in the cluster.
295      * <p>
296      * A single mongos entry looks like: <blockquote>
297      * 
298      * <pre>
299      * <code>
300      * {
301      *     "_id" : "mongos.example.com:27017",
302      *     "ping" : ISODate("2011-12-05T23:54:03.122Z"),
303      *     "up" : 330
304      * }
305      * </code>
306      * </pre>
307      * 
308      * </blockquote>
309      * 
310      * @param conn
311      *            The connection to request from.
312      * @return True if the configuration servers have been determined.
313      * @throws ExecutionException
314      *             On a failure to recover the response from the server.
315      * @throws InterruptedException
316      *             On a failure to receive a response from the server.
317      */
318     protected boolean findMongosServers(final Connection conn)
319             throws InterruptedException, ExecutionException {
320         boolean found = false;
321 
322         // Create a query to pull all of the mongos servers out of the
323         // config database.
324         Message message = new Query("config", "mongos", Find.ALL,
325         /* fields= */null, /* batchSize= */0,
326         /* limit= */0, /* numberToSkip= */0, /* tailable= */false,
327                 ReadPreference.PRIMARY, /* noCursorTimeout= */false,
328                 /* awaitData= */false, /* exhaust= */false, /* partial= */
329                 false);
330 
331         while (message != null) {
332             // Send the request...
333             final FutureReplyCallback future = new FutureReplyCallback();
334             conn.send(message, future);
335 
336             // Don's send it again.
337             message = null;
338 
339             // Receive the response.
340             final Reply reply = future.get();
341 
342             // Validate and pull out the response information.
343             final List<Document> docs = reply.getResults();
344             for (final Document doc : docs) {
345                 final Element idElem = doc.get("_id");
346                 if (idElem instanceof StringElement) {
347                     final StringElement id = (StringElement) idElem;
348 
349                     myCluster.add(id.getValue());
350                     found = true;
351 
352                     LOG.debug("Adding shard mongos: {}", id.getValue());
353                 }
354             }
355 
356             // Cursor?
357             if (reply.getCursorId() != 0) {
358                 // Send a GetMore.
359                 message = new GetMore("config", "mongos", reply.getCursorId(),
360                         0, ReadPreference.PRIMARY);
361             }
362         }
363 
364         return found;
365     }
366 
367     /**
368      * Returns the clusterState value.
369      * 
370      * @return The clusterState value.
371      */
372     protected Cluster getCluster() {
373         return myCluster;
374     }
375 
376     /**
377      * Queries for the addresses for the {@code mongos} servers via the
378      * {@link #findMongosServers(Connection)} method.
379      * 
380      * @param state
381      *            The state of the bootstrap to be updated.
382      * @param conn
383      *            The connection to use to locate the {@code mongos} servers
384      * @throws InterruptedException
385      *             On a failure to wait for the reply to the query due to the
386      *             thread being interrupted.
387      * @throws ExecutionException
388      *             On a failure to execute the query.
389      */
390     protected void update(final BootstrapState state, final Connection conn)
391             throws InterruptedException, ExecutionException {
392         if (state.isMongosFound() || findMongosServers(conn)) {
393             state.setMongosFound(true);
394         }
395     }
396 
397     /**
398      * Wraps the connection in a shard-aware connection.
399      * 
400      * @param primaryConn
401      *            The primary shard connection.
402      * @param server
403      *            The server the connection is connected to.
404      * @return The wrapped connection.
405      */
406     protected Connection wrap(final Connection primaryConn, final Server server) {
407         return new ShardedConnection(primaryConn, server, myCluster,
408                 mySelector, myConnectionFactory, myConfig);
409     }
410 
411     /**
412      * BootstrapState provides the ability to track the state of the bootstrap
413      * for the sharded cluster.
414      * 
415      * @copyright 2013-2014, Allanbank Consulting, Inc., All Rights Reserved
416      */
417     protected static class BootstrapState {
418         /** Tracks if the {@code mongos} servers have been located. */
419         private boolean myMongosFound;
420 
421         /**
422          * Creates a new BootstrapState.
423          * 
424          * @param mongosFound
425          *            Initials if we should look for the {@code mongos} servers.
426          */
427         protected BootstrapState(final boolean mongosFound) {
428             myMongosFound = mongosFound;
429         }
430 
431         /**
432          * Indicates when the bootstrap is complete.
433          * <p>
434          * This method returns true if auto discovery is turned off or (if on)
435          * when all of the {@code mongos} servers have been located.
436          * 
437          * @return True once the boot strap is complete.
438          */
439         public boolean done() {
440             return myMongosFound;
441         }
442 
443         /**
444          * Returns true if the {@code mongos} servers have been found, false
445          * otherwise.
446          * 
447          * @return True if the {@code mongos} servers have been found, false
448          *         otherwise.
449          */
450         public boolean isMongosFound() {
451             return myMongosFound;
452         }
453 
454         /**
455          * Sets if the the {@code mongos} servers have been found.
456          * 
457          * @param mongosFound
458          *            If true, the {@code mongos} servers have been found, false
459          *            otherwise.
460          */
461         public void setMongosFound(final boolean mongosFound) {
462             myMongosFound = mongosFound;
463         }
464     }
465 }