/*
 * Decompiled with CFR 0.152.
 */
package org.redisson;

import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.RedissonBlockingQueue;
import org.redisson.RedissonList;
import org.redisson.RedissonShutdownException;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RFuture;
import org.redisson.api.RMap;
import org.redisson.api.RRemoteService;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.executor.RemotePromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.remote.BaseRemoteService;
import org.redisson.remote.RRemoteServiceResponse;
import org.redisson.remote.RemoteServiceAck;
import org.redisson.remote.RemoteServiceCancelRequest;
import org.redisson.remote.RemoteServiceCancelResponse;
import org.redisson.remote.RemoteServiceKey;
import org.redisson.remote.RemoteServiceMethod;
import org.redisson.remote.RemoteServiceRequest;
import org.redisson.remote.RemoteServiceResponse;
import org.redisson.remote.RequestId;
import org.redisson.remote.ResponseEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RedissonRemoteService
extends BaseRemoteService
implements RRemoteService {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(RedissonRemoteService.class);
    /**
     * Method handlers keyed by (remote interface, method name, method signature).
     * Filled by {@code register} and emptied per interface by {@code deregister}.
     */
    private final Map<RemoteServiceKey, RemoteServiceMethod> beans = new ConcurrentHashMap<RemoteServiceKey, RemoteServiceMethod>();
    /**
     * Per-interface bookkeeping (pending take future and worker counter).
     * An interface has an entry here only between {@code register} and {@code deregister}.
     */
    private final Map<Class<?>, Entry> remoteMap = new ConcurrentHashMap<Class<?>, Entry>();

    /**
     * Creates the remote service by forwarding every argument, unchanged, to the
     * {@link BaseRemoteService} constructor; no additional state is set up here.
     *
     * @param codec codec handed to the base class
     * @param name name handed to the base class
     * @param commandExecutor command executor handed to the base class
     * @param executorId executor id handed to the base class
     * @param responses response-entry map handed to the base class
     */
    public RedissonRemoteService(Codec codec, String name, CommandAsyncExecutor commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
        super(codec, name, commandExecutor, executorId, responses);
    }

    /**
     * Publishes a request with a single Lua script: the encoded request is stored in the
     * {@code <requestQueueName>:tasks} hash under the request id, then the id is pushed
     * onto the tail of the request queue list.
     *
     * @param requestQueueName name of the request queue list
     * @param request request to store and enqueue
     * @param result promise that receives the returned future via {@code setAddFuture}
     * @return future of the script evaluation
     */
    @Override
    protected RFuture<Boolean> addAsync(String requestQueueName, RemoteServiceRequest request, RemotePromise<Object> result) {
        final String tasksMapName = requestQueueName + ":tasks";
        final String script = "redis.call('hset', KEYS[2], ARGV[1], ARGV[2]);"
                + "redis.call('rpush', KEYS[1], ARGV[1]); "
                + "return 1;";
        RFuture<Boolean> addFuture = this.commandExecutor.evalWriteAsync(
                this.name, (Codec)LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, script,
                Arrays.asList(requestQueueName, tasksMapName),
                request.getId(), this.encode(request));
        result.setAddFuture(addFuture);
        return addFuture;
    }

    /**
     * Withdraws a request with a single Lua script: one occurrence of the task id is
     * removed from the request queue list, then the id is deleted from the
     * {@code <requestQueueName>:tasks} hash.
     *
     * @param requestQueueName name of the request queue list
     * @param taskId id of the request to remove; its {@code toString()} form is sent
     * @return future of the script evaluation
     */
    @Override
    protected RFuture<Boolean> removeAsync(String requestQueueName, RequestId taskId) {
        final String tasksMapName = requestQueueName + ":tasks";
        final String script = "redis.call('lrem', KEYS[1], 1, ARGV[1]); "
                + "redis.call('hdel', KEYS[2], ARGV[1]);"
                + "return 1;";
        return this.commandExecutor.evalWriteAsync(
                this.name, (Codec)LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, script,
                Arrays.asList(requestQueueName, tasksMapName),
                taskId.toString());
    }

    /**
     * Registers {@code object} as the implementation of {@code remoteInterface}
     * with a single worker.
     *
     * @param remoteInterface interface whose methods become remotely invocable
     * @param object implementation that receives the invocations
     */
    @Override
    public <T> void register(Class<T> remoteInterface, T object) {
        final int singleWorker = 1;
        this.register(remoteInterface, object, singleWorker);
    }

    /**
     * Removes every method handler of {@code remoteInterface}, drops its worker entry
     * and cancels (without interruption) the take that entry was waiting on, if any.
     *
     * @param remoteInterface interface to stop serving
     */
    @Override
    public <T> void deregister(Class<T> remoteInterface) {
        Method[] exposed = remoteInterface.getMethods();
        for (int i = 0; i < exposed.length; i++) {
            Method current = exposed[i];
            this.beans.remove(new RemoteServiceKey(remoteInterface, current.getName(), this.getMethodSignature(current)));
        }

        Entry removed = this.remoteMap.remove(remoteInterface);
        if (removed == null) {
            return;
        }
        RFuture<String> pendingTake = removed.getFuture();
        if (pendingTake != null) {
            pendingTake.cancel(false);
        }
    }

    /**
     * Reports how many request ids currently sit in the request queue of
     * {@code remoteInterface}.
     *
     * @param remoteInterface interface whose queue is inspected
     * @return current size of that request queue
     */
    @Override
    public int getPendingInvocations(Class<?> remoteInterface) {
        RBlockingQueue<String> pending =
                this.getBlockingQueue(this.getRequestQueueName(remoteInterface), StringCodec.INSTANCE);
        return pending.size();
    }

    /**
     * Returns the current worker counter of {@code remoteInterface}.
     *
     * @param remoteInterface interface to look up
     * @return the counter value, or {@code 0} when the interface has no entry
     */
    @Override
    public int getFreeWorkers(Class<?> remoteInterface) {
        Entry registered = this.remoteMap.get(remoteInterface);
        return registered != null ? registered.getCounter().get() : 0;
    }

    /**
     * Registers {@code object} for {@code remoteInterface} with the given number of
     * workers, running invocations on the executor supplied by the connection manager.
     *
     * @param remoteInterface interface whose methods become remotely invocable
     * @param object implementation that receives the invocations
     * @param workers number of workers; validated by the four-argument overload
     */
    @Override
    public <T> void register(Class<T> remoteInterface, T object, int workers) {
        ExecutorService sharedExecutor = this.commandExecutor.getConnectionManager().getExecutor();
        this.register(remoteInterface, object, workers, sharedExecutor);
    }

    /**
     * Builds a blocking-queue handle for the given name and codec on this service's
     * command executor.
     *
     * @param name name of the queue
     * @param codec codec used for the queue's elements
     * @return a new queue handle; the last constructor argument is passed as {@code null}
     */
    private <V> RBlockingQueue<V> getBlockingQueue(String name, Codec codec) {
        // Parameterized instead of raw so the declared RBlockingQueue<V> return type is honoured
        // without an unchecked conversion.
        return new RedissonBlockingQueue<V>(codec, this.commandExecutor, name, null);
    }

    /**
     * Registers {@code object} as the handler for every public method of
     * {@code remoteInterface} and starts listening on the interface's request queue.
     * <p>
     * If any method key was already present in the handler map, the method returns
     * immediately: no worker entry is created and no subscription is started.
     *
     * @param remoteInterface interface whose methods become remotely invocable
     * @param object implementation that receives the invocations
     * @param workers initial value of the worker counter; must be at least 1
     * @param executor executor on which invocations are run
     * @throws IllegalArgumentException if {@code workers} is lower than 1
     */
    @Override
    public <T> void register(Class<T> remoteInterface, T object, int workers, ExecutorService executor) {
        if (workers <= 0) {
            throw new IllegalArgumentException("executorsAmount can't be lower than 1");
        }

        for (Method method : remoteInterface.getMethods()) {
            RemoteServiceMethod handler = new RemoteServiceMethod(method, object);
            RemoteServiceKey handlerKey = new RemoteServiceKey(remoteInterface, method.getName(), this.getMethodSignature(method));
            RemoteServiceMethod previous = this.beans.put(handlerKey, handler);
            if (previous != null) {
                // Already registered: stop here.
                return;
            }
        }

        Entry workerEntry = new Entry(workers);
        this.remoteMap.put(remoteInterface, workerEntry);

        RBlockingQueue<String> requestQueue =
                this.getBlockingQueue(this.getRequestQueueName(remoteInterface), StringCodec.INSTANCE);
        this.subscribe(remoteInterface, requestQueue, executor);
    }

    /**
     * Starts one asynchronous take on the request queue of {@code remoteInterface} and,
     * once a request id arrives, loads the request, optionally acknowledges it and hands
     * it to {@code executeMethod}.
     * <p>
     * Nothing happens when the interface has no {@link Entry}. The pending take future is
     * stored in the entry, which is what {@code deregister} cancels.
     * <p>
     * Worker accounting: the entry counter is decremented for each taken request id, and a
     * further take is started immediately only while the decremented value is still above
     * zero. Paths that give up on a request (load failure, missing task, ack timeout, ack
     * already claimed, ack delivery failure) call {@code resubscribe}, which increments the
     * counter again. A {@link RedissonShutdownException} ends processing without
     * re-subscribing.
     *
     * @param remoteInterface interface being served
     * @param requestQueue queue of request ids for that interface
     * @param executor executor passed on to {@code executeMethod}
     */
    private <T> void subscribe(Class<T> remoteInterface, RBlockingQueue<String> requestQueue, ExecutorService executor) {
        Entry entry = this.remoteMap.get(remoteInterface);
        if (entry == null) {
            return;
        }
        RFuture<String> take = requestQueue.takeAsync();
        entry.setFuture(take);
        take.onComplete((requestId, e) -> {
            // The interface may have been deregistered while the take was pending.
            Entry entr = this.remoteMap.get(remoteInterface);
            if (entr == null) {
                return;
            }
            if (e != null) {
                if (e instanceof RedissonShutdownException) {
                    return;
                }
                log.error("Can't process the remote service request.", e);
                // The take itself failed: start a new one without touching the counter.
                this.subscribe(remoteInterface, requestQueue, executor);
                return;
            }
            // NOTE(review): the counter below belongs to the entry captured when the take was
            // started, not to the freshly looked-up one above - confirm this is intended when
            // an interface is deregistered and registered again in between.
            if (entry.getCounter().get() == 0) {
                // No worker left; nothing further is done with the taken request id here.
                return;
            }
            if (entry.getCounter().decrementAndGet() > 0) {
                // Workers remain, so keep listening while this request is processed.
                this.subscribe(remoteInterface, requestQueue, executor);
            }
            RMap<String, RemoteServiceRequest> tasks = this.getMap(requestQueue.getName() + ":tasks");
            RFuture<RemoteServiceRequest> taskFuture = this.getTask(requestId, tasks);
            taskFuture.onComplete((request, exc) -> {
                if (exc != null) {
                    if (exc instanceof RedissonShutdownException) {
                        return;
                    }
                    log.error("Can't process the remote service request with id " + requestId, exc);
                    this.resubscribe(remoteInterface, requestQueue, executor);
                    return;
                }
                if (request == null) {
                    log.debug("Task can't be found for request: {}", requestId);
                    this.resubscribe(remoteInterface, requestQueue, executor);
                    return;
                }
                long elapsedTime = System.currentTimeMillis() - request.getDate();
                if (request.getOptions().isAckExpected() && elapsedTime > request.getOptions().getAckTimeoutInMillis()) {
                    log.debug("request: {} has been skipped due to ackTimeout. Elapsed time: {}ms", (Object)request.getId(), (Object)elapsedTime);
                    this.resubscribe(remoteInterface, requestQueue, executor);
                    return;
                }
                if (request.getOptions().isAckExpected()) {
                    String responseName = this.getResponseQueueName(request.getExecutorId());
                    String ackName = this.getAckName(request.getId());
                    // Claim the request: the script creates the ack key with the ack timeout as
                    // its expiry and returns 1 only when the key did not exist before.
                    // Typed as RFuture<Boolean> (like the evalWriteAsync call in addAsync) so the
                    // callback receives a Boolean result and a Throwable error.
                    RFuture<Boolean> ackClientsFuture = this.commandExecutor.evalWriteAsync(responseName, (Codec)LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if redis.call('setnx', KEYS[1], 1) == 1 then redis.call('pexpire', KEYS[1], ARGV[1]);return 1;end;return 0;", Arrays.asList(ackName), request.getOptions().getAckTimeoutInMillis());
                    ackClientsFuture.onComplete((r, ex) -> {
                        if (ex != null) {
                            if (ex instanceof RedissonShutdownException) {
                                return;
                            }
                            log.error("Can't send ack for request: " + request, ex);
                            this.resubscribe(remoteInterface, requestQueue, executor);
                            return;
                        }
                        if (!r.booleanValue()) {
                            // The ack key already existed, so this request is not executed here.
                            this.resubscribe(remoteInterface, requestQueue, executor);
                            return;
                        }
                        // Deliver the ack to the caller through its response list.
                        RedissonList<RemoteServiceAck> list = new RedissonList<RemoteServiceAck>(this.codec, this.commandExecutor, responseName, null);
                        RFuture<Boolean> addFuture = list.addAsync(new RemoteServiceAck(request.getId()));
                        addFuture.onComplete((res, exce) -> {
                            if (exce != null) {
                                if (exce instanceof RedissonShutdownException) {
                                    return;
                                }
                                log.error("Can't send ack for request: " + request, exce);
                                this.resubscribe(remoteInterface, requestQueue, executor);
                                return;
                            }
                            if (!res.booleanValue()) {
                                this.resubscribe(remoteInterface, requestQueue, executor);
                                return;
                            }
                            this.executeMethod(remoteInterface, requestQueue, executor, request);
                        });
                    });
                } else {
                    // No ack requested: execute straight away.
                    this.executeMethod(remoteInterface, requestQueue, executor, request);
                }
            });
        });
    }

    /**
     * Submits the invocation of {@code request} to {@code executor} and wires up
     * cancellation for it.
     * <p>
     * A cancel-request promise is registered through {@code scheduleCheck} before the task
     * is submitted. When that promise completes successfully, the submitted task is
     * cancelled with the interruption flag carried by the cancel request. If cancellation
     * succeeds, a {@link RemoteServiceCancelResponse} is placed into the shared response
     * holder; should the holder already contain a response, a "not cancelled" response is
     * used instead. When the cancel request asks for a reply, that response is written to
     * the cancel-response map, whose expiry is set to 60 seconds.
     *
     * @param remoteInterface interface being served
     * @param requestQueue queue of request ids, passed on for re-subscription
     * @param executor executor that runs the invocation
     * @param request request to execute
     */
    private <T> void executeMethod(Class<T> remoteInterface, RBlockingQueue<String> requestQueue, ExecutorService executor, RemoteServiceRequest request) {
        RemoteServiceMethod method = this.beans.get(new RemoteServiceKey(remoteInterface, request.getMethodName(), request.getSignature()));
        String responseName = this.getResponseQueueName(request.getExecutorId());

        // Parameterized to match the types invokeMethod declares; with raw types the callback
        // below would see its first argument as Object and could not call its accessors.
        AtomicReference<RRemoteServiceResponse> responseHolder = new AtomicReference<RRemoteServiceResponse>();
        RedissonPromise<RemoteServiceCancelRequest> cancelRequestFuture = new RedissonPromise<RemoteServiceCancelRequest>();
        this.scheduleCheck(this.cancelRequestMapName, new RequestId(request.getId()), cancelRequestFuture);

        Future<?> submitFuture = executor.submit(() -> this.invokeMethod(remoteInterface, requestQueue, request, method, responseName, executor, cancelRequestFuture, responseHolder));

        cancelRequestFuture.onComplete((r, e) -> {
            if (e != null) {
                // Includes the case where invokeMethod cancelled this promise after finishing.
                return;
            }
            boolean res = submitFuture.cancel(r.isMayInterruptIfRunning());
            if (res) {
                RemoteServiceCancelResponse response = new RemoteServiceCancelResponse(request.getId(), true);
                if (!responseHolder.compareAndSet(null, response)) {
                    // A response was stored first, so report the cancellation as unsuccessful.
                    response = new RemoteServiceCancelResponse(request.getId(), false);
                }
                if (r.isSendResponse()) {
                    RMap<String, RemoteServiceCancelResponse> map = this.getMap(this.cancelResponseMapName);
                    map.fastPutAsync(request.getId(), response);
                    map.expireAsync(60L, TimeUnit.SECONDS);
                }
            }
        });
    }

    /**
     * Reflectively invokes the target method for {@code request}, records the outcome in
     * {@code responseHolder} (unless a response is already there), cancels the pending
     * cancel-request promise and, when required, publishes the response before
     * re-subscribing.
     * <p>
     * The response is published when the request expects a result or when the holder
     * contains a {@link RemoteServiceCancelResponse}. It is put on the response queue
     * named {@code responseName}, whose expiry is set to the request's execution timeout,
     * or 60000 ms when none is configured. {@code resubscribe} is called after the put
     * completes (unless it failed with a {@link RedissonShutdownException}), or
     * immediately when nothing is published.
     *
     * @param remoteInterface interface being served
     * @param requestQueue queue of request ids, passed on for re-subscription
     * @param request request being executed
     * @param method handler looked up for the request
     * @param responseName name of the response queue of the calling executor
     * @param executor executor passed on for re-subscription
     * @param cancelRequestFuture promise watching for a cancel request; may be {@code null}
     * @param responseHolder holder shared with the cancellation callback
     */
    private <T> void invokeMethod(Class<T> remoteInterface, RBlockingQueue<String> requestQueue, RemoteServiceRequest request, RemoteServiceMethod method, String responseName, ExecutorService executor, RFuture<RemoteServiceCancelRequest> cancelRequestFuture, AtomicReference<RRemoteServiceResponse> responseHolder) {
        try {
            Object result = method.getMethod().invoke(method.getBean(), request.getArgs());
            RemoteServiceResponse response = new RemoteServiceResponse(request.getId(), result);
            responseHolder.compareAndSet(null, response);
        }
        catch (Exception e2) {
            // Only wrapper exceptions (e.g. InvocationTargetException) carry a cause. For
            // failures raised by the reflective call itself the cause is null, which would
            // produce a response with neither result nor error; report the exception itself.
            Throwable cause = e2.getCause() != null ? e2.getCause() : e2;
            RemoteServiceResponse response = new RemoteServiceResponse(request.getId(), cause);
            responseHolder.compareAndSet(null, response);
            log.error("Can't execute: " + request, (Throwable)e2);
        }
        if (cancelRequestFuture != null) {
            // The invocation is over, so stop waiting for a cancel request.
            cancelRequestFuture.cancel(false);
        }
        if (request.getOptions().isResultExpected() || responseHolder.get() instanceof RemoteServiceCancelResponse) {
            long timeout = 60000L;
            if (request.getOptions().getExecutionTimeoutInMillis() != null) {
                timeout = request.getOptions().getExecutionTimeoutInMillis();
            }
            RBlockingQueue<RRemoteServiceResponse> queue = this.getBlockingQueue(responseName, this.codec);
            RFuture<Void> clientsFuture = queue.putAsync(responseHolder.get());
            queue.expireAsync(timeout, TimeUnit.MILLISECONDS);
            clientsFuture.onComplete((res, e) -> {
                if (e != null) {
                    if (e instanceof RedissonShutdownException) {
                        return;
                    }
                    log.error("Can't send response: " + responseHolder.get() + " for request: " + request, e);
                }
                // Free the worker whether or not the response could be delivered.
                this.resubscribe(remoteInterface, requestQueue, executor);
            });
        } else {
            this.resubscribe(remoteInterface, requestQueue, executor);
        }
    }

    /**
     * Gives one worker back to {@code remoteInterface}: increments the entry counter and,
     * when the counter was zero beforehand, starts a new subscription.
     * Does nothing when the interface has no entry.
     *
     * @param remoteInterface interface being served
     * @param requestQueue queue of request ids for that interface
     * @param executor executor passed on to {@code subscribe}
     */
    private <T> void resubscribe(Class<T> remoteInterface, RBlockingQueue<String> requestQueue, ExecutorService executor) {
        Entry registered = this.remoteMap.get(remoteInterface);
        if (registered == null) {
            return;
        }
        int freeBefore = registered.getCounter().getAndIncrement();
        if (freeBefore == 0) {
            this.subscribe(remoteInterface, requestQueue, executor);
        }
    }

    /**
     * Fetches the request stored under {@code requestId} by removing it from the tasks map.
     *
     * @param requestId id of the request to load
     * @param tasks map holding the stored requests
     * @return future of the map's asynchronous remove operation
     */
    protected RFuture<RemoteServiceRequest> getTask(String requestId, RMap<String, RemoteServiceRequest> tasks) {
        RFuture<RemoteServiceRequest> removedTask = tasks.removeAsync(requestId);
        return removedTask;
    }

    /**
     * Bookkeeping for one registered remote interface: the take future currently pending
     * on its request queue and a counter initialised with the number of workers.
     */
    public static class Entry {
        /**
         * Most recent pending take. Written by {@code subscribe}, which also runs from
         * asynchronous completion callbacks, and read by {@code deregister}; volatile so the
         * reading thread sees the latest future to cancel.
         */
        volatile RFuture<String> future;
        /** Worker counter; starts at the {@code workers} value given to the constructor. */
        final AtomicInteger counter;

        /**
         * @param workers initial value of the worker counter
         */
        public Entry(int workers) {
            this.counter = new AtomicInteger(workers);
        }

        /**
         * @param future take future now pending for this entry
         */
        public void setFuture(RFuture<String> future) {
            this.future = future;
        }

        /**
         * @return the stored take future, or {@code null} if none was set yet
         */
        public RFuture<String> getFuture() {
            return this.future;
        }

        /**
         * @return the worker counter itself (callers modify it directly)
         */
        public AtomicInteger getCounter() {
            return this.counter;
        }
    }
}

