/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.broker;

import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import java.util.ArrayList;
import java.util.Iterator;
import javax.jms.JMSException;
import javax.transaction.xa.XAException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.transaction.LocalTransaction;
import org.apache.activemq.transaction.Transaction;
import org.apache.activemq.transaction.XATransaction;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.WrappedException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Broker filter that associates sends and acknowledgements with transactions
 * before passing them on to the next broker in the chain.
 *
 * <p>XA transactions are held in a map owned by this broker and keyed by
 * transaction id; local transactions are held in the map supplied by the
 * {@link ConnectionContext} of the connection that started them.
 */
public class TransactionBroker extends BrokerFilter {
    private static final Log log = LogFactory.getLog(TransactionBroker.class);

    /** Store handed to every transaction created by this broker and used for recovery on start. */
    private final TransactionStore transactionStore;

    /** XA transactions keyed by their {@link TransactionId}; values are {@link Transaction} instances. */
    private final ConcurrentHashMap xaTransactions = new ConcurrentHashMap();

    /**
     * @param next             the broker that operations are delegated to
     * @param transactionStore the store used to create and recover transactions
     */
    public TransactionBroker(Broker next, TransactionStore transactionStore) {
        super(next);
        this.transactionStore = transactionStore;
    }

    /**
     * Starts the next broker and the transaction store, then replays every
     * transaction reported by the store's recovery callback.
     *
     * <p>Each recovered XA transaction is begun, its messages are re-sent, its
     * acknowledgements are re-applied, and it is then prepared again. The replay
     * runs against a dedicated context that is flagged as being in recovery mode
     * and has producer flow control switched off.
     *
     * @throws Exception if a delegate fails to start, or if recovery fails (in
     *                   which case the exception is built by
     *                   {@code IOExceptionSupport.create} around the original cause)
     */
    public void start() throws Exception {
        this.next.start();
        this.transactionStore.start();
        try {
            final ConnectionContext context = new ConnectionContext();
            context.setBroker(this);
            context.setInRecoveryMode(true);
            context.setTransactions(new ConcurrentHashMap());
            context.setProducerFlowControl(false);
            this.transactionStore.recover(new TransactionRecoveryListener(){

                public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) {
                    try {
                        TransactionBroker.this.beginTransaction(context, xid);
                        for (int i = 0; i < addedMessages.length; ++i) {
                            TransactionBroker.this.send(context, addedMessages[i]);
                        }
                        for (int i = 0; i < aks.length; ++i) {
                            TransactionBroker.this.acknowledge(context, aks[i]);
                        }
                        TransactionBroker.this.prepareTransaction(context, xid);
                    }
                    catch (Throwable e) {
                        // The listener signature declares no checked exceptions, so the
                        // failure is tunnelled out and unwrapped by the enclosing catch.
                        throw new WrappedException(e);
                    }
                }
            });
        }
        catch (WrappedException e) {
            Throwable cause = e.getCause();
            throw IOExceptionSupport.create("Recovery Failed: " + cause.getMessage(), cause);
        }
    }

    /**
     * Stops the transaction store and then the next broker.
     *
     * <p>The next broker is stopped even when stopping the store throws, so a
     * store failure cannot leave the rest of the chain running.
     *
     * @throws Exception if either delegate fails to stop
     */
    public void stop() throws Exception {
        try {
            this.transactionStore.stop();
        }
        finally {
            this.next.stop();
        }
    }

    /**
     * Returns the ids of all XA transactions known to this broker that report
     * themselves as prepared.
     *
     * @param context the calling connection's context (not used here)
     * @return an array of prepared transaction ids; empty when there are none
     */
    public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
        ArrayList<TransactionId> txs = new ArrayList<TransactionId>();
        for (Object value : this.xaTransactions.values()) {
            Transaction tx = (Transaction)value;
            if (tx.isPrepared()) {
                txs.add(tx.getTransactionId());
            }
        }
        return txs.toArray(new XATransactionId[txs.size()]);
    }

    /**
     * Registers a new transaction for {@code xid}.
     *
     * <p>For an XA id, beginning an already-registered transaction is a no-op.
     * For a local id, beginning an already-registered transaction is an error.
     *
     * @param context the calling connection's context; owns the local transaction map
     * @param xid     the id of the transaction to begin
     * @throws JMSException if a local transaction with this id already exists
     */
    public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
        if (xid.isXATransaction()) {
            if (this.xaTransactions.get(xid) == null) {
                Transaction created = new XATransaction(this.transactionStore, (XATransactionId)xid, this);
                // putIfAbsent keeps the first registration if another thread began the
                // same XID between the lookup above and this call; a plain put would
                // silently replace the transaction the other thread is already using.
                this.xaTransactions.putIfAbsent(xid, created);
            }
            return;
        }
        ConcurrentHashMap transactionMap = context.getTransactions();
        if (transactionMap.get(xid) != null) {
            throw new JMSException("Transaction '" + xid + "' has already been started.");
        }
        transactionMap.put(xid, new LocalTransaction(this.transactionStore, (LocalTransactionId)xid, context));
    }

    /**
     * Prepares the transaction identified by {@code xid}.
     *
     * @return the value returned by {@link Transaction#prepare()}
     * @throws Exception if the transaction is unknown or prepare fails
     */
    public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
        return this.getTransaction(context, xid, false).prepare();
    }

    /**
     * Commits the transaction identified by {@code xid}.
     *
     * @param onePhase passed through unchanged to {@link Transaction#commit(boolean)}
     * @throws Exception if the transaction is unknown or commit fails
     */
    public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
        this.getTransaction(context, xid, true).commit(onePhase);
    }

    /**
     * Rolls back the transaction identified by {@code xid}.
     *
     * @throws Exception if the transaction is unknown or rollback fails
     */
    public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
        this.getTransaction(context, xid, true).rollback();
    }

    /**
     * Forgets the transaction identified by {@code xid}.
     *
     * <p>NOTE(review): this is implemented as a plain rollback, exactly like
     * {@link #rollbackTransaction}; confirm that is the intended semantics.
     *
     * @throws Exception if the transaction is unknown or rollback fails
     */
    public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception {
        this.getTransaction(context, xid, true).rollback();
    }

    /**
     * Passes an acknowledgement to the next broker with the context's current
     * transaction set to the one named by the ack ({@code null} when the ack is
     * not transactional). The context's previous transaction is restored
     * afterwards, whether or not the delegate throws.
     *
     * @throws Exception if the named transaction is unknown or the delegate fails
     */
    public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
        Transaction originalTx = context.getTransaction();
        Transaction transaction = null;
        if (ack.isInTransaction()) {
            transaction = this.getTransaction(context, ack.getTransactionId(), false);
        }
        context.setTransaction(transaction);
        try {
            this.next.acknowledge(context, ack);
        }
        finally {
            context.setTransaction(originalTx);
        }
    }

    /**
     * Passes a message to the next broker with the context's current
     * transaction set to the one named by the message ({@code null} when the
     * message carries no transaction id). The context's previous transaction is
     * restored afterwards, whether or not the delegate throws.
     *
     * @throws Exception if the named transaction is unknown or the delegate fails
     */
    public void send(ConnectionContext context, Message message) throws Exception {
        Transaction originalTx = context.getTransaction();
        Transaction transaction = null;
        if (message.getTransactionId() != null) {
            transaction = this.getTransaction(context, message.getTransactionId(), false);
        }
        context.setTransaction(transaction);
        try {
            this.next.send(context, message);
        }
        finally {
            context.setTransaction(originalTx);
        }
    }

    /**
     * Rolls back and removes every transaction held in the connection's own
     * transaction map, then forwards the removal to the next broker.
     *
     * <p>A failing rollback is logged and does not stop the remaining
     * transactions from being processed. XA transactions held by this broker
     * are not touched here.
     */
    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
        Iterator iter = context.getTransactions().values().iterator();
        while (iter.hasNext()) {
            try {
                Transaction transaction = (Transaction)iter.next();
                transaction.rollback();
            }
            catch (Exception e) {
                // Best effort: the client is gone, so just record the failure and move on.
                log.warn("ERROR Rolling back disconnected client's transactions: ", e);
            }
            iter.remove();
        }
        this.next.removeConnection(context, info, error);
    }

    /**
     * Looks up a previously begun transaction.
     *
     * <p>XA ids are resolved against this broker's map; all other ids are
     * resolved against the map owned by {@code context}.
     *
     * @param context         the calling connection's context
     * @param xid             the id of the transaction to find
     * @param mightBePrepared currently unused by this implementation
     * @return the matching transaction, never {@code null}
     * @throws XAException  with error code {@link XAException#XAER_NOTA} if an XA id is unknown
     * @throws JMSException if a local id is unknown
     */
    public Transaction getTransaction(ConnectionContext context, TransactionId xid, boolean mightBePrepared) throws JMSException, XAException {
        boolean xa = xid.isXATransaction();
        ConcurrentHashMap transactionMap = xa ? this.xaTransactions : context.getTransactions();
        Transaction transaction = (Transaction)transactionMap.get(xid);
        if (transaction != null) {
            return transaction;
        }
        if (xa) {
            XAException e = new XAException("Transaction '" + xid + "' has not been started.");
            e.errorCode = XAException.XAER_NOTA;
            throw e;
        }
        throw new JMSException("Transaction '" + xid + "' has not been started.");
    }

    /**
     * Removes the XA transaction registered under {@code xid}, if any.
     */
    public void removeTransaction(XATransactionId xid) {
        this.xaTransactions.remove(xid);
    }
}

