/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.servicebus;

import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

/**
 * One unit of synchronous receive work: a request for up to {@code numberToReceive} messages that are
 * forwarded to a downstream {@link FluxSink}.
 *
 * <p>The work ends when any of the following happens: the downstream sink is cancelled, the requested
 * number of messages has been delivered, an error is recorded, {@link #timeout()} is called, or no new
 * message arrives within {@link #TIMEOUT_BETWEEN_MESSAGES} after the most recent one.</p>
 */
class SynchronousReceiveWork implements AutoCloseable {
    /**
     * Once at least one message has been received, the work is completed if the next message does not
     * arrive within this interval.
     */
    private static final Duration TIMEOUT_BETWEEN_MESSAGES = Duration.ofMillis(1000L);

    private final ClientLogger logger = new ClientLogger(SynchronousReceiveWork.class);
    private final long id;
    private final AtomicInteger remaining;
    private final int numberToReceive;
    private final Duration timeout;
    private final FluxSink<ServiceBusReceivedMessage> emitter;
    private final FluxSink<ServiceBusReceivedMessage> messageReceivedSink;
    private final DirectProcessor<ServiceBusReceivedMessage> emitterProcessor;
    private final Disposable nextMessageSubscriber;

    // Written from the inter-message timer callback as well as from timeout(); volatile so that
    // isTerminal() observes the update regardless of which thread performed it.
    private volatile boolean workTimedOut = false;

    // Set through startedProcessing(); the meaning of "started" is decided by the caller.
    private boolean processingStarted;

    private volatile Throwable error = null;

    /**
     * Creates a new unit of receive work and arms the inter-message timeout.
     *
     * @param id Identifier for this work, used in log messages.
     * @param numberToReceive Maximum number of messages to deliver downstream.
     * @param timeout Overall timeout associated with this work. It is only stored here; see
     *     {@link #getTimeout()} and {@link #timeout()}.
     * @param emitter Sink that received messages are published to.
     */
    SynchronousReceiveWork(long id, int numberToReceive, Duration timeout,
        FluxSink<ServiceBusReceivedMessage> emitter) {
        this.id = id;
        this.remaining = new AtomicInteger(numberToReceive);
        this.numberToReceive = numberToReceive;
        this.timeout = timeout;
        this.emitter = emitter;
        this.emitterProcessor = DirectProcessor.create();
        this.messageReceivedSink = this.emitterProcessor.sink();

        // Every message pushed through messageReceivedSink switches to a fresh interval, which restarts
        // the countdown. The interval only gets to tick when no newer message has replaced it.
        this.nextMessageSubscriber = Flux.switchOnNext(
                this.emitterProcessor.map(messageContext -> Flux.interval(TIMEOUT_BETWEEN_MESSAGES)))
            .handle((delay, sink) -> {
                this.logger.info("[{}]: Timeout between the messages occurred. Completing the work.", id);
                sink.next(delay);
                // Mark the work terminal and stop the interval; otherwise it would keep ticking and
                // isTerminal() would keep reporting false although the emitter is already completed.
                this.finishOnTimeout();
            })
            .subscribe();
    }

    /**
     * Gets the identifier of this work.
     *
     * @return The identifier passed to the constructor.
     */
    long getId() {
        return this.id;
    }

    /**
     * Gets the overall timeout associated with this work.
     *
     * @return The timeout passed to the constructor.
     */
    Duration getTimeout() {
        return this.timeout;
    }

    /**
     * Gets the number of messages that were requested.
     *
     * @return The {@code numberToReceive} passed to the constructor.
     */
    int getNumberOfEvents() {
        return this.numberToReceive;
    }

    /**
     * Gets the number of messages that can still be delivered.
     *
     * @return The requested count minus the number of messages published via
     *     {@link #next(ServiceBusReceivedMessage)}.
     */
    int getRemaining() {
        return this.remaining.get();
    }

    /**
     * Indicates whether this work has reached a terminal state.
     *
     * @return {@code true} if the downstream sink is cancelled, no messages remain to be delivered, an error
     *     has been recorded, or the work timed out; {@code false} otherwise.
     */
    boolean isTerminal() {
        return this.emitter.isCancelled()
            || this.remaining.get() == 0
            || this.error != null
            || this.workTimedOut;
    }

    /**
     * Publishes a message downstream, restarts the inter-message timeout and decrements the remaining count.
     * Any exception thrown while publishing is logged and routed to {@link #error(Throwable)}.
     *
     * @param message Message to publish.
     */
    void next(ServiceBusReceivedMessage message) {
        try {
            this.emitter.next(message);
            // Feeding the processor is what restarts the inter-message countdown.
            this.messageReceivedSink.next(message);
            this.remaining.decrementAndGet();
        } catch (Exception e) {
            this.logger.warning("Exception occurred while publishing downstream.", e);
            this.error(e);
        }
    }

    /**
     * Completes the downstream sink and releases the inter-message timeout subscription.
     */
    void complete() {
        this.logger.info("[{}]: Completing task.", this.id);
        this.emitter.complete();
        this.close();
    }

    /**
     * Completes the work because its timeout elapsed: completes the downstream sink, marks the work as timed
     * out and releases the inter-message timeout subscription.
     */
    void timeout() {
        this.logger.info("[{}]: Work timeout occurred. Completing the work.", this.id);
        this.finishOnTimeout();
    }

    /**
     * Records the error, signals it to the downstream sink and releases the inter-message timeout
     * subscription.
     *
     * @param error Error that terminated the work.
     */
    void error(Throwable error) {
        this.error = error;
        this.emitter.error(error);
        this.close();
    }

    /**
     * Gets the error recorded for this work.
     *
     * @return The error passed to {@link #error(Throwable)}, or {@code null} if none was recorded.
     */
    Throwable getError() {
        return this.error;
    }

    /**
     * Marks this work as having started processing.
     */
    void startedProcessing() {
        this.processingStarted = true;
    }

    /**
     * Indicates whether {@link #startedProcessing()} has been called.
     *
     * @return {@code true} if processing was marked as started; {@code false} otherwise.
     */
    boolean isProcessingStarted() {
        return this.processingStarted;
    }

    /**
     * Disposes the inter-message timeout subscription if it is still active. Safe to call repeatedly.
     */
    @Override
    public void close() {
        // Read once into a local; the null check guards against a call arriving before the constructor
        // has finished assigning the field.
        final Disposable subscription = this.nextMessageSubscriber;
        if (subscription != null && !subscription.isDisposed()) {
            subscription.dispose();
        }
    }

    /** Shared tail of both timeout paths: complete downstream, mark the work timed out, stop the timer. */
    private void finishOnTimeout() {
        this.emitter.complete();
        this.workTimedOut = true;
        this.close();
    }
}

