1   /*
2    * #%L
3    * GridFs.java - mongodb-async-driver - Allanbank Consulting, Inc.
4    * %%
5    * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6    * %%
7    * Licensed under the Apache License, Version 2.0 (the "License");
8    * you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   * 
11   *      http://www.apache.org/licenses/LICENSE-2.0
12   * 
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   * #L%
19   */
20  
21  package com.allanbank.mongodb.gridfs;
22  
23  import static com.allanbank.mongodb.builder.QueryBuilder.where;
24  import static com.allanbank.mongodb.builder.Sort.asc;
25  
26  import java.io.FileNotFoundException;
27  import java.io.IOException;
28  import java.io.InputStream;
29  import java.io.InterruptedIOException;
30  import java.io.OutputStream;
31  import java.security.MessageDigest;
32  import java.security.NoSuchAlgorithmException;
33  import java.util.ArrayList;
34  import java.util.Arrays;
35  import java.util.HashMap;
36  import java.util.List;
37  import java.util.Map;
38  import java.util.concurrent.ExecutionException;
39  import java.util.concurrent.Future;
40  
41  import com.allanbank.mongodb.Durability;
42  import com.allanbank.mongodb.MongoCollection;
43  import com.allanbank.mongodb.MongoDatabase;
44  import com.allanbank.mongodb.MongoDbException;
45  import com.allanbank.mongodb.MongoDbUri;
46  import com.allanbank.mongodb.MongoFactory;
47  import com.allanbank.mongodb.MongoIterator;
48  import com.allanbank.mongodb.bson.Document;
49  import com.allanbank.mongodb.bson.Element;
50  import com.allanbank.mongodb.bson.NumericElement;
51  import com.allanbank.mongodb.bson.builder.BuilderFactory;
52  import com.allanbank.mongodb.bson.builder.DocumentBuilder;
53  import com.allanbank.mongodb.bson.element.BinaryElement;
54  import com.allanbank.mongodb.bson.element.ObjectId;
55  import com.allanbank.mongodb.bson.element.StringElement;
56  import com.allanbank.mongodb.builder.Find;
57  import com.allanbank.mongodb.builder.Index;
58  import com.allanbank.mongodb.util.IOUtils;
59  
60  /**
61   * GridFs provides an interface for working with a GridFS collection.
62   * <p>
63   * This implementation uses an {@link ObjectId} as the id when writing and stores
64   * the name of the file in the files collection document's "filename" field. To
65   * {@link #unlink(String)} or {@link #read(String, OutputStream)} a file from
66   * the collection, the _id field may contain any value but the filename field
67   * must be present.
68   * </p>
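 * <p>
 * For illustration, a minimal usage sketch (the connection URI and file names
 * below are assumptions, not part of the API):
 * </p>
 * <pre>{@code
 * GridFs gridFs = new GridFs("mongodb://localhost:27017/test");
 *
 * // Store a file under a name.
 * InputStream source = new FileInputStream("report.pdf");
 * try {
 *     ObjectId id = gridFs.write("report.pdf", source);
 * }
 * finally {
 *     source.close();
 * }
 *
 * // Read the file back by name and then remove it.
 * OutputStream sink = new FileOutputStream("report-copy.pdf");
 * try {
 *     gridFs.read("report.pdf", sink);
 * }
 * finally {
 *     sink.close();
 * }
 * gridFs.unlink("report.pdf");
 * }</pre>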
69   * 
70   * @api.yes This class is part of the driver's API. Public and protected members
71   *          will be deprecated for at least 1 non-bugfix release (version
72   *          numbers are &lt;major&gt;.&lt;minor&gt;.&lt;bugfix&gt;) before being
73   *          removed or modified.
74   * @copyright 2012-2013, Allanbank Consulting, Inc., All Rights Reserved
75   */
76  public class GridFs {
77  
78      /**
79       * The field in the {@link #CHUNKS_SUFFIX chunks} collection containing the
80       * chunk's number.
81       */
82      public static final String CHUNK_NUMBER_FIELD = "n";
83  
84      /** The amount of overhead in a chunk document in bytes: {@value} */
85      public static final int CHUNK_OVERHEAD = 62;
86  
87      /**
88       * The field in the {@link #FILES_SUFFIX files} collection containing the
89       * file's chunk size.
90       */
91      public static final String CHUNK_SIZE_FIELD = "chunkSize";
92  
93      /** The suffix for the chunks collection. */
94      public static final String CHUNKS_SUFFIX = ".chunks";
95  
96      /**
97       * The field in the {@link #CHUNKS_SUFFIX chunks} collection containing the
98       * chunk's data.
99       */
100     public static final String DATA_FIELD = "data";
101 
102     /**
103      * The default chunk size. This is slightly less than 256K to allow for the
104      * {@link #CHUNK_OVERHEAD} when using the power of two allocator.
105      */
106     public static final int DEFAULT_CHUNK_SIZE;
107 
108     /** The default root ('fs') for the GridFS collections. */
109     public static final String DEFAULT_ROOT = "fs";
110 
111     /**
112      * The field in the {@link #FILES_SUFFIX files} collection containing the
113      * file's name.
114      */
115     public static final String FILENAME_FIELD = "filename";
116 
117     /**
118      * The field in the {@link #CHUNKS_SUFFIX chunks} collection containing the
119      * chunk's related file id.
120      */
121     public static final String FILES_ID_FIELD = "files_id";
122 
123     /** The suffix for the files collection. */
124     public static final String FILES_SUFFIX = ".files";
125 
126     /** The {@code _id} field name. */
127     public static final String ID_FIELD = "_id";
128 
129     /**
130      * The field in the {@link #FILES_SUFFIX files} collection containing the
131      * file's length.
132      */
133     public static final String LENGTH_FIELD = "length";
134 
135     /**
136      * The field in the {@link #FILES_SUFFIX files} collection containing the
137      * file's MD5.
138      */
139     public static final String MD5_FIELD = "md5";
140 
141     /**
142      * The field in the {@link #FILES_SUFFIX files} collection containing the
143      * file's upload date.
144      */
145     public static final String UPLOAD_DATE_FIELD = "uploadDate";
146 
147     static {
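        // 256K (262,144 bytes) minus the 62-byte chunk document overhead, i.e.
        // 262,082 bytes, so a whole chunk document fits within a 256K
        // power of two allocation.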
148         DEFAULT_CHUNK_SIZE = (256 * 1024) - CHUNK_OVERHEAD;
149     }
150 
151     /** The GridFS chunks collection. */
152     private final MongoCollection myChunksCollection;
153 
154     /** The size for a chunk written. */
155     private int myChunkSize = DEFAULT_CHUNK_SIZE;
156 
157     /** The GridFS database. */
158     private final MongoDatabase myDatabase;
159 
160     /** The GridFS files collection. */
161     private final MongoCollection myFilesCollection;
162 
163     /** The root name for the GridFS collections. */
164     private final String myRootName;
165 
166     /**
167      * Creates a new GridFs.
168      * <p>
169      * The GridFS files will be stored in the 'fs.files' and 'fs.chunks' collections.
170      * </p>
171      * 
172      * @param database
173      *            The database containing the GridFS collections.
174      */
175     public GridFs(final MongoDatabase database) {
176         this(database, DEFAULT_ROOT);
177     }
178 
179     /**
180      * Creates a new GridFs.
181      * 
182      * 
183      * @param database
184      *            The database containing the GridFS collections.
185      * @param rootName
186      *            The rootName for the collections. The {@link #FILES_SUFFIX}
187      *            and {@link #CHUNKS_SUFFIX} will be appended to create the two
188      *            collection names.
189      */
190     public GridFs(final MongoDatabase database, final String rootName) {
191         myRootName = rootName;
192         myDatabase = database;
193         myFilesCollection = database.getCollection(rootName + FILES_SUFFIX);
194         myChunksCollection = database.getCollection(rootName + CHUNKS_SUFFIX);
195     }
196 
197     /**
198      * Creates a new GridFs.
199      * 
200      * @param mongoDbUri
201      *            The configuration for the connection to MongoDB expressed as a
202      *            MongoDB URL.
203      * @throws IllegalArgumentException
204      *             If the <tt>mongoDbUri</tt> is not a properly formatted MongoDB
205      *             style URL.
206      * 
207      * @see <a href="http://www.mongodb.org/display/DOCS/Connections"> MongoDB
208      *      Connections</a>
209      */
210     public GridFs(final String mongoDbUri) {
211         this(mongoDbUri, DEFAULT_ROOT);
212     }
213 
214     /**
215      * Creates a new GridFs.
216      * 
217      * @param mongoDbUri
218      *            The configuration for the connection to MongoDB expressed as a
219      *            MongoDB URL.
220      * @param rootName
221      *            The rootName for the collections. The {@link #FILES_SUFFIX}
222      *            and {@link #CHUNKS_SUFFIX} will be appended to create the two
223      *            collection names.
224      * @throws IllegalArgumentException
225      *             If the <tt>mongoDbUri</tt> is not a properly formatted MongoDB
226      *             style URL.
227      * 
228      * @see <a href="http://www.mongodb.org/display/DOCS/Connections"> MongoDB
229      *      Connections</a>
230      */
231     public GridFs(final String mongoDbUri, final String rootName) {
232         final MongoDbUri uri = new MongoDbUri(mongoDbUri);
233 
234         final MongoDatabase database = MongoFactory.createClient(uri)
235                 .getDatabase(uri.getDatabase());
236 
237         myRootName = rootName;
238         myDatabase = database;
239         myFilesCollection = database.getCollection(rootName + FILES_SUFFIX);
240         myChunksCollection = database.getCollection(rootName + CHUNKS_SUFFIX);
241     }
242 
243     /**
244      * Creates the following indexes:
245      * <ul>
246      * <li>
247      * Files Collection:
248      * <ul>
249      * <li><code>{ 'filename' : 1, 'uploadDate' : 1 }</code></li>
250      * </ul>
251      * </li>
252      * <li>
253      * Chunks Collection:
254      * <ul>
255      * <li><code>{ 'files_id' : 1, 'n' : 1 }</code></li>
256      * </ul>
257      * </li>
258      * </ul>
259      * In a non-sharded environment the indexes will be unique.
260      */
261     public void createIndexes() {
262         try {
263             myFilesCollection.createIndex(true, Index.asc(FILENAME_FIELD),
264                     Index.asc(UPLOAD_DATE_FIELD));
265         }
266         catch (final MongoDbException error) {
267             // Can't be unique in a sharded environment.
268             myFilesCollection.createIndex(false, Index.asc(FILENAME_FIELD),
269                     Index.asc(UPLOAD_DATE_FIELD));
270         }
271 
272         try {
273             myChunksCollection.createIndex(true, Index.asc(FILES_ID_FIELD),
274                     Index.asc(CHUNK_NUMBER_FIELD));
275         }
276         catch (final MongoDbException error) {
277             // Can't be unique in a sharded environment.
278             myChunksCollection.createIndex(false, Index.asc(FILES_ID_FIELD),
279                     Index.asc(CHUNK_NUMBER_FIELD));
280         }
281     }
282 
283     /**
284      * Validates and optionally tries to repair the GridFS collections.
285      * <ul>
286      * <li>
287      * Ensure the following indexes exist:
288      * <ul>
289      * <li>
290      * Files Collection:
291      * <ul>
292      * <li><code>{ 'filename' : 1, 'uploadDate' : 1 }</code></li>
293      * </ul>
294      * </li>
295      * <li>
296      * Chunks Collection:
297      * <ul>
298      * <li><code>{ 'files_id' : 1, 'n' : 1 }</code></li>
299      * </ul>
300      * </li>
301      * </ul>
302      * </li>
303      * <li>
304      * Ensure there are no duplicate {@code n} values for the chunks of a file.
305      * If {@code repair} is true then the {@code n} values will be updated to be
306      * sequential based on the ordering <tt>{ 'n' : 1, '_id' : 1 }</tt>.</li>
307      * <li>
308      * Validates the MD5 sum for each file via the <a
309      * href="http://docs.mongodb.org/manual/reference/command/filemd5"
310      * >filemd5</a> command.</li>
311      * </ul>
312      * <p>
313      * <b>Warning:</b> This function iterates over every file in the GridFS
314      * collection and can take a considerable amount of time and resources on
315      * the client and the server.
316      * </p>
317      * <p>
318      * <b>Note:</b> Due to a limitation in the MongoDB server this method will
319      * return false positives when used with a sharded cluster when the shard
320      * key for the chunks collection is not one of <code>{files_id:1}</code> or
321      * <code>{files_id:1, n:1}</code>. See <a
322      * href="https://jira.mongodb.org/browse/SERVER-9888">SERVER-9888</a>.
323      * </p>
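     * <p>
     * For example, a minimal check-and-repair sketch (assumes a {@code GridFs}
     * instance named {@code gridFs}; the variable name is illustrative):
     * </p>
     * <pre>{@code
     * Map<Object, List<String>> faults = gridFs.fsck(true);
     * for (Map.Entry<Object, List<String>> entry : faults.entrySet()) {
     *     System.err.println(entry.getKey() + ": " + entry.getValue());
     * }
     * }</pre>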
324      * 
325      * @param repair
326      *            If set to <code>true</code> then the fsck will attempt to
327      *            repair common errors.
328      * @return A map of file ids to the errors found for each file and the
329      *         repair status. If no errors are found, an empty map is returned.
330      * @throws IOException
331      *             On a failure to execute the fsck.
332      * 
333      * @see <a
334      *      href="https://jira.mongodb.org/browse/SERVER-9888">SERVER-9888</a>
335      */
336     public Map<Object, List<String>> fsck(final boolean repair)
337             throws IOException {
338 
339         final Map<Object, List<String>> faults = new HashMap<Object, List<String>>();
340 
341         createIndexes();
342 
343         // Use the filemd5 command to locate files to inspect more closely.
344         final MongoIterator<Document> iter = myFilesCollection.find(Find.ALL);
345         try {
346             for (final Document fileDoc : iter) {
347                 final Element id = fileDoc.get(ID_FIELD);
348 
349                 final DocumentBuilder commandDoc = BuilderFactory.start();
350                 commandDoc.add(id.withName("filemd5"));
351                 commandDoc.add("root", myRootName);
352 
353                 final Document commandResult = myDatabase.runCommand(commandDoc
354                         .build());
355                 if (!doVerifyFileMd5(faults, fileDoc, commandResult) && repair) {
356                     doTryAndRepair(fileDoc, faults);
357                 }
358             }
359         }
360         finally {
361             iter.close();
362         }
363         return faults;
364     }
365 
366     /**
367      * Returns the size for a chunk written.
368      * 
369      * @return The size for a chunk written.
370      */
371     public int getChunkSize() {
372         return myChunkSize;
373     }
374 
375     /**
376      * Reads a file from the GridFS collections and writes the contents to the
377      * {@code sink}.
378      * 
379      * @param id
380      *            The id of the file.
381      * @param sink
382      *            The stream to write the data to. This stream will not be
383      *            closed by this method.
384      * @throws IOException
385      *             On a failure reading the data from MongoDB or writing to the
386      *             {@code sink}.
     */
387     public void read(final ObjectId id, final OutputStream sink)
388     public void read(final ObjectId id, final OutputStream sink)
389             throws IOException {
390         // Find the document with the specified name.
391         final Document fileDoc = myFilesCollection.findOne(where(ID_FIELD)
392                 .equals(id));
393         if (fileDoc == null) {
394             throw new FileNotFoundException(id.toString());
395         }
396 
397         doRead(fileDoc, sink);
398     }
399 
400     /**
401      * Reads a file from the GridFS collections and writes the contents to the
402      * {@code sink}.
403      * 
404      * @param name
405      *            The name of the file.
406      * @param sink
407      *            The stream to write the data to. This stream will not be
408      *            closed by this method.
409      * @throws IOException
410      *             On a failure reading the data from MongoDB or writing to the
411      *             {@code sink}.
     */
412     public void read(final String name, final OutputStream sink)
413     public void read(final String name, final OutputStream sink)
414             throws IOException {
415 
416         // Find the document with the specified name.
417         final Document fileDoc = myFilesCollection
418                 .findOne(where(FILENAME_FIELD).equals(name));
419         if (fileDoc == null) {
420             throw new FileNotFoundException(name);
421         }
422 
423         doRead(fileDoc, sink);
424     }
425 
426     /**
427      * Sets the size, in bytes, for each chunk written.
428      * 
429      * @param chunkSize
430      *            The new size, in bytes, for each chunk written.
431      */
432     public void setChunkSize(final int chunkSize) {
433         myChunkSize = chunkSize;
434     }
435 
436     /**
437      * Unlinks (deletes) the file from the GridFS collections.
438      * 
439      * @param id
440      *            The id of the file to be deleted.
441      * @return True if a file was deleted, false otherwise.
442      * @throws IOException
443      *             On a failure to delete the file.
444      */
445     public boolean unlink(final ObjectId id) throws IOException {
446 
447         // Find the document with the specified name.
448         final Document fileDoc = myFilesCollection.findOne(where(ID_FIELD)
449                 .equals(id));
450         if (fileDoc == null) {
451             return false;
452         }
453 
454         return doUnlink(fileDoc);
455     }
456 
457     /**
458      * Unlinks (deletes) the file from the GridFS collections.
459      * 
460      * @param name
461      *            The name of the file to be deleted.
462      * @return True if a file was deleted, false otherwise.
463      * @throws IOException
464      *             On a failure to delete the file.
465      */
466     public boolean unlink(final String name) throws IOException {
467 
468         // Find the document with the specified name.
469         final Document fileDoc = myFilesCollection
470                 .findOne(where(FILENAME_FIELD).equals(name));
471         if (fileDoc == null) {
472             return false;
473         }
474 
475         return doUnlink(fileDoc);
476     }
477 
478     /**
479      * Validates the file from the GridFS collections using the {@code filemd5}
480      * command.
481      * <p>
482      * <b>Note:</b> Due to a limitation in the MongoDB server this method will
483      * always return <code>false</code> when used with a sharded cluster when
484      * the shard key for the chunks collection is not one of
485      * <code>{files_id:1}</code> or <code>{files_id:1, n:1}</code>. See <a
486      * href="https://jira.mongodb.org/browse/SERVER-9888">SERVER-9888</a>.
487      * </p>
488      * 
489      * @param id
490      *            The id of the file to be validated.
491      * @return True if a file was validated (md5 hash matches), false otherwise.
492      * @throws IOException
493      *             On a failure to validate the file.
494      * 
495      * @see <a
496      *      href="https://jira.mongodb.org/browse/SERVER-9888">SERVER-9888</a>
497      */
498     public boolean validate(final ObjectId id) throws IOException {
499 
500         // Find the document with the specified name.
501         final Document fileDoc = myFilesCollection.findOne(where(ID_FIELD)
502                 .equals(id));
503         if (fileDoc == null) {
504             throw new FileNotFoundException(id.toString());
505         }
506 
507         return doValidate(fileDoc);
508     }
509 
510     /**
511      * Validates the file from the GridFS collections using the {@code filemd5}
512      * command.
513      * <p>
514      * <b>Note:</b> Due to a limitation in the MongoDB server this method will
515      * always return <code>false</code> when used with a sharded cluster when
516      * the shard key for the chunks collection is not one of
517      * <code>{files_id:1}</code> or <code>{files_id:1, n:1}</code>. See <a
518      * href="https://jira.mongodb.org/browse/SERVER-9888">SERVER-9888</a>.
519      * </p>
520      * 
521      * @param name
522      *            The name of the file to be validated.
523      * @return True if a file was validated (md5 hash matches), false otherwise.
524      * @throws IOException
525      *             On a failure to validate the file.
526      * 
527      * @see <a
528      *      href="https://jira.mongodb.org/browse/SERVER-9888">SERVER-9888</a>
529      */
530     public boolean validate(final String name) throws IOException {
531 
532         // Find the document with the specified name.
533         final Document fileDoc = myFilesCollection
534                 .findOne(where(FILENAME_FIELD).equals(name));
535         if (fileDoc == null) {
536             throw new FileNotFoundException(name);
537         }
538 
539         return doValidate(fileDoc);
540     }
541 
542     /**
543      * Attempts to write a file into the GridFS collections using the specified
544      * name for the file and deriving the chunks from the data read from the
545      * <tt>source</tt>.
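     * <p>
     * A minimal sketch (assumes a {@code GridFs} instance named {@code gridFs};
     * the payload and file name are illustrative):
     * </p>
     * <pre>{@code
     * byte[] payload = "hello world".getBytes();
     * ObjectId id = gridFs.write("hello.txt", new ByteArrayInputStream(payload));
     * }</pre>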
546      * 
547      * @param name
548      *            The name of the file being written.
549      * @param source
550      *            The source of the bits in the file. This stream will not be
551      *            closed.
552      * @return The {@link ObjectId} associated with the file.
553      * @throws IOException
554      *             On a failure writing the documents or reading the file
555      *             contents. In the case of a failure an attempt is made to
556      *             remove the documents written to the collections.
557      */
558     public ObjectId write(final String name, final InputStream source)
559             throws IOException {
560         final ObjectId id = new ObjectId();
561         boolean failed = false;
562         try {
563             final byte[] buffer = new byte[myChunkSize];
564             final MessageDigest md5Digest = MessageDigest.getInstance("MD5");
565 
566             final List<Future<Integer>> results = new ArrayList<Future<Integer>>();
567             final DocumentBuilder doc = BuilderFactory.start();
568             int n = 0;
569             long length = 0;
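            // Read the source in chunk-sized blocks; each block becomes one chunk
            // document linked to the file by its id and ordered by the chunk number.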
570             int read = readFully(source, buffer);
571             while (read > 0) {
572 
573                 final ObjectId chunkId = new ObjectId();
574 
575                 doc.reset();
576                 doc.addObjectId(ID_FIELD, chunkId);
577                 doc.addObjectId(FILES_ID_FIELD, id);
578                 doc.addInteger(CHUNK_NUMBER_FIELD, n);
579 
580                 final byte[] data = (read == buffer.length) ? buffer : Arrays
581                         .copyOf(buffer, read);
582                 md5Digest.update(data);
583                 doc.addBinary(DATA_FIELD, data);
584 
585                 results.add(myChunksCollection.insertAsync(doc.build()));
586 
587                 length += data.length;
588                 read = readFully(source, buffer);
589                 n += 1;
590             }
591 
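            // All chunk inserts have been issued; now build the file document with
            // the accumulated length and the MD5 of the data.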
592             doc.reset();
593             doc.addObjectId(ID_FIELD, id);
594             doc.addString(FILENAME_FIELD, name);
595             doc.addTimestamp(UPLOAD_DATE_FIELD, System.currentTimeMillis());
596             doc.addInteger(CHUNK_SIZE_FIELD, buffer.length);
597             doc.addLong(LENGTH_FIELD, length);
598             doc.addString(MD5_FIELD, IOUtils.toHex(md5Digest.digest()));
599 
600             results.add(myFilesCollection.insertAsync(doc.build()));
601 
602             // Make sure everything made it to the server.
603             for (final Future<Integer> f : results) {
604                 f.get();
605             }
606         }
607         catch (final NoSuchAlgorithmException e) {
608             failed = true;
609             throw new IOException(e);
610         }
611         catch (final InterruptedException e) {
612             failed = true;
613             final InterruptedIOException error = new InterruptedIOException(
614                     e.getMessage());
615             error.initCause(e);
616             throw error;
617         }
618         catch (final ExecutionException e) {
619             failed = true;
620             throw new IOException(e.getCause());
621         }
622         finally {
623             if (failed) {
624                 myFilesCollection.delete(where(ID_FIELD).equals(id));
625                 myChunksCollection.delete(where(FILES_ID_FIELD).equals(id));
626             }
627         }
628 
629         return id;
630     }
631 
632     /**
633      * Adds a fault message to the faults map.
634      * 
635      * @param faults
636      *            The map of file ids to the error messages.
637      * @param idObj
638      *            The id for the file.
639      * @param message
640      *            The message to add.
641      */
642     protected void doAddFault(final Map<Object, List<String>> faults,
643             final Element idObj, final String message) {
644         List<String> docFaults = faults.get(idObj.getValueAsObject());
645         if (docFaults == null) {
646             docFaults = new ArrayList<String>();
647             faults.put(idObj.getValueAsObject(), docFaults);
648         }
649         docFaults.add(message);
650     }
651 
652     /**
653      * Reads a file from the GridFS collections and writes the contents to the
654      * {@code sink}.
655      * 
656      * @param fileDoc
657      *            The document for the file.
658      * @param sink
659      *            The stream to write the data to. This stream will not be
660      *            closed by this method.
661      * @throws IOException
662      *             On a failure reading the data from MongoDB or writing to the
663      *             {@code sink}.
664      */
665     protected void doRead(final Document fileDoc, final OutputStream sink)
666             throws IOException {
667 
668         final Element id = fileDoc.get(ID_FIELD);
669 
670         long length = -1;
671         final NumericElement lengthElement = fileDoc.get(NumericElement.class,
672                 LENGTH_FIELD);
673         if (lengthElement != null) {
674             length = lengthElement.getLongValue();
675         }
676 
677         long chunkSize = -1;
678         final NumericElement chunkSizeElement = fileDoc.get(
679                 NumericElement.class, CHUNK_SIZE_FIELD);
680         if (chunkSizeElement != null) {
681             chunkSize = chunkSizeElement.getLongValue();
682         }
683 
684         long numberChunks = -1;
685         if ((0 <= length) && (0 < chunkSize)) {
686             numberChunks = (long) Math.ceil((double) length
687                     / (double) chunkSize);
688         }
689 
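        // Stream the chunks for this file in chunk-number order, verifying that no
        // chunk is missing or out of order as they are written to the sink.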
690         final Element queryElement = id.withName(FILES_ID_FIELD);
691         final DocumentBuilder queryDoc = BuilderFactory.start();
692         queryDoc.add(queryElement);
693 
694         final Find.Builder findBuilder = new Find.Builder(queryDoc.build());
695         findBuilder.setSort(asc(CHUNK_NUMBER_FIELD));
696 
697         // Small batch size since the docs are big and we can do parallel I/O.
698         findBuilder.setBatchSize(2);
699 
700         long expectedChunk = 0;
701         long totalSize = 0;
702         final MongoIterator<Document> iter = myChunksCollection
703                 .find(findBuilder.build());
704         try {
705             for (final Document chunk : iter) {
706 
707                 final NumericElement n = chunk.get(NumericElement.class,
708                         CHUNK_NUMBER_FIELD);
709                 final BinaryElement bytes = chunk.get(BinaryElement.class,
710                         DATA_FIELD);
711 
712                 if (n == null) {
713                     throw new IOException("Missing chunk number '"
714                             + (expectedChunk + 1) + "' of '" + numberChunks
715                             + "'.");
716                 }
717                 else if (n.getLongValue() != expectedChunk) {
718                     throw new IOException("Skipped chunk '"
719                             + (expectedChunk + 1) + "', retrieved '"
720                             + n.getLongValue() + "' of '" + numberChunks + "'.");
721                 }
722                 else if (bytes == null) {
723                     throw new IOException("Missing bytes in chunk '"
724                             + (expectedChunk + 1) + "' of '" + numberChunks
725                             + "'.");
726                 }
727                 else {
728 
729                     final byte[] buffer = bytes.getValue();
730 
731                     sink.write(buffer);
732                     expectedChunk += 1;
733                     totalSize += buffer.length;
734                 }
735             }
736         }
737         finally {
738             iter.close();
739             sink.flush();
740         }
741 
742         if ((0 <= numberChunks) && (expectedChunk < numberChunks)) {
743             throw new IOException("Missing chunks after '" + expectedChunk
744                     + "' of '" + numberChunks + "'.");
745         }
746         if ((0 <= length) && (totalSize != length)) {
747             throw new IOException("File size mismatch. Expected '" + length
748                     + "' but only read '" + totalSize + "' bytes.");
749         }
750     }
751 
752     /**
753      * Tries to repair the file.
754      * <p>
755      * Currently the only strategy is to reorder the chunks into {@code _id} order. The
756      * operation verifies that the reordering fixes the file prior to modifying
757      * anything. It also verifies that the reordering worked after the chunks have
758      * been updated.
     * </p>
759      * 
760      * @param fileDoc
761      *            The document representing the file.
762      * @param faults
763      *            The map to update with the status of the repair.
764      */
765     protected void doTryAndRepair(final Document fileDoc,
766             final Map<Object, List<String>> faults) {
767         // First see if the MD5 for the file's chunks in _id order returns the
768         // right results.
769         final List<Element> chunkIds = new ArrayList<Element>();
770 
771         final Element id = fileDoc.get(ID_FIELD);
772         final Element md5 = fileDoc.get(MD5_FIELD);
773         final Element queryElement = id.withName(FILES_ID_FIELD);
774         final DocumentBuilder queryDoc = BuilderFactory.start().add(
775                 queryElement);
776 
777         final Find.Builder findBuilder = new Find.Builder(queryDoc.build());
778         findBuilder.setSort(asc(ID_FIELD));
779 
780         // Small batch size since the docs are big and we can do parallel I/O.
781         findBuilder.setBatchSize(2);
782 
783         MongoIterator<Document> iter = null;
784         try {
785             final MessageDigest md5Digest = MessageDigest.getInstance("MD5");
786             iter = myChunksCollection.find(findBuilder.build());
787             for (final Document chunkDoc : iter) {
788 
789                 chunkIds.add(chunkDoc.get(ID_FIELD));
790 
791                 final BinaryElement chunk = chunkDoc.get(BinaryElement.class,
792                         DATA_FIELD);
793                 if (chunk != null) {
794                     md5Digest.update(chunk.getValue());
795                 }
796             }
797 
798             final String digest = IOUtils.toHex(md5Digest.digest());
799             final StringElement computed = new StringElement(MD5_FIELD, digest);
800             if (computed.equals(md5)) {
801                 // Update the 'n' fields for each chunk to be in the right
802                 // order.
803                 int n = 0;
804                 for (final Element idElement : chunkIds) {
805                     final DocumentBuilder query = BuilderFactory.start();
806                     query.add(idElement);
807                     query.add(queryElement); // Direct to the right shard.
808 
809                     final DocumentBuilder update = BuilderFactory.start();
810                     update.push("$set").add(CHUNK_NUMBER_FIELD, n);
811 
812                     // Use a multi-update to ensure the write happens when a
813                     // file's chunks are spread across shards.
814                     myChunksCollection.update(query.build(), update.build(),
815                             true /* =multi */, false, Durability.ACK);
816 
817                     n += 1;
818                 }
819 
820                 if (doValidate(fileDoc)) {
821                     doAddFault(faults, id, "File repaired.");
822 
823                 }
824                 else {
825                     doAddFault(faults, id,
826                             "Repair failed: Chunks reordered but still not validating.");
827                 }
828             }
829             else {
830                 doAddFault(faults, id,
831                         "Repair failed: Could not determine correct chunk order.");
832             }
833         }
834         catch (final NoSuchAlgorithmException e) {
835             doAddFault(faults, id,
836                     "Repair failed: Could not compute the MD5 for the file: "
837                             + e.getMessage());
838         }
839         catch (final RuntimeException e) {
840             doAddFault(faults, id, "Potential Repair Failure: Runtime error: "
841                     + e.getMessage());
842         }
843         finally {
844             IOUtils.close(iter);
845         }
846     }
847 
848     /**
849      * Unlinks (deletes) the file from the GridFS collections.
850      * 
851      * @param fileDoc
852      *            The document for the file to delete.
853      * @return True if a file was deleted, false otherwise.
854      * @throws IOException
855      *             On a failure to delete the file.
856      */
857     protected boolean doUnlink(final Document fileDoc) throws IOException {
858         final Element id = fileDoc.get(ID_FIELD);
859 
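        // Delete the chunks (matched by files_id) and the file document concurrently.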
860         final DocumentBuilder queryDoc = BuilderFactory.start();
861         queryDoc.add(id.withName(FILES_ID_FIELD));
862         final Future<Long> cFuture = myChunksCollection.deleteAsync(queryDoc);
863 
864         queryDoc.reset();
865         queryDoc.add(id);
866         final Future<Long> fFuture = myFilesCollection.deleteAsync(queryDoc);
867 
868         try {
869             return (cFuture.get().longValue() >= 0)
870                     && (fFuture.get().longValue() > 0);
871         }
872         catch (final InterruptedException e) {
873             return false;
874         }
875         catch (final ExecutionException e) {
876             return false;
877         }
878     }
879 
880     /**
881      * Validates the file from the GridFS collections using the {@code filemd5}
882      * command.
883      * <p>
884      * <b>Note:</b> Due to a limitation in the MongoDB server this method will
885      * always return <code>false</code> when used with a sharded cluster when
886      * the shard key for the chunks collection is not one of
887      * <code>{files_id:1}</code> or <code>{files_id:1, n:1}</code>. See <a
888      * href="https://jira.mongodb.org/browse/SERVER-9888">SERVER-9888</a>.
889      * </p>
890      * 
891      * @param fileDoc
892      *            The document for the file to validate.
893      * @return True if the file validates (md5 hash matches), false otherwise.
894      * 
895      * @see <a
896      *      href="https://jira.mongodb.org/browse/SERVER-9888">SERVER-9888</a>
897      */
898     protected boolean doValidate(final Document fileDoc) {
899         final Element id = fileDoc.get(ID_FIELD);
900         final Element md5 = fileDoc.get(MD5_FIELD);
901 
902         final DocumentBuilder commandDoc = BuilderFactory.start();
903         commandDoc.add(id.withName("filemd5"));
904         commandDoc.add("root", myRootName);
905         final Document result = myDatabase.runCommand(commandDoc.build());
906 
907         return (md5 != null) && md5.equals(result.findFirst(MD5_FIELD));
908     }
909 
910     /**
911      * Verifies the MD5 result for the filemd5 command.
912      * 
913      * @param faults
914      *            The faults map to update if the verification fails.
915      * @param fileDoc
916      *            The document representing the file.
917      * @param cmdResult
918      *            The document returned from the 'filemd5' command.
919      * @return True if the file's MD5 matches the result of the filemd5 command, false otherwise.
920      */
921     protected boolean doVerifyFileMd5(final Map<Object, List<String>> faults,
922             final Document fileDoc, final Document cmdResult) {
923         boolean ok = false;
924         final Element idElement = fileDoc.get(ID_FIELD);
925 
926         final Element md5 = fileDoc.get(MD5_FIELD);
927         final Element commandMd5 = cmdResult.findFirst(MD5_FIELD);
928 
929         ok = (md5 != null) && md5.equals(commandMd5);
930         if (!ok) {
931             doAddFault(faults, idElement,
932                     "MD5 sums do not match. File document contains '" + md5
933                             + "' and the filemd5 command produced '"
934                             + commandMd5 + "'.");
935         }
936 
937         return ok;
938     }
939 
940     /**
941      * Reads from the stream into the buffer until the buffer is full or end-of-file is reached.
942      * 
943      * @param source
944      *            The source of bytes to read.
945      * @param buffer
946      *            The buffer to read into.
947      * @return The number of bytes read. If less than <tt>buffer.length</tt>
948      *         then the stream reached the end-of-file.
949      * @throws IOException
950      *             On a failure reading from the stream.
951      */
952     private int readFully(final InputStream source, final byte[] buffer)
953             throws IOException {
954 
955         int offset = 0;
956 
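        // Keep reading until the buffer is full; a short count means the stream
        // reached end-of-file.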
957         while (true) {
958             final int read = source
959                     .read(buffer, offset, buffer.length - offset);
960             if (read < 0) {
961                 return offset;
962             }
963 
964             offset += read;
965 
966             if (offset == buffer.length) {
967                 return offset;
968             }
969         }
970     }
971 }