Coverage Report - com.allanbank.mongodb.client.callback.BatchedWriteCallback
 
Classes in this File Line Coverage Branch Coverage Complexity
BatchedWriteCallback
100%
158/158
92%
87/94
4.143
BatchedWriteCallback$BundleCallback
100%
9/9
N/A
4.143
 
 1  
 /*
 2  
  * #%L
 3  
  * BatchedWriteCallback.java - mongodb-async-driver - Allanbank Consulting, Inc.
 4  
  * %%
 5  
  * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
 6  
  * %%
 7  
  * Licensed under the Apache License, Version 2.0 (the "License");
 8  
  * you may not use this file except in compliance with the License.
 9  
  * You may obtain a copy of the License at
 10  
  * 
 11  
  *      http://www.apache.org/licenses/LICENSE-2.0
 12  
  * 
 13  
  * Unless required by applicable law or agreed to in writing, software
 14  
  * distributed under the License is distributed on an "AS IS" BASIS,
 15  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 16  
  * See the License for the specific language governing permissions and
 17  
  * limitations under the License.
 18  
  * #L%
 19  
  */
 20  
 
 21  
 package com.allanbank.mongodb.client.callback;
 22  
 
 23  
 import java.util.ArrayList;
 24  
 import java.util.Collections;
 25  
 import java.util.IdentityHashMap;
 26  
 import java.util.LinkedList;
 27  
 import java.util.List;
 28  
 import java.util.Map;
 29  
 import java.util.Set;
 30  
 
 31  
 import com.allanbank.mongodb.Callback;
 32  
 import com.allanbank.mongodb.Durability;
 33  
 import com.allanbank.mongodb.MongoDbException;
 34  
 import com.allanbank.mongodb.bson.Document;
 35  
 import com.allanbank.mongodb.bson.Element;
 36  
 import com.allanbank.mongodb.bson.NumericElement;
 37  
 import com.allanbank.mongodb.bson.builder.BuilderFactory;
 38  
 import com.allanbank.mongodb.bson.element.ArrayElement;
 39  
 import com.allanbank.mongodb.bson.element.DocumentElement;
 40  
 import com.allanbank.mongodb.builder.BatchedWrite;
 41  
 import com.allanbank.mongodb.builder.BatchedWrite.Bundle;
 42  
 import com.allanbank.mongodb.builder.BatchedWriteMode;
 43  
 import com.allanbank.mongodb.builder.write.WriteOperation;
 44  
 import com.allanbank.mongodb.client.Client;
 45  
 import com.allanbank.mongodb.client.message.BatchedWriteCommand;
 46  
 import com.allanbank.mongodb.client.message.Command;
 47  
 import com.allanbank.mongodb.client.message.Reply;
 48  
 import com.allanbank.mongodb.error.BatchedWriteException;
 49  
 import com.allanbank.mongodb.util.Assertions;
 50  
 
 51  
 /**
 52  
  * BatchedWriteCallback provides the global callback for the batched writes.
 53  
  * 
 54  
  * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
 55  
  *         mutated in incompatible ways between any two releases of the driver.
 56  
  * @copyright 2014, Allanbank Consulting, Inc., All Rights Reserved
 57  
  */
 58  
 public class BatchedWriteCallback extends ReplyLongCallback {
 59  
 
 60  
     /** The list of bundles to send. */
 61  
     private final List<BatchedWrite.Bundle> myBundles;
 62  
 
 63  
     /** The client to send messages with. */
 64  
     private Client myClient;
 65  
 
 66  
     /** The name of the collection. */
 67  
     private final String myCollectionName;
 68  
 
 69  
     /** The name of the database. */
 70  
     private final String myDatabaseName;
 71  
 
 72  
     /** The list of write operations which failed. */
 73  
     private final Map<WriteOperation, Throwable> myFailedOperations;
 74  
 
 75  
     /** The count of finished bundles or operations. */
 76  
     private int myFinished;
 77  
 
 78  
     /** The result. */
 79  32
     private long myN = 0;
 80  
 
 81  
     /** The list of bundles waiting to be sent to the server. */
 82  
     private final List<BatchedWrite.Bundle> myPendingBundles;
 83  
 
 84  
     /** The real callback for each operation. */
 85  
     private final List<Callback<Reply>> myRealCallbacks;
 86  
 
 87  
     /** The list of write operations which have been skipped due to an error. */
 88  
     private List<WriteOperation> mySkippedOperations;
 89  
 
 90  
     /** The original write operation. */
 91  
     private final BatchedWrite myWrite;
 92  
 
 93  
     /**
 94  
      * Creates a new BatchedWriteCallback.
 95  
      * 
 96  
      * @param databaseName
 97  
      *            The name of the database.
 98  
      * @param collectionName
 99  
      *            The name of the collection.
 100  
      * @param results
 101  
      *            The callback for the final results.
 102  
      * @param write
 103  
      *            The original write.
 104  
      * @param client
 105  
      *            The client for sending the bundled write commands.
 106  
      * @param bundles
 107  
      *            The bundled writes.
 108  
      */
 109  
     public BatchedWriteCallback(final String databaseName,
 110  
             final String collectionName, final Callback<Long> results,
 111  
             final BatchedWrite write, final Client client,
 112  
             final List<BatchedWrite.Bundle> bundles) {
 113  18
         super(results);
 114  
 
 115  18
         myDatabaseName = databaseName;
 116  18
         myCollectionName = collectionName;
 117  18
         myWrite = write;
 118  18
         myClient = client;
 119  18
         myBundles = Collections
 120  
                 .unmodifiableList(new ArrayList<BatchedWrite.Bundle>(bundles));
 121  
 
 122  18
         myPendingBundles = new LinkedList<BatchedWrite.Bundle>(myBundles);
 123  
 
 124  18
         myFinished = 0;
 125  18
         myN = 0;
 126  
 
 127  18
         myFailedOperations = new IdentityHashMap<WriteOperation, Throwable>();
 128  18
         mySkippedOperations = null;
 129  
 
 130  18
         myRealCallbacks = Collections.emptyList();
 131  18
     }
 132  
 
 133  
     /**
 134  
      * Creates a new BatchedWriteCallback.
 135  
      * 
 136  
      * @param databaseName
 137  
      *            The name of the database.
 138  
      * @param collectionName
 139  
      *            The name of the collection.
 140  
      * @param realCallbacks
 141  
      *            The list of callbacks. One for each write.
 142  
      * @param write
 143  
      *            The original write.
 144  
      * @param bundles
 145  
      *            The bundled writes.
 146  
      */
 147  
     public BatchedWriteCallback(final String databaseName,
 148  
             final String collectionName,
 149  
             final List<Callback<Reply>> realCallbacks,
 150  
             final BatchedWrite write, final List<Bundle> bundles) {
 151  14
         super(null);
 152  
 
 153  14
         myDatabaseName = databaseName;
 154  14
         myCollectionName = collectionName;
 155  14
         myWrite = write;
 156  14
         myClient = null;
 157  14
         myBundles = Collections
 158  
                 .unmodifiableList(new ArrayList<BatchedWrite.Bundle>(bundles));
 159  
 
 160  14
         myPendingBundles = new LinkedList<BatchedWrite.Bundle>(myBundles);
 161  
 
 162  14
         myFinished = 0;
 163  14
         myN = 0;
 164  
 
 165  14
         myFailedOperations = new IdentityHashMap<WriteOperation, Throwable>();
 166  14
         mySkippedOperations = null;
 167  
 
 168  14
         myRealCallbacks = new ArrayList<Callback<Reply>>(realCallbacks);
 169  
 
 170  14
         int count = 0;
 171  14
         for (final Bundle b : myBundles) {
 172  30
             count += b.getWrites().size();
 173  30
         }
 174  14
         Assertions.assertThat(
 175  
                 myRealCallbacks.size() == count,
 176  
                 "There nust be an operation (" + count
 177  
                         + ") in a bundle for each callback ("
 178  
                         + myRealCallbacks.size() + ").");
 179  13
     }
 180  
 
 181  
     /**
 182  
      * Sends the next set of operations to the server.
 183  
      */
 184  
     public void send() {
 185  
 
 186  
         List<BatchedWrite.Bundle> toSendBundles;
 187  39
         synchronized (this) {
 188  39
             List<BatchedWrite.Bundle> toSend = myPendingBundles;
 189  39
             if (BatchedWriteMode.SERIALIZE_AND_STOP.equals(myWrite.getMode())) {
 190  14
                 toSend = myPendingBundles.subList(0, 1);
 191  
             }
 192  
 
 193  
             // Clear toSend before sending so the callbacks see the right
 194  
             // state for the bundles.
 195  39
             toSendBundles = new ArrayList<BatchedWrite.Bundle>(toSend);
 196  39
             toSend.clear();
 197  39
         } // Release lock.
 198  
 
 199  
         // Release the lock before sending to avoid deadlock in processing
 200  
         // replies.
 201  
 
 202  
         // Batches....
 203  39
         for (final BatchedWrite.Bundle bundle : toSendBundles) {
 204  68
             final Command commandMsg = new BatchedWriteCommand(myDatabaseName,
 205  
                     myCollectionName, bundle);
 206  
 
 207  
             // Our documents may be bigger than normally allowed...
 208  68
             commandMsg.setAllowJumbo(true);
 209  
 
 210  68
             if (myWrite.getDurability() == Durability.NONE) {
 211  
                 // Fake reply.
 212  6
                 final Document doc = BuilderFactory.start().add("ok", 1)
 213  
                         .add("n", -1).build();
 214  6
                 final Reply reply = new Reply(0, 0, 0,
 215  
                         Collections.singletonList(doc), false, false, false,
 216  
                         false);
 217  
 
 218  6
                 myClient.send(commandMsg, NoOpCallback.NO_OP);
 219  6
                 publish(bundle, reply);
 220  6
             }
 221  
             else {
 222  62
                 myClient.send(commandMsg, new BundleCallback(bundle));
 223  
 
 224  
             }
 225  68
         }
 226  
 
 227  39
         if ((myWrite.getDurability() == Durability.NONE)
 228  
                 && myPendingBundles.isEmpty() && (myForwardCallback != null)) {
 229  1
             myForwardCallback.callback(-1L);
 230  
         }
 231  39
     }
 232  
 
 233  
     /**
 234  
      * Sets the client to use to send the bundled writes.
 235  
      * 
 236  
      * @param client
 237  
      *            The new client for the batch.
 238  
      */
 239  
     public void setClient(final Client client) {
 240  13
         myClient = client;
 241  13
     }
 242  
 
 243  
     /**
 244  
      * Callback for a bundle of write operations sent via the write commands.
 245  
      * 
 246  
      * @param bundle
 247  
      *            The bundle of write operations.
 248  
      * @param result
 249  
      *            The result of the write operations.
 250  
      */
 251  
     protected synchronized void callback(final Bundle bundle, final Reply result) {
 252  60
         final MongoDbException error = asError(result);
 253  60
         if (error != null) {
 254  
             // Everything failed...
 255  4
             exception(bundle, error);
 256  
         }
 257  
         else {
 258  56
             myFinished += 1;
 259  56
             myN += convert(result).longValue();
 260  
 
 261  
             // Want to run both the durability and write failure so just | here.
 262  56
             final boolean failed = failedDurability(bundle, result)
 263  
                     | failedWrites(bundle, result);
 264  
 
 265  56
             publish(bundle, result);
 266  
 
 267  56
             if (failed) {
 268  4
                 publishResults();
 269  
             }
 270  52
             else if (!myPendingBundles.isEmpty()) {
 271  7
                 send();
 272  
             }
 273  45
             else if (myFinished == myBundles.size()) {
 274  21
                 publishResults();
 275  
             }
 276  
         }
 277  60
     }
 278  
 
 279  
     /**
 280  
      * Callback for a bundle of write operations sent via the write commands has
 281  
      * failed.
 282  
      * 
 283  
      * @param bundle
 284  
      *            The bundle of write operations.
 285  
      * @param thrown
 286  
      *            The error for the operations.
 287  
      */
 288  
     protected synchronized void exception(final Bundle bundle,
 289  
             final Throwable thrown) {
 290  5
         myFinished += 1;
 291  5
         for (final WriteOperation operation : bundle.getWrites()) {
 292  5
             myFailedOperations.put(operation, thrown);
 293  5
         }
 294  
 
 295  
         // No need to check if we have to send. Would have already sent all of
 296  
         // the operations if not SERIALIZE_AND_STOP.
 297  5
         if (myWrite.getMode() == BatchedWriteMode.SERIALIZE_AND_STOP) {
 298  1
             publishResults();
 299  
         }
 300  4
         else if (myFinished == myBundles.size()) {
 301  2
             publishResults();
 302  
         }
 303  5
     }
 304  
 
 305  
     /**
 306  
      * Checks for a failure in the durability requirements (e.g., did not
 307  
      * replicate to sufficient servers within the timeout) and updates the
 308  
      * failed operations map if any are found.
 309  
      * 
 310  
      * @param bundle
 311  
      *            The bundle for the reply.
 312  
      * @param reply
 313  
      *            The reply from the server.
 314  
      * @return True if there are failed writes and we should not send any
 315  
      *         additional requests.
 316  
      */
 317  
     private boolean failedDurability(final Bundle bundle, final Reply reply) {
 318  56
         final List<Document> results = reply.getResults();
 319  56
         if (results.size() == 1) {
 320  56
             final Document doc = results.get(0);
 321  56
             final DocumentElement error = doc.get(DocumentElement.class,
 322  
                     "writeConcernError");
 323  56
             if (error != null) {
 324  2
                 final int code = toInt(error.get(NumericElement.class, "code"));
 325  2
                 final String errmsg = asString(error.get(Element.class,
 326  
                         "errmsg"));
 327  2
                 final MongoDbException exception = asError(reply, 0, code,
 328  
                         true, errmsg, null);
 329  2
                 for (final WriteOperation op : bundle.getWrites()) {
 330  2
                     myFailedOperations.put(op, exception);
 331  2
                 }
 332  
             }
 333  
         }
 334  
 
 335  56
         return (myWrite.getMode() == BatchedWriteMode.SERIALIZE_AND_STOP)
 336  
                 && !myFailedOperations.isEmpty();
 337  
     }
 338  
 
 339  
     /**
 340  
      * Checks for individual {@code writeErrors} and updates the failed
 341  
      * operations map if any are found.
 342  
      * 
 343  
      * @param bundle
 344  
      *            The bundle for the reply.
 345  
      * @param reply
 346  
      *            The reply from the server.
 347  
      * @return True if there are failed writes and we should not send any
 348  
      *         additional requests.
 349  
      */
 350  
     private boolean failedWrites(final Bundle bundle, final Reply reply) {
 351  56
         final List<Document> results = reply.getResults();
 352  56
         if (results.size() == 1) {
 353  56
             final Document doc = results.get(0);
 354  56
             final ArrayElement errors = doc.get(ArrayElement.class,
 355  
                     "writeErrors");
 356  56
             if (errors != null) {
 357  8
                 final List<WriteOperation> operations = bundle.getWrites();
 358  8
                 for (final DocumentElement error : errors.find(
 359  
                         DocumentElement.class, ".*")) {
 360  8
                     final int index = toInt(error.get(NumericElement.class,
 361  
                             "index"));
 362  8
                     final int code = toInt(error.get(NumericElement.class,
 363  
                             "code"));
 364  8
                     final String errmsg = asString(error.get(Element.class,
 365  
                             "errmsg"));
 366  
 
 367  8
                     if ((0 <= index) && (index < operations.size())) {
 368  6
                         final WriteOperation op = operations.get(index);
 369  
 
 370  6
                         myFailedOperations.put(op,
 371  
                                 asError(reply, 0, code, false, errmsg, null));
 372  
 
 373  6
                         if (myWrite.getMode() == BatchedWriteMode.SERIALIZE_AND_STOP) {
 374  3
                             mySkippedOperations = new ArrayList<WriteOperation>();
 375  3
                             mySkippedOperations.addAll(operations.subList(
 376  
                                     index + 1, operations.size()));
 377  
                         }
 378  
                     }
 379  8
                 }
 380  
             }
 381  
         }
 382  
 
 383  56
         return (myWrite.getMode() == BatchedWriteMode.SERIALIZE_AND_STOP)
 384  
                 && !myFailedOperations.isEmpty();
 385  
     }
 386  
 
 387  
     /**
 388  
      * Publishes the results for an individual bundle.
 389  
      * 
 390  
      * @param bundle
 391  
      *            The bundle that we received the results for.
 392  
      * @param reply
 393  
      *            The received reply.
 394  
      */
 395  
     private void publish(final Bundle bundle, final Reply reply) {
 396  62
         if (myForwardCallback == null) {
 397  
             // Publish to each callback.
 398  26
             int index = 0;
 399  26
             for (final Bundle b : myBundles) {
 400  44
                 final List<WriteOperation> writes = b.getWrites();
 401  44
                 final int count = writes.size();
 402  
 
 403  
                 // Bundles can compare logically the same but still be
 404  
                 // different.
 405  44
                 if (b == bundle) {
 406  60
                     for (int i = 0; i < count; ++i) {
 407  
                         // Replace the callback to avoid double calls.
 408  34
                         final Throwable t = myFailedOperations.get(writes
 409  
                                 .get(i));
 410  34
                         final Callback<Reply> cb = myRealCallbacks.set(index
 411  
                                 + i, NoOpCallback.NO_OP);
 412  34
                         if (cb != null) {
 413  32
                             if (t == null) {
 414  
                                 // Worked
 415  29
                                 cb.callback(reply);
 416  
                             }
 417  
                             else {
 418  3
                                 cb.exception(t);
 419  
                             }
 420  
                         }
 421  
                     }
 422  26
                     break; // for(Bundle)
 423  
                 }
 424  
 
 425  18
                 index += count;
 426  18
             }
 427  
         }
 428  62
     }
 429  
 
 430  
     /**
 431  
      * Publishes the final results.
 432  
      */
 433  
     private void publishResults() {
 434  28
         if (myFailedOperations.isEmpty()) {
 435  15
             if (myForwardCallback != null) {
 436  6
                 myForwardCallback.callback(Long.valueOf(myN));
 437  
             }
 438  
             // If there are no failures then all of the real call-backs have
 439  
             // already been triggered.
 440  
         }
 441  
         else {
 442  13
             if (mySkippedOperations == null) {
 443  10
                 mySkippedOperations = new ArrayList<WriteOperation>();
 444  
             }
 445  13
             for (final Bundle pending : myPendingBundles) {
 446  5
                 mySkippedOperations.addAll(pending.getWrites());
 447  5
             }
 448  
 
 449  13
             if (myForwardCallback != null) {
 450  
                 // If there is only 1 operation and it failed then just publish
 451  
                 // that error.
 452  10
                 if ((myBundles.size() == 1)
 453  
                         && (myBundles.get(0).getWrites().size() == 1)
 454  
                         && (myFailedOperations.size() == 1)) {
 455  1
                     myForwardCallback.exception(myFailedOperations.values()
 456  
                             .iterator().next());
 457  
                 }
 458  
                 else {
 459  9
                     myForwardCallback.exception(new BatchedWriteException(
 460  
                             myWrite, myN, mySkippedOperations,
 461  
                             myFailedOperations));
 462  
                 }
 463  
             }
 464  
             else {
 465  
                 // Publish to each callback.
 466  3
                 final List<WriteOperation> emptySkipped = Collections
 467  
                         .emptyList();
 468  3
                 final Map<WriteOperation, Throwable> emptyError = Collections
 469  
                         .emptyMap();
 470  
 
 471  
                 // For fast lookup and lookup by identity.
 472  3
                 final Set<WriteOperation> skipped = Collections
 473  
                         .newSetFromMap(new IdentityHashMap<WriteOperation, Boolean>());
 474  3
                 skipped.addAll(mySkippedOperations);
 475  
 
 476  3
                 final Document doc = BuilderFactory.start().add("ok", 1)
 477  
                         .add("n", myN).build();
 478  3
                 final Reply reply = new Reply(0, 0, 0,
 479  
                         Collections.singletonList(doc), false, false, false,
 480  
                         false);
 481  
 
 482  3
                 int index = 0;
 483  3
                 for (final Bundle b : myBundles) {
 484  7
                     for (final WriteOperation op : b.getWrites()) {
 485  7
                         final Callback<Reply> cb = myRealCallbacks.get(index);
 486  
 
 487  7
                         if (cb != null) {
 488  
                             // Did this write fail?
 489  7
                             final Throwable thrown = myFailedOperations.get(op);
 490  7
                             if (thrown != null) {
 491  3
                                 cb.exception(new BatchedWriteException(myWrite,
 492  
                                         myN, emptySkipped, Collections
 493  
                                                 .singletonMap(op, thrown)));
 494  
                             }
 495  4
                             else if (skipped.contains(op)) {
 496  
                                 // Skipped the write.
 497  1
                                 cb.exception(new BatchedWriteException(myWrite,
 498  
                                         myN, Collections.singletonList(op),
 499  
                                         emptyError));
 500  
                             }
 501  
                             else {
 502  
                                 // Worked
 503  3
                                 cb.callback(reply);
 504  
                             }
 505  
                         }
 506  
 
 507  
                         // Next...
 508  7
                         index += 1;
 509  7
                     }
 510  7
                 }
 511  
             }
 512  
         }
 513  28
     }
 514  
 
 515  
     /**
 516  
      * BundleCallback provides the callback for a single batched write.
 517  
      * 
 518  
      * @api.no This class is <b>NOT</b> part of the drivers API. This class may
 519  
      *         be mutated in incompatible ways between any two releases of the
 520  
      *         driver.
 521  
      * @copyright 2014, Allanbank Consulting, Inc., All Rights Reserved
 522  
      */
 523  57
     /* package */class BundleCallback implements ReplyCallback {
 524  
 
 525  
         /**
 526  
          * The bundle of operations this callback is waiting for the reply from.
 527  
          */
 528  
         private final Bundle myBundle;
 529  
 
 530  
         /**
 531  
          * Creates a new BatchedWriteBundleCallback.
 532  
          * 
 533  
          * @param bundle
 534  
          *            The bundle of operations this callback is waiting for the
 535  
          *            reply from.
 536  
          */
 537  62
         public BundleCallback(final Bundle bundle) {
 538  62
             myBundle = bundle;
 539  62
         }
 540  
 
 541  
         /**
 542  
          * {@inheritDoc}
 543  
          * <p>
 544  
          * Overridden to forward the results to the parent callback.
 545  
          * </p>
 546  
          */
 547  
         @Override
 548  
         public void callback(final Reply result) {
 549  57
             BatchedWriteCallback.this.callback(myBundle, result);
 550  57
         }
 551  
 
 552  
         /**
 553  
          * {@inheritDoc}
 554  
          * <p>
 555  
          * Overridden to forward the error to the parent callback.
 556  
          * </p>
 557  
          */
 558  
         @Override
 559  
         public void exception(final Throwable thrown) {
 560  1
             BatchedWriteCallback.this.exception(myBundle, thrown);
 561  1
         }
 562  
 
 563  
         /**
 564  
          * {@inheritDoc}
 565  
          * <p>
 566  
          * Overridden to return false.
 567  
          * </p>
 568  
          */
 569  
         @Override
 570  
         public boolean isLightWeight() {
 571  1
             return false;
 572  
         }
 573  
     }
 574  
 }