/*
 * Decompiled with CFR 0.152.
 */
package com.jcloud.jcq.client.common;

import com.jcloud.jcq.client.common.ClientInstance;
import com.jcloud.jcq.client.common.RemotingApiWrapper;
import com.jcloud.jcq.client.consumer.SubscribeConsumer;
import com.jcloud.jcq.common.utils.CommunicationUtils;
import com.jcloud.jcq.communication.core.ChannelWrapper;
import com.jcloud.jcq.communication.portal.CommunicationRequestHandler;
import com.jcloud.jcq.communication.protocol.CommunicationUnit;
import com.jcloud.jcq.protocol.ProtocolSerializer;
import com.jcloud.jcq.protocol.RequestCode;
import com.jcloud.jcq.protocol.broker.PushMessageRequest;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Client-side handler for requests received over the communication layer.
 *
 * <p>The class is a singleton: the only instance is created eagerly and exposed through
 * {@link #getInstance()}. On construction it registers itself with the
 * {@link RemotingApiWrapper} for the push-message request code, and at runtime it forwards each
 * decoded {@link PushMessageRequest} to the registered {@link SubscribeConsumer} whose instance id
 * equals the consumer id carried by the request.
 */
public class RequestHandler
implements CommunicationRequestHandler {
    /**
     * Request code of a push-message request; the only code this handler registers for and
     * dispatches on.
     */
    private static final short PUSH_MESSAGE_REQUEST_CODE = 105;

    private static final Logger logger = LoggerFactory.getLogger(RequestHandler.class);
    private final RemotingApiWrapper remotingApiWrapper = RemotingApiWrapper.getInstance();
    private static final RequestHandler instance = new RequestHandler();
    /** Registered client instances, keyed by {@link ClientInstance#getInstanceId()}. */
    ConcurrentMap<String, ClientInstance> clientInstances = new ConcurrentHashMap<String, ClientInstance>();

    /** Creates the singleton and registers it as the processor for the push-message request code. */
    private RequestHandler() {
        this.registerRequestCode2Processor();
    }

    /**
     * Returns the shared handler instance.
     *
     * @return the singleton {@code RequestHandler}
     */
    public static RequestHandler getInstance() {
        return instance;
    }

    /**
     * Dispatches an incoming request by its request code.
     *
     * <p>Only the push-message code is handled; every other code is ignored.
     *
     * @param channelWrapper    channel the request arrived on; used only to resolve the remote
     *                          address for debug logging
     * @param communicationUnit the incoming request
     * @return always {@code null}
     * @throws Exception if handling the request fails
     */
    @Override
    public CommunicationUnit processRequest(ChannelWrapper channelWrapper, CommunicationUnit communicationUnit) throws Exception {
        String sourceAddress = CommunicationUtils.parseChannelRemoteAddr(channelWrapper.getChannel());
        if (logger.isDebugEnabled()) {
            logger.debug("got request code:{} from address:{}", RequestCode.getName(communicationUnit.getCode()), sourceAddress);
        }
        switch (communicationUnit.getCode()) {
            case PUSH_MESSAGE_REQUEST_CODE: {
                this.handlePushMessage(communicationUnit);
                break;
            }
            default: {
                // Codes other than push-message are deliberately ignored.
                break;
            }
        }
        return null;
    }

    /**
     * Indicates whether incoming requests should be rejected.
     *
     * @return always {@code false}
     */
    @Override
    public boolean rejectRequest() {
        return false;
    }

    /**
     * Registers a client instance so that push requests addressed to its instance id can be
     * delivered. An existing registration under the same id is replaced.
     *
     * @param clientInstance the instance to register
     */
    public void registerClientInstance(ClientInstance clientInstance) {
        this.clientInstances.put(clientInstance.getInstanceId(), clientInstance);
    }

    /**
     * Removes the registration stored under the given client instance's id, if any.
     *
     * @param clientInstance the instance to unregister
     */
    public void unregisterClientInstance(ClientInstance clientInstance) {
        this.clientInstances.remove(clientInstance.getInstanceId());
    }

    /** Registers this handler with the remoting wrapper for the push-message request code. */
    private void registerRequestCode2Processor() {
        HashMap<Short, ExecutorService> map = new HashMap<Short, ExecutorService>();
        // NOTE(review): a null executor presumably selects the remoting layer's default - confirm.
        map.put(PUSH_MESSAGE_REQUEST_CODE, null);
        this.remotingApiWrapper.registerRequestCode2Processor(map, this);
    }

    /**
     * Decodes a push-message request and hands its messages to the target consumer.
     *
     * <p>The request is dropped with a warning when it cannot be decoded, when no client instance
     * is registered under its consumer id (including a missing consumer id), or when the
     * registered instance is not a {@link SubscribeConsumer}.
     *
     * @param communicationUnit the request whose data holds the serialized {@link PushMessageRequest}
     */
    private void handlePushMessage(CommunicationUnit communicationUnit) {
        PushMessageRequest pushMessageRequest = ProtocolSerializer.decode(communicationUnit.getData(), PushMessageRequest.class);
        if (pushMessageRequest == null) {
            // Defensive: whether decode can return null is not visible from this class.
            logger.warn("decoded push request is null, cannot handle push request");
            return;
        }
        String consumerId = pushMessageRequest.getConsumerId();
        // ConcurrentHashMap rejects null keys with a NullPointerException, so a request without a
        // consumer id is treated the same as one addressed to an unregistered consumer.
        ClientInstance clientInstance = consumerId == null ? null : this.clientInstances.get(consumerId);
        if (clientInstance == null) {
            logger.warn("requestHandler not have consumer:{} registered, cannot handle push request:{}", consumerId, pushMessageRequest);
            return;
        }
        if (!(clientInstance instanceof SubscribeConsumer)) {
            logger.warn("clientInstance:{} is not instanceof SubscribeConsumer, cannot handle push request:{}", consumerId, pushMessageRequest);
            return;
        }
        ((SubscribeConsumer)clientInstance).receiveMessages(pushMessageRequest.getMessages(), pushMessageRequest.getAckIndex(), pushMessageRequest.getBrokerGroupId());
    }
}

