package cn.com.pg.paas.stream.eventhub.binder.provisioning;

import cn.com.pg.paas.stream.eventhub.binder.properties.EventHubConfigProperties;
import cn.com.pg.paas.stream.eventhub.binder.properties.EventHubsConsumerProperties;
import cn.com.pg.paas.stream.eventhub.binder.properties.EventHubsExtendedBindingProperties;
import cn.com.pg.paas.stream.eventhub.binder.properties.EventHubsProducerProperties;
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import com.microsoft.azure.eventprocessorhost.EventProcessorHost;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
import org.springframework.context.ApplicationContext;

/* loaded from: input_file:cn/com/pg/paas/stream/eventhub/binder/provisioning/EventHubsStreamProvisioner.class */
public class EventHubsStreamProvisioner implements ProvisioningProvider<ExtendedConsumerProperties<EventHubsConsumerProperties>, ExtendedProducerProperties<EventHubsProducerProperties>>, SmartInitializingSingleton {
    private static final String SPRINGCLOUDBUS_CHANNEL_NAME = "springCloudBus";
    private final Map<String, EventHubClient> ehClientMap = new ConcurrentHashMap();
    private final Map<String, EventProcessorHost> epClientMap = new ConcurrentHashMap();
    private EventHubConfigProperties eventHubConfigProperties;
    private EventHubsExtendedBindingProperties eventHubsExtendedBindingProperties;
    private final String binderName;
    private final ApplicationContext applicationContext;
    private static final Logger log = LoggerFactory.getLogger(EventHubsStreamProvisioner.class);
    private static final Map<String, ApplicationContext> binderApplicationContexts = new HashMap();

    public EventHubsStreamProvisioner(String str, EventHubConfigProperties eventHubConfigProperties, EventHubsExtendedBindingProperties eventHubsExtendedBindingProperties, ApplicationContext applicationContext) {
        this.eventHubsExtendedBindingProperties = eventHubsExtendedBindingProperties;
        this.eventHubConfigProperties = eventHubConfigProperties;
        this.applicationContext = applicationContext;
        this.binderName = str;
    }

    public EventHubClient getEventHubClient(String str) {
        if (!this.ehClientMap.containsKey(str)) {
            createEventHubsClient(str, this.eventHubConfigProperties.getProducerConnectionStr());
        }
        return this.ehClientMap.get(str);
    }

    public EventProcessorHost getEventProcessorHost(String str) {
        if (!this.epClientMap.containsKey(str)) {
            createEventProcessorHost(str, this.eventHubConfigProperties.getConsumerConnectionStr());
        }
        return this.epClientMap.get(str);
    }

    private void createEventProcessorHost(String str, String str2) {
        EventHubsConsumerProperties m8getExtendedConsumerProperties = this.eventHubsExtendedBindingProperties.m8getExtendedConsumerProperties(str);
        if (StringUtils.isNotBlank(str2)) {
            ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder(str2);
            this.epClientMap.put(str, new EventProcessorHost(EventProcessorHost.createHostName((String) null), connectionStringBuilder.getEventHubName(), m8getExtendedConsumerProperties.getGroup(), connectionStringBuilder.toString(), this.eventHubConfigProperties.getConsumerStorageConnectionStr(), this.eventHubConfigProperties.getConsumerContainer()));
        }
    }

    private void createEventHubsClient(String str, String str2) {
        EventHubsProducerProperties m7getExtendedProducerProperties = this.eventHubsExtendedBindingProperties.m7getExtendedProducerProperties(str);
        try {
            if (StringUtils.isNotBlank(str2)) {
                this.ehClientMap.put(str, EventHubClient.createSync(new ConnectionStringBuilder(str2).toString(), Executors.newScheduledThreadPool(m7getExtendedProducerProperties.getThreadCorePoolSize())));
            }
        } catch (IOException | EventHubException e) {
            log.error("初始化EventHubs链接出错,producerConnectionStr:{}", str2, e);
            e.printStackTrace();
        }
    }

    public void afterSingletonsInstantiated() {
        binderApplicationContexts.put(this.binderName, this.applicationContext);
    }

    public static ApplicationContext getBinderApplicationContexts(String str) {
        return binderApplicationContexts.get(str);
    }

    public void refreshConsumerConnection(String str) {
        log.info(">>>>>>>>>>>>RefreshEventHubConsumerConnection start binderName:{}", this.binderName);
        log.info(">>>>>>>>>>>>changed connectionString:{}", str);
        this.eventHubConfigProperties.setConsumerConnectionStr(str);
        this.epClientMap.clear();
        log.info(">>>>>>>>>>>>RefreshEventHubConsumerConnection end !!");
    }

    public void refreshProduceConnection(String str) {
        log.info(">>>>>>>>>>>>RefreshEventHubProducerConnection start binderName:{}", this.binderName);
        log.info(">>>>>>>>>>>>changed connectionString:{}", str);
        this.eventHubConfigProperties.setProducerConnectionStr(str);
        this.ehClientMap.clear();
        log.info(">>>>>>>>>>>>RefreshEventHubProducerConnection end !!");
    }

    public ProducerDestination provisionProducerDestination(String str, ExtendedProducerProperties<EventHubsProducerProperties> extendedProducerProperties) throws ProvisioningException {
        if (!SPRINGCLOUDBUS_CHANNEL_NAME.equals(str)) {
            log.info("provisionProducerDestination:{}", str);
        }
        return new EventHubsProducerDestination(str);
    }

    public ConsumerDestination provisionConsumerDestination(String str, String str2, ExtendedConsumerProperties<EventHubsConsumerProperties> extendedConsumerProperties) throws ProvisioningException {
        if (!SPRINGCLOUDBUS_CHANNEL_NAME.equals(str)) {
            log.info("provisionProducerDestination:{}", str);
        }
        log.info("provisionConsumerDestination:{}", str);
        return new EventHubsConsumerDestination(str);
    }
}
