/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.broker.region.cursors;

import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor;
import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.TopicStorePrefetch;
import org.apache.activemq.command.Message;
import org.apache.activemq.kaha.Store;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Pending-message cursor for a durable topic subscriber.
 *
 * <p>The cursor multiplexes over several underlying cursors: one
 * {@link FilePendingMessageCursor} that receives non-persistent messages, and
 * one {@link TopicStorePrefetch} per destination registered through
 * {@link #add(ConnectionContext, Destination)}. An aggregate
 * {@code pendingCount} is maintained across all of them.
 *
 * <p>Every method that reads or writes mutable state synchronizes on this
 * instance.
 */
public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
    private static final Log log = LogFactory.getLog(StoreDurableSubscriberCursor.class);

    /** Aggregate number of pending messages tracked by this cursor. */
    private int pendingCount = 0;
    private String clientId;
    private String subscriberName;
    /** Maps a {@link Destination} to the {@link TopicStorePrefetch} created for it in {@code add}. */
    private Map topics = new HashMap();
    /** Every underlying cursor (the non-persistent one plus all prefetches), in scan order. */
    private LinkedList storePrefetches = new LinkedList();
    private boolean started;
    private PendingMessageCursor nonPersistent;
    /** Cursor selected by the most recent {@link #hasNext()}; may be {@code null}. */
    private PendingMessageCursor currentCursor;

    /**
     * Creates the cursor and its non-persistent sub-cursor.
     *
     * @param clientId       client id of the durable subscriber
     * @param subscriberName name of the durable subscription
     * @param store          store handed to the {@link FilePendingMessageCursor}
     * @param maxBatchSize   NOTE(review): this argument is not used by the constructor;
     *                       the batch size propagated to sub-cursors comes from
     *                       {@code getMaxBatchSize()} instead. Confirm whether it
     *                       should be applied here.
     */
    public StoreDurableSubscriberCursor(String clientId, String subscriberName, Store store, int maxBatchSize) {
        this.clientId = clientId;
        this.subscriberName = subscriberName;
        this.nonPersistent = new FilePendingMessageCursor(clientId + subscriberName, store);
        this.storePrefetches.add(this.nonPersistent);
    }

    /**
     * Starts every underlying cursor and adds each cursor's size to the
     * pending count.
     *
     * <p>Calling this method while the cursor is already started is a no-op,
     * so that the sizes of the underlying cursors are not counted twice.
     *
     * @throws Exception if an underlying cursor fails to start
     */
    public synchronized void start() throws Exception {
        if (this.started) {
            // Already running: restarting would add every sub-cursor's size again.
            return;
        }
        this.started = true;
        for (Iterator i = this.storePrefetches.iterator(); i.hasNext();) {
            PendingMessageCursor cursor = (PendingMessageCursor) i.next();
            cursor.start();
            this.pendingCount += cursor.size();
        }
    }

    /**
     * Stops every underlying cursor and resets the pending count to zero.
     *
     * @throws Exception if an underlying cursor fails to stop; in that case the
     *                   remaining cursors are not stopped and the count is not reset
     */
    public synchronized void stop() throws Exception {
        this.started = false;
        for (Iterator i = this.storePrefetches.iterator(); i.hasNext();) {
            PendingMessageCursor cursor = (PendingMessageCursor) i.next();
            cursor.stop();
        }
        this.pendingCount = 0;
    }

    /**
     * Registers a destination by creating a {@link TopicStorePrefetch} for it.
     * If this cursor is already started, the new prefetch is started
     * immediately and its size is added to the pending count.
     *
     * @param context     connection context (not used by this method)
     * @param destination destination to add; it is cast to {@link Topic}
     * @throws Exception if the new prefetch fails to start
     */
    public synchronized void add(ConnectionContext context, Destination destination) throws Exception {
        TopicStorePrefetch prefetch = new TopicStorePrefetch((Topic) destination, this.clientId, this.subscriberName);
        prefetch.setMaxBatchSize(this.getMaxBatchSize());
        this.topics.put(destination, prefetch);
        this.storePrefetches.add(prefetch);
        if (this.started) {
            prefetch.start();
            this.pendingCount += prefetch.size();
        }
    }

    /**
     * Unregisters the prefetch associated with the given destination, if any.
     *
     * <p>NOTE(review): the removed prefetch is not stopped, the pending count
     * is not adjusted, and {@code currentCursor} is left untouched even if it
     * refers to the removed prefetch. Confirm this is intended.
     *
     * @param context     connection context (not used by this method)
     * @param destination destination to remove
     * @throws Exception declared by the signature; nothing in this body throws a checked exception
     */
    public synchronized void remove(ConnectionContext context, Destination destination) throws Exception {
        Object prefetch = this.topics.remove(destination);
        if (prefetch != null) {
            this.storePrefetches.remove(prefetch);
        }
    }

    /**
     * @return {@code true} when the pending count is zero or negative
     */
    public synchronized boolean isEmpty() {
        return this.pendingCount <= 0;
    }

    /**
     * @return always {@code false}
     */
    public boolean isRecoveryRequired() {
        return false;
    }

    /**
     * Routes a message to the appropriate underlying cursor.
     *
     * <p>While started, the pending count is incremented for every message and
     * non-persistent messages are appended to the non-persistent cursor. While
     * not started, non-persistent messages are not stored by this method.
     * Persistent messages are appended to the prefetch registered for their
     * region destination regardless of the started state; if no prefetch is
     * registered for that destination they are not stored here.
     *
     * @param node message reference to add; {@code null} is ignored
     * @throws Exception if an underlying cursor rejects the message
     */
    public synchronized void addMessageLast(MessageReference node) throws Exception {
        if (node == null) {
            return;
        }
        Message msg = node.getMessage();
        if (this.started) {
            ++this.pendingCount;
            if (!msg.isPersistent()) {
                this.nonPersistent.addMessageLast(node);
            }
        }
        if (!msg.isPersistent()) {
            return;
        }
        Destination dest = msg.getRegionDestination();
        TopicStorePrefetch prefetch = (TopicStorePrefetch) this.topics.get(dest);
        if (prefetch == null) {
            return;
        }
        prefetch.addMessageLast(node);
        // Presumably: when nothing but non-persistent messages is counted as
        // pending, point the prefetch directly at this message - TODO confirm
        // against TopicStorePrefetch.nextToDispatch.
        if (this.started && this.pendingCount - this.nonPersistent.size() <= 0) {
            prefetch.nextToDispatch(node.getMessageId());
        }
    }

    /**
     * Resets the pending count to zero. The underlying cursors are not touched.
     */
    public synchronized void clear() {
        this.pendingCount = 0;
    }

    /**
     * Selects the cursor to read from and reports whether it has a message.
     *
     * @return {@code false} if the pending count is not positive or no
     *         underlying cursor has a next message; otherwise the selected
     *         cursor's {@code hasNext()} result
     * @throws RuntimeException wrapping any exception raised while selecting the cursor
     */
    public synchronized boolean hasNext() {
        if (this.pendingCount <= 0) {
            return false;
        }
        try {
            this.currentCursor = this.getNextCursor();
        } catch (Exception e) {
            log.error("Failed to get current cursor ", e);
            throw new RuntimeException(e);
        }
        if (this.currentCursor == null) {
            return false;
        }
        return this.currentCursor.hasNext();
    }

    /**
     * @return the next message from the currently selected cursor, or
     *         {@code null} if no cursor is selected
     */
    public synchronized MessageReference next() {
        if (this.currentCursor == null) {
            return null;
        }
        return this.currentCursor.next();
    }

    /**
     * Removes the current message from the selected cursor (if one is
     * selected) and decrements the pending count.
     *
     * <p>NOTE(review): the count is decremented even when no cursor is
     * selected, so it can become negative; {@link #isEmpty()} treats negative
     * values as empty.
     */
    public synchronized void remove() {
        if (this.currentCursor != null) {
            this.currentCursor.remove();
        }
        --this.pendingCount;
    }

    /**
     * Resets every underlying cursor.
     */
    public synchronized void reset() {
        for (Iterator i = this.storePrefetches.iterator(); i.hasNext();) {
            AbstractPendingMessageCursor cursor = (AbstractPendingMessageCursor) i.next();
            cursor.reset();
        }
    }

    /**
     * @return the current pending count
     */
    public synchronized int size() {
        return this.pendingCount;
    }

    /**
     * Returns the cursor to read from next.
     *
     * <p>The current cursor is kept as long as it is non-empty. Otherwise the
     * underlying cursors are scanned in list order, each is given the current
     * max batch size, and the first one whose {@code hasNext()} is true is
     * selected. After a scan the list is rotated by one position, whether or
     * not a cursor was found, so the following scan starts at a different
     * cursor.
     *
     * @return the selected cursor, or {@code null} if none has a next message
     * @throws Exception declared by the signature for failures in underlying cursors
     */
    protected synchronized PendingMessageCursor getNextCursor() throws Exception {
        if (this.currentCursor == null || this.currentCursor.isEmpty()) {
            this.currentCursor = null;
            for (Iterator i = this.storePrefetches.iterator(); i.hasNext();) {
                AbstractPendingMessageCursor candidate = (AbstractPendingMessageCursor) i.next();
                candidate.setMaxBatchSize(this.getMaxBatchSize());
                if (candidate.hasNext()) {
                    this.currentCursor = candidate;
                    break;
                }
            }
            // Move the head of the list to the tail. The list always contains at
            // least the non-persistent cursor added in the constructor.
            Object head = this.storePrefetches.removeFirst();
            this.storePrefetches.addLast(head);
        }
        return this.currentCursor;
    }
}

