View Javadoc
1   /*
2    * #%L
3    * CursorStreamingCallback.java - mongodb-async-driver - Allanbank Consulting, Inc.
4    * %%
5    * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6    * %%
7    * Licensed under the Apache License, Version 2.0 (the "License");
8    * you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   * 
11   *      http://www.apache.org/licenses/LICENSE-2.0
12   * 
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   * #L%
19   */
20  
21  package com.allanbank.mongodb.client.callback;
22  
23  import java.util.List;
24  
25  import com.allanbank.mongodb.MongoCursorControl;
26  import com.allanbank.mongodb.MongoDbException;
27  import com.allanbank.mongodb.ReadPreference;
28  import com.allanbank.mongodb.StreamCallback;
29  import com.allanbank.mongodb.bson.Document;
30  import com.allanbank.mongodb.bson.NumericElement;
31  import com.allanbank.mongodb.bson.builder.BuilderFactory;
32  import com.allanbank.mongodb.bson.builder.DocumentBuilder;
33  import com.allanbank.mongodb.bson.element.StringElement;
34  import com.allanbank.mongodb.client.Client;
35  import com.allanbank.mongodb.client.MongoIteratorImpl;
36  import com.allanbank.mongodb.client.message.CursorableMessage;
37  import com.allanbank.mongodb.client.message.GetMore;
38  import com.allanbank.mongodb.client.message.KillCursors;
39  import com.allanbank.mongodb.client.message.Query;
40  import com.allanbank.mongodb.client.message.Reply;
41  import com.allanbank.mongodb.error.ReplyException;
42  
43  /**
44   * Callback to convert a {@link CursorableMessage} {@link Reply} into a series
45   * of callbacks for each document received.
46   * 
47   * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
48   *         mutated in incompatible ways between any two releases of the driver.
49   * @copyright 2012-2014, Allanbank Consulting, Inc., All Rights Reserved
50   */
51  public final class CursorStreamingCallback extends
52          AbstractValidatingReplyCallback implements MongoCursorControl,
53          AddressAware {
54  
55      /** The server the original request was sent to. */
56      private volatile String myAddress;
57  
58      /** The requested batch size. */
59      private int myBatchSize;
60  
61      /** The original query. */
62      private final Client myClient;
63  
64      /**
65       * Flag to indictate that the stream has been closed.
66       */
67      private volatile boolean myClosed = false;
68  
69      /** The name of the collection the query was originally created on. */
70      private final String myCollectionName;
71  
72      /** If true then the callback should expect a command formated cursor reply. */
73      private boolean myCommand;
74  
75      /** The original query. */
76      private long myCursorId = 0;
77  
78      /** The name of the database the query was originally created on. */
79      private final String myDatabaseName;
80  
81      /** The callback to forward the returned documents to. */
82      private final StreamCallback<Document> myForwardCallback;
83  
84      /**
85       * The maximum number of document to return from the cursor. Zero or
86       * negative means all.
87       */
88      private int myLimit = 0;
89  
90      /** The original message that started the cursor, if known. */
91      private final CursorableMessage myMessage;
92  
93      /** The last reply. */
94      private volatile Reply myReply;
95  
96      /**
97       * Flag to shutdown this iterator gracefully without closing the cursor on
98       * the server.
99       */
100     private boolean myShutdown = false;
101 
102     /**
103      * Create a new CursorCallback.
104      * 
105      * @param client
106      *            The client interface to the server.
107      * @param originalMessage
108      *            The original message.
109      * @param command
110      *            If true then the callback should expect a command formated
111      *            cursor reply.
112      * @param results
113      *            The callback to update with each document.
114      */
115     public CursorStreamingCallback(final Client client,
116             final CursorableMessage originalMessage, final boolean command,
117             final StreamCallback<Document> results) {
118 
119         myClient = client;
120         myDatabaseName = originalMessage.getDatabaseName();
121         myCollectionName = originalMessage.getCollectionName();
122         myBatchSize = originalMessage.getBatchSize();
123         myMessage = originalMessage;
124         myCommand = command;
125         myForwardCallback = results;
126         myLimit = originalMessage.getLimit();
127     }
128 
129     /**
130      * Create a new CursorCallback from a cursor document.
131      * 
132      * @param client
133      *            The client interface to the server.
134      * @param cursorDocument
135      *            The original query.
136      * @param results
137      *            The callback to update with each document.
138      * 
139      * @see MongoIteratorImpl#asDocument()
140      */
141     public CursorStreamingCallback(final Client client,
142             final Document cursorDocument,
143             final StreamCallback<Document> results) {
144 
145         final String ns = cursorDocument.get(StringElement.class,
146                 NAME_SPACE_FIELD).getValue();
147         String db = ns;
148         String collection = ns;
149         final int index = ns.indexOf('.');
150         if (0 < index) {
151             db = ns.substring(0, index);
152             collection = ns.substring(index + 1);
153         }
154 
155         myMessage = null;
156         myCommand = false;
157         myClient = client;
158         myDatabaseName = db;
159         myCollectionName = collection;
160         myForwardCallback = results;
161         myCursorId = cursorDocument.get(NumericElement.class, CURSOR_ID_FIELD)
162                 .getLongValue();
163         myLimit = cursorDocument.get(NumericElement.class, LIMIT_FIELD)
164                 .getIntValue();
165         myBatchSize = cursorDocument
166                 .get(NumericElement.class, BATCH_SIZE_FIELD).getIntValue();
167         myAddress = cursorDocument.get(StringElement.class, SERVER_FIELD)
168                 .getValue();
169     }
170 
171     /**
172      * {@inheritDoc}
173      * <p>
174      * Overridden to return the current state of the stream as a document.
175      * </p>
176      */
177     @Override
178     public Document asDocument() {
179         final long cursorId = myCursorId;
180 
181         if (cursorId != 0) {
182             final DocumentBuilder b = BuilderFactory.start();
183             b.add(NAME_SPACE_FIELD, myDatabaseName + "." + myCollectionName);
184             b.add(CURSOR_ID_FIELD, cursorId);
185             b.add(SERVER_FIELD, myAddress);
186             b.add(LIMIT_FIELD, myLimit);
187             b.add(BATCH_SIZE_FIELD, myBatchSize);
188 
189             return b.build();
190         }
191         return null;
192     }
193 
194     /**
195      * Overridden to close the iterator and send a {@link KillCursors} for the
196      * open cursor, if any.
197      */
198     @Override
199     public void close() {
200         synchronized (myForwardCallback) {
201             myClosed = true;
202             sendKill();
203         }
204     }
205 
206     /**
207      * {@inheritDoc}
208      * <p>
209      * Overridden to forward the error the the user.
210      * </p>
211      */
212     @Override
213     public void exception(final Throwable thrown) {
214         try {
215             synchronized (myForwardCallback) {
216                 myForwardCallback.exception(thrown);
217             }
218         }
219         finally {
220             close();
221         }
222     }
223 
224     /**
225      * Returns the server the original request was sent to.
226      * 
227      * @return The server the original request was sent to.
228      */
229     public String getAddress() {
230         return myAddress;
231     }
232 
233     /**
234      * {@inheritDoc}
235      * <p>
236      * Overridden to set the batch size.
237      * </p>
238      */
239     @Override
240     public int getBatchSize() {
241         return myBatchSize;
242     }
243 
244     /**
245      * Returns the client value.
246      * 
247      * @return The client value.
248      */
249     public Client getClient() {
250         return myClient;
251     }
252 
253     /**
254      * Returns the collection name.
255      * 
256      * @return The collection name.
257      */
258     public String getCollectionName() {
259         return myCollectionName;
260     }
261 
262     /**
263      * Returns the cursor Id value.
264      * 
265      * @return The cursor Id value.
266      */
267     public long getCursorId() {
268         return myCursorId;
269     }
270 
271     /**
272      * Returns the database name value.
273      * 
274      * @return The database name value.
275      */
276     public String getDatabaseName() {
277         return myDatabaseName;
278     }
279 
280     /**
281      * Returns the limit value.
282      * 
283      * @return The limit value.
284      */
285     public int getLimit() {
286         return myLimit;
287     }
288 
289     /**
290      * {@inheritDoc}
291      * <p>
292      * Overridden to return false.
293      * </p>
294      */
295     @Override
296     public boolean isLightWeight() {
297         return false;
298     }
299 
300     /**
301      * Restarts the stream by sending a request for the next batch of documents.
302      * 
303      * @throws MongoDbException
304      *             On a failure to send the request for more document.
305      */
306     public void restart() {
307         sendRequest();
308     }
309 
310     /**
311      * Sets the value of the server the original request was sent to.
312      * 
313      * @param address
314      *            The new value for the server the original request was sent to.
315      */
316     @Override
317     public void setAddress(final String address) {
318         myAddress = address;
319         // For races make sure that the push has the server name.
320         if (myReply != null) {
321             final Reply reply = myReply;
322             myReply = null;
323             push(reply);
324         }
325     }
326 
327     /**
328      * {@inheritDoc}
329      * <p>
330      * Overridden to get the batch size.
331      * </p>
332      */
333     @Override
334     public void setBatchSize(final int batchSize) {
335         myBatchSize = batchSize;
336     }
337 
338     /**
339      * {@inheritDoc}
340      * <p>
341      * Overridden to stop requesting more batches of documents.
342      * </p>
343      */
344     @Override
345     public void stop() {
346         myShutdown = true;
347     }
348 
349     /**
350      * {@inheritDoc}
351      * <p>
352      * Overridden to add the {@link Query} to the exception.
353      * </p>
354      * 
355      * @see AbstractReplyCallback#asError(Reply, int, int, String)
356      */
357     @Override
358     protected MongoDbException asError(final Reply reply, final int okValue,
359             final int errorNumber, final String errorMessage) {
360         return new ReplyException(okValue, errorNumber, errorMessage,
361                 myMessage, reply);
362     }
363 
364     /**
365      * {@inheritDoc}
366      * <p>
367      * Overridden to push the documents to the application's callback.
368      * </p>
369      * 
370      * @see AbstractReplyCallback#convert(Reply)
371      */
372     @Override
373     protected void handle(final Reply reply) throws MongoDbException {
374         // Handle the first reply being from a command.
375         Reply result = reply;
376         if (isCommand()) {
377             result = CommandCursorTranslator.translate(reply);
378 
379             // But only the first reply...
380             myCommand = false;
381         }
382 
383         myReply = result;
384         if (myAddress != null) {
385             push(result);
386         }
387     }
388 
389     /**
390      * Returns true if the callback should expect a command formated cursor
391      * reply.
392      * 
393      * @return True if the callback should expect a command formated cursor
394      *         reply.
395      */
396     protected boolean isCommand() {
397         return myCommand;
398     }
399 
400     /**
401      * Loads more documents. This issues a get_more command as soon as the
402      * previous results start to be used.
403      * 
404      * @param reply
405      *            The last reply received.
406      * @return The list of loaded documents.
407      * 
408      * @throws RuntimeException
409      *             On a failure to load documents.
410      */
411     protected List<Document> loadDocuments(final Reply reply)
412             throws RuntimeException {
413 
414         myCursorId = reply.getCursorId();
415 
416         // Setup the documents and adjust the limit for the documents we have.
417         // Do this before the fetch again so the nextBatchSize() has the updated
418         // limit.
419         List<Document> docs = reply.getResults();
420         if (0 < myLimit) {
421             // Check if we have too many docs.
422             if (myLimit <= docs.size()) {
423                 docs = docs.subList(0, myLimit);
424                 close();
425             }
426             myLimit -= docs.size();
427         }
428 
429         // Pre-fetch the next set of documents while we iterate over the
430         // documents we just got.
431         if ((myCursorId != 0) && !myShutdown) {
432             sendRequest();
433         }
434         // Exhausted the cursor - no more results.
435         // Don't need to kill the cursor since we exhausted it.
436 
437         return docs;
438     }
439 
440     /**
441      * Computes the size for the next batch of documents to get.
442      * 
443      * @return The returnNex
444      */
445     protected int nextBatchSize() {
446         if ((0 < myLimit) && (myLimit <= myBatchSize)) {
447             return myLimit;
448         }
449         return myBatchSize;
450     }
451 
452     /**
453      * Sends a {@link KillCursors} message if there is an active cursor.
454      * 
455      * @throws MongoDbException
456      *             On a failure to send the {@link KillCursors} message.
457      */
458     protected void sendKill() throws MongoDbException {
459         final long cursorId = myCursorId;
460         if ((cursorId != 0) && !myShutdown) {
461             myCursorId = 0;
462             myClient.send(new KillCursors(new long[] { cursorId },
463                     ReadPreference.server(myAddress)), null);
464         }
465     }
466 
467     /**
468      * Sends a request to start the next match of documents.
469      * 
470      * @throws MongoDbException
471      *             On a failure to send the request.
472      */
473     protected void sendRequest() throws MongoDbException {
474         final GetMore getMore = new GetMore(myDatabaseName, myCollectionName,
475                 myCursorId, nextBatchSize(), ReadPreference.server(myAddress));
476 
477         myClient.send(getMore, this);
478     }
479 
480     /**
481      * Pushes the results from the reply to the application's callback.
482      * 
483      * @param reply
484      *            The reply containing the results to push to the application's
485      *            callback.
486      */
487     private void push(final Reply reply) {
488         // Request the load in the synchronized block so there is only 1
489         // outstanding request.
490         synchronized (myForwardCallback) {
491             if (myClosed) {
492                 myCursorId = reply.getCursorId();
493                 sendKill();
494             }
495             else {
496                 try {
497                     for (final Document document : loadDocuments(reply)) {
498                         myForwardCallback.callback(document);
499                     }
500                     if (myCursorId == 0) {
501                         // Signal the end of the results.
502                         myForwardCallback.done();
503                     }
504                 }
505                 catch (final RuntimeException re) {
506                     exception(re);
507                     close();
508                 }
509             }
510         }
511     }
512 }