View Javadoc
1   /*
2    * #%L
3    * AbstractMongoOperations.java - mongodb-async-driver - Allanbank Consulting, Inc.
4    * %%
5    * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6    * %%
7    * Licensed under the Apache License, Version 2.0 (the "License");
8    * you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   * 
11   *      http://www.apache.org/licenses/LICENSE-2.0
12   * 
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   * #L%
19   */
20  package com.allanbank.mongodb.client;
21  
22  import java.util.ArrayList;
23  import java.util.Collection;
24  import java.util.List;
25  
26  import com.allanbank.mongodb.AsyncMongoCollection;
27  import com.allanbank.mongodb.Callback;
28  import com.allanbank.mongodb.Durability;
29  import com.allanbank.mongodb.MongoCursorControl;
30  import com.allanbank.mongodb.MongoDatabase;
31  import com.allanbank.mongodb.MongoDbException;
32  import com.allanbank.mongodb.MongoIterator;
33  import com.allanbank.mongodb.ReadPreference;
34  import com.allanbank.mongodb.StreamCallback;
35  import com.allanbank.mongodb.Version;
36  import com.allanbank.mongodb.bson.Document;
37  import com.allanbank.mongodb.bson.DocumentAssignable;
38  import com.allanbank.mongodb.bson.Element;
39  import com.allanbank.mongodb.bson.builder.ArrayBuilder;
40  import com.allanbank.mongodb.bson.builder.BuilderFactory;
41  import com.allanbank.mongodb.bson.builder.DocumentBuilder;
42  import com.allanbank.mongodb.bson.impl.EmptyDocument;
43  import com.allanbank.mongodb.bson.impl.ImmutableDocument;
44  import com.allanbank.mongodb.bson.impl.RootDocument;
45  import com.allanbank.mongodb.builder.Aggregate;
46  import com.allanbank.mongodb.builder.BatchedWrite;
47  import com.allanbank.mongodb.builder.ConditionBuilder;
48  import com.allanbank.mongodb.builder.Count;
49  import com.allanbank.mongodb.builder.Distinct;
50  import com.allanbank.mongodb.builder.Find;
51  import com.allanbank.mongodb.builder.FindAndModify;
52  import com.allanbank.mongodb.builder.GroupBy;
53  import com.allanbank.mongodb.builder.MapReduce;
54  import com.allanbank.mongodb.builder.ParallelScan;
55  import com.allanbank.mongodb.builder.write.WriteOperation;
56  import com.allanbank.mongodb.client.callback.BatchedNativeWriteCallback;
57  import com.allanbank.mongodb.client.callback.BatchedWriteCallback;
58  import com.allanbank.mongodb.client.callback.CursorCallback;
59  import com.allanbank.mongodb.client.callback.CursorStreamingCallback;
60  import com.allanbank.mongodb.client.callback.LongToIntCallback;
61  import com.allanbank.mongodb.client.callback.MultipleCursorCallback;
62  import com.allanbank.mongodb.client.callback.ReplyArrayCallback;
63  import com.allanbank.mongodb.client.callback.ReplyDocumentCallback;
64  import com.allanbank.mongodb.client.callback.ReplyIntegerCallback;
65  import com.allanbank.mongodb.client.callback.ReplyLongCallback;
66  import com.allanbank.mongodb.client.callback.ReplyResultCallback;
67  import com.allanbank.mongodb.client.callback.SingleDocumentCallback;
68  import com.allanbank.mongodb.client.message.AggregateCommand;
69  import com.allanbank.mongodb.client.message.Command;
70  import com.allanbank.mongodb.client.message.Delete;
71  import com.allanbank.mongodb.client.message.GetLastError;
72  import com.allanbank.mongodb.client.message.Insert;
73  import com.allanbank.mongodb.client.message.ParallelScanCommand;
74  import com.allanbank.mongodb.client.message.Query;
75  import com.allanbank.mongodb.client.message.Update;
76  
77  /**
78   * AbstractMongoOperations provides the core functionality for the operations on
79   * a MongoDB collection.
80   * 
81   * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
82   *         mutated in incompatible ways between any two releases of the driver.
83   * @copyright 2014, Allanbank Consulting, Inc., All Rights Reserved
84   */
85  public abstract class AbstractMongoOperations {
86  
87      /**
88       * The default for if a delete should only delete the first document it
89       * matches.
90       */
91      public static final boolean DELETE_SINGLE_DELETE_DEFAULT = false;
92  
93      /** The default empty index options. */
94      public static final Document EMPTY_INDEX_OPTIONS = EmptyDocument.INSTANCE;
95  
96      /** The name of the canonical id field for MongoDB. */
97      public static final String ID_FIELD_NAME = "_id";
98  
99      /** The default for if an insert should continue on an error. */
100     public static final boolean INSERT_CONTINUE_ON_ERROR_DEFAULT = false;
101 
102     /** The default for a UNIQUE index options. */
103     public static final Document UNIQUE_INDEX_OPTIONS = new ImmutableDocument(
104             BuilderFactory.start().add("unique", true));
105 
106     /** The default for doing a multiple-update on an update. */
107     public static final boolean UPDATE_MULTIUPDATE_DEFAULT = false;
108 
109     /** The default for doing an upsert on an update. */
110     public static final boolean UPDATE_UPSERT_DEFAULT = false;
111 
112     /** The client for interacting with MongoDB. */
113     protected final Client myClient;
114 
115     /** The name of the database we interact with. */
116     protected final MongoDatabase myDatabase;
117 
118     /** The name of the collection we interact with. */
119     protected final String myName;
120 
121     /** The {@link Durability} for writes from this database instance. */
122     private Durability myDurability;
123 
124     /** The {@link ReadPreference} for reads from this database instance. */
125     private ReadPreference myReadPreference;
126 
127     /**
128      * Create a new AbstractAsyncMongoCollection.
129      * 
130      * @param client
131      *            The client for interacting with MongoDB.
132      * @param database
133      *            The database we interact with.
134      * @param name
135      *            The name of the collection we interact with.
136      */
137     public AbstractMongoOperations(final Client client,
138             final MongoDatabase database, final String name) {
139         super();
140 
141         myClient = client;
142         myDatabase = database;
143         myName = name;
144         myDurability = null;
145         myReadPreference = null;
146     }
147 
148     /**
149      * Constructs a {@code aggregate} command and sends it to the server via the
150      * {@link Client}.
151      * 
152      * @param results
153      *            Callback for the aggregation results returned.
154      * @param command
155      *            The details of the aggregation request.
156      * @throws MongoDbException
157      *             On an error executing the aggregate command.
158      * @see AsyncMongoCollection#aggregateAsync(Callback, Aggregate)
159      */
160     public void aggregateAsync(final Callback<MongoIterator<Document>> results,
161             final Aggregate command) throws MongoDbException {
162 
163         final AggregateCommand commandMsg = toCommand(command, false);
164 
165         final CursorCallback callback = new CursorCallback(myClient,
166                 commandMsg, true, results);
167 
168         myClient.send(commandMsg, callback);
169     }
170 
171     /**
172      * Constructs a {@code count} command and sends it to the server via the
173      * {@link Client}.
174      * 
175      * @param results
176      *            The callback to notify of the results.
177      * @param count
178      *            The count command.
179      * @throws MongoDbException
180      *             On an error counting the documents.
181      * @see AsyncMongoCollection#countAsync(Callback, Count)
182      */
183     public void countAsync(final Callback<Long> results, final Count count)
184             throws MongoDbException {
185         Version minVersion = null;
186         final DocumentBuilder builder = BuilderFactory.start();
187 
188         builder.addString("count", getName());
189         builder.addDocument("query", count.getQuery());
190         if (count.getMaximumTimeMilliseconds() > 0) {
191             minVersion = Count.MAX_TIMEOUT_VERSION;
192             builder.add("maxTimeMS", count.getMaximumTimeMilliseconds());
193         }
194 
195         // Should be last since might wrap command in a $query element.
196         final ReadPreference finalPreference = updateReadPreference(builder,
197                 count.getReadPreference(), true);
198 
199         final Command commandMsg = new Command(getDatabaseName(), getName(),
200                 builder.build(), count.getQuery(), finalPreference,
201                 VersionRange.minimum(minVersion));
202 
203         myClient.send(commandMsg, new ReplyLongCallback(results));
204     }
205 
206     /**
207      * Constructs a {@link Delete} message and sends it to the server via the
208      * {@link Client}.
209      * 
210      * @param results
211      *            Callback that will be notified of the results of the query. If
212      *            the durability of the operation is NONE then this will be -1.
213      * @param query
214      *            Query to locate the documents to be deleted.
215      * @param singleDelete
216      *            If true then only a single document will be deleted. If
217      *            running in a sharded environment then this field must be false
218      *            or the query must contain the shard key.
219      * @param durability
220      *            The durability for the delete.
221      * @throws MongoDbException
222      *             On an error deleting the documents.
223      * @see AsyncMongoCollection#deleteAsync(Callback, DocumentAssignable,
224      *      boolean, Durability)
225      */
226     public void deleteAsync(final Callback<Long> results,
227             final DocumentAssignable query, final boolean singleDelete,
228             final Durability durability) throws MongoDbException {
229 
230         if ((durability != Durability.NONE) && useWriteCommand()
231                 && isWriteCommandsSupported(null)) {
232 
233             final BatchedWrite write = BatchedWrite.delete(query, singleDelete,
234                     durability);
235 
236             writeAsync(results, write);
237         }
238         else {
239             final Delete deleteMessage = new Delete(getDatabaseName(), myName,
240                     query.asDocument(), singleDelete);
241 
242             if (Durability.NONE.equals(durability)) {
243                 myClient.send(deleteMessage, null);
244                 results.callback(Long.valueOf(-1));
245             }
246             else {
247                 myClient.send(deleteMessage, asGetLastError(durability),
248                         new ReplyLongCallback(results));
249             }
250         }
251     }
252 
253     /**
254      * Constructs a {@code distinct} command and sends it to the server via the
255      * {@link Client}.
256      * 
257      * @param results
258      *            Callback for the distinct results returned.
259      * @param command
260      *            The details of the distinct request.
261      * @throws MongoDbException
262      *             On an error finding the documents.
263      * @see AsyncMongoCollection#distinctAsync(Callback, Distinct)
264      */
265     public void distinctAsync(final Callback<MongoIterator<Element>> results,
266             final Distinct command) throws MongoDbException {
267 
268         Version minVersion = null;
269 
270         final DocumentBuilder builder = BuilderFactory.start();
271 
272         builder.addString("distinct", getName());
273         builder.addString("key", command.getKey());
274         if (command.getQuery() != null) {
275             builder.addDocument("query", command.getQuery());
276         }
277         if (command.getMaximumTimeMilliseconds() > 0) {
278             minVersion = Distinct.MAX_TIMEOUT_VERSION;
279             builder.add("maxTimeMS", command.getMaximumTimeMilliseconds());
280         }
281 
282         // Should be last since might wrap command in a $query element.
283         final ReadPreference readPreference = updateReadPreference(builder,
284                 command.getReadPreference(), true);
285 
286         final Command commandMsg = new Command(getDatabaseName(), getName(),
287                 builder.build(), readPreference,
288                 VersionRange.minimum(minVersion));
289 
290         myClient.send(commandMsg, new ReplyArrayCallback(results));
291 
292     }
293 
294     /**
295      * Constructs a {@link AggregateCommand} and sends it to the server via the
296      * {@link Client}.
297      * 
298      * @param aggregation
299      *            The aggregation details.
300      * @param results
301      *            Callback that will be notified of the results of the explain.
302      * @throws MongoDbException
303      *             On an error finding the documents.
304      * @since MongoDB 2.6
305      * @see AsyncMongoCollection#explainAsync(Callback, Aggregate)
306      */
307     public void explainAsync(final Callback<Document> results,
308             final Aggregate aggregation) throws MongoDbException {
309         final AggregateCommand commandMsg = toCommand(aggregation, true);
310 
311         myClient.send(commandMsg, new SingleDocumentCallback(results));
312     }
313 
314     /**
315      * Constructs a {@link Query} message and sends it to the server via the
316      * {@link Client}.
317      * 
318      * @param query
319      *            The query details.
320      * @param results
321      *            Callback that will be notified of the results of the explain.
322      * @throws MongoDbException
323      *             On an error finding the documents.
324      * @see AsyncMongoCollection#explainAsync(Callback, Find)
325      */
326     public void explainAsync(final Callback<Document> results, final Find query)
327             throws MongoDbException {
328 
329         ReadPreference readPreference = query.getReadPreference();
330         if (readPreference == null) {
331             readPreference = getReadPreference();
332         }
333 
334         Document queryDoc;
335         if (!readPreference.isLegacy()
336                 && (myClient.getClusterType() == ClusterType.SHARDED)) {
337             queryDoc = query.toQueryRequest(true, readPreference);
338         }
339         else {
340             queryDoc = query.toQueryRequest(true);
341         }
342 
343         final Query queryMessage = new Query(getDatabaseName(), myName,
344                 queryDoc, query.getProjection(), query.getBatchSize(),
345                 query.getLimit(), query.getNumberToSkip(),
346                 false /* tailable */, readPreference,
347                 false /* noCursorTimeout */, false /* awaitData */,
348                 false /* exhaust */, query.isPartialOk());
349 
350         myClient.send(queryMessage, new SingleDocumentCallback(results));
351     }
352 
353     /**
354      * Constructs a {@code findAndModify} command and sends it to the server via
355      * the {@link Client}.
356      * 
357      * @param results
358      *            Callback for the the found document.
359      * @param command
360      *            The details of the find and modify request.
361      * @throws MongoDbException
362      *             On an error finding the documents.
363      * @see AsyncMongoCollection#findAndModifyAsync(Callback, FindAndModify)
364      */
365     public void findAndModifyAsync(final Callback<Document> results,
366             final FindAndModify command) throws MongoDbException {
367         Version minVersion = null;
368 
369         final DocumentBuilder builder = BuilderFactory.start();
370 
371         builder.addString("findAndModify", getName());
372         builder.addDocument("query", command.getQuery());
373         if (command.getUpdate() != null) {
374             builder.addDocument("update", command.getUpdate());
375         }
376         if (command.getSort() != null) {
377             builder.addDocument("sort", command.getSort());
378         }
379         if (command.getFields() != null) {
380             builder.addDocument("fields", command.getFields());
381         }
382         if (command.isRemove()) {
383             builder.addBoolean("remove", true);
384         }
385         if (command.isReturnNew()) {
386             builder.addBoolean("new", true);
387         }
388         if (command.isUpsert()) {
389             builder.addBoolean("upsert", true);
390         }
391         if (command.getMaximumTimeMilliseconds() > 0) {
392             minVersion = FindAndModify.MAX_TIMEOUT_VERSION;
393             builder.add("maxTimeMS", command.getMaximumTimeMilliseconds());
394         }
395 
396         // Must be the primary since this is a write.
397         final Command commandMsg = new Command(getDatabaseName(), getName(),
398                 builder.build(), command.getQuery(), ReadPreference.PRIMARY,
399                 VersionRange.minimum(minVersion));
400         myClient.send(commandMsg, new ReplyDocumentCallback(results));
401     }
402 
403     /**
404      * Constructs a {@link Query} message and sends it to the server via the
405      * {@link Client}.
406      * 
407      * @param query
408      *            The query details.
409      * @param results
410      *            Callback that will be notified of the results of the find.
411      * @throws MongoDbException
412      *             On an error finding the documents.
413      * @see AsyncMongoCollection#findAsync(Callback, Find)
414      */
415     public void findAsync(final Callback<MongoIterator<Document>> results,
416             final Find query) throws MongoDbException {
417 
418         final Query queryMessage = createQuery(query, query.getLimit(),
419                 query.getBatchSize(), query.isTailable(), query.isAwaitData(),
420                 query.isImmortalCursor());
421 
422         final CursorCallback callback = new CursorCallback(myClient,
423                 queryMessage, false, results);
424 
425         myClient.send(queryMessage, callback);
426     }
427 
428     /**
429      * Constructs a {@link Query} message and sends it to the server via the
430      * {@link Client}.
431      * 
432      * @param query
433      *            The query details.
434      * @param results
435      *            Callback that will be notified of the results of the find.
436      * @throws MongoDbException
437      *             On an error finding the documents.
438      * @see AsyncMongoCollection#findOneAsync(Callback, Find)
439      */
440     public void findOneAsync(final Callback<Document> results, final Find query)
441             throws MongoDbException {
442         final Query queryMessage = createQuery(query, 1, 1, false, false, false);
443 
444         myClient.send(queryMessage, new SingleDocumentCallback(results));
445     }
446 
447     /**
448      * Returns the name of the database.
449      * 
450      * @return The name of the database.
451      */
452     public String getDatabaseName() {
453         return myDatabase.getName();
454     }
455 
456     /**
457      * Returns the durability to use when no durability is specified for the
458      * write operation.
459      * 
460      * @return The durability to use when no durability is specified for the
461      *         write operation.
462      */
463     public Durability getDurability() {
464         Durability result = myDurability;
465         if (result == null) {
466             result = myDatabase.getDurability();
467         }
468         return result;
469     }
470 
471     /**
472      * Returns the name of the collection.
473      * 
474      * @return The name of the collection.
475      */
476     public String getName() {
477         return myName;
478     }
479 
480     /**
481      * Returns the read preference to use when no read preference is specified
482      * for the read operation.
483      * 
484      * @return The read preference to use when no read preference is specified
485      *         for the read operation.
486      */
487     public ReadPreference getReadPreference() {
488         ReadPreference result = myReadPreference;
489         if (result == null) {
490             result = myDatabase.getReadPreference();
491         }
492         return result;
493     }
494 
495     /**
496      * Constructs a {@code group} command and sends it to the server via the
497      * {@link Client}.
498      * 
499      * @param results
500      *            Callback for the group results returned.
501      * @param command
502      *            The details of the group request.
503      * @throws MongoDbException
504      *             On an error finding the documents.
505      * @see AsyncMongoCollection#groupByAsync(Callback, GroupBy)
506      */
507     public void groupByAsync(final Callback<MongoIterator<Element>> results,
508             final GroupBy command) throws MongoDbException {
509         Version minVersion = null;
510 
511         final DocumentBuilder builder = BuilderFactory.start();
512 
513         final DocumentBuilder groupDocBuilder = builder.push("group");
514 
515         groupDocBuilder.addString("ns", getName());
516         if (!command.getKeys().isEmpty()) {
517             final DocumentBuilder keysBuilder = groupDocBuilder.push("key");
518             for (final String key : command.getKeys()) {
519                 keysBuilder.addBoolean(key, true);
520             }
521         }
522         if (command.getKeyFunction() != null) {
523             groupDocBuilder.addJavaScript("$keyf", command.getKeyFunction());
524         }
525         if (command.getInitialValue() != null) {
526             groupDocBuilder.addDocument("initial", command.getInitialValue());
527         }
528         if (command.getReduceFunction() != null) {
529             groupDocBuilder.addJavaScript("$reduce",
530                     command.getReduceFunction());
531         }
532         if (command.getFinalizeFunction() != null) {
533             groupDocBuilder.addJavaScript("finalize",
534                     command.getFinalizeFunction());
535         }
536         if (command.getQuery() != null) {
537             groupDocBuilder.addDocument("cond", command.getQuery());
538         }
539         if (command.getMaximumTimeMilliseconds() > 0) {
540             minVersion = GroupBy.MAX_TIMEOUT_VERSION;
541             // maxTimeMS is not in the "group" sub-doc.
542             // See SERVER-12595 commands.
543             builder.add("maxTimeMS", command.getMaximumTimeMilliseconds());
544         }
545 
546         // Should be last since might wrap command in a $query element.
547         final ReadPreference readPreference = updateReadPreference(
548                 groupDocBuilder, command.getReadPreference(), false);
549 
550         final Command commandMsg = new Command(getDatabaseName(), getName(),
551                 builder.build(), readPreference,
552                 VersionRange.minimum(minVersion));
553         myClient.send(commandMsg, new ReplyArrayCallback("retval", results));
554     }
555 
556     /**
557      * Constructs a {@link Insert} message and sends it to the server via the
558      * {@link Client}.
559      * 
560      * @param results
561      *            {@link Callback} that will be notified with the results of the
562      *            insert. Currently, the value is always zero. Once <a
563      *            href="http://jira.mongodb.org/browse/SERVER-4381"
564      *            >SERVER-4381</a> is fixed then expected to be the number of
565      *            documents inserted. If the durability is NONE then returns
566      *            <code>-1</code>.
567      * @param continueOnError
568      *            If the insert should continue if one of the documents causes
569      *            an error.
570      * @param durability
571      *            The durability for the insert.
572      * @param documents
573      *            The documents to add to the collection.
574      * @throws MongoDbException
575      *             On an error inserting the documents.
576      * @see AsyncMongoCollection#insertAsync(Callback, boolean, Durability,
577      *      DocumentAssignable...)
578      */
579     public void insertAsync(final Callback<Integer> results,
580             final boolean continueOnError, final Durability durability,
581             final DocumentAssignable... documents) throws MongoDbException {
582 
583         doInsertAsync(results, continueOnError, durability, null, documents);
584     }
585 
586     /**
587      * Constructs a {@code mapreduce} command and sends it to the server via the
588      * {@link Client}.
589      * 
590      * @param results
591      *            Callback for the map/reduce results returned. Note this might
592      *            be empty if the output type is not inline.
593      * @param command
594      *            The details of the map/reduce request.
595      * @throws MongoDbException
596      *             On an error finding the documents.
597      * @see AsyncMongoCollection#mapReduceAsync(Callback, MapReduce)
598      */
599     public void mapReduceAsync(final Callback<MongoIterator<Document>> results,
600             final MapReduce command) throws MongoDbException {
601         Version minVersion = null;
602 
603         final DocumentBuilder builder = BuilderFactory.start();
604 
605         builder.addString("mapreduce", getName());
606         builder.addJavaScript("map", command.getMapFunction());
607         builder.addJavaScript("reduce", command.getReduceFunction());
608         if (command.getFinalizeFunction() != null) {
609             builder.addJavaScript("finalize", command.getFinalizeFunction());
610         }
611         if (command.getQuery() != null) {
612             builder.addDocument("query", command.getQuery());
613         }
614         if (command.getSort() != null) {
615             builder.addDocument("sort", command.getSort());
616         }
617         if (command.getScope() != null) {
618             builder.addDocument("scope", command.getScope());
619         }
620         if (command.getLimit() != 0) {
621             builder.addInteger("limit", command.getLimit());
622         }
623         if (command.isKeepTemp()) {
624             builder.addBoolean("keeptemp", true);
625         }
626         if (command.isJsMode()) {
627             builder.addBoolean("jsMode", true);
628         }
629         if (command.isVerbose()) {
630             builder.addBoolean("verbose", true);
631         }
632         if (command.getMaximumTimeMilliseconds() > 0) {
633             minVersion = MapReduce.MAX_TIMEOUT_VERSION;
634             builder.add("maxTimeMS", command.getMaximumTimeMilliseconds());
635         }
636 
637         final DocumentBuilder outputBuilder = builder.push("out");
638         switch (command.getOutputType()) {
639         case INLINE: {
640             outputBuilder.addInteger("inline", 1);
641             break;
642         }
643         case REPLACE: {
644             outputBuilder.addString("replace", command.getOutputName());
645             if (command.getOutputDatabase() != null) {
646                 outputBuilder.addString("db", command.getOutputDatabase());
647             }
648             break;
649         }
650         case MERGE: {
651             outputBuilder.addString("merge", command.getOutputName());
652             if (command.getOutputDatabase() != null) {
653                 outputBuilder.addString("db", command.getOutputDatabase());
654             }
655             break;
656         }
657         case REDUCE: {
658             outputBuilder.addString("reduce", command.getOutputName());
659             if (command.getOutputDatabase() != null) {
660                 outputBuilder.addString("db", command.getOutputDatabase());
661             }
662             break;
663         }
664         }
665 
666         // Should be last since might wrap command in a $query element.
667         final ReadPreference readPreference = updateReadPreference(builder,
668                 command.getReadPreference(), true);
669 
670         final Command commandMsg = new Command(getDatabaseName(), getName(),
671                 builder.build(), readPreference,
672                 VersionRange.minimum(minVersion));
673         myClient.send(commandMsg, new ReplyResultCallback(results));
674     }
675 
676     /**
677      * Constructs a {@code parallelCollectionScan} command and sends it to the
678      * server via the {@link Client}.
679      * 
680      * @param results
681      *            Callback for the collection of iterators.
682      * @param parallelScan
683      *            The details on the scan.
684      * @throws MongoDbException
685      *             On an error initializing the parallel scan.
686      * @see AsyncMongoCollection#parallelScanAsync(Callback, ParallelScan)
687      * @see <a
688      *      href="http://docs.mongodb.org/manual/reference/command/parallelCollectionScan/">parallelCollectionScan
689      *      Command</a>
690      */
691     public void parallelScanAsync(
692             final Callback<Collection<MongoIterator<Document>>> results,
693             final ParallelScan parallelScan) throws MongoDbException {
694         final DocumentBuilder builder = BuilderFactory.start();
695 
696         builder.add("parallelCollectionScan", getName());
697         builder.add("numCursors", parallelScan.getRequestedIteratorCount());
698 
699         // Should be last since might wrap command in a $query element.
700         final ReadPreference readPreference = updateReadPreference(builder,
701                 parallelScan.getReadPreference(), true);
702 
703         final ParallelScanCommand commandMsg = new ParallelScanCommand(
704                 parallelScan, getDatabaseName(), getName(), builder.build(),
705                 readPreference);
706 
707         myClient.send(commandMsg, new MultipleCursorCallback(myClient,
708                 commandMsg, results));
709 
710     }
711 
712     /**
713      * Constructs a {@link Insert} of {@link Update} message based on if the
714      * document contains a {@link #ID_FIELD_NAME} and sends it to the server via
715      * the {@link Client}.
716      * 
717      * @param results
718      *            {@link Callback} that will be notified with the results of the
719      *            insert. If the durability of the operation is NONE then this
720      *            will be -1.
721      * @param document
722      *            The document to save to the collection.
723      * @param durability
724      *            The durability for the save.
725      * @throws MongoDbException
726      *             On an error saving the documents.
727      * @see AsyncMongoCollection#saveAsync(Callback, DocumentAssignable,
728      *      Durability)
729      */
730     public void saveAsync(final Callback<Integer> results,
731             final DocumentAssignable document, final Durability durability)
732             throws MongoDbException {
733         final Document doc = document.asDocument();
734 
735         if (doc.contains(ID_FIELD_NAME)) {
736             updateAsync(new LongToIntCallback(results), BuilderFactory.start()
737                     .add(doc.get(ID_FIELD_NAME)), doc, false, true, durability);
738         }
739         else {
740             insertAsync(results, INSERT_CONTINUE_ON_ERROR_DEFAULT, durability,
741                     doc);
742         }
743     }
744 
745     /**
746      * Sets the durability to use when no durability is specified for the write
747      * operation.
748      * 
749      * @param durability
750      *            The durability to use when no durability is specified for the
751      *            write operation.
752      */
753     public void setDurability(final Durability durability) {
754         myDurability = durability;
755     }
756 
757     /**
758      * Sets the read preference to use when no read preference is specified for
759      * the read operation.
760      * 
761      * @param readPreference
762      *            The read preference to use when no read preference is
763      *            specified for the read operation.
764      */
765     public void setReadPreference(final ReadPreference readPreference) {
766         myReadPreference = readPreference;
767     }
768 
769     /**
770      * Constructs a {@code aggregate} command and sends it to the server via the
771      * {@link Client}.
772      * 
773      * @param results
774      *            Callback that will be notified of the results of the query.
775      * @param aggregation
776      *            The aggregation details.
777      * @return A {@link MongoCursorControl} to control the cursor streaming
778      *         documents to the caller. This includes the ability to stop the
779      *         cursor and persist its state.
780      * @throws MongoDbException
781      *             On an error finding the documents.
782      * @see AsyncMongoCollection#stream(StreamCallback, Aggregate)
783      */
784     public MongoCursorControl stream(final StreamCallback<Document> results,
785             final Aggregate aggregation) throws MongoDbException {
786         final AggregateCommand commandMsg = toCommand(aggregation, false);
787 
788         final CursorStreamingCallback callback = new CursorStreamingCallback(
789                 myClient, commandMsg, true, results);
790 
791         myClient.send(commandMsg, callback);
792 
793         return callback;
794     }
795 
796     /**
797      * Constructs a {@link Query} message and sends it to the server via the
798      * {@link Client}.
799      * 
800      * @param results
801      *            Callback that will be notified of the results of the query.
802      * @param query
803      *            The query details.
804      * @return A {@link MongoCursorControl} to control the cursor streaming
805      *         documents to the caller. This includes the ability to stop the
806      *         cursor and persist its state.
807      * @throws MongoDbException
808      *             On an error finding the documents.
809      * @see AsyncMongoCollection#stream(StreamCallback, Find)
810      */
811     public MongoCursorControl stream(final StreamCallback<Document> results,
812             final Find query) throws MongoDbException {
813         final Query queryMessage = createQuery(query, query.getLimit(),
814                 query.getBatchSize(), query.isTailable(), query.isAwaitData(),
815                 query.isImmortalCursor());
816 
817         final CursorStreamingCallback callback = new CursorStreamingCallback(
818                 myClient, queryMessage, false, results);
819 
820         myClient.send(queryMessage, callback);
821 
822         return callback;
823     }
824 
825     /**
826      * Constructs a {@code text} command and sends it to the server via the
827      * {@link Client}.
828      * 
829      * @param results
830      *            Callback for the {@code text} results returned.
831      * @param command
832      *            The details of the {@code text} request.
833      * @throws MongoDbException
834      *             On an error executing the {@code text} command.
835      * @see <a
836      *      href="http://docs.mongodb.org/manual/release-notes/2.4/#text-queries">
837      *      MongoDB Text Queries</a>
838      * @since MongoDB 2.4
839      * @see AsyncMongoCollection#textSearchAsync(Callback,
840      *      com.allanbank.mongodb.builder.Text)
841      * @deprecated Support for the {@code text} command was deprecated in the
842      *             2.6 version of MongoDB. Use the
843      *             {@link ConditionBuilder#text(String) $text} query operator
844      *             instead. This method will not be removed until two releases
845      *             after the MongoDB 2.6 release (e.g. 2.10 if the releases are
846      *             2.8 and 2.10).
847      */
848     @Deprecated
849     public void textSearchAsync(
850             final Callback<MongoIterator<com.allanbank.mongodb.builder.TextResult>> results,
851             final com.allanbank.mongodb.builder.Text command)
852             throws MongoDbException {
853         final Version minVersion = com.allanbank.mongodb.builder.Text.REQUIRED_VERSION;
854         final DocumentBuilder builder = BuilderFactory.start();
855 
856         builder.addString("text", getName());
857         builder.addString("search", command.getSearchTerm());
858         if (command.getQuery() != null) {
859             builder.add("filter", command.getQuery());
860         }
861         if (command.getLimit() > 0) {
862             builder.add("limit", command.getLimit());
863         }
864         if (command.getReturnFields() != null) {
865             builder.add("project", command.getReturnFields());
866         }
867         if (command.getLanguage() != null) {
868             builder.add("language", command.getLanguage());
869         }
870 
871         // Should be last since might wrap command in a $query element.
872         final ReadPreference readPreference = updateReadPreference(builder,
873                 command.getReadPreference(), true);
874 
875         final Command commandMsg = new Command(getDatabaseName(), getName(),
876                 builder.build(), readPreference,
877                 VersionRange.minimum(minVersion));
878         myClient.send(commandMsg,
879                 new ReplyResultCallback(
880                         new com.allanbank.mongodb.client.callback.TextCallback(
881                                 results)));
882     }
883 
884     /**
885      * Constructs a {@link Update} message and sends it to the server via the
886      * {@link Client}.
887      * 
888      * @param results
889      *            The {@link Callback} that will be notified of the number of
890      *            documents updated. If the durability of the operation is NONE
891      *            then this will be -1.
892      * @param query
893      *            The query to select the documents to update.
894      * @param update
895      *            The updates to apply to the selected documents.
896      * @param multiUpdate
897      *            If true then the update is applied to all of the matching
898      *            documents, otherwise only the first document found is updated.
899      * @param upsert
900      *            If true then if no document is found then a new document is
901      *            created and updated, otherwise no operation is performed.
902      * @param durability
903      *            The durability for the update.
904      * @throws MongoDbException
905      *             On an error updating the documents.
906      * @see AsyncMongoCollection#updateAsync(Callback, DocumentAssignable,
907      *      DocumentAssignable, boolean, boolean, Durability)
908      */
909     public void updateAsync(final Callback<Long> results,
910             final DocumentAssignable query, final DocumentAssignable update,
911             final boolean multiUpdate, final boolean upsert,
912             final Durability durability) throws MongoDbException {
913 
914         final ClusterStats stats = myClient.getClusterStats();
915         if ((durability != Durability.NONE) && useWriteCommand()
916                 && isWriteCommandsSupported(stats)) {
917             final BatchedWrite write = BatchedWrite.update(query, update,
918                     multiUpdate, upsert, durability);
919 
920             doWriteAsync(stats, results, write);
921         }
922         else {
923             final Update updateMessage = new Update(getDatabaseName(), myName,
924                     query.asDocument(), update.asDocument(), multiUpdate,
925                     upsert);
926 
927             if (Durability.NONE == durability) {
928                 myClient.send(updateMessage, null);
929                 results.callback(Long.valueOf(-1));
930             }
931             else {
932                 myClient.send(updateMessage, asGetLastError(durability),
933                         new ReplyLongCallback(results));
934             }
935         }
936     }
937 
938     /**
939      * Constructs the appropriate set of write commands to send to the server.
940      * 
941      * @param results
942      *            The {@link Callback} that will be notified of the number of
943      *            documents inserted, updated, and deleted. If the durability of
944      *            the operation is NONE then this will be -1.
945      * @param write
946      *            The batched writes
947      * @throws MongoDbException
948      *             On an error submitting the write operations.
949      * 
950      * @since MongoDB 2.6
951      * @see AsyncMongoCollection#writeAsync(Callback,BatchedWrite)
952      */
953     public void writeAsync(final Callback<Long> results,
954             final BatchedWrite write) throws MongoDbException {
955         final ClusterStats stats = myClient.getClusterStats();
956 
957         doWriteAsync(stats, results, write);
958     }
959 
960     /**
961      * Converts the {@link Durability} into a {@link GetLastError} command.
962      * 
963      * @param durability
964      *            The {@link Durability} to convert.
965      * @return The {@link GetLastError} command.
966      */
967     protected GetLastError asGetLastError(final Durability durability) {
968         return new GetLastError(getDatabaseName(), durability);
969     }
970 
971     /**
972      * Creates a properly configured {@link Query} message.
973      * 
974      * @param query
975      *            The {@link Find} to construct the {@link Query} from.
976      * @param limit
977      *            The limit for the query.
978      * @param batchSize
979      *            The batch size for the query.
980      * @param tailable
981      *            If the query should create a tailable cursor.
982      * @param awaitData
983      *            If the query should await data.
984      * @param immortal
985      *            If the query should create a cursor that does not timeout,
986      *            e.g., immortal.
987      * @return The {@link Query} message.
988      */
989     protected Query createQuery(final Find query, final int limit,
990             final int batchSize, final boolean tailable,
991             final boolean awaitData, final boolean immortal) {
992         ReadPreference readPreference = query.getReadPreference();
993         if (readPreference == null) {
994             readPreference = getReadPreference();
995         }
996 
997         Document queryDoc;
998         if (!readPreference.isLegacy()
999                 && (myClient.getClusterType() == ClusterType.SHARDED)) {
1000             queryDoc = query.toQueryRequest(false, readPreference);
1001         }
1002         else {
1003             queryDoc = query.toQueryRequest(false);
1004         }
1005 
1006         return new Query(getDatabaseName(), myName, queryDoc,
1007                 query.getProjection(), batchSize, limit,
1008                 query.getNumberToSkip(), tailable, readPreference, immortal,
1009                 awaitData, false /* exhaust */, query.isPartialOk());
1010     }
1011 
1012     /**
1013      * Sends an {@link Insert} message to the server. This version is private to
1014      * this class since most inserts do not need the server version.
1015      * 
1016      * @param results
1017      *            {@link Callback} that will be notified with the results of the
1018      *            insert.
1019      * @param continueOnError
1020      *            If the insert should continue if one of the documents causes
1021      *            an error.
1022      * @param durability
1023      *            The durability for the insert.
1024      * @param requiredServerVersion
1025      *            The required version of the server to support processing the
1026      *            message.
1027      * @param documents
1028      *            The documents to add to the collection.
1029      * @throws MongoDbException
1030      *             On an error inserting the documents.
1031      */
1032     protected void doInsertAsync(final Callback<Integer> results,
1033             final boolean continueOnError, final Durability durability,
1034             final Version requiredServerVersion,
1035             final DocumentAssignable... documents) throws MongoDbException {
1036 
1037         final ClusterStats stats = myClient.getClusterStats();
1038         if ((durability != Durability.NONE) && useWriteCommand()
1039                 && isWriteCommandsSupported(stats)) {
1040             final BatchedWrite write = BatchedWrite.insert(continueOnError,
1041                     durability, documents);
1042 
1043             doWriteAsync(stats, new LongToIntCallback(results), write);
1044         }
1045         else {
1046             // Make sure the documents have an _id.
1047             final List<Document> docs = new ArrayList<Document>(
1048                     documents.length);
1049             for (final DocumentAssignable docAssignable : documents) {
1050                 final Document doc = docAssignable.asDocument();
1051                 if (!doc.contains(ID_FIELD_NAME)
1052                         && (doc instanceof RootDocument)) {
1053                     ((RootDocument) doc).injectId();
1054                 }
1055                 docs.add(doc);
1056             }
1057 
1058             final Insert insertMessage = new Insert(getDatabaseName(), myName,
1059                     docs, continueOnError,
1060                     VersionRange.minimum(requiredServerVersion));
1061             if (Durability.NONE == durability) {
1062                 myClient.send(insertMessage, null);
1063                 results.callback(Integer.valueOf(-1));
1064             }
1065             else {
1066                 myClient.send(insertMessage, asGetLastError(durability),
1067                         new ReplyIntegerCallback(results));
1068             }
1069         }
1070     }
1071 
1072     /**
1073      * Performs a async write operation.
1074      * 
1075      * @param stats
1076      *            The stats for verifying the server support write commands.
1077      * @param results
1078      *            The callback for the write.
1079      * @param write
1080      *            The write to send.
1081      */
1082     protected void doWriteAsync(final ClusterStats stats,
1083             final Callback<Long> results, final BatchedWrite write) {
1084         if (isWriteCommandsSupported(stats)) {
1085 
1086             final List<BatchedWrite.Bundle> bundles = write.toBundles(
1087                     getName(), stats.getSmallestMaxBsonObjectSize(),
1088                     stats.getSmallestMaxBatchedWriteOperations());
1089             if (bundles.isEmpty()) {
1090                 results.callback(Long.valueOf(0));
1091                 return;
1092             }
1093 
1094             final BatchedWriteCallback callback = new BatchedWriteCallback(
1095                     getDatabaseName(), getName(), results, write, myClient,
1096                     bundles);
1097 
1098             // Push the messages out.
1099             callback.send();
1100         }
1101         else {
1102             final List<WriteOperation> operations = write.getWrites();
1103             if (operations.isEmpty()) {
1104                 results.callback(Long.valueOf(0));
1105                 return;
1106             }
1107 
1108             final BatchedNativeWriteCallback callback = new BatchedNativeWriteCallback(
1109                     results, write, this, operations);
1110 
1111             // Push the messages out.
1112             callback.send();
1113         }
1114     }
1115 
1116     /**
1117      * Determines if all of the servers in the cluster support the write
1118      * commands.
1119      * 
1120      * @param stats
1121      *            The cluster stats if they have already been retrieved.
1122      * @return True if all servers in the cluster are at least the
1123      *         {@link BatchedWrite#REQUIRED_VERSION}.
1124      */
1125     protected boolean isWriteCommandsSupported(final ClusterStats stats) {
1126         final ClusterStats clusterStats = (stats == null) ? myClient
1127                 .getClusterStats() : stats;
1128         final VersionRange serverVersionRange = clusterStats
1129                 .getServerVersionRange();
1130         final Version minServerVersion = serverVersionRange.getLowerBounds();
1131 
1132         return (BatchedWrite.REQUIRED_VERSION.compareTo(minServerVersion) <= 0);
1133     }
1134 
1135     /**
1136      * Converts the {@link Aggregate} object to an {@link AggregateCommand}.
1137      * 
1138      * @param command
1139      *            The {@link Aggregate} to convert.
1140      * @param explain
1141      *            If rue then have the server explain the aggregation instead of
1142      *            performing the aggregation.
1143      * @return The command to send to the server for the {@link Aggregate}.
1144      */
1145     protected AggregateCommand toCommand(final Aggregate command,
1146             final boolean explain) {
1147         Version minVersion = command.getRequiredVersion();
1148 
1149         final DocumentBuilder builder = BuilderFactory.start();
1150 
1151         builder.addString("aggregate", getName());
1152 
1153         // Pipeline of operations.
1154         final ArrayBuilder pipeline = builder.pushArray("pipeline");
1155         for (final Element e : command.getPipeline()) {
1156             pipeline.add(e);
1157         }
1158 
1159         // Options
1160         if (command.isAllowDiskUsage()) {
1161             builder.add("allowDiskUsage", true);
1162         }
1163         if (command.isUseCursor()) {
1164             final DocumentBuilder cursor = builder.push("cursor");
1165             if (command.getBatchSize() > 0) {
1166                 cursor.add("batchSize", command.getBatchSize());
1167             }
1168         }
1169         if (explain) {
1170             minVersion = Version.later(minVersion, Aggregate.EXPLAIN_VERSION);
1171             builder.add("explain", true);
1172         }
1173         if (command.getMaximumTimeMilliseconds() > 0) {
1174             builder.add("maxTimeMS", command.getMaximumTimeMilliseconds());
1175         }
1176 
1177         // Should be last since might wrap command in a $query element.
1178         final ReadPreference readPreference = updateReadPreference(builder,
1179                 command.getReadPreference(), true);
1180 
1181         final AggregateCommand commandMsg = new AggregateCommand(command,
1182                 getDatabaseName(), getName(), builder.build(), readPreference,
1183                 VersionRange.minimum(minVersion));
1184         return commandMsg;
1185     }
1186 
1187     /**
1188      * Determines the {@link ReadPreference} to be used based on the command's
1189      * {@code ReadPreference} or the collection's if the command's
1190      * {@code ReadPreference} is <code>null</code>. Updates the command's
1191      * {@link DocumentBuilder} with the {@code ReadPreference} details if
1192      * connected to a sharded cluster and the resulting {@code ReadPreference}
1193      * is not supported by the legacy settings.
1194      * 
1195      * @param builder
1196      *            The builder for the command document to augment with the read
1197      *            preferences if connected to a sharded cluster.
1198      * @param commandReadPreference
1199      *            The read preferences from the command.
1200      * @param createQueryElement
1201      *            If true then the existing builder's contents will be pushed
1202      *            into a $query sub-document. This is required to ensure the
1203      *            command is not rejected by the {@code mongod} after processing
1204      *            by the {@code mongos}.
1205      * @return The {@link ReadPreference} to use.
1206      */
1207     protected ReadPreference updateReadPreference(
1208             final DocumentBuilder builder,
1209             final ReadPreference commandReadPreference,
1210             final boolean createQueryElement) {
1211 
1212         ReadPreference readPreference = commandReadPreference;
1213         if (readPreference == null) {
1214             readPreference = getReadPreference();
1215         }
1216 
1217         if (!readPreference.isLegacy()
1218                 && (myClient.getClusterType() == ClusterType.SHARDED)) {
1219             if (createQueryElement) {
1220                 final Document query = builder.asDocument();
1221                 builder.reset();
1222                 builder.add("$query", query);
1223             }
1224             builder.add(ReadPreference.FIELD_NAME, readPreference);
1225         }
1226 
1227         return readPreference;
1228     }
1229 
1230     /**
1231      * Extension point for derived classes to force the
1232      * {@link #insertAsync(Callback, boolean, Durability, DocumentAssignable...)}
1233      * ,
1234      * {@link #updateAsync(Callback, DocumentAssignable, DocumentAssignable, boolean, boolean, Durability)}
1235      * , and
1236      * {@link #deleteAsync(Callback, DocumentAssignable, boolean, Durability)}
1237      * methods to use the legacy {@link Insert}, {@link Update}, and
1238      * {@link Delete} messages regardless of the server version.
1239      * <p>
1240      * This version of the method always returns true.
1241      * </p>
1242      * 
1243      * @return Return true to allow the use of the write commands added in
1244      *         MongoDB 2.6. Use false to force the use of the {@link Insert},
1245      *         {@link Update}, and {@link Delete}
1246      */
1247     protected boolean useWriteCommand() {
1248         return true;
1249     }
1250 }