/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.endpoint;

import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.context.Lifecycle;
import org.springframework.integration.channel.ChannelUtils;
import org.springframework.integration.channel.NullChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.integration.endpoint.IntegrationConsumer;
import org.springframework.integration.handler.ReactiveMessageHandlerAdapter;
import org.springframework.integration.router.MessageRouter;
import org.springframework.integration.util.IntegrationReactiveUtils;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.ReactiveMessageHandler;
import org.springframework.util.Assert;
import org.springframework.util.ErrorHandler;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

/**
 * Endpoint that consumes messages from an input {@link MessageChannel} through the
 * {@link Publisher} returned by {@link IntegrationReactiveUtils#messageChannelToFlux} and hands
 * every message to one of three kinds of target: a plain {@link MessageHandler}, a Reactive
 * Streams {@link Subscriber}, or a {@link ReactiveMessageHandler}.
 *
 * <p>The subscription is created in {@link #doStart()} and disposed in {@link #doStop()}. When the
 * target also implements {@link Lifecycle}, it is started before subscribing and stopped after the
 * subscription has been disposed.
 */
public class ReactiveStreamsConsumer extends AbstractEndpoint implements IntegrationConsumer {
    private final MessageChannel inputChannel;
    private final Publisher<Message<Object>> publisher;
    private final MessageHandler handler;
    /** Set only by the {@link ReactiveMessageHandler} constructor; {@code null} otherwise. */
    @Nullable
    private final ReactiveMessageHandler reactiveMessageHandler;
    /** Set only by the subscriber/handler constructors; {@code null} otherwise. */
    @Nullable
    private final Subscriber<Message<?>> subscriber;
    /** The target itself when it implements {@link Lifecycle}; {@code null} otherwise. */
    @Nullable
    private final Lifecycle lifecycleDelegate;
    /** Assigned by {@link #setErrorHandler} or, if still unset, by {@link #onInit()}. */
    private ErrorHandler errorHandler;
    private volatile Disposable subscription;

    /**
     * Creates a consumer that delivers messages to the given handler. A handler that is itself a
     * {@link Subscriber} is subscribed directly; any other handler is wrapped in an internal
     * subscriber that requests an unbounded number of messages.
     *
     * @param inputChannel the channel to consume from; must not be {@code null}
     * @param messageHandler the handler to invoke for each message; must not be {@code null}
     * @throws IllegalArgumentException if an argument is {@code null}
     */
    public ReactiveStreamsConsumer(MessageChannel inputChannel, MessageHandler messageHandler) {
        this(inputChannel, toSubscriber(messageHandler));
    }

    /**
     * Creates a consumer that delivers messages to the given subscriber.
     *
     * @param inputChannel the channel to consume from; must not be {@code null}
     * @param subscriber the subscriber receiving the messages; must not be {@code null}
     * @throws IllegalArgumentException if an argument is {@code null}
     */
    public ReactiveStreamsConsumer(MessageChannel inputChannel, Subscriber<Message<?>> subscriber) {
        Assert.notNull(inputChannel, "'inputChannel' must not be null");
        Assert.notNull(subscriber, "'subscriber' must not be null");
        this.inputChannel = inputChannel;
        if (inputChannel instanceof NullChannel && this.logger.isWarnEnabled()) {
            this.logger.warn("The consuming from the NullChannel does not have any effects: it doesn't forward messages sent to it. A NullChannel is the end of the flow.");
        }
        this.publisher = IntegrationReactiveUtils.messageChannelToFlux(inputChannel);
        this.subscriber = subscriber;
        this.lifecycleDelegate = subscriber instanceof Lifecycle ? (Lifecycle) subscriber : null;
        // Expose the most specific MessageHandler available through getHandler(): the wrapped
        // handler, the subscriber itself, or a thin adapter around Subscriber.onNext.
        if (subscriber instanceof MessageHandlerSubscriber) {
            this.handler = ((MessageHandlerSubscriber) subscriber).messageHandler;
        } else if (subscriber instanceof MessageHandler) {
            this.handler = (MessageHandler) subscriber;
        } else {
            this.handler = subscriber::onNext;
        }
        this.reactiveMessageHandler = null;
    }

    /**
     * Creates a consumer that delivers messages to the given reactive handler. The handler is
     * also exposed through {@link #getHandler()} wrapped in a {@link ReactiveMessageHandlerAdapter}.
     *
     * @param inputChannel the channel to consume from; must not be {@code null}
     * @param reactiveMessageHandler the reactive handler to invoke; must not be {@code null}
     * @throws IllegalArgumentException if an argument is {@code null}
     */
    public ReactiveStreamsConsumer(MessageChannel inputChannel, ReactiveMessageHandler reactiveMessageHandler) {
        Assert.notNull(inputChannel, "'inputChannel' must not be null");
        // Fail fast here, like the sibling constructor does for its subscriber: with a null
        // handler neither branch of doStart() would subscribe and the endpoint would be inert.
        Assert.notNull(reactiveMessageHandler, "'reactiveMessageHandler' must not be null");
        this.inputChannel = inputChannel;
        this.handler = new ReactiveMessageHandlerAdapter(reactiveMessageHandler);
        this.reactiveMessageHandler = reactiveMessageHandler;
        this.publisher = IntegrationReactiveUtils.messageChannelToFlux(inputChannel);
        this.subscriber = null;
        this.lifecycleDelegate = reactiveMessageHandler instanceof Lifecycle ? (Lifecycle) reactiveMessageHandler : null;
    }

    /**
     * Returns the handler itself when it is already a {@link Subscriber}, otherwise wraps it in a
     * {@link MessageHandlerSubscriber} (which rejects {@code null}).
     */
    @SuppressWarnings("unchecked")
    private static Subscriber<Message<?>> toSubscriber(MessageHandler messageHandler) {
        if (messageHandler instanceof Subscriber) {
            // Unchecked: the element type of an arbitrary Subscriber cannot be verified at
            // runtime, so it is trusted to accept messages.
            return (Subscriber<Message<?>>) messageHandler;
        }
        return new MessageHandlerSubscriber(messageHandler);
    }

    /**
     * Sets the handler that receives exceptions raised while messages are processed. When none is
     * set, {@link #onInit()} obtains one from {@link ChannelUtils#getErrorHandler}.
     *
     * @param errorHandler the error handler to use
     */
    public void setErrorHandler(ErrorHandler errorHandler) {
        this.errorHandler = errorHandler;
    }

    @Override
    public MessageChannel getInputChannel() {
        return this.inputChannel;
    }

    /**
     * Returns the output channel of the handler when it is a {@link MessageProducer}, the default
     * output channel when it is a {@link MessageRouter}, and {@code null} otherwise.
     */
    @Override
    public MessageChannel getOutputChannel() {
        if (this.handler instanceof MessageProducer) {
            return ((MessageProducer) this.handler).getOutputChannel();
        }
        if (this.handler instanceof MessageRouter) {
            return ((MessageRouter) this.handler).getDefaultOutputChannel();
        }
        return null;
    }

    @Override
    public MessageHandler getHandler() {
        return this.handler;
    }

    /** Resolves an error handler via {@link ChannelUtils} unless one was set explicitly. */
    @Override
    protected void onInit() {
        super.onInit();
        if (this.errorHandler == null) {
            this.errorHandler = ChannelUtils.getErrorHandler(this.getBeanFactory());
        }
    }

    /**
     * Starts the lifecycle delegate (if any) and subscribes the target to the channel publisher.
     * Exceptions raised while a message is processed are passed to {@link #dispatchError}.
     */
    @Override
    protected void doStart() {
        if (this.lifecycleDelegate != null) {
            this.lifecycleDelegate.start();
        }
        if (this.reactiveMessageHandler != null) {
            this.subscription = Flux.from(this.publisher)
                    .flatMap(this.reactiveMessageHandler::handleMessage)
                    .onErrorContinue((ex, data) -> dispatchError(ex))
                    .subscribe();
        } else if (this.subscriber != null) {
            this.subscription = Flux.from(this.publisher).subscribe(
                    message -> {
                        try {
                            this.subscriber.onNext(message);
                        } catch (Exception ex) {
                            dispatchError(ex);
                        }
                    },
                    // No error callback is registered for this subscription.
                    null,
                    this.subscriber::onComplete,
                    this.subscriber::onSubscribe);
        }
    }

    /** Disposes the current subscription (if any) and then stops the lifecycle delegate (if any). */
    @Override
    protected void doStop() {
        Disposable active = this.subscription;
        if (active != null) {
            active.dispose();
        }
        if (this.lifecycleDelegate != null) {
            this.lifecycleDelegate.stop();
        }
    }

    /**
     * Passes a processing failure to the configured {@link ErrorHandler}. The handler is only
     * assigned by {@link #setErrorHandler} or {@link #onInit()}, so when the endpoint is started
     * without either, the failure is logged instead of being masked by a
     * {@link NullPointerException}.
     */
    private void dispatchError(Throwable ex) {
        ErrorHandler target = this.errorHandler;
        if (target != null) {
            target.handleError(ex);
        } else {
            this.logger.error("No ErrorHandler is configured; a message processing failure is only logged", ex);
        }
    }

    /**
     * Adapts a {@link MessageHandler} to a subscriber that requests {@link Long#MAX_VALUE}
     * messages on subscription and forwards lifecycle calls to the handler when the handler
     * implements {@link Lifecycle}.
     */
    private static final class MessageHandlerSubscriber
            implements CoreSubscriber<Message<?>>, Disposable, Lifecycle {
        private final Consumer<Message<?>> consumer;
        private Subscription subscription;
        private final MessageHandler messageHandler;

        MessageHandlerSubscriber(MessageHandler messageHandler) {
            Assert.notNull(messageHandler, "'messageHandler' must not be null");
            this.messageHandler = messageHandler;
            this.consumer = messageHandler::handleMessage;
        }

        @Override
        public void onSubscribe(Subscription s) {
            this.subscription = s;
            s.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(Message<?> message) {
            this.consumer.accept(message);
        }

        /** No-op: this subscriber does not act on error signals. */
        @Override
        public void onError(Throwable t) {
        }

        @Override
        public void onComplete() {
            this.dispose();
        }

        /** Cancels the held subscription, if any, and clears the reference. */
        @Override
        public void dispose() {
            Subscription s = this.subscription;
            if (s != null) {
                this.subscription = null;
                s.cancel();
            }
        }

        /** Returns {@code true} when no subscription is currently held. */
        @Override
        public boolean isDisposed() {
            return this.subscription == null;
        }

        @Override
        public void start() {
            if (this.messageHandler instanceof Lifecycle) {
                ((Lifecycle) this.messageHandler).start();
            }
        }

        @Override
        public void stop() {
            if (this.messageHandler instanceof Lifecycle) {
                ((Lifecycle) this.messageHandler).stop();
            }
        }

        /** A handler without its own {@link Lifecycle} is always reported as running. */
        @Override
        public boolean isRunning() {
            return !(this.messageHandler instanceof Lifecycle) || ((Lifecycle) this.messageHandler).isRunning();
        }
    }
}

