/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.network;

import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.ConsumerEvent;
import org.apache.activemq.advisory.ConsumerEventSource;
import org.apache.activemq.advisory.ConsumerListener;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class NetworkReconnectTest
extends TestCase {
    protected static final Log log = LogFactory.getLog(NetworkReconnectTest.class);
    private BrokerService producerBroker;
    private BrokerService consumerBroker;
    private ActiveMQConnectionFactory producerConnectionFactory;
    private ActiveMQConnectionFactory consumerConnectionFactory;
    private Destination destination;
    private ArrayList connections = new ArrayList();

    public void testMultipleProducerBrokerRestarts() throws Exception {
        for (int i = 0; i < 10; ++i) {
            this.testWithProducerBrokerRestart();
            this.disposeConsumerConnections();
        }
    }

    public void testWithoutRestarts() throws Exception {
        this.startProducerBroker();
        this.startConsumerBroker();
        MessageConsumer consumer = this.createConsumer();
        AtomicInteger counter = this.createConsumerCounter(this.producerConnectionFactory);
        this.waitForConsumerToArrive(counter);
        String messageId = this.sendMessage();
        Message message = consumer.receive(1000L);
        NetworkReconnectTest.assertEquals((String)messageId, (String)message.getJMSMessageID());
        NetworkReconnectTest.assertNull((Object)consumer.receiveNoWait());
    }

    public void testWithProducerBrokerRestart() throws Exception {
        this.startProducerBroker();
        this.startConsumerBroker();
        MessageConsumer consumer = this.createConsumer();
        AtomicInteger counter = this.createConsumerCounter(this.producerConnectionFactory);
        this.waitForConsumerToArrive(counter);
        String messageId = this.sendMessage();
        Message message = consumer.receive(1000L);
        NetworkReconnectTest.assertEquals((String)messageId, (String)message.getJMSMessageID());
        NetworkReconnectTest.assertNull((Object)consumer.receiveNoWait());
        this.stopProducerBroker();
        this.startProducerBroker();
        counter = this.createConsumerCounter(this.producerConnectionFactory);
        this.waitForConsumerToArrive(counter);
        messageId = this.sendMessage();
        message = consumer.receive(1000L);
        NetworkReconnectTest.assertEquals((String)messageId, (String)message.getJMSMessageID());
        NetworkReconnectTest.assertNull((Object)consumer.receiveNoWait());
    }

    public void testWithConsumerBrokerRestart() throws Exception {
        this.startProducerBroker();
        this.startConsumerBroker();
        MessageConsumer consumer = this.createConsumer();
        AtomicInteger counter = this.createConsumerCounter(this.producerConnectionFactory);
        this.waitForConsumerToArrive(counter);
        String messageId = this.sendMessage();
        Message message = consumer.receive(1000L);
        NetworkReconnectTest.assertEquals((String)messageId, (String)message.getJMSMessageID());
        NetworkReconnectTest.assertNull((Object)consumer.receiveNoWait());
        this.stopConsumerBroker();
        this.waitForConsumerToLeave(counter);
        this.startConsumerBroker();
        consumer = this.createConsumer();
        this.waitForConsumerToArrive(counter);
        messageId = this.sendMessage();
        message = consumer.receive(1000L);
        NetworkReconnectTest.assertEquals((String)messageId, (String)message.getJMSMessageID());
        NetworkReconnectTest.assertNull((Object)consumer.receiveNoWait());
    }

    public void testWithConsumerBrokerStartDelay() throws Exception {
        this.startConsumerBroker();
        MessageConsumer consumer = this.createConsumer();
        Thread.sleep(5000L);
        this.startProducerBroker();
        AtomicInteger counter = this.createConsumerCounter(this.producerConnectionFactory);
        this.waitForConsumerToArrive(counter);
        String messageId = this.sendMessage();
        Message message = consumer.receive(1000L);
        NetworkReconnectTest.assertEquals((String)messageId, (String)message.getJMSMessageID());
        NetworkReconnectTest.assertNull((Object)consumer.receiveNoWait());
    }

    public void testWithProducerBrokerStartDelay() throws Exception {
        this.startProducerBroker();
        AtomicInteger counter = this.createConsumerCounter(this.producerConnectionFactory);
        Thread.sleep(5000L);
        this.startConsumerBroker();
        MessageConsumer consumer = this.createConsumer();
        this.waitForConsumerToArrive(counter);
        String messageId = this.sendMessage();
        Message message = consumer.receive(1000L);
        NetworkReconnectTest.assertEquals((String)messageId, (String)message.getJMSMessageID());
        NetworkReconnectTest.assertNull((Object)consumer.receiveNoWait());
    }

    protected void setUp() throws Exception {
        log.info("===============================================================================");
        log.info("Running Test Case: " + this.getName());
        log.info("===============================================================================");
        this.producerConnectionFactory = this.createProducerConnectionFactory();
        this.consumerConnectionFactory = this.createConsumerConnectionFactory();
        this.destination = new ActiveMQQueue("RECONNECT.TEST.QUEUE");
    }

    protected void tearDown() throws Exception {
        this.disposeConsumerConnections();
        try {
            this.stopProducerBroker();
        }
        catch (Throwable e) {
            // empty catch block
        }
        try {
            this.stopConsumerBroker();
        }
        catch (Throwable throwable) {
            // empty catch block
        }
    }

    protected void disposeConsumerConnections() {
        Iterator iter = this.connections.iterator();
        while (iter.hasNext()) {
            Connection connection = (Connection)iter.next();
            try {
                connection.close();
            }
            catch (Throwable throwable) {}
        }
    }

    protected void startProducerBroker() throws Exception {
        if (this.producerBroker == null) {
            this.producerBroker = this.createFirstBroker();
            this.producerBroker.start();
        }
    }

    protected void stopProducerBroker() throws Exception {
        if (this.producerBroker != null) {
            this.producerBroker.stop();
            this.producerBroker = null;
        }
    }

    protected void startConsumerBroker() throws Exception {
        if (this.consumerBroker == null) {
            this.consumerBroker = this.createSecondBroker();
            this.consumerBroker.start();
        }
    }

    protected void stopConsumerBroker() throws Exception {
        if (this.consumerBroker != null) {
            this.consumerBroker.stop();
            this.consumerBroker = null;
        }
    }

    protected BrokerService createFirstBroker() throws Exception {
        return BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/network/reconnect-broker1.xml"));
    }

    protected BrokerService createSecondBroker() throws Exception {
        return BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/network/reconnect-broker2.xml"));
    }

    protected ActiveMQConnectionFactory createProducerConnectionFactory() {
        return new ActiveMQConnectionFactory("vm://broker1");
    }

    protected ActiveMQConnectionFactory createConsumerConnectionFactory() {
        return new ActiveMQConnectionFactory("vm://broker2");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected String sendMessage() throws JMSException {
        Connection connection = null;
        try {
            connection = this.producerConnectionFactory.createConnection();
            Session session = connection.createSession(false, 1);
            MessageProducer producer = session.createProducer(this.destination);
            Message message = session.createMessage();
            producer.send(message);
            String string = message.getJMSMessageID();
            return string;
        }
        finally {
            try {
                connection.close();
            }
            catch (Throwable ignore) {}
        }
    }

    protected MessageConsumer createConsumer() throws JMSException {
        Connection connection = this.consumerConnectionFactory.createConnection();
        this.connections.add(connection);
        connection.start();
        Session session = connection.createSession(false, 1);
        return session.createConsumer(this.destination);
    }

    protected AtomicInteger createConsumerCounter(ActiveMQConnectionFactory cf) throws Exception {
        final AtomicInteger rc = new AtomicInteger(0);
        Connection connection = cf.createConnection();
        this.connections.add(connection);
        connection.start();
        ConsumerEventSource source = new ConsumerEventSource(connection, this.destination);
        source.setConsumerListener(new ConsumerListener(){

            public void onConsumerEvent(ConsumerEvent event) {
                rc.set(event.getConsumerCount());
            }
        });
        source.start();
        return rc;
    }

    protected void waitForConsumerToArrive(AtomicInteger consumerCounter) throws InterruptedException {
        for (int i = 0; i < 100; ++i) {
            if (consumerCounter.get() > 0) {
                return;
            }
            Thread.sleep(100L);
        }
        NetworkReconnectTest.fail((String)"The consumer did not arrive.");
    }

    protected void waitForConsumerToLeave(AtomicInteger consumerCounter) throws InterruptedException {
        for (int i = 0; i < 100; ++i) {
            if (consumerCounter.get() == 0) {
                return;
            }
            Thread.sleep(100L);
        }
        NetworkReconnectTest.fail((String)"The consumer did not leave.");
    }
}

