Coverage Report - com.allanbank.mongodb.client.MongoIteratorImpl
 
Classes in this File Line Coverage Branch Coverage Complexity
MongoIteratorImpl
94%
123/130
100%
46/46
2.385
 
 1  
 /*
 2  
  * #%L
 3  
  * MongoIteratorImpl.java - mongodb-async-driver - Allanbank Consulting, Inc.
 4  
  * %%
 5  
  * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
 6  
  * %%
 7  
  * Licensed under the Apache License, Version 2.0 (the "License");
 8  
  * you may not use this file except in compliance with the License.
 9  
  * You may obtain a copy of the License at
 10  
  * 
 11  
  *      http://www.apache.org/licenses/LICENSE-2.0
 12  
  * 
 13  
  * Unless required by applicable law or agreed to in writing, software
 14  
  * distributed under the License is distributed on an "AS IS" BASIS,
 15  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 16  
  * See the License for the specific language governing permissions and
 17  
  * limitations under the License.
 18  
  * #L%
 19  
  */
 20  
 package com.allanbank.mongodb.client;
 21  
 
 22  
 import java.util.ArrayList;
 23  
 import java.util.Iterator;
 24  
 import java.util.List;
 25  
 import java.util.NoSuchElementException;
 26  
 import java.util.concurrent.ExecutionException;
 27  
 import java.util.concurrent.Future;
 28  
 
 29  
 import com.allanbank.mongodb.MongoClient;
 30  
 import com.allanbank.mongodb.MongoDbException;
 31  
 import com.allanbank.mongodb.MongoIterator;
 32  
 import com.allanbank.mongodb.ReadPreference;
 33  
 import com.allanbank.mongodb.bson.Document;
 34  
 import com.allanbank.mongodb.bson.NumericElement;
 35  
 import com.allanbank.mongodb.bson.builder.BuilderFactory;
 36  
 import com.allanbank.mongodb.bson.builder.DocumentBuilder;
 37  
 import com.allanbank.mongodb.bson.element.StringElement;
 38  
 import com.allanbank.mongodb.client.callback.FutureReplyCallback;
 39  
 import com.allanbank.mongodb.client.message.CursorableMessage;
 40  
 import com.allanbank.mongodb.client.message.GetMore;
 41  
 import com.allanbank.mongodb.client.message.KillCursors;
 42  
 import com.allanbank.mongodb.client.message.Reply;
 43  
 import com.allanbank.mongodb.error.CursorNotFoundException;
 44  
 import com.allanbank.mongodb.util.log.Log;
 45  
 import com.allanbank.mongodb.util.log.LogFactory;
 46  
 
 47  
 /**
 48  
  * Iterator over the results of the MongoDB cursor.
 49  
  * 
 50  
  * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
 51  
  *         mutated in incompatible ways between any two releases of the driver.
 52  
  * @copyright 2011-2014, Allanbank Consulting, Inc., All Rights Reserved
 53  
  */
 54  25
 public class MongoIteratorImpl implements MongoIterator<Document> {
 55  
 
 56  
     /** The log for the iterator. */
 57  1
     private static final Log LOG = LogFactory.getLog(MongoIteratorImpl.class);
 58  
 
 59  
     /** The size of batches that are requested from the servers. */
 60  50
     private int myBatchSize = 0;
 61  
 
 62  
     /** The client for sending get_more requests to the server. */
 63  
     private final Client myClient;
 64  
 
 65  
     /** The name of the collection the query was originally created on. */
 66  
     private final String myCollectionName;
 67  
 
 68  
     /** The iterator over the current set of documents. */
 69  
     private Iterator<Document> myCurrentIterator;
 70  
 
 71  
     /** The original query. */
 72  50
     private long myCursorId = 0;
 73  
 
 74  
     /** The name of the database the query was originally created on. */
 75  
     private final String myDatabaseName;
 76  
 
 77  
     /**
 78  
      * The maximum number of document to return from the cursor. Zero or
 79  
      * negative means all.
 80  
      */
 81  50
     private int myLimit = 0;
 82  
 
 83  
     /** The {@link Future} that will be updated with the next set of results. */
 84  
     private FutureReplyCallback myNextReply;
 85  
 
 86  
     /** The read preference to subsequent requests. */
 87  
     private final ReadPreference myReadPerference;
 88  
 
 89  
     /**
 90  
      * Flag to shutdown this iterator gracefully without closing the cursor on
 91  
      * the server.
 92  
      */
 93  50
     private boolean myShutdown = false;
 94  
 
 95  
     /**
 96  
      * Create a new MongoDBInterator.
 97  
      * 
 98  
      * @param originalQuery
 99  
      *            The original query being iterated over.
 100  
      * @param client
 101  
      *            The client for issuing more requests.
 102  
      * @param server
 103  
      *            The server that received the original query request.
 104  
      * @param reply
 105  
      *            The initial results of the query that are available.
 106  
      */
 107  
     public MongoIteratorImpl(final CursorableMessage originalQuery,
 108  46
             final Client client, final String server, final Reply reply) {
 109  46
         myNextReply = new FutureReplyCallback();
 110  46
         myNextReply.callback(reply);
 111  
 
 112  46
         myReadPerference = ReadPreference.server(server);
 113  46
         myCursorId = 0;
 114  46
         myClient = client;
 115  46
         myCurrentIterator = null;
 116  46
         myBatchSize = originalQuery.getBatchSize();
 117  46
         myLimit = originalQuery.getLimit();
 118  46
         myDatabaseName = originalQuery.getDatabaseName();
 119  46
         myCollectionName = originalQuery.getCollectionName();
 120  
 
 121  46
     }
 122  
 
 123  
     /**
 124  
      * Create a new MongoIteratorImpl from a cursor document.
 125  
      * 
 126  
      * @param client
 127  
      *            The client interface to the server.
 128  
      * @param cursorDocument
 129  
      *            The original query.
 130  
      * 
 131  
      * @see MongoIteratorImpl#asDocument()
 132  
      */
 133  4
     public MongoIteratorImpl(final Document cursorDocument, final Client client) {
 134  4
         final String ns = cursorDocument.get(StringElement.class,
 135  
                 NAME_SPACE_FIELD).getValue();
 136  4
         String db = ns;
 137  4
         String collection = ns;
 138  4
         final int index = ns.indexOf('.');
 139  4
         if (0 < index) {
 140  3
             db = ns.substring(0, index);
 141  3
             collection = ns.substring(index + 1);
 142  
         }
 143  
 
 144  4
         myClient = client;
 145  4
         myDatabaseName = db;
 146  4
         myCollectionName = collection;
 147  4
         myCursorId = cursorDocument.get(NumericElement.class, CURSOR_ID_FIELD)
 148  
                 .getLongValue();
 149  4
         myLimit = cursorDocument.get(NumericElement.class, LIMIT_FIELD)
 150  
                 .getIntValue();
 151  4
         myBatchSize = cursorDocument
 152  
                 .get(NumericElement.class, BATCH_SIZE_FIELD).getIntValue();
 153  4
         myReadPerference = ReadPreference.server(cursorDocument.get(
 154  
                 StringElement.class, SERVER_FIELD).getValue());
 155  4
     }
 156  
 
 157  
     /**
 158  
      * {@inheritDoc}
 159  
      * <p>
 160  
      * Overridden to return the active cursor in the defined format.
 161  
      * </p>
 162  
      * 
 163  
      * @see ClientImpl#isCursorDocument(Document)
 164  
      */
 165  
     @Override
 166  
     public Document asDocument() {
 167  5
         long cursorId = myCursorId;
 168  5
         final Future<Reply> replyFuture = myNextReply;
 169  
 
 170  5
         cursorId = retreiveCursorIdFromPendingRequest(cursorId, replyFuture);
 171  
 
 172  5
         if (cursorId != 0) {
 173  3
             final DocumentBuilder b = BuilderFactory.start();
 174  3
             b.add(NAME_SPACE_FIELD, myDatabaseName + "." + myCollectionName);
 175  3
             b.add(CURSOR_ID_FIELD, cursorId);
 176  3
             b.add(SERVER_FIELD, myReadPerference.getServer());
 177  3
             b.add(LIMIT_FIELD, myLimit);
 178  3
             b.add(BATCH_SIZE_FIELD, myBatchSize);
 179  
 
 180  3
             return b.build();
 181  
         }
 182  
 
 183  2
         return null;
 184  
     }
 185  
 
 186  
     /**
 187  
      * {@inheritDoc}
 188  
      * <p>
 189  
      * Overridden to close the iterator and send a {@link KillCursors} for the
 190  
      * open cursor, if any.
 191  
      * </p>
 192  
      */
 193  
     @Override
 194  
     public void close() {
 195  24
         long cursorId = myCursorId;
 196  24
         final Future<Reply> replyFuture = myNextReply;
 197  
 
 198  24
         myCurrentIterator = null;
 199  24
         myNextReply = null;
 200  24
         myCursorId = 0;
 201  
 
 202  24
         cursorId = retreiveCursorIdFromPendingRequest(cursorId, replyFuture);
 203  
 
 204  24
         if ((cursorId != 0) && !myShutdown) {
 205  
             // The user asked us to leave the cursor be.
 206  8
             myClient.send(new KillCursors(new long[] { cursorId },
 207  
                     myReadPerference), null);
 208  
         }
 209  24
     }
 210  
 
 211  
     /**
 212  
      * {@inheritDoc}
 213  
      * <p>
 214  
      * Overridden to get the batch size from the original query or set
 215  
      * explicitly.
 216  
      * </p>
 217  
      */
 218  
     @Override
 219  
     public int getBatchSize() {
 220  5
         return myBatchSize;
 221  
     }
 222  
 
 223  
     /**
 224  
      * Returns the iterator's read preference which points to the original
 225  
      * server performing the query.
 226  
      * 
 227  
      * @return The iterator's read preference which points to the original
 228  
      *         server performing the query.
 229  
      */
 230  
     public ReadPreference getReadPerference() {
 231  5
         return myReadPerference;
 232  
     }
 233  
 
 234  
     /**
 235  
      * {@inheritDoc}
 236  
      * <p>
 237  
      * Overridden to return true if there are more documents.
 238  
      * </p>
 239  
      */
 240  
     @Override
 241  
     public boolean hasNext() {
 242  287
         if (myCurrentIterator == null) {
 243  33
             loadDocuments();
 244  
         }
 245  254
         else if (!myCurrentIterator.hasNext() && (myNextReply != null)) {
 246  11
             loadDocuments();
 247  
         }
 248  284
         return myCurrentIterator.hasNext();
 249  
     }
 250  
 
 251  
     /**
 252  
      * {@inheritDoc}
 253  
      * <p>
 254  
      * Overridden to return this iterator.
 255  
      * </p>
 256  
      */
 257  
     @Override
 258  
     public Iterator<Document> iterator() {
 259  14
         return this;
 260  
     }
 261  
 
 262  
     /**
 263  
      * {@inheritDoc}
 264  
      * <p>
 265  
      * Overridden to return the next document from the query.
 266  
      * </p>
 267  
      * 
 268  
      * @see java.util.Iterator#next()
 269  
      */
 270  
     @Override
 271  
     public Document next() {
 272  128
         if (hasNext()) {
 273  127
             return myCurrentIterator.next();
 274  
         }
 275  1
         throw new NoSuchElementException("No more documents.");
 276  
     }
 277  
 
 278  
     /**
 279  
      * Computes the size for the next batch of documents to get.
 280  
      * 
 281  
      * @return The returnNex
 282  
      */
 283  
     public int nextBatchSize() {
 284  1000520
         if ((0 < myLimit) && (myLimit <= myBatchSize)) {
 285  1
             return myLimit;
 286  
         }
 287  1000519
         return myBatchSize;
 288  
     }
 289  
 
 290  
     /**
 291  
      * {@inheritDoc}
 292  
      * <p>
 293  
      * Overridden to throw and {@link UnsupportedOperationException}.
 294  
      * </p>
 295  
      * 
 296  
      * @see java.util.Iterator#remove()
 297  
      */
 298  
     @Override
 299  
     public void remove() {
 300  1
         throw new UnsupportedOperationException(
 301  
                 "Cannot remove a document via a MongoDB iterator.");
 302  
     }
 303  
 
 304  
     /**
 305  
      * Restarts the iterator by sending a request for more documents.
 306  
      * 
 307  
      * @throws MongoDbException
 308  
      *             On a failure to send the request for more document.
 309  
      */
 310  
     public void restart() throws MongoDbException {
 311  2
         sendRequest();
 312  2
     }
 313  
 
 314  
     /**
 315  
      * {@inheritDoc}
 316  
      * <p>
 317  
      * Overridden to set the batch size.
 318  
      * </p>
 319  
      */
 320  
     @Override
 321  
     public void setBatchSize(final int batchSize) {
 322  1
         myBatchSize = batchSize;
 323  1
     }
 324  
 
 325  
     /**
 326  
      * Stops the iterator after consuming any received and/or requested batches.
 327  
      * <p>
 328  
      * <b>WARNING</b>: This will leave the cursor open on the server. Users
 329  
      * should persist the state of the cursor as returned from
 330  
      * {@link #asDocument()} and restart the cursor using one of the
 331  
      * {@link MongoClient#restart(com.allanbank.mongodb.bson.DocumentAssignable)}
 332  
      * or
 333  
      * {@link MongoClient#restart(com.allanbank.mongodb.StreamCallback, com.allanbank.mongodb.bson.DocumentAssignable)}
 334  
      * methods. Use with extreme caution.
 335  
      * </p>
 336  
      * <p>
 337  
      * The iterator will naturally stop ({@link #hasNext()} will return false)
 338  
      * when the current batch and any already requested batches are finished.
 339  
      * </p>
 340  
      */
 341  
     @Override
 342  
     public void stop() {
 343  1
         myShutdown = true;
 344  1
     }
 345  
 
 346  
     /**
 347  
      * {@inheritDoc}
 348  
      * <p>
 349  
      * Overridden to return the remaining elements as a array.
 350  
      * </p>
 351  
      */
 352  
     @Override
 353  
     public Object[] toArray() {
 354  1
         final List<Document> remaining = toList();
 355  
 
 356  1
         return remaining.toArray();
 357  
     }
 358  
 
 359  
     /**
 360  
      * {@inheritDoc}
 361  
      * <p>
 362  
      * Overridden to return the remaining elements as a array.
 363  
      * </p>
 364  
      */
 365  
     @Override
 366  
     public <S> S[] toArray(final S[] to) {
 367  1
         final List<Document> remaining = toList();
 368  
 
 369  1
         return remaining.toArray(to);
 370  
     }
 371  
 
 372  
     /**
 373  
      * {@inheritDoc}
 374  
      * <p>
 375  
      * Overridden to return the remaining elements as a list.
 376  
      * </p>
 377  
      */
 378  
     @Override
 379  
     public List<Document> toList() {
 380  3
         final List<Document> remaining = new ArrayList<Document>();
 381  
 
 382  18
         while (hasNext()) {
 383  15
             remaining.add(next());
 384  
         }
 385  
 
 386  3
         return remaining;
 387  
     }
 388  
 
 389  
     /**
 390  
      * Returns the client value.
 391  
      * 
 392  
      * @return The client value.
 393  
      */
 394  
     protected Client getClient() {
 395  3
         return myClient;
 396  
     }
 397  
 
 398  
     /**
 399  
      * Returns the collection name.
 400  
      * 
 401  
      * @return The collection name.
 402  
      */
 403  
     protected String getCollectionName() {
 404  3
         return myCollectionName;
 405  
     }
 406  
 
 407  
     /**
 408  
      * Returns the cursor Id value.
 409  
      * 
 410  
      * @return The cursor Id value.
 411  
      */
 412  
     protected long getCursorId() {
 413  3
         return myCursorId;
 414  
     }
 415  
 
 416  
     /**
 417  
      * Returns the database name value.
 418  
      * 
 419  
      * @return The database name value.
 420  
      */
 421  
     protected String getDatabaseName() {
 422  3
         return myDatabaseName;
 423  
     }
 424  
 
 425  
     /**
 426  
      * Returns the limit value.
 427  
      * 
 428  
      * @return The limit value.
 429  
      */
 430  
     protected int getLimit() {
 431  3
         return myLimit;
 432  
     }
 433  
 
 434  
     /**
 435  
      * Loads more documents into the iterator. This iterator issues a get_more
 436  
      * command as soon as the previous results start to be used.
 437  
      * 
 438  
      * @throws RuntimeException
 439  
      *             On a failure to load documents.
 440  
      */
 441  
     protected void loadDocuments() throws RuntimeException {
 442  44
         loadDocuments(true);
 443  41
     }
 444  
 
 445  
     /**
 446  
      * Loads more documents into the iterator. This iterator issues a get_more
 447  
      * command as soon as the previous results start to be used.
 448  
      * 
 449  
      * @param blockForTailable
 450  
      *            If true then the method will recursively call itself on a
 451  
      *            tailable cursor with no results. This makes the call blocking.
 452  
      *            It false then the call will not block. This is used by the
 453  
      *            method to ensure that the outermost load blocks but the
 454  
      *            recursion is not inifinite.
 455  
      * @return The list of loaded documents.
 456  
      * 
 457  
      * @throws RuntimeException
 458  
      *             On a failure to load documents.
 459  
      */
 460  
     protected List<Document> loadDocuments(final boolean blockForTailable)
 461  
             throws RuntimeException {
 462  
         List<Document> docs;
 463  
         try {
 464  
             // Pull the reply from the future. Hopefully it is already there!
 465  1000545
             final Reply reply = myNextReply.get();
 466  1000544
             if (reply.isCursorNotFound() || reply.isQueryFailed()) {
 467  2
                 final long cursorid = myCursorId;
 468  2
                 myCursorId = 0;
 469  2
                 throw new CursorNotFoundException(reply, "Cursor id ("
 470  
                         + cursorid + ") not found by the MongoDB server.");
 471  
             }
 472  
 
 473  1000542
             myCursorId = reply.getCursorId();
 474  
 
 475  
             // Setup and iterator over the documents and adjust the limit
 476  
             // for the documents we have. Do this before the fetch again
 477  
             // so the nextBatchSize() has the updated limit.
 478  1000542
             docs = reply.getResults();
 479  1000542
             myCurrentIterator = docs.iterator();
 480  1000542
             if (0 < myLimit) {
 481  
                 // Check if we have too many docs.
 482  5
                 if (myLimit <= docs.size()) {
 483  2
                     myCurrentIterator = docs.subList(0, myLimit).iterator();
 484  2
                     if (myCursorId != 0) {
 485  
                         // Kill the cursor.
 486  1
                         myClient.send(new KillCursors(
 487  
                                 new long[] { myCursorId }, myReadPerference),
 488  
                                 null);
 489  1
                         myCursorId = 0;
 490  
                     }
 491  
                 }
 492  5
                 myLimit -= docs.size();
 493  
             }
 494  
 
 495  
             // Pre-fetch the next set of documents while we iterate over the
 496  
             // documents we just got.
 497  1000542
             if ((myCursorId != 0) && !myShutdown) {
 498  1000515
                 sendRequest();
 499  
 
 500  
                 // Include the (myNextReply != null) to catch failures on the
 501  
                 // server.
 502  2001016
                 while (docs.isEmpty() && blockForTailable
 503  
                         && (myNextReply != null)) {
 504  
                     // Tailable - Wait for a reply with documents.
 505  1000501
                     docs = loadDocuments(false);
 506  
                 }
 507  
             }
 508  
             else {
 509  
                 // Exhausted the cursor or are shutting down - no more results.
 510  27
                 myNextReply = null;
 511  
 
 512  
                 // Don't need to kill the cursor since we exhausted it or are
 513  
                 // shutting down.
 514  
             }
 515  
 
 516  
         }
 517  1
         catch (final InterruptedException e) {
 518  1
             throw new RuntimeException(e);
 519  
         }
 520  0
         catch (final ExecutionException e) {
 521  0
             throw new RuntimeException(e);
 522  1000542
         }
 523  
 
 524  1000542
         return docs;
 525  
     }
 526  
 
 527  
     /**
 528  
      * If the current cursor id is zero then waits for the response from the
 529  
      * pending request to determine the real cursor id.
 530  
      * 
 531  
      * @param cursorId
 532  
      *            The presumed cursor id.
 533  
      * @param replyFuture
 534  
      *            The pending reply's future.
 535  
      * @return The best known cursor id.
 536  
      */
 537  
     protected long retreiveCursorIdFromPendingRequest(final long cursorId,
 538  
             final Future<Reply> replyFuture) {
 539  
         // May not have processed any of the results yet...
 540  29
         if ((cursorId == 0) && (replyFuture != null)) {
 541  
             try {
 542  9
                 final Reply reply = replyFuture.get();
 543  
 
 544  9
                 return reply.getCursorId();
 545  
             }
 546  0
             catch (final InterruptedException e) {
 547  0
                 LOG.warn(e, "Interrupted waiting for a query reply: {}",
 548  
                         e.getMessage());
 549  
             }
 550  0
             catch (final ExecutionException e) {
 551  0
                 LOG.warn(e, "Interrupted waiting for a query reply: {}",
 552  
                         e.getMessage());
 553  0
             }
 554  
         }
 555  20
         return cursorId;
 556  
     }
 557  
 
 558  
     /**
 559  
      * Sends a request for more documents.
 560  
      * 
 561  
      * @throws MongoDbException
 562  
      *             On a failure to send the request for more document.
 563  
      */
 564  
     protected void sendRequest() throws MongoDbException {
 565  1000517
         final GetMore getMore = new GetMore(myDatabaseName, myCollectionName,
 566  
                 myCursorId, nextBatchSize(), myReadPerference);
 567  
 
 568  1000517
         myNextReply = new FutureReplyCallback();
 569  1000517
         myClient.send(getMore, myNextReply);
 570  1000517
     }
 571  
 }