package tardis.monitor;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Properties;
import java.util.Queue;
import java.util.TreeSet;
import java.util.UUID;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import pt.unl.fct.di.novasys.babel.core.GenericProtocol;
import pt.unl.fct.di.novasys.babel.exceptions.HandlerRegistrationException;
import pt.unl.fct.di.novasys.babel.utils.recordexporter.utils.ReceiveRecord;
import pt.unl.fct.di.novasys.babel.utils.recordexporter.utils.SendRecordMessage;
import pt.unl.fct.di.novasys.channel.tcp.TCPChannel;
import pt.unl.fct.di.novasys.channel.tcp.events.InConnectionDown;
import pt.unl.fct.di.novasys.channel.tcp.events.InConnectionUp;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionDown;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionFailed;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionUp;
import pt.unl.fct.di.novasys.network.data.Host;
import tardis.monitor.data.MembershipSnapshot;
import tardis.monitor.data.MessageStatistics;
import tardis.monitor.storage.JSONStorage;
import tardis.monitor.timers.AggregationTimer;

/* loaded from: input_file:tardis/monitor/Monitor.class */
public class Monitor extends GenericProtocol {
    public static final String PROTOCOL_NAME = "MetricsMonitor";
    public static final short PROTOCOL_ID = 6666;
    public static final String PAR_MESSAGE_VALIDITY_TIME_MS = "monitor.cutoff";
    public static final String PAR_AGGREGATION_TIME_MS = "monitor.aggregation";
    public static final String PAR_PUBLISHER = "monitor.publisher";
    public static final String PAR_PUBLISHER_PORT = "monitor.publisher.port";
    public static final String PAR_FANOUT = "EagerPushGossipBroadcast.Fanout";
    public static final String DEFAULT_FANOUT = "4";
    public static final String PROP_CHANGE_BROADCAST_FANOUT_TYPE = "ChangeBroadcastFanout.type";
    public static final String PROP_AGENT_ADDRESS = "Agent.address";
    public static final String PROP_AGENT_PORT = "Agent.port";
    public static final String DEFAULT_MESSAGE_VALIDITY_TIME_MS = "30000";
    public static final String DEFAULT_AGGREGATION_TIMER = "30000";
    public long messageValidity;
    private int channelId;
    private final String STORAGE_FILENAME = "storage.json";
    private final boolean OVERWRITE = true;
    private JSONStorage storage;
    private MetricsPublisher publisher;
    private int currentFanout;
    private Queue<ReceiveRecord> toBeProcessed;
    private LinkedList<MessageStatistics> timeline;
    private HashMap<UUID, MessageStatistics> stats;
    private HashSet<Host> hosts;
    private Host myself;
    private final Logger logger;
    private TreeSet<MembershipSnapshot> membershipInfo;
    private MembershipSnapshot currentWindow;
    static final /* synthetic */ boolean $assertionsDisabled;

    public Monitor(Host host) {
        super(PROTOCOL_NAME, (short) 6666);
        this.STORAGE_FILENAME = "storage.json";
        this.OVERWRITE = true;
        this.toBeProcessed = new LinkedList();
        this.logger = LogManager.getLogger((Class<?>) Monitor.class);
        this.myself = host;
        this.timeline = new LinkedList<>();
        this.stats = new HashMap<>();
        this.hosts = new HashSet<>();
        this.currentFanout = -1;
        try {
            this.storage = new JSONStorage("storage.json", true);
        } catch (IOException e) {
            this.storage = null;
            e.printStackTrace();
        }
        this.membershipInfo = new TreeSet<>();
        this.currentWindow = new MembershipSnapshot(this.hosts);
    }

    @Override // pt.unl.fct.di.novasys.babel.core.GenericProtocol
    public void init(Properties properties) throws HandlerRegistrationException, IOException {
        this.messageValidity = Long.parseLong(properties.getProperty(PAR_MESSAGE_VALIDITY_TIME_MS, "30000"));
        long parseLong = Long.parseLong(properties.getProperty(PAR_AGGREGATION_TIME_MS, "30000"));
        this.logger.info("Message validity time: {}s", Long.valueOf(this.messageValidity / 1000));
        this.logger.info("Aggregation timer: {}s", Long.valueOf(parseLong / 1000));
        if (properties.containsKey(PAR_PUBLISHER) && Boolean.parseBoolean(properties.getProperty(PAR_PUBLISHER))) {
            if (!properties.containsKey(PAR_PUBLISHER_PORT)) {
                System.err.println("Missing publisher port.");
                System.exit(1);
            }
            this.publisher = new MetricsPublisher(Integer.parseInt(properties.getProperty(PAR_PUBLISHER_PORT)));
        }
        this.currentFanout = Integer.parseInt(properties.getProperty(PAR_FANOUT, DEFAULT_FANOUT));
        Properties properties2 = new Properties();
        properties2.setProperty("address", this.myself.getAddress().getHostAddress());
        properties2.setProperty("port", this.myself.getPort());
        this.channelId = createChannel(TCPChannel.NAME, properties2);
        registerMessageSerializer(this.channelId, (short) 9192, SendRecordMessage.serializer);
        registerMessageHandler(this.channelId, (short) 9192, this::uponReceiveSendRecordMessage);
        registerTimerHandler((short) 10110, this::uponAggregationTimer);
        setupPeriodicTimer(new AggregationTimer(), parseLong, parseLong);
        registerChannelEventHandler(this.channelId, (short) 3, this::uponOutConnectionDown);
        registerChannelEventHandler(this.channelId, (short) 4, this::uponOutConnectionFailed);
        registerChannelEventHandler(this.channelId, (short) 5, this::uponOutConnectionUp);
        registerChannelEventHandler(this.channelId, (short) 2, this::uponInConnectionUp);
        registerChannelEventHandler(this.channelId, (short) 1, this::uponInConnectionDown);
        String property = properties.getProperty(PROP_CHANGE_BROADCAST_FANOUT_TYPE, "none");
        String property2 = properties.getProperty(PROP_AGENT_ADDRESS);
        int parseInt = Integer.parseInt(properties.getProperty(PROP_AGENT_PORT, "-1"));
        if (property.equals("agent")) {
            if (property2 == null || parseInt == -1) {
                this.logger.error("Missing address or port for agent communication");
                System.exit(1);
            }
            this.logger.debug("Setting up an agent listener for changing broadcast fanout");
            new Thread(() -> {
                subscribeToBroadcastReconfiguration(property2, parseInt);
            }).start();
        }
    }

    private void subscribeToBroadcastReconfiguration(String str, int i) {
        try {
            ZContext zContext = new ZContext();
            try {
                ZMQ.Socket createSocket = zContext.createSocket(SocketType.SUB);
                createSocket.connect(String.format("tcp://%s:%d", str, Integer.valueOf(i)));
                createSocket.subscribe(new byte[0]);
                while (!Thread.currentThread().isInterrupted()) {
                    int i2 = ByteBuffer.wrap(createSocket.recv(0)).getInt();
                    this.logger.debug("Received fanout {} from agent", Integer.valueOf(i2));
                    this.currentFanout = i2;
                }
                zContext.close();
            } finally {
            }
        } catch (Exception e) {
            e.printStackTrace();
            this.logger.warn("Failed to subscribe to fanout", (Throwable) e);
        }
    }

    public void addRecord(ReceiveRecord receiveRecord) {
        UUID messageId = receiveRecord.getMessageId();
        if (this.stats.containsKey(messageId)) {
            this.stats.get(messageId).updateStatistics(receiveRecord.getTimestampRecv(), receiveRecord.getHopCount(), receiveRecord.getNode());
        } else {
            MessageStatistics messageStatistics = new MessageStatistics(messageId, receiveRecord.getTimestampSent(), receiveRecord.getTimestampRecv(), receiveRecord.getHopCount(), receiveRecord.getNode());
            this.stats.put(messageId, messageStatistics);
            this.timeline.add(messageStatistics);
            this.timeline.sort((messageStatistics2, messageStatistics3) -> {
                return Long.compare(messageStatistics2.getCreationTime(), messageStatistics3.getCreationTime());
            });
        }
    }

    private boolean isMature(MessageStatistics messageStatistics) {
        return messageStatistics.getCreationTime() + (2 * this.messageValidity) <= System.currentTimeMillis();
    }

    private void uponAggregationTimer(AggregationTimer aggregationTimer, long j) {
        while (!this.toBeProcessed.isEmpty()) {
            try {
                addRecord(this.toBeProcessed.poll());
            } catch (Exception e) {
                this.logger.error(e);
                e.printStackTrace();
                return;
            }
        }
        if (this.timeline.isEmpty() || !isMature(this.timeline.peek())) {
            this.logger.debug("No messages are stable yet ({} entries in queue).", Integer.valueOf(this.timeline.size()));
            if (this.timeline.size() > 0) {
                this.logger.debug("{} seconds until first report", Long.valueOf(((this.timeline.peek().getCreationTime() + (2 * this.messageValidity)) - System.currentTimeMillis()) / 1000));
                return;
            }
            return;
        }
        int i = 0;
        float f = 0.0f;
        float f2 = 0.0f;
        int i2 = 0;
        int i3 = 0;
        int i4 = 0;
        int i5 = 0;
        float f3 = 0.0f;
        long creationTime = this.timeline.peek().getCreationTime();
        long creationTime2 = this.timeline.peek().getCreationTime();
        while (!this.timeline.isEmpty() && isMature(this.timeline.peek())) {
            MessageStatistics poll = this.timeline.poll();
            creationTime2 = poll.getCreationTime();
            while (this.membershipInfo.size() > 0 && poll.getCreationTime() > ((MembershipSnapshot) this.membershipInfo.getFirst()).getTimestamp()) {
                this.currentWindow = this.membershipInfo.pollFirst();
            }
            poll.computeReliability(this.currentWindow.size());
            i3++;
            if (poll.getDeliveryCount() > 1) {
                f3 += (poll.getReceiveCount() / (poll.getDeliveryCount() - 1)) - 1.0f;
            }
            f += poll.getReliability();
            f2 += (float) poll.getLatency();
            i2 += poll.getHighestHop();
            i4 += poll.getReceiveCount();
            i5 += poll.getReceiveCount() - poll.getDeliveryCount();
            i++;
            this.stats.remove(poll.getMsgID());
        }
        if (!$assertionsDisabled && i3 <= 0) {
            throw new AssertionError();
        }
        AggregatedBroadcastStatistics aggregatedBroadcastStatistics = new AggregatedBroadcastStatistics(creationTime, creationTime2, this.currentWindow.size(), f2 / i3, f / i3, i2 / i3, f3 / i3, i, i4, i5, 0, this.currentFanout);
        this.logger.info("Stats: {}", aggregatedBroadcastStatistics);
        if (this.publisher != null) {
            this.publisher.publishMetric(aggregatedBroadcastStatistics);
        }
        if (this.storage != null) {
            this.storage.addStatistic(aggregatedBroadcastStatistics);
        }
    }

    private void uponReceiveSendRecordMessage(SendRecordMessage sendRecordMessage, Host host, short s, int i) {
        this.logger.trace("recv {}", host.toString());
        this.toBeProcessed.add(sendRecordMessage.getRecord());
    }

    private void uponOutConnectionDown(OutConnectionDown outConnectionDown, int i) {
        this.logger.trace("Host {} is down, cause: {}", outConnectionDown.getNode(), outConnectionDown.getCause());
    }

    private void uponOutConnectionFailed(OutConnectionFailed<?> outConnectionFailed, int i) {
        this.logger.trace("Connection to host {} failed, cause: {}", outConnectionFailed.getNode(), outConnectionFailed.getCause());
    }

    private void uponOutConnectionUp(OutConnectionUp outConnectionUp, int i) {
        this.logger.trace("Host (out) {} is up", outConnectionUp.getNode());
    }

    private void uponInConnectionUp(InConnectionUp inConnectionUp, int i) {
        this.logger.trace("Host (in) {} is up", inConnectionUp.getNode());
        this.hosts.add(inConnectionUp.getNode());
        this.membershipInfo.add(new MembershipSnapshot(this.hosts));
    }

    private void uponInConnectionDown(InConnectionDown inConnectionDown, int i) {
        this.logger.trace("Connection from host {} is down, cause: {}", inConnectionDown.getNode(), inConnectionDown.getCause());
        this.hosts.remove(inConnectionDown.getNode());
        this.membershipInfo.add(new MembershipSnapshot(this.hosts));
    }

    static {
        $assertionsDisabled = !Monitor.class.desiredAssertionStatus();
    }
}
