
Receiving [ERR_STREAM_PREMATURE_CLOSE] in the logs when intentionally closing the ResultSet stream before end #263

Closed
spinlud opened this issue May 6, 2024 · 6 comments
Labels
bug (Something isn't working), tech debt

Comments

@spinlud commented May 6, 2024

Describe the bug

I have a stream generated from a ResultSet of a query. When consuming the stream, I need to end it prematurely if a certain condition is met:

import {pipeline} from 'stream/promises';
import {createWriteStream} from 'fs';
import {createClient} from '@clickhouse/client';

async function* myAsyncGenerator() {
	const clickHouseClient = createClient({
		// ...
	});

	const resultSet = await clickHouseClient.query({		
		query: `select * from my_table limit 10`,
		format: 'JSONCompactStringsEachRowWithNamesAndTypes',
	});

	const stream = resultSet.stream();

	let i = 0;

	for await (const rows of stream) {
		for (const row of rows) {
			i++;
			let jsonRow = await row.json();

			// If a condition is true, I want to stop consuming the stream prematurely
			if (i === 3) {
				return stream.destroy();
			}

			yield jsonRow.join(',') + '\n\n';
		}
	}
}

(async () => {
	const asyncGen = myAsyncGenerator();
	const outputStream = createWriteStream('output.txt');
	await pipeline(asyncGen, outputStream);
})();

This code terminates successfully (exit code 0), but it logs the following [ERR_STREAM_PREMATURE_CLOSE] error to the console:

(screenshot of the error; the full text is under "Error log" below)

Expected behaviour

I could be wrong, but this [ERR_STREAM_PREMATURE_CLOSE] error doesn't seem to originate from the stream object returned by the call to the database. I suspect instead that it originates from another stream that is writing into this one (e.g. a stream from the socket that is writing the data?).

In this case I am voluntarily closing the stream before consuming all the data, so I don't want to see this [ERR_STREAM_PREMATURE_CLOSE] logged to the console. It also doesn't seem to be an actual error, just the log of one, because the program terminates with success (exit code 0).

Is there any way to prevent this [ERR_STREAM_PREMATURE_CLOSE] error log from appearing in the console?
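
For reference, the same error can be reproduced with plain Node.js streams, with no ClickHouse involved (a minimal sketch of the mechanism I suspect is at play):

import { PassThrough, pipeline } from 'stream'

const source = new PassThrough()
const sink = new PassThrough()

// pipeline watches every stream in the chain; a stream that closes
// before emitting 'end'/'finish' is reported as a premature close
pipeline(source, sink, (err) => {
  console.error(err) // Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
})

source.write('some data')
sink.destroy() // destroying a stream mid-flight triggers the error above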

Error log

Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
    at new NodeError (node:internal/errors:405:5)
    at Transform.onclose (node:internal/streams/end-of-stream:159:30)
    at Transform.emit (node:events:529:35)
    at Transform.emit (node:domain:489:12)
    at emitCloseNT (node:internal/streams/destroy:132:10)
    at processTicksAndRejections (node:internal/process/task_queues:81:21) {
  code: 'ERR_STREAM_PREMATURE_CLOSE'
}

Process finished with exit code 0

Configuration

Environment

  • Client version: 0.2.7
  • Language version: Node v18.18.0
  • OS: Darwin arm64

ClickHouse server

  • ClickHouse Server version: 24.2.2.16100
@spinlud added the bug (Something isn't working) label on May 6, 2024
@slvrtrn (Contributor) commented May 6, 2024

If the desired logic cannot be implemented in the ClickHouse query itself, you could just use an AbortController.

import { pipeline } from 'stream/promises'
import { createWriteStream } from 'fs'
import { createClient } from '@clickhouse/client'

async function* myAsyncGenerator() {
  const clickHouseClient = createClient({
    // ...
  })

  const abortController = new AbortController()
  const resultSet = await clickHouseClient.query({
    query: `select * from system.numbers limit 10`,
    format: 'JSONCompactStringsEachRowWithNamesAndTypes',
    abort_signal: abortController.signal,
  })

  let i = 0
  for await (const rows of resultSet.stream()) {
    for (const row of rows) {
      i++

      // If a condition is true, I want to stop consuming the stream prematurely
      if (i === 5) {
        console.log('aborting')
        abortController.abort()
        break
      }

      const result = row.json<string[]>().join(',') + '\n\n'
      console.log('Yielding row', result)
      yield result
    }
  }
}
;(async () => {
  const asyncGen = myAsyncGenerator()
  const outputStream = createWriteStream('output.txt')
  await pipeline(asyncGen, outputStream)
})()

Console output:

➜  examples git:(main) ✗ ts-node node/test.ts
Yielding row number


Yielding row UInt64


Yielding row 0


Yielding row 1


aborting

The output.txt file contents:

(screenshot of the output.txt contents)

NB: You might want to add the cancel_http_readonly_queries_on_client_close setting (https://clickhouse.com/docs/en/operations/settings/settings#cancel-http-readonly-queries-on-client-close). Also, see the abort_request example.
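
For example (a sketch: the setting is passed via the clickhouse_settings query parameter, which the client forwards to the server with the query):

const resultSet = await clickHouseClient.query({
  query: `select * from system.numbers limit 10`,
  format: 'JSONCompactStringsEachRowWithNamesAndTypes',
  abort_signal: abortController.signal,
  clickhouse_settings: {
    // ask the server to cancel a read-only HTTP query
    // when the client closes the connection
    cancel_http_readonly_queries_on_client_close: 1,
  },
})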

@spinlud (Author) commented May 8, 2024

Hi, thanks for the help!

I have tried your solution using the AbortController, but got this error instead:

(screenshot of the error)

Also, I have noticed that you are breaking only the inner loop when aborting, but not the outer one. Is there any specific reason for not breaking the outer loop as well?

@slvrtrn (Contributor) commented May 8, 2024

You are right. I tried adjusting the asyncGenerator approach again, which was not trivial. Maybe something like this will do the trick? Please note the backpressure comment: with such a manual approach, you will likely need to handle backpressure yourself (a sketch of one way to do that follows the output below).

import { createWriteStream } from 'fs'
import { createClient, ClickHouseLogLevel } from '@clickhouse/client'

(async () => {
  process.on('unhandledRejection', (err) => {
    console.error('unhandledRejection:', err)
  })
  process.on('uncaughtException', (err) => {
    console.error('uncaughtException:', err)
  })

  const client = createClient({
    log: {
      // level: ClickHouseLogLevel.TRACE,
    },
  })

  const abortController = new AbortController()
  const resultSet = await client.query({
    query: `select * from system.numbers limit 10`,
    format: 'JSONCompactStringsEachRowWithNamesAndTypes',
    abort_signal: abortController.signal,
  })

  let i = 0
  const stream = resultSet.stream<string[]>()
  const outputStream = createWriteStream('output.txt')

  await new Promise((resolve, reject) => {
    stream
      .on('data', (rows) => {
        for (const row of rows) {
          if (i++ === 5) {
            console.log('Reached the condition, ending stream...')
            abortController.abort()
            return
          }
          const result = row.json().join(',') + '\n\n'
          console.log('Yielding row', result)
          // important: need to add backpressure handling here
          outputStream.write(result)
        }
      })
      .on('error', (err) => {
        console.error('Error in stream', err)
        reject(err)
      })
      .on('end', () => {
        console.log('End of stream, resolve...')
        resolve(0)
      })
  })

  console.log('Closing client...')
  await client.close()
})()

Prints:

Yielding row number


Yielding row UInt64


Yielding row 0


Yielding row 1


Yielding row 2


Reached the condition, ending stream...
[2024-05-08T14:37:53.701Z][TRACE][@clickhouse/client][Connection] Socket 0ae5c20c-868e-471c-8b2f-43f7a2ffb68e was released
End of stream, resolve...
Closing client...
[2024-05-08T14:37:53.701Z][TRACE][@clickhouse/client][Connection] Socket 0ae5c20c-868e-471c-8b2f-43f7a2ffb68e was closed or ended, 'free' listener removed

This output looks right to me: the socket is properly released on the abort event and is back in the keep-alive pool, and it is only destroyed when we close the client.
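
As for the backpressure comment in the examples: a minimal sketch of the usual flowing-mode pattern (plain Node.js streams, not part of the client's API) is to check the boolean returned by write() and pause the source until the sink emits 'drain':

import { Readable, Writable } from 'stream'

// Sketch: write `chunk` to `sink`, pausing `source` while the sink's
// internal buffer is full and resuming it once the sink drains.
function writeRespectingBackpressure(
  source: Readable,
  sink: Writable,
  chunk: string,
): void {
  if (!sink.write(chunk)) {
    source.pause()
    sink.once('drain', () => source.resume())
  }
}

In the examples above, this would replace the bare outputStream.write(result) call, with the ResultSet stream as the source.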

@slvrtrn (Contributor) commented May 8, 2024

And with just one socket and a second consecutive request, everything seems OK, too.

import { createWriteStream } from 'fs'
import { createClient } from '@clickhouse/client'
import { ClickHouseLogLevel } from '@clickhouse/client-common'

;(async () => {
  process.on('unhandledRejection', (err) => {
    console.error('unhandledRejection:', err)
  })
  process.on('uncaughtException', (err) => {
    console.error('uncaughtException:', err)
  })

  const client = createClient({
    max_open_connections: 1,
    log: {
      level: ClickHouseLogLevel.TRACE,
    },
  })

  const abortController = new AbortController()
  const resultSet = await client.query({
    query: `select * from system.numbers limit 10`,
    format: 'JSONCompactStringsEachRowWithNamesAndTypes',
    abort_signal: abortController.signal,
  })

  let i = 0
  const stream = resultSet.stream<string[]>()
  const outputStream = createWriteStream('output.txt')

  await new Promise((resolve, reject) => {
    stream
      .on('data', (rows) => {
        for (const row of rows) {
          if (i++ === 5) {
            console.log('Reached the condition, ending stream...')
            abortController.abort()
            return
          }
          const result = row.json().join(',') + '\n\n'
          console.log('Yielding row', result)
          // important: need to add backpressure handling here
          outputStream.write(result)
        }
      })
      .on('error', (err) => {
        console.error('Error in stream', err)
        reject(err)
      })
      .on('end', () => {
        console.log('End of stream, resolve...')
        resolve(0)
      })
  })

  const rs = await client.query({
    query: 'SELECT 1 AS number',
    format: 'JSONEachRow',
  })
  console.log(
    'Verifying that we can query using the same socket one more time...',
    await rs.json(),
  )

  console.log('Closing client...')
  await client.close()
})()

Console output:

Yielding row number


Yielding row UInt64


Yielding row 0


Yielding row 1


Yielding row 2


Reached the condition, ending stream...
[2024-05-08T14:47:24.767Z][TRACE][@clickhouse/client][Connection] Socket 1bbbdc08-9d1d-466e-8248-a2d2c62cffc3 was released
End of stream, resolve...
[2024-05-08T14:47:24.767Z][TRACE][@clickhouse/client][Connection] Reusing socket 1bbbdc08-9d1d-466e-8248-a2d2c62cffc3
...
[2024-05-08T14:47:24.774Z][TRACE][@clickhouse/client][Connection] Socket 1bbbdc08-9d1d-466e-8248-a2d2c62cffc3 was released
Verifying that we can query using the same socket one more time... [ { number: 1 } ]
Closing client...
[2024-05-08T14:47:24.774Z][TRACE][@clickhouse/client][Connection] Socket 1bbbdc08-9d1d-466e-8248-a2d2c62cffc3 was closed or ended, 'free' listener removed

@slvrtrn (Contributor) commented May 8, 2024

I think I understand what is happening now.

Due to the query being in the async generator function body, an unfinished request goes out of scope, and so does the AbortController that is used internally to cancel the request on errors in the Node.js connection source.

That internal AbortController is used to prevent the underlying sockets from being stuck while dialing an unreachable host, which can happen even if the request was timed out (and that was the only sensible solution I could find to that issue).

A fun fact about the AbortController is that it fires the abort signal when it goes out of scope. This also explains why my stream example does not produce any errors: there is no extra function there (though I believe it is still incorrect), so the query (and the request) does not go out of scope.

Then, the error is printed here.

I will check if I can make this less annoying and more transparent to the user, so that just the ResultSet.close method can be called and that's it: the response stream is properly discarded, the socket is kept, and no unnecessary error messages appear in the console. That last part of the code is very outdated and is basically tech debt.

@slvrtrn (Contributor) commented May 20, 2024

Should be fixed in 1.0.2.

import { pipeline } from 'stream/promises'
import { createWriteStream } from 'fs'
import { createClient } from '@clickhouse/client'

async function* myAsyncGenerator() {
  const clickHouseClient = createClient({
    // ...
  })

  const resultSet = await clickHouseClient.query({
    query: `SELECT * FROM system.numbers LIMIT 10`,
    format: 'JSONCompactStringsEachRowWithNamesAndTypes',
  })

  const stream = resultSet.stream()

  let i = 0

  for await (const rows of stream) {
    for (const row of rows) {
      // If a condition is true, I want to stop consuming the stream prematurely
      if (i++ === 5) {
        return stream.destroy()
      }

      yield row.json<string[]>().join(',') + '\n'
    }
  }
}

;(async () => {
  const asyncGen = myAsyncGenerator()
  const outputStream = createWriteStream('output.txt')
  await pipeline(asyncGen, outputStream)
})()

Yields the following file:

number
UInt64
0
1
2

Without unnecessary errors in the logs.

Please feel free to re-open this issue or create a new one if there are still any problems.

@slvrtrn closed this as completed on May 20, 2024