package com.microsoft.azure.eventprocessorhost;

import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import com.microsoft.azure.eventhubs.PartitionReceiveHandler;
import com.microsoft.azure.eventhubs.PartitionReceiver;
import com.microsoft.azure.eventhubs.ReceiverDisconnectedException;
import com.microsoft.azure.eventhubs.ReceiverOptions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/microsoft/azure/eventprocessorhost/PartitionPump.class */
class PartitionPump extends Closable implements PartitionReceiveHandler {
    private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(PartitionPump.class);
    protected final HostContext hostContext;
    protected final CompleteLease lease;
    private final CompletableFuture<Void> shutdownTriggerFuture;
    private final CompletableFuture<Void> shutdownFinishedFuture;
    private final Object processingSynchronizer;
    private final Consumer<String> pumpManagerCallback;
    private EventHubClient eventHubClient;
    private PartitionReceiver partitionReceiver;
    private CloseReason shutdownReason;
    private volatile CompletableFuture<?> internalOperationFuture;
    private IEventProcessor processor;
    private PartitionContext partitionContext;
    private ScheduledFuture<?> leaseRenewerFuture;

    /* JADX INFO: Access modifiers changed from: package-private */
    public PartitionPump(HostContext hostContext, CompleteLease completeLease, Closable closable, Consumer<String> consumer) {
        super(closable);
        this.eventHubClient = null;
        this.partitionReceiver = null;
        this.internalOperationFuture = null;
        this.processor = null;
        this.partitionContext = null;
        this.leaseRenewerFuture = null;
        this.hostContext = hostContext;
        this.lease = completeLease;
        this.pumpManagerCallback = consumer;
        this.processingSynchronizer = new Object();
        this.partitionContext = new PartitionContext(this.hostContext, this.lease.getPartitionId());
        this.partitionContext.setLease(this.lease);
        this.shutdownTriggerFuture = new CompletableFuture<>();
        this.shutdownFinishedFuture = this.shutdownTriggerFuture.handleAsync((r4, th) -> {
            this.pumpManagerCallback.accept(this.lease.getPartitionId());
            return cancelPendingOperations();
        }, (Executor) this.hostContext.getExecutor()).thenComposeAsync((Function<? super U, ? extends CompletionStage<U>>) r42 -> {
            return cleanUpAll(this.shutdownReason);
        }, (Executor) this.hostContext.getExecutor()).thenComposeAsync(r3 -> {
            return releaseLeaseOnShutdown();
        }, (Executor) this.hostContext.getExecutor()).whenCompleteAsync((r32, th2) -> {
            setClosed();
        }, (Executor) this.hostContext.getExecutor());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<Void> startPump() {
        CompletableFuture.runAsync(() -> {
            openProcessor();
        }, this.hostContext.getExecutor()).thenComposeAsync(r3 -> {
            return openClientsRetryWrapper();
        }, (Executor) this.hostContext.getExecutor()).thenRunAsync(() -> {
            scheduleLeaseRenewer();
        }, (Executor) this.hostContext.getExecutor()).whenCompleteAsync((r5, th) -> {
            if (th != null) {
                internalShutdown(CloseReason.Shutdown, th);
            }
        }, (Executor) this.hostContext.getExecutor());
        return this.shutdownFinishedFuture;
    }

    /* JADX WARN: Type inference failed for: r1v14, types: [com.microsoft.azure.eventprocessorhost.IEventProcessor] */
    private void openProcessor() {
        TRACE_LOGGER.info(this.hostContext.withHostAndPartition(this.partitionContext, "Creating and opening event processor instance"));
        String str = EventProcessorHostActionStrings.CREATING_EVENT_PROCESSOR;
        try {
            this.processor = this.hostContext.getEventProcessorFactory().createEventProcessor(this.partitionContext);
            str = EventProcessorHostActionStrings.OPENING_EVENT_PROCESSOR;
            this.processor.onOpen(this.partitionContext);
        } catch (Exception e) {
            this.processor = null;
            TRACE_LOGGER.warn(this.hostContext.withHostAndPartition(this.partitionContext, "Failed " + str), e);
            this.hostContext.getEventProcessorOptions().notifyOfException(this.hostContext.getHostName(), e, str, this.lease.getPartitionId());
            throw new CompletionException(e);
        }
    }

    private CompletableFuture<Void> openClientsRetryWrapper() {
        CompletableFuture<Boolean> openClients = openClients();
        for (int i = 1; i < 5; i++) {
            openClients = openClients.handleAsync((bool, th) -> {
                if (th != null) {
                    Exception exc = (Exception) LoggingUtils.unwrapException(th, null);
                    if (exc instanceof ReceiverDisconnectedException) {
                        throw new CompletionException(exc);
                    }
                    TRACE_LOGGER.warn(this.hostContext.withHostAndPartition(this.partitionContext, "Failure creating client or receiver, retrying"), th);
                }
                return Boolean.valueOf(th == null ? bool.booleanValue() : false);
            }, (Executor) this.hostContext.getExecutor()).thenComposeAsync((Function<? super U, ? extends CompletionStage<U>>) bool2 -> {
                return bool2.booleanValue() ? CompletableFuture.completedFuture(bool2) : openClients();
            }, (Executor) this.hostContext.getExecutor());
        }
        return openClients.handleAsync((bool3, th2) -> {
            if (th2 == null) {
                this.partitionReceiver.setReceiveHandler(this, this.hostContext.getEventProcessorOptions().getInvokeProcessorAfterReceiveTimeout().booleanValue());
                return null;
            }
            Exception exc = (Exception) LoggingUtils.unwrapException(th2, null);
            if (exc instanceof ReceiverDisconnectedException) {
                TRACE_LOGGER.warn(this.hostContext.withHostAndPartition(this.partitionContext, "Receiver disconnected on create, bad epoch?"), exc);
            } else {
                TRACE_LOGGER.warn(this.hostContext.withHostAndPartition(this.partitionContext, "Failure creating client or receiver, out of retries"), th2);
            }
            this.processor.onError(this.partitionContext, new ExceptionWithAction(exc, EventProcessorHostActionStrings.CREATING_EVENT_HUB_CLIENT));
            throw LoggingUtils.wrapException(exc, EventProcessorHostActionStrings.CREATING_EVENT_HUB_CLIENT);
        }, (Executor) this.hostContext.getExecutor());
    }

    protected void scheduleLeaseRenewer() {
        if (getIsClosingOrClosed()) {
            return;
        }
        int leaseRenewIntervalInSeconds = this.hostContext.getPartitionManagerOptions().getLeaseRenewIntervalInSeconds();
        this.leaseRenewerFuture = this.hostContext.getExecutor().schedule(() -> {
            leaseRenewer();
        }, leaseRenewIntervalInSeconds, TimeUnit.SECONDS);
        TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(this.lease, "scheduling leaseRenewer in " + leaseRenewIntervalInSeconds));
    }

    private CompletableFuture<Boolean> openClients() {
        CompletableFuture<?> completableFuture;
        TRACE_LOGGER.info(this.hostContext.withHostAndPartition(this.partitionContext, "Opening EH client"));
        try {
            completableFuture = EventHubClient.create(this.hostContext.getEventHubConnectionString(), this.hostContext.getRetryPolicy(), this.hostContext.getExecutor());
        } catch (EventHubException | IOException e) {
            completableFuture = new CompletableFuture<>();
            completableFuture.completeExceptionally(e);
        }
        this.internalOperationFuture = completableFuture;
        return completableFuture.whenCompleteAsync((eventHubClient, th) -> {
            if (eventHubClient == null || th != null) {
                TRACE_LOGGER.error(this.hostContext.withHostAndPartition(this.partitionContext, "EventHubClient creation failed"), th);
            } else {
                this.eventHubClient = eventHubClient;
            }
            this.internalOperationFuture = null;
        }, (Executor) this.hostContext.getExecutor()).thenComposeAsync(eventHubClient2 -> {
            return this.partitionContext.getInitialOffset();
        }, (Executor) this.hostContext.getExecutor()).thenComposeAsync((Function<? super U, ? extends CompletionStage<U>>) eventPosition -> {
            CompletableFuture<?> completableFuture2;
            long epoch = this.lease.getEpoch();
            TRACE_LOGGER.info(this.hostContext.withHostAndPartition(this.partitionContext, "Opening EH receiver with epoch " + epoch + " at location " + eventPosition));
            try {
                ReceiverOptions receiverOptions = new ReceiverOptions();
                receiverOptions.setReceiverRuntimeMetricEnabled(this.hostContext.getEventProcessorOptions().getReceiverRuntimeMetricEnabled());
                receiverOptions.setPrefetchCount(this.hostContext.getEventProcessorOptions().getPrefetchCount());
                completableFuture2 = this.eventHubClient.createEpochReceiver(this.partitionContext.getConsumerGroupName(), this.partitionContext.getPartitionId(), eventPosition, epoch, receiverOptions);
                this.internalOperationFuture = completableFuture2;
            } catch (EventHubException e2) {
                TRACE_LOGGER.error(this.hostContext.withHostAndPartition(this.partitionContext, "Opening EH receiver failed with an error "), e2);
                completableFuture2 = new CompletableFuture<>();
                completableFuture2.completeExceptionally(e2);
            }
            return completableFuture2;
        }, (Executor) this.hostContext.getExecutor()).whenCompleteAsync((partitionReceiver, th2) -> {
            if (partitionReceiver != null && th2 == null) {
                this.partitionReceiver = partitionReceiver;
            } else if (this.eventHubClient != null) {
                if (th2 instanceof ReceiverDisconnectedException) {
                    TRACE_LOGGER.info(this.hostContext.withHostAndPartition(this.partitionContext, "PartitionReceiver disconnected during startup"));
                } else {
                    TRACE_LOGGER.error(this.hostContext.withHostAndPartition(this.partitionContext, "PartitionReceiver creation failed"), th2);
                }
            }
            this.internalOperationFuture = null;
        }, (Executor) this.hostContext.getExecutor()).thenApplyAsync(partitionReceiver2 -> {
            this.partitionReceiver.setReceiveTimeout(this.hostContext.getEventProcessorOptions().getReceiveTimeOut());
            TRACE_LOGGER.info(this.hostContext.withHostAndPartition(this.partitionContext, "EH client and receiver creation finished"));
            return true;
        }, (Executor) this.hostContext.getExecutor());
    }

    private CompletableFuture<Void> cleanUpAll(CloseReason closeReason) {
        return cleanUpClients().thenRunAsync(() -> {
            if (this.processor != null) {
                try {
                    synchronized (this.processingSynchronizer) {
                        this.processor.onClose(this.partitionContext, closeReason);
                    }
                } catch (Exception e) {
                    TRACE_LOGGER.warn(this.hostContext.withHostAndPartition(this.partitionContext, "Failure closing processor"), e);
                    this.hostContext.getEventProcessorOptions().notifyOfException(this.hostContext.getHostName(), e, EventProcessorHostActionStrings.CLOSING_EVENT_PROCESSOR, this.lease.getPartitionId());
                }
            }
        }, (Executor) this.hostContext.getExecutor());
    }

    private CompletableFuture<Void> cleanUpClients() {
        CompletableFuture completedFuture;
        if (this.partitionReceiver != null) {
            TRACE_LOGGER.info(this.hostContext.withHostAndPartition(this.partitionContext, "Setting receive handler to null"));
            completedFuture = this.partitionReceiver.setReceiveHandler((PartitionReceiveHandler) null);
        } else {
            TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(this.partitionContext, "partitionReceiver is null in cleanup"));
            completedFuture = CompletableFuture.completedFuture(null);
        }
        return completedFuture.handleAsync((r6, th) -> {
            if (th == null) {
                return null;
            }
            TRACE_LOGGER.warn(this.hostContext.withHostAndPartition(this.partitionContext, "Got exception when ReceiveHandler is set to null."), LoggingUtils.unwrapException(th, null));
            return null;
        }, (Executor) this.hostContext.getExecutor()).thenApplyAsync(obj -> {
            TRACE_LOGGER.info(this.hostContext.withHostAndPartition(this.partitionContext, "Closing EH receiver"));
            PartitionReceiver partitionReceiver = this.partitionReceiver;
            this.partitionReceiver = null;
            return partitionReceiver;
        }, (Executor) this.hostContext.getExecutor()).thenComposeAsync(partitionReceiver -> {
            return partitionReceiver != null ? partitionReceiver.close() : CompletableFuture.completedFuture(null);
        }, (Executor) this.hostContext.getExecutor()).handleAsync((r62, th2) -> {
            if (th2 == null) {
                return null;
            }
            TRACE_LOGGER.warn(this.hostContext.withHostAndPartition(this.partitionContext, "Closing EH receiver failed."), LoggingUtils.unwrapException(th2, null));
            return null;
        }, (Executor) this.hostContext.getExecutor()).thenApplyAsync(obj2 -> {
            TRACE_LOGGER.info(this.hostContext.withHostAndPartition(this.partitionContext, "Closing EH client"));
            EventHubClient eventHubClient = this.eventHubClient;
            this.eventHubClient = null;
            if (eventHubClient == null) {
                TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(this.partitionContext, "eventHubClient is null in cleanup"));
            }
            return eventHubClient;
        }, (Executor) this.hostContext.getExecutor()).thenComposeAsync(eventHubClient -> {
            return eventHubClient != null ? eventHubClient.close() : CompletableFuture.completedFuture(null);
        }, (Executor) this.hostContext.getExecutor()).handleAsync((r63, th3) -> {
            if (th3 == null) {
                return null;
            }
            TRACE_LOGGER.warn(this.hostContext.withHostAndPartition(this.partitionContext, "Closing EH client failed."), LoggingUtils.unwrapException(th3, null));
            return null;
        }, (Executor) this.hostContext.getExecutor());
    }

    protected Void cancelPendingOperations() {
        CompletableFuture<?> completableFuture = this.internalOperationFuture;
        if (completableFuture != null) {
            completableFuture.cancel(true);
        }
        ScheduledFuture<?> scheduledFuture = this.leaseRenewerFuture;
        if (scheduledFuture == null) {
            return null;
        }
        scheduledFuture.cancel(true);
        return null;
    }

    private CompletableFuture<Void> releaseLeaseOnShutdown() {
        CompletableFuture<Void> completedFuture = CompletableFuture.completedFuture(null);
        if (this.shutdownReason != CloseReason.LeaseLost) {
            completedFuture = this.hostContext.getLeaseManager().releaseLease(this.lease).handleAsync((r6, th) -> {
                if (th == null) {
                    return null;
                }
                TRACE_LOGGER.warn(this.hostContext.withHostAndPartition(this.partitionContext, "Failure releasing lease on pump shutdown"), LoggingUtils.unwrapException(th, null));
                return null;
            }, (Executor) this.hostContext.getExecutor());
        }
        return completedFuture;
    }

    protected void internalShutdown(CloseReason closeReason, Throwable th) {
        setClosing();
        this.shutdownReason = closeReason;
        if (th == null) {
            this.shutdownTriggerFuture.complete(null);
        } else {
            this.shutdownTriggerFuture.completeExceptionally(th);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<Void> shutdown(CloseReason closeReason) {
        TRACE_LOGGER.info(this.hostContext.withHostAndPartition(this.partitionContext, "pump shutdown for reason " + closeReason.toString()));
        internalShutdown(closeReason, null);
        return this.shutdownFinishedFuture;
    }

    private void leaseRenewer() {
        TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(this.lease, "leaseRenewer()"));
        if (this.leaseRenewerFuture.isCancelled() || getIsClosingOrClosed()) {
            return;
        }
        this.hostContext.getLeaseManager().renewLease(this.lease).thenApplyAsync(bool -> {
            Boolean bool = true;
            if (!bool.booleanValue()) {
                TRACE_LOGGER.info(this.hostContext.withHostAndPartition(this.lease, "Lease lost, shutting down pump"));
                internalShutdown(CloseReason.LeaseLost, null);
                bool = false;
            }
            return bool;
        }, (Executor) this.hostContext.getExecutor()).whenCompleteAsync((BiConsumer<? super U, ? super Throwable>) (bool2, th) -> {
            if (th != null) {
                Exception exc = (Exception) LoggingUtils.unwrapException(th, null);
                TRACE_LOGGER.info(this.hostContext.withHostAndPartition(this.lease, "Transient failure renewing lease"), exc);
                this.hostContext.getEventProcessorOptions().notifyOfException(this.hostContext.getHostName(), exc, EventProcessorHostActionStrings.RENEWING_LEASE, this.lease.getPartitionId());
            }
            if (bool2 == null || !bool2.booleanValue() || this.leaseRenewerFuture.isCancelled() || getIsClosingOrClosed()) {
                return;
            }
            scheduleLeaseRenewer();
        }, (Executor) this.hostContext.getExecutor());
    }

    public int getMaxEventCount() {
        return this.hostContext.getEventProcessorOptions().getMaxBatchSize();
    }

    public void onReceive(Iterable<EventData> iterable) {
        EventData eventData;
        if (this.hostContext.getEventProcessorOptions().getReceiverRuntimeMetricEnabled()) {
            this.partitionContext.setRuntimeInformation(this.partitionReceiver.getRuntimeInformation());
        }
        Iterable<EventData> iterable2 = iterable;
        if (iterable2 == null) {
            iterable2 = new ArrayList();
        }
        Iterator<EventData> it = iterable2.iterator();
        EventData eventData2 = null;
        while (true) {
            eventData = eventData2;
            if (!it.hasNext()) {
                break;
            } else {
                eventData2 = it.next();
            }
        }
        if (eventData != null) {
            this.partitionContext.setOffsetAndSequenceNumber(eventData);
        }
        try {
            synchronized (this.processingSynchronizer) {
                this.processor.onEvents(this.partitionContext, iterable2);
            }
        } catch (Exception e) {
            TRACE_LOGGER.warn(this.hostContext.withHostAndPartition(this.partitionContext, "Got exception from onEvents"), e);
        }
    }

    public void onError(Throwable th) {
        if (th == null) {
            th = new Throwable("No error info supplied by EventHub client");
        }
        if (th instanceof ReceiverDisconnectedException) {
            TRACE_LOGGER.info(this.hostContext.withHostAndPartition(this.partitionContext, "EventHub client disconnected, probably another host took the partition"));
        } else {
            TRACE_LOGGER.warn(this.hostContext.withHostAndPartition(this.partitionContext, "EventHub client error: " + th.toString()));
            if (th instanceof Exception) {
                TRACE_LOGGER.warn(this.hostContext.withHostAndPartition(this.partitionContext, "EventHub client error continued"), (Exception) th);
            }
        }
        Throwable th2 = th;
        CompletableFuture.runAsync(() -> {
            this.processor.onError(this.partitionContext, th2);
        }, this.hostContext.getExecutor()).thenRunAsync(() -> {
            internalShutdown(CloseReason.Shutdown, th2);
        }, (Executor) this.hostContext.getExecutor());
    }
}
