View Javadoc
1   /*
2    * #%L
3    * Query.java - mongodb-async-driver - Allanbank Consulting, Inc.
4    * %%
5    * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6    * %%
7    * Licensed under the Apache License, Version 2.0 (the "License");
8    * you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   * 
11   *      http://www.apache.org/licenses/LICENSE-2.0
12   * 
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   * #L%
19   */
20  package com.allanbank.mongodb.client.message;
21  
22  import java.io.IOException;
23  
24  import com.allanbank.mongodb.ReadPreference;
25  import com.allanbank.mongodb.bson.Document;
26  import com.allanbank.mongodb.bson.io.BsonInputStream;
27  import com.allanbank.mongodb.bson.io.BsonOutputStream;
28  import com.allanbank.mongodb.bson.io.BufferingBsonOutputStream;
29  import com.allanbank.mongodb.bson.io.StringEncoder;
30  import com.allanbank.mongodb.client.Message;
31  import com.allanbank.mongodb.client.Operation;
32  import com.allanbank.mongodb.error.DocumentToLargeException;
33  
34  /**
35   * Message to <a href=
36   * "http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol#MongoWireProtocol-OPQUERY"
37   * >query</a> documents from the database matching a criteria. Also used to
38   * issue commands to the database.
39   * 
40   * <pre>
41   * <code>
42   * struct OP_QUERY {
43   *     MsgHeader header;                // standard message header
44   *     int32     flags;                  // bit vector of query options.  See below for details.
45   *     cstring   fullCollectionName;    // "dbname.collectionname"
46   *     int32     numberToSkip;          // number of documents to skip
47   *     int32     numberToReturn;        // number of documents to return
48   *                                      //  in the first OP_REPLY batch
49   *     document  query;                 // query object.
50   *   [ document  returnFieldSelector; ] // Optional. Selector indicating the fields
51   *                                      //  to return.
52   * }
53   * </code>
54   * </pre>
55   * 
56   * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
57   *         mutated in incompatible ways between any two releases of the driver.
58   * @copyright 2011-2013, Allanbank Consulting, Inc., All Rights Reserved
59   */
60  public class Query extends AbstractMessage implements CursorableMessage {
61  
62      /** Flag bit for the await data. */
63      public static final int AWAIT_DATA_FLAG_BIT = 0x20;
64  
65      /** The default batch size for a MongoDB query. */
66      public static final int DEFAULT_BATCH_SIZE = 101;
67  
68      /** Flag bit for the exhaust results. */
69      public static final int EXHAUST_FLAG_BIT = 0x40;
70  
71      /** Flag bit for the no cursor timeout. */
72      public static final int NO_CURSOR_TIMEOUT_FLAG_BIT = 0x10;
73  
74      /** Flag bit for the OPLOG_REPLAY. */
75      public static final int OPLOG_REPLAY_FLAG_BIT = 0x08;
76  
77      /** Flag bit for the partial results. */
78      public static final int PARTIAL_FLAG_BIT = 0x80;
79  
80      /** Flag bit for the replica OK. */
81      public static final int REPLICA_OK_FLAG_BIT = 0x04;
82  
83      /** Flag bit for the tailable cursors. */
84      public static final int TAILABLE_CURSOR_FLAG_BIT = 0x02;
85  
86      /**
87       * If true and if using a tailable cursor then the connection will block
88       * waiting for more data.
89       */
90      private final boolean myAwaitData;
91  
92      /** The number of documents to be returned in each batch. */
93      private final int myBatchSize;
94  
95      /** If true, all results should be returned in multiple results. */
96      private final boolean myExhaust;
97  
98      /** The maximum number of documents to be returned. */
99      private final int myLimit;
100 
101     /**
102      * The size of the message. If negative then the size has not been computed.
103      */
104     private int myMessageSize;
105 
106     /** If true, marks the cursor as not having a timeout. */
107     private final boolean myNoCursorTimeout;
108 
109     /** The number of documents to be returned in the first batch. */
110     private final int myNumberToReturn;
111 
112     /**
113      * The number of documents to skip before starting to return documents.
114      */
115     private final int myNumberToSkip;
116 
117     /**
118      * If true, return the results found and suppress shard down errors.
119      */
120     private final boolean myPartial;
121 
122     /**
123      * The query document containing the expression to select documents from the
124      * collection.
125      */
126     private final Document myQuery;
127 
128     /** Optional document containing the fields to be returned. */
129     private final Document myReturnFields;
130 
131     /**
132      * If true, then the cursor created should follow additional documents being
133      * inserted.
134      */
135     private final boolean myTailable;
136 
137     /**
138      * Creates a new Query.
139      * 
140      * @param header
141      *            The header for the query message.
142      * @param in
143      *            The stream to read the kill_cursors message from.
144      * @throws IOException
145      *             On a failure reading the kill_cursors message.
146      */
147     public Query(final Header header, final BsonInputStream in)
148             throws IOException {
149         final long position = in.getBytesRead();
150         final long end = (position + header.getLength()) - Header.SIZE;
151 
152         final int flags = in.readInt();
153         init(in.readCString());
154         myNumberToSkip = in.readInt();
155         myNumberToReturn = in.readInt();
156         myQuery = in.readDocument();
157         if (in.getBytesRead() < end) {
158             myReturnFields = in.readDocument();
159         }
160         else {
161             myReturnFields = null;
162         }
163         myAwaitData = (flags & AWAIT_DATA_FLAG_BIT) == AWAIT_DATA_FLAG_BIT;
164         myExhaust = (flags & EXHAUST_FLAG_BIT) == EXHAUST_FLAG_BIT;
165         myNoCursorTimeout = (flags & NO_CURSOR_TIMEOUT_FLAG_BIT) == NO_CURSOR_TIMEOUT_FLAG_BIT;
166         myPartial = (flags & PARTIAL_FLAG_BIT) == PARTIAL_FLAG_BIT;
167         myTailable = (flags & TAILABLE_CURSOR_FLAG_BIT) == TAILABLE_CURSOR_FLAG_BIT;
168 
169         myLimit = 0;
170         myBatchSize = 0;
171         myMessageSize = -1;
172     }
173 
174     /**
175      * Creates a new Query.
176      * 
177      * @param databaseName
178      *            The name of the database.
179      * @param collectionName
180      *            The name of the collection.
181      * @param query
182      *            The query document containing the expression to select
183      *            documents from the collection.
184      * @param returnFields
185      *            Optional document containing the fields to be returned.
186      * @param batchSize
187      *            The number of documents to be returned in each batch.
188      * @param limit
189      *            The limit on the number of documents to return.
190      * @param numberToSkip
191      *            The number of documents to skip before starting to return
192      *            documents.
193      * @param tailable
194      *            If true, then the cursor created should follow additional
195      *            documents being inserted.
196      * @param readPreference
197      *            The preference for which servers to use to retrieve the
198      *            results.
199      * @param noCursorTimeout
200      *            If true, marks the cursor as not having a timeout.
201      * @param awaitData
202      *            If true and if using a tailable cursor then the connection
203      *            will block waiting for more data.
204      * @param exhaust
205      *            If true, all results should be returned in multiple results.
206      * @param partial
207      *            If true, return the results found and suppress shard down
208      *            errors.
209      */
210     public Query(final String databaseName, final String collectionName,
211             final Document query, final Document returnFields,
212             final int batchSize, final int limit, final int numberToSkip,
213             final boolean tailable, final ReadPreference readPreference,
214             final boolean noCursorTimeout, final boolean awaitData,
215             final boolean exhaust, final boolean partial) {
216         super(databaseName, collectionName, readPreference, QueryVersionVisitor
217                 .version(query));
218 
219         myQuery = query;
220         myReturnFields = returnFields;
221         myLimit = limit;
222         myBatchSize = batchSize;
223         myNumberToSkip = numberToSkip;
224         myTailable = tailable;
225         myNoCursorTimeout = noCursorTimeout;
226         myAwaitData = awaitData;
227         myExhaust = exhaust;
228         myPartial = partial;
229         myMessageSize = -1;
230 
231         if (isBatchSizeSet()) {
232             if (isLimitSet() && (myLimit <= myBatchSize)) {
233                 myNumberToReturn = -myLimit;
234             }
235             else {
236                 myNumberToReturn = myBatchSize;
237             }
238         }
239         else if (isLimitSet() && (myLimit <= DEFAULT_BATCH_SIZE)) {
240             myNumberToReturn = -myLimit;
241         }
242         else {
243             myNumberToReturn = 0;
244         }
245     }
246 
247     /**
248      * Determines if the passed object is of this same type as this object and
249      * if so that its fields are equal.
250      * 
251      * @param object
252      *            The object to compare to.
253      * 
254      * @see java.lang.Object#equals(java.lang.Object)
255      */
256     @Override
257     public boolean equals(final Object object) {
258         boolean result = false;
259         if (this == object) {
260             result = true;
261         }
262         else if ((object != null) && (getClass() == object.getClass())) {
263             final Query other = (Query) object;
264 
265             result = super.equals(object)
266                     && (myAwaitData == other.myAwaitData)
267                     && (myExhaust == other.myExhaust)
268                     && (myNoCursorTimeout == other.myNoCursorTimeout)
269                     && (myPartial == other.myPartial)
270                     && (myTailable == other.myTailable)
271                     && (myBatchSize == other.myBatchSize)
272                     && (myLimit == other.myLimit)
273                     && (myNumberToReturn == other.myNumberToReturn)
274                     && (myNumberToSkip == other.myNumberToSkip)
275                     && myQuery.equals(other.myQuery)
276                     && ((myReturnFields == other.myReturnFields) || ((myReturnFields != null) && myReturnFields
277                             .equals(other.myReturnFields)));
278         }
279         return result;
280     }
281 
282     /**
283      * Returns the number of documents to be returned in each batch of results.
284      * 
285      * @return The number of documents to be returned in each batch of results.
286      */
287     @Override
288     public int getBatchSize() {
289         return myBatchSize;
290     }
291 
292     /**
293      * Returns the total number of documents to be returned.
294      * 
295      * @return The total number of documents to be returned.
296      */
297     @Override
298     public int getLimit() {
299         return myLimit;
300     }
301 
302     /**
303      * Returns the number of documents to be returned.
304      * 
305      * @return The number of documents to be returned.
306      */
307     public int getNumberToReturn() {
308         return myNumberToReturn;
309     }
310 
311     /**
312      * Returns the number of documents to skip before starting to return
313      * documents.
314      * 
315      * @return The number of documents to skip before starting to return
316      *         documents.
317      */
318     public int getNumberToSkip() {
319         return myNumberToSkip;
320     }
321 
322     /**
323      * {@inheritDoc}
324      * <p>
325      * Overridden to return the name of the operation: "QUERY".
326      * </p>
327      */
328     @Override
329     public String getOperationName() {
330         return Operation.QUERY.name();
331     }
332 
333     /**
334      * Returns the query document containing the expression to select documents
335      * from the collection.
336      * 
337      * @return The query document containing the expression to select documents
338      *         from the collection.
339      */
340     public Document getQuery() {
341         return myQuery;
342     }
343 
344     /**
345      * Returns the optional document containing the fields to be returned.
346      * Optional here means this method may return <code>null</code>.
347      * 
348      * @return The optional document containing the fields to be returned.
349      */
350     public Document getReturnFields() {
351         return myReturnFields;
352     }
353 
354     /**
355      * Computes a reasonable hash code.
356      * 
357      * @return The hash code value.
358      */
359     @Override
360     public int hashCode() {
361         int result = 1;
362         result = (31 * result) + super.hashCode();
363         result = (31 * result) + (myAwaitData ? 1 : 3);
364         result = (31 * result) + (myExhaust ? 1 : 7);
365         result = (31 * result) + (myNoCursorTimeout ? 1 : 11);
366         result = (31 * result) + (myPartial ? 1 : 13);
367         result = (31 * result) + (myTailable ? 1 : 19);
368         result = (31 * result) + myBatchSize;
369         result = (31 * result) + myLimit;
370         result = (31 * result) + myNumberToReturn;
371         result = (31 * result) + myNumberToSkip;
372         result = (31 * result) + myQuery.hashCode();
373         result = (31 * result)
374                 + (myReturnFields == null ? 1 : myReturnFields.hashCode());
375         return result;
376     }
377 
378     /**
379      * Returns true and if using a tailable cursor then the connection will
380      * block waiting for more data.
381      * 
382      * @return True and if using a tailable cursor then the connection will
383      *         block waiting for more data.
384      */
385     public boolean isAwaitData() {
386         return myAwaitData;
387     }
388 
389     /**
390      * Returns true if the batch size is greater than zero.
391      * 
392      * @return True if the batch size is greater than zero.
393      */
394     public boolean isBatchSizeSet() {
395         return 0 < myBatchSize;
396     }
397 
398     /**
399      * Returns true if all results should be returned in multiple results.
400      * 
401      * @return True if all results should be returned in multiple results.
402      */
403     public boolean isExhaust() {
404         return myExhaust;
405     }
406 
407     /**
408      * Returns true if the limit is greater than zero.
409      * 
410      * @return True if the limit is greater than zero.
411      */
412     public boolean isLimitSet() {
413         return 0 < myLimit;
414     }
415 
416     /**
417      * Returns true if marking the cursor as not having a timeout.
418      * 
419      * @return True if marking the cursor as not having a timeout.
420      */
421     public boolean isNoCursorTimeout() {
422         return myNoCursorTimeout;
423     }
424 
425     /**
426      * Returns true if return the results found and suppress shard down errors.
427      * 
428      * @return True if return the results found and suppress shard down errors..
429      */
430     public boolean isPartial() {
431         return myPartial;
432     }
433 
434     /**
435      * Returns true if the cursor created should follow additional documents
436      * being inserted.
437      * 
438      * @return True if the cursor created should follow additional documents
439      *         being inserted.
440      */
441     public boolean isTailable() {
442         return myTailable;
443     }
444 
445     /**
446      * {@inheritDoc}
447      * <p>
448      * Overridden to return the size of the {@link Query}.
449      * </p>
450      */
451     @Override
452     public int size() {
453 
454         int size = HEADER_SIZE + 14; // See below.
455         // size += 4; // flags;
456         size += StringEncoder.utf8Size(myDatabaseName);
457         // size += 1; // StringEncoder.utf8Size(".");
458         size += StringEncoder.utf8Size(myCollectionName);
459         // size += 1; // \0 on the CString.
460         // size += 4; // numberToSkip
461         // size += 4; // numberToReturn
462         size += myQuery.size();
463         if (myReturnFields != null) {
464             size += myReturnFields.size();
465         }
466 
467         return size;
468     }
469 
470     /**
471      * {@inheritDoc}
472      * <p>
473      * Overridden to ensure the inserted documents are not too large in
474      * aggregate.
475      * </p>
476      */
477     @Override
478     public void validateSize(final int maxDocumentSize)
479             throws DocumentToLargeException {
480         if (myMessageSize < 0) {
481             long size = 0;
482             if (myQuery != null) {
483                 size += myQuery.size();
484             }
485             if (myReturnFields != null) {
486                 size += myReturnFields.size();
487             }
488 
489             myMessageSize = (int) size;
490         }
491 
492         if (maxDocumentSize < myMessageSize) {
493             throw new DocumentToLargeException(myMessageSize, maxDocumentSize,
494                     myQuery);
495         }
496     }
497 
498     /**
499      * {@inheritDoc}
500      * <p>
501      * Overridden to write the query message.
502      * </p>
503      * 
504      * @see Message#write(int, BsonOutputStream)
505      */
506     @Override
507     public void write(final int messageId, final BsonOutputStream out)
508             throws IOException {
509         final int flags = computeFlags();
510 
511         int size = HEADER_SIZE;
512         size += 4; // flags;
513         size += out.sizeOfCString(myDatabaseName, ".", myCollectionName);
514         size += 4; // numberToSkip
515         size += 4; // numberToReturn
516         size += myQuery.size();
517         if (myReturnFields != null) {
518             size += myReturnFields.size();
519         }
520 
521         writeHeader(out, messageId, 0, Operation.QUERY, size);
522         out.writeInt(flags);
523         out.writeCString(myDatabaseName, ".", myCollectionName);
524         out.writeInt(myNumberToSkip);
525         out.writeInt(myNumberToReturn);
526         out.writeDocument(myQuery);
527         if (myReturnFields != null) {
528             out.writeDocument(myReturnFields);
529         }
530     }
531 
532     /**
533      * {@inheritDoc}
534      * <p>
535      * Overridden to write the query message.
536      * </p>
537      * 
538      * @see Message#write(int, BsonOutputStream)
539      */
540     @Override
541     public void write(final int messageId, final BufferingBsonOutputStream out)
542             throws IOException {
543         final int flags = computeFlags();
544 
545         final long start = writeHeader(out, messageId, 0, Operation.QUERY);
546         out.writeInt(flags);
547         out.writeCString(myDatabaseName, ".", myCollectionName);
548         out.writeInt(myNumberToSkip);
549         out.writeInt(myNumberToReturn);
550         out.writeDocument(myQuery);
551         if (myReturnFields != null) {
552             out.writeDocument(myReturnFields);
553         }
554         finishHeader(out, start);
555 
556         out.flushBuffer();
557     }
558 
559     /**
560      * Computes the message flags bit field.
561      * 
562      * @return The message flags bit field.
563      */
564     private int computeFlags() {
565         int flags = 0;
566         if (myAwaitData) {
567             flags += AWAIT_DATA_FLAG_BIT;
568         }
569         if (myExhaust) {
570             flags += EXHAUST_FLAG_BIT;
571         }
572         if (myNoCursorTimeout) {
573             flags += NO_CURSOR_TIMEOUT_FLAG_BIT;
574         }
575         if (myPartial) {
576             flags += PARTIAL_FLAG_BIT;
577         }
578         if (getReadPreference().isSecondaryOk()) {
579             flags += REPLICA_OK_FLAG_BIT;
580         }
581         if (myTailable) {
582             flags += TAILABLE_CURSOR_FLAG_BIT;
583         }
584         return flags;
585     }
586 }