/*
 * Decompiled with CFR 0.152.
 */
package pt.unl.di.novasys.babel.nimbus.rc.partialentropypush;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import pt.unl.di.novasys.babel.nimbus.rc.partialentropypush.messages.AckMessage;
import pt.unl.di.novasys.babel.nimbus.rc.partialentropypush.messages.DeltaMessage;
import pt.unl.di.novasys.babel.nimbus.rc.partialentropypush.messages.RemoteOperationMessage;
import pt.unl.di.novasys.babel.nimbus.rc.partialentropypush.messages.RemoteOperationResultMessage;
import pt.unl.di.novasys.babel.nimbus.rc.partialentropypush.messages.RemoteStateMessage;
import pt.unl.di.novasys.babel.nimbus.rc.partialentropypush.messages.RemoteStateResultMessage;
import pt.unl.di.novasys.babel.nimbus.rc.partialentropypush.structures.ReplicationCoreCollection;
import pt.unl.di.novasys.babel.nimbus.rc.partialentropypush.structures.ReplicationCoreKeySpace;
import pt.unl.di.novasys.babel.nimbus.rc.partialentropypush.structures.ReplicationCoreManager;
import pt.unl.di.novasys.babel.nimbus.rc.partialentropypush.timers.ConnectionTimeoutTimer;
import pt.unl.di.novasys.babel.nimbus.rc.partialentropypush.timers.GarbageCollectorTimer;
import pt.unl.di.novasys.babel.nimbus.rc.partialentropypush.timers.PropagateDeltasTimer;
import pt.unl.di.novasys.babel.nimbus.rc.partialentropypush.utils.ConnectedHost;
import pt.unl.di.novasys.babel.nimbus.rc.partialentropypush.utils.DeltaBuffer;
import pt.unl.di.novasys.babel.nimbus.rc.partialentropypush.utils.DeltaState;
import pt.unl.di.novasys.babel.rc.commons.notifications.state.DeliverStateNotification;
import pt.unl.di.novasys.babel.rc.commons.notifications.state.StateMergeNotification;
import pt.unl.di.novasys.babel.rc.commons.replies.state.ReturnStateReply;
import pt.unl.di.novasys.babel.rc.commons.requests.state.GetStateRequest;
import pt.unl.di.novasys.babel.rc.commons.requests.state.PropagateStateRequest;
import pt.unl.fct.di.novasys.babel.crdts.delta.causal.generic.DeltaCausalCRDT;
import pt.unl.fct.di.novasys.babel.crdts.delta.causal.implementations.DeltaOORMap;
import pt.unl.fct.di.novasys.babel.crdts.delta.generic.DeltaBasedCRDT;
import pt.unl.fct.di.novasys.babel.crdts.generic.GenericCRDT;
import pt.unl.fct.di.novasys.babel.crdts.utils.CRDTsTypes;
import pt.unl.fct.di.novasys.babel.crdts.utils.ReplicaID;
import pt.unl.fct.di.novasys.babel.exceptions.HandlerRegistrationException;
import pt.unl.fct.di.novasys.babel.generic.ProtoMessage;
import pt.unl.fct.di.novasys.babel.generic.ProtoNotification;
import pt.unl.fct.di.novasys.babel.generic.ProtoReply;
import pt.unl.fct.di.novasys.babel.generic.ProtoRequest;
import pt.unl.fct.di.novasys.babel.protocols.membership.Peer;
import pt.unl.fct.di.novasys.babel.protocols.storage.operations.utils.CommonOperationStatus;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionDown;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionFailed;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionUp;
import pt.unl.fct.di.novasys.network.data.Host;
import pt.unl.fct.di.novasys.nimbus.utils.common.NimbusConstants;
import pt.unl.fct.di.novasys.nimbus.utils.common.NimbusID;
import pt.unl.fct.di.novasys.nimbus.utils.common.NimbusUtils;
import pt.unl.fct.di.novasys.nimbus.utils.core.replies.ExecuteOperationReply;
import pt.unl.fct.di.novasys.nimbus.utils.core.replies.GetReplicationUnitReply;
import pt.unl.fct.di.novasys.nimbus.utils.core.requests.RemoteOperationRequest;
import pt.unl.fct.di.novasys.nimbus.utils.core.requests.RemoteStateRequest;
import pt.unl.fct.di.novasys.nimbus.utils.partialoverlays.notifications.ReplicasModificationNotification;
import pt.unl.fct.di.novasys.nimbus.utils.partialoverlays.notifications.ReplicationUnitDownNotification;
import pt.unl.fct.di.novasys.nimbus.utils.partialoverlays.notifications.ReplicationUnitUpNotification;
import pt.unl.fct.di.novasys.nimbus.utils.replicationcore.NimbusReplicationCore;
import pt.unl.fct.di.novasys.nimbus.utils.replicationcore.replies.RemoteOperationReply;
import pt.unl.fct.di.novasys.nimbus.utils.replicationcore.replies.RemoteStateReply;
import pt.unl.fct.di.novasys.nimbus.utils.replicationcore.requests.ExecuteOperationRequest;
import pt.unl.fct.di.novasys.nimbus.utils.replicationcore.requests.GetReplicationUnitRequest;

/**
 * Replication core that periodically pushes delta-CRDT state to other replicas.
 *
 * <p>For every key space / collection known to the {@link ReplicationCoreManager} the protocol
 * pushes the buffered deltas (or, when the buffer cannot serve a peer, the full state obtained from
 * the upper layer) to randomly chosen replicas each time the {@link PropagateDeltasTimer} fires.
 * Receivers hand the delta to the upper layer and answer with an {@link AckMessage} once the
 * corresponding {@link StateMergeNotification} arrives. A {@link GarbageCollectorTimer} periodically
 * asks the storage to garbage collect deltas. The class also relays remote operation and remote
 * state requests between the local Nimbus protocol and other replicas.
 *
 * <p>NOTE(review): all bookkeeping uses plain, non-concurrent collections; presumably every handler
 * runs on the single protocol thread - confirm against the Babel runtime.
 */
public class PartialAntiEntropyPush
extends NimbusReplicationCore {
    public static final String PROTOCOL_NAME = "PartialAntiEntropyPush";
    private static final Logger logger = LogManager.getLogger(PartialAntiEntropyPush.class);

    /** Per key space / collection replica sets and delta states. */
    private final ReplicationCoreManager storage;
    /** Hosts waiting for the full state of an object, filled while a GetStateRequest is outstanding. */
    private final Map<NimbusID, List<Host>> pendingGetCRDTs;
    /** Received delta messages that have not yet been matched with a StateMergeNotification. */
    private final Map<NimbusID, Queue<DeltaMessage>> queueMessages;
    /**
     * Remote host that asked for an operation / state on a given object.
     * NOTE(review): keyed by NimbusID only, so a second request on the same object that arrives
     * before the first reply overwrites the first requester.
     */
    private final Map<NimbusID, Host> pendingRemoteOps;
    /** Hosts for which an OutConnectionUp event was received and no down / failed event since. */
    private final Set<ConnectedHost> connected = new HashSet<>();
    private final ReplicaID myself;
    private final long gcTimeout;
    private final long propagateTimeout;
    private final int minGCAcks;
    private final int fanout;
    private final int connectionTimeout;
    private final int port;

    /**
     * Reads the configuration, creates the TCP channel and registers every handler.
     *
     * <p>Properties read (all prefixed with {@code NimbusReplicationCore.}): {@code connectionTimeout}
     * (default 30000), {@code minGCAcks} (default 1), {@code fanout} (default 3), {@code gcTimeout}
     * (default 8000), {@code propagateTimeout} (default 4000), {@code channel.address} (defaults to
     * the address of {@code myself}) and {@code channel.port} (no default).
     *
     * @param myself     the local peer
     * @param properties protocol configuration
     * @throws NumberFormatException if a numeric property is malformed or {@code channel.port} is missing
     */
    public PartialAntiEntropyPush(Peer myself, Properties properties) throws IOException, HandlerRegistrationException {
        super(PROTOCOL_NAME);
        this.pendingGetCRDTs = new HashMap<>();
        this.queueMessages = new HashMap<>();
        this.pendingRemoteOps = new HashMap<>();
        this.myself = new ReplicaID(myself);
        this.storage = new ReplicationCoreManager(this.myself);
        this.connectionTimeout = Integer.parseInt(properties.getProperty("NimbusReplicationCore.connectionTimeout", "30000"));
        this.minGCAcks = Integer.parseInt(properties.getProperty("NimbusReplicationCore.minGCAcks", "1"));
        this.fanout = Integer.parseInt(properties.getProperty("NimbusReplicationCore.fanout", "3"));
        this.gcTimeout = Long.parseLong(properties.getProperty("NimbusReplicationCore.gcTimeout", "8000"));
        this.propagateTimeout = Long.parseLong(properties.getProperty("NimbusReplicationCore.propagateTimeout", "4000"));

        String address = properties.getProperty("NimbusReplicationCore.channel.address");
        if (address == null) {
            address = myself.getAddress().getHostAddress();
        }
        // No default port: Integer.parseInt rejects a missing (null) value with NumberFormatException.
        String portProperty = properties.getProperty("NimbusReplicationCore.channel.port");
        this.port = Integer.parseInt(portProperty);

        Properties channelProps = new Properties();
        channelProps.setProperty("address", address);
        channelProps.setProperty("port", portProperty);
        int channelId = this.createChannel("TCPChannel", channelProps);

        // Requests coming from other local protocols.
        this.registerRequestHandler((short)521, this::uponPropagateCRDTRequest);
        this.registerRequestHandler((short)620, this::uponRemoteOperationRequest);
        this.registerRequestHandler((short)621, this::uponRemoteStateRequest);

        // Replies coming from other local protocols.
        this.registerReplyHandler((short)621, this::uponReturnCRDTStateReply);
        this.registerReplyHandler((short)625, this::uponRemoteOperationDeliverReply);
        this.registerReplyHandler((short)626, this::uponRemoteStateDeliverReply);

        // Notifications.
        this.subscribeNotification((short)723, this::uponStateMerge);
        this.subscribeNotification((short)652, this::uponReplicasModification);
        this.subscribeNotification((short)650, this::uponReplicaUnitUp);
        this.subscribeNotification((short)656, this::uponReplicaUnitDown);

        // Timers.
        this.registerTimerHandler((short)650, this::uponPropagateDeltasTimer);
        this.registerTimerHandler((short)651, this::uponGarbageCollectorTimer);
        this.registerTimerHandler((short)653, this::uponConnectionTimeoutTimer);

        // Wire format of the messages exchanged between replicas.
        this.registerMessageSerializer(channelId, (short)6400, AckMessage.serializer);
        this.registerMessageSerializer(channelId, (short)6401, DeltaMessage.serializer);
        this.registerMessageSerializer(channelId, (short)6402, RemoteOperationMessage.serializer);
        this.registerMessageSerializer(channelId, (short)6403, RemoteOperationResultMessage.serializer);
        this.registerMessageSerializer(channelId, (short)6404, RemoteStateMessage.serializer);
        this.registerMessageSerializer(channelId, (short)6405, RemoteStateResultMessage.serializer);

        // Handlers for messages received from other replicas.
        this.registerMessageHandler(channelId, (short)6400, this::uponAckMessage);
        this.registerMessageHandler(channelId, (short)6401, this::uponDeltaMessage);
        this.registerMessageHandler(channelId, (short)6402, this::uponRemoteOperationMessage);
        this.registerMessageHandler(channelId, (short)6403, this::uponRemoteOperationResultMessage);
        this.registerMessageHandler(channelId, (short)6404, this::uponRemoteStateMessage);
        this.registerMessageHandler(channelId, (short)6405, this::uponRemoteStateResultMessage);

        // Channel events.
        this.registerChannelEventHandler(channelId, (short)3, this::uponOutConnectionDown);
        this.registerChannelEventHandler(channelId, (short)4, this::uponOutConnectionFailed);
        this.registerChannelEventHandler(channelId, (short)5, this::uponOutConnectionUp);
    }

    /**
     * Starts the periodic garbage-collection and delta-propagation timers.
     *
     * <p>NOTE(review): no {@link ConnectionTimeoutTimer} is scheduled here, so
     * {@code uponConnectionTimeoutTimer} only runs if something else sets that timer up - confirm
     * whether that is intended.
     *
     * @param props unused
     */
    public void init(Properties props) throws HandlerRegistrationException, IOException {
        this.setupPeriodicTimer(new GarbageCollectorTimer(), this.gcTimeout, this.gcTimeout);
        this.setupPeriodicTimer(new PropagateDeltasTimer(), this.propagateTimeout, this.propagateTimeout);
    }

    /* --------------------------------- Message handlers --------------------------------- */

    /** Records, in the delta state of the acknowledged collection, the sequence number acked by the origin. */
    private void uponAckMessage(AckMessage msg, Host sender, short protoID, int cID) {
        int ackSeqNumber = msg.getSeqNumber();
        NimbusID nimbusID = msg.getNimbusID();
        Host origin = msg.getOrigin();
        logger.debug("ACK: {} {} {}", origin.toString(), ackSeqNumber, nimbusID.toString());
        DeltaState state = this.storage.getDeltaState(nimbusID.getKeySpaceID(), nimbusID.getCollectionID());
        state.updateAcks(origin, ackSeqNumber);
    }

    /**
     * Queues the received delta (so it can be acknowledged once merged) and hands it to the upper
     * layer through a {@link DeliverStateNotification}.
     */
    private void uponDeltaMessage(DeltaMessage msg, Host sender, short protoID, int cID) {
        logger.debug("Remote delta message message on {}", msg.getNimbusID());
        NimbusID nimbusID = msg.getNimbusID();
        DeltaCausalCRDT delta = msg.getDelta();
        this.addToQueue(nimbusID, msg);
        logger.debug("Sending state notification to storage layer with {} {} from {}", nimbusID, msg.getSeqNumber(), msg.getOrigin().toString());
        this.triggerNotification(new DeliverStateNotification(nimbusID.toString(), (GenericCRDT)delta));
    }

    /** Remembers who asked and forwards the remote operation to the local Nimbus protocol. */
    private void uponRemoteOperationMessage(RemoteOperationMessage msg, Host sender, short protoID, int cID) {
        ExecuteOperationRequest request = new ExecuteOperationRequest(msg.getNimbusID(), msg.getOperation());
        logger.debug("Remote operation message on {} and {}", request.getNimbusID(), msg.getOperation());
        this.pendingRemoteOps.put(msg.getNimbusID(), sender);
        this.sendRequest(request, NimbusConstants.NIMBUS_PROTO_ID);
    }

    /** Delivers the result of an operation executed on another replica to the local Nimbus protocol. */
    private void uponRemoteOperationResultMessage(RemoteOperationResultMessage msg, Host sender, short protoID, int cID) {
        RemoteOperationReply reply = new RemoteOperationReply(msg.getNimbusID(), msg.getStatus(), msg.getCRDT(), msg.getMessage());
        this.sendReply(reply, NimbusConstants.NIMBUS_PROTO_ID);
    }

    /** Remembers who asked and requests the replication unit from the local Nimbus protocol. */
    private void uponRemoteStateMessage(RemoteStateMessage msg, Host sender, short protoID, int cID) {
        logger.debug("Remote state message on {}", msg.getNimbusID());
        GetReplicationUnitRequest request = new GetReplicationUnitRequest(msg.getNimbusID());
        this.pendingRemoteOps.put(msg.getNimbusID(), sender);
        this.sendRequest(request, NimbusConstants.NIMBUS_PROTO_ID);
    }

    /** Delivers the state returned by another replica to the local Nimbus protocol. */
    private void uponRemoteStateResultMessage(RemoteStateResultMessage msg, Host sender, short protoID, int cID) {
        RemoteStateReply reply = new RemoteStateReply(msg.getNimbusID(), msg.getCollections());
        this.sendReply(reply, NimbusConstants.NIMBUS_PROTO_ID);
    }

    /* --------------------------------- Request handlers --------------------------------- */

    /** Stores a locally produced delta in the delta state of its collection; non-delta CRDTs are rejected. */
    void uponPropagateCRDTRequest(PropagateStateRequest request, short protoID) {
        NimbusID nimbusID = new NimbusID(request.getObjectID());
        if (request.getCrdt().getType().getFlavor() != CRDTsTypes.FLAVOR.DELTA) {
            logger.error("CRDT passed to replication core is not delta-based!");
            return;
        }
        DeltaCausalCRDT crdt = (DeltaCausalCRDT)request.getCrdt();
        DeltaState crdtState = this.storage.getDeltaState(nimbusID.getKeySpaceID(), nimbusID.getCollectionID());
        crdtState.putCRDT(nimbusID, crdt, (Host)this.myself.getHost());
    }

    /** Forwards an operation to a replica of the target collection; silently drops it when none is known. */
    void uponRemoteOperationRequest(RemoteOperationRequest request, short protoID) {
        logger.debug("Remote operation request on {}", request.getNimbusID());
        RemoteOperationMessage msg = new RemoteOperationMessage(request.getNimbusID(), request.getOperation());
        ReplicaID replica = this.storage.getReplicaFrom(msg.getNimbusID().getKeySpaceID(), msg.getNimbusID().getCollectionID());
        if (replica == null) {
            return;
        }
        logger.debug("Remote operation request on {} to {}", request.getNimbusID(), replica.toString());
        this.connectAndSendMessage(msg, (Host)replica.getHost());
    }

    /** Forwards a state request to a replica of the key space or collection; silently drops it when none is known. */
    void uponRemoteStateRequest(RemoteStateRequest request, short protoID) {
        logger.debug("Remote status request on {}", request.getNimbusID());
        NimbusID nimbusID = request.getNimbusID();
        RemoteStateMessage msg = new RemoteStateMessage(nimbusID);
        ReplicaID replica = nimbusID.isKeySpaceID()
                ? this.storage.getReplicaFrom(nimbusID.getKeySpaceID())
                : this.storage.getReplicaFrom(nimbusID.getKeySpaceID(), nimbusID.getCollectionID());
        if (replica == null) {
            return;
        }
        logger.debug("Remote status request on {} to {}", request.getNimbusID(), replica);
        this.connectAndSendMessage(msg, (Host)replica.getHost());
    }

    /* ---------------------------------- Reply handlers ---------------------------------- */

    /**
     * Sends the full state returned by the upper layer to every host that was waiting for it and
     * whose last acknowledged sequence number is behind the current one.
     */
    void uponReturnCRDTStateReply(ReturnStateReply reply, short protoID) {
        NimbusID nimbusID = new NimbusID(reply.getObjectID());
        // This reply settles the outstanding request whatever its outcome, so the waiting list is
        // always consumed; otherwise hosts queued for an unusable reply would pile up forever.
        List<Host> waitingHosts = this.pendingGetCRDTs.remove(nimbusID);
        if (reply.getCRDT() == null) {
            logger.debug("Collection didn't exist in upper layer!");
            return;
        }
        if (reply.getCRDT().getType().getFlavor() != CRDTsTypes.FLAVOR.DELTA) {
            logger.error("Didn't receive delta-based crdt!");
            return;
        }
        DeltaCausalCRDT crdt = (DeltaCausalCRDT)reply.getCRDT();
        logger.debug("Got FULL STATE reply from {}", nimbusID);
        if (waitingHosts == null) {
            return;
        }
        DeltaState state = this.storage.getDeltaState(nimbusID.getKeySpaceID(), nimbusID.getCollectionID());
        for (Host host : waitingHosts) {
            Integer lastAckHost = state.getACK(host);
            logger.debug("{} {} {}", host.toString(), lastAckHost, state.getSeqNumber());
            if (lastAckHost >= state.getSeqNumber()) {
                // Host already acknowledged everything we have.
                continue;
            }
            logger.warn("Sending full CRDT {} with context {} to {}", nimbusID, crdt.getCausalContext().toString(), host);
            logger.debug("Sending full state message of object {} with FULL STATE to {}", nimbusID, host);
            DeltaMessage msg = new DeltaMessage(nimbusID, crdt, state.getSeqNumber(), (Host)this.myself.getHost());
            this.connectAndSendMessage(msg, host);
        }
    }

    /** Sends the outcome of a locally executed remote operation back to the host that requested it. */
    void uponRemoteOperationDeliverReply(ExecuteOperationReply reply, short protoID) {
        NimbusID nimbusID = reply.getNimbusID();
        CommonOperationStatus status = reply.getStatus();
        DeltaCausalCRDT crdt = reply.getCRDT();
        String message = reply.getMessage();
        Host requester = this.pendingRemoteOps.remove(nimbusID);
        if (requester == null) {
            return;
        }
        logger.debug("Sending remote operation reply {} {} {} {}", nimbusID, status, crdt, message);
        RemoteOperationResultMessage msg = new RemoteOperationResultMessage(nimbusID, status, crdt, message);
        this.connectAndSendMessage(msg, requester);
    }

    /** Sends the locally fetched replication unit back to the host that requested it. */
    void uponRemoteStateDeliverReply(GetReplicationUnitReply reply, short protoID) {
        logger.debug("Get replication unit reply on {} with collections {}", reply.getNimbusID(), reply.getCollections().keySet());
        NimbusID nimbusID = reply.getNimbusID();
        // Raw type kept on purpose: the map's type arguments are not visible in this file.
        Map collections = reply.getCollections();
        Host requester = this.pendingRemoteOps.remove(nimbusID);
        if (requester == null) {
            return;
        }
        RemoteStateResultMessage msg = new RemoteStateResultMessage(nimbusID, collections);
        this.connectAndSendMessage(msg, requester);
    }

    /* ---------------------------------- Timer handlers ---------------------------------- */

    /**
     * Pushes deltas of every collection: to one random other replica of the key space (when this node
     * replicates the key space) and to up to {@code fanout} random other replicas of the collection
     * (when this node replicates the collection).
     */
    private void uponPropagateDeltasTimer(PropagateDeltasTimer timer, long timerID) {
        Map<String, ReplicationCoreKeySpace> keySpaces = this.storage.getKeySpaces();
        for (Map.Entry<String, ReplicationCoreKeySpace> keySpacePair : keySpaces.entrySet()) {
            String keySpaceID = keySpacePair.getKey();
            ReplicationCoreKeySpace keySpace = keySpacePair.getValue();
            ReplicaID keySpaceDestination = keySpace.isReplica(this.myself)
                    ? (ReplicaID)NimbusUtils.randomExcept(keySpace.getReplicas(), (Object)this.myself)
                    : null;
            logger.debug("Propagating keySpace {} deltas to {}", keySpaceID, keySpaceDestination);
            for (Map.Entry<String, ReplicationCoreCollection> collectionPair : keySpace.getCollections().entrySet()) {
                NimbusID nimbusCollectionID = new NimbusID(keySpaceID, collectionPair.getKey());
                ReplicationCoreCollection collection = collectionPair.getValue();
                if (keySpaceDestination != null) {
                    this.handleDeltaPropagation(nimbusCollectionID, (Host)keySpaceDestination.getHost(), collection.getDeltaState());
                }
                if (!collection.isReplica(this.myself)) {
                    continue;
                }
                // Candidates: collection replicas minus ourselves and the key-space target already served above.
                ArrayList<ReplicaID> validTargets = new ArrayList<ReplicaID>(collection.getReplicas());
                validTargets.remove(keySpaceDestination);
                validTargets.remove(this.myself);
                logger.debug("Propagating collection {} deltas to {}", nimbusCollectionID, validTargets);
                for (int i = 0; i < this.fanout && !validTargets.isEmpty(); ++i) {
                    ReplicaID collectionDestination = (ReplicaID)NimbusUtils.randomAndRemove(validTargets);
                    this.handleDeltaPropagation(nimbusCollectionID, (Host)collectionDestination.getHost(), collection.getDeltaState());
                }
            }
        }
    }

    /** Asks the storage to garbage collect deltas, passing the configured {@code minGCAcks}. */
    private void uponGarbageCollectorTimer(GarbageCollectorTimer timer, long timerID) {
        this.storage.garbageCollectDeltas(this.minGCAcks);
    }

    /**
     * Closes and forgets every tracked connection whose last connection time is at least
     * {@code connectionTimeout} milliseconds in the past.
     *
     * <p>The previous implementation had the test inverted: it skipped the expired entries and closed
     * the ones that were still within the timeout.
     */
    private void uponConnectionTimeoutTimer(ConnectionTimeoutTimer timer, long timerID) {
        long now = System.currentTimeMillis();
        Iterator<ConnectedHost> it = this.connected.iterator();
        while (it.hasNext()) {
            ConnectedHost candidate = it.next();
            boolean expired = candidate.getLastConnectionTime() + (long)this.connectionTimeout <= now;
            if (!expired) {
                continue;
            }
            it.remove();
            this.closeConnection(candidate.getHost());
        }
    }

    /* ------------------------------- Notification handlers ------------------------------ */

    /** Replaces the replica set of a key space or of a single collection. */
    private void uponReplicasModification(ReplicasModificationNotification notification, short protoID) {
        String keySpaceID = notification.getKeySpaceID();
        // Raw type kept on purpose: the set's element type is not visible in this file.
        Set replicas = notification.getReplicas();
        logger.debug("Replica unit modification {} {} {}", keySpaceID, notification.getCollectionID(), replicas);
        if (notification.isReplicatingKeySpace()) {
            this.storage.replaceReplicas(keySpaceID, replicas);
        } else {
            this.storage.replaceReplicas(keySpaceID, notification.getCollectionID(), replicas);
        }
    }

    /** Adds new replicas to a key space or to a single collection. */
    private void uponReplicaUnitUp(ReplicationUnitUpNotification notification, short protoID) {
        String keySpaceID = notification.getKeySpaceID();
        // Raw type kept on purpose: the set's element type is not visible in this file.
        Set newReplicas = notification.getNewReplicas();
        logger.debug("Replica unit up {} {} {}", keySpaceID, notification.getCollectionID(), newReplicas);
        if (notification.isReplicatingKeyspace()) {
            this.storage.addReplicas(keySpaceID, newReplicas);
        } else {
            this.storage.addReplicas(keySpaceID, notification.getCollectionID(), newReplicas);
        }
    }

    /** Drops a whole key space or a single collection from the storage. */
    private void uponReplicaUnitDown(ReplicationUnitDownNotification notification, short protoID) {
        String keySpaceID = notification.getKeySpaceID();
        // Two placeholders for two arguments (the original format string had a dangling third "{}").
        logger.debug("Replica unit down {} {}", keySpaceID, notification.getCollectionID());
        if (notification.isReplicatingKeyspace()) {
            this.storage.removeKeySpace(keySpaceID);
        } else {
            this.storage.removeCollection(keySpaceID, notification.getCollectionID());
        }
    }

    /**
     * Matches a merge performed by the upper layer with the oldest queued delta message of that
     * object, records the merged delta when it changed the state, and acknowledges the message to its
     * origin. Notifications with no queued message are ignored.
     *
     * <p>NOTE(review): every delta is already delivered on arrival by {@code uponDeltaMessage}, yet
     * the next queued message is dequeued and delivered again here. That message is then no longer in
     * the queue when its own merge notification arrives, so it is never acknowledged. Behaviour kept
     * as found - confirm the intended queueing discipline.
     */
    private void uponStateMerge(StateMergeNotification merge, short protoID) {
        NimbusID nimbusID = new NimbusID(merge.getObjectID());
        boolean changed = merge.isMerged();
        DeltaMessage mergedMsg = this.removeFromQueue(nimbusID);
        logger.debug("RECEIVED StateMergeNotification on {} with delta {}. Changes: {}", nimbusID, mergedMsg, changed);
        if (mergedMsg == null) {
            return;
        }
        int seqNumber = mergedMsg.getSeqNumber();
        if (changed && nimbusID.isFullID()) {
            // Cast only where the delta is used, so unrelated notifications never hit it.
            DeltaCausalCRDT delta = (DeltaCausalCRDT)merge.getResultingMerge();
            DeltaState crdtState = this.storage.getDeltaState(nimbusID.getKeySpaceID(), nimbusID.getCollectionID());
            crdtState.putCRDT(nimbusID, delta, mergedMsg.getOrigin());
        }
        logger.debug("Sending ACKMessage to {} from {}", mergedMsg.getOrigin(), nimbusID.toString());
        AckMessage ack = new AckMessage(nimbusID, seqNumber, (Host)this.myself.getHost());
        this.connectAndSendMessage(ack, mergedMsg.getOrigin());
        DeltaMessage toMergeMsg = this.pollFromQueue(nimbusID);
        if (toMergeMsg != null) {
            this.triggerNotification(new DeliverStateNotification(nimbusID.toString(), (GenericCRDT)toMergeMsg.getDelta()));
        }
    }

    /* ------------------------------------ Propagation ----------------------------------- */

    /**
     * Pushes to {@code host} what it is missing from {@code state}.
     *
     * <p>If the delta buffer cannot serve the host (nothing buffered and nothing acknowledged, or the
     * oldest buffered delta is newer than the host's last ack) the full state is requested from the
     * upper layer and sent when the reply arrives. Otherwise the buffered deltas that did not
     * originate at {@code host} are merged into a single map and sent as one {@link DeltaMessage}.
     */
    private void handleDeltaPropagation(NimbusID nimbusID, Host host, DeltaState state) {
        Integer minSeqNumber = state.getMinBufferSeqNum();
        Integer lastAckHost = state.getACK(host);
        logger.debug("Trying to propagate deltas from {} to {}", nimbusID, host.toString());
        logger.debug("ACK Host: {} {}", minSeqNumber, lastAckHost);
        logger.debug("Seq Number: {}", state.getSeqNumber());
        if (minSeqNumber == -1 && lastAckHost == -1 || minSeqNumber > lastAckHost) {
            logger.debug("Requesting full state");
            this.addToPendingCRDTRequests(nimbusID, host);
            this.sendRequest(new GetStateRequest(nimbusID.toString()), NimbusConstants.NIMBUS_PROTO_ID);
            return;
        }
        if (lastAckHost >= state.getSeqNumber()) {
            // Host is up to date.
            return;
        }
        if (state.getBuffer().isEmpty()) {
            return;
        }
        boolean changed = false;
        DeltaOORMap map = new DeltaOORMap(true, this.myself);
        for (Map.Entry<Integer, DeltaBuffer> pairDelta : state.getBuffer().entrySet()) {
            Integer seqNumber = pairDelta.getKey();
            DeltaBuffer buffered = pairDelta.getValue();
            NimbusID crdtNimbusID = buffered.getNimbusID();
            // Skip deltas the host produced itself, deltas outside the (lastAck, current) window,
            // and entries that address a whole collection rather than a single CRDT.
            if (buffered.getOrigin().equals(host) || seqNumber < lastAckHost || seqNumber >= state.getSeqNumber() || crdtNimbusID.isCollectionID()) {
                continue;
            }
            DeltaCausalCRDT deltaToAdd = buffered.getDelta();
            DeltaCausalCRDT prevCRDT = map.get(crdtNimbusID.getCRDTID());
            if (prevCRDT == null) {
                map.embeddedPut(crdtNimbusID.getCRDTID(), deltaToAdd, true);
            } else {
                prevCRDT.mergeDelta((DeltaBasedCRDT)deltaToAdd);
            }
            changed = true;
        }
        if (changed) {
            logger.debug("Sending delta message of object {} to {}", nimbusID, host);
            DeltaMessage msg = new DeltaMessage(nimbusID, (DeltaCausalCRDT)map, state.getSeqNumber(), (Host)this.myself.getHost());
            this.connectAndSendMessage(msg, host);
        }
    }

    /* ---------------------------------- Channel events ---------------------------------- */

    /** Forgets the host and closes the connection after an outgoing connection went down. */
    private void uponOutConnectionDown(OutConnectionDown event, int channelId) {
        Host h = event.getNode();
        logger.trace("Host {} is down, cause: {}", h, event.getCause());
        // The set stores ConnectedHost wrappers; look the entry up the same way connectAndSendMessage does.
        this.connected.remove(new ConnectedHost(h));
        this.closeConnection(h);
    }

    /** Forgets the host after an outgoing connection attempt failed. */
    private void uponOutConnectionFailed(OutConnectionFailed<?> event, int channelId) {
        Host h = event.getNode();
        logger.trace("Connection to host {} failed, cause: {}", h, event.getCause());
        // The set stores ConnectedHost wrappers; look the entry up the same way connectAndSendMessage does.
        this.connected.remove(new ConnectedHost(h));
    }

    /** Starts tracking a host once the outgoing connection to it is established. */
    private void uponOutConnectionUp(OutConnectionUp event, int channelId) {
        Host h = event.getNode();
        logger.trace("Host (out) {} is up", h);
        this.connected.add(new ConnectedHost(h));
    }

    /* -------------------------------------- Helpers ------------------------------------- */

    /**
     * Sends {@code msg} to the address of {@code destination} on this protocol's own port, opening a
     * connection first when the host is not currently tracked as connected.
     */
    private void connectAndSendMessage(ProtoMessage msg, Host destination) {
        Host target = new Host(destination.getAddress(), this.port);
        if (!this.connected.contains(new ConnectedHost(target))) {
            this.openConnection(target);
        }
        logger.debug("Sending message to {} with {}", target, msg);
        this.sendMessage(msg, target);
    }

    /**
     * Registers {@code peer} as waiting for the full state of {@code nimbusID}. A peer already waiting
     * is not added twice, so one reply never sends it the same full state repeatedly.
     */
    private void addToPendingCRDTRequests(NimbusID nimbusID, Host peer) {
        List<Host> pendingPeers = this.pendingGetCRDTs.computeIfAbsent(nimbusID, id -> new LinkedList<>());
        if (!pendingPeers.contains(peer)) {
            pendingPeers.add(peer);
        }
    }

    /** Appends {@code msg} to the queue of {@code nimbusID}, creating the queue on first use. */
    private void addToQueue(NimbusID nimbusID, DeltaMessage msg) {
        this.queueMessages.computeIfAbsent(nimbusID, id -> new LinkedList<>()).add(msg);
    }

    /** Removes and returns the head of the queue of {@code nimbusID}, or {@code null} when there is none. */
    private DeltaMessage pollFromQueue(NimbusID nimbusID) {
        Queue<DeltaMessage> queue = this.queueMessages.get(nimbusID);
        return queue == null ? null : queue.poll();
    }

    /**
     * Removes and returns the head of the queue of {@code nimbusID}; when the queue turns out to be
     * empty the queue itself is dropped and {@code null} is returned.
     *
     * <p>Uses {@link Queue#poll()} rather than {@link Queue#remove()}: the latter throws
     * {@code NoSuchElementException} on an empty queue, which made the cleanup branch unreachable and
     * crashed the merge handler.
     */
    private DeltaMessage removeFromQueue(NimbusID nimbusID) {
        Queue<DeltaMessage> queue = this.queueMessages.get(nimbusID);
        if (queue == null) {
            return null;
        }
        DeltaMessage msg = queue.poll();
        if (msg == null) {
            this.queueMessages.remove(nimbusID);
        }
        return msg;
    }
}

