View Javadoc
1   /*
2    * #%L
3    * ClientImpl.java - mongodb-async-driver - Allanbank Consulting, Inc.
4    * %%
5    * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6    * %%
7    * Licensed under the Apache License, Version 2.0 (the "License");
8    * you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   * 
11   *      http://www.apache.org/licenses/LICENSE-2.0
12   * 
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   * #L%
19   */
20  package com.allanbank.mongodb.client;
21  
22  import java.beans.PropertyChangeEvent;
23  import java.beans.PropertyChangeListener;
24  import java.io.Closeable;
25  import java.io.IOException;
26  import java.lang.reflect.Constructor;
27  import java.util.ArrayList;
28  import java.util.List;
29  import java.util.concurrent.BlockingQueue;
30  import java.util.concurrent.CopyOnWriteArrayList;
31  import java.util.concurrent.LinkedBlockingQueue;
32  import java.util.concurrent.TimeUnit;
33  import java.util.concurrent.atomic.AtomicLong;
34  
35  import com.allanbank.mongodb.Durability;
36  import com.allanbank.mongodb.MongoClientConfiguration;
37  import com.allanbank.mongodb.MongoCursorControl;
38  import com.allanbank.mongodb.MongoDbException;
39  import com.allanbank.mongodb.MongoIterator;
40  import com.allanbank.mongodb.ReadPreference;
41  import com.allanbank.mongodb.StreamCallback;
42  import com.allanbank.mongodb.bson.Document;
43  import com.allanbank.mongodb.bson.DocumentAssignable;
44  import com.allanbank.mongodb.bson.NumericElement;
45  import com.allanbank.mongodb.bson.element.StringElement;
46  import com.allanbank.mongodb.client.callback.CursorStreamingCallback;
47  import com.allanbank.mongodb.client.connection.Connection;
48  import com.allanbank.mongodb.client.connection.ConnectionFactory;
49  import com.allanbank.mongodb.client.connection.ReconnectStrategy;
50  import com.allanbank.mongodb.client.connection.bootstrap.BootstrapConnectionFactory;
51  import com.allanbank.mongodb.client.state.Cluster;
52  import com.allanbank.mongodb.error.CannotConnectException;
53  import com.allanbank.mongodb.error.ConnectionLostException;
54  import com.allanbank.mongodb.util.IOUtils;
55  import com.allanbank.mongodb.util.log.Log;
56  import com.allanbank.mongodb.util.log.LogFactory;
57  
58  /**
59   * Implementation of the internal {@link Client} interface which all requests to
60   * the MongoDB servers pass.
61   * 
62   * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
63   *         mutated in incompatible ways between any two releases of the driver.
64   * @copyright 2011-2013, Allanbank Consulting, Inc., All Rights Reserved
65   */
66  public class ClientImpl extends AbstractClient {
67  
68      /** The logger for the {@link ClientImpl}. */
69      protected static final Log LOG = LogFactory.getLog(ClientImpl.class);
70  
71      /**
72       * Resolves the bootstrap connection factory to use.
73       * 
74       * @param config
75       *            The client's configuration.
76       * @return The connection factory for connecting to the cluster.
77       */
78      protected static ConnectionFactory resolveBootstrap(
79              final MongoClientConfiguration config) {
80          ConnectionFactory result;
81          try {
82              final String name = "com.allanbank.mongodb.extensions.bootstrap.ExtensionsBootstrapConnectionFactory";
83              final Class<?> clazz = Class.forName(name);
84              final Constructor<?> constructor = clazz
85                      .getConstructor(MongoClientConfiguration.class);
86  
87              result = (ConnectionFactory) constructor.newInstance(config);
88          }
89          // Too many exceptions.
90          catch (final RuntimeException e) {
91              throw e;
92          }
93          catch (final Exception e) {
94              result = new BootstrapConnectionFactory(config);
95          }
96  
97          return result;
98      }
99  
100     /** Counter for the number of reconnects currently being attempted. */
101     private int myActiveReconnects;
102 
103     /** The configuration for interacting with MongoDB. */
104     private final MongoClientConfiguration myConfig;
105 
106     /** Factory for creating connections to MongoDB. */
107     private final ConnectionFactory myConnectionFactory;
108 
109     /** The listener for changes to the state of connections. */
110     private final PropertyChangeListener myConnectionListener;
111 
112     /** The set of open connections. */
113     private final List<Connection> myConnections;
114 
115     /** The connections that have been asked to shut down and are waiting to close. */
116     private final BlockingQueue<Connection> myConnectionsToClose;
117 
118     /** The sequence of the connection that was last used. */
119     private final AtomicLong myNextConnectionSequence = new AtomicLong(0);
120 
121     /**
122      * Create a new ClientImpl.
123      * 
124      * @param config
125      *            The configuration for interacting with MongoDB.
126      */
127     public ClientImpl(final MongoClientConfiguration config) {
128         this(config, resolveBootstrap(config));
129     }
130 
131     /**
132      * Create a new ClientImpl.
133      * 
134      * @param config
135      *            The configuration for interacting with MongoDB.
136      * @param connectionFactory
137      *            The source of connection for the client.
138      */
139     public ClientImpl(final MongoClientConfiguration config,
140             final ConnectionFactory connectionFactory) {
141         myConfig = config;
142         myConnectionFactory = connectionFactory;
143         myConnections = new CopyOnWriteArrayList<Connection>();
144         myConnectionsToClose = new LinkedBlockingQueue<Connection>();
145         myConnectionListener = new ConnectionListener();
146         myActiveReconnects = 0;
147     }
148 
149     /**
150      * {@inheritDoc}
151      * <p>
152      * Overridden to close all of the open connections.
153      * </p>
154      * 
155      * @see Closeable#close()
156      */
157     @Override
158     public void close() {
159         // Stop any more messages.
160         super.close();
161 
162         while (!myConnections.isEmpty()) {
163             try {
164                 final Connection conn = myConnections.remove(0);
165                 myConnectionsToClose.add(conn);
166                 conn.shutdown(false);
167             }
168             catch (final ArrayIndexOutOfBoundsException aiob) {
169                 // There is a race between the isEmpty() and the remove we can't
170                 // avoid. Next check if isEmpty() will bounce us out of the
171                 // loop.
172                 aiob.getCause(); // Shhhh - PMD.
173             }
174         }
175 
176         // Work off the connections to close until they are all closed.
177         final List<Connection> conns = new ArrayList<Connection>(
178                 myConnectionsToClose);
179         for (final Connection conn : conns) {
180             conn.waitForClosed(myConfig.getReadTimeout(), TimeUnit.MILLISECONDS);
181             if (conn.isOpen()) {
182                 // Force the connection to close.
183                 close(conn);
184             }
185         }
186 
187         // Shutdown the connections factory.
188         IOUtils.close(myConnectionFactory);
189     }
190 
191     /**
192      * {@inheritDoc}
193      * <p>
194      * Overridden to return the {@link Cluster}.
195      * </p>
196      */
197     @Override
198     public ClusterStats getClusterStats() {
199         return myConnectionFactory.getClusterStats();
200     }
201 
202     /**
203      * {@inheritDoc}
204      * <p>
205      * Overridden to return the {@link ClusterType} of the
206      * {@link ConnectionFactory}.
207      * </p>
208      */
209     @Override
210     public ClusterType getClusterType() {
211         return myConnectionFactory.getClusterType();
212     }
213 
214     /**
215      * {@inheritDoc}
216      * <p>
217      * Overridden to return the configuration used when the client was
218      * constructed.
219      * </p>
220      */
221     @Override
222     public MongoClientConfiguration getConfig() {
223         return myConfig;
224     }
225 
226     /**
227      * Returns the current number of open connections.
228      * 
229      * @return The current number of open connections.
230      */
231     public int getConnectionCount() {
232         return myConnections.size();
233     }
234 
235     /**
236      * {@inheritDoc}
237      * <p>
238      * Overridden to return the configurations default durability.
239      * </p>
240      * 
241      * @see Client#getDefaultDurability()
242      */
243     @Override
244     public Durability getDefaultDurability() {
245         return myConfig.getDefaultDurability();
246     }
247 
248     /**
249      * {@inheritDoc}
250      * <p>
251      * Overridden to return the configurations default read preference.
252      * </p>
253      * 
254      * @see Client#getDefaultReadPreference()
255      */
256     @Override
257     public ReadPreference getDefaultReadPreference() {
258         return myConfig.getDefaultReadPreference();
259     }
260 
261     /**
262      * Returns true if the document looks like a cursor restart document. e.g.,
263      * one that is created by {@link MongoIteratorImpl#asDocument()}.
264      * 
265      * @param doc
266      *            The potential cursor document.
267      * @return True if the document looks like it was created by
268      *         {@link MongoIteratorImpl#asDocument()}.
269      */
270     public boolean isCursorDocument(final Document doc) {
271         return (doc.getElements().size() == 5)
272                 && (doc.get(StringElement.class,
273                         MongoCursorControl.NAME_SPACE_FIELD) != null)
274                 && (doc.get(NumericElement.class,
275                         MongoCursorControl.CURSOR_ID_FIELD) != null)
276                 && (doc.get(StringElement.class,
277                         MongoCursorControl.SERVER_FIELD) != null)
278                 && (doc.get(NumericElement.class,
279                         MongoCursorControl.BATCH_SIZE_FIELD) != null)
280                 && (doc.get(NumericElement.class,
281                         MongoCursorControl.LIMIT_FIELD) != null);
282     }
283 
284     /**
285      * {@inheritDoc}
286      */
287     @Override
288     public MongoIterator<Document> restart(
289             final DocumentAssignable cursorDocument)
290             throws IllegalArgumentException {
291         final Document cursorDoc = cursorDocument.asDocument();
292 
293         if (isCursorDocument(cursorDoc)) {
294             final MongoIteratorImpl iter = new MongoIteratorImpl(cursorDoc,
295                     this);
296             iter.restart();
297 
298             return iter;
299         }
300 
301         throw new IllegalArgumentException(
302                 "Cannot restart without a well formed cursor document: "
303                         + cursorDoc);
304     }
305 
306     /**
307      * {@inheritDoc}
308      */
309     @Override
310     public MongoCursorControl restart(final StreamCallback<Document> results,
311             final DocumentAssignable cursorDocument)
312             throws IllegalArgumentException {
313         final Document cursorDoc = cursorDocument.asDocument();
314 
315         if (isCursorDocument(cursorDoc)) {
316             final CursorStreamingCallback cb = new CursorStreamingCallback(
317                     this, cursorDoc, results);
318             cb.restart();
319 
320             return cb;
321         }
322         throw new IllegalArgumentException(
323                 "Cannot restart without a well formed cursor document: "
324                         + cursorDoc);
325     }
326 
327     /**
328      * {@inheritDoc}
329      * <p>
330      * Tries to locate a connection that can quickly dispatch the message to a
331      * MongoDB server. The basic metrics for determining if a connection is idle
332      * is to look at the number of messages waiting to be sent. The basic logic
333      * for finding a connection is:
334      * <ol>
335      * <li>Look at the current connection and the next connection. If either is
336      * idle, use it.</li>
337      * <li>If there are no idle connections determine the maximum number of
338      * allowed connections and if there are fewer that the maximum allowed then
339      * take the connection creation lock, create a new connection, use it, and
340      * add to the set of available connections and release the lock.</li>
341      * <li>Neither of the above works then increment the connection index and
342      * use the previous or next connection based on which has the fewest pending
343      * connections.</li>
344      * <ol>
345      */
346     @Override
347     protected Connection findConnection(final Message message1,
348             final Message message2) throws MongoDbException {
349         // Make sure we shrink connections when the max changes.
350         final int limit = Math.max(1, myConfig.getMaxConnectionCount());
351         if (limit < myConnections.size()) {
352             synchronized (myConnectionFactory) {
353                 // Mark the connections as persona non grata.
354                 while (limit < myConnections.size()) {
355                     try {
356                         final Connection conn = myConnections.remove(0);
357                         myConnectionsToClose.add(conn);
358                         conn.shutdown(false);
359                     }
360                     catch (final ArrayIndexOutOfBoundsException aiob) {
361                         // Race between the size() and remove(0).
362                         // Next loop should resolve.
363                         aiob.getCause(); // Shhhh - PMD.
364                     }
365                 }
366             }
367         }
368 
369         // Locate a connection to use.
370         final Connection conn = searchConnection(message1, message2, true);
371 
372         if (conn == null) {
373             throw new CannotConnectException(
374                     "Could not create a connection to the server.");
375         }
376 
377         return conn;
378     }
379 
380     /**
381      * Tries to reconnect previously open {@link Connection}s. If a connection
382      * was being closed then cleans up the remaining state.
383      * 
384      * @param connection
385      *            The connection that was closed.
386      */
387     protected void handleConnectionClosed(final Connection connection) {
388         // Look for the connection in the "active" set first.
389         if (myConnections.contains(connection)) {
390             // Is it a graceful shutdown?
391             if (connection.isShuttingDown() && myConnections.remove(connection)) {
392 
393                 if (myConnections.size() < myConfig.getMinConnectionCount()) {
394                     LOG.debug(
395                             "MongoDB Connection closed: {}. Will try to reconnect.",
396                             connection);
397                     reconnect(connection);
398                 }
399                 else {
400                     LOG.info("MongoDB Connection closed: {}", connection);
401                     connection
402                             .removePropertyChangeListener(myConnectionListener);
403                     connection.raiseErrors(new ConnectionLostException(
404                             "Connection shutdown."));
405                 }
406             }
407             else {
408                 // Attempt a reconnect.
409                 LOG.info("Unexpected MongoDB Connection closed: " + connection
410                         + ". Will try to reconnect.");
411                 reconnect(connection);
412             }
413         }
414         else if (myConnectionsToClose.remove(connection)) {
415             LOG.debug("MongoDB Connection closed: {}", connection);
416             connection.removePropertyChangeListener(myConnectionListener);
417         }
418         else {
419             LOG.info("Unknown MongoDB Connection closed: {}", connection);
420             connection.removePropertyChangeListener(myConnectionListener);
421         }
422     }
423 
424     /**
425      * Runs the reconnect logic for the connection.
426      * 
427      * @param connection
428      *            The connection to reconnect.
429      */
430     protected void reconnect(final Connection connection) {
431         final ReconnectStrategy strategy = myConnectionFactory
432                 .getReconnectStrategy();
433 
434         try {
435             synchronized (this) {
436                 myActiveReconnects += 1;
437             }
438 
439             final Connection newConnection = strategy.reconnect(connection);
440             if (newConnection != null) {
441                 // Get the new connection in the rotation.
442                 myConnections.add(newConnection);
443                 newConnection.addPropertyChangeListener(myConnectionListener);
444             }
445         }
446         finally {
447             myConnections.remove(connection);
448             connection.removePropertyChangeListener(myConnectionListener);
449 
450             // Raise errors for all of the pending messages - there is no way to
451             // know their state of flight between here and the server.
452             final MongoDbException exception = new ConnectionLostException(
453                     "Connection lost to MongoDB: " + connection);
454             connection.raiseErrors(exception);
455 
456             synchronized (this) {
457                 myActiveReconnects -= 1;
458                 notifyAll();
459             }
460         }
461     }
462 
463     /**
464      * Searches for a connection to use.
465      * <p>
466      * Tries to locate a connection that can quickly dispatch the message to a
467      * MongoDB server. The basic metrics for determining if a connection is idle
468      * is to look at the number of messages waiting to be sent. The basic logic
469      * for finding a connection is:
470      * <ol>
471      * <li>Look at the current connection and the next connection. If either is
472      * idle, use it.</li>
473      * <li>If there are no idle connections determine the maximum number of
474      * allowed connections and if there are fewer that the maximum allowed then
475      * take the connection creation lock, create a new connection, use it, and
476      * add to the set of available connections and release the lock.</li>
477      * <li>Neither of the above works then increment the connection index and
478      * use the previous or next connection based on which has the fewest pending
479      * connections.</li>
480      * <ol>
481      * 
482      * @param message1
483      *            The first message that will be sent. The connection return
484      *            should be compatible with all of the messages
485      *            {@link ReadPreference}.
486      * @param message2
487      *            The second message that will be sent. The connection return
488      *            should be compatible with all of the messages
489      *            {@link ReadPreference}. May be <code>null</code>.
490      * @param waitForReconnect
491      *            If true then the search will block while there is an active
492      *            reconnect attempt.
493      * 
494      * @return The {@link Connection} to send a message on.
495      * @throws MongoDbException
496      *             In the case of an error finding a {@link Connection}.
497      */
498     protected Connection searchConnection(final Message message1,
499             final Message message2, final boolean waitForReconnect)
500             throws MongoDbException {
501         // Locate a connection to use.
502         Connection conn = findIdleConnection();
503         if (conn == null) {
504             conn = tryCreateConnection();
505             if (conn == null) {
506                 conn = findMostIdleConnection();
507                 if ((conn == null) && waitForReconnect) {
508                     conn = waitForReconnect(message1, message2);
509                 }
510             }
511         }
512 
513         return conn;
514     }
515 
516     /**
517      * Silently closes the connection.
518      * 
519      * @param conn
520      *            The connection to close.
521      */
522     private void close(final Connection conn) {
523         try {
524             conn.close();
525         }
526         catch (final IOException ioe) {
527             LOG.warn(ioe, "Error closing connection to MongoDB: {}", conn);
528         }
529         finally {
530             myConnections.remove(conn);
531             myConnectionsToClose.remove(conn);
532 
533             conn.removePropertyChangeListener(myConnectionListener);
534         }
535     }
536 
537     /**
538      * Tries to find an idle connection to use from the current and next
539      * connection..
540      * 
541      * @return The idle connection, if found.
542      */
543     private Connection findIdleConnection() {
544         if (!myConnections.isEmpty()) {
545             // Only get() here to try and reuse idle connections.
546             final long connSequence = myNextConnectionSequence.get();
547             for (int loop = 0; loop < Math.min(2, myConnections.size()); ++loop) {
548 
549                 // Cast to a long to make sure the Math.abs() works for
550                 // Integer.MIN_VALUE
551                 final long sequence = Math.abs(connSequence + loop);
552                 final int size = myConnections.size();
553                 final int index = (int) (sequence % size);
554                 try {
555                     final Connection conn = myConnections.get(index);
556                     if (conn.isAvailable() && (conn.getPendingCount() == 0)) {
557                         return conn;
558                     }
559                 }
560                 catch (final ArrayIndexOutOfBoundsException aiob) {
561                     // Race between the size and get and someone closing a
562                     // connection. Next loop should fix.
563                     aiob.getCause(); // Shhh - PMD.
564                 }
565             }
566         }
567 
568         return null;
569     }
570 
571     /**
572      * Locates the most idle connection to use from the current and next
573      * connection.
574      * 
575      * @return The most idle connection.
576      */
577     private Connection findMostIdleConnection() {
578         if (!myConnections.isEmpty()) {
579             final long next = (myConnections.size() <= 1) ? 1
580                     : myNextConnectionSequence.incrementAndGet();
581             final long previous = next - 1;
582 
583             Connection previousConn = null;
584             Connection nextConn = null;
585             while ((previousConn == null) || (nextConn == null)) {
586                 try {
587                     final int size = myConnections.size();
588                     previousConn = myConnections.get((int) (previous % size));
589                     nextConn = myConnections.get((int) (next % size));
590                 }
591                 catch (final ArrayIndexOutOfBoundsException aiob) {
592                     // Race between the size and get.
593                     // Next loop should fix.
594                     aiob.getCause(); // Shhh - PMD.
595                 }
596             }
597 
598             if (previousConn == nextConn) {
599                 if (previousConn.isAvailable()) {
600                     return previousConn;
601                 }
602             }
603             else if (previousConn.isAvailable()) {
604                 if (nextConn.isAvailable()) {
605                     if (previousConn.getPendingCount() < nextConn
606                             .getPendingCount()) {
607                         return previousConn;
608                     }
609                     return nextConn;
610                 }
611             }
612             else if (nextConn.isAvailable()) {
613                 return nextConn;
614             }
615         }
616 
617         return null;
618     }
619 
620     /**
621      * Tries to create a new connection.
622      * 
623      * @return The created connection or null if a connection could not be
624      *         created by policy or error.
625      */
626     private Connection tryCreateConnection() {
627         if (myConnections.size() < myConfig.getMaxConnectionCount()) {
628             synchronized (myConnectionFactory) {
629                 final int limit = Math.max(1, myConfig.getMaxConnectionCount());
630                 if (myConnections.size() < limit) {
631                     try {
632                         final Connection conn = myConnectionFactory.connect();
633 
634                         myConnections.add(conn);
635 
636                         // Add a listener for if the connection is closed.
637                         conn.addPropertyChangeListener(myConnectionListener);
638 
639                         return conn;
640                     }
641                     catch (final IOException ioe) {
642                         LOG.warn(ioe, "Could not create a connection.");
643                     }
644                 }
645             }
646         }
647 
648         return null;
649     }
650 
651     /**
652      * Checks if there is an active reconnect attempt on-going. If so waits for
653      * it to finish (with a timeout) and then searches for a connection again.
654      * 
655      * @param message1
656      *            The first message that will be sent. The connection return
657      *            should be compatible with all of the messages
658      *            {@link ReadPreference}.
659      * @param message2
660      *            The second message that will be sent. The connection return
661      *            should be compatible with all of the messages
662      *            {@link ReadPreference}. May be <code>null</code>.
663      * @return The connection found after waiting or <code>null</code> if there
664      *         was no active reconnect or there was still no connection.
665      */
666     private Connection waitForReconnect(final Message message1,
667             final Message message2) {
668         Connection conn = null;
669         boolean wasReconnecting = false;
670         synchronized (this) {
671             wasReconnecting = (0 < myActiveReconnects);
672             if (wasReconnecting) {
673                 long now = System.currentTimeMillis();
674                 final long deadline = (myConfig.getReconnectTimeout() <= 0) ? Long.MAX_VALUE
675                         : now + myConfig.getReconnectTimeout();
676 
677                 while ((now < deadline) && (0 < myActiveReconnects)) {
678                     try {
679                         LOG.debug("Waiting for reconnect to MongoDB.");
680                         wait(deadline - now);
681 
682                         now = System.currentTimeMillis();
683                     }
684                     catch (final InterruptedException e) {
685                         // Ignored - Handled by the loop.
686                     }
687                 }
688             }
689         }
690 
691         if (wasReconnecting) {
692             // Look again now that we may have reconnected.
693             conn = searchConnection(message1, message2, false);
694         }
695         return conn;
696     }
697 
698     /**
699      * ConnectionListener provides the call back for events occurring on a
700      * connection.
701      * 
702      * @copyright 2012-2013, Allanbank Consulting, Inc., All Rights Reserved
703      */
704     protected class ConnectionListener implements PropertyChangeListener {
705 
706         /**
707          * Creates a new ConnectionListener.
708          */
709         public ConnectionListener() {
710             super();
711         }
712 
713         /**
714          * {@inheritDoc}
715          * <p>
716          * Overridden to try reconnecting a connection that has closed.
717          * </p>
718          */
719         @Override
720         public void propertyChange(final PropertyChangeEvent event) {
721             if (Connection.OPEN_PROP_NAME.equals(event.getPropertyName())
722                     && Boolean.FALSE.equals(event.getNewValue())) {
723                 handleConnectionClosed((Connection) event.getSource());
724             }
725         }
726     }
727 }