package com.alibaba.schedulerx.worker.actor;

import akka.actor.UntypedActor;
import com.alibaba.schedulerx.common.domain.InstanceStatus;
import com.alibaba.schedulerx.common.util.ExceptionUtil;
import com.alibaba.schedulerx.common.util.IdUtil;
import com.alibaba.schedulerx.protocol.Worker;
import com.alibaba.schedulerx.worker.log.LogFactory;
import com.alibaba.schedulerx.worker.log.Logger;
import com.alibaba.schedulerx.worker.master.MapTaskMaster;
import com.alibaba.schedulerx.worker.master.TaskMaster;
import com.alibaba.schedulerx.worker.master.TaskMasterPool;

/* loaded from: input_file:com/alibaba/schedulerx/worker/actor/TaskActor.class */
/**
 * Actor that receives task-related messages and forwards them to the
 * {@link TaskMaster} registered in {@link TaskMasterPool} for the message's
 * job instance id.
 *
 * <p>Handled message types:
 * <ul>
 *   <li>{@link Worker.ContainerReportTaskStatusRequest} - single task status update (no reply)</li>
 *   <li>{@link Worker.ContainerBatchReportTaskStatuesRequest} - batch status update (one reply)</li>
 *   <li>{@link Worker.WorkerMapTaskRequest} - map sub-tasks (one reply)</li>
 *   <li>{@link Worker.PullTaskFromMasterRequest} - pull tasks from a map master (one reply)</li>
 * </ul>
 *
 * <p>Every request/response handler builds exactly one non-null response and
 * sends it once; failures are reported to the sender as {@code success=false}.
 */
public class TaskActor extends UntypedActor {
    private static final Logger LOGGER = LogFactory.getLogger(TaskActor.class);

    private final TaskMasterPool masterPool = TaskMasterPool.INSTANCE;

    /**
     * Dispatches an incoming message to the handler matching its type.
     *
     * @param obj the received message
     * @throws Throwable if a handler propagates a failure
     */
    @Override // akka.actor.UntypedActor
    public void onReceive(Object obj) throws Throwable {
        if (obj instanceof Worker.ContainerReportTaskStatusRequest) {
            handleTaskStatus((Worker.ContainerReportTaskStatusRequest) obj);
        } else if (obj instanceof Worker.ContainerBatchReportTaskStatuesRequest) {
            handleBatchTaskStatues((Worker.ContainerBatchReportTaskStatuesRequest) obj);
        } else if (obj instanceof Worker.WorkerMapTaskRequest) {
            handleMapTask((Worker.WorkerMapTaskRequest) obj);
        } else if (obj instanceof Worker.PullTaskFromMasterRequest) {
            handlePullTasks((Worker.PullTaskFromMasterRequest) obj);
        } else {
            // Standard Akka idiom: surface unknown messages instead of dropping them silently.
            unhandled(obj);
        }
    }

    /**
     * Forwards a single task status report to the owning task master, if one
     * is registered. No reply is sent; failures are logged and swallowed.
     *
     * @param containerReportTaskStatusRequest the status report
     */
    private void handleTaskStatus(Worker.ContainerReportTaskStatusRequest containerReportTaskStatusRequest) {
        try {
            TaskMaster taskMaster = this.masterPool.get(containerReportTaskStatusRequest.getJobInstanceId());
            String uniqueId = IdUtil.getUniqueId(
                    containerReportTaskStatusRequest.getJobId(),
                    containerReportTaskStatusRequest.getJobInstanceId(),
                    containerReportTaskStatusRequest.getTaskId());
            LOGGER.debug("handleTaskStatus, uniqueId:{}, status:{}, workerAddr:{}",
                    uniqueId,
                    containerReportTaskStatusRequest.getStatus(),
                    containerReportTaskStatusRequest.getWorkerAddr());
            if (taskMaster != null) {
                taskMaster.updateTaskStatus(containerReportTaskStatusRequest);
            }
        } catch (Throwable th) {
            // Pass the throwable so the stack trace is not lost.
            LOGGER.error("jobInstanceId={}, taskId={}, handleTaskStatus error.",
                    containerReportTaskStatusRequest.getJobInstanceId(),
                    containerReportTaskStatusRequest.getTaskId(),
                    th);
        }
    }

    /**
     * Applies a batch of task status reports and replies with the outcome.
     * The reply echoes the request's delivery id. A missing task master is
     * still answered with {@code success=true}, as before.
     *
     * @param containerBatchReportTaskStatuesRequest the batch report
     */
    private void handleBatchTaskStatues(Worker.ContainerBatchReportTaskStatuesRequest containerBatchReportTaskStatuesRequest) {
        long jobInstanceId = containerBatchReportTaskStatuesRequest.getJobInstanceId();
        LOGGER.info("jobInstanceId={}, serialNum={}, batch receive task status reqs, size:{}",
                jobInstanceId,
                containerBatchReportTaskStatuesRequest.getSerialNum(),
                containerBatchReportTaskStatuesRequest.getTaskStatuesCount());

        Worker.ContainerBatchReportTaskStatuesResponse response;
        try {
            TaskMaster taskMaster = this.masterPool.get(jobInstanceId);
            if (taskMaster != null) {
                taskMaster.batchUpdateTaskStatus(containerBatchReportTaskStatuesRequest);
            }
            response = Worker.ContainerBatchReportTaskStatuesResponse.newBuilder()
                    .setSuccess(true)
                    .setDeliveryId(containerBatchReportTaskStatuesRequest.getDeliveryId())
                    .build();
        } catch (Throwable th) {
            LOGGER.error("jobInstanceId={}, handleBatchTaskStatues error.", jobInstanceId, th);
            response = Worker.ContainerBatchReportTaskStatuesResponse.newBuilder()
                    .setSuccess(false)
                    .setMessage(ExceptionUtil.getMessage(th))
                    .setDeliveryId(containerBatchReportTaskStatuesRequest.getDeliveryId())
                    .build();
        }
        reply(response);
    }

    /**
     * Hands the request's task bodies to the job instance's
     * {@link MapTaskMaster} and replies with the outcome. If the master is
     * missing or is not a {@link MapTaskMaster}, or if mapping fails, the
     * reply carries {@code success=false} and a message.
     *
     * @param workerMapTaskRequest the map request
     * @throws Exception kept for signature compatibility; failures are reported in the reply
     */
    private void handleMapTask(Worker.WorkerMapTaskRequest workerMapTaskRequest) throws Exception {
        long jobInstanceId = workerMapTaskRequest.getJobInstanceId();
        Worker.WorkerMapTaskResponse response;
        try {
            TaskMaster taskMaster = this.masterPool.get(jobInstanceId);
            if (taskMaster == null) {
                response = Worker.WorkerMapTaskResponse.newBuilder()
                        .setSuccess(false)
                        .setMessage("can't found TaskMaster by jobInstanceId=" + jobInstanceId)
                        .build();
            } else if (taskMaster instanceof MapTaskMaster) {
                try {
                    long startMillis = System.currentTimeMillis();
                    boolean overload = ((MapTaskMaster) taskMaster).map(
                            workerMapTaskRequest.getTaskBodyList(), workerMapTaskRequest.getTaskName());
                    LOGGER.debug("jobInstanceId={} map, cost={}ms",
                            jobInstanceId, System.currentTimeMillis() - startMillis);
                    response = Worker.WorkerMapTaskResponse.newBuilder()
                            .setSuccess(true)
                            .setOverload(overload)
                            .build();
                } catch (Exception e) {
                    // Supply the id for the placeholder; the trailing throwable carries the stack trace.
                    LOGGER.error("jobInstanceId={} map error", jobInstanceId, e);
                    taskMaster.updateNewInstanceStatus(
                            taskMaster.getSerialNum(), InstanceStatus.FAILED, ExceptionUtil.getMessage(e));
                    // Rethrow so the outer catch converts it into a failure reply.
                    throw e;
                }
            } else {
                response = Worker.WorkerMapTaskResponse.newBuilder()
                        .setSuccess(false)
                        .setMessage("TaskMaster is not MapTaskMaster")
                        .build();
                taskMaster.updateNewInstanceStatus(
                        taskMaster.getSerialNum(), InstanceStatus.FAILED, "TaskMaster is not MapTaskMaster");
            }
        } catch (Throwable th) {
            LOGGER.error("jobInstanceId={}, handleMapTask error.", jobInstanceId, th);
            response = Worker.WorkerMapTaskResponse.newBuilder()
                    .setSuccess(false)
                    .setMessage(ExceptionUtil.getMessage(th))
                    .build();
        }
        reply(response);
    }

    /**
     * Pulls tasks from the job instance's {@link MapTaskMaster} and replies
     * with them. Exactly one reply is sent: success with the pulled tasks, or
     * {@code success=false} when the master is missing, is not a
     * {@link MapTaskMaster}, or the pull throws.
     *
     * @param pullTaskFromMasterRequest the pull request
     */
    private void handlePullTasks(Worker.PullTaskFromMasterRequest pullTaskFromMasterRequest) {
        long jobInstanceId = pullTaskFromMasterRequest.getJobInstanceId();
        Worker.PullTaskFromMasterResponse response;
        try {
            TaskMaster taskMaster = this.masterPool.get(jobInstanceId);
            // instanceof is false for null, so this covers both the missing and wrong-type cases.
            if (taskMaster instanceof MapTaskMaster) {
                response = Worker.PullTaskFromMasterResponse.newBuilder()
                        .setSuccess(true)
                        .addAllRequest(((MapTaskMaster) taskMaster).syncPullTasks(
                                pullTaskFromMasterRequest.getSerialNum(),
                                pullTaskFromMasterRequest.getPageSize(),
                                pullTaskFromMasterRequest.getWorkerIdAddr()))
                        .build();
            } else {
                response = Worker.PullTaskFromMasterResponse.newBuilder()
                        .setSuccess(false)
                        .setMessage("TaskMaster is null or not MapTaskMaster, jobInstanceId=" + jobInstanceId)
                        .build();
            }
        } catch (Throwable th) {
            LOGGER.error("jobInstanceId={}, handlePullTasks error.", jobInstanceId, th);
            // Always build a real failure response: a null message cannot be sent to an actor.
            response = Worker.PullTaskFromMasterResponse.newBuilder()
                    .setSuccess(false)
                    .setMessage(ExceptionUtil.getMessage(th))
                    .build();
        }
        reply(response);
    }

    /**
     * Sends {@code message} to the sender of the message currently being
     * processed, with this actor as the reply's sender.
     *
     * @param message the non-null response to send
     */
    private void reply(Object message) {
        getSender().tell(message, getSelf());
    }
}
