package com.yqbsoft.laser.service.ext.channel.jdjos.message.service;

import com.jcloud.jcq.client.Exception.ClientException;
import com.jcloud.jcq.common.consumer.ConsumeFromWhere;
import com.jcloud.jcq.common.filter.FilterExpression;
import com.jcloud.jcq.common.message.AckAction;
import com.jcloud.jcq.common.utils.StringUtils;
import com.jcloud.jcq.protocol.Message;
import com.jcloud.jcq.sdk.JCQClientFactory;
import com.jcloud.jcq.sdk.auth.UserCredential;
import com.jcloud.jcq.sdk.consumer.PullConsumer;
import com.jcloud.jcq.sdk.consumer.PullConsumerConfig;
import com.jcloud.jcq.sdk.consumer.async.AsyncAckCallback;
import com.jcloud.jcq.sdk.consumer.async.AsyncPullCallback;
import com.jcloud.jcq.sdk.consumer.model.AckResult;
import com.jcloud.jcq.sdk.consumer.model.PullResult;
import com.jcloud.jcq.sdk.producer.model.ResultCode;
import com.yqbsoft.laser.service.ext.channel.jdjos.JdJosConstants;
import com.yqbsoft.laser.service.ext.channel.jdjos.domain.EnvConstants;
import com.yqbsoft.laser.service.ext.channel.jdjos.enums.JdJcqTopic;
import com.yqbsoft.laser.service.ext.channel.jdjos.goods.EsGoodsService;
import com.yqbsoft.laser.service.ext.channel.jdjos.message.JdJcqPullMessageService;
import com.yqbsoft.laser.service.ext.channel.jdjos.order.service.DisRefundServiceImpl;
import com.yqbsoft.laser.service.tool.util.ListUtil;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Pull-mode consumer for the JD JCQ message topics described by {@link JdJcqTopic}.
 *
 * <p>Each public no-argument method of {@link JdJcqPullMessageService} starts a blocking pull loop
 * for one topic (see {@link #onMessage(JdJcqTopic, Integer, String)}). Pulled batches are handed to
 * {@link #businessFactory(JdJcqTopic, List, String)}, and the resulting {@link AckAction} is sent
 * back to the broker.
 */
@Service
public class JdJcqPullMessageServiceImpl implements JdJcqPullMessageService {

    private static final Logger logger = LoggerFactory.getLogger(JdJcqPullMessageServiceImpl.class);

    /** Value the business services' results are compared with; anything else is acked as CONSUME_FAILED. */
    private static final String BUSINESS_SUCCESS = "SUCCESS";

    /** Batch size used by every topic entry point of this service. */
    private static final int DEFAULT_BATCH_SIZE = 1;

    @Autowired
    private EsGoodsService esGoodsService;

    @Autowired
    private DisRefundServiceImpl disRefundService;

    /**
     * Creates and starts a pull consumer configured from {@link EnvConstants}.
     *
     * @param consumerGroupId consumer group the consumer joins
     * @param maxBatchSize    maximum number of messages returned by one pull; must not be null
     * @return a started consumer; the caller is responsible for shutting it down
     * @throws ClientException if the SDK fails to create or start the consumer
     */
    private static PullConsumer buildPullConsumer(String consumerGroupId, Integer maxBatchSize) throws ClientException {
        UserCredential credential = new UserCredential(EnvConstants.ACCESS_KEY, EnvConstants.SECRET_KEY);
        PullConsumerConfig config = PullConsumerConfig.builder()
                .consumerGroupId(consumerGroupId)
                .metaServerAddress(EnvConstants.META_SERVER_ADDRESS)
                .enableMessageTrace(true)
                .defaultConsumePosition(ConsumeFromWhere.HEAD)
                .maxBatchSizePerPull(maxBatchSize.intValue())
                .maxRetryTimes(3)
                .build();
        PullConsumer consumer = JCQClientFactory.getInstance().createPullConsumer(credential, config);
        consumer.start();
        return consumer;
    }

    /**
     * Registers a JVM shutdown hook that shuts the given consumer down.
     *
     * @param consumer consumer to close when the JVM exits
     * @return the registered hook thread, so it can be deregistered later
     */
    private static Thread registerShutdownHook(final PullConsumer consumer) {
        Thread hook = new Thread(new Runnable() {
            @Override
            public void run() {
                shutdownQuietly(consumer);
            }
        });
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    /** Shuts the consumer down, logging (instead of propagating) a {@link ClientException}. */
    private static void shutdownQuietly(PullConsumer consumer) {
        try {
            consumer.shutdown();
        } catch (ClientException e) {
            logger.warn("failed to shut down pull consumer", e);
        }
    }

    /**
     * Releases a consumer whose pull loop has ended: deregisters its shutdown hook and shuts it down.
     *
     * @param consumer     consumer to release; {@code null} when it was never created
     * @param shutdownHook hook registered for the consumer; {@code null} when none was registered
     */
    private static void releaseConsumer(PullConsumer consumer, Thread shutdownHook) {
        if (consumer == null) {
            return;
        }
        if (shutdownHook != null) {
            try {
                Runtime.getRuntime().removeShutdownHook(shutdownHook);
            } catch (IllegalStateException e) {
                // The JVM is already shutting down: the hook itself closes the consumer,
                // so closing it here as well would shut it down twice.
                return;
            }
        }
        shutdownQuietly(consumer);
    }

    /**
     * Pulls and processes messages of one topic until a non-{@link ClientException} failure occurs.
     *
     * <p>A {@link ClientException} raised by a single pull is logged and the loop continues (without
     * any back-off). Any other exception ends the loop; it is logged, the consumer is shut down and
     * the method returns normally.
     *
     * @param jdJcqTopic topic to consume; must not be null
     * @param num        maximum batch size per pull
     * @param str        tenant code forwarded to the business handlers
     * @throws NullPointerException if {@code jdJcqTopic} is null
     */
    public void onMessage(JdJcqTopic jdJcqTopic, Integer num, String str) {
        if (jdJcqTopic == null) {
            // Fail before a consumer is created; previously the same exception surfaced only
            // after a consumer had been started and leaked.
            throw new NullPointerException("jdJcqTopic must not be null");
        }
        PullConsumer consumer = null;
        Thread shutdownHook = null;
        try {
            consumer = buildPullConsumer(EnvConstants.CONSUMER_GROUP, num);
            shutdownHook = registerShutdownHook(consumer);
            while (true) {
                try {
                    pullMessageSync(consumer, jdJcqTopic, str);
                } catch (ClientException e) {
                    // The exception is passed as the trailing throwable (no placeholder) so the
                    // stack trace is logged regardless of the SLF4J version in use.
                    logger.error("there some thing happen topic:{},异常", jdJcqTopic.getTopicName(), e);
                }
            }
        } catch (Exception e) {
            logger.error("接收推送消息结束 topic:{}，异常信息:", jdJcqTopic.getTopicName(), e);
        } finally {
            // The loop only ends on failure; without this the started consumer and its
            // shutdown hook would stay alive until the JVM exits.
            releaseConsumer(consumer, shutdownHook);
        }
    }

    /**
     * Manual entry point: consumes {@link JdJcqTopic#CT_ORDER_CREATE} outside of Spring.
     *
     * <p>The injected services are not available here, which is harmless for this topic because its
     * handler does not use them.
     *
     * @param strArr ignored
     * @throws Exception if the fallback consumer cannot be created or a pull fails with a
     *                   non-{@link ClientException} error
     */
    public static void main(String[] strArr) throws Exception {
        JdJcqPullMessageServiceImpl service = new JdJcqPullMessageServiceImpl();
        service.onMessage(JdJcqTopic.CT_ORDER_CREATE, 1, "");
        // Only reached when the loop above has ended because of an error.
        final PullConsumer consumer = buildPullConsumer(EnvConstants.CONSUMER_GROUP, 2);
        registerShutdownHook(consumer);
        while (true) {
            try {
                // NOTE(review): this call used to pass a null topic, which always failed with a
                // NullPointerException; the topic of the first phase is reused - confirm intent.
                service.pullMessageSync(consumer, JdJcqTopic.CT_ORDER_CREATE, "");
            } catch (ClientException e) {
                logger.error("there some thing happen ", e);
            }
        }
    }

    /**
     * Performs one synchronous pull for the topic and processes the result.
     *
     * @param pullConsumer started consumer to pull with
     * @param jdJcqTopic   topic to pull from; must not be null
     * @param tenantCode   tenant code forwarded to the business handlers
     */
    private void pullMessageSync(PullConsumer pullConsumer, JdJcqTopic jdJcqTopic, String tenantCode) throws ClientException, UnsupportedEncodingException {
        PullResult pullResult = pullConsumer.pullMessage(jdJcqTopic.getTopicMessageTheme(), (FilterExpression) null);
        processPullResult(pullConsumer, pullResult, jdJcqTopic, tenantCode);
    }

    /**
     * Issues one asynchronous pull on {@link EnvConstants#TOPIC}.
     *
     * <p>NOTE(review): no {@link JdJcqTopic} is known on this path, so {@link #processPullResult}
     * logs the result and leaves it un-acked. The method is not called from within this class.
     */
    private void pullMessageAsync(final PullConsumer pullConsumer) throws ClientException {
        pullConsumer.pullMessageAsync(EnvConstants.TOPIC, (FilterExpression) null, new AsyncPullCallback() {
            public void onResult(PullResult pullResult) {
                try {
                    JdJcqPullMessageServiceImpl.this.processPullResult(pullConsumer, pullResult, null, null);
                } catch (ClientException e) {
                    JdJcqPullMessageServiceImpl.logger.warn("there some error happened ", e);
                } catch (UnsupportedEncodingException e) {
                    throw new RuntimeException(e);
                }
            }

            public void onException(Throwable th) {
                JdJcqPullMessageServiceImpl.logger.error("async pull message error ", th);
            }
        });
    }

    /**
     * Pulls once from {@link EnvConstants#TOPIC} and acknowledges the batch asynchronously with
     * {@link AckAction#CONSUME_FAILED}. The method is not called from within this class.
     */
    private static void pullMessageAckASync(final PullConsumer pullConsumer) throws ClientException {
        PullResult pullResult = pullConsumer.pullMessage(EnvConstants.TOPIC, (FilterExpression) null);
        if (ResultCode.SUCCESS != pullResult.getResultCode()) {
            logger.warn("pull request failed, pull result is  {}", pullResult);
            return;
        }
        if (StringUtils.isEmpty(pullResult.getAckIndex()) || pullResult.getMessages() == null) {
            logger.info("there is no message. pull result is {}", pullResult);
            return;
        }
        final String ackIndex = pullResult.getAckIndex();
        logger.info("Sync pullResult.resultCode:{}, pullResult.ackIndex:{}, pullResult.messages:{}", new Object[]{pullResult.getResultCode(), pullResult.getAckIndex(), pullResult.getMessages()});
        final AckAction ackAction = AckAction.CONSUME_FAILED;
        pullConsumer.ackMessageAsync(EnvConstants.TOPIC, ackIndex, ackAction, new AsyncAckCallback() {
            public void onResult(AckResult ackResult) {
                // Hand over the topic that was actually acked; an empty topic name would make
                // any retry inside processAckResult target nothing.
                JdJcqPullMessageServiceImpl.processAckResult(pullConsumer, ackIndex, ackAction, ackResult, EnvConstants.TOPIC);
            }

            public void onException(Throwable th) {
                JdJcqPullMessageServiceImpl.logger.error("async ack message error ", th);
            }
        });
    }

    /**
     * Validates a pull result, runs the topic's business handler and acknowledges the batch.
     *
     * <p>Results rejected by {@link #dataFiltering(PullResult, String)} are logged and dropped
     * without an ack. A null topic is logged and skipped as well.
     *
     * @param pullConsumer consumer used to send the ack
     * @param pullResult   result of the pull
     * @param jdJcqTopic   topic the result belongs to
     * @param str          tenant code forwarded to the business handler
     * @throws ClientException              if sending the ack fails
     * @throws UnsupportedEncodingException if a business handler cannot decode a message
     */
    public void processPullResult(PullConsumer pullConsumer, PullResult pullResult, JdJcqTopic jdJcqTopic, String str) throws ClientException, UnsupportedEncodingException {
        if (jdJcqTopic == null) {
            logger.warn("pull result without topic is skipped and left un-acked: {}", pullResult);
            return;
        }
        if (!dataFiltering(pullResult, jdJcqTopic.getTopicName())) {
            logger.info("京东开普勒消息推送topic:{},数据过滤失败,丢弃:{}", jdJcqTopic.getTopicName(), pullResult == null ? null : pullResult.getMessages());
            return;
        }
        String ackIndex = pullResult.getAckIndex();
        String topic = jdJcqTopic.getTopicMessageTheme();
        AckAction ackAction = businessFactory(jdJcqTopic, pullResult.getMessages(), str);
        AckResult ackResult = pullConsumer.ackMessage(topic, ackIndex, ackAction);
        processAckResult(pullConsumer, ackIndex, ackAction, ackResult, topic);
    }

    /**
     * Dispatches a batch of messages to the handler of the given topic.
     *
     * @param jdJcqTopic topic the messages were pulled from
     * @param list       pulled messages
     * @param str        tenant code forwarded to handlers that need it
     * @return the ack action chosen by the handler; {@link AckAction#CONSUME_FAILED} for a null or
     *         unhandled topic
     * @throws UnsupportedEncodingException if a handler cannot decode a message
     */
    public AckAction businessFactory(JdJcqTopic jdJcqTopic, List<Message> list, String str) throws UnsupportedEncodingException {
        if (jdJcqTopic == null) {
            return AckAction.CONSUME_FAILED;
        }
        switch (jdJcqTopic) {
            case CT_SKU_CHANGE:
                return getSkuDetailChange(list, str);
            case CT_SKU_PRICE_CHANGE:
                return getSkuPriceChange(list, str);
            case JD_ADDRESS_CHANGE:
                return getJdAddressChange(list);
            case CT_ORDER_CREATE:
                return getCtOrderCreate(list);
            case CT_ORDER_PAY:
                return getCtOrderPay(list);
            case CT_ORDER_STOCKOUT:
                return getCtOrderStockout(list, str);
            case CT_ORDER_DELIVERED:
                return getCtOrderDelivered(list, str);
            case CT_ORDER_FINISH:
                return getCtOrderFinish(list, str);
            case CT_ORDER_CANCEL:
                return getCtOrderCancel(list);
            case CT_ORDER_REFUND:
                return getCtOrderRefund(list);
            case CT_AFS_CREATE:
                return getCtAfsCreate(list);
            case CT_AFS_STEP_RESULT:
                return getCtAfsStepResult(list);
            case CT_ORDER_BALANCE_NOT_ENOUGH:
                return getCtOrderBalanceNotEnough(list);
            case CT_CLEARANCE_RESULT:
                return getCtClearanceResult(list);
            default:
                return AckAction.CONSUME_FAILED;
        }
    }

    /** No-op handler for {@code CT_CLEARANCE_RESULT}: the batch is acknowledged without processing. */
    public static AckAction getCtClearanceResult(List<Message> list) {
        return AckAction.SUCCESS;
    }

    /** No-op handler for {@code CT_ORDER_BALANCE_NOT_ENOUGH}: the batch is acknowledged without processing. */
    public static AckAction getCtOrderBalanceNotEnough(List<Message> list) {
        return AckAction.SUCCESS;
    }

    /** No-op handler for {@code CT_AFS_STEP_RESULT}: the batch is acknowledged without processing. */
    public static AckAction getCtAfsStepResult(List<Message> list) {
        return AckAction.SUCCESS;
    }

    /** No-op handler for {@code CT_AFS_CREATE}: the batch is acknowledged without processing. */
    public static AckAction getCtAfsCreate(List<Message> list) {
        return AckAction.SUCCESS;
    }

    /** No-op handler for {@code CT_ORDER_REFUND}: the batch is acknowledged without processing. */
    public static AckAction getCtOrderRefund(List<Message> list) {
        return AckAction.SUCCESS;
    }

    /** No-op handler for {@code CT_ORDER_CANCEL}: the batch is acknowledged without processing. */
    public static AckAction getCtOrderCancel(List<Message> list) {
        return AckAction.SUCCESS;
    }

    /**
     * Forwards the first message of the batch to {@code disRefundService.ctOrderFinish}.
     *
     * <p>NOTE(review): only {@code list.get(0)} is processed while the whole batch is acked; this is
     * safe for the batch size of 1 used by this service - confirm before raising the batch size.
     *
     * @param list pulled messages; an empty list is acknowledged as success
     * @param str  tenant code
     * @return {@link AckAction#SUCCESS} if the service reports "SUCCESS", otherwise
     *         {@link AckAction#CONSUME_FAILED}
     */
    public AckAction getCtOrderFinish(List<Message> list, String str) {
        if (!ListUtil.isNotEmpty(list)) {
            return AckAction.SUCCESS;
        }
        return toAckAction(this.disRefundService.ctOrderFinish(list.get(0), str));
    }

    /**
     * Forwards the first message of the batch to {@code disRefundService.ctOrderDelivered}.
     *
     * @param list pulled messages; an empty list is acknowledged as success
     * @param str  tenant code
     * @return {@link AckAction#SUCCESS} if the service reports "SUCCESS", otherwise
     *         {@link AckAction#CONSUME_FAILED}
     */
    public AckAction getCtOrderDelivered(List<Message> list, String str) {
        if (!ListUtil.isNotEmpty(list)) {
            return AckAction.SUCCESS;
        }
        return toAckAction(this.disRefundService.ctOrderDelivered(list.get(0), str));
    }

    /**
     * Forwards the first message of the batch to {@code disRefundService.ctStockOut}.
     *
     * @param list pulled messages; an empty list is acknowledged as success
     * @param str  tenant code
     * @return {@link AckAction#SUCCESS} if the service reports "SUCCESS", otherwise
     *         {@link AckAction#CONSUME_FAILED}
     */
    public AckAction getCtOrderStockout(List<Message> list, String str) {
        if (!ListUtil.isNotEmpty(list)) {
            return AckAction.SUCCESS;
        }
        return toAckAction(this.disRefundService.ctStockOut(list.get(0), str));
    }

    /** No-op handler for {@code CT_ORDER_PAY}: the batch is acknowledged without processing. */
    public static AckAction getCtOrderPay(List<Message> list) {
        return AckAction.SUCCESS;
    }

    /** No-op handler for {@code CT_ORDER_CREATE}: the batch is acknowledged without processing. */
    public static AckAction getCtOrderCreate(List<Message> list) {
        return AckAction.SUCCESS;
    }

    /** No-op handler for {@code JD_ADDRESS_CHANGE}: the batch is acknowledged without processing. */
    public static AckAction getJdAddressChange(List<Message> list) {
        return AckAction.SUCCESS;
    }

    /**
     * Forwards the first message of the batch to {@code esGoodsService.updateJcqGoodsPrice}.
     *
     * @param list pulled messages; an empty list is acknowledged as success
     * @param str  tenant code
     * @return {@link AckAction#SUCCESS} if the service reports "SUCCESS", otherwise
     *         {@link AckAction#CONSUME_FAILED}
     * @throws UnsupportedEncodingException propagated from the goods service
     */
    public AckAction getSkuPriceChange(List<Message> list, String str) throws UnsupportedEncodingException {
        if (!ListUtil.isNotEmpty(list)) {
            return AckAction.SUCCESS;
        }
        return toAckAction(this.esGoodsService.updateJcqGoodsPrice(list.get(0), str));
    }

    /**
     * Forwards the first message of the batch to {@code esGoodsService.updateJcqGoodsInfo}.
     *
     * @param list pulled messages; an empty list is acknowledged as success
     * @param str  tenant code
     * @return {@link AckAction#SUCCESS} if the service reports "SUCCESS", otherwise
     *         {@link AckAction#CONSUME_FAILED}
     * @throws UnsupportedEncodingException propagated from the goods service
     */
    public AckAction getSkuDetailChange(List<Message> list, String str) throws UnsupportedEncodingException {
        if (!ListUtil.isNotEmpty(list)) {
            return AckAction.SUCCESS;
        }
        return toAckAction(this.esGoodsService.updateJcqGoodsInfo(list.get(0), str));
    }

    /**
     * Maps a business service result to an ack action.
     *
     * <p>The constant is the receiver of {@code equals} so that a null result is treated as a
     * failure instead of raising a {@link NullPointerException}.
     */
    private static AckAction toAckAction(Object businessResult) {
        return BUSINESS_SUCCESS.equals(businessResult) ? AckAction.SUCCESS : AckAction.CONSUME_FAILED;
    }

    /**
     * Tells whether a pull result carries messages that can be processed and acked.
     *
     * @param pullResult result to inspect; null is rejected
     * @param str        topic name (currently unused)
     * @return {@code true} only if the pull succeeded, the ack index is non-empty and the message
     *         list is non-null
     */
    public static boolean dataFiltering(PullResult pullResult, String str) {
        if (pullResult == null) {
            return false;
        }
        return ResultCode.SUCCESS == pullResult.getResultCode()
                && !StringUtils.isEmpty(pullResult.getAckIndex())
                && pullResult.getMessages() != null;
    }

    /**
     * Logs an ack result and retries the ack once when a {@link AckAction#SUCCESS} ack was not
     * accepted by the broker. Failed acks of any other action are not retried.
     *
     * @param pullConsumer consumer used for the retry
     * @param str          ack index of the batch
     * @param ackAction    action that was acked
     * @param ackResult    result of the first ack attempt
     * @param str2         topic the batch belongs to
     */
    public static void processAckResult(PullConsumer pullConsumer, String str, AckAction ackAction, AckResult ackResult, String str2) {
        logger.info("Sync ackResult:{}", ackResult.getResultCode());
        boolean ackAccepted = ResultCode.SUCCESS == ackResult.getResultCode();
        if (ackAccepted || ackAction != AckAction.SUCCESS) {
            return;
        }
        try {
            AckResult retryResult = pullConsumer.ackMessage(str2, str, ackAction);
            if (retryResult.getResultCode() != ResultCode.SUCCESS) {
                logger.warn("when retry ack for  topic {}, at ack Index {}, action {}, failed or ack already ,please confirm", new Object[]{str2, str, ackAction});
            }
        } catch (ClientException e) {
            logger.warn("when retry ack exception", e);
        }
    }

    /** Starts the blocking pull loop for a topic with the default batch size and tenant code. */
    private void consume(JdJcqTopic topic) {
        onMessage(topic, DEFAULT_BATCH_SIZE, JdJosConstants.tenantCode);
    }

    /** Consumes {@code CT_SKU_CHANGE}; blocks while the pull loop is running. */
    @Override
    public void skuDetailChange() {
        consume(JdJcqTopic.CT_SKU_CHANGE);
    }

    /** Consumes {@code CT_SKU_PRICE_CHANGE}; blocks while the pull loop is running. */
    @Override
    public void skuPriceChange() {
        consume(JdJcqTopic.CT_SKU_PRICE_CHANGE);
    }

    /** Consumes {@code JD_ADDRESS_CHANGE}; blocks while the pull loop is running. */
    @Override
    public void jdAddressChange() {
        // Previously subscribed to CT_SKU_PRICE_CHANGE, so address changes were never consumed.
        consume(JdJcqTopic.JD_ADDRESS_CHANGE);
    }

    /** Consumes {@code CT_ORDER_CREATE}; blocks while the pull loop is running. */
    @Override
    public void ctOrderCreate() {
        consume(JdJcqTopic.CT_ORDER_CREATE);
    }

    /** Consumes {@code CT_ORDER_PAY}; blocks while the pull loop is running. */
    @Override
    public void gctOrderPay() {
        consume(JdJcqTopic.CT_ORDER_PAY);
    }

    /** Consumes {@code CT_ORDER_STOCKOUT}; blocks while the pull loop is running. */
    @Override
    public void ctOrderStockout() {
        consume(JdJcqTopic.CT_ORDER_STOCKOUT);
    }

    /** Consumes {@code CT_ORDER_DELIVERED}; blocks while the pull loop is running. */
    @Override
    public void ctOrderDelivered() {
        consume(JdJcqTopic.CT_ORDER_DELIVERED);
    }

    /** Consumes {@code CT_ORDER_FINISH}; blocks while the pull loop is running. */
    @Override
    public void ctOrderFinish() {
        consume(JdJcqTopic.CT_ORDER_FINISH);
    }

    /** Consumes {@code CT_ORDER_CANCEL}; blocks while the pull loop is running. */
    @Override
    public void ctOrderCancel() {
        consume(JdJcqTopic.CT_ORDER_CANCEL);
    }

    /** Consumes {@code CT_ORDER_REFUND}; blocks while the pull loop is running. */
    @Override
    public void ctOrderRefund() {
        consume(JdJcqTopic.CT_ORDER_REFUND);
    }

    /** Consumes {@code CT_AFS_CREATE}; blocks while the pull loop is running. */
    @Override
    public void ctAfsCreate() {
        consume(JdJcqTopic.CT_AFS_CREATE);
    }

    /** Consumes {@code CT_AFS_STEP_RESULT}; blocks while the pull loop is running. */
    @Override
    public void ctAfsStepResult() {
        consume(JdJcqTopic.CT_AFS_STEP_RESULT);
    }

    /** Consumes {@code CT_ORDER_BALANCE_NOT_ENOUGH}; blocks while the pull loop is running. */
    @Override
    public void ctOrderBalanceNotEnough() {
        consume(JdJcqTopic.CT_ORDER_BALANCE_NOT_ENOUGH);
    }

    /** Consumes {@code CT_CLEARANCE_RESULT}; blocks while the pull loop is running. */
    @Override
    public void ctClearanceResult() {
        consume(JdJcqTopic.CT_CLEARANCE_RESULT);
    }
}
