Coverage Report - com.allanbank.mongodb.bson.io.BsonInputStream
 
Classes in this File Line Coverage Branch Coverage Complexity
BsonInputStream
90%
247/274
84%
62/73
2.58
BsonInputStream$1
100%
1/1
N/A
2.58
 
 1  
 /*
 2  
  * #%L
 3  
  * BsonInputStream.java - mongodb-async-driver - Allanbank Consulting, Inc.
 4  
  * %%
 5  
  * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
 6  
  * %%
 7  
  * Licensed under the Apache License, Version 2.0 (the "License");
 8  
  * you may not use this file except in compliance with the License.
 9  
  * You may obtain a copy of the License at
 10  
  * 
 11  
  *      http://www.apache.org/licenses/LICENSE-2.0
 12  
  * 
 13  
  * Unless required by applicable law or agreed to in writing, software
 14  
  * distributed under the License is distributed on an "AS IS" BASIS,
 15  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 16  
  * See the License for the specific language governing permissions and
 17  
  * limitations under the License.
 18  
  * #L%
 19  
  */
 20  
 package com.allanbank.mongodb.bson.io;
 21  
 
 22  
 import java.io.DataInput;
 23  
 import java.io.EOFException;
 24  
 import java.io.IOException;
 25  
 import java.io.InputStream;
 26  
 import java.io.StreamCorruptedException;
 27  
 import java.nio.charset.Charset;
 28  
 import java.util.ArrayList;
 29  
 import java.util.List;
 30  
 
 31  
 import com.allanbank.mongodb.bson.Document;
 32  
 import com.allanbank.mongodb.bson.Element;
 33  
 import com.allanbank.mongodb.bson.ElementType;
 34  
 import com.allanbank.mongodb.bson.element.ArrayElement;
 35  
 import com.allanbank.mongodb.bson.element.BinaryElement;
 36  
 import com.allanbank.mongodb.bson.element.BooleanElement;
 37  
 import com.allanbank.mongodb.bson.element.DocumentElement;
 38  
 import com.allanbank.mongodb.bson.element.DoubleElement;
 39  
 import com.allanbank.mongodb.bson.element.IntegerElement;
 40  
 import com.allanbank.mongodb.bson.element.JavaScriptElement;
 41  
 import com.allanbank.mongodb.bson.element.JavaScriptWithScopeElement;
 42  
 import com.allanbank.mongodb.bson.element.LongElement;
 43  
 import com.allanbank.mongodb.bson.element.MaxKeyElement;
 44  
 import com.allanbank.mongodb.bson.element.MinKeyElement;
 45  
 import com.allanbank.mongodb.bson.element.MongoTimestampElement;
 46  
 import com.allanbank.mongodb.bson.element.NullElement;
 47  
 import com.allanbank.mongodb.bson.element.ObjectId;
 48  
 import com.allanbank.mongodb.bson.element.ObjectIdElement;
 49  
 import com.allanbank.mongodb.bson.element.RegularExpressionElement;
 50  
 import com.allanbank.mongodb.bson.element.StringElement;
 51  
 import com.allanbank.mongodb.bson.element.SymbolElement;
 52  
 import com.allanbank.mongodb.bson.element.TimestampElement;
 53  
 import com.allanbank.mongodb.bson.element.UuidElement;
 54  
 import com.allanbank.mongodb.bson.impl.RootDocument;
 55  
 
 56  
 /**
 57  
  * {@link BsonInputStream} provides a class to read BSON documents based on the
 58  
  * <a href="http://bsonspec.org/">BSON specification</a>.
 59  
  * 
 60  
  * @api.yes This class is part of the driver's API. Public and protected members
 61  
  *          will be deprecated for at least 1 non-bugfix release (version
 62  
  *          numbers are &lt;major&gt;.&lt;minor&gt;.&lt;bugfix&gt;) before being
 63  
  *          removed or modified.
 64  
  * @copyright 2011-2013, Allanbank Consulting, Inc., All Rights Reserved
 65  
  */
 66  1
 public class BsonInputStream extends InputStream {
 67  
 
 68  
     /** UTF-8 Character set for encoding strings. */
 69  1
     public final static Charset UTF8 = StringDecoder.UTF8;
 70  
 
 71  
     /** The buffered data. */
 72  
     private byte[] myBuffer;
 73  
 
 74  
     /** The offset into the current buffer. */
 75  
     private int myBufferLimit;
 76  
 
 77  
     /** The offset into the current buffer. */
 78  
     private int myBufferOffset;
 79  
 
 80  
     /** Tracks the number of bytes that have been read by the stream. */
 81  
     private long myBytesRead;
 82  
 
 83  
     /** The underlying input stream. */
 84  
     private final InputStream myInput;
 85  
 
 86  
     /** The decoder for strings. */
 87  
     private final StringDecoder myStringDecoder;
 88  
 
 89  
     /**
 90  
      * Creates a BSON document reader.
 91  
      * 
 92  
      * @param input
 93  
      *            the underlying stream to read from.
 94  
      */
 95  
     public BsonInputStream(final InputStream input) {
 96  3336
         this(input, 8 * 1024); // 8K to start.
 97  3336
     }
 98  
 
 99  
     /**
 100  
      * Creates a BSON document reader.
 101  
      * 
 102  
      * @param input
 103  
      *            the underlying stream to read from.
 104  
      * @param expectedMaxDocumentSize
 105  
      *            The expected maximum size for a document. If this guess is
 106  
      *            wrong then there may be incremental allocations of the read
 107  
      *            buffer.
 108  
      */
 109  
     public BsonInputStream(final InputStream input,
 110  
             final int expectedMaxDocumentSize) {
 111  3336
         this(input, expectedMaxDocumentSize, new StringDecoderCache());
 112  3336
     }
 113  
 
 114  
     /**
 115  
      * Creates a BSON document reader.
 116  
      * 
 117  
      * @param input
 118  
      *            the underlying stream to read from.
 119  
      * @param expectedMaxDocumentSize
 120  
      *            The expected maximum size for a document. If this guess is
 121  
      *            wrong then there may be incremental allocations of the read
 122  
      *            buffer.
 123  
      * @param cache
 124  
      *            The cache to use for decoded strings.
 125  
      */
 126  
     public BsonInputStream(final InputStream input,
 127  3509
             final int expectedMaxDocumentSize, final StringDecoderCache cache) {
 128  3509
         myInput = input;
 129  3509
         myBuffer = new byte[expectedMaxDocumentSize];
 130  3509
         myBufferOffset = 0;
 131  3509
         myBufferLimit = 0;
 132  3509
         myBytesRead = 0;
 133  
 
 134  3509
         myStringDecoder = new StringDecoder(cache);
 135  3509
     }
 136  
 
 137  
     /**
 138  
      * Creates a BSON document reader.
 139  
      * 
 140  
      * @param input
 141  
      *            the underlying stream to read from.
 142  
      * @param cache
 143  
      *            The cache to use for decoded strings.
 144  
      */
 145  
     public BsonInputStream(final InputStream input,
 146  
             final StringDecoderCache cache) {
 147  173
         this(input, 8 * 1024, cache); // 8K to start.
 148  173
     }
 149  
 
 150  
     /**
 151  
      * {@inheritDoc}
 152  
      * <p>
 153  
      * Overridden to return the number of bytes in the buffer and from the
 154  
      * source stream.
 155  
      * </p>
 156  
      */
 157  
     @Override
 158  
     public int available() throws IOException {
 159  1
         return availableInBuffer() + myInput.available();
 160  
     }
 161  
 
 162  
     /**
 163  
      * {@inheritDoc}
 164  
      * <p>
 165  
      * Overridden to close the wrapped {@link InputStream}.
 166  
      * </p>
 167  
      */
 168  
     @Override
 169  
     public void close() throws IOException {
 170  24
         myInput.close();
 171  24
     }
 172  
 
 173  
     /**
 174  
      * Returns the number of bytes that have been read by the stream.
 175  
      * 
 176  
      * @return The number of bytes that have been read from the stream.
 177  
      */
 178  
     public long getBytesRead() {
 179  139191
         return myBytesRead + myBufferOffset;
 180  
     }
 181  
 
 182  
     /**
 183  
      * Returns the maximum number of strings that may have their encoded form
 184  
      * cached.
 185  
      * 
 186  
      * @return The maximum number of strings that may have their encoded form
 187  
      *         cached.
 188  
      * @deprecated The cache {@link StringDecoderCache} should be controlled
 189  
      *             directory. This method will be removed after the 2.1.0
 190  
      *             release.
 191  
      */
 192  
     @Deprecated
 193  
     public int getMaxCachedStringEntries() {
 194  0
         return myStringDecoder.getCache().getMaxCacheEntries();
 195  
     }
 196  
 
 197  
     /**
 198  
      * Returns the maximum length for a string that the stream is allowed to
 199  
      * cache.
 200  
      * 
 201  
      * @return The maximum length for a string that the stream is allowed to
 202  
      *         cache.
 203  
      * @deprecated The cache {@link StringDecoderCache} should be controlled
 204  
      *             directory. This method will be removed after the 2.1.0
 205  
      *             release.
 206  
      */
 207  
     @Deprecated
 208  
     public int getMaxCachedStringLength() {
 209  0
         return myStringDecoder.getCache().getMaxCacheLength();
 210  
     }
 211  
 
 212  
     /**
 213  
      * {@inheritDoc}
 214  
      * <p>
 215  
      * Overridden to throw an {@link UnsupportedOperationException}.
 216  
      * </p>
 217  
      */
 218  
     @Override
 219  
     public synchronized void mark(final int readlimit) {
 220  0
         throw new UnsupportedOperationException("Mark not supported.");
 221  
     }
 222  
 
 223  
     /**
 224  
      * {@inheritDoc}
 225  
      * <p>
 226  
      * Overridden to return false.
 227  
      * </p>
 228  
      */
 229  
     @Override
 230  
     public boolean markSupported() {
 231  0
         return false;
 232  
     }
 233  
 
 234  
     /**
 235  
      * Tries to prefetch the requested number of bytes from the underlying
 236  
      * stream.
 237  
      * 
 238  
      * @param size
 239  
      *            The number of bytes to try and read.
 240  
      * @throws IOException
 241  
      *             On a failure to read from the underlying stream.
 242  
      */
 243  
     public final void prefetch(final int size) throws IOException {
 244  37481
         fetch(size, false);
 245  37473
     }
 246  
 
 247  
     /**
 248  
      * {@inheritDoc}
 249  
      * <p>
 250  
      * Overridden to track the bytes that have been read.
 251  
      * </p>
 252  
      */
 253  
     @Override
 254  
     public int read() throws IOException {
 255  139135
         if (ensureFetched(1) != 1) {
 256  20
             return -1; // EOF.
 257  
         }
 258  
 
 259  138963
         final int read = (myBuffer[myBufferOffset] & 0xFF);
 260  
 
 261  138963
         myBufferOffset += 1;
 262  
 
 263  138963
         return read;
 264  
     }
 265  
 
 266  
     /**
 267  
      * {@inheritDoc}
 268  
      * <p>
 269  
      * Overridden to track the bytes that have been read.
 270  
      * </p>
 271  
      */
 272  
     @Override
 273  
     public int read(final byte b[]) throws IOException {
 274  0
         return read(b, 0, b.length);
 275  
     }
 276  
 
 277  
     /**
 278  
      * {@inheritDoc}
 279  
      * <p>
 280  
      * Overridden to track the bytes that have been read.
 281  
      * </p>
 282  
      */
 283  
     @Override
 284  
     public int read(final byte b[], final int off, final int len)
 285  
             throws IOException {
 286  0
         final int read = ensureFetched(len - off);
 287  
 
 288  0
         System.arraycopy(myBuffer, myBufferOffset, b, off, read);
 289  
 
 290  0
         myBufferOffset += read;
 291  
 
 292  0
         return read;
 293  
     }
 294  
 
 295  
     /**
 296  
      * Reads a "cstring" value from the stream:<code>
 297  
      * <pre>
 298  
      * cstring         ::=         (byte*) "\x00"
 299  
      * </pre>
 300  
      * </code>
 301  
      * <p>
 302  
      * <blockquote> CString - Zero or more modified UTF-8 encoded characters
 303  
      * followed by '\x00'. The (byte*) MUST NOT contain '\x00', hence it is not
 304  
      * full UTF-8. </blockquote>
 305  
      * </p>
 306  
      * 
 307  
      * @return The string value.
 308  
      * @throws EOFException
 309  
      *             On insufficient data for the integer.
 310  
      * @throws IOException
 311  
      *             On a failure reading the integer.
 312  
      */
 313  
     public String readCString() throws EOFException, IOException {
 314  
 
 315  
         while (true) {
 316  400798
             for (int i = myBufferOffset; i < myBufferLimit; ++i) {
 317  400798
                 final byte b = myBuffer[i];
 318  400798
                 if (b == 0) {
 319  
                     // Found the end.
 320  70636
                     final int offset = myBufferOffset;
 321  70636
                     final int length = (1 + i) - offset;
 322  
 
 323  
                     // Advance the buffer.
 324  70636
                     myBufferOffset = i + 1;
 325  
 
 326  70636
                     return myStringDecoder.decode(myBuffer, offset, length);
 327  
                 }
 328  
             }
 329  
 
 330  
             // Need more data.
 331  0
             ensureFetched(availableInBuffer() + 1);
 332  
         }
 333  
     }
 334  
 
 335  
     /**
 336  
      * Reads a BSON document element: <code>
 337  
      * <pre>
 338  
      * document         ::=         int32 e_list "\x00"
 339  
      * </pre>
 340  
      * </code>
 341  
      * 
 342  
      * @return The Document.
 343  
      * @throws EOFException
 344  
      *             On insufficient data for the document.
 345  
      * @throws IOException
 346  
      *             On a failure reading the document.
 347  
      */
 348  
     public Document readDocument() throws IOException {
 349  
 
 350  
         // The total length of the document.
 351  5006
         final int size = readInt();
 352  
 
 353  5006
         prefetch(size - 4);
 354  
 
 355  5006
         return new RootDocument(readElements(), false, size);
 356  
     }
 357  
 
 358  
     /**
 359  
      * Reads the complete set of bytes from the stream or throws an
 360  
      * {@link EOFException}.
 361  
      * 
 362  
      * @param buffer
 363  
      *            The buffer into which the data is read.
 364  
      * @exception EOFException
 365  
      *                If the input stream reaches the end before reading all the
 366  
      *                bytes.
 367  
      * @exception IOException
 368  
      *                On an error reading from the underlying stream.
 369  
      */
 370  
     public void readFully(final byte[] buffer) throws EOFException, IOException {
 371  32020
         readFully(buffer, 0, buffer.length);
 372  32020
     }
 373  
 
 374  
     /**
 375  
      * Reads a little-endian 4 byte signed integer from the stream.
 376  
      * 
 377  
      * @return The integer value.
 378  
      * @throws EOFException
 379  
      *             On insufficient data for the integer.
 380  
      * @throws IOException
 381  
      *             On a failure reading the integer.
 382  
      */
 383  
     public int readInt() throws EOFException, IOException {
 384  96724
         if (ensureFetched(4) != 4) {
 385  78
             throw new EOFException();
 386  
         }
 387  
 
 388  
         // Little endian.
 389  96644
         int result = (myBuffer[myBufferOffset] & 0xFF);
 390  96645
         result += (myBuffer[myBufferOffset + 1] & 0xFF) << 8;
 391  96648
         result += (myBuffer[myBufferOffset + 2] & 0xFF) << 16;
 392  96648
         result += (myBuffer[myBufferOffset + 3] & 0xFF) << 24;
 393  
 
 394  96648
         myBufferOffset += 4;
 395  
 
 396  96648
         return result;
 397  
     }
 398  
 
 399  
     /**
 400  
      * Reads a little-endian 8 byte signed integer from the stream.
 401  
      * 
 402  
      * @return The long value.
 403  
      * @throws EOFException
 404  
      *             On insufficient data for the long.
 405  
      * @throws IOException
 406  
      *             On a failure reading the long.
 407  
      */
 408  
     public long readLong() throws EOFException, IOException {
 409  2297
         if (ensureFetched(8) != 8) {
 410  0
             throw new EOFException();
 411  
         }
 412  
 
 413  
         // Little endian.
 414  2297
         long result = (myBuffer[myBufferOffset] & 0xFFL);
 415  2297
         result += (myBuffer[myBufferOffset + 1] & 0xFFL) << 8;
 416  2297
         result += (myBuffer[myBufferOffset + 2] & 0xFFL) << 16;
 417  2297
         result += (myBuffer[myBufferOffset + 3] & 0xFFL) << 24;
 418  2297
         result += (myBuffer[myBufferOffset + 4] & 0xFFL) << 32;
 419  2297
         result += (myBuffer[myBufferOffset + 5] & 0xFFL) << 40;
 420  2297
         result += (myBuffer[myBufferOffset + 6] & 0xFFL) << 48;
 421  2297
         result += (myBuffer[myBufferOffset + 7] & 0xFFL) << 56;
 422  
 
 423  2297
         myBufferOffset += 8;
 424  
 
 425  2297
         return result;
 426  
     }
 427  
 
 428  
     /**
 429  
      * {@inheritDoc}
 430  
      * <p>
 431  
      * Overridden to throw an {@link UnsupportedOperationException}.
 432  
      * </p>
 433  
      */
 434  
     @Override
 435  
     public synchronized void reset() throws UnsupportedOperationException {
 436  0
         throw new UnsupportedOperationException("Mark not supported.");
 437  
     }
 438  
 
 439  
     /**
 440  
      * Sets the value of maximum number of strings that may have their encoded
 441  
      * form cached.
 442  
      * 
 443  
      * @param maxCacheEntries
 444  
      *            The new value for the maximum number of strings that may have
 445  
      *            their encoded form cached.
 446  
      * @deprecated The cache {@link StringDecoderCache} should be controlled
 447  
      *             directory. This method will be removed after the 2.1.0
 448  
      *             release.
 449  
      */
 450  
     @Deprecated
 451  
     public void setMaxCachedStringEntries(final int maxCacheEntries) {
 452  0
         myStringDecoder.getCache().setMaxCacheEntries(maxCacheEntries);
 453  0
     }
 454  
 
 455  
     /**
 456  
      * Sets the value of length for a string that the stream is allowed to cache
 457  
      * to the new value. This can be used to stop a single long string from
 458  
      * pushing useful values out of the cache.
 459  
      * 
 460  
      * @param maxlength
 461  
      *            The new value for the length for a string that the encoder is
 462  
      *            allowed to cache.
 463  
      * @deprecated The cache {@link StringDecoderCache} should be controlled
 464  
      *             directory. This method will be removed after the 2.1.0
 465  
      *             release.
 466  
      */
 467  
     @Deprecated
 468  
     public void setMaxCachedStringLength(final int maxlength) {
 469  0
         myStringDecoder.getCache().setMaxCacheLength(maxlength);
 470  
 
 471  0
     }
 472  
 
 473  
     /**
 474  
      * {@inheritDoc}
 475  
      * <p>
 476  
      * Overridden to track the bytes that have been skipped.
 477  
      * </p>
 478  
      */
 479  
     @Override
 480  
     public long skip(final long n) throws IOException {
 481  
 
 482  0
         long skipped = Math.min(n, availableInBuffer());
 483  0
         myBufferOffset += skipped;
 484  
 
 485  0
         if (skipped < n) {
 486  
             // Exhausted the buffer - skip in the source.
 487  0
             final long streamSkipped = myInput.skip(n - skipped);
 488  0
             myBytesRead += streamSkipped;
 489  0
             skipped += streamSkipped;
 490  
         }
 491  
 
 492  0
         return skipped;
 493  
     }
 494  
 
 495  
     /**
 496  
      * Returns the number of bytes available in the buffer.
 497  
      * 
 498  
      * @return The number of bytes available in the buffer.
 499  
      */
 500  
     protected final int availableInBuffer() {
 501  308114
         return myBufferLimit - myBufferOffset;
 502  
     }
 503  
 
 504  
     /**
 505  
      * Reads a BSON Array element: <code>
 506  
      * <pre>
 507  
      * "\x04" e_name document
 508  
      * </pre>
 509  
      * </code>
 510  
      * 
 511  
      * @return The {@link ArrayElement}.
 512  
      * @throws EOFException
 513  
      *             On insufficient data for the document.
 514  
      * @throws IOException
 515  
      *             On a failure reading the document.
 516  
      */
 517  
     protected ArrayElement readArrayElement() throws IOException {
 518  
 
 519  119
         final long start = getBytesRead() - 1; // Token already read.
 520  
 
 521  119
         final String name = readCString();
 522  119
         final int fetch = readInt(); // The total length of the array elements.
 523  119
         prefetch(fetch - 4);
 524  119
         final List<Element> elements = readElements();
 525  119
         final long size = getBytesRead() - start;
 526  
 
 527  119
         return new ArrayElement(name, elements, size);
 528  
     }
 529  
 
 530  
     /**
 531  
      * Reads a {@link BinaryElement}'s contents: <code><pre>
 532  
      * binary         ::=         int32 subtype (byte*)
 533  
      * subtype         ::=         "\x00"         Binary / Generic
 534  
      *                    |         "\x01"         Function
 535  
      *                    |         "\x02"         Binary (Old)
 536  
      *                    |         "\x03"         UUID
 537  
      *                    |         "\x05"         MD5
 538  
      *                    |         "\x80"         User defined
 539  
      * </pre>
 540  
      * </code>
 541  
      * 
 542  
      * @return The {@link BinaryElement}.
 543  
      * @throws IOException
 544  
      *             On a failure reading the binary data.
 545  
      */
 546  
     protected BinaryElement readBinaryElement() throws IOException {
 547  
 
 548  32019
         final long start = getBytesRead() - 1; // Token already read.
 549  
 
 550  32019
         final String name = readCString();
 551  32019
         int length = readInt();
 552  
 
 553  32019
         final int subType = read();
 554  32019
         if (subType < 0) {
 555  0
             throw new EOFException();
 556  
         }
 557  
 
 558  
         // Old binary handling.
 559  32019
         if (subType == 2) {
 560  8
             final int anotherLength = readInt();
 561  
 
 562  
             assert (anotherLength == (length - 4)) : "Binary Element Subtye 2 "
 563  8
                     + "length should be outer length - 4.";
 564  
 
 565  8
             length -= 4;
 566  8
         }
 567  32011
         else if ((subType == UuidElement.LEGACY_UUID_SUBTTYPE)
 568  
                 || (subType == UuidElement.UUID_SUBTTYPE)) {
 569  
 
 570  3
             final byte[] binary = new byte[length];
 571  3
             readFully(binary);
 572  
 
 573  3
             final long size = getBytesRead() - start;
 574  
             try {
 575  3
                 return new UuidElement(name, (byte) subType, binary, size);
 576  
             }
 577  1
             catch (final IllegalArgumentException iae) {
 578  
                 // Just use the vanilla BinaryElement.
 579  1
                 return new BinaryElement(name, (byte) subType, binary, size);
 580  
             }
 581  
         }
 582  
 
 583  32016
         final long size = getBytesRead() - start;
 584  32016
         return new BinaryElement(name, (byte) subType, this, length, size
 585  
                 + length);
 586  
     }
 587  
 
 588  
     /**
 589  
      * Reads a {@link BooleanElement} from the stream.
 590  
      * 
 591  
      * @return The {@link BooleanElement}.
 592  
      * @throws IOException
 593  
      *             On a failure to read the contents of the
 594  
      *             {@link BooleanElement}.
 595  
      */
 596  
     protected BooleanElement readBooleanElement() throws IOException {
 597  154
         final long start = getBytesRead() - 1; // Token already read.
 598  
 
 599  154
         final String name = readCString();
 600  154
         final boolean value = (read() == 1);
 601  
 
 602  154
         final long size = getBytesRead() - start;
 603  
 
 604  154
         return new BooleanElement(name, value, size);
 605  
     }
 606  
 
 607  
     /**
 608  
      * Reads a {@code DBPointerElement} from the stream.
 609  
      * 
 610  
      * @return The {@code DBPointerElement}.
 611  
      * @throws IOException
 612  
      *             On a failure to read the contents of the
 613  
      *             {@code DBPointerElement}.
 614  
      * @deprecated Per the BSON specification.
 615  
      */
 616  
     @Deprecated
 617  
     protected Element readDBPointerElement() throws IOException {
 618  8
         final long start = getBytesRead() - 1; // Token already read.
 619  
 
 620  8
         final String name = readCString();
 621  8
         final String dbDotCollection = readString();
 622  8
         final int timestamp = EndianUtils.swap(readInt());
 623  8
         final long machineId = EndianUtils.swap(readLong());
 624  
 
 625  8
         final long size = getBytesRead() - start;
 626  
 
 627  8
         String db = dbDotCollection;
 628  8
         String collection = "";
 629  8
         final int firstDot = dbDotCollection.indexOf('.');
 630  8
         if (0 <= firstDot) {
 631  8
             db = dbDotCollection.substring(0, firstDot);
 632  8
             collection = dbDotCollection.substring(firstDot + 1);
 633  
         }
 634  8
         return new com.allanbank.mongodb.bson.element.DBPointerElement(name,
 635  
                 db, collection, new ObjectId(timestamp, machineId), size);
 636  
     }
 637  
 
 638  
     /**
 639  
      * Reads a BSON Subdocument element: <code>
 640  
      * <pre>
 641  
      * "\x03" e_name document
 642  
      * </pre>
 643  
      * </code>
 644  
      * 
 645  
      * @return The {@link ArrayElement}.
 646  
      * @throws IOException
 647  
      *             On a failure reading the document.
 648  
      */
 649  
     protected DocumentElement readDocumentElement() throws IOException {
 650  32128
         final long start = getBytesRead() - 1; // Token already read.
 651  
 
 652  32128
         final String name = readCString();
 653  32128
         final int fetch = readInt(); // The total length of the sub-document
 654  
         // elements.
 655  32128
         prefetch(fetch - 4);
 656  32128
         final List<Element> elements = readElements();
 657  
 
 658  32128
         final long size = getBytesRead() - start;
 659  
 
 660  32128
         return new DocumentElement(name, elements, true, size);
 661  
     }
 662  
 
 663  
     /**
 664  
      * Reads a {@link DoubleElement} from the stream.
 665  
      * 
 666  
      * @return The {@link DoubleElement}.
 667  
      * @throws IOException
 668  
      *             On a failure to read the contents of the
 669  
      *             {@link DoubleElement}.
 670  
      */
 671  
     protected DoubleElement readDoubleElement() throws IOException {
 672  9
         final long start = getBytesRead() - 1; // Token already read.
 673  
 
 674  9
         final String name = readCString();
 675  9
         final double value = Double.longBitsToDouble(readLong());
 676  
 
 677  9
         final long size = getBytesRead() - start;
 678  
 
 679  9
         return new DoubleElement(name, value, size);
 680  
     }
 681  
 
 682  
     /**
 683  
      * Reads the element:<code>
 684  
      * <pre>
 685  
      * element         ::=         "\x01" e_name double                         Floating point
 686  
      *                    |         "\x02" e_name string                         UTF-8 string
 687  
      *                    |         "\x03" e_name document                         Embedded document
 688  
      *                    |         "\x04" e_name document                         Array
 689  
      *                    |         "\x05" e_name binary                         Binary data
 690  
      *                    |         "\x06" e_name                                         Undefined — Deprecated
 691  
      *                    |         "\x07" e_name (byte*12)                 ObjectId
 692  
      *                    |         "\x08" e_name "\x00"                         Boolean "false"
 693  
      *                    |         "\x08" e_name "\x01"                         Boolean "true"
 694  
      *                    |         "\x09" e_name int64                         UTC datetime
 695  
      *                    |         "\x0A" e_name                                         Null value
 696  
      *                    |         "\x0B" e_name cstring cstring         Regular expression
 697  
      *                    |         "\x0C" e_name string (byte*12)         DBPointer — Deprecated
 698  
      *                    |         "\x0D" e_name string                         JavaScript code
 699  
      *                    |         "\x0E" e_name string                         Symbol
 700  
      *                    |         "\x0F" e_name code_w_s                         JavaScript code w/ scope
 701  
      *                    |         "\x10" e_name int32                         32-bit Integer
 702  
      *                    |         "\x11" e_name int64                         Timestamp
 703  
      *                    |         "\x12" e_name int64                         64-bit integer
 704  
      *                    |         "\xFF" e_name                                         Min key
 705  
      *                    |         "\x7F" e_name                                         Max key
 706  
      * </pre>
 707  
      * </code>
 708  
      * 
 709  
      * @param token
 710  
      *            The element's token.
 711  
      * @return The Element.
 712  
      * @throws EOFException
 713  
      *             On insufficient data for the element.
 714  
      * @throws IOException
 715  
      *             On a failure reading the element.
 716  
      */
 717  
     @SuppressWarnings("deprecation")
 718  
     protected Element readElement(final byte token) throws EOFException,
 719  
             IOException {
 720  68626
         final ElementType type = ElementType.valueOf(token);
 721  68626
         if (type == null) {
 722  1
             throw new StreamCorruptedException("Unknown element type: 0x"
 723  
                     + Integer.toHexString(token & 0xFF) + ".");
 724  
         }
 725  1
         switch (type) {
 726  
         case ARRAY: {
 727  119
             return readArrayElement();
 728  
         }
 729  
         case BINARY: {
 730  32019
             return readBinaryElement();
 731  
         }
 732  
         case DB_POINTER: {
 733  8
             return readDBPointerElement();
 734  
         }
 735  
         case DOCUMENT: {
 736  32128
             return readDocumentElement();
 737  
         }
 738  
         case DOUBLE: {
 739  9
             return readDoubleElement();
 740  
         }
 741  
         case BOOLEAN: {
 742  154
             return readBooleanElement();
 743  
         }
 744  
         case INTEGER: {
 745  3649
             return readIntegerElement();
 746  
         }
 747  
         case JAVA_SCRIPT: {
 748  8
             return readJavaScriptElement();
 749  
         }
 750  
         case JAVA_SCRIPT_WITH_SCOPE: {
 751  8
             return readJavaScriptWithScopeElement();
 752  
         }
 753  
         case LONG: {
 754  10
             return readLongElement();
 755  
         }
 756  
         case MAX_KEY: {
 757  8
             return readMaxKeyElement();
 758  
         }
 759  
         case MIN_KEY: {
 760  8
             return readMinKeyElement();
 761  
         }
 762  
         case MONGO_TIMESTAMP: {
 763  8
             return readMongoTimestampElement();
 764  
         }
 765  
         case NULL: {
 766  8
             return readNullElement();
 767  
         }
 768  
         case OBJECT_ID: {
 769  8
             return readObjectIdElement();
 770  
         }
 771  
         case REGEX: {
 772  14
             return readRegularExpressionElement();
 773  
         }
 774  
         case STRING: {
 775  439
             return readStringElement();
 776  
         }
 777  
         case SYMBOL: {
 778  8
             return readSymbolElement();
 779  
         }
 780  
         case UTC_TIMESTAMP: {
 781  12
             return readTimestampElement();
 782  
         }
 783  
         }
 784  
 
 785  0
         throw new StreamCorruptedException("Unknown element type: "
 786  
                 + type.name() + ".");
 787  
     }
 788  
 
 789  
     /**
 790  
      * Reads a BSON element list (e_list): <code>
 791  
      * <pre>
 792  
      * e_list         ::=         element e_list | ""
 793  
      * </pre>
 794  
      * </code>
 795  
      * 
 796  
      * @return The list of elements.
 797  
      * @throws EOFException
 798  
      *             On insufficient data for the elements.
 799  
      * @throws IOException
 800  
      *             On a failure reading the elements.
 801  
      */
 802  
     protected List<Element> readElements() throws EOFException, IOException {
 803  37253
         final List<Element> elements = new ArrayList<Element>();
 804  37253
         int elementToken = read();
 805  105877
         while (elementToken > 0) {
 806  68626
             elements.add(readElement((byte) elementToken));
 807  
 
 808  68623
             elementToken = read();
 809  
         }
 810  37251
         if (elementToken < 0) {
 811  0
             throw new EOFException();
 812  
         }
 813  37251
         return elements;
 814  
     }
 815  
 
 816  
     /**
 817  
      * Reads a {@link IntegerElement} from the stream.
 818  
      * 
 819  
      * @return The {@link IntegerElement}.
 820  
      * @throws IOException
 821  
      *             On a failure to read the contents of the
 822  
      *             {@link IntegerElement}.
 823  
      */
 824  
     protected IntegerElement readIntegerElement() throws IOException {
 825  3649
         final long start = getBytesRead() - 1; // Token already read.
 826  
 
 827  3649
         final String name = readCString();
 828  3649
         final int value = readInt();
 829  
 
 830  3649
         final long size = getBytesRead() - start;
 831  
 
 832  3649
         return new IntegerElement(name, value, size);
 833  
     }
 834  
 
 835  
     /**
 836  
      * Reads a {@link JavaScriptElement} from the stream.
 837  
      * 
 838  
      * @return The {@link JavaScriptElement}.
 839  
      * @throws IOException
 840  
      *             On a failure to read the contents of the
 841  
      *             {@link JavaScriptElement}.
 842  
      */
 843  
     protected JavaScriptElement readJavaScriptElement() throws IOException {
 844  8
         final long start = getBytesRead() - 1; // Token already read.
 845  
 
 846  8
         final String name = readCString();
 847  8
         final String javascript = readString();
 848  
 
 849  8
         final long size = getBytesRead() - start;
 850  
 
 851  8
         return new JavaScriptElement(name, javascript, size);
 852  
     }
 853  
 
 854  
     /**
 855  
      * Reads a {@link JavaScriptWithScopeElement} from the stream.
 856  
      * 
 857  
      * @return The {@link JavaScriptWithScopeElement}.
 858  
      * @throws IOException
 859  
      *             On a failure to read the contents of the
 860  
      *             {@link JavaScriptWithScopeElement}.
 861  
      */
 862  
     protected JavaScriptWithScopeElement readJavaScriptWithScopeElement()
 863  
             throws IOException {
 864  8
         final long start = getBytesRead() - 1; // Token already read.
 865  
 
 866  8
         final String name = readCString();
 867  8
         readInt(); // Total length - not used.
 868  8
         final String javascript = readString();
 869  8
         final Document scope = readDocument();
 870  
 
 871  8
         final long size = getBytesRead() - start;
 872  
 
 873  8
         return new JavaScriptWithScopeElement(name, javascript, scope, size);
 874  
     }
 875  
 
 876  
     /**
 877  
      * Reads a {@link LongElement} from the stream.
 878  
      * 
 879  
      * @return The {@link LongElement}.
 880  
      * @throws IOException
 881  
      *             On a failure to read the contents of the {@link LongElement}.
 882  
      */
 883  
     protected LongElement readLongElement() throws IOException {
 884  10
         final long start = getBytesRead() - 1; // Token already read.
 885  
 
 886  10
         final String name = readCString();
 887  10
         final long value = readLong();
 888  
 
 889  10
         final long size = getBytesRead() - start;
 890  
 
 891  10
         return new LongElement(name, value, size);
 892  
     }
 893  
 
 894  
     /**
 895  
      * Reads a {@link MaxKeyElement} from the stream.
 896  
      * 
 897  
      * @return The {@link MaxKeyElement}.
 898  
      * @throws IOException
 899  
      *             On a failure to read the contents of the
 900  
      *             {@link MaxKeyElement}.
 901  
      */
 902  
     protected MaxKeyElement readMaxKeyElement() throws IOException {
 903  8
         final long start = getBytesRead() - 1; // Token already read.
 904  
 
 905  8
         final String name = readCString();
 906  
 
 907  8
         final long size = getBytesRead() - start;
 908  
 
 909  8
         return new MaxKeyElement(name, size);
 910  
     }
 911  
 
 912  
     /**
 913  
      * Reads a {@link MinKeyElement} from the stream.
 914  
      * 
 915  
      * @return The {@link MinKeyElement}.
 916  
      * @throws IOException
 917  
      *             On a failure to read the contents of the
 918  
      *             {@link MinKeyElement}.
 919  
      */
 920  
     protected MinKeyElement readMinKeyElement() throws IOException {
 921  8
         final long start = getBytesRead() - 1; // Token already read.
 922  
 
 923  8
         final String name = readCString();
 924  
 
 925  8
         final long size = getBytesRead() - start;
 926  
 
 927  8
         return new MinKeyElement(name, size);
 928  
     }
 929  
 
 930  
     /**
 931  
      * Reads a {@link MongoTimestampElement} from the stream.
 932  
      * 
 933  
      * @return The {@link MongoTimestampElement}.
 934  
      * @throws IOException
 935  
      *             On a failure to read the contents of the
 936  
      *             {@link MongoTimestampElement}.
 937  
      */
 938  
     protected MongoTimestampElement readMongoTimestampElement()
 939  
             throws IOException {
 940  8
         final long start = getBytesRead() - 1; // Token already read.
 941  
 
 942  8
         final String name = readCString();
 943  8
         final long timestamp = readLong();
 944  
 
 945  8
         final long size = getBytesRead() - start;
 946  
 
 947  8
         return new MongoTimestampElement(name, timestamp, size);
 948  
     }
 949  
 
 950  
     /**
 951  
      * Reads a {@link NullElement} from the stream.
 952  
      * 
 953  
      * @return The {@link NullElement}.
 954  
      * @throws IOException
 955  
      *             On a failure to read the contents of the {@link NullElement}.
 956  
      */
 957  
     protected NullElement readNullElement() throws IOException {
 958  8
         final long start = getBytesRead() - 1; // Token already read.
 959  
 
 960  8
         final String name = readCString();
 961  
 
 962  8
         final long size = getBytesRead() - start;
 963  
 
 964  8
         return new NullElement(name, size);
 965  
     }
 966  
 
 967  
     /**
 968  
      * Reads a {@link ObjectIdElement} from the stream.
 969  
      * 
 970  
      * @return The {@link ObjectIdElement}.
 971  
      * @throws IOException
 972  
      *             On a failure to read the contents of the
 973  
      *             {@link ObjectIdElement}.
 974  
      */
 975  
     protected ObjectIdElement readObjectIdElement() throws IOException {
 976  8
         final long start = getBytesRead() - 1; // Token already read.
 977  
 
 978  8
         final String name = readCString();
 979  8
         final int timestamp = EndianUtils.swap(readInt());
 980  8
         final long machineId = EndianUtils.swap(readLong());
 981  
 
 982  8
         final long size = getBytesRead() - start;
 983  
 
 984  8
         return new ObjectIdElement(name, new ObjectId(timestamp, machineId),
 985  
                 size);
 986  
     }
 987  
 
 988  
     /**
 989  
      * Reads a {@link RegularExpressionElement} from the stream.
 990  
      * 
 991  
      * @return The {@link RegularExpressionElement}.
 992  
      * @throws IOException
 993  
      *             On a failure to read the contents of the
 994  
      *             {@link RegularExpressionElement}.
 995  
      */
 996  
     protected RegularExpressionElement readRegularExpressionElement()
 997  
             throws IOException {
 998  14
         final long start = getBytesRead() - 1; // Token already read.
 999  
 
 1000  14
         final String name = readCString();
 1001  14
         final String pattern = readCString();
 1002  14
         final String options = readCString();
 1003  
 
 1004  14
         final long size = getBytesRead() - start;
 1005  
 
 1006  14
         return new RegularExpressionElement(name, pattern, options, size);
 1007  
     }
 1008  
 
 1009  
     /**
 1010  
      * Reads a "string" value from the stream:<code>
 1011  
      * <pre>
 1012  
      * string         ::=         int32 (byte*) "\x00"
 1013  
      * </pre>
 1014  
      * </code>
 1015  
      * <p>
 1016  
      * <blockquote>String - The int32 is the number bytes in the (byte*) + 1
 1017  
      * (for the trailing '\x00'). The (byte*) is zero or more UTF-8 encoded
 1018  
      * characters. </blockquote>
 1019  
      * </p>
 1020  
      * 
 1021  
      * @return The string value.
 1022  
      * @throws EOFException
 1023  
      *             On insufficient data for the integer.
 1024  
      * @throws IOException
 1025  
      *             On a failure reading the integer.
 1026  
      */
 1027  
     protected String readString() throws EOFException, IOException {
 1028  471
         final int length = readInt();
 1029  
 
 1030  471
         if (ensureFetched(length) != length) {
 1031  1
             throw new EOFException();
 1032  
         }
 1033  
 
 1034  470
         final int offset = myBufferOffset;
 1035  
 
 1036  
         // Advance the buffer.
 1037  470
         myBufferOffset += length;
 1038  
 
 1039  470
         return myStringDecoder.decode(myBuffer, offset, length);
 1040  
     }
 1041  
 
 1042  
     /**
 1043  
      * Reads a {@link StringElement} from the stream.
 1044  
      * 
 1045  
      * @return The {@link StringElement}.
 1046  
      * @throws IOException
 1047  
      *             On a failure to read the contents of the
 1048  
      *             {@link StringElement}.
 1049  
      */
 1050  
     protected StringElement readStringElement() throws IOException {
 1051  439
         final long start = getBytesRead() - 1; // Token already read.
 1052  
 
 1053  439
         final String name = readCString();
 1054  439
         final String value = readString();
 1055  
 
 1056  438
         final long size = getBytesRead() - start;
 1057  
 
 1058  438
         return new StringElement(name, value, size);
 1059  
     }
 1060  
 
 1061  
     /**
 1062  
      * Reads a {@link SymbolElement} from the stream.
 1063  
      * 
 1064  
      * @return The {@link SymbolElement}.
 1065  
      * @throws IOException
 1066  
      *             On a failure to read the contents of the
 1067  
      *             {@link SymbolElement}.
 1068  
      */
 1069  
     protected SymbolElement readSymbolElement() throws IOException {
 1070  8
         final long start = getBytesRead() - 1; // Token already read.
 1071  
 
 1072  8
         final String name = readCString();
 1073  8
         final String symbol = readString();
 1074  
 
 1075  8
         final long size = getBytesRead() - start;
 1076  
 
 1077  8
         return new SymbolElement(name, symbol, size);
 1078  
     }
 1079  
 
 1080  
     /**
 1081  
      * Reads a {@link TimestampElement} from the stream.
 1082  
      * 
 1083  
      * @return The {@link TimestampElement}.
 1084  
      * @throws IOException
 1085  
      *             On a failure to read the contents of the
 1086  
      *             {@link TimestampElement}.
 1087  
      */
 1088  
     protected TimestampElement readTimestampElement() throws IOException {
 1089  12
         final long start = getBytesRead() - 1; // Token already read.
 1090  
 
 1091  12
         final String name = readCString();
 1092  12
         final long time = readLong();
 1093  
 
 1094  12
         final long size = getBytesRead() - start;
 1095  
 
 1096  12
         return new TimestampElement(name, time, size);
 1097  
     }
 1098  
 
 1099  
     /**
 1100  
      * Fetch the requested number of bytes from the underlying stream. Returns
 1101  
      * the number of bytes available in the buffer or the number of requested
 1102  
      * bytes, which ever is smaller.
 1103  
      * 
 1104  
      * @param size
 1105  
      *            The number of bytes to be read.
 1106  
      * @return The smaller of the number of bytes requested or the number of
 1107  
      *         bytes available in the buffer.
 1108  
      * @throws IOException
 1109  
      *             On a failure to read from the underlying stream.
 1110  
      */
 1111  
     private final int ensureFetched(final int size) throws IOException {
 1112  238619
         return fetch(size, true);
 1113  
     }
 1114  
 
 1115  
     /**
 1116  
      * Fetch the requested number of bytes from the underlying stream. Returns
 1117  
      * the number of bytes available in the buffer or the number of requested
 1118  
      * bytes, which ever is smaller.
 1119  
      * 
 1120  
      * @param size
 1121  
      *            The number of bytes to be read.
 1122  
      * @param forceRead
 1123  
      *            Determines if a read is forced to ensure the buffer contains
 1124  
      *            the number of bytes.
 1125  
      * @return The smaller of the number of bytes requested or the number of
 1126  
      *         bytes available in the buffer.
 1127  
      * @throws IOException
 1128  
      *             On a failure to read from the underlying stream.
 1129  
      */
 1130  
     private final int fetch(final int size, final boolean forceRead)
 1131  
             throws IOException {
 1132  
         // See if we need to read more data.
 1133  276092
         int available = availableInBuffer();
 1134  276093
         if (available < size) {
 1135  
             // Yes - we do.
 1136  
 
 1137  
             // Will the size fit in the existing buffer?
 1138  3951
             if (myBuffer.length < size) {
 1139  
                 // Nope - grow the buffer to the needed size.
 1140  10
                 final byte[] newBuffer = new byte[size];
 1141  
 
 1142  
                 // Copy the existing content into the new buffer.
 1143  10
                 System.arraycopy(myBuffer, myBufferOffset, newBuffer, 0,
 1144  
                         available);
 1145  10
                 myBuffer = newBuffer;
 1146  10
             }
 1147  3941
             else if (0 < available) {
 1148  
                 // Compact the buffer.
 1149  2
                 System.arraycopy(myBuffer, myBufferOffset, myBuffer, 0,
 1150  
                         available);
 1151  
             }
 1152  
 
 1153  
             // Reset the limit and offset...
 1154  3951
             myBytesRead += myBufferOffset;
 1155  3951
             myBufferOffset = 0;
 1156  3951
             myBufferLimit = available;
 1157  
 
 1158  
             // Now read as much as possible to fill the buffer.
 1159  
             int read;
 1160  
             do {
 1161  3951
                 read = myInput.read(myBuffer, myBufferLimit, myBuffer.length
 1162  
                         - myBufferLimit);
 1163  3788
                 if (0 < read) {
 1164  3688
                     available += read;
 1165  3688
                     myBufferLimit += read;
 1166  
                 }
 1167  
             }
 1168  3788
             while (forceRead && (0 <= read) && (available < size));
 1169  
 
 1170  3788
             return Math.min(size, available);
 1171  
         }
 1172  
 
 1173  272146
         return size;
 1174  
     }
 1175  
 
 1176  
     /**
 1177  
      * Reads the complete set of bytes from the stream or throws an
 1178  
      * {@link EOFException}.
 1179  
      * 
 1180  
      * @param buffer
 1181  
      *            The buffer into which the data is read.
 1182  
      * @param offset
 1183  
      *            The offset to start writing into the buffer.
 1184  
      * @param length
 1185  
      *            The number of bytes to write into the buffer.
 1186  
      * @exception EOFException
 1187  
      *                If the input stream reaches the end before reading all the
 1188  
      *                bytes.
 1189  
      * @exception IOException
 1190  
      *                On an error reading from the underlying stream.
 1191  
      * @see DataInput#readFully(byte[], int, int)
 1192  
      */
 1193  
     private void readFully(final byte[] buffer, final int offset,
 1194  
             final int length) throws EOFException, IOException {
 1195  
 
 1196  32020
         int read = Math.min(length, availableInBuffer());
 1197  32020
         System.arraycopy(myBuffer, myBufferOffset, buffer, offset, read);
 1198  32020
         myBufferOffset += read;
 1199  
 
 1200  
         // Read directly from the stream to avoid a copy.
 1201  32021
         while (read < length) {
 1202  1
             final int count = myInput
 1203  
                     .read(buffer, offset + read, length - read);
 1204  1
             if (count < 0) {
 1205  0
                 throw new EOFException();
 1206  
             }
 1207  1
             read += count;
 1208  
 
 1209  
             // Directly read bytes never hit the buffer.
 1210  1
             myBytesRead += read;
 1211  1
         }
 1212  32020
     }
 1213  
 }