Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Server-Sent Events implementation needs improvements #12670

Closed
Totalus opened this issue Oct 27, 2023 · 5 comments
Closed

Server-Sent Events implementation needs improvements #12670

Totalus opened this issue Oct 27, 2023 · 5 comments
Labels
needs triage This issue has not been looked into type: enhancement 🐺

Comments

@Totalus
Copy link

Totalus commented Oct 27, 2023

Server-Sent Events implementation in Nest throught the @Sse decorator has flaws that makes it hard to use. I’ve created this issue to highlight the flaws and to also share a workaround to implement SSE without the decorator’s issues.

That being said, feel free to comment. Maybe there are some good reasons I am not aware of why the @Sse decorator behaves like it does, or options I am not aware of.

Problems with the decorator

When using @Sse decorator, the connection with the client is established before the handler is called (wether your handler is async or not according to my observations, see also #12260). As a result, throwing an HttpException inside the handler will not return an HTTP error as you would expect, but instead will send an SSE error type message, with the data of this message being the error name. Any additional data you might pass (ex: error code, etc.) when throwing the exception is lost. This behavior makes it harder to deal on the client side as you don’t know until after the connection is established if the call actually failed or not and you don't get the error details.

Also, an empty SSE message is always sent just after a connection is established, not sure why that is and if it is intended or not.

Expectation

As a user, I want my endpoint to return an HTTP error code if the request is invalid for a reason or another and only establish the SSE connection if prior validation is complete without error. I also want my additional error data to be forwarded to my client so it can properly handle the error.

Documentation lacking details

I find the documentation page for SSE lacks some important details regarding how the connection is established before the handler is called and how the error handling works. Also, the provided example generates the data inside the request handler directly. In practice, chances are you want to establish a SSE connection to notify your clients of events that will happen in the future and are generated outside of the request handler. Finding out how to achieve that took more reaseach and brought me out of NestJS documentation. It would be nice to have those details directly in Nest's SSE documentation page.

Workaround

Because of the various issues, I took a step aside and implemented SSE without using the @Sse decorator which turned out to be fairly easy and provides much more control over the execution flow. A quick search made me realize I could directly used express response object to implement this in a normal @Get request.

In the background, SSE is just an HTTP connection that is kept alive in which you write messages as plain text following a standard format. All you need to establish the connection is to set the righ headers:

@Get()
async mySseEndpoint(Res() response: Response) {

    // Whatever validation you want to make can be done
    // here and you can throw errors as expected.

    // Set the headers to indicate this is an SSE connection
    response.set({
        'Cache-Control': 'private, no-cache, no-store, must-revalidate, max-age=0, no-transform',
        'Connection': 'keep-alive',
        'Content-Type': 'text/event-stream',
    })
    
    // Flusing the headers will establish the connection
    response.flushHeaders();
    
    // From this point, the connection with the client is established.
    // We can send data with response.write(), encoding the data following SSE standard.
}

Here’s a complete example in which I am using observables, which makes it very close to what you would have in an @Sse decorated endpoint.

import { Controller, Get, InternalServerErrorException, Query, Res, MessageEvent } from "@nestjs/common";
import { Subject } from "rxjs";
import { Response } from "express";

@Controller('sse')
export class SseExample {

    /** List of connected clients */
    connectedClients = new Map<string, { close: () => void, subject: Subject<MessageEvent> }>();

    @Get()
    async example(@Res() response: Response) {
        
        let validationFailed = false;

        /* make some validation */

        if(validationFailed)
            throw new InternalServerErrorException({ message: 'Query failed', error: 100, status: 500 });

        // Create a subject for this client in which we'll push our data
        const subject = new Subject<MessageEvent>();

        // Create an observer that will take the data pushed to the subject and
        // write it to our connection stream in the right format
        const observer = {
            next: (msg: MessageEvent) => {
                // Called when data is pushed to the subject using subject.next()
                // Encode the message as SSE (see https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events)

                // Here's an example of what it could look like, assuming msg.data is an object
                // If msg.data is not an object, you should adjust accordingly

                if(msg.type)
                    response.write(`event: ${msg.type}\n`)
                if(msg.id)
                    response.write(`id: ${msg.id}\n`)
                if(msg.retry)
                    response.write(`retry: ${msg.retry}\n`)

                response.write(`data: ${JSON.stringify(msg.data)}\n\n`); 
            },
            complete: () =>  { console.log(`observer.complete`) },
            error: (err: any) =>  { console.log(`observer.error: ${err}`) },
        };

        // Attach the observer to the subject
        subject.subscribe(observer);

        // Add the client to our client list
        const clientKey = String(Math.random()); // String that identifies your client
        this.connectedClients.set(clientKey,
            {
                close: () => { response.end() },   // Will allow us to close the connection if needed
                subject,                           // Subject related to this client
            }
        );

        // Handle connection closed
        response.on('close', () => {
            console.log(`Closing connection for client ${clientKey}`);
            subject.complete(); // End the observable stream
            this.connectedClients.delete(clientKey); // Remove client from the list
            response.end(); // Close connection (unsure if this is really requried, to release the resources)
        })

        // Send headers to establish SSE connection
        response.set({
            'Cache-Control': 'private, no-cache, no-store, must-revalidate, max-age=0, no-transform',
            'Connection': 'keep-alive',
            'Content-Type': 'text/event-stream',
        })
        
        response.flushHeaders();

        // From this point, the connection with the client is established.
        // We can send data using the subject.next(MessageEvent) function.
        // See the sendDataToClient() function below.
    }

    /** Send a SSE message to the specified client */
    sendDataToClient(clientId: string, message: MessageEvent) {
        this.connectedClients.get(clientId)?.subject.next(message);
    }
}
@Totalus Totalus added needs triage This issue has not been looked into type: enhancement 🐺 labels Oct 27, 2023
@kamilmysliwiec
Copy link
Member

Because of the various issues, I took a step aside and implemented SSE without using the @sse decorator which turned out to be fairly easy and provides much more control over the execution flow

This is completely fine and recommended when you need full control over the execution flow. @Sse decorator serves as a "simplified approach" that lets you set things up quickly but that's all

@ghost
Copy link

ghost commented Nov 8, 2023

could this be an example in the samples repo?

@kamilmysliwiec
Copy link
Member

Definitely! Would you like to add an example to the existing sse sample project?

@ghost
Copy link

ghost commented Nov 9, 2023

I was hoping the author would, since they wrote the code. I'm currently happy with the decorator.

@thesambayo
Copy link

    sendDataToClient(clientId: string, message: MessageEvent) {
        this.connectedClients.get(clientId)?.subject.next(message);
    }

@Totalus , I can't thank you enough for sharing this

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
needs triage This issue has not been looked into type: enhancement 🐺
Projects
None yet
Development

No branches or pull requests

3 participants