Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Web Stream API #5286

Merged
merged 10 commits into from
Jan 29, 2024
3 changes: 3 additions & 0 deletions docs/Reference/Errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
- [FST_ERR_LOG_INVALID_DESTINATION](#fst_err_log_invalid_destination)
- [FST_ERR_LOG_INVALID_LOGGER](#fst_err_log_invalid_logger)
- [FST_ERR_REP_INVALID_PAYLOAD_TYPE](#fst_err_rep_invalid_payload_type)
- [FST_ERR_REP_RESPONSE_BODY_CONSUMED](#fst_err_rep_response_body_consumed)
- [FST_ERR_REP_ALREADY_SENT](#fst_err_rep_already_sent)
- [FST_ERR_REP_SENT_VALUE](#fst_err_rep_sent_value)
- [FST_ERR_SEND_INSIDE_ONERR](#fst_err_send_inside_onerr)
Expand Down Expand Up @@ -312,6 +313,8 @@ Below is a table with all the error codes that Fastify uses.
| <a id="fst_err_log_invalid_destination">FST_ERR_LOG_INVALID_DESTINATION</a> | The logger does not accept the specified destination. | Use a `'stream'` or a `'file'` as the destination. | [#1168](https://github.com/fastify/fastify/pull/1168) |
| <a id="fst_err_log_invalid_logger">FST_ERR_LOG_INVALID_LOGGER</a> | The logger should have all these methods: `'info'`, `'error'`, `'debug'`, `'fatal'`, `'warn'`, `'trace'`, `'child'`. | Use a logger with all the required methods. | [#4520](https://github.com/fastify/fastify/pull/4520) |
| <a id="fst_err_rep_invalid_payload_type">FST_ERR_REP_INVALID_PAYLOAD_TYPE</a> | Reply payload can be either a `string` or a `Buffer`. | Use a `string` or a `Buffer` for the payload. | [#1168](https://github.com/fastify/fastify/pull/1168) |
| <a id="fst_err_rep_response_body_consumed">FST_ERR_REP_RESPONSE_BODY_CONSUMED</a> | Using `Response` as reply payload
but the body is being consumed. | Make sure you don't consume the `Response.body` | [#5286](https://github.com/fastify/fastify/pull/5286) |
| <a id="fst_err_rep_already_sent">FST_ERR_REP_ALREADY_SENT</a> | A response was already sent. | - | [#1336](https://github.com/fastify/fastify/pull/1336) |
| <a id="fst_err_rep_sent_value">FST_ERR_REP_SENT_VALUE</a> | The only possible value for `reply.sent` is `true`. | - | [#1336](https://github.com/fastify/fastify/pull/1336) |
| <a id="fst_err_send_inside_onerr">FST_ERR_SEND_INSIDE_ONERR</a> | You cannot use `send` inside the `onError` hook. | - | [#1348](https://github.com/fastify/fastify/pull/1348) |
Expand Down
2 changes: 1 addition & 1 deletion docs/Reference/Hooks.md
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ fastify.addHook('onSend', (request, reply, payload, done) => {
> `null`.

Note: If you change the payload, you may only change it to a `string`, a
`Buffer`, a `stream`, or `null`.
`Buffer`, a `stream`, a `ReadableStream`, a `Response`, or `null`.


### onResponse
Expand Down
48 changes: 48 additions & 0 deletions docs/Reference/Reply.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
- [Strings](#strings)
- [Streams](#streams)
- [Buffers](#buffers)
- [ReadableStream](#send-readablestream)
- [Response](#send-response)
- [Errors](#errors)
- [Type of the final payload](#type-of-the-final-payload)
- [Async-Await and Promises](#async-await-and-promises)
Expand Down Expand Up @@ -756,6 +758,52 @@ fastify.get('/streams', function (request, reply) {
})
```

#### ReadableStream
<a id="send-readablestream"></a>

`ReadableStream` will be treated as a node stream mentioned above,
the content is considered to be pre-serialized, so they will be
sent unmodified without response validation.

```js
const fs = require('node:fs')
const { ReadableStream } = require('node:stream/web')
fastify.get('/streams', function (request, reply) {
const stream = fs.createReadStream('some-file')
reply.header('Content-Type', 'application/octet-stream')
reply.send(ReadableStream.from(stream))
})
```

#### Response
<a id="send-response"></a>

`Response` allows to manage the reply payload, status code and
headers in one place. The payload provided inside `Response` is
considered to be pre-serialized, so they will be sent unmodified
without response validation.

Plese be aware when using `Response`, the status code and headers
will not directly reflect to `reply.statusCode` and `reply.getHeaders()`.
Such behavior is based on `Response` only allow `readonly` status
code and headers. The data is not allow to be bi-direction editing,
and may confuse when checking the `payload` in `onSend` hooks.

```js
const fs = require('node:fs')
const { ReadableStream } = require('node:stream/web')
fastify.get('/streams', function (request, reply) {
const stream = fs.createReadStream('some-file')
const readableStream = ReadableStream.from(stream)
const response = new Response(readableStream, {
status: 200,
headers: { 'content-type': 'application/octet-stream' }
})
reply.send(response)
})
```


#### Errors
<a id="errors"></a>

Expand Down
4 changes: 4 additions & 0 deletions lib/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,10 @@ const codes = {
500,
TypeError
),
FST_ERR_REP_RESPONSE_BODY_CONSUMED: createError(
'FST_ERR_REP_RESPONSE_BODY_CONSUMED',
'Response.body is already consumed.'
),
FST_ERR_REP_ALREADY_SENT: createError(
'FST_ERR_REP_ALREADY_SENT',
'Reply was already sent, did you forget to "return reply" in "%s" (%s)?'
Expand Down
57 changes: 55 additions & 2 deletions lib/reply.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict'

const eos = require('node:stream').finished
const Readable = require('node:stream').Readable

const {
kFourOhFourContext,
Expand Down Expand Up @@ -44,6 +45,7 @@ const CONTENT_TYPE = {
}
const {
FST_ERR_REP_INVALID_PAYLOAD_TYPE,
FST_ERR_REP_RESPONSE_BODY_CONSUMED,
FST_ERR_REP_ALREADY_SENT,
FST_ERR_REP_SENT_VALUE,
FST_ERR_SEND_INSIDE_ONERR,
Expand All @@ -55,6 +57,8 @@ const {
} = require('./errors')
const { FSTDEP010, FSTDEP013, FSTDEP019, FSTDEP020 } = require('./warnings')

const toString = Object.prototype.toString

function Reply (res, request, log) {
this.raw = res
this[kReplySerializer] = null
Expand Down Expand Up @@ -163,7 +167,14 @@ Reply.prototype.send = function (payload) {
const hasContentType = contentType !== undefined

if (payload !== null) {
if (typeof payload.pipe === 'function') {
if (
// node:stream
typeof payload.pipe === 'function' ||
// node:stream/web
typeof payload.getReader === 'function' ||
// Response
toString.call(payload) === '[object Response]'
Comment on lines +174 to +176
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we not create a symbol kResponse and set it as an attribute of the payload once, so we dont need to call toString multiple times?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we can skip it in .send but we can reduce one time in onSendEnd.
Note that due to the nature of Response, it is an immutable object. Which means every mutation (like status or headers) on it requires to re-create a new Response. Setting an attribute on it will be useless somehow.

) {
onSendHook(this, payload)
return this
}
Expand Down Expand Up @@ -570,7 +581,6 @@ function safeWriteHead (reply, statusCode) {
function onSendEnd (reply, payload) {
const res = reply.raw
const req = reply.request
const statusCode = res.statusCode

// we check if we need to update the trailers header and set it
if (reply[kReplyTrailers] !== null) {
Expand All @@ -586,6 +596,17 @@ function onSendEnd (reply, payload) {
reply.header('Trailer', header.trim())
}

// since Response contain status code, we need to update before
// any action that used statusCode
const isResponse = toString.call(payload) === '[object Response]'
if (isResponse) {
// https://developer.mozilla.org/en-US/docs/Web/API/Response/status
if (typeof payload.status === 'number') {
reply.code(payload.status)
}
}
const statusCode = res.statusCode

if (payload === undefined || payload === null) {
// according to https://datatracker.ietf.org/doc/html/rfc7230#section-3.3.2
// we cannot send a content-length for 304 and 204, and all status code
Expand Down Expand Up @@ -617,11 +638,38 @@ function onSendEnd (reply, payload) {
return
}

// node:stream
if (typeof payload.pipe === 'function') {
sendStream(payload, res, reply)
return
}

// node:stream/web
if (typeof payload.getReader === 'function') {
sendWebStream(payload, res, reply)
return
}

// Response
if (isResponse) {
// https://developer.mozilla.org/en-US/docs/Web/API/Response/headers
if (typeof payload.headers === 'object' && typeof payload.headers.forEach === 'function') {
for (const [headerName, headerValue] of payload.headers) {
reply.header(headerName, headerValue)
}
}

// https://developer.mozilla.org/en-US/docs/Web/API/Response/body
if (payload.body != null) {
if (payload.bodyUsed) {
throw new FST_ERR_REP_RESPONSE_BODY_CONSUMED()
}
// Response.body always a ReadableStream
sendWebStream(payload.body, res, reply)
}
return
}

if (typeof payload !== 'string' && !Buffer.isBuffer(payload)) {
throw new FST_ERR_REP_INVALID_PAYLOAD_TYPE(typeof payload)
}
Expand Down Expand Up @@ -654,6 +702,11 @@ function logStreamError (logger, err, res) {
}
}

function sendWebStream (payload, res, reply) {
const nodeStream = Readable.fromWeb(payload)
sendStream(nodeStream, res, reply)
}

function sendStream (payload, res, reply) {
let sourceOpen = true
let errorLogged = false
Expand Down
24 changes: 17 additions & 7 deletions test/internals/errors.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const errors = require('../../lib/errors')
const { readFileSync } = require('node:fs')
const { resolve } = require('node:path')

test('should expose 78 errors', t => {
test('should expose 79 errors', t => {
t.plan(1)
const exportedKeys = Object.keys(errors)
let counter = 0
Expand All @@ -14,11 +14,11 @@ test('should expose 78 errors', t => {
counter++
}
}
t.equal(counter, 78)
t.equal(counter, 79)
})

test('ensure name and codes of Errors are identical', t => {
t.plan(78)
t.plan(79)
const exportedKeys = Object.keys(errors)
for (const key of exportedKeys) {
if (errors[key].name === 'FastifyError') {
Expand Down Expand Up @@ -337,6 +337,16 @@ test('FST_ERR_REP_INVALID_PAYLOAD_TYPE', t => {
t.ok(error instanceof TypeError)
})

test('FST_ERR_REP_RESPONSE_BODY_CONSUMED', t => {
t.plan(5)
const error = new errors.FST_ERR_REP_RESPONSE_BODY_CONSUMED()
t.equal(error.name, 'FastifyError')
t.equal(error.code, 'FST_ERR_REP_RESPONSE_BODY_CONSUMED')
t.equal(error.message, 'Response.body is already consumed.')
t.equal(error.statusCode, 500)
t.ok(error instanceof Error)
})

test('FST_ERR_REP_ALREADY_SENT', t => {
t.plan(5)
const error = new errors.FST_ERR_REP_ALREADY_SENT('/hello', 'GET')
Expand Down Expand Up @@ -818,7 +828,7 @@ test('FST_ERR_LISTEN_OPTIONS_INVALID', t => {
})

test('Ensure that all errors are in Errors.md TOC', t => {
t.plan(78)
t.plan(79)
const errorsMd = readFileSync(resolve(__dirname, '../../docs/Reference/Errors.md'), 'utf8')

const exportedKeys = Object.keys(errors)
Expand All @@ -830,7 +840,7 @@ test('Ensure that all errors are in Errors.md TOC', t => {
})

test('Ensure that non-existing errors are not in Errors.md TOC', t => {
t.plan(78)
t.plan(79)
const errorsMd = readFileSync(resolve(__dirname, '../../docs/Reference/Errors.md'), 'utf8')

const matchRE = / {4}- \[([A-Z0-9_]+)\]\(#[a-z0-9_]+\)/g
Expand All @@ -843,7 +853,7 @@ test('Ensure that non-existing errors are not in Errors.md TOC', t => {
})

test('Ensure that all errors are in Errors.md documented', t => {
t.plan(78)
t.plan(79)
const errorsMd = readFileSync(resolve(__dirname, '../../docs/Reference/Errors.md'), 'utf8')

const exportedKeys = Object.keys(errors)
Expand All @@ -855,7 +865,7 @@ test('Ensure that all errors are in Errors.md documented', t => {
})

test('Ensure that non-existing errors are not in Errors.md documented', t => {
t.plan(78)
t.plan(79)
const errorsMd = readFileSync(resolve(__dirname, '../../docs/Reference/Errors.md'), 'utf8')

const matchRE = /<a id="[0-9a-zA-Z_]+">([0-9a-zA-Z_]+)<\/a>/g
Expand Down