Coverage Report - com.allanbank.mongodb.gridfs.GridFs

Class    Line Coverage      Branch Coverage    Complexity
GridFs   97% (244/249)      98% (79/80)        3.909
 
/*
 * #%L
 * GridFs.java - mongodb-async-driver - Allanbank Consulting, Inc.
 * %%
 * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

package com.allanbank.mongodb.gridfs;

import static com.allanbank.mongodb.builder.QueryBuilder.where;
import static com.allanbank.mongodb.builder.Sort.asc;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import com.allanbank.mongodb.Durability;
import com.allanbank.mongodb.MongoCollection;
import com.allanbank.mongodb.MongoDatabase;
import com.allanbank.mongodb.MongoDbException;
import com.allanbank.mongodb.MongoDbUri;
import com.allanbank.mongodb.MongoFactory;
import com.allanbank.mongodb.MongoIterator;
import com.allanbank.mongodb.bson.Document;
import com.allanbank.mongodb.bson.Element;
import com.allanbank.mongodb.bson.NumericElement;
import com.allanbank.mongodb.bson.builder.BuilderFactory;
import com.allanbank.mongodb.bson.builder.DocumentBuilder;
import com.allanbank.mongodb.bson.element.BinaryElement;
import com.allanbank.mongodb.bson.element.ObjectId;
import com.allanbank.mongodb.bson.element.StringElement;
import com.allanbank.mongodb.builder.Find;
import com.allanbank.mongodb.builder.Index;
import com.allanbank.mongodb.util.IOUtils;

/**
 * GridFs provides an interface for working with a GridFS collection.
 * <p>
 * This implementation uses an {@link ObjectId} as the id when writing and
 * stores the name of the file in the files collection document's "filename"
 * field. To {@link #unlink(String)} or {@link #read(String, OutputStream)} a
 * file from the collection the _id field may contain any value but the
 * filename field must be present.
 * </p>
 *
 * @api.yes This class is part of the driver's API. Public and protected
 *          members will be deprecated for at least 1 non-bugfix release
 *          (version numbers are &lt;major&gt;.&lt;minor&gt;.&lt;bugfix&gt;)
 *          before being removed or modified.
 * @copyright 2012-2013, Allanbank Consulting, Inc., All Rights Reserved
 */
public class GridFs {

    /**
     * The field in the {@link #CHUNKS_SUFFIX chunks} collection containing the
     * chunk's number.
     */
    public static final String CHUNK_NUMBER_FIELD = "n";

    /** The amount of overhead in a chunk document in bytes: {@value} */
    public static final int CHUNK_OVERHEAD = 62;

    /**
     * The field in the {@link #FILES_SUFFIX files} collection containing the
     * file's chunk size.
     */
    public static final String CHUNK_SIZE_FIELD = "chunkSize";

    /** The suffix for the chunks collection. */
    public static final String CHUNKS_SUFFIX = ".chunks";

    /**
     * The field in the {@link #CHUNKS_SUFFIX chunks} collection containing the
     * chunk's data.
     */
    public static final String DATA_FIELD = "data";

    /**
     * The default chunk size. This is slightly less than 256K to allow for the
     * {@link #CHUNK_OVERHEAD} when using the power of two allocator.
     */
    public static final int DEFAULT_CHUNK_SIZE;

    /** The default root name ("fs") for the GridFS collections. */
    public static final String DEFAULT_ROOT = "fs";

    /**
     * The field in the {@link #FILES_SUFFIX files} collection containing the
     * file's name.
     */
    public static final String FILENAME_FIELD = "filename";

    /**
     * The field in the {@link #CHUNKS_SUFFIX chunks} collection containing the
     * chunk's related file id.
     */
    public static final String FILES_ID_FIELD = "files_id";

    /** The suffix for the files collection. */
    public static final String FILES_SUFFIX = ".files";

    /** The {@code _id} field name. */
    public static final String ID_FIELD = "_id";

    /**
     * The field in the {@link #FILES_SUFFIX files} collection containing the
     * file's length.
     */
    public static final String LENGTH_FIELD = "length";

    /**
     * The field in the {@link #FILES_SUFFIX files} collection containing the
     * file's MD5.
     */
    public static final String MD5_FIELD = "md5";

    /**
     * The field in the {@link #FILES_SUFFIX files} collection containing the
     * file's upload date.
     */
    public static final String UPLOAD_DATE_FIELD = "uploadDate";

    static {
        DEFAULT_CHUNK_SIZE = (256 * 1024) - CHUNK_OVERHEAD;
    }
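
    // A quick sanity check on the arithmetic above: 256 * 1024 = 262144
    // bytes, so DEFAULT_CHUNK_SIZE = 262144 - 62 = 262082 bytes of data per
    // chunk. The 62 bytes of per-document overhead keep the whole chunk
    // document at or under the 256K power-of-two allocation size.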

    /** The GridFS chunks collection. */
    private final MongoCollection myChunksCollection;

    /** The size for a chunk written. */
    private int myChunkSize = DEFAULT_CHUNK_SIZE;

    /** The GridFS database. */
    private final MongoDatabase myDatabase;

    /** The GridFS files collection. */
    private final MongoCollection myFilesCollection;

    /** The root name for the GridFS collections. */
    private final String myRootName;

    /**
     * Creates a new GridFs.
     * <p>
     * The GridFS objects will be stored in the 'fs' collection.
     * </p>
     *
     * @param database
     *            The database containing the GridFS collections.
     */
    public GridFs(final MongoDatabase database) {
        this(database, DEFAULT_ROOT);
    }

    /**
     * Creates a new GridFs.
     *
     * @param database
     *            The database containing the GridFS collections.
     * @param rootName
     *            The rootName for the collections. The {@link #FILES_SUFFIX}
     *            and {@link #CHUNKS_SUFFIX} will be appended to create the two
     *            collection names.
     */
    public GridFs(final MongoDatabase database, final String rootName) {
        myRootName = rootName;
        myDatabase = database;
        myFilesCollection = database.getCollection(rootName + FILES_SUFFIX);
        myChunksCollection = database.getCollection(rootName + CHUNKS_SUFFIX);
    }

    /**
     * Creates a new GridFs.
     *
     * @param mongoDbUri
     *            The configuration for the connection to MongoDB expressed as
     *            a MongoDB URL.
     * @throws IllegalArgumentException
     *             If the <tt>mongoDbUri</tt> is not a properly formatted
     *             MongoDB style URL.
     *
     * @see <a href="http://www.mongodb.org/display/DOCS/Connections"> MongoDB
     *      Connections</a>
     */
    public GridFs(final String mongoDbUri) {
        this(mongoDbUri, DEFAULT_ROOT);
    }

    /**
     * Creates a new GridFs.
     *
     * @param mongoDbUri
     *            The configuration for the connection to MongoDB expressed as
     *            a MongoDB URL.
     * @param rootName
     *            The rootName for the collections. The {@link #FILES_SUFFIX}
     *            and {@link #CHUNKS_SUFFIX} will be appended to create the two
     *            collection names.
     * @throws IllegalArgumentException
     *             If the <tt>mongoDbUri</tt> is not a properly formatted
     *             MongoDB style URL.
     *
     * @see <a href="http://www.mongodb.org/display/DOCS/Connections"> MongoDB
     *      Connections</a>
     */
    public GridFs(final String mongoDbUri, final String rootName) {
        final MongoDbUri uri = new MongoDbUri(mongoDbUri);

        final MongoDatabase database = MongoFactory.createClient(uri)
                .getDatabase(uri.getDatabase());

        myRootName = rootName;
        myDatabase = database;
        myFilesCollection = database.getCollection(rootName + FILES_SUFFIX);
        myChunksCollection = database.getCollection(rootName + CHUNKS_SUFFIX);
    }
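
    // A minimal construction sketch (not from the driver's docs; the URI and
    // database name below are illustrative assumptions):
    //
    //   // Connect directly from a MongoDB URI; collections default to
    //   // fs.files and fs.chunks.
    //   GridFs fs = new GridFs("mongodb://localhost:27017/test");
    //
    //   // Or reuse an existing MongoDatabase with a custom root, giving
    //   // images.files and images.chunks.
    //   GridFs images = new GridFs(database, "images");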

    /**
     * Creates the following indexes:
     * <ul>
     * <li>
     * Files Collection:
     * <ul>
     * <li><code>{ 'filename' : 1, 'uploadDate' : 1 }</code></li>
     * </ul>
     * </li>
     * <li>
     * Chunks Collection:
     * <ul>
     * <li><code>{ 'files_id' : 1, 'n' : 1 }</code></li>
     * </ul>
     * </li>
     * </ul>
     * In a non-sharded environment the indexes will be unique.
     */
    public void createIndexes() {
        try {
            myFilesCollection.createIndex(true, Index.asc(FILENAME_FIELD),
                    Index.asc(UPLOAD_DATE_FIELD));
        }
        catch (final MongoDbException error) {
            // Can't be unique in a sharded environment.
            myFilesCollection.createIndex(false, Index.asc(FILENAME_FIELD),
                    Index.asc(UPLOAD_DATE_FIELD));
        }

        try {
            myChunksCollection.createIndex(true, Index.asc(FILES_ID_FIELD),
                    Index.asc(CHUNK_NUMBER_FIELD));
        }
        catch (final MongoDbException error) {
            // Can't be unique in a sharded environment.
            myChunksCollection.createIndex(false, Index.asc(FILES_ID_FIELD),
                    Index.asc(CHUNK_NUMBER_FIELD));
        }
    }

    /**
     * Validates and optionally tries to repair the GridFS collections.
     * <ul>
     * <li>
     * Ensures the following indexes exist:
     * <ul>
     * <li>
     * Files Collection:
     * <ul>
     * <li><code>{ 'filename' : 1, 'uploadDate' : 1 }</code></li>
     * </ul>
     * </li>
     * <li>
     * Chunks Collection:
     * <ul>
     * <li><code>{ 'files_id' : 1, 'n' : 1 }</code></li>
     * </ul>
     * </li>
     * </ul>
     * </li>
     * <li>
     * Ensures there are no duplicate {@code n} values for the chunks of a
     * file. If {@code repair} is true then the {@code n} values will be
     * updated to be sequential based on the ordering
     * <tt>{ 'n' : 1, '_id' : 1 }</tt>.</li>
     * <li>
     * Validates the MD5 sum for each file via the <a
     * href="http://docs.mongodb.org/manual/reference/command/filemd5"
     * >filemd5</a> command.</li>
     * </ul>
     * <p>
     * <b>Warning:</b> This method iterates over every file in the GridFS
     * collection and can take a considerable amount of time and resources on
     * the client and the server.
     * </p>
     * <p>
     * <b>Note:</b> Due to a limitation in the MongoDB server this method will
     * return false positives when used with a sharded cluster when the shard
     * key for the chunks collection is not one of <code>{files_id:1}</code> or
     * <code>{files_id:1, n:1}</code>. See <a
     * href="https://jira.mongodb.org/browse/SERVER-9888">SERVER-9888</a>.
     * </p>
     *
     * @param repair
     *            If set to <code>true</code> then the fsck will attempt to
     *            repair common errors.
     * @return A map of the file ids to the errors found for the file and the
     *         repair status. If no errors are found an empty map is returned.
     * @throws IOException
     *             On a failure to execute the fsck.
     *
     * @see <a
     *      href="https://jira.mongodb.org/browse/SERVER-9888">SERVER-9888</a>
     */
    public Map<Object, List<String>> fsck(final boolean repair)
            throws IOException {

        final Map<Object, List<String>> faults = new HashMap<Object, List<String>>();

        createIndexes();

        // Use the filemd5 command to locate files to inspect more closely.
        final MongoIterator<Document> iter = myFilesCollection.find(Find.ALL);
        try {
            for (final Document fileDoc : iter) {
                final Element id = fileDoc.get(ID_FIELD);

                final DocumentBuilder commandDoc = BuilderFactory.start();
                commandDoc.add(id.withName("filemd5"));
                commandDoc.add("root", myRootName);

                final Document commandResult = myDatabase.runCommand(commandDoc
                        .build());
                if (!doVerifyFileMd5(faults, fileDoc, commandResult) && repair) {
                    doTryAndRepair(fileDoc, faults);
                }
            }
        }
        finally {
            iter.close();
        }
        return faults;
    }
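
    // A hedged usage sketch for fsck (the variable name is illustrative):
    // run the check with repair enabled and report any per-file faults.
    //
    //   Map<Object, List<String>> faults = gridFs.fsck(true);
    //   for (Map.Entry<Object, List<String>> entry : faults.entrySet()) {
    //       System.err.println("File " + entry.getKey() + ": "
    //               + entry.getValue());
    //   }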

    /**
     * Returns the size for a chunk written.
     *
     * @return The size for a chunk written.
     */
    public int getChunkSize() {
        return myChunkSize;
    }

    /**
     * Reads a file from the GridFS collections and writes the contents to the
     * {@code sink}.
     *
     * @param id
     *            The id of the file.
     * @param sink
     *            The stream to write the data to. This stream will not be
     *            closed by this method.
     * @throws IOException
     *             On a failure reading the data from MongoDB or writing to the
     *             {@code sink}.
     */
    public void read(final ObjectId id, final OutputStream sink)
            throws IOException {
        // Find the document with the specified id.
        final Document fileDoc = myFilesCollection.findOne(where(ID_FIELD)
                .equals(id));
        if (fileDoc == null) {
            throw new FileNotFoundException(id.toString());
        }

        doRead(fileDoc, sink);
    }

    /**
     * Reads a file from the GridFS collections and writes the contents to the
     * {@code sink}.
     *
     * @param name
     *            The name of the file.
     * @param sink
     *            The stream to write the data to. This stream will not be
     *            closed by this method.
     * @throws IOException
     *             On a failure reading the data from MongoDB or writing to the
     *             {@code sink}.
     */
    public void read(final String name, final OutputStream sink)
            throws IOException {

        // Find the document with the specified name.
        final Document fileDoc = myFilesCollection
                .findOne(where(FILENAME_FIELD).equals(name));
        if (fileDoc == null) {
            throw new FileNotFoundException(name);
        }

        doRead(fileDoc, sink);
    }
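
    // A hedged read sketch (the file name and output path are illustrative
    // assumptions):
    //
    //   OutputStream out = new FileOutputStream("report-copy.pdf");
    //   try {
    //       // Throws FileNotFoundException if no such filename exists.
    //       gridFs.read("report.pdf", out);
    //   } finally {
    //       out.close(); // read(...) flushes but never closes the sink.
    //   }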

    /**
     * Sets the value of size for a chunk written.
     *
     * @param chunkSize
     *            The new value for the size for a chunk written.
     */
    public void setChunkSize(final int chunkSize) {
        myChunkSize = chunkSize;
    }

    /**
     * Unlinks (deletes) the file from the GridFS collections.
     *
     * @param id
     *            The id of the file to be deleted.
     * @return True if a file was deleted, false otherwise.
     * @throws IOException
     *             On a failure to delete the file.
     */
    public boolean unlink(final ObjectId id) throws IOException {

        // Find the document with the specified id.
        final Document fileDoc = myFilesCollection.findOne(where(ID_FIELD)
                .equals(id));
        if (fileDoc == null) {
            return false;
        }

        return doUnlink(fileDoc);
    }

    /**
     * Unlinks (deletes) the file from the GridFS collections.
     *
     * @param name
     *            The name of the file to be deleted.
     * @return True if a file was deleted, false otherwise.
     * @throws IOException
     *             On a failure to delete the file.
     */
    public boolean unlink(final String name) throws IOException {

        // Find the document with the specified name.
        final Document fileDoc = myFilesCollection
                .findOne(where(FILENAME_FIELD).equals(name));
        if (fileDoc == null) {
            return false;
        }

        return doUnlink(fileDoc);
    }

    /**
     * Validates the file from the GridFS collections using the {@code filemd5}
     * command.
     * <p>
     * <b>Note:</b> Due to a limitation in the MongoDB server this method will
     * always return <code>false</code> when used with a sharded cluster when
     * the shard key for the chunks collection is not one of
     * <code>{files_id:1}</code> or <code>{files_id:1, n:1}</code>. See <a
     * href="https://jira.mongodb.org/browse/SERVER-9888">SERVER-9888</a>.
     * </p>
     *
     * @param id
     *            The id of the file to be validated.
     * @return True if the file was validated (md5 hash matches), false
     *         otherwise.
     * @throws IOException
     *             On a failure to validate the file.
     *
     * @see <a
     *      href="https://jira.mongodb.org/browse/SERVER-9888">SERVER-9888</a>
     */
    public boolean validate(final ObjectId id) throws IOException {

        // Find the document with the specified id.
        final Document fileDoc = myFilesCollection.findOne(where(ID_FIELD)
                .equals(id));
        if (fileDoc == null) {
            throw new FileNotFoundException(id.toString());
        }

        return doValidate(fileDoc);
    }

    /**
     * Validates the file from the GridFS collections using the {@code filemd5}
     * command.
     * <p>
     * <b>Note:</b> Due to a limitation in the MongoDB server this method will
     * always return <code>false</code> when used with a sharded cluster when
     * the shard key for the chunks collection is not one of
     * <code>{files_id:1}</code> or <code>{files_id:1, n:1}</code>. See <a
     * href="https://jira.mongodb.org/browse/SERVER-9888">SERVER-9888</a>.
     * </p>
     *
     * @param name
     *            The name of the file to be validated.
     * @return True if the file was validated (md5 hash matches), false
     *         otherwise.
     * @throws IOException
     *             On a failure to validate the file.
     *
     * @see <a
     *      href="https://jira.mongodb.org/browse/SERVER-9888">SERVER-9888</a>
     */
    public boolean validate(final String name) throws IOException {

        // Find the document with the specified name.
        final Document fileDoc = myFilesCollection
                .findOne(where(FILENAME_FIELD).equals(name));
        if (fileDoc == null) {
            throw new FileNotFoundException(name);
        }

        return doValidate(fileDoc);
    }
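
    // A hedged validate-then-unlink sketch (the file name is an illustrative
    // assumption):
    //
    //   if (!gridFs.validate("report.pdf")) {
    //       // MD5 mismatch: the stored chunks do not match the files
    //       // document, so remove the corrupt file.
    //       gridFs.unlink("report.pdf");
    //   }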

    /**
     * Attempts to write a file into the GridFS collections using the specified
     * name for the file and deriving the chunks from the data read from the
     * <tt>source</tt>.
     *
     * @param name
     *            The name of the file being written.
     * @param source
     *            The source of the bits in the file. This stream will not be
     *            closed.
     * @return The {@link ObjectId} associated with the file.
     * @throws IOException
     *             On a failure writing the documents or reading the file
     *             contents. In the case of a failure an attempt is made to
     *             remove the documents written to the collections.
     */
    public ObjectId write(final String name, final InputStream source)
            throws IOException {
        final ObjectId id = new ObjectId();
        boolean failed = false;
        try {
            final byte[] buffer = new byte[myChunkSize];
            final MessageDigest md5Digest = MessageDigest.getInstance("MD5");

            final List<Future<Integer>> results = new ArrayList<Future<Integer>>();
            final DocumentBuilder doc = BuilderFactory.start();
            int n = 0;
            long length = 0;
            int read = readFully(source, buffer);
            while (read > 0) {

                final ObjectId chunkId = new ObjectId();

                doc.reset();
                doc.addObjectId(ID_FIELD, chunkId);
                doc.addObjectId(FILES_ID_FIELD, id);
                doc.addInteger(CHUNK_NUMBER_FIELD, n);

                final byte[] data = (read == buffer.length) ? buffer : Arrays
                        .copyOf(buffer, read);
                md5Digest.update(data);
                doc.addBinary(DATA_FIELD, data);

                results.add(myChunksCollection.insertAsync(doc.build()));

                length += data.length;
                read = readFully(source, buffer);
                n += 1;
            }

            doc.reset();
            doc.addObjectId(ID_FIELD, id);
            doc.addString(FILENAME_FIELD, name);
            doc.addTimestamp(UPLOAD_DATE_FIELD, System.currentTimeMillis());
            doc.addInteger(CHUNK_SIZE_FIELD, buffer.length);
            doc.addLong(LENGTH_FIELD, length);
            doc.addString(MD5_FIELD, IOUtils.toHex(md5Digest.digest()));

            results.add(myFilesCollection.insertAsync(doc.build()));

            // Make sure everything made it to the server.
            for (final Future<Integer> f : results) {
                f.get();
            }
        }
        catch (final NoSuchAlgorithmException e) {
            failed = true;
            throw new IOException(e);
        }
        catch (final InterruptedException e) {
            failed = true;
            final InterruptedIOException error = new InterruptedIOException(
                    e.getMessage());
            error.initCause(e);
            throw error;
        }
        catch (final ExecutionException e) {
            failed = true;
            throw new IOException(e.getCause());
        }
        finally {
            if (failed) {
                myFilesCollection.delete(where(ID_FIELD).equals(id));
                myChunksCollection.delete(where(FILES_ID_FIELD).equals(id));
            }
        }

        return id;
    }
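
    // A hedged write sketch (the input path is an illustrative assumption):
    //
    //   InputStream in = new FileInputStream("report.pdf");
    //   try {
    //       ObjectId id = gridFs.write("report.pdf", in);
    //       // Keep 'id' to read the file back by _id later.
    //   } finally {
    //       in.close(); // write(...) never closes the source stream.
    //   }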

    /**
     * Adds a fault message to the faults map.
     *
     * @param faults
     *            The map of file ids to the error messages.
     * @param idObj
     *            The id for the file.
     * @param message
     *            The message to add.
     */
    protected void doAddFault(final Map<Object, List<String>> faults,
            final Element idObj, final String message) {
        List<String> docFaults = faults.get(idObj.getValueAsObject());
        if (docFaults == null) {
            docFaults = new ArrayList<String>();
            faults.put(idObj.getValueAsObject(), docFaults);
        }
        docFaults.add(message);
    }

    /**
     * Reads a file from the GridFS collections and writes the contents to the
     * {@code sink}.
     *
     * @param fileDoc
     *            The document for the file.
     * @param sink
     *            The stream to write the data to. This stream will not be
     *            closed by this method.
     * @throws IOException
     *             On a failure reading the data from MongoDB or writing to the
     *             {@code sink}.
     */
    protected void doRead(final Document fileDoc, final OutputStream sink)
            throws IOException {

        final Element id = fileDoc.get(ID_FIELD);

        long length = -1;
        final NumericElement lengthElement = fileDoc.get(NumericElement.class,
                LENGTH_FIELD);
        if (lengthElement != null) {
            length = lengthElement.getLongValue();
        }

        long chunkSize = -1;
        final NumericElement chunkSizeElement = fileDoc.get(
                NumericElement.class, CHUNK_SIZE_FIELD);
        if (chunkSizeElement != null) {
            chunkSize = chunkSizeElement.getLongValue();
        }

        long numberChunks = -1;
        if ((0 <= length) && (0 < chunkSize)) {
            numberChunks = (long) Math.ceil((double) length
                    / (double) chunkSize);
        }

        final Element queryElement = id.withName(FILES_ID_FIELD);
        final DocumentBuilder queryDoc = BuilderFactory.start();
        queryDoc.add(queryElement);

        final Find.Builder findBuilder = new Find.Builder(queryDoc.build());
        findBuilder.setSort(asc(CHUNK_NUMBER_FIELD));

        // Small batch size since the docs are big and we can do parallel I/O.
        findBuilder.setBatchSize(2);

        long expectedChunk = 0;
        long totalSize = 0;
        final MongoIterator<Document> iter = myChunksCollection
                .find(findBuilder.build());
        try {
            for (final Document chunk : iter) {

                final NumericElement n = chunk.get(NumericElement.class,
                        CHUNK_NUMBER_FIELD);
                final BinaryElement bytes = chunk.get(BinaryElement.class,
                        DATA_FIELD);

                if (n == null) {
                    throw new IOException("Missing chunk number '"
                            + (expectedChunk + 1) + "' of '" + numberChunks
                            + "'.");
                }
                else if (n.getLongValue() != expectedChunk) {
                    throw new IOException("Skipped chunk '"
                            + (expectedChunk + 1) + "', retrieved '"
                            + n.getLongValue() + "' of '" + numberChunks + "'.");
                }
                else if (bytes == null) {
                    throw new IOException("Missing bytes in chunk '"
                            + (expectedChunk + 1) + "' of '" + numberChunks
                            + "'.");
                }
                else {

                    final byte[] buffer = bytes.getValue();

                    sink.write(buffer);
                    expectedChunk += 1;
                    totalSize += buffer.length;
                }
            }
        }
        finally {
            iter.close();
            sink.flush();
        }

        if ((0 <= numberChunks) && (expectedChunk < numberChunks)) {
            throw new IOException("Missing chunks after '" + expectedChunk
                    + "' of '" + numberChunks + "'.");
        }
        if ((0 <= length) && (totalSize != length)) {
            throw new IOException("File size mismatch. Expected '" + length
                    + "' but only read '" + totalSize + "' bytes.");
        }
    }
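
    // A worked example of the chunk-count arithmetic above: with the default
    // chunk size of 262082 bytes, a 1 MiB (1048576 byte) file yields
    // numberChunks = ceil(1048576 / 262082) = ceil(4.0009...) = 5, i.e. four
    // full chunks plus one short final chunk of 248 bytes.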

    /**
     * Tries to repair the file.
     * <p>
     * Currently the only strategy is to reorder the chunks into _id order. The
     * operation verifies that the reorder fixes the file prior to modifying
     * anything. It also verifies that the reordering worked after reordering
     * the chunks.
     * </p>
     *
     * @param fileDoc
     *            The document representing the file.
     * @param faults
     *            The map to update with the status of the repair.
     */
    protected void doTryAndRepair(final Document fileDoc,
            final Map<Object, List<String>> faults) {
        // First see if the MD5 for the file's chunks in _id order returns the
        // right results.
        final List<Element> chunkIds = new ArrayList<Element>();

        final Element id = fileDoc.get(ID_FIELD);
        final Element md5 = fileDoc.get(MD5_FIELD);
        final Element queryElement = id.withName(FILES_ID_FIELD);
        final DocumentBuilder queryDoc = BuilderFactory.start().add(
                queryElement);

        final Find.Builder findBuilder = new Find.Builder(queryDoc.build());
        findBuilder.setSort(asc(ID_FIELD));

        // Small batch size since the docs are big and we can do parallel I/O.
        findBuilder.setBatchSize(2);

        MongoIterator<Document> iter = null;
        try {
            final MessageDigest md5Digest = MessageDigest.getInstance("MD5");
            iter = myChunksCollection.find(findBuilder);
            for (final Document chunkDoc : iter) {

                chunkIds.add(chunkDoc.get(ID_FIELD));

                final BinaryElement chunk = chunkDoc.get(BinaryElement.class,
                        DATA_FIELD);
                if (chunk != null) {
                    md5Digest.update(chunk.getValue());
                }
            }

            final String digest = IOUtils.toHex(md5Digest.digest());
            final StringElement computed = new StringElement(MD5_FIELD, digest);
            if (computed.equals(md5)) {
                // Update the 'n' fields for each chunk to be in the right
                // order.
                int n = 0;
                for (final Element idElement : chunkIds) {
                    final DocumentBuilder query = BuilderFactory.start();
                    query.add(idElement);
                    query.add(queryElement); // Direct to the right shard.

                    final DocumentBuilder update = BuilderFactory.start();
                    update.push("$set").add(CHUNK_NUMBER_FIELD, n);

                    // Use a multi-update to ensure the write happens when a
                    // file's chunks are across shards.
                    myChunksCollection.update(query.build(), update.build(),
                            true /* =multi */, false, Durability.ACK);

                    n += 1;
                }

                if (doValidate(fileDoc)) {
                    doAddFault(faults, id, "File repaired.");
                }
                else {
                    doAddFault(faults, id,
                            "Repair failed: Chunks reordered but still not validating.");
                }
            }
            else {
                doAddFault(faults, id,
                        "Repair failed: Could not determine correct chunk order.");
            }
        }
        catch (final NoSuchAlgorithmException e) {
            doAddFault(faults, id,
                    "Repair failed: Could not compute the MD5 for the file: "
                            + e.getMessage());
        }
        catch (final RuntimeException e) {
            doAddFault(faults, id, "Potential Repair Failure: Runtime error: "
                    + e.getMessage());
        }
        finally {
            IOUtils.close(iter);
        }
    }

    /**
     * Unlinks (deletes) the file from the GridFS collections.
     *
     * @param fileDoc
     *            The document for the file to delete.
     * @return True if a file was deleted, false otherwise.
     * @throws IOException
     *             On a failure to delete the file.
     */
    protected boolean doUnlink(final Document fileDoc) throws IOException {
        final Element id = fileDoc.get(ID_FIELD);

        final DocumentBuilder queryDoc = BuilderFactory.start();
        queryDoc.add(id.withName(FILES_ID_FIELD));
        final Future<Long> cFuture = myChunksCollection.deleteAsync(queryDoc);

        queryDoc.reset();
        queryDoc.add(id);
        final Future<Long> fFuture = myFilesCollection.deleteAsync(queryDoc);

        try {
            return (cFuture.get().longValue() >= 0)
                    && (fFuture.get().longValue() > 0);
        }
        catch (final InterruptedException e) {
            return false;
        }
        catch (final ExecutionException e) {
            return false;
        }
    }

    /**
     * Validates the file from the GridFS collections using the {@code filemd5}
     * command.
     * <p>
     * <b>Note:</b> Due to a limitation in the MongoDB server this method will
     * always return <code>false</code> when used with a sharded cluster when
     * the shard key for the chunks collection is not one of
     * <code>{files_id:1}</code> or <code>{files_id:1, n:1}</code>. See <a
     * href="https://jira.mongodb.org/browse/SERVER-9888">SERVER-9888</a>.
     * </p>
     *
     * @param fileDoc
     *            The document for the file to validate.
     * @return True if the file validated (md5 hash matches), false otherwise.
     *
     * @see <a
     *      href="https://jira.mongodb.org/browse/SERVER-9888">SERVER-9888</a>
     */
    protected boolean doValidate(final Document fileDoc) {
        final Element id = fileDoc.get(ID_FIELD);
        final Element md5 = fileDoc.get(MD5_FIELD);

        final DocumentBuilder commandDoc = BuilderFactory.start();
        commandDoc.add(id.withName("filemd5"));
        commandDoc.add("root", myRootName);
        final Document result = myDatabase.runCommand(commandDoc.build());

        return (md5 != null) && md5.equals(result.findFirst(MD5_FIELD));
    }
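
    // For reference, the command document built above has the shape (the
    // ObjectId value is illustrative):
    //
    //   { "filemd5" : ObjectId("..."), "root" : "fs" }
    //
    // The server's reply carries the recomputed hash in its own "md5" field,
    // which doValidate compares against the files collection document.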

    /**
     * Verifies the MD5 result for the filemd5 command.
     *
     * @param faults
     *            The faults map to update if the verification fails.
     * @param fileDoc
     *            The document representing the file.
     * @param cmdResult
     *            The document returned from the 'filemd5' command.
     * @return True if the file's MD5 verified successfully.
     */
    protected boolean doVerifyFileMd5(final Map<Object, List<String>> faults,
            final Document fileDoc, final Document cmdResult) {
        boolean ok = false;
        final Element idElement = fileDoc.get(ID_FIELD);

        final Element md5 = fileDoc.get(MD5_FIELD);
        final Element commandMd5 = cmdResult.findFirst(MD5_FIELD);

        ok = (md5 != null) && md5.equals(commandMd5);
        if (!ok) {
            doAddFault(faults, idElement,
                    "MD5 sums do not match. File document contains '" + md5
                            + "' and the filemd5 command produced '"
                            + commandMd5 + "'.");
        }

        return ok;
    }

    /**
     * Reads the full contents of the stream until an EOF into the buffer.
     *
     * @param source
     *            The source of bytes to read.
     * @param buffer
     *            The buffer to read into.
     * @return The number of bytes read. If less than <tt>buffer.length</tt>
     *         then the stream reached end-of-file.
     * @throws IOException
     *             On a failure reading from the stream.
     */
    private int readFully(final InputStream source, final byte[] buffer)
            throws IOException {

        int offset = 0;

        while (true) {
            final int read = source
                    .read(buffer, offset, buffer.length - offset);
            if (read < 0) {
                return offset;
            }

            offset += read;

            if (offset == buffer.length) {
                return offset;
            }
        }
    }
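
    // Behavior sketch for readFully (sizes are illustrative): with a 256-byte
    // buffer and a 600-byte stream, successive calls return 256, 256, 88, and
    // then 0. A return value below buffer.length therefore signals EOF, which
    // is what terminates the 'while (read > 0)' loop in write(...).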
}