/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.spring.integration.servicebus.topic.support;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.ISubscriptionClient;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import com.microsoft.azure.spring.integration.core.api.PartitionSupplier;
import com.microsoft.azure.spring.integration.servicebus.ServiceBusMessageHandler;
import com.microsoft.azure.spring.integration.servicebus.ServiceBusRuntimeException;
import com.microsoft.azure.spring.integration.servicebus.factory.ServiceBusTopicClientFactory;
import com.microsoft.azure.spring.integration.servicebus.topic.ServiceBusTopicTemplate;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.springframework.messaging.Message;

public class ServiceBusTopicTestOperation
extends ServiceBusTopicTemplate {
    private final Multimap<String, IMessage> topicsByName = ArrayListMultimap.create();
    private final Map<String, Map<String, ServiceBusMessageHandler<?>>> handlersByNameAndGroup = new ConcurrentHashMap();

    public ServiceBusTopicTestOperation(ServiceBusTopicClientFactory clientFactory) {
        super(clientFactory);
    }

    @Override
    public <U> CompletableFuture<Void> sendAsync(String name, Message<U> message, PartitionSupplier partitionSupplier) {
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        IMessage azureMessage = (IMessage)this.getMessageConverter().fromMessage(message, IMessage.class);
        this.topicsByName.put((Object)name, (Object)azureMessage);
        this.handlersByNameAndGroup.putIfAbsent(name, new ConcurrentHashMap());
        this.handlersByNameAndGroup.get(name).values().forEach(c -> c.onMessageAsync(azureMessage));
        future.complete(null);
        return future;
    }

    @Override
    protected void internalSubscribe(String name, String consumerGroup, Consumer<Message<?>> consumer, Class<?> payloadType) {
        ISubscriptionClient subscriptionClient = ((ServiceBusTopicClientFactory)this.senderFactory).getOrCreateSubscriptionClient(name, consumerGroup);
        ServiceBusTopicTemplate.TopicMessageHandler handler = new ServiceBusTopicTemplate.TopicMessageHandler(this, consumer, payloadType, subscriptionClient);
        try {
            subscriptionClient.registerMessageHandler(handler);
        }
        catch (ServiceBusException | InterruptedException e) {
            throw new ServiceBusRuntimeException("Failed to internalSubscribe message handler", e);
        }
        this.handlersByNameAndGroup.putIfAbsent(name, new ConcurrentHashMap());
        this.handlersByNameAndGroup.get(name).put(consumerGroup, handler);
    }

    @Override
    public boolean unsubscribe(String name, String consumerGroup) {
        this.handlersByNameAndGroup.get(name).remove(consumerGroup);
        return true;
    }
}

