Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: gajus/slonik
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: v37.0.0
Choose a base ref
...
head repository: gajus/slonik
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: v37.0.1
Choose a head ref
  • 9 commits
  • 14 files changed
  • 1 contributor

Commits on Sep 29, 2023

  1. style: apply eslint fixes

    gajus committed Sep 29, 2023
    Copy the full SHA
    33ce6e5 View commit details
  2. chore: udpate semantic-release

    gajus committed Sep 29, 2023
    Copy the full SHA
    ac8a1ee View commit details
  3. chore: update typescript

    gajus committed Sep 29, 2023
    Copy the full SHA
    c5ee270 View commit details
  4. Copy the full SHA
    a6bbdde View commit details
  5. Copy the full SHA
    5baf5d6 View commit details
  6. test: streams include notices

    gajus committed Sep 29, 2023
    Copy the full SHA
    db02359 View commit details
  7. Copy the full SHA
    d00f107 View commit details
  8. chore: update ava

    gajus committed Sep 29, 2023
    Copy the full SHA
    a6de154 View commit details
  9. fix: correct typescript types

    gajus committed Sep 29, 2023
    Copy the full SHA
    6b3ee05 View commit details
13 changes: 10 additions & 3 deletions knip.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
{
"$schema": "https://unpkg.com/knip@2/schema.json",
"entry": ["src/index.ts"],
"ignoreDependencies": ["husky", "ts-node"],
"project": ["src/**/*.ts"]
"entry": [
"src/index.ts"
],
"ignoreDependencies": [
"husky",
"ts-node"
],
"project": [
"src/**/*.ts"
]
}
5,515 changes: 3,511 additions & 2,004 deletions package-lock.json

Large diffs are not rendered by default.

18 changes: 9 additions & 9 deletions package.json
Original file line number Diff line number Diff line change
@@ -21,15 +21,15 @@
},
"description": "A Node.js PostgreSQL client with strict types, detailed logging and assertions.",
"devDependencies": {
"@semantic-release/commit-analyzer": "^9.0.2",
"@semantic-release/github": "^8.1.0",
"@semantic-release/npm": "^10.0.3",
"@semantic-release/commit-analyzer": "^11.0.0",
"@semantic-release/github": "^9.2.1",
"@semantic-release/npm": "^11.0.0",
"@types/sinon": "^10.0.15",
"ava": "^5.3.0",
"ava": "^5.3.1",
"cspell": "^7.3.6",
"delay": "^5.0.0",
"eslint": "^8.41.0",
"eslint-config-canonical": "^41.0.5",
"eslint": "^8.50.0",
"eslint-config-canonical": "^41.2.3",
"expect-type": "^0.15.0",
"gitdown": "^3.1.5",
"husky": "^8.0.3",
@@ -38,10 +38,10 @@
"pg-native": "^3.0.1",
"postgres": "^3.3.5",
"postgres-bridge": "^1.14.0",
"semantic-release": "^21.0.2",
"semantic-release": "^22.0.5",
"sinon": "^15.1.0",
"ts-node": "^10.9.1",
"typescript": "^5.0.4",
"typescript": "^5.2.2",
"zod": "^3.21.4"
},
"engines": {
@@ -89,11 +89,11 @@
"scripts": {
"build": "rm -fr ./dist && tsc --project ./tsconfig.build.json",
"create-readme": "gitdown ./.README/README.md --output-file ./README.md",
"lint": "npm run lint:cspell && npm run lint:eslint && npm run lint:knip && npm run lint:tsc",
"lint:cspell": "cspell . --no-progress --gitignore",
"lint:eslint": "eslint --cache ./src ./test",
"lint:knip": "knip",
"lint:tsc": "tsc --noEmit",
"lint": "npm run lint:cspell && npm run lint:eslint && npm run lint:knip && npm run lint:tsc",
"test": "nyc ava --verbose --serial"
},
"types": "./dist/index.d.ts",
5 changes: 3 additions & 2 deletions src/connectionMethods/query.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { type ExecutionRoutine } from '../routines/executeQuery';
import { executeQuery } from '../routines/executeQuery';
import { executeQuery, type ExecutionRoutine } from '../routines/executeQuery';
import {
type Field,
type InternalQueryMethod,
@@ -37,6 +36,7 @@ const executionRoutine: ExecutionRoutine = async (
notices: result.notices ?? [],
rowCount: result.rowCount || 0,
rows: result.rows || [],
type: 'QueryResult',
};
};

@@ -54,5 +54,6 @@ export const query: InternalQueryMethod = async (
slonikSql,
inheritedQueryId,
executionRoutine,
false,
);
};
25 changes: 13 additions & 12 deletions src/connectionMethods/stream.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { type ExecutionRoutine } from '../routines/executeQuery';
import { executeQuery } from '../routines/executeQuery';
import { executeQuery, type ExecutionRoutine } from '../routines/executeQuery';
import {
type ClientConfiguration,
type Field,
@@ -80,14 +79,6 @@ const createExecutionRoutine = <T>(
streamOptions?: QueryStreamConfig,
): ExecutionRoutine => {
return async (connection, sql, values, executionContext, actualQuery) => {
const streamEndResultRow = {
command: 'SELECT',
fields: [],
notices: [],
rowCount: 0,
rows: [],
} as const;

const queryStream: Readable = connection.query(
new QueryStream(sql, values as unknown[], streamOptions),
);
@@ -103,7 +94,10 @@ const createExecutionRoutine = <T>(

await pipeline(queryStream, transformStream);

return streamEndResultRow;
return {
notices: [],
type: 'StreamResult',
};
};
};

@@ -116,12 +110,19 @@ export const stream: InternalStreamFunction = async (
uid,
streamOptions,
) => {
return await executeQuery(
const result = await executeQuery(
connectionLogger,
connection,
clientConfiguration,
slonikSql,
undefined,
createExecutionRoutine(clientConfiguration, onStream, streamOptions),
true,
);

if (result.type === 'QueryResult') {
throw new Error('Query result cannot be returned in a streaming context.');
}

return result;
};
1 change: 1 addition & 0 deletions src/factories/createMockQueryResult.ts
Original file line number Diff line number Diff line change
@@ -9,5 +9,6 @@ export const createMockQueryResult = (
notices: [],
rowCount: rows.length,
rows,
type: 'QueryResult',
};
};
67 changes: 36 additions & 31 deletions src/routines/executeQuery.ts
Original file line number Diff line number Diff line change
@@ -25,14 +25,15 @@ import {
type QueryResult,
type QueryResultRow,
type QuerySqlToken,
type StreamResult,
} from '../types';
import { createQueryId } from '../utilities/createQueryId';
import { defer } from '../utilities/defer';
import { getStackTrace } from 'get-stack-trace';
import { type PoolClient as PgPoolClient } from 'pg';
import { serializeError } from 'serialize-error';

type GenericQueryResult = QueryResult<QueryResultRow>;
type GenericQueryResult = StreamResult | QueryResult<QueryResultRow>;

export type ExecutionRoutine = (
connection: PgPoolClient,
@@ -122,7 +123,10 @@ export const executeQuery = async (
query: QuerySqlToken,
inheritedQueryId: QueryId | undefined,
executionRoutine: ExecutionRoutine,
): Promise<QueryResult<Record<string, PrimitiveValueExpression>>> => {
stream: boolean,
): Promise<
StreamResult | QueryResult<Record<string, PrimitiveValueExpression>>
> => {
const poolClientState = getPoolClientState(connection);

if (poolClientState.terminated) {
@@ -189,19 +193,21 @@ export const executeQuery = async (

let result: GenericQueryResult | null;

for (const interceptor of clientConfiguration.interceptors) {
if (interceptor.beforeQueryExecution) {
result = await interceptor.beforeQueryExecution(
executionContext,
actualQuery,
);

if (result) {
log.info(
'beforeQueryExecution interceptor produced a result; short-circuiting query execution using beforeQueryExecution result',
if (!stream) {
for (const interceptor of clientConfiguration.interceptors) {
if (interceptor.beforeQueryExecution) {
result = await interceptor.beforeQueryExecution(
executionContext,
actualQuery,
);

return result;
if (result) {
log.info(
'beforeQueryExecution interceptor produced a result; short-circuiting query execution using beforeQueryExecution result',
);

return result;
}
}
}
}
@@ -346,18 +352,17 @@ export const executeQuery = async (
// @ts-expect-error -- We want to keep notices as readonly for consumer, but write to it here.
result.notices = notices;

for (const interceptor of clientConfiguration.interceptors) {
if (interceptor.afterQueryExecution) {
await interceptor.afterQueryExecution(
executionContext,
actualQuery,
result,
);
if (result.type === 'QueryResult') {
for (const interceptor of clientConfiguration.interceptors) {
if (interceptor.afterQueryExecution) {
await interceptor.afterQueryExecution(
executionContext,
actualQuery,
result,
);
}
}
}

// Stream does not have `rows` in the result object and all rows are already transformed.
if (result.rows) {
const interceptors: Interceptor[] =
clientConfiguration.interceptors.slice();

@@ -378,15 +383,15 @@ export const executeQuery = async (
};
}
}
}

for (const interceptor of clientConfiguration.interceptors) {
if (interceptor.beforeQueryResult) {
await interceptor.beforeQueryResult(
executionContext,
actualQuery,
result,
);
for (const interceptor of clientConfiguration.interceptors) {
if (interceptor.beforeQueryResult) {
await interceptor.beforeQueryResult(
executionContext,
actualQuery,
result,
);
}
}
}

10 changes: 8 additions & 2 deletions src/types.ts
Original file line number Diff line number Diff line change
@@ -84,6 +84,7 @@ export type QueryResult<T> = {
readonly notices: readonly Notice[];
readonly rowCount: number;
readonly rows: readonly T[];
readonly type: 'QueryResult';
};

export type ClientConfiguration = {
@@ -145,11 +146,16 @@ export type ClientConfigurationInput = Partial<ClientConfiguration>;

export type QueryStreamConfig = ReadableOptions & { batchSize?: number };

export type StreamResult = {
notices: readonly Notice[];
type: 'StreamResult';
};

type StreamFunction = <T extends ZodTypeAny>(
sql: QuerySqlToken<T>,
streamHandler: StreamHandler<z.infer<T>>,
config?: QueryStreamConfig,
) => Promise<Record<string, unknown> | null>;
) => Promise<StreamResult>;

export type CommonQueryMethods = {
readonly any: QueryAnyFunction;
@@ -446,7 +452,7 @@ export type InternalStreamFunction = <T>(
streamHandler: StreamHandler<T>,
uid?: QueryId,
config?: QueryStreamConfig,
) => Promise<Record<string, unknown>>;
) => Promise<StreamResult>;

export type InternalTransactionFunction = <T>(
log: Logger,
7 changes: 7 additions & 0 deletions test/helpers/createIntegrationTests.ts
Original file line number Diff line number Diff line change
@@ -90,6 +90,7 @@ export const createTestRunner = (
tags text[],
birth_date date,
payload bytea,
molecules int8,
updated_no_tz_at timestamp without time zone NOT NULL DEFAULT now(),
updated_at timestamp with time zone NOT NULL DEFAULT now()
)
@@ -437,6 +438,7 @@ export const createIntegrationTests = (
names: [Buffer.from('foo')],
},
],
type: 'QueryResult',
});

await pool.end();
@@ -475,6 +477,7 @@ export const createIntegrationTests = (
name: 'foo',
},
],
type: 'QueryResult',
});

await pool.end();
@@ -522,6 +525,7 @@ export const createIntegrationTests = (
name: 'bar',
},
],
type: 'QueryResult',
});

await pool.end();
@@ -567,6 +571,7 @@ export const createIntegrationTests = (
name: 'foo',
},
],
type: 'QueryResult',
});

await pool.end();
@@ -1136,6 +1141,7 @@ export const createIntegrationTests = (
name: 'foo',
},
],
type: 'QueryResult',
});

await pool.end();
@@ -1193,6 +1199,7 @@ export const createIntegrationTests = (
name: 1,
},
],
type: 'QueryResult',
});

await pool.end();
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@ test('short-circuits the query execution', async (t) => {
foo: 2,
},
],
type: 'QueryResult',
};
},
},
@@ -45,6 +46,7 @@ test('short-circuits the query execution', async (t) => {
foo: 2,
},
],
type: 'QueryResult',
});
});

@@ -69,6 +71,7 @@ test('executes query if "beforeQuery" does not return results', async (t) => {
foo: 1,
},
],
type: 'QueryResult',
});

const result = await pool.query(sql.unsafe`SELECT 1`);
@@ -83,5 +86,6 @@ test('executes query if "beforeQuery" does not return results', async (t) => {
foo: 1,
},
],
type: 'QueryResult',
});
});
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@ test('overrides result row', async (t) => {
foo: 1,
},
],
type: 'QueryResult',
});

const result = await pool.query(sql.unsafe`SELECT 1`);
@@ -40,5 +41,6 @@ test('overrides result row', async (t) => {
foo: 2,
},
],
type: 'QueryResult',
});
});
4 changes: 4 additions & 0 deletions test/slonik/connectionMethods/query/query.ts
Original file line number Diff line number Diff line change
@@ -62,6 +62,7 @@ test('executes the query and returns the result', async (t) => {
foo: 1,
},
],
type: 'QueryResult',
});

const result = await pool.query(sql.unsafe`SELECT 1`);
@@ -76,6 +77,7 @@ test('executes the query and returns the result', async (t) => {
foo: 1,
},
],
type: 'QueryResult',
});
});

@@ -114,6 +116,7 @@ test('adds notices observed during the query execution to the query result objec
foo: 1,
},
],
type: 'QueryResult',
});

await delay(100);
@@ -130,6 +133,7 @@ test('adds notices observed during the query execution to the query result objec
foo: 1,
},
],
type: 'QueryResult',
});
});

127 changes: 126 additions & 1 deletion test/slonik/integration/stream.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import { createPool, sql, StatementTimeoutError } from '../../../src';
import {
createBigintTypeParser,
createPool,
sql,
StatementTimeoutError,
} from '../../../src';
import { createTestRunner } from '../../helpers/createIntegrationTests';
import { Pool as PgPool } from 'pg';
import * as sinon from 'sinon';
@@ -220,6 +225,126 @@ test('streams rows using AsyncIterator', async (t) => {
await pool.end();
});

test('reading stream using custom type parsers', async (t) => {
const pool = await createPool(t.context.dsn, {
typeParsers: [createBigintTypeParser()],
});

await pool.query(sql.unsafe`
INSERT INTO person (name, molecules)
VALUES
('foo', ${BigInt('6022000000000000000')}),
('bar', ${BigInt('6022000000000000001')}),
('baz', ${BigInt('6022000000000000002')})
`);

const persons: bigint[] = [];

await pool.stream(
sql.type(
z.object({
molecules: z.bigint(),
}),
)`
SELECT molecules
FROM person
`,
(stream) => {
stream.on('data', (datum) => {
persons.push(datum.data.molecules);
});
},
);

t.deepEqual(persons, [
BigInt('6022000000000000000'),
BigInt('6022000000000000001'),
BigInt('6022000000000000002'),
]);

await pool.end();
});

test('reading stream using row transform interceptors', async (t) => {
const pool = await createPool(t.context.dsn, {
interceptors: [
{
transformRow: (context, query, row) => {
return {
...row,
// @ts-expect-error - we know it exists
name: row.name.toUpperCase(),
};
},
},
],
});

await pool.query(sql.unsafe`
INSERT INTO person (name)
VALUES ('foo'), ('bar'), ('baz')
`);

const names: string[] = [];

await pool.stream(
sql.type(
z.object({
name: z.string(),
}),
)`
SELECT name
FROM person
`,
(stream) => {
stream.on('data', (datum) => {
names.push(datum.data.name);
});
},
);

t.deepEqual(names, ['FOO', 'BAR', 'BAZ']);

await pool.end();
});

test('streams include notices', async (t) => {
const pool = await createPool(t.context.dsn);

await pool.query(sql.unsafe`
CREATE OR REPLACE FUNCTION test_notice
(
v_test INTEGER
) RETURNS BOOLEAN
LANGUAGE plpgsql
AS
$$
BEGIN
RAISE NOTICE '1. TEST NOTICE [%]',v_test;
RAISE NOTICE '2. TEST NOTICE [%]',v_test;
RAISE NOTICE '3. TEST NOTICE [%]',v_test;
RETURN TRUE;
END;
$$;
`);

const result = await pool.stream(
sql.unsafe`
SELECT *
FROM test_notice(${10})
`,
(stream) => {
stream.on('data', () => {});
},
);

t.true(result.notices.length === 3);

await pool.end();
});

test('streams rows with different batchSize', async (t) => {
const pool = await createPool(t.context.dsn);

5 changes: 5 additions & 0 deletions test/slonik/routines/executeQuery.ts
Original file line number Diff line number Diff line change
@@ -50,6 +50,7 @@ test('throws a descriptive error if query is empty', async (t) => {
} as unknown as QuerySqlToken,
'foo',
t.context.executionRoutine,
false,
);
});

@@ -69,6 +70,7 @@ test('throws a descriptive error if the entire query is a value binding', async
} as unknown as QuerySqlToken,
'foo',
t.context.executionRoutine,
false,
);
});

@@ -108,6 +110,7 @@ test('retries an implicit query that failed due to a transaction error', async (
} as unknown as QuerySqlToken,
'foo',
executionRoutineStub,
false,
);

t.is(executionRoutineStub.callCount, 2);
@@ -149,6 +152,7 @@ test('returns the thrown transaction error if the retry limit is reached', async
} as unknown as QuerySqlToken,
'foo',
executionRoutineStub,
false,
),
);

@@ -189,6 +193,7 @@ test('transaction errors are not handled if the function was called by a transac
} as unknown as QuerySqlToken,
'foo',
executionRoutineStub,
false,
),
);