1 /*
2 * #%L
3 * ReplicaSetReconnectStrategy.java - mongodb-async-driver - Allanbank Consulting, Inc.
4 * %%
5 * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6 * %%
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * #L%
19 */
20
21 package com.allanbank.mongodb.client.connection.rs;
22
23 import static java.util.concurrent.TimeUnit.MILLISECONDS;
24
25 import java.io.IOException;
26 import java.util.Collections;
27 import java.util.HashMap;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Set;
31 import java.util.concurrent.ConcurrentHashMap;
32 import java.util.concurrent.ExecutionException;
33 import java.util.concurrent.Future;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.TimeoutException;
36 import java.util.logging.Level;
37
38 import com.allanbank.mongodb.MongoClientConfiguration;
39 import com.allanbank.mongodb.bson.Document;
40 import com.allanbank.mongodb.bson.Element;
41 import com.allanbank.mongodb.bson.element.StringElement;
42 import com.allanbank.mongodb.client.connection.Connection;
43 import com.allanbank.mongodb.client.connection.ReconnectStrategy;
44 import com.allanbank.mongodb.client.connection.proxy.ConnectionInfo;
45 import com.allanbank.mongodb.client.message.IsMaster;
46 import com.allanbank.mongodb.client.message.Reply;
47 import com.allanbank.mongodb.client.state.AbstractReconnectStrategy;
48 import com.allanbank.mongodb.client.state.Cluster;
49 import com.allanbank.mongodb.client.state.Server;
50 import com.allanbank.mongodb.client.state.ServerUpdateCallback;
51 import com.allanbank.mongodb.util.IOUtils;
52 import com.allanbank.mongodb.util.log.Log;
53 import com.allanbank.mongodb.util.log.LogFactory;
54
55 /**
56 * ReplicaSetReconnectStrategy provides a {@link ReconnectStrategy} designed for
57 * replica sets. The reconnect strategy attempts to locate the primary member of
58 * the replica set by:
59 * <ol>
60 * <li>Querying each member of the replica set for the primary server.</li>
61 * <li>Once a primary server has been identified by a member of the replica set
62 * (the putative primary) the putative primary server is queried for the primary
63 * server.</li>
64 * <ol>
65 * <li>If the putative primary concurs that it is the primary then the search
66 * completes and the primary server's connection is used.</li>
67 * <li>If the putative primary does not concur then the search continues
68 * scanning each server in turn for the primary server.</li>
69 * </ol>
70 *
71 * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
72 * mutated in incompatible ways between any two releases of the driver.
73 * @copyright 2012-2014, Allanbank Consulting, Inc., All Rights Reserved
74 */
75 public class ReplicaSetReconnectStrategy extends AbstractReconnectStrategy {
76
77 /**
78 * The initial amount of time to pause waiting for a server to take over as
79 * the primary.
80 */
81 public static final int INITIAL_RECONNECT_PAUSE_TIME_MS = 10;
82
83 /**
84 * The Maximum amount of time to pause waiting for a server to take over as
85 * the primary.
86 */
87 public static final int MAX_RECONNECT_PAUSE_TIME_MS = 1000;
88
89 /** The logger for the {@link ReplicaSetReconnectStrategy}. */
90 protected static final Log LOG = LogFactory
91 .getLog(ReplicaSetReconnectStrategy.class);
92
93 /** The set of servers we cannot connect to. */
94 private final Set<Server> myDeadServers = Collections
95 .newSetFromMap(new ConcurrentHashMap<Server, Boolean>());
96
97 /**
98 * Creates a new ReplicaSetReconnectStrategy.
99 */
100 public ReplicaSetReconnectStrategy() {
101 super();
102 }
103
104 /**
105 * {@inheritDoc}
106 * <p>
107 * Overridden to search for the primary server in the replica set. This will
108 * only continue until the
109 * {@link MongoClientConfiguration#getReconnectTimeout()} has expired.
110 * </p>
111 */
112 @Override
113 public ReplicaSetConnection reconnect(final Connection oldConnection) {
114 final ConnectionInfo<Server> info = reconnectPrimary();
115 if (info != null) {
116 return new ReplicaSetConnection(info.getConnection(),
117 info.getConnectionKey(), getState(),
118 getConnectionFactory(), getConfig(), this);
119 }
120 return null;
121 }
122
123 /**
124 * Overridden to search for the primary server in the replica set. This will
125 * only continue until the
126 * {@link MongoClientConfiguration#getReconnectTimeout()} has expired.
127 *
128 * @return The information for the primary connection or null if the
129 * reconnect fails.
130 */
131 public synchronized ConnectionInfo<Server> reconnectPrimary() {
132 LOG.debug("Trying replica set reconnect.");
133 final Cluster state = getState();
134
135 // Figure out a deadline for the reconnect.
136 final int wait = getConfig().getReconnectTimeout();
137 long now = System.currentTimeMillis();
138 final long deadline = (wait <= 0) ? Long.MAX_VALUE : (now + wait);
139
140 final Map<Server, Future<Reply>> answers = new HashMap<Server, Future<Reply>>();
141 final Map<Server, Connection> connections = new HashMap<Server, Connection>();
142
143 // Clear any interrupts
144 final boolean interrupted = Thread.interrupted();
145 try {
146 // First try a simple reconnect.
147 for (final Server writable : state.getWritableServers()) {
148 if (verifyPutative(answers, connections, writable, deadline)) {
149 LOG.debug("New primary for replica set: {}.",
150 writable.getCanonicalName());
151 return createReplicaSetConnection(connections, writable);
152 }
153 }
154
155 // How much time to pause for replies and waiting for a server
156 // to become primary.
157 int pauseTime = INITIAL_RECONNECT_PAUSE_TIME_MS;
158 while (now < deadline) {
159 // Ask all of the servers who they think the primary is.
160 for (final Server server : state.getServers()) {
161
162 sendIsPrimary(answers, connections, server, false);
163
164 // Anyone replied yet?
165 final ConnectionInfo<Server> newConn = checkForReply(state,
166 answers, connections, deadline);
167 if (newConn != null) {
168 return newConn;
169 }
170
171 // Loop to the next server.
172 }
173
174 // Wait for a beat for a reply or a server to decide to be
175 // master.
176 sleep(pauseTime, MILLISECONDS);
177 pauseTime = Math.min(MAX_RECONNECT_PAUSE_TIME_MS, pauseTime
178 + pauseTime);
179
180 // Check again for replies before trying to reconnect.
181 final ConnectionInfo<Server> newConn = checkForReply(state,
182 answers, connections, deadline);
183 if (newConn != null) {
184 return newConn;
185 }
186
187 now = System.currentTimeMillis();
188 }
189 }
190 finally {
191 // Shut down the connections we created.
192 for (final Connection conn : connections.values()) {
193 conn.shutdown(true);
194 }
195 if (interrupted) {
196 Thread.currentThread().interrupt();
197 }
198 }
199 return null;
200 }
201
202 /**
203 * Checks for a reply from a server. If one has been received then it tries
204 * to confirm the primary server by asking it if it thinks it is the primary
205 * server.
206 *
207 * @param state
208 * The state of the cluster.
209 * @param answers
210 * The pending ({@link Future}) answers from each server.
211 * @param connections
212 * The connection to each server.
213 * @param deadline
214 * The deadline for the reconnect attempt.
215 * @return The new connection if there was a reply and that server confirmed
216 * it was the primary.
217 */
218 protected ConnectionInfo<Server> checkForReply(final Cluster state,
219 final Map<Server, Future<Reply>> answers,
220 final Map<Server, Connection> connections, final long deadline) {
221 final Map<Server, Future<Reply>> copy = new HashMap<Server, Future<Reply>>(
222 answers);
223 for (final Map.Entry<Server, Future<Reply>> entry : copy.entrySet()) {
224
225 final Server server = entry.getKey();
226 final Future<Reply> reply = entry.getValue();
227
228 if (reply.isDone()) {
229 // Remove this reply.
230 answers.remove(server);
231
232 // Check the result.
233 final String putative = checkReply(reply, connections, server,
234 deadline);
235
236 // Phase2 - Verify the putative server.
237 if (putative != null) {
238 final Server putativeServer = getState().get(putative);
239 if (verifyPutative(answers, connections, putativeServer,
240 deadline)) {
241
242 // Phase 3 - Setup a new replica set connection to the
243 // primary and seed it with a secondary if there is a
244 // suitable server.
245 LOG.info("New primary for replica set: {}", putative);
246 updateUnknown(state, answers, connections);
247 return createReplicaSetConnection(connections,
248 putativeServer);
249 }
250 }
251 }
252 else {
253 LOG.debug("No reply yet from {}.", server);
254 }
255 }
256
257 return null;
258 }
259
260 /**
261 * Extracts who the server thinks is the primary from the reply.
262 *
263 * @param replyFuture
264 * The future to get the reply from.
265 * @param connections
266 * The map of connections. The connection will be closed on an
267 * error.
268 * @param server
269 * The server.
270 * @param deadline
271 * The deadline for the reconnect attempt.
272 * @return The name of the server the reply indicates is the primary, null
273 * if there is no primary or any error.
274 */
275 protected String checkReply(final Future<Reply> replyFuture,
276 final Map<Server, Connection> connections, final Server server,
277 final long deadline) {
278 if (replyFuture != null) {
279 try {
280 final Reply reply = replyFuture.get(
281 Math.max(0, deadline - System.currentTimeMillis()),
282 TimeUnit.MILLISECONDS);
283
284 final List<Document> results = reply.getResults();
285 if (!results.isEmpty()) {
286 final Document doc = results.get(0);
287
288 // Get the name of the primary server.
289 final Element primary = doc.get("primary");
290 if (primary instanceof StringElement) {
291 return ((StringElement) primary).getValue();
292 }
293 }
294 }
295 catch (final InterruptedException e) {
296 // Just ignore the reply.
297 }
298 catch (final TimeoutException e) {
299 // Kill the associated connection.
300 final Connection conn = connections.remove(server);
301 IOUtils.close(conn);
302 }
303 catch (final ExecutionException e) {
304 // Kill the associated connection.
305 final Connection conn = connections.remove(server);
306 IOUtils.close(conn);
307 }
308 }
309 return null;
310 }
311
312 /**
313 * Sends a command to the server to return what it thinks the state of the
314 * cluster is. This method will not re-request the information from the
315 * server if there is already an outstanding request.
316 *
317 * @param answers
318 * The pending ({@link Future}) answers from each server.
319 * @param connections
320 * The connection to each server.
321 * @param server
322 * The server to send the request to.
323 * @param isPrimary
324 * If true logs connection errors as warnings. Debug otherwise.
325 * @return The future reply for the request sent to the server.
326 */
327 protected Future<Reply> sendIsPrimary(
328 final Map<Server, Future<Reply>> answers,
329 final Map<Server, Connection> connections, final Server server,
330 final boolean isPrimary) {
331 Future<Reply> reply = null;
332 try {
333 // Locate a connection to the server.
334 Connection conn = connections.get(server);
335 if ((conn == null) || !conn.isAvailable()) {
336 conn = getConnectionFactory().connect(server, getConfig());
337 connections.put(server, conn);
338 }
339
340 // Only send to the server if there is not an outstanding
341 // request.
342 reply = answers.get(server);
343 if (reply == null) {
344 LOG.debug("Sending reconnect(rs) query to {}.",
345 server.getCanonicalName());
346
347 final ServerUpdateCallback replyCallback = new ServerUpdateCallback(
348 server);
349 conn.send(new IsMaster(), replyCallback);
350
351 reply = replyCallback;
352 answers.put(server, reply);
353
354 myDeadServers.remove(server);
355 }
356 }
357 catch (final IOException e) {
358 // Nothing to do for now. Log at a debug level if this is not the
359 // primary. Warn if we think it is the primary (and have not warned
360 // before)
361 final Level level = (isPrimary && myDeadServers.add(server)) ? Level.WARNING
362 : Level.FINE;
363 LOG.log(level, e, "Cannot create a connection to '{}'.", server);
364 }
365
366 return reply;
367 }
368
369 /**
370 * Sleeps without throwing an exception.
371 *
372 * @param sleepTime
373 * The amount of time to sleep.
374 * @param units
375 * The untis for the amount of time to sleep.
376 */
377 protected void sleep(final int sleepTime, final TimeUnit units) {
378 try {
379 units.sleep(sleepTime);
380 }
381 catch (final InterruptedException e) {
382 // Ignore.
383 }
384 }
385
386 /**
387 * Tries to verify that the suspected primary server is in fact the primary
388 * server by asking it directly and synchronously.
389 *
390 * @param answers
391 * The pending ({@link Future}) answers from each server.
392 * @param connections
393 * The connection to each server.
394 * @param putativePrimary
395 * The server we think is the primary.
396 * @param deadline
397 * The deadline for the reconnect attempt.
398 * @return True if the server concurs that it is the primary.
399 */
400 protected boolean verifyPutative(final Map<Server, Future<Reply>> answers,
401 final Map<Server, Connection> connections,
402 final Server putativePrimary, final long deadline) {
403
404 LOG.debug("Verify putative server ({}) on reconnect(rs).",
405 putativePrimary);
406
407 // Make sure we send a new request. The old reply might have been
408 // before becoming the primary.
409 answers.remove(putativePrimary);
410
411 // If the primary agrees that they are the primary then it is
412 // probably true.
413 final Future<Reply> reply = sendIsPrimary(answers, connections,
414 putativePrimary, true);
415 final String primary = checkReply(reply, connections, putativePrimary,
416 deadline);
417 if (putativePrimary.getCanonicalName().equals(primary)) {
418 return true;
419 }
420
421 return false;
422 }
423
424 /**
425 * Creates the {@link ReplicaSetConnection} for the primary server.
426 *
427 * @param connections
428 * The connection that are being managed.
429 * @param primaryServer
430 * The primary server.
431 * @return The {@link ReplicaSetConnection}.
432 */
433 private ConnectionInfo<Server> createReplicaSetConnection(
434 final Map<Server, Connection> connections,
435 final Server primaryServer) {
436 final Connection primaryConn = connections.remove(primaryServer);
437
438 return new ConnectionInfo<Server>(primaryConn, primaryServer);
439 }
440
441 /**
442 * Tries to send messages to all of the members of the cluster in an
443 * indeterminate state.
444 *
445 * @param state
446 * The state of the cluster.
447 * @param answers
448 * The pending responses.
449 * @param connections
450 * The connection already created.
451 */
452 private void updateUnknown(final Cluster state,
453 final Map<Server, Future<Reply>> answers,
454 final Map<Server, Connection> connections) {
455 for (final Server server : state.getServers()) {
456 switch (server.getState()) {
457 case UNKNOWN: // Fall through.
458 case UNAVAILABLE: {
459 answers.remove(server);
460 sendIsPrimary(answers, connections, server, false);
461 break;
462 }
463 case READ_ONLY:
464 case WRITABLE:
465 default: {
466 // Known good.
467 break;
468 }
469 }
470 }
471 }
472 }