Coverage Report - com.allanbank.mongodb.client.BatchedAsyncMongoCollectionImpl
 
Classes in this File Line Coverage Branch Coverage Complexity
BatchedAsyncMongoCollectionImpl
92%
24/26
50%
2/4
2.636
BatchedAsyncMongoCollectionImpl$CaptureClientHandler
91%
113/124
68%
45/66
2.636
 
 1  
 /*
 2  
  * #%L
 3  
  * BatchedAsyncMongoCollectionImpl.java - mongodb-async-driver - Allanbank Consulting, Inc.
 4  
  * %%
 5  
  * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
 6  
  * %%
 7  
  * Licensed under the Apache License, Version 2.0 (the "License");
 8  
  * you may not use this file except in compliance with the License.
 9  
  * You may obtain a copy of the License at
 10  
  * 
 11  
  *      http://www.apache.org/licenses/LICENSE-2.0
 12  
  * 
 13  
  * Unless required by applicable law or agreed to in writing, software
 14  
  * distributed under the License is distributed on an "AS IS" BASIS,
 15  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 16  
  * See the License for the specific language governing permissions and
 17  
  * limitations under the License.
 18  
  * #L%
 19  
  */
 20  
 package com.allanbank.mongodb.client;
 21  
 
 22  
 import java.lang.reflect.InvocationHandler;
 23  
 import java.lang.reflect.Method;
 24  
 import java.lang.reflect.Proxy;
 25  
 import java.util.ArrayList;
 26  
 import java.util.Collections;
 27  
 import java.util.LinkedList;
 28  
 import java.util.List;
 29  
 import java.util.concurrent.CancellationException;
 30  
 import java.util.concurrent.Future;
 31  
 
 32  
 import com.allanbank.mongodb.BatchedAsyncMongoCollection;
 33  
 import com.allanbank.mongodb.Callback;
 34  
 import com.allanbank.mongodb.Durability;
 35  
 import com.allanbank.mongodb.MongoDatabase;
 36  
 import com.allanbank.mongodb.MongoDbException;
 37  
 import com.allanbank.mongodb.Version;
 38  
 import com.allanbank.mongodb.bson.Document;
 39  
 import com.allanbank.mongodb.builder.BatchedWrite;
 40  
 import com.allanbank.mongodb.builder.BatchedWrite.Bundle;
 41  
 import com.allanbank.mongodb.builder.BatchedWriteMode;
 42  
 import com.allanbank.mongodb.client.callback.AbstractReplyCallback;
 43  
 import com.allanbank.mongodb.client.callback.BatchedInsertCountingCallback;
 44  
 import com.allanbank.mongodb.client.callback.BatchedWriteCallback;
 45  
 import com.allanbank.mongodb.client.callback.ReplyCallback;
 46  
 import com.allanbank.mongodb.client.message.Delete;
 47  
 import com.allanbank.mongodb.client.message.GetLastError;
 48  
 import com.allanbank.mongodb.client.message.Insert;
 49  
 import com.allanbank.mongodb.client.message.Reply;
 50  
 import com.allanbank.mongodb.client.message.Update;
 51  
 
 52  
 /**
 53  
  * BatchedAsyncMongoCollectionImpl provides the implementation for the
 54  
  * {@link BatchedAsyncMongoCollection}.
 55  
  * 
 56  
  * @copyright 2013-2014, Allanbank Consulting, Inc., All Rights Reserved
 57  
  */
 58  
 public class BatchedAsyncMongoCollectionImpl extends
 59  
         AbstractAsyncMongoCollection implements BatchedAsyncMongoCollection {
 60  
 
 61  
     /** The interfaces to implement via the proxy. */
 62  1
     private static final Class<?>[] CLIENT_INTERFACE = new Class[] { Client.class };
 63  
 
 64  
     /** set to true to batch deletes. */
 65  11
     private boolean myBatchDeletes = false;
 66  
 
 67  
     /** Set to true to batch updates. */
 68  11
     private boolean myBatchUpdates = false;
 69  
 
 70  
     /** The mode for the writes. */
 71  11
     private BatchedWriteMode myMode = BatchedWriteMode.SERIALIZE_AND_CONTINUE;
 72  
 
 73  
     /**
 74  
      * Creates a new BatchedAsyncMongoCollectionImpl.
 75  
      * 
 76  
      * @param client
 77  
      *            The client for interacting with MongoDB.
 78  
      * @param database
 79  
      *            The database we interact with.
 80  
      * @param name
 81  
      *            The name of the collection we interact with.
 82  
      */
 83  
     public BatchedAsyncMongoCollectionImpl(final Client client,
 84  
             final MongoDatabase database, final String name) {
 85  
 
 86  11
         super((Client) Proxy.newProxyInstance(
 87  
                 BatchedAsyncMongoCollectionImpl.class.getClassLoader(),
 88  
                 CLIENT_INTERFACE, new CaptureClientHandler(client)), database,
 89  
                 name);
 90  11
     }
 91  
 
 92  
     /**
 93  
      * {@inheritDoc}
 94  
      * <p>
 95  
      * Overridden to clear any pending messages without sending them to MongoDB.
 96  
      * </p>
 97  
      */
 98  
     @Override
 99  
     public void cancel() {
 100  1
         final InvocationHandler handler = Proxy.getInvocationHandler(myClient);
 101  1
         if (handler instanceof CaptureClientHandler) {
 102  1
             ((CaptureClientHandler) handler).clear();
 103  
         }
 104  1
     }
 105  
 
 106  
     /**
 107  
      * {@inheritDoc}
 108  
      * <p>
 109  
      * Overridden to flush any pending messages to a real serialized client.
 110  
      * </p>
 111  
      */
 112  
     @Override
 113  
     public void close() throws MongoDbException {
 114  4
         flush();
 115  4
     }
 116  
 
 117  
     /**
 118  
      * {@inheritDoc}
 119  
      * <p>
 120  
      * Overridden to flush any pending messages to a real serialized client.
 121  
      * </p>
 122  
      */
 123  
     @Override
 124  
     public void flush() throws MongoDbException {
 125  11
         final InvocationHandler handler = Proxy.getInvocationHandler(myClient);
 126  11
         if (handler instanceof CaptureClientHandler) {
 127  11
             ((CaptureClientHandler) handler).flush(this);
 128  
         }
 129  11
     }
 130  
 
 131  
     /**
 132  
      * Returns the mode for the batched writes.
 133  
      * 
 134  
      * @return The mode for the batched writes.
 135  
      */
 136  
     public BatchedWriteMode getMode() {
 137  37
         return myMode;
 138  
     }
 139  
 
 140  
     /**
 141  
      * Returns true if the deletes should be batched.
 142  
      * 
 143  
      * @return True if the deletes should be batched.
 144  
      */
 145  
     public boolean isBatchDeletes() {
 146  8
         return myBatchDeletes;
 147  
     }
 148  
 
 149  
     /**
 150  
      * Returns true if the updates should be batched.
 151  
      * 
 152  
      * @return True if the updates should be batched.
 153  
      */
 154  
     public boolean isBatchUpdates() {
 155  14
         return myBatchUpdates;
 156  
     }
 157  
 
 158  
     /**
 159  
      * {@inheritDoc}
 160  
      */
 161  
     @Override
 162  
     public void setBatchDeletes(final boolean batchDeletes) {
 163  4
         myBatchDeletes = batchDeletes;
 164  4
     }
 165  
 
 166  
     /**
 167  
      * {@inheritDoc}
 168  
      */
 169  
     @Override
 170  
     public void setBatchUpdates(final boolean batchUpdates) {
 171  4
         myBatchUpdates = batchUpdates;
 172  4
     }
 173  
 
 174  
     /**
 175  
      * {@inheritDoc}
 176  
      */
 177  
     @Override
 178  
     public void setMode(final BatchedWriteMode mode) {
 179  0
         myMode = mode;
 180  0
     }
 181  
 
 182  
     /**
 183  
      * {@inheritDoc}
 184  
      * <p>
 185  
      * Overridden to return false to force the {@link AbstractMongoOperations}
 186  
      * class to always use the legacy {@link Insert}, {@link Update}, and
 187  
      * {@link Delete} messages. The {@code CaptureClientHandler.optimize()} will
 188  
      * convert those operations to bulk write commands as appropriate.
 189  
      */
 190  
     @Override
 191  
     protected boolean useWriteCommand() {
 192  34
         return false;
 193  
     }
 194  
 
 195  
     /**
 196  
      * CaptureClientHandler provides an {@link InvocationHandler} to capture all
 197  
      * send requests and defer them until flushed.
 198  
      * 
 199  
      * @copyright 2013, Allanbank Consulting, Inc., All Rights Reserved
 200  
      */
 201  
     private static class CaptureClientHandler implements InvocationHandler {
 202  
 
 203  
         /** The first version to support batch write commands. */
 204  1
         public static final Version BATCH_WRITE_VERSION = Version
 205  
                 .parse("2.5.4");
 206  
 
 207  
         /** The collection we are proxying. */
 208  
         private BatchedAsyncMongoCollectionImpl myCollection;
 209  
 
 210  
         /** The real (e.g., user's) callbacks. */
 211  
         private List<Callback<Reply>> myRealCallbacks;
 212  
 
 213  
         /**
 214  
          * The {@link Client} implementation to delegate to when sending
 215  
          * messages or handling other method calls.
 216  
          */
 217  
         private final Client myRealClient;
 218  
 
 219  
         /** The final results from the callback. */
 220  
         private List<Object> myResults;
 221  
 
 222  
         /**
 223  
          * The {@link Client} implementation to delegate to when sending
 224  
          * messages or handling other method calls.
 225  
          */
 226  
         private final List<Object[]> mySendArgs;
 227  
 
 228  
         /** The batched writer we are building. */
 229  
         private final BatchedWrite.Builder myWrite;
 230  
 
 231  
         /**
 232  
          * Creates a new CaptureClientHandler.
 233  
          * 
 234  
          * @param realClient
 235  
          *            The {@link Client} implementation to delegate to when
 236  
          *            sending messages or handling other method calls.
 237  
          */
 238  11
         public CaptureClientHandler(final Client realClient) {
 239  11
             myRealClient = realClient;
 240  
 
 241  11
             myRealCallbacks = null;
 242  11
             myResults = null;
 243  
 
 244  11
             mySendArgs = new LinkedList<Object[]>();
 245  11
             myWrite = BatchedWrite.builder();
 246  11
         }
 247  
 
 248  
         /**
 249  
          * Clears the pending messages without sending them to MongoDB.
 250  
          */
 251  
         public synchronized void clear() {
 252  12
             final List<Object[]> copy = new ArrayList<Object[]>(mySendArgs);
 253  
 
 254  12
             mySendArgs.clear();
 255  12
             myWrite.reset();
 256  
 
 257  12
             myResults = null;
 258  12
             myRealCallbacks = null;
 259  12
             myCollection = null;
 260  
 
 261  12
             for (final Object[] args : copy) {
 262  4
                 final Object lastArg = args[args.length - 1];
 263  4
                 if (lastArg instanceof Future<?>) {
 264  0
                     ((Future<?>) lastArg).cancel(false);
 265  
                 }
 266  4
                 else if (lastArg instanceof Callback<?>) {
 267  4
                     ((Callback<?>) lastArg)
 268  
                             .exception(new CancellationException(
 269  
                                     "Batch request cancelled."));
 270  
                 }
 271  4
             }
 272  12
         }
 273  
 
 274  
         /**
 275  
          * Flushes the pending messages to a serialized client.
 276  
          * 
 277  
          * @param collection
 278  
          *            The Collection the we are flushing operations for.
 279  
          */
 280  
         public synchronized void flush(
 281  
                 final BatchedAsyncMongoCollectionImpl collection) {
 282  
 
 283  
             // Use a serialized client to keep all of the messages on a single
 284  
             // connection as much as possible.
 285  
             SerialClientImpl serialized;
 286  11
             if (myRealClient instanceof SerialClientImpl) {
 287  11
                 serialized = (SerialClientImpl) myRealClient;
 288  
             }
 289  
             else {
 290  0
                 serialized = new SerialClientImpl((ClientImpl) myRealClient);
 291  
             }
 292  
 
 293  
             try {
 294  
                 // Send the optimized requests.
 295  11
                 final List<Object> optimized = optimize(collection);
 296  11
                 for (final Object toSend : optimized) {
 297  18
                     if (toSend instanceof BatchedWriteCallback) {
 298  7
                         final BatchedWriteCallback cb = (BatchedWriteCallback) toSend;
 299  7
                         cb.setClient(serialized);
 300  7
                         cb.send();
 301  7
                     }
 302  11
                     else if (toSend instanceof Object[]) {
 303  11
                         final Object[] sendArg = (Object[]) toSend;
 304  11
                         if (sendArg.length == 2) {
 305  0
                             serialized.send((Message) sendArg[0],
 306  
                                     (ReplyCallback) sendArg[1]);
 307  
                         }
 308  
                         else {
 309  11
                             serialized.send((Message) sendArg[0],
 310  
                                     (Message) sendArg[1],
 311  
                                     (ReplyCallback) sendArg[2]);
 312  
                         }
 313  
                     }
 314  18
                 }
 315  
             }
 316  
             finally {
 317  11
                 clear();
 318  11
             }
 319  11
         }
 320  
 
 321  
         /**
 322  
          * {@inheritDoc}
 323  
          * <p>
 324  
          * Overridden to batch all {@link Client#send} operations.
 325  
          * </p>
 326  
          */
 327  
         @Override
 328  
         public synchronized Object invoke(final Object proxy,
 329  
                 final Method method, final Object[] args) throws Throwable {
 330  
 
 331  87
             final String methodName = method.getName();
 332  
 
 333  87
             if (methodName.equals("send")) {
 334  34
                 mySendArgs.add(args);
 335  34
                 return null;
 336  
             }
 337  53
             return method.invoke(myRealClient, args);
 338  
         }
 339  
 
 340  
         /**
 341  
          * Adds a delete to the batch.
 342  
          * 
 343  
          * @param delete
 344  
          *            The delete to add to the batch.
 345  
          * @param args
 346  
          *            The raw send() arguments.
 347  
          */
 348  
         private void addDelete(final Delete delete, final Object[] args) {
 349  
 
 350  6
             updateDurability(args);
 351  
 
 352  6
             myRealCallbacks.add(extractCallback(args));
 353  6
             myWrite.delete(delete.getQuery(), delete.isSingleDelete());
 354  6
         }
 355  
 
 356  
         /**
 357  
          * Adds an insert to the batch.
 358  
          * 
 359  
          * @param insert
 360  
          *            The insert to add to the batch.
 361  
          * @param args
 362  
          *            The raw send() arguments.
 363  
          */
 364  
         private void addInsert(final Insert insert, final Object[] args) {
 365  
 
 366  7
             updateDurability(args);
 367  
 
 368  7
             final int docCount = insert.getDocuments().size();
 369  7
             Callback<Reply> cb = extractCallback(args);
 370  7
             final boolean breakBatch = (cb != null)
 371  
                     && insert.isContinueOnError() && (docCount > 1);
 372  
 
 373  7
             if (breakBatch) {
 374  0
                 closeBatch();
 375  0
                 myWrite.setMode(BatchedWriteMode.SERIALIZE_AND_STOP);
 376  
             }
 377  
             else {
 378  7
                 cb = new BatchedInsertCountingCallback(cb, docCount);
 379  
             }
 380  
 
 381  7
             for (final Document doc : insert.getDocuments()) {
 382  7
                 myWrite.insert(doc);
 383  7
                 myRealCallbacks.add(cb);
 384  7
             }
 385  7
             if (breakBatch) {
 386  0
                 closeBatch();
 387  
             }
 388  7
         }
 389  
 
 390  
         /**
 391  
          * Adds an update to the batch.
 392  
          * 
 393  
          * @param update
 394  
          *            The update to add to the batch.
 395  
          * @param args
 396  
          *            The raw send() arguments.
 397  
          */
 398  
         private void addUpdate(final Update update, final Object[] args) {
 399  
 
 400  6
             updateDurability(args);
 401  
 
 402  6
             myRealCallbacks.add(extractCallback(args));
 403  6
             myWrite.update(update.getQuery(), update.getUpdate(),
 404  
                     update.isMultiUpdate(), update.isUpsert());
 405  6
         }
 406  
 
 407  
         /**
 408  
          * Closes the current batch of operations and re-initializes the batched
 409  
          * writer.
 410  
          */
 411  
         private void closeBatch() {
 412  9
             final ClusterStats stats = myRealClient.getClusterStats();
 413  9
             final BatchedWrite w = myWrite.build();
 414  9
             final List<Bundle> bundles = w.toBundles(myCollection.getName(),
 415  
                     stats.getSmallestMaxBsonObjectSize(),
 416  
                     stats.getSmallestMaxBatchedWriteOperations());
 417  9
             if (!bundles.isEmpty()) {
 418  7
                 final BatchedWriteCallback cb = new BatchedWriteCallback(
 419  
                         myCollection.getDatabaseName(), myCollection.getName(),
 420  
                         myRealCallbacks, w, bundles);
 421  7
                 myResults.add(cb);
 422  
             }
 423  
 
 424  9
             myWrite.reset();
 425  9
             myWrite.setMode(myCollection.getMode());
 426  
 
 427  9
             myRealCallbacks.clear();
 428  9
         }
 429  
 
 430  
         /**
 431  
          * Extracts the callback from the write arguments. If the write has a
 432  
          * {@link Callback} then it will be the last argument.
 433  
          * 
 434  
          * @param args
 435  
          *            The arguments for the original {@link Client#send} call.
 436  
          * @return The callback for the call. Returns null if there is no
 437  
          *         {@link Callback}.
 438  
          */
 439  
         private Callback<Reply> extractCallback(final Object[] args) {
 440  19
             final Object cb = args[args.length - 1];
 441  19
             if (cb instanceof AbstractReplyCallback<?>) {
 442  19
                 return (AbstractReplyCallback<?>) args[2];
 443  
             }
 444  
 
 445  0
             return null;
 446  
         }
 447  
 
 448  
         /**
 449  
          * Tries the optimize the messages we will send to the server by
 450  
          * coalescing the sequential insert, update and delete messages into the
 451  
          * batched write commands of the same name.
 452  
          * 
 453  
          * @param collection
 454  
          *            The collection we are sending requests to.
 455  
          * @return The list of optimized messages.
 456  
          */
 457  
         private List<Object> optimize(
 458  
                 final BatchedAsyncMongoCollectionImpl collection) {
 459  
 
 460  11
             if (mySendArgs.isEmpty()) {
 461  1
                 return Collections.emptyList();
 462  
             }
 463  
 
 464  10
             final ClusterStats stats = myRealClient.getClusterStats();
 465  10
             final Version minVersion = stats.getServerVersionRange()
 466  
                     .getLowerBounds();
 467  10
             final boolean supportsBatch = BATCH_WRITE_VERSION
 468  
                     .compareTo(minVersion) <= 0;
 469  10
             if (supportsBatch) {
 470  7
                 myCollection = collection;
 471  
 
 472  7
                 myWrite.reset();
 473  7
                 myWrite.setMode(collection.getMode());
 474  
 
 475  7
                 myResults = new ArrayList<Object>(mySendArgs.size());
 476  7
                 myRealCallbacks = new ArrayList<Callback<Reply>>(
 477  
                         mySendArgs.size());
 478  
 
 479  28
                 while (!mySendArgs.isEmpty()) {
 480  21
                     final Object[] args = mySendArgs.remove(0);
 481  21
                     if (args[0] instanceof Insert) {
 482  7
                         addInsert((Insert) args[0], args);
 483  
                     }
 484  14
                     else if (collection.isBatchUpdates()
 485  
                             && (args[0] instanceof Update)) {
 486  6
                         addUpdate((Update) args[0], args);
 487  
                     }
 488  8
                     else if (collection.isBatchDeletes()
 489  
                             && (args[0] instanceof Delete)) {
 490  6
                         addDelete((Delete) args[0], args);
 491  
                     }
 492  
                     else {
 493  2
                         closeBatch();
 494  2
                         myResults.add(args);
 495  
                     }
 496  
 
 497  21
                     if (collection.getMode() == BatchedWriteMode.SERIALIZE_AND_STOP) {
 498  0
                         closeBatch();
 499  
                     }
 500  21
                 }
 501  
 
 502  7
                 closeBatch();
 503  
             }
 504  
             else {
 505  3
                 myResults = new ArrayList<Object>(mySendArgs.size());
 506  3
                 myResults.addAll(mySendArgs);
 507  
 
 508  
                 // Clear the sendArgs or they will get notified of a cancel.
 509  3
                 mySendArgs.clear();
 510  
             }
 511  
 
 512  10
             return myResults;
 513  
         }
 514  
 
 515  
         /**
 516  
          * Updates the durability for the batch. If the durability changes
 517  
          * mid-batch then we force a break in the batch.
 518  
          * 
 519  
          * @param args
 520  
          *            The arguments for the send() call. The
 521  
          *            {@link GetLastError} will be the second of three
 522  
          *            arguments.
 523  
          */
 524  
         private void updateDurability(final Object[] args) {
 525  
 
 526  19
             Durability active = myWrite.getDurability();
 527  
 
 528  19
             if ((args.length == 3) && (args[1] instanceof GetLastError)) {
 529  19
                 final GetLastError error = (GetLastError) args[1];
 530  
 
 531  19
                 final Durability d = Durability.valueOf(error.getQuery()
 532  
                         .toString());
 533  
 
 534  19
                 if (active == null) {
 535  7
                     active = d;
 536  7
                     myWrite.setDurability(active);
 537  
                 }
 538  12
                 else if (!d.equals(active) && !d.equals(Durability.ACK)
 539  
                         && !d.equals(Durability.NONE)) {
 540  0
                     closeBatch();
 541  0
                     active = d;
 542  0
                     myWrite.setDurability(active);
 543  
                 }
 544  
             } // else Durability is none or not applicable.
 545  19
         }
 546  
     }
 547  
 }