/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq;

import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Random;
import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Pushes a large byte stream through the {@code streamtest} queue using the
 * stream API of {@link ActiveMQConnection}: one thread writes
 * {@link #MESSAGE_COUNT} chunks of {@link #BUFFER_SIZE} random bytes through an
 * output stream on one connection, while a second thread reads from an input
 * stream on another connection. The test passes when neither thread recorded
 * an exception and the number of bytes read equals the number written.
 */
public final class LargeStreamletTest
extends TestCase {
    private static final Log log = LogFactory.getLog(LargeStreamletTest.class);
    private static final String BROKER_URL = "vm://localhost?broker.persistent=false";
    /** Size in bytes of each chunk written, and of the reader's buffer. */
    private static final int BUFFER_SIZE = 1024;
    /** Number of chunks written; BUFFER_SIZE * MESSAGE_COUNT is 2^30, which still fits an int counter. */
    private static final int MESSAGE_COUNT = 0x100000;
    /** Interval at which the test thread checks whether the reader is still making progress. */
    private static final long PROGRESS_POLL_MILLIS = 1000L;
    /** Upper bound on how long to wait for each worker to wind down once asked to stop. */
    private static final long SHUTDOWN_WAIT_MILLIS = 5000L;

    private final AtomicInteger totalRead = new AtomicInteger();
    private final AtomicInteger totalWritten = new AtomicInteger();
    private final AtomicBoolean stopThreads = new AtomicBoolean(false);
    // Written by the worker threads and read by the test thread, hence volatile.
    protected volatile Exception writerException;
    protected volatile Exception readerException;

    /**
     * Runs the writer and reader concurrently and verifies the byte counts.
     *
     * @throws Exception if a connection cannot be created, started or closed,
     *         or if the test thread is interrupted while waiting
     */
    public void testStreamlets() throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
        // Each connection is closed by its own finally block, so a failure while
        // creating, starting or closing one of them cannot leak the other.
        final ActiveMQConnection connection = (ActiveMQConnection)factory.createConnection();
        try {
            connection.start();
            final ActiveMQConnection connection2 = (ActiveMQConnection)factory.createConnection();
            try {
                connection2.start();
                final ActiveMQQueue destination = new ActiveMQQueue("streamtest");
                Thread readerThread = new Thread(this.createReader(connection, destination));
                Thread writerThread = new Thread(this.createWriter(connection2, destination));
                readerThread.start();
                writerThread.start();
                try {
                    this.awaitReader(readerThread);
                }
                finally {
                    // Always ask the workers to stop, even when the wait was interrupted.
                    this.stopThreads.set(true);
                }
                // Give the workers a bounded amount of time to finish so the
                // counters and exception fields are no longer being updated
                // when they are inspected below.
                writerThread.join(SHUTDOWN_WAIT_MILLIS);
                readerThread.join(SHUTDOWN_WAIT_MILLIS);

                LargeStreamletTest.assertTrue("Should not have received a reader exception", this.readerException == null);
                LargeStreamletTest.assertTrue("Should not have received a writer exception", this.writerException == null);
                Assert.assertEquals("Not all messages accounted for", this.totalWritten.get(), this.totalRead.get());
            }
            finally {
                connection2.close();
            }
        }
        finally {
            connection.close();
        }
    }

    /**
     * Waits until the reader thread has terminated or has stopped making
     * progress, i.e. the read counter did not change across one poll interval.
     */
    private void awaitReader(Thread readerThread) throws InterruptedException {
        Thread.sleep(PROGRESS_POLL_MILLIS);
        int lastRead = this.totalRead.get();
        while (readerThread.isAlive()) {
            readerThread.join(PROGRESS_POLL_MILLIS);
            int nowRead = this.totalRead.get();
            if (lastRead == nowRead) {
                // No progress since the previous check: stop waiting.
                break;
            }
            lastRead = nowRead;
        }
    }

    /**
     * Builds the task that drains the input stream of {@code destination},
     * adding every chunk length to {@link #totalRead} until end of stream or
     * until a stop is requested. Any exception is stored in
     * {@link #readerException}.
     */
    private Runnable createReader(final ActiveMQConnection connection, final ActiveMQQueue destination) {
        return new Runnable(){

            public void run() {
                LargeStreamletTest.this.totalRead.set(0);
                try {
                    InputStream inputStream = connection.createInputStream(destination);
                    try {
                        int read;
                        byte[] buf = new byte[BUFFER_SIZE];
                        while (!LargeStreamletTest.this.stopThreads.get() && (read = inputStream.read(buf)) != -1) {
                            LargeStreamletTest.this.totalRead.addAndGet(read);
                        }
                    }
                    finally {
                        inputStream.close();
                    }
                }
                catch (Exception e) {
                    LargeStreamletTest.this.readerException = e;
                    log.error("Reader thread failed", e);
                }
                finally {
                    log.info(LargeStreamletTest.this.totalRead + " total bytes read.");
                }
            }
        };
    }

    /**
     * Builds the task that writes {@link #MESSAGE_COUNT} copies of one random
     * {@link #BUFFER_SIZE}-byte chunk to the output stream of
     * {@code destination}, adding each chunk length to {@link #totalWritten},
     * unless a stop is requested first. Any exception is stored in
     * {@link #writerException}.
     */
    private Runnable createWriter(final ActiveMQConnection connection, final ActiveMQQueue destination) {
        return new Runnable(){

            public void run() {
                LargeStreamletTest.this.totalWritten.set(0);
                try {
                    OutputStream outputStream = connection.createOutputStream(destination);
                    try {
                        byte[] buf = new byte[BUFFER_SIZE];
                        new Random().nextBytes(buf);
                        for (int count = MESSAGE_COUNT; count > 0 && !LargeStreamletTest.this.stopThreads.get(); --count) {
                            outputStream.write(buf);
                            LargeStreamletTest.this.totalWritten.addAndGet(buf.length);
                        }
                    }
                    finally {
                        outputStream.close();
                    }
                }
                catch (Exception e) {
                    LargeStreamletTest.this.writerException = e;
                    log.error("Writer thread failed", e);
                }
                finally {
                    log.info(LargeStreamletTest.this.totalWritten + " total bytes written.");
                }
            }
        };
    }
}

