View Javadoc
1   /*
2    * #%L
3    * ReplicaSetReconnectStrategy.java - mongodb-async-driver - Allanbank Consulting, Inc.
4    * %%
5    * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6    * %%
7    * Licensed under the Apache License, Version 2.0 (the "License");
8    * you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   * 
11   *      http://www.apache.org/licenses/LICENSE-2.0
12   * 
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   * #L%
19   */
20  
21  package com.allanbank.mongodb.client.connection.rs;
22  
23  import static java.util.concurrent.TimeUnit.MILLISECONDS;
24  
25  import java.io.IOException;
26  import java.util.Collections;
27  import java.util.HashMap;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.Set;
31  import java.util.concurrent.ConcurrentHashMap;
32  import java.util.concurrent.ExecutionException;
33  import java.util.concurrent.Future;
34  import java.util.concurrent.TimeUnit;
35  import java.util.concurrent.TimeoutException;
36  import java.util.logging.Level;
37  
38  import com.allanbank.mongodb.MongoClientConfiguration;
39  import com.allanbank.mongodb.bson.Document;
40  import com.allanbank.mongodb.bson.Element;
41  import com.allanbank.mongodb.bson.element.StringElement;
42  import com.allanbank.mongodb.client.connection.Connection;
43  import com.allanbank.mongodb.client.connection.ReconnectStrategy;
44  import com.allanbank.mongodb.client.connection.proxy.ConnectionInfo;
45  import com.allanbank.mongodb.client.message.IsMaster;
46  import com.allanbank.mongodb.client.message.Reply;
47  import com.allanbank.mongodb.client.state.AbstractReconnectStrategy;
48  import com.allanbank.mongodb.client.state.Cluster;
49  import com.allanbank.mongodb.client.state.Server;
50  import com.allanbank.mongodb.client.state.ServerUpdateCallback;
51  import com.allanbank.mongodb.util.IOUtils;
52  import com.allanbank.mongodb.util.log.Log;
53  import com.allanbank.mongodb.util.log.LogFactory;
54  
/**
 * ReplicaSetReconnectStrategy provides a {@link ReconnectStrategy} designed for
 * replica sets. The reconnect strategy attempts to locate the primary member of
 * the replica set by:
 * <ol>
 * <li>Querying each member of the replica set for the primary server.</li>
 * <li>Once a primary server has been identified by a member of the replica set
 * (the putative primary) the putative primary server is queried for the primary
 * server.
 * <ol>
 * <li>If the putative primary concurs that it is the primary then the search
 * completes and the primary server's connection is used.</li>
 * <li>If the putative primary does not concur then the search continues
 * scanning each server in turn for the primary server.</li>
 * </ol>
 * </li>
 * </ol>
 * 
 * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
 *         mutated in incompatible ways between any two releases of the driver.
 * @copyright 2012-2014, Allanbank Consulting, Inc., All Rights Reserved
 */
75  public class ReplicaSetReconnectStrategy extends AbstractReconnectStrategy {
76  
77      /**
78       * The initial amount of time to pause waiting for a server to take over as
79       * the primary.
80       */
81      public static final int INITIAL_RECONNECT_PAUSE_TIME_MS = 10;
82  
83      /**
84       * The Maximum amount of time to pause waiting for a server to take over as
85       * the primary.
86       */
87      public static final int MAX_RECONNECT_PAUSE_TIME_MS = 1000;
88  
89      /** The logger for the {@link ReplicaSetReconnectStrategy}. */
90      protected static final Log LOG = LogFactory
91              .getLog(ReplicaSetReconnectStrategy.class);
92  
93      /** The set of servers we cannot connect to. */
94      private final Set<Server> myDeadServers = Collections
95              .newSetFromMap(new ConcurrentHashMap<Server, Boolean>());
96  
97      /**
98       * Creates a new ReplicaSetReconnectStrategy.
99       */
100     public ReplicaSetReconnectStrategy() {
101         super();
102     }
103 
104     /**
105      * {@inheritDoc}
106      * <p>
107      * Overridden to search for the primary server in the replica set. This will
108      * only continue until the
109      * {@link MongoClientConfiguration#getReconnectTimeout()} has expired.
110      * </p>
111      */
112     @Override
113     public ReplicaSetConnection reconnect(final Connection oldConnection) {
114         final ConnectionInfo<Server> info = reconnectPrimary();
115         if (info != null) {
116             return new ReplicaSetConnection(info.getConnection(),
117                     info.getConnectionKey(), getState(),
118                     getConnectionFactory(), getConfig(), this);
119         }
120         return null;
121     }
122 
123     /**
124      * Overridden to search for the primary server in the replica set. This will
125      * only continue until the
126      * {@link MongoClientConfiguration#getReconnectTimeout()} has expired.
127      * 
128      * @return The information for the primary connection or null if the
129      *         reconnect fails.
130      */
131     public synchronized ConnectionInfo<Server> reconnectPrimary() {
132         LOG.debug("Trying replica set reconnect.");
133         final Cluster state = getState();
134 
135         // Figure out a deadline for the reconnect.
136         final int wait = getConfig().getReconnectTimeout();
137         long now = System.currentTimeMillis();
138         final long deadline = (wait <= 0) ? Long.MAX_VALUE : (now + wait);
139 
140         final Map<Server, Future<Reply>> answers = new HashMap<Server, Future<Reply>>();
141         final Map<Server, Connection> connections = new HashMap<Server, Connection>();
142 
143         // Clear any interrupts
144         final boolean interrupted = Thread.interrupted();
145         try {
146             // First try a simple reconnect.
147             for (final Server writable : state.getWritableServers()) {
148                 if (verifyPutative(answers, connections, writable, deadline)) {
149                     LOG.debug("New primary for replica set: {}.",
150                             writable.getCanonicalName());
151                     return createReplicaSetConnection(connections, writable);
152                 }
153             }
154 
155             // How much time to pause for replies and waiting for a server
156             // to become primary.
157             int pauseTime = INITIAL_RECONNECT_PAUSE_TIME_MS;
158             while (now < deadline) {
159                 // Ask all of the servers who they think the primary is.
160                 for (final Server server : state.getServers()) {
161 
162                     sendIsPrimary(answers, connections, server, false);
163 
164                     // Anyone replied yet?
165                     final ConnectionInfo<Server> newConn = checkForReply(state,
166                             answers, connections, deadline);
167                     if (newConn != null) {
168                         return newConn;
169                     }
170 
171                     // Loop to the next server.
172                 }
173 
174                 // Wait for a beat for a reply or a server to decide to be
175                 // master.
176                 sleep(pauseTime, MILLISECONDS);
177                 pauseTime = Math.min(MAX_RECONNECT_PAUSE_TIME_MS, pauseTime
178                         + pauseTime);
179 
180                 // Check again for replies before trying to reconnect.
181                 final ConnectionInfo<Server> newConn = checkForReply(state,
182                         answers, connections, deadline);
183                 if (newConn != null) {
184                     return newConn;
185                 }
186 
187                 now = System.currentTimeMillis();
188             }
189         }
190         finally {
191             // Shut down the connections we created.
192             for (final Connection conn : connections.values()) {
193                 conn.shutdown(true);
194             }
195             if (interrupted) {
196                 Thread.currentThread().interrupt();
197             }
198         }
199         return null;
200     }
201 
202     /**
203      * Checks for a reply from a server. If one has been received then it tries
204      * to confirm the primary server by asking it if it thinks it is the primary
205      * server.
206      * 
207      * @param state
208      *            The state of the cluster.
209      * @param answers
210      *            The pending ({@link Future}) answers from each server.
211      * @param connections
212      *            The connection to each server.
213      * @param deadline
214      *            The deadline for the reconnect attempt.
215      * @return The new connection if there was a reply and that server confirmed
216      *         it was the primary.
217      */
218     protected ConnectionInfo<Server> checkForReply(final Cluster state,
219             final Map<Server, Future<Reply>> answers,
220             final Map<Server, Connection> connections, final long deadline) {
221         final Map<Server, Future<Reply>> copy = new HashMap<Server, Future<Reply>>(
222                 answers);
223         for (final Map.Entry<Server, Future<Reply>> entry : copy.entrySet()) {
224 
225             final Server server = entry.getKey();
226             final Future<Reply> reply = entry.getValue();
227 
228             if (reply.isDone()) {
229                 // Remove this reply.
230                 answers.remove(server);
231 
232                 // Check the result.
233                 final String putative = checkReply(reply, connections, server,
234                         deadline);
235 
236                 // Phase2 - Verify the putative server.
237                 if (putative != null) {
238                     final Server putativeServer = getState().get(putative);
239                     if (verifyPutative(answers, connections, putativeServer,
240                             deadline)) {
241 
242                         // Phase 3 - Setup a new replica set connection to the
243                         // primary and seed it with a secondary if there is a
244                         // suitable server.
245                         LOG.info("New primary for replica set: {}", putative);
246                         updateUnknown(state, answers, connections);
247                         return createReplicaSetConnection(connections,
248                                 putativeServer);
249                     }
250                 }
251             }
252             else {
253                 LOG.debug("No reply yet from {}.", server);
254             }
255         }
256 
257         return null;
258     }
259 
260     /**
261      * Extracts who the server thinks is the primary from the reply.
262      * 
263      * @param replyFuture
264      *            The future to get the reply from.
265      * @param connections
266      *            The map of connections. The connection will be closed on an
267      *            error.
268      * @param server
269      *            The server.
270      * @param deadline
271      *            The deadline for the reconnect attempt.
272      * @return The name of the server the reply indicates is the primary, null
273      *         if there is no primary or any error.
274      */
275     protected String checkReply(final Future<Reply> replyFuture,
276             final Map<Server, Connection> connections, final Server server,
277             final long deadline) {
278         if (replyFuture != null) {
279             try {
280                 final Reply reply = replyFuture.get(
281                         Math.max(0, deadline - System.currentTimeMillis()),
282                         TimeUnit.MILLISECONDS);
283 
284                 final List<Document> results = reply.getResults();
285                 if (!results.isEmpty()) {
286                     final Document doc = results.get(0);
287 
288                     // Get the name of the primary server.
289                     final Element primary = doc.get("primary");
290                     if (primary instanceof StringElement) {
291                         return ((StringElement) primary).getValue();
292                     }
293                 }
294             }
295             catch (final InterruptedException e) {
296                 // Just ignore the reply.
297             }
298             catch (final TimeoutException e) {
299                 // Kill the associated connection.
300                 final Connection conn = connections.remove(server);
301                 IOUtils.close(conn);
302             }
303             catch (final ExecutionException e) {
304                 // Kill the associated connection.
305                 final Connection conn = connections.remove(server);
306                 IOUtils.close(conn);
307             }
308         }
309         return null;
310     }
311 
312     /**
313      * Sends a command to the server to return what it thinks the state of the
314      * cluster is. This method will not re-request the information from the
315      * server if there is already an outstanding request.
316      * 
317      * @param answers
318      *            The pending ({@link Future}) answers from each server.
319      * @param connections
320      *            The connection to each server.
321      * @param server
322      *            The server to send the request to.
323      * @param isPrimary
324      *            If true logs connection errors as warnings. Debug otherwise.
325      * @return The future reply for the request sent to the server.
326      */
327     protected Future<Reply> sendIsPrimary(
328             final Map<Server, Future<Reply>> answers,
329             final Map<Server, Connection> connections, final Server server,
330             final boolean isPrimary) {
331         Future<Reply> reply = null;
332         try {
333             // Locate a connection to the server.
334             Connection conn = connections.get(server);
335             if ((conn == null) || !conn.isAvailable()) {
336                 conn = getConnectionFactory().connect(server, getConfig());
337                 connections.put(server, conn);
338             }
339 
340             // Only send to the server if there is not an outstanding
341             // request.
342             reply = answers.get(server);
343             if (reply == null) {
344                 LOG.debug("Sending reconnect(rs) query to {}.",
345                         server.getCanonicalName());
346 
347                 final ServerUpdateCallback replyCallback = new ServerUpdateCallback(
348                         server);
349                 conn.send(new IsMaster(), replyCallback);
350 
351                 reply = replyCallback;
352                 answers.put(server, reply);
353 
354                 myDeadServers.remove(server);
355             }
356         }
357         catch (final IOException e) {
358             // Nothing to do for now. Log at a debug level if this is not the
359             // primary. Warn if we think it is the primary (and have not warned
360             // before)
361             final Level level = (isPrimary && myDeadServers.add(server)) ? Level.WARNING
362                     : Level.FINE;
363             LOG.log(level, e, "Cannot create a connection to '{}'.", server);
364         }
365 
366         return reply;
367     }
368 
369     /**
370      * Sleeps without throwing an exception.
371      * 
372      * @param sleepTime
373      *            The amount of time to sleep.
374      * @param units
375      *            The untis for the amount of time to sleep.
376      */
377     protected void sleep(final int sleepTime, final TimeUnit units) {
378         try {
379             units.sleep(sleepTime);
380         }
381         catch (final InterruptedException e) {
382             // Ignore.
383         }
384     }
385 
386     /**
387      * Tries to verify that the suspected primary server is in fact the primary
388      * server by asking it directly and synchronously.
389      * 
390      * @param answers
391      *            The pending ({@link Future}) answers from each server.
392      * @param connections
393      *            The connection to each server.
394      * @param putativePrimary
395      *            The server we think is the primary.
396      * @param deadline
397      *            The deadline for the reconnect attempt.
398      * @return True if the server concurs that it is the primary.
399      */
400     protected boolean verifyPutative(final Map<Server, Future<Reply>> answers,
401             final Map<Server, Connection> connections,
402             final Server putativePrimary, final long deadline) {
403 
404         LOG.debug("Verify putative server ({}) on reconnect(rs).",
405                 putativePrimary);
406 
407         // Make sure we send a new request. The old reply might have been
408         // before becoming the primary.
409         answers.remove(putativePrimary);
410 
411         // If the primary agrees that they are the primary then it is
412         // probably true.
413         final Future<Reply> reply = sendIsPrimary(answers, connections,
414                 putativePrimary, true);
415         final String primary = checkReply(reply, connections, putativePrimary,
416                 deadline);
417         if (putativePrimary.getCanonicalName().equals(primary)) {
418             return true;
419         }
420 
421         return false;
422     }
423 
424     /**
425      * Creates the {@link ReplicaSetConnection} for the primary server.
426      * 
427      * @param connections
428      *            The connection that are being managed.
429      * @param primaryServer
430      *            The primary server.
431      * @return The {@link ReplicaSetConnection}.
432      */
433     private ConnectionInfo<Server> createReplicaSetConnection(
434             final Map<Server, Connection> connections,
435             final Server primaryServer) {
436         final Connection primaryConn = connections.remove(primaryServer);
437 
438         return new ConnectionInfo<Server>(primaryConn, primaryServer);
439     }
440 
441     /**
442      * Tries to send messages to all of the members of the cluster in an
443      * indeterminate state.
444      * 
445      * @param state
446      *            The state of the cluster.
447      * @param answers
448      *            The pending responses.
449      * @param connections
450      *            The connection already created.
451      */
452     private void updateUnknown(final Cluster state,
453             final Map<Server, Future<Reply>> answers,
454             final Map<Server, Connection> connections) {
455         for (final Server server : state.getServers()) {
456             switch (server.getState()) {
457             case UNKNOWN: // Fall through.
458             case UNAVAILABLE: {
459                 answers.remove(server);
460                 sendIsPrimary(answers, connections, server, false);
461                 break;
462             }
463             case READ_ONLY:
464             case WRITABLE:
465             default: {
466                 // Known good.
467                 break;
468             }
469             }
470         }
471     }
472 }