extract is also an async iterator
mafintosh committed Jun 17, 2023
1 parent b9f75f2 commit 7525b47
Showing 3 changed files with 145 additions and 20 deletions.
51 changes: 33 additions & 18 deletions README.md
@@ -27,14 +27,14 @@ If you want to pack/unpack directories on the file system check out [tar-fs](htt
To create a pack stream use `tar.pack()` and call `pack.entry(header, [callback])` to add tar entries.

``` js
-var tar = require('tar-stream')
-var pack = tar.pack() // pack is a stream
+const tar = require('tar-stream')
+const pack = tar.pack() // pack is a stream

// add a file called my-test.txt with the content "Hello World!"
pack.entry({ name: 'my-test.txt' }, 'Hello World!')

// add a file called my-stream-test.txt from a stream
-var entry = pack.entry({ name: 'my-stream-test.txt', size: 11 }, function(err) {
+const entry = pack.entry({ name: 'my-stream-test.txt', size: 11 }, function(err) {
// the stream was added
// no more entries
pack.finalize()
@@ -54,21 +54,21 @@ pack.pipe(process.stdout)
To extract a stream use `tar.extract()` and listen for `extract.on('entry', (header, stream, next) => { ... })`

``` js
-var extract = tar.extract()
+const extract = tar.extract()

-extract.on('entry', function(header, stream, next) {
+extract.on('entry', function (header, stream, next) {
// header is the tar header
// stream is the content body (might be an empty stream)
// call next when you are done with this entry

-stream.on('end', function() {
+stream.on('end', function () {
next() // ready for next entry
})

stream.resume() // just auto drain the stream
})

-extract.on('finish', function() {
+extract.on('finish', function () {
// all entries read
})

@@ -77,6 +77,21 @@ pack.pipe(extract)

The tar archive is streamed sequentially, meaning you **must** drain each entry's stream as you get them or else the main extract stream will receive backpressure and stop reading.

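Entries you don't need still have to be drained before the next one will arrive. A minimal sketch of picking out a single file and draining everything else (the file name and `someStream` source are only for illustration):

``` js
const extract = tar.extract()

extract.on('entry', function (header, stream, next) {
  if (header.name === 'my-test.txt') {
    stream.pipe(process.stdout) // consume the one entry we actually want
  } else {
    stream.resume() // drain entries we are not interested in
  }
  stream.on('end', next) // ready for the next entry
})

someStream.pipe(extract)
```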
## Extracting as an async iterator

In addition to being a writable stream, the extraction stream is also an async iterator.

``` js
const extract = tar.extract()

someStream.pipe(extract)

for await (const entry of extract) {
entry.header // the tar header
entry.resume() // the entry is also the stream
}
```
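
If you want an entry's contents and not just its header, consume the stream inside the loop before continuing; as with the event-based API, each entry has to be drained before the next one arrives. A small sketch, where `collect` is a hypothetical helper (not part of tar-stream) that buffers a stream into memory:

``` js
// hypothetical helper: buffer a whole entry into a single Buffer
function collect (stream) {
  return new Promise((resolve, reject) => {
    const chunks = []
    stream.on('data', (chunk) => chunks.push(chunk))
    stream.on('error', reject)
    stream.on('end', () => resolve(Buffer.concat(chunks)))
  })
}

const extract = tar.extract()
someStream.pipe(extract)

for await (const entry of extract) {
  const body = await collect(entry) // drain this entry before asking for the next one
  console.log(entry.header.name, body.length)
}
```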

## Headers

The header object used in `entry` should contain the following properties.
@@ -106,18 +121,18 @@ Most of these values can be found by stat'ing a file.
Using tar-stream it is easy to rewrite paths / change modes etc in an existing tarball.

``` js
-var extract = tar.extract()
-var pack = tar.pack()
-var path = require('path')
+const extract = tar.extract()
+const pack = tar.pack()
+const path = require('path')

-extract.on('entry', function(header, stream, callback) {
+extract.on('entry', function (header, stream, callback) {
// let's prefix all names with 'tmp'
header.name = path.join('tmp', header.name)
// write the new entry to the pack stream
stream.pipe(pack.entry(header, callback))
})

-extract.on('finish', function() {
+extract.on('finish', function () {
// all entries done - let's finalize it
pack.finalize()
})
@@ -133,15 +148,15 @@ pack.pipe(newTarballStream)


``` js
-var fs = require('fs')
-var tar = require('tar-stream')
+const fs = require('fs')
+const tar = require('tar-stream')

-var pack = tar.pack() // pack is a stream
-var path = 'YourTarBall.tar'
-var yourTarball = fs.createWriteStream(path)
+const pack = tar.pack() // pack is a stream
+const path = 'YourTarBall.tar'
+const yourTarball = fs.createWriteStream(path)

// add a file called YourFile.txt with the content "Hello World!"
-pack.entry({name: 'YourFile.txt'}, 'Hello World!', function (err) {
+pack.entry({ name: 'YourFile.txt' }, 'Hello World!', function (err) {
if (err) throw err
pack.finalize()
})
98 changes: 96 additions & 2 deletions extract.js
@@ -108,8 +108,9 @@ class Extract extends Writable {
this._stream = null
this._missing = 0
this._longHeader = false
-this._callback = null
+this._callback = noop
this._locked = false
+this._finished = false
this._pax = null
this._paxGlobal = null
this._gnuLongPath = null
@@ -286,7 +287,8 @@ }
}

_final (cb) {
-cb(this._missing > 0 || this._buffer.buffered > 0 ? new Error('Unexpected end of data') : null)
+this._finished = this._missing === 0 && this._buffer.buffered === 0
+cb(this._finished ? null : new Error('Unexpected end of data'))
}

_predestroy () {
@@ -297,6 +299,98 @@
if (this._stream) this._stream.destroy(getStreamError(this))
cb(null)
}

[Symbol.asyncIterator] () {
let error = null

let promiseResolve = null
let promiseReject = null

let entryStream = null
let entryCallback = null

const extract = this

this.on('entry', onentry)
this.on('error', (err) => { error = err })
this.on('close', onclose)

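// the object below implements the async iterator protocol (next/return/throw)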
return {
[Symbol.asyncIterator] () {
return this
},
next () {
return new Promise(onnext)
},
return () {
return destroy(null)
},
throw (err) {
return destroy(err)
}
}

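// invoke the pending per-entry callback at most once, letting extract move on to the next entry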
function consumeCallback (err) {
if (!entryCallback) return
const cb = entryCallback
entryCallback = null
cb(err)
}

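// handles iterator.next(): reject on a stored error, hand over a buffered entry if one is waiting,
// otherwise remember the resolvers, ack the previous entry and finish if the archive already ended cleanly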
function onnext (resolve, reject) {
if (error) {
return reject(error)
}

if (entryStream) {
resolve({ value: entryStream, done: false })
entryStream = null
return
}

promiseResolve = resolve
promiseReject = reject

consumeCallback(null)

if (extract._finished && promiseResolve) {
promiseResolve({ value: undefined, done: true })
promiseResolve = promiseReject = null
}
}

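// 'entry' handler: resolve a waiting next() with the entry stream, or buffer it until next() is called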
function onentry (header, stream, callback) {
entryCallback = callback
stream.on('error', noop) // no way around this due to tick silliness

if (promiseResolve) {
promiseResolve({ value: stream, done: false })
promiseResolve = promiseReject = null
} else {
entryStream = stream
}
}

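// when the extract stream closes, release any pending entry callback and settle an outstanding next()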
function onclose () {
consumeCallback(error)
if (!promiseResolve) return
if (error) promiseReject(error)
else promiseResolve({ value: undefined, done: true })
promiseResolve = promiseReject = null
}

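// shared by return() and throw(): destroy the extract stream, release the pending callback and settle once it has closed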
function destroy (err) {
extract.destroy(err)
consumeCallback(err)
return new Promise((resolve, reject) => {
if (extract.destroyed) return resolve({ value: undefined, done: true })
extract.once('close', function () {
if (err) reject(err)
else resolve({ value: undefined, done: true })
})
})
}
}
}

module.exports = function extract (opts) {
16 changes: 16 additions & 0 deletions test/extract.js
@@ -759,6 +759,22 @@ test('unknown format attempts to extract if allowed', function (t) {
}
})

test('extract streams are async iterators', async function (t) {
const extract = tar.extract()
const b = fs.readFileSync(fixtures.MULTI_FILE_TAR)

extract.end(b)

const expected = ['file-1.txt', 'file-2.txt']

for await (const entry of extract) {
t.is(entry.header.name, expected.shift())
entry.resume()
t.comment('wait a bit...')
await new Promise(resolve => setTimeout(resolve, 100))
}
})

function clamp (index, len, defaultValue) {
if (typeof index !== 'number') return defaultValue
index = ~~index // Coerce to integer.
