package dev.hilla.push;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.vaadin.flow.component.dependency.NpmPackage;
import dev.hilla.EndpointInvoker;
import dev.hilla.push.messages.fromclient.AbstractServerMessage;
import dev.hilla.push.messages.fromclient.PushCloseMessage;
import dev.hilla.push.messages.fromclient.PushConnectMessage;
import dev.hilla.push.messages.toclient.AbstractClientMessage;
import dev.hilla.push.messages.toclient.ClientMessageComplete;
import dev.hilla.push.messages.toclient.ClientMessageError;
import dev.hilla.push.messages.toclient.ClientMessageUpdate;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

@Controller
@NpmPackage.Container({@NpmPackage(value = "rsocket-websocket-client", version = "0.0.27"), @NpmPackage(value = "@types/rsocket-websocket-client", version = "0.0.4")})
/* loaded from: input_file:dev/hilla/push/RSocketController.class */
public class RSocketController {
    private final ObjectMapper objectMapper;
    private final EndpointInvoker endpointInvoker;
    private Map<String, Disposable> closeHandlers = new ConcurrentHashMap();

    public RSocketController(ObjectMapper objectMapper, EndpointInvoker endpointInvoker) {
        this.objectMapper = objectMapper;
        this.endpointInvoker = endpointInvoker;
    }

    @MessageMapping({"rs"})
    private Flux<AbstractClientMessage> requestResponse(Flux<AbstractServerMessage> flux) {
        Sinks.Many directBestEffort = Sinks.many().multicast().directBestEffort();
        getLogger().info("Received request-response request: {}", flux);
        flux.subscribe(abstractServerMessage -> {
            handleClientMessage(abstractServerMessage, directBestEffort);
        });
        flux.doOnError(th -> {
            System.out.println(th);
        });
        flux.doOnComplete(() -> {
            System.out.println("Complete");
        });
        flux.doOnCancel(() -> {
            System.out.println("Cancel");
        });
        return directBestEffort.asFlux();
    }

    private void handleClientMessage(AbstractServerMessage abstractServerMessage, Sinks.Many<AbstractClientMessage> many) {
        if (!(abstractServerMessage instanceof PushConnectMessage)) {
            if (!(abstractServerMessage instanceof PushCloseMessage)) {
                throw new IllegalArgumentException("Unknown message type: " + abstractServerMessage.getClass().getName());
            }
            handleClose((PushCloseMessage) abstractServerMessage, many);
        } else {
            try {
                handleConnect((PushConnectMessage) abstractServerMessage, many);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private void handleClose(PushCloseMessage pushCloseMessage, Sinks.Many<AbstractClientMessage> many) {
    }

    private void handleConnect(PushConnectMessage pushConnectMessage, Sinks.Many<AbstractClientMessage> many) throws Exception {
        if (this.endpointInvoker.getReturnType(pushConnectMessage.getEndpointName(), pushConnectMessage.getMethodName()) != Flux.class) {
            many.tryEmitError(new IllegalArgumentException("Method " + pushConnectMessage.getEndpointName() + "/" + pushConnectMessage.getMethodName() + " is not a Flux method"));
        } else {
            this.closeHandlers.put(pushConnectMessage.getId(), ((Flux) this.endpointInvoker.invoke(pushConnectMessage.getEndpointName(), pushConnectMessage.getMethodName(), pushConnectMessage.getParams(), null, str -> {
                return false;
            })).subscribe(obj -> {
                many.emitNext(new ClientMessageUpdate(pushConnectMessage.getId(), obj), (signalType, emitResult) -> {
                    if (emitResult == Sinks.EmitResult.FAIL_NON_SERIALIZED) {
                        return true;
                    }
                    System.err.println("failed to send update message: " + emitResult);
                    return false;
                });
            }, th -> {
                th.printStackTrace();
                this.closeHandlers.remove(pushConnectMessage.getId());
                many.emitNext(new ClientMessageError(pushConnectMessage.getId()), (signalType, emitResult) -> {
                    if (emitResult == Sinks.EmitResult.FAIL_NON_SERIALIZED) {
                        return true;
                    }
                    System.err.println("failed to send error message: " + emitResult);
                    return false;
                });
            }, () -> {
                this.closeHandlers.remove(pushConnectMessage.getId());
                many.emitNext(new ClientMessageComplete(pushConnectMessage.getId()), (signalType, emitResult) -> {
                    if (emitResult == Sinks.EmitResult.FAIL_NON_SERIALIZED) {
                        return true;
                    }
                    System.err.println("failed to send complete message: " + emitResult);
                    return false;
                });
            }));
        }
    }

    private Logger getLogger() {
        return LoggerFactory.getLogger(getClass());
    }
}
