/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.ageiport.processor.core.eventbus.http;

import com.alibaba.ageiport.common.logger.Logger;
import com.alibaba.ageiport.common.logger.LoggerFactory;
import com.alibaba.ageiport.common.utils.JsonUtil;
import com.alibaba.ageiport.common.utils.StringUtils;
import com.alibaba.ageiport.processor.core.AgeiPort;
import com.alibaba.ageiport.processor.core.dispatcher.http.HttpDispatchResponse;
import com.alibaba.ageiport.processor.core.dispatcher.http.HttpDispatcher;
import com.alibaba.ageiport.processor.core.eventbus.http.HttpEventBusAgent;
import com.alibaba.ageiport.processor.core.eventbus.http.HttpEventBusOptions;
import com.alibaba.ageiport.processor.core.model.core.impl.MainTask;
import com.alibaba.ageiport.processor.core.spi.eventbus.EventBus;
import com.alibaba.ageiport.processor.core.spi.task.monitor.TaskStageEvent;
import io.vertx.core.Verticle;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.RequestOptions;
import java.util.EventListener;
import java.util.EventObject;

/**
 * {@link EventBus} implementation that forwards task stage events over HTTP.
 *
 * <p>On construction it deploys an {@link HttpEventBusAgent} verticle on the shared
 * {@link Vertx} instance and creates an {@link HttpClient}. Listener registration is
 * delegated to the agent. {@link #post(EventObject)} looks up the main task of the event
 * and sends the JSON-serialized event with an HTTP POST to {@link #URL} on the host
 * recorded for that main task, using the port from {@link HttpEventBusOptions}.
 *
 * <p>Delivery is asynchronous: network and response failures are logged, not thrown.
 */
public class HttpEventBus implements EventBus {
    /** Request path the serialized events are POSTed to. */
    public static final String URL = "/event";
    /** Health-check path; not referenced in this class (presumably served by the agent). */
    public static final String HEALTH_URL = "/event/ping";
    /** Per-request timeout, in milliseconds, applied to every event POST. */
    private static final long REQUEST_TIMEOUT_MS = 3000L;
    private static final Logger logger = LoggerFactory.getLogger(HttpEventBus.class);

    private final AgeiPort ageiPort;
    HttpClient httpClient;
    HttpEventBusAgent agent;
    HttpEventBusOptions options;

    /**
     * Creates the event bus, deploys its agent verticle and creates the HTTP client.
     *
     * @param ageiPort framework entry point; used to obtain the {@link Vertx} bean and,
     *                 when posting, the task server client
     * @param options  HTTP event bus options; supplies the target port for posts
     */
    public HttpEventBus(AgeiPort ageiPort, HttpEventBusOptions options) {
        this.ageiPort = ageiPort;
        this.options = options;
        this.agent = new HttpEventBusAgent(ageiPort, options);
        // Reuse the Vertx bean known to AgeiPort; the supplier creates one if none is registered yet.
        Vertx vertx = ageiPort.getBean(Vertx.class, s -> Vertx.vertx(), ageiPort);
        vertx.deployVerticle(this.agent);
        this.httpClient = vertx.createHttpClient();
    }

    /**
     * Registers a listener by delegating to the agent.
     *
     * @param listener listener to register
     */
    @Override
    public void register(EventListener listener) {
        this.agent.register(listener);
    }

    /**
     * Unregisters a listener by delegating to the agent.
     *
     * @param listener listener to unregister
     */
    @Override
    public void unregister(EventListener listener) {
        this.agent.unregister(listener);
    }

    /**
     * Sends the event as JSON to the host recorded on the event's main task.
     *
     * <p>The HTTP exchange is asynchronous; its outcome is only logged.
     *
     * @param eventObject event to send; must be a {@link TaskStageEvent}
     * @throws IllegalArgumentException if {@code eventObject} is not a {@link TaskStageEvent}
     * @throws IllegalStateException    if no main task is found for the event's main task id
     */
    @Override
    public void post(EventObject eventObject) {
        if (!(eventObject instanceof TaskStageEvent)) {
            // Fail with context instead of a bare ClassCastException/NullPointerException.
            throw new IllegalArgumentException("HttpEventBus only supports TaskStageEvent, but got: "
                    + (eventObject == null ? "null" : eventObject.getClass().getName()));
        }
        TaskStageEvent taskStageEvent = (TaskStageEvent) eventObject;

        String body = JsonUtil.toJsonString(eventObject);
        logger.info("http event bus send:{}", body);

        String mainTaskId = taskStageEvent.getMainTaskId();
        MainTask mainTask = this.ageiPort.getTaskServerClient().getMainTask(mainTaskId);
        if (mainTask == null) {
            throw new IllegalStateException("main task not found, cannot post event, mainTaskId: " + mainTaskId);
        }

        String host = mainTask.getHost();
        RequestOptions requestOptions = new RequestOptions()
                .setHost(host)
                .setPort(this.options.getPort())
                .setMethod(HttpMethod.POST)
                .setURI(URL)
                .setTimeout(REQUEST_TIMEOUT_MS);

        // Shared context string appended to every log line of this delivery attempt.
        String message = StringUtils.format("main:{}, sub:{}, host:{}, stage:{}",
                mainTaskId, taskStageEvent.getSubTaskId(), host, taskStageEvent.getStage());

        this.httpClient.request(requestOptions, requestResult -> {
            if (!requestResult.succeeded()) {
                logger.error("post request failed, {}, body:{}", message, body, requestResult.cause());
                return;
            }
            HttpClientRequest request = requestResult.result();
            request.send(body, sendResult -> {
                if (!sendResult.succeeded()) {
                    logger.error("post response failed, send failed, {}", message, sendResult.cause());
                    return;
                }
                handleResponse(sendResult.result(), message);
            });
        });
    }

    /** Checks the HTTP status and, for 200, reads the body to verify the reported outcome. */
    private static void handleResponse(HttpClientResponse response, String message) {
        int statusCode = response.statusCode();
        if (statusCode != 200) {
            logger.error("post response failed, send error, {}, statusCode:{}", message, statusCode);
            return;
        }
        response.bodyHandler(buffer -> handleResponseBody(buffer.toString(), message));
    }

    /** Parses the response body and logs whether the receiver reported success. */
    private static void handleResponseBody(String resultJson, String message) {
        HttpDispatchResponse dispatchResponse;
        try {
            dispatchResponse = (HttpDispatchResponse) JsonUtil.toObject(resultJson, HttpDispatchResponse.class);
        } catch (RuntimeException ex) {
            // A malformed body must not escape the Vert.x handler without the event context.
            logger.error("post event failed, message:{}, resultJson:{}", message, resultJson, ex);
            return;
        }
        if (dispatchResponse != null && Boolean.TRUE.equals(dispatchResponse.getSuccess())) {
            logger.info("post event success, {}", message);
        } else {
            logger.error("post event failed, message:{}, resultJson:{}", message, resultJson);
        }
    }
}

