package com.azure.spring.integration.servicebus.topic;

import com.azure.spring.cloud.context.core.util.Tuple;
import com.azure.spring.integration.servicebus.DefaultServiceBusMessageProcessor;
import com.azure.spring.integration.servicebus.ServiceBusClientConfig;
import com.azure.spring.integration.servicebus.ServiceBusRuntimeException;
import com.azure.spring.integration.servicebus.ServiceBusTemplate;
import com.azure.spring.integration.servicebus.converter.ServiceBusMessageConverter;
import com.azure.spring.integration.servicebus.factory.ServiceBusTopicClientFactory;
import com.azure.spring.integration.servicebus.health.Instrumentation;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.lang.NonNull;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

/* loaded from: input_file:com/azure/spring/integration/servicebus/topic/ServiceBusTopicTemplate.class */
/**
 * Service Bus template for topics: starts message processors for
 * (topic, consumer group) pairs and keeps track of which pairs have already
 * been subscribed through this instance.
 *
 * <p>The registry of subscribed pairs is a concurrent set, and registration in
 * {@link #subscribe} is performed atomically.
 */
public class ServiceBusTopicTemplate extends ServiceBusTemplate<ServiceBusTopicClientFactory> implements ServiceBusTopicOperation {
    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusTopicTemplate.class);
    private static final String MSG_FAIL_CHECKPOINT = "Consumer group '%s' of topic '%s' failed to checkpoint %s";
    private static final String MSG_SUCCESS_CHECKPOINT = "Consumer group '%s' of topic '%s' checkpointed %s in %s mode";

    /** (topic name, consumer group) pairs that are currently registered via {@link #subscribe}. */
    private final Set<Tuple<String, String>> nameAndConsumerGroups;

    /**
     * Creates a template backed by the given client factory.
     *
     * @param serviceBusTopicClientFactory factory used to obtain topic processor clients
     */
    public ServiceBusTopicTemplate(ServiceBusTopicClientFactory serviceBusTopicClientFactory) {
        super(serviceBusTopicClientFactory);
        this.nameAndConsumerGroups = ConcurrentHashMap.newKeySet();
    }

    /**
     * Creates a template backed by the given client factory and message converter.
     *
     * @param serviceBusTopicClientFactory factory used to obtain topic processor clients
     * @param serviceBusMessageConverter converter handed to the message processor
     */
    public ServiceBusTopicTemplate(ServiceBusTopicClientFactory serviceBusTopicClientFactory, ServiceBusMessageConverter serviceBusMessageConverter) {
        super(serviceBusTopicClientFactory, serviceBusMessageConverter);
        this.nameAndConsumerGroups = ConcurrentHashMap.newKeySet();
    }

    /**
     * Replaces the client configuration used when processors are created.
     *
     * @param serviceBusClientConfig the new client configuration
     */
    @Override // com.azure.spring.integration.servicebus.topic.ServiceBusTopicOperation
    public void setClientConfig(@NonNull ServiceBusClientConfig serviceBusClientConfig) {
        this.clientConfig = serviceBusClientConfig;
    }

    /**
     * Subscribes {@code consumer} to the given topic and consumer group.
     *
     * @param str topic name; must contain text
     * @param str2 consumer group name
     * @param consumer callback that receives converted messages
     * @param cls payload type passed to the message processor
     * @return {@code true} if a new subscription was started, {@code false} if this
     *         (topic, consumer group) pair is already registered
     * @throws IllegalArgumentException if {@code str} is null or empty
     * @throws ServiceBusRuntimeException if the processor client fails to start
     */
    public boolean subscribe(String str, String str2, @NonNull Consumer<Message<?>> consumer, Class<?> cls) {
        Assert.hasText(str, "destination can't be null or empty");
        Tuple<String, String> registration = Tuple.of(str, str2);
        // Set.add is atomic on the concurrent key set, so only one caller can win the
        // registration; a separate contains() + add() would let two threads both subscribe.
        if (!this.nameAndConsumerGroups.add(registration)) {
            return false;
        }
        try {
            internalSubscribe(str, str2, consumer, cls);
        } catch (RuntimeException e) {
            // Roll back so a failed start does not permanently block later attempts.
            this.nameAndConsumerGroups.remove(registration);
            throw e;
        }
        return true;
    }

    /**
     * Removes the (topic, consumer group) pair from the registry.
     *
     * <p>NOTE(review): this only updates the registry; nothing here stops the
     * processor that {@link #subscribe} started — confirm whether that is intended.
     *
     * @param str topic name
     * @param str2 consumer group name
     * @return {@code true} if the pair was registered and has been removed
     */
    public boolean unsubscribe(String str, String str2) {
        return this.nameAndConsumerGroups.remove(Tuple.of(str, str2));
    }

    /**
     * Builds the message processor for the pair, registers a health instrumentation
     * entry and starts the processor client obtained from the client factory.
     *
     * @param str topic name
     * @param str2 consumer group name
     * @param consumer callback that receives converted messages
     * @param cls payload type passed to the message processor
     * @throws ServiceBusRuntimeException if anything fails while starting the processor
     */
    protected void internalSubscribe(final String str, final String str2, final Consumer<Message<?>> consumer, Class<?> cls) {
        DefaultServiceBusMessageProcessor defaultServiceBusMessageProcessor = new DefaultServiceBusMessageProcessor(this.checkpointConfig, cls, consumer, this.messageConverter) { // from class: com.azure.spring.integration.servicebus.topic.ServiceBusTopicTemplate.1
            @Override // com.azure.spring.integration.servicebus.DefaultServiceBusMessageProcessor
            protected String buildCheckpointFailMessage(Message<?> message) {
                // First placeholder is the consumer group name, not the callback object.
                return String.format(MSG_FAIL_CHECKPOINT, str2, str, message);
            }

            @Override // com.azure.spring.integration.servicebus.DefaultServiceBusMessageProcessor
            protected String buildCheckpointSuccessMessage(Message<?> message) {
                return String.format(MSG_SUCCESS_CHECKPOINT, str2, str, message, ServiceBusTopicTemplate.this.getCheckpointConfig().getCheckpointMode());
            }
        };
        Instrumentation instrumentation = new Instrumentation(str + str2, Instrumentation.Type.CONSUME);
        try {
            this.instrumentationManager.addHealthInstrumentation(instrumentation);
            ((ServiceBusTopicClientFactory) this.clientFactory).getOrCreateProcessor(str, str2, this.clientConfig, defaultServiceBusMessageProcessor).start();
            this.instrumentationManager.getHealthInstrumentation(instrumentation).markStartedSuccessfully();
        } catch (Exception e) {
            this.instrumentationManager.getHealthInstrumentation(instrumentation).markStartFailed(e);
            // Trailing throwable argument makes SLF4J log the stack trace as well.
            LOGGER.error("ServiceBus processorClient startup failed, Caused by {}", e.getMessage(), e);
            throw new ServiceBusRuntimeException("ServiceBus processor client startup failed, Caused by " + e.getMessage(), e);
        }
    }
}
