Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
BatchedWrite |
|
| 2.5641025641025643;2.564 | ||||
BatchedWrite$1 |
|
| 2.5641025641025643;2.564 | ||||
BatchedWrite$Builder |
|
| 2.5641025641025643;2.564 | ||||
BatchedWrite$Bundle |
|
| 2.5641025641025643;2.564 |
1 | /* | |
2 | * #%L | |
3 | * BatchedWrite.java - mongodb-async-driver - Allanbank Consulting, Inc. | |
4 | * %% | |
5 | * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc. | |
6 | * %% | |
7 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
8 | * you may not use this file except in compliance with the License. | |
9 | * You may obtain a copy of the License at | |
10 | * | |
11 | * http://www.apache.org/licenses/LICENSE-2.0 | |
12 | * | |
13 | * Unless required by applicable law or agreed to in writing, software | |
14 | * distributed under the License is distributed on an "AS IS" BASIS, | |
15 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
16 | * See the License for the specific language governing permissions and | |
17 | * limitations under the License. | |
18 | * #L% | |
19 | */ | |
20 | ||
21 | package com.allanbank.mongodb.builder; | |
22 | ||
23 | import java.io.Serializable; | |
24 | import java.util.ArrayList; | |
25 | import java.util.Collections; | |
26 | import java.util.LinkedHashMap; | |
27 | import java.util.LinkedList; | |
28 | import java.util.List; | |
29 | import java.util.Map; | |
30 | import java.util.SortedMap; | |
31 | import java.util.TreeMap; | |
32 | ||
33 | import com.allanbank.mongodb.BatchedAsyncMongoCollection; | |
34 | import com.allanbank.mongodb.Durability; | |
35 | import com.allanbank.mongodb.MongoCollection; | |
36 | import com.allanbank.mongodb.Version; | |
37 | import com.allanbank.mongodb.bson.Document; | |
38 | import com.allanbank.mongodb.bson.DocumentAssignable; | |
39 | import com.allanbank.mongodb.bson.Element; | |
40 | import com.allanbank.mongodb.bson.builder.ArrayBuilder; | |
41 | import com.allanbank.mongodb.bson.builder.BuilderFactory; | |
42 | import com.allanbank.mongodb.bson.builder.DocumentBuilder; | |
43 | import com.allanbank.mongodb.bson.impl.EmptyDocument; | |
44 | import com.allanbank.mongodb.builder.write.DeleteOperation; | |
45 | import com.allanbank.mongodb.builder.write.InsertOperation; | |
46 | import com.allanbank.mongodb.builder.write.UpdateOperation; | |
47 | import com.allanbank.mongodb.builder.write.WriteOperation; | |
48 | import com.allanbank.mongodb.builder.write.WriteOperationType; | |
49 | import com.allanbank.mongodb.error.DocumentToLargeException; | |
50 | ||
51 | /** | |
52 | * BatchedWrite provides a container for a group of write operations to be sent | |
53 | * to the server as one group. | |
54 | * <p> | |
55 | * The default mode ({@link BatchedWriteMode#SERIALIZE_AND_CONTINUE}) for this | |
56 | * class is to submit the operations to the server in the order that they are | |
57 | * added to the Builder and to apply as many of the writes as possible (commonly | |
58 | * referred to as continue-on-error). This has the effect of causing the fewest | |
59 | * surprises and optimizing the performance of the writes since the driver can | |
60 | * send multiple distinct writes to the server at once. | |
61 | * </p> | |
62 | * <p> | |
63 | * The {@link BatchedWriteMode#SERIALIZE_AND_STOP} mode also sends each write as | |
64 | * a separate request but instead of attempting all writes the driver will stop | |
65 | * sending requests once one of the writes fails. This also prevents the driver | |
66 | * from sending multiple write messages to the server which can degrade | |
67 | * performance. | |
68 | * </p> | |
69 | * <p> | |
70 | * The last mode, {@link BatchedWriteMode#REORDERED}, may re-order writes to | |
71 | * maximize performance. Similar to the | |
72 | * {@link BatchedWriteMode#SERIALIZE_AND_CONTINUE} this mode will also attempt | |
73 | * all writes. The reordering of writes is across all {@link WriteOperationType} | |
74 | * s. | |
75 | * </p> | |
76 | * <p> | |
77 | * If using a MongoDB server after {@link #REQUIRED_VERSION 2.5.5} a batched | |
78 | * write will result in use of the new write commands. | |
79 | * </p> | |
80 | * <p> | |
81 | * For a more generalized batched write and query capability see the | |
82 | * {@link BatchedAsyncMongoCollection} and {@link MongoCollection#startBatch()}. | |
83 | * </p> | |
84 | * | |
85 | * @api.yes This class is part of the driver's API. Public and protected members | |
86 | * will be deprecated for at least 1 non-bugfix release (version | |
87 | * numbers are <major>.<minor>.<bugfix>) before being | |
88 | * removed or modified. | |
89 | * @copyright 2014, Allanbank Consulting, Inc., All Rights Reserved | |
90 | */ | |
91 | public class BatchedWrite implements Serializable { | |
92 | ||
93 | /** The first version of MongoDB to support the {@code aggregate} command. */ | |
94 | 1 | public static final Version REQUIRED_VERSION = Version.parse("2.5.5"); |
95 | ||
96 | /** Serialization version for the class. */ | |
97 | private static final long serialVersionUID = 6984498574755719178L; | |
98 | ||
99 | /** | |
100 | * Creates a new builder for a {@link BatchedWrite}. | |
101 | * | |
102 | * @return The builder to construct a {@link BatchedWrite}. | |
103 | */ | |
104 | public static Builder builder() { | |
105 | 65 | return new Builder(); |
106 | } | |
107 | ||
108 | /** | |
109 | * Create a batched write with a single delete operation. Users can just use | |
110 | * the {@link MongoCollection#delete} variants and the driver will convert | |
111 | * the deletes to batched writes as appropriate. | |
112 | * <p> | |
113 | * This method avoids the construction of a builder. | |
114 | * </p> | |
115 | * | |
116 | * @param query | |
117 | * The query to find the documents to delete. | |
118 | * @param singleDelete | |
119 | * If true then only a single document will be deleted. If | |
120 | * running in a sharded environment then this field must be false | |
121 | * or the query must contain the shard key. | |
122 | * @param durability | |
123 | * The durability of the delete. | |
124 | * @return The BatchedWrite with the single delete. | |
125 | */ | |
126 | public static BatchedWrite delete(final DocumentAssignable query, | |
127 | final boolean singleDelete, final Durability durability) { | |
128 | 1 | final DeleteOperation op = new DeleteOperation(query, singleDelete); |
129 | 1 | return new BatchedWrite(op, BatchedWriteMode.SERIALIZE_AND_CONTINUE, |
130 | durability); | |
131 | } | |
132 | ||
133 | /** | |
134 | * Create a batched write with a single inserts operation. Users can just | |
135 | * use the {@link MongoCollection#insert} variants and the driver will | |
136 | * convert the inserts to batched writes as appropriate. | |
137 | * <p> | |
138 | * This method avoids the construction of a builder. | |
139 | * </p> | |
140 | * | |
141 | * @param continueOnError | |
142 | * If the insert should continue if one of the documents causes | |
143 | * an error. | |
144 | * @param durability | |
145 | * The durability for the insert. | |
146 | * @param documents | |
147 | * The documents to add to the collection. | |
148 | * @return The BatchedWrite with the inserts. | |
149 | */ | |
150 | public static BatchedWrite insert(final boolean continueOnError, | |
151 | final Durability durability, final DocumentAssignable... documents) { | |
152 | 1 | final List<WriteOperation> ops = new ArrayList<WriteOperation>( |
153 | documents.length); | |
154 | 2 | for (final DocumentAssignable doc : documents) { |
155 | 1 | ops.add(new InsertOperation(doc)); |
156 | } | |
157 | 1 | return new BatchedWrite(ops, |
158 | continueOnError ? BatchedWriteMode.SERIALIZE_AND_CONTINUE | |
159 | : BatchedWriteMode.SERIALIZE_AND_STOP, durability); | |
160 | } | |
161 | ||
162 | /** | |
163 | * Create a batched write with a single update operation. Users can just use | |
164 | * the {@link MongoCollection#update} variants and the driver will convert | |
165 | * the updates to batched writes as appropriate. | |
166 | * | |
167 | * @param query | |
168 | * The query for the update. | |
169 | * @param update | |
170 | * The update for the update. | |
171 | * @param multiUpdate | |
172 | * If true then the update will update multiple documents. | |
173 | * @param upsert | |
174 | * If no document is found then upsert the document. | |
175 | * @param durability | |
176 | * The durability of the update. | |
177 | * @return The BatchedWrite with the single update. | |
178 | */ | |
179 | public static BatchedWrite update(final DocumentAssignable query, | |
180 | final DocumentAssignable update, final boolean multiUpdate, | |
181 | final boolean upsert, final Durability durability) { | |
182 | 1 | final UpdateOperation op = new UpdateOperation(query, update, |
183 | multiUpdate, upsert); | |
184 | 1 | return new BatchedWrite(op, BatchedWriteMode.SERIALIZE_AND_CONTINUE, |
185 | durability); | |
186 | } | |
187 | ||
188 | /** The durability for the writes. */ | |
189 | private final Durability myDurability; | |
190 | ||
191 | /** The mode for submitting the writes to the server. */ | |
192 | private final BatchedWriteMode myMode; | |
193 | ||
194 | /** The writes to submit to the server. */ | |
195 | private final List<WriteOperation> myWrites; | |
196 | ||
197 | /** | |
198 | * Creates a new BatchedWrite. | |
199 | * | |
200 | * @param builder | |
201 | * The builder for the writes. | |
202 | */ | |
203 | 72 | protected BatchedWrite(final Builder builder) { |
204 | 72 | myWrites = Collections.unmodifiableList(new ArrayList<WriteOperation>( |
205 | builder.myWrites)); | |
206 | 72 | myMode = builder.myMode; |
207 | 72 | myDurability = builder.myDurability; |
208 | 72 | } |
209 | ||
210 | /** | |
211 | * Creates a new BatchedWrite. | |
212 | * | |
213 | * @param ops | |
214 | * The operations for the batch. | |
215 | * @param mode | |
216 | * The mode for the batch. | |
217 | * @param durability | |
218 | * The durability for the batch. | |
219 | */ | |
220 | private BatchedWrite(final List<WriteOperation> ops, | |
221 | 3 | final BatchedWriteMode mode, final Durability durability) { |
222 | 3 | myWrites = Collections.unmodifiableList(ops); |
223 | 3 | myMode = mode; |
224 | 3 | myDurability = durability; |
225 | 3 | } |
226 | ||
227 | /** | |
228 | * Creates a new BatchedWrite. | |
229 | * | |
230 | * @param op | |
231 | * The single operation for the batch. | |
232 | * @param mode | |
233 | * The mode for the batch. | |
234 | * @param durability | |
235 | * The durability for the batch. | |
236 | */ | |
237 | private BatchedWrite(final WriteOperation op, final BatchedWriteMode mode, | |
238 | final Durability durability) { | |
239 | 2 | this(Collections.singletonList(op), mode, durability); |
240 | 2 | } |
241 | ||
242 | /** | |
243 | * Returns the durability for the writes. | |
244 | * | |
245 | * @return The durability for the writes. | |
246 | */ | |
247 | public Durability getDurability() { | |
248 | 1042 | return myDurability; |
249 | } | |
250 | ||
251 | /** | |
252 | * Returns the mode for submitting the writes to the server. | |
253 | * | |
254 | * @return The mode for submitting the writes to the server. | |
255 | */ | |
256 | public BatchedWriteMode getMode() { | |
257 | 238 | return myMode; |
258 | } | |
259 | ||
260 | /** | |
261 | * Returns the writes to submit to the server. | |
262 | * | |
263 | * @return The writes to submit to the server. | |
264 | */ | |
265 | public List<WriteOperation> getWrites() { | |
266 | 79 | return myWrites; |
267 | } | |
268 | ||
269 | /** | |
270 | * Creates write commands for all of the insert, updates and deletes. The | |
271 | * number and order of the writes is based on the {@link #getMode() mode}. | |
272 | * | |
273 | * @param collectionName | |
274 | * The name of the collection the documents will be inserted | |
275 | * into. | |
276 | * @param maxCommandSize | |
277 | * The maximum document size. | |
278 | * @param maxOperationsPerBundle | |
279 | * The maximum number of writes to include in each bundle. | |
280 | * @return The list of command documents to be sent. | |
281 | */ | |
282 | public List<Bundle> toBundles(final String collectionName, | |
283 | final long maxCommandSize, final int maxOperationsPerBundle) { | |
284 | 1 | switch (getMode()) { |
285 | case REORDERED: { | |
286 | 4 | return createOptimized(collectionName, maxCommandSize, |
287 | maxOperationsPerBundle); | |
288 | } | |
289 | case SERIALIZE_AND_CONTINUE: { | |
290 | 42 | return createSerialized(collectionName, maxCommandSize, |
291 | maxOperationsPerBundle, false); | |
292 | } | |
293 | default: { | |
294 | 9 | return createSerialized(collectionName, maxCommandSize, |
295 | maxOperationsPerBundle, true); | |
296 | } | |
297 | } | |
298 | } | |
299 | ||
300 | /** | |
301 | * Adds the document to the array of documents. | |
302 | * | |
303 | * @param array | |
304 | * The array to add the operation to. | |
305 | * @param operation | |
306 | * The operation to add. | |
307 | */ | |
308 | private void add(final ArrayBuilder array, final WriteOperation operation) { | |
309 | 1 | switch (operation.getType()) { |
310 | case INSERT: { | |
311 | 200042 | final InsertOperation insertOperation = (InsertOperation) operation; |
312 | ||
313 | 200042 | array.add(insertOperation.getDocument()); |
314 | 200042 | break; |
315 | } | |
316 | case UPDATE: { | |
317 | 400070 | final UpdateOperation updateOperation = (UpdateOperation) operation; |
318 | 400070 | final DocumentBuilder update = array.push(); |
319 | ||
320 | 400070 | update.add("q", updateOperation.getQuery()); |
321 | 400070 | update.add("u", updateOperation.getUpdate()); |
322 | 400070 | if (updateOperation.isUpsert()) { |
323 | 100018 | update.add("upsert", true); |
324 | } | |
325 | 400070 | if (updateOperation.isMultiUpdate()) { |
326 | 100010 | update.add("multi", true); |
327 | } | |
328 | break; | |
329 | } | |
330 | case DELETE: { | |
331 | 300051 | final DeleteOperation deleteOperation = (DeleteOperation) operation; |
332 | 300051 | array.push().add("q", deleteOperation.getQuery()) |
333 | .add("limit", deleteOperation.isSingleDelete() ? 1 : 0); | |
334 | 300051 | break; |
335 | } | |
336 | } | |
337 | 900163 | } |
338 | ||
339 | /** | |
340 | * Adds the durability ('writeConcern') to the command document. | |
341 | * | |
342 | * @param command | |
343 | * The command document to add the durability to. | |
344 | * @param durability | |
345 | * The durability to add. May be <code>null</code>. | |
346 | */ | |
347 | private void addDurability(final DocumentBuilder command, | |
348 | final Durability durability) { | |
349 | 919 | if (durability != null) { |
350 | 27 | final DocumentBuilder durabilityDoc = command.push("writeConcern"); |
351 | 27 | if (durability.equals(Durability.NONE)) { |
352 | 6 | durabilityDoc.add("w", 0); |
353 | } | |
354 | 21 | else if (durability.equals(Durability.ACK)) { |
355 | 18 | durabilityDoc.add("w", 1); |
356 | } | |
357 | else { | |
358 | 3 | boolean first = true; |
359 | 3 | for (final Element part : durability.asDocument()) { |
360 | 9 | if (first) { |
361 | // The first element is "getlasterror". | |
362 | 3 | first = false; |
363 | } | |
364 | else { | |
365 | 6 | durabilityDoc.add(part); |
366 | } | |
367 | 9 | } |
368 | } | |
369 | } | |
370 | 919 | } |
371 | ||
372 | /** | |
373 | * Creates a {@link DocumentToLargeException} for the operation. | |
374 | * | |
375 | * @param operation | |
376 | * The large operation. | |
377 | * @param size | |
378 | * The size of the operation. | |
379 | * @param maxCommandSize | |
380 | * The maximum size of the operation. | |
381 | * @return The created exception. | |
382 | */ | |
383 | private DocumentToLargeException createDocumentToLargeException( | |
384 | final WriteOperation operation, final int size, | |
385 | final int maxCommandSize) { | |
386 | ||
387 | 4 | Document doc = EmptyDocument.INSTANCE; |
388 | ||
389 | 4 | switch (operation.getType()) { |
390 | case INSERT: { | |
391 | 2 | final InsertOperation insertOperation = (InsertOperation) operation; |
392 | 2 | doc = insertOperation.getDocument(); |
393 | 2 | break; |
394 | } | |
395 | case UPDATE: { | |
396 | 1 | final UpdateOperation updateOperation = (UpdateOperation) operation; |
397 | 1 | doc = updateOperation.getQuery(); |
398 | 1 | final Document update = updateOperation.getUpdate(); |
399 | 1 | if (doc.size() < update.size()) { |
400 | 0 | doc = update; |
401 | } | |
402 | break; | |
403 | } | |
404 | case DELETE: { | |
405 | 1 | final DeleteOperation deleteOperation = (DeleteOperation) operation; |
406 | 1 | doc = deleteOperation.getQuery(); |
407 | 1 | break; |
408 | } | |
409 | } | |
410 | ||
411 | 4 | return new DocumentToLargeException(size, maxCommandSize, doc); |
412 | } | |
413 | ||
414 | /** | |
415 | * Reorders the writes into as few write commands as possible. | |
416 | * <p> | |
417 | * <b>Note</b>: MongoDB gives a slightly larger document for the command (<a | |
418 | * href= | |
419 | * "https://github.com/mongodb/mongo/blob/master/src/mongo/bson/util/builder.h#L56" | |
420 | * >16K</a>). This is for the command overhead. We don't explicitly use the | |
421 | * overhead but we may end up implicitly using it in the case of a operation | |
422 | * that is just at or below maxCommandSize. For those cases we start the | |
423 | * 'head' map below with the full map. That allows the big operations to be | |
424 | * added to command documents of there own once the command overhead has | |
425 | * been factored in. | |
426 | * </p> | |
427 | * | |
428 | * @param collectionName | |
429 | * The name of the collection the documents will be inserted | |
430 | * into. | |
431 | * @param maxCommandSize | |
432 | * The maximum document size. | |
433 | * @param maxOperationsPerBundle | |
434 | * The maximum number of writes to include in each bundle. | |
435 | * @return The list of command documents to be sent. | |
436 | */ | |
437 | private List<Bundle> createOptimized(final String collectionName, | |
438 | final long maxCommandSize, final int maxOperationsPerBundle) { | |
439 | // Bucket the operations and sort by size. | |
440 | Map<WriteOperationType, SortedMap<Long, List<WriteOperation>>> operationsBuckets; | |
441 | 4 | operationsBuckets = new LinkedHashMap<WriteOperationType, SortedMap<Long, List<WriteOperation>>>(); |
442 | 4 | for (final WriteOperation writeOp : getWrites()) { |
443 | 800019 | SortedMap<Long, List<WriteOperation>> operations = operationsBuckets |
444 | .get(writeOp.getType()); | |
445 | 800019 | if (operations == null) { |
446 | 10 | operations = new TreeMap<Long, List<WriteOperation>>(); |
447 | 10 | operationsBuckets.put(writeOp.getType(), operations); |
448 | } | |
449 | ||
450 | 800019 | final Long size = Long.valueOf(sizeOf(-1, writeOp)); |
451 | 800019 | List<WriteOperation> list = operations.get(size); |
452 | 800019 | if (list == null) { |
453 | 12 | list = new LinkedList<WriteOperation>(); |
454 | 12 | operations.put(size, list); |
455 | } | |
456 | 800019 | list.add(writeOp); |
457 | 800019 | } |
458 | ||
459 | // Check if any operation is too big. | |
460 | 4 | final Long maxMessageSize = Long.valueOf(maxCommandSize + 1); |
461 | 4 | for (final SortedMap<Long, List<WriteOperation>> operations : operationsBuckets |
462 | .values()) { | |
463 | 10 | if (!operations.tailMap(maxMessageSize).isEmpty()) { |
464 | 1 | final Long biggest = operations.lastKey(); |
465 | 1 | final List<WriteOperation> operation = operations.get(biggest); |
466 | 1 | throw createDocumentToLargeException(operation.get(0), |
467 | biggest.intValue(), (int) maxCommandSize); | |
468 | } | |
469 | 9 | } |
470 | ||
471 | // Now build commands packing the operations into a few messages as | |
472 | // possible. | |
473 | 3 | final List<Bundle> commands = new ArrayList<Bundle>(); |
474 | 3 | final List<WriteOperation> bundled = new ArrayList<WriteOperation>( |
475 | Math.min(maxOperationsPerBundle, myWrites.size())); | |
476 | 3 | final DocumentBuilder command = BuilderFactory.start(); |
477 | 3 | for (final Map.Entry<WriteOperationType, SortedMap<Long, List<WriteOperation>>> entry : operationsBuckets |
478 | .entrySet()) { | |
479 | 9 | final SortedMap<Long, List<WriteOperation>> operations = entry |
480 | .getValue(); | |
481 | 818 | while (!operations.isEmpty()) { |
482 | 809 | final ArrayBuilder docs = start(entry.getKey(), collectionName, |
483 | false, command); | |
484 | 809 | long remaining = maxCommandSize - command.build().size(); |
485 | ||
486 | 809 | SortedMap<Long, List<WriteOperation>> head = operations; |
487 | 809 | int index = 0; |
488 | 800827 | while (!head.isEmpty() |
489 | && (bundled.size() < maxOperationsPerBundle)) { | |
490 | 800018 | final Long biggest = head.lastKey(); |
491 | 800018 | final List<WriteOperation> bigOps = head.get(biggest); |
492 | 800018 | final WriteOperation operation = bigOps.remove(0); |
493 | 800018 | if (bigOps.isEmpty()) { |
494 | 11 | head.remove(biggest); |
495 | } | |
496 | ||
497 | 800018 | add(docs, operation); |
498 | 800018 | bundled.add(operation); |
499 | ||
500 | 800018 | remaining -= sizeOf(index, operation); |
501 | 800018 | index += 1; |
502 | 800018 | head = operations.headMap(Long.valueOf(remaining |
503 | - sizeOfIndex(index))); | |
504 | 800018 | } |
505 | ||
506 | 809 | commands.add(new Bundle(command.build(), bundled)); |
507 | 809 | bundled.clear(); |
508 | 809 | } |
509 | 9 | } |
510 | ||
511 | 3 | return commands; |
512 | } | |
513 | ||
514 | /** | |
515 | * Creates write commands for each sequence of insert, updates and deletes. | |
516 | * <p> | |
517 | * <b>Note</b>: MongoDB gives a slightly larger document for the command (<a | |
518 | * href= | |
519 | * "https://github.com/mongodb/mongo/blob/master/src/mongo/bson/util/builder.h#L56" | |
520 | * >16K</a>). This is for the command overhead. We don't explicitly use the | |
521 | * overhead but we may end up using it in the case of a operation that is | |
522 | * just at or below maxCommandSize. That is why we start the 'head' map | |
523 | * below with the full map. That allows those big operations to be added to | |
524 | * commands of there own once the command overhead has been factored in. | |
525 | * </p> | |
526 | * | |
527 | * @param collectionName | |
528 | * The name of the collection the documents will be inserted | |
529 | * into. | |
530 | * @param maxCommandSize | |
531 | * The maximum document size. | |
532 | * @param stopOnError | |
533 | * If true then the ordered flag is set to true. | |
534 | * @param maxOperationsPerBundle | |
535 | * The maximum number of writes to include in each bundle. | |
536 | * @return The list of command documents to be sent. | |
537 | */ | |
538 | private List<Bundle> createSerialized(final String collectionName, | |
539 | final long maxCommandSize, final int maxOperationsPerBundle, | |
540 | final boolean stopOnError) { | |
541 | 51 | final List<Bundle> commands = new ArrayList<Bundle>(); |
542 | 51 | final DocumentBuilder command = BuilderFactory.start(); |
543 | ||
544 | 51 | final List<WriteOperation> toSend = getWrites(); |
545 | 51 | final List<WriteOperation> bundled = new ArrayList<WriteOperation>( |
546 | Math.min(maxOperationsPerBundle, myWrites.size())); | |
547 | ||
548 | 51 | ArrayBuilder opsArray = null; |
549 | 51 | WriteOperationType lastType = null; |
550 | ||
551 | 51 | long remaining = maxCommandSize; |
552 | 51 | for (final WriteOperation writeOp : toSend) { |
553 | 100148 | long size = sizeOf(-1, writeOp); |
554 | 100148 | final long indexSize = sizeOfIndex(bundled.size()); |
555 | 100148 | if (maxCommandSize < size) { |
556 | 3 | throw createDocumentToLargeException(writeOp, (int) size, |
557 | (int) maxCommandSize); | |
558 | } | |
559 | 100145 | size += indexSize; // Add in the index overhead. |
560 | ||
561 | // Close a command if change type or too big. | |
562 | 100145 | if (!bundled.isEmpty() |
563 | && ((lastType != writeOp.getType()) | |
564 | || ((remaining - size) < 0) || (maxOperationsPerBundle <= bundled | |
565 | .size()))) { | |
566 | 67 | commands.add(new Bundle(command.build(), bundled)); |
567 | 67 | bundled.clear(); |
568 | } | |
569 | ||
570 | // Start a command? - Maybe after closing? | |
571 | 100145 | if (bundled.isEmpty()) { |
572 | 110 | opsArray = start(writeOp.getType(), collectionName, |
573 | stopOnError, command); | |
574 | 110 | lastType = writeOp.getType(); |
575 | 110 | remaining = (maxCommandSize - command.build().size()); |
576 | } | |
577 | ||
578 | // Add the operation. | |
579 | 100145 | add(opsArray, writeOp); |
580 | 100145 | bundled.add(writeOp); |
581 | ||
582 | // Remove the size of the operation from the remaining. | |
583 | 100145 | remaining -= size; |
584 | 100145 | } |
585 | ||
586 | 48 | if (!bundled.isEmpty()) { |
587 | 43 | commands.add(new Bundle(command.build(), bundled)); |
588 | } | |
589 | ||
590 | 48 | return commands; |
591 | } | |
592 | ||
593 | /** | |
594 | * Returns the size of the encoded operation. | |
595 | * <p> | |
596 | * For an {@code InsertOperation} this is the size of the document to | |
597 | * insert. | |
598 | * <p> | |
599 | * For an {@code UpdateOperation} this includes the space for: | |
600 | * <dl> | |
601 | * <dt>Document Overhead</dt> | |
602 | * <dd>type (1 byte), length (4 bytes), trailing null (1 byte).</dd> | |
603 | * <dt>'q' field</dt> | |
604 | * <dd>name (2 bytes), type (1 byte), value (document size)</dd> | |
605 | * <dt>'u' field</dt> | |
606 | * <dd>name (2 bytes), type (1 byte), value (document size)</dd> | |
607 | * <dt>'upsert' field</dt> | |
608 | * <dd>name (7 bytes), type (1 byte), value (1 byte)</dd> | |
609 | * <dt>'multi' field</dt> | |
610 | * <dd>name (6 bytes), type (1 byte), value (1 byte)</dd> | |
611 | * </dl> | |
612 | * </p> | |
613 | * <p> | |
614 | * For a {@code DeleteOperation} this includes the space for: | |
615 | * <dl> | |
616 | * <dt>Document Overhead</dt> | |
617 | * <dd>type (1 byte), length (4 bytes), trailing null (1 byte).</dd> | |
618 | * <dt>'q' field</dt> | |
619 | * <dd>name (2 bytes), type (1 byte), value (document size)</dd> | |
620 | * <dt>'limit' field</dt> | |
621 | * <dd>name (6 bytes), type (1 byte), value (4 bytes)</dd> | |
622 | * </dl> | |
623 | * | |
624 | * @param index | |
625 | * The index of the operation in the operations array. | |
626 | * @param operation | |
627 | * The operation to determine the size of. | |
628 | * @return The size of the operation. | |
629 | */ | |
630 | private long sizeOf(final int index, final WriteOperation operation) { | |
631 | 1700185 | long result = 0; |
632 | 1700185 | switch (operation.getType()) { |
633 | case INSERT: { | |
634 | 300046 | final InsertOperation insertOperation = (InsertOperation) operation; |
635 | 300046 | result = sizeOfIndex(index) + insertOperation.getDocument().size(); |
636 | 300046 | break; |
637 | } | |
638 | case UPDATE: { | |
639 | 800081 | final UpdateOperation updateOperation = (UpdateOperation) operation; |
640 | 800081 | result = sizeOfIndex(index) + updateOperation.getQuery().size() |
641 | + updateOperation.getUpdate().size() + 29; | |
642 | 800081 | break; |
643 | } | |
644 | case DELETE: { | |
645 | 600058 | final DeleteOperation deleteOperation = (DeleteOperation) operation; |
646 | 600058 | result = sizeOfIndex(index) + deleteOperation.getQuery().size() |
647 | + 20; | |
648 | 600058 | break; |
649 | } | |
650 | } | |
651 | ||
652 | 1700185 | return result; |
653 | } | |
654 | ||
655 | /** | |
656 | * Returns the number of bytes required to encode the index within the array | |
657 | * element. | |
658 | * | |
659 | * @param index | |
660 | * The index to return the size of. | |
661 | * @return The length of the encoded index. | |
662 | */ | |
663 | private long sizeOfIndex(final int index) { | |
664 | // For 2.6 the number of items in the array is capped at 1000. This | |
665 | // allows up to 99,999 without resorting to turning the value into | |
666 | // a string which seems like safe enough padding. | |
667 | 2600351 | if (index < 0) { |
668 | 900167 | return 0; // For estimating operation sizes. |
669 | } | |
670 | 1700184 | else if (index < 10) { |
671 | 15394 | return 3; // single character plus a null plus a type. |
672 | } | |
673 | 1684790 | else if (index < 100) { |
674 | 144090 | return 4; // two characters plus a null plus a type. |
675 | } | |
676 | 1540700 | else if (index < 1000) { |
677 | 1440900 | return 5; // three characters plus a null plus a type. |
678 | } | |
679 | 99800 | else if (index < 10000) { |
680 | 9800 | return 6; // four characters plus a null plus a type. |
681 | } | |
682 | ||
683 | 90000 | return Integer.toString(index).length() + 2; |
684 | } | |
685 | ||
686 | /** | |
687 | * Starts a new command document. | |
688 | * | |
689 | * @param operation | |
690 | * The operation to start. | |
691 | * @param collectionName | |
692 | * The collection to operate on. | |
693 | * @param stopOnError | |
694 | * If true then the operations should stop once an error is | |
695 | * encountered. Is mapped to the {@code ordered} field in the | |
696 | * command document. | |
697 | * @param command | |
698 | * The command builder. | |
699 | * @return The {@link ArrayBuilder} for the operations array. | |
700 | */ | |
701 | private ArrayBuilder start(final WriteOperationType operation, | |
702 | final String collectionName, final boolean stopOnError, | |
703 | final DocumentBuilder command) { | |
704 | ||
705 | 919 | String commandName = ""; |
706 | 919 | String arrayName = ""; |
707 | 919 | switch (operation) { |
708 | case INSERT: { | |
709 | 141 | commandName = "insert"; |
710 | 141 | arrayName = "documents"; |
711 | 141 | break; |
712 | } | |
713 | case UPDATE: { | |
714 | 442 | commandName = "update"; |
715 | 442 | arrayName = "updates"; |
716 | 442 | break; |
717 | } | |
718 | case DELETE: { | |
719 | 336 | commandName = "delete"; |
720 | 336 | arrayName = "deletes"; |
721 | break; | |
722 | } | |
723 | } | |
724 | ||
725 | 919 | command.reset(); |
726 | 919 | command.add(commandName, collectionName); |
727 | 919 | if (!stopOnError) { |
728 | 896 | command.add("ordered", stopOnError); |
729 | } | |
730 | 919 | addDurability(command, getDurability()); |
731 | ||
732 | 919 | return command.pushArray(arrayName); |
733 | } | |
734 | ||
735 | /** | |
736 | * Builder for creating {@link BatchedWrite}s. | |
737 | * | |
738 | * @api.yes This class is part of the driver's API. Public and protected | |
739 | * members will be deprecated for at least 1 non-bugfix release | |
740 | * (version numbers are <major>.<minor>.<bugfix>) | |
741 | * before being removed or modified. | |
742 | * @copyright 2012-2013, Allanbank Consulting, Inc., All Rights Reserved | |
743 | */ | |
744 | public static class Builder { | |
745 | ||
746 | /** The durability for the writes. */ | |
747 | protected Durability myDurability; | |
748 | ||
749 | /** The mode for submitting the writes to the server. */ | |
750 | protected BatchedWriteMode myMode; | |
751 | ||
752 | /** The writes to submit to the server. */ | |
753 | protected final List<WriteOperation> myWrites; | |
754 | ||
755 | /** | |
756 | * Creates a new Builder. | |
757 | */ | |
758 | 65 | public Builder() { |
759 | 65 | myWrites = new ArrayList<WriteOperation>(); |
760 | ||
761 | 65 | reset(); |
762 | 65 | } |
763 | ||
764 | /** | |
765 | * Constructs a new {@link BatchedWrite} object from the state of the | |
766 | * builder. | |
767 | * | |
768 | * @return The new {@link BatchedWrite} object. | |
769 | */ | |
770 | public BatchedWrite build() { | |
771 | 72 | return new BatchedWrite(this); |
772 | } | |
773 | ||
774 | /** | |
775 | * Update a document based on a query. | |
776 | * <p> | |
777 | * Defaults to deleting as many documents as match the query. | |
778 | * </p> | |
779 | * <p> | |
780 | * This method is delegates to | |
781 | * {@link #delete(DocumentAssignable, boolean) delete(query, false)} | |
782 | * </p> | |
783 | * | |
784 | * @param query | |
785 | * The query to find the document to delete. | |
786 | * @return This builder for chaining method calls. | |
787 | */ | |
788 | public Builder delete(final DocumentAssignable query) { | |
789 | 100039 | return delete(query, false); |
790 | } | |
791 | ||
792 | /** | |
793 | * Update a document based on a query. | |
794 | * <p> | |
795 | * Defaults to deleting as many documents as match the query. | |
796 | * </p> | |
797 | * | |
798 | * @param query | |
799 | * The query to find the document to delete. | |
800 | * @param singleDelete | |
801 | * If true then only a single document will be deleted. If | |
802 | * running in a sharded environment then this field must be | |
803 | * false or the query must contain the shard key. | |
804 | * @return This builder for chaining method calls. | |
805 | */ | |
806 | public Builder delete(final DocumentAssignable query, | |
807 | final boolean singleDelete) { | |
808 | 300063 | return write(new DeleteOperation(query, singleDelete)); |
809 | } | |
810 | ||
811 | /** | |
812 | * Sets the durability for the writes. | |
813 | * <p> | |
814 | * This method delegates to {@link #setDurability(Durability)}. | |
815 | * </p> | |
816 | * | |
817 | * @param durability | |
818 | * The new value for the durability for the writes. | |
819 | * @return This builder for chaining method calls. | |
820 | */ | |
821 | public Builder durability(final Durability durability) { | |
822 | 12 | return setDurability(durability); |
823 | } | |
824 | ||
825 | /** | |
826 | * Returns the durability for the write. | |
827 | * | |
828 | * @return This durability for the write. | |
829 | */ | |
830 | public Durability getDurability() { | |
831 | 19 | return myDurability; |
832 | } | |
833 | ||
834 | /** | |
835 | * Adds an insert operation to the batched write. | |
836 | * | |
837 | * @param document | |
838 | * The document to insert. | |
839 | * @return This builder for chaining method calls. | |
840 | */ | |
841 | public Builder insert(final DocumentAssignable document) { | |
842 | 200052 | return write(new InsertOperation(document)); |
843 | } | |
844 | ||
845 | /** | |
846 | * Sets the mode for submitting the writes to the server. | |
847 | * <p> | |
848 | * This method delegates to {@link #setMode(BatchedWriteMode)}. | |
849 | * </p> | |
850 | * | |
851 | * @param mode | |
852 | * The new value for the mode for submitting the writes to | |
853 | * the server. | |
854 | * @return This builder for chaining method calls. | |
855 | */ | |
856 | public Builder mode(final BatchedWriteMode mode) { | |
857 | 18 | return setMode(mode); |
858 | } | |
859 | ||
860 | /** | |
861 | * Resets the builder back to its initial state for reuse. | |
862 | * | |
863 | * @return This builder for chaining method calls. | |
864 | */ | |
865 | public Builder reset() { | |
866 | 93 | myWrites.clear(); |
867 | 93 | myMode = BatchedWriteMode.SERIALIZE_AND_CONTINUE; |
868 | 93 | myDurability = null; |
869 | ||
870 | 93 | return this; |
871 | } | |
872 | ||
873 | /** | |
874 | * Saves the {@code document} to MongoDB. | |
875 | * <p> | |
876 | * If the {@code document} does not contain an {@code _id} field then | |
877 | * this method is equivalent to: {@link #insert(DocumentAssignable) | |
878 | * insert(document)}. | |
879 | * </p> | |
880 | * <p> | |
881 | * If the {@code document} does contain an {@code _id} field then this | |
882 | * method is equivalent to: | |
883 | * {@link #update(DocumentAssignable, DocumentAssignable) | |
884 | * updateAsync(BuilderFactory.start().add(document.get("_id")), | |
885 | * document, false, true)}. | |
886 | * </p> | |
887 | * | |
888 | * @param document | |
889 | * The document to save. | |
890 | * @return This builder for chaining method calls. | |
891 | */ | |
892 | public Builder save(final DocumentAssignable document) { | |
893 | 18 | final Document doc = document.asDocument(); |
894 | 18 | final Element id = doc.get("_id"); |
895 | 18 | if (id == null) { |
896 | 9 | return insert(doc); |
897 | } | |
898 | 9 | return update(BuilderFactory.start().add(id), doc, false, true); |
899 | } | |
900 | ||
901 | /** | |
902 | * Sets the durability for the writes. | |
903 | * | |
904 | * @param durability | |
905 | * The new value for the durability for the writes. | |
906 | * @return This builder for chaining method calls. | |
907 | */ | |
908 | public Builder setDurability(final Durability durability) { | |
909 | 19 | myDurability = durability; |
910 | 19 | return this; |
911 | } | |
912 | ||
913 | /** | |
914 | * Sets the mode for submitting the writes to the server. | |
915 | * | |
916 | * @param mode | |
917 | * The new value for the mode for submitting the writes to | |
918 | * the server. | |
919 | * @return This builder for chaining method calls. | |
920 | */ | |
921 | public Builder setMode(final BatchedWriteMode mode) { | |
922 | 34 | myMode = mode; |
923 | 34 | return this; |
924 | } | |
925 | ||
926 | /** | |
927 | * Sets the writes to submit to the server. | |
928 | * | |
929 | * @param writes | |
930 | * The new value for the writes to submit to the server. | |
931 | * @return This builder for chaining method calls. | |
932 | */ | |
933 | public Builder setWrites(final List<WriteOperation> writes) { | |
934 | 2 | myWrites.clear(); |
935 | 2 | if (writes != null) { |
936 | 1 | myWrites.addAll(writes); |
937 | } | |
938 | 2 | return this; |
939 | } | |
940 | ||
941 | /** | |
942 | * Update a document based on a query. | |
943 | * <p> | |
944 | * Defaults to updating a single document and not performing an upsert | |
945 | * if no document is found. | |
946 | * </p> | |
947 | * <p> | |
948 | * This method is delegates to | |
949 | * {@link #update(DocumentAssignable, DocumentAssignable, boolean, boolean) | |
950 | * update(query, update, false, false)} | |
951 | * </p> | |
952 | * | |
953 | * @param query | |
954 | * The query to find the document to update. | |
955 | * @param update | |
956 | * The update operations to apply to the document. | |
957 | * @return This builder for chaining method calls. | |
958 | */ | |
959 | public Builder update(final DocumentAssignable query, | |
960 | final DocumentAssignable update) { | |
961 | 100040 | return update(query, update, false, false); |
962 | } | |
963 | ||
964 | /** | |
965 | * Update a document based on a query. | |
966 | * <p> | |
967 | * Defaults to updating a single document and not performing an upsert | |
968 | * if no document is found. | |
969 | * </p> | |
970 | * | |
971 | * @param query | |
972 | * The query to find the document to update. | |
973 | * @param update | |
974 | * The update operations to apply to the document. | |
975 | * @param multiUpdate | |
976 | * If true then the update is applied to all of the matching | |
977 | * documents, otherwise only the first document found is | |
978 | * updated. | |
979 | * @param upsert | |
980 | * If true then if no document is found then a new document | |
981 | * is created and updated, otherwise no operation is | |
982 | * performed. | |
983 | * @return This builder for chaining method calls. | |
984 | */ | |
985 | public Builder update(final DocumentAssignable query, | |
986 | final DocumentAssignable update, final boolean multiUpdate, | |
987 | final boolean upsert) { | |
988 | 400083 | return write(new UpdateOperation(query, update, multiUpdate, upsert)); |
989 | } | |
990 | ||
991 | /** | |
992 | * Adds a single write to the list of writes to send to the server. | |
993 | * | |
994 | * @param write | |
995 | * The write to add to the list of writes to send to the | |
996 | * server. | |
997 | * @return This builder for chaining method calls. | |
998 | */ | |
999 | public Builder write(final WriteOperation write) { | |
1000 | 900199 | myWrites.add(write); |
1001 | 900199 | return this; |
1002 | } | |
1003 | ||
1004 | /** | |
1005 | * Sets the writes to submit to the server. | |
1006 | * <p> | |
1007 | * This method delegates to {@link #setWrites(List)}. | |
1008 | * </p> | |
1009 | * | |
1010 | * @param writes | |
1011 | * The new value for the writes to submit to the server. | |
1012 | * @return This builder for chaining method calls. | |
1013 | */ | |
1014 | public Builder writes(final List<WriteOperation> writes) { | |
1015 | 2 | return setWrites(writes); |
1016 | } | |
1017 | } | |
1018 | ||
1019 | /** | |
1020 | * Bundle is a container for the write command and the | |
1021 | * {@link WriteOperation} it contains. | |
1022 | * | |
1023 | * @api.yes This class is part of the driver's API. Public and protected | |
1024 | * members will be deprecated for at least 1 non-bugfix release | |
1025 | * (version numbers are <major>.<minor>.<bugfix>) | |
1026 | * before being removed or modified. | |
1027 | */ | |
1028 | public static final class Bundle { | |
1029 | /** The command containing the bundled write operations. */ | |
1030 | private final Document myCommand; | |
1031 | ||
1032 | /** The writes that are bundled in the command. */ | |
1033 | private final List<WriteOperation> myWrites; | |
1034 | ||
1035 | /** | |
1036 | * Creates a new Bundle. | |
1037 | * | |
1038 | * @param command | |
1039 | * The command containing the bundled write operations. | |
1040 | * @param writes | |
1041 | * The writes that are bundled in the command. | |
1042 | */ | |
1043 | protected Bundle(final Document command, | |
1044 | final List<WriteOperation> writes) { | |
1045 | 919 | super(); |
1046 | 919 | myCommand = command; |
1047 | 919 | myWrites = Collections |
1048 | .unmodifiableList(new ArrayList<WriteOperation>(writes)); | |
1049 | 919 | } |
1050 | ||
1051 | /** | |
1052 | * Returns the command containing the bundled write operations. | |
1053 | * | |
1054 | * @return The command containing the bundled write operations. | |
1055 | */ | |
1056 | public Document getCommand() { | |
1057 | 113 | return myCommand; |
1058 | } | |
1059 | ||
1060 | /** | |
1061 | * Returns the writes that are bundled in the command. | |
1062 | * | |
1063 | * @return The writes that are bundled in the command. | |
1064 | */ | |
1065 | public List<WriteOperation> getWrites() { | |
1066 | 138 | return myWrites; |
1067 | } | |
1068 | } | |
1069 | } |