Coverage Report - com.allanbank.mongodb.client.state.Server
 
Classes in this File Line Coverage Branch Coverage Complexity
Server
97%
180/185
89%
66/74
1.972
Server$State
100%
5/5
N/A
1.972
 
 1  
 /*
 2  
  * #%L
 3  
  * Server.java - mongodb-async-driver - Allanbank Consulting, Inc.
 4  
  * %%
 5  
  * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
 6  
  * %%
 7  
  * Licensed under the Apache License, Version 2.0 (the "License");
 8  
  * you may not use this file except in compliance with the License.
 9  
  * You may obtain a copy of the License at
 10  
  * 
 11  
  *      http://www.apache.org/licenses/LICENSE-2.0
 12  
  * 
 13  
  * Unless required by applicable law or agreed to in writing, software
 14  
  * distributed under the License is distributed on an "AS IS" BASIS,
 15  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 16  
  * See the License for the specific language governing permissions and
 17  
  * limitations under the License.
 18  
  * #L%
 19  
  */
 20  
 package com.allanbank.mongodb.client.state;
 21  
 
 22  
 import java.beans.PropertyChangeListener;
 23  
 import java.beans.PropertyChangeSupport;
 24  
 import java.net.InetSocketAddress;
 25  
 import java.util.Arrays;
 26  
 import java.util.Collection;
 27  
 import java.util.Collections;
 28  
 import java.util.List;
 29  
 import java.util.concurrent.TimeUnit;
 30  
 import java.util.concurrent.atomic.AtomicLong;
 31  
 
 32  
 import com.allanbank.mongodb.Version;
 33  
 import com.allanbank.mongodb.bson.Document;
 34  
 import com.allanbank.mongodb.bson.Element;
 35  
 import com.allanbank.mongodb.bson.NumericElement;
 36  
 import com.allanbank.mongodb.bson.builder.BuilderFactory;
 37  
 import com.allanbank.mongodb.bson.element.BooleanElement;
 38  
 import com.allanbank.mongodb.bson.element.DocumentElement;
 39  
 import com.allanbank.mongodb.bson.element.StringElement;
 40  
 import com.allanbank.mongodb.bson.element.TimestampElement;
 41  
 import com.allanbank.mongodb.client.Client;
 42  
 import com.allanbank.mongodb.util.ServerNameUtils;
 43  
 
 44  
 /**
 45  
  * Server provides tracking of the state of a single MongoDB server.
 46  
  * 
 47  
  * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
 48  
  *         mutated in incompatible ways between any two releases of the driver.
 49  
  * @copyright 2013, Allanbank Consulting, Inc., All Rights Reserved
 50  
  */
 51  
 public class Server {
 52  
 
 53  
     /** The name for the Server's canonical name property: '{@value} '. */
 54  
     public static final String CANONICAL_NAME_PROP = "canonicalName";
 55  
 
 56  
     /** The decay rate for the exponential average for the latency. */
 57  
     public static final double DECAY_ALPHA;
 58  
 
 59  
     /** The decay period (number of samples) for the average latency. */
 60  
     public static final double DECAY_SAMPLES = 1000.0D;
 61  
 
 62  
     /** The default MongoDB port. */
 63  
     public static final int DEFAULT_PORT = ServerNameUtils.DEFAULT_PORT;
 64  
 
 65  
     /** The document element type. */
 66  1
     public static final Class<DocumentElement> DOCUMENT_TYPE = DocumentElement.class;
 67  
 
 68  
     /** The default number of max batched write operations. */
 69  
     public static final int MAX_BATCHED_WRITE_OPERATIONS_DEFAULT = 1000;
 70  
 
 71  
     /** The name for the Server's maximum batched write operations property: {@value} . */
 72  
     public static final String MAX_BATCHED_WRITE_OPERATIONS_PROP = "maxWriteBatchSize";
 73  
 
 74  
     /** The name for the Server's maximum BSON object size property: {@value} . */
 75  
     public static final String MAX_BSON_OBJECT_SIZE_PROP = "maxBsonObjectSize";
 76  
 
 77  
     /** The numeric element type. */
 78  1
     public static final Class<NumericElement> NUMERIC_TYPE = NumericElement.class;
 79  
 
 80  
     /** The value for a primary server's state. */
 81  
     public static final int PRIMARY_STATE = 1;
 82  
 
 83  
     /** The value for a secondary (actively replicating) server's state. */
 84  
     public static final int SECONDARY_STATE = 2;
 85  
 
 86  
     /** The name for the Server's state property: {@value} . */
 87  
     public static final String STATE_PROP = "state";
 88  
 
 89  
     /** The string element type. */
 90  1
     public static final Class<StringElement> STRING_TYPE = StringElement.class;
 91  
 
 92  
     /** The name for the Server's tags property: {@value} . */
 93  
     public static final String TAGS_PROP = "tags";
 94  
 
 95  
     /** The timestamp element type. */
 96  1
     public static final Class<TimestampElement> TIMESTAMP_TYPE = TimestampElement.class;
 97  
 
 98  
     /** The name for the Server's version property: {@value} . */
 99  
     public static final String VERSION_PROP = "version";
 100  
 
 101  
     /** The number of nano-seconds per milli-second. */
 102  1
     private static final double NANOS_PER_MILLI = TimeUnit.MILLISECONDS
 103  
             .toNanos(1);
 104  
 
 105  
     static {
 106  1
         DECAY_ALPHA = (2.0D / (DECAY_SAMPLES + 1));
 107  1
     }
 108  
 
 109  
     /**
 110  
      * Tracks the average latency for the server connection. This is set when
 111  
      * the connection to the server is first created and then updated
 112  
      * periodically using an exponential moving average.
 113  
      */
 114  
     private volatile double myAverageLatency;
 115  
 
 116  
     /**
 117  
      * The socket address provided by the user. This address will not be
 118  
      * updated.
 119  
      */
 120  
     private final InetSocketAddress myCanonicalAddress;
 121  
 
 122  
     /**
 123  
      * The host name for the {@link #myCanonicalAddress}. This is used to
 124  
      * re-resolve the IP address when a connection failure is experienced.
 125  
      */
 126  
     private final String myCanonicalHostName;
 127  
 
 128  
     /** The normalized name of the server being tracked. */
 129  
     private volatile String myCanonicalName;
 130  
 
 131  
     /** Provides support for the sending of property change events. */
 132  
     private final PropertyChangeSupport myEventSupport;
 133  
 
 134  
     /** The time of the last version update. */
 135  1365
     private long myLastVersionUpdate = 0;
 136  
 
 137  
     /**
 138  
      * The maximum number of write operations allowed in a single write command.
 139  
      * Defaults to {@value #MAX_BATCHED_WRITE_OPERATIONS_DEFAULT}.
 140  
      */
 141  1365
     private volatile int myMaxBatchedWriteOperations = MAX_BATCHED_WRITE_OPERATIONS_DEFAULT;
 142  
 
 143  
     /**
 144  
      * The maximum BSON object size the server will accept. Defaults to
 145  
      * {@link Client#MAX_DOCUMENT_SIZE}.
 146  
      */
 147  1365
     private volatile int myMaxBsonObjectSize = Client.MAX_DOCUMENT_SIZE;
 148  
 
 149  
     /** The number of messages sent to the server. */
 150  
     private final AtomicLong myMessagesSent;
 151  
 
 152  
     /** The number of messages received from the server. */
 153  
     private final AtomicLong myRepliesReceived;
 154  
 
 155  
     /**
 156  
      * Tracks the last report of how many seconds the server is behind the
 157  
      * primary.
 158  
      */
 159  
     private volatile double mySecondsBehind;
 160  
 
 161  
     /** Tracking the state of the server. */
 162  
     private volatile State myState;
 163  
 
 164  
     /** Tracking the tags for the server. */
 165  
     private volatile Document myTags;
 166  
 
 167  
     /** The total amount of latency for sending messages to the server. */
 168  
     private final AtomicLong myTotalLatency;
 169  
 
 170  
     /** The version of the server. */
 171  
     private Version myVersion;
 172  
 
 173  
     /**
 174  
      * The socket address being actively used. This will be re-created using the
 175  
      * server's hostname if a connection attempt fails.
 176  
      */
 177  
     private volatile InetSocketAddress myWorkingAddress;
 178  
 
 179  
     /**
 180  
      * Creates a new {@link Server}. Package private to force creation through
 181  
      * the {@link Cluster}.
 182  
      * 
 183  
      * @param server
 184  
      *            The server being tracked.
 185  
      */
 186  1365
     /* package */Server(final InetSocketAddress server) {
 187  1365
         myCanonicalAddress = server;
 188  1365
         myCanonicalHostName = server.getHostName();
 189  1365
         myCanonicalName = ServerNameUtils.normalize(server);
 190  1365
         myWorkingAddress = myCanonicalAddress;
 191  
 
 192  1365
         myEventSupport = new PropertyChangeSupport(this);
 193  
 
 194  1365
         myMessagesSent = new AtomicLong(0);
 195  1365
         myRepliesReceived = new AtomicLong(0);
 196  1365
         myTotalLatency = new AtomicLong(0);
 197  
 
 198  1365
         myState = State.UNKNOWN;
 199  1365
         myAverageLatency = Double.MAX_VALUE;
 200  1365
         mySecondsBehind = Double.MAX_VALUE;
 201  1365
         myTags = null;
 202  
 
 203  1365
         myVersion = Version.UNKNOWN;
 204  1365
     }
 205  
 
 206  
     /**
 207  
      * Add a PropertyChangeListener to receive all future property changes for
 208  
      * the {@link Server}.
 209  
      * 
 210  
      * @param listener
 211  
      *            The PropertyChangeListener to be added
 212  
      * 
 213  
      * @see PropertyChangeSupport#addPropertyChangeListener(PropertyChangeListener)
 214  
      */
 215  
     public void addListener(final PropertyChangeListener listener) {
 216  343
         myEventSupport.addPropertyChangeListener(listener);
 217  
 
 218  343
     }
 219  
 
 220  
     /**
 221  
      * Notification that an attempt to connect to the server via the all of the
 222  
      * {@link #getAddresses() addresses provided} failed.
 223  
      */
 224  
     public void connectFailed() {
 225  28
         final State oldValue = myState;
 226  
 
 227  28
         myWorkingAddress = null;
 228  28
         myState = State.UNAVAILABLE;
 229  
 
 230  28
         myEventSupport.firePropertyChange(STATE_PROP, oldValue, myState);
 231  28
     }
 232  
 
 233  
     /**
 234  
      * Notification that a connection has closed normally. This will leave the
 235  
      * connection in the last known state even if it is the last open
 236  
      * connection.
 237  
      */
 238  
     public void connectionClosed() {
 239  
         // Nothing for now....
 240  0
     }
 241  
 
 242  
     /**
 243  
      * Notification that a connection was successfully opened to the server. The
 244  
      * {@link InetSocketAddress} provided becomes the preferred address to use
 245  
      * when connecting to the server.
 246  
      * 
 247  
      * @param addressUsed
 248  
      *            The address that was used to connect to the server.
 249  
      */
 250  
     public void connectionOpened(final InetSocketAddress addressUsed) {
 251  176
         myWorkingAddress = addressUsed;
 252  176
     }
 253  
 
 254  
     /**
 255  
      * Notification that a connection has closed abruptly. This will normally
 256  
      * transition the connection to an unknown state.
 257  
      */
 258  
     public void connectionTerminated() {
 259  12
         final State oldValue = myState;
 260  
 
 261  12
         myWorkingAddress = null;
 262  12
         myState = State.UNAVAILABLE;
 263  
 
 264  12
         myEventSupport.firePropertyChange(STATE_PROP, oldValue, myState);
 265  12
     }
 266  
 
 267  
     /**
 268  
      * {@inheritDoc}
 269  
      * <p>
 270  
      * Overridden to return a stable equality check. This is based only on the
 271  
      * server object's identity. The {@link Cluster} class will de-duplicate
 272  
      * once the canonical host names are determined.
 273  
      * </p>
 274  
      */
 275  
     @Override
 276  
     public boolean equals(final Object object) {
 277  509
         return (this == object);
 278  
     }
 279  
 
 280  
     /**
 281  
      * Returns the address of the server being tracked.
 282  
      * 
 283  
      * @return The address of the server being tracked.
 284  
      */
 285  
     public Collection<InetSocketAddress> getAddresses() {
 286  219
         if (myWorkingAddress == null) {
 287  15
             myWorkingAddress = InetSocketAddress.createUnresolved(
 288  
                     myCanonicalHostName, myCanonicalAddress.getPort());
 289  
         }
 290  
 
 291  219
         if (myCanonicalAddress == myWorkingAddress) {
 292  204
             return Collections.singleton(myCanonicalAddress);
 293  
         }
 294  15
         return Arrays.asList(myWorkingAddress, myCanonicalAddress);
 295  
     }
 296  
 
 297  
     /**
 298  
      * Returns the current average latency (in milliseconds) seen in issuing
 299  
      * requests to the server. If the latency returns {@link Double#MAX_VALUE}
 300  
      * then we have no basis for determining the latency.
 301  
      * <p>
 302  
      * This average is over the recent replies not over all replies received.
 303  
      * </p>
 304  
      * 
 305  
      * @return The current average latency (in milliseconds) seen in issuing
 306  
      *         requests to the server.
 307  
      */
 308  
     public double getAverageLatency() {
 309  2136598
         return myAverageLatency;
 310  
     }
 311  
 
 312  
     /**
 313  
      * Returns the name of the server as reported by the server itself.
 314  
      * 
 315  
      * @return The name of the server as reported by the server itself.
 316  
      */
 317  
     public String getCanonicalName() {
 318  432
         return myCanonicalName;
 319  
     }
 320  
 
 321  
     /**
 322  
      * Returns the maximum number of write operations allowed in a single write
 323  
      * command. Defaults to {@value #MAX_BATCHED_WRITE_OPERATIONS_DEFAULT}.
 324  
      * 
 325  
      * @return The maximum number of write operations allowed in a single write
 326  
      *         command.
 327  
      */
 328  
     public int getMaxBatchedWriteOperations() {
 329  105
         return myMaxBatchedWriteOperations;
 330  
     }
 331  
 
 332  
     /**
 333  
      * Returns the maximum BSON object size the server will accept. Defaults to
 334  
      * {@link Client#MAX_DOCUMENT_SIZE}.
 335  
      * 
 336  
      * @return The maximum BSON object size the server will accept.
 337  
      */
 338  
     public int getMaxBsonObjectSize() {
 339  438
         return myMaxBsonObjectSize;
 340  
     }
 341  
 
 342  
     /**
 343  
      * Returns the number of messages sent to the server.
 344  
      * 
 345  
      * @return The number of messages sent to the server.
 346  
      */
 347  
     public long getMessagesSent() {
 348  3
         return myMessagesSent.get();
 349  
     }
 350  
 
 351  
     /**
 352  
      * Returns the number of messages received from the server.
 353  
      * 
 354  
      * @return The number of messages received from the server.
 355  
      */
 356  
     public long getRepliesReceived() {
 357  3
         return myRepliesReceived.get();
 358  
     }
 359  
 
 360  
     /**
 361  
      * Returns the last reported seconds behind the primary.
 362  
      * 
 363  
      * @return The seconds behind the primary server.
 364  
      */
 365  
     public double getSecondsBehind() {
 366  85
         return mySecondsBehind;
 367  
     }
 368  
 
 369  
     /**
 370  
      * Returns the state value.
 371  
      * 
 372  
      * @return The state value.
 373  
      */
 374  
     public State getState() {
 375  12
         return myState;
 376  
     }
 377  
 
 378  
     /**
 379  
      * Returns the tags for the server.
 380  
      * 
 381  
      * @return The tags for the server.
 382  
      */
 383  
     public Document getTags() {
 384  148
         return myTags;
 385  
     }
 386  
 
 387  
     /**
 388  
      * Returns the total amount of time messages waited for a reply from the
 389  
      * server in nanoseconds. The average latency is approximately
 390  
      * {@link #getTotalLatencyNanoSeconds()}/{@link #getRepliesReceived()}.
 391  
      * 
 392  
      * @return The total amount of time messages waited for a reply from the
 393  
      *         server in nanoseconds.
 394  
      */
 395  
     public long getTotalLatencyNanoSeconds() {
 396  1
         return myTotalLatency.get();
 397  
     }
 398  
 
 399  
     /**
 400  
      * Returns the version of the server.
 401  
      * 
 402  
      * @return The version of the server.
 403  
      */
 404  
     public Version getVersion() {
 405  546
         return myVersion;
 406  
     }
 407  
 
 408  
     /**
 409  
      * {@inheritDoc}
 410  
      * <p>
 411  
      * Overridden to return a stable hash for the server. This is based only on
 412  
      * the server object's {@link System#identityHashCode(Object) identity hash
 413  
      * code}. The {@link Cluster} class will de-duplicate once the canonical
 414  
      * host names are determined.
 415  
      * </p>
 416  
      */
 417  
     @Override
 418  
     public int hashCode() {
 419  520
         return System.identityHashCode(this);
 420  
     }
 421  
 
 422  
     /**
 423  
      * Increments the number of messages sent to the server.
 424  
      */
 425  
     public void incrementMessagesSent() {
 426  326
         myMessagesSent.incrementAndGet();
 427  326
     }
 428  
 
 429  
     /**
 430  
      * Increments the number of messages received from the server.
 431  
      */
 432  
     public void incrementRepliesReceived() {
 433  220
         myRepliesReceived.incrementAndGet();
 434  220
     }
 435  
 
 436  
     /**
 437  
      * Returns true if the server can be written to, false otherwise.
 438  
      * <p>
 439  
      * If writable it might be a standalone server, the primary in a replica
 440  
      * set, or a mongos in a sharded configuration. If not writable it is a
 441  
      * secondary server in a replica set.
 442  
      * </p>
 443  
      * 
 444  
      * @return True if the server can be written to, false otherwise.
 445  
      */
 446  
     public boolean isWritable() {
 447  12
         return (myState == State.WRITABLE);
 448  
     }
 449  
 
 450  
     /**
 451  
      * Returns true if there has not been a recent update to the server's
 452  
      * version or maximum document size.
 453  
      * 
 454  
      * @return True if there has not been a recent update to the server's
 455  
      *         version or maximum document size.
 456  
      */
 457  
     public boolean needBuildInfo() {
 458  172
         final long now = System.currentTimeMillis();
 459  172
         final long tenMinutesAgo = now - TimeUnit.MINUTES.toMillis(10);
 460  
 
 461  172
         return Version.UNKNOWN.equals(myVersion)
 462  
                 || (myLastVersionUpdate < tenMinutesAgo);
 463  
     }
 464  
 
 465  
     /**
 466  
      * Remove a PropertyChangeListener to stop receiving future property changes
 467  
      * for the {@link Server}.
 468  
      * 
 469  
      * @param listener
 470  
      *            The PropertyChangeListener to be removed
 471  
      * 
 472  
      * @see PropertyChangeSupport#removePropertyChangeListener(PropertyChangeListener)
 473  
      */
 474  
     public void removeListener(final PropertyChangeListener listener) {
 475  4
         myEventSupport.removePropertyChangeListener(listener);
 476  4
     }
 477  
 
 478  
     /**
 479  
      * Notification that a status request message on the connection failed.
 480  
      * <p>
 481  
      * In the case of an exception the seconds behind is set to
 482  
      * {@link Integer#MAX_VALUE}. The value is configurable as a long so in
 483  
      * theory a user can ignore this case using a large
 484  
      * {@link com.allanbank.mongodb.MongoClientConfiguration#setMaxSecondaryLag(long)}
 485  
      * .
 486  
      * </p>
 487  
      */
 488  
     public void requestFailed() {
 489  23
         mySecondsBehind = Integer.MAX_VALUE;
 490  23
     }
 491  
 
 492  
     /**
 493  
      * {@inheritDoc}
 494  
      * <p>
 495  
      * Overridden to to return a human readable version of the server state.
 496  
      * </p>
 497  
      */
 498  
     @Override
 499  
     public String toString() {
 500  16
         final StringBuilder builder = new StringBuilder();
 501  
 
 502  16
         builder.append(getCanonicalName());
 503  16
         builder.append("(");
 504  16
         builder.append(myState);
 505  16
         builder.append(",");
 506  16
         if (myTags != null) {
 507  1
             builder.append("T,");
 508  
         }
 509  16
         builder.append(getAverageLatency());
 510  16
         builder.append(")");
 511  
 
 512  16
         return builder.toString();
 513  
     }
 514  
 
 515  
     /**
 516  
      * Updates the state of the server based on the document provided. The
 517  
      * document should be the reply to either a {@code ismaster} or
 518  
      * {@code replSetGetStatus} command.
 519  
      * 
 520  
      * @param document
 521  
      *            The document with the state of the server.
 522  
      */
 523  
     public void update(final Document document) {
 524  432
         updateState(document);
 525  431
         updateSecondsBehind(document);
 526  431
         updateTags(document);
 527  431
         updateName(document);
 528  431
         updateVersion(document);
 529  431
         updateMaxBsonObjectSize(document);
 530  432
         updateMaxWriteOperations(document);
 531  432
     }
 532  
 
 533  
     /**
 534  
      * Updates the average latency (in nano-seconds) for the server.
 535  
      * 
 536  
      * @param latencyNanoSeconds
 537  
      *            The latency seen sending a request and receiving a reply from
 538  
      *            the server.
 539  
      */
 540  
     public void updateAverageLatency(final long latencyNanoSeconds) {
 541  2304
         myTotalLatency.addAndGet(latencyNanoSeconds);
 542  
 
 543  2304
         final double latency = latencyNanoSeconds / NANOS_PER_MILLI;
 544  2304
         final double oldAverage = myAverageLatency;
 545  2304
         if (Double.MAX_VALUE == oldAverage) {
 546  1139
             myAverageLatency = latency;
 547  1139
             if (mySecondsBehind == Double.MAX_VALUE) {
 548  1139
                 mySecondsBehind = 0.0;
 549  
             }
 550  
         }
 551  
         else {
 552  1165
             myAverageLatency = (DECAY_ALPHA * latency)
 553  
                     + ((1.0D - DECAY_ALPHA) * oldAverage);
 554  
         }
 555  2304
     }
 556  
 
 557  
     /**
 558  
      * Extract any {@code maxBsonObjectSize} from the reply.
 559  
      * 
 560  
      * @param isMasterReply
 561  
      *            The reply to the {@code ismaster} command.
 562  
      */
 563  
     private void updateMaxBsonObjectSize(final Document isMasterReply) {
 564  431
         final int oldValue = myMaxBsonObjectSize;
 565  
 
 566  431
         final NumericElement maxSize = isMasterReply.findFirst(NUMERIC_TYPE,
 567  
                 MAX_BSON_OBJECT_SIZE_PROP);
 568  432
         if (maxSize != null) {
 569  155
             myMaxBsonObjectSize = maxSize.getIntValue();
 570  
         }
 571  
 
 572  432
         myEventSupport.firePropertyChange(MAX_BSON_OBJECT_SIZE_PROP, oldValue,
 573  
                 myMaxBsonObjectSize);
 574  432
     }
 575  
 
 576  
     /**
 577  
      * Extract any {@code maxWriteBatchSize} from the reply.
 578  
      * 
 579  
      * @param isMasterReply
 580  
      *            The reply to the {@code ismaster} command.
 581  
      */
 582  
     private void updateMaxWriteOperations(final Document isMasterReply) {
 583  432
         final int oldValue = myMaxBatchedWriteOperations;
 584  
 
 585  432
         final NumericElement maxSize = isMasterReply.findFirst(NUMERIC_TYPE,
 586  
                 MAX_BATCHED_WRITE_OPERATIONS_PROP);
 587  432
         if (maxSize != null) {
 588  2
             myMaxBatchedWriteOperations = maxSize.getIntValue();
 589  
         }
 590  
 
 591  432
         myEventSupport.firePropertyChange(MAX_BATCHED_WRITE_OPERATIONS_PROP,
 592  
                 oldValue, myMaxBatchedWriteOperations);
 593  432
     }
 594  
 
 595  
     /**
 596  
      * Updates the canonical name for the server based on the response to the
 597  
      * {@code ismaster} command.
 598  
      * 
 599  
      * @param isMasterReply
 600  
      *            The reply to the {@code ismaster} command.
 601  
      */
 602  
     private void updateName(final Document isMasterReply) {
 603  432
         final String oldValue = myCanonicalName;
 604  
 
 605  431
         final Element element = isMasterReply.findFirst("me");
 606  432
         if (element != null) {
 607  15
             final String name = element.getValueAsString();
 608  15
             if ((name != null) && !myCanonicalName.equals(name)) {
 609  5
                 myCanonicalName = name;
 610  
             }
 611  
         }
 612  
 
 613  431
         myEventSupport.firePropertyChange(CANONICAL_NAME_PROP, oldValue,
 614  
                 myCanonicalName);
 615  431
     }
 616  
 
 617  
     /**
 618  
      * Extract the number of seconds this Server is behind the primary by
 619  
      * comparing its latest optime with that of the absolute latest optime.
 620  
      * <p>
 621  
      * To account for idle servers we use the optime for each server and assign
 622  
      * a value of zero to the "latest" optime and then subtract the remaining
 623  
      * servers from that optime.
 624  
      * </p>
 625  
      * <p>
 626  
      * Lastly, the state of the server is also checked and the seconds behind is
 627  
      * set to {@link Double#MAX_VALUE} if not in the primary (
 628  
      * {@value #PRIMARY_STATE}) or secondary ({@value #SECONDARY_STATE}).
 629  
      * </p>
 630  
      * 
 631  
      * @param replicaStateDoc
 632  
      *            The document to extract the seconds behind from.
 633  
      */
 634  
     private void updateSecondsBehind(final Document replicaStateDoc) {
 635  431
         final State oldValue = myState;
 636  
 
 637  431
         final NumericElement state = replicaStateDoc.get(NUMERIC_TYPE,
 638  
                 "myState");
 639  431
         if (state != null) {
 640  1
             final int value = state.getIntValue();
 641  1
             if (value == PRIMARY_STATE) {
 642  0
                 myState = State.WRITABLE;
 643  0
                 mySecondsBehind = 0;
 644  
             }
 645  1
             else if (value == SECONDARY_STATE) {
 646  1
                 myState = State.READ_ONLY;
 647  
 
 648  1
                 TimestampElement serverTimestamp = null;
 649  1
                 final StringElement expectedName = new StringElement("name",
 650  
                         myCanonicalName);
 651  1
                 for (final DocumentElement member : replicaStateDoc.find(
 652  
                         DOCUMENT_TYPE, "members", ".*")) {
 653  3
                     if (expectedName.equals(member.get("name"))
 654  
                             && (member.get(TIMESTAMP_TYPE, "optimeDate") != null)) {
 655  
 
 656  1
                         serverTimestamp = member.get(TIMESTAMP_TYPE,
 657  
                                 "optimeDate");
 658  
                     }
 659  3
                 }
 660  
 
 661  1
                 if (serverTimestamp != null) {
 662  1
                     TimestampElement latestTimestamp = serverTimestamp;
 663  1
                     for (final TimestampElement time : replicaStateDoc.find(
 664  
                             TIMESTAMP_TYPE, "members", ".*", "optimeDate")) {
 665  3
                         if (latestTimestamp.getTime() < time.getTime()) {
 666  1
                             latestTimestamp = time;
 667  
                         }
 668  3
                     }
 669  
 
 670  1
                     final double msBehind = latestTimestamp.getTime()
 671  
                             - serverTimestamp.getTime();
 672  1
                     mySecondsBehind = (msBehind / TimeUnit.SECONDS.toMillis(1));
 673  
                 }
 674  1
             }
 675  
             else {
 676  
                 // "myState" != 1 and "myState" != 2
 677  0
                 mySecondsBehind = Double.MAX_VALUE;
 678  0
                 myState = State.UNAVAILABLE;
 679  
             }
 680  
         }
 681  
 
 682  431
         myEventSupport.firePropertyChange(STATE_PROP, oldValue, myState);
 683  431
     }
 684  
 
 685  
     /**
 686  
      * Extract the if the result implies that the server is writable.
 687  
      * 
 688  
      * @param isMasterReply
 689  
      *            The document to extract the seconds behind from.
 690  
      */
 691  
     private void updateState(final Document isMasterReply) {
 692  432
         final State oldValue = myState;
 693  
 
 694  432
         BooleanElement element = isMasterReply.findFirst(BooleanElement.class,
 695  
                 "ismaster");
 696  432
         if (element != null) {
 697  186
             if (element.getValue()) {
 698  108
                 myState = State.WRITABLE;
 699  108
                 mySecondsBehind = 0.0;
 700  
             }
 701  
             else {
 702  78
                 element = isMasterReply.findFirst(BooleanElement.class,
 703  
                         "secondary");
 704  78
                 if ((element != null) && element.getValue()) {
 705  77
                     myState = State.READ_ONLY;
 706  
                     // Check the seconds behind for default values.
 707  
                     // This protects from not being able to get the replica set
 708  
                     // status due to permissions.
 709  77
                     if ((mySecondsBehind == Double.MAX_VALUE)
 710  
                             || (mySecondsBehind == Integer.MAX_VALUE)) {
 711  11
                         mySecondsBehind = 0.0;
 712  
                     }
 713  
                 }
 714  
                 else {
 715  1
                     myState = State.UNAVAILABLE;
 716  
                 }
 717  
             }
 718  
         }
 719  
 
 720  431
         myEventSupport.firePropertyChange(STATE_PROP, oldValue, myState);
 721  431
     }
 722  
 
 723  
     /**
 724  
      * Extract any tags from the reply.
 725  
      * 
 726  
      * @param isMasterReply
 727  
      *            The reply to the {@code ismaster} command.
 728  
      */
 729  
     private void updateTags(final Document isMasterReply) {
 730  431
         final Document oldValue = myTags;
 731  
 
 732  432
         Document tags = isMasterReply.findFirst(DOCUMENT_TYPE, TAGS_PROP);
 733  431
         if (tags != null) {
 734  
             // Strip to a pure Document from a DocumentElement.
 735  35
             tags = BuilderFactory.start(tags.asDocument()).build();
 736  35
             if (tags.getElements().isEmpty()) {
 737  2
                 myTags = null;
 738  
             }
 739  33
             else if (!tags.equals(myTags)) {
 740  21
                 myTags = tags;
 741  
             }
 742  
         }
 743  
 
 744  431
         myEventSupport.firePropertyChange(TAGS_PROP, oldValue, myTags);
 745  431
     }
 746  
 
 747  
     /**
 748  
      * Extract any {@code versionArray} from the reply.
 749  
      * 
 750  
      * @param buildInfoReply
 751  
      *            The reply to the {@code buildinfo} command.
 752  
      */
 753  
     private void updateVersion(final Document buildInfoReply) {
 754  431
         final Version oldValue = myVersion;
 755  
 
 756  431
         final List<NumericElement> versionElements = buildInfoReply.find(
 757  
                 NUMERIC_TYPE, "versionArray", ".*");
 758  431
         if (!versionElements.isEmpty()) {
 759  102
             myVersion = Version.parse(versionElements);
 760  102
             myLastVersionUpdate = System.currentTimeMillis();
 761  
         }
 762  
         else {
 763  
             // Use the String version if present.
 764  329
             final StringElement stringVersion = buildInfoReply.findFirst(
 765  
                     STRING_TYPE, "version");
 766  329
             if (stringVersion != null) {
 767  1
                 myVersion = Version.parse(stringVersion.getValue());
 768  1
                 myLastVersionUpdate = System.currentTimeMillis();
 769  
             }
 770  
             else {
 771  
                 // Use the wire version if present.
 772  328
                 final NumericElement wireVersion = buildInfoReply.findFirst(
 773  
                         NUMERIC_TYPE, "maxWireVersion");
 774  328
                 if (wireVersion != null) {
 775  3
                     final Version version = Version.forWireVersion(wireVersion
 776  
                             .getIntValue());
 777  
 
 778  
                     // Don't want to update the version if we are getting the
 779  
                     // value
 780  
                     // some other way since the wire protocol version requires
 781  
                     // interpretation and really just provides a "floor"
 782  
                     // version.
 783  
                     // Check for an unknown or lower version.
 784  3
                     if (oldValue.equals(Version.UNKNOWN)
 785  
                             || (oldValue.compareTo(version) < 0)) {
 786  2
                         myVersion = version;
 787  
                         // Don't update the myLastVersionUpdate time so we still
 788  
                         // try and get the precise version.
 789  
                     }
 790  
                 }
 791  
             }
 792  
         }
 793  
 
 794  431
         myEventSupport.firePropertyChange(VERSION_PROP, oldValue, myVersion);
 795  431
     }
 796  
 
 797  
     /**
 798  
      * State provides the possible states for a server within the MongoDB
 799  
      * cluster.
 800  
      * 
 801  
      * @api.no This class is <b>NOT</b> part of the drivers API. This class may
 802  
      *         be mutated in incompatible ways between any two releases of the
 803  
      *         driver.
 804  
      * @copyright 2013, Allanbank Consulting, Inc., All Rights Reserved
 805  
      */
 806  6
     public enum State {
 807  
         /**
 808  
          * We can send reads to the server. It is running, we can connect to it
 809  
          * and is a secondary in the replica set.
 810  
          */
 811  1
         READ_ONLY,
 812  
 
 813  
         /** We cannot connect to the server. */
 814  1
         UNAVAILABLE,
 815  
 
 816  
         /**
 817  
          * A transient state for the server. We have either never connected to
 818  
          * the server or have lost all of the connections to the server.
 819  
          */
 820  1
         UNKNOWN,
 821  
 
 822  
         /**
 823  
          * We can send writes to the server. It is running, we can connect to it
 824  
          * and is either a stand-alone instance, the primary in the replica set
 825  
          * or a mongos.
 826  
          */
 827  1
         WRITABLE;
 828  
     }
 829  
 }