Coverage Report - com.allanbank.mongodb.builder.BatchedWrite
 
Classes in this File Line Coverage Branch Coverage Complexity
BatchedWrite
99%
185/186
94%
83/88
2.564
BatchedWrite$1
100%
2/2
N/A
2.564
BatchedWrite$Builder
100%
33/33
100%
4/4
2.564
BatchedWrite$Bundle
100%
6/6
N/A
2.564
 
 1  
 /*
 2  
  * #%L
 3  
  * BatchedWrite.java - mongodb-async-driver - Allanbank Consulting, Inc.
 4  
  * %%
 5  
  * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
 6  
  * %%
 7  
  * Licensed under the Apache License, Version 2.0 (the "License");
 8  
  * you may not use this file except in compliance with the License.
 9  
  * You may obtain a copy of the License at
 10  
  * 
 11  
  *      http://www.apache.org/licenses/LICENSE-2.0
 12  
  * 
 13  
  * Unless required by applicable law or agreed to in writing, software
 14  
  * distributed under the License is distributed on an "AS IS" BASIS,
 15  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 16  
  * See the License for the specific language governing permissions and
 17  
  * limitations under the License.
 18  
  * #L%
 19  
  */
 20  
 
 21  
 package com.allanbank.mongodb.builder;
 22  
 
 23  
 import java.io.Serializable;
 24  
 import java.util.ArrayList;
 25  
 import java.util.Collections;
 26  
 import java.util.LinkedHashMap;
 27  
 import java.util.LinkedList;
 28  
 import java.util.List;
 29  
 import java.util.Map;
 30  
 import java.util.SortedMap;
 31  
 import java.util.TreeMap;
 32  
 
 33  
 import com.allanbank.mongodb.BatchedAsyncMongoCollection;
 34  
 import com.allanbank.mongodb.Durability;
 35  
 import com.allanbank.mongodb.MongoCollection;
 36  
 import com.allanbank.mongodb.Version;
 37  
 import com.allanbank.mongodb.bson.Document;
 38  
 import com.allanbank.mongodb.bson.DocumentAssignable;
 39  
 import com.allanbank.mongodb.bson.Element;
 40  
 import com.allanbank.mongodb.bson.builder.ArrayBuilder;
 41  
 import com.allanbank.mongodb.bson.builder.BuilderFactory;
 42  
 import com.allanbank.mongodb.bson.builder.DocumentBuilder;
 43  
 import com.allanbank.mongodb.bson.impl.EmptyDocument;
 44  
 import com.allanbank.mongodb.builder.write.DeleteOperation;
 45  
 import com.allanbank.mongodb.builder.write.InsertOperation;
 46  
 import com.allanbank.mongodb.builder.write.UpdateOperation;
 47  
 import com.allanbank.mongodb.builder.write.WriteOperation;
 48  
 import com.allanbank.mongodb.builder.write.WriteOperationType;
 49  
 import com.allanbank.mongodb.error.DocumentToLargeException;
 50  
 
 51  
 /**
 52  
  * BatchedWrite provides a container for a group of write operations to be sent
 53  
  * to the server as one group.
 54  
  * <p>
 55  
  * The default mode ({@link BatchedWriteMode#SERIALIZE_AND_CONTINUE}) for this
 56  
  * class is to submit the operations to the server in the order that they are
 57  
  * added to the Builder and to apply as many of the writes as possible (commonly
 58  
  * referred to as continue-on-error). This has the effect of causing the fewest
 59  
  * surprises and optimizing the performance of the writes since the driver can
 60  
  * send multiple distinct writes to the server at once.
 61  
  * </p>
 62  
  * <p>
 63  
  * The {@link BatchedWriteMode#SERIALIZE_AND_STOP} mode also sends each write as
 64  
  * a separate request but instead of attempting all writes the driver will stop
 65  
  * sending requests once one of the writes fails. This also prevents the driver
 66  
  * from sending multiple write messages to the server which can degrade
 67  
  * performance.
 68  
  * </p>
 69  
  * <p>
 70  
  * The last mode, {@link BatchedWriteMode#REORDERED}, may re-order writes to
 71  
  * maximize performance. Similar to the
 72  
  * {@link BatchedWriteMode#SERIALIZE_AND_CONTINUE} this mode will also attempt
 73  
  * all writes. The reordering of writes is across all {@link WriteOperationType}
 74  
  * s.
 75  
  * </p>
 76  
  * <p>
 77  
  * If using a MongoDB server after {@link #REQUIRED_VERSION 2.5.5} a batched
 78  
  * write will result in use of the new write commands.
 79  
  * </p>
 80  
  * <p>
 81  
  * For a more generalized batched write and query capability see the
 82  
  * {@link BatchedAsyncMongoCollection} and {@link MongoCollection#startBatch()}.
 83  
  * </p>
 84  
  * 
 85  
  * @api.yes This class is part of the driver's API. Public and protected members
 86  
  *          will be deprecated for at least 1 non-bugfix release (version
 87  
  *          numbers are &lt;major&gt;.&lt;minor&gt;.&lt;bugfix&gt;) before being
 88  
  *          removed or modified.
 89  
  * @copyright 2014, Allanbank Consulting, Inc., All Rights Reserved
 90  
  */
 91  
 public class BatchedWrite implements Serializable {
 92  
 
 93  
     /** The first version of MongoDB to support the {@code aggregate} command. */
 94  1
     public static final Version REQUIRED_VERSION = Version.parse("2.5.5");
 95  
 
 96  
     /** Serialization version for the class. */
 97  
     private static final long serialVersionUID = 6984498574755719178L;
 98  
 
 99  
     /**
 100  
      * Creates a new builder for a {@link BatchedWrite}.
 101  
      * 
 102  
      * @return The builder to construct a {@link BatchedWrite}.
 103  
      */
 104  
     public static Builder builder() {
 105  65
         return new Builder();
 106  
     }
 107  
 
 108  
     /**
 109  
      * Create a batched write with a single delete operation. Users can just use
 110  
      * the {@link MongoCollection#delete} variants and the driver will convert
 111  
      * the deletes to batched writes as appropriate.
 112  
      * <p>
 113  
      * This method avoids the construction of a builder.
 114  
      * </p>
 115  
      * 
 116  
      * @param query
 117  
      *            The query to find the documents to delete.
 118  
      * @param singleDelete
 119  
      *            If true then only a single document will be deleted. If
 120  
      *            running in a sharded environment then this field must be false
 121  
      *            or the query must contain the shard key.
 122  
      * @param durability
 123  
      *            The durability of the delete.
 124  
      * @return The BatchedWrite with the single delete.
 125  
      */
 126  
     public static BatchedWrite delete(final DocumentAssignable query,
 127  
             final boolean singleDelete, final Durability durability) {
 128  1
         final DeleteOperation op = new DeleteOperation(query, singleDelete);
 129  1
         return new BatchedWrite(op, BatchedWriteMode.SERIALIZE_AND_CONTINUE,
 130  
                 durability);
 131  
     }
 132  
 
 133  
     /**
 134  
      * Create a batched write with a single inserts operation. Users can just
 135  
      * use the {@link MongoCollection#insert} variants and the driver will
 136  
      * convert the inserts to batched writes as appropriate.
 137  
      * <p>
 138  
      * This method avoids the construction of a builder.
 139  
      * </p>
 140  
      * 
 141  
      * @param continueOnError
 142  
      *            If the insert should continue if one of the documents causes
 143  
      *            an error.
 144  
      * @param durability
 145  
      *            The durability for the insert.
 146  
      * @param documents
 147  
      *            The documents to add to the collection.
 148  
      * @return The BatchedWrite with the inserts.
 149  
      */
 150  
     public static BatchedWrite insert(final boolean continueOnError,
 151  
             final Durability durability, final DocumentAssignable... documents) {
 152  1
         final List<WriteOperation> ops = new ArrayList<WriteOperation>(
 153  
                 documents.length);
 154  2
         for (final DocumentAssignable doc : documents) {
 155  1
             ops.add(new InsertOperation(doc));
 156  
         }
 157  1
         return new BatchedWrite(ops,
 158  
                 continueOnError ? BatchedWriteMode.SERIALIZE_AND_CONTINUE
 159  
                         : BatchedWriteMode.SERIALIZE_AND_STOP, durability);
 160  
     }
 161  
 
 162  
     /**
 163  
      * Create a batched write with a single update operation. Users can just use
 164  
      * the {@link MongoCollection#update} variants and the driver will convert
 165  
      * the updates to batched writes as appropriate.
 166  
      * 
 167  
      * @param query
 168  
      *            The query for the update.
 169  
      * @param update
 170  
      *            The update for the update.
 171  
      * @param multiUpdate
 172  
      *            If true then the update will update multiple documents.
 173  
      * @param upsert
 174  
      *            If no document is found then upsert the document.
 175  
      * @param durability
 176  
      *            The durability of the update.
 177  
      * @return The BatchedWrite with the single update.
 178  
      */
 179  
     public static BatchedWrite update(final DocumentAssignable query,
 180  
             final DocumentAssignable update, final boolean multiUpdate,
 181  
             final boolean upsert, final Durability durability) {
 182  1
         final UpdateOperation op = new UpdateOperation(query, update,
 183  
                 multiUpdate, upsert);
 184  1
         return new BatchedWrite(op, BatchedWriteMode.SERIALIZE_AND_CONTINUE,
 185  
                 durability);
 186  
     }
 187  
 
 188  
     /** The durability for the writes. */
 189  
     private final Durability myDurability;
 190  
 
 191  
     /** The mode for submitting the writes to the server. */
 192  
     private final BatchedWriteMode myMode;
 193  
 
 194  
     /** The writes to submit to the server. */
 195  
     private final List<WriteOperation> myWrites;
 196  
 
 197  
     /**
 198  
      * Creates a new BatchedWrite.
 199  
      * 
 200  
      * @param builder
 201  
      *            The builder for the writes.
 202  
      */
 203  72
     protected BatchedWrite(final Builder builder) {
 204  72
         myWrites = Collections.unmodifiableList(new ArrayList<WriteOperation>(
 205  
                 builder.myWrites));
 206  72
         myMode = builder.myMode;
 207  72
         myDurability = builder.myDurability;
 208  72
     }
 209  
 
 210  
     /**
 211  
      * Creates a new BatchedWrite.
 212  
      * 
 213  
      * @param ops
 214  
      *            The operations for the batch.
 215  
      * @param mode
 216  
      *            The mode for the batch.
 217  
      * @param durability
 218  
      *            The durability for the batch.
 219  
      */
 220  
     private BatchedWrite(final List<WriteOperation> ops,
 221  3
             final BatchedWriteMode mode, final Durability durability) {
 222  3
         myWrites = Collections.unmodifiableList(ops);
 223  3
         myMode = mode;
 224  3
         myDurability = durability;
 225  3
     }
 226  
 
 227  
     /**
 228  
      * Creates a new BatchedWrite.
 229  
      * 
 230  
      * @param op
 231  
      *            The single operation for the batch.
 232  
      * @param mode
 233  
      *            The mode for the batch.
 234  
      * @param durability
 235  
      *            The durability for the batch.
 236  
      */
 237  
     private BatchedWrite(final WriteOperation op, final BatchedWriteMode mode,
 238  
             final Durability durability) {
 239  2
         this(Collections.singletonList(op), mode, durability);
 240  2
     }
 241  
 
 242  
     /**
 243  
      * Returns the durability for the writes.
 244  
      * 
 245  
      * @return The durability for the writes.
 246  
      */
 247  
     public Durability getDurability() {
 248  1042
         return myDurability;
 249  
     }
 250  
 
 251  
     /**
 252  
      * Returns the mode for submitting the writes to the server.
 253  
      * 
 254  
      * @return The mode for submitting the writes to the server.
 255  
      */
 256  
     public BatchedWriteMode getMode() {
 257  238
         return myMode;
 258  
     }
 259  
 
 260  
     /**
 261  
      * Returns the writes to submit to the server.
 262  
      * 
 263  
      * @return The writes to submit to the server.
 264  
      */
 265  
     public List<WriteOperation> getWrites() {
 266  79
         return myWrites;
 267  
     }
 268  
 
 269  
     /**
 270  
      * Creates write commands for all of the insert, updates and deletes. The
 271  
      * number and order of the writes is based on the {@link #getMode() mode}.
 272  
      * 
 273  
      * @param collectionName
 274  
      *            The name of the collection the documents will be inserted
 275  
      *            into.
 276  
      * @param maxCommandSize
 277  
      *            The maximum document size.
 278  
      * @param maxOperationsPerBundle
 279  
      *            The maximum number of writes to include in each bundle.
 280  
      * @return The list of command documents to be sent.
 281  
      */
 282  
     public List<Bundle> toBundles(final String collectionName,
 283  
             final long maxCommandSize, final int maxOperationsPerBundle) {
 284  1
         switch (getMode()) {
 285  
         case REORDERED: {
 286  4
             return createOptimized(collectionName, maxCommandSize,
 287  
                     maxOperationsPerBundle);
 288  
         }
 289  
         case SERIALIZE_AND_CONTINUE: {
 290  42
             return createSerialized(collectionName, maxCommandSize,
 291  
                     maxOperationsPerBundle, false);
 292  
         }
 293  
         default: {
 294  9
             return createSerialized(collectionName, maxCommandSize,
 295  
                     maxOperationsPerBundle, true);
 296  
         }
 297  
         }
 298  
     }
 299  
 
 300  
     /**
 301  
      * Adds the document to the array of documents.
 302  
      * 
 303  
      * @param array
 304  
      *            The array to add the operation to.
 305  
      * @param operation
 306  
      *            The operation to add.
 307  
      */
 308  
     private void add(final ArrayBuilder array, final WriteOperation operation) {
 309  1
         switch (operation.getType()) {
 310  
         case INSERT: {
 311  200042
             final InsertOperation insertOperation = (InsertOperation) operation;
 312  
 
 313  200042
             array.add(insertOperation.getDocument());
 314  200042
             break;
 315  
         }
 316  
         case UPDATE: {
 317  400070
             final UpdateOperation updateOperation = (UpdateOperation) operation;
 318  400070
             final DocumentBuilder update = array.push();
 319  
 
 320  400070
             update.add("q", updateOperation.getQuery());
 321  400070
             update.add("u", updateOperation.getUpdate());
 322  400070
             if (updateOperation.isUpsert()) {
 323  100018
                 update.add("upsert", true);
 324  
             }
 325  400070
             if (updateOperation.isMultiUpdate()) {
 326  100010
                 update.add("multi", true);
 327  
             }
 328  
             break;
 329  
         }
 330  
         case DELETE: {
 331  300051
             final DeleteOperation deleteOperation = (DeleteOperation) operation;
 332  300051
             array.push().add("q", deleteOperation.getQuery())
 333  
                     .add("limit", deleteOperation.isSingleDelete() ? 1 : 0);
 334  300051
             break;
 335  
         }
 336  
         }
 337  900163
     }
 338  
 
 339  
     /**
 340  
      * Adds the durability ('writeConcern') to the command document.
 341  
      * 
 342  
      * @param command
 343  
      *            The command document to add the durability to.
 344  
      * @param durability
 345  
      *            The durability to add. May be <code>null</code>.
 346  
      */
 347  
     private void addDurability(final DocumentBuilder command,
 348  
             final Durability durability) {
 349  919
         if (durability != null) {
 350  27
             final DocumentBuilder durabilityDoc = command.push("writeConcern");
 351  27
             if (durability.equals(Durability.NONE)) {
 352  6
                 durabilityDoc.add("w", 0);
 353  
             }
 354  21
             else if (durability.equals(Durability.ACK)) {
 355  18
                 durabilityDoc.add("w", 1);
 356  
             }
 357  
             else {
 358  3
                 boolean first = true;
 359  3
                 for (final Element part : durability.asDocument()) {
 360  9
                     if (first) {
 361  
                         // The first element is "getlasterror".
 362  3
                         first = false;
 363  
                     }
 364  
                     else {
 365  6
                         durabilityDoc.add(part);
 366  
                     }
 367  9
                 }
 368  
             }
 369  
         }
 370  919
     }
 371  
 
 372  
     /**
 373  
      * Creates a {@link DocumentToLargeException} for the operation.
 374  
      * 
 375  
      * @param operation
 376  
      *            The large operation.
 377  
      * @param size
 378  
      *            The size of the operation.
 379  
      * @param maxCommandSize
 380  
      *            The maximum size of the operation.
 381  
      * @return The created exception.
 382  
      */
 383  
     private DocumentToLargeException createDocumentToLargeException(
 384  
             final WriteOperation operation, final int size,
 385  
             final int maxCommandSize) {
 386  
 
 387  4
         Document doc = EmptyDocument.INSTANCE;
 388  
 
 389  4
         switch (operation.getType()) {
 390  
         case INSERT: {
 391  2
             final InsertOperation insertOperation = (InsertOperation) operation;
 392  2
             doc = insertOperation.getDocument();
 393  2
             break;
 394  
         }
 395  
         case UPDATE: {
 396  1
             final UpdateOperation updateOperation = (UpdateOperation) operation;
 397  1
             doc = updateOperation.getQuery();
 398  1
             final Document update = updateOperation.getUpdate();
 399  1
             if (doc.size() < update.size()) {
 400  0
                 doc = update;
 401  
             }
 402  
             break;
 403  
         }
 404  
         case DELETE: {
 405  1
             final DeleteOperation deleteOperation = (DeleteOperation) operation;
 406  1
             doc = deleteOperation.getQuery();
 407  1
             break;
 408  
         }
 409  
         }
 410  
 
 411  4
         return new DocumentToLargeException(size, maxCommandSize, doc);
 412  
     }
 413  
 
 414  
     /**
 415  
      * Reorders the writes into as few write commands as possible.
 416  
      * <p>
 417  
      * <b>Note</b>: MongoDB gives a slightly larger document for the command (<a
 418  
      * href=
 419  
      * "https://github.com/mongodb/mongo/blob/master/src/mongo/bson/util/builder.h#L56"
 420  
      * >16K</a>). This is for the command overhead. We don't explicitly use the
 421  
      * overhead but we may end up implicitly using it in the case of a operation
 422  
      * that is just at or below maxCommandSize. For those cases we start the
 423  
      * 'head' map below with the full map. That allows the big operations to be
 424  
      * added to command documents of there own once the command overhead has
 425  
      * been factored in.
 426  
      * </p>
 427  
      * 
 428  
      * @param collectionName
 429  
      *            The name of the collection the documents will be inserted
 430  
      *            into.
 431  
      * @param maxCommandSize
 432  
      *            The maximum document size.
 433  
      * @param maxOperationsPerBundle
 434  
      *            The maximum number of writes to include in each bundle.
 435  
      * @return The list of command documents to be sent.
 436  
      */
 437  
     private List<Bundle> createOptimized(final String collectionName,
 438  
             final long maxCommandSize, final int maxOperationsPerBundle) {
 439  
         // Bucket the operations and sort by size.
 440  
         Map<WriteOperationType, SortedMap<Long, List<WriteOperation>>> operationsBuckets;
 441  4
         operationsBuckets = new LinkedHashMap<WriteOperationType, SortedMap<Long, List<WriteOperation>>>();
 442  4
         for (final WriteOperation writeOp : getWrites()) {
 443  800019
             SortedMap<Long, List<WriteOperation>> operations = operationsBuckets
 444  
                     .get(writeOp.getType());
 445  800019
             if (operations == null) {
 446  10
                 operations = new TreeMap<Long, List<WriteOperation>>();
 447  10
                 operationsBuckets.put(writeOp.getType(), operations);
 448  
             }
 449  
 
 450  800019
             final Long size = Long.valueOf(sizeOf(-1, writeOp));
 451  800019
             List<WriteOperation> list = operations.get(size);
 452  800019
             if (list == null) {
 453  12
                 list = new LinkedList<WriteOperation>();
 454  12
                 operations.put(size, list);
 455  
             }
 456  800019
             list.add(writeOp);
 457  800019
         }
 458  
 
 459  
         // Check if any operation is too big.
 460  4
         final Long maxMessageSize = Long.valueOf(maxCommandSize + 1);
 461  4
         for (final SortedMap<Long, List<WriteOperation>> operations : operationsBuckets
 462  
                 .values()) {
 463  10
             if (!operations.tailMap(maxMessageSize).isEmpty()) {
 464  1
                 final Long biggest = operations.lastKey();
 465  1
                 final List<WriteOperation> operation = operations.get(biggest);
 466  1
                 throw createDocumentToLargeException(operation.get(0),
 467  
                         biggest.intValue(), (int) maxCommandSize);
 468  
             }
 469  9
         }
 470  
 
 471  
         // Now build commands packing the operations into a few messages as
 472  
         // possible.
 473  3
         final List<Bundle> commands = new ArrayList<Bundle>();
 474  3
         final List<WriteOperation> bundled = new ArrayList<WriteOperation>(
 475  
                 Math.min(maxOperationsPerBundle, myWrites.size()));
 476  3
         final DocumentBuilder command = BuilderFactory.start();
 477  3
         for (final Map.Entry<WriteOperationType, SortedMap<Long, List<WriteOperation>>> entry : operationsBuckets
 478  
                 .entrySet()) {
 479  9
             final SortedMap<Long, List<WriteOperation>> operations = entry
 480  
                     .getValue();
 481  818
             while (!operations.isEmpty()) {
 482  809
                 final ArrayBuilder docs = start(entry.getKey(), collectionName,
 483  
                         false, command);
 484  809
                 long remaining = maxCommandSize - command.build().size();
 485  
 
 486  809
                 SortedMap<Long, List<WriteOperation>> head = operations;
 487  809
                 int index = 0;
 488  800827
                 while (!head.isEmpty()
 489  
                         && (bundled.size() < maxOperationsPerBundle)) {
 490  800018
                     final Long biggest = head.lastKey();
 491  800018
                     final List<WriteOperation> bigOps = head.get(biggest);
 492  800018
                     final WriteOperation operation = bigOps.remove(0);
 493  800018
                     if (bigOps.isEmpty()) {
 494  11
                         head.remove(biggest);
 495  
                     }
 496  
 
 497  800018
                     add(docs, operation);
 498  800018
                     bundled.add(operation);
 499  
 
 500  800018
                     remaining -= sizeOf(index, operation);
 501  800018
                     index += 1;
 502  800018
                     head = operations.headMap(Long.valueOf(remaining
 503  
                             - sizeOfIndex(index)));
 504  800018
                 }
 505  
 
 506  809
                 commands.add(new Bundle(command.build(), bundled));
 507  809
                 bundled.clear();
 508  809
             }
 509  9
         }
 510  
 
 511  3
         return commands;
 512  
     }
 513  
 
 514  
     /**
 515  
      * Creates write commands for each sequence of insert, updates and deletes.
 516  
      * <p>
 517  
      * <b>Note</b>: MongoDB gives a slightly larger document for the command (<a
 518  
      * href=
 519  
      * "https://github.com/mongodb/mongo/blob/master/src/mongo/bson/util/builder.h#L56"
 520  
      * >16K</a>). This is for the command overhead. We don't explicitly use the
 521  
      * overhead but we may end up using it in the case of a operation that is
 522  
      * just at or below maxCommandSize. That is why we start the 'head' map
 523  
      * below with the full map. That allows those big operations to be added to
 524  
      * commands of there own once the command overhead has been factored in.
 525  
      * </p>
 526  
      * 
 527  
      * @param collectionName
 528  
      *            The name of the collection the documents will be inserted
 529  
      *            into.
 530  
      * @param maxCommandSize
 531  
      *            The maximum document size.
 532  
      * @param stopOnError
 533  
      *            If true then the ordered flag is set to true.
 534  
      * @param maxOperationsPerBundle
 535  
      *            The maximum number of writes to include in each bundle.
 536  
      * @return The list of command documents to be sent.
 537  
      */
 538  
     private List<Bundle> createSerialized(final String collectionName,
 539  
             final long maxCommandSize, final int maxOperationsPerBundle,
 540  
             final boolean stopOnError) {
 541  51
         final List<Bundle> commands = new ArrayList<Bundle>();
 542  51
         final DocumentBuilder command = BuilderFactory.start();
 543  
 
 544  51
         final List<WriteOperation> toSend = getWrites();
 545  51
         final List<WriteOperation> bundled = new ArrayList<WriteOperation>(
 546  
                 Math.min(maxOperationsPerBundle, myWrites.size()));
 547  
 
 548  51
         ArrayBuilder opsArray = null;
 549  51
         WriteOperationType lastType = null;
 550  
 
 551  51
         long remaining = maxCommandSize;
 552  51
         for (final WriteOperation writeOp : toSend) {
 553  100148
             long size = sizeOf(-1, writeOp);
 554  100148
             final long indexSize = sizeOfIndex(bundled.size());
 555  100148
             if (maxCommandSize < size) {
 556  3
                 throw createDocumentToLargeException(writeOp, (int) size,
 557  
                         (int) maxCommandSize);
 558  
             }
 559  100145
             size += indexSize; // Add in the index overhead.
 560  
 
 561  
             // Close a command if change type or too big.
 562  100145
             if (!bundled.isEmpty()
 563  
                     && ((lastType != writeOp.getType())
 564  
                             || ((remaining - size) < 0) || (maxOperationsPerBundle <= bundled
 565  
                             .size()))) {
 566  67
                 commands.add(new Bundle(command.build(), bundled));
 567  67
                 bundled.clear();
 568  
             }
 569  
 
 570  
             // Start a command? - Maybe after closing?
 571  100145
             if (bundled.isEmpty()) {
 572  110
                 opsArray = start(writeOp.getType(), collectionName,
 573  
                         stopOnError, command);
 574  110
                 lastType = writeOp.getType();
 575  110
                 remaining = (maxCommandSize - command.build().size());
 576  
             }
 577  
 
 578  
             // Add the operation.
 579  100145
             add(opsArray, writeOp);
 580  100145
             bundled.add(writeOp);
 581  
 
 582  
             // Remove the size of the operation from the remaining.
 583  100145
             remaining -= size;
 584  100145
         }
 585  
 
 586  48
         if (!bundled.isEmpty()) {
 587  43
             commands.add(new Bundle(command.build(), bundled));
 588  
         }
 589  
 
 590  48
         return commands;
 591  
     }
 592  
 
 593  
     /**
 594  
      * Returns the size of the encoded operation.
 595  
      * <p>
 596  
      * For an {@code InsertOperation} this is the size of the document to
 597  
      * insert.
 598  
      * <p>
 599  
      * For an {@code UpdateOperation} this includes the space for:
 600  
      * <dl>
 601  
      * <dt>Document Overhead</dt>
 602  
      * <dd>type (1 byte), length (4 bytes), trailing null (1 byte).</dd>
 603  
      * <dt>'q' field</dt>
 604  
      * <dd>name (2 bytes), type (1 byte), value (document size)</dd>
 605  
      * <dt>'u' field</dt>
 606  
      * <dd>name (2 bytes), type (1 byte), value (document size)</dd>
 607  
      * <dt>'upsert' field</dt>
 608  
      * <dd>name (7 bytes), type (1 byte), value (1 byte)</dd>
 609  
      * <dt>'multi' field</dt>
 610  
      * <dd>name (6 bytes), type (1 byte), value (1 byte)</dd>
 611  
      * </dl>
 612  
      * </p>
 613  
      * <p>
 614  
      * For a {@code DeleteOperation} this includes the space for:
 615  
      * <dl>
 616  
      * <dt>Document Overhead</dt>
 617  
      * <dd>type (1 byte), length (4 bytes), trailing null (1 byte).</dd>
 618  
      * <dt>'q' field</dt>
 619  
      * <dd>name (2 bytes), type (1 byte), value (document size)</dd>
 620  
      * <dt>'limit' field</dt>
 621  
      * <dd>name (6 bytes), type (1 byte), value (4 bytes)</dd>
 622  
      * </dl>
 623  
      * 
 624  
      * @param index
 625  
      *            The index of the operation in the operations array.
 626  
      * @param operation
 627  
      *            The operation to determine the size of.
 628  
      * @return The size of the operation.
 629  
      */
 630  
     private long sizeOf(final int index, final WriteOperation operation) {
 631  1700185
         long result = 0;
 632  1700185
         switch (operation.getType()) {
 633  
         case INSERT: {
 634  300046
             final InsertOperation insertOperation = (InsertOperation) operation;
 635  300046
             result = sizeOfIndex(index) + insertOperation.getDocument().size();
 636  300046
             break;
 637  
         }
 638  
         case UPDATE: {
 639  800081
             final UpdateOperation updateOperation = (UpdateOperation) operation;
 640  800081
             result = sizeOfIndex(index) + updateOperation.getQuery().size()
 641  
                     + updateOperation.getUpdate().size() + 29;
 642  800081
             break;
 643  
         }
 644  
         case DELETE: {
 645  600058
             final DeleteOperation deleteOperation = (DeleteOperation) operation;
 646  600058
             result = sizeOfIndex(index) + deleteOperation.getQuery().size()
 647  
                     + 20;
 648  600058
             break;
 649  
         }
 650  
         }
 651  
 
 652  1700185
         return result;
 653  
     }
 654  
 
 655  
     /**
 656  
      * Returns the number of bytes required to encode the index within the array
 657  
      * element.
 658  
      * 
 659  
      * @param index
 660  
      *            The index to return the size of.
 661  
      * @return The length of the encoded index.
 662  
      */
 663  
     private long sizeOfIndex(final int index) {
 664  
         // For 2.6 the number of items in the array is capped at 1000. This
 665  
         // allows up to 99,999 without resorting to turning the value into
 666  
         // a string which seems like safe enough padding.
 667  2600351
         if (index < 0) {
 668  900167
             return 0; // For estimating operation sizes.
 669  
         }
 670  1700184
         else if (index < 10) {
 671  15394
             return 3; // single character plus a null plus a type.
 672  
         }
 673  1684790
         else if (index < 100) {
 674  144090
             return 4; // two characters plus a null plus a type.
 675  
         }
 676  1540700
         else if (index < 1000) {
 677  1440900
             return 5; // three characters plus a null plus a type.
 678  
         }
 679  99800
         else if (index < 10000) {
 680  9800
             return 6; // four characters plus a null plus a type.
 681  
         }
 682  
 
 683  90000
         return Integer.toString(index).length() + 2;
 684  
     }
 685  
 
 686  
     /**
 687  
      * Starts a new command document.
 688  
      * 
 689  
      * @param operation
 690  
      *            The operation to start.
 691  
      * @param collectionName
 692  
      *            The collection to operate on.
 693  
      * @param stopOnError
 694  
      *            If true then the operations should stop once an error is
 695  
      *            encountered. Is mapped to the {@code ordered} field in the
 696  
      *            command document.
 697  
      * @param command
 698  
      *            The command builder.
 699  
      * @return The {@link ArrayBuilder} for the operations array.
 700  
      */
 701  
     private ArrayBuilder start(final WriteOperationType operation,
 702  
             final String collectionName, final boolean stopOnError,
 703  
             final DocumentBuilder command) {
 704  
 
 705  919
         String commandName = "";
 706  919
         String arrayName = "";
 707  919
         switch (operation) {
 708  
         case INSERT: {
 709  141
             commandName = "insert";
 710  141
             arrayName = "documents";
 711  141
             break;
 712  
         }
 713  
         case UPDATE: {
 714  442
             commandName = "update";
 715  442
             arrayName = "updates";
 716  442
             break;
 717  
         }
 718  
         case DELETE: {
 719  336
             commandName = "delete";
 720  336
             arrayName = "deletes";
 721  
             break;
 722  
         }
 723  
         }
 724  
 
 725  919
         command.reset();
 726  919
         command.add(commandName, collectionName);
 727  919
         if (!stopOnError) {
 728  896
             command.add("ordered", stopOnError);
 729  
         }
 730  919
         addDurability(command, getDurability());
 731  
 
 732  919
         return command.pushArray(arrayName);
 733  
     }
 734  
 
 735  
     /**
 736  
      * Builder for creating {@link BatchedWrite}s.
 737  
      * 
 738  
      * @api.yes This class is part of the driver's API. Public and protected
 739  
      *          members will be deprecated for at least 1 non-bugfix release
 740  
      *          (version numbers are &lt;major&gt;.&lt;minor&gt;.&lt;bugfix&gt;)
 741  
      *          before being removed or modified.
 742  
      * @copyright 2012-2013, Allanbank Consulting, Inc., All Rights Reserved
 743  
      */
 744  
     public static class Builder {
 745  
 
 746  
         /** The durability for the writes. */
 747  
         protected Durability myDurability;
 748  
 
 749  
         /** The mode for submitting the writes to the server. */
 750  
         protected BatchedWriteMode myMode;
 751  
 
 752  
         /** The writes to submit to the server. */
 753  
         protected final List<WriteOperation> myWrites;
 754  
 
 755  
         /**
 756  
          * Creates a new Builder.
 757  
          */
 758  65
         public Builder() {
 759  65
             myWrites = new ArrayList<WriteOperation>();
 760  
 
 761  65
             reset();
 762  65
         }
 763  
 
 764  
         /**
 765  
          * Constructs a new {@link BatchedWrite} object from the state of the
 766  
          * builder.
 767  
          * 
 768  
          * @return The new {@link BatchedWrite} object.
 769  
          */
 770  
         public BatchedWrite build() {
 771  72
             return new BatchedWrite(this);
 772  
         }
 773  
 
 774  
         /**
 775  
          * Update a document based on a query.
 776  
          * <p>
 777  
          * Defaults to deleting as many documents as match the query.
 778  
          * </p>
 779  
          * <p>
 780  
          * This method is delegates to
 781  
          * {@link #delete(DocumentAssignable, boolean) delete(query, false)}
 782  
          * </p>
 783  
          * 
 784  
          * @param query
 785  
          *            The query to find the document to delete.
 786  
          * @return This builder for chaining method calls.
 787  
          */
 788  
         public Builder delete(final DocumentAssignable query) {
 789  100039
             return delete(query, false);
 790  
         }
 791  
 
 792  
         /**
 793  
          * Update a document based on a query.
 794  
          * <p>
 795  
          * Defaults to deleting as many documents as match the query.
 796  
          * </p>
 797  
          * 
 798  
          * @param query
 799  
          *            The query to find the document to delete.
 800  
          * @param singleDelete
 801  
          *            If true then only a single document will be deleted. If
 802  
          *            running in a sharded environment then this field must be
 803  
          *            false or the query must contain the shard key.
 804  
          * @return This builder for chaining method calls.
 805  
          */
 806  
         public Builder delete(final DocumentAssignable query,
 807  
                 final boolean singleDelete) {
 808  300063
             return write(new DeleteOperation(query, singleDelete));
 809  
         }
 810  
 
 811  
         /**
 812  
          * Sets the durability for the writes.
 813  
          * <p>
 814  
          * This method delegates to {@link #setDurability(Durability)}.
 815  
          * </p>
 816  
          * 
 817  
          * @param durability
 818  
          *            The new value for the durability for the writes.
 819  
          * @return This builder for chaining method calls.
 820  
          */
 821  
         public Builder durability(final Durability durability) {
 822  12
             return setDurability(durability);
 823  
         }
 824  
 
 825  
         /**
 826  
          * Returns the durability for the write.
 827  
          * 
 828  
          * @return This durability for the write.
 829  
          */
 830  
         public Durability getDurability() {
 831  19
             return myDurability;
 832  
         }
 833  
 
 834  
         /**
 835  
          * Adds an insert operation to the batched write.
 836  
          * 
 837  
          * @param document
 838  
          *            The document to insert.
 839  
          * @return This builder for chaining method calls.
 840  
          */
 841  
         public Builder insert(final DocumentAssignable document) {
 842  200052
             return write(new InsertOperation(document));
 843  
         }
 844  
 
 845  
         /**
 846  
          * Sets the mode for submitting the writes to the server.
 847  
          * <p>
 848  
          * This method delegates to {@link #setMode(BatchedWriteMode)}.
 849  
          * </p>
 850  
          * 
 851  
          * @param mode
 852  
          *            The new value for the mode for submitting the writes to
 853  
          *            the server.
 854  
          * @return This builder for chaining method calls.
 855  
          */
 856  
         public Builder mode(final BatchedWriteMode mode) {
 857  18
             return setMode(mode);
 858  
         }
 859  
 
 860  
         /**
 861  
          * Resets the builder back to its initial state for reuse.
 862  
          * 
 863  
          * @return This builder for chaining method calls.
 864  
          */
 865  
         public Builder reset() {
 866  93
             myWrites.clear();
 867  93
             myMode = BatchedWriteMode.SERIALIZE_AND_CONTINUE;
 868  93
             myDurability = null;
 869  
 
 870  93
             return this;
 871  
         }
 872  
 
 873  
         /**
 874  
          * Saves the {@code document} to MongoDB.
 875  
          * <p>
 876  
          * If the {@code document} does not contain an {@code _id} field then
 877  
          * this method is equivalent to: {@link #insert(DocumentAssignable)
 878  
          * insert(document)}.
 879  
          * </p>
 880  
          * <p>
 881  
          * If the {@code document} does contain an {@code _id} field then this
 882  
          * method is equivalent to:
 883  
          * {@link #update(DocumentAssignable, DocumentAssignable)
 884  
          * updateAsync(BuilderFactory.start().add(document.get("_id")),
 885  
          * document, false, true)}.
 886  
          * </p>
 887  
          * 
 888  
          * @param document
 889  
          *            The document to save.
 890  
          * @return This builder for chaining method calls.
 891  
          */
 892  
         public Builder save(final DocumentAssignable document) {
 893  18
             final Document doc = document.asDocument();
 894  18
             final Element id = doc.get("_id");
 895  18
             if (id == null) {
 896  9
                 return insert(doc);
 897  
             }
 898  9
             return update(BuilderFactory.start().add(id), doc, false, true);
 899  
         }
 900  
 
 901  
         /**
 902  
          * Sets the durability for the writes.
 903  
          * 
 904  
          * @param durability
 905  
          *            The new value for the durability for the writes.
 906  
          * @return This builder for chaining method calls.
 907  
          */
 908  
         public Builder setDurability(final Durability durability) {
 909  19
             myDurability = durability;
 910  19
             return this;
 911  
         }
 912  
 
 913  
         /**
 914  
          * Sets the mode for submitting the writes to the server.
 915  
          * 
 916  
          * @param mode
 917  
          *            The new value for the mode for submitting the writes to
 918  
          *            the server.
 919  
          * @return This builder for chaining method calls.
 920  
          */
 921  
         public Builder setMode(final BatchedWriteMode mode) {
 922  34
             myMode = mode;
 923  34
             return this;
 924  
         }
 925  
 
 926  
         /**
 927  
          * Sets the writes to submit to the server.
 928  
          * 
 929  
          * @param writes
 930  
          *            The new value for the writes to submit to the server.
 931  
          * @return This builder for chaining method calls.
 932  
          */
 933  
         public Builder setWrites(final List<WriteOperation> writes) {
 934  2
             myWrites.clear();
 935  2
             if (writes != null) {
 936  1
                 myWrites.addAll(writes);
 937  
             }
 938  2
             return this;
 939  
         }
 940  
 
 941  
         /**
 942  
          * Update a document based on a query.
 943  
          * <p>
 944  
          * Defaults to updating a single document and not performing an upsert
 945  
          * if no document is found.
 946  
          * </p>
 947  
          * <p>
 948  
          * This method is delegates to
 949  
          * {@link #update(DocumentAssignable, DocumentAssignable, boolean, boolean)
 950  
          * update(query, update, false, false)}
 951  
          * </p>
 952  
          * 
 953  
          * @param query
 954  
          *            The query to find the document to update.
 955  
          * @param update
 956  
          *            The update operations to apply to the document.
 957  
          * @return This builder for chaining method calls.
 958  
          */
 959  
         public Builder update(final DocumentAssignable query,
 960  
                 final DocumentAssignable update) {
 961  100040
             return update(query, update, false, false);
 962  
         }
 963  
 
 964  
         /**
 965  
          * Update a document based on a query.
 966  
          * <p>
 967  
          * Defaults to updating a single document and not performing an upsert
 968  
          * if no document is found.
 969  
          * </p>
 970  
          * 
 971  
          * @param query
 972  
          *            The query to find the document to update.
 973  
          * @param update
 974  
          *            The update operations to apply to the document.
 975  
          * @param multiUpdate
 976  
          *            If true then the update is applied to all of the matching
 977  
          *            documents, otherwise only the first document found is
 978  
          *            updated.
 979  
          * @param upsert
 980  
          *            If true then if no document is found then a new document
 981  
          *            is created and updated, otherwise no operation is
 982  
          *            performed.
 983  
          * @return This builder for chaining method calls.
 984  
          */
 985  
         public Builder update(final DocumentAssignable query,
 986  
                 final DocumentAssignable update, final boolean multiUpdate,
 987  
                 final boolean upsert) {
 988  400083
             return write(new UpdateOperation(query, update, multiUpdate, upsert));
 989  
         }
 990  
 
 991  
         /**
 992  
          * Adds a single write to the list of writes to send to the server.
 993  
          * 
 994  
          * @param write
 995  
          *            The write to add to the list of writes to send to the
 996  
          *            server.
 997  
          * @return This builder for chaining method calls.
 998  
          */
 999  
         public Builder write(final WriteOperation write) {
 1000  900199
             myWrites.add(write);
 1001  900199
             return this;
 1002  
         }
 1003  
 
 1004  
         /**
 1005  
          * Sets the writes to submit to the server.
 1006  
          * <p>
 1007  
          * This method delegates to {@link #setWrites(List)}.
 1008  
          * </p>
 1009  
          * 
 1010  
          * @param writes
 1011  
          *            The new value for the writes to submit to the server.
 1012  
          * @return This builder for chaining method calls.
 1013  
          */
 1014  
         public Builder writes(final List<WriteOperation> writes) {
 1015  2
             return setWrites(writes);
 1016  
         }
 1017  
     }
 1018  
 
 1019  
     /**
 1020  
      * Bundle is a container for the write command and the
 1021  
      * {@link WriteOperation} it contains.
 1022  
      * 
 1023  
      * @api.yes This class is part of the driver's API. Public and protected
 1024  
      *          members will be deprecated for at least 1 non-bugfix release
 1025  
      *          (version numbers are &lt;major&gt;.&lt;minor&gt;.&lt;bugfix&gt;)
 1026  
      *          before being removed or modified.
 1027  
      */
 1028  
     public static final class Bundle {
 1029  
         /** The command containing the bundled write operations. */
 1030  
         private final Document myCommand;
 1031  
 
 1032  
         /** The writes that are bundled in the command. */
 1033  
         private final List<WriteOperation> myWrites;
 1034  
 
 1035  
         /**
 1036  
          * Creates a new Bundle.
 1037  
          * 
 1038  
          * @param command
 1039  
          *            The command containing the bundled write operations.
 1040  
          * @param writes
 1041  
          *            The writes that are bundled in the command.
 1042  
          */
 1043  
         protected Bundle(final Document command,
 1044  
                 final List<WriteOperation> writes) {
 1045  919
             super();
 1046  919
             myCommand = command;
 1047  919
             myWrites = Collections
 1048  
                     .unmodifiableList(new ArrayList<WriteOperation>(writes));
 1049  919
         }
 1050  
 
 1051  
         /**
 1052  
          * Returns the command containing the bundled write operations.
 1053  
          * 
 1054  
          * @return The command containing the bundled write operations.
 1055  
          */
 1056  
         public Document getCommand() {
 1057  113
             return myCommand;
 1058  
         }
 1059  
 
 1060  
         /**
 1061  
          * Returns the writes that are bundled in the command.
 1062  
          * 
 1063  
          * @return The writes that are bundled in the command.
 1064  
          */
 1065  
         public List<WriteOperation> getWrites() {
 1066  138
             return myWrites;
 1067  
         }
 1068  
     }
 1069  
 }