View Javadoc
1   /*
2    * #%L
3    * BatchedWrite.java - mongodb-async-driver - Allanbank Consulting, Inc.
4    * %%
5    * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6    * %%
7    * Licensed under the Apache License, Version 2.0 (the "License");
8    * you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   * 
11   *      http://www.apache.org/licenses/LICENSE-2.0
12   * 
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   * #L%
19   */
20  
21  package com.allanbank.mongodb.builder;
22  
23  import java.io.Serializable;
24  import java.util.ArrayList;
25  import java.util.Collections;
26  import java.util.LinkedHashMap;
27  import java.util.LinkedList;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.SortedMap;
31  import java.util.TreeMap;
32  
33  import com.allanbank.mongodb.BatchedAsyncMongoCollection;
34  import com.allanbank.mongodb.Durability;
35  import com.allanbank.mongodb.MongoCollection;
36  import com.allanbank.mongodb.Version;
37  import com.allanbank.mongodb.bson.Document;
38  import com.allanbank.mongodb.bson.DocumentAssignable;
39  import com.allanbank.mongodb.bson.Element;
40  import com.allanbank.mongodb.bson.builder.ArrayBuilder;
41  import com.allanbank.mongodb.bson.builder.BuilderFactory;
42  import com.allanbank.mongodb.bson.builder.DocumentBuilder;
43  import com.allanbank.mongodb.bson.impl.EmptyDocument;
44  import com.allanbank.mongodb.builder.write.DeleteOperation;
45  import com.allanbank.mongodb.builder.write.InsertOperation;
46  import com.allanbank.mongodb.builder.write.UpdateOperation;
47  import com.allanbank.mongodb.builder.write.WriteOperation;
48  import com.allanbank.mongodb.builder.write.WriteOperationType;
49  import com.allanbank.mongodb.error.DocumentToLargeException;
50  
51  /**
52   * BatchedWrite provides a container for a group of write operations to be sent
53   * to the server as one group.
54   * <p>
55   * The default mode ({@link BatchedWriteMode#SERIALIZE_AND_CONTINUE}) for this
56   * class is to submit the operations to the server in the order that they are
57   * added to the Builder and to apply as many of the writes as possible (commonly
58   * referred to as continue-on-error). This has the effect of causing the fewest
59   * surprises and optimizing the performance of the writes since the driver can
60   * send multiple distinct writes to the server at once.
61   * </p>
62   * <p>
63   * The {@link BatchedWriteMode#SERIALIZE_AND_STOP} mode also sends each write as
64   * a separate request but instead of attempting all writes the driver will stop
65   * sending requests once one of the writes fails. This also prevents the driver
66   * from sending multiple write messages to the server which can degrade
67   * performance.
68   * </p>
69   * <p>
70   * The last mode, {@link BatchedWriteMode#REORDERED}, may re-order writes to
71   * maximize performance. Similar to the
72   * {@link BatchedWriteMode#SERIALIZE_AND_CONTINUE} this mode will also attempt
73   * all writes. The reordering of writes is across all {@link WriteOperationType}
74   * s.
75   * </p>
76   * <p>
77   * If using a MongoDB server after {@link #REQUIRED_VERSION 2.5.5} a batched
78   * write will result in use of the new write commands.
79   * </p>
80   * <p>
81   * For a more generalized batched write and query capability see the
82   * {@link BatchedAsyncMongoCollection} and {@link MongoCollection#startBatch()}.
83   * </p>
84   * 
85   * @api.yes This class is part of the driver's API. Public and protected members
86   *          will be deprecated for at least 1 non-bugfix release (version
87   *          numbers are &lt;major&gt;.&lt;minor&gt;.&lt;bugfix&gt;) before being
88   *          removed or modified.
89   * @copyright 2014, Allanbank Consulting, Inc., All Rights Reserved
90   */
91  public class BatchedWrite implements Serializable {
92  
    /** The first version of MongoDB to support the batched write commands. */
94      public static final Version REQUIRED_VERSION = Version.parse("2.5.5");
95  
96      /** Serialization version for the class. */
97      private static final long serialVersionUID = 6984498574755719178L;
98  
99      /**
100      * Creates a new builder for a {@link BatchedWrite}.
101      * 
102      * @return The builder to construct a {@link BatchedWrite}.
103      */
104     public static Builder builder() {
105         return new Builder();
106     }
107 
108     /**
109      * Create a batched write with a single delete operation. Users can just use
110      * the {@link MongoCollection#delete} variants and the driver will convert
111      * the deletes to batched writes as appropriate.
112      * <p>
113      * This method avoids the construction of a builder.
114      * </p>
115      * 
116      * @param query
117      *            The query to find the documents to delete.
118      * @param singleDelete
119      *            If true then only a single document will be deleted. If
120      *            running in a sharded environment then this field must be false
121      *            or the query must contain the shard key.
122      * @param durability
123      *            The durability of the delete.
124      * @return The BatchedWrite with the single delete.
125      */
126     public static BatchedWrite delete(final DocumentAssignable query,
127             final boolean singleDelete, final Durability durability) {
128         final DeleteOperation op = new DeleteOperation(query, singleDelete);
129         return new BatchedWrite(op, BatchedWriteMode.SERIALIZE_AND_CONTINUE,
130                 durability);
131     }
132 
133     /**
134      * Create a batched write with a single inserts operation. Users can just
135      * use the {@link MongoCollection#insert} variants and the driver will
136      * convert the inserts to batched writes as appropriate.
137      * <p>
138      * This method avoids the construction of a builder.
139      * </p>
140      * 
141      * @param continueOnError
142      *            If the insert should continue if one of the documents causes
143      *            an error.
144      * @param durability
145      *            The durability for the insert.
146      * @param documents
147      *            The documents to add to the collection.
148      * @return The BatchedWrite with the inserts.
149      */
150     public static BatchedWrite insert(final boolean continueOnError,
151             final Durability durability, final DocumentAssignable... documents) {
152         final List<WriteOperation> ops = new ArrayList<WriteOperation>(
153                 documents.length);
154         for (final DocumentAssignable doc : documents) {
155             ops.add(new InsertOperation(doc));
156         }
157         return new BatchedWrite(ops,
158                 continueOnError ? BatchedWriteMode.SERIALIZE_AND_CONTINUE
159                         : BatchedWriteMode.SERIALIZE_AND_STOP, durability);
160     }
161 
162     /**
163      * Create a batched write with a single update operation. Users can just use
164      * the {@link MongoCollection#update} variants and the driver will convert
165      * the updates to batched writes as appropriate.
166      * 
167      * @param query
168      *            The query for the update.
169      * @param update
170      *            The update for the update.
171      * @param multiUpdate
172      *            If true then the update will update multiple documents.
173      * @param upsert
174      *            If no document is found then upsert the document.
175      * @param durability
176      *            The durability of the update.
177      * @return The BatchedWrite with the single update.
178      */
179     public static BatchedWrite update(final DocumentAssignable query,
180             final DocumentAssignable update, final boolean multiUpdate,
181             final boolean upsert, final Durability durability) {
182         final UpdateOperation op = new UpdateOperation(query, update,
183                 multiUpdate, upsert);
184         return new BatchedWrite(op, BatchedWriteMode.SERIALIZE_AND_CONTINUE,
185                 durability);
186     }
187 
    /**
     * The durability for the writes. May be <code>null</code>, in which case
     * no 'writeConcern' is added to the generated commands.
     */
    private final Durability myDurability;

    /**
     * The mode for submitting the writes to the server. Selects how
     * {@link #toBundles} groups the writes into commands.
     */
    private final BatchedWriteMode myMode;

    /**
     * The writes to submit to the server. Every constructor wraps the list
     * with {@link Collections#unmodifiableList}.
     */
    private final List<WriteOperation> myWrites;
196 
197     /**
198      * Creates a new BatchedWrite.
199      * 
200      * @param builder
201      *            The builder for the writes.
202      */
203     protected BatchedWrite(final Builder builder) {
204         myWrites = Collections.unmodifiableList(new ArrayList<WriteOperation>(
205                 builder.myWrites));
206         myMode = builder.myMode;
207         myDurability = builder.myDurability;
208     }
209 
210     /**
211      * Creates a new BatchedWrite.
212      * 
213      * @param ops
214      *            The operations for the batch.
215      * @param mode
216      *            The mode for the batch.
217      * @param durability
218      *            The durability for the batch.
219      */
220     private BatchedWrite(final List<WriteOperation> ops,
221             final BatchedWriteMode mode, final Durability durability) {
222         myWrites = Collections.unmodifiableList(ops);
223         myMode = mode;
224         myDurability = durability;
225     }
226 
227     /**
228      * Creates a new BatchedWrite.
229      * 
230      * @param op
231      *            The single operation for the batch.
232      * @param mode
233      *            The mode for the batch.
234      * @param durability
235      *            The durability for the batch.
236      */
237     private BatchedWrite(final WriteOperation op, final BatchedWriteMode mode,
238             final Durability durability) {
239         this(Collections.singletonList(op), mode, durability);
240     }
241 
242     /**
243      * Returns the durability for the writes.
244      * 
245      * @return The durability for the writes.
246      */
247     public Durability getDurability() {
248         return myDurability;
249     }
250 
251     /**
252      * Returns the mode for submitting the writes to the server.
253      * 
254      * @return The mode for submitting the writes to the server.
255      */
256     public BatchedWriteMode getMode() {
257         return myMode;
258     }
259 
260     /**
261      * Returns the writes to submit to the server.
262      * 
263      * @return The writes to submit to the server.
264      */
265     public List<WriteOperation> getWrites() {
266         return myWrites;
267     }
268 
269     /**
270      * Creates write commands for all of the insert, updates and deletes. The
271      * number and order of the writes is based on the {@link #getMode() mode}.
272      * 
273      * @param collectionName
274      *            The name of the collection the documents will be inserted
275      *            into.
276      * @param maxCommandSize
277      *            The maximum document size.
278      * @param maxOperationsPerBundle
279      *            The maximum number of writes to include in each bundle.
280      * @return The list of command documents to be sent.
281      */
282     public List<Bundle> toBundles(final String collectionName,
283             final long maxCommandSize, final int maxOperationsPerBundle) {
284         switch (getMode()) {
285         case REORDERED: {
286             return createOptimized(collectionName, maxCommandSize,
287                     maxOperationsPerBundle);
288         }
289         case SERIALIZE_AND_CONTINUE: {
290             return createSerialized(collectionName, maxCommandSize,
291                     maxOperationsPerBundle, false);
292         }
293         default: {
294             return createSerialized(collectionName, maxCommandSize,
295                     maxOperationsPerBundle, true);
296         }
297         }
298     }
299 
300     /**
301      * Adds the document to the array of documents.
302      * 
303      * @param array
304      *            The array to add the operation to.
305      * @param operation
306      *            The operation to add.
307      */
308     private void add(final ArrayBuilder array, final WriteOperation operation) {
309         switch (operation.getType()) {
310         case INSERT: {
311             final InsertOperation insertOperation = (InsertOperation) operation;
312 
313             array.add(insertOperation.getDocument());
314             break;
315         }
316         case UPDATE: {
317             final UpdateOperation updateOperation = (UpdateOperation) operation;
318             final DocumentBuilder update = array.push();
319 
320             update.add("q", updateOperation.getQuery());
321             update.add("u", updateOperation.getUpdate());
322             if (updateOperation.isUpsert()) {
323                 update.add("upsert", true);
324             }
325             if (updateOperation.isMultiUpdate()) {
326                 update.add("multi", true);
327             }
328             break;
329         }
330         case DELETE: {
331             final DeleteOperation deleteOperation = (DeleteOperation) operation;
332             array.push().add("q", deleteOperation.getQuery())
333                     .add("limit", deleteOperation.isSingleDelete() ? 1 : 0);
334             break;
335         }
336         }
337     }
338 
339     /**
340      * Adds the durability ('writeConcern') to the command document.
341      * 
342      * @param command
343      *            The command document to add the durability to.
344      * @param durability
345      *            The durability to add. May be <code>null</code>.
346      */
347     private void addDurability(final DocumentBuilder command,
348             final Durability durability) {
349         if (durability != null) {
350             final DocumentBuilder durabilityDoc = command.push("writeConcern");
351             if (durability.equals(Durability.NONE)) {
352                 durabilityDoc.add("w", 0);
353             }
354             else if (durability.equals(Durability.ACK)) {
355                 durabilityDoc.add("w", 1);
356             }
357             else {
358                 boolean first = true;
359                 for (final Element part : durability.asDocument()) {
360                     if (first) {
361                         // The first element is "getlasterror".
362                         first = false;
363                     }
364                     else {
365                         durabilityDoc.add(part);
366                     }
367                 }
368             }
369         }
370     }
371 
372     /**
373      * Creates a {@link DocumentToLargeException} for the operation.
374      * 
375      * @param operation
376      *            The large operation.
377      * @param size
378      *            The size of the operation.
379      * @param maxCommandSize
380      *            The maximum size of the operation.
381      * @return The created exception.
382      */
383     private DocumentToLargeException createDocumentToLargeException(
384             final WriteOperation operation, final int size,
385             final int maxCommandSize) {
386 
387         Document doc = EmptyDocument.INSTANCE;
388 
389         switch (operation.getType()) {
390         case INSERT: {
391             final InsertOperation insertOperation = (InsertOperation) operation;
392             doc = insertOperation.getDocument();
393             break;
394         }
395         case UPDATE: {
396             final UpdateOperation updateOperation = (UpdateOperation) operation;
397             doc = updateOperation.getQuery();
398             final Document update = updateOperation.getUpdate();
399             if (doc.size() < update.size()) {
400                 doc = update;
401             }
402             break;
403         }
404         case DELETE: {
405             final DeleteOperation deleteOperation = (DeleteOperation) operation;
406             doc = deleteOperation.getQuery();
407             break;
408         }
409         }
410 
411         return new DocumentToLargeException(size, maxCommandSize, doc);
412     }
413 
414     /**
415      * Reorders the writes into as few write commands as possible.
416      * <p>
417      * <b>Note</b>: MongoDB gives a slightly larger document for the command (<a
418      * href=
419      * "https://github.com/mongodb/mongo/blob/master/src/mongo/bson/util/builder.h#L56"
420      * >16K</a>). This is for the command overhead. We don't explicitly use the
421      * overhead but we may end up implicitly using it in the case of a operation
422      * that is just at or below maxCommandSize. For those cases we start the
423      * 'head' map below with the full map. That allows the big operations to be
424      * added to command documents of there own once the command overhead has
425      * been factored in.
426      * </p>
427      * 
428      * @param collectionName
429      *            The name of the collection the documents will be inserted
430      *            into.
431      * @param maxCommandSize
432      *            The maximum document size.
433      * @param maxOperationsPerBundle
434      *            The maximum number of writes to include in each bundle.
435      * @return The list of command documents to be sent.
436      */
437     private List<Bundle> createOptimized(final String collectionName,
438             final long maxCommandSize, final int maxOperationsPerBundle) {
439         // Bucket the operations and sort by size.
440         Map<WriteOperationType, SortedMap<Long, List<WriteOperation>>> operationsBuckets;
441         operationsBuckets = new LinkedHashMap<WriteOperationType, SortedMap<Long, List<WriteOperation>>>();
442         for (final WriteOperation writeOp : getWrites()) {
443             SortedMap<Long, List<WriteOperation>> operations = operationsBuckets
444                     .get(writeOp.getType());
445             if (operations == null) {
446                 operations = new TreeMap<Long, List<WriteOperation>>();
447                 operationsBuckets.put(writeOp.getType(), operations);
448             }
449 
450             final Long size = Long.valueOf(sizeOf(-1, writeOp));
451             List<WriteOperation> list = operations.get(size);
452             if (list == null) {
453                 list = new LinkedList<WriteOperation>();
454                 operations.put(size, list);
455             }
456             list.add(writeOp);
457         }
458 
459         // Check if any operation is too big.
460         final Long maxMessageSize = Long.valueOf(maxCommandSize + 1);
461         for (final SortedMap<Long, List<WriteOperation>> operations : operationsBuckets
462                 .values()) {
463             if (!operations.tailMap(maxMessageSize).isEmpty()) {
464                 final Long biggest = operations.lastKey();
465                 final List<WriteOperation> operation = operations.get(biggest);
466                 throw createDocumentToLargeException(operation.get(0),
467                         biggest.intValue(), (int) maxCommandSize);
468             }
469         }
470 
471         // Now build commands packing the operations into a few messages as
472         // possible.
473         final List<Bundle> commands = new ArrayList<Bundle>();
474         final List<WriteOperation> bundled = new ArrayList<WriteOperation>(
475                 Math.min(maxOperationsPerBundle, myWrites.size()));
476         final DocumentBuilder command = BuilderFactory.start();
477         for (final Map.Entry<WriteOperationType, SortedMap<Long, List<WriteOperation>>> entry : operationsBuckets
478                 .entrySet()) {
479             final SortedMap<Long, List<WriteOperation>> operations = entry
480                     .getValue();
481             while (!operations.isEmpty()) {
482                 final ArrayBuilder docs = start(entry.getKey(), collectionName,
483                         false, command);
484                 long remaining = maxCommandSize - command.build().size();
485 
486                 SortedMap<Long, List<WriteOperation>> head = operations;
487                 int index = 0;
488                 while (!head.isEmpty()
489                         && (bundled.size() < maxOperationsPerBundle)) {
490                     final Long biggest = head.lastKey();
491                     final List<WriteOperation> bigOps = head.get(biggest);
492                     final WriteOperation operation = bigOps.remove(0);
493                     if (bigOps.isEmpty()) {
494                         head.remove(biggest);
495                     }
496 
497                     add(docs, operation);
498                     bundled.add(operation);
499 
500                     remaining -= sizeOf(index, operation);
501                     index += 1;
502                     head = operations.headMap(Long.valueOf(remaining
503                             - sizeOfIndex(index)));
504                 }
505 
506                 commands.add(new Bundle(command.build(), bundled));
507                 bundled.clear();
508             }
509         }
510 
511         return commands;
512     }
513 
514     /**
515      * Creates write commands for each sequence of insert, updates and deletes.
516      * <p>
517      * <b>Note</b>: MongoDB gives a slightly larger document for the command (<a
518      * href=
519      * "https://github.com/mongodb/mongo/blob/master/src/mongo/bson/util/builder.h#L56"
520      * >16K</a>). This is for the command overhead. We don't explicitly use the
521      * overhead but we may end up using it in the case of a operation that is
522      * just at or below maxCommandSize. That is why we start the 'head' map
523      * below with the full map. That allows those big operations to be added to
524      * commands of there own once the command overhead has been factored in.
525      * </p>
526      * 
527      * @param collectionName
528      *            The name of the collection the documents will be inserted
529      *            into.
530      * @param maxCommandSize
531      *            The maximum document size.
532      * @param stopOnError
533      *            If true then the ordered flag is set to true.
534      * @param maxOperationsPerBundle
535      *            The maximum number of writes to include in each bundle.
536      * @return The list of command documents to be sent.
537      */
538     private List<Bundle> createSerialized(final String collectionName,
539             final long maxCommandSize, final int maxOperationsPerBundle,
540             final boolean stopOnError) {
541         final List<Bundle> commands = new ArrayList<Bundle>();
542         final DocumentBuilder command = BuilderFactory.start();
543 
544         final List<WriteOperation> toSend = getWrites();
545         final List<WriteOperation> bundled = new ArrayList<WriteOperation>(
546                 Math.min(maxOperationsPerBundle, myWrites.size()));
547 
548         ArrayBuilder opsArray = null;
549         WriteOperationType lastType = null;
550 
551         long remaining = maxCommandSize;
552         for (final WriteOperation writeOp : toSend) {
553             long size = sizeOf(-1, writeOp);
554             final long indexSize = sizeOfIndex(bundled.size());
555             if (maxCommandSize < size) {
556                 throw createDocumentToLargeException(writeOp, (int) size,
557                         (int) maxCommandSize);
558             }
559             size += indexSize; // Add in the index overhead.
560 
561             // Close a command if change type or too big.
562             if (!bundled.isEmpty()
563                     && ((lastType != writeOp.getType())
564                             || ((remaining - size) < 0) || (maxOperationsPerBundle <= bundled
565                             .size()))) {
566                 commands.add(new Bundle(command.build(), bundled));
567                 bundled.clear();
568             }
569 
570             // Start a command? - Maybe after closing?
571             if (bundled.isEmpty()) {
572                 opsArray = start(writeOp.getType(), collectionName,
573                         stopOnError, command);
574                 lastType = writeOp.getType();
575                 remaining = (maxCommandSize - command.build().size());
576             }
577 
578             // Add the operation.
579             add(opsArray, writeOp);
580             bundled.add(writeOp);
581 
582             // Remove the size of the operation from the remaining.
583             remaining -= size;
584         }
585 
586         if (!bundled.isEmpty()) {
587             commands.add(new Bundle(command.build(), bundled));
588         }
589 
590         return commands;
591     }
592 
593     /**
594      * Returns the size of the encoded operation.
595      * <p>
596      * For an {@code InsertOperation} this is the size of the document to
597      * insert.
598      * <p>
599      * For an {@code UpdateOperation} this includes the space for:
600      * <dl>
601      * <dt>Document Overhead</dt>
602      * <dd>type (1 byte), length (4 bytes), trailing null (1 byte).</dd>
603      * <dt>'q' field</dt>
604      * <dd>name (2 bytes), type (1 byte), value (document size)</dd>
605      * <dt>'u' field</dt>
606      * <dd>name (2 bytes), type (1 byte), value (document size)</dd>
607      * <dt>'upsert' field</dt>
608      * <dd>name (7 bytes), type (1 byte), value (1 byte)</dd>
609      * <dt>'multi' field</dt>
610      * <dd>name (6 bytes), type (1 byte), value (1 byte)</dd>
611      * </dl>
612      * </p>
613      * <p>
614      * For a {@code DeleteOperation} this includes the space for:
615      * <dl>
616      * <dt>Document Overhead</dt>
617      * <dd>type (1 byte), length (4 bytes), trailing null (1 byte).</dd>
618      * <dt>'q' field</dt>
619      * <dd>name (2 bytes), type (1 byte), value (document size)</dd>
620      * <dt>'limit' field</dt>
621      * <dd>name (6 bytes), type (1 byte), value (4 bytes)</dd>
622      * </dl>
623      * 
624      * @param index
625      *            The index of the operation in the operations array.
626      * @param operation
627      *            The operation to determine the size of.
628      * @return The size of the operation.
629      */
630     private long sizeOf(final int index, final WriteOperation operation) {
631         long result = 0;
632         switch (operation.getType()) {
633         case INSERT: {
634             final InsertOperation insertOperation = (InsertOperation) operation;
635             result = sizeOfIndex(index) + insertOperation.getDocument().size();
636             break;
637         }
638         case UPDATE: {
639             final UpdateOperation updateOperation = (UpdateOperation) operation;
640             result = sizeOfIndex(index) + updateOperation.getQuery().size()
641                     + updateOperation.getUpdate().size() + 29;
642             break;
643         }
644         case DELETE: {
645             final DeleteOperation deleteOperation = (DeleteOperation) operation;
646             result = sizeOfIndex(index) + deleteOperation.getQuery().size()
647                     + 20;
648             break;
649         }
650         }
651 
652         return result;
653     }
654 
655     /**
656      * Returns the number of bytes required to encode the index within the array
657      * element.
658      * 
659      * @param index
660      *            The index to return the size of.
661      * @return The length of the encoded index.
662      */
663     private long sizeOfIndex(final int index) {
664         // For 2.6 the number of items in the array is capped at 1000. This
665         // allows up to 99,999 without resorting to turning the value into
666         // a string which seems like safe enough padding.
667         if (index < 0) {
668             return 0; // For estimating operation sizes.
669         }
670         else if (index < 10) {
671             return 3; // single character plus a null plus a type.
672         }
673         else if (index < 100) {
674             return 4; // two characters plus a null plus a type.
675         }
676         else if (index < 1000) {
677             return 5; // three characters plus a null plus a type.
678         }
679         else if (index < 10000) {
680             return 6; // four characters plus a null plus a type.
681         }
682 
683         return Integer.toString(index).length() + 2;
684     }
685 
686     /**
687      * Starts a new command document.
688      * 
689      * @param operation
690      *            The operation to start.
691      * @param collectionName
692      *            The collection to operate on.
693      * @param stopOnError
694      *            If true then the operations should stop once an error is
695      *            encountered. Is mapped to the {@code ordered} field in the
696      *            command document.
697      * @param command
698      *            The command builder.
699      * @return The {@link ArrayBuilder} for the operations array.
700      */
701     private ArrayBuilder start(final WriteOperationType operation,
702             final String collectionName, final boolean stopOnError,
703             final DocumentBuilder command) {
704 
705         String commandName = "";
706         String arrayName = "";
707         switch (operation) {
708         case INSERT: {
709             commandName = "insert";
710             arrayName = "documents";
711             break;
712         }
713         case UPDATE: {
714             commandName = "update";
715             arrayName = "updates";
716             break;
717         }
718         case DELETE: {
719             commandName = "delete";
720             arrayName = "deletes";
721             break;
722         }
723         }
724 
725         command.reset();
726         command.add(commandName, collectionName);
727         if (!stopOnError) {
728             command.add("ordered", stopOnError);
729         }
730         addDurability(command, getDurability());
731 
732         return command.pushArray(arrayName);
733     }
734 
735     /**
736      * Builder for creating {@link BatchedWrite}s.
737      * 
738      * @api.yes This class is part of the driver's API. Public and protected
739      *          members will be deprecated for at least 1 non-bugfix release
740      *          (version numbers are &lt;major&gt;.&lt;minor&gt;.&lt;bugfix&gt;)
741      *          before being removed or modified.
742      * @copyright 2012-2013, Allanbank Consulting, Inc., All Rights Reserved
743      */
744     public static class Builder {
745 
        /**
         * The durability for the writes. Handed to the {@link BatchedWrite}
         * when it is constructed from this builder.
         */
        protected Durability myDurability;

        /**
         * The mode for submitting the writes to the server. Handed to the
         * {@link BatchedWrite} when it is constructed from this builder.
         */
        protected BatchedWriteMode myMode;

        /**
         * The writes to submit to the server. The {@link BatchedWrite} takes
         * a copy of this list when it is constructed from this builder.
         */
        protected final List<WriteOperation> myWrites;
754 
755         /**
756          * Creates a new Builder.
757          */
758         public Builder() {
759             myWrites = new ArrayList<WriteOperation>();
760 
761             reset();
762         }
763 
764         /**
765          * Constructs a new {@link BatchedWrite} object from the state of the
766          * builder.
767          * 
768          * @return The new {@link BatchedWrite} object.
769          */
770         public BatchedWrite build() {
771             return new BatchedWrite(this);
772         }
773 
774         /**
775          * Update a document based on a query.
776          * <p>
777          * Defaults to deleting as many documents as match the query.
778          * </p>
779          * <p>
780          * This method is delegates to
781          * {@link #delete(DocumentAssignable, boolean) delete(query, false)}
782          * </p>
783          * 
784          * @param query
785          *            The query to find the document to delete.
786          * @return This builder for chaining method calls.
787          */
788         public Builder delete(final DocumentAssignable query) {
789             return delete(query, false);
790         }
791 
792         /**
793          * Update a document based on a query.
794          * <p>
795          * Defaults to deleting as many documents as match the query.
796          * </p>
797          * 
798          * @param query
799          *            The query to find the document to delete.
800          * @param singleDelete
801          *            If true then only a single document will be deleted. If
802          *            running in a sharded environment then this field must be
803          *            false or the query must contain the shard key.
804          * @return This builder for chaining method calls.
805          */
806         public Builder delete(final DocumentAssignable query,
807                 final boolean singleDelete) {
808             return write(new DeleteOperation(query, singleDelete));
809         }
810 
811         /**
812          * Sets the durability for the writes.
813          * <p>
814          * This method delegates to {@link #setDurability(Durability)}.
815          * </p>
816          * 
817          * @param durability
818          *            The new value for the durability for the writes.
819          * @return This builder for chaining method calls.
820          */
821         public Builder durability(final Durability durability) {
822             return setDurability(durability);
823         }
824 
825         /**
826          * Returns the durability for the write.
827          * 
828          * @return This durability for the write.
829          */
830         public Durability getDurability() {
831             return myDurability;
832         }
833 
834         /**
835          * Adds an insert operation to the batched write.
836          * 
837          * @param document
838          *            The document to insert.
839          * @return This builder for chaining method calls.
840          */
841         public Builder insert(final DocumentAssignable document) {
842             return write(new InsertOperation(document));
843         }
844 
845         /**
846          * Sets the mode for submitting the writes to the server.
847          * <p>
848          * This method delegates to {@link #setMode(BatchedWriteMode)}.
849          * </p>
850          * 
851          * @param mode
852          *            The new value for the mode for submitting the writes to
853          *            the server.
854          * @return This builder for chaining method calls.
855          */
856         public Builder mode(final BatchedWriteMode mode) {
857             return setMode(mode);
858         }
859 
860         /**
861          * Resets the builder back to its initial state for reuse.
862          * 
863          * @return This builder for chaining method calls.
864          */
865         public Builder reset() {
866             myWrites.clear();
867             myMode = BatchedWriteMode.SERIALIZE_AND_CONTINUE;
868             myDurability = null;
869 
870             return this;
871         }
872 
873         /**
874          * Saves the {@code document} to MongoDB.
875          * <p>
876          * If the {@code document} does not contain an {@code _id} field then
877          * this method is equivalent to: {@link #insert(DocumentAssignable)
878          * insert(document)}.
879          * </p>
880          * <p>
881          * If the {@code document} does contain an {@code _id} field then this
882          * method is equivalent to:
883          * {@link #update(DocumentAssignable, DocumentAssignable)
884          * updateAsync(BuilderFactory.start().add(document.get("_id")),
885          * document, false, true)}.
886          * </p>
887          * 
888          * @param document
889          *            The document to save.
890          * @return This builder for chaining method calls.
891          */
892         public Builder save(final DocumentAssignable document) {
893             final Document doc = document.asDocument();
894             final Element id = doc.get("_id");
895             if (id == null) {
896                 return insert(doc);
897             }
898             return update(BuilderFactory.start().add(id), doc, false, true);
899         }
900 
901         /**
902          * Sets the durability for the writes.
903          * 
904          * @param durability
905          *            The new value for the durability for the writes.
906          * @return This builder for chaining method calls.
907          */
908         public Builder setDurability(final Durability durability) {
909             myDurability = durability;
910             return this;
911         }
912 
913         /**
914          * Sets the mode for submitting the writes to the server.
915          * 
916          * @param mode
917          *            The new value for the mode for submitting the writes to
918          *            the server.
919          * @return This builder for chaining method calls.
920          */
921         public Builder setMode(final BatchedWriteMode mode) {
922             myMode = mode;
923             return this;
924         }
925 
926         /**
927          * Sets the writes to submit to the server.
928          * 
929          * @param writes
930          *            The new value for the writes to submit to the server.
931          * @return This builder for chaining method calls.
932          */
933         public Builder setWrites(final List<WriteOperation> writes) {
934             myWrites.clear();
935             if (writes != null) {
936                 myWrites.addAll(writes);
937             }
938             return this;
939         }
940 
941         /**
942          * Update a document based on a query.
943          * <p>
944          * Defaults to updating a single document and not performing an upsert
945          * if no document is found.
946          * </p>
947          * <p>
948          * This method is delegates to
949          * {@link #update(DocumentAssignable, DocumentAssignable, boolean, boolean)
950          * update(query, update, false, false)}
951          * </p>
952          * 
953          * @param query
954          *            The query to find the document to update.
955          * @param update
956          *            The update operations to apply to the document.
957          * @return This builder for chaining method calls.
958          */
959         public Builder update(final DocumentAssignable query,
960                 final DocumentAssignable update) {
961             return update(query, update, false, false);
962         }
963 
964         /**
965          * Update a document based on a query.
966          * <p>
967          * Defaults to updating a single document and not performing an upsert
968          * if no document is found.
969          * </p>
970          * 
971          * @param query
972          *            The query to find the document to update.
973          * @param update
974          *            The update operations to apply to the document.
975          * @param multiUpdate
976          *            If true then the update is applied to all of the matching
977          *            documents, otherwise only the first document found is
978          *            updated.
979          * @param upsert
980          *            If true then if no document is found then a new document
981          *            is created and updated, otherwise no operation is
982          *            performed.
983          * @return This builder for chaining method calls.
984          */
985         public Builder update(final DocumentAssignable query,
986                 final DocumentAssignable update, final boolean multiUpdate,
987                 final boolean upsert) {
988             return write(new UpdateOperation(query, update, multiUpdate, upsert));
989         }
990 
991         /**
992          * Adds a single write to the list of writes to send to the server.
993          * 
994          * @param write
995          *            The write to add to the list of writes to send to the
996          *            server.
997          * @return This builder for chaining method calls.
998          */
999         public Builder write(final WriteOperation write) {
1000             myWrites.add(write);
1001             return this;
1002         }
1003 
1004         /**
1005          * Sets the writes to submit to the server.
1006          * <p>
1007          * This method delegates to {@link #setWrites(List)}.
1008          * </p>
1009          * 
1010          * @param writes
1011          *            The new value for the writes to submit to the server.
1012          * @return This builder for chaining method calls.
1013          */
1014         public Builder writes(final List<WriteOperation> writes) {
1015             return setWrites(writes);
1016         }
1017     }
1018 
1019     /**
1020      * Bundle is a container for the write command and the
1021      * {@link WriteOperation} it contains.
1022      * 
1023      * @api.yes This class is part of the driver's API. Public and protected
1024      *          members will be deprecated for at least 1 non-bugfix release
1025      *          (version numbers are &lt;major&gt;.&lt;minor&gt;.&lt;bugfix&gt;)
1026      *          before being removed or modified.
1027      */
1028     public static final class Bundle {
1029         /** The command containing the bundled write operations. */
1030         private final Document myCommand;
1031 
1032         /** The writes that are bundled in the command. */
1033         private final List<WriteOperation> myWrites;
1034 
1035         /**
1036          * Creates a new Bundle.
1037          * 
1038          * @param command
1039          *            The command containing the bundled write operations.
1040          * @param writes
1041          *            The writes that are bundled in the command.
1042          */
1043         protected Bundle(final Document command,
1044                 final List<WriteOperation> writes) {
1045             super();
1046             myCommand = command;
1047             myWrites = Collections
1048                     .unmodifiableList(new ArrayList<WriteOperation>(writes));
1049         }
1050 
1051         /**
1052          * Returns the command containing the bundled write operations.
1053          * 
1054          * @return The command containing the bundled write operations.
1055          */
1056         public Document getCommand() {
1057             return myCommand;
1058         }
1059 
1060         /**
1061          * Returns the writes that are bundled in the command.
1062          * 
1063          * @return The writes that are bundled in the command.
1064          */
1065         public List<WriteOperation> getWrites() {
1066             return myWrites;
1067         }
1068     }
1069 }