/*
 * Decompiled with CFR 0.152.
 */
package pt.unl.di.novasys.babel.rc.eventual.antientropy;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import pt.unl.di.novasys.babel.rc.commons.notifications.state.DeliverStateNotification;
import pt.unl.di.novasys.babel.rc.commons.notifications.state.StateMergeNotification;
import pt.unl.di.novasys.babel.rc.commons.replies.state.replies.ReturnStateReply;
import pt.unl.di.novasys.babel.rc.commons.requests.state.requests.GetStateRequest;
import pt.unl.di.novasys.babel.rc.commons.requests.state.requests.PropagateStateRequest;
import pt.unl.di.novasys.babel.rc.eventual.antientropy.messages.AckMessage;
import pt.unl.di.novasys.babel.rc.eventual.antientropy.messages.DeltaMessage;
import pt.unl.di.novasys.babel.rc.eventual.antientropy.timers.GarbageCollectorTimer;
import pt.unl.di.novasys.babel.rc.eventual.antientropy.timers.PropagateDeltasTimer;
import pt.unl.di.novasys.babel.rc.eventual.antientropy.utils.DeltaBuffer;
import pt.unl.di.novasys.babel.rc.eventual.antientropy.utils.DeltaState;
import pt.unl.di.novasys.babel.rc.eventual.antientropy.utils.Utils;
import pt.unl.fct.di.novasys.babel.core.GenericProtocol;
import pt.unl.fct.di.novasys.babel.crdts.delta.causal.generic.DeltaCausalCRDT;
import pt.unl.fct.di.novasys.babel.crdts.delta.causal.implementations.DeltaOORMap;
import pt.unl.fct.di.novasys.babel.crdts.delta.generic.DeltaBasedCRDT;
import pt.unl.fct.di.novasys.babel.crdts.generic.GenericCRDT;
import pt.unl.fct.di.novasys.babel.crdts.utils.CRDTsTypes;
import pt.unl.fct.di.novasys.babel.crdts.utils.ReplicaID;
import pt.unl.fct.di.novasys.babel.exceptions.HandlerRegistrationException;
import pt.unl.fct.di.novasys.babel.generic.ProtoNotification;
import pt.unl.fct.di.novasys.babel.generic.ProtoRequest;
import pt.unl.fct.di.novasys.babel.protocols.membership.Peer;
import pt.unl.fct.di.novasys.babel.protocols.membership.notifications.NeighborDown;
import pt.unl.fct.di.novasys.babel.protocols.membership.notifications.NeighborUp;
import pt.unl.fct.di.novasys.channel.tcp.events.InConnectionDown;
import pt.unl.fct.di.novasys.channel.tcp.events.InConnectionUp;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionDown;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionFailed;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionUp;
import pt.unl.fct.di.novasys.network.data.Host;
import pt.unl.fct.di.novasys.nimbus.utils.NimbusID;

public class NimbusAntiEntropy
extends GenericProtocol {
    public static final String PROTOCOL_NAME = "NimbusAntiEntropy";
    public static final short PROTOCOL_ID = 760;
    private static final Logger logger = LogManager.getLogger(NimbusAntiEntropy.class);
    private Map<String, DeltaState> buffer;
    private Map<String, List<Host>> pendingGetCRDTs;
    private Map<String, DeltaMessage> merginMessages;
    private Map<String, Queue<DeltaMessage>> queueMessages;
    private final short networkPort;
    private int channelId;
    private final Peer myself;
    private Set<Host> pending = new TreeSet<Host>();
    private Set<Host> connectedNeighbors = new TreeSet<Host>();
    private long gcTimeout;
    private long propagateTimeout;
    private short storageProtoID;

    public NimbusAntiEntropy(Peer myself, Properties properties) throws IOException, HandlerRegistrationException {
        super(PROTOCOL_NAME, (short)760);
        this.buffer = new HashMap<String, DeltaState>();
        this.pendingGetCRDTs = new HashMap<String, List<Host>>();
        this.merginMessages = new HashMap<String, DeltaMessage>();
        this.queueMessages = new HashMap<String, Queue<DeltaMessage>>();
        this.myself = myself;
        this.gcTimeout = Long.parseLong(properties.getProperty("ReplicationCore.gcTimeout", "8000"));
        this.propagateTimeout = Long.parseLong(properties.getProperty("ReplicationCore.propagateTimeout", "4000"));
        this.storageProtoID = Short.parseShort(properties.getProperty("ReplicationCore.storageSystemID"));
        String address = properties.getProperty("ReplicationCore.channel.address");
        if (address == null) {
            address = myself.getAddress().getHostAddress();
        }
        String port = properties.getProperty("ReplicationCore.channel.port");
        this.networkPort = Short.parseShort(port);
        Properties channelProps = new Properties();
        channelProps.setProperty("address", address);
        channelProps.setProperty("port", port);
        this.channelId = this.createChannel("TCPChannel", channelProps);
        this.registerRequestHandler((short)521, this::uponPropagateCRDTRequest);
        this.registerReplyHandler((short)621, this::uponReturnCRDTStateReply);
        this.subscribeNotification((short)401, this::uponNeighborUp);
        this.subscribeNotification((short)402, this::uponNeighborDown);
        this.subscribeNotification((short)723, this::uponStateMerge);
        this.registerTimerHandler((short)4003, this::uponPropagateDeltasTimerTest);
        this.registerTimerHandler((short)404, this::uponGarbageCollectorTimer);
        this.registerMessageSerializer(this.channelId, (short)602, AckMessage.serializer);
        this.registerMessageSerializer(this.channelId, (short)603, DeltaMessage.serializer);
        this.registerMessageHandler(this.channelId, (short)602, this::uponAckMessage);
        this.registerMessageHandler(this.channelId, (short)603, this::uponDeltaMesssage);
        this.registerChannelEventHandler(this.channelId, (short)3, this::uponOutConnectionDown);
        this.registerChannelEventHandler(this.channelId, (short)4, this::uponOutConnectionFailed);
        this.registerChannelEventHandler(this.channelId, (short)5, this::uponOutConnectionUp);
        this.registerChannelEventHandler(this.channelId, (short)2, this::uponInConnectionUp);
        this.registerChannelEventHandler(this.channelId, (short)1, this::uponInConnectionDown);
    }

    public void init(Properties props) throws HandlerRegistrationException, IOException {
        this.setupPeriodicTimer(new GarbageCollectorTimer(), this.gcTimeout, this.gcTimeout);
        this.setupPeriodicTimer(new PropagateDeltasTimer(), this.propagateTimeout, this.propagateTimeout);
    }

    private void uponAckMessage(AckMessage msg, Host sender, short protoID, int cID) {
        int ackSeqNumber = msg.getSeqNumber();
        String nimbusCollection = msg.getCollectionID();
        Host origin = msg.getOrigin();
        logger.debug("ACK: {} {} {}", (Object)origin.toString(), (Object)ackSeqNumber, (Object)nimbusCollection);
        this.buffer.get(nimbusCollection).updateAcks(origin, ackSeqNumber);
    }

    private void uponDeltaMesssage(DeltaMessage msg, Host sender, short protoID, int cID) {
        String nimbusCollection = msg.getCollectionID();
        DeltaCausalCRDT delta = msg.getDelta();
        if (this.merginMessages.containsKey(nimbusCollection)) {
            logger.debug("Merge is already happening, putting in queue with {} {} from {}", (Object)nimbusCollection, (Object)msg.getSeqNumber(), (Object)msg.getOrigin().toString());
            Queue<DeltaMessage> queue = this.queueMessages.get(nimbusCollection);
            if (queue == null) {
                queue = new LinkedList<DeltaMessage>();
                this.queueMessages.put(nimbusCollection, queue);
            }
            queue.add(msg);
        } else {
            this.merginMessages.put(nimbusCollection, msg);
            logger.debug("Sending state notification to storage layer with {} {} from {}", (Object)nimbusCollection, (Object)msg.getSeqNumber(), (Object)msg.getOrigin().toString());
            this.triggerNotification((ProtoNotification)new DeliverStateNotification(nimbusCollection, (GenericCRDT)delta));
        }
    }

    void uponPropagateCRDTRequest(PropagateStateRequest request, short protoID) {
        NimbusID nimbusID = new NimbusID(request.getObjectID());
        String nimbusCollectionn = nimbusID.getNimbusCollection();
        if (request.getCrdt().getType().getFlavor() != CRDTsTypes.FLAVOR.DELTA) {
            logger.error("CRDT passed to replication core is not delta-based!");
            return;
        }
        DeltaCausalCRDT crdt = (DeltaCausalCRDT)request.getCrdt();
        DeltaState crdtState = this.buffer.get(nimbusCollectionn);
        if (crdtState == null) {
            crdtState = new DeltaState(nimbusCollectionn);
            this.buffer.put(nimbusCollectionn, crdtState);
        }
        crdtState.putCRDT(nimbusID, crdt, (Host)this.myself);
    }

    void uponReturnCRDTStateReply(ReturnStateReply reply, short protoID) {
        NimbusID nimbusID = new NimbusID(reply.getObjectID());
        String nimbusCollection = nimbusID.getNimbusCollection();
        if (reply.getCRDT() == null) {
            logger.debug("Collection didn't exist in upper layer!");
            this.pendingGetCRDTs.remove(nimbusCollection);
            return;
        }
        if (reply.getCRDT().getType().getFlavor() != CRDTsTypes.FLAVOR.DELTA) {
            logger.error("Didn't receive delta-based crdt!");
            return;
        }
        DeltaCausalCRDT crdt = (DeltaCausalCRDT)reply.getCRDT();
        logger.debug("Got FULL STATE reply from {}", (Object)nimbusCollection);
        List<Host> pendingBuffer = this.pendingGetCRDTs.remove(nimbusCollection);
        if (pendingBuffer == null) {
            return;
        }
        DeltaState state = this.buffer.get(nimbusCollection);
        for (Host host : pendingBuffer) {
            Integer lastAckHost = state.getACK(host);
            if (lastAckHost >= state.getSeqNumber()) continue;
            logger.debug("Sending full CRDT {} with context {}", (Object)nimbusID, (Object)crdt.getCausalContext().toString());
            logger.debug("Sending full state message of object {} with FULL STATE to {}", (Object)nimbusID, (Object)host);
            DeltaMessage msg = new DeltaMessage(nimbusCollection, crdt, state.getSeqNumber(), (Host)this.myself);
            this.sendMessage(msg, host);
        }
    }

    private void uponPropagateDeltasTimer(PropagateDeltasTimer timer, long timerID) {
        Host host = Utils.getRandomSetElement(this.connectedNeighbors);
        if (host == null) {
            return;
        }
        this.handleDeltaPropagation(host);
    }

    private void uponPropagateDeltasTimerTest(PropagateDeltasTimer timer, long timerID) {
        for (Host host : this.connectedNeighbors) {
            this.handleDeltaPropagation(host);
        }
    }

    private void uponGarbageCollectorTimer(GarbageCollectorTimer timer, long timerID) {
        for (DeltaState state : this.buffer.values()) {
            if (state.getAcks().values().isEmpty()) continue;
            int highestAck = Collections.max(state.getAcks().values());
            int lowestAck = Collections.min(state.getAcks().values());
            logger.debug("GC: {} {} {}", (Object)state.getCollectionID(), (Object)lowestAck, (Object)highestAck);
            state.getBuffer().keySet().removeIf(seqNumber -> seqNumber < lowestAck);
        }
    }

    private void uponNeighborUp(NeighborUp up, short protoID) {
        Host h = new Host(up.getPeer().getAddress(), (int)this.networkPort);
        logger.info("Got new neighbor: {}", (Object)h.toString());
        if (!this.connectedNeighbors.contains(h) && this.pending.add(h)) {
            this.openConnection(h);
        }
    }

    private void uponNeighborDown(NeighborDown down, short protoID) {
        Host h = new Host(down.getPeer().getAddress(), (int)this.networkPort);
        logger.info("Neighbor down: {}", (Object)h.toString());
        if (this.connectedNeighbors.remove(h)) {
            this.closeConnection(h);
        } else if (this.pending.remove(h)) {
            this.closeConnection(h);
        }
    }

    private void uponStateMerge(StateMergeNotification merge, short protoID) {
        Queue<DeltaMessage> queue;
        NimbusID nimbusID = new NimbusID(merge.getObjectID());
        String nimbusCollection = nimbusID.getNimbusCollection();
        boolean changed = merge.isMerged();
        DeltaMessage msg = this.merginMessages.remove(nimbusCollection);
        logger.debug("RECEIVED StateMergeNotification on {} with delta {}. Changes: {}", (Object)nimbusID, (Object)msg, (Object)changed);
        if (msg == null) {
            return;
        }
        DeltaCausalCRDT delta = msg.getDelta();
        int seqNumber = msg.getSeqNumber();
        if (changed && nimbusID.isFullID()) {
            DeltaState crdtState = this.buffer.get(nimbusCollection);
            if (crdtState == null) {
                crdtState = new DeltaState(nimbusCollection);
                this.buffer.put(nimbusCollection, crdtState);
            }
            crdtState.putCRDT(nimbusID, delta, msg.getOrigin());
        }
        if ((queue = this.queueMessages.get(nimbusCollection)) != null) {
            if (!queue.isEmpty()) {
                logger.debug("Queue was not empty, sending a state notification: {}", (Object)changed);
                this.merginMessages.put(nimbusCollection, queue.remove());
                this.triggerNotification((ProtoNotification)new DeliverStateNotification(nimbusCollection, (GenericCRDT)delta));
            } else {
                logger.debug("Queue is empty, pruning", (Object)changed);
                this.queueMessages.remove(nimbusCollection);
            }
        }
        logger.debug("Sending ACKMessage to {} from {}", (Object)msg.getOrigin(), (Object)nimbusID.toString());
        AckMessage ack = new AckMessage(nimbusCollection, seqNumber, (Host)this.myself);
        this.sendMessage(ack, msg.getOrigin());
    }

    private void uponOutConnectionDown(OutConnectionDown event, int channelId) {
        Host h = event.getNode();
        logger.trace("Host {} is down, cause: {}", (Object)h, (Object)event.getCause());
        if (this.connectedNeighbors.contains(h) || this.pending.contains(h)) {
            this.connectedNeighbors.remove(h);
            this.pending.remove(h);
            this.closeConnection(h);
        }
    }

    private void uponOutConnectionFailed(OutConnectionFailed<?> event, int channelId) {
        Host h = event.getNode();
        logger.trace("Connection to host {} failed, cause: {}", (Object)h, (Object)event.getCause());
        if (this.connectedNeighbors.contains(h) || this.pending.contains(h)) {
            this.connectedNeighbors.remove(h);
            this.pending.add(h);
            this.openConnection(h);
        }
    }

    private void uponOutConnectionUp(OutConnectionUp event, int channelId) {
        Host h = event.getNode();
        logger.trace("Host (out) {} is up", (Object)h);
        if (!this.connectedNeighbors.contains(h) && this.pending.remove(h)) {
            this.connectedNeighbors.add(h);
        }
    }

    private void uponInConnectionUp(InConnectionUp event, int channelId) {
        logger.trace("Host (in) {} is up", (Object)event.getNode());
    }

    private void uponInConnectionDown(InConnectionDown event, int channelId) {
        logger.trace("Connection from host {} is down, cause: {}", (Object)event.getNode(), (Object)event.getCause());
    }

    private void addToPendingCRDTRequests(String collectionID, Host peer) {
        List<Host> pendingPeers = this.pendingGetCRDTs.get(collectionID);
        if (pendingPeers == null) {
            pendingPeers = new LinkedList<Host>();
            this.pendingGetCRDTs.put(collectionID, pendingPeers);
        }
        pendingPeers.add(peer);
    }

    private void handleDeltaPropagation(Host host) {
        for (Map.Entry<String, DeltaState> pairState : this.buffer.entrySet()) {
            String nimbusCollection = pairState.getKey();
            DeltaState state = pairState.getValue();
            Integer minSeqNumber = state.getMinBufferSeqNum();
            Integer lastAckHost = state.getACK(host);
            logger.debug("Trying to propagate deltas from {} to {}", (Object)nimbusCollection, (Object)host.toString());
            logger.debug("ACK Host: {} {}", (Object)minSeqNumber, (Object)lastAckHost);
            if (minSeqNumber == -1 || minSeqNumber > lastAckHost) {
                logger.debug("Requesting full state");
                this.addToPendingCRDTRequests(nimbusCollection, host);
                GetStateRequest request = new GetStateRequest(nimbusCollection, 760);
                this.sendRequest((ProtoRequest)request, this.storageProtoID);
                continue;
            }
            if (lastAckHost >= state.getSeqNumber() || state.getBuffer().isEmpty()) continue;
            boolean changed = false;
            DeltaOORMap map = new DeltaOORMap(new ReplicaID(this.myself));
            for (Map.Entry<Integer, DeltaBuffer> pairDelta : state.getBuffer().entrySet()) {
                Integer seqNumber = pairDelta.getKey();
                DeltaCausalCRDT deltaToAdd = pairDelta.getValue().getDelta();
                NimbusID nimbusID = pairDelta.getValue().getNimbusID();
                Host origin = pairDelta.getValue().getOrigin();
                if (origin.equals((Object)host) || seqNumber < lastAckHost || seqNumber >= state.getSeqNumber() || nimbusID.isCollectionID()) continue;
                DeltaCausalCRDT prevCRDT = map.get(nimbusID.getCRDTID());
                if (prevCRDT == null) {
                    map.put(nimbusID.getCRDTID(), deltaToAdd);
                } else {
                    prevCRDT.mergeDelta((DeltaBasedCRDT)deltaToAdd);
                }
                changed = true;
            }
            if (!changed) continue;
            logger.debug("Sending delta message of object {} to {}", (Object)nimbusCollection, (Object)host);
            DeltaMessage msg = new DeltaMessage(nimbusCollection, (DeltaCausalCRDT)map, state.getSeqNumber(), (Host)this.myself);
            this.sendMessage(msg, host);
        }
    }
}

