package com.azure.spring.integration.servicebus;

import com.azure.messaging.servicebus.ServiceBusErrorContext;
import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;
import com.azure.spring.integration.core.AzureCheckpointer;
import com.azure.spring.integration.core.api.CheckpointConfig;
import com.azure.spring.integration.core.api.CheckpointMode;
import com.azure.spring.integration.servicebus.converter.ServiceBusMessageConverter;
import com.azure.spring.integration.servicebus.converter.ServiceBusMessageHeaders;
import java.util.HashMap;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;

/* loaded from: input_file:com/azure/spring/integration/servicebus/DefaultServiceBusMessageProcessor.class */
/**
 * Default {@link ServiceBusMessageProcessor} that converts each received Service Bus message into a
 * Spring {@link Message}, hands it to the configured consumer and checkpoints it according to the
 * configured {@link CheckpointMode}.
 *
 * <ul>
 *   <li>{@link CheckpointMode#MANUAL}: an {@link AzureCheckpointer} is exposed to the consumer through
 *       the {@code azure_checkpointer} header; this class does not checkpoint by itself.</li>
 *   <li>{@link CheckpointMode#RECORD}: the message is completed after the consumer returns normally.</li>
 * </ul>
 */
public class DefaultServiceBusMessageProcessor implements ServiceBusMessageProcessor<ServiceBusReceivedMessageContext, ServiceBusErrorContext> {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultServiceBusMessageProcessor.class);
    private static final String MSG_FAIL_CHECKPOINT = "Failed to checkpoint %s";
    private static final String MSG_SUCCESS_CHECKPOINT = "Checkpointed %s in %s mode";
    private final CheckpointConfig checkpointConfig;
    private final Class<?> payloadType;
    private final Consumer<Message<?>> consumer;
    private final ServiceBusMessageConverter messageConverter;

    /**
     * Creates a processor.
     *
     * @param checkpointConfig supplies the checkpoint mode consulted for every message
     * @param payloadType target payload type passed to the message converter
     * @param consumer callback that receives every converted message
     * @param messageConverter converter from the Service Bus message to a Spring {@link Message}
     */
    public DefaultServiceBusMessageProcessor(CheckpointConfig checkpointConfig, Class<?> payloadType, Consumer<Message<?>> consumer, ServiceBusMessageConverter messageConverter) {
        this.consumer = consumer;
        this.payloadType = payloadType;
        this.checkpointConfig = checkpointConfig;
        this.messageConverter = messageConverter;
    }

    /**
     * Returns the error callback. Errors are logged rather than silently dropped so that receive and
     * settlement failures are visible; the callback never throws.
     *
     * @return a consumer that logs the reported error context
     */
    @Override
    public Consumer<ServiceBusErrorContext> processError() {
        return errorContext -> {
            if (LOGGER.isErrorEnabled()) {
                // The trailing Throwable argument is rendered by SLF4J as the stack trace.
                LOGGER.error("Error occurred on Service Bus entity '{}' (source: {})",
                    errorContext.getEntityPath(), errorContext.getErrorSource(), errorContext.getException());
            }
        };
    }

    /**
     * Returns the message callback. For each received message it builds the headers, converts the
     * message, invokes the consumer and, in {@link CheckpointMode#RECORD} mode, completes the message.
     * If the consumer throws, the exception propagates and no checkpoint is attempted here.
     *
     * @return a consumer that processes one received message context
     */
    @Override
    public Consumer<ServiceBusReceivedMessageContext> processMessage() {
        return messageContext -> {
            AzureCheckpointer checkpointer =
                new AzureCheckpointer(() -> success(messageContext), () -> fail(messageContext));

            HashMap<String, Object> headers = new HashMap<>();
            headers.put(ServiceBusMessageHeaders.RECEIVED_MESSAGE_CONTEXT, messageContext);
            if (this.checkpointConfig.getCheckpointMode() == CheckpointMode.MANUAL) {
                // In manual mode the consumer is responsible for settling the message.
                headers.put("azure_checkpointer", checkpointer);
            }

            Message<?> message = this.messageConverter.toMessage(messageContext.getMessage(), new MessageHeaders(headers), this.payloadType);
            this.consumer.accept(message);

            if (this.checkpointConfig.getCheckpointMode() == CheckpointMode.RECORD) {
                checkpointer.success().whenComplete((ignored, failure) -> checkpointHandler(message, failure));
            }
        };
    }

    /**
     * Completes the message asynchronously.
     *
     * @param messageContext context of the message to complete; must not be {@code null}
     * @return a future that finishes when the {@code complete} call has run
     */
    private CompletableFuture<Void> success(ServiceBusReceivedMessageContext messageContext) {
        Objects.requireNonNull(messageContext);
        return CompletableFuture.runAsync(messageContext::complete);
    }

    /**
     * Abandons the message asynchronously.
     *
     * @param messageContext context of the message to abandon; must not be {@code null}
     * @return a future that finishes when the {@code abandon} call has run
     */
    private CompletableFuture<Void> fail(ServiceBusReceivedMessageContext messageContext) {
        Objects.requireNonNull(messageContext);
        return CompletableFuture.runAsync(messageContext::abandon);
    }

    /**
     * Logs the outcome of a checkpoint attempt: a warning with the cause on failure, a debug line on
     * success.
     *
     * @param message the message that was checkpointed
     * @param failure the checkpoint failure, or {@code null} when the checkpoint succeeded
     */
    private void checkpointHandler(Message<?> message, Throwable failure) {
        if (failure != null) {
            // Level guards avoid formatting the whole message when the level is disabled.
            if (LOGGER.isWarnEnabled()) {
                LOGGER.warn(buildCheckpointFailMessage(message), failure);
            }
            return;
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(buildCheckpointSuccessMessage(message));
        }
    }

    /**
     * Builds the log text used when checkpointing a message failed.
     *
     * @param message the message that failed to checkpoint
     * @return the formatted failure text
     */
    protected String buildCheckpointFailMessage(Message<?> message) {
        return String.format(MSG_FAIL_CHECKPOINT, message);
    }

    /**
     * Builds the log text used when a message was checkpointed successfully.
     *
     * @param message the message that was checkpointed
     * @return the formatted success text, including the current checkpoint mode
     */
    protected String buildCheckpointSuccessMessage(Message<?> message) {
        return String.format(MSG_SUCCESS_CHECKPOINT, message, this.checkpointConfig.getCheckpointMode());
    }
}
