/*
 * Decompiled with CFR 0.152.
 */
package com.jcloud.jcq.client.consumer.impl;

import com.jcloud.jcq.client.Exception.ClientException;
import com.jcloud.jcq.client.Exception.ClientExceptionCode;
import com.jcloud.jcq.client.common.AbstractClient;
import com.jcloud.jcq.client.common.ClientConfig;
import com.jcloud.jcq.client.consumer.Consumer;
import com.jcloud.jcq.client.consumer.ConsumerConfig;
import com.jcloud.jcq.client.trace.DefaultTraceDispatcherImpl;
import com.jcloud.jcq.common.filter.FilterExpression;
import com.jcloud.jcq.common.msg.attribute.CompressType;
import com.jcloud.jcq.common.trace.TracePoint;
import com.jcloud.jcq.common.trace.TraceType;
import com.jcloud.jcq.common.utils.StringUtils;
import com.jcloud.jcq.common.utils.ZipUtils;
import com.jcloud.jcq.protocol.Message;
import com.jcloud.jcq.protocol.Response;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Base {@link Consumer} implementation that holds the consumer identity and
 * configuration and produces message-trace points around consumption.
 *
 * <p>The trace dispatcher is only created by this class's constructor when
 * {@link ConsumerConfig#isMessageTraceOn()} is {@code true}; every method that
 * touches the dispatcher therefore guards against it being absent.
 */
public class DefaultConsumerImpl
extends AbstractClient
implements Consumer {
    /** Message property key that carries the retry count as a decimal string. */
    private static final String RETRY_TIMES_PROPERTY = "PROPERTY_RETRY_TIMES";

    protected String consumerGroupId;
    protected String consumerId;
    protected ConsumerConfig consumerConfig;

    /**
     * Creates a consumer bound to the given credentials and configuration.
     *
     * @param accessKey      access key stored on the client
     * @param secretKey      secret key stored on the client
     * @param consumerId     identifier of this consumer; also returned by {@link #getInstanceId()}
     * @param consumerConfig consumer configuration; must not be {@code null} because it is
     *                       dereferenced here to read the consumer group id and the trace switch
     */
    public DefaultConsumerImpl(String accessKey, String secretKey, String consumerId, ConsumerConfig consumerConfig) {
        this.accessKey = accessKey;
        this.secretKey = secretKey;
        this.consumerGroupId = consumerConfig.getConsumerGroupId();
        this.consumerId = consumerId;
        this.consumerConfig = consumerConfig;
        if (consumerConfig.isMessageTraceOn()) {
            this.traceDispatcher = new DefaultTraceDispatcherImpl(this);
        }
    }

    /** @return the consumer group id read from the configuration at construction time */
    @Override
    public String getConsumerGroupId() {
        return this.consumerGroupId;
    }

    /** @return the consumer id passed to the constructor */
    @Override
    public String getConsumerId() {
        return this.consumerId;
    }

    /** @return the consumer configuration passed to the constructor */
    @Override
    public ConsumerConfig getConsumerConfig() {
        return this.consumerConfig;
    }

    /** @return the consumer id, which doubles as the client instance id */
    @Override
    public String getInstanceId() {
        return this.consumerId;
    }

    /** @return the consumer configuration, viewed as the generic client configuration */
    @Override
    public ClientConfig getClientConfig() {
        return this.consumerConfig;
    }

    /**
     * Builds one {@link TraceType#BEFORE_CONSUME} trace point per message.
     *
     * @param messages messages about to be consumed; may be {@code null} or empty
     * @return a new mutable list of trace points, empty when there are no messages
     */
    @Override
    public List<TracePoint> getTracePoints(List<Message> messages) {
        ArrayList<TracePoint> tracePoints = new ArrayList<TracePoint>();
        if (messages == null || messages.isEmpty()) {
            return tracePoints;
        }
        for (Message message : messages) {
            TracePoint tracePoint = new TracePoint(message.getMessageId(), message.getTopic(), message.getQueueId());
            tracePoint.setTraceType(TraceType.BEFORE_CONSUME);
            tracePoint.setConsumerGroupId(this.consumerGroupId);
            tracePoint.setRetryTimes(this.parseRetryTimes(message));
            tracePoints.add(tracePoint);
        }
        return tracePoints;
    }

    /**
     * Reads the retry count from the message properties.
     *
     * <p>Tracing must not break consumption, so a missing property map, a missing
     * entry, or a non-numeric value all yield {@code 0}; the non-numeric case is
     * logged.
     *
     * @param message message whose retry property is read
     * @return the parsed retry count, or {@code 0} when it is absent or malformed
     */
    private int parseRetryTimes(Message message) {
        if (message.getProperties() == null) {
            return 0;
        }
        String retryTimesStr = message.getProperties().get(RETRY_TIMES_PROPERTY);
        if (retryTimesStr == null) {
            return 0;
        }
        try {
            return Integer.parseInt(retryTimesStr);
        }
        catch (NumberFormatException e) {
            this.logger.warn(String.format("invalid retry times:%s of message:%s, fallback to 0", retryTimesStr, message.getMessageId()));
            return 0;
        }
    }

    /**
     * Takes the pending trace points recorded for the given broker and ack index
     * out of the trace dispatcher.
     *
     * @param brokerAddress broker address the trace points were appended under
     * @param ackIndex      ack index the trace points were appended under
     * @return whatever the dispatcher returns, or an empty list when no dispatcher exists
     */
    List<TracePoint> getTracePoints(String brokerAddress, long ackIndex) {
        if (this.traceDispatcher == null) {
            // No dispatcher was created (tracing disabled at construction): nothing is pending.
            return new ArrayList<TracePoint>();
        }
        return this.traceDispatcher.getAndRemoveTracePoints(brokerAddress, ackIndex);
    }

    /** This implementation does nothing. */
    @Override
    public void completeAndAppendTracePoints(List<TracePoint> tracePoints, Response response) {
    }

    /**
     * Stamps the request id on the given before-consume trace points and hands
     * them to the trace dispatcher under the given broker address and ack index.
     *
     * @param tracePoints   trace points to complete; ignored when {@code null} or empty
     * @param brokerAddress broker address to append the points under
     * @param response      response supplying the request id; {@code null} yields an empty id
     * @param ackIndex      ack index to append the points under
     */
    @Override
    public void completeAndAppendBeforeConsumeTracePoints(List<TracePoint> tracePoints, String brokerAddress, Response response, long ackIndex) {
        if (!this.isMessageTraceOn() || this.traceDispatcher == null || tracePoints == null || tracePoints.isEmpty()) {
            return;
        }
        String requestId = response == null ? "" : response.getRequestId();
        for (TracePoint tracePoint : tracePoints) {
            tracePoint.setRequestId(requestId);
        }
        this.traceDispatcher.append(tracePoints, brokerAddress, ackIndex);
    }

    /**
     * Turns the pending before-consume trace points for the given broker and ack
     * index into {@link TraceType#AFTER_CONSUME} points and appends them to the
     * trace dispatcher.
     *
     * @param response      response supplying the request id; when {@code null} the pending
     *                      points are still removed from the dispatcher but nothing is appended
     * @param brokerAddress broker address the before-consume points were stored under
     * @param ackIndex      ack index the before-consume points were stored under
     * @param success       consumption outcome recorded on every after-consume point
     */
    @Override
    public void completeAndAppendAfterConsumeTracePoints(Response response, String brokerAddress, long ackIndex, boolean success) {
        // Check the trace switch BEFORE touching the dispatcher: it is only created
        // by the constructor when tracing is enabled.
        if (!this.isMessageTraceOn() || this.traceDispatcher == null) {
            return;
        }
        List<TracePoint> tracePoints = this.getTracePoints(brokerAddress, ackIndex);
        if (response == null || tracePoints == null || tracePoints.isEmpty()) {
            return;
        }
        ArrayList<TracePoint> afterTracePoints = new ArrayList<TracePoint>();
        for (TracePoint tracePoint : tracePoints) {
            // Single clock read so cost time and timestamp of a point agree.
            long now = System.currentTimeMillis();
            TracePoint afterTracePoint = new TracePoint(tracePoint.getMessageId(), tracePoint.getTopic(), tracePoint.getQueueId());
            afterTracePoint.setTraceType(TraceType.AFTER_CONSUME);
            afterTracePoint.setCostTime((int)(now - tracePoint.getTimeStamp()));
            afterTracePoint.setTimeStamp(now);
            afterTracePoint.setRequestId(response.getRequestId());
            afterTracePoint.setConsumerGroupId(tracePoint.getConsumerGroupId());
            afterTracePoint.setRetryTimes(tracePoint.getRetryTimes());
            afterTracePoint.setBusinessId(tracePoint.getBusinessId());
            afterTracePoint.setSuccess(success);
            afterTracePoints.add(afterTracePoint);
        }
        this.traceDispatcher.append(afterTracePoints);
    }

    /** @return the current value of the trace switch in the consumer configuration */
    @Override
    public boolean isMessageTraceOn() {
        return this.consumerConfig.isMessageTraceOn();
    }

    /** @return always {@code false} */
    @Override
    public boolean isTraceProducer() {
        return false;
    }

    /** Start hook; this implementation does nothing. */
    @Override
    protected void doBeforeStart() throws ClientException {
    }

    /** Shutdown hook; this implementation does nothing. */
    @Override
    protected void doBeforeShutdown() throws ClientException {
    }

    /**
     * Validates a filter expression.
     *
     * @param filterExpression expression to validate; {@code null} is accepted as-is
     * @throws ClientException with {@link ClientExceptionCode#INVALID_PARAMETER} when the
     *                         expression type is {@code null} or the expression text is empty
     */
    protected void checkFilterExpression(FilterExpression filterExpression) throws ClientException {
        if (filterExpression == null) {
            return;
        }
        if (filterExpression.getExpressionType() == null) {
            throw new ClientException("filterExpression.expressionType is null", ClientExceptionCode.INVALID_PARAMETER.getCode());
        }
        if (StringUtils.isEmpty(filterExpression.getExpression())) {
            throw new ClientException("filterExpression.expression is null", ClientExceptionCode.INVALID_PARAMETER.getCode());
        }
    }

    /**
     * Replaces the body of every {@link CompressType#ZIP} message with its
     * decompressed form, in place. Other messages are left untouched.
     *
     * @param messages messages to process; may be {@code null} or empty
     * @throws ClientException when decompressing a body fails; processing stops at that message
     */
    protected void tryDecompressMessages(List<Message> messages) throws ClientException {
        if (messages == null || messages.isEmpty()) {
            return;
        }
        for (Message message : messages) {
            if (message.getCompressType() != CompressType.ZIP) {
                continue;
            }
            try {
                message.setBody(ZipUtils.decompress(message.getBody()));
            }
            catch (IOException e) {
                // NOTE(review): the IOException is only embedded in the message text; attach it
                // as the cause if ClientException offers a (String, Throwable) constructor.
                String warnMsg = String.format("got exception:%s when try decompressing message body of message:%s", e, message.getMessageId());
                this.logger.warn(warnMsg);
                throw new ClientException(warnMsg);
            }
        }
    }
}

