package pt.unl.fct.di.novasys.babel.protocols.overlord.moncollect;

import java.io.IOException;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import pt.unl.fct.di.novasys.babel.core.GenericProtocol;
import pt.unl.fct.di.novasys.babel.exceptions.HandlerRegistrationException;
import pt.unl.fct.di.novasys.babel.protocols.membership.notifications.NeighborDown;
import pt.unl.fct.di.novasys.babel.protocols.membership.notifications.NeighborUp;
import pt.unl.fct.di.novasys.babel.protocols.overlord.moncollect.messages.Collect;
import pt.unl.fct.di.novasys.babel.protocols.overlord.moncollect.messages.Monitor;
import pt.unl.fct.di.novasys.babel.protocols.overlord.moncollect.messages.Prune;
import pt.unl.fct.di.novasys.babel.protocols.overlord.moncollect.messages.Session;
import pt.unl.fct.di.novasys.babel.protocols.overlord.moncollect.messages.SessionOk;
import pt.unl.fct.di.novasys.babel.protocols.overlord.moncollect.notifications.CollectDataNotification;
import pt.unl.fct.di.novasys.babel.protocols.overlord.moncollect.notifications.CollectNotification;
import pt.unl.fct.di.novasys.babel.protocols.overlord.moncollect.notifications.ReceiveAggregatedDataNotification;
import pt.unl.fct.di.novasys.babel.protocols.overlord.moncollect.requests.AggregateDataRequest;
import pt.unl.fct.di.novasys.babel.protocols.overlord.moncollect.requests.MonitorDataRequest;
import pt.unl.fct.di.novasys.babel.protocols.overlord.moncollect.requests.MonitorRequest;
import pt.unl.fct.di.novasys.babel.protocols.overlord.moncollect.timers.TimeoutTimer;
import pt.unl.fct.di.novasys.channel.tcp.TCPChannel;
import pt.unl.fct.di.novasys.network.data.Host;

/* loaded from: input_file:pt/unl/fct/di/novasys/babel/protocols/overlord/moncollect/MonCollect.class */
public class MonCollect extends GenericProtocol {
    public static final short PROTO_ID = 2700;
    public static final String PROTO_NAME = "Mon-Collect";
    public static final UUID NULL_UUID = new UUID(0, 0);
    public static final int DEFAULT_TIMEOUT_SECCONDS = 10000;
    public static final String MON_COLLECT_PORT = "MON-Collect.Channel.port";
    public static final String OVERLORD = "Overlord";
    public static final String TIMEOUT_VALUE = "MON-Collect.timeout";
    private static final int DEFAULT_PORT = 5556;
    private final short monitorProtoId;
    private final Logger logger;
    public int channelId;
    private Set<Host> membership;
    private short timeout;
    private UUID currentId;
    private Set<Host> children;
    private Host parent;
    private Set<byte[]> data;
    private int responses;
    private Host myself;
    private int channelPort;
    private boolean overlord;

    public MonCollect(Host host, short s) {
        super(PROTO_NAME, (short) 2700);
        this.logger = LogManager.getLogger((Class<?>) MonCollect.class);
        this.myself = host;
        this.monitorProtoId = s;
        this.overlord = false;
    }

    @Override // pt.unl.fct.di.novasys.babel.core.GenericProtocol
    public void init(Properties properties) throws IOException, HandlerRegistrationException {
        if (properties.containsKey(MON_COLLECT_PORT)) {
            this.channelPort = Integer.parseInt(properties.getProperty(MON_COLLECT_PORT));
        } else {
            this.channelPort = DEFAULT_PORT;
        }
        this.myself = new Host(this.myself.getAddress(), this.channelPort);
        this.channelId = createChannel(TCPChannel.NAME, getProperties(properties));
        this.overlord = properties.containsKey(OVERLORD) && Boolean.parseBoolean(properties.getProperty(OVERLORD));
        registerMessageHandler(this.channelId, (short) 2701, this::session);
        registerMessageHandler(this.channelId, (short) 2703, this::sessionOk);
        registerMessageHandler(this.channelId, (short) 2702, this::prune);
        registerMessageHandler(this.channelId, (short) 2704, this::monitor);
        registerMessageHandler(this.channelId, (short) 2705, this::collectData);
        registerTimerHandler((short) 2750, this::timeoutHandler);
        if (this.overlord) {
            registerRequestHandler((short) 2601, this::monitorRequest);
        }
        subscribeNotification((short) 401, this::uponNeighborUp);
        subscribeNotification((short) 402, this::uponNeighborDown);
        subscribeNotification((short) 2770, this::collectMonitoredData);
        subscribeNotification((short) 2771, this::collect);
        registerMessageSerializer(this.channelId, (short) 2705, Collect.serializer);
        registerMessageSerializer(this.channelId, (short) 2704, Monitor.serializer);
        registerMessageSerializer(this.channelId, (short) 2702, Prune.serializer);
        registerMessageSerializer(this.channelId, (short) 2701, Session.serializer);
        registerMessageSerializer(this.channelId, (short) 2703, SessionOk.serializer);
        this.membership = new HashSet();
        resetTree();
        if (properties.containsKey(TIMEOUT_VALUE)) {
            this.timeout = Short.parseShort(properties.getProperty(TIMEOUT_VALUE));
        } else {
            this.timeout = (short) 10000;
        }
        if (this.overlord) {
            this.logger.info("Overlord MON-Collect successfuly Initialized");
        } else {
            this.logger.info("MON-Collect successfuly Initialized");
        }
    }

    public void monitorRequest(MonitorRequest monitorRequest, short s) {
        this.logger.debug("[Monitor Request]: Received Monitor Request.");
        this.currentId = UUID.randomUUID();
        this.logger.debug("[Monitor Request]: Generating random UUID: {}.", this.currentId);
        this.logger.debug("[SESSION]: Sending message to children");
        for (Host host : this.membership) {
            Session session = new Session(this.currentId, this.myself);
            this.logger.debug("Sending message to Host: {}", host);
            openConnection(host);
            sendMessage(session, host);
        }
        if (this.membership.isEmpty()) {
            enoughSessionOks();
        }
        setupTimer(new TimeoutTimer((short) 2701), this.timeout);
    }

    public void session(Session session, Host host, short s, int i) {
        if (this.overlord) {
            this.logger.debug("[SESSION]: Received SESSION. Im the central node, sending PRUNE back");
            openConnection(host);
            sendMessage(new Prune(session.getOpId(), this.myself), host);
            return;
        }
        this.logger.debug("[SESSION]: Received SESSION. sender: {}, Operation ID: {}", host, session.getOpId());
        if (this.currentId.equals(session.getOpId())) {
            this.logger.debug("[SESSION]: Message is not for me, sending PRUNE");
            Prune prune = new Prune(session.getOpId(), this.myself);
            openConnection(host);
            sendMessage(prune, host);
            return;
        }
        this.logger.debug("[SESSION]: Message is for me");
        this.currentId = session.getOpId();
        this.parent = host;
        this.logger.debug("[SESSION]: Sending message to children");
        for (Host host2 : this.membership) {
            Session session2 = new Session(session.getOpId(), this.myself);
            this.logger.debug("Sending message to Host: {}", host2);
            openConnection(host2);
            sendMessage(session2, host2);
        }
        setupTimer(new TimeoutTimer((short) 2701), this.timeout);
    }

    public void sessionOk(SessionOk sessionOk, Host host, short s, int i) {
        this.logger.debug("[SESSION_OK]: Received SESSION_OK. sender: {}, Operation ID: {}. Responses: {}/{}", host, sessionOk.getOpId(), Integer.valueOf(this.responses + 1), Integer.valueOf(this.membership.size()));
        if (!this.currentId.equals(sessionOk.getOpId())) {
            this.logger.debug("[SESSION_OK]: Message is not for me. My current ID: {}, Received current ID: {}", this.currentId, sessionOk.getOpId());
            return;
        }
        this.children.add(host);
        this.responses++;
        enoughSessionOks();
    }

    public void prune(Prune prune, Host host, short s, int i) {
        this.logger.debug("[PRUNE]: Received PRUNE. sender: {}, Operation ID:{}.", host, prune.getOpId());
        if (!this.currentId.equals(prune.getOpId())) {
            this.logger.debug("[PRUNE]: Message is not for me. My current ID: {}, Received current ID: {}.", this.currentId, prune.getOpId());
            return;
        }
        this.responses++;
        this.logger.debug("[PRUNE]: Current Responses: {}/{}.", Integer.valueOf(this.responses), Integer.valueOf(this.membership.size()));
        enoughSessionOks();
    }

    public void monitor(Monitor monitor, Host host, short s, int i) {
        this.logger.debug("[MONITOR]: Received MONITOR. sender: {}, Operation ID:{}.", host, monitor.getOpId());
        if (!this.currentId.equals(monitor.getOpId())) {
            this.logger.debug("[MONITOR]: Message is not for me. My current ID: {}, Received current ID: {}.", this.currentId, monitor.getOpId());
            return;
        }
        this.logger.debug("[MONITOR]: Sending message to children: {}.", this.children);
        for (Host host2 : this.children) {
            openConnection(host2);
            sendMessage(new Monitor(monitor.getOpId(), this.myself), host2);
        }
        setupTimer(new TimeoutTimer((short) 2704), this.timeout);
        sendRequest(new MonitorDataRequest(), this.monitorProtoId);
    }

    public void collectData(Collect collect, Host host, short s, int i) {
        this.logger.debug("[COLLECT]: Received COLLECT. Sender: {}, Operation ID: {}, Data received: {}.", host, collect.getOpId(), collect.getData());
        if (!this.currentId.equals(collect.getOpId())) {
            this.logger.debug("[COLLECT]: Message is not for me. My current ID: {}, Received current ID: {}.", this.currentId, collect.getOpId());
            return;
        }
        this.logger.debug("[COLLECT]: Received agrregated data, continuing collect.");
        this.data.add(collect.getData());
        this.logger.debug("[COLLECT]: Aggregated data: {}.", this.data);
        this.responses++;
        this.logger.debug("[COLLECT]: Responses: {}/{}.", Integer.valueOf(this.responses), Integer.valueOf(this.children.size() + 1));
        if (this.responses >= this.children.size() + 1) {
            cancelTimer(2750L);
            sendRequest(new AggregateDataRequest(this.data), this.monitorProtoId);
        }
    }

    public void collectMonitoredData(CollectDataNotification collectDataNotification, short s) {
        this.logger.debug("[collect_data]: Received collect_data. Deserialized Data received: {}.", collectDataNotification.getData());
        openConnection(this.myself);
        sendMessage(new Collect(this.currentId, this.myself, collectDataNotification.getData()), this.myself);
    }

    public void collect(ReceiveAggregatedDataNotification receiveAggregatedDataNotification, short s) {
        this.logger.debug("[COLLECT]: Received agrregated data, continuing collect.");
        this.logger.debug("[COLLECT]: Aggregated data: {}.", receiveAggregatedDataNotification.getData());
        if (this.overlord) {
            CollectNotification collectNotification = new CollectNotification(receiveAggregatedDataNotification.getData());
            this.logger.debug("[COLLECT]: MON-Collect complete, sending data {}", receiveAggregatedDataNotification.getData());
            triggerNotification(collectNotification);
        } else {
            this.logger.debug("[COLLECT]: Have enough responses or timed out. Sending to parent: {}", this.parent);
            openConnection(this.parent);
            sendMessage(new Collect(this.currentId, this.myself, receiveAggregatedDataNotification.getData()), this.parent);
        }
        resetTree();
    }

    private void enoughSessionOks() {
        if (this.responses >= this.membership.size()) {
            cancelTimer(2750L);
            this.responses = 0;
            if (this.overlord) {
                Monitor monitor = new Monitor(this.currentId, this.myself);
                openConnection(this.myself);
                sendMessage(monitor, this.myself);
            } else {
                if (this.parent == null) {
                    this.logger.error("[SESSION_OK] CRITICAL ERROR: Received SESSION_OK, but parent is null. Current ID is {}", this.currentId);
                }
                openConnection(this.parent);
                sendMessage(new SessionOk(this.currentId, this.parent), this.parent);
            }
        }
    }

    private void timeoutHandler(TimeoutTimer timeoutTimer, long j) {
        if (this.currentId.equals(NULL_UUID)) {
            cancelTimer(j);
            return;
        }
        this.logger.debug("[Timeout]: Operation timeout, sending message anyways.");
        if (timeoutTimer.getObjId() == 2701) {
            sessionTimeoutHandler();
        } else if (timeoutTimer.getObjId() == 2704) {
            monitorTimeoutHandler();
        } else {
            this.logger.error("[Timeout]: CRITICAL ERROR: Timeout was wrongly created.");
        }
    }

    private void monitorTimeoutHandler() {
        this.logger.debug("[Timeout]: Aggregating data {}.", this.data);
        sendRequest(new AggregateDataRequest(this.data), this.monitorProtoId);
    }

    private void sessionTimeoutHandler() {
        if (this.overlord) {
            this.logger.debug("[Timeout]: Begining Monitor operation.");
            openConnection(this.myself);
            sendMessage(new Monitor(this.currentId, this.myself), this.myself);
        } else {
            this.logger.debug("[Timeout]: Sending Message to Parent.");
            if (this.parent == null) {
                this.logger.error("[Timeout]: CRITICAL ERROR: Timedout but parent is null. Current ID is {}.", this.currentId);
            }
            openConnection(this.parent);
            sendMessage(new SessionOk(this.currentId, this.myself), this.parent);
        }
    }

    private void resetTree() {
        this.logger.debug("Reseting Tree...");
        this.currentId = NULL_UUID;
        this.responses = 0;
        this.children = new HashSet();
        this.data = new HashSet();
    }

    private Properties getProperties(Properties properties) {
        Properties properties2 = new Properties();
        properties2.setProperty("address", this.myself.getAddress().getHostAddress());
        properties2.setProperty("port", properties.getProperty(MON_COLLECT_PORT));
        properties2.setProperty("heartbeat_interval", "1000");
        properties2.setProperty("heartbeat_tolerance", "3000");
        properties2.setProperty("connect_timeout", "1000");
        return properties2;
    }

    private void uponNeighborUp(NeighborUp neighborUp, short s) {
        this.logger.debug("[MEMBERSHIP]: New neighbor: {}", neighborUp.getPeer());
        this.membership.add(new Host(neighborUp.getPeer().getAddress(), this.channelPort));
    }

    private void uponNeighborDown(NeighborDown neighborDown, short s) {
        for (Host host : this.membership) {
            if (host.getAddress().equals(neighborDown.getPeer().getAddress())) {
                this.logger.debug("[MEMBERSHIP]: Lost neighbor: {}", neighborDown.getPeer());
                this.membership.remove(host);
                this.children.remove(host);
                return;
            }
        }
    }
}
