View Javadoc
1   /*
2    * #%L
3    * Aggregate.java - mongodb-async-driver - Allanbank Consulting, Inc.
4    * %%
5    * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6    * %%
7    * Licensed under the Apache License, Version 2.0 (the "License");
8    * you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   * 
11   *      http://www.apache.org/licenses/LICENSE-2.0
12   * 
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   * #L%
19   */
20  
21  package com.allanbank.mongodb.builder;
22  
23  import java.util.ArrayList;
24  import java.util.Arrays;
25  import java.util.Collections;
26  import java.util.List;
27  import java.util.concurrent.TimeUnit;
28  
29  import com.allanbank.mongodb.MongoCollection;
30  import com.allanbank.mongodb.ReadPreference;
31  import com.allanbank.mongodb.Version;
32  import com.allanbank.mongodb.bson.DocumentAssignable;
33  import com.allanbank.mongodb.bson.Element;
34  import com.allanbank.mongodb.bson.builder.ArrayBuilder;
35  import com.allanbank.mongodb.bson.builder.BuilderFactory;
36  import com.allanbank.mongodb.bson.builder.DocumentBuilder;
37  import com.allanbank.mongodb.bson.element.ArrayElement;
38  import com.allanbank.mongodb.bson.element.DocumentElement;
39  import com.allanbank.mongodb.bson.element.IntegerElement;
40  import com.allanbank.mongodb.builder.expression.Expression;
41  import com.allanbank.mongodb.builder.expression.Expressions;
42  
43  /**
44   * Aggregate provides support for the <tt>aggregate</tt> command supporting a
45   * pipeline of commands to execute.
46   * <p>
47   * Instances of this class are constructed via the inner {@link Builder} class.
48   * Due to the potential complexity of pipelines and the associated operators the
49   * <tt>Builder</tt> is intended to be used with the various support classes
50   * including the {@link Expressions} library. For example:<blockquote>
51   * 
52   * <pre>
53   * <code>
54   *  import static {@link AggregationGroupField#set com.allanbank.mongodb.builder.AggregationGroupField.set};
55   *  import static {@link AggregationGroupId#id com.allanbank.mongodb.builder.AggregationGroupId.id};
56   *  import static {@link AggregationProjectFields#includeWithoutId com.allanbank.mongodb.builder.AggregationProjectFields.includeWithoutId};
57   *  import static {@link QueryBuilder#where com.allanbank.mongodb.builder.QueryBuilder.where};
58   *  import static {@link Sort#asc com.allanbank.mongodb.builder.Sort.asc};
59   *  import static {@link Sort#desc com.allanbank.mongodb.builder.Sort.desc};
60   *  import static {@link Expressions#field com.allanbank.mongodb.builder.expression.Expressions.field};
61   *  import static {@link Expressions#set com.allanbank.mongodb.builder.expression.Expressions.set};
62   * 
63   *  DocumentBuilder b1 = BuilderFactory.start();
64   *  DocumentBuilder b2 = BuilderFactory.start();
65   *  Aggregate.Builder builder = new Aggregate.Builder();
66   * 
67   *  builder.match(where("state").notEqualTo("NZ"))
68   *          .group(id().addField("state")
69   *                     .addField("city"),
70   *                 set("pop").sum("pop"))
71   *          .sort(asc("pop"))
72   *          .group(id("_id.state"),
73   *                 set("biggestcity").last("_id.city"),
74   *                 set("biggestpop").last("pop"),
75   *                 set("smallestcity").first("_id.city"),
76   *                 set("smallestpop").first("pop"))
77   *          .project(
78   *                  includeWithoutId(),
79   *                  set("state", field("_id")),
80   *                  set("biggestCity",
81   *                          b1.add(set("name", field("biggestcity"))).add(
82   *                                  set("pop", field("biggestpop")))),
83   *                  set("smallestCity",
84   *                          b2.add(set("name", field("smallestcity"))).add(
85   *                                  set("pop", field("smallestpop")))))
86   *          .sort(desc("biggestCity.pop"));
87   * </code>
88   * </pre>
89   * 
90   * </blockquote>
91   * </p>
92   * 
93   * 
94   * @see <a
95   *      href="http://docs.mongodb.org/manual/tutorial/aggregation-examples/#largest-and-smallest-cities-by-state">Example
96   *      Inspired By</a>
97   * @api.yes This class is part of the driver's API. Public and protected members
98   *          will be deprecated for at least 1 non-bugfix release (version
99   *          numbers are &lt;major&gt;.&lt;minor&gt;.&lt;bugfix&gt;) before being
100  *          removed or modified.
101  * @copyright 2012-2013, Allanbank Consulting, Inc., All Rights Reserved
102  */
103 public class Aggregate {
104 
105     /**
106      * The first version of MongoDB to support the {@code $geoNear} pipeline
107      * operator.
108      */
109     public static final Version ALLOW_DISK_USAGE_REQUIRED_VERSION = Version
110             .parse("2.6");
111 
112     /**
113      * The first version of MongoDB to support the {@code aggregate} command
114      * using a cursor.
115      */
116     public static final Version CURSOR_VERSION = Version.parse("2.5.2");
117 
118     /**
119      * The first version of MongoDB to support the {@code aggregate} command
120      * with the explain option.
121      */
122     public static final Version EXPLAIN_VERSION = Version.parse("2.5.3");
123 
124     /**
125      * The first version of MongoDB to support the {@code $geoNear} pipeline
126      * operator.
127      */
128     public static final Version GEO_NEAR_REQUIRED_VERSION = Version.VERSION_2_4;
129 
130     /**
131      * The first version of MongoDB to support the {@code aggregate} command
132      * with the ability to limit the execution time on the server.
133      */
134     public static final Version MAX_TIMEOUT_VERSION = Find.MAX_TIMEOUT_VERSION;
135 
136     /**
137      * The first version of MongoDB to support the {@code $redact} pipeline
138      * operator.
139      */
140     public static final Version REDACT_REQUIRED_VERSION = Version
141             .parse("2.5.2");
142 
143     /** The first version of MongoDB to support the {@code aggregate} command. */
144     public static final Version REQUIRED_VERSION = Version.parse("2.1.0");
145 
146     /**
147      * Creates a new builder for a {@link Aggregate}.
148      * 
149      * @return The builder to construct a {@link Aggregate}.
150      */
151     public static Builder builder() {
152         return new Builder();
153     }
154 
155     /**
156      * Set to true if the aggregation results should be allowed to spill to
157      * disk.
158      */
159     private final boolean myAllowDiskUsage;
160 
161     /** The number of documents to be returned in each batch of results. */
162     private final int myBatchSize;
163 
164     /** The total number of documents to be returned. */
165     private final int myLimit;
166 
167     /** The maximum amount of time to allow the command to run. */
168     private final long myMaximumTimeMilliseconds;
169 
170     /** The pipeline of operations to be applied. */
171     private final List<Element> myPipeline;
172 
173     /** The read preference to use. */
174     private final ReadPreference myReadPreference;
175 
176     /** The version required for the aggregation. */
177     private final Version myRequiredVersion;
178 
179     /** Set to true if the aggregation results should be returned as a cursor. */
180     private final boolean myUseCursor;
181 
182     /**
183      * Creates a new Aggregation.
184      * 
185      * @param builder
186      *            The builder for the Aggregation instance.
187      */
188     protected Aggregate(final Builder builder) {
189         myPipeline = Collections.unmodifiableList(Arrays
190                 .asList(builder.myPipeline.build()));
191         myBatchSize = builder.myBatchSize;
192         myLimit = builder.myLimit;
193         myUseCursor = builder.myUseCursor;
194         myAllowDiskUsage = builder.myAllowDiskUsage;
195         myReadPreference = builder.myReadPreference;
196         myRequiredVersion = builder.myRequiredVersion;
197         myMaximumTimeMilliseconds = builder.myMaximumTimeMilliseconds;
198     }
199 
200     /**
201      * Returns the number of documents to be returned in each batch of results
202      * by the cursor.
203      * 
204      * @return The number of documents to be returned in each batch of results
205      *         by the cursor.
206      */
207     public int getBatchSize() {
208         return myBatchSize;
209     }
210 
211     /**
212      * Returns the total number of documents to be returned by the cursor.
213      * 
214      * @return The total number of documents to be returned the cursor.
215      */
216     public int getCursorLimit() {
217         return myLimit;
218     }
219 
220     /**
221      * Returns the maximum amount of time to allow the command to run on the
222      * Server before it is aborted.
223      * 
224      * @return The maximum amount of time to allow the command to run on the
225      *         Server before it is aborted.
226      * 
227      * @since MongoDB 2.6
228      */
229     public long getMaximumTimeMilliseconds() {
230         return myMaximumTimeMilliseconds;
231     }
232 
233     /**
234      * Returns the pipeline of operations to apply.
235      * 
236      * @return The pipeline of operations to apply.
237      */
238     public List<Element> getPipeline() {
239         return myPipeline;
240     }
241 
242     /**
243      * Returns the {@link ReadPreference} specifying which servers may be used
244      * to execute the aggregation.
245      * <p>
246      * If <code>null</code> then the {@link MongoCollection} instance's
247      * {@link ReadPreference} will be used.
248      * </p>
249      * 
250      * @return The read preference to use.
251      * 
252      * @see MongoCollection#getReadPreference()
253      */
254     public ReadPreference getReadPreference() {
255         return myReadPreference;
256     }
257 
258     /**
259      * Returns the version required for the aggregation.
260      * 
261      * @return The version required for the aggregation.
262      */
263     public Version getRequiredVersion() {
264         return myRequiredVersion;
265     }
266 
267     /**
268      * Returns true if the aggregation results should be allowed to spill to
269      * disk.
270      * 
271      * @return True if the aggregation results should be allowed to spill to
272      *         disk.
273      */
274     public boolean isAllowDiskUsage() {
275         return myAllowDiskUsage;
276     }
277 
278     /**
279      * Returns true if the aggregation results should be returned as a cursor.
280      * 
281      * @return True if the aggregation results should be returned as a cursor.
282      */
283     public boolean isUseCursor() {
284         return myUseCursor;
285     }
286 
287     /**
288      * Builder provides the ability to construct aggregate command pipelines.
289      * <p>
290      * Methods are provided for all existing pipeline operators and generic
291      * {@link #step} methods are provided to support future pipeline operators
292      * while in development or before the driver is updated.
293      * </p>
294      * <p>
295      * This builder is intended to be used with the various support classes
296      * including the {@link Expressions} library. For example:<blockquote>
297      * 
298      * <pre>
299      * <code>
300      *  import static {@link AggregationGroupField#set com.allanbank.mongodb.builder.AggregationGroupField.set};
301      *  import static {@link AggregationGroupId#id com.allanbank.mongodb.builder.AggregationGroupId.id};
302      *  import static {@link AggregationProjectFields#includeWithoutId com.allanbank.mongodb.builder.AggregationProjectFields.includeWithoutId};
303      *  import static {@link QueryBuilder#where com.allanbank.mongodb.builder.QueryBuilder.where};
304      *  import static {@link Sort#asc com.allanbank.mongodb.builder.Sort.asc};
305      *  import static {@link Sort#desc com.allanbank.mongodb.builder.Sort.desc};
306      *  import static {@link Expressions#field com.allanbank.mongodb.builder.expression.Expressions.field};
307      *  import static {@link Expressions#set com.allanbank.mongodb.builder.expression.Expressions.set};
308      * 
309      *  DocumentBuilder b1 = BuilderFactory.start();
310      *  DocumentBuilder b2 = BuilderFactory.start();
311      *  Aggregation.Builder builder = new Aggregation.Builder();
312      * 
313      *  builder.match(where("state").notEqualTo("NZ"))
314      *          .group(id().addField("state")
315      *                     .addField("city"),
316      *                 set("pop").sum("pop"))
317      *          .sort(asc("pop"))
318      *          .group(id("_id.state"),
319      *                 set("biggestcity").last("_id.city"),
320      *                 set("biggestpop").last("pop"),
321      *                 set("smallestcity").first("_id.city"),
322      *                 set("smallestpop").first("pop"))
323      *          .project(
324      *                  includeWithoutId(),
325      *                  set("state", field("_id")),
326      *                  set("biggestCity",
327      *                          b1.add(set("name", field("biggestcity"))).add(
328      *                                  set("pop", field("biggestpop")))),
329      *                  set("smallestCity",
330      *                          b2.add(set("name", field("smallestcity"))).add(
331      *                                  set("pop", field("smallestpop")))))
332      *          .sort(desc("biggestCity.pop"));
333      * </code>
334      * </pre>
335      * 
336      * </blockquote>
337      * </p>
338      * 
339      * @see <a
340      *      href="http://docs.mongodb.org/manual/tutorial/aggregation-examples/#largest-and-smallest-cities-by-state">Example
341      *      Inspired By</a>
342      * @api.yes This class is part of the driver's API. Public and protected
343      *          members will be deprecated for at least 1 non-bugfix release
344      *          (version numbers are &lt;major&gt;.&lt;minor&gt;.&lt;bugfix&gt;)
345      *          before being removed or modified.
346      * @copyright 2012-2013, Allanbank Consulting, Inc., All Rights Reserved
347      */
348     public static class Builder {
349 
350         /**
351          * Set to true if the aggregation results should be allowed to spill to
352          * disk.
353          */
354         protected boolean myAllowDiskUsage;
355 
356         /** The number of documents to be returned in each batch of results. */
357         protected int myBatchSize;
358 
359         /** The total number of documents to be returned. */
360         protected int myLimit;
361 
362         /** The maximum amount of time to allow the command to run. */
363         protected long myMaximumTimeMilliseconds;
364 
365         /** The pipeline of operations to be applied. */
366         protected final ArrayBuilder myPipeline;
367 
368         /** The read preference to use. */
369         protected ReadPreference myReadPreference;
370 
371         /**
372          * The version required for the aggregation. This is computed
373          * automatically based on the pipeline constructed.
374          */
375         protected Version myRequiredVersion;
376 
377         /**
378          * Set to true if the aggregation results should be returned as a
379          * cursor.
380          */
381         protected boolean myUseCursor;
382 
383         /**
384          * Creates a new Builder.
385          */
386         public Builder() {
387             myPipeline = BuilderFactory.startArray();
388             reset();
389         }
390 
391         /**
392          * Allows the aggregation command can spill to disk.
393          * <p>
394          * This method delegates to {@link #allowDiskUsage(boolean)
395          * allowDiskUsage(true)}.
396          * </p>
397          * <p>
398          * This method also sets the builder to use a cursor to true.
399          * </p>
400          * 
401          * @return This builder for chaining method calls.
402          */
403         public Builder allowDiskUsage() {
404             return allowDiskUsage(true);
405         }
406 
407         /**
408          * Sets to true if the aggregation command can spill to disk.
409          * <p>
410          * This method delegates to {@link #setAllowDiskUsage(boolean)}.
411          * </p>
412          * <p>
413          * This method also sets the builder to use a cursor to true.
414          * </p>
415          * 
416          * @param allowDiskUsage
417          *            The new value for if the aggregation command can spill to
418          *            disk.
419          * @return This builder for chaining method calls.
420          */
421         public Builder allowDiskUsage(final boolean allowDiskUsage) {
422             return setAllowDiskUsage(allowDiskUsage);
423         }
424 
425         /**
426          * Sets the value of the number of documents to be returned in each
427          * batch.
428          * <p>
429          * This method delegates to {@link #setBatchSize(int)}.
430          * </p>
431          * <p>
432          * This method also sets the builder to use a cursor to true.
433          * </p>
434          * 
435          * @param batchSize
436          *            The new value for the number of documents to be returned
437          *            in each batch.
438          * @return This builder for chaining method calls.
439          */
440         public Builder batchSize(final int batchSize) {
441             return setBatchSize(batchSize);
442         }
443 
444         /**
445          * Constructs a new {@link Aggregate} object from the state of the
446          * builder.
447          * 
448          * @return The new {@link Aggregate} object.
449          */
450         public Aggregate build() {
451             return new Aggregate(this);
452         }
453 
454         /**
455          * Sets the value of the total number of documents to be returned.
456          * <p>
457          * This method delegates to {@link #setCusorLimit(int)}.
458          * </p>
459          * <p>
460          * This method also sets the builder to use a cursor to true.
461          * </p>
462          * 
463          * @param limit
464          *            The new value for the total number of documents to be
465          *            returned.
466          * @return This builder for chaining method calls.
467          */
468         public Builder cursorLimit(final int limit) {
469             return setCusorLimit(limit);
470         }
471 
472         /**
473          * Adds a <tt>$geoNear</tt> operation to the pipeline to select
474          * documents for the aggregation pipeline based on their relative
475          * location to a set point. The <tt>$geoNear</tt> must be the first
476          * option in the aggregation pipeline. <blockquote>
477          * 
478          * <pre>
479          * <code>
480          * import {@link AggregationGeoNear com.allanbank.mongodb.builder.AggregationGeoNear};
481          * 
482          * {@link Aggregate.Builder} builder = new Aggregation.Builder();
483          * builder.geoNear( AggregationGeoNear.builder()
484          *           .location( new Point( 1, 2 ) )
485          *           .distanceLocationField( "stats.distance" )
486          *           .limit( 5 ).build() );
487          * </code>
488          * </pre>
489          * 
490          * </blockquote>
491          * 
492          * @param geoNear
493          *            The options for the GeoNear operation.
494          * @return This builder for chaining method calls.
495          * 
496          * @since MongoDB 2.4
497          */
498         public Builder geoNear(final AggregationGeoNear geoNear) {
499             myRequiredVersion = Version.later(myRequiredVersion,
500                     GEO_NEAR_REQUIRED_VERSION);
501 
502             return step("$geoNear", geoNear.asDocument());
503         }
504 
505         /**
506          * Adds a <tt>$geoNear</tt> operation to the pipeline to select
507          * documents for the aggregation pipeline based on their relative
508          * location to a set point. The <tt>$geoNear</tt> must be the first
509          * option in the aggregation pipeline. <blockquote>
510          * 
511          * <pre>
512          * <code>
513          * import {@link AggregationGeoNear com.allanbank.mongodb.builder.AggregationGeoNear};
514          * 
515          * {@link Aggregate.Builder} builder = new Aggregation.Builder();
516          * builder.geoNear( AggregationGeoNear.builder()
517          *           .location( new Point( 1, 2 ) )
518          *           .distanceLocationField( "stats.distance" )
519          *           .limit( 5 ) );
520          * </code>
521          * </pre>
522          * 
523          * </blockquote>
524          * 
525          * @param geoNear
526          *            The options for the GeoNear operation.
527          * @return This builder for chaining method calls.
528          * 
529          * @since MongoDB 2.4
530          */
531         public Builder geoNear(final AggregationGeoNear.Builder geoNear) {
532             return geoNear(geoNear.build());
533         }
534 
535         /**
536          * Adds a <tt>$group</tt> operation to the pipeline to aggregate
537          * documents passing this point in the pipeline into a group of
538          * documents.
539          * <p>
540          * This method is intended to construct groups with simple dynamic or
541          * static id documents.
542          * </p>
543          * <blockquote>
544          * 
545          * <pre>
546          * <code>
547          * import static {@link AggregationGroupId#id com.allanbank.mongodb.builder.AggregationGroupId.id};
548          * import static {@link AggregationGroupField#set com.allanbank.mongodb.builder.AggregationGroupField.set};
549          * 
550          * {@link Aggregate.Builder} builder = new Aggregation.Builder();
551          * builder.group(
552          *           id("$field1"),
553          *           set("resultField1").uniqueValuesOf("$field2"),
554          *           set("resultField2").max("$field3"),
555          *           set("sum").sum("$field4") );
556          * </code>
557          * </pre>
558          * 
559          * </blockquote>
560          * 
561          * @param id
562          *            The builder for the <tt>_id</tt> field to specify unique
563          *            groups.
564          * @param aggregations
565          *            The specification for the group id and what fields to
566          *            aggregate in the form of a document.
567          * @return This builder for chaining method calls.
568          */
569         public Builder group(final AggregationGroupId id,
570                 final AggregationGroupField... aggregations) {
571 
572             final Element[] elements = new Element[aggregations.length + 1];
573             elements[0] = id.toElement();
574             for (int i = 0; i < aggregations.length; ++i) {
575                 elements[i + 1] = aggregations[i].toElement();
576             }
577 
578             return step("$group", elements);
579         }
580 
581         /**
582          * Adds a <tt>$group</tt> operation to the pipeline to aggregate
583          * documents passing this point in the pipeline into a group of
584          * documents.
585          * <p>
586          * This method is intended to construct groups with complex dynamic or
587          * static id documents. The {@link AggregationGroupId.Builder}
588          * implements the {@link DocumentBuilder} for construction of arbitrary
589          * complex _id documents.
590          * </p>
591          * <blockquote>
592          * 
593          * <pre>
594          * <code>
595          * import static {@link AggregationGroupId#id com.allanbank.mongodb.builder.AggregationGroupId.id};
596          * import static {@link AggregationGroupField#set com.allanbank.mongodb.builder.AggregationGroupField.set};
597          * 
598          * {@link Aggregate.Builder} builder = new Aggregation.Builder();
599          * builder.group(
600          *           id().addField("$field1").addField("$field2"),
601          *           set("resultField1").uniqueValuesOf("$field3"),
602          *           set("resultField2").first("$field4"),
603          *           set("count").count() );
604          * </code>
605          * </pre>
606          * 
607          * </blockquote>
608          * 
609          * @param id
610          *            The builder for the <tt>_id</tt> field to specify unique
611          *            groups.
612          * @param aggregations
613          *            The specification for the group id and what fields to
614          *            aggregate in the form of a document.
615          * @return This builder for chaining method calls.
616          */
617         public Builder group(final AggregationGroupId.Builder id,
618                 final AggregationGroupField... aggregations) {
619             return group(id.buildId(), aggregations);
620         }
621 
622         /**
623          * Adds a <tt>$group</tt> operation to the pipeline to aggregate
624          * documents passing this point in the pipeline into a group of
625          * documents.
626          * 
627          * @param aggregations
628          *            The specification for the group id and what fields to
629          *            aggregate in the form of a document.
630          * @return This builder for chaining method calls.
631          */
632         public Builder group(final DocumentAssignable aggregations) {
633             return step("$group", aggregations);
634         }
635 
636         /**
637          * Adds a <tt>$group</tt> operation to the pipeline to aggregate
638          * documents passing this point in the pipeline into a group of
639          * documents.
640          * <p>
641          * This method is intended to construct groups with complex dynamic or
642          * static id documents. The {@link AggregationGroupId.Builder}
643          * implements the {@link DocumentBuilder} for construction of arbitrary
644          * complex _id documents.
645          * </p>
646          * <blockquote>
647          * 
648          * <pre>
649          * <code>
650          * import static {@link AggregationGroupId#id com.allanbank.mongodb.builder.AggregationGroupId.id};
651          * import static {@link AggregationGroupField#set com.allanbank.mongodb.builder.AggregationGroupField.set};
652          * 
653          * {@link Aggregate.Builder} builder = new Aggregation.Builder();
654          * builder.group(
655          *           id().addInteger("i", 1),
656          *           set("resultField1").uniqueValuesOf("$field3"),
657          *           set("resultField2").first("$field4"),
658          *           set("count").count() );
659          * </code>
660          * </pre>
661          * 
662          * </blockquote>
663          * 
664          * @param id
665          *            The builder for the <tt>_id</tt> field to specify unique
666          *            groups.
667          * @param aggregations
668          *            The specification for the group id and what fields to
669          *            aggregate in the form of a document.
670          * @return This builder for chaining method calls.
671          */
672         public Builder group(final DocumentAssignable id,
673                 final AggregationGroupField... aggregations) {
674             return group(new AggregationGroupId(id), aggregations);
675         }
676 
677         /**
678          * Adds a <tt>$limit</tt> operation to the pipeline to stop producing
679          * documents passing this point in the pipeline once the limit of
680          * documents is reached.
681          * 
682          * @param numberOfDocuments
683          *            The number of documents to allow past this point in the
684          *            pipeline.
685          * @return This builder for chaining method calls.
686          * 
687          * @see <a
688          *      href="http://docs.mongodb.org/manual/reference/aggregation/#_S_limit">Aggregation
689          *      Framework Operators - $limit</a>
690          */
691         public Builder limit(final int numberOfDocuments) {
692             return step("$limit", numberOfDocuments);
693         }
694 
695         /**
696          * Adds a <tt>$limit</tt> operation to the pipeline to stop producing
697          * documents passing this point in the pipeline once the limit of
698          * documents is reached.
699          * 
700          * @param numberOfDocuments
701          *            The number of documents to allow past this point in the
702          *            pipeline.
703          * @return This builder for chaining method calls.
704          * 
705          * @see <a
706          *      href="http://docs.mongodb.org/manual/reference/aggregation/#_S_limit">
707          *      Aggregation Framework Operators - $limit</a>
708          */
709         public Builder limit(final long numberOfDocuments) {
710             return step("$limit", numberOfDocuments);
711         }
712 
713         /**
714          * Adds a <tt>$match</tt> operation to the pipeline to filter documents
715          * passing this point in the pipeline.
716          * <p>
717          * This method may be used with the {@link QueryBuilder} to easily
718          * specify the criteria to match against. <blockquote>
719          * 
720          * <pre>
721          * <code>
722          * import static {@link QueryBuilder#where com.allanbank.mongodb.builder.QueryBuilder.where}
723          * 
724          * Aggregation.Builder builder = new Aggregation.Builder();
725          * 
726          * builder.match( where("f").greaterThan(23).lessThan(42).and("g").lessThan(3) );
727          * ...
728          * </code>
729          * </pre>
730          * 
731          * </blockquote>
732          * </p>
733          * 
734          * @param query
735          *            The query to match documents against.
736          * @return This builder for chaining method calls.
737          * 
738          * @see <a
739          *      href="http://docs.mongodb.org/manual/reference/aggregation/#_S_match">
740          *      Aggregation Framework Operators - $match</a>
741          */
742         public Builder match(final DocumentAssignable query) {
743             return step("$match", query);
744         }
745 
746         /**
747          * Sets the maximum number of milliseconds to allow the command to run
748          * before aborting the request on the server.
749          * <p>
750          * This method equivalent to {@link #setMaximumTimeMilliseconds(long)
751          * setMaximumTimeMilliseconds(timeLimitUnits.toMillis(timeLimit)}.
752          * </p>
753          * 
754          * @param timeLimit
755          *            The new maximum amount of time to allow the command to
756          *            run.
757          * @param timeLimitUnits
758          *            The units for the maximum amount of time to allow the
759          *            command to run.
760          * 
761          * @return This {@link Builder} for method call chaining.
762          * 
763          * @since MongoDB 2.6
764          */
765         public Builder maximumTime(final long timeLimit,
766                 final TimeUnit timeLimitUnits) {
767             return setMaximumTimeMilliseconds(timeLimitUnits
768                     .toMillis(timeLimit));
769         }
770 
771         /**
772          * Adds a <tt>$out</tt> operation to the pipeline to write all of the
773          * output documents to the specified collection.
774          * <p>
775          * This method also forces the {@link ReadPreference} to be
776          * {@link ReadPreference#PRIMARY}.
777          * </p>
778          * 
779          * @param collectionName
780          *            The name of the collection to output the results to.
781          * @return This builder for chaining method calls.
782          * 
783          * @see <a
784          *      href="http://docs.mongodb.org/master/reference/operator/aggregation/out">Aggregation
785          *      $out Operator</a>
786          */
787         public Builder out(final String collectionName) {
788             setReadPreference(ReadPreference.PRIMARY);
789             return step("$out", collectionName);
790         }
791 
792         /**
793          * Adds a <tt>$project</tt> operation to the pipeline to create a
794          * projection of the documents passing this point in the pipeline.
795          * <p>
796          * This method is intended to be used with the
797          * {@link AggregationProjectFields} and
798          * {@link com.allanbank.mongodb.builder.expression.Expressions
799          * Expressions} static helper methods.
800          * </p>
801          * <blockquote>
802          * 
803          * <pre>
804          * <code>
805          * import static {@link AggregationProjectFields#include com.allanbank.mongodb.builder.AggregationProjectFields.include};
806          * import static {@link com.allanbank.mongodb.builder.expression.Expressions com.allanbank.mongodb.builder.expression.Expressions.*};
807          * 
808          * 
809          * Aggregation.Builder builder = new Aggregation.Builder();
810          * ...
811          * builder.project(
812          *         include("chr", "begin", "end", "calledPloidy"),
813          *         set("window",
814          *             multiply(
815          *                 divide(
816          *                     subtract(
817          *                         field("begin"),
818          *                         mod(field("begin"), constant(interval))),
819          *                     constant(interval)),
820          *                 constant(interval))));
821          * ...
822          * </code>
823          * </pre>
824          * 
825          * </blockquote>
826          * 
827          * @param fields
828          *            The fields to copy into the projected results.
829          * @param elements
830          *            The computed elements based on {@link Expressions}.
831          * @return This builder for chaining method calls.
832          */
833         public Builder project(final AggregationProjectFields fields,
834                 final Element... elements) {
835             final List<IntegerElement> fieldElements = fields.toElements();
836 
837             final List<Element> allElements = new ArrayList<Element>(
838                     fieldElements.size() + elements.length);
839             allElements.addAll(fieldElements);
840             allElements.addAll(Arrays.asList(elements));
841 
842             return step("$project", allElements);
843         }
844 
845         /**
846          * Adds a <tt>$project</tt> operation to the pipeline to create a
847          * projection of the documents passing this point in the pipeline.
848          * 
849          * @param projection
850          *            The specification for the projection to perform.
851          * @return This builder for chaining method calls.
852          */
853         public Builder project(final DocumentAssignable projection) {
854             return step("$project", projection);
855         }
856 
857         /**
858          * Adds a <tt>$redact</tt> operation to potentially prune sub-documents
859          * from the results.
860          * 
861          * @param ifExpression
862          *            The expression to evaluate to determine if the current
863          *            sub-document should be pruned or not.
864          * @param thenOption
865          *            Operation to apply if the {@code ifExpression} evaluates
866          *            to true.
867          * @param elseOption
868          *            Operation to apply if the {@code ifExpression} evaluates
869          *            to false.
870          * @return This builder for chaining method calls.
871          */
872         public Builder redact(final DocumentAssignable ifExpression,
873                 final RedactOption thenOption, final RedactOption elseOption) {
874             myRequiredVersion = Version.later(myRequiredVersion,
875                     REDACT_REQUIRED_VERSION);
876 
877             final DocumentBuilder doc = BuilderFactory.start();
878             doc.push(Expressions.CONDITION)
879                     .add(new DocumentElement("if", ifExpression.asDocument()))
880                     .add("then", thenOption.getToken())
881                     .add("else", elseOption.getToken());
882 
883             return step("$redact", doc);
884         }
885 
886         /**
887          * Adds a <tt>$redact</tt> operation to potentially prune sub-documents
888          * from the results.
889          * 
890          * @param ifExpression
891          *            The expression to evaluate to determine if the current
892          *            sub-document should be pruned or not.
893          * @param thenOption
894          *            Operation to apply if the {@code ifExpression} evaluates
895          *            to true.
896          * @param elseOption
897          *            Operation to apply if the {@code ifExpression} evaluates
898          *            to false.
899          * @return This builder for chaining method calls.
900          */
901         public Builder redact(final Expression ifExpression,
902                 final RedactOption thenOption, final RedactOption elseOption) {
903             myRequiredVersion = Version.later(myRequiredVersion,
904                     REDACT_REQUIRED_VERSION);
905 
906             final DocumentBuilder doc = BuilderFactory.start();
907             doc.push(Expressions.CONDITION).add(ifExpression.toElement("if"))
908                     .add("then", thenOption.getToken())
909                     .add("else", elseOption.getToken());
910 
911             return step("$redact", doc);
912         }
913 
914         /**
915          * Resets the builder back to an empty pipeline.
916          * 
917          * @return This builder for chaining method calls.
918          */
919         public Builder reset() {
920             myPipeline.reset();
921             myReadPreference = null;
922             myMaximumTimeMilliseconds = 0;
923             myBatchSize = 0;
924             myLimit = 0;
925             myUseCursor = false;
926             myAllowDiskUsage = false;
927             myRequiredVersion = REQUIRED_VERSION;
928 
929             return this;
930         }
931 
932         /**
933          * Sets to true if the aggregation command can spill to disk.
934          * <p>
935          * This method also sets the builder to use a cursor to true.
936          * </p>
937          * 
938          * @param allowDiskUsage
939          *            The new value for if the aggregation command can spill to
940          *            disk.
941          * @return This builder for chaining method calls.
942          */
943         public Builder setAllowDiskUsage(final boolean allowDiskUsage) {
944             myRequiredVersion = Version.later(myRequiredVersion,
945                     ALLOW_DISK_USAGE_REQUIRED_VERSION);
946 
947             myAllowDiskUsage = allowDiskUsage;
948             return setUseCursor(true);
949         }
950 
951         /**
952          * Sets the value of the number of documents to be returned in each
953          * batch.
954          * <p>
955          * This method also sets the builder to use a cursor to true.
956          * </p>
957          * 
958          * @param batchSize
959          *            The new value for the number of documents to be returned
960          *            in each batch.
961          * @return This builder for chaining method calls.
962          */
963         public Builder setBatchSize(final int batchSize) {
964             myBatchSize = batchSize;
965             return setUseCursor(true);
966         }
967 
968         /**
969          * Sets the value of the total number of documents to be returned.
970          * <p>
971          * This method also sets the builder to use a cursor to true.
972          * </p>
973          * 
974          * @param limit
975          *            The new value for the total number of documents to be
976          *            returned.
977          * @return This builder for chaining method calls.
978          */
979         public Builder setCusorLimit(final int limit) {
980             myLimit = limit;
981             return setUseCursor(true);
982         }
983 
984         /**
985          * Sets the maximum number of milliseconds to allow the command to run
986          * before aborting the request on the server.
987          * 
988          * @param maximumTimeMilliseconds
989          *            The new maximum number of milliseconds to allow the
990          *            command to run.
991          * @return This {@link Builder} for method call chaining.
992          * 
993          * @since MongoDB 2.6
994          */
995         public Builder setMaximumTimeMilliseconds(
996                 final long maximumTimeMilliseconds) {
997             myRequiredVersion = Version.later(myRequiredVersion,
998                     MAX_TIMEOUT_VERSION);
999 
1000             myMaximumTimeMilliseconds = maximumTimeMilliseconds;
1001             return this;
1002         }
1003 
1004         /**
1005          * Sets the {@link ReadPreference} specifying which servers may be used
1006          * to execute the aggregation.
1007          * <p>
1008          * If not set or set to <code>null</code> then the
1009          * {@link MongoCollection} instance's {@link ReadPreference} will be
1010          * used.
1011          * </p>
1012          * 
1013          * @param readPreference
1014          *            The read preferences specifying which servers may be used.
1015          * @return This builder for chaining method calls.
1016          * 
1017          * @see MongoCollection#getReadPreference()
1018          */
1019         public Builder setReadPreference(final ReadPreference readPreference) {
1020             myReadPreference = readPreference;
1021             return this;
1022         }
1023 
1024         /**
1025          * Sets to true if the aggregation results should be returned as a
1026          * cursor.
1027          * 
1028          * @param useCursor
1029          *            The new value for if the results should be returned via a
1030          *            cursor.
1031          * @return This builder for chaining method calls.
1032          */
1033         public Builder setUseCursor(final boolean useCursor) {
1034             myRequiredVersion = Version
1035                     .later(myRequiredVersion, CURSOR_VERSION);
1036             myUseCursor = useCursor;
1037             return this;
1038         }
1039 
1040         /**
1041          * Adds a <tt>$skip</tt> operation to the pipeline to skip the specified
1042          * number of documents before allowing any document past this point in
1043          * the pipeline.
1044          * 
1045          * @param numberOfDocuments
1046          *            The number of documents to skip past before allowing any
1047          *            documents to pass this point in the pipeline.
1048          * @return This builder for chaining method calls.
1049          * 
1050          * @see <a
1051          *      href="http://docs.mongodb.org/manual/reference/aggregation/#_S_skip">
1052          *      Aggregation Framework Operators - $skip</a>
1053          */
1054         public Builder skip(final int numberOfDocuments) {
1055             return step("$skip", numberOfDocuments);
1056         }
1057 
1058         /**
1059          * Adds a <tt>$skip</tt> operation to the pipeline to skip the specified
1060          * number of documents before allowing any document past this point in
1061          * the pipeline.
1062          * 
1063          * @param numberOfDocuments
1064          *            The number of documents to skip past before allowing any
1065          *            documents to pass this point in the pipeline.
1066          * @return This builder for chaining method calls.
1067          * 
1068          * @see <a
1069          *      href="http://docs.mongodb.org/manual/reference/aggregation/#_S_skip">
1070          *      Aggregation Framework Operators - $skip</a>
1071          */
1072         public Builder skip(final long numberOfDocuments) {
1073             return step("$skip", numberOfDocuments);
1074         }
1075 
1076         /**
1077          * Adds a <tt>$sort</tt> operation to sort the documents passing this
1078          * point based on the sort specification provided.
1079          * <p>
1080          * This method is intended to be used with the {@link Sort} class's
1081          * static methods: <blockquote>
1082          * 
1083          * <pre>
1084          * <code>
1085          * import static {@link Sort#asc(String) com.allanbank.mongodb.builder.Sort.asc};
1086          * import static {@link Sort#desc(String) com.allanbank.mongodb.builder.Sort.desc};
1087          * 
1088          * Aggregation.Builder builder = new Aggregation.Builder();
1089          * 
1090          * builder.setSort( asc("f"), desc("g") );
1091          * ...
1092          * </code>
1093          * </pre>
1094          * 
1095          * </blockquote>
1096          * 
1097          * @param sortFields
1098          *            The sort fields to use.
1099          * @return This builder for chaining method calls.
1100          * 
1101          * @see <a
1102          *      href="http://docs.mongodb.org/manual/reference/aggregation/#_S_sort">
1103          *      Aggregation Framework Operators - $sort</a>
1104          */
1105         public Builder sort(final IntegerElement... sortFields) {
1106             return step("$sort", sortFields);
1107         }
1108 
1109         /**
1110          * Adds a <tt>$sort</tt> operation to sort the documents passing this
1111          * point based on the sort fields provides in ascending order.
1112          * 
1113          * @param sortFields
1114          *            The sort fields to use in ascending order.
1115          * @return This builder for chaining method calls.
1116          * 
1117          * @see <a
1118          *      href="http://docs.mongodb.org/manual/reference/aggregation/#_S_sort">
1119          *      Aggregation Framework Operators - $sort</a>
1120          */
1121         public Builder sort(final String... sortFields) {
1122             final IntegerElement[] elements = new IntegerElement[sortFields.length];
1123             for (int i = 0; i < sortFields.length; ++i) {
1124                 elements[i] = Sort.asc(sortFields[i]);
1125             }
1126             return sort(elements);
1127         }
1128 
1129         /**
1130          * Adds a generic step to the builder's pipeline.
1131          * 
1132          * @param operator
1133          *            The operator to add to the pipeline.
1134          * @param stepDocument
1135          *            The document containing the details of the step to apply.
1136          * @return This builder for chaining method calls.
1137          */
1138         public Builder step(final String operator,
1139                 final DocumentAssignable stepDocument) {
1140             myPipeline.push().addDocument(operator, stepDocument.asDocument());
1141             return this;
1142         }
1143 
1144         /**
1145          * Adds a generic step to the builder's pipeline.
1146          * 
1147          * @param operator
1148          *            The operator to add to the pipeline.
1149          * @param value
1150          *            The value for the operator.
1151          * @return This builder for chaining method calls.
1152          */
1153         public Builder step(final String operator, final double value) {
1154             myPipeline.push().addDouble(operator, value);
1155             return this;
1156         }
1157 
1158         /**
1159          * Adds a generic step to the builder's pipeline.
1160          * 
1161          * @param operator
1162          *            The operator to add to the pipeline.
1163          * @param elements
1164          *            The elements containing the details of the step to apply.
1165          * @return This builder for chaining method calls.
1166          */
1167         public Builder step(final String operator, final Element... elements) {
1168             return step(operator, Arrays.asList(elements));
1169         }
1170 
1171         /**
1172          * Adds a generic step to the builder's pipeline.
1173          * 
1174          * @param operator
1175          *            The operator to add to the pipeline.
1176          * @param value
1177          *            The value for the operator.
1178          * @return This builder for chaining method calls.
1179          */
1180         public Builder step(final String operator, final int value) {
1181             myPipeline.push().addInteger(operator, value);
1182             return this;
1183         }
1184 
1185         /**
1186          * Adds a generic step to the builder's pipeline.
1187          * 
1188          * @param operator
1189          *            The operator to add to the pipeline.
1190          * @param elements
1191          *            The elements containing the details of the step to apply.
1192          * @return This builder for chaining method calls.
1193          */
1194         public Builder step(final String operator, final List<Element> elements) {
1195             final DocumentBuilder operatorBuilder = myPipeline.push().push(
1196                     operator);
1197             for (final Element element : elements) {
1198                 operatorBuilder.add(element);
1199             }
1200             return this;
1201         }
1202 
1203         /**
1204          * Adds a generic step to the builder's pipeline.
1205          * 
1206          * @param operator
1207          *            The operator to add to the pipeline.
1208          * @param value
1209          *            The value for the operator.
1210          * @return This builder for chaining method calls.
1211          */
1212         public Builder step(final String operator, final long value) {
1213             myPipeline.push().addLong(operator, value);
1214             return this;
1215         }
1216 
1217         /**
1218          * Adds a generic step to the builder's pipeline.
1219          * 
1220          * @param operator
1221          *            The operator to add to the pipeline.
1222          * @param value
1223          *            The value for the operator.
1224          * @return This builder for chaining method calls.
1225          */
1226         public Builder step(final String operator, final String value) {
1227             myPipeline.push().addString(operator, value);
1228             return this;
1229         }
1230 
1231         /**
1232          * Return the JSON for the current pipeline that would be constructed by
1233          * the builder.
1234          */
1235         @Override
1236         public String toString() {
1237             return new ArrayElement("$pipeline", build().getPipeline())
1238                     .toString();
1239         }
1240 
1241         /**
1242          * Adds a <tt>$unwind</tt> operation generate a document for each
1243          * element of the specified array field with the array replaced with the
1244          * value of the element.
1245          * 
1246          * @param fieldName
1247          *            The name of the array field within the document to unwind.
1248          *            This name must start with a '$'. If it does not a '$' will
1249          *            be prepended to the field name..
1250          * @return This builder for chaining method calls.
1251          * 
1252          * @see <a
1253          *      href="http://docs.mongodb.org/manual/reference/aggregation/#_S_unwind">
1254          *      Aggregation Framework Operators - $unwind</a>
1255          */
1256         public Builder unwind(final String fieldName) {
1257             if (fieldName.startsWith("$")) {
1258                 step("$unwind", fieldName);
1259             }
1260             else {
1261                 step("$unwind", "$" + fieldName);
1262             }
1263             return this;
1264         }
1265 
1266         /**
1267          * Sets that the results should be returned using a cursor.
1268          * <p>
1269          * This method delegates to {@link #setUseCursor(boolean)
1270          * setUseCursor(true)}.
1271          * </p>
1272          * 
1273          * @return This builder for chaining method calls.
1274          */
1275         public Builder useCursor() {
1276             return setUseCursor(true);
1277         }
1278 
1279         /**
1280          * Sets to true if the aggregation results should be returned as a
1281          * cursor.
1282          * <p>
1283          * This method delegates to {@link #setUseCursor(boolean)}.
1284          * </p>
1285          * 
1286          * @param useCursor
1287          *            The new value for if the results should be returned via a
1288          *            cursor.
1289          * @return This builder for chaining method calls.
1290          */
1291         public Builder useCursor(final boolean useCursor) {
1292             return setUseCursor(useCursor);
1293         }
1294     }
1295 }