Skip to content

Commit

Permalink
feat: streams in browser and Bzz API revamp
Browse files Browse the repository at this point in the history
  • Loading branch information
AuHau committed Nov 13, 2019
1 parent e1c34a4 commit bac02e1
Show file tree
Hide file tree
Showing 18 changed files with 1,471 additions and 223 deletions.
36 changes: 35 additions & 1 deletion __tests__/browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,25 @@ describe('browser', () => {
page.on('console', msg => {
for (let i = 0; i < msg.args().length; ++i) {
/* eslint-disable-next-line no-console */
console.log(`${i}: ${msg.args()[i]}`)
console.log(msg.args()[i])
}
})

page.on('pageerror', function(err) {
console.log('Page error: ' + err.toString())
})

page.on('error', function(err) {
console.log('Error: ' + err.toString())
})

await page.addScriptTag({
path: resolve(
__dirname,
'../packages/swarm-browser/dist/erebos.swarm.development.js',
),
})

await page.addScriptTag({
path: resolve(
__dirname,
Expand Down Expand Up @@ -227,6 +236,31 @@ describe('browser', () => {
expect(directoryList).toEqual({ ...dir, '/': dir[defaultPath] })
})

it('downloadDirectoryData() streams the same data provided to uploadDirectory()', async () => {
const dir = {
[`foo-${uploadContent}.txt`]: {
data: `this is foo-${uploadContent}.txt`,
},
[`bar-${uploadContent}.txt`]: {
data: `this is bar-${uploadContent}.txt`,
},
}

const downloadedDir = await evalClient(async (client, dir) => {
const dirHash = await client.bzz.uploadDirectory(dir)
const response = await client.bzz.downloadDirectoryData(dirHash)
return Object.keys(response).reduce(
(prev, current) => ({
...prev,
[current]: { data: response[current].data.toString('utf8') },
}),
{},
)
}, dir)

expect(downloadedDir).toEqual(dir)
})

it('supports feeds posting and getting', async () => {
jest.setTimeout(20000)
const data = { test: uploadContent }
Expand Down
36 changes: 18 additions & 18 deletions docs/api-bzz.md
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,15 @@ The `download()` method returns a [`Response` instance](https://developer.mozill

**Returns** `Promise<Response>`

### .downloadDirectoryData()

**Arguments**

1. `hashOrDomain: string`: ENS name or Swarm hash
1. [`options?: DownloadOptions = {}`](#downloadoptions)

**Returns** `Promise<DirectoryData>`

### .uploadFile()

Uploads a single file and returns the hash. If the `contentType` option is provided, it will return the manifest hash, otherwise the file will be uploaded as raw data and will return the hash of the data itself.
Expand Down Expand Up @@ -667,15 +676,6 @@ Returns a [RxJS `Observable`](https://rxjs.dev/api/index/class/Observable) emitt

**Returns** `Observable<FileEntry>`

### .downloadDirectoryData()

**Arguments**

1. `hashOrDomain: string`: ENS name or Swarm hash
1. [`options?: DownloadOptions = {}`](#downloadoptions)

**Returns** `Promise<DirectoryData>`

### .downloadTarTo()

**Arguments**
Expand Down Expand Up @@ -718,15 +718,6 @@ Call `downloadFileTo()` or `downloadDirectoryTo()` depending on the provided `pa

**Returns** `Promise<void>`

### .uploadFileStream()

**Arguments**

1. `stream: Readable`: Node.js [`Readable stream`](https://nodejs.org/dist/latest-v10.x/docs/api/stream.html#stream_class_stream_readable) instance
1. [`options?: UploadOptions = {}`](#uploadoptions)

**Returns** `Promise<string>`

### .uploadTar()

**Arguments**
Expand Down Expand Up @@ -764,3 +755,12 @@ Calls `uploadFileFrom()` or `uploadDirectoryFrom()` depending on the provided `p
1. [`options?: UploadOptions = {}`](#uploadoptions)

**Returns** `Promise<string>`

### .uploadFileStream()

**Arguments**

1. `readable-stream: Readable`: [`Readable stream`](https://www.npmjs.com/package/readable-stream) compatible instance
1. [`options?: UploadOptions = {}`](#uploadoptions)

**Returns** `Promise<string>`
7 changes: 6 additions & 1 deletion packages/api-bzz-base/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@
"@babel/runtime": "^7.6.2",
"@erebos/hex": "^0.10.0",
"@erebos/keccak256": "^0.10.0",
"rxjs": "^6.5.3"
"readable-stream": "^3.1.1",
"rxjs": "^6.5.3",
"tar-stream": "^2.1.0"
},
"devDependencies": {
"@types/readable-stream": "^2.3.5"
}
}
83 changes: 82 additions & 1 deletion packages/api-bzz-base/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { createHex, hexInput, hexValue, toHexValue } from '@erebos/hex'
import { interval, merge, Observable } from 'rxjs'
import { interval, merge, Observable, Observer } from 'rxjs'
import { distinctUntilChanged, filter, flatMap } from 'rxjs/operators'
import tarStream from 'tar-stream'

import { Readable } from 'readable-stream'
import { createFeedDigest, getFeedTopic } from './feed'
import {
BaseResponse,
Expand All @@ -25,6 +27,7 @@ import {
SignBytesFunc,
Tag,
UploadOptions,
FileEntry,
} from './types'

export * from './feed'
Expand Down Expand Up @@ -281,6 +284,84 @@ export class BaseBzz<Response extends BaseResponse> {
return resOrError(res)
}

protected async downloadTar(
hash: string,
options: DownloadOptions,
): Promise<Response> {
if (options.headers == null) {
options.headers = {}
}
options.headers.accept = 'application/x-tar'
return await this.download(hash, options)
}

public downloadObservable(
hash: string,
options: DownloadOptions = {},
): Observable<FileEntry> {
return new Observable((observer: Observer<FileEntry>) => {
this.downloadTar(hash, options).then(
res => {
const extract = tarStream.extract()
extract.on('entry', (header, stream, next) => {
if (header.type === 'file') {
const chunks: Array<Buffer> = []
stream.on('data', (chunk: Buffer) => {
chunks.push(chunk)
})
stream.on('end', () => {
observer.next({
data: Buffer.concat(chunks),
path: header.name,
size: header.size,
})
next()
})
stream.resume()
} else {
next()
}
})
extract.on('finish', () => {
observer.complete()
})

this.normalizeStream(res.body).pipe(extract)
},
err => {
observer.error(err)
},
)
})
}

protected normalizeStream(
// eslint-disable-next-line @typescript-eslint/no-unused-vars
stream: Readable | ReadableStream | NodeJS.ReadableStream,
): Readable {
throw new Error('Must be implemented in extending class')
}

public downloadDirectoryData(
hash: string,
options: DownloadOptions = {},
): Promise<DirectoryData> {
return new Promise((resolve, reject) => {
const directoryData: DirectoryData = {}
this.downloadObservable(hash, options).subscribe({
next: entry => {
directoryData[entry.path] = { data: entry.data, size: entry.size }
},
error: err => {
reject(err)
},
complete: () => {
resolve(directoryData)
},
})
})
}

protected async uploadBody(
body: any,
options: UploadOptions,
Expand Down
7 changes: 6 additions & 1 deletion packages/api-bzz-base/types/index.d.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
/// <reference types="node" />
import { hexInput, hexValue } from '@erebos/hex';
import { Observable } from 'rxjs';
import { BaseResponse, RequestInit, Fetch, BzzConfig, BzzMode, DirectoryData, DownloadOptions, FeedMetadata, FeedParams, FeedUpdateParams, FetchOptions, ListResult, PinOptions, PinnedFile, PollOptions, PollFeedOptions, PollFeedContentHashOptions, PollFeedContentOptions, SignBytesFunc, Tag, UploadOptions } from './types';
import { Readable } from 'readable-stream';
import { BaseResponse, RequestInit, Fetch, BzzConfig, BzzMode, DirectoryData, DownloadOptions, FeedMetadata, FeedParams, FeedUpdateParams, FetchOptions, ListResult, PinOptions, PinnedFile, PollOptions, PollFeedOptions, PollFeedContentHashOptions, PollFeedContentOptions, SignBytesFunc, Tag, UploadOptions, FileEntry } from './types';
export * from './feed';
export * from './types';
export declare const BZZ_MODE_PROTOCOLS: {
Expand Down Expand Up @@ -37,6 +38,10 @@ export declare class BaseBzz<Response extends BaseResponse> {
hash(domain: string, options?: FetchOptions): Promise<hexValue>;
list(hash: string, options?: DownloadOptions): Promise<ListResult>;
download(hash: string, options?: DownloadOptions): Promise<Response>;
protected downloadTar(hash: string, options: DownloadOptions): Promise<Response>;
downloadObservable(hash: string, options?: DownloadOptions): Observable<FileEntry>;
protected normalizeStream(stream: Readable | ReadableStream | NodeJS.ReadableStream): Readable;
downloadDirectoryData(hash: string, options?: DownloadOptions): Promise<DirectoryData>;
protected uploadBody(body: any, options: UploadOptions, raw?: boolean): Promise<hexValue>;
uploadFile(data: string | Buffer, options?: UploadOptions): Promise<hexValue>;
uploadDirectory(_directory: DirectoryData, _options?: UploadOptions): Promise<hexValue>;
Expand Down
6 changes: 5 additions & 1 deletion packages/api-bzz-browser/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
},
"dependencies": {
"@babel/runtime": "^7.6.2",
"@erebos/api-bzz-base": "^0.10.0"
"@erebos/api-bzz-base": "^0.10.0",
"readable-stream": "^3.4.0"
},
"devDependencies": {
"@types/readable-stream": "^2.3.5"
}
}
6 changes: 6 additions & 0 deletions packages/api-bzz-browser/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import {
UploadOptions,
} from '@erebos/api-bzz-base'
import { hexValue } from '@erebos/hex'
import { Readable } from 'readable-stream'
import { NodeReadable } from './utils'

export * from '@erebos/api-bzz-base'

Expand All @@ -16,6 +18,10 @@ export class Bzz extends BaseBzz<Response> {
super(window.fetch.bind(window), { ...cfg, url: new URL(url).href })
}

protected normalizeStream(stream: ReadableStream): Readable {
return (new NodeReadable(stream) as unknown) as Readable
}

public async uploadDirectory(
directory: DirectoryData,
options: UploadOptions = {},
Expand Down
70 changes: 70 additions & 0 deletions packages/api-bzz-browser/src/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { Readable } from 'readable-stream'

// Taken from https://github.com/JCCR/web-streams-node
// License: https://github.com/jccr/web-streams-node/blob/master/LICENSE

export class NodeReadable extends Readable {
private _webStream: ReadableStream
private _reader: ReadableStreamDefaultReader<any>
private _reading: boolean
private _doneReading?: (value?: unknown) => void

constructor(webStream: ReadableStream, options?: object) {
super(options)
this._webStream = webStream
this._reader = webStream.getReader()
this._reading = false
this._doneReading = undefined
}

_read(): void {
if (this._reading) {
return
}
this._reading = true
const doRead = (): void => {
this._reader.read().then(res => {
if (this._doneReading) {
this._reading = false
this._reader.releaseLock()
this._doneReading()
}
if (res.done) {
this.push(null)
this._reading = false
this._reader.releaseLock()
return
}
if (this.push(res.value)) {
return doRead()
} else {
this._reading = false
this._reader.releaseLock()
}
})
}
doRead()
}

_destroy(
err: Error | null,
callback: (err: Error | null | undefined) => void,
): void {
if (this._reading) {
const promise = new Promise(resolve => {
this._doneReading = resolve
})
promise.then(() => this._handleDestroy(err, callback))
} else {
this._handleDestroy(err, callback)
}
}

_handleDestroy(
err: Error | null,
callback: (err: Error | null | undefined) => void,
): void {
this._webStream.cancel()
super._destroy(err, callback)
}
}
2 changes: 2 additions & 0 deletions packages/api-bzz-browser/types/index.d.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { BaseBzz, BzzConfig, DirectoryData, UploadOptions } from '@erebos/api-bzz-base';
import { hexValue } from '@erebos/hex';
import { Readable } from 'readable-stream';
export * from '@erebos/api-bzz-base';
export declare class Bzz extends BaseBzz<Response> {
constructor(config: BzzConfig);
protected normalizeStream(stream: ReadableStream): Readable;
uploadDirectory(directory: DirectoryData, options?: UploadOptions): Promise<hexValue>;
}
11 changes: 11 additions & 0 deletions packages/api-bzz-browser/types/utils.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { Readable } from 'readable-stream';
export declare class NodeReadable extends Readable {
private _webStream;
private _reader;
private _reading;
private _doneReading?;
constructor(webStream: ReadableStream, options?: object);
_read(size?: number): void;
_destroy(err: Error | null, callback: (err: Error | null | undefined) => void): void;
_handleDestroy(err: Error | null, callback: (err: Error | null | undefined) => void): void;
}
3 changes: 1 addition & 2 deletions packages/api-bzz-node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@
"fs-extra": "^8.0.1",
"node-fetch": "^2.6.0",
"rxjs": "^6.5.3",
"tar-fs": "^2.0.0",
"tar-stream": "^2.1.0"
"tar-fs": "^2.0.0"
},
"devDependencies": {
"@types/fs-extra": "^8.0.0",
Expand Down

0 comments on commit bac02e1

Please sign in to comment.