View Javadoc
1   /*
2    * #%L
3    * SynchronousMongoCollectionImpl.java - mongodb-async-driver - Allanbank Consulting, Inc.
4    * %%
5    * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6    * %%
7    * Licensed under the Apache License, Version 2.0 (the "License");
8    * you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   * 
11   *      http://www.apache.org/licenses/LICENSE-2.0
12   * 
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   * #L%
19   */
20  package com.allanbank.mongodb.client;
21  
22  import java.util.Collection;
23  import java.util.List;
24  
25  import com.allanbank.mongodb.BatchedAsyncMongoCollection;
26  import com.allanbank.mongodb.Durability;
27  import com.allanbank.mongodb.ListenableFuture;
28  import com.allanbank.mongodb.MongoCollection;
29  import com.allanbank.mongodb.MongoDatabase;
30  import com.allanbank.mongodb.MongoDbException;
31  import com.allanbank.mongodb.MongoIterator;
32  import com.allanbank.mongodb.ReadPreference;
33  import com.allanbank.mongodb.Version;
34  import com.allanbank.mongodb.bson.Document;
35  import com.allanbank.mongodb.bson.DocumentAssignable;
36  import com.allanbank.mongodb.bson.Element;
37  import com.allanbank.mongodb.bson.ElementType;
38  import com.allanbank.mongodb.bson.NumericElement;
39  import com.allanbank.mongodb.bson.builder.BuilderFactory;
40  import com.allanbank.mongodb.bson.builder.DocumentBuilder;
41  import com.allanbank.mongodb.bson.element.BooleanElement;
42  import com.allanbank.mongodb.bson.element.IntegerElement;
43  import com.allanbank.mongodb.builder.Aggregate;
44  import com.allanbank.mongodb.builder.BatchedWrite;
45  import com.allanbank.mongodb.builder.ConditionBuilder;
46  import com.allanbank.mongodb.builder.Count;
47  import com.allanbank.mongodb.builder.Distinct;
48  import com.allanbank.mongodb.builder.Find;
49  import com.allanbank.mongodb.builder.FindAndModify;
50  import com.allanbank.mongodb.builder.GroupBy;
51  import com.allanbank.mongodb.builder.Index;
52  import com.allanbank.mongodb.builder.MapReduce;
53  import com.allanbank.mongodb.builder.ParallelScan;
54  import com.allanbank.mongodb.client.callback.FutureReplyCallback;
55  import com.allanbank.mongodb.client.callback.ValidatingReplyCallback;
56  import com.allanbank.mongodb.client.message.CreateIndexCommand;
57  import com.allanbank.mongodb.util.FutureUtils;
58  
59  /**
60   * Implementation of the {@link MongoCollection} interface including the
61   * synchronous methods.
62   * 
63   * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
64   *         mutated in incompatible ways between any two releases of the driver.
65   * @copyright 2011-2013, Allanbank Consulting, Inc., All Rights Reserved
66   */
67  public class SynchronousMongoCollectionImpl extends
68          AbstractAsyncMongoCollection implements MongoCollection {
69  
70      /**
71       * Create a new MongoDatabaseClient.
72       * 
73       * @param client
74       *            The client for interacting with MongoDB.
75       * @param database
76       *            The database the collection is a part of.
77       * @param name
78       *            The name of the collection we interact with.
79       */
80      public SynchronousMongoCollectionImpl(final Client client,
81              final MongoDatabase database, final String name) {
82          super(client, database, name);
83      }
84  
85      /**
86       * {@inheritDoc}
87       * <p>
88       * Overridden to call the {@link #aggregateAsync(Aggregate)}.
89       * </p>
90       * 
91       * @see #aggregateAsync(Aggregate)
92       */
93      @Override
94      public MongoIterator<Document> aggregate(final Aggregate command)
95              throws MongoDbException {
96          return FutureUtils.unwrap(aggregateAsync(command));
97      }
98  
99      /**
100      * {@inheritDoc}
101      * <p>
102      * Overridden to call the {@link #aggregate(Aggregate)}.
103      * </p>
104      */
105     @Override
106     public MongoIterator<Document> aggregate(final Aggregate.Builder command)
107             throws MongoDbException {
108         return aggregate(command.build());
109     }
110 
111     /**
112      * {@inheritDoc}
113      * <p>
114      * Overridden to call the {@link #count(DocumentAssignable,ReadPreference)}
115      * method with {@link #getReadPreference()} as the <tt>readPreference</tt>
116      * argument and an empty {@code query} document.
117      * </p>
118      */
119     @Override
120     public long count() throws MongoDbException {
121         return count(BuilderFactory.start(), getReadPreference());
122     }
123 
124     /**
125      * {@inheritDoc}
126      * <p>
127      * Overridden to call the {@link #countAsync(Count)} and unwrap the result.
128      * </p>
129      */
130     @Override
131     public long count(final Count count) throws MongoDbException {
132         final ListenableFuture<Long> future = countAsync(count);
133 
134         return FutureUtils.unwrap(future).longValue();
135     }
136 
137     /**
138      * {@inheritDoc}
139      * <p>
140      * Overridden to call the {@link #count(Count) count(count.build())}.
141      * </p>
142      */
143     @Override
144     public long count(final Count.Builder count) throws MongoDbException {
145         return count(count.build());
146     }
147 
148     /**
149      * {@inheritDoc}
150      * <p>
151      * Overridden to call the {@link #count(DocumentAssignable, ReadPreference)}
152      * method with {@link #getReadPreference()} as the <tt>readPreference</tt>
153      * argument.
154      * </p>
155      */
156     @Override
157     public long count(final DocumentAssignable query) throws MongoDbException {
158         return count(query, getReadPreference());
159     }
160 
161     /**
162      * {@inheritDoc}
163      * <p>
164      * Overridden to call the
165      * {@link #countAsync(DocumentAssignable, ReadPreference)} method.
166      * </p>
167      */
168     @Override
169     public long count(final DocumentAssignable query,
170             final ReadPreference readPreference) throws MongoDbException {
171 
172         final ListenableFuture<Long> future = countAsync(query, readPreference);
173 
174         return FutureUtils.unwrap(future).longValue();
175     }
176 
177     /**
178      * {@inheritDoc}
179      * <p>
180      * Overridden to call the {@link #count(DocumentAssignable,ReadPreference)}
181      * method with an empty {@code query} document.
182      * </p>
183      */
184     @Override
185     public long count(final ReadPreference readPreference)
186             throws MongoDbException {
187         return count(BuilderFactory.start(), readPreference);
188     }
189 
190     /**
191      * {@inheritDoc}
192      * <p>
193      * Overridden to call the {@link #createIndex(String, boolean, Element...)}
194      * method with <code>null</code> for the name.
195      * </p>
196      * 
197      * @see #createIndex(String, boolean, Element...)
198      */
199     @Override
200     public void createIndex(final boolean unique, final Element... keys)
201             throws MongoDbException {
202         createIndex(null, unique, keys);
203     }
204 
205     /**
206      * {@inheritDoc}
207      * <p>
208      * Overridden to call the
209      * {@link #createIndex(String,DocumentAssignable,Element...)} method with
210      * <code>null</code> for <tt>name</tt>.
211      * </p>
212      * 
213      * @see #createIndex(String,DocumentAssignable,Element...)
214      */
215     @Override
216     public void createIndex(final DocumentAssignable options,
217             final Element... keys) throws MongoDbException {
218         createIndex(null, options, keys);
219     }
220 
221     /**
222      * {@inheritDoc}
223      * <p>
224      * Overridden to call the
225      * {@link #createIndex(DocumentAssignable, Element...)} method with
226      * {@link #EMPTY_INDEX_OPTIONS} for <tt>options</tt>.
227      * </p>
228      * 
229      * @see #createIndex(DocumentAssignable, Element...)
230      */
231     @Override
232     public void createIndex(final Element... keys) throws MongoDbException {
233         createIndex(EMPTY_INDEX_OPTIONS, keys);
234     }
235 
236     /**
237      * {@inheritDoc}
238      * <p>
239      * Overridden to call the
240      * {@link #createIndex(String,DocumentAssignable,Element...)} method with
241      * {@link #UNIQUE_INDEX_OPTIONS} if {@code unique} is <code>true</code> or
242      * {@link #EMPTY_INDEX_OPTIONS} id {@code unique} is <code>false</code>.
243      * </p>
244      * 
245      * @see #createIndex(String, DocumentAssignable, Element...)
246      */
247     @Override
248     public void createIndex(final String name, final boolean unique,
249             final Element... keys) throws MongoDbException {
250         createIndex(name, unique ? UNIQUE_INDEX_OPTIONS : EMPTY_INDEX_OPTIONS,
251                 keys);
252     }
253 
254     /**
255      * {@inheritDoc}
256      * <p>
257      * Overridden to insert the index document into the 'system.indexes'
258      * collection or use the {@code createIndexes} command as appropriate.
259      * </p>
260      * 
261      * @see MongoCollection#createIndex(String,DocumentAssignable,Element...)
262      */
263     @Override
264     public void createIndex(final String name,
265             final DocumentAssignable options, final Element... keys)
266             throws MongoDbException {
267 
268         if (isCreateIndexesSupported()) {
269             final CreateIndexCommand command = new CreateIndexCommand(
270                     getDatabaseName(), getName(), keys, name, options);
271 
272             final FutureReplyCallback callback = new FutureReplyCallback();
273             myClient.send(command, new ValidatingReplyCallback(callback));
274 
275             FutureUtils.unwrap(callback);
276         }
277         else {
278 
279             String indexName = name;
280             if ((name == null) || name.isEmpty()) {
281                 indexName = CreateIndexCommand.buildIndexName(keys);
282             }
283 
284             final DocumentBuilder indexEntryBuilder = BuilderFactory.start();
285             indexEntryBuilder.addString("name", indexName);
286             indexEntryBuilder.addString("ns", getDatabaseName() + "."
287                     + getName());
288 
289             final DocumentBuilder keyBuilder = indexEntryBuilder.push("key");
290             for (final Element key : keys) {
291                 keyBuilder.add(key);
292             }
293 
294             for (final Element option : options.asDocument()) {
295                 indexEntryBuilder.add(option);
296             }
297 
298             final SynchronousMongoCollectionImpl indexCollection = new SynchronousMongoCollectionImpl(
299                     myClient, myDatabase, "system.indexes");
300             final Document indexDocument = indexEntryBuilder.build();
301             if (indexCollection.findOne(indexDocument) == null) {
302 
303                 final Version requiredServerVersion = determineIndexServerVersion(keys);
304 
305                 final FutureCallback<Integer> callback = new FutureCallback<Integer>();
306                 indexCollection.doInsertAsync(callback, false, Durability.ACK,
307                         requiredServerVersion, indexDocument);
308 
309                 FutureUtils.unwrap(callback);
310             }
311         }
312     }
313 
314     /**
315      * {@inheritDoc}
316      * <p>
317      * Overridden to call the
318      * {@link #delete(DocumentAssignable, boolean, Durability)} method with
319      * false as the <tt>singleDelete</tt> argument and the
320      * {@link #getDurability() default durability}.
321      * </p>
322      * 
323      * @see #delete(DocumentAssignable, boolean, Durability)
324      */
325     @Override
326     public long delete(final DocumentAssignable query) throws MongoDbException {
327         return delete(query, DELETE_SINGLE_DELETE_DEFAULT, getDurability());
328     }
329 
330     /**
331      * {@inheritDoc}
332      * <p>
333      * Overridden to call the
334      * {@link #delete(DocumentAssignable, boolean, Durability)} method with the
335      * {@link #getDurability() default durability}.
336      * </p>
337      * 
338      * @see #delete(DocumentAssignable, boolean, Durability)
339      */
340     @Override
341     public long delete(final DocumentAssignable query,
342             final boolean singleDelete) throws MongoDbException {
343         return delete(query, singleDelete, getDurability());
344     }
345 
346     /**
347      * {@inheritDoc}
348      * <p>
349      * Overridden to call the
350      * {@link #deleteAsync(DocumentAssignable, boolean, Durability)} method.
351      * </p>
352      * 
353      * @see #deleteAsync(DocumentAssignable, boolean, Durability)
354      */
355     @Override
356     public long delete(final DocumentAssignable query,
357             final boolean singleDelete, final Durability durability)
358             throws MongoDbException {
359 
360         final ListenableFuture<Long> future = deleteAsync(query, singleDelete,
361                 durability);
362 
363         return FutureUtils.unwrap(future).longValue();
364     }
365 
366     /**
367      * {@inheritDoc}
368      * <p>
369      * Overridden to call the
370      * {@link #deleteAsync(DocumentAssignable, boolean, Durability)} method with
371      * false as the <tt>singleDelete</tt> argument.
372      * </p>
373      * 
374      * @see #delete(DocumentAssignable, boolean, Durability)
375      */
376     @Override
377     public long delete(final DocumentAssignable query,
378             final Durability durability) throws MongoDbException {
379         return delete(query, DELETE_SINGLE_DELETE_DEFAULT, durability);
380     }
381 
382     /**
383      * {@inheritDoc}
384      * <p>
385      * Overridden to call the {@link #distinctAsync(Distinct)}.
386      * </p>
387      */
388     @Override
389     public MongoIterator<Element> distinct(final Distinct command)
390             throws MongoDbException {
391         return FutureUtils.unwrap(distinctAsync(command));
392     }
393 
394     /**
395      * {@inheritDoc}
396      * <p>
397      * Overridden to call the {@link #distinct(Distinct)}.
398      * </p>
399      */
400     @Override
401     public MongoIterator<Element> distinct(final Distinct.Builder command)
402             throws MongoDbException {
403         return distinct(command.build());
404     }
405 
406     /**
407      * {@inheritDoc}
408      * <p>
409      * Overridden to issue a { "drop" : <collection_name> } command.
410      * </p>
411      * 
412      * @see MongoCollection#drop()
413      */
414     @Override
415     public boolean drop() {
416         final Document result = myDatabase.runCommand("drop", myName, null);
417         final List<NumericElement> okElem = result.find(NumericElement.class,
418                 "ok");
419 
420         return ((okElem.size() > 0) && (okElem.get(0).getIntValue() > 0));
421     }
422 
423     /**
424      * {@inheritDoc}
425      * <p>
426      * To generate the name of the index and then drop it.
427      * </p>
428      */
429     @Override
430     public boolean dropIndex(final IntegerElement... keys)
431             throws MongoDbException {
432         return dropIndex(CreateIndexCommand.buildIndexName(keys));
433     }
434 
435     /**
436      * {@inheritDoc}
437      * <p>
438      * Overridden to issue a { "deleteIndexes" : <collection_name>, index :
439      * <name> } command.
440      * </p>
441      */
442     @Override
443     public boolean dropIndex(final String name) throws MongoDbException {
444 
445         final DocumentBuilder options = BuilderFactory.start();
446         options.addString("index", name);
447 
448         final Document result = myDatabase.runCommand("deleteIndexes", myName,
449                 options.build());
450         final List<NumericElement> okElem = result.find(NumericElement.class,
451                 "ok");
452 
453         return ((okElem.size() > 0) && (okElem.get(0).getIntValue() > 0));
454     }
455 
456     /**
457      * {@inheritDoc}
458      */
459     @Override
460     public boolean exists() throws MongoDbException {
461         return myDatabase.listCollectionNames().contains(getName());
462     }
463 
464     /**
465      * {@inheritDoc}
466      * <p>
467      * Overridden to call the {@link #explainAsync(Aggregate)} method.
468      * </p>
469      */
470     @Override
471     public Document explain(final Aggregate aggregation)
472             throws MongoDbException {
473         return FutureUtils.unwrap(explainAsync(aggregation));
474     }
475 
476     /**
477      * {@inheritDoc}
478      * <p>
479      * Overridden to call the {@link #explainAsync(Aggregate)} method.
480      * </p>
481      */
482     @Override
483     public Document explain(final Aggregate.Builder aggregation)
484             throws MongoDbException {
485         return FutureUtils.unwrap(explainAsync(aggregation.build()));
486 
487     }
488 
489     /**
490      * {@inheritDoc}
491      * <p>
492      * Overridden to call the {@link #explain(Find)} method.
493      * </p>
494      * 
495      * @see #explain(Find)
496      */
497     @Override
498     public Document explain(final DocumentAssignable query)
499             throws MongoDbException {
500         return explain(new Find.Builder(query).build());
501     }
502 
503     /**
504      * {@inheritDoc}
505      * <p>
506      * Overridden to call the {@link #explainAsync(Find)} method.
507      * </p>
508      * 
509      * @see #explainAsync(Find)
510      */
511     @Override
512     public Document explain(final Find query) throws MongoDbException {
513         return FutureUtils.unwrap(explainAsync(query));
514     }
515 
516     /**
517      * {@inheritDoc}
518      * <p>
519      * Overridden to call the {@link #explain(Find)} method.
520      * </p>
521      */
522     @Override
523     public Document explain(final Find.Builder query) throws MongoDbException {
524         return explain(query.build());
525     }
526 
527     /**
528      * {@inheritDoc}
529      * <p>
530      * Overridden to call the {@link #findAsync(DocumentAssignable)} method.
531      * </p>
532      * 
533      * @see #findAsync(DocumentAssignable)
534      */
535     @Override
536     public MongoIterator<Document> find(final DocumentAssignable query)
537             throws MongoDbException {
538         return FutureUtils.unwrap(findAsync(query));
539     }
540 
541     /**
542      * {@inheritDoc}
543      * <p>
544      * Overridden to call the {@link #findAsync(Find)} method.
545      * </p>
546      * 
547      * @see #findAsync(Find)
548      */
549     @Override
550     public MongoIterator<Document> find(final Find query)
551             throws MongoDbException {
552         return FutureUtils.unwrap(findAsync(query));
553     }
554 
555     /**
556      * {@inheritDoc}
557      * <p>
558      * Overridden to call the {@link #find(Find)} method.
559      * </p>
560      */
561     @Override
562     public MongoIterator<Document> find(final Find.Builder query)
563             throws MongoDbException {
564         return find(query.build());
565     }
566 
567     /**
568      * {@inheritDoc}
569      * <p>
570      * Overridden to call the {@link #findAndModifyAsync(FindAndModify)}.
571      * </p>
572      * 
573      * @see #findAndModifyAsync(FindAndModify)
574      */
575     @Override
576     public Document findAndModify(final FindAndModify command)
577             throws MongoDbException {
578         return FutureUtils.unwrap(findAndModifyAsync(command));
579     }
580 
581     /**
582      * {@inheritDoc}
583      * <p>
584      * Overridden to call the {@link #findAndModify(FindAndModify)}.
585      * </p>
586      */
587     @Override
588     public Document findAndModify(final FindAndModify.Builder command)
589             throws MongoDbException {
590         return findAndModify(command.build());
591     }
592 
593     /**
594      * {@inheritDoc}
595      * <p>
596      * Overridden to call the {@link #findOneAsync(DocumentAssignable)}.
597      * </p>
598      * 
599      * @see #findOneAsync(DocumentAssignable)
600      */
601     @Override
602     public Document findOne(final DocumentAssignable query)
603             throws MongoDbException {
604         return FutureUtils.unwrap(findOneAsync(query));
605     }
606 
607     /**
608      * <p>
609      * Overridden to call the
610      * {@link #findOneAsync(com.allanbank.mongodb.Callback, Find)}.
611      * </p>
612      * 
613      * @see #findOneAsync(com.allanbank.mongodb.Callback, Find)
614      */
615     @Override
616     public Document findOne(final Find query) throws MongoDbException {
617         return FutureUtils.unwrap(findOneAsync(query));
618     }
619 
620     /**
621      * {@inheritDoc}
622      * <p>
623      * Overridden to call the {@link #findOne(Find)} method.
624      * </p>
625      */
626     @Override
627     public Document findOne(final Find.Builder query) throws MongoDbException {
628         return findOne(query.build());
629     }
630 
631     /**
632      * {@inheritDoc}
633      * <p>
634      * Overridden to call the {@link #groupByAsync(GroupBy)}.
635      * </p>
636      */
637     @Override
638     public MongoIterator<Element> groupBy(final GroupBy command)
639             throws MongoDbException {
640         return FutureUtils.unwrap(groupByAsync(command));
641     }
642 
643     /**
644      * {@inheritDoc}
645      * <p>
646      * Overridden to call the {@link #groupBy(GroupBy)}.
647      * </p>
648      */
649     @Override
650     public MongoIterator<Element> groupBy(final GroupBy.Builder command)
651             throws MongoDbException {
652         return groupBy(command.build());
653     }
654 
655     /**
656      * {@inheritDoc}
657      * <p>
658      * Overridden to call the
659      * {@link #insert(boolean, Durability, DocumentAssignable...)} method with
660      * the {@link #getDurability() default durability}.
661      * </p>
662      * 
663      * @see #insert(boolean, Durability, DocumentAssignable[])
664      */
665     @Override
666     public int insert(final boolean continueOnError,
667             final DocumentAssignable... documents) throws MongoDbException {
668         return insert(continueOnError, getDurability(), documents);
669     }
670 
671     /**
672      * {@inheritDoc}
673      * <p>
674      * Overridden to call the
675      * {@link #insertAsync(boolean, Durability, DocumentAssignable...)} method.
676      * </p>
677      * 
678      * @see #insertAsync(boolean, Durability, DocumentAssignable[])
679      */
680     @Override
681     public int insert(final boolean continueOnError,
682             final Durability durability, final DocumentAssignable... documents)
683             throws MongoDbException {
684         final ListenableFuture<Integer> future = insertAsync(continueOnError,
685                 durability, documents);
686 
687         return FutureUtils.unwrap(future).intValue();
688     }
689 
690     /**
691      * {@inheritDoc}
692      * <p>
693      * Overridden to call the
694      * {@link #insert(boolean, Durability, DocumentAssignable...)} method with
695      * <tt>continueOnError</tt> set to false and the {@link #getDurability()
696      * default durability}.
697      * </p>
698      * 
699      * @see MongoCollection#insertAsync(boolean, Durability,
700      *      DocumentAssignable[])
701      */
702     @Override
703     public int insert(final DocumentAssignable... documents)
704             throws MongoDbException {
705         return insert(INSERT_CONTINUE_ON_ERROR_DEFAULT, getDurability(),
706                 documents);
707     }
708 
709     /**
710      * {@inheritDoc}
711      * <p>
712      * Overridden to call the
713      * {@link #insert(boolean, Durability, DocumentAssignable...)} method with
714      * <tt>continueOnError</tt> set to false.
715      * </p>
716      * 
717      * @see #insert(boolean, Durability, DocumentAssignable[])
718      */
719     @Override
720     public int insert(final Durability durability,
721             final DocumentAssignable... documents) throws MongoDbException {
722         return insert(INSERT_CONTINUE_ON_ERROR_DEFAULT, durability, documents);
723     }
724 
725     /**
726      * {@inheritDoc}
727      * <p>
728      * Overridden to send a {@code collStats} command to the MongoDB server and
729      * look for the {@code capped} field to determine if the collection is
730      * capped or not.
731      * </p>
732      * 
733      * @see MongoCollection#isCapped
734      */
735     @Override
736     public boolean isCapped() throws MongoDbException {
737         final Document statistics = stats();
738 
739         final NumericElement numeric = statistics.get(NumericElement.class,
740                 "capped");
741         if (numeric != null) {
742             return (numeric.getIntValue() != 0);
743         }
744 
745         final BooleanElement bool = statistics.get(BooleanElement.class,
746                 "capped");
747         if (bool != null) {
748             return bool.getValue();
749         }
750 
751         // Not found implies not capped.
752         return false;
753     }
754 
755     /**
756      * {@inheritDoc}
757      * <p>
758      * Overridden to call the {@link #mapReduceAsync(MapReduce)}.
759      * </p>
760      * 
761      * @see #mapReduceAsync(MapReduce)
762      */
763     @Override
764     public MongoIterator<Document> mapReduce(final MapReduce command)
765             throws MongoDbException {
766         return FutureUtils.unwrap(mapReduceAsync(command));
767     }
768 
769     /**
770      * {@inheritDoc}
771      * <p>
772      * Overridden to call the {@link #mapReduce(MapReduce)}.
773      * </p>
774      */
775     @Override
776     public MongoIterator<Document> mapReduce(final MapReduce.Builder command)
777             throws MongoDbException {
778         return mapReduce(command.build());
779     }
780 
781     /**
782      * {@inheritDoc}
783      * <p>
784      * Overridden to call the {@link #parallelScanAsync(ParallelScan)}.
785      * </p>
786      * 
787      * @see #parallelScanAsync(ParallelScan)
788      */
789     @Override
790     public Collection<MongoIterator<Document>> parallelScan(
791             final ParallelScan parallelScan) throws MongoDbException {
792         return FutureUtils.unwrap(parallelScanAsync(parallelScan));
793     }
794 
795     /**
796      * {@inheritDoc}
797      * <p>
798      * Overridden to call the {@link #parallelScan(ParallelScan)}.
799      * </p>
800      * 
801      * @see #parallelScanAsync(ParallelScan)
802      */
803     @Override
804     public Collection<MongoIterator<Document>> parallelScan(
805             final ParallelScan.Builder parallelScan) throws MongoDbException {
806         return parallelScan(parallelScan.build());
807     }
808 
809     /**
810      * {@inheritDoc}
811      * <p>
812      * Overridden to call the {@link #save(DocumentAssignable, Durability)}
813      * using the {@link #getDurability() default durability}.
814      * </p>
815      */
816     @Override
817     public int save(final DocumentAssignable document) throws MongoDbException {
818         return save(document, getDurability());
819     }
820 
821     /**
822      * {@inheritDoc}
823      * <p>
824      * Overridden to call the {@link #saveAsync(DocumentAssignable, Durability)}
825      * .
826      * </p>
827      */
828     @Override
829     public int save(final DocumentAssignable document,
830             final Durability durability) throws MongoDbException {
831         return FutureUtils.unwrap(saveAsync(document, durability)).intValue();
832     }
833 
834     /**
835      * {@inheritDoc}
836      * <p>
837      * Overridden to return a {@link BatchedAsyncMongoCollection}.
838      * </p>
839      */
840     @Override
841     public BatchedAsyncMongoCollection startBatch() {
842         return new BatchedAsyncMongoCollectionImpl(myClient, myDatabase, myName);
843     }
844 
845     /**
846      * {@inheritDoc}
847      * <p>
848      * Overridden to send a {@code collStats} command to the MongoDB server.
849      * </p>
850      * 
851      * @see MongoCollection#stats
852      */
853     @Override
854     public Document stats() throws MongoDbException {
855         return myDatabase.runCommand("collStats", getName(), null);
856     }
857 
858     /**
859      * {@inheritDoc}
860      * <p>
861      * Overridden to call the
862      * {@link #textSearchAsync(com.allanbank.mongodb.builder.Text)}.
863      * </p>
864      * 
865      * @see #textSearchAsync(com.allanbank.mongodb.builder.Text)
866      * @deprecated Support for the {@code text} command was deprecated in the
867      *             2.6 version of MongoDB. Use the
868      *             {@link ConditionBuilder#text(String) $text} query operator
869      *             instead. This method will not be removed until two releases
870      *             after the MongoDB 2.6 release (e.g. 2.10 if the releases are
871      *             2.8 and 2.10).
872      */
873     @Deprecated
874     @Override
875     public MongoIterator<com.allanbank.mongodb.builder.TextResult> textSearch(
876             final com.allanbank.mongodb.builder.Text command)
877             throws MongoDbException {
878         return FutureUtils.unwrap(textSearchAsync(command));
879     }
880 
881     /**
882      * {@inheritDoc}
883      * <p>
884      * Overridden to call the
885      * {@link #textSearch(com.allanbank.mongodb.builder.Text)}.
886      * </p>
887      * 
888      * @deprecated Support for the {@code text} command was deprecated in the
889      *             2.6 version of MongoDB. Use the
890      *             {@link ConditionBuilder#text(String) $text} query operator
891      *             instead. This method will not be removed until two releases
892      *             after the MongoDB 2.6 release (e.g. 2.10 if the releases are
893      *             2.8 and 2.10).
894      */
895     @Deprecated
896     @Override
897     public MongoIterator<com.allanbank.mongodb.builder.TextResult> textSearch(
898             final com.allanbank.mongodb.builder.Text.Builder command)
899             throws MongoDbException {
900         return textSearch(command.build());
901     }
902 
903     /**
904      * {@inheritDoc}
905      * <p>
906      * Overridden to call the
907      * {@link #update(DocumentAssignable, DocumentAssignable, boolean, boolean, Durability)}
908      * method with multiUpdate set to true, upsert set to false, and using the
909      * {@link #getDurability() default durability}.
910      * </p>
911      * 
912      * @see #update(DocumentAssignable, DocumentAssignable, boolean, boolean,
913      *      Durability)
914      */
915     @Override
916     public long update(final DocumentAssignable query,
917             final DocumentAssignable update) throws MongoDbException {
918         return update(query, update, UPDATE_MULTIUPDATE_DEFAULT,
919                 UPDATE_UPSERT_DEFAULT, getDurability());
920     }
921 
922     /**
923      * {@inheritDoc}
924      * <p>
925      * Overridden to call the
926      * {@link #update(DocumentAssignable, DocumentAssignable, boolean, boolean, Durability)}
927      * method with the {@link #getDurability() default durability}.
928      * </p>
929      * 
930      * @see #update(DocumentAssignable, DocumentAssignable, boolean, boolean,
931      *      Durability)
932      */
933     @Override
934     public long update(final DocumentAssignable query,
935             final DocumentAssignable update, final boolean multiUpdate,
936             final boolean upsert) throws MongoDbException {
937         return update(query, update, multiUpdate, upsert, getDurability());
938     }
939 
940     /**
941      * {@inheritDoc}
942      * <p>
943      * Overridden to call the
944      * {@link #updateAsync(DocumentAssignable, DocumentAssignable, boolean, boolean, Durability)}
945      * method.
946      * </p>
947      * 
948      * @see #updateAsync(DocumentAssignable, DocumentAssignable, boolean,
949      *      boolean, Durability)
950      */
951     @Override
952     public long update(final DocumentAssignable query,
953             final DocumentAssignable update, final boolean multiUpdate,
954             final boolean upsert, final Durability durability)
955             throws MongoDbException {
956 
957         final ListenableFuture<Long> future = updateAsync(query, update,
958                 multiUpdate, upsert, durability);
959 
960         return FutureUtils.unwrap(future).longValue();
961     }
962 
963     /**
964      * {@inheritDoc}
965      * <p>
966      * Overridden to call the
967      * {@link #update(DocumentAssignable, DocumentAssignable, boolean, boolean, Durability)}
968      * method with multiUpdate set to true, and upsert set to false.
969      * </p>
970      * 
971      * @see #update(DocumentAssignable, DocumentAssignable, boolean, boolean,
972      *      Durability)
973      */
974     @Override
975     public long update(final DocumentAssignable query,
976             final DocumentAssignable update, final Durability durability)
977             throws MongoDbException {
978         return update(query, update, UPDATE_MULTIUPDATE_DEFAULT,
979                 UPDATE_UPSERT_DEFAULT, durability);
980     }
981 
982     /**
983      * {@inheritDoc}
984      * <p>
985      * Overridden to send a {@code collMod} command to the server.
986      * </p>
987      */
988     @Override
989     public Document updateOptions(final DocumentAssignable options)
990             throws MongoDbException {
991         final FutureCallback<Document> future = new FutureCallback<Document>(
992                 getLockType());
993 
994         final DocumentBuilder commandDoc = BuilderFactory.start();
995         commandDoc.add("collMod", getName());
996         addOptions("collMod", options, commandDoc);
997 
998         myDatabase.runCommandAsync(future, commandDoc.build(),
999                 Version.VERSION_2_2);
1000 
1001         return FutureUtils.unwrap(future);
1002     }
1003 
1004     /**
1005      * {@inheritDoc}
1006      * <p>
1007      * Overridden to send a {@code validate} command to the server.
1008      * </p>
1009      */
1010     @Override
1011     public Document validate(final ValidateMode mode) throws MongoDbException {
1012         Document result = null;
1013 
1014         switch (mode) {
1015         case INDEX_ONLY:
1016             result = myDatabase.runCommand("validate", getName(),
1017                     BuilderFactory.start().add("scandata", false).build());
1018             break;
1019         case NORMAL:
1020             result = myDatabase.runCommand("validate", getName(), null);
1021             break;
1022         case FULL:
1023             result = myDatabase.runCommand("validate", getName(),
1024                     BuilderFactory.start().add("full", true).build());
1025             break;
1026         }
1027 
1028         return result;
1029     }
1030 
1031     /**
1032      * {@inheritDoc}
1033      * <p>
1034      * Overridden to call the {@link #writeAsync(BatchedWrite)}.
1035      * </p>
1036      * 
1037      * @see #writeAsync(BatchedWrite)
1038      */
1039     @Override
1040     public long write(final BatchedWrite write) throws MongoDbException {
1041         final ListenableFuture<Long> future = writeAsync(write);
1042 
1043         return FutureUtils.unwrap(future).longValue();
1044     }
1045 
1046     /**
1047      * {@inheritDoc}
1048      * <p>
1049      * Overridden to call the {@link #write(BatchedWrite)}.
1050      * </p>
1051      * 
1052      * @see #write(BatchedWrite)
1053      */
1054     @Override
1055     public long write(final BatchedWrite.Builder write) throws MongoDbException {
1056         return write(write.build());
1057     }
1058 
1059     /**
1060      * Adds the options to the document builder.
1061      * 
1062      * @param command
1063      *            The command to make sure is removed from the options.
1064      * @param options
1065      *            The options to be added. May be <code>null</code>.
1066      * @param builder
1067      *            The builder to add the options to.
1068      */
1069     protected void addOptions(final String command,
1070             final DocumentAssignable options, final DocumentBuilder builder) {
1071         if (options != null) {
1072             for (final Element element : options.asDocument()) {
1073                 if (!command.equals(element.getName())) {
1074                     builder.add(element);
1075                 }
1076             }
1077         }
1078     }
1079 
1080     /**
1081      * Determines the minimum server version required to support the provided
1082      * index keys and options.
1083      * 
1084      * @param keys
1085      *            The index keys.
1086      * @return The version required for the index. May be null.
1087      */
1088     protected Version determineIndexServerVersion(final Element[] keys) {
1089         Version result = null;
1090 
1091         for (final Element key : keys) {
1092             if (key.getType() == ElementType.STRING) {
1093                 final String type = key.getValueAsString();
1094                 if (Index.GEO_2DSPHERE_INDEX_NAME.equals(type)) {
1095                     result = Version.later(result, Version.VERSION_2_4);
1096                 }
1097                 else if (Index.HASHED_INDEX_NAME.equals(type)) {
1098                     result = Version.later(result, Version.VERSION_2_4);
1099                 }
1100                 else if (Index.TEXT_INDEX_NAME.equals(type)) {
1101                     result = Version.later(result, Version.VERSION_2_4);
1102                 }
1103             }
1104         }
1105 
1106         return result;
1107     }
1108 
1109     /**
1110      * Determines if all of the servers in the cluster support the
1111      * {@code createIndexes} command.
1112      * 
1113      * @return True if all servers in the cluster are at least the
1114      *         {@link BatchedWrite#REQUIRED_VERSION}.
1115      */
1116     protected boolean isCreateIndexesSupported() {
1117         final ClusterStats clusterStats = myClient.getClusterStats();
1118         final VersionRange serverVersionRange = clusterStats
1119                 .getServerVersionRange();
1120         final Version minServerVersion = serverVersionRange.getLowerBounds();
1121 
1122         return (CreateIndexCommand.REQUIRED_VERSION.compareTo(minServerVersion) <= 0);
1123     }
1124 }