/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.ageiport.processor.core.eventbus.http;

import com.alibaba.ageiport.common.concurrent.ThreadPoolUtil;
import com.alibaba.ageiport.common.logger.Logger;
import com.alibaba.ageiport.common.logger.LoggerFactory;
import com.alibaba.ageiport.common.utils.JsonUtil;
import com.alibaba.ageiport.processor.core.AgeiPort;
import com.alibaba.ageiport.processor.core.dispatcher.http.HttpDispatchResponse;
import com.alibaba.ageiport.processor.core.eventbus.http.HttpEventBusOptions;
import com.alibaba.ageiport.processor.core.eventbus.local.async.AsyncEventBus;
import com.alibaba.ageiport.processor.core.spi.task.monitor.TaskStageEvent;
import com.alibaba.fastjson.JSON;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServer;
import java.util.EventListener;
import java.util.HashMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;

public class HttpEventBusAgent
extends AbstractVerticle {
    private static final Logger logger = LoggerFactory.getLogger(HttpEventBusAgent.class);
    private AgeiPort ageiPort;
    private HttpEventBusOptions options;
    private ExecutorService executorService;
    private AsyncEventBus eventBus;

    public HttpEventBusAgent(AgeiPort ageiPort, HttpEventBusOptions options) {
        this.ageiPort = ageiPort;
        this.options = options;
        this.executorService = ThreadPoolUtil.createExecutor((String)"eb-agent", (int)options.getEventHandleCorePoolSize(), (int)options.getEventHandleMaxPoolSize(), new ArrayBlockingQueue(options.getEventHandleQueueSize()));
        this.eventBus = new AsyncEventBus(this.executorService);
    }

    public void start() {
        logger.info("AGEIPort HttpEventBus Agent start");
        HttpServer httpServer = this.vertx.createHttpServer();
        httpServer.requestHandler(serverRequest -> {
            if ("/event".equals(serverRequest.uri())) {
                serverRequest.body(event -> {
                    if (event.succeeded()) {
                        String requestJson = ((Buffer)event.result()).toString();
                        logger.info("server receive:{}", new Object[]{requestJson});
                        try {
                            TaskStageEvent stageEvent = (TaskStageEvent)JsonUtil.toObject((String)requestJson, TaskStageEvent.class);
                            this.eventBus.post(stageEvent);
                            HttpDispatchResponse dispatchResponse = new HttpDispatchResponse(true);
                            String responseJson = JsonUtil.toJsonString((Object)dispatchResponse);
                            serverRequest.response().setStatusCode(200).end(responseJson);
                        }
                        catch (Throwable e) {
                            logger.error("consume request json failed, requestJson:{}", new Object[]{requestJson, e});
                            HttpDispatchResponse dispatchResponse = new HttpDispatchResponse(false);
                            String responseJson = JsonUtil.toJsonString((Object)dispatchResponse);
                            serverRequest.response().setStatusCode(200).end(responseJson);
                        }
                    } else {
                        logger.error("consume request error, {}", event.cause());
                        HttpDispatchResponse dispatchResponse = new HttpDispatchResponse(false);
                        String responseJson = JsonUtil.toJsonString((Object)dispatchResponse);
                        serverRequest.response().setStatusCode(200).end(responseJson);
                    }
                });
                return;
            }
            if ("/event/ping".equals(serverRequest.uri())) {
                logger.info("server uri:{}", new Object[]{serverRequest.uri()});
                serverRequest.body(event -> {
                    logger.info("server event result:{}", new Object[]{event.succeeded()});
                    if (event.succeeded()) {
                        HashMap<String, Comparable<Boolean>> result = new HashMap<String, Comparable<Boolean>>();
                        result.put("success", Boolean.valueOf(true));
                        result.put("timestamp", Long.valueOf(System.currentTimeMillis()));
                        String chunk = JSON.toJSONString(result);
                        serverRequest.response().setStatusCode(200).end(chunk);
                    } else {
                        logger.error("consume request error, {}", event.cause());
                        HttpDispatchResponse dispatchResponse = new HttpDispatchResponse(false);
                        String responseJson = JsonUtil.toJsonString((Object)dispatchResponse);
                        serverRequest.response().setStatusCode(200).end(responseJson);
                    }
                });
                return;
            }
            logger.error("not found, url:{}", new Object[]{serverRequest.uri()});
            HashMap<String, Object> result = new HashMap<String, Object>();
            result.put("success", false);
            result.put("errorMessage", "404 not found");
            String chunk = JSON.toJSONString(result);
            serverRequest.response().setStatusCode(404).end(chunk);
        });
        httpServer.listen(this.options.getPort().intValue());
        logger.info("AGEIPort HttpEventBus Agent start finished");
    }

    public void register(EventListener listener) {
        this.eventBus.register(listener);
    }

    public void unregister(EventListener listener) {
        this.eventBus.unregister(listener);
    }
}

