1 /*
2 * #%L
3 * Aggregate.java - mongodb-async-driver - Allanbank Consulting, Inc.
4 * %%
5 * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6 * %%
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * #L%
19 */
20
21 package com.allanbank.mongodb.builder;
22
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.Collections;
26 import java.util.List;
27 import java.util.concurrent.TimeUnit;
28
29 import com.allanbank.mongodb.MongoCollection;
30 import com.allanbank.mongodb.ReadPreference;
31 import com.allanbank.mongodb.Version;
32 import com.allanbank.mongodb.bson.DocumentAssignable;
33 import com.allanbank.mongodb.bson.Element;
34 import com.allanbank.mongodb.bson.builder.ArrayBuilder;
35 import com.allanbank.mongodb.bson.builder.BuilderFactory;
36 import com.allanbank.mongodb.bson.builder.DocumentBuilder;
37 import com.allanbank.mongodb.bson.element.ArrayElement;
38 import com.allanbank.mongodb.bson.element.DocumentElement;
39 import com.allanbank.mongodb.bson.element.IntegerElement;
40 import com.allanbank.mongodb.builder.expression.Expression;
41 import com.allanbank.mongodb.builder.expression.Expressions;
42
43 /**
44 * Aggregate provides support for the <tt>aggregate</tt> command supporting a
45 * pipeline of commands to execute.
46 * <p>
47 * Instances of this class are constructed via the inner {@link Builder} class.
48 * Due to the potential complexity of pipelines and the associated operators the
49 * <tt>Builder</tt> is intended to be used with the various support classes
50 * including the {@link Expressions} library. For example:<blockquote>
51 *
52 * <pre>
53 * <code>
54 * import static {@link AggregationGroupField#set com.allanbank.mongodb.builder.AggregationGroupField.set};
55 * import static {@link AggregationGroupId#id com.allanbank.mongodb.builder.AggregationGroupId.id};
56 * import static {@link AggregationProjectFields#includeWithoutId com.allanbank.mongodb.builder.AggregationProjectFields.includeWithoutId};
57 * import static {@link QueryBuilder#where com.allanbank.mongodb.builder.QueryBuilder.where};
58 * import static {@link Sort#asc com.allanbank.mongodb.builder.Sort.asc};
59 * import static {@link Sort#desc com.allanbank.mongodb.builder.Sort.desc};
60 * import static {@link Expressions#field com.allanbank.mongodb.builder.expression.Expressions.field};
61 * import static {@link Expressions#set com.allanbank.mongodb.builder.expression.Expressions.set};
62 *
63 * DocumentBuilder b1 = BuilderFactory.start();
64 * DocumentBuilder b2 = BuilderFactory.start();
65 * Aggregate.Builder builder = new Aggregate.Builder();
66 *
67 * builder.match(where("state").notEqualTo("NZ"))
68 * .group(id().addField("state")
69 * .addField("city"),
70 * set("pop").sum("pop"))
71 * .sort(asc("pop"))
72 * .group(id("_id.state"),
73 * set("biggestcity").last("_id.city"),
74 * set("biggestpop").last("pop"),
75 * set("smallestcity").first("_id.city"),
76 * set("smallestpop").first("pop"))
77 * .project(
78 * includeWithoutId(),
79 * set("state", field("_id")),
80 * set("biggestCity",
81 * b1.add(set("name", field("biggestcity"))).add(
82 * set("pop", field("biggestpop")))),
83 * set("smallestCity",
84 * b2.add(set("name", field("smallestcity"))).add(
85 * set("pop", field("smallestpop")))))
86 * .sort(desc("biggestCity.pop"));
87 * </code>
88 * </pre>
89 *
90 * </blockquote>
91 * </p>
92 *
93 *
94 * @see <a
95 * href="http://docs.mongodb.org/manual/tutorial/aggregation-examples/#largest-and-smallest-cities-by-state">Example
96 * Inspired By</a>
97 * @api.yes This class is part of the driver's API. Public and protected members
98 * will be deprecated for at least 1 non-bugfix release (version
99 * numbers are <major>.<minor>.<bugfix>) before being
100 * removed or modified.
101 * @copyright 2012-2013, Allanbank Consulting, Inc., All Rights Reserved
102 */
103 public class Aggregate {
104
105 /**
106 * The first version of MongoDB to support the {@code $geoNear} pipeline
107 * operator.
108 */
109 public static final Version ALLOW_DISK_USAGE_REQUIRED_VERSION = Version
110 .parse("2.6");
111
112 /**
113 * The first version of MongoDB to support the {@code aggregate} command
114 * using a cursor.
115 */
116 public static final Version CURSOR_VERSION = Version.parse("2.5.2");
117
118 /**
119 * The first version of MongoDB to support the {@code aggregate} command
120 * with the explain option.
121 */
122 public static final Version EXPLAIN_VERSION = Version.parse("2.5.3");
123
124 /**
125 * The first version of MongoDB to support the {@code $geoNear} pipeline
126 * operator.
127 */
128 public static final Version GEO_NEAR_REQUIRED_VERSION = Version.VERSION_2_4;
129
130 /**
131 * The first version of MongoDB to support the {@code aggregate} command
132 * with the ability to limit the execution time on the server.
133 */
134 public static final Version MAX_TIMEOUT_VERSION = Find.MAX_TIMEOUT_VERSION;
135
136 /**
137 * The first version of MongoDB to support the {@code $redact} pipeline
138 * operator.
139 */
140 public static final Version REDACT_REQUIRED_VERSION = Version
141 .parse("2.5.2");
142
143 /** The first version of MongoDB to support the {@code aggregate} command. */
144 public static final Version REQUIRED_VERSION = Version.parse("2.1.0");
145
146 /**
147 * Creates a new builder for a {@link Aggregate}.
148 *
149 * @return The builder to construct a {@link Aggregate}.
150 */
151 public static Builder builder() {
152 return new Builder();
153 }
154
155 /**
156 * Set to true if the aggregation results should be allowed to spill to
157 * disk.
158 */
159 private final boolean myAllowDiskUsage;
160
161 /** The number of documents to be returned in each batch of results. */
162 private final int myBatchSize;
163
164 /** The total number of documents to be returned. */
165 private final int myLimit;
166
167 /** The maximum amount of time to allow the command to run. */
168 private final long myMaximumTimeMilliseconds;
169
170 /** The pipeline of operations to be applied. */
171 private final List<Element> myPipeline;
172
173 /** The read preference to use. */
174 private final ReadPreference myReadPreference;
175
176 /** The version required for the aggregation. */
177 private final Version myRequiredVersion;
178
179 /** Set to true if the aggregation results should be returned as a cursor. */
180 private final boolean myUseCursor;
181
182 /**
183 * Creates a new Aggregation.
184 *
185 * @param builder
186 * The builder for the Aggregation instance.
187 */
188 protected Aggregate(final Builder builder) {
189 myPipeline = Collections.unmodifiableList(Arrays
190 .asList(builder.myPipeline.build()));
191 myBatchSize = builder.myBatchSize;
192 myLimit = builder.myLimit;
193 myUseCursor = builder.myUseCursor;
194 myAllowDiskUsage = builder.myAllowDiskUsage;
195 myReadPreference = builder.myReadPreference;
196 myRequiredVersion = builder.myRequiredVersion;
197 myMaximumTimeMilliseconds = builder.myMaximumTimeMilliseconds;
198 }
199
200 /**
201 * Returns the number of documents to be returned in each batch of results
202 * by the cursor.
203 *
204 * @return The number of documents to be returned in each batch of results
205 * by the cursor.
206 */
207 public int getBatchSize() {
208 return myBatchSize;
209 }
210
211 /**
212 * Returns the total number of documents to be returned by the cursor.
213 *
214 * @return The total number of documents to be returned the cursor.
215 */
216 public int getCursorLimit() {
217 return myLimit;
218 }
219
220 /**
221 * Returns the maximum amount of time to allow the command to run on the
222 * Server before it is aborted.
223 *
224 * @return The maximum amount of time to allow the command to run on the
225 * Server before it is aborted.
226 *
227 * @since MongoDB 2.6
228 */
229 public long getMaximumTimeMilliseconds() {
230 return myMaximumTimeMilliseconds;
231 }
232
233 /**
234 * Returns the pipeline of operations to apply.
235 *
236 * @return The pipeline of operations to apply.
237 */
238 public List<Element> getPipeline() {
239 return myPipeline;
240 }
241
242 /**
243 * Returns the {@link ReadPreference} specifying which servers may be used
244 * to execute the aggregation.
245 * <p>
246 * If <code>null</code> then the {@link MongoCollection} instance's
247 * {@link ReadPreference} will be used.
248 * </p>
249 *
250 * @return The read preference to use.
251 *
252 * @see MongoCollection#getReadPreference()
253 */
254 public ReadPreference getReadPreference() {
255 return myReadPreference;
256 }
257
258 /**
259 * Returns the version required for the aggregation.
260 *
261 * @return The version required for the aggregation.
262 */
263 public Version getRequiredVersion() {
264 return myRequiredVersion;
265 }
266
267 /**
268 * Returns true if the aggregation results should be allowed to spill to
269 * disk.
270 *
271 * @return True if the aggregation results should be allowed to spill to
272 * disk.
273 */
274 public boolean isAllowDiskUsage() {
275 return myAllowDiskUsage;
276 }
277
278 /**
279 * Returns true if the aggregation results should be returned as a cursor.
280 *
281 * @return True if the aggregation results should be returned as a cursor.
282 */
283 public boolean isUseCursor() {
284 return myUseCursor;
285 }
286
287 /**
288 * Builder provides the ability to construct aggregate command pipelines.
289 * <p>
290 * Methods are provided for all existing pipeline operators and generic
291 * {@link #step} methods are provided to support future pipeline operators
292 * while in development or before the driver is updated.
293 * </p>
294 * <p>
295 * This builder is intended to be used with the various support classes
296 * including the {@link Expressions} library. For example:<blockquote>
297 *
298 * <pre>
299 * <code>
300 * import static {@link AggregationGroupField#set com.allanbank.mongodb.builder.AggregationGroupField.set};
301 * import static {@link AggregationGroupId#id com.allanbank.mongodb.builder.AggregationGroupId.id};
302 * import static {@link AggregationProjectFields#includeWithoutId com.allanbank.mongodb.builder.AggregationProjectFields.includeWithoutId};
303 * import static {@link QueryBuilder#where com.allanbank.mongodb.builder.QueryBuilder.where};
304 * import static {@link Sort#asc com.allanbank.mongodb.builder.Sort.asc};
305 * import static {@link Sort#desc com.allanbank.mongodb.builder.Sort.desc};
306 * import static {@link Expressions#field com.allanbank.mongodb.builder.expression.Expressions.field};
307 * import static {@link Expressions#set com.allanbank.mongodb.builder.expression.Expressions.set};
308 *
309 * DocumentBuilder b1 = BuilderFactory.start();
310 * DocumentBuilder b2 = BuilderFactory.start();
311 * Aggregation.Builder builder = new Aggregation.Builder();
312 *
313 * builder.match(where("state").notEqualTo("NZ"))
314 * .group(id().addField("state")
315 * .addField("city"),
316 * set("pop").sum("pop"))
317 * .sort(asc("pop"))
318 * .group(id("_id.state"),
319 * set("biggestcity").last("_id.city"),
320 * set("biggestpop").last("pop"),
321 * set("smallestcity").first("_id.city"),
322 * set("smallestpop").first("pop"))
323 * .project(
324 * includeWithoutId(),
325 * set("state", field("_id")),
326 * set("biggestCity",
327 * b1.add(set("name", field("biggestcity"))).add(
328 * set("pop", field("biggestpop")))),
329 * set("smallestCity",
330 * b2.add(set("name", field("smallestcity"))).add(
331 * set("pop", field("smallestpop")))))
332 * .sort(desc("biggestCity.pop"));
333 * </code>
334 * </pre>
335 *
336 * </blockquote>
337 * </p>
338 *
339 * @see <a
340 * href="http://docs.mongodb.org/manual/tutorial/aggregation-examples/#largest-and-smallest-cities-by-state">Example
341 * Inspired By</a>
342 * @api.yes This class is part of the driver's API. Public and protected
343 * members will be deprecated for at least 1 non-bugfix release
344 * (version numbers are <major>.<minor>.<bugfix>)
345 * before being removed or modified.
346 * @copyright 2012-2013, Allanbank Consulting, Inc., All Rights Reserved
347 */
348 public static class Builder {
349
350 /**
351 * Set to true if the aggregation results should be allowed to spill to
352 * disk.
353 */
354 protected boolean myAllowDiskUsage;
355
356 /** The number of documents to be returned in each batch of results. */
357 protected int myBatchSize;
358
359 /** The total number of documents to be returned. */
360 protected int myLimit;
361
362 /** The maximum amount of time to allow the command to run. */
363 protected long myMaximumTimeMilliseconds;
364
365 /** The pipeline of operations to be applied. */
366 protected final ArrayBuilder myPipeline;
367
368 /** The read preference to use. */
369 protected ReadPreference myReadPreference;
370
371 /**
372 * The version required for the aggregation. This is computed
373 * automatically based on the pipeline constructed.
374 */
375 protected Version myRequiredVersion;
376
377 /**
378 * Set to true if the aggregation results should be returned as a
379 * cursor.
380 */
381 protected boolean myUseCursor;
382
383 /**
384 * Creates a new Builder.
385 */
386 public Builder() {
387 myPipeline = BuilderFactory.startArray();
388 reset();
389 }
390
391 /**
392 * Allows the aggregation command can spill to disk.
393 * <p>
394 * This method delegates to {@link #allowDiskUsage(boolean)
395 * allowDiskUsage(true)}.
396 * </p>
397 * <p>
398 * This method also sets the builder to use a cursor to true.
399 * </p>
400 *
401 * @return This builder for chaining method calls.
402 */
403 public Builder allowDiskUsage() {
404 return allowDiskUsage(true);
405 }
406
407 /**
408 * Sets to true if the aggregation command can spill to disk.
409 * <p>
410 * This method delegates to {@link #setAllowDiskUsage(boolean)}.
411 * </p>
412 * <p>
413 * This method also sets the builder to use a cursor to true.
414 * </p>
415 *
416 * @param allowDiskUsage
417 * The new value for if the aggregation command can spill to
418 * disk.
419 * @return This builder for chaining method calls.
420 */
421 public Builder allowDiskUsage(final boolean allowDiskUsage) {
422 return setAllowDiskUsage(allowDiskUsage);
423 }
424
425 /**
426 * Sets the value of the number of documents to be returned in each
427 * batch.
428 * <p>
429 * This method delegates to {@link #setBatchSize(int)}.
430 * </p>
431 * <p>
432 * This method also sets the builder to use a cursor to true.
433 * </p>
434 *
435 * @param batchSize
436 * The new value for the number of documents to be returned
437 * in each batch.
438 * @return This builder for chaining method calls.
439 */
440 public Builder batchSize(final int batchSize) {
441 return setBatchSize(batchSize);
442 }
443
444 /**
445 * Constructs a new {@link Aggregate} object from the state of the
446 * builder.
447 *
448 * @return The new {@link Aggregate} object.
449 */
450 public Aggregate build() {
451 return new Aggregate(this);
452 }
453
454 /**
455 * Sets the value of the total number of documents to be returned.
456 * <p>
457 * This method delegates to {@link #setCusorLimit(int)}.
458 * </p>
459 * <p>
460 * This method also sets the builder to use a cursor to true.
461 * </p>
462 *
463 * @param limit
464 * The new value for the total number of documents to be
465 * returned.
466 * @return This builder for chaining method calls.
467 */
468 public Builder cursorLimit(final int limit) {
469 return setCusorLimit(limit);
470 }
471
472 /**
473 * Adds a <tt>$geoNear</tt> operation to the pipeline to select
474 * documents for the aggregation pipeline based on their relative
475 * location to a set point. The <tt>$geoNear</tt> must be the first
476 * option in the aggregation pipeline. <blockquote>
477 *
478 * <pre>
479 * <code>
480 * import {@link AggregationGeoNear com.allanbank.mongodb.builder.AggregationGeoNear};
481 *
482 * {@link Aggregate.Builder} builder = new Aggregation.Builder();
483 * builder.geoNear( AggregationGeoNear.builder()
484 * .location( new Point( 1, 2 ) )
485 * .distanceLocationField( "stats.distance" )
486 * .limit( 5 ).build() );
487 * </code>
488 * </pre>
489 *
490 * </blockquote>
491 *
492 * @param geoNear
493 * The options for the GeoNear operation.
494 * @return This builder for chaining method calls.
495 *
496 * @since MongoDB 2.4
497 */
498 public Builder geoNear(final AggregationGeoNear geoNear) {
499 myRequiredVersion = Version.later(myRequiredVersion,
500 GEO_NEAR_REQUIRED_VERSION);
501
502 return step("$geoNear", geoNear.asDocument());
503 }
504
505 /**
506 * Adds a <tt>$geoNear</tt> operation to the pipeline to select
507 * documents for the aggregation pipeline based on their relative
508 * location to a set point. The <tt>$geoNear</tt> must be the first
509 * option in the aggregation pipeline. <blockquote>
510 *
511 * <pre>
512 * <code>
513 * import {@link AggregationGeoNear com.allanbank.mongodb.builder.AggregationGeoNear};
514 *
515 * {@link Aggregate.Builder} builder = new Aggregation.Builder();
516 * builder.geoNear( AggregationGeoNear.builder()
517 * .location( new Point( 1, 2 ) )
518 * .distanceLocationField( "stats.distance" )
519 * .limit( 5 ) );
520 * </code>
521 * </pre>
522 *
523 * </blockquote>
524 *
525 * @param geoNear
526 * The options for the GeoNear operation.
527 * @return This builder for chaining method calls.
528 *
529 * @since MongoDB 2.4
530 */
531 public Builder geoNear(final AggregationGeoNear.Builder geoNear) {
532 return geoNear(geoNear.build());
533 }
534
535 /**
536 * Adds a <tt>$group</tt> operation to the pipeline to aggregate
537 * documents passing this point in the pipeline into a group of
538 * documents.
539 * <p>
540 * This method is intended to construct groups with simple dynamic or
541 * static id documents.
542 * </p>
543 * <blockquote>
544 *
545 * <pre>
546 * <code>
547 * import static {@link AggregationGroupId#id com.allanbank.mongodb.builder.AggregationGroupId.id};
548 * import static {@link AggregationGroupField#set com.allanbank.mongodb.builder.AggregationGroupField.set};
549 *
550 * {@link Aggregate.Builder} builder = new Aggregation.Builder();
551 * builder.group(
552 * id("$field1"),
553 * set("resultField1").uniqueValuesOf("$field2"),
554 * set("resultField2").max("$field3"),
555 * set("sum").sum("$field4") );
556 * </code>
557 * </pre>
558 *
559 * </blockquote>
560 *
561 * @param id
562 * The builder for the <tt>_id</tt> field to specify unique
563 * groups.
564 * @param aggregations
565 * The specification for the group id and what fields to
566 * aggregate in the form of a document.
567 * @return This builder for chaining method calls.
568 */
569 public Builder group(final AggregationGroupId id,
570 final AggregationGroupField... aggregations) {
571
572 final Element[] elements = new Element[aggregations.length + 1];
573 elements[0] = id.toElement();
574 for (int i = 0; i < aggregations.length; ++i) {
575 elements[i + 1] = aggregations[i].toElement();
576 }
577
578 return step("$group", elements);
579 }
580
581 /**
582 * Adds a <tt>$group</tt> operation to the pipeline to aggregate
583 * documents passing this point in the pipeline into a group of
584 * documents.
585 * <p>
586 * This method is intended to construct groups with complex dynamic or
587 * static id documents. The {@link AggregationGroupId.Builder}
588 * implements the {@link DocumentBuilder} for construction of arbitrary
589 * complex _id documents.
590 * </p>
591 * <blockquote>
592 *
593 * <pre>
594 * <code>
595 * import static {@link AggregationGroupId#id com.allanbank.mongodb.builder.AggregationGroupId.id};
596 * import static {@link AggregationGroupField#set com.allanbank.mongodb.builder.AggregationGroupField.set};
597 *
598 * {@link Aggregate.Builder} builder = new Aggregation.Builder();
599 * builder.group(
600 * id().addField("$field1").addField("$field2"),
601 * set("resultField1").uniqueValuesOf("$field3"),
602 * set("resultField2").first("$field4"),
603 * set("count").count() );
604 * </code>
605 * </pre>
606 *
607 * </blockquote>
608 *
609 * @param id
610 * The builder for the <tt>_id</tt> field to specify unique
611 * groups.
612 * @param aggregations
613 * The specification for the group id and what fields to
614 * aggregate in the form of a document.
615 * @return This builder for chaining method calls.
616 */
617 public Builder group(final AggregationGroupId.Builder id,
618 final AggregationGroupField... aggregations) {
619 return group(id.buildId(), aggregations);
620 }
621
622 /**
623 * Adds a <tt>$group</tt> operation to the pipeline to aggregate
624 * documents passing this point in the pipeline into a group of
625 * documents.
626 *
627 * @param aggregations
628 * The specification for the group id and what fields to
629 * aggregate in the form of a document.
630 * @return This builder for chaining method calls.
631 */
632 public Builder group(final DocumentAssignable aggregations) {
633 return step("$group", aggregations);
634 }
635
636 /**
637 * Adds a <tt>$group</tt> operation to the pipeline to aggregate
638 * documents passing this point in the pipeline into a group of
639 * documents.
640 * <p>
641 * This method is intended to construct groups with complex dynamic or
642 * static id documents. The {@link AggregationGroupId.Builder}
643 * implements the {@link DocumentBuilder} for construction of arbitrary
644 * complex _id documents.
645 * </p>
646 * <blockquote>
647 *
648 * <pre>
649 * <code>
650 * import static {@link AggregationGroupId#id com.allanbank.mongodb.builder.AggregationGroupId.id};
651 * import static {@link AggregationGroupField#set com.allanbank.mongodb.builder.AggregationGroupField.set};
652 *
653 * {@link Aggregate.Builder} builder = new Aggregation.Builder();
654 * builder.group(
655 * id().addInteger("i", 1),
656 * set("resultField1").uniqueValuesOf("$field3"),
657 * set("resultField2").first("$field4"),
658 * set("count").count() );
659 * </code>
660 * </pre>
661 *
662 * </blockquote>
663 *
664 * @param id
665 * The builder for the <tt>_id</tt> field to specify unique
666 * groups.
667 * @param aggregations
668 * The specification for the group id and what fields to
669 * aggregate in the form of a document.
670 * @return This builder for chaining method calls.
671 */
672 public Builder group(final DocumentAssignable id,
673 final AggregationGroupField... aggregations) {
674 return group(new AggregationGroupId(id), aggregations);
675 }
676
677 /**
678 * Adds a <tt>$limit</tt> operation to the pipeline to stop producing
679 * documents passing this point in the pipeline once the limit of
680 * documents is reached.
681 *
682 * @param numberOfDocuments
683 * The number of documents to allow past this point in the
684 * pipeline.
685 * @return This builder for chaining method calls.
686 *
687 * @see <a
688 * href="http://docs.mongodb.org/manual/reference/aggregation/#_S_limit">Aggregation
689 * Framework Operators - $limit</a>
690 */
691 public Builder limit(final int numberOfDocuments) {
692 return step("$limit", numberOfDocuments);
693 }
694
695 /**
696 * Adds a <tt>$limit</tt> operation to the pipeline to stop producing
697 * documents passing this point in the pipeline once the limit of
698 * documents is reached.
699 *
700 * @param numberOfDocuments
701 * The number of documents to allow past this point in the
702 * pipeline.
703 * @return This builder for chaining method calls.
704 *
705 * @see <a
706 * href="http://docs.mongodb.org/manual/reference/aggregation/#_S_limit">
707 * Aggregation Framework Operators - $limit</a>
708 */
709 public Builder limit(final long numberOfDocuments) {
710 return step("$limit", numberOfDocuments);
711 }
712
713 /**
714 * Adds a <tt>$match</tt> operation to the pipeline to filter documents
715 * passing this point in the pipeline.
716 * <p>
717 * This method may be used with the {@link QueryBuilder} to easily
718 * specify the criteria to match against. <blockquote>
719 *
720 * <pre>
721 * <code>
722 * import static {@link QueryBuilder#where com.allanbank.mongodb.builder.QueryBuilder.where}
723 *
724 * Aggregation.Builder builder = new Aggregation.Builder();
725 *
726 * builder.match( where("f").greaterThan(23).lessThan(42).and("g").lessThan(3) );
727 * ...
728 * </code>
729 * </pre>
730 *
731 * </blockquote>
732 * </p>
733 *
734 * @param query
735 * The query to match documents against.
736 * @return This builder for chaining method calls.
737 *
738 * @see <a
739 * href="http://docs.mongodb.org/manual/reference/aggregation/#_S_match">
740 * Aggregation Framework Operators - $match</a>
741 */
742 public Builder match(final DocumentAssignable query) {
743 return step("$match", query);
744 }
745
746 /**
747 * Sets the maximum number of milliseconds to allow the command to run
748 * before aborting the request on the server.
749 * <p>
750 * This method equivalent to {@link #setMaximumTimeMilliseconds(long)
751 * setMaximumTimeMilliseconds(timeLimitUnits.toMillis(timeLimit)}.
752 * </p>
753 *
754 * @param timeLimit
755 * The new maximum amount of time to allow the command to
756 * run.
757 * @param timeLimitUnits
758 * The units for the maximum amount of time to allow the
759 * command to run.
760 *
761 * @return This {@link Builder} for method call chaining.
762 *
763 * @since MongoDB 2.6
764 */
765 public Builder maximumTime(final long timeLimit,
766 final TimeUnit timeLimitUnits) {
767 return setMaximumTimeMilliseconds(timeLimitUnits
768 .toMillis(timeLimit));
769 }
770
771 /**
772 * Adds a <tt>$out</tt> operation to the pipeline to write all of the
773 * output documents to the specified collection.
774 * <p>
775 * This method also forces the {@link ReadPreference} to be
776 * {@link ReadPreference#PRIMARY}.
777 * </p>
778 *
779 * @param collectionName
780 * The name of the collection to output the results to.
781 * @return This builder for chaining method calls.
782 *
783 * @see <a
784 * href="http://docs.mongodb.org/master/reference/operator/aggregation/out">Aggregation
785 * $out Operator</a>
786 */
787 public Builder out(final String collectionName) {
788 setReadPreference(ReadPreference.PRIMARY);
789 return step("$out", collectionName);
790 }
791
792 /**
793 * Adds a <tt>$project</tt> operation to the pipeline to create a
794 * projection of the documents passing this point in the pipeline.
795 * <p>
796 * This method is intended to be used with the
797 * {@link AggregationProjectFields} and
798 * {@link com.allanbank.mongodb.builder.expression.Expressions
799 * Expressions} static helper methods.
800 * </p>
801 * <blockquote>
802 *
803 * <pre>
804 * <code>
805 * import static {@link AggregationProjectFields#include com.allanbank.mongodb.builder.AggregationProjectFields.include};
806 * import static {@link com.allanbank.mongodb.builder.expression.Expressions com.allanbank.mongodb.builder.expression.Expressions.*};
807 *
808 *
809 * Aggregation.Builder builder = new Aggregation.Builder();
810 * ...
811 * builder.project(
812 * include("chr", "begin", "end", "calledPloidy"),
813 * set("window",
814 * multiply(
815 * divide(
816 * subtract(
817 * field("begin"),
818 * mod(field("begin"), constant(interval))),
819 * constant(interval)),
820 * constant(interval))));
821 * ...
822 * </code>
823 * </pre>
824 *
825 * </blockquote>
826 *
827 * @param fields
828 * The fields to copy into the projected results.
829 * @param elements
830 * The computed elements based on {@link Expressions}.
831 * @return This builder for chaining method calls.
832 */
833 public Builder project(final AggregationProjectFields fields,
834 final Element... elements) {
835 final List<IntegerElement> fieldElements = fields.toElements();
836
837 final List<Element> allElements = new ArrayList<Element>(
838 fieldElements.size() + elements.length);
839 allElements.addAll(fieldElements);
840 allElements.addAll(Arrays.asList(elements));
841
842 return step("$project", allElements);
843 }
844
845 /**
846 * Adds a <tt>$project</tt> operation to the pipeline to create a
847 * projection of the documents passing this point in the pipeline.
848 *
849 * @param projection
850 * The specification for the projection to perform.
851 * @return This builder for chaining method calls.
852 */
853 public Builder project(final DocumentAssignable projection) {
854 return step("$project", projection);
855 }
856
857 /**
858 * Adds a <tt>$redact</tt> operation to potentially prune sub-documents
859 * from the results.
860 *
861 * @param ifExpression
862 * The expression to evaluate to determine if the current
863 * sub-document should be pruned or not.
864 * @param thenOption
865 * Operation to apply if the {@code ifExpression} evaluates
866 * to true.
867 * @param elseOption
868 * Operation to apply if the {@code ifExpression} evaluates
869 * to false.
870 * @return This builder for chaining method calls.
871 */
872 public Builder redact(final DocumentAssignable ifExpression,
873 final RedactOption thenOption, final RedactOption elseOption) {
874 myRequiredVersion = Version.later(myRequiredVersion,
875 REDACT_REQUIRED_VERSION);
876
877 final DocumentBuilder doc = BuilderFactory.start();
878 doc.push(Expressions.CONDITION)
879 .add(new DocumentElement("if", ifExpression.asDocument()))
880 .add("then", thenOption.getToken())
881 .add("else", elseOption.getToken());
882
883 return step("$redact", doc);
884 }
885
886 /**
887 * Adds a <tt>$redact</tt> operation to potentially prune sub-documents
888 * from the results.
889 *
890 * @param ifExpression
891 * The expression to evaluate to determine if the current
892 * sub-document should be pruned or not.
893 * @param thenOption
894 * Operation to apply if the {@code ifExpression} evaluates
895 * to true.
896 * @param elseOption
897 * Operation to apply if the {@code ifExpression} evaluates
898 * to false.
899 * @return This builder for chaining method calls.
900 */
901 public Builder redact(final Expression ifExpression,
902 final RedactOption thenOption, final RedactOption elseOption) {
903 myRequiredVersion = Version.later(myRequiredVersion,
904 REDACT_REQUIRED_VERSION);
905
906 final DocumentBuilder doc = BuilderFactory.start();
907 doc.push(Expressions.CONDITION).add(ifExpression.toElement("if"))
908 .add("then", thenOption.getToken())
909 .add("else", elseOption.getToken());
910
911 return step("$redact", doc);
912 }
913
914 /**
915 * Resets the builder back to an empty pipeline.
916 *
917 * @return This builder for chaining method calls.
918 */
919 public Builder reset() {
920 myPipeline.reset();
921 myReadPreference = null;
922 myMaximumTimeMilliseconds = 0;
923 myBatchSize = 0;
924 myLimit = 0;
925 myUseCursor = false;
926 myAllowDiskUsage = false;
927 myRequiredVersion = REQUIRED_VERSION;
928
929 return this;
930 }
931
932 /**
933 * Sets to true if the aggregation command can spill to disk.
934 * <p>
935 * This method also sets the builder to use a cursor to true.
936 * </p>
937 *
938 * @param allowDiskUsage
939 * The new value for if the aggregation command can spill to
940 * disk.
941 * @return This builder for chaining method calls.
942 */
943 public Builder setAllowDiskUsage(final boolean allowDiskUsage) {
944 myRequiredVersion = Version.later(myRequiredVersion,
945 ALLOW_DISK_USAGE_REQUIRED_VERSION);
946
947 myAllowDiskUsage = allowDiskUsage;
948 return setUseCursor(true);
949 }
950
951 /**
952 * Sets the value of the number of documents to be returned in each
953 * batch.
954 * <p>
955 * This method also sets the builder to use a cursor to true.
956 * </p>
957 *
958 * @param batchSize
959 * The new value for the number of documents to be returned
960 * in each batch.
961 * @return This builder for chaining method calls.
962 */
963 public Builder setBatchSize(final int batchSize) {
964 myBatchSize = batchSize;
965 return setUseCursor(true);
966 }
967
968 /**
969 * Sets the value of the total number of documents to be returned.
970 * <p>
971 * This method also sets the builder to use a cursor to true.
972 * </p>
973 *
974 * @param limit
975 * The new value for the total number of documents to be
976 * returned.
977 * @return This builder for chaining method calls.
978 */
979 public Builder setCusorLimit(final int limit) {
980 myLimit = limit;
981 return setUseCursor(true);
982 }
983
984 /**
985 * Sets the maximum number of milliseconds to allow the command to run
986 * before aborting the request on the server.
987 *
988 * @param maximumTimeMilliseconds
989 * The new maximum number of milliseconds to allow the
990 * command to run.
991 * @return This {@link Builder} for method call chaining.
992 *
993 * @since MongoDB 2.6
994 */
995 public Builder setMaximumTimeMilliseconds(
996 final long maximumTimeMilliseconds) {
997 myRequiredVersion = Version.later(myRequiredVersion,
998 MAX_TIMEOUT_VERSION);
999
1000 myMaximumTimeMilliseconds = maximumTimeMilliseconds;
1001 return this;
1002 }
1003
1004 /**
1005 * Sets the {@link ReadPreference} specifying which servers may be used
1006 * to execute the aggregation.
1007 * <p>
1008 * If not set or set to <code>null</code> then the
1009 * {@link MongoCollection} instance's {@link ReadPreference} will be
1010 * used.
1011 * </p>
1012 *
1013 * @param readPreference
1014 * The read preferences specifying which servers may be used.
1015 * @return This builder for chaining method calls.
1016 *
1017 * @see MongoCollection#getReadPreference()
1018 */
1019 public Builder setReadPreference(final ReadPreference readPreference) {
1020 myReadPreference = readPreference;
1021 return this;
1022 }
1023
1024 /**
1025 * Sets to true if the aggregation results should be returned as a
1026 * cursor.
1027 *
1028 * @param useCursor
1029 * The new value for if the results should be returned via a
1030 * cursor.
1031 * @return This builder for chaining method calls.
1032 */
1033 public Builder setUseCursor(final boolean useCursor) {
1034 myRequiredVersion = Version
1035 .later(myRequiredVersion, CURSOR_VERSION);
1036 myUseCursor = useCursor;
1037 return this;
1038 }
1039
1040 /**
1041 * Adds a <tt>$skip</tt> operation to the pipeline to skip the specified
1042 * number of documents before allowing any document past this point in
1043 * the pipeline.
1044 *
1045 * @param numberOfDocuments
1046 * The number of documents to skip past before allowing any
1047 * documents to pass this point in the pipeline.
1048 * @return This builder for chaining method calls.
1049 *
1050 * @see <a
1051 * href="http://docs.mongodb.org/manual/reference/aggregation/#_S_skip">
1052 * Aggregation Framework Operators - $skip</a>
1053 */
1054 public Builder skip(final int numberOfDocuments) {
1055 return step("$skip", numberOfDocuments);
1056 }
1057
1058 /**
1059 * Adds a <tt>$skip</tt> operation to the pipeline to skip the specified
1060 * number of documents before allowing any document past this point in
1061 * the pipeline.
1062 *
1063 * @param numberOfDocuments
1064 * The number of documents to skip past before allowing any
1065 * documents to pass this point in the pipeline.
1066 * @return This builder for chaining method calls.
1067 *
1068 * @see <a
1069 * href="http://docs.mongodb.org/manual/reference/aggregation/#_S_skip">
1070 * Aggregation Framework Operators - $skip</a>
1071 */
1072 public Builder skip(final long numberOfDocuments) {
1073 return step("$skip", numberOfDocuments);
1074 }
1075
1076 /**
1077 * Adds a <tt>$sort</tt> operation to sort the documents passing this
1078 * point based on the sort specification provided.
1079 * <p>
1080 * This method is intended to be used with the {@link Sort} class's
1081 * static methods: <blockquote>
1082 *
1083 * <pre>
1084 * <code>
1085 * import static {@link Sort#asc(String) com.allanbank.mongodb.builder.Sort.asc};
1086 * import static {@link Sort#desc(String) com.allanbank.mongodb.builder.Sort.desc};
1087 *
1088 * Aggregation.Builder builder = new Aggregation.Builder();
1089 *
1090 * builder.setSort( asc("f"), desc("g") );
1091 * ...
1092 * </code>
1093 * </pre>
1094 *
1095 * </blockquote>
1096 *
1097 * @param sortFields
1098 * The sort fields to use.
1099 * @return This builder for chaining method calls.
1100 *
1101 * @see <a
1102 * href="http://docs.mongodb.org/manual/reference/aggregation/#_S_sort">
1103 * Aggregation Framework Operators - $sort</a>
1104 */
1105 public Builder sort(final IntegerElement... sortFields) {
1106 return step("$sort", sortFields);
1107 }
1108
1109 /**
1110 * Adds a <tt>$sort</tt> operation to sort the documents passing this
1111 * point based on the sort fields provides in ascending order.
1112 *
1113 * @param sortFields
1114 * The sort fields to use in ascending order.
1115 * @return This builder for chaining method calls.
1116 *
1117 * @see <a
1118 * href="http://docs.mongodb.org/manual/reference/aggregation/#_S_sort">
1119 * Aggregation Framework Operators - $sort</a>
1120 */
1121 public Builder sort(final String... sortFields) {
1122 final IntegerElement[] elements = new IntegerElement[sortFields.length];
1123 for (int i = 0; i < sortFields.length; ++i) {
1124 elements[i] = Sort.asc(sortFields[i]);
1125 }
1126 return sort(elements);
1127 }
1128
1129 /**
1130 * Adds a generic step to the builder's pipeline.
1131 *
1132 * @param operator
1133 * The operator to add to the pipeline.
1134 * @param stepDocument
1135 * The document containing the details of the step to apply.
1136 * @return This builder for chaining method calls.
1137 */
1138 public Builder step(final String operator,
1139 final DocumentAssignable stepDocument) {
1140 myPipeline.push().addDocument(operator, stepDocument.asDocument());
1141 return this;
1142 }
1143
1144 /**
1145 * Adds a generic step to the builder's pipeline.
1146 *
1147 * @param operator
1148 * The operator to add to the pipeline.
1149 * @param value
1150 * The value for the operator.
1151 * @return This builder for chaining method calls.
1152 */
1153 public Builder step(final String operator, final double value) {
1154 myPipeline.push().addDouble(operator, value);
1155 return this;
1156 }
1157
1158 /**
1159 * Adds a generic step to the builder's pipeline.
1160 *
1161 * @param operator
1162 * The operator to add to the pipeline.
1163 * @param elements
1164 * The elements containing the details of the step to apply.
1165 * @return This builder for chaining method calls.
1166 */
1167 public Builder step(final String operator, final Element... elements) {
1168 return step(operator, Arrays.asList(elements));
1169 }
1170
1171 /**
1172 * Adds a generic step to the builder's pipeline.
1173 *
1174 * @param operator
1175 * The operator to add to the pipeline.
1176 * @param value
1177 * The value for the operator.
1178 * @return This builder for chaining method calls.
1179 */
1180 public Builder step(final String operator, final int value) {
1181 myPipeline.push().addInteger(operator, value);
1182 return this;
1183 }
1184
1185 /**
1186 * Adds a generic step to the builder's pipeline.
1187 *
1188 * @param operator
1189 * The operator to add to the pipeline.
1190 * @param elements
1191 * The elements containing the details of the step to apply.
1192 * @return This builder for chaining method calls.
1193 */
1194 public Builder step(final String operator, final List<Element> elements) {
1195 final DocumentBuilder operatorBuilder = myPipeline.push().push(
1196 operator);
1197 for (final Element element : elements) {
1198 operatorBuilder.add(element);
1199 }
1200 return this;
1201 }
1202
1203 /**
1204 * Adds a generic step to the builder's pipeline.
1205 *
1206 * @param operator
1207 * The operator to add to the pipeline.
1208 * @param value
1209 * The value for the operator.
1210 * @return This builder for chaining method calls.
1211 */
1212 public Builder step(final String operator, final long value) {
1213 myPipeline.push().addLong(operator, value);
1214 return this;
1215 }
1216
1217 /**
1218 * Adds a generic step to the builder's pipeline.
1219 *
1220 * @param operator
1221 * The operator to add to the pipeline.
1222 * @param value
1223 * The value for the operator.
1224 * @return This builder for chaining method calls.
1225 */
1226 public Builder step(final String operator, final String value) {
1227 myPipeline.push().addString(operator, value);
1228 return this;
1229 }
1230
1231 /**
1232 * Return the JSON for the current pipeline that would be constructed by
1233 * the builder.
1234 */
1235 @Override
1236 public String toString() {
1237 return new ArrayElement("$pipeline", build().getPipeline())
1238 .toString();
1239 }
1240
1241 /**
1242 * Adds a <tt>$unwind</tt> operation generate a document for each
1243 * element of the specified array field with the array replaced with the
1244 * value of the element.
1245 *
1246 * @param fieldName
1247 * The name of the array field within the document to unwind.
1248 * This name must start with a '$'. If it does not a '$' will
1249 * be prepended to the field name..
1250 * @return This builder for chaining method calls.
1251 *
1252 * @see <a
1253 * href="http://docs.mongodb.org/manual/reference/aggregation/#_S_unwind">
1254 * Aggregation Framework Operators - $unwind</a>
1255 */
1256 public Builder unwind(final String fieldName) {
1257 if (fieldName.startsWith("$")) {
1258 step("$unwind", fieldName);
1259 }
1260 else {
1261 step("$unwind", "$" + fieldName);
1262 }
1263 return this;
1264 }
1265
1266 /**
1267 * Sets that the results should be returned using a cursor.
1268 * <p>
1269 * This method delegates to {@link #setUseCursor(boolean)
1270 * setUseCursor(true)}.
1271 * </p>
1272 *
1273 * @return This builder for chaining method calls.
1274 */
1275 public Builder useCursor() {
1276 return setUseCursor(true);
1277 }
1278
1279 /**
1280 * Sets to true if the aggregation results should be returned as a
1281 * cursor.
1282 * <p>
1283 * This method delegates to {@link #setUseCursor(boolean)}.
1284 * </p>
1285 *
1286 * @param useCursor
1287 * The new value for if the results should be returned via a
1288 * cursor.
1289 * @return This builder for chaining method calls.
1290 */
1291 public Builder useCursor(final boolean useCursor) {
1292 return setUseCursor(useCursor);
1293 }
1294 }
1295 }