@@ -1158,6 +1158,7 @@ class Http2Session extends EventEmitter {
1158
1158
streams : new Map ( ) ,
1159
1159
pendingStreams : new Set ( ) ,
1160
1160
pendingAck : 0 ,
1161
+ shutdownWritableCalled : false ,
1161
1162
writeQueueSize : 0 ,
1162
1163
originSet : undefined
1163
1164
} ;
@@ -1724,6 +1725,26 @@ function afterShutdown(status) {
1724
1725
stream [ kMaybeDestroy ] ( ) ;
1725
1726
}
1726
1727
1728
+ function shutdownWritable ( callback ) {
1729
+ const handle = this [ kHandle ] ;
1730
+ if ( ! handle ) return callback ( ) ;
1731
+ const state = this [ kState ] ;
1732
+ if ( state . shutdownWritableCalled ) {
1733
+ // Backport v12.x: Session required for debugging stream object
1734
+ // debugStreamObj(this, 'shutdownWritable() already called');
1735
+ return callback ( ) ;
1736
+ }
1737
+ state . shutdownWritableCalled = true ;
1738
+
1739
+ const req = new ShutdownWrap ( ) ;
1740
+ req . oncomplete = afterShutdown ;
1741
+ req . callback = callback ;
1742
+ req . handle = handle ;
1743
+ const err = handle . shutdown ( req ) ;
1744
+ if ( err === 1 ) // synchronous finish
1745
+ return afterShutdown . call ( req , 0 ) ;
1746
+ }
1747
+
1727
1748
function finishSendTrailers ( stream , headersList ) {
1728
1749
// The stream might be destroyed and in that case
1729
1750
// there is nothing to do.
@@ -1983,10 +2004,50 @@ class Http2Stream extends Duplex {
1983
2004
1984
2005
let req ;
1985
2006
2007
+ let waitingForWriteCallback = true ;
2008
+ let waitingForEndCheck = true ;
2009
+ let writeCallbackErr ;
2010
+ let endCheckCallbackErr ;
2011
+ const done = ( ) => {
2012
+ if ( waitingForEndCheck || waitingForWriteCallback ) return ;
2013
+ const err = writeCallbackErr || endCheckCallbackErr ;
2014
+ // writeGeneric does not destroy on error and
2015
+ // we cannot enable autoDestroy,
2016
+ // so make sure to destroy on error.
2017
+ if ( err ) {
2018
+ this . destroy ( err ) ;
2019
+ }
2020
+ cb ( err ) ;
2021
+ } ;
2022
+ const writeCallback = ( err ) => {
2023
+ waitingForWriteCallback = false ;
2024
+ writeCallbackErr = err ;
2025
+ done ( ) ;
2026
+ } ;
2027
+ const endCheckCallback = ( err ) => {
2028
+ waitingForEndCheck = false ;
2029
+ endCheckCallbackErr = err ;
2030
+ done ( ) ;
2031
+ } ;
2032
+ // Shutdown write stream right after last chunk is sent
2033
+ // so final DATA frame can include END_STREAM flag
2034
+ process . nextTick ( ( ) => {
2035
+ if ( writeCallbackErr ||
2036
+ ! this . _writableState . ending ||
2037
+ // Backport v12.x: _writableState.buffered does not exist
2038
+ // this._writableState.buffered.length ||
2039
+ this . _writableState . bufferedRequest ||
2040
+ ( this [ kState ] . flags & STREAM_FLAGS_HAS_TRAILERS ) )
2041
+ return endCheckCallback ( ) ;
2042
+ // Backport v12.x: Session required for debugging stream object
2043
+ // debugStreamObj(this, 'shutting down writable on last write');
2044
+ shutdownWritable . call ( this , endCheckCallback ) ;
2045
+ } ) ;
2046
+
1986
2047
if ( writev )
1987
- req = writevGeneric ( this , data , cb ) ;
2048
+ req = writevGeneric ( this , data , writeCallback ) ;
1988
2049
else
1989
- req = writeGeneric ( this , data , encoding , cb ) ;
2050
+ req = writeGeneric ( this , data , encoding , writeCallback ) ;
1990
2051
1991
2052
trackWriteState ( this , req . bytes ) ;
1992
2053
}
@@ -2000,21 +2061,13 @@ class Http2Stream extends Duplex {
2000
2061
}
2001
2062
2002
2063
_final ( cb ) {
2003
- const handle = this [ kHandle ] ;
2004
2064
if ( this . pending ) {
2005
2065
this . once ( 'ready' , ( ) => this . _final ( cb ) ) ;
2006
- } else if ( handle !== undefined ) {
2007
- debugStreamObj ( this , '_final shutting down' ) ;
2008
- const req = new ShutdownWrap ( ) ;
2009
- req . oncomplete = afterShutdown ;
2010
- req . callback = cb ;
2011
- req . handle = handle ;
2012
- const err = handle . shutdown ( req ) ;
2013
- if ( err === 1 ) // synchronous finish
2014
- return afterShutdown . call ( req , 0 ) ;
2015
- } else {
2016
- cb ( ) ;
2066
+ return ;
2017
2067
}
2068
+ // Backport v12.x: Session required for debugging stream object
2069
+ // debugStreamObj(this, 'shutting down writable on _final');
2070
+ shutdownWritable . call ( this , cb ) ;
2018
2071
}
2019
2072
2020
2073
_read ( nread ) {
@@ -2119,11 +2172,20 @@ class Http2Stream extends Duplex {
2119
2172
debugStream ( this [ kID ] || 'pending' , session [ kType ] , 'destroying stream' ) ;
2120
2173
2121
2174
const state = this [ kState ] ;
2122
- const sessionCode = session [ kState ] . goawayCode ||
2123
- session [ kState ] . destroyCode ;
2124
- const code = err != null ?
2125
- sessionCode || NGHTTP2_INTERNAL_ERROR :
2126
- state . rstCode || sessionCode ;
2175
+ const sessionState = session [ kState ] ;
2176
+ const sessionCode = sessionState . goawayCode || sessionState . destroyCode ;
2177
+
2178
+ // If a stream has already closed successfully, there is no error
2179
+ // to report from this stream, even if the session has errored.
2180
+ // This can happen if the stream was already in process of destroying
2181
+ // after a successful close, but the session had a error between
2182
+ // this stream's close and destroy operations.
2183
+ // Previously, this always overrode a successful close operation code
2184
+ // NGHTTP2_NO_ERROR (0) with sessionCode because the use of the || operator.
2185
+ const code = ( err != null ?
2186
+ ( sessionCode || NGHTTP2_INTERNAL_ERROR ) :
2187
+ ( this . closed ? this . rstCode : sessionCode )
2188
+ ) ;
2127
2189
const hasHandle = handle !== undefined ;
2128
2190
2129
2191
if ( ! this . closed )
@@ -2132,13 +2194,13 @@ class Http2Stream extends Duplex {
2132
2194
2133
2195
if ( hasHandle ) {
2134
2196
handle . destroy ( ) ;
2135
- session [ kState ] . streams . delete ( id ) ;
2197
+ sessionState . streams . delete ( id ) ;
2136
2198
} else {
2137
- session [ kState ] . pendingStreams . delete ( this ) ;
2199
+ sessionState . pendingStreams . delete ( this ) ;
2138
2200
}
2139
2201
2140
2202
// Adjust the write queue size for accounting
2141
- session [ kState ] . writeQueueSize -= state . writeQueueSize ;
2203
+ sessionState . writeQueueSize -= state . writeQueueSize ;
2142
2204
state . writeQueueSize = 0 ;
2143
2205
2144
2206
// RST code 8 not emitted as an error as its used by clients to signify
0 commit comments