View Javadoc
1   /*
2    * #%L
3    * Reply.java - mongodb-async-driver - Allanbank Consulting, Inc.
4    * %%
5    * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6    * %%
7    * Licensed under the Apache License, Version 2.0 (the "License");
8    * you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   * 
11   *      http://www.apache.org/licenses/LICENSE-2.0
12   * 
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   * #L%
19   */
20  package com.allanbank.mongodb.client.message;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.List;
25  
26  import com.allanbank.mongodb.ReadPreference;
27  import com.allanbank.mongodb.bson.Document;
28  import com.allanbank.mongodb.bson.io.BsonInputStream;
29  import com.allanbank.mongodb.bson.io.BsonOutputStream;
30  import com.allanbank.mongodb.bson.io.BufferingBsonOutputStream;
31  import com.allanbank.mongodb.client.Message;
32  import com.allanbank.mongodb.client.Operation;
33  import com.allanbank.mongodb.error.DocumentToLargeException;
34  
35  /**
36   * Message received from the database in <a href=
37   * "http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol#MongoWireProtocol-OPREPLY"
38   * >reply</a> to a query.
39   * 
40   * <pre>
41   * <code>
42   * struct {
43   *     MsgHeader header;         // standard message header
44   *     int32     responseFlags;  // bit vector - see details below
45   *     int64     cursorID;       // cursor id if client needs to do get more's
46   *     int32     startingFrom;   // where in the cursor this reply is starting
47   *     int32     numberReturned; // number of documents in the reply
48   *     document* documents;      // documents
49   * }
50   * </code>
51   * </pre>
52   * 
53   * 
54   * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
55   *         mutated in incompatible ways between any two releases of the driver.
56   * @copyright 2011-2013, Allanbank Consulting, Inc., All Rights Reserved
57   */
58  public class Reply extends AbstractMessage {
59      /** Bit for the await capable flag. */
60      public static final int AWAIT_CAPABLE_BIT = 8;
61  
62      /** Bit for the cursor not found flag. */
63      public static final int CURSOR_NOT_FOUND_BIT = 1;
64  
65      /** Bit for the query failure flag. */
66      public static final int QUERY_FAILURE_BIT = 2;
67  
68      /** Bit for the shard configuration stale flag. */
69      public static final int SHARD_CONFIG_STALE_BIT = 4;
70  
71      /** Indicates the server is await capable for tailable cursors. */
72      private final boolean myAwaitCapable;
73  
74      /**
75       * The id of the cursor if the user needs to do a get_more to get the
76       * complete results.
77       */
78      private final long myCursorId;
79  
80      /** Indicates that the cursor in the <tt>getmore</tt> command was not found. */
81      private final boolean myCursorNotFound;
82  
83      /** The offset (index) of the first document returned from the cursor. */
84      private final int myCursorOffset;
85  
86      /** Indicates that the query failed. */
87      private final boolean myQueryFailed;
88  
89      /** The id of the request this response is for. */
90      private final int myResponseToId;
91  
92      /** The returned documents. */
93      private final List<Document> myResults;
94  
95      /** Indicates (to a MongoS?) that its shard configuration is stale. */
96      private final boolean myShardConfigStale;
97  
98      /**
99       * Creates a new Reply.
100      * 
101      * @param header
102      *            The header from the reply message.
103      * @param in
104      *            Stream to read the reply message from.
105      * @throws IOException
106      *             On a failure to read the reply.
107      */
108     public Reply(final Header header, final BsonInputStream in)
109             throws IOException {
110         init(".");
111 
112         myResponseToId = header.getResponseId();
113 
114         final int flags = in.readInt();
115         myCursorId = in.readLong();
116         myCursorOffset = in.readInt();
117 
118         final int docCount = in.readInt();
119         myResults = new ArrayList<Document>(docCount);
120         for (int i = 0; i < docCount; ++i) {
121             myResults.add(in.readDocument());
122         }
123 
124         myAwaitCapable = (flags & AWAIT_CAPABLE_BIT) == AWAIT_CAPABLE_BIT;
125         myCursorNotFound = (flags & CURSOR_NOT_FOUND_BIT) == CURSOR_NOT_FOUND_BIT;
126         myQueryFailed = (flags & QUERY_FAILURE_BIT) == QUERY_FAILURE_BIT;
127         myShardConfigStale = (flags & SHARD_CONFIG_STALE_BIT) == SHARD_CONFIG_STALE_BIT;
128     }
129 
130     /**
131      * Creates a new Reply.
132      * 
133      * @param responseToId
134      *            The id of the request this response is for.
135      * @param cursorId
136      *            The id of the cursor if the user needs to do a get_more to get
137      *            the complete results.
138      * @param cursorOffset
139      *            The offset (index) of the first document returned from the
140      *            cursor.
141      * @param results
142      *            The returned documents.
143      * @param awaitCapable
144      *            If true, indicates the server is await capable for tailable
145      *            cursors.
146      * @param cursorNotFound
147      *            If true, indicates that the cursor in the <tt>get_more</tt>
148      *            message was not found.
149      * @param queryFailed
150      *            If true, indicates that the query failed.
151      * @param shardConfigStale
152      *            If true, indicates (to a MongoS?) that its shard configuration
153      *            is stale.
154      * 
155      */
156     public Reply(final int responseToId, final long cursorId,
157             final int cursorOffset, final List<Document> results,
158             final boolean awaitCapable, final boolean cursorNotFound,
159             final boolean queryFailed, final boolean shardConfigStale) {
160         super("", "", ReadPreference.PRIMARY);
161 
162         myResponseToId = responseToId;
163         myCursorId = cursorId;
164         myCursorOffset = cursorOffset;
165         myResults = new ArrayList<Document>(results);
166         myAwaitCapable = awaitCapable;
167         myCursorNotFound = cursorNotFound;
168         myQueryFailed = queryFailed;
169         myShardConfigStale = shardConfigStale;
170     }
171 
172     /**
173      * Determines if the passed object is of this same type as this object and
174      * if so that its fields are equal.
175      * 
176      * @param object
177      *            The object to compare to.
178      * 
179      * @see java.lang.Object#equals(java.lang.Object)
180      */
181     @Override
182     public boolean equals(final Object object) {
183         boolean result = false;
184         if (this == object) {
185             result = true;
186         }
187         else if ((object != null) && (getClass() == object.getClass())) {
188             final Reply other = (Reply) object;
189 
190             // Base class fields are always the same ""."".
191             result = (myAwaitCapable == other.myAwaitCapable)
192                     && (myCursorNotFound == other.myCursorNotFound)
193                     && (myQueryFailed == other.myQueryFailed)
194                     && (myShardConfigStale == other.myShardConfigStale)
195                     && (myResponseToId == other.myResponseToId)
196                     && (myCursorOffset == other.myCursorOffset)
197                     && (myCursorId == other.myCursorId)
198                     && myResults.equals(other.myResults);
199         }
200         return result;
201     }
202 
203     /**
204      * Returns the id of the cursor if the user needs to do a get_more to get
205      * the complete results.
206      * 
207      * @return The id of the cursor if the user needs to do a get_more to get
208      *         the complete results.
209      */
210     public long getCursorId() {
211         return myCursorId;
212     }
213 
214     /**
215      * Returns the offset (index) of the first document returned from the
216      * cursor.
217      * 
218      * @return The offset (index) of the first document returned from the
219      *         cursor.
220      */
221     public int getCursorOffset() {
222         return myCursorOffset;
223     }
224 
225     /**
226      * {@inheritDoc}
227      * <p>
228      * Overridden to return the name of the operation: "REPLY".
229      * </p>
230      */
231     @Override
232     public String getOperationName() {
233         return Operation.REPLY.name();
234     }
235 
236     /**
237      * Returns the id of the request this response is for.
238      * 
239      * @return The id of the request this response is for.
240      */
241     public int getResponseToId() {
242         return myResponseToId;
243     }
244 
245     /**
246      * Returns the query results.
247      * 
248      * @return The query results.
249      */
250     public List<Document> getResults() {
251         return myResults;
252     }
253 
254     /**
255      * Computes a reasonable hash code.
256      * 
257      * @return The hash code value.
258      */
259     @Override
260     public int hashCode() {
261         int result = 1;
262         result = (31 * result) + super.hashCode();
263         result = (31 * result) + (myAwaitCapable ? 1 : 3);
264         result = (31 * result) + (myCursorNotFound ? 1 : 3);
265         result = (31 * result) + (myQueryFailed ? 1 : 3);
266         result = (31 * result) + (myShardConfigStale ? 1 : 3);
267         result = (31 * result) + myResponseToId;
268         result = (31 * result) + myCursorOffset;
269         result = (31 * result) + (int) (myCursorId >> Integer.SIZE);
270         result = (31 * result) + (int) myCursorId;
271         result = (31 * result) + myResults.hashCode();
272         return result;
273     }
274 
275     /**
276      * Returns true if the server is await capable for tailable cursors.
277      * 
278      * @return True if the server is await capable for tailable cursors.
279      */
280     public boolean isAwaitCapable() {
281         return myAwaitCapable;
282     }
283 
284     /**
285      * Returns true if the cursor in the <tt>get_more</tt> message was not
286      * found.
287      * 
288      * @return True if the cursor in the <tt>get_more</tt> message was not
289      *         found.
290      */
291     public boolean isCursorNotFound() {
292         return myCursorNotFound;
293     }
294 
295     /**
296      * Returns true if the query failed.
297      * 
298      * @return True if the query failed.
299      */
300     public boolean isQueryFailed() {
301         return myQueryFailed;
302     }
303 
304     /**
305      * Returns true if the shard configuration is stale.
306      * 
307      * @return True if the shard configuration is stale.
308      */
309     public boolean isShardConfigStale() {
310         return myShardConfigStale;
311     }
312 
313     /**
314      * {@inheritDoc}
315      * <p>
316      * Overridden to return the size of the {@link Query}.
317      * </p>
318      */
319     @Override
320     public int size() {
321 
322         int size = HEADER_SIZE + 20;
323         // size += 4; // flags;
324         // size += 8; // cursorId
325         // size += 4; // cursorOffset
326         // size += 4; // result count.
327         for (final Document result : myResults) {
328             size += result.size();
329         }
330 
331         return size;
332     }
333 
334     /**
335      * {@inheritDoc}
336      * <p>
337      * Overrridden to be a no-op since we normally only receive a reply and
338      * don't care about the size.
339      * </p>
340      */
341     @Override
342     public void validateSize(final int maxDocumentSize)
343             throws DocumentToLargeException {
344         // Can't be too large.
345     }
346 
347     /**
348      * {@inheritDoc}
349      * <p>
350      * Overridden to write the reply message.
351      * </p>
352      * 
353      * @see Message#write(int, BsonOutputStream)
354      */
355     @Override
356     public void write(final int messageId, final BsonOutputStream out)
357             throws IOException {
358         final int flags = computeFlags();
359 
360         int size = HEADER_SIZE;
361         size += 4; // flags;
362         size += 8; // cursorId
363         size += 4; // cursorOffset
364         size += 4; // result count.
365         for (final Document result : myResults) {
366             size += result.size();
367         }
368 
369         writeHeader(out, messageId, myResponseToId, Operation.REPLY, size);
370         out.writeInt(flags);
371         out.writeLong(myCursorId);
372         out.writeInt(myCursorOffset);
373         out.writeInt(myResults.size());
374         for (final Document result : myResults) {
375             out.writeDocument(result);
376         }
377     }
378 
379     /**
380      * {@inheritDoc}
381      * <p>
382      * Overridden to write the reply message.
383      * </p>
384      * 
385      * @see Message#write(int, BsonOutputStream)
386      */
387     @Override
388     public void write(final int messageId, final BufferingBsonOutputStream out)
389             throws IOException {
390         final int flags = computeFlags();
391 
392         final long start = writeHeader(out, messageId, myResponseToId,
393                 Operation.REPLY);
394         out.writeInt(flags);
395         out.writeLong(myCursorId);
396         out.writeInt(myCursorOffset);
397         out.writeInt(myResults.size());
398         for (final Document result : myResults) {
399             out.writeDocument(result);
400         }
401         finishHeader(out, start);
402 
403         out.flushBuffer();
404     }
405 
406     /**
407      * Computes the message flags bit field.
408      * 
409      * @return The message flags bit field.
410      */
411     private int computeFlags() {
412         int flags = 0;
413         if (myAwaitCapable) {
414             flags += AWAIT_CAPABLE_BIT;
415         }
416         if (myCursorNotFound) {
417             flags += CURSOR_NOT_FOUND_BIT;
418         }
419         if (myQueryFailed) {
420             flags += QUERY_FAILURE_BIT;
421         }
422         if (myShardConfigStale) {
423             flags += SHARD_CONFIG_STALE_BIT;
424         }
425         return flags;
426     }
427 
428 }