From f337089567d7d92c9467e311be7d72b0a7dc8047 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Fri, 23 Feb 2024 08:46:16 +0100 Subject: [PATCH] fix: only reset pending value with resume token (#2000) Fixes #1959 --- .gitignore | 1 + src/partial-result-stream.ts | 7 +- test/spanner.ts | 207 +++++++++++++++++++++++++++++++++-- 3 files changed, 205 insertions(+), 10 deletions(-) diff --git a/.gitignore b/.gitignore index d4f03a0df..604f64ee0 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,4 @@ system-test/*key.json .DS_Store package-lock.json __pycache__ +.idea diff --git a/src/partial-result-stream.ts b/src/partial-result-stream.ts index a3d42ee6d..e03539219 100644 --- a/src/partial-result-stream.ts +++ b/src/partial-result-stream.ts @@ -244,8 +244,7 @@ export class PartialResultStream extends Transform implements ResultEvents { } } - _clearPendingValues() { - this._values = []; + _resetPendingValues() { if (this._pendingValueForResume) { this._pendingValue = this._pendingValueForResume; } else { @@ -484,7 +483,9 @@ export function partialResultStream( }); }; const makeRequest = (): void => { - partialRSStream._clearPendingValues(); + if (is.defined(lastResumeToken) && lastResumeToken.length > 0) { + partialRSStream._resetPendingValues(); + } lastRequestStream = requestFn(lastResumeToken); lastRequestStream.on('end', endListener); requestsStream.add(lastRequestStream); diff --git a/test/spanner.ts b/test/spanner.ts index 315fb0596..32a1b8e58 100644 --- a/test/spanner.ts +++ b/test/spanner.ts @@ -120,7 +120,6 @@ describe('Spanner with mock server', () => { } ); }); - server.start(); spannerMock.putStatementResult( selectSql, mock.StatementResult.resultSet(mock.createSimpleResultSet()) @@ -3695,10 +3694,10 @@ describe('Spanner with mock server', () => { }); it('should return all values from PartialResultSet with chunked string value', async () => { - for (const includeResumeToken in [true, false]) { + for (const includeResumeToken of [true, false]) { // eslint-disable-next-line @typescript-eslint/no-explicit-any let errorOnIndexes: any; - for (errorOnIndexes in [[], [0], [1], [0, 1]]) { + for (errorOnIndexes of [[], [0], [1], [0, 1]]) { const sql = 'SELECT * FROM TestTable'; const prs1 = PartialResultSet.create({ resumeToken: includeResumeToken @@ -3747,10 +3746,10 @@ describe('Spanner with mock server', () => { }); it('should return all values from PartialResultSet with chunked string value in an array', async () => { - for (const includeResumeToken in [true, false]) { + for (const includeResumeToken of [true, false]) { // eslint-disable-next-line @typescript-eslint/no-explicit-any let errorOnIndexes: any; - for (errorOnIndexes in [[], [0], [1], [0, 1]]) { + for (errorOnIndexes of [[], [0], [1], [0, 1]]) { const sql = 'SELECT * FROM TestTable'; const prs1 = PartialResultSet.create({ resumeToken: includeResumeToken @@ -3800,10 +3799,10 @@ describe('Spanner with mock server', () => { }); it('should return all values from PartialResultSet with chunked list value', async () => { - for (const includeResumeToken in [true, false]) { + for (const includeResumeToken of [true, false]) { // eslint-disable-next-line @typescript-eslint/no-explicit-any let errorOnIndexes: any; - for (errorOnIndexes in [[], [0], [1], [0, 1]]) { + for (errorOnIndexes of [[], [0], [1], [0, 1]]) { const sql = 'SELECT * FROM TestTable'; const prs1 = PartialResultSet.create({ resumeToken: includeResumeToken @@ -4047,6 +4046,200 @@ describe('Spanner with mock server', () => { } }); + it('should clear pending values if the last partial result did not have a resume token and was not a complete row', async () => { + const sql = 'SELECT * FROM TestTable'; + const prs1 = PartialResultSet.create({ + resumeToken: undefined, + metadata: createMultiColumnMetadata(), + values: [ + {stringValue: 'id1.1'}, + {stringValue: 'id1.2'}, + {stringValue: '100'}, + ], + chunkedValue: false, + }); + const prs2 = PartialResultSet.create({ + resumeToken: undefined, + values: [ + {boolValue: true}, + {boolValue: true}, + {numberValue: 0.5}, + {stringValue: 'id2.1'}, + {stringValue: 'id2.2'}, + ], + chunkedValue: false, + }); + const prs3 = PartialResultSet.create({ + resumeToken: undefined, + values: [ + {stringValue: '200'}, + {boolValue: true}, + {boolValue: true}, + {numberValue: 0.5}, + ], + }); + // Let the stream return UNAVAILABLE on index 1 (so the second PartialResultSet). + setupResultsAndErrors(sql, [prs1, prs2, prs3], [1]); + const database = newTestDatabase(); + try { + const [rows] = (await database.run({ + sql, + json: true, + })) as Json[][]; + verifyQueryResult(rows); + } finally { + await database.close(); + } + }); + + it('should not clear pending values if the last partial result had a resume token and was not a complete row', async () => { + for (const errorIndexes of [[1], [2]]) { + const sql = 'SELECT * FROM TestTable'; + const prs1 = PartialResultSet.create({ + resumeToken: Buffer.from('00000000'), + metadata: createMultiColumnMetadata(), + values: [ + {stringValue: 'id1.1'}, + {stringValue: 'id1.2'}, + {stringValue: '100'}, + ], + chunkedValue: false, + }); + const prs2 = PartialResultSet.create({ + resumeToken: undefined, + values: [ + {boolValue: true}, + {boolValue: true}, + {numberValue: 0.5}, + {stringValue: 'id2.1'}, + {stringValue: 'id2.2'}, + ], + chunkedValue: false, + }); + const prs3 = PartialResultSet.create({ + resumeToken: undefined, + values: [ + {stringValue: '200'}, + {boolValue: true}, + {boolValue: true}, + {numberValue: 0.5}, + ], + }); + setupResultsAndErrors(sql, [prs1, prs2, prs3], errorIndexes); + const database = newTestDatabase(); + try { + const [rows] = (await database.run({ + sql, + json: true, + })) as Json[][]; + verifyQueryResult(rows); + } finally { + await database.close(); + } + } + }); + + it('should not clear pending values if the last partial result was chunked and had a resume token', async () => { + for (const errorIndexes of [[2]]) { + const sql = 'SELECT * FROM TestTable'; + const prs1 = PartialResultSet.create({ + resumeToken: Buffer.from('00000000'), + metadata: createMultiColumnMetadata(), + values: [ + {stringValue: 'id1.1'}, + {stringValue: 'id1.2'}, + {stringValue: '100'}, + ], + chunkedValue: true, + }); + const prs2 = PartialResultSet.create({ + resumeToken: undefined, + values: [ + // The previous value was chunked, but it is still perfectly possible that it actually contained + // the entire value. So in this case the actual value was '100'. + {stringValue: ''}, + {boolValue: true}, + {boolValue: true}, + {numberValue: 0.5}, + {stringValue: 'id2.1'}, + {stringValue: 'id2.2'}, + ], + chunkedValue: false, + }); + const prs3 = PartialResultSet.create({ + resumeToken: undefined, + values: [ + {stringValue: '200'}, + {boolValue: true}, + {boolValue: true}, + {numberValue: 0.5}, + ], + }); + setupResultsAndErrors(sql, [prs1, prs2, prs3], errorIndexes); + const database = newTestDatabase(); + try { + const [rows] = (await database.run({ + sql, + json: true, + })) as Json[][]; + verifyQueryResult(rows); + } finally { + await database.close(); + } + } + }); + + function verifyQueryResult(rows: Json[]) { + assert.strictEqual(rows.length, 2); + assert.strictEqual(rows[0].col1, 'id1.1'); + assert.strictEqual(rows[0].col2, 'id1.2'); + assert.strictEqual(rows[0].col3, 100); + assert.strictEqual(rows[0].col4, true); + assert.strictEqual(rows[0].col5, true); + assert.strictEqual(rows[0].col6, 0.5); + + assert.strictEqual(rows[1].col1, 'id2.1'); + assert.strictEqual(rows[1].col2, 'id2.2'); + assert.strictEqual(rows[1].col3, 200); + assert.strictEqual(rows[1].col4, true); + assert.strictEqual(rows[1].col5, true); + assert.strictEqual(rows[1].col6, 0.5); + } + + function createMultiColumnMetadata() { + const fields = [ + protobuf.StructType.Field.create({ + name: 'col1', + type: protobuf.Type.create({code: protobuf.TypeCode.STRING}), + }), + protobuf.StructType.Field.create({ + name: 'col2', + type: protobuf.Type.create({code: protobuf.TypeCode.STRING}), + }), + protobuf.StructType.Field.create({ + name: 'col3', + type: protobuf.Type.create({code: protobuf.TypeCode.INT64}), + }), + protobuf.StructType.Field.create({ + name: 'col4', + type: protobuf.Type.create({code: protobuf.TypeCode.BOOL}), + }), + protobuf.StructType.Field.create({ + name: 'col5', + type: protobuf.Type.create({code: protobuf.TypeCode.BOOL}), + }), + protobuf.StructType.Field.create({ + name: 'col6', + type: protobuf.Type.create({code: protobuf.TypeCode.FLOAT64}), + }), + ]; + return new protobuf.ResultSetMetadata({ + rowType: new protobuf.StructType({ + fields, + }), + }); + } + function createMetadata() { const fields = [ protobuf.StructType.Field.create({