View Javadoc
1   /*
2    * #%L
3    * ReplicaSetConnection.java - mongodb-async-driver - Allanbank Consulting, Inc.
4    * %%
5    * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6    * %%
7    * Licensed under the Apache License, Version 2.0 (the "License");
8    * you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   * 
11   *      http://www.apache.org/licenses/LICENSE-2.0
12   * 
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   * #L%
19   */
20  
21  package com.allanbank.mongodb.client.connection.rs;
22  
23  import java.io.IOException;
24  import java.util.List;
25  
26  import com.allanbank.mongodb.MongoClientConfiguration;
27  import com.allanbank.mongodb.MongoDbException;
28  import com.allanbank.mongodb.client.Message;
29  import com.allanbank.mongodb.client.connection.Connection;
30  import com.allanbank.mongodb.client.connection.proxy.AbstractProxyMultipleConnection;
31  import com.allanbank.mongodb.client.connection.proxy.ConnectionInfo;
32  import com.allanbank.mongodb.client.connection.proxy.ProxiedConnectionFactory;
33  import com.allanbank.mongodb.client.state.Cluster;
34  import com.allanbank.mongodb.client.state.Server;
35  import com.allanbank.mongodb.util.log.Log;
36  import com.allanbank.mongodb.util.log.LogFactory;
37  
38  /**
39   * Provides a {@link Connection} implementation for connecting to a replica-set
40   * environment.
41   * 
42   * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
43   *         mutated in incompatible ways between any two releases of the driver.
44   * @copyright 2011-2013, Allanbank Consulting, Inc., All Rights Reserved
45   */
46  public class ReplicaSetConnection extends
47          AbstractProxyMultipleConnection<Server> {
48  
49      /** The logger for the {@link ReplicaSetConnection}. */
50      private static final Log LOG = LogFactory
51              .getLog(ReplicaSetConnection.class);
52  
53      /** The strategy for reconnecting/finding the primary. */
54      private volatile ReplicaSetReconnectStrategy myReconnectStrategy;
55  
56      /**
57       * Creates a new {@link ReplicaSetConnection}.
58       * 
59       * @param proxiedConnection
60       *            The connection being proxied.
61       * @param server
62       *            The primary server this connection is connected to.
63       * @param cluster
64       *            The state of the cluster for finding secondary connections.
65       * @param factory
66       *            The connection factory for opening secondary connections.
67       * @param config
68       *            The MongoDB client configuration.
69       * @param strategy
70       *            The strategy for reconnecting/finding the primary.
71       */
72      public ReplicaSetConnection(final Connection proxiedConnection,
73              final Server server, final Cluster cluster,
74              final ProxiedConnectionFactory factory,
75              final MongoClientConfiguration config,
76              final ReplicaSetReconnectStrategy strategy) {
77          super(proxiedConnection, server, cluster, factory, config);
78  
79          myReconnectStrategy = strategy;
80      }
81  
82      /**
83       * {@inheritDoc}
84       * <p>
85       * Overridden to return the canonical name of the primary.
86       * </p>
87       */
88      @Override
89      public String getServerName() {
90          if (myMainKey != null) {
91              return myMainKey.getCanonicalName();
92          }
93          return "UNKNOWN";
94      }
95  
96      /**
97       * {@inheritDoc}
98       * <p>
99       * Overridden to create a connection to the server.
100      * </p>
101      */
102     @Override
103     protected Connection connect(final Server server) {
104         Connection conn = null;
105         try {
106             conn = myFactory.connect(server, myConfig);
107 
108             conn = cacheConnection(server, conn);
109         }
110         catch (final IOException e) {
111             LOG.info("Could not connect to the server '"
112                     + server.getCanonicalName() + "': " + e.getMessage());
113         }
114         return conn;
115     }
116 
117     /**
118      * Locates the set of servers that can be used to send the specified
119      * messages. This method will attempt to connect to the primary server if
120      * there is not a current connection to the primary.
121      * 
122      * @param message1
123      *            The first message to send.
124      * @param message2
125      *            The second message to send. May be <code>null</code>.
126      * @return The servers that can be used.
127      * @throws MongoDbException
128      *             On a failure to locate a server that all messages can be sent
129      *             to.
130      */
131     @Override
132     protected List<Server> findPotentialKeys(final Message message1,
133             final Message message2) throws MongoDbException {
134         List<Server> servers = myCluster.findServers(message1, message2);
135 
136         if (servers.isEmpty()) {
137             // If we get here and a reconnect is in progress then
138             // block for the reconnect. The primary will have been marked a
139             // secondary... Once the reconnect is complete, try again.
140             if (myMainKey == null) {
141                 // Wait for a reconnect.
142                 final ConnectionInfo<Server> newConnInfo = reconnectMain();
143                 if (newConnInfo != null) {
144                     updateMain(newConnInfo);
145                     servers = myCluster.findServers(message1, message2);
146                 }
147             }
148 
149             if (servers.isEmpty()) {
150                 throw createReconnectFailure(message1, message2);
151             }
152         }
153 
154         return servers;
155     }
156 
157     /**
158      * {@inheritDoc}
159      * <p>
160      * Overridden to return the string {@code ReplicaSet}.
161      * </p>
162      */
163     @Override
164     protected String getConnectionType() {
165         return "ReplicaSet";
166     }
167 
168     /**
169      * {@inheritDoc}
170      * <p>
171      * Overridden creates a connection back to the primary server.
172      * </p>
173      */
174     @Override
175     protected ConnectionInfo<Server> reconnectMain() {
176         return myReconnectStrategy.reconnectPrimary();
177     }
178 
179 }