/*
 * Decompiled with CFR 0.152.
 */
package pt.unl.fct.di.novasys.babel.protocols.eagerpush;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import pt.unl.fct.di.novasys.babel.core.AutoConfigureParameter;
import pt.unl.fct.di.novasys.babel.core.adaptive.AdaptiveProtocol;
import pt.unl.fct.di.novasys.babel.core.adaptive.annotations.Adaptive;
import pt.unl.fct.di.novasys.babel.exceptions.HandlerRegistrationException;
import pt.unl.fct.di.novasys.babel.generic.ProtoMessage;
import pt.unl.fct.di.novasys.babel.generic.ProtoNotification;
import pt.unl.fct.di.novasys.babel.metrics.Counter;
import pt.unl.fct.di.novasys.babel.metrics.Metric;
import pt.unl.fct.di.novasys.babel.protocols.dissemination.messages.IdentifiableProtoMessage;
import pt.unl.fct.di.novasys.babel.protocols.dissemination.notifications.BroadcastDelivery;
import pt.unl.fct.di.novasys.babel.protocols.dissemination.notifications.IdentifiableMessageNotification;
import pt.unl.fct.di.novasys.babel.protocols.dissemination.requests.BroadcastRequest;
import pt.unl.fct.di.novasys.babel.protocols.dissemination.requests.MissingIdentifiableMessageRequest;
import pt.unl.fct.di.novasys.babel.protocols.eagerpush.messages.GossipMessage;
import pt.unl.fct.di.novasys.babel.protocols.general.notifications.ChannelAvailableNotification;
import pt.unl.fct.di.novasys.babel.protocols.membership.notifications.NeighborDown;
import pt.unl.fct.di.novasys.babel.protocols.membership.notifications.NeighborUp;
import pt.unl.fct.di.novasys.babel.utils.recordexporter.utils.ExportRecordNotification;
import pt.unl.fct.di.novasys.babel.utils.recordexporter.utils.ReceiveRecord;
import pt.unl.fct.di.novasys.channel.tcp.events.InConnectionDown;
import pt.unl.fct.di.novasys.channel.tcp.events.InConnectionUp;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionDown;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionFailed;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionUp;
import pt.unl.fct.di.novasys.network.data.Host;

/**
 * Eager-push gossip broadcast protocol with a runtime-adjustable fanout.
 * <p>
 * A message that has not been seen before is delivered locally and then forwarded to up to
 * {@link #fanout} randomly chosen connected neighbors. Identifiers of delivered messages are
 * remembered for {@link #removeTimeWindow} milliseconds so that duplicates can be detected.
 */
public class AdaptiveEagerPushGossipBroadcast
extends AdaptiveProtocol {
    private static final Logger logger = LogManager.getLogger(AdaptiveEagerPushGossipBroadcast.class);
    /** Protocol identifier passed to the superclass and used when registering the gossip message handler. */
    public static final short PROTOCOL_ID = 1601;
    public static final String PROTOCOL_NAME = "AdaptiveEagerPushGossipBroadcast";
    // Property keys read by the constructor to configure the TCP channel.
    public static final String PAR_CHANNEL_ADDRESS = "EagerPushGossipBroadcast.Channel.Address";
    public static final String PAR_CHANNEL_PORT = "EagerPushGossipBroadcast.Channel.Port";
    // Property keys (and their defaults) read by init(Properties).
    public static final String PAR_FANOUT = "EagerPushGossipBroadcast.Fanout";
    public static final String DEFAULT_FANOUT = "4";
    public static final String PAR_DELIVERY_TIMEOUT = "EagerPushGossipBroadcast.DeliveredTimeout";
    public static final String DEFAULT_DELIVERY_TIMEOUT = "600000";
    public static final String PAR_SUPPORT_ANTIENTROPHY = "EagerPushGossipBroadcast.SupportAntiEntropy";
    public static final String DEFAULT_SUPPORT_ANTIENTROPHY = "false";
    /** When this property is true (the default), init falls back to the DEFAULT_* values for unset parameters. */
    public static final String PAR_USE_DEFAULT_CONFIG = "EagerPushGossipBroadcast.UseDefaultConfig";
    public static final String DEFAULT_USE_DEFAULT_CONFIG = "true";
    public static final String PAR_DNS_HOST = "EagerPushGossipBroadcast.DNSHost";
    /** Maximum number of neighbors each message is pushed to; -1 means "not configured yet" (see readyToStart). */
    // NOTE(review): presumably the annotations let the framework set this field externally — confirm against Babel core.
    @Adaptive
    @AutoConfigureParameter
    public int fanout;
    /** How long, in milliseconds, a delivered message id is remembered; -1 means "not configured yet". */
    @AutoConfigureParameter
    public long removeTimeWindow;
    /** Whether anti-entropy support is enabled; null means "not configured yet". */
    @AutoConfigureParameter
    public Boolean supportAntiEntrophy;
    private final Boolean useDefaultConfig;
    /** Id of the TCP channel created in the constructor. */
    protected int channelId;
    /** Port of this protocol's channel; also used to translate membership peers into hosts on this channel. */
    public final int networkPort;
    /** Neighbors for which a connection has been requested but is not yet up. */
    private Set<Host> pending;
    /** Neighbors with an established outgoing connection; the candidate set for gossip targets. */
    private Set<Host> connectedNeighbors;
    /** All neighbors currently reported by NeighborUp, with the port replaced by networkPort. */
    private Set<Host> neighbors;
    /** Ids of delivered messages in delivery order (oldest first), used to expire entries. */
    private LinkedList<UUID> receivedInOrder;
    /** Delivery time (System.currentTimeMillis) of each remembered message id. */
    private HashMap<UUID, Long> receivedTimestamps;
    /** Random source used to pick gossip targets. */
    private final Random r;
    /** Value of the PAR_DNS_HOST property, or null when unset; returned by getHost(). */
    private final String DNSHost;
    // Metrics: messages sent, first-time messages received, and duplicates received.
    private Counter sentMessagesCounter = (Counter)this.registerMetric((Metric)new Counter("SentMessages", "", new String[0]));
    private Counter receivedMessagesCounter = (Counter)this.registerMetric((Metric)new Counter("ReceivedMessages", "", new String[0]));
    private Counter duplicateMessagesCounter = (Counter)this.registerMetric((Metric)new Counter("DuplicateMessages", "", new String[0]));

    /**
     * Creates the protocol instance, initializes its bookkeeping structures and opens its TCP channel.
     * <p>
     * The channel address and port are taken from {@link #PAR_CHANNEL_ADDRESS} and
     * {@link #PAR_CHANNEL_PORT}, falling back to the address and port of {@code myself}.
     *
     * @param channelName not used by this constructor; kept for signature compatibility
     * @param properties  configuration source for the channel and protocol parameters
     * @param myself      the local host
     * @throws IOException                  declared for channel creation
     * @throws HandlerRegistrationException declared for compatibility with callers
     * @throws NumberFormatException        if the configured port is not a valid integer
     */
    public AdaptiveEagerPushGossipBroadcast(String channelName, Properties properties, Host myself) throws IOException, HandlerRegistrationException {
        super(PROTOCOL_NAME, PROTOCOL_ID);
        this.setMyself(myself);
        this.useDefaultConfig = Boolean.parseBoolean(properties.getProperty(PAR_USE_DEFAULT_CONFIG, DEFAULT_USE_DEFAULT_CONFIG));
        this.pending = new TreeSet<>();
        this.connectedNeighbors = new TreeSet<>();
        this.neighbors = new TreeSet<>();
        this.receivedInOrder = new LinkedList<>();
        this.receivedTimestamps = new HashMap<>();
        this.r = new Random(System.currentTimeMillis());
        this.DNSHost = properties.getProperty(PAR_DNS_HOST, null);

        String address = properties.getProperty(PAR_CHANNEL_ADDRESS, myself.getAddress().getHostAddress());
        String port = properties.getProperty(PAR_CHANNEL_PORT, String.valueOf(myself.getPort()));
        this.networkPort = Integer.parseInt(port);

        Properties channelProps = new Properties();
        channelProps.setProperty("address", address);
        channelProps.setProperty("port", port);
        this.channelId = this.createChannel("TCPChannel", channelProps);
        // Registered once; the original repeated this call after the log statement with the same argument.
        this.setDefaultChannel(this.channelId);
        logger.debug("Created new channel with id {} and bounded to: {}:{}", this.channelId, address, port);
    }

    /**
     * Reads the protocol parameters and registers notification and channel-event handlers.
     * <p>
     * With default configuration enabled, unset parameters take the {@code DEFAULT_*} values.
     * Otherwise unset numeric parameters become {@code -1} and an unset anti-entropy flag becomes
     * {@code null}, which {@link #readyToStart()} reports as "not ready".
     *
     * @param properties configuration source
     * @throws HandlerRegistrationException if a handler cannot be registered
     * @throws IOException                  declared for compatibility with callers
     */
    public void init(Properties properties) throws HandlerRegistrationException, IOException {
        boolean withDefaults = this.useDefaultConfig.booleanValue();
        String fanoutFallback = withDefaults ? DEFAULT_FANOUT : "-1";
        String timeoutFallback = withDefaults ? DEFAULT_DELIVERY_TIMEOUT : "-1";

        this.fanout = Integer.parseInt(properties.getProperty(PAR_FANOUT, fanoutFallback));
        this.removeTimeWindow = Long.parseLong(properties.getProperty(PAR_DELIVERY_TIMEOUT, timeoutFallback));
        if (withDefaults) {
            this.supportAntiEntrophy = Boolean.parseBoolean(properties.getProperty(PAR_SUPPORT_ANTIENTROPHY, DEFAULT_SUPPORT_ANTIENTROPHY));
        } else if (properties.containsKey(PAR_SUPPORT_ANTIENTROPHY)) {
            this.supportAntiEntrophy = Boolean.valueOf(Boolean.parseBoolean(properties.getProperty(PAR_SUPPORT_ANTIENTROPHY)));
        } else {
            // Left unset on purpose so that it can be supplied later via setFirstSupportAntiEntrophy.
            this.supportAntiEntrophy = null;
        }
        logger.info("Configured Fanout={}", (Object)this.fanout);

        // Membership notifications (ids 401 / 402).
        this.subscribeNotification((short)401, this::uponNeighborUp);
        this.subscribeNotification((short)402, this::uponNeighborDown);

        // Channel events of this protocol's TCP channel.
        int channel = this.channelId;
        this.registerChannelEventHandler(channel, (short)3, this::uponOutConnectionDown);
        this.registerChannelEventHandler(channel, (short)4, this::uponOutConnectionFailed);
        this.registerChannelEventHandler(channel, (short)5, this::uponOutConnectionUp);
        this.registerChannelEventHandler(channel, (short)2, this::uponInConnectionUp);
        this.registerChannelEventHandler(channel, (short)1, this::uponInConnectionDown);
    }

    /**
     * Sends {@code msg} to at most {@code toSend} hosts picked at random from {@code validTargets}.
     * <p>
     * Each chosen host is removed from {@code validTargets}, so the caller's list is consumed.
     * Every recipient is sent its own clone of the message.
     *
     * @param msg          gossip message to forward
     * @param validTargets candidate destinations; mutated (selected hosts are removed)
     * @param toSend       maximum number of destinations to send to
     */
    private void pushGossipMessage(GossipMessage msg, ArrayList<Host> validTargets, int toSend) {
        int remaining = toSend;
        while (!validTargets.isEmpty() && remaining > 0) {
            Host target = validTargets.remove(this.r.nextInt(validTargets.size()));
            this.sendMessage((ProtoMessage)msg.clone(), target);
            this.sentMessagesCounter.inc();
            // Decrement as its own statement so loop progress never depends on a logging argument.
            remaining--;
            logger.debug("Forwarded gossip message {} to {} (valid targets: {} missing: {})", msg.getMID(), target, validTargets.size(), remaining);
        }
    }

    /**
     * Handles a local broadcast request: wraps the payload in a new gossip message, delivers it
     * locally, pushes it to up to {@code fanout} connected neighbors and expires old message ids.
     *
     * @param request the broadcast request carrying timestamp and payload
     * @param protoID id of the requesting protocol, stored in the gossip message
     */
    private void uponBroadcastRequest(BroadcastRequest request, short protoID) {
        GossipMessage gossip = new GossipMessage(request.getTimestamp(), this.getMyself(), request.getPayload(), protoID);

        // Deliver locally first; this also records the message id as already seen.
        this.deliverMessage(gossip.clone());

        int available = this.connectedNeighbors.size();
        logger.debug("Received request to broadcast. currently I have " + available + " targets. Fanout is " + this.fanout);

        ArrayList<Host> candidates = new ArrayList<Host>(this.connectedNeighbors);
        this.pushGossipMessage(gossip.clone(), candidates, this.fanout);
        this.cleanUp();
    }

    /**
     * Records {@code msg} as received and notifies interested protocols.
     * <p>
     * Emits an {@code ExportRecordNotification} with a receive record and a {@code BroadcastDelivery};
     * when anti-entropy support is enabled, additionally emits an {@code IdentifiableMessageNotification}.
     *
     * @param msg the message to deliver
     */
    private void deliverMessage(GossipMessage msg) {
        UUID messageId = msg.getMID();
        this.receivedInOrder.addLast(messageId);
        this.receivedTimestamps.put(messageId, System.currentTimeMillis());

        ReceiveRecord receipt = new ReceiveRecord(this.getMyself(), msg.getMID(), System.currentTimeMillis(), msg.getTimestamp().getTime(), msg.getSender(), (int)msg.getHopCount());
        ExportRecordNotification export = new ExportRecordNotification(receipt);
        this.triggerNotification((ProtoNotification)export);

        BroadcastDelivery delivery = new BroadcastDelivery(msg.getSender(), msg.getPayload(), msg.getTimestamp());
        this.triggerNotification((ProtoNotification)delivery);

        if (!this.supportAntiEntrophy.booleanValue()) {
            return;
        }
        this.triggerNotification((ProtoNotification)new IdentifiableMessageNotification((IdentifiableProtoMessage)msg, 1601));
    }

    /**
     * Forgets message ids whose delivery time is older than {@code removeTimeWindow} milliseconds.
     * <p>
     * Ids are examined oldest-first and the scan stops at the first id that is still inside the
     * window. The head is only inspected (peeked) before deciding, so the id removed from
     * {@code receivedInOrder} is always the same id removed from {@code receivedTimestamps}.
     */
    private void cleanUp() {
        long now = System.currentTimeMillis();
        while (!this.receivedInOrder.isEmpty()) {
            UUID oldest = this.receivedInOrder.peekFirst();
            Long receivedAt = this.receivedTimestamps.get(oldest);
            // An id without a timestamp cannot be aged, so it is dropped rather than unboxing null.
            if (receivedAt != null && receivedAt + this.removeTimeWindow >= now) {
                break;
            }
            this.receivedInOrder.removeFirst();
            this.receivedTimestamps.remove(oldest);
        }
    }

    /**
     * Handles a gossip message received from the network.
     * <p>
     * A duplicate only produces an export record and increments the duplicate counter. A new
     * message has its hop count incremented, is delivered locally, and is forwarded to up to
     * {@code fanout} connected neighbors, excluding the host it came from and its original sender.
     *
     * @param msg     the received message
     * @param sender  the host the message was received from
     * @param protoID id of the source protocol (unused)
     * @param cID     id of the channel the message arrived on (unused)
     */
    private void uponGossipMessage(GossipMessage msg, Host sender, short protoID, int cID) {
        boolean alreadySeen = this.receivedTimestamps.containsKey(msg.getMID());
        if (alreadySeen) {
            ReceiveRecord duplicate = new ReceiveRecord(this.getMyself(), msg.getMID(), System.currentTimeMillis(), msg.getTimestamp().getTime(), msg.getSender(), (int)msg.getHopCount());
            this.triggerNotification((ProtoNotification)new ExportRecordNotification(duplicate));
            this.duplicateMessagesCounter.inc();
            return;
        }

        msg.incrementHopCount();
        this.deliverMessage(msg.clone());
        this.receivedMessagesCounter.inc();

        // Never send the message back to where it came from or to its originator.
        ArrayList<Host> candidates = new ArrayList<Host>(this.connectedNeighbors);
        candidates.remove(sender);
        candidates.remove(msg.getSender());

        this.pushGossipMessage(msg, candidates, this.fanout);
        this.cleanUp();
    }

    /**
     * Handles a membership {@code NeighborUp}: records the peer (translated to this protocol's
     * port) as a neighbor and, unless it is already connected or pending, opens a connection to it.
     *
     * @param up      the notification carrying the new peer
     * @param protoID id of the emitting protocol (unused)
     */
    private void uponNeighborUp(NeighborUp up, short protoID) {
        logger.debug("NeighborUp received: " + up.getPeer());
        Host peer = new Host(up.getPeer().getAddress(), this.networkPort);
        this.neighbors.add(peer);
        if (this.connectedNeighbors.contains(peer)) {
            return;
        }
        // add() returns false when a connection attempt is already in progress.
        boolean newlyPending = this.pending.add(peer);
        if (newlyPending) {
            this.openConnection(peer);
        }
    }

    /**
     * Handles a membership {@code NeighborDown}: forgets the peer (translated to this protocol's
     * port) and closes its connection if it was connected or pending.
     *
     * @param down    the notification carrying the departed peer
     * @param protoID id of the emitting protocol (unused)
     */
    private void uponNeighborDown(NeighborDown down, short protoID) {
        Host peer = new Host(down.getPeer().getAddress(), this.networkPort);
        this.neighbors.remove(peer);
        // Short-circuit keeps the original semantics: pending is only touched when the peer was not connected.
        boolean hadConnection = this.connectedNeighbors.remove(peer) || this.pending.remove(peer);
        if (hadConnection) {
            this.closeConnection(peer);
        }
    }

    /**
     * Handles an anti-entropy request asking this protocol to send a message to a host that is
     * missing it.
     * <p>
     * The destination is used as given if it is among the connected or pending hosts; otherwise it
     * is translated to this protocol's port and looked up again. If neither form is known, the
     * message is not sent.
     *
     * @param req     request carrying the missing message and its destination
     * @param protoID id of the requesting protocol (unused)
     */
    private void uponMissingMessageRequest(MissingIdentifiableMessageRequest req, short protoID) {
        logger.debug("Received an anti-entrophy request indicating that " + req.getDestination() + " is missing " + req.getMessage().getMID());

        Host requested = req.getDestination();
        Host resolved = null;
        if (this.connectedNeighbors.contains(requested) || this.pending.contains(requested)) {
            resolved = requested;
            logger.debug("Destination in requeest is among my connections (active or pending)");
        } else {
            Host translated = new Host(req.getDestination().getAddress(), this.networkPort);
            boolean known = this.connectedNeighbors.contains(translated) || this.pending.contains(translated);
            if (known) {
                resolved = translated;
                logger.debug("Destination was translated to my protocol channel (" + translated + ") and was found among my connections (active or pending)");
            }
        }

        if (resolved == null) {
            logger.info("Unable to recover message " + req.getMessage().getMID() + " to " + req.getDestination());
            return;
        }
        this.sendMessage((ProtoMessage)req.getMessage(), resolved);
        this.sentMessagesCounter.inc();
        logger.info("Recoved message " + req.getMessage().getMID() + " to " + resolved);
    }

    /**
     * Handles the loss of an outgoing connection. If the host is still a neighbor, it is moved back
     * to the pending set and a new connection is opened; otherwise all state for it is dropped and
     * the connection is closed.
     *
     * @param event     the channel event
     * @param channelId id of the channel that raised the event (unused)
     */
    private void uponOutConnectionDown(OutConnectionDown event, int channelId) {
        Host peer = event.getNode();
        logger.trace("Host {} is down, cause: {}", (Object)peer, (Object)event.getCause());

        // In both cases the host is no longer connected.
        this.connectedNeighbors.remove(peer);
        if (!this.neighbors.contains(peer)) {
            this.pending.remove(peer);
            this.closeConnection(peer);
            return;
        }
        this.pending.add(peer);
        this.openConnection(peer);
    }

    /**
     * Handles a failed outgoing connection attempt. If the host is still a neighbor, it stays
     * pending and the connection is attempted again; otherwise all state for it is dropped and the
     * connection is closed.
     *
     * @param event     the channel event
     * @param channelId id of the channel that raised the event (unused)
     */
    private void uponOutConnectionFailed(OutConnectionFailed<?> event, int channelId) {
        Host peer = event.getNode();
        logger.trace("Connection to host {} failed, cause: {}", (Object)peer, (Object)event.getCause());

        this.connectedNeighbors.remove(peer);
        boolean stillNeighbor = this.neighbors.contains(peer);
        if (stillNeighbor) {
            this.pending.add(peer);
            this.openConnection(peer);
        } else {
            this.pending.remove(peer);
            this.closeConnection(peer);
        }
    }

    /**
     * Handles an established outgoing connection. The host leaves the pending set; if it is still a
     * neighbor it becomes a connected neighbor, otherwise the connection is closed again.
     *
     * @param event     the channel event
     * @param channelId id of the channel that raised the event (unused)
     */
    private void uponOutConnectionUp(OutConnectionUp event, int channelId) {
        Host peer = event.getNode();
        logger.trace("Host (out) {} is up", (Object)peer);

        // The attempt has completed either way.
        this.pending.remove(peer);
        if (!this.neighbors.contains(peer)) {
            // The neighbor went away while the connection was being established.
            this.connectedNeighbors.remove(peer);
            this.closeConnection(peer);
            return;
        }
        this.connectedNeighbors.add(peer);
    }

    /**
     * Logs an established incoming connection; no protocol state is changed.
     *
     * @param event     the channel event
     * @param channelId id of the channel that raised the event (unused)
     */
    private void uponInConnectionUp(InConnectionUp event, int channelId) {
        Object remote = event.getNode();
        logger.debug("Host (in) {} is up", remote);
    }

    /**
     * Logs a closed incoming connection and its cause; no protocol state is changed.
     *
     * @param event     the channel event
     * @param channelId id of the channel that raised the event (unused)
     */
    private void uponInConnectionDown(InConnectionDown event, int channelId) {
        Object remote = event.getNode();
        Object cause = event.getCause();
        logger.debug("Connection from host {} is down, cause: {}", remote, cause);
    }

    /**
     * Updates the fanout at runtime.
     *
     * @param fanout the new fanout; {@code null} is ignored and leaves the current value untouched
     */
    public void setFanout(Integer fanout) {
        if (fanout != null) {
            logger.debug("Setting adaptive fanout to {} ", (Object)fanout);
            this.fanout = fanout.intValue();
        }
    }

    /**
     * Registers the request handlers, message serializer and message handler, then announces that
     * this protocol's channel is available.
     * <p>
     * The anti-entropy request handler (request id 503) is only registered when anti-entropy
     * support is enabled. Any failure during registration is logged and terminates the JVM with
     * exit status -1.
     */
    public void start() {
        try {
            if (this.supportAntiEntrophy.booleanValue()) {
                this.registerRequestHandler((short)503, this::uponMissingMessageRequest);
            }
            this.registerMessageSerializer(this.channelId, PROTOCOL_ID, GossipMessage.serializer);
            this.registerMessageHandler(this.channelId, PROTOCOL_ID, this::uponGossipMessage);
            this.registerRequestHandler((short)501, this::uponBroadcastRequest);
        }
        catch (Exception e) {
            // Report through the class logger (with the cause attached) instead of printStackTrace,
            // so the failure ends up wherever the rest of the protocol's diagnostics go.
            logger.error("Failed to register handlers for " + PROTOCOL_NAME + "; terminating", e);
            System.exit(-1);
        }
        this.triggerNotification((ProtoNotification)new ChannelAvailableNotification(1601, PROTOCOL_NAME, this.channelId, "TCPChannel", this.getMyself()));
    }

    /**
     * Reports whether every configurable parameter has a value.
     *
     * @return {@code false} while the fanout or the remove time window is still {@code -1}, or the
     *         anti-entropy flag is still {@code null}; {@code true} otherwise
     */
    public boolean readyToStart() {
        if (this.fanout == -1) {
            return false;
        }
        if (this.removeTimeWindow == -1L) {
            return false;
        }
        return this.supportAntiEntrophy != null;
    }

    /**
     * Reports whether this protocol needs a discovery step.
     *
     * @return always {@code false}
     */
    public boolean needsDiscovery() {
        final boolean discoveryRequired = false;
        return discoveryRequired;
    }

    /**
     * Intentionally does nothing: this protocol keeps no contact.
     *
     * @param host ignored
     */
    public void addContact(Host host) {
        // No contact is tracked here; see getContact(), which always returns null.
    }

    /**
     * Returns the contact host of this protocol.
     *
     * @return always {@code null}; no contact is tracked
     */
    public Host getContact() {
        final Host noContact = null;
        return noContact;
    }

    /**
     * Returns the configured fanout as a string.
     *
     * @return the decimal fanout, or {@code null} while it is still unconfigured ({@code -1})
     */
    public String getFirstFanout() {
        if (this.fanout == -1) {
            return null;
        }
        return Integer.toString(this.fanout);
    }

    /**
     * Returns the configured remove time window as a string.
     *
     * @return the decimal value in milliseconds, or {@code null} while it is still unconfigured ({@code -1})
     */
    public String getFirstRemoveTimeWindow() {
        if (this.removeTimeWindow == -1L) {
            return null;
        }
        return Long.toString(this.removeTimeWindow);
    }

    /**
     * Returns the anti-entropy flag as a string.
     *
     * @return {@code "true"} or {@code "false"}, or {@code null} while the flag is still unconfigured
     */
    public String getFirstSupportAntiEntrophy() {
        Boolean flag = this.supportAntiEntrophy;
        if (flag == null) {
            return null;
        }
        return flag.toString();
    }

    /**
     * Sets the fanout from its string form.
     *
     * @param fanout decimal fanout; {@code null} is ignored
     * @throws NumberFormatException if {@code fanout} is not a valid integer
     */
    public void setFirstFanout(String fanout) {
        if (fanout != null) {
            // Logged before parsing, so the attempted value is visible even if parsing fails.
            logger.debug("Fanout set to {} on startup", (Object)fanout);
            int parsed = Integer.parseInt(fanout);
            this.fanout = parsed;
        }
    }

    /**
     * Sets the remove time window from its string form.
     *
     * @param removeTimeWindow decimal value in milliseconds; {@code null} is ignored
     * @throws NumberFormatException if {@code removeTimeWindow} is not a valid long
     */
    public void setFirstRemoveTimeWindow(String removeTimeWindow) {
        if (removeTimeWindow != null) {
            // Logged before parsing, so the attempted value is visible even if parsing fails.
            logger.debug("RemoveTimeWindow set to {} on startup", (Object)removeTimeWindow);
            long parsed = Long.parseLong(removeTimeWindow);
            this.removeTimeWindow = parsed;
        }
    }

    /**
     * Sets the anti-entropy flag from its string form.
     *
     * @param supportAntiEntrophy parsed with {@link Boolean#parseBoolean(String)}; {@code null} is ignored
     */
    public void setFirstSupportAntiEntrophy(String supportAntiEntrophy) {
        if (supportAntiEntrophy != null) {
            logger.debug("SupportAntiEntrophy set to {} on startup", (Object)supportAntiEntrophy);
            boolean parsed = Boolean.parseBoolean(supportAntiEntrophy);
            this.supportAntiEntrophy = parsed;
        }
    }

    /**
     * Returns the DNS host configured through {@link #PAR_DNS_HOST}.
     *
     * @return the configured value, or {@code null} when the property was not set
     */
    public String getHost() {
        final String configuredHost = this.DNSHost;
        return configuredHost;
    }
}

