
How to combine stream with async await? #1787

Open
phen0menon opened this issue Mar 19, 2024 · 1 comment


phen0menon commented Mar 19, 2024

I'm looking for a way to combine a WritableStream with async/await. I found some older issues in this repository about this problem but no solution that works for me.

What I'm trying to do:

  1. Get a file stream from S3
  2. Parse the file as a stream using htmlparser2
  3. During parsing, insert chunks of data into Kafka

Here is an example of the code:

const readable = getFileStreamFromS3()

let buffer = []

const parser = new WStream(
    {
        onopentag(name, attributes) {
            // ... collect data on open tag
        },

        ontext(text) {
            // collect text on text
        },

        async onclosetag(tagname) {
            buffer.push(data)
            if (buffer.length >= 100) {
                await pushToKafka(buffer)
                buffer = []
            }
        },
    },
    { xmlMode: true },
)

return new Promise(resolve => readable.pipe(parser).on('finish', resolve))

However, it doesn't seem to work. Any ideas on how to implement this feature?
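The `await` in `onclosetag` has no effect on the stream because htmlparser2 calls handlers synchronously and discards the promise an `async` handler returns; its `WritableStream` also acknowledges each chunk as soon as the parser has consumed it. A second consequence is that, while one `pushToKafka` call is pending, later `onclosetag` calls keep appending to the same `buffer` and start further sends, so the code can send overlapping batches and drop records once `buffer = []` finally runs. The stand-alone sketch below shows the first effect without htmlparser2; `runParser` is a hypothetical stand-in for a parser that invokes handlers synchronously.

```javascript
// Hypothetical stand-in for a parser that calls handlers synchronously
// and ignores whatever they return.
function runParser(handlers, tags) {
  for (const tag of tags) {
    handlers.onclosetag(tag); // the promise returned by the async handler is dropped
  }
  handlers.onend();
}

const log = [];
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

runParser(
  {
    async onclosetag(tag) {
      log.push(`start ${tag}`);
      await sleep(10); // simulates a slow network call such as a Kafka send
      log.push(`done ${tag}`);
    },
    onend() {
      log.push('end');
    },
  },
  ['a', 'b'],
);

// Logged synchronously, right after runParser returns:
console.log(log); // [ 'start a', 'start b', 'end' ]
```

The parser reaches `end` before either handler has finished; the `done a` and `done b` entries only appear after the timers fire, long after the parser has moved on.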

@himavamsi12

To combine a writable stream with async/await, you need to do the asynchronous work inside the stream's write method and call its callback only after that work has finished. Your current approach uses await inside an htmlparser2 event handler, which won't work as expected: the parser calls onclosetag synchronously and ignores the promise it returns, so parsing continues without waiting for pushToKafka. Here's how you can refactor your code to do the asynchronous work in write instead:

const { Writable } = require('stream');
const { pipeline } = require('stream/promises');
const { getFileStreamFromS3, pushToKafka } = require('./your-utils');

function processFile() {
    const readable = getFileStreamFromS3();

    const parser = new Writable({
        write(chunk, encoding, callback) {
            // Parse the chunk. parseChunk is a placeholder for your own
            // function that returns a Promise of the data found in the chunk.
            parseChunk(chunk)
                // Insert the parsed data into Kafka
                .then((parsedData) => pushToKafka(parsedData))
                // Call the callback only once Kafka has accepted the data, so the
                // stream does not write the next chunk before this one is processed.
                // If parsing or pushToKafka fails, pass the error to the callback.
                .then(() => callback(), callback);
        },
    });

    // Pipe the readable stream into the parser. The returned promise resolves
    // when parsing is done and rejects if either stream fails
    // (readable.pipe() alone does not forward errors from the S3 stream).
    return pipeline(readable, parser);
}

Let me know if it works.
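Two caveats about the parseChunk approach: parsing each chunk on its own breaks as soon as a tag or record is split across two chunks (stream chunks are cut at arbitrary byte offsets), and it sends one Kafka request per chunk rather than batches of 100. The sketch below keeps the core idea (do the async work in `write`, call `callback` afterwards) but carries parser state across chunks and batches the records. So that it runs with Node's standard library alone, it assumes a hypothetical newline-delimited record format in place of XML; `pushToKafka` is a stub and `createKafkaSink` is a made-up name. With htmlparser2, the `parse` step would presumably become a `Parser` (rather than the `WritableStream`) whose `onclosetag` pushes into `pending`, fed with `parser.write(...)` in `write` and `parser.end()` in `final`.

```javascript
const { Readable, Writable } = require('node:stream');
const { pipeline } = require('node:stream/promises');
const { StringDecoder } = require('node:string_decoder');

// Hypothetical stand-in for the real Kafka producer call: it records each
// batch after a short delay that simulates network latency.
const sentBatches = [];
async function pushToKafka(batch) {
  await new Promise((resolve) => setTimeout(resolve, 5));
  sentBatches.push(batch);
}

// Hypothetical helper: a Writable that turns incoming bytes into records and
// sends them to Kafka in batches of `batchSize`. Newline-delimited text stands
// in for the records an htmlparser2 onclosetag handler would collect.
function createKafkaSink(batchSize) {
  const decoder = new StringDecoder('utf8'); // keeps multi-byte characters intact across chunks
  let pending = []; // parsed records not yet sent
  let partial = ''; // an unfinished record that continues in the next chunk

  // Send every full batch; with `all` set, also send the final short batch.
  async function flush(all) {
    while (pending.length >= batchSize || (all && pending.length > 0)) {
      const batch = pending.slice(0, batchSize);
      pending = pending.slice(batchSize);
      await pushToKafka(batch); // only one request in flight at a time
    }
  }

  // Synchronous parsing step, like calling htmlparser2's Parser.write.
  function parse(text) {
    const lines = (partial + text).split('\n');
    partial = lines.pop(); // the last piece may be incomplete
    pending.push(...lines.filter((line) => line !== ''));
  }

  return new Writable({
    write(chunk, encoding, callback) {
      parse(decoder.write(chunk));
      // Acknowledge the chunk only after the awaited sends have finished, so the
      // source stream is paused (backpressure) while Kafka is busy.
      flush(false).then(() => callback(), callback);
    },
    final(callback) {
      parse(decoder.end() + '\n'); // treat any remaining text as a complete record
      flush(true).then(() => callback(), callback);
    },
  });
}

// Usage: 250 records cut into 7-character chunks, so records straddle chunks.
const input = Array.from({ length: 250 }, (_, i) => `record-${i}`).join('\n');
const chunks = input.match(/[\s\S]{1,7}/g);
const done = pipeline(Readable.from(chunks), createKafkaSink(100)).then(() => {
  console.log(sentBatches.map((batch) => batch.length)); // [ 100, 100, 50 ]
});
```

Because a `Writable` does not call `write` again until the previous `callback` has run, this keeps at most one Kafka request in flight and pauses the S3 stream while Kafka is busy. `pipeline` also rejects if the source stream errors, which `.pipe()` does not forward to the destination.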
