package pt.unl.fct.di.novasys.babel.protocols.eagerpush;

import java.io.IOException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.security.SignatureException;
import java.security.UnrecoverableEntryException;
import java.security.cert.Certificate;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import pt.unl.fct.di.novasys.babel.core.GenericProtocol;
import pt.unl.fct.di.novasys.babel.exceptions.HandlerRegistrationException;
import pt.unl.fct.di.novasys.babel.generic.ProtoMessage;
import pt.unl.fct.di.novasys.babel.metrics.Counter;
import pt.unl.fct.di.novasys.babel.protocols.eagerpush.messages.GossipMessage;
import pt.unl.fct.di.novasys.babel.protocols.general.notifications.ChannelAvailableNotification;
import pt.unl.fct.di.novasys.babel.protocols.secure.dissemination.notifications.BroadcastDelivery;
import pt.unl.fct.di.novasys.babel.protocols.secure.dissemination.notifications.IdentifiableMessageNotification;
import pt.unl.fct.di.novasys.babel.protocols.secure.dissemination.requests.BroadcastRequest;
import pt.unl.fct.di.novasys.babel.protocols.secure.dissemination.requests.MissingIdentifiableMessageRequest;
import pt.unl.fct.di.novasys.babel.protocols.secure.membership.Peer;
import pt.unl.fct.di.novasys.babel.protocols.secure.membership.notifications.NeighborDown;
import pt.unl.fct.di.novasys.babel.protocols.secure.membership.notifications.NeighborUp;
import pt.unl.fct.di.novasys.channel.secure.events.SecureInConnectionDown;
import pt.unl.fct.di.novasys.channel.secure.events.SecureInConnectionUp;
import pt.unl.fct.di.novasys.channel.secure.events.SecureOutConnectionDown;
import pt.unl.fct.di.novasys.channel.secure.events.SecureOutConnectionFailed;
import pt.unl.fct.di.novasys.channel.secure.events.SecureOutConnectionUp;
import pt.unl.fct.di.novasys.network.data.Bytes;
import pt.unl.fct.di.novasys.network.data.Host;

/* loaded from: input_file:pt/unl/fct/di/novasys/babel/protocols/eagerpush/EagerPushGossipBroadcast.class */
public class EagerPushGossipBroadcast extends GenericProtocol {
    private static final Logger logger = LogManager.getLogger(EagerPushGossipBroadcast.class);
    public static final short PROTOCOL_ID = 1600;
    public static final String PROTOCOL_NAME = "EagerPushGossipBroadcast";
    public static final String PAR_CHANNEL_ADDRESS = "EagerPushGossipBroadcast.Channel.Address";
    public static final String PAR_CHANNEL_PORT = "EagerPushGossipBroadcast.Channel.Port";
    public static final String PAR_FANOUT = "EagerPushGossipBroadcast.Fanout";
    public static final String DEFAULT_FANOUT = "4";
    public static final String PAR_DELIVERY_TIMEOUT = "EagerPushGossipBroadcast.DeliveredTimeout";
    public static final String DEFAULT_DELIVERY_TIEMEOUT = "600000";
    public static final String PAR_SUPPORT_ANTIENTROPHY = "EagerPushGosssipBroadcast.SupportAntiEntropy";
    public static final boolean DEFAULT_SUPPORT_ANTIENTROPHY = false;
    public final int fanout;
    public final long removeTimeWindow;
    public final int networkPort;
    public final boolean supportAntiEntrophy;
    protected int channelId;
    protected final Peer myself;
    private Set<Peer> pending;
    private Set<Peer> connectedNeighbors;
    private Set<Peer> neighbors;
    private final Map<Bytes, List<Pair<GossipMessage, Peer>>> waitingCertificates;
    private LinkedList<UUID> receivedInOrder;
    private HashMap<UUID, Long> receivedTimestamps;
    private final Random r;
    private Counter sentMessagesCounter;

    public EagerPushGossipBroadcast(String str, Properties properties, Host host) throws IOException, HandlerRegistrationException {
        super(PROTOCOL_NAME, (short) 1600);
        String str2;
        this.sentMessagesCounter = registerMetric(new Counter("SentMessages", "", new String[0]));
        this.myself = new Peer(babelSecurity.getDefaultIdentity().identity(), host);
        this.pending = new TreeSet();
        this.connectedNeighbors = new TreeSet();
        this.neighbors = new TreeSet();
        this.waitingCertificates = new HashMap();
        this.receivedInOrder = new LinkedList<>();
        this.receivedTimestamps = new HashMap<>();
        this.fanout = Integer.parseInt(properties.getProperty(PAR_FANOUT, DEFAULT_FANOUT));
        this.removeTimeWindow = Long.parseLong(properties.getProperty(PAR_DELIVERY_TIMEOUT, DEFAULT_DELIVERY_TIEMEOUT));
        this.r = new Random(System.currentTimeMillis());
        if (properties.containsKey(PAR_SUPPORT_ANTIENTROPHY)) {
            this.supportAntiEntrophy = Boolean.parseBoolean(properties.getProperty(PAR_SUPPORT_ANTIENTROPHY));
        } else {
            this.supportAntiEntrophy = false;
        }
        String property = properties.containsKey(PAR_CHANNEL_ADDRESS) ? properties.getProperty(PAR_CHANNEL_ADDRESS) : host.getAddress().getHostAddress().toString();
        if (properties.containsKey(PAR_CHANNEL_PORT)) {
            str2 = properties.getProperty(PAR_CHANNEL_PORT);
            this.networkPort = Integer.parseInt(str2);
        } else {
            this.networkPort = host.getPort();
            str2 = this.networkPort;
        }
        Properties properties2 = new Properties();
        properties2.setProperty("address", property);
        properties2.setProperty("port", str2);
        this.channelId = createSecureChannel("AuthChannel", properties2, this.myself.getIdentity());
        setDefaultChannel(this.channelId);
        logger.debug("Created new channel with id " + this.channelId + " and bounded to: " + property + ":" + str2);
        registerMessageSerializer(this.channelId, (short) 1651, GossipMessage.serializer);
        registerMessageHandler(this.channelId, (short) 1651, this::uponGossipMessage, (v1, v2, v3, v4, v5, v6) -> {
            uponMessageFailed(v1, v2, v3, v4, v5, v6);
        });
        registerRequestHandler((short) 551, this::uponBroadcastRequest);
        subscribeNotification((short) 451, this::uponNeighborUp);
        subscribeNotification((short) 452, this::uponNeighborDown);
        if (this.supportAntiEntrophy) {
            registerRequestHandler((short) 553, this::uponMissingMessageRequest);
        }
        registerChannelEventHandler(this.channelId, (short) 13, this::uponOutConnectionDown);
        registerChannelEventHandler(this.channelId, (short) 14, this::uponOutConnectionFailed);
        registerChannelEventHandler(this.channelId, (short) 15, this::uponOutConnectionUp);
        registerChannelEventHandler(this.channelId, (short) 12, this::uponInConnectionUp);
        registerChannelEventHandler(this.channelId, (short) 11, this::uponInConnectionDown);
    }

    public void init(Properties properties) throws HandlerRegistrationException, IOException {
        triggerNotification(new ChannelAvailableNotification((short) 1600, PROTOCOL_NAME, this.channelId, "AuthChannel", this.myself.getHost()));
    }

    private void uponMessageFailed(ProtoMessage protoMessage, Host host, byte[] bArr, short s, Throwable th, int i) {
        logger.warn("Message failed: msg={}, to={}, destProto={}, cause={}, channelId={}", protoMessage, new Peer(bArr, host), Short.valueOf(s), th, Integer.valueOf(i));
    }

    private void uponBroadcastRequest(BroadcastRequest broadcastRequest, short s) {
        try {
            GossipMessage gossipMessage = new GossipMessage(broadcastRequest.getTimestamp(), this.myself, broadcastRequest.getPayload(), s);
            deliverMessage(gossipMessage.m2clone());
            ArrayList arrayList = new ArrayList(this.connectedNeighbors);
            logger.debug("Received request to broadcast. currenyly I have " + arrayList.size() + " targets. Fanout is " + this.fanout);
            for (int i = this.fanout; arrayList.size() > 0 && i > 0; i--) {
                logger.debug("Sending message " + String.valueOf(gossipMessage.getMID()) + " to a neighbor.");
                sendMessage(gossipMessage.m2clone(), (Peer) arrayList.remove(this.r.nextInt(arrayList.size())));
            }
        } catch (InvalidKeyException | NoSuchAlgorithmException | SignatureException | UnrecoverableEntryException e) {
            logger.error("Failed signing broadcast message: " + String.valueOf(e));
        }
        cleanUp();
    }

    private void deliverMessage(GossipMessage gossipMessage) {
        this.receivedInOrder.addLast(gossipMessage.getMID());
        this.receivedTimestamps.put(gossipMessage.getMID(), Long.valueOf(System.currentTimeMillis()));
        triggerNotification(new BroadcastDelivery(gossipMessage.getOrigin(), gossipMessage.getPayload(), gossipMessage.getTimestamp()));
        if (this.supportAntiEntrophy) {
            triggerNotification(new IdentifiableMessageNotification(gossipMessage, (short) 1600));
        }
    }

    private void cleanUp() {
        long currentTimeMillis = System.currentTimeMillis();
        while (!this.receivedInOrder.isEmpty() && this.receivedTimestamps.get(this.receivedInOrder.pollFirst()).longValue() + this.removeTimeWindow < currentTimeMillis) {
            this.receivedTimestamps.remove(this.receivedInOrder.removeFirst());
        }
    }

    private void uponGossipMessage(GossipMessage gossipMessage, Host host, byte[] bArr, short s, int i) {
        Peer peer = new Peer(bArr, host);
        Peer origin = gossipMessage.getOrigin();
        logger.debug("Received gossip message from {} with origin {}", peer, origin);
        Certificate trustedCertificate = babelSecurity.getTrustedCertificate(origin.getIdentity());
        if (trustedCertificate != null) {
            validateAndProcessGossipMessage(trustedCertificate, gossipMessage, peer);
            return;
        }
        List<Pair<GossipMessage, Peer>> computeIfAbsent = this.waitingCertificates.computeIfAbsent(Bytes.of(origin.getIdentity()), bytes -> {
            return new ArrayList();
        });
        if (computeIfAbsent.isEmpty()) {
            openConnection(origin);
        }
        computeIfAbsent.add(Pair.of(gossipMessage, peer));
    }

    private void validateAndProcessGossipMessage(Certificate certificate, GossipMessage gossipMessage, Peer peer) {
        try {
            if (!(certificate instanceof X509Certificate ? gossipMessage.verifySignature((X509Certificate) certificate) : gossipMessage.verifySignature(certificate.getPublicKey()))) {
                logger.warn("Gossip message {} verification failed. Signature did not correspond to known peer public key.", gossipMessage);
                return;
            }
            if (this.receivedTimestamps.containsKey(gossipMessage.getMID())) {
                return;
            }
            deliverMessage(gossipMessage.m2clone());
            gossipMessage.incrementHopCount();
            ArrayList arrayList = new ArrayList(this.connectedNeighbors);
            arrayList.remove(peer);
            arrayList.remove(peer);
            for (int i = this.fanout; arrayList.size() > 0 && i > 0; i--) {
                sendMessage(gossipMessage, (Peer) arrayList.remove(this.r.nextInt(arrayList.size())));
                this.sentMessagesCounter.inc();
            }
            cleanUp();
        } catch (IllegalStateException | InvalidKeyException | NoSuchAlgorithmException | SignatureException e) {
            logger.warn("Gossip message {} verification failed with exception: {}", gossipMessage, e);
        }
    }

    private void uponNeighborUp(NeighborUp neighborUp, short s) {
        logger.debug("NeighborUp received: " + String.valueOf(neighborUp.getPeer()));
        Peer withMyPort = withMyPort(neighborUp.getPeer());
        this.neighbors.add(withMyPort);
        if (this.connectedNeighbors.contains(withMyPort) || !this.pending.add(withMyPort)) {
            return;
        }
        logger.debug("No open connection with neighbor. Opening...");
        openConnection(withMyPort);
    }

    private void uponNeighborDown(NeighborDown neighborDown, short s) {
        Peer withMyPort = withMyPort(neighborDown.getPeer());
        this.neighbors.remove(withMyPort);
        if (this.connectedNeighbors.remove(withMyPort)) {
            closeConnection(withMyPort);
        } else if (this.pending.remove(withMyPort)) {
            closeConnection(withMyPort);
        }
    }

    private void uponMissingMessageRequest(MissingIdentifiableMessageRequest missingIdentifiableMessageRequest, short s) {
        logger.debug("Received an anti-entrophy request indicating that " + String.valueOf(missingIdentifiableMessageRequest.getDestination()) + " is missing " + String.valueOf(missingIdentifiableMessageRequest.getMessage().getMID()));
        Peer peer = null;
        if (this.connectedNeighbors.contains(missingIdentifiableMessageRequest.getDestination()) || this.pending.contains(missingIdentifiableMessageRequest.getDestination())) {
            peer = missingIdentifiableMessageRequest.getDestination();
            logger.debug("Destination in requeest is among my connections (active or pending)");
        } else {
            Peer withMyPort = withMyPort(missingIdentifiableMessageRequest.getDestination());
            if (this.connectedNeighbors.contains(withMyPort) || this.pending.contains(withMyPort)) {
                peer = withMyPort;
                logger.debug("Destination was translated to my protocol channel (" + String.valueOf(withMyPort) + ") and was found among my connections (active or pending)");
            }
        }
        if (peer == null) {
            logger.info("Unable to recover message " + String.valueOf(missingIdentifiableMessageRequest.getMessage().getMID()) + " to " + String.valueOf(missingIdentifiableMessageRequest.getDestination()));
            return;
        }
        sendMessage(missingIdentifiableMessageRequest.getMessage(), peer);
        logger.info("Recoved message " + String.valueOf(missingIdentifiableMessageRequest.getMessage().getMID()) + " to " + String.valueOf(peer));
        this.sentMessagesCounter.inc();
    }

    private void uponOutConnectionDown(SecureOutConnectionDown secureOutConnectionDown, int i) {
        Peer peer = new Peer(secureOutConnectionDown.getNodeId(), secureOutConnectionDown.getNode());
        logger.trace("Host {} is down, cause: {}", peer, secureOutConnectionDown.getCause());
        if (this.neighbors.contains(peer)) {
            this.connectedNeighbors.remove(peer);
            this.pending.add(peer);
            openConnection(peer);
        } else {
            this.connectedNeighbors.remove(peer);
            this.pending.remove(peer);
            closeConnection(peer);
        }
    }

    private void uponOutConnectionFailed(SecureOutConnectionFailed<?> secureOutConnectionFailed, int i) {
        Peer peer = new Peer(secureOutConnectionFailed.getNodeId(), secureOutConnectionFailed.getNode());
        logger.trace("Connection to host {} failed, cause: {}", peer, secureOutConnectionFailed.getCause());
        if (this.neighbors.contains(peer)) {
            this.connectedNeighbors.remove(peer);
            this.pending.add(peer);
            openConnection(peer);
        } else {
            this.connectedNeighbors.remove(peer);
            this.pending.remove(peer);
            closeConnection(peer);
        }
    }

    private void uponOutConnectionUp(SecureOutConnectionUp secureOutConnectionUp, int i) {
        Peer peer = new Peer(secureOutConnectionUp.getNodeId(), secureOutConnectionUp.getNode());
        logger.trace("Peer (out) {} is up", peer);
        if (this.pending.remove(peer)) {
            if (this.neighbors.contains(peer)) {
                this.connectedNeighbors.add(peer);
            } else {
                this.connectedNeighbors.remove(peer);
                closeConnection(peer);
            }
        }
        List<Pair<GossipMessage, Peer>> remove = this.waitingCertificates.remove(Bytes.of(peer.getIdentity()));
        if (remove == null) {
            return;
        }
        Certificate trustedCertificate = babelSecurity.getTrustedCertificate(peer.getIdentity());
        if (trustedCertificate == null) {
            logger.error("Just opened a secure connection to " + String.valueOf(peer) + ", but it's certificate is missing from the trust stores... " + remove.size() + " broadcast messages dropped.");
            return;
        }
        for (Pair<GossipMessage, Peer> pair : remove) {
            validateAndProcessGossipMessage(trustedCertificate, (GossipMessage) pair.getLeft(), (Peer) pair.getRight());
        }
    }

    private void uponInConnectionUp(SecureInConnectionUp secureInConnectionUp, int i) {
        logger.debug("Peer (in) {} is up", secureInConnectionUp.getNode());
    }

    private void uponInConnectionDown(SecureInConnectionDown secureInConnectionDown, int i) {
        logger.debug("Connection from peer {} is down, cause: {}", secureInConnectionDown.getNode(), secureInConnectionDown.getCause());
    }

    public Peer getPeer() {
        return this.myself;
    }

    private void openConnection(Peer peer) {
        super.openConnection(peer.getHost(), peer.getIdentity());
    }

    private void closeConnection(Peer peer) {
        super.closeConnection(peer.getIdentity());
    }

    private void sendMessage(ProtoMessage protoMessage, Peer peer) {
        super.sendMessage(protoMessage, peer.getIdentity());
    }

    private Peer withMyPort(Peer peer) {
        return new Peer(peer.getIdentity(), new Host(peer.getAddress(), this.networkPort));
    }
}
