View Javadoc
1   /*
2    * #%L
3    * BatchedAsyncMongoCollectionImpl.java - mongodb-async-driver - Allanbank Consulting, Inc.
4    * %%
5    * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6    * %%
7    * Licensed under the Apache License, Version 2.0 (the "License");
8    * you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   * 
11   *      http://www.apache.org/licenses/LICENSE-2.0
12   * 
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   * #L%
19   */
20  package com.allanbank.mongodb.client;
21  
22  import java.lang.reflect.InvocationHandler;
23  import java.lang.reflect.Method;
24  import java.lang.reflect.Proxy;
25  import java.util.ArrayList;
26  import java.util.Collections;
27  import java.util.LinkedList;
28  import java.util.List;
29  import java.util.concurrent.CancellationException;
30  import java.util.concurrent.Future;
31  
32  import com.allanbank.mongodb.BatchedAsyncMongoCollection;
33  import com.allanbank.mongodb.Callback;
34  import com.allanbank.mongodb.Durability;
35  import com.allanbank.mongodb.MongoDatabase;
36  import com.allanbank.mongodb.MongoDbException;
37  import com.allanbank.mongodb.Version;
38  import com.allanbank.mongodb.bson.Document;
39  import com.allanbank.mongodb.builder.BatchedWrite;
40  import com.allanbank.mongodb.builder.BatchedWrite.Bundle;
41  import com.allanbank.mongodb.builder.BatchedWriteMode;
42  import com.allanbank.mongodb.client.callback.AbstractReplyCallback;
43  import com.allanbank.mongodb.client.callback.BatchedInsertCountingCallback;
44  import com.allanbank.mongodb.client.callback.BatchedWriteCallback;
45  import com.allanbank.mongodb.client.callback.ReplyCallback;
46  import com.allanbank.mongodb.client.message.Delete;
47  import com.allanbank.mongodb.client.message.GetLastError;
48  import com.allanbank.mongodb.client.message.Insert;
49  import com.allanbank.mongodb.client.message.Reply;
50  import com.allanbank.mongodb.client.message.Update;
51  
52  /**
53   * BatchedAsyncMongoCollectionImpl provides the implementation for the
54   * {@link BatchedAsyncMongoCollection}.
55   * 
56   * @copyright 2013-2014, Allanbank Consulting, Inc., All Rights Reserved
57   */
58  public class BatchedAsyncMongoCollectionImpl extends
59          AbstractAsyncMongoCollection implements BatchedAsyncMongoCollection {
60  
61      /** The interfaces to implement via the proxy. */
62      private static final Class<?>[] CLIENT_INTERFACE = new Class[] { Client.class };
63  
64      /** set to true to batch deletes. */
65      private boolean myBatchDeletes = false;
66  
67      /** Set to true to batch updates. */
68      private boolean myBatchUpdates = false;
69  
70      /** The mode for the writes. */
71      private BatchedWriteMode myMode = BatchedWriteMode.SERIALIZE_AND_CONTINUE;
72  
73      /**
74       * Creates a new BatchedAsyncMongoCollectionImpl.
75       * 
76       * @param client
77       *            The client for interacting with MongoDB.
78       * @param database
79       *            The database we interact with.
80       * @param name
81       *            The name of the collection we interact with.
82       */
83      public BatchedAsyncMongoCollectionImpl(final Client client,
84              final MongoDatabase database, final String name) {
85  
86          super((Client) Proxy.newProxyInstance(
87                  BatchedAsyncMongoCollectionImpl.class.getClassLoader(),
88                  CLIENT_INTERFACE, new CaptureClientHandler(client)), database,
89                  name);
90      }
91  
92      /**
93       * {@inheritDoc}
94       * <p>
95       * Overridden to clear any pending messages without sending them to MongoDB.
96       * </p>
97       */
98      @Override
99      public void cancel() {
100         final InvocationHandler handler = Proxy.getInvocationHandler(myClient);
101         if (handler instanceof CaptureClientHandler) {
102             ((CaptureClientHandler) handler).clear();
103         }
104     }
105 
106     /**
107      * {@inheritDoc}
108      * <p>
109      * Overridden to flush any pending messages to a real serialized client.
110      * </p>
111      */
112     @Override
113     public void close() throws MongoDbException {
114         flush();
115     }
116 
117     /**
118      * {@inheritDoc}
119      * <p>
120      * Overridden to flush any pending messages to a real serialized client.
121      * </p>
122      */
123     @Override
124     public void flush() throws MongoDbException {
125         final InvocationHandler handler = Proxy.getInvocationHandler(myClient);
126         if (handler instanceof CaptureClientHandler) {
127             ((CaptureClientHandler) handler).flush(this);
128         }
129     }
130 
131     /**
132      * Returns the mode for the batched writes.
133      * 
134      * @return The mode for the batched writes.
135      */
136     public BatchedWriteMode getMode() {
137         return myMode;
138     }
139 
140     /**
141      * Returns true if the deletes should be batched.
142      * 
143      * @return True if the deletes should be batched.
144      */
145     public boolean isBatchDeletes() {
146         return myBatchDeletes;
147     }
148 
149     /**
150      * Returns true if the updates should be batched.
151      * 
152      * @return True if the updates should be batched.
153      */
154     public boolean isBatchUpdates() {
155         return myBatchUpdates;
156     }
157 
158     /**
159      * {@inheritDoc}
160      */
161     @Override
162     public void setBatchDeletes(final boolean batchDeletes) {
163         myBatchDeletes = batchDeletes;
164     }
165 
166     /**
167      * {@inheritDoc}
168      */
169     @Override
170     public void setBatchUpdates(final boolean batchUpdates) {
171         myBatchUpdates = batchUpdates;
172     }
173 
174     /**
175      * {@inheritDoc}
176      */
177     @Override
178     public void setMode(final BatchedWriteMode mode) {
179         myMode = mode;
180     }
181 
182     /**
183      * {@inheritDoc}
184      * <p>
185      * Overridden to return false to force the {@link AbstractMongoOperations}
186      * class to always use the legacy {@link Insert}, {@link Update}, and
187      * {@link Delete} messages. The {@code CaptureClientHandler.optimize()} will
188      * convert those operations to bulk write commands as appropriate.
189      */
190     @Override
191     protected boolean useWriteCommand() {
192         return false;
193     }
194 
195     /**
196      * CaptureClientHandler provides an {@link InvocationHandler} to capture all
197      * send requests and defer them until flushed.
198      * 
199      * @copyright 2013, Allanbank Consulting, Inc., All Rights Reserved
200      */
201     private static class CaptureClientHandler implements InvocationHandler {
202 
203         /** The first version to support batch write commands. */
204         public static final Version BATCH_WRITE_VERSION = Version
205                 .parse("2.5.4");
206 
207         /** The collection we are proxying. */
208         private BatchedAsyncMongoCollectionImpl myCollection;
209 
210         /** The real (e.g., user's) callbacks. */
211         private List<Callback<Reply>> myRealCallbacks;
212 
213         /**
214          * The {@link Client} implementation to delegate to when sending
215          * messages or handling other method calls.
216          */
217         private final Client myRealClient;
218 
219         /** The final results from the callback. */
220         private List<Object> myResults;
221 
222         /**
223          * The {@link Client} implementation to delegate to when sending
224          * messages or handling other method calls.
225          */
226         private final List<Object[]> mySendArgs;
227 
228         /** The batched writer we are building. */
229         private final BatchedWrite.Builder myWrite;
230 
231         /**
232          * Creates a new CaptureClientHandler.
233          * 
234          * @param realClient
235          *            The {@link Client} implementation to delegate to when
236          *            sending messages or handling other method calls.
237          */
238         public CaptureClientHandler(final Client realClient) {
239             myRealClient = realClient;
240 
241             myRealCallbacks = null;
242             myResults = null;
243 
244             mySendArgs = new LinkedList<Object[]>();
245             myWrite = BatchedWrite.builder();
246         }
247 
248         /**
249          * Clears the pending messages without sending them to MongoDB.
250          */
251         public synchronized void clear() {
252             final List<Object[]> copy = new ArrayList<Object[]>(mySendArgs);
253 
254             mySendArgs.clear();
255             myWrite.reset();
256 
257             myResults = null;
258             myRealCallbacks = null;
259             myCollection = null;
260 
261             for (final Object[] args : copy) {
262                 final Object lastArg = args[args.length - 1];
263                 if (lastArg instanceof Future<?>) {
264                     ((Future<?>) lastArg).cancel(false);
265                 }
266                 else if (lastArg instanceof Callback<?>) {
267                     ((Callback<?>) lastArg)
268                             .exception(new CancellationException(
269                                     "Batch request cancelled."));
270                 }
271             }
272         }
273 
274         /**
275          * Flushes the pending messages to a serialized client.
276          * 
277          * @param collection
278          *            The Collection the we are flushing operations for.
279          */
280         public synchronized void flush(
281                 final BatchedAsyncMongoCollectionImpl collection) {
282 
283             // Use a serialized client to keep all of the messages on a single
284             // connection as much as possible.
285             SerialClientImpl serialized;
286             if (myRealClient instanceof SerialClientImpl) {
287                 serialized = (SerialClientImpl) myRealClient;
288             }
289             else {
290                 serialized = new SerialClientImpl((ClientImpl) myRealClient);
291             }
292 
293             try {
294                 // Send the optimized requests.
295                 final List<Object> optimized = optimize(collection);
296                 for (final Object toSend : optimized) {
297                     if (toSend instanceof BatchedWriteCallback) {
298                         final BatchedWriteCallback cb = (BatchedWriteCallback) toSend;
299                         cb.setClient(serialized);
300                         cb.send();
301                     }
302                     else if (toSend instanceof Object[]) {
303                         final Object[] sendArg = (Object[]) toSend;
304                         if (sendArg.length == 2) {
305                             serialized.send((Message) sendArg[0],
306                                     (ReplyCallback) sendArg[1]);
307                         }
308                         else {
309                             serialized.send((Message) sendArg[0],
310                                     (Message) sendArg[1],
311                                     (ReplyCallback) sendArg[2]);
312                         }
313                     }
314                 }
315             }
316             finally {
317                 clear();
318             }
319         }
320 
321         /**
322          * {@inheritDoc}
323          * <p>
324          * Overridden to batch all {@link Client#send} operations.
325          * </p>
326          */
327         @Override
328         public synchronized Object invoke(final Object proxy,
329                 final Method method, final Object[] args) throws Throwable {
330 
331             final String methodName = method.getName();
332 
333             if (methodName.equals("send")) {
334                 mySendArgs.add(args);
335                 return null;
336             }
337             return method.invoke(myRealClient, args);
338         }
339 
340         /**
341          * Adds a delete to the batch.
342          * 
343          * @param delete
344          *            The delete to add to the batch.
345          * @param args
346          *            The raw send() arguments.
347          */
348         private void addDelete(final Delete delete, final Object[] args) {
349 
350             updateDurability(args);
351 
352             myRealCallbacks.add(extractCallback(args));
353             myWrite.delete(delete.getQuery(), delete.isSingleDelete());
354         }
355 
356         /**
357          * Adds an insert to the batch.
358          * 
359          * @param insert
360          *            The insert to add to the batch.
361          * @param args
362          *            The raw send() arguments.
363          */
364         private void addInsert(final Insert insert, final Object[] args) {
365 
366             updateDurability(args);
367 
368             final int docCount = insert.getDocuments().size();
369             Callback<Reply> cb = extractCallback(args);
370             final boolean breakBatch = (cb != null)
371                     && insert.isContinueOnError() && (docCount > 1);
372 
373             if (breakBatch) {
374                 closeBatch();
375                 myWrite.setMode(BatchedWriteMode.SERIALIZE_AND_STOP);
376             }
377             else {
378                 cb = new BatchedInsertCountingCallback(cb, docCount);
379             }
380 
381             for (final Document doc : insert.getDocuments()) {
382                 myWrite.insert(doc);
383                 myRealCallbacks.add(cb);
384             }
385             if (breakBatch) {
386                 closeBatch();
387             }
388         }
389 
390         /**
391          * Adds an update to the batch.
392          * 
393          * @param update
394          *            The update to add to the batch.
395          * @param args
396          *            The raw send() arguments.
397          */
398         private void addUpdate(final Update update, final Object[] args) {
399 
400             updateDurability(args);
401 
402             myRealCallbacks.add(extractCallback(args));
403             myWrite.update(update.getQuery(), update.getUpdate(),
404                     update.isMultiUpdate(), update.isUpsert());
405         }
406 
407         /**
408          * Closes the current batch of operations and re-initializes the batched
409          * writer.
410          */
411         private void closeBatch() {
412             final ClusterStats stats = myRealClient.getClusterStats();
413             final BatchedWrite w = myWrite.build();
414             final List<Bundle> bundles = w.toBundles(myCollection.getName(),
415                     stats.getSmallestMaxBsonObjectSize(),
416                     stats.getSmallestMaxBatchedWriteOperations());
417             if (!bundles.isEmpty()) {
418                 final BatchedWriteCallback cb = new BatchedWriteCallback(
419                         myCollection.getDatabaseName(), myCollection.getName(),
420                         myRealCallbacks, w, bundles);
421                 myResults.add(cb);
422             }
423 
424             myWrite.reset();
425             myWrite.setMode(myCollection.getMode());
426 
427             myRealCallbacks.clear();
428         }
429 
430         /**
431          * Extracts the callback from the write arguments. If the write has a
432          * {@link Callback} then it will be the last argument.
433          * 
434          * @param args
435          *            The arguments for the original {@link Client#send} call.
436          * @return The callback for the call. Returns null if there is no
437          *         {@link Callback}.
438          */
439         private Callback<Reply> extractCallback(final Object[] args) {
440             final Object cb = args[args.length - 1];
441             if (cb instanceof AbstractReplyCallback<?>) {
442                 return (AbstractReplyCallback<?>) args[2];
443             }
444 
445             return null;
446         }
447 
448         /**
449          * Tries the optimize the messages we will send to the server by
450          * coalescing the sequential insert, update and delete messages into the
451          * batched write commands of the same name.
452          * 
453          * @param collection
454          *            The collection we are sending requests to.
455          * @return The list of optimized messages.
456          */
457         private List<Object> optimize(
458                 final BatchedAsyncMongoCollectionImpl collection) {
459 
460             if (mySendArgs.isEmpty()) {
461                 return Collections.emptyList();
462             }
463 
464             final ClusterStats stats = myRealClient.getClusterStats();
465             final Version minVersion = stats.getServerVersionRange()
466                     .getLowerBounds();
467             final boolean supportsBatch = BATCH_WRITE_VERSION
468                     .compareTo(minVersion) <= 0;
469             if (supportsBatch) {
470                 myCollection = collection;
471 
472                 myWrite.reset();
473                 myWrite.setMode(collection.getMode());
474 
475                 myResults = new ArrayList<Object>(mySendArgs.size());
476                 myRealCallbacks = new ArrayList<Callback<Reply>>(
477                         mySendArgs.size());
478 
479                 while (!mySendArgs.isEmpty()) {
480                     final Object[] args = mySendArgs.remove(0);
481                     if (args[0] instanceof Insert) {
482                         addInsert((Insert) args[0], args);
483                     }
484                     else if (collection.isBatchUpdates()
485                             && (args[0] instanceof Update)) {
486                         addUpdate((Update) args[0], args);
487                     }
488                     else if (collection.isBatchDeletes()
489                             && (args[0] instanceof Delete)) {
490                         addDelete((Delete) args[0], args);
491                     }
492                     else {
493                         closeBatch();
494                         myResults.add(args);
495                     }
496 
497                     if (collection.getMode() == BatchedWriteMode.SERIALIZE_AND_STOP) {
498                         closeBatch();
499                     }
500                 }
501 
502                 closeBatch();
503             }
504             else {
505                 myResults = new ArrayList<Object>(mySendArgs.size());
506                 myResults.addAll(mySendArgs);
507 
508                 // Clear the sendArgs or they will get notified of a cancel.
509                 mySendArgs.clear();
510             }
511 
512             return myResults;
513         }
514 
515         /**
516          * Updates the durability for the batch. If the durability changes
517          * mid-batch then we force a break in the batch.
518          * 
519          * @param args
520          *            The arguments for the send() call. The
521          *            {@link GetLastError} will be the second of three
522          *            arguments.
523          */
524         private void updateDurability(final Object[] args) {
525 
526             Durability active = myWrite.getDurability();
527 
528             if ((args.length == 3) && (args[1] instanceof GetLastError)) {
529                 final GetLastError error = (GetLastError) args[1];
530 
531                 final Durability d = Durability.valueOf(error.getQuery()
532                         .toString());
533 
534                 if (active == null) {
535                     active = d;
536                     myWrite.setDurability(active);
537                 }
538                 else if (!d.equals(active) && !d.equals(Durability.ACK)
539                         && !d.equals(Durability.NONE)) {
540                     closeBatch();
541                     active = d;
542                     myWrite.setDurability(active);
543                 }
544             } // else Durability is none or not applicable.
545         }
546     }
547 }