View Javadoc
1   /*
2    * #%L
3    * ClusterPinger.java - mongodb-async-driver - Allanbank Consulting, Inc.
4    * %%
5    * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6    * %%
7    * Licensed under the Apache License, Version 2.0 (the "License");
8    * you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   * 
11   *      http://www.apache.org/licenses/LICENSE-2.0
12   * 
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   * #L%
19   */
20  package com.allanbank.mongodb.client.state;
21  
22  import java.io.Closeable;
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Collections;
26  import java.util.HashMap;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.concurrent.CopyOnWriteArrayList;
31  import java.util.concurrent.ExecutionException;
32  import java.util.concurrent.Future;
33  import java.util.concurrent.TimeUnit;
34  import java.util.concurrent.TimeoutException;
35  
36  import com.allanbank.mongodb.MongoClientConfiguration;
37  import com.allanbank.mongodb.MongoDbException;
38  import com.allanbank.mongodb.client.ClusterType;
39  import com.allanbank.mongodb.client.connection.Connection;
40  import com.allanbank.mongodb.client.connection.proxy.ProxiedConnectionFactory;
41  import com.allanbank.mongodb.client.message.IsMaster;
42  import com.allanbank.mongodb.client.message.ReplicaSetStatus;
43  import com.allanbank.mongodb.client.message.Reply;
44  import com.allanbank.mongodb.util.IOUtils;
45  import com.allanbank.mongodb.util.log.Log;
46  import com.allanbank.mongodb.util.log.LogFactory;
47  
48  /**
49   * ClusterPinger pings each of the connections in the cluster and updates the
50   * latency of the server from this client.
51   * 
52   * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
53   *         mutated in incompatible ways between any two releases of the driver.
54   * @copyright 2012-2014, Allanbank Consulting, Inc., All Rights Reserved
55   */
56  public class ClusterPinger implements Runnable, Closeable {
57  
58      /** The default interval between ping sweeps in seconds. */
59      public static final int DEFAULT_PING_INTERVAL_SECONDS = 600;
60  
61      /** The logger for the {@link ClusterPinger}. */
62      protected static final Log LOG = LogFactory.getLog(ClusterPinger.class);
63  
64      /** Instance of the inner class containing the ping logic. */
65      private static final Pinger PINGER = new Pinger();
66  
67      /**
68       * Pings the server and suppresses all exceptions.
69       * 
70       * @param server
71       *            The address of the server. Used for logging.
72       * @param conn
73       *            The connection to ping.
74       * @return True if the ping worked, false otherwise.
75       */
76      public static boolean ping(final Server server, final Connection conn) {
77          return PINGER.ping(server, conn);
78      }
79  
80      /** The state of the clusters. */
81      private final List<Cluster> myClusters;
82  
83      /** The configuration for the connections. */
84      private final MongoClientConfiguration myConfig;
85  
86      /** The factory for creating connections to the servers. */
87      private final ProxiedConnectionFactory myConnectionFactory;
88  
89      /** The units for the ping sweep intervals. */
90      private volatile TimeUnit myIntervalUnits;
91  
92      /** The interval for a ping sweep across all of the servers. */
93      private volatile int myPingSweepInterval;
94  
95      /** The thread that is pinging the servers for latency. */
96      private final Thread myPingThread;
97  
98      /** The flag to stop the ping thread. */
99      private volatile boolean myRunning;
100 
101     /**
102      * Creates a new ClusterPinger.
103      * 
104      * @param cluster
105      *            The state of the cluster.
106      * @param factory
107      *            The factory for creating connections to the servers.
108      * @param config
109      *            The configuration for the connections.
110      */
111     public ClusterPinger(final Cluster cluster,
112             final ProxiedConnectionFactory factory,
113             final MongoClientConfiguration config) {
114         super();
115 
116         myConnectionFactory = factory;
117         myConfig = config;
118         myRunning = true;
119 
120         myClusters = new CopyOnWriteArrayList<Cluster>();
121         myClusters.add(cluster);
122 
123         myIntervalUnits = TimeUnit.SECONDS;
124         myPingSweepInterval = DEFAULT_PING_INTERVAL_SECONDS;
125 
126         myPingThread = myConfig.getThreadFactory().newThread(this);
127         myPingThread.setDaemon(true);
128         myPingThread.setName("MongoDB Pinger");
129         myPingThread.setPriority(Thread.MIN_PRIORITY);
130     }
131 
132     /**
133      * Adds a new cluster to the set of tracked clusters.
134      * 
135      * @param cluster
136      *            A new cluster to the set of tracked clusters.
137      */
138     public void addCluster(final Cluster cluster) {
139         myClusters.add(cluster);
140     }
141 
142     /**
143      * {@inheritDoc}
144      * <p>
145      * Overridden to close the pinger.
146      * </p>
147      */
148     @Override
149     public void close() {
150         myRunning = false;
151         myPingThread.interrupt();
152     }
153 
154     /**
155      * Returns the units for the ping sweep intervals.
156      * 
157      * @return The units for the ping sweep intervals.
158      */
159     public TimeUnit getIntervalUnits() {
160         return myIntervalUnits;
161     }
162 
163     /**
164      * Returns the interval for a ping sweep across all of the servers..
165      * 
166      * @return The interval for a ping sweep across all of the servers..
167      */
168     public int getPingSweepInterval() {
169         return myPingSweepInterval;
170     }
171 
172     /**
173      * Performs a single sweep through the servers sending a ping with a
174      * callback to set the latency and tags for each server.
175      * <p>
176      * This method will not return until at least 50% of the servers have
177      * replied (which may be a failure) to the initial ping.
178      * </p>
179      * 
180      * @param cluster
181      *            The cluster of servers to ping.
182      */
183     public void initialSweep(final Cluster cluster) {
184         final List<Server> servers = cluster.getServers();
185         final List<Future<Reply>> replies = new ArrayList<Future<Reply>>(
186                 servers.size());
187         final List<Connection> connections = new ArrayList<Connection>(
188                 servers.size());
189         try {
190             for (final Server server : servers) {
191                 // Ping the current server.
192                 final String name = server.getCanonicalName();
193                 Connection conn = null;
194                 try {
195                     conn = myConnectionFactory.connect(server, myConfig);
196 
197                     // Use a isMaster request to measure latency. It is
198                     // a best case since it does not require any locks.
199                     final Future<Reply> reply = PINGER.pingAsync(
200                             cluster.getType(), server, conn);
201                     replies.add(reply);
202                 }
203                 catch (final IOException e) {
204                     LOG.info("Could not ping '{}': {}", name, e.getMessage());
205                 }
206                 finally {
207                     if (conn != null) {
208                         connections.add(conn);
209                         conn.shutdown(false);
210                     }
211                 }
212             }
213 
214             long now = System.currentTimeMillis();
215             final long deadline = now
216                     + Math.max(5000, myConfig.getConnectTimeout());
217             while ((now < deadline) && !replies.isEmpty()) {
218                 final Iterator<Future<Reply>> iter = replies.iterator();
219                 while (iter.hasNext() && (now < deadline)) {
220                     Future<Reply> future = iter.next();
221                     try {
222                         if (future != null) {
223                             // Pause...
224                             future.get(deadline - now, TimeUnit.MILLISECONDS);
225                         }
226 
227                         // A good reply or we could not connect to the server.
228                         iter.remove();
229                     }
230                     catch (final ExecutionException e) {
231                         // We got a reply. Its a failure but its a reply.
232                         iter.remove();
233                     }
234                     catch (final TimeoutException e) {
235                         // No reply yet.
236                         future = null;
237                     }
238                     catch (final InterruptedException e) {
239                         // No reply yet.
240                         future = null;
241                     }
242 
243                     now = System.currentTimeMillis();
244                 }
245             }
246         }
247         finally {
248             for (final Connection conn : connections) {
249                 IOUtils.close(conn);
250             }
251         }
252     }
253 
254     /**
255      * {@inheritDoc}
256      * <p>
257      * Overridden to periodically wake-up and ping the servers. At first this
258      * will occur fairly often but eventually degrade to once every 5 minutes.
259      * </p>
260      */
261     @Override
262     public void run() {
263         while (myRunning) {
264             try {
265                 final Map<Server, ClusterType> servers = extractAllServers();
266 
267                 final long interval = getIntervalUnits().toMillis(
268                         getPingSweepInterval());
269                 final long perServerSleep = servers.isEmpty() ? interval
270                         : interval / servers.size();
271 
272                 // Sleep a little before starting. We do it first to give
273                 // tests time to finish without a sweep in the middle
274                 // causing confusion and delay.
275                 Thread.sleep(TimeUnit.MILLISECONDS.toMillis(perServerSleep));
276 
277                 startSweep();
278 
279                 for (final Map.Entry<Server, ClusterType> entry : servers
280                         .entrySet()) {
281                     // Ping the current server.
282                     final Server server = entry.getKey();
283                     final String name = server.getCanonicalName();
284                     Connection conn = null;
285                     try {
286                         myPingThread.setName("MongoDB Pinger - " + name);
287 
288                         conn = myConnectionFactory.connect(server, myConfig);
289 
290                         PINGER.pingAsync(entry.getValue(), server, conn);
291 
292                         // Sleep a little between the servers.
293                         Thread.sleep(TimeUnit.MILLISECONDS
294                                 .toMillis(perServerSleep));
295                     }
296                     catch (final IOException e) {
297                         LOG.info("Could not ping '{}': {}", name,
298                                 e.getMessage());
299                     }
300                     finally {
301                         myPingThread.setName("MongoDB Pinger - Idle");
302                         if (conn != null) {
303                             conn.shutdown(true);
304                         }
305                     }
306 
307                 }
308             }
309             catch (final InterruptedException ok) {
310                 LOG.debug("Pinger interrupted.");
311             }
312         }
313     }
314 
315     /**
316      * Sets the value of units for the ping sweep intervals.
317      * 
318      * @param intervalUnits
319      *            The new value for the units for the ping sweep intervals.
320      */
321     public void setIntervalUnits(final TimeUnit intervalUnits) {
322         myIntervalUnits = intervalUnits;
323     }
324 
325     /**
326      * Sets the interval for a ping sweep across all of the servers..
327      * 
328      * @param pingSweepInterval
329      *            The new value for the interval for a ping sweep across all of
330      *            the servers..
331      */
332     public void setPingSweepInterval(final int pingSweepInterval) {
333         myPingSweepInterval = pingSweepInterval;
334     }
335 
336     /**
337      * Starts the background pinger.
338      */
339     public void start() {
340         myPingThread.start();
341     }
342 
343     /**
344      * Stops the background pinger. Equivalent to {@link #close()}.
345      */
346     public void stop() {
347         close();
348     }
349 
350     /**
351      * Starts the background pinger.
352      */
353     public void wakeUp() {
354         myPingThread.interrupt();
355     }
356 
357     /**
358      * Extension point to notify derived classes that a new sweep is starting.
359      */
360     protected void startSweep() {
361         // Nothing.
362     }
363 
364     /**
365      * Extracts the complete list of servers in all clusters.
366      * 
367      * @return The complete list of servers across all clusters.
368      */
369     private Map<Server, ClusterType> extractAllServers() {
370         final Map<Server, ClusterType> servers = new HashMap<Server, ClusterType>();
371 
372         for (final Cluster cluster : myClusters) {
373             for (final Server server : cluster.getServers()) {
374                 servers.put(server, cluster.getType());
375             }
376         }
377 
378         return Collections.unmodifiableMap(servers);
379     }
380 
381     /**
382      * Pinger provides logic to ping servers.
383      * 
384      * @copyright 2012-2013, Allanbank Consulting, Inc., All Rights Reserved
385      */
386     protected static final class Pinger {
387         /**
388          * Pings the server and suppresses all exceptions. Updates the server
389          * state with a latency and the tags found in the response, if any.
390          * 
391          * @param server
392          *            The server to update with the results of the ping. If
393          *            <code>false</code> is returned then the state will not
394          *            have been updated. Passing <code>null</code> for the state
395          *            is allowed.
396          * @param conn
397          *            The connection to ping.
398          * @return True if the ping worked, false otherwise.
399          */
400         public boolean ping(final Server server, final Connection conn) {
401             try {
402                 final Future<Reply> future = pingAsync(ClusterType.STAND_ALONE,
403                         server, conn);
404 
405                 // Wait for the reply.
406                 if (future != null) {
407                     future.get(1, TimeUnit.MINUTES);
408 
409                     return true;
410                 }
411             }
412             catch (final ExecutionException e) {
413                 LOG.info(e, "Could not ping '{}': {}",
414                         server.getCanonicalName(), e.getMessage());
415             }
416             catch (final TimeoutException e) {
417                 LOG.info(e, "'{}' might be a zombie - not receiving "
418                         + "a response to ping: {}", server.getCanonicalName(),
419                         e.getMessage());
420             }
421             catch (final InterruptedException e) {
422                 LOG.info(e, "Interrupted pinging '{}': {}",
423                         server.getCanonicalName(), e.getMessage());
424             }
425 
426             return false;
427         }
428 
429         /**
430          * Pings the server and suppresses all exceptions. Returns a future that
431          * can be used to determine if a response has been received. The future
432          * will update the {@link Server} latency and tags if found.
433          * 
434          * @param type
435          *            The type of cluster to ping.
436          * @param server
437          *            The server to update with the results of the ping. If
438          *            <code>false</code> is returned then the state will not
439          *            have been updated. Passing <code>null</code> for the state
440          *            is allowed.
441          * @param conn
442          *            The connection to ping.
443          * @return A {@link Future} that will be updated once the reply is
444          *         received.
445          */
446         public Future<Reply> pingAsync(final ClusterType type,
447                 final Server server, final Connection conn) {
448             try {
449                 final ServerUpdateCallback future = new ServerUpdateCallback(
450                         server);
451 
452                 conn.send(new IsMaster(), future);
453                 if (type == ClusterType.REPLICA_SET) {
454                     conn.send(new ReplicaSetStatus(), new ServerUpdateCallback(
455                             server));
456                 }
457 
458                 return future;
459             }
460             catch (final MongoDbException e) {
461                 LOG.info("Could not ping '{}': {}", server, e.getMessage());
462             }
463             return null;
464         }
465     }
466 }