1 /*
2 * #%L
3 * MongoIteratorImpl.java - mongodb-async-driver - Allanbank Consulting, Inc.
4 * %%
5 * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6 * %%
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * #L%
19 */
20 package com.allanbank.mongodb.client;
21
22 import java.util.ArrayList;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.NoSuchElementException;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.Future;
28
29 import com.allanbank.mongodb.MongoClient;
30 import com.allanbank.mongodb.MongoDbException;
31 import com.allanbank.mongodb.MongoIterator;
32 import com.allanbank.mongodb.ReadPreference;
33 import com.allanbank.mongodb.bson.Document;
34 import com.allanbank.mongodb.bson.NumericElement;
35 import com.allanbank.mongodb.bson.builder.BuilderFactory;
36 import com.allanbank.mongodb.bson.builder.DocumentBuilder;
37 import com.allanbank.mongodb.bson.element.StringElement;
38 import com.allanbank.mongodb.client.callback.FutureReplyCallback;
39 import com.allanbank.mongodb.client.message.CursorableMessage;
40 import com.allanbank.mongodb.client.message.GetMore;
41 import com.allanbank.mongodb.client.message.KillCursors;
42 import com.allanbank.mongodb.client.message.Reply;
43 import com.allanbank.mongodb.error.CursorNotFoundException;
44 import com.allanbank.mongodb.util.log.Log;
45 import com.allanbank.mongodb.util.log.LogFactory;
46
47 /**
48 * Iterator over the results of the MongoDB cursor.
49 *
50 * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
51 * mutated in incompatible ways between any two releases of the driver.
52 * @copyright 2011-2014, Allanbank Consulting, Inc., All Rights Reserved
53 */
54 public class MongoIteratorImpl implements MongoIterator<Document> {
55
56 /** The log for the iterator. */
57 private static final Log LOG = LogFactory.getLog(MongoIteratorImpl.class);
58
59 /** The size of batches that are requested from the servers. */
60 private int myBatchSize = 0;
61
62 /** The client for sending get_more requests to the server. */
63 private final Client myClient;
64
65 /** The name of the collection the query was originally created on. */
66 private final String myCollectionName;
67
68 /** The iterator over the current set of documents. */
69 private Iterator<Document> myCurrentIterator;
70
71 /** The original query. */
72 private long myCursorId = 0;
73
74 /** The name of the database the query was originally created on. */
75 private final String myDatabaseName;
76
77 /**
78 * The maximum number of document to return from the cursor. Zero or
79 * negative means all.
80 */
81 private int myLimit = 0;
82
83 /** The {@link Future} that will be updated with the next set of results. */
84 private FutureReplyCallback myNextReply;
85
86 /** The read preference to subsequent requests. */
87 private final ReadPreference myReadPerference;
88
89 /**
90 * Flag to shutdown this iterator gracefully without closing the cursor on
91 * the server.
92 */
93 private boolean myShutdown = false;
94
95 /**
96 * Create a new MongoDBInterator.
97 *
98 * @param originalQuery
99 * The original query being iterated over.
100 * @param client
101 * The client for issuing more requests.
102 * @param server
103 * The server that received the original query request.
104 * @param reply
105 * The initial results of the query that are available.
106 */
107 public MongoIteratorImpl(final CursorableMessage originalQuery,
108 final Client client, final String server, final Reply reply) {
109 myNextReply = new FutureReplyCallback();
110 myNextReply.callback(reply);
111
112 myReadPerference = ReadPreference.server(server);
113 myCursorId = 0;
114 myClient = client;
115 myCurrentIterator = null;
116 myBatchSize = originalQuery.getBatchSize();
117 myLimit = originalQuery.getLimit();
118 myDatabaseName = originalQuery.getDatabaseName();
119 myCollectionName = originalQuery.getCollectionName();
120
121 }
122
123 /**
124 * Create a new MongoIteratorImpl from a cursor document.
125 *
126 * @param client
127 * The client interface to the server.
128 * @param cursorDocument
129 * The original query.
130 *
131 * @see MongoIteratorImpl#asDocument()
132 */
133 public MongoIteratorImpl(final Document cursorDocument, final Client client) {
134 final String ns = cursorDocument.get(StringElement.class,
135 NAME_SPACE_FIELD).getValue();
136 String db = ns;
137 String collection = ns;
138 final int index = ns.indexOf('.');
139 if (0 < index) {
140 db = ns.substring(0, index);
141 collection = ns.substring(index + 1);
142 }
143
144 myClient = client;
145 myDatabaseName = db;
146 myCollectionName = collection;
147 myCursorId = cursorDocument.get(NumericElement.class, CURSOR_ID_FIELD)
148 .getLongValue();
149 myLimit = cursorDocument.get(NumericElement.class, LIMIT_FIELD)
150 .getIntValue();
151 myBatchSize = cursorDocument
152 .get(NumericElement.class, BATCH_SIZE_FIELD).getIntValue();
153 myReadPerference = ReadPreference.server(cursorDocument.get(
154 StringElement.class, SERVER_FIELD).getValue());
155 }
156
157 /**
158 * {@inheritDoc}
159 * <p>
160 * Overridden to return the active cursor in the defined format.
161 * </p>
162 *
163 * @see ClientImpl#isCursorDocument(Document)
164 */
165 @Override
166 public Document asDocument() {
167 long cursorId = myCursorId;
168 final Future<Reply> replyFuture = myNextReply;
169
170 cursorId = retreiveCursorIdFromPendingRequest(cursorId, replyFuture);
171
172 if (cursorId != 0) {
173 final DocumentBuilder b = BuilderFactory.start();
174 b.add(NAME_SPACE_FIELD, myDatabaseName + "." + myCollectionName);
175 b.add(CURSOR_ID_FIELD, cursorId);
176 b.add(SERVER_FIELD, myReadPerference.getServer());
177 b.add(LIMIT_FIELD, myLimit);
178 b.add(BATCH_SIZE_FIELD, myBatchSize);
179
180 return b.build();
181 }
182
183 return null;
184 }
185
186 /**
187 * {@inheritDoc}
188 * <p>
189 * Overridden to close the iterator and send a {@link KillCursors} for the
190 * open cursor, if any.
191 * </p>
192 */
193 @Override
194 public void close() {
195 long cursorId = myCursorId;
196 final Future<Reply> replyFuture = myNextReply;
197
198 myCurrentIterator = null;
199 myNextReply = null;
200 myCursorId = 0;
201
202 cursorId = retreiveCursorIdFromPendingRequest(cursorId, replyFuture);
203
204 if ((cursorId != 0) && !myShutdown) {
205 // The user asked us to leave the cursor be.
206 myClient.send(new KillCursors(new long[] { cursorId },
207 myReadPerference), null);
208 }
209 }
210
211 /**
212 * {@inheritDoc}
213 * <p>
214 * Overridden to get the batch size from the original query or set
215 * explicitly.
216 * </p>
217 */
218 @Override
219 public int getBatchSize() {
220 return myBatchSize;
221 }
222
223 /**
224 * Returns the iterator's read preference which points to the original
225 * server performing the query.
226 *
227 * @return The iterator's read preference which points to the original
228 * server performing the query.
229 */
230 public ReadPreference getReadPerference() {
231 return myReadPerference;
232 }
233
234 /**
235 * {@inheritDoc}
236 * <p>
237 * Overridden to return true if there are more documents.
238 * </p>
239 */
240 @Override
241 public boolean hasNext() {
242 if (myCurrentIterator == null) {
243 loadDocuments();
244 }
245 else if (!myCurrentIterator.hasNext() && (myNextReply != null)) {
246 loadDocuments();
247 }
248 return myCurrentIterator.hasNext();
249 }
250
251 /**
252 * {@inheritDoc}
253 * <p>
254 * Overridden to return this iterator.
255 * </p>
256 */
257 @Override
258 public Iterator<Document> iterator() {
259 return this;
260 }
261
262 /**
263 * {@inheritDoc}
264 * <p>
265 * Overridden to return the next document from the query.
266 * </p>
267 *
268 * @see java.util.Iterator#next()
269 */
270 @Override
271 public Document next() {
272 if (hasNext()) {
273 return myCurrentIterator.next();
274 }
275 throw new NoSuchElementException("No more documents.");
276 }
277
278 /**
279 * Computes the size for the next batch of documents to get.
280 *
281 * @return The returnNex
282 */
283 public int nextBatchSize() {
284 if ((0 < myLimit) && (myLimit <= myBatchSize)) {
285 return myLimit;
286 }
287 return myBatchSize;
288 }
289
290 /**
291 * {@inheritDoc}
292 * <p>
293 * Overridden to throw and {@link UnsupportedOperationException}.
294 * </p>
295 *
296 * @see java.util.Iterator#remove()
297 */
298 @Override
299 public void remove() {
300 throw new UnsupportedOperationException(
301 "Cannot remove a document via a MongoDB iterator.");
302 }
303
304 /**
305 * Restarts the iterator by sending a request for more documents.
306 *
307 * @throws MongoDbException
308 * On a failure to send the request for more document.
309 */
310 public void restart() throws MongoDbException {
311 sendRequest();
312 }
313
314 /**
315 * {@inheritDoc}
316 * <p>
317 * Overridden to set the batch size.
318 * </p>
319 */
320 @Override
321 public void setBatchSize(final int batchSize) {
322 myBatchSize = batchSize;
323 }
324
325 /**
326 * Stops the iterator after consuming any received and/or requested batches.
327 * <p>
328 * <b>WARNING</b>: This will leave the cursor open on the server. Users
329 * should persist the state of the cursor as returned from
330 * {@link #asDocument()} and restart the cursor using one of the
331 * {@link MongoClient#restart(com.allanbank.mongodb.bson.DocumentAssignable)}
332 * or
333 * {@link MongoClient#restart(com.allanbank.mongodb.StreamCallback, com.allanbank.mongodb.bson.DocumentAssignable)}
334 * methods. Use with extreme caution.
335 * </p>
336 * <p>
337 * The iterator will naturally stop ({@link #hasNext()} will return false)
338 * when the current batch and any already requested batches are finished.
339 * </p>
340 */
341 @Override
342 public void stop() {
343 myShutdown = true;
344 }
345
346 /**
347 * {@inheritDoc}
348 * <p>
349 * Overridden to return the remaining elements as a array.
350 * </p>
351 */
352 @Override
353 public Object[] toArray() {
354 final List<Document> remaining = toList();
355
356 return remaining.toArray();
357 }
358
359 /**
360 * {@inheritDoc}
361 * <p>
362 * Overridden to return the remaining elements as a array.
363 * </p>
364 */
365 @Override
366 public <S> S[] toArray(final S[] to) {
367 final List<Document> remaining = toList();
368
369 return remaining.toArray(to);
370 }
371
372 /**
373 * {@inheritDoc}
374 * <p>
375 * Overridden to return the remaining elements as a list.
376 * </p>
377 */
378 @Override
379 public List<Document> toList() {
380 final List<Document> remaining = new ArrayList<Document>();
381
382 while (hasNext()) {
383 remaining.add(next());
384 }
385
386 return remaining;
387 }
388
389 /**
390 * Returns the client value.
391 *
392 * @return The client value.
393 */
394 protected Client getClient() {
395 return myClient;
396 }
397
398 /**
399 * Returns the collection name.
400 *
401 * @return The collection name.
402 */
403 protected String getCollectionName() {
404 return myCollectionName;
405 }
406
407 /**
408 * Returns the cursor Id value.
409 *
410 * @return The cursor Id value.
411 */
412 protected long getCursorId() {
413 return myCursorId;
414 }
415
416 /**
417 * Returns the database name value.
418 *
419 * @return The database name value.
420 */
421 protected String getDatabaseName() {
422 return myDatabaseName;
423 }
424
425 /**
426 * Returns the limit value.
427 *
428 * @return The limit value.
429 */
430 protected int getLimit() {
431 return myLimit;
432 }
433
434 /**
435 * Loads more documents into the iterator. This iterator issues a get_more
436 * command as soon as the previous results start to be used.
437 *
438 * @throws RuntimeException
439 * On a failure to load documents.
440 */
441 protected void loadDocuments() throws RuntimeException {
442 loadDocuments(true);
443 }
444
445 /**
446 * Loads more documents into the iterator. This iterator issues a get_more
447 * command as soon as the previous results start to be used.
448 *
449 * @param blockForTailable
450 * If true then the method will recursively call itself on a
451 * tailable cursor with no results. This makes the call blocking.
452 * It false then the call will not block. This is used by the
453 * method to ensure that the outermost load blocks but the
454 * recursion is not inifinite.
455 * @return The list of loaded documents.
456 *
457 * @throws RuntimeException
458 * On a failure to load documents.
459 */
460 protected List<Document> loadDocuments(final boolean blockForTailable)
461 throws RuntimeException {
462 List<Document> docs;
463 try {
464 // Pull the reply from the future. Hopefully it is already there!
465 final Reply reply = myNextReply.get();
466 if (reply.isCursorNotFound() || reply.isQueryFailed()) {
467 final long cursorid = myCursorId;
468 myCursorId = 0;
469 throw new CursorNotFoundException(reply, "Cursor id ("
470 + cursorid + ") not found by the MongoDB server.");
471 }
472
473 myCursorId = reply.getCursorId();
474
475 // Setup and iterator over the documents and adjust the limit
476 // for the documents we have. Do this before the fetch again
477 // so the nextBatchSize() has the updated limit.
478 docs = reply.getResults();
479 myCurrentIterator = docs.iterator();
480 if (0 < myLimit) {
481 // Check if we have too many docs.
482 if (myLimit <= docs.size()) {
483 myCurrentIterator = docs.subList(0, myLimit).iterator();
484 if (myCursorId != 0) {
485 // Kill the cursor.
486 myClient.send(new KillCursors(
487 new long[] { myCursorId }, myReadPerference),
488 null);
489 myCursorId = 0;
490 }
491 }
492 myLimit -= docs.size();
493 }
494
495 // Pre-fetch the next set of documents while we iterate over the
496 // documents we just got.
497 if ((myCursorId != 0) && !myShutdown) {
498 sendRequest();
499
500 // Include the (myNextReply != null) to catch failures on the
501 // server.
502 while (docs.isEmpty() && blockForTailable
503 && (myNextReply != null)) {
504 // Tailable - Wait for a reply with documents.
505 docs = loadDocuments(false);
506 }
507 }
508 else {
509 // Exhausted the cursor or are shutting down - no more results.
510 myNextReply = null;
511
512 // Don't need to kill the cursor since we exhausted it or are
513 // shutting down.
514 }
515
516 }
517 catch (final InterruptedException e) {
518 throw new RuntimeException(e);
519 }
520 catch (final ExecutionException e) {
521 throw new RuntimeException(e);
522 }
523
524 return docs;
525 }
526
527 /**
528 * If the current cursor id is zero then waits for the response from the
529 * pending request to determine the real cursor id.
530 *
531 * @param cursorId
532 * The presumed cursor id.
533 * @param replyFuture
534 * The pending reply's future.
535 * @return The best known cursor id.
536 */
537 protected long retreiveCursorIdFromPendingRequest(final long cursorId,
538 final Future<Reply> replyFuture) {
539 // May not have processed any of the results yet...
540 if ((cursorId == 0) && (replyFuture != null)) {
541 try {
542 final Reply reply = replyFuture.get();
543
544 return reply.getCursorId();
545 }
546 catch (final InterruptedException e) {
547 LOG.warn(e, "Interrupted waiting for a query reply: {}",
548 e.getMessage());
549 }
550 catch (final ExecutionException e) {
551 LOG.warn(e, "Interrupted waiting for a query reply: {}",
552 e.getMessage());
553 }
554 }
555 return cursorId;
556 }
557
558 /**
559 * Sends a request for more documents.
560 *
561 * @throws MongoDbException
562 * On a failure to send the request for more document.
563 */
564 protected void sendRequest() throws MongoDbException {
565 final GetMore getMore = new GetMore(myDatabaseName, myCollectionName,
566 myCursorId, nextBatchSize(), myReadPerference);
567
568 myNextReply = new FutureReplyCallback();
569 myClient.send(getMore, myNextReply);
570 }
571 }