/*
 * Decompiled with CFR 0.152.
 */
package com.datastax.oss.driver.internal.core.session;

import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverConfig;
import com.datastax.oss.driver.api.core.connection.BusyConnectionException;
import com.datastax.oss.driver.api.core.session.throttling.RequestThrottler;
import com.datastax.oss.driver.internal.core.adminrequest.AdminResult;
import com.datastax.oss.driver.internal.core.adminrequest.AdminRow;
import com.datastax.oss.driver.internal.core.adminrequest.ThrottledAdminRequestHandler;
import com.datastax.oss.driver.internal.core.channel.DriverChannel;
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
import com.datastax.oss.driver.internal.core.metrics.SessionMetricUpdater;
import com.datastax.oss.driver.internal.core.pool.ChannelPool;
import com.datastax.oss.driver.internal.core.session.RepreparePayload;
import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
import com.datastax.oss.driver.internal.core.util.concurrent.RunOrSchedule;
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
import com.datastax.oss.protocol.internal.Message;
import com.datastax.oss.protocol.internal.request.Prepare;
import com.datastax.oss.protocol.internal.request.Query;
import com.datastax.oss.protocol.internal.util.Bytes;
import io.netty.util.concurrent.EventExecutor;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
class ReprepareOnUp {
    private static final Logger LOG = LoggerFactory.getLogger(ReprepareOnUp.class);
    private static final Query QUERY_SERVER_IDS = new Query("SELECT prepared_id FROM system.prepared_statements");
    private final String logPrefix;
    private final ChannelPool pool;
    private final EventExecutor adminExecutor;
    private final Map<ByteBuffer, RepreparePayload> repreparePayloads;
    private final Runnable whenPrepared;
    private final boolean checkSystemTable;
    private final int maxStatements;
    private final int maxParallelism;
    private final Duration timeout;
    private final RequestThrottler throttler;
    private final SessionMetricUpdater metricUpdater;
    private Set<ByteBuffer> serverKnownIds;
    private Queue<RepreparePayload> toReprepare;
    private int runningWorkers;

    ReprepareOnUp(String logPrefix, ChannelPool pool, EventExecutor adminExecutor, Map<ByteBuffer, RepreparePayload> repreparePayloads, InternalDriverContext context, Runnable whenPrepared) {
        this.logPrefix = logPrefix;
        this.pool = pool;
        this.adminExecutor = adminExecutor;
        this.repreparePayloads = repreparePayloads;
        this.whenPrepared = whenPrepared;
        this.throttler = context.getRequestThrottler();
        DriverConfig config = context.getConfig();
        this.checkSystemTable = config.getDefaultProfile().getBoolean(DefaultDriverOption.REPREPARE_CHECK_SYSTEM_TABLE);
        this.timeout = config.getDefaultProfile().getDuration(DefaultDriverOption.REPREPARE_TIMEOUT);
        this.maxStatements = config.getDefaultProfile().getInt(DefaultDriverOption.REPREPARE_MAX_STATEMENTS);
        this.maxParallelism = config.getDefaultProfile().getInt(DefaultDriverOption.REPREPARE_MAX_PARALLELISM);
        this.metricUpdater = context.getMetricsFactory().getSessionUpdater();
    }

    void start() {
        if (this.repreparePayloads.isEmpty()) {
            LOG.debug("[{}] No statements to reprepare, done", (Object)this.logPrefix);
            this.whenPrepared.run();
        } else {
            if (LOG.isDebugEnabled()) {
                LOG.debug("[{}] {} statements to reprepare on newly added/up node", (Object)this.logPrefix, (Object)this.repreparePayloads.size());
            }
            if (this.checkSystemTable) {
                LOG.debug("[{}] Checking which statements the server knows about", (Object)this.logPrefix);
                this.queryAsync(QUERY_SERVER_IDS, Collections.emptyMap(), "QUERY system.prepared_statements").whenCompleteAsync(this::gatherServerIds, this.adminExecutor);
            } else {
                LOG.debug("[{}] {} is disabled, repreparing directly", (Object)this.logPrefix, (Object)DefaultDriverOption.REPREPARE_CHECK_SYSTEM_TABLE.getPath());
                RunOrSchedule.on(this.adminExecutor, () -> {
                    this.serverKnownIds = Collections.emptySet();
                    this.gatherPayloadsToReprepare();
                });
            }
        }
    }

    private void gatherServerIds(AdminResult rows, Throwable error) {
        assert (this.adminExecutor.inEventLoop());
        if (this.serverKnownIds == null) {
            this.serverKnownIds = new HashSet<ByteBuffer>();
        }
        if (error != null) {
            LOG.debug("[{}] Error querying system.prepared_statements ({}), proceeding without server ids", (Object)this.logPrefix, (Object)error.toString());
            this.gatherPayloadsToReprepare();
        } else {
            for (AdminRow row : rows) {
                this.serverKnownIds.add(row.getByteBuffer("prepared_id"));
            }
            if (rows.hasNextPage()) {
                LOG.debug("[{}] system.prepared_statements has more pages", (Object)this.logPrefix);
                rows.nextPage().whenCompleteAsync(this::gatherServerIds, this.adminExecutor);
            } else {
                LOG.debug("[{}] Gathered {} server ids, proceeding", (Object)this.logPrefix, (Object)this.serverKnownIds.size());
                this.gatherPayloadsToReprepare();
            }
        }
    }

    private void gatherPayloadsToReprepare() {
        assert (this.adminExecutor.inEventLoop());
        this.toReprepare = new ArrayDeque<RepreparePayload>();
        for (RepreparePayload payload : this.repreparePayloads.values()) {
            if (this.serverKnownIds.contains(payload.id)) {
                LOG.trace("[{}] Skipping statement {} because it is already known to the server", (Object)this.logPrefix, (Object)Bytes.toHexString(payload.id));
                continue;
            }
            if (this.maxStatements > 0 && this.toReprepare.size() == this.maxStatements) {
                LOG.debug("[{}] Limiting number of statements to reprepare to {} as configured, but there are more", (Object)this.logPrefix, (Object)this.maxStatements);
                break;
            }
            this.toReprepare.add(payload);
        }
        if (this.toReprepare.isEmpty()) {
            LOG.debug("[{}] No statements to reprepare that are not known by the server already, done", (Object)this.logPrefix);
            this.whenPrepared.run();
        } else {
            this.startWorkers();
        }
    }

    private void startWorkers() {
        assert (this.adminExecutor.inEventLoop());
        this.runningWorkers = Math.min(this.maxParallelism, this.toReprepare.size());
        LOG.debug("[{}] Repreparing {} statements with {} parallel workers", this.logPrefix, this.toReprepare.size(), this.runningWorkers);
        for (int i = 0; i < this.runningWorkers; ++i) {
            this.startWorker();
        }
    }

    private void startWorker() {
        assert (this.adminExecutor.inEventLoop());
        if (this.toReprepare.isEmpty()) {
            --this.runningWorkers;
            if (this.runningWorkers == 0) {
                LOG.debug("[{}] All workers finished, done", (Object)this.logPrefix);
                this.whenPrepared.run();
            }
        } else {
            RepreparePayload payload = this.toReprepare.poll();
            this.prepareAsync(new Prepare(payload.query, payload.keyspace == null ? null : payload.keyspace.asInternal()), payload.customPayload).handleAsync((result, error) -> {
                this.startWorker();
                return null;
            }, this.adminExecutor);
        }
    }

    @VisibleForTesting
    protected CompletionStage<AdminResult> queryAsync(Message message, Map<String, ByteBuffer> customPayload, String debugString) {
        DriverChannel channel = this.pool.next();
        if (channel == null) {
            return CompletableFutures.failedFuture(new BusyConnectionException("Found no channel to execute reprepare query"));
        }
        ThrottledAdminRequestHandler<AdminResult> reprepareHandler = ThrottledAdminRequestHandler.query(channel, false, message, customPayload, this.timeout, this.throttler, this.metricUpdater, this.logPrefix, debugString);
        return reprepareHandler.start();
    }

    @VisibleForTesting
    protected CompletionStage<ByteBuffer> prepareAsync(Message message, Map<String, ByteBuffer> customPayload) {
        DriverChannel channel = this.pool.next();
        if (channel == null) {
            return CompletableFutures.failedFuture(new BusyConnectionException("Found no channel to execute reprepare query"));
        }
        ThrottledAdminRequestHandler<ByteBuffer> reprepareHandler = ThrottledAdminRequestHandler.prepare(channel, false, message, customPayload, this.timeout, this.throttler, this.metricUpdater, this.logPrefix);
        return reprepareHandler.start();
    }
}

