Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
Query |
|
| 2.869565217391304;2.87 |
1 | /* | |
2 | * #%L | |
3 | * Query.java - mongodb-async-driver - Allanbank Consulting, Inc. | |
4 | * %% | |
5 | * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc. | |
6 | * %% | |
7 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
8 | * you may not use this file except in compliance with the License. | |
9 | * You may obtain a copy of the License at | |
10 | * | |
11 | * http://www.apache.org/licenses/LICENSE-2.0 | |
12 | * | |
13 | * Unless required by applicable law or agreed to in writing, software | |
14 | * distributed under the License is distributed on an "AS IS" BASIS, | |
15 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
16 | * See the License for the specific language governing permissions and | |
17 | * limitations under the License. | |
18 | * #L% | |
19 | */ | |
20 | package com.allanbank.mongodb.client.message; | |
21 | ||
22 | import java.io.IOException; | |
23 | ||
24 | import com.allanbank.mongodb.ReadPreference; | |
25 | import com.allanbank.mongodb.bson.Document; | |
26 | import com.allanbank.mongodb.bson.io.BsonInputStream; | |
27 | import com.allanbank.mongodb.bson.io.BsonOutputStream; | |
28 | import com.allanbank.mongodb.bson.io.BufferingBsonOutputStream; | |
29 | import com.allanbank.mongodb.bson.io.StringEncoder; | |
30 | import com.allanbank.mongodb.client.Message; | |
31 | import com.allanbank.mongodb.client.Operation; | |
32 | import com.allanbank.mongodb.error.DocumentToLargeException; | |
33 | ||
34 | /** | |
35 | * Message to <a href= | |
36 | * "http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol#MongoWireProtocol-OPQUERY" | |
37 | * >query</a> documents from the database matching a criteria. Also used to | |
38 | * issue commands to the database. | |
39 | * | |
40 | * <pre> | |
41 | * <code> | |
42 | * struct OP_QUERY { | |
43 | * MsgHeader header; // standard message header | |
44 | * int32 flags; // bit vector of query options. See below for details. | |
45 | * cstring fullCollectionName; // "dbname.collectionname" | |
46 | * int32 numberToSkip; // number of documents to skip | |
47 | * int32 numberToReturn; // number of documents to return | |
48 | * // in the first OP_REPLY batch | |
49 | * document query; // query object. | |
50 | * [ document returnFieldSelector; ] // Optional. Selector indicating the fields | |
51 | * // to return. | |
52 | * } | |
53 | * </code> | |
54 | * </pre> | |
55 | * | |
56 | * @api.no This class is <b>NOT</b> part of the drivers API. This class may be | |
57 | * mutated in incompatible ways between any two releases of the driver. | |
58 | * @copyright 2011-2013, Allanbank Consulting, Inc., All Rights Reserved | |
59 | */ | |
60 | public class Query extends AbstractMessage implements CursorableMessage { | |
61 | ||
62 | /** Flag bit for the await data. */ | |
63 | public static final int AWAIT_DATA_FLAG_BIT = 0x20; | |
64 | ||
65 | /** The default batch size for a MongoDB query. */ | |
66 | public static final int DEFAULT_BATCH_SIZE = 101; | |
67 | ||
68 | /** Flag bit for the exhaust results. */ | |
69 | public static final int EXHAUST_FLAG_BIT = 0x40; | |
70 | ||
71 | /** Flag bit for the no cursor timeout. */ | |
72 | public static final int NO_CURSOR_TIMEOUT_FLAG_BIT = 0x10; | |
73 | ||
74 | /** Flag bit for the OPLOG_REPLAY. */ | |
75 | public static final int OPLOG_REPLAY_FLAG_BIT = 0x08; | |
76 | ||
77 | /** Flag bit for the partial results. */ | |
78 | public static final int PARTIAL_FLAG_BIT = 0x80; | |
79 | ||
80 | /** Flag bit for the replica OK. */ | |
81 | public static final int REPLICA_OK_FLAG_BIT = 0x04; | |
82 | ||
83 | /** Flag bit for the tailable cursors. */ | |
84 | public static final int TAILABLE_CURSOR_FLAG_BIT = 0x02; | |
85 | ||
86 | /** | |
87 | * If true and if using a tailable cursor then the connection will block | |
88 | * waiting for more data. | |
89 | */ | |
90 | private final boolean myAwaitData; | |
91 | ||
92 | /** The number of documents to be returned in each batch. */ | |
93 | private final int myBatchSize; | |
94 | ||
95 | /** If true, all results should be returned in multiple results. */ | |
96 | private final boolean myExhaust; | |
97 | ||
98 | /** The maximum number of documents to be returned. */ | |
99 | private final int myLimit; | |
100 | ||
101 | /** | |
102 | * The size of the message. If negative then the size has not been computed. | |
103 | */ | |
104 | private int myMessageSize; | |
105 | ||
106 | /** If true, marks the cursor as not having a timeout. */ | |
107 | private final boolean myNoCursorTimeout; | |
108 | ||
109 | /** The number of documents to be returned in the first batch. */ | |
110 | private final int myNumberToReturn; | |
111 | ||
112 | /** | |
113 | * The number of documents to skip before starting to return documents. | |
114 | */ | |
115 | private final int myNumberToSkip; | |
116 | ||
117 | /** | |
118 | * If true, return the results found and suppress shard down errors. | |
119 | */ | |
120 | private final boolean myPartial; | |
121 | ||
122 | /** | |
123 | * The query document containing the expression to select documents from the | |
124 | * collection. | |
125 | */ | |
126 | private final Document myQuery; | |
127 | ||
128 | /** Optional document containing the fields to be returned. */ | |
129 | private final Document myReturnFields; | |
130 | ||
131 | /** | |
132 | * If true, then the cursor created should follow additional documents being | |
133 | * inserted. | |
134 | */ | |
135 | private final boolean myTailable; | |
136 | ||
137 | /** | |
138 | * Creates a new Query. | |
139 | * | |
140 | * @param header | |
141 | * The header for the query message. | |
142 | * @param in | |
143 | * The stream to read the kill_cursors message from. | |
144 | * @throws IOException | |
145 | * On a failure reading the kill_cursors message. | |
146 | */ | |
147 | public Query(final Header header, final BsonInputStream in) | |
148 | 682 | throws IOException { |
149 | 682 | final long position = in.getBytesRead(); |
150 | 682 | final long end = (position + header.getLength()) - Header.SIZE; |
151 | ||
152 | 682 | final int flags = in.readInt(); |
153 | 682 | init(in.readCString()); |
154 | 682 | myNumberToSkip = in.readInt(); |
155 | 682 | myNumberToReturn = in.readInt(); |
156 | 682 | myQuery = in.readDocument(); |
157 | 682 | if (in.getBytesRead() < end) { |
158 | 324 | myReturnFields = in.readDocument(); |
159 | } | |
160 | else { | |
161 | 358 | myReturnFields = null; |
162 | } | |
163 | 682 | myAwaitData = (flags & AWAIT_DATA_FLAG_BIT) == AWAIT_DATA_FLAG_BIT; |
164 | 682 | myExhaust = (flags & EXHAUST_FLAG_BIT) == EXHAUST_FLAG_BIT; |
165 | 682 | myNoCursorTimeout = (flags & NO_CURSOR_TIMEOUT_FLAG_BIT) == NO_CURSOR_TIMEOUT_FLAG_BIT; |
166 | 682 | myPartial = (flags & PARTIAL_FLAG_BIT) == PARTIAL_FLAG_BIT; |
167 | 682 | myTailable = (flags & TAILABLE_CURSOR_FLAG_BIT) == TAILABLE_CURSOR_FLAG_BIT; |
168 | ||
169 | 682 | myLimit = 0; |
170 | 682 | myBatchSize = 0; |
171 | 682 | myMessageSize = -1; |
172 | 682 | } |
173 | ||
174 | /** | |
175 | * Creates a new Query. | |
176 | * | |
177 | * @param databaseName | |
178 | * The name of the database. | |
179 | * @param collectionName | |
180 | * The name of the collection. | |
181 | * @param query | |
182 | * The query document containing the expression to select | |
183 | * documents from the collection. | |
184 | * @param returnFields | |
185 | * Optional document containing the fields to be returned. | |
186 | * @param batchSize | |
187 | * The number of documents to be returned in each batch. | |
188 | * @param limit | |
189 | * The limit on the number of documents to return. | |
190 | * @param numberToSkip | |
191 | * The number of documents to skip before starting to return | |
192 | * documents. | |
193 | * @param tailable | |
194 | * If true, then the cursor created should follow additional | |
195 | * documents being inserted. | |
196 | * @param readPreference | |
197 | * The preference for which servers to use to retrieve the | |
198 | * results. | |
199 | * @param noCursorTimeout | |
200 | * If true, marks the cursor as not having a timeout. | |
201 | * @param awaitData | |
202 | * If true and if using a tailable cursor then the connection | |
203 | * will block waiting for more data. | |
204 | * @param exhaust | |
205 | * If true, all results should be returned in multiple results. | |
206 | * @param partial | |
207 | * If true, return the results found and suppress shard down | |
208 | * errors. | |
209 | */ | |
210 | public Query(final String databaseName, final String collectionName, | |
211 | final Document query, final Document returnFields, | |
212 | final int batchSize, final int limit, final int numberToSkip, | |
213 | final boolean tailable, final ReadPreference readPreference, | |
214 | final boolean noCursorTimeout, final boolean awaitData, | |
215 | final boolean exhaust, final boolean partial) { | |
216 | 2330 | super(databaseName, collectionName, readPreference, QueryVersionVisitor |
217 | .version(query)); | |
218 | ||
219 | 2330 | myQuery = query; |
220 | 2330 | myReturnFields = returnFields; |
221 | 2330 | myLimit = limit; |
222 | 2330 | myBatchSize = batchSize; |
223 | 2330 | myNumberToSkip = numberToSkip; |
224 | 2330 | myTailable = tailable; |
225 | 2330 | myNoCursorTimeout = noCursorTimeout; |
226 | 2330 | myAwaitData = awaitData; |
227 | 2330 | myExhaust = exhaust; |
228 | 2330 | myPartial = partial; |
229 | 2330 | myMessageSize = -1; |
230 | ||
231 | 2330 | if (isBatchSizeSet()) { |
232 | 1411 | if (isLimitSet() && (myLimit <= myBatchSize)) { |
233 | 761 | myNumberToReturn = -myLimit; |
234 | } | |
235 | else { | |
236 | 650 | myNumberToReturn = myBatchSize; |
237 | } | |
238 | } | |
239 | 919 | else if (isLimitSet() && (myLimit <= DEFAULT_BATCH_SIZE)) { |
240 | 185 | myNumberToReturn = -myLimit; |
241 | } | |
242 | else { | |
243 | 734 | myNumberToReturn = 0; |
244 | } | |
245 | 2330 | } |
246 | ||
247 | /** | |
248 | * Determines if the passed object is of this same type as this object and | |
249 | * if so that its fields are equal. | |
250 | * | |
251 | * @param object | |
252 | * The object to compare to. | |
253 | * | |
254 | * @see java.lang.Object#equals(java.lang.Object) | |
255 | */ | |
256 | @Override | |
257 | public boolean equals(final Object object) { | |
258 | 265717 | boolean result = false; |
259 | 265717 | if (this == object) { |
260 | 744 | result = true; |
261 | } | |
262 | 264973 | else if ((object != null) && (getClass() == object.getClass())) { |
263 | 262801 | final Query other = (Query) object; |
264 | ||
265 | 262801 | result = super.equals(object) |
266 | && (myAwaitData == other.myAwaitData) | |
267 | && (myExhaust == other.myExhaust) | |
268 | && (myNoCursorTimeout == other.myNoCursorTimeout) | |
269 | && (myPartial == other.myPartial) | |
270 | && (myTailable == other.myTailable) | |
271 | && (myBatchSize == other.myBatchSize) | |
272 | && (myLimit == other.myLimit) | |
273 | && (myNumberToReturn == other.myNumberToReturn) | |
274 | && (myNumberToSkip == other.myNumberToSkip) | |
275 | && myQuery.equals(other.myQuery) | |
276 | && ((myReturnFields == other.myReturnFields) || ((myReturnFields != null) && myReturnFields | |
277 | .equals(other.myReturnFields))); | |
278 | } | |
279 | 265717 | return result; |
280 | } | |
281 | ||
282 | /** | |
283 | * Returns the number of documents to be returned in each batch of results. | |
284 | * | |
285 | * @return The number of documents to be returned in each batch of results. | |
286 | */ | |
287 | @Override | |
288 | public int getBatchSize() { | |
289 | 82 | return myBatchSize; |
290 | } | |
291 | ||
292 | /** | |
293 | * Returns the total number of documents to be returned. | |
294 | * | |
295 | * @return The total number of documents to be returned. | |
296 | */ | |
297 | @Override | |
298 | public int getLimit() { | |
299 | 77 | return myLimit; |
300 | } | |
301 | ||
302 | /** | |
303 | * Returns the number of documents to be returned. | |
304 | * | |
305 | * @return The number of documents to be returned. | |
306 | */ | |
307 | public int getNumberToReturn() { | |
308 | 6 | return myNumberToReturn; |
309 | } | |
310 | ||
311 | /** | |
312 | * Returns the number of documents to skip before starting to return | |
313 | * documents. | |
314 | * | |
315 | * @return The number of documents to skip before starting to return | |
316 | * documents. | |
317 | */ | |
318 | public int getNumberToSkip() { | |
319 | 7 | return myNumberToSkip; |
320 | } | |
321 | ||
322 | /** | |
323 | * {@inheritDoc} | |
324 | * <p> | |
325 | * Overridden to return the name of the operation: "QUERY". | |
326 | * </p> | |
327 | */ | |
328 | @Override | |
329 | public String getOperationName() { | |
330 | 1 | return Operation.QUERY.name(); |
331 | } | |
332 | ||
333 | /** | |
334 | * Returns the query document containing the expression to select documents | |
335 | * from the collection. | |
336 | * | |
337 | * @return The query document containing the expression to select documents | |
338 | * from the collection. | |
339 | */ | |
340 | public Document getQuery() { | |
341 | 32 | return myQuery; |
342 | } | |
343 | ||
344 | /** | |
345 | * Returns the optional document containing the fields to be returned. | |
346 | * Optional here means this method may return <code>null</code>. | |
347 | * | |
348 | * @return The optional document containing the fields to be returned. | |
349 | */ | |
350 | public Document getReturnFields() { | |
351 | 7 | return myReturnFields; |
352 | } | |
353 | ||
354 | /** | |
355 | * Computes a reasonable hash code. | |
356 | * | |
357 | * @return The hash code value. | |
358 | */ | |
359 | @Override | |
360 | public int hashCode() { | |
361 | 524901 | int result = 1; |
362 | 524901 | result = (31 * result) + super.hashCode(); |
363 | 524901 | result = (31 * result) + (myAwaitData ? 1 : 3); |
364 | 524901 | result = (31 * result) + (myExhaust ? 1 : 7); |
365 | 524901 | result = (31 * result) + (myNoCursorTimeout ? 1 : 11); |
366 | 524901 | result = (31 * result) + (myPartial ? 1 : 13); |
367 | 524901 | result = (31 * result) + (myTailable ? 1 : 19); |
368 | 524901 | result = (31 * result) + myBatchSize; |
369 | 524901 | result = (31 * result) + myLimit; |
370 | 524901 | result = (31 * result) + myNumberToReturn; |
371 | 524901 | result = (31 * result) + myNumberToSkip; |
372 | 524901 | result = (31 * result) + myQuery.hashCode(); |
373 | 524901 | result = (31 * result) |
374 | + (myReturnFields == null ? 1 : myReturnFields.hashCode()); | |
375 | 524901 | return result; |
376 | } | |
377 | ||
378 | /** | |
379 | * Returns true and if using a tailable cursor then the connection will | |
380 | * block waiting for more data. | |
381 | * | |
382 | * @return True and if using a tailable cursor then the connection will | |
383 | * block waiting for more data. | |
384 | */ | |
385 | public boolean isAwaitData() { | |
386 | 7 | return myAwaitData; |
387 | } | |
388 | ||
389 | /** | |
390 | * Returns true if the batch size is greater than zero. | |
391 | * | |
392 | * @return True if the batch size is greater than zero. | |
393 | */ | |
394 | public boolean isBatchSizeSet() { | |
395 | 2330 | return 0 < myBatchSize; |
396 | } | |
397 | ||
398 | /** | |
399 | * Returns true if all results should be returned in multiple results. | |
400 | * | |
401 | * @return True if all results should be returned in multiple results. | |
402 | */ | |
403 | public boolean isExhaust() { | |
404 | 7 | return myExhaust; |
405 | } | |
406 | ||
407 | /** | |
408 | * Returns true if the limit is greater than zero. | |
409 | * | |
410 | * @return True if the limit is greater than zero. | |
411 | */ | |
412 | public boolean isLimitSet() { | |
413 | 2330 | return 0 < myLimit; |
414 | } | |
415 | ||
416 | /** | |
417 | * Returns true if marking the cursor as not having a timeout. | |
418 | * | |
419 | * @return True if marking the cursor as not having a timeout. | |
420 | */ | |
421 | public boolean isNoCursorTimeout() { | |
422 | 7 | return myNoCursorTimeout; |
423 | } | |
424 | ||
425 | /** | |
426 | * Returns true if return the results found and suppress shard down errors. | |
427 | * | |
428 | * @return True if return the results found and suppress shard down errors.. | |
429 | */ | |
430 | public boolean isPartial() { | |
431 | 7 | return myPartial; |
432 | } | |
433 | ||
434 | /** | |
435 | * Returns true if the cursor created should follow additional documents | |
436 | * being inserted. | |
437 | * | |
438 | * @return True if the cursor created should follow additional documents | |
439 | * being inserted. | |
440 | */ | |
441 | public boolean isTailable() { | |
442 | 7 | return myTailable; |
443 | } | |
444 | ||
445 | /** | |
446 | * {@inheritDoc} | |
447 | * <p> | |
448 | * Overridden to return the size of the {@link Query}. | |
449 | * </p> | |
450 | */ | |
451 | @Override | |
452 | public int size() { | |
453 | ||
454 | 432 | int size = HEADER_SIZE + 14; // See below. |
455 | // size += 4; // flags; | |
456 | 432 | size += StringEncoder.utf8Size(myDatabaseName); |
457 | // size += 1; // StringEncoder.utf8Size("."); | |
458 | 432 | size += StringEncoder.utf8Size(myCollectionName); |
459 | // size += 1; // \0 on the CString. | |
460 | // size += 4; // numberToSkip | |
461 | // size += 4; // numberToReturn | |
462 | 432 | size += myQuery.size(); |
463 | 432 | if (myReturnFields != null) { |
464 | 324 | size += myReturnFields.size(); |
465 | } | |
466 | ||
467 | 432 | return size; |
468 | } | |
469 | ||
470 | /** | |
471 | * {@inheritDoc} | |
472 | * <p> | |
473 | * Overridden to ensure the inserted documents are not too large in | |
474 | * aggregate. | |
475 | * </p> | |
476 | */ | |
477 | @Override | |
478 | public void validateSize(final int maxDocumentSize) | |
479 | throws DocumentToLargeException { | |
480 | 62 | if (myMessageSize < 0) { |
481 | 56 | long size = 0; |
482 | 56 | if (myQuery != null) { |
483 | 55 | size += myQuery.size(); |
484 | } | |
485 | 56 | if (myReturnFields != null) { |
486 | 4 | size += myReturnFields.size(); |
487 | } | |
488 | ||
489 | 56 | myMessageSize = (int) size; |
490 | } | |
491 | ||
492 | 62 | if (maxDocumentSize < myMessageSize) { |
493 | 1 | throw new DocumentToLargeException(myMessageSize, maxDocumentSize, |
494 | myQuery); | |
495 | } | |
496 | 61 | } |
497 | ||
498 | /** | |
499 | * {@inheritDoc} | |
500 | * <p> | |
501 | * Overridden to write the query message. | |
502 | * </p> | |
503 | * | |
504 | * @see Message#write(int, BsonOutputStream) | |
505 | */ | |
506 | @Override | |
507 | public void write(final int messageId, final BsonOutputStream out) | |
508 | throws IOException { | |
509 | 433 | final int flags = computeFlags(); |
510 | ||
511 | 433 | int size = HEADER_SIZE; |
512 | 433 | size += 4; // flags; |
513 | 433 | size += out.sizeOfCString(myDatabaseName, ".", myCollectionName); |
514 | 433 | size += 4; // numberToSkip |
515 | 433 | size += 4; // numberToReturn |
516 | 433 | size += myQuery.size(); |
517 | 433 | if (myReturnFields != null) { |
518 | 324 | size += myReturnFields.size(); |
519 | } | |
520 | ||
521 | 433 | writeHeader(out, messageId, 0, Operation.QUERY, size); |
522 | 433 | out.writeInt(flags); |
523 | 433 | out.writeCString(myDatabaseName, ".", myCollectionName); |
524 | 433 | out.writeInt(myNumberToSkip); |
525 | 433 | out.writeInt(myNumberToReturn); |
526 | 433 | out.writeDocument(myQuery); |
527 | 433 | if (myReturnFields != null) { |
528 | 324 | out.writeDocument(myReturnFields); |
529 | } | |
530 | 433 | } |
531 | ||
532 | /** | |
533 | * {@inheritDoc} | |
534 | * <p> | |
535 | * Overridden to write the query message. | |
536 | * </p> | |
537 | * | |
538 | * @see Message#write(int, BsonOutputStream) | |
539 | */ | |
540 | @Override | |
541 | public void write(final int messageId, final BufferingBsonOutputStream out) | |
542 | throws IOException { | |
543 | 56 | final int flags = computeFlags(); |
544 | ||
545 | 56 | final long start = writeHeader(out, messageId, 0, Operation.QUERY); |
546 | 56 | out.writeInt(flags); |
547 | 56 | out.writeCString(myDatabaseName, ".", myCollectionName); |
548 | 56 | out.writeInt(myNumberToSkip); |
549 | 56 | out.writeInt(myNumberToReturn); |
550 | 56 | out.writeDocument(myQuery); |
551 | 56 | if (myReturnFields != null) { |
552 | 2 | out.writeDocument(myReturnFields); |
553 | } | |
554 | 56 | finishHeader(out, start); |
555 | ||
556 | 56 | out.flushBuffer(); |
557 | 56 | } |
558 | ||
559 | /** | |
560 | * Computes the message flags bit field. | |
561 | * | |
562 | * @return The message flags bit field. | |
563 | */ | |
564 | private int computeFlags() { | |
565 | 489 | int flags = 0; |
566 | 489 | if (myAwaitData) { |
567 | 218 | flags += AWAIT_DATA_FLAG_BIT; |
568 | } | |
569 | 489 | if (myExhaust) { |
570 | 206 | flags += EXHAUST_FLAG_BIT; |
571 | } | |
572 | 489 | if (myNoCursorTimeout) { |
573 | 196 | flags += NO_CURSOR_TIMEOUT_FLAG_BIT; |
574 | } | |
575 | 489 | if (myPartial) { |
576 | 191 | flags += PARTIAL_FLAG_BIT; |
577 | } | |
578 | 489 | if (getReadPreference().isSecondaryOk()) { |
579 | 211 | flags += REPLICA_OK_FLAG_BIT; |
580 | } | |
581 | 489 | if (myTailable) { |
582 | 213 | flags += TAILABLE_CURSOR_FLAG_BIT; |
583 | } | |
584 | 489 | return flags; |
585 | } | |
586 | } |