Coverage Report - com.allanbank.mongodb.client.state.Cluster
 
Classes in this File Line Coverage Branch Coverage Complexity
Cluster
97%
150/154
91%
54/59
2.556
Cluster$1
100%
1/1
N/A
2.556
Cluster$ServerListener
100%
29/29
86%
19/22
2.556
 
 1  
 /*
 2  
  * #%L
 3  
  * Cluster.java - mongodb-async-driver - Allanbank Consulting, Inc.
 4  
  * %%
 5  
  * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
 6  
  * %%
 7  
  * Licensed under the Apache License, Version 2.0 (the "License");
 8  
  * you may not use this file except in compliance with the License.
 9  
  * You may obtain a copy of the License at
 10  
  * 
 11  
  *      http://www.apache.org/licenses/LICENSE-2.0
 12  
  * 
 13  
  * Unless required by applicable law or agreed to in writing, software
 14  
  * distributed under the License is distributed on an "AS IS" BASIS,
 15  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 16  
  * See the License for the specific language governing permissions and
 17  
  * limitations under the License.
 18  
  * #L%
 19  
  */
 20  
 package com.allanbank.mongodb.client.state;
 21  
 
 22  
 import java.beans.PropertyChangeEvent;
 23  
 import java.beans.PropertyChangeListener;
 24  
 import java.beans.PropertyChangeSupport;
 25  
 import java.net.InetSocketAddress;
 26  
 import java.util.ArrayList;
 27  
 import java.util.Arrays;
 28  
 import java.util.Collections;
 29  
 import java.util.List;
 30  
 import java.util.concurrent.ConcurrentHashMap;
 31  
 import java.util.concurrent.ConcurrentMap;
 32  
 import java.util.concurrent.CopyOnWriteArrayList;
 33  
 
 34  
 import com.allanbank.mongodb.MongoClientConfiguration;
 35  
 import com.allanbank.mongodb.ReadPreference;
 36  
 import com.allanbank.mongodb.Version;
 37  
 import com.allanbank.mongodb.client.ClusterStats;
 38  
 import com.allanbank.mongodb.client.ClusterType;
 39  
 import com.allanbank.mongodb.client.Message;
 40  
 import com.allanbank.mongodb.client.VersionRange;
 41  
 import com.allanbank.mongodb.util.ServerNameUtils;
 42  
 
 43  
 /**
 44  
  * {@link Cluster} tracks the state of the cluster of MongoDB servers.
 45  
  * PropertyChangeEvents are fired when a server is added or marked writable/not
 46  
  * writable.
 47  
  * <p>
 48  
  * This class uses brute force synchronization to protect its internal state. It
 49  
  * is assumed that multiple connections will be concurrently updating the
 50  
  * {@link Cluster} at once and that at any given time this class may not contain
 51  
  * the absolute truth about the state of the cluster. Instead connections should
 52  
  * keep querying for the state of the cluster via their connection until the
 53  
  * view the server returned and the {@link Cluster} are consistent. Since this
 54  
  * class will not fire a {@link PropertyChangeEvent} when the state is not truly
 55  
  * modified the simplest mechanism is to keep querying for the cluster state on
 56  
  * the connection until no addition change events are seen.
 57  
  * </p>
 58  
  * 
 59  
  * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
 60  
  *         mutated in incompatible ways between any two releases of the driver.
 61  
  * @copyright 2011-2013, Allanbank Consulting, Inc., All Rights Reserved
 62  
  */
 63  
 public class Cluster implements ClusterStats {
 64  
 
 65  
     /** The property sued for adding a new server. */
 66  
     public static final String SERVER_PROP = "server";
 67  
 
 68  
     /** The property name for if there is a writable server. */
 69  
     public static final String WRITABLE_PROP = "writable";
 70  
 
 71  
     /** The configuration for connecting to the servers. */
 72  
     protected final MongoClientConfiguration myConfig;
 73  
 
 74  
     /** The complete list of servers. */
 75  
     protected final ConcurrentMap<String, Server> myServers;
 76  
 
 77  
     /** The range of versions within the cluster. */
 78  
     protected VersionRange myServerVersionRange;
 79  
 
 80  
     /** The smallest maximum number of operations in a batch in the cluster. */
 81  
     protected int mySmallestMaxBatchedWriteOperations;
 82  
 
 83  
     /** The smallest maximum document size in the cluster. */
 84  
     protected long mySmallestMaxBsonObjectSize;
 85  
 
 86  
     /** Support for firing property change events. */
 87  
     /* package */final PropertyChangeSupport myChangeSupport;
 88  
 
 89  
     /** The listener for changes to the server. */
 90  
     /* package */final ServerListener myListener;
 91  
 
 92  
     /** The complete list of non-writable servers. */
 93  
     /* package */final CopyOnWriteArrayList<Server> myNonWritableServers;
 94  
 
 95  
     /** The complete list of writable servers. */
 96  
     /* package */final CopyOnWriteArrayList<Server> myWritableServers;
 97  
 
 98  
     /** The type of the cluster. */
 99  
     private final ClusterType myType;
 100  
 
 101  
     /**
 102  
      * Creates a new CLusterState.
 103  
      * 
 104  
      * @param config
 105  
      *            The configuration for the cluster.
 106  
      * @param type
 107  
      *            The type of the cluster.
 108  
      */
 109  268
     public Cluster(final MongoClientConfiguration config, final ClusterType type) {
 110  268
         myConfig = config;
 111  268
         myType = type;
 112  268
         myChangeSupport = new PropertyChangeSupport(this);
 113  268
         myServers = new ConcurrentHashMap<String, Server>();
 114  268
         myWritableServers = new CopyOnWriteArrayList<Server>();
 115  268
         myNonWritableServers = new CopyOnWriteArrayList<Server>();
 116  268
         myListener = new ServerListener();
 117  268
         myServerVersionRange = VersionRange.range(Version.parse("0"),
 118  
                 Version.parse("0"));
 119  268
     }
 120  
 
 121  
     /**
 122  
      * Adds a {@link Server} to the {@link Cluster} for the address provided if
 123  
      * one does not already exist.
 124  
      * 
 125  
      * @param address
 126  
      *            The address of the {@link Server} to return.
 127  
      * @return The {@link Server} for the address.
 128  
      */
 129  
     public Server add(final InetSocketAddress address) {
 130  373
         final String normalized = ServerNameUtils.normalize(address);
 131  373
         Server server = myServers.get(normalized);
 132  373
         if (server == null) {
 133  
 
 134  343
             server = new Server(address);
 135  
 
 136  343
             synchronized (this) {
 137  343
                 final Server existing = myServers.putIfAbsent(normalized,
 138  
                         server);
 139  343
                 if (existing != null) {
 140  0
                     server = existing;
 141  
                 }
 142  
                 else {
 143  343
                     myNonWritableServers.add(server);
 144  343
                     myChangeSupport.firePropertyChange(SERVER_PROP, null,
 145  
                             server);
 146  
 
 147  343
                     server.addListener(myListener);
 148  
                 }
 149  343
             }
 150  
         }
 151  373
         return server;
 152  
     }
 153  
 
 154  
     /**
 155  
      * Adds a {@link Server} to the {@link Cluster} for the address provided if
 156  
      * one does not already exist.
 157  
      * <p>
 158  
      * This method is equivalent to calling {@link #add(InetSocketAddress)
 159  
      * add(ServerNameUtils.parse(address))}.
 160  
      * </p>
 161  
      * 
 162  
      * @param address
 163  
      *            The address of the {@link Server} to return.
 164  
      * @return The {@link Server} for the address.
 165  
      */
 166  
     public Server add(final String address) {
 167  267
         Server server = myServers.get(address);
 168  267
         if (server == null) {
 169  184
             server = add(ServerNameUtils.parse(address));
 170  
         }
 171  
 
 172  267
         return server;
 173  
     }
 174  
 
 175  
     /**
 176  
      * Adds a listener to the state.
 177  
      * 
 178  
      * @param listener
 179  
      *            The listener for the state changes.
 180  
      */
 181  
     public void addListener(final PropertyChangeListener listener) {
 182  59
         synchronized (this) {
 183  59
             myChangeSupport.addPropertyChangeListener(listener);
 184  59
         }
 185  59
     }
 186  
 
 187  
     /**
 188  
      * Removes all of the servers from the cluster.
 189  
      */
 190  
     public void clear() {
 191  2
         for (final Server server : myServers.values()) {
 192  2
             remove(server);
 193  2
         }
 194  2
     }
 195  
 
 196  
     /**
 197  
      * Returns the set of servers that can be used based on the provided
 198  
      * {@link ReadPreference}.
 199  
      * 
 200  
      * @param readPreference
 201  
      *            The {@link ReadPreference} to filter the servers.
 202  
      * @return The {@link List} of servers that can be used. Servers will be
 203  
      *         ordered by preference to be used, most preferred to least
 204  
      *         preferred.
 205  
      */
 206  
     public List<Server> findCandidateServers(final ReadPreference readPreference) {
 207  50
         List<Server> results = Collections.emptyList();
 208  
 
 209  1
         switch (readPreference.getMode()) {
 210  
         case NEAREST:
 211  2
             results = findNearestCandidates(readPreference);
 212  2
             break;
 213  
         case PRIMARY_ONLY:
 214  10
             results = findWritableCandidates(readPreference);
 215  10
             break;
 216  
         case PRIMARY_PREFERRED:
 217  5
             results = merge(findWritableCandidates(readPreference),
 218  
                     findNonWritableCandidates(readPreference));
 219  5
             break;
 220  
         case SECONDARY_ONLY:
 221  23
             results = findNonWritableCandidates(readPreference);
 222  23
             break;
 223  
         case SECONDARY_PREFERRED:
 224  4
             results = merge(findNonWritableCandidates(readPreference),
 225  
                     findWritableCandidates(readPreference));
 226  4
             break;
 227  
         case SERVER:
 228  6
             results = findCandidateServer(readPreference);
 229  
             break;
 230  
         }
 231  
 
 232  50
         return results;
 233  
     }
 234  
 
 235  
     /**
 236  
      * Locates the set of servers that can be used to send the specified
 237  
      * messages.
 238  
      * 
 239  
      * @param message1
 240  
      *            The first message to send.
 241  
      * @param message2
 242  
      *            The second message to send. May be <code>null</code>.
 243  
      * @return The servers that can be used.
 244  
      */
 245  
     public List<Server> findServers(final Message message1,
 246  
             final Message message2) {
 247  19
         List<Server> servers = Collections.emptyList();
 248  
 
 249  19
         if (message1 != null) {
 250  18
             List<Server> potentialServers = findCandidateServers(message1
 251  
                     .getReadPreference());
 252  18
             servers = potentialServers;
 253  
 
 254  18
             if (message2 != null) {
 255  5
                 servers = new ArrayList<Server>(potentialServers);
 256  5
                 potentialServers = findCandidateServers(message2
 257  
                         .getReadPreference());
 258  5
                 servers.retainAll(potentialServers);
 259  
             }
 260  
         }
 261  19
         return servers;
 262  
     }
 263  
 
 264  
     /**
 265  
      * Returns the server state for the address provided. If the {@link Server}
 266  
      * does not already exist a non-writable state is created and returned.
 267  
      * <p>
 268  
      * This method is equivalent to calling {@link #add(String) add(address)}.
 269  
      * </p>
 270  
      * 
 271  
      * @param address
 272  
      *            The address of the {@link Server} to return.
 273  
      * @return The {@link Server} for the address.
 274  
      */
 275  
     public Server get(final String address) {
 276  64
         return add(address);
 277  
     }
 278  
 
 279  
     /**
 280  
      * Returns a copy of the list of non-writable servers. The list returned is
 281  
      * a copy of the internal list and can be modified by the caller.
 282  
      * 
 283  
      * @return The complete list of non-writable servers.
 284  
      */
 285  
     public List<Server> getNonWritableServers() {
 286  2
         return new ArrayList<Server>(myNonWritableServers);
 287  
     }
 288  
 
 289  
     /**
 290  
      * Returns a copy of the list of servers. The list returned is a copy of the
 291  
      * internal list and can be modified by the caller.
 292  
      * 
 293  
      * @return The complete list of servers.
 294  
      */
 295  
     public List<Server> getServers() {
 296  140
         return new ArrayList<Server>(myServers.values());
 297  
     }
 298  
 
 299  
     /**
 300  
      * {@inheritDoc}
 301  
      */
 302  
     @Override
 303  
     public VersionRange getServerVersionRange() {
 304  0
         return myServerVersionRange;
 305  
     }
 306  
 
 307  
     /**
 308  
      * Returns smallest value for the maximum number of write operations allowed
 309  
      * in a single write command.
 310  
      * 
 311  
      * @return The smallest value for maximum number of write operations allowed
 312  
      *         in a single write command.
 313  
      */
 314  
     @Override
 315  
     public int getSmallestMaxBatchedWriteOperations() {
 316  0
         return mySmallestMaxBatchedWriteOperations;
 317  
     }
 318  
 
 319  
     /**
 320  
      * Returns the smallest value for the maximum BSON object size within the
 321  
      * cluster.
 322  
      * 
 323  
      * @return The smallest value for the maximum BSON object size within the
 324  
      *         cluster.
 325  
      */
 326  
     @Override
 327  
     public long getSmallestMaxBsonObjectSize() {
 328  0
         return mySmallestMaxBsonObjectSize;
 329  
     }
 330  
 
 331  
     /**
 332  
      * Returns the type of cluster.
 333  
      * 
 334  
      * @return The type of cluster.
 335  
      */
 336  
     public ClusterType getType() {
 337  104
         return myType;
 338  
     }
 339  
 
 340  
     /**
 341  
      * Returns a copy of the list of writable servers. The list returned is a
 342  
      * copy of the internal list and can be modified by the caller.
 343  
      * 
 344  
      * @return The complete list of writable servers.
 345  
      */
 346  
     public List<Server> getWritableServers() {
 347  34
         return new ArrayList<Server>(myWritableServers);
 348  
     }
 349  
 
 350  
     /**
 351  
      * Removes the specified server from the cluster.
 352  
      * 
 353  
      * @param server
 354  
      *            The server to remove from the cluster.
 355  
      */
 356  
     public void remove(final Server server) {
 357  
 
 358  2
         final Server removed = myServers.remove(server.getCanonicalName());
 359  2
         if (removed != null) {
 360  2
             removed.removeListener(myListener);
 361  2
             myNonWritableServers.remove(removed);
 362  2
             myWritableServers.remove(removed);
 363  
 
 364  2
             updateVersions();
 365  
         }
 366  2
     }
 367  
 
 368  
     /**
 369  
      * Removes a listener to the state.
 370  
      * 
 371  
      * @param listener
 372  
      *            The listener for the state changes.
 373  
      */
 374  
     public void removeListener(final PropertyChangeListener listener) {
 375  52
         synchronized (this) {
 376  52
             myChangeSupport.removePropertyChangeListener(listener);
 377  52
         }
 378  52
     }
 379  
 
 380  
     /**
 381  
      * Computes a relative CDF (cumulative distribution function) for the
 382  
      * servers based on the latency from the client.
 383  
      * <p>
 384  
      * The latency of each server is used to create a strict ordering of servers
 385  
      * from lowest latency to highest. The relative latency of the i'th server
 386  
      * is then calculated based on the function:
 387  
      * </p>
 388  
      * <blockquote>
 389  
      * 
 390  
      * <pre>
 391  
      *                                       latency[0]
 392  
      *                relative_latency[i] =  ----------
 393  
      *                                       latency[i]
 394  
      * </pre>
 395  
      * 
 396  
      * </blockquote>
 397  
      * <p>
 398  
      * The relative latencies are then then summed and the probability of
 399  
      * selecting each server is then calculated by:
 400  
      * </p>
 401  
      * <blockquote>
 402  
      * 
 403  
      * <pre>
 404  
      *                                  relative_latency[i]
 405  
      *     probability[i] = -------------------------------------------------
 406  
      *                      sum(relative_latency[0], ... relative_latency[n])
 407  
      * </pre>
 408  
      * 
 409  
      * </blockquote>
 410  
      * 
 411  
      * <p>
 412  
      * The CDF over these probabilities is returned.
 413  
      * </p>
 414  
      * 
 415  
      * @param servers
 416  
      *            The servers to compute the CDF for.
 417  
      * @return The CDF for the server latencies.
 418  
      */
 419  
     protected final double[] cdf(final List<Server> servers) {
 420  121
         Collections.sort(servers, ServerLatencyComparator.COMPARATOR);
 421  
 
 422  
         // Pick a server to move to the front.
 423  121
         final double[] relativeLatency = new double[servers.size()];
 424  121
         double sum = 0;
 425  121
         double first = Double.NEGATIVE_INFINITY;
 426  100212
         for (int i = 0; i < relativeLatency.length; ++i) {
 427  100091
             final Server server = servers.get(i);
 428  100091
             double latency = server.getAverageLatency();
 429  
 
 430  
             // Turn the latency into a ratio of the lowest latency.
 431  100091
             if (first == Double.NEGATIVE_INFINITY) {
 432  121
                 first = latency;
 433  121
                 latency = 1.0D; // By definition N/N = 1.0.
 434  
             }
 435  
             else {
 436  99970
                 latency /= first;
 437  
             }
 438  
 
 439  100091
             latency = (1.0D / latency); // 4 times as long is 1/4 as likely.
 440  100091
             relativeLatency[i] = latency;
 441  100091
             sum += latency;
 442  
         }
 443  
 
 444  
         // Turn the latencies into a range of 0 <= relativeLatency < 1.
 445  
         // Also known as the CDF (cumulative distribution function)
 446  121
         double accum = 0.0D;
 447  100212
         for (int i = 0; i < relativeLatency.length; ++i) {
 448  100091
             accum += relativeLatency[i];
 449  
 
 450  100091
             relativeLatency[i] = accum / sum;
 451  
         }
 452  
 
 453  121
         return relativeLatency;
 454  
     }
 455  
 
 456  
     /**
 457  
      * Finds the candidate server, if known.
 458  
      * 
 459  
      * @param readPreference
 460  
      *            The read preference to match the server against.
 461  
      * @return The Server found in a singleton list or an empty list if the
 462  
      *         server is not known.
 463  
      */
 464  
     protected List<Server> findCandidateServer(
 465  
             final ReadPreference readPreference) {
 466  6
         final Server server = myServers.get(readPreference.getServer());
 467  6
         if ((server != null) && readPreference.matches(server.getTags())) {
 468  3
             return Collections.singletonList(server);
 469  
         }
 470  3
         return Collections.emptyList();
 471  
     }
 472  
 
 473  
     /**
 474  
      * Returns the list of servers that match the read preference's tags.
 475  
      * 
 476  
      * @param readPreference
 477  
      *            The read preference to match the server against.
 478  
      * @return The servers found in order of preference. Generally this is in
 479  
      *         latency order but we randomly move one of the servers to the
 480  
      *         front of the list to distribute the load across more servers.
 481  
      * 
 482  
      * @see #sort
 483  
      */
 484  
     protected List<Server> findNearestCandidates(
 485  
             final ReadPreference readPreference) {
 486  2
         final List<Server> results = new ArrayList<Server>(myServers.size());
 487  2
         for (final Server server : myServers.values()) {
 488  6
             if (readPreference.matches(server.getTags())) {
 489  5
                 results.add(server);
 490  
             }
 491  6
         }
 492  
 
 493  
         // Sort the server by preference.
 494  2
         sort(results);
 495  
 
 496  2
         return results;
 497  
     }
 498  
 
 499  
     /**
 500  
      * Returns the list of non-writable servers that match the read preference's
 501  
      * tags.
 502  
      * 
 503  
      * @param readPreference
 504  
      *            The read preference to match the server against.
 505  
      * @return The servers found in order of preference. Generally this is in
 506  
      *         latency order but we randomly move one of the servers to the
 507  
      *         front of the list to distribute the load across more servers.
 508  
      * 
 509  
      * @see #sort
 510  
      */
 511  
     protected List<Server> findNonWritableCandidates(
 512  
             final ReadPreference readPreference) {
 513  32
         final List<Server> results = new ArrayList<Server>(
 514  
                 myNonWritableServers.size());
 515  32
         for (final Server server : myNonWritableServers) {
 516  90
             if (readPreference.matches(server.getTags())
 517  
                     && isRecentEnough(server.getSecondsBehind())) {
 518  84
                 results.add(server);
 519  
             }
 520  90
         }
 521  
 
 522  
         // Sort the server by preference.
 523  32
         sort(results);
 524  
 
 525  32
         return results;
 526  
     }
 527  
 
 528  
     /**
 529  
      * Returns the list of writable servers that match the read preference's
 530  
      * tags.
 531  
      * 
 532  
      * @param readPreference
 533  
      *            The read preference to match the server against.
 534  
      * @return The servers found in order of preference. Generally this is in
 535  
      *         latency order but we randomly move one of the servers to the
 536  
      *         front of the list to distribute the load across more servers.
 537  
      * 
 538  
      * @see #sort
 539  
      */
 540  
     protected List<Server> findWritableCandidates(
 541  
             final ReadPreference readPreference) {
 542  19
         final List<Server> results = new ArrayList<Server>(
 543  
                 myWritableServers.size());
 544  19
         for (final Server server : myWritableServers) {
 545  25
             if (readPreference.matches(server.getTags())) {
 546  18
                 results.add(server);
 547  
             }
 548  25
         }
 549  
 
 550  
         // Sort the server by preference.
 551  19
         sort(results);
 552  
 
 553  19
         return results;
 554  
     }
 555  
 
 556  
     /**
 557  
      * Sorts the servers based on the latency from the client.
 558  
      * <p>
 559  
      * To distribute the requests across servers more evenly the first server is
 560  
      * replaced with a random server based on a single sided simplified Gaussian
 561  
      * distribution.
 562  
      * </p>
 563  
      * 
 564  
      * @param servers
 565  
      *            The servers to be sorted.
 566  
      * 
 567  
      * @see #cdf(List)
 568  
      */
 569  
     protected final void sort(final List<Server> servers) {
 570  153
         if (servers.isEmpty() || (servers.size() == 1)) {
 571  33
             return;
 572  
         }
 573  
 
 574  
         // Pick a server to move to the front.
 575  120
         final double[] cdf = cdf(servers);
 576  120
         final double random = Math.random();
 577  120
         int index = Arrays.binarySearch(cdf, random);
 578  
 
 579  
         // Probably a negative index since not expecting an exact match.
 580  120
         if (index < 0) {
 581  
             // Undo (-(insertion point) - 1)
 582  120
             index = Math.abs(index + 1);
 583  
         }
 584  
 
 585  
         // Should not be needed. random should be < 1.0 and
 586  
         // relativeLatency[relativeLatency.length] == 1.0
 587  
         //
 588  
         // assert (random < 1.0D) :
 589  
         // "The random value should be strictly less than 1.0.";
 590  
         // assert (cdf[cdf.length - 1] <= 1.0001) :
 591  
         // "The cdf of the last server should be 1.0.";
 592  
         // assert (0.9999 <= cdf[cdf.length - 1]) :
 593  
         // "The cdf of the last server should be 1.0.";
 594  120
         index = Math.min(cdf.length - 1, index);
 595  
 
 596  
         // Swap the lucky winner into the first position.
 597  120
         Collections.swap(servers, 0, index);
 598  120
     }
 599  
 
 600  
     /**
 601  
      * Updates the min/max versions across all servers. Since the max BSON
 602  
      * object size is tied to the version we also update that value.
 603  
      */
 604  
     protected void updateVersions() {
 605  102
         Version min = null;
 606  102
         Version max = null;
 607  
 
 608  102
         long smallestMaxBsonObjectSize = Long.MAX_VALUE;
 609  102
         int smallestMaxBatchedWriteOperations = Integer.MAX_VALUE;
 610  
 
 611  102
         for (final Server server : myServers.values()) {
 612  100
             min = Version.earlier(min, server.getVersion());
 613  100
             max = Version.later(max, server.getVersion());
 614  
 
 615  100
             smallestMaxBsonObjectSize = Math.min(smallestMaxBsonObjectSize,
 616  
                     server.getMaxBsonObjectSize());
 617  100
             smallestMaxBatchedWriteOperations = Math.min(
 618  
                     smallestMaxBatchedWriteOperations,
 619  
                     server.getMaxBatchedWriteOperations());
 620  100
         }
 621  
 
 622  102
         myServerVersionRange = VersionRange.range(min, max);
 623  102
         mySmallestMaxBsonObjectSize = smallestMaxBsonObjectSize;
 624  102
         mySmallestMaxBatchedWriteOperations = smallestMaxBatchedWriteOperations;
 625  102
     }
 626  
 
 627  
     /**
 628  
      * Returns true if the server is recent enough to be queried.
 629  
      * 
 630  
      * @param secondsBehind
 631  
      *            The number of seconds the server is behind.
 632  
      * @return True if the server is recent enough to be queried, false
 633  
      *         otherwise.
 634  
      */
 635  
     private boolean isRecentEnough(final double secondsBehind) {
 636  85
         return ((secondsBehind * 1000) < myConfig.getMaxSecondaryLag());
 637  
     }
 638  
 
 639  
     /**
 640  
      * Merges the two lists into a single list.
 641  
      * 
 642  
      * @param list1
 643  
      *            The first list of servers.
 644  
      * @param list2
 645  
      *            The second list of servers.
 646  
      * @return The 2 lists of servers merged into a single list.
 647  
      */
 648  
     private final List<Server> merge(final List<Server> list1,
 649  
             final List<Server> list2) {
 650  
         List<Server> results;
 651  9
         if (list1.isEmpty()) {
 652  3
             results = list2;
 653  
         }
 654  6
         else if (list2.isEmpty()) {
 655  1
             results = list1;
 656  
         }
 657  
         else {
 658  5
             results = new ArrayList<Server>(list1.size() + list2.size());
 659  5
             results.addAll(list1);
 660  5
             results.addAll(list2);
 661  
         }
 662  9
         return results;
 663  
     }
 664  
 
 665  
     /**
 666  
      * ServerListener provides a listener for the state updates of the
 667  
      * {@link Server}.
 668  
      * 
 669  
      * @api.no This class is <b>NOT</b> part of the drivers API. This class may
 670  
      *         be mutated in incompatible ways between any two releases of the
 671  
      *         driver.
 672  
      * @copyright 2013, Allanbank Consulting, Inc., All Rights Reserved
 673  
      */
 674  268
     protected final class ServerListener implements PropertyChangeListener {
 675  
         @Override
 676  
         public void propertyChange(final PropertyChangeEvent evt) {
 677  645
             final String propertyName = evt.getPropertyName();
 678  645
             final Server server = (Server) evt.getSource();
 679  
 
 680  645
             if (Server.STATE_PROP.equals(propertyName)) {
 681  
 
 682  153
                 final boolean old = !myWritableServers.isEmpty();
 683  
 
 684  153
                 if (Server.State.WRITABLE == evt.getNewValue()) {
 685  77
                     myWritableServers.addIfAbsent(server);
 686  77
                     myNonWritableServers.remove(server);
 687  
                 }
 688  76
                 else if (Server.State.READ_ONLY == evt.getNewValue()) {
 689  47
                     myWritableServers.remove(server);
 690  47
                     myNonWritableServers.addIfAbsent(server);
 691  
                 }
 692  
                 else {
 693  29
                     myWritableServers.remove(server);
 694  29
                     myNonWritableServers.remove(server);
 695  
                 }
 696  
 
 697  153
                 myChangeSupport.firePropertyChange(WRITABLE_PROP, old,
 698  
                         !myWritableServers.isEmpty());
 699  
 
 700  153
             }
 701  492
             else if (Server.CANONICAL_NAME_PROP.equals(propertyName)) {
 702  
                 // Resolved a new canonical name. e.g., What the server
 703  
                 // calls itself in the cluster.
 704  
 
 705  
                 // Remove the entry with the old name.
 706  4
                 myServers.remove(evt.getOldValue(), server);
 707  
 
 708  
                 // And add with the new name. Checking for duplicate entries.
 709  4
                 final Server existing = myServers.putIfAbsent(
 710  
                         server.getCanonicalName(), server);
 711  4
                 if (existing != null) {
 712  
                     // Already have a Server with that name. Remove the listener
 713  
                     // and let this server get garbage collected.
 714  2
                     myNonWritableServers.remove(server);
 715  2
                     myWritableServers.remove(server);
 716  2
                     server.removeListener(myListener);
 717  
 
 718  2
                     myChangeSupport.firePropertyChange(SERVER_PROP, server,
 719  
                             null);
 720  
                 }
 721  4
             }
 722  488
             else if (Server.VERSION_PROP.equals(propertyName)) {
 723  
                 // If the old version is either the high or low for the cluster
 724  
                 // (or the version is UNKNOWN) then recompute the high/low
 725  
                 // versions.
 726  100
                 final Version old = (Version) evt.getOldValue();
 727  
 
 728  100
                 if (Version.UNKNOWN.equals(old)
 729  
                         || (myServerVersionRange.getUpperBounds()
 730  
                                 .compareTo(old) <= 0)
 731  
                         || (myServerVersionRange.getLowerBounds()
 732  
                                 .compareTo(old) >= 0)) {
 733  100
                     updateVersions();
 734  
                 }
 735  
             }
 736  645
         }
 737  
     }
 738  
 }