/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.schedulerx.worker.master;

import akka.actor.ActorContext;
import akka.actor.ActorSelection;
import akka.dispatch.OnFailure;
import akka.dispatch.OnSuccess;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.alibaba.schedulerx.common.domain.InstanceStatus;
import com.alibaba.schedulerx.common.domain.JobInstanceInfo;
import com.alibaba.schedulerx.common.domain.MapTaskProgress;
import com.alibaba.schedulerx.common.domain.MapTaskXAttrs;
import com.alibaba.schedulerx.common.domain.Metrics;
import com.alibaba.schedulerx.common.domain.TaskDispatchMode;
import com.alibaba.schedulerx.common.domain.TaskProgressCounter;
import com.alibaba.schedulerx.common.domain.TaskStatus;
import com.alibaba.schedulerx.common.domain.TimeType;
import com.alibaba.schedulerx.common.domain.WorkerProgressCounter;
import com.alibaba.schedulerx.common.monitor.MetricsCollector;
import com.alibaba.schedulerx.common.util.ConfigUtil;
import com.alibaba.schedulerx.common.util.ExceptionUtil;
import com.alibaba.schedulerx.common.util.HessianUtil;
import com.alibaba.schedulerx.common.util.IdUtil;
import com.alibaba.schedulerx.common.util.JobUtil;
import com.alibaba.schedulerx.common.util.JsonUtil;
import com.alibaba.schedulerx.protocol.Worker;
import com.alibaba.schedulerx.protocol.utils.FutureUtils;
import com.alibaba.schedulerx.shade.com.google.common.collect.Lists;
import com.alibaba.schedulerx.shade.com.google.common.collect.Maps;
import com.alibaba.schedulerx.shade.com.google.protobuf.ByteString;
import com.alibaba.schedulerx.shade.scala.concurrent.ExecutionContext;
import com.alibaba.schedulerx.shade.scala.concurrent.Future;
import com.alibaba.schedulerx.shade.scala.concurrent.duration.Duration;
import com.alibaba.schedulerx.worker.SchedulerxWorker;
import com.alibaba.schedulerx.worker.actor.FutureExecutorPool;
import com.alibaba.schedulerx.worker.batch.ReqQueue;
import com.alibaba.schedulerx.worker.batch.TMStatusReqHandler;
import com.alibaba.schedulerx.worker.batch.TaskDispatchReqHandler;
import com.alibaba.schedulerx.worker.domain.JobContext;
import com.alibaba.schedulerx.worker.domain.TaskInfo;
import com.alibaba.schedulerx.worker.log.LogFactory;
import com.alibaba.schedulerx.worker.log.Logger;
import com.alibaba.schedulerx.worker.logcollector.ClientLoggerMessage;
import com.alibaba.schedulerx.worker.logcollector.LogCollector;
import com.alibaba.schedulerx.worker.logcollector.LogCollectorFactory;
import com.alibaba.schedulerx.worker.master.TaskMaster;
import com.alibaba.schedulerx.worker.master.persistence.TaskPersistence;
import com.alibaba.schedulerx.worker.processor.JobProcessor;
import com.alibaba.schedulerx.worker.processor.MapJobProcessor;
import com.alibaba.schedulerx.worker.processor.MapReduceJobProcessor;
import com.alibaba.schedulerx.worker.processor.ProcessResult;
import com.alibaba.schedulerx.worker.util.ActorPathUtil;
import com.alibaba.schedulerx.worker.util.JavaProcessorProfileUtil;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

public abstract class MapTaskMaster
extends TaskMaster {
    private static final Logger LOGGER = LogFactory.getLogger(MapTaskMaster.class);
    private volatile int index = 0;
    protected volatile int pageSize = ConfigUtil.getWorkerConfig().getInt("map.master.page.size", 100);
    protected volatile int queueSize = ConfigUtil.getWorkerConfig().getInt("map.master.queue.size", 10000);
    private volatile int dispatcherSize = ConfigUtil.getWorkerConfig().getInt("map.master.dispatcher.size", 5);
    protected ReqQueue<Worker.ContainerReportTaskStatusRequest> taskStatusReqQueue;
    protected TMStatusReqHandler<Worker.ContainerReportTaskStatusRequest> taskStatusReqBatchHandler;
    protected ReqQueue<Worker.MasterStartContainerRequest> taskBlockingQueue;
    protected TaskDispatchReqHandler<Worker.MasterStartContainerRequest> taskDispatchReqHandler;
    private volatile String rootTaskResult;
    protected TaskPersistence taskPersistence;
    protected Map<String, TaskProgressCounter> taskProgressMap = Maps.newConcurrentMap();
    protected Map<String, WorkerProgressCounter> workerProgressMap = Maps.newConcurrentMap();
    private Map<Long, String> taskResultMap = Maps.newHashMap();
    private Map<Long, TaskStatus> taskStatusMap = Maps.newHashMap();
    protected MapTaskXAttrs xAttrs = null;
    protected volatile AtomicInteger taskCounter = new AtomicInteger(0);
    protected ExecutionContext futureExecutor;
    private LogCollector logCollector = LogCollectorFactory.get();

    public MapTaskMaster(JobInstanceInfo jobInstanceInfo, ActorContext actorContext) throws Exception {
        super(jobInstanceInfo, actorContext);
        this.futureExecutor = FutureExecutorPool.INSTANCE.get("MapTaskMaster");
    }

    @Override
    protected void init() {
        if (this.INITED) {
            return;
        }
        super.init();
        final String jobIdAndInstanceId = this.jobInstanceInfo.getJobId() + "_" + this.jobInstanceInfo.getJobInstanceId();
        LOGGER.info("jobInstanceId={}, map master config, pageSize:{}, queueSize:{}, dispatcherSize:{}, workerSize:{}", jobIdAndInstanceId, this.pageSize, this.queueSize, this.dispatcherSize, this.jobInstanceInfo.getAllWorkers().size());
        new Thread(new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                while (!MapTaskMaster.this.instanceStatus.isFinish()) {
                    try {
                        long startTime = System.currentTimeMillis();
                        List<TaskInfo> taskInfos = MapTaskMaster.this.taskPersistence.pull(MapTaskMaster.this.jobInstanceInfo.getJobInstanceId(), MapTaskMaster.this.pageSize);
                        LOGGER.debug("jobInstanceId={}, pull cost={}ms", MapTaskMaster.this.jobInstanceInfo.getJobInstanceId(), System.currentTimeMillis() - startTime);
                        if (taskInfos.isEmpty()) {
                            LOGGER.debug("pull task empty of jobInstanceId={}, sleep 10000 ms ...", MapTaskMaster.this.jobInstanceInfo.getJobInstanceId());
                            Thread.sleep(10000L);
                            continue;
                        }
                        for (TaskInfo taskInfo : taskInfos) {
                            String taskName = taskInfo.getTaskName();
                            if (!MapTaskMaster.this.taskProgressMap.containsKey(taskName)) {
                                1 var7_9 = this;
                                synchronized (var7_9) {
                                    if (!MapTaskMaster.this.taskProgressMap.containsKey(taskName)) {
                                        TaskProgressCounter taskProgressCounter = new TaskProgressCounter(taskName);
                                        MapTaskMaster.this.taskProgressMap.put(taskName, taskProgressCounter);
                                    }
                                }
                            }
                            MapTaskMaster.this.taskProgressMap.get(taskName).decrementRunning();
                            ByteString taskBody = null;
                            if (taskInfo.getTaskBody() != null) {
                                taskBody = ByteString.copyFrom(taskInfo.getTaskBody());
                            }
                            Worker.MasterStartContainerRequest request2 = MapTaskMaster.this.convert2StartContainerRequest(MapTaskMaster.this.jobInstanceInfo, taskInfo.getTaskId(), taskInfo.getTaskName(), taskBody, true);
                            MapTaskMaster.this.taskBlockingQueue.submitRequest(request2);
                        }
                    }
                    catch (TimeoutException te) {
                        LOGGER.error("pull task timeout, uniqueId:{}", jobIdAndInstanceId, te);
                        MapTaskMaster.this.logCollector.collect(jobIdAndInstanceId, "[MapTaskMaster-init-pull]map task pull fail.", te);
                        try {
                            Thread.sleep(10000L);
                        }
                        catch (InterruptedException interruptedException) {}
                    }
                    catch (Throwable e) {
                        MapTaskMaster.this.updateNewInstanceStatus(MapTaskMaster.this.getSerialNum(), InstanceStatus.FAILED, ExceptionUtil.getMessage(e));
                        MapTaskMaster.this.logCollector.collect(jobIdAndInstanceId, "[MapTaskMaster-init-pull]map task pull fail.", e);
                        LOGGER.error("pull task error, uniqueId:{}", jobIdAndInstanceId, e);
                    }
                }
            }
        }, "Schedulerx-MapTaskMaster-pull-thread-" + jobIdAndInstanceId).start();
        new Thread(new Runnable(){

            @Override
            public void run() {
                while (!MapTaskMaster.this.instanceStatus.isFinish()) {
                    try {
                        Thread.sleep(3000L);
                        InstanceStatus newStatus = MapTaskMaster.this.taskPersistence.checkInstanceStatus(MapTaskMaster.this.jobInstanceInfo.getJobInstanceId());
                        if (newStatus.isFinish() && MapTaskMaster.this.taskDispatchReqHandler.isActive()) {
                            Thread.sleep(3000L);
                            continue;
                        }
                        String result2 = MapTaskMaster.this.getRootTaskResult();
                        if (newStatus.isFinish()) {
                            int failCnt = 0;
                            int successCnt = 0;
                            int totalCnt = 0;
                            for (TaskProgressCounter taskProgressCounter : MapTaskMaster.this.taskProgressMap.values()) {
                                failCnt += taskProgressCounter.getFailed();
                                successCnt += taskProgressCounter.getSuccess();
                                totalCnt += taskProgressCounter.getTotal();
                            }
                            if (successCnt + failCnt < totalCnt) {
                                newStatus = InstanceStatus.FAILED;
                                LOGGER.warn("jobInstanceId={} turn into finish status, but count isn't correct, successCnt:{}, failCnt:{}, totalCnt:{}", MapTaskMaster.this.jobInstanceInfo.getJobInstanceId(), successCnt, failCnt, totalCnt);
                                result2 = "Turn into finish status, but count is wrong, sucCnt:" + successCnt + ", failCnt:" + failCnt + ", totalCnt:" + totalCnt;
                            } else {
                                newStatus = failCnt > 0 ? InstanceStatus.FAILED : InstanceStatus.SUCCESS;
                            }
                        }
                        MapTaskMaster.this.updateNewInstanceStatus(MapTaskMaster.this.getSerialNum(), MapTaskMaster.this.jobInstanceInfo.getJobInstanceId(), newStatus, result2);
                    }
                    catch (Throwable e) {
                        LOGGER.error("status check error, uniqueId:{}", jobIdAndInstanceId, e);
                    }
                }
            }
        }, "Schedulerx-MapTaskMaster-status-check-thread-" + jobIdAndInstanceId).start();
        if (!JobUtil.isSecondTypeJob(TimeType.parseValue(this.jobInstanceInfo.getTimeType()))) {
            new Thread(new Runnable(){

                @Override
                public void run() {
                    while (!MapTaskMaster.this.instanceStatus.isFinish()) {
                        Worker.WorkerReportJobInstanceProgressRequest request2 = Worker.WorkerReportJobInstanceProgressRequest.newBuilder().setJobId(MapTaskMaster.this.jobInstanceInfo.getJobId()).setJobInstanceId(MapTaskMaster.this.jobInstanceInfo.getJobInstanceId()).setProgress(MapTaskMaster.this.getJobInstanceProgress()).build();
                        MapTaskMaster.this.SERVER_DISCOVERY.getMapMasterRouter().tell(request2, null);
                        try {
                            Thread.sleep(5000L);
                        }
                        catch (InterruptedException e) {
                            LOGGER.error("report status error, uniqueId={}", jobIdAndInstanceId, e);
                            break;
                        }
                    }
                }
            }, "Schedulerx-MapTaskMaster-report-progress-thread-" + jobIdAndInstanceId).start();
        }
        new Thread(new Runnable(){

            @Override
            public void run() {
                while (!MapTaskMaster.this.instanceStatus.isFinish()) {
                    MapTaskMaster.this.aliveCheckWorkerSet.addAll(MapTaskMaster.this.jobInstanceInfo.getAllWorkers());
                    if (MapTaskMaster.this.aliveCheckWorkerSet.isEmpty()) {
                        LOGGER.warn("worker list is empty, jobInstanceId={}", MapTaskMaster.this.jobInstanceInfo.getJobInstanceId());
                        MapTaskMaster.this.taskPersistence.batchUpdateTaskStatus(MapTaskMaster.this.jobInstanceInfo.getJobInstanceId(), TaskStatus.FAILED, null, null);
                        break;
                    }
                    try {
                        for (String workerIdAddr : MapTaskMaster.this.aliveCheckWorkerSet) {
                            try {
                                Worker.MasterCheckWorkerAliveRequest request2;
                                ActorSelection selection = MapTaskMaster.this.getActorContext().actorSelection(ActorPathUtil.getWorkerHeartbeatRouterPath(workerIdAddr));
                                Worker.MasterCheckWorkerAliveResponse response = (Worker.MasterCheckWorkerAliveResponse)FutureUtils.awaitResult(selection, (Object)(request2 = Worker.MasterCheckWorkerAliveRequest.newBuilder().setJobInstanceId(MapTaskMaster.this.jobInstanceInfo.getJobInstanceId()).setDispatchMode(MapTaskMaster.this.xAttrs.getTaskDispatchMode()).build()), 10L);
                                if (response.getSuccess()) continue;
                                LOGGER.warn("jobInstanceId={} of worker={} is not alive", MapTaskMaster.this.jobInstanceInfo.getJobInstanceId(), workerIdAddr, response.getMessage());
                                MapTaskMaster.this.handleWorkerShutdown(workerIdAddr);
                                Worker.MasterDestroyContainerPoolRequest destroyContainerPoolRequest = Worker.MasterDestroyContainerPoolRequest.newBuilder().setJobInstanceId(MapTaskMaster.this.jobInstanceInfo.getJobInstanceId()).setJobId(MapTaskMaster.this.jobInstanceInfo.getJobId()).setWorkerIdAddr(workerIdAddr).setSerialNum(MapTaskMaster.this.getSerialNum()).build();
                                SchedulerxWorker.AtLeastDeliveryRoutingActor.tell(destroyContainerPoolRequest, null);
                            }
                            catch (TimeoutException e) {
                                LOGGER.warn("worker[{}] is down, start to remove this worker and failover tasks, jobInstanceId={}", workerIdAddr, MapTaskMaster.this.jobInstanceInfo.getJobInstanceId());
                                MapTaskMaster.this.handleWorkerShutdown(workerIdAddr);
                            }
                            catch (Throwable e) {
                                LOGGER.error("check worker error worker={}, jobInstanceId={}", workerIdAddr, MapTaskMaster.this.jobInstanceInfo.getJobInstanceId(), e);
                            }
                        }
                        Thread.sleep(5000L);
                    }
                    catch (Throwable e) {
                        LOGGER.error("check worker error, jobInstanceId={}", MapTaskMaster.this.jobInstanceInfo.getJobInstanceId(), e);
                    }
                }
            }
        }, "Schedulerx-MapTaskMaster-check-worker-alive-thread-" + jobIdAndInstanceId).start();
        if (this.xAttrs.getTaskDispatchMode().equals(TaskDispatchMode.PULL.getValue())) {
            new Thread(new Runnable(){

                @Override
                public void run() {
                    while (!MapTaskMaster.this.instanceStatus.isFinish()) {
                        for (String workerIdAddr : MapTaskMaster.this.jobInstanceInfo.getAllWorkers()) {
                            try {
                                Worker.MasterNotifyWorkerPullRequest request2;
                                ActorSelection selection = MapTaskMaster.this.getActorContext().actorSelection(ActorPathUtil.getWorkerJobInstancePath(workerIdAddr));
                                Worker.MasterNotifyWorkerPullResponse response = (Worker.MasterNotifyWorkerPullResponse)FutureUtils.awaitResult(selection, (Object)(request2 = Worker.MasterNotifyWorkerPullRequest.newBuilder().setJobInstanceId(MapTaskMaster.this.jobInstanceInfo.getJobInstanceId()).setPageSize(MapTaskMaster.this.xAttrs.getPageSize()).setQueueSize(MapTaskMaster.this.xAttrs.getQueueSize()).setTaskMasterAkkaPath(MapTaskMaster.this.getLocalTaskRouterPath()).setConsumerSize(MapTaskMaster.this.xAttrs.getConsumerSize()).build()), 5L);
                                if (response.getSuccess()) continue;
                                String errorMsg = response.getMessage();
                                LOGGER.error("notify worker pull failed, jobInstanceId={}", MapTaskMaster.this.jobInstanceInfo.getJobInstanceId(), errorMsg);
                                MapTaskMaster.this.updateNewInstanceStatus(MapTaskMaster.this.getSerialNum(), MapTaskMaster.this.jobInstanceInfo.getJobInstanceId(), InstanceStatus.FAILED, errorMsg);
                            }
                            catch (Throwable e) {
                                LOGGER.error("notify worker pull error, jobInstanceId={}, worker={}", jobIdAndInstanceId, workerIdAddr, e);
                            }
                        }
                        try {
                            Thread.sleep(5000L);
                        }
                        catch (InterruptedException e) {
                            LOGGER.error("", e);
                        }
                    }
                }
            }, "Schedulerx-PullTaskMaster-notify-workers-pull-thread-" + jobIdAndInstanceId).start();
        }
    }

    @Override
    public void submitInstance(JobInstanceInfo jobInstanceInfo) throws Exception {
        try {
            long startTime = System.currentTimeMillis();
            if (this.dispatcherSize > 200) {
                this.dispatcherSize = 200;
            }
            this.startBatchHandler();
            this.createTask("MAP_TASK_ROOT", ByteString.copyFrom(HessianUtil.toBytes("MAP_TASK_ROOT")));
            LOGGER.info("jobInstanceId={} create root task, cost={}ms", jobInstanceInfo.getJobInstanceId(), System.currentTimeMillis() - startTime);
            this.init();
        }
        catch (Throwable e) {
            String jobIdAndInstanceId = jobInstanceInfo.getJobId() + "_" + jobInstanceInfo.getJobInstanceId();
            LOGGER.error("", e);
            this.updateNewInstanceStatus(this.getSerialNum(), InstanceStatus.FAILED, ExceptionUtil.getMessage(e));
            this.logCollector.collect(jobIdAndInstanceId, "[MapTaskMaster-submitInstance]instance init fail.", e);
        }
    }

    @Override
    public void updateTaskStatus(Worker.ContainerReportTaskStatusRequest request2) {
        try {
            this.taskStatusReqQueue.submitRequest(request2);
        }
        catch (Throwable e) {
            LOGGER.error("", e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void batchUpdateTaskStatues(List<Worker.ContainerReportTaskStatusRequest> requests) {
        try {
            for (Worker.ContainerReportTaskStatusRequest request2 : requests) {
                MapTaskMaster mapTaskMaster;
                TaskStatus taskStatus = TaskStatus.parseValue(request2.getStatus());
                String workerAddr = request2.getWorkerAddr();
                String taskName = request2.getTaskName();
                LOGGER.debug("report task status:{} from worker:{}, uniqueId:{}", taskStatus.getDescription(), workerAddr, IdUtil.getUniqueId(request2.getJobId(), request2.getJobInstanceId(), request2.getTaskId()));
                if (!this.taskProgressMap.containsKey(taskName)) {
                    mapTaskMaster = this;
                    synchronized (mapTaskMaster) {
                        if (!this.taskProgressMap.containsKey(taskName)) {
                            TaskProgressCounter taskProgressCounter = new TaskProgressCounter(taskName);
                            this.taskProgressMap.put(taskName, taskProgressCounter);
                        }
                    }
                }
                if (workerAddr != null && !this.workerProgressMap.containsKey(workerAddr)) {
                    mapTaskMaster = this;
                    synchronized (mapTaskMaster) {
                        if (!this.workerProgressMap.containsKey(workerAddr)) {
                            WorkerProgressCounter workerProgressCounter = new WorkerProgressCounter(workerAddr);
                            this.workerProgressMap.put(workerAddr, workerProgressCounter);
                        }
                    }
                }
                if (taskStatus.equals((Object)TaskStatus.RUNNING)) {
                    this.taskProgressMap.get(taskName).incrementRunning();
                    if (workerAddr != null) {
                        this.workerProgressMap.get(workerAddr).incrementRunning();
                    }
                } else if (taskStatus.equals((Object)TaskStatus.SUCCESS)) {
                    this.taskProgressMap.get(taskName).incrementSuccess();
                    if (workerAddr != null) {
                        this.workerProgressMap.get(workerAddr).incrementSuccess();
                    }
                } else if (taskStatus.equals((Object)TaskStatus.FAILED)) {
                    this.taskProgressMap.get(taskName).incrementFailed();
                    if (workerAddr != null) {
                        this.workerProgressMap.get(workerAddr).incrementFailed();
                    }
                }
                this.taskResultMap.put(request2.getTaskId(), request2.getResult());
                this.taskStatusMap.put(request2.getTaskId(), taskStatus);
            }
        }
        catch (Throwable e) {
            LOGGER.error("jobInstanceId={}, update progressMap error.", this.jobInstanceInfo.getJobInstanceId(), e);
        }
        try {
            long startTime = System.currentTimeMillis();
            int index2 = requests.size() - 1;
            if (index2 >= 0 && TaskStatus.FAILED.getValue() == requests.get(index2).getStatus() && "MAP_TASK_ROOT".equals(requests.get(index2).getTaskName())) {
                this.setRootTaskResult(requests.get(index2).getResult());
            }
            boolean updateSuccess = false;
            for (int i = 0; i < 3; ++i) {
                try {
                    this.taskPersistence.updateTaskStatues(requests);
                    updateSuccess = true;
                    break;
                }
                catch (Throwable t) {
                    LOGGER.error("jobInstanceId={}, persistent batch updateTaskStatus error.", t);
                    continue;
                }
            }
            if (!updateSuccess) {
                this.updateNewInstanceStatus(this.getSerialNum(), InstanceStatus.FAILED, "persistent batch update TaskStatus error up to 3 times");
            }
            LOGGER.debug("{} batch update status db cost:{}", this.jobInstanceInfo.getJobInstanceId(), System.currentTimeMillis() - startTime);
        }
        catch (Throwable e) {
            LOGGER.error("jobInstanceId={}, batch updateTaskStatus error.", this.jobInstanceInfo.getJobInstanceId(), e);
        }
    }

    public boolean map(List<ByteString> taskList, String taskName) throws Exception {
        LOGGER.debug("map taskName:{}, size:{}", taskName, taskList.size());
        this.initTaskProgress(taskName, taskList.size());
        for (ByteString taskBody : taskList) {
            Worker.MasterStartContainerRequest startContainerRequest = this.convert2StartContainerRequest(this.jobInstanceInfo, this.aquireTaskId(), taskName, taskBody);
            this.taskBlockingQueue.submitRequest(startContainerRequest);
        }
        return this.machineOverload();
    }

    protected void clearTasks(long jobInstanceId) {
        try {
            this.taskPersistence.clearTasks(jobInstanceId);
            LOGGER.info("jobInstanceId={} clearTasks success.", jobInstanceId);
        }
        catch (Throwable ex) {
            LOGGER.error("jobInstanceId={} clearTasks error", jobInstanceId, ex);
        }
    }

    protected void createTask(String taskName, ByteString taskBody) throws Exception {
        this.initTaskProgress(taskName, 1);
        Worker.MasterStartContainerRequest startContainerRequest = this.convert2StartContainerRequest(this.jobInstanceInfo, this.aquireTaskId(), taskName, taskBody);
        this.taskBlockingQueue.submitRequest(startContainerRequest);
    }

    private void batchHandleContainers(final String workerIdAddr, final List<Worker.MasterStartContainerRequest> reqs, boolean isFailover, TaskDispatchMode dispatchMode) {
        block3: {
            final String workerId = workerIdAddr.split("@")[0];
            final String workerAddr = workerIdAddr.split("@")[1];
            LOGGER.debug("jobInstanceId={}, batch dispatch, worker:{}, size:{}", this.jobInstanceInfo.getJobInstanceId(), workerIdAddr, reqs.size());
            try {
                this.batchHandlePersistence(workerId, workerAddr, reqs, isFailover);
                if (!dispatchMode.equals((Object)TaskDispatchMode.PUSH)) break block3;
                final long startTime = System.currentTimeMillis();
                ActorSelection selection = this.getActorContext().actorSelection(ActorPathUtil.getContainerRouterPath(workerIdAddr));
                Worker.MasterBatchStartContainersRequest request2 = Worker.MasterBatchStartContainersRequest.newBuilder().setJobInstanceId(this.jobInstanceInfo.getJobInstanceId()).setJobId(this.jobInstanceInfo.getJobId()).addAllStartReqs(reqs).build();
                Timeout timeout = new Timeout(Duration.create(15L, TimeUnit.SECONDS));
                Future<Object> future2 = Patterns.ask(selection, (Object)request2, timeout);
                future2.onSuccess(new OnSuccess<Object>(){

                    @Override
                    public void onSuccess(Object obj) throws Throwable {
                        Worker.MasterBatchStartContainersResponse response = (Worker.MasterBatchStartContainersResponse)obj;
                        if (response.getSuccess()) {
                            LOGGER.info("jobInstanceId={}, batch start containers successfully, size:{} , worker={}, cost={}ms", MapTaskMaster.this.jobInstanceInfo.getJobInstanceId(), reqs.size(), workerIdAddr, System.currentTimeMillis() - startTime);
                        } else {
                            LOGGER.error("jobInstanceId={}, batch start containers failed, worker={}, response={}, size:{}", MapTaskMaster.this.jobInstanceInfo.getJobInstanceId(), workerIdAddr, response.getMessage(), reqs.size());
                            for (Worker.MasterStartContainerRequest req : reqs) {
                                MapTaskMaster.this.taskProgressMap.get(req.getTaskName()).incrementFailed();
                                MapTaskMaster.this.workerProgressMap.get(workerAddr).incrementFailed();
                                Worker.ContainerReportTaskStatusRequest faileStatusRequest = Worker.ContainerReportTaskStatusRequest.newBuilder().setJobId(MapTaskMaster.this.jobInstanceInfo.getJobId()).setJobInstanceId(MapTaskMaster.this.jobInstanceInfo.getJobInstanceId()).setTaskId(req.getTaskId()).setStatus(TaskStatus.FAILED.getValue()).setWorkerId(workerId).setTaskName(req.getTaskName()).setWorkerAddr(workerAddr).setTaskName(req.getTaskName()).build();
                                MapTaskMaster.this.updateTaskStatus(faileStatusRequest);
                            }
                        }
                    }
                }, this.futureExecutor);
                future2.onFailure(new OnFailure(){

                    @Override
                    public void onFailure(Throwable e) throws Throwable {
                        if (e instanceof TimeoutException) {
                            LOGGER.warn("jobInstanceId={}, worker[{}] is down, try another worker, size:{}", MapTaskMaster.this.jobInstanceInfo.getJobInstanceId(), workerIdAddr, reqs.size());
                            ArrayList<Long> taskIds = Lists.newArrayList();
                            for (Worker.MasterStartContainerRequest req : reqs) {
                                taskIds.add(req.getTaskId());
                            }
                            try {
                                int affectCnt = MapTaskMaster.this.taskPersistence.updateTaskStatus(MapTaskMaster.this.jobInstanceInfo.getJobInstanceId(), taskIds, TaskStatus.INIT, workerId, workerAddr);
                                MapTaskMaster.this.workerProgressMap.get(workerAddr).decrementRunning(affectCnt);
                            }
                            catch (Exception e1) {
                                LOGGER.error("jobInstanceId={}, timeout return init error", MapTaskMaster.this.jobInstanceInfo.getJobInstanceId());
                                MapTaskMaster.this.updateNewInstanceStatus(MapTaskMaster.this.getSerialNum(), InstanceStatus.FAILED, "timeout dispatch return init error");
                            }
                        } else {
                            String uniqueIdWithoutTask = IdUtil.getUniqueIdWithoutTask(MapTaskMaster.this.jobInstanceInfo.getJobId(), MapTaskMaster.this.jobInstanceInfo.getJobInstanceId());
                            LOGGER.error("jobInstanceId:{}, batch dispatch Tasks error worker={}, size:{}", MapTaskMaster.this.jobInstanceInfo.getJobInstanceId(), workerIdAddr, reqs.size(), e);
                            MapTaskMaster.this.logCollector.collect(uniqueIdWithoutTask, "[MapTaskMaster-init-dispatch]map task dispatch fail.", e);
                            for (Worker.MasterStartContainerRequest req : reqs) {
                                MapTaskMaster.this.taskProgressMap.get(req.getTaskName()).incrementFailed();
                                MapTaskMaster.this.workerProgressMap.get(workerAddr).incrementFailed();
                                Worker.ContainerReportTaskStatusRequest faileReq = Worker.ContainerReportTaskStatusRequest.newBuilder().setJobId(MapTaskMaster.this.jobInstanceInfo.getJobId()).setJobInstanceId(MapTaskMaster.this.jobInstanceInfo.getJobInstanceId()).setTaskId(req.getTaskId()).setStatus(TaskStatus.FAILED.getValue()).setWorkerId(workerId).setTaskName(req.getTaskName()).setWorkerAddr(workerAddr).setTaskName(req.getTaskName()).build();
                                MapTaskMaster.this.updateTaskStatus(faileReq);
                            }
                        }
                    }
                }, this.futureExecutor);
            }
            catch (Throwable exception) {
                String uniqueIdWithoutTask = IdUtil.getUniqueIdWithoutTask(this.jobInstanceInfo.getJobId(), this.jobInstanceInfo.getJobInstanceId());
                LOGGER.error("jobInstanceId:{}, batch dispatch Tasks error worker={}, size:{}", this.jobInstanceInfo.getJobInstanceId(), workerIdAddr, reqs.size(), exception);
                this.logCollector.collect(uniqueIdWithoutTask, "[MapTaskMaster-init-dispatch]map task dispatch fail.", exception);
                for (Worker.MasterStartContainerRequest req : reqs) {
                    this.taskProgressMap.get(req.getTaskName()).incrementFailed();
                    this.workerProgressMap.get(workerAddr).incrementFailed();
                    Worker.ContainerReportTaskStatusRequest faileReq = Worker.ContainerReportTaskStatusRequest.newBuilder().setJobId(this.jobInstanceInfo.getJobId()).setJobInstanceId(this.jobInstanceInfo.getJobInstanceId()).setTaskId(req.getTaskId()).setStatus(TaskStatus.FAILED.getValue()).setWorkerId(workerId).setTaskName(req.getTaskName()).setWorkerAddr(workerAddr).setTaskName(req.getTaskName()).build();
                    this.updateTaskStatus(faileReq);
                }
            }
        }
    }

    private void batchHandlePersistence(String workerId, String workerAddr, List<Worker.MasterStartContainerRequest> reqs, boolean isFailover) throws Exception {
        long startTime = System.currentTimeMillis();
        if (!isFailover) {
            this.taskPersistence.createTasks(reqs, workerId, workerAddr);
        } else {
            ArrayList<Long> taskIds = Lists.newArrayList();
            for (Worker.MasterStartContainerRequest req : reqs) {
                taskIds.add(req.getTaskId());
            }
            this.taskPersistence.updateTaskStatus(this.jobInstanceInfo.getJobInstanceId(), taskIds, TaskStatus.RUNNING, workerId, workerAddr);
        }
        LOGGER.debug("jobInstance={}, batch dispatch db cost:{} ms, size:{}", this.jobInstanceInfo.getJobInstanceId(), System.currentTimeMillis() - startTime, reqs.size());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void batchHandleRunningProgress(List<Worker.MasterStartContainerRequest> masterStartContainerRequests, Map<String, List<Worker.MasterStartContainerRequest>> worker2ReqsWithNormal, Map<String, List<Worker.MasterStartContainerRequest>> worker2ReqsWithFailover) {
        for (Worker.MasterStartContainerRequest request2 : masterStartContainerRequests) {
            String workerIdAddr = this.selectWorker();
            if (workerIdAddr == null) {
                this.updateNewInstanceStatus(this.getSerialNum(), InstanceStatus.FAILED, "all worker is down!");
                break;
            }
            String workerAddr = workerIdAddr.split("@")[1];
            if (request2.getFailover()) {
                if (!worker2ReqsWithFailover.containsKey(workerIdAddr)) {
                    worker2ReqsWithFailover.put(workerIdAddr, Lists.newArrayList(request2));
                } else {
                    worker2ReqsWithFailover.get(workerIdAddr).add(request2);
                }
            } else if (!worker2ReqsWithNormal.containsKey(workerIdAddr)) {
                worker2ReqsWithNormal.put(workerIdAddr, Lists.newArrayList(request2));
            } else {
                worker2ReqsWithNormal.get(workerIdAddr).add(request2);
            }
            this.taskProgressMap.get(request2.getTaskName()).incrementRunning();
            if (workerAddr != null && !this.workerProgressMap.containsKey(workerAddr)) {
                MapTaskMaster mapTaskMaster = this;
                synchronized (mapTaskMaster) {
                    if (!this.workerProgressMap.containsKey(workerAddr)) {
                        WorkerProgressCounter workerProgressCounter = new WorkerProgressCounter(workerAddr);
                        this.workerProgressMap.put(workerAddr, workerProgressCounter);
                    }
                }
            }
            this.workerProgressMap.get(workerAddr).incrementTotal();
            this.workerProgressMap.get(workerAddr).incrementRunning();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void batchHandlePulledProgress(List<Worker.MasterStartContainerRequest> masterStartContainerRequests, Map<String, List<Worker.MasterStartContainerRequest>> worker2ReqsWithNormal, Map<String, List<Worker.MasterStartContainerRequest>> worker2ReqsWithFailover, String remoteWorker) {
        for (Worker.MasterStartContainerRequest request2 : masterStartContainerRequests) {
            String workerIdAddr;
            String string2 = workerIdAddr = remoteWorker != null ? remoteWorker : this.selectWorker();
            if (workerIdAddr == null) {
                this.updateNewInstanceStatus(this.getSerialNum(), InstanceStatus.FAILED, "all worker is down!");
                break;
            }
            String workerAddr = workerIdAddr.split("@")[1];
            if (request2.getFailover()) {
                if (!worker2ReqsWithFailover.containsKey(workerIdAddr)) {
                    worker2ReqsWithFailover.put(workerIdAddr, Lists.newArrayList(request2));
                } else {
                    worker2ReqsWithFailover.get(workerIdAddr).add(request2);
                }
            } else if (!worker2ReqsWithNormal.containsKey(workerIdAddr)) {
                worker2ReqsWithNormal.put(workerIdAddr, Lists.newArrayList(request2));
            } else {
                worker2ReqsWithNormal.get(workerIdAddr).add(request2);
            }
            this.taskProgressMap.get(request2.getTaskName()).incrementPulled();
            if (workerAddr != null && !this.workerProgressMap.containsKey(workerAddr)) {
                MapTaskMaster mapTaskMaster = this;
                synchronized (mapTaskMaster) {
                    if (!this.workerProgressMap.containsKey(workerAddr)) {
                        WorkerProgressCounter workerProgressCounter = new WorkerProgressCounter(workerAddr);
                        this.workerProgressMap.put(workerAddr, workerProgressCounter);
                    }
                }
            }
            this.workerProgressMap.get(workerAddr).incrementTotal();
            this.workerProgressMap.get(workerAddr).incrementPulled();
        }
    }

    public void batchDispatchTasks(List<Worker.MasterStartContainerRequest> masterStartContainerRequests) {
        HashMap<String, List<Worker.MasterStartContainerRequest>> worker2ReqsWithNormal = Maps.newHashMap();
        HashMap<String, List<Worker.MasterStartContainerRequest>> worker2ReqsWithFailover = Maps.newHashMap();
        this.batchHandlePulledProgress(masterStartContainerRequests, worker2ReqsWithNormal, worker2ReqsWithFailover, null);
        for (Map.Entry entry : worker2ReqsWithNormal.entrySet()) {
            this.batchHandleContainers((String)entry.getKey(), (List)entry.getValue(), false, TaskDispatchMode.PUSH);
        }
        for (Map.Entry entry : worker2ReqsWithFailover.entrySet()) {
            this.batchHandleContainers((String)entry.getKey(), (List)entry.getValue(), true, TaskDispatchMode.PUSH);
        }
    }

    public void batchPullTasks(List<Worker.MasterStartContainerRequest> masterStartContainerRequests, String workerIdAddr) {
        HashMap<String, List<Worker.MasterStartContainerRequest>> worker2ReqsWithNormal = Maps.newHashMap();
        HashMap<String, List<Worker.MasterStartContainerRequest>> worker2ReqsWithFailover = Maps.newHashMap();
        this.batchHandlePulledProgress(masterStartContainerRequests, worker2ReqsWithNormal, worker2ReqsWithFailover, workerIdAddr);
        for (Map.Entry entry : worker2ReqsWithNormal.entrySet()) {
            this.batchHandleContainers((String)entry.getKey(), (List)entry.getValue(), false, TaskDispatchMode.PULL);
        }
        for (Map.Entry entry : worker2ReqsWithFailover.entrySet()) {
            this.batchHandleContainers((String)entry.getKey(), (List)entry.getValue(), true, TaskDispatchMode.PULL);
        }
    }

    protected synchronized String selectWorker() {
        List<String> allWorkers = this.jobInstanceInfo.getAllWorkers();
        int size2 = allWorkers.size();
        if (size2 == 0) {
            return null;
        }
        if (this.index >= size2) {
            this.index %= size2;
        }
        String worker = allWorkers.get(this.index);
        ++this.index;
        return worker;
    }

    @Override
    public void killInstance(String reason) {
        super.killInstance(reason);
        String uniqueId = IdUtil.getUniqueIdWithoutTask(this.jobInstanceInfo.getJobId(), this.jobInstanceInfo.getJobInstanceId());
        this.updateNewInstanceStatus(this.getSerialNum(), this.jobInstanceInfo.getJobInstanceId(), InstanceStatus.FAILED, reason);
        List<String> allWorkers = this.jobInstanceInfo.getAllWorkers();
        for (String workerIdAddr : allWorkers) {
            try {
                ActorSelection selection = this.getActorContext().actorSelection(ActorPathUtil.getContainerRouterPath(workerIdAddr));
                Worker.MasterKillContainerRequest request2 = Worker.MasterKillContainerRequest.newBuilder().setJobId(this.jobInstanceInfo.getJobId()).setJobInstanceId(this.jobInstanceInfo.getJobInstanceId()).build();
                selection.tell(request2, null);
            }
            catch (Throwable e) {
                this.logCollector.collect(uniqueId, ClientLoggerMessage.appendMessage("[Master-killInstance]kill instance tell worker fail worker addr is ", workerIdAddr), e);
                LOGGER.error("send kill instance request exception, worker:{}, uniqueId:{}", workerIdAddr, uniqueId);
            }
        }
    }

    @Override
    public void destroyContainerPool() {
        List<String> allWorkers = this.jobInstanceInfo.getAllWorkers();
        for (String workerIdAddr : allWorkers) {
            Worker.MasterDestroyContainerPoolRequest request2 = Worker.MasterDestroyContainerPoolRequest.newBuilder().setJobInstanceId(this.jobInstanceInfo.getJobInstanceId()).setSerialNum(this.getSerialNum()).setJobId(this.jobInstanceInfo.getJobId()).setWorkerIdAddr(workerIdAddr).build();
            SchedulerxWorker.AtLeastDeliveryRoutingActor.tell(request2, null);
        }
    }

    @Override
    public void killTask(String uniqueId, String workerId, String workerAddr) {
        String workerIdAddr = workerId + "@" + workerAddr;
        try {
            ActorSelection selection = this.getActorContext().actorSelection(ActorPathUtil.getContainerRouterPath(workerIdAddr));
            Worker.MasterKillContainerRequest request2 = Worker.MasterKillContainerRequest.newBuilder().setJobId(IdUtil.parse(uniqueId, IdUtil.IdType.JOB_ID)).setJobInstanceId(IdUtil.parse(uniqueId, IdUtil.IdType.JOB_INSTANCE_ID)).setTaskId(IdUtil.parse(uniqueId, IdUtil.IdType.TASK_ID)).build();
            selection.tell(request2, null);
        }
        catch (Throwable e) {
            LOGGER.error("send kill request exception, worker:" + workerIdAddr);
        }
    }

    @Override
    public String getJobInstanceProgress() {
        MapTaskProgress detail = new MapTaskProgress();
        detail.setTaskProgress(this.taskProgressMap.values());
        detail.setWorkerProgress(this.workerProgressMap.values());
        return JsonUtil.toJson(detail);
    }

    @Override
    public ProcessResult postFinish(long jobInstanceId) {
        ProcessResult reduceResult;
        block10: {
            reduceResult = null;
            try {
                JobContext context = JobContext.newBuilder().setJobId(this.jobInstanceInfo.getJobId()).setJobInstanceId(jobInstanceId).setJobType(this.jobInstanceInfo.getJobType()).setContent(this.jobInstanceInfo.getContent()).setScheduleTime(this.jobInstanceInfo.getScheduleTime()).setDataTime(this.jobInstanceInfo.getDataTime()).setJobParameters(this.jobInstanceInfo.getParameters()).setInstanceParameters(this.jobInstanceInfo.getInstanceParameters()).setUser(this.jobInstanceInfo.getUser()).setTaskResults(this.taskResultMap).setTaskStatuses(this.taskStatusMap).build();
                JobProcessor jobProcessor = JavaProcessorProfileUtil.getJavaProcessor(context.getContent());
                if (jobProcessor instanceof MapReduceJobProcessor) {
                    boolean runReduceIfFail = ((MapReduceJobProcessor)jobProcessor).runReduceIfFail(context);
                    if (this.getInstanceStatus().equals((Object)InstanceStatus.FAILED) && !runReduceIfFail) {
                        LOGGER.warn("jobInstanceId={} is failed, skip reduce", jobInstanceId);
                        return null;
                    }
                    String reduceTaskName = "REDUCE_TASK";
                    if (!this.taskProgressMap.containsKey(reduceTaskName)) {
                        TaskProgressCounter taskProgressCounter = new TaskProgressCounter(reduceTaskName);
                        this.taskProgressMap.put(reduceTaskName, taskProgressCounter);
                    }
                    this.taskProgressMap.get(reduceTaskName).incrementTotal();
                    this.taskProgressMap.get(reduceTaskName).incrementRunning();
                    String workerAddr = this.getActorContext().provider().getDefaultAddress().host().get() + ":" + this.getActorContext().provider().getDefaultAddress().port().get();
                    if (!this.workerProgressMap.containsKey(workerAddr)) {
                        WorkerProgressCounter workerProgressCounter = new WorkerProgressCounter(workerAddr);
                        this.workerProgressMap.put(workerAddr, workerProgressCounter);
                    }
                    this.workerProgressMap.get(workerAddr).incrementTotal();
                    this.workerProgressMap.get(workerAddr).incrementRunning();
                    try {
                        reduceResult = ((MapReduceJobProcessor)jobProcessor).reduce(context);
                    }
                    catch (Exception e) {
                        LOGGER.error("", e);
                        reduceResult = new ProcessResult(false);
                        reduceResult.setResult("reduce exception: " + ExceptionUtil.getMessage(e));
                    }
                    if (reduceResult.getStatus().equals((Object)InstanceStatus.SUCCESS)) {
                        this.taskProgressMap.get(reduceTaskName).incrementSuccess();
                        this.workerProgressMap.get(workerAddr).incrementSuccess();
                    } else {
                        this.taskProgressMap.get(reduceTaskName).incrementFailed();
                        this.workerProgressMap.get(workerAddr).incrementFailed();
                    }
                    break block10;
                }
                jobProcessor.postProcess(context);
            }
            catch (Throwable e) {
                LOGGER.error("", e);
            }
        }
        return reduceResult;
    }

    @Override
    public void stop() {
        if (this.taskDispatchReqHandler != null) {
            this.taskDispatchReqHandler.stop();
        }
        if (this.taskStatusReqBatchHandler != null) {
            this.taskStatusReqBatchHandler.stop();
        }
        LOGGER.info("jobInstanceId:{}, instance master successfully stop.", this.jobInstanceInfo.getJobInstanceId());
    }

    protected void startBatchHandler() {
        if (this.INITED) {
            return;
        }
        this.taskStatusReqQueue.init();
        this.taskStatusReqBatchHandler.start();
        this.taskBlockingQueue.setCapacity(this.queueSize);
        this.taskBlockingQueue.init();
        if (this.xAttrs.getTaskDispatchMode().equals(TaskDispatchMode.PUSH.getValue())) {
            this.taskDispatchReqHandler.setWorkThreadNum(this.dispatcherSize);
            this.taskDispatchReqHandler.setDispatchSize(this.pageSize * this.jobInstanceInfo.getAllWorkers().size());
            this.taskDispatchReqHandler.start();
        }
    }

    private int getTotalPulledAndRunning() {
        int total = 0;
        ArrayList<TaskProgressCounter> taskCounters = Lists.newArrayList(this.taskProgressMap.values());
        for (TaskProgressCounter taskProgressCounter : taskCounters) {
            total += taskProgressCounter.getPulled();
            total += taskProgressCounter.getRunning();
        }
        return total;
    }

    private boolean machineOverload() {
        boolean memOverload = false;
        boolean loadOverload = false;
        Metrics vmDetail = MetricsCollector.getMetrics();
        if (vmDetail != null) {
            memOverload = vmDetail.getHeap1Usage() >= (double)0.9f;
            loadOverload = vmDetail.getCpuLoad1() >= (double)vmDetail.getCpuProcessors();
        }
        return memOverload || loadOverload;
    }

    public String getRootTaskResult() {
        return this.rootTaskResult;
    }

    public void setRootTaskResult(String rootTaskResult) {
        this.rootTaskResult = rootTaskResult;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void initTaskProgress(String taskName, int delta) {
        if (!this.taskProgressMap.containsKey(taskName)) {
            MapTaskMaster mapTaskMaster = this;
            synchronized (mapTaskMaster) {
                if (!this.taskProgressMap.containsKey(taskName)) {
                    TaskProgressCounter taskProgressCounter = new TaskProgressCounter(taskName);
                    this.taskProgressMap.put(taskName, taskProgressCounter);
                }
            }
        }
        this.taskProgressMap.get(taskName).incrementTotal(delta);
    }

    @Override
    public void clear() {
        super.clear();
        if (this.taskStatusReqQueue != null) {
            this.taskStatusReqQueue.clear();
        }
        if (this.taskBlockingQueue != null) {
            this.taskBlockingQueue.clear();
        }
        if (this.taskDispatchReqHandler != null) {
            this.taskDispatchReqHandler.clear();
        }
        if (this.taskStatusReqBatchHandler != null) {
            this.taskStatusReqBatchHandler.clear();
        }
        if (this.taskProgressMap != null) {
            this.taskProgressMap.clear();
        }
        if (this.workerProgressMap != null) {
            this.workerProgressMap.clear();
        }
        if (this.taskResultMap != null) {
            this.taskResultMap.clear();
        }
        if (this.taskStatusMap != null) {
            this.taskStatusMap.clear();
        }
        this.clearTasks(this.jobInstanceInfo.getJobInstanceId());
        this.taskCounter.set(0);
    }

    public Map<String, TaskProgressCounter> getTaskProgressMap() {
        return this.taskProgressMap;
    }

    public synchronized List<Worker.MasterStartContainerRequest> syncPullTasks(int pageSize, String workerIdAddr) {
        if (this.getTotalPulledAndRunning() >= this.xAttrs.getGlobalConsumerSize()) {
            return Lists.newArrayList();
        }
        return this.taskDispatchReqHandler.syncHandleReqs(pageSize, workerIdAddr);
    }

    @Override
    protected void checkProcessor() throws Exception {
        JobProcessor processor;
        if ("java".equalsIgnoreCase(this.jobInstanceInfo.getJobType()) && !((processor = JavaProcessorProfileUtil.getJavaProcessor(this.jobInstanceInfo.getContent())) instanceof MapJobProcessor)) {
            throw new IOException(processor.getClass().getName() + " must extends MapJobProcessor or MapReduceJobProcessor");
        }
    }

    private void handleWorkerShutdown(String workerIdAddr) {
        this.aliveCheckWorkerSet.remove(workerIdAddr);
        this.jobInstanceInfo.getAllWorkers().remove(workerIdAddr);
        this.taskDispatchReqHandler.setDispatchSize(this.aliveCheckWorkerSet.size() * this.pageSize);
        String[] workerInfo = workerIdAddr.split("@");
        String workerAddr = workerInfo[1];
        String workerId = workerInfo[0];
        int affectCnt = this.taskPersistence.batchUpdateTaskStatus(this.jobInstanceInfo.getJobInstanceId(), TaskStatus.INIT, workerId, workerAddr);
        LOGGER.warn("jobInstanceId={}, failover task number:{}, workerId:{}, workerAddr:{}", this.jobInstanceInfo.getJobInstanceId(), affectCnt, workerId, workerAddr);
        if (affectCnt > 0) {
            this.workerProgressMap.get(workerAddr).decrementRunning(affectCnt);
        }
    }
}

