package tardis.cfl.app;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Properties;
import java.util.Scanner;
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import pt.unl.fct.di.novasys.babel.core.Babel;
import pt.unl.fct.di.novasys.babel.core.GenericProtocol;
import pt.unl.fct.di.novasys.babel.exceptions.HandlerRegistrationException;
import pt.unl.fct.di.novasys.babel.exceptions.ProtocolAlreadyExistsException;
import pt.unl.fct.di.novasys.babel.protocols.client.ClientProtocol;
import pt.unl.fct.di.novasys.babel.protocols.general.notifications.ChannelAvailableNotification;
import pt.unl.fct.di.novasys.babel.protocols.membership.notifications.NeighborDown;
import pt.unl.fct.di.novasys.babel.protocols.membership.notifications.NeighborUp;
import pt.unl.fct.di.novasys.babel.protocols.server.ServerProtocol;
import pt.unl.fct.di.novasys.network.data.Host;
import tardis.cfl.app.data.ClientData;
import tardis.cfl.app.messages.PrepareTrainingMessage;
import tardis.cfl.app.messages.ReadyMessage;
import tardis.cfl.app.messages.RegisterMessage;
import tardis.cfl.app.messages.RegisterReplyMessage;
import tardis.cfl.app.messages.RoundCompletedMessage;
import tardis.cfl.app.messages.StartNextRoundMessage;
import tardis.cfl.app.messages.StartTrainingMessage;
import tardis.cfl.app.messages.StartTrainingReplyMessage;
import tardis.cfl.app.messages.TerminateMessage;
import tardis.cfl.app.notifications.CleanUpNotification;
import tardis.cfl.app.notifications.ShowClientsNotification;
import tardis.cfl.app.notifications.StartTrainingNotification;

/* loaded from: input_file:tardis/cfl/app/CentralizedFL.class */
public class CentralizedFL extends GenericProtocol {
    private static final Logger logger = LogManager.getLogger((Class<?>) CentralizedFL.class);
    public static String configFileName = "babel.conf";
    public static final short PROTO_ID = 666;
    public static final String PROTO_NAME = "CentralizedFLSupport";
    public static final String PAR_FL_ROUNDS = "CentralizedFL.Rounds";
    public static final String DEFAULT_FL_ROUNDS = "100";
    public static final String PAR_FL_LOCAL_EPOCH = "CentralizedFL.LocalEpoch";
    public static final String DEFAULT_FL_LOCAL_EPOCH = "1";
    public static final String PAR_FL_DATA_LOCATION = "CentralizedFL.Data";
    public static final String DEFAULT_FL_DATA_LOCATION = "./cifar10_data";
    public static final String PAR_FL_BATCH_SIZE = "CentralizedFL.BatchSize";
    public static final String DEFAULT_FL_BATCH_SIZE = "4";
    public static final String PAR_FL_WORKERS = "CentralizedFL.Workers";
    public static final String DEFAULT_FL_WORKERS = "2";
    private Babel babel;
    private Properties props;
    private boolean isClient;
    private int nextIndexToAtribute;
    private final HashMap<String, ClientData> clientIndexes;
    private final HashMap<Integer, ClientData> clientData;
    private final HashMap<Host, ClientData> clients;
    private UUID myIdentifier;
    private int myIndex;
    private Host server;
    private int rounds;
    private int local_epoch;
    private String data;
    private int batch_size;
    private int workers;
    private Scanner pythonReader;
    private PrintStream pythonWriter;
    private BufferedReader pythonError;
    private Process pythonProcess;
    private final Thread controlThread;
    private Thread pythonErrorLog;
    private Thread trainingThread;
    private boolean running;
    private int clientsInTraining;
    private int missingClientReplies;
    private int currentRound;
    private boolean keepExecuting;

    public CentralizedFL(boolean z, String[] strArr) {
        super(PROTO_NAME, (short) 666);
        this.isClient = z;
        this.running = false;
        this.trainingThread = null;
        this.keepExecuting = true;
        try {
            this.props = Babel.loadConfig(strArr, configFileName);
        } catch (Exception e) {
            System.err.println("Error loading babel configuration");
            e.printStackTrace();
            System.exit(1);
        }
        this.babel = Babel.getInstance();
        try {
            subscribeNotification((short) 1, this::uponChannelAvailableNotification);
        } catch (HandlerRegistrationException e2) {
            e2.printStackTrace();
            System.exit(1);
        }
        this.data = this.props.getProperty(PAR_FL_DATA_LOCATION, DEFAULT_FL_DATA_LOCATION);
        this.batch_size = Integer.parseInt(this.props.getProperty(PAR_FL_BATCH_SIZE, DEFAULT_FL_BATCH_SIZE));
        this.workers = Integer.parseInt(this.props.getProperty(PAR_FL_WORKERS, DEFAULT_FL_WORKERS));
        if (this.isClient) {
            this.clientIndexes = null;
            this.clientData = null;
            this.clients = null;
            this.controlThread = null;
        } else {
            this.controlThread = new Thread(new Runnable() { // from class: tardis.cfl.app.CentralizedFL.1
                /* JADX WARN: Failed to find 'out' block for switch in B:11:0x0045. Please report as an issue. */
                /* JADX WARN: Failed to find 'out' block for switch in B:22:0x0094. Please report as an issue. */
                @Override // java.lang.Runnable
                public void run() {
                    boolean z2;
                    BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
                    while (CentralizedFL.this.keepExecuting) {
                        try {
                            System.out.print("Type 'clients', 'start', or 'exit' to proceed\ncmd> ");
                            while (!bufferedReader.ready()) {
                                Thread.sleep(600L);
                            }
                            String trim = bufferedReader.readLine().trim();
                            z2 = -1;
                            switch (trim.hashCode()) {
                                case 3127582:
                                    if (trim.equals("exit")) {
                                        z2 = 2;
                                        break;
                                    }
                                    break;
                                case 109757538:
                                    if (trim.equals("start")) {
                                        z2 = true;
                                        break;
                                    }
                                    break;
                                case 860587528:
                                    if (trim.equals("clients")) {
                                        z2 = false;
                                        break;
                                    }
                                    break;
                            }
                        } catch (IOException | InterruptedException e3) {
                        }
                        switch (z2) {
                            case false:
                                synchronized (CentralizedFL.this.controlThread) {
                                    CentralizedFL.this.triggerNotification(new ShowClientsNotification());
                                    CentralizedFL.this.controlThread.wait();
                                }
                            case true:
                                if (CentralizedFL.this.running) {
                                    System.out.println("Training process is already running.");
                                } else {
                                    synchronized (CentralizedFL.this.controlThread) {
                                        CentralizedFL.this.triggerNotification(new StartTrainingNotification());
                                        CentralizedFL.this.controlThread.wait();
                                    }
                                }
                            case true:
                                CentralizedFL.this.keepExecuting = false;
                                bufferedReader.close();
                                System.err.println("Bye...");
                                System.exit(0);
                        }
                    }
                    try {
                        bufferedReader.close();
                    } catch (Exception e4) {
                    }
                }
            });
            this.clientIndexes = new HashMap<>();
            this.clientData = new HashMap<>();
            this.clients = new HashMap<>();
        }
        this.missingClientReplies = 0;
        this.currentRound = 0;
    }

    private void waitForLocalRoundToComplete() {
        this.trainingThread = new Thread(new Runnable() { // from class: tardis.cfl.app.CentralizedFL.2
            @Override // java.lang.Runnable
            public void run() {
                CentralizedFL.this.sendMessage(new RoundCompletedMessage(CentralizedFL.this.myIdentifier, CentralizedFL.this.currentRound, CentralizedFL.this.pythonReader.nextLine()), CentralizedFL.this.server);
            }
        });
        this.trainingThread.start();
    }

    @Override // pt.unl.fct.di.novasys.babel.core.GenericProtocol
    public void init(Properties properties) throws HandlerRegistrationException {
        subscribeNotification((short) 401, this::uponNeighborUP);
        subscribeNotification((short) 402, this::uponNeighborDOWN);
        subscribeNotification((short) 6662, this::uponShowClientsNotification);
        subscribeNotification((short) 6661, this::uponStartTrainingNotification);
        subscribeNotification((short) 6663, this::uponCleanUpNotification);
    }

    public void setup() throws ProtocolAlreadyExistsException, HandlerRegistrationException, IOException {
        init(this.props);
        if (this.isClient) {
            ClientProtocol clientProtocol = new ClientProtocol();
            clientProtocol.init(this.props);
            this.babel.registerProtocol(clientProtocol);
            logger.info("Registered Client Protocol");
            this.nextIndexToAtribute = -1;
            this.myIdentifier = UUID.randomUUID();
            this.server = null;
            this.myIndex = -1;
            try {
                initalizeClientPythonEnvironment();
            } catch (Exception e) {
                e.printStackTrace();
            }
        } else {
            ServerProtocol serverProtocol = new ServerProtocol();
            serverProtocol.init(this.props);
            this.babel.registerProtocol(serverProtocol);
            logger.info("Registered Server Protocol");
            this.nextIndexToAtribute = 0;
            this.myIdentifier = null;
            this.server = null;
            this.myIndex = -1;
            this.rounds = Integer.parseInt(this.props.getProperty(PAR_FL_ROUNDS, "100"));
            this.local_epoch = Integer.parseInt(this.props.getProperty(PAR_FL_LOCAL_EPOCH, DEFAULT_FL_LOCAL_EPOCH));
            try {
                initalizeServerPythonEnvironment();
            } catch (Exception e2) {
                e2.printStackTrace();
            }
        }
        this.babel.registerProtocol(this);
        logger.info("Registered Application Protocol");
    }

    private void startProcessingPythonErrorlog() {
        this.pythonErrorLog = new Thread(new Runnable() { // from class: tardis.cfl.app.CentralizedFL.3
            @Override // java.lang.Runnable
            public void run() {
                while (CentralizedFL.this.keepExecuting && CentralizedFL.this.pythonProcess.isAlive()) {
                    while (!CentralizedFL.this.pythonError.ready()) {
                        try {
                            Thread.sleep(500L);
                        } catch (IOException | InterruptedException e) {
                        }
                    }
                    do {
                        System.err.println(CentralizedFL.this.pythonError.readLine());
                    } while (CentralizedFL.this.pythonError.ready());
                    if (CentralizedFL.this.controlThread != null) {
                        CentralizedFL.this.controlThread.interrupt();
                    }
                }
            }
        });
        this.pythonErrorLog.start();
    }

    private void initalizeClientPythonEnvironment() throws IOException, InterruptedException {
        System.out.println("Initializing Python Clientland");
        this.pythonProcess = new ProcessBuilder("python3", "client.py").start();
        this.pythonWriter = new PrintStream(this.pythonProcess.getOutputStream());
        this.pythonReader = new Scanner(new InputStreamReader(this.pythonProcess.getInputStream()));
        this.pythonError = new BufferedReader(new InputStreamReader(this.pythonProcess.getErrorStream()));
        startProcessingPythonErrorlog();
        System.out.println("Setting up the base configuration on pythonland");
        this.pythonWriter.println(this.data);
        this.pythonWriter.flush();
        System.out.println(this.pythonReader.nextLine());
        this.pythonWriter.println(this.batch_size);
        this.pythonWriter.flush();
        System.out.println(this.pythonReader.nextLine());
        this.pythonWriter.println(this.workers);
        this.pythonWriter.flush();
        System.out.println(this.pythonReader.nextLine());
    }

    private void initalizeServerPythonEnvironment() throws IOException, InterruptedException {
        System.out.println("Initializing Python Serverland");
        this.pythonProcess = new ProcessBuilder("python3", "server.py").start();
        this.pythonWriter = new PrintStream(this.pythonProcess.getOutputStream());
        this.pythonReader = new Scanner(new InputStreamReader(this.pythonProcess.getInputStream()));
        this.pythonError = new BufferedReader(new InputStreamReader(this.pythonProcess.getErrorStream()));
        startProcessingPythonErrorlog();
        System.out.println("Setting up the base configuration on pythonland");
        this.pythonWriter.println(this.data);
        this.pythonWriter.flush();
        System.out.println(this.pythonReader.nextLine());
        this.pythonWriter.println(this.batch_size);
        this.pythonWriter.flush();
        System.out.println(this.pythonReader.nextLine());
        this.pythonWriter.println(this.workers);
        this.pythonWriter.flush();
        System.out.println(this.pythonReader.nextLine());
        this.pythonWriter.println(this.rounds);
        this.pythonWriter.flush();
        System.out.println(this.pythonReader.nextLine());
        this.pythonWriter.println(this.local_epoch);
        this.pythonWriter.flush();
        System.out.println(this.pythonReader.nextLine());
    }

    public void uponShowClientsNotification(ShowClientsNotification showClientsNotification, short s) {
        if (this.isClient) {
            logger.warn("Received a notifaction that is only for a server");
            return;
        }
        System.out.println("Client Information\n------------------\n");
        for (int i = 0; i < this.nextIndexToAtribute; i++) {
            System.out.println(this.clientData.get(Integer.valueOf(i)));
        }
        synchronized (this.controlThread) {
            this.controlThread.notify();
        }
    }

    public void uponStartTrainingNotification(StartTrainingNotification startTrainingNotification, short s) {
        if (this.isClient) {
            logger.warn("Received a notifaction that is only for a server");
            return;
        }
        this.missingClientReplies = this.clientIndexes.keySet().size();
        this.clientsInTraining = this.missingClientReplies;
        this.pythonWriter.println(this.missingClientReplies);
        this.pythonWriter.flush();
        String str = "";
        while (!str.contains("Setting up number of clients")) {
            str = this.pythonReader.nextLine();
            System.out.println(str);
        }
        for (ClientData clientData : this.clientData.values()) {
            clientData.setStandby();
            sendMessage(new PrepareTrainingMessage(this.missingClientReplies), clientData.getClientEndpoint());
        }
        this.running = true;
        synchronized (this.controlThread) {
            this.controlThread.notify();
        }
    }

    public void uponCleanUpNotification(CleanUpNotification cleanUpNotification, short s) {
        while (true) {
            String nextLine = this.pythonReader.nextLine();
            if (nextLine.equalsIgnoreCase("TERMINATED")) {
                this.pythonWriter.println("bye");
                this.pythonWriter.flush();
                this.pythonWriter.close();
                try {
                    this.pythonError.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                try {
                    Thread.sleep(Duration.ofSeconds(1L));
                } catch (InterruptedException e2) {
                }
                System.exit(0);
            } else {
                System.out.println(nextLine);
            }
        }
    }

    public void uponChannelAvailableNotification(ChannelAvailableNotification channelAvailableNotification, short s) {
        registerSharedChannel(channelAvailableNotification.getChannelID());
        setDefaultChannel(channelAvailableNotification.getChannelID());
        try {
            registerMessageHandler(getDefaultChannel(), (short) 6661, this::uponReceiveRegisterMessage);
            registerMessageHandler(getDefaultChannel(), (short) 6662, this::uponReceiveRegisterReplyMessage);
            registerMessageHandler(getDefaultChannel(), (short) 6665, this::uponReceiveStartTrainingMessage);
            registerMessageHandler(getDefaultChannel(), (short) 6663, this::uponReceivePrepareTrainingMessage);
            registerMessageHandler(getDefaultChannel(), (short) 6664, this::uponReceiveReadyMessage);
            registerMessageHandler(getDefaultChannel(), (short) 6666, this::uponReceiveStartTrainingReplyMessage);
            registerMessageHandler(getDefaultChannel(), (short) 6667, this::uponReceiveStartNextRoundMessage);
            registerMessageHandler(getDefaultChannel(), (short) 6668, this::uponReceiveRoundCompletedMessage);
            registerMessageHandler(getDefaultChannel(), (short) 6669, this::uponReceiveTerminateMessage);
            registerMessageSerializer(getDefaultChannel(), (short) 6661, RegisterMessage.serializer);
            registerMessageSerializer(getDefaultChannel(), (short) 6662, RegisterReplyMessage.serializer);
            registerMessageSerializer(getDefaultChannel(), (short) 6665, StartTrainingMessage.serializer);
            registerMessageSerializer(getDefaultChannel(), (short) 6663, PrepareTrainingMessage.serializer);
            registerMessageSerializer(getDefaultChannel(), (short) 6664, ReadyMessage.serializer);
            registerMessageSerializer(getDefaultChannel(), (short) 6666, StartTrainingReplyMessage.serializer);
            registerMessageSerializer(getDefaultChannel(), (short) 6667, StartNextRoundMessage.serializer);
            registerMessageSerializer(getDefaultChannel(), (short) 6668, RoundCompletedMessage.serializer);
            registerMessageSerializer(getDefaultChannel(), (short) 6669, TerminateMessage.serializer);
        } catch (HandlerRegistrationException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    public void uponReceiveRegisterMessage(RegisterMessage registerMessage, Host host, short s, int i) {
        if (this.isClient) {
            logger.warn("Received a message that is only for a server");
            return;
        }
        if (this.clientIndexes.containsKey(registerMessage.getHostID())) {
            ClientData clientData = this.clientIndexes.get(registerMessage.getHostID());
            if (clientData.isOffline()) {
                clientData.setOnline();
            }
            Host checkHostIsUpToDate = clientData.checkHostIsUpToDate(host);
            if (checkHostIsUpToDate != null) {
                this.clients.put(clientData.getClientEndpoint(), this.clients.remove(checkHostIsUpToDate));
            }
            sendMessage(new RegisterReplyMessage(registerMessage.getHostID(), (short) 2, clientData.getIndex(), this.rounds, this.local_epoch), host);
            return;
        }
        UUID fromString = UUID.fromString(registerMessage.getHostID());
        int i2 = this.nextIndexToAtribute;
        this.nextIndexToAtribute++;
        ClientData clientData2 = new ClientData(host, fromString, i2);
        this.clientIndexes.put(registerMessage.getHostID(), clientData2);
        this.clientData.put(Integer.valueOf(i2), clientData2);
        this.clients.put(host, clientData2);
        sendMessage(new RegisterReplyMessage(registerMessage.getHostID(), (short) 1, i2, this.rounds, this.local_epoch), host);
    }

    public void uponReceiveRegisterReplyMessage(RegisterReplyMessage registerReplyMessage, Host host, short s, int i) {
        if (!this.isClient) {
            logger.warn("Received a message that is only for a client");
            return;
        }
        if (!this.myIdentifier.toString().equals(registerReplyMessage.getHostID())) {
            logger.warn("Received a message for another client (my id: " + this.myIdentifier.toString() + " destination id: " + registerReplyMessage.getHostID() + ")");
            return;
        }
        if (!registerReplyMessage.isSuccessful()) {
            logger.warn("Failed to register with server: " + String.valueOf(host));
        }
        if (this.myIndex != -1) {
            logger.warn("Unexpected RegisterReplyMessage from: " + String.valueOf(host));
            return;
        }
        this.server = host;
        this.myIndex = registerReplyMessage.getIndex();
        this.rounds = registerReplyMessage.getRounds();
        this.local_epoch = registerReplyMessage.getLocalEpoch();
        this.pythonWriter.println(this.rounds);
        this.pythonWriter.flush();
        System.out.println(this.pythonReader.nextLine());
        this.pythonWriter.println(this.local_epoch);
        this.pythonWriter.flush();
        System.out.println(this.pythonReader.nextLine());
        this.pythonWriter.println(this.myIndex);
        this.pythonWriter.flush();
        System.out.println(this.pythonReader.nextLine());
        logger.info("Client " + String.valueOf(this.myIdentifier) + " is setup with server " + String.valueOf(this.server) + " and index " + this.myIndex + " (rounds: " + this.rounds + "; local epoch: " + this.local_epoch + ")");
    }

    public void uponReceivePrepareTrainingMessage(PrepareTrainingMessage prepareTrainingMessage, Host host, short s, int i) {
        logger.info("Received PrepareTrainingMessage");
        if (!this.isClient) {
            logger.warn("Received a message that is only for a client");
            return;
        }
        this.pythonWriter.println(prepareTrainingMessage.getTotalClients());
        this.pythonWriter.flush();
        if (this.pythonReader.nextLine().equalsIgnoreCase("ok")) {
            sendMessage(new ReadyMessage(this.myIdentifier), host);
        }
        System.out.println(this.pythonReader.nextLine());
    }

    public void uponReceiveReadyMessage(ReadyMessage readyMessage, Host host, short s, int i) {
        logger.info("Received ReadyMessage");
        if (this.isClient) {
            logger.warn("Received a message that is only for a server");
            return;
        }
        ClientData clientData = this.clientIndexes.get(readyMessage.getClientID());
        if (clientData == null) {
            System.err.println("Unknown client: " + readyMessage.getClientID());
            if (this.controlThread != null) {
                this.controlThread.interrupt();
                return;
            }
            return;
        }
        if (!clientData.isStandby()) {
            System.err.println("Client " + String.valueOf(clientData.getID()) + " (" + clientData.getIndex() + ") is in incorrect state " + clientData.getStateText() + " expected " + ClientData.printState(ClientData.CState.STANDBY));
            if (this.controlThread != null) {
                this.controlThread.interrupt();
                return;
            }
            return;
        }
        clientData.setReady();
        this.missingClientReplies--;
        if (this.missingClientReplies == 0) {
            for (ClientData clientData2 : this.clientData.values()) {
                if (clientData2.isReady()) {
                    sendMessage(new StartTrainingMessage(), clientData2.getClientEndpoint());
                    clientData2.setRunning();
                }
            }
            this.pythonWriter.println("Start");
            this.missingClientReplies = this.clientsInTraining;
        }
    }

    public void uponReceiveStartTrainingMessage(StartTrainingMessage startTrainingMessage, Host host, short s, int i) {
        logger.info("Received StartTrainingMessage");
        if (!this.isClient) {
            logger.warn("Received a message that is only for a client");
            return;
        }
        this.pythonWriter.println("Start");
        this.pythonWriter.flush();
        System.out.println(this.pythonReader.nextLine());
        while (true) {
            try {
                sendMessage(new StartTrainingReplyMessage(this.myIdentifier, Integer.parseInt(this.pythonReader.nextLine())), host);
                return;
            } catch (NumberFormatException e) {
            }
        }
    }

    public void uponReceiveStartTrainingReplyMessage(StartTrainingReplyMessage startTrainingReplyMessage, Host host, short s, int i) {
        String str;
        logger.info("Received StartTrainingReplyMessage");
        if (this.isClient) {
            logger.warn("Received a message that is only for the server");
            return;
        }
        ClientData clientData = this.clientIndexes.get(startTrainingReplyMessage.getClientID());
        if (clientData == null || !clientData.isRunning()) {
            if (clientData == null) {
                System.err.println("Received a StartTrainingReply from an unkown client: " + startTrainingReplyMessage.getClientID());
            } else {
                System.err.println("Received a StartTrainingReply from a client in the incorrect state (" + clientData.getStateText() + " shoudb be " + ClientData.printState(ClientData.CState.RUNNING) + ")");
            }
            if (this.controlThread != null) {
                this.controlThread.interrupt();
                return;
            }
            return;
        }
        clientData.setTrainDataSetSize(startTrainingReplyMessage.getTrainsetLen());
        clientData.setWaiting();
        this.missingClientReplies--;
        if (this.missingClientReplies == 0) {
            int i2 = 0;
            boolean z = false;
            for (ClientData clientData2 : this.clients.values()) {
                if (clientData2.isWaiting()) {
                    i2++;
                } else {
                    System.err.println("Client " + String.valueOf(clientData2.getID()) + "(" + String.valueOf(clientData2.getClientEndpoint()) + ") is not in Waiting state");
                    z = true;
                }
            }
            if (i2 != this.clientsInTraining) {
                System.err.println("Only " + i2 + " clients are in Waiting state (should be " + this.clientsInTraining + ")");
                if (this.controlThread != null) {
                    this.controlThread.interrupt();
                    return;
                }
                return;
            }
            this.currentRound = 1;
            StringBuilder sb = new StringBuilder();
            for (int i3 = 0; i3 < this.nextIndexToAtribute; i3++) {
                ClientData clientData3 = this.clientData.get(Integer.valueOf(i3));
                if (clientData3 != null && clientData3.isWaiting()) {
                    clientData3.setRunning();
                    if (!sb.isEmpty()) {
                        sb.append(StringUtils.SPACE);
                    }
                    sb.append(clientData3.getIndex());
                    sb.append(StringUtils.SPACE);
                    sb.append(clientData3.getTrainDataSetSize());
                }
            }
            this.pythonWriter.println(sb.toString());
            this.pythonWriter.flush();
            String str2 = "";
            while (true) {
                str = str2;
                if (!str.isEmpty()) {
                    break;
                } else {
                    str2 = this.pythonReader.nextLine();
                }
            }
            for (ClientData clientData4 : this.clientData.values()) {
                if (clientData4.isRunning()) {
                    clientData4.setRound(this.currentRound);
                    sendMessage(new StartNextRoundMessage(this.currentRound, str), clientData4.getClientEndpoint());
                }
            }
            this.missingClientReplies = this.clientsInTraining;
            if (!z || this.controlThread == null) {
                return;
            }
            this.controlThread.interrupt();
        }
    }

    public void uponReceiveStartNextRoundMessage(StartNextRoundMessage startNextRoundMessage, Host host, short s, int i) {
        if (!this.isClient) {
            logger.warn("Received a message that is only for a client");
            return;
        }
        this.currentRound = startNextRoundMessage.getRound();
        System.err.println("Starting round " + this.currentRound);
        this.pythonWriter.println(startNextRoundMessage.getGlobalWeights());
        this.pythonWriter.flush();
        waitForLocalRoundToComplete();
    }

    public void uponReceiveRoundCompletedMessage(RoundCompletedMessage roundCompletedMessage, Host host, short s, int i) {
        logger.info("Received RoundCompletedMessage");
        if (this.isClient) {
            logger.warn("Received a message that is only for the server");
            return;
        }
        ClientData clientData = this.clientIndexes.get(roundCompletedMessage.getClientID());
        if (clientData == null || !clientData.isRunning()) {
            if (clientData == null) {
                System.err.println("Received a RoundCompletedMessage from an unkown client: " + roundCompletedMessage.getClientID());
            } else {
                System.err.println("Received a RoundCompletedMessage from a client in the incorrect state (" + clientData.getStateText() + " shoudb be " + ClientData.printState(ClientData.CState.RUNNING) + ")");
            }
            if (this.controlThread != null) {
                this.controlThread.interrupt();
                return;
            }
            return;
        }
        clientData.setWaiting();
        this.pythonWriter.println(clientData.getIndex());
        this.pythonWriter.println(roundCompletedMessage.getLocalWeights());
        this.pythonWriter.flush();
        this.missingClientReplies--;
        if (this.missingClientReplies == 0) {
            int i2 = 0;
            boolean z = false;
            for (ClientData clientData2 : this.clients.values()) {
                if (clientData2.isWaiting()) {
                    i2++;
                } else {
                    System.err.println("Client " + String.valueOf(clientData2.getID()) + "(" + String.valueOf(clientData2.getClientEndpoint()) + ") is not in Waiting state");
                    z = true;
                }
            }
            if (i2 != this.clientsInTraining) {
                System.err.println("Only " + i2 + " clients are in Waiting state (should be " + this.clientsInTraining + ")");
                if (this.controlThread != null) {
                    this.controlThread.interrupt();
                    return;
                }
                return;
            }
            this.currentRound++;
            if (this.currentRound <= this.rounds) {
                String nextLine = this.pythonReader.nextLine();
                for (ClientData clientData3 : this.clientData.values()) {
                    if (clientData3.isWaiting()) {
                        clientData3.setRound(this.currentRound);
                        clientData3.setRunning();
                        sendMessage(new StartNextRoundMessage(this.currentRound, nextLine), clientData3.getClientEndpoint());
                    }
                }
            } else {
                for (ClientData clientData4 : this.clientData.values()) {
                    if (clientData4.isWaiting()) {
                        sendMessage(new TerminateMessage(), clientData4.getClientEndpoint());
                    }
                }
                this.keepExecuting = false;
                this.pythonErrorLog.interrupt();
                if (this.controlThread != null) {
                    this.controlThread.interrupt();
                }
                System.out.println("\nTraining has terminated");
                triggerNotification(new CleanUpNotification());
            }
            this.missingClientReplies = this.clientsInTraining;
            if (!z || this.controlThread == null) {
                return;
            }
            this.controlThread.interrupt();
        }
    }

    public void uponReceiveTerminateMessage(TerminateMessage terminateMessage, Host host, short s, int i) {
        if (!this.isClient) {
            logger.warn("Received a message that is only for a client");
            return;
        }
        this.keepExecuting = false;
        this.pythonErrorLog.interrupt();
        this.pythonWriter.close();
        System.exit(0);
    }

    public void uponNeighborUP(NeighborUp neighborUp, short s) {
        logger.info("New neighbor: " + String.valueOf(neighborUp.getPeer()));
        if (this.controlThread != null) {
            this.controlThread.interrupt();
        }
        if (this.isClient && this.server == null) {
            sendMessage(new RegisterMessage(this.myIdentifier.toString()), neighborUp.getPeer());
        }
    }

    public void uponNeighborDOWN(NeighborDown neighborDown, short s) {
        ClientData clientData;
        logger.info("Neighbor lost: " + String.valueOf(neighborDown.getPeer()));
        if (!this.isClient && (clientData = this.clients.get(neighborDown.getPeer())) != null) {
            clientData.setOffline();
        }
        if (this.controlThread != null) {
            this.controlThread.interrupt();
        }
    }

    public void start() {
        this.babel.start();
        logger.info("Babel is started");
        if (this.controlThread != null) {
            this.controlThread.start();
        }
    }

    public static void main(String[] strArr) throws ProtocolAlreadyExistsException, HandlerRegistrationException, IOException {
        if (strArr.length < 1) {
            System.err.println("Missing arguments");
            System.err.println("arguments: <client/server> [configFIle] [parameter=value] ... [parameter=value]");
            System.exit(1);
        }
        boolean z = false;
        if (strArr[0].equalsIgnoreCase("client")) {
            z = true;
        } else if (!strArr[0].equalsIgnoreCase("server")) {
            System.err.println("Missing arguments");
            System.err.println("arguments: <client/server> [configFIle] [parameter=value] ... [parameter=value]");
            System.exit(1);
        }
        boolean z2 = false;
        if (strArr.length >= 2 && Files.exists(Path.of(strArr[1], new String[0]), LinkOption.NOFOLLOW_LINKS)) {
            configFileName = strArr[0];
            z2 = true;
        }
        CentralizedFL centralizedFL = new CentralizedFL(z, (String[]) Arrays.copyOfRange(strArr, z2 ? 2 : 1, strArr.length));
        centralizedFL.setup();
        centralizedFL.start();
    }
}
