/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import java.net.URI;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.command.ActiveMQDestination;

public class TwoBrokerQueueClientsReconnectTest
extends JmsMultipleBrokersTestSupport {
    protected static final int MESSAGE_COUNT = 100;
    protected static final int PREFETCH_COUNT = 1;
    protected int msgsClient1;
    protected int msgsClient2;
    protected String broker1;
    protected String broker2;

    public void testClientAReceivesOnly() throws Exception {
        this.broker1 = "BrokerA";
        this.broker2 = "BrokerB";
        this.doOneClientReceivesOnly();
    }

    public void testClientBReceivesOnly() throws Exception {
        this.broker1 = "BrokerB";
        this.broker2 = "BrokerA";
        this.doOneClientReceivesOnly();
    }

    public void doOneClientReceivesOnly() throws Exception {
        this.bridgeBrokers(this.broker1, this.broker2);
        this.bridgeBrokers(this.broker2, this.broker1);
        this.startAllBrokers();
        ActiveMQDestination dest = this.createDestination("TEST.FOO", false);
        MessageConsumer client1 = this.createConsumer(this.broker1, dest);
        MessageConsumer client2 = this.createConsumer(this.broker2, dest);
        Thread.sleep(500L);
        this.sendMessages("BrokerA", dest, 100);
        client2.close();
        this.msgsClient1 += this.receiveAllMessages(client1);
        client1.close();
        TwoBrokerQueueClientsReconnectTest.assertEquals((String)("Client for " + this.broker1 + " should have receive all messages."), (int)100, (int)this.msgsClient1);
    }

    public void testClientAReceivesOnlyAfterReconnect() throws Exception {
        this.broker1 = "BrokerA";
        this.broker2 = "BrokerB";
        this.doOneClientReceivesOnlyAfterReconnect();
    }

    public void testClientBReceivesOnlyAfterReconnect() throws Exception {
        this.broker1 = "BrokerB";
        this.broker2 = "BrokerA";
        this.doOneClientReceivesOnlyAfterReconnect();
    }

    public void doOneClientReceivesOnlyAfterReconnect() throws Exception {
        this.bridgeBrokers(this.broker1, this.broker2);
        this.bridgeBrokers(this.broker2, this.broker1);
        this.startAllBrokers();
        ActiveMQDestination dest = this.createDestination("TEST.FOO", false);
        MessageConsumer client1 = this.createConsumer(this.broker1, dest);
        MessageConsumer client2 = this.createConsumer(this.broker2, dest);
        Thread.sleep(500L);
        this.sendMessages("BrokerA", dest, 100);
        this.msgsClient1 += this.receiveExactMessages(client1, 20);
        client1.close();
        client1 = this.createConsumer(this.broker1, dest);
        Thread.sleep(500L);
        client2.close();
        this.msgsClient1 += this.receiveAllMessages(client1);
        client1.close();
        TwoBrokerQueueClientsReconnectTest.assertEquals((String)("Client for " + this.broker1 + " should have received all messages."), (int)100, (int)this.msgsClient1);
    }

    public void testTwoClientsReceiveClientADisconnects() throws Exception {
        this.broker1 = "BrokerA";
        this.broker2 = "BrokerB";
        this.doTwoClientsReceiveOneClientDisconnects();
    }

    public void testTwoClientsReceiveClientBDisconnects() throws Exception {
        this.broker1 = "BrokerB";
        this.broker2 = "BrokerA";
        this.doTwoClientsReceiveOneClientDisconnects();
    }

    public void doTwoClientsReceiveOneClientDisconnects() throws Exception {
        this.bridgeBrokers(this.broker1, this.broker2);
        this.bridgeBrokers(this.broker2, this.broker1);
        this.startAllBrokers();
        ActiveMQDestination dest = this.createDestination("TEST.FOO", false);
        MessageConsumer client1 = this.createConsumer(this.broker1, dest);
        MessageConsumer client2 = this.createConsumer(this.broker2, dest);
        Thread.sleep(500L);
        this.sendMessages("BrokerA", dest, 100);
        this.msgsClient1 += this.receiveExactMessages(client1, 20);
        this.msgsClient2 += this.receiveExactMessages(client2, 20);
        client1.close();
        this.msgsClient2 += this.receiveAllMessages(client2);
        client2.close();
        TwoBrokerQueueClientsReconnectTest.assertEquals((String)("Client for " + this.broker1 + " should have received 20% of the messages."), (int)20, (int)this.msgsClient1);
        TwoBrokerQueueClientsReconnectTest.assertEquals((String)("Client for " + this.broker2 + " should have received 80% of the messages."), (int)80, (int)this.msgsClient2);
    }

    public void testTwoClientsReceiveClientAReconnects() throws Exception {
        this.broker1 = "BrokerA";
        this.broker2 = "BrokerB";
        this.doTwoClientsReceiveOneClientReconnects();
    }

    public void testTwoClientsReceiveClientBReconnects() throws Exception {
        this.broker1 = "BrokerB";
        this.broker2 = "BrokerA";
        this.doTwoClientsReceiveOneClientReconnects();
    }

    public void doTwoClientsReceiveOneClientReconnects() throws Exception {
        this.bridgeBrokers(this.broker1, this.broker2);
        this.bridgeBrokers(this.broker2, this.broker1);
        this.startAllBrokers();
        ActiveMQDestination dest = this.createDestination("TEST.FOO", false);
        MessageConsumer client1 = this.createConsumer(this.broker1, dest);
        MessageConsumer client2 = this.createConsumer(this.broker2, dest);
        Thread.sleep(500L);
        this.sendMessages("BrokerA", dest, 100);
        this.msgsClient1 += this.receiveExactMessages(client1, 20);
        this.msgsClient2 += this.receiveExactMessages(client2, 20);
        client1.close();
        this.msgsClient2 += this.receiveExactMessages(client2, 20);
        client1 = this.createConsumer(this.broker1, dest);
        Thread.sleep(500L);
        this.msgsClient1 += this.receiveExactMessages(client1, 20);
        client1.close();
        this.msgsClient2 += this.receiveExactMessages(client2, 20);
        client2.close();
        TwoBrokerQueueClientsReconnectTest.assertEquals((String)("Client for " + this.broker1 + " should have received 40% of the messages."), (int)40, (int)this.msgsClient1);
        TwoBrokerQueueClientsReconnectTest.assertEquals((String)("Client for " + this.broker2 + " should have received 60% of the messages."), (int)60, (int)this.msgsClient2);
    }

    public void testTwoClientsReceiveTwoClientReconnects() throws Exception {
        this.broker1 = "BrokerA";
        this.broker2 = "BrokerB";
        this.bridgeBrokers(this.broker1, this.broker2);
        this.bridgeBrokers(this.broker2, this.broker1);
        this.startAllBrokers();
        ActiveMQDestination dest = this.createDestination("TEST.FOO", false);
        MessageConsumer client1 = this.createConsumer(this.broker1, dest);
        MessageConsumer client2 = this.createConsumer(this.broker2, dest);
        Thread.sleep(500L);
        this.sendMessages("BrokerA", dest, 100);
        this.msgsClient1 += this.receiveExactMessages(client1, 20);
        this.msgsClient2 += this.receiveExactMessages(client2, 20);
        client1.close();
        client2.close();
        client1 = this.createConsumer(this.broker1, dest);
        client2 = this.createConsumer(this.broker2, dest);
        Thread.sleep(500L);
        this.msgsClient1 += this.receiveExactMessages(client1, 30);
        client1.close();
        this.msgsClient2 += this.receiveExactMessages(client2, 30);
        client2.close();
        TwoBrokerQueueClientsReconnectTest.assertEquals((String)("Client for " + this.broker1 + " should have received 50% of the messages."), (int)50, (int)this.msgsClient1);
        TwoBrokerQueueClientsReconnectTest.assertEquals((String)("Client for " + this.broker2 + " should have received 50% of the messages."), (int)50, (int)this.msgsClient2);
    }

    protected int receiveExactMessages(MessageConsumer consumer, int msgCount) throws Exception {
        int i;
        for (i = 0; i < msgCount; ++i) {
            Message msg = consumer.receive(1000L);
            if (msg != null) continue;
            System.err.println("Consumer failed to receive exactly " + msgCount + " messages. Actual messages received is: " + i);
            break;
        }
        return i;
    }

    protected int receiveAllMessages(MessageConsumer consumer) throws Exception {
        Message msg;
        int msgsReceived = 0;
        do {
            if ((msg = consumer.receive(1000L)) == null) continue;
            ++msgsReceived;
        } while (msg != null);
        return msgsReceived;
    }

    protected MessageConsumer createConsumer(String brokerName, Destination dest) throws Exception {
        Connection conn = this.createConnection(brokerName);
        conn.start();
        Session sess = conn.createSession(false, 1);
        return sess.createConsumer(dest);
    }

    public void setUp() throws Exception {
        super.setAutoFail(true);
        super.setUp();
        this.createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?persistent=false&useJmx=false"));
        this.createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=false"));
        ActiveMQConnectionFactory factoryA = (ActiveMQConnectionFactory)this.getConnectionFactory("BrokerA");
        ActiveMQConnectionFactory factoryB = (ActiveMQConnectionFactory)this.getConnectionFactory("BrokerB");
        ActiveMQPrefetchPolicy policy = new ActiveMQPrefetchPolicy();
        policy.setAll(1);
        factoryA.setPrefetchPolicy(policy);
        factoryB.setPrefetchPolicy(policy);
        this.msgsClient1 = 0;
        this.msgsClient2 = 0;
    }
}

