/*
 * #%L
 * BatchedWriteCallback.java - mongodb-async-driver - Allanbank Consulting, Inc.
 * %%
 * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */
20  
21  package com.allanbank.mongodb.client.callback;
22  
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.IdentityHashMap;
26  import java.util.LinkedList;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Set;
30  
31  import com.allanbank.mongodb.Callback;
32  import com.allanbank.mongodb.Durability;
33  import com.allanbank.mongodb.MongoDbException;
34  import com.allanbank.mongodb.bson.Document;
35  import com.allanbank.mongodb.bson.Element;
36  import com.allanbank.mongodb.bson.NumericElement;
37  import com.allanbank.mongodb.bson.builder.BuilderFactory;
38  import com.allanbank.mongodb.bson.element.ArrayElement;
39  import com.allanbank.mongodb.bson.element.DocumentElement;
40  import com.allanbank.mongodb.builder.BatchedWrite;
41  import com.allanbank.mongodb.builder.BatchedWrite.Bundle;
42  import com.allanbank.mongodb.builder.BatchedWriteMode;
43  import com.allanbank.mongodb.builder.write.WriteOperation;
44  import com.allanbank.mongodb.client.Client;
45  import com.allanbank.mongodb.client.message.BatchedWriteCommand;
46  import com.allanbank.mongodb.client.message.Command;
47  import com.allanbank.mongodb.client.message.Reply;
48  import com.allanbank.mongodb.error.BatchedWriteException;
49  import com.allanbank.mongodb.util.Assertions;
50  
51  /**
52   * BatchedWriteCallback provides the global callback for the batched writes.
53   * 
54   * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
55   *         mutated in incompatible ways between any two releases of the driver.
56   * @copyright 2014, Allanbank Consulting, Inc., All Rights Reserved
57   */
58  public class BatchedWriteCallback extends ReplyLongCallback {
59  
60      /** The list of bundles to send. */
61      private final List<BatchedWrite.Bundle> myBundles;
62  
63      /** The client to send messages with. */
64      private Client myClient;
65  
66      /** The name of the collection. */
67      private final String myCollectionName;
68  
69      /** The name of the database. */
70      private final String myDatabaseName;
71  
72      /** The list of write operations which failed. */
73      private final Map<WriteOperation, Throwable> myFailedOperations;
74  
75      /** The count of finished bundles or operations. */
76      private int myFinished;
77  
78      /** The result. */
79      private long myN = 0;
80  
81      /** The list of bundles waiting to be sent to the server. */
82      private final List<BatchedWrite.Bundle> myPendingBundles;
83  
84      /** The real callback for each operation. */
85      private final List<Callback<Reply>> myRealCallbacks;
86  
87      /** The list of write operations which have been skipped due to an error. */
88      private List<WriteOperation> mySkippedOperations;
89  
90      /** The original write operation. */
91      private final BatchedWrite myWrite;
92  
93      /**
94       * Creates a new BatchedWriteCallback.
95       * 
96       * @param databaseName
97       *            The name of the database.
98       * @param collectionName
99       *            The name of the collection.
100      * @param results
101      *            The callback for the final results.
102      * @param write
103      *            The original write.
104      * @param client
105      *            The client for sending the bundled write commands.
106      * @param bundles
107      *            The bundled writes.
108      */
109     public BatchedWriteCallback(final String databaseName,
110             final String collectionName, final Callback<Long> results,
111             final BatchedWrite write, final Client client,
112             final List<BatchedWrite.Bundle> bundles) {
113         super(results);
114 
115         myDatabaseName = databaseName;
116         myCollectionName = collectionName;
117         myWrite = write;
118         myClient = client;
119         myBundles = Collections
120                 .unmodifiableList(new ArrayList<BatchedWrite.Bundle>(bundles));
121 
122         myPendingBundles = new LinkedList<BatchedWrite.Bundle>(myBundles);
123 
124         myFinished = 0;
125         myN = 0;
126 
127         myFailedOperations = new IdentityHashMap<WriteOperation, Throwable>();
128         mySkippedOperations = null;
129 
130         myRealCallbacks = Collections.emptyList();
131     }
132 
133     /**
134      * Creates a new BatchedWriteCallback.
135      * 
136      * @param databaseName
137      *            The name of the database.
138      * @param collectionName
139      *            The name of the collection.
140      * @param realCallbacks
141      *            The list of callbacks. One for each write.
142      * @param write
143      *            The original write.
144      * @param bundles
145      *            The bundled writes.
146      */
147     public BatchedWriteCallback(final String databaseName,
148             final String collectionName,
149             final List<Callback<Reply>> realCallbacks,
150             final BatchedWrite write, final List<Bundle> bundles) {
151         super(null);
152 
153         myDatabaseName = databaseName;
154         myCollectionName = collectionName;
155         myWrite = write;
156         myClient = null;
157         myBundles = Collections
158                 .unmodifiableList(new ArrayList<BatchedWrite.Bundle>(bundles));
159 
160         myPendingBundles = new LinkedList<BatchedWrite.Bundle>(myBundles);
161 
162         myFinished = 0;
163         myN = 0;
164 
165         myFailedOperations = new IdentityHashMap<WriteOperation, Throwable>();
166         mySkippedOperations = null;
167 
168         myRealCallbacks = new ArrayList<Callback<Reply>>(realCallbacks);
169 
170         int count = 0;
171         for (final Bundle b : myBundles) {
172             count += b.getWrites().size();
173         }
174         Assertions.assertThat(
175                 myRealCallbacks.size() == count,
176                 "There nust be an operation (" + count
177                         + ") in a bundle for each callback ("
178                         + myRealCallbacks.size() + ").");
179     }
180 
181     /**
182      * Sends the next set of operations to the server.
183      */
184     public void send() {
185 
186         List<BatchedWrite.Bundle> toSendBundles;
187         synchronized (this) {
188             List<BatchedWrite.Bundle> toSend = myPendingBundles;
189             if (BatchedWriteMode.SERIALIZE_AND_STOP.equals(myWrite.getMode())) {
190                 toSend = myPendingBundles.subList(0, 1);
191             }
192 
193             // Clear toSend before sending so the callbacks see the right
194             // state for the bundles.
195             toSendBundles = new ArrayList<BatchedWrite.Bundle>(toSend);
196             toSend.clear();
197         } // Release lock.
198 
199         // Release the lock before sending to avoid deadlock in processing
200         // replies.
201 
202         // Batches....
203         for (final BatchedWrite.Bundle bundle : toSendBundles) {
204             final Command commandMsg = new BatchedWriteCommand(myDatabaseName,
205                     myCollectionName, bundle);
206 
207             // Our documents may be bigger than normally allowed...
208             commandMsg.setAllowJumbo(true);
209 
210             if (myWrite.getDurability() == Durability.NONE) {
211                 // Fake reply.
212                 final Document doc = BuilderFactory.start().add("ok", 1)
213                         .add("n", -1).build();
214                 final Reply reply = new Reply(0, 0, 0,
215                         Collections.singletonList(doc), false, false, false,
216                         false);
217 
218                 myClient.send(commandMsg, NoOpCallback.NO_OP);
219                 publish(bundle, reply);
220             }
221             else {
222                 myClient.send(commandMsg, new BundleCallback(bundle));
223 
224             }
225         }
226 
227         if ((myWrite.getDurability() == Durability.NONE)
228                 && myPendingBundles.isEmpty() && (myForwardCallback != null)) {
229             myForwardCallback.callback(-1L);
230         }
231     }
232 
233     /**
234      * Sets the client to use to send the bundled writes.
235      * 
236      * @param client
237      *            The new client for the batch.
238      */
239     public void setClient(final Client client) {
240         myClient = client;
241     }
242 
243     /**
244      * Callback for a bundle of write operations sent via the write commands.
245      * 
246      * @param bundle
247      *            The bundle of write operations.
248      * @param result
249      *            The result of the write operations.
250      */
251     protected synchronized void callback(final Bundle bundle, final Reply result) {
252         final MongoDbException error = asError(result);
253         if (error != null) {
254             // Everything failed...
255             exception(bundle, error);
256         }
257         else {
258             myFinished += 1;
259             myN += convert(result).longValue();
260 
261             // Want to run both the durability and write failure so just | here.
262             final boolean failed = failedDurability(bundle, result)
263                     | failedWrites(bundle, result);
264 
265             publish(bundle, result);
266 
267             if (failed) {
268                 publishResults();
269             }
270             else if (!myPendingBundles.isEmpty()) {
271                 send();
272             }
273             else if (myFinished == myBundles.size()) {
274                 publishResults();
275             }
276         }
277     }
278 
279     /**
280      * Callback for a bundle of write operations sent via the write commands has
281      * failed.
282      * 
283      * @param bundle
284      *            The bundle of write operations.
285      * @param thrown
286      *            The error for the operations.
287      */
288     protected synchronized void exception(final Bundle bundle,
289             final Throwable thrown) {
290         myFinished += 1;
291         for (final WriteOperation operation : bundle.getWrites()) {
292             myFailedOperations.put(operation, thrown);
293         }
294 
295         // No need to check if we have to send. Would have already sent all of
296         // the operations if not SERIALIZE_AND_STOP.
297         if (myWrite.getMode() == BatchedWriteMode.SERIALIZE_AND_STOP) {
298             publishResults();
299         }
300         else if (myFinished == myBundles.size()) {
301             publishResults();
302         }
303     }
304 
305     /**
306      * Checks for a failure in the durability requirements (e.g., did not
307      * replicate to sufficient servers within the timeout) and updates the
308      * failed operations map if any are found.
309      * 
310      * @param bundle
311      *            The bundle for the reply.
312      * @param reply
313      *            The reply from the server.
314      * @return True if there are failed writes and we should not send any
315      *         additional requests.
316      */
317     private boolean failedDurability(final Bundle bundle, final Reply reply) {
318         final List<Document> results = reply.getResults();
319         if (results.size() == 1) {
320             final Document doc = results.get(0);
321             final DocumentElement error = doc.get(DocumentElement.class,
322                     "writeConcernError");
323             if (error != null) {
324                 final int code = toInt(error.get(NumericElement.class, "code"));
325                 final String errmsg = asString(error.get(Element.class,
326                         "errmsg"));
327                 final MongoDbException exception = asError(reply, 0, code,
328                         true, errmsg, null);
329                 for (final WriteOperation op : bundle.getWrites()) {
330                     myFailedOperations.put(op, exception);
331                 }
332             }
333         }
334 
335         return (myWrite.getMode() == BatchedWriteMode.SERIALIZE_AND_STOP)
336                 && !myFailedOperations.isEmpty();
337     }
338 
339     /**
340      * Checks for individual {@code writeErrors} and updates the failed
341      * operations map if any are found.
342      * 
343      * @param bundle
344      *            The bundle for the reply.
345      * @param reply
346      *            The reply from the server.
347      * @return True if there are failed writes and we should not send any
348      *         additional requests.
349      */
350     private boolean failedWrites(final Bundle bundle, final Reply reply) {
351         final List<Document> results = reply.getResults();
352         if (results.size() == 1) {
353             final Document doc = results.get(0);
354             final ArrayElement errors = doc.get(ArrayElement.class,
355                     "writeErrors");
356             if (errors != null) {
357                 final List<WriteOperation> operations = bundle.getWrites();
358                 for (final DocumentElement error : errors.find(
359                         DocumentElement.class, ".*")) {
360                     final int index = toInt(error.get(NumericElement.class,
361                             "index"));
362                     final int code = toInt(error.get(NumericElement.class,
363                             "code"));
364                     final String errmsg = asString(error.get(Element.class,
365                             "errmsg"));
366 
367                     if ((0 <= index) && (index < operations.size())) {
368                         final WriteOperation op = operations.get(index);
369 
370                         myFailedOperations.put(op,
371                                 asError(reply, 0, code, false, errmsg, null));
372 
373                         if (myWrite.getMode() == BatchedWriteMode.SERIALIZE_AND_STOP) {
374                             mySkippedOperations = new ArrayList<WriteOperation>();
375                             mySkippedOperations.addAll(operations.subList(
376                                     index + 1, operations.size()));
377                         }
378                     }
379                 }
380             }
381         }
382 
383         return (myWrite.getMode() == BatchedWriteMode.SERIALIZE_AND_STOP)
384                 && !myFailedOperations.isEmpty();
385     }
386 
387     /**
388      * Publishes the results for an individual bundle.
389      * 
390      * @param bundle
391      *            The bundle that we received the results for.
392      * @param reply
393      *            The received reply.
394      */
395     private void publish(final Bundle bundle, final Reply reply) {
396         if (myForwardCallback == null) {
397             // Publish to each callback.
398             int index = 0;
399             for (final Bundle b : myBundles) {
400                 final List<WriteOperation> writes = b.getWrites();
401                 final int count = writes.size();
402 
403                 // Bundles can compare logically the same but still be
404                 // different.
405                 if (b == bundle) {
406                     for (int i = 0; i < count; ++i) {
407                         // Replace the callback to avoid double calls.
408                         final Throwable t = myFailedOperations.get(writes
409                                 .get(i));
410                         final Callback<Reply> cb = myRealCallbacks.set(index
411                                 + i, NoOpCallback.NO_OP);
412                         if (cb != null) {
413                             if (t == null) {
414                                 // Worked
415                                 cb.callback(reply);
416                             }
417                             else {
418                                 cb.exception(t);
419                             }
420                         }
421                     }
422                     break; // for(Bundle)
423                 }
424 
425                 index += count;
426             }
427         }
428     }
429 
430     /**
431      * Publishes the final results.
432      */
433     private void publishResults() {
434         if (myFailedOperations.isEmpty()) {
435             if (myForwardCallback != null) {
436                 myForwardCallback.callback(Long.valueOf(myN));
437             }
438             // If there are no failures then all of the real call-backs have
439             // already been triggered.
440         }
441         else {
442             if (mySkippedOperations == null) {
443                 mySkippedOperations = new ArrayList<WriteOperation>();
444             }
445             for (final Bundle pending : myPendingBundles) {
446                 mySkippedOperations.addAll(pending.getWrites());
447             }
448 
449             if (myForwardCallback != null) {
450                 // If there is only 1 operation and it failed then just publish
451                 // that error.
452                 if ((myBundles.size() == 1)
453                         && (myBundles.get(0).getWrites().size() == 1)
454                         && (myFailedOperations.size() == 1)) {
455                     myForwardCallback.exception(myFailedOperations.values()
456                             .iterator().next());
457                 }
458                 else {
459                     myForwardCallback.exception(new BatchedWriteException(
460                             myWrite, myN, mySkippedOperations,
461                             myFailedOperations));
462                 }
463             }
464             else {
465                 // Publish to each callback.
466                 final List<WriteOperation> emptySkipped = Collections
467                         .emptyList();
468                 final Map<WriteOperation, Throwable> emptyError = Collections
469                         .emptyMap();
470 
471                 // For fast lookup and lookup by identity.
472                 final Set<WriteOperation> skipped = Collections
473                         .newSetFromMap(new IdentityHashMap<WriteOperation, Boolean>());
474                 skipped.addAll(mySkippedOperations);
475 
476                 final Document doc = BuilderFactory.start().add("ok", 1)
477                         .add("n", myN).build();
478                 final Reply reply = new Reply(0, 0, 0,
479                         Collections.singletonList(doc), false, false, false,
480                         false);
481 
482                 int index = 0;
483                 for (final Bundle b : myBundles) {
484                     for (final WriteOperation op : b.getWrites()) {
485                         final Callback<Reply> cb = myRealCallbacks.get(index);
486 
487                         if (cb != null) {
488                             // Did this write fail?
489                             final Throwable thrown = myFailedOperations.get(op);
490                             if (thrown != null) {
491                                 cb.exception(new BatchedWriteException(myWrite,
492                                         myN, emptySkipped, Collections
493                                                 .singletonMap(op, thrown)));
494                             }
495                             else if (skipped.contains(op)) {
496                                 // Skipped the write.
497                                 cb.exception(new BatchedWriteException(myWrite,
498                                         myN, Collections.singletonList(op),
499                                         emptyError));
500                             }
501                             else {
502                                 // Worked
503                                 cb.callback(reply);
504                             }
505                         }
506 
507                         // Next...
508                         index += 1;
509                     }
510                 }
511             }
512         }
513     }
514 
515     /**
516      * BundleCallback provides the callback for a single batched write.
517      * 
518      * @api.no This class is <b>NOT</b> part of the drivers API. This class may
519      *         be mutated in incompatible ways between any two releases of the
520      *         driver.
521      * @copyright 2014, Allanbank Consulting, Inc., All Rights Reserved
522      */
523     /* package */class BundleCallback implements ReplyCallback {
524 
525         /**
526          * The bundle of operations this callback is waiting for the reply from.
527          */
528         private final Bundle myBundle;
529 
530         /**
531          * Creates a new BatchedWriteBundleCallback.
532          * 
533          * @param bundle
534          *            The bundle of operations this callback is waiting for the
535          *            reply from.
536          */
537         public BundleCallback(final Bundle bundle) {
538             myBundle = bundle;
539         }
540 
541         /**
542          * {@inheritDoc}
543          * <p>
544          * Overridden to forward the results to the parent callback.
545          * </p>
546          */
547         @Override
548         public void callback(final Reply result) {
549             BatchedWriteCallback.this.callback(myBundle, result);
550         }
551 
552         /**
553          * {@inheritDoc}
554          * <p>
555          * Overridden to forward the error to the parent callback.
556          * </p>
557          */
558         @Override
559         public void exception(final Throwable thrown) {
560             BatchedWriteCallback.this.exception(myBundle, thrown);
561         }
562 
563         /**
564          * {@inheritDoc}
565          * <p>
566          * Overridden to return false.
567          * </p>
568          */
569         @Override
570         public boolean isLightWeight() {
571             return false;
572         }
573     }
574 }