package pt.unl.fct.di.novasys.babel.protocols.eagerpush;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import pt.unl.fct.di.novasys.babel.core.GenericProtocol;
import pt.unl.fct.di.novasys.babel.exceptions.HandlerRegistrationException;
import pt.unl.fct.di.novasys.babel.generic.ProtoMessage;
import pt.unl.fct.di.novasys.babel.generic.ProtoNotification;
import pt.unl.fct.di.novasys.babel.generic.ProtoRequest;
import pt.unl.fct.di.novasys.babel.handlers.ChannelEventHandler;
import pt.unl.fct.di.novasys.babel.handlers.MessageInHandler;
import pt.unl.fct.di.novasys.babel.handlers.NotificationHandler;
import pt.unl.fct.di.novasys.babel.handlers.RequestHandler;
import pt.unl.fct.di.novasys.babel.metrics.Counter;
import pt.unl.fct.di.novasys.babel.protocols.dissemination.notifications.BroadcastDelivery;
import pt.unl.fct.di.novasys.babel.protocols.dissemination.notifications.IdentifiableMessageNotification;
import pt.unl.fct.di.novasys.babel.protocols.dissemination.requests.BroadcastRequest;
import pt.unl.fct.di.novasys.babel.protocols.dissemination.requests.MissingIdentifiableMessageRequest;
import pt.unl.fct.di.novasys.babel.protocols.eagerpush.messages.GossipMessage;
import pt.unl.fct.di.novasys.babel.protocols.general.notifications.ChannelAvailableNotification;
import pt.unl.fct.di.novasys.babel.protocols.membership.notifications.NeighborDown;
import pt.unl.fct.di.novasys.babel.protocols.membership.notifications.NeighborUp;
import pt.unl.fct.di.novasys.channel.ChannelEvent;
import pt.unl.fct.di.novasys.channel.tcp.TCPChannel;
import pt.unl.fct.di.novasys.channel.tcp.events.InConnectionDown;
import pt.unl.fct.di.novasys.channel.tcp.events.InConnectionUp;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionDown;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionFailed;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionUp;
import pt.unl.fct.di.novasys.network.data.Host;

/* loaded from: classes5.dex */
public class EagerPushGossipBroadcast extends GenericProtocol {
    public static final String DEFAULT_DELIVERY_TIEMEOUT = "600000";
    public static final String DEFAULT_FANOUT = "4";
    public static final boolean DEFAULT_SUPPORT_ANTIENTROPHY = false;
    public static final String PAR_CHANNEL_ADDRESS = "EagerPushGossipBroadcast.Channel.Address";
    public static final String PAR_CHANNEL_PORT = "EagerPushGossipBroadcast.Channel.Port";
    public static final String PAR_DELIVERY_TIMEOUT = "EagerPushGossipBroadcast.DeliveredTimeout";
    public static final String PAR_FANOUT = "EagerPushGossipBroadcast.Fanout";
    public static final String PAR_SUPPORT_ANTIENTROPHY = "EagerPushGosssipBroadcast.SupportAntiEntropy";
    public static final short PROTOCOL_ID = 1600;
    public static final String PROTOCOL_NAME = "EagerPushGossipBroadcast";
    private static final Logger logger = LogManager.getLogger((Class<?>) EagerPushGossipBroadcast.class);
    protected int channelId;
    private Set<Host> connectedNeighbors;
    public final int fanout;
    protected final Host myself;
    private Set<Host> neighbors;
    public final int networkPort;
    private Set<Host> pending;
    private final Random r;
    private LinkedList<UUID> receivedInOrder;
    private HashMap<UUID, Long> receivedTimestamps;
    public final long removeTimeWindow;
    private Counter sentMessagesCounter;
    public final boolean supportAntiEntrophy;

    public EagerPushGossipBroadcast(String str, Properties properties, Host host) throws IOException, HandlerRegistrationException {
        super(PROTOCOL_NAME, PROTOCOL_ID);
        String sb;
        this.sentMessagesCounter = (Counter) registerMetric(new Counter("SentMessages", "", new String[0]));
        this.myself = host;
        this.pending = new TreeSet();
        this.connectedNeighbors = new TreeSet();
        this.neighbors = new TreeSet();
        this.receivedInOrder = new LinkedList<>();
        this.receivedTimestamps = new HashMap<>();
        this.fanout = Integer.parseInt(properties.getProperty(PAR_FANOUT, DEFAULT_FANOUT));
        this.removeTimeWindow = Long.parseLong(properties.getProperty(PAR_DELIVERY_TIMEOUT, DEFAULT_DELIVERY_TIEMEOUT));
        this.r = new Random(System.currentTimeMillis());
        if (properties.containsKey(PAR_SUPPORT_ANTIENTROPHY)) {
            this.supportAntiEntrophy = Boolean.parseBoolean(properties.getProperty(PAR_SUPPORT_ANTIENTROPHY));
        } else {
            this.supportAntiEntrophy = false;
        }
        String property = properties.containsKey(PAR_CHANNEL_ADDRESS) ? properties.getProperty(PAR_CHANNEL_ADDRESS) : host.getAddress().getHostAddress().toString();
        if (properties.containsKey(PAR_CHANNEL_PORT)) {
            sb = properties.getProperty(PAR_CHANNEL_PORT);
            this.networkPort = Integer.parseInt(sb);
        } else {
            int port = host.getPort();
            this.networkPort = port;
            sb = new StringBuilder().append(port).toString();
        }
        Properties properties2 = new Properties();
        properties2.setProperty("address", property);
        properties2.setProperty("port", sb);
        int createChannel = createChannel(TCPChannel.NAME, properties2);
        this.channelId = createChannel;
        setDefaultChannel(createChannel);
        logger.debug("Created new channel with id " + this.channelId + " and bounded to: " + property + ParameterizedMessage.ERROR_MSG_SEPARATOR + sb);
        setDefaultChannel(this.channelId);
        registerMessageSerializer(this.channelId, GossipMessage.MSG_CODE, GossipMessage.serializer);
        registerMessageHandler(this.channelId, GossipMessage.MSG_CODE, new MessageInHandler() { // from class: pt.unl.fct.di.novasys.babel.protocols.eagerpush.EagerPushGossipBroadcast$$ExternalSyntheticLambda0
            @Override // pt.unl.fct.di.novasys.babel.handlers.MessageInHandler
            public final void receive(ProtoMessage protoMessage, Host host2, short s, int i) {
                EagerPushGossipBroadcast.this.uponGossipMessage((GossipMessage) protoMessage, host2, s, i);
            }
        });
        registerRequestHandler((short) 501, new RequestHandler() { // from class: pt.unl.fct.di.novasys.babel.protocols.eagerpush.EagerPushGossipBroadcast$$ExternalSyntheticLambda1
            @Override // pt.unl.fct.di.novasys.babel.handlers.RequestHandler
            public final void uponRequest(ProtoRequest protoRequest, short s) {
                EagerPushGossipBroadcast.this.uponBroadcastRequest((BroadcastRequest) protoRequest, s);
            }
        });
        subscribeNotification((short) 401, new NotificationHandler() { // from class: pt.unl.fct.di.novasys.babel.protocols.eagerpush.EagerPushGossipBroadcast$$ExternalSyntheticLambda2
            @Override // pt.unl.fct.di.novasys.babel.handlers.NotificationHandler
            public final void uponNotification(ProtoNotification protoNotification, short s) {
                EagerPushGossipBroadcast.this.uponNeighborUp((NeighborUp) protoNotification, s);
            }
        });
        subscribeNotification((short) 402, new NotificationHandler() { // from class: pt.unl.fct.di.novasys.babel.protocols.eagerpush.EagerPushGossipBroadcast$$ExternalSyntheticLambda3
            @Override // pt.unl.fct.di.novasys.babel.handlers.NotificationHandler
            public final void uponNotification(ProtoNotification protoNotification, short s) {
                EagerPushGossipBroadcast.this.uponNeighborDown((NeighborDown) protoNotification, s);
            }
        });
        if (this.supportAntiEntrophy) {
            registerRequestHandler((short) 503, new RequestHandler() { // from class: pt.unl.fct.di.novasys.babel.protocols.eagerpush.EagerPushGossipBroadcast$$ExternalSyntheticLambda4
                @Override // pt.unl.fct.di.novasys.babel.handlers.RequestHandler
                public final void uponRequest(ProtoRequest protoRequest, short s) {
                    EagerPushGossipBroadcast.this.uponMissingMessageRequest((MissingIdentifiableMessageRequest) protoRequest, s);
                }
            });
        }
        registerChannelEventHandler(this.channelId, (short) 3, new ChannelEventHandler() { // from class: pt.unl.fct.di.novasys.babel.protocols.eagerpush.EagerPushGossipBroadcast$$ExternalSyntheticLambda5
            @Override // pt.unl.fct.di.novasys.babel.handlers.ChannelEventHandler
            public final void handleEvent(ChannelEvent channelEvent, int i) {
                EagerPushGossipBroadcast.this.uponOutConnectionDown((OutConnectionDown) channelEvent, i);
            }
        });
        registerChannelEventHandler(this.channelId, (short) 4, new ChannelEventHandler() { // from class: pt.unl.fct.di.novasys.babel.protocols.eagerpush.EagerPushGossipBroadcast$$ExternalSyntheticLambda6
            @Override // pt.unl.fct.di.novasys.babel.handlers.ChannelEventHandler
            public final void handleEvent(ChannelEvent channelEvent, int i) {
                EagerPushGossipBroadcast.this.uponOutConnectionFailed((OutConnectionFailed) channelEvent, i);
            }
        });
        registerChannelEventHandler(this.channelId, (short) 5, new ChannelEventHandler() { // from class: pt.unl.fct.di.novasys.babel.protocols.eagerpush.EagerPushGossipBroadcast$$ExternalSyntheticLambda7
            @Override // pt.unl.fct.di.novasys.babel.handlers.ChannelEventHandler
            public final void handleEvent(ChannelEvent channelEvent, int i) {
                EagerPushGossipBroadcast.this.uponOutConnectionUp((OutConnectionUp) channelEvent, i);
            }
        });
        registerChannelEventHandler(this.channelId, (short) 2, new ChannelEventHandler() { // from class: pt.unl.fct.di.novasys.babel.protocols.eagerpush.EagerPushGossipBroadcast$$ExternalSyntheticLambda8
            @Override // pt.unl.fct.di.novasys.babel.handlers.ChannelEventHandler
            public final void handleEvent(ChannelEvent channelEvent, int i) {
                EagerPushGossipBroadcast.this.uponInConnectionUp((InConnectionUp) channelEvent, i);
            }
        });
        registerChannelEventHandler(this.channelId, (short) 1, new ChannelEventHandler() { // from class: pt.unl.fct.di.novasys.babel.protocols.eagerpush.EagerPushGossipBroadcast$$ExternalSyntheticLambda9
            @Override // pt.unl.fct.di.novasys.babel.handlers.ChannelEventHandler
            public final void handleEvent(ChannelEvent channelEvent, int i) {
                EagerPushGossipBroadcast.this.uponInConnectionDown((InConnectionDown) channelEvent, i);
            }
        });
    }

    private void cleanUp() {
        long currentTimeMillis = System.currentTimeMillis();
        while (!this.receivedInOrder.isEmpty() && this.receivedTimestamps.get(this.receivedInOrder.pollFirst()).longValue() + this.removeTimeWindow < currentTimeMillis) {
            this.receivedTimestamps.remove(this.receivedInOrder.removeFirst());
        }
    }

    private void deliverMessage(GossipMessage gossipMessage) {
        this.receivedInOrder.addLast(gossipMessage.getMID());
        this.receivedTimestamps.put(gossipMessage.getMID(), Long.valueOf(System.currentTimeMillis()));
        triggerNotification(new BroadcastDelivery(gossipMessage.getSender(), gossipMessage.getPayload(), gossipMessage.getTimestamp()));
        if (this.supportAntiEntrophy) {
            triggerNotification(new IdentifiableMessageNotification(gossipMessage, PROTOCOL_ID));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void uponBroadcastRequest(BroadcastRequest broadcastRequest, short s) {
        GossipMessage gossipMessage = new GossipMessage(broadcastRequest.getTimestamp(), this.myself, broadcastRequest.getPayload(), s);
        deliverMessage(gossipMessage.clone());
        ArrayList arrayList = new ArrayList(this.connectedNeighbors);
        logger.debug("Received request to broadcast. currenyly I have " + arrayList.size() + " targets. Fanout is " + this.fanout);
        for (int i = this.fanout; arrayList.size() > 0 && i > 0; i--) {
            logger.debug("Sending message " + gossipMessage.getMID() + " to a neighbor.");
            sendMessage(gossipMessage.clone(), (Host) arrayList.remove(this.r.nextInt(arrayList.size())));
            this.sentMessagesCounter.inc();
        }
        cleanUp();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void uponGossipMessage(GossipMessage gossipMessage, Host host, short s, int i) {
        if (this.receivedTimestamps.containsKey(gossipMessage.getMID())) {
            return;
        }
        deliverMessage(gossipMessage.clone());
        gossipMessage.incrementHopCount();
        ArrayList arrayList = new ArrayList(this.connectedNeighbors);
        arrayList.remove(host);
        arrayList.remove(gossipMessage.getSender());
        for (int i2 = this.fanout; arrayList.size() > 0 && i2 > 0; i2--) {
            sendMessage(gossipMessage, (Host) arrayList.remove(this.r.nextInt(arrayList.size())));
            this.sentMessagesCounter.inc();
        }
        cleanUp();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void uponInConnectionDown(InConnectionDown inConnectionDown, int i) {
        logger.debug("Connection from host {} is down, cause: {}", inConnectionDown.getNode(), inConnectionDown.getCause());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void uponInConnectionUp(InConnectionUp inConnectionUp, int i) {
        logger.debug("Host (in) {} is up", inConnectionUp.getNode());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void uponMissingMessageRequest(MissingIdentifiableMessageRequest missingIdentifiableMessageRequest, short s) {
        Host destination;
        Logger logger2 = logger;
        logger2.debug("Received an anti-entrophy request indicating that " + missingIdentifiableMessageRequest.getDestination() + " is missing " + missingIdentifiableMessageRequest.getMessage().getMID());
        if (this.connectedNeighbors.contains(missingIdentifiableMessageRequest.getDestination()) || this.pending.contains(missingIdentifiableMessageRequest.getDestination())) {
            destination = missingIdentifiableMessageRequest.getDestination();
            logger2.debug("Destination in requeest is among my connections (active or pending)");
        } else {
            destination = new Host(missingIdentifiableMessageRequest.getDestination().getAddress(), this.networkPort);
            if (this.connectedNeighbors.contains(destination) || this.pending.contains(destination)) {
                logger2.debug("Destination was translated to my protocol channel (" + destination + ") and was found among my connections (active or pending)");
            } else {
                destination = null;
            }
        }
        if (destination == null) {
            logger2.info("Unable to recover message " + missingIdentifiableMessageRequest.getMessage().getMID() + " to " + missingIdentifiableMessageRequest.getDestination());
        } else {
            sendMessage(missingIdentifiableMessageRequest.getMessage(), destination);
            this.sentMessagesCounter.inc();
            logger2.info("Recoved message " + missingIdentifiableMessageRequest.getMessage().getMID() + " to " + destination);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void uponNeighborDown(NeighborDown neighborDown, short s) {
        Host host = new Host(neighborDown.getPeer().getAddress(), this.networkPort);
        this.neighbors.remove(host);
        if (this.connectedNeighbors.remove(host)) {
            closeConnection(host);
        } else if (this.pending.remove(host)) {
            closeConnection(host);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void uponNeighborUp(NeighborUp neighborUp, short s) {
        logger.debug("NeighborUp received: " + neighborUp.getPeer());
        Host host = new Host(neighborUp.getPeer().getAddress(), this.networkPort);
        this.neighbors.add(host);
        if (this.connectedNeighbors.contains(host) || !this.pending.add(host)) {
            return;
        }
        openConnection(host);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void uponOutConnectionDown(OutConnectionDown outConnectionDown, int i) {
        Host node = outConnectionDown.getNode();
        logger.trace("Host {} is down, cause: {}", node, outConnectionDown.getCause());
        if (this.neighbors.contains(node)) {
            this.connectedNeighbors.remove(node);
            this.pending.add(node);
            openConnection(node);
        } else {
            this.connectedNeighbors.remove(node);
            this.pending.remove(node);
            closeConnection(node);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void uponOutConnectionFailed(OutConnectionFailed<?> outConnectionFailed, int i) {
        Host node = outConnectionFailed.getNode();
        logger.trace("Connection to host {} failed, cause: {}", node, outConnectionFailed.getCause());
        if (this.neighbors.contains(node)) {
            this.connectedNeighbors.remove(node);
            this.pending.add(node);
            openConnection(node);
        } else {
            this.connectedNeighbors.remove(node);
            this.pending.remove(node);
            closeConnection(node);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void uponOutConnectionUp(OutConnectionUp outConnectionUp, int i) {
        Host node = outConnectionUp.getNode();
        logger.trace("Host (out) {} is up", node);
        if (this.neighbors.contains(node)) {
            this.pending.remove(node);
            this.connectedNeighbors.add(node);
        } else {
            this.pending.remove(node);
            this.connectedNeighbors.remove(node);
            closeConnection(node);
        }
    }

    public Host getHost() {
        return this.myself;
    }

    @Override // pt.unl.fct.di.novasys.babel.core.GenericProtocol
    public void init(Properties properties) throws HandlerRegistrationException, IOException {
        triggerNotification(new ChannelAvailableNotification(PROTOCOL_ID, PROTOCOL_NAME, this.channelId, TCPChannel.NAME, this.myself));
    }
}
