/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.transport.stomp;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;

public class StompTest
extends CombinationTestSupport {
    private BrokerService broker;
    private TransportConnector connector;
    private Socket stompSocket;
    private ByteArrayOutputStream inputBuffer;
    private Connection connection;
    private Session session;
    private ActiveMQQueue queue;
    protected String bindAddress = "stomp://localhost:0";

    protected void setUp() throws Exception {
        this.broker = new BrokerService();
        this.broker.setPersistent(false);
        this.connector = this.broker.addConnector(this.bindAddress);
        this.broker.start();
        URI connectUri = this.connector.getConnectUri();
        this.stompSocket = this.createSocket(connectUri);
        this.inputBuffer = new ByteArrayOutputStream();
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
        this.connection = cf.createConnection();
        this.session = this.connection.createSession(false, 1);
        this.queue = new ActiveMQQueue(this.getQueueName());
        this.connection.start();
    }

    protected Socket createSocket(URI connectUri) throws IOException {
        return new Socket("127.0.0.1", connectUri.getPort());
    }

    protected String getQueueName() {
        return ((Object)((Object)this)).getClass().getName() + "." + this.getName();
    }

    protected void tearDown() throws Exception {
        this.connection.close();
        this.stompSocket.close();
        this.broker.stop();
    }

    public void sendFrame(String data) throws Exception {
        byte[] bytes = data.getBytes("UTF-8");
        OutputStream outputStream = this.stompSocket.getOutputStream();
        for (int i = 0; i < bytes.length; ++i) {
            outputStream.write(bytes[i]);
        }
        outputStream.flush();
    }

    public String receiveFrame(long timeOut) throws Exception {
        this.stompSocket.setSoTimeout((int)timeOut);
        InputStream is = this.stompSocket.getInputStream();
        int c = 0;
        while (true) {
            if ((c = is.read()) < 0) {
                throw new IOException("socket closed.");
            }
            if (c == 0) {
                c = is.read();
                StompTest.assertEquals((String)"Expecting stomp frame to terminate with \u0000\n", (int)c, (int)10);
                byte[] ba = this.inputBuffer.toByteArray();
                this.inputBuffer.reset();
                return new String(ba, "UTF-8");
            }
            this.inputBuffer.write(c);
        }
    }

    public void sendMessage(String msg) throws Exception {
        this.sendMessage(msg, "foo", "xyz");
    }

    public void sendMessage(String msg, String propertyName, String propertyValue) throws JMSException {
        MessageProducer producer = this.session.createProducer(this.queue);
        TextMessage message = this.session.createTextMessage(msg);
        message.setStringProperty(propertyName, propertyValue);
        producer.send(message);
    }

    public void sendBytesMessage(byte[] msg) throws Exception {
        MessageProducer producer = this.session.createProducer(this.queue);
        BytesMessage message = this.session.createBytesMessage();
        message.writeBytes(msg);
        producer.send(message);
    }

    public void testConnect() throws Exception {
        String connect_frame = "CONNECT\nlogin: brianm\npasscode: wombats\nrequest-id: 1\n\n\u0000";
        this.sendFrame(connect_frame);
        String f = this.receiveFrame(10000L);
        StompTest.assertTrue((boolean)f.startsWith("CONNECTED"));
        StompTest.assertTrue((f.indexOf("response-id:1") >= 0 ? 1 : 0) != 0);
    }

    public void testSendMessage() throws Exception {
        MessageConsumer consumer = this.session.createConsumer(this.queue);
        String frame = "CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000";
        this.sendFrame(frame);
        frame = this.receiveFrame(10000L);
        StompTest.assertTrue((boolean)frame.startsWith("CONNECTED"));
        frame = "SEND\ndestination:/queue/" + this.getQueueName() + "\n\n" + "Hello World" + "\u0000";
        this.sendFrame(frame);
        TextMessage message = (TextMessage)consumer.receive(1000L);
        StompTest.assertNotNull((Object)message);
        StompTest.assertEquals((String)"Hello World", (String)message.getText());
        long tnow = System.currentTimeMillis();
        long tmsg = message.getJMSTimestamp();
        StompTest.assertTrue((Math.abs(tnow - tmsg) < 1000L ? 1 : 0) != 0);
    }

    public void testJMSXGroupIdCanBeSet() throws Exception {
        MessageConsumer consumer = this.session.createConsumer(this.queue);
        String frame = "CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000";
        this.sendFrame(frame);
        frame = this.receiveFrame(10000L);
        StompTest.assertTrue((boolean)frame.startsWith("CONNECTED"));
        frame = "SEND\ndestination:/queue/" + this.getQueueName() + "\n" + "JMSXGroupID: TEST\n\n" + "Hello World" + "\u0000";
        this.sendFrame(frame);
        TextMessage message = (TextMessage)consumer.receive(1000L);
        StompTest.assertNotNull((Object)message);
        StompTest.assertEquals((String)"TEST", (String)((ActiveMQTextMessage)message).getGroupID());
    }

    public void testSendMessageWithCustomHeadersAndSelector() throws Exception {
        MessageConsumer consumer = this.session.createConsumer(this.queue, "foo = 'abc'");
        String frame = "CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000";
        this.sendFrame(frame);
        frame = this.receiveFrame(10000L);
        StompTest.assertTrue((boolean)frame.startsWith("CONNECTED"));
        frame = "SEND\nfoo:abc\nbar:123\ndestination:/queue/" + this.getQueueName() + "\n\n" + "Hello World" + "\u0000";
        this.sendFrame(frame);
        TextMessage message = (TextMessage)consumer.receive(1000L);
        StompTest.assertNotNull((Object)message);
        StompTest.assertEquals((String)"Hello World", (String)message.getText());
        StompTest.assertEquals((String)"foo", (String)"abc", (String)message.getStringProperty("foo"));
        StompTest.assertEquals((String)"bar", (String)"123", (String)message.getStringProperty("bar"));
    }

    public void testSendMessageWithStandardHeaders() throws Exception {
        MessageConsumer consumer = this.session.createConsumer(this.queue);
        String frame = "CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000";
        this.sendFrame(frame);
        frame = this.receiveFrame(10000L);
        StompTest.assertTrue((boolean)frame.startsWith("CONNECTED"));
        frame = "SEND\ncorrelation-id:c123\npriority:3\ntype:t345\nJMSXGroupID:abc\nfoo:abc\nbar:123\ndestination:/queue/" + this.getQueueName() + "\n\n" + "Hello World" + "\u0000";
        this.sendFrame(frame);
        TextMessage message = (TextMessage)consumer.receive(1000L);
        StompTest.assertNotNull((Object)message);
        StompTest.assertEquals((String)"Hello World", (String)message.getText());
        StompTest.assertEquals((String)"JMSCorrelationID", (String)"c123", (String)message.getJMSCorrelationID());
        StompTest.assertEquals((String)"getJMSType", (String)"t345", (String)message.getJMSType());
        StompTest.assertEquals((String)"getJMSPriority", (int)3, (int)message.getJMSPriority());
        StompTest.assertEquals((String)"foo", (String)"abc", (String)message.getStringProperty("foo"));
        StompTest.assertEquals((String)"bar", (String)"123", (String)message.getStringProperty("bar"));
        StompTest.assertEquals((String)"JMSXGroupID", (String)"abc", (String)message.getStringProperty("JMSXGroupID"));
        ActiveMQTextMessage amqMessage = (ActiveMQTextMessage)message;
        StompTest.assertEquals((String)"GroupID", (String)"abc", (String)amqMessage.getGroupID());
    }

    public void testSubscribeWithAutoAck() throws Exception {
        String frame = "CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000";
        this.sendFrame(frame);
        frame = this.receiveFrame(100000L);
        StompTest.assertTrue((boolean)frame.startsWith("CONNECTED"));
        frame = "SUBSCRIBE\ndestination:/queue/" + this.getQueueName() + "\n" + "ack:auto\n\n" + "\u0000";
        this.sendFrame(frame);
        this.sendMessage(this.getName());
        frame = this.receiveFrame(10000L);
        StompTest.assertTrue((boolean)frame.startsWith("MESSAGE"));
        frame = "DISCONNECT\n\n\n\u0000";
        this.sendFrame(frame);
    }

    public void testSubscribeWithAutoAckAndBytesMessage() throws Exception {
        String frame = "CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000";
        this.sendFrame(frame);
        frame = this.receiveFrame(100000L);
        StompTest.assertTrue((boolean)frame.startsWith("CONNECTED"));
        frame = "SUBSCRIBE\ndestination:/queue/" + this.getQueueName() + "\n" + "ack:auto\n\n" + "\u0000";
        this.sendFrame(frame);
        this.sendBytesMessage(new byte[]{1, 2, 3, 4, 5});
        frame = this.receiveFrame(10000L);
        StompTest.assertTrue((boolean)frame.startsWith("MESSAGE"));
        Pattern cl = Pattern.compile("Content-length:\\s*(\\d+)", 2);
        Matcher cl_matcher = cl.matcher(frame);
        StompTest.assertTrue((boolean)cl_matcher.find());
        StompTest.assertEquals((String)"5", (String)cl_matcher.group(1));
        StompTest.assertFalse((boolean)Pattern.compile("type:\\s*null", 2).matcher(frame).find());
        frame = "DISCONNECT\n\n\n\u0000";
        this.sendFrame(frame);
    }

    public void testSubscribeWithMessageSentWithProperties() throws Exception {
        String frame = "CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000";
        this.sendFrame(frame);
        frame = this.receiveFrame(100000L);
        StompTest.assertTrue((boolean)frame.startsWith("CONNECTED"));
        frame = "SUBSCRIBE\ndestination:/queue/" + this.getQueueName() + "\n" + "ack:auto\n\n" + "\u0000";
        this.sendFrame(frame);
        MessageProducer producer = this.session.createProducer(this.queue);
        TextMessage message = this.session.createTextMessage("Hello World");
        message.setStringProperty("s", "value");
        message.setBooleanProperty("n", false);
        message.setByteProperty("byte", (byte)9);
        message.setDoubleProperty("d", 2.0);
        message.setFloatProperty("f", 6.0f);
        message.setIntProperty("i", 10);
        message.setLongProperty("l", 121L);
        message.setShortProperty("s", (short)12);
        producer.send(message);
        frame = this.receiveFrame(10000L);
        StompTest.assertTrue((boolean)frame.startsWith("MESSAGE"));
        frame = "DISCONNECT\n\n\n\u0000";
        this.sendFrame(frame);
    }

    public void testMessagesAreInOrder() throws Exception {
        int i;
        int ctr = 10;
        String[] data = new String[ctr];
        String frame = "CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000";
        this.sendFrame(frame);
        frame = this.receiveFrame(100000L);
        StompTest.assertTrue((boolean)frame.startsWith("CONNECTED"));
        frame = "SUBSCRIBE\ndestination:/queue/" + this.getQueueName() + "\n" + "ack:auto\n\n" + "\u0000";
        this.sendFrame(frame);
        for (i = 0; i < ctr; ++i) {
            data[i] = this.getName() + i;
            this.sendMessage(data[i]);
        }
        for (i = 0; i < ctr; ++i) {
            frame = this.receiveFrame(1000L);
            StompTest.assertTrue((String)"Message not in order", (frame.indexOf(data[i]) >= 0 ? 1 : 0) != 0);
        }
        this.waitForFrameToTakeEffect();
        for (i = 0; i < ctr; ++i) {
            data[i] = this.getName() + ":second:" + i;
            this.sendMessage(data[i]);
        }
        for (i = 0; i < ctr; ++i) {
            frame = this.receiveFrame(1000L);
            StompTest.assertTrue((String)"Message not in order", (frame.indexOf(data[i]) >= 0 ? 1 : 0) != 0);
        }
        frame = "DISCONNECT\n\n\n\u0000";
        this.sendFrame(frame);
    }

    public void testSubscribeWithAutoAckAndSelector() throws Exception {
        String frame = "CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000";
        this.sendFrame(frame);
        frame = this.receiveFrame(100000L);
        StompTest.assertTrue((boolean)frame.startsWith("CONNECTED"));
        frame = "SUBSCRIBE\ndestination:/queue/" + this.getQueueName() + "\n" + "selector: foo = 'zzz'\n" + "ack:auto\n\n" + "\u0000";
        this.sendFrame(frame);
        this.sendMessage("Ignored message", "foo", "1234");
        this.sendMessage("Real message", "foo", "zzz");
        frame = this.receiveFrame(10000L);
        StompTest.assertTrue((boolean)frame.startsWith("MESSAGE"));
        StompTest.assertTrue((String)("Should have received the real message but got: " + frame), (frame.indexOf("Real message") > 0 ? 1 : 0) != 0);
        frame = "DISCONNECT\n\n\n\u0000";
        this.sendFrame(frame);
    }

    public void testSubscribeWithClientAck() throws Exception {
        String frame = "CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000";
        this.sendFrame(frame);
        frame = this.receiveFrame(10000L);
        StompTest.assertTrue((boolean)frame.startsWith("CONNECTED"));
        frame = "SUBSCRIBE\ndestination:/queue/" + this.getQueueName() + "\n" + "ack:client\n\n" + "\u0000";
        this.sendFrame(frame);
        this.sendMessage(this.getName());
        frame = this.receiveFrame(10000L);
        StompTest.assertTrue((boolean)frame.startsWith("MESSAGE"));
        frame = "DISCONNECT\n\n\n\u0000";
        this.sendFrame(frame);
        MessageConsumer consumer = this.session.createConsumer(this.queue);
        TextMessage message = (TextMessage)consumer.receive(1000L);
        StompTest.assertNotNull((Object)message);
        StompTest.assertTrue((boolean)message.getJMSRedelivered());
    }

    public void testUnsubscribe() throws Exception {
        String frame = "CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000";
        this.sendFrame(frame);
        frame = this.receiveFrame(100000L);
        StompTest.assertTrue((boolean)frame.startsWith("CONNECTED"));
        frame = "SUBSCRIBE\ndestination:/queue/" + this.getQueueName() + "\n" + "ack:auto\n\n" + "\u0000";
        this.sendFrame(frame);
        this.sendMessage("first message");
        frame = this.receiveFrame(1000L);
        StompTest.assertTrue((boolean)frame.startsWith("MESSAGE"));
        frame = "UNSUBSCRIBE\ndestination:/queue/" + this.getQueueName() + "\n" + "\n\n" + "\u0000";
        this.sendFrame(frame);
        this.waitForFrameToTakeEffect();
        this.sendMessage("second message");
        try {
            frame = this.receiveFrame(1000L);
            log.info("Received frame: " + frame);
            StompTest.fail((String)"No message should have been received since subscription was removed");
        }
        catch (SocketTimeoutException socketTimeoutException) {
            // empty catch block
        }
    }

    public void testTransactionCommit() throws Exception {
        MessageConsumer consumer = this.session.createConsumer(this.queue);
        String frame = "CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000";
        this.sendFrame(frame);
        String f = this.receiveFrame(1000L);
        StompTest.assertTrue((boolean)f.startsWith("CONNECTED"));
        frame = "BEGIN\ntransaction: tx1\n\n\n\u0000";
        this.sendFrame(frame);
        frame = "SEND\ndestination:/queue/" + this.getQueueName() + "\n" + "transaction: tx1\n" + "\n\n" + "Hello World" + "\u0000";
        this.sendFrame(frame);
        frame = "COMMIT\ntransaction: tx1\n\n\n\u0000";
        this.sendFrame(frame);
        this.waitForFrameToTakeEffect();
        TextMessage message = (TextMessage)consumer.receive(1000L);
        StompTest.assertNotNull((String)"Should have received a message", (Object)message);
    }

    public void testTransactionRollback() throws Exception {
        MessageConsumer consumer = this.session.createConsumer(this.queue);
        String frame = "CONNECT\nlogin: brianm\npasscode: wombats\n\n\u0000";
        this.sendFrame(frame);
        String f = this.receiveFrame(1000L);
        StompTest.assertTrue((boolean)f.startsWith("CONNECTED"));
        frame = "BEGIN\ntransaction: tx1\n\n\n\u0000";
        this.sendFrame(frame);
        frame = "SEND\ndestination:/queue/" + this.getQueueName() + "\n" + "transaction: tx1\n" + "\n" + "first message" + "\u0000";
        this.sendFrame(frame);
        frame = "ABORT\ntransaction: tx1\n\n\n\u0000";
        this.sendFrame(frame);
        frame = "BEGIN\ntransaction: tx1\n\n\n\u0000";
        this.sendFrame(frame);
        frame = "SEND\ndestination:/queue/" + this.getQueueName() + "\n" + "transaction: tx1\n" + "\n" + "second message" + "\u0000";
        this.sendFrame(frame);
        frame = "COMMIT\ntransaction: tx1\n\n\n\u0000";
        this.sendFrame(frame);
        this.waitForFrameToTakeEffect();
        TextMessage message = (TextMessage)consumer.receive(1000L);
        StompTest.assertNotNull((Object)message);
        StompTest.assertEquals((String)"second message", (String)message.getText().trim());
    }

    protected void waitForFrameToTakeEffect() throws InterruptedException {
        Thread.sleep(2000L);
    }
}

