/*
 * Decompiled with CFR 0.152.
 */
package com.vaadin.hilla.push;

import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.vaadin.hilla.AuthenticationUtil;
import com.vaadin.hilla.EndpointInvocationException;
import com.vaadin.hilla.EndpointInvoker;
import com.vaadin.hilla.EndpointSubscription;
import com.vaadin.hilla.push.messages.fromclient.AbstractServerMessage;
import com.vaadin.hilla.push.messages.fromclient.SubscribeMessage;
import com.vaadin.hilla.push.messages.fromclient.UnsubscribeMessage;
import com.vaadin.hilla.push.messages.toclient.AbstractClientMessage;
import com.vaadin.hilla.push.messages.toclient.ClientMessageComplete;
import com.vaadin.hilla.push.messages.toclient.ClientMessageError;
import com.vaadin.hilla.push.messages.toclient.ClientMessageUpdate;
import jakarta.servlet.ServletContext;
import java.security.Principal;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.core.Authentication;
import org.springframework.stereotype.Service;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

@Service
public class PushMessageHandler {
    private final EndpointInvoker endpointInvoker;
    ConcurrentHashMap<String, ConcurrentHashMap<String, SubscriptionInfo>> fluxSubscriptionInfos = new ConcurrentHashMap();
    @Autowired
    private ServletContext servletContext;

    public PushMessageHandler(EndpointInvoker endpointInvoker) {
        this.endpointInvoker = endpointInvoker;
    }

    public void handleMessage(String connectionId, AbstractServerMessage message, Consumer<AbstractClientMessage> sender) {
        if (message instanceof SubscribeMessage) {
            this.handleBrowserSubscribe(connectionId, (SubscribeMessage)message, sender);
        } else if (message instanceof UnsubscribeMessage) {
            this.handleBrowserUnsubscribe(connectionId, (UnsubscribeMessage)message);
        } else {
            throw new IllegalArgumentException("Unknown message type: " + message.getClass().getName());
        }
    }

    private void handleBrowserSubscribe(String connectionId, SubscribeMessage message, Consumer<AbstractClientMessage> sender) {
        String fluxId = message.getId();
        if (this.fluxSubscriptionInfos.get(connectionId).containsKey(fluxId)) {
            String msg = "A subscription for flux id " + fluxId + " already exists";
            this.getLogger().error(msg);
            return;
        }
        Class<?> returnType = this.endpointInvoker.getReturnType(message.getEndpointName(), message.getMethodName());
        if (returnType != Flux.class && returnType != EndpointSubscription.class) {
            sender.accept(new ClientMessageError(fluxId, "Method " + message.getEndpointName() + "/" + message.getMethodName() + " is not a Flux nor EndpointSubscription method"));
            return;
        }
        ArrayNode paramsArray = message.getParams();
        ObjectNode paramsObject = paramsArray.objectNode();
        for (int i = 0; i < paramsArray.size(); ++i) {
            paramsObject.set("" + i, paramsArray.get(i));
        }
        Authentication principal = AuthenticationUtil.getSecurityHolderAuthentication();
        Function<String, Boolean> isInRole = AuthenticationUtil.getSecurityHolderRoleChecker();
        try {
            Flux flux;
            Object returnValue = this.endpointInvoker.invoke(message.getEndpointName(), message.getMethodName(), paramsObject, (Principal)principal, isInRole);
            Runnable unsubscribeHandler = null;
            if (returnValue instanceof EndpointSubscription) {
                EndpointSubscription endpointSubscription = (EndpointSubscription)returnValue;
                flux = endpointSubscription.getFlux();
                unsubscribeHandler = endpointSubscription.getOnUnsubscribe();
            } else {
                flux = (Flux)returnValue;
            }
            CompletableFuture<Object> waitForSubscriptionData = new CompletableFuture<Object>();
            Disposable endpointFluxSubscriber = flux.subscribe(item -> this.send(sender, new ClientMessageUpdate(fluxId, item)), error -> waitForSubscriptionData.whenComplete((a, b) -> {
                this.disposeSubscriptionInfo(connectionId, fluxId, false);
                this.send(sender, new ClientMessageError(fluxId, "Exception in Flux"));
                this.getLogger().error("Exception in Flux", error);
            }), () -> waitForSubscriptionData.whenComplete((a, b) -> {
                this.disposeSubscriptionInfo(connectionId, fluxId, false);
                this.send(sender, new ClientMessageComplete(fluxId));
            }));
            this.fluxSubscriptionInfos.get(connectionId).put(fluxId, new SubscriptionInfo(endpointFluxSubscriber, unsubscribeHandler));
            waitForSubscriptionData.complete(null);
            waitForSubscriptionData.complete(null);
        }
        catch (EndpointInvocationException.EndpointNotFoundException e) {
            sender.accept(new ClientMessageError(fluxId, "No such endpoint"));
            return;
        }
        catch (EndpointInvocationException.EndpointHttpException e) {
            sender.accept(new ClientMessageError(fluxId, e.getMessage()));
            return;
        }
    }

    private void send(Consumer<AbstractClientMessage> sender, AbstractClientMessage message) {
        sender.accept(message);
    }

    public void handleBrowserConnect(String connectionId) {
        this.fluxSubscriptionInfos.put(connectionId, new ConcurrentHashMap());
    }

    public void handleBrowserReconnect(String connectionId) {
        this.fluxSubscriptionInfos.putIfAbsent(connectionId, new ConcurrentHashMap());
    }

    public void handleBrowserDisconnect(String connectionId) {
        this.disposeConnectionInfo(connectionId, true);
    }

    private void handleBrowserUnsubscribe(String connectionId, UnsubscribeMessage message) {
        String fluxId = message.getId();
        this.disposeSubscriptionInfo(connectionId, fluxId, true);
    }

    private void disposeConnectionInfo(String connectionId, boolean invokeUnsubscribeListener) {
        ConcurrentHashMap<String, SubscriptionInfo> fluxMap = this.fluxSubscriptionInfos.remove(connectionId);
        if (fluxMap != null) {
            fluxMap.forEach((cid, subscriptionInfo) -> this.dispose((SubscriptionInfo)subscriptionInfo, invokeUnsubscribeListener));
        }
    }

    private void disposeSubscriptionInfo(String connectionId, String subscriptionId, boolean invokeUnsubscribeListener) {
        SubscriptionInfo subscriptionInfo;
        ConcurrentHashMap<String, SubscriptionInfo> fluxMap = this.fluxSubscriptionInfos.get(connectionId);
        if (fluxMap != null && (subscriptionInfo = fluxMap.remove(subscriptionId)) != null) {
            this.dispose(subscriptionInfo, invokeUnsubscribeListener);
        }
    }

    private void dispose(SubscriptionInfo subscriptionInfo, boolean invokeUnsubscribeListener) {
        Runnable unsubscribeHandler;
        subscriptionInfo.getFluxSubscriptionDisposable().dispose();
        if (invokeUnsubscribeListener && (unsubscribeHandler = subscriptionInfo.getUnsubscribeHandler()) != null) {
            unsubscribeHandler.run();
        }
    }

    private Logger getLogger() {
        return LoggerFactory.getLogger(this.getClass());
    }

    static class SubscriptionInfo {
        private final Disposable fluxSubscriptionDisposable;
        private final Runnable unsubscribeHandler;

        private SubscriptionInfo(Disposable fluxSubscriptionDisposable, Runnable unsubscribeHandler) {
            this.fluxSubscriptionDisposable = fluxSubscriptionDisposable;
            this.unsubscribeHandler = unsubscribeHandler;
        }

        private Disposable getFluxSubscriptionDisposable() {
            return this.fluxSubscriptionDisposable;
        }

        private Runnable getUnsubscribeHandler() {
            return this.unsubscribeHandler;
        }
    }
}

