View Javadoc
1   /*
2    * #%L
3    * BatchedNativeWriteCallback.java - mongodb-async-driver - Allanbank Consulting, Inc.
4    * %%
5    * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6    * %%
7    * Licensed under the Apache License, Version 2.0 (the "License");
8    * you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   * 
11   *      http://www.apache.org/licenses/LICENSE-2.0
12   * 
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   * #L%
19   */
20  
21  package com.allanbank.mongodb.client.callback;
22  
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.IdentityHashMap;
26  import java.util.LinkedList;
27  import java.util.List;
28  import java.util.Map;
29  
30  import com.allanbank.mongodb.Callback;
31  import com.allanbank.mongodb.Durability;
32  import com.allanbank.mongodb.builder.BatchedWrite;
33  import com.allanbank.mongodb.builder.BatchedWriteMode;
34  import com.allanbank.mongodb.builder.write.DeleteOperation;
35  import com.allanbank.mongodb.builder.write.InsertOperation;
36  import com.allanbank.mongodb.builder.write.UpdateOperation;
37  import com.allanbank.mongodb.builder.write.WriteOperation;
38  import com.allanbank.mongodb.client.AbstractMongoOperations;
39  import com.allanbank.mongodb.error.BatchedWriteException;
40  
41  /**
42   * BatchedWriteCallback provides the global callback for the batched writes when
43   * the server does not support the write commands. This callback will issue the
44   * writes using the original wire protocol.
45   * 
46   * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
47   *         mutated in incompatible ways between any two releases of the driver.
48   * @copyright 2014, Allanbank Consulting, Inc., All Rights Reserved
49   */
50  public class BatchedNativeWriteCallback extends ReplyLongCallback {
51  
52      /** The collection to send individual operations with. */
53      private final AbstractMongoOperations myCollection;
54  
55      /** The list of write operations which failed. */
56      private final Map<WriteOperation, Throwable> myFailedOperations;
57  
58      /** The count of finished bundles or operations. */
59      private int myFinished;
60  
61      /** The result. */
62      private long myN = 0;
63  
64      /** The list of write operations to send. */
65      private final List<WriteOperation> myOperations;
66  
67      /** The list of write operations waiting to be sent to the server. */
68      private final List<WriteOperation> myPendingOperations;
69  
70      /** The original write operation. */
71      private final BatchedWrite myWrite;
72  
73      /**
74       * Creates a new BatchedWriteCallback.
75       * 
76       * @param results
77       *            The callback for the final results.
78       * @param write
79       *            The original write.
80       * @param collection
81       *            The collection for sending the operations.
82       * @param operations
83       *            The operations to send.
84       */
85      public BatchedNativeWriteCallback(final Callback<Long> results,
86              final BatchedWrite write, final AbstractMongoOperations collection,
87              final List<WriteOperation> operations) {
88          super(results);
89  
90          myWrite = write;
91          myCollection = collection;
92          myOperations = Collections
93                  .unmodifiableList(new ArrayList<WriteOperation>(operations));
94  
95          myPendingOperations = new LinkedList<WriteOperation>(myOperations);
96  
97          myFinished = 0;
98          myN = 0;
99  
100         myFailedOperations = new IdentityHashMap<WriteOperation, Throwable>();
101     }
102 
103     /**
104      * Sends the next set of operations to the server.
105      */
106     public void send() {
107 
108         List<WriteOperation> toSendOperations;
109         Durability durability = null;
110 
111         synchronized (this) {
112             List<WriteOperation> toSend = myPendingOperations;
113             if (BatchedWriteMode.SERIALIZE_AND_STOP.equals(myWrite.getMode())) {
114                 toSend = myPendingOperations.subList(0, 1);
115             }
116 
117             durability = myWrite.getDurability();
118             if ((durability == null) || (durability == Durability.NONE)) {
119                 durability = Durability.ACK;
120             }
121 
122             // Clear toSend before sending so the callbacks see the right
123             // state for the operations.
124             toSendOperations = new ArrayList<WriteOperation>(toSend);
125             toSend.clear();
126         } // Release lock.
127 
128         // Release the lock before sending to avoid deadlock in processing
129         // replies.
130 
131         for (final WriteOperation operation : toSendOperations) {
132             switch (operation.getType()) {
133             case INSERT: {
134                 final InsertOperation insert = (InsertOperation) operation;
135                 myCollection.insertAsync(new NativeCallback<Integer>(insert),
136                         true, durability, insert.getDocument());
137                 break;
138             }
139             case UPDATE: {
140                 final UpdateOperation update = (UpdateOperation) operation;
141                 myCollection.updateAsync(new NativeCallback<Long>(operation),
142                         update.getQuery(), update.getUpdate(),
143                         update.isMultiUpdate(), update.isUpsert(), durability);
144                 break;
145             }
146             case DELETE: {
147                 final DeleteOperation delete = (DeleteOperation) operation;
148                 myCollection.deleteAsync(new NativeCallback<Long>(operation),
149                         delete.getQuery(), delete.isSingleDelete(), durability);
150                 break;
151             }
152             }
153         }
154     }
155 
156     /**
157      * Callback for a single write operation sent via the native messages.
158      * 
159      * @param operation
160      *            The write operation.
161      * @param result
162      *            The result of the write operation.
163      */
164     protected synchronized void callback(final WriteOperation operation,
165             final long result) {
166         myN += result;
167         myFinished += 1;
168 
169         if (!myPendingOperations.isEmpty()) {
170             send();
171         }
172         else if (myFinished == myOperations.size()) {
173             publishResults();
174         }
175     }
176 
177     /**
178      * Callback for a single write operation sent via the native messages has
179      * failed.
180      * 
181      * @param operation
182      *            The write operation.
183      * @param thrown
184      *            The error for the operation.
185      */
186     protected synchronized void exception(final WriteOperation operation,
187             final Throwable thrown) {
188         myFinished += 1;
189         myFailedOperations.put(operation, thrown);
190 
191         if (myWrite.getMode() == BatchedWriteMode.SERIALIZE_AND_STOP) {
192             publishResults();
193         }
194         // No need to check if we have to send. Would have already sent all of
195         // the operations if not SERIALIZE_AND_STOP.
196         else if (myFinished == myOperations.size()) {
197             publishResults();
198         }
199 
200     }
201 
202     /**
203      * Publishes the final results.
204      */
205     private void publishResults() {
206         if (myFailedOperations.isEmpty()) {
207             myForwardCallback.callback(Long.valueOf(myN));
208         }
209         else {
210             myForwardCallback.exception(new BatchedWriteException(myWrite, myN,
211                     myPendingOperations, myFailedOperations));
212         }
213     }
214 
215     /**
216      * NativeCallback provides the callback for a single operation within a
217      * batched write. This callback is used when the batched write commands are
218      * not supported and the driver falls back to using the native insert,
219      * update, and delete messages.
220      * 
221      * @param <T>
222      *            The type for the callback. Expected to be either Integer or
223      *            Long.
224      * 
225      * @api.no This class is <b>NOT</b> part of the drivers API. This class may
226      *         be mutated in incompatible ways between any two releases of the
227      *         driver.
228      * @copyright 2014, Allanbank Consulting, Inc., All Rights Reserved
229      */
230     /* package */class NativeCallback<T extends Number> implements Callback<T> {
231 
232         /** The operation this callback is waiting for the reply from. */
233         private final WriteOperation myOperation;
234 
235         /**
236          * Creates a new BatchedWriteNativeCallback.
237          * 
238          * @param operation
239          *            The operation this callback is waiting for the reply from.
240          */
241         public NativeCallback(final WriteOperation operation) {
242             myOperation = operation;
243         }
244 
245         /**
246          * {@inheritDoc}
247          * <p>
248          * Overridden to forward the results to the parent callback.
249          * </p>
250          */
251         @Override
252         public void callback(final T result) {
253             BatchedNativeWriteCallback.this.callback(myOperation,
254                     result.longValue());
255         }
256 
257         /**
258          * {@inheritDoc}
259          * <p>
260          * Overridden to forward the error to the parent callback.
261          * </p>
262          */
263         @Override
264         public void exception(final Throwable thrown) {
265             BatchedNativeWriteCallback.this.exception(myOperation, thrown);
266         }
267     }
268 }