/*
 * Decompiled with CFR 0.152.
 */
package com.jcloud.jcq.client.consumer.impl;

import com.jcloud.jcq.client.Exception.ClientException;
import com.jcloud.jcq.client.common.RemotingApiWrapper;
import com.jcloud.jcq.client.consumer.AsyncAckCallback;
import com.jcloud.jcq.client.consumer.ConsumeResult;
import com.jcloud.jcq.client.consumer.ConsumeService;
import com.jcloud.jcq.client.consumer.MessageListener;
import com.jcloud.jcq.client.consumer.SubscribeConsumer;
import com.jcloud.jcq.common.message.AckAction;
import com.jcloud.jcq.common.thread.ThreadFactoryImpl;
import com.jcloud.jcq.common.utils.StringUtils;
import com.jcloud.jcq.protocol.Message;
import com.jcloud.jcq.protocol.ResponseCode;
import com.jcloud.jcq.protocol.client.AckMessageRequest;
import com.jcloud.jcq.protocol.client.AckMessageResponse;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ConsumeService} implementation that hands submitted message batches to the
 * {@link MessageListener} registered for their topic and then acknowledges the outcome
 * to the broker.
 *
 * <p>Two thread pools are used, both backed by bounded queues sized from the consumer
 * configuration: one runs {@link ConsumeTask}s (listener invocation and local retries),
 * the other runs {@link AckTask}s (asynchronous ack requests). Because the queues are
 * bounded, any submission may be rejected; rejections are logged and the task is dropped.
 */
public class ConsumeServiceImpl
implements ConsumeService {
    private static final Logger logger = LoggerFactory.getLogger(ConsumeServiceImpl.class);
    /** Listener to invoke for each registered topic. */
    private final ConcurrentMap<String, MessageListener> topicListenerMap = new ConcurrentHashMap<String, MessageListener>();
    private final SubscribeConsumer consumer;
    private final RemotingApiWrapper remotingApiWrapper = RemotingApiWrapper.getInstance();
    /** Bounded work queue of the consume pool; capacity is the configured message buffer size. */
    private final BlockingDeque<Runnable> consumeQueue;
    /** Bounded work queue of the ack pool; capacity is the configured ack buffer size. */
    private final BlockingDeque<Runnable> ackQueue;
    private final ExecutorService consumeExecutor;
    private final ExecutorService ackExecutor;

    /**
     * Creates the service and its two thread pools.
     *
     * <p>Each pool uses the configured core size, a maximum of twice the core size and a
     * 60 second keep-alive for idle threads.
     *
     * @param subscribeConsumer the consumer whose configuration, broker address map and
     *                          trace hooks are used by this service
     */
    public ConsumeServiceImpl(SubscribeConsumer subscribeConsumer) {
        this.consumer = subscribeConsumer;
        this.consumeQueue = new LinkedBlockingDeque<Runnable>(subscribeConsumer.getConsumerConfig().getMessageBufferSize());
        this.ackQueue = new LinkedBlockingDeque<Runnable>(subscribeConsumer.getConsumerConfig().getAckBufferSize());
        this.consumeExecutor = new ThreadPoolExecutor(this.consumer.getConsumerConfig().getConsumePoolCoreSize(), this.consumer.getConsumerConfig().getConsumePoolCoreSize() * 2, 60000L, TimeUnit.MILLISECONDS, this.consumeQueue, new ThreadFactoryImpl("ConsumeMessageThread_"));
        this.ackExecutor = new ThreadPoolExecutor(this.consumer.getConsumerConfig().getAckPoolCoreSize(), this.consumer.getConsumerConfig().getAckPoolCoreSize() * 2, 60000L, TimeUnit.MILLISECONDS, this.ackQueue, new ThreadFactoryImpl("AckMessageThread_"));
    }

    /**
     * Registers (or replaces) the listener that consumes messages of {@code topic}.
     *
     * @param topic           topic name used as the lookup key
     * @param messageListener listener to invoke for that topic
     */
    @Override
    public void registerTopic(String topic, MessageListener messageListener) {
        this.topicListenerMap.put(topic, messageListener);
    }

    /**
     * Removes the listener registered for {@code topic}, if any.
     *
     * @param topic topic name whose listener is removed
     */
    @Override
    public void unregisterTopic(String topic) {
        this.topicListenerMap.remove(topic);
    }

    /**
     * Queues a batch of messages for consumption on the consume pool.
     *
     * <p>If the pool rejects the task the batch is dropped with a warning; nothing is
     * acked for it by this service.
     *
     * @param topic         topic the messages belong to
     * @param messages      the batch passed to the topic's listener
     * @param ackIndex      index sent back to the broker in the ack request
     * @param brokerGroupId broker group used to resolve the ack target address
     */
    @Override
    public void submitMessages(String topic, List<Message> messages, long ackIndex, String brokerGroupId) {
        try {
            this.consumeExecutor.submit(new ConsumeTask(topic, messages, ackIndex, brokerGroupId));
        }
        catch (RejectedExecutionException e) {
            logger.warn("rejected by consume executor when submitting consume task for messages:{}, possibly message buffer is full.", messages);
        }
    }

    /** No-op: both pools are created in the constructor. */
    @Override
    public void start() {
    }

    /** Initiates an orderly shutdown of the consume and ack pools. */
    @Override
    public void shutdown() {
        this.consumeExecutor.shutdown();
        this.ackExecutor.shutdown();
    }

    /**
     * Sends one asynchronous ack request to the broker group the messages came from and,
     * when message tracing is on, records the outcome as an after-consume trace point.
     */
    class AckTask
    implements Runnable {
        private final String topic;
        private final long ackIndex;
        private final AckAction ackAction;
        private final String brokerGroupId;

        /**
         * @param topic         topic being acked
         * @param ackAction     consume outcome reported to the broker
         * @param ackIndex      index reported to the broker
         * @param brokerGroupId broker group whose address receives the request
         */
        public AckTask(String topic, AckAction ackAction, long ackIndex, String brokerGroupId) {
            this.topic = topic;
            this.ackAction = ackAction;
            this.ackIndex = ackIndex;
            this.brokerGroupId = brokerGroupId;
        }

        /**
         * Builds the ack request, resolves the broker address and fires the async call.
         * If no address is known for the broker group, the ack is skipped with a warning.
         */
        @Override
        public void run() {
            final AckMessageRequest ackMessageRequest = new AckMessageRequest();
            ackMessageRequest.setTopic(this.topic);
            ackMessageRequest.setAckAction(this.ackAction);
            ackMessageRequest.setAckIndex(this.ackIndex);
            ackMessageRequest.setConsumerGroupId(ConsumeServiceImpl.this.consumer.getConsumerGroupId());
            try {
                final String brokerAddress = (String)ConsumeServiceImpl.this.consumer.getQueueSelector().getBrokerGroupAddressMap().get(this.brokerGroupId);
                if (StringUtils.isEmpty(brokerAddress)) {
                    logger.warn("cannot get address of broker group:{} when do ack for topic:{} ackIndex:{} ackAction:{}", new Object[]{this.brokerGroupId, this.topic, this.ackIndex, this.ackAction});
                    return;
                }
                ConsumeServiceImpl.this.remotingApiWrapper.async(ConsumeServiceImpl.this.consumer, brokerAddress, ackMessageRequest, new AsyncAckCallback(){
                    // Captured when the callback is created so the failure path can build
                    // a response carrying the same request id for tracing.
                    private final String requestId = ackMessageRequest.getRequestId();

                    @Override
                    public void onResponse(AckMessageResponse response) {
                        logger.info("ackIndex: {}, ackAction: {}, ackMessage to broker: {} done, response: {}", new Object[]{AckTask.this.ackIndex, AckTask.this.ackAction.name(), brokerAddress, ResponseCode.getName(response.getResponseCode())});
                        if (ConsumeServiceImpl.this.consumer.isMessageTraceOn()) {
                            // Traced as successful only if consumption succeeded AND the broker accepted the ack.
                            ConsumeServiceImpl.this.consumer.completeAndAppendAfterConsumeTracePoints(response, brokerAddress, AckTask.this.ackIndex, AckTask.this.ackAction == AckAction.SUCCESS && response.success());
                        }
                    }

                    @Override
                    public void onException(Throwable throwable) {
                        logger.warn("got exception {} when ack ackIndex: {}", (Object)throwable, (Object)AckTask.this.ackIndex);
                        if (ConsumeServiceImpl.this.consumer.isMessageTraceOn()) {
                            AckMessageResponse response = new AckMessageResponse();
                            response.setRequestId(this.requestId);
                            ConsumeServiceImpl.this.consumer.completeAndAppendAfterConsumeTracePoints(response, brokerAddress, AckTask.this.ackIndex, false);
                        }
                    }
                }, AckMessageResponse.class);
            }
            catch (ClientException e) {
                logger.warn("got exception {} when ack ackIndex:{}", (Object)e, (Object)this.ackIndex);
            }
        }
    }

    /**
     * Runs the topic's listener on one batch of messages, re-queues the batch on failure
     * until the configured retry limit is reached, and finally queues the ack.
     */
    class ConsumeTask
    implements Runnable {
        private final String topic;
        private final List<Message> messages;
        private final long ackIndex;
        private final String brokerGroupId;
        /** Number of retries already performed for this batch (0 for the first attempt). */
        private final int retryTimes;

        /** Creates the first-attempt task for a batch (retry count 0). */
        public ConsumeTask(String topic, List<Message> messages, long ackIndex, String brokerGroupId) {
            this(topic, messages, ackIndex, brokerGroupId, 0);
        }

        /**
         * @param topic         topic the messages belong to
         * @param messages      batch handed to the listener
         * @param ackIndex      index to report in the ack
         * @param brokerGroupId broker group the ack is sent to
         * @param retryTimes    number of retries already performed
         */
        public ConsumeTask(String topic, List<Message> messages, long ackIndex, String brokerGroupId, int retryTimes) {
            this.topic = topic;
            this.messages = messages;
            this.ackIndex = ackIndex;
            this.brokerGroupId = brokerGroupId;
            this.retryTimes = retryTimes;
        }

        /**
         * Consumes the batch and decides between retry, failure ack and success ack.
         * If no listener is registered for the topic the batch is dropped without an ack.
         */
        @Override
        public void run() {
            MessageListener messageListener = ConsumeServiceImpl.this.topicListenerMap.get(this.topic);
            if (messageListener == null) {
                logger.warn("messageListener is null for topic {}", (Object)this.topic);
                return;
            }
            ConsumeResult consumeResult;
            try {
                consumeResult = messageListener.consumeMessages(this.messages);
            }
            catch (Exception e) {
                logger.warn("got exception {} when consume messages {}", (Object)e, this.messages);
                consumeResult = ConsumeResult.FAILED;
            }
            if (consumeResult == null) {
                // A null result would otherwise NPE below; since this task runs via submit(),
                // that NPE would vanish into the discarded Future and the batch would be
                // neither retried nor acked.
                logger.warn("messageListener for topic {} returned null consume result, treating it as {}", (Object)this.topic, (Object)ConsumeResult.FAILED);
                consumeResult = ConsumeResult.FAILED;
            }
            AckAction ackAction = AckAction.SUCCESS;
            if (consumeResult != ConsumeResult.SUCCESS) {
                logger.warn("consume {} when consume messages:{}, retryTimes:{}", new Object[]{consumeResult.name(), this.messages, this.retryTimes});
                if (this.retryTimes < ConsumeServiceImpl.this.consumer.getConsumerConfig().getConsumeMaxRetryTimes()) {
                    this.resubmitForRetry();
                    return;
                }
                ackAction = AckAction.CONSUME_FAILED;
            }
            this.submitAck(ackAction);
        }

        /** Re-queues this batch with an incremented retry count; logs if the pool rejects it. */
        private void resubmitForRetry() {
            try {
                ConsumeServiceImpl.this.consumeExecutor.submit(new ConsumeTask(this.topic, this.messages, this.ackIndex, this.brokerGroupId, this.retryTimes + 1));
            }
            catch (RejectedExecutionException e) {
                // Same policy as submitMessages(): make the drop visible, send no ack.
                logger.warn("rejected by consume executor when resubmitting consume task for messages:{}, retryTimes:{}, possibly message buffer is full.", this.messages, (Object)this.retryTimes);
            }
        }

        /** Queues the ack for this batch; logs if the ack pool rejects it. */
        private void submitAck(AckAction ackAction) {
            try {
                ConsumeServiceImpl.this.ackExecutor.submit(new AckTask(this.topic, ackAction, this.ackIndex, this.brokerGroupId));
            }
            catch (RejectedExecutionException e) {
                logger.warn("rejected by ack executor when submitting ack task for topic:{} ackIndex:{} ackAction:{}, possibly ack buffer is full.", new Object[]{this.topic, this.ackIndex, ackAction});
            }
        }
    }
}

