1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  
20  
21  package com.allanbank.mongodb.client.connection.socket;
22  
23  import java.beans.PropertyChangeListener;
24  import java.beans.PropertyChangeSupport;
25  import java.io.BufferedOutputStream;
26  import java.io.EOFException;
27  import java.io.IOException;
28  import java.io.InputStream;
29  import java.io.InterruptedIOException;
30  import java.io.StreamCorruptedException;
31  import java.net.InetSocketAddress;
32  import java.net.Socket;
33  import java.net.SocketException;
34  import java.net.SocketTimeoutException;
35  import java.util.concurrent.Executor;
36  import java.util.concurrent.TimeUnit;
37  import java.util.concurrent.atomic.AtomicBoolean;
38  import java.util.concurrent.atomic.AtomicInteger;
39  
40  import javax.net.SocketFactory;
41  
42  import com.allanbank.mongodb.MongoClientConfiguration;
43  import com.allanbank.mongodb.MongoDbException;
44  import com.allanbank.mongodb.Version;
45  import com.allanbank.mongodb.bson.io.BsonInputStream;
46  import com.allanbank.mongodb.bson.io.RandomAccessOutputStream;
47  import com.allanbank.mongodb.bson.io.StringDecoderCache;
48  import com.allanbank.mongodb.bson.io.StringEncoderCache;
49  import com.allanbank.mongodb.client.Message;
50  import com.allanbank.mongodb.client.Operation;
51  import com.allanbank.mongodb.client.VersionRange;
52  import com.allanbank.mongodb.client.callback.NoOpCallback;
53  import com.allanbank.mongodb.client.callback.Receiver;
54  import com.allanbank.mongodb.client.callback.ReplyCallback;
55  import com.allanbank.mongodb.client.callback.ReplyHandler;
56  import com.allanbank.mongodb.client.connection.Connection;
57  import com.allanbank.mongodb.client.connection.SocketConnectionListener;
58  import com.allanbank.mongodb.client.message.Delete;
59  import com.allanbank.mongodb.client.message.GetMore;
60  import com.allanbank.mongodb.client.message.Header;
61  import com.allanbank.mongodb.client.message.Insert;
62  import com.allanbank.mongodb.client.message.IsMaster;
63  import com.allanbank.mongodb.client.message.KillCursors;
64  import com.allanbank.mongodb.client.message.PendingMessage;
65  import com.allanbank.mongodb.client.message.PendingMessageQueue;
66  import com.allanbank.mongodb.client.message.Query;
67  import com.allanbank.mongodb.client.message.Reply;
68  import com.allanbank.mongodb.client.message.Update;
69  import com.allanbank.mongodb.client.state.Server;
70  import com.allanbank.mongodb.error.ConnectionLostException;
71  import com.allanbank.mongodb.error.DocumentToLargeException;
72  import com.allanbank.mongodb.error.ServerVersionException;
73  import com.allanbank.mongodb.util.IOUtils;
74  import com.allanbank.mongodb.util.log.Log;
75  import com.allanbank.mongodb.util.log.LogFactory;
76  
77  
78  
79  
80  
81  
82  
83  
84  
85  public abstract class AbstractSocketConnection implements Connection, Receiver {
86  
87      
88      public static final int HEADER_LENGTH = 16;
89  
90      
91      protected final BsonInputStream myBsonIn;
92  
93      
94      protected final MongoClientConfiguration myConfig;
95  
96      
97      protected final StringEncoderCache myEncoderCache;
98  
99      
100     protected final PropertyChangeSupport myEventSupport;
101 
102     
103     protected final Executor myExecutor;
104 
105     
106     protected final InputStream myInput;
107 
108     
109     protected final Log myLog;
110 
111     
112     protected final AtomicBoolean myOpen;
113 
114     
115     protected final BufferedOutputStream myOutput;
116 
117     
118     protected final PendingMessageQueue myPendingQueue;
119 
120     
121     protected final Server myServer;
122 
123     
124     protected final AtomicBoolean myShutdown;
125 
126     
127     protected final Socket mySocket;
128 
129     
130     private int myIdleTicks = 0;
131 
132     
133     private final PendingMessage myPendingMessage = new PendingMessage();
134 
135     
136     private final AtomicInteger myReaderNeedsToFlush = new AtomicInteger(0);
137 
138     
139 
140 
141 
142 
143 
144 
145 
146 
147 
148 
149 
150 
151 
152 
153 
154     public AbstractSocketConnection(final Server server,
155             final MongoClientConfiguration config,
156             final StringEncoderCache encoderCache,
157             final StringDecoderCache decoderCache) throws SocketException,
158             IOException {
159         super();
160 
161         myServer = server;
162         myConfig = config;
163         myEncoderCache = encoderCache;
164 
165         myLog = LogFactory.getLog(getClass());
166 
167         myExecutor = config.getExecutor();
168         myEventSupport = new PropertyChangeSupport(this);
169         myOpen = new AtomicBoolean(false);
170         myShutdown = new AtomicBoolean(false);
171 
172         mySocket = openSocket(server, config);
173         updateSocketWithOptions(config);
174 
175         myOpen.set(true);
176 
177         myInput = mySocket.getInputStream();
178         myBsonIn = new BsonInputStream(myInput, decoderCache);
179 
180         
181         
182         
183         
184         
185         
186         
187         
188         
189         
190         
191         
192         
193         myOutput = new BufferedOutputStream(mySocket.getOutputStream(),
194                 32 * 1024);
195 
196         myPendingQueue = new PendingMessageQueue(
197                 config.getMaxPendingOperationsPerConnection(),
198                 config.getLockType());
199     }
200 
201     
202 
203 
204 
205 
206 
207     @Override
208     public void addPropertyChangeListener(final PropertyChangeListener listener) {
209         myEventSupport.addPropertyChangeListener(listener);
210     }
211 
212     
213 
214 
215     @Override
216     public void flush() throws IOException {
217         myReaderNeedsToFlush.set(0);
218         myOutput.flush();
219     }
220 
221     
222 
223 
224     @Override
225     public int getPendingCount() {
226         return myPendingQueue.size();
227     }
228 
229     
230 
231 
232 
233 
234 
235     @Override
236     public String getServerName() {
237         return myServer.getCanonicalName();
238     }
239 
240     
241 
242 
243 
244 
245 
246     @Override
247     public boolean isAvailable() {
248         return isOpen() && !isShuttingDown();
249     }
250 
251     
252 
253 
254 
255 
256 
257     @Override
258     public boolean isIdle() {
259         return myPendingQueue.isEmpty();
260     }
261 
262     
263 
264 
265 
266 
267 
268     @Override
269     public boolean isOpen() {
270         return myOpen.get();
271     }
272 
273     
274 
275 
276     @Override
277     public boolean isShuttingDown() {
278         return myShutdown.get();
279     }
280 
281     
282 
283 
284 
285 
286 
287     @Override
288     public void raiseErrors(final MongoDbException exception) {
289         final PendingMessage message = new PendingMessage();
290 
291         while (myPendingQueue.poll(message)) {
292             raiseError(exception, message.getReplyCallback());
293         }
294     }
295 
296     
297 
298 
299 
300 
301 
302     @Override
303     public void removePropertyChangeListener(
304             final PropertyChangeListener listener) {
305         myEventSupport.removePropertyChangeListener(listener);
306     }
307 
308     
309 
310 
311 
312 
313 
314 
315     @Override
316     public void shutdown(final boolean force) {
317         
318         myShutdown.set(true);
319 
320         if (force) {
321             IOUtils.close(this);
322         }
323         else {
324             if (isOpen()) {
325                 
326                 send(new IsMaster(), new NoOpCallback());
327             }
328         }
329     }
330 
331     
332 
333 
334     public abstract void start();
335 
336     
337 
338 
339 
340     public void stop() {
341         shutdown(false);
342     }
343 
344     
345 
346 
347 
348 
349 
350     @Override
351     public String toString() {
352         return "MongoDB(" + mySocket.getLocalPort() + "-->"
353                 + mySocket.getRemoteSocketAddress() + ")";
354     }
355 
356     
357 
358 
359 
360 
361 
362 
363 
364 
365     @Override
366     public void tryReceive() {
367         try {
368             doReceiverFlush();
369 
370             if ((myBsonIn.available() > 0) || (myInput.available() > 0)) {
371                 doReceiveOne();
372             }
373         }
374         catch (final IOException error) {
375             myLog.info(
376                     "Received an error when checking for pending messages: {}.",
377                     error.getMessage());
378         }
379     }
380 
381     
382 
383 
384 
385 
386 
387     @Override
388     public void waitForClosed(final int timeout, final TimeUnit timeoutUnits) {
389         long now = System.currentTimeMillis();
390         final long deadline = now + timeoutUnits.toMillis(timeout);
391 
392         while (isOpen() && (now < deadline)) {
393             try {
394                 
395                 TimeUnit.MILLISECONDS.sleep(10);
396             }
397             catch (final InterruptedException e) {
398                 
399                 e.hashCode();
400             }
401             now = System.currentTimeMillis();
402         }
403     }
404 
405     
406 
407 
408 
409 
410 
411 
412     protected Message doReceive() throws MongoDbException {
413         try {
414             int length;
415             try {
416                 length = readIntSuppressTimeoutOnNonFirstByte();
417             }
418             catch (final SocketTimeoutException ok) {
419                 
420                 
421                 return null;
422             }
423 
424             myBsonIn.prefetch(length - 4);
425 
426             final int requestId = myBsonIn.readInt();
427             final int responseId = myBsonIn.readInt();
428             final int opCode = myBsonIn.readInt();
429 
430             final Operation op = Operation.fromCode(opCode);
431             if (op == null) {
432                 
433                 throw new MongoDbException("Unexpected operation read '"
434                         + opCode + "'.");
435             }
436 
437             final Header header = new Header(length, requestId, responseId, op);
438             Message message = null;
439             switch (op) {
440             case REPLY:
441                 message = new Reply(header, myBsonIn);
442                 break;
443             case QUERY:
444                 message = new Query(header, myBsonIn);
445                 break;
446             case UPDATE:
447                 message = new Update(myBsonIn);
448                 break;
449             case INSERT:
450                 message = new Insert(header, myBsonIn);
451                 break;
452             case GET_MORE:
453                 message = new GetMore(myBsonIn);
454                 break;
455             case DELETE:
456                 message = new Delete(myBsonIn);
457                 break;
458             case KILL_CURSORS:
459                 message = new KillCursors(myBsonIn);
460                 break;
461             }
462 
463             myServer.incrementRepliesReceived();
464 
465             return message;
466         }
467 
468         catch (final IOException ioe) {
469             final MongoDbException error = new ConnectionLostException(ioe);
470 
471             shutdown(error, (ioe instanceof InterruptedIOException));
472 
473             throw error;
474         }
475     }
476 
477     
478 
479 
480     protected void doReceiveOne() {
481 
482         doReceiverFlush();
483 
484         final Message received = doReceive();
485         if (received instanceof Reply) {
486             myIdleTicks = 0;
487             final Reply reply = (Reply) received;
488             final int replyId = reply.getResponseToId();
489             boolean took = false;
490 
491             
492             
493             try {
494                 took = myPendingQueue.poll(myPendingMessage);
495                 while (took && (myPendingMessage.getMessageId() != replyId)) {
496 
497                     final MongoDbException noReply = new MongoDbException(
498                             "No reply received.");
499 
500                     
501                     raiseError(noReply, myPendingMessage.getReplyCallback());
502 
503                     
504                     took = myPendingQueue.poll(myPendingMessage);
505                 }
506 
507                 if (took) {
508                     
509                     reply(reply, myPendingMessage);
510                 }
511                 else {
512                     myLog.warn("Could not find the callback for reply '{}'.",
513                             +replyId);
514                 }
515             }
516             finally {
517                 myPendingMessage.clear();
518             }
519         }
520         else if (received != null) {
521             myLog.warn("Received a non-Reply message: {}.", received);
522             shutdown(new ConnectionLostException(new StreamCorruptedException(
523                     "Received a non-Reply message: " + received)), false);
524         }
525         else {
526             myIdleTicks += 1;
527 
528             if (myConfig.getMaxIdleTickCount() <= myIdleTicks) {
529                 
530                 shutdown(false);
531             }
532         }
533     }
534 
535     
536 
537 
538 
539 
540 
541 
542 
543 
544 
545     protected void doSend(final int messageId,
546             final RandomAccessOutputStream message) throws IOException {
547         message.writeTo(myOutput);
548         message.reset();
549 
550         myServer.incrementMessagesSent();
551     }
552 
553     
554 
555 
556 
557 
558 
559     protected void markReaderNeedsToFlush() {
560         myReaderNeedsToFlush.incrementAndGet();
561     }
562 
563     
564 
565 
566 
567 
568 
569 
570 
571     protected void raiseError(final Throwable exception,
572             final ReplyCallback replyCallback) {
573         ReplyHandler.raiseError(exception, replyCallback, myExecutor);
574     }
575 
576     
577 
578 
579 
580 
581 
582 
583 
584 
585     protected int readIntSuppressTimeoutOnNonFirstByte() throws EOFException,
586             IOException {
587         int read = 0;
588         int eofCheck = 0;
589         int result = 0;
590 
591         read = myBsonIn.read();
592         eofCheck |= read;
593         result += (read << 0);
594 
595         for (int i = Byte.SIZE; i < Integer.SIZE; i += Byte.SIZE) {
596             try {
597                 read = myBsonIn.read();
598             }
599             catch (final SocketTimeoutException ste) {
600                 
601                 throw new IOException(ste);
602             }
603             eofCheck |= read;
604             result += (read << i);
605         }
606 
607         if (eofCheck < 0) {
608             throw new EOFException("Remote connection closed: "
609                     + mySocket.getRemoteSocketAddress());
610         }
611         return result;
612     }
613 
614     
615 
616 
617 
618 
619 
620 
621 
622     protected void reply(final Reply reply, final PendingMessage pendingMessage) {
623 
624         final long latency = pendingMessage.latency();
625 
626         
627         if (latency > 0) {
628             myServer.updateAverageLatency(latency);
629         }
630 
631         final ReplyCallback callback = pendingMessage.getReplyCallback();
632         ReplyHandler.reply(this, reply, callback, myExecutor);
633     }
634 
635     
636 
637 
638 
639 
640 
641 
642 
643 
644 
645 
646 
647 
648 
649     protected final void send(final PendingMessage pendingMessage,
650             final RandomAccessOutputStream message)
651             throws InterruptedException, IOException {
652 
653         final int messageId = pendingMessage.getMessageId();
654 
655         
656         pendingMessage.timestampNow();
657 
658         
659         
660         
661         
662         if ((pendingMessage.getReplyCallback() != null)
663                 && !myPendingQueue.offer(pendingMessage)) {
664             
665             flush();
666             myPendingQueue.put(pendingMessage);
667         }
668 
669         doSend(messageId, message);
670 
671         
672         if (myShutdown.get()) {
673             flush();
674         }
675     }
676 
677     
678 
679 
680 
681 
682 
683 
684 
685     protected void shutdown(final MongoDbException error,
686             final boolean receiveError) {
687         if (receiveError) {
688             myServer.connectionTerminated();
689         }
690 
691         
692         final PendingMessage message = new PendingMessage();
693         while (myPendingQueue.poll(message)) {
694             raiseError(error, message.getReplyCallback());
695         }
696 
697         closeQuietly();
698     }
699 
700     
701 
702 
703 
704 
705 
706 
707 
708     protected void updateSocketWithOptions(final MongoClientConfiguration config)
709             throws SocketException {
710         mySocket.setKeepAlive(config.isUsingSoKeepalive());
711         mySocket.setSoTimeout(config.getReadTimeout());
712         try {
713             mySocket.setTcpNoDelay(true);
714         }
715         catch (final SocketException seIgnored) {
716             
717             
718             
719             if (!"AFUNIXSocketException".equals(seIgnored.getClass()
720                     .getSimpleName())) {
721                 throw seIgnored;
722             }
723         }
724         mySocket.setPerformancePreferences(1, 5, 6);
725     }
726 
727     
728 
729 
730 
731 
732 
733 
734 
735 
736 
737 
738 
739 
740     protected void validate(final Message message1, final Message message2)
741             throws DocumentToLargeException, ServerVersionException {
742 
743         final Version serverVersion = myServer.getVersion();
744         final int maxBsonSize = myServer.getMaxBsonObjectSize();
745 
746         message1.validateSize(maxBsonSize);
747         validateVersion(message1, serverVersion);
748 
749         if (message2 != null) {
750             message2.validateSize(maxBsonSize);
751             validateVersion(message1, serverVersion);
752         }
753     }
754 
755     
756 
757 
758 
759     private void closeQuietly() {
760         try {
761             close();
762         }
763         catch (final IOException e) {
764             myLog.warn(e, "I/O exception trying to shutdown the connection.");
765         }
766     }
767 
768     
769 
770 
771 
772 
773     private void doReceiverFlush() {
774         try {
775             final int unflushedMessages = myReaderNeedsToFlush.get();
776             if ((unflushedMessages != 0)
777                     && (myPendingQueue.size() <= unflushedMessages)) {
778                 flush();
779             }
780         }
781         catch (final IOException ignored) {
782             myLog.warn("Error flushing data to the server: "
783                     + ignored.getMessage());
784         }
785     }
786 
787     
788 
789 
790 
791 
792 
793 
794 
795 
796 
797 
798     private Socket openSocket(final Server server,
799             final MongoClientConfiguration config) throws IOException {
800         final SocketFactory factory = config.getSocketFactory();
801 
802         IOException last = null;
803         Socket socket = null;
804         for (final InetSocketAddress address : myServer.getAddresses()) {
805             try {
806 
807                 socket = factory.createSocket();
808                 socket.connect(address, config.getConnectTimeout());
809 
810                 
811                 
812                 if (factory instanceof SocketConnectionListener) {
813                     ((SocketConnectionListener) factory).connected(address,
814                             socket);
815                 }
816 
817                 
818                 server.connectionOpened(address);
819 
820                 last = null;
821                 break;
822             }
823             catch (final IOException error) {
824                 last = error;
825                 try {
826                     if (socket != null) {
827                         socket.close();
828                     }
829                 }
830                 catch (final IOException ignore) {
831                     myLog.info(
832                             "Could not close the defunct socket connection: {}",
833                             socket);
834                 }
835             }
836 
837         }
838         if (last != null) {
839             server.connectFailed();
840             throw last;
841         }
842 
843         return socket;
844     }
845 
846     
847 
848 
849 
850 
851 
852 
853 
854 
855 
856 
857     private void validateVersion(final Message message,
858             final Version serverVersion) throws ServerVersionException {
859         final VersionRange range = message.getRequiredVersionRange();
860         if ((range != null) && !range.contains(serverVersion)) {
861             throw new ServerVersionException(message.getOperationName(), range,
862                     serverVersion, message);
863         }
864     }
865 }