
how to do asynchronous, secondary reads from files and databases #27

loveencounterflow opened this issue Oct 18, 2014 · 2 comments


@loveencounterflow

i have the following problem: i read keys from a (level) DB; for each key i see, i want to open another readstream in which i look for more data related to that key, and write the result back into the stream. my data collection is rather big, so many chunks of raw bytes are read and written before one run is finished. this tends to hide the obnoxious fact that, sometimes, some pieces of data appear to get lost in the process.

i've written a library called Pipedreams to help me abstract away the dirty details of stream handling; it mainly relies on through to get its work done. i've been building up and using that library for a couple of months now, and since i knew even less about streams back when i started, it surely contains bugs (when you compare the individual functions in that library, you can see the different strata of comprehension / skill i had at my disposal when i wrote each one. time for an overhaul, i guess).

i have illustrated my specific use case over at https://gist.github.com/loveencounterflow/65fd8ec711cf78950aa0; i'm using short files there instead of LevelDB, but the problem remains the same as far as i can see.

as i see it, the problem seems to be that when i open a readstream and pipe it through transformers, everything goes fine as long as the transformers are all synchronous. but when one transformer has to do something asynchronous, like opening a file to read lines, then its results may come late to the pipe's destination stream. it is as if the end or close event of the source stream is passed through to the destination, which then closes too, leaving some data stranded. the gist's output clearly shows that all processing is indeed performed, but it never reaches its destination. i suspect that if the sample files were much longer (so they'd cause multiple chunks to be emitted), the earlier processing results would indeed show up at the destination.
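to make the failure mode concrete, here is a minimal sketch of the pattern i mean (not the gist's actual code; the file names and the split module are just stand-ins for the level DB key stream):

```js
var fs      = require('fs');
var split   = require('split');      // one key per chunk; stand-in for a level key stream
var through = require('through');

fs.createReadStream('keys.txt')
  .pipe(split())
  .pipe(through(function (key) {
    if (!key) return;                 // skip the empty trailing line
    var self = this;
    // secondary, asynchronous read for the key we just saw
    fs.readFile('details/' + key + '.txt', 'utf8', function (err, text) {
      if (err) return self.emit('error', err);
      // this may fire only after 'end' has already travelled downstream,
      // which is where data appears to get lost
      self.queue(key + ': ' + text);
    });
  }))
  .pipe(process.stdout);
```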

we all know that Node won't exit by itself as long as there's still something on the event loop, so you can use setTimeout( fn, 1e6 ) to force the process not to terminate. i tried setImmediate to make the data passing 'more asynchronous' (you see i'm guessing here: what does it really mean that through is 'synchronous'? does that mean it is inherently unsuited for asynchronous tasks? do i have to use ES.readable instead?), and indeed with that trick i managed to get some (but crucially not all) data items through.

from my intense dabbling with NodeJS streams for the better part of this year, i'd say that when i open a readstream and call s.pause() on it (say, from within a piped transformer), then that should make NodeJS hang indefinitely until i call s.resume(); otherwise i haven't paused at all. that seemingly does not work; also, you have to be careful where to put the pause call to avoid getting an 'unable to get back to old streams mode at this point' error.

i should be very grateful if some knowledgeable people could fill me in on my doubts, misunderstandings, and lacunae. streams are great!

@dominictarr
Owner

It sounds like you might want something like this: https://www.npmjs.org/package/map-stream
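For your case that would look roughly like this (sketch only; `keyStream` and the file names are placeholders for your level DB reads):

```js
var fs  = require('fs');
var map = require('map-stream');

// each chunk is only forwarded once its async callback has fired
var lookup = map(function (key, callback) {
  fs.readFile('details/' + key + '.txt', 'utf8', function (err, text) {
    if (err) return callback(err);
    callback(null, key + ': ' + text);
  });
});

keyStream.pipe(lookup).pipe(process.stdout);
```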

Also, you could build your own with through. The simplest way to do this with through is to keep a count of how many chunks you have in flight, and don't end the stream while some of them are still unfinished. This won't guarantee they come out in order, but they will come out (unless someone calls their callback twice, but you can detect that with a similar strategy).
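A rough sketch of that counting approach (untested; `work(data, cb)` stands for whatever async operation you run per chunk):

```js
var through = require('through');

function asyncThrough (work) {
  var pending = 0, ended = false;

  return through(function write (data) {
    var self = this;
    pending += 1;
    work(data, function (err, result) {
      pending -= 1;
      if (err) return self.emit('error', err);
      if (result != null) self.queue(result);
      if (ended && pending === 0) self.queue(null);   // everything finished, end now
    });
  }, function end () {
    ended = true;
    if (pending === 0) this.queue(null);              // nothing in flight, end right away
  });
}
```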

Alternatively, use through2. It's the same idea as through but using streams2; the only difference is that the data/end handlers call back when they are done. You can create this effect using pause and resume, but that requires the upstream stream to be well behaved (which through streams are by default, but not all classic streams are). But to be honest, through2 is simpler for this. It will also ensure that the data comes out in order, which may be what you want.
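With through2 that would be roughly (object mode; `readDetails` is a placeholder for your secondary read):

```js
var through2 = require('through2');

var lookup = through2.obj(function (key, enc, callback) {
  var self = this;
  readDetails(key, function (err, result) {   // placeholder async secondary read
    if (err) return callback(err);
    self.push(result);
    callback();                               // only now is the next chunk handed in
  });
});
```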

The first approach will process every chunk in parallel, the second in serial.

@loveencounterflow
Author

thanks a lot for your recommendations, which mesh well with what i've found out so far. i've already dabbled with through2, which looks very interesting to me.

on a related note, i've just found out that an ordinary fs readstream gets terminated prematurely when all i do is peek into the data using a function built with event-stream's map; this got corrected when i replaced the map-based solution with one using event-stream's through. not sure why there should be this difference, but surely it would be great if such pitfalls, if they result from a design decision, could be mentioned in the docs. right now i'm suspecting map may be the main / only culprit in my pipes.
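for the record, the through-based peek that behaves for me looks roughly like this (source and destination are placeholders):

```js
var es = require('event-stream');

// pass-through peek: log each chunk and re-emit it unchanged
var peek = es.through(function (data) {
  console.error('saw:', data.toString());
  this.emit('data', data);
});

source.pipe(peek).pipe(destination);
```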
