package pt.unl.fct.di.novasys.channel.tcp;

import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.Promise;
import java.io.IOException;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import pt.unl.fct.di.novasys.channel.ChannelListener;
import pt.unl.fct.di.novasys.channel.base.SingleThreadedBiChannel;
import pt.unl.fct.di.novasys.channel.tcp.ConnectionState;
import pt.unl.fct.di.novasys.channel.tcp.events.ChannelMetrics;
import pt.unl.fct.di.novasys.channel.tcp.events.InConnectionDown;
import pt.unl.fct.di.novasys.channel.tcp.events.InConnectionUp;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionDown;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionFailed;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionUp;
import pt.unl.fct.di.novasys.network.AttributeValidator;
import pt.unl.fct.di.novasys.network.Connection;
import pt.unl.fct.di.novasys.network.ISerializer;
import pt.unl.fct.di.novasys.network.NetworkManager;
import pt.unl.fct.di.novasys.network.data.Attributes;
import pt.unl.fct.di.novasys.network.data.Host;

/* loaded from: input_file:pt/unl/fct/di/novasys/channel/tcp/TCPChannel.class */
public class TCPChannel<T> extends SingleThreadedBiChannel<T, T> implements AttributeValidator {
    private static final Logger logger = LogManager.getLogger((Class<?>) TCPChannel.class);
    private static final short TCP_MAGIC_NUMBER = 17669;
    public static final String NAME = "TCPChannel";
    public static final String ADDRESS_KEY = "address";
    public static final String PORT_KEY = "port";
    public static final String WORKER_GROUP_KEY = "worker_group";
    public static final String TRIGGER_SENT_KEY = "trigger_sent";
    public static final String METRICS_INTERVAL_KEY = "metrics_interval";
    public static final String HEARTBEAT_INTERVAL_KEY = "heartbeat_interval";
    public static final String HEARTBEAT_TOLERANCE_KEY = "heartbeat_tolerance";
    public static final String CONNECT_TIMEOUT_KEY = "connect_timeout";
    public static final String LISTEN_ADDRESS_ATTRIBUTE = "listen_address";
    public static final String DEFAULT_PORT = "8573";
    public static final String DEFAULT_HB_INTERVAL = "0";
    public static final String DEFAULT_HB_TOLERANCE = "0";
    public static final String DEFAULT_CONNECT_TIMEOUT = "1000";
    public static final String DEFAULT_METRICS_INTERVAL = "-1";
    public static final int CONNECTION_OUT = 0;
    public static final int CONNECTION_IN = 1;
    private final NetworkManager<T> network;
    private final ChannelListener<T> listener;
    private final Attributes attributes;
    private final Map<Host, LinkedList<Connection<T>>> inConnections;
    private final Map<Host, ConnectionState<T>> outConnections;
    private List<Pair<Host, Connection<T>>> oldIn;
    private List<Pair<Host, ConnectionState<T>>> oldOUt;
    private final boolean triggerSent;
    private final boolean metrics;

    public TCPChannel(ISerializer<T> iSerializer, ChannelListener<T> channelListener, Properties properties) throws IOException {
        super(NAME);
        this.listener = channelListener;
        if (!properties.containsKey("address")) {
            throw new IllegalArgumentException("TCPChannel requires binding address");
        }
        InetAddress byName = Inet4Address.getByName(properties.getProperty("address"));
        int parseInt = Integer.parseInt(properties.getProperty("port", DEFAULT_PORT));
        int parseInt2 = Integer.parseInt(properties.getProperty("heartbeat_interval", "0"));
        int parseInt3 = Integer.parseInt(properties.getProperty("heartbeat_tolerance", "0"));
        int parseInt4 = Integer.parseInt(properties.getProperty("connect_timeout", "1000"));
        int parseInt5 = Integer.parseInt(properties.getProperty("metrics_interval", "-1"));
        this.triggerSent = Boolean.parseBoolean(properties.getProperty("trigger_sent", "false"));
        this.metrics = parseInt5 > 0;
        Host host = new Host(byName, parseInt);
        EventLoopGroup createNewWorkerGroup = properties.containsKey("worker_group") ? (EventLoopGroup) properties.get("worker_group") : NetworkManager.createNewWorkerGroup();
        this.network = new NetworkManager<>(iSerializer, this, parseInt2, parseInt3, parseInt4, createNewWorkerGroup);
        this.network.createServerSocket(this, host, this, createNewWorkerGroup);
        this.attributes = new Attributes();
        this.attributes.putShort("magic_number", (short) 17669);
        this.attributes.putHost("listen_address", host);
        this.inConnections = new HashMap();
        this.outConnections = new HashMap();
        if (this.metrics) {
            this.oldIn = new LinkedList();
            this.oldOUt = new LinkedList();
            this.loop.scheduleAtFixedRate(this::triggerMetricsEvent, parseInt5, parseInt5, TimeUnit.MILLISECONDS);
        }
    }

    void triggerMetricsEvent() {
        this.listener.deliverEvent(new ChannelMetrics(this.oldIn, this.oldOUt, this.inConnections, this.outConnections));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // pt.unl.fct.di.novasys.channel.base.SingleThreadedChannel
    public void onOpenConnection(Host host, int i) {
        ConnectionState<T> connectionState = this.outConnections.get(host);
        if (connectionState == null) {
            logger.debug("onOpenConnection creating connection to: " + host);
            this.outConnections.put(host, new ConnectionState<>(this.network.createConnection(host, this.attributes, this)));
        } else if (connectionState.getState() != ConnectionState.State.DISCONNECTING) {
            logger.debug("onOpenConnection ignored: " + host);
        } else {
            logger.debug("onOpenConnection reopening after close to: " + host);
            connectionState.setState(ConnectionState.State.DISCONNECTING_RECONNECT);
        }
    }

    @Override // pt.unl.fct.di.novasys.channel.base.SingleThreadedChannel
    protected void onSendMessage(T t, Host host, int i) {
        logger.debug("SendMessage " + t + " " + host + " " + (i == 1 ? "IN" : "OUT"));
        if (i > 0) {
            if (i != 1) {
                this.listener.messageFailed(t, host, new IllegalArgumentException("Invalid connection: " + i));
                logger.error("Invalid sendMessage mode " + i);
                return;
            }
            LinkedList<Connection<T>> linkedList = this.inConnections.get(host);
            if (linkedList != null) {
                sendWithListener(t, host, linkedList.getLast());
                return;
            } else {
                this.listener.messageFailed(t, host, new IllegalArgumentException("No incoming connection"));
                return;
            }
        }
        ConnectionState<T> connectionState = this.outConnections.get(host);
        if (connectionState == null) {
            this.listener.messageFailed(t, host, new IllegalArgumentException("No outgoing connection"));
            return;
        }
        if (connectionState.getState() == ConnectionState.State.CONNECTING || connectionState.getState() == ConnectionState.State.DISCONNECTING_RECONNECT) {
            connectionState.getQueue().add(t);
            return;
        }
        if (connectionState.getState() == ConnectionState.State.CONNECTED) {
            sendWithListener(t, host, connectionState.getConnection());
        } else if (connectionState.getState() == ConnectionState.State.DISCONNECTING) {
            connectionState.getQueue().add(t);
            connectionState.setState(ConnectionState.State.DISCONNECTING_RECONNECT);
        }
    }

    private void sendWithListener(T t, Host host, Connection<T> connection) {
        Promise<Void> newPromise = this.loop.newPromise();
        newPromise.addListener2(future -> {
            if (future.isSuccess() && this.triggerSent) {
                this.listener.messageSent(t, host);
            } else {
                if (future.isSuccess()) {
                    return;
                }
                this.listener.messageFailed(t, host, future.cause());
            }
        });
        connection.sendMessage(t, newPromise);
    }

    @Override // pt.unl.fct.di.novasys.channel.base.SingleThreadedBiChannel
    protected void onOutboundConnectionUp(Connection<T> connection) {
        logger.debug("OutboundConnectionUp " + connection.getPeer());
        ConnectionState<T> connectionState = this.outConnections.get(connection.getPeer());
        if (connectionState == null) {
            throw new AssertionError("ConnectionUp with no conState: " + connection);
        }
        if (connectionState.getState() == ConnectionState.State.CONNECTED) {
            throw new AssertionError("ConnectionUp in CONNECTED state: " + connection);
        }
        if (connectionState.getState() == ConnectionState.State.CONNECTING) {
            connectionState.setState(ConnectionState.State.CONNECTED);
            connectionState.getQueue().forEach(obj -> {
                sendWithListener(obj, connection.getPeer(), connection);
            });
            connectionState.getQueue().clear();
            this.listener.deliverEvent(new OutConnectionUp(connection.getPeer()));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // pt.unl.fct.di.novasys.channel.base.SingleThreadedChannel
    public void onCloseConnection(Host host, int i) {
        logger.debug("CloseConnection " + host);
        ConnectionState<T> connectionState = this.outConnections.get(host);
        if (connectionState != null) {
            if (connectionState.getState() == ConnectionState.State.CONNECTED || connectionState.getState() == ConnectionState.State.CONNECTING || connectionState.getState() == ConnectionState.State.DISCONNECTING_RECONNECT) {
                connectionState.setState(ConnectionState.State.DISCONNECTING);
                connectionState.getQueue().clear();
                connectionState.getConnection().disconnect();
            }
        }
    }

    @Override // pt.unl.fct.di.novasys.channel.base.SingleThreadedBiChannel
    protected void onOutboundConnectionDown(Connection<T> connection, Throwable th) {
        logger.debug("OutboundConnectionDown " + connection.getPeer() + (th != null ? " " + th : ""));
        ConnectionState<T> remove = this.outConnections.remove(connection.getPeer());
        if (remove == null) {
            throw new AssertionError("ConnectionDown with no conState: " + connection);
        }
        if (remove.getState() == ConnectionState.State.CONNECTING) {
            throw new AssertionError("ConnectionDown in CONNECTING state: " + connection);
        }
        if (remove.getState() == ConnectionState.State.CONNECTED) {
            this.listener.deliverEvent(new OutConnectionDown(connection.getPeer(), th));
        } else if (remove.getState() == ConnectionState.State.DISCONNECTING_RECONNECT) {
            this.outConnections.put(connection.getPeer(), new ConnectionState<>(this.network.createConnection(connection.getPeer(), this.attributes, this), remove.getQueue()));
        }
        if (this.metrics) {
            this.oldOUt.add(Pair.of(connection.getPeer(), remove));
        }
    }

    @Override // pt.unl.fct.di.novasys.channel.base.SingleThreadedBiChannel
    protected void onOutboundConnectionFailed(Connection<T> connection, Throwable th) {
        logger.debug("OutboundConnectionFailed " + connection.getPeer() + (th != null ? " " + th : ""));
        ConnectionState<T> remove = this.outConnections.remove(connection.getPeer());
        if (remove == null) {
            throw new AssertionError("ConnectionFailed with no conState: " + connection);
        }
        if (remove.getState() == ConnectionState.State.CONNECTING) {
            this.listener.deliverEvent(new OutConnectionFailed(connection.getPeer(), remove.getQueue(), th));
        } else if (remove.getState() == ConnectionState.State.DISCONNECTING_RECONNECT) {
            this.outConnections.put(connection.getPeer(), new ConnectionState<>(this.network.createConnection(connection.getPeer(), this.attributes, this), remove.getQueue()));
        } else if (remove.getState() == ConnectionState.State.CONNECTED) {
            throw new AssertionError("ConnectionFailed in state: " + remove.getState() + " - " + connection);
        }
        if (this.metrics) {
            this.oldOUt.add(Pair.of(connection.getPeer(), remove));
        }
    }

    @Override // pt.unl.fct.di.novasys.channel.base.SingleThreadedBiChannel
    protected void onInboundConnectionUp(Connection<T> connection) {
        try {
            Host host = connection.getPeerAttributes().getHost("listen_address");
            LinkedList<Connection<T>> computeIfAbsent = this.inConnections.computeIfAbsent(host, host2 -> {
                return new LinkedList();
            });
            computeIfAbsent.add(connection);
            if (computeIfAbsent.size() != 1) {
                logger.debug("Multiple InboundConnectionUp " + computeIfAbsent.size() + host);
            } else {
                logger.debug("InboundConnectionUp " + host);
                this.listener.deliverEvent(new InConnectionUp(host));
            }
        } catch (IOException e) {
            logger.error("Inbound connection without valid listen address in connectionUp: " + e.getMessage());
            connection.disconnect();
        }
    }

    @Override // pt.unl.fct.di.novasys.channel.base.SingleThreadedBiChannel
    protected void onInboundConnectionDown(Connection<T> connection, Throwable th) {
        try {
            Host host = connection.getPeerAttributes().getHost("listen_address");
            LinkedList<Connection<T>> linkedList = this.inConnections.get(host);
            if (linkedList == null || linkedList.isEmpty()) {
                throw new AssertionError("No connections in InboundConnectionDown " + host);
            }
            if (!linkedList.remove(connection)) {
                throw new AssertionError("No connection in InboundConnectionDown " + host);
            }
            if (linkedList.isEmpty()) {
                logger.debug("InboundConnectionDown " + host + (th != null ? " " + th : ""));
                this.listener.deliverEvent(new InConnectionDown(host, th));
                this.inConnections.remove(host);
            } else {
                logger.debug("Extra InboundConnectionDown " + linkedList.size() + host);
            }
            if (this.metrics) {
                this.oldIn.add(Pair.of(host, connection));
            }
        } catch (IOException e) {
            logger.error("Inbound connection without valid listen address in connectionDown: " + e.getMessage());
            connection.disconnect();
        }
    }

    @Override // pt.unl.fct.di.novasys.channel.base.SingleThreadedBiChannel
    public void onServerSocketBind(boolean z, Throwable th) {
        if (z) {
            logger.debug("Server socket ready");
        } else {
            logger.error("Server socket bind failed: " + th);
        }
    }

    @Override // pt.unl.fct.di.novasys.channel.base.SingleThreadedBiChannel
    public void onServerSocketClose(boolean z, Throwable th) {
        logger.debug("Server socket closed. " + (z ? "" : "Cause: " + th));
    }

    @Override // pt.unl.fct.di.novasys.channel.base.SingleThreadedChannel
    public void onDeliverMessage(T t, Connection<T> connection) {
        Host host;
        if (connection.isInbound()) {
            try {
                host = connection.getPeerAttributes().getHost("listen_address");
            } catch (IOException e) {
                logger.error("Inbound connection without valid listen address in deliver message: " + e.getMessage());
                connection.disconnect();
                return;
            }
        } else {
            host = connection.getPeer();
        }
        logger.debug("DeliverMessage " + t + " " + host + " " + (connection.isInbound() ? "IN" : "OUT"));
        this.listener.deliverMessage(t, host);
    }

    @Override // pt.unl.fct.di.novasys.network.AttributeValidator
    public boolean validateAttributes(Attributes attributes) {
        Short sh = attributes.getShort("magic_number");
        return sh != null && sh.shortValue() == TCP_MAGIC_NUMBER;
    }
}
