package io.jboot.components.mq.rocketmq;

import com.jfinal.log.Log;
import io.jboot.Jboot;
import io.jboot.components.mq.Jbootmq;
import io.jboot.components.mq.JbootmqBase;
import io.jboot.components.mq.JbootmqConfig;
import io.jboot.exception.JbootIllegalConfigException;
import io.jboot.utils.ConfigUtil;
import io.jboot.utils.StrUtil;
import java.util.Iterator;
import java.util.Map;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MQProducer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

/* loaded from: input_file:io/jboot/components/mq/rocketmq/JbootRocketmqImpl.class */
public class JbootRocketmqImpl extends JbootmqBase implements Jbootmq {
    private static final Log LOG = Log.getLog(JbootRocketmqImpl.class);
    private JbootRocketmqConfig rocketmqConfig;
    private MQProducer mqProducer;
    private DefaultMQPushConsumer queueConsumer;
    private DefaultMQPushConsumer broadcastConsumer;

    public JbootRocketmqImpl(JbootmqConfig jbootmqConfig) {
        super(jbootmqConfig);
        String typeName = jbootmqConfig.getTypeName();
        if (!StrUtil.isNotBlank(typeName)) {
            this.rocketmqConfig = (JbootRocketmqConfig) Jboot.config(JbootRocketmqConfig.class);
            return;
        }
        Map configModels = ConfigUtil.getConfigModels(JbootRocketmqConfig.class);
        if (!configModels.containsKey(typeName)) {
            throw new JbootIllegalConfigException("Please config \"jboot.mq.rocket." + typeName + ".namesrvAddr\" in your jboot.properties.");
        }
        this.rocketmqConfig = (JbootRocketmqConfig) configModels.get(typeName);
    }

    @Override // io.jboot.components.mq.JbootmqBase
    protected void onStartListening() {
        try {
            startQueueConsumer();
            startBroadcastConsumer();
        } catch (MQClientException e) {
            LOG.error(e.toString(), e);
        }
    }

    @Override // io.jboot.components.mq.JbootmqBase
    protected void onStopListening() {
        if (this.queueConsumer != null) {
            this.queueConsumer.shutdown();
        }
        if (this.broadcastConsumer != null) {
            this.broadcastConsumer.shutdown();
        }
    }

    public void startQueueConsumer() throws MQClientException {
        this.queueConsumer = new DefaultMQPushConsumer(this.rocketmqConfig.getConsumerGroup());
        this.queueConsumer.setNamesrvAddr(this.rocketmqConfig.getNamesrvAddr());
        this.queueConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        if (StrUtil.isNotBlank(this.rocketmqConfig.getNamespace())) {
            this.queueConsumer.setNamespace(this.rocketmqConfig.getNamespace());
        }
        if (this.rocketmqConfig.getConsumeMessageBatchMaxSize() != null) {
            this.queueConsumer.setConsumeMessageBatchMaxSize(this.rocketmqConfig.getConsumeMessageBatchMaxSize().intValue());
        }
        Iterator<String> it = this.channels.iterator();
        while (it.hasNext()) {
            this.queueConsumer.subscribe(it.next(), this.rocketmqConfig.getSubscribeSubExpression());
        }
        this.queueConsumer.registerMessageListener((list, consumeConcurrentlyContext) -> {
            RokectmqMessageContext rokectmqMessageContext = new RokectmqMessageContext(this, list, consumeConcurrentlyContext);
            if (list != null && !list.isEmpty()) {
                Iterator it2 = list.iterator();
                while (it2.hasNext()) {
                    MessageExt messageExt = (MessageExt) it2.next();
                    notifyListeners(messageExt.getTopic(), getSerializer().deserialize(messageExt.getBody()), rokectmqMessageContext);
                }
            }
            return rokectmqMessageContext.getReturnStatus();
        });
        this.queueConsumer.start();
    }

    public void startBroadcastConsumer() throws MQClientException {
        this.broadcastConsumer = new DefaultMQPushConsumer(this.rocketmqConfig.getBroadcastChannelPrefix() + this.rocketmqConfig.getConsumerGroup());
        this.broadcastConsumer.setNamesrvAddr(this.rocketmqConfig.getNamesrvAddr());
        if (StrUtil.isNotBlank(this.rocketmqConfig.getNamespace())) {
            this.broadcastConsumer.setNamespace(this.rocketmqConfig.getNamespace());
        }
        this.broadcastConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        this.broadcastConsumer.setMessageModel(MessageModel.BROADCASTING);
        if (this.rocketmqConfig.getConsumeMessageBatchMaxSize() != null) {
            this.broadcastConsumer.setConsumeMessageBatchMaxSize(this.rocketmqConfig.getConsumeMessageBatchMaxSize().intValue());
        }
        Iterator<String> it = this.channels.iterator();
        while (it.hasNext()) {
            this.broadcastConsumer.subscribe(this.rocketmqConfig.getBroadcastChannelPrefix() + it.next(), this.rocketmqConfig.getSubscribeSubExpression());
        }
        int length = this.rocketmqConfig.getBroadcastChannelPrefix().length();
        this.broadcastConsumer.registerMessageListener((list, consumeConcurrentlyContext) -> {
            RokectmqMessageContext rokectmqMessageContext = new RokectmqMessageContext(this, list, consumeConcurrentlyContext);
            if (list != null && !list.isEmpty()) {
                Iterator it2 = list.iterator();
                while (it2.hasNext()) {
                    MessageExt messageExt = (MessageExt) it2.next();
                    notifyListeners(messageExt.getTopic().substring(length), getSerializer().deserialize(messageExt.getBody()), rokectmqMessageContext);
                }
            }
            return rokectmqMessageContext.getReturnStatus();
        });
        this.broadcastConsumer.start();
    }

    @Override // io.jboot.components.mq.Jbootmq
    public void enqueue(Object obj, String str) {
        doSendMessage(obj, str);
    }

    @Override // io.jboot.components.mq.Jbootmq
    public void publish(Object obj, String str) {
        doSendMessage(obj, this.rocketmqConfig.getBroadcastChannelPrefix() + str);
    }

    public void doSendMessage(Object obj, String str) {
        try {
            if (getMQProducer().send(obj instanceof Message ? (Message) obj : new Message(str, getSerializer().serialize(obj))) == null) {
                LOG.warn("Rockect mq send message fail!!!");
            }
        } catch (Exception e) {
            LOG.error(e.toString(), e);
        }
    }

    public MQProducer getMQProducer() throws MQClientException {
        if (this.mqProducer == null) {
            synchronized (this) {
                if (this.mqProducer == null) {
                    createMqProducer();
                }
            }
        }
        return this.mqProducer;
    }

    public void createMqProducer() throws MQClientException {
        DefaultMQProducer defaultMQProducer = new DefaultMQProducer(this.rocketmqConfig.getProducerGroup());
        defaultMQProducer.setNamesrvAddr(this.rocketmqConfig.getNamesrvAddr());
        if (StrUtil.isNotBlank(this.rocketmqConfig.getNamespace())) {
            defaultMQProducer.setNamespace(this.rocketmqConfig.getNamespace());
        }
        if (StrUtil.isNotBlank(this.rocketmqConfig.getInstanceName())) {
            defaultMQProducer.setInstanceName(this.rocketmqConfig.getInstanceName());
        }
        if (StrUtil.isNotBlank(this.rocketmqConfig.getClientIP())) {
            defaultMQProducer.setClientIP(this.rocketmqConfig.getClientIP());
        }
        if (StrUtil.isNotBlank(this.rocketmqConfig.getCreateTopicKey())) {
            defaultMQProducer.setCreateTopicKey(this.rocketmqConfig.getCreateTopicKey());
        }
        if (this.rocketmqConfig.getUseTLS() != null) {
            defaultMQProducer.setUseTLS(this.rocketmqConfig.getUseTLS().booleanValue());
        }
        if (this.rocketmqConfig.getSendLatencyFaultEnable() != null) {
            defaultMQProducer.setSendLatencyFaultEnable(this.rocketmqConfig.getSendLatencyFaultEnable().booleanValue());
        }
        if (this.rocketmqConfig.getSendMessageWithVIPChannel() != null) {
            defaultMQProducer.setSendMessageWithVIPChannel(this.rocketmqConfig.getSendMessageWithVIPChannel().booleanValue());
        }
        if (this.rocketmqConfig.getSendMsgTimeout() != null) {
            defaultMQProducer.setSendMsgTimeout(this.rocketmqConfig.getSendMsgTimeout().intValue());
        }
        if (this.rocketmqConfig.getRetryAnotherBrokerWhenNotStoreOK() != null) {
            defaultMQProducer.setRetryAnotherBrokerWhenNotStoreOK(this.rocketmqConfig.getRetryAnotherBrokerWhenNotStoreOK().booleanValue());
        }
        if (this.rocketmqConfig.getRetryTimesWhenSendAsyncFailed() != null) {
            defaultMQProducer.setRetryTimesWhenSendAsyncFailed(this.rocketmqConfig.getRetryTimesWhenSendAsyncFailed().intValue());
        }
        if (this.rocketmqConfig.getRetryTimesWhenSendFailed() != null) {
            defaultMQProducer.setRetryTimesWhenSendFailed(this.rocketmqConfig.getRetryTimesWhenSendFailed().intValue());
        }
        this.mqProducer = defaultMQProducer;
        defaultMQProducer.start();
    }
}
