Restarting Iterators and Streams

Normally, users will create an iterator or start a stream of documents based on a Find executed on a MongoCollection and consume the complete results of the Find within the lifetime of a single application. In some cases, it is useful to either be able to migrate the processing of the results to a new application instance or to stop processing the results and restart processing those results later. In support of these use cases the driver supports the concept of restarting an iterator or stream.

To restart an iterator or stream the user has to perform the following steps:

  1. Execute a query on a collection to create a MongoIterator or MongoCursorControl object.
  2. Stop the requests for more documents via the iterator or stream and consume all documents already requested from the server.
  3. Persist the state of the iterator or stream.
  4. Restart the iterator or stream from the persisted state.

Executing the query

No special calls are needed to create a restartable MongoIterator or stream. The MongoCursorControl interface defines all of the methods required to suspend the current iteration or stream and capture the state. The MongoIterator interface extends the MongoCursorControl. All of the streamingFind(...) methods of the MongoCollection return a MongoCursorControl reference to control the state of the stream.

MongoCollection collection = ...;

Find.Builder findCriteria = new Find.Builder( where("_id").lessThan(42) ).tailable();

MongoIterator<Document> iter = collection.find( findCriteria );

// or

StreamCallback<Document> streamCallback = ...;
MongoCursorControl streamControl = collection.streamingFind( streamCallback, findCriteria );

There are a few settings on the Find object that are of particular use when planning to restart a stream or iterator.

  • Find.Builder.tailable()

    A tailable cursor can be used with a capped collection to capture new documents that are inserted. A tailable cursor can continue to return documents indefinitely.

  • Find.Builder.setAwaitData(boolean)

    The tailable() method will automatically set the cursor created to await data before returning. This causes the connection being used to not be able to process other requests until data is available or the request times out (typically 2 seconds). The advantage is that in case there is no additional data available the await data avoids the iterator or stream entering a busy spin loop constantly requesting more data.

  • Find.Builder.setImmortalCursor(boolean)

    Of particular use with iterator and stream restarts is setting the MongoDB server side cursor to not timeout. Normally, the server will delete any inactive cursors after a period of time. By setting the cursor to be immortal or not timeout, the auto-delete feature can be disabled at the expense of wasted memory on the server if the cursor is forgotten.

  • Find.Builder.setBatchSize(int)

    The batch size controls how many documents are retrieved from the server at a time. Once a client is sent a group of documents they will not be returned from the server again for the same cursor. Setting a small batch size limits the number of potentially missed documents in case of a failure at the expense of some performance. This is mostly mitigated by the driver's asynchronous nature that allows it to be processing one batch while the request for the next batch is being processed by the server.

Stopping the Requests for More Documents and Finish Consuming Documents

Once an iterator or stream has been started it is important to gracefully stop the flow of documents to the client to ensure that all documents in the results are processed. The MongoCursorControl.stop() method will halt any further requests for documents but allow the document already requested from the server to continue to be processed. At the end of the iteration or stream the cursor's state on the server (including any limit) will be in a well defined state.

MongoIterator<Document> iter = ...;
iter.stop();
// Process the already requested set of documents.
while( iter.hasNext() ) {
  Document doc = iter.next();
}

// or

StreamCallback streamCallback = ...;
MongoCursorControl streamControl = collection.streamingFind( streamCallback, ... );
streamControl.stop();
// The streamCallback.done() method will be called once the already requested set of documents has been exhausted.

Obtaining the State of the Cursor

The MongoCursorControl interface defines the asDocument() method to return the complete state of the stream or iterator. The document can be persisted as JSON using the Json class or simply saved within MongoDB. The document contains several fields that must all be present for the restart to be successful. The order of the fields is not important.

MongoIterator<Document> iter = ...;
Document iteratorState = iter.asDocument();

// or

MongoCursorControl streamControl = ...;
Document streamState = streamControl.asDocument();

Restarting the Iterator or Stream

With the cursor's state document the MongoClient.restart(...) methods can be used to reestablish the iterator or stream. As seen in the code block below, there is no requirement to reestablish an iterator from an iterators state or a stream only from a stream's state. An iterator's state can also be used to establish a new stream or a stream's state used for a iterator. There is also no requirement that the cursor's state be used from the same MongoClient instance, JVM or even machine. The new client will, of course, need to be able to connect to the MongoDB server maintaining the server side cursor state.

MongoClient mongoClient = ...

MongoIterator<Document> iter = ...;
Document iteratorState = iter.asDocument();
StreamCallback streamCallback = ...;
MongoCursorControl restartStreamControl = mongoClient.restart( streamCallback, iteratorState );

// or

MongoCursorControl streamControl = ...;
Document streamState = streamControl.asDocument();
MongoIterator restartIterator = mongoClient.restart( streamState );