package com.azure.spring.integration.servicebus.queue;

import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;
import com.azure.spring.integration.servicebus.DefaultServiceBusMessageProcessor;
import com.azure.spring.integration.servicebus.ServiceBusClientConfig;
import com.azure.spring.integration.servicebus.ServiceBusRuntimeException;
import com.azure.spring.integration.servicebus.ServiceBusTemplate;
import com.azure.spring.integration.servicebus.converter.ServiceBusMessageConverter;
import com.azure.spring.integration.servicebus.converter.ServiceBusMessageHeaders;
import com.azure.spring.integration.servicebus.factory.ServiceBusQueueClientFactory;
import com.azure.spring.integration.servicebus.health.Instrumentation;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.lang.NonNull;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

/* loaded from: input_file:com/azure/spring/integration/servicebus/queue/ServiceBusQueueTemplate.class */
public class ServiceBusQueueTemplate extends ServiceBusTemplate<ServiceBusQueueClientFactory> implements ServiceBusQueueOperation {
    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueTemplate.class);
    private static final String MSG_FAIL_CHECKPOINT = "Failed to checkpoint %s in queue '%s'";
    private static final String MSG_SUCCESS_CHECKPOINT = "Checkpointed %s in queue '%s' in %s mode";
    private final Set<String> subscribedQueues;

    public ServiceBusQueueTemplate(ServiceBusQueueClientFactory serviceBusQueueClientFactory) {
        super(serviceBusQueueClientFactory);
        this.subscribedQueues = ConcurrentHashMap.newKeySet();
    }

    public ServiceBusQueueTemplate(ServiceBusQueueClientFactory serviceBusQueueClientFactory, ServiceBusMessageConverter serviceBusMessageConverter) {
        super(serviceBusQueueClientFactory, serviceBusMessageConverter);
        this.subscribedQueues = ConcurrentHashMap.newKeySet();
    }

    protected void internalSubscribe(final String str, Consumer<Message<?>> consumer, Class<?> cls) {
        DefaultServiceBusMessageProcessor defaultServiceBusMessageProcessor = new DefaultServiceBusMessageProcessor(this.checkpointConfig, cls, consumer, this.messageConverter) { // from class: com.azure.spring.integration.servicebus.queue.ServiceBusQueueTemplate.1
            @Override // com.azure.spring.integration.servicebus.DefaultServiceBusMessageProcessor
            protected String buildCheckpointFailMessage(Message<?> message) {
                return String.format(ServiceBusQueueTemplate.MSG_FAIL_CHECKPOINT, message, str);
            }

            @Override // com.azure.spring.integration.servicebus.DefaultServiceBusMessageProcessor
            protected String buildCheckpointSuccessMessage(Message<?> message) {
                return String.format(ServiceBusQueueTemplate.MSG_SUCCESS_CHECKPOINT, message, str, ServiceBusQueueTemplate.this.getCheckpointConfig().getCheckpointMode());
            }
        };
        Instrumentation instrumentation = new Instrumentation(str, Instrumentation.Type.CONSUME);
        try {
            this.instrumentationManager.addHealthInstrumentation(instrumentation);
            ((ServiceBusQueueClientFactory) this.clientFactory).getOrCreateProcessor(str, this.clientConfig, defaultServiceBusMessageProcessor).start();
            this.instrumentationManager.getHealthInstrumentation(instrumentation).markStartedSuccessfully();
        } catch (Exception e) {
            this.instrumentationManager.getHealthInstrumentation(instrumentation).markStartFailed(e);
            LOGGER.error("ServiceBus processorClient startup failed, Caused by " + e.getMessage());
            throw new ServiceBusRuntimeException("ServiceBus processor client startup failed, Caused by " + e.getMessage(), e);
        }
    }

    @Override // com.azure.spring.integration.servicebus.queue.ServiceBusQueueOperation
    public void setClientConfig(@NonNull ServiceBusClientConfig serviceBusClientConfig) {
        this.clientConfig = serviceBusClientConfig;
    }

    @Override // com.azure.spring.integration.servicebus.queue.ServiceBusQueueOperation
    public <T> void deadLetter(String str, Message<T> message, String str2, String str3) {
        Assert.hasText(str, "destination can't be null or empty");
        ServiceBusReceivedMessageContext serviceBusReceivedMessageContext = (ServiceBusReceivedMessageContext) message.getHeaders().get(ServiceBusMessageHeaders.RECEIVED_MESSAGE_CONTEXT);
        if (serviceBusReceivedMessageContext != null) {
            serviceBusReceivedMessageContext.deadLetter();
        }
    }

    @Override // com.azure.spring.integration.servicebus.queue.ServiceBusQueueOperation
    public <T> void abandon(String str, Message<T> message) {
        Assert.hasText(str, "destination can't be null or empty");
        ServiceBusReceivedMessageContext serviceBusReceivedMessageContext = (ServiceBusReceivedMessageContext) message.getHeaders().get(ServiceBusMessageHeaders.RECEIVED_MESSAGE_CONTEXT);
        if (serviceBusReceivedMessageContext != null) {
            serviceBusReceivedMessageContext.abandon();
        }
    }

    public boolean subscribe(String str, @NonNull Consumer<Message<?>> consumer, @NonNull Class<?> cls) {
        Assert.hasText(str, "destination can't be null or empty");
        if (this.subscribedQueues.contains(str)) {
            return false;
        }
        this.subscribedQueues.add(str);
        internalSubscribe(str, consumer, cls);
        return true;
    }

    public boolean unsubscribe(String str) {
        return this.subscribedQueues.remove(str);
    }
}
