1 /*
2 * #%L
3 * TwoThreadSocketConnection.java - mongodb-async-driver - Allanbank Consulting, Inc.
4 * %%
5 * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6 * %%
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * #L%
19 */
20 package com.allanbank.mongodb.client.connection.socket;
21
22 import java.io.IOException;
23 import java.net.SocketException;
24
25 import com.allanbank.mongodb.MongoClientConfiguration;
26 import com.allanbank.mongodb.MongoDbException;
27 import com.allanbank.mongodb.bson.io.BufferingBsonOutputStream;
28 import com.allanbank.mongodb.bson.io.RandomAccessOutputStream;
29 import com.allanbank.mongodb.bson.io.StringDecoderCache;
30 import com.allanbank.mongodb.bson.io.StringEncoderCache;
31 import com.allanbank.mongodb.client.Message;
32 import com.allanbank.mongodb.client.callback.AddressAware;
33 import com.allanbank.mongodb.client.callback.ReplyCallback;
34 import com.allanbank.mongodb.client.message.BuildInfo;
35 import com.allanbank.mongodb.client.message.PendingMessage;
36 import com.allanbank.mongodb.client.message.PendingMessageQueue;
37 import com.allanbank.mongodb.client.state.Server;
38 import com.allanbank.mongodb.client.state.ServerUpdateCallback;
39 import com.allanbank.mongodb.util.IOUtils;
40
41 /**
42 * Provides a blocking Socket based connection to a MongoDB server.
43 * <p>
44 * This version uses a pair of threads (1 send and 1 receive) to handle the
45 * messages going to and from MongoDB.
46 * </p>
47 * <p>
48 * This implementation was the default for the driver through the 1.2.3 release.
49 * It is still used by the driver for communication sockets that are not know to
50 * be standard Java Sockets as it performs better when the communication path
51 * does not have built in buffering of messages.
52 * </p>
53 *
54 * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
55 * mutated in incompatible ways between any two releases of the driver.
56 * @copyright 2011-2014, Allanbank Consulting, Inc., All Rights Reserved
57 */
58 public class TwoThreadSocketConnection extends AbstractSocketConnection {
59
60 /** The writer for BSON documents. */
61 protected final BufferingBsonOutputStream myBsonOut;
62
63 /** The queue of messages to be sent. */
64 protected final PendingMessageQueue myToSendQueue;
65
66 /** The thread receiving replies. */
67 private final Thread myReceiver;
68
69 /** The thread sending messages. */
70 private final Thread mySender;
71
72 /**
73 * Creates a new SocketConnection to a MongoDB server.
74 *
75 * @param server
76 * The MongoDB server to connect to.
77 * @param config
78 * The configuration for the Connection to the MongoDB server.
79 * @throws SocketException
80 * On a failure connecting to the MongoDB server.
81 * @throws IOException
82 * On a failure to read or write data to the MongoDB server.
83 */
84 public TwoThreadSocketConnection(final Server server,
85 final MongoClientConfiguration config) throws SocketException,
86 IOException {
87 this(server, config, new StringEncoderCache(), new StringDecoderCache());
88 }
89
90 /**
91 * Creates a new SocketConnection to a MongoDB server.
92 *
93 * @param server
94 * The MongoDB server to connect to.
95 * @param config
96 * The configuration for the Connection to the MongoDB server.
97 * @param encoderCache
98 * Cache used for encoding strings.
99 * @param decoderCache
100 * Cache used for decoding strings.
101 * @throws SocketException
102 * On a failure connecting to the MongoDB server.
103 * @throws IOException
104 * On a failure to read or write data to the MongoDB server.
105 */
106 public TwoThreadSocketConnection(final Server server,
107 final MongoClientConfiguration config,
108 final StringEncoderCache encoderCache,
109 final StringDecoderCache decoderCache) throws SocketException,
110 IOException {
111 super(server, config, encoderCache, decoderCache);
112
113 myBsonOut = new BufferingBsonOutputStream(new RandomAccessOutputStream(
114 encoderCache));
115
116 myToSendQueue = new PendingMessageQueue(
117 config.getMaxPendingOperationsPerConnection(),
118 config.getLockType());
119
120 myReceiver = config.getThreadFactory().newThread(
121 new ReceiveRunnable(this));
122 myReceiver.setDaemon(true);
123 myReceiver.setName("MongoDB " + mySocket.getLocalPort() + "<--"
124 + myServer.getCanonicalName());
125
126 mySender = config.getThreadFactory().newThread(new SendRunnable());
127 mySender.setDaemon(true);
128 mySender.setName("MongoDB " + mySocket.getLocalPort() + "-->"
129 + myServer.getCanonicalName());
130 }
131
132 /**
133 * {@inheritDoc}
134 */
135 @Override
136 public void close() throws IOException {
137 final boolean wasOpen = myOpen.get();
138 myOpen.set(false);
139
140 mySender.interrupt();
141 myReceiver.interrupt();
142
143 try {
144 if (Thread.currentThread() != mySender) {
145 mySender.join();
146 }
147 }
148 catch (final InterruptedException ie) {
149 // Ignore.
150 }
151 finally {
152 // Now that output is shutdown. Close up the socket. This
153 // Triggers the receiver to close if the interrupt didn't work.
154 myOutput.close();
155 myInput.close();
156 mySocket.close();
157 }
158
159 try {
160 if (Thread.currentThread() != myReceiver) {
161 myReceiver.join();
162 }
163 }
164 catch (final InterruptedException ie) {
165 // Ignore.
166 }
167
168 myEventSupport.firePropertyChange(OPEN_PROP_NAME, wasOpen, false);
169 }
170
171 /**
172 * {@inheritDoc}
173 */
174 @Override
175 public int getPendingCount() {
176 return super.getPendingCount() + myToSendQueue.size();
177 }
178
179 /**
180 * {@inheritDoc}
181 * <p>
182 * True if the send and pending queues are empty.
183 * </p>
184 */
185 @Override
186 public boolean isIdle() {
187 return super.isIdle() && myToSendQueue.isEmpty();
188 }
189
190 /**
191 * {@inheritDoc}
192 * <p>
193 * Notifies the appropriate messages of the error.
194 * </p>
195 */
196 @Override
197 public void raiseErrors(final MongoDbException exception) {
198 final PendingMessage message = new PendingMessage();
199 while (myToSendQueue.poll(message)) {
200 raiseError(exception, message.getReplyCallback());
201 }
202
203 super.raiseErrors(exception);
204 }
205
206 /**
207 * {@inheritDoc}
208 */
209 @Override
210 public void send(final Message message1, final Message message2,
211 final ReplyCallback replyCallback) throws MongoDbException {
212
213 validate(message1, message2);
214
215 if (replyCallback instanceof AddressAware) {
216 ((AddressAware) replyCallback).setAddress(myServer
217 .getCanonicalName());
218 }
219
220 try {
221 myToSendQueue.put(message1, null, message2, replyCallback);
222 }
223 catch (final InterruptedException e) {
224 throw new MongoDbException(e);
225 }
226 }
227
228 /**
229 * {@inheritDoc}
230 */
231 @Override
232 public void send(final Message message, final ReplyCallback replyCallback)
233 throws MongoDbException {
234
235 validate(message, null);
236
237 if (replyCallback instanceof AddressAware) {
238 ((AddressAware) replyCallback).setAddress(myServer
239 .getCanonicalName());
240 }
241
242 try {
243 myToSendQueue.put(message, replyCallback);
244 }
245 catch (final InterruptedException e) {
246 throw new MongoDbException(e);
247 }
248 }
249
250 /**
251 * Starts the connections read and write threads.
252 */
253 @Override
254 public void start() {
255 myReceiver.start();
256 mySender.start();
257
258 if (myServer.needBuildInfo()) {
259 send(new BuildInfo(), new ServerUpdateCallback(myServer));
260 }
261 }
262
263 /**
264 * Runnable to push data out over the MongoDB connection.
265 *
266 * @copyright 2011, Allanbank Consulting, Inc., All Rights Reserved
267 */
268 protected class SendRunnable implements Runnable {
269
270 /** Tracks if there are messages in the buffer that need to be flushed. */
271 private boolean myNeedToFlush = false;
272
273 /** The {@link PendingMessage} used for the local cached copy. */
274 private final PendingMessage myPendingMessage = new PendingMessage();
275
276 /**
277 * {@inheritDoc}
278 * <p>
279 * Overridden to pull messages off the
280 * {@link TwoThreadSocketConnection#myToSendQueue} and push them into
281 * the socket connection. If <code>null</code> is ever received from a
282 * poll of the queue then the socket connection is flushed and blocking
283 * call is made to the queue.
284 * </p>
285 *
286 * @see Runnable#run()
287 */
288 @Override
289 public void run() {
290 boolean sawError = false;
291 try {
292 while (myOpen.get() && !sawError) {
293 try {
294 sendOne();
295 }
296 catch (final InterruptedException ie) {
297 // Handled by loop but if we have a message, need to
298 // tell him something bad happened (but we shouldn't).
299 raiseError(ie, myPendingMessage.getReplyCallback());
300 }
301 catch (final IOException ioe) {
302 myLog.warn(ioe, "I/O Error sending a message.");
303 raiseError(ioe, myPendingMessage.getReplyCallback());
304 sawError = true;
305 }
306 catch (final RuntimeException re) {
307 myLog.warn(re, "Runtime error sending a message.");
308 raiseError(re, myPendingMessage.getReplyCallback());
309 sawError = true;
310 }
311 catch (final Error error) {
312 myLog.error(error, "Error sending a message.");
313 raiseError(error, myPendingMessage.getReplyCallback());
314 sawError = true;
315 }
316 finally {
317 myPendingMessage.clear();
318 }
319 }
320 }
321 finally {
322 // This may/will fail because we are dying.
323 try {
324 if (myOpen.get()) {
325 doFlush();
326 }
327 }
328 catch (final IOException ioe) {
329 myLog.warn(ioe, "I/O Error on final flush of messages.");
330 }
331 finally {
332 // Make sure we get shutdown completely.
333 IOUtils.close(TwoThreadSocketConnection.this);
334 }
335 }
336 }
337
338 /**
339 * Flushes the messages in the buffer and clears the need-to-flush flag.
340 *
341 * @throws IOException
342 * On a failure flushing the messages.
343 */
344 protected final void doFlush() throws IOException {
345 if (myNeedToFlush) {
346 flush();
347 myNeedToFlush = false;
348 }
349 }
350
351 /**
352 * Sends a single message.
353 *
354 * @throws InterruptedException
355 * If the thread is interrupted waiting for a message to
356 * send.
357 * @throws IOException
358 * On a failure sending the message.
359 */
360 protected final void sendOne() throws InterruptedException, IOException {
361 boolean took = false;
362 if (myNeedToFlush) {
363 took = myToSendQueue.poll(myPendingMessage);
364 }
365 else {
366 myToSendQueue.take(myPendingMessage);
367 took = true;
368 }
369
370 if (took) {
371 myNeedToFlush = true;
372
373 myPendingMessage.getMessage().write(
374 myPendingMessage.getMessageId(), myBsonOut);
375
376 send(myPendingMessage, myBsonOut.getOutput());
377
378 // We have handed the message off. Not our problem any more.
379 // We could legitimately do this before the send but in the case
380 // of an I/O error the send's exception is more meaningful then
381 // the receivers generic "Didn't get a reply".
382 myPendingMessage.clear();
383 }
384 else {
385 doFlush();
386 }
387 }
388 }
389 }