package pt.unl.di.novasys.babel.rc.eventual.antientropy;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import pt.unl.di.novasys.babel.rc.commons.notifications.state.DeliverStateNotification;
import pt.unl.di.novasys.babel.rc.commons.notifications.state.StateMergeNotification;
import pt.unl.di.novasys.babel.rc.commons.replies.state.replies.ReturnStateReply;
import pt.unl.di.novasys.babel.rc.commons.requests.state.requests.GetStateRequest;
import pt.unl.di.novasys.babel.rc.commons.requests.state.requests.PropagateStateRequest;
import pt.unl.di.novasys.babel.rc.eventual.antientropy.messages.AckMessage;
import pt.unl.di.novasys.babel.rc.eventual.antientropy.messages.DeltaMessage;
import pt.unl.di.novasys.babel.rc.eventual.antientropy.timers.GarbageCollectorTimer;
import pt.unl.di.novasys.babel.rc.eventual.antientropy.timers.PropagateDeltasTimer;
import pt.unl.di.novasys.babel.rc.eventual.antientropy.utils.DeltaBuffer;
import pt.unl.di.novasys.babel.rc.eventual.antientropy.utils.DeltaState;
import pt.unl.di.novasys.babel.rc.eventual.antientropy.utils.Utils;
import pt.unl.fct.di.novasys.babel.core.GenericProtocol;
import pt.unl.fct.di.novasys.babel.crdts.delta.generic.DeltaBasedCRDT;
import pt.unl.fct.di.novasys.babel.crdts.delta.generic.DeltaCRDT;
import pt.unl.fct.di.novasys.babel.crdts.utils.CRDTsTypes;
import pt.unl.fct.di.novasys.babel.exceptions.HandlerRegistrationException;
import pt.unl.fct.di.novasys.babel.protocols.membership.Peer;
import pt.unl.fct.di.novasys.babel.protocols.membership.notifications.NeighborDown;
import pt.unl.fct.di.novasys.babel.protocols.membership.notifications.NeighborUp;
import pt.unl.fct.di.novasys.channel.tcp.events.InConnectionDown;
import pt.unl.fct.di.novasys.channel.tcp.events.InConnectionUp;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionDown;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionFailed;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionUp;
import pt.unl.fct.di.novasys.network.data.Host;

/* loaded from: input_file:pt/unl/di/novasys/babel/rc/eventual/antientropy/DeltaCausalAntiEntropy.class */
public class DeltaCausalAntiEntropy extends GenericProtocol {
    public static final String PROTOCOL_NAME = "DeltaCausalAntiEntropy";
    public static final short PROTOCOL_ID = 760;
    private static final Logger logger = LogManager.getLogger(DeltaCausalAntiEntropy.class);
    private Map<String, DeltaState> buffer;
    private Map<String, List<Host>> pendingGetCRDTs;
    private Map<String, DeltaMessage> merginMessages;
    private Map<String, Queue<DeltaMessage>> queueMessages;
    private final short networkPort;
    private int channelId;
    private final Peer myself;
    private Set<Host> pending;
    private Set<Host> connectedNeighbors;
    private long gcTimeout;
    private long propagateTimeout;
    private short storageProtoID;

    public DeltaCausalAntiEntropy(Peer peer, Properties properties) throws IOException, HandlerRegistrationException {
        super(PROTOCOL_NAME, (short) 760);
        this.pending = new TreeSet();
        this.connectedNeighbors = new TreeSet();
        this.buffer = new HashMap();
        this.pendingGetCRDTs = new HashMap();
        this.merginMessages = new HashMap();
        this.queueMessages = new HashMap();
        this.storageProtoID = Short.parseShort(properties.getProperty("ReplicationCore.storageSystemID"));
        this.myself = peer;
        String property = properties.getProperty("ReplicationCore.channel.address");
        String property2 = properties.getProperty("ReplicationCore.channel.port");
        this.gcTimeout = Long.parseLong(properties.getProperty("ReplicationCore.gcTimeout", "8000"));
        this.propagateTimeout = Long.parseLong(properties.getProperty("ReplicationCore.propagateTimeout", "4000"));
        this.networkPort = Short.parseShort(property2);
        Properties properties2 = new Properties();
        properties2.setProperty("address", property);
        properties2.setProperty("port", property2);
        this.channelId = createChannel("TCPChannel", properties2);
        registerRequestHandler((short) 521, this::uponPropagateCRDTRequest);
        registerReplyHandler((short) 621, this::uponReturnCRDTStateReply);
        subscribeNotification((short) 401, this::uponNeighborUp);
        subscribeNotification((short) 402, this::uponNeighborDown);
        subscribeNotification((short) 723, this::uponStateMerge);
        registerTimerHandler((short) 4003, this::uponPropagateDeltasTimer);
        registerTimerHandler((short) 404, this::uponGarbageCollectorTimer);
        registerMessageSerializer(this.channelId, (short) 602, AckMessage.serializer);
        registerMessageSerializer(this.channelId, (short) 603, DeltaMessage.serializer);
        registerMessageHandler(this.channelId, (short) 602, this::uponAckMessage);
        registerMessageHandler(this.channelId, (short) 603, this::uponDeltaMesssage);
        registerChannelEventHandler(this.channelId, (short) 3, this::uponOutConnectionDown);
        registerChannelEventHandler(this.channelId, (short) 4, this::uponOutConnectionFailed);
        registerChannelEventHandler(this.channelId, (short) 5, this::uponOutConnectionUp);
        registerChannelEventHandler(this.channelId, (short) 2, this::uponInConnectionUp);
        registerChannelEventHandler(this.channelId, (short) 1, this::uponInConnectionDown);
    }

    public void init(Properties properties) throws HandlerRegistrationException, IOException {
        setupPeriodicTimer(new GarbageCollectorTimer(), this.gcTimeout, this.gcTimeout);
        setupPeriodicTimer(new PropagateDeltasTimer(), this.propagateTimeout, this.propagateTimeout);
    }

    private void uponAckMessage(AckMessage ackMessage, Host host, short s, int i) {
        int seqNumber = ackMessage.getSeqNumber();
        String objectID = ackMessage.getObjectID();
        Host origin = ackMessage.getOrigin();
        logger.debug("ACK: {} {} {}", origin.toString(), Integer.valueOf(seqNumber), objectID);
        this.buffer.get(objectID).updateAcks(origin, seqNumber);
    }

    private void uponDeltaMesssage(DeltaMessage deltaMessage, Host host, short s, int i) {
        String objectID = deltaMessage.getObjectID();
        DeltaCRDT delta = deltaMessage.getDelta();
        if (!this.merginMessages.containsKey(objectID)) {
            this.merginMessages.put(objectID, deltaMessage);
            logger.debug("Sending state notification to storage layer with {} {} from {}", objectID, Integer.valueOf(deltaMessage.getSeqNumber()), deltaMessage.getOrigin().toString());
            triggerNotification(new DeliverStateNotification(objectID, delta));
        } else {
            logger.debug("Merge is already happening, putting in queue with {} {} from {}", objectID, Integer.valueOf(deltaMessage.getSeqNumber()), deltaMessage.getOrigin().toString());
            Queue<DeltaMessage> queue = this.queueMessages.get(objectID);
            if (queue == null) {
                queue = new LinkedList();
                this.queueMessages.put(objectID, queue);
            }
            queue.add(deltaMessage);
        }
    }

    void uponPropagateCRDTRequest(PropagateStateRequest propagateStateRequest, short s) {
        String objectID = propagateStateRequest.getObjectID();
        if (propagateStateRequest.getCrdt().getType().getFlavor() != CRDTsTypes.FLAVOR.DELTA) {
            logger.error("CRDT passed to replication core is not delta-based!");
            return;
        }
        DeltaCRDT crdt = propagateStateRequest.getCrdt();
        DeltaState deltaState = this.buffer.get(objectID);
        if (deltaState == null) {
            deltaState = new DeltaState(objectID);
            this.buffer.put(objectID, deltaState);
        }
        deltaState.putCRDT(crdt, this.myself);
    }

    void uponReturnCRDTStateReply(ReturnStateReply returnStateReply, short s) {
        String objectID = returnStateReply.getObjectID();
        if (returnStateReply.getCRDT() == null) {
            logger.warn("CRDT didn't exist in upper layer!");
            this.pendingGetCRDTs.remove(objectID);
            return;
        }
        if (!returnStateReply.getCRDT().getType().isOfFlavor(CRDTsTypes.FLAVOR.DELTA)) {
            logger.error("Didn't receive delta-based crdt!");
            return;
        }
        logger.debug("Got FULL STATE reply from {}", objectID);
        DeltaCRDT crdt = returnStateReply.getCRDT();
        List<Host> list = this.pendingGetCRDTs.get(returnStateReply.getObjectID());
        DeltaState deltaState = this.buffer.get(objectID);
        for (Host host : list) {
            if (deltaState.getACK(host).intValue() < deltaState.getSeqNumber()) {
                logger.debug("Sending delta message of object {} with FULL STATE to {}", objectID, host);
                sendMessage(new DeltaMessage(objectID, crdt, deltaState.getSeqNumber(), this.myself), host);
            }
        }
        this.pendingGetCRDTs.remove(objectID);
    }

    private void uponPropagateDeltasTimer(PropagateDeltasTimer propagateDeltasTimer, long j) {
        Host host = (Host) Utils.getRandomSetElement(this.connectedNeighbors);
        if (host == null) {
            return;
        }
        for (Map.Entry<String, DeltaState> entry : this.buffer.entrySet()) {
            String key = entry.getKey();
            DeltaState value = entry.getValue();
            Integer valueOf = Integer.valueOf(value.getMinBufferSeqNum());
            Integer ack = value.getACK(host);
            logger.debug("Trying to propagate deltas from {} to {}", key, host.toString());
            logger.debug("ACK Host: {} {}", valueOf, ack);
            if (valueOf.intValue() == -1 || valueOf.intValue() > ack.intValue()) {
                logger.debug("Requesting full state");
                addToPendingCRDTRequests(key, host);
                sendRequest(new GetStateRequest(key, (short) 760), this.storageProtoID);
            } else if (!value.getBuffer().isEmpty()) {
                DeltaCRDT deltaCRDT = null;
                for (Map.Entry<Integer, DeltaBuffer> entry2 : value.getBuffer().entrySet()) {
                    Integer key2 = entry2.getKey();
                    DeltaCRDT delta = entry2.getValue().getDelta();
                    if (!entry2.getValue().getOrigin().equals(host) && key2.intValue() >= ack.intValue() && key2.intValue() < value.getSeqNumber()) {
                        if (deltaCRDT == null) {
                            deltaCRDT = delta;
                        } else if (deltaCRDT.mergeDelta((DeltaBasedCRDT) delta)) {
                            deltaCRDT.setReplicaID(delta.getReplicaID());
                        }
                    }
                }
                if (deltaCRDT != null && ack.intValue() < value.getSeqNumber()) {
                    logger.debug("Sending delta message of object {} to {}", key, host);
                    sendMessage(new DeltaMessage(key, deltaCRDT, value.getSeqNumber(), this.myself), host);
                }
            }
        }
    }

    private void uponGarbageCollectorTimer(GarbageCollectorTimer garbageCollectorTimer, long j) {
        for (DeltaState deltaState : this.buffer.values()) {
            if (!deltaState.getAcks().values().isEmpty()) {
                int intValue = ((Integer) Collections.max(deltaState.getAcks().values())).intValue();
                int intValue2 = ((Integer) Collections.min(deltaState.getAcks().values())).intValue();
                logger.debug("GC: {} {} {}", deltaState.getCrdtID(), Integer.valueOf(intValue2), Integer.valueOf(intValue));
                deltaState.getBuffer().keySet().forEach(num -> {
                    logger.debug("{}", num);
                    if (num.intValue() < intValue2) {
                        logger.debug("Removing {}", num);
                    }
                });
                deltaState.getBuffer().keySet().removeIf(num2 -> {
                    return num2.intValue() < intValue2;
                });
            }
        }
    }

    private void uponNeighborUp(NeighborUp neighborUp, short s) {
        Host host = new Host(neighborUp.getPeer().getAddress(), this.networkPort);
        logger.info("Got new neighbor: {}", host.toString());
        if (this.connectedNeighbors.contains(host) || !this.pending.add(host)) {
            return;
        }
        openConnection(host);
    }

    private void uponNeighborDown(NeighborDown neighborDown, short s) {
        Host host = new Host(neighborDown.getPeer().getAddress(), this.networkPort);
        logger.info("Neighbor down: {}", host.toString());
        if (this.connectedNeighbors.remove(host)) {
            closeConnection(host);
        } else if (this.pending.remove(host)) {
            closeConnection(host);
        }
    }

    private void uponStateMerge(StateMergeNotification stateMergeNotification, short s) {
        String objectID = stateMergeNotification.getObjectID();
        boolean isMerged = stateMergeNotification.isMerged();
        DeltaMessage remove = this.merginMessages.remove(objectID);
        logger.debug("RECEIVED StateMergeNotification on {} with delta {}. Changes: {}", objectID, remove, Boolean.valueOf(isMerged));
        if (remove == null) {
            return;
        }
        DeltaCRDT delta = remove.getDelta();
        int seqNumber = remove.getSeqNumber();
        if (isMerged) {
            DeltaState deltaState = this.buffer.get(objectID);
            if (deltaState == null) {
                deltaState = new DeltaState(objectID);
                this.buffer.put(objectID, deltaState);
            }
            deltaState.putCRDT(delta, remove.getOrigin());
        }
        Queue<DeltaMessage> queue = this.queueMessages.get(objectID);
        if (queue != null) {
            if (!queue.isEmpty()) {
                logger.debug("Queue was not empty, sending a state notification: {}", Boolean.valueOf(isMerged));
                this.merginMessages.put(objectID, queue.remove());
                triggerNotification(new DeliverStateNotification(objectID, delta));
            } else if (queue.isEmpty()) {
                logger.debug("Queue is empty, pruning", Boolean.valueOf(isMerged));
                this.queueMessages.remove(objectID);
            }
        }
        logger.debug("Sending ACKMessage to {} from {}", remove.getOrigin(), objectID);
        sendMessage(new AckMessage(objectID, seqNumber, this.myself), remove.getOrigin());
    }

    private void uponOutConnectionDown(OutConnectionDown outConnectionDown, int i) {
        Host node = outConnectionDown.getNode();
        logger.trace("Host {} is down, cause: {}", node, outConnectionDown.getCause());
        if (this.connectedNeighbors.contains(node) || this.pending.contains(node)) {
            this.connectedNeighbors.remove(node);
            this.pending.remove(node);
            closeConnection(node);
        }
    }

    private void uponOutConnectionFailed(OutConnectionFailed<?> outConnectionFailed, int i) {
        Host node = outConnectionFailed.getNode();
        logger.trace("Connection to host {} failed, cause: {}", node, outConnectionFailed.getCause());
        if (this.connectedNeighbors.contains(node) || this.pending.contains(node)) {
            this.connectedNeighbors.remove(node);
            this.pending.add(node);
            openConnection(node);
        }
    }

    private void uponOutConnectionUp(OutConnectionUp outConnectionUp, int i) {
        Host node = outConnectionUp.getNode();
        logger.trace("Host (out) {} is up", node);
        if (this.connectedNeighbors.contains(node) || !this.pending.remove(node)) {
            return;
        }
        this.connectedNeighbors.add(node);
    }

    private void uponInConnectionUp(InConnectionUp inConnectionUp, int i) {
        logger.trace("Host (in) {} is up", inConnectionUp.getNode());
    }

    private void uponInConnectionDown(InConnectionDown inConnectionDown, int i) {
        logger.trace("Connection from host {} is down, cause: {}", inConnectionDown.getNode(), inConnectionDown.getCause());
    }

    private void addToPendingCRDTRequests(String str, Host host) {
        List<Host> list = this.pendingGetCRDTs.get(str);
        if (list == null) {
            list = new LinkedList();
            this.pendingGetCRDTs.put(str, list);
        }
        list.add(host);
    }
}
