/*
 * Decompiled with CFR 0.152.
 */
package org.unidal.net.transport;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.unidal.helper.Threads;
import org.unidal.lookup.annotation.Named;
import org.unidal.net.transport.ClientTransportDescriptor;
import org.unidal.net.transport.TransportHub;
import org.unidal.net.transport.handler.ClientStateHandler;

/**
 * Background task that pumps outgoing data from the descriptor's {@link TransportHub}
 * to a remote peer over a Netty channel.
 *
 * <p>The task connects to the first reachable address returned by
 * {@code ClientTransportDescriptor.getRemoteAddresses()}, periodically tries to fail back
 * to the first address in that list when it is connected to a later one, and keeps
 * writing buffers filled by the hub until {@link #shutdown()} is called. After shutdown
 * it keeps draining for a short grace period before terminating.
 *
 * <p>All channel management state is touched only from the thread executing
 * {@link #run()}; the other public methods only use the latches and the atomic flag.
 */
@Named(type=ClientTransportHandler.class, instantiationStrategy="per-lookup")
public class ClientTransportHandler
implements Threads.Task,
LogEnabled {
    /** Initial capacity, in bytes, of every buffer handed to the hub. */
    private static final int INITIAL_BUFFER_CAPACITY = 4096;

    /** Maximum time {@link #run()} waits for a first active channel before signalling warm-up. */
    private static final long WARMUP_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(1L);

    /** Grace period during which the task keeps draining the hub after {@link #shutdown()}. */
    private static final long DRAIN_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(3L);

    /** Pause between two polls of the hub / channel state. */
    private static final long POLL_INTERVAL_MILLIS = 1L;

    private ClientTransportDescriptor m_descriptor;
    private ClientChannelManager m_channelManager;
    private final AtomicBoolean m_active = new AtomicBoolean(true);
    private final CountDownLatch m_latch = new CountDownLatch(1);
    private final CountDownLatch m_warmup = new CountDownLatch(1);
    private Logger m_logger;

    /**
     * Waits until {@link #run()} has finished, or the timeout elapses.
     *
     * @param timeout maximum time to wait
     * @param unit unit of {@code timeout}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void awaitTermination(int timeout, TimeUnit unit) throws InterruptedException {
        m_latch.await(timeout, unit);
    }

    /**
     * Blocks until the task has finished its warm-up phase (a channel became active, the
     * warm-up timeout expired, or the task failed).
     *
     * <p>If the calling thread is interrupted the method returns early and the thread's
     * interrupt status is restored so callers can still observe the interruption.
     */
    public void awaitWarmup() {
        try {
            m_warmup.await();
        }
        catch (InterruptedException ignored) {
            // The signature cannot propagate the exception; keep the interrupt visible.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Receives the logger used to report failures of the background task.
     *
     * @param logger logger to use
     */
    public void enableLogging(Logger logger) {
        m_logger = logger;
    }

    /**
     * @return the simple class name, used as the task name
     */
    @Override
    public String getName() {
        return getClass().getSimpleName();
    }

    /**
     * Task entry point: creates the channel manager, waits up to
     * {@link #WARMUP_TIMEOUT_MILLIS} for a first active channel, signals warm-up and then
     * runs the send loop until it terminates.
     *
     * <p>Any failure is logged. Regardless of the outcome, the warm-up latch and the
     * termination latch are always released and the channel manager is closed.
     */
    @Override
    public void run() {
        try {
            m_channelManager = new ClientChannelManager();

            long warmupDeadline = System.currentTimeMillis() + WARMUP_TIMEOUT_MILLIS;
            while (m_channelManager.getActiveChannel() == null && System.currentTimeMillis() < warmupDeadline) {
                TimeUnit.MILLISECONDS.sleep(POLL_INTERVAL_MILLIS);
            }

            m_warmup.countDown();
            run0();
        }
        catch (Throwable e) {
            m_logger.error(e.getMessage(), e);
        }
        finally {
            // Counting down twice is harmless; doing it here guarantees awaitWarmup()
            // callers are released even if the error handling above fails itself.
            m_warmup.countDown();
            try {
                if (m_channelManager != null) {
                    m_channelManager.close();
                }
            }
            finally {
                m_latch.countDown();
            }
        }
    }

    /**
     * Send loop. While the task is active it repeatedly asks the hub to fill a buffer and
     * writes every filled buffer to the active channel, stopping a burst as soon as the
     * channel reports it is no longer writable. Once {@link #shutdown()} has been called
     * the same pumping continues for {@link #DRAIN_TIMEOUT_MILLIS}.
     *
     * <p>NOTE(review): the loop has no way to tell that the hub is empty, so the drain
     * phase always ends with the {@link InterruptedException} below; {@link #run()} logs
     * it. This mirrors the previous behavior.
     *
     * @throws InterruptedException when the drain period expires, or if the thread is
     *         interrupted while sleeping or connecting
     */
    private void run0() throws InterruptedException {
        ByteBufAllocator allocator = m_descriptor.getByteBufAllocator();
        TransportHub hub = m_descriptor.getHub();
        // Buffer currently owned by this method; null while ownership is with Netty.
        ByteBuf buf = allocator.buffer(INITIAL_BUFFER_CAPACITY);
        boolean draining = false;
        long drainDeadline = 0L;

        try {
            while (true) {
                if (!draining && !m_active.get()) {
                    draining = true;
                    drainDeadline = System.currentTimeMillis() + DRAIN_TIMEOUT_MILLIS;
                }

                Channel channel = m_channelManager.getActiveChannel();
                if (channel != null && channel.isWritable()) {
                    while (hub.fill(buf)) {
                        ByteBuf filled = buf;
                        // writeAndFlush() takes over the reference; forget it so the
                        // finally block never releases a buffer Netty already owns.
                        buf = null;
                        channel.writeAndFlush(filled);
                        buf = allocator.buffer(INITIAL_BUFFER_CAPACITY);
                        if (!channel.isWritable()) {
                            // Respect backpressure: stop until the channel drains.
                            break;
                        }
                    }
                }

                if (draining && System.currentTimeMillis() >= drainDeadline) {
                    throw new InterruptedException("Timeout with messages left in the queue!");
                }
                TimeUnit.MILLISECONDS.sleep(POLL_INTERVAL_MILLIS);
            }
        }
        finally {
            // The last buffer was never handed to a channel, so it must be released here.
            if (buf != null) {
                buf.release();
            }
        }
    }

    /**
     * Sets the descriptor providing the hub, allocator, event loop group, channel class,
     * options, handlers and remote addresses. Must be called before {@link #run()}.
     *
     * @param descriptor transport configuration
     */
    public void setDescriptor(ClientTransportDescriptor descriptor) {
        m_descriptor = descriptor;
    }

    /**
     * Requests termination: the send loop switches to its drain phase and then exits.
     */
    @Override
    public void shutdown() {
        m_active.set(false);
    }

    /**
     * Owns the Netty bootstrap and the currently used channel, including fail-over across
     * the configured addresses and fail-back to the first one.
     */
    private class ClientChannelManager {
        private final Bootstrap m_bootstrap;
        /** Address list the current connection was established against. */
        private List<InetSocketAddress> m_addresses;
        private Channel m_channel;
        /** Index in {@link #m_addresses} of the address {@link #m_channel} is connected to. */
        private int m_index = -1;
        /** In-flight fail-back connection attempt to the first address, if any. */
        private ChannelFuture m_primary;
        /** Minimum delay between two fail-back attempts, in milliseconds. */
        private final long m_failBackCheckInterval = 2000L;
        private long m_lastCheckTime;

        /**
         * Builds the bootstrap from the descriptor's group, channel class and options.
         */
        public ClientChannelManager() {
            Bootstrap bootstrap = new Bootstrap();
            Class<? extends Channel> channelClass = m_descriptor.getChannelClass();

            bootstrap.group(m_descriptor.getGroup()).channel(channelClass);
            bootstrap.handler(new ClientChannelInitializer());
            for (Map.Entry<ChannelOption<Object>, Object> e : m_descriptor.getOptions().entrySet()) {
                bootstrap.option(e.getKey(), e.getValue());
            }
            m_bootstrap = bootstrap;
        }

        /**
         * Closes the current channel and any pending fail-back connection, then shuts
         * down the descriptor's event loop group.
         */
        public void close() {
            try {
                discardFailBackAttempt();
                if (m_channel != null) {
                    m_channel.close();
                }
            }
            finally {
                m_descriptor.getGroup().shutdownGracefully();
            }
        }

        /**
         * Returns the channel to write to.
         *
         * <p>When the descriptor's address list differs from the one last seen, the
         * addresses are tried in order (blocking) and the first successful connection
         * replaces the current channel; the current channel is then returned as is, which
         * may be {@code null} or a no longer active channel if no address was reachable.
         * Otherwise a fail-back to the first address is attempted if needed and the
         * current channel is returned only if it is open and active.
         *
         * <p>NOTE(review): a channel that died while the address list is unchanged is not
         * reconnected; only fail-back from a later address is handled.
         *
         * @return the channel to use, or {@code null} if none is usable
         * @throws InterruptedException if interrupted while waiting for a connection
         */
        public Channel getActiveChannel() throws InterruptedException {
            List<InetSocketAddress> addresses = m_descriptor.getRemoteAddresses();

            if (!addresses.equals(m_addresses)) {
                m_addresses = addresses;
                // A pending attempt targets the old list's first address; drop it.
                discardFailBackAttempt();
                connectFirstAvailable(addresses);
                return m_channel;
            }

            if (m_index > 0 && !m_addresses.isEmpty()) {
                tryFailBack();
            }

            if (m_channel != null && m_channel.isOpen() && m_channel.isActive()) {
                return m_channel;
            }
            return null;
        }

        /**
         * Tries each address in order and adopts the first connection that succeeds,
         * closing the previously used channel.
         */
        private void connectFirstAvailable(List<InetSocketAddress> addresses) throws InterruptedException {
            for (int i = 0; i < addresses.size(); i++) {
                InetSocketAddress address = addresses.get(i);
                // await() instead of sync(): sync() rethrows the connect failure, which
                // would abort the loop instead of moving on to the next address.
                ChannelFuture future = m_bootstrap.connect(address).await();

                if (!future.isSuccess()) {
                    if (m_logger != null) {
                        m_logger.warn("Unable to connect to " + address, future.cause());
                    }
                    continue;
                }

                if (m_channel != null) {
                    m_channel.close();
                }
                m_channel = future.channel();
                m_index = i;
                return;
            }
        }

        /**
         * Drives the asynchronous fail-back to the first address: starts an attempt at
         * most once per interval, adopts its channel once connected, and clears a
         * finished but unusable attempt so that a later one can be started.
         */
        private void tryFailBack() {
            if (m_primary == null) {
                long now = System.currentTimeMillis();
                if (m_lastCheckTime + m_failBackCheckInterval < now) {
                    m_lastCheckTime = now;
                    m_primary = m_bootstrap.connect(m_addresses.get(0));
                }
                return;
            }

            if (!m_primary.isDone()) {
                return;
            }

            ChannelFuture attempt = m_primary;
            Channel candidate = attempt.channel();
            m_primary = null;

            if (attempt.isSuccess() && candidate.isOpen() && candidate.isActive()) {
                Channel previous = m_channel;
                m_channel = candidate;
                m_index = 0;
                if (previous != null && previous != candidate) {
                    previous.close();
                }
            } else {
                candidate.close();
            }
        }

        /** Abandons a pending fail-back attempt, closing its channel. */
        private void discardFailBackAttempt() {
            if (m_primary != null) {
                m_primary.channel().close();
                m_primary = null;
            }
        }
    }

    /**
     * Installs a {@link ClientStateHandler} followed by the descriptor's named handlers,
     * in the iteration order of the descriptor's handler map, on every new channel.
     */
    private class ClientChannelInitializer
    extends ChannelInitializer<Channel> {
        private ClientChannelInitializer() {
        }

        @Override
        protected void initChannel(Channel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();

            pipeline.addLast(new ChannelHandler[]{new ClientStateHandler(m_descriptor.getName())});
            for (Map.Entry<String, ChannelHandler> e : m_descriptor.getHandlers().entrySet()) {
                pipeline.addLast(e.getKey(), e.getValue());
            }
        }
    }
}

