View Javadoc
1   /*
2    * #%L
3    * BsonInputStream.java - mongodb-async-driver - Allanbank Consulting, Inc.
4    * %%
5    * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6    * %%
7    * Licensed under the Apache License, Version 2.0 (the "License");
8    * you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   * 
11   *      http://www.apache.org/licenses/LICENSE-2.0
12   * 
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   * #L%
19   */
20  package com.allanbank.mongodb.bson.io;
21  
22  import java.io.DataInput;
23  import java.io.EOFException;
24  import java.io.IOException;
25  import java.io.InputStream;
26  import java.io.StreamCorruptedException;
27  import java.nio.charset.Charset;
28  import java.util.ArrayList;
29  import java.util.List;
30  
31  import com.allanbank.mongodb.bson.Document;
32  import com.allanbank.mongodb.bson.Element;
33  import com.allanbank.mongodb.bson.ElementType;
34  import com.allanbank.mongodb.bson.element.ArrayElement;
35  import com.allanbank.mongodb.bson.element.BinaryElement;
36  import com.allanbank.mongodb.bson.element.BooleanElement;
37  import com.allanbank.mongodb.bson.element.DocumentElement;
38  import com.allanbank.mongodb.bson.element.DoubleElement;
39  import com.allanbank.mongodb.bson.element.IntegerElement;
40  import com.allanbank.mongodb.bson.element.JavaScriptElement;
41  import com.allanbank.mongodb.bson.element.JavaScriptWithScopeElement;
42  import com.allanbank.mongodb.bson.element.LongElement;
43  import com.allanbank.mongodb.bson.element.MaxKeyElement;
44  import com.allanbank.mongodb.bson.element.MinKeyElement;
45  import com.allanbank.mongodb.bson.element.MongoTimestampElement;
46  import com.allanbank.mongodb.bson.element.NullElement;
47  import com.allanbank.mongodb.bson.element.ObjectId;
48  import com.allanbank.mongodb.bson.element.ObjectIdElement;
49  import com.allanbank.mongodb.bson.element.RegularExpressionElement;
50  import com.allanbank.mongodb.bson.element.StringElement;
51  import com.allanbank.mongodb.bson.element.SymbolElement;
52  import com.allanbank.mongodb.bson.element.TimestampElement;
53  import com.allanbank.mongodb.bson.element.UuidElement;
54  import com.allanbank.mongodb.bson.impl.RootDocument;
55  
56  /**
57   * {@link BsonInputStream} provides a class to read BSON documents based on the
58   * <a href="http://bsonspec.org/">BSON specification</a>.
59   * 
60   * @api.yes This class is part of the driver's API. Public and protected members
61   *          will be deprecated for at least 1 non-bugfix release (version
62   *          numbers are &lt;major&gt;.&lt;minor&gt;.&lt;bugfix&gt;) before being
63   *          removed or modified.
64   * @copyright 2011-2013, Allanbank Consulting, Inc., All Rights Reserved
65   */
66  public class BsonInputStream extends InputStream {
67  
    /** The UTF-8 character set; the same instance as {@code StringDecoder.UTF8}. */
    public final static Charset UTF8 = StringDecoder.UTF8;

    /**
     * The buffered data. The unread bytes occupy the range from
     * {@link #myBufferOffset} (inclusive) to {@link #myBufferLimit}
     * (exclusive).
     */
    private byte[] myBuffer;

    /**
     * The end (exclusive) of the valid data in {@link #myBuffer}. Bytes between
     * {@link #myBufferOffset} and this index are buffered but not yet consumed.
     */
    private int myBufferLimit;

    /** The offset into the current buffer of the next byte to be consumed. */
    private int myBufferOffset;

    /**
     * Consumed bytes not accounted for by {@link #myBufferOffset}, e.g. bytes
     * that {@link #skip(long)} skips directly in the underlying stream.
     * {@link #getBytesRead()} reports this value plus {@link #myBufferOffset}.
     * NOTE(review): the buffer refill logic (not visible here) presumably folds
     * the consumed buffer offset into this value when it resets or compacts
     * the buffer; confirm in fetch().
     */
    private long myBytesRead;

    /** The underlying input stream. */
    private final InputStream myInput;

    /** The decoder for strings. */
    private final StringDecoder myStringDecoder;
88  
89      /**
90       * Creates a BSON document reader.
91       * 
92       * @param input
93       *            the underlying stream to read from.
94       */
95      public BsonInputStream(final InputStream input) {
96          this(input, 8 * 1024); // 8K to start.
97      }
98  
99      /**
100      * Creates a BSON document reader.
101      * 
102      * @param input
103      *            the underlying stream to read from.
104      * @param expectedMaxDocumentSize
105      *            The expected maximum size for a document. If this guess is
106      *            wrong then there may be incremental allocations of the read
107      *            buffer.
108      */
109     public BsonInputStream(final InputStream input,
110             final int expectedMaxDocumentSize) {
111         this(input, expectedMaxDocumentSize, new StringDecoderCache());
112     }
113 
114     /**
115      * Creates a BSON document reader.
116      * 
117      * @param input
118      *            the underlying stream to read from.
119      * @param expectedMaxDocumentSize
120      *            The expected maximum size for a document. If this guess is
121      *            wrong then there may be incremental allocations of the read
122      *            buffer.
123      * @param cache
124      *            The cache to use for decoded strings.
125      */
126     public BsonInputStream(final InputStream input,
127             final int expectedMaxDocumentSize, final StringDecoderCache cache) {
128         myInput = input;
129         myBuffer = new byte[expectedMaxDocumentSize];
130         myBufferOffset = 0;
131         myBufferLimit = 0;
132         myBytesRead = 0;
133 
134         myStringDecoder = new StringDecoder(cache);
135     }
136 
137     /**
138      * Creates a BSON document reader.
139      * 
140      * @param input
141      *            the underlying stream to read from.
142      * @param cache
143      *            The cache to use for decoded strings.
144      */
145     public BsonInputStream(final InputStream input,
146             final StringDecoderCache cache) {
147         this(input, 8 * 1024, cache); // 8K to start.
148     }
149 
150     /**
151      * {@inheritDoc}
152      * <p>
153      * Overridden to return the number of bytes in the buffer and from the
154      * source stream.
155      * </p>
156      */
157     @Override
158     public int available() throws IOException {
159         return availableInBuffer() + myInput.available();
160     }
161 
162     /**
163      * {@inheritDoc}
164      * <p>
165      * Overridden to close the wrapped {@link InputStream}.
166      * </p>
167      */
168     @Override
169     public void close() throws IOException {
170         myInput.close();
171     }
172 
173     /**
174      * Returns the number of bytes that have been read by the stream.
175      * 
176      * @return The number of bytes that have been read from the stream.
177      */
178     public long getBytesRead() {
179         return myBytesRead + myBufferOffset;
180     }
181 
182     /**
183      * Returns the maximum number of strings that may have their encoded form
184      * cached.
185      * 
186      * @return The maximum number of strings that may have their encoded form
187      *         cached.
188      * @deprecated The cache {@link StringDecoderCache} should be controlled
189      *             directory. This method will be removed after the 2.1.0
190      *             release.
191      */
192     @Deprecated
193     public int getMaxCachedStringEntries() {
194         return myStringDecoder.getCache().getMaxCacheEntries();
195     }
196 
197     /**
198      * Returns the maximum length for a string that the stream is allowed to
199      * cache.
200      * 
201      * @return The maximum length for a string that the stream is allowed to
202      *         cache.
203      * @deprecated The cache {@link StringDecoderCache} should be controlled
204      *             directory. This method will be removed after the 2.1.0
205      *             release.
206      */
207     @Deprecated
208     public int getMaxCachedStringLength() {
209         return myStringDecoder.getCache().getMaxCacheLength();
210     }
211 
212     /**
213      * {@inheritDoc}
214      * <p>
215      * Overridden to throw an {@link UnsupportedOperationException}.
216      * </p>
217      */
218     @Override
219     public synchronized void mark(final int readlimit) {
220         throw new UnsupportedOperationException("Mark not supported.");
221     }
222 
223     /**
224      * {@inheritDoc}
225      * <p>
226      * Overridden to return false.
227      * </p>
228      */
229     @Override
230     public boolean markSupported() {
231         return false;
232     }
233 
234     /**
235      * Tries to prefetch the requested number of bytes from the underlying
236      * stream.
237      * 
238      * @param size
239      *            The number of bytes to try and read.
240      * @throws IOException
241      *             On a failure to read from the underlying stream.
242      */
243     public final void prefetch(final int size) throws IOException {
244         fetch(size, false);
245     }
246 
247     /**
248      * {@inheritDoc}
249      * <p>
250      * Overridden to track the bytes that have been read.
251      * </p>
252      */
253     @Override
254     public int read() throws IOException {
255         if (ensureFetched(1) != 1) {
256             return -1; // EOF.
257         }
258 
259         final int read = (myBuffer[myBufferOffset] & 0xFF);
260 
261         myBufferOffset += 1;
262 
263         return read;
264     }
265 
266     /**
267      * {@inheritDoc}
268      * <p>
269      * Overridden to track the bytes that have been read.
270      * </p>
271      */
272     @Override
273     public int read(final byte b[]) throws IOException {
274         return read(b, 0, b.length);
275     }
276 
277     /**
278      * {@inheritDoc}
279      * <p>
280      * Overridden to track the bytes that have been read.
281      * </p>
282      */
283     @Override
284     public int read(final byte b[], final int off, final int len)
285             throws IOException {
286         final int read = ensureFetched(len - off);
287 
288         System.arraycopy(myBuffer, myBufferOffset, b, off, read);
289 
290         myBufferOffset += read;
291 
292         return read;
293     }
294 
295     /**
296      * Reads a "cstring" value from the stream:<code>
297      * <pre>
298      * cstring 	::= 	(byte*) "\x00"
299      * </pre>
300      * </code>
301      * <p>
302      * <blockquote> CString - Zero or more modified UTF-8 encoded characters
303      * followed by '\x00'. The (byte*) MUST NOT contain '\x00', hence it is not
304      * full UTF-8. </blockquote>
305      * </p>
306      * 
307      * @return The string value.
308      * @throws EOFException
309      *             On insufficient data for the integer.
310      * @throws IOException
311      *             On a failure reading the integer.
312      */
313     public String readCString() throws EOFException, IOException {
314 
315         while (true) {
316             for (int i = myBufferOffset; i < myBufferLimit; ++i) {
317                 final byte b = myBuffer[i];
318                 if (b == 0) {
319                     // Found the end.
320                     final int offset = myBufferOffset;
321                     final int length = (1 + i) - offset;
322 
323                     // Advance the buffer.
324                     myBufferOffset = i + 1;
325 
326                     return myStringDecoder.decode(myBuffer, offset, length);
327                 }
328             }
329 
330             // Need more data.
331             ensureFetched(availableInBuffer() + 1);
332         }
333     }
334 
335     /**
336      * Reads a BSON document element: <code>
337      * <pre>
338      * document 	::= 	int32 e_list "\x00"
339      * </pre>
340      * </code>
341      * 
342      * @return The Document.
343      * @throws EOFException
344      *             On insufficient data for the document.
345      * @throws IOException
346      *             On a failure reading the document.
347      */
348     public Document readDocument() throws IOException {
349 
350         // The total length of the document.
351         final int size = readInt();
352 
353         prefetch(size - 4);
354 
355         return new RootDocument(readElements(), false, size);
356     }
357 
358     /**
359      * Reads the complete set of bytes from the stream or throws an
360      * {@link EOFException}.
361      * 
362      * @param buffer
363      *            The buffer into which the data is read.
364      * @exception EOFException
365      *                If the input stream reaches the end before reading all the
366      *                bytes.
367      * @exception IOException
368      *                On an error reading from the underlying stream.
369      */
370     public void readFully(final byte[] buffer) throws EOFException, IOException {
371         readFully(buffer, 0, buffer.length);
372     }
373 
374     /**
375      * Reads a little-endian 4 byte signed integer from the stream.
376      * 
377      * @return The integer value.
378      * @throws EOFException
379      *             On insufficient data for the integer.
380      * @throws IOException
381      *             On a failure reading the integer.
382      */
383     public int readInt() throws EOFException, IOException {
384         if (ensureFetched(4) != 4) {
385             throw new EOFException();
386         }
387 
388         // Little endian.
389         int result = (myBuffer[myBufferOffset] & 0xFF);
390         result += (myBuffer[myBufferOffset + 1] & 0xFF) << 8;
391         result += (myBuffer[myBufferOffset + 2] & 0xFF) << 16;
392         result += (myBuffer[myBufferOffset + 3] & 0xFF) << 24;
393 
394         myBufferOffset += 4;
395 
396         return result;
397     }
398 
399     /**
400      * Reads a little-endian 8 byte signed integer from the stream.
401      * 
402      * @return The long value.
403      * @throws EOFException
404      *             On insufficient data for the long.
405      * @throws IOException
406      *             On a failure reading the long.
407      */
408     public long readLong() throws EOFException, IOException {
409         if (ensureFetched(8) != 8) {
410             throw new EOFException();
411         }
412 
413         // Little endian.
414         long result = (myBuffer[myBufferOffset] & 0xFFL);
415         result += (myBuffer[myBufferOffset + 1] & 0xFFL) << 8;
416         result += (myBuffer[myBufferOffset + 2] & 0xFFL) << 16;
417         result += (myBuffer[myBufferOffset + 3] & 0xFFL) << 24;
418         result += (myBuffer[myBufferOffset + 4] & 0xFFL) << 32;
419         result += (myBuffer[myBufferOffset + 5] & 0xFFL) << 40;
420         result += (myBuffer[myBufferOffset + 6] & 0xFFL) << 48;
421         result += (myBuffer[myBufferOffset + 7] & 0xFFL) << 56;
422 
423         myBufferOffset += 8;
424 
425         return result;
426     }
427 
428     /**
429      * {@inheritDoc}
430      * <p>
431      * Overridden to throw an {@link UnsupportedOperationException}.
432      * </p>
433      */
434     @Override
435     public synchronized void reset() throws UnsupportedOperationException {
436         throw new UnsupportedOperationException("Mark not supported.");
437     }
438 
439     /**
440      * Sets the value of maximum number of strings that may have their encoded
441      * form cached.
442      * 
443      * @param maxCacheEntries
444      *            The new value for the maximum number of strings that may have
445      *            their encoded form cached.
446      * @deprecated The cache {@link StringDecoderCache} should be controlled
447      *             directory. This method will be removed after the 2.1.0
448      *             release.
449      */
450     @Deprecated
451     public void setMaxCachedStringEntries(final int maxCacheEntries) {
452         myStringDecoder.getCache().setMaxCacheEntries(maxCacheEntries);
453     }
454 
455     /**
456      * Sets the value of length for a string that the stream is allowed to cache
457      * to the new value. This can be used to stop a single long string from
458      * pushing useful values out of the cache.
459      * 
460      * @param maxlength
461      *            The new value for the length for a string that the encoder is
462      *            allowed to cache.
463      * @deprecated The cache {@link StringDecoderCache} should be controlled
464      *             directory. This method will be removed after the 2.1.0
465      *             release.
466      */
467     @Deprecated
468     public void setMaxCachedStringLength(final int maxlength) {
469         myStringDecoder.getCache().setMaxCacheLength(maxlength);
470 
471     }
472 
473     /**
474      * {@inheritDoc}
475      * <p>
476      * Overridden to track the bytes that have been skipped.
477      * </p>
478      */
479     @Override
480     public long skip(final long n) throws IOException {
481 
482         long skipped = Math.min(n, availableInBuffer());
483         myBufferOffset += skipped;
484 
485         if (skipped < n) {
486             // Exhausted the buffer - skip in the source.
487             final long streamSkipped = myInput.skip(n - skipped);
488             myBytesRead += streamSkipped;
489             skipped += streamSkipped;
490         }
491 
492         return skipped;
493     }
494 
495     /**
496      * Returns the number of bytes available in the buffer.
497      * 
498      * @return The number of bytes available in the buffer.
499      */
500     protected final int availableInBuffer() {
501         return myBufferLimit - myBufferOffset;
502     }
503 
504     /**
505      * Reads a BSON Array element: <code>
506      * <pre>
507      * "\x04" e_name document
508      * </pre>
509      * </code>
510      * 
511      * @return The {@link ArrayElement}.
512      * @throws EOFException
513      *             On insufficient data for the document.
514      * @throws IOException
515      *             On a failure reading the document.
516      */
517     protected ArrayElement readArrayElement() throws IOException {
518 
519         final long start = getBytesRead() - 1; // Token already read.
520 
521         final String name = readCString();
522         final int fetch = readInt(); // The total length of the array elements.
523         prefetch(fetch - 4);
524         final List<Element> elements = readElements();
525         final long size = getBytesRead() - start;
526 
527         return new ArrayElement(name, elements, size);
528     }
529 
530     /**
531      * Reads a {@link BinaryElement}'s contents: <code><pre>
532      * binary 	::= 	int32 subtype (byte*)
533      * subtype 	::= 	"\x00" 	Binary / Generic
534      * 	           | 	"\x01" 	Function
535      * 	           | 	"\x02" 	Binary (Old)
536      * 	           | 	"\x03" 	UUID
537      * 	           | 	"\x05" 	MD5
538      * 	           | 	"\x80" 	User defined
539      * </pre>
540      * </code>
541      * 
542      * @return The {@link BinaryElement}.
543      * @throws IOException
544      *             On a failure reading the binary data.
545      */
546     protected BinaryElement readBinaryElement() throws IOException {
547 
548         final long start = getBytesRead() - 1; // Token already read.
549 
550         final String name = readCString();
551         int length = readInt();
552 
553         final int subType = read();
554         if (subType < 0) {
555             throw new EOFException();
556         }
557 
558         // Old binary handling.
559         if (subType == 2) {
560             final int anotherLength = readInt();
561 
562             assert (anotherLength == (length - 4)) : "Binary Element Subtye 2 "
563                     + "length should be outer length - 4.";
564 
565             length -= 4;
566         }
567         else if ((subType == UuidElement.LEGACY_UUID_SUBTTYPE)
568                 || (subType == UuidElement.UUID_SUBTTYPE)) {
569 
570             final byte[] binary = new byte[length];
571             readFully(binary);
572 
573             final long size = getBytesRead() - start;
574             try {
575                 return new UuidElement(name, (byte) subType, binary, size);
576             }
577             catch (final IllegalArgumentException iae) {
578                 // Just use the vanilla BinaryElement.
579                 return new BinaryElement(name, (byte) subType, binary, size);
580             }
581         }
582 
583         final long size = getBytesRead() - start;
584         return new BinaryElement(name, (byte) subType, this, length, size
585                 + length);
586     }
587 
588     /**
589      * Reads a {@link BooleanElement} from the stream.
590      * 
591      * @return The {@link BooleanElement}.
592      * @throws IOException
593      *             On a failure to read the contents of the
594      *             {@link BooleanElement}.
595      */
596     protected BooleanElement readBooleanElement() throws IOException {
597         final long start = getBytesRead() - 1; // Token already read.
598 
599         final String name = readCString();
600         final boolean value = (read() == 1);
601 
602         final long size = getBytesRead() - start;
603 
604         return new BooleanElement(name, value, size);
605     }
606 
607     /**
608      * Reads a {@code DBPointerElement} from the stream.
609      * 
610      * @return The {@code DBPointerElement}.
611      * @throws IOException
612      *             On a failure to read the contents of the
613      *             {@code DBPointerElement}.
614      * @deprecated Per the BSON specification.
615      */
616     @Deprecated
617     protected Element readDBPointerElement() throws IOException {
618         final long start = getBytesRead() - 1; // Token already read.
619 
620         final String name = readCString();
621         final String dbDotCollection = readString();
622         final int timestamp = EndianUtils.swap(readInt());
623         final long machineId = EndianUtils.swap(readLong());
624 
625         final long size = getBytesRead() - start;
626 
627         String db = dbDotCollection;
628         String collection = "";
629         final int firstDot = dbDotCollection.indexOf('.');
630         if (0 <= firstDot) {
631             db = dbDotCollection.substring(0, firstDot);
632             collection = dbDotCollection.substring(firstDot + 1);
633         }
634         return new com.allanbank.mongodb.bson.element.DBPointerElement(name,
635                 db, collection, new ObjectId(timestamp, machineId), size);
636     }
637 
638     /**
639      * Reads a BSON Subdocument element: <code>
640      * <pre>
641      * "\x03" e_name document
642      * </pre>
643      * </code>
644      * 
645      * @return The {@link ArrayElement}.
646      * @throws IOException
647      *             On a failure reading the document.
648      */
649     protected DocumentElement readDocumentElement() throws IOException {
650         final long start = getBytesRead() - 1; // Token already read.
651 
652         final String name = readCString();
653         final int fetch = readInt(); // The total length of the sub-document
654         // elements.
655         prefetch(fetch - 4);
656         final List<Element> elements = readElements();
657 
658         final long size = getBytesRead() - start;
659 
660         return new DocumentElement(name, elements, true, size);
661     }
662 
663     /**
664      * Reads a {@link DoubleElement} from the stream.
665      * 
666      * @return The {@link DoubleElement}.
667      * @throws IOException
668      *             On a failure to read the contents of the
669      *             {@link DoubleElement}.
670      */
671     protected DoubleElement readDoubleElement() throws IOException {
672         final long start = getBytesRead() - 1; // Token already read.
673 
674         final String name = readCString();
675         final double value = Double.longBitsToDouble(readLong());
676 
677         final long size = getBytesRead() - start;
678 
679         return new DoubleElement(name, value, size);
680     }
681 
682     /**
683      * Reads the element:<code>
684      * <pre>
685      * element 	::= 	"\x01" e_name double 			Floating point
686      * 	           | 	"\x02" e_name string 			UTF-8 string
687      * 	           | 	"\x03" e_name document 			Embedded document
688      * 	           | 	"\x04" e_name document 			Array
689      * 	           | 	"\x05" e_name binary 			Binary data
690      * 	           | 	"\x06" e_name 					Undefined — Deprecated
691      * 	           | 	"\x07" e_name (byte*12) 		ObjectId
692      * 	           | 	"\x08" e_name "\x00" 			Boolean "false"
693      * 	           | 	"\x08" e_name "\x01" 			Boolean "true"
694      * 	           | 	"\x09" e_name int64 			UTC datetime
695      * 	           | 	"\x0A" e_name 					Null value
696      * 	           | 	"\x0B" e_name cstring cstring 	Regular expression
697      * 	           | 	"\x0C" e_name string (byte*12) 	DBPointer — Deprecated
698      * 	           | 	"\x0D" e_name string 			JavaScript code
699      * 	           | 	"\x0E" e_name string 			Symbol
700      * 	           | 	"\x0F" e_name code_w_s 			JavaScript code w/ scope
701      * 	           | 	"\x10" e_name int32 			32-bit Integer
702      * 	           | 	"\x11" e_name int64 			Timestamp
703      * 	           | 	"\x12" e_name int64 			64-bit integer
704      * 	           | 	"\xFF" e_name 					Min key
705      * 	           | 	"\x7F" e_name 					Max key
706      * </pre>
707      * </code>
708      * 
709      * @param token
710      *            The element's token.
711      * @return The Element.
712      * @throws EOFException
713      *             On insufficient data for the element.
714      * @throws IOException
715      *             On a failure reading the element.
716      */
717     @SuppressWarnings("deprecation")
718     protected Element readElement(final byte token) throws EOFException,
719             IOException {
720         final ElementType type = ElementType.valueOf(token);
721         if (type == null) {
722             throw new StreamCorruptedException("Unknown element type: 0x"
723                     + Integer.toHexString(token & 0xFF) + ".");
724         }
725         switch (type) {
726         case ARRAY: {
727             return readArrayElement();
728         }
729         case BINARY: {
730             return readBinaryElement();
731         }
732         case DB_POINTER: {
733             return readDBPointerElement();
734         }
735         case DOCUMENT: {
736             return readDocumentElement();
737         }
738         case DOUBLE: {
739             return readDoubleElement();
740         }
741         case BOOLEAN: {
742             return readBooleanElement();
743         }
744         case INTEGER: {
745             return readIntegerElement();
746         }
747         case JAVA_SCRIPT: {
748             return readJavaScriptElement();
749         }
750         case JAVA_SCRIPT_WITH_SCOPE: {
751             return readJavaScriptWithScopeElement();
752         }
753         case LONG: {
754             return readLongElement();
755         }
756         case MAX_KEY: {
757             return readMaxKeyElement();
758         }
759         case MIN_KEY: {
760             return readMinKeyElement();
761         }
762         case MONGO_TIMESTAMP: {
763             return readMongoTimestampElement();
764         }
765         case NULL: {
766             return readNullElement();
767         }
768         case OBJECT_ID: {
769             return readObjectIdElement();
770         }
771         case REGEX: {
772             return readRegularExpressionElement();
773         }
774         case STRING: {
775             return readStringElement();
776         }
777         case SYMBOL: {
778             return readSymbolElement();
779         }
780         case UTC_TIMESTAMP: {
781             return readTimestampElement();
782         }
783         }
784 
785         throw new StreamCorruptedException("Unknown element type: "
786                 + type.name() + ".");
787     }
788 
789     /**
790      * Reads a BSON element list (e_list): <code>
791      * <pre>
792      * e_list 	::= 	element e_list | ""
793      * </pre>
794      * </code>
795      * 
796      * @return The list of elements.
797      * @throws EOFException
798      *             On insufficient data for the elements.
799      * @throws IOException
800      *             On a failure reading the elements.
801      */
802     protected List<Element> readElements() throws EOFException, IOException {
803         final List<Element> elements = new ArrayList<Element>();
804         int elementToken = read();
805         while (elementToken > 0) {
806             elements.add(readElement((byte) elementToken));
807 
808             elementToken = read();
809         }
810         if (elementToken < 0) {
811             throw new EOFException();
812         }
813         return elements;
814     }
815 
816     /**
817      * Reads a {@link IntegerElement} from the stream.
818      * 
819      * @return The {@link IntegerElement}.
820      * @throws IOException
821      *             On a failure to read the contents of the
822      *             {@link IntegerElement}.
823      */
824     protected IntegerElement readIntegerElement() throws IOException {
825         final long start = getBytesRead() - 1; // Token already read.
826 
827         final String name = readCString();
828         final int value = readInt();
829 
830         final long size = getBytesRead() - start;
831 
832         return new IntegerElement(name, value, size);
833     }
834 
835     /**
836      * Reads a {@link JavaScriptElement} from the stream.
837      * 
838      * @return The {@link JavaScriptElement}.
839      * @throws IOException
840      *             On a failure to read the contents of the
841      *             {@link JavaScriptElement}.
842      */
843     protected JavaScriptElement readJavaScriptElement() throws IOException {
844         final long start = getBytesRead() - 1; // Token already read.
845 
846         final String name = readCString();
847         final String javascript = readString();
848 
849         final long size = getBytesRead() - start;
850 
851         return new JavaScriptElement(name, javascript, size);
852     }
853 
854     /**
855      * Reads a {@link JavaScriptWithScopeElement} from the stream.
856      * 
857      * @return The {@link JavaScriptWithScopeElement}.
858      * @throws IOException
859      *             On a failure to read the contents of the
860      *             {@link JavaScriptWithScopeElement}.
861      */
862     protected JavaScriptWithScopeElement readJavaScriptWithScopeElement()
863             throws IOException {
864         final long start = getBytesRead() - 1; // Token already read.
865 
866         final String name = readCString();
867         readInt(); // Total length - not used.
868         final String javascript = readString();
869         final Document scope = readDocument();
870 
871         final long size = getBytesRead() - start;
872 
873         return new JavaScriptWithScopeElement(name, javascript, scope, size);
874     }
875 
876     /**
877      * Reads a {@link LongElement} from the stream.
878      * 
879      * @return The {@link LongElement}.
880      * @throws IOException
881      *             On a failure to read the contents of the {@link LongElement}.
882      */
883     protected LongElement readLongElement() throws IOException {
884         final long start = getBytesRead() - 1; // Token already read.
885 
886         final String name = readCString();
887         final long value = readLong();
888 
889         final long size = getBytesRead() - start;
890 
891         return new LongElement(name, value, size);
892     }
893 
894     /**
895      * Reads a {@link MaxKeyElement} from the stream.
896      * 
897      * @return The {@link MaxKeyElement}.
898      * @throws IOException
899      *             On a failure to read the contents of the
900      *             {@link MaxKeyElement}.
901      */
902     protected MaxKeyElement readMaxKeyElement() throws IOException {
903         final long start = getBytesRead() - 1; // Token already read.
904 
905         final String name = readCString();
906 
907         final long size = getBytesRead() - start;
908 
909         return new MaxKeyElement(name, size);
910     }
911 
912     /**
913      * Reads a {@link MinKeyElement} from the stream.
914      * 
915      * @return The {@link MinKeyElement}.
916      * @throws IOException
917      *             On a failure to read the contents of the
918      *             {@link MinKeyElement}.
919      */
920     protected MinKeyElement readMinKeyElement() throws IOException {
921         final long start = getBytesRead() - 1; // Token already read.
922 
923         final String name = readCString();
924 
925         final long size = getBytesRead() - start;
926 
927         return new MinKeyElement(name, size);
928     }
929 
930     /**
931      * Reads a {@link MongoTimestampElement} from the stream.
932      * 
933      * @return The {@link MongoTimestampElement}.
934      * @throws IOException
935      *             On a failure to read the contents of the
936      *             {@link MongoTimestampElement}.
937      */
938     protected MongoTimestampElement readMongoTimestampElement()
939             throws IOException {
940         final long start = getBytesRead() - 1; // Token already read.
941 
942         final String name = readCString();
943         final long timestamp = readLong();
944 
945         final long size = getBytesRead() - start;
946 
947         return new MongoTimestampElement(name, timestamp, size);
948     }
949 
950     /**
951      * Reads a {@link NullElement} from the stream.
952      * 
953      * @return The {@link NullElement}.
954      * @throws IOException
955      *             On a failure to read the contents of the {@link NullElement}.
956      */
957     protected NullElement readNullElement() throws IOException {
958         final long start = getBytesRead() - 1; // Token already read.
959 
960         final String name = readCString();
961 
962         final long size = getBytesRead() - start;
963 
964         return new NullElement(name, size);
965     }
966 
967     /**
968      * Reads a {@link ObjectIdElement} from the stream.
969      * 
970      * @return The {@link ObjectIdElement}.
971      * @throws IOException
972      *             On a failure to read the contents of the
973      *             {@link ObjectIdElement}.
974      */
975     protected ObjectIdElement readObjectIdElement() throws IOException {
976         final long start = getBytesRead() - 1; // Token already read.
977 
978         final String name = readCString();
979         final int timestamp = EndianUtils.swap(readInt());
980         final long machineId = EndianUtils.swap(readLong());
981 
982         final long size = getBytesRead() - start;
983 
984         return new ObjectIdElement(name, new ObjectId(timestamp, machineId),
985                 size);
986     }
987 
988     /**
989      * Reads a {@link RegularExpressionElement} from the stream.
990      * 
991      * @return The {@link RegularExpressionElement}.
992      * @throws IOException
993      *             On a failure to read the contents of the
994      *             {@link RegularExpressionElement}.
995      */
996     protected RegularExpressionElement readRegularExpressionElement()
997             throws IOException {
998         final long start = getBytesRead() - 1; // Token already read.
999 
1000         final String name = readCString();
1001         final String pattern = readCString();
1002         final String options = readCString();
1003 
1004         final long size = getBytesRead() - start;
1005 
1006         return new RegularExpressionElement(name, pattern, options, size);
1007     }
1008 
1009     /**
1010      * Reads a "string" value from the stream:<code>
1011      * <pre>
1012      * string 	::= 	int32 (byte*) "\x00"
1013      * </pre>
1014      * </code>
1015      * <p>
1016      * <blockquote>String - The int32 is the number bytes in the (byte*) + 1
1017      * (for the trailing '\x00'). The (byte*) is zero or more UTF-8 encoded
1018      * characters. </blockquote>
1019      * </p>
1020      * 
1021      * @return The string value.
1022      * @throws EOFException
1023      *             On insufficient data for the integer.
1024      * @throws IOException
1025      *             On a failure reading the integer.
1026      */
1027     protected String readString() throws EOFException, IOException {
1028         final int length = readInt();
1029 
1030         if (ensureFetched(length) != length) {
1031             throw new EOFException();
1032         }
1033 
1034         final int offset = myBufferOffset;
1035 
1036         // Advance the buffer.
1037         myBufferOffset += length;
1038 
1039         return myStringDecoder.decode(myBuffer, offset, length);
1040     }
1041 
1042     /**
1043      * Reads a {@link StringElement} from the stream.
1044      * 
1045      * @return The {@link StringElement}.
1046      * @throws IOException
1047      *             On a failure to read the contents of the
1048      *             {@link StringElement}.
1049      */
1050     protected StringElement readStringElement() throws IOException {
1051         final long start = getBytesRead() - 1; // Token already read.
1052 
1053         final String name = readCString();
1054         final String value = readString();
1055 
1056         final long size = getBytesRead() - start;
1057 
1058         return new StringElement(name, value, size);
1059     }
1060 
1061     /**
1062      * Reads a {@link SymbolElement} from the stream.
1063      * 
1064      * @return The {@link SymbolElement}.
1065      * @throws IOException
1066      *             On a failure to read the contents of the
1067      *             {@link SymbolElement}.
1068      */
1069     protected SymbolElement readSymbolElement() throws IOException {
1070         final long start = getBytesRead() - 1; // Token already read.
1071 
1072         final String name = readCString();
1073         final String symbol = readString();
1074 
1075         final long size = getBytesRead() - start;
1076 
1077         return new SymbolElement(name, symbol, size);
1078     }
1079 
1080     /**
1081      * Reads a {@link TimestampElement} from the stream.
1082      * 
1083      * @return The {@link TimestampElement}.
1084      * @throws IOException
1085      *             On a failure to read the contents of the
1086      *             {@link TimestampElement}.
1087      */
1088     protected TimestampElement readTimestampElement() throws IOException {
1089         final long start = getBytesRead() - 1; // Token already read.
1090 
1091         final String name = readCString();
1092         final long time = readLong();
1093 
1094         final long size = getBytesRead() - start;
1095 
1096         return new TimestampElement(name, time, size);
1097     }
1098 
1099     /**
1100      * Fetch the requested number of bytes from the underlying stream. Returns
1101      * the number of bytes available in the buffer or the number of requested
1102      * bytes, which ever is smaller.
1103      * 
1104      * @param size
1105      *            The number of bytes to be read.
1106      * @return The smaller of the number of bytes requested or the number of
1107      *         bytes available in the buffer.
1108      * @throws IOException
1109      *             On a failure to read from the underlying stream.
1110      */
1111     private final int ensureFetched(final int size) throws IOException {
1112         return fetch(size, true);
1113     }
1114 
1115     /**
1116      * Fetch the requested number of bytes from the underlying stream. Returns
1117      * the number of bytes available in the buffer or the number of requested
1118      * bytes, which ever is smaller.
1119      * 
1120      * @param size
1121      *            The number of bytes to be read.
1122      * @param forceRead
1123      *            Determines if a read is forced to ensure the buffer contains
1124      *            the number of bytes.
1125      * @return The smaller of the number of bytes requested or the number of
1126      *         bytes available in the buffer.
1127      * @throws IOException
1128      *             On a failure to read from the underlying stream.
1129      */
1130     private final int fetch(final int size, final boolean forceRead)
1131             throws IOException {
1132         // See if we need to read more data.
1133         int available = availableInBuffer();
1134         if (available < size) {
1135             // Yes - we do.
1136 
1137             // Will the size fit in the existing buffer?
1138             if (myBuffer.length < size) {
1139                 // Nope - grow the buffer to the needed size.
1140                 final byte[] newBuffer = new byte[size];
1141 
1142                 // Copy the existing content into the new buffer.
1143                 System.arraycopy(myBuffer, myBufferOffset, newBuffer, 0,
1144                         available);
1145                 myBuffer = newBuffer;
1146             }
1147             else if (0 < available) {
1148                 // Compact the buffer.
1149                 System.arraycopy(myBuffer, myBufferOffset, myBuffer, 0,
1150                         available);
1151             }
1152 
1153             // Reset the limit and offset...
1154             myBytesRead += myBufferOffset;
1155             myBufferOffset = 0;
1156             myBufferLimit = available;
1157 
1158             // Now read as much as possible to fill the buffer.
1159             int read;
1160             do {
1161                 read = myInput.read(myBuffer, myBufferLimit, myBuffer.length
1162                         - myBufferLimit);
1163                 if (0 < read) {
1164                     available += read;
1165                     myBufferLimit += read;
1166                 }
1167             }
1168             while (forceRead && (0 <= read) && (available < size));
1169 
1170             return Math.min(size, available);
1171         }
1172 
1173         return size;
1174     }
1175 
1176     /**
1177      * Reads the complete set of bytes from the stream or throws an
1178      * {@link EOFException}.
1179      * 
1180      * @param buffer
1181      *            The buffer into which the data is read.
1182      * @param offset
1183      *            The offset to start writing into the buffer.
1184      * @param length
1185      *            The number of bytes to write into the buffer.
1186      * @exception EOFException
1187      *                If the input stream reaches the end before reading all the
1188      *                bytes.
1189      * @exception IOException
1190      *                On an error reading from the underlying stream.
1191      * @see DataInput#readFully(byte[], int, int)
1192      */
1193     private void readFully(final byte[] buffer, final int offset,
1194             final int length) throws EOFException, IOException {
1195 
1196         int read = Math.min(length, availableInBuffer());
1197         System.arraycopy(myBuffer, myBufferOffset, buffer, offset, read);
1198         myBufferOffset += read;
1199 
1200         // Read directly from the stream to avoid a copy.
1201         while (read < length) {
1202             final int count = myInput
1203                     .read(buffer, offset + read, length - read);
1204             if (count < 0) {
1205                 throw new EOFException();
1206             }
1207             read += count;
1208 
1209             // Directly read bytes never hit the buffer.
1210             myBytesRead += read;
1211         }
1212     }
1213 }