package pt.unl.fct.di.novasys.channel.tcp;

import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.Promise;
import java.io.IOException;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.util.LinkedList;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.collections4.BidiMap;
import org.apache.commons.collections4.bidimap.DualHashBidiMap;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import pt.unl.fct.di.novasys.channel.ChannelListener;
import pt.unl.fct.di.novasys.channel.IChannel;
import pt.unl.fct.di.novasys.channel.tcp.events.InConnectionDown;
import pt.unl.fct.di.novasys.channel.tcp.events.InConnectionUp;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionDown;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionFailed;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionUp;
import pt.unl.fct.di.novasys.network.AttributeValidator;
import pt.unl.fct.di.novasys.network.Connection;
import pt.unl.fct.di.novasys.network.ISerializer;
import pt.unl.fct.di.novasys.network.NetworkManager;
import pt.unl.fct.di.novasys.network.data.Attributes;
import pt.unl.fct.di.novasys.network.data.Host;
import pt.unl.fct.di.novasys.network.listeners.InConnListener;
import pt.unl.fct.di.novasys.network.listeners.MessageListener;
import pt.unl.fct.di.novasys.network.listeners.OutConnListener;

/* loaded from: input_file:pt/unl/fct/di/novasys/channel/tcp/MultithreadedTCPChannel.class */
/**
 * TCP channel that tracks outgoing and incoming connections per peer and forwards
 * connection events and messages to a {@link ChannelListener}.
 *
 * <p>Every network callback in this class asserts that it runs on the event loop of the
 * connection it concerns, so callbacks for different connections may run on different
 * threads. Outgoing state is kept in concurrent maps; the incoming-connection map is a
 * plain {@link DualHashBidiMap} and every access to it is guarded by its own monitor.
 *
 * @param <T> type of the messages carried by the channel
 */
public class MultithreadedTCPChannel<T> implements IChannel<T>, MessageListener<T>, InConnListener<T>, OutConnListener<T>, AttributeValidator {
    private static final Logger logger = LogManager.getLogger(MultithreadedTCPChannel.class);
    /** Value stored under {@link AttributeValidator#CHANNEL_MAGIC_ATTRIBUTE} and required from peers. */
    private static final short TCP_MAGIC_NUMBER = 17669;
    public static final String NAME = "MultithreadedTCPChannel";
    /** Property key of the (mandatory) bind address. */
    public static final String ADDRESS_KEY = "address";
    /** Property key of the (optional) bind port; defaults to {@link #DEFAULT_PORT}. */
    public static final String PORT_KEY = "port";
    /** Property key of an (optional) {@link EventLoopGroup} to use instead of creating one. */
    public static final String WORKER_GROUP_KEY = "workerGroup";
    /** Connection attribute under which each side advertises its listen address. */
    public static final String LISTEN_ADDRESS_ATTRIBUTE = "listen_address";
    public static final int DEFAULT_PORT = 8573;
    /** Connection selector: the connection this side opened towards the peer. */
    public static final int CONNECTION_OUT = 0;
    /** Connection selector: the connection the peer opened towards this side. */
    public static final int CONNECTION_IN = 1;

    private final NetworkManager<T> network;
    private final ChannelListener<T> listener;
    /** Attributes sent to peers when opening outgoing connections. */
    private final Attributes attributes;
    /** Outgoing connections still being established, with the messages queued for them. */
    private final Map<Host, Pair<Connection<T>, Queue<T>>> pendingOut;
    /** Outgoing connections that are up. */
    private final Map<Host, Connection<T>> establishedOut;
    /** Incoming connections keyed by the peer's advertised listen address; guarded by its own monitor. */
    private final BidiMap<Host, Connection<T>> establishedIn;

    /**
     * Creates the channel and starts listening for incoming connections.
     *
     * @param iSerializer     serializer handed to the underlying {@link NetworkManager}
     * @param channelListener receiver of connection events and messages
     * @param properties      configuration; must contain {@link #ADDRESS_KEY}, may contain
     *                        {@link #PORT_KEY} (a {@link Number} or a numeric string) and
     *                        {@link #WORKER_GROUP_KEY} (an {@link EventLoopGroup})
     * @throws IllegalArgumentException if the address is missing or the port is not numeric
     * @throws IOException              declared by the original signature (address resolution)
     */
    public MultithreadedTCPChannel(ISerializer<T> iSerializer, ChannelListener<T> channelListener, Properties properties) throws IOException {
        this.listener = channelListener;
        if (!properties.containsKey(ADDRESS_KEY)) {
            throw new IllegalArgumentException("MultithreadedTCPChannel requires binding address");
        }
        InetAddress bindAddress = Inet4Address.getByName(properties.getProperty(ADDRESS_KEY));
        int port = resolvePort(properties);
        Host listenHost = new Host(bindAddress, port);
        EventLoopGroup workerGroup = properties.containsKey(WORKER_GROUP_KEY)
                ? (EventLoopGroup) properties.get(WORKER_GROUP_KEY)
                : NetworkManager.createNewWorkerGroup(0);

        // Set up all state BEFORE handing `this` to the network layer: once the server
        // socket exists, callbacks may start touching these fields from other threads.
        this.attributes = new Attributes();
        this.attributes.putShort(AttributeValidator.CHANNEL_MAGIC_ATTRIBUTE, TCP_MAGIC_NUMBER);
        this.attributes.putHost(LISTEN_ADDRESS_ATTRIBUTE, listenHost);
        this.pendingOut = new ConcurrentHashMap<>();
        this.establishedOut = new ConcurrentHashMap<>();
        this.establishedIn = new DualHashBidiMap<>();

        // NOTE(review): 1000/3000/1000 are presumably heartbeat interval, heartbeat
        // tolerance and connect timeout in ms - confirm against NetworkManager.
        this.network = new NetworkManager<>(iSerializer, this, 1000, 3000, 1000, workerGroup);
        this.network.createServerSocket(this, listenHost, this, workerGroup);
    }

    /**
     * Reads the port from the properties, accepting either a numeric object or a numeric string.
     *
     * @return the configured port, or {@link #DEFAULT_PORT} when none is configured
     * @throws IllegalArgumentException if the configured value cannot be parsed as an integer
     */
    private static int resolvePort(Properties properties) {
        Object configured = properties.get(PORT_KEY);
        if (configured == null) {
            return DEFAULT_PORT;
        }
        if (configured instanceof Number) {
            return ((Number) configured).intValue();
        }
        try {
            return Integer.parseInt(configured.toString().trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid port for " + NAME + ": " + configured, e);
        }
    }

    /**
     * Starts an outgoing connection to {@code host} unless one is already up or pending.
     *
     * @param host peer to connect to
     * @param i    connection selector; not used by this implementation
     */
    @Override // pt.unl.fct.di.novasys.channel.IChannel
    public void openConnection(Host host, int i) {
        if (this.establishedOut.containsKey(host)) {
            return;
        }
        this.pendingOut.computeIfAbsent(host, peer -> {
            Queue<T> backlog = new LinkedList<>();
            return Pair.of(this.network.createConnection(peer, this.attributes, this), backlog);
        });
    }

    /**
     * Sends {@code t} to {@code host} over the selected connection.
     *
     * <p>For {@code i <= 0} the outgoing connection is used; if it is still pending the
     * message is queued until the connection is up. For {@link #CONNECTION_IN} the incoming
     * connection is used. Any other value, or a missing connection, is reported through
     * {@link ChannelListener#messageFailed}.
     *
     * @param t    message to send
     * @param host destination peer
     * @param i    connection selector
     */
    @Override // pt.unl.fct.di.novasys.channel.IChannel
    public void sendMessage(T t, Host host, int i) {
        logger.debug("SendMessage " + t + " " + host + " " + (i == CONNECTION_IN ? "IN" : "OUT"));
        if (i <= CONNECTION_OUT) {
            Connection<T> outgoing = this.establishedOut.get(host);
            if (outgoing != null) {
                sendWithListener(t, host, outgoing);
                return;
            }
            Pair<Connection<T>, Queue<T>> pending = this.pendingOut.get(host);
            if (pending != null) {
                // NOTE(review): the backlog is a plain LinkedList. If this method is called
                // off the connection's event loop, this add can race with the drain in
                // outboundConnectionUp (message queued after the drain is never sent).
                pending.getValue().add(t);
            } else {
                this.listener.messageFailed(t, host, new IllegalArgumentException("No outgoing connection to peer."));
            }
            return;
        }
        if (i != CONNECTION_IN) {
            this.listener.messageFailed(t, host, new IllegalArgumentException("Invalid send connection: " + i));
            logger.error("Invalid sendMessage connection " + i);
            return;
        }
        Connection<T> incoming;
        // Same monitor as the writers in inboundConnectionUp/Down: the bidi map is not thread-safe.
        synchronized (this.establishedIn) {
            incoming = this.establishedIn.get(host);
        }
        if (incoming != null) {
            sendWithListener(t, host, incoming);
        } else {
            this.listener.messageFailed(t, host, new IllegalArgumentException("No incoming connection"));
        }
    }

    /**
     * Submits the send to the connection's event loop and reports the outcome of the
     * send promise to the listener ({@code messageSent} or {@code messageFailed}).
     */
    private void sendWithListener(T t, Host host, Connection<T> connection) {
        EventLoop loop = connection.getLoop();
        loop.submit(() -> {
            Promise<Void> sendPromise = loop.newPromise();
            sendPromise.addListener(future -> {
                if (future.isSuccess()) {
                    this.listener.messageSent(t, host);
                } else {
                    this.listener.messageFailed(t, host, future.cause());
                }
            });
            connection.sendMessage(t, sendPromise);
        });
    }

    /**
     * Disconnects the pending and/or established outgoing connection to {@code host}.
     * A pending connection is also forgotten, together with its queued messages.
     *
     * @param host peer whose outgoing connection should be closed
     * @param i    connection selector; not used by this implementation
     */
    @Override // pt.unl.fct.di.novasys.channel.IChannel
    public void closeConnection(Host host, int i) {
        logger.debug("CloseConnection " + host);
        Pair<Connection<T>, Queue<T>> pending = this.pendingOut.remove(host);
        if (pending != null) {
            pending.getKey().disconnect();
        }
        Connection<T> established = this.establishedOut.get(host);
        if (established != null) {
            established.disconnect();
        }
    }

    /**
     * Promotes a pending outgoing connection to established, notifies the listener with
     * {@link OutConnectionUp} and then sends the messages that were queued meanwhile.
     *
     * @throws RuntimeException if the pending entry refers to a different connection object
     *                          or an established connection to the peer already exists
     */
    @Override // pt.unl.fct.di.novasys.network.listeners.OutConnListener
    public void outboundConnectionUp(Connection<T> connection) {
        assert connection.getLoop().inEventLoop();
        Host peer = connection.getPeer();
        logger.debug("OutboundConnectionUp " + peer);
        Pair<Connection<T>, Queue<T>> pending = this.pendingOut.remove(peer);
        if (pending == null) {
            logger.warn("ConnectionUp with no pending: " + connection);
            return;
        }
        if (pending.getKey() != connection) {
            throw new RuntimeException("Reference mismatch");
        }
        if (this.establishedOut.put(peer, connection) != null) {
            throw new RuntimeException("Connection already exists in connection up");
        }
        this.listener.deliverEvent(new OutConnectionUp(peer));
        pending.getValue().forEach(queued -> sendWithListener(queued, connection.getPeer(), connection));
    }

    /**
     * Forgets an established outgoing connection and notifies the listener with
     * {@link OutConnectionDown}; only logs a warning if the connection was not tracked.
     */
    @Override // pt.unl.fct.di.novasys.network.listeners.OutConnListener
    public void outboundConnectionDown(Connection<T> connection, Throwable th) {
        assert connection.getLoop().inEventLoop();
        logger.debug("OutboundConnectionDown " + connection.getPeer() + (th != null ? " " + th : ""));
        if (this.establishedOut.remove(connection.getPeer()) != null) {
            this.listener.deliverEvent(new OutConnectionDown(connection.getPeer(), th));
        } else {
            logger.warn("ConnectionDown with no context available: " + connection);
        }
    }

    /**
     * Forgets a pending outgoing connection that could not be established and hands the
     * messages queued for it to the listener inside an {@link OutConnectionFailed} event.
     *
     * @throws RuntimeException if an established connection to the same peer exists
     */
    @Override // pt.unl.fct.di.novasys.network.listeners.OutConnListener
    public void outboundConnectionFailed(Connection<T> connection, Throwable th) {
        assert connection.getLoop().inEventLoop();
        logger.debug("OutboundConnectionFailed " + connection.getPeer() + (th != null ? " " + th : ""));
        if (this.establishedOut.containsKey(connection.getPeer())) {
            throw new RuntimeException("Connection exists in conn failed");
        }
        Pair<Connection<T>, Queue<T>> pending = this.pendingOut.remove(connection.getPeer());
        if (pending != null) {
            this.listener.deliverEvent(new OutConnectionFailed(connection.getPeer(), pending.getRight(), th));
        } else {
            logger.warn("ConnectionFailed with no pending: " + connection);
        }
    }

    /**
     * Registers an incoming connection under the listen address advertised by the peer and
     * notifies the listener with {@link InConnectionUp}. Connections whose listen address
     * is missing or unreadable are disconnected and not registered.
     *
     * @throws RuntimeException if an incoming connection from the same listen address is
     *                          already registered
     */
    @Override // pt.unl.fct.di.novasys.network.listeners.InConnListener
    public void inboundConnectionUp(Connection<T> connection) {
        assert connection.getLoop().inEventLoop();
        try {
            Host advertised = connection.getPeerAttributes().getHost(LISTEN_ADDRESS_ATTRIBUTE);
            if (advertised == null) {
                logger.error("Inbound connection without LISTEN_ADDRESS: " + connection.getPeer() + " " + connection.getPeerAttributes());
                // Without a listen address the connection can never be mapped to a host,
                // so drop it (same handling as an unreadable attribute below).
                connection.disconnect();
                return;
            }
            logger.debug("InboundConnectionUp " + advertised);
            Connection<T> previous;
            synchronized (this.establishedIn) {
                previous = this.establishedIn.putIfAbsent(advertised, connection);
            }
            if (previous != null) {
                throw new RuntimeException("Double incoming connection from: " + advertised + " (" + connection.getPeer() + ")");
            }
            this.listener.deliverEvent(new InConnectionUp(advertised));
        } catch (IOException e) {
            // Pass the exception as well so the stack trace is not lost.
            logger.error("Error parsing LISTEN_ADDRESS_ATTRIBUTE of inbound connection: " + e.getMessage(), e);
            connection.disconnect();
        }
    }

    /**
     * Unregisters an incoming connection and notifies the listener with
     * {@link InConnectionDown}. Connections that were never registered (no
     * {@link InConnectionUp} was delivered for them) only produce a warning.
     */
    @Override // pt.unl.fct.di.novasys.network.listeners.InConnListener
    public void inboundConnectionDown(Connection<T> connection, Throwable th) {
        assert connection.getLoop().inEventLoop();
        Host advertised;
        synchronized (this.establishedIn) {
            advertised = this.establishedIn.removeValue(connection);
        }
        logger.debug("InboundConnectionDown " + advertised + (th != null ? " " + th : ""));
        if (advertised == null) {
            // Never registered (rejected or duplicate connection): there is no host to report.
            logger.warn("InboundConnectionDown with no context available: " + connection);
            return;
        }
        this.listener.deliverEvent(new InConnectionDown(advertised, th));
    }

    /**
     * Forwards a received message to the listener, attributing it to the peer's advertised
     * listen address for incoming connections and to the connection peer otherwise.
     *
     * @throws AssertionError if the message arrived on an unregistered incoming connection
     */
    @Override // pt.unl.fct.di.novasys.network.listeners.MessageListener
    public void deliverMessage(T t, Connection<T> connection) {
        assert connection.getLoop().inEventLoop();
        Host sender;
        if (connection.isInbound()) {
            // Same monitor as the writers: the bidi map is not thread-safe.
            synchronized (this.establishedIn) {
                sender = this.establishedIn.getKey(connection);
            }
            if (sender == null) {
                throw new AssertionError("Null host");
            }
        } else {
            sender = connection.getPeer();
        }
        logger.debug("DeliverMessage " + t + " " + sender + " " + (connection.isInbound() ? "IN" : "OUT"));
        this.listener.deliverMessage(t, sender);
    }

    /**
     * Logs the outcome of binding the server socket.
     *
     * @param z  {@code true} if the bind succeeded
     * @param th failure cause; only used when {@code z} is {@code false}
     */
    @Override // pt.unl.fct.di.novasys.network.listeners.InConnListener
    public void serverSocketBind(boolean z, Throwable th) {
        if (z) {
            logger.debug("Server socket ready");
        } else {
            logger.error("Server socket bind failed: " + th, th);
        }
    }

    /**
     * Logs that the server socket was closed.
     *
     * @param z  {@code true} if the close succeeded
     * @param th failure cause; only logged when {@code z} is {@code false}
     */
    @Override // pt.unl.fct.di.novasys.network.listeners.InConnListener
    public void serverSocketClose(boolean z, Throwable th) {
        logger.debug("Server socket closed. " + (z ? "" : "Cause: " + th));
    }

    /**
     * Accepts a peer only if it advertises this channel's magic number.
     *
     * @param attributes attributes received from the peer
     * @return {@code true} if the magic attribute is present and equals {@code TCP_MAGIC_NUMBER}
     */
    @Override // pt.unl.fct.di.novasys.network.AttributeValidator
    public boolean validateAttributes(Attributes attributes) {
        Short magic = attributes.getShort(AttributeValidator.CHANNEL_MAGIC_ATTRIBUTE);
        return magic != null && magic.shortValue() == TCP_MAGIC_NUMBER;
    }
}
