Skip to content

Commit

Permalink
Fix fd offset handling in ReadStream (#10883)
Browse files Browse the repository at this point in the history
Co-authored-by: Georgijs Vilums <=>
Co-authored-by: gvilums <gvilums@users.noreply.github.com>
Co-authored-by: Jarred Sumner <jarred@jarredsumner.com>
  • Loading branch information
3 people committed May 7, 2024
1 parent 6217d78 commit d4db29c
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 55 deletions.
4 changes: 2 additions & 2 deletions src/bun.js/webcore.zig
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ fn confirm(globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callcon
// They may have said yes, but the stdin is invalid.
return .false;
};
if(next_byte == '\n'){
if (next_byte == '\n') {
return .false;
}
},
Expand All @@ -142,7 +142,7 @@ fn confirm(globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callcon
// 8. If the user responded positively, return true;
// otherwise, the user responded negatively: return false.
return .true;
}else if(next_byte == '\r'){
} else if (next_byte == '\r') {
//Check Windows style
const second_byte = reader.readByte() catch {
return .false;
Expand Down
14 changes: 14 additions & 0 deletions src/bun.js/webcore/blob.zig
Original file line number Diff line number Diff line change
Expand Up @@ -2829,6 +2829,20 @@ pub const Blob = struct {
return stream;
}

/// Host function exposed to JS (bound in `fs.ts` via `$newZigFunction` as
/// `blobToStreamWithOffset`). `this` must be a `Blob`; the first argument is the
/// byte offset at which the resulting `ReadableStream` should start reading.
///
/// A missing argument is treated as offset 0, and a negative offset is clamped
/// to 0 instead of being passed to `@intCast`.
pub fn toStreamWithOffset(
    globalThis: *JSC.JSGlobalObject,
    callframe: *JSC.CallFrame,
) callconv(.C) JSC.JSValue {
    const this = callframe.this().as(Blob) orelse @panic("this is not a Blob");
    const args = callframe.arguments(1).slice();

    // Indexing `args[0]` on an empty slice is out of bounds, so fall back to
    // the start of the file when no offset was supplied.
    const requested: i64 = if (args.len > 0) args[0].toInt64() else 0;

    // `@intCast` from a negative i64 to usize is illegal behavior; clamp first.
    const offset: usize = @intCast(@max(requested, 0));

    return JSC.WebCore.ReadableStream.fromFileBlobWithOffset(
        globalThis,
        this,
        offset,
    );
}

fn promisified(
value: JSC.JSValue,
global: *JSGlobalObject,
Expand Down
52 changes: 47 additions & 5 deletions src/bun.js/webcore/streams.zig
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,38 @@ pub const ReadableStream = struct {
}
}

/// Creates a `ReadableStream` over the file behind `blob`, with the file
/// reader configured to begin at byte `offset`.
///
/// - Returns an empty stream when `blob` has no backing store.
/// - Throws "Expected FileBlob" on `globalThis` and returns `.zero` when the
///   store is not file-backed.
pub fn fromFileBlobWithOffset(
    globalThis: *JSGlobalObject,
    blob: *const Blob,
    offset: usize,
) JSC.JSValue {
    JSC.markBinding(@src());

    const store = blob.store orelse return ReadableStream.empty(globalThis);

    // Only file-backed stores can be streamed from an offset.
    if (store.data != .file) {
        globalThis.throw("Expected FileBlob", .{});
        return .zero;
    }

    const source = FileReader.Source.new(.{
        .globalThis = globalThis,
        .context = .{
            .event_loop = JSC.EventLoopHandle.init(globalThis.bunVM().eventLoop()),
            .start_offset = offset,
            .lazy = .{
                .blob = store,
            },
        },
    });
    // The lazy reader context now holds `store`, so take a reference for it.
    store.ref();

    return source.toReadableStream(globalThis);
}

pub fn fromPipe(
globalThis: *JSGlobalObject,
parent: anytype,
Expand Down Expand Up @@ -3415,6 +3447,7 @@ pub const FileReader = struct {
pending_value: JSC.Strong = .{},
pending_view: []u8 = &.{},
fd: bun.FileDescriptor = bun.invalid_fd,
start_offset: ?usize = null,
started: bool = false,
waiting_for_onReaderDone: bool = false,
event_loop: JSC.EventLoopHandle,
Expand Down Expand Up @@ -3606,11 +3639,20 @@ pub const FileReader = struct {
if (was_lazy) {
_ = this.parent().incrementCount();
this.waiting_for_onReaderDone = true;
switch (this.reader.start(this.fd, pollable)) {
.result => {},
.err => |e| {
return .{ .err = e };
},
if (this.start_offset) |offset| {
switch (this.reader.startFileOffset(this.fd, pollable, offset)) {
.result => {},
.err => |e| {
return .{ .err = e };
},
}
} else {
switch (this.reader.start(this.fd, pollable)) {
.result => {},
.err => |e| {
return .{ .err = e };
},
}
}
} else if (comptime Environment.isPosix) {
if (this.reader.flags.pollable and !this.reader.isDone()) {
Expand Down
57 changes: 49 additions & 8 deletions src/io/PipeReader.zig
Original file line number Diff line number Diff line change
Expand Up @@ -89,23 +89,41 @@ pub fn PosixPipeReader(
return false;
}

/// Adapts a two-argument read function `(fd, buffer)` to the three-argument
/// `(fd, buffer, offset)` shape. The adapter ignores `offset` and forwards the
/// other two arguments to `func` unchanged.
fn wrapReadFn(comptime func: *const fn (bun.FileDescriptor, []u8) JSC.Maybe(usize)) *const fn (bun.FileDescriptor, []u8, usize) JSC.Maybe(usize) {
    const Adapter = struct {
        pub fn call(fd: bun.FileDescriptor, buffer: []u8, _: usize) JSC.Maybe(usize) {
            return func(fd, buffer);
        }
    };
    return Adapter.call;
}

fn readFile(parent: *This, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize, received_hup: bool) void {
return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .file, bun.sys.read);
const preadFn = struct {
pub fn call(fd1: bun.FileDescriptor, buffer: []u8, offset: usize) JSC.Maybe(usize) {
return bun.sys.pread(fd1, buffer, @intCast(offset));
}
}.call;
if (parent.flags.use_pread) {
return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .file, preadFn);
} else {
return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .file, wrapReadFn(bun.sys.read));
}
}

fn readSocket(parent: *This, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize, received_hup: bool) void {
return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .socket, bun.sys.recvNonBlock);
return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .socket, wrapReadFn(bun.sys.recvNonBlock));
}

fn readPipe(parent: *This, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize, received_hup: bool) void {
return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .nonblocking_pipe, bun.sys.readNonblocking);
return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .nonblocking_pipe, wrapReadFn(bun.sys.readNonblocking));
}

fn readBlockingPipe(parent: *This, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize, received_hup: bool) void {
return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .pipe, bun.sys.readNonblocking);
return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .pipe, wrapReadFn(bun.sys.readNonblocking));
}

fn readWithFn(parent: *This, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize, received_hup_: bool, comptime file_type: FileType, comptime sys_fn: *const fn (bun.FileDescriptor, []u8) JSC.Maybe(usize)) void {
fn readWithFn(parent: *This, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize, received_hup_: bool, comptime file_type: FileType, comptime sys_fn: *const fn (bun.FileDescriptor, []u8, usize) JSC.Maybe(usize)) void {
_ = size_hint; // autofix
const streaming = parent.vtable.isStreamingEnabled();

Expand All @@ -122,8 +140,10 @@ pub fn PosixPipeReader(
switch (sys_fn(
fd,
buffer,
parent._offset,
)) {
.result => |bytes_read| {
parent._offset += bytes_read;
buffer = stack_buffer_head[0..bytes_read];
stack_buffer_head = stack_buffer_head[bytes_read..];

Expand Down Expand Up @@ -217,8 +237,9 @@ pub fn PosixPipeReader(
resizable_buffer.ensureUnusedCapacity(16 * 1024) catch bun.outOfMemory();
var buffer: []u8 = resizable_buffer.unusedCapacitySlice();

switch (sys_fn(fd, buffer)) {
switch (sys_fn(fd, buffer, parent._offset)) {
.result => |bytes_read| {
parent._offset += bytes_read;
buffer = buffer[0..bytes_read];
resizable_buffer.items.len += bytes_read;

Expand Down Expand Up @@ -402,7 +423,7 @@ pub fn WindowsPipeReader(
source.setData(this);
const buf = this.getReadBufferWithStableMemoryAddress(64 * 1024);
file.iov = uv.uv_buf_t.init(buf);
if (uv.uv_fs_read(uv.Loop.get(), &file.fs, file.file, @ptrCast(&file.iov), 1, -1, onFileRead).toError(.write)) |err| {
if (uv.uv_fs_read(uv.Loop.get(), &file.fs, file.file, @ptrCast(&file.iov), 1, if (this.flags.use_pread) @intCast(this._offset) else -1, onFileRead).toError(.write)) |err| {
this.flags.is_paused = true;
// we should inform the error if we are unable to keep reading
this.onRead(.{ .err = err }, "", .progress);
Expand All @@ -413,6 +434,7 @@ pub fn WindowsPipeReader(
}

const len: usize = @intCast(nread_int);
this._offset += len;
// we got some data lets get the current iov
if (this.source) |source| {
if (source == .file) {
Expand All @@ -439,7 +461,7 @@ pub fn WindowsPipeReader(
source.setData(this);
const buf = this.getReadBufferWithStableMemoryAddress(64 * 1024);
file.iov = uv.uv_buf_t.init(buf);
if (uv.uv_fs_read(uv.Loop.get(), &file.fs, file.file, @ptrCast(&file.iov), 1, -1, onFileRead).toError(.write)) |err| {
if (uv.uv_fs_read(uv.Loop.get(), &file.fs, file.file, @ptrCast(&file.iov), 1, if (this.flags.use_pread) @intCast(this._offset) else -1, onFileRead).toError(.write)) |err| {
return .{ .err = err };
}
},
Expand Down Expand Up @@ -650,6 +672,7 @@ const BufferedReaderVTable = struct {
const PosixBufferedReader = struct {
handle: PollOrFd = .{ .closed = {} },
_buffer: std.ArrayList(u8) = std.ArrayList(u8).init(bun.default_allocator),
_offset: usize = 0,
vtable: BufferedReaderVTable,
flags: Flags = .{},

Expand All @@ -662,6 +685,7 @@ const PosixBufferedReader = struct {
closed_without_reporting: bool = false,
close_handle: bool = true,
memfd: bool = false,
use_pread: bool = false,
};

pub fn init(comptime Type: type) PosixBufferedReader {
Expand All @@ -683,6 +707,7 @@ const PosixBufferedReader = struct {
to.* = .{
.handle = other.handle,
._buffer = other.buffer().*,
._offset = other._offset,
.flags = other.flags,
.vtable = .{
.fns = to.vtable.fns,
Expand All @@ -692,6 +717,7 @@ const PosixBufferedReader = struct {
other.buffer().* = std.ArrayList(u8).init(bun.default_allocator);
other.flags.is_done = true;
other.handle = .{ .closed = {} };
other._offset = 0;
to.handle.setOwner(to);

// note: the caller is supposed to drain the buffer themselves
Expand Down Expand Up @@ -879,6 +905,12 @@ const PosixBufferedReader = struct {
};
}

/// Starts reading `fd` from byte position `offset`.
///
/// Sets the `use_pread` flag and stores `offset` in `_offset`, then delegates
/// to `start`, whose result is returned unchanged.
pub fn startFileOffset(this: *PosixBufferedReader, fd: bun.FileDescriptor, poll: bool, offset: usize) bun.JSC.Maybe(void) {
    this.flags.use_pread = true;
    this._offset = offset;
    return this.start(fd, poll);
}

// Exists for consistency with Windows.
pub fn hasPendingRead(this: *const PosixBufferedReader) bool {
return this.handle == .poll and this.handle.poll.isRegistered();
Expand Down Expand Up @@ -927,6 +959,7 @@ pub const WindowsBufferedReader = struct {
/// The pointer to this pipe must be stable.
/// It cannot change because we don't know what libuv will do with it.
source: ?Source = null,
_offset: usize = 0,
_buffer: std.ArrayList(u8) = std.ArrayList(u8).init(bun.default_allocator),
// for compatibility with Linux
flags: Flags = .{},
Expand All @@ -948,6 +981,7 @@ pub const WindowsBufferedReader = struct {

is_paused: bool = true,
has_inflight_read: bool = false,
use_pread: bool = false,
};

pub fn init(comptime Type: type) WindowsOutputReader {
Expand All @@ -970,6 +1004,7 @@ pub const WindowsBufferedReader = struct {
.vtable = to.vtable,
.flags = other.flags,
._buffer = other.buffer().*,
._offset = other._offset,
.source = other.source,
};
other.flags.is_done = true;
Expand Down Expand Up @@ -1103,6 +1138,12 @@ pub const WindowsBufferedReader = struct {
return this.startWithCurrentPipe();
}

/// Starts reading `fd` from byte position `offset`.
///
/// Sets the `use_pread` flag and stores `offset` in `_offset`, then delegates
/// to `start`, whose result is returned unchanged.
pub fn startFileOffset(this: *WindowsOutputReader, fd: bun.FileDescriptor, poll: bool, offset: usize) bun.JSC.Maybe(void) {
    this.flags.use_pread = true;
    this._offset = offset;
    return this.start(fd, poll);
}

pub fn deinit(this: *WindowsOutputReader) void {
this.buffer().deinit();
const source = this.source orelse return;
Expand Down
53 changes: 13 additions & 40 deletions src/js/node/fs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,8 @@ var defaultReadStreamOptions = {

let kHandle = Symbol("kHandle");

const blobToStreamWithOffset = $newZigFunction("blob.zig", "Blob.toStreamWithOffset", 1);

var ReadStreamClass;

ReadStream = (function (InternalReadStream) {
Expand Down Expand Up @@ -633,7 +635,7 @@ ReadStream = (function (InternalReadStream) {

// Get the stream controller
// We need the pointer to the underlying stream controller for the NativeReadable
var stream = fileRef.stream();
const stream = blobToStreamWithOffset.$apply(fileRef, [start]);
var ptr = stream.$bunNativePtr;
if (!ptr) {
throw new Error("Failed to get internal stream controller. This is a bug in Bun");
Expand Down Expand Up @@ -672,9 +674,8 @@ ReadStream = (function (InternalReadStream) {
this._readableState.autoClose = autoDestroy = autoClose;
this._readableState.highWaterMark = highWaterMark;

if (start !== undefined) {
this.pos = start;
}
this.pos = start || 0;
this.bytesRead = start || 0;

$assert(overridden_fs);
this.#fs = overridden_fs;
Expand Down Expand Up @@ -740,51 +741,23 @@ ReadStream = (function (InternalReadStream) {
}

push(chunk) {
// Is it even possible for this to be less than 1?
var bytesRead = chunk?.length ?? 0;
let bytesRead = chunk?.length ?? 0;
if (bytesRead > 0) {
this.bytesRead += bytesRead;
var currPos = this.pos;
// Handle case of going through bytes before pos if bytesRead is less than pos
// If pos is undefined, we are reading through the whole file
// Otherwise we started from somewhere in the middle of the file
if (currPos !== undefined) {
// At this point we still haven't hit our `start` point
// We should discard this chunk and exit
if (this.bytesRead < currPos) {
return true;
}
// At this point, bytes read is greater than our starting position
// If the current position is still the starting position, that means
// this is the first chunk where we care about the bytes read
// and we need to subtract the bytes read from the start position (n) and slice the last n bytes
if (currPos === this.start) {
var n = this.bytesRead - currPos;
chunk = chunk.slice(-n);
var [_, ...rest] = arguments;
this.pos = this.bytesRead;
if (this.end !== undefined && this.bytesRead > this.end) {
chunk = chunk.slice(0, this.end - this.start + 1);
}
return super.push(chunk, ...rest);
}
var end = this.end;
// This is multi-chunk read case where we go passed the end of the what we want to read in the last chunk
if (end !== undefined && this.bytesRead > end) {
chunk = chunk.slice(0, end - currPos + 1);
var [_, ...rest] = arguments;
this.pos = this.bytesRead;
return super.push(chunk, ...rest);
}
let end = this.end;
// truncate the chunk if we go past the end
if (end !== undefined && this.bytesRead > end) {
chunk = chunk.slice(0, end - this.pos + 1);
var [_, ...rest] = arguments;
this.pos = this.bytesRead;
return super.push(chunk, ...rest);
}
this.pos = this.bytesRead;
}

return super.push(...arguments);
}

// #

// n should be the highwatermark passed from Readable.read when calling internal _read (_read is set to this private fn in this class)
#internalRead(n) {
// pos is the current position in the file
Expand Down

0 comments on commit d4db29c

Please sign in to comment.