View Javadoc
1   /*
2    * #%L
3    * SocketConnectionFactory.java - mongodb-async-driver - Allanbank Consulting, Inc.
4    * %%
5    * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6    * %%
7    * Licensed under the Apache License, Version 2.0 (the "License");
8    * you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   * 
11   *      http://www.apache.org/licenses/LICENSE-2.0
12   * 
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   * #L%
19   */
20  package com.allanbank.mongodb.client.connection.socket;
21  
22  import java.beans.PropertyChangeEvent;
23  import java.beans.PropertyChangeListener;
24  import java.io.IOException;
25  import java.lang.ref.Reference;
26  import java.net.InetSocketAddress;
27  import java.util.ArrayList;
28  import java.util.Collections;
29  import java.util.List;
30  import java.util.concurrent.ExecutionException;
31  
32  import com.allanbank.mongodb.MongoClientConfiguration;
33  import com.allanbank.mongodb.Version;
34  import com.allanbank.mongodb.bson.io.BufferingBsonOutputStream;
35  import com.allanbank.mongodb.bson.io.StringDecoderCache;
36  import com.allanbank.mongodb.bson.io.StringEncoderCache;
37  import com.allanbank.mongodb.client.ClusterStats;
38  import com.allanbank.mongodb.client.ClusterType;
39  import com.allanbank.mongodb.client.connection.Connection;
40  import com.allanbank.mongodb.client.connection.ConnectionFactory;
41  import com.allanbank.mongodb.client.connection.ReconnectStrategy;
42  import com.allanbank.mongodb.client.connection.proxy.ProxiedConnectionFactory;
43  import com.allanbank.mongodb.client.message.IsMaster;
44  import com.allanbank.mongodb.client.state.Cluster;
45  import com.allanbank.mongodb.client.state.LatencyServerSelector;
46  import com.allanbank.mongodb.client.state.Server;
47  import com.allanbank.mongodb.client.state.ServerSelector;
48  import com.allanbank.mongodb.client.state.ServerUpdateCallback;
49  import com.allanbank.mongodb.client.state.SimpleReconnectStrategy;
50  import com.allanbank.mongodb.util.log.Log;
51  import com.allanbank.mongodb.util.log.LogFactory;
52  
53  /**
54   * {@link ConnectionFactory} to create direct socket connections to the servers.
55   * 
56   * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
57   *         mutated in incompatible ways between any two releases of the driver.
58   * @copyright 2011-2014, Allanbank Consulting, Inc., All Rights Reserved
59   */
60  public class SocketConnectionFactory implements ProxiedConnectionFactory {
61  
62      /** The logger for the factory. */
63      private static final Log LOG = LogFactory
64              .getLog(SocketConnectionFactory.class);
65  
66      /**
67       * The buffers used by the single threaded connections. Each buffer is
68       * shared by all connections but there can be up to 1 buffer per application
69       * thread. We use a reference to the buffer to allow the garbage collector
70       * To clean up the stream.
71       */
72      private ThreadLocal<Reference<BufferingBsonOutputStream>> myBuffers;
73  
74      /** The state of the cluster. */
75      private final Cluster myCluster;
76  
77      /** The MongoDB client configuration. */
78      private final MongoClientConfiguration myConfig;
79  
80      /** The MongoDB client configuration. */
81      private final ConfigurationListener myConfigListener;
82  
83      /** Cache used for decoding strings. */
84      private final StringDecoderCache myDecoderCache;
85  
86      /** Cache used for encoding strings. */
87      private final StringEncoderCache myEncoderCache;
88  
89      /** The server selector. */
90      private final ServerSelector myServerSelector;
91  
92      /**
93       * Creates a new {@link SocketConnectionFactory}.
94       * 
95       * @param config
96       *            The MongoDB client configuration.
97       */
98      public SocketConnectionFactory(final MongoClientConfiguration config) {
99          super();
100         myConfig = config;
101         myCluster = new Cluster(config, ClusterType.STAND_ALONE);
102         myServerSelector = new LatencyServerSelector(myCluster, true);
103         myBuffers = new ThreadLocal<Reference<BufferingBsonOutputStream>>();
104 
105         myConfigListener = new ConfigurationListener();
106         myConfig.addPropertyChangeListener(myConfigListener);
107 
108         myDecoderCache = new StringDecoderCache();
109         myDecoderCache.setMaxCacheEntries(config.getMaxCachedStringEntries());
110         myDecoderCache.setMaxCacheLength(config.getMaxCachedStringLength());
111 
112         myEncoderCache = new StringEncoderCache();
113         myEncoderCache.setMaxCacheEntries(config.getMaxCachedStringEntries());
114         myEncoderCache.setMaxCacheLength(config.getMaxCachedStringLength());
115     }
116 
117     /**
118      * {@inheritDoc}
119      * <p>
120      * Overridden to do nothing.
121      * </p>
122      */
123     @Override
124     public void close() {
125         myBuffers = null; // Let the ThreadLocal's weak reference go.
126         myConfig.removePropertyChangeListener(myConfigListener);
127 
128         // Release the cached entries too.
129         myDecoderCache.setMaxCacheEntries(0);
130         myDecoderCache.setMaxCacheLength(0);
131     }
132 
133     /**
134      * {@inheritDoc}
135      * <p>
136      * Returns a new {@link SocketConnection}.
137      * </p>
138      * 
139      * @see ConnectionFactory#connect()
140      */
141     @Override
142     public Connection connect() throws IOException {
143         final List<InetSocketAddress> servers = new ArrayList<InetSocketAddress>(
144                 myConfig.getServerAddresses());
145 
146         // Shuffle the servers and try to connect to each until one works.
147         IOException last = null;
148         Collections.shuffle(servers);
149         for (final InetSocketAddress address : servers) {
150             try {
151                 final Server server = myCluster.add(address);
152                 final Connection conn = connect(server, myConfig);
153 
154                 // Get the state of the server updated.
155                 final ServerUpdateCallback cb = new ServerUpdateCallback(server);
156                 conn.send(new IsMaster(), cb);
157 
158                 if (Version.UNKNOWN.equals(server.getVersion())) {
159                     // If we don't know the version then wait for that response.
160                     try {
161                         cb.get();
162                     }
163                     catch (final ExecutionException e) {
164                         // Probably not in a good state...
165                         LOG.debug(e, "Could not execute an 'ismaster' command.");
166                     }
167                     catch (final InterruptedException e) {
168                         // Probably not in a good state...
169                         LOG.debug(e, "Could not execute an 'ismaster' command.");
170                     }
171                 }
172 
173                 return conn;
174             }
175             catch (final IOException error) {
176                 last = error;
177             }
178         }
179 
180         if (last != null) {
181             throw last;
182         }
183         throw new IOException("Could not connect to any server: " + servers);
184     }
185 
186     /**
187      * Creates a connection to the address provided.
188      * 
189      * @param server
190      *            The MongoDB server to connect to.
191      * @param config
192      *            The configuration for the Connection to the MongoDB server.
193      * @return The Connection to MongoDB.
194      * @throws IOException
195      *             On a failure connecting to the server.
196      */
197     @Override
198     public Connection connect(final Server server,
199             final MongoClientConfiguration config) throws IOException {
200 
201         final AbstractSocketConnection connection;
202 
203         switch (myConfig.getConnectionModel()) {
204         case SENDER_RECEIVER_THREAD: {
205             connection = new TwoThreadSocketConnection(server, myConfig,
206                     myEncoderCache, myDecoderCache);
207             break;
208         }
209         default: { // and RECEIVER_THREAD
210             connection = new SocketConnection(server, myConfig, myEncoderCache,
211                     myDecoderCache, myBuffers);
212             break;
213         }
214         }
215 
216         // Start the connection.
217         connection.start();
218 
219         return connection;
220     }
221 
222     /**
223      * Returns the cluster state.
224      * 
225      * @return The cluster state.
226      */
227     public Cluster getCluster() {
228         return myCluster;
229     }
230 
231     /**
232      * {@inheritDoc}
233      * <p>
234      * Overridden to return the {@link Cluster}.
235      * </p>
236      */
237     @Override
238     public ClusterStats getClusterStats() {
239         return myCluster;
240     }
241 
242     /**
243      * {@inheritDoc}
244      * <p>
245      * Overridden to return {@link ClusterType#STAND_ALONE} cluster type.
246      * </p>
247      */
248     @Override
249     public ClusterType getClusterType() {
250         return ClusterType.STAND_ALONE;
251     }
252 
253     /**
254      * {@inheritDoc}
255      * <p>
256      * Overridden to return a {@link SimpleReconnectStrategy}.
257      * </p>
258      */
259     @Override
260     public ReconnectStrategy getReconnectStrategy() {
261         final SimpleReconnectStrategy strategy = new SimpleReconnectStrategy();
262         strategy.setConfig(myConfig);
263         strategy.setConnectionFactory(this);
264         strategy.setSelector(myServerSelector);
265         strategy.setState(myCluster);
266 
267         return strategy;
268     }
269 
270     /**
271      * Notification of a change to the configuration of the client.
272      * 
273      * @param evt
274      *            The details of the configuration change.
275      */
276     protected void configurationChanged(final PropertyChangeEvent evt) {
277         final String name = evt.getPropertyName();
278         final Object value = evt.getNewValue();
279         if ("maxCachedStringEntries".equals(name) && (value instanceof Number)) {
280             myDecoderCache.setMaxCacheEntries(((Number) value).intValue());
281             myEncoderCache.setMaxCacheEntries(((Number) value).intValue());
282         }
283         else if ("maxCachedStringLength".equals(name)
284                 && (value instanceof Number)) {
285             myDecoderCache.setMaxCacheLength(((Number) value).intValue());
286             myEncoderCache.setMaxCacheLength(((Number) value).intValue());
287         }
288     }
289 
290     /**
291      * ConfigurationListener provides a listener for changes in the client's
292      * configuration.
293      * 
294      * @api.no This class is <b>NOT</b> part of the drivers API. This class may
295      *         be mutated in incompatible ways between any two releases of the
296      *         driver.
297      * @copyright 2014, Allanbank Consulting, Inc., All Rights Reserved
298      */
299     protected final class ConfigurationListener implements
300             PropertyChangeListener {
301         @Override
302         public void propertyChange(final PropertyChangeEvent evt) {
303             configurationChanged(evt);
304         }
305     }
306 }