View Javadoc
1   /*
2    * #%L
3    * MongoIteratorImpl.java - mongodb-async-driver - Allanbank Consulting, Inc.
4    * %%
5    * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6    * %%
7    * Licensed under the Apache License, Version 2.0 (the "License");
8    * you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   * 
11   *      http://www.apache.org/licenses/LICENSE-2.0
12   * 
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   * #L%
19   */
20  package com.allanbank.mongodb.client;
21  
22  import java.util.ArrayList;
23  import java.util.Iterator;
24  import java.util.List;
25  import java.util.NoSuchElementException;
26  import java.util.concurrent.ExecutionException;
27  import java.util.concurrent.Future;
28  
29  import com.allanbank.mongodb.MongoClient;
30  import com.allanbank.mongodb.MongoDbException;
31  import com.allanbank.mongodb.MongoIterator;
32  import com.allanbank.mongodb.ReadPreference;
33  import com.allanbank.mongodb.bson.Document;
34  import com.allanbank.mongodb.bson.NumericElement;
35  import com.allanbank.mongodb.bson.builder.BuilderFactory;
36  import com.allanbank.mongodb.bson.builder.DocumentBuilder;
37  import com.allanbank.mongodb.bson.element.StringElement;
38  import com.allanbank.mongodb.client.callback.FutureReplyCallback;
39  import com.allanbank.mongodb.client.message.CursorableMessage;
40  import com.allanbank.mongodb.client.message.GetMore;
41  import com.allanbank.mongodb.client.message.KillCursors;
42  import com.allanbank.mongodb.client.message.Reply;
43  import com.allanbank.mongodb.error.CursorNotFoundException;
44  import com.allanbank.mongodb.util.log.Log;
45  import com.allanbank.mongodb.util.log.LogFactory;
46  
47  /**
48   * Iterator over the results of the MongoDB cursor.
49   * 
50   * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
51   *         mutated in incompatible ways between any two releases of the driver.
52   * @copyright 2011-2014, Allanbank Consulting, Inc., All Rights Reserved
53   */
54  public class MongoIteratorImpl implements MongoIterator<Document> {
55  
56      /** The log for the iterator. */
57      private static final Log LOG = LogFactory.getLog(MongoIteratorImpl.class);
58  
59      /** The size of batches that are requested from the servers. */
60      private int myBatchSize = 0;
61  
62      /** The client for sending get_more requests to the server. */
63      private final Client myClient;
64  
65      /** The name of the collection the query was originally created on. */
66      private final String myCollectionName;
67  
68      /** The iterator over the current set of documents. */
69      private Iterator<Document> myCurrentIterator;
70  
71      /** The original query. */
72      private long myCursorId = 0;
73  
74      /** The name of the database the query was originally created on. */
75      private final String myDatabaseName;
76  
77      /**
78       * The maximum number of document to return from the cursor. Zero or
79       * negative means all.
80       */
81      private int myLimit = 0;
82  
83      /** The {@link Future} that will be updated with the next set of results. */
84      private FutureReplyCallback myNextReply;
85  
86      /** The read preference to subsequent requests. */
87      private final ReadPreference myReadPerference;
88  
89      /**
90       * Flag to shutdown this iterator gracefully without closing the cursor on
91       * the server.
92       */
93      private boolean myShutdown = false;
94  
95      /**
96       * Create a new MongoDBInterator.
97       * 
98       * @param originalQuery
99       *            The original query being iterated over.
100      * @param client
101      *            The client for issuing more requests.
102      * @param server
103      *            The server that received the original query request.
104      * @param reply
105      *            The initial results of the query that are available.
106      */
107     public MongoIteratorImpl(final CursorableMessage originalQuery,
108             final Client client, final String server, final Reply reply) {
109         myNextReply = new FutureReplyCallback();
110         myNextReply.callback(reply);
111 
112         myReadPerference = ReadPreference.server(server);
113         myCursorId = 0;
114         myClient = client;
115         myCurrentIterator = null;
116         myBatchSize = originalQuery.getBatchSize();
117         myLimit = originalQuery.getLimit();
118         myDatabaseName = originalQuery.getDatabaseName();
119         myCollectionName = originalQuery.getCollectionName();
120 
121     }
122 
123     /**
124      * Create a new MongoIteratorImpl from a cursor document.
125      * 
126      * @param client
127      *            The client interface to the server.
128      * @param cursorDocument
129      *            The original query.
130      * 
131      * @see MongoIteratorImpl#asDocument()
132      */
133     public MongoIteratorImpl(final Document cursorDocument, final Client client) {
134         final String ns = cursorDocument.get(StringElement.class,
135                 NAME_SPACE_FIELD).getValue();
136         String db = ns;
137         String collection = ns;
138         final int index = ns.indexOf('.');
139         if (0 < index) {
140             db = ns.substring(0, index);
141             collection = ns.substring(index + 1);
142         }
143 
144         myClient = client;
145         myDatabaseName = db;
146         myCollectionName = collection;
147         myCursorId = cursorDocument.get(NumericElement.class, CURSOR_ID_FIELD)
148                 .getLongValue();
149         myLimit = cursorDocument.get(NumericElement.class, LIMIT_FIELD)
150                 .getIntValue();
151         myBatchSize = cursorDocument
152                 .get(NumericElement.class, BATCH_SIZE_FIELD).getIntValue();
153         myReadPerference = ReadPreference.server(cursorDocument.get(
154                 StringElement.class, SERVER_FIELD).getValue());
155     }
156 
157     /**
158      * {@inheritDoc}
159      * <p>
160      * Overridden to return the active cursor in the defined format.
161      * </p>
162      * 
163      * @see ClientImpl#isCursorDocument(Document)
164      */
165     @Override
166     public Document asDocument() {
167         long cursorId = myCursorId;
168         final Future<Reply> replyFuture = myNextReply;
169 
170         cursorId = retreiveCursorIdFromPendingRequest(cursorId, replyFuture);
171 
172         if (cursorId != 0) {
173             final DocumentBuilder b = BuilderFactory.start();
174             b.add(NAME_SPACE_FIELD, myDatabaseName + "." + myCollectionName);
175             b.add(CURSOR_ID_FIELD, cursorId);
176             b.add(SERVER_FIELD, myReadPerference.getServer());
177             b.add(LIMIT_FIELD, myLimit);
178             b.add(BATCH_SIZE_FIELD, myBatchSize);
179 
180             return b.build();
181         }
182 
183         return null;
184     }
185 
186     /**
187      * {@inheritDoc}
188      * <p>
189      * Overridden to close the iterator and send a {@link KillCursors} for the
190      * open cursor, if any.
191      * </p>
192      */
193     @Override
194     public void close() {
195         long cursorId = myCursorId;
196         final Future<Reply> replyFuture = myNextReply;
197 
198         myCurrentIterator = null;
199         myNextReply = null;
200         myCursorId = 0;
201 
202         cursorId = retreiveCursorIdFromPendingRequest(cursorId, replyFuture);
203 
204         if ((cursorId != 0) && !myShutdown) {
205             // The user asked us to leave the cursor be.
206             myClient.send(new KillCursors(new long[] { cursorId },
207                     myReadPerference), null);
208         }
209     }
210 
211     /**
212      * {@inheritDoc}
213      * <p>
214      * Overridden to get the batch size from the original query or set
215      * explicitly.
216      * </p>
217      */
218     @Override
219     public int getBatchSize() {
220         return myBatchSize;
221     }
222 
223     /**
224      * Returns the iterator's read preference which points to the original
225      * server performing the query.
226      * 
227      * @return The iterator's read preference which points to the original
228      *         server performing the query.
229      */
230     public ReadPreference getReadPerference() {
231         return myReadPerference;
232     }
233 
234     /**
235      * {@inheritDoc}
236      * <p>
237      * Overridden to return true if there are more documents.
238      * </p>
239      */
240     @Override
241     public boolean hasNext() {
242         if (myCurrentIterator == null) {
243             loadDocuments();
244         }
245         else if (!myCurrentIterator.hasNext() && (myNextReply != null)) {
246             loadDocuments();
247         }
248         return myCurrentIterator.hasNext();
249     }
250 
251     /**
252      * {@inheritDoc}
253      * <p>
254      * Overridden to return this iterator.
255      * </p>
256      */
257     @Override
258     public Iterator<Document> iterator() {
259         return this;
260     }
261 
262     /**
263      * {@inheritDoc}
264      * <p>
265      * Overridden to return the next document from the query.
266      * </p>
267      * 
268      * @see java.util.Iterator#next()
269      */
270     @Override
271     public Document next() {
272         if (hasNext()) {
273             return myCurrentIterator.next();
274         }
275         throw new NoSuchElementException("No more documents.");
276     }
277 
278     /**
279      * Computes the size for the next batch of documents to get.
280      * 
281      * @return The returnNex
282      */
283     public int nextBatchSize() {
284         if ((0 < myLimit) && (myLimit <= myBatchSize)) {
285             return myLimit;
286         }
287         return myBatchSize;
288     }
289 
290     /**
291      * {@inheritDoc}
292      * <p>
293      * Overridden to throw and {@link UnsupportedOperationException}.
294      * </p>
295      * 
296      * @see java.util.Iterator#remove()
297      */
298     @Override
299     public void remove() {
300         throw new UnsupportedOperationException(
301                 "Cannot remove a document via a MongoDB iterator.");
302     }
303 
304     /**
305      * Restarts the iterator by sending a request for more documents.
306      * 
307      * @throws MongoDbException
308      *             On a failure to send the request for more document.
309      */
310     public void restart() throws MongoDbException {
311         sendRequest();
312     }
313 
314     /**
315      * {@inheritDoc}
316      * <p>
317      * Overridden to set the batch size.
318      * </p>
319      */
320     @Override
321     public void setBatchSize(final int batchSize) {
322         myBatchSize = batchSize;
323     }
324 
325     /**
326      * Stops the iterator after consuming any received and/or requested batches.
327      * <p>
328      * <b>WARNING</b>: This will leave the cursor open on the server. Users
329      * should persist the state of the cursor as returned from
330      * {@link #asDocument()} and restart the cursor using one of the
331      * {@link MongoClient#restart(com.allanbank.mongodb.bson.DocumentAssignable)}
332      * or
333      * {@link MongoClient#restart(com.allanbank.mongodb.StreamCallback, com.allanbank.mongodb.bson.DocumentAssignable)}
334      * methods. Use with extreme caution.
335      * </p>
336      * <p>
337      * The iterator will naturally stop ({@link #hasNext()} will return false)
338      * when the current batch and any already requested batches are finished.
339      * </p>
340      */
341     @Override
342     public void stop() {
343         myShutdown = true;
344     }
345 
346     /**
347      * {@inheritDoc}
348      * <p>
349      * Overridden to return the remaining elements as a array.
350      * </p>
351      */
352     @Override
353     public Object[] toArray() {
354         final List<Document> remaining = toList();
355 
356         return remaining.toArray();
357     }
358 
359     /**
360      * {@inheritDoc}
361      * <p>
362      * Overridden to return the remaining elements as a array.
363      * </p>
364      */
365     @Override
366     public <S> S[] toArray(final S[] to) {
367         final List<Document> remaining = toList();
368 
369         return remaining.toArray(to);
370     }
371 
372     /**
373      * {@inheritDoc}
374      * <p>
375      * Overridden to return the remaining elements as a list.
376      * </p>
377      */
378     @Override
379     public List<Document> toList() {
380         final List<Document> remaining = new ArrayList<Document>();
381 
382         while (hasNext()) {
383             remaining.add(next());
384         }
385 
386         return remaining;
387     }
388 
389     /**
390      * Returns the client value.
391      * 
392      * @return The client value.
393      */
394     protected Client getClient() {
395         return myClient;
396     }
397 
398     /**
399      * Returns the collection name.
400      * 
401      * @return The collection name.
402      */
403     protected String getCollectionName() {
404         return myCollectionName;
405     }
406 
407     /**
408      * Returns the cursor Id value.
409      * 
410      * @return The cursor Id value.
411      */
412     protected long getCursorId() {
413         return myCursorId;
414     }
415 
416     /**
417      * Returns the database name value.
418      * 
419      * @return The database name value.
420      */
421     protected String getDatabaseName() {
422         return myDatabaseName;
423     }
424 
425     /**
426      * Returns the limit value.
427      * 
428      * @return The limit value.
429      */
430     protected int getLimit() {
431         return myLimit;
432     }
433 
434     /**
435      * Loads more documents into the iterator. This iterator issues a get_more
436      * command as soon as the previous results start to be used.
437      * 
438      * @throws RuntimeException
439      *             On a failure to load documents.
440      */
441     protected void loadDocuments() throws RuntimeException {
442         loadDocuments(true);
443     }
444 
445     /**
446      * Loads more documents into the iterator. This iterator issues a get_more
447      * command as soon as the previous results start to be used.
448      * 
449      * @param blockForTailable
450      *            If true then the method will recursively call itself on a
451      *            tailable cursor with no results. This makes the call blocking.
452      *            It false then the call will not block. This is used by the
453      *            method to ensure that the outermost load blocks but the
454      *            recursion is not inifinite.
455      * @return The list of loaded documents.
456      * 
457      * @throws RuntimeException
458      *             On a failure to load documents.
459      */
460     protected List<Document> loadDocuments(final boolean blockForTailable)
461             throws RuntimeException {
462         List<Document> docs;
463         try {
464             // Pull the reply from the future. Hopefully it is already there!
465             final Reply reply = myNextReply.get();
466             if (reply.isCursorNotFound() || reply.isQueryFailed()) {
467                 final long cursorid = myCursorId;
468                 myCursorId = 0;
469                 throw new CursorNotFoundException(reply, "Cursor id ("
470                         + cursorid + ") not found by the MongoDB server.");
471             }
472 
473             myCursorId = reply.getCursorId();
474 
475             // Setup and iterator over the documents and adjust the limit
476             // for the documents we have. Do this before the fetch again
477             // so the nextBatchSize() has the updated limit.
478             docs = reply.getResults();
479             myCurrentIterator = docs.iterator();
480             if (0 < myLimit) {
481                 // Check if we have too many docs.
482                 if (myLimit <= docs.size()) {
483                     myCurrentIterator = docs.subList(0, myLimit).iterator();
484                     if (myCursorId != 0) {
485                         // Kill the cursor.
486                         myClient.send(new KillCursors(
487                                 new long[] { myCursorId }, myReadPerference),
488                                 null);
489                         myCursorId = 0;
490                     }
491                 }
492                 myLimit -= docs.size();
493             }
494 
495             // Pre-fetch the next set of documents while we iterate over the
496             // documents we just got.
497             if ((myCursorId != 0) && !myShutdown) {
498                 sendRequest();
499 
500                 // Include the (myNextReply != null) to catch failures on the
501                 // server.
502                 while (docs.isEmpty() && blockForTailable
503                         && (myNextReply != null)) {
504                     // Tailable - Wait for a reply with documents.
505                     docs = loadDocuments(false);
506                 }
507             }
508             else {
509                 // Exhausted the cursor or are shutting down - no more results.
510                 myNextReply = null;
511 
512                 // Don't need to kill the cursor since we exhausted it or are
513                 // shutting down.
514             }
515 
516         }
517         catch (final InterruptedException e) {
518             throw new RuntimeException(e);
519         }
520         catch (final ExecutionException e) {
521             throw new RuntimeException(e);
522         }
523 
524         return docs;
525     }
526 
527     /**
528      * If the current cursor id is zero then waits for the response from the
529      * pending request to determine the real cursor id.
530      * 
531      * @param cursorId
532      *            The presumed cursor id.
533      * @param replyFuture
534      *            The pending reply's future.
535      * @return The best known cursor id.
536      */
537     protected long retreiveCursorIdFromPendingRequest(final long cursorId,
538             final Future<Reply> replyFuture) {
539         // May not have processed any of the results yet...
540         if ((cursorId == 0) && (replyFuture != null)) {
541             try {
542                 final Reply reply = replyFuture.get();
543 
544                 return reply.getCursorId();
545             }
546             catch (final InterruptedException e) {
547                 LOG.warn(e, "Interrupted waiting for a query reply: {}",
548                         e.getMessage());
549             }
550             catch (final ExecutionException e) {
551                 LOG.warn(e, "Interrupted waiting for a query reply: {}",
552                         e.getMessage());
553             }
554         }
555         return cursorId;
556     }
557 
558     /**
559      * Sends a request for more documents.
560      * 
561      * @throws MongoDbException
562      *             On a failure to send the request for more document.
563      */
564     protected void sendRequest() throws MongoDbException {
565         final GetMore getMore = new GetMore(myDatabaseName, myCollectionName,
566                 myCursorId, nextBatchSize(), myReadPerference);
567 
568         myNextReply = new FutureReplyCallback();
569         myClient.send(getMore, myNextReply);
570     }
571 }