/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.ageiport.processor.core.task.exporter.worker;

import com.alibaba.ageiport.common.feature.FeatureUtils;
import com.alibaba.ageiport.common.logger.Logger;
import com.alibaba.ageiport.common.logger.LoggerFactory;
import com.alibaba.ageiport.common.utils.IOUtils;
import com.alibaba.ageiport.common.utils.JsonUtil;
import com.alibaba.ageiport.common.utils.TaskIdUtil;
import com.alibaba.ageiport.ext.arch.ExtensionLoader;
import com.alibaba.ageiport.ext.file.store.FileStore;
import com.alibaba.ageiport.processor.core.AgeiPort;
import com.alibaba.ageiport.processor.core.Processor;
import com.alibaba.ageiport.processor.core.constants.MainTaskFeatureKeys;
import com.alibaba.ageiport.processor.core.model.api.BizColumnHeaders;
import com.alibaba.ageiport.processor.core.model.api.BizDynamicColumnHeaders;
import com.alibaba.ageiport.processor.core.model.api.BizUser;
import com.alibaba.ageiport.processor.core.model.core.ColumnHeader;
import com.alibaba.ageiport.processor.core.model.core.ColumnHeaders;
import com.alibaba.ageiport.processor.core.model.core.impl.MainTask;
import com.alibaba.ageiport.processor.core.spi.cache.BigDataCache;
import com.alibaba.ageiport.processor.core.spi.cache.BigDataCacheManager;
import com.alibaba.ageiport.processor.core.spi.client.CreateSubTasksRequest;
import com.alibaba.ageiport.processor.core.spi.convertor.Model;
import com.alibaba.ageiport.processor.core.spi.file.DataGroup;
import com.alibaba.ageiport.processor.core.spi.file.FileContext;
import com.alibaba.ageiport.processor.core.spi.file.FileWriter;
import com.alibaba.ageiport.processor.core.spi.file.FileWriterFactory;
import com.alibaba.ageiport.processor.core.spi.task.factory.MainTaskContextFactory;
import com.alibaba.ageiport.processor.core.spi.task.monitor.TaskStageEvent;
import com.alibaba.ageiport.processor.core.spi.task.selector.TaskSpiSelector;
import com.alibaba.ageiport.processor.core.spi.task.slice.SliceStrategy;
import com.alibaba.ageiport.processor.core.spi.task.stage.MainTaskStageProvider;
import com.alibaba.ageiport.processor.core.spi.task.stage.Stage;
import com.alibaba.ageiport.processor.core.spi.task.stage.SubTaskStageProvider;
import com.alibaba.ageiport.processor.core.task.AbstractMainTaskWorker;
import com.alibaba.ageiport.processor.core.task.exporter.adapter.ExportProcessorAdapter;
import com.alibaba.ageiport.processor.core.task.exporter.api.BizExportTaskRuntimeConfig;
import com.alibaba.ageiport.processor.core.task.exporter.context.ExportMainTaskContext;
import com.alibaba.ageiport.processor.core.task.exporter.model.ExportTaskRuntimeConfig;
import com.alibaba.ageiport.processor.core.task.exporter.model.ExportTaskRuntimeConfigImpl;
import com.alibaba.ageiport.processor.core.task.exporter.model.ExportTaskSpecification;
import com.alibaba.ageiport.processor.core.task.exporter.slice.ExportSlice;
import com.alibaba.ageiport.processor.core.task.exporter.slice.ExportSliceStrategy;
import com.alibaba.ageiport.processor.core.utils.HeadersUtil;
import java.io.Closeable;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Main-task worker for export tasks.
 *
 * <p>{@link #doPrepare()} drives the preparation stages of an export (runtime
 * config, query reset, total count, headers, slicing) and creates one sub task
 * per slice. {@link #doReduce()} collects the data group produced by every sub
 * task, writes them into a single file and saves that file to the file store.
 *
 * <p>The type parameters are not referenced inside this class body.
 *
 * @param <QUERY> business query type
 * @param <DATA>  business data type
 * @param <VIEW>  view (row) type
 */
public class ExportMainTaskWorker<QUERY, DATA, VIEW> extends AbstractMainTaskWorker {
    public static Logger log = LoggerFactory.getLogger(ExportMainTaskWorker.class);

    /**
     * Creates a worker bound to the given runtime and main task.
     *
     * @param ageiPort runtime facade, stored in the inherited {@code ageiPort} field
     * @param mainTask main task to execute, stored in the inherited {@code mainTask} field
     */
    public ExportMainTaskWorker(AgeiPort ageiPort, MainTask mainTask) {
        this.ageiPort = ageiPort;
        this.mainTask = mainTask;
    }

    /**
     * Prepares the export: marks the main task as {@code EXECUTING}, invokes the
     * processor adapter hooks in order (runtime config, reset query, total count,
     * headers, dynamic headers), slices the task and creates the sub tasks.
     *
     * <p>When the adapter reports a total count of zero no sub task is created and
     * {@link #doReduce()} is invoked directly.
     *
     * <p>Any {@link Throwable} raised along the way is logged and handed to
     * {@code ageiPort.onMainError}; nothing is rethrown to the caller.
     */
    @Override
    @SuppressWarnings({"rawtypes", "unchecked"}) // context, processor and adapter are used as raw types here
    public void doPrepare() {
        AgeiPort ageiPort = this.getAgeiPort();
        String mainTaskId = this.mainTask.getMainTaskId();
        String executeType = this.mainTask.getExecuteType();
        String taskType = this.mainTask.getType();
        String taskCode = this.mainTask.getCode();
        try {
            TaskSpiSelector spiSelector = ageiPort.getTaskSpiSelector();
            MainTaskStageProvider stageProvider = spiSelector.selectExtension(executeType, taskType, taskCode, MainTaskStageProvider.class);
            MainTaskContextFactory contextFactory = spiSelector.selectExtension(executeType, taskType, taskCode, MainTaskContextFactory.class);
            ExportMainTaskContext context = (ExportMainTaskContext) contextFactory.create(ageiPort, this.mainTask);

            // Persist the "started" state before any business callback runs.
            context.getMainTask().setStatus("EXECUTING");
            context.getMainTask().setGmtStart(new Date());
            context.save();
            context.setStage(stageProvider.mainTaskStart());

            ExportTaskSpecification taskSpec = context.getExportTaskSpec();
            Processor processor = taskSpec.getProcessor();
            ExportProcessorAdapter adapter = (ExportProcessorAdapter) processor.getConcreteAdapter();
            BizUser bizUser = context.getBizUser();
            Object query = context.getQuery();

            // Each adapter hook below is bracketed by two stage transitions
            // (presumably "start" and "end" of that step - confirm against MainTaskStageProvider).
            context.goNextStageEventNew();
            BizExportTaskRuntimeConfig bizExportTaskRuntimeConfig = adapter.taskRuntimeConfig(bizUser, query, processor, context);
            context.load(bizExportTaskRuntimeConfig);
            context.goNextStageEventNew();

            context.goNextStageEventNew();
            // NOTE(review): the reset query is loaded into the context, but the hooks
            // below still receive the original `query` reference - confirm this is intended.
            Object resetQuery = adapter.resetQuery(bizUser, query, processor, context);
            context.load(resetQuery);
            context.goNextStageEventNew();

            context.goNextStageEventNew();
            Integer totalCount = adapter.totalCount(bizUser, query, processor, context);
            context.load(totalCount);
            context.goNextStageEventNew();

            context.goNextStageEventNew();
            BizColumnHeaders bizColumnHeaders = adapter.getHeaders(bizUser, query, processor, context);
            context.goNextStageEventNew();

            context.goNextStageEventNew();
            BizDynamicColumnHeaders bizDynamicColumnHeaders = adapter.getDynamicHeaders(bizUser, query, processor, context);
            context.goNextStageEventNew();

            ColumnHeaders columnHeaders = HeadersUtil.buildHeaders(bizColumnHeaders, taskSpec.getViewClass(), bizDynamicColumnHeaders);
            for (ColumnHeader columnHeader : columnHeaders.getColumnHeaders()) {
                // Headers without an explicit ignore flag inherit it from their error flag.
                if (columnHeader.getIgnoreHeader() == null) {
                    columnHeader.setIgnoreHeader(columnHeader.getErrorHeader());
                }
            }
            context.load(columnHeaders);
            context.goNextStageEventNew();

            String sliceStrategyName = context.getExportTaskRuntimeConfig().getTaskSliceStrategy();
            ExportSliceStrategy sliceStrategy = (ExportSliceStrategy) ExtensionLoader.getExtensionLoader(SliceStrategy.class).getExtension(sliceStrategyName);
            // Typed list so the slices can be iterated as ExportSlice below.
            List<ExportSlice> slices = sliceStrategy.slice(context);
            context.load(slices);
            context.goNextStageEventNew();

            context.save();
            context.goNextStageEventNew();

            if (totalCount == 0) {
                // Nothing to export: skip sub task creation and go straight to the reduce phase.
                this.doReduce();
                return;
            }

            ArrayList<CreateSubTasksRequest.SubTaskInstance> subTaskInstances = new ArrayList<CreateSubTasksRequest.SubTaskInstance>();
            for (ExportSlice slice : slices) {
                CreateSubTasksRequest.SubTaskInstance subTaskInstance = new CreateSubTasksRequest.SubTaskInstance();
                subTaskInstance.setSubTaskNo(slice.getNo());
                subTaskInstance.setBizQuery(slice.getQueryJson());

                // Sub task runtime config = main task config overlaid with the slice's number and paging window.
                Map<String, Object> mainRuntimeParam = Model.toMap(context.getExportTaskRuntimeConfig());
                ExportTaskRuntimeConfigImpl sliceRuntimeConfig = Model.toModel(mainRuntimeParam, new ExportTaskRuntimeConfigImpl());
                sliceRuntimeConfig.setNo(slice.getNo());
                sliceRuntimeConfig.setPageOffset(slice.getOffset());
                sliceRuntimeConfig.setPageSize(slice.getSize());
                String runtimeParam = FeatureUtils.merge((String) JsonUtil.toJsonString(mainRuntimeParam), (String) JsonUtil.toJsonString((Object) sliceRuntimeConfig));
                subTaskInstance.setRuntimeParam(runtimeParam);
                subTaskInstances.add(subTaskInstance);
            }

            CreateSubTasksRequest createSubTasksRequest = new CreateSubTasksRequest();
            createSubTasksRequest.setMainTaskId(mainTaskId);
            createSubTasksRequest.setSubTaskInstances(subTaskInstances);
            ageiPort.getTaskServerClient().createSubTask(createSubTasksRequest);

            // Announce every created sub task on the event bus of this execute type.
            SubTaskStageProvider subTaskStageProvider = spiSelector.selectExtension(executeType, taskType, taskCode, SubTaskStageProvider.class);
            Stage subTaskCreated = subTaskStageProvider.subTaskCreated();
            for (ExportSlice slice : slices) {
                String subTaskId = TaskIdUtil.genSubTaskId(mainTaskId, (Integer) slice.getNo());
                ageiPort.getEventBusManager().getEventBus(executeType).post(TaskStageEvent.subTaskEvent(subTaskId, subTaskCreated));
            }
            context.goNextStageEventNew();
            context.assertCurrentStage(stageProvider.mainTaskSaveSliceEnd());
        } catch (Throwable e) {
            log.error("StandaloneExportMainTaskWorker#doPrepare failed, main:{}", new Object[]{mainTaskId, e});
            ageiPort.onMainError(this.mainTask, e);
        }
    }

    /**
     * Merges the results of all sub tasks into the final export file.
     *
     * <p>Returns immediately when the main task status is {@code ERROR}. Otherwise
     * the data group of each sub task (numbered {@code 1..subTotalCount}) is removed
     * from the big-data cache and written through the {@link FileWriter} selected
     * for the configured file type; the finished stream is saved to the file store
     * under {@code <mainTaskId>.<fileType>} and that key is recorded in the main
     * task's feature string.
     *
     * <p>Any {@link Throwable} is logged and handed to {@code ageiPort.onMainError}.
     * The file writer and the output stream are closed quietly on every path.
     */
    @Override
    public void doReduce() {
        MainTask mainTask = this.getMainTask();
        // Constant-first comparison: a task whose status is still null is not in error
        // and must not fail with an NPE before the guarded section is entered.
        if ("ERROR".equals(mainTask.getStatus())) {
            return;
        }

        FileWriter fileWriter = null;
        InputStream fileStream = null;
        try {
            String executeType = mainTask.getExecuteType();
            String taskType = mainTask.getType();
            String taskCode = mainTask.getCode();
            TaskSpiSelector spiSelector = this.ageiPort.getTaskSpiSelector();
            MainTaskContextFactory contextFactory = spiSelector.selectExtension(executeType, taskType, taskCode, MainTaskContextFactory.class);
            MainTaskStageProvider stageProvider = spiSelector.selectExtension(executeType, taskType, taskCode, MainTaskStageProvider.class);
            ExportMainTaskContext context = (ExportMainTaskContext) contextFactory.create(this.ageiPort, mainTask);
            context.setStage(stageProvider.mainTaskReduceStart());
            context.eventCurrentStage();

            // Resolve the writer implementation registered for the configured file type.
            ExportTaskRuntimeConfig runtimeConfig = context.getExportTaskRuntimeConfig();
            String fileWriterFactoryName = this.ageiPort.getOptions().getFileTypeWriterSpiMappings().get(runtimeConfig.getFileType());
            FileWriterFactory fileWriterFactory = (FileWriterFactory) ExtensionLoader.getExtensionLoader(FileWriterFactory.class).getExtension(fileWriterFactoryName);
            ColumnHeaders columnHeaders = context.getColumnHeaders();
            FileContext fileContext = new FileContext();
            fileContext.setBizQuery(JsonUtil.toJsonString((Object) mainTask.getBizQuery()));
            fileContext.setTaskSpec(context.getExportTaskSpec());
            fileContext.setMainTask(mainTask);
            fileWriter = fileWriterFactory.create(this.ageiPort, columnHeaders, fileContext);

            // Sub task numbers are 1-based; each cached data group is consumed (removed) exactly once.
            for (int i = 1; i <= mainTask.getSubTotalCount(); ++i) {
                String subTaskId = TaskIdUtil.genSubTaskId(mainTask.getMainTaskId(), (Integer) i);
                BigDataCacheManager bigDataCacheManager = this.ageiPort.getBigDataCacheManager();
                BigDataCache cache = bigDataCacheManager.getBigDataCacheCache(mainTask.getExecuteType());
                DataGroup dataGroup = cache.remove(subTaskId, DataGroup.class);
                fileWriter.write(dataGroup);
            }
            fileStream = fileWriter.finish();
            context.goNextStageEventNew();

            context.goNextStageEventNew();
            FileStore fileStore = this.ageiPort.getFileStore();
            String key = mainTask.getMainTaskId() + "." + runtimeConfig.getFileType();
            fileStore.save(key, fileStream, new HashMap<>());
            // Record where the output file was stored so it can be located from the task.
            MainTask contextMainTask = context.getMainTask();
            String feature = FeatureUtils.putFeature((String) contextMainTask.getFeature(), MainTaskFeatureKeys.OUTPUT_FILE_KEY, (Object) key);
            contextMainTask.setFeature(feature);
            context.goNextStageEventNew();

            this.onFinished(context);
            context.goNextStageEventNew();
            context.assertCurrentStage(stageProvider.mainTaskFinished());
        } catch (Throwable e) {
            log.error("StandaloneExportMainTaskWorker#doReduce failed, main:{}", new Object[]{mainTask.getMainTaskId(), e});
            this.ageiPort.onMainError(mainTask, e);
        } finally {
            // Runs on success, on handled failure, and if onMainError itself throws.
            IOUtils.closeQuietly((Closeable) fileWriter);
            IOUtils.closeQuietly((Closeable) fileStream);
        }
    }
}

