1 /*
2 * #%L
3 * BatchedNativeWriteCallback.java - mongodb-async-driver - Allanbank Consulting, Inc.
4 * %%
5 * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6 * %%
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * #L%
19 */
20
21 package com.allanbank.mongodb.client.callback;
22
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.IdentityHashMap;
26 import java.util.LinkedList;
27 import java.util.List;
28 import java.util.Map;
29
30 import com.allanbank.mongodb.Callback;
31 import com.allanbank.mongodb.Durability;
32 import com.allanbank.mongodb.builder.BatchedWrite;
33 import com.allanbank.mongodb.builder.BatchedWriteMode;
34 import com.allanbank.mongodb.builder.write.DeleteOperation;
35 import com.allanbank.mongodb.builder.write.InsertOperation;
36 import com.allanbank.mongodb.builder.write.UpdateOperation;
37 import com.allanbank.mongodb.builder.write.WriteOperation;
38 import com.allanbank.mongodb.client.AbstractMongoOperations;
39 import com.allanbank.mongodb.error.BatchedWriteException;
40
41 /**
42 * BatchedWriteCallback provides the global callback for the batched writes when
43 * the server does not support the write commands. This callback will issue the
44 * writes using the original wire protocol.
45 *
46 * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
47 * mutated in incompatible ways between any two releases of the driver.
48 * @copyright 2014, Allanbank Consulting, Inc., All Rights Reserved
49 */
50 public class BatchedNativeWriteCallback extends ReplyLongCallback {
51
52 /** The collection to send individual operations with. */
53 private final AbstractMongoOperations myCollection;
54
55 /** The list of write operations which failed. */
56 private final Map<WriteOperation, Throwable> myFailedOperations;
57
58 /** The count of finished bundles or operations. */
59 private int myFinished;
60
61 /** The result. */
62 private long myN = 0;
63
64 /** The list of write operations to send. */
65 private final List<WriteOperation> myOperations;
66
67 /** The list of write operations waiting to be sent to the server. */
68 private final List<WriteOperation> myPendingOperations;
69
70 /** The original write operation. */
71 private final BatchedWrite myWrite;
72
73 /**
74 * Creates a new BatchedWriteCallback.
75 *
76 * @param results
77 * The callback for the final results.
78 * @param write
79 * The original write.
80 * @param collection
81 * The collection for sending the operations.
82 * @param operations
83 * The operations to send.
84 */
85 public BatchedNativeWriteCallback(final Callback<Long> results,
86 final BatchedWrite write, final AbstractMongoOperations collection,
87 final List<WriteOperation> operations) {
88 super(results);
89
90 myWrite = write;
91 myCollection = collection;
92 myOperations = Collections
93 .unmodifiableList(new ArrayList<WriteOperation>(operations));
94
95 myPendingOperations = new LinkedList<WriteOperation>(myOperations);
96
97 myFinished = 0;
98 myN = 0;
99
100 myFailedOperations = new IdentityHashMap<WriteOperation, Throwable>();
101 }
102
103 /**
104 * Sends the next set of operations to the server.
105 */
106 public void send() {
107
108 List<WriteOperation> toSendOperations;
109 Durability durability = null;
110
111 synchronized (this) {
112 List<WriteOperation> toSend = myPendingOperations;
113 if (BatchedWriteMode.SERIALIZE_AND_STOP.equals(myWrite.getMode())) {
114 toSend = myPendingOperations.subList(0, 1);
115 }
116
117 durability = myWrite.getDurability();
118 if ((durability == null) || (durability == Durability.NONE)) {
119 durability = Durability.ACK;
120 }
121
122 // Clear toSend before sending so the callbacks see the right
123 // state for the operations.
124 toSendOperations = new ArrayList<WriteOperation>(toSend);
125 toSend.clear();
126 } // Release lock.
127
128 // Release the lock before sending to avoid deadlock in processing
129 // replies.
130
131 for (final WriteOperation operation : toSendOperations) {
132 switch (operation.getType()) {
133 case INSERT: {
134 final InsertOperation insert = (InsertOperation) operation;
135 myCollection.insertAsync(new NativeCallback<Integer>(insert),
136 true, durability, insert.getDocument());
137 break;
138 }
139 case UPDATE: {
140 final UpdateOperation update = (UpdateOperation) operation;
141 myCollection.updateAsync(new NativeCallback<Long>(operation),
142 update.getQuery(), update.getUpdate(),
143 update.isMultiUpdate(), update.isUpsert(), durability);
144 break;
145 }
146 case DELETE: {
147 final DeleteOperation delete = (DeleteOperation) operation;
148 myCollection.deleteAsync(new NativeCallback<Long>(operation),
149 delete.getQuery(), delete.isSingleDelete(), durability);
150 break;
151 }
152 }
153 }
154 }
155
156 /**
157 * Callback for a single write operation sent via the native messages.
158 *
159 * @param operation
160 * The write operation.
161 * @param result
162 * The result of the write operation.
163 */
164 protected synchronized void callback(final WriteOperation operation,
165 final long result) {
166 myN += result;
167 myFinished += 1;
168
169 if (!myPendingOperations.isEmpty()) {
170 send();
171 }
172 else if (myFinished == myOperations.size()) {
173 publishResults();
174 }
175 }
176
177 /**
178 * Callback for a single write operation sent via the native messages has
179 * failed.
180 *
181 * @param operation
182 * The write operation.
183 * @param thrown
184 * The error for the operation.
185 */
186 protected synchronized void exception(final WriteOperation operation,
187 final Throwable thrown) {
188 myFinished += 1;
189 myFailedOperations.put(operation, thrown);
190
191 if (myWrite.getMode() == BatchedWriteMode.SERIALIZE_AND_STOP) {
192 publishResults();
193 }
194 // No need to check if we have to send. Would have already sent all of
195 // the operations if not SERIALIZE_AND_STOP.
196 else if (myFinished == myOperations.size()) {
197 publishResults();
198 }
199
200 }
201
202 /**
203 * Publishes the final results.
204 */
205 private void publishResults() {
206 if (myFailedOperations.isEmpty()) {
207 myForwardCallback.callback(Long.valueOf(myN));
208 }
209 else {
210 myForwardCallback.exception(new BatchedWriteException(myWrite, myN,
211 myPendingOperations, myFailedOperations));
212 }
213 }
214
215 /**
216 * NativeCallback provides the callback for a single operation within a
217 * batched write. This callback is used when the batched write commands are
218 * not supported and the driver falls back to using the native insert,
219 * update, and delete messages.
220 *
221 * @param <T>
222 * The type for the callback. Expected to be either Integer or
223 * Long.
224 *
225 * @api.no This class is <b>NOT</b> part of the drivers API. This class may
226 * be mutated in incompatible ways between any two releases of the
227 * driver.
228 * @copyright 2014, Allanbank Consulting, Inc., All Rights Reserved
229 */
230 /* package */class NativeCallback<T extends Number> implements Callback<T> {
231
232 /** The operation this callback is waiting for the reply from. */
233 private final WriteOperation myOperation;
234
235 /**
236 * Creates a new BatchedWriteNativeCallback.
237 *
238 * @param operation
239 * The operation this callback is waiting for the reply from.
240 */
241 public NativeCallback(final WriteOperation operation) {
242 myOperation = operation;
243 }
244
245 /**
246 * {@inheritDoc}
247 * <p>
248 * Overridden to forward the results to the parent callback.
249 * </p>
250 */
251 @Override
252 public void callback(final T result) {
253 BatchedNativeWriteCallback.this.callback(myOperation,
254 result.longValue());
255 }
256
257 /**
258 * {@inheritDoc}
259 * <p>
260 * Overridden to forward the error to the parent callback.
261 * </p>
262 */
263 @Override
264 public void exception(final Throwable thrown) {
265 BatchedNativeWriteCallback.this.exception(myOperation, thrown);
266 }
267 }
268 }