View Javadoc
1   /*
2    * #%L
3    * Insert.java - mongodb-async-driver - Allanbank Consulting, Inc.
4    * %%
5    * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6    * %%
7    * Licensed under the Apache License, Version 2.0 (the "License");
8    * you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   * 
11   *      http://www.apache.org/licenses/LICENSE-2.0
12   * 
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   * #L%
19   */
20  package com.allanbank.mongodb.client.message;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.List;
26  
27  import com.allanbank.mongodb.ReadPreference;
28  import com.allanbank.mongodb.bson.Document;
29  import com.allanbank.mongodb.bson.io.BsonInputStream;
30  import com.allanbank.mongodb.bson.io.BsonOutputStream;
31  import com.allanbank.mongodb.bson.io.BufferingBsonOutputStream;
32  import com.allanbank.mongodb.bson.io.StringEncoder;
33  import com.allanbank.mongodb.client.Message;
34  import com.allanbank.mongodb.client.Operation;
35  import com.allanbank.mongodb.client.VersionRange;
36  import com.allanbank.mongodb.error.DocumentToLargeException;
37  
38  /**
39   * Message to <a href=
40   * "http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol#MongoWireProtocol-OPINSERT"
41   * >insert</a> a set of documents into a collection.
42   * 
43   * <pre>
44   * <code>
45   * struct {
46   *     MsgHeader header;             // standard message header
47   *     int32     flags;              // bit vector - see below
48   *     cstring   fullCollectionName; // "dbname.collectionname"
49   *     document* documents;          // one or more documents to insert into the collection
50   * }
51   * </code>
52   * </pre>
53   * 
54   * 
55   * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
56   *         mutated in incompatible ways between any two releases of the driver.
57   * @copyright 2011-2013, Allanbank Consulting, Inc., All Rights Reserved
58   */
59  public class Insert extends AbstractMessage {
60  
61      /** The flag bit to keep inserting documents on an error. */
62      public static final int CONTINUE_ON_ERROR_BIT = 1;
63  
64      /**
65       * If true, then the insert of documents should continue if one document
66       * causes an error.
67       */
68      private final boolean myContinueOnError;
69  
70      /** The documents to be inserted. */
71      private final List<Document> myDocuments;
72  
73      /**
74       * The documents to be inserted. If negative then the size has not been
75       * computed.
76       */
77      private int myDocumentsSize;
78  
79      /**
80       * Creates a new Insert.
81       * 
82       * @param header
83       *            The header proceeding the insert message. This is used to
84       *            locate the end of the insert.
85       * @param in
86       *            The stream to read the insert message from.
87       * @throws IOException
88       *             On a failure reading the insert message.
89       */
90      public Insert(final Header header, final BsonInputStream in)
91              throws IOException {
92  
93          final long position = in.getBytesRead();
94          final long end = (position + header.getLength()) - Header.SIZE;
95  
96          final int flags = in.readInt();
97          init(in.readCString());
98  
99          // Read the documents to the end of the message.
100         myDocuments = new ArrayList<Document>();
101         while (in.getBytesRead() < end) {
102             myDocuments.add(in.readDocument());
103         }
104 
105         myContinueOnError = (flags & CONTINUE_ON_ERROR_BIT) == CONTINUE_ON_ERROR_BIT;
106         myDocumentsSize = -1;
107     }
108 
109     /**
110      * Creates a new Insert.
111      * 
112      * @param databaseName
113      *            The name of the database.
114      * @param collectionName
115      *            The name of the collection.
116      * @param documents
117      *            The documents to be inserted.
118      * @param continueOnError
119      *            If the insert should continue if one of the documents causes
120      *            an error.
121      */
122     public Insert(final String databaseName, final String collectionName,
123             final List<Document> documents, final boolean continueOnError) {
124         this(databaseName, collectionName, documents, continueOnError, null);
125     }
126 
127     /**
128      * Creates a new Insert.
129      * 
130      * @param databaseName
131      *            The name of the database.
132      * @param collectionName
133      *            The name of the collection.
134      * @param documents
135      *            The documents to be inserted.
136      * @param continueOnError
137      *            If the insert should continue if one of the documents causes
138      *            an error.
139      * @param requiredServerVersion
140      *            The required version of the server to support processing the
141      *            message.
142      */
143     public Insert(final String databaseName, final String collectionName,
144             final List<Document> documents, final boolean continueOnError,
145             final VersionRange requiredServerVersion) {
146         super(databaseName, collectionName, ReadPreference.PRIMARY,
147                 requiredServerVersion);
148 
149         myDocuments = new ArrayList<Document>(documents);
150         myContinueOnError = continueOnError;
151         myDocumentsSize = -1;
152     }
153 
154     /**
155      * Determines if the passed object is of this same type as this object and
156      * if so that its fields are equal.
157      * 
158      * @param object
159      *            The object to compare to.
160      * 
161      * @see java.lang.Object#equals(java.lang.Object)
162      */
163     @Override
164     public boolean equals(final Object object) {
165         boolean result = false;
166         if (this == object) {
167             result = true;
168         }
169         else if ((object != null) && (getClass() == object.getClass())) {
170             final Insert other = (Insert) object;
171 
172             result = super.equals(object)
173                     && (myContinueOnError == other.myContinueOnError)
174                     && myDocuments.equals(other.myDocuments);
175         }
176         return result;
177     }
178 
179     /**
180      * Returns the documents to insert.
181      * 
182      * @return The documents to insert.
183      */
184     public List<Document> getDocuments() {
185         return Collections.unmodifiableList(myDocuments);
186     }
187 
188     /**
189      * {@inheritDoc}
190      * <p>
191      * Overridden to return the name of the operation: "INSERT".
192      * </p>
193      */
194     @Override
195     public String getOperationName() {
196         return Operation.INSERT.name();
197     }
198 
199     /**
200      * Computes a reasonable hash code.
201      * 
202      * @return The hash code value.
203      */
204     @Override
205     public int hashCode() {
206         int result = 1;
207         result = (31 * result) + super.hashCode();
208         result = (31 * result) + (myContinueOnError ? 1 : 3);
209         result = (31 * result) + myDocuments.hashCode();
210         return result;
211     }
212 
213     /**
214      * Returns true if the insert should continue with other documents if one of
215      * the document inserts encounters an error.
216      * 
217      * @return True if the insert should continue with other documents if one of
218      *         the document inserts encounters an error.
219      */
220     public boolean isContinueOnError() {
221         return myContinueOnError;
222     }
223 
224     /**
225      * {@inheritDoc}
226      * <p>
227      * Overridden to return the size of the {@link Insert}.
228      * </p>
229      */
230     @Override
231     public int size() {
232 
233         int size = HEADER_SIZE + 6; // See below.
234         // size += 4; // flags
235         size += StringEncoder.utf8Size(myDatabaseName);
236         // size += 1; // StringEncoder.utf8Size(".");
237         size += StringEncoder.utf8Size(myCollectionName);
238         // size += 1; // \0 on the CString.
239         for (final Document document : myDocuments) {
240             size += document.size();
241         }
242 
243         return size;
244     }
245 
246     /**
247      * {@inheritDoc}
248      * <p>
249      * Overridden to output the documents and insert flags.
250      * </p>
251      */
252     @Override
253     public String toString() {
254         return "Insert [myContinueOnError=" + myContinueOnError
255                 + ", myDocuments=" + myDocuments + "]";
256     }
257 
258     /**
259      * {@inheritDoc}
260      * <p>
261      * Overridden to ensure the inserted documents are not too large in
262      * aggregate.
263      * </p>
264      */
265     @Override
266     public void validateSize(final int maxDocumentSize)
267             throws DocumentToLargeException {
268         if (myDocumentsSize < 0) {
269             long size = 0;
270             for (final Document doc : myDocuments) {
271                 size += doc.size();
272             }
273 
274             myDocumentsSize = (int) size;
275         }
276 
277         if (maxDocumentSize < myDocumentsSize) {
278             throw new DocumentToLargeException(myDocumentsSize,
279                     maxDocumentSize, myDocuments.get(0));
280         }
281     }
282 
283     /**
284      * {@inheritDoc}
285      * <p>
286      * Overridden to write the insert message.
287      * </p>
288      * 
289      * @see Message#write(int, BsonOutputStream)
290      */
291     @Override
292     public void write(final int messageId, final BsonOutputStream out)
293             throws IOException {
294         final int flags = computeFlags();
295 
296         int size = HEADER_SIZE;
297         size += 4; // flags
298         size += out.sizeOfCString(myDatabaseName, ".", myCollectionName);
299         for (final Document document : myDocuments) {
300             size += document.size();
301         }
302 
303         writeHeader(out, messageId, 0, Operation.INSERT, size);
304         out.writeInt(flags);
305         out.writeCString(myDatabaseName, ".", myCollectionName);
306         for (final Document document : myDocuments) {
307             out.writeDocument(document);
308         }
309     }
310 
311     /**
312      * {@inheritDoc}
313      * <p>
314      * Overridden to write the insert message.
315      * </p>
316      * 
317      * @see Message#write(int, BsonOutputStream)
318      */
319     @Override
320     public void write(final int messageId, final BufferingBsonOutputStream out)
321             throws IOException {
322         final int flags = computeFlags();
323 
324         final long start = writeHeader(out, messageId, 0, Operation.INSERT);
325         out.writeInt(flags);
326         out.writeCString(myDatabaseName, ".", myCollectionName);
327         for (final Document document : myDocuments) {
328             out.writeDocument(document);
329         }
330         finishHeader(out, start);
331 
332         out.flushBuffer();
333     }
334 
335     /**
336      * Computes the message flags bit field.
337      * 
338      * @return The message flags bit field.
339      */
340     private int computeFlags() {
341         int flags = 0;
342         if (myContinueOnError) {
343             flags += CONTINUE_ON_ERROR_BIT;
344         }
345         return flags;
346     }
347 
348 }