View Javadoc
1   /*
2    * #%L
3    * AbstractProxyMultipleConnection.java - mongodb-async-driver - Allanbank Consulting, Inc.
4    * %%
5    * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6    * %%
7    * Licensed under the Apache License, Version 2.0 (the "License");
8    * you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   * 
11   *      http://www.apache.org/licenses/LICENSE-2.0
12   * 
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   * #L%
19   */
20  
21  package com.allanbank.mongodb.client.connection.proxy;
22  
23  import java.beans.PropertyChangeEvent;
24  import java.beans.PropertyChangeListener;
25  import java.beans.PropertyChangeSupport;
26  import java.io.IOException;
27  import java.util.Arrays;
28  import java.util.HashSet;
29  import java.util.List;
30  import java.util.Map;
31  import java.util.Set;
32  import java.util.concurrent.ConcurrentHashMap;
33  import java.util.concurrent.ConcurrentMap;
34  import java.util.concurrent.TimeUnit;
35  import java.util.concurrent.atomic.AtomicBoolean;
36  import java.util.concurrent.atomic.AtomicReference;
37  
38  import com.allanbank.mongodb.Callback;
39  import com.allanbank.mongodb.MongoClientConfiguration;
40  import com.allanbank.mongodb.MongoDbException;
41  import com.allanbank.mongodb.ReadPreference;
42  import com.allanbank.mongodb.client.Message;
43  import com.allanbank.mongodb.client.callback.ReplyCallback;
44  import com.allanbank.mongodb.client.connection.Connection;
45  import com.allanbank.mongodb.client.connection.ReconnectStrategy;
46  import com.allanbank.mongodb.client.state.Cluster;
47  import com.allanbank.mongodb.error.ConnectionLostException;
48  import com.allanbank.mongodb.util.log.Log;
49  import com.allanbank.mongodb.util.log.LogFactory;
50  
51  /**
52   * AbstractProxyMultipleConnection provides the core functionality for a
53   * connection that multiplexes requests across multiple connections.
54   * 
55   * @param <K>
56   *            The key used to track the various connections.
57   * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
58   *         mutated in incompatible ways between any two releases of the driver.
59   * @copyright 2014, Allanbank Consulting, Inc., All Rights Reserved
60   */
61  public abstract class AbstractProxyMultipleConnection<K> implements Connection {
62  
63      /** The logger for the {@link AbstractProxyMultipleConnection}. */
64      private static final Log LOG = LogFactory
65              .getLog(AbstractProxyMultipleConnection.class);
66  
67      /** The state of the cluster; this connection listens to it for changes. */
68      protected final Cluster myCluster;
69  
70      /** The MongoDB client configuration. */
71      protected final MongoClientConfiguration myConfig;
72  
73      /** Support for emitting this connection's own property change events. */
74      protected final PropertyChangeSupport myEventSupport;
75  
76      /** The connection factory; supplies the reconnect strategy. */
77      protected final ProxiedConnectionFactory myFactory;
78  
79      /** The most recently used connection; {@code null} if none was supplied at construction and nothing has been sent. */
80      protected final AtomicReference<Connection> myLastUsedConnection;
81  
82      /** The listener registered with the cluster and with every cached connection. */
83      protected final PropertyChangeListener myListener;
84  
85      /** The key of the primary (main) server; {@code null} after the main connection is lost, until a replacement is found. */
86      protected volatile K myMainKey;
87  
88      /** Set to false by {@link #close()}. */
89      protected final AtomicBoolean myOpen;
90  
91      /** Set to true by {@link #shutdown(boolean)}. */
92      protected final AtomicBoolean myShutdown;
93  
94      /** The cached connections, keyed by the server each is connected to. */
95      /* package */final ConcurrentMap<K, Connection> myConnections;
96  
97      /**
98       * Creates a new {@link AbstractProxyMultipleConnection}.
99       * 
100      * @param proxiedConnection
101      *            The connection being proxied.
102      * @param server
103      *            The primary server this connection is connected to.
104      * @param cluster
105      *            The state of the cluster for finding secondary connections.
106      * @param factory
107      *            The connection factory for opening secondary connections.
108      * @param config
109      *            The MongoDB client configuration.
110      */
111     public AbstractProxyMultipleConnection(final Connection proxiedConnection,
112             final K server, final Cluster cluster,
113             final ProxiedConnectionFactory factory,
114             final MongoClientConfiguration config) {
115         myMainKey = server;
116         myCluster = cluster;
117         myFactory = factory;
118         myConfig = config;
119 
120         myOpen = new AtomicBoolean(true);
121         myShutdown = new AtomicBoolean(false);
122         myEventSupport = new PropertyChangeSupport(this);
123         myConnections = new ConcurrentHashMap<K, Connection>();
124         myLastUsedConnection = new AtomicReference<Connection>(
125                 proxiedConnection);
126 
127         myListener = new ClusterAndConnectionListener();
128         myCluster.addListener(myListener);
129 
130         if (proxiedConnection != null) {
131             cacheConnection(server, proxiedConnection);
132         }
133     }
134 
135     /**
136      * {@inheritDoc}
137      * <p>
138      * Overridden to add the listener to this proxy's own event support.
139      * </p>
140      */
141     @Override
142     public void addPropertyChangeListener(final PropertyChangeListener listener) {
143         myEventSupport.addPropertyChangeListener(listener);
144     }
145 
146     /**
147      * Closes the underlying connection.
148      * 
149      * @see Connection#close()
150      */
151     @Override
152     public void close() {
153 
154         myOpen.set(false);
155         myCluster.removeListener(myListener);
156 
157         for (final Connection conn : myConnections.values()) {
158             try {
159                 conn.removePropertyChangeListener(myListener);
160                 conn.close();
161             }
162             catch (final IOException ioe) {
163                 LOG.warn(ioe, "Could not close the connection: {}", conn);
164             }
165         }
166     }
167 
168     /**
169      * {@inheritDoc}
170      * <p>
171      * Forwards the call to the proxied {@link Connection}.
172      * </p>
173      * 
174      * @see java.io.Flushable#flush()
175      */
176     @Override
177     public void flush() throws IOException {
178         for (final Connection conn : myConnections.values()) {
179             try {
180                 conn.flush();
181             }
182             catch (final IOException ioe) {
183                 LOG.warn(ioe, "Could not flush the connection: {}", conn);
184             }
185         }
186     }
187 
188     /**
189      * {@inheritDoc}
190      * <p>
191      * Overridden to return the pending count for the last connection used to
192      * send a message.
193      * </p>
194      */
195     @Override
196     public int getPendingCount() {
197         return myLastUsedConnection.get().getPendingCount();
198     }
199 
200     /**
201      * {@inheritDoc}
202      * <p>
203      * True if the connection is open and not shutting down.
204      * </p>
205      */
206     @Override
207     public boolean isAvailable() {
208         return isOpen() && !isShuttingDown();
209     }
210 
211     /**
212      * {@inheritDoc}
213      * <p>
214      * Overridden to return if the last used connection is idle.
215      * </p>
216      */
217     @Override
218     public boolean isIdle() {
219         return myLastUsedConnection.get().isIdle();
220     }
221 
222     /**
223      * {@inheritDoc}
224      * <p>
225      * Overridden to return if this connection has any open connections.
226      * </p>
227      */
228     @Override
229     public boolean isOpen() {
230         return myOpen.get();
231     }
232 
233     /**
234      * {@inheritDoc}
235      * <p>
236      * Overridden to return if the last used connection is shutting down.
237      * </p>
238      */
239     @Override
240     public boolean isShuttingDown() {
241         return myShutdown.get();
242     }
243 
244     /**
245      * {@inheritDoc}
246      * <p>
247      * Overridden to raise the errors with all of the underlying connections.
248      * </p>
249      */
250     @Override
251     public void raiseErrors(final MongoDbException exception) {
252         for (final Connection conn : myConnections.values()) {
253             conn.raiseErrors(exception);
254         }
255     }
256 
257     /**
258      * {@inheritDoc}
259      * <p>
260      * Overridden to remove the listener from this proxy's own event support.
261      * </p>
262      */
263     @Override
264     public void removePropertyChangeListener(
265             final PropertyChangeListener listener) {
266         myEventSupport.removePropertyChangeListener(listener);
267     }
268 
269     /**
270      * {@inheritDoc}
271      * <p>
272      * Locates all of the potential servers that can receive all of the
273      * messages. Tries to then send the messages to a server with a connection
274      * already open or failing that tries to open a connection to open of the
275      * servers.
276      * </p>
277      */
278     @Override
279     public void send(final Message message1, final Message message2,
280             final ReplyCallback replyCallback) throws MongoDbException {
281 
282         if (!isAvailable()) {
283             throw new ConnectionLostException("Connection shutting down.");
284         }
285 
286         final List<K> servers = findPotentialKeys(message1, message2);
287         if (!trySend(servers, message1, message2, replyCallback)) {
288             throw new MongoDbException(
289                     "Could not send the messages to any of the potential servers.");
290         }
291     }
292 
293     /**
294      * {@inheritDoc}
295      * <p>
296      * Delegates to {@link #send(Message, Message, ReplyCallback)} with a
297      * <code>null</code> second message, which locates the potential servers
298      * and sends on an already open connection or opens a connection to one
299      * of the servers.
300      * </p>
301      */
302     @Override
303     public void send(final Message message, final ReplyCallback replyCallback)
304             throws MongoDbException {
305         send(message, null, replyCallback);
306     }
307 
308     /**
309      * {@inheritDoc}
310      * <p>
311      * Overridden to shutdown all of the underlying connections.
312      * </p>
313      */
314     @Override
315     public void shutdown(final boolean force) {
316         myShutdown.set(true);
317         for (final Connection conn : myConnections.values()) {
318             conn.shutdown(force);
319         }
320     }
321 
322     /**
323      * {@inheritDoc}
324      * <p>
325      * Overridden to return the socket information.
326      * </p>
327      */
328     @Override
329     public String toString() {
330         return getConnectionType() + "(" + myLastUsedConnection.get() + ")";
331     }
332 
333     /**
334      * {@inheritDoc}
335      * <p>
336      * Overridden to wait for all of the underlying connections to close.
337      * </p>
338      */
339     @Override
340     public void waitForClosed(final int timeout, final TimeUnit timeoutUnits) {
341         final long millis = timeoutUnits.toMillis(timeout);
342         long now = System.currentTimeMillis();
343         final long deadline = now + millis;
344 
345         for (final Connection conn : myConnections.values()) {
346             if (now < deadline) {
347                 conn.waitForClosed((int) (deadline - now),
348                         TimeUnit.MILLISECONDS);
349                 now = System.currentTimeMillis();
350             }
351         }
352     }
353 
354     /**
355      * Caches the connection to the server if there is not already a connection
356      * in the cache. If there is a connection already in the cache then the one
357      * provided is closed and the cached connection it returned.
358      * 
359      * @param server
360      *            The server connected to.
361      * @param conn
362      *            The connection to cache, if possible.
363      * @return The connection in the cache.
364      */
365     protected Connection cacheConnection(final K server, final Connection conn) {
366         final Connection existing = myConnections.putIfAbsent(server, conn);
367         if (existing != null) {
368             conn.shutdown(true);
369             return existing;
370         }
371 
372         // Listener to the connection for it to close.
373         conn.addPropertyChangeListener(myListener);
374 
375         return conn;
376     }
377 
378     /**
379      * Attempts to create a connection to the server, catching any exceptions
380      * thrown. If a connection is created it should be
381      * {@link #cacheConnection(Object, Connection) cached}.
382      * 
383      * @param server
384      *            The server to connect to.
385      * @return The connection to the server. A {@code null} result is treated by {@code trySend} as a failure and the next server is tried.
386      */
387     protected abstract Connection connect(final K server);
388 
389     /**
390      * Returns the cached connection for the specified key. This method may
391      * return {@code null}.
392      * 
393      * @param server
394      *            The server connected to.
395      * @return The connection in the cache.
396      */
397     protected Connection connection(final K server) {
398         return myConnections.get(server);
399     }
400 
401     /**
402      * Creates a exception for a reconnect failure.
403      * 
404      * @param message1
405      *            The first message to send.
406      * @param message2
407      *            The second message to send.
408      * @return The exception.
409      */
410     protected MongoDbException createReconnectFailure(final Message message1,
411             final Message message2) {
412         final StringBuilder builder = new StringBuilder(
413                 "Could not find any servers for the following set of read preferences: ");
414         final Set<ReadPreference> seen = new HashSet<ReadPreference>();
415         for (final Message message : Arrays.asList(message1, message2)) {
416             if (message != null) {
417                 final ReadPreference prefs = message.getReadPreference();
418                 if (seen.add(prefs)) {
419                     if (seen.size() > 1) {
420                         builder.append(", ");
421                     }
422                     builder.append(prefs);
423                 }
424             }
425         }
426         builder.append('.');
427 
428         return new MongoDbException(builder.toString());
429     }
430 
431     /**
432      * Sends the message on the connection.
433      * 
434      * @param conn
435      *            The connection to send on.
436      * @param message1
437      *            The first message to send.
438      * @param message2
439      *            The second message to send, may be <code>null</code>.
440      * @param reply
441      *            The reply {@link Callback}.
442      */
443     protected void doSend(final Connection conn, final Message message1,
444             final Message message2, final ReplyCallback reply) {
445 
446         // Use the connection for metrics etc.
447         myLastUsedConnection.lazySet(conn);
448 
449         if (message2 == null) {
450             conn.send(message1, reply);
451         }
452         else {
453             conn.send(message1, message2, reply);
454         }
455     }
456 
457     /**
458      * Locates the set of servers that can be used to send the specified
459      * messages. This method will attempt to connect to the primary server if
460      * there is not a current connection to the primary.
461      * 
462      * @param message1
463      *            The first message to send.
464      * @param message2
465      *            The second message to send. May be <code>null</code>.
466      * @return The servers that can be used; {@code trySend} attempts them in list order.
467      * @throws MongoDbException
468      *             On a failure to locate a server that all messages can be sent
469      *             to.
470      */
471     protected abstract List<K> findPotentialKeys(final Message message1,
472             final Message message2) throws MongoDbException;
473 
474     /**
475      * Returns the type of connection; used as the label in {@link #toString()} and in log messages.
476      * 
477      * @return The connection type.
478      */
479     protected abstract String getConnectionType();
480 
481     /**
482      * Tries to reconnect previously open {@link Connection}s. If a connection
483      * was being closed then cleans up the remaining state.
484      * 
485      * @param connection
486      *            The connection that was closed.
487      */
488     protected synchronized void handleConnectionClosed(
489             final Connection connection) {
490 
491         if (!myOpen.get()) {
492             return;
493         }
494 
495         final K server = findKeyForConnection(connection);
496 
497         try {
498             // If this is the last connection then go ahead and close this
499             // replica set connection so the number of active connections can
500             // shrink. Only close this connection on a graceful primary
501             // shutdown to pick up when a primary change happens.
502             final K primary = myMainKey;
503             if ((myConnections.size() == 1)
504                     && (!server.equals(primary) || connection.isShuttingDown())) {
505 
506                 // Mark this a graceful shutdown.
507                 removeCachedConnection(server, connection);
508                 shutdown(true);
509 
510                 myEventSupport.firePropertyChange(Connection.OPEN_PROP_NAME,
511                         true, isOpen());
512             }
513             // If the connection that closed was the primary then we need to
514             // reconnect.
515             else if (server.equals(primary) && isOpen()) {
516                 // Not sure who is primary any more.
517                 myMainKey = null;
518 
519                 LOG.info("Primary MongoDB Connection closed: {}({}). "
520                         + "Will try to reconnect.", getConnectionType(),
521                         connection);
522 
523                 // Need to use the reconnect logic to find the new primary.
524                 final ConnectionInfo<K> newConn = reconnectMain();
525                 if (newConn != null) {
526                     removeCachedConnection(server, connection);
527                     updateMain(newConn);
528                 }
529                 // Else could not find a primary. Likely in a bad state but let
530                 // the connection stay for secondary queries if we have another
531                 // connection.
532                 else if (myConnections.size() == 1) {
533                     // Mark this a graceful shutdown.
534                     removeCachedConnection(server, connection);
535                     shutdown(false);
536 
537                     myEventSupport.firePropertyChange(
538                             Connection.OPEN_PROP_NAME, true, isOpen());
539                 }
540             }
541             // Just remove the connection (above).
542             else {
543                 LOG.debug("MongoDB Connection closed: {}({}).",
544                         getConnectionType(), connection);
545             }
546         }
547         finally {
548             // Make sure we always remove the closed connection.
549             removeCachedConnection(server, connection);
550             connection.raiseErrors(new ConnectionLostException(
551                     "Connection closed."));
552         }
553     }
554 
555     /**
556      * Creates a connection back to the main server for this connection.
557      * 
558      * @return The information for the new connection. A {@code null} result is handled by {@code handleConnectionClosed} as "no main server found".
559      */
560     protected abstract ConnectionInfo<K> reconnectMain();
561 
562     /**
563      * Remove the connection from the cache.
564      * 
565      * @param key
566      *            The key to remove the connection for.
567      * @param connection
568      *            The connection to remove (if known).
569      */
570     protected void removeCachedConnection(final Object key,
571             final Connection connection) {
572         Connection conn = connection;
573         if (connection == null) {
574             conn = myConnections.remove(key);
575         }
576         else if (!myConnections.remove(key, connection)) {
577             // Different connection found.
578             conn = null;
579         }
580 
581         if (conn != null) {
582             conn.removePropertyChangeListener(myListener);
583             conn.shutdown(true);
584         }
585     }
586 
587     /**
588      * Tries to send the messages to the first server with either an open
589      * connection or that we can open a connection to.
590      * 
591      * @param servers
592      *            The servers the messages can be sent to.
593      * @param message1
594      *            The first message to send.
595      * @param message2
596      *            The second message to send. May be <code>null</code>.
597      * @param reply
598      *            The callback for the replies.
599      * @return The true if the message was sent.
600      */
601     protected boolean trySend(final List<K> servers, final Message message1,
602             final Message message2, final ReplyCallback reply) {
603         for (final K server : servers) {
604 
605             Connection conn = myConnections.get(server);
606 
607             // See if we need to create a connection.
608             if (conn == null) {
609                 // Create one.
610                 conn = connect(server);
611             }
612             else if (!conn.isAvailable()) {
613 
614                 removeCachedConnection(server, conn);
615 
616                 final ReconnectStrategy strategy = myFactory
617                         .getReconnectStrategy();
618                 conn = strategy.reconnect(conn);
619                 if (conn != null) {
620                     conn = cacheConnection(server, conn);
621                 }
622             }
623 
624             if (conn != null) {
625                 doSend(conn, message1, message2, reply);
626                 return true;
627             }
628         }
629 
630         return false;
631     }
632 
633     /**
634      * Update the state with the new primary server.
635      * 
636      * @param newConn
637      *            The new primary server.
638      */
639     protected void updateMain(final ConnectionInfo<K> newConn) {
640         myMainKey = newConn.getConnectionKey();
641 
642         // Add the connection to the cache. This also gets the listener
643         // attached.
644         cacheConnection(newConn.getConnectionKey(), newConn.getConnection());
645     }
646 
647     /**
648      * Finds the server for the connection.
649      * 
650      * @param connection
651      *            The connection to remove.
652      * @return The K for the connection.
653      */
654     private K findKeyForConnection(final Connection connection) {
655         for (final Map.Entry<K, Connection> entry : myConnections.entrySet()) {
656             if (entry.getValue() == connection) {
657                 return entry.getKey();
658             }
659         }
660         return null;
661     }
662 
663     /**
664      * ClusterListener provides a listener for changes in the cluster.
665      * 
666      * @api.no This class is <b>NOT</b> part of the drivers API. This class may
667      *         be mutated in incompatible ways between any two releases of the
668      *         driver.
669      * @copyright 2013, Allanbank Consulting, Inc., All Rights Reserved
670      */
671     protected final class ClusterAndConnectionListener implements
672             PropertyChangeListener {
673         @Override
674         public void propertyChange(final PropertyChangeEvent event) {
675             final String propName = event.getPropertyName();
676             if (Cluster.SERVER_PROP.equals(propName)
677                     && (event.getNewValue() == null)) {
678                 // A K has been removed. Close the connection.
679                 removeCachedConnection(event.getOldValue(), null);
680             }
681             else if (Connection.OPEN_PROP_NAME.equals(event.getPropertyName())
682                     && Boolean.FALSE.equals(event.getNewValue())) {
683                 handleConnectionClosed((Connection) event.getSource());
684             }
685         }
686 
687     }
688 }