Skip to content

Commit a306db0

Browse files
committedAug 1, 2023
fix(webtransport): add proper framing
WebTransport being a stream-based protocol, the chunking boundaries are not necessarily preserved. That's why we need a header indicating the type of the payload (plain text or binary) and its length. We will use a format inspired by the WebSocket frame: - first bit indicates whether the payload is binary - the next 7 bits are either: - 125 or less: that's the length of the payload - 126: the next 2 bytes represent the length of the payload - 127: the next 8 bytes represent the length of the payload Reference: https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API/Writing_WebSocket_servers#decoding_payload_length Related: - #687 - #688
1 parent 7dd1350 commit a306db0

File tree

5 files changed

+122
-116
lines changed

5 files changed

+122
-116
lines changed
 

‎lib/server.ts

+20-18
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,11 @@ import type { CookieSerializeOptions } from "cookie";
1616
import type { CorsOptions, CorsOptionsDelegate } from "cors";
1717
import type { Duplex } from "stream";
1818
import { WebTransport } from "./transports/webtransport";
19-
import { TextDecoder } from "util";
19+
import { createPacketDecoderStream } from "engine.io-parser";
2020

2121
const debug = debugModule("engine");
2222

2323
const kResponseHeaders = Symbol("responseHeaders");
24-
const TEXT_DECODER = new TextDecoder();
2524

2625
type Transport = "polling" | "websocket";
2726

@@ -149,15 +148,13 @@ type Middleware = (
149148
next: (err?: any) => void
150149
) => void;
151150

152-
function parseSessionId(handshake: string) {
153-
if (handshake.startsWith("0{")) {
154-
try {
155-
const parsed = JSON.parse(handshake.substring(1));
156-
if (typeof parsed.sid === "string") {
157-
return parsed.sid;
158-
}
159-
} catch (e) {}
160-
}
151+
function parseSessionId(data: string) {
152+
try {
153+
const parsed = JSON.parse(data);
154+
if (typeof parsed.sid === "string") {
155+
return parsed.sid;
156+
}
157+
} catch (e) {}
161158
}
162159

163160
export abstract class BaseServer extends EventEmitter {
@@ -536,7 +533,11 @@ export abstract class BaseServer extends EventEmitter {
536533
}
537534

538535
const stream = result.value;
539-
const reader = stream.readable.getReader();
536+
const transformStream = createPacketDecoderStream(
537+
this.opts.maxHttpBufferSize,
538+
"nodebuffer"
539+
);
540+
const reader = stream.readable.pipeThrough(transformStream).getReader();
540541

541542
// reading the first packet of the stream
542543
const { value, done } = await reader.read();
@@ -546,12 +547,13 @@ export abstract class BaseServer extends EventEmitter {
546547
}
547548

548549
clearTimeout(timeout);
549-
const handshake = TEXT_DECODER.decode(value);
550550

551-
// handshake is either
552-
// "0" => new session
553-
// '0{"sid":"xxxx"}' => upgrade
554-
if (handshake === "0") {
551+
if (value.type !== "open") {
552+
debug("invalid WebTransport handshake");
553+
return session.close();
554+
}
555+
556+
if (value.data === undefined) {
555557
const transport = new WebTransport(session, stream, reader);
556558

557559
// note: we cannot use "this.generateId()", because there is no "req" argument
@@ -572,7 +574,7 @@ export abstract class BaseServer extends EventEmitter {
572574
return;
573575
}
574576

575-
const sid = parseSessionId(handshake);
577+
const sid = parseSessionId(value.data);
576578

577579
if (!sid) {
578580
debug("invalid WebTransport handshake");

‎lib/transports/webtransport.ts

+28-46
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,9 @@
11
import { Transport } from "../transport";
22
import debugModule from "debug";
3+
import { createPacketEncoderStream } from "engine.io-parser";
34

45
const debug = debugModule("engine:webtransport");
56

6-
const BINARY_HEADER = Buffer.of(54);
7-
8-
function shouldIncludeBinaryHeader(packet, encoded) {
9-
// 48 === "0".charCodeAt(0) (OPEN packet type)
10-
// 54 === "6".charCodeAt(0) (NOOP packet type)
11-
return (
12-
packet.type === "message" &&
13-
typeof packet.data !== "string" &&
14-
encoded[0] >= 48 &&
15-
encoded[0] <= 54
16-
);
17-
}
18-
197
/**
208
* Reference: https://developer.mozilla.org/en-US/docs/Web/API/WebTransport_API
219
*/
@@ -24,24 +12,24 @@ export class WebTransport extends Transport {
2412

2513
constructor(private readonly session, stream, reader) {
2614
super({ _query: { EIO: "4" } });
27-
this.writer = stream.writable.getWriter();
15+
16+
const transformStream = createPacketEncoderStream();
17+
transformStream.readable.pipeTo(stream.writable);
18+
this.writer = transformStream.writable.getWriter();
19+
2820
(async () => {
29-
let binaryFlag = false;
30-
while (true) {
31-
const { value, done } = await reader.read();
32-
if (done) {
33-
debug("session is closed");
34-
break;
35-
}
36-
debug("received chunk: %o", value);
37-
if (!binaryFlag && value.byteLength === 1 && value[0] === 54) {
38-
binaryFlag = true;
39-
continue;
21+
try {
22+
while (true) {
23+
const { value, done } = await reader.read();
24+
if (done) {
25+
debug("session is closed");
26+
break;
27+
}
28+
debug("received chunk: %o", value);
29+
this.onPacket(value);
4030
}
41-
this.onPacket(
42-
this.parser.decodePacketFromBinary(value, binaryFlag, "nodebuffer")
43-
);
44-
binaryFlag = false;
31+
} catch (e) {
32+
debug("error while reading: %s", e.message);
4533
}
4634
})();
4735

@@ -58,26 +46,20 @@ export class WebTransport extends Transport {
5846
return true;
5947
}
6048

61-
send(packets) {
49+
async send(packets) {
6250
this.writable = false;
6351

64-
for (let i = 0; i < packets.length; i++) {
65-
const packet = packets[i];
66-
const isLast = i + 1 === packets.length;
67-
68-
this.parser.encodePacketToBinary(packet, (data) => {
69-
if (shouldIncludeBinaryHeader(packet, data)) {
70-
debug("writing binary header");
71-
this.writer.write(BINARY_HEADER);
72-
}
73-
debug("writing chunk: %o", data);
74-
this.writer.write(data);
75-
if (isLast) {
76-
this.writable = true;
77-
this.emit("drain");
78-
}
79-
});
52+
try {
53+
for (let i = 0; i < packets.length; i++) {
54+
const packet = packets[i];
55+
await this.writer.write(packet);
56+
}
57+
} catch (e) {
58+
debug("error while writing: %s", e.message);
8059
}
60+
61+
this.writable = true;
62+
this.emit("drain");
8163
}
8264

8365
doClose(fn) {

‎package-lock.json

+25-8
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
"cookie": "~0.4.1",
4040
"cors": "~2.8.5",
4141
"debug": "~4.3.1",
42-
"engine.io-parser": "~5.1.0",
42+
"engine.io-parser": "~5.2.1",
4343
"ws": "~8.11.0"
4444
},
4545
"devDependencies": {

‎test/webtransport.mjs

+48-43
Original file line numberDiff line numberDiff line change
@@ -85,12 +85,15 @@ function setup(opts, cb) {
8585
const reader = stream.readable.getReader();
8686
const writer = stream.writable.getWriter();
8787

88-
engine.on("connection", (socket) => {
88+
engine.on("connection", async (socket) => {
89+
await reader.read(); // header
90+
await reader.read(); // payload (handshake)
91+
8992
cb({ engine, h3Server, socket, client, stream, reader, writer });
9093
});
9194

95+
await writer.write(Uint8Array.of(1));
9296
await writer.write(TEXT_ENCODER.encode("0"));
93-
await reader.read(); // handshake
9497
});
9598
}
9699

@@ -130,19 +133,20 @@ describe("WebTransport", () => {
130133
const writer = stream.writable.getWriter();
131134

132135
(async function read() {
133-
const { done, value } = await reader.read();
136+
const header = await reader.read();
134137

135-
if (done) {
136-
return;
137-
}
138+
expect(header.value).to.eql(Uint8Array.of(107));
139+
140+
const { value } = await reader.read();
138141

139142
const handshake = TEXT_DECODER.decode(value);
140143
expect(handshake.startsWith("0{")).to.be(true);
141144

142145
partialDone();
143146
})();
144147

145-
await writer.write(TEXT_ENCODER.encode("0"));
148+
writer.write(Uint8Array.of(1));
149+
writer.write(TEXT_ENCODER.encode("0"));
146150
});
147151
});
148152

@@ -194,6 +198,10 @@ describe("WebTransport", () => {
194198
const writer = stream.writable.getWriter();
195199

196200
(async function read() {
201+
const header = await reader.read();
202+
203+
expect(header.value).to.eql(Uint8Array.of(6));
204+
197205
const { done, value } = await reader.read();
198206

199207
if (done) {
@@ -206,10 +214,13 @@ describe("WebTransport", () => {
206214
partialDone();
207215
})();
208216

217+
await writer.write(Uint8Array.of(31));
209218
await writer.write(
210219
TEXT_ENCODER.encode(`0{"sid":"${payload.sid}"}`)
211220
);
221+
await writer.write(Uint8Array.of(6));
212222
await writer.write(TEXT_ENCODER.encode(`2probe`));
223+
await writer.write(Uint8Array.of(1));
213224
await writer.write(TEXT_ENCODER.encode(`5`));
214225
});
215226
}
@@ -281,10 +292,14 @@ describe("WebTransport", () => {
281292
},
282293
async ({ engine, h3Server, reader, writer }) => {
283294
for (let i = 0; i < 5; i++) {
295+
const header = await reader.read();
296+
expect(header.value).to.eql(Uint8Array.of(1));
297+
284298
const packet = await reader.read();
285299
const value = TEXT_DECODER.decode(packet.value);
286300
expect(value).to.eql("2");
287301

302+
writer.write(Uint8Array.of(1));
288303
writer.write(TEXT_ENCODER.encode("3"));
289304
}
290305

@@ -338,6 +353,7 @@ describe("WebTransport", () => {
338353
success(engine, h3Server, done);
339354
});
340355

356+
writer.write(Uint8Array.of(6));
341357
writer.write(TEXT_ENCODER.encode("4hello"));
342358
});
343359
});
@@ -346,6 +362,9 @@ describe("WebTransport", () => {
346362
setup({}, async ({ engine, h3Server, socket, reader }) => {
347363
socket.send("hello");
348364

365+
const header = await reader.read();
366+
expect(header.value).to.eql(Uint8Array.of(6));
367+
349368
const { value } = await reader.read();
350369
const decoded = TEXT_DECODER.decode(value);
351370
expect(decoded).to.eql("4hello");
@@ -363,6 +382,7 @@ describe("WebTransport", () => {
363382
success(engine, h3Server, done);
364383
});
365384

385+
writer.write(Uint8Array.of(131));
366386
writer.write(Uint8Array.of(1, 2, 3));
367387
});
368388
});
@@ -371,64 +391,49 @@ describe("WebTransport", () => {
371391
setup({}, async ({ engine, h3Server, socket, reader }) => {
372392
socket.send(Buffer.of(1, 2, 3));
373393

374-
const { value } = await reader.read();
375-
expect(value).to.eql(Uint8Array.of(1, 2, 3));
376-
377-
success(engine, h3Server, done);
378-
});
379-
});
380-
381-
it("should send some binary data (client to server) (with binary flag)", (done) => {
382-
setup({}, async ({ engine, h3Server, socket, writer }) => {
383-
socket.on("data", (data) => {
384-
expect(Buffer.isBuffer(data)).to.be(true);
385-
expect(data).to.eql(Buffer.of(48, 1, 2, 3));
386-
387-
success(engine, h3Server, done);
388-
});
389-
390-
writer.write(Uint8Array.of(54));
391-
writer.write(Uint8Array.of(48, 1, 2, 3));
392-
});
393-
});
394-
395-
it("should send some binary data (server to client) (with binary flag)", (done) => {
396-
setup({}, async ({ engine, h3Server, socket, reader }) => {
397-
socket.send(Buffer.of(48, 1, 2, 3));
398-
399394
const header = await reader.read();
400-
expect(header.value).to.eql(Uint8Array.of(54));
395+
expect(header.value).to.eql(Uint8Array.of(131));
401396

402397
const { value } = await reader.read();
403-
expect(value).to.eql(Uint8Array.of(48, 1, 2, 3));
398+
expect(value).to.eql(Uint8Array.of(1, 2, 3));
404399

405400
success(engine, h3Server, done);
406401
});
407402
});
408403

409-
it("should send some binary data (client to server) (binary flag)", (done) => {
404+
it("should send some big binary data (client to server)", (done) => {
410405
setup({}, async ({ engine, h3Server, socket, writer }) => {
406+
const payload = Buffer.allocUnsafe(1e6);
407+
411408
socket.on("data", (data) => {
412409
expect(Buffer.isBuffer(data)).to.be(true);
413-
expect(data).to.eql(Buffer.of(54));
410+
expect(data).to.eql(payload);
414411

415412
success(engine, h3Server, done);
416413
});
417414

418-
writer.write(Uint8Array.of(54));
419-
writer.write(Uint8Array.of(54));
415+
writer.write(Uint8Array.of(255, 0, 0, 0, 0, 0, 15, 66, 64));
416+
writer.write(payload);
420417
});
421418
});
422419

423-
it("should send some binary data (server to client) (binary flag)", (done) => {
420+
it("should send some big binary data (server to client)", (done) => {
424421
setup({}, async ({ engine, h3Server, socket, reader }) => {
425-
socket.send(Buffer.of(54));
422+
const payload = Buffer.allocUnsafe(1e6);
423+
424+
socket.send(payload);
426425

427426
const header = await reader.read();
428-
expect(header.value).to.eql(Uint8Array.of(54));
427+
expect(header.value).to.eql(
428+
Uint8Array.of(255, 0, 0, 0, 0, 0, 15, 66, 64)
429+
);
429430

430-
const { value } = await reader.read();
431-
expect(value).to.eql(Uint8Array.of(54));
431+
const chunk1 = await reader.read();
432+
// the size of the chunk is implementation-specific (maxDatagramSize)
433+
expect(chunk1.value).to.eql(payload.slice(0, 1228));
434+
435+
const chunk2 = await reader.read();
436+
expect(chunk2.value).to.eql(payload.slice(1228, 2456));
432437

433438
success(engine, h3Server, done);
434439
});

0 commit comments

Comments
 (0)
Please sign in to comment.