Coverage Report - com.allanbank.mongodb.client.connection.socket.SocketConnection
 
Classes in this File Line Coverage Branch Coverage Complexity
SocketConnection
92%
72/78
90%
27/30
3.5
 
 1  
 /*
 2  
  * #%L
 3  
  * SocketConnection.java - mongodb-async-driver - Allanbank Consulting, Inc.
 4  
  * %%
 5  
  * Copyright (C) 2011 - 2014 Allanbank Consulting, Inc.
 6  
  * %%
 7  
  * Licensed under the Apache License, Version 2.0 (the "License");
 8  
  * you may not use this file except in compliance with the License.
 9  
  * You may obtain a copy of the License at
 10  
  * 
 11  
  *      http://www.apache.org/licenses/LICENSE-2.0
 12  
  * 
 13  
  * Unless required by applicable law or agreed to in writing, software
 14  
  * distributed under the License is distributed on an "AS IS" BASIS,
 15  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 16  
  * See the License for the specific language governing permissions and
 17  
  * limitations under the License.
 18  
  * #L%
 19  
  */
 20  
 package com.allanbank.mongodb.client.connection.socket;
 21  
 
 22  
 import java.io.IOException;
 23  
 import java.lang.ref.Reference;
 24  
 import java.lang.ref.SoftReference;
 25  
 import java.net.Socket;
 26  
 import java.net.SocketException;
 27  
 
 28  
 import com.allanbank.mongodb.MongoClientConfiguration;
 29  
 import com.allanbank.mongodb.MongoDbException;
 30  
 import com.allanbank.mongodb.bson.io.BufferingBsonOutputStream;
 31  
 import com.allanbank.mongodb.bson.io.RandomAccessOutputStream;
 32  
 import com.allanbank.mongodb.bson.io.StringDecoderCache;
 33  
 import com.allanbank.mongodb.bson.io.StringEncoderCache;
 34  
 import com.allanbank.mongodb.client.Message;
 35  
 import com.allanbank.mongodb.client.callback.AddressAware;
 36  
 import com.allanbank.mongodb.client.callback.ReplyCallback;
 37  
 import com.allanbank.mongodb.client.message.BuildInfo;
 38  
 import com.allanbank.mongodb.client.message.PendingMessage;
 39  
 import com.allanbank.mongodb.client.state.Server;
 40  
 import com.allanbank.mongodb.client.state.ServerUpdateCallback;
 41  
 import com.allanbank.mongodb.util.IOUtils;
 42  
 
 43  
 /**
 44  
  * Provides a blocking Socket based connection to a MongoDB server.
 45  
  * <p>
 46  
  * This version uses single receive thread. Sending of messages is done by the
 47  
  * application thread sending the message.
 48  
  * </p>
 49  
  * <p>
 50  
  * This implementation does not perform as well as the
 51  
  * {@link TwoThreadSocketConnection} when the {@link Socket} implementation does
 52  
  * not have built in buffering of messages or requires acknowledgment of
 53  
  * messages before releasing a write request. For that reason this
 54  
  * implementation is only used with the normal Java Socket implementations.
 55  
  * </p>
 56  
  * 
 57  
  * @api.no This class is <b>NOT</b> part of the drivers API. This class may be
 58  
  *         mutated in incompatible ways between any two releases of the driver.
 59  
  * @copyright 2011-2014, Allanbank Consulting, Inc., All Rights Reserved
 60  
  */
 61  
 public class SocketConnection extends AbstractSocketConnection {
 62  
 
 63  
     /**
 64  
      * The buffers used each connection. Each buffer is shared by all
 65  
      * connections but there can be up to 1 buffer per application thread.
 66  
      */
 67  
     private final ThreadLocal<Reference<BufferingBsonOutputStream>> myBuffers;
 68  
 
 69  
     /** The thread receiving replies. */
 70  
     private final Thread myReceiver;
 71  
 
 72  
     /** The sequence for serializing sends. */
 73  
     private final Sequence mySendSequence;
 74  
 
 75  
     /**
 76  
      * Creates a new SocketConnection to a MongoDB server.
 77  
      * 
 78  
      * @param server
 79  
      *            The MongoDB server to connect to.
 80  
      * @param config
 81  
      *            The configuration for the Connection to the MongoDB server.
 82  
      * @throws SocketException
 83  
      *             On a failure connecting to the MongoDB server.
 84  
      * @throws IOException
 85  
      *             On a failure to read or write data to the MongoDB server.
 86  
      */
 87  
     public SocketConnection(final Server server,
 88  
             final MongoClientConfiguration config) throws SocketException,
 89  
             IOException {
 90  58
         this(server, config, new StringEncoderCache(),
 91  
                 new StringDecoderCache(),
 92  
                 new ThreadLocal<Reference<BufferingBsonOutputStream>>());
 93  52
     }
 94  
 
 95  
     /**
 96  
      * Creates a new SocketConnection to a MongoDB server.
 97  
      * 
 98  
      * @param server
 99  
      *            The MongoDB server to connect to.
 100  
      * @param config
 101  
      *            The configuration for the Connection to the MongoDB server.
 102  
      * @param encoderCache
 103  
      *            Cache used for encoding strings.
 104  
      * @param decoderCache
 105  
      *            Cache used for decoding strings.
 106  
      * @param buffers
 107  
      *            The buffers used each connection. Each buffer is shared by all
 108  
      *            connections but there can be up to 1 buffer per application
 109  
      *            thread.
 110  
      * @throws SocketException
 111  
      *             On a failure connecting to the MongoDB server.
 112  
      * @throws IOException
 113  
      *             On a failure to read or write data to the MongoDB server.
 114  
      */
 115  
     public SocketConnection(final Server server,
 116  
             final MongoClientConfiguration config,
 117  
             final StringEncoderCache encoderCache,
 118  
             final StringDecoderCache decoderCache,
 119  
             final ThreadLocal<Reference<BufferingBsonOutputStream>> buffers)
 120  
             throws SocketException, IOException {
 121  164
         super(server, config, encoderCache, decoderCache);
 122  
 
 123  136
         myBuffers = buffers;
 124  
 
 125  136
         mySendSequence = new Sequence(1L, config.getLockType());
 126  
 
 127  136
         myReceiver = config.getThreadFactory().newThread(
 128  
                 new ReceiveRunnable(this));
 129  136
         myReceiver.setDaemon(true);
 130  136
         myReceiver.setName("MongoDB " + mySocket.getLocalPort() + "<--"
 131  
                 + myServer.getCanonicalName());
 132  136
     }
 133  
 
 134  
     /**
 135  
      * {@inheritDoc}
 136  
      */
 137  
     @Override
 138  
     public void close() throws IOException {
 139  407
         if (myOpen.compareAndSet(true, false)) {
 140  136
             myReceiver.interrupt();
 141  
 
 142  
             // Now that output is shutdown. Close up the socket. This
 143  
             // Triggers the receiver to close if the interrupt didn't work.
 144  136
             myOutput.close();
 145  136
             myInput.close();
 146  136
             mySocket.close();
 147  
 
 148  
             try {
 149  136
                 if (Thread.currentThread() != myReceiver) {
 150  119
                     myReceiver.join();
 151  
                 }
 152  
             }
 153  0
             catch (final InterruptedException ie) {
 154  
                 // Ignore.
 155  136
             }
 156  
 
 157  136
             myEventSupport.firePropertyChange(OPEN_PROP_NAME, true, false);
 158  
         }
 159  407
     }
 160  
 
 161  
     /**
 162  
      * {@inheritDoc}
 163  
      * <p>
 164  
      * True if the send and pending queues are empty.
 165  
      * </p>
 166  
      */
 167  
     @Override
 168  
     public int getPendingCount() {
 169  3
         return super.getPendingCount() + mySendSequence.getWaitersCount();
 170  
     }
 171  
 
 172  
     /**
 173  
      * {@inheritDoc}
 174  
      * <p>
 175  
      * True if the send and pending queues are empty.
 176  
      * </p>
 177  
      */
 178  
     @Override
 179  
     public boolean isIdle() {
 180  90
         return super.isIdle() && mySendSequence.isIdle();
 181  
     }
 182  
 
 183  
     /**
 184  
      * {@inheritDoc}
 185  
      */
 186  
     @Override
 187  
     public void send(final Message message1, final Message message2,
 188  
             final ReplyCallback replyCallback) throws MongoDbException {
 189  
 
 190  298
         validate(message1, message2);
 191  
 
 192  295
         if (replyCallback instanceof AddressAware) {
 193  0
             ((AddressAware) replyCallback).setAddress(myServer
 194  
                     .getCanonicalName());
 195  
         }
 196  
 
 197  295
         final int count = (message2 == null) ? 1 : 2;
 198  295
         final long seq = mySendSequence.reserve(count);
 199  295
         final long end = seq + count;
 200  
 
 201  295
         boolean sawError = false;
 202  295
         final PendingMessage pendingMessage = new PendingMessage();
 203  
         try {
 204  
 
 205  
             // Serialize the messages now so the critical section becomes close
 206  
             // to a write(byte[]) (with a little accounting overhead).
 207  295
             final Reference<BufferingBsonOutputStream> outRef = myBuffers.get();
 208  295
             BufferingBsonOutputStream out = (outRef != null) ? outRef.get()
 209  
                     : null;
 210  295
             if (out == null) {
 211  75
                 out = new BufferingBsonOutputStream(
 212  
                         new RandomAccessOutputStream(myEncoderCache));
 213  75
                 myBuffers
 214  
                         .set(new SoftReference<BufferingBsonOutputStream>(out));
 215  
             }
 216  
 
 217  295
             message1.write((int) (seq & 0xFFFFFF), out);
 218  292
             if (message2 != null) {
 219  1
                 message2.write((int) ((seq + 1) & 0xFFFFFF), out);
 220  
             }
 221  
 
 222  
             // Now stand in line.
 223  292
             mySendSequence.waitFor(seq);
 224  
 
 225  292
             if (count == 1) {
 226  291
                 pendingMessage.set((int) (seq & 0xFFFFFF), message1,
 227  
                         replyCallback);
 228  291
                 send(pendingMessage, out.getOutput());
 229  
             }
 230  
             else {
 231  1
                 pendingMessage.set((int) ((seq + 1) & 0xFFFFFF), message2,
 232  
                         replyCallback);
 233  1
                 send(pendingMessage, out.getOutput());
 234  
             }
 235  
 
 236  
             // If no-one is waiting we need to flush the message.
 237  291
             if (mySendSequence.noWaiter(end)) {
 238  291
                 if (myReceiver != Thread.currentThread()) {
 239  289
                     flush();
 240  
                 }
 241  
                 else {
 242  2
                     markReaderNeedsToFlush();
 243  
                 }
 244  
             }
 245  
         }
 246  0
         catch (final InterruptedException ie) {
 247  
             // Handled by loop but if we have a message, need to
 248  
             // tell him something bad happened (but we shouldn't).
 249  0
             raiseError(ie, pendingMessage.getReplyCallback());
 250  
         }
 251  2
         catch (final IOException ioe) {
 252  2
             myLog.warn(ioe, "I/O Error sending a message.");
 253  2
             raiseError(ioe, pendingMessage.getReplyCallback());
 254  2
             sawError = true;
 255  
         }
 256  1
         catch (final RuntimeException re) {
 257  1
             myLog.warn(re, "Runtime error sending a message.");
 258  1
             raiseError(re, pendingMessage.getReplyCallback());
 259  1
             sawError = true;
 260  
         }
 261  1
         catch (final Error error) {
 262  1
             myLog.error(error, "Error sending a message.");
 263  1
             raiseError(error, pendingMessage.getReplyCallback());
 264  1
             sawError = true;
 265  
         }
 266  
         finally {
 267  295
             pendingMessage.clear();
 268  295
             mySendSequence.release(seq, end);
 269  
 
 270  295
             if (sawError) {
 271  
                 // This may/will fail because we are dying.
 272  
                 try {
 273  4
                     if (myOpen.get()) {
 274  3
                         flush();
 275  
                     }
 276  
                 }
 277  0
                 catch (final IOException ioe) {
 278  0
                     myLog.warn(ioe, "I/O Error on final flush of messages.");
 279  
                 }
 280  
                 finally {
 281  
                     // Make sure we get shutdown completely.
 282  4
                     IOUtils.close(SocketConnection.this);
 283  4
                 }
 284  
             }
 285  
         }
 286  295
     }
 287  
 
 288  
     /**
 289  
      * {@inheritDoc}
 290  
      */
 291  
     @Override
 292  
     public void send(final Message message, final ReplyCallback replyCallback)
 293  
             throws MongoDbException {
 294  296
         send(message, null, replyCallback);
 295  294
     }
 296  
 
 297  
     /**
 298  
      * Starts the connections read and write threads.
 299  
      */
 300  
     @Override
 301  
     public void start() {
 302  135
         myReceiver.start();
 303  
 
 304  135
         if (myServer.needBuildInfo()) {
 305  83
             send(new BuildInfo(), new ServerUpdateCallback(myServer));
 306  
         }
 307  135
     }
 308  
 }