1 /*
2 * #%L
3 * ClientImpl.java - mongodb-async-driver - Allanbank Consulting, Inc.
4 * %%
5 * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6 * %%
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * #L%
19 */
20 package com.allanbank.mongodb.client;
21
22 import java.beans.PropertyChangeEvent;
23 import java.beans.PropertyChangeListener;
24 import java.io.Closeable;
25 import java.io.IOException;
26 import java.lang.reflect.Constructor;
27 import java.util.ArrayList;
28 import java.util.List;
29 import java.util.concurrent.BlockingQueue;
30 import java.util.concurrent.CopyOnWriteArrayList;
31 import java.util.concurrent.LinkedBlockingQueue;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicLong;
34
35 import com.allanbank.mongodb.Durability;
36 import com.allanbank.mongodb.MongoClientConfiguration;
37 import com.allanbank.mongodb.MongoCursorControl;
38 import com.allanbank.mongodb.MongoDbException;
39 import com.allanbank.mongodb.MongoIterator;
40 import com.allanbank.mongodb.ReadPreference;
41 import com.allanbank.mongodb.StreamCallback;
42 import com.allanbank.mongodb.bson.Document;
43 import com.allanbank.mongodb.bson.DocumentAssignable;
44 import com.allanbank.mongodb.bson.NumericElement;
45 import com.allanbank.mongodb.bson.element.StringElement;
46 import com.allanbank.mongodb.client.callback.CursorStreamingCallback;
47 import com.allanbank.mongodb.client.connection.Connection;
48 import com.allanbank.mongodb.client.connection.ConnectionFactory;
49 import com.allanbank.mongodb.client.connection.ReconnectStrategy;
50 import com.allanbank.mongodb.client.connection.bootstrap.BootstrapConnectionFactory;
51 import com.allanbank.mongodb.client.state.Cluster;
52 import com.allanbank.mongodb.error.CannotConnectException;
53 import com.allanbank.mongodb.error.ConnectionLostException;
54 import com.allanbank.mongodb.util.IOUtils;
55 import com.allanbank.mongodb.util.log.Log;
56 import com.allanbank.mongodb.util.log.LogFactory;
57
58 /**
59 * Implementation of the internal {@link Client} interface which all requests to
60 * the MongoDB servers pass.
61 *
62 * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
63 * mutated in incompatible ways between any two releases of the driver.
64 * @copyright 2011-2013, Allanbank Consulting, Inc., All Rights Reserved
65 */
66 public class ClientImpl extends AbstractClient {
67
68 /** The logger for the {@link ClientImpl}. */
69 protected static final Log LOG = LogFactory.getLog(ClientImpl.class);
70
71 /**
72 * Resolves the bootstrap connection factory to use.
73 *
74 * @param config
75 * The client's configuration.
76 * @return The connection factory for connecting to the cluster.
77 */
78 protected static ConnectionFactory resolveBootstrap(
79 final MongoClientConfiguration config) {
80 ConnectionFactory result;
81 try {
82 final String name = "com.allanbank.mongodb.extensions.bootstrap.ExtensionsBootstrapConnectionFactory";
83 final Class<?> clazz = Class.forName(name);
84 final Constructor<?> constructor = clazz
85 .getConstructor(MongoClientConfiguration.class);
86
87 result = (ConnectionFactory) constructor.newInstance(config);
88 }
89 // Too many exceptions.
90 catch (final RuntimeException e) {
91 throw e;
92 }
93 catch (final Exception e) {
94 result = new BootstrapConnectionFactory(config);
95 }
96
97 return result;
98 }
99
100 /** Counter for the number of reconnects currently being attempted. */
101 private int myActiveReconnects;
102
103 /** The configuration for interacting with MongoDB. */
104 private final MongoClientConfiguration myConfig;
105
106 /** Factory for creating connections to MongoDB. */
107 private final ConnectionFactory myConnectionFactory;
108
109 /** The listener for changes to the state of connections. */
110 private final PropertyChangeListener myConnectionListener;
111
112 /** The set of open connections. */
113 private final List<Connection> myConnections;
114
115 /** The set of open connections. */
116 private final BlockingQueue<Connection> myConnectionsToClose;
117
118 /** The sequence of the connection that was last used. */
119 private final AtomicLong myNextConnectionSequence = new AtomicLong(0);
120
121 /**
122 * Create a new ClientImpl.
123 *
124 * @param config
125 * The configuration for interacting with MongoDB.
126 */
127 public ClientImpl(final MongoClientConfiguration config) {
128 this(config, resolveBootstrap(config));
129 }
130
131 /**
132 * Create a new ClientImpl.
133 *
134 * @param config
135 * The configuration for interacting with MongoDB.
136 * @param connectionFactory
137 * The source of connection for the client.
138 */
139 public ClientImpl(final MongoClientConfiguration config,
140 final ConnectionFactory connectionFactory) {
141 myConfig = config;
142 myConnectionFactory = connectionFactory;
143 myConnections = new CopyOnWriteArrayList<Connection>();
144 myConnectionsToClose = new LinkedBlockingQueue<Connection>();
145 myConnectionListener = new ConnectionListener();
146 myActiveReconnects = 0;
147 }
148
149 /**
150 * {@inheritDoc}
151 * <p>
152 * Overridden to close all of the open connections.
153 * </p>
154 *
155 * @see Closeable#close()
156 */
157 @Override
158 public void close() {
159 // Stop any more messages.
160 super.close();
161
162 while (!myConnections.isEmpty()) {
163 try {
164 final Connection conn = myConnections.remove(0);
165 myConnectionsToClose.add(conn);
166 conn.shutdown(false);
167 }
168 catch (final ArrayIndexOutOfBoundsException aiob) {
169 // There is a race between the isEmpty() and the remove we can't
170 // avoid. Next check if isEmpty() will bounce us out of the
171 // loop.
172 aiob.getCause(); // Shhhh - PMD.
173 }
174 }
175
176 // Work off the connections to close until they are all closed.
177 final List<Connection> conns = new ArrayList<Connection>(
178 myConnectionsToClose);
179 for (final Connection conn : conns) {
180 conn.waitForClosed(myConfig.getReadTimeout(), TimeUnit.MILLISECONDS);
181 if (conn.isOpen()) {
182 // Force the connection to close.
183 close(conn);
184 }
185 }
186
187 // Shutdown the connections factory.
188 IOUtils.close(myConnectionFactory);
189 }
190
191 /**
192 * {@inheritDoc}
193 * <p>
194 * Overridden to return the {@link Cluster}.
195 * </p>
196 */
197 @Override
198 public ClusterStats getClusterStats() {
199 return myConnectionFactory.getClusterStats();
200 }
201
202 /**
203 * {@inheritDoc}
204 * <p>
205 * Overridden to return the {@link ClusterType} of the
206 * {@link ConnectionFactory}.
207 * </p>
208 */
209 @Override
210 public ClusterType getClusterType() {
211 return myConnectionFactory.getClusterType();
212 }
213
214 /**
215 * {@inheritDoc}
216 * <p>
217 * Overridden to return the configuration used when the client was
218 * constructed.
219 * </p>
220 */
221 @Override
222 public MongoClientConfiguration getConfig() {
223 return myConfig;
224 }
225
226 /**
227 * Returns the current number of open connections.
228 *
229 * @return The current number of open connections.
230 */
231 public int getConnectionCount() {
232 return myConnections.size();
233 }
234
235 /**
236 * {@inheritDoc}
237 * <p>
238 * Overridden to return the configurations default durability.
239 * </p>
240 *
241 * @see Client#getDefaultDurability()
242 */
243 @Override
244 public Durability getDefaultDurability() {
245 return myConfig.getDefaultDurability();
246 }
247
248 /**
249 * {@inheritDoc}
250 * <p>
251 * Overridden to return the configurations default read preference.
252 * </p>
253 *
254 * @see Client#getDefaultReadPreference()
255 */
256 @Override
257 public ReadPreference getDefaultReadPreference() {
258 return myConfig.getDefaultReadPreference();
259 }
260
261 /**
262 * Returns true if the document looks like a cursor restart document. e.g.,
263 * one that is created by {@link MongoIteratorImpl#asDocument()}.
264 *
265 * @param doc
266 * The potential cursor document.
267 * @return True if the document looks like it was created by
268 * {@link MongoIteratorImpl#asDocument()}.
269 */
270 public boolean isCursorDocument(final Document doc) {
271 return (doc.getElements().size() == 5)
272 && (doc.get(StringElement.class,
273 MongoCursorControl.NAME_SPACE_FIELD) != null)
274 && (doc.get(NumericElement.class,
275 MongoCursorControl.CURSOR_ID_FIELD) != null)
276 && (doc.get(StringElement.class,
277 MongoCursorControl.SERVER_FIELD) != null)
278 && (doc.get(NumericElement.class,
279 MongoCursorControl.BATCH_SIZE_FIELD) != null)
280 && (doc.get(NumericElement.class,
281 MongoCursorControl.LIMIT_FIELD) != null);
282 }
283
284 /**
285 * {@inheritDoc}
286 */
287 @Override
288 public MongoIterator<Document> restart(
289 final DocumentAssignable cursorDocument)
290 throws IllegalArgumentException {
291 final Document cursorDoc = cursorDocument.asDocument();
292
293 if (isCursorDocument(cursorDoc)) {
294 final MongoIteratorImpl iter = new MongoIteratorImpl(cursorDoc,
295 this);
296 iter.restart();
297
298 return iter;
299 }
300
301 throw new IllegalArgumentException(
302 "Cannot restart without a well formed cursor document: "
303 + cursorDoc);
304 }
305
306 /**
307 * {@inheritDoc}
308 */
309 @Override
310 public MongoCursorControl restart(final StreamCallback<Document> results,
311 final DocumentAssignable cursorDocument)
312 throws IllegalArgumentException {
313 final Document cursorDoc = cursorDocument.asDocument();
314
315 if (isCursorDocument(cursorDoc)) {
316 final CursorStreamingCallback cb = new CursorStreamingCallback(
317 this, cursorDoc, results);
318 cb.restart();
319
320 return cb;
321 }
322 throw new IllegalArgumentException(
323 "Cannot restart without a well formed cursor document: "
324 + cursorDoc);
325 }
326
327 /**
328 * {@inheritDoc}
329 * <p>
330 * Tries to locate a connection that can quickly dispatch the message to a
331 * MongoDB server. The basic metrics for determining if a connection is idle
332 * is to look at the number of messages waiting to be sent. The basic logic
333 * for finding a connection is:
334 * <ol>
335 * <li>Look at the current connection and the next connection. If either is
336 * idle, use it.</li>
337 * <li>If there are no idle connections determine the maximum number of
338 * allowed connections and if there are fewer that the maximum allowed then
339 * take the connection creation lock, create a new connection, use it, and
340 * add to the set of available connections and release the lock.</li>
341 * <li>Neither of the above works then increment the connection index and
342 * use the previous or next connection based on which has the fewest pending
343 * connections.</li>
344 * <ol>
345 */
346 @Override
347 protected Connection findConnection(final Message message1,
348 final Message message2) throws MongoDbException {
349 // Make sure we shrink connections when the max changes.
350 final int limit = Math.max(1, myConfig.getMaxConnectionCount());
351 if (limit < myConnections.size()) {
352 synchronized (myConnectionFactory) {
353 // Mark the connections as persona non grata.
354 while (limit < myConnections.size()) {
355 try {
356 final Connection conn = myConnections.remove(0);
357 myConnectionsToClose.add(conn);
358 conn.shutdown(false);
359 }
360 catch (final ArrayIndexOutOfBoundsException aiob) {
361 // Race between the size() and remove(0).
362 // Next loop should resolve.
363 aiob.getCause(); // Shhhh - PMD.
364 }
365 }
366 }
367 }
368
369 // Locate a connection to use.
370 final Connection conn = searchConnection(message1, message2, true);
371
372 if (conn == null) {
373 throw new CannotConnectException(
374 "Could not create a connection to the server.");
375 }
376
377 return conn;
378 }
379
380 /**
381 * Tries to reconnect previously open {@link Connection}s. If a connection
382 * was being closed then cleans up the remaining state.
383 *
384 * @param connection
385 * The connection that was closed.
386 */
387 protected void handleConnectionClosed(final Connection connection) {
388 // Look for the connection in the "active" set first.
389 if (myConnections.contains(connection)) {
390 // Is it a graceful shutdown?
391 if (connection.isShuttingDown() && myConnections.remove(connection)) {
392
393 if (myConnections.size() < myConfig.getMinConnectionCount()) {
394 LOG.debug(
395 "MongoDB Connection closed: {}. Will try to reconnect.",
396 connection);
397 reconnect(connection);
398 }
399 else {
400 LOG.info("MongoDB Connection closed: {}", connection);
401 connection
402 .removePropertyChangeListener(myConnectionListener);
403 connection.raiseErrors(new ConnectionLostException(
404 "Connection shutdown."));
405 }
406 }
407 else {
408 // Attempt a reconnect.
409 LOG.info("Unexpected MongoDB Connection closed: " + connection
410 + ". Will try to reconnect.");
411 reconnect(connection);
412 }
413 }
414 else if (myConnectionsToClose.remove(connection)) {
415 LOG.debug("MongoDB Connection closed: {}", connection);
416 connection.removePropertyChangeListener(myConnectionListener);
417 }
418 else {
419 LOG.info("Unknown MongoDB Connection closed: {}", connection);
420 connection.removePropertyChangeListener(myConnectionListener);
421 }
422 }
423
424 /**
425 * Runs the reconnect logic for the connection.
426 *
427 * @param connection
428 * The connection to reconnect.
429 */
430 protected void reconnect(final Connection connection) {
431 final ReconnectStrategy strategy = myConnectionFactory
432 .getReconnectStrategy();
433
434 try {
435 synchronized (this) {
436 myActiveReconnects += 1;
437 }
438
439 final Connection newConnection = strategy.reconnect(connection);
440 if (newConnection != null) {
441 // Get the new connection in the rotation.
442 myConnections.add(newConnection);
443 newConnection.addPropertyChangeListener(myConnectionListener);
444 }
445 }
446 finally {
447 myConnections.remove(connection);
448 connection.removePropertyChangeListener(myConnectionListener);
449
450 // Raise errors for all of the pending messages - there is no way to
451 // know their state of flight between here and the server.
452 final MongoDbException exception = new ConnectionLostException(
453 "Connection lost to MongoDB: " + connection);
454 connection.raiseErrors(exception);
455
456 synchronized (this) {
457 myActiveReconnects -= 1;
458 notifyAll();
459 }
460 }
461 }
462
463 /**
464 * Searches for a connection to use.
465 * <p>
466 * Tries to locate a connection that can quickly dispatch the message to a
467 * MongoDB server. The basic metrics for determining if a connection is idle
468 * is to look at the number of messages waiting to be sent. The basic logic
469 * for finding a connection is:
470 * <ol>
471 * <li>Look at the current connection and the next connection. If either is
472 * idle, use it.</li>
473 * <li>If there are no idle connections determine the maximum number of
474 * allowed connections and if there are fewer that the maximum allowed then
475 * take the connection creation lock, create a new connection, use it, and
476 * add to the set of available connections and release the lock.</li>
477 * <li>Neither of the above works then increment the connection index and
478 * use the previous or next connection based on which has the fewest pending
479 * connections.</li>
480 * <ol>
481 *
482 * @param message1
483 * The first message that will be sent. The connection return
484 * should be compatible with all of the messages
485 * {@link ReadPreference}.
486 * @param message2
487 * The second message that will be sent. The connection return
488 * should be compatible with all of the messages
489 * {@link ReadPreference}. May be <code>null</code>.
490 * @param waitForReconnect
491 * If true then the search will block while there is an active
492 * reconnect attempt.
493 *
494 * @return The {@link Connection} to send a message on.
495 * @throws MongoDbException
496 * In the case of an error finding a {@link Connection}.
497 */
498 protected Connection searchConnection(final Message message1,
499 final Message message2, final boolean waitForReconnect)
500 throws MongoDbException {
501 // Locate a connection to use.
502 Connection conn = findIdleConnection();
503 if (conn == null) {
504 conn = tryCreateConnection();
505 if (conn == null) {
506 conn = findMostIdleConnection();
507 if ((conn == null) && waitForReconnect) {
508 conn = waitForReconnect(message1, message2);
509 }
510 }
511 }
512
513 return conn;
514 }
515
516 /**
517 * Silently closes the connection.
518 *
519 * @param conn
520 * The connection to close.
521 */
522 private void close(final Connection conn) {
523 try {
524 conn.close();
525 }
526 catch (final IOException ioe) {
527 LOG.warn(ioe, "Error closing connection to MongoDB: {}", conn);
528 }
529 finally {
530 myConnections.remove(conn);
531 myConnectionsToClose.remove(conn);
532
533 conn.removePropertyChangeListener(myConnectionListener);
534 }
535 }
536
537 /**
538 * Tries to find an idle connection to use from the current and next
539 * connection..
540 *
541 * @return The idle connection, if found.
542 */
543 private Connection findIdleConnection() {
544 if (!myConnections.isEmpty()) {
545 // Only get() here to try and reuse idle connections.
546 final long connSequence = myNextConnectionSequence.get();
547 for (int loop = 0; loop < Math.min(2, myConnections.size()); ++loop) {
548
549 // Cast to a long to make sure the Math.abs() works for
550 // Integer.MIN_VALUE
551 final long sequence = Math.abs(connSequence + loop);
552 final int size = myConnections.size();
553 final int index = (int) (sequence % size);
554 try {
555 final Connection conn = myConnections.get(index);
556 if (conn.isAvailable() && (conn.getPendingCount() == 0)) {
557 return conn;
558 }
559 }
560 catch (final ArrayIndexOutOfBoundsException aiob) {
561 // Race between the size and get and someone closing a
562 // connection. Next loop should fix.
563 aiob.getCause(); // Shhh - PMD.
564 }
565 }
566 }
567
568 return null;
569 }
570
571 /**
572 * Locates the most idle connection to use from the current and next
573 * connection.
574 *
575 * @return The most idle connection.
576 */
577 private Connection findMostIdleConnection() {
578 if (!myConnections.isEmpty()) {
579 final long next = (myConnections.size() <= 1) ? 1
580 : myNextConnectionSequence.incrementAndGet();
581 final long previous = next - 1;
582
583 Connection previousConn = null;
584 Connection nextConn = null;
585 while ((previousConn == null) || (nextConn == null)) {
586 try {
587 final int size = myConnections.size();
588 previousConn = myConnections.get((int) (previous % size));
589 nextConn = myConnections.get((int) (next % size));
590 }
591 catch (final ArrayIndexOutOfBoundsException aiob) {
592 // Race between the size and get.
593 // Next loop should fix.
594 aiob.getCause(); // Shhh - PMD.
595 }
596 }
597
598 if (previousConn == nextConn) {
599 if (previousConn.isAvailable()) {
600 return previousConn;
601 }
602 }
603 else if (previousConn.isAvailable()) {
604 if (nextConn.isAvailable()) {
605 if (previousConn.getPendingCount() < nextConn
606 .getPendingCount()) {
607 return previousConn;
608 }
609 return nextConn;
610 }
611 }
612 else if (nextConn.isAvailable()) {
613 return nextConn;
614 }
615 }
616
617 return null;
618 }
619
620 /**
621 * Tries to create a new connection.
622 *
623 * @return The created connection or null if a connection could not be
624 * created by policy or error.
625 */
626 private Connection tryCreateConnection() {
627 if (myConnections.size() < myConfig.getMaxConnectionCount()) {
628 synchronized (myConnectionFactory) {
629 final int limit = Math.max(1, myConfig.getMaxConnectionCount());
630 if (myConnections.size() < limit) {
631 try {
632 final Connection conn = myConnectionFactory.connect();
633
634 myConnections.add(conn);
635
636 // Add a listener for if the connection is closed.
637 conn.addPropertyChangeListener(myConnectionListener);
638
639 return conn;
640 }
641 catch (final IOException ioe) {
642 LOG.warn(ioe, "Could not create a connection.");
643 }
644 }
645 }
646 }
647
648 return null;
649 }
650
651 /**
652 * Checks if there is an active reconnect attempt on-going. If so waits for
653 * it to finish (with a timeout) and then searches for a connection again.
654 *
655 * @param message1
656 * The first message that will be sent. The connection return
657 * should be compatible with all of the messages
658 * {@link ReadPreference}.
659 * @param message2
660 * The second message that will be sent. The connection return
661 * should be compatible with all of the messages
662 * {@link ReadPreference}. May be <code>null</code>.
663 * @return The connection found after waiting or <code>null</code> if there
664 * was no active reconnect or there was still no connection.
665 */
666 private Connection waitForReconnect(final Message message1,
667 final Message message2) {
668 Connection conn = null;
669 boolean wasReconnecting = false;
670 synchronized (this) {
671 wasReconnecting = (0 < myActiveReconnects);
672 if (wasReconnecting) {
673 long now = System.currentTimeMillis();
674 final long deadline = (myConfig.getReconnectTimeout() <= 0) ? Long.MAX_VALUE
675 : now + myConfig.getReconnectTimeout();
676
677 while ((now < deadline) && (0 < myActiveReconnects)) {
678 try {
679 LOG.debug("Waiting for reconnect to MongoDB.");
680 wait(deadline - now);
681
682 now = System.currentTimeMillis();
683 }
684 catch (final InterruptedException e) {
685 // Ignored - Handled by the loop.
686 }
687 }
688 }
689 }
690
691 if (wasReconnecting) {
692 // Look again now that we may have reconnected.
693 conn = searchConnection(message1, message2, false);
694 }
695 return conn;
696 }
697
698 /**
699 * ConnectionListener provides the call back for events occurring on a
700 * connection.
701 *
702 * @copyright 2012-2013, Allanbank Consulting, Inc., All Rights Reserved
703 */
704 protected class ConnectionListener implements PropertyChangeListener {
705
706 /**
707 * Creates a new ConnectionListener.
708 */
709 public ConnectionListener() {
710 super();
711 }
712
713 /**
714 * {@inheritDoc}
715 * <p>
716 * Overridden to try reconnecting a connection that has closed.
717 * </p>
718 */
719 @Override
720 public void propertyChange(final PropertyChangeEvent event) {
721 if (Connection.OPEN_PROP_NAME.equals(event.getPropertyName())
722 && Boolean.FALSE.equals(event.getNewValue())) {
723 handleConnectionClosed((Connection) event.getSource());
724 }
725 }
726 }
727 }