Coverage Report - com.allanbank.mongodb.client.state.ClusterPinger
 
Classes in this File Line Coverage Branch Coverage Complexity
ClusterPinger
94%
98/104
92%
26/28
2.647
ClusterPinger$Pinger
80%
17/21
100%
4/4
2.647
 
 1  
 /*
 2  
  * #%L
 3  
  * ClusterPinger.java - mongodb-async-driver - Allanbank Consulting, Inc.
 4  
  * %%
 5  
  * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
 6  
  * %%
 7  
  * Licensed under the Apache License, Version 2.0 (the "License");
 8  
  * you may not use this file except in compliance with the License.
 9  
  * You may obtain a copy of the License at
 10  
  * 
 11  
  *      http://www.apache.org/licenses/LICENSE-2.0
 12  
  * 
 13  
  * Unless required by applicable law or agreed to in writing, software
 14  
  * distributed under the License is distributed on an "AS IS" BASIS,
 15  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 16  
  * See the License for the specific language governing permissions and
 17  
  * limitations under the License.
 18  
  * #L%
 19  
  */
 20  
 package com.allanbank.mongodb.client.state;
 21  
 
 22  
 import java.io.Closeable;
 23  
 import java.io.IOException;
 24  
 import java.util.ArrayList;
 25  
 import java.util.Collections;
 26  
 import java.util.HashMap;
 27  
 import java.util.Iterator;
 28  
 import java.util.List;
 29  
 import java.util.Map;
 30  
 import java.util.concurrent.CopyOnWriteArrayList;
 31  
 import java.util.concurrent.ExecutionException;
 32  
 import java.util.concurrent.Future;
 33  
 import java.util.concurrent.TimeUnit;
 34  
 import java.util.concurrent.TimeoutException;
 35  
 
 36  
 import com.allanbank.mongodb.MongoClientConfiguration;
 37  
 import com.allanbank.mongodb.MongoDbException;
 38  
 import com.allanbank.mongodb.client.ClusterType;
 39  
 import com.allanbank.mongodb.client.connection.Connection;
 40  
 import com.allanbank.mongodb.client.connection.proxy.ProxiedConnectionFactory;
 41  
 import com.allanbank.mongodb.client.message.IsMaster;
 42  
 import com.allanbank.mongodb.client.message.ReplicaSetStatus;
 43  
 import com.allanbank.mongodb.client.message.Reply;
 44  
 import com.allanbank.mongodb.util.IOUtils;
 45  
 import com.allanbank.mongodb.util.log.Log;
 46  
 import com.allanbank.mongodb.util.log.LogFactory;
 47  
 
 48  
 /**
 49  
  * ClusterPinger pings each of the connections in the cluster and updates the
 50  
  * latency of the server from this client.
 51  
  * 
 52  
  * @api.no This class is <b>NOT</b> part of the driver's API. This class may be
 53  
  *         mutated in incompatible ways between any two releases of the driver.
 54  
  * @copyright 2012-2014, Allanbank Consulting, Inc., All Rights Reserved
 55  
  */
 56  
 public class ClusterPinger implements Runnable, Closeable {
 57  
 
 58  
     /** The default interval between ping sweeps in seconds. */
 59  
     public static final int DEFAULT_PING_INTERVAL_SECONDS = 600;
 60  
 
 61  
     /** The logger for the {@link ClusterPinger}. */
 62  1
     protected static final Log LOG = LogFactory.getLog(ClusterPinger.class);
 63  
 
 64  
     /** Instance of the inner class containing the ping logic. */
 65  1
     private static final Pinger PINGER = new Pinger();
 66  
 
 67  
     /**
 68  
      * Pings the server and suppresses all exceptions.
 69  
      * 
 70  
      * @param server
 71  
      *            The address of the server. Used for logging.
 72  
      * @param conn
 73  
      *            The connection to ping.
 74  
      * @return True if the ping worked, false otherwise.
 75  
      */
 76  
     public static boolean ping(final Server server, final Connection conn) {
 77  6
         return PINGER.ping(server, conn);
 78  
     }
 79  
 
 80  
     /** The state of the clusters. */
 81  
     private final List<Cluster> myClusters;
 82  
 
 83  
     /** The configuration for the connections. */
 84  
     private final MongoClientConfiguration myConfig;
 85  
 
 86  
     /** The factory for creating connections to the servers. */
 87  
     private final ProxiedConnectionFactory myConnectionFactory;
 88  
 
 89  
     /** The units for the ping sweep intervals. */
 90  
     private volatile TimeUnit myIntervalUnits;
 91  
 
 92  
     /** The interval for a ping sweep across all of the servers. */
 93  
     private volatile int myPingSweepInterval;
 94  
 
 95  
     /** The thread that is pinging the servers for latency. */
 96  
     private final Thread myPingThread;
 97  
 
 98  
     /** The flag to stop the ping thread. */
 99  
     private volatile boolean myRunning;
 100  
 
 101  
     /**
 102  
      * Creates a new ClusterPinger.
 103  
      * 
 104  
      * @param cluster
 105  
      *            The state of the cluster.
 106  
      * @param factory
 107  
      *            The factory for creating connections to the servers.
 108  
      * @param config
 109  
      *            The configuration for the connections.
 110  
      */
 111  
     public ClusterPinger(final Cluster cluster,
 112  
             final ProxiedConnectionFactory factory,
 113  
             final MongoClientConfiguration config) {
 114  58
         super();
 115  
 
 116  58
         myConnectionFactory = factory;
 117  58
         myConfig = config;
 118  58
         myRunning = true;
 119  
 
 120  58
         myClusters = new CopyOnWriteArrayList<Cluster>();
 121  58
         myClusters.add(cluster);
 122  
 
 123  58
         myIntervalUnits = TimeUnit.SECONDS;
 124  58
         myPingSweepInterval = DEFAULT_PING_INTERVAL_SECONDS;
 125  
 
 126  58
         myPingThread = myConfig.getThreadFactory().newThread(this);
 127  58
         myPingThread.setDaemon(true);
 128  58
         myPingThread.setName("MongoDB Pinger");
 129  58
         myPingThread.setPriority(Thread.MIN_PRIORITY);
 130  58
     }
 131  
 
 132  
     /**
 133  
      * Adds a new cluster to the set of tracked clusters.
 134  
      * 
 135  
      * @param cluster
 136  
      *            The cluster to add to the set of tracked clusters.
 137  
      */
 138  
     public void addCluster(final Cluster cluster) {
 139  0
         myClusters.add(cluster);
 140  0
     }
 141  
 
 142  
     /**
 143  
      * {@inheritDoc}
 144  
      * <p>
 145  
      * Overridden to close the pinger.
 146  
      * </p>
 147  
      */
 148  
     @Override
 149  
     public void close() {
 150  75
         myRunning = false;
 151  75
         myPingThread.interrupt();
 152  75
     }
 153  
 
 154  
     /**
 155  
      * Returns the units for the ping sweep intervals.
 156  
      * 
 157  
      * @return The units for the ping sweep intervals.
 158  
      */
 159  
     public TimeUnit getIntervalUnits() {
 160  49
         return myIntervalUnits;
 161  
     }
 162  
 
 163  
     /**
 164  
      * Returns the interval for a ping sweep across all of the servers.
 165  
      * 
 166  
      * @return The interval for a ping sweep across all of the servers.
 167  
      */
 168  
     public int getPingSweepInterval() {
 169  49
         return myPingSweepInterval;
 170  
     }
 171  
 
 172  
     /**
 173  
      * Performs a single sweep through the servers sending a ping with a
 174  
      * callback to set the latency and tags for each server.
 175  
      * <p>
 176  
      * This method will not return until every pinged server has replied (which
 177  
      * may be a failure) to the initial ping or the wait deadline has passed.
 178  
      * </p>
 179  
      * 
 180  
      * @param cluster
 181  
      *            The cluster of servers to ping.
 182  
      */
 183  
     public void initialSweep(final Cluster cluster) {
 184  46
         final List<Server> servers = cluster.getServers();
 185  46
         final List<Future<Reply>> replies = new ArrayList<Future<Reply>>(
 186  
                 servers.size());
 187  46
         final List<Connection> connections = new ArrayList<Connection>(
 188  
                 servers.size());
 189  
         try {
 190  46
             for (final Server server : servers) {
 191  
                 // Ping the current server.
 192  59
                 final String name = server.getCanonicalName();
 193  59
                 Connection conn = null;
 194  
                 try {
 195  59
                     conn = myConnectionFactory.connect(server, myConfig);
 196  
 
 197  
                     // Use an isMaster request to measure latency. It is
 198  
                     // a best case since it does not require any locks.
 199  47
                     final Future<Reply> reply = PINGER.pingAsync(
 200  
                             cluster.getType(), server, conn);
 201  47
                     replies.add(reply);
 202  
                 }
 203  12
                 catch (final IOException e) {
 204  12
                     LOG.info("Could not ping '{}': {}", name, e.getMessage());
 205  
                 }
 206  
                 finally {
 207  59
                     if (conn != null) {
 208  47
                         connections.add(conn);
 209  47
                         conn.shutdown(false);
 210  
                     }
 211  
                 }
 212  59
             }
 213  
 
 214  46
             long now = System.currentTimeMillis();
 215  46
             final long deadline = now
 216  
                     + Math.max(5000, myConfig.getConnectTimeout());
 217  86
             while ((now < deadline) && !replies.isEmpty()) {
 218  40
                 final Iterator<Future<Reply>> iter = replies.iterator();
 219  88
                 while (iter.hasNext() && (now < deadline)) {
 220  48
                     Future<Reply> future = iter.next();
 221  
                     try {
 222  48
                         if (future != null) {
 223  
                             // Pause...
 224  43
                             future.get(deadline - now, TimeUnit.MILLISECONDS);
 225  
                         }
 226  
 
 227  
                         // A good reply or we could not connect to the server.
 228  46
                         iter.remove();
 229  
                     }
 230  1
                     catch (final ExecutionException e) {
 231  
                         // We got a reply. It's a failure but it's a reply.
 232  1
                         iter.remove();
 233  
                     }
 234  0
                     catch (final TimeoutException e) {
 235  
                         // No reply yet.
 236  0
                         future = null;
 237  
                     }
 238  1
                     catch (final InterruptedException e) {
 239  
                         // Interrupted while waiting. Treat as no reply yet.
 240  1
                         future = null;
 241  47
                     }
 242  
 
 243  48
                     now = System.currentTimeMillis();
 244  48
                 }
 245  40
             }
 246  
         }
 247  
         finally {
 248  46
             for (final Connection conn : connections) {
 249  47
                 IOUtils.close(conn);
 250  47
             }
 251  46
         }
 252  46
     }
 253  
 
 254  
     /**
 255  
      * {@inheritDoc}
 256  
      * <p>
 257  
      * Overridden to periodically wake-up and ping the servers. Each sweep is
 257  
      * spread across the ping sweep interval, which defaults to 600 seconds.
 259  
      * </p>
 260  
      */
 261  
     @Override
 262  
     public void run() {
 263  100
         while (myRunning) {
 264  
             try {
 265  49
                 final Map<Server, ClusterType> servers = extractAllServers();
 266  
 
 267  49
                 final long interval = getIntervalUnits().toMillis(
 268  
                         getPingSweepInterval());
 269  49
                 final long perServerSleep = servers.isEmpty() ? interval
 270  
                         : interval / servers.size();
 271  
 
 272  
                 // Sleep a little before starting. We do it first to give
 273  
                 // tests time to finish without a sweep in the middle
 274  
                 // causing confusion and delay.
 275  49
                 Thread.sleep(TimeUnit.MILLISECONDS.toMillis(perServerSleep));
 276  
 
 277  15
                 startSweep();
 278  
 
 279  15
                 for (final Map.Entry<Server, ClusterType> entry : servers
 280  
                         .entrySet()) {
 281  
                     // Ping the current server.
 282  15
                     final Server server = entry.getKey();
 283  15
                     final String name = server.getCanonicalName();
 284  15
                     Connection conn = null;
 285  
                     try {
 286  15
                         myPingThread.setName("MongoDB Pinger - " + name);
 287  
 
 288  15
                         conn = myConnectionFactory.connect(server, myConfig);
 289  
 
 290  14
                         PINGER.pingAsync(entry.getValue(), server, conn);
 291  
 
 292  
                         // Sleep a little between the servers.
 293  14
                         Thread.sleep(TimeUnit.MILLISECONDS
 294  
                                 .toMillis(perServerSleep));
 295  
                     }
 296  1
                     catch (final IOException e) {
 297  1
                         LOG.info("Could not ping '{}': {}", name,
 298  
                                 e.getMessage());
 299  
                     }
 300  
                     finally {
 301  15
                         myPingThread.setName("MongoDB Pinger - Idle");
 302  15
                         if (conn != null) {
 303  14
                             conn.shutdown(true);
 304  
                         }
 305  
                     }
 306  
 
 307  14
                 }
 308  
             }
 309  35
             catch (final InterruptedException ok) {
 310  35
                 LOG.debug("Pinger interrupted.");
 311  49
             }
 312  
         }
 313  51
     }
 314  
 
 315  
     /**
 316  
      * Sets the value of units for the ping sweep intervals.
 317  
      * 
 318  
      * @param intervalUnits
 319  
      *            The new value for the units for the ping sweep intervals.
 320  
      */
 321  
     public void setIntervalUnits(final TimeUnit intervalUnits) {
 322  12
         myIntervalUnits = intervalUnits;
 323  12
     }
 324  
 
 325  
     /**
 326  
      * Sets the interval for a ping sweep across all of the servers.
 327  
      * 
 328  
      * @param pingSweepInterval
 329  
      *            The new value for the interval for a ping sweep across all of
 330  
      *            the servers.
 331  
      */
 332  
     public void setPingSweepInterval(final int pingSweepInterval) {
 333  12
         myPingSweepInterval = pingSweepInterval;
 334  12
     }
 335  
 
 336  
     /**
 337  
      * Starts the background pinger.
 338  
      */
 339  
     public void start() {
 340  40
         myPingThread.start();
 341  40
     }
 342  
 
 343  
     /**
 344  
      * Stops the background pinger. Equivalent to {@link #close()}.
 345  
      */
 346  
     public void stop() {
 347  2
         close();
 348  2
     }
 349  
 
 350  
     /**
 351  
      * Starts the background pinger.
 352  
      */
 353  
     public void wakeUp() {
 354  0
         myPingThread.interrupt();
 355  0
     }
 356  
 
 357  
     /**
 358  
      * Extension point to notify derived classes that a new sweep is starting.
 359  
      */
 360  
     protected void startSweep() {
 361  
         // Nothing.
 362  15
     }
 363  
 
 364  
     /**
 365  
      * Extracts the complete list of servers in all clusters.
 366  
      * 
 367  
      * @return An unmodifiable map from each server to its cluster's type.
 368  
      */
 369  
     private Map<Server, ClusterType> extractAllServers() {
 370  49
         final Map<Server, ClusterType> servers = new HashMap<Server, ClusterType>();
 371  
 
 372  49
         for (final Cluster cluster : myClusters) {
 373  49
             for (final Server server : cluster.getServers()) {
 374  57
                 servers.put(server, cluster.getType());
 375  57
             }
 376  49
         }
 377  
 
 378  49
         return Collections.unmodifiableMap(servers);
 379  
     }
 380  
 
 381  
     /**
 382  
      * Pinger provides logic to ping servers.
 383  
      * 
 384  
      * @copyright 2012-2013, Allanbank Consulting, Inc., All Rights Reserved
 385  
      */
 386  1
     protected static final class Pinger {
 387  
         /**
 388  
          * Pings the server and suppresses all exceptions. Updates the server
 389  
          * state with a latency and the tags found in the response, if any.
 390  
          * 
 391  
          * @param server
 392  
          *            The server to update with the results of the ping. If
 393  
          *            <code>false</code> is returned then the state will not
 394  
          *            have been updated. Passing <code>null</code> for the state
 395  
          *            is allowed.
 396  
          * @param conn
 397  
          *            The connection to ping.
 398  
          * @return True if the ping worked, false otherwise.
 399  
          */
 400  
         public boolean ping(final Server server, final Connection conn) {
 401  
             try {
 402  6
                 final Future<Reply> future = pingAsync(ClusterType.STAND_ALONE,
 403  
                         server, conn);
 404  
 
 405  
                 // Wait for the reply.
 406  6
                 if (future != null) {
 407  5
                     future.get(1, TimeUnit.MINUTES);
 408  
 
 409  4
                     return true;
 410  
                 }
 411  
             }
 412  1
             catch (final ExecutionException e) {
 413  1
                 LOG.info(e, "Could not ping '{}': {}",
 414  
                         server.getCanonicalName(), e.getMessage());
 415  
             }
 416  0
             catch (final TimeoutException e) {
 417  0
                 LOG.info(e, "'{}' might be a zombie - not receiving "
 418  
                         + "a response to ping: {}", server.getCanonicalName(),
 419  
                         e.getMessage());
 420  
             }
 421  0
             catch (final InterruptedException e) {
 422  0
                 LOG.info(e, "Interrupted pinging '{}': {}",
 423  
                         server.getCanonicalName(), e.getMessage());
 424  2
             }
 425  
 
 426  2
             return false;
 427  
         }
 428  
 
 429  
         /**
 430  
          * Pings the server and suppresses all exceptions. Returns a future that
 431  
          * can be used to determine if a response has been received. The future
 432  
          * will update the {@link Server} latency and tags if found.
 433  
          * 
 434  
          * @param type
 435  
          *            The type of cluster to ping.
 436  
          * @param server
 437  
          *            The server to update with the results of the ping. If
 438  
          *            <code>false</code> is returned then the state will not
 439  
          *            have been updated. Passing <code>null</code> for the state
 440  
          *            is allowed.
 441  
          * @param conn
 442  
          *            The connection to ping.
 443  
          * @return A {@link Future} that will be updated once the reply is
 444  
          *         received, or <code>null</code> if the request could not be sent.
 445  
          */
 446  
         public Future<Reply> pingAsync(final ClusterType type,
 447  
                 final Server server, final Connection conn) {
 448  
             try {
 449  67
                 final ServerUpdateCallback future = new ServerUpdateCallback(
 450  
                         server);
 451  
 
 452  67
                 conn.send(new IsMaster(), future);
 453  60
                 if (type == ClusterType.REPLICA_SET) {
 454  26
                     conn.send(new ReplicaSetStatus(), new ServerUpdateCallback(
 455  
                             server));
 456  
                 }
 457  
 
 458  60
                 return future;
 459  
             }
 460  7
             catch (final MongoDbException e) {
 461  7
                 LOG.info("Could not ping '{}': {}", server, e.getMessage());
 462  
             }
 463  7
             return null;
 464  
         }
 465  
     }
 466  
 }