View Javadoc
1   /*
2    * #%L
3    * ShardedConnection.java - mongodb-async-driver - Allanbank Consulting, Inc.
4    * %%
5    * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6    * %%
7    * Licensed under the Apache License, Version 2.0 (the "License");
8    * you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   * 
11   *      http://www.apache.org/licenses/LICENSE-2.0
12   * 
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   * #L%
19   */
20  
21  package com.allanbank.mongodb.client.connection.sharded;
22  
23  import java.io.IOException;
24  import java.util.Collections;
25  import java.util.List;
26  
27  import com.allanbank.mongodb.MongoClientConfiguration;
28  import com.allanbank.mongodb.MongoDbException;
29  import com.allanbank.mongodb.ReadPreference;
30  import com.allanbank.mongodb.client.Message;
31  import com.allanbank.mongodb.client.connection.Connection;
32  import com.allanbank.mongodb.client.connection.proxy.AbstractProxyMultipleConnection;
33  import com.allanbank.mongodb.client.connection.proxy.ConnectionInfo;
34  import com.allanbank.mongodb.client.connection.proxy.ProxiedConnectionFactory;
35  import com.allanbank.mongodb.client.state.Cluster;
36  import com.allanbank.mongodb.client.state.Server;
37  import com.allanbank.mongodb.client.state.ServerSelector;
38  import com.allanbank.mongodb.util.log.Log;
39  import com.allanbank.mongodb.util.log.LogFactory;
40  
41  /**
42   * Provides a {@link Connection} implementation for connecting to a sharded
43   * environment via mongos servers.
44   * 
45   * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
46   *         mutated in incompatible ways between any two releases of the driver.
47   * @copyright 2011-2013, Allanbank Consulting, Inc., All Rights Reserved
48   */
49  public class ShardedConnection extends AbstractProxyMultipleConnection<Server> {
50  
51      /** The logger for the {@link ShardedConnection}. */
52      private static final Log LOG = LogFactory.getLog(ShardedConnection.class);
53  
54      /** The selector for the server when we need to reconnect. */
55      private final ServerSelector mySelector;
56  
57      /**
58       * Creates a new {@link ShardedConnection}.
59       * 
60       * @param proxiedConnection
61       *            The connection being proxied.
62       * @param server
63       *            The primary server this connection is connected to.
64       * @param cluster
65       *            The state of the cluster for finding secondary connections.
66       * @param selector
67       *            The selector for servers when we need to reconnect.
68       * @param factory
69       *            The connection factory for opening secondary connections.
70       * @param config
71       *            The MongoDB client configuration.
72       */
73      public ShardedConnection(final Connection proxiedConnection,
74              final Server server, final Cluster cluster,
75              final ServerSelector selector,
76              final ProxiedConnectionFactory factory,
77              final MongoClientConfiguration config) {
78          super(proxiedConnection, server, cluster, factory, config);
79  
80          mySelector = selector;
81      }
82  
83      /**
84       * {@inheritDoc}
85       * <p>
86       * Overridden to return the canonical name of the primary.
87       * </p>
88       */
89      @Override
90      public String getServerName() {
91          if (myMainKey != null) {
92              return myMainKey.getCanonicalName();
93          }
94          return "UNKNOWN";
95      }
96  
97      /**
98       * {@inheritDoc}
99       * <p>
100      * Overridden to create a connection to the server.
101      * </p>
102      */
103     @Override
104     protected Connection connect(final Server server) {
105         Connection conn = null;
106         try {
107             conn = myFactory.connect(server, myConfig);
108 
109             conn = cacheConnection(server, conn);
110         }
111         catch (final IOException e) {
112             LOG.info(e, "Could not connect to the server '{}': {}",
113                     server.getCanonicalName(), e.getMessage());
114         }
115         return conn;
116     }
117 
118     /**
119      * {@inheritDoc}
120      * <p>
121      * Overridden for testing access.
122      * </p>
123      */
124     @Override
125     protected Connection connection(final Server server) {
126         LOG.debug("Lookup connection for server: {}", server.getCanonicalName());
127         return super.connection(server);
128     }
129 
130     /**
131      * Locates the set of servers that can be used to send the specified
132      * messages. This method will attempt to connect to the primary server if
133      * there is not a current connection to the primary.
134      * 
135      * @param message1
136      *            The first message to send.
137      * @param message2
138      *            The second message to send. May be <code>null</code>.
139      * @return The servers that can be used.
140      * @throws MongoDbException
141      *             On a failure to locate a server that all messages can be sent
142      *             to.
143      */
144     @Override
145     protected List<Server> findPotentialKeys(final Message message1,
146             final Message message2) throws MongoDbException {
147         List<Server> servers = resolveServerReadPreference(message1, message2);
148 
149         if (servers.isEmpty()) {
150             // If we get here and a reconnect is in progress then
151             // block for the reconnect. Once the reconnect is complete, try
152             // again.
153             if (myMainKey == null) {
154                 // Wait for a reconnect.
155                 final ConnectionInfo<Server> newConnInfo = reconnectMain();
156                 if (newConnInfo != null) {
157                     updateMain(newConnInfo);
158                     servers = resolveServerReadPreference(message1, message2);
159                 }
160             }
161 
162             if (servers.isEmpty()) {
163                 throw createReconnectFailure(message1, message2);
164             }
165         }
166 
167         return servers;
168     }
169 
170     /**
171      * {@inheritDoc}
172      * <p>
173      * Overridden to return the string {@code Sharded}.
174      * </p>
175      */
176     @Override
177     protected String getConnectionType() {
178         return "Sharded";
179     }
180 
181     /**
182      * {@inheritDoc}
183      * <p>
184      * Overridden creates a connection back to the primary server.
185      * </p>
186      */
187     @Override
188     protected ConnectionInfo<Server> reconnectMain() {
189         for (final Server server : mySelector.pickServers()) {
190             try {
191                 final Connection conn = myFactory.connect(server, myConfig);
192 
193                 return new ConnectionInfo<Server>(conn, server);
194             }
195             catch (final IOException ioe) {
196                 // Ignored. Will return null.
197                 LOG.debug(ioe, "Could not connect to '{}': {}",
198                         server.getCanonicalName(), ioe.getMessage());
199             }
200         }
201         return null;
202     }
203 
204     /**
205      * Locates the set of servers that can be used to send the specified
206      * messages.
207      * 
208      * @param message1
209      *            The first message to send.
210      * @param message2
211      *            The second message to send. May be <code>null</code>.
212      * @return The servers that can be used.
213      */
214     private List<Server> resolveServerReadPreference(final Message message1,
215             final Message message2) {
216 
217         List<Server> servers = Collections.emptyList();
218 
219         final Server main = myMainKey;
220         if (main != null) {
221             servers = Collections.singletonList(main);
222         }
223 
224         if (message1 != null) {
225             ReadPreference pref = message1.getReadPreference();
226             if (pref.getServer() != null) {
227                 servers = Collections.singletonList(myCluster.get(pref
228                         .getServer()));
229             }
230             else if (message2 != null) {
231                 pref = message2.getReadPreference();
232                 if (pref.getServer() != null) {
233                     servers = Collections.singletonList(myCluster.get(pref
234                             .getServer()));
235                 }
236             }
237         }
238         return servers;
239     }
240 }