How to implement a through stream #527

Open
msageryd opened this issue Sep 26, 2023 · 20 comments

@msageryd

Streams are sometimes hard to grasp for me. The various versions in Node don't make it easier.

It seems like a good idea to use readable-stream so I can at least have solid ground to build knowledge upon, instead of an ever-changing landscape.

I'm reading a stream of binary files (stdout from a child process), separated by a file delimiter. I look at each chunk when it arrives and either write it to my outputStream or create a new outputStream if a file delimiter is discovered. After that, each new stream needs to be cloned (using cloneable-readable) and piped to different locations.

Originally I had an "outputStreamFactory" which created a writable stream and piped it to its destination. The streamFactory was used to create a new stream every time a file delimiter was discovered. This does not work anymore, since I need to pipe the stream again (I cannot pipe a writable stream).

Q1: Should I use a through stream for this?
Q2: readable-stream does not have a through stream. Should I build one from a transform stream?
Q3: In that case, how should I build a through stream safely from a transform stream?
Q4: Or should I use through2, which is probably not using readable-stream under the hood?

@benjamingr
Member

benjamingr commented Sep 26, 2023

I'm not sure I understand. You mean something like this (no packages; this is pseudocode, not an implementation, just to understand the ask):

myStream.compose(async function*(s) {
  let currentReadable = new Readable();
  for await (const chunk of s) {
    let delimiter = chunk.indexOf("\n");
    if (delimiter !== -1) {
      // finish the current file and start a new one
      currentReadable.push(chunk.slice(0, delimiter));
      yield currentReadable;
      currentReadable = new Readable();
      currentReadable.push(chunk.slice(delimiter + 1));
    } else {
      currentReadable.push(chunk);
    }
  }
  yield currentReadable;
}).forEach((readable) => {
  readable.pipe(somewhereDependingOnWhateverLogicYouHave);
});

@msageryd
Author

msageryd commented Sep 26, 2023

Almost.

I don't want to mix my "WhateverLogicYouHave" with this particular code. Instead I'm supplying a handleOutputStream function which will be called at the start and after every new file, i.e. after each file delimiter. The clone and pipe logic will be handled within handleOutputStream.

I'm not sure if it's necessary, but in order to be able to write to outputStream as well as pipe it to other places, I made it a through stream.

The through stream looks like this:

class MyThrough extends Transform {
  constructor(options) {
    super(options);
  }

  _transform(chunk, encoding, cb) {
    // No transformation, just pass the data through
    this.push(chunk);
    cb();
  }
}

Also, I'm not awaiting anything. Instead I'm using the data event like this:

child.stdout.on('data', (chunk) => {
    const delimiterIndex = chunk.indexOf(fileDelimiter, 0, 'binary');
    if (delimiterIndex > -1) {
      outputStream.end();
      fileIndex++;
      outputStream = new MyThrough();
      handleOutputStream({ fileIndex, outputStream });
      chunk = chunk.slice(
        delimiterIndex + fileDelimiter.length + 1,
        chunk.length
      );
    }
    outputStream.write(chunk);
});

@vweevers
Contributor

Or should I use through2, which is probably not using readable-stream under the hood?

through2 has no reason to exist anymore. It does use readable-stream, but it's outdated and merely sugar. There's now an almost-as-elegant solution:

const { Transform } = require('readable-stream')

const stream = new Transform({
  transform (chunk, encoding, cb) {
    // ..
  }
})

Which is equivalent to:

const stream = through2(function (chunk, encoding, cb) {
  // ..
})

@msageryd
Author

Thanks @vweevers
It seems like I almost managed to build a through stream by myself.

I do wonder what your // .. represents. Do you mean that I don't even need to push the chunks when they arrive? (See my code above.)

@vweevers
Contributor

I do wonder what your // .. represents.

I just intended to show that the body of the function is equal between the two code snippets. Fully written out, it's:

const { Transform } = require('readable-stream')

const stream = new Transform({
  transform (chunk, encoding, cb) {
    cb(null, chunk)
  }
})

If you need exactly that, then you can also use the PassThrough utility:

const { PassThrough } = require('readable-stream')

const stream = new PassThrough()
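
For the use case described earlier (writing into the stream while also piping it somewhere else), a minimal usage sketch with an illustrative destination path:

const fs = require('fs')

// Writable on one side, readable on the other: write chunks in and
// pipe the same stream out to a destination.
stream.pipe(fs.createWriteStream('/tmp/example.bin'))
stream.write(Buffer.from('some data'))
stream.end()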

@benjamingr
Member

benjamingr commented Sep 26, 2023

I tested the code and fixed it. Here is a "native" solution; it can be simplified much further, but it has the advantage of only reading the "files" as they are needed.

const { Readable, PassThrough } = require("stream");

const arr = Uint8Array.from([
    ...(Array(10000).fill(1)),
    10,
    ...(Array(10000).fill(2)),
    10,
    ...(Array(10000).fill(3))
]);

let streams = Readable.from([arr, arr], { objectMode: false }).compose(async function* (stream) {
    let currentReadable = new PassThrough();
    for await (let chunk of stream) {
        let delimiter = chunk.indexOf(10);
        let found = delimiter !== -1;
        while (delimiter !== -1) {
            currentReadable.push(chunk.slice(0, delimiter)); 
            yield currentReadable;
            chunk = chunk.slice(delimiter + 1);
            currentReadable = new PassThrough();
            currentReadable.push(chunk);
            delimiter = chunk.indexOf("\n");
        }
        if (!found) {
            currentReadable.push(chunk);
        }
    }
    yield currentReadable;
});

// sometimes later
streams.forEach(async s => {
    console.log("Got stream, do whatever with it, pass it to wherever");
});

This can be made faster and the while loop can likely be simplified a lot.
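
For example, one possible variation (a sketch, not tested here): the remainder of a chunk is only written once per chunk, and each per-file stream is explicitly ended when its delimiter is found.

const { PassThrough } = require("stream");

function splitByDelimiter(source, delimiter = 10) {
  return source.compose(async function* (stream) {
    let current = new PassThrough();
    for await (let chunk of stream) {
      let index = chunk.indexOf(delimiter);
      while (index !== -1) {
        // finish the current file and start a new one
        if (index > 0) current.write(chunk.slice(0, index));
        current.end();
        yield current;
        current = new PassThrough();
        chunk = chunk.slice(index + 1);
        index = chunk.indexOf(delimiter);
      }
      if (chunk.length > 0) current.write(chunk);
    }
    current.end();
    yield current;
  });
}

Note that, unlike the snippet above, each per-file stream here is ended, so anything piped from it will see a normal 'end' event.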

@msageryd
Author

@benjamingr
Wow, that is so awesome. I couldn't have dreamt up that code. Just as I thought I had a little grasp on streams, I realise I have a long way to go. Compose seems like a powerful tool. I need to read up.

As I'm only looking for a specific file delimiter, I switched your last chunk.indexOf("\n") to a delimiter search. I also added the delimiter length to the second slice: chunk = chunk.slice(delimiterIndex + fileDelimiter.length + 1).
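
In other words, roughly (fileDelimiter being the buffer used as the file separator in my earlier code):

// search for the specific file delimiter instead of a newline,
// and skip past the whole delimiter when slicing off the remainder
delimiterIndex = chunk.indexOf(fileDelimiter, 0, 'binary');
chunk = chunk.slice(delimiterIndex + fileDelimiter.length + 1);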

Other than that, the code works great. Next step is to try to clone those streams. I had trouble with this the last time I tried (mcollina/cloneable-readable#44).

@vweevers Thank you for pointing out that a pass-through is available as the PassThrough utility in readable-stream. I didn't find this in the docs, so I assumed it was not there.

@benjamingr
Member

We are creating these new streams in our "stream of streams", so why do you need to further clone them?

@msageryd
Author

Each stream is a separate file.
Each file needs to be streamed through different transforms and end up in different places.

Example:

delimitedFileStream -> file1 -> S3
                             -> resizeThumbnail -> localDisk
                                                -> S3

                    -> file2 -> S3
                             -> resizeThumbnail -> localDisk
                                                -> S3

The two files extracted from delimitedFileStream will end up in three writeStreams each.

Resizing is performed with Sharp (https://github.com/lovell/sharp).
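
A rough sketch of that fan-out for one extracted file stream, assuming the per-file streams carry image data. uploadToS3 is a hypothetical helper standing in for the real S3 upload, and the resize call and paths are illustrative:

const fs = require('fs');
const sharp = require('sharp');

// Hypothetical stand-in for the real S3 upload; here it just writes to disk.
function uploadToS3(readable, key) {
  readable.pipe(fs.createWriteStream(`./s3-upload-${key}`));
}

// One per-file stream, fanned out to three destinations as in the diagram.
// A readable can be piped to several writables; the slowest consumer
// governs backpressure for all of them.
function handleOutputStream({ fileIndex, outputStream }) {
  uploadToS3(outputStream, `file_${fileIndex}.jpg`);

  const thumbnail = outputStream.pipe(sharp().resize(200));
  thumbnail.pipe(fs.createWriteStream(`./thumb_${fileIndex}.jpg`));
  uploadToS3(thumbnail, `thumb_${fileIndex}.jpg`);
}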

@msageryd
Author

msageryd commented Sep 27, 2023

Is there any "best practice" on how to get hold of and marchal error code out of a stream generator as above?

My original stream is stdout from a child process. I need to capture both the exit code and stderr from the child process and somehow marchal these to the caller.

I tried to wrap the code within another promise, but I don't think child.close will ever be reached since the streames are not started at this stage.

async function pdfiumCommand({ command, input, options = {} }) {
  try {
    return new Promise((resolve, reject) => {
      const child = spawnPdfiumProcess({ command, input, options });

      let fileStreams = child.stdout.compose(async function* (stream) {
        let currentReadable = new PassThrough();
        for await (let chunk of stream) {
          let delimiterIndex = chunk.indexOf(FILE_DELIMITER, 0, 'binary');
          while (delimiterIndex !== -1) {
            currentReadable.push(chunk.slice(0, delimiterIndex));
            yield currentReadable;

            chunk = chunk.slice(delimiterIndex + FILE_DELIMITER.length + 1);
            currentReadable = new PassThrough();
            currentReadable.push(chunk);
            delimiterIndex = chunk.indexOf(FILE_DELIMITER, 0, 'binary');
          }
          if (delimiterIndex === -1) {
            currentReadable.push(chunk);
          }
        }
        yield currentReadable;
      });

      let stderr = '';
      // child.stderr.on('data', (data) => {
      //   stderr += data.toString();
      // });

      child.on('close', (code) => {
        resolve({
          fileStreams,
          stderr,
          code,
        });
      });
    });
  } catch (e) {
    reject(new Error(code || 'No error code returned from "pdfium"'));
  }
}

edit:
I see that my try-catch is out of alignment due to bad refactoring, but the point of this post is clear anyway, I think.
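
One way to marshal the exit code without an outer new Promise is to return the composed stream together with a promise that settles when the child closes; a sketch, reusing spawnPdfiumProcess and the splitting generator from the snippet above (its body is elided here):

const { once } = require('events');

function pdfiumCommand({ command, input, options = {} }) {
  const child = spawnPdfiumProcess({ command, input, options });

  const fileStreams = child.stdout.compose(async function* (stream) {
    // ... same delimiter-splitting generator as above ...
  });

  let stderr = '';
  child.stderr.on('data', (data) => {
    stderr += data.toString();
  });

  // events.once resolves with the listener arguments, here [code, signal].
  const exit = once(child, 'close').then(([code]) => ({ code, stderr }));

  return { fileStreams, exit };
}

The caller can then consume fileStreams and separately await exit to get the code and the collected stderr.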

@msageryd
Author

Frustrating... I'm not able to catch my error. I probably lack understanding of error handling with generators and/or streams.

My last attempt was to throw an error if the exit code from the child process is not 0.

async function pdfiumCommand({ command, input, options = {} }) {
  const child = spawnPdfiumProcess({ command, input, options });

  let fileStreams = child.stdout.compose(async function* (stream) {
    let currentReadable = new PassThrough();
    for await (let chunk of stream) {
      let delimiterIndex = chunk.indexOf(FILE_DELIMITER, 0, 'binary');
      while (delimiterIndex !== -1) {
        currentReadable.push(chunk.slice(0, delimiterIndex));
        yield currentReadable;

        chunk = chunk.slice(delimiterIndex + FILE_DELIMITER.length + 1);
        currentReadable = new PassThrough();
        currentReadable.push(chunk);
        delimiterIndex = chunk.indexOf(FILE_DELIMITER, 0, 'binary');
      }
      if (delimiterIndex === -1) {
        currentReadable.push(chunk);
      }
    }
    yield currentReadable;
  });

  child.on('close', (code) => {
    if (code !== 0) {
      throw new Error('PDFium error: ' + code);
    }
  });

  return fileStreams;
}

My application crashes and I can see the correct code in the crash log, but I'm not able to catch the error. I even tried pump (https://www.npmjs.com/package/pump) in the hope of catching the error in the callback, but I can't even get my "Pump finished" message when there are no errors.

const fileStreams = await pdfiumExplode2ToStream({
  input,
  options,
});

let i = 0;
fileStreams.forEach(async (fileStream) => {
  i++;
  console.log('Processing file ' + i);
  const outputPath = `../test/tmp`;
  const outputPdf = fs.createWriteStream(`${outputPath}/file_${i}.pdf`);

  try {
    pump(fileStream, outputPdf, function (err) {
      if (err) {
        console.log('error from pump callback');
        console.log(err);
      } else {
        console.log('Pump finished');
      }
    });
  } catch (err) {
    console.log('error from try-catch');
    console.log(err);
  }
});
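
For what it's worth: an error thrown inside the child's 'close' listener cannot be caught by a try/catch around this consumer code; it surfaces as an uncaught exception (hence the crash). Errors only reach the consumer if the streams themselves carry them, e.g. via destroy(err). A sketch of the same consumer with pump swapped for the built-in stream/promises pipeline, which rejects if either stream errors:

const fs = require('fs');
const { pipeline } = require('stream/promises');

let i = 0;
fileStreams.forEach(async (fileStream) => {
  i++;
  console.log('Processing file ' + i);
  const outputPdf = fs.createWriteStream(`../test/tmp/file_${i}.pdf`);

  try {
    // pipeline() rejects if fileStream or outputPdf emits an error,
    // for example after fileStream.destroy(err).
    await pipeline(fileStream, outputPdf);
    console.log('Pipeline finished for file ' + i);
  } catch (err) {
    console.log('error from pipeline', err);
  }
});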

@benjamingr
Member

compose creates a stream; my code returns a stream of streams (they're already copied). There is no need to wrap it in new Promise, you can for await it directly.
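
For example (inside an async function, reusing the placeholder destination name from the first sketch):

for await (const fileStream of fileStreams) {
  fileStream.pipe(somewhereDependingOnWhateverLogicYouHave);
}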

@msageryd
Author

I understood that. I just tried to solve my error reporting problem with an outer promise. This was a bad idea. I also tried to throw an error (my last post), but I can't find any way to catch this error.

@benjamingr
Member

How is your child process signaling it had an error? Does it terminate with a different exit code? Write to stderr? Processes don't uniformly distinguish "I exited early" from "I exited with an error" from "I finished"

@benjamingr
Member

Anyway, you can .destroy(err) your fileStreams based on that, and similarly propagate it to the per-file sub-streams if you need to.

@msageryd
Author

I'm getting both a non-zero exit code and a message on stderr if something goes wrong in the child process, but I'm not able to propagate the error up the chain.

I tried this, but it didn't work:

  child.on('close', (code) => {
    if (code !== 0) {
      fileStreams.destroy(new Error('PDFium error: ' + code));
    }
  });

@msageryd
Author

I also tried to destroy each inner stream.

  child.on('close', (code) => {
    if (code !== 0) {
      fileStreams.forEach(async (fileStream) => {
        fileStream.destroy(new Error('PDFium error: ' + code));
      });

      fileStreams.destroy(new Error('PDFium error: ' + code));
    }
  });

@msageryd
Author

I found one (ugly) solution. It's not neat, but at least I can get hold of the error message from stderr.

The first stream in the composed "bundle" is now stderr. I'm sure there must be better ways to solve this.

  let errorStream = new PassThrough();

  let fileStreams = child.stdout.compose(async function* (stream) {
    yield errorStream;
    let currentReadable = new PassThrough();

    for await (let chunk of stream) {
      let delimiterIndex = chunk.indexOf(FILE_DELIMITER, 0, 'binary');
      while (delimiterIndex !== -1) {
        currentReadable.push(chunk.slice(0, delimiterIndex));
        yield currentReadable;

        chunk = chunk.slice(delimiterIndex + FILE_DELIMITER.length + 1);
        currentReadable = new PassThrough();
        currentReadable.push(chunk);
        delimiterIndex = chunk.indexOf(FILE_DELIMITER, 0, 'binary');
      }
      if (delimiterIndex === -1) {
        currentReadable.push(chunk);
      }
    }
    yield currentReadable;
  });

  child.stderr.on('data', (data) => {
    errorStream.push(data);
  });

@msageryd
Author

msageryd commented Sep 27, 2023

@benjamingr
I don't understand what's going on. It would be awesome if I could use your concept, but something is fishy. I suspect that the streams start too early, so they might have ended before I try to use them.

In my example I have only one file in my composed file streams.

The following works fine:

jpgFileStream.forEach(async (fileStream) => {
  const outFile = fs.createWriteStream(`../test/tmp/fromGenerator.jpg`);
  fileStream.pipe(outFile);
});

As soon as I introduce a transform stream (sharp) I get empty files in the output.

jpgFileStream.forEach(async (fileStream) => {
  const outFile = fs.createWriteStream(`../test/tmp/fromGenerator.jpg`);
  const sharpPipeline = sharp().rotate(90);
  fileStream.pipe(sharpPipeline).pipe(outFile);
});

Sometimes I get "cut off" JPEG files like this, which leads me to believe that the streams have already started:

[screenshot: a partially rendered JPEG]

The sharpPipeline works fine if I get the input directly from a readable file stream.

const sharpPipeline = sharp().rotate(90);
const inFile = fs.createReadStream('../test/resources/test.jpg');
const outFile = fs.createWriteStream(`../test/tmp/fromLocalDisk.jpg`);
inFile.pipe(sharpPipeline).pipe(outFile);

@msageryd
Author

msageryd commented Sep 28, 2023

Update:
This has nothing to do with sharp.

Whenever my child process takes a bit longer to produce the stream (i.e. more complicated processing), the destination write stream seems to end prematurely, i.e. I get empty or sometimes half files written to disk.

It's almost as if the for await (let chunk of stream) loop does not actually await the chunks from stdout.

I went back to my original solution and tried the exact same files and file processing. It works great. The main difference is that I provide a handleOutputStream function which is called after each new through stream is created, i.e. I handle each stream immediately instead of waiting for them to become a bunch of streams.

I suspect that this packaging of streams in streams either has some design problem or hits some stream bug. Compose is still marked as experimental in the docs.
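
One thing that stands out in the snippets above: the per-file PassThrough streams are pushed into but never ended, so anything piped from them never sees an 'end' event. A plain fs write stream still receives the bytes as they arrive, but a transform like sharp generally needs to see the end of its input before it can emit the finished image, which could explain the empty and cut-off outputs. A sketch of one thing to try (not a verified fix): end each per-file stream when its delimiter is found, and await each per-file pipeline before moving on.

const { PassThrough } = require('stream');
const { pipeline } = require('stream/promises');
const fs = require('fs');
const sharp = require('sharp');

async function explodeAndProcess(stdout, fileDelimiter) {
  const fileStreams = stdout.compose(async function* (stream) {
    let current = new PassThrough();
    for await (let chunk of stream) {
      let index = chunk.indexOf(fileDelimiter);
      while (index !== -1) {
        if (index > 0) current.write(chunk.slice(0, index));
        current.end(); // let downstream consumers see 'end'
        yield current;
        current = new PassThrough();
        chunk = chunk.slice(index + fileDelimiter.length);
        index = chunk.indexOf(fileDelimiter);
      }
      if (chunk.length > 0) current.write(chunk);
    }
    current.end();
    yield current;
  });

  let i = 0;
  for await (const fileStream of fileStreams) {
    i++;
    await pipeline(
      fileStream,
      sharp().rotate(90),
      fs.createWriteStream(`../test/tmp/fromGenerator_${i}.jpg`)
    );
  }
}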
