package pt.unl.di.novasys.babel.nimbus.rc.mobilityentropypush;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import pt.unl.di.novasys.babel.nimbus.rc.mobilityentropypush.messages.StoreForwardMessage;
import pt.unl.di.novasys.babel.nimbus.rc.mobilityentropypush.messages.TrajectoryMessage;
import pt.unl.di.novasys.babel.nimbus.rc.mobilityentropypush.temp.Trajectory;
import pt.unl.di.novasys.babel.nimbus.rc.mobilityentropypush.temp.TrajectoryNotification;
import pt.unl.di.novasys.babel.nimbus.rc.mobilityentropypush.timers.GarbageCollectorTimer;
import pt.unl.di.novasys.babel.nimbus.rc.mobilityentropypush.timers.PropagateDeltasTimer;
import pt.unl.fct.di.novasys.babel.crdts.delta.causal.generic.DeltaCausalCRDT;
import pt.unl.fct.di.novasys.babel.crdts.utils.CRDTsTypes;
import pt.unl.fct.di.novasys.babel.crdts.utils.ReplicaID;
import pt.unl.fct.di.novasys.babel.exceptions.HandlerRegistrationException;
import pt.unl.fct.di.novasys.babel.generic.ProtoMessage;
import pt.unl.fct.di.novasys.babel.protocols.membership.Peer;
import pt.unl.fct.di.novasys.babel.protocols.membership.notifications.NeighborDown;
import pt.unl.fct.di.novasys.babel.protocols.membership.notifications.NeighborUp;
import pt.unl.fct.di.novasys.babel.protocols.storage.operations.utils.CommonOperationStatus;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionDown;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionFailed;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionUp;
import pt.unl.fct.di.novasys.network.data.Host;
import pt.unl.fct.di.novasys.nimbus.utils.common.NimbusConstants;
import pt.unl.fct.di.novasys.nimbus.utils.common.NimbusID;
import pt.unl.fct.di.novasys.nimbus.utils.common.NimbusUtils;
import pt.unl.fct.di.novasys.nimbus.utils.core.replies.ExecuteOperationReply;
import pt.unl.fct.di.novasys.nimbus.utils.core.replies.GetReplicationUnitReply;
import pt.unl.fct.di.novasys.nimbus.utils.core.requests.RemoteOperationRequest;
import pt.unl.fct.di.novasys.nimbus.utils.core.requests.RemoteStateRequest;
import pt.unl.fct.di.novasys.nimbus.utils.partialoverlays.notifications.ReplicasModificationNotification;
import pt.unl.fct.di.novasys.nimbus.utils.partialoverlays.notifications.ReplicationUnitDownNotification;
import pt.unl.fct.di.novasys.nimbus.utils.partialoverlays.notifications.ReplicationUnitUpNotification;
import pt.unl.fct.di.novasys.nimbus.utils.replicationcore.NimbusReplicationCore;
import pt.unl.fct.di.novasys.nimbus.utils.replicationcore.messages.AckMessage;
import pt.unl.fct.di.novasys.nimbus.utils.replicationcore.messages.DeltaMessage;
import pt.unl.fct.di.novasys.nimbus.utils.replicationcore.messages.RemoteOperationMessage;
import pt.unl.fct.di.novasys.nimbus.utils.replicationcore.messages.RemoteOperationResultMessage;
import pt.unl.fct.di.novasys.nimbus.utils.replicationcore.messages.RemoteStateMessage;
import pt.unl.fct.di.novasys.nimbus.utils.replicationcore.messages.RemoteStateResultMessage;
import pt.unl.fct.di.novasys.nimbus.utils.replicationcore.notifications.DeliverStateNotification;
import pt.unl.fct.di.novasys.nimbus.utils.replicationcore.notifications.MergeNotification;
import pt.unl.fct.di.novasys.nimbus.utils.replicationcore.replies.GetCollectionReply;
import pt.unl.fct.di.novasys.nimbus.utils.replicationcore.replies.RemoteOperationReply;
import pt.unl.fct.di.novasys.nimbus.utils.replicationcore.replies.RemoteStateReply;
import pt.unl.fct.di.novasys.nimbus.utils.replicationcore.requests.ExecuteOperationRequest;
import pt.unl.fct.di.novasys.nimbus.utils.replicationcore.requests.GetReplicationUnitRequest;
import pt.unl.fct.di.novasys.nimbus.utils.replicationcore.requests.PropagateCRDTRequest;
import pt.unl.fct.di.novasys.nimbus.utils.replicationcore.structures.DeltaState;
import pt.unl.fct.di.novasys.nimbus.utils.replicationcore.structures.ReplicationCoreCollection;
import pt.unl.fct.di.novasys.nimbus.utils.replicationcore.structures.ReplicationCoreKeySpace;
import pt.unl.fct.di.novasys.nimbus.utils.structures.datatypes.Coordinates;
import pt.unl.fct.di.novasys.nimbus.utils.structures.updates.NimbusDelta;

/* loaded from: input_file:pt/unl/di/novasys/babel/nimbus/rc/mobilityentropypush/MobilityAwareAntiEntropyPush.class */
public class MobilityAwareAntiEntropyPush extends NimbusReplicationCore {
    public static final String PROTOCOL_NAME = "MobilityAwareAntiEntropyPush";
    private static final Logger logger = LogManager.getLogger(MobilityAwareAntiEntropyPush.class);
    private double radius;
    private double timeLimit;
    private List<Trajectory> nextPositions;
    private Map<Host, List<Trajectory>> neighborsNextPositions;
    private Map<Host, Queue<ProtoMessage>> forwardBuffer;
    private Set<Host> pending;
    private Set<Host> connectedNeighbors;
    private final Random r;
    private int maxHops;
    private long gcTimeout;
    private long propagateTimeout;
    private int minGCAcks;
    private int port;

    public MobilityAwareAntiEntropyPush(Peer peer, Properties properties) throws IOException, HandlerRegistrationException {
        super(PROTOCOL_NAME, peer);
        this.r = new Random(System.currentTimeMillis());
        this.nextPositions = new LinkedList();
        this.neighborsNextPositions = new HashMap();
        this.forwardBuffer = new HashMap();
        this.radius = Double.parseDouble(properties.getProperty("NimbusReplicationCore.Mobility.radiusMeters", "100"));
        this.timeLimit = Double.parseDouble(properties.getProperty("NimbusReplicationCore.Mobility.timeLimit", "10000"));
        this.pending = new HashSet();
        this.connectedNeighbors = new HashSet();
        this.maxHops = Integer.parseInt(properties.getProperty("NimbusMetadata.maxHops", "3"));
        this.minGCAcks = Integer.parseInt(properties.getProperty("NimbusReplicationCore.minGCAcks", "1"));
        this.gcTimeout = Long.parseLong(properties.getProperty("NimbusReplicationCore.gcTimeout", "8000"));
        this.propagateTimeout = Long.parseLong(properties.getProperty("NimbusReplicationCore.propagateTimeout", "4000"));
        String property = properties.getProperty("NimbusReplicationCore.channel.address");
        property = property == null ? peer.getAddress().getHostAddress() : property;
        String property2 = properties.getProperty("NimbusReplicationCore.channel.port");
        this.port = Integer.parseInt(property2);
        Properties properties2 = new Properties();
        properties2.setProperty("address", property);
        properties2.setProperty("port", property2);
        int createChannel = createChannel("TCPChannel", properties2);
        registerRequestHandler((short) 449, this::uponPropagateCRDTRequest);
        registerRequestHandler((short) 620, this::uponRemoteOperationRequest);
        registerRequestHandler((short) 621, this::uponRemoteStateRequest);
        registerReplyHandler((short) 627, this::uponGetCollectionReply);
        registerReplyHandler((short) 625, this::uponRemoteOperationDeliverReply);
        registerReplyHandler((short) 626, this::uponRemoteStateDeliverReply);
        subscribeNotification((short) 723, this::uponMergeNotification);
        subscribeNotification((short) 652, this::uponReplicasModification);
        subscribeNotification((short) 650, this::uponReplicaUnitUp);
        subscribeNotification((short) 656, this::uppnReplicaUnitDown);
        subscribeNotification((short) 999, this::uponTrajectoryNotification);
        registerTimerHandler((short) 650, this::uponPropagateDeltasTimer);
        registerTimerHandler((short) 651, this::uponGarbageCollectorTimer);
        registerMessageSerializer(createChannel, (short) 6400, AckMessage.serializer);
        registerMessageSerializer(createChannel, (short) 6401, DeltaMessage.serializer);
        registerMessageSerializer(createChannel, (short) 6402, RemoteOperationMessage.serializer);
        registerMessageSerializer(createChannel, (short) 6403, RemoteOperationResultMessage.serializer);
        registerMessageSerializer(createChannel, (short) 6404, RemoteStateMessage.serializer);
        registerMessageSerializer(createChannel, (short) 6405, RemoteStateResultMessage.serializer);
        registerMessageSerializer(createChannel, (short) 6409, TrajectoryMessage.serializer);
        registerMessageSerializer(createChannel, (short) 6408, StoreForwardMessage.serializer);
        registerMessageHandler(createChannel, (short) 6400, this::uponAckMessage);
        registerMessageHandler(createChannel, (short) 6401, this::uponDeltaMesssage);
        registerMessageHandler(createChannel, (short) 6402, this::uponRemoteOperationMessage);
        registerMessageHandler(createChannel, (short) 6403, this::uponRemoteOperationResultMessage);
        registerMessageHandler(createChannel, (short) 6404, this::uponRemoteStateMessage);
        registerMessageHandler(createChannel, (short) 6405, this::uponRemoteStateResultMessage);
        registerMessageHandler(createChannel, (short) 6409, this::uponTrajectoryMessage);
        registerMessageHandler(createChannel, (short) 6408, this::uponStoreForwardMessage);
        registerChannelEventHandler(createChannel, (short) 3, this::uponOutConnectionDown);
        registerChannelEventHandler(createChannel, (short) 4, this::uponOutConnectionFailed);
        registerChannelEventHandler(createChannel, (short) 5, this::uponOutConnectionUp);
        subscribeNotification((short) 401, this::uponNeighborUp);
        subscribeNotification((short) 402, this::uponNeighborDown);
    }

    public void init(Properties properties) throws HandlerRegistrationException, IOException {
        setupPeriodicTimer(new GarbageCollectorTimer(), this.gcTimeout, this.gcTimeout);
        setupPeriodicTimer(new PropagateDeltasTimer(), this.propagateTimeout, this.propagateTimeout);
    }

    private void uponAckMessage(AckMessage ackMessage, Host host, short s, int i) {
        int seqNumber = ackMessage.getSeqNumber();
        NimbusID nimbusID = ackMessage.getNimbusID();
        String keySpaceID = nimbusID.getKeySpaceID();
        String collectionID = nimbusID.getCollectionID();
        Host origin = ackMessage.getOrigin();
        logger.debug("ACK: {} {} {}", origin.toString(), Integer.valueOf(seqNumber), nimbusID.toString());
        this.storage.getDeltaState(keySpaceID, collectionID).updateAcks(origin, seqNumber);
    }

    private void uponDeltaMesssage(DeltaMessage deltaMessage, Host host, short s, int i) {
        logger.debug("Remote delta message message on {}", deltaMessage.getNimbusID());
        NimbusID nimbusID = deltaMessage.getNimbusID();
        NimbusDelta delta = deltaMessage.getDelta();
        addToQueue(nimbusID, deltaMessage);
        logger.debug("Sending state notification to storage layer with {} {} from {}", nimbusID, Integer.valueOf(deltaMessage.getSeqNumber()), deltaMessage.getOrigin().toString());
        triggerNotification(new DeliverStateNotification(nimbusID.toString(), delta));
    }

    private void uponRemoteOperationMessage(RemoteOperationMessage remoteOperationMessage, Host host, short s, int i) {
        ExecuteOperationRequest executeOperationRequest = new ExecuteOperationRequest(remoteOperationMessage.getNimbusID(), remoteOperationMessage.getOperation());
        logger.debug("Remote operation message on {} and {}", executeOperationRequest.getNimbusID(), remoteOperationMessage.getOperation());
        this.pendingRemoteOps.put(remoteOperationMessage.getNimbusID(), host);
        sendRequest(executeOperationRequest, NimbusConstants.NIMBUS_PROTO_ID.shortValue());
    }

    private void uponRemoteOperationResultMessage(RemoteOperationResultMessage remoteOperationResultMessage, Host host, short s, int i) {
        sendReply(new RemoteOperationReply(remoteOperationResultMessage.getNimbusID(), remoteOperationResultMessage.getStatus(), remoteOperationResultMessage.getCRDT(), remoteOperationResultMessage.getMessage()), NimbusConstants.NIMBUS_PROTO_ID.shortValue());
    }

    private void uponRemoteStateMessage(RemoteStateMessage remoteStateMessage, Host host, short s, int i) {
        logger.debug("Remote state message on {}", remoteStateMessage.getNimbusID());
        GetReplicationUnitRequest getReplicationUnitRequest = new GetReplicationUnitRequest(remoteStateMessage.getNimbusID());
        this.pendingRemoteOps.put(remoteStateMessage.getNimbusID(), host);
        sendRequest(getReplicationUnitRequest, NimbusConstants.NIMBUS_PROTO_ID.shortValue());
    }

    private void uponRemoteStateResultMessage(RemoteStateResultMessage remoteStateResultMessage, Host host, short s, int i) {
        sendReply(new RemoteStateReply(remoteStateResultMessage.getNimbusID(), remoteStateResultMessage.getCollections()), NimbusConstants.NIMBUS_PROTO_ID.shortValue());
    }

    private void uponTrajectoryMessage(TrajectoryMessage trajectoryMessage, Host host, short s, int i) {
        if (trajectoryMessage.getHopCount() > this.maxHops) {
            return;
        }
        this.neighborsNextPositions.put(trajectoryMessage.getOrigin(), trajectoryMessage.getTrajectory());
        propagateMessage(trajectoryMessage, host, this.myself.getHost());
    }

    private void uponStoreForwardMessage(StoreForwardMessage storeForwardMessage, Host host, short s, int i) {
        addToForwardBuffer(storeForwardMessage.getDestination(), storeForwardMessage.getDelta());
    }

    void uponPropagateCRDTRequest(PropagateCRDTRequest propagateCRDTRequest, short s) {
        NimbusID nimbusID = new NimbusID(propagateCRDTRequest.getObjectID());
        String keySpaceID = nimbusID.getKeySpaceID();
        String collectionID = nimbusID.getCollectionID();
        if (propagateCRDTRequest.getNimbusCRDT().getCRDT().getType().getFlavor() != CRDTsTypes.FLAVOR.DELTA) {
            logger.error("CRDT passed to replication core is not delta-based!");
        } else {
            this.storage.getDeltaState(keySpaceID, collectionID).putCRDT(nimbusID, propagateCRDTRequest.getNimbusCRDT(), this.myself.getHost());
        }
    }

    void uponRemoteOperationRequest(RemoteOperationRequest remoteOperationRequest, short s) {
        logger.debug("Remote operation request on {}", remoteOperationRequest.getNimbusID());
        RemoteOperationMessage remoteOperationMessage = new RemoteOperationMessage(remoteOperationRequest.getNimbusID(), remoteOperationRequest.getOperation());
        ReplicaID replicaFrom = this.storage.getReplicaFrom(remoteOperationMessage.getNimbusID().getKeySpaceID(), remoteOperationMessage.getNimbusID().getCollectionID());
        if (replicaFrom == null) {
            return;
        }
        logger.debug("Remote operation request on {} to {}", remoteOperationRequest.getNimbusID(), replicaFrom.toString());
        sendIfConnected(remoteOperationMessage, replicaFrom.getHost());
    }

    void uponRemoteStateRequest(RemoteStateRequest remoteStateRequest, short s) {
        logger.debug("Remote status request on {}", remoteStateRequest.getNimbusID());
        NimbusID nimbusID = remoteStateRequest.getNimbusID();
        RemoteStateMessage remoteStateMessage = new RemoteStateMessage(nimbusID);
        ReplicaID replicaFrom = nimbusID.isKeySpaceID() ? this.storage.getReplicaFrom(nimbusID.getKeySpaceID()) : this.storage.getReplicaFrom(nimbusID.getKeySpaceID(), nimbusID.getCollectionID());
        if (replicaFrom == null) {
            return;
        }
        logger.debug("Remote status request on {} to {}", remoteStateRequest.getNimbusID(), replicaFrom);
        sendIfConnected(remoteStateMessage, replicaFrom.getHost());
    }

    void uponGetCollectionReply(GetCollectionReply getCollectionReply, short s) {
        NimbusID nimbusID = new NimbusID(getCollectionReply.getObjectID());
        String keySpaceID = nimbusID.getKeySpaceID();
        String collectionID = nimbusID.getCollectionID();
        if (getCollectionReply.getCollection() == null) {
            logger.debug("Collection didn't exist in upper layer!");
            this.pendingGetCRDTs.remove(nimbusID);
            return;
        }
        if (getCollectionReply.getCollection().getDelta().getType().getFlavor() != CRDTsTypes.FLAVOR.DELTA) {
            logger.error("Didn't receive delta-based crdt!");
            return;
        }
        NimbusDelta collection = getCollectionReply.getCollection();
        logger.debug("Got FULL STATE reply from {}", nimbusID);
        List<Host> list = (List) this.pendingGetCRDTs.remove(nimbusID);
        if (list == null) {
            return;
        }
        DeltaState deltaState = this.storage.getDeltaState(keySpaceID, collectionID);
        for (Host host : list) {
            Integer ack = deltaState.getACK(host);
            logger.debug("{} {} {}", host.toString(), ack, Integer.valueOf(deltaState.getSeqNumber()));
            if (ack.intValue() < deltaState.getSeqNumber()) {
                logger.warn("Sending full CRDT {} with context {} to {}", nimbusID, collection.getDelta().getCausalContext().toString(), host);
                logger.debug("Sending full state message of object {} with FULL STATE to {}", nimbusID, host);
                sendOrStore(new DeltaMessage(nimbusID, collection, deltaState.getSeqNumber(), this.myself.getHost()), (Peer) host);
            }
        }
    }

    void uponRemoteOperationDeliverReply(ExecuteOperationReply executeOperationReply, short s) {
        NimbusID nimbusID = executeOperationReply.getNimbusID();
        CommonOperationStatus status = executeOperationReply.getStatus();
        DeltaCausalCRDT crdt = executeOperationReply.getCRDT();
        String message = executeOperationReply.getMessage();
        Host host = (Host) this.pendingRemoteOps.remove(nimbusID);
        if (host == null) {
            return;
        }
        logger.debug("Sending remote operation reply {} {} {} {}", executeOperationReply.getNimbusID(), status, executeOperationReply.getCRDT(), message);
        sendIfConnected(new RemoteOperationResultMessage(nimbusID, status, crdt, message), host);
    }

    void uponRemoteStateDeliverReply(GetReplicationUnitReply getReplicationUnitReply, short s) {
        logger.debug("Get replication unit reply on {} with collections {}", getReplicationUnitReply.getNimbusID(), getReplicationUnitReply.getCollections().keySet());
        NimbusID nimbusID = getReplicationUnitReply.getNimbusID();
        Map collections = getReplicationUnitReply.getCollections();
        Host host = (Host) this.pendingRemoteOps.remove(nimbusID);
        if (host == null) {
            return;
        }
        sendIfConnected(new RemoteStateResultMessage(nimbusID, collections), host);
    }

    private void uponPropagateDeltasTimer(PropagateDeltasTimer propagateDeltasTimer, long j) {
        DeltaMessage handleDeltaPropagation;
        for (Map.Entry entry : this.storage.getKeySpaces().entrySet()) {
            String str = (String) entry.getKey();
            ReplicationCoreKeySpace replicationCoreKeySpace = (ReplicationCoreKeySpace) entry.getValue();
            ReplicaID replicaID = replicationCoreKeySpace.isReplica(this.myself) ? (ReplicaID) NimbusUtils.randomExcept(replicationCoreKeySpace.getReplicas(), this.myself) : null;
            logger.debug("Propagating keySpace {} deltas to {}", str, replicaID);
            for (Map.Entry entry2 : replicationCoreKeySpace.getCollections().entrySet()) {
                NimbusID nimbusID = new NimbusID(str, (String) entry2.getKey());
                ReplicationCoreCollection replicationCoreCollection = (ReplicationCoreCollection) entry2.getValue();
                if (replicaID != null && (handleDeltaPropagation = handleDeltaPropagation(nimbusID, replicaID.getHost(), replicationCoreCollection.getDeltaState())) != null) {
                    sendOrStore(handleDeltaPropagation, replicaID.getHost());
                }
                if (replicationCoreCollection.isReplica(this.myself)) {
                    ArrayList arrayList = new ArrayList(replicationCoreCollection.getReplicas().size());
                    arrayList.addAll(replicationCoreCollection.getReplicas());
                    arrayList.remove(replicaID);
                    arrayList.remove(this.myself);
                    logger.debug("Propagating collection {} deltas to {}", nimbusID, arrayList);
                    while (!arrayList.isEmpty()) {
                        Peer host = ((ReplicaID) NimbusUtils.randomAndRemove(arrayList)).getHost();
                        DeltaMessage handleDeltaPropagation2 = handleDeltaPropagation(nimbusID, host, replicationCoreCollection.getDeltaState());
                        if (handleDeltaPropagation2 != null) {
                            sendOrStore(handleDeltaPropagation2, host);
                        }
                    }
                }
            }
        }
    }

    private void uponGarbageCollectorTimer(GarbageCollectorTimer garbageCollectorTimer, long j) {
        this.storage.garbageCollectDeltas(this.minGCAcks);
    }

    private void uponReplicasModification(ReplicasModificationNotification replicasModificationNotification, short s) {
        String keySpaceID = replicasModificationNotification.getKeySpaceID();
        Set replicas = replicasModificationNotification.getReplicas();
        logger.debug("Replica unit modification {} {} {}", keySpaceID, replicasModificationNotification.getCollectionID(), replicas);
        if (replicasModificationNotification.isReplicatingKeySpace()) {
            this.storage.replaceReplicas(keySpaceID, replicas);
        } else {
            this.storage.replaceReplicas(keySpaceID, replicasModificationNotification.getCollectionID(), replicas);
        }
    }

    private void uponReplicaUnitUp(ReplicationUnitUpNotification replicationUnitUpNotification, short s) {
        String keySpaceID = replicationUnitUpNotification.getKeySpaceID();
        Set replicas = replicationUnitUpNotification.getReplicas();
        logger.debug("Replica unit up {} {} {}", keySpaceID, replicationUnitUpNotification.getCollectionID(), replicas);
        if (replicationUnitUpNotification.isReplicatingKeyspace()) {
            this.storage.addReplicas(keySpaceID, replicas);
        } else {
            this.storage.addReplicas(keySpaceID, replicationUnitUpNotification.getCollectionID(), replicas);
        }
    }

    private void uppnReplicaUnitDown(ReplicationUnitDownNotification replicationUnitDownNotification, short s) {
        String keySpaceID = replicationUnitDownNotification.getKeySpaceID();
        logger.debug("Replica unit down {} {} {}", keySpaceID, replicationUnitDownNotification.getCollectionID());
        if (replicationUnitDownNotification.isReplicatingKeyspace()) {
            this.storage.removeKeySpace(keySpaceID);
        } else {
            this.storage.removeCollection(keySpaceID, replicationUnitDownNotification.getCollectionID());
        }
    }

    private void uponMergeNotification(MergeNotification mergeNotification, short s) {
        NimbusID nimbusID = new NimbusID(mergeNotification.getObjectID());
        String keySpaceID = nimbusID.getKeySpaceID();
        String collectionID = nimbusID.getCollectionID();
        boolean isMerged = mergeNotification.isMerged();
        NimbusDelta resultingMerge = mergeNotification.getResultingMerge();
        DeltaMessage removeFromQueue = removeFromQueue(nimbusID);
        logger.debug("RECEIVED StateMergeNotification on {} with delta {}. Changes: {}", nimbusID, removeFromQueue, Boolean.valueOf(isMerged));
        if (removeFromQueue == null) {
            return;
        }
        int seqNumber = removeFromQueue.getSeqNumber();
        if (isMerged) {
            this.storage.getDeltaState(keySpaceID, collectionID).putCRDT(nimbusID, resultingMerge, removeFromQueue.getOrigin());
        }
        logger.debug("Sending ACKMessage to {} from {}", removeFromQueue.getOrigin(), nimbusID.toString());
        sendOrStore(new AckMessage(nimbusID, seqNumber, this.myself.getHost()), (Peer) removeFromQueue.getOrigin());
        DeltaMessage pollFromQueue = pollFromQueue(nimbusID);
        if (pollFromQueue != null) {
            triggerNotification(new DeliverStateNotification(nimbusID.toString(), pollFromQueue.getDelta()));
        }
    }

    private void uponTrajectoryNotification(TrajectoryNotification trajectoryNotification, short s) {
        this.nextPositions = trajectoryNotification.getTrajectory();
        propagateMessage(new TrajectoryMessage(this.nextPositions, this.myself.getHost()), new Host[0]);
    }

    private void uponNeighborUp(NeighborUp neighborUp, short s) {
        Host host = new Host(neighborUp.getPeer().getAddress(), this.port);
        logger.info("Got neighbor: " + host.toString());
        if (this.connectedNeighbors.contains(host) || !this.pending.add(host)) {
            return;
        }
        openConnection(host);
    }

    private void uponNeighborDown(NeighborDown neighborDown, short s) {
        Host host = new Host(neighborDown.getPeer().getAddress(), this.port);
        logger.info("Neighbor down: {}", host.toString());
        if (this.connectedNeighbors.remove(host)) {
            closeConnection(host);
        } else if (this.pending.remove(host)) {
            closeConnection(host);
        }
    }

    private void uponOutConnectionDown(OutConnectionDown outConnectionDown, int i) {
        Host node = outConnectionDown.getNode();
        logger.trace("Host {} is down, cause: {}", node, outConnectionDown.getCause());
        this.connectedNeighbors.remove(node);
        closeConnection(node);
    }

    private void uponOutConnectionFailed(OutConnectionFailed<?> outConnectionFailed, int i) {
        Host node = outConnectionFailed.getNode();
        logger.trace("Connection to host {} failed, cause: {}", node, outConnectionFailed.getCause());
        this.connectedNeighbors.remove(node);
    }

    private void uponOutConnectionUp(OutConnectionUp outConnectionUp, int i) {
        Host node = outConnectionUp.getNode();
        logger.trace("Host (out) {} is up", node);
        this.connectedNeighbors.add(node);
        checkStoreForwardBuffer(node);
    }

    private void sendIfConnected(ProtoMessage protoMessage, Host host) {
        Host host2 = new Host(host.getAddress(), this.port);
        if (isConnected((Peer) host2)) {
            return;
        }
        logger.debug("Sending message to {} with {}", host2, protoMessage);
        sendMessage(protoMessage, host2);
    }

    private void sendOrStore(ProtoMessage protoMessage, Peer peer) {
        if (isConnected(peer)) {
            sendMessage(protoMessage, peer);
            return;
        }
        List<Trajectory> list = this.neighborsNextPositions.get(peer);
        for (Host host : this.connectedNeighbors) {
            if (willEncounterWithin(list, this.neighborsNextPositions.get(host)) != -1) {
                sendMessage(new StoreForwardMessage(peer, protoMessage), host);
                return;
            }
        }
    }

    private void addToForwardBuffer(Host host, ProtoMessage protoMessage) {
        Queue<ProtoMessage> queue = this.forwardBuffer.get(host);
        if (queue == null) {
            queue = new LinkedList();
            this.forwardBuffer.put(host, queue);
        }
        queue.add(protoMessage);
    }

    private boolean isConnected(Peer peer) {
        return this.connectedNeighbors.contains(peer);
    }

    private void propagateMessage(ProtoMessage protoMessage, Host... hostArr) {
        ArrayList arrayList = new ArrayList(this.connectedNeighbors);
        for (Host host : hostArr) {
            arrayList.remove(host);
        }
        while (!arrayList.isEmpty()) {
            sendMessage(protoMessage, (Host) arrayList.remove(this.r.nextInt(arrayList.size())));
        }
    }

    private void checkStoreForwardBuffer(Host host) {
        Iterator<ProtoMessage> it = this.forwardBuffer.get(host).iterator();
        while (it.hasNext()) {
            sendMessage(it.next(), host);
            it.remove();
        }
    }

    public long willEncounterWithin(List<Trajectory> list, List<Trajectory> list2) {
        int min = Math.min(list.size(), list2.size());
        long currentTimeMillis = System.currentTimeMillis();
        for (int i = 0; i < min; i++) {
            Trajectory trajectory = list.get(i);
            Trajectory trajectory2 = list2.get(i);
            if (trajectory.getTimestamp() >= currentTimeMillis) {
                if (trajectory.getTimestamp() > currentTimeMillis + this.timeLimit) {
                    return -1L;
                }
                if (Coordinates.haversineDistance(trajectory.getPosition(), trajectory2.getPosition()) <= this.radius) {
                    return trajectory.getTimestamp() - currentTimeMillis;
                }
            }
        }
        return -1L;
    }
}
