package tardis.management;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Properties;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import pt.unl.fct.di.novasys.babel.core.GenericProtocol;
import pt.unl.fct.di.novasys.babel.core.adaptive.requests.DecreaseNumberNeighbors;
import pt.unl.fct.di.novasys.babel.core.adaptive.requests.IncreaseNumberNeighbors;
import pt.unl.fct.di.novasys.babel.core.adaptive.requests.Reconfigure;
import pt.unl.fct.di.novasys.babel.exceptions.HandlerRegistrationException;
import pt.unl.fct.di.novasys.babel.protocols.dissemination.notifications.BroadcastDelivery;
import pt.unl.fct.di.novasys.babel.protocols.dissemination.requests.BroadcastRequest;
import pt.unl.fct.di.novasys.babel.protocols.eagerpush.AdaptiveEagerPushGossipBroadcast;
import pt.unl.fct.di.novasys.network.data.Host;

/* loaded from: input_file:tardis/management/Controller.class */
/**
 * Babel protocol that coordinates broadcast-fanout changes across nodes.
 *
 * <p>When configured as a reconfigurator in "agent" mode, a background thread subscribes to a
 * ZeroMQ publisher and re-broadcasts every fanout value it receives. Every instance listens for
 * broadcast deliveries, decodes them as {@code ChangeBroadcastFanoutMessage}s, and, when the
 * fanout differs from the locally tracked one, asks the membership protocol to grow or shrink its
 * neighbor count and asks the broadcast protocol to reconfigure its fanout.
 */
public class Controller extends GenericProtocol {
    private static final String PAR_RECONFIGURATOR = "reconfigurator";
    private static final Logger logger = LogManager.getLogger(Controller.class);
    public static final short PROTOCOL_ID = 13237;
    public static final String PROTOCOL_NAME = "Tardis-Controller";
    public static final String PAR_MEMBERSHIP_PROTO_ID = "Controller.membership.id";
    public static final String PAR_BROADCAST_PROTO_ID = "Controller.broadcast.id";
    public static final String PROP_CHANGE_BROADCAST_FANOUT_TYPE = "ChangeBroadcastFanout.type";
    public static final String PAR_ACTIVE_VIEW = "HyParView.ActiveView";
    public static final String PROP_AGENT_ADDRESS = "Agent.address";
    public static final String PROP_AGENT_PORT = "Agent.port";
    public static final String PROP_FANOUT = "Controller.fanout";

    /** A decrease request is only sent while {@code numNeighbors - 1} stays at or above this. */
    private static final int MIN_NEIGHBORS_AFTER_DECREASE = 3;

    /** Value of {@link #PAR_ACTIVE_VIEW} read in {@link #init}; stays 0 when the key is absent. */
    private short numNeighbors;
    /** Target of broadcast and reconfigure requests; defaults to 1601 unless overridden. */
    private short broadcastProtocolID;
    /** Target of neighbor increase/decrease requests; stays 0 when not configured. */
    private short membershipProtocolID;
    /** Fanout this controller currently believes the broadcast protocol is using. */
    private int broadcastFanout;
    private final Host myself;

    /**
     * Creates the controller protocol.
     *
     * @param properties not read by the constructor; configuration is consumed in {@link #init}
     * @param host       this node's own address, used as the sender of fanout broadcasts
     * @throws HandlerRegistrationException declared for compatibility with existing callers
     */
    public Controller(Properties properties, Host host) throws HandlerRegistrationException {
        super(PROTOCOL_NAME, PROTOCOL_ID);
        this.broadcastProtocolID = (short) 1601;
        this.myself = host;
    }

    /**
     * Reads configuration, optionally starts the agent listener thread, and subscribes to
     * broadcast deliveries.
     *
     * <p>The agent listener is only started when {@link #PROP_CHANGE_BROADCAST_FANOUT_TYPE} is
     * {@code "agent"} and the {@code reconfigurator} property parses as {@code true}. In that case
     * a missing {@link #PROP_AGENT_ADDRESS} or {@link #PROP_AGENT_PORT} terminates the JVM with
     * exit status 1.
     *
     * @param properties protocol configuration
     * @throws HandlerRegistrationException if the notification handler cannot be registered
     * @throws NumberFormatException        if a numeric property is present but not parseable
     */
    @Override // pt.unl.fct.di.novasys.babel.core.GenericProtocol
    public void init(Properties properties) throws HandlerRegistrationException {
        logger.debug("Initializing {} with properties {}", PROTOCOL_NAME, properties);

        if (properties.containsKey(PAR_ACTIVE_VIEW)) {
            this.numNeighbors = Short.parseShort(properties.getProperty(PAR_ACTIVE_VIEW));
        }
        if (properties.containsKey(PAR_BROADCAST_PROTO_ID)) {
            this.broadcastProtocolID = Short.parseShort(properties.getProperty(PAR_BROADCAST_PROTO_ID));
        }
        if (properties.containsKey(PAR_MEMBERSHIP_PROTO_ID)) {
            this.membershipProtocolID = Short.parseShort(properties.getProperty(PAR_MEMBERSHIP_PROTO_ID));
        }
        this.broadcastFanout = Integer.parseInt(
                properties.getProperty(
                        AdaptiveEagerPushGossipBroadcast.PAR_FANOUT,
                        AdaptiveEagerPushGossipBroadcast.DEFAULT_FANOUT));

        boolean agentDriven = "agent".equals(properties.getProperty(PROP_CHANGE_BROADCAST_FANOUT_TYPE, "none"));
        boolean isReconfigurator = Boolean.parseBoolean(properties.getProperty(PAR_RECONFIGURATOR, "false"));
        if (agentDriven && isReconfigurator) {
            final String agentAddress = properties.getProperty(PROP_AGENT_ADDRESS);
            final int agentPort = Integer.parseInt(properties.getProperty(PROP_AGENT_PORT, "-1"));
            if (agentAddress == null || agentPort == -1) {
                logger.error("Missing address or port for agent communication");
                System.exit(1);
            }
            // The subscriber blocks on the socket, so it runs on its own thread.
            new Thread(
                    () -> subscribeToBroadcastReconfiguration(agentAddress, agentPort),
                    "tardis-controller-fanout-subscriber").start();
        }

        // 501 is the notification id this class expects BroadcastDelivery to be published under.
        subscribeNotification((short) 501, this::uponBroadcastDelivery);
    }

    /**
     * Blocks on a ZeroMQ SUB socket connected to the agent and re-broadcasts each fanout received.
     *
     * <p>Each message is read as a 4-byte big-endian integer (the {@link ByteBuffer} default).
     * Frames shorter than four bytes are logged and skipped. The loop ends when the thread is
     * interrupted, when the socket yields no data, or when any exception is raised; the context
     * is closed in every case.
     *
     * @param address agent host name or IP
     * @param port    agent TCP port
     */
    private void subscribeToBroadcastReconfiguration(String address, int port) {
        logger.info("Listening to broadcast reconfigurations");
        try (ZContext context = new ZContext()) {
            ZMQ.Socket subscriber = context.createSocket(SocketType.SUB);
            subscriber.connect(String.format("tcp://%s:%d", address, port));
            // An empty prefix subscribes to every published message.
            subscriber.subscribe(new byte[0]);

            while (!Thread.currentThread().isInterrupted()) {
                byte[] frame = subscriber.recv(0);
                if (frame == null) {
                    // Previously this surfaced as a NullPointerException; stop explicitly instead.
                    logger.warn("Agent subscription returned no data; stopping fanout listener");
                    break;
                }
                if (frame.length < Integer.BYTES) {
                    // A truncated frame must not terminate the listener.
                    logger.warn("Ignoring malformed fanout message of {} bytes from agent", frame.length);
                    continue;
                }
                int fanout = ByteBuffer.wrap(frame).getInt();
                logger.debug("Received fanout {} from agent", fanout);
                propagateBroadcastFanout(fanout);
            }
        } catch (Exception e) {
            // Top-level boundary of the listener thread: log and let the thread end.
            logger.warn("Failed to subscribe to fanout", e);
        }
    }

    /**
     * Asks the broadcast protocol to switch to the given fanout.
     *
     * @param fanout new fanout value, sent as the {@code "fanout"} property of a Reconfigure request
     */
    private void reconfigureBroadcast(int fanout) {
        Reconfigure request = new Reconfigure.ReconfigureBuilder()
                .addProperty("fanout", Integer.valueOf(fanout))
                .build();
        sendRequest(request, this.broadcastProtocolID);
    }

    /**
     * Handles a delivered broadcast by applying the fanout change it carries.
     *
     * <p>A payload that fails to decode is logged at debug level and ignored. When the decoded
     * fanout equals the tracked one nothing happens. Otherwise the tracked fanout is updated, the
     * membership protocol is asked to add or remove a neighbor, and the broadcast protocol is
     * reconfigured.
     *
     * @param broadcastDelivery delivered broadcast whose payload is decoded
     * @param sourceProto       id supplied by the notification dispatcher; unused here
     * @throws IllegalStateException if the new fanout differs from the tracked one by more than 1
     */
    private void uponBroadcastDelivery(BroadcastDelivery broadcastDelivery, short sourceProto) {
        try {
            ChangeBroadcastFanoutMessage change =
                    ChangeBroadcastFanoutMessage.fromByteArray(broadcastDelivery.getPayload());
            int requestedFanout = change.getNewFanout();
            logger.debug("Current broadcast fanout is {}", this.broadcastFanout);
            logger.debug("Received broadcast fanout {} from {} ", requestedFanout, change.getSender());

            if (this.broadcastFanout == requestedFanout) {
                logger.debug("No need to adjust broadcast fanout");
                return;
            }

            int previousFanout = this.broadcastFanout;
            int delta = requestedFanout - previousFanout;
            // NOTE(review): the tracked fanout is overwritten before the step is validated, so a
            // rejected jump still moves the local baseline. Kept as in the original; confirm intent.
            this.broadcastFanout = requestedFanout;

            if (Math.abs(delta) != 1) {
                logger.error("Unexpected fanout value for update");
                throw new IllegalStateException(
                        "Unexpected fanout update from " + previousFanout + " to " + requestedFanout);
            }

            if (delta == 1) {
                logger.info("Increasing active view");
                sendRequest(new IncreaseNumberNeighbors(), this.membershipProtocolID);
            } else if (this.numNeighbors - 1 >= MIN_NEIGHBORS_AFTER_DECREASE) {
                // NOTE(review): numNeighbors is never updated after init, so this guard always
                // compares against the configured value. Verify whether it should be tracked.
                logger.info("Decreasing active view");
                sendRequest(new DecreaseNumberNeighbors(), this.membershipProtocolID);
            }
            reconfigureBroadcast(requestedFanout);
        } catch (IOException e) {
            // Presumably the payload belongs to another broadcast user; record it instead of
            // silently dropping the failure.
            logger.debug("Ignoring broadcast delivery that could not be decoded as a fanout change", e);
        }
    }

    /**
     * Broadcasts a fanout change, with this node as sender, through the broadcast protocol.
     *
     * @param fanout fanout value to announce
     */
    private void propagateBroadcastFanout(int fanout) {
        try {
            ChangeBroadcastFanoutMessage change = new ChangeBroadcastFanoutMessage(this.myself.toString(), fanout);
            BroadcastRequest request = new BroadcastRequest(this.myself, change.toByteArray(), PROTOCOL_ID);
            sendRequest(request, this.broadcastProtocolID);
        } catch (IOException e) {
            logger.error("Failed to serialize ChangeFanoutMessage", e);
        }
    }
}
