Skip to content

Commit

Permalink
Drain the response stream in the command method (#264)
Browse files Browse the repository at this point in the history
  • Loading branch information
slvrtrn committed May 8, 2024
1 parent c5ba385 commit ddcc741
Show file tree
Hide file tree
Showing 10 changed files with 383 additions and 278 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,9 @@
# 1.0.2 (Common, Node.js)

## Bug fixes

- (Node.js only) The `command` method now drains the response stream properly, as the previous implementation could cause the Keep-Alive socket to close after each request.

# 1.0.1 (Common, Node.js, Web)

## Bug fixes
Expand Down
16 changes: 6 additions & 10 deletions packages/client-common/src/client.ts
Expand Up @@ -9,11 +9,7 @@ import type {
} from '@clickhouse/client-common'
import { type DataFormat, DefaultLogger } from '@clickhouse/client-common'
import type { InputJSON, InputJSONObjectEachRow } from './clickhouse_types'
import type {
CloseStream,
ImplementationDetails,
ValuesEncoder,
} from './config'
import type { ImplementationDetails, ValuesEncoder } from './config'
import { getConnectionParams, prepareConfigWithURL } from './config'
import type { ConnPingResult } from './connection'
import type { BaseResultSet } from './result'
Expand Down Expand Up @@ -119,7 +115,6 @@ export class ClickHouseClient<Stream = unknown> {
private readonly connection: Connection<Stream>
private readonly makeResultSet: MakeResultSet<Stream>
private readonly valuesEncoder: ValuesEncoder<Stream>
private readonly closeStream: CloseStream<Stream>
private readonly sessionId?: string

constructor(
Expand All @@ -142,7 +137,6 @@ export class ClickHouseClient<Stream = unknown> {
)
this.makeResultSet = config.impl.make_result_set
this.valuesEncoder = config.impl.values_encoder
this.closeStream = config.impl.close_stream
}

/**
Expand Down Expand Up @@ -172,9 +166,11 @@ export class ClickHouseClient<Stream = unknown> {
* If you are interested in the response data, consider using {@link ClickHouseClient.exec}
*/
async command(params: CommandParams): Promise<CommandResult> {
const { stream, query_id, summary } = await this.exec(params)
await this.closeStream(stream)
return { query_id, summary }
const query = removeTrailingSemi(params.query.trim())
return await this.connection.command({
query,
...this.withClientQueryParams(params),
})
}

/**
Expand Down
3 changes: 0 additions & 3 deletions packages/client-common/src/config.ts
Expand Up @@ -113,8 +113,6 @@ export interface ValuesEncoder<Stream> {
): string | Stream
}

export type CloseStream<Stream> = (stream: Stream) => Promise<void>

/**
* An implementation might have extra config parameters that we can parse from the connection URL.
* These are supposed to be processed after we finish parsing the base configuration.
Expand All @@ -141,7 +139,6 @@ export interface ImplementationDetails<Stream> {
make_connection: MakeConnection<Stream>
make_result_set: MakeResultSet<Stream>
values_encoder: ValuesEncoder<Stream>
close_stream: CloseStream<Stream>
handle_specific_url_params?: HandleImplSpecificURLParams
}
}
Expand Down
6 changes: 4 additions & 2 deletions packages/client-common/src/connection.ts
Expand Up @@ -47,19 +47,21 @@ export interface ConnQueryResult<Stream> extends ConnBaseResult {
export type ConnInsertResult = ConnBaseResult & WithClickHouseSummary
export type ConnExecResult<Stream> = ConnQueryResult<Stream> &
WithClickHouseSummary
export type ConnCommandResult = ConnBaseResult & WithClickHouseSummary

export type ConnPingResult =
| {
success: true
}
| { success: false; error: Error }

export type ConnOperation = 'Ping' | 'Query' | 'Insert' | 'Exec'
export type ConnOperation = 'Ping' | 'Query' | 'Insert' | 'Exec' | 'Command'

export interface Connection<Stream> {
ping(): Promise<ConnPingResult>
close(): Promise<void>
query(params: ConnBaseQueryParams): Promise<ConnQueryResult<Stream>>
exec(params: ConnBaseQueryParams): Promise<ConnExecResult<Stream>>
insert(params: ConnInsertParams<Stream>): Promise<ConnInsertResult>
exec(params: ConnBaseQueryParams): Promise<ConnExecResult<Stream>>
command(params: ConnBaseQueryParams): Promise<ConnCommandResult>
}
1 change: 1 addition & 0 deletions packages/client-common/src/index.ts
Expand Up @@ -91,6 +91,7 @@ export type {
ConnBaseResult,
ConnInsertParams,
ConnPingResult,
ConnCommandResult,
ConnOperation,
} from './connection'
export type { QueryParamsWithFormat } from './client'
Expand Down
61 changes: 61 additions & 0 deletions packages/client-node/__tests__/unit/node_connection.test.ts
Expand Up @@ -171,6 +171,67 @@ describe('[Node.js] Connection', () => {
expect(url.search).toContain(`?query_id=${query_id}`)
})

it('should generate random query_id for every command request', async () => {
const adapter = buildHttpConnection({
compression: {
decompress_response: false,
compress_request: false,
},
})

const httpRequestStub = spyOn(Http, 'request')

const request1 = stubClientRequest()
httpRequestStub.and.returnValue(request1)

const cmdPromise = adapter.command({
query: 'SELECT * FROM system.numbers LIMIT 5',
})
emitResponseBody(request1, 'Ok.')
const { query_id } = await cmdPromise

const request2 = stubClientRequest()
httpRequestStub.and.returnValue(request2)

const cmdPromise2 = adapter.command({
query: 'SELECT * FROM system.numbers LIMIT 5',
})
emitResponseBody(request2, 'Ok.')
const { query_id: query_id2 } = await cmdPromise2

expect(query_id).not.toEqual(query_id2)
const [url1] = httpRequestStub.calls.all()[0].args
expect(url1.search).toContain(`?query_id=${query_id}`)
const [url2] = httpRequestStub.calls.all()[1].args
expect(url2.search).toContain(`?query_id=${query_id2}`)
})

it('should use provided query_id for command', async () => {
const adapter = buildHttpConnection({
compression: {
decompress_response: false,
compress_request: false,
},
})

const httpRequestStub = spyOn(Http, 'request')
const request = stubClientRequest()
httpRequestStub.and.returnValue(request)

const query_id = guid()
const cmdPromise = adapter.command({
query: 'SELECT * FROM system.numbers LIMIT 5',
query_id,
})
emitResponseBody(request, 'Ok.')
const { query_id: result_query_id } = await cmdPromise

expect(httpRequestStub).toHaveBeenCalledTimes(1)
const [url] = httpRequestStub.calls.mostRecent().args
expect(url.search).toContain(`?query_id=${query_id}`)
expect(query_id).toEqual(result_query_id)
})

it('should generate random query_id for every insert request', async () => {
const adapter = buildHttpConnection({
compression: {
Expand Down
3 changes: 0 additions & 3 deletions packages/client-node/src/config.ts
Expand Up @@ -103,7 +103,4 @@ export const NodeConfigImpl: Required<
format: DataFormat,
query_id: string,
) => new ResultSet(stream, format, query_id)) as any,
close_stream: async (stream) => {
stream.destroy()
},
}

0 comments on commit ddcc741

Please sign in to comment.