/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.data.redis.connection.lettuce;

import io.lettuce.core.api.reactive.RedisServerReactiveCommands;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import org.reactivestreams.Publisher;
import org.springframework.data.redis.connection.ClusterTopologyProvider;
import org.springframework.data.redis.connection.ReactiveClusterServerCommands;
import org.springframework.data.redis.connection.RedisClusterNode;
import org.springframework.data.redis.connection.RedisServerCommands;
import org.springframework.data.redis.connection.convert.Converters;
import org.springframework.data.redis.connection.lettuce.LettuceConverters;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveRedisClusterConnection;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveServerCommands;
import org.springframework.data.redis.core.types.RedisClientInfo;
import org.springframework.data.redis.util.ByteUtils;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

class LettuceReactiveClusterServerCommands
extends LettuceReactiveServerCommands
implements ReactiveClusterServerCommands {
    private final LettuceReactiveRedisClusterConnection connection;
    private final ClusterTopologyProvider topologyProvider;

    LettuceReactiveClusterServerCommands(LettuceReactiveRedisClusterConnection connection, ClusterTopologyProvider topologyProvider) {
        super(connection);
        this.connection = connection;
        this.topologyProvider = topologyProvider;
    }

    @Override
    public Mono<String> bgReWriteAof(RedisClusterNode node) {
        return this.connection.execute(node, RedisServerReactiveCommands::bgrewriteaof).next();
    }

    @Override
    public Mono<String> bgSave(RedisClusterNode node) {
        return this.connection.execute(node, RedisServerReactiveCommands::bgsave).next();
    }

    @Override
    public Mono<Long> lastSave(RedisClusterNode node) {
        return this.connection.execute(node, RedisServerReactiveCommands::lastsave).map(Date::getTime).next();
    }

    @Override
    public Mono<String> save(RedisClusterNode node) {
        return this.connection.execute(node, RedisServerReactiveCommands::save).next();
    }

    @Override
    public Mono<Long> dbSize(RedisClusterNode node) {
        return this.connection.execute(node, RedisServerReactiveCommands::dbsize).next();
    }

    @Override
    public Mono<String> flushDb(RedisClusterNode node) {
        return this.connection.execute(node, RedisServerReactiveCommands::flushdb).next();
    }

    @Override
    public Mono<String> flushDb(RedisClusterNode node, RedisServerCommands.FlushOption option) {
        return this.connection.execute(node, it -> it.flushdb(LettuceConverters.toFlushMode(option))).next();
    }

    @Override
    public Mono<String> flushAll(RedisClusterNode node) {
        return this.connection.execute(node, RedisServerReactiveCommands::flushall).next();
    }

    @Override
    public Mono<String> flushAll(RedisClusterNode node, RedisServerCommands.FlushOption option) {
        return this.connection.execute(node, it -> it.flushall(LettuceConverters.toFlushMode(option))).next();
    }

    @Override
    public Mono<Properties> info() {
        return Flux.merge(this.executeOnAllNodes(this::info)).collect(PropertiesCollector.INSTANCE);
    }

    @Override
    public Mono<Properties> info(RedisClusterNode node) {
        return this.connection.execute(node, RedisServerReactiveCommands::info).map(Converters::toProperties).next();
    }

    @Override
    public Mono<Properties> info(String section) {
        Assert.hasText(section, "Section must not be null or empty!");
        return Flux.merge(this.executeOnAllNodes(redisClusterNode -> this.info((RedisClusterNode)redisClusterNode, section))).collect(PropertiesCollector.INSTANCE);
    }

    @Override
    public Mono<Properties> info(RedisClusterNode node, String section) {
        Assert.hasText(section, "Section must not be null or empty!");
        return this.connection.execute(node, c -> c.info(section)).map(Converters::toProperties).next();
    }

    @Override
    public Mono<Properties> getConfig(String pattern) {
        Assert.hasText(pattern, "Pattern must not be null or empty!");
        return Flux.merge(this.executeOnAllNodes(node -> this.getConfig((RedisClusterNode)node, pattern))).collect(PropertiesCollector.INSTANCE);
    }

    @Override
    public Mono<Properties> getConfig(RedisClusterNode node, String pattern) {
        Assert.hasText(pattern, "Pattern must not be null or empty!");
        return this.connection.execute(node, c -> c.configGet(pattern)).map(Converters::toProperties).next();
    }

    @Override
    public Mono<String> setConfig(String param, String value) {
        return Flux.merge(this.executeOnAllNodes(node -> this.setConfig((RedisClusterNode)node, param, value))).map(Tuple2::getT2).last();
    }

    @Override
    public Mono<String> setConfig(RedisClusterNode node, String param, String value) {
        Assert.hasText(param, "Parameter must not be null or empty!");
        Assert.hasText(value, "Value must not be null or empty!");
        return this.connection.execute(node, c -> c.configSet(param, value)).next();
    }

    @Override
    public Mono<String> resetConfigStats() {
        return Flux.merge(this.executeOnAllNodes(this::resetConfigStats)).map(Tuple2::getT2).last();
    }

    @Override
    public Mono<String> resetConfigStats(RedisClusterNode node) {
        return this.connection.execute(node, RedisServerReactiveCommands::configResetstat).next();
    }

    @Override
    public Mono<Long> time(RedisClusterNode node) {
        return this.connection.execute(node, RedisServerReactiveCommands::time).map(ByteUtils::getBytes).collectList().map(LettuceConverters.toTimeConverter(TimeUnit.MILLISECONDS)::convert);
    }

    @Override
    public Flux<RedisClientInfo> getClientList() {
        return Flux.merge(this.executeOnAllNodesMany(this::getClientList)).map(Tuple2::getT2);
    }

    @Override
    public Flux<RedisClientInfo> getClientList(RedisClusterNode node) {
        return this.connection.execute(node, RedisServerReactiveCommands::clientList).concatMapIterable(LettuceConverters.stringToRedisClientListConverter()::convert);
    }

    private <T> Collection<Publisher<Tuple2<RedisClusterNode, T>>> executeOnAllNodes(Function<RedisClusterNode, Mono<T>> callback) {
        Set<RedisClusterNode> nodes = this.topologyProvider.getTopology().getNodes();
        ArrayList<Publisher<Tuple2<RedisClusterNode, T>>> pipeline = new ArrayList<Publisher<Tuple2<RedisClusterNode, T>>>(nodes.size());
        for (RedisClusterNode redisClusterNode : nodes) {
            pipeline.add(callback.apply(redisClusterNode).map(p -> Tuples.of(redisClusterNode, p)));
        }
        return pipeline;
    }

    private <T> Collection<Publisher<Tuple2<RedisClusterNode, T>>> executeOnAllNodesMany(Function<RedisClusterNode, Flux<T>> callback) {
        Set<RedisClusterNode> nodes = this.topologyProvider.getTopology().getNodes();
        ArrayList<Publisher<Tuple2<RedisClusterNode, T>>> pipeline = new ArrayList<Publisher<Tuple2<RedisClusterNode, T>>>(nodes.size());
        for (RedisClusterNode redisClusterNode : nodes) {
            pipeline.add(callback.apply(redisClusterNode).map(p -> Tuples.of(redisClusterNode, p)));
        }
        return pipeline;
    }

    private static enum PropertiesCollector implements Collector<Tuple2<RedisClusterNode, Properties>, Properties, Properties>
    {
        INSTANCE;


        @Override
        public Supplier<Properties> supplier() {
            return Properties::new;
        }

        @Override
        public BiConsumer<Properties, Tuple2<RedisClusterNode, Properties>> accumulator() {
            return (properties, tuple) -> {
                for (Map.Entry<Object, Object> entry : ((Properties)tuple.getT2()).entrySet()) {
                    properties.put(((RedisClusterNode)tuple.getT1()).asString() + "." + entry.getKey(), entry.getValue());
                }
            };
        }

        @Override
        public BinaryOperator<Properties> combiner() {
            return (left, right) -> {
                Properties merge = new Properties();
                merge.putAll((Map<?, ?>)left);
                merge.putAll((Map<?, ?>)right);
                return merge;
            };
        }

        @Override
        public Function<Properties, Properties> finisher() {
            return properties -> properties;
        }

        @Override
        public Set<Collector.Characteristics> characteristics() {
            return new HashSet<Collector.Characteristics>(Arrays.asList(Collector.Characteristics.UNORDERED, Collector.Characteristics.IDENTITY_FINISH));
        }
    }
}

