Skip to content

Commit b3cfaab

Browse files
Jarred-Sumnerautofix-ci[bot]Claude Botclaudepfgithub
authored
Fix: after pausing stdin, a subprocess should be able to read from stdin (#23341)
Fixes #23333, Fixes #13978 ### What does this PR do? ### How did you verify your code works? --------- Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> Co-authored-by: Claude Bot <[email protected]> Co-authored-by: Claude <[email protected]> Co-authored-by: pfg <[email protected]> Co-authored-by: Zack Radisic <[email protected]>
1 parent 8574d62 commit b3cfaab

16 files changed

+647
-143
lines changed

src/bun.js/api/server/FileRoute.zig

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -397,7 +397,9 @@ const StreamTransfer = struct {
397397
log("max_size reached, ending stream", .{});
398398
if (this.route.server) |server| {
399399
// dont need to ref because we are already holding a ref and will be derefed in onReaderDone
400-
this.reader.pause();
400+
if (!bun.Environment.isPosix) {
401+
this.reader.pause();
402+
}
401403
// we cannot free inside onReadChunk this would be UAF so we schedule it to be done in the next event loop tick
402404
this.eof_task = jsc.AnyTask.New(StreamTransfer, StreamTransfer.onReaderDone).init(this);
403405
server.vm().enqueueTask(jsc.Task.init(&this.eof_task.?));
@@ -430,7 +432,9 @@ const StreamTransfer = struct {
430432
// pause the reader so deref until onWritable
431433
defer this.deref();
432434
this.resp.onWritable(*StreamTransfer, onWritable, this);
433-
this.reader.pause();
435+
if (!bun.Environment.isPosix) {
436+
this.reader.pause();
437+
}
434438
return false;
435439
},
436440
.want_more => {

src/bun.js/api/streams.classes.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,10 @@ function source(name) {
7272
fn: "setRawModeFromJS",
7373
length: 1,
7474
},
75+
setFlowing: {
76+
fn: "setFlowingFromJS",
77+
length: 1,
78+
},
7579
}
7680
: {}),
7781
},

src/bun.js/webcore/FileReader.zig

Lines changed: 35 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ lazy: Lazy = .{ .none = {} },
1818
buffered: std.ArrayListUnmanaged(u8) = .{},
1919
read_inside_on_pull: ReadDuringJSOnPullResult = .{ .none = {} },
2020
highwater_mark: usize = 16384,
21+
flowing: bool = true,
2122

2223
pub const IOReader = bun.io.BufferedReader;
2324
pub const Poll = IOReader;
@@ -487,6 +488,14 @@ pub fn onPull(this: *FileReader, buffer: []u8, array: jsc.JSValue) streams.Resul
487488
}
488489

489490
if (!this.reader.hasPendingRead()) {
491+
// If not flowing (paused), don't initiate new reads
492+
if (!this.flowing) {
493+
log("onPull({d}) = pending (not flowing)", .{buffer.len});
494+
this.pending_value.set(this.parent().globalThis, array);
495+
this.pending_view = buffer;
496+
return .{ .pending = &this.pending };
497+
}
498+
490499
this.read_inside_on_pull = .{ .js = buffer };
491500
this.reader.read();
492501

@@ -581,32 +590,15 @@ pub fn onReaderDone(this: *FileReader) void {
581590
}
582591
this.buffered = .{};
583592
this.pending.run();
584-
} else if (this.buffered.items.len > 0) {
585-
const this_value = this.parent().this_jsvalue;
586-
const globalThis = this.parent().globalThis;
587-
if (this_value != .zero) {
588-
if (Source.js.onDrainCallbackGetCached(this_value)) |cb| {
589-
const buffered = this.buffered;
590-
this.buffered = .{};
591-
this.parent().incrementCount();
592-
defer _ = this.parent().decrementCount();
593-
this.eventLoop().js.runCallback(
594-
cb,
595-
globalThis,
596-
.js_undefined,
597-
&.{
598-
jsc.ArrayBuffer.fromBytes(buffered.items, .Uint8Array).toJS(globalThis) catch |err| {
599-
this.pending.result = .{ .err = .{ .WeakJSValue = globalThis.takeException(err) } };
600-
return;
601-
},
602-
},
603-
);
604-
}
605-
}
606593
}
594+
// Don't handle buffered data here - it will be returned on the next onPull
595+
// This ensures proper ordering of chunks
607596
}
608597

609-
this.parent().onClose();
598+
// Only close the stream if there's no buffered data left to deliver
599+
if (this.buffered.items.len == 0) {
600+
this.parent().onClose();
601+
}
610602
if (this.waiting_for_onReaderDone) {
611603
this.waiting_for_onReaderDone = false;
612604
_ = this.parent().decrementCount();
@@ -631,6 +623,26 @@ pub fn setRawMode(this: *FileReader, flag: bool) bun.sys.Maybe(void) {
631623
return this.reader.setRawMode(flag);
632624
}
633625

626+
pub fn setFlowing(this: *FileReader, flag: bool) void {
627+
log("setFlowing({}) was={}", .{ flag, this.flowing });
628+
629+
if (this.flowing == flag) {
630+
return;
631+
}
632+
633+
this.flowing = flag;
634+
635+
if (flag) {
636+
this.reader.unpause();
637+
if (!this.reader.isDone() and !this.reader.hasPendingRead()) {
638+
// Kick off a new read if needed
639+
this.reader.read();
640+
}
641+
} else {
642+
this.reader.pause();
643+
}
644+
}
645+
634646
pub fn memoryCost(this: *const FileReader) usize {
635647
// ReadableStreamSource covers @sizeOf(FileReader)
636648
return this.reader.memoryCost() + this.buffered.capacity;

src/bun.js/webcore/ReadableStream.zig

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -577,6 +577,19 @@ pub fn NewSource(
577577
@compileError("setRawMode is not implemented on " ++ @typeName(Context));
578578
}
579579

580+
pub fn setFlowingFromJS(this: *ReadableStreamSourceType, _: *jsc.JSGlobalObject, call_frame: *jsc.CallFrame) bun.JSError!JSValue {
581+
if (@hasDecl(Context, "setFlowing")) {
582+
const flag = call_frame.argument(0);
583+
if (Environment.allow_assert) {
584+
bun.assert(flag.isBoolean());
585+
}
586+
this.context.setFlowing(flag == .true);
587+
return .js_undefined;
588+
}
589+
590+
return .js_undefined;
591+
}
592+
580593
const supports_ref = setRefUnrefFn != null;
581594

582595
pub const js = @field(jsc.Codegen, "JS" ++ name_ ++ "InternalReadableStreamSource");

0 commit comments

Comments
 (0)