/*
 * Decompiled with CFR 0.152.
 */
package pt.unl.fct.dii.novasys.babel.protocols.floodbcast;

import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import pt.unl.fct.di.novasys.babel.core.GenericProtocol;
import pt.unl.fct.di.novasys.babel.exceptions.HandlerRegistrationException;
import pt.unl.fct.di.novasys.babel.generic.ProtoNotification;
import pt.unl.fct.di.novasys.babel.protocols.dissemination.notifications.BroadcastDelivery;
import pt.unl.fct.di.novasys.babel.protocols.dissemination.requests.BroadcastRequest;
import pt.unl.fct.di.novasys.babel.protocols.membership.notifications.NeighborDown;
import pt.unl.fct.di.novasys.babel.protocols.membership.notifications.NeighborUp;
import pt.unl.fct.di.novasys.channel.tcp.events.InConnectionDown;
import pt.unl.fct.di.novasys.channel.tcp.events.InConnectionUp;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionDown;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionFailed;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionUp;
import pt.unl.fct.di.novasys.network.data.Host;
import pt.unl.fct.dii.novasys.babel.protocols.floodbcast.messages.BCastMessage;

/**
 * Flood-based broadcast protocol.
 *
 * <p>A locally requested broadcast is delivered locally and sent to every connected
 * neighbor. A broadcast message received from the network is delivered and forwarded
 * only the first time its message id is seen. Ids of delivered messages are remembered
 * for {@link #removeTimeWindow} milliseconds so that duplicates can be recognised.
 *
 * <p>The neighbor set is driven by {@code NeighborUp}/{@code NeighborDown} notifications:
 * a host first enters {@code pending} while the outgoing connection is being opened and is
 * moved to {@code connectedNeighbors} once that connection is reported up.
 */
public class FloodBroadcast extends GenericProtocol {
    private static final Logger logger = LogManager.getLogger(FloodBroadcast.class);
    public static final short PROTOCOL_ID = 500;
    public static final String PROTOCOL_NAME = "FloodBroadcast";

    /** How long (in milliseconds) a delivered message id is remembered before being discarded. */
    public final long removeTimeWindow;
    /** Port used to build the {@link Host} of every neighbor reported by the membership notifications. */
    public final short networkPort;
    protected int channelId;
    protected final Host myself;

    /** Hosts for which an outgoing connection was requested but is not yet reported up. */
    private Set<Host> pending;
    /** Hosts whose outgoing connection is up; broadcasts are sent to these. */
    private Set<Host> connectedNeighbors;
    /** Ids of delivered messages, oldest first. */
    private LinkedList<UUID> receivedInOrder;
    /** Local delivery time (epoch milliseconds) of every remembered message id. */
    private HashMap<UUID, Long> receiveTimestamps;

    /**
     * Creates the protocol, its channel, and registers all handlers.
     *
     * <p>Reads {@code FloodBroadcast.DeliveredTimeout} (default {@code 600000}),
     * {@code FloodBroadcast.Channel.Address} and {@code FloodBroadcast.Channel.Port}
     * from {@code properties}.
     *
     * @param channelName not used by this constructor
     * @param properties  configuration source for the keys listed above
     * @param myself      host used as the sender of locally originated broadcasts
     * @throws IOException                  declared by the channel setup calls
     * @throws HandlerRegistrationException declared by the handler registration calls
     * @throws NumberFormatException        if the timeout or the port cannot be parsed
     */
    public FloodBroadcast(String channelName, Properties properties, Host myself) throws IOException, HandlerRegistrationException {
        super(PROTOCOL_NAME, PROTOCOL_ID);
        this.myself = myself;
        this.pending = new TreeSet<>();
        this.connectedNeighbors = new TreeSet<>();
        this.receivedInOrder = new LinkedList<>();
        this.receiveTimestamps = new HashMap<>();
        this.removeTimeWindow = Long.parseLong(properties.getProperty("FloodBroadcast.DeliveredTimeout", "600000"));

        String address = properties.getProperty("FloodBroadcast.Channel.Address");
        String port = properties.getProperty("FloodBroadcast.Channel.Port");
        // NOTE(review): parseShort rejects ports above 32767; kept because networkPort is a public short field.
        this.networkPort = Short.parseShort(port);

        Properties channelProps = new Properties();
        channelProps.setProperty("address", address);
        channelProps.setProperty("port", port);
        this.channelId = this.createChannel("TCPChannel", channelProps);

        // The numeric ids below are constants inlined by the compiler; presumably they are the
        // message/request/notification/event ids declared by the corresponding classes - TODO confirm.
        this.registerMessageSerializer(this.channelId, (short) 501, BCastMessage.serializer);
        this.registerMessageHandler(this.channelId, (short) 501, this::uponBCastMessage);
        this.registerRequestHandler((short) 501, this::uponBroadcastRequest);
        this.subscribeNotification((short) 401, this::uponNeighborUp);
        this.subscribeNotification((short) 402, this::uponNeighborDown);
        this.registerChannelEventHandler(this.channelId, (short) 3, this::uponOutConnectionDown);
        this.registerChannelEventHandler(this.channelId, (short) 4, this::uponOutConnectionFailed);
        this.registerChannelEventHandler(this.channelId, (short) 5, this::uponOutConnectionUp);
        this.registerChannelEventHandler(this.channelId, (short) 2, this::uponInConnectionUp);
        this.registerChannelEventHandler(this.channelId, (short) 1, this::uponInConnectionDown);
    }

    /**
     * Intentionally empty: all setup is done in the constructor.
     *
     * @param props ignored
     */
    public void init(Properties props) throws HandlerRegistrationException, IOException {
    }

    /**
     * Handles a local broadcast request: delivers the message locally, then sends it to
     * every connected neighbor.
     */
    private void uponBroadcastRequest(BroadcastRequest request, short protoID) {
        BCastMessage msg = new BCastMessage(request.getTimestamp(), this.myself, request.getPayload(), request.getId());
        this.deliverMessage(msg.clone());
        for (Host neighbor : this.connectedNeighbors) {
            this.sendMessage(msg, neighbor);
        }
        this.cleanUp();
    }

    /**
     * Records the message id as delivered (with the current time) and triggers a
     * {@link BroadcastDelivery} notification for it.
     */
    private void deliverMessage(BCastMessage msg) {
        this.receivedInOrder.addLast(msg.getMID());
        this.receiveTimestamps.put(msg.getMID(), System.currentTimeMillis());
        this.triggerNotification((ProtoNotification) new BroadcastDelivery(msg.getSender(), msg.getPayload(), msg.getTimestamp()));
    }

    /**
     * Forgets message ids, oldest first, whose delivery time is more than
     * {@link #removeTimeWindow} milliseconds in the past. Stops at the first id that is
     * still inside the window.
     */
    private void cleanUp() {
        long now = System.currentTimeMillis();
        while (!this.receivedInOrder.isEmpty()) {
            // Inspect the head without removing it: it must stay queued if it has not expired yet.
            UUID oldest = this.receivedInOrder.peekFirst();
            Long deliveredAt = this.receiveTimestamps.get(oldest);
            // A missing timestamp means the id was already evicted through an earlier queue entry
            // (the same id queued twice); such a stale entry is simply dropped.
            if (deliveredAt != null && deliveredAt + this.removeTimeWindow >= now) {
                break;
            }
            this.receivedInOrder.removeFirst();
            this.receiveTimestamps.remove(oldest);
        }
    }

    /**
     * Handles a broadcast message received from the network. On first reception the message
     * is delivered and forwarded to every connected neighbor except the host it arrived from
     * and the message's original sender. Duplicates are neither delivered nor forwarded.
     */
    private void uponBCastMessage(BCastMessage msg, Host sender, short protoID, int cID) {
        if (!this.receiveTimestamps.containsKey(msg.getMID())) {
            this.deliverMessage(msg.clone());
            msg.incrementHopCount();
            for (Host neighbor : this.connectedNeighbors) {
                if (neighbor.equals(sender) || neighbor.equals(msg.getSender())) {
                    continue;
                }
                // Forward to the neighbor itself, not back to the host the message came from.
                this.sendMessage(msg, neighbor);
            }
        }
        this.cleanUp();
    }

    /**
     * Handles a neighbor-up notification: opens a connection to the peer (at this protocol's
     * {@link #networkPort}) unless it is already connected or already pending.
     */
    private void uponNeighborUp(NeighborUp up, short protoID) {
        Host h = new Host(up.getPeer().getAddress(), (int) this.networkPort);
        if (!this.connectedNeighbors.contains(h) && this.pending.add(h)) {
            this.openConnection(h);
        }
    }

    /**
     * Handles a neighbor-down notification: stops tracking the peer and closes the
     * connection to it if it was connected or pending.
     */
    private void uponNeighborDown(NeighborDown down, short protoID) {
        Host h = new Host(down.getPeer().getAddress(), (int) this.networkPort);
        if (this.connectedNeighbors.remove(h)) {
            this.closeConnection(h);
        } else if (this.pending.remove(h)) {
            this.closeConnection(h);
        }
    }

    /**
     * Handles the loss of an outgoing connection: if the host is still a tracked neighbor,
     * it is moved back to pending and a new connection is requested.
     */
    private void uponOutConnectionDown(OutConnectionDown event, int channelId) {
        Host h = event.getNode();
        logger.trace("Host {} is down, cause: {}", h, event.getCause());
        if (this.connectedNeighbors.contains(h) || this.pending.contains(h)) {
            this.connectedNeighbors.remove(h);
            this.pending.add(h);
            this.openConnection(h);
        }
    }

    /**
     * Handles a failed outgoing connection attempt: if the host is still a tracked neighbor,
     * it is kept as pending and the connection is requested again.
     */
    private void uponOutConnectionFailed(OutConnectionFailed<?> event, int channelId) {
        Host h = event.getNode();
        logger.trace("Connection to host {} failed, cause: {}", h, event.getCause());
        if (this.connectedNeighbors.contains(h) || this.pending.contains(h)) {
            this.connectedNeighbors.remove(h);
            this.pending.add(h);
            this.openConnection(h);
        }
    }

    /**
     * Handles an established outgoing connection: a pending host becomes a connected
     * neighbor; a host that is no longer tracked at all has its connection closed.
     */
    private void uponOutConnectionUp(OutConnectionUp event, int channelId) {
        Host h = event.getNode();
        logger.trace("Host (out) {} is up", h);
        if (this.pending.remove(h)) {
            // This is the only place a host enters connectedNeighbors.
            this.connectedNeighbors.add(h);
        } else if (!this.connectedNeighbors.contains(h)) {
            this.closeConnection(h);
        }
    }

    /** Only logs the event; incoming connections do not change the neighbor sets. */
    private void uponInConnectionUp(InConnectionUp event, int channelId) {
        logger.trace("Host (in) {} is up", event.getNode());
    }

    /** Only logs the event; incoming connections do not change the neighbor sets. */
    private void uponInConnectionDown(InConnectionDown event, int channelId) {
        logger.trace("Connection from host {} is down, cause: {}", event.getNode(), event.getCause());
    }
}

