Classes in this File | Line Coverage | Branch Coverage | Complexity | ||||
BsonInputStream |
|
| 2.58;2.58 | ||||
BsonInputStream$1 |
|
| 2.58;2.58 |
1 | /* | |
2 | * #%L | |
3 | * BsonInputStream.java - mongodb-async-driver - Allanbank Consulting, Inc. | |
4 | * %% | |
5 | * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc. | |
6 | * %% | |
7 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
8 | * you may not use this file except in compliance with the License. | |
9 | * You may obtain a copy of the License at | |
10 | * | |
11 | * http://www.apache.org/licenses/LICENSE-2.0 | |
12 | * | |
13 | * Unless required by applicable law or agreed to in writing, software | |
14 | * distributed under the License is distributed on an "AS IS" BASIS, | |
15 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
16 | * See the License for the specific language governing permissions and | |
17 | * limitations under the License. | |
18 | * #L% | |
19 | */ | |
20 | package com.allanbank.mongodb.bson.io; | |
21 | ||
22 | import java.io.DataInput; | |
23 | import java.io.EOFException; | |
24 | import java.io.IOException; | |
25 | import java.io.InputStream; | |
26 | import java.io.StreamCorruptedException; | |
27 | import java.nio.charset.Charset; | |
28 | import java.util.ArrayList; | |
29 | import java.util.List; | |
30 | ||
31 | import com.allanbank.mongodb.bson.Document; | |
32 | import com.allanbank.mongodb.bson.Element; | |
33 | import com.allanbank.mongodb.bson.ElementType; | |
34 | import com.allanbank.mongodb.bson.element.ArrayElement; | |
35 | import com.allanbank.mongodb.bson.element.BinaryElement; | |
36 | import com.allanbank.mongodb.bson.element.BooleanElement; | |
37 | import com.allanbank.mongodb.bson.element.DocumentElement; | |
38 | import com.allanbank.mongodb.bson.element.DoubleElement; | |
39 | import com.allanbank.mongodb.bson.element.IntegerElement; | |
40 | import com.allanbank.mongodb.bson.element.JavaScriptElement; | |
41 | import com.allanbank.mongodb.bson.element.JavaScriptWithScopeElement; | |
42 | import com.allanbank.mongodb.bson.element.LongElement; | |
43 | import com.allanbank.mongodb.bson.element.MaxKeyElement; | |
44 | import com.allanbank.mongodb.bson.element.MinKeyElement; | |
45 | import com.allanbank.mongodb.bson.element.MongoTimestampElement; | |
46 | import com.allanbank.mongodb.bson.element.NullElement; | |
47 | import com.allanbank.mongodb.bson.element.ObjectId; | |
48 | import com.allanbank.mongodb.bson.element.ObjectIdElement; | |
49 | import com.allanbank.mongodb.bson.element.RegularExpressionElement; | |
50 | import com.allanbank.mongodb.bson.element.StringElement; | |
51 | import com.allanbank.mongodb.bson.element.SymbolElement; | |
52 | import com.allanbank.mongodb.bson.element.TimestampElement; | |
53 | import com.allanbank.mongodb.bson.element.UuidElement; | |
54 | import com.allanbank.mongodb.bson.impl.RootDocument; | |
55 | ||
56 | /** | |
57 | * {@link BsonInputStream} provides a class to read BSON documents based on the | |
58 | * <a href="http://bsonspec.org/">BSON specification</a>. | |
59 | * | |
60 | * @api.yes This class is part of the driver's API. Public and protected members | |
61 | * will be deprecated for at least 1 non-bugfix release (version | |
62 | * numbers are <major>.<minor>.<bugfix>) before being | |
63 | * removed or modified. | |
64 | * @copyright 2011-2013, Allanbank Consulting, Inc., All Rights Reserved | |
65 | */ | |
66 | 1 | public class BsonInputStream extends InputStream { |
67 | ||
68 | /** UTF-8 Character set for encoding strings. */ | |
69 | 1 | public final static Charset UTF8 = StringDecoder.UTF8; |
70 | ||
71 | /** The buffered data. */ | |
72 | private byte[] myBuffer; | |
73 | ||
74 | /** The limit of the current buffer: the index one past the last buffered byte. */ | |
75 | private int myBufferLimit; | |
76 | ||
77 | /** The offset into the current buffer. */ | |
78 | private int myBufferOffset; | |
79 | ||
80 | /** Tracks the number of bytes that have been read by the stream. */ | |
81 | private long myBytesRead; | |
82 | ||
83 | /** The underlying input stream. */ | |
84 | private final InputStream myInput; | |
85 | ||
86 | /** The decoder for strings. */ | |
87 | private final StringDecoder myStringDecoder; | |
88 | ||
/**
 * Creates a BSON document reader that starts with an 8K read buffer.
 *
 * @param input
 *            The stream supplying the BSON encoded bytes.
 */
public BsonInputStream(final InputStream input) {
    // 8K is only the starting size of the buffer.
    this(input, 8 * 1024);
}
98 | ||
/**
 * Creates a BSON document reader with a caller-chosen initial buffer size
 * and a freshly created {@link StringDecoderCache}.
 *
 * @param input
 *            The stream supplying the BSON encoded bytes.
 * @param expectedMaxDocumentSize
 *            The anticipated size of the largest document; used as the
 *            initial size of the read buffer. Per the original
 *            documentation a low guess may cause incremental
 *            re-allocations of the buffer.
 */
public BsonInputStream(final InputStream input,
        final int expectedMaxDocumentSize) {
    // This stream gets a string cache of its own.
    this(input, expectedMaxDocumentSize, new StringDecoderCache());
}
113 | ||
/**
 * Creates a BSON document reader. This is the constructor that the other
 * constructors delegate to.
 *
 * @param input
 *            The stream supplying the BSON encoded bytes.
 * @param expectedMaxDocumentSize
 *            The anticipated size of the largest document; used as the
 *            initial size of the read buffer. Per the original
 *            documentation a low guess may cause incremental
 *            re-allocations of the buffer.
 * @param cache
 *            The cache handed to the {@link StringDecoder} used for
 *            decoding strings.
 */
public BsonInputStream(final InputStream input,
        final int expectedMaxDocumentSize, final StringDecoderCache cache) {
    // Collaborators.
    myInput = input;
    myStringDecoder = new StringDecoder(cache);

    // An empty buffer: nothing fetched and nothing consumed so far.
    myBuffer = new byte[expectedMaxDocumentSize];
    myBufferLimit = 0;
    myBufferOffset = 0;
    myBytesRead = 0;
}
136 | ||
/**
 * Creates a BSON document reader that starts with an 8K read buffer and
 * uses the supplied string cache.
 *
 * @param input
 *            The stream supplying the BSON encoded bytes.
 * @param cache
 *            The cache to use for decoded strings.
 */
public BsonInputStream(final InputStream input,
        final StringDecoderCache cache) {
    // 8K is only the starting size of the buffer.
    this(input, 8 * 1024, cache);
}
149 | ||
150 | /** | |
151 | * {@inheritDoc} | |
152 | * <p> | |
153 | * Overridden to return the number of bytes in the buffer and from the | |
154 | * source stream. | |
155 | * </p> | |
156 | */ | |
157 | @Override | |
158 | public int available() throws IOException { | |
159 | 1 | return availableInBuffer() + myInput.available(); |
160 | } | |
161 | ||
162 | /** | |
163 | * {@inheritDoc} | |
164 | * <p> | |
165 | * Overridden to close the wrapped {@link InputStream}. | |
166 | * </p> | |
167 | */ | |
168 | @Override | |
169 | public void close() throws IOException { | |
170 | 24 | myInput.close(); |
171 | 24 | } |
172 | ||
173 | /** | |
174 | * Returns the number of bytes that have been read by the stream. | |
175 | * | |
176 | * @return The number of bytes that have been read from the stream. | |
177 | */ | |
178 | public long getBytesRead() { | |
179 | 139191 | return myBytesRead + myBufferOffset; |
180 | } | |
181 | ||
182 | /** | |
183 | * Returns the maximum number of strings that may have their encoded form | |
184 | * cached. | |
185 | * | |
186 | * @return The maximum number of strings that may have their encoded form | |
187 | * cached. | |
188 | * @deprecated The cache {@link StringDecoderCache} should be controlled | |
189 | * directory. This method will be removed after the 2.1.0 | |
190 | * release. | |
191 | */ | |
192 | @Deprecated | |
193 | public int getMaxCachedStringEntries() { | |
194 | 0 | return myStringDecoder.getCache().getMaxCacheEntries(); |
195 | } | |
196 | ||
197 | /** | |
198 | * Returns the maximum length for a string that the stream is allowed to | |
199 | * cache. | |
200 | * | |
201 | * @return The maximum length for a string that the stream is allowed to | |
202 | * cache. | |
203 | * @deprecated The cache {@link StringDecoderCache} should be controlled | |
204 | * directory. This method will be removed after the 2.1.0 | |
205 | * release. | |
206 | */ | |
207 | @Deprecated | |
208 | public int getMaxCachedStringLength() { | |
209 | 0 | return myStringDecoder.getCache().getMaxCacheLength(); |
210 | } | |
211 | ||
212 | /** | |
213 | * {@inheritDoc} | |
214 | * <p> | |
215 | * Overridden to throw an {@link UnsupportedOperationException}. | |
216 | * </p> | |
217 | */ | |
218 | @Override | |
219 | public synchronized void mark(final int readlimit) { | |
220 | 0 | throw new UnsupportedOperationException("Mark not supported."); |
221 | } | |
222 | ||
223 | /** | |
224 | * {@inheritDoc} | |
225 | * <p> | |
226 | * Overridden to return false. | |
227 | * </p> | |
228 | */ | |
229 | @Override | |
230 | public boolean markSupported() { | |
231 | 0 | return false; |
232 | } | |
233 | ||
234 | /** | |
235 | * Tries to prefetch the requested number of bytes from the underlying | |
236 | * stream. | |
237 | * | |
238 | * @param size | |
239 | * The number of bytes to try and read. | |
240 | * @throws IOException | |
241 | * On a failure to read from the underlying stream. | |
242 | */ | |
243 | public final void prefetch(final int size) throws IOException { | |
244 | 37481 | fetch(size, false); |
245 | 37473 | } |
246 | ||
247 | /** | |
248 | * {@inheritDoc} | |
249 | * <p> | |
250 | * Overridden to track the bytes that have been read. | |
251 | * </p> | |
252 | */ | |
253 | @Override | |
254 | public int read() throws IOException { | |
255 | 139135 | if (ensureFetched(1) != 1) { |
256 | 20 | return -1; // EOF. |
257 | } | |
258 | ||
259 | 138963 | final int read = (myBuffer[myBufferOffset] & 0xFF); |
260 | ||
261 | 138963 | myBufferOffset += 1; |
262 | ||
263 | 138963 | return read; |
264 | } | |
265 | ||
266 | /** | |
267 | * {@inheritDoc} | |
268 | * <p> | |
269 | * Overridden to track the bytes that have been read. | |
270 | * </p> | |
271 | */ | |
272 | @Override | |
273 | public int read(final byte b[]) throws IOException { | |
274 | 0 | return read(b, 0, b.length); |
275 | } | |
276 | ||
277 | /** | |
278 | * {@inheritDoc} | |
279 | * <p> | |
280 | * Overridden to track the bytes that have been read. | |
281 | * </p> | |
282 | */ | |
283 | @Override | |
284 | public int read(final byte b[], final int off, final int len) | |
285 | throws IOException { | |
286 | 0 | final int read = ensureFetched(len - off); |
287 | ||
288 | 0 | System.arraycopy(myBuffer, myBufferOffset, b, off, read); |
289 | ||
290 | 0 | myBufferOffset += read; |
291 | ||
292 | 0 | return read; |
293 | } | |
294 | ||
295 | /** | |
296 | * Reads a "cstring" value from the stream:<code> | |
297 | * <pre> | |
298 | * cstring ::= (byte*) "\x00" | |
299 | * </pre> | |
300 | * </code> | |
301 | * <p> | |
302 | * <blockquote> CString - Zero or more modified UTF-8 encoded characters | |
303 | * followed by '\x00'. The (byte*) MUST NOT contain '\x00', hence it is not | |
304 | * full UTF-8. </blockquote> | |
305 | * </p> | |
306 | * | |
307 | * @return The string value. | |
308 | * @throws EOFException | |
309 | * On insufficient data for the integer. | |
310 | * @throws IOException | |
311 | * On a failure reading the integer. | |
312 | */ | |
313 | public String readCString() throws EOFException, IOException { | |
314 | ||
315 | while (true) { | |
316 | 400798 | for (int i = myBufferOffset; i < myBufferLimit; ++i) { |
317 | 400798 | final byte b = myBuffer[i]; |
318 | 400798 | if (b == 0) { |
319 | // Found the end. | |
320 | 70636 | final int offset = myBufferOffset; |
321 | 70636 | final int length = (1 + i) - offset; |
322 | ||
323 | // Advance the buffer. | |
324 | 70636 | myBufferOffset = i + 1; |
325 | ||
326 | 70636 | return myStringDecoder.decode(myBuffer, offset, length); |
327 | } | |
328 | } | |
329 | ||
330 | // Need more data. | |
331 | 0 | ensureFetched(availableInBuffer() + 1); |
332 | } | |
333 | } | |
334 | ||
335 | /** | |
336 | * Reads a BSON document element: <code> | |
337 | * <pre> | |
338 | * document ::= int32 e_list "\x00" | |
339 | * </pre> | |
340 | * </code> | |
341 | * | |
342 | * @return The Document. | |
343 | * @throws EOFException | |
344 | * On insufficient data for the document. | |
345 | * @throws IOException | |
346 | * On a failure reading the document. | |
347 | */ | |
348 | public Document readDocument() throws IOException { | |
349 | ||
350 | // The total length of the document. | |
351 | 5006 | final int size = readInt(); |
352 | ||
353 | 5006 | prefetch(size - 4); |
354 | ||
355 | 5006 | return new RootDocument(readElements(), false, size); |
356 | } | |
357 | ||
358 | /** | |
359 | * Reads the complete set of bytes from the stream or throws an | |
360 | * {@link EOFException}. | |
361 | * | |
362 | * @param buffer | |
363 | * The buffer into which the data is read. | |
364 | * @exception EOFException | |
365 | * If the input stream reaches the end before reading all the | |
366 | * bytes. | |
367 | * @exception IOException | |
368 | * On an error reading from the underlying stream. | |
369 | */ | |
370 | public void readFully(final byte[] buffer) throws EOFException, IOException { | |
371 | 32020 | readFully(buffer, 0, buffer.length); |
372 | 32020 | } |
373 | ||
374 | /** | |
375 | * Reads a little-endian 4 byte signed integer from the stream. | |
376 | * | |
377 | * @return The integer value. | |
378 | * @throws EOFException | |
379 | * On insufficient data for the integer. | |
380 | * @throws IOException | |
381 | * On a failure reading the integer. | |
382 | */ | |
383 | public int readInt() throws EOFException, IOException { | |
384 | 96724 | if (ensureFetched(4) != 4) { |
385 | 78 | throw new EOFException(); |
386 | } | |
387 | ||
388 | // Little endian. | |
389 | 96644 | int result = (myBuffer[myBufferOffset] & 0xFF); |
390 | 96645 | result += (myBuffer[myBufferOffset + 1] & 0xFF) << 8; |
391 | 96648 | result += (myBuffer[myBufferOffset + 2] & 0xFF) << 16; |
392 | 96648 | result += (myBuffer[myBufferOffset + 3] & 0xFF) << 24; |
393 | ||
394 | 96648 | myBufferOffset += 4; |
395 | ||
396 | 96648 | return result; |
397 | } | |
398 | ||
399 | /** | |
400 | * Reads a little-endian 8 byte signed integer from the stream. | |
401 | * | |
402 | * @return The long value. | |
403 | * @throws EOFException | |
404 | * On insufficient data for the long. | |
405 | * @throws IOException | |
406 | * On a failure reading the long. | |
407 | */ | |
408 | public long readLong() throws EOFException, IOException { | |
409 | 2297 | if (ensureFetched(8) != 8) { |
410 | 0 | throw new EOFException(); |
411 | } | |
412 | ||
413 | // Little endian. | |
414 | 2297 | long result = (myBuffer[myBufferOffset] & 0xFFL); |
415 | 2297 | result += (myBuffer[myBufferOffset + 1] & 0xFFL) << 8; |
416 | 2297 | result += (myBuffer[myBufferOffset + 2] & 0xFFL) << 16; |
417 | 2297 | result += (myBuffer[myBufferOffset + 3] & 0xFFL) << 24; |
418 | 2297 | result += (myBuffer[myBufferOffset + 4] & 0xFFL) << 32; |
419 | 2297 | result += (myBuffer[myBufferOffset + 5] & 0xFFL) << 40; |
420 | 2297 | result += (myBuffer[myBufferOffset + 6] & 0xFFL) << 48; |
421 | 2297 | result += (myBuffer[myBufferOffset + 7] & 0xFFL) << 56; |
422 | ||
423 | 2297 | myBufferOffset += 8; |
424 | ||
425 | 2297 | return result; |
426 | } | |
427 | ||
428 | /** | |
429 | * {@inheritDoc} | |
430 | * <p> | |
431 | * Overridden to throw an {@link UnsupportedOperationException}. | |
432 | * </p> | |
433 | */ | |
434 | @Override | |
435 | public synchronized void reset() throws UnsupportedOperationException { | |
436 | 0 | throw new UnsupportedOperationException("Mark not supported."); |
437 | } | |
438 | ||
439 | /** | |
440 | * Sets the value of maximum number of strings that may have their encoded | |
441 | * form cached. | |
442 | * | |
443 | * @param maxCacheEntries | |
444 | * The new value for the maximum number of strings that may have | |
445 | * their encoded form cached. | |
446 | * @deprecated The cache {@link StringDecoderCache} should be controlled | |
447 | * directory. This method will be removed after the 2.1.0 | |
448 | * release. | |
449 | */ | |
450 | @Deprecated | |
451 | public void setMaxCachedStringEntries(final int maxCacheEntries) { | |
452 | 0 | myStringDecoder.getCache().setMaxCacheEntries(maxCacheEntries); |
453 | 0 | } |
454 | ||
455 | /** | |
456 | * Sets the value of length for a string that the stream is allowed to cache | |
457 | * to the new value. This can be used to stop a single long string from | |
458 | * pushing useful values out of the cache. | |
459 | * | |
460 | * @param maxlength | |
461 | * The new value for the length for a string that the encoder is | |
462 | * allowed to cache. | |
463 | * @deprecated The cache {@link StringDecoderCache} should be controlled | |
464 | * directory. This method will be removed after the 2.1.0 | |
465 | * release. | |
466 | */ | |
467 | @Deprecated | |
468 | public void setMaxCachedStringLength(final int maxlength) { | |
469 | 0 | myStringDecoder.getCache().setMaxCacheLength(maxlength); |
470 | ||
471 | 0 | } |
472 | ||
473 | /** | |
474 | * {@inheritDoc} | |
475 | * <p> | |
476 | * Overridden to track the bytes that have been skipped. | |
477 | * </p> | |
478 | */ | |
479 | @Override | |
480 | public long skip(final long n) throws IOException { | |
481 | ||
482 | 0 | long skipped = Math.min(n, availableInBuffer()); |
483 | 0 | myBufferOffset += skipped; |
484 | ||
485 | 0 | if (skipped < n) { |
486 | // Exhausted the buffer - skip in the source. | |
487 | 0 | final long streamSkipped = myInput.skip(n - skipped); |
488 | 0 | myBytesRead += streamSkipped; |
489 | 0 | skipped += streamSkipped; |
490 | } | |
491 | ||
492 | 0 | return skipped; |
493 | } | |
494 | ||
495 | /** | |
496 | * Returns the number of bytes available in the buffer. | |
497 | * | |
498 | * @return The number of bytes available in the buffer. | |
499 | */ | |
500 | protected final int availableInBuffer() { | |
501 | 308114 | return myBufferLimit - myBufferOffset; |
502 | } | |
503 | ||
504 | /** | |
505 | * Reads a BSON Array element: <code> | |
506 | * <pre> | |
507 | * "\x04" e_name document | |
508 | * </pre> | |
509 | * </code> | |
510 | * | |
511 | * @return The {@link ArrayElement}. | |
512 | * @throws EOFException | |
513 | * On insufficient data for the document. | |
514 | * @throws IOException | |
515 | * On a failure reading the document. | |
516 | */ | |
517 | protected ArrayElement readArrayElement() throws IOException { | |
518 | ||
519 | 119 | final long start = getBytesRead() - 1; // Token already read. |
520 | ||
521 | 119 | final String name = readCString(); |
522 | 119 | final int fetch = readInt(); // The total length of the array elements. |
523 | 119 | prefetch(fetch - 4); |
524 | 119 | final List<Element> elements = readElements(); |
525 | 119 | final long size = getBytesRead() - start; |
526 | ||
527 | 119 | return new ArrayElement(name, elements, size); |
528 | } | |
529 | ||
530 | /** | |
531 | * Reads a {@link BinaryElement}'s contents: <code><pre> | |
532 | * binary ::= int32 subtype (byte*) | |
533 | * subtype ::= "\x00" Binary / Generic | |
534 | * | "\x01" Function | |
535 | * | "\x02" Binary (Old) | |
536 | * | "\x03" UUID | |
537 | * | "\x05" MD5 | |
538 | * | "\x80" User defined | |
539 | * </pre> | |
540 | * </code> | |
541 | * | |
542 | * @return The {@link BinaryElement}. | |
543 | * @throws IOException | |
544 | * On a failure reading the binary data. | |
545 | */ | |
546 | protected BinaryElement readBinaryElement() throws IOException { | |
547 | ||
548 | 32019 | final long start = getBytesRead() - 1; // Token already read. |
549 | ||
550 | 32019 | final String name = readCString(); |
551 | 32019 | int length = readInt(); |
552 | ||
553 | 32019 | final int subType = read(); |
554 | 32019 | if (subType < 0) { |
555 | 0 | throw new EOFException(); |
556 | } | |
557 | ||
558 | // Old binary handling. | |
559 | 32019 | if (subType == 2) { |
560 | 8 | final int anotherLength = readInt(); |
561 | ||
562 | assert (anotherLength == (length - 4)) : "Binary Element Subtye 2 " | |
563 | 8 | + "length should be outer length - 4."; |
564 | ||
565 | 8 | length -= 4; |
566 | 8 | } |
567 | 32011 | else if ((subType == UuidElement.LEGACY_UUID_SUBTTYPE) |
568 | || (subType == UuidElement.UUID_SUBTTYPE)) { | |
569 | ||
570 | 3 | final byte[] binary = new byte[length]; |
571 | 3 | readFully(binary); |
572 | ||
573 | 3 | final long size = getBytesRead() - start; |
574 | try { | |
575 | 3 | return new UuidElement(name, (byte) subType, binary, size); |
576 | } | |
577 | 1 | catch (final IllegalArgumentException iae) { |
578 | // Just use the vanilla BinaryElement. | |
579 | 1 | return new BinaryElement(name, (byte) subType, binary, size); |
580 | } | |
581 | } | |
582 | ||
583 | 32016 | final long size = getBytesRead() - start; |
584 | 32016 | return new BinaryElement(name, (byte) subType, this, length, size |
585 | + length); | |
586 | } | |
587 | ||
588 | /** | |
589 | * Reads a {@link BooleanElement} from the stream. | |
590 | * | |
591 | * @return The {@link BooleanElement}. | |
592 | * @throws IOException | |
593 | * On a failure to read the contents of the | |
594 | * {@link BooleanElement}. | |
595 | */ | |
596 | protected BooleanElement readBooleanElement() throws IOException { | |
597 | 154 | final long start = getBytesRead() - 1; // Token already read. |
598 | ||
599 | 154 | final String name = readCString(); |
600 | 154 | final boolean value = (read() == 1); |
601 | ||
602 | 154 | final long size = getBytesRead() - start; |
603 | ||
604 | 154 | return new BooleanElement(name, value, size); |
605 | } | |
606 | ||
607 | /** | |
608 | * Reads a {@code DBPointerElement} from the stream. | |
609 | * | |
610 | * @return The {@code DBPointerElement}. | |
611 | * @throws IOException | |
612 | * On a failure to read the contents of the | |
613 | * {@code DBPointerElement}. | |
614 | * @deprecated Per the BSON specification. | |
615 | */ | |
616 | @Deprecated | |
617 | protected Element readDBPointerElement() throws IOException { | |
618 | 8 | final long start = getBytesRead() - 1; // Token already read. |
619 | ||
620 | 8 | final String name = readCString(); |
621 | 8 | final String dbDotCollection = readString(); |
622 | 8 | final int timestamp = EndianUtils.swap(readInt()); |
623 | 8 | final long machineId = EndianUtils.swap(readLong()); |
624 | ||
625 | 8 | final long size = getBytesRead() - start; |
626 | ||
627 | 8 | String db = dbDotCollection; |
628 | 8 | String collection = ""; |
629 | 8 | final int firstDot = dbDotCollection.indexOf('.'); |
630 | 8 | if (0 <= firstDot) { |
631 | 8 | db = dbDotCollection.substring(0, firstDot); |
632 | 8 | collection = dbDotCollection.substring(firstDot + 1); |
633 | } | |
634 | 8 | return new com.allanbank.mongodb.bson.element.DBPointerElement(name, |
635 | db, collection, new ObjectId(timestamp, machineId), size); | |
636 | } | |
637 | ||
638 | /** | |
639 | * Reads a BSON Subdocument element: <code> | |
640 | * <pre> | |
641 | * "\x03" e_name document | |
642 | * </pre> | |
643 | * </code> | |
644 | * | |
645 | * @return The {@link ArrayElement}. | |
646 | * @throws IOException | |
647 | * On a failure reading the document. | |
648 | */ | |
649 | protected DocumentElement readDocumentElement() throws IOException { | |
650 | 32128 | final long start = getBytesRead() - 1; // Token already read. |
651 | ||
652 | 32128 | final String name = readCString(); |
653 | 32128 | final int fetch = readInt(); // The total length of the sub-document |
654 | // elements. | |
655 | 32128 | prefetch(fetch - 4); |
656 | 32128 | final List<Element> elements = readElements(); |
657 | ||
658 | 32128 | final long size = getBytesRead() - start; |
659 | ||
660 | 32128 | return new DocumentElement(name, elements, true, size); |
661 | } | |
662 | ||
663 | /** | |
664 | * Reads a {@link DoubleElement} from the stream. | |
665 | * | |
666 | * @return The {@link DoubleElement}. | |
667 | * @throws IOException | |
668 | * On a failure to read the contents of the | |
669 | * {@link DoubleElement}. | |
670 | */ | |
671 | protected DoubleElement readDoubleElement() throws IOException { | |
672 | 9 | final long start = getBytesRead() - 1; // Token already read. |
673 | ||
674 | 9 | final String name = readCString(); |
675 | 9 | final double value = Double.longBitsToDouble(readLong()); |
676 | ||
677 | 9 | final long size = getBytesRead() - start; |
678 | ||
679 | 9 | return new DoubleElement(name, value, size); |
680 | } | |
681 | ||
682 | /** | |
683 | * Reads the element:<code> | |
684 | * <pre> | |
685 | * element ::= "\x01" e_name double Floating point | |
686 | * | "\x02" e_name string UTF-8 string | |
687 | * | "\x03" e_name document Embedded document | |
688 | * | "\x04" e_name document Array | |
689 | * | "\x05" e_name binary Binary data | |
690 | * | "\x06" e_name Undefined — Deprecated | |
691 | * | "\x07" e_name (byte*12) ObjectId | |
692 | * | "\x08" e_name "\x00" Boolean "false" | |
693 | * | "\x08" e_name "\x01" Boolean "true" | |
694 | * | "\x09" e_name int64 UTC datetime | |
695 | * | "\x0A" e_name Null value | |
696 | * | "\x0B" e_name cstring cstring Regular expression | |
697 | * | "\x0C" e_name string (byte*12) DBPointer — Deprecated | |
698 | * | "\x0D" e_name string JavaScript code | |
699 | * | "\x0E" e_name string Symbol | |
700 | * | "\x0F" e_name code_w_s JavaScript code w/ scope | |
701 | * | "\x10" e_name int32 32-bit Integer | |
702 | * | "\x11" e_name int64 Timestamp | |
703 | * | "\x12" e_name int64 64-bit integer | |
704 | * | "\xFF" e_name Min key | |
705 | * | "\x7F" e_name Max key | |
706 | * </pre> | |
707 | * </code> | |
708 | * | |
709 | * @param token | |
710 | * The element's token. | |
711 | * @return The Element. | |
712 | * @throws EOFException | |
713 | * On insufficient data for the element. | |
714 | * @throws IOException | |
715 | * On a failure reading the element. | |
716 | */ | |
717 | @SuppressWarnings("deprecation") | |
718 | protected Element readElement(final byte token) throws EOFException, | |
719 | IOException { | |
720 | 68626 | final ElementType type = ElementType.valueOf(token); |
721 | 68626 | if (type == null) { |
722 | 1 | throw new StreamCorruptedException("Unknown element type: 0x" |
723 | + Integer.toHexString(token & 0xFF) + "."); | |
724 | } | |
725 | 1 | switch (type) { |
726 | case ARRAY: { | |
727 | 119 | return readArrayElement(); |
728 | } | |
729 | case BINARY: { | |
730 | 32019 | return readBinaryElement(); |
731 | } | |
732 | case DB_POINTER: { | |
733 | 8 | return readDBPointerElement(); |
734 | } | |
735 | case DOCUMENT: { | |
736 | 32128 | return readDocumentElement(); |
737 | } | |
738 | case DOUBLE: { | |
739 | 9 | return readDoubleElement(); |
740 | } | |
741 | case BOOLEAN: { | |
742 | 154 | return readBooleanElement(); |
743 | } | |
744 | case INTEGER: { | |
745 | 3649 | return readIntegerElement(); |
746 | } | |
747 | case JAVA_SCRIPT: { | |
748 | 8 | return readJavaScriptElement(); |
749 | } | |
750 | case JAVA_SCRIPT_WITH_SCOPE: { | |
751 | 8 | return readJavaScriptWithScopeElement(); |
752 | } | |
753 | case LONG: { | |
754 | 10 | return readLongElement(); |
755 | } | |
756 | case MAX_KEY: { | |
757 | 8 | return readMaxKeyElement(); |
758 | } | |
759 | case MIN_KEY: { | |
760 | 8 | return readMinKeyElement(); |
761 | } | |
762 | case MONGO_TIMESTAMP: { | |
763 | 8 | return readMongoTimestampElement(); |
764 | } | |
765 | case NULL: { | |
766 | 8 | return readNullElement(); |
767 | } | |
768 | case OBJECT_ID: { | |
769 | 8 | return readObjectIdElement(); |
770 | } | |
771 | case REGEX: { | |
772 | 14 | return readRegularExpressionElement(); |
773 | } | |
774 | case STRING: { | |
775 | 439 | return readStringElement(); |
776 | } | |
777 | case SYMBOL: { | |
778 | 8 | return readSymbolElement(); |
779 | } | |
780 | case UTC_TIMESTAMP: { | |
781 | 12 | return readTimestampElement(); |
782 | } | |
783 | } | |
784 | ||
785 | 0 | throw new StreamCorruptedException("Unknown element type: " |
786 | + type.name() + "."); | |
787 | } | |
788 | ||
789 | /** | |
790 | * Reads a BSON element list (e_list): <code> | |
791 | * <pre> | |
792 | * e_list ::= element e_list | "" | |
793 | * </pre> | |
794 | * </code> | |
795 | * | |
796 | * @return The list of elements. | |
797 | * @throws EOFException | |
798 | * On insufficient data for the elements. | |
799 | * @throws IOException | |
800 | * On a failure reading the elements. | |
801 | */ | |
802 | protected List<Element> readElements() throws EOFException, IOException { | |
803 | 37253 | final List<Element> elements = new ArrayList<Element>(); |
804 | 37253 | int elementToken = read(); |
805 | 105877 | while (elementToken > 0) { |
806 | 68626 | elements.add(readElement((byte) elementToken)); |
807 | ||
808 | 68623 | elementToken = read(); |
809 | } | |
810 | 37251 | if (elementToken < 0) { |
811 | 0 | throw new EOFException(); |
812 | } | |
813 | 37251 | return elements; |
814 | } | |
815 | ||
816 | /** | |
817 | * Reads a {@link IntegerElement} from the stream. | |
818 | * | |
819 | * @return The {@link IntegerElement}. | |
820 | * @throws IOException | |
821 | * On a failure to read the contents of the | |
822 | * {@link IntegerElement}. | |
823 | */ | |
824 | protected IntegerElement readIntegerElement() throws IOException { | |
825 | 3649 | final long start = getBytesRead() - 1; // Token already read. |
826 | ||
827 | 3649 | final String name = readCString(); |
828 | 3649 | final int value = readInt(); |
829 | ||
830 | 3649 | final long size = getBytesRead() - start; |
831 | ||
832 | 3649 | return new IntegerElement(name, value, size); |
833 | } | |
834 | ||
835 | /** | |
836 | * Reads a {@link JavaScriptElement} from the stream. | |
837 | * | |
838 | * @return The {@link JavaScriptElement}. | |
839 | * @throws IOException | |
840 | * On a failure to read the contents of the | |
841 | * {@link JavaScriptElement}. | |
842 | */ | |
843 | protected JavaScriptElement readJavaScriptElement() throws IOException { | |
844 | 8 | final long start = getBytesRead() - 1; // Token already read. |
845 | ||
846 | 8 | final String name = readCString(); |
847 | 8 | final String javascript = readString(); |
848 | ||
849 | 8 | final long size = getBytesRead() - start; |
850 | ||
851 | 8 | return new JavaScriptElement(name, javascript, size); |
852 | } | |
853 | ||
854 | /** | |
855 | * Reads a {@link JavaScriptWithScopeElement} from the stream. | |
856 | * | |
857 | * @return The {@link JavaScriptWithScopeElement}. | |
858 | * @throws IOException | |
859 | * On a failure to read the contents of the | |
860 | * {@link JavaScriptWithScopeElement}. | |
861 | */ | |
862 | protected JavaScriptWithScopeElement readJavaScriptWithScopeElement() | |
863 | throws IOException { | |
864 | 8 | final long start = getBytesRead() - 1; // Token already read. |
865 | ||
866 | 8 | final String name = readCString(); |
867 | 8 | readInt(); // Total length - not used. |
868 | 8 | final String javascript = readString(); |
869 | 8 | final Document scope = readDocument(); |
870 | ||
871 | 8 | final long size = getBytesRead() - start; |
872 | ||
873 | 8 | return new JavaScriptWithScopeElement(name, javascript, scope, size); |
874 | } | |
875 | ||
876 | /** | |
877 | * Reads a {@link LongElement} from the stream. | |
878 | * | |
879 | * @return The {@link LongElement}. | |
880 | * @throws IOException | |
881 | * On a failure to read the contents of the {@link LongElement}. | |
882 | */ | |
883 | protected LongElement readLongElement() throws IOException { | |
884 | 10 | final long start = getBytesRead() - 1; // Token already read. |
885 | ||
886 | 10 | final String name = readCString(); |
887 | 10 | final long value = readLong(); |
888 | ||
889 | 10 | final long size = getBytesRead() - start; |
890 | ||
891 | 10 | return new LongElement(name, value, size); |
892 | } | |
893 | ||
894 | /** | |
895 | * Reads a {@link MaxKeyElement} from the stream. | |
896 | * | |
897 | * @return The {@link MaxKeyElement}. | |
898 | * @throws IOException | |
899 | * On a failure to read the contents of the | |
900 | * {@link MaxKeyElement}. | |
901 | */ | |
902 | protected MaxKeyElement readMaxKeyElement() throws IOException { | |
903 | 8 | final long start = getBytesRead() - 1; // Token already read. |
904 | ||
905 | 8 | final String name = readCString(); |
906 | ||
907 | 8 | final long size = getBytesRead() - start; |
908 | ||
909 | 8 | return new MaxKeyElement(name, size); |
910 | } | |
911 | ||
912 | /** | |
913 | * Reads a {@link MinKeyElement} from the stream. | |
914 | * | |
915 | * @return The {@link MinKeyElement}. | |
916 | * @throws IOException | |
917 | * On a failure to read the contents of the | |
918 | * {@link MinKeyElement}. | |
919 | */ | |
920 | protected MinKeyElement readMinKeyElement() throws IOException { | |
921 | 8 | final long start = getBytesRead() - 1; // Token already read. |
922 | ||
923 | 8 | final String name = readCString(); |
924 | ||
925 | 8 | final long size = getBytesRead() - start; |
926 | ||
927 | 8 | return new MinKeyElement(name, size); |
928 | } | |
929 | ||
930 | /** | |
931 | * Reads a {@link MongoTimestampElement} from the stream. | |
932 | * | |
933 | * @return The {@link MongoTimestampElement}. | |
934 | * @throws IOException | |
935 | * On a failure to read the contents of the | |
936 | * {@link MongoTimestampElement}. | |
937 | */ | |
938 | protected MongoTimestampElement readMongoTimestampElement() | |
939 | throws IOException { | |
940 | 8 | final long start = getBytesRead() - 1; // Token already read. |
941 | ||
942 | 8 | final String name = readCString(); |
943 | 8 | final long timestamp = readLong(); |
944 | ||
945 | 8 | final long size = getBytesRead() - start; |
946 | ||
947 | 8 | return new MongoTimestampElement(name, timestamp, size); |
948 | } | |
949 | ||
950 | /** | |
951 | * Reads a {@link NullElement} from the stream. | |
952 | * | |
953 | * @return The {@link NullElement}. | |
954 | * @throws IOException | |
955 | * On a failure to read the contents of the {@link NullElement}. | |
956 | */ | |
957 | protected NullElement readNullElement() throws IOException { | |
958 | 8 | final long start = getBytesRead() - 1; // Token already read. |
959 | ||
960 | 8 | final String name = readCString(); |
961 | ||
962 | 8 | final long size = getBytesRead() - start; |
963 | ||
964 | 8 | return new NullElement(name, size); |
965 | } | |
966 | ||
967 | /** | |
968 | * Reads a {@link ObjectIdElement} from the stream. | |
969 | * | |
970 | * @return The {@link ObjectIdElement}. | |
971 | * @throws IOException | |
972 | * On a failure to read the contents of the | |
973 | * {@link ObjectIdElement}. | |
974 | */ | |
975 | protected ObjectIdElement readObjectIdElement() throws IOException { | |
976 | 8 | final long start = getBytesRead() - 1; // Token already read. |
977 | ||
978 | 8 | final String name = readCString(); |
979 | 8 | final int timestamp = EndianUtils.swap(readInt()); |
980 | 8 | final long machineId = EndianUtils.swap(readLong()); |
981 | ||
982 | 8 | final long size = getBytesRead() - start; |
983 | ||
984 | 8 | return new ObjectIdElement(name, new ObjectId(timestamp, machineId), |
985 | size); | |
986 | } | |
987 | ||
988 | /** | |
989 | * Reads a {@link RegularExpressionElement} from the stream. | |
990 | * | |
991 | * @return The {@link RegularExpressionElement}. | |
992 | * @throws IOException | |
993 | * On a failure to read the contents of the | |
994 | * {@link RegularExpressionElement}. | |
995 | */ | |
996 | protected RegularExpressionElement readRegularExpressionElement() | |
997 | throws IOException { | |
998 | 14 | final long start = getBytesRead() - 1; // Token already read. |
999 | ||
1000 | 14 | final String name = readCString(); |
1001 | 14 | final String pattern = readCString(); |
1002 | 14 | final String options = readCString(); |
1003 | ||
1004 | 14 | final long size = getBytesRead() - start; |
1005 | ||
1006 | 14 | return new RegularExpressionElement(name, pattern, options, size); |
1007 | } | |
1008 | ||
1009 | /** | |
1010 | * Reads a "string" value from the stream:<code> | |
1011 | * <pre> | |
1012 | * string ::= int32 (byte*) "\x00" | |
1013 | * </pre> | |
1014 | * </code> | |
1015 | * <p> | |
1016 | * <blockquote>String - The int32 is the number bytes in the (byte*) + 1 | |
1017 | * (for the trailing '\x00'). The (byte*) is zero or more UTF-8 encoded | |
1018 | * characters. </blockquote> | |
1019 | * </p> | |
1020 | * | |
1021 | * @return The string value. | |
1022 | * @throws EOFException | |
1023 | * On insufficient data for the integer. | |
1024 | * @throws IOException | |
1025 | * On a failure reading the integer. | |
1026 | */ | |
1027 | protected String readString() throws EOFException, IOException { | |
1028 | 471 | final int length = readInt(); |
1029 | ||
1030 | 471 | if (ensureFetched(length) != length) { |
1031 | 1 | throw new EOFException(); |
1032 | } | |
1033 | ||
1034 | 470 | final int offset = myBufferOffset; |
1035 | ||
1036 | // Advance the buffer. | |
1037 | 470 | myBufferOffset += length; |
1038 | ||
1039 | 470 | return myStringDecoder.decode(myBuffer, offset, length); |
1040 | } | |
1041 | ||
1042 | /** | |
1043 | * Reads a {@link StringElement} from the stream. | |
1044 | * | |
1045 | * @return The {@link StringElement}. | |
1046 | * @throws IOException | |
1047 | * On a failure to read the contents of the | |
1048 | * {@link StringElement}. | |
1049 | */ | |
1050 | protected StringElement readStringElement() throws IOException { | |
1051 | 439 | final long start = getBytesRead() - 1; // Token already read. |
1052 | ||
1053 | 439 | final String name = readCString(); |
1054 | 439 | final String value = readString(); |
1055 | ||
1056 | 438 | final long size = getBytesRead() - start; |
1057 | ||
1058 | 438 | return new StringElement(name, value, size); |
1059 | } | |
1060 | ||
1061 | /** | |
1062 | * Reads a {@link SymbolElement} from the stream. | |
1063 | * | |
1064 | * @return The {@link SymbolElement}. | |
1065 | * @throws IOException | |
1066 | * On a failure to read the contents of the | |
1067 | * {@link SymbolElement}. | |
1068 | */ | |
1069 | protected SymbolElement readSymbolElement() throws IOException { | |
1070 | 8 | final long start = getBytesRead() - 1; // Token already read. |
1071 | ||
1072 | 8 | final String name = readCString(); |
1073 | 8 | final String symbol = readString(); |
1074 | ||
1075 | 8 | final long size = getBytesRead() - start; |
1076 | ||
1077 | 8 | return new SymbolElement(name, symbol, size); |
1078 | } | |
1079 | ||
1080 | /** | |
1081 | * Reads a {@link TimestampElement} from the stream. | |
1082 | * | |
1083 | * @return The {@link TimestampElement}. | |
1084 | * @throws IOException | |
1085 | * On a failure to read the contents of the | |
1086 | * {@link TimestampElement}. | |
1087 | */ | |
1088 | protected TimestampElement readTimestampElement() throws IOException { | |
1089 | 12 | final long start = getBytesRead() - 1; // Token already read. |
1090 | ||
1091 | 12 | final String name = readCString(); |
1092 | 12 | final long time = readLong(); |
1093 | ||
1094 | 12 | final long size = getBytesRead() - start; |
1095 | ||
1096 | 12 | return new TimestampElement(name, time, size); |
1097 | } | |
1098 | ||
1099 | /** | |
1100 | * Fetch the requested number of bytes from the underlying stream. Returns | |
1101 | * the number of bytes available in the buffer or the number of requested | |
1102 | * bytes, which ever is smaller. | |
1103 | * | |
1104 | * @param size | |
1105 | * The number of bytes to be read. | |
1106 | * @return The smaller of the number of bytes requested or the number of | |
1107 | * bytes available in the buffer. | |
1108 | * @throws IOException | |
1109 | * On a failure to read from the underlying stream. | |
1110 | */ | |
1111 | private final int ensureFetched(final int size) throws IOException { | |
1112 | 238619 | return fetch(size, true); |
1113 | } | |
1114 | ||
1115 | /** | |
1116 | * Fetch the requested number of bytes from the underlying stream. Returns | |
1117 | * the number of bytes available in the buffer or the number of requested | |
1118 | * bytes, which ever is smaller. | |
1119 | * | |
1120 | * @param size | |
1121 | * The number of bytes to be read. | |
1122 | * @param forceRead | |
1123 | * Determines if a read is forced to ensure the buffer contains | |
1124 | * the number of bytes. | |
1125 | * @return The smaller of the number of bytes requested or the number of | |
1126 | * bytes available in the buffer. | |
1127 | * @throws IOException | |
1128 | * On a failure to read from the underlying stream. | |
1129 | */ | |
1130 | private final int fetch(final int size, final boolean forceRead) | |
1131 | throws IOException { | |
1132 | // See if we need to read more data. | |
1133 | 276092 | int available = availableInBuffer(); |
1134 | 276093 | if (available < size) { |
1135 | // Yes - we do. | |
1136 | ||
1137 | // Will the size fit in the existing buffer? | |
1138 | 3951 | if (myBuffer.length < size) { |
1139 | // Nope - grow the buffer to the needed size. | |
1140 | 10 | final byte[] newBuffer = new byte[size]; |
1141 | ||
1142 | // Copy the existing content into the new buffer. | |
1143 | 10 | System.arraycopy(myBuffer, myBufferOffset, newBuffer, 0, |
1144 | available); | |
1145 | 10 | myBuffer = newBuffer; |
1146 | 10 | } |
1147 | 3941 | else if (0 < available) { |
1148 | // Compact the buffer. | |
1149 | 2 | System.arraycopy(myBuffer, myBufferOffset, myBuffer, 0, |
1150 | available); | |
1151 | } | |
1152 | ||
1153 | // Reset the limit and offset... | |
1154 | 3951 | myBytesRead += myBufferOffset; |
1155 | 3951 | myBufferOffset = 0; |
1156 | 3951 | myBufferLimit = available; |
1157 | ||
1158 | // Now read as much as possible to fill the buffer. | |
1159 | int read; | |
1160 | do { | |
1161 | 3951 | read = myInput.read(myBuffer, myBufferLimit, myBuffer.length |
1162 | - myBufferLimit); | |
1163 | 3788 | if (0 < read) { |
1164 | 3688 | available += read; |
1165 | 3688 | myBufferLimit += read; |
1166 | } | |
1167 | } | |
1168 | 3788 | while (forceRead && (0 <= read) && (available < size)); |
1169 | ||
1170 | 3788 | return Math.min(size, available); |
1171 | } | |
1172 | ||
1173 | 272146 | return size; |
1174 | } | |
1175 | ||
1176 | /** | |
1177 | * Reads the complete set of bytes from the stream or throws an | |
1178 | * {@link EOFException}. | |
1179 | * | |
1180 | * @param buffer | |
1181 | * The buffer into which the data is read. | |
1182 | * @param offset | |
1183 | * The offset to start writing into the buffer. | |
1184 | * @param length | |
1185 | * The number of bytes to write into the buffer. | |
1186 | * @exception EOFException | |
1187 | * If the input stream reaches the end before reading all the | |
1188 | * bytes. | |
1189 | * @exception IOException | |
1190 | * On an error reading from the underlying stream. | |
1191 | * @see DataInput#readFully(byte[], int, int) | |
1192 | */ | |
1193 | private void readFully(final byte[] buffer, final int offset, | |
1194 | final int length) throws EOFException, IOException { | |
1195 | ||
1196 | 32020 | int read = Math.min(length, availableInBuffer()); |
1197 | 32020 | System.arraycopy(myBuffer, myBufferOffset, buffer, offset, read); |
1198 | 32020 | myBufferOffset += read; |
1199 | ||
1200 | // Read directly from the stream to avoid a copy. | |
1201 | 32021 | while (read < length) { |
1202 | 1 | final int count = myInput |
1203 | .read(buffer, offset + read, length - read); | |
1204 | 1 | if (count < 0) { |
1205 | 0 | throw new EOFException(); |
1206 | } | |
1207 | 1 | read += count; |
1208 | ||
1209 | // Directly read bytes never hit the buffer. | |
1210 | 1 | myBytesRead += read; |
1211 | 1 | } |
1212 | 32020 | } |
1213 | } |