/*
 * Decompiled with CFR 0.152.
 */
package com.datastax.dse.driver.internal.core.cql.reactive;

import com.datastax.dse.driver.internal.core.cql.reactive.EmptySubscription;
import com.datastax.dse.driver.internal.core.cql.reactive.ReactiveOperators;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Processor;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleUnicastProcessor<ElementT>
implements Processor<ElementT, ElementT>,
Subscription {
    private static final Logger LOG = LoggerFactory.getLogger(SimpleUnicastProcessor.class);
    private static final Object ON_COMPLETE = new Object();
    private final Queue<Object> queue = new ConcurrentLinkedDeque<Object>();
    private final AtomicBoolean once = new AtomicBoolean(false);
    private final AtomicInteger draining = new AtomicInteger(0);
    private final AtomicLong requested = new AtomicLong(0L);
    private volatile Subscriber<? super ElementT> subscriber;
    private volatile boolean cancelled;

    @Override
    public void subscribe(Subscriber<? super ElementT> subscriber) {
        Objects.requireNonNull(subscriber, "Subscriber cannot be null");
        if (this.once.compareAndSet(false, true)) {
            this.subscriber = subscriber;
            try {
                subscriber.onSubscribe(this);
            }
            catch (Throwable t2) {
                this.doOnError(new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.", t2));
            }
        } else {
            subscriber.onSubscribe(EmptySubscription.INSTANCE);
            subscriber.onError(new IllegalStateException("This publisher does not support multiple subscriptions"));
        }
    }

    @Override
    public void onSubscribe(Subscription s2) {
    }

    @Override
    public void onNext(ElementT value) {
        if (!this.cancelled) {
            this.queue.offer(value);
            this.drain();
        }
    }

    @Override
    public void onError(Throwable error) {
        if (!this.cancelled) {
            this.queue.offer(error);
            this.drain();
        }
    }

    @Override
    public void onComplete() {
        if (!this.cancelled) {
            this.queue.offer(ON_COMPLETE);
            this.drain();
        }
    }

    @Override
    public void request(long n) {
        if (!this.cancelled) {
            if (n < 1L) {
                this.doOnError(new IllegalArgumentException(this.subscriber + " violated the Reactive Streams rule 3.9 by requesting a non-positive number of elements."));
            } else {
                ReactiveOperators.addCap(this.requested, n);
                this.drain();
            }
        }
    }

    @Override
    public void cancel() {
        if (!this.cancelled) {
            this.cancelled = true;
            if (this.draining.getAndIncrement() == 0) {
                this.clear();
            }
        }
    }

    private void drain() {
        if (this.draining.getAndIncrement() != 0) {
            return;
        }
        int missed = 1;
        do {
            long emitted;
            long requested = this.requested.get();
            for (emitted = 0L; requested != emitted; ++emitted) {
                if (this.cancelled) {
                    this.clear();
                    return;
                }
                Object t2 = this.queue.poll();
                if (t2 == null) break;
                if (t2 instanceof Throwable) {
                    Throwable error = (Throwable)t2;
                    this.doOnError(error);
                    this.clear();
                    return;
                }
                if (t2 == ON_COMPLETE) {
                    this.doOnComplete();
                    this.clear();
                    return;
                }
                Object item = t2;
                this.doOnNext(item);
            }
            if (this.cancelled) {
                this.clear();
                return;
            }
            if (emitted == 0L) continue;
            ReactiveOperators.subCap(this.requested, emitted);
        } while ((missed = this.draining.addAndGet(-missed)) != 0);
    }

    private void doOnNext(@NonNull ElementT result) {
        try {
            this.subscriber.onNext(result);
        }
        catch (Throwable t2) {
            LOG.error(this.subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onNext.", t2);
            this.cancel();
        }
    }

    private void doOnComplete() {
        try {
            this.subscriber.onComplete();
        }
        catch (Throwable t2) {
            LOG.error(this.subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onComplete.", t2);
        }
        this.cancel();
    }

    private void doOnError(@NonNull Throwable error) {
        try {
            this.subscriber.onError(error);
        }
        catch (Throwable t2) {
            t2.addSuppressed(error);
            LOG.error(this.subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2);
        }
        this.cancel();
    }

    private void clear() {
        this.queue.clear();
        this.subscriber = null;
    }
}

