/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.ageiport.processor.core.task.importer.worker;

import com.alibaba.ageiport.common.feature.FeatureUtils;
import com.alibaba.ageiport.common.logger.Logger;
import com.alibaba.ageiport.common.logger.LoggerFactory;
import com.alibaba.ageiport.common.utils.CollectionUtils;
import com.alibaba.ageiport.common.utils.IOUtils;
import com.alibaba.ageiport.common.utils.JsonUtil;
import com.alibaba.ageiport.common.utils.TaskIdUtil;
import com.alibaba.ageiport.ext.arch.ExtensionLoader;
import com.alibaba.ageiport.ext.file.store.FileStore;
import com.alibaba.ageiport.processor.core.AgeiPort;
import com.alibaba.ageiport.processor.core.constants.MainTaskFeatureKeys;
import com.alibaba.ageiport.processor.core.model.api.BizColumnHeaders;
import com.alibaba.ageiport.processor.core.model.api.BizDynamicColumnHeaders;
import com.alibaba.ageiport.processor.core.model.api.BizUser;
import com.alibaba.ageiport.processor.core.model.core.ColumnHeader;
import com.alibaba.ageiport.processor.core.model.core.ColumnHeaders;
import com.alibaba.ageiport.processor.core.model.core.impl.MainTask;
import com.alibaba.ageiport.processor.core.spi.cache.BigDataCache;
import com.alibaba.ageiport.processor.core.spi.client.CreateSubTasksRequest;
import com.alibaba.ageiport.processor.core.spi.convertor.Model;
import com.alibaba.ageiport.processor.core.spi.file.DataGroup;
import com.alibaba.ageiport.processor.core.spi.file.FileContext;
import com.alibaba.ageiport.processor.core.spi.file.FileReader;
import com.alibaba.ageiport.processor.core.spi.file.FileReaderFactory;
import com.alibaba.ageiport.processor.core.spi.file.FileWriter;
import com.alibaba.ageiport.processor.core.spi.file.FileWriterFactory;
import com.alibaba.ageiport.processor.core.spi.task.factory.MainTaskContextFactory;
import com.alibaba.ageiport.processor.core.spi.task.monitor.TaskStageEvent;
import com.alibaba.ageiport.processor.core.spi.task.selector.TaskSpiSelector;
import com.alibaba.ageiport.processor.core.spi.task.slice.SliceStrategy;
import com.alibaba.ageiport.processor.core.spi.task.stage.MainTaskStageProvider;
import com.alibaba.ageiport.processor.core.spi.task.stage.Stage;
import com.alibaba.ageiport.processor.core.spi.task.stage.SubTaskStageProvider;
import com.alibaba.ageiport.processor.core.task.AbstractMainTaskWorker;
import com.alibaba.ageiport.processor.core.task.importer.ImportProcessor;
import com.alibaba.ageiport.processor.core.task.importer.adapter.ImportProcessorAdapter;
import com.alibaba.ageiport.processor.core.task.importer.api.BizImportTaskRuntimeConfig;
import com.alibaba.ageiport.processor.core.task.importer.context.ImportMainTaskContext;
import com.alibaba.ageiport.processor.core.task.importer.model.ImportTaskRuntimeConfig;
import com.alibaba.ageiport.processor.core.task.importer.model.ImportTaskRuntimeConfigImpl;
import com.alibaba.ageiport.processor.core.task.importer.model.ImportTaskSpecification;
import com.alibaba.ageiport.processor.core.task.importer.slice.ImportSlice;
import com.alibaba.ageiport.processor.core.task.importer.slice.ImportSliceStrategy;
import com.alibaba.ageiport.processor.core.utils.HeadersUtil;
import java.io.Closeable;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ImportMainTaskWorker<QUERY, DATA, VIEW>
extends AbstractMainTaskWorker {
    public static Logger log = LoggerFactory.getLogger(ImportMainTaskWorker.class);

    public ImportMainTaskWorker(AgeiPort ageiPort, MainTask mainTask) {
        this.ageiPort = ageiPort;
        this.mainTask = mainTask;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void doPrepare() {
        AgeiPort ageiPort = this.getAgeiPort();
        String mainTaskId = this.mainTask.getMainTaskId();
        String executeType = this.mainTask.getExecuteType();
        String taskType = this.mainTask.getType();
        String taskCode = this.mainTask.getCode();
        FileReader fileReader = null;
        try {
            TaskSpiSelector spiSelector = ageiPort.getTaskSpiSelector();
            MainTaskStageProvider stageProvider = spiSelector.selectExtension(executeType, taskType, taskCode, MainTaskStageProvider.class);
            MainTaskContextFactory contextFactory = spiSelector.selectExtension(executeType, taskType, taskCode, MainTaskContextFactory.class);
            ImportMainTaskContext context = (ImportMainTaskContext)((Object)contextFactory.create(ageiPort, this.mainTask));
            context.getMainTask().setStatus("EXECUTING");
            context.getMainTask().setGmtStart(new Date());
            context.save();
            context.setStage(stageProvider.mainTaskStart());
            ImportTaskSpecification taskSpec = context.getImportTaskSpec();
            ImportProcessor processor = taskSpec.getImportProcessor();
            ImportProcessorAdapter adapter = (ImportProcessorAdapter)processor.getConcreteAdapter();
            BizUser bizUser = context.getBizUser();
            Object query = context.getQuery();
            context.goNextStageEventNew();
            BizImportTaskRuntimeConfig runtimeConfig = adapter.taskRuntimeConfig(bizUser, query, processor, context);
            context.load(runtimeConfig);
            context.save();
            context.goNextStageEventNew();
            ImportTaskRuntimeConfig importTaskRuntimeConfig = context.getImportTaskRuntimeConfig();
            String fileType = importTaskRuntimeConfig.getFileType();
            context.goNextStageEventNew();
            Object resetQuery = adapter.resetQuery(bizUser, query, processor, context);
            context.load(resetQuery);
            context.goNextStageEventNew();
            context.goNextStageEventNew();
            BizColumnHeaders bizColumnHeaders = adapter.getHeaders(bizUser, query, processor, context);
            context.goNextStageEventNew();
            context.goNextStageEventNew();
            BizDynamicColumnHeaders bizDynamicColumnHeaders = adapter.getDynamicHeaders(bizUser, query, processor, context);
            context.goNextStageEventNew();
            ColumnHeaders columnHeaders = HeadersUtil.buildHeaders(bizColumnHeaders, taskSpec.getViewClass(), bizDynamicColumnHeaders);
            for (ColumnHeader columnHeader : columnHeaders.getColumnHeaders()) {
                if (columnHeader.getIgnoreHeader() != null) continue;
                columnHeader.setIgnoreHeader(false);
            }
            context.load(columnHeaders);
            context.goNextStageEventNew();
            String inputFileKey = (String)FeatureUtils.getFeature((String)this.mainTask.getFeature(), MainTaskFeatureKeys.INPUT_FILE_KEY);
            InputStream inputStream = ageiPort.getFileStore().get(inputFileKey, new HashMap());
            String fileReaderFactoryName = ageiPort.getOptions().getFileTypeReaderSpiMappings().get(fileType);
            FileReaderFactory fileReaderFactory = (FileReaderFactory)ExtensionLoader.getExtensionLoader(FileReaderFactory.class).getExtension(fileReaderFactoryName);
            FileContext fileContext = new FileContext();
            fileContext.setBizQuery(JsonUtil.toJsonString((Object)this.mainTask.getBizQuery()));
            fileContext.setTaskSpec(context.getImportTaskSpec());
            fileContext.setMainTask(this.mainTask);
            fileReader = fileReaderFactory.create(ageiPort, columnHeaders, fileContext);
            fileReader.read(inputStream);
            DataGroup dataGroup = fileReader.finish();
            context.load(dataGroup);
            context.goNextStageEventNew();
            context.goNextStageEventNew();
            String sliceStrategyName = importTaskRuntimeConfig.getTaskSliceStrategy();
            ImportSliceStrategy sliceStrategy = (ImportSliceStrategy)ExtensionLoader.getExtensionLoader(SliceStrategy.class).getExtension(sliceStrategyName);
            List slices = sliceStrategy.slice(context);
            context.load(slices);
            context.goNextStageEventNew();
            context.save();
            context.goNextStageEventNew();
            BigDataCache cache = ageiPort.getBigDataCacheManager().getBigDataCacheCache(this.mainTask.getExecuteType());
            ArrayList<CreateSubTasksRequest.SubTaskInstance> subTaskInstances = new ArrayList<CreateSubTasksRequest.SubTaskInstance>();
            for (ImportSlice slice : slices) {
                CreateSubTasksRequest.SubTaskInstance subTaskInstance = new CreateSubTasksRequest.SubTaskInstance();
                subTaskInstance.setSubTaskNo(slice.getNo());
                subTaskInstance.setBizQuery(slice.getQueryJson());
                Map<String, Object> mainRuntimeParam = Model.toMap(importTaskRuntimeConfig);
                ImportTaskRuntimeConfigImpl sliceRuntimeConfig = Model.toModel(mainRuntimeParam, new ImportTaskRuntimeConfigImpl());
                sliceRuntimeConfig.setNo(slice.getNo());
                sliceRuntimeConfig.setPageSize(slice.getPageSize());
                String runtimeParam = FeatureUtils.merge((String)JsonUtil.toJsonString(mainRuntimeParam), (String)JsonUtil.toJsonString((Object)sliceRuntimeConfig));
                subTaskInstance.setRuntimeParam(runtimeParam);
                subTaskInstances.add(subTaskInstance);
                String subTaskId = TaskIdUtil.genSubTaskId((String)mainTaskId, (Integer)slice.getNo());
                String key = subTaskId + ".InputDataSlice";
                cache.put(key, slice.getDataGroup());
            }
            CreateSubTasksRequest createSubTasksRequest = new CreateSubTasksRequest();
            createSubTasksRequest.setMainTaskId(mainTaskId);
            createSubTasksRequest.setSubTaskInstances(subTaskInstances);
            ageiPort.getTaskServerClient().createSubTask(createSubTasksRequest);
            context.goNextStageEventNew();
            context.assertCurrentStage(stageProvider.mainTaskSaveSliceEnd());
            SubTaskStageProvider subTaskStageProvider = spiSelector.selectExtension(executeType, taskType, taskCode, SubTaskStageProvider.class);
            Stage subTaskCreated = subTaskStageProvider.subTaskCreated();
            for (ImportSlice slice : slices) {
                String subTaskId = TaskIdUtil.genSubTaskId((String)mainTaskId, (Integer)slice.getNo());
                ageiPort.getEventBusManager().getEventBus(executeType).post(TaskStageEvent.subTaskEvent(subTaskId, subTaskCreated));
            }
        }
        catch (Throwable e) {
            try {
                log.error("doPrepare failed, main:{}", new Object[]{mainTaskId, e});
                ageiPort.onMainError(this.mainTask, e);
            }
            catch (Throwable throwable) {
                throw throwable;
            }
            finally {
                IOUtils.closeQuietly(fileReader);
            }
        }
        IOUtils.closeQuietly((Closeable)fileReader);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void doReduce() {
        block11: {
            MainTask mainTask;
            InputStream fileStream;
            FileWriter fileWriter;
            block10: {
                fileWriter = null;
                fileStream = null;
                mainTask = this.getMainTask();
                if (!mainTask.getStatus().equals("ERROR")) break block10;
                IOUtils.closeQuietly(fileWriter);
                IOUtils.closeQuietly(fileStream);
                return;
            }
            try {
                String executeType = mainTask.getExecuteType();
                String taskType = mainTask.getType();
                String taskCode = mainTask.getCode();
                TaskSpiSelector spiSelector = this.ageiPort.getTaskSpiSelector();
                MainTaskContextFactory contextFactory = spiSelector.selectExtension(executeType, taskType, taskCode, MainTaskContextFactory.class);
                MainTaskStageProvider stageProvider = spiSelector.selectExtension(executeType, taskType, taskCode, MainTaskStageProvider.class);
                ImportMainTaskContext context = (ImportMainTaskContext)((Object)contextFactory.create(this.ageiPort, mainTask));
                context.setStage(stageProvider.mainTaskReduceStart());
                context.eventCurrentStage();
                ImportTaskRuntimeConfig runtimeConfig = context.getImportTaskRuntimeConfig();
                ArrayList<String> subTaskExistView = new ArrayList<String>();
                BigDataCache cache = this.ageiPort.getBigDataCacheManager().getBigDataCacheCache(mainTask.getExecuteType());
                for (int i = 1; i <= mainTask.getSubTotalCount(); ++i) {
                    String subTaskId = TaskIdUtil.genSubTaskId((String)mainTask.getMainTaskId(), (Integer)i);
                    if (!cache.exist(subTaskId)) continue;
                    subTaskExistView.add(subTaskId);
                }
                if (CollectionUtils.isNotEmpty(subTaskExistView)) {
                    String fileWriterFactoryName = this.ageiPort.getOptions().getFileTypeWriterSpiMappings().get(runtimeConfig.getFileType());
                    FileWriterFactory fileWriterFactory = (FileWriterFactory)ExtensionLoader.getExtensionLoader(FileWriterFactory.class).getExtension(fileWriterFactoryName);
                    ColumnHeaders columnHeaders = context.getColumnHeaders();
                    FileContext fileContext = new FileContext();
                    fileContext.setBizQuery(JsonUtil.toJsonString((Object)mainTask.getBizQuery()));
                    fileContext.setTaskSpec(context.getImportTaskSpec());
                    fileContext.setMainTask(mainTask);
                    fileWriter = fileWriterFactory.create(this.ageiPort, columnHeaders, fileContext);
                    for (String subTaskId : subTaskExistView) {
                        DataGroup dataGroup = cache.remove(subTaskId, DataGroup.class);
                        fileWriter.write(dataGroup);
                    }
                    fileStream = fileWriter.finish();
                }
                context.goNextStageEventNew();
                context.goNextStageEventNew();
                String key = mainTask.getMainTaskId() + "." + runtimeConfig.getFileType();
                if (CollectionUtils.isNotEmpty(subTaskExistView)) {
                    FileStore fileStore = this.ageiPort.getFileStore();
                    fileStore.save(key, fileStream, new HashMap());
                }
                MainTask contextMainTask = context.getMainTask();
                String feature = FeatureUtils.putFeature((String)contextMainTask.getFeature(), MainTaskFeatureKeys.OUTPUT_FILE_KEY, (Object)key);
                contextMainTask.setFeature(feature);
                context.goNextStageEventNew();
                this.onFinished(context);
                context.goNextStageEventNew();
                context.assertCurrentStage(stageProvider.mainTaskFinished());
            }
            catch (Throwable e) {
                try {
                    log.error("StandaloneExportMainTaskWorker#doReduce failed, main:{}", new Object[]{mainTask.getMainTaskId(), e});
                    this.ageiPort.onMainError(mainTask, e);
                    break block11;
                }
                catch (Throwable throwable) {
                    throw throwable;
                }
                finally {
                    IOUtils.closeQuietly(fileWriter);
                    IOUtils.closeQuietly(fileStream);
                }
            }
            IOUtils.closeQuietly((Closeable)fileWriter);
            IOUtils.closeQuietly(fileStream);
        }
    }
}

