Coverage Report - com.allanbank.mongodb.client.callback.BatchedWriteCallback
Classes in this File Line Coverage Branch Coverage Complexity
  * #%L
  * - mongodb-async-driver - Allanbank Consulting, Inc.
  * %%
  * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
  * %%
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
  *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  * #L%
 package com.allanbank.mongodb.client.callback;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.IdentityHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import com.allanbank.mongodb.Callback;
 import com.allanbank.mongodb.Durability;
 import com.allanbank.mongodb.MongoDbException;
 import com.allanbank.mongodb.bson.Document;
 import com.allanbank.mongodb.bson.Element;
 import com.allanbank.mongodb.bson.NumericElement;
 import com.allanbank.mongodb.bson.builder.BuilderFactory;
 import com.allanbank.mongodb.bson.element.ArrayElement;
 import com.allanbank.mongodb.bson.element.DocumentElement;
 import com.allanbank.mongodb.builder.BatchedWrite;
 import com.allanbank.mongodb.builder.BatchedWrite.Bundle;
 import com.allanbank.mongodb.builder.BatchedWriteMode;
 import com.allanbank.mongodb.builder.write.WriteOperation;
 import com.allanbank.mongodb.client.Client;
 import com.allanbank.mongodb.client.message.BatchedWriteCommand;
 import com.allanbank.mongodb.client.message.Command;
 import com.allanbank.mongodb.client.message.Reply;
 import com.allanbank.mongodb.error.BatchedWriteException;
 import com.allanbank.mongodb.util.Assertions;
  * BatchedWriteCallback provides the global callback for the batched writes.
  * This class is <b>NOT</b> part of the driver's API. This class may be
  *         changed in incompatible ways between any two releases of the driver.
  * @copyright 2014, Allanbank Consulting, Inc., All Rights Reserved
 public class BatchedWriteCallback extends ReplyLongCallback {
     /** The list of bundles to send. */
     private final List<BatchedWrite.Bundle> myBundles;
     /** The client to send messages with. */
     private Client myClient;
     /** The name of the collection. */
     private final String myCollectionName;
     /** The name of the database. */
     private final String myDatabaseName;
     /** The write operations which failed, each mapped (by identity) to its error. */
     private final Map<WriteOperation, Throwable> myFailedOperations;
     /** The count of bundles which have finished, either successfully or with an error. */
     private int myFinished;
     /** The result: the running sum of the counts converted from each bundle's reply. */
 79  32
     private long myN = 0;
     /** The list of bundles waiting to be sent to the server. */
     private final List<BatchedWrite.Bundle> myPendingBundles;
     /** The real callback for each operation. */
     private final List<Callback<Reply>> myRealCallbacks;
     /** The list of write operations which have been skipped due to an error. */
     private List<WriteOperation> mySkippedOperations;
     /** The original write operation. */
     private final BatchedWrite myWrite;
      * Creates a new BatchedWriteCallback.
      * @param databaseName
      *            The name of the database.
      * @param collectionName
      *            The name of the collection.
      * @param results
      *            The callback for the final results.
      * @param write
      *            The original write.
      * @param client
      *            The client for sending the bundled write commands.
      * @param bundles
      *            The bundled writes.
     public BatchedWriteCallback(final String databaseName,
             final String collectionName, final Callback<Long> results,
             final BatchedWrite write, final Client client,
             final List<BatchedWrite.Bundle> bundles) {
 113  18
 115  18
         myDatabaseName = databaseName;
 116  18
         myCollectionName = collectionName;
 117  18
         myWrite = write;
 118  18
         myClient = client;
 119  18
         myBundles = Collections
                 .unmodifiableList(new ArrayList<BatchedWrite.Bundle>(bundles));
 122  18
         myPendingBundles = new LinkedList<BatchedWrite.Bundle>(myBundles);
 124  18
         myFinished = 0;
 125  18
         myN = 0;
 127  18
         myFailedOperations = new IdentityHashMap<WriteOperation, Throwable>();
 128  18
         mySkippedOperations = null;
 130  18
         myRealCallbacks = Collections.emptyList();
 131  18
      * Creates a new BatchedWriteCallback.
      * @param databaseName
      *            The name of the database.
      * @param collectionName
      *            The name of the collection.
      * @param realCallbacks
      *            The list of callbacks. One for each write.
      * @param write
      *            The original write.
      * @param bundles
      *            The bundled writes.
     public BatchedWriteCallback(final String databaseName,
             final String collectionName,
             final List<Callback<Reply>> realCallbacks,
             final BatchedWrite write, final List<Bundle> bundles) {
 151  14
 153  14
         myDatabaseName = databaseName;
 154  14
         myCollectionName = collectionName;
 155  14
         myWrite = write;
 156  14
         myClient = null;
 157  14
         myBundles = Collections
                 .unmodifiableList(new ArrayList<BatchedWrite.Bundle>(bundles));
 160  14
         myPendingBundles = new LinkedList<BatchedWrite.Bundle>(myBundles);
 162  14
         myFinished = 0;
 163  14
         myN = 0;
 165  14
         myFailedOperations = new IdentityHashMap<WriteOperation, Throwable>();
 166  14
         mySkippedOperations = null;
 168  14
         myRealCallbacks = new ArrayList<Callback<Reply>>(realCallbacks);
 170  14
         int count = 0;
 171  14
         for (final Bundle b : myBundles) {
 172  30
             count += b.getWrites().size();
 173  30
 174  14
                 myRealCallbacks.size() == count,
                 "There nust be an operation (" + count
                         + ") in a bundle for each callback ("
                         + myRealCallbacks.size() + ").");
 179  13
      * Sends the next set of operations to the server.
     public void send() {
         List<BatchedWrite.Bundle> toSendBundles;
 187  39
         synchronized (this) {
 188  39
             List<BatchedWrite.Bundle> toSend = myPendingBundles;
 189  39
             if (BatchedWriteMode.SERIALIZE_AND_STOP.equals(myWrite.getMode())) {
 190  14
                 toSend = myPendingBundles.subList(0, 1);
             // Clear toSend before sending so the callbacks see the right
             // state for the bundles.
 195  39
             toSendBundles = new ArrayList<BatchedWrite.Bundle>(toSend);
 196  39
 197  39
         } // Release lock.
         // Release the lock before sending to avoid deadlock in processing
         // replies.
         // Batches....
 203  39
         for (final BatchedWrite.Bundle bundle : toSendBundles) {
 204  68
             final Command commandMsg = new BatchedWriteCommand(myDatabaseName,
                     myCollectionName, bundle);
             // Our documents may be bigger than normally allowed...
 208  68
 210  68
             if (myWrite.getDurability() == Durability.NONE) {
                 // Fake reply.
 212  6
                 final Document doc = BuilderFactory.start().add("ok", 1)
                         .add("n", -1).build();
 214  6
                 final Reply reply = new Reply(0, 0, 0,
                         Collections.singletonList(doc), false, false, false,
 218  6
                 myClient.send(commandMsg, NoOpCallback.NO_OP);
 219  6
                 publish(bundle, reply);
 220  6
             else {
 222  62
                 myClient.send(commandMsg, new BundleCallback(bundle));
 225  68
 227  39
         if ((myWrite.getDurability() == Durability.NONE)
                 && myPendingBundles.isEmpty() && (myForwardCallback != null)) {
 229  1
 231  39
      * Sets the client to use to send the bundled writes.
      * @param client
      *            The new client for the batch.
     public void setClient(final Client client) {
 240  13
         myClient = client;
 241  13
      * Callback for a bundle of write operations sent via the write commands.
      * @param bundle
      *            The bundle of write operations.
      * @param result
      *            The result of the write operations.
     protected synchronized void callback(final Bundle bundle, final Reply result) {
 252  60
         final MongoDbException error = asError(result);
 253  60
         if (error != null) {
             // Everything failed...
 255  4
             exception(bundle, error);
         else {
 258  56
             myFinished += 1;
 259  56
             myN += convert(result).longValue();
             // Both the durability and the write failure checks must run since
             // each records its failures, so use the non-short-circuit | here.
 262  56
             final boolean failed = failedDurability(bundle, result)
                     | failedWrites(bundle, result);
 265  56
             publish(bundle, result);
 267  56
             if (failed) {
 268  4
 270  52
             else if (!myPendingBundles.isEmpty()) {
 271  7
 273  45
             else if (myFinished == myBundles.size()) {
 274  21
 277  60
      * Callback for a bundle of write operations sent via the write commands has
      * failed.
      * @param bundle
      *            The bundle of write operations.
      * @param thrown
      *            The error for the operations.
     protected synchronized void exception(final Bundle bundle,
             final Throwable thrown) {
 290  5
         myFinished += 1;
 291  5
         for (final WriteOperation operation : bundle.getWrites()) {
 292  5
             myFailedOperations.put(operation, thrown);
 293  5
         // No need to check if we have to send. Would have already sent all of
         // the operations if not SERIALIZE_AND_STOP.
 297  5
         if (myWrite.getMode() == BatchedWriteMode.SERIALIZE_AND_STOP) {
 298  1
 300  4
         else if (myFinished == myBundles.size()) {
 301  2
 303  5
      * Checks for a failure in the durability requirements (e.g., did not
      * replicate to sufficient servers within the timeout) and updates the
      * failed operations map if any are found.
      * @param bundle
      *            The bundle for the reply.
      * @param reply
      *            The reply from the server.
      * @return True if the write mode is SERIALIZE_AND_STOP and at least one
      *         operation has failed, i.e., we should not send any additional
      *         requests.
     private boolean failedDurability(final Bundle bundle, final Reply reply) {
 318  56
         final List<Document> results = reply.getResults();
 319  56
         if (results.size() == 1) {
 320  56
             final Document doc = results.get(0);
 321  56
             final DocumentElement error = doc.get(DocumentElement.class,
 323  56
             if (error != null) {
 324  2
                 final int code = toInt(error.get(NumericElement.class, "code"));
 325  2
                 final String errmsg = asString(error.get(Element.class,
 327  2
                 final MongoDbException exception = asError(reply, 0, code,
                         true, errmsg, null);
 329  2
                 for (final WriteOperation op : bundle.getWrites()) {
 330  2
                     myFailedOperations.put(op, exception);
 331  2
 335  56
         return (myWrite.getMode() == BatchedWriteMode.SERIALIZE_AND_STOP)
                 && !myFailedOperations.isEmpty();
      * Checks for individual {@code writeErrors} and updates the failed
      * operations map if any are found.
      * @param bundle
      *            The bundle for the reply.
      * @param reply
      *            The reply from the server.
      * @return True if the write mode is SERIALIZE_AND_STOP and at least one
      *         operation has failed, i.e., we should not send any additional
      *         requests.
     private boolean failedWrites(final Bundle bundle, final Reply reply) {
 351  56
         final List<Document> results = reply.getResults();
 352  56
         if (results.size() == 1) {
 353  56
             final Document doc = results.get(0);
 354  56
             final ArrayElement errors = doc.get(ArrayElement.class,
 356  56
             if (errors != null) {
 357  8
                 final List<WriteOperation> operations = bundle.getWrites();
 358  8
                 for (final DocumentElement error : errors.find(
                         DocumentElement.class, ".*")) {
 360  8
                     final int index = toInt(error.get(NumericElement.class,
 362  8
                     final int code = toInt(error.get(NumericElement.class,
 364  8
                     final String errmsg = asString(error.get(Element.class,
 367  8
                     if ((0 <= index) && (index < operations.size())) {
 368  6
                         final WriteOperation op = operations.get(index);
 370  6
                                 asError(reply, 0, code, false, errmsg, null));
 373  6
                         if (myWrite.getMode() == BatchedWriteMode.SERIALIZE_AND_STOP) {
 374  3
                             mySkippedOperations = new ArrayList<WriteOperation>();
 375  3
                                     index + 1, operations.size()));
 379  8
 383  56
         return (myWrite.getMode() == BatchedWriteMode.SERIALIZE_AND_STOP)
                 && !myFailedOperations.isEmpty();
      * Publishes the results for an individual bundle.
      * @param bundle
      *            The bundle that we received the results for.
      * @param reply
      *            The received reply.
     private void publish(final Bundle bundle, final Reply reply) {
 396  62
         if (myForwardCallback == null) {
             // Publish to each callback.
 398  26
             int index = 0;
 399  26
             for (final Bundle b : myBundles) {
 400  44
                 final List<WriteOperation> writes = b.getWrites();
 401  44
                 final int count = writes.size();
                 // Two bundles can compare as logically equal but still be
                 // different instances, so match the bundle by identity.
 405  44
                 if (b == bundle) {
 406  60
                     for (int i = 0; i < count; ++i) {
                         // Replace the callback to avoid double calls.
 408  34
                         final Throwable t = myFailedOperations.get(writes
 410  34
                         final Callback<Reply> cb = myRealCallbacks.set(index
                                 + i, NoOpCallback.NO_OP);
 412  34
                         if (cb != null) {
 413  32
                             if (t == null) {
                                 // Worked
 415  29
                             else {
 418  3
 422  26
                     break; // for(Bundle)
 425  18
                 index += count;
 426  18
 428  62
      * Publishes the final results.
     private void publishResults() {
 434  28
         if (myFailedOperations.isEmpty()) {
 435  15
             if (myForwardCallback != null) {
 436  6
             // If there are no failures then all of the real call-backs have
             // already been triggered.
         else {
 442  13
             if (mySkippedOperations == null) {
 443  10
                 mySkippedOperations = new ArrayList<WriteOperation>();
 445  13
             for (final Bundle pending : myPendingBundles) {
 446  5
 447  5
 449  13
             if (myForwardCallback != null) {
                 // If there is only 1 operation and it failed then just publish
                 // that error.
 452  10
                 if ((myBundles.size() == 1)
                         && (myBundles.get(0).getWrites().size() == 1)
                         && (myFailedOperations.size() == 1)) {
 455  1
                 else {
 459  9
                     myForwardCallback.exception(new BatchedWriteException(
                             myWrite, myN, mySkippedOperations,
             else {
                 // Publish to each callback.
 466  3
                 final List<WriteOperation> emptySkipped = Collections
 468  3
                 final Map<WriteOperation, Throwable> emptyError = Collections
                 // For fast lookup and lookup by identity.
 472  3
                 final Set<WriteOperation> skipped = Collections
                         .newSetFromMap(new IdentityHashMap<WriteOperation, Boolean>());
 474  3
 476  3
                 final Document doc = BuilderFactory.start().add("ok", 1)
                         .add("n", myN).build();
 478  3
                 final Reply reply = new Reply(0, 0, 0,
                         Collections.singletonList(doc), false, false, false,
 482  3
                 int index = 0;
 483  3
                 for (final Bundle b : myBundles) {
 484  7
                     for (final WriteOperation op : b.getWrites()) {
 485  7
                         final Callback<Reply> cb = myRealCallbacks.get(index);
 487  7
                         if (cb != null) {
                             // Did this write fail?
 489  7
                             final Throwable thrown = myFailedOperations.get(op);
 490  7
                             if (thrown != null) {
 491  3
                                 cb.exception(new BatchedWriteException(myWrite,
                                         myN, emptySkipped, Collections
                                                 .singletonMap(op, thrown)));
 495  4
                             else if (skipped.contains(op)) {
                                 // Skipped the write.
 497  1
                                 cb.exception(new BatchedWriteException(myWrite,
                                         myN, Collections.singletonList(op),
                             else {
                                 // Worked
 503  3
                         // Next...
 508  7
                         index += 1;
 509  7
 510  7
 513  28
      * BundleCallback provides the callback for a single batched write.
      * This class is <b>NOT</b> part of the driver's API. This class may
      *         be changed in incompatible ways between any two releases of the
      *         driver.
      * @copyright 2014, Allanbank Consulting, Inc., All Rights Reserved
 523  57
     /* package */class BundleCallback implements ReplyCallback {
          * The bundle of operations this callback is waiting for the reply from.
         private final Bundle myBundle;
          * Creates a new BundleCallback.
          * @param bundle
          *            The bundle of operations this callback is waiting for the
          *            reply from.
 537  62
         public BundleCallback(final Bundle bundle) {
 538  62
             myBundle = bundle;
 539  62
          * {@inheritDoc}
          * <p>
          * Overridden to forward the results to the parent callback.
          * </p>
         public void callback(final Reply result) {
 549  57
             BatchedWriteCallback.this.callback(myBundle, result);
 550  57
          * {@inheritDoc}
          * <p>
          * Overridden to forward the error to the parent callback.
          * </p>
         public void exception(final Throwable thrown) {
 560  1
             BatchedWriteCallback.this.exception(myBundle, thrown);
 561  1
          * {@inheritDoc}
          * <p>
          * Overridden to return false.
          * </p>
         public boolean isLightWeight() {
 571  1
             return false;