package org.apache.servicecomb.saga.alpha.core;

import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.servicecomb.saga.common.EventType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/servicecomb/saga/alpha/core/EventScanner.class */
public class EventScanner implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private static final byte[] EMPTY_PAYLOAD = new byte[0];
    private final ScheduledExecutorService scheduler;
    private final TxEventRepository eventRepository;
    private final CommandRepository commandRepository;
    private final TxTimeoutRepository timeoutRepository;
    private final OmegaCallback omegaCallback;
    private final int eventPollingInterval;
    private long nextEndedEventId;
    private long nextCompensatedEventId;

    public EventScanner(ScheduledExecutorService scheduledExecutorService, TxEventRepository txEventRepository, CommandRepository commandRepository, TxTimeoutRepository txTimeoutRepository, OmegaCallback omegaCallback, int i) {
        this.scheduler = scheduledExecutorService;
        this.eventRepository = txEventRepository;
        this.commandRepository = commandRepository;
        this.timeoutRepository = txTimeoutRepository;
        this.omegaCallback = omegaCallback;
        this.eventPollingInterval = i;
    }

    @Override // java.lang.Runnable
    public void run() {
        pollEvents();
    }

    private void pollEvents() {
        this.scheduler.scheduleWithFixedDelay(() -> {
            updateTimeoutStatus();
            findTimeoutEvents();
            abortTimeoutEvents();
            saveUncompensatedEventsToCommands();
            compensate();
            updateCompensatedCommands();
            deleteDuplicateSagaEndedEvents();
            updateTransactionStatus();
        }, 0L, this.eventPollingInterval, TimeUnit.MILLISECONDS);
    }

    private void findTimeoutEvents() {
        this.eventRepository.findTimeoutEvents().forEach(txEvent -> {
            LOG.info("Found timeout event {}", txEvent);
            this.timeoutRepository.save(txTimeoutOf(txEvent));
        });
    }

    private void updateTimeoutStatus() {
        this.timeoutRepository.markTimeoutAsDone();
    }

    private void saveUncompensatedEventsToCommands() {
        this.eventRepository.findFirstUncompensatedEventByIdGreaterThan(this.nextEndedEventId, EventType.TxEndedEvent.name()).forEach(txEvent -> {
            LOG.info("Found uncompensated event {}", txEvent);
            this.nextEndedEventId = txEvent.id();
            this.commandRepository.saveCompensationCommands(txEvent.globalTxId());
        });
    }

    private void updateCompensatedCommands() {
        this.eventRepository.findFirstCompensatedEventByIdGreaterThan(this.nextCompensatedEventId).ifPresent(txEvent -> {
            LOG.info("Found compensated event {}", txEvent);
            this.nextCompensatedEventId = txEvent.id();
            updateCompensationStatus(txEvent);
        });
    }

    private void deleteDuplicateSagaEndedEvents() {
        try {
            this.eventRepository.deleteDuplicateEvents(EventType.SagaEndedEvent.name());
        } catch (Exception e) {
            LOG.warn("Failed to delete duplicate event", e);
        }
    }

    private void updateCompensationStatus(TxEvent txEvent) {
        this.commandRepository.markCommandAsDone(txEvent.globalTxId(), txEvent.localTxId());
        LOG.info("Transaction with globalTxId {} and localTxId {} was compensated", txEvent.globalTxId(), txEvent.localTxId());
        markSagaEnded(txEvent);
    }

    private void abortTimeoutEvents() {
        this.timeoutRepository.findFirstTimeout().forEach(txTimeout -> {
            LOG.info("Found timeout event {} to abort", txTimeout);
            this.eventRepository.save(toTxAbortedEvent(txTimeout));
            if (txTimeout.type().equals(EventType.TxStartedEvent.name())) {
                Optional<TxEvent> findTxStartedEvent = this.eventRepository.findTxStartedEvent(txTimeout.globalTxId(), txTimeout.localTxId());
                OmegaCallback omegaCallback = this.omegaCallback;
                omegaCallback.getClass();
                findTxStartedEvent.ifPresent(omegaCallback::compensate);
            }
        });
    }

    private void updateTransactionStatus() {
        this.eventRepository.findFirstAbortedGlobalTransaction().ifPresent(this::markGlobalTxEndWithEvents);
    }

    private void markSagaEnded(TxEvent txEvent) {
        if (this.commandRepository.findUncompletedCommands(txEvent.globalTxId()).isEmpty()) {
            markGlobalTxEndWithEvent(txEvent);
        }
    }

    private void markGlobalTxEndWithEvent(TxEvent txEvent) {
        this.eventRepository.save(toSagaEndedEvent(txEvent));
        LOG.info("Marked end of transaction with globalTxId {}", txEvent.globalTxId());
    }

    private void markGlobalTxEndWithEvents(List<TxEvent> list) {
        list.forEach(this::markGlobalTxEndWithEvent);
    }

    private TxEvent toTxAbortedEvent(TxTimeout txTimeout) {
        return new TxEvent(txTimeout.serviceName(), txTimeout.instanceId(), txTimeout.globalTxId(), txTimeout.localTxId(), txTimeout.parentTxId(), EventType.TxAbortedEvent.name(), "", "Transaction timeout".getBytes());
    }

    private TxEvent toSagaEndedEvent(TxEvent txEvent) {
        return new TxEvent(txEvent.serviceName(), txEvent.instanceId(), txEvent.globalTxId(), txEvent.globalTxId(), null, EventType.SagaEndedEvent.name(), "", EMPTY_PAYLOAD);
    }

    private void compensate() {
        this.commandRepository.findFirstCommandToCompensate().forEach(command -> {
            LOG.info("Compensating transaction with globalTxId {} and localTxId {}", command.globalTxId(), command.localTxId());
            this.omegaCallback.compensate(txStartedEventOf(command));
        });
    }

    private TxEvent txStartedEventOf(Command command) {
        return new TxEvent(command.serviceName(), command.instanceId(), command.globalTxId(), command.localTxId(), command.parentTxId(), EventType.TxStartedEvent.name(), command.compensationMethod(), command.payloads());
    }

    private TxTimeout txTimeoutOf(TxEvent txEvent) {
        return new TxTimeout(txEvent.id(), txEvent.serviceName(), txEvent.instanceId(), txEvent.globalTxId(), txEvent.localTxId(), txEvent.parentTxId(), txEvent.type(), txEvent.expiryTime(), TaskStatus.NEW.name());
    }
}
