/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.advisory;

import edu.emory.mathcs.backport.java.util.concurrent.ArrayBlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.advisory.ConsumerEvent;
import org.apache.activemq.advisory.ConsumerEventSource;
import org.apache.activemq.advisory.ConsumerListener;

/**
 * Verifies that a {@link ConsumerEventSource} reports consumer start/stop
 * events, together with the current consumer count, for the test destination.
 *
 * <p>The test registers itself as the {@link ConsumerListener}; every event
 * received is placed on {@link #eventQueue} and the test methods take the
 * events off that queue one at a time to assert on them.
 */
public class ConsumerListenerTest extends EmbeddedBrokerTestSupport implements ConsumerListener {

    /** Maximum time, in milliseconds, to wait for a single consumer event. */
    private static final long EVENT_TIMEOUT_MILLIS = 100000L;

    /** Capacity of the queue buffering events between the listener callback and the assertions. */
    private static final int EVENT_QUEUE_CAPACITY = 1000;

    protected Session consumerSession1;
    protected Session consumerSession2;

    /** Number of consumers created so far; only used to label log output. */
    protected int consumerCounter;

    protected ConsumerEventSource consumerEventSource;

    // Raw type on purpose: the backport concurrency library has no generics.
    protected BlockingQueue eventQueue = new ArrayBlockingQueue(EVENT_QUEUE_CAPACITY);

    private Connection connection;

    /**
     * Starts listening first, then opens and closes two consumers, expecting
     * one event per change with the consumer count going 1, 2, 1, 0.
     */
    public void testConsumerEvents() throws Exception {
        consumerEventSource.start();

        consumerSession1 = createConsumer();
        assertConsumerEvent(1, true);

        consumerSession2 = createConsumer();
        assertConsumerEvent(2, true);

        consumerSession1.close();
        consumerSession1 = null;
        assertConsumerEvent(1, false);

        consumerSession2.close();
        consumerSession2 = null;
        assertConsumerEvent(0, false);
    }

    /**
     * Opens two consumers before the event source is started, then expects
     * two "started" events that both report a count of 2, followed by the
     * usual "stopped" events (count 1, then 0) as the consumers are closed.
     */
    public void testListenWhileAlreadyConsumersActive() throws Exception {
        consumerSession1 = createConsumer();
        consumerSession2 = createConsumer();

        consumerEventSource.start();
        assertConsumerEvent(2, true);
        assertConsumerEvent(2, true);

        consumerSession1.close();
        consumerSession1 = null;
        assertConsumerEvent(1, false);

        consumerSession2.close();
        consumerSession2 = null;
        assertConsumerEvent(0, false);
    }

    /**
     * {@link ConsumerListener} callback: buffers the event so that the test
     * methods can pick it up through {@link #waitForConsumerEvent()}.
     *
     * @param event the consumer event that was reported
     */
    public void onConsumerEvent(ConsumerEvent event) {
        eventQueue.add(event);
    }

    /**
     * Opens and starts a connection and creates (but does not start) the
     * event source for the test destination, with this test as its listener.
     */
    protected void setUp() throws Exception {
        super.setUp();
        connection = createConnection();
        connection.start();
        consumerEventSource = new ConsumerEventSource(connection, destination);
        consumerEventSource.setConsumerListener(this);
    }

    /**
     * Stops the event source and releases all JMS resources.
     *
     * <p>The connection is closed and {@code super.tearDown()} is invoked even
     * when an earlier cleanup step throws, so a failure while stopping the
     * event source or closing a session cannot leak the connection or skip
     * the superclass cleanup.
     */
    protected void tearDown() throws Exception {
        try {
            if (consumerEventSource != null) {
                consumerEventSource.stop();
            }
            if (consumerSession2 != null) {
                consumerSession2.close();
            }
            if (consumerSession1 != null) {
                consumerSession1.close();
            }
        } finally {
            try {
                // Per the JMS contract, closing the connection also closes any
                // session that could not be closed above.
                if (connection != null) {
                    connection.close();
                }
            } finally {
                super.tearDown();
            }
        }
    }

    /**
     * Waits for the next consumer event and checks its contents.
     *
     * @param count   expected value of {@code event.getConsumerCount()}
     * @param started expected value of {@code event.isStarted()}
     * @throws InterruptedException if interrupted while waiting for the event
     */
    protected void assertConsumerEvent(int count, boolean started) throws InterruptedException {
        ConsumerEvent event = waitForConsumerEvent();
        assertEquals("Consumer count", count, event.getConsumerCount());
        assertEquals("started", started, event.isStarted());
    }

    /**
     * Creates a new non-transacted, auto-acknowledge session on the shared
     * connection with one consumer on the test destination. The consumer's
     * listener only logs what it receives.
     *
     * @return the session owning the new consumer; closing it closes the consumer
     * @throws JMSException if the session or consumer cannot be created
     */
    protected Session createConsumer() throws JMSException {
        final String consumerText = "Consumer: " + ++consumerCounter;
        log.info("Creating consumer: " + consumerText + " on destination: " + destination);

        Session answer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = answer.createConsumer(destination);
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                log.info("Received message by: " + consumerText + " message: " + message);
            }
        });
        return answer;
    }

    /**
     * Takes the next buffered event, waiting up to
     * {@link #EVENT_TIMEOUT_MILLIS} for one to arrive, and fails the test if
     * none shows up in time.
     *
     * @return the next consumer event, never {@code null}
     * @throws InterruptedException if interrupted while waiting
     */
    protected ConsumerEvent waitForConsumerEvent() throws InterruptedException {
        ConsumerEvent answer = (ConsumerEvent) eventQueue.poll(EVENT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        assertTrue("Should have received a consumer event!", answer != null);
        return answer;
    }
}

