/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.mqtt.inbound;

import java.util.Arrays;
import java.util.Date;
import java.util.concurrent.ScheduledFuture;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.integration.acks.SimpleAcknowledgment;
import org.springframework.integration.mqtt.core.ConsumerStopAction;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoComponent;
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
import org.springframework.integration.mqtt.inbound.AbstractMqttMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttUtils;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.util.Assert;

/**
 * Inbound channel adapter that receives MQTT messages through an Eclipse Paho
 * {@link IMqttClient} and forwards them with {@code sendMessage}.
 *
 * <p>The adapter registers itself as the client's {@link MqttCallback}. When the
 * initial connect fails, or the connection is lost while the adapter is running,
 * a reconnect attempt is scheduled on the task scheduler after the configured
 * recovery interval.
 *
 * <p>Connection life-cycle methods ({@code doStop}, {@code connectAndSubscribe},
 * {@code scheduleReconnect}, {@code cancelReconnect}, {@code connectionLost}) are
 * {@code synchronized} on this instance; topic changes are additionally guarded
 * by {@code topicLock}.
 */
public class MqttPahoMessageDrivenChannelAdapter extends AbstractMqttMessageDrivenChannelAdapter
        implements MqttCallback, MqttPahoComponent {

    /** Default value for the timeout passed to {@code disconnectForcibly}. */
    public static final long DISCONNECT_COMPLETION_TIMEOUT = 5000L;

    /** Default delay, in milliseconds, before a reconnect attempt. */
    private static final int DEFAULT_RECOVERY_INTERVAL = 10000;

    private final MqttPahoClientFactory clientFactory;

    private int recoveryInterval = DEFAULT_RECOVERY_INTERVAL;

    private long disconnectCompletionTimeout = DISCONNECT_COMPLETION_TIMEOUT;

    private volatile IMqttClient client;

    private volatile ScheduledFuture<?> reconnectFuture;

    private volatile boolean connected;

    private volatile boolean cleanSession;

    private volatile ConsumerStopAction consumerStopAction;

    /**
     * Create an adapter for an explicit server URL using the supplied client factory.
     *
     * @param url the server URL handed to the client factory
     * @param clientId the MQTT client id
     * @param clientFactory factory providing clients and connection options
     * @param topic the topic(s) to subscribe to
     */
    public MqttPahoMessageDrivenChannelAdapter(String url, String clientId, MqttPahoClientFactory clientFactory,
            String... topic) {
        super(url, clientId, topic);
        this.clientFactory = clientFactory;
    }

    /**
     * Create an adapter without a URL. In that case the factory's connection
     * options must supply server URIs, otherwise connecting fails with an
     * {@link IllegalStateException}.
     *
     * @param clientId the MQTT client id
     * @param clientFactory factory providing clients and connection options
     * @param topic the topic(s) to subscribe to
     */
    public MqttPahoMessageDrivenChannelAdapter(String clientId, MqttPahoClientFactory clientFactory,
            String... topic) {
        super(null, clientId, topic);
        this.clientFactory = clientFactory;
    }

    /**
     * Create an adapter for an explicit server URL using a new
     * {@link DefaultMqttPahoClientFactory}.
     *
     * @param url the server URL handed to the client factory
     * @param clientId the MQTT client id
     * @param topic the topic(s) to subscribe to
     */
    public MqttPahoMessageDrivenChannelAdapter(String url, String clientId, String... topic) {
        this(url, clientId, new DefaultMqttPahoClientFactory(), topic);
    }

    /**
     * Set the timeout passed to {@code IMqttClient.disconnectForcibly} when the
     * client is torn down.
     *
     * @param completionTimeout the timeout; defaults to {@link #DISCONNECT_COMPLETION_TIMEOUT}
     */
    public synchronized void setDisconnectCompletionTimeout(long completionTimeout) {
        this.disconnectCompletionTimeout = completionTimeout;
    }

    /**
     * Set the delay between a connection failure and the next reconnect attempt.
     *
     * @param recoveryInterval the delay in milliseconds
     */
    public synchronized void setRecoveryInterval(int recoveryInterval) {
        this.recoveryInterval = recoveryInterval;
    }

    /**
     * Return the factory's connection options. When those options carry no server
     * URIs and this adapter has a URL, a clone of the options with that URL as the
     * only server URI is returned instead, leaving the factory's instance untouched.
     *
     * @return the connection options describing this adapter's connection
     */
    @Override
    public MqttConnectOptions getConnectionInfo() {
        MqttConnectOptions options = this.clientFactory.getConnectionOptions();
        if (options.getServerURIs() == null) {
            String url = getUrl();
            if (url != null) {
                options = MqttUtils.cloneConnectOptions(options);
                options.setServerURIs(new String[] { url });
            }
        }
        return options;
    }

    /**
     * Install a {@link DefaultPahoMessageConverter} when no converter was configured.
     */
    @Override
    protected void onInit() {
        super.onInit();
        if (getConverter() == null) {
            DefaultPahoMessageConverter pahoMessageConverter = new DefaultPahoMessageConverter();
            pahoMessageConverter.setBeanFactory(getBeanFactory());
            setConverter(pahoMessageConverter);
        }
    }

    /**
     * Connect and subscribe; on any failure log the error and schedule a reconnect
     * instead of propagating the exception.
     *
     * @throws IllegalStateException if no task scheduler is available
     */
    @Override
    protected void doStart() {
        Assert.state(getTaskScheduler() != null, "A 'taskScheduler' is required");
        try {
            connectAndSubscribe();
        }
        catch (Exception ex) {
            this.logger.error(ex, "Exception while connecting and subscribing, retrying");
            scheduleReconnect();
        }
    }

    /**
     * Cancel any pending reconnect and tear down the current client, if any:
     * optionally unsubscribe, disconnect forcibly, detach the callback and close.
     * Each step is attempted even if a previous one failed; failures are logged.
     */
    @Override
    protected synchronized void doStop() {
        cancelReconnect();
        IMqttClient stoppingClient = this.client;
        if (stoppingClient == null) {
            return;
        }
        try {
            // '==' keeps the check null-safe and the grouping explicit:
            // ALWAYS, or CLEAN combined with a clean session.
            ConsumerStopAction stopAction = this.consumerStopAction;
            boolean unsubscribe = stopAction == ConsumerStopAction.UNSUBSCRIBE_ALWAYS
                    || (stopAction == ConsumerStopAction.UNSUBSCRIBE_CLEAN && this.cleanSession);
            if (unsubscribe) {
                stoppingClient.unsubscribe(getTopic());
            }
        }
        catch (MqttException ex) {
            this.logger.error(ex, "Exception while unsubscribing");
        }
        try {
            stoppingClient.disconnectForcibly(this.disconnectCompletionTimeout);
        }
        catch (MqttException ex) {
            this.logger.error(ex, "Exception while disconnecting");
        }
        stoppingClient.setCallback(null);
        try {
            stoppingClient.close();
        }
        catch (MqttException ex) {
            this.logger.error(ex, "Exception while closing");
        }
        this.connected = false;
        this.client = null;
    }

    /**
     * Register the topic and, if a client is currently connected, subscribe to it
     * immediately. If the subscription fails the topic registration is rolled back.
     *
     * @param topic the topic to add
     * @param qos the requested quality of service
     * @throws MessagingException if the client fails to subscribe
     */
    @Override
    public void addTopic(String topic, int qos) {
        this.topicLock.lock();
        try {
            super.addTopic(topic, qos);
            IMqttClient currentClient = this.client;
            if (currentClient != null && currentClient.isConnected()) {
                currentClient.subscribe(topic, qos);
            }
        }
        catch (MqttException e) {
            super.removeTopic(topic);
            throw new MessagingException("Failed to subscribe to topic " + topic, e);
        }
        finally {
            this.topicLock.unlock();
        }
    }

    /**
     * Unsubscribe from the topic(s) if a client is currently connected, then remove
     * them from the registration. The registration is left untouched when the
     * unsubscribe call fails.
     *
     * @param topic the topic(s) to remove
     * @throws MessagingException if the client fails to unsubscribe
     */
    @Override
    public void removeTopic(String... topic) {
        this.topicLock.lock();
        try {
            IMqttClient currentClient = this.client;
            if (currentClient != null && currentClient.isConnected()) {
                currentClient.unsubscribe(topic);
            }
            super.removeTopic(topic);
        }
        catch (MqttException e) {
            throw new MessagingException("Failed to unsubscribe from topic(s) " + Arrays.toString(topic), e);
        }
        finally {
            this.topicLock.unlock();
        }
    }

    /**
     * Obtain a fresh client from the factory, connect it and subscribe to the
     * currently registered topics while holding {@code topicLock}. On success the
     * adapter is marked connected and a {@link MqttSubscribedEvent} is published.
     *
     * @throws MqttException if connecting or subscribing fails; the client has
     * been discarded by then
     */
    private synchronized void connectAndSubscribe() throws MqttException {
        MqttConnectOptions connectionOptions = this.clientFactory.getConnectionOptions();
        this.cleanSession = connectionOptions.isCleanSession();
        ConsumerStopAction stopAction = this.clientFactory.getConsumerStopAction();
        this.consumerStopAction = stopAction != null ? stopAction : ConsumerStopAction.UNSUBSCRIBE_CLEAN;
        Assert.state(getUrl() != null || connectionOptions.getServerURIs() != null,
                "If no 'url' provided, connectionOptions.getServerURIs() must not be null");
        this.client = this.clientFactory.getClientInstance(getUrl(), getClientId());
        this.client.setCallback(this);
        if (this.client instanceof MqttClient) {
            ((MqttClient) this.client).setTimeToWait(getCompletionTimeout());
        }
        ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
        String[] topics;
        this.topicLock.lock();
        // Nothing may run between lock() and try, so the lock is always released.
        try {
            topics = getTopic();
            connectClientAndSubscribe(connectionOptions, topics, applicationEventPublisher);
        }
        finally {
            this.topicLock.unlock();
        }
        if (this.client.isConnected()) {
            this.connected = true;
            String message = "Connected and subscribed to " + Arrays.toString(topics);
            this.logger.debug(message);
            if (applicationEventPublisher != null) {
                applicationEventPublisher.publishEvent(new MqttSubscribedEvent(this, message));
            }
        }
    }

    /**
     * Connect the current client and subscribe it to the given topics. On failure
     * publish a {@link MqttConnectionFailedEvent}, log, discard the client and
     * rethrow the original exception.
     */
    private void connectClientAndSubscribe(MqttConnectOptions connectionOptions, String[] topics,
            ApplicationEventPublisher applicationEventPublisher) throws MqttException {
        try {
            this.client.connect(connectionOptions);
            this.client.setManualAcks(isManualAcks());
            if (topics.length > 0) {
                int[] requestedQos = getQos();
                // Subscribe with a copy so the requested values stay available for
                // the comparison below (the client presumably overwrites the array
                // it is given with the granted QoS values).
                int[] grantedQos = Arrays.copyOf(requestedQos, requestedQos.length);
                this.client.subscribe(topics, grantedQos);
                warnInvalidQosForSubscription(topics, requestedQos, grantedQos);
            }
        }
        catch (MqttException ex) {
            if (applicationEventPublisher != null) {
                applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex));
            }
            this.logger.error(ex, () -> "Error connecting or subscribing to " + Arrays.toString(topics));
            discardFailedClient(ex);
            throw ex;
        }
    }

    /**
     * Best-effort teardown of a client whose connect/subscribe failed. Every step
     * is attempted independently so a failing disconnect can neither skip the
     * close nor replace {@code failure}; cleanup errors are attached to
     * {@code failure} as suppressed exceptions.
     */
    private void discardFailedClient(MqttException failure) {
        // The field is re-read here because it may already have been cleared,
        // presumably by a listener of the failure event published just before.
        IMqttClient failedClient = this.client;
        if (failedClient == null) {
            return;
        }
        try {
            failedClient.disconnectForcibly(this.disconnectCompletionTimeout);
        }
        catch (MqttException disconnectFailure) {
            if (disconnectFailure != failure) {
                failure.addSuppressed(disconnectFailure);
            }
        }
        try {
            failedClient.setCallback(null);
            failedClient.close();
        }
        catch (MqttException closeFailure) {
            if (closeFailure != failure) {
                failure.addSuppressed(closeFailure);
            }
        }
        this.client = null;
    }

    /**
     * Log a single warning if any granted QoS differs from the requested one.
     */
    private void warnInvalidQosForSubscription(String[] topics, int[] requestedQos, int[] grantedQos) {
        for (int i = 0; i < requestedQos.length; i++) {
            if (grantedQos[i] != requestedQos[i]) {
                this.logger.warn(() -> "Granted QOS different to Requested QOS; topics: " + Arrays.toString(topics)
                        + " requested: " + Arrays.toString(requestedQos)
                        + " granted: " + Arrays.toString(grantedQos));
                // One warning covers all topics.
                return;
            }
        }
    }

    /**
     * Cancel the pending reconnect task, if any, without interrupting it.
     */
    private synchronized void cancelReconnect() {
        ScheduledFuture<?> pending = this.reconnectFuture;
        if (pending != null) {
            pending.cancel(false);
            this.reconnectFuture = null;
        }
    }

    /**
     * Replace any pending reconnect with a new one that runs after the recovery
     * interval, provided the adapter is still active. The task reconnects only if
     * the adapter is not connected, and reschedules itself when the attempt fails
     * with an {@link MqttException}.
     */
    private synchronized void scheduleReconnect() {
        cancelReconnect();
        if (!isActive()) {
            return;
        }
        try {
            Date startTime = new Date(System.currentTimeMillis() + this.recoveryInterval);
            this.reconnectFuture = getTaskScheduler().schedule(() -> {
                try {
                    this.logger.debug("Attempting reconnect");
                    synchronized (this) {
                        if (!this.connected) {
                            connectAndSubscribe();
                            this.reconnectFuture = null;
                        }
                    }
                }
                catch (MqttException ex) {
                    this.logger.error(ex, "Exception while connecting and subscribing");
                    scheduleReconnect();
                }
            }, startTime);
        }
        catch (Exception ex) {
            this.logger.error(ex, "Failed to schedule reconnect");
        }
    }

    /**
     * Callback invoked when the connection is lost. While the adapter is running,
     * the current client is closed and dropped, a reconnect is scheduled and a
     * {@link MqttConnectionFailedEvent} is published.
     *
     * @param cause the reason the connection was lost
     */
    @Override
    public synchronized void connectionLost(Throwable cause) {
        if (!isRunning()) {
            return;
        }
        this.logger.error(() -> "Lost connection: " + cause.getMessage() + "; retrying...");
        this.connected = false;
        IMqttClient lostClient = this.client;
        if (lostClient != null) {
            try {
                lostClient.setCallback(null);
                lostClient.close();
            }
            catch (MqttException ignored) {
                // Best effort: the connection is already gone and the client is dropped below.
            }
        }
        this.client = null;
        scheduleReconnect();
        ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
        if (applicationEventPublisher != null) {
            applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, cause));
        }
    }

    /**
     * Callback invoked for every received MQTT message. The message is converted
     * and sent; with manual acks enabled an acknowledgment callback bound to the
     * current client is added as a header. Nothing is sent when conversion failed
     * and the failure was already handled as an error message.
     *
     * @param topic the topic the message arrived on
     * @param mqttMessage the received message
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) {
        AbstractIntegrationMessageBuilder<?> builder = toMessageBuilder(topic, mqttMessage);
        if (builder == null) {
            return;
        }
        if (isManualAcks()) {
            builder.setHeader("acknowledgmentCallback",
                    new AcknowledgmentImpl(mqttMessage.getId(), mqttMessage.getQos(), this.client));
        }
        Message<?> message = builder.build();
        try {
            sendMessage(message);
        }
        catch (RuntimeException ex) {
            this.logger.error(ex, () -> "Unhandled exception for " + message);
            throw ex;
        }
    }

    /**
     * Convert the MQTT message with the configured converter.
     *
     * @return the builder, or {@code null} if conversion failed and the failure was
     * handled by {@code sendErrorMessageIfNecessary}
     * @throws MessageConversionException if conversion failed (or the converter
     * returned {@code null}) and the failure was not handled as an error message
     */
    private AbstractIntegrationMessageBuilder<?> toMessageBuilder(String topic, MqttMessage mqttMessage) {
        AbstractIntegrationMessageBuilder<?> builder = null;
        Exception conversionError = null;
        try {
            builder = getConverter().toMessageBuilder(topic, mqttMessage);
        }
        catch (Exception ex) {
            conversionError = ex;
        }
        if (builder == null && conversionError == null) {
            conversionError = new IllegalStateException("'MqttMessageConverter' returned 'null'");
        }
        if (conversionError != null) {
            GenericMessage<MqttMessage> message = new GenericMessage<>(mqttMessage);
            if (!sendErrorMessageIfNecessary(message, conversionError)) {
                if (conversionError instanceof MessageConversionException) {
                    throw (MessageConversionException) conversionError;
                }
                throw new MessageConversionException(message, "Failed to convert from MQTT Message",
                        conversionError);
            }
        }
        return builder;
    }

    /**
     * No-op: this adapter only receives messages.
     *
     * @param token the delivery token (ignored)
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
    }

    /**
     * Acknowledgment callback bound to the client that was current when the
     * message arrived.
     */
    private static class AcknowledgmentImpl implements SimpleAcknowledgment {

        private final int id;

        private final int qos;

        private final IMqttClient ackClient;

        AcknowledgmentImpl(int id, int qos, IMqttClient client) {
            this.id = id;
            this.qos = qos;
            this.ackClient = client;
        }

        /**
         * Acknowledge the message on the captured client.
         *
         * @throws IllegalStateException if no client was captured or the client
         * rejects the acknowledgment
         */
        @Override
        public void acknowledge() {
            if (this.ackClient == null) {
                throw new IllegalStateException("Client has changed");
            }
            try {
                this.ackClient.messageArrivedComplete(this.id, this.qos);
            }
            catch (MqttException e) {
                throw new IllegalStateException(e);
            }
        }
    }
}

