Coverage Report - com.allanbank.mongodb.client.callback.CursorStreamingCallback
 
Classes in this File Line Coverage Branch Coverage Complexity
CursorStreamingCallback
97%
117/120
96%
31/32
1.76
 
 1  
 /*
 2  
  * #%L
 3  
  * CursorStreamingCallback.java - mongodb-async-driver - Allanbank Consulting, Inc.
 4  
  * %%
 5  
  * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
 6  
  * %%
 7  
  * Licensed under the Apache License, Version 2.0 (the "License");
 8  
  * you may not use this file except in compliance with the License.
 9  
  * You may obtain a copy of the License at
 10  
  * 
 11  
  *      http://www.apache.org/licenses/LICENSE-2.0
 12  
  * 
 13  
  * Unless required by applicable law or agreed to in writing, software
 14  
  * distributed under the License is distributed on an "AS IS" BASIS,
 15  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 16  
  * See the License for the specific language governing permissions and
 17  
  * limitations under the License.
 18  
  * #L%
 19  
  */
 20  
 
 21  
 package com.allanbank.mongodb.client.callback;
 22  
 
 23  
 import java.util.List;
 24  
 
 25  
 import com.allanbank.mongodb.MongoCursorControl;
 26  
 import com.allanbank.mongodb.MongoDbException;
 27  
 import com.allanbank.mongodb.ReadPreference;
 28  
 import com.allanbank.mongodb.StreamCallback;
 29  
 import com.allanbank.mongodb.bson.Document;
 30  
 import com.allanbank.mongodb.bson.NumericElement;
 31  
 import com.allanbank.mongodb.bson.builder.BuilderFactory;
 32  
 import com.allanbank.mongodb.bson.builder.DocumentBuilder;
 33  
 import com.allanbank.mongodb.bson.element.StringElement;
 34  
 import com.allanbank.mongodb.client.Client;
 35  
 import com.allanbank.mongodb.client.MongoIteratorImpl;
 36  
 import com.allanbank.mongodb.client.message.CursorableMessage;
 37  
 import com.allanbank.mongodb.client.message.GetMore;
 38  
 import com.allanbank.mongodb.client.message.KillCursors;
 39  
 import com.allanbank.mongodb.client.message.Query;
 40  
 import com.allanbank.mongodb.client.message.Reply;
 41  
 import com.allanbank.mongodb.error.ReplyException;
 42  
 
 43  
 /**
 44  
  * Callback to convert a {@link CursorableMessage} {@link Reply} into a series
 45  
  * of callbacks, one for each document received.
 46  
  * 
 47  
  * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
 48  
  *         mutated in incompatible ways between any two releases of the driver.
 49  
  * @copyright 2012-2014, Allanbank Consulting, Inc., All Rights Reserved
 50  
  */
 51  
 public final class CursorStreamingCallback extends
 52  
         AbstractValidatingReplyCallback implements MongoCursorControl,
 53  
         AddressAware {
 54  
 
 55  
     /** The server the original request was sent to. */
 56  
     private volatile String myAddress;
 57  
 
 58  
     /** The requested batch size. */
 59  
     private int myBatchSize;
 60  
 
 61  
     /** The client interface to the server. */
 62  
     private final Client myClient;
 63  
 
 64  
     /**
 65  
      * Flag to indicate that the stream has been closed.
 66  
      */
 67  39
     private volatile boolean myClosed = false;
 68  
 
 69  
     /** The name of the collection the query was originally created on. */
 70  
     private final String myCollectionName;
 71  
 
 72  
     /** If true then the callback should expect a command formatted cursor reply. */
 73  
     private boolean myCommand;
 74  
 
 75  
     /** The id of the cursor on the server. Zero when there is no active cursor. */
 76  39
     private long myCursorId = 0;
 77  
 
 78  
     /** The name of the database the query was originally created on. */
 79  
     private final String myDatabaseName;
 80  
 
 81  
     /** The callback to forward the returned documents to. */
 82  
     private final StreamCallback<Document> myForwardCallback;
 83  
 
 84  
     /**
 85  
      * The maximum number of documents to return from the cursor. Zero or
 86  
      * negative means all.
 87  
      */
 88  39
     private int myLimit = 0;
 89  
 
 90  
     /** The original message that started the cursor, if known. */
 91  
     private final CursorableMessage myMessage;
 92  
 
 93  
     /** The last reply. */
 94  
     private volatile Reply myReply;
 95  
 
 96  
     /**
 97  
      * Flag to shutdown this iterator gracefully without closing the cursor on
 98  
      * the server.
 99  
      */
 100  39
     private boolean myShutdown = false;
 101  
 
 102  
     /**
 103  
      * Create a new CursorStreamingCallback.
 104  
      * 
 105  
      * @param client
 106  
      *            The client interface to the server.
 107  
      * @param originalMessage
 108  
      *            The original message.
 109  
      * @param command
 110  
      *            If true then the callback should expect a command formatted
 111  
      *            cursor reply.
 112  
      * @param results
 113  
      *            The callback to update with each document.
 114  
      */
 115  
     public CursorStreamingCallback(final Client client,
 116  
             final CursorableMessage originalMessage, final boolean command,
 117  36
             final StreamCallback<Document> results) {
 118  
 
 119  36
         myClient = client;
 120  36
         myDatabaseName = originalMessage.getDatabaseName();
 121  36
         myCollectionName = originalMessage.getCollectionName();
 122  36
         myBatchSize = originalMessage.getBatchSize();
 123  36
         myMessage = originalMessage;
 124  36
         myCommand = command;
 125  36
         myForwardCallback = results;
 126  36
         myLimit = originalMessage.getLimit();
 127  36
     }
 128  
 
 129  
     /**
 130  
      * Create a new CursorStreamingCallback from a cursor document.
 131  
      * 
 132  
      * @param client
 133  
      *            The client interface to the server.
 134  
      * @param cursorDocument
 135  
      *            The cursor's state as a document (see {@link #asDocument()}).
 136  
      * @param results
 137  
      *            The callback to update with each document.
 138  
      * 
 139  
      * @see MongoIteratorImpl#asDocument()
 140  
      */
 141  
     public CursorStreamingCallback(final Client client,
 142  
             final Document cursorDocument,
 143  3
             final StreamCallback<Document> results) {
 144  
 
 145  3
         final String ns = cursorDocument.get(StringElement.class,
 146  
                 NAME_SPACE_FIELD).getValue();
 147  3
         String db = ns;
 148  3
         String collection = ns;
 149  3
         final int index = ns.indexOf('.');
 150  3
         if (0 < index) {
 151  2
             db = ns.substring(0, index);
 152  2
             collection = ns.substring(index + 1);
 153  
         }
 154  
 
 155  3
         myMessage = null;
 156  3
         myCommand = false;
 157  3
         myClient = client;
 158  3
         myDatabaseName = db;
 159  3
         myCollectionName = collection;
 160  3
         myForwardCallback = results;
 161  3
         myCursorId = cursorDocument.get(NumericElement.class, CURSOR_ID_FIELD)
 162  
                 .getLongValue();
 163  3
         myLimit = cursorDocument.get(NumericElement.class, LIMIT_FIELD)
 164  
                 .getIntValue();
 165  3
         myBatchSize = cursorDocument
 166  
                 .get(NumericElement.class, BATCH_SIZE_FIELD).getIntValue();
 167  3
         myAddress = cursorDocument.get(StringElement.class, SERVER_FIELD)
 168  
                 .getValue();
 169  3
     }
 170  
 
 171  
     /**
 172  
      * {@inheritDoc}
 173  
      * <p>
 174  
      * Overridden to return the current state of the stream as a document.
 175  
      * </p>
 176  
      */
 177  
     @Override
 178  
     public Document asDocument() {
 179  4
         final long cursorId = myCursorId;
 180  
 
 181  4
         if (cursorId != 0) {
 182  2
             final DocumentBuilder b = BuilderFactory.start();
 183  2
             b.add(NAME_SPACE_FIELD, myDatabaseName + "." + myCollectionName);
 184  2
             b.add(CURSOR_ID_FIELD, cursorId);
 185  2
             b.add(SERVER_FIELD, myAddress);
 186  2
             b.add(LIMIT_FIELD, myLimit);
 187  2
             b.add(BATCH_SIZE_FIELD, myBatchSize);
 188  
 
 189  2
             return b.build();
 190  
         }
 191  2
         return null;
 192  
     }
 193  
 
 194  
     /**
 195  
      * Overridden to close the iterator and send a {@link KillCursors} for the
 196  
      * open cursor, if any.
 197  
      */
 198  
     @Override
 199  
     public void close() {
 200  24
         synchronized (myForwardCallback) {
 201  24
             myClosed = true;
 202  24
             sendKill();
 203  24
         }
 204  24
     }
 205  
 
 206  
     /**
 207  
      * {@inheritDoc}
 208  
      * <p>
 209  
      * Overridden to forward the error to the user.
 210  
      * </p>
 211  
      */
 212  
     @Override
 213  
     public void exception(final Throwable thrown) {
 214  
         try {
 215  3
             synchronized (myForwardCallback) {
 216  3
                 myForwardCallback.exception(thrown);
 217  3
             }
 218  
         }
 219  
         finally {
 220  3
             close();
 221  3
         }
 222  3
     }
 223  
 
 224  
     /**
 225  
      * Returns the server the original request was sent to.
 226  
      * 
 227  
      * @return The server the original request was sent to.
 228  
      */
 229  
     public String getAddress() {
 230  5
         return myAddress;
 231  
     }
 232  
 
 233  
     /**
 234  
      * {@inheritDoc}
 235  
      * <p>
 236  
      * Overridden to return the batch size.
 237  
      * </p>
 238  
      */
 239  
     @Override
 240  
     public int getBatchSize() {
 241  5
         return myBatchSize;
 242  
     }
 243  
 
 244  
     /**
 245  
      * Returns the client value.
 246  
      * 
 247  
      * @return The client value.
 248  
      */
 249  
     public Client getClient() {
 250  3
         return myClient;
 251  
     }
 252  
 
 253  
     /**
 254  
      * Returns the collection name.
 255  
      * 
 256  
      * @return The collection name.
 257  
      */
 258  
     public String getCollectionName() {
 259  3
         return myCollectionName;
 260  
     }
 261  
 
 262  
     /**
 263  
      * Returns the cursor Id value.
 264  
      * 
 265  
      * @return The cursor Id value.
 266  
      */
 267  
     public long getCursorId() {
 268  3
         return myCursorId;
 269  
     }
 270  
 
 271  
     /**
 272  
      * Returns the database name value.
 273  
      * 
 274  
      * @return The database name value.
 275  
      */
 276  
     public String getDatabaseName() {
 277  3
         return myDatabaseName;
 278  
     }
 279  
 
 280  
     /**
 281  
      * Returns the limit value.
 282  
      * 
 283  
      * @return The limit value.
 284  
      */
 285  
     public int getLimit() {
 286  3
         return myLimit;
 287  
     }
 288  
 
 289  
     /**
 290  
      * {@inheritDoc}
 291  
      * <p>
 292  
      * Overridden to return false.
 293  
      * </p>
 294  
      */
 295  
     @Override
 296  
     public boolean isLightWeight() {
 297  0
         return false;
 298  
     }
 299  
 
 300  
     /**
 301  
      * Restarts the stream by sending a request for the next batch of documents.
 302  
      * 
 303  
      * @throws MongoDbException
 304  
      *             On a failure to send the request for more documents.
 305  
      */
 306  
     public void restart() {
 307  2
         sendRequest();
 308  2
     }
 309  
 
 310  
     /**
 311  
      * Sets the value of the server the original request was sent to.
 312  
      * 
 313  
      * @param address
 314  
      *            The new value for the server the original request was sent to.
 315  
      */
 316  
     @Override
 317  
     public void setAddress(final String address) {
 318  28
         myAddress = address;
 319  
         // For races make sure that the push has the server name.
 320  28
         if (myReply != null) {
 321  1
             final Reply reply = myReply;
 322  1
             myReply = null;
 323  1
             push(reply);
 324  
         }
 325  28
     }
 326  
 
 327  
     /**
 328  
      * {@inheritDoc}
 329  
      * <p>
 330  
      * Overridden to set the batch size.
 331  
      * </p>
 332  
      */
 333  
     @Override
 334  
     public void setBatchSize(final int batchSize) {
 335  1
         myBatchSize = batchSize;
 336  1
     }
 337  
 
 338  
     /**
 339  
      * {@inheritDoc}
 340  
      * <p>
 341  
      * Overridden to stop requesting more batches of documents.
 342  
      * </p>
 343  
      */
 344  
     @Override
 345  
     public void stop() {
 346  1
         myShutdown = true;
 347  1
     }
 348  
 
 349  
     /**
 350  
      * {@inheritDoc}
 351  
      * <p>
 352  
      * Overridden to add the {@link Query} to the exception.
 353  
      * </p>
 354  
      * 
 355  
      * @see AbstractReplyCallback#asError(Reply, int, int, String)
 356  
      */
 357  
     @Override
 358  
     protected MongoDbException asError(final Reply reply, final int okValue,
 359  
             final int errorNumber, final String errorMessage) {
 360  1
         return new ReplyException(okValue, errorNumber, errorMessage,
 361  
                 myMessage, reply);
 362  
     }
 363  
 
 364  
     /**
 365  
      * {@inheritDoc}
 366  
      * <p>
 367  
      * Overridden to push the documents to the application's callback.
 368  
      * </p>
 369  
      * 
 370  
      * @see AbstractReplyCallback#convert(Reply)
 371  
      */
 372  
     @Override
 373  
     protected void handle(final Reply reply) throws MongoDbException {
 374  
         // Handle the first reply being from a command.
 375  29
         Reply result = reply;
 376  29
         if (isCommand()) {
 377  0
             result = CommandCursorTranslator.translate(reply);
 378  
 
 379  
             // But only the first reply...
 380  0
             myCommand = false;
 381  
         }
 382  
 
 383  29
         myReply = result;
 384  29
         if (myAddress != null) {
 385  28
             push(result);
 386  
         }
 387  29
     }
 388  
 
 389  
     /**
 390  
      * Returns true if the callback should expect a command formatted cursor
 391  
      * reply.
 392  
      * 
 393  
      * @return True if the callback should expect a command formatted cursor
 394  
      *         reply.
 395  
      */
 396  
     protected boolean isCommand() {
 397  29
         return myCommand;
 398  
     }
 399  
 
 400  
     /**
 401  
      * Loads more documents. This issues a get_more command as soon as the
 402  
      * previous results start to be used.
 403  
      * 
 404  
      * @param reply
 405  
      *            The last reply received.
 406  
      * @return The list of loaded documents.
 407  
      * 
 408  
      * @throws RuntimeException
 409  
      *             On a failure to load documents.
 410  
      */
 411  
     protected List<Document> loadDocuments(final Reply reply)
 412  
             throws RuntimeException {
 413  
 
 414  28
         myCursorId = reply.getCursorId();
 415  
 
 416  
         // Setup the documents and adjust the limit for the documents we have.
 417  
         // Do this before the fetch again so the nextBatchSize() has the updated
 418  
         // limit.
 419  28
         List<Document> docs = reply.getResults();
 420  28
         if (0 < myLimit) {
 421  
             // Check if we have too many docs.
 422  9
             if (myLimit <= docs.size()) {
 423  3
                 docs = docs.subList(0, myLimit);
 424  3
                 close();
 425  
             }
 426  9
             myLimit -= docs.size();
 427  
         }
 428  
 
 429  
         // Pre-fetch the next set of documents while we iterate over the
 430  
         // documents we just got.
 431  28
         if ((myCursorId != 0) && !myShutdown) {
 432  5
             sendRequest();
 433  
         }
 434  
         // Exhausted the cursor - no more results.
 435  
         // Don't need to kill the cursor since we exhausted it.
 436  
 
 437  28
         return docs;
 438  
     }
 439  
 
 440  
     /**
 441  
      * Computes the size for the next batch of documents to get.
 442  
      * 
 443  
      * @return The size for the next batch of documents to get.
 444  
      */
 445  
     protected int nextBatchSize() {
 446  10
         if ((0 < myLimit) && (myLimit <= myBatchSize)) {
 447  2
             return myLimit;
 448  
         }
 449  8
         return myBatchSize;
 450  
     }
 451  
 
 452  
     /**
 453  
      * Sends a {@link KillCursors} message if there is an active cursor.
 454  
      * 
 455  
      * @throws MongoDbException
 456  
      *             On a failure to send the {@link KillCursors} message.
 457  
      */
 458  
     protected void sendKill() throws MongoDbException {
 459  25
         final long cursorId = myCursorId;
 460  25
         if ((cursorId != 0) && !myShutdown) {
 461  4
             myCursorId = 0;
 462  4
             myClient.send(new KillCursors(new long[] { cursorId },
 463  
                     ReadPreference.server(myAddress)), null);
 464  
         }
 465  25
     }
 466  
 
 467  
     /**
 468  
      * Sends a request to start the next batch of documents.
 469  
      * 
 470  
      * @throws MongoDbException
 471  
      *             On a failure to send the request.
 472  
      */
 473  
     protected void sendRequest() throws MongoDbException {
 474  7
         final GetMore getMore = new GetMore(myDatabaseName, myCollectionName,
 475  
                 myCursorId, nextBatchSize(), ReadPreference.server(myAddress));
 476  
 
 477  7
         myClient.send(getMore, this);
 478  7
     }
 479  
 
 480  
     /**
 481  
      * Pushes the results from the reply to the application's callback.
 482  
      * 
 483  
      * @param reply
 484  
      *            The reply containing the results to push to the application's
 485  
      *            callback.
 486  
      */
 487  
     private void push(final Reply reply) {
 488  
         // Request the load in the synchronized block so there is only 1
 489  
         // outstanding request.
 490  29
         synchronized (myForwardCallback) {
 491  29
             if (myClosed) {
 492  1
                 myCursorId = reply.getCursorId();
 493  1
                 sendKill();
 494  
             }
 495  
             else {
 496  
                 try {
 497  28
                     for (final Document document : loadDocuments(reply)) {
 498  83
                         myForwardCallback.callback(document);
 499  82
                     }
 500  27
                     if (myCursorId == 0) {
 501  
                         // Signal the end of the results.
 502  21
                         myForwardCallback.done();
 503  
                     }
 504  
                 }
 505  1
                 catch (final RuntimeException re) {
 506  1
                     exception(re);
 507  1
                     close();
 508  27
                 }
 509  
             }
 510  29
         }
 511  29
     }
 512  
 }