/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.schedulerx.worker.master.persistence;

import com.alibaba.schedulerx.common.domain.InstanceStatus;
import com.alibaba.schedulerx.common.domain.TaskStatus;
import com.alibaba.schedulerx.protocol.Worker;
import com.alibaba.schedulerx.protocol.utils.FutureUtils;
import com.alibaba.schedulerx.shade.com.google.common.collect.Lists;
import com.alibaba.schedulerx.shade.com.google.protobuf.ByteString;
import com.alibaba.schedulerx.shade.org.apache.commons.collections.CollectionUtils;
import com.alibaba.schedulerx.worker.SchedulerxWorker;
import com.alibaba.schedulerx.worker.discovery.ServerDiscovery;
import com.alibaba.schedulerx.worker.discovery.ServerDiscoveryFactory;
import com.alibaba.schedulerx.worker.domain.TaskInfo;
import com.alibaba.schedulerx.worker.log.LogFactory;
import com.alibaba.schedulerx.worker.log.Logger;
import com.alibaba.schedulerx.worker.master.persistence.PersistenceUtil;
import com.alibaba.schedulerx.worker.master.persistence.TaskPersistence;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ServerTaskPersistence
implements TaskPersistence {
    private static final Logger LOGGER = LogFactory.getLogger(ServerTaskPersistence.class);
    private ServerDiscovery serverDiscovery;
    private final String groupId;

    public ServerTaskPersistence(String groupId) {
        this.groupId = groupId;
        this.serverDiscovery = ServerDiscoveryFactory.getDiscovery(groupId);
    }

    @Override
    public int updateTaskStatus(long jobInstanceId, List<Long> taskIds, TaskStatus status, String workerId, String workerAddr) throws Exception {
        int res = -1;
        if (CollectionUtils.isEmpty(taskIds)) {
            return res;
        }
        Worker.WorkerBatchReportTaskStatuesRequest.Builder builder = Worker.WorkerBatchReportTaskStatuesRequest.newBuilder();
        Worker.BatchTaskStatues batchTaskStatues = Worker.BatchTaskStatues.newBuilder().setStatus(status.getValue()).setWorkerId(workerId).setWorkerAddr(workerAddr).addAllTaskIds(taskIds).build();
        builder.setJobInstanceId(jobInstanceId);
        builder.setGroupId(this.groupId);
        builder.addTaskStatues(batchTaskStatues);
        try {
            Worker.WorkerBatchReportTaskStatuesResponse response = (Worker.WorkerBatchReportTaskStatuesResponse)FutureUtils.awaitResult(this.serverDiscovery.getTaskStatusRouter(), (Object)builder.build(), 15L);
            if (!response.getSuccess()) {
                LOGGER.error("batch update task status of jobInstance={} error, {}", jobInstanceId, response.getMessage());
                throw new IOException(response.getMessage());
            }
            res = response.getAffectCnt();
        }
        catch (Throwable e) {
            String errMsg = "batch update task status of jobInstanceId=" + jobInstanceId + " timeout.";
            LOGGER.error(errMsg, e);
            throw e;
        }
        return res;
    }

    @Override
    public void updateTaskStatues(List<Worker.ContainerReportTaskStatusRequest> taskStatusInfos) throws Exception {
        if (CollectionUtils.isEmpty(taskStatusInfos)) {
            return;
        }
        Worker.ContainerReportTaskStatusRequest info = taskStatusInfos.get(0);
        Map<Integer, Map<String, List<Long>>> status2WorkIdAddr2TaskIds = PersistenceUtil.getTaskStatusMap(taskStatusInfos);
        Worker.WorkerBatchReportTaskStatuesRequest.Builder builder = Worker.WorkerBatchReportTaskStatuesRequest.newBuilder();
        for (Map.Entry<Integer, Map<String, List<Long>>> entry : status2WorkIdAddr2TaskIds.entrySet()) {
            int status = entry.getKey();
            Map<String, List<Long>> workerAddr2TaskIds = entry.getValue();
            for (Map.Entry<String, List<Long>> entry1 : workerAddr2TaskIds.entrySet()) {
                String[] workerIdAddr = entry1.getKey().split("@");
                List<Long> taskIds = entry1.getValue();
                Worker.BatchTaskStatues batchTaskStatues = Worker.BatchTaskStatues.newBuilder().setStatus(status).setWorkerId(workerIdAddr[0]).setWorkerAddr(workerIdAddr[1]).addAllTaskIds(taskIds).build();
                builder.addTaskStatues(batchTaskStatues);
                builder.setGroupId(this.groupId);
            }
        }
        builder.setJobInstanceId(info.getJobInstanceId());
        SchedulerxWorker.AtLeastDeliveryRoutingActor.tell(builder.build(), null);
    }

    @Override
    public void clearTasks(long jobInstanceId) throws Exception {
        Worker.WorkerClearTasksRequest request2 = Worker.WorkerClearTasksRequest.newBuilder().setJobInstanceId(jobInstanceId).build();
        try {
            Worker.WorkerClearTasksResponse response = (Worker.WorkerClearTasksResponse)FutureUtils.awaitResult(this.serverDiscovery.getMapMasterRouter(), (Object)request2, 5L);
            if (!response.getSuccess()) {
                LOGGER.error("clear tasks of jobInstance[{}] error, {}", jobInstanceId, response.getMessage());
                throw new IOException(response.getMessage());
            }
            LOGGER.info("clear tasks of jobInstance[{}] sucessfully", jobInstanceId);
        }
        catch (Throwable e) {
            String errMsg = "clear tasks of jobInstanceId=" + jobInstanceId + " timeout.";
            LOGGER.error(errMsg, e);
            throw new IOException(errMsg);
        }
    }

    @Override
    public void createTask(long jobId, long jobInstanceId, long taskId, String taskName, ByteString taskBody) throws Exception {
    }

    @Override
    public void createTasks(List<Worker.MasterStartContainerRequest> containers, String workerId, String workerAddr) throws Exception {
        if (CollectionUtils.isEmpty(containers)) {
            return;
        }
        long jobInstanceId = containers.get(0).getJobInstanceId();
        Worker.WorkerBatchCreateTasksRequest.Builder builder = Worker.WorkerBatchCreateTasksRequest.newBuilder();
        for (Worker.MasterStartContainerRequest taskInfo : containers) {
            Worker.WorkerCreateTaskRequest taskRequest = Worker.WorkerCreateTaskRequest.newBuilder().setJobId(taskInfo.getJobId()).setJobInstanceId(taskInfo.getJobInstanceId()).setTaskId(taskInfo.getTaskId()).setTaskName(taskInfo.getTaskName()).setTaskBody(taskInfo.getTask()).build();
            builder.addTask(taskRequest);
        }
        builder.setJobInstanceId(jobInstanceId);
        builder.setWorkerId(workerId);
        builder.setWorkerAddr(workerAddr);
        try {
            Worker.WorkerBatchCreateTasksResponse response = (Worker.WorkerBatchCreateTasksResponse)FutureUtils.awaitResult(this.serverDiscovery.getMapMasterRouter(), (Object)builder.build(), 45L);
            if (!response.getSuccess()) {
                LOGGER.error("batch create tasks error, jobInstanceId={}, reason={}.", jobInstanceId, response.getMessage());
                throw new IOException(response.getMessage());
            }
            LOGGER.info("batch create tasks to Server successfully, jobInstanceId={}, size={}", jobInstanceId, containers.size());
        }
        catch (Throwable e) {
            String errMsg = "batch create tasks timeout, jobInstanceId=" + jobInstanceId;
            LOGGER.error(errMsg, e);
            throw new IOException(errMsg);
        }
    }

    @Override
    public List<TaskInfo> pull(long jobInstanceId, int pageSize) throws Exception {
        ArrayList<TaskInfo> taskInfos = Lists.newArrayList();
        Worker.WorkerPullTasksRequest request2 = Worker.WorkerPullTasksRequest.newBuilder().setJobInstanceId(jobInstanceId).setPageSize(pageSize).build();
        try {
            Worker.WorkerPullTasksResponse response = (Worker.WorkerPullTasksResponse)FutureUtils.awaitResult(this.serverDiscovery.getMapMasterRouter(), (Object)request2, 30L);
            if (response.getSuccess()) {
                List<Worker.TaskMessage> taskMessages = response.getTaskMessageList();
                for (Worker.TaskMessage taskMessage : taskMessages) {
                    taskInfos.add(this.convert2TaskInfo(taskMessage));
                }
            } else {
                LOGGER.error("pull tasks of jobInstance[{}] failed, {}", jobInstanceId, response.getMessage());
            }
        }
        catch (Throwable e) {
            String errMsg = "pull task[jobInstanceId=" + jobInstanceId + ", pageSize=" + pageSize + "] timeout.";
            LOGGER.error(errMsg, e);
            throw e;
        }
        return taskInfos;
    }

    @Override
    public InstanceStatus checkInstanceStatus(long jobInstanceId) throws Exception {
        InstanceStatus status = InstanceStatus.UNKNOWN;
        Worker.WorkerQueryJobInstanceStatusRequest request2 = Worker.WorkerQueryJobInstanceStatusRequest.newBuilder().setJobInstanceId(jobInstanceId).build();
        try {
            Worker.WorkerQueryJobInstanceStatusResponse response = (Worker.WorkerQueryJobInstanceStatusResponse)FutureUtils.awaitResult(this.serverDiscovery.getMapMasterRouter(), (Object)request2, 30L);
            if (response.getSuccess()) {
                status = InstanceStatus.parseValue(response.getStatus());
            } else {
                LOGGER.error("query job instance status failed, " + response.getMessage());
            }
        }
        catch (Throwable e) {
            String errMsg = "checkInstanceStatus of jobInstanceId=" + jobInstanceId + " timeout.";
            LOGGER.error(errMsg + " server={}", this.serverDiscovery.getActiveServerAddr(), e);
            throw new IOException(errMsg);
        }
        return status;
    }

    private TaskInfo convert2TaskInfo(Worker.TaskMessage taskMessage) {
        return TaskInfo.newBuilder().setTaskId(taskMessage.getTaskId()).setTaskName(taskMessage.getTaskName()).setTaskBody(taskMessage.getTaskBody().toByteArray()).build();
    }

    @Override
    public void initTable() throws Exception {
    }

    @Override
    public int batchUpdateTaskStatus(long jobInstanceId, TaskStatus status, String workerId, String workerAddr) {
        int res = -1;
        Worker.WorkerBatchUpdateTaskStatusRequest.Builder builder = Worker.WorkerBatchUpdateTaskStatusRequest.newBuilder();
        builder.setJobInstanceId(jobInstanceId);
        builder.setStatus(status.getValue());
        if (workerAddr != null) {
            builder.setWorkerAddr(workerAddr);
            builder.setWorkerId(workerId);
        }
        Worker.WorkerBatchUpdateTaskStatusRequest request2 = builder.build();
        try {
            Worker.WorkerBatchUpdateTaskStatusResponse response = (Worker.WorkerBatchUpdateTaskStatusResponse)FutureUtils.awaitResult(this.serverDiscovery.getMapMasterRouter(), (Object)request2, 5L);
            if (response.getSuccess()) {
                res = response.getAffectCnt();
                LOGGER.debug("batch update status=>{} to Server sucessfully, jobInstanceId={}, workerAddr={}", new Object[]{status, jobInstanceId, workerAddr});
            } else {
                LOGGER.error("batch update status failed, " + response.getMessage());
            }
        }
        catch (Throwable e) {
            LOGGER.error("batchUpdateTaskStatus of jobInstanceId={} timeout", jobInstanceId, e);
        }
        return res;
    }
}

