/*
 * Decompiled with CFR 0.152.
 */
package org.redisson.spring.data.connection;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.reactivestreams.Publisher;
import org.redisson.api.PendingResult;
import org.redisson.api.StreamInfo;
import org.redisson.api.StreamMessageId;
import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.decoder.ListMultiDecoder2;
import org.redisson.client.protocol.decoder.ObjectDecoder;
import org.redisson.client.protocol.decoder.ObjectMapDecoder;
import org.redisson.client.protocol.decoder.StreamInfoDecoder;
import org.redisson.reactive.CommandReactiveExecutor;
import org.redisson.spring.data.connection.RedissonBaseReactive;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveStreamCommands;
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.PendingMessage;
import org.springframework.data.redis.connection.stream.PendingMessages;
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamInfo;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class RedissonReactiveStreamCommands
extends RedissonBaseReactive
implements ReactiveStreamCommands {
    /**
     * {@code XINFO STREAM} command whose reply is run through {@link StreamInfoDecoder}.
     * The result type is spelled with its full package name because this file also imports
     * Spring Data's class of the same simple name.
     */
    private static final RedisCommand<org.redisson.api.StreamInfo<Object, Object>> XINFO_STREAM = new RedisCommand<>("XINFO", "STREAM", new ListMultiDecoder2(new StreamInfoDecoder(), new ObjectDecoder<Object>(StringCodec.INSTANCE.getValueDecoder()), new ObjectMapDecoder(false)));

    /** Bare {@code XGROUP} command with a {@code String} reply; the sub-command is passed as the first parameter. */
    private static final RedisStrictCommand<String> XGROUP_STRING = new RedisStrictCommand<>("XGROUP");

    /**
     * Creates a new instance and hands the executor to the {@link RedissonBaseReactive} base class.
     *
     * @param executorService reactive command executor passed to the superclass constructor
     */
    RedissonReactiveStreamCommands(CommandReactiveExecutor executorService) {
        super(executorService);
    }

    /**
     * Converts record ids to their string values, keeping the input order.
     *
     * @param recordIds ids to convert
     * @return a new list holding {@link RecordId#getValue()} of every element
     */
    private static List<String> toStringList(List<RecordId> recordIds) {
        List<String> values = new ArrayList<String>(recordIds.size());
        for (RecordId recordId : recordIds) {
            values.add(recordId.getValue());
        }
        return values;
    }

    /**
     * Issues {@code XCLAIM ... JUSTID} for every incoming command and exposes the keys of the
     * decoded reply as {@link RecordId}s.
     *
     * @param publisher claim commands; key, group name, new owner and ids are required
     * @return one response per command carrying the claimed ids
     */
    @Override
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.XClaimCommand, Flux<RecordId>>> xClaimJustId(Publisher<ReactiveStreamCommands.XClaimCommand> publisher) {
        return this.execute(publisher, command -> {
            Assert.notNull(command.getKey(), "Key must not be null!");
            Assert.notNull(command.getGroupName(), "Group name must not be null!");
            Assert.notNull(command.getNewOwner(), "NewOwner must not be null!");
            Assert.notEmpty(command.getOptions().getIds(), "Ids collection must not be empty!");

            byte[] keyBytes = toByteArray(command.getKey());
            List<Object> args = new ArrayList<>();
            args.add(keyBytes);
            args.add(command.getGroupName());
            args.add(command.getNewOwner());
            // NOTE(review): the positional min-idle-time argument is read from getIdleTime(),
            // which fails fast when it is null - confirm getMinIdleTime() is not what is intended.
            long idleMillis = Objects.requireNonNull(command.getOptions().getIdleTime()).toMillis();
            args.add(idleMillis);
            args.addAll(Arrays.asList(command.getOptions().getIdsAsStringArray()));
            args.add("JUSTID");

            // NOTE(review): with JUSTID the server replies with a flat list of ids - confirm the
            // XCLAIM decoder really yields a map keyed by message id for that reply shape.
            Mono<Map<StreamMessageId, Map<byte[], byte[]>>> reply = this.write(keyBytes, ByteArrayCodec.INSTANCE, RedisCommands.XCLAIM, args.toArray());
            return reply.map(claimed -> {
                Flux<RecordId> ids = Flux.fromStream(claimed.keySet().stream()).map(id -> RecordId.of(id.toString()));
                return new ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.XClaimCommand, Flux<RecordId>>(command, ids);
            });
        });
    }

    /**
     * Issues {@code XCLAIM} for every incoming command and converts each claimed entry into a
     * {@link ByteBufferRecord} bound to the command's key.
     *
     * @param publisher claim commands; key, group name, new owner and ids are required
     * @return one response per command carrying the claimed records
     */
    @Override
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.XClaimCommand, Flux<ByteBufferRecord>>> xClaim(Publisher<ReactiveStreamCommands.XClaimCommand> publisher) {
        return this.execute(publisher, command -> {
            Assert.notNull(command.getKey(), "Key must not be null!");
            Assert.notNull(command.getGroupName(), "Group name must not be null!");
            Assert.notNull(command.getNewOwner(), "NewOwner must not be null!");
            Assert.notEmpty(command.getOptions().getIds(), "Ids collection must not be empty!");

            byte[] keyBytes = toByteArray(command.getKey());
            List<Object> args = new ArrayList<>();
            args.add(keyBytes);
            args.add(command.getGroupName());
            args.add(command.getNewOwner());
            // NOTE(review): the positional min-idle-time argument is read from getIdleTime(),
            // which fails fast when it is null - confirm getMinIdleTime() is not what is intended.
            long idleMillis = Objects.requireNonNull(command.getOptions().getIdleTime()).toMillis();
            args.add(idleMillis);
            args.addAll(Arrays.asList(command.getOptions().getIdsAsStringArray()));

            Mono<Map<StreamMessageId, Map<byte[], byte[]>>> reply = this.write(keyBytes, ByteArrayCodec.INSTANCE, RedisCommands.XCLAIM, args.toArray());
            return reply.map(claimed -> {
                Flux<ByteBufferRecord> records = Flux.fromStream(claimed.entrySet().stream()).map(message -> {
                    // Wrap the raw field/value byte arrays without copying them.
                    Map<ByteBuffer, ByteBuffer> body = message.getValue().entrySet().stream()
                            .collect(Collectors.toMap(field -> ByteBuffer.wrap(field.getKey()), field -> ByteBuffer.wrap(field.getValue())));
                    RecordId id = RecordId.of(message.getKey().toString());
                    return StreamRecords.newRecord().in(command.getKey()).withId(id).ofBuffer(body);
                });
                return new ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.XClaimCommand, Flux<ByteBufferRecord>>(command, records);
            });
        });
    }

    /**
     * Issues the summary form of {@code XPENDING} (key and group only) and maps the reply to a
     * {@link PendingMessagesSummary}.
     *
     * @param publisher pending commands; key and group name are required
     * @return one response per command carrying the summary
     */
    @Override
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.PendingRecordsCommand, PendingMessagesSummary>> xPendingSummary(Publisher<ReactiveStreamCommands.PendingRecordsCommand> publisher) {
        return this.execute(publisher, command -> {
            Assert.notNull(command.getKey(), "Key must not be null!");
            Assert.notNull(command.getGroupName(), "Group name must not be null!");

            byte[] keyBytes = toByteArray(command.getKey());
            Mono<PendingResult> reply = this.write(keyBytes, StringCodec.INSTANCE, RedisCommands.XPENDING, keyBytes, command.getGroupName());
            return reply.map(pending -> {
                String lowestId = pending.getLowestId().toString();
                String highestId = pending.getHighestId().toString();
                Range<String> idRange = Range.open(lowestId, highestId);
                PendingMessagesSummary summary = new PendingMessagesSummary(command.getGroupName(), pending.getTotal(), idRange, pending.getConsumerNames());
                return new ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.PendingRecordsCommand, PendingMessagesSummary>(command, summary);
            });
        });
    }

    /**
     * Issues the extended form of {@code XPENDING} and maps every returned entry to a
     * {@link PendingMessage}.
     *
     * <p>The argument order sent is {@code key group start end [count] [consumer]}. The group
     * name is mandatory for the server; it was previously validated but never sent.
     *
     * @param publisher pending commands; key and group name are required
     * @return one response per command carrying the pending messages
     */
    @Override
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.PendingRecordsCommand, PendingMessages>> xPending(Publisher<ReactiveStreamCommands.PendingRecordsCommand> publisher) {
        return this.execute(publisher, command -> {
            Assert.notNull(command.getKey(), "Key must not be null!");
            Assert.notNull(command.getGroupName(), "Group name must not be null!");

            byte[] k = toByteArray(command.getKey());
            // Range bounds are stream ids, i.e. strings; the cast lets orElse supply the open-ended markers.
            @SuppressWarnings("unchecked")
            Range<String> range = (Range<String>) command.getRange();

            List<Object> params = new ArrayList<>();
            params.add(k);
            params.add(command.getGroupName());
            params.add(range.getLowerBound().getValue().orElse("-"));
            params.add(range.getUpperBound().getValue().orElse("+"));
            // NOTE(review): the server expects a count whenever start/end are given - confirm
            // callers always set one, otherwise this request is rejected.
            if (command.getCount() != null) {
                params.add(command.getCount());
            }
            if (command.getConsumerName() != null) {
                params.add(command.getConsumerName());
            }

            Mono<List<org.redisson.api.PendingEntry>> m = this.write(k, StringCodec.INSTANCE, RedisCommands.XPENDING_ENTRIES, params.toArray());
            return m.map(entries -> {
                List<PendingMessage> msgs = new ArrayList<>(entries.size());
                for (org.redisson.api.PendingEntry entry : entries) {
                    msgs.add(new PendingMessage(
                            RecordId.of(entry.getId().toString()),
                            Consumer.from(command.getGroupName(), entry.getConsumerName()),
                            Duration.of(entry.getIdleTime(), ChronoUnit.MILLIS),
                            entry.getLastTimeDelivered()));
                }
                PendingMessages s = new PendingMessages(command.getGroupName(), command.getRange(), msgs);
                return new ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.PendingRecordsCommand, PendingMessages>(command, s);
            });
        });
    }

    /**
     * Issues {@code XINFO STREAM} and repackages the decoded result as the flat key/value list
     * that {@code XInfoStream.fromList} consumes.
     *
     * <p>The {@code first-entry} and {@code last-entry} pairs are left out when the decoded info
     * has no such entry, instead of failing with a {@link NullPointerException}.
     *
     * @param publisher info commands; the key is required
     * @return one response per command carrying the stream info
     */
    @Override
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.XInfoCommand, StreamInfo.XInfoStream>> xInfo(Publisher<ReactiveStreamCommands.XInfoCommand> publisher) {
        return this.execute(publisher, command -> {
            Assert.notNull(command.getKey(), "Key must not be null!");

            byte[] k = toByteArray(command.getKey());
            // Fully qualified: the simple name is also imported from Spring Data in this file.
            Mono<org.redisson.api.StreamInfo<byte[], byte[]>> m = this.write(k, ByteArrayCodec.INSTANCE, XINFO_STREAM, new Object[]{k});
            return m.map(i -> {
                Map<String, Object> res = new HashMap<>();
                res.put("length", Long.valueOf(i.getLength()));
                if (i.getFirstEntry() != null) {
                    res.put("first-entry", i.getFirstEntry().getData());
                }
                if (i.getLastEntry() != null) {
                    res.put("last-entry", i.getLastEntry().getData());
                }
                res.put("radix-tree-keys", i.getRadixTreeKeys());
                res.put("radix-tree-nodes", i.getRadixTreeNodes());
                res.put("groups", Long.valueOf(i.getGroups()));
                res.put("last-generated-id", i.getLastGeneratedId().toString());

                // Flatten to [key, value, key, value, ...].
                List<Object> list = new ArrayList<>(res.size() * 2);
                for (Map.Entry<String, Object> e : res.entrySet()) {
                    list.add(e.getKey());
                    list.add(e.getValue());
                }
                return new ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.XInfoCommand, StreamInfo.XInfoStream>(command, StreamInfo.XInfoStream.fromList(list));
            });
        });
    }

    /**
     * Issues {@code XINFO GROUPS} and converts every decoded group into an {@code XInfoGroup}
     * via the flat key/value list that {@code XInfoGroup.fromList} consumes.
     *
     * @param publisher info commands; the key is required
     * @return one response per command carrying the groups of the stream
     */
    @Override
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.XInfoCommand, Flux<StreamInfo.XInfoGroup>>> xInfoGroups(Publisher<ReactiveStreamCommands.XInfoCommand> publisher) {
        return this.execute(publisher, command -> {
            Assert.notNull(command.getKey(), "Key must not be null!");

            byte[] k = toByteArray(command.getKey());
            // Element type restored so the accessor calls below are type-checked.
            Mono<List<org.redisson.api.StreamGroup>> m = this.write(k, StringCodec.INSTANCE, RedisCommands.XINFO_GROUPS, new Object[]{k});
            return m.map(groups -> {
                Flux<StreamInfo.XInfoGroup> converted = Flux.fromStream(groups.stream()).map(r -> {
                    Map<String, Object> res = new HashMap<>();
                    res.put("name", r.getName());
                    res.put("consumers", Long.valueOf(r.getConsumers()));
                    res.put("pending", Long.valueOf(r.getPending()));
                    res.put("last-delivered-id", r.getLastDeliveredId().toString());

                    // Flatten to [key, value, key, value, ...].
                    List<Object> list = new ArrayList<>(res.size() * 2);
                    for (Map.Entry<String, Object> e : res.entrySet()) {
                        list.add(e.getKey());
                        list.add(e.getValue());
                    }
                    return StreamInfo.XInfoGroup.fromList(list);
                });
                return new ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.XInfoCommand, Flux<StreamInfo.XInfoGroup>>(command, converted);
            });
        });
    }

    /**
     * Issues {@code XINFO CONSUMERS} for the command's group and converts every decoded consumer
     * into an {@code XInfoConsumer} built from a flat key/value list.
     *
     * @param publisher info commands; key and group name are required
     * @return one response per command carrying the consumers of the group
     */
    @Override
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.XInfoCommand, Flux<StreamInfo.XInfoConsumer>>> xInfoConsumers(Publisher<ReactiveStreamCommands.XInfoCommand> publisher) {
        return this.execute(publisher, command -> {
            Assert.notNull(command.getKey(), "Key must not be null!");
            Assert.notNull(command.getGroupName(), "Group name must not be null!");

            byte[] k = toByteArray(command.getKey());
            // Element type restored so the accessor calls below are type-checked.
            Mono<List<org.redisson.api.StreamConsumer>> m = this.write(k, StringCodec.INSTANCE, RedisCommands.XINFO_CONSUMERS, k, command.getGroupName());
            return m.map(consumers -> {
                Flux<StreamInfo.XInfoConsumer> converted = Flux.fromStream(consumers.stream()).map(r -> {
                    Map<String, Object> res = new HashMap<>();
                    res.put("name", r.getName());
                    res.put("idle", r.getIdleTime());
                    res.put("pending", Long.valueOf(r.getPending()));

                    // Flatten to [key, value, key, value, ...].
                    List<Object> list = new ArrayList<>(res.size() * 2);
                    for (Map.Entry<String, Object> e : res.entrySet()) {
                        list.add(e.getKey());
                        list.add(e.getValue());
                    }
                    return new StreamInfo.XInfoConsumer(command.getGroupName(), list);
                });
                return new ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.XInfoCommand, Flux<StreamInfo.XInfoConsumer>>(command, converted);
            });
        });
    }

    /**
     * Issues {@code XACK key group id...} for every incoming command.
     *
     * @param publisher acknowledge commands; key, group and record ids are required
     * @return one numeric response per command carrying the server's reply
     */
    @Override
    public Flux<ReactiveRedisConnection.NumericResponse<ReactiveStreamCommands.AcknowledgeCommand, Long>> xAck(Publisher<ReactiveStreamCommands.AcknowledgeCommand> publisher) {
        return this.execute(publisher, command -> {
            Assert.notNull(command.getKey(), "Key must not be null!");
            Assert.notNull(command.getGroup(), "Group must not be null!");
            Assert.notNull(command.getRecordIds(), "recordIds must not be null!");

            byte[] keyBytes = toByteArray(command.getKey());
            List<Object> args = new ArrayList<>();
            args.add(keyBytes);
            args.add(command.getGroup());
            for (RecordId recordId : command.getRecordIds()) {
                args.add(recordId.getValue());
            }

            Mono<Long> reply = this.write(keyBytes, StringCodec.INSTANCE, RedisCommands.XACK, args.toArray());
            return reply.map(acknowledged -> new ReactiveRedisConnection.NumericResponse<ReactiveStreamCommands.AcknowledgeCommand, Long>(command, acknowledged));
        });
    }

    /**
     * Issues {@code XADD key [MAXLEN n] id field value...} for every incoming command.
     * The id is {@code *} when the record's id is flagged as auto-generated.
     *
     * @param publisher add commands; key and body are required
     * @return one response per command carrying the id returned by the server
     */
    @Override
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.AddStreamRecord, RecordId>> xAdd(Publisher<ReactiveStreamCommands.AddStreamRecord> publisher) {
        return this.execute(publisher, command -> {
            Assert.notNull(command.getKey(), "Key must not be null!");
            Assert.notNull(command.getBody(), "Body must not be null!");

            byte[] keyBytes = toByteArray(command.getKey());
            List<Object> args = new ArrayList<>();
            args.add(keyBytes);

            if (command.getMaxlen() != null) {
                args.add("MAXLEN");
                args.add(command.getMaxlen());
            }

            RecordId requestedId = command.getRecord().getId();
            if (requestedId.shouldBeAutoGenerated()) {
                args.add("*");
            } else {
                args.add(requestedId.getValue());
            }

            // Field and value go in as consecutive raw byte arrays.
            command.getBody().forEach((field, value) -> {
                args.add(toByteArray(field));
                args.add(toByteArray(value));
            });

            Mono<StreamMessageId> reply = this.write(keyBytes, StringCodec.INSTANCE, RedisCommands.XADD, args.toArray());
            return reply.map(id -> new ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.AddStreamRecord, RecordId>(command, RecordId.of(id.toString())));
        });
    }

    /**
     * Issues {@code XDEL key id...} for every incoming command.
     *
     * @param publisher delete commands; key and record ids are required
     * @return one response per command carrying the server's numeric reply
     */
    @Override
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.DeleteCommand, Long>> xDel(Publisher<ReactiveStreamCommands.DeleteCommand> publisher) {
        return this.execute(publisher, command -> {
            Assert.notNull(command.getKey(), "Key must not be null!");
            Assert.notNull(command.getRecordIds(), "recordIds must not be null!");

            byte[] keyBytes = toByteArray(command.getKey());
            List<Object> args = new ArrayList<>();
            args.add(keyBytes);
            for (RecordId recordId : command.getRecordIds()) {
                args.add(recordId.getValue());
            }

            Mono<Long> reply = this.write(keyBytes, StringCodec.INSTANCE, RedisCommands.XDEL, args.toArray());
            return reply.map(deleted -> new ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.DeleteCommand, Long>(command, deleted));
        });
    }

    /**
     * Issues {@code XLEN key} for every incoming command.
     *
     * @param publisher key commands; the key is required
     * @return one numeric response per command carrying the server's reply
     */
    @Override
    public Flux<ReactiveRedisConnection.NumericResponse<ReactiveRedisConnection.KeyCommand, Long>> xLen(Publisher<ReactiveRedisConnection.KeyCommand> publisher) {
        return this.execute(publisher, command -> {
            Assert.notNull(command.getKey(), "Key must not be null!");

            byte[] keyBytes = toByteArray(command.getKey());
            // Explicit array: the key is a single parameter, not the varargs array itself.
            Object[] args = new Object[]{keyBytes};
            Mono<Long> reply = this.write(keyBytes, StringCodec.INSTANCE, RedisCommands.XLEN, args);
            return reply.map(length -> new ReactiveRedisConnection.NumericResponse<ReactiveRedisConnection.KeyCommand, Long>(command, length));
        });
    }

    /**
     * Runs the shared range implementation with {@code RedisCommands.XRANGE}.
     *
     * @param publisher range commands
     * @return one response per command carrying the matching records
     */
    @Override
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.RangeCommand, Flux<ByteBufferRecord>>> xRange(Publisher<ReactiveStreamCommands.RangeCommand> publisher) {
        return this.range(RedisCommands.XRANGE, publisher);
    }

    /**
     * Shared implementation behind the forward and reverse range queries.
     *
     * <p>For {@code RedisCommands.XRANGE} the bounds are sent as {@code lower upper}; for any
     * other command they are sent as {@code upper lower}. Missing bounds fall back to
     * {@code -} and {@code +}. {@code COUNT} is only sent for a positive limit.
     *
     * @param rangeCommand command to execute
     * @param publisher range commands; key, range and limit are required
     * @return one response per command carrying the matching records
     */
    private Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.RangeCommand, Flux<ByteBufferRecord>>> range(RedisCommand<?> rangeCommand, Publisher<ReactiveStreamCommands.RangeCommand> publisher) {
        return this.execute(publisher, command -> {
            Assert.notNull(command.getKey(), "Key must not be null!");
            Assert.notNull(command.getRange(), "Range must not be null!");
            Assert.notNull(command.getLimit(), "Limit must not be null!");

            byte[] keyBytes = toByteArray(command.getKey());
            String lower = command.getRange().getLowerBound().getValue().orElse("-");
            String upper = command.getRange().getUpperBound().getValue().orElse("+");
            boolean forward = rangeCommand == RedisCommands.XRANGE;

            List<Object> args = new ArrayList<>();
            args.add(keyBytes);
            args.add(forward ? lower : upper);
            args.add(forward ? upper : lower);

            int count = command.getLimit().getCount();
            if (count > 0) {
                args.add("COUNT");
                args.add(count);
            }

            Mono<Map<StreamMessageId, Map<byte[], byte[]>>> reply = this.write(keyBytes, ByteArrayCodec.INSTANCE, rangeCommand, args.toArray());
            return reply.map(found -> {
                Flux<ByteBufferRecord> records = Flux.fromStream(found.entrySet().stream()).map(message -> {
                    // Wrap the raw field/value byte arrays without copying them.
                    Map<ByteBuffer, ByteBuffer> body = message.getValue().entrySet().stream()
                            .collect(Collectors.toMap(field -> ByteBuffer.wrap(field.getKey()), field -> ByteBuffer.wrap(field.getValue())));
                    RecordId id = RecordId.of(message.getKey().toString());
                    return StreamRecords.newRecord().in(command.getKey()).withId(id).ofBuffer(body);
                });
                return new ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.RangeCommand, Flux<ByteBufferRecord>>(command, records);
            });
        });
    }

    /**
     * Reads from one or more streams, optionally on behalf of a consumer group.
     *
     * <p>Parameters are assembled as
     * {@code [GROUP group consumer] [COUNT n] [BLOCK ms] [NOACK] STREAMS key... offset...}.
     * {@code COUNT} and {@code BLOCK} are only sent for positive values, and {@code NOACK} only
     * when a consumer is present. The command is picked from the service manager depending on
     * whether a consumer is set and whether the read blocks; the first stream key is the key
     * handed to the executor.
     *
     * @param publisher read commands; stream offsets and read options are required
     * @return one response per command carrying the records of all returned streams
     */
    @Override
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.ReadCommand, Flux<ByteBufferRecord>>> read(Publisher<ReactiveStreamCommands.ReadCommand> publisher) {
        return this.execute(publisher, command -> {
            Assert.notNull(command.getStreamOffsets(), "StreamOffsets must not be null!");
            Assert.notNull(command.getReadOptions(), "ReadOptions must not be null!");

            boolean grouped = command.getConsumer() != null;
            boolean blocking = command.getReadOptions().getBlock() != null && command.getReadOptions().getBlock() > 0L;

            List<Object> args = new ArrayList<>();
            if (grouped) {
                args.add("GROUP");
                args.add(command.getConsumer().getGroup());
                args.add(command.getConsumer().getName());
            }
            if (command.getReadOptions().getCount() != null && command.getReadOptions().getCount() > 0L) {
                args.add("COUNT");
                args.add(command.getReadOptions().getCount());
            }
            if (blocking) {
                args.add("BLOCK");
                args.add(command.getReadOptions().getBlock());
            }
            if (grouped && command.getReadOptions().isNoack()) {
                args.add("NOACK");
            }

            // All keys first, then the offsets in the same order.
            args.add("STREAMS");
            for (StreamOffset<ByteBuffer> offset : command.getStreamOffsets()) {
                args.add(toByteArray(offset.getKey()));
            }
            for (StreamOffset<ByteBuffer> offset : command.getStreamOffsets()) {
                args.add(offset.getOffset().getOffset());
            }

            byte[] firstKey = toByteArray(command.getStreamOffsets().get(0).getKey());
            RedisCommand<?> readCommand;
            if (grouped) {
                readCommand = blocking
                        ? this.executorService.getServiceManager().getXReadGroupBlockingCommand()
                        : this.executorService.getServiceManager().getXReadGroupCommand();
            } else {
                readCommand = blocking
                        ? this.executorService.getServiceManager().getXReadBlockingCommand()
                        : this.executorService.getServiceManager().getXReadCommand();
            }

            Mono<Map<String, Map<StreamMessageId, Map<byte[], byte[]>>>> reply = this.read(firstKey, ByteArrayCodec.INSTANCE, readCommand, args.toArray());
            return reply.map(streams -> {
                Flux<ByteBufferRecord> records = Flux.fromStream(streams.entrySet().stream()).flatMap(perStream -> {
                    String streamKey = perStream.getKey();
                    Stream<ByteBufferRecord> converted = perStream.getValue().entrySet().stream().map(message -> {
                        // Wrap the raw field/value byte arrays without copying them.
                        Map<ByteBuffer, ByteBuffer> body = message.getValue().entrySet().stream()
                                .collect(Collectors.toMap(field -> ByteBuffer.wrap(field.getKey()), field -> ByteBuffer.wrap(field.getValue())));
                        RecordId id = RecordId.of(message.getKey().toString());
                        return StreamRecords.newRecord().in(streamKey).withId(id).ofBuffer(body);
                    });
                    return Flux.fromStream(converted);
                });
                return new ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.ReadCommand, Flux<ByteBufferRecord>>(command, records);
            });
        });
    }

    /**
     * Executes an {@code XGROUP} sub-command chosen by the command's action.
     *
     * <ul>
     *   <li>{@code CREATE} sends {@code CREATE key group offset MKSTREAM} and returns the reply as is.</li>
     *   <li>{@code DELETE_CONSUMER} sends {@code DELCONSUMER key group consumer}.</li>
     *   <li>{@code DESTROY} sends {@code DESTROY key group}.</li>
     * </ul>
     * The numeric replies of the last two are reported as {@code "OK"} when positive and
     * {@code "Error"} otherwise.
     *
     * @param publisher group commands; key and group name are required
     * @return one response per command carrying the textual outcome
     * @throws IllegalArgumentException for any other action
     */
    @Override
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.GroupCommand, String>> xGroup(Publisher<ReactiveStreamCommands.GroupCommand> publisher) {
        return this.execute(publisher, command -> {
            Assert.notNull(command.getKey(), "Key must not be null!");
            Assert.notNull(command.getGroupName(), "GroupName must not be null!");

            byte[] keyBytes = toByteArray(command.getKey());
            switch (command.getAction()) {
                case CREATE: {
                    Assert.notNull(command.getReadOffset(), "ReadOffset must not be null!");
                    Mono<String> created = this.write(keyBytes, StringCodec.INSTANCE, XGROUP_STRING, "CREATE", keyBytes, command.getGroupName(), command.getReadOffset().getOffset(), "MKSTREAM");
                    return created.map(status -> new ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.GroupCommand, String>(command, status));
                }
                case DELETE_CONSUMER: {
                    Assert.notNull(command.getConsumerName(), "ConsumerName must not be null!");
                    Mono<Long> removed = this.write(keyBytes, StringCodec.INSTANCE, RedisCommands.XGROUP_LONG, "DELCONSUMER", keyBytes, command.getGroupName(), command.getConsumerName());
                    return removed.map(count -> new ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.GroupCommand, String>(command, count > 0L ? "OK" : "Error"));
                }
                case DESTROY: {
                    Mono<Long> destroyed = this.write(keyBytes, StringCodec.INSTANCE, RedisCommands.XGROUP_LONG, "DESTROY", keyBytes, command.getGroupName());
                    return destroyed.map(count -> new ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.GroupCommand, String>(command, count > 0L ? "OK" : "Error"));
                }
                default:
                    throw new IllegalArgumentException("unknown command " + command.getAction());
            }
        });
    }

    /**
     * Runs the shared range implementation with {@code RedisCommands.XREVRANGE}.
     *
     * @param publisher range commands
     * @return one response per command carrying the matching records
     */
    @Override
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.RangeCommand, Flux<ByteBufferRecord>>> xRevRange(Publisher<ReactiveStreamCommands.RangeCommand> publisher) {
        return this.range(RedisCommands.XREVRANGE, publisher);
    }

    /**
     * Issues {@code XTRIM key MAXLEN count} for every incoming command.
     *
     * <p>The response is typed with {@code KeyCommand}, as the declared return type requires;
     * generics are invariant, so a response typed with {@code TrimCommand} would not match it.
     *
     * @param publisher trim commands; key and count are required
     * @return one numeric response per command carrying the server's reply
     */
    @Override
    public Flux<ReactiveRedisConnection.NumericResponse<ReactiveRedisConnection.KeyCommand, Long>> xTrim(Publisher<ReactiveStreamCommands.TrimCommand> publisher) {
        return this.execute(publisher, command -> {
            Assert.notNull(command.getKey(), "Key must not be null!");
            Assert.notNull(command.getCount(), "Count must not be null!");

            byte[] k = toByteArray(command.getKey());
            Mono<Long> m = this.write(k, StringCodec.INSTANCE, RedisCommands.XTRIM, k, "MAXLEN", command.getCount());
            return m.map(v -> new ReactiveRedisConnection.NumericResponse<ReactiveRedisConnection.KeyCommand, Long>(command, v));
        });
    }
}

