View Javadoc
1   /*
2    * #%L
3    * SocketConnection.java - mongodb-async-driver - Allanbank Consulting, Inc.
4    * %%
5    * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
6    * %%
7    * Licensed under the Apache License, Version 2.0 (the "License");
8    * you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   * 
11   *      http://www.apache.org/licenses/LICENSE-2.0
12   * 
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   * #L%
19   */
20  package com.allanbank.mongodb.client.connection.socket;
21  
22  import java.io.IOException;
23  import java.lang.ref.Reference;
24  import java.lang.ref.SoftReference;
25  import java.net.Socket;
26  import java.net.SocketException;
27  
28  import com.allanbank.mongodb.MongoClientConfiguration;
29  import com.allanbank.mongodb.MongoDbException;
30  import com.allanbank.mongodb.bson.io.BufferingBsonOutputStream;
31  import com.allanbank.mongodb.bson.io.RandomAccessOutputStream;
32  import com.allanbank.mongodb.bson.io.StringDecoderCache;
33  import com.allanbank.mongodb.bson.io.StringEncoderCache;
34  import com.allanbank.mongodb.client.Message;
35  import com.allanbank.mongodb.client.callback.AddressAware;
36  import com.allanbank.mongodb.client.callback.ReplyCallback;
37  import com.allanbank.mongodb.client.message.BuildInfo;
38  import com.allanbank.mongodb.client.message.PendingMessage;
39  import com.allanbank.mongodb.client.state.Server;
40  import com.allanbank.mongodb.client.state.ServerUpdateCallback;
41  import com.allanbank.mongodb.util.IOUtils;
42  
43  /**
44   * Provides a blocking Socket based connection to a MongoDB server.
45   * <p>
46   * This version uses single receive thread. Sending of messages is done by the
47   * application thread sending the message.
48   * </p>
49   * <p>
50   * This implementation does not perform as well as the
51   * {@link TwoThreadSocketConnection} when the {@link Socket} implementation does
52   * not have built in buffering of messages or requires acknowledgment of
53   * messages before releasing a write request. For that reason this
54   * implementation is only used with the normal Java Socket implementations.
55   * </p>
56   * 
57   * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
58   *         mutated in incompatible ways between any two releases of the driver.
59   * @copyright 2011-2014, Allanbank Consulting, Inc., All Rights Reserved
60   */
61  public class SocketConnection extends AbstractSocketConnection {
62  
63      /**
64       * The buffers used each connection. Each buffer is shared by all
65       * connections but there can be up to 1 buffer per application thread.
66       */
67      private final ThreadLocal<Reference<BufferingBsonOutputStream>> myBuffers;
68  
69      /** The thread receiving replies. */
70      private final Thread myReceiver;
71  
72      /** The sequence for serializing sends. */
73      private final Sequence mySendSequence;
74  
75      /**
76       * Creates a new SocketConnection to a MongoDB server.
77       * 
78       * @param server
79       *            The MongoDB server to connect to.
80       * @param config
81       *            The configuration for the Connection to the MongoDB server.
82       * @throws SocketException
83       *             On a failure connecting to the MongoDB server.
84       * @throws IOException
85       *             On a failure to read or write data to the MongoDB server.
86       */
87      public SocketConnection(final Server server,
88              final MongoClientConfiguration config) throws SocketException,
89              IOException {
90          this(server, config, new StringEncoderCache(),
91                  new StringDecoderCache(),
92                  new ThreadLocal<Reference<BufferingBsonOutputStream>>());
93      }
94  
95      /**
96       * Creates a new SocketConnection to a MongoDB server.
97       * 
98       * @param server
99       *            The MongoDB server to connect to.
100      * @param config
101      *            The configuration for the Connection to the MongoDB server.
102      * @param encoderCache
103      *            Cache used for encoding strings.
104      * @param decoderCache
105      *            Cache used for decoding strings.
106      * @param buffers
107      *            The buffers used each connection. Each buffer is shared by all
108      *            connections but there can be up to 1 buffer per application
109      *            thread.
110      * @throws SocketException
111      *             On a failure connecting to the MongoDB server.
112      * @throws IOException
113      *             On a failure to read or write data to the MongoDB server.
114      */
115     public SocketConnection(final Server server,
116             final MongoClientConfiguration config,
117             final StringEncoderCache encoderCache,
118             final StringDecoderCache decoderCache,
119             final ThreadLocal<Reference<BufferingBsonOutputStream>> buffers)
120             throws SocketException, IOException {
121         super(server, config, encoderCache, decoderCache);
122 
123         myBuffers = buffers;
124 
125         mySendSequence = new Sequence(1L, config.getLockType());
126 
127         myReceiver = config.getThreadFactory().newThread(
128                 new ReceiveRunnable(this));
129         myReceiver.setDaemon(true);
130         myReceiver.setName("MongoDB " + mySocket.getLocalPort() + "<--"
131                 + myServer.getCanonicalName());
132     }
133 
134     /**
135      * {@inheritDoc}
136      */
137     @Override
138     public void close() throws IOException {
139         if (myOpen.compareAndSet(true, false)) {
140             myReceiver.interrupt();
141 
142             // Now that output is shutdown. Close up the socket. This
143             // Triggers the receiver to close if the interrupt didn't work.
144             myOutput.close();
145             myInput.close();
146             mySocket.close();
147 
148             try {
149                 if (Thread.currentThread() != myReceiver) {
150                     myReceiver.join();
151                 }
152             }
153             catch (final InterruptedException ie) {
154                 // Ignore.
155             }
156 
157             myEventSupport.firePropertyChange(OPEN_PROP_NAME, true, false);
158         }
159     }
160 
161     /**
162      * {@inheritDoc}
163      * <p>
164      * True if the send and pending queues are empty.
165      * </p>
166      */
167     @Override
168     public int getPendingCount() {
169         return super.getPendingCount() + mySendSequence.getWaitersCount();
170     }
171 
172     /**
173      * {@inheritDoc}
174      * <p>
175      * True if the send and pending queues are empty.
176      * </p>
177      */
178     @Override
179     public boolean isIdle() {
180         return super.isIdle() && mySendSequence.isIdle();
181     }
182 
183     /**
184      * {@inheritDoc}
185      */
186     @Override
187     public void send(final Message message1, final Message message2,
188             final ReplyCallback replyCallback) throws MongoDbException {
189 
190         validate(message1, message2);
191 
192         if (replyCallback instanceof AddressAware) {
193             ((AddressAware) replyCallback).setAddress(myServer
194                     .getCanonicalName());
195         }
196 
197         final int count = (message2 == null) ? 1 : 2;
198         final long seq = mySendSequence.reserve(count);
199         final long end = seq + count;
200 
201         boolean sawError = false;
202         final PendingMessage pendingMessage = new PendingMessage();
203         try {
204 
205             // Serialize the messages now so the critical section becomes close
206             // to a write(byte[]) (with a little accounting overhead).
207             final Reference<BufferingBsonOutputStream> outRef = myBuffers.get();
208             BufferingBsonOutputStream out = (outRef != null) ? outRef.get()
209                     : null;
210             if (out == null) {
211                 out = new BufferingBsonOutputStream(
212                         new RandomAccessOutputStream(myEncoderCache));
213                 myBuffers
214                         .set(new SoftReference<BufferingBsonOutputStream>(out));
215             }
216 
217             message1.write((int) (seq & 0xFFFFFF), out);
218             if (message2 != null) {
219                 message2.write((int) ((seq + 1) & 0xFFFFFF), out);
220             }
221 
222             // Now stand in line.
223             mySendSequence.waitFor(seq);
224 
225             if (count == 1) {
226                 pendingMessage.set((int) (seq & 0xFFFFFF), message1,
227                         replyCallback);
228                 send(pendingMessage, out.getOutput());
229             }
230             else {
231                 pendingMessage.set((int) ((seq + 1) & 0xFFFFFF), message2,
232                         replyCallback);
233                 send(pendingMessage, out.getOutput());
234             }
235 
236             // If no-one is waiting we need to flush the message.
237             if (mySendSequence.noWaiter(end)) {
238                 if (myReceiver != Thread.currentThread()) {
239                     flush();
240                 }
241                 else {
242                     markReaderNeedsToFlush();
243                 }
244             }
245         }
246         catch (final InterruptedException ie) {
247             // Handled by loop but if we have a message, need to
248             // tell him something bad happened (but we shouldn't).
249             raiseError(ie, pendingMessage.getReplyCallback());
250         }
251         catch (final IOException ioe) {
252             myLog.warn(ioe, "I/O Error sending a message.");
253             raiseError(ioe, pendingMessage.getReplyCallback());
254             sawError = true;
255         }
256         catch (final RuntimeException re) {
257             myLog.warn(re, "Runtime error sending a message.");
258             raiseError(re, pendingMessage.getReplyCallback());
259             sawError = true;
260         }
261         catch (final Error error) {
262             myLog.error(error, "Error sending a message.");
263             raiseError(error, pendingMessage.getReplyCallback());
264             sawError = true;
265         }
266         finally {
267             pendingMessage.clear();
268             mySendSequence.release(seq, end);
269 
270             if (sawError) {
271                 // This may/will fail because we are dying.
272                 try {
273                     if (myOpen.get()) {
274                         flush();
275                     }
276                 }
277                 catch (final IOException ioe) {
278                     myLog.warn(ioe, "I/O Error on final flush of messages.");
279                 }
280                 finally {
281                     // Make sure we get shutdown completely.
282                     IOUtils.close(SocketConnection.this);
283                 }
284             }
285         }
286     }
287 
288     /**
289      * {@inheritDoc}
290      */
291     @Override
292     public void send(final Message message, final ReplyCallback replyCallback)
293             throws MongoDbException {
294         send(message, null, replyCallback);
295     }
296 
297     /**
298      * Starts the connections read and write threads.
299      */
300     @Override
301     public void start() {
302         myReceiver.start();
303 
304         if (myServer.needBuildInfo()) {
305             send(new BuildInfo(), new ServerUpdateCallback(myServer));
306         }
307     }
308 }