/*
 * Decompiled with CFR 0.152.
 */
package pt.unl.fct.di.novasys.channel.tcp;

import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.Promise;
import java.io.IOException;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import pt.unl.fct.di.novasys.channel.ChannelListener;
import pt.unl.fct.di.novasys.channel.base.SingleThreadedBiChannel;
import pt.unl.fct.di.novasys.channel.tcp.ConnectionState;
import pt.unl.fct.di.novasys.channel.tcp.events.ChannelMetrics;
import pt.unl.fct.di.novasys.channel.tcp.events.InConnectionDown;
import pt.unl.fct.di.novasys.channel.tcp.events.InConnectionUp;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionDown;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionFailed;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionUp;
import pt.unl.fct.di.novasys.network.AttributeValidator;
import pt.unl.fct.di.novasys.network.Connection;
import pt.unl.fct.di.novasys.network.ISerializer;
import pt.unl.fct.di.novasys.network.NetworkManager;
import pt.unl.fct.di.novasys.network.data.Attributes;
import pt.unl.fct.di.novasys.network.data.Host;

/**
 * Bidirectional channel that exchanges messages of type {@code T} over TCP connections
 * managed by a {@link NetworkManager}.
 *
 * <p>Outgoing connections are tracked per peer in {@link #outConnections} through a
 * {@link ConnectionState}, which carries the connection, its state and a queue of messages
 * waiting for the connection to become usable. Incoming connections are tracked in
 * {@link #inConnections}, keyed by the listen address the remote side advertised in its
 * connection attributes ({@link #LISTEN_ADDRESS_ATTRIBUTE}); several incoming connections
 * from the same peer may coexist.
 *
 * <p>NOTE(review): none of the maps are synchronized; this presumably relies on the base
 * class invoking every callback on the single {@code loop} thread - confirm against
 * {@link SingleThreadedBiChannel}.
 *
 * @param <T> type of the messages sent and received through this channel
 */
public class TCPChannel<T> extends SingleThreadedBiChannel<T, T> implements AttributeValidator {
    private static final Logger logger = LogManager.getLogger(TCPChannel.class);

    /** Value exchanged under {@link #MAGIC_NUMBER_ATTRIBUTE} and checked by {@link #validateAttributes}. */
    private static final short TCP_MAGIC_NUMBER = 17669;
    /** Attribute key under which the magic number is sent and validated. */
    private static final String MAGIC_NUMBER_ATTRIBUTE = "magic_number";

    public static final String NAME = "TCPChannel";

    // Property keys read by the constructor.
    public static final String ADDRESS_KEY = "address";
    public static final String PORT_KEY = "port";
    public static final String WORKER_GROUP_KEY = "worker_group";
    public static final String TRIGGER_SENT_KEY = "trigger_sent";
    public static final String METRICS_INTERVAL_KEY = "metrics_interval";
    public static final String HEARTBEAT_INTERVAL_KEY = "heartbeat_interval";
    public static final String HEARTBEAT_TOLERANCE_KEY = "heartbeat_tolerance";
    public static final String CONNECT_TIMEOUT_KEY = "connect_timeout";

    /** Attribute key under which each side advertises its own listen address. */
    public static final String LISTEN_ADDRESS_ATTRIBUTE = "listen_address";

    // Defaults applied when the corresponding property is absent.
    public static final String DEFAULT_PORT = "8573";
    public static final String DEFAULT_HB_INTERVAL = "0";
    public static final String DEFAULT_HB_TOLERANCE = "0";
    public static final String DEFAULT_CONNECT_TIMEOUT = "1000";
    public static final String DEFAULT_METRICS_INTERVAL = "-1";

    /** Connection selector: use the outgoing connection to the peer. */
    public static final int CONNECTION_OUT = 0;
    /** Connection selector: use an incoming connection from the peer. */
    public static final int CONNECTION_IN = 1;

    private final NetworkManager<T> network;
    private final ChannelListener<T> listener;
    /** Attributes sent to peers when opening outgoing connections. */
    private final Attributes attributes;
    /** Live incoming connections per advertised peer address; a mapped list is never left empty. */
    private final Map<Host, LinkedList<Connection<T>>> inConnections;
    /** State of the outgoing connection (if any) per peer. */
    private final Map<Host, ConnectionState<T>> outConnections;
    /** Closed incoming connections, recorded only when metrics are enabled (otherwise {@code null}). */
    private List<Pair<Host, Connection<T>>> oldIn;
    /** Closed/failed outgoing connections, recorded only when metrics are enabled (otherwise {@code null}). */
    private List<Pair<Host, ConnectionState<T>>> oldOUt;
    /** Whether successful sends are reported through {@code listener.messageSent}. */
    private final boolean triggerSent;
    /** Whether periodic {@link ChannelMetrics} events are enabled. */
    private final boolean metrics;

    /**
     * Creates the channel, opens its server socket and, when a positive metrics interval is
     * configured, schedules the periodic metrics event.
     *
     * @param serializer serializer handed to the {@link NetworkManager}
     * @param list       listener receiving messages and channel events
     * @param properties configuration; {@link #ADDRESS_KEY} is mandatory, every other key
     *                   falls back to its {@code DEFAULT_*} value ({@link #TRIGGER_SENT_KEY}
     *                   defaults to {@code false}); {@link #WORKER_GROUP_KEY}, when present,
     *                   must map to an {@link EventLoopGroup}
     * @throws IllegalArgumentException if {@link #ADDRESS_KEY} is missing
     * @throws NumberFormatException    if a numeric property cannot be parsed
     * @throws IOException              if the address cannot be resolved, or propagated from
     *                                  the network layer
     */
    public TCPChannel(ISerializer<T> serializer, ChannelListener<T> list, Properties properties) throws IOException {
        super(NAME);
        this.listener = list;
        if (!properties.containsKey(ADDRESS_KEY)) {
            throw new IllegalArgumentException("TCPChannel requires binding address");
        }
        // getByName is a static method of InetAddress; it may resolve to either address family.
        InetAddress addr = InetAddress.getByName(properties.getProperty(ADDRESS_KEY));
        int port = Integer.parseInt(properties.getProperty(PORT_KEY, DEFAULT_PORT));
        int hbInterval = Integer.parseInt(properties.getProperty(HEARTBEAT_INTERVAL_KEY, DEFAULT_HB_INTERVAL));
        int hbTolerance = Integer.parseInt(properties.getProperty(HEARTBEAT_TOLERANCE_KEY, DEFAULT_HB_TOLERANCE));
        int connTimeout = Integer.parseInt(properties.getProperty(CONNECT_TIMEOUT_KEY, DEFAULT_CONNECT_TIMEOUT));
        int metricsInterval = Integer.parseInt(properties.getProperty(METRICS_INTERVAL_KEY, DEFAULT_METRICS_INTERVAL));
        this.triggerSent = Boolean.parseBoolean(properties.getProperty(TRIGGER_SENT_KEY, "false"));
        this.metrics = metricsInterval > 0;

        Host listenAddress = new Host(addr, port);
        // Reuse a caller-supplied worker group when given, otherwise create a dedicated one.
        EventLoopGroup eventExecutors = properties.containsKey(WORKER_GROUP_KEY)
                ? (EventLoopGroup) properties.get(WORKER_GROUP_KEY)
                : NetworkManager.createNewWorkerGroup();
        this.network = new NetworkManager<T>(serializer, this, hbInterval, hbTolerance, connTimeout, eventExecutors);
        this.network.createServerSocket(this, listenAddress, (AttributeValidator) this, eventExecutors);

        this.attributes = new Attributes();
        this.attributes.putShort(MAGIC_NUMBER_ATTRIBUTE, TCP_MAGIC_NUMBER);
        this.attributes.putHost(LISTEN_ADDRESS_ATTRIBUTE, listenAddress);

        this.inConnections = new HashMap<Host, LinkedList<Connection<T>>>();
        this.outConnections = new HashMap<Host, ConnectionState<T>>();
        if (this.metrics) {
            this.oldIn = new LinkedList<Pair<Host, Connection<T>>>();
            this.oldOUt = new LinkedList<Pair<Host, ConnectionState<T>>>();
            this.loop.scheduleAtFixedRate(this::triggerMetricsEvent, (long) metricsInterval, (long) metricsInterval, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Delivers a {@link ChannelMetrics} event built from the closed and live connections.
     * Only scheduled when metrics are enabled.
     *
     * <p>NOTE(review): nothing in this class ever removes entries from {@code oldIn} /
     * {@code oldOUt}, so they grow for as long as connections keep closing - confirm whether
     * {@link ChannelMetrics} is expected to prune them.
     */
    void triggerMetricsEvent() {
        this.listener.deliverEvent(new ChannelMetrics(this.oldIn, this.oldOUt, this.inConnections, this.outConnections));
    }

    /**
     * Opens an outgoing connection to {@code peer} unless one already exists. If the existing
     * connection is being closed, it is flagged to be re-established once it goes down.
     *
     * @param peer       peer to connect to
     * @param connection unused by this implementation
     */
    @Override
    protected void onOpenConnection(Host peer, int connection) {
        ConnectionState<T> conState = this.outConnections.get(peer);
        if (conState == null) {
            logger.debug("onOpenConnection creating connection to: {}", peer);
            this.outConnections.put(peer, new ConnectionState<T>(this.network.createConnection(peer, this.attributes, this)));
        } else if (conState.getState() == ConnectionState.State.DISCONNECTING) {
            logger.debug("onOpenConnection reopening after close to: {}", peer);
            conState.setState(ConnectionState.State.DISCONNECTING_RECONNECT);
        } else {
            logger.debug("onOpenConnection ignored: {}", peer);
        }
    }

    /**
     * Sends {@code msg} to {@code peer} over the selected connection.
     *
     * <p>Any value {@code <= CONNECTION_OUT} selects the outgoing connection: the message is
     * sent immediately when connected, queued while connecting or reconnecting, and a send
     * during a plain disconnect queues the message and requests a reconnect.
     * {@link #CONNECTION_IN} selects the most recently accepted incoming connection. When no
     * suitable connection exists, or the selector is invalid, the listener is notified via
     * {@code messageFailed}.
     *
     * @param msg        message to send
     * @param peer       destination peer
     * @param connection connection selector ({@link #CONNECTION_OUT} or {@link #CONNECTION_IN})
     */
    @Override
    protected void onSendMessage(T msg, Host peer, int connection) {
        // Placeholders avoid rendering msg/peer to strings when debug logging is disabled.
        logger.debug("SendMessage {} {} {}", msg, peer, connection == CONNECTION_IN ? "IN" : "OUT");
        if (connection <= CONNECTION_OUT) {
            ConnectionState<T> conState = this.outConnections.get(peer);
            if (conState != null) {
                ConnectionState.State state = conState.getState();
                if (state == ConnectionState.State.CONNECTING || state == ConnectionState.State.DISCONNECTING_RECONNECT) {
                    conState.getQueue().add(msg);
                } else if (state == ConnectionState.State.CONNECTED) {
                    this.sendWithListener(msg, peer, conState.getConnection());
                } else if (state == ConnectionState.State.DISCONNECTING) {
                    // A send during disconnect implies the caller still wants the connection.
                    conState.getQueue().add(msg);
                    conState.setState(ConnectionState.State.DISCONNECTING_RECONNECT);
                }
            } else {
                this.listener.messageFailed(msg, peer, new IllegalArgumentException("No outgoing connection"));
            }
        } else if (connection == CONNECTION_IN) {
            LinkedList<Connection<T>> inConnList = this.inConnections.get(peer);
            if (inConnList != null) {
                // Lists stored in inConnections always hold at least one connection.
                this.sendWithListener(msg, peer, inConnList.getLast());
            } else {
                this.listener.messageFailed(msg, peer, new IllegalArgumentException("No incoming connection"));
            }
        } else {
            this.listener.messageFailed(msg, peer, new IllegalArgumentException("Invalid connection: " + connection));
            logger.error("Invalid sendMessage mode " + connection);
        }
    }

    /**
     * Sends {@code msg} over {@code established} and reports the outcome to the listener:
     * failures always, successes only when {@code triggerSent} is enabled.
     */
    private void sendWithListener(T msg, Host peer, Connection<T> established) {
        Promise<Void> promise = this.loop.newPromise();
        promise.addListener(future -> {
            if (future.isSuccess() && this.triggerSent) {
                this.listener.messageSent(msg, peer);
            } else if (!future.isSuccess()) {
                this.listener.messageFailed(msg, peer, future.cause());
            }
        });
        established.sendMessage(msg, promise);
    }

    /**
     * Handles an outgoing connection becoming established: when it was in the connecting
     * state, marks it connected, flushes queued messages and raises {@link OutConnectionUp}.
     * In the disconnecting states the notification is ignored.
     *
     * @throws AssertionError if the connection is unknown or already marked connected
     */
    @Override
    protected void onOutboundConnectionUp(Connection<T> conn) {
        logger.debug("OutboundConnectionUp {}", conn.getPeer());
        ConnectionState<T> conState = this.outConnections.get(conn.getPeer());
        if (conState == null) {
            throw new AssertionError("ConnectionUp with no conState: " + conn);
        }
        if (conState.getState() == ConnectionState.State.CONNECTED) {
            throw new AssertionError("ConnectionUp in CONNECTED state: " + conn);
        }
        if (conState.getState() == ConnectionState.State.CONNECTING) {
            conState.setState(ConnectionState.State.CONNECTED);
            conState.getQueue().forEach(m -> this.sendWithListener(m, conn.getPeer(), conn));
            conState.getQueue().clear();
            this.listener.deliverEvent(new OutConnectionUp(conn.getPeer()));
        }
    }

    /**
     * Closes the outgoing connection to {@code peer}, if there is one that is not already
     * plainly disconnecting. Queued messages are discarded without notifying the listener.
     *
     * @param peer       peer whose outgoing connection should be closed
     * @param connection unused by this implementation; only outgoing connections are closed
     */
    @Override
    protected void onCloseConnection(Host peer, int connection) {
        logger.debug("CloseConnection {}", peer);
        ConnectionState<T> conState = this.outConnections.get(peer);
        if (conState == null) {
            return;
        }
        ConnectionState.State state = conState.getState();
        if (state == ConnectionState.State.CONNECTED
                || state == ConnectionState.State.CONNECTING
                || state == ConnectionState.State.DISCONNECTING_RECONNECT) {
            conState.setState(ConnectionState.State.DISCONNECTING);
            conState.getQueue().clear();
            conState.getConnection().disconnect();
        }
    }

    /**
     * Handles an outgoing connection going down. The state entry is removed; a connected
     * connection raises {@link OutConnectionDown}, and a pending reconnect request opens a
     * fresh connection that inherits the queued messages.
     *
     * @throws AssertionError if the connection is unknown or was still connecting
     */
    @Override
    protected void onOutboundConnectionDown(Connection<T> conn, Throwable cause) {
        logger.debug("OutboundConnectionDown {}{}", conn.getPeer(), causeSuffix(cause));
        ConnectionState<T> conState = this.outConnections.remove(conn.getPeer());
        if (conState == null) {
            throw new AssertionError("ConnectionDown with no conState: " + conn);
        }
        if (conState.getState() == ConnectionState.State.CONNECTING) {
            throw new AssertionError("ConnectionDown in CONNECTING state: " + conn);
        }
        if (conState.getState() == ConnectionState.State.CONNECTED) {
            this.listener.deliverEvent(new OutConnectionDown(conn.getPeer(), cause));
        } else if (conState.getState() == ConnectionState.State.DISCONNECTING_RECONNECT) {
            this.outConnections.put(conn.getPeer(), new ConnectionState<T>(this.network.createConnection(conn.getPeer(), this.attributes, this), conState.getQueue()));
        }
        if (this.metrics) {
            this.oldOUt.add(Pair.of(conn.getPeer(), conState));
        }
    }

    /**
     * Handles an outgoing connection attempt failing. The state entry is removed; a
     * connecting attempt raises {@link OutConnectionFailed} carrying the queued messages,
     * and a pending reconnect request opens a fresh connection that inherits the queue.
     *
     * @throws AssertionError if the connection is unknown or was marked connected
     */
    @Override
    protected void onOutboundConnectionFailed(Connection<T> conn, Throwable cause) {
        logger.debug("OutboundConnectionFailed {}{}", conn.getPeer(), causeSuffix(cause));
        ConnectionState<T> conState = this.outConnections.remove(conn.getPeer());
        if (conState == null) {
            throw new AssertionError("ConnectionFailed with no conState: " + conn);
        }
        if (conState.getState() == ConnectionState.State.CONNECTING) {
            this.listener.deliverEvent(new OutConnectionFailed<T>(conn.getPeer(), conState.getQueue(), cause));
        } else if (conState.getState() == ConnectionState.State.DISCONNECTING_RECONNECT) {
            this.outConnections.put(conn.getPeer(), new ConnectionState<T>(this.network.createConnection(conn.getPeer(), this.attributes, this), conState.getQueue()));
        } else if (conState.getState() == ConnectionState.State.CONNECTED) {
            throw new AssertionError("ConnectionFailed in state: " + conState.getState() + " - " + conn);
        }
        if (this.metrics) {
            this.oldOUt.add(Pair.of(conn.getPeer(), conState));
        }
    }

    /**
     * Registers a newly accepted incoming connection under the listen address advertised by
     * the peer. {@link InConnectionUp} is raised only for the first connection from that
     * address. A connection without a readable listen address is disconnected.
     */
    @Override
    protected void onInboundConnectionUp(Connection<T> con) {
        Host clientSocket;
        try {
            clientSocket = con.getPeerAttributes().getHost(LISTEN_ADDRESS_ATTRIBUTE);
        } catch (IOException e) {
            logger.error("Inbound connection without valid listen address in connectionUp: " + e.getMessage());
            con.disconnect();
            return;
        }
        LinkedList<Connection<T>> inConnList = this.inConnections.computeIfAbsent(clientSocket, k -> new LinkedList<>());
        inConnList.add(con);
        if (inConnList.size() == 1) {
            logger.debug("InboundConnectionUp {}", clientSocket);
            this.listener.deliverEvent(new InConnectionUp(clientSocket));
        } else {
            logger.debug("Multiple InboundConnectionUp {}{}", inConnList.size(), clientSocket);
        }
    }

    /**
     * Unregisters an incoming connection that went down. {@link InConnectionDown} is raised
     * only when it was the last connection from that peer address, in which case the map
     * entry is removed as well.
     *
     * @throws AssertionError if the connection was not registered
     */
    @Override
    protected void onInboundConnectionDown(Connection<T> con, Throwable cause) {
        Host clientSocket;
        try {
            clientSocket = con.getPeerAttributes().getHost(LISTEN_ADDRESS_ATTRIBUTE);
        } catch (IOException e) {
            logger.error("Inbound connection without valid listen address in connectionDown: " + e.getMessage());
            con.disconnect();
            return;
        }
        LinkedList<Connection<T>> inConnList = this.inConnections.get(clientSocket);
        if (inConnList == null || inConnList.isEmpty()) {
            throw new AssertionError("No connections in InboundConnectionDown " + clientSocket);
        }
        if (!inConnList.remove(con)) {
            throw new AssertionError("No connection in InboundConnectionDown " + clientSocket);
        }
        if (inConnList.isEmpty()) {
            logger.debug("InboundConnectionDown {}{}", clientSocket, causeSuffix(cause));
            this.listener.deliverEvent(new InConnectionDown(clientSocket, cause));
            this.inConnections.remove(clientSocket);
        } else {
            logger.debug("Extra InboundConnectionDown {}{}", inConnList.size(), clientSocket);
        }
        if (this.metrics) {
            this.oldIn.add(Pair.of(clientSocket, con));
        }
    }

    /** Logs the outcome of binding the server socket. */
    @Override
    public void onServerSocketBind(boolean success, Throwable cause) {
        if (success) {
            logger.debug("Server socket ready");
        } else {
            logger.error("Server socket bind failed: " + cause);
        }
    }

    /** Logs the server socket being closed, including the cause when closing did not succeed. */
    @Override
    public void onServerSocketClose(boolean success, Throwable cause) {
        logger.debug("Server socket closed. {}", success ? "" : "Cause: " + cause);
    }

    /**
     * Forwards a received message to the listener. For incoming connections the sender is
     * identified by its advertised listen address; for outgoing ones by the connection's
     * peer. An incoming connection without a readable listen address is disconnected and the
     * message is dropped.
     */
    @Override
    public void onDeliverMessage(T msg, Connection<T> conn) {
        Host host;
        if (conn.isInbound()) {
            try {
                host = conn.getPeerAttributes().getHost(LISTEN_ADDRESS_ATTRIBUTE);
            } catch (IOException e) {
                logger.error("Inbound connection without valid listen address in deliver message: " + e.getMessage());
                conn.disconnect();
                return;
            }
        } else {
            host = conn.getPeer();
        }
        logger.debug("DeliverMessage {} {} {}", msg, host, conn.isInbound() ? "IN" : "OUT");
        this.listener.deliverMessage(msg, host);
    }

    /**
     * Accepts a peer's attributes only when they carry this channel's magic number.
     *
     * @param attr attributes received from the peer
     * @return {@code true} if the magic number is present and matches
     */
    @Override
    public boolean validateAttributes(Attributes attr) {
        Short channel = attr.getShort(MAGIC_NUMBER_ATTRIBUTE);
        return channel != null && channel == TCP_MAGIC_NUMBER;
    }

    /** Formats an optional cause as a log suffix: {@code " <cause>"}, or empty when absent. */
    private static String causeSuffix(Throwable cause) {
        return cause != null ? " " + cause : "";
    }
}

