Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix fd offset handling in ReadStream #10883

Merged
merged 10 commits into from
May 7, 2024
4 changes: 2 additions & 2 deletions src/bun.js/webcore.zig
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ fn confirm(globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callcon
// They may have said yes, but the stdin is invalid.
return .false;
};
if(next_byte == '\n'){
if (next_byte == '\n') {
return .false;
}
},
Expand All @@ -142,7 +142,7 @@ fn confirm(globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callcon
// 8. If the user responded positively, return true;
// otherwise, the user responded negatively: return false.
return .true;
}else if(next_byte == '\r'){
} else if (next_byte == '\r') {
//Check Windows style
const second_byte = reader.readByte() catch {
return .false;
Expand Down
14 changes: 14 additions & 0 deletions src/bun.js/webcore/blob.zig
Original file line number Diff line number Diff line change
Expand Up @@ -2829,6 +2829,20 @@ pub const Blob = struct {
return stream;
}

pub fn toStreamWithOffset(
globalThis: *JSC.JSGlobalObject,
callframe: *JSC.CallFrame,
) callconv(.C) JSC.JSValue {
const this = callframe.this().as(Blob) orelse @panic("this is not a Blob");
const args = callframe.arguments(1).slice();

return JSC.WebCore.ReadableStream.fromFileBlobWithOffset(
globalThis,
this,
@intCast(args[0].toInt64()),
);
}

fn promisified(
value: JSC.JSValue,
global: *JSGlobalObject,
Expand Down
52 changes: 47 additions & 5 deletions src/bun.js/webcore/streams.zig
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,38 @@ pub const ReadableStream = struct {
}
}

pub fn fromFileBlobWithOffset(
globalThis: *JSGlobalObject,
blob: *const Blob,
offset: usize,
) JSC.JSValue {
JSC.markBinding(@src());
var store = blob.store orelse {
return ReadableStream.empty(globalThis);
};
switch (store.data) {
.file => {
var reader = FileReader.Source.new(.{
.globalThis = globalThis,
.context = .{
.event_loop = JSC.EventLoopHandle.init(globalThis.bunVM().eventLoop()),
.start_offset = offset,
.lazy = .{
.blob = store,
},
},
});
store.ref();

return reader.toReadableStream(globalThis);
},
else => {
globalThis.throw("Expected FileBlob", .{});
return .zero;
gvilums marked this conversation as resolved.
Show resolved Hide resolved
},
}
}

pub fn fromPipe(
globalThis: *JSGlobalObject,
parent: anytype,
Expand Down Expand Up @@ -3415,6 +3447,7 @@ pub const FileReader = struct {
pending_value: JSC.Strong = .{},
pending_view: []u8 = &.{},
fd: bun.FileDescriptor = bun.invalid_fd,
start_offset: ?usize = null,
started: bool = false,
waiting_for_onReaderDone: bool = false,
event_loop: JSC.EventLoopHandle,
Expand Down Expand Up @@ -3606,11 +3639,20 @@ pub const FileReader = struct {
if (was_lazy) {
_ = this.parent().incrementCount();
this.waiting_for_onReaderDone = true;
switch (this.reader.start(this.fd, pollable)) {
.result => {},
.err => |e| {
return .{ .err = e };
},
if (this.start_offset) |offset| {
switch (this.reader.startFileOffset(this.fd, pollable, offset)) {
.result => {},
.err => |e| {
return .{ .err = e };
},
}
} else {
switch (this.reader.start(this.fd, pollable)) {
.result => {},
.err => |e| {
return .{ .err = e };
},
}
}
} else if (comptime Environment.isPosix) {
if (this.reader.flags.pollable and !this.reader.isDone()) {
Expand Down
57 changes: 49 additions & 8 deletions src/io/PipeReader.zig
Original file line number Diff line number Diff line change
Expand Up @@ -89,23 +89,41 @@ pub fn PosixPipeReader(
return false;
}

fn wrapReadFn(comptime func: *const fn (bun.FileDescriptor, []u8) JSC.Maybe(usize)) *const fn (bun.FileDescriptor, []u8, usize) JSC.Maybe(usize) {
return struct {
pub fn call(fd: bun.FileDescriptor, buffer: []u8, offset: usize) JSC.Maybe(usize) {
_ = offset;
return func(fd, buffer);
}
}.call;
}

fn readFile(parent: *This, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize, received_hup: bool) void {
return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .file, bun.sys.read);
const preadFn = struct {
pub fn call(fd1: bun.FileDescriptor, buffer: []u8, offset: usize) JSC.Maybe(usize) {
return bun.sys.pread(fd1, buffer, @intCast(offset));
}
}.call;
if (parent.flags.use_pread) {
return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .file, preadFn);
} else {
return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .file, wrapReadFn(bun.sys.read));
}
}

fn readSocket(parent: *This, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize, received_hup: bool) void {
return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .socket, bun.sys.recvNonBlock);
return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .socket, wrapReadFn(bun.sys.recvNonBlock));
}

fn readPipe(parent: *This, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize, received_hup: bool) void {
return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .nonblocking_pipe, bun.sys.readNonblocking);
return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .nonblocking_pipe, wrapReadFn(bun.sys.readNonblocking));
}

fn readBlockingPipe(parent: *This, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize, received_hup: bool) void {
return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .pipe, bun.sys.readNonblocking);
return readWithFn(parent, resizable_buffer, fd, size_hint, received_hup, .pipe, wrapReadFn(bun.sys.readNonblocking));
}

fn readWithFn(parent: *This, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize, received_hup_: bool, comptime file_type: FileType, comptime sys_fn: *const fn (bun.FileDescriptor, []u8) JSC.Maybe(usize)) void {
fn readWithFn(parent: *This, resizable_buffer: *std.ArrayList(u8), fd: bun.FileDescriptor, size_hint: isize, received_hup_: bool, comptime file_type: FileType, comptime sys_fn: *const fn (bun.FileDescriptor, []u8, usize) JSC.Maybe(usize)) void {
_ = size_hint; // autofix
const streaming = parent.vtable.isStreamingEnabled();

Expand All @@ -122,8 +140,10 @@ pub fn PosixPipeReader(
switch (sys_fn(
fd,
buffer,
parent._offset,
)) {
.result => |bytes_read| {
parent._offset += bytes_read;
buffer = stack_buffer_head[0..bytes_read];
stack_buffer_head = stack_buffer_head[bytes_read..];

Expand Down Expand Up @@ -217,8 +237,9 @@ pub fn PosixPipeReader(
resizable_buffer.ensureUnusedCapacity(16 * 1024) catch bun.outOfMemory();
var buffer: []u8 = resizable_buffer.unusedCapacitySlice();

switch (sys_fn(fd, buffer)) {
switch (sys_fn(fd, buffer, parent._offset)) {
.result => |bytes_read| {
parent._offset += bytes_read;
buffer = buffer[0..bytes_read];
resizable_buffer.items.len += bytes_read;

Expand Down Expand Up @@ -402,7 +423,7 @@ pub fn WindowsPipeReader(
source.setData(this);
const buf = this.getReadBufferWithStableMemoryAddress(64 * 1024);
file.iov = uv.uv_buf_t.init(buf);
if (uv.uv_fs_read(uv.Loop.get(), &file.fs, file.file, @ptrCast(&file.iov), 1, -1, onFileRead).toError(.write)) |err| {
if (uv.uv_fs_read(uv.Loop.get(), &file.fs, file.file, @ptrCast(&file.iov), 1, if (this.flags.use_pread) @intCast(this._offset) else -1, onFileRead).toError(.write)) |err| {
this.flags.is_paused = true;
// we should inform the error if we are unable to keep reading
this.onRead(.{ .err = err }, "", .progress);
Expand All @@ -413,6 +434,7 @@ pub fn WindowsPipeReader(
}

const len: usize = @intCast(nread_int);
this._offset += len;
// we got some data lets get the current iov
if (this.source) |source| {
if (source == .file) {
Expand All @@ -439,7 +461,7 @@ pub fn WindowsPipeReader(
source.setData(this);
const buf = this.getReadBufferWithStableMemoryAddress(64 * 1024);
file.iov = uv.uv_buf_t.init(buf);
if (uv.uv_fs_read(uv.Loop.get(), &file.fs, file.file, @ptrCast(&file.iov), 1, -1, onFileRead).toError(.write)) |err| {
if (uv.uv_fs_read(uv.Loop.get(), &file.fs, file.file, @ptrCast(&file.iov), 1, if (this.flags.use_pread) @intCast(this._offset) else -1, onFileRead).toError(.write)) |err| {
return .{ .err = err };
}
},
Expand Down Expand Up @@ -650,6 +672,7 @@ const BufferedReaderVTable = struct {
const PosixBufferedReader = struct {
handle: PollOrFd = .{ .closed = {} },
_buffer: std.ArrayList(u8) = std.ArrayList(u8).init(bun.default_allocator),
_offset: usize = 0,
vtable: BufferedReaderVTable,
flags: Flags = .{},

Expand All @@ -662,6 +685,7 @@ const PosixBufferedReader = struct {
closed_without_reporting: bool = false,
close_handle: bool = true,
memfd: bool = false,
use_pread: bool = false,
};

pub fn init(comptime Type: type) PosixBufferedReader {
Expand All @@ -683,6 +707,7 @@ const PosixBufferedReader = struct {
to.* = .{
.handle = other.handle,
._buffer = other.buffer().*,
._offset = other._offset,
.flags = other.flags,
.vtable = .{
.fns = to.vtable.fns,
Expand All @@ -692,6 +717,7 @@ const PosixBufferedReader = struct {
other.buffer().* = std.ArrayList(u8).init(bun.default_allocator);
other.flags.is_done = true;
other.handle = .{ .closed = {} };
other._offset = 0;
to.handle.setOwner(to);

// note: the caller is supposed to drain the buffer themselves
Expand Down Expand Up @@ -879,6 +905,12 @@ const PosixBufferedReader = struct {
};
}

pub fn startFileOffset(this: *PosixBufferedReader, fd: bun.FileDescriptor, poll: bool, offset: usize) bun.JSC.Maybe(void) {
this._offset = offset;
this.flags.use_pread = true;
return this.start(fd, poll);
}

// Exists for consistentcy with Windows.
pub fn hasPendingRead(this: *const PosixBufferedReader) bool {
return this.handle == .poll and this.handle.poll.isRegistered();
Expand Down Expand Up @@ -927,6 +959,7 @@ pub const WindowsBufferedReader = struct {
/// The pointer to this pipe must be stable.
/// It cannot change because we don't know what libuv will do with it.
source: ?Source = null,
_offset: usize = 0,
_buffer: std.ArrayList(u8) = std.ArrayList(u8).init(bun.default_allocator),
// for compatibility with Linux
flags: Flags = .{},
Expand All @@ -948,6 +981,7 @@ pub const WindowsBufferedReader = struct {

is_paused: bool = true,
has_inflight_read: bool = false,
use_pread: bool = false,
gvilums marked this conversation as resolved.
Show resolved Hide resolved
};

pub fn init(comptime Type: type) WindowsOutputReader {
Expand All @@ -970,6 +1004,7 @@ pub const WindowsBufferedReader = struct {
.vtable = to.vtable,
.flags = other.flags,
._buffer = other.buffer().*,
._offset = other._offset,
.source = other.source,
};
other.flags.is_done = true;
Expand Down Expand Up @@ -1103,6 +1138,12 @@ pub const WindowsBufferedReader = struct {
return this.startWithCurrentPipe();
}

pub fn startFileOffset(this: *WindowsOutputReader, fd: bun.FileDescriptor, poll: bool, offset: usize) bun.JSC.Maybe(void) {
this._offset = offset;
this.flags.use_pread = true;
return this.start(fd, poll);
}

pub fn deinit(this: *WindowsOutputReader) void {
this.buffer().deinit();
const source = this.source orelse return;
Expand Down
53 changes: 13 additions & 40 deletions src/js/node/fs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,8 @@ var defaultReadStreamOptions = {

let kHandle = Symbol("kHandle");

const blobToStreamWithOffset = $newZigFunction("blob.zig", "Blob.toStreamWithOffset", 1);

var ReadStreamClass;

ReadStream = (function (InternalReadStream) {
Expand Down Expand Up @@ -633,7 +635,7 @@ ReadStream = (function (InternalReadStream) {

// Get the stream controller
// We need the pointer to the underlying stream controller for the NativeReadable
var stream = fileRef.stream();
const stream = blobToStreamWithOffset.$apply(fileRef, [start]);
var ptr = stream.$bunNativePtr;
if (!ptr) {
throw new Error("Failed to get internal stream controller. This is a bug in Bun");
Expand Down Expand Up @@ -672,9 +674,8 @@ ReadStream = (function (InternalReadStream) {
this._readableState.autoClose = autoDestroy = autoClose;
this._readableState.highWaterMark = highWaterMark;

if (start !== undefined) {
this.pos = start;
}
this.pos = start || 0;
this.bytesRead = start || 0;

$assert(overridden_fs);
this.#fs = overridden_fs;
Expand Down Expand Up @@ -740,51 +741,23 @@ ReadStream = (function (InternalReadStream) {
}

push(chunk) {
// Is it even possible for this to be less than 1?
var bytesRead = chunk?.length ?? 0;
let bytesRead = chunk?.length ?? 0;
if (bytesRead > 0) {
this.bytesRead += bytesRead;
var currPos = this.pos;
// Handle case of going through bytes before pos if bytesRead is less than pos
// If pos is undefined, we are reading through the whole file
// Otherwise we started from somewhere in the middle of the file
if (currPos !== undefined) {
// At this point we still haven't hit our `start` point
// We should discard this chunk and exit
if (this.bytesRead < currPos) {
return true;
}
// At this point, bytes read is greater than our starting position
// If the current position is still the starting position, that means
// this is the first chunk where we care about the bytes read
// and we need to subtract the bytes read from the start position (n) and slice the last n bytes
if (currPos === this.start) {
var n = this.bytesRead - currPos;
chunk = chunk.slice(-n);
var [_, ...rest] = arguments;
this.pos = this.bytesRead;
if (this.end !== undefined && this.bytesRead > this.end) {
chunk = chunk.slice(0, this.end - this.start + 1);
}
return super.push(chunk, ...rest);
}
var end = this.end;
// This is multi-chunk read case where we go passed the end of the what we want to read in the last chunk
if (end !== undefined && this.bytesRead > end) {
chunk = chunk.slice(0, end - currPos + 1);
var [_, ...rest] = arguments;
this.pos = this.bytesRead;
return super.push(chunk, ...rest);
}
let end = this.end;
// truncate the chunk if we go past the end
if (end !== undefined && this.bytesRead > end) {
chunk = chunk.slice(0, end - this.pos + 1);
var [_, ...rest] = arguments;
this.pos = this.bytesRead;
return super.push(chunk, ...rest);
}
this.pos = this.bytesRead;
}

return super.push(...arguments);
}

// #

// n should be the highwatermark passed from Readable.read when calling internal _read (_read is set to this private fn in this class)
#internalRead(n) {
// pos is the current position in the file
Expand Down