Coverage Report - com.allanbank.mongodb.client.message.Query
 
Classes in this File Line Coverage Branch Coverage Complexity
Query
100%
136/136
98%
98/100
2.87
 
 1  
 /*
 2  
  * #%L
 3  
  * Query.java - mongodb-async-driver - Allanbank Consulting, Inc.
 4  
  * %%
 5  
  * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
 6  
  * %%
 7  
  * Licensed under the Apache License, Version 2.0 (the "License");
 8  
  * you may not use this file except in compliance with the License.
 9  
  * You may obtain a copy of the License at
 10  
  * 
 11  
  *      http://www.apache.org/licenses/LICENSE-2.0
 12  
  * 
 13  
  * Unless required by applicable law or agreed to in writing, software
 14  
  * distributed under the License is distributed on an "AS IS" BASIS,
 15  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 16  
  * See the License for the specific language governing permissions and
 17  
  * limitations under the License.
 18  
  * #L%
 19  
  */
 20  
 package com.allanbank.mongodb.client.message;
 21  
 
 22  
 import java.io.IOException;
 23  
 
 24  
 import com.allanbank.mongodb.ReadPreference;
 25  
 import com.allanbank.mongodb.bson.Document;
 26  
 import com.allanbank.mongodb.bson.io.BsonInputStream;
 27  
 import com.allanbank.mongodb.bson.io.BsonOutputStream;
 28  
 import com.allanbank.mongodb.bson.io.BufferingBsonOutputStream;
 29  
 import com.allanbank.mongodb.bson.io.StringEncoder;
 30  
 import com.allanbank.mongodb.client.Message;
 31  
 import com.allanbank.mongodb.client.Operation;
 32  
 import com.allanbank.mongodb.error.DocumentToLargeException;
 33  
 
 34  
 /**
 35  
  * Message to <a href=
 36  
  * "http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol#MongoWireProtocol-OPQUERY"
 37  
  * >query</a> documents from the database matching a criteria. Also used to
 38  
  * issue commands to the database.
 39  
  * 
 40  
  * <pre>
 41  
  * <code>
 42  
  * struct OP_QUERY {
 43  
  *     MsgHeader header;                // standard message header
 44  
  *     int32     flags;                  // bit vector of query options.  See below for details.
 45  
  *     cstring   fullCollectionName;    // "dbname.collectionname"
 46  
  *     int32     numberToSkip;          // number of documents to skip
 47  
  *     int32     numberToReturn;        // number of documents to return
 48  
  *                                      //  in the first OP_REPLY batch
 49  
  *     document  query;                 // query object.
 50  
  *   [ document  returnFieldSelector; ] // Optional. Selector indicating the fields
 51  
  *                                      //  to return.
 52  
  * }
 53  
  * </code>
 54  
  * </pre>
 55  
  * 
 56  
  * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
 57  
  *         mutated in incompatible ways between any two releases of the driver.
 58  
  * @copyright 2011-2013, Allanbank Consulting, Inc., All Rights Reserved
 59  
  */
 60  
 public class Query extends AbstractMessage implements CursorableMessage {
 61  
 
 62  
     /** Flag bit for the await data. */
 63  
     public static final int AWAIT_DATA_FLAG_BIT = 0x20;
 64  
 
 65  
     /** The default batch size for a MongoDB query. */
 66  
     public static final int DEFAULT_BATCH_SIZE = 101;
 67  
 
 68  
     /** Flag bit for the exhaust results. */
 69  
     public static final int EXHAUST_FLAG_BIT = 0x40;
 70  
 
 71  
     /** Flag bit for the no cursor timeout. */
 72  
     public static final int NO_CURSOR_TIMEOUT_FLAG_BIT = 0x10;
 73  
 
 74  
     /** Flag bit for the OPLOG_REPLAY. */
 75  
     public static final int OPLOG_REPLAY_FLAG_BIT = 0x08;
 76  
 
 77  
     /** Flag bit for the partial results. */
 78  
     public static final int PARTIAL_FLAG_BIT = 0x80;
 79  
 
 80  
     /** Flag bit for the replica OK. */
 81  
     public static final int REPLICA_OK_FLAG_BIT = 0x04;
 82  
 
 83  
     /** Flag bit for the tailable cursors. */
 84  
     public static final int TAILABLE_CURSOR_FLAG_BIT = 0x02;
 85  
 
 86  
     /**
 87  
      * If true and if using a tailable cursor then the connection will block
 88  
      * waiting for more data.
 89  
      */
 90  
     private final boolean myAwaitData;
 91  
 
 92  
     /** The number of documents to be returned in each batch. */
 93  
     private final int myBatchSize;
 94  
 
 95  
     /** If true, all results should be returned in multiple results. */
 96  
     private final boolean myExhaust;
 97  
 
 98  
     /** The maximum number of documents to be returned. */
 99  
     private final int myLimit;
 100  
 
 101  
     /**
 102  
      * The size of the message. If negative then the size has not been computed.
 103  
      */
 104  
     private int myMessageSize;
 105  
 
 106  
     /** If true, marks the cursor as not having a timeout. */
 107  
     private final boolean myNoCursorTimeout;
 108  
 
 109  
     /** The number of documents to be returned in the first batch. */
 110  
     private final int myNumberToReturn;
 111  
 
 112  
     /**
 113  
      * The number of documents to skip before starting to return documents.
 114  
      */
 115  
     private final int myNumberToSkip;
 116  
 
 117  
     /**
 118  
      * If true, return the results found and suppress shard down errors.
 119  
      */
 120  
     private final boolean myPartial;
 121  
 
 122  
     /**
 123  
      * The query document containing the expression to select documents from the
 124  
      * collection.
 125  
      */
 126  
     private final Document myQuery;
 127  
 
 128  
     /** Optional document containing the fields to be returned. */
 129  
     private final Document myReturnFields;
 130  
 
 131  
     /**
 132  
      * If true, then the cursor created should follow additional documents being
 133  
      * inserted.
 134  
      */
 135  
     private final boolean myTailable;
 136  
 
 137  
     /**
 138  
      * Creates a new Query.
 139  
      * 
 140  
      * @param header
 141  
      *            The header for the query message.
 142  
      * @param in
 143  
      *            The stream to read the kill_cursors message from.
 144  
      * @throws IOException
 145  
      *             On a failure reading the kill_cursors message.
 146  
      */
 147  
     public Query(final Header header, final BsonInputStream in)
 148  682
             throws IOException {
 149  682
         final long position = in.getBytesRead();
 150  682
         final long end = (position + header.getLength()) - Header.SIZE;
 151  
 
 152  682
         final int flags = in.readInt();
 153  682
         init(in.readCString());
 154  682
         myNumberToSkip = in.readInt();
 155  682
         myNumberToReturn = in.readInt();
 156  682
         myQuery = in.readDocument();
 157  682
         if (in.getBytesRead() < end) {
 158  324
             myReturnFields = in.readDocument();
 159  
         }
 160  
         else {
 161  358
             myReturnFields = null;
 162  
         }
 163  682
         myAwaitData = (flags & AWAIT_DATA_FLAG_BIT) == AWAIT_DATA_FLAG_BIT;
 164  682
         myExhaust = (flags & EXHAUST_FLAG_BIT) == EXHAUST_FLAG_BIT;
 165  682
         myNoCursorTimeout = (flags & NO_CURSOR_TIMEOUT_FLAG_BIT) == NO_CURSOR_TIMEOUT_FLAG_BIT;
 166  682
         myPartial = (flags & PARTIAL_FLAG_BIT) == PARTIAL_FLAG_BIT;
 167  682
         myTailable = (flags & TAILABLE_CURSOR_FLAG_BIT) == TAILABLE_CURSOR_FLAG_BIT;
 168  
 
 169  682
         myLimit = 0;
 170  682
         myBatchSize = 0;
 171  682
         myMessageSize = -1;
 172  682
     }
 173  
 
 174  
     /**
 175  
      * Creates a new Query.
 176  
      * 
 177  
      * @param databaseName
 178  
      *            The name of the database.
 179  
      * @param collectionName
 180  
      *            The name of the collection.
 181  
      * @param query
 182  
      *            The query document containing the expression to select
 183  
      *            documents from the collection.
 184  
      * @param returnFields
 185  
      *            Optional document containing the fields to be returned.
 186  
      * @param batchSize
 187  
      *            The number of documents to be returned in each batch.
 188  
      * @param limit
 189  
      *            The limit on the number of documents to return.
 190  
      * @param numberToSkip
 191  
      *            The number of documents to skip before starting to return
 192  
      *            documents.
 193  
      * @param tailable
 194  
      *            If true, then the cursor created should follow additional
 195  
      *            documents being inserted.
 196  
      * @param readPreference
 197  
      *            The preference for which servers to use to retrieve the
 198  
      *            results.
 199  
      * @param noCursorTimeout
 200  
      *            If true, marks the cursor as not having a timeout.
 201  
      * @param awaitData
 202  
      *            If true and if using a tailable cursor then the connection
 203  
      *            will block waiting for more data.
 204  
      * @param exhaust
 205  
      *            If true, all results should be returned in multiple results.
 206  
      * @param partial
 207  
      *            If true, return the results found and suppress shard down
 208  
      *            errors.
 209  
      */
 210  
     public Query(final String databaseName, final String collectionName,
 211  
             final Document query, final Document returnFields,
 212  
             final int batchSize, final int limit, final int numberToSkip,
 213  
             final boolean tailable, final ReadPreference readPreference,
 214  
             final boolean noCursorTimeout, final boolean awaitData,
 215  
             final boolean exhaust, final boolean partial) {
 216  2330
         super(databaseName, collectionName, readPreference, QueryVersionVisitor
 217  
                 .version(query));
 218  
 
 219  2330
         myQuery = query;
 220  2330
         myReturnFields = returnFields;
 221  2330
         myLimit = limit;
 222  2330
         myBatchSize = batchSize;
 223  2330
         myNumberToSkip = numberToSkip;
 224  2330
         myTailable = tailable;
 225  2330
         myNoCursorTimeout = noCursorTimeout;
 226  2330
         myAwaitData = awaitData;
 227  2330
         myExhaust = exhaust;
 228  2330
         myPartial = partial;
 229  2330
         myMessageSize = -1;
 230  
 
 231  2330
         if (isBatchSizeSet()) {
 232  1411
             if (isLimitSet() && (myLimit <= myBatchSize)) {
 233  761
                 myNumberToReturn = -myLimit;
 234  
             }
 235  
             else {
 236  650
                 myNumberToReturn = myBatchSize;
 237  
             }
 238  
         }
 239  919
         else if (isLimitSet() && (myLimit <= DEFAULT_BATCH_SIZE)) {
 240  185
             myNumberToReturn = -myLimit;
 241  
         }
 242  
         else {
 243  734
             myNumberToReturn = 0;
 244  
         }
 245  2330
     }
 246  
 
 247  
     /**
 248  
      * Determines if the passed object is of this same type as this object and
 249  
      * if so that its fields are equal.
 250  
      * 
 251  
      * @param object
 252  
      *            The object to compare to.
 253  
      * 
 254  
      * @see java.lang.Object#equals(java.lang.Object)
 255  
      */
 256  
     @Override
 257  
     public boolean equals(final Object object) {
 258  265717
         boolean result = false;
 259  265717
         if (this == object) {
 260  744
             result = true;
 261  
         }
 262  264973
         else if ((object != null) && (getClass() == object.getClass())) {
 263  262801
             final Query other = (Query) object;
 264  
 
 265  262801
             result = super.equals(object)
 266  
                     && (myAwaitData == other.myAwaitData)
 267  
                     && (myExhaust == other.myExhaust)
 268  
                     && (myNoCursorTimeout == other.myNoCursorTimeout)
 269  
                     && (myPartial == other.myPartial)
 270  
                     && (myTailable == other.myTailable)
 271  
                     && (myBatchSize == other.myBatchSize)
 272  
                     && (myLimit == other.myLimit)
 273  
                     && (myNumberToReturn == other.myNumberToReturn)
 274  
                     && (myNumberToSkip == other.myNumberToSkip)
 275  
                     && myQuery.equals(other.myQuery)
 276  
                     && ((myReturnFields == other.myReturnFields) || ((myReturnFields != null) && myReturnFields
 277  
                             .equals(other.myReturnFields)));
 278  
         }
 279  265717
         return result;
 280  
     }
 281  
 
 282  
     /**
 283  
      * Returns the number of documents to be returned in each batch of results.
 284  
      * 
 285  
      * @return The number of documents to be returned in each batch of results.
 286  
      */
 287  
     @Override
 288  
     public int getBatchSize() {
 289  82
         return myBatchSize;
 290  
     }
 291  
 
 292  
     /**
 293  
      * Returns the total number of documents to be returned.
 294  
      * 
 295  
      * @return The total number of documents to be returned.
 296  
      */
 297  
     @Override
 298  
     public int getLimit() {
 299  77
         return myLimit;
 300  
     }
 301  
 
 302  
     /**
 303  
      * Returns the number of documents to be returned.
 304  
      * 
 305  
      * @return The number of documents to be returned.
 306  
      */
 307  
     public int getNumberToReturn() {
 308  6
         return myNumberToReturn;
 309  
     }
 310  
 
 311  
     /**
 312  
      * Returns the number of documents to skip before starting to return
 313  
      * documents.
 314  
      * 
 315  
      * @return The number of documents to skip before starting to return
 316  
      *         documents.
 317  
      */
 318  
     public int getNumberToSkip() {
 319  7
         return myNumberToSkip;
 320  
     }
 321  
 
 322  
     /**
 323  
      * {@inheritDoc}
 324  
      * <p>
 325  
      * Overridden to return the name of the operation: "QUERY".
 326  
      * </p>
 327  
      */
 328  
     @Override
 329  
     public String getOperationName() {
 330  1
         return Operation.QUERY.name();
 331  
     }
 332  
 
 333  
     /**
 334  
      * Returns the query document containing the expression to select documents
 335  
      * from the collection.
 336  
      * 
 337  
      * @return The query document containing the expression to select documents
 338  
      *         from the collection.
 339  
      */
 340  
     public Document getQuery() {
 341  32
         return myQuery;
 342  
     }
 343  
 
 344  
     /**
 345  
      * Returns the optional document containing the fields to be returned.
 346  
      * Optional here means this method may return <code>null</code>.
 347  
      * 
 348  
      * @return The optional document containing the fields to be returned.
 349  
      */
 350  
     public Document getReturnFields() {
 351  7
         return myReturnFields;
 352  
     }
 353  
 
 354  
     /**
 355  
      * Computes a reasonable hash code.
 356  
      * 
 357  
      * @return The hash code value.
 358  
      */
 359  
     @Override
 360  
     public int hashCode() {
 361  524901
         int result = 1;
 362  524901
         result = (31 * result) + super.hashCode();
 363  524901
         result = (31 * result) + (myAwaitData ? 1 : 3);
 364  524901
         result = (31 * result) + (myExhaust ? 1 : 7);
 365  524901
         result = (31 * result) + (myNoCursorTimeout ? 1 : 11);
 366  524901
         result = (31 * result) + (myPartial ? 1 : 13);
 367  524901
         result = (31 * result) + (myTailable ? 1 : 19);
 368  524901
         result = (31 * result) + myBatchSize;
 369  524901
         result = (31 * result) + myLimit;
 370  524901
         result = (31 * result) + myNumberToReturn;
 371  524901
         result = (31 * result) + myNumberToSkip;
 372  524901
         result = (31 * result) + myQuery.hashCode();
 373  524901
         result = (31 * result)
 374  
                 + (myReturnFields == null ? 1 : myReturnFields.hashCode());
 375  524901
         return result;
 376  
     }
 377  
 
 378  
     /**
 379  
      * Returns true and if using a tailable cursor then the connection will
 380  
      * block waiting for more data.
 381  
      * 
 382  
      * @return True and if using a tailable cursor then the connection will
 383  
      *         block waiting for more data.
 384  
      */
 385  
     public boolean isAwaitData() {
 386  7
         return myAwaitData;
 387  
     }
 388  
 
 389  
     /**
 390  
      * Returns true if the batch size is greater than zero.
 391  
      * 
 392  
      * @return True if the batch size is greater than zero.
 393  
      */
 394  
     public boolean isBatchSizeSet() {
 395  2330
         return 0 < myBatchSize;
 396  
     }
 397  
 
 398  
     /**
 399  
      * Returns true if all results should be returned in multiple results.
 400  
      * 
 401  
      * @return True if all results should be returned in multiple results.
 402  
      */
 403  
     public boolean isExhaust() {
 404  7
         return myExhaust;
 405  
     }
 406  
 
 407  
     /**
 408  
      * Returns true if the limit is greater than zero.
 409  
      * 
 410  
      * @return True if the limit is greater than zero.
 411  
      */
 412  
     public boolean isLimitSet() {
 413  2330
         return 0 < myLimit;
 414  
     }
 415  
 
 416  
     /**
 417  
      * Returns true if marking the cursor as not having a timeout.
 418  
      * 
 419  
      * @return True if marking the cursor as not having a timeout.
 420  
      */
 421  
     public boolean isNoCursorTimeout() {
 422  7
         return myNoCursorTimeout;
 423  
     }
 424  
 
 425  
     /**
 426  
      * Returns true if return the results found and suppress shard down errors.
 427  
      * 
 428  
      * @return True if return the results found and suppress shard down errors..
 429  
      */
 430  
     public boolean isPartial() {
 431  7
         return myPartial;
 432  
     }
 433  
 
 434  
     /**
 435  
      * Returns true if the cursor created should follow additional documents
 436  
      * being inserted.
 437  
      * 
 438  
      * @return True if the cursor created should follow additional documents
 439  
      *         being inserted.
 440  
      */
 441  
     public boolean isTailable() {
 442  7
         return myTailable;
 443  
     }
 444  
 
 445  
     /**
 446  
      * {@inheritDoc}
 447  
      * <p>
 448  
      * Overridden to return the size of the {@link Query}.
 449  
      * </p>
 450  
      */
 451  
     @Override
 452  
     public int size() {
 453  
 
 454  432
         int size = HEADER_SIZE + 14; // See below.
 455  
         // size += 4; // flags;
 456  432
         size += StringEncoder.utf8Size(myDatabaseName);
 457  
         // size += 1; // StringEncoder.utf8Size(".");
 458  432
         size += StringEncoder.utf8Size(myCollectionName);
 459  
         // size += 1; // \0 on the CString.
 460  
         // size += 4; // numberToSkip
 461  
         // size += 4; // numberToReturn
 462  432
         size += myQuery.size();
 463  432
         if (myReturnFields != null) {
 464  324
             size += myReturnFields.size();
 465  
         }
 466  
 
 467  432
         return size;
 468  
     }
 469  
 
 470  
     /**
 471  
      * {@inheritDoc}
 472  
      * <p>
 473  
      * Overridden to ensure the inserted documents are not too large in
 474  
      * aggregate.
 475  
      * </p>
 476  
      */
 477  
     @Override
 478  
     public void validateSize(final int maxDocumentSize)
 479  
             throws DocumentToLargeException {
 480  62
         if (myMessageSize < 0) {
 481  56
             long size = 0;
 482  56
             if (myQuery != null) {
 483  55
                 size += myQuery.size();
 484  
             }
 485  56
             if (myReturnFields != null) {
 486  4
                 size += myReturnFields.size();
 487  
             }
 488  
 
 489  56
             myMessageSize = (int) size;
 490  
         }
 491  
 
 492  62
         if (maxDocumentSize < myMessageSize) {
 493  1
             throw new DocumentToLargeException(myMessageSize, maxDocumentSize,
 494  
                     myQuery);
 495  
         }
 496  61
     }
 497  
 
 498  
     /**
 499  
      * {@inheritDoc}
 500  
      * <p>
 501  
      * Overridden to write the query message.
 502  
      * </p>
 503  
      * 
 504  
      * @see Message#write(int, BsonOutputStream)
 505  
      */
 506  
     @Override
 507  
     public void write(final int messageId, final BsonOutputStream out)
 508  
             throws IOException {
 509  433
         final int flags = computeFlags();
 510  
 
 511  433
         int size = HEADER_SIZE;
 512  433
         size += 4; // flags;
 513  433
         size += out.sizeOfCString(myDatabaseName, ".", myCollectionName);
 514  433
         size += 4; // numberToSkip
 515  433
         size += 4; // numberToReturn
 516  433
         size += myQuery.size();
 517  433
         if (myReturnFields != null) {
 518  324
             size += myReturnFields.size();
 519  
         }
 520  
 
 521  433
         writeHeader(out, messageId, 0, Operation.QUERY, size);
 522  433
         out.writeInt(flags);
 523  433
         out.writeCString(myDatabaseName, ".", myCollectionName);
 524  433
         out.writeInt(myNumberToSkip);
 525  433
         out.writeInt(myNumberToReturn);
 526  433
         out.writeDocument(myQuery);
 527  433
         if (myReturnFields != null) {
 528  324
             out.writeDocument(myReturnFields);
 529  
         }
 530  433
     }
 531  
 
 532  
     /**
 533  
      * {@inheritDoc}
 534  
      * <p>
 535  
      * Overridden to write the query message.
 536  
      * </p>
 537  
      * 
 538  
      * @see Message#write(int, BsonOutputStream)
 539  
      */
 540  
     @Override
 541  
     public void write(final int messageId, final BufferingBsonOutputStream out)
 542  
             throws IOException {
 543  56
         final int flags = computeFlags();
 544  
 
 545  56
         final long start = writeHeader(out, messageId, 0, Operation.QUERY);
 546  56
         out.writeInt(flags);
 547  56
         out.writeCString(myDatabaseName, ".", myCollectionName);
 548  56
         out.writeInt(myNumberToSkip);
 549  56
         out.writeInt(myNumberToReturn);
 550  56
         out.writeDocument(myQuery);
 551  56
         if (myReturnFields != null) {
 552  2
             out.writeDocument(myReturnFields);
 553  
         }
 554  56
         finishHeader(out, start);
 555  
 
 556  56
         out.flushBuffer();
 557  56
     }
 558  
 
 559  
     /**
 560  
      * Computes the message flags bit field.
 561  
      * 
 562  
      * @return The message flags bit field.
 563  
      */
 564  
     private int computeFlags() {
 565  489
         int flags = 0;
 566  489
         if (myAwaitData) {
 567  218
             flags += AWAIT_DATA_FLAG_BIT;
 568  
         }
 569  489
         if (myExhaust) {
 570  206
             flags += EXHAUST_FLAG_BIT;
 571  
         }
 572  489
         if (myNoCursorTimeout) {
 573  196
             flags += NO_CURSOR_TIMEOUT_FLAG_BIT;
 574  
         }
 575  489
         if (myPartial) {
 576  191
             flags += PARTIAL_FLAG_BIT;
 577  
         }
 578  489
         if (getReadPreference().isSecondaryOk()) {
 579  211
             flags += REPLICA_OK_FLAG_BIT;
 580  
         }
 581  489
         if (myTailable) {
 582  213
             flags += TAILABLE_CURSOR_FLAG_BIT;
 583  
         }
 584  489
         return flags;
 585  
     }
 586  
 }