Coverage Report - com.allanbank.mongodb.client.callback.BatchedNativeWriteCallback
 
Classes in this File Line Coverage Branch Coverage Complexity
BatchedNativeWriteCallback
100%
52/52
95%
21/22
2.5
BatchedNativeWriteCallback$1
100%
1/1
N/A
2.5
BatchedNativeWriteCallback$NativeCallback
100%
8/8
N/A
2.5
 
 1  
 /*
 2  
  * #%L
 3  
  * BatchedNativeWriteCallback.java - mongodb-async-driver - Allanbank Consulting, Inc.
 4  
  * %%
 5  
  * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
 6  
  * %%
 7  
  * Licensed under the Apache License, Version 2.0 (the "License");
 8  
  * you may not use this file except in compliance with the License.
 9  
  * You may obtain a copy of the License at
 10  
  * 
 11  
  *      http://www.apache.org/licenses/LICENSE-2.0
 12  
  * 
 13  
  * Unless required by applicable law or agreed to in writing, software
 14  
  * distributed under the License is distributed on an "AS IS" BASIS,
 15  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 16  
  * See the License for the specific language governing permissions and
 17  
  * limitations under the License.
 18  
  * #L%
 19  
  */
 20  
 
 21  
 package com.allanbank.mongodb.client.callback;
 22  
 
 23  
 import java.util.ArrayList;
 24  
 import java.util.Collections;
 25  
 import java.util.IdentityHashMap;
 26  
 import java.util.LinkedList;
 27  
 import java.util.List;
 28  
 import java.util.Map;
 29  
 
 30  
 import com.allanbank.mongodb.Callback;
 31  
 import com.allanbank.mongodb.Durability;
 32  
 import com.allanbank.mongodb.builder.BatchedWrite;
 33  
 import com.allanbank.mongodb.builder.BatchedWriteMode;
 34  
 import com.allanbank.mongodb.builder.write.DeleteOperation;
 35  
 import com.allanbank.mongodb.builder.write.InsertOperation;
 36  
 import com.allanbank.mongodb.builder.write.UpdateOperation;
 37  
 import com.allanbank.mongodb.builder.write.WriteOperation;
 38  
 import com.allanbank.mongodb.client.AbstractMongoOperations;
 39  
 import com.allanbank.mongodb.error.BatchedWriteException;
 40  
 
 41  
 /**
 42  
  * BatchedWriteCallback provides the global callback for the batched writes when
 43  
  * the server does not support the write commands. This callback will issue the
 44  
  * writes using the original wire protocol.
 45  
  * 
 46  
  * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
 47  
  *         mutated in incompatible ways between any two releases of the driver.
 48  
  * @copyright 2014, Allanbank Consulting, Inc., All Rights Reserved
 49  
  */
 50  
 public class BatchedNativeWriteCallback extends ReplyLongCallback {
 51  
 
 52  
     /** The collection to send individual operations with. */
 53  
     private final AbstractMongoOperations myCollection;
 54  
 
 55  
     /** The list of write operations which failed. */
 56  
     private final Map<WriteOperation, Throwable> myFailedOperations;
 57  
 
 58  
     /** The count of finished bundles or operations. */
 59  
     private int myFinished;
 60  
 
 61  
     /** The result. */
 62  8
     private long myN = 0;
 63  
 
 64  
     /** The list of write operations to send. */
 65  
     private final List<WriteOperation> myOperations;
 66  
 
 67  
     /** The list of write operations waiting to be sent to the server. */
 68  
     private final List<WriteOperation> myPendingOperations;
 69  
 
 70  
     /** The original write operation. */
 71  
     private final BatchedWrite myWrite;
 72  
 
 73  
     /**
 74  
      * Creates a new BatchedWriteCallback.
 75  
      * 
 76  
      * @param results
 77  
      *            The callback for the final results.
 78  
      * @param write
 79  
      *            The original write.
 80  
      * @param collection
 81  
      *            The collection for sending the operations.
 82  
      * @param operations
 83  
      *            The operations to send.
 84  
      */
 85  
     public BatchedNativeWriteCallback(final Callback<Long> results,
 86  
             final BatchedWrite write, final AbstractMongoOperations collection,
 87  
             final List<WriteOperation> operations) {
 88  8
         super(results);
 89  
 
 90  8
         myWrite = write;
 91  8
         myCollection = collection;
 92  8
         myOperations = Collections
 93  
                 .unmodifiableList(new ArrayList<WriteOperation>(operations));
 94  
 
 95  8
         myPendingOperations = new LinkedList<WriteOperation>(myOperations);
 96  
 
 97  8
         myFinished = 0;
 98  8
         myN = 0;
 99  
 
 100  8
         myFailedOperations = new IdentityHashMap<WriteOperation, Throwable>();
 101  8
     }
 102  
 
 103  
     /**
 104  
      * Sends the next set of operations to the server.
 105  
      */
 106  
     public void send() {
 107  
 
 108  
         List<WriteOperation> toSendOperations;
 109  11
         Durability durability = null;
 110  
 
 111  11
         synchronized (this) {
 112  11
             List<WriteOperation> toSend = myPendingOperations;
 113  11
             if (BatchedWriteMode.SERIALIZE_AND_STOP.equals(myWrite.getMode())) {
 114  6
                 toSend = myPendingOperations.subList(0, 1);
 115  
             }
 116  
 
 117  11
             durability = myWrite.getDurability();
 118  11
             if ((durability == null) || (durability == Durability.NONE)) {
 119  9
                 durability = Durability.ACK;
 120  
             }
 121  
 
 122  
             // Clear toSend before sending so the callbacks see the right
 123  
             // state for the operations.
 124  11
             toSendOperations = new ArrayList<WriteOperation>(toSend);
 125  11
             toSend.clear();
 126  11
         } // Release lock.
 127  
 
 128  
         // Release the lock before sending to avoid deadlock in processing
 129  
         // replies.
 130  
 
 131  11
         for (final WriteOperation operation : toSendOperations) {
 132  1
             switch (operation.getType()) {
 133  
             case INSERT: {
 134  7
                 final InsertOperation insert = (InsertOperation) operation;
 135  7
                 myCollection.insertAsync(new NativeCallback<Integer>(insert),
 136  
                         true, durability, insert.getDocument());
 137  7
                 break;
 138  
             }
 139  
             case UPDATE: {
 140  5
                 final UpdateOperation update = (UpdateOperation) operation;
 141  5
                 myCollection.updateAsync(new NativeCallback<Long>(operation),
 142  
                         update.getQuery(), update.getUpdate(),
 143  
                         update.isMultiUpdate(), update.isUpsert(), durability);
 144  5
                 break;
 145  
             }
 146  
             case DELETE: {
 147  5
                 final DeleteOperation delete = (DeleteOperation) operation;
 148  5
                 myCollection.deleteAsync(new NativeCallback<Long>(operation),
 149  
                         delete.getQuery(), delete.isSingleDelete(), durability);
 150  5
                 break;
 151  
             }
 152  
             }
 153  17
         }
 154  11
     }
 155  
 
 156  
     /**
 157  
      * Callback for a single write operation sent via the native messages.
 158  
      * 
 159  
      * @param operation
 160  
      *            The write operation.
 161  
      * @param result
 162  
      *            The result of the write operation.
 163  
      */
 164  
     protected synchronized void callback(final WriteOperation operation,
 165  
             final long result) {
 166  9
         myN += result;
 167  9
         myFinished += 1;
 168  
 
 169  9
         if (!myPendingOperations.isEmpty()) {
 170  3
             send();
 171  
         }
 172  6
         else if (myFinished == myOperations.size()) {
 173  2
             publishResults();
 174  
         }
 175  9
     }
 176  
 
 177  
     /**
 178  
      * Callback for a single write operation sent via the native messages has
 179  
      * failed.
 180  
      * 
 181  
      * @param operation
 182  
      *            The write operation.
 183  
      * @param thrown
 184  
      *            The error for the operation.
 185  
      */
 186  
     protected synchronized void exception(final WriteOperation operation,
 187  
             final Throwable thrown) {
 188  6
         myFinished += 1;
 189  6
         myFailedOperations.put(operation, thrown);
 190  
 
 191  6
         if (myWrite.getMode() == BatchedWriteMode.SERIALIZE_AND_STOP) {
 192  2
             publishResults();
 193  
         }
 194  
         // No need to check if we have to send. Would have already sent all of
 195  
         // the operations if not SERIALIZE_AND_STOP.
 196  4
         else if (myFinished == myOperations.size()) {
 197  2
             publishResults();
 198  
         }
 199  
 
 200  6
     }
 201  
 
 202  
     /**
 203  
      * Publishes the final results.
 204  
      */
 205  
     private void publishResults() {
 206  6
         if (myFailedOperations.isEmpty()) {
 207  2
             myForwardCallback.callback(Long.valueOf(myN));
 208  
         }
 209  
         else {
 210  4
             myForwardCallback.exception(new BatchedWriteException(myWrite, myN,
 211  
                     myPendingOperations, myFailedOperations));
 212  
         }
 213  6
     }
 214  
 
 215  
     /**
 216  
      * NativeCallback provides the callback for a single operation within a
 217  
      * batched write. This callback is used when the batched write commands are
 218  
      * not supported and the driver falls back to using the native insert,
 219  
      * update, and delete messages.
 220  
      * 
 221  
      * @param <T>
 222  
      *            The type for the callback. Expected to be either Integer or
 223  
      *            Long.
 224  
      * 
 225  
      * @api.no This class is <b>NOT</b> part of the drivers API. This class may
 226  
      *         be mutated in incompatible ways between any two releases of the
 227  
      *         driver.
 228  
      * @copyright 2014, Allanbank Consulting, Inc., All Rights Reserved
 229  
      */
 230  6
     /* package */class NativeCallback<T extends Number> implements Callback<T> {
 231  
 
 232  
         /** The operation this callback is waiting for the reply from. */
 233  
         private final WriteOperation myOperation;
 234  
 
 235  
         /**
 236  
          * Creates a new BatchedWriteNativeCallback.
 237  
          * 
 238  
          * @param operation
 239  
          *            The operation this callback is waiting for the reply from.
 240  
          */
 241  17
         public NativeCallback(final WriteOperation operation) {
 242  17
             myOperation = operation;
 243  17
         }
 244  
 
 245  
         /**
 246  
          * {@inheritDoc}
 247  
          * <p>
 248  
          * Overridden to forward the results to the parent callback.
 249  
          * </p>
 250  
          */
 251  
         @Override
 252  
         public void callback(final T result) {
 253  6
             BatchedNativeWriteCallback.this.callback(myOperation,
 254  
                     result.longValue());
 255  6
         }
 256  
 
 257  
         /**
 258  
          * {@inheritDoc}
 259  
          * <p>
 260  
          * Overridden to forward the error to the parent callback.
 261  
          * </p>
 262  
          */
 263  
         @Override
 264  
         public void exception(final Throwable thrown) {
 265  6
             BatchedNativeWriteCallback.this.exception(myOperation, thrown);
 266  6
         }
 267  
     }
 268  
 }