From 7b2b77adca730e516c1b187092374a01de7f0f56 Mon Sep 17 00:00:00 2001 From: Gar Date: Wed, 18 May 2022 13:27:41 -0700 Subject: [PATCH] deps: make-fetch-happen@10.1.5 * cache integrity and size events so late listeners still get them * pass expected integrity to cacache * pass integrityEmitter to cacache to avoid a redundant integrity stream * remove in-memory buffering in favor of full time streaming --- .../make-fetch-happen/lib/cache/entry.js | 138 ++++++------------ .../make-fetch-happen/lib/pipeline.js | 41 ++++++ node_modules/make-fetch-happen/lib/remote.js | 13 +- node_modules/make-fetch-happen/package.json | 8 +- package-lock.json | 18 +-- package.json | 2 +- 6 files changed, 109 insertions(+), 111 deletions(-) create mode 100644 node_modules/make-fetch-happen/lib/pipeline.js diff --git a/node_modules/make-fetch-happen/lib/cache/entry.js b/node_modules/make-fetch-happen/lib/cache/entry.js index ae2ad8c7667f..7a7572ba030c 100644 --- a/node_modules/make-fetch-happen/lib/cache/entry.js +++ b/node_modules/make-fetch-happen/lib/cache/entry.js @@ -1,21 +1,16 @@ const { Request, Response } = require('minipass-fetch') const Minipass = require('minipass') -const MinipassCollect = require('minipass-collect') const MinipassFlush = require('minipass-flush') -const MinipassPipeline = require('minipass-pipeline') const cacache = require('cacache') const url = require('url') +const CachingMinipassPipeline = require('../pipeline.js') const CachePolicy = require('./policy.js') const cacheKey = require('./key.js') const remote = require('../remote.js') const hasOwnProperty = (obj, prop) => Object.prototype.hasOwnProperty.call(obj, prop) -// maximum amount of data we will buffer into memory -// if we'll exceed this, we switch to streaming -const MAX_MEM_SIZE = 5 * 1024 * 1024 // 5MB - // allow list for request headers that will be written to the cache index // note: we will also store any request headers // that are named in a response's vary header @@ -256,13 +251,12 @@ class CacheEntry { } const size = this.response.headers.get('content-length') - const fitsInMemory = !!size && Number(size) < MAX_MEM_SIZE - const shouldBuffer = this.options.memoize !== false && fitsInMemory const cacheOpts = { algorithms: this.options.algorithms, metadata: getMetadata(this.request, this.response, this.options), size, - memoize: fitsInMemory && this.options.memoize, + integrity: this.options.integrity, + integrityEmitter: this.response.body.hasIntegrityEmitter && this.response.body, } let body = null @@ -275,52 +269,31 @@ class CacheEntry { cacheWriteReject = reject }) - body = new MinipassPipeline(new MinipassFlush({ + body = new CachingMinipassPipeline({ events: ['integrity', 'size'] }, new MinipassFlush({ flush () { return cacheWritePromise }, })) - - let abortStream, onResume - if (shouldBuffer) { - // if the result fits in memory, use a collect stream to gather - // the response and write it to cacache while also passing it through - // to the user - onResume = () => { - const collector = new MinipassCollect.PassThrough() - abortStream = collector - collector.on('collect', (data) => { - // TODO if the cache write fails, log a warning but return the response anyway - cacache.put(this.options.cachePath, this.key, data, cacheOpts) - .then(cacheWriteResolve, cacheWriteReject) - }) - body.unshift(collector) - body.unshift(this.response.body) - } - } else { - // if it does not fit in memory, create a tee stream and use - // that to pipe to both the cache and the user simultaneously - onResume = () => { - const tee = new Minipass() - const cacheStream = cacache.put.stream(this.options.cachePath, this.key, cacheOpts) - abortStream = cacheStream - tee.pipe(cacheStream) - // TODO if the cache write fails, log a warning but return the response anyway - cacheStream.promise().then(cacheWriteResolve, cacheWriteReject) - body.unshift(tee) - body.unshift(this.response.body) - } + // this is always true since if we aren't reusing the one from the remote fetch, we + // are using the one from cacache + body.hasIntegrityEmitter = true + + const onResume = () => { + const tee = new Minipass() + const cacheStream = cacache.put.stream(this.options.cachePath, this.key, cacheOpts) + // re-emit the integrity and size events on our new response body so they can be reused + cacheStream.on('integrity', i => body.emit('integrity', i)) + cacheStream.on('size', s => body.emit('size', s)) + // stick a flag on here so downstream users will know if they can expect integrity events + tee.pipe(cacheStream) + // TODO if the cache write fails, log a warning but return the response anyway + cacheStream.promise().then(cacheWriteResolve, cacheWriteReject) + body.unshift(tee) + body.unshift(this.response.body) } body.once('resume', onResume) body.once('end', () => body.removeListener('resume', onResume)) - this.response.body.on('error', (err) => { - // the abortStream will either be a MinipassCollect if we buffer - // or a cacache write stream, either way be sure to listen for - // errors from the actual response and avoid writing data that we - // know to be invalid to the cache - abortStream.destroy(err) - }) } else { await cacache.index.insert(this.options.cachePath, this.key, null, cacheOpts) } @@ -331,7 +304,7 @@ class CacheEntry { // the header anyway this.response.headers.set('x-local-cache', encodeURIComponent(this.options.cachePath)) this.response.headers.set('x-local-cache-key', encodeURIComponent(this.key)) - this.response.headers.set('x-local-cache-mode', shouldBuffer ? 'buffer' : 'stream') + this.response.headers.set('x-local-cache-mode', 'stream') this.response.headers.set('x-local-cache-status', status) this.response.headers.set('x-local-cache-time', new Date().toISOString()) const newResponse = new Response(body, { @@ -346,9 +319,6 @@ class CacheEntry { // use the cached data to create a response and return it async respond (method, options, status) { let response - const size = Number(this.response.headers.get('content-length')) - const fitsInMemory = !!size && size < MAX_MEM_SIZE - const shouldBuffer = this.options.memoize !== false && fitsInMemory if (method === 'HEAD' || [301, 308].includes(this.response.status)) { // if the request is a HEAD, or the response is a redirect, // then the metadata in the entry already includes everything @@ -358,66 +328,44 @@ class CacheEntry { // we're responding with a full cached response, so create a body // that reads from cacache and attach it to a new Response const body = new Minipass() - const removeOnResume = () => body.removeListener('resume', onResume) - let onResume - if (shouldBuffer) { - onResume = async () => { - removeOnResume() - try { - const content = await cacache.get.byDigest( + const headers = { ...this.policy.responseHeaders() } + const onResume = () => { + const cacheStream = cacache.get.stream.byDigest( + this.options.cachePath, this.entry.integrity, { memoize: this.options.memoize } + ) + cacheStream.on('error', async (err) => { + cacheStream.pause() + if (err.code === 'EINTEGRITY') { + await cacache.rm.content( this.options.cachePath, this.entry.integrity, { memoize: this.options.memoize } ) - body.end(content) - } catch (err) { - if (err.code === 'EINTEGRITY') { - await cacache.rm.content( - this.options.cachePath, this.entry.integrity, { memoize: this.options.memoize } - ) - } - if (err.code === 'ENOENT' || err.code === 'EINTEGRITY') { - await CacheEntry.invalidate(this.request, this.options) - } - body.emit('error', err) } - } - } else { - onResume = () => { - const cacheStream = cacache.get.stream.byDigest( - this.options.cachePath, this.entry.integrity, { memoize: this.options.memoize } - ) - cacheStream.on('error', async (err) => { - cacheStream.pause() - if (err.code === 'EINTEGRITY') { - await cacache.rm.content( - this.options.cachePath, this.entry.integrity, { memoize: this.options.memoize } - ) - } - if (err.code === 'ENOENT' || err.code === 'EINTEGRITY') { - await CacheEntry.invalidate(this.request, this.options) - } - body.emit('error', err) - cacheStream.resume() - }) - cacheStream.pipe(body) - } + if (err.code === 'ENOENT' || err.code === 'EINTEGRITY') { + await CacheEntry.invalidate(this.request, this.options) + } + body.emit('error', err) + cacheStream.resume() + }) + // emit the integrity and size events based on our metadata so we're consistent + body.emit('integrity', this.entry.integrity) + body.emit('size', Number(headers['content-length'])) + cacheStream.pipe(body) } body.once('resume', onResume) - body.once('end', removeOnResume) + body.once('end', () => body.removeListener('resume', onResume)) response = new Response(body, { url: this.entry.metadata.url, counter: options.counter, status: 200, - headers: { - ...this.policy.responseHeaders(), - }, + headers, }) } response.headers.set('x-local-cache', encodeURIComponent(this.options.cachePath)) response.headers.set('x-local-cache-hash', encodeURIComponent(this.entry.integrity)) response.headers.set('x-local-cache-key', encodeURIComponent(this.key)) - response.headers.set('x-local-cache-mode', shouldBuffer ? 'buffer' : 'stream') + response.headers.set('x-local-cache-mode', 'stream') response.headers.set('x-local-cache-status', status) response.headers.set('x-local-cache-time', new Date(this.entry.metadata.time).toUTCString()) return response diff --git a/node_modules/make-fetch-happen/lib/pipeline.js b/node_modules/make-fetch-happen/lib/pipeline.js new file mode 100644 index 000000000000..b1d221b2d0ce --- /dev/null +++ b/node_modules/make-fetch-happen/lib/pipeline.js @@ -0,0 +1,41 @@ +'use strict' + +const MinipassPipeline = require('minipass-pipeline') + +class CachingMinipassPipeline extends MinipassPipeline { + #events = [] + #data = new Map() + + constructor (opts, ...streams) { + // CRITICAL: do NOT pass the streams to the call to super(), this will start + // the flow of data and potentially cause the events we need to catch to emit + // before we've finished our own setup. instead we call super() with no args, + // finish our setup, and then push the streams into ourselves to start the + // data flow + super() + this.#events = opts.events + + /* istanbul ignore next - coverage disabled because this is pointless to test here */ + if (streams.length) { + this.push(...streams) + } + } + + on (event, handler) { + if (this.#events.includes(event) && this.#data.has(event)) { + return handler(...this.#data.get(event)) + } + + return super.on(event, handler) + } + + emit (event, ...data) { + if (this.#events.includes(event)) { + this.#data.set(event, data) + } + + return super.emit(event, ...data) + } +} + +module.exports = CachingMinipassPipeline diff --git a/node_modules/make-fetch-happen/lib/remote.js b/node_modules/make-fetch-happen/lib/remote.js index a8b8d2a0198d..763fc0d48802 100644 --- a/node_modules/make-fetch-happen/lib/remote.js +++ b/node_modules/make-fetch-happen/lib/remote.js @@ -1,9 +1,9 @@ const Minipass = require('minipass') -const MinipassPipeline = require('minipass-pipeline') const fetch = require('minipass-fetch') const promiseRetry = require('promise-retry') const ssri = require('ssri') +const CachingMinipassPipeline = require('./pipeline.js') const getAgent = require('./agent.js') const pkg = require('../package.json') @@ -53,7 +53,16 @@ const remoteFetch = (request, options) => { // we got a 200 response and the user has specified an expected // integrity value, so wrap the response in an ssri stream to verify it const integrityStream = ssri.integrityStream({ integrity: _opts.integrity }) - res = new fetch.Response(new MinipassPipeline(res.body, integrityStream), res) + const pipeline = new CachingMinipassPipeline({ + events: ['integrity', 'size'], + }, res.body, integrityStream) + // we also propagate the integrity and size events out to the pipeline so we can use + // this new response body as an integrityEmitter for cacache + integrityStream.on('integrity', i => pipeline.emit('integrity', i)) + integrityStream.on('size', s => pipeline.emit('size', s)) + res = new fetch.Response(pipeline, res) + // set an explicit flag so we know if our response body will emit integrity and size + res.body.hasIntegrityEmitter = true } res.headers.set('x-fetch-attempts', attemptNum) diff --git a/node_modules/make-fetch-happen/package.json b/node_modules/make-fetch-happen/package.json index e1e8e975187b..2e6153b99fa0 100644 --- a/node_modules/make-fetch-happen/package.json +++ b/node_modules/make-fetch-happen/package.json @@ -1,6 +1,6 @@ { "name": "make-fetch-happen", - "version": "10.1.3", + "version": "10.1.5", "description": "Opinionated, caching, retrying fetch client", "main": "lib/index.js", "files": [ @@ -37,7 +37,7 @@ "license": "ISC", "dependencies": { "agentkeepalive": "^4.2.1", - "cacache": "^16.0.2", + "cacache": "^16.1.0", "http-cache-semantics": "^4.1.0", "http-proxy-agent": "^5.0.0", "https-proxy-agent": "^5.0.0", @@ -55,7 +55,7 @@ }, "devDependencies": { "@npmcli/eslint-config": "^3.0.1", - "@npmcli/template-oss": "3.4.3", + "@npmcli/template-oss": "3.5.0", "mkdirp": "^1.0.4", "nock": "^13.2.4", "rimraf": "^3.0.2", @@ -73,6 +73,6 @@ }, "templateOSS": { "//@npmcli/template-oss": "This file is partially managed by @npmcli/template-oss. Edits may be overwritten.", - "version": "3.4.3" + "version": "3.5.0" } } diff --git a/package-lock.json b/package-lock.json index 83d078741d94..3154dc55452c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -121,7 +121,7 @@ "libnpmsearch": "^5.0.2", "libnpmteam": "^4.0.2", "libnpmversion": "^3.0.1", - "make-fetch-happen": "^10.1.3", + "make-fetch-happen": "^10.1.5", "minipass": "^3.1.6", "minipass-pipeline": "^1.2.4", "mkdirp": "^1.0.4", @@ -4609,13 +4609,13 @@ "peer": true }, "node_modules/make-fetch-happen": { - "version": "10.1.3", - "resolved": "https://registry.npmjs.org/make-fetch-happen/-/make-fetch-happen-10.1.3.tgz", - "integrity": "sha512-s/UjmGjUHn9m52cctFhN2ITObbT+axoUhgeir8xGrOlPbKDyJsdhQzb8PGncPQQ28uduHybFJ6Iumy2OZnreXw==", + "version": "10.1.5", + "resolved": "https://registry.npmjs.org/make-fetch-happen/-/make-fetch-happen-10.1.5.tgz", + "integrity": "sha512-mucOj2H0Jn/ax7H9K9T1bf0p1nn/mBFa551Os7ed9xRfLEx20aZhZeLslmRYfAaAqXZUGipcs+m5KOKvOH0XKA==", "inBundle": true, "dependencies": { "agentkeepalive": "^4.2.1", - "cacache": "^16.0.2", + "cacache": "^16.1.0", "http-cache-semantics": "^4.1.0", "http-proxy-agent": "^5.0.0", "https-proxy-agent": "^5.0.0", @@ -13269,12 +13269,12 @@ "peer": true }, "make-fetch-happen": { - "version": "10.1.3", - "resolved": "https://registry.npmjs.org/make-fetch-happen/-/make-fetch-happen-10.1.3.tgz", - "integrity": "sha512-s/UjmGjUHn9m52cctFhN2ITObbT+axoUhgeir8xGrOlPbKDyJsdhQzb8PGncPQQ28uduHybFJ6Iumy2OZnreXw==", + "version": "10.1.5", + "resolved": "https://registry.npmjs.org/make-fetch-happen/-/make-fetch-happen-10.1.5.tgz", + "integrity": "sha512-mucOj2H0Jn/ax7H9K9T1bf0p1nn/mBFa551Os7ed9xRfLEx20aZhZeLslmRYfAaAqXZUGipcs+m5KOKvOH0XKA==", "requires": { "agentkeepalive": "^4.2.1", - "cacache": "^16.0.2", + "cacache": "^16.1.0", "http-cache-semantics": "^4.1.0", "http-proxy-agent": "^5.0.0", "https-proxy-agent": "^5.0.0", diff --git a/package.json b/package.json index 621be8335705..a2fd510a77c8 100644 --- a/package.json +++ b/package.json @@ -90,7 +90,7 @@ "libnpmsearch": "^5.0.2", "libnpmteam": "^4.0.2", "libnpmversion": "^3.0.1", - "make-fetch-happen": "^10.1.3", + "make-fetch-happen": "^10.1.5", "minipass": "^3.1.6", "minipass-pipeline": "^1.2.4", "mkdirp": "^1.0.4",