1 /*
2 * #%L
3 * BsonInputStream.java - mongodb-async-driver - Allanbank Consulting, Inc.
4 * %%
5 * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6 * %%
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * #L%
19 */
20 package com.allanbank.mongodb.bson.io;
21
22 import java.io.DataInput;
23 import java.io.EOFException;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.io.StreamCorruptedException;
27 import java.nio.charset.Charset;
28 import java.util.ArrayList;
29 import java.util.List;
30
31 import com.allanbank.mongodb.bson.Document;
32 import com.allanbank.mongodb.bson.Element;
33 import com.allanbank.mongodb.bson.ElementType;
34 import com.allanbank.mongodb.bson.element.ArrayElement;
35 import com.allanbank.mongodb.bson.element.BinaryElement;
36 import com.allanbank.mongodb.bson.element.BooleanElement;
37 import com.allanbank.mongodb.bson.element.DocumentElement;
38 import com.allanbank.mongodb.bson.element.DoubleElement;
39 import com.allanbank.mongodb.bson.element.IntegerElement;
40 import com.allanbank.mongodb.bson.element.JavaScriptElement;
41 import com.allanbank.mongodb.bson.element.JavaScriptWithScopeElement;
42 import com.allanbank.mongodb.bson.element.LongElement;
43 import com.allanbank.mongodb.bson.element.MaxKeyElement;
44 import com.allanbank.mongodb.bson.element.MinKeyElement;
45 import com.allanbank.mongodb.bson.element.MongoTimestampElement;
46 import com.allanbank.mongodb.bson.element.NullElement;
47 import com.allanbank.mongodb.bson.element.ObjectId;
48 import com.allanbank.mongodb.bson.element.ObjectIdElement;
49 import com.allanbank.mongodb.bson.element.RegularExpressionElement;
50 import com.allanbank.mongodb.bson.element.StringElement;
51 import com.allanbank.mongodb.bson.element.SymbolElement;
52 import com.allanbank.mongodb.bson.element.TimestampElement;
53 import com.allanbank.mongodb.bson.element.UuidElement;
54 import com.allanbank.mongodb.bson.impl.RootDocument;
55
56 /**
57 * {@link BsonInputStream} provides a class to read BSON documents based on the
58 * <a href="http://bsonspec.org/">BSON specification</a>.
59 *
60 * @api.yes This class is part of the driver's API. Public and protected members
61 * will be deprecated for at least 1 non-bugfix release (version
62 * numbers are <major>.<minor>.<bugfix>) before being
63 * removed or modified.
64 * @copyright 2011-2013, Allanbank Consulting, Inc., All Rights Reserved
65 */
66 public class BsonInputStream extends InputStream {
67
68 /** UTF-8 Character set for encoding strings. */
69 public final static Charset UTF8 = StringDecoder.UTF8;
70
71 /** The buffered data. */
72 private byte[] myBuffer;
73
74 /** The offset into the current buffer. */
75 private int myBufferLimit;
76
77 /** The offset into the current buffer. */
78 private int myBufferOffset;
79
80 /** Tracks the number of bytes that have been read by the stream. */
81 private long myBytesRead;
82
83 /** The underlying input stream. */
84 private final InputStream myInput;
85
86 /** The decoder for strings. */
87 private final StringDecoder myStringDecoder;
88
89 /**
90 * Creates a BSON document reader.
91 *
92 * @param input
93 * the underlying stream to read from.
94 */
95 public BsonInputStream(final InputStream input) {
96 this(input, 8 * 1024); // 8K to start.
97 }
98
99 /**
100 * Creates a BSON document reader.
101 *
102 * @param input
103 * the underlying stream to read from.
104 * @param expectedMaxDocumentSize
105 * The expected maximum size for a document. If this guess is
106 * wrong then there may be incremental allocations of the read
107 * buffer.
108 */
109 public BsonInputStream(final InputStream input,
110 final int expectedMaxDocumentSize) {
111 this(input, expectedMaxDocumentSize, new StringDecoderCache());
112 }
113
114 /**
115 * Creates a BSON document reader.
116 *
117 * @param input
118 * the underlying stream to read from.
119 * @param expectedMaxDocumentSize
120 * The expected maximum size for a document. If this guess is
121 * wrong then there may be incremental allocations of the read
122 * buffer.
123 * @param cache
124 * The cache to use for decoded strings.
125 */
126 public BsonInputStream(final InputStream input,
127 final int expectedMaxDocumentSize, final StringDecoderCache cache) {
128 myInput = input;
129 myBuffer = new byte[expectedMaxDocumentSize];
130 myBufferOffset = 0;
131 myBufferLimit = 0;
132 myBytesRead = 0;
133
134 myStringDecoder = new StringDecoder(cache);
135 }
136
137 /**
138 * Creates a BSON document reader.
139 *
140 * @param input
141 * the underlying stream to read from.
142 * @param cache
143 * The cache to use for decoded strings.
144 */
145 public BsonInputStream(final InputStream input,
146 final StringDecoderCache cache) {
147 this(input, 8 * 1024, cache); // 8K to start.
148 }
149
150 /**
151 * {@inheritDoc}
152 * <p>
153 * Overridden to return the number of bytes in the buffer and from the
154 * source stream.
155 * </p>
156 */
157 @Override
158 public int available() throws IOException {
159 return availableInBuffer() + myInput.available();
160 }
161
162 /**
163 * {@inheritDoc}
164 * <p>
165 * Overridden to close the wrapped {@link InputStream}.
166 * </p>
167 */
168 @Override
169 public void close() throws IOException {
170 myInput.close();
171 }
172
173 /**
174 * Returns the number of bytes that have been read by the stream.
175 *
176 * @return The number of bytes that have been read from the stream.
177 */
178 public long getBytesRead() {
179 return myBytesRead + myBufferOffset;
180 }
181
182 /**
183 * Returns the maximum number of strings that may have their encoded form
184 * cached.
185 *
186 * @return The maximum number of strings that may have their encoded form
187 * cached.
188 * @deprecated The cache {@link StringDecoderCache} should be controlled
189 * directory. This method will be removed after the 2.1.0
190 * release.
191 */
192 @Deprecated
193 public int getMaxCachedStringEntries() {
194 return myStringDecoder.getCache().getMaxCacheEntries();
195 }
196
197 /**
198 * Returns the maximum length for a string that the stream is allowed to
199 * cache.
200 *
201 * @return The maximum length for a string that the stream is allowed to
202 * cache.
203 * @deprecated The cache {@link StringDecoderCache} should be controlled
204 * directory. This method will be removed after the 2.1.0
205 * release.
206 */
207 @Deprecated
208 public int getMaxCachedStringLength() {
209 return myStringDecoder.getCache().getMaxCacheLength();
210 }
211
212 /**
213 * {@inheritDoc}
214 * <p>
215 * Overridden to throw an {@link UnsupportedOperationException}.
216 * </p>
217 */
218 @Override
219 public synchronized void mark(final int readlimit) {
220 throw new UnsupportedOperationException("Mark not supported.");
221 }
222
223 /**
224 * {@inheritDoc}
225 * <p>
226 * Overridden to return false.
227 * </p>
228 */
229 @Override
230 public boolean markSupported() {
231 return false;
232 }
233
234 /**
235 * Tries to prefetch the requested number of bytes from the underlying
236 * stream.
237 *
238 * @param size
239 * The number of bytes to try and read.
240 * @throws IOException
241 * On a failure to read from the underlying stream.
242 */
243 public final void prefetch(final int size) throws IOException {
244 fetch(size, false);
245 }
246
247 /**
248 * {@inheritDoc}
249 * <p>
250 * Overridden to track the bytes that have been read.
251 * </p>
252 */
253 @Override
254 public int read() throws IOException {
255 if (ensureFetched(1) != 1) {
256 return -1; // EOF.
257 }
258
259 final int read = (myBuffer[myBufferOffset] & 0xFF);
260
261 myBufferOffset += 1;
262
263 return read;
264 }
265
266 /**
267 * {@inheritDoc}
268 * <p>
269 * Overridden to track the bytes that have been read.
270 * </p>
271 */
272 @Override
273 public int read(final byte b[]) throws IOException {
274 return read(b, 0, b.length);
275 }
276
277 /**
278 * {@inheritDoc}
279 * <p>
280 * Overridden to track the bytes that have been read.
281 * </p>
282 */
283 @Override
284 public int read(final byte b[], final int off, final int len)
285 throws IOException {
286 final int read = ensureFetched(len - off);
287
288 System.arraycopy(myBuffer, myBufferOffset, b, off, read);
289
290 myBufferOffset += read;
291
292 return read;
293 }
294
295 /**
296 * Reads a "cstring" value from the stream:<code>
297 * <pre>
298 * cstring ::= (byte*) "\x00"
299 * </pre>
300 * </code>
301 * <p>
302 * <blockquote> CString - Zero or more modified UTF-8 encoded characters
303 * followed by '\x00'. The (byte*) MUST NOT contain '\x00', hence it is not
304 * full UTF-8. </blockquote>
305 * </p>
306 *
307 * @return The string value.
308 * @throws EOFException
309 * On insufficient data for the integer.
310 * @throws IOException
311 * On a failure reading the integer.
312 */
313 public String readCString() throws EOFException, IOException {
314
315 while (true) {
316 for (int i = myBufferOffset; i < myBufferLimit; ++i) {
317 final byte b = myBuffer[i];
318 if (b == 0) {
319 // Found the end.
320 final int offset = myBufferOffset;
321 final int length = (1 + i) - offset;
322
323 // Advance the buffer.
324 myBufferOffset = i + 1;
325
326 return myStringDecoder.decode(myBuffer, offset, length);
327 }
328 }
329
330 // Need more data.
331 ensureFetched(availableInBuffer() + 1);
332 }
333 }
334
335 /**
336 * Reads a BSON document element: <code>
337 * <pre>
338 * document ::= int32 e_list "\x00"
339 * </pre>
340 * </code>
341 *
342 * @return The Document.
343 * @throws EOFException
344 * On insufficient data for the document.
345 * @throws IOException
346 * On a failure reading the document.
347 */
348 public Document readDocument() throws IOException {
349
350 // The total length of the document.
351 final int size = readInt();
352
353 prefetch(size - 4);
354
355 return new RootDocument(readElements(), false, size);
356 }
357
358 /**
359 * Reads the complete set of bytes from the stream or throws an
360 * {@link EOFException}.
361 *
362 * @param buffer
363 * The buffer into which the data is read.
364 * @exception EOFException
365 * If the input stream reaches the end before reading all the
366 * bytes.
367 * @exception IOException
368 * On an error reading from the underlying stream.
369 */
370 public void readFully(final byte[] buffer) throws EOFException, IOException {
371 readFully(buffer, 0, buffer.length);
372 }
373
374 /**
375 * Reads a little-endian 4 byte signed integer from the stream.
376 *
377 * @return The integer value.
378 * @throws EOFException
379 * On insufficient data for the integer.
380 * @throws IOException
381 * On a failure reading the integer.
382 */
383 public int readInt() throws EOFException, IOException {
384 if (ensureFetched(4) != 4) {
385 throw new EOFException();
386 }
387
388 // Little endian.
389 int result = (myBuffer[myBufferOffset] & 0xFF);
390 result += (myBuffer[myBufferOffset + 1] & 0xFF) << 8;
391 result += (myBuffer[myBufferOffset + 2] & 0xFF) << 16;
392 result += (myBuffer[myBufferOffset + 3] & 0xFF) << 24;
393
394 myBufferOffset += 4;
395
396 return result;
397 }
398
399 /**
400 * Reads a little-endian 8 byte signed integer from the stream.
401 *
402 * @return The long value.
403 * @throws EOFException
404 * On insufficient data for the long.
405 * @throws IOException
406 * On a failure reading the long.
407 */
408 public long readLong() throws EOFException, IOException {
409 if (ensureFetched(8) != 8) {
410 throw new EOFException();
411 }
412
413 // Little endian.
414 long result = (myBuffer[myBufferOffset] & 0xFFL);
415 result += (myBuffer[myBufferOffset + 1] & 0xFFL) << 8;
416 result += (myBuffer[myBufferOffset + 2] & 0xFFL) << 16;
417 result += (myBuffer[myBufferOffset + 3] & 0xFFL) << 24;
418 result += (myBuffer[myBufferOffset + 4] & 0xFFL) << 32;
419 result += (myBuffer[myBufferOffset + 5] & 0xFFL) << 40;
420 result += (myBuffer[myBufferOffset + 6] & 0xFFL) << 48;
421 result += (myBuffer[myBufferOffset + 7] & 0xFFL) << 56;
422
423 myBufferOffset += 8;
424
425 return result;
426 }
427
428 /**
429 * {@inheritDoc}
430 * <p>
431 * Overridden to throw an {@link UnsupportedOperationException}.
432 * </p>
433 */
434 @Override
435 public synchronized void reset() throws UnsupportedOperationException {
436 throw new UnsupportedOperationException("Mark not supported.");
437 }
438
439 /**
440 * Sets the value of maximum number of strings that may have their encoded
441 * form cached.
442 *
443 * @param maxCacheEntries
444 * The new value for the maximum number of strings that may have
445 * their encoded form cached.
446 * @deprecated The cache {@link StringDecoderCache} should be controlled
447 * directory. This method will be removed after the 2.1.0
448 * release.
449 */
450 @Deprecated
451 public void setMaxCachedStringEntries(final int maxCacheEntries) {
452 myStringDecoder.getCache().setMaxCacheEntries(maxCacheEntries);
453 }
454
455 /**
456 * Sets the value of length for a string that the stream is allowed to cache
457 * to the new value. This can be used to stop a single long string from
458 * pushing useful values out of the cache.
459 *
460 * @param maxlength
461 * The new value for the length for a string that the encoder is
462 * allowed to cache.
463 * @deprecated The cache {@link StringDecoderCache} should be controlled
464 * directory. This method will be removed after the 2.1.0
465 * release.
466 */
467 @Deprecated
468 public void setMaxCachedStringLength(final int maxlength) {
469 myStringDecoder.getCache().setMaxCacheLength(maxlength);
470
471 }
472
473 /**
474 * {@inheritDoc}
475 * <p>
476 * Overridden to track the bytes that have been skipped.
477 * </p>
478 */
479 @Override
480 public long skip(final long n) throws IOException {
481
482 long skipped = Math.min(n, availableInBuffer());
483 myBufferOffset += skipped;
484
485 if (skipped < n) {
486 // Exhausted the buffer - skip in the source.
487 final long streamSkipped = myInput.skip(n - skipped);
488 myBytesRead += streamSkipped;
489 skipped += streamSkipped;
490 }
491
492 return skipped;
493 }
494
495 /**
496 * Returns the number of bytes available in the buffer.
497 *
498 * @return The number of bytes available in the buffer.
499 */
500 protected final int availableInBuffer() {
501 return myBufferLimit - myBufferOffset;
502 }
503
504 /**
505 * Reads a BSON Array element: <code>
506 * <pre>
507 * "\x04" e_name document
508 * </pre>
509 * </code>
510 *
511 * @return The {@link ArrayElement}.
512 * @throws EOFException
513 * On insufficient data for the document.
514 * @throws IOException
515 * On a failure reading the document.
516 */
517 protected ArrayElement readArrayElement() throws IOException {
518
519 final long start = getBytesRead() - 1; // Token already read.
520
521 final String name = readCString();
522 final int fetch = readInt(); // The total length of the array elements.
523 prefetch(fetch - 4);
524 final List<Element> elements = readElements();
525 final long size = getBytesRead() - start;
526
527 return new ArrayElement(name, elements, size);
528 }
529
530 /**
531 * Reads a {@link BinaryElement}'s contents: <code><pre>
532 * binary ::= int32 subtype (byte*)
533 * subtype ::= "\x00" Binary / Generic
534 * | "\x01" Function
535 * | "\x02" Binary (Old)
536 * | "\x03" UUID
537 * | "\x05" MD5
538 * | "\x80" User defined
539 * </pre>
540 * </code>
541 *
542 * @return The {@link BinaryElement}.
543 * @throws IOException
544 * On a failure reading the binary data.
545 */
546 protected BinaryElement readBinaryElement() throws IOException {
547
548 final long start = getBytesRead() - 1; // Token already read.
549
550 final String name = readCString();
551 int length = readInt();
552
553 final int subType = read();
554 if (subType < 0) {
555 throw new EOFException();
556 }
557
558 // Old binary handling.
559 if (subType == 2) {
560 final int anotherLength = readInt();
561
562 assert (anotherLength == (length - 4)) : "Binary Element Subtye 2 "
563 + "length should be outer length - 4.";
564
565 length -= 4;
566 }
567 else if ((subType == UuidElement.LEGACY_UUID_SUBTTYPE)
568 || (subType == UuidElement.UUID_SUBTTYPE)) {
569
570 final byte[] binary = new byte[length];
571 readFully(binary);
572
573 final long size = getBytesRead() - start;
574 try {
575 return new UuidElement(name, (byte) subType, binary, size);
576 }
577 catch (final IllegalArgumentException iae) {
578 // Just use the vanilla BinaryElement.
579 return new BinaryElement(name, (byte) subType, binary, size);
580 }
581 }
582
583 final long size = getBytesRead() - start;
584 return new BinaryElement(name, (byte) subType, this, length, size
585 + length);
586 }
587
588 /**
589 * Reads a {@link BooleanElement} from the stream.
590 *
591 * @return The {@link BooleanElement}.
592 * @throws IOException
593 * On a failure to read the contents of the
594 * {@link BooleanElement}.
595 */
596 protected BooleanElement readBooleanElement() throws IOException {
597 final long start = getBytesRead() - 1; // Token already read.
598
599 final String name = readCString();
600 final boolean value = (read() == 1);
601
602 final long size = getBytesRead() - start;
603
604 return new BooleanElement(name, value, size);
605 }
606
607 /**
608 * Reads a {@code DBPointerElement} from the stream.
609 *
610 * @return The {@code DBPointerElement}.
611 * @throws IOException
612 * On a failure to read the contents of the
613 * {@code DBPointerElement}.
614 * @deprecated Per the BSON specification.
615 */
616 @Deprecated
617 protected Element readDBPointerElement() throws IOException {
618 final long start = getBytesRead() - 1; // Token already read.
619
620 final String name = readCString();
621 final String dbDotCollection = readString();
622 final int timestamp = EndianUtils.swap(readInt());
623 final long machineId = EndianUtils.swap(readLong());
624
625 final long size = getBytesRead() - start;
626
627 String db = dbDotCollection;
628 String collection = "";
629 final int firstDot = dbDotCollection.indexOf('.');
630 if (0 <= firstDot) {
631 db = dbDotCollection.substring(0, firstDot);
632 collection = dbDotCollection.substring(firstDot + 1);
633 }
634 return new com.allanbank.mongodb.bson.element.DBPointerElement(name,
635 db, collection, new ObjectId(timestamp, machineId), size);
636 }
637
638 /**
639 * Reads a BSON Subdocument element: <code>
640 * <pre>
641 * "\x03" e_name document
642 * </pre>
643 * </code>
644 *
645 * @return The {@link ArrayElement}.
646 * @throws IOException
647 * On a failure reading the document.
648 */
649 protected DocumentElement readDocumentElement() throws IOException {
650 final long start = getBytesRead() - 1; // Token already read.
651
652 final String name = readCString();
653 final int fetch = readInt(); // The total length of the sub-document
654 // elements.
655 prefetch(fetch - 4);
656 final List<Element> elements = readElements();
657
658 final long size = getBytesRead() - start;
659
660 return new DocumentElement(name, elements, true, size);
661 }
662
663 /**
664 * Reads a {@link DoubleElement} from the stream.
665 *
666 * @return The {@link DoubleElement}.
667 * @throws IOException
668 * On a failure to read the contents of the
669 * {@link DoubleElement}.
670 */
671 protected DoubleElement readDoubleElement() throws IOException {
672 final long start = getBytesRead() - 1; // Token already read.
673
674 final String name = readCString();
675 final double value = Double.longBitsToDouble(readLong());
676
677 final long size = getBytesRead() - start;
678
679 return new DoubleElement(name, value, size);
680 }
681
682 /**
683 * Reads the element:<code>
684 * <pre>
685 * element ::= "\x01" e_name double Floating point
686 * | "\x02" e_name string UTF-8 string
687 * | "\x03" e_name document Embedded document
688 * | "\x04" e_name document Array
689 * | "\x05" e_name binary Binary data
690 * | "\x06" e_name Undefined — Deprecated
691 * | "\x07" e_name (byte*12) ObjectId
692 * | "\x08" e_name "\x00" Boolean "false"
693 * | "\x08" e_name "\x01" Boolean "true"
694 * | "\x09" e_name int64 UTC datetime
695 * | "\x0A" e_name Null value
696 * | "\x0B" e_name cstring cstring Regular expression
697 * | "\x0C" e_name string (byte*12) DBPointer — Deprecated
698 * | "\x0D" e_name string JavaScript code
699 * | "\x0E" e_name string Symbol
700 * | "\x0F" e_name code_w_s JavaScript code w/ scope
701 * | "\x10" e_name int32 32-bit Integer
702 * | "\x11" e_name int64 Timestamp
703 * | "\x12" e_name int64 64-bit integer
704 * | "\xFF" e_name Min key
705 * | "\x7F" e_name Max key
706 * </pre>
707 * </code>
708 *
709 * @param token
710 * The element's token.
711 * @return The Element.
712 * @throws EOFException
713 * On insufficient data for the element.
714 * @throws IOException
715 * On a failure reading the element.
716 */
717 @SuppressWarnings("deprecation")
718 protected Element readElement(final byte token) throws EOFException,
719 IOException {
720 final ElementType type = ElementType.valueOf(token);
721 if (type == null) {
722 throw new StreamCorruptedException("Unknown element type: 0x"
723 + Integer.toHexString(token & 0xFF) + ".");
724 }
725 switch (type) {
726 case ARRAY: {
727 return readArrayElement();
728 }
729 case BINARY: {
730 return readBinaryElement();
731 }
732 case DB_POINTER: {
733 return readDBPointerElement();
734 }
735 case DOCUMENT: {
736 return readDocumentElement();
737 }
738 case DOUBLE: {
739 return readDoubleElement();
740 }
741 case BOOLEAN: {
742 return readBooleanElement();
743 }
744 case INTEGER: {
745 return readIntegerElement();
746 }
747 case JAVA_SCRIPT: {
748 return readJavaScriptElement();
749 }
750 case JAVA_SCRIPT_WITH_SCOPE: {
751 return readJavaScriptWithScopeElement();
752 }
753 case LONG: {
754 return readLongElement();
755 }
756 case MAX_KEY: {
757 return readMaxKeyElement();
758 }
759 case MIN_KEY: {
760 return readMinKeyElement();
761 }
762 case MONGO_TIMESTAMP: {
763 return readMongoTimestampElement();
764 }
765 case NULL: {
766 return readNullElement();
767 }
768 case OBJECT_ID: {
769 return readObjectIdElement();
770 }
771 case REGEX: {
772 return readRegularExpressionElement();
773 }
774 case STRING: {
775 return readStringElement();
776 }
777 case SYMBOL: {
778 return readSymbolElement();
779 }
780 case UTC_TIMESTAMP: {
781 return readTimestampElement();
782 }
783 }
784
785 throw new StreamCorruptedException("Unknown element type: "
786 + type.name() + ".");
787 }
788
789 /**
790 * Reads a BSON element list (e_list): <code>
791 * <pre>
792 * e_list ::= element e_list | ""
793 * </pre>
794 * </code>
795 *
796 * @return The list of elements.
797 * @throws EOFException
798 * On insufficient data for the elements.
799 * @throws IOException
800 * On a failure reading the elements.
801 */
802 protected List<Element> readElements() throws EOFException, IOException {
803 final List<Element> elements = new ArrayList<Element>();
804 int elementToken = read();
805 while (elementToken > 0) {
806 elements.add(readElement((byte) elementToken));
807
808 elementToken = read();
809 }
810 if (elementToken < 0) {
811 throw new EOFException();
812 }
813 return elements;
814 }
815
816 /**
817 * Reads a {@link IntegerElement} from the stream.
818 *
819 * @return The {@link IntegerElement}.
820 * @throws IOException
821 * On a failure to read the contents of the
822 * {@link IntegerElement}.
823 */
824 protected IntegerElement readIntegerElement() throws IOException {
825 final long start = getBytesRead() - 1; // Token already read.
826
827 final String name = readCString();
828 final int value = readInt();
829
830 final long size = getBytesRead() - start;
831
832 return new IntegerElement(name, value, size);
833 }
834
835 /**
836 * Reads a {@link JavaScriptElement} from the stream.
837 *
838 * @return The {@link JavaScriptElement}.
839 * @throws IOException
840 * On a failure to read the contents of the
841 * {@link JavaScriptElement}.
842 */
843 protected JavaScriptElement readJavaScriptElement() throws IOException {
844 final long start = getBytesRead() - 1; // Token already read.
845
846 final String name = readCString();
847 final String javascript = readString();
848
849 final long size = getBytesRead() - start;
850
851 return new JavaScriptElement(name, javascript, size);
852 }
853
854 /**
855 * Reads a {@link JavaScriptWithScopeElement} from the stream.
856 *
857 * @return The {@link JavaScriptWithScopeElement}.
858 * @throws IOException
859 * On a failure to read the contents of the
860 * {@link JavaScriptWithScopeElement}.
861 */
862 protected JavaScriptWithScopeElement readJavaScriptWithScopeElement()
863 throws IOException {
864 final long start = getBytesRead() - 1; // Token already read.
865
866 final String name = readCString();
867 readInt(); // Total length - not used.
868 final String javascript = readString();
869 final Document scope = readDocument();
870
871 final long size = getBytesRead() - start;
872
873 return new JavaScriptWithScopeElement(name, javascript, scope, size);
874 }
875
876 /**
877 * Reads a {@link LongElement} from the stream.
878 *
879 * @return The {@link LongElement}.
880 * @throws IOException
881 * On a failure to read the contents of the {@link LongElement}.
882 */
883 protected LongElement readLongElement() throws IOException {
884 final long start = getBytesRead() - 1; // Token already read.
885
886 final String name = readCString();
887 final long value = readLong();
888
889 final long size = getBytesRead() - start;
890
891 return new LongElement(name, value, size);
892 }
893
894 /**
895 * Reads a {@link MaxKeyElement} from the stream.
896 *
897 * @return The {@link MaxKeyElement}.
898 * @throws IOException
899 * On a failure to read the contents of the
900 * {@link MaxKeyElement}.
901 */
902 protected MaxKeyElement readMaxKeyElement() throws IOException {
903 final long start = getBytesRead() - 1; // Token already read.
904
905 final String name = readCString();
906
907 final long size = getBytesRead() - start;
908
909 return new MaxKeyElement(name, size);
910 }
911
912 /**
913 * Reads a {@link MinKeyElement} from the stream.
914 *
915 * @return The {@link MinKeyElement}.
916 * @throws IOException
917 * On a failure to read the contents of the
918 * {@link MinKeyElement}.
919 */
920 protected MinKeyElement readMinKeyElement() throws IOException {
921 final long start = getBytesRead() - 1; // Token already read.
922
923 final String name = readCString();
924
925 final long size = getBytesRead() - start;
926
927 return new MinKeyElement(name, size);
928 }
929
930 /**
931 * Reads a {@link MongoTimestampElement} from the stream.
932 *
933 * @return The {@link MongoTimestampElement}.
934 * @throws IOException
935 * On a failure to read the contents of the
936 * {@link MongoTimestampElement}.
937 */
938 protected MongoTimestampElement readMongoTimestampElement()
939 throws IOException {
940 final long start = getBytesRead() - 1; // Token already read.
941
942 final String name = readCString();
943 final long timestamp = readLong();
944
945 final long size = getBytesRead() - start;
946
947 return new MongoTimestampElement(name, timestamp, size);
948 }
949
950 /**
951 * Reads a {@link NullElement} from the stream.
952 *
953 * @return The {@link NullElement}.
954 * @throws IOException
955 * On a failure to read the contents of the {@link NullElement}.
956 */
957 protected NullElement readNullElement() throws IOException {
958 final long start = getBytesRead() - 1; // Token already read.
959
960 final String name = readCString();
961
962 final long size = getBytesRead() - start;
963
964 return new NullElement(name, size);
965 }
966
967 /**
968 * Reads a {@link ObjectIdElement} from the stream.
969 *
970 * @return The {@link ObjectIdElement}.
971 * @throws IOException
972 * On a failure to read the contents of the
973 * {@link ObjectIdElement}.
974 */
975 protected ObjectIdElement readObjectIdElement() throws IOException {
976 final long start = getBytesRead() - 1; // Token already read.
977
978 final String name = readCString();
979 final int timestamp = EndianUtils.swap(readInt());
980 final long machineId = EndianUtils.swap(readLong());
981
982 final long size = getBytesRead() - start;
983
984 return new ObjectIdElement(name, new ObjectId(timestamp, machineId),
985 size);
986 }
987
988 /**
989 * Reads a {@link RegularExpressionElement} from the stream.
990 *
991 * @return The {@link RegularExpressionElement}.
992 * @throws IOException
993 * On a failure to read the contents of the
994 * {@link RegularExpressionElement}.
995 */
996 protected RegularExpressionElement readRegularExpressionElement()
997 throws IOException {
998 final long start = getBytesRead() - 1; // Token already read.
999
1000 final String name = readCString();
1001 final String pattern = readCString();
1002 final String options = readCString();
1003
1004 final long size = getBytesRead() - start;
1005
1006 return new RegularExpressionElement(name, pattern, options, size);
1007 }
1008
1009 /**
1010 * Reads a "string" value from the stream:<code>
1011 * <pre>
1012 * string ::= int32 (byte*) "\x00"
1013 * </pre>
1014 * </code>
1015 * <p>
1016 * <blockquote>String - The int32 is the number bytes in the (byte*) + 1
1017 * (for the trailing '\x00'). The (byte*) is zero or more UTF-8 encoded
1018 * characters. </blockquote>
1019 * </p>
1020 *
1021 * @return The string value.
1022 * @throws EOFException
1023 * On insufficient data for the integer.
1024 * @throws IOException
1025 * On a failure reading the integer.
1026 */
1027 protected String readString() throws EOFException, IOException {
1028 final int length = readInt();
1029
1030 if (ensureFetched(length) != length) {
1031 throw new EOFException();
1032 }
1033
1034 final int offset = myBufferOffset;
1035
1036 // Advance the buffer.
1037 myBufferOffset += length;
1038
1039 return myStringDecoder.decode(myBuffer, offset, length);
1040 }
1041
1042 /**
1043 * Reads a {@link StringElement} from the stream.
1044 *
1045 * @return The {@link StringElement}.
1046 * @throws IOException
1047 * On a failure to read the contents of the
1048 * {@link StringElement}.
1049 */
1050 protected StringElement readStringElement() throws IOException {
1051 final long start = getBytesRead() - 1; // Token already read.
1052
1053 final String name = readCString();
1054 final String value = readString();
1055
1056 final long size = getBytesRead() - start;
1057
1058 return new StringElement(name, value, size);
1059 }
1060
1061 /**
1062 * Reads a {@link SymbolElement} from the stream.
1063 *
1064 * @return The {@link SymbolElement}.
1065 * @throws IOException
1066 * On a failure to read the contents of the
1067 * {@link SymbolElement}.
1068 */
1069 protected SymbolElement readSymbolElement() throws IOException {
1070 final long start = getBytesRead() - 1; // Token already read.
1071
1072 final String name = readCString();
1073 final String symbol = readString();
1074
1075 final long size = getBytesRead() - start;
1076
1077 return new SymbolElement(name, symbol, size);
1078 }
1079
1080 /**
1081 * Reads a {@link TimestampElement} from the stream.
1082 *
1083 * @return The {@link TimestampElement}.
1084 * @throws IOException
1085 * On a failure to read the contents of the
1086 * {@link TimestampElement}.
1087 */
1088 protected TimestampElement readTimestampElement() throws IOException {
1089 final long start = getBytesRead() - 1; // Token already read.
1090
1091 final String name = readCString();
1092 final long time = readLong();
1093
1094 final long size = getBytesRead() - start;
1095
1096 return new TimestampElement(name, time, size);
1097 }
1098
1099 /**
1100 * Fetch the requested number of bytes from the underlying stream. Returns
1101 * the number of bytes available in the buffer or the number of requested
1102 * bytes, which ever is smaller.
1103 *
1104 * @param size
1105 * The number of bytes to be read.
1106 * @return The smaller of the number of bytes requested or the number of
1107 * bytes available in the buffer.
1108 * @throws IOException
1109 * On a failure to read from the underlying stream.
1110 */
1111 private final int ensureFetched(final int size) throws IOException {
1112 return fetch(size, true);
1113 }
1114
1115 /**
1116 * Fetch the requested number of bytes from the underlying stream. Returns
1117 * the number of bytes available in the buffer or the number of requested
1118 * bytes, which ever is smaller.
1119 *
1120 * @param size
1121 * The number of bytes to be read.
1122 * @param forceRead
1123 * Determines if a read is forced to ensure the buffer contains
1124 * the number of bytes.
1125 * @return The smaller of the number of bytes requested or the number of
1126 * bytes available in the buffer.
1127 * @throws IOException
1128 * On a failure to read from the underlying stream.
1129 */
1130 private final int fetch(final int size, final boolean forceRead)
1131 throws IOException {
1132 // See if we need to read more data.
1133 int available = availableInBuffer();
1134 if (available < size) {
1135 // Yes - we do.
1136
1137 // Will the size fit in the existing buffer?
1138 if (myBuffer.length < size) {
1139 // Nope - grow the buffer to the needed size.
1140 final byte[] newBuffer = new byte[size];
1141
1142 // Copy the existing content into the new buffer.
1143 System.arraycopy(myBuffer, myBufferOffset, newBuffer, 0,
1144 available);
1145 myBuffer = newBuffer;
1146 }
1147 else if (0 < available) {
1148 // Compact the buffer.
1149 System.arraycopy(myBuffer, myBufferOffset, myBuffer, 0,
1150 available);
1151 }
1152
1153 // Reset the limit and offset...
1154 myBytesRead += myBufferOffset;
1155 myBufferOffset = 0;
1156 myBufferLimit = available;
1157
1158 // Now read as much as possible to fill the buffer.
1159 int read;
1160 do {
1161 read = myInput.read(myBuffer, myBufferLimit, myBuffer.length
1162 - myBufferLimit);
1163 if (0 < read) {
1164 available += read;
1165 myBufferLimit += read;
1166 }
1167 }
1168 while (forceRead && (0 <= read) && (available < size));
1169
1170 return Math.min(size, available);
1171 }
1172
1173 return size;
1174 }
1175
1176 /**
1177 * Reads the complete set of bytes from the stream or throws an
1178 * {@link EOFException}.
1179 *
1180 * @param buffer
1181 * The buffer into which the data is read.
1182 * @param offset
1183 * The offset to start writing into the buffer.
1184 * @param length
1185 * The number of bytes to write into the buffer.
1186 * @exception EOFException
1187 * If the input stream reaches the end before reading all the
1188 * bytes.
1189 * @exception IOException
1190 * On an error reading from the underlying stream.
1191 * @see DataInput#readFully(byte[], int, int)
1192 */
1193 private void readFully(final byte[] buffer, final int offset,
1194 final int length) throws EOFException, IOException {
1195
1196 int read = Math.min(length, availableInBuffer());
1197 System.arraycopy(myBuffer, myBufferOffset, buffer, offset, read);
1198 myBufferOffset += read;
1199
1200 // Read directly from the stream to avoid a copy.
1201 while (read < length) {
1202 final int count = myInput
1203 .read(buffer, offset + read, length - read);
1204 if (count < 0) {
1205 throw new EOFException();
1206 }
1207 read += count;
1208
1209 // Directly read bytes never hit the buffer.
1210 myBytesRead += read;
1211 }
1212 }
1213 }