-
Notifications
You must be signed in to change notification settings - Fork 559
/
sse.ts
81 lines (73 loc) · 2.88 KB
/
sse.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import { ExecutionResult } from 'graphql';
import { isAsyncIterable } from '@envelop/core';
import { getResponseInitByRespectingErrors } from '../../error.js';
import { FetchAPI, MaybeArray } from '../../types.js';
import { ResultProcessor, ResultProcessorInput } from '../types.js';
import { jsonStringifyResultWithoutInternals } from './stringify.js';
/**
 * Creates the result processor that delivers execution results as
 * Server-Sent Events.
 *
 * The returned processor accepts either a single result or an async iterable
 * of results and answers with a `text/event-stream` response whose body is:
 *  - a `:` comment line right away, then one more every ping interval,
 *  - an `event: next` message with the serialized payload for each non-null value,
 *  - a closing `event: complete` message, after which the stream is closed.
 *
 * If reading from the source or serializing a value fails, the ping timer is
 * stopped immediately and the stream is errored with that failure.
 *
 * @returns A processor that takes the result to send and the fetch API
 * implementation used to build the encoder, the stream and the response.
 */
export function getSSEProcessor(): ResultProcessor {
  return function processSSEResult(result: ResultProcessorInput, fetchAPI: FetchAPI): Response {
    // ping every 12 seconds; shortened when NODE_ENV is "test" so the pings can be tested quickly
    const pingIntervalMs = globalThis.process?.env?.NODE_ENV === 'test' ? 300 : 12_000;

    const headersInit = {
      'Content-Type': 'text/event-stream',
      Connection: 'keep-alive',
      'Cache-Control': 'no-cache',
      'Content-Encoding': 'none',
    };

    // NOTE(review): the meaning of the trailing `true` is defined by the helper, which is not visible here
    const responseInit = getResponseInitByRespectingErrors(result, headersInit, true);

    // both are assigned in `start`, which runs before `pull` or `cancel` can be invoked
    let iterator: AsyncIterator<MaybeArray<ExecutionResult>>;
    let pingInterval: number;

    const textEncoder = new fetchAPI.TextEncoder();

    const readableStream = new fetchAPI.ReadableStream({
      start(controller) {
        // always start with a ping because some browsers don't accept a header flush
        // causing the fetch to stall until something is streamed through the response
        controller.enqueue(textEncoder.encode(':\n\n'));

        // ping client on every interval to keep the connection alive
        pingInterval = setInterval(() => {
          // desiredSize is null once the stream has errored and 0 once it has closed,
          // so a falsy value means there is nothing left to ping.
          // NOTE(review): 0 is also reported while the queue is exactly full, in which
          // case the pings stop for good — confirm this is intended.
          if (!controller.desiredSize) {
            clearInterval(pingInterval);
            return;
          }
          controller.enqueue(textEncoder.encode(':\n\n'));
          // the handle is not a number in every runtime (Node.js returns a Timeout object), hence the cast
        }, pingIntervalMs) as unknown as number;

        if (isAsyncIterable(result)) {
          iterator = result[Symbol.asyncIterator]();
        } else {
          // expose the single result through the iterator shape: one value, then done
          let finished = false;
          iterator = {
            next: () => {
              if (finished) {
                return Promise.resolve({ done: true, value: null });
              }
              finished = true;
              return Promise.resolve({ done: false, value: result });
            },
          };
        }
      },

      async pull(controller) {
        try {
          const { done, value } = await iterator.next();

          if (value != null) {
            controller.enqueue(textEncoder.encode(`event: next\n`));
            const chunk = jsonStringifyResultWithoutInternals(value);
            controller.enqueue(textEncoder.encode(`data: ${chunk}\n\n`));
          }

          if (done) {
            controller.enqueue(textEncoder.encode(`event: complete\n`));
            controller.enqueue(textEncoder.encode(`data:\n\n`));
            clearInterval(pingInterval);
            controller.close();
          }
        } catch (err) {
          // stop pinging right away instead of waiting for the next tick to
          // notice the errored stream, then surface the failure to the reader
          clearInterval(pingInterval);
          controller.error(err);
        }
      },

      async cancel(e) {
        clearInterval(pingInterval);
        // let the source release its resources, forwarding the cancellation reason
        await iterator.return?.(e);
      },
    });

    return new fetchAPI.Response(readableStream, responseInit);
  };
}