View Javadoc
1   /*
2    * #%L
3    * Cluster.java - mongodb-async-driver - Allanbank Consulting, Inc.
4    * %%
5    * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6    * %%
7    * Licensed under the Apache License, Version 2.0 (the "License");
8    * you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   * 
11   *      http://www.apache.org/licenses/LICENSE-2.0
12   * 
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   * #L%
19   */
20  package com.allanbank.mongodb.client.state;
21  
22  import java.beans.PropertyChangeEvent;
23  import java.beans.PropertyChangeListener;
24  import java.beans.PropertyChangeSupport;
25  import java.net.InetSocketAddress;
26  import java.util.ArrayList;
27  import java.util.Arrays;
28  import java.util.Collections;
29  import java.util.List;
30  import java.util.concurrent.ConcurrentHashMap;
31  import java.util.concurrent.ConcurrentMap;
32  import java.util.concurrent.CopyOnWriteArrayList;
33  
34  import com.allanbank.mongodb.MongoClientConfiguration;
35  import com.allanbank.mongodb.ReadPreference;
36  import com.allanbank.mongodb.Version;
37  import com.allanbank.mongodb.client.ClusterStats;
38  import com.allanbank.mongodb.client.ClusterType;
39  import com.allanbank.mongodb.client.Message;
40  import com.allanbank.mongodb.client.VersionRange;
41  import com.allanbank.mongodb.util.ServerNameUtils;
42  
43  /**
44   * {@link Cluster} tracks the state of the cluster of MongoDB servers.
45   * PropertyChangeEvents are fired when a server is added or marked writable/not
46   * writable.
47   * <p>
48   * This class uses brute force synchronization to protect its internal state. It
49   * is assumed that multiple connections will be concurrently updating the
50   * {@link Cluster} at once and that at any given time this class may not contain
51   * the absolute truth about the state of the cluster. Instead connections should
52   * keep querying for the state of the cluster via their connection until the
53   * view the server returned and the {@link Cluster} are consistent. Since this
54   * class will not fire a {@link PropertyChangeEvent} when the state is not truly
55   * modified the simplest mechanism is to keep querying for the cluster state on
56   * the connection until no additional change events are seen.
57   * </p>
58   * 
59   * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
60   *         mutated in incompatible ways between any two releases of the driver.
61   * @copyright 2011-2013, Allanbank Consulting, Inc., All Rights Reserved
62   */
63  public class Cluster implements ClusterStats {
64  
65      /** The property sued for adding a new server. */
66      public static final String SERVER_PROP = "server";
67  
68      /** The property name for if there is a writable server. */
69      public static final String WRITABLE_PROP = "writable";
70  
71      /** The configuration for connecting to the servers. */
72      protected final MongoClientConfiguration myConfig;
73  
74      /** The complete list of servers. */
75      protected final ConcurrentMap<String, Server> myServers;
76  
77      /** The range of versions within the cluster. */
78      protected VersionRange myServerVersionRange;
79  
80      /** The smallest maximum number of operations in a batch in the cluster. */
81      protected int mySmallestMaxBatchedWriteOperations;
82  
83      /** The smallest maximum document size in the cluster. */
84      protected long mySmallestMaxBsonObjectSize;
85  
86      /** Support for firing property change events. */
87      /* package */final PropertyChangeSupport myChangeSupport;
88  
89      /** The listener for changes to the server. */
90      /* package */final ServerListener myListener;
91  
92      /** The complete list of non-writable servers. */
93      /* package */final CopyOnWriteArrayList<Server> myNonWritableServers;
94  
95      /** The complete list of writable servers. */
96      /* package */final CopyOnWriteArrayList<Server> myWritableServers;
97  
98      /** The type of the cluster. */
99      private final ClusterType myType;
100 
101     /**
102      * Creates a new CLusterState.
103      * 
104      * @param config
105      *            The configuration for the cluster.
106      * @param type
107      *            The type of the cluster.
108      */
109     public Cluster(final MongoClientConfiguration config, final ClusterType type) {
110         myConfig = config;
111         myType = type;
112         myChangeSupport = new PropertyChangeSupport(this);
113         myServers = new ConcurrentHashMap<String, Server>();
114         myWritableServers = new CopyOnWriteArrayList<Server>();
115         myNonWritableServers = new CopyOnWriteArrayList<Server>();
116         myListener = new ServerListener();
117         myServerVersionRange = VersionRange.range(Version.parse("0"),
118                 Version.parse("0"));
119     }
120 
121     /**
122      * Adds a {@link Server} to the {@link Cluster} for the address provided if
123      * one does not already exist.
124      * 
125      * @param address
126      *            The address of the {@link Server} to return.
127      * @return The {@link Server} for the address.
128      */
129     public Server add(final InetSocketAddress address) {
130         final String normalized = ServerNameUtils.normalize(address);
131         Server server = myServers.get(normalized);
132         if (server == null) {
133 
134             server = new Server(address);
135 
136             synchronized (this) {
137                 final Server existing = myServers.putIfAbsent(normalized,
138                         server);
139                 if (existing != null) {
140                     server = existing;
141                 }
142                 else {
143                     myNonWritableServers.add(server);
144                     myChangeSupport.firePropertyChange(SERVER_PROP, null,
145                             server);
146 
147                     server.addListener(myListener);
148                 }
149             }
150         }
151         return server;
152     }
153 
154     /**
155      * Adds a {@link Server} to the {@link Cluster} for the address provided if
156      * one does not already exist.
157      * <p>
158      * This method is equivalent to calling {@link #add(InetSocketAddress)
159      * add(ServerNameUtils.parse(address))}.
160      * </p>
161      * 
162      * @param address
163      *            The address of the {@link Server} to return.
164      * @return The {@link Server} for the address.
165      */
166     public Server add(final String address) {
167         Server server = myServers.get(address);
168         if (server == null) {
169             server = add(ServerNameUtils.parse(address));
170         }
171 
172         return server;
173     }
174 
175     /**
176      * Adds a listener to the state.
177      * 
178      * @param listener
179      *            The listener for the state changes.
180      */
181     public void addListener(final PropertyChangeListener listener) {
182         synchronized (this) {
183             myChangeSupport.addPropertyChangeListener(listener);
184         }
185     }
186 
187     /**
188      * Removes all of the servers from the cluster.
189      */
190     public void clear() {
191         for (final Server server : myServers.values()) {
192             remove(server);
193         }
194     }
195 
196     /**
197      * Returns the set of servers that can be used based on the provided
198      * {@link ReadPreference}.
199      * 
200      * @param readPreference
201      *            The {@link ReadPreference} to filter the servers.
202      * @return The {@link List} of servers that can be used. Servers will be
203      *         ordered by preference to be used, most preferred to least
204      *         preferred.
205      */
206     public List<Server> findCandidateServers(final ReadPreference readPreference) {
207         List<Server> results = Collections.emptyList();
208 
209         switch (readPreference.getMode()) {
210         case NEAREST:
211             results = findNearestCandidates(readPreference);
212             break;
213         case PRIMARY_ONLY:
214             results = findWritableCandidates(readPreference);
215             break;
216         case PRIMARY_PREFERRED:
217             results = merge(findWritableCandidates(readPreference),
218                     findNonWritableCandidates(readPreference));
219             break;
220         case SECONDARY_ONLY:
221             results = findNonWritableCandidates(readPreference);
222             break;
223         case SECONDARY_PREFERRED:
224             results = merge(findNonWritableCandidates(readPreference),
225                     findWritableCandidates(readPreference));
226             break;
227         case SERVER:
228             results = findCandidateServer(readPreference);
229             break;
230         }
231 
232         return results;
233     }
234 
235     /**
236      * Locates the set of servers that can be used to send the specified
237      * messages.
238      * 
239      * @param message1
240      *            The first message to send.
241      * @param message2
242      *            The second message to send. May be <code>null</code>.
243      * @return The servers that can be used.
244      */
245     public List<Server> findServers(final Message message1,
246             final Message message2) {
247         List<Server> servers = Collections.emptyList();
248 
249         if (message1 != null) {
250             List<Server> potentialServers = findCandidateServers(message1
251                     .getReadPreference());
252             servers = potentialServers;
253 
254             if (message2 != null) {
255                 servers = new ArrayList<Server>(potentialServers);
256                 potentialServers = findCandidateServers(message2
257                         .getReadPreference());
258                 servers.retainAll(potentialServers);
259             }
260         }
261         return servers;
262     }
263 
264     /**
265      * Returns the server state for the address provided. If the {@link Server}
266      * does not already exist a non-writable state is created and returned.
267      * <p>
268      * This method is equivalent to calling {@link #add(String) add(address)}.
269      * </p>
270      * 
271      * @param address
272      *            The address of the {@link Server} to return.
273      * @return The {@link Server} for the address.
274      */
275     public Server get(final String address) {
276         return add(address);
277     }
278 
279     /**
280      * Returns a copy of the list of non-writable servers. The list returned is
281      * a copy of the internal list and can be modified by the caller.
282      * 
283      * @return The complete list of non-writable servers.
284      */
285     public List<Server> getNonWritableServers() {
286         return new ArrayList<Server>(myNonWritableServers);
287     }
288 
289     /**
290      * Returns a copy of the list of servers. The list returned is a copy of the
291      * internal list and can be modified by the caller.
292      * 
293      * @return The complete list of servers.
294      */
295     public List<Server> getServers() {
296         return new ArrayList<Server>(myServers.values());
297     }
298 
299     /**
300      * {@inheritDoc}
301      */
302     @Override
303     public VersionRange getServerVersionRange() {
304         return myServerVersionRange;
305     }
306 
307     /**
308      * Returns smallest value for the maximum number of write operations allowed
309      * in a single write command.
310      * 
311      * @return The smallest value for maximum number of write operations allowed
312      *         in a single write command.
313      */
314     @Override
315     public int getSmallestMaxBatchedWriteOperations() {
316         return mySmallestMaxBatchedWriteOperations;
317     }
318 
319     /**
320      * Returns the smallest value for the maximum BSON object size within the
321      * cluster.
322      * 
323      * @return The smallest value for the maximum BSON object size within the
324      *         cluster.
325      */
326     @Override
327     public long getSmallestMaxBsonObjectSize() {
328         return mySmallestMaxBsonObjectSize;
329     }
330 
331     /**
332      * Returns the type of cluster.
333      * 
334      * @return The type of cluster.
335      */
336     public ClusterType getType() {
337         return myType;
338     }
339 
340     /**
341      * Returns a copy of the list of writable servers. The list returned is a
342      * copy of the internal list and can be modified by the caller.
343      * 
344      * @return The complete list of writable servers.
345      */
346     public List<Server> getWritableServers() {
347         return new ArrayList<Server>(myWritableServers);
348     }
349 
350     /**
351      * Removes the specified server from the cluster.
352      * 
353      * @param server
354      *            The server to remove from the cluster.
355      */
356     public void remove(final Server server) {
357 
358         final Server removed = myServers.remove(server.getCanonicalName());
359         if (removed != null) {
360             removed.removeListener(myListener);
361             myNonWritableServers.remove(removed);
362             myWritableServers.remove(removed);
363 
364             updateVersions();
365         }
366     }
367 
368     /**
369      * Removes a listener to the state.
370      * 
371      * @param listener
372      *            The listener for the state changes.
373      */
374     public void removeListener(final PropertyChangeListener listener) {
375         synchronized (this) {
376             myChangeSupport.removePropertyChangeListener(listener);
377         }
378     }
379 
380     /**
381      * Computes a relative CDF (cumulative distribution function) for the
382      * servers based on the latency from the client.
383      * <p>
384      * The latency of each server is used to create a strict ordering of servers
385      * from lowest latency to highest. The relative latency of the i'th server
386      * is then calculated based on the function:
387      * </p>
388      * <blockquote>
389      * 
390      * <pre>
391      *                                       latency[0]
392      *                relative_latency[i] =  ----------
393      *                                       latency[i]
394      * </pre>
395      * 
396      * </blockquote>
397      * <p>
398      * The relative latencies are then then summed and the probability of
399      * selecting each server is then calculated by:
400      * </p>
401      * <blockquote>
402      * 
403      * <pre>
404      *                                  relative_latency[i]
405      *     probability[i] = -------------------------------------------------
406      *                      sum(relative_latency[0], ... relative_latency[n])
407      * </pre>
408      * 
409      * </blockquote>
410      * 
411      * <p>
412      * The CDF over these probabilities is returned.
413      * </p>
414      * 
415      * @param servers
416      *            The servers to compute the CDF for.
417      * @return The CDF for the server latencies.
418      */
419     protected final double[] cdf(final List<Server> servers) {
420         Collections.sort(servers, ServerLatencyComparator.COMPARATOR);
421 
422         // Pick a server to move to the front.
423         final double[] relativeLatency = new double[servers.size()];
424         double sum = 0;
425         double first = Double.NEGATIVE_INFINITY;
426         for (int i = 0; i < relativeLatency.length; ++i) {
427             final Server server = servers.get(i);
428             double latency = server.getAverageLatency();
429 
430             // Turn the latency into a ratio of the lowest latency.
431             if (first == Double.NEGATIVE_INFINITY) {
432                 first = latency;
433                 latency = 1.0D; // By definition N/N = 1.0.
434             }
435             else {
436                 latency /= first;
437             }
438 
439             latency = (1.0D / latency); // 4 times as long is 1/4 as likely.
440             relativeLatency[i] = latency;
441             sum += latency;
442         }
443 
444         // Turn the latencies into a range of 0 <= relativeLatency < 1.
445         // Also known as the CDF (cumulative distribution function)
446         double accum = 0.0D;
447         for (int i = 0; i < relativeLatency.length; ++i) {
448             accum += relativeLatency[i];
449 
450             relativeLatency[i] = accum / sum;
451         }
452 
453         return relativeLatency;
454     }
455 
456     /**
457      * Finds the candidate server, if known.
458      * 
459      * @param readPreference
460      *            The read preference to match the server against.
461      * @return The Server found in a singleton list or an empty list if the
462      *         server is not known.
463      */
464     protected List<Server> findCandidateServer(
465             final ReadPreference readPreference) {
466         final Server server = myServers.get(readPreference.getServer());
467         if ((server != null) && readPreference.matches(server.getTags())) {
468             return Collections.singletonList(server);
469         }
470         return Collections.emptyList();
471     }
472 
473     /**
474      * Returns the list of servers that match the read preference's tags.
475      * 
476      * @param readPreference
477      *            The read preference to match the server against.
478      * @return The servers found in order of preference. Generally this is in
479      *         latency order but we randomly move one of the servers to the
480      *         front of the list to distribute the load across more servers.
481      * 
482      * @see #sort
483      */
484     protected List<Server> findNearestCandidates(
485             final ReadPreference readPreference) {
486         final List<Server> results = new ArrayList<Server>(myServers.size());
487         for (final Server server : myServers.values()) {
488             if (readPreference.matches(server.getTags())) {
489                 results.add(server);
490             }
491         }
492 
493         // Sort the server by preference.
494         sort(results);
495 
496         return results;
497     }
498 
499     /**
500      * Returns the list of non-writable servers that match the read preference's
501      * tags.
502      * 
503      * @param readPreference
504      *            The read preference to match the server against.
505      * @return The servers found in order of preference. Generally this is in
506      *         latency order but we randomly move one of the servers to the
507      *         front of the list to distribute the load across more servers.
508      * 
509      * @see #sort
510      */
511     protected List<Server> findNonWritableCandidates(
512             final ReadPreference readPreference) {
513         final List<Server> results = new ArrayList<Server>(
514                 myNonWritableServers.size());
515         for (final Server server : myNonWritableServers) {
516             if (readPreference.matches(server.getTags())
517                     && isRecentEnough(server.getSecondsBehind())) {
518                 results.add(server);
519             }
520         }
521 
522         // Sort the server by preference.
523         sort(results);
524 
525         return results;
526     }
527 
528     /**
529      * Returns the list of writable servers that match the read preference's
530      * tags.
531      * 
532      * @param readPreference
533      *            The read preference to match the server against.
534      * @return The servers found in order of preference. Generally this is in
535      *         latency order but we randomly move one of the servers to the
536      *         front of the list to distribute the load across more servers.
537      * 
538      * @see #sort
539      */
540     protected List<Server> findWritableCandidates(
541             final ReadPreference readPreference) {
542         final List<Server> results = new ArrayList<Server>(
543                 myWritableServers.size());
544         for (final Server server : myWritableServers) {
545             if (readPreference.matches(server.getTags())) {
546                 results.add(server);
547             }
548         }
549 
550         // Sort the server by preference.
551         sort(results);
552 
553         return results;
554     }
555 
556     /**
557      * Sorts the servers based on the latency from the client.
558      * <p>
559      * To distribute the requests across servers more evenly the first server is
560      * replaced with a random server based on a single sided simplified Gaussian
561      * distribution.
562      * </p>
563      * 
564      * @param servers
565      *            The servers to be sorted.
566      * 
567      * @see #cdf(List)
568      */
569     protected final void sort(final List<Server> servers) {
570         if (servers.isEmpty() || (servers.size() == 1)) {
571             return;
572         }
573 
574         // Pick a server to move to the front.
575         final double[] cdf = cdf(servers);
576         final double random = Math.random();
577         int index = Arrays.binarySearch(cdf, random);
578 
579         // Probably a negative index since not expecting an exact match.
580         if (index < 0) {
581             // Undo (-(insertion point) - 1)
582             index = Math.abs(index + 1);
583         }
584 
585         // Should not be needed. random should be < 1.0 and
586         // relativeLatency[relativeLatency.length] == 1.0
587         //
588         // assert (random < 1.0D) :
589         // "The random value should be strictly less than 1.0.";
590         // assert (cdf[cdf.length - 1] <= 1.0001) :
591         // "The cdf of the last server should be 1.0.";
592         // assert (0.9999 <= cdf[cdf.length - 1]) :
593         // "The cdf of the last server should be 1.0.";
594         index = Math.min(cdf.length - 1, index);
595 
596         // Swap the lucky winner into the first position.
597         Collections.swap(servers, 0, index);
598     }
599 
600     /**
601      * Updates the min/max versions across all servers. Since the max BSON
602      * object size is tied to the version we also update that value.
603      */
604     protected void updateVersions() {
605         Version min = null;
606         Version max = null;
607 
608         long smallestMaxBsonObjectSize = Long.MAX_VALUE;
609         int smallestMaxBatchedWriteOperations = Integer.MAX_VALUE;
610 
611         for (final Server server : myServers.values()) {
612             min = Version.earlier(min, server.getVersion());
613             max = Version.later(max, server.getVersion());
614 
615             smallestMaxBsonObjectSize = Math.min(smallestMaxBsonObjectSize,
616                     server.getMaxBsonObjectSize());
617             smallestMaxBatchedWriteOperations = Math.min(
618                     smallestMaxBatchedWriteOperations,
619                     server.getMaxBatchedWriteOperations());
620         }
621 
622         myServerVersionRange = VersionRange.range(min, max);
623         mySmallestMaxBsonObjectSize = smallestMaxBsonObjectSize;
624         mySmallestMaxBatchedWriteOperations = smallestMaxBatchedWriteOperations;
625     }
626 
627     /**
628      * Returns true if the server is recent enough to be queried.
629      * 
630      * @param secondsBehind
631      *            The number of seconds the server is behind.
632      * @return True if the server is recent enough to be queried, false
633      *         otherwise.
634      */
635     private boolean isRecentEnough(final double secondsBehind) {
636         return ((secondsBehind * 1000) < myConfig.getMaxSecondaryLag());
637     }
638 
639     /**
640      * Merges the two lists into a single list.
641      * 
642      * @param list1
643      *            The first list of servers.
644      * @param list2
645      *            The second list of servers.
646      * @return The 2 lists of servers merged into a single list.
647      */
648     private final List<Server> merge(final List<Server> list1,
649             final List<Server> list2) {
650         List<Server> results;
651         if (list1.isEmpty()) {
652             results = list2;
653         }
654         else if (list2.isEmpty()) {
655             results = list1;
656         }
657         else {
658             results = new ArrayList<Server>(list1.size() + list2.size());
659             results.addAll(list1);
660             results.addAll(list2);
661         }
662         return results;
663     }
664 
665     /**
666      * ServerListener provides a listener for the state updates of the
667      * {@link Server}.
668      * 
669      * @api.no This class is <b>NOT</b> part of the drivers API. This class may
670      *         be mutated in incompatible ways between any two releases of the
671      *         driver.
672      * @copyright 2013, Allanbank Consulting, Inc., All Rights Reserved
673      */
674     protected final class ServerListener implements PropertyChangeListener {
675         @Override
676         public void propertyChange(final PropertyChangeEvent evt) {
677             final String propertyName = evt.getPropertyName();
678             final Server server = (Server) evt.getSource();
679 
680             if (Server.STATE_PROP.equals(propertyName)) {
681 
682                 final boolean old = !myWritableServers.isEmpty();
683 
684                 if (Server.State.WRITABLE == evt.getNewValue()) {
685                     myWritableServers.addIfAbsent(server);
686                     myNonWritableServers.remove(server);
687                 }
688                 else if (Server.State.READ_ONLY == evt.getNewValue()) {
689                     myWritableServers.remove(server);
690                     myNonWritableServers.addIfAbsent(server);
691                 }
692                 else {
693                     myWritableServers.remove(server);
694                     myNonWritableServers.remove(server);
695                 }
696 
697                 myChangeSupport.firePropertyChange(WRITABLE_PROP, old,
698                         !myWritableServers.isEmpty());
699 
700             }
701             else if (Server.CANONICAL_NAME_PROP.equals(propertyName)) {
702                 // Resolved a new canonical name. e.g., What the server
703                 // calls itself in the cluster.
704 
705                 // Remove the entry with the old name.
706                 myServers.remove(evt.getOldValue(), server);
707 
708                 // And add with the new name. Checking for duplicate entries.
709                 final Server existing = myServers.putIfAbsent(
710                         server.getCanonicalName(), server);
711                 if (existing != null) {
712                     // Already have a Server with that name. Remove the listener
713                     // and let this server get garbage collected.
714                     myNonWritableServers.remove(server);
715                     myWritableServers.remove(server);
716                     server.removeListener(myListener);
717 
718                     myChangeSupport.firePropertyChange(SERVER_PROP, server,
719                             null);
720                 }
721             }
722             else if (Server.VERSION_PROP.equals(propertyName)) {
723                 // If the old version is either the high or low for the cluster
724                 // (or the version is UNKNOWN) then recompute the high/low
725                 // versions.
726                 final Version old = (Version) evt.getOldValue();
727 
728                 if (Version.UNKNOWN.equals(old)
729                         || (myServerVersionRange.getUpperBounds()
730                                 .compareTo(old) <= 0)
731                         || (myServerVersionRange.getLowerBounds()
732                                 .compareTo(old) >= 0)) {
733                     updateVersions();
734                 }
735             }
736         }
737     }
738 }