package tardis.management;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Properties;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import pt.unl.fct.di.novasys.babel.core.GenericProtocol;
import pt.unl.fct.di.novasys.babel.core.adaptive.AdaptiveMembershipProtocol;
import pt.unl.fct.di.novasys.babel.core.adaptive.requests.GetAdaptiveFieldsReply;
import pt.unl.fct.di.novasys.babel.core.adaptive.requests.GetAdaptiveFieldsRequest;
import pt.unl.fct.di.novasys.babel.core.adaptive.requests.Reconfigure;
import pt.unl.fct.di.novasys.babel.exceptions.HandlerRegistrationException;
import pt.unl.fct.di.novasys.babel.protocols.dissemination.notifications.BroadcastDelivery;
import pt.unl.fct.di.novasys.babel.protocols.dissemination.requests.BroadcastRequest;
import pt.unl.fct.di.novasys.babel.protocols.eagerpush.AdaptiveEagerPushGossipBroadcast;
import pt.unl.fct.di.novasys.network.data.Host;
import tardis.management.timers.GetAdaptiveFieldsTimer;

/* loaded from: input_file:tardis/management/Controller.class */
/**
 * Babel protocol that keeps the fanout of a broadcast protocol in sync across nodes.
 *
 * <p>On {@link #init(Properties)} it pushes the configured fanout to the broadcast protocol.
 * When configured with {@code ChangeBroadcastFanout.type=agent} it additionally listens on a
 * ZeroMQ SUB socket for new fanout values and disseminates each one through the broadcast
 * protocol; every node receiving such a message reconfigures its own broadcast protocol.
 */
public class Controller extends GenericProtocol {
    private static final Logger logger = LogManager.getLogger(Controller.class);

    public static final short PROTOCOL_ID = 13237;
    public static final String PROTOCOL_NAME = "Tardis-Controller";
    public static final String PAR_MEMBERSHIP_PROTO_ID = "Controller.membership.id";
    public static final String PAR_BROADCAST_PROTO_ID = "Controller.broadcast.id";
    public static final String PROP_CHANGE_BROADCAST_FANOUT_TYPE = "ChangeBroadcastFanout.type";
    public static final String PROP_AGENT_ADDRESS = "Agent.address";
    public static final String PROP_AGENT_PORT = "Agent.port";
    public static final String PROP_FANOUT = "Controller.fanout";

    /** Protocol ids used when the corresponding properties are absent. */
    private static final short DEFAULT_MEMBERSHIP_PROTO_ID = 400;
    private static final short DEFAULT_BROADCAST_PROTO_ID = 1601;

    // NOTE(review): the three ids below were inlined by the compiler; they presumably mirror
    // GetAdaptiveFieldsTimer.TIMER_ID, GetAdaptiveFieldsReply.REPLY_ID and
    // BroadcastDelivery.NOTIFICATION_ID -- confirm and reference those constants directly.
    private static final short GET_ADAPTIVE_FIELDS_TIMER_ID = 7438;
    private static final short GET_ADAPTIVE_FIELDS_REPLY_ID = 706;
    private static final short BROADCAST_DELIVERY_NOTIFICATION_ID = 501;

    /** Key of the fanout entry in Reconfigure requests and adaptive-field replies. */
    private static final String FANOUT_FIELD = "fanout";

    /** Sentinel meaning "value not known / not configured". */
    private static final int UNSET = -1;

    private String agentAddress;
    private int agentPort;
    private short membershipProtocolID;
    private short membershipNeighborCapacity;
    private short broadcastProtocolID;
    private int broadcastFanout;
    private final Host myself;

    /**
     * Creates the controller with default protocol ids and registers its timer and reply
     * handlers.
     *
     * @param properties configuration; not read here, the values are applied in
     *                   {@link #init(Properties)}
     * @param host       the local host, used as the sender of fanout-change broadcasts
     * @throws HandlerRegistrationException if a handler cannot be registered
     */
    public Controller(Properties properties, Host host) throws HandlerRegistrationException {
        super(PROTOCOL_NAME, PROTOCOL_ID);
        this.membershipProtocolID = DEFAULT_MEMBERSHIP_PROTO_ID;
        this.membershipNeighborCapacity = (short) UNSET;
        this.broadcastProtocolID = DEFAULT_BROADCAST_PROTO_ID;
        this.myself = host;
        this.agentAddress = "";
        this.agentPort = UNSET;
        this.broadcastFanout = Integer.parseInt(AdaptiveEagerPushGossipBroadcast.DEFAULT_FANOUT);
        registerTimerHandler(GET_ADAPTIVE_FIELDS_TIMER_ID, this::uponGetAdaptiveFieldsTimer);
        registerReplyHandler(GET_ADAPTIVE_FIELDS_REPLY_ID, this::uponGetAdaptiveFieldsReply);
    }

    /**
     * Applies the configuration, subscribes to broadcast deliveries, optionally starts the
     * agent listener thread, and pushes the initial fanout to the broadcast protocol.
     *
     * <p>Terminates the JVM with exit code 1 when the agent mode is selected but the agent
     * address or port is missing.
     *
     * @param properties configuration properties (see the {@code PAR_*}/{@code PROP_*} keys)
     * @throws HandlerRegistrationException if the notification subscription fails
     */
    @Override
    public void init(Properties properties) throws HandlerRegistrationException {
        logger.debug("Initializing with properties {}", properties);

        if (properties.containsKey(PAR_MEMBERSHIP_PROTO_ID)) {
            this.membershipProtocolID = Short.parseShort(properties.getProperty(PAR_MEMBERSHIP_PROTO_ID));
        }
        if (properties.containsKey(PAR_BROADCAST_PROTO_ID)) {
            this.broadcastProtocolID = Short.parseShort(properties.getProperty(PAR_BROADCAST_PROTO_ID));
        }
        String fanoutChangeType = properties.containsKey(PROP_CHANGE_BROADCAST_FANOUT_TYPE)
                ? properties.getProperty(PROP_CHANGE_BROADCAST_FANOUT_TYPE)
                : "none";
        if (properties.containsKey(PROP_AGENT_PORT)) {
            this.agentPort = Integer.parseInt(properties.getProperty(PROP_AGENT_PORT));
        }
        if (properties.containsKey(PROP_AGENT_ADDRESS)) {
            this.agentAddress = properties.getProperty(PROP_AGENT_ADDRESS);
        }
        if (properties.containsKey(PROP_FANOUT)) {
            this.broadcastFanout = Integer.parseInt(properties.getProperty(PROP_FANOUT));
        }

        subscribeNotification(BROADCAST_DELIVERY_NOTIFICATION_ID, this::uponBroadcastDelivery);

        // Constant-first comparison: getProperty may return null for a non-String value.
        if ("agent".equals(fanoutChangeType)) {
            if (this.agentPort == UNSET || this.agentAddress == null || this.agentAddress.isEmpty()) {
                logger.error("Missing address or port for agent communication");
                System.exit(1);
            }
            logger.debug("Setting up an agent listener for changing broadcast fanout");
            // Snapshot the endpoint so the listener thread does not read mutable fields.
            final String address = this.agentAddress;
            final int port = this.agentPort;
            new Thread(() -> subscribeToBroadcastReconfiguration(address, port)).start();
        }

        sendFanoutReconfiguration(this.broadcastFanout);
    }

    /**
     * Blocks on a ZeroMQ SUB socket connected to the agent and disseminates every fanout value
     * it receives. Runs until the current thread is interrupted, the socket reports an error,
     * or an exception is raised.
     *
     * @param agentHost host name or address of the agent's publisher socket
     * @param port      TCP port of the agent's publisher socket
     */
    private void subscribeToBroadcastReconfiguration(String agentHost, int port) {
        // try-with-resources guarantees the context (and its sockets) is released on every path.
        try (ZContext context = new ZContext()) {
            ZMQ.Socket subscriber = context.createSocket(SocketType.SUB);
            subscriber.connect(String.format("tcp://%s:%d", agentHost, port));
            subscriber.subscribe(new byte[0]); // empty prefix: receive every message

            while (!Thread.currentThread().isInterrupted()) {
                byte[] payload = subscriber.recv(0);
                if (payload == null) {
                    // recv signals an error with null; stop instead of failing on a null wrap.
                    logger.warn("Agent socket returned no data; stopping fanout listener");
                    break;
                }
                if (payload.length < Integer.BYTES) {
                    logger.warn("Ignoring malformed fanout message of {} bytes", payload.length);
                    continue;
                }
                int fanout = ByteBuffer.wrap(payload).getInt();
                logger.debug("Received fanout {} from agent", fanout);
                propagateBroadcastFanout(fanout);
            }
        } catch (Exception e) {
            logger.warn("Failed to subscribe to fanout", e);
        }
    }

    /**
     * Handles a delivered broadcast: if it carries a fanout different from the current one,
     * stores the new value and reconfigures the broadcast protocol with it.
     *
     * @param broadcastDelivery the delivered broadcast notification
     * @param s                 id of the protocol that emitted the notification (unused)
     */
    private void uponBroadcastDelivery(BroadcastDelivery broadcastDelivery, short s) {
        final ChangeBroadcastFanoutMessage message;
        try {
            message = ChangeBroadcastFanoutMessage.fromByteArray(broadcastDelivery.getPayload());
        } catch (IOException e) {
            // Best effort: the payload could not be decoded as a fanout change, so skip it.
            logger.debug("Ignoring broadcast payload that is not a valid fanout change", e);
            return;
        }

        int newFanout = message.getNewFanout();
        logger.debug("Current broadcast fanout is {}", this.broadcastFanout);
        logger.debug("Received broadcast fanout {} from {} ", newFanout, message.getSender());

        if (this.broadcastFanout == newFanout) {
            logger.debug("No need to adjust broadcast fanout");
            return;
        }
        // Store first, then reconfigure with the NEW value (the old code sent the stale one).
        this.broadcastFanout = newFanout;
        sendFanoutReconfiguration(newFanout);
    }

    /**
     * Broadcasts a fanout change to all nodes through the broadcast protocol.
     *
     * @param i the fanout value to announce
     */
    private void propagateBroadcastFanout(int i) {
        try {
            byte[] payload = new ChangeBroadcastFanoutMessage(this.myself.toString(), i).toByteArray();
            sendRequest(new BroadcastRequest(this.myself, payload, PROTOCOL_ID), this.broadcastProtocolID);
        } catch (IOException e) {
            logger.error("Failed to serialize ChangeFanoutMessage", e);
        }
    }

    /** Asks the broadcast protocol to switch to the given fanout. */
    private void sendFanoutReconfiguration(int fanout) {
        sendRequest(
                new Reconfigure.ReconfigureBuilder().addProperty(FANOUT_FIELD, Integer.valueOf(fanout)).build(),
                this.broadcastProtocolID);
    }

    /**
     * Timer callback: requests whichever adaptive fields are still unknown, and cancels the
     * timer once both the neighbor capacity and the broadcast fanout are known.
     *
     * @param getAdaptiveFieldsTimer the fired timer
     * @param j                      id of the fired timer, used to cancel it
     */
    private void uponGetAdaptiveFieldsTimer(GetAdaptiveFieldsTimer getAdaptiveFieldsTimer, long j) {
        boolean capacityKnown = this.membershipNeighborCapacity != UNSET;
        boolean fanoutKnown = this.broadcastFanout != UNSET;

        if (capacityKnown && fanoutKnown) {
            logger.debug("Adaptive fields already received");
            cancelTimer(j);
            return;
        }

        logger.debug("Requesting adaptive fields to {} and {}", this.membershipProtocolID, this.broadcastProtocolID);
        if (this.membershipProtocolID != UNSET && !capacityKnown) {
            logger.debug("Missing number of neighbors");
            sendRequest(new GetAdaptiveFieldsRequest(), this.membershipProtocolID);
        }
        if (this.broadcastProtocolID != UNSET && !fanoutKnown) {
            logger.debug("Missing broadcast fanout");
            sendRequest(new GetAdaptiveFieldsRequest(), this.broadcastProtocolID);
        }
    }

    /**
     * Stores the adaptive field reported by the membership or broadcast protocol.
     *
     * @param getAdaptiveFieldsReply reply carrying the protocol's adaptive fields
     * @param s                      id of the protocol that sent the reply
     */
    private void uponGetAdaptiveFieldsReply(GetAdaptiveFieldsReply getAdaptiveFieldsReply, short s) {
        logger.debug("Received adaptive fields: {}", getAdaptiveFieldsReply.fields);

        // The membership id is checked first, so it wins if both ids happen to be equal.
        if (s == this.membershipProtocolID) {
            Long neighbors = (Long) getAdaptiveFieldsReply.fields.get(AdaptiveMembershipProtocol.NUMBER_OF_NEIGHBORS);
            if (neighbors != null) {
                this.membershipNeighborCapacity = neighbors.shortValue();
                logger.debug("Received number of neighbors: {}", this.membershipNeighborCapacity);
            }
            return;
        }

        if (s == this.broadcastProtocolID) {
            Integer fanout = (Integer) getAdaptiveFieldsReply.fields.get(FANOUT_FIELD);
            if (fanout != null) {
                this.broadcastFanout = fanout.intValue();
                logger.debug("Received broadcast fanout: {}", this.broadcastFanout);
            }
        }
    }
}
