/*
 * Decompiled with CFR 0.152.
 */
package com.allanbank.mongodb.connection.socket;

import com.allanbank.mongodb.Callback;
import com.allanbank.mongodb.MongoClientConfiguration;
import com.allanbank.mongodb.MongoDbException;
import com.allanbank.mongodb.bson.io.BsonInputStream;
import com.allanbank.mongodb.bson.io.BsonOutputStream;
import com.allanbank.mongodb.connection.Connection;
import com.allanbank.mongodb.connection.Message;
import com.allanbank.mongodb.connection.Operation;
import com.allanbank.mongodb.connection.message.AbstractMessage;
import com.allanbank.mongodb.connection.message.Delete;
import com.allanbank.mongodb.connection.message.GetMore;
import com.allanbank.mongodb.connection.message.Header;
import com.allanbank.mongodb.connection.message.Insert;
import com.allanbank.mongodb.connection.message.IsMaster;
import com.allanbank.mongodb.connection.message.KillCursors;
import com.allanbank.mongodb.connection.message.PendingMessage;
import com.allanbank.mongodb.connection.message.PendingMessageQueue;
import com.allanbank.mongodb.connection.message.Query;
import com.allanbank.mongodb.connection.message.Reply;
import com.allanbank.mongodb.connection.message.ReplyHandler;
import com.allanbank.mongodb.connection.message.Update;
import com.allanbank.mongodb.connection.state.ServerState;
import com.allanbank.mongodb.error.ConnectionLostException;
import com.allanbank.mongodb.util.IOUtils;
import java.beans.PropertyChangeListener;
import java.beans.PropertyChangeSupport;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;

public class SocketConnection
implements Connection {
    /**
     * Header length constant. Not referenced in this file; presumably the
     * size in bytes of the four integers {@link #doReceive()} reads ahead of
     * each message body -- confirm against the wire protocol.
     */
    public static final int HEADER_LENGTH = 16;

    /**
     * Shared exception handed to the callbacks of pending messages that the
     * receiver skips over while looking for the message a reply belongs to.
     */
    public static final MongoDbException NO_REPLY = new MongoDbException("No reply received.");

    /** The logger for the connection. */
    protected static final Logger LOG = Logger.getLogger(SocketConnection.class.getCanonicalName());

    /** Executor passed to {@link ReplyHandler} when delivering replies and errors. */
    protected final Executor myExecutor;

    /** True between a successful connect in the constructor and {@link #close()}. */
    protected final AtomicBoolean myOpen;

    /** Messages that were sent with a reply callback and are awaiting their reply. */
    protected final PendingMessageQueue myPendingQueue;

    /** Set by {@link #shutdown()}; checked by the sender and receiver loops. */
    protected final AtomicBoolean myShutdown;

    /** Messages queued by the send methods and not yet taken by the sender thread. */
    protected final PendingMessageQueue myToSendQueue;

    /** BSON reader layered over {@link #myInput}. */
    private final BsonInputStream myBsonIn;

    /** BSON writer layered over {@link #myOutput}. */
    private final BsonOutputStream myBsonOut;

    /** Support for firing the "open" property change from {@link #close()}. */
    private final PropertyChangeSupport myEventSupport;

    /** Buffered view of the socket's input stream. */
    private final BufferedInputStream myInput;

    /** Buffered view of the socket's output stream. */
    private final BufferedOutputStream myOutput;

    /** Thread running the {@link ReceiveRunnable}; started by {@link #start()}. */
    private final Thread myReceiver;

    /** Thread running the {@link SendRunnable}; started by {@link #start()}. */
    private final Thread mySender;

    /** The server this connection is connected to. */
    private final ServerState myServer;

    /** The socket to the server. */
    private final Socket mySocket;

    public SocketConnection(ServerState serverState, MongoClientConfiguration mongoClientConfiguration) throws SocketException, IOException {
        this.myExecutor = mongoClientConfiguration.getExecutor();
        this.myServer = serverState;
        this.myEventSupport = new PropertyChangeSupport(this);
        this.myOpen = new AtomicBoolean(false);
        this.myShutdown = new AtomicBoolean(false);
        this.mySocket = mongoClientConfiguration.getSocketFactory().createSocket();
        this.mySocket.connect(this.myServer.getServer(), mongoClientConfiguration.getConnectTimeout());
        this.updateSocketWithOptions(mongoClientConfiguration);
        this.myOpen.set(true);
        this.myInput = new BufferedInputStream(this.mySocket.getInputStream());
        this.myBsonIn = new BsonInputStream(this.myInput);
        this.myOutput = new BufferedOutputStream(this.mySocket.getOutputStream());
        this.myBsonOut = new BsonOutputStream(this.myOutput);
        this.myToSendQueue = new PendingMessageQueue(mongoClientConfiguration.getMaxPendingOperationsPerConnection(), mongoClientConfiguration.getLockType());
        this.myPendingQueue = new PendingMessageQueue(mongoClientConfiguration.getMaxPendingOperationsPerConnection(), mongoClientConfiguration.getLockType());
        this.myReceiver = mongoClientConfiguration.getThreadFactory().newThread(new ReceiveRunnable());
        this.myReceiver.setDaemon(true);
        this.myReceiver.setName("MongoDB " + this.mySocket.getLocalPort() + "<--" + this.myServer.getServer().toString());
        this.mySender = mongoClientConfiguration.getThreadFactory().newThread(new SendRunnable());
        this.mySender.setDaemon(true);
        this.mySender.setName("MongoDB " + this.mySocket.getLocalPort() + "-->" + this.myServer.getServer().toString());
    }

    /**
     * Moves every message in the list onto the to-send queue, in order. A
     * message is removed from the list only after it has been queued, so if
     * the thread is interrupted the messages not yet queued remain in the
     * list.
     *
     * @param list
     *            The messages to queue; emptied as the messages are queued.
     * @throws InterruptedException
     *             If interrupted while waiting to queue a message.
     */
    @Override
    public void addPending(List<PendingMessage> list) throws InterruptedException {
        // The loop's update clause only runs after put() returns normally.
        for (; !list.isEmpty(); list.remove(0)) {
            myToSendQueue.put(list.get(0));
        }
    }

    /**
     * Registers a listener with this connection's property change support.
     *
     * @param propertyChangeListener
     *            The listener to add.
     */
    @Override
    public void addPropertyChangeListener(PropertyChangeListener propertyChangeListener) {
        myEventSupport.addPropertyChangeListener(propertyChangeListener);
    }

    /**
     * Closes the connection: marks it as not open, interrupts and waits for
     * the sender thread, closes the streams and the socket, waits for the
     * receiver thread and finally fires the "open" property change.
     * <p>
     * All three resources are always closed, the receiver is always joined
     * and the property change is always fired, even if closing one of the
     * resources fails; the first I/O failure is rethrown once the cleanup is
     * complete.
     * </p>
     * <p>
     * This method may be invoked from the sender or receiver thread itself,
     * in which case that thread does not try to join itself.
     * </p>
     *
     * @throws IOException
     *             The first failure seen closing the streams or the socket.
     */
    @Override
    public void close() throws IOException {
        // Atomically flip the flag so that only one of several concurrent
        // closers reports the open -> closed transition.
        final boolean wasOpen = myOpen.getAndSet(false);
        final Thread current = Thread.currentThread();
        boolean interrupted = false;

        mySender.interrupt();
        myReceiver.interrupt();

        try {
            if (current != mySender) {
                mySender.join();
            }
        }
        catch (InterruptedException interruptedException) {
            interrupted = true;
        }

        // Now that output is shut down, close the streams and the socket.
        // Closing the socket also unblocks a receiver stuck in a read. Keep
        // going past a failure so nothing is leaked.
        IOException failure = null;
        try {
            myOutput.close();
        }
        catch (IOException iOException) {
            failure = iOException;
        }
        try {
            myInput.close();
        }
        catch (IOException iOException) {
            if (failure == null) {
                failure = iOException;
            }
        }
        try {
            mySocket.close();
        }
        catch (IOException iOException) {
            if (failure == null) {
                failure = iOException;
            }
        }

        try {
            if (current != myReceiver) {
                myReceiver.join();
            }
        }
        catch (InterruptedException interruptedException) {
            interrupted = true;
        }

        myEventSupport.firePropertyChange("open", wasOpen, false);

        // When running on the sender or receiver thread the interrupt was
        // raised by this method itself; only an outside caller's interrupt
        // is a real request that must not be lost.
        if (interrupted && (current != mySender) && (current != myReceiver)) {
            current.interrupt();
        }

        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Drains the messages that have not yet been sent into the list.
     *
     * @param list
     *            The list to receive the messages from the to-send queue.
     */
    @Override
    public void drainPending(List<PendingMessage> list) {
        myToSendQueue.drainTo(list);
    }

    /**
     * Flushes the buffered output stream to the socket.
     *
     * @throws IOException
     *             On a failure flushing the stream.
     */
    @Override
    public void flush() throws IOException {
        myOutput.flush();
    }

    /**
     * Returns the number of messages awaiting a reply plus the number of
     * messages waiting to be sent.
     *
     * @return The combined size of the pending and to-send queues.
     */
    @Override
    public int getPendingCount() {
        final int awaitingReply = myPendingQueue.size();
        final int awaitingSend = myToSendQueue.size();
        return awaitingReply + awaitingSend;
    }

    /**
     * Returns the host name of the address of the server this connection is
     * connected to.
     *
     * @return The server's host name.
     */
    @Override
    public String getServerName() {
        return myServer.getServer().getHostName();
    }

    /**
     * Returns true when there are no messages awaiting a reply and no
     * messages waiting to be sent.
     *
     * @return True if both the pending and to-send queues are empty.
     */
    @Override
    public boolean isIdle() {
        if (!myPendingQueue.isEmpty()) {
            return false;
        }
        return myToSendQueue.isEmpty();
    }

    /**
     * Returns true if the connection has been opened and not yet closed.
     *
     * @return The current value of the open flag.
     */
    @Override
    public boolean isOpen() {
        return myOpen.get();
    }

    /**
     * Raises the exception on the callback of every message awaiting a
     * reply and, optionally, of every message that has not yet been sent.
     * The queues are emptied in the process.
     *
     * @param mongoDbException
     *            The exception to raise on each callback.
     * @param bl
     *            If true then the messages that have not been sent are
     *            failed as well (and first).
     */
    @Override
    public void raiseErrors(MongoDbException mongoDbException, boolean bl) {
        // Single scratch holder that each poll() fills in.
        final PendingMessage scratch = new PendingMessage();

        if (bl) {
            for (boolean more = myToSendQueue.poll(scratch); more; more = myToSendQueue.poll(scratch)) {
                raiseError(mongoDbException, scratch.getReplyCallback());
            }
        }

        for (boolean more = myPendingQueue.poll(scratch); more; more = myPendingQueue.poll(scratch)) {
            raiseError(mongoDbException, scratch.getReplyCallback());
        }
    }

    /**
     * Removes a listener from this connection's property change support.
     *
     * @param propertyChangeListener
     *            The listener to remove.
     */
    @Override
    public void removePropertyChangeListener(PropertyChangeListener propertyChangeListener) {
        myEventSupport.removePropertyChangeListener(propertyChangeListener);
    }

    /**
     * Queues the message, with its reply callback, to be sent by the sender
     * thread.
     *
     * @param message
     *            The message to send.
     * @param callback
     *            The callback for the reply to the message.
     * @return The name of the server the message was queued for.
     * @throws MongoDbException
     *             If the thread is interrupted while waiting to queue the
     *             message. The thread's interrupt status is restored before
     *             the exception is thrown.
     */
    @Override
    public String send(Message message, Callback<Reply> callback) throws MongoDbException {
        try {
            this.myToSendQueue.put(message, callback);
        }
        catch (InterruptedException interruptedException) {
            // Converting to an unchecked exception must not hide the
            // interrupt from the code further up the stack.
            Thread.currentThread().interrupt();
            throw new MongoDbException(interruptedException);
        }
        return this.myServer.getName();
    }

    /**
     * Queues a pair of messages to be sent by the sender thread. The first
     * message is queued without a callback; the callback is associated with
     * the second message.
     *
     * @param message
     *            The first message to send.
     * @param message2
     *            The second message to send.
     * @param callback
     *            The callback for the reply to the second message.
     * @return The name of the server the messages were queued for.
     * @throws MongoDbException
     *             If the thread is interrupted while waiting to queue the
     *             messages. The thread's interrupt status is restored before
     *             the exception is thrown.
     */
    @Override
    public String send(Message message, Message message2, Callback<Reply> callback) throws MongoDbException {
        try {
            this.myToSendQueue.put(message, null, message2, callback);
        }
        catch (InterruptedException interruptedException) {
            // Converting to an unchecked exception must not hide the
            // interrupt from the code further up the stack.
            Thread.currentThread().interrupt();
            throw new MongoDbException(interruptedException);
        }
        return this.myServer.getName();
    }

    /**
     * Marks the connection as shutting down and queues an {@link IsMaster}
     * request whose reply is ignored. Presumably the extra round trip gives
     * the sender and receiver loops a chance to observe the shutdown flag.
     */
    @Override
    public void shutdown() {
        myShutdown.set(true);

        final Callback<Reply> ignoreReply = new NoopCallback();
        send(new IsMaster(), ignoreReply);
    }

    /**
     * Starts the connection's background threads: the receiver first, then
     * the sender.
     */
    public void start() {
        myReceiver.start();
        mySender.start();
    }

    /**
     * Stops the connection. Simply delegates to {@link #shutdown()}.
     */
    public void stop() {
        shutdown();
    }

    /**
     * Returns a description of the connection of the form
     * {@code MongoDB(<local port>--><remote address>)}.
     *
     * @return The description of the connection.
     */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("MongoDB(");
        text.append(mySocket.getLocalPort());
        text.append("-->");
        text.append(mySocket.getRemoteSocketAddress());
        text.append(")");
        return text.toString();
    }

    /**
     * Waits, polling every 10 milliseconds, until the connection is no
     * longer open or the timeout expires, whichever comes first.
     * <p>
     * An interrupt does not end the wait early, but it is no longer lost:
     * the thread's interrupt status is restored before returning. The
     * deadline is measured with {@link System#nanoTime()} so it is immune to
     * wall-clock adjustments and does not overflow for very large timeouts.
     * </p>
     *
     * @param n
     *            The maximum amount of time to wait.
     * @param timeUnit
     *            The unit for the timeout.
     */
    @Override
    public void waitForClosed(int n, TimeUnit timeUnit) {
        final long deadline = System.nanoTime() + timeUnit.toNanos(n);
        boolean interrupted = false;
        try {
            // Subtract-then-compare keeps the test correct across overflow.
            while (isOpen() && ((System.nanoTime() - deadline) < 0L)) {
                try {
                    TimeUnit.MILLISECONDS.sleep(10L);
                }
                catch (InterruptedException interruptedException) {
                    // Keep waiting (sleep cleared the flag) and re-assert
                    // the interrupt on the way out.
                    interrupted = true;
                }
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Reads a single message from the connection: four header integers
     * followed by a body whose type is selected by the operation code.
     * <p>
     * On an I/O failure every message awaiting a reply has a
     * {@link ConnectionLostException} raised on its callback, the connection
     * is closed and the exception is thrown.
     * </p>
     *
     * @return The message read, or null if the read timed out before the
     *         first byte of a message arrived or the operation has no
     *         matching message type.
     * @throws MongoDbException
     *             If the operation code is not recognized or the connection
     *             is lost.
     */
    protected Message doReceive() throws MongoDbException {
        try {
            // Header fields, named per the MongoDB wire protocol layout
            // (messageLength, requestID, responseTo, opCode).
            final int length;
            try {
                length = readIntSuppressTimeoutOnNonFirstByte();
            }
            catch (SocketTimeoutException idle) {
                // Nothing arrived within the read timeout: not an error.
                return null;
            }
            final int requestId = myBsonIn.readInt();
            final int responseId = myBsonIn.readInt();
            final int opCode = myBsonIn.readInt();

            final Operation op = Operation.fromCode(opCode);
            if (op == null) {
                throw new MongoDbException("Unexpected operation read '" + opCode + "'.");
            }

            final Header header = new Header(length, requestId, responseId, op);
            switch (op) {
                case REPLY:
                    return new Reply(header, myBsonIn);
                case QUERY:
                    return new Query(header, myBsonIn);
                case UPDATE:
                    return new Update(myBsonIn);
                case INSERT:
                    return new Insert(header, myBsonIn);
                case GET_MORE:
                    return new GetMore(myBsonIn);
                case DELETE:
                    return new Delete(myBsonIn);
                case KILL_CURSORS:
                    return new KillCursors(myBsonIn);
                default:
                    return null;
            }
        }
        catch (IOException ioFailure) {
            final ConnectionLostException lost = new ConnectionLostException(ioFailure);

            // Nobody will ever answer the outstanding requests now.
            final PendingMessage scratch = new PendingMessage();
            while (myPendingQueue.poll(scratch)) {
                raiseError(lost, scratch.getReplyCallback());
            }

            closeQuietly();

            throw lost;
        }
    }

    /**
     * Writes a single message to the connection's BSON output stream.
     *
     * @param n
     *            The id to write the message with.
     * @param message
     *            The message to write.
     * @throws IOException
     *             On a failure writing the message.
     */
    protected void doSend(int n, Message message) throws IOException {
        message.write(n, myBsonOut);
    }

    /**
     * Hands the error to {@link ReplyHandler} for delivery to the callback
     * using this connection's executor.
     *
     * @param throwable
     *            The error to raise.
     * @param callback
     *            The callback to raise the error on.
     */
    protected void raiseError(Throwable throwable, Callback<Reply> callback) {
        ReplyHandler.raiseError(throwable, callback, myExecutor);
    }

    /**
     * Reads a little-endian 4-byte integer from the BSON input stream. A
     * socket timeout while waiting for the first byte is propagated as a
     * {@link SocketTimeoutException}; a timeout on any later byte means the
     * integer was only partially read and is converted to a plain
     * {@link IOException}.
     *
     * @return The integer read.
     * @throws EOFException
     *             If the end of the stream is hit on any of the four bytes.
     * @throws IOException
     *             On a failure reading from the stream.
     */
    protected int readIntSuppressTimeoutOnNonFirstByte() throws EOFException, IOException {
        // First byte: a timeout here propagates untouched.
        int read = myBsonIn.read();
        int eofCheck = read;
        int result = read;

        for (int shift = 8; shift < 32; shift += 8) {
            try {
                read = myBsonIn.read();
            }
            catch (SocketTimeoutException midValueTimeout) {
                throw new IOException(midValueTimeout);
            }
            eofCheck |= read;
            result += read << shift;
        }

        // Any -1 (end of stream) among the reads makes the OR negative.
        if (eofCheck < 0) {
            throw new EOFException();
        }
        return result;
    }

    /**
     * Hands the reply to {@link ReplyHandler} for delivery to the callback
     * using this connection's executor.
     *
     * @param reply
     *            The reply received.
     * @param callback
     *            The callback to deliver the reply to.
     */
    protected void reply(Reply reply, Callback<Reply> callback) {
        ReplyHandler.reply(reply, callback, myExecutor);
    }

    /**
     * Applies the socket options to the connection's socket: keep-alive and
     * read timeout from the configuration, TCP no-delay, and the performance
     * preferences.
     * <p>
     * A failure to set TCP no-delay is ignored when the exception's simple
     * class name is {@code AFUNIXSocketException}; any other
     * {@link SocketException} is rethrown.
     * </p>
     *
     * @param mongoClientConfiguration
     *            The configuration supplying the keep-alive and read timeout
     *            settings.
     * @throws SocketException
     *             On a failure setting a socket option.
     */
    protected void updateSocketWithOptions(MongoClientConfiguration mongoClientConfiguration) throws SocketException {
        mySocket.setKeepAlive(mongoClientConfiguration.isUsingSoKeepalive());
        mySocket.setSoTimeout(mongoClientConfiguration.getReadTimeout());

        try {
            mySocket.setTcpNoDelay(true);
        }
        catch (SocketException noDelayFailure) {
            // Matched by name so no compile-time dependency on that socket
            // implementation is needed.
            final String failureType = noDelayFailure.getClass().getSimpleName();
            if (!"AFUNIXSocketException".equals(failureType)) {
                throw noDelayFailure;
            }
        }

        mySocket.setPerformancePreferences(1, 5, 6);
    }

    /**
     * Waits, polling every 50 milliseconds, until the number of messages
     * awaiting a reply is no more than the requested count or the timeout
     * expires, and then pauses for a further 5 milliseconds.
     * <p>
     * An interrupt does not end the wait early, but it is no longer lost:
     * the thread's interrupt status is restored before returning. The
     * deadline is measured with {@link System#nanoTime()} so it is immune to
     * wall-clock adjustments.
     * </p>
     *
     * @param n
     *            The pending message count to wait for.
     * @param l
     *            The maximum time to wait, in milliseconds.
     */
    protected void waitForPending(int n, long l) {
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(l);
        boolean interrupted = false;
        try {
            // Subtract-then-compare keeps the test correct across overflow.
            while ((n < myPendingQueue.size()) && ((System.nanoTime() - deadline) < 0L)) {
                try {
                    TimeUnit.MILLISECONDS.sleep(50L);
                }
                catch (InterruptedException interruptedException) {
                    // Keep waiting (sleep cleared the flag) and re-assert
                    // the interrupt on the way out.
                    interrupted = true;
                }
            }

            // Short trailing pause, kept from the original implementation.
            try {
                TimeUnit.MILLISECONDS.sleep(5L);
            }
            catch (InterruptedException interruptedException) {
                interrupted = true;
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Closes the connection, logging rather than propagating any
     * {@link IOException} raised by the close.
     */
    private void closeQuietly() {
        try {
            close();
        }
        catch (IOException closeFailure) {
            LOG.log(Level.WARNING, "I/O exception trying to shutdown the connection.", closeFailure);
        }
    }

    protected class SendRunnable
    implements Runnable {
        private boolean myNeedToFlush = false;
        private final PendingMessage myPendingMessage = new PendingMessage();

        protected SendRunnable() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        @Override
        public void run() {
            boolean bl = false;
            try {
                while (SocketConnection.this.myOpen.get() && !bl) {
                    try {
                        this.sendOne();
                    }
                    catch (InterruptedException interruptedException) {
                        SocketConnection.this.raiseError(interruptedException, this.myPendingMessage.getReplyCallback());
                    }
                    catch (IOException iOException) {
                        LOG.log(Level.WARNING, "I/O Error sending a message.", iOException);
                        SocketConnection.this.raiseError(iOException, this.myPendingMessage.getReplyCallback());
                        bl = true;
                    }
                    catch (RuntimeException runtimeException) {
                        LOG.log(Level.WARNING, "Runtime error sending a message.", runtimeException);
                        SocketConnection.this.raiseError(runtimeException, this.myPendingMessage.getReplyCallback());
                        bl = true;
                    }
                    catch (Error error) {
                        LOG.log(Level.SEVERE, "Error sending a message.", error);
                        SocketConnection.this.raiseError(error, this.myPendingMessage.getReplyCallback());
                        bl = true;
                    }
                    finally {
                        this.myPendingMessage.clear();
                    }
                }
                return;
            }
            finally {
                try {
                    if (SocketConnection.this.myOpen.get()) {
                        this.doFlush();
                    }
                }
                catch (IOException iOException) {
                    LOG.log(Level.WARNING, "I/O Error on final flush of messages.", iOException);
                }
                finally {
                    IOUtils.close(SocketConnection.this);
                }
            }
        }

        protected final void doFlush() throws IOException {
            if (this.myNeedToFlush) {
                SocketConnection.this.flush();
                this.myNeedToFlush = false;
            }
        }

        protected final void sendOne() throws InterruptedException, IOException {
            boolean bl = false;
            if (this.myNeedToFlush) {
                bl = SocketConnection.this.myToSendQueue.poll(this.myPendingMessage);
            } else {
                SocketConnection.this.myToSendQueue.take(this.myPendingMessage);
                bl = true;
            }
            if (bl) {
                int n = this.myPendingMessage.getMessageId();
                Message message = this.myPendingMessage.getMessage();
                if (this.myPendingMessage.getReplyCallback() != null && !SocketConnection.this.myPendingQueue.offer(this.myPendingMessage)) {
                    this.doFlush();
                    SocketConnection.this.myPendingQueue.put(this.myPendingMessage);
                }
                this.myNeedToFlush = true;
                SocketConnection.this.doSend(n, message);
                if (SocketConnection.this.myShutdown.get()) {
                    this.doFlush();
                }
                this.myPendingMessage.clear();
            } else {
                this.doFlush();
            }
        }
    }

    /**
     * Runnable for the receiver thread: reads messages from the socket and
     * delivers each reply to the callback of the pending message it answers.
     */
    protected class ReceiveRunnable
    implements Runnable {
        /** Holder for the pending message being matched against a reply. */
        private final PendingMessage myPendingMessage = new PendingMessage();

        /**
         * Creates a new ReceiveRunnable.
         */
        protected ReceiveRunnable() {
        }

        /**
         * Receives messages until the connection is no longer open, a
         * receive fails, or the connection is shutting down and both the
         * to-send and pending queues are empty. The connection is always
         * closed on exit.
         */
        @Override
        public void run() {
            try {
                while (myOpen.get()) {
                    try {
                        receiveOne();

                        // Shutting down with nothing left to send or to
                        // wait for: the work is done.
                        if (myShutdown.get() && myToSendQueue.isEmpty() && myPendingQueue.isEmpty()) {
                            return;
                        }
                    }
                    catch (MongoDbException mongoDbException) {
                        // A failure after the connection was closed is
                        // expected and not worth reporting.
                        if (myOpen.get()) {
                            LOG.log(Level.WARNING, "Error reading a message: " + mongoDbException.getMessage(), mongoDbException);
                        }
                        return;
                    }
                }
            }
            finally {
                IOUtils.close(SocketConnection.this);
            }
        }

        /**
         * Receives a single message and, if it is a reply, delivers it to
         * the callback of the pending message with the matching id. Pending
         * messages polled ahead of the match have {@link #NO_REPLY} raised
         * on their callbacks. A warning is logged only when no pending
         * message matches the reply, or when a non-reply message arrives.
         */
        protected final void receiveOne() {
            final Message message = doReceive();
            if (message instanceof Reply) {
                final Reply reply = (Reply)message;
                final int replyId = reply.getResponseToId();
                try {
                    boolean took = myPendingQueue.poll(myPendingMessage);
                    while (took && (myPendingMessage.getMessageId() != replyId)) {
                        raiseError(NO_REPLY, myPendingMessage.getReplyCallback());
                        took = myPendingQueue.poll(myPendingMessage);
                    }

                    if (took) {
                        reply(reply, myPendingMessage.getReplyCallback());
                    }
                    else {
                        // Only a reply nobody is waiting for is suspicious;
                        // a delivered reply must not be reported as lost.
                        LOG.warning("Could not find the callback for reply '" + replyId + "'.");
                    }
                }
                finally {
                    myPendingMessage.clear();
                }
            }
            else if (message != null) {
                LOG.warning("Received a non-Reply message: " + message);
            }
        }
    }

    /**
     * A callback that ignores both the reply and any exception. Used for the
     * request queued by {@link SocketConnection#shutdown()}.
     */
    protected static final class NoopCallback
    implements Callback<Reply> {
        /**
         * Creates a new NoopCallback.
         */
        protected NoopCallback() {
            // Nothing to initialize.
        }

        /**
         * Ignores the reply.
         *
         * @param reply
         *            The reply, which is discarded.
         */
        @Override
        public void callback(Reply reply) {
            // Intentionally empty.
        }

        /**
         * Ignores the failure.
         *
         * @param throwable
         *            The failure, which is discarded.
         */
        @Override
        public void exception(Throwable throwable) {
            // Intentionally empty.
        }
    }
}

