package pt.unl.di.novasys.babel.nimbus.rc.partialentropypush;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import pt.unl.di.novasys.babel.nimbus.rc.partialentropypush.messages.AckMessage;
import pt.unl.di.novasys.babel.nimbus.rc.partialentropypush.messages.DeltaMessage;
import pt.unl.di.novasys.babel.nimbus.rc.partialentropypush.messages.RemoteOperationMessage;
import pt.unl.di.novasys.babel.nimbus.rc.partialentropypush.messages.RemoteOperationResultMessage;
import pt.unl.di.novasys.babel.nimbus.rc.partialentropypush.messages.RemoteStateMessage;
import pt.unl.di.novasys.babel.nimbus.rc.partialentropypush.messages.RemoteStateResultMessage;
import pt.unl.di.novasys.babel.nimbus.rc.partialentropypush.structures.ReplicationCoreCollection;
import pt.unl.di.novasys.babel.nimbus.rc.partialentropypush.structures.ReplicationCoreKeySpace;
import pt.unl.di.novasys.babel.nimbus.rc.partialentropypush.structures.ReplicationCoreManager;
import pt.unl.di.novasys.babel.nimbus.rc.partialentropypush.timers.ConnectionTimeoutTimer;
import pt.unl.di.novasys.babel.nimbus.rc.partialentropypush.timers.GarbageCollectorTimer;
import pt.unl.di.novasys.babel.nimbus.rc.partialentropypush.timers.PropagateDeltasTimer;
import pt.unl.di.novasys.babel.nimbus.rc.partialentropypush.utils.ConnectedHost;
import pt.unl.di.novasys.babel.nimbus.rc.partialentropypush.utils.DeltaBuffer;
import pt.unl.di.novasys.babel.nimbus.rc.partialentropypush.utils.DeltaState;
import pt.unl.di.novasys.babel.rc.commons.notifications.state.DeliverStateNotification;
import pt.unl.di.novasys.babel.rc.commons.notifications.state.StateMergeNotification;
import pt.unl.di.novasys.babel.rc.commons.replies.state.ReturnStateReply;
import pt.unl.di.novasys.babel.rc.commons.requests.state.GetStateRequest;
import pt.unl.di.novasys.babel.rc.commons.requests.state.PropagateStateRequest;
import pt.unl.fct.di.novasys.babel.crdts.delta.causal.generic.DeltaCausalCRDT;
import pt.unl.fct.di.novasys.babel.crdts.delta.causal.implementations.DeltaOORMap;
import pt.unl.fct.di.novasys.babel.crdts.delta.generic.DeltaBasedCRDT;
import pt.unl.fct.di.novasys.babel.crdts.utils.CRDTsTypes;
import pt.unl.fct.di.novasys.babel.crdts.utils.ReplicaID;
import pt.unl.fct.di.novasys.babel.exceptions.HandlerRegistrationException;
import pt.unl.fct.di.novasys.babel.generic.ProtoMessage;
import pt.unl.fct.di.novasys.babel.protocols.membership.Peer;
import pt.unl.fct.di.novasys.babel.protocols.storage.operations.utils.CommonOperationStatus;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionDown;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionFailed;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionUp;
import pt.unl.fct.di.novasys.network.data.Host;
import pt.unl.fct.di.novasys.nimbus.utils.common.NimbusConstants;
import pt.unl.fct.di.novasys.nimbus.utils.common.NimbusID;
import pt.unl.fct.di.novasys.nimbus.utils.common.NimbusUtils;
import pt.unl.fct.di.novasys.nimbus.utils.core.replies.ExecuteOperationReply;
import pt.unl.fct.di.novasys.nimbus.utils.core.replies.GetReplicationUnitReply;
import pt.unl.fct.di.novasys.nimbus.utils.core.requests.RemoteOperationRequest;
import pt.unl.fct.di.novasys.nimbus.utils.core.requests.RemoteStateRequest;
import pt.unl.fct.di.novasys.nimbus.utils.partialoverlays.notifications.ReplicasModificationNotification;
import pt.unl.fct.di.novasys.nimbus.utils.partialoverlays.notifications.ReplicationUnitDownNotification;
import pt.unl.fct.di.novasys.nimbus.utils.partialoverlays.notifications.ReplicationUnitUpNotification;
import pt.unl.fct.di.novasys.nimbus.utils.replicationcore.NimbusReplicationCore;
import pt.unl.fct.di.novasys.nimbus.utils.replicationcore.replies.RemoteOperationReply;
import pt.unl.fct.di.novasys.nimbus.utils.replicationcore.replies.RemoteStateReply;
import pt.unl.fct.di.novasys.nimbus.utils.replicationcore.requests.ExecuteOperationRequest;
import pt.unl.fct.di.novasys.nimbus.utils.replicationcore.requests.GetReplicationUnitRequest;

/* loaded from: input_file:pt/unl/di/novasys/babel/nimbus/rc/partialentropypush/PartialAntiEntropyPush.class */
public class PartialAntiEntropyPush extends NimbusReplicationCore {
    public static final String PROTOCOL_NAME = "PartialAntiEntropyPush";
    private static final Logger logger = LogManager.getLogger(PartialAntiEntropyPush.class);
    private ReplicationCoreManager storage;
    private Map<NimbusID, List<Host>> pendingGetCRDTs;
    private Map<NimbusID, Queue<DeltaMessage>> queueMessages;
    private Map<NimbusID, Host> pendingRemoteOps;
    private Set<ConnectedHost> connected;
    private ReplicaID myself;
    private long gcTimeout;
    private long propagateTimeout;
    private int minGCAcks;
    private int fanout;
    private int connectionTimeout;
    private int port;

    public PartialAntiEntropyPush(Peer peer, Properties properties) throws IOException, HandlerRegistrationException {
        super(PROTOCOL_NAME);
        this.connected = new HashSet();
        this.pendingGetCRDTs = new HashMap();
        this.queueMessages = new HashMap();
        this.pendingRemoteOps = new HashMap();
        this.myself = new ReplicaID(peer);
        this.storage = new ReplicationCoreManager(this.myself);
        this.connectionTimeout = Integer.parseInt(properties.getProperty("NimbusReplicationCore.connectionTimeout", "30000"));
        this.minGCAcks = Integer.parseInt(properties.getProperty("NimbusReplicationCore.minGCAcks", "1"));
        this.fanout = Integer.parseInt(properties.getProperty("NimbusReplicationCore.fanout", "3"));
        this.gcTimeout = Long.parseLong(properties.getProperty("NimbusReplicationCore.gcTimeout", "8000"));
        this.propagateTimeout = Long.parseLong(properties.getProperty("NimbusReplicationCore.propagateTimeout", "4000"));
        String property = properties.getProperty("NimbusReplicationCore.channel.address");
        property = property == null ? peer.getAddress().getHostAddress() : property;
        String property2 = properties.getProperty("NimbusReplicationCore.channel.port");
        this.port = Integer.parseInt(property2);
        Properties properties2 = new Properties();
        properties2.setProperty("address", property);
        properties2.setProperty("port", property2);
        int createChannel = createChannel("TCPChannel", properties2);
        registerRequestHandler((short) 521, this::uponPropagateCRDTRequest);
        registerRequestHandler((short) 620, this::uponRemoteOperationRequest);
        registerRequestHandler((short) 621, this::uponRemoteStateRequest);
        registerReplyHandler((short) 621, this::uponReturnCRDTStateReply);
        registerReplyHandler((short) 625, this::uponRemoteOperationDeliverReply);
        registerReplyHandler((short) 626, this::uponRemoteStateDeliverReply);
        subscribeNotification((short) 723, this::uponStateMerge);
        subscribeNotification((short) 652, this::uponReplicasModification);
        subscribeNotification((short) 650, this::uponReplicaUnitUp);
        subscribeNotification((short) 656, this::uppnReplicaUnitDown);
        registerTimerHandler((short) 650, this::uponPropagateDeltasTimer);
        registerTimerHandler((short) 651, this::uponGarbageCollectorTimer);
        registerTimerHandler((short) 653, this::uponConnectionTimeoutTimer);
        registerMessageSerializer(createChannel, (short) 6400, AckMessage.serializer);
        registerMessageSerializer(createChannel, (short) 6401, DeltaMessage.serializer);
        registerMessageSerializer(createChannel, (short) 6402, RemoteOperationMessage.serializer);
        registerMessageSerializer(createChannel, (short) 6403, RemoteOperationResultMessage.serializer);
        registerMessageSerializer(createChannel, (short) 6404, RemoteStateMessage.serializer);
        registerMessageSerializer(createChannel, (short) 6405, RemoteStateResultMessage.serializer);
        registerMessageHandler(createChannel, (short) 6400, this::uponAckMessage);
        registerMessageHandler(createChannel, (short) 6401, this::uponDeltaMesssage);
        registerMessageHandler(createChannel, (short) 6402, this::uponRemoteOperationMessage);
        registerMessageHandler(createChannel, (short) 6403, this::uponRemoteOperationResultMessage);
        registerMessageHandler(createChannel, (short) 6404, this::uponRemoteStateMessage);
        registerMessageHandler(createChannel, (short) 6405, this::uponRemoteStateResultMessage);
        registerChannelEventHandler(createChannel, (short) 3, this::uponOutConnectionDown);
        registerChannelEventHandler(createChannel, (short) 4, this::uponOutConnectionFailed);
        registerChannelEventHandler(createChannel, (short) 5, this::uponOutConnectionUp);
    }

    public void init(Properties properties) throws HandlerRegistrationException, IOException {
        setupPeriodicTimer(new GarbageCollectorTimer(), this.gcTimeout, this.gcTimeout);
        setupPeriodicTimer(new PropagateDeltasTimer(), this.propagateTimeout, this.propagateTimeout);
    }

    private void uponAckMessage(AckMessage ackMessage, Host host, short s, int i) {
        int seqNumber = ackMessage.getSeqNumber();
        NimbusID nimbusID = ackMessage.getNimbusID();
        String keySpaceID = nimbusID.getKeySpaceID();
        String collectionID = nimbusID.getCollectionID();
        Host origin = ackMessage.getOrigin();
        logger.debug("ACK: {} {} {}", origin.toString(), Integer.valueOf(seqNumber), nimbusID.toString());
        this.storage.getDeltaState(keySpaceID, collectionID).updateAcks(origin, seqNumber);
    }

    private void uponDeltaMesssage(DeltaMessage deltaMessage, Host host, short s, int i) {
        logger.debug("Remote delta message message on {}", deltaMessage.getNimbusID());
        NimbusID nimbusID = deltaMessage.getNimbusID();
        DeltaCausalCRDT delta = deltaMessage.getDelta();
        addToQueue(nimbusID, deltaMessage);
        logger.debug("Sending state notification to storage layer with {} {} from {}", nimbusID, Integer.valueOf(deltaMessage.getSeqNumber()), deltaMessage.getOrigin().toString());
        triggerNotification(new DeliverStateNotification(nimbusID.toString(), delta));
    }

    private void uponRemoteOperationMessage(RemoteOperationMessage remoteOperationMessage, Host host, short s, int i) {
        ExecuteOperationRequest executeOperationRequest = new ExecuteOperationRequest(remoteOperationMessage.getNimbusID(), remoteOperationMessage.getOperation());
        logger.debug("Remote operation message on {} and {}", executeOperationRequest.getNimbusID(), remoteOperationMessage.getOperation());
        this.pendingRemoteOps.put(remoteOperationMessage.getNimbusID(), host);
        sendRequest(executeOperationRequest, NimbusConstants.NIMBUS_PROTO_ID.shortValue());
    }

    private void uponRemoteOperationResultMessage(RemoteOperationResultMessage remoteOperationResultMessage, Host host, short s, int i) {
        sendReply(new RemoteOperationReply(remoteOperationResultMessage.getNimbusID(), remoteOperationResultMessage.getStatus(), remoteOperationResultMessage.getCRDT(), remoteOperationResultMessage.getMessage()), NimbusConstants.NIMBUS_PROTO_ID.shortValue());
    }

    private void uponRemoteStateMessage(RemoteStateMessage remoteStateMessage, Host host, short s, int i) {
        logger.debug("Remote state message on {}", remoteStateMessage.getNimbusID());
        GetReplicationUnitRequest getReplicationUnitRequest = new GetReplicationUnitRequest(remoteStateMessage.getNimbusID());
        this.pendingRemoteOps.put(remoteStateMessage.getNimbusID(), host);
        sendRequest(getReplicationUnitRequest, NimbusConstants.NIMBUS_PROTO_ID.shortValue());
    }

    private void uponRemoteStateResultMessage(RemoteStateResultMessage remoteStateResultMessage, Host host, short s, int i) {
        sendReply(new RemoteStateReply(remoteStateResultMessage.getNimbusID(), remoteStateResultMessage.getCollections()), NimbusConstants.NIMBUS_PROTO_ID.shortValue());
    }

    void uponPropagateCRDTRequest(PropagateStateRequest propagateStateRequest, short s) {
        NimbusID nimbusID = new NimbusID(propagateStateRequest.getObjectID());
        String keySpaceID = nimbusID.getKeySpaceID();
        String collectionID = nimbusID.getCollectionID();
        if (propagateStateRequest.getCrdt().getType().getFlavor() != CRDTsTypes.FLAVOR.DELTA) {
            logger.error("CRDT passed to replication core is not delta-based!");
        } else {
            this.storage.getDeltaState(keySpaceID, collectionID).putCRDT(nimbusID, propagateStateRequest.getCrdt(), this.myself.getHost());
        }
    }

    void uponRemoteOperationRequest(RemoteOperationRequest remoteOperationRequest, short s) {
        logger.debug("Remote operation request on {}", remoteOperationRequest.getNimbusID());
        RemoteOperationMessage remoteOperationMessage = new RemoteOperationMessage(remoteOperationRequest.getNimbusID(), remoteOperationRequest.getOperation());
        ReplicaID replicaFrom = this.storage.getReplicaFrom(remoteOperationMessage.getNimbusID().getKeySpaceID(), remoteOperationMessage.getNimbusID().getCollectionID());
        if (replicaFrom == null) {
            return;
        }
        logger.debug("Remote operation request on {} to {}", remoteOperationRequest.getNimbusID(), replicaFrom.toString());
        connectAndSendMessage(remoteOperationMessage, replicaFrom.getHost());
    }

    void uponRemoteStateRequest(RemoteStateRequest remoteStateRequest, short s) {
        logger.debug("Remote status request on {}", remoteStateRequest.getNimbusID());
        NimbusID nimbusID = remoteStateRequest.getNimbusID();
        RemoteStateMessage remoteStateMessage = new RemoteStateMessage(nimbusID);
        ReplicaID replicaFrom = nimbusID.isKeySpaceID() ? this.storage.getReplicaFrom(nimbusID.getKeySpaceID()) : this.storage.getReplicaFrom(nimbusID.getKeySpaceID(), nimbusID.getCollectionID());
        if (replicaFrom == null) {
            return;
        }
        logger.debug("Remote status request on {} to {}", remoteStateRequest.getNimbusID(), replicaFrom);
        connectAndSendMessage(remoteStateMessage, replicaFrom.getHost());
    }

    void uponReturnCRDTStateReply(ReturnStateReply returnStateReply, short s) {
        NimbusID nimbusID = new NimbusID(returnStateReply.getObjectID());
        String keySpaceID = nimbusID.getKeySpaceID();
        String collectionID = nimbusID.getCollectionID();
        if (returnStateReply.getCRDT() == null) {
            logger.debug("Collection didn't exist in upper layer!");
            this.pendingGetCRDTs.remove(nimbusID);
            return;
        }
        if (returnStateReply.getCRDT().getType().getFlavor() != CRDTsTypes.FLAVOR.DELTA) {
            logger.error("Didn't receive delta-based crdt!");
            return;
        }
        DeltaCausalCRDT crdt = returnStateReply.getCRDT();
        logger.debug("Got FULL STATE reply from {}", nimbusID);
        List<Host> remove = this.pendingGetCRDTs.remove(nimbusID);
        if (remove == null) {
            return;
        }
        DeltaState deltaState = this.storage.getDeltaState(keySpaceID, collectionID);
        for (Host host : remove) {
            Integer ack = deltaState.getACK(host);
            logger.debug("{} {} {}", host.toString(), ack, Integer.valueOf(deltaState.getSeqNumber()));
            if (ack.intValue() < deltaState.getSeqNumber()) {
                logger.warn("Sending full CRDT {} with context {} to {}", nimbusID, crdt.getCausalContext().toString(), host);
                logger.debug("Sending full state message of object {} with FULL STATE to {}", nimbusID, host);
                connectAndSendMessage(new DeltaMessage(nimbusID, crdt, deltaState.getSeqNumber(), this.myself.getHost()), host);
            }
        }
    }

    void uponRemoteOperationDeliverReply(ExecuteOperationReply executeOperationReply, short s) {
        NimbusID nimbusID = executeOperationReply.getNimbusID();
        CommonOperationStatus status = executeOperationReply.getStatus();
        DeltaCausalCRDT crdt = executeOperationReply.getCRDT();
        String message = executeOperationReply.getMessage();
        Host remove = this.pendingRemoteOps.remove(nimbusID);
        if (remove == null) {
            return;
        }
        logger.debug("Sending remote operation reply {} {} {} {}", executeOperationReply.getNimbusID(), status, executeOperationReply.getCRDT(), message);
        connectAndSendMessage(new RemoteOperationResultMessage(nimbusID, status, crdt, message), remove);
    }

    void uponRemoteStateDeliverReply(GetReplicationUnitReply getReplicationUnitReply, short s) {
        logger.debug("Get replication unit reply on {} with collections {}", getReplicationUnitReply.getNimbusID(), getReplicationUnitReply.getCollections().keySet());
        NimbusID nimbusID = getReplicationUnitReply.getNimbusID();
        Map collections = getReplicationUnitReply.getCollections();
        Host remove = this.pendingRemoteOps.remove(nimbusID);
        if (remove == null) {
            return;
        }
        connectAndSendMessage(new RemoteStateResultMessage(nimbusID, collections), remove);
    }

    private void uponPropagateDeltasTimer(PropagateDeltasTimer propagateDeltasTimer, long j) {
        for (Map.Entry<String, ReplicationCoreKeySpace> entry : this.storage.getKeySpaces().entrySet()) {
            String key = entry.getKey();
            ReplicationCoreKeySpace value = entry.getValue();
            ReplicaID replicaID = value.isReplica(this.myself) ? (ReplicaID) NimbusUtils.randomExcept(value.getReplicas(), this.myself) : null;
            logger.debug("Propagating keySpace {} deltas to {}", key, replicaID);
            for (Map.Entry<String, ReplicationCoreCollection> entry2 : value.getCollections().entrySet()) {
                NimbusID nimbusID = new NimbusID(key, entry2.getKey());
                ReplicationCoreCollection value2 = entry2.getValue();
                if (replicaID != null) {
                    handleDeltaPropagation(nimbusID, replicaID.getHost(), value2.getDeltaState());
                }
                if (value2.isReplica(this.myself)) {
                    ArrayList arrayList = new ArrayList(value2.getReplicas().size());
                    arrayList.addAll(value2.getReplicas());
                    arrayList.remove(replicaID);
                    arrayList.remove(this.myself);
                    logger.debug("Propagating collection {} deltas to {}", nimbusID, arrayList);
                    for (int i = 0; i < this.fanout && !arrayList.isEmpty(); i++) {
                        handleDeltaPropagation(nimbusID, ((ReplicaID) NimbusUtils.randomAndRemove(arrayList)).getHost(), value2.getDeltaState());
                    }
                }
            }
        }
    }

    private void uponGarbageCollectorTimer(GarbageCollectorTimer garbageCollectorTimer, long j) {
        this.storage.garbageCollectDeltas(this.minGCAcks);
    }

    private void uponConnectionTimeoutTimer(ConnectionTimeoutTimer connectionTimeoutTimer, long j) {
        Iterator<ConnectedHost> it = this.connected.iterator();
        while (it.hasNext()) {
            ConnectedHost next = it.next();
            if (next.getLastConnectionTime() + this.connectionTimeout > System.currentTimeMillis()) {
                it.remove();
                closeConnection(next.getHost());
            }
        }
    }

    private void uponReplicasModification(ReplicasModificationNotification replicasModificationNotification, short s) {
        String keySpaceID = replicasModificationNotification.getKeySpaceID();
        Set<ReplicaID> replicas = replicasModificationNotification.getReplicas();
        logger.debug("Replica unit modification {} {} {}", keySpaceID, replicasModificationNotification.getCollectionID(), replicas);
        if (replicasModificationNotification.isReplicatingKeySpace()) {
            this.storage.replaceReplicas(keySpaceID, replicas);
        } else {
            this.storage.replaceReplicas(keySpaceID, replicasModificationNotification.getCollectionID(), replicas);
        }
    }

    private void uponReplicaUnitUp(ReplicationUnitUpNotification replicationUnitUpNotification, short s) {
        String keySpaceID = replicationUnitUpNotification.getKeySpaceID();
        Set<ReplicaID> newReplicas = replicationUnitUpNotification.getNewReplicas();
        logger.debug("Replica unit up {} {} {}", keySpaceID, replicationUnitUpNotification.getCollectionID(), newReplicas);
        if (replicationUnitUpNotification.isReplicatingKeyspace()) {
            this.storage.addReplicas(keySpaceID, newReplicas);
        } else {
            this.storage.addReplicas(keySpaceID, replicationUnitUpNotification.getCollectionID(), newReplicas);
        }
    }

    private void uppnReplicaUnitDown(ReplicationUnitDownNotification replicationUnitDownNotification, short s) {
        String keySpaceID = replicationUnitDownNotification.getKeySpaceID();
        logger.debug("Replica unit down {} {} {}", keySpaceID, replicationUnitDownNotification.getCollectionID());
        if (replicationUnitDownNotification.isReplicatingKeyspace()) {
            this.storage.removeKeySpace(keySpaceID);
        } else {
            this.storage.removeCollection(keySpaceID, replicationUnitDownNotification.getCollectionID());
        }
    }

    private void uponStateMerge(StateMergeNotification stateMergeNotification, short s) {
        NimbusID nimbusID = new NimbusID(stateMergeNotification.getObjectID());
        String keySpaceID = nimbusID.getKeySpaceID();
        String collectionID = nimbusID.getCollectionID();
        boolean isMerged = stateMergeNotification.isMerged();
        DeltaCausalCRDT resultingMerge = stateMergeNotification.getResultingMerge();
        DeltaMessage removeFromQueue = removeFromQueue(nimbusID);
        logger.debug("RECEIVED StateMergeNotification on {} with delta {}. Changes: {}", nimbusID, removeFromQueue, Boolean.valueOf(isMerged));
        if (removeFromQueue == null) {
            return;
        }
        int seqNumber = removeFromQueue.getSeqNumber();
        if (isMerged && nimbusID.isFullID()) {
            this.storage.getDeltaState(keySpaceID, collectionID).putCRDT(nimbusID, resultingMerge, removeFromQueue.getOrigin());
        }
        logger.debug("Sending ACKMessage to {} from {}", removeFromQueue.getOrigin(), nimbusID.toString());
        connectAndSendMessage(new AckMessage(nimbusID, seqNumber, this.myself.getHost()), removeFromQueue.getOrigin());
        DeltaMessage pollFromQueue = pollFromQueue(nimbusID);
        if (pollFromQueue != null) {
            triggerNotification(new DeliverStateNotification(nimbusID.toString(), pollFromQueue.getDelta()));
        }
    }

    private void handleDeltaPropagation(NimbusID nimbusID, Host host, DeltaState deltaState) {
        Integer valueOf = Integer.valueOf(deltaState.getMinBufferSeqNum());
        Integer ack = deltaState.getACK(host);
        logger.debug("Trying to propagate deltas from {} to {}", nimbusID, host.toString());
        logger.debug("ACK Host: {} {}", valueOf, ack);
        logger.debug("Seq Number: {}", Integer.valueOf(deltaState.getSeqNumber()));
        if ((valueOf.intValue() == -1 && ack.intValue() == -1) || valueOf.intValue() > ack.intValue()) {
            logger.debug("Requesting full state");
            addToPendingCRDTRequests(nimbusID, host);
            sendRequest(new GetStateRequest(nimbusID.toString()), NimbusConstants.NIMBUS_PROTO_ID.shortValue());
            return;
        }
        if (ack.intValue() < deltaState.getSeqNumber() && !deltaState.getBuffer().isEmpty()) {
            boolean z = false;
            DeltaOORMap deltaOORMap = new DeltaOORMap(true, this.myself);
            for (Map.Entry<Integer, DeltaBuffer> entry : deltaState.getBuffer().entrySet()) {
                Integer key = entry.getKey();
                DeltaBasedCRDT delta = entry.getValue().getDelta();
                NimbusID nimbusID2 = entry.getValue().getNimbusID();
                if (!entry.getValue().getOrigin().equals(host) && key.intValue() >= ack.intValue() && key.intValue() < deltaState.getSeqNumber() && !nimbusID2.isCollectionID()) {
                    DeltaCausalCRDT deltaCausalCRDT = deltaOORMap.get(nimbusID2.getCRDTID());
                    if (deltaCausalCRDT == null) {
                        deltaOORMap.embeddedPut(nimbusID2.getCRDTID(), delta, true);
                    } else {
                        deltaCausalCRDT.mergeDelta(delta);
                    }
                    z = true;
                }
            }
            if (z) {
                logger.debug("Sending delta message of object {} to {}", nimbusID, host);
                connectAndSendMessage(new DeltaMessage(nimbusID, deltaOORMap, deltaState.getSeqNumber(), this.myself.getHost()), host);
            }
        }
    }

    private void uponOutConnectionDown(OutConnectionDown outConnectionDown, int i) {
        Host node = outConnectionDown.getNode();
        logger.trace("Host {} is down, cause: {}", node, outConnectionDown.getCause());
        this.connected.remove(node);
        closeConnection(node);
    }

    private void uponOutConnectionFailed(OutConnectionFailed<?> outConnectionFailed, int i) {
        Host node = outConnectionFailed.getNode();
        logger.trace("Connection to host {} failed, cause: {}", node, outConnectionFailed.getCause());
        this.connected.remove(node);
    }

    private void uponOutConnectionUp(OutConnectionUp outConnectionUp, int i) {
        Host node = outConnectionUp.getNode();
        logger.trace("Host (out) {} is up", node);
        this.connected.add(new ConnectedHost(node));
    }

    private void connectAndSendMessage(ProtoMessage protoMessage, Host host) {
        Host host2 = new Host(host.getAddress(), this.port);
        if (!this.connected.contains(new ConnectedHost(host2))) {
            openConnection(host2);
        }
        logger.debug("Sending message to {} with {}", host2, protoMessage);
        sendMessage(protoMessage, host2);
    }

    private void addToPendingCRDTRequests(NimbusID nimbusID, Host host) {
        List<Host> list = this.pendingGetCRDTs.get(nimbusID);
        if (list == null) {
            list = new LinkedList();
            this.pendingGetCRDTs.put(nimbusID, list);
        }
        list.add(host);
    }

    private void addToQueue(NimbusID nimbusID, DeltaMessage deltaMessage) {
        Queue<DeltaMessage> queue = this.queueMessages.get(nimbusID);
        if (queue == null) {
            queue = new LinkedList();
            this.queueMessages.put(nimbusID, queue);
        }
        queue.add(deltaMessage);
    }

    private DeltaMessage pollFromQueue(NimbusID nimbusID) {
        Queue<DeltaMessage> queue = this.queueMessages.get(nimbusID);
        if (queue == null) {
            return null;
        }
        return queue.poll();
    }

    private DeltaMessage removeFromQueue(NimbusID nimbusID) {
        Queue<DeltaMessage> queue = this.queueMessages.get(nimbusID);
        if (queue == null) {
            return null;
        }
        DeltaMessage remove = queue.remove();
        if (remove == null) {
            this.queueMessages.remove(nimbusID);
        }
        return remove;
    }
}
