Skip to content

Commit

Permalink
improve coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Jul 28, 2020
1 parent 4dedc2c commit 4080a28
Show file tree
Hide file tree
Showing 10 changed files with 356 additions and 115 deletions.
46 changes: 17 additions & 29 deletions lib/client-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const {
} = require('./errors')
const Request = require('./request')
const util = require('./util')
const assert = require('assert')
const { kResume, kEnqueue } = require('./symbols')

// TODO: Refactor
Expand All @@ -28,10 +29,7 @@ class PipelineRequest extends Request {
_onHeaders (statusCode, headers, resume) {
const { callback } = this

if (!callback) {
return
}

assert(callback)
this.callback = null
this.res = callback.call(this, null, {
statusCode,
Expand All @@ -42,19 +40,15 @@ class PipelineRequest extends Request {
}

_onBody (chunk, offset, length) {
if (this.res) {
return this.res(null, chunk.slice(offset, offset + length))
}
return this.res(null, chunk.slice(offset, offset + length))
}

_onComplete (trailers) {
// TODO: Trailers?

if (this.res) {
const res = this.res
this.res = null
res(null, null)
}
const res = this.res
this.res = null
res(null, null)
}

_onError (err) {
Expand Down Expand Up @@ -106,11 +100,7 @@ module.exports = function (client, opts, handler) {
ret.end()
}

if (request) {
request.runInAsyncScope(callback, null, err, null)
} else {
callback(err, null)
}
request.runInAsyncScope(callback, null, err, null)
}
})

Expand Down Expand Up @@ -139,19 +129,17 @@ module.exports = function (client, opts, handler) {
util.destroy(body, err)
util.destroy(req, err)
util.destroy(res, err)
if (request) {
if (err) {
request.onError(err)
}
request.runInAsyncScope(
callback,
null,
err,
null
)
} else {
callback(err, null)

if (err) {
request.onError(err)
}

request.runInAsyncScope(
callback,
null,
err,
null
)
}
}).on('prefinish', () => {
// Node < 15 does not call _final in same tick.
Expand Down
26 changes: 12 additions & 14 deletions lib/client-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ const {
} = require('./errors')
const util = require('./util')
const { kEnqueue } = require('./symbols')
const { assert } = require('console')
const assert = require('assert')

class StreamRequest extends Request {
constructor (client, opts, factory, callback) {
if (!opts || typeof opts !== 'object') {
Expand Down Expand Up @@ -81,17 +82,15 @@ class StreamRequest extends Request {

const { callback, res } = this

if (res) {
this.res = null
if (!res.readable) {
util.destroy(res)
}
assert(res)
this.res = null
if (!res.readable) {
util.destroy(res)
}

if (callback) {
this.callback = null
callback(null, null)
}
assert(callback)
this.callback = null
callback(null, null)
})

if (typeof res.destroy === 'function') {
Expand All @@ -118,10 +117,9 @@ class StreamRequest extends Request {

this.factory = null

if (callback) {
this.callback = null
process.nextTick(callback, err, null)
}
assert(callback)
this.callback = null
process.nextTick(callback, err, null)

if (res) {
this.res = null
Expand Down
34 changes: 12 additions & 22 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ class Client extends EventEmitter {
resume(this)
}

/* istanbul ignore next: only used for test */
[kConnect] (cb) {
connect(this)
if (cb) {
Expand Down Expand Up @@ -390,11 +391,7 @@ class Parser extends HTTPParser {

[HTTPParser.kOnExecute] (ret) {
if (ret instanceof Error) {
const err = ret
if (typeof err.reason === 'string') {
err.message = `Parse Error: ${err.reason}`
}
util.destroy(this.socket, err)
util.destroy(this.socket, ret)
} else {
// When the underlying `net.Socket` instance is consumed - no
// `data` events are emitted, and thus `socket.setTimeout` fires the
Expand Down Expand Up @@ -428,7 +425,7 @@ class Parser extends HTTPParser {
// TODO: More statusCode validation?

if (statusCode < 200) {
request.onInfo(statusCode, util.parseHeaders(rawHeaders, headers))
// request.onInfo(statusCode, util.parseHeaders(rawHeaders, headers))
} else {
request.onHeaders(statusCode, util.parseHeaders(rawHeaders, headers), resumeSocket)
}
Expand All @@ -439,10 +436,7 @@ class Parser extends HTTPParser {
[HTTPParser.kOnBody] (chunk, offset, length) {
const { client, socket, statusCode } = this

if (statusCode < 200) {
util.destroy(socket, new SocketError('unexpected request body'))
return
}
assert(statusCode >= 200)

const request = client[kQueue][client[kRunningIdx]]

Expand Down Expand Up @@ -537,6 +531,7 @@ function connect (client) {
let socket
if (protocol === 'https:') {
const tlsOpts = { ...client[kTLSOpts], servername }
/* istanbul ignore next: https://github.com/mcollina/undici/issues/267 */
socket = client[kSocketPath]
? tls.connect(client[kSocketPath], tlsOpts)
: tls.connect(port || /* istanbul ignore next */ 443, hostname, tlsOpts)
Expand Down Expand Up @@ -571,7 +566,7 @@ function connect (client) {
client.emit('connect')
resume(client)
})
.on('data', function () {
.on('data', /* istanbul ignore next */ function () {
/* istanbul ignore next */
assert(false)
})
Expand Down Expand Up @@ -704,11 +699,7 @@ function resume (client) {
return
}

if (client[kReset]) {
return
}

if (client[kWriting]) {
if (client[kWriting] || client[kReset]) {
return
}

Expand All @@ -721,16 +712,17 @@ function resume (client) {

if (util.isStream(request.body) && util.bodyLength(request.body) === 0) {
request.body
.on('data', function () {
.on('data', /* istanbul ignore next */ function () {
/* istanbul ignore next */
assert(false)
})
.on('error', function (err) {
util.destroy(this)
request.onError(err)
})
.on('end', function () {
util.destroy(this)
})

request.body = null
}

Expand Down Expand Up @@ -822,7 +814,8 @@ function write (client, request) {

request.body = null
client[kReset] = !expectsPayload
} else if (util.isStream(body)) {
} else {
assert(util.isStream(body))
assert(contentLength !== 0 || !client.running, 'stream body cannot be pipelined')

let finished = false
Expand Down Expand Up @@ -933,9 +926,6 @@ function write (client, request) {
.on('close', onFinished)

client[kWriting] = true
} else {
/* istanbul ignore next */
assert(false)
}

socket.uncork()
Expand Down
10 changes: 0 additions & 10 deletions lib/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -157,16 +157,6 @@ class Request extends AsyncResource {
: null
}

onInfo (statusCode, headers) {
if (this.aborted) {
return
}

if (this._onInfo) {
this.runInAsyncScope(this._onInfo, this, statusCode, headers)
}
}

onHeaders (statusCode, headers, resume) {
if (this.aborted) {
return
Expand Down
54 changes: 54 additions & 0 deletions test/client-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -839,3 +839,57 @@ test('pipeline invalid opts', (t) => {
})
})
})

test('pipeline CONNECT throw', (t) => {
t.plan(1)

const server = createServer((req, res) => {
res.end('asd')
})
t.tearDown(server.close.bind(server))

server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.destroy.bind(client))

client.pipeline({
path: '/',
method: 'CONNECT'
}, () => {
t.fail()
}).on('error', (err) => {
t.ok(err instanceof errors.NotSupportedError)
})
client.on('disconnect', () => {
t.fail()
})
})
})

test('pipeline body without destroy', (t) => {
t.plan(1)

const server = createServer((req, res) => {
res.end('asd')
})
t.tearDown(server.close.bind(server))

server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.destroy.bind(client))

client.pipeline({
path: '/',
method: 'GET'
}, ({ body }) => {
const pt = new PassThrough({ autoDestroy: false })
pt.destroy = null
return body.pipe(pt)
})
.end()
.on('end', () => {
t.pass()
})
.resume()
})
})

0 comments on commit 4080a28

Please sign in to comment.