package pt.unl.fct.di.novasys.babel.protocols.antientropy;

import com.tardis.shaded.google.common.hash.BloomFilter;
import com.tardis.shaded.google.common.hash.Funnel;
import com.tardis.shaded.google.common.hash.Funnels;
import java.io.IOException;
import java.net.InetAddress;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import pt.unl.fct.di.novasys.babel.core.GenericProtocol;
import pt.unl.fct.di.novasys.babel.exceptions.HandlerRegistrationException;
import pt.unl.fct.di.novasys.babel.metrics.Counter;
import pt.unl.fct.di.novasys.babel.protocols.antientropy.messages.AntiEntrophyAnnounce;
import pt.unl.fct.di.novasys.babel.protocols.antientropy.timers.AntiEntrophyTimer;
import pt.unl.fct.di.novasys.babel.protocols.dissemination.messages.IdentifiableProtoMessage;
import pt.unl.fct.di.novasys.babel.protocols.dissemination.notifications.IdentifiableMessageNotification;
import pt.unl.fct.di.novasys.babel.protocols.dissemination.requests.MissingIdentifiableMessageRequest;
import pt.unl.fct.di.novasys.babel.protocols.general.notifications.ChannelAvailableNotification;
import pt.unl.fct.di.novasys.babel.protocols.membership.notifications.NeighborDown;
import pt.unl.fct.di.novasys.babel.protocols.membership.notifications.NeighborUp;
import pt.unl.fct.di.novasys.channel.tcp.TCPChannel;
import pt.unl.fct.di.novasys.channel.tcp.events.InConnectionDown;
import pt.unl.fct.di.novasys.channel.tcp.events.InConnectionUp;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionDown;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionFailed;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionUp;
import pt.unl.fct.di.novasys.network.data.Host;

/* loaded from: input_file:pt/unl/fct/di/novasys/babel/protocols/antientropy/AntiEntropy.class */
public class AntiEntropy extends GenericProtocol {
    private static final Logger logger = LogManager.getLogger((Class<?>) AntiEntropy.class);
    public static final short PROTOCOL_ID = 1900;
    public static final String PROTOCOL_NAME = "AntiEntrophy";
    public static final String PAR_CHANNEL_ADDRESS = "AntiEntrophy.Channel.Address";
    public static final String PAR_CHANNEL_PORT = "AntiEntrophy.Channel.Port";
    public static final String PAR_PERIOD = "AntiEntrophy.Period";
    public static final long DEFAULT_PERIOD = 60000;
    public final long antientropyPeriod;
    public static final String PAR_REMOVE_TIME = "AntiEntrophy.GCTimeout";
    public static final long DEFAULT_REMOVE_TIME = 600000;
    public final long removeTimeWindow;
    public static final String PAR_BLOOM_FILTER_FPP = "AntiEntophy.BloomFilter.FPP";
    public static final double DEFAULT_BLOOM_FILTER_FPP = 1.0E-4d;
    public final double bloomFilterFPP;
    public static final String PAR_GRACE_PERIOD = "AntiEntrophy.GracePeriod";
    public static final long DEFAULT_GRACE_PERIOD = 90000;
    public final long gracePeriod;
    public int networkPort;
    protected int channelId;
    protected Host myself;
    private Set<Host> pending;
    private Set<Host> connectedNeighbors;
    private TreeSet<Element> buffer;
    private final boolean managingChannel;
    private final Random r;
    private Counter sentMessagesCounter;

    /* loaded from: input_file:pt/unl/fct/di/novasys/babel/protocols/antientropy/AntiEntropy$Element.class */
    private class Element implements Comparable<Element> {
        private final IdentifiableProtoMessage m;
        private final Long timestamp = Long.valueOf(System.currentTimeMillis());
        private final short protocolSource;

        public Element(IdentifiableProtoMessage identifiableProtoMessage, short s) {
            this.m = identifiableProtoMessage;
            this.protocolSource = s;
        }

        public UUID getMID() {
            return this.m.getMID();
        }

        public IdentifiableProtoMessage getMessage() {
            return this.m;
        }

        public long getTimestamp() {
            return this.timestamp.longValue();
        }

        public short getProtocolSource() {
            return this.protocolSource;
        }

        @Override // java.lang.Comparable
        public int compareTo(Element element) {
            return this.timestamp.compareTo(element.timestamp);
        }
    }

    public AntiEntropy(Properties properties, Host host) throws IOException, HandlerRegistrationException {
        super(PROTOCOL_NAME, (short) 1900);
        this.sentMessagesCounter = (Counter) registerMetric(new Counter("SentMessages", "", new String[0]));
        this.myself = host;
        this.pending = new TreeSet();
        this.connectedNeighbors = new TreeSet();
        this.buffer = new TreeSet<>();
        if (properties.containsKey(PAR_PERIOD)) {
            this.antientropyPeriod = Long.parseLong(properties.getProperty(PAR_PERIOD));
        } else {
            this.antientropyPeriod = 60000L;
        }
        if (properties.containsKey(PAR_REMOVE_TIME)) {
            this.removeTimeWindow = Long.parseLong(properties.getProperty(PAR_REMOVE_TIME));
        } else {
            this.removeTimeWindow = DEFAULT_REMOVE_TIME;
        }
        if (properties.containsKey(PAR_GRACE_PERIOD)) {
            this.gracePeriod = Long.parseLong(properties.getProperty(PAR_GRACE_PERIOD));
        } else {
            this.gracePeriod = DEFAULT_GRACE_PERIOD;
        }
        if (properties.containsKey(PAR_BLOOM_FILTER_FPP)) {
            this.bloomFilterFPP = Double.parseDouble(properties.getProperty(PAR_BLOOM_FILTER_FPP));
        } else {
            this.bloomFilterFPP = 1.0E-4d;
        }
        this.r = new Random(System.currentTimeMillis());
        String str = null;
        String str2 = null;
        if (properties.containsKey(PAR_CHANNEL_ADDRESS) && properties.containsKey(PAR_CHANNEL_PORT)) {
            str = properties.getProperty(PAR_CHANNEL_ADDRESS);
            str2 = properties.getProperty(PAR_CHANNEL_PORT);
            this.networkPort = Short.parseShort(str2);
            if (host == null) {
                new Host(InetAddress.getByName(str), this.networkPort);
            }
        } else if (this.myself != null) {
            str = host.getAddress().getHostAddress();
            this.networkPort = host.getPort();
            str2 = this.networkPort;
        }
        if (str == null || str2 == null) {
            this.managingChannel = false;
            subscribeNotification((short) 1, this::uponChannelAvailableNotifiction);
        } else {
            this.managingChannel = true;
            Properties properties2 = new Properties();
            properties2.setProperty("address", str);
            properties2.setProperty("port", str2);
            this.channelId = createChannel(TCPChannel.NAME, properties2);
            setDefaultChannel(this.channelId);
            registerMessageSerializer(this.channelId, (short) 901, AntiEntrophyAnnounce.serializer);
            registerMessageHandler(this.channelId, (short) 901, this::uponAntiEntrophyAnnounceMessage);
            subscribeNotification((short) 401, this::uponNeighborUp);
            subscribeNotification((short) 402, this::uponNeighborDown);
            registerChannelEventHandler(this.channelId, (short) 3, this::uponOutConnectionDown);
            registerChannelEventHandler(this.channelId, (short) 4, this::uponOutConnectionFailed);
            registerChannelEventHandler(this.channelId, (short) 5, this::uponOutConnectionUp);
            registerChannelEventHandler(this.channelId, (short) 2, this::uponInConnectionUp);
            registerChannelEventHandler(this.channelId, (short) 1, this::uponInConnectionDown);
        }
        subscribeNotification((short) 502, this::uponIdentifiableMessageNotification);
        subscribeNotification((short) 401, this::uponNeighborUp);
        subscribeNotification((short) 402, this::uponNeighborDown);
        registerTimerHandler((short) 901, this::uponAntiEntrophyTimer);
    }

    @Override // pt.unl.fct.di.novasys.babel.core.GenericProtocol
    public void init(Properties properties) throws HandlerRegistrationException, IOException {
        setupPeriodicTimer(new AntiEntrophyTimer(), this.antientropyPeriod, this.antientropyPeriod);
    }

    private void uponAntiEntrophyTimer(AntiEntrophyTimer antiEntrophyTimer, long j) {
        if (this.connectedNeighbors.size() > 0) {
            Host host = ((Host[]) this.connectedNeighbors.toArray(new Host[this.connectedNeighbors.size()]))[this.r.nextInt(this.connectedNeighbors.size())];
            BloomFilter create = BloomFilter.create((Funnel) Funnels.unencodedCharsFunnel(), this.buffer.size(), this.bloomFilterFPP);
            Iterator<Element> it = this.buffer.iterator();
            while (it.hasNext()) {
                create.put(it.next().getMID().toString());
            }
            logger.debug("Sent an AntiEntrophyAnnounce to " + host + " with " + this.buffer.size() + " elements.");
            sendMessage(new AntiEntrophyAnnounce(this.myself, create, PROTOCOL_ID), host);
            this.sentMessagesCounter.inc();
        }
        HashSet hashSet = new HashSet();
        long currentTimeMillis = System.currentTimeMillis() - this.removeTimeWindow;
        Iterator<Element> it2 = this.buffer.iterator();
        while (it2.hasNext()) {
            Element next = it2.next();
            if (next.getTimestamp() >= currentTimeMillis) {
                break;
            } else {
                hashSet.add(next);
            }
        }
        this.buffer.removeAll(hashSet);
        logger.debug("Purged " + hashSet.size() + " elements from my buffer.");
    }

    private void uponAntiEntrophyAnnounceMessage(AntiEntrophyAnnounce antiEntrophyAnnounce, Host host, short s, int i) {
        logger.debug("Received and AntiEntrophyAnnounce reporting " + antiEntrophyAnnounce.getSetSize() + " messages from " + host);
        long currentTimeMillis = System.currentTimeMillis();
        Iterator<Element> it = this.buffer.iterator();
        while (it.hasNext()) {
            Element next = it.next();
            if (next.getTimestamp() >= currentTimeMillis - this.gracePeriod || next.getTimestamp() <= (currentTimeMillis - this.removeTimeWindow) + this.gracePeriod) {
                Logger logger2 = logger;
                UUID mid = next.getMID();
                long j = this.gracePeriod;
                long timestamp = next.getTimestamp();
                long j2 = this.removeTimeWindow;
                logger2.trace("Message " + mid + " excluded due to grace period (" + j + "). Received at: " + logger2 + " GC: " + timestamp + " now: " + logger2);
            } else if (antiEntrophyAnnounce.contains(next.getMID().toString())) {
                logger.debug("Messagee " + next.getMID() + " appears to be known by " + host);
            } else {
                logger.info("Requesting recovery of message " + next.getMID() + " to " + host);
                sendRequest(new MissingIdentifiableMessageRequest(next.getMessage(), host), next.getProtocolSource());
            }
        }
    }

    private void uponChannelAvailableNotifiction(ChannelAvailableNotification channelAvailableNotification, short s) {
        if (this.myself == null) {
            this.myself = channelAvailableNotification.getChannelListenData();
            this.networkPort = this.myself.getPort();
            this.channelId = channelAvailableNotification.getChannelID();
            registerSharedChannel(this.channelId);
            registerMessageSerializer(this.channelId, (short) 901, AntiEntrophyAnnounce.serializer);
            try {
                registerMessageHandler(this.channelId, (short) 901, this::uponAntiEntrophyAnnounceMessage);
                registerChannelEventHandler(this.channelId, (short) 3, this::uponOutConnectionDown);
                registerChannelEventHandler(this.channelId, (short) 4, this::uponOutConnectionFailed);
                registerChannelEventHandler(this.channelId, (short) 5, this::uponOutConnectionUp);
                registerChannelEventHandler(this.channelId, (short) 2, this::uponInConnectionUp);
                registerChannelEventHandler(this.channelId, (short) 1, this::uponInConnectionDown);
            } catch (HandlerRegistrationException e) {
                e.printStackTrace();
                System.exit(1);
            }
        }
    }

    private void uponIdentifiableMessageNotification(IdentifiableMessageNotification identifiableMessageNotification, short s) {
        this.buffer.add(new Element(identifiableMessageNotification.getMessage(), identifiableMessageNotification.getSourceProtocol()));
    }

    private void uponNeighborUp(NeighborUp neighborUp, short s) {
        Host host = new Host(neighborUp.getPeer().getAddress(), this.networkPort);
        if (!this.managingChannel) {
            this.connectedNeighbors.add(host);
        } else {
            if (this.connectedNeighbors.contains(host) || !this.pending.add(host)) {
                return;
            }
            openConnection(host);
        }
    }

    private void uponNeighborDown(NeighborDown neighborDown, short s) {
        Host host = new Host(neighborDown.getPeer().getAddress(), this.networkPort);
        if (!this.managingChannel) {
            this.connectedNeighbors.remove(host);
        } else if (this.connectedNeighbors.remove(host)) {
            closeConnection(host);
        } else if (this.pending.remove(host)) {
            closeConnection(host);
        }
    }

    private void uponOutConnectionDown(OutConnectionDown outConnectionDown, int i) {
        Host node = outConnectionDown.getNode();
        logger.trace("Host {} is down, cause: {}", node, outConnectionDown.getCause());
        if (this.connectedNeighbors.contains(node) || this.pending.contains(node)) {
            this.connectedNeighbors.remove(node);
            this.pending.add(node);
            openConnection(node);
        }
    }

    private void uponOutConnectionFailed(OutConnectionFailed<?> outConnectionFailed, int i) {
        Host node = outConnectionFailed.getNode();
        logger.trace("Connection to host {} failed, cause: {}", node, outConnectionFailed.getCause());
        if (this.connectedNeighbors.contains(node) || this.pending.contains(node)) {
            this.connectedNeighbors.remove(node);
            this.pending.add(node);
            openConnection(node);
        }
    }

    private void uponOutConnectionUp(OutConnectionUp outConnectionUp, int i) {
        Host node = outConnectionUp.getNode();
        logger.trace("Host (out) {} is up", node);
        if (!this.connectedNeighbors.contains(node) && !this.pending.contains(node)) {
            closeConnection(node);
        } else {
            this.pending.remove(node);
            this.connectedNeighbors.add(node);
        }
    }

    private void uponInConnectionUp(InConnectionUp inConnectionUp, int i) {
        logger.trace("Host (in) {} is up", inConnectionUp.getNode());
    }

    private void uponInConnectionDown(InConnectionDown inConnectionDown, int i) {
        logger.trace("Connection from host {} is down, cause: {}", inConnectionDown.getNode(), inConnectionDown.getCause());
    }

    public Host getHost() {
        return this.myself;
    }
}
