View Javadoc
1   /*
2    * #%L
3    * BatchedInsertCountingCallback.java - mongodb-async-driver - Allanbank Consulting, Inc.
4    * %%
5    * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6    * %%
7    * Licensed under the Apache License, Version 2.0 (the "License");
8    * you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   * 
11   *      http://www.apache.org/licenses/LICENSE-2.0
12   * 
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   * #L%
19   */
20  
21  package com.allanbank.mongodb.client.callback;
22  
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.IdentityHashMap;
26  import java.util.List;
27  import java.util.Map;
28  
29  import com.allanbank.mongodb.Callback;
30  import com.allanbank.mongodb.bson.Document;
31  import com.allanbank.mongodb.bson.builder.BuilderFactory;
32  import com.allanbank.mongodb.builder.BatchedWrite;
33  import com.allanbank.mongodb.builder.write.WriteOperation;
34  import com.allanbank.mongodb.client.message.Reply;
35  import com.allanbank.mongodb.error.BatchedWriteException;
36  
37  /**
38   * BatchedInsertCountingCallback is designed to work with the
39   * {@link BatchedWriteCallback}. This callback can be used as the callback for a
40   * series of individual writes and will coalesce the results into a single
41   * result based on a an expected number of callbacks.
42   * <p>
43   * The class does not track the input {@code n} value and instead always returns
44   * an N value based on the expected count. That limits the utility of this class
45   * to inserts.
46   * </p>
47   * 
48   * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
49   *         mutated in incompatible ways between any two releases of the driver.
50   * @copyright 2014, Allanbank Consulting, Inc., All Rights Reserved
51   */
52  public class BatchedInsertCountingCallback implements Callback<Reply> {
53  
54      /** The count of the number of callbacks received so far. */
55      private int myCount = 0;
56  
57      /** The failed operations. */
58      private Map<WriteOperation, Throwable> myErrors;
59  
60      /** The expected number of callbacks. */
61      private final int myExpectedCount;
62  
63      /**
64       * The count of the number of failure ({@link #exception(Throwable)})
65       * callbacks received so far.
66       */
67      private int myFailureCount = 0;
68  
69      /** The callback to notify with the final results once we receive them all. */
70      private final Callback<Reply> myForwardCallback;
71  
72      /** The last batched write that failed. */
73      private BatchedWrite myLastWrite;
74  
75      /** The skipped operations. */
76      private List<WriteOperation> mySkipped;
77  
78      /**
79       * Creates a new CountingCallback.
80       * 
81       * @param forwardCallback
82       *            The callback to notify with the final results once we receive
83       *            them all.
84       * @param expectedCount
85       *            The expected number of callbacks.
86       */
87      public BatchedInsertCountingCallback(final Callback<Reply> forwardCallback,
88              final int expectedCount) {
89          myForwardCallback = forwardCallback;
90          myExpectedCount = expectedCount;
91          myCount = 0;
92      }
93  
94      /**
95       * {@inheritDoc}
96       * <p>
97       * Overridden to increment the count and when the max is reached forward the
98       * final results.
99       * </p>
100      */
101     @Override
102     public void callback(final Reply result) {
103         boolean publish;
104         synchronized (this) {
105             myCount += 1;
106             publish = (myCount == myExpectedCount);
107         }
108 
109         if (publish) {
110             publish();
111         }
112     }
113 
114     /**
115      * {@inheritDoc}
116      * <p>
117      * Overridden to record the exception details.
118      * </p>
119      */
120     @Override
121     public void exception(final Throwable thrown) {
122         boolean publish;
123         synchronized (this) {
124             myFailureCount += 1;
125             myCount += 1;
126             publish = (myCount == myExpectedCount);
127 
128             if (mySkipped == null) {
129                 mySkipped = new ArrayList<WriteOperation>();
130                 myErrors = new IdentityHashMap<WriteOperation, Throwable>();
131             }
132 
133             if (thrown instanceof BatchedWriteException) {
134                 final BatchedWriteException errors = (BatchedWriteException) thrown;
135 
136                 myLastWrite = errors.getWrite();
137                 mySkipped.addAll(errors.getSkipped());
138                 myErrors.putAll(errors.getErrors());
139             }
140         }
141 
142         if (publish) {
143             publish();
144         }
145     }
146 
147     /**
148      * Publishes the final results to {@link #myForwardCallback}.
149      */
150     private void publish() {
151         Reply reply = null;
152         BatchedWriteException error = null;
153         synchronized (this) {
154             if (myFailureCount == 0) {
155                 final Document doc = BuilderFactory.start().add("ok", 1)
156                         .add("n", myExpectedCount).build();
157                 reply = new Reply(0, 0, 0, Collections.singletonList(doc),
158                         false, false, false, false);
159             }
160             else {
161                 error = new BatchedWriteException(myLastWrite,
162                         (myExpectedCount - myFailureCount), mySkipped, myErrors);
163             }
164         }
165 
166         // Reply outside the lock.
167         if (reply != null) {
168             myForwardCallback.callback(reply);
169         }
170         else {
171             myForwardCallback.exception(error);
172         }
173     }
174 
175 }