/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.ageiport.processor.core.dispatcher.http;

import com.alibaba.ageiport.common.collections.Lists;
import com.alibaba.ageiport.common.logger.Logger;
import com.alibaba.ageiport.common.logger.LoggerFactory;
import com.alibaba.ageiport.common.utils.JsonUtil;
import com.alibaba.ageiport.processor.core.AgeiPort;
import com.alibaba.ageiport.processor.core.TaskSpec;
import com.alibaba.ageiport.processor.core.dispatcher.http.DispatchQueue;
import com.alibaba.ageiport.processor.core.dispatcher.http.HttpDispatchRequest;
import com.alibaba.ageiport.processor.core.dispatcher.http.HttpDispatchResponse;
import com.alibaba.ageiport.processor.core.dispatcher.http.HttpDispatcherAgent;
import com.alibaba.ageiport.processor.core.dispatcher.http.HttpDispatcherOptions;
import com.alibaba.ageiport.processor.core.executor.MainWorkerExecutor;
import com.alibaba.ageiport.processor.core.model.core.impl.MainTask;
import com.alibaba.ageiport.processor.core.spi.cluster.ClusterManager;
import com.alibaba.ageiport.processor.core.spi.cluster.Node;
import com.alibaba.ageiport.processor.core.spi.dispatcher.Dispatcher;
import com.alibaba.ageiport.processor.core.spi.dispatcher.RootDispatcherContext;
import com.alibaba.ageiport.processor.core.spi.dispatcher.SubDispatcherContext;
import com.alibaba.ageiport.processor.core.spi.task.factory.MainTaskWorker;
import com.alibaba.ageiport.processor.core.spi.task.factory.MainTaskWorkerFactory;
import com.alibaba.ageiport.processor.core.spi.task.selector.TaskSpiSelector;
import com.alibaba.ageiport.processor.core.spi.task.specification.TaskSpecificationRegistry;
import com.alibaba.ageiport.processor.core.task.monitor.ClearTask;
import io.vertx.core.Verticle;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.RequestOptions;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class HttpDispatcher
implements Dispatcher {
    public static final String URL = "/subTasks";
    private static final Logger logger = LoggerFactory.getLogger(HttpDispatcher.class);
    private AgeiPort ageiPort;
    private HttpDispatcherOptions options;
    private Map<String, Node> failedNodeMap;
    private DispatchQueue dispatchQueue;
    private ClearTask clearTask;
    private int nodeIndex;
    HttpClient httpClient;

    public HttpDispatcher(AgeiPort ageiPort, HttpDispatcherOptions options) {
        this.ageiPort = ageiPort;
        this.options = options;
        this.failedNodeMap = new ConcurrentHashMap<String, Node>();
        this.dispatchQueue = new DispatchQueue();
        this.clearTask = new ClearTask("HttpDispatcher clear task");
        HttpDispatcherAgent agent = new HttpDispatcherAgent(ageiPort, this);
        Vertx vertx = Vertx.vertx();
        vertx.deployVerticle((Verticle)agent);
        this.httpClient = vertx.createHttpClient();
        new Thread(() -> {
            while (true) {
                this.doDispatchInClient();
            }
        }).start();
    }

    private void doDispatchInClient() {
        SubDispatcherContext context = null;
        try {
            context = this.dispatchQueue.get();
            Node node = this.getNextNode();
            int count = 0;
            while (this.failedNodeMap.containsKey(node.getId())) {
                node = this.getNextNode();
                int okNodeCount = this.ageiPort.getClusterManager().getNodes().size() - this.failedNodeMap.size();
                if (count++ <= okNodeCount) continue;
                throw new RuntimeException("no ok node");
            }
            this.dispatchToNode(context, node);
        }
        catch (Throwable e) {
            logger.error("doDispatchInClient failed, context:{}", new Object[]{context, e});
        }
    }

    private void dispatchToNode(SubDispatcherContext context, Node node) {
        HttpDispatchRequest request = new HttpDispatchRequest(context.getMainTaskId(), context.getSubTaskNos());
        RequestOptions options = new RequestOptions();
        options.setHost(node.getHost()).setPort(this.options.getPort()).setMethod(HttpMethod.POST).setURI(URL).setTimeout(3000L);
        this.httpClient.request(options, event -> {
            if (event.succeeded()) {
                HttpClientRequest httpClientRequest = (HttpClientRequest)event.result();
                String body = JsonUtil.toJsonString((Object)request);
                httpClientRequest.send(body, event1 -> {
                    if (event1.succeeded()) {
                        HttpClientResponse response = (HttpClientResponse)event1.result();
                        response.bodyHandler(event11 -> {
                            String resultJson = event11.toString();
                            HttpDispatchResponse dispatchResponse = (HttpDispatchResponse)JsonUtil.toObject((String)resultJson, HttpDispatchResponse.class);
                            if (dispatchResponse.getSuccess().booleanValue()) {
                                logger.info("dispatchToNode success, main:{}, ip{}, nos:{}, labels{}", new Object[]{context.getMainTaskId(), node.getHost(), context.getSubTaskNos(), context.getLabels()});
                            } else {
                                logger.error("dispatchToNode server response failed, ", new Object[]{resultJson});
                                this.dispatchFailed(node, context);
                            }
                        });
                    } else {
                        logger.error("dispatchToNode handle response, ", event1.cause());
                        this.dispatchFailed(node, context);
                    }
                });
            } else {
                logger.error("dispatchToNode get request failed, ", event.cause());
                this.dispatchFailed(node, context);
            }
        });
    }

    private void dispatchFailed(Node node, SubDispatcherContext context) {
        logger.error("dispatchFailed, main:{}, ip:{}, nos:{}, labels:{}", new Object[]{context.getMainTaskId(), node.getHost(), context.getSubTaskNos(), context.getLabels()});
        this.failedNodeMap.put(node.getId(), node);
        this.clearTask.addClearTask(node.getId(), this.options.getNodeFallbackMs().intValue(), () -> this.failedNodeMap.remove(node.getId()));
        this.dispatchQueue.add(context);
    }

    private Node getNextNode() {
        ClusterManager clusterManager = this.ageiPort.getClusterManager();
        List<Node> nodes = clusterManager.getNodes();
        int nodeCount = nodes.size();
        int index = this.nodeIndex % nodeCount;
        Node node = nodes.get(index);
        this.nodeIndex = (this.nodeIndex + 1) % nodeCount;
        return node;
    }

    @Override
    public void dispatchMainTaskPrepare(RootDispatcherContext context) {
        String mainTaskId = context.getMainTaskId();
        MainTask mainTask = this.ageiPort.getTaskServerClient().getMainTask(mainTaskId);
        TaskSpecificationRegistry taskSpecificationRegistry = this.ageiPort.getSpecificationRegistry();
        TaskSpec taskSpec = taskSpecificationRegistry.get(mainTask.getCode());
        TaskSpiSelector spiSelector = this.ageiPort.getTaskSpiSelector();
        MainTaskWorkerFactory workerFactory = spiSelector.selectExtension(taskSpec.getExecuteType(), mainTask.getType(), mainTask.getCode(), MainTaskWorkerFactory.class);
        MainTaskWorker worker = workerFactory.create(this.ageiPort, mainTask);
        worker.isReduce(false);
        MainWorkerExecutor workerExecutor = this.ageiPort.getMainWorkerExecutor();
        workerExecutor.submit(worker);
    }

    @Override
    public void dispatchSubTasks(SubDispatcherContext context) {
        List<Integer> subTaskNos = context.getSubTaskNos();
        int nodeCount = this.ageiPort.getClusterManager().getNodes().size();
        List subTaskAvgByNodeCount = Lists.averageAssign(subTaskNos, (int)nodeCount);
        for (List nos : subTaskAvgByNodeCount) {
            SubDispatcherContext contextToDispatch = new SubDispatcherContext();
            contextToDispatch.setMainTaskId(context.getMainTaskId());
            contextToDispatch.setSubTaskNos(nos);
            contextToDispatch.setLabels(context.getLabels());
            this.dispatchQueue.add(contextToDispatch);
        }
    }

    @Override
    public void dispatchMainTaskReduce(RootDispatcherContext context) {
        String mainTaskId = context.getMainTaskId();
        MainTask mainTask = this.ageiPort.getTaskServerClient().getMainTask(mainTaskId);
        TaskSpecificationRegistry taskSpecificationRegistry = this.ageiPort.getSpecificationRegistry();
        TaskSpec taskSpec = taskSpecificationRegistry.get(mainTask.getCode());
        TaskSpiSelector spiSelector = this.ageiPort.getTaskSpiSelector();
        MainTaskWorkerFactory workerFactory = spiSelector.selectExtension(taskSpec.getExecuteType(), taskSpec.getTaskType(), mainTask.getCode(), MainTaskWorkerFactory.class);
        MainTaskWorker worker = workerFactory.create(this.ageiPort, mainTask);
        worker.isReduce(true);
        MainWorkerExecutor workerExecutor = this.ageiPort.getMainWorkerExecutor();
        workerExecutor.submit(worker);
    }

    public AgeiPort getAgeiPort() {
        return this.ageiPort;
    }

    public HttpDispatcherOptions getOptions() {
        return this.options;
    }

    public Map<String, Node> getFailedNodeMap() {
        return this.failedNodeMap;
    }

    public DispatchQueue getDispatchQueue() {
        return this.dispatchQueue;
    }

    public ClearTask getClearTask() {
        return this.clearTask;
    }

    public int getNodeIndex() {
        return this.nodeIndex;
    }

    public HttpClient getHttpClient() {
        return this.httpClient;
    }

    public void setAgeiPort(AgeiPort ageiPort) {
        this.ageiPort = ageiPort;
    }

    public void setOptions(HttpDispatcherOptions options) {
        this.options = options;
    }

    public void setFailedNodeMap(Map<String, Node> failedNodeMap) {
        this.failedNodeMap = failedNodeMap;
    }

    public void setDispatchQueue(DispatchQueue dispatchQueue) {
        this.dispatchQueue = dispatchQueue;
    }

    public void setClearTask(ClearTask clearTask) {
        this.clearTask = clearTask;
    }

    public void setNodeIndex(int nodeIndex) {
        this.nodeIndex = nodeIndex;
    }

    public void setHttpClient(HttpClient httpClient) {
        this.httpClient = httpClient;
    }
}

