Skip to content

Commit

Permalink
feat: Add H2 support (#2061)
Browse files Browse the repository at this point in the history
* feat: port H2 work with latest main

* fix: linting errors

* refactor: adjust support for headers and set testing

* test: add testing for h2

* refactor: make http2 session handle shorter

* feat: add support for sending body over http2

* feat: ensure support for streams over H2

* refactor: remove noisy logs

* feat: support 100 continue

* feat: support for iterators

* feat: add support for Blobs

* refactor: adapt contracts to h2 support

* refactor: cleanup

* feat: support for content-length

* refactor: body write

* test: refactor check continue test

* fix: bad check for headers

* fix: bad change

* chore: add http2 alpn test (#34)

* chore: add http2 alpn test using fastify

* chore: update to test https 1 with http2

* chore: update alpn test to return server request alpn protocol and http version

* chore: add alpn with body

* fix: remove fastify from package json

* refactor: remove leftover

* test: ensure dispatch feature

* feat(h2): support connect

* fix: pass signal down the road

* test: ensure stream works as expected

* test: ensure pipeline  works as expected

* test: ensure upgrade fails

* test: ensure destroy works as expected

* feat: allow to disable H2 calls upon request

* fix: linting

* feat: support GOAWAY frame (server-side)

* refactor; use h2 constants

* feat: initial shape of concurrent stream handling

* refactor: header processing

* chore: http/2 benchmark (#35)

Co-authored-by: Carlos Fuentes <me@metcoder.dev>

* refactor: adjust accordingly to review

* fix: add missing error handler for socket

* refactor: headers handling

* feat: initial concurrent stream support

* fix: lint

* refactor: adjust several pieces

* fix: support h2 headers for fetch

* feat: enhance h2 for fetch

* refactor: apply review suggestions

Co-authored-by: Robert Nagy <ronagy@icloud.com>

* refactor: set allowh2 to false

* fix: linting

* refactor: implement kHTTPConnVersion symbol

* test: adjust testing

* feat: buil factory

* fix: rebase

* feat: enhance TS types for maxConcurrent streams

* test: move fetch tests to fetch folder

* feat: add experimental warning

* test: refactor suite

* refactor: apply several changes

* test: split tests between v20 and lower

---------

Co-authored-by: Michael Kaufman <2073135+mkaufmaner@users.noreply.github.com>
Co-authored-by: Robert Nagy <ronagy@icloud.com>
Co-authored-by: Matteo Collina <hello@matteocollina.com>
  • Loading branch information
4 people committed Sep 8, 2023
1 parent ea4f257 commit a8a5d0a
Show file tree
Hide file tree
Showing 22 changed files with 2,846 additions and 90 deletions.
306 changes: 306 additions & 0 deletions benchmarks/benchmark-http2.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,306 @@
'use strict'

const { connect } = require('http2')
const { createSecureContext } = require('tls')
const os = require('os')
const path = require('path')
const { readFileSync } = require('fs')
const { table } = require('table')
const { Writable } = require('stream')
const { WritableStream } = require('stream/web')
const { isMainThread } = require('worker_threads')

const { Pool, Client, fetch, Agent, setGlobalDispatcher } = require('..')

const ca = readFileSync(path.join(__dirname, '..', 'test', 'fixtures', 'ca.pem'), 'utf8')
const servername = 'agent1'

const iterations = (parseInt(process.env.SAMPLES, 10) || 10) + 1
const errorThreshold = parseInt(process.env.ERROR_THRESHOLD, 10) || 3
const connections = parseInt(process.env.CONNECTIONS, 10) || 50
const pipelining = parseInt(process.env.PIPELINING, 10) || 10
const parallelRequests = parseInt(process.env.PARALLEL, 10) || 100
const headersTimeout = parseInt(process.env.HEADERS_TIMEOUT, 10) || 0
const bodyTimeout = parseInt(process.env.BODY_TIMEOUT, 10) || 0
const dest = {}

if (process.env.PORT) {
dest.port = process.env.PORT
dest.url = `https://localhost:${process.env.PORT}`
} else {
dest.url = 'https://localhost'
dest.socketPath = path.join(os.tmpdir(), 'undici.sock')
}

const httpsBaseOptions = {
ca,
servername,
protocol: 'https:',
hostname: 'localhost',
method: 'GET',
path: '/',
query: {
frappucino: 'muffin',
goat: 'scone',
pond: 'moose',
foo: ['bar', 'baz', 'bal'],
bool: true,
numberKey: 256
},
...dest
}

const http2ClientOptions = {
secureContext: createSecureContext({ ca }),
servername
}

const undiciOptions = {
path: '/',
method: 'GET',
headersTimeout,
bodyTimeout
}

const Class = connections > 1 ? Pool : Client
const dispatcher = new Class(httpsBaseOptions.url, {
allowH2: true,
pipelining,
connections,
connect: {
rejectUnauthorized: false,
ca,
servername
},
...dest
})

setGlobalDispatcher(new Agent({
allowH2: true,
pipelining,
connections,
connect: {
rejectUnauthorized: false,
ca,
servername
}
}))

class SimpleRequest {
constructor (resolve) {
this.dst = new Writable({
write (chunk, encoding, callback) {
callback()
}
}).on('finish', resolve)
}

onConnect (abort) { }

onHeaders (statusCode, headers, resume) {
this.dst.on('drain', resume)
}

onData (chunk) {
return this.dst.write(chunk)
}

onComplete () {
this.dst.end()
}

onError (err) {
throw err
}
}

function makeParallelRequests (cb) {
return Promise.all(Array.from(Array(parallelRequests)).map(() => new Promise(cb)))
}

function printResults (results) {
// Sort results by least performant first, then compare relative performances and also printing padding
let last

const rows = Object.entries(results)
// If any failed, put on the top of the list, otherwise order by mean, ascending
.sort((a, b) => (!a[1].success ? -1 : b[1].mean - a[1].mean))
.map(([name, result]) => {
if (!result.success) {
return [name, result.size, 'Errored', 'N/A', 'N/A']
}

// Calculate throughput and relative performance
const { size, mean, standardError } = result
const relative = last !== 0 ? (last / mean - 1) * 100 : 0

// Save the slowest for relative comparison
if (typeof last === 'undefined') {
last = mean
}

return [
name,
size,
`${((connections * 1e9) / mean).toFixed(2)} req/sec`,
${((standardError / mean) * 100).toFixed(2)} %`,
relative > 0 ? `+ ${relative.toFixed(2)} %` : '-'
]
})

console.log(results)

// Add the header row
rows.unshift(['Tests', 'Samples', 'Result', 'Tolerance', 'Difference with slowest'])

return table(rows, {
columns: {
0: {
alignment: 'left'
},
1: {
alignment: 'right'
},
2: {
alignment: 'right'
},
3: {
alignment: 'right'
},
4: {
alignment: 'right'
}
},
drawHorizontalLine: (index, size) => index > 0 && index < size,
border: {
bodyLeft: '│',
bodyRight: '│',
bodyJoin: '│',
joinLeft: '|',
joinRight: '|',
joinJoin: '|'
}
})
}

const experiments = {
'http2 - request' () {
return makeParallelRequests(resolve => {
connect(dest.url, http2ClientOptions, (session) => {
const headers = {
':path': '/',
':method': 'GET',
':scheme': 'https',
':authority': `localhost:${dest.port}`
}

const request = session.request(headers)

request.pipe(
new Writable({
write (chunk, encoding, callback) {
callback()
}
})
).on('finish', resolve)
})
})
},
'undici - pipeline' () {
return makeParallelRequests(resolve => {
dispatcher
.pipeline(undiciOptions, data => {
return data.body
})
.end()
.pipe(
new Writable({
write (chunk, encoding, callback) {
callback()
}
})
)
.on('finish', resolve)
})
},
'undici - request' () {
return makeParallelRequests(resolve => {
try {
dispatcher.request(undiciOptions).then(({ body }) => {
body
.pipe(
new Writable({
write (chunk, encoding, callback) {
callback()
}
})
)
.on('error', (err) => {
console.log('undici - request - dispatcher.request - body - error', err)
})
.on('finish', () => {
resolve()
})
})
} catch (err) {
console.error('undici - request - dispatcher.request - requestCount', err)
}
})
},
'undici - stream' () {
return makeParallelRequests(resolve => {
return dispatcher
.stream(undiciOptions, () => {
return new Writable({
write (chunk, encoding, callback) {
callback()
}
})
})
.then(resolve)
})
},
'undici - dispatch' () {
return makeParallelRequests(resolve => {
dispatcher.dispatch(undiciOptions, new SimpleRequest(resolve))
})
}
}

if (process.env.PORT) {
// fetch does not support the socket
experiments['undici - fetch'] = () => {
return makeParallelRequests(resolve => {
fetch(dest.url, {}).then(res => {
res.body.pipeTo(new WritableStream({ write () { }, close () { resolve() } }))
}).catch(console.log)
})
}
}

async function main () {
const { cronometro } = await import('cronometro')

cronometro(
experiments,
{
iterations,
errorThreshold,
print: false
},
(err, results) => {
if (err) {
throw err
}

console.log(printResults(results))
dispatcher.destroy()
}
)
}

if (isMainThread) {
main()
} else {
module.exports = main
}

0 comments on commit a8a5d0a

Please sign in to comment.