/*
 * Decompiled with CFR 0.152.
 */
package pt.unl.fct.di.novasys.babel.protocols.eagerpush;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import pt.unl.fct.di.novasys.babel.core.GenericProtocol;
import pt.unl.fct.di.novasys.babel.exceptions.HandlerRegistrationException;
import pt.unl.fct.di.novasys.babel.protocols.dissemination.notifications.BroadcastDelivery;
import pt.unl.fct.di.novasys.babel.protocols.dissemination.notifications.IdentifiableMessageNotification;
import pt.unl.fct.di.novasys.babel.protocols.dissemination.requests.BroadcastRequest;
import pt.unl.fct.di.novasys.babel.protocols.dissemination.requests.MissingIdentifiableMessageRequest;
import pt.unl.fct.di.novasys.babel.protocols.eagerpush.messages.GossipMessage;
import pt.unl.fct.di.novasys.babel.protocols.general.notifications.ChannelAvailableNotification;
import pt.unl.fct.di.novasys.babel.protocols.membership.notifications.NeighborDown;
import pt.unl.fct.di.novasys.babel.protocols.membership.notifications.NeighborUp;
import pt.unl.fct.di.novasys.channel.tcp.events.InConnectionDown;
import pt.unl.fct.di.novasys.channel.tcp.events.InConnectionUp;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionDown;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionFailed;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionUp;
import pt.unl.fct.di.novasys.network.data.Host;

public class EagerPushGossipBroadcast
extends GenericProtocol {
    private static final Logger logger = LogManager.getLogger(EagerPushGossipBroadcast.class);
    public static final short PROTOCOL_ID = 600;
    public static final String PROTOCOL_NAME = "EagerPushGossipBroadcast";
    public static final String PAR_CHANNEL_ADDRESS = "EagerPushGossipBroadcast.Channel.Address";
    public static final String PAR_CHANNEL_PORT = "EagerPushGossipBroadcast.Channel.Port";
    public static final String PAR_FANOUT = "EagerPushGossipBroadcast.Fanout";
    public static final String DEFAULT_FANOUT = "4";
    public static final String PAR_DELIVERY_TIMEOUT = "EagerPushGossipBroadcast.DeliveredTimeout";
    public static final String DEFAULT_DELIVERY_TIEMEOUT = "600000";
    public static final String PAR_SUPPORT_ANTIENTROPHY = "EagerPushGosssipBroadcast.SupportAntiEntropy";
    public static final boolean DEFAULT_SUPPORT_ANTIENTROPHY = false;
    public final int fanout;
    public final long removeTimeWindow;
    public final int networkPort;
    public final boolean supportAntiEntrophy;
    protected int channelId;
    protected final Host myself;
    private Set<Host> pending;
    private Set<Host> connectedNeighbors;
    private Set<Host> neighbors;
    private LinkedList<UUID> receivedInOrder;
    private HashMap<UUID, Long> receivedTimestamps;
    private final Random r;

    public EagerPushGossipBroadcast(String channelName, Properties properties, Host myself) throws IOException, HandlerRegistrationException {
        super(PROTOCOL_NAME, (short)600);
        this.myself = myself;
        this.pending = new TreeSet<Host>();
        this.connectedNeighbors = new TreeSet<Host>();
        this.neighbors = new TreeSet<Host>();
        this.receivedInOrder = new LinkedList();
        this.receivedTimestamps = new HashMap();
        this.fanout = Integer.parseInt(properties.getProperty(PAR_FANOUT, DEFAULT_FANOUT));
        this.removeTimeWindow = Long.parseLong(properties.getProperty(PAR_DELIVERY_TIMEOUT, DEFAULT_DELIVERY_TIEMEOUT));
        this.r = new Random(System.currentTimeMillis());
        this.supportAntiEntrophy = properties.containsKey(PAR_SUPPORT_ANTIENTROPHY) ? Boolean.parseBoolean(properties.getProperty(PAR_SUPPORT_ANTIENTROPHY)) : false;
        String address = null;
        String port = null;
        address = properties.containsKey(PAR_CHANNEL_ADDRESS) ? properties.getProperty(PAR_CHANNEL_ADDRESS) : myself.getAddress().getHostAddress().toString();
        if (properties.containsKey(PAR_CHANNEL_PORT)) {
            port = properties.getProperty(PAR_CHANNEL_PORT);
            this.networkPort = Integer.parseInt(port);
        } else {
            this.networkPort = myself.getPort();
            port = this.networkPort + "";
        }
        Properties channelProps = new Properties();
        channelProps.setProperty("address", address);
        channelProps.setProperty("port", port);
        this.channelId = this.createChannel("TCPChannel", channelProps);
        this.setDefaultChannel(this.channelId);
        this.registerMessageSerializer(this.channelId, (short)601, GossipMessage.serializer);
        this.registerMessageHandler(this.channelId, (short)601, this::uponGossipMessage);
        this.registerRequestHandler((short)501, this::uponBroadcastRequest);
        this.subscribeNotification((short)401, this::uponNeighborUp);
        this.subscribeNotification((short)402, this::uponNeighborDown);
        if (this.supportAntiEntrophy) {
            this.registerRequestHandler((short)503, this::uponMissingMessageRequest);
        }
        this.registerChannelEventHandler(this.channelId, (short)3, this::uponOutConnectionDown);
        this.registerChannelEventHandler(this.channelId, (short)4, this::uponOutConnectionFailed);
        this.registerChannelEventHandler(this.channelId, (short)5, this::uponOutConnectionUp);
        this.registerChannelEventHandler(this.channelId, (short)2, this::uponInConnectionUp);
        this.registerChannelEventHandler(this.channelId, (short)1, this::uponInConnectionDown);
    }

    @Override
    public void init(Properties props) throws HandlerRegistrationException, IOException {
        this.triggerNotification(new ChannelAvailableNotification(600, PROTOCOL_NAME, this.channelId, "TCPChannel", this.myself));
    }

    private void uponBroadcastRequest(BroadcastRequest request, short protoID) {
        GossipMessage msg = new GossipMessage(request.getTimestamp(), this.myself, request.getPayload(), protoID);
        this.deliverMessage(msg.clone());
        ArrayList<Host> validTargets = new ArrayList<Host>(this.connectedNeighbors);
        int toSend = this.fanout;
        while (validTargets.size() > 0 && toSend > 0) {
            this.sendMessage(msg, validTargets.remove(this.r.nextInt(validTargets.size())));
        }
        this.cleanUp();
    }

    private void deliverMessage(GossipMessage msg) {
        this.receivedInOrder.addLast(msg.getMID());
        this.receivedTimestamps.put(msg.getMID(), System.currentTimeMillis());
        this.triggerNotification(new BroadcastDelivery(msg.getSender(), msg.getPayload(), msg.getTimestamp()));
        if (this.supportAntiEntrophy) {
            this.triggerNotification(new IdentifiableMessageNotification(msg, 600));
        }
    }

    private void cleanUp() {
        long now = System.currentTimeMillis();
        while (!this.receivedInOrder.isEmpty() && this.receivedTimestamps.get(this.receivedInOrder.pollFirst()) + this.removeTimeWindow < now) {
            this.receivedTimestamps.remove(this.receivedInOrder.removeFirst());
        }
    }

    private void uponGossipMessage(GossipMessage msg, Host sender, short protoID, int cID) {
        if (!this.receivedTimestamps.containsKey(msg.getMID())) {
            this.deliverMessage(msg.clone());
            msg.incrementHopCount();
            ArrayList<Host> validTargets = new ArrayList<Host>(this.connectedNeighbors);
            validTargets.remove(sender);
            validTargets.remove(msg.getSender());
            int toSend = this.fanout;
            while (validTargets.size() > 0 && toSend > 0) {
                this.sendMessage(msg, validTargets.remove(this.r.nextInt(validTargets.size())));
            }
            this.cleanUp();
        }
    }

    private void uponNeighborUp(NeighborUp up, short protoID) {
        Host h2 = new Host(up.getPeer().getAddress(), this.networkPort);
        this.neighbors.add(h2);
        if (!this.connectedNeighbors.contains(h2) && this.pending.add(h2)) {
            this.openConnection(h2);
        }
    }

    private void uponNeighborDown(NeighborDown down, short protoID) {
        Host h2 = new Host(down.getPeer().getAddress(), this.networkPort);
        this.neighbors.remove(h2);
        if (this.connectedNeighbors.remove(h2)) {
            this.closeConnection(h2);
        } else if (this.pending.remove(h2)) {
            this.closeConnection(h2);
        }
    }

    private void uponMissingMessageRequest(MissingIdentifiableMessageRequest req, short protoID) {
        logger.debug("Received an anti-entrophy request indicating that " + req.getDestination() + " is missing " + req.getMessage().getMID());
        Host d = null;
        if (this.connectedNeighbors.contains(req.getDestination()) || this.pending.contains(req.getDestination())) {
            d = req.getDestination();
            logger.debug("Destination in requeest is among my connections (active or pending)");
        } else {
            Host tmp = new Host(req.getDestination().getAddress(), this.networkPort);
            if (this.connectedNeighbors.contains(tmp) || this.pending.contains(tmp)) {
                d = tmp;
                logger.debug("Destination was translated to my protocol channel (" + tmp + ") and was found among my connections (active or pending)");
            }
        }
        if (d != null) {
            this.sendMessage(req.getMessage(), d);
            logger.info("Recoved message " + req.getMessage().getMID() + " to " + d);
        } else {
            logger.info("Unable to recover message " + req.getMessage().getMID() + " to " + req.getDestination());
        }
    }

    private void uponOutConnectionDown(OutConnectionDown event, int channelId) {
        Host h2 = event.getNode();
        logger.trace("Host {} is down, cause: {}", (Object)h2, (Object)event.getCause());
        if (this.neighbors.contains(h2)) {
            this.connectedNeighbors.remove(h2);
            this.pending.add(h2);
            this.openConnection(h2);
        } else {
            this.connectedNeighbors.remove(h2);
            this.pending.remove(h2);
            this.closeConnection(h2);
        }
    }

    private void uponOutConnectionFailed(OutConnectionFailed<?> event, int channelId) {
        Host h2 = event.getNode();
        logger.trace("Connection to host {} failed, cause: {}", (Object)h2, (Object)event.getCause());
        if (this.neighbors.contains(h2)) {
            this.connectedNeighbors.remove(h2);
            this.pending.add(h2);
            this.openConnection(h2);
        } else {
            this.connectedNeighbors.remove(h2);
            this.pending.remove(h2);
            this.closeConnection(h2);
        }
    }

    private void uponOutConnectionUp(OutConnectionUp event, int channelId) {
        Host h2 = event.getNode();
        logger.trace("Host (out) {} is up", (Object)h2);
        if (this.neighbors.contains(h2)) {
            this.pending.remove(h2);
            this.connectedNeighbors.add(h2);
        } else {
            this.pending.remove(h2);
            this.connectedNeighbors.remove(h2);
            this.closeConnection(h2);
        }
    }

    private void uponInConnectionUp(InConnectionUp event, int channelId) {
        logger.trace("Host (in) {} is up", (Object)event.getNode());
    }

    private void uponInConnectionDown(InConnectionDown event, int channelId) {
        logger.trace("Connection from host {} is down, cause: {}", (Object)event.getNode(), (Object)event.getCause());
    }

    public Host getHost() {
        return this.myself;
    }
}

