package com.vaadin.hilla.push;

import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.vaadin.hilla.AuthenticationUtil;
import com.vaadin.hilla.EndpointInvocationException;
import com.vaadin.hilla.EndpointInvoker;
import com.vaadin.hilla.EndpointSubscription;
import com.vaadin.hilla.push.messages.fromclient.AbstractServerMessage;
import com.vaadin.hilla.push.messages.fromclient.SubscribeMessage;
import com.vaadin.hilla.push.messages.fromclient.UnsubscribeMessage;
import com.vaadin.hilla.push.messages.toclient.AbstractClientMessage;
import com.vaadin.hilla.push.messages.toclient.ClientMessageComplete;
import com.vaadin.hilla.push.messages.toclient.ClientMessageError;
import com.vaadin.hilla.push.messages.toclient.ClientMessageUpdate;
import jakarta.servlet.ServletContext;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

@Service
/* loaded from: input_file:com/vaadin/hilla/push/PushMessageHandler.class */
public class PushMessageHandler {
    private final EndpointInvoker endpointInvoker;
    ConcurrentHashMap<String, ConcurrentHashMap<String, SubscriptionInfo>> fluxSubscriptionInfos = new ConcurrentHashMap<>();

    @Autowired
    private ServletContext servletContext;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/vaadin/hilla/push/PushMessageHandler$SubscriptionInfo.class */
    public static class SubscriptionInfo {
        private final Disposable fluxSubscriptionDisposable;
        private final Runnable unsubscribeHandler;

        private SubscriptionInfo(Disposable disposable, Runnable runnable) {
            this.fluxSubscriptionDisposable = disposable;
            this.unsubscribeHandler = runnable;
        }

        private Disposable getFluxSubscriptionDisposable() {
            return this.fluxSubscriptionDisposable;
        }

        private Runnable getUnsubscribeHandler() {
            return this.unsubscribeHandler;
        }
    }

    public PushMessageHandler(EndpointInvoker endpointInvoker) {
        this.endpointInvoker = endpointInvoker;
    }

    public void handleMessage(String str, AbstractServerMessage abstractServerMessage, Consumer<AbstractClientMessage> consumer) {
        if (abstractServerMessage instanceof SubscribeMessage) {
            handleBrowserSubscribe(str, (SubscribeMessage) abstractServerMessage, consumer);
        } else {
            if (!(abstractServerMessage instanceof UnsubscribeMessage)) {
                throw new IllegalArgumentException("Unknown message type: " + abstractServerMessage.getClass().getName());
            }
            handleBrowserUnsubscribe(str, (UnsubscribeMessage) abstractServerMessage);
        }
    }

    private void handleBrowserSubscribe(String str, SubscribeMessage subscribeMessage, Consumer<AbstractClientMessage> consumer) {
        Flux flux;
        String id = subscribeMessage.getId();
        if (this.fluxSubscriptionInfos.get(str).containsKey(id)) {
            getLogger().error("A subscription for flux id " + id + " already exists");
            return;
        }
        Class<?> returnType = this.endpointInvoker.getReturnType(subscribeMessage.getEndpointName(), subscribeMessage.getMethodName());
        if (returnType != Flux.class && returnType != EndpointSubscription.class) {
            consumer.accept(new ClientMessageError(id, "Method " + subscribeMessage.getEndpointName() + "/" + subscribeMessage.getMethodName() + " is not a Flux nor EndpointSubscription method"));
            return;
        }
        ArrayNode params = subscribeMessage.getParams();
        ObjectNode objectNode = params.objectNode();
        for (int i = 0; i < params.size(); i++) {
            objectNode.set(i, params.get(i));
        }
        try {
            Object invoke = this.endpointInvoker.invoke(subscribeMessage.getEndpointName(), subscribeMessage.getMethodName(), objectNode, AuthenticationUtil.getSecurityHolderAuthentication(), AuthenticationUtil.getSecurityHolderRoleChecker());
            Runnable runnable = null;
            if (invoke instanceof EndpointSubscription) {
                EndpointSubscription endpointSubscription = (EndpointSubscription) invoke;
                flux = endpointSubscription.getFlux();
                runnable = endpointSubscription.getOnUnsubscribe();
            } else {
                flux = (Flux) invoke;
            }
            CompletableFuture completableFuture = new CompletableFuture();
            this.fluxSubscriptionInfos.get(str).put(id, new SubscriptionInfo(flux.subscribe(obj -> {
                send(consumer, new ClientMessageUpdate(id, obj));
            }, th -> {
                completableFuture.whenComplete((r12, th) -> {
                    disposeSubscriptionInfo(str, id, false);
                    send(consumer, new ClientMessageError(id, "Exception in Flux"));
                    getLogger().error("Exception in Flux", th);
                });
            }, () -> {
                completableFuture.whenComplete((r10, th2) -> {
                    disposeSubscriptionInfo(str, id, false);
                    send(consumer, new ClientMessageComplete(id));
                });
            }), runnable));
            completableFuture.complete(null);
            completableFuture.complete(null);
        } catch (EndpointInvocationException.EndpointAccessDeniedException | EndpointInvocationException.EndpointBadRequestException | EndpointInvocationException.EndpointInternalException e) {
            consumer.accept(new ClientMessageError(id, e.getMessage()));
        } catch (EndpointInvocationException.EndpointNotFoundException e2) {
            consumer.accept(new ClientMessageError(id, "No such endpoint"));
        }
    }

    private void send(Consumer<AbstractClientMessage> consumer, AbstractClientMessage abstractClientMessage) {
        consumer.accept(abstractClientMessage);
    }

    public void handleBrowserConnect(String str) {
        this.fluxSubscriptionInfos.put(str, new ConcurrentHashMap<>());
    }

    public void handleBrowserDisconnect(String str) {
        disposeConnectionInfo(str, true);
    }

    private void handleBrowserUnsubscribe(String str, UnsubscribeMessage unsubscribeMessage) {
        disposeSubscriptionInfo(str, unsubscribeMessage.getId(), true);
    }

    private void disposeConnectionInfo(String str, boolean z) {
        ConcurrentHashMap<String, SubscriptionInfo> remove = this.fluxSubscriptionInfos.remove(str);
        if (remove != null) {
            remove.forEach((str2, subscriptionInfo) -> {
                dispose(subscriptionInfo, z);
            });
        }
    }

    private void disposeSubscriptionInfo(String str, String str2, boolean z) {
        SubscriptionInfo remove;
        ConcurrentHashMap<String, SubscriptionInfo> concurrentHashMap = this.fluxSubscriptionInfos.get(str);
        if (concurrentHashMap == null || (remove = concurrentHashMap.remove(str2)) == null) {
            return;
        }
        dispose(remove, z);
    }

    private void dispose(SubscriptionInfo subscriptionInfo, boolean z) {
        Runnable unsubscribeHandler;
        subscriptionInfo.getFluxSubscriptionDisposable().dispose();
        if (!z || (unsubscribeHandler = subscriptionInfo.getUnsubscribeHandler()) == null) {
            return;
        }
        unsubscribeHandler.run();
    }

    private Logger getLogger() {
        return LoggerFactory.getLogger(getClass());
    }
}
