/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.schedulerx.worker.discovery;

import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import com.alibaba.schedulerx.common.domain.JSONResult;
import com.alibaba.schedulerx.common.domain.ScaleGroupResult;
import com.alibaba.schedulerx.common.util.ConfigUtil;
import com.alibaba.schedulerx.common.util.JsonUtil;
import com.alibaba.schedulerx.shade.com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.alibaba.schedulerx.shade.com.mashape.unirest.http.HttpResponse;
import com.alibaba.schedulerx.shade.com.mashape.unirest.http.JsonNode;
import com.alibaba.schedulerx.shade.com.mashape.unirest.http.Unirest;
import com.alibaba.schedulerx.shade.org.apache.commons.collections.CollectionUtils;
import com.alibaba.schedulerx.shade.org.apache.commons.lang.StringUtils;
import com.alibaba.schedulerx.worker.SchedulerxWorker;
import com.alibaba.schedulerx.worker.discovery.GroupManager;
import com.alibaba.schedulerx.worker.discovery.ServerDiscovery;
import com.alibaba.schedulerx.worker.log.LogFactory;
import com.alibaba.schedulerx.worker.log.Logger;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * {@link ServerDiscovery} implementation that polls the console over HTTP for the address of the
 * active server of a group and keeps a set of Akka {@link ActorSelection}s pointing at it.
 *
 * <p>After {@link #start(String, String, String)} a single scheduler thread queries the console
 * every {@value #POLL_INTERVAL_SECONDS} seconds. Whenever the reported address differs
 * (case-insensitively) from the one currently held, the heartbeat and router selections are rebuilt
 * for the new address.
 *
 * <p>All getters may return {@code null} until the first successful poll and again after
 * {@link #reset(ActorSystem)}. State is shared through volatile fields only; there is no locking
 * between the polling task and {@link #reset(ActorSystem)}.
 */
public class DefaultServerDiscovery
implements ServerDiscovery {
    private static final Logger LOGGER = LogFactory.getLogger(DefaultServerDiscovery.class);
    private static final String ACTIVE_SERVER_QUERY_PATH = "/worker/v1/appgroup/getLeaderAddr";
    private static final String SERVER_DISCOVERY_THREAD_NAME = "activeServerDiscoveryThread-";
    /** Common prefix of every server-side actor path built by this class. */
    private static final String SERVER_AKKA_PREFIX = "akka.tcp://server@";
    /** Result code for which the response payload is parsed as a {@link ScaleGroupResult}. */
    private static final int SCALE_GROUP_RESULT_CODE = 300;
    /** Delay between two consecutive console queries. */
    private static final long POLL_INTERVAL_SECONDS = 5L;
    // Volatile because start() and stop() are not guaranteed to run on the same thread.
    private volatile ScheduledExecutorService scheduledExecutorService;
    private volatile String activeServerAddr;
    private volatile ActorSelection instanceStatusRouter;
    private volatile ActorSelection mapMasterRouter;
    private volatile ActorSelection taskStatusRouter;
    private volatile ActorSelection heartbeatActor;
    /** Never assigned within this class; kept only to back the deprecated getter. */
    @Deprecated
    private volatile List<ActorSelection> standbyServerHeatbeatActors;
    private volatile ActorSystem actorSystem = SchedulerxWorker.actorSystem;
    private GroupManager groupManager = GroupManager.INSTANCE;

    /**
     * Starts the periodic active-server lookup for the given group.
     *
     * <p>The console host is read from the worker configuration key {@code domainName}. The first
     * query is scheduled without initial delay.
     *
     * @param namespace       namespace sent to the console; omitted from the query when {@code null}
     * @param namespaceSource namespace source sent to the console; only sent when {@code namespace}
     *                        is non-null and this value is not blank
     * @param groupId         group whose active server is looked up; also used as thread-name suffix
     * @throws Exception declared by the interface; anything thrown while reading the configuration
     *                   or creating the scheduler propagates to the caller
     */
    @Override
    public void start(final String namespace, final String namespaceSource, final String groupId) throws Exception {
        final String consoleDomain = ConfigUtil.getWorkerConfig().getString("domainName");
        ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setNameFormat(SERVER_DISCOVERY_THREAD_NAME + groupId).build(), new ThreadPoolExecutor.DiscardPolicy());
        this.scheduledExecutorService = executor;
        executor.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                try {
                    DefaultServerDiscovery.this.refreshActiveServer(consoleDomain, groupId, namespace, namespaceSource);
                }
                catch (Throwable t) {
                    // An exception escaping a fixed-rate task cancels all further executions,
                    // so every failure is logged here and the next tick simply retries.
                    LOGGER.error("scheduled query active server error!", t);
                }
            }
        }, 0L, POLL_INTERVAL_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Queries the console once and, if the active server changed, rebuilds all actor selections.
     *
     * <p>The selections are built from the freshly queried address and from a single snapshot of
     * the actor system, so a concurrent {@link #reset(ActorSystem)} cannot inject a {@code null}
     * address into the paths. The new address is published last: if building a selection fails,
     * the stored address is left untouched and the next poll detects the change again instead of
     * considering the update already done.
     */
    private void refreshActiveServer(String consoleDomain, String groupId, String namespace, String namespaceSource) {
        String activeServer = this.queryActiveServer(consoleDomain, groupId, namespace, namespaceSource);
        if (StringUtils.isEmpty(activeServer) || activeServer.equalsIgnoreCase(this.activeServerAddr)) {
            return;
        }
        ActorSystem system = this.actorSystem;
        LOGGER.info("activeServerAddr={} change to {}, actorSystem={}", this.activeServerAddr, activeServer, system.provider().getDefaultAddress());
        ActorSelection heartbeat = system.actorSelection(this.getActiveHeartbeatAkkaPath(activeServer));
        ActorSelection instanceStatus = system.actorSelection(this.getServerInstanceStatusRouterAkkaPath(activeServer));
        ActorSelection mapMaster = system.actorSelection(this.getServerMapMasterRouterAkkaPath(activeServer));
        ActorSelection taskStatus = system.actorSelection(this.getServerTaskStatusRouterAkkaPath(activeServer));
        this.heartbeatActor = heartbeat;
        this.instanceStatusRouter = instanceStatus;
        this.mapMasterRouter = mapMaster;
        this.taskStatusRouter = taskStatus;
        this.activeServerAddr = activeServer;
    }

    /**
     * @return the most recently discovered active server address, or {@code null} if none has been
     *         discovered yet or the discovery was reset
     */
    @Override
    public String getActiveServerAddr() {
        return this.activeServerAddr;
    }

    /** Builds the actor path of the instance status router on the given server. */
    private String getServerInstanceStatusRouterAkkaPath(String serverAddr) {
        return SERVER_AKKA_PREFIX + serverAddr + "/user/instance_status_router";
    }

    /** Builds the actor path of the map master router on the given server. */
    private String getServerMapMasterRouterAkkaPath(String serverAddr) {
        return SERVER_AKKA_PREFIX + serverAddr + "/user/map_master_router";
    }

    /** Builds the actor path of the task status router on the given server. */
    private String getServerTaskStatusRouterAkkaPath(String serverAddr) {
        return SERVER_AKKA_PREFIX + serverAddr + "/user/task_status_router";
    }

    /** Builds the actor path of the heartbeat actor on the given server. */
    private String getActiveHeartbeatAkkaPath(String serverAddr) {
        return SERVER_AKKA_PREFIX + serverAddr + "/user/heartbeat";
    }

    /**
     * @return the selection of the active server's heartbeat actor, or {@code null} if no active
     *         server is currently known
     */
    @Override
    public ActorSelection getActiveHeartBeatActor() {
        return this.heartbeatActor;
    }

    /**
     * Shuts down the polling scheduler. Does nothing when {@link #start(String, String, String)}
     * has not been called.
     */
    @Override
    public void stop() throws Exception {
        ScheduledExecutorService executor = this.scheduledExecutorService;
        if (executor != null) {
            executor.shutdown();
        }
    }

    /**
     * Asks the console which server is currently active for {@code groupId}.
     *
     * <p>When the console answers with result code {@value #SCALE_GROUP_RESULT_CODE}, the payload
     * is parsed as a {@link ScaleGroupResult}: its current leader address is returned and a server
     * discovery is started through the {@link GroupManager} for every listed group id the manager
     * does not contain yet. For any other successful result the payload itself is the address.
     *
     * @return the reported address, or {@code null} when the request fails, the response cannot be
     *         handled, or the result is not successful; failures are logged, never thrown
     */
    private String queryActiveServer(String domain, String groupId, String namespace, String namespaceSource) {
        StringBuilder urlBuilder = new StringBuilder("http://").append(domain).append(ACTIVE_SERVER_QUERY_PATH).append("?groupId=").append(groupId);
        if (namespace != null) {
            urlBuilder.append("&namespace=").append(namespace);
            if (StringUtils.isNotBlank(namespaceSource)) {
                urlBuilder.append("&namespaceSource=").append(namespaceSource);
            }
        }
        urlBuilder.append("&enableScale=true");
        String activeServerQueryUrl = urlBuilder.toString();
        String activeServer = null;
        try {
            HttpResponse<JsonNode> jsonResponse = Unirest.get(activeServerQueryUrl).asJson();
            String responseBody = jsonResponse.getBody().toString();
            // Logged before parsing so the raw body is available even when parsing fails.
            LOGGER.debug("queryActiveServer url={}, response={}", activeServerQueryUrl, responseBody);
            JSONResult jsonResult = JsonUtil.fromJson(responseBody, JSONResult.class);
            if (jsonResult != null && jsonResult.isSuccess()) {
                if (jsonResult.getCode() == SCALE_GROUP_RESULT_CODE) {
                    ScaleGroupResult groupResult = JsonUtil.fromJson(jsonResult.getData().toString(), ScaleGroupResult.class);
                    activeServer = groupResult.getCurrentLeaderAddr();
                    if (CollectionUtils.isNotEmpty(groupResult.getGroupIds())) {
                        for (String childGroup : groupResult.getGroupIds()) {
                            if (!this.groupManager.contains(childGroup)) {
                                this.groupManager.startServerDiscovery(childGroup);
                                this.groupManager.appendGroupId(childGroup, groupId);
                            }
                        }
                    }
                } else {
                    activeServer = (String)jsonResult.getData();
                }
            }
        }
        catch (Throwable e) {
            LOGGER.error("query active server error, url=" + activeServerQueryUrl, e);
        }
        return activeServer;
    }

    /**
     * @return the selection of the active server's instance status router, or {@code null} if no
     *         active server is currently known
     */
    @Override
    public ActorSelection getInstanceStatusRouter() {
        return this.instanceStatusRouter;
    }

    /**
     * @return the selection of the active server's map master router, or {@code null} if no active
     *         server is currently known
     */
    @Override
    public ActorSelection getMapMasterRouter() {
        return this.mapMasterRouter;
    }

    /**
     * @return the selection of the active server's task status router, or {@code null} if no
     *         active server is currently known
     */
    @Override
    public ActorSelection getTaskStatusRouter() {
        return this.taskStatusRouter;
    }

    /**
     * @return the standby heartbeat selections; this class never assigns them, so the value is
     *         {@code null}
     * @deprecated the backing field is deprecated and never populated by this class
     */
    @Override
    @Deprecated
    public List<ActorSelection> getStandbyServerHeatbeatActors() {
        return this.standbyServerHeatbeatActors;
    }

    /**
     * Forgets the current active server and all selections derived from it, and switches to the
     * given actor system. The next poll that reports a non-empty address rebuilds the selections
     * on the new system.
     *
     * @param actorSystem actor system used for all selections created from now on
     */
    @Override
    public void reset(ActorSystem actorSystem) {
        this.activeServerAddr = null;
        this.heartbeatActor = null;
        this.instanceStatusRouter = null;
        this.mapMasterRouter = null;
        this.taskStatusRouter = null;
        this.actorSystem = actorSystem;
    }
}

