1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package com.allanbank.mongodb.client.connection.bootstrap;
21
22 import java.io.IOException;
23 import java.net.InetSocketAddress;
24 import java.util.List;
25 import java.util.concurrent.ExecutionException;
26 import java.util.logging.Level;
27
28 import com.allanbank.mongodb.MongoClientConfiguration;
29 import com.allanbank.mongodb.bson.Document;
30 import com.allanbank.mongodb.bson.Element;
31 import com.allanbank.mongodb.bson.element.StringElement;
32 import com.allanbank.mongodb.client.ClusterStats;
33 import com.allanbank.mongodb.client.ClusterType;
34 import com.allanbank.mongodb.client.callback.FutureReplyCallback;
35 import com.allanbank.mongodb.client.connection.Connection;
36 import com.allanbank.mongodb.client.connection.ConnectionFactory;
37 import com.allanbank.mongodb.client.connection.ReconnectStrategy;
38 import com.allanbank.mongodb.client.connection.auth.AuthenticationConnectionFactory;
39 import com.allanbank.mongodb.client.connection.proxy.ProxiedConnectionFactory;
40 import com.allanbank.mongodb.client.connection.rs.ReplicaSetConnectionFactory;
41 import com.allanbank.mongodb.client.connection.sharded.ShardedConnectionFactory;
42 import com.allanbank.mongodb.client.connection.socket.SocketConnectionFactory;
43 import com.allanbank.mongodb.client.message.IsMaster;
44 import com.allanbank.mongodb.client.message.Reply;
45 import com.allanbank.mongodb.client.state.Cluster;
46 import com.allanbank.mongodb.error.CannotConnectException;
47 import com.allanbank.mongodb.util.IOUtils;
48 import com.allanbank.mongodb.util.log.Log;
49 import com.allanbank.mongodb.util.log.LogFactory;
50
51
52
53
54
55
56
57
58
59
60 public class BootstrapConnectionFactory implements ConnectionFactory {
61
62
63 protected static final Log LOG = LogFactory
64 .getLog(BootstrapConnectionFactory.class);
65
66
67 private final MongoClientConfiguration myConfig;
68
69
70 private ConnectionFactory myDelegate = null;
71
72
73
74
75
76
77
78
79 public BootstrapConnectionFactory(final MongoClientConfiguration config) {
80 myConfig = config;
81 }
82
83
84
85
86
87
88
89 @Override
90 public void close() {
91 IOUtils.close(myDelegate);
92 }
93
94
95
96
97
98
99
100 @Override
101 public Connection connect() throws IOException {
102 return getDelegate().connect();
103 }
104
105
106
107
108
109
110
111
112 @Override
113 public ClusterStats getClusterStats() {
114 return getDelegate().getClusterStats();
115 }
116
117
118
119
120
121
122
123
124 @Override
125 public ClusterType getClusterType() {
126 return getDelegate().getClusterType();
127 }
128
129
130
131
132
133
134
135 @Override
136 public ReconnectStrategy getReconnectStrategy() {
137 return getDelegate().getReconnectStrategy();
138 }
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159 protected void bootstrap() {
160 final SocketConnectionFactory socketFactory = new SocketConnectionFactory(
161 myConfig);
162 ProxiedConnectionFactory factory = socketFactory;
163
164
165
166 if (myConfig.isAuthenticating()) {
167 factory = new AuthenticationConnectionFactory(factory, myConfig);
168 }
169
170 try {
171
172 final Cluster cluster = socketFactory.getCluster();
173 for (final InetSocketAddress addr : myConfig.getServerAddresses()) {
174 Connection conn = null;
175 final FutureReplyCallback future = new FutureReplyCallback();
176 try {
177 conn = factory.connect(cluster.add(addr), myConfig);
178
179 conn.send(new IsMaster(), future);
180 final Reply reply = future.get();
181
182
183 IOUtils.close(conn);
184
185 final List<Document> results = reply.getResults();
186 if (!results.isEmpty()) {
187 final Document doc = results.get(0);
188
189 if (isMongos(doc)) {
190 LOG.debug("Sharded bootstrap to {}.", addr);
191 cluster.clear();
192 myDelegate = bootstrapSharded(factory);
193 }
194 else if (isReplicationSet(doc)) {
195 LOG.debug("Replica-set bootstrap to {}.", addr);
196 cluster.clear();
197 myDelegate = bootstrapReplicaSet(factory);
198 }
199 else {
200 LOG.debug("Simple MongoDB bootstrap to {}.", addr);
201 myDelegate = factory;
202 }
203 factory = null;
204 return;
205 }
206 }
207 catch (final IOException ioe) {
208 LOG.warn(ioe, "I/O error during bootstrap to {}.", addr);
209 }
210 catch (final InterruptedException e) {
211 LOG.warn(e, "Interrupted during bootstrap to {}.", addr);
212 }
213 catch (final ExecutionException e) {
214 LOG.warn(e, "Error during bootstrap to {}.", addr);
215 }
216 finally {
217 IOUtils.close(conn, Level.WARNING,
218 "I/O error shutting down bootstrap connection to "
219 + addr + ".");
220 }
221 }
222 }
223 finally {
224 IOUtils.close(factory);
225 }
226 }
227
228
229
230
231
232
233
234
235 protected ConnectionFactory bootstrapReplicaSet(
236 final ProxiedConnectionFactory factory) {
237 return new ReplicaSetConnectionFactory(factory, getConfig());
238 }
239
240
241
242
243
244
245
246
247 protected ConnectionFactory bootstrapSharded(
248 final ProxiedConnectionFactory factory) {
249 return new ShardedConnectionFactory(factory, getConfig());
250 }
251
252
253
254
255
256
257 protected MongoClientConfiguration getConfig() {
258 return myConfig;
259 }
260
261
262
263
264
265
266 protected ConnectionFactory getDelegate() {
267 if (myDelegate == null) {
268 return createDelegate();
269 }
270 return myDelegate;
271 }
272
273
274
275
276
277
278
279 protected void setDelegate(final ConnectionFactory delegate) {
280 myDelegate = delegate;
281 }
282
283
284
285
286
287
288 private synchronized ConnectionFactory createDelegate() {
289 if (myDelegate == null) {
290 bootstrap();
291 if (myDelegate == null) {
292 LOG.warn("Could not bootstrap a connection to the MongoDB servers.");
293 throw new CannotConnectException(
294 "Could not bootstrap a connection to the MongoDB servers.");
295 }
296 }
297 return myDelegate;
298 }
299
300
301
302
303
304
305
306
307
308
309 private boolean isMongos(final Document doc) {
310
311 final Element processName = doc.get("msg");
312 if (processName instanceof StringElement) {
313 return "isdbgrid".equals(((StringElement) processName).getValue());
314 }
315
316 return false;
317 }
318
319
320
321
322
323
324
325
326
327
328 private boolean isReplicationSet(final Document doc) {
329 return (doc.get("setName") instanceof StringElement);
330 }
331 }