package com.azure.spring.integration.servicebus.factory;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpTransportType;
import com.azure.core.util.ClientOptions;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusErrorContext;
import com.azure.messaging.servicebus.ServiceBusProcessorClient;
import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;
import com.azure.messaging.servicebus.ServiceBusSenderAsyncClient;
import com.azure.spring.cloud.context.core.impl.ServiceBusNamespaceManager;
import com.azure.spring.cloud.context.core.impl.ServiceBusQueueManager;
import com.azure.spring.cloud.context.core.impl.ServiceBusTopicManager;
import com.azure.spring.cloud.context.core.impl.ServiceBusTopicSubscriptionManager;
import com.azure.spring.cloud.context.core.util.Constants;
import com.azure.spring.integration.servicebus.ServiceBusClientConfig;
import com.azure.spring.integration.servicebus.ServiceBusMessageProcessor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;

/* loaded from: input_file:com/azure/spring/integration/servicebus/factory/DefaultServiceBusQueueClientFactory.class */
public class DefaultServiceBusQueueClientFactory extends AbstractServiceBusSenderFactory implements ServiceBusQueueClientFactory, DisposableBean {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultServiceBusQueueClientFactory.class);
    private final Map<String, ServiceBusProcessorClient> processorClientMap;
    private final Map<String, ServiceBusSenderAsyncClient> senderClientMap;
    private final ServiceBusClientBuilder serviceBusClientBuilder;

    public DefaultServiceBusQueueClientFactory(String str) {
        this(str, AmqpTransportType.AMQP);
    }

    public DefaultServiceBusQueueClientFactory(String str, AmqpTransportType amqpTransportType) {
        super(str);
        this.processorClientMap = new ConcurrentHashMap();
        this.senderClientMap = new ConcurrentHashMap();
        this.serviceBusClientBuilder = new ServiceBusClientBuilder().connectionString(str).transportType(amqpTransportType).clientOptions(new ClientOptions().setApplicationId(Constants.SPRING_SERVICE_BUS_APPLICATION_ID));
    }

    private <K, V> void close(Map<K, V> map, Consumer<V> consumer) {
        map.values().forEach(obj -> {
            try {
                consumer.accept(obj);
            } catch (Exception e) {
                LOGGER.warn("Failed to clean service bus queue client factory", e);
            }
        });
    }

    public void destroy() {
        close(this.senderClientMap, (v0) -> {
            v0.close();
        });
        close(this.processorClientMap, (v0) -> {
            v0.close();
        });
    }

    @Override // com.azure.spring.integration.servicebus.factory.ServiceBusQueueClientFactory
    public ServiceBusProcessorClient getOrCreateProcessor(String str, ServiceBusClientConfig serviceBusClientConfig, ServiceBusMessageProcessor<ServiceBusReceivedMessageContext, ServiceBusErrorContext> serviceBusMessageProcessor) {
        return this.processorClientMap.computeIfAbsent(str, str2 -> {
            return createProcessorClient(str2, serviceBusClientConfig, serviceBusMessageProcessor);
        });
    }

    @Override // com.azure.spring.integration.servicebus.factory.ServiceBusSenderFactory
    public ServiceBusSenderAsyncClient getOrCreateSender(String str) {
        return this.senderClientMap.computeIfAbsent(str, this::createQueueSender);
    }

    private ServiceBusProcessorClient createProcessorClient(String str, ServiceBusClientConfig serviceBusClientConfig, ServiceBusMessageProcessor<ServiceBusReceivedMessageContext, ServiceBusErrorContext> serviceBusMessageProcessor) {
        if (serviceBusClientConfig.getConcurrency() != 1) {
            LOGGER.warn("It is detected that concurrency is set, this attribute has been deprecated, you can use " + (serviceBusClientConfig.isSessionsEnabled() ? "maxConcurrentSessions" : "maxConcurrentCalls") + " instead");
        }
        if (serviceBusClientConfig.isSessionsEnabled()) {
            ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder processError = this.serviceBusClientBuilder.sessionProcessor().queueName(str).receiveMode(serviceBusClientConfig.getServiceBusReceiveMode()).maxConcurrentCalls(serviceBusClientConfig.getMaxConcurrentCalls()).maxConcurrentSessions(serviceBusClientConfig.getMaxConcurrentSessions()).prefetchCount(serviceBusClientConfig.getPrefetchCount()).processMessage(serviceBusMessageProcessor.processMessage()).processError(serviceBusMessageProcessor.processError());
            return !serviceBusClientConfig.isEnableAutoComplete() ? processError.disableAutoComplete().buildProcessorClient() : processError.buildProcessorClient();
        }
        ServiceBusClientBuilder.ServiceBusProcessorClientBuilder processError2 = this.serviceBusClientBuilder.processor().queueName(str).receiveMode(serviceBusClientConfig.getServiceBusReceiveMode()).maxConcurrentCalls(serviceBusClientConfig.getMaxConcurrentCalls()).prefetchCount(serviceBusClientConfig.getPrefetchCount()).processMessage(serviceBusMessageProcessor.processMessage()).processError(serviceBusMessageProcessor.processError());
        return !serviceBusClientConfig.isEnableAutoComplete() ? processError2.disableAutoComplete().buildProcessorClient() : processError2.buildProcessorClient();
    }

    private ServiceBusSenderAsyncClient createQueueSender(String str) {
        return this.serviceBusClientBuilder.sender().queueName(str).buildAsyncClient();
    }

    public void setRetryOptions(AmqpRetryOptions amqpRetryOptions) {
        this.serviceBusClientBuilder.retryOptions(amqpRetryOptions);
    }

    @Override // com.azure.spring.integration.servicebus.factory.AbstractServiceBusSenderFactory
    public /* bridge */ /* synthetic */ String getConnectionString() {
        return super.getConnectionString();
    }

    @Override // com.azure.spring.integration.servicebus.factory.AbstractServiceBusSenderFactory
    public /* bridge */ /* synthetic */ void setNamespace(String str) {
        super.setNamespace(str);
    }

    @Override // com.azure.spring.integration.servicebus.factory.AbstractServiceBusSenderFactory
    public /* bridge */ /* synthetic */ void setServiceBusTopicSubscriptionManager(ServiceBusTopicSubscriptionManager serviceBusTopicSubscriptionManager) {
        super.setServiceBusTopicSubscriptionManager(serviceBusTopicSubscriptionManager);
    }

    @Override // com.azure.spring.integration.servicebus.factory.AbstractServiceBusSenderFactory
    public /* bridge */ /* synthetic */ void setServiceBusTopicManager(ServiceBusTopicManager serviceBusTopicManager) {
        super.setServiceBusTopicManager(serviceBusTopicManager);
    }

    @Override // com.azure.spring.integration.servicebus.factory.AbstractServiceBusSenderFactory
    public /* bridge */ /* synthetic */ void setServiceBusQueueManager(ServiceBusQueueManager serviceBusQueueManager) {
        super.setServiceBusQueueManager(serviceBusQueueManager);
    }

    @Override // com.azure.spring.integration.servicebus.factory.AbstractServiceBusSenderFactory
    public /* bridge */ /* synthetic */ void setServiceBusNamespaceManager(ServiceBusNamespaceManager serviceBusNamespaceManager) {
        super.setServiceBusNamespaceManager(serviceBusNamespaceManager);
    }
}
