/*
 * Decompiled with CFR 0.152.
 */
package pt.unl.di.novasys.babel.rc.eventual.antientropy;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import pt.unl.di.novasys.babel.rc.commons.replies.crdt.replies.DeliverCRDTReply;
import pt.unl.di.novasys.babel.rc.commons.replies.crdt.replies.ReturnCRDTReply;
import pt.unl.di.novasys.babel.rc.commons.requests.crdt.requests.GetCRDTRequest;
import pt.unl.di.novasys.babel.rc.commons.requests.crdt.requests.PropagateCRDTRequest;
import pt.unl.di.novasys.babel.rc.eventual.antientropy.messages.AckMessage;
import pt.unl.di.novasys.babel.rc.eventual.antientropy.messages.DeltaMessage;
import pt.unl.di.novasys.babel.rc.eventual.antientropy.timers.GarbageCollectorTimer;
import pt.unl.di.novasys.babel.rc.eventual.antientropy.timers.PropagateDeltasTimer;
import pt.unl.di.novasys.babel.rc.eventual.antientropy.utils.DeltaState;
import pt.unl.di.novasys.babel.rc.eventual.antientropy.utils.Utils;
import pt.unl.fct.di.novasys.babel.core.GenericProtocol;
import pt.unl.fct.di.novasys.babel.crdts.delta.generic.DeltaBasedCRDT;
import pt.unl.fct.di.novasys.babel.crdts.delta.generic.DeltaCRDT;
import pt.unl.fct.di.novasys.babel.crdts.generic.GenericCRDT;
import pt.unl.fct.di.novasys.babel.crdts.utils.CRDTsTypes;
import pt.unl.fct.di.novasys.babel.exceptions.HandlerRegistrationException;
import pt.unl.fct.di.novasys.babel.generic.ProtoReply;
import pt.unl.fct.di.novasys.babel.generic.ProtoRequest;
import pt.unl.fct.di.novasys.babel.protocols.membership.Peer;
import pt.unl.fct.di.novasys.babel.protocols.membership.notifications.NeighborDown;
import pt.unl.fct.di.novasys.babel.protocols.membership.notifications.NeighborUp;
import pt.unl.fct.di.novasys.channel.tcp.events.InConnectionDown;
import pt.unl.fct.di.novasys.channel.tcp.events.InConnectionUp;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionDown;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionFailed;
import pt.unl.fct.di.novasys.channel.tcp.events.OutConnectionUp;
import pt.unl.fct.di.novasys.network.data.Host;

public class DeltaCausalAntiEntropy
extends GenericProtocol {
    public static final String PROTOCOL_NAME = "DeltaCausalAntiEntropy";
    public static final short PROTOCOL_ID = 760;
    private static final Logger logger = LogManager.getLogger(DeltaCausalAntiEntropy.class);
    private Map<String, DeltaState> buffer = new HashMap<String, DeltaState>();
    private Map<String, List<Host>> pendingCRDTs = new HashMap<String, List<Host>>();
    private final short networkPort;
    private int channelId;
    private final Peer myself;
    private Set<Host> pending;
    private Set<Host> connectedNeighbors;
    private long gcTimeout;
    private long propagateTimeout;
    private short storageProtoID;

    public DeltaCausalAntiEntropy(Peer myself, short storageProtoID, Properties properties) throws IOException, HandlerRegistrationException {
        super(PROTOCOL_NAME, (short)760);
        this.storageProtoID = storageProtoID;
        this.myself = myself;
        String address = properties.getProperty("ReplicationCore.channel.address");
        String port = properties.getProperty("ReplicationCore.channel.port");
        this.gcTimeout = Long.parseLong(properties.getProperty("ReplicationCore.gcTimeout", "1000"));
        this.propagateTimeout = Long.parseLong(properties.getProperty("ReplicationCore.propagateTimeout", "1000"));
        this.networkPort = Short.parseShort(port);
        Properties channelProps = new Properties();
        channelProps.setProperty("address", address);
        channelProps.setProperty("port", port);
        this.channelId = this.createChannel("TCPChannel", channelProps);
        this.registerRequestHandler((short)521, this::uponPropagateCRDTRequest);
        this.registerReplyHandler((short)621, this::uponReturnCRDTStateReply);
        this.subscribeNotification((short)401, this::uponNeighborUp);
        this.subscribeNotification((short)402, this::uponNeighborDown);
        this.registerTimerHandler((short)403, this::uponPropagateDeltasTimer);
        this.registerTimerHandler((short)404, this::uponGarbageCollectorTimer);
        this.registerMessageSerializer(this.channelId, (short)602, AckMessage.serializer);
        this.registerMessageSerializer(this.channelId, (short)603, DeltaMessage.serializer);
        this.registerMessageHandler(this.channelId, (short)602, this::uponAckMessage);
        this.registerMessageHandler(this.channelId, (short)603, this::uponDeltaMesssage);
        this.registerChannelEventHandler(this.channelId, (short)3, this::uponOutConnectionDown);
        this.registerChannelEventHandler(this.channelId, (short)4, this::uponOutConnectionFailed);
        this.registerChannelEventHandler(this.channelId, (short)5, this::uponOutConnectionUp);
        this.registerChannelEventHandler(this.channelId, (short)2, this::uponInConnectionUp);
        this.registerChannelEventHandler(this.channelId, (short)1, this::uponInConnectionDown);
    }

    public void init(Properties props) throws HandlerRegistrationException, IOException {
        this.setupPeriodicTimer(new GarbageCollectorTimer(), 10000L, this.gcTimeout);
        this.setupPeriodicTimer(new PropagateDeltasTimer(), 10000L, this.propagateTimeout);
    }

    private void uponAckMessage(AckMessage msg, Host sender, short protoID, int cID) {
        int ackSeqNumber = msg.getSeqNumber();
        String objectID = msg.getObjectID();
        Host origin = msg.getOrigin();
        this.buffer.get(objectID).updateAcks(origin, ackSeqNumber);
    }

    private void uponDeltaMesssage(DeltaMessage msg, Host sender, short protoID, int cID) {
        String objectID = msg.getObjectID();
        DeltaCRDT delta = msg.getDelta();
        int seqNumber = msg.getSeqNumber();
        DeltaState crdtState = this.buffer.get(objectID);
        if (crdtState == null) {
            crdtState = new DeltaState(objectID);
        }
        crdtState.putCRDT(delta);
        DeliverCRDTReply reply = new DeliverCRDTReply(objectID, (GenericCRDT)delta, 760);
        this.sendReply((ProtoReply)reply, this.storageProtoID);
        AckMessage ack = new AckMessage(objectID, seqNumber, (Host)this.myself);
        this.sendMessage(ack, msg.getOrigin());
    }

    void uponPropagateCRDTRequest(PropagateCRDTRequest request, short protoID) {
        String objectID = request.getObjectID();
        if (request.getCrdt().getType().getFlavor() != CRDTsTypes.FLAVOR.DELTA) {
            logger.error("CRDT passed to replication core is not delta-based!");
            return;
        }
        DeltaCRDT crdt = (DeltaCRDT)request.getCrdt();
        DeltaState crdtState = this.buffer.get(objectID);
        if (crdtState == null) {
            crdtState = new DeltaState(objectID);
        }
        crdtState.putCRDT(crdt);
    }

    void uponReturnCRDTStateReply(ReturnCRDTReply reply, short protoID) {
        String objectID = reply.getObjectID();
        if (reply.getCrdt() == null) {
            logger.error("Crdt didn't exist in upper layer!");
            this.pendingCRDTs.remove(objectID);
            return;
        }
        if (!reply.getCrdt().getType().isOfFlavor(CRDTsTypes.FLAVOR.DELTA)) {
            logger.error("Didn't receive delta-based crdt!");
            return;
        }
        DeltaCRDT crdt = (DeltaCRDT)reply.getCrdt();
        List<Host> pendingBuffer = this.pendingCRDTs.get(reply.getObjectID());
        for (Host host : pendingBuffer) {
            DeltaState state = this.buffer.get(objectID);
            Integer minSeqNumber = state.getMinBufferSeqNum();
            if (minSeqNumber >= state.getSeqNumber()) continue;
            DeltaMessage msg = new DeltaMessage(objectID, crdt, state.getSeqNumber(), (Host)this.myself);
            this.sendMessage(msg, host);
        }
        this.pendingCRDTs.remove(objectID);
    }

    private void uponPropagateDeltasTimer(PropagateDeltasTimer timer, long timerID) {
        Host host = Utils.getRandomSetElement(this.connectedNeighbors);
        for (Map.Entry<String, DeltaState> pairState : this.buffer.entrySet()) {
            String objectID = pairState.getKey();
            DeltaState state = pairState.getValue();
            Integer minSeqNumber = state.getMinBufferSeqNum();
            Integer lastAckHost = state.getACK(host);
            if (minSeqNumber == -1 || lastAckHost == -1 || minSeqNumber > lastAckHost) {
                this.addToPendingCRDTRequests(objectID, host);
                GetCRDTRequest request = new GetCRDTRequest(objectID, 760);
                this.sendRequest((ProtoRequest)request, this.storageProtoID);
                continue;
            }
            if (state.getBuffer().isEmpty()) continue;
            DeltaCRDT delta = state.getCRDT(state.getMinBufferSeqNum());
            Iterator<Map.Entry<Integer, DeltaCRDT>> it = state.getBuffer().entrySet().iterator();
            it.next();
            while (it.hasNext()) {
                Map.Entry<Integer, DeltaCRDT> pairDelta = it.next();
                Integer seqNumber = pairDelta.getKey();
                DeltaCRDT deltaToAdd = pairDelta.getValue();
                if (seqNumber < minSeqNumber || minSeqNumber >= state.getSeqNumber()) continue;
                delta.mergeDelta((DeltaBasedCRDT)deltaToAdd);
            }
            if (minSeqNumber >= state.getSeqNumber()) continue;
            DeltaMessage msg = new DeltaMessage(objectID, delta, state.getSeqNumber(), (Host)this.myself);
            this.sendMessage(msg, host);
        }
    }

    private void uponGarbageCollectorTimer(GarbageCollectorTimer timer, long timerID) {
        for (DeltaState state : this.buffer.values()) {
            int lowestAck = Collections.min(state.getAcks().values());
            state.getBuffer().keySet().removeIf(seqNumber -> seqNumber < lowestAck);
        }
    }

    private void uponNeighborUp(NeighborUp up, short protoID) {
        Host h = new Host(up.getPeer().getAddress(), (int)this.networkPort);
        if (!this.connectedNeighbors.contains(h) && this.pending.add(h)) {
            this.openConnection(h);
        }
    }

    private void uponNeighborDown(NeighborDown down, short protoID) {
        Host h = new Host(down.getPeer().getAddress(), (int)this.networkPort);
        if (this.connectedNeighbors.remove(h)) {
            this.closeConnection(h);
        } else if (this.pending.remove(h)) {
            this.closeConnection(h);
        }
    }

    private void uponOutConnectionDown(OutConnectionDown event, int channelId) {
        Host h = event.getNode();
        logger.trace("Host {} is down, cause: {}", (Object)h, (Object)event.getCause());
        if (this.connectedNeighbors.contains(h) || this.pending.contains(h)) {
            this.connectedNeighbors.remove(h);
            this.pending.remove(h);
            this.closeConnection(h);
        }
    }

    private void uponOutConnectionFailed(OutConnectionFailed<?> event, int channelId) {
        Host h = event.getNode();
        logger.trace("Connection to host {} failed, cause: {}", (Object)h, (Object)event.getCause());
        if (this.connectedNeighbors.contains(h) || this.pending.contains(h)) {
            this.connectedNeighbors.remove(h);
            this.pending.add(h);
            this.openConnection(h);
        }
    }

    private void uponOutConnectionUp(OutConnectionUp event, int channelId) {
        Host h = event.getNode();
        logger.trace("Host (out) {} is up", (Object)h);
        if (!this.connectedNeighbors.contains(h) && this.pending.remove(h)) {
            this.connectedNeighbors.add(h);
        }
    }

    private void uponInConnectionUp(InConnectionUp event, int channelId) {
        logger.trace("Host (in) {} is up", (Object)event.getNode());
    }

    private void uponInConnectionDown(InConnectionDown event, int channelId) {
        logger.trace("Connection from host {} is down, cause: {}", (Object)event.getNode(), (Object)event.getCause());
    }

    private void addToPendingCRDTRequests(String objectID, Host peer) {
        List<Host> pendingPeers = this.pendingCRDTs.get(objectID);
        if (pendingPeers == null) {
            pendingPeers = new LinkedList<Host>();
        }
        pendingPeers.add(peer);
        this.pendingCRDTs.replace(objectID, pendingPeers);
    }
}

