package pt.unl.fct.di.novasys.sumo.utils;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

/* loaded from: input_file:pt/unl/fct/di/novasys/sumo/utils/ConcurrencyEngine.class */
/**
 * A small producer/consumer pipeline built on a bounded {@link BlockingQueue} and a fixed-size
 * thread pool.
 *
 * <p>{@link #start(Runnable, Consumer)} launches the consumer workers, runs the producer on the
 * calling thread, and then stops the workers by enqueueing one "poison pill" per worker. The
 * producer hands items to the workers through {@link #submit(Object)}.
 *
 * <p>Any queued item that {@code equals} the poison pill stops the worker that takes it, so real
 * payloads must never be equal to the value returned by the poison-pill supplier.
 *
 * <p>An engine is single-use: {@code start} shuts the worker pool down before returning, and a
 * shut-down {@link ExecutorService} rejects new tasks.
 *
 * @param <T> type of the items passed from the producer to the consumers
 */
public class ConcurrencyEngine<T> {
    /** Supplies the {@code "__STOP__"} sentinel for engines whose items are strings. */
    public static final Supplier<String> STRING_POISON_PILL_SUPPLIER = () -> "__STOP__";

    /** Seconds to wait for the workers before printing another progress message. */
    private static final int CONSUMER_TERMINATION_TIMEOUT = 10;

    private final int numConsumers;
    private final BlockingQueue<T> queue;
    private final ExecutorService consumerPool;
    private final Supplier<T> poisonPillSupplier;

    /**
     * Creates an engine; no worker is started until {@link #start(Runnable, Consumer)} is called.
     *
     * @param numConsumers number of consumer workers (and of threads in the pool)
     * @param queueCapacity maximum number of items the queue holds before {@link #submit(Object)}
     *     blocks
     * @param poisonPillSupplier supplies the sentinel that tells a worker to stop
     * @throws NullPointerException if {@code poisonPillSupplier} is {@code null}
     * @throws IllegalArgumentException if {@code numConsumers} or {@code queueCapacity} is not
     *     positive (raised by the underlying pool and queue)
     */
    public ConcurrencyEngine(int numConsumers, int queueCapacity, Supplier<T> poisonPillSupplier) {
        // Fail fast: a null supplier would otherwise only surface inside start(), after the
        // workers are already running.
        if (poisonPillSupplier == null) {
            throw new NullPointerException("poisonPillSupplier");
        }
        this.numConsumers = numConsumers;
        this.queue = new LinkedBlockingQueue<>(queueCapacity);
        this.consumerPool = Executors.newFixedThreadPool(numConsumers);
        this.poisonPillSupplier = poisonPillSupplier;
    }

    /**
     * Demo: one producer emitting 100000 messages (one every 100 ms) to 10 consumers that each
     * take 200 ms per message.
     *
     * @param strArr unused command-line arguments
     * @throws InterruptedException if the main thread is interrupted while the engine shuts down
     */
    public static void main(String[] strArr) throws InterruptedException {
        ConcurrencyEngine<String> engine =
                new ConcurrencyEngine<>(10, 10000, STRING_POISON_PILL_SUPPLIER);
        engine.start(() -> {
            for (int i = 1; i <= 100000; i++) {
                try {
                    String message = "Message " + i;
                    System.out.println("Produced: " + message);
                    engine.submit(message);
                    Thread.sleep(100L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    System.err.println("Producer interrupted.");
                    return;
                }
            }
        }, message -> {
            System.out.println("Consumed: " + message + " by " + Thread.currentThread().getName());
            try {
                Thread.sleep(200L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    /**
     * Starts the consumer workers, runs {@code producer} on the calling thread, then stops the
     * workers and waits until all of them have terminated.
     *
     * <p>The workers are always stopped, even when {@code producer} throws; in that case the
     * producer's exception is rethrown once the workers have been stopped.
     *
     * @param producer code that generates items, normally by calling {@link #submit(Object)}
     * @param consumer callback invoked on a worker thread for every item that is not the poison
     *     pill; a {@link RuntimeException} it throws is reported on {@code System.err} and the
     *     worker continues with the next item
     * @throws InterruptedException if the calling thread is interrupted while stopping the
     *     workers; the workers are interrupted too before the exception propagates
     */
    public void start(Runnable producer, Consumer<T> consumer) throws InterruptedException {
        for (int i = 0; i < this.numConsumers; i++) {
            this.consumerPool.submit(() -> consumeUntilPoisonPill(consumer));
        }
        try {
            producer.run();
        } finally {
            // Without this, a failing producer would leave the non-daemon workers blocked in
            // take() forever and keep the JVM alive.
            shutdownConsumers();
        }
    }

    /**
     * Enqueues an item for the consumers, blocking while the queue is full.
     *
     * @param t item to enqueue
     * @throws InterruptedException if interrupted while waiting for free queue capacity
     */
    public void submit(T t) throws InterruptedException {
        this.queue.put(t);
    }

    /** Worker loop: feeds queued items to {@code consumer} until a poison pill or an interrupt. */
    private void consumeUntilPoisonPill(Consumer<T> consumer) {
        while (true) {
            T item;
            try {
                item = this.queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            if (item.equals(this.poisonPillSupplier.get())) {
                return;
            }
            try {
                consumer.accept(item);
            } catch (RuntimeException e) {
                // One bad item must not kill the worker: its Future is never inspected, so the
                // failure would be lost silently and the remaining items left unprocessed.
                System.err.println("Consumer failed to process an item, continuing: " + e);
            }
        }
    }

    /** Sends one poison pill per worker, shuts the pool down and waits for it to terminate. */
    private void shutdownConsumers() throws InterruptedException {
        try {
            for (int i = 0; i < this.numConsumers; i++) {
                this.queue.put(this.poisonPillSupplier.get());
            }
            this.consumerPool.shutdown();
            while (!this.consumerPool.awaitTermination(CONSUMER_TERMINATION_TIMEOUT, TimeUnit.SECONDS)) {
                System.out.println("Waiting for consumer threads to finish...");
            }
            System.out.println("All consumers have finished processing and terminated.");
        } catch (InterruptedException | RuntimeException e) {
            // Orderly shutdown failed; interrupt the workers so none of them is leaked.
            this.consumerPool.shutdownNow();
            throw e;
        }
    }
}
