/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.eventprocessorhost;

import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import com.microsoft.azure.eventhubs.IllegalEntityException;
import com.microsoft.azure.eventprocessorhost.Closable;
import com.microsoft.azure.eventprocessorhost.CloseReason;
import com.microsoft.azure.eventprocessorhost.CompleteLease;
import com.microsoft.azure.eventprocessorhost.HostContext;
import com.microsoft.azure.eventprocessorhost.ICheckpointManager;
import com.microsoft.azure.eventprocessorhost.ILeaseManager;
import com.microsoft.azure.eventprocessorhost.LoggingUtils;
import com.microsoft.azure.eventprocessorhost.PartitionScanner;
import com.microsoft.azure.eventprocessorhost.PumpManager;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class PartitionManager
extends Closable {
    private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(PartitionManager.class);
    protected final HostContext hostContext;
    private final Object scanFutureSynchronizer = new Object();
    private final int retryMax = 5;
    protected PumpManager pumpManager = null;
    protected volatile String[] partitionIds = null;
    private ScheduledFuture<?> scanFuture = null;

    PartitionManager(HostContext hostContext) {
        super(null);
        this.hostContext = hostContext;
    }

    CompletableFuture<Void> cachePartitionIds() {
        CompletionStage<Object> retval = null;
        if (this.partitionIds != null) {
            retval = CompletableFuture.completedFuture(null);
        } else {
            try {
                CompletableFuture cleanupFuture = new CompletableFuture();
                retval = ((CompletableFuture)((CompletableFuture)((CompletableFuture)this.hostContext.getEventHubClientFactory().createEventHubClient().thenApplyAsync(ehClient -> {
                    EventHubClient saveForCleanupClient = ehClient;
                    cleanupFuture.thenComposeAsync(empty -> saveForCleanupClient.close(), (Executor)this.hostContext.getExecutor());
                    return ehClient;
                }, (Executor)this.hostContext.getExecutor())).thenComposeAsync(ehClient -> ehClient.getRuntimeInformation(), (Executor)this.hostContext.getExecutor())).thenAcceptAsync(ehInfo -> {
                    if (ehInfo != null) {
                        this.partitionIds = ehInfo.getPartitionIds();
                        TRACE_LOGGER.info(this.hostContext.withHost("Eventhub " + this.hostContext.getEventHubPath() + " count of partitions: " + ehInfo.getPartitionCount()));
                        for (String id : this.partitionIds) {
                            TRACE_LOGGER.info(this.hostContext.withHost("Found partition with id: " + id));
                        }
                    } else {
                        throw new CompletionException(new TimeoutException("getRuntimeInformation returned null"));
                    }
                }, (Executor)this.hostContext.getExecutor())).handleAsync((empty, e) -> {
                    cleanupFuture.complete(null);
                    if (e != null) {
                        Throwable notifyWith = e;
                        if (e instanceof CompletionException) {
                            notifyWith = e.getCause();
                        }
                        throw new CompletionException((Throwable)new IllegalEntityException("Failure getting partition ids for event hub", notifyWith));
                    }
                    return null;
                }, (Executor)this.hostContext.getExecutor());
            }
            catch (EventHubException | IOException e2) {
                retval = new CompletableFuture();
                retval.completeExceptionally((Throwable)new IllegalEntityException("Failure getting partition ids for event hub", e2));
            }
        }
        return retval;
    }

    PumpManager createPumpTestHook() {
        return new PumpManager(this.hostContext, this);
    }

    void onInitializeCompleteTestHook() {
    }

    void onPartitionCheckCompleteTestHook() {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    CompletableFuture<Void> stopPartitions() {
        this.setClosing();
        Object object = this.scanFutureSynchronizer;
        synchronized (object) {
            if (this.scanFuture != null) {
                this.scanFuture.cancel(true);
            }
        }
        CompletionStage<Object> stopping = CompletableFuture.completedFuture(null);
        if (this.pumpManager != null) {
            TRACE_LOGGER.info(this.hostContext.withHost("Shutting down all pumps"));
            stopping = this.pumpManager.removeAllPumps(CloseReason.Shutdown).whenCompleteAsync((empty, e) -> {
                if (e != null) {
                    Throwable notifyWith = LoggingUtils.unwrapException(e, null);
                    TRACE_LOGGER.warn(this.hostContext.withHost("Failure during shutdown"), notifyWith);
                    if (notifyWith instanceof Exception) {
                        this.hostContext.getEventProcessorOptions().notifyOfException(this.hostContext.getHostName(), (Exception)notifyWith, "Partition Manager Cleanup");
                    }
                }
            }, (Executor)this.hostContext.getExecutor());
        }
        stopping = stopping.whenCompleteAsync((empty, e) -> {
            TRACE_LOGGER.info(this.hostContext.withHost("Partition manager exiting"));
            this.setClosed();
        }, (Executor)this.hostContext.getExecutor());
        return stopping;
    }

    public CompletableFuture<Void> initialize() {
        this.pumpManager = this.createPumpTestHook();
        return ((CompletableFuture)((CompletableFuture)this.cachePartitionIds().thenComposeAsync(unused -> this.initializeStores(), (Executor)this.hostContext.getExecutor())).whenCompleteAsync((empty, e) -> {
            if (e != null) {
                StringBuilder outAction = new StringBuilder();
                Throwable notifyWith = LoggingUtils.unwrapException(e, outAction);
                if (outAction.length() > 0) {
                    TRACE_LOGGER.error(this.hostContext.withHost("Exception while initializing stores (" + outAction.toString() + "), not starting partition manager"), notifyWith);
                } else {
                    TRACE_LOGGER.error(this.hostContext.withHost("Exception while initializing stores, not starting partition manager"), notifyWith);
                }
            }
        }, (Executor)this.hostContext.getExecutor())).thenRunAsync(() -> {
            Object object = this.scanFutureSynchronizer;
            synchronized (object) {
                TRACE_LOGGER.debug(this.hostContext.withHost("Scheduling lease scanner first pass"));
                this.scanFuture = this.hostContext.getExecutor().schedule(() -> this.scan(true), 0L, TimeUnit.SECONDS);
            }
            this.onInitializeCompleteTestHook();
        }, this.hostContext.getExecutor());
    }

    private CompletableFuture<?> initializeStores() {
        ILeaseManager leaseManager = this.hostContext.getLeaseManager();
        ICheckpointManager checkpointManager = this.hostContext.getCheckpointManager();
        CompletableFuture<?> initializeStoresFuture = this.buildRetries(CompletableFuture.completedFuture(null), () -> leaseManager.createLeaseStoreIfNotExists(), "Failure creating lease store for this Event Hub, retrying", "Out of retries creating lease store for this Event Hub", "Creating Lease Store", this.retryMax);
        initializeStoresFuture = this.buildRetries(initializeStoresFuture, () -> checkpointManager.createCheckpointStoreIfNotExists(), "Failure creating checkpoint store for this Event Hub, retrying", "Out of retries creating checkpoint store for this Event Hub", "Creating Checkpoint Store", this.retryMax);
        initializeStoresFuture = this.buildRetries(initializeStoresFuture, () -> leaseManager.createAllLeasesIfNotExists(Arrays.asList(this.partitionIds)), "Failure creating leases, retrying", "Out of retries creating leases", "Creating Leases", this.retryMax);
        initializeStoresFuture = this.buildRetries(initializeStoresFuture, () -> checkpointManager.createAllCheckpointsIfNotExists(Arrays.asList(this.partitionIds)), "Failure creating checkpoint holders, retrying", "Out of retries creating checkpoint holders", "Creating Checkpoint Holders", this.retryMax);
        initializeStoresFuture.whenCompleteAsync((r, e) -> {
            if (e != null && e instanceof FinalException) {
                throw ((FinalException)e).getInner();
            }
        }, (Executor)this.hostContext.getExecutor());
        return initializeStoresFuture;
    }

    private CompletableFuture<?> buildRetries(CompletableFuture<?> buildOnto, Callable<CompletableFuture<?>> lambda, String retryMessage, String finalFailureMessage, String action, int maxRetries) {
        CompletionStage retryChain = buildOnto.thenComposeAsync(unused -> {
            CompletableFuture newresult = CompletableFuture.completedFuture(null);
            try {
                newresult = (CompletableFuture)lambda.call();
            }
            catch (Exception e1) {
                throw new CompletionException(e1);
            }
            return newresult;
        }, (Executor)this.hostContext.getExecutor());
        for (int i = 1; i < maxRetries; ++i) {
            retryChain = ((CompletableFuture)((CompletableFuture)retryChain).handleAsync((r, e) -> {
                Object effectiveResult = r;
                if (e != null) {
                    if (e instanceof FinalException) {
                        throw (FinalException)e;
                    }
                    TRACE_LOGGER.warn(this.hostContext.withHost(retryMessage), LoggingUtils.unwrapException(e, null));
                } else if (r == null) {
                    effectiveResult = true;
                }
                return e == null ? effectiveResult : null;
            }, (Executor)this.hostContext.getExecutor())).thenComposeAsync(oldresult -> {
                CompletableFuture newresult = CompletableFuture.completedFuture(oldresult);
                if (oldresult == null) {
                    try {
                        newresult = (CompletableFuture)lambda.call();
                    }
                    catch (Exception e1) {
                        throw new CompletionException(e1);
                    }
                }
                return newresult;
            }, (Executor)this.hostContext.getExecutor());
        }
        retryChain = ((CompletableFuture)retryChain).handleAsync((r, e) -> {
            if (e != null) {
                if (e instanceof FinalException) {
                    throw (FinalException)e;
                }
                TRACE_LOGGER.warn(this.hostContext.withHost(finalFailureMessage));
                throw new FinalException(LoggingUtils.wrapExceptionWithMessage(LoggingUtils.unwrapException(e, null), finalFailureMessage, action));
            }
            return r;
        }, (Executor)this.hostContext.getExecutor());
        return retryChain;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Void scan(boolean isFirst) {
        block5: {
            TRACE_LOGGER.debug(this.hostContext.withHost("Starting lease scan"));
            long start = System.currentTimeMillis();
            try {
                new PartitionScanner(this.hostContext, lease -> this.pumpManager.addPump((CompleteLease)lease), this).scan(isFirst).whenCompleteAsync((didSteal, e) -> {
                    TRACE_LOGGER.debug(this.hostContext.withHost("Scanning took " + (System.currentTimeMillis() - start)));
                    if (e != null && !(e instanceof Closable.ClosingException)) {
                        TRACE_LOGGER.warn(this.hostContext.withHost("Lease scanner got exception"), e);
                    }
                    this.onPartitionCheckCompleteTestHook();
                    if (!this.getIsClosingOrClosed()) {
                        int seconds;
                        int n = seconds = didSteal != false ? this.hostContext.getPartitionManagerOptions().getFastScanIntervalInSeconds() : this.hostContext.getPartitionManagerOptions().getSlowScanIntervalInSeconds();
                        if (isFirst) {
                            seconds = this.hostContext.getPartitionManagerOptions().getStartupScanDelayInSeconds();
                        }
                        Object object = this.scanFutureSynchronizer;
                        synchronized (object) {
                            this.scanFuture = this.hostContext.getExecutor().schedule(() -> this.scan(false), (long)seconds, TimeUnit.SECONDS);
                        }
                        TRACE_LOGGER.debug(this.hostContext.withHost("Scheduling lease scanner in " + seconds));
                    } else {
                        TRACE_LOGGER.warn(this.hostContext.withHost("Not scheduling lease scanner due to shutdown"));
                    }
                }, (Executor)this.hostContext.getExecutor());
            }
            catch (Exception e2) {
                TRACE_LOGGER.error(this.hostContext.withHost("Lease scanner threw directly"), (Throwable)e2);
                if (this.getIsClosingOrClosed()) break block5;
                int seconds = this.hostContext.getPartitionManagerOptions().getSlowScanIntervalInSeconds();
                Object object = this.scanFutureSynchronizer;
                synchronized (object) {
                    this.scanFuture = this.hostContext.getExecutor().schedule(() -> this.scan(false), (long)seconds, TimeUnit.SECONDS);
                }
                TRACE_LOGGER.debug(this.hostContext.withHost("Forced schedule of lease scanner in " + seconds));
            }
        }
        return null;
    }

    static class FinalException
    extends CompletionException {
        private static final long serialVersionUID = -4600271981700687166L;

        FinalException(CompletionException e) {
            super(e);
        }

        CompletionException getInner() {
            return (CompletionException)this.getCause();
        }
    }
}

