/*
 * Decompiled with CFR 0.152.
 */
package pt.unl.fct.di.novasys.babel.protocols.antientropy;

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnel;
import com.google.common.hash.Funnels;
import java.io.IOException;
import java.net.InetAddress;
import java.util.HashSet;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import pt.unl.fct.di.novasys.babel.core.GenericProtocol;
import pt.unl.fct.di.novasys.babel.exceptions.HandlerRegistrationException;
import pt.unl.fct.di.novasys.babel.generic.ProtoMessage;
import pt.unl.fct.di.novasys.babel.generic.ProtoRequest;
import pt.unl.fct.di.novasys.babel.metrics.Counter;
import pt.unl.fct.di.novasys.babel.metrics.Metric;
import pt.unl.fct.di.novasys.babel.protocols.antientropy.messages.AntiEntrophyAnnounce;
import pt.unl.fct.di.novasys.babel.protocols.antientropy.timers.AntiEntrophyTimer;
import pt.unl.fct.di.novasys.babel.protocols.general.notifications.ChannelAvailableNotification;
import pt.unl.fct.di.novasys.babel.protocols.secure.dissemination.messages.SignedIdentifiableProtoMessage;
import pt.unl.fct.di.novasys.babel.protocols.secure.dissemination.notifications.IdentifiableMessageNotification;
import pt.unl.fct.di.novasys.babel.protocols.secure.dissemination.requests.MissingIdentifiableMessageRequest;
import pt.unl.fct.di.novasys.babel.protocols.secure.membership.Peer;
import pt.unl.fct.di.novasys.babel.protocols.secure.membership.notifications.NeighborDown;
import pt.unl.fct.di.novasys.babel.protocols.secure.membership.notifications.NeighborUp;
import pt.unl.fct.di.novasys.channel.secure.events.SecureInConnectionDown;
import pt.unl.fct.di.novasys.channel.secure.events.SecureInConnectionUp;
import pt.unl.fct.di.novasys.channel.secure.events.SecureOutConnectionDown;
import pt.unl.fct.di.novasys.channel.secure.events.SecureOutConnectionFailed;
import pt.unl.fct.di.novasys.channel.secure.events.SecureOutConnectionUp;
import pt.unl.fct.di.novasys.network.data.Host;

public class AntiEntropy
extends GenericProtocol {
    private static final Logger logger = LogManager.getLogger(AntiEntropy.class);
    public static final short PROTOCOL_ID = 1900;
    public static final String PROTOCOL_NAME = "AntiEntrophy";
    public static final String PAR_CHANNEL_ADDRESS = "AntiEntrophy.Channel.Address";
    public static final String PAR_CHANNEL_PORT = "AntiEntrophy.Channel.Port";
    public static final String PAR_PERIOD = "AntiEntrophy.Period";
    public static final long DEFAULT_PERIOD = 60000L;
    public final long antientropyPeriod;
    public static final String PAR_REMOVE_TIME = "AntiEntrophy.GCTimeout";
    public static final long DEFAULT_REMOVE_TIME = 600000L;
    public final long removeTimeWindow;
    public static final String PAR_BLOOM_FILTER_FPP = "AntiEntophy.BloomFilter.FPP";
    public static final double DEFAULT_BLOOM_FILTER_FPP = 1.0E-4;
    public final double bloomFilterFPP;
    public static final String PAR_GRACE_PERIOD = "AntiEntrophy.GracePeriod";
    public static final long DEFAULT_GRACE_PERIOD = 90000L;
    public final long gracePeriod;
    public int networkPort;
    protected int channelId;
    protected Peer myself;
    private Set<Peer> pending;
    private Set<Peer> connectedNeighbors;
    private TreeSet<Element> buffer;
    private final boolean managingChannel;
    private final Random r;
    private final Counter sentMessagesCounter;

    public AntiEntropy(Properties properties, Host myAddr) throws IOException, HandlerRegistrationException {
        super(PROTOCOL_NAME, (short)1900);
        this.myself = new Peer(babelSecurity.getDefaultIdentity().identity(), myAddr);
        this.pending = new TreeSet<Peer>();
        this.connectedNeighbors = new TreeSet<Peer>();
        this.buffer = new TreeSet();
        this.antientropyPeriod = properties.containsKey(PAR_PERIOD) ? Long.parseLong(properties.getProperty(PAR_PERIOD)) : 60000L;
        this.removeTimeWindow = properties.containsKey(PAR_REMOVE_TIME) ? Long.parseLong(properties.getProperty(PAR_REMOVE_TIME)) : 600000L;
        this.gracePeriod = properties.containsKey(PAR_GRACE_PERIOD) ? Long.parseLong(properties.getProperty(PAR_GRACE_PERIOD)) : 90000L;
        this.bloomFilterFPP = properties.containsKey(PAR_BLOOM_FILTER_FPP) ? Double.parseDouble(properties.getProperty(PAR_BLOOM_FILTER_FPP)) : 1.0E-4;
        this.r = new Random(System.currentTimeMillis());
        this.sentMessagesCounter = (Counter)this.registerMetric((Metric)new Counter.Builder("SentMessages", "", new String[0]).build());
        String address = null;
        Object port = null;
        if (properties.containsKey(PAR_CHANNEL_ADDRESS) && properties.containsKey(PAR_CHANNEL_PORT)) {
            address = properties.getProperty(PAR_CHANNEL_ADDRESS);
            port = properties.getProperty(PAR_CHANNEL_PORT);
            this.networkPort = Short.parseShort((String)port);
            if (myAddr == null) {
                myAddr = new Host(InetAddress.getByName(address), this.networkPort);
            }
        } else if (myAddr != null) {
            address = myAddr.getAddress().getHostAddress();
            this.networkPort = myAddr.getPort();
            port = "" + this.networkPort;
        }
        if (address != null && port != null) {
            this.managingChannel = true;
            Properties channelProps = new Properties();
            channelProps.setProperty("address", address);
            channelProps.setProperty("port", (String)port);
            this.channelId = this.createSecureChannel("AuthChannel", channelProps);
            this.setDefaultChannel(this.channelId);
            this.registerMessageSerializer(this.channelId, (short)901, AntiEntrophyAnnounce.serializer);
            this.registerMessageHandler(this.channelId, (short)901, this::uponAntiEntrophyAnnounceMessage, this::uponMessageFailed);
            this.subscribeNotification((short)451, this::uponNeighborUp);
            this.subscribeNotification((short)452, this::uponNeighborDown);
            this.registerChannelEventHandler(this.channelId, (short)13, this::uponOutConnectionDown);
            this.registerChannelEventHandler(this.channelId, (short)14, this::uponOutConnectionFailed);
            this.registerChannelEventHandler(this.channelId, (short)15, this::uponOutConnectionUp);
            this.registerChannelEventHandler(this.channelId, (short)12, this::uponInConnectionUp);
            this.registerChannelEventHandler(this.channelId, (short)11, this::uponInConnectionDown);
        } else {
            this.managingChannel = false;
            this.subscribeNotification((short)1, this::uponChannelAvailableNotifiction);
        }
        this.subscribeNotification((short)552, this::uponIdentifiableMessageNotification);
        this.subscribeNotification((short)451, this::uponNeighborUp);
        this.subscribeNotification((short)452, this::uponNeighborDown);
        this.registerTimerHandler((short)901, this::uponAntiEntrophyTimer);
    }

    public void init(Properties props) throws HandlerRegistrationException, IOException {
        this.setupPeriodicTimer(new AntiEntrophyTimer(), this.antientropyPeriod, this.antientropyPeriod);
    }

    private void uponMessageFailed(ProtoMessage msg, Host to, byte[] toId, short destProto, Throwable cause, int channelId) {
        logger.warn("Message failed: msg={}, to={}, destProto={}, cause={}, channelId={}", (Object)msg, (Object)new Peer(toId, to), (Object)destProto, (Object)cause, (Object)channelId);
    }

    private void uponAntiEntrophyTimer(AntiEntrophyTimer timer, long time) {
        if (this.connectedNeighbors.size() > 0) {
            Peer peer = this.connectedNeighbors.toArray(new Peer[this.connectedNeighbors.size()])[this.r.nextInt(this.connectedNeighbors.size())];
            BloomFilter bf = BloomFilter.create((Funnel)Funnels.unencodedCharsFunnel(), (int)this.buffer.size(), (double)this.bloomFilterFPP);
            for (Element e : this.buffer) {
                bf.put((Object)e.getMID().toString());
            }
            logger.debug("Sent an AntiEntrophyAnnounce to " + String.valueOf(peer) + " with " + this.buffer.size() + " elements.");
            this.sendMessage(new AntiEntrophyAnnounce(this.myself, (BloomFilter<String>)bf, 1900), peer);
            this.sentMessagesCounter.inc();
        }
        HashSet<Element> gc = new HashSet<Element>();
        long cleanUpBarrier = System.currentTimeMillis() - this.removeTimeWindow;
        for (Element e : this.buffer) {
            if (e.getTimestamp() >= cleanUpBarrier) break;
            gc.add(e);
        }
        this.buffer.removeAll(gc);
        logger.debug("Purged " + gc.size() + " elements from my buffer.");
    }

    private void uponAntiEntrophyAnnounceMessage(AntiEntrophyAnnounce msg, Host sender, byte[] senderId, short protoID, int cID) {
        Peer peer = new Peer(senderId, sender);
        logger.debug("Received and AntiEntrophyAnnounce reporting " + msg.getSetSize() + " messages from " + String.valueOf(sender));
        long now = System.currentTimeMillis();
        for (Element e : this.buffer) {
            if (e.getTimestamp() < now - this.gracePeriod && e.getTimestamp() > now - this.removeTimeWindow + this.gracePeriod) {
                if (!msg.contains(e.getMID().toString())) {
                    logger.info("Requesting recovery of message " + String.valueOf(e.getMID()) + " to " + String.valueOf(sender));
                    this.sendRequest((ProtoRequest)new MissingIdentifiableMessageRequest(e.getMessage(), peer), e.getProtocolSource());
                    continue;
                }
                logger.debug("Messagee " + String.valueOf(e.getMID()) + " appears to be known by " + String.valueOf(sender));
                continue;
            }
            logger.trace("Message " + String.valueOf(e.getMID()) + " excluded due to grace period (" + this.gracePeriod + "). Received at: " + e.getTimestamp() + " GC: " + this.removeTimeWindow + " now: " + now);
        }
    }

    private void uponChannelAvailableNotifiction(ChannelAvailableNotification event, short protoID) {
        if (this.myself.getHost() == null) {
            this.myself.setHost(event.getChannelListenData());
            this.networkPort = this.myself.getPort();
            this.channelId = event.getChannelID();
            this.registerSharedChannel(this.channelId);
            this.registerMessageSerializer(this.channelId, (short)901, AntiEntrophyAnnounce.serializer);
            try {
                this.registerMessageHandler(this.channelId, (short)901, this::uponAntiEntrophyAnnounceMessage);
                this.registerChannelEventHandler(this.channelId, (short)13, this::uponOutConnectionDown);
                this.registerChannelEventHandler(this.channelId, (short)14, this::uponOutConnectionFailed);
                this.registerChannelEventHandler(this.channelId, (short)15, this::uponOutConnectionUp);
                this.registerChannelEventHandler(this.channelId, (short)12, this::uponInConnectionUp);
                this.registerChannelEventHandler(this.channelId, (short)11, this::uponInConnectionDown);
            }
            catch (HandlerRegistrationException e) {
                e.printStackTrace();
                System.exit(1);
            }
        }
    }

    private void uponIdentifiableMessageNotification(IdentifiableMessageNotification event, short protoID) {
        this.buffer.add(new Element(this, event.getMessage(), event.getSourceProtocol()));
    }

    private void uponNeighborUp(NeighborUp up, short protoID) {
        Peer peer = up.getPeer();
        if (this.managingChannel) {
            if (!this.connectedNeighbors.contains(peer) && this.pending.add(peer)) {
                this.openConnection(peer);
            }
        } else {
            this.connectedNeighbors.add(peer);
        }
    }

    private void uponNeighborDown(NeighborDown down, short protoID) {
        Peer peer = down.getPeer().clone();
        if (this.managingChannel) {
            if (this.connectedNeighbors.remove(peer)) {
                this.closeConnection(peer);
            } else if (this.pending.remove(peer)) {
                this.closeConnection(peer);
            }
        } else {
            this.connectedNeighbors.remove(peer);
        }
    }

    private void uponOutConnectionDown(SecureOutConnectionDown event, int channelId) {
        Peer peer = new Peer(event.getNodeId(), event.getNode());
        logger.trace("Peer {} is down, cause: {}", (Object)peer, (Object)event.getCause());
        if (this.connectedNeighbors.contains(peer) || this.pending.contains(peer)) {
            this.connectedNeighbors.remove(peer);
            this.pending.add(peer);
            this.openConnection(peer);
        }
    }

    private void uponOutConnectionFailed(SecureOutConnectionFailed<?> event, int channelId) {
        Peer peer = new Peer(event.getNodeId(), event.getNode());
        logger.trace("Connection to peer {} failed, cause: {}", (Object)peer, (Object)event.getCause());
        if (this.connectedNeighbors.contains(peer) || this.pending.contains(peer)) {
            this.connectedNeighbors.remove(peer);
            this.pending.add(peer);
            this.openConnection(peer);
        }
    }

    private void uponOutConnectionUp(SecureOutConnectionUp event, int channelId) {
        Peer peer = new Peer(event.getNodeId(), event.getNode());
        logger.trace("Peer (out) {} is up", (Object)peer);
        if (!this.connectedNeighbors.contains(peer) && !this.pending.contains(peer)) {
            this.closeConnection(peer);
            return;
        }
        this.pending.remove(peer);
        this.connectedNeighbors.add(peer);
    }

    private void uponInConnectionUp(SecureInConnectionUp event, int channelId) {
        logger.trace("Peer (in) {} is up", (Object)new Peer(event.getNodeId(), event.getNode()));
    }

    private void uponInConnectionDown(SecureInConnectionDown event, int channelId) {
        logger.trace("Connection from peer {} is down, cause: {}", (Object)new Peer(event.getNodeId(), event.getNode()), (Object)event.getCause());
    }

    public Peer getPeer() {
        return this.myself;
    }

    public Host getHost() {
        return this.myself.getHost();
    }

    private void openConnection(Peer peer) {
        super.openConnection(peer.getHost(), peer.getIdentity());
    }

    private void closeConnection(Peer peer) {
        super.closeConnection(peer.getIdentity());
    }

    private void sendMessage(ProtoMessage msg, Peer dest) {
        super.sendMessage(msg, dest.getIdentity());
    }

    private class Element
    implements Comparable<Element> {
        private final SignedIdentifiableProtoMessage m;
        private final Long timestamp;
        private final short protocolSource;

        public Element(AntiEntropy antiEntropy, SignedIdentifiableProtoMessage m, short protocolSource) {
            this.m = m;
            this.timestamp = System.currentTimeMillis();
            this.protocolSource = protocolSource;
        }

        public UUID getMID() {
            return this.m.getMID();
        }

        public SignedIdentifiableProtoMessage getMessage() {
            return this.m;
        }

        public long getTimestamp() {
            return this.timestamp;
        }

        public short getProtocolSource() {
            return this.protocolSource;
        }

        @Override
        public int compareTo(Element e) {
            return this.timestamp.compareTo(e.timestamp);
        }
    }
}

