Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: gajus/slonik
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: v31.2.2
Choose a base ref
...
head repository: gajus/slonik
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: v31.2.3
Choose a head ref
  • 1 commit
  • 3 files changed
  • 1 contributor

Commits on Sep 16, 2022

  1. fix: close underlying QueryStream on stream.destroy() (#360)

    * Close underlying QueryStream on stream.destroy()
    
    * Update test name
    
    Co-authored-by: alxndrsn <alxndrsn>
    alxndrsn authored Sep 16, 2022

    Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
    Copy the full SHA
    d322cf9 View commit details
Showing with 50 additions and 9 deletions.
  1. +2 −7 src/QueryStream.ts
  2. +12 −2 src/connectionMethods/stream.ts
  3. +36 −0 test/slonik/integration/pg.ts
9 changes: 2 additions & 7 deletions src/QueryStream.ts
Original file line number Diff line number Diff line change
@@ -63,15 +63,10 @@ export class QueryStream extends Readable {
this.cursor.submit(connection);
}

public close (callback: Function) {
public _destroy (error: Error, callback: Function) {
this._closed = true;

// eslint-disable-next-line unicorn/consistent-function-scoping
const close = () => {
this.emit('close');
};

this.cursor.close(callback || close);
this.cursor.close(callback);
}

public _read (size: number) {
14 changes: 12 additions & 2 deletions src/connectionMethods/stream.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import type Stream from 'stream';
import {
type Readable,
} from 'stream';
import through from 'through2';
import {
QueryStream,
@@ -21,7 +23,7 @@ export const stream: InternalStreamFunction = async (connectionLogger, connectio
async (finalConnection, finalSql, finalValues, executionContext, actualQuery) => {
const query = new QueryStream(finalSql, finalValues, options);

const queryStream: Stream = finalConnection.query(query);
const queryStream: Readable = finalConnection.query(query);

const rowTransformers: Array<NonNullable<Interceptor['transformRow']>> = [];

@@ -66,6 +68,10 @@ export const stream: InternalStreamFunction = async (connectionLogger, connectio

// Invoked if stream is destroyed using transformedStream.destroy().
transformedStream.on('close', () => {
if (!queryStream.destroyed) {
queryStream.destroy();
}

resolve({
command: 'SELECT',
fields: [],
@@ -75,6 +81,10 @@ export const stream: InternalStreamFunction = async (connectionLogger, connectio
});
});

transformedStream.on('error', (error: Error) => {
queryStream.destroy(error);
});

streamHandler(transformedStream);
});
},
36 changes: 36 additions & 0 deletions test/slonik/integration/pg.ts
Original file line number Diff line number Diff line change
@@ -337,3 +337,39 @@ test('copies from binary stream', async (t) => {

await pool.end();
});

test('frees connection after destroying a stream', async (t) => {
const pool = await createPool(t.context.dsn);

await pool.stream(sql`
SELECT * FROM GENERATE_SERIES(1, 100)
`, (stream) => {
stream.destroy();
});

t.deepEqual(await pool.anyFirst(sql`
SELECT TRUE
`), [
true,
]);

await pool.end();
});

test('frees connection after destroying a stream with an error', async (t) => {
const pool = await createPool(t.context.dsn);

await pool.stream(sql`
SELECT * FROM GENERATE_SERIES(1, 100)
`, (stream) => {
stream.destroy(new Error('Foo'));
});

t.deepEqual(await pool.anyFirst(sql`
SELECT TRUE
`), [
true,
]);

await pool.end();
});