/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.broker;

import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicReference;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.activemq.Service;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerStoppedException;
import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.Connector;
import org.apache.activemq.broker.MutableBrokerFilter;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.ft.MasterBroker;
import org.apache.activemq.broker.region.ConnectionStatistics;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DataArrayResponse;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.FlushCommand;
import org.apache.activemq.command.IntegerResponse;
import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.security.MessageAuthorizationPolicy;
import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.state.ConsumerState;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.state.SessionState;
import org.apache.activemq.state.TransactionState;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class TransportConnection
implements Service,
Connection,
Task,
CommandVisitor {
    private static final Log log = LogFactory.getLog(TransportConnection.class);
    private static final Log transportLog = LogFactory.getLog(TransportConnection.class.getName() + ".Transport");
    private static final Log serviceLog = LogFactory.getLog(TransportConnection.class.getName() + ".Service");
    protected final Broker broker;
    private MasterBroker masterBroker;
    protected final TransportConnector connector;
    private final Transport transport;
    private MessageAuthorizationPolicy messageAuthorizationPolicy;
    protected final ConcurrentHashMap localConnectionStates = new ConcurrentHashMap();
    protected final Map brokerConnectionStates;
    protected BrokerInfo brokerInfo;
    private WireFormatInfo wireFormatInfo;
    protected final List dispatchQueue = Collections.synchronizedList(new LinkedList());
    protected final TaskRunner taskRunner;
    protected final AtomicReference transportException = new AtomicReference();
    private boolean inServiceException = false;
    private ConnectionStatistics statistics = new ConnectionStatistics();
    private boolean manageable;
    private boolean slow;
    private boolean markedCandidate;
    private boolean blockedCandidate;
    private boolean blocked;
    private boolean connected;
    private boolean active;
    private boolean starting;
    private boolean pendingStop;
    private long timeStamp = 0L;
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private final AtomicBoolean transportDisposed = new AtomicBoolean();
    protected final AtomicBoolean disposed = new AtomicBoolean(false);
    private CountDownLatch stopLatch = new CountDownLatch(1);
    protected final AtomicBoolean asyncException = new AtomicBoolean(false);
    private ConnectionContext context;
    private boolean networkConnection;
    private CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
    protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);

    public TransportConnection(TransportConnector connector, Transport transport, Broker broker, TaskRunnerFactory taskRunnerFactory) {
        this.connector = connector;
        this.broker = broker;
        RegionBroker rb = (RegionBroker)broker.getAdaptor(RegionBroker.class);
        this.brokerConnectionStates = rb.getConnectionStates();
        if (connector != null) {
            this.statistics.setParent(connector.getStatistics());
        }
        this.taskRunner = taskRunnerFactory != null ? taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: " + System.identityHashCode(this)) : null;
        connector.setBrokerName(broker.getBrokerName());
        this.transport = transport;
        this.transport.setTransportListener(new DefaultTransportListener(){

            public void onCommand(Object o) {
                Command command = (Command)o;
                Response response = TransportConnection.this.service(command);
                if (response != null) {
                    TransportConnection.this.dispatchSync(response);
                }
            }

            public void onException(IOException exception) {
                TransportConnection.this.serviceTransportException(exception);
            }
        });
        this.connected = true;
    }

    public int getDispatchQueueSize() {
        return this.dispatchQueue.size();
    }

    public void serviceTransportException(IOException e) {
        if (!this.disposed.get()) {
            this.transportException.set(e);
            if (transportLog.isDebugEnabled()) {
                transportLog.debug("Transport failed: " + e, e);
            }
            ServiceSupport.dispose(this);
        }
    }

    public void serviceExceptionAsync(final IOException e) {
        if (this.asyncException.compareAndSet(false, true)) {
            new Thread("Async Exception Handler"){

                public void run() {
                    TransportConnection.this.serviceException(e);
                }
            }.start();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void serviceException(Throwable e) {
        if (e instanceof IOException) {
            this.serviceTransportException((IOException)e);
        } else if (e.getClass() == BrokerStoppedException.class) {
            if (!this.disposed.get()) {
                if (serviceLog.isDebugEnabled()) {
                    serviceLog.debug("Broker has been stopped.  Notifying client and closing his connection.");
                }
                ConnectionError ce = new ConnectionError();
                ce.setException(e);
                this.dispatchSync(ce);
                try {
                    Thread.sleep(500L);
                }
                catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
                ServiceSupport.dispose(this);
            }
        } else if (!this.disposed.get() && !this.inServiceException) {
            this.inServiceException = true;
            try {
                if (serviceLog.isDebugEnabled()) {
                    serviceLog.debug("Async error occurred: " + e, e);
                }
                ConnectionError ce = new ConnectionError();
                ce.setException(e);
                this.dispatchAsync(ce);
            }
            finally {
                this.inServiceException = false;
            }
        }
    }

    public Response service(Command command) {
        Response response = null;
        boolean responseRequired = command.isResponseRequired();
        int commandId = command.getCommandId();
        try {
            response = command.visit(this);
        }
        catch (Throwable e) {
            if (responseRequired) {
                if (serviceLog.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) {
                    serviceLog.debug("Error occured while processing sync command: " + e, e);
                }
                response = new ExceptionResponse(e);
            }
            this.serviceException(e);
        }
        if (responseRequired) {
            if (this.context == null || !this.context.isDontSendReponse()) {
                if (response == null) {
                    response = new Response();
                }
                response.setCorrelationId(commandId);
            }
            if (this.context != null) {
                this.context.setDontSendReponse(false);
                this.context = null;
            }
        }
        return response;
    }

    protected ConnectionState lookupConnectionState(ConsumerId id) {
        ConnectionState cs = (ConnectionState)this.localConnectionStates.get(id.getParentId().getParentId());
        if (cs == null) {
            throw new IllegalStateException("Cannot lookup a consumer from a connection that had not been registered: " + id.getParentId().getParentId());
        }
        return cs;
    }

    protected ConnectionState lookupConnectionState(ProducerId id) {
        ConnectionState cs = (ConnectionState)this.localConnectionStates.get(id.getParentId().getParentId());
        if (cs == null) {
            throw new IllegalStateException("Cannot lookup a producer from a connection that had not been registered: " + id.getParentId().getParentId());
        }
        return cs;
    }

    protected ConnectionState lookupConnectionState(SessionId id) {
        ConnectionState cs = (ConnectionState)this.localConnectionStates.get(id.getParentId());
        if (cs == null) {
            throw new IllegalStateException("Cannot lookup a session from a connection that had not been registered: " + id.getParentId());
        }
        return cs;
    }

    protected ConnectionState lookupConnectionState(ConnectionId connectionId) {
        ConnectionState cs = (ConnectionState)this.localConnectionStates.get(connectionId);
        if (cs == null) {
            throw new IllegalStateException("Cannot lookup a connection that had not been registered: " + connectionId);
        }
        return cs;
    }

    public Response processKeepAlive(KeepAliveInfo info) throws Exception {
        return null;
    }

    public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
        this.broker.removeSubscription(this.lookupConnectionState(info.getConnectionId()).getContext(), info);
        return null;
    }

    public Response processWireFormat(WireFormatInfo info) throws Exception {
        this.wireFormatInfo = info;
        return null;
    }

    public Response processShutdown(ShutdownInfo info) throws Exception {
        this.stop();
        return null;
    }

    public Response processFlush(FlushCommand command) throws Exception {
        return null;
    }

    public synchronized Response processBeginTransaction(TransactionInfo info) throws Exception {
        ConnectionState cs = (ConnectionState)this.localConnectionStates.get(info.getConnectionId());
        ConnectionContext context = null;
        if (cs != null) {
            context = cs.getContext();
        }
        if (cs.getTransactionState(info.getTransactionId()) == null) {
            cs.addTransactionState(info.getTransactionId());
            this.broker.beginTransaction(context, info.getTransactionId());
        }
        return null;
    }

    public synchronized Response processEndTransaction(TransactionInfo info) throws Exception {
        return null;
    }

    public synchronized Response processPrepareTransaction(TransactionInfo info) throws Exception {
        TransactionState transactionState;
        ConnectionState cs = (ConnectionState)this.localConnectionStates.get(info.getConnectionId());
        ConnectionContext context = null;
        if (cs != null) {
            context = cs.getContext();
        }
        if ((transactionState = cs.getTransactionState(info.getTransactionId())) == null) {
            throw new IllegalStateException("Cannot prepare a transaction that had not been started: " + info.getTransactionId());
        }
        if (!transactionState.isPrepared()) {
            transactionState.setPrepared(true);
            int result = this.broker.prepareTransaction(context, info.getTransactionId());
            transactionState.setPreparedResult(result);
            IntegerResponse response = new IntegerResponse(result);
            return response;
        }
        IntegerResponse response = new IntegerResponse(transactionState.getPreparedResult());
        return response;
    }

    public synchronized Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
        ConnectionState cs = (ConnectionState)this.localConnectionStates.get(info.getConnectionId());
        ConnectionContext context = null;
        if (cs != null) {
            context = cs.getContext();
        }
        cs.removeTransactionState(info.getTransactionId());
        this.broker.commitTransaction(context, info.getTransactionId(), true);
        return null;
    }

    public synchronized Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
        ConnectionState cs = (ConnectionState)this.localConnectionStates.get(info.getConnectionId());
        ConnectionContext context = null;
        if (cs != null) {
            context = cs.getContext();
        }
        cs.removeTransactionState(info.getTransactionId());
        this.broker.commitTransaction(context, info.getTransactionId(), false);
        return null;
    }

    public synchronized Response processRollbackTransaction(TransactionInfo info) throws Exception {
        ConnectionState cs = (ConnectionState)this.localConnectionStates.get(info.getConnectionId());
        ConnectionContext context = null;
        if (cs != null) {
            context = cs.getContext();
        }
        cs.removeTransactionState(info.getTransactionId());
        this.broker.rollbackTransaction(context, info.getTransactionId());
        return null;
    }

    public synchronized Response processForgetTransaction(TransactionInfo info) throws Exception {
        ConnectionState cs = (ConnectionState)this.localConnectionStates.get(info.getConnectionId());
        ConnectionContext context = null;
        if (cs != null) {
            context = cs.getContext();
        }
        this.broker.forgetTransaction(context, info.getTransactionId());
        return null;
    }

    public synchronized Response processRecoverTransactions(TransactionInfo info) throws Exception {
        ConnectionState cs = (ConnectionState)this.localConnectionStates.get(info.getConnectionId());
        ConnectionContext context = null;
        if (cs != null) {
            context = cs.getContext();
        }
        DataStructure[] preparedTransactions = this.broker.getPreparedTransactions(context);
        return new DataArrayResponse(preparedTransactions);
    }

    public Response processMessage(Message messageSend) throws Exception {
        ProducerId producerId = messageSend.getProducerId();
        ConnectionState state = this.lookupConnectionState(producerId);
        this.context = state.getContext();
        ProducerState producerState = null;
        if (messageSend.getMessageId().getProducerId().equals(messageSend.getProducerId())) {
            SessionState ss = state.getSessionState(producerId.getParentId());
            if (ss == null) {
                throw new IllegalStateException("Cannot send from a session that had not been registered: " + producerId.getParentId());
            }
            producerState = ss.getProducerState(producerId);
        }
        if (producerState == null) {
            this.broker.send(this.context, messageSend);
        } else {
            long seq = messageSend.getMessageId().getProducerSequenceId();
            if (seq > producerState.getLastSequenceId()) {
                producerState.setLastSequenceId(seq);
                this.broker.send(this.context, messageSend);
            }
        }
        return null;
    }

    public Response processMessageAck(MessageAck ack) throws Exception {
        this.broker.acknowledge(this.lookupConnectionState(ack.getConsumerId()).getContext(), ack);
        return null;
    }

    public Response processMessagePull(MessagePull pull) throws Exception {
        return this.broker.messagePull(this.lookupConnectionState(pull.getConsumerId()).getContext(), pull);
    }

    public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception {
        this.broker.processDispatchNotification(notification);
        return null;
    }

    public synchronized Response processAddDestination(DestinationInfo info) throws Exception {
        ConnectionState cs = this.lookupConnectionState(info.getConnectionId());
        this.broker.addDestinationInfo(cs.getContext(), info);
        if (info.getDestination().isTemporary()) {
            cs.addTempDestination(info);
        }
        return null;
    }

    public synchronized Response processRemoveDestination(DestinationInfo info) throws Exception {
        ConnectionState cs = this.lookupConnectionState(info.getConnectionId());
        this.broker.removeDestinationInfo(cs.getContext(), info);
        if (info.getDestination().isTemporary()) {
            cs.removeTempDestination(info.getDestination());
        }
        return null;
    }

    public synchronized Response processAddProducer(ProducerInfo info) throws Exception {
        SessionId sessionId = info.getProducerId().getParentId();
        ConnectionId connectionId = sessionId.getParentId();
        ConnectionState cs = this.lookupConnectionState(connectionId);
        SessionState ss = cs.getSessionState(sessionId);
        if (ss == null) {
            throw new IllegalStateException("Cannot add a producer to a session that had not been registered: " + sessionId);
        }
        if (!ss.getProducerIds().contains(info.getProducerId())) {
            this.broker.addProducer(cs.getContext(), info);
            try {
                ss.addProducer(info);
            }
            catch (IllegalStateException e) {
                this.broker.removeProducer(cs.getContext(), info);
            }
        }
        return null;
    }

    public synchronized Response processRemoveProducer(ProducerId id) throws Exception {
        SessionId sessionId = id.getParentId();
        ConnectionId connectionId = sessionId.getParentId();
        ConnectionState cs = this.lookupConnectionState(connectionId);
        SessionState ss = cs.getSessionState(sessionId);
        if (ss == null) {
            throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: " + sessionId);
        }
        ProducerState ps = ss.removeProducer(id);
        if (ps == null) {
            throw new IllegalStateException("Cannot remove a producer that had not been registered: " + id);
        }
        this.broker.removeProducer(cs.getContext(), ps.getInfo());
        return null;
    }

    public synchronized Response processAddConsumer(ConsumerInfo info) throws Exception {
        SessionId sessionId = info.getConsumerId().getParentId();
        ConnectionId connectionId = sessionId.getParentId();
        ConnectionState cs = this.lookupConnectionState(connectionId);
        SessionState ss = cs.getSessionState(sessionId);
        if (ss == null) {
            throw new IllegalStateException("Cannot add a consumer to a session that had not been registered: " + sessionId);
        }
        if (!ss.getConsumerIds().contains(info.getConsumerId())) {
            this.broker.addConsumer(cs.getContext(), info);
            try {
                ss.addConsumer(info);
            }
            catch (IllegalStateException e) {
                this.broker.removeConsumer(cs.getContext(), info);
            }
        }
        return null;
    }

    public synchronized Response processRemoveConsumer(ConsumerId id) throws Exception {
        SessionId sessionId = id.getParentId();
        ConnectionId connectionId = sessionId.getParentId();
        ConnectionState cs = this.lookupConnectionState(connectionId);
        SessionState ss = cs.getSessionState(sessionId);
        if (ss == null) {
            throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: " + sessionId);
        }
        ConsumerState consumerState = ss.removeConsumer(id);
        if (consumerState == null) {
            throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id);
        }
        this.broker.removeConsumer(cs.getContext(), consumerState.getInfo());
        return null;
    }

    public synchronized Response processAddSession(SessionInfo info) throws Exception {
        ConnectionId connectionId = info.getSessionId().getParentId();
        ConnectionState cs = this.lookupConnectionState(connectionId);
        if (!cs.getSessionIds().contains(info.getSessionId())) {
            this.broker.addSession(cs.getContext(), info);
            try {
                cs.addSession(info);
            }
            catch (IllegalStateException e) {
                this.broker.removeSession(cs.getContext(), info);
            }
        }
        return null;
    }

    public synchronized Response processRemoveSession(SessionId id) throws Exception {
        ConnectionId connectionId = id.getParentId();
        ConnectionState cs = this.lookupConnectionState(connectionId);
        SessionState session = cs.getSessionState(id);
        if (session == null) {
            throw new IllegalStateException("Cannot remove session that had not been registered: " + id);
        }
        session.shutdown();
        Iterator iter = session.getConsumerIds().iterator();
        while (iter.hasNext()) {
            ConsumerId consumerId = (ConsumerId)iter.next();
            try {
                this.processRemoveConsumer(consumerId);
            }
            catch (Throwable e) {
                log.warn("Failed to remove consumer: " + consumerId + ". Reason: " + e, e);
            }
        }
        iter = session.getProducerIds().iterator();
        while (iter.hasNext()) {
            ProducerId producerId = (ProducerId)iter.next();
            try {
                this.processRemoveProducer(producerId);
            }
            catch (Throwable e) {
                log.warn("Failed to remove producer: " + producerId + ". Reason: " + e, e);
            }
        }
        cs.removeSession(id);
        this.broker.removeSession(cs.getContext(), session.getInfo());
        return null;
    }

    public synchronized Response processAddConnection(ConnectionInfo info) throws Exception {
        ConnectionState state = (ConnectionState)this.brokerConnectionStates.get(info.getConnectionId());
        if (state != null && this != state.getConnection()) {
            log.debug("Killing previous stale connection: " + state.getConnection());
            state.getConnection().stop();
            if (!state.getConnection().stopLatch.await(15L, TimeUnit.SECONDS)) {
                throw new Exception("Previous connection could not be clean up.");
            }
        }
        log.debug("Setting up new connection: " + this);
        String clientId = info.getClientId();
        this.context = new ConnectionContext();
        this.context.setConnection(this);
        this.context.setBroker(this.broker);
        this.context.setConnector(this.connector);
        this.context.setTransactions(new ConcurrentHashMap());
        this.context.setClientId(clientId);
        this.context.setUserName(info.getUserName());
        this.context.setConnectionId(info.getConnectionId());
        this.context.setWireFormatInfo(this.wireFormatInfo);
        this.context.setNetworkConnection(this.networkConnection);
        this.context.incrementReference();
        this.manageable = info.isManageable();
        state = new ConnectionState(info, this.context, this);
        this.brokerConnectionStates.put(info.getConnectionId(), state);
        this.localConnectionStates.put(info.getConnectionId(), state);
        this.broker.addConnection(this.context, info);
        if (info.isManageable() && this.broker.isFaultTolerantConfiguration()) {
            ConnectionControl command = new ConnectionControl();
            command.setFaultTolerant(this.broker.isFaultTolerantConfiguration());
            this.dispatchAsync(command);
        }
        return null;
    }

    public synchronized Response processRemoveConnection(ConnectionId id) {
        ConnectionState cs = this.lookupConnectionState(id);
        cs.shutdown();
        Iterator iter = cs.getSessionIds().iterator();
        while (iter.hasNext()) {
            SessionId sessionId = (SessionId)iter.next();
            try {
                this.processRemoveSession(sessionId);
            }
            catch (Throwable e) {
                serviceLog.warn("Failed to remove session " + sessionId, e);
            }
        }
        iter = cs.getTempDesinations().iterator();
        while (iter.hasNext()) {
            DestinationInfo di = (DestinationInfo)iter.next();
            try {
                this.broker.removeDestination(cs.getContext(), di.getDestination(), 0L);
            }
            catch (Throwable e) {
                serviceLog.warn("Failed to remove tmp destination " + di.getDestination(), e);
            }
            iter.remove();
        }
        try {
            this.broker.removeConnection(cs.getContext(), cs.getInfo(), null);
        }
        catch (Throwable e) {
            serviceLog.warn("Failed to remove connection " + cs.getInfo(), e);
        }
        ConnectionState state = (ConnectionState)this.localConnectionStates.remove(id);
        if (state != null && state.getContext().decrementReference() == 0) {
            this.brokerConnectionStates.remove(id);
        }
        return null;
    }

    public Connector getConnector() {
        return this.connector;
    }

    public void dispatchSync(Command message) {
        this.getStatistics().getEnqueues().increment();
        try {
            this.processDispatch(message);
        }
        catch (IOException e) {
            this.serviceExceptionAsync(e);
        }
    }

    public void dispatchAsync(Command message) {
        if (!this.disposed.get()) {
            this.getStatistics().getEnqueues().increment();
            if (this.taskRunner == null) {
                this.dispatchSync(message);
            } else {
                this.dispatchQueue.add(message);
                try {
                    this.taskRunner.wakeup();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        } else if (message.isMessageDispatch()) {
            MessageDispatch md = (MessageDispatch)message;
            Runnable sub = md.getTransmitCallback();
            this.broker.processDispatch(md);
            if (sub != null) {
                sub.run();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void processDispatch(Command command) throws IOException {
        try {
            if (!this.disposed.get()) {
                this.dispatch(command);
            }
        }
        finally {
            if (command.isMessageDispatch()) {
                MessageDispatch md = (MessageDispatch)command;
                Runnable sub = md.getTransmitCallback();
                this.broker.processDispatch(md);
                Object consumer = md.getConsumer();
                if (sub != null) {
                    sub.run();
                }
            }
            this.getStatistics().getDequeues().increment();
        }
    }

    public boolean iterate() {
        try {
            if (this.disposed.get()) {
                if (this.dispatchStopped.compareAndSet(false, true)) {
                    if (this.transportException.get() == null) {
                        try {
                            this.dispatch(new ShutdownInfo());
                        }
                        catch (Throwable ignore) {
                            // empty catch block
                        }
                    }
                    this.dispatchStoppedLatch.countDown();
                }
                return false;
            }
            if (!this.dispatchStopped.get()) {
                if (this.dispatchQueue.isEmpty()) {
                    return false;
                }
                Command command = (Command)this.dispatchQueue.remove(0);
                this.processDispatch(command);
                return true;
            }
            return false;
        }
        catch (IOException e) {
            if (this.dispatchStopped.compareAndSet(false, true)) {
                this.dispatchStoppedLatch.countDown();
            }
            this.serviceExceptionAsync(e);
            return false;
        }
    }

    public ConnectionStatistics getStatistics() {
        return this.statistics;
    }

    public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
        return this.messageAuthorizationPolicy;
    }

    public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
        this.messageAuthorizationPolicy = messageAuthorizationPolicy;
    }

    public boolean isManageable() {
        return this.manageable;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized void start() throws Exception {
        this.starting = true;
        try {
            this.transport.start();
            this.active = true;
            this.processDispatch(this.connector.getBrokerInfo());
            this.connector.onStarted(this);
        }
        finally {
            this.starting = false;
            if (this.pendingStop) {
                log.debug("Calling the delayed stop()");
                this.stop();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stop() throws Exception {
        TransportConnection transportConnection = this;
        synchronized (transportConnection) {
            this.pendingStop = true;
            if (this.starting) {
                log.debug("stop() called in the middle of start(). Delaying...");
                return;
            }
        }
        if (this.stopped.compareAndSet(false, true)) {
            log.debug("Stopping connection: " + this.transport.getRemoteAddress());
            this.connector.onStopped(this);
            try {
                if (this.masterBroker != null) {
                    this.masterBroker.stop();
                }
            }
            catch (Exception ignore) {
                // empty catch block
            }
            if (this.disposed.compareAndSet(false, true)) {
                ArrayList l = new ArrayList(this.localConnectionStates.values());
                Iterator iter = l.iterator();
                while (iter.hasNext()) {
                    ConnectionState cs = (ConnectionState)iter.next();
                    cs.getContext().getStopping().set(true);
                }
                if (this.taskRunner != null) {
                    this.taskRunner.wakeup();
                    this.dispatchStoppedLatch.await(5L, TimeUnit.SECONDS);
                    this.disposeTransport();
                    this.taskRunner.shutdown();
                } else {
                    this.disposeTransport();
                }
                iter = this.dispatchQueue.iterator();
                while (iter.hasNext()) {
                    Command command = (Command)iter.next();
                    if (!command.isMessageDispatch()) continue;
                    MessageDispatch md = (MessageDispatch)command;
                    Runnable sub = md.getTransmitCallback();
                    this.broker.processDispatch(md);
                    if (sub == null) continue;
                    sub.run();
                }
                if (!this.broker.isStopped()) {
                    l = new ArrayList(this.localConnectionStates.keySet());
                    iter = l.iterator();
                    while (iter.hasNext()) {
                        ConnectionId connectionId = (ConnectionId)iter.next();
                        try {
                            log.debug("Cleaning up connection resources.");
                            this.processRemoveConnection(connectionId);
                        }
                        catch (Throwable ignore) {
                            ignore.printStackTrace();
                        }
                    }
                    if (this.brokerInfo != null) {
                        this.broker.removeBroker(this, this.brokerInfo);
                    }
                }
                this.stopLatch.countDown();
            }
        }
    }

    public boolean isBlockedCandidate() {
        return this.blockedCandidate;
    }

    public void setBlockedCandidate(boolean blockedCandidate) {
        this.blockedCandidate = blockedCandidate;
    }

    public boolean isMarkedCandidate() {
        return this.markedCandidate;
    }

    public void setMarkedCandidate(boolean markedCandidate) {
        this.markedCandidate = markedCandidate;
        if (!markedCandidate) {
            this.timeStamp = 0L;
            this.blockedCandidate = false;
        }
    }

    public void setSlow(boolean slow) {
        this.slow = slow;
    }

    public boolean isSlow() {
        return this.slow;
    }

    public boolean isMarkedBlockedCandidate() {
        return this.markedCandidate;
    }

    public void doMark() {
        if (this.timeStamp == 0L) {
            this.timeStamp = System.currentTimeMillis();
        }
    }

    public boolean isBlocked() {
        return this.blocked;
    }

    public boolean isConnected() {
        return this.connected;
    }

    public void setBlocked(boolean blocked) {
        this.blocked = blocked;
    }

    public void setConnected(boolean connected) {
        this.connected = connected;
    }

    public boolean isActive() {
        return this.active;
    }

    public void setActive(boolean active) {
        this.active = active;
    }

    public synchronized boolean isStarting() {
        return this.starting;
    }

    protected synchronized void setStarting(boolean starting) {
        this.starting = starting;
    }

    public synchronized boolean isPendingStop() {
        return this.pendingStop;
    }

    protected synchronized void setPendingStop(boolean pendingStop) {
        this.pendingStop = pendingStop;
    }

    public Response processBrokerInfo(BrokerInfo info) {
        if (info.isSlaveBroker()) {
            MutableBrokerFilter parent = (MutableBrokerFilter)this.broker.getAdaptor(MutableBrokerFilter.class);
            this.masterBroker = new MasterBroker(parent, this.transport);
            this.masterBroker.startProcessing();
            log.info("Slave Broker " + info.getBrokerName() + " is attached");
        }
        if (this.brokerInfo != null) {
            log.warn("Unexpected extra broker info command received: " + info);
        }
        this.brokerInfo = info;
        this.broker.addBroker(this, info);
        this.networkConnection = true;
        Iterator iter = this.localConnectionStates.values().iterator();
        while (iter.hasNext()) {
            ConnectionState cs = (ConnectionState)iter.next();
            cs.getContext().setNetworkConnection(true);
        }
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void dispatch(Command command) throws IOException {
        try {
            this.setMarkedCandidate(true);
            this.transport.oneway(command);
        }
        finally {
            this.setMarkedCandidate(false);
        }
    }

    public String getRemoteAddress() {
        return this.transport.getRemoteAddress();
    }

    public String getConnectionId() {
        Iterator iterator = this.localConnectionStates.values().iterator();
        ConnectionState object = (ConnectionState)iterator.next();
        if (object == null) {
            return null;
        }
        if (object.getInfo().getClientId() != null) {
            return object.getInfo().getClientId();
        }
        return object.getInfo().getConnectionId().toString();
    }

    protected void disposeTransport() {
        if (this.transportDisposed.compareAndSet(false, true)) {
            try {
                this.transport.stop();
                this.active = false;
                log.debug("Stopped connection: " + this.transport.getRemoteAddress());
            }
            catch (Exception e) {
                log.debug("Could not stop transport: " + e, e);
            }
        }
    }

    static class ConnectionState
    extends org.apache.activemq.state.ConnectionState {
        private final ConnectionContext context;
        TransportConnection connection;

        public ConnectionState(ConnectionInfo info, ConnectionContext context, TransportConnection connection) {
            super(info);
            this.context = context;
            this.connection = connection;
        }

        public ConnectionContext getContext() {
            return this.context;
        }

        public TransportConnection getConnection() {
            return this.connection;
        }
    }
}

