View Javadoc
1   /*
2    * #%L
3    * ReplicaSetConnectionFactory.java - mongodb-async-driver - Allanbank Consulting, Inc.
4    * %%
5    * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6    * %%
7    * Licensed under the Apache License, Version 2.0 (the "License");
8    * you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   * 
11   *      http://www.apache.org/licenses/LICENSE-2.0
12   * 
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   * #L%
19   */
20  
21  package com.allanbank.mongodb.client.connection.rs;
22  
23  import java.io.IOException;
24  import java.net.InetSocketAddress;
25  import java.util.Collections;
26  import java.util.List;
27  import java.util.concurrent.ExecutionException;
28  import java.util.logging.Level;
29  
30  import com.allanbank.mongodb.MongoClientConfiguration;
31  import com.allanbank.mongodb.MongoDbException;
32  import com.allanbank.mongodb.bson.Document;
33  import com.allanbank.mongodb.bson.element.StringElement;
34  import com.allanbank.mongodb.client.ClusterStats;
35  import com.allanbank.mongodb.client.ClusterType;
36  import com.allanbank.mongodb.client.callback.FutureReplyCallback;
37  import com.allanbank.mongodb.client.connection.Connection;
38  import com.allanbank.mongodb.client.connection.ConnectionFactory;
39  import com.allanbank.mongodb.client.connection.ReconnectStrategy;
40  import com.allanbank.mongodb.client.connection.proxy.ProxiedConnectionFactory;
41  import com.allanbank.mongodb.client.message.IsMaster;
42  import com.allanbank.mongodb.client.message.Reply;
43  import com.allanbank.mongodb.client.state.Cluster;
44  import com.allanbank.mongodb.client.state.ClusterPinger;
45  import com.allanbank.mongodb.client.state.LatencyServerSelector;
46  import com.allanbank.mongodb.client.state.Server;
47  import com.allanbank.mongodb.client.state.ServerUpdateCallback;
48  import com.allanbank.mongodb.util.IOUtils;
49  import com.allanbank.mongodb.util.log.Log;
50  import com.allanbank.mongodb.util.log.LogFactory;
51  
52  /**
53   * Provides the ability to create connections to a replica-set environment.
54   * 
55   * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
56   *         mutated in incompatible ways between any two releases of the driver.
57   * @copyright 2011-2014, Allanbank Consulting, Inc., All Rights Reserved
58   */
59  public class ReplicaSetConnectionFactory implements ConnectionFactory {
60  
61      /** The logger for the {@link ReplicaSetConnectionFactory}. */
62      protected static final Log LOG = LogFactory
63              .getLog(ReplicaSetConnectionFactory.class);
64  
65      /** The factory to create proxied connections. */
66      protected final ProxiedConnectionFactory myConnectionFactory;
67  
68      /** The state of the cluster. */
69      private final Cluster myCluster;
70  
71      /** The MongoDB client configuration. */
72      private final MongoClientConfiguration myConfig;
73  
74      /** Pings the servers in the cluster collecting latency and tags. */
75      private final ClusterPinger myPinger;
76  
77      /** The strategy for reconnecting/finding the primary. */
78      private final ReplicaSetReconnectStrategy myStrategy;
79  
80      /**
81       * Creates a new {@link ReplicaSetConnectionFactory}.
82       * 
83       * @param factory
84       *            The factory to create proxied connections.
85       * @param config
86       *            The MongoDB client configuration.
87       */
88      public ReplicaSetConnectionFactory(final ProxiedConnectionFactory factory,
89              final MongoClientConfiguration config) {
90          myConnectionFactory = factory;
91          myConfig = config;
92          myCluster = new Cluster(config, ClusterType.REPLICA_SET);
93          myPinger = new ClusterPinger(myCluster, factory, config);
94  
95          myStrategy = new ReplicaSetReconnectStrategy();
96          myStrategy.setConfig(myConfig);
97          myStrategy.setConnectionFactory(myConnectionFactory);
98          myStrategy.setState(myCluster);
99          myStrategy.setSelector(new LatencyServerSelector(myCluster, false));
100 
101         // Bootstrap the state off of one of the servers.
102         bootstrap();
103     }
104 
105     /**
106      * Finds the primary member of the replica set.
107      */
108     public void bootstrap() {
109         // To fill in the list of servers.
110         locatePrimary();
111 
112         // Last thing is to start the ping of servers. This will
113         // locate the primary, and get the tags and latencies updated.
114         myPinger.initialSweep(myCluster);
115         myPinger.start();
116     }
117 
118     /**
119      * {@inheritDoc}
120      * <p>
121      * Overridden to close the cluster state and the
122      * {@link ProxiedConnectionFactory}.
123      * </p>
124      */
125     @Override
126     public void close() {
127         IOUtils.close(myPinger);
128         IOUtils.close(myConnectionFactory);
129     }
130 
131     /**
132      * Creates a new connection to the replica set.
133      * 
134      * @see ConnectionFactory#connect()
135      */
136     @Override
137     public Connection connect() throws IOException {
138 
139         // Try to find the primary.
140         List<Server> writableServers = myCluster.getWritableServers();
141         for (int i = 0; i < 10; ++i) {
142             servers: for (final Server primary : writableServers) {
143                 Connection primaryConn = null;
144                 try {
145                     primaryConn = myConnectionFactory
146                             .connect(primary, myConfig);
147 
148                     if (isWritable(primary, primaryConn)) {
149 
150                         final ReplicaSetConnection rsConnection = new ReplicaSetConnection(
151                                 primaryConn, primary, myCluster,
152                                 myConnectionFactory, myConfig, myStrategy);
153 
154                         primaryConn = null;
155 
156                         return rsConnection;
157                     }
158 
159                     break servers;
160                 }
161                 catch (final IOException e) {
162                     LOG.debug(e, "Error connecting to presumptive primary: {}",
163                             e.getMessage());
164                 }
165                 finally {
166                     IOUtils.close(primaryConn);
167                 }
168             }
169 
170             // Update the stale state.
171             writableServers = locatePrimary();
172         }
173 
174         // Don't throw an error here.
175         // Might be doing a secondary query which means we don't need the
176         // primary.
177         return new ReplicaSetConnection(null, null, myCluster,
178                 myConnectionFactory, myConfig, myStrategy);
179     }
180 
181     /**
182      * {@inheritDoc}
183      * <p>
184      * Overridden to return the {@link Cluster}.
185      * </p>
186      */
187     @Override
188     public ClusterStats getClusterStats() {
189         return myCluster;
190     }
191 
192     /**
193      * {@inheritDoc}
194      * <p>
195      * Overridden to return {@link ClusterType#REPLICA_SET} cluster type.
196      * </p>
197      */
198     @Override
199     public ClusterType getClusterType() {
200         return ClusterType.REPLICA_SET;
201     }
202 
203     /**
204      * {@inheritDoc}
205      * <p>
206      * Overridden to return a replica set {@link ReconnectStrategy}.
207      * </p>
208      */
209     @Override
210     public ReconnectStrategy getReconnectStrategy() {
211         return myStrategy;
212     }
213 
214     /**
215      * Returns the clusterState value.
216      * 
217      * @return The clusterState value.
218      */
219     protected Cluster getCluster() {
220         return myCluster;
221     }
222 
223     /**
224      * Determines if the connection is to the primary member of the cluster.
225      * 
226      * @param server
227      *            The server connected to.
228      * @param connection
229      *            The connection to test.
230      * @return True if the connection is to the primary member of the
231      *         cluster/replica set.
232      */
233     protected boolean isWritable(final Server server,
234             final Connection connection) {
235 
236         try {
237             final ServerUpdateCallback replyCallback = new ServerUpdateCallback(
238                     server);
239             connection.send(new IsMaster(), replyCallback);
240 
241             final Reply reply = replyCallback.get();
242             final List<Document> results = reply.getResults();
243             if (!results.isEmpty()) {
244                 final Document doc = results.get(0);
245 
246                 // Get the name of the primary server.
247                 final StringElement primaryName = doc.get(StringElement.class,
248                         "primary");
249                 if (primaryName != null) {
250                     return (primaryName.getValue().equals(connection
251                             .getServerName()));
252                 }
253             }
254         }
255         catch (final InterruptedException e) {
256             // Just ignore the reply.
257             LOG.debug(e, "Failure testing if a connection is writable: {}",
258                     e.getMessage());
259         }
260         catch (final ExecutionException e) {
261             // Just ignore the reply.
262             LOG.debug(e, "Failure testing if a connection is writable: {}",
263                     e.getMessage());
264         }
265         return false;
266     }
267 
268     /**
269      * Locates the primary server in the cluster.
270      * 
271      * @return The list of primary servers.
272      */
273     protected List<Server> locatePrimary() {
274         for (final InetSocketAddress addr : myConfig.getServerAddresses()) {
275             Connection conn = null;
276             final FutureReplyCallback future = new FutureReplyCallback();
277             try {
278                 final Server server = myCluster.add(addr);
279 
280                 conn = myConnectionFactory.connect(server, myConfig);
281 
282                 conn.send(new IsMaster(), future);
283 
284                 final Reply reply = future.get();
285                 final List<Document> results = reply.getResults();
286                 if (!results.isEmpty()) {
287                     final Document doc = results.get(0);
288 
289                     // Replica Sets MUST connect to the primary server.
290                     // See if we can add the other servers also.
291                     if (myConfig.isAutoDiscoverServers()) {
292                         // Pull them all in.
293                         final List<StringElement> hosts = doc.find(
294                                 StringElement.class, "hosts", ".*");
295                         for (final StringElement host : hosts) {
296                             myCluster.add(host.getValue());
297                         }
298                     }
299 
300                     // Add and mark the primary as writable.
301                     final StringElement primary = doc.findFirst(
302                             StringElement.class, "primary");
303                     if (primary != null) {
304                         return Collections.singletonList(myCluster.add(primary
305                                 .getValue()));
306                     }
307                 }
308             }
309             catch (final IOException ioe) {
310                 LOG.warn(ioe, "I/O error during replica-set bootstrap to {}.",
311                         addr);
312             }
313             catch (final MongoDbException me) {
314                 LOG.warn(me,
315                         "MongoDB error during replica-set bootstrap to {}.",
316                         addr);
317             }
318             catch (final InterruptedException e) {
319                 LOG.warn(e, "Interrupted during replica-set bootstrap to {}.",
320                         addr);
321             }
322             catch (final ExecutionException e) {
323                 LOG.warn(e, "Error during replica-set bootstrap to {}.", addr);
324             }
325             finally {
326                 IOUtils.close(conn, Level.WARNING,
327                         "I/O error shutting down replica-set bootstrap connection to "
328                                 + addr + ".");
329             }
330         }
331         return Collections.emptyList();
332     }
333 }