Coverage Report - com.allanbank.mongodb.client.AbstractMongoOperations
 
Classes in this File Line Coverage Branch Coverage Complexity
AbstractMongoOperations
100%
297/297
99%
150/151
3.294
AbstractMongoOperations$1
100%
1/1
N/A
3.294
 
 1  
 /*
 2  
  * #%L
 3  
  * AbstractMongoOperations.java - mongodb-async-driver - Allanbank Consulting, Inc.
 4  
  * %%
 5  
  * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
 6  
  * %%
 7  
  * Licensed under the Apache License, Version 2.0 (the "License");
 8  
  * you may not use this file except in compliance with the License.
 9  
  * You may obtain a copy of the License at
 10  
  * 
 11  
  *      http://www.apache.org/licenses/LICENSE-2.0
 12  
  * 
 13  
  * Unless required by applicable law or agreed to in writing, software
 14  
  * distributed under the License is distributed on an "AS IS" BASIS,
 15  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 16  
  * See the License for the specific language governing permissions and
 17  
  * limitations under the License.
 18  
  * #L%
 19  
  */
 20  
 package com.allanbank.mongodb.client;
 21  
 
 22  
 import java.util.ArrayList;
 23  
 import java.util.Collection;
 24  
 import java.util.List;
 25  
 
 26  
 import com.allanbank.mongodb.AsyncMongoCollection;
 27  
 import com.allanbank.mongodb.Callback;
 28  
 import com.allanbank.mongodb.Durability;
 29  
 import com.allanbank.mongodb.MongoCursorControl;
 30  
 import com.allanbank.mongodb.MongoDatabase;
 31  
 import com.allanbank.mongodb.MongoDbException;
 32  
 import com.allanbank.mongodb.MongoIterator;
 33  
 import com.allanbank.mongodb.ReadPreference;
 34  
 import com.allanbank.mongodb.StreamCallback;
 35  
 import com.allanbank.mongodb.Version;
 36  
 import com.allanbank.mongodb.bson.Document;
 37  
 import com.allanbank.mongodb.bson.DocumentAssignable;
 38  
 import com.allanbank.mongodb.bson.Element;
 39  
 import com.allanbank.mongodb.bson.builder.ArrayBuilder;
 40  
 import com.allanbank.mongodb.bson.builder.BuilderFactory;
 41  
 import com.allanbank.mongodb.bson.builder.DocumentBuilder;
 42  
 import com.allanbank.mongodb.bson.impl.EmptyDocument;
 43  
 import com.allanbank.mongodb.bson.impl.ImmutableDocument;
 44  
 import com.allanbank.mongodb.bson.impl.RootDocument;
 45  
 import com.allanbank.mongodb.builder.Aggregate;
 46  
 import com.allanbank.mongodb.builder.BatchedWrite;
 47  
 import com.allanbank.mongodb.builder.ConditionBuilder;
 48  
 import com.allanbank.mongodb.builder.Count;
 49  
 import com.allanbank.mongodb.builder.Distinct;
 50  
 import com.allanbank.mongodb.builder.Find;
 51  
 import com.allanbank.mongodb.builder.FindAndModify;
 52  
 import com.allanbank.mongodb.builder.GroupBy;
 53  
 import com.allanbank.mongodb.builder.MapReduce;
 54  
 import com.allanbank.mongodb.builder.ParallelScan;
 55  
 import com.allanbank.mongodb.builder.write.WriteOperation;
 56  
 import com.allanbank.mongodb.client.callback.BatchedNativeWriteCallback;
 57  
 import com.allanbank.mongodb.client.callback.BatchedWriteCallback;
 58  
 import com.allanbank.mongodb.client.callback.CursorCallback;
 59  
 import com.allanbank.mongodb.client.callback.CursorStreamingCallback;
 60  
 import com.allanbank.mongodb.client.callback.LongToIntCallback;
 61  
 import com.allanbank.mongodb.client.callback.MultipleCursorCallback;
 62  
 import com.allanbank.mongodb.client.callback.ReplyArrayCallback;
 63  
 import com.allanbank.mongodb.client.callback.ReplyDocumentCallback;
 64  
 import com.allanbank.mongodb.client.callback.ReplyIntegerCallback;
 65  
 import com.allanbank.mongodb.client.callback.ReplyLongCallback;
 66  
 import com.allanbank.mongodb.client.callback.ReplyResultCallback;
 67  
 import com.allanbank.mongodb.client.callback.SingleDocumentCallback;
 68  
 import com.allanbank.mongodb.client.message.AggregateCommand;
 69  
 import com.allanbank.mongodb.client.message.Command;
 70  
 import com.allanbank.mongodb.client.message.Delete;
 71  
 import com.allanbank.mongodb.client.message.GetLastError;
 72  
 import com.allanbank.mongodb.client.message.Insert;
 73  
 import com.allanbank.mongodb.client.message.ParallelScanCommand;
 74  
 import com.allanbank.mongodb.client.message.Query;
 75  
 import com.allanbank.mongodb.client.message.Update;
 76  
 
 77  
 /**
 78  
  * AbstractMongoOperations provides the core functionality for the operations on
 79  
  * a MongoDB collection.
 80  
  * 
 81  
  * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
 82  
  *         mutated in incompatible ways between any two releases of the driver.
 83  
  * @copyright 2014, Allanbank Consulting, Inc., All Rights Reserved
 84  
  */
 85  
 public abstract class AbstractMongoOperations {
 86  
 
 87  
     /**
 88  
      * The default for if a delete should only delete the first document it
 89  
      * matches.
 90  
      */
 91  
     public static final boolean DELETE_SINGLE_DELETE_DEFAULT = false;
 92  
 
 93  
     /** The default empty index options. */
 94  1
     public static final Document EMPTY_INDEX_OPTIONS = EmptyDocument.INSTANCE;
 95  
 
 96  
     /** The name of the canonical id field for MongoDB. */
 97  
     public static final String ID_FIELD_NAME = "_id";
 98  
 
 99  
     /** The default for if an insert should continue on an error. */
 100  
     public static final boolean INSERT_CONTINUE_ON_ERROR_DEFAULT = false;
 101  
 
 102  
     /** The default for a UNIQUE index options. */
 103  1
     public static final Document UNIQUE_INDEX_OPTIONS = new ImmutableDocument(
 104  
             BuilderFactory.start().add("unique", true));
 105  
 
 106  
     /** The default for doing a multiple-update on an update. */
 107  
     public static final boolean UPDATE_MULTIUPDATE_DEFAULT = false;
 108  
 
 109  
     /** The default for doing an upsert on an update. */
 110  
     public static final boolean UPDATE_UPSERT_DEFAULT = false;
 111  
 
 112  
     /** The client for interacting with MongoDB. */
 113  
     protected final Client myClient;
 114  
 
 115  
     /** The name of the database we interact with. */
 116  
     protected final MongoDatabase myDatabase;
 117  
 
 118  
     /** The name of the collection we interact with. */
 119  
     protected final String myName;
 120  
 
 121  
     /** The {@link Durability} for writes from this database instance. */
 122  
     private Durability myDurability;
 123  
 
 124  
     /** The {@link ReadPreference} for reads from this database instance. */
 125  
     private ReadPreference myReadPreference;
 126  
 
 127  
     /**
 128  
      * Create a new AbstractAsyncMongoCollection.
 129  
      * 
 130  
      * @param client
 131  
      *            The client for interacting with MongoDB.
 132  
      * @param database
 133  
      *            The database we interact with.
 134  
      * @param name
 135  
      *            The name of the collection we interact with.
 136  
      */
 137  
     public AbstractMongoOperations(final Client client,
 138  
             final MongoDatabase database, final String name) {
 139  275
         super();
 140  
 
 141  275
         myClient = client;
 142  275
         myDatabase = database;
 143  275
         myName = name;
 144  275
         myDurability = null;
 145  275
         myReadPreference = null;
 146  275
     }
 147  
 
 148  
     /**
 149  
      * Constructs a {@code aggregate} command and sends it to the server via the
 150  
      * {@link Client}.
 151  
      * 
 152  
      * @param results
 153  
      *            Callback for the aggregation results returned.
 154  
      * @param command
 155  
      *            The details of the aggregation request.
 156  
      * @throws MongoDbException
 157  
      *             On an error executing the aggregate command.
 158  
      * @see AsyncMongoCollection#aggregateAsync(Callback, Aggregate)
 159  
      */
 160  
     public void aggregateAsync(final Callback<MongoIterator<Document>> results,
 161  
             final Aggregate command) throws MongoDbException {
 162  
 
 163  10
         final AggregateCommand commandMsg = toCommand(command, false);
 164  
 
 165  10
         final CursorCallback callback = new CursorCallback(myClient,
 166  
                 commandMsg, true, results);
 167  
 
 168  10
         myClient.send(commandMsg, callback);
 169  10
     }
 170  
 
 171  
     /**
 172  
      * Constructs a {@code count} command and sends it to the server via the
 173  
      * {@link Client}.
 174  
      * 
 175  
      * @param results
 176  
      *            The callback to notify of the results.
 177  
      * @param count
 178  
      *            The count command.
 179  
      * @throws MongoDbException
 180  
      *             On an error counting the documents.
 181  
      * @see AsyncMongoCollection#countAsync(Callback, Count)
 182  
      */
 183  
     public void countAsync(final Callback<Long> results, final Count count)
 184  
             throws MongoDbException {
 185  24
         Version minVersion = null;
 186  24
         final DocumentBuilder builder = BuilderFactory.start();
 187  
 
 188  24
         builder.addString("count", getName());
 189  24
         builder.addDocument("query", count.getQuery());
 190  24
         if (count.getMaximumTimeMilliseconds() > 0) {
 191  1
             minVersion = Count.MAX_TIMEOUT_VERSION;
 192  1
             builder.add("maxTimeMS", count.getMaximumTimeMilliseconds());
 193  
         }
 194  
 
 195  
         // Should be last since might wrap command in a $query element.
 196  24
         final ReadPreference finalPreference = updateReadPreference(builder,
 197  
                 count.getReadPreference(), true);
 198  
 
 199  24
         final Command commandMsg = new Command(getDatabaseName(), getName(),
 200  
                 builder.build(), count.getQuery(), finalPreference,
 201  
                 VersionRange.minimum(minVersion));
 202  
 
 203  24
         myClient.send(commandMsg, new ReplyLongCallback(results));
 204  24
     }
 205  
 
 206  
     /**
 207  
      * Constructs a {@link Delete} message and sends it to the server via the
 208  
      * {@link Client}.
 209  
      * 
 210  
      * @param results
 211  
      *            Callback that will be notified of the results of the query. If
 212  
      *            the durability of the operation is NONE then this will be -1.
 213  
      * @param query
 214  
      *            Query to locate the documents to be deleted.
 215  
      * @param singleDelete
 216  
      *            If true then only a single document will be deleted. If
 217  
      *            running in a sharded environment then this field must be false
 218  
      *            or the query must contain the shard key.
 219  
      * @param durability
 220  
      *            The durability for the delete.
 221  
      * @throws MongoDbException
 222  
      *             On an error deleting the documents.
 223  
      * @see AsyncMongoCollection#deleteAsync(Callback, DocumentAssignable,
 224  
      *      boolean, Durability)
 225  
      */
 226  
     public void deleteAsync(final Callback<Long> results,
 227  
             final DocumentAssignable query, final boolean singleDelete,
 228  
             final Durability durability) throws MongoDbException {
 229  
 
 230  33
         if ((durability != Durability.NONE) && useWriteCommand()
 231  
                 && isWriteCommandsSupported(null)) {
 232  
 
 233  1
             final BatchedWrite write = BatchedWrite.delete(query, singleDelete,
 234  
                     durability);
 235  
 
 236  1
             writeAsync(results, write);
 237  1
         }
 238  
         else {
 239  32
             final Delete deleteMessage = new Delete(getDatabaseName(), myName,
 240  
                     query.asDocument(), singleDelete);
 241  
 
 242  32
             if (Durability.NONE.equals(durability)) {
 243  6
                 myClient.send(deleteMessage, null);
 244  6
                 results.callback(Long.valueOf(-1));
 245  
             }
 246  
             else {
 247  26
                 myClient.send(deleteMessage, asGetLastError(durability),
 248  
                         new ReplyLongCallback(results));
 249  
             }
 250  
         }
 251  33
     }
 252  
 
 253  
     /**
 254  
      * Constructs a {@code distinct} command and sends it to the server via the
 255  
      * {@link Client}.
 256  
      * 
 257  
      * @param results
 258  
      *            Callback for the distinct results returned.
 259  
      * @param command
 260  
      *            The details of the distinct request.
 261  
      * @throws MongoDbException
 262  
      *             On an error finding the documents.
 263  
      * @see AsyncMongoCollection#distinctAsync(Callback, Distinct)
 264  
      */
 265  
     public void distinctAsync(final Callback<MongoIterator<Element>> results,
 266  
             final Distinct command) throws MongoDbException {
 267  
 
 268  9
         Version minVersion = null;
 269  
 
 270  9
         final DocumentBuilder builder = BuilderFactory.start();
 271  
 
 272  9
         builder.addString("distinct", getName());
 273  9
         builder.addString("key", command.getKey());
 274  9
         if (command.getQuery() != null) {
 275  6
             builder.addDocument("query", command.getQuery());
 276  
         }
 277  9
         if (command.getMaximumTimeMilliseconds() > 0) {
 278  1
             minVersion = Distinct.MAX_TIMEOUT_VERSION;
 279  1
             builder.add("maxTimeMS", command.getMaximumTimeMilliseconds());
 280  
         }
 281  
 
 282  
         // Should be last since might wrap command in a $query element.
 283  9
         final ReadPreference readPreference = updateReadPreference(builder,
 284  
                 command.getReadPreference(), true);
 285  
 
 286  9
         final Command commandMsg = new Command(getDatabaseName(), getName(),
 287  
                 builder.build(), readPreference,
 288  
                 VersionRange.minimum(minVersion));
 289  
 
 290  9
         myClient.send(commandMsg, new ReplyArrayCallback(results));
 291  
 
 292  9
     }
 293  
 
 294  
     /**
 295  
      * Constructs a {@link AggregateCommand} and sends it to the server via the
 296  
      * {@link Client}.
 297  
      * 
 298  
      * @param aggregation
 299  
      *            The aggregation details.
 300  
      * @param results
 301  
      *            Callback that will be notified of the results of the explain.
 302  
      * @throws MongoDbException
 303  
      *             On an error finding the documents.
 304  
      * @since MongoDB 2.6
 305  
      * @see AsyncMongoCollection#explainAsync(Callback, Aggregate)
 306  
      */
 307  
     public void explainAsync(final Callback<Document> results,
 308  
             final Aggregate aggregation) throws MongoDbException {
 309  5
         final AggregateCommand commandMsg = toCommand(aggregation, true);
 310  
 
 311  5
         myClient.send(commandMsg, new SingleDocumentCallback(results));
 312  5
     }
 313  
 
 314  
     /**
 315  
      * Constructs a {@link Query} message and sends it to the server via the
 316  
      * {@link Client}.
 317  
      * 
 318  
      * @param query
 319  
      *            The query details.
 320  
      * @param results
 321  
      *            Callback that will be notified of the results of the explain.
 322  
      * @throws MongoDbException
 323  
      *             On an error finding the documents.
 324  
      * @see AsyncMongoCollection#explainAsync(Callback, Find)
 325  
      */
 326  
     public void explainAsync(final Callback<Document> results, final Find query)
 327  
             throws MongoDbException {
 328  
 
 329  7
         ReadPreference readPreference = query.getReadPreference();
 330  7
         if (readPreference == null) {
 331  3
             readPreference = getReadPreference();
 332  
         }
 333  
 
 334  
         Document queryDoc;
 335  7
         if (!readPreference.isLegacy()
 336  
                 && (myClient.getClusterType() == ClusterType.SHARDED)) {
 337  1
             queryDoc = query.toQueryRequest(true, readPreference);
 338  
         }
 339  
         else {
 340  6
             queryDoc = query.toQueryRequest(true);
 341  
         }
 342  
 
 343  7
         final Query queryMessage = new Query(getDatabaseName(), myName,
 344  
                 queryDoc, query.getProjection(), query.getBatchSize(),
 345  
                 query.getLimit(), query.getNumberToSkip(),
 346  
                 false /* tailable */, readPreference,
 347  
                 false /* noCursorTimeout */, false /* awaitData */,
 348  
                 false /* exhaust */, query.isPartialOk());
 349  
 
 350  7
         myClient.send(queryMessage, new SingleDocumentCallback(results));
 351  7
     }
 352  
 
 353  
     /**
 354  
      * Constructs a {@code findAndModify} command and sends it to the server via
 355  
      * the {@link Client}.
 356  
      * 
 357  
      * @param results
 358  
      *            Callback for the the found document.
 359  
      * @param command
 360  
      *            The details of the find and modify request.
 361  
      * @throws MongoDbException
 362  
      *             On an error finding the documents.
 363  
      * @see AsyncMongoCollection#findAndModifyAsync(Callback, FindAndModify)
 364  
      */
 365  
     public void findAndModifyAsync(final Callback<Document> results,
 366  
             final FindAndModify command) throws MongoDbException {
 367  10
         Version minVersion = null;
 368  
 
 369  10
         final DocumentBuilder builder = BuilderFactory.start();
 370  
 
 371  10
         builder.addString("findAndModify", getName());
 372  10
         builder.addDocument("query", command.getQuery());
 373  10
         if (command.getUpdate() != null) {
 374  9
             builder.addDocument("update", command.getUpdate());
 375  
         }
 376  10
         if (command.getSort() != null) {
 377  1
             builder.addDocument("sort", command.getSort());
 378  
         }
 379  10
         if (command.getFields() != null) {
 380  1
             builder.addDocument("fields", command.getFields());
 381  
         }
 382  10
         if (command.isRemove()) {
 383  1
             builder.addBoolean("remove", true);
 384  
         }
 385  10
         if (command.isReturnNew()) {
 386  1
             builder.addBoolean("new", true);
 387  
         }
 388  10
         if (command.isUpsert()) {
 389  1
             builder.addBoolean("upsert", true);
 390  
         }
 391  10
         if (command.getMaximumTimeMilliseconds() > 0) {
 392  1
             minVersion = FindAndModify.MAX_TIMEOUT_VERSION;
 393  1
             builder.add("maxTimeMS", command.getMaximumTimeMilliseconds());
 394  
         }
 395  
 
 396  
         // Must be the primary since this is a write.
 397  10
         final Command commandMsg = new Command(getDatabaseName(), getName(),
 398  
                 builder.build(), command.getQuery(), ReadPreference.PRIMARY,
 399  
                 VersionRange.minimum(minVersion));
 400  10
         myClient.send(commandMsg, new ReplyDocumentCallback(results));
 401  10
     }
 402  
 
 403  
     /**
 404  
      * Constructs a {@link Query} message and sends it to the server via the
 405  
      * {@link Client}.
 406  
      * 
 407  
      * @param query
 408  
      *            The query details.
 409  
      * @param results
 410  
      *            Callback that will be notified of the results of the find.
 411  
      * @throws MongoDbException
 412  
      *             On an error finding the documents.
 413  
      * @see AsyncMongoCollection#findAsync(Callback, Find)
 414  
      */
 415  
     public void findAsync(final Callback<MongoIterator<Document>> results,
 416  
             final Find query) throws MongoDbException {
 417  
 
 418  13
         final Query queryMessage = createQuery(query, query.getLimit(),
 419  
                 query.getBatchSize(), query.isTailable(), query.isAwaitData(),
 420  
                 query.isImmortalCursor());
 421  
 
 422  13
         final CursorCallback callback = new CursorCallback(myClient,
 423  
                 queryMessage, false, results);
 424  
 
 425  13
         myClient.send(queryMessage, callback);
 426  13
     }
 427  
 
 428  
     /**
 429  
      * Constructs a {@link Query} message and sends it to the server via the
 430  
      * {@link Client}.
 431  
      * 
 432  
      * @param query
 433  
      *            The query details.
 434  
      * @param results
 435  
      *            Callback that will be notified of the results of the find.
 436  
      * @throws MongoDbException
 437  
      *             On an error finding the documents.
 438  
      * @see AsyncMongoCollection#findOneAsync(Callback, Find)
 439  
      */
 440  
     public void findOneAsync(final Callback<Document> results, final Find query)
 441  
             throws MongoDbException {
 442  24
         final Query queryMessage = createQuery(query, 1, 1, false, false, false);
 443  
 
 444  24
         myClient.send(queryMessage, new SingleDocumentCallback(results));
 445  24
     }
 446  
 
 447  
     /**
 448  
      * Returns the name of the database.
 449  
      * 
 450  
      * @return The name of the database.
 451  
      */
 452  
     public String getDatabaseName() {
 453  361
         return myDatabase.getName();
 454  
     }
 455  
 
 456  
     /**
 457  
      * Returns the durability to use when no durability is specified for the
 458  
      * write operation.
 459  
      * 
 460  
      * @return The durability to use when no durability is specified for the
 461  
      *         write operation.
 462  
      */
 463  
     public Durability getDurability() {
 464  63
         Durability result = myDurability;
 465  63
         if (result == null) {
 466  62
             result = myDatabase.getDurability();
 467  
         }
 468  63
         return result;
 469  
     }
 470  
 
 471  
     /**
 472  
      * Returns the name of the collection.
 473  
      * 
 474  
      * @return The name of the collection.
 475  
      */
 476  
     public String getName() {
 477  228
         return myName;
 478  
     }
 479  
 
 480  
     /**
 481  
      * Returns the read preference to use when no read preference is specified
 482  
      * for the read operation.
 483  
      * 
 484  
      * @return The read preference to use when no read preference is specified
 485  
      *         for the read operation.
 486  
      */
 487  
     public ReadPreference getReadPreference() {
 488  92
         ReadPreference result = myReadPreference;
 489  92
         if (result == null) {
 490  91
             result = myDatabase.getReadPreference();
 491  
         }
 492  92
         return result;
 493  
     }
 494  
 
 495  
     /**
 496  
      * Constructs a {@code group} command and sends it to the server via the
 497  
      * {@link Client}.
 498  
      * 
 499  
      * @param results
 500  
      *            Callback for the group results returned.
 501  
      * @param command
 502  
      *            The details of the group request.
 503  
      * @throws MongoDbException
 504  
      *             On an error finding the documents.
 505  
      * @see AsyncMongoCollection#groupByAsync(Callback, GroupBy)
 506  
      */
 507  
     public void groupByAsync(final Callback<MongoIterator<Element>> results,
 508  
             final GroupBy command) throws MongoDbException {
 509  8
         Version minVersion = null;
 510  
 
 511  8
         final DocumentBuilder builder = BuilderFactory.start();
 512  
 
 513  8
         final DocumentBuilder groupDocBuilder = builder.push("group");
 514  
 
 515  8
         groupDocBuilder.addString("ns", getName());
 516  8
         if (!command.getKeys().isEmpty()) {
 517  7
             final DocumentBuilder keysBuilder = groupDocBuilder.push("key");
 518  7
             for (final String key : command.getKeys()) {
 519  7
                 keysBuilder.addBoolean(key, true);
 520  7
             }
 521  
         }
 522  8
         if (command.getKeyFunction() != null) {
 523  1
             groupDocBuilder.addJavaScript("$keyf", command.getKeyFunction());
 524  
         }
 525  8
         if (command.getInitialValue() != null) {
 526  1
             groupDocBuilder.addDocument("initial", command.getInitialValue());
 527  
         }
 528  8
         if (command.getReduceFunction() != null) {
 529  1
             groupDocBuilder.addJavaScript("$reduce",
 530  
                     command.getReduceFunction());
 531  
         }
 532  8
         if (command.getFinalizeFunction() != null) {
 533  1
             groupDocBuilder.addJavaScript("finalize",
 534  
                     command.getFinalizeFunction());
 535  
         }
 536  8
         if (command.getQuery() != null) {
 537  1
             groupDocBuilder.addDocument("cond", command.getQuery());
 538  
         }
 539  8
         if (command.getMaximumTimeMilliseconds() > 0) {
 540  1
             minVersion = GroupBy.MAX_TIMEOUT_VERSION;
 541  
             // maxTimeMS is not in the "group" sub-doc.
 542  
             // See SERVER-12595 commands.
 543  1
             builder.add("maxTimeMS", command.getMaximumTimeMilliseconds());
 544  
         }
 545  
 
 546  
         // Should be last since might wrap command in a $query element.
 547  8
         final ReadPreference readPreference = updateReadPreference(
 548  
                 groupDocBuilder, command.getReadPreference(), false);
 549  
 
 550  8
         final Command commandMsg = new Command(getDatabaseName(), getName(),
 551  
                 builder.build(), readPreference,
 552  
                 VersionRange.minimum(minVersion));
 553  8
         myClient.send(commandMsg, new ReplyArrayCallback("retval", results));
 554  8
     }
 555  
 
 556  
     /**
 557  
      * Constructs a {@link Insert} message and sends it to the server via the
 558  
      * {@link Client}.
 559  
      * 
 560  
      * @param results
 561  
      *            {@link Callback} that will be notified with the results of the
 562  
      *            insert. Currently, the value is always zero. Once <a
 563  
      *            href="http://jira.mongodb.org/browse/SERVER-4381"
 564  
      *            >SERVER-4381</a> is fixed then expected to be the number of
 565  
      *            documents inserted. If the durability is NONE then returns
 566  
      *            <code>-1</code>.
 567  
      * @param continueOnError
 568  
      *            If the insert should continue if one of the documents causes
 569  
      *            an error.
 570  
      * @param durability
 571  
      *            The durability for the insert.
 572  
      * @param documents
 573  
      *            The documents to add to the collection.
 574  
      * @throws MongoDbException
 575  
      *             On an error inserting the documents.
 576  
      * @see AsyncMongoCollection#insertAsync(Callback, boolean, Durability,
 577  
      *      DocumentAssignable...)
 578  
      */
 579  
     public void insertAsync(final Callback<Integer> results,
 580  
             final boolean continueOnError, final Durability durability,
 581  
             final DocumentAssignable... documents) throws MongoDbException {
 582  
 
 583  34
         doInsertAsync(results, continueOnError, durability, null, documents);
 584  34
     }
 585  
 
 586  
     /**
 587  
      * Constructs a {@code mapreduce} command and sends it to the server via the
 588  
      * {@link Client}.
 589  
      * 
 590  
      * @param results
 591  
      *            Callback for the map/reduce results returned. Note this might
 592  
      *            be empty if the output type is not inline.
 593  
      * @param command
 594  
      *            The details of the map/reduce request.
 595  
      * @throws MongoDbException
 596  
      *             On an error finding the documents.
 597  
      * @see AsyncMongoCollection#mapReduceAsync(Callback, MapReduce)
 598  
      */
 599  
     public void mapReduceAsync(final Callback<MongoIterator<Document>> results,
 600  
             final MapReduce command) throws MongoDbException {
 601  14
         Version minVersion = null;
 602  
 
 603  14
         final DocumentBuilder builder = BuilderFactory.start();
 604  
 
 605  14
         builder.addString("mapreduce", getName());
 606  14
         builder.addJavaScript("map", command.getMapFunction());
 607  14
         builder.addJavaScript("reduce", command.getReduceFunction());
 608  14
         if (command.getFinalizeFunction() != null) {
 609  1
             builder.addJavaScript("finalize", command.getFinalizeFunction());
 610  
         }
 611  14
         if (command.getQuery() != null) {
 612  1
             builder.addDocument("query", command.getQuery());
 613  
         }
 614  14
         if (command.getSort() != null) {
 615  1
             builder.addDocument("sort", command.getSort());
 616  
         }
 617  14
         if (command.getScope() != null) {
 618  1
             builder.addDocument("scope", command.getScope());
 619  
         }
 620  14
         if (command.getLimit() != 0) {
 621  1
             builder.addInteger("limit", command.getLimit());
 622  
         }
 623  14
         if (command.isKeepTemp()) {
 624  1
             builder.addBoolean("keeptemp", true);
 625  
         }
 626  14
         if (command.isJsMode()) {
 627  1
             builder.addBoolean("jsMode", true);
 628  
         }
 629  14
         if (command.isVerbose()) {
 630  1
             builder.addBoolean("verbose", true);
 631  
         }
 632  14
         if (command.getMaximumTimeMilliseconds() > 0) {
 633  1
             minVersion = MapReduce.MAX_TIMEOUT_VERSION;
 634  1
             builder.add("maxTimeMS", command.getMaximumTimeMilliseconds());
 635  
         }
 636  
 
 637  14
         final DocumentBuilder outputBuilder = builder.push("out");
 638  14
         switch (command.getOutputType()) {
 639  
         case INLINE: {
 640  6
             outputBuilder.addInteger("inline", 1);
 641  6
             break;
 642  
         }
 643  
         case REPLACE: {
 644  4
             outputBuilder.addString("replace", command.getOutputName());
 645  4
             if (command.getOutputDatabase() != null) {
 646  3
                 outputBuilder.addString("db", command.getOutputDatabase());
 647  
             }
 648  
             break;
 649  
         }
 650  
         case MERGE: {
 651  2
             outputBuilder.addString("merge", command.getOutputName());
 652  2
             if (command.getOutputDatabase() != null) {
 653  1
                 outputBuilder.addString("db", command.getOutputDatabase());
 654  
             }
 655  
             break;
 656  
         }
 657  
         case REDUCE: {
 658  2
             outputBuilder.addString("reduce", command.getOutputName());
 659  2
             if (command.getOutputDatabase() != null) {
 660  1
                 outputBuilder.addString("db", command.getOutputDatabase());
 661  
             }
 662  
             break;
 663  
         }
 664  
         }
 665  
 
 666  
         // Should be last since might wrap command in a $query element.
 667  14
         final ReadPreference readPreference = updateReadPreference(builder,
 668  
                 command.getReadPreference(), true);
 669  
 
 670  14
         final Command commandMsg = new Command(getDatabaseName(), getName(),
 671  
                 builder.build(), readPreference,
 672  
                 VersionRange.minimum(minVersion));
 673  14
         myClient.send(commandMsg, new ReplyResultCallback(results));
 674  14
     }
 675  
 
 676  
     /**
 677  
      * Constructs a {@code parallelCollectionScan} command and sends it to the
 678  
      * server via the {@link Client}.
 679  
      * 
 680  
      * @param results
 681  
      *            Callback for the collection of iterators.
 682  
      * @param parallelScan
 683  
      *            The details on the scan.
 684  
      * @throws MongoDbException
 685  
      *             On an error initializing the parallel scan.
 686  
      * @see AsyncMongoCollection#parallelScanAsync(Callback, ParallelScan)
 687  
      * @see <a
 688  
      *      href="http://docs.mongodb.org/manual/reference/command/parallelCollectionScan/">parallelCollectionScan
 689  
      *      Command</a>
 690  
      */
 691  
     public void parallelScanAsync(
 692  
             final Callback<Collection<MongoIterator<Document>>> results,
 693  
             final ParallelScan parallelScan) throws MongoDbException {
 694  3
         final DocumentBuilder builder = BuilderFactory.start();
 695  
 
 696  3
         builder.add("parallelCollectionScan", getName());
 697  3
         builder.add("numCursors", parallelScan.getRequestedIteratorCount());
 698  
 
 699  
         // Should be last since might wrap command in a $query element.
 700  3
         final ReadPreference readPreference = updateReadPreference(builder,
 701  
                 parallelScan.getReadPreference(), true);
 702  
 
 703  3
         final ParallelScanCommand commandMsg = new ParallelScanCommand(
 704  
                 parallelScan, getDatabaseName(), getName(), builder.build(),
 705  
                 readPreference);
 706  
 
 707  3
         myClient.send(commandMsg, new MultipleCursorCallback(myClient,
 708  
                 commandMsg, results));
 709  
 
 710  3
     }
 711  
 
 712  
     /**
 713  
      * Constructs a {@link Insert} of {@link Update} message based on if the
 714  
      * document contains a {@link #ID_FIELD_NAME} and sends it to the server via
 715  
      * the {@link Client}.
 716  
      * 
 717  
      * @param results
 718  
      *            {@link Callback} that will be notified with the results of the
 719  
      *            insert. If the durability of the operation is NONE then this
 720  
      *            will be -1.
 721  
      * @param document
 722  
      *            The document to save to the collection.
 723  
      * @param durability
 724  
      *            The durability for the save.
 725  
      * @throws MongoDbException
 726  
      *             On an error saving the documents.
 727  
      * @see AsyncMongoCollection#saveAsync(Callback, DocumentAssignable,
 728  
      *      Durability)
 729  
      */
 730  
     public void saveAsync(final Callback<Integer> results,
 731  
             final DocumentAssignable document, final Durability durability)
 732  
             throws MongoDbException {
 733  8
         final Document doc = document.asDocument();
 734  
 
 735  8
         if (doc.contains(ID_FIELD_NAME)) {
 736  4
             updateAsync(new LongToIntCallback(results), BuilderFactory.start()
 737  
                     .add(doc.get(ID_FIELD_NAME)), doc, false, true, durability);
 738  
         }
 739  
         else {
 740  4
             insertAsync(results, INSERT_CONTINUE_ON_ERROR_DEFAULT, durability,
 741  
                     doc);
 742  
         }
 743  8
     }
 744  
 
 745  
     /**
 746  
      * Sets the durability to use when no durability is specified for the write
 747  
      * operation.
 748  
      * 
 749  
      * @param durability
 750  
      *            The durability to use when no durability is specified for the
 751  
      *            write operation.
 752  
      */
 753  
     public void setDurability(final Durability durability) {
 754  2
         myDurability = durability;
 755  2
     }
 756  
 
 757  
     /**
 758  
      * Sets the read preference to use when no read preference is specified for
 759  
      * the read operation.
 760  
      * 
 761  
      * @param readPreference
 762  
      *            The read preference to use when no read preference is
 763  
      *            specified for the read operation.
 764  
      */
 765  
     public void setReadPreference(final ReadPreference readPreference) {
 766  2
         myReadPreference = readPreference;
 767  2
     }
 768  
 
 769  
     /**
 770  
      * Constructs a {@code aggregate} command and sends it to the server via the
 771  
      * {@link Client}.
 772  
      * 
 773  
      * @param results
 774  
      *            Callback that will be notified of the results of the query.
 775  
      * @param aggregation
 776  
      *            The aggregation details.
 777  
      * @return A {@link MongoCursorControl} to control the cursor streaming
 778  
      *         documents to the caller. This includes the ability to stop the
 779  
      *         cursor and persist its state.
 780  
      * @throws MongoDbException
 781  
      *             On an error finding the documents.
 782  
      * @see AsyncMongoCollection#stream(StreamCallback, Aggregate)
 783  
      */
 784  
     public MongoCursorControl stream(final StreamCallback<Document> results,
 785  
             final Aggregate aggregation) throws MongoDbException {
 786  4
         final AggregateCommand commandMsg = toCommand(aggregation, false);
 787  
 
 788  4
         final CursorStreamingCallback callback = new CursorStreamingCallback(
 789  
                 myClient, commandMsg, true, results);
 790  
 
 791  4
         myClient.send(commandMsg, callback);
 792  
 
 793  4
         return callback;
 794  
     }
 795  
 
 796  
     /**
 797  
      * Constructs a {@link Query} message and sends it to the server via the
 798  
      * {@link Client}.
 799  
      * 
 800  
      * @param results
 801  
      *            Callback that will be notified of the results of the query.
 802  
      * @param query
 803  
      *            The query details.
 804  
      * @return A {@link MongoCursorControl} to control the cursor streaming
 805  
      *         documents to the caller. This includes the ability to stop the
 806  
      *         cursor and persist its state.
 807  
      * @throws MongoDbException
 808  
      *             On an error finding the documents.
 809  
      * @see AsyncMongoCollection#stream(StreamCallback, Find)
 810  
      */
 811  
     public MongoCursorControl stream(final StreamCallback<Document> results,
 812  
             final Find query) throws MongoDbException {
 813  13
         final Query queryMessage = createQuery(query, query.getLimit(),
 814  
                 query.getBatchSize(), query.isTailable(), query.isAwaitData(),
 815  
                 query.isImmortalCursor());
 816  
 
 817  13
         final CursorStreamingCallback callback = new CursorStreamingCallback(
 818  
                 myClient, queryMessage, false, results);
 819  
 
 820  13
         myClient.send(queryMessage, callback);
 821  
 
 822  13
         return callback;
 823  
     }
 824  
 
 825  
     /**
 826  
      * Constructs a {@code text} command and sends it to the server via the
 827  
      * {@link Client}.
 828  
      * 
 829  
      * @param results
 830  
      *            Callback for the {@code text} results returned.
 831  
      * @param command
 832  
      *            The details of the {@code text} request.
 833  
      * @throws MongoDbException
 834  
      *             On an error executing the {@code text} command.
 835  
      * @see <a
 836  
      *      href="http://docs.mongodb.org/manual/release-notes/2.4/#text-queries">
 837  
      *      MongoDB Text Queries</a>
 838  
      * @since MongoDB 2.4
 839  
      * @see AsyncMongoCollection#textSearchAsync(Callback,
 840  
      *      com.allanbank.mongodb.builder.Text)
 841  
      * @deprecated Support for the {@code text} command was deprecated in the
 842  
      *             2.6 version of MongoDB. Use the
 843  
      *             {@link ConditionBuilder#text(String) $text} query operator
 844  
      *             instead. This method will not be removed until two releases
 845  
      *             after the MongoDB 2.6 release (e.g. 2.10 if the releases are
 846  
      *             2.8 and 2.10).
 847  
      */
 848  
     @Deprecated
 849  
     public void textSearchAsync(
 850  
             final Callback<MongoIterator<com.allanbank.mongodb.builder.TextResult>> results,
 851  
             final com.allanbank.mongodb.builder.Text command)
 852  
             throws MongoDbException {
 853  2
         final Version minVersion = com.allanbank.mongodb.builder.Text.REQUIRED_VERSION;
 854  2
         final DocumentBuilder builder = BuilderFactory.start();
 855  
 
 856  2
         builder.addString("text", getName());
 857  2
         builder.addString("search", command.getSearchTerm());
 858  2
         if (command.getQuery() != null) {
 859  1
             builder.add("filter", command.getQuery());
 860  
         }
 861  2
         if (command.getLimit() > 0) {
 862  1
             builder.add("limit", command.getLimit());
 863  
         }
 864  2
         if (command.getReturnFields() != null) {
 865  1
             builder.add("project", command.getReturnFields());
 866  
         }
 867  2
         if (command.getLanguage() != null) {
 868  1
             builder.add("language", command.getLanguage());
 869  
         }
 870  
 
 871  
         // Should be last since might wrap command in a $query element.
 872  2
         final ReadPreference readPreference = updateReadPreference(builder,
 873  
                 command.getReadPreference(), true);
 874  
 
 875  2
         final Command commandMsg = new Command(getDatabaseName(), getName(),
 876  
                 builder.build(), readPreference,
 877  
                 VersionRange.minimum(minVersion));
 878  2
         myClient.send(commandMsg,
 879  
                 new ReplyResultCallback(
 880  
                         new com.allanbank.mongodb.client.callback.TextCallback(
 881  
                                 results)));
 882  2
     }
 883  
 
 884  
     /**
 885  
      * Constructs a {@link Update} message and sends it to the server via the
 886  
      * {@link Client}.
 887  
      * 
 888  
      * @param results
 889  
      *            The {@link Callback} that will be notified of the number of
 890  
      *            documents updated. If the durability of the operation is NONE
 891  
      *            then this will be -1.
 892  
      * @param query
 893  
      *            The query to select the documents to update.
 894  
      * @param update
 895  
      *            The updates to apply to the selected documents.
 896  
      * @param multiUpdate
 897  
      *            If true then the update is applied to all of the matching
 898  
      *            documents, otherwise only the first document found is updated.
 899  
      * @param upsert
 900  
      *            If true then if no document is found then a new document is
 901  
      *            created and updated, otherwise no operation is performed.
 902  
      * @param durability
 903  
      *            The durability for the update.
 904  
      * @throws MongoDbException
 905  
      *             On an error updating the documents.
 906  
      * @see AsyncMongoCollection#updateAsync(Callback, DocumentAssignable,
 907  
      *      DocumentAssignable, boolean, boolean, Durability)
 908  
      */
 909  
     public void updateAsync(final Callback<Long> results,
 910  
             final DocumentAssignable query, final DocumentAssignable update,
 911  
             final boolean multiUpdate, final boolean upsert,
 912  
             final Durability durability) throws MongoDbException {
 913  
 
 914  32
         final ClusterStats stats = myClient.getClusterStats();
 915  32
         if ((durability != Durability.NONE) && useWriteCommand()
 916  
                 && isWriteCommandsSupported(stats)) {
 917  1
             final BatchedWrite write = BatchedWrite.update(query, update,
 918  
                     multiUpdate, upsert, durability);
 919  
 
 920  1
             doWriteAsync(stats, results, write);
 921  1
         }
 922  
         else {
 923  31
             final Update updateMessage = new Update(getDatabaseName(), myName,
 924  
                     query.asDocument(), update.asDocument(), multiUpdate,
 925  
                     upsert);
 926  
 
 927  31
             if (Durability.NONE == durability) {
 928  1
                 myClient.send(updateMessage, null);
 929  1
                 results.callback(Long.valueOf(-1));
 930  
             }
 931  
             else {
 932  30
                 myClient.send(updateMessage, asGetLastError(durability),
 933  
                         new ReplyLongCallback(results));
 934  
             }
 935  
         }
 936  32
     }
 937  
 
 938  
     /**
 939  
      * Constructs the appropriate set of write commands to send to the server.
 940  
      * 
 941  
      * @param results
 942  
      *            The {@link Callback} that will be notified of the number of
 943  
      *            documents inserted, updated, and deleted. If the durability of
 944  
      *            the operation is NONE then this will be -1.
 945  
      * @param write
 946  
      *            The batched writes
 947  
      * @throws MongoDbException
 948  
      *             On an error submitting the write operations.
 949  
      * 
 950  
      * @since MongoDB 2.6
 951  
      * @see AsyncMongoCollection#writeAsync(Callback,BatchedWrite)
 952  
      */
 953  
     public void writeAsync(final Callback<Long> results,
 954  
             final BatchedWrite write) throws MongoDbException {
 955  7
         final ClusterStats stats = myClient.getClusterStats();
 956  
 
 957  7
         doWriteAsync(stats, results, write);
 958  7
     }
 959  
 
 960  
     /**
 961  
      * Converts the {@link Durability} into a {@link GetLastError} command.
 962  
      * 
 963  
      * @param durability
 964  
      *            The {@link Durability} to convert.
 965  
      * @return The {@link GetLastError} command.
 966  
      */
 967  
     protected GetLastError asGetLastError(final Durability durability) {
 968  92
         return new GetLastError(getDatabaseName(), durability);
 969  
     }
 970  
 
 971  
     /**
 972  
      * Creates a properly configured {@link Query} message.
 973  
      * 
 974  
      * @param query
 975  
      *            The {@link Find} to construct the {@link Query} from.
 976  
      * @param limit
 977  
      *            The limit for the query.
 978  
      * @param batchSize
 979  
      *            The batch size for the query.
 980  
      * @param tailable
 981  
      *            If the query should create a tailable cursor.
 982  
      * @param awaitData
 983  
      *            If the query should await data.
 984  
      * @param immortal
 985  
      *            If the query should create a cursor that does not timeout,
 986  
      *            e.g., immortal.
 987  
      * @return The {@link Query} message.
 988  
      */
 989  
     protected Query createQuery(final Find query, final int limit,
 990  
             final int batchSize, final boolean tailable,
 991  
             final boolean awaitData, final boolean immortal) {
 992  50
         ReadPreference readPreference = query.getReadPreference();
 993  50
         if (readPreference == null) {
 994  35
             readPreference = getReadPreference();
 995  
         }
 996  
 
 997  
         Document queryDoc;
 998  50
         if (!readPreference.isLegacy()
 999  
                 && (myClient.getClusterType() == ClusterType.SHARDED)) {
 1000  5
             queryDoc = query.toQueryRequest(false, readPreference);
 1001  
         }
 1002  
         else {
 1003  45
             queryDoc = query.toQueryRequest(false);
 1004  
         }
 1005  
 
 1006  50
         return new Query(getDatabaseName(), myName, queryDoc,
 1007  
                 query.getProjection(), batchSize, limit,
 1008  
                 query.getNumberToSkip(), tailable, readPreference, immortal,
 1009  
                 awaitData, false /* exhaust */, query.isPartialOk());
 1010  
     }
 1011  
 
 1012  
     /**
 1013  
      * Sends an {@link Insert} message to the server. This version is private to
 1014  
      * this class since most inserts do not need the server version.
 1015  
      * 
 1016  
      * @param results
 1017  
      *            {@link Callback} that will be notified with the results of the
 1018  
      *            insert.
 1019  
      * @param continueOnError
 1020  
      *            If the insert should continue if one of the documents causes
 1021  
      *            an error.
 1022  
      * @param durability
 1023  
      *            The durability for the insert.
 1024  
      * @param requiredServerVersion
 1025  
      *            The required version of the server to support processing the
 1026  
      *            message.
 1027  
      * @param documents
 1028  
      *            The documents to add to the collection.
 1029  
      * @throws MongoDbException
 1030  
      *             On an error inserting the documents.
 1031  
      */
 1032  
     protected void doInsertAsync(final Callback<Integer> results,
 1033  
             final boolean continueOnError, final Durability durability,
 1034  
             final Version requiredServerVersion,
 1035  
             final DocumentAssignable... documents) throws MongoDbException {
 1036  
 
 1037  38
         final ClusterStats stats = myClient.getClusterStats();
 1038  38
         if ((durability != Durability.NONE) && useWriteCommand()
 1039  
                 && isWriteCommandsSupported(stats)) {
 1040  1
             final BatchedWrite write = BatchedWrite.insert(continueOnError,
 1041  
                     durability, documents);
 1042  
 
 1043  1
             doWriteAsync(stats, new LongToIntCallback(results), write);
 1044  1
         }
 1045  
         else {
 1046  
             // Make sure the documents have an _id.
 1047  37
             final List<Document> docs = new ArrayList<Document>(
 1048  
                     documents.length);
 1049  74
             for (final DocumentAssignable docAssignable : documents) {
 1050  37
                 final Document doc = docAssignable.asDocument();
 1051  37
                 if (!doc.contains(ID_FIELD_NAME)
 1052  
                         && (doc instanceof RootDocument)) {
 1053  25
                     ((RootDocument) doc).injectId();
 1054  
                 }
 1055  37
                 docs.add(doc);
 1056  
             }
 1057  
 
 1058  37
             final Insert insertMessage = new Insert(getDatabaseName(), myName,
 1059  
                     docs, continueOnError,
 1060  
                     VersionRange.minimum(requiredServerVersion));
 1061  37
             if (Durability.NONE == durability) {
 1062  1
                 myClient.send(insertMessage, null);
 1063  1
                 results.callback(Integer.valueOf(-1));
 1064  
             }
 1065  
             else {
 1066  36
                 myClient.send(insertMessage, asGetLastError(durability),
 1067  
                         new ReplyIntegerCallback(results));
 1068  
             }
 1069  
         }
 1070  38
     }
 1071  
 
 1072  
     /**
 1073  
      * Performs a async write operation.
 1074  
      * 
 1075  
      * @param stats
 1076  
      *            The stats for verifying the server support write commands.
 1077  
      * @param results
 1078  
      *            The callback for the write.
 1079  
      * @param write
 1080  
      *            The write to send.
 1081  
      */
 1082  
     protected void doWriteAsync(final ClusterStats stats,
 1083  
             final Callback<Long> results, final BatchedWrite write) {
 1084  9
         if (isWriteCommandsSupported(stats)) {
 1085  
 
 1086  6
             final List<BatchedWrite.Bundle> bundles = write.toBundles(
 1087  
                     getName(), stats.getSmallestMaxBsonObjectSize(),
 1088  
                     stats.getSmallestMaxBatchedWriteOperations());
 1089  6
             if (bundles.isEmpty()) {
 1090  3
                 results.callback(Long.valueOf(0));
 1091  3
                 return;
 1092  
             }
 1093  
 
 1094  3
             final BatchedWriteCallback callback = new BatchedWriteCallback(
 1095  
                     getDatabaseName(), getName(), results, write, myClient,
 1096  
                     bundles);
 1097  
 
 1098  
             // Push the messages out.
 1099  3
             callback.send();
 1100  3
         }
 1101  
         else {
 1102  3
             final List<WriteOperation> operations = write.getWrites();
 1103  3
             if (operations.isEmpty()) {
 1104  1
                 results.callback(Long.valueOf(0));
 1105  1
                 return;
 1106  
             }
 1107  
 
 1108  2
             final BatchedNativeWriteCallback callback = new BatchedNativeWriteCallback(
 1109  
                     results, write, this, operations);
 1110  
 
 1111  
             // Push the messages out.
 1112  2
             callback.send();
 1113  
         }
 1114  5
     }
 1115  
 
 1116  
     /**
 1117  
      * Determines if all of the servers in the cluster support the write
 1118  
      * commands.
 1119  
      * 
 1120  
      * @param stats
 1121  
      *            The cluster stats if they have already been retrieved.
 1122  
      * @return True if all servers in the cluster are at least the
 1123  
      *         {@link BatchedWrite#REQUIRED_VERSION}.
 1124  
      */
 1125  
     protected boolean isWriteCommandsSupported(final ClusterStats stats) {
 1126  70
         final ClusterStats clusterStats = (stats == null) ? myClient
 1127  
                 .getClusterStats() : stats;
 1128  70
         final VersionRange serverVersionRange = clusterStats
 1129  
                 .getServerVersionRange();
 1130  70
         final Version minServerVersion = serverVersionRange.getLowerBounds();
 1131  
 
 1132  70
         return (BatchedWrite.REQUIRED_VERSION.compareTo(minServerVersion) <= 0);
 1133  
     }
 1134  
 
 1135  
     /**
 1136  
      * Converts the {@link Aggregate} object to an {@link AggregateCommand}.
 1137  
      * 
 1138  
      * @param command
 1139  
      *            The {@link Aggregate} to convert.
 1140  
      * @param explain
 1141  
      *            If rue then have the server explain the aggregation instead of
 1142  
      *            performing the aggregation.
 1143  
      * @return The command to send to the server for the {@link Aggregate}.
 1144  
      */
 1145  
     protected AggregateCommand toCommand(final Aggregate command,
 1146  
             final boolean explain) {
 1147  19
         Version minVersion = command.getRequiredVersion();
 1148  
 
 1149  19
         final DocumentBuilder builder = BuilderFactory.start();
 1150  
 
 1151  19
         builder.addString("aggregate", getName());
 1152  
 
 1153  
         // Pipeline of operations.
 1154  19
         final ArrayBuilder pipeline = builder.pushArray("pipeline");
 1155  19
         for (final Element e : command.getPipeline()) {
 1156  19
             pipeline.add(e);
 1157  19
         }
 1158  
 
 1159  
         // Options
 1160  19
         if (command.isAllowDiskUsage()) {
 1161  1
             builder.add("allowDiskUsage", true);
 1162  
         }
 1163  19
         if (command.isUseCursor()) {
 1164  2
             final DocumentBuilder cursor = builder.push("cursor");
 1165  2
             if (command.getBatchSize() > 0) {
 1166  1
                 cursor.add("batchSize", command.getBatchSize());
 1167  
             }
 1168  
         }
 1169  19
         if (explain) {
 1170  5
             minVersion = Version.later(minVersion, Aggregate.EXPLAIN_VERSION);
 1171  5
             builder.add("explain", true);
 1172  
         }
 1173  19
         if (command.getMaximumTimeMilliseconds() > 0) {
 1174  1
             builder.add("maxTimeMS", command.getMaximumTimeMilliseconds());
 1175  
         }
 1176  
 
 1177  
         // Should be last since might wrap command in a $query element.
 1178  19
         final ReadPreference readPreference = updateReadPreference(builder,
 1179  
                 command.getReadPreference(), true);
 1180  
 
 1181  19
         final AggregateCommand commandMsg = new AggregateCommand(command,
 1182  
                 getDatabaseName(), getName(), builder.build(), readPreference,
 1183  
                 VersionRange.minimum(minVersion));
 1184  19
         return commandMsg;
 1185  
     }
 1186  
 
 1187  
     /**
 1188  
      * Determines the {@link ReadPreference} to be used based on the command's
 1189  
      * {@code ReadPreference} or the collection's if the command's
 1190  
      * {@code ReadPreference} is <code>null</code>. Updates the command's
 1191  
      * {@link DocumentBuilder} with the {@code ReadPreference} details if
 1192  
      * connected to a sharded cluster and the resulting {@code ReadPreference}
 1193  
      * is not supported by the legacy settings.
 1194  
      * 
 1195  
      * @param builder
 1196  
      *            The builder for the command document to augment with the read
 1197  
      *            preferences if connected to a sharded cluster.
 1198  
      * @param commandReadPreference
 1199  
      *            The read preferences from the command.
 1200  
      * @param createQueryElement
 1201  
      *            If true then the existing builder's contents will be pushed
 1202  
      *            into a $query sub-document. This is required to ensure the
 1203  
      *            command is not rejected by the {@code mongod} after processing
 1204  
      *            by the {@code mongos}.
 1205  
      * @return The {@link ReadPreference} to use.
 1206  
      */
 1207  
     protected ReadPreference updateReadPreference(
 1208  
             final DocumentBuilder builder,
 1209  
             final ReadPreference commandReadPreference,
 1210  
             final boolean createQueryElement) {
 1211  
 
 1212  79
         ReadPreference readPreference = commandReadPreference;
 1213  79
         if (readPreference == null) {
 1214  43
             readPreference = getReadPreference();
 1215  
         }
 1216  
 
 1217  79
         if (!readPreference.isLegacy()
 1218  
                 && (myClient.getClusterType() == ClusterType.SHARDED)) {
 1219  7
             if (createQueryElement) {
 1220  6
                 final Document query = builder.asDocument();
 1221  6
                 builder.reset();
 1222  6
                 builder.add("$query", query);
 1223  
             }
 1224  7
             builder.add(ReadPreference.FIELD_NAME, readPreference);
 1225  
         }
 1226  
 
 1227  79
         return readPreference;
 1228  
     }
 1229  
 
 1230  
     /**
 1231  
      * Extension point for derived classes to force the
 1232  
      * {@link #insertAsync(Callback, boolean, Durability, DocumentAssignable...)}
 1233  
      * ,
 1234  
      * {@link #updateAsync(Callback, DocumentAssignable, DocumentAssignable, boolean, boolean, Durability)}
 1235  
      * , and
 1236  
      * {@link #deleteAsync(Callback, DocumentAssignable, boolean, Durability)}
 1237  
      * methods to use the legacy {@link Insert}, {@link Update}, and
 1238  
      * {@link Delete} messages regardless of the server version.
 1239  
      * <p>
 1240  
      * This version of the method always returns true.
 1241  
      * </p>
 1242  
      * 
 1243  
      * @return Return true to allow the use of the write commands added in
 1244  
      *         MongoDB 2.6. Use false to force the use of the {@link Insert},
 1245  
      *         {@link Update}, and {@link Delete}
 1246  
      */
 1247  
     protected boolean useWriteCommand() {
 1248  61
         return true;
 1249  
     }
 1250  
 }