package pt.unl.fct.di.novasys.babel.protocols.eagerpush;

import java.io.IOException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import pt.unl.fct.di.novasys.babel.core.AutoConfigureParameter;
import pt.unl.fct.di.novasys.babel.core.adaptive.AdaptiveProtocol;
import pt.unl.fct.di.novasys.babel.core.adaptive.annotations.Adaptive;
import pt.unl.fct.di.novasys.babel.exceptions.HandlerRegistrationException;
import pt.unl.fct.di.novasys.babel.metrics.Counter;
import pt.unl.fct.di.novasys.babel.metrics.Record;
import pt.unl.fct.di.novasys.babel.protocols.dissemination.notifications.BroadcastDelivery;
import pt.unl.fct.di.novasys.babel.protocols.dissemination.notifications.IdentifiableMessageNotification;
import pt.unl.fct.di.novasys.babel.protocols.dissemination.requests.BroadcastRequest;
import pt.unl.fct.di.novasys.babel.protocols.dissemination.requests.MissingIdentifiableMessageRequest;
import pt.unl.fct.di.novasys.babel.protocols.eagerpush.messages.GossipMessage;
import pt.unl.fct.di.novasys.babel.protocols.general.notifications.ChannelAvailableNotification;
import pt.unl.fct.di.novasys.babel.protocols.membership.notifications.NeighborDown;
import pt.unl.fct.di.novasys.babel.protocols.membership.notifications.NeighborUp;
import pt.unl.fct.di.novasys.channel.tcp.events.InConnectionDown;
import pt.unl.fct.di.novasys.channel.tcp.events.InConnectionUp;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionDown;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionFailed;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionUp;
import pt.unl.fct.di.novasys.network.data.Host;

/* loaded from: input_file:pt/unl/fct/di/novasys/babel/protocols/eagerpush/AdaptiveEagerPushGossipBroadcast.class */
public class AdaptiveEagerPushGossipBroadcast extends AdaptiveProtocol {
    private static final Logger logger = LogManager.getLogger(AdaptiveEagerPushGossipBroadcast.class);
    public static final short PROTOCOL_ID = 1601;
    public static final String PROTOCOL_NAME = "AdaptiveEagerPushGossipBroadcast";
    public static final String PAR_CHANNEL_ADDRESS = "EagerPushGossipBroadcast.Channel.Address";
    public static final String PAR_CHANNEL_PORT = "EagerPushGossipBroadcast.Channel.Port";
    public static final String PAR_FANOUT = "EagerPushGossipBroadcast.Fanout";
    public static final String DEFAULT_FANOUT = "4";
    public static final String PAR_DELIVERY_TIMEOUT = "EagerPushGossipBroadcast.DeliveredTimeout";
    public static final String DEFAULT_DELIVERY_TIEMEOUT = "600000";
    public static final String PAR_SUPPORT_ANTIENTROPHY = "EagerPushGosssipBroadcast.SupportAntiEntropy";
    public static final String DEFAULT_SUPPORT_ANTIENTROPHY = "false";
    public static final String PAR_USE_DEFAULT_CONFIG = "EagerPushGossipBroadcast.UseDefaultConfig";
    public static final String DEFAULT_USE_DEFAULT_CONFIG = "true";
    public static final String PAR_DNS_HOST = "EagerPushGossipBroadcast.DNSHost";

    @Adaptive
    @AutoConfigureParameter
    public int fanout;

    @AutoConfigureParameter
    public long removeTimeWindow;

    @AutoConfigureParameter
    public Boolean supportAntiEntrophy;
    private final Boolean useDefaultConfig;
    protected int channelId;
    public final int networkPort;
    private Set<Host> pending;
    private Set<Host> connectedNeighbors;
    private Set<Host> neighbors;
    private LinkedList<UUID> receivedInOrder;
    private HashMap<UUID, Long> receivedTimestamps;
    private final Random r;
    private final String DNSHost;
    private Counter sentMessagesCounter;
    private Counter receivedMessagesCounter;
    private Counter duplicateMessagesCounter;
    private Record recvMsgRecord;

    public AdaptiveEagerPushGossipBroadcast(String str, Properties properties, Host host) throws IOException, HandlerRegistrationException {
        super(PROTOCOL_NAME, (short) 1601);
        this.sentMessagesCounter = registerMetric(new Counter("SentMessages", "", new String[0]));
        this.receivedMessagesCounter = registerMetric(new Counter("ReceivedMessages", "", new String[0]));
        this.duplicateMessagesCounter = registerMetric(new Counter("DuplicateMessages", "", new String[0]));
        this.recvMsgRecord = registerMetric(new Record.Builder("RecvMessagesRecord", new String[]{"node", "message_id", "timestampRecv", "timestampSent", "from"}).build());
        setMyself(host);
        this.useDefaultConfig = Boolean.valueOf(Boolean.parseBoolean(properties.getProperty(PAR_USE_DEFAULT_CONFIG, DEFAULT_USE_DEFAULT_CONFIG)));
        this.pending = new TreeSet();
        this.connectedNeighbors = new TreeSet();
        this.neighbors = new TreeSet();
        this.receivedInOrder = new LinkedList<>();
        this.receivedTimestamps = new HashMap<>();
        this.r = new Random(System.currentTimeMillis());
        this.DNSHost = properties.getProperty(PAR_DNS_HOST, null);
        String property = properties.getProperty(PAR_CHANNEL_ADDRESS, host.getAddress().getHostAddress().toString());
        String property2 = properties.getProperty(PAR_CHANNEL_PORT, host.getPort());
        this.networkPort = Integer.parseInt(property2);
        Properties properties2 = new Properties();
        properties2.setProperty("address", property);
        properties2.setProperty("port", property2);
        this.channelId = createChannel("TCPChannel", properties2);
        setDefaultChannel(this.channelId);
        logger.debug("Created new channel with id " + this.channelId + " and bounded to: " + property + ":" + property2);
        setDefaultChannel(this.channelId);
    }

    public void init(Properties properties) throws HandlerRegistrationException, IOException {
        if (this.useDefaultConfig.booleanValue()) {
            this.fanout = Integer.parseInt(properties.getProperty(PAR_FANOUT, DEFAULT_FANOUT));
            this.removeTimeWindow = Long.parseLong(properties.getProperty(PAR_DELIVERY_TIMEOUT, DEFAULT_DELIVERY_TIEMEOUT));
            this.supportAntiEntrophy = Boolean.valueOf(Boolean.parseBoolean(properties.getProperty(PAR_SUPPORT_ANTIENTROPHY, DEFAULT_SUPPORT_ANTIENTROPHY)));
        } else {
            this.fanout = Integer.parseInt(properties.getProperty(PAR_FANOUT, "-1"));
            this.removeTimeWindow = Long.parseLong(properties.getProperty(PAR_DELIVERY_TIMEOUT, "-1"));
            if (properties.containsKey(PAR_SUPPORT_ANTIENTROPHY)) {
                this.supportAntiEntrophy = Boolean.valueOf(Boolean.parseBoolean(properties.getProperty(PAR_SUPPORT_ANTIENTROPHY)));
            } else {
                this.supportAntiEntrophy = null;
            }
        }
        subscribeNotification((short) 401, this::uponNeighborUp);
        subscribeNotification((short) 402, this::uponNeighborDown);
        registerChannelEventHandler(this.channelId, (short) 3, this::uponOutConnectionDown);
        registerChannelEventHandler(this.channelId, (short) 4, this::uponOutConnectionFailed);
        registerChannelEventHandler(this.channelId, (short) 5, this::uponOutConnectionUp);
        registerChannelEventHandler(this.channelId, (short) 2, this::uponInConnectionUp);
        registerChannelEventHandler(this.channelId, (short) 1, this::uponInConnectionDown);
    }

    private void uponBroadcastRequest(BroadcastRequest broadcastRequest, short s) {
        GossipMessage gossipMessage = new GossipMessage(broadcastRequest.getTimestamp(), getMyself(), broadcastRequest.getPayload(), s);
        deliverMessage(gossipMessage.m2clone());
        ArrayList arrayList = new ArrayList(this.connectedNeighbors);
        logger.debug("Received request to broadcast. currenyly I have " + arrayList.size() + " targets. Fanout is " + this.fanout);
        for (int i = this.fanout; arrayList.size() > 0 && i > 0; i--) {
            logger.debug("Sending message " + String.valueOf(gossipMessage.getMID()) + " to a neighbor.");
            sendMessage(gossipMessage.m2clone(), (Host) arrayList.remove(this.r.nextInt(arrayList.size())));
            this.sentMessagesCounter.inc();
        }
        cleanUp();
    }

    private void deliverMessage(GossipMessage gossipMessage) {
        this.receivedInOrder.addLast(gossipMessage.getMID());
        this.receivedTimestamps.put(gossipMessage.getMID(), Long.valueOf(System.currentTimeMillis()));
        triggerNotification(new BroadcastDelivery(gossipMessage.getSender(), gossipMessage.getPayload(), gossipMessage.getTimestamp()));
        if (this.supportAntiEntrophy.booleanValue()) {
            triggerNotification(new IdentifiableMessageNotification(gossipMessage, (short) 1601));
        }
    }

    private void cleanUp() {
        long currentTimeMillis = System.currentTimeMillis();
        while (!this.receivedInOrder.isEmpty() && this.receivedTimestamps.get(this.receivedInOrder.pollFirst()).longValue() + this.removeTimeWindow < currentTimeMillis) {
            this.receivedTimestamps.remove(this.receivedInOrder.removeFirst());
        }
    }

    private void uponGossipMessage(GossipMessage gossipMessage, Host host, short s, int i) {
        if (this.receivedTimestamps.containsKey(gossipMessage.getMID())) {
            this.duplicateMessagesCounter.inc();
            return;
        }
        deliverMessage(gossipMessage.m2clone());
        this.receivedMessagesCounter.inc();
        this.recvMsgRecord.record(new String[]{getMyself().toString(), String.valueOf((int) gossipMessage.getId()), new Timestamp(System.currentTimeMillis()).toString(), gossipMessage.getTimestamp().toString(), gossipMessage.getSender().toString()});
        gossipMessage.incrementHopCount();
        ArrayList arrayList = new ArrayList(this.connectedNeighbors);
        arrayList.remove(host);
        arrayList.remove(gossipMessage.getSender());
        int i2 = this.fanout;
        while (arrayList.size() > 0 && i2 > 0) {
            sendMessage(gossipMessage, (Host) arrayList.remove(this.r.nextInt(arrayList.size())));
            this.sentMessagesCounter.inc();
        }
        cleanUp();
    }

    private void uponNeighborUp(NeighborUp neighborUp, short s) {
        logger.debug("NeighborUp received: " + String.valueOf(neighborUp.getPeer()));
        Host host = new Host(neighborUp.getPeer().getAddress(), this.networkPort);
        this.neighbors.add(host);
        if (this.connectedNeighbors.contains(host) || !this.pending.add(host)) {
            return;
        }
        openConnection(host);
    }

    private void uponNeighborDown(NeighborDown neighborDown, short s) {
        Host host = new Host(neighborDown.getPeer().getAddress(), this.networkPort);
        this.neighbors.remove(host);
        if (this.connectedNeighbors.remove(host)) {
            closeConnection(host);
        } else if (this.pending.remove(host)) {
            closeConnection(host);
        }
    }

    private void uponMissingMessageRequest(MissingIdentifiableMessageRequest missingIdentifiableMessageRequest, short s) {
        logger.debug("Received an anti-entrophy request indicating that " + String.valueOf(missingIdentifiableMessageRequest.getDestination()) + " is missing " + String.valueOf(missingIdentifiableMessageRequest.getMessage().getMID()));
        Host host = null;
        if (this.connectedNeighbors.contains(missingIdentifiableMessageRequest.getDestination()) || this.pending.contains(missingIdentifiableMessageRequest.getDestination())) {
            host = missingIdentifiableMessageRequest.getDestination();
            logger.debug("Destination in requeest is among my connections (active or pending)");
        } else {
            Host host2 = new Host(missingIdentifiableMessageRequest.getDestination().getAddress(), this.networkPort);
            if (this.connectedNeighbors.contains(host2) || this.pending.contains(host2)) {
                host = host2;
                logger.debug("Destination was translated to my protocol channel (" + String.valueOf(host2) + ") and was found among my connections (active or pending)");
            }
        }
        if (host == null) {
            logger.info("Unable to recover message " + String.valueOf(missingIdentifiableMessageRequest.getMessage().getMID()) + " to " + String.valueOf(missingIdentifiableMessageRequest.getDestination()));
            return;
        }
        sendMessage(missingIdentifiableMessageRequest.getMessage(), host);
        this.sentMessagesCounter.inc();
        logger.info("Recoved message " + String.valueOf(missingIdentifiableMessageRequest.getMessage().getMID()) + " to " + String.valueOf(host));
    }

    private void uponOutConnectionDown(OutConnectionDown outConnectionDown, int i) {
        Host node = outConnectionDown.getNode();
        logger.trace("Host {} is down, cause: {}", node, outConnectionDown.getCause());
        if (this.neighbors.contains(node)) {
            this.connectedNeighbors.remove(node);
            this.pending.add(node);
            openConnection(node);
        } else {
            this.connectedNeighbors.remove(node);
            this.pending.remove(node);
            closeConnection(node);
        }
    }

    private void uponOutConnectionFailed(OutConnectionFailed<?> outConnectionFailed, int i) {
        Host node = outConnectionFailed.getNode();
        logger.trace("Connection to host {} failed, cause: {}", node, outConnectionFailed.getCause());
        if (this.neighbors.contains(node)) {
            this.connectedNeighbors.remove(node);
            this.pending.add(node);
            openConnection(node);
        } else {
            this.connectedNeighbors.remove(node);
            this.pending.remove(node);
            closeConnection(node);
        }
    }

    private void uponOutConnectionUp(OutConnectionUp outConnectionUp, int i) {
        Host node = outConnectionUp.getNode();
        logger.trace("Host (out) {} is up", node);
        if (this.neighbors.contains(node)) {
            this.pending.remove(node);
            this.connectedNeighbors.add(node);
        } else {
            this.pending.remove(node);
            this.connectedNeighbors.remove(node);
            closeConnection(node);
        }
    }

    private void uponInConnectionUp(InConnectionUp inConnectionUp, int i) {
        logger.debug("Host (in) {} is up", inConnectionUp.getNode());
    }

    private void uponInConnectionDown(InConnectionDown inConnectionDown, int i) {
        logger.debug("Connection from host {} is down, cause: {}", inConnectionDown.getNode(), inConnectionDown.getCause());
    }

    public void setFanout(Integer num) {
        if (num == null) {
            return;
        }
        logger.debug("Setting adaptive fanout to {} ", num);
        this.fanout = num.intValue();
    }

    public void start() {
        try {
            if (this.supportAntiEntrophy.booleanValue()) {
                registerRequestHandler((short) 503, this::uponMissingMessageRequest);
            }
            registerMessageSerializer(this.channelId, (short) 1601, GossipMessage.serializer);
            registerMessageHandler(this.channelId, (short) 1601, this::uponGossipMessage);
            registerRequestHandler((short) 501, this::uponBroadcastRequest);
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(-1);
        }
        triggerNotification(new ChannelAvailableNotification((short) 1601, PROTOCOL_NAME, this.channelId, "TCPChannel", getMyself()));
    }

    public boolean readyToStart() {
        return (this.fanout == -1 || this.removeTimeWindow == -1 || this.supportAntiEntrophy == null) ? false : true;
    }

    public boolean needsDiscovery() {
        return false;
    }

    public void addContact(Host host) {
    }

    public Host getContact() {
        return null;
    }

    public String getFirstFanout() {
        if (this.fanout == -1) {
            return null;
        }
        return this.fanout;
    }

    public String getFirstRemoveTimeWindow() {
        if (this.removeTimeWindow == -1) {
            return null;
        }
        return this.removeTimeWindow;
    }

    public String getFirstSupportAntiEntrophy() {
        if (this.supportAntiEntrophy == null) {
            return null;
        }
        return this.supportAntiEntrophy.toString();
    }

    public void setFirstFanout(String str) {
        if (str == null) {
            return;
        }
        logger.debug("Fanout set to {} on startup", str);
        this.fanout = Integer.parseInt(str);
    }

    public void setFirstRemoveTimeWindow(String str) {
        if (str == null) {
            return;
        }
        logger.debug("RemoveTimeWindow set to {} on startup", str);
        this.removeTimeWindow = Long.parseLong(str);
    }

    public void setFirstSupportAntiEntrophy(String str) {
        if (str == null) {
            return;
        }
        logger.debug("SupportAntiEntrophy set to {} on startup", str);
        this.supportAntiEntrophy = Boolean.valueOf(Boolean.parseBoolean(str));
    }

    public String getHost() {
        return this.DNSHost;
    }
}
