/*
 * Decompiled with CFR 0.152.
 */
package pt.unl.fct.di.novasys.babel.protocols.antientropy;

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import java.io.IOException;
import java.net.InetAddress;
import java.util.HashSet;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import pt.unl.fct.di.novasys.babel.core.GenericProtocol;
import pt.unl.fct.di.novasys.babel.exceptions.HandlerRegistrationException;
import pt.unl.fct.di.novasys.babel.protocols.antientropy.messages.AntiEntrophyAnnounce;
import pt.unl.fct.di.novasys.babel.protocols.antientropy.timers.AntiEntrophyTimer;
import pt.unl.fct.di.novasys.babel.protocols.dissemination.messages.IdentifiableProtoMessage;
import pt.unl.fct.di.novasys.babel.protocols.dissemination.notifications.IdentifiableMessageNotification;
import pt.unl.fct.di.novasys.babel.protocols.dissemination.requests.MissingIdentifiableMessageRequest;
import pt.unl.fct.di.novasys.babel.protocols.general.notifications.ChannelAvailableNotification;
import pt.unl.fct.di.novasys.babel.protocols.membership.notifications.NeighborDown;
import pt.unl.fct.di.novasys.babel.protocols.membership.notifications.NeighborUp;
import pt.unl.fct.di.novasys.channel.tcp.events.InConnectionDown;
import pt.unl.fct.di.novasys.channel.tcp.events.InConnectionUp;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionDown;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionFailed;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionUp;
import pt.unl.fct.di.novasys.network.data.Host;

public class AntiEntropy
extends GenericProtocol {
    private static final Logger logger = LogManager.getLogger(AntiEntropy.class);
    public static final short PROTOCOL_ID = 1900;
    public static final String PROTOCOL_NAME = "AntiEntrophy";
    public static final String PAR_CHANNEL_ADDRESS = "AntiEntrophy.Channel.Address";
    public static final String PAR_CHANNEL_PORT = "AntiEntrophy.Channel.Port";
    public static final String PAR_PERIOD = "AntiEntrophy.Period";
    public static final long DEFAULT_PERIOD = 60000L;
    public final long antientropyPeriod;
    public static final String PAR_REMOVE_TIME = "AntiEntrophy.GCTimeout";
    public static final long DEFAULT_REMOVE_TIME = 600000L;
    public final long removeTimeWindow;
    public static final String PAR_BLOOM_FILTER_FPP = "AntiEntophy.BloomFilter.FPP";
    public static final double DEFAULT_BLOOM_FILTER_FPP = 1.0E-4;
    public final double bloomFilterFPP;
    public static final String PAR_GRACE_PERIOD = "AntiEntrophy.GracePeriod";
    public static final long DEFAULT_GRACE_PERIOD = 90000L;
    public final long gracePeriod;
    public int networkPort;
    protected int channelId;
    protected Host myself;
    private Set<Host> pending;
    private Set<Host> connectedNeighbors;
    private TreeSet<Element> buffer;
    private final boolean managingChannel;
    private final Random r;

    public AntiEntropy(Properties properties, Host myself) throws IOException, HandlerRegistrationException {
        super(PROTOCOL_NAME, (short)1900);
        this.myself = myself;
        this.pending = new TreeSet<Host>();
        this.connectedNeighbors = new TreeSet<Host>();
        this.buffer = new TreeSet();
        this.antientropyPeriod = properties.containsKey(PAR_PERIOD) ? Long.parseLong(properties.getProperty(PAR_PERIOD)) : 60000L;
        this.removeTimeWindow = properties.containsKey(PAR_REMOVE_TIME) ? Long.parseLong(properties.getProperty(PAR_REMOVE_TIME)) : 600000L;
        this.gracePeriod = properties.containsKey(PAR_GRACE_PERIOD) ? Long.parseLong(properties.getProperty(PAR_GRACE_PERIOD)) : 90000L;
        this.bloomFilterFPP = properties.containsKey(PAR_BLOOM_FILTER_FPP) ? Double.parseDouble(properties.getProperty(PAR_BLOOM_FILTER_FPP)) : 1.0E-4;
        this.r = new Random(System.currentTimeMillis());
        String address = null;
        Object port = null;
        if (properties.containsKey(PAR_CHANNEL_ADDRESS) && properties.containsKey(PAR_CHANNEL_PORT)) {
            address = properties.getProperty(PAR_CHANNEL_ADDRESS);
            port = properties.getProperty(PAR_CHANNEL_PORT);
            this.networkPort = Short.parseShort((String)port);
            if (myself == null) {
                myself = new Host(InetAddress.getByName(address), this.networkPort);
            }
        } else if (this.myself != null) {
            address = myself.getAddress().getHostAddress();
            this.networkPort = myself.getPort();
            port = "" + this.networkPort;
        }
        if (address != null && port != null) {
            this.managingChannel = true;
            Properties channelProps = new Properties();
            channelProps.setProperty("address", address);
            channelProps.setProperty("port", (String)port);
            this.channelId = this.createChannel("TCPChannel", channelProps);
            this.setDefaultChannel(this.channelId);
            this.registerMessageSerializer(this.channelId, (short)901, AntiEntrophyAnnounce.serializer);
            this.registerMessageHandler(this.channelId, (short)901, this::uponAntiEntrophyAnnounceMessage);
            this.subscribeNotification((short)401, this::uponNeighborUp);
            this.subscribeNotification((short)402, this::uponNeighborDown);
            this.registerChannelEventHandler(this.channelId, (short)3, this::uponOutConnectionDown);
            this.registerChannelEventHandler(this.channelId, (short)4, this::uponOutConnectionFailed);
            this.registerChannelEventHandler(this.channelId, (short)5, this::uponOutConnectionUp);
            this.registerChannelEventHandler(this.channelId, (short)2, this::uponInConnectionUp);
            this.registerChannelEventHandler(this.channelId, (short)1, this::uponInConnectionDown);
        } else {
            this.managingChannel = false;
            this.subscribeNotification((short)1, this::uponChannelAvailableNotifiction);
        }
        this.subscribeNotification((short)502, this::uponIdentifiableMessageNotification);
        this.subscribeNotification((short)401, this::uponNeighborUp);
        this.subscribeNotification((short)402, this::uponNeighborDown);
        this.registerTimerHandler((short)901, this::uponAntiEntrophyTimer);
    }

    @Override
    public void init(Properties props) throws HandlerRegistrationException, IOException {
        this.setupPeriodicTimer(new AntiEntrophyTimer(), this.antientropyPeriod, this.antientropyPeriod);
    }

    private void uponAntiEntrophyTimer(AntiEntrophyTimer timer, long time) {
        if (this.connectedNeighbors.size() > 0) {
            Host peer = this.connectedNeighbors.toArray(new Host[this.connectedNeighbors.size()])[this.r.nextInt(this.connectedNeighbors.size())];
            BloomFilter<CharSequence> bf = BloomFilter.create(Funnels.unencodedCharsFunnel(), this.buffer.size(), this.bloomFilterFPP);
            for (Element e : this.buffer) {
                bf.put(e.getMID().toString());
            }
            logger.debug("Sent an AntiEntrophyAnnounce to " + String.valueOf(peer) + " with " + this.buffer.size() + " elements.");
            this.sendMessage(new AntiEntrophyAnnounce(this.myself, bf, 1900), peer);
        }
        HashSet<Element> gc = new HashSet<Element>();
        long cleanUpBarrier = System.currentTimeMillis() - this.removeTimeWindow;
        for (Element e : this.buffer) {
            if (e.getTimestamp() >= cleanUpBarrier) break;
            gc.add(e);
        }
        this.buffer.removeAll(gc);
        logger.debug("Purged " + gc.size() + " elements from my buffer.");
    }

    private void uponAntiEntrophyAnnounceMessage(AntiEntrophyAnnounce msg, Host sender, short protoID, int cID) {
        logger.debug("Received and AntiEntrophyAnnounce reporting " + msg.getSetSize() + " messages from " + String.valueOf(sender));
        long now = System.currentTimeMillis();
        for (Element e : this.buffer) {
            if (e.getTimestamp() < now - this.gracePeriod && e.getTimestamp() > now - this.removeTimeWindow + this.gracePeriod) {
                if (!msg.contains(e.getMID().toString())) {
                    logger.info("Requesting recovery of message " + String.valueOf(e.getMID()) + " to " + String.valueOf(sender));
                    this.sendRequest(new MissingIdentifiableMessageRequest(e.getMessage(), sender), e.getProtocolSource());
                    continue;
                }
                logger.debug("Messagee " + String.valueOf(e.getMID()) + " appears to be known by " + String.valueOf(sender));
                continue;
            }
            logger.trace("Message " + String.valueOf(e.getMID()) + " excluded due to grace period (" + this.gracePeriod + "). Received at: " + e.getTimestamp() + " GC: " + this.removeTimeWindow + " now: " + now);
        }
    }

    private void uponChannelAvailableNotifiction(ChannelAvailableNotification event, short protoID) {
        if (this.myself == null) {
            this.myself = event.getChannelListenData();
            this.networkPort = this.myself.getPort();
            this.channelId = event.getChannelID();
            this.registerSharedChannel(this.channelId);
            this.registerMessageSerializer(this.channelId, (short)901, AntiEntrophyAnnounce.serializer);
            try {
                this.registerMessageHandler(this.channelId, (short)901, this::uponAntiEntrophyAnnounceMessage);
                this.registerChannelEventHandler(this.channelId, (short)3, this::uponOutConnectionDown);
                this.registerChannelEventHandler(this.channelId, (short)4, this::uponOutConnectionFailed);
                this.registerChannelEventHandler(this.channelId, (short)5, this::uponOutConnectionUp);
                this.registerChannelEventHandler(this.channelId, (short)2, this::uponInConnectionUp);
                this.registerChannelEventHandler(this.channelId, (short)1, this::uponInConnectionDown);
            }
            catch (HandlerRegistrationException e) {
                e.printStackTrace();
                System.exit(1);
            }
        }
    }

    private void uponIdentifiableMessageNotification(IdentifiableMessageNotification event, short protoID) {
        this.buffer.add(new Element(this, event.getMessage(), event.getSourceProtocol()));
    }

    private void uponNeighborUp(NeighborUp up, short protoID) {
        Host h2 = new Host(up.getPeer().getAddress(), this.networkPort);
        if (this.managingChannel) {
            if (!this.connectedNeighbors.contains(h2) && this.pending.add(h2)) {
                this.openConnection(h2);
            }
        } else {
            this.connectedNeighbors.add(h2);
        }
    }

    private void uponNeighborDown(NeighborDown down, short protoID) {
        Host h2 = new Host(down.getPeer().getAddress(), this.networkPort);
        if (this.managingChannel) {
            if (this.connectedNeighbors.remove(h2)) {
                this.closeConnection(h2);
            } else if (this.pending.remove(h2)) {
                this.closeConnection(h2);
            }
        } else {
            this.connectedNeighbors.remove(h2);
        }
    }

    private void uponOutConnectionDown(OutConnectionDown event, int channelId) {
        Host h2 = event.getNode();
        logger.trace("Host {} is down, cause: {}", (Object)h2, (Object)event.getCause());
        if (this.connectedNeighbors.contains(h2) || this.pending.contains(h2)) {
            this.connectedNeighbors.remove(h2);
            this.pending.add(h2);
            this.openConnection(h2);
        }
    }

    private void uponOutConnectionFailed(OutConnectionFailed<?> event, int channelId) {
        Host h2 = event.getNode();
        logger.trace("Connection to host {} failed, cause: {}", (Object)h2, (Object)event.getCause());
        if (this.connectedNeighbors.contains(h2) || this.pending.contains(h2)) {
            this.connectedNeighbors.remove(h2);
            this.pending.add(h2);
            this.openConnection(h2);
        }
    }

    private void uponOutConnectionUp(OutConnectionUp event, int channelId) {
        Host h2 = event.getNode();
        logger.trace("Host (out) {} is up", (Object)h2);
        if (!this.connectedNeighbors.contains(h2) && !this.pending.contains(h2)) {
            this.closeConnection(h2);
        }
    }

    private void uponInConnectionUp(InConnectionUp event, int channelId) {
        logger.trace("Host (in) {} is up", (Object)event.getNode());
    }

    private void uponInConnectionDown(InConnectionDown event, int channelId) {
        logger.trace("Connection from host {} is down, cause: {}", (Object)event.getNode(), (Object)event.getCause());
    }

    private class Element
    implements Comparable<Element> {
        private final IdentifiableProtoMessage m;
        private final Long timestamp;
        private final short protocolSource;

        public Element(AntiEntropy antiEntropy, IdentifiableProtoMessage m4, short protocolSource) {
            this.m = m4;
            this.timestamp = System.currentTimeMillis();
            this.protocolSource = protocolSource;
        }

        public UUID getMID() {
            return this.m.getMID();
        }

        public IdentifiableProtoMessage getMessage() {
            return this.m;
        }

        public long getTimestamp() {
            return this.timestamp;
        }

        public short getProtocolSource() {
            return this.protocolSource;
        }

        @Override
        public int compareTo(Element e) {
            return this.timestamp.compareTo(e.timestamp);
        }
    }
}

