/*
 * Decompiled with CFR 0.152.
 */
package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpConnection;
import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.AmqpEndpointStateUtil;
import com.azure.core.amqp.implementation.AmqpLoggingUtils;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.ReactorHandlerProvider;
import com.azure.core.amqp.implementation.ReactorProvider;
import com.azure.core.amqp.implementation.RequestResponseChannelClosedException;
import com.azure.core.amqp.implementation.RetryUtil;
import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler;
import com.azure.core.amqp.implementation.handler.SendLinkHandler;
import com.azure.core.util.AsyncCloseable;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.UnsignedLong;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Extendable;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.message.Message;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;

/**
 * A request/response channel built from one AMQP send link and one AMQP receive link that are
 * created on the same {@link Session}.
 *
 * <p>Requests are written to the send link with a generated message id and a reply-to address;
 * responses arriving on the receive link are matched back to the pending request through their
 * correlation id. The channel publishes a combined endpoint state that is emitted whenever the send
 * and receive link states agree.</p>
 */
public class RequestResponseChannel implements AsyncCloseable {
    private static final String MESSAGE_ID_KEY = "messageId";
    private static final String SEND_LINK_HANDLER_NAME = "SendLinkHandler";
    private static final String RECEIVE_LINK_HANDLER_NAME = "ReceiveLinkHandler";

    private final ClientLogger logger;
    private final Sender sendLink;
    private final Receiver receiveLink;
    private final SendLinkHandler sendLinkHandler;
    private final ReceiveLinkHandler receiveLinkHandler;
    private final SenderSettleMode senderSettleMode;
    // Combined state of both links; see updateEndpointState.
    private final Sinks.Many<AmqpEndpointState> endpointStates = Sinks.many().multicast().onBackpressureBuffer();
    private volatile AmqpEndpointState sendLinkState;
    private volatile AmqpEndpointState receiveLinkState;
    // Source of the message ids assigned to outgoing requests.
    private final AtomicLong requestId = new AtomicLong(0L);
    // Requests that were scheduled but have not been answered yet, keyed by request message id.
    private final ConcurrentSkipListMap<UnsignedLong, MonoSink<Message>> unconfirmedSends
        = new ConcurrentSkipListMap<>();
    // Starts at 2: one termination is expected for the send link and one for the receive link.
    private final AtomicInteger pendingLinkTerminations = new AtomicInteger(2);
    // Completes once both links reached a terminal state.
    private final Sinks.One<Void> closeMono = Sinks.one();
    private final AtomicBoolean hasError = new AtomicBoolean();
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final Disposable.Composite subscriptions;
    private final AmqpRetryOptions retryOptions;
    private final String replyTo;
    private final String activeEndpointTimeoutMessage;
    private final MessageSerializer messageSerializer;
    private final ReactorProvider provider;

    /**
     * Creates the channel, wires the link handlers, subscribes to their signals and schedules the
     * opening of both links on the reactor dispatcher.
     *
     * @param amqpConnection Connection whose first shutdown signal closes this channel.
     * @param connectionId Identifier of the connection; used for logging and handler creation.
     * @param fullyQualifiedNamespace Namespace passed to the link handlers.
     * @param linkName Base name of the links; ":sender" and ":receiver" are appended to it.
     * @param entityPath Address used as the send link target and the receive link source.
     * @param session Session the two links are created on.
     * @param retryOptions Retry options; the try timeout also bounds how long closing waits.
     * @param handlerProvider Factory for the send and receive link handlers.
     * @param provider Provider of the reactor dispatcher that link operations are scheduled on.
     * @param messageSerializer Serializer used to estimate the encoded size of a request.
     * @param senderSettleMode Sender settle mode applied to both links.
     * @param receiverSettleMode Receiver settle mode applied to the receive link.
     * @throws RuntimeException if opening the links cannot be scheduled on the reactor dispatcher.
     */
    protected RequestResponseChannel(AmqpConnection amqpConnection, String connectionId,
        String fullyQualifiedNamespace, String linkName, String entityPath, Session session,
        AmqpRetryOptions retryOptions, ReactorHandlerProvider handlerProvider, ReactorProvider provider,
        MessageSerializer messageSerializer, SenderSettleMode senderSettleMode,
        ReceiverSettleMode receiverSettleMode) {

        Map<String, Object> loggingContext = AmqpLoggingUtils.createContextWithConnectionId(connectionId);
        loggingContext.put("linkName", linkName);
        this.logger = new ClientLogger(RequestResponseChannel.class, loggingContext);

        this.retryOptions = retryOptions;
        this.provider = provider;
        this.senderSettleMode = senderSettleMode;
        this.activeEndpointTimeoutMessage = String.format(
            "RequestResponseChannel connectionId[%s], linkName[%s]: Waiting for send and receive handler to be ACTIVE",
            connectionId, linkName);
        this.replyTo = entityPath.replace("$", "") + "-client-reply-to";
        this.messageSerializer = messageSerializer;

        // Send link: targets the entity, with an empty source.
        this.sendLink = session.sender(linkName + ":sender");
        final Target senderTarget = new Target();
        senderTarget.setAddress(entityPath);
        this.sendLink.setTarget(senderTarget);
        this.sendLink.setSource(new Source());
        this.sendLink.setSenderSettleMode(senderSettleMode);
        this.sendLinkHandler = handlerProvider.createSendLinkHandler(connectionId, fullyQualifiedNamespace,
            linkName, entityPath);
        BaseHandler.setHandler(this.sendLink, this.sendLinkHandler);

        // Receive link: reads from the entity and advertises the reply-to address as its target.
        this.receiveLink = session.receiver(linkName + ":receiver");
        final Source receiverSource = new Source();
        receiverSource.setAddress(entityPath);
        this.receiveLink.setSource(receiverSource);
        final Target receiverTarget = new Target();
        receiverTarget.setAddress(this.replyTo);
        this.receiveLink.setTarget(receiverTarget);
        this.receiveLink.setSenderSettleMode(senderSettleMode);
        this.receiveLink.setReceiverSettleMode(receiverSettleMode);
        this.receiveLinkHandler = handlerProvider.createReceiveLinkHandler(connectionId,
            fullyQualifiedNamespace, linkName, entityPath);
        BaseHandler.setHandler(this.receiveLink, this.receiveLinkHandler);

        // Responses: decode each delivery and complete the matching pending request.
        final Disposable deliveries = this.receiveLinkHandler.getDeliveredMessages()
            .map(this::decodeDelivery)
            .subscribe(message -> {
                this.logger.atVerbose()
                    .addKeyValue(MESSAGE_ID_KEY, message.getCorrelationId())
                    .log("Settling message.");
                this.settleMessage(message);
            });

        final Disposable receiveStates = this.receiveLinkHandler.getEndpointStates().subscribe(
            state -> this.updateEndpointState(null, AmqpEndpointStateUtil.getConnectionState(state)),
            error -> {
                this.handleError(error, "Error in ReceiveLinkHandler.");
                this.onTerminalState(RECEIVE_LINK_HANDLER_NAME);
            },
            () -> {
                this.closeAsync().subscribe();
                this.onTerminalState(RECEIVE_LINK_HANDLER_NAME);
            });

        final Disposable sendStates = this.sendLinkHandler.getEndpointStates().subscribe(
            state -> this.updateEndpointState(AmqpEndpointStateUtil.getConnectionState(state), null),
            error -> {
                this.handleError(error, "Error in SendLinkHandler.");
                this.onTerminalState(SEND_LINK_HANDLER_NAME);
            },
            () -> {
                this.closeAsync().subscribe();
                this.onTerminalState(SEND_LINK_HANDLER_NAME);
            });

        // Only the first shutdown signal of the connection is acted on.
        final Disposable shutdown = amqpConnection.getShutdownSignals().next().flatMap(signal -> {
            this.logger.verbose("Shutdown signal received.");
            return this.closeAsync();
        }).subscribe();

        this.subscriptions = Disposables.composite(deliveries, receiveStates, sendStates, shutdown);

        try {
            this.provider.getReactorDispatcher().invoke(() -> {
                this.sendLink.open();
                this.receiveLink.open();
            });
        } catch (IOException | RejectedExecutionException e) {
            throw this.logger.logExceptionAsError(
                new RuntimeException("Unable to open send and receive link.", e));
        }
    }

    /**
     * Gets the combined endpoint states of the channel.
     *
     * @return A flux that emits a state whenever the send and receive link states are equal, errors
     *     when one of the link handlers errors, and completes once both links have terminated.
     */
    public Flux<AmqpEndpointState> getEndpointStates() {
        return endpointStates.asFlux();
    }

    /**
     * Closes the send and receive links.
     *
     * <p>The first invocation marks the channel as disposed and schedules closing both links on
     * the reactor dispatcher; if scheduling fails the links are closed directly. Every invocation
     * returns a mono that waits for both links to terminate. If that does not happen within the
     * configured try timeout, the channel is forced into its terminal state.</p>
     *
     * @return A mono that completes when the channel is closed.
     */
    @Override
    public Mono<Void> closeAsync() {
        final Mono<Void> closeOperationWithTimeout = closeMono.asMono()
            .timeout(retryOptions.getTryTimeout())
            .onErrorResume(TimeoutException.class, error -> Mono.fromRunnable(() -> {
                logger.info("Timed out waiting for RequestResponseChannel to complete closing. Manually closing.");
                onTerminalState(SEND_LINK_HANDLER_NAME);
                onTerminalState(RECEIVE_LINK_HANDLER_NAME);
            }))
            .subscribeOn(Schedulers.boundedElastic());

        if (isDisposed.getAndSet(true)) {
            logger.verbose("Channel already closed.");
            return closeOperationWithTimeout;
        }

        logger.verbose("Closing request/response channel.");

        return Mono.fromRunnable(() -> {
            try {
                provider.getReactorDispatcher().invoke(() -> {
                    logger.verbose("Closing send link and receive link.");
                    sendLink.close();
                    receiveLink.close();
                });
            } catch (IOException | RejectedExecutionException e) {
                // The dispatcher did not accept the work; close the links from this thread instead.
                logger.info("Unable to schedule close work. Closing manually.");
                sendLink.close();
                receiveLink.close();
            }
        }).subscribeOn(Schedulers.boundedElastic()).then(closeOperationWithTimeout);
    }

    /**
     * Gets whether closing of this channel has been requested.
     *
     * @return {@code true} once {@link #closeAsync()} has been invoked; {@code false} otherwise.
     */
    public boolean isDisposed() {
        return isDisposed.get();
    }

    /**
     * Sends a request without an explicit delivery state and waits for its response.
     *
     * @param message Request to send. Its message id and reply-to must not be set.
     * @return A mono that emits the response message.
     * @see #sendWithAck(Message, DeliveryState)
     */
    public Mono<Message> sendWithAck(Message message) {
        return sendWithAck(message, null);
    }

    /**
     * Sends a request and waits for its response.
     *
     * <p>The message is assigned a generated message id and this channel's reply-to address. Once
     * both links are active, the send is scheduled on the reactor dispatcher and one credit is
     * added to the receive link for the expected response.</p>
     *
     * @param message Request to send. Its message id and reply-to must not be set.
     * @param deliveryState Optional delivery state to set on the outgoing delivery; may be null.
     * @return A mono that emits the response message. It errors with
     *     {@link RequestResponseChannelClosedException} if the channel is disposed,
     *     {@link NullPointerException} if {@code message} is null, and
     *     {@link IllegalArgumentException} if the message id or reply-to is already set.
     */
    public Mono<Message> sendWithAck(Message message, DeliveryState deliveryState) {
        if (isDisposed()) {
            return FluxUtil.monoError(logger, new RequestResponseChannelClosedException());
        }
        if (message == null) {
            return FluxUtil.monoError(logger, new NullPointerException("message cannot be null"));
        }
        if (message.getMessageId() != null) {
            return FluxUtil.monoError(logger,
                new IllegalArgumentException("message.getMessageId() should be null"));
        }
        if (message.getReplyTo() != null) {
            return FluxUtil.monoError(logger,
                new IllegalArgumentException("message.getReplyTo() should be null"));
        }

        final UnsignedLong messageId = UnsignedLong.valueOf(requestId.incrementAndGet());
        message.setMessageId(messageId);
        message.setReplyTo(replyTo);

        final Mono<Void> onActiveEndpoints = Mono.when(
            sendLinkHandler.getEndpointStates().takeUntil(x -> x == EndpointState.ACTIVE),
            receiveLinkHandler.getEndpointStates().takeUntil(x -> x == EndpointState.ACTIVE));

        return RetryUtil.withRetry(onActiveEndpoints, retryOptions, activeEndpointTimeoutMessage)
            .then(Mono.create(sink -> {
                // Log the id that keys the pending-send map; this method does not set a
                // correlation id on the request.
                logger.atVerbose().addKeyValue(MESSAGE_ID_KEY, messageId).log("Scheduling on dispatcher.");

                // Register before scheduling so a response can always find its sink.
                unconfirmedSends.putIfAbsent(messageId, sink);

                try {
                    provider.getReactorDispatcher().invoke(() -> {
                        if (isDisposed()) {
                            // Drop the registration so teardown does not signal this sink again.
                            unconfirmedSends.remove(messageId);
                            sink.error(new RequestResponseChannelClosedException(
                                sendLink.getLocalState(), receiveLink.getLocalState()));
                            return;
                        }

                        final Delivery delivery = sendLink.delivery(
                            UUID.randomUUID().toString().replace("-", "").getBytes(StandardCharsets.UTF_8));

                        if (deliveryState != null) {
                            logger.atVerbose().addKeyValue("state", deliveryState).log("Setting delivery state.");
                            delivery.setMessageFormat(0);
                            delivery.disposition(deliveryState);
                        }

                        // Buffer is the serializer's size for the message plus 512 extra bytes.
                        final int payloadSize = messageSerializer.getSize(message) + 512;
                        final byte[] bytes = new byte[payloadSize];
                        final int encodedSize = message.encode(bytes, 0, payloadSize);

                        // One credit for the response to this request.
                        receiveLink.flow(1);
                        sendLink.send(bytes, 0, encodedSize);
                        delivery.settle();
                        sendLink.advance();
                    });
                } catch (IOException | RejectedExecutionException e) {
                    // The send was never scheduled, so no response can arrive for this id.
                    unconfirmedSends.remove(messageId);
                    sink.error(e);
                }
            }));
    }

    /**
     * Gets the error context of the receive link.
     *
     * @return The error context produced by the receive link handler for the receive link.
     */
    public AmqpErrorContext getErrorContext() {
        return receiveLinkHandler.getErrorContext(receiveLink);
    }

    /**
     * Reads the pending bytes of a delivery from the receive link and decodes them into a message.
     * When the sender settle mode is {@link SenderSettleMode#SETTLED}, the delivery is also marked
     * as accepted and settled.
     *
     * @param delivery Delivery to decode.
     * @return The decoded message.
     */
    protected Message decodeDelivery(Delivery delivery) {
        final Message response = Proton.message();
        final int msgSize = delivery.pending();
        final byte[] buffer = new byte[msgSize];
        final int read = receiveLink.recv(buffer, 0, msgSize);

        response.decode(buffer, 0, read);

        if (senderSettleMode == SenderSettleMode.SETTLED) {
            delivery.disposition(Accepted.getInstance());
            delivery.settle();
        }
        return response;
    }

    /**
     * Completes the pending request whose message id equals the correlation id of the response.
     * Responses that cannot be matched are logged and dropped.
     *
     * @param message Response received on the receive link.
     */
    private void settleMessage(Message message) {
        final String id = String.valueOf(message.getCorrelationId());

        final UnsignedLong correlationId;
        try {
            correlationId = UnsignedLong.valueOf(id);
        } catch (NumberFormatException e) {
            // A missing or non-numeric correlation id cannot belong to any request sent from here.
            // Throwing would terminate the delivered-messages subscription and strand all
            // later responses, so drop the message instead.
            logger.atWarning()
                .addKeyValue(MESSAGE_ID_KEY, id)
                .log("Received delivery with a correlation id that is not a valid unsigned long.");
            return;
        }

        final MonoSink<Message> sink = unconfirmedSends.remove(correlationId);
        if (sink == null) {
            logger.atWarning().addKeyValue(MESSAGE_ID_KEY, id).log("Received delivery without pending message.");
            return;
        }
        sink.success(message);
    }

    /**
     * Handles the first error reported by either link handler: publishes it on the endpoint
     * states, fails all pending requests and starts closing the channel. Later errors are ignored.
     *
     * @param error Error that occurred.
     * @param message Log message describing where the error came from.
     */
    private void handleError(Throwable error, String message) {
        if (hasError.getAndSet(true)) {
            return;
        }

        logger.atError().log("{} Disposing unconfirmed sends.", message, error);

        endpointStates.emitError(error, (signalType, emitResult) -> {
            AmqpLoggingUtils.addSignalTypeAndResult(logger.atWarning(), signalType, emitResult)
                .log("Could not emit error to sink.");
            return false;
        });

        terminateUnconfirmedSends(error);
        closeAsync().subscribe();
    }

    /**
     * Records that a link handler reached a terminal state. When the last pending termination is
     * recorded, the subscriptions are disposed, pending requests are failed, the endpoint states
     * are completed and the close signal is emitted.
     *
     * @param handlerName Name of the handler that terminated; used for logging.
     */
    private void onTerminalState(String handlerName) {
        if (pendingLinkTerminations.get() <= 0) {
            logger.atVerbose().log("Already disposed send/receive links.");
            return;
        }

        final int remaining = pendingLinkTerminations.decrementAndGet();
        logger.verbose("{} disposed. Remaining: {}", handlerName, remaining);

        if (remaining != 0) {
            return;
        }

        subscriptions.dispose();
        terminateUnconfirmedSends(new AmqpException(true,
            "The RequestResponseChannel didn't receive the acknowledgment for the send due receive link termination.",
            null));
        endpointStates.emitComplete((signalType, emitResult) ->
            onEmitSinkFailure(signalType, emitResult, "Could not emit complete signal."));
        closeMono.emitEmpty((signalType, emitResult) ->
            onEmitSinkFailure(signalType, emitResult, handlerName + ". Error closing mono."));
    }

    /**
     * Emit-failure handler that logs the failure and never retries.
     *
     * @param signalType Type of signal that failed to emit.
     * @param emitResult Result describing the failure.
     * @param message Message to log.
     * @return Always {@code false}, so the emission is not retried.
     */
    private boolean onEmitSinkFailure(SignalType signalType, Sinks.EmitResult emitResult, String message) {
        AmqpLoggingUtils.addSignalTypeAndResult(logger.atVerbose(), signalType, emitResult).log(message);
        return false;
    }

    /**
     * Stores the new state of one of the links and, when both links report the same state,
     * publishes that state. If {@code sendLinkState} is non-null, {@code receiveLinkState} is
     * ignored.
     *
     * @param sendLinkState New state of the send link, or null if it did not change.
     * @param receiveLinkState New state of the receive link, or null if it did not change.
     */
    private synchronized void updateEndpointState(AmqpEndpointState sendLinkState,
        AmqpEndpointState receiveLinkState) {
        if (sendLinkState != null) {
            this.sendLinkState = sendLinkState;
        } else if (receiveLinkState != null) {
            this.receiveLinkState = receiveLinkState;
        }

        logger.atVerbose()
            .addKeyValue("sendState", this.sendLinkState)
            .addKeyValue("receiveState", this.receiveLinkState)
            .log("Updating endpoint states.");

        if (this.sendLinkState == this.receiveLinkState) {
            endpointStates.emitNext(this.sendLinkState, Sinks.EmitFailureHandler.FAIL_FAST);
        }
    }

    /**
     * Fails every pending request with the given error and removes it from the pending map.
     *
     * @param error Error signalled to each pending request.
     */
    private void terminateUnconfirmedSends(Throwable error) {
        logger.verbose("Terminating {} unconfirmed sends (reason: {}).", unconfirmedSends.size(),
            error.getMessage());

        int count = 0;
        // Poll rather than iterate so each entry is removed atomically before it is signalled.
        for (Map.Entry<UnsignedLong, MonoSink<Message>> next = unconfirmedSends.pollFirstEntry();
            next != null;
            next = unconfirmedSends.pollFirstEntry()) {
            next.getValue().error(error);
            count++;
        }

        logger.atVerbose().log("completed the termination of {} unconfirmed sends (reason: {}).", count,
            error.getMessage());
    }
}

