View Javadoc
1   /*
2    * #%L
3    * BootstrapConnectionFactory.java - mongodb-async-driver - Allanbank Consulting, Inc.
4    * %%
5    * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6    * %%
7    * Licensed under the Apache License, Version 2.0 (the "License");
8    * you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   * 
11   *      http://www.apache.org/licenses/LICENSE-2.0
12   * 
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   * #L%
19   */
20  package com.allanbank.mongodb.client.connection.bootstrap;
21  
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.util.List;
25  import java.util.concurrent.ExecutionException;
26  import java.util.logging.Level;
27  
28  import com.allanbank.mongodb.MongoClientConfiguration;
29  import com.allanbank.mongodb.bson.Document;
30  import com.allanbank.mongodb.bson.Element;
31  import com.allanbank.mongodb.bson.element.StringElement;
32  import com.allanbank.mongodb.client.ClusterStats;
33  import com.allanbank.mongodb.client.ClusterType;
34  import com.allanbank.mongodb.client.callback.FutureReplyCallback;
35  import com.allanbank.mongodb.client.connection.Connection;
36  import com.allanbank.mongodb.client.connection.ConnectionFactory;
37  import com.allanbank.mongodb.client.connection.ReconnectStrategy;
38  import com.allanbank.mongodb.client.connection.auth.AuthenticationConnectionFactory;
39  import com.allanbank.mongodb.client.connection.proxy.ProxiedConnectionFactory;
40  import com.allanbank.mongodb.client.connection.rs.ReplicaSetConnectionFactory;
41  import com.allanbank.mongodb.client.connection.sharded.ShardedConnectionFactory;
42  import com.allanbank.mongodb.client.connection.socket.SocketConnectionFactory;
43  import com.allanbank.mongodb.client.message.IsMaster;
44  import com.allanbank.mongodb.client.message.Reply;
45  import com.allanbank.mongodb.client.state.Cluster;
46  import com.allanbank.mongodb.error.CannotConnectException;
47  import com.allanbank.mongodb.util.IOUtils;
48  import com.allanbank.mongodb.util.log.Log;
49  import com.allanbank.mongodb.util.log.LogFactory;
50  
51  /**
52   * Provides the ability to bootstrap into the appropriate
53   * {@link ConnectionFactory} based on the configuration of the server(s)
54   * connected to.
55   * 
56   * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
57   *         mutated in incompatible ways between any two releases of the driver.
58   * @copyright 2011-2014, Allanbank Consulting, Inc., All Rights Reserved
59   */
60  public class BootstrapConnectionFactory implements ConnectionFactory {
61  
62      /** The logger for the {@link BootstrapConnectionFactory}. */
63      protected static final Log LOG = LogFactory
64              .getLog(BootstrapConnectionFactory.class);
65  
66      /** The configuration for the connections to be created. */
67      private final MongoClientConfiguration myConfig;
68  
69      /** The delegate connection factory post */
70      private ConnectionFactory myDelegate = null;
71  
72      /**
73       * Creates a {@link BootstrapConnectionFactory}
74       * 
75       * @param config
76       *            The configuration to use in discovering the server
77       *            configuration.
78       */
79      public BootstrapConnectionFactory(final MongoClientConfiguration config) {
80          myConfig = config;
81      }
82  
83      /**
84       * {@inheritDoc}
85       * <p>
86       * Overridden to close the delegate {@link ConnectionFactory}.
87       * </p>
88       */
89      @Override
90      public void close() {
91          IOUtils.close(myDelegate);
92      }
93  
94      /**
95       * {@inheritDoc}
96       * <p>
97       * Delegates the connection to the setup delegate.
98       * </p>
99       */
100     @Override
101     public Connection connect() throws IOException {
102         return getDelegate().connect();
103     }
104 
105     /**
106      * {@inheritDoc}
107      * <p>
108      * Overridden to return the cluster stats of the proxied
109      * {@link ConnectionFactory}.
110      * </p>
111      */
112     @Override
113     public ClusterStats getClusterStats() {
114         return getDelegate().getClusterStats();
115     }
116 
117     /**
118      * {@inheritDoc}
119      * <p>
120      * Overridden to return the cluster type of the delegate
121      * {@link ConnectionFactory}.
122      * </p>
123      */
124     @Override
125     public ClusterType getClusterType() {
126         return getDelegate().getClusterType();
127     }
128 
129     /**
130      * {@inheritDoc}
131      * <p>
132      * Overridden to return the delegates strategy.
133      * </p>
134      */
135     @Override
136     public ReconnectStrategy getReconnectStrategy() {
137         return getDelegate().getReconnectStrategy();
138     }
139 
140     /**
141      * Re-bootstraps the environment. Normally this method is only called once
142      * during the constructor of the factory to initialize the delegate but
143      * users can reset the delegate by manually invoking this method.
144      * <p>
145      * A bootstrap will issue one commands to the first working MongoDB process.
146      * The reply to the {@link IsMaster} command is used to detect connecting to
147      * a mongos <tt>process</tt> and by extension a Sharded configuration.
148      * </p>
149      * <p>
150      * If not using a Sharded configuration then the server status is checked
151      * for a <tt>repl</tt> element. If present a Replication Set configuration
152      * is assumed.
153      * </p>
154      * <p>
155      * If neither a Sharded or Replication Set is being used then a plain socket
156      * connection factory is used.
157      * </p>
158      */
159     protected void bootstrap() {
160         final SocketConnectionFactory socketFactory = new SocketConnectionFactory(
161                 myConfig);
162         ProxiedConnectionFactory factory = socketFactory;
163 
164         // Authentication has to be right on top of the physical
165         // connection.
166         if (myConfig.isAuthenticating()) {
167             factory = new AuthenticationConnectionFactory(factory, myConfig);
168         }
169 
170         try {
171             // Use the socket factories cluster.
172             final Cluster cluster = socketFactory.getCluster();
173             for (final InetSocketAddress addr : myConfig.getServerAddresses()) {
174                 Connection conn = null;
175                 final FutureReplyCallback future = new FutureReplyCallback();
176                 try {
177                     conn = factory.connect(cluster.add(addr), myConfig);
178 
179                     conn.send(new IsMaster(), future);
180                     final Reply reply = future.get();
181 
182                     // Close the connection now that we have the reply.
183                     IOUtils.close(conn);
184 
185                     final List<Document> results = reply.getResults();
186                     if (!results.isEmpty()) {
187                         final Document doc = results.get(0);
188 
189                         if (isMongos(doc)) {
190                             LOG.debug("Sharded bootstrap to {}.", addr);
191                             cluster.clear(); // not needed.
192                             myDelegate = bootstrapSharded(factory);
193                         }
194                         else if (isReplicationSet(doc)) {
195                             LOG.debug("Replica-set bootstrap to {}.", addr);
196                             cluster.clear(); // not needed.
197                             myDelegate = bootstrapReplicaSet(factory);
198                         }
199                         else {
200                             LOG.debug("Simple MongoDB bootstrap to {}.", addr);
201                             myDelegate = factory;
202                         }
203                         factory = null; // Don't close.
204                         return;
205                     }
206                 }
207                 catch (final IOException ioe) {
208                     LOG.warn(ioe, "I/O error during bootstrap to {}.", addr);
209                 }
210                 catch (final InterruptedException e) {
211                     LOG.warn(e, "Interrupted during bootstrap to {}.", addr);
212                 }
213                 catch (final ExecutionException e) {
214                     LOG.warn(e, "Error during bootstrap to {}.", addr);
215                 }
216                 finally {
217                     IOUtils.close(conn, Level.WARNING,
218                             "I/O error shutting down bootstrap connection to "
219                                     + addr + ".");
220                 }
221             }
222         }
223         finally {
224             IOUtils.close(factory);
225         }
226     }
227 
228     /**
229      * Initializes the factory for connecting to the replica set.
230      * 
231      * @param factory
232      *            The factory for connecting to the servers directly.
233      * @return The connection factory for connecting to the replica set.
234      */
235     protected ConnectionFactory bootstrapReplicaSet(
236             final ProxiedConnectionFactory factory) {
237         return new ReplicaSetConnectionFactory(factory, getConfig());
238     }
239 
240     /**
241      * Initializes the factory for connecting to the sharded cluster.
242      * 
243      * @param factory
244      *            The factory for connecting to the servers directly.
245      * @return The connection factory for connecting to the sharded cluster.
246      */
247     protected ConnectionFactory bootstrapSharded(
248             final ProxiedConnectionFactory factory) {
249         return new ShardedConnectionFactory(factory, getConfig());
250     }
251 
252     /**
253      * The configuration for the client.
254      * 
255      * @return The configuration for the client.
256      */
257     protected MongoClientConfiguration getConfig() {
258         return myConfig;
259     }
260 
261     /**
262      * Returns the underlying delegate factory.
263      * 
264      * @return The underlying delegate factory.
265      */
266     protected ConnectionFactory getDelegate() {
267         if (myDelegate == null) {
268             return createDelegate();
269         }
270         return myDelegate;
271     }
272 
273     /**
274      * Sets the underlying delegate factory.
275      * 
276      * @param delegate
277      *            The underlying delegate factory.
278      */
279     protected void setDelegate(final ConnectionFactory delegate) {
280         myDelegate = delegate;
281     }
282 
283     /**
284      * Creates the delegate connection factory.
285      * 
286      * @return The delegate connection factory.
287      */
288     private synchronized ConnectionFactory createDelegate() {
289         if (myDelegate == null) {
290             bootstrap();
291             if (myDelegate == null) {
292                 LOG.warn("Could not bootstrap a connection to the MongoDB servers.");
293                 throw new CannotConnectException(
294                         "Could not bootstrap a connection to the MongoDB servers.");
295             }
296         }
297         return myDelegate;
298     }
299 
300     /**
301      * Returns true if the document contains a "process" element that is a
302      * string and contains the value "mongos".
303      * 
304      * @param doc
305      *            The document to validate.
306      * @return True if the document contains a "process" element that is a
307      *         string and contains the value "mongos".
308      */
309     private boolean isMongos(final Document doc) {
310 
311         final Element processName = doc.get("msg");
312         if (processName instanceof StringElement) {
313             return "isdbgrid".equals(((StringElement) processName).getValue());
314         }
315 
316         return false;
317     }
318 
319     /**
320      * Returns true if the document contains a "repl" element that is a
321      * sub-document.
322      * 
323      * @param doc
324      *            The document to validate.
325      * @return True if the document contains a "repl" element that is a
326      *         sub-document.
327      */
328     private boolean isReplicationSet(final Document doc) {
329         return (doc.get("setName") instanceof StringElement);
330     }
331 }