package com.alibaba.schedulerx.worker.processor;

import akka.actor.ActorSelection;
import com.alibaba.schedulerx.common.domain.InstanceStatus;
import com.alibaba.schedulerx.common.util.ConfigUtil;
import com.alibaba.schedulerx.common.util.ExceptionUtil;
import com.alibaba.schedulerx.common.util.HessianUtil;
import com.alibaba.schedulerx.protocol.Worker;
import com.alibaba.schedulerx.protocol.utils.FutureUtils;
import com.alibaba.schedulerx.shade.com.google.common.collect.Lists;
import com.alibaba.schedulerx.shade.com.google.protobuf.ByteString;
import com.alibaba.schedulerx.shade.com.mashape.unirest.http.options.Options;
import com.alibaba.schedulerx.shade.org.apache.commons.collections.CollectionUtils;
import com.alibaba.schedulerx.worker.SchedulerxWorker;
import com.alibaba.schedulerx.worker.container.ContainerFactory;
import com.alibaba.schedulerx.worker.discovery.GroupManager;
import com.alibaba.schedulerx.worker.domain.JobContext;
import com.alibaba.schedulerx.worker.domain.WorkerConstants;
import com.alibaba.schedulerx.worker.log.LogFactory;
import com.alibaba.schedulerx.worker.log.Logger;
import com.alibaba.schedulerx.worker.logcollector.LogCollector;
import com.alibaba.schedulerx.worker.logcollector.LogCollectorFactory;
import com.alibaba.schedulerx.worker.master.MapTaskMaster;
import com.alibaba.schedulerx.worker.master.TaskMaster;
import com.alibaba.schedulerx.worker.master.TaskMasterPool;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/* loaded from: input_file:com/alibaba/schedulerx/worker/processor/MapJobProcessor.class */
/**
 * Base class for Java processors that split their work into sub-tasks and hand those
 * sub-tasks to the task master of the running job instance.
 *
 * <p>{@link #map(List, String)} serializes every sub-task, groups the serialized bodies into
 * batches of a configurable page size and dispatches the batches one after another, either to a
 * {@link MapTaskMaster} found in the local {@link TaskMasterPool} or to the actor addressed by
 * the job context's instance-master path.
 */
public abstract class MapJobProcessor extends JavaProcessor {
    private static final Logger LOGGER = LogFactory.getLogger(MapJobProcessor.class);

    /** Maximum number of times one batch is re-sent after a dispatch timeout. */
    private static final int MAX_RETRY_COUNT = 3;

    /** Page size used when the worker configuration has no (or an unusable) value. */
    private static final int DEFAULT_MAP_PAGE_SIZE = 1000;

    /** Upper bound, in bytes, of one serialized task body when not configured. */
    private static final int DEFAULT_TASK_BODY_SIZE_MAX = 65536;

    /** Timeout handed to {@code FutureUtils.awaitResult}; presumably seconds - TODO confirm. */
    private static final long DISPATCH_TIMEOUT = 30L;

    /** Pause between dispatch retries and after an overload signal from the master. */
    private static final int BACKOFF_SECONDS = 10;
    private static final long BACKOFF_MILLIS = BACKOFF_SECONDS * 1000L;

    /** Limits enforced on the label map of a {@link BizSubTask}. */
    private static final int MAX_LABEL_COUNT = 3;
    private static final int MAX_LABEL_KEY_LENGTH = 60;
    private static final int MAX_LABEL_VALUE_LENGTH = 180;

    private final LogCollector logCollector = LogCollectorFactory.get();

    /**
     * Distributes the given sub-tasks under the given task name.
     *
     * <p>The returned result has status {@code true} only when every batch was accepted by the
     * task master. In all other cases the status stays {@code false} and the result text carries
     * the reason (empty list, rejected batch, or the message of the failure that was caught).
     * Failures inside the serialization/dispatch phase are logged and reported through the
     * result instead of being thrown.
     *
     * @param list sub-tasks to distribute; {@code null} or empty yields a failed result
     * @param str  task name attached to every dispatched batch
     * @return the outcome of the distribution, never {@code null}
     */
    public ProcessResult map(List<? extends Object> list, String str) {
        ProcessResult result = new ProcessResult(false);
        JobContext context = ContainerFactory.getContainerPool().getContext();
        ActorSelection masterSelection =
                SchedulerxWorker.actorSystem.actorSelection(context.getInstanceMasterActorPath());
        if (masterSelection == null) {
            String message = "get taskMaster akka path error, path=" + context.getInstanceMasterActorPath();
            LOGGER.error(message);
            result.setResult(message);
            return result;
        }
        if (CollectionUtils.isEmpty(list)) {
            result.setResult("task list is empty");
            return result;
        }

        int pageSize = ConfigUtil.getWorkerConfig().getInt(WorkerConstants.WORKER_MAP_PAGE_SIZE, DEFAULT_MAP_PAGE_SIZE);
        if (pageSize <= 0) {
            // A non-positive page size would otherwise cause a division by zero (or a negative
            // batch index) below; fall back to the default instead of failing the whole map.
            LOGGER.warn("invalid map page size {}, fall back to {}", pageSize, DEFAULT_MAP_PAGE_SIZE);
            pageSize = DEFAULT_MAP_PAGE_SIZE;
        }
        int maxBodySize = ConfigUtil.getWorkerConfig().getInt(WorkerConstants.TASK_BODY_SIZE_MAX, DEFAULT_TASK_BODY_SIZE_MAX);
        LOGGER.info("map task list, jobInstanceId={}, taskName={}, size={}, batchSize={}",
                context.getJobInstanceId(), str, list.size(), pageSize);

        try {
            List<Worker.WorkerMapTaskRequest.Builder> batches = buildBatches(list, pageSize, maxBodySize);
            for (int position = 0; position < batches.size(); position++) {
                Worker.WorkerMapTaskRequest request = batches.get(position)
                        .setJobId(context.getJobId())
                        .setJobInstanceId(context.getJobInstanceId())
                        .setTaskId(context.getTaskId())
                        .setTaskName(str)
                        .build();
                Worker.WorkerMapTaskResponse response = dispatch(context, masterSelection, request);
                if (!response.getSuccess()) {
                    LOGGER.error(response.getMessage());
                    this.logCollector.collect(context.getUniqueId(), response.getMessage());
                    result.setResult(response.getMessage());
                    return result;
                }
                // Drop the reference to the dispatched batch so its task bodies can be reclaimed
                // while the remaining batches are still being sent.
                batches.set(position, null);
                if (response.hasOverload() && response.getOverload()) {
                    LOGGER.warn("Task Master is busy, sleeping a while {}s...", BACKOFF_SECONDS);
                    Thread.sleep(BACKOFF_MILLIS);
                }
            }
            result.setStatus(true);
        } catch (Throwable th) {
            if (th instanceof InterruptedException) {
                // Keep the interruption visible to the caller; it is reported via the result only.
                Thread.currentThread().interrupt();
            }
            LOGGER.error("JobInstanceId={} WorkerMapTaskRequest dispatch error.", context.getJobInstanceId(), th);
            this.logCollector.collect(context.getUniqueId(), ExceptionUtil.getTrace(th));
            result.setResult(ExceptionUtil.getMessage(th));
        }
        return result;
    }

    /**
     * Validates and serializes every task and groups the serialized bodies, in list order, into
     * request builders holding at most {@code pageSize} bodies each.
     *
     * @throws IOException when a serialized task body is larger than {@code maxBodySize} bytes
     * @throws Exception   whatever validation or serialization of a task raises
     */
    private List<Worker.WorkerMapTaskRequest.Builder> buildBatches(List<? extends Object> tasks, int pageSize,
            int maxBodySize) throws Exception {
        int total = tasks.size();
        int batchCount = total / pageSize + (total % pageSize > 0 ? 1 : 0);
        List<Worker.WorkerMapTaskRequest.Builder> batches =
                new ArrayList<Worker.WorkerMapTaskRequest.Builder>(batchCount);
        for (int b = 0; b < batchCount; b++) {
            batches.add(Worker.WorkerMapTaskRequest.newBuilder());
        }
        int index = 0;
        for (Object task : tasks) {
            checkTaskObject(task);
            byte[] body = HessianUtil.toBytes(task);
            if (body.length > maxBodySize) {
                throw new IOException("taskBody size more than " + maxBodySize + "B!");
            }
            batches.get(index / pageSize).addTaskBody(ByteString.copyFrom(body));
            index++;
        }
        return batches;
    }

    /**
     * Sends one batch to the task master and returns its response.
     *
     * <p>A {@link MapTaskMaster} registered in the local {@link TaskMasterPool} is called
     * directly. Otherwise the request is sent to the master actor; after a timeout the actor
     * selection is looked up again and the request is re-sent, at most
     * {@link #MAX_RETRY_COUNT} times with a pause of {@link #BACKOFF_MILLIS} before each retry.
     *
     * @throws TimeoutException when the last permitted attempt timed out as well
     */
    private Worker.WorkerMapTaskResponse dispatch(JobContext context, ActorSelection masterSelection,
            Worker.WorkerMapTaskRequest request) throws Exception {
        TaskMaster taskMaster = TaskMasterPool.INSTANCE.get(context.getJobInstanceId());
        if (taskMaster instanceof MapTaskMaster) {
            return handleMapTask(taskMaster, request);
        }
        ActorSelection selection = masterSelection;
        int retryCount = 0;
        while (true) {
            try {
                return (Worker.WorkerMapTaskResponse) FutureUtils.awaitResult(selection, request, DISPATCH_TIMEOUT);
            } catch (TimeoutException e) {
                LOGGER.warn("JobInstanceId={} WorkerMapTaskRequest dispatch timeout.", context.getJobInstanceId(), e);
                if (retryCount >= MAX_RETRY_COUNT) {
                    throw e;
                }
                retryCount++;
                Thread.sleep(BACKOFF_MILLIS);
                selection = SchedulerxWorker.actorSystem.actorSelection(context.getInstanceMasterActorPath());
            }
        }
    }

    /**
     * Enforces the label-map limits on a {@link BizSubTask} when the current group is an
     * advanced-version group; every other object passes unchecked.
     *
     * @throws RuntimeException when the label map has more than 3 entries, a key longer than 60
     *                          characters or a value longer than 180 characters
     */
    private void checkTaskObject(Object obj) {
        JobContext context = ContainerFactory.getContainerPool().getContext();
        if (!GroupManager.INSTANCE.isAdvancedVersion(context.getGroupId()) || !(obj instanceof BizSubTask)) {
            return;
        }
        Map<String, String> labelMap = ((BizSubTask) obj).labelMap();
        if (labelMap.size() > MAX_LABEL_COUNT) {
            throw new RuntimeException("label map size can't beyond 3.");
        }
        for (Map.Entry<String, String> label : labelMap.entrySet()) {
            boolean keyTooLong = label.getKey().length() > MAX_LABEL_KEY_LENGTH;
            if (keyTooLong || label.getValue().length() > MAX_LABEL_VALUE_LENGTH) {
                LOGGER.error("Job instance={} label map<{}, {}> content can't beyond max size(60,180).",
                        context.getJobInstanceId(), label.getKey().length(), label.getValue());
                throw new RuntimeException("label map content can't beyond max size(60,180).");
            }
        }
    }

    /**
     * Hands a batch directly to a task master living in this process.
     *
     * <p>Every failure is converted into an unsuccessful response, so this method does not
     * propagate exceptions despite its signature. When the master is not a {@link MapTaskMaster},
     * or its {@code map} call fails, the instance status is additionally set to
     * {@link InstanceStatus#FAILED}.
     *
     * @return a successful response carrying the master's overload flag, or an unsuccessful
     *         response with an explanatory message
     */
    private Worker.WorkerMapTaskResponse handleMapTask(TaskMaster taskMaster, Worker.WorkerMapTaskRequest workerMapTaskRequest) throws Exception {
        try {
            long jobInstanceId = workerMapTaskRequest.getJobInstanceId();
            if (taskMaster == null) {
                return failure("can't found TaskMaster by jobInstanceId=" + jobInstanceId);
            }
            if (!(taskMaster instanceof MapTaskMaster)) {
                taskMaster.updateNewInstanceStatus(taskMaster.getSerialNum(), InstanceStatus.FAILED, "TaskMaster is not MapTaskMaster");
                return failure("TaskMaster is not MapTaskMaster");
            }
            try {
                long startTime = System.currentTimeMillis();
                boolean overload = ((MapTaskMaster) taskMaster).map(
                        workerMapTaskRequest.getTaskBodyList(), workerMapTaskRequest.getTaskName());
                LOGGER.debug("jobInstanceId={} map, cost={}ms", jobInstanceId, System.currentTimeMillis() - startTime);
                return Worker.WorkerMapTaskResponse.newBuilder().setSuccess(true).setOverload(overload).build();
            } catch (Exception e) {
                // The instance id must be passed explicitly; otherwise the placeholder stays unfilled.
                LOGGER.error("jobInstanceId={} map error", jobInstanceId, e);
                taskMaster.updateNewInstanceStatus(taskMaster.getSerialNum(), InstanceStatus.FAILED, ExceptionUtil.getMessage(e));
                throw e;
            }
        } catch (Throwable th) {
            LOGGER.error("jobInstanceId={}, handleMapTask error.", workerMapTaskRequest.getJobInstanceId(), th);
            return failure(ExceptionUtil.getMessage(th));
        }
    }

    /** Builds an unsuccessful map response carrying the given message. */
    private static Worker.WorkerMapTaskResponse failure(String message) {
        return Worker.WorkerMapTaskResponse.newBuilder().setSuccess(false).setMessage(message).build();
    }

    /**
     * Tells whether the given context belongs to the root task of a map job.
     *
     * @param jobContext context of the task being processed
     * @return {@code true} when the context's task name equals
     *         {@link WorkerConstants#MAP_TASK_ROOT_NAME}; {@code false} otherwise, including
     *         when the task name is {@code null}
     */
    protected boolean isRootTask(JobContext jobContext) {
        return WorkerConstants.MAP_TASK_ROOT_NAME.equals(jobContext.getTaskName());
    }
}
