package com.alibaba.schedulerx.worker.pull;

import com.alibaba.schedulerx.shade.com.google.common.collect.Maps;
import com.alibaba.schedulerx.shade.com.google.common.collect.Sets;
import com.alibaba.schedulerx.worker.batch.ContainerStatusReqHandler;
import com.alibaba.schedulerx.worker.batch.ContainerStatusReqHandlerPool;
import com.alibaba.schedulerx.worker.batch.ReqQueue;
import com.alibaba.schedulerx.worker.container.ContainerFactory;
import com.alibaba.schedulerx.worker.log.LogFactory;
import com.alibaba.schedulerx.worker.log.Logger;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;

/* loaded from: input_file:com/alibaba/schedulerx/worker/pull/PullManager.class */
/**
 * Singleton registry of the pull-side resources created per instance id:
 * one {@link BlockingContainerQueue}, one {@link PullThread} and an array of
 * {@link ConsumerThread}s, plus a set of ids that were flagged as crashed.
 *
 * <p>All registries are concurrent collections. Registration in
 * {@link #init} and removal in {@link #stop} each use a single atomic map
 * operation per registry, so concurrent calls for the same id neither start
 * duplicate threads nor dereference an entry that was removed in between.
 */
public enum PullManager {
    INSTANCE;

    private static final Logger LOGGER = LogFactory.getLogger(PullManager.class);

    /** Queue per id; presence of a key here is what {@link #contains} reports. */
    private final ConcurrentMap<Long, BlockingContainerQueue> queueMap = Maps.newConcurrentMap();
    /** Pull thread per id, registered after it has been started. */
    private final Map<Long, PullThread> pullThreadMap = Maps.newConcurrentMap();
    /** Consumer runnables per id, registered after all of them have been started. */
    private final Map<Long, ConsumerThread[]> consumerThreadMap = Maps.newConcurrentMap();
    private final ContainerStatusReqHandlerPool statusReqBatchHandlerPool = ContainerStatusReqHandlerPool.INSTANCE;
    /** Ids flagged through {@link #crash} and not yet cleared by {@link #stop}. */
    private final Set<Long> crashedInstanceSet = Sets.newConcurrentHashSet();

    PullManager() {
    }

    /**
     * Creates and starts the pull-side resources for the given id. Does nothing
     * if a queue is already registered for that id.
     *
     * @param j   id used as the registry key (presumably the job instance id — confirm with callers)
     * @param i   forwarded as the second argument of the {@link PullThread} constructor
     *            (meaning not visible here — confirm against PullThread)
     * @param i2  forwarded to the {@link BlockingContainerQueue} constructor
     *            (presumably the queue capacity — confirm)
     * @param i3  number of {@link ConsumerThread}s to create and start
     * @param str forwarded unchanged to {@link PullThread}, {@link ContainerStatusReqHandler}
     *            and {@link ConsumerThread} (meaning not visible here)
     * @throws Exception if any of the collaborators invoked during set-up throws
     */
    public void init(long j, int i, int i2, int i3, String str) throws Exception {
        Long key = Long.valueOf(j);
        // Fast path: avoid building a queue when the id is already registered.
        if (this.queueMap.containsKey(key)) {
            return;
        }
        BlockingContainerQueue blockingContainerQueue = new BlockingContainerQueue(i2);
        // Atomic claim of the id: a plain containsKey()/put() pair lets two
        // concurrent callers both proceed and start two sets of threads, of
        // which only the last-registered set could ever be stopped.
        if (this.queueMap.putIfAbsent(key, blockingContainerQueue) != null) {
            return;
        }

        PullThread pullThread = new PullThread(j, i, str, blockingContainerQueue);
        pullThread.start();
        this.pullThreadMap.put(key, pullThread);

        if (!this.statusReqBatchHandlerPool.contains(j)) {
            ReqQueue reqQueue = new ReqQueue(j, 100000);
            reqQueue.init();
            this.statusReqBatchHandlerPool.start(j, new ContainerStatusReqHandler<>(j, 1, 1, 3000, reqQueue, str));
        }

        ConsumerThread[] consumerThreadArr = new ConsumerThread[i3];
        for (int i4 = 0; i4 < i3; i4++) {
            consumerThreadArr[i4] = new ConsumerThread(blockingContainerQueue, ContainerFactory.getContainerPool(), str);
            new Thread(consumerThreadArr[i4], "Schedulerx-ConsumerThread-" + j + "-" + i4).start();
        }
        this.consumerThreadMap.put(key, consumerThreadArr);
    }

    /**
     * Flags the given id as crashed; the flag stays set until {@link #stop} is
     * called for the same id.
     *
     * @param j id to flag
     */
    public void crash(long j) {
        this.crashedInstanceSet.add(Long.valueOf(j));
    }

    /**
     * Stops and unregisters everything recorded for the given id: the pull
     * thread, the consumer threads and the queue (which is cleared), and
     * removes the crashed flag. Ids with no registered resources are tolerated.
     *
     * @param j id whose resources should be released
     */
    public void stop(long j) {
        Long key = Long.valueOf(j);

        // remove() hands back the previous value in one step, so a concurrent
        // stop() for the same id cannot cause a null dereference between a
        // containsKey() check and the subsequent get().
        PullThread pullThread = this.pullThreadMap.remove(key);
        if (pullThread != null) {
            pullThread.stopRunning();
        }

        ConsumerThread[] consumerThreads = this.consumerThreadMap.remove(key);
        if (consumerThreads != null) {
            for (ConsumerThread consumerThread : consumerThreads) {
                consumerThread.stopRunning();
            }
        }

        BlockingContainerQueue queue = this.queueMap.remove(key);
        if (queue != null) {
            queue.clear();
        }

        this.crashedInstanceSet.remove(key);
    }

    /**
     * Reports whether a queue is currently registered for the given id.
     *
     * @param j id to look up
     * @return {@code true} if {@link #init} registered a queue for the id and
     *         {@link #stop} has not removed it since
     */
    public boolean contains(long j) {
        return this.queueMap.containsKey(Long.valueOf(j));
    }

    /**
     * Reports whether the given id is currently flagged as crashed.
     *
     * @param j id to look up
     * @return {@code true} if {@link #crash} was called for the id and
     *         {@link #stop} has not cleared the flag since
     */
    public boolean isCrashed(long j) {
        return this.crashedInstanceSet.contains(Long.valueOf(j));
    }
}
