View Javadoc
1   /*
2    * #%L
3    * RandomAccessOutputStream.java - mongodb-async-driver - Allanbank Consulting, Inc.
4    * %%
5    * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6    * %%
7    * Licensed under the Apache License, Version 2.0 (the "License");
8    * you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   * 
11   *      http://www.apache.org/licenses/LICENSE-2.0
12   * 
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   * #L%
19   */
20  package com.allanbank.mongodb.bson.io;
21  
22  import java.io.IOException;
23  import java.io.OutputStream;
24  import java.nio.charset.Charset;
25  import java.util.ArrayList;
26  import java.util.List;
27  
28  /**
29   * Provides a capability similar to the <tt>ByteArrayOutputStream</tt> but also
30   * provides the ability to re-write portions of the stream already written and
31   * to determine the current size (or position) of the written data.
32   * <p>
33   * Instead of allocating a single large byte array this implementation tracks a
34   * group of (increasing in size) buffers. This should reduce the runtime cost of
35   * buffer reallocations since it avoids the copy of contents from one buffer to
36   * another.
37   * </p>
38   * 
39   * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
40   *         mutated in incompatible ways between any two releases of the driver.
41   * @copyright 2011-2013, Allanbank Consulting, Inc., All Rights Reserved
42   */
43  public class RandomAccessOutputStream extends OutputStream {
44      /** UTF-8 Character set for encoding strings. */
45      public final static Charset UTF8 = StringDecoder.UTF8;
46  
47      /** The maximum size buffer to allocate. Must be a power of 2. */
48      private static final int BUFFER_SIZE;
49  
50      /** Mask for the buffer position within a specific buffer. */
51      private static final int BUFFER_SIZE_MASK;
52  
53      /** The number of bits in the buffer size. */
54      private static final int BUFFER_SIZE_SHIFT;
55  
56      static {
57          BUFFER_SIZE = Integer.highestOneBit(8192);
58          BUFFER_SIZE_MASK = BUFFER_SIZE - 1;
59          BUFFER_SIZE_SHIFT = Integer.numberOfTrailingZeros(BUFFER_SIZE);
60      }
61  
62      /** The set of buffers allocated. */
63      private final List<byte[]> myBuffers;
64  
65      /** The current buffer being written. */
66      private byte[] myCurrentBuffer;
67  
68      /** The index of the current buffer. */
69      private int myCurrentBufferIndex;
70  
71      /** The offset into the current buffer. */
72      private int myCurrentBufferOffset;
73  
74      /**
75       * Buffer for serialization of integer types. Not needed for normal integer
76       * writes since the {@link RandomAccessOutputStream} will coalesce the
77       * single byte writes but for the {@link RandomAccessOutputStream#writeAt}
78       * operation a seek to the appropriate backing buffer is required. For large
79       * documents the seeks could be significant. This buffer ensures there is
80       * only 1 seek for each {@link #writeIntAt(long, int)}.
81       */
82      private final byte[] myIntegerBytes;
83  
84      /** The current buffer being written. */
85      private long mySize;
86  
87      /** The offset into the current buffer. */
88      private final StringEncoder myStringEncoder;
89  
90      /**
91       * Creates a new {@link RandomAccessOutputStream}.
92       */
93      public RandomAccessOutputStream() {
94          this(new StringEncoderCache());
95      }
96  
97      /**
98       * Creates a new {@link RandomAccessOutputStream}.
99       * 
100      * @param cache
101      *            The cache for encoding string.
102      */
103     public RandomAccessOutputStream(final StringEncoderCache cache) {
104         mySize = 0;
105         myCurrentBufferOffset = 0;
106         myCurrentBufferIndex = 0;
107         myCurrentBuffer = new byte[BUFFER_SIZE];
108 
109         myStringEncoder = new StringEncoder(cache);
110 
111         myBuffers = new ArrayList<byte[]>();
112         myBuffers.add(myCurrentBuffer);
113 
114         myIntegerBytes = new byte[8];
115     }
116 
117     /**
118      * {@inheritDoc}
119      */
120     @Override
121     public void close() {
122         // Nothing.
123     }
124 
125     /**
126      * {@inheritDoc}
127      */
128     @Override
129     public void flush() {
130         // Nothing.
131     }
132 
133     /**
134      * Returns the maximum number of strings that may have their encoded form
135      * cached.
136      * 
137      * @return The maximum number of strings that may have their encoded form
138      *         cached.
139      * @deprecated The cache {@link StringEncoderCache} should be controlled
140      *             directory. This method will be removed after the 2.1.0
141      *             release.
142      */
143     @Deprecated
144     public int getMaxCachedStringEntries() {
145         return myStringEncoder.getCache().getMaxCacheEntries();
146     }
147 
148     /**
149      * Returns the maximum length for a string that the stream is allowed to
150      * cache.
151      * 
152      * @return The maximum length for a string that the stream is allowed to
153      *         cache.
154      * @deprecated The cache {@link StringEncoderCache} should be controlled
155      *             directory. This method will be removed after the 2.1.0
156      *             release.
157      */
158     @Deprecated
159     public int getMaxCachedStringLength() {
160         return myStringEncoder.getCache().getMaxCacheLength();
161     }
162 
163     /**
164      * Returns the current position in the stream. This is equivalent to
165      * {@link #getSize()}.
166      * 
167      * @return The current position in the stream.
168      */
169     public long getPosition() {
170         return getSize();
171     }
172 
173     /**
174      * Returns the number of bytes written to the stream.
175      * 
176      * @return The current number of bytes written to the stream.
177      */
178     public long getSize() {
179         return mySize;
180     }
181 
182     /**
183      * Resets the <code>size</code> of the buffer to zero. All buffers can be
184      * re-used.
185      */
186     public void reset() {
187         mySize = 0;
188         myCurrentBufferOffset = 0;
189         myCurrentBufferIndex = 0;
190         myCurrentBuffer = myBuffers.get(0);
191     }
192 
193     /**
194      * Sets the value of maximum number of strings that may have their encoded
195      * form cached.
196      * 
197      * @param maxCacheEntries
198      *            The new value for the maximum number of strings that may have
199      *            their encoded form cached.
200      * @deprecated The cache {@link StringEncoderCache} should be controlled
201      *             directory. This method will be removed after the 2.1.0
202      *             release.
203      */
204     @Deprecated
205     public void setMaxCachedStringEntries(final int maxCacheEntries) {
206         myStringEncoder.getCache().setMaxCacheEntries(maxCacheEntries);
207     }
208 
209     /**
210      * Sets the value of length for a string that the stream is allowed to cache
211      * to the new value. This can be used to stop a single long string from
212      * pushing useful values out of the cache.
213      * 
214      * @param maxlength
215      *            The new value for the length for a string that the encoder is
216      *            allowed to cache.
217      * @deprecated The cache {@link StringEncoderCache} should be controlled
218      *             directory. This method will be removed after the 2.1.0
219      *             release.
220      */
221     @Deprecated
222     public void setMaxCachedStringLength(final int maxlength) {
223         myStringEncoder.getCache().setMaxCacheLength(maxlength);
224 
225     }
226 
227     /**
228      * {@inheritDoc}
229      * 
230      * @param buffer
231      *            the data.
232      */
233     @Override
234     public void write(final byte buffer[]) {
235         write(buffer, 0, buffer.length);
236     }
237 
238     /**
239      * {@inheritDoc}
240      * 
241      * @param buffer
242      *            the data.
243      * @param offset
244      *            the start offset in the data.
245      * @param length
246      *            the number of bytes to write.
247      */
248     @Override
249     public void write(final byte buffer[], final int offset, final int length) {
250         if (buffer == null) {
251             throw new NullPointerException();
252         }
253         else if ((offset < 0) || (offset > buffer.length) || (length < 0)
254                 || ((offset + length) > buffer.length)
255                 || ((offset + length) < 0)) {
256             throw new IndexOutOfBoundsException();
257         }
258         else if (length == 0) {
259             return;
260         }
261 
262         int wrote = 0;
263         while (wrote < length) {
264             if (myCurrentBuffer.length <= myCurrentBufferOffset) {
265                 nextBuffer();
266             }
267 
268             final int available = myCurrentBuffer.length
269                     - myCurrentBufferOffset;
270             final int toWrite = Math.min(length - wrote, available);
271 
272             System.arraycopy(buffer, offset + wrote, myCurrentBuffer,
273                     myCurrentBufferOffset, toWrite);
274 
275             myCurrentBufferOffset += toWrite;
276             mySize += toWrite;
277             wrote += toWrite;
278         }
279     }
280 
281     /**
282      * {@inheritDoc}
283      */
284     @Override
285     public void write(final int b) {
286         if (myCurrentBuffer.length <= myCurrentBufferOffset) {
287             nextBuffer();
288         }
289 
290         myCurrentBuffer[myCurrentBufferOffset] = (byte) b;
291         myCurrentBufferOffset += 1;
292         mySize += 1;
293     }
294 
295     /**
296      * Similar to {@link #write(byte[])} but allows a portion of the already
297      * written buffer to be re-written.
298      * <p>
299      * Equivalent to <code>writeAt(position, buffer, 0, buffer.length);</code>.
300      * </p>
301      * 
302      * @param position
303      *            The position to write at. This location should have already
304      *            been written.
305      * @param buffer
306      *            the data.
307      */
308     public void writeAt(final long position, final byte buffer[]) {
309         writeAt(position, buffer, 0, buffer.length);
310     }
311 
312     /**
313      * Similar to {@link #write(byte[], int, int)} but allows a portion of the
314      * already written buffer to be re-written.
315      * 
316      * @param position
317      *            The position to write at. This location should have already
318      *            been written.
319      * @param buffer
320      *            the data.
321      * @param offset
322      *            the start offset in the data.
323      * @param length
324      *            the number of bytes to write.
325      */
326     public void writeAt(final long position, final byte buffer[],
327             final int offset, final int length) {
328         if (buffer == null) {
329             throw new NullPointerException();
330         }
331         else if ((offset < 0) || (offset > buffer.length) || (length < 0)
332                 || ((offset + length) > buffer.length)
333                 || ((offset + length) < 0) || ((position + length) > getSize())) {
334             throw new IndexOutOfBoundsException();
335         }
336         else if (length == 0) {
337             return;
338         }
339 
340         // Find the start buffer.
341         final long start = position & BUFFER_SIZE_MASK;
342         int bufferIndex = (int) (position >> BUFFER_SIZE_SHIFT);
343         byte[] internalBuffer = myBuffers.get(bufferIndex);
344 
345         // Write into the correct position.
346         int wrote = 0;
347         int internalOffset = (int) start;
348         while (wrote < length) {
349             if (internalBuffer.length <= internalOffset) {
350                 bufferIndex += 1;
351                 internalBuffer = myBuffers.get(bufferIndex);
352                 internalOffset = 0;
353             }
354 
355             final int available = internalBuffer.length - internalOffset;
356             final int toWrite = Math.min(length - wrote, available);
357 
358             System.arraycopy(buffer, offset + wrote, internalBuffer,
359                     internalOffset, toWrite);
360 
361             internalOffset += toWrite;
362             wrote += toWrite;
363         }
364     }
365 
366     /**
367      * Similar to {@link #write(int)} but allows a portion of the already
368      * written buffer to be re-written.
369      * 
370      * @param position
371      *            The position to write at. This location should have already
372      *            been written.
373      * @param b
374      *            The byte value to write.
375      */
376     public void writeAt(final long position, final int b) {
377         // Find the start buffer.
378         final long start = position & BUFFER_SIZE_MASK;
379         final int bufferIndex = (int) (position >> BUFFER_SIZE_SHIFT);
380         final byte[] internalBuffer = myBuffers.get(bufferIndex);
381 
382         internalBuffer[(int) start] = (byte) b;
383     }
384 
385     /**
386      * Writes a single byte to the stream.
387      * 
388      * @param b
389      *            The byte to write.
390      */
391     public void writeByte(final byte b) {
392         write(b & 0xFF);
393     }
394 
395     /**
396      * Writes a sequence of bytes to the under lying stream.
397      * 
398      * @param data
399      *            The bytes to write.
400      */
401     public void writeBytes(final byte[] data) {
402         write(data);
403     }
404 
405     /**
406      * Writes a "Cstring" to the stream.
407      * 
408      * @param strings
409      *            The CString to write. The strings are concatenated into a
410      *            single CString value.
411      */
412     public void writeCString(final String... strings) {
413         for (final String string : strings) {
414             // writeBytes(string.getBytes(UTF8));
415             try {
416                 myStringEncoder.encode(string, this);
417             }
418             catch (final IOException cannotHappen) {
419                 // We never throw so should not throw from the encoder.
420                 throw new IllegalStateException(
421                         "Encoder should not throw when writing to a buffer.");
422             }
423         }
424         writeByte((byte) 0);
425     }
426 
427     /**
428      * Write the integer value in little-endian byte order.
429      * 
430      * @param value
431      *            The integer to write.
432      */
433     public void writeInt(final int value) {
434         myIntegerBytes[0] = (byte) (value & 0xFF);
435         myIntegerBytes[1] = (byte) ((value >> 8) & 0xFF);
436         myIntegerBytes[2] = (byte) ((value >> 16) & 0xFF);
437         myIntegerBytes[3] = (byte) ((value >> 24) & 0xFF);
438 
439         write(myIntegerBytes, 0, 4);
440     }
441 
442     /**
443      * Write the integer value in little-endian byte order at the specified
444      * position in the stream.
445      * 
446      * @param position
447      *            The position in the stream to write the integer.
448      * @param value
449      *            The long to write.
450      */
451     public void writeIntAt(final long position, final int value) {
452         myIntegerBytes[0] = (byte) (value & 0xFF);
453         myIntegerBytes[1] = (byte) ((value >> 8) & 0xFF);
454         myIntegerBytes[2] = (byte) ((value >> 16) & 0xFF);
455         myIntegerBytes[3] = (byte) ((value >> 24) & 0xFF);
456 
457         writeAt(position, myIntegerBytes, 0, 4);
458     }
459 
460     /**
461      * Write the long value in little-endian byte order.
462      * 
463      * @param value
464      *            The long to write.
465      */
466     public void writeLong(final long value) {
467         myIntegerBytes[0] = (byte) (value & 0xFF);
468         myIntegerBytes[1] = (byte) ((value >> 8) & 0xFF);
469         myIntegerBytes[2] = (byte) ((value >> 16) & 0xFF);
470         myIntegerBytes[3] = (byte) ((value >> 24) & 0xFF);
471         myIntegerBytes[4] = (byte) ((value >> 32) & 0xFF);
472         myIntegerBytes[5] = (byte) ((value >> 40) & 0xFF);
473         myIntegerBytes[6] = (byte) ((value >> 48) & 0xFF);
474         myIntegerBytes[7] = (byte) ((value >> 56) & 0xFF);
475 
476         write(myIntegerBytes, 0, 8);
477     }
478 
479     /**
480      * Writes a "string" to the stream.
481      * 
482      * @param string
483      *            The String to write.
484      */
485     public void writeString(final String string) {
486         // final byte[] bytes = string.getBytes(UTF8);
487         //
488         // writeInt(bytes.length + 1);
489         // writeBytes(bytes);
490         // writeByte((byte) 0);
491 
492         final long position = getPosition();
493         writeInt(0); // For size.
494         try {
495             myStringEncoder.encode(string, this);
496         }
497         catch (final IOException cannotHappen) {
498             // We never throw so should not throw from the encoder.
499             throw new IllegalStateException(
500                     "Encoder should not throw when writing to a buffer.");
501         }
502         writeByte((byte) 0);
503 
504         final int size = (int) (getPosition() - position - 4);
505         writeIntAt(position, size);
506 
507     }
508 
509     /**
510      * Writes the complete contents of this byte array output stream to the
511      * specified output stream argument, as if by calling the output stream's
512      * write method using <code>out.write(buf, 0, count)</code>.
513      * 
514      * @param out
515      *            the output stream to which to write the data.
516      * @exception IOException
517      *                if an I/O error occurs.
518      */
519     public void writeTo(final OutputStream out) throws IOException {
520         for (int i = 0; i < myCurrentBufferIndex; ++i) {
521             out.write(myBuffers.get(i), 0, BUFFER_SIZE);
522         }
523         out.write(myCurrentBuffer, 0, myCurrentBufferOffset);
524     }
525 
526     /**
527      * Allocates a new buffer to use.
528      */
529     protected void nextBuffer() {
530         // Need a new buffer.
531         myCurrentBufferIndex += 1;
532 
533         if (myCurrentBufferIndex < myBuffers.size()) {
534             myCurrentBuffer = myBuffers.get(myCurrentBufferIndex);
535         }
536         else {
537             myCurrentBuffer = new byte[BUFFER_SIZE];
538             myBuffers.add(myCurrentBuffer);
539         }
540 
541         myCurrentBufferOffset = 0;
542     }
543 }