/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.servicebus;

import com.azure.core.annotation.ServiceClient;
import com.azure.core.util.IterableStream;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient;
import com.azure.messaging.servicebus.ServiceBusTransactionContext;
import com.azure.messaging.servicebus.SynchronousMessageSubscriber;
import com.azure.messaging.servicebus.SynchronousReceiveWork;
import com.azure.messaging.servicebus.models.AbandonOptions;
import com.azure.messaging.servicebus.models.CompleteOptions;
import com.azure.messaging.servicebus.models.DeadLetterOptions;
import com.azure.messaging.servicebus.models.DeferOptions;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

/**
 * Blocking facade over a {@link ServiceBusReceiverAsyncClient}.
 *
 * <p>Single-result operations delegate to the async client and wait on the returned publisher for at most the
 * operation timeout supplied at construction. Multi-result operations return an {@link IterableStream}.</p>
 *
 * <p>Unless a method states otherwise, overloads without a {@code sessionId} parameter use the session id taken
 * from the async client's receiver options.</p>
 */
@ServiceClient(builder = ServiceBusClientBuilder.class)
public final class ServiceBusReceiverClient implements AutoCloseable {
    private final ClientLogger logger = new ClientLogger(ServiceBusReceiverClient.class);
    /** Source of ids attached to each unit of synchronous receive work. */
    private final AtomicInteger idGenerator = new AtomicInteger();
    private final ServiceBusReceiverAsyncClient asyncClient;
    private final Duration operationTimeout;
    /** Subscriber shared by all {@code receiveMessages} calls; created on first use and cleared by {@link #close()}. */
    private final AtomicReference<SynchronousMessageSubscriber> synchronousMessageSubscriber =
        new AtomicReference<>();

    /**
     * Creates a synchronous receiver that delegates to {@code asyncClient}.
     *
     * @param asyncClient Asynchronous client that performs the operations.
     * @param operationTimeout Maximum time to wait on each blocking operation.
     * @throws NullPointerException if {@code asyncClient} or {@code operationTimeout} is null.
     */
    ServiceBusReceiverClient(ServiceBusReceiverAsyncClient asyncClient, Duration operationTimeout) {
        this.asyncClient = Objects.requireNonNull(asyncClient, "'asyncClient' cannot be null.");
        this.operationTimeout = Objects.requireNonNull(operationTimeout, "'operationTimeout' cannot be null.");
    }

    /**
     * Gets the fully qualified namespace reported by the async client.
     *
     * @return The fully qualified namespace.
     */
    public String getFullyQualifiedNamespace() {
        return this.asyncClient.getFullyQualifiedNamespace();
    }

    /**
     * Gets the entity path reported by the async client.
     *
     * @return The entity path.
     */
    public String getEntityPath() {
        return this.asyncClient.getEntityPath();
    }

    /**
     * Abandons {@code message} and waits for the operation to finish.
     *
     * @param message Message to abandon.
     */
    public void abandon(ServiceBusReceivedMessage message) {
        this.asyncClient.abandon(message).block(this.operationTimeout);
    }

    /**
     * Abandons {@code message} with the given options and waits for the operation to finish.
     *
     * @param message Message to abandon.
     * @param options Options forwarded to the async client.
     */
    public void abandon(ServiceBusReceivedMessage message, AbandonOptions options) {
        this.asyncClient.abandon(message, options).block(this.operationTimeout);
    }

    /**
     * Completes {@code message} and waits for the operation to finish.
     *
     * @param message Message to complete.
     */
    public void complete(ServiceBusReceivedMessage message) {
        this.asyncClient.complete(message).block(this.operationTimeout);
    }

    /**
     * Completes {@code message} with the given options and waits for the operation to finish.
     *
     * @param message Message to complete.
     * @param options Options forwarded to the async client.
     */
    public void complete(ServiceBusReceivedMessage message, CompleteOptions options) {
        this.asyncClient.complete(message, options).block(this.operationTimeout);
    }

    /**
     * Defers {@code message} and waits for the operation to finish.
     *
     * @param message Message to defer.
     */
    public void defer(ServiceBusReceivedMessage message) {
        this.asyncClient.defer(message).block(this.operationTimeout);
    }

    /**
     * Defers {@code message} with the given options and waits for the operation to finish.
     *
     * @param message Message to defer.
     * @param options Options forwarded to the async client.
     */
    public void defer(ServiceBusReceivedMessage message, DeferOptions options) {
        this.asyncClient.defer(message, options).block(this.operationTimeout);
    }

    /**
     * Dead-letters {@code message} and waits for the operation to finish.
     *
     * @param message Message to dead-letter.
     */
    public void deadLetter(ServiceBusReceivedMessage message) {
        this.asyncClient.deadLetter(message).block(this.operationTimeout);
    }

    /**
     * Dead-letters {@code message} with the given options and waits for the operation to finish.
     *
     * @param message Message to dead-letter.
     * @param options Options forwarded to the async client.
     */
    public void deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions options) {
        this.asyncClient.deadLetter(message, options).block(this.operationTimeout);
    }

    /**
     * Gets the state of the session configured in the receiver options.
     *
     * @return The session state returned by the async client.
     */
    public byte[] getSessionState() {
        return this.getSessionState(this.asyncClient.getReceiverOptions().getSessionId());
    }

    /**
     * Peeks a message using the session id configured in the receiver options.
     *
     * @return The peeked message, as returned by the async client.
     */
    public ServiceBusReceivedMessage peekMessage() {
        return this.peekMessage(this.asyncClient.getReceiverOptions().getSessionId());
    }

    /**
     * Peeks a message for the given session.
     *
     * @param sessionId Session id forwarded to the async client.
     * @return The peeked message, as returned by the async client.
     */
    ServiceBusReceivedMessage peekMessage(String sessionId) {
        return this.asyncClient.peekMessage(sessionId).block(this.operationTimeout);
    }

    /**
     * Peeks a message at {@code sequenceNumber} using the session id configured in the receiver options.
     *
     * @param sequenceNumber Sequence number forwarded to the async client.
     * @return The peeked message, as returned by the async client.
     */
    public ServiceBusReceivedMessage peekMessage(long sequenceNumber) {
        return this.peekMessage(sequenceNumber, this.asyncClient.getReceiverOptions().getSessionId());
    }

    /**
     * Peeks a message at {@code sequenceNumber} for the given session.
     *
     * @param sequenceNumber Sequence number forwarded to the async client.
     * @param sessionId Session id forwarded to the async client.
     * @return The peeked message, as returned by the async client.
     */
    ServiceBusReceivedMessage peekMessage(long sequenceNumber, String sessionId) {
        return this.asyncClient.peekMessage(sequenceNumber, sessionId).block(this.operationTimeout);
    }

    /**
     * Peeks up to {@code maxMessages} messages using the session id configured in the receiver options.
     *
     * @param maxMessages Maximum number of messages to peek; must be positive.
     * @return A stream over the peeked messages.
     * @throws IllegalArgumentException if {@code maxMessages} is zero or negative.
     */
    public IterableStream<ServiceBusReceivedMessage> peekMessages(int maxMessages) {
        return this.peekMessages(maxMessages, this.asyncClient.getReceiverOptions().getSessionId());
    }

    /**
     * Peeks up to {@code maxMessages} messages for the given session. The peek starts before this method
     * returns and its result is shared with the returned stream.
     *
     * @param maxMessages Maximum number of messages to peek; must be positive.
     * @param sessionId Session id forwarded to the async client.
     * @return A stream over the peeked messages.
     * @throws IllegalArgumentException if {@code maxMessages} is zero or negative.
     */
    IterableStream<ServiceBusReceivedMessage> peekMessages(int maxMessages, String sessionId) {
        this.requirePositiveMaxMessages(maxMessages);
        return this.startEagerly(this.asyncClient.peekMessages(maxMessages, sessionId));
    }

    /**
     * Peeks up to {@code maxMessages} messages starting at {@code sequenceNumber}, using the session id
     * configured in the receiver options.
     *
     * @param maxMessages Maximum number of messages to peek; must be positive.
     * @param sequenceNumber Sequence number forwarded to the async client.
     * @return A stream over the peeked messages.
     * @throws IllegalArgumentException if {@code maxMessages} is zero or negative.
     */
    public IterableStream<ServiceBusReceivedMessage> peekMessages(int maxMessages, long sequenceNumber) {
        return this.peekMessages(maxMessages, sequenceNumber, this.asyncClient.getReceiverOptions().getSessionId());
    }

    /**
     * Peeks up to {@code maxMessages} messages starting at {@code sequenceNumber} for the given session. The
     * peek starts before this method returns and its result is shared with the returned stream.
     *
     * @param maxMessages Maximum number of messages to peek; must be positive.
     * @param sequenceNumber Sequence number forwarded to the async client.
     * @param sessionId Session id forwarded to the async client.
     * @return A stream over the peeked messages.
     * @throws IllegalArgumentException if {@code maxMessages} is zero or negative.
     */
    IterableStream<ServiceBusReceivedMessage> peekMessages(int maxMessages, long sequenceNumber, String sessionId) {
        this.requirePositiveMaxMessages(maxMessages);
        return this.startEagerly(this.asyncClient.peekMessages(maxMessages, sequenceNumber, sessionId));
    }

    /**
     * Receives up to {@code maxMessages} messages, using the operation timeout as the maximum wait time.
     *
     * @param maxMessages Maximum number of messages to receive; must be positive.
     * @return A stream over the received messages.
     * @throws IllegalArgumentException if {@code maxMessages} is zero or negative.
     */
    public IterableStream<ServiceBusReceivedMessage> receiveMessages(int maxMessages) {
        return this.receiveMessages(maxMessages, this.operationTimeout);
    }

    /**
     * Receives up to {@code maxMessages} messages, waiting at most {@code maxWaitTime}.
     *
     * <p>The returned stream is lazy: a unit of receive work is queued each time the underlying publisher is
     * subscribed to, not when this method returns.</p>
     *
     * @param maxMessages Maximum number of messages to receive; must be positive.
     * @param maxWaitTime Maximum time to wait for messages; must be positive.
     * @return A stream over the received messages.
     * @throws IllegalArgumentException if {@code maxMessages} is zero or negative, or {@code maxWaitTime} is
     *     zero or negative.
     * @throws NullPointerException if {@code maxWaitTime} is null.
     */
    public IterableStream<ServiceBusReceivedMessage> receiveMessages(int maxMessages, Duration maxWaitTime) {
        this.requirePositiveMaxMessages(maxMessages);
        if (Objects.isNull(maxWaitTime)) {
            throw this.logger.logExceptionAsError(new NullPointerException("'maxWaitTime' cannot be null."));
        }
        if (maxWaitTime.isNegative() || maxWaitTime.isZero()) {
            throw this.logger.logExceptionAsError(new IllegalArgumentException(
                "'maxWaitTime' cannot be zero or less. maxWaitTime: " + maxWaitTime));
        }
        Flux<ServiceBusReceivedMessage> messages =
            Flux.create(emitter -> this.queueWork(maxMessages, maxWaitTime, emitter));
        return new IterableStream<>(messages);
    }

    /**
     * Receives the deferred message with {@code sequenceNumber}, using the session id configured in the
     * receiver options.
     *
     * @param sequenceNumber Sequence number forwarded to the async client.
     * @return The deferred message, as returned by the async client.
     */
    public ServiceBusReceivedMessage receiveDeferredMessage(long sequenceNumber) {
        return this.receiveDeferredMessage(sequenceNumber, this.asyncClient.getReceiverOptions().getSessionId());
    }

    /**
     * Receives the deferred message with {@code sequenceNumber} for the given session.
     *
     * @param sequenceNumber Sequence number forwarded to the async client.
     * @param sessionId Session id forwarded to the async client.
     * @return The deferred message, as returned by the async client.
     */
    ServiceBusReceivedMessage receiveDeferredMessage(long sequenceNumber, String sessionId) {
        return this.asyncClient.receiveDeferredMessage(sequenceNumber, sessionId).block(this.operationTimeout);
    }

    /**
     * Receives the deferred messages with the given sequence numbers, using the session id configured in the
     * receiver options.
     *
     * @param sequenceNumbers Sequence numbers forwarded to the async client.
     * @return A stream over the deferred messages.
     */
    public IterableStream<ServiceBusReceivedMessage> receiveDeferredMessageBatch(Iterable<Long> sequenceNumbers) {
        return this.receiveDeferredMessageBatch(sequenceNumbers,
            this.asyncClient.getReceiverOptions().getSessionId());
    }

    /**
     * Receives the deferred messages with the given sequence numbers for the given session. The request starts
     * before this method returns and its result is shared with the returned stream.
     *
     * @param sequenceNumbers Sequence numbers forwarded to the async client.
     * @param sessionId Session id forwarded to the async client.
     * @return A stream over the deferred messages.
     */
    IterableStream<ServiceBusReceivedMessage> receiveDeferredMessageBatch(Iterable<Long> sequenceNumbers,
        String sessionId) {
        return this.startEagerly(this.asyncClient.receiveDeferredMessages(sequenceNumbers, sessionId));
    }

    /**
     * Renews the lock on {@code message} and waits for the result.
     *
     * @param message Message whose lock is renewed.
     * @return The value produced by the async client's renewal.
     */
    public OffsetDateTime renewMessageLock(ServiceBusReceivedMessage message) {
        return this.asyncClient.renewMessageLock(message).block(this.operationTimeout);
    }

    /**
     * Starts renewal of the lock on {@code message} in the background; this method does not block.
     *
     * @param message Message whose lock is renewed. A null message is logged as the token {@code "null"} and
     *     still forwarded to the async client.
     * @param maxLockRenewalDuration Duration forwarded to the async client.
     * @param onError Callback for renewal failures; when null, failures are logged as warnings.
     */
    public void renewMessageLock(ServiceBusReceivedMessage message, Duration maxLockRenewalDuration,
        Consumer<Throwable> onError) {
        String lockToken = message != null ? message.getLockToken() : "null";
        Consumer<Throwable> throwableConsumer = onError != null
            ? onError
            : error -> this.logger.warning("Exception occurred while renewing lock token '{}'.", lockToken, error);
        this.asyncClient.renewMessageLock(message, maxLockRenewalDuration).subscribe(
            v -> this.logger.verbose("Completed renewing lock token: '{}'", lockToken),
            throwableConsumer,
            () -> this.logger.verbose("Auto message lock renewal operation completed: {}", lockToken));
    }

    /**
     * Renews the lock on the session configured in the receiver options and waits for the result.
     *
     * @return The value produced by the async client's renewal.
     */
    public OffsetDateTime renewSessionLock() {
        return this.renewSessionLock(this.asyncClient.getReceiverOptions().getSessionId());
    }

    /**
     * Starts renewal of the session lock in the background; this method does not block.
     *
     * @param maxLockRenewalDuration Duration forwarded to the async client.
     * @param onError Callback for renewal failures; when null, failures are logged as warnings.
     */
    public void renewSessionLock(Duration maxLockRenewalDuration, Consumer<Throwable> onError) {
        this.renewSessionLock(this.asyncClient.getReceiverOptions().getSessionId(), maxLockRenewalDuration, onError);
    }

    /**
     * Sets the state of the session configured in the receiver options and waits for the operation to finish.
     *
     * @param sessionState State forwarded to the async client.
     */
    public void setSessionState(byte[] sessionState) {
        this.setSessionState(this.asyncClient.getReceiverOptions().getSessionId(), sessionState);
    }

    /**
     * Creates a transaction and waits for the result.
     *
     * @return The transaction context returned by the async client.
     */
    public ServiceBusTransactionContext createTransaction() {
        return this.asyncClient.createTransaction().block(this.operationTimeout);
    }

    /**
     * Commits the given transaction and waits for the operation to finish.
     *
     * @param transactionContext Transaction to commit.
     */
    public void commitTransaction(ServiceBusTransactionContext transactionContext) {
        this.asyncClient.commitTransaction(transactionContext).block(this.operationTimeout);
    }

    /**
     * Rolls back the given transaction and waits for the operation to finish.
     *
     * @param transactionContext Transaction to roll back.
     */
    public void rollbackTransaction(ServiceBusTransactionContext transactionContext) {
        this.asyncClient.rollbackTransaction(transactionContext).block(this.operationTimeout);
    }

    /**
     * Disposes the shared receive subscriber, if one exists and is not already disposed, then closes the
     * async client.
     */
    @Override
    public void close() {
        SynchronousMessageSubscriber messageSubscriber = this.synchronousMessageSubscriber.getAndSet(null);
        if (messageSubscriber != null && !messageSubscriber.isDisposed()) {
            messageSubscriber.dispose();
        }
        this.asyncClient.close();
    }

    /**
     * Builds a unit of receive work for {@code emitter} and hands it to the shared subscriber, creating and
     * subscribing that subscriber on first use.
     *
     * @param maximumMessageCount Maximum number of messages the caller asked for.
     * @param maxWaitTime Maximum time the work may wait for messages.
     * @param emitter Sink passed to the work item.
     */
    private void queueWork(int maximumMessageCount, Duration maxWaitTime,
        FluxSink<ServiceBusReceivedMessage> emitter) {
        long id = this.idGenerator.getAndIncrement();
        int prefetch = this.asyncClient.getReceiverOptions().getPrefetchCount();
        // A non-zero prefetch count caps how many messages are requested for this work item.
        int toRequest = prefetch != 0 ? Math.min(maximumMessageCount, prefetch) : maximumMessageCount;
        SynchronousReceiveWork work = new SynchronousReceiveWork(id, toRequest, maxWaitTime, emitter);
        SynchronousMessageSubscriber messageSubscriber = this.synchronousMessageSubscriber.get();
        if (messageSubscriber == null) {
            SynchronousMessageSubscriber newSubscriber = new SynchronousMessageSubscriber(toRequest, work);
            if (!this.synchronousMessageSubscriber.compareAndSet(null, newSubscriber)) {
                // Another caller installed a subscriber first: discard ours and queue on the winner.
                newSubscriber.dispose();
                // NOTE(review): this can be null if close() runs between the failed compareAndSet and this
                // read, which would throw a NullPointerException below - confirm whether that needs handling.
                SynchronousMessageSubscriber existing = this.synchronousMessageSubscriber.get();
                existing.queueWork(work);
            } else {
                this.asyncClient.receiveMessagesNoBackPressure().subscribeWith(newSubscriber);
            }
        } else {
            messageSubscriber.queueWork(work);
        }
        this.logger.verbose("[{}] Receive request queued up.", work.getId());
    }

    /**
     * Applies the operation timeout to {@code source}, starts it immediately and exposes the result as an
     * {@link IterableStream}.
     *
     * <p>The publisher is cached so that the eager subscription and every later iteration of the returned
     * stream share a single upstream subscription. Subscribing to the uncached publisher twice would run the
     * operation once for the eager subscription (whose results were discarded) and again for the iteration.</p>
     *
     * @param source Publisher produced by the async client.
     * @return A stream that replays the items, or the error, of the single execution.
     */
    private IterableStream<ServiceBusReceivedMessage> startEagerly(Flux<ServiceBusReceivedMessage> source) {
        Flux<ServiceBusReceivedMessage> shared = source.timeout(this.operationTimeout).cache();
        // The error callback keeps a failure from being reported as a dropped error; the cached publisher
        // still replays that failure to whoever iterates the returned stream.
        shared.subscribe(
            message -> { },
            error -> this.logger.verbose("Eagerly started operation failed; the error is replayed to the stream.",
                error));
        return new IterableStream<>(shared);
    }

    /**
     * Rejects a non-positive message count.
     *
     * @param maxMessages Value to check.
     * @throws IllegalArgumentException if {@code maxMessages} is zero or negative.
     */
    private void requirePositiveMaxMessages(int maxMessages) {
        if (maxMessages <= 0) {
            throw this.logger.logExceptionAsError(new IllegalArgumentException(
                "'maxMessages' cannot be less than or equal to 0. maxMessages: " + maxMessages));
        }
    }

    /**
     * Renews the lock on the given session and waits for the result.
     *
     * @param sessionId Session id forwarded to the async client.
     * @return The value produced by the async client's renewal.
     */
    OffsetDateTime renewSessionLock(String sessionId) {
        return this.asyncClient.renewSessionLock(sessionId).block(this.operationTimeout);
    }

    /**
     * Starts renewal of a session lock in the background; this method does not block.
     *
     * @param sessionId Session id used in the log messages.
     * @param maxLockRenewalDuration Duration forwarded to the async client.
     * @param onError Callback for renewal failures; when null, failures are logged as warnings.
     */
    void renewSessionLock(String sessionId, Duration maxLockRenewalDuration, Consumer<Throwable> onError) {
        Consumer<Throwable> throwableConsumer = onError != null
            ? onError
            : error -> this.logger.warning("Exception occurred while renewing session: '{}'.", sessionId, error);
        // NOTE(review): sessionId is only logged; it is not passed to the async client, so the renewal targets
        // whatever session the async client resolves itself. Confirm whether a session-id overload should be
        // called here instead.
        this.asyncClient.renewSessionLock(maxLockRenewalDuration).subscribe(
            v -> this.logger.verbose("Completed renewing session: '{}'", sessionId),
            throwableConsumer,
            () -> this.logger.verbose("Auto session lock renewal operation completed: {}", sessionId));
    }

    /**
     * Sets the state of the given session and waits for the operation to finish.
     *
     * @param sessionId Session id forwarded to the async client.
     * @param sessionState State forwarded to the async client.
     */
    void setSessionState(String sessionId, byte[] sessionState) {
        this.asyncClient.setSessionState(sessionId, sessionState).block(this.operationTimeout);
    }

    /**
     * Gets the state of the given session and waits for the result.
     *
     * @param sessionId Session id forwarded to the async client.
     * @return The session state returned by the async client.
     */
    byte[] getSessionState(String sessionId) {
        return this.asyncClient.getSessionState(sessionId).block(this.operationTimeout);
    }
}

