Coverage Report - com.allanbank.mongodb.client.callback.BatchedInsertCountingCallback
 
Classes in this File Line Coverage Branch Coverage Complexity
BatchedInsertCountingCallback
59%
26/44
25%
4/16
2.5
 
 1  
 /*
 2  
  * #%L
 3  
  * BatchedInsertCountingCallback.java - mongodb-async-driver - Allanbank Consulting, Inc.
 4  
  * %%
 5  
  * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
 6  
  * %%
 7  
  * Licensed under the Apache License, Version 2.0 (the "License");
 8  
  * you may not use this file except in compliance with the License.
 9  
  * You may obtain a copy of the License at
 10  
  * 
 11  
  *      http://www.apache.org/licenses/LICENSE-2.0
 12  
  * 
 13  
  * Unless required by applicable law or agreed to in writing, software
 14  
  * distributed under the License is distributed on an "AS IS" BASIS,
 15  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 16  
  * See the License for the specific language governing permissions and
 17  
  * limitations under the License.
 18  
  * #L%
 19  
  */
 20  
 
 21  
 package com.allanbank.mongodb.client.callback;
 22  
 
 23  
 import java.util.ArrayList;
 24  
 import java.util.Collections;
 25  
 import java.util.IdentityHashMap;
 26  
 import java.util.List;
 27  
 import java.util.Map;
 28  
 
 29  
 import com.allanbank.mongodb.Callback;
 30  
 import com.allanbank.mongodb.bson.Document;
 31  
 import com.allanbank.mongodb.bson.builder.BuilderFactory;
 32  
 import com.allanbank.mongodb.builder.BatchedWrite;
 33  
 import com.allanbank.mongodb.builder.write.WriteOperation;
 34  
 import com.allanbank.mongodb.client.message.Reply;
 35  
 import com.allanbank.mongodb.error.BatchedWriteException;
 36  
 
 37  
 /**
 38  
  * BatchedInsertCountingCallback is designed to work with the
 39  
  * {@link BatchedWriteCallback}. This callback can be used as the callback for a
 40  
  * series of individual writes and will coalesce the results into a single
 41  
  * result based on a an expected number of callbacks.
 42  
  * <p>
 43  
  * The class does not track the input {@code n} value and instead always returns
 44  
  * an N value based on the expected count. That limits the utility of this class
 45  
  * to inserts.
 46  
  * </p>
 47  
  * 
 48  
  * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
 49  
  *         mutated in incompatible ways between any two releases of the driver.
 50  
  * @copyright 2014, Allanbank Consulting, Inc., All Rights Reserved
 51  
  */
 52  7
 public class BatchedInsertCountingCallback implements Callback<Reply> {
 53  
 
 54  
     /** The count of the number of callbacks received so far. */
 55  7
     private int myCount = 0;
 56  
 
 57  
     /** The failed operations. */
 58  
     private Map<WriteOperation, Throwable> myErrors;
 59  
 
 60  
     /** The expected number of callbacks. */
 61  
     private final int myExpectedCount;
 62  
 
 63  
     /**
 64  
      * The count of the number of failure ({@link #exception(Throwable)})
 65  
      * callbacks received so far.
 66  
      */
 67  7
     private int myFailureCount = 0;
 68  
 
 69  
     /** The callback to notify with the final results once we receive them all. */
 70  
     private final Callback<Reply> myForwardCallback;
 71  
 
 72  
     /** The last batched write that failed. */
 73  
     private BatchedWrite myLastWrite;
 74  
 
 75  
     /** The skipped operations. */
 76  
     private List<WriteOperation> mySkipped;
 77  
 
 78  
     /**
 79  
      * Creates a new CountingCallback.
 80  
      * 
 81  
      * @param forwardCallback
 82  
      *            The callback to notify with the final results once we receive
 83  
      *            them all.
 84  
      * @param expectedCount
 85  
      *            The expected number of callbacks.
 86  
      */
 87  
     public BatchedInsertCountingCallback(final Callback<Reply> forwardCallback,
 88  7
             final int expectedCount) {
 89  7
         myForwardCallback = forwardCallback;
 90  7
         myExpectedCount = expectedCount;
 91  7
         myCount = 0;
 92  7
     }
 93  
 
 94  
     /**
 95  
      * {@inheritDoc}
 96  
      * <p>
 97  
      * Overridden to increment the count and when the max is reached forward the
 98  
      * final results.
 99  
      * </p>
 100  
      */
 101  
     @Override
 102  
     public void callback(final Reply result) {
 103  
         boolean publish;
 104  7
         synchronized (this) {
 105  7
             myCount += 1;
 106  7
             publish = (myCount == myExpectedCount);
 107  7
         }
 108  
 
 109  7
         if (publish) {
 110  7
             publish();
 111  
         }
 112  7
     }
 113  
 
 114  
     /**
 115  
      * {@inheritDoc}
 116  
      * <p>
 117  
      * Overridden to record the exception details.
 118  
      * </p>
 119  
      */
 120  
     @Override
 121  
     public void exception(final Throwable thrown) {
 122  
         boolean publish;
 123  0
         synchronized (this) {
 124  0
             myFailureCount += 1;
 125  0
             myCount += 1;
 126  0
             publish = (myCount == myExpectedCount);
 127  
 
 128  0
             if (mySkipped == null) {
 129  0
                 mySkipped = new ArrayList<WriteOperation>();
 130  0
                 myErrors = new IdentityHashMap<WriteOperation, Throwable>();
 131  
             }
 132  
 
 133  0
             if (thrown instanceof BatchedWriteException) {
 134  0
                 final BatchedWriteException errors = (BatchedWriteException) thrown;
 135  
 
 136  0
                 myLastWrite = errors.getWrite();
 137  0
                 mySkipped.addAll(errors.getSkipped());
 138  0
                 myErrors.putAll(errors.getErrors());
 139  
             }
 140  0
         }
 141  
 
 142  0
         if (publish) {
 143  0
             publish();
 144  
         }
 145  0
     }
 146  
 
 147  
     /**
 148  
      * Publishes the final results to {@link #myForwardCallback}.
 149  
      */
 150  
     private void publish() {
 151  7
         Reply reply = null;
 152  7
         BatchedWriteException error = null;
 153  7
         synchronized (this) {
 154  7
             if (myFailureCount == 0) {
 155  7
                 final Document doc = BuilderFactory.start().add("ok", 1)
 156  
                         .add("n", myExpectedCount).build();
 157  7
                 reply = new Reply(0, 0, 0, Collections.singletonList(doc),
 158  
                         false, false, false, false);
 159  7
             }
 160  
             else {
 161  0
                 error = new BatchedWriteException(myLastWrite,
 162  
                         (myExpectedCount - myFailureCount), mySkipped, myErrors);
 163  
             }
 164  7
         }
 165  
 
 166  
         // Reply outside the lock.
 167  7
         if (reply != null) {
 168  7
             myForwardCallback.callback(reply);
 169  
         }
 170  
         else {
 171  0
             myForwardCallback.exception(error);
 172  
         }
 173  7
     }
 174  
 
 175  
 }