/*
 * Decompiled with CFR 0.152.
 */
package com.jcloud.jcq.client.trace;

import com.jcloud.jcq.client.Exception.ClientException;
import com.jcloud.jcq.client.common.ClientInstance;
import com.jcloud.jcq.client.common.QueueSelectStrategy;
import com.jcloud.jcq.client.common.impl.HashQueueSelectTrategy;
import com.jcloud.jcq.client.producer.AsyncSendCallback;
import com.jcloud.jcq.client.producer.Producer;
import com.jcloud.jcq.client.producer.ProducerConfig;
import com.jcloud.jcq.client.producer.ProducerFactory;
import com.jcloud.jcq.client.trace.TraceDispatcher;
import com.jcloud.jcq.common.queue.QueueRouteInfo;
import com.jcloud.jcq.common.thread.ThreadFactoryImpl;
import com.jcloud.jcq.common.trace.TracePoint;
import com.jcloud.jcq.common.trace.TraceStringHelper;
import com.jcloud.jcq.common.utils.StringUtils;
import com.jcloud.jcq.common.utils.SystemUtils;
import com.jcloud.jcq.protocol.Message;
import com.jcloud.jcq.protocol.client.SendMessageResponse;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultTraceDispatcherImpl
implements TraceDispatcher {
    private static final Logger logger = LoggerFactory.getLogger((String)DefaultTraceDispatcherImpl.class.getName());
    private String traceTopic;
    private String tenantId;
    private Producer traceProducer;
    private String hostClientIp;
    private ClientInstance hostClient;
    private Map<String, List<TracePoint>> consumerTraceMap;
    private BlockingQueue<TracePoint> traceQueue;
    private final int batchSize;
    private BlockingQueue<Runnable> sendTraceThreadPoolQueue;
    private ExecutorService sendTraceExecutor;
    private ExecutorService fetchTraceExecutor;
    private QueueSelectStrategy queueSelectStrategy;
    private AtomicBoolean isRunning = new AtomicBoolean(false);

    public DefaultTraceDispatcherImpl(ClientInstance clientInstance) {
        this.batchSize = 32;
        this.traceQueue = new ArrayBlockingQueue<TracePoint>(2048);
        this.consumerTraceMap = new HashMap<String, List<TracePoint>>();
        this.sendTraceThreadPoolQueue = new LinkedBlockingQueue<Runnable>(1024);
        this.queueSelectStrategy = new HashQueueSelectTrategy();
        this.sendTraceExecutor = new ThreadPoolExecutor(2, 2, 60000L, TimeUnit.MILLISECONDS, this.sendTraceThreadPoolQueue, new ThreadFactoryImpl("TraceProduceThread_"));
        this.fetchTraceExecutor = new ThreadPoolExecutor(1, 1, Long.MAX_VALUE, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactoryImpl("FetchTraceThread_"));
        this.hostClient = clientInstance;
        this.hostClientIp = SystemUtils.getLocalIpAddress();
        ProducerConfig producerConfig = new ProducerConfig();
        producerConfig.setMaxMsgNumsPerBatch(this.batchSize);
        producerConfig.setEnableCompress(true);
        producerConfig.setForTrace(true);
        producerConfig.setMetaServerAddress(this.hostClient.getClientConfig().getMetaServerAddress());
        try {
            this.traceProducer = ProducerFactory.getInstance().createProducer(clientInstance.getAccessKey(), clientInstance.getSecretKey(), producerConfig);
            this.traceProducer.getClientConfig().setToken(clientInstance.getClientConfig().getToken());
        }
        catch (ClientException e) {
            logger.warn("failed to create trace producer.");
        }
    }

    @Override
    public void start() {
        if (!this.isRunning.compareAndSet(false, true)) {
            logger.warn("trace dispatcher already started.");
            return;
        }
        try {
            this.traceProducer.start();
        }
        catch (ClientException e) {
            logger.warn("failed to start trace producer.");
        }
        this.traceTopic = this.traceProducer.getQueueSelector().getTraceTopic();
        this.tenantId = this.traceProducer.getQueueSelector().getTenantId();
        this.fetchTraceExecutor.submit(new FetchTraceTask());
    }

    @Override
    public boolean append(List<TracePoint> tracePoints) {
        boolean flag = true;
        if (tracePoints == null || tracePoints.isEmpty()) {
            return true;
        }
        for (TracePoint tracePoint : tracePoints) {
            tracePoint.setClientAddress(this.hostClientIp);
            flag &= this.traceQueue.offer(tracePoint);
        }
        if (flag) {
            logger.debug("TracePoints: {} appended.", tracePoints);
        } else {
            logger.debug("TracePoints: {} append failed.", tracePoints);
        }
        return flag;
    }

    @Override
    public boolean append(List<TracePoint> tracePoints, String brokerAddress, long ackIndex) {
        if (!this.append(tracePoints)) {
            return false;
        }
        this.consumerTraceMap.put(StringUtils.join(brokerAddress, String.valueOf(ackIndex)), tracePoints);
        logger.debug("TracePoints: {}, BrokerAddress: {}, AckIndex: {} appended.", new Object[]{tracePoints, brokerAddress, ackIndex});
        return true;
    }

    @Override
    public List<TracePoint> getAndRemoveTracePoints(String brokerAddress, long ackIndex) {
        return this.consumerTraceMap.remove(StringUtils.join(brokerAddress, String.valueOf(ackIndex)));
    }

    @Override
    public void waitForFlush() {
        long endTime = System.currentTimeMillis() + 1000L;
        while (!(this.traceQueue.isEmpty() && this.sendTraceThreadPoolQueue.isEmpty() || System.currentTimeMillis() >= endTime)) {
            try {
                Thread.sleep(5L);
            }
            catch (InterruptedException e) {
                // empty catch block
                break;
            }
        }
        logger.info("Trace queue flush done. Left trace: {}, task: {}", (Object)this.traceQueue.size(), (Object)this.sendTraceThreadPoolQueue.size());
    }

    @Override
    public void shutdown() {
        this.waitForFlush();
        try {
            this.traceProducer.shutdown();
        }
        catch (ClientException e) {
            logger.warn("ClientException happened when shutdown trace producer.");
        }
        this.fetchTraceExecutor.shutdown();
        this.sendTraceExecutor.shutdown();
    }

    class SendTraceMessageTask
    implements Runnable {
        List<TracePoint> tracePoints;

        public SendTraceMessageTask(List<TracePoint> tracePoints) {
            this.tracePoints = tracePoints;
        }

        @Override
        public void run() {
            try {
                this.sendTraceMessage();
            }
            catch (ClientException e) {
                logger.warn("Exception happened when send trace message.", (Throwable)e);
            }
        }

        private void sendTraceMessage() throws ClientException {
            ArrayList<Message> traceMessages = new ArrayList<Message>();
            List<QueueRouteInfo> queueRouteInfos = DefaultTraceDispatcherImpl.this.traceProducer.getQueueSelector().getTraceQueueRoutes();
            QueueRouteInfo queueRouteInfo = DefaultTraceDispatcherImpl.this.queueSelectStrategy.selectQueue(DefaultTraceDispatcherImpl.this.hostClient.getInstanceId(), queueRouteInfos);
            if (queueRouteInfo == null) {
                logger.warn("Failed to get Trace queue in cluster.");
                return;
            }
            int queueId = queueRouteInfo.getQueueId();
            for (TracePoint tracePoint : this.tracePoints) {
                tracePoint.setTenantId(DefaultTraceDispatcherImpl.this.tenantId);
                String traceString = TraceStringHelper.getTraceStringFromTracePoint(tracePoint);
                Message message = new Message();
                message.setTopic(DefaultTraceDispatcherImpl.this.traceTopic);
                message.setMessageId(tracePoint.getMessageId());
                message.setBody(traceString.getBytes(Charset.forName("UTF-8")));
                message.setQueueId(queueId);
                traceMessages.add(message);
            }
            DefaultTraceDispatcherImpl.this.traceProducer.sendMessageAsync(traceMessages, new AsyncSendCallback(){

                @Override
                public void onResponse(SendMessageResponse response) {
                    logger.debug("Got Send trace messages response: {}.", (Object)response);
                }

                @Override
                public void onException(Throwable throwable) {
                    logger.warn("Exception happened when async send message.", throwable);
                }
            }, 0L, queueId);
        }
    }

    class FetchTraceTask
    implements Runnable {
        FetchTraceTask() {
        }

        @Override
        public void run() {
            while (DefaultTraceDispatcherImpl.this.isRunning.get()) {
                ArrayList<TracePoint> tracePoints = new ArrayList<TracePoint>();
                for (int i = 0; i < DefaultTraceDispatcherImpl.this.batchSize; ++i) {
                    TracePoint tracePoint = null;
                    try {
                        tracePoint = (TracePoint)DefaultTraceDispatcherImpl.this.traceQueue.poll(5L, TimeUnit.MILLISECONDS);
                    }
                    catch (InterruptedException e) {
                        logger.warn("Exception happened when poll TracePoint from traceQueue.", (Throwable)e);
                    }
                    if (tracePoint == null) break;
                    tracePoints.add(tracePoint);
                }
                if (tracePoints.isEmpty()) continue;
                DefaultTraceDispatcherImpl.this.sendTraceExecutor.submit(new SendTraceMessageTask(tracePoints));
            }
        }
    }
}

