mirror of
https://github.com/oven-sh/bun
synced 2026-02-03 15:38:46 +00:00
Compare commits
6 Commits
dylan/pyth
...
user/marko
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f746aa7c91 | ||
|
|
aa5a301a1a | ||
|
|
3564991a89 | ||
|
|
6e305460c1 | ||
|
|
ddec6fc3ce | ||
|
|
57b7876286 |
@@ -4,7 +4,7 @@ register_repository(
|
||||
REPOSITORY
|
||||
libuv/libuv
|
||||
COMMIT
|
||||
da527d8d2a908b824def74382761566371439003
|
||||
5152db2cbfeb5582e9c27c5ea1dba2cd9e10759b
|
||||
)
|
||||
|
||||
if(WIN32)
|
||||
|
||||
7
repro-hell/beancounter.ts
Normal file
7
repro-hell/beancounter.ts
Normal file
@@ -0,0 +1,7 @@
|
||||
let count = 0;
|
||||
process.stdin.on("data", chunk => {
|
||||
count += chunk.length;
|
||||
});
|
||||
process.stdin.on("end", () => {
|
||||
console.log(count);
|
||||
});
|
||||
1
repro-hell/reader.ts
Normal file
1
repro-hell/reader.ts
Normal file
@@ -0,0 +1 @@
|
||||
process.stdin.pipe(process.stdout);
|
||||
@@ -1,3 +1,5 @@
|
||||
const mlog = @import("../../../../mlog.zig").log;
|
||||
|
||||
pub fn NewStaticPipeWriter(comptime ProcessType: type) type {
|
||||
return struct {
|
||||
const This = @This();
|
||||
@@ -35,6 +37,7 @@ pub fn NewStaticPipeWriter(comptime ProcessType: type) type {
|
||||
}
|
||||
|
||||
pub fn close(this: *This) void {
|
||||
mlog("StaticPipeWriter(0x{x}) close()\n", .{@intFromPtr(this)});
|
||||
log("StaticPipeWriter(0x{x}) close()", .{@intFromPtr(this)});
|
||||
this.writer.close();
|
||||
}
|
||||
@@ -60,6 +63,14 @@ pub fn NewStaticPipeWriter(comptime ProcessType: type) type {
|
||||
}
|
||||
|
||||
pub fn start(this: *This) bun.sys.Maybe(void) {
|
||||
mlog(
|
||||
"StaticPipeWriter(0x{x}, writer={s}) start() LIFECYCLE: buffer_size={d}\n",
|
||||
.{
|
||||
@intFromPtr(this),
|
||||
@typeName(@TypeOf(this.writer)),
|
||||
this.source.slice().len,
|
||||
},
|
||||
);
|
||||
log("StaticPipeWriter(0x{x}) start()", .{@intFromPtr(this)});
|
||||
this.ref();
|
||||
this.buffer = this.source.slice();
|
||||
@@ -83,19 +94,25 @@ pub fn NewStaticPipeWriter(comptime ProcessType: type) type {
|
||||
|
||||
pub fn onWrite(this: *This, amount: usize, status: bun.io.WriteStatus) void {
|
||||
log("StaticPipeWriter(0x{x}) onWrite(amount={d} {})", .{ @intFromPtr(this), amount, status });
|
||||
mlog("StaticPipeWriter(0x{x}) onWrite(amount={d} {}) LIFECYCLE: buffer_remaining_before={d}\n", .{ @intFromPtr(this), amount, status, this.buffer.len });
|
||||
this.buffer = this.buffer[@min(amount, this.buffer.len)..];
|
||||
const remaining_after = this.buffer.len;
|
||||
mlog("StaticPipeWriter(0x{x}) LIFECYCLE: buffer_remaining_after={d} will_close={}\n", .{ @intFromPtr(this), remaining_after, (status == .end_of_file or remaining_after == 0) });
|
||||
if (status == .end_of_file or this.buffer.len == 0) {
|
||||
mlog("StaticPipeWriter(0x{x}) LIFECYCLE: closing writer (finished sending all data)\n", .{@intFromPtr(this)});
|
||||
this.writer.close();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn onError(this: *This, err: bun.sys.Error) void {
|
||||
log("StaticPipeWriter(0x{x}) onError(err={any})", .{ @intFromPtr(this), err });
|
||||
mlog("StaticPipeWriter(0x{x}) onError(err={any})\n", .{ @intFromPtr(this), err });
|
||||
this.source.detach();
|
||||
}
|
||||
|
||||
pub fn onClose(this: *This) void {
|
||||
log("StaticPipeWriter(0x{x}) onClose()", .{@intFromPtr(this)});
|
||||
mlog("StaticPipeWriter(0x{x}) onClose() LIFECYCLE: stdin writer closed, notifying subprocess\n", .{@intFromPtr(this)});
|
||||
this.source.detach();
|
||||
this.process.onCloseIO(.stdin);
|
||||
}
|
||||
|
||||
@@ -31,10 +31,10 @@
|
||||
*/
|
||||
|
||||
#define UV_VERSION_MAJOR 1
|
||||
#define UV_VERSION_MINOR 50
|
||||
#define UV_VERSION_PATCH 1
|
||||
#define UV_VERSION_IS_RELEASE 0
|
||||
#define UV_VERSION_SUFFIX "dev"
|
||||
#define UV_VERSION_MINOR 51
|
||||
#define UV_VERSION_PATCH 0
|
||||
#define UV_VERSION_IS_RELEASE 1
|
||||
#define UV_VERSION_SUFFIX ""
|
||||
|
||||
#define UV_VERSION_HEX ((UV_VERSION_MAJOR << 16) | \
|
||||
(UV_VERSION_MINOR << 8) | \
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
const mlog = @import("../mlog.zig").log;
|
||||
const RareData = @This();
|
||||
|
||||
websocket_deflate: ?*WebSocketDeflate.RareData = null,
|
||||
@@ -376,6 +377,7 @@ pub fn stdout(rare: *RareData) *Blob.Store {
|
||||
|
||||
pub fn stdin(rare: *RareData) *Blob.Store {
|
||||
bun.analytics.Features.@"Bun.stdin" += 1;
|
||||
mlog("RareData stdin() called - creating/returning stdin Blob.Store\n", .{});
|
||||
return rare.stdin_store orelse brk: {
|
||||
var mode: bun.Mode = 0;
|
||||
const fd = bun.FD.fromUV(0);
|
||||
@@ -397,6 +399,7 @@ pub fn stdin(rare: *RareData) *Blob.Store {
|
||||
},
|
||||
},
|
||||
});
|
||||
mlog("RareData stdin() created new Blob.Store for fd=0, is_atty={?}, mode={?}\n", .{ store.data.file.is_atty, mode });
|
||||
rare.stdin_store = store;
|
||||
break :brk store;
|
||||
};
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
//! objects that reference the filesystem (Blob.Store.File). This is how
|
||||
//! operations like writing `Store.File` to another `Store.File` knows to use a
|
||||
//! basic file copy instead of a naive read write loop.
|
||||
const mlog = @import("../../mlog.zig").log;
|
||||
|
||||
const Blob = @This();
|
||||
|
||||
@@ -1997,6 +1998,7 @@ pub fn getStream(
|
||||
|
||||
recommended_chunk_size = @as(SizeType, @intCast(@max(0, @as(i52, @truncate(arguments[0].toInt64())))));
|
||||
}
|
||||
mlog("Blob.stream() creating ReadableStream for blob, chunk_size={}, store_type={s}\n", .{ recommended_chunk_size, if (this.store) |store| @tagName(store.data) else "null" });
|
||||
const stream = try jsc.WebCore.ReadableStream.fromBlobCopyRef(
|
||||
globalThis,
|
||||
this,
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
const mlog = @import("../../mlog.zig").log;
|
||||
const FileReader = @This();
|
||||
|
||||
const log = Output.scoped(.FileReader, .visible);
|
||||
@@ -175,6 +176,8 @@ pub fn setup(
|
||||
}
|
||||
|
||||
pub fn onStart(this: *FileReader) streams.Start {
|
||||
mlog("FileReader onStart()\n", .{});
|
||||
|
||||
this.reader.setParent(this);
|
||||
const was_lazy = this.lazy != .none;
|
||||
var pollable = false;
|
||||
@@ -218,6 +221,7 @@ pub fn onStart(this: *FileReader) streams.Start {
|
||||
_ = this.parent().incrementCount();
|
||||
this.waiting_for_onReaderDone = true;
|
||||
if (this.start_offset) |offset| {
|
||||
mlog("FileReader starting with offset: fd={}, offset={}, pollable={}\n", .{ this.fd, offset, pollable });
|
||||
switch (this.reader.startFileOffset(this.fd, pollable, offset)) {
|
||||
.result => {},
|
||||
.err => |e| {
|
||||
@@ -225,6 +229,7 @@ pub fn onStart(this: *FileReader) streams.Start {
|
||||
},
|
||||
}
|
||||
} else {
|
||||
mlog("FileReader starting: fd={}, pollable={}, max_size={?}\n", .{ this.fd, pollable, this.max_size });
|
||||
switch (this.reader.start(this.fd, pollable)) {
|
||||
.result => {},
|
||||
.err => |e| {
|
||||
@@ -304,6 +309,7 @@ pub fn deinit(this: *FileReader) void {
|
||||
pub fn onReadChunk(this: *@This(), init_buf: []const u8, state: bun.io.ReadState) bool {
|
||||
var buf = init_buf;
|
||||
log("onReadChunk() = {d} ({s}) - read_inside_on_pull: {s}", .{ buf.len, @tagName(state), @tagName(this.read_inside_on_pull) });
|
||||
mlog("FileReader onReadChunk(len={d}, state={s} - read_inside_on_pull: {s})\n", .{ buf.len, @tagName(state), @tagName(this.read_inside_on_pull) });
|
||||
|
||||
if (this.done) {
|
||||
this.reader.close();
|
||||
@@ -312,10 +318,14 @@ pub fn onReadChunk(this: *@This(), init_buf: []const u8, state: bun.io.ReadState
|
||||
var close = false;
|
||||
defer if (close) this.reader.close();
|
||||
var hasMore = state != .eof;
|
||||
mlog("FileReader onReadChunk: hasMore={} (state={s})\n", .{ hasMore, @tagName(state) });
|
||||
|
||||
if (buf.len > 0) {
|
||||
if (this.max_size) |max_size| {
|
||||
if (this.total_readed >= max_size) return false;
|
||||
if (this.total_readed >= max_size) {
|
||||
mlog("FileReader onReadChunk: returning false (max_size reached)\n", .{});
|
||||
return false;
|
||||
}
|
||||
const len = @min(max_size - this.total_readed, buf.len);
|
||||
if (buf.len > len) {
|
||||
buf = buf[0..len];
|
||||
@@ -327,9 +337,12 @@ pub fn onReadChunk(this: *@This(), init_buf: []const u8, state: bun.io.ReadState
|
||||
hasMore = false;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
mlog("FileReader onReadChunk: 0-byte read, hasMore={}, reader.isDone={}\n", .{ hasMore, this.reader.isDone() });
|
||||
}
|
||||
|
||||
const reader_buffer = this.reader.buffer();
|
||||
mlog("FileReader onReadChunk: read_inside_on_pull={s}, buf.len={d}\n", .{ @tagName(this.read_inside_on_pull), buf.len });
|
||||
if (this.read_inside_on_pull != .none) {
|
||||
switch (this.read_inside_on_pull) {
|
||||
.js => |in_progress| {
|
||||
@@ -358,6 +371,14 @@ pub fn onReadChunk(this: *@This(), init_buf: []const u8, state: bun.io.ReadState
|
||||
}
|
||||
|
||||
if (buf.len == 0) {
|
||||
// Certain readers (such as pipes) may return 0-byte reads even when
|
||||
// not at EOF. Consequently, we need to check whether the reader is
|
||||
// actually done or not.
|
||||
if (!this.reader.isDone()) {
|
||||
// If the reader is not done, we still want to keep reading.
|
||||
return true;
|
||||
}
|
||||
|
||||
if (this.buffered.items.len == 0) {
|
||||
this.buffered.clearAndFree(bun.default_allocator);
|
||||
this.buffered = reader_buffer.moveToUnmanaged();
|
||||
@@ -375,6 +396,7 @@ pub fn onReadChunk(this: *@This(), init_buf: []const u8, state: bun.io.ReadState
|
||||
} else {
|
||||
this.pending.result = .{ .done = {} };
|
||||
}
|
||||
mlog("FileReader onReadChunk: returning false (true EOF with 0-byte read)\n", .{});
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -394,6 +416,7 @@ pub fn onReadChunk(this: *@This(), init_buf: []const u8, state: bun.io.ReadState
|
||||
.{ .into_array_and_done = into_array }
|
||||
else
|
||||
.{ .into_array = into_array };
|
||||
mlog("FileReader onReadChunk: returning {} (pending_view path)\n", .{!was_done});
|
||||
return !was_done;
|
||||
}
|
||||
|
||||
@@ -407,6 +430,7 @@ pub fn onReadChunk(this: *@This(), init_buf: []const u8, state: bun.io.ReadState
|
||||
reader_buffer.clearRetainingCapacity();
|
||||
this.pending.result = .{ .temporary = .fromBorrowedSliceDangerous(buf) };
|
||||
}
|
||||
mlog("FileReader onReadChunk: returning {} (slice in reader buffer)\n", .{!was_done});
|
||||
return !was_done;
|
||||
}
|
||||
|
||||
@@ -415,6 +439,7 @@ pub fn onReadChunk(this: *@This(), init_buf: []const u8, state: bun.io.ReadState
|
||||
.{ .temporary_and_done = .fromBorrowedSliceDangerous(buf) }
|
||||
else
|
||||
.{ .temporary = .fromBorrowedSliceDangerous(buf) };
|
||||
mlog("FileReader onReadChunk: returning {} (slice not in buffered)\n", .{!was_done});
|
||||
return !was_done;
|
||||
}
|
||||
|
||||
@@ -427,6 +452,7 @@ pub fn onReadChunk(this: *@This(), init_buf: []const u8, state: bun.io.ReadState
|
||||
.{ .owned_and_done = .moveFromList(&buffered) }
|
||||
else
|
||||
.{ .owned = .moveFromList(&buffered) };
|
||||
mlog("FileReader onReadChunk: returning {} (buffered path)\n", .{!was_done});
|
||||
return !was_done;
|
||||
} else if (!bun.isSliceInBuffer(buf, this.buffered.allocatedSlice())) {
|
||||
bun.handleOom(this.buffered.appendSlice(bun.default_allocator, buf));
|
||||
@@ -436,9 +462,11 @@ pub fn onReadChunk(this: *@This(), init_buf: []const u8, state: bun.io.ReadState
|
||||
}
|
||||
|
||||
// For pipes, we have to keep pulling or the other process will block.
|
||||
return this.read_inside_on_pull != .temporary and
|
||||
const should_continue = this.read_inside_on_pull != .temporary and
|
||||
!(this.buffered.items.len + reader_buffer.items.len >= this.highwater_mark and
|
||||
!this.reader.flags.pollable);
|
||||
mlog("FileReader onReadChunk: final return should_continue={}, read_inside_on_pull={s}, buffered_len={d}\n", .{ should_continue, @tagName(this.read_inside_on_pull), this.buffered.items.len });
|
||||
return should_continue;
|
||||
}
|
||||
|
||||
fn isPulling(this: *const FileReader) bool {
|
||||
@@ -446,6 +474,7 @@ fn isPulling(this: *const FileReader) bool {
|
||||
}
|
||||
|
||||
pub fn onPull(this: *FileReader, buffer: []u8, array: jsc.JSValue) streams.Result {
|
||||
mlog("FileReader onPull(buffer_len={d}) called\n", .{buffer.len});
|
||||
array.ensureStillAlive();
|
||||
defer array.ensureStillAlive();
|
||||
const drained = this.drain();
|
||||
@@ -461,20 +490,25 @@ pub fn onPull(this: *FileReader, buffer: []u8, array: jsc.JSValue) streams.Resul
|
||||
this.buffered.clearAndFree(bun.default_allocator);
|
||||
|
||||
if (this.reader.isDone()) {
|
||||
mlog("FileReader onPull(buffer_len={d}) returning INTO_ARRAY_AND_DONE with {d} bytes\n", .{ buffer.len, drained.len });
|
||||
return .{ .into_array_and_done = .{ .value = array, .len = drained.len } };
|
||||
} else {
|
||||
mlog("FileReader onPull(buffer_len={d}) returning INTO_ARRAY with {d} bytes\n", .{ buffer.len, drained.len });
|
||||
return .{ .into_array = .{ .value = array, .len = drained.len } };
|
||||
}
|
||||
}
|
||||
|
||||
if (this.reader.isDone()) {
|
||||
mlog("FileReader onPull(buffer_len={d}) returning OWNED_AND_DONE with {d} bytes\n", .{ buffer.len, drained.len });
|
||||
return .{ .owned_and_done = drained };
|
||||
} else {
|
||||
mlog("FileReader onPull(buffer_len={d}) returning OWNED with {d} bytes\n", .{ buffer.len, drained.len });
|
||||
return .{ .owned = drained };
|
||||
}
|
||||
}
|
||||
|
||||
if (this.reader.isDone()) {
|
||||
mlog("FileReader onPull(buffer_len={d}) returning DONE (no drained data)\n", .{buffer.len});
|
||||
return .{ .done = {} };
|
||||
}
|
||||
|
||||
@@ -521,7 +555,7 @@ pub fn onPull(this: *FileReader, buffer: []u8, array: jsc.JSValue) streams.Resul
|
||||
|
||||
if (this.reader.isDone()) {
|
||||
log("onPull({d}) = done", .{buffer.len});
|
||||
|
||||
mlog("FileReader onPull(buffer_len={d}) returning DONE\n", .{buffer.len});
|
||||
return .{ .done = {} };
|
||||
}
|
||||
}
|
||||
@@ -530,7 +564,7 @@ pub fn onPull(this: *FileReader, buffer: []u8, array: jsc.JSValue) streams.Resul
|
||||
this.pending_view = buffer;
|
||||
|
||||
log("onPull({d}) = pending", .{buffer.len});
|
||||
|
||||
mlog("FileReader onPull(buffer_len={d}) returning PENDING\n", .{buffer.len});
|
||||
return .{ .pending = &this.pending };
|
||||
}
|
||||
|
||||
@@ -563,37 +597,41 @@ fn consumeReaderBuffer(this: *FileReader) void {
|
||||
|
||||
pub fn onReaderDone(this: *FileReader) void {
|
||||
log("onReaderDone()", .{});
|
||||
if (!this.isPulling()) {
|
||||
this.consumeReaderBuffer();
|
||||
if (this.pending.state == .pending) {
|
||||
if (this.buffered.items.len > 0) {
|
||||
this.pending.result = .{ .owned_and_done = bun.ByteList.moveFromList(&this.buffered) };
|
||||
} else {
|
||||
this.pending.result = .{ .done = {} };
|
||||
}
|
||||
this.buffered = .{};
|
||||
this.pending.run();
|
||||
} else if (this.buffered.items.len > 0) {
|
||||
const this_value = this.parent().this_jsvalue;
|
||||
const globalThis = this.parent().globalThis;
|
||||
if (this_value != .zero) {
|
||||
if (Source.js.onDrainCallbackGetCached(this_value)) |cb| {
|
||||
const buffered = this.buffered;
|
||||
this.buffered = .{};
|
||||
this.parent().incrementCount();
|
||||
defer _ = this.parent().decrementCount();
|
||||
this.eventLoop().js.runCallback(
|
||||
cb,
|
||||
globalThis,
|
||||
.js_undefined,
|
||||
&.{
|
||||
jsc.ArrayBuffer.fromBytes(buffered.items, .Uint8Array).toJS(globalThis) catch |err| {
|
||||
this.pending.result = .{ .err = .{ .WeakJSValue = globalThis.takeException(err) } };
|
||||
return;
|
||||
},
|
||||
mlog("FileReader onReaderDone()\n", .{});
|
||||
|
||||
if (this.isPulling()) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.consumeReaderBuffer();
|
||||
if (this.pending.state == .pending) {
|
||||
if (this.buffered.items.len > 0) {
|
||||
this.pending.result = .{ .owned_and_done = bun.ByteList.moveFromList(&this.buffered) };
|
||||
} else {
|
||||
this.pending.result = .{ .done = {} };
|
||||
}
|
||||
this.buffered = .{};
|
||||
this.pending.run();
|
||||
} else if (this.buffered.items.len > 0) {
|
||||
const this_value = this.parent().this_jsvalue;
|
||||
const globalThis = this.parent().globalThis;
|
||||
if (this_value != .zero) {
|
||||
if (Source.js.onDrainCallbackGetCached(this_value)) |cb| {
|
||||
const buffered = this.buffered;
|
||||
this.buffered = .{};
|
||||
this.parent().incrementCount();
|
||||
defer _ = this.parent().decrementCount();
|
||||
this.eventLoop().js.runCallback(
|
||||
cb,
|
||||
globalThis,
|
||||
.js_undefined,
|
||||
&.{
|
||||
jsc.ArrayBuffer.fromBytes(buffered.items, .Uint8Array).toJS(globalThis) catch |err| {
|
||||
this.pending.result = .{ .err = .{ .WeakJSValue = globalThis.takeException(err) } };
|
||||
return;
|
||||
},
|
||||
);
|
||||
}
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
const mlog = @import("../../mlog.zig").log;
|
||||
const ReadableStream = @This();
|
||||
|
||||
value: JSValue,
|
||||
@@ -116,6 +117,7 @@ pub fn toAnyBlob(
|
||||
}
|
||||
|
||||
pub fn done(this: *const ReadableStream, globalThis: *JSGlobalObject) void {
|
||||
mlog("ReadableStream(0x{x}) done()\n", .{@intFromPtr(this)});
|
||||
jsc.markBinding(@src());
|
||||
// done is called when we are done consuming the stream
|
||||
// cancel actually mark the stream source as done
|
||||
@@ -136,6 +138,7 @@ pub fn done(this: *const ReadableStream, globalThis: *JSGlobalObject) void {
|
||||
}
|
||||
|
||||
pub fn cancel(this: *const ReadableStream, globalThis: *JSGlobalObject) void {
|
||||
mlog("ReadableStream(0x{x}) cancel()\n", .{@intFromPtr(this)});
|
||||
jsc.markBinding(@src());
|
||||
// cancel the stream
|
||||
ReadableStream__cancel(this.value, globalThis);
|
||||
@@ -144,6 +147,7 @@ pub fn cancel(this: *const ReadableStream, globalThis: *JSGlobalObject) void {
|
||||
}
|
||||
|
||||
pub fn abort(this: *const ReadableStream, globalThis: *JSGlobalObject) void {
|
||||
mlog("ReadableStream(0x{x}) abort()\n", .{@intFromPtr(this)});
|
||||
jsc.markBinding(@src());
|
||||
// for now we are just calling cancel should be fine
|
||||
this.cancel(globalThis);
|
||||
@@ -313,6 +317,8 @@ pub fn fromBlobCopyRef(globalThis: *JSGlobalObject, blob: *const Blob, recommend
|
||||
return reader.toReadableStream(globalThis);
|
||||
},
|
||||
.file => {
|
||||
const fd = if (store.data.file.pathlike == .fd) store.data.file.pathlike.fd else bun.invalid_fd;
|
||||
mlog("ReadableStream.fromBlobCopyRef FILE case: fd={}, start_offset={}, max_size={?}, blob_size={}\n", .{ fd, blob.offset, if (blob.size != Blob.max_size) blob.size else null, blob.size });
|
||||
var reader = webcore.FileReader.Source.new(.{
|
||||
.globalThis = globalThis,
|
||||
.context = .{
|
||||
@@ -326,7 +332,7 @@ pub fn fromBlobCopyRef(globalThis: *JSGlobalObject, blob: *const Blob, recommend
|
||||
},
|
||||
});
|
||||
store.ref();
|
||||
|
||||
mlog("ReadableStream.fromBlobCopyRef created FileReader.Source, returning stream\n", .{});
|
||||
return reader.toReadableStream(globalThis);
|
||||
},
|
||||
.s3 => |*s3| {
|
||||
|
||||
@@ -143,10 +143,10 @@ pub const UV__ENODATA = -@as(c_int, 4024);
|
||||
pub const UV__EUNATCH = -@as(c_int, 4023);
|
||||
pub const UV_VERSION_H = "";
|
||||
pub const UV_VERSION_MAJOR = @as(c_int, 1);
|
||||
pub const UV_VERSION_MINOR = @as(c_int, 46);
|
||||
pub const UV_VERSION_PATCH = @as(c_int, 1);
|
||||
pub const UV_VERSION_MINOR = @as(c_int, 51);
|
||||
pub const UV_VERSION_PATCH = @as(c_int, 0);
|
||||
pub const UV_VERSION_IS_RELEASE = @as(c_int, 0);
|
||||
pub const UV_VERSION_SUFFIX = "dev";
|
||||
pub const UV_VERSION_SUFFIX = "";
|
||||
pub const UV_VERSION_HEX = ((UV_VERSION_MAJOR << @as(c_int, 16)) | (UV_VERSION_MINOR << @as(c_int, 8))) | UV_VERSION_PATCH;
|
||||
|
||||
pub const UV_THREADPOOL_H_ = "";
|
||||
@@ -2981,7 +2981,9 @@ fn StreamMixin(comptime Type: type) type {
|
||||
req.readStop();
|
||||
error_cb(context_data, ReturnCodeI64.init(nreads).errEnum() orelse bun.sys.E.CANCELED);
|
||||
} else {
|
||||
read_cb(context_data, buffer.slice());
|
||||
const actual_bytes_read = @as(usize, @intCast(nreads));
|
||||
bun.sys.syslog("libuv uvReadcb: nreads={d} buffer.len={d} actual_bytes={d}", .{ nreads, buffer.len, actual_bytes_read });
|
||||
read_cb(context_data, buffer.base[0..actual_bytes_read]);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@@ -79,29 +79,71 @@ fn onPipeClose(this: *WindowsNamedPipe) void {
|
||||
}
|
||||
|
||||
fn onReadAlloc(this: *WindowsNamedPipe, suggested_size: usize) []u8 {
|
||||
mlog("WindowsNamedPipe onReadAlloc(0x{d}) suggested_size={}\n", .{ @intFromPtr(this), suggested_size });
|
||||
var available = this.incoming.unusedCapacitySlice();
|
||||
mlog("WindowsNamedPipe onReadAlloc(0x{d}) initial_available_len={} incoming_len={} incoming_cap={}\n", .{ @intFromPtr(this), available.len, this.incoming.len, this.incoming.cap });
|
||||
|
||||
if (available.len < suggested_size) {
|
||||
mlog("WindowsNamedPipe onReadAlloc(0x{d}) need to expand: available={} < suggested={}\n", .{ @intFromPtr(this), available.len, suggested_size });
|
||||
bun.handleOom(this.incoming.ensureUnusedCapacity(bun.default_allocator, suggested_size));
|
||||
available = this.incoming.unusedCapacitySlice();
|
||||
mlog("WindowsNamedPipe onReadAlloc(0x{d}) after expansion: available_len={} incoming_len={} incoming_cap={}\n", .{ @intFromPtr(this), available.len, this.incoming.len, this.incoming.cap });
|
||||
}
|
||||
return available.ptr[0..suggested_size];
|
||||
|
||||
const result_slice = available.ptr[0..suggested_size];
|
||||
mlog("WindowsNamedPipe onReadAlloc(0x{d}) returning slice: ptr=0x{d} len={}\n", .{ @intFromPtr(this), @intFromPtr(result_slice.ptr), result_slice.len });
|
||||
return result_slice;
|
||||
}
|
||||
|
||||
fn onRead(this: *WindowsNamedPipe, buffer: []const u8) void {
|
||||
log("onRead ({})", .{buffer.len});
|
||||
mlog("WindowsNamedPipe onRead(0x{d}) buffer.len={} buffer.ptr=0x{d}\n", .{ @intFromPtr(this), buffer.len, @intFromPtr(buffer.ptr) });
|
||||
mlog("WindowsNamedPipe onRead(0x{d}) BEFORE: incoming_len={} incoming_cap={}\n", .{ @intFromPtr(this), this.incoming.len, this.incoming.cap });
|
||||
|
||||
// Log the first and last few bytes of the buffer to see what we received
|
||||
if (buffer.len > 0) {
|
||||
const preview_len = @min(buffer.len, 32);
|
||||
if (preview_len > 0) {
|
||||
mlog("WindowsNamedPipe onRead(0x{d}) buffer first {} bytes content logged\n", .{ @intFromPtr(this), preview_len });
|
||||
}
|
||||
|
||||
if (buffer.len > 64) {
|
||||
const end_preview_len = @min(32, buffer.len);
|
||||
mlog("WindowsNamedPipe onRead(0x{d}) buffer last {} bytes content logged\n", .{ @intFromPtr(this), end_preview_len });
|
||||
}
|
||||
}
|
||||
|
||||
this.incoming.len += @as(u32, @truncate(buffer.len));
|
||||
mlog("WindowsNamedPipe onRead(0x{d}) AFTER adding buffer: incoming_len={} incoming_cap={}\n", .{ @intFromPtr(this), this.incoming.len, this.incoming.cap });
|
||||
|
||||
bun.assert(this.incoming.len <= this.incoming.cap);
|
||||
bun.assert(bun.isSliceInBuffer(buffer, this.incoming.allocatedSlice()));
|
||||
|
||||
const data = this.incoming.slice();
|
||||
mlog("WindowsNamedPipe onRead(0x{d}) final data.len={} data.ptr=0x{d}\n", .{ @intFromPtr(this), data.len, @intFromPtr(data.ptr) });
|
||||
|
||||
// Log content of final data to see what gets passed to handlers
|
||||
if (data.len > 0) {
|
||||
const data_preview_len = @min(data.len, 32);
|
||||
mlog("WindowsNamedPipe onRead(0x{d}) final data first {} bytes content logged\n", .{ @intFromPtr(this), data_preview_len });
|
||||
|
||||
if (data.len > 64) {
|
||||
const data_end_preview_len = @min(32, data.len);
|
||||
mlog("WindowsNamedPipe onRead(0x{d}) final data last {} bytes content logged\n", .{ @intFromPtr(this), data_end_preview_len });
|
||||
}
|
||||
}
|
||||
|
||||
this.resetTimeout();
|
||||
|
||||
if (this.wrapper) |*wrapper| {
|
||||
mlog("WindowsNamedPipe onRead(0x{d}) calling wrapper.receiveData with {} bytes\n", .{ @intFromPtr(this), data.len });
|
||||
wrapper.receiveData(data);
|
||||
} else {
|
||||
mlog("WindowsNamedPipe onRead(0x{d}) calling handlers.onData with {} bytes\n", .{ @intFromPtr(this), data.len });
|
||||
this.handlers.onData(this.handlers.ctx, data);
|
||||
}
|
||||
|
||||
mlog("WindowsNamedPipe onRead(0x{d}) resetting incoming.len to 0\n", .{@intFromPtr(this)});
|
||||
this.incoming.len = 0;
|
||||
}
|
||||
|
||||
@@ -207,13 +249,18 @@ fn internalWrite(this: *WindowsNamedPipe, encoded_data: []const u8) void {
|
||||
}
|
||||
|
||||
pub fn resumeStream(this: *WindowsNamedPipe) bool {
|
||||
mlog("WindowsNamedPipe resumeStream(0x{d}) called\n", .{@intFromPtr(this)});
|
||||
const stream = this.writer.getStream() orelse {
|
||||
mlog("WindowsNamedPipe resumeStream(0x{d}) failed: no stream\n", .{@intFromPtr(this)});
|
||||
return false;
|
||||
};
|
||||
mlog("WindowsNamedPipe resumeStream(0x{d}) got stream=0x{d}, calling readStart\n", .{ @intFromPtr(this), @intFromPtr(stream) });
|
||||
const readStartResult = stream.readStart(this, onReadAlloc, onReadError, onRead);
|
||||
if (readStartResult == .err) {
|
||||
mlog("WindowsNamedPipe resumeStream(0x{d}) readStart failed: {}\n", .{ @intFromPtr(this), readStartResult.err });
|
||||
return false;
|
||||
}
|
||||
mlog("WindowsNamedPipe resumeStream(0x{d}) readStart succeeded\n", .{@intFromPtr(this)});
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -274,27 +321,37 @@ pub fn from(
|
||||
};
|
||||
}
|
||||
fn onConnect(this: *WindowsNamedPipe, status: uv.ReturnCode) void {
|
||||
mlog("WindowsNamedPipe onConnect(0x{d}) status={}\n", .{ @intFromPtr(this), status.int() });
|
||||
if (this.pipe) |pipe| {
|
||||
mlog("WindowsNamedPipe onConnect(0x{d}) unreferencing pipe\n", .{@intFromPtr(this)});
|
||||
_ = pipe.unref();
|
||||
}
|
||||
|
||||
if (status.toError(.connect)) |err| {
|
||||
mlog("WindowsNamedPipe onConnect(0x{d}) connect failed: {}\n", .{ @intFromPtr(this), err });
|
||||
this.onError(err);
|
||||
return;
|
||||
}
|
||||
|
||||
mlog("WindowsNamedPipe onConnect(0x{d}) connect successful, setting disconnected=false\n", .{@intFromPtr(this)});
|
||||
this.flags.disconnected = false;
|
||||
if (this.start(true)) {
|
||||
mlog("WindowsNamedPipe onConnect(0x{d}) start() successful, isTLS={}\n", .{ @intFromPtr(this), this.isTLS() });
|
||||
if (this.isTLS()) {
|
||||
if (this.wrapper) |*wrapper| {
|
||||
mlog("WindowsNamedPipe onConnect(0x{d}) TLS: calling wrapper.start()\n", .{@intFromPtr(this)});
|
||||
// trigger onOpen and start the handshake
|
||||
wrapper.start();
|
||||
}
|
||||
} else {
|
||||
mlog("WindowsNamedPipe onConnect(0x{d}) no TLS: calling onOpen()\n", .{@intFromPtr(this)});
|
||||
// trigger onOpen
|
||||
this.onOpen();
|
||||
}
|
||||
} else {
|
||||
mlog("WindowsNamedPipe onConnect(0x{d}) start() failed\n", .{@intFromPtr(this)});
|
||||
}
|
||||
mlog("WindowsNamedPipe onConnect(0x{d}) calling flush()\n", .{@intFromPtr(this)});
|
||||
this.flush();
|
||||
}
|
||||
|
||||
@@ -431,27 +488,37 @@ pub fn startTLS(this: *WindowsNamedPipe, ssl_options: jsc.API.ServerConfig.SSLCo
|
||||
}
|
||||
|
||||
pub fn start(this: *WindowsNamedPipe, is_client: bool) bool {
|
||||
mlog("WindowsNamedPipe start(0x{d}) is_client={}\n", .{ @intFromPtr(this), is_client });
|
||||
this.flags.is_client = is_client;
|
||||
if (this.pipe == null) {
|
||||
mlog("WindowsNamedPipe start(0x{d}) failed: pipe is null\n", .{@intFromPtr(this)});
|
||||
return false;
|
||||
}
|
||||
mlog("WindowsNamedPipe start(0x{d}) unreferencing pipe and setting up writer\n", .{@intFromPtr(this)});
|
||||
_ = this.pipe.?.unref();
|
||||
this.writer.setParent(this);
|
||||
const startPipeResult = this.writer.startWithPipe(this.pipe.?);
|
||||
if (startPipeResult == .err) {
|
||||
mlog("WindowsNamedPipe start(0x{d}) writer.startWithPipe failed: {}\n", .{ @intFromPtr(this), startPipeResult.err });
|
||||
this.onError(startPipeResult.err);
|
||||
return false;
|
||||
}
|
||||
mlog("WindowsNamedPipe start(0x{d}) writer.startWithPipe succeeded\n", .{@intFromPtr(this)});
|
||||
|
||||
const stream = this.writer.getStream() orelse {
|
||||
mlog("WindowsNamedPipe start(0x{d}) writer.getStream() returned null\n", .{@intFromPtr(this)});
|
||||
this.onError(bun.sys.Error.fromCode(bun.sys.E.PIPE, .read));
|
||||
return false;
|
||||
};
|
||||
mlog("WindowsNamedPipe start(0x{d}) got stream=0x{d}, calling readStart\n", .{ @intFromPtr(this), @intFromPtr(stream) });
|
||||
|
||||
const readStartResult = stream.readStart(this, onReadAlloc, onReadError, onRead);
|
||||
if (readStartResult == .err) {
|
||||
mlog("WindowsNamedPipe start(0x{d}) readStart failed: {}\n", .{ @intFromPtr(this), readStartResult.err });
|
||||
this.onError(readStartResult.err);
|
||||
return false;
|
||||
}
|
||||
mlog("WindowsNamedPipe start(0x{d}) readStart succeeded\n", .{@intFromPtr(this)});
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -574,6 +641,7 @@ pub fn deinit(this: *WindowsNamedPipe) void {
|
||||
pub const CertError = UpgradedDuplex.CertError;
|
||||
const WrapperType = SSLWrapper(*WindowsNamedPipe);
|
||||
const log = bun.Output.scoped(.WindowsNamedPipe, .visible);
|
||||
const mlog = @import("../../mlog.zig").log;
|
||||
|
||||
const std = @import("std");
|
||||
const SSLWrapper = @import("../../bun.js/api/bun/ssl_wrapper.zig").SSLWrapper;
|
||||
|
||||
@@ -595,7 +595,10 @@ pub const FD = packed struct(backing_int) {
|
||||
pub const truncate = bun.sys.ftruncate;
|
||||
pub const unlinkat = bun.sys.unlinkat;
|
||||
pub const updateNonblocking = bun.sys.updateNonblocking;
|
||||
pub const write = bun.sys.write;
|
||||
const Error = @import("./sys/Error.zig");
|
||||
pub fn write(fd: bun.FileDescriptor, bytes: []const u8) bun.api.node.Maybe(usize, Error) {
|
||||
return bun.sys.write(fd, bytes);
|
||||
}
|
||||
pub const writeNonblocking = bun.sys.writeNonblocking;
|
||||
pub const writev = bun.sys.writev;
|
||||
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
const mlog = @import("../mlog.zig").log;
|
||||
|
||||
// This is a runtime type instead of comptime due to bugs in Zig.
|
||||
// https://github.com/ziglang/zig/issues/18664
|
||||
const BufferedReaderVTable = struct {
|
||||
@@ -749,6 +751,7 @@ pub const WindowsBufferedReader = struct {
|
||||
}
|
||||
|
||||
pub fn from(to: *WindowsBufferedReader, other: anytype, parent: anytype) void {
|
||||
mlog("WindowsBufferedReader from(to=0x{d})\n", .{@intFromPtr(to)});
|
||||
bun.assert(other.source != null and to.source == null);
|
||||
to.* = .{
|
||||
.vtable = to.vtable,
|
||||
@@ -770,11 +773,13 @@ pub const WindowsBufferedReader = struct {
|
||||
return source.getFd();
|
||||
}
|
||||
|
||||
pub fn watch(_: *WindowsBufferedReader) void {
|
||||
pub fn watch(this: *WindowsBufferedReader) void {
|
||||
mlog("WindowsBufferedReader watch(0x{d})\n", .{@intFromPtr(this)});
|
||||
// No-op on windows.
|
||||
}
|
||||
|
||||
pub fn setParent(this: *WindowsBufferedReader, parent: anytype) void {
|
||||
mlog("WindowsBufferedReader setParent(0x{d})\n", .{@intFromPtr(this)});
|
||||
this.parent = parent;
|
||||
if (!this.flags.is_done) {
|
||||
if (this.source) |source| {
|
||||
@@ -860,6 +865,7 @@ pub const WindowsBufferedReader = struct {
|
||||
}
|
||||
|
||||
pub fn startWithCurrentPipe(this: *WindowsBufferedReader) bun.sys.Maybe(void) {
|
||||
mlog("WindowsBufferedReader startWithCurrentPipe(0x{d})\n", .{@intFromPtr(this)});
|
||||
bun.assert(!this.source.?.isClosed());
|
||||
this.source.?.setData(this);
|
||||
this.buffer().clearRetainingCapacity();
|
||||
@@ -868,11 +874,13 @@ pub const WindowsBufferedReader = struct {
|
||||
}
|
||||
|
||||
pub fn startWithPipe(this: *WindowsBufferedReader, pipe: *uv.Pipe) bun.sys.Maybe(void) {
|
||||
mlog("WindowsBufferedReader startWithPipe(0x{d})\n", .{@intFromPtr(this)});
|
||||
this.source = .{ .pipe = pipe };
|
||||
return this.startWithCurrentPipe();
|
||||
}
|
||||
|
||||
pub fn start(this: *WindowsBufferedReader, fd: bun.FileDescriptor, _: bool) bun.sys.Maybe(void) {
|
||||
mlog("WindowsBufferedReader start(0x{d}, fd={})\n", .{ @intFromPtr(this), fd });
|
||||
bun.assert(this.source == null);
|
||||
const source = switch (Source.open(uv.Loop.get(), fd)) {
|
||||
.err => |err| return .{ .err = err },
|
||||
@@ -884,12 +892,14 @@ pub const WindowsBufferedReader = struct {
|
||||
}
|
||||
|
||||
pub fn startFileOffset(this: *WindowsBufferedReader, fd: bun.FileDescriptor, poll: bool, offset: usize) bun.sys.Maybe(void) {
|
||||
mlog("WindowsBufferedReader startFileOffset(0x{d}, fd={}, poll={}, offset={})\n", .{ @intFromPtr(this), fd, poll, offset });
|
||||
this._offset = offset;
|
||||
this.flags.use_pread = true;
|
||||
return this.start(fd, poll);
|
||||
}
|
||||
|
||||
pub fn deinit(this: *WindowsBufferedReader) void {
|
||||
mlog("WindowsBufferedReader deinit(0x{d})\n", .{@intFromPtr(this)});
|
||||
MaxBuf.removeFromPipereader(&this.maxbuf);
|
||||
this.buffer().deinit();
|
||||
const source = this.source orelse return;
|
||||
@@ -901,6 +911,7 @@ pub const WindowsBufferedReader = struct {
|
||||
}
|
||||
|
||||
pub fn setRawMode(this: *WindowsBufferedReader, value: bool) bun.sys.Maybe(void) {
|
||||
mlog("WindowsBufferedReader setRawMode(0x{d}, value={})\n", .{ @intFromPtr(this), value });
|
||||
const source = this.source orelse return .{
|
||||
.err = .{
|
||||
.errno = @intFromEnum(bun.sys.E.BADF),
|
||||
@@ -923,6 +934,14 @@ pub const WindowsBufferedReader = struct {
|
||||
const nread_int = nread.int();
|
||||
|
||||
bun.sys.syslog("onStreamRead(0x{d}) = {d}", .{ @intFromPtr(this), nread_int });
|
||||
mlog("WindowsBufferedReader onStreamRead(0x{d}) = {d}\n", .{ @intFromPtr(this), nread_int });
|
||||
|
||||
// DEBUG: Track cumulative reads to diagnose partial read issue after libuv 1.51.0 upgrade
|
||||
const current_buffer_size = this._buffer.items.len;
|
||||
const bytes_this_read = if (nread_int > 0) @as(usize, @intCast(nread_int)) else 0;
|
||||
const total_after_read = current_buffer_size + bytes_this_read;
|
||||
mlog("WindowsBufferedReader READ_TRACKING(0x{d}) this_read={d} buffer_before={d} total_after={d} flags(done={} paused={})\n",
|
||||
.{ @intFromPtr(this), bytes_this_read, current_buffer_size, total_after_read, this.flags.is_done, this.flags.is_paused });
|
||||
|
||||
// NOTE: pipes/tty need to call stopReading on errors (yeah)
|
||||
switch (nread_int) {
|
||||
@@ -946,6 +965,10 @@ pub const WindowsBufferedReader = struct {
|
||||
const len: usize = @intCast(nread_int);
|
||||
var slice = buf.slice();
|
||||
this.onRead(.{ .result = len }, slice[0..len], .progress);
|
||||
|
||||
// DEBUG: Track state after onRead to see if it affects further reading
|
||||
mlog("WindowsBufferedReader POST_READ(0x{d}) flags_after_onRead(done={} paused={}) buffer_final_size={d}\n",
|
||||
.{ @intFromPtr(this), this.flags.is_done, this.flags.is_paused, this._buffer.items.len });
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -1012,6 +1035,7 @@ pub const WindowsBufferedReader = struct {
|
||||
}
|
||||
|
||||
pub fn startReading(this: *WindowsBufferedReader) bun.sys.Maybe(void) {
|
||||
mlog("WindowsBufferedReader startReading(0x{d})\n", .{@intFromPtr(this)});
|
||||
if (this.flags.is_done or !this.flags.is_paused) return .success;
|
||||
this.flags.is_paused = false;
|
||||
const source: Source = this.source orelse return .{ .err = bun.sys.Error.fromCode(bun.sys.E.BADF, .read) };
|
||||
@@ -1039,6 +1063,7 @@ pub const WindowsBufferedReader = struct {
|
||||
}
|
||||
|
||||
pub fn stopReading(this: *WindowsBufferedReader) bun.sys.Maybe(void) {
|
||||
mlog("WindowsBufferedReader stopReading(0x{d})\n", .{@intFromPtr(this)});
|
||||
if (this.flags.is_done or this.flags.is_paused) return .success;
|
||||
this.flags.is_paused = true;
|
||||
const source = this.source orelse return .success;
|
||||
@@ -1054,6 +1079,7 @@ pub const WindowsBufferedReader = struct {
|
||||
}
|
||||
|
||||
pub fn closeImpl(this: *WindowsBufferedReader, comptime callDone: bool) void {
|
||||
mlog("WindowsBufferedReader closeImpl(0x{d}, callDone={})\n", .{ @intFromPtr(this), callDone });
|
||||
if (this.source) |source| {
|
||||
switch (source) {
|
||||
.sync_file, .file => |file| {
|
||||
@@ -1088,6 +1114,7 @@ pub const WindowsBufferedReader = struct {
|
||||
}
|
||||
|
||||
pub fn close(this: *WindowsBufferedReader) void {
|
||||
mlog("WindowsBufferedReader close(0x{d})\n", .{@intFromPtr(this)});
|
||||
_ = this.stopReading();
|
||||
this.closeImpl(true);
|
||||
}
|
||||
@@ -1109,6 +1136,7 @@ pub const WindowsBufferedReader = struct {
|
||||
}
|
||||
|
||||
pub fn onRead(this: *WindowsBufferedReader, amount: bun.sys.Maybe(usize), slice: []u8, hasMore: ReadState) void {
|
||||
mlog("WindowsBufferedReader onRead(0x{d}, slice.len={})\n", .{ @intFromPtr(this), slice.len });
|
||||
if (amount == .err) {
|
||||
this.onError(amount.err);
|
||||
return;
|
||||
@@ -1139,14 +1167,17 @@ pub const WindowsBufferedReader = struct {
|
||||
}
|
||||
|
||||
pub fn pause(this: *WindowsBufferedReader) void {
|
||||
mlog("WindowsBufferedReader pause(0x{d})\n", .{@intFromPtr(this)});
|
||||
_ = this.stopReading();
|
||||
}
|
||||
|
||||
pub fn unpause(this: *WindowsBufferedReader) void {
|
||||
mlog("WindowsBufferedReader unpause(0x{d})\n", .{@intFromPtr(this)});
|
||||
_ = this.startReading();
|
||||
}
|
||||
|
||||
pub fn read(this: *WindowsBufferedReader) void {
|
||||
mlog("WindowsBufferedReader read(0x{d})\n", .{@intFromPtr(this)});
|
||||
// we cannot sync read pipes on Windows so we just check if we are paused to resume the reading
|
||||
this.unpause();
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
const mlog = @import("../mlog.zig").log;
|
||||
const log = bun.Output.scoped(.PipeWriter, .hidden);
|
||||
|
||||
pub const WriteResult = union(enum) {
|
||||
@@ -781,155 +782,232 @@ fn BaseWindowsPipeWriter(
|
||||
) type {
|
||||
return struct {
|
||||
pub fn getFd(this: *const WindowsPipeWriter) bun.FileDescriptor {
|
||||
const pipe = this.source orelse return bun.invalid_fd;
|
||||
return pipe.getFd();
|
||||
mlog("BaseWindowsPipeWriter getFd(0x{d})\n", .{@intFromPtr(this)});
|
||||
const pipe = this.source orelse {
|
||||
mlog("BaseWindowsPipeWriter getFd(0x{d}) -> no source, returning invalid_fd\n", .{@intFromPtr(this)});
|
||||
return bun.invalid_fd;
|
||||
};
|
||||
const fd = pipe.getFd();
|
||||
mlog("BaseWindowsPipeWriter getFd(0x{d}) -> fd={}\n", .{ @intFromPtr(this), fd });
|
||||
return fd;
|
||||
}
|
||||
|
||||
pub fn hasRef(this: *const WindowsPipeWriter) bool {
|
||||
mlog("BaseWindowsPipeWriter hasRef(0x{d}) is_done={}\n", .{ @intFromPtr(this), this.is_done });
|
||||
if (this.is_done) {
|
||||
mlog("BaseWindowsPipeWriter hasRef(0x{d}) -> false (is_done)\n", .{@intFromPtr(this)});
|
||||
return false;
|
||||
}
|
||||
if (this.source) |pipe| return pipe.hasRef();
|
||||
if (this.source) |pipe| {
|
||||
const has_ref = pipe.hasRef();
|
||||
mlog("BaseWindowsPipeWriter hasRef(0x{d}) -> {} (from pipe)\n", .{ @intFromPtr(this), has_ref });
|
||||
return has_ref;
|
||||
}
|
||||
mlog("BaseWindowsPipeWriter hasRef(0x{d}) -> false (no source)\n", .{@intFromPtr(this)});
|
||||
return false;
|
||||
}
|
||||
|
||||
pub fn enableKeepingProcessAlive(this: *WindowsPipeWriter, event_loop: anytype) void {
|
||||
mlog("BaseWindowsPipeWriter enableKeepingProcessAlive(0x{d})\n", .{@intFromPtr(this)});
|
||||
this.updateRef(event_loop, true);
|
||||
}
|
||||
|
||||
pub fn disableKeepingProcessAlive(this: *WindowsPipeWriter, event_loop: anytype) void {
|
||||
mlog("BaseWindowsPipeWriter disableKeepingProcessAlive(0x{d})\n", .{@intFromPtr(this)});
|
||||
this.updateRef(event_loop, false);
|
||||
}
|
||||
|
||||
fn onFileClose(handle: *uv.fs_t) callconv(.C) void {
|
||||
mlog("BaseWindowsPipeWriter onFileClose() handle=0x{d}\n", .{@intFromPtr(handle)});
|
||||
const file = bun.cast(*Source.File, handle.data);
|
||||
mlog("BaseWindowsPipeWriter onFileClose() file=0x{d}, cleaning up\n", .{@intFromPtr(file)});
|
||||
handle.deinit();
|
||||
bun.default_allocator.destroy(file);
|
||||
}
|
||||
|
||||
fn onPipeClose(handle: *uv.Pipe) callconv(.C) void {
|
||||
mlog("BaseWindowsPipeWriter onPipeClose() handle=0x{d}\n", .{@intFromPtr(handle)});
|
||||
const this = bun.cast(*uv.Pipe, handle.data);
|
||||
mlog("BaseWindowsPipeWriter onPipeClose() pipe=0x{d}, destroying\n", .{@intFromPtr(this)});
|
||||
bun.default_allocator.destroy(this);
|
||||
}
|
||||
|
||||
fn onTTYClose(handle: *uv.uv_tty_t) callconv(.C) void {
|
||||
mlog("BaseWindowsPipeWriter onTTYClose() handle=0x{d}\n", .{@intFromPtr(handle)});
|
||||
const this = bun.cast(*uv.uv_tty_t, handle.data);
|
||||
mlog("BaseWindowsPipeWriter onTTYClose() tty=0x{d}, destroying\n", .{@intFromPtr(this)});
|
||||
bun.default_allocator.destroy(this);
|
||||
}
|
||||
|
||||
pub fn close(this: *WindowsPipeWriter) void {
|
||||
mlog("BaseWindowsPipeWriter close(0x{d}) is_done={} owns_fd={}\n", .{ @intFromPtr(this), this.is_done, this.owns_fd });
|
||||
this.is_done = true;
|
||||
if (this.source) |source| {
|
||||
mlog("BaseWindowsPipeWriter close(0x{d}) closing source type={s}\n", .{ @intFromPtr(this), @tagName(source) });
|
||||
switch (source) {
|
||||
.sync_file, .file => |file| {
|
||||
mlog("BaseWindowsPipeWriter close(0x{d}) handling file/sync_file, calling fs.cancel()\n", .{@intFromPtr(this)});
|
||||
// always cancel the current one
|
||||
file.fs.cancel();
|
||||
if (this.owns_fd) {
|
||||
mlog("BaseWindowsPipeWriter close(0x{d}) owns_fd=true, calling uv_fs_close\n", .{@intFromPtr(this)});
|
||||
// always use close_fs here because we can have a operation in progress
|
||||
file.close_fs.data = file;
|
||||
_ = uv.uv_fs_close(uv.Loop.get(), &file.close_fs, file.file, onFileClose);
|
||||
} else {
|
||||
mlog("BaseWindowsPipeWriter close(0x{d}) owns_fd=false, skipping uv_fs_close\n", .{@intFromPtr(this)});
|
||||
}
|
||||
},
|
||||
.pipe => |pipe| {
|
||||
mlog("BaseWindowsPipeWriter close(0x{d}) handling pipe, calling pipe.close()\n", .{@intFromPtr(this)});
|
||||
pipe.data = pipe;
|
||||
pipe.close(onPipeClose);
|
||||
},
|
||||
.tty => |tty| {
|
||||
mlog("BaseWindowsPipeWriter close(0x{d}) handling tty, calling tty.close()\n", .{@intFromPtr(this)});
|
||||
tty.data = tty;
|
||||
tty.close(onTTYClose);
|
||||
},
|
||||
}
|
||||
this.source = null;
|
||||
mlog("BaseWindowsPipeWriter close(0x{d}) calling onCloseSource()\n", .{@intFromPtr(this)});
|
||||
this.onCloseSource();
|
||||
} else {
|
||||
mlog("BaseWindowsPipeWriter close(0x{d}) no source to close\n", .{@intFromPtr(this)});
|
||||
}
|
||||
}
|
||||
|
||||
pub fn updateRef(this: *WindowsPipeWriter, _: anytype, value: bool) void {
|
||||
mlog("BaseWindowsPipeWriter updateRef(0x{d}, value={})\n", .{ @intFromPtr(this), value });
|
||||
if (this.source) |pipe| {
|
||||
if (value) {
|
||||
mlog("BaseWindowsPipeWriter updateRef(0x{d}) calling pipe.ref()\n", .{@intFromPtr(this)});
|
||||
pipe.ref();
|
||||
} else {
|
||||
mlog("BaseWindowsPipeWriter updateRef(0x{d}) calling pipe.unref()\n", .{@intFromPtr(this)});
|
||||
pipe.unref();
|
||||
}
|
||||
} else {
|
||||
mlog("BaseWindowsPipeWriter updateRef(0x{d}) no source to update\n", .{@intFromPtr(this)});
|
||||
}
|
||||
}
|
||||
|
||||
pub fn setParent(this: *WindowsPipeWriter, parent: *Parent) void {
|
||||
mlog("BaseWindowsPipeWriter setParent(0x{d}, parent=0x{d}) is_done={}\n", .{ @intFromPtr(this), @intFromPtr(parent), this.is_done });
|
||||
this.parent = parent;
|
||||
if (!this.is_done) {
|
||||
if (this.source) |pipe| {
|
||||
mlog("BaseWindowsPipeWriter setParent(0x{d}) calling pipe.setData\n", .{@intFromPtr(this)});
|
||||
pipe.setData(this);
|
||||
} else {
|
||||
mlog("BaseWindowsPipeWriter setParent(0x{d}) no source to setData\n", .{@intFromPtr(this)});
|
||||
}
|
||||
} else {
|
||||
mlog("BaseWindowsPipeWriter setParent(0x{d}) skipping setData (is_done=true)\n", .{@intFromPtr(this)});
|
||||
}
|
||||
}
|
||||
|
||||
pub fn watch(_: *WindowsPipeWriter) void {
|
||||
pub fn watch(this: *WindowsPipeWriter) void {
|
||||
mlog("BaseWindowsPipeWriter watch(0x{d}) - no-op\n", .{@intFromPtr(this)});
|
||||
// no-op
|
||||
}
|
||||
|
||||
pub fn startWithPipe(this: *WindowsPipeWriter, pipe: *uv.Pipe) bun.sys.Maybe(void) {
|
||||
mlog("BaseWindowsPipeWriter startWithPipe(0x{d}, pipe=0x{d})\n", .{ @intFromPtr(this), @intFromPtr(pipe) });
|
||||
bun.assert(this.source == null);
|
||||
this.source = .{ .pipe = pipe };
|
||||
mlog("BaseWindowsPipeWriter startWithPipe(0x{d}) calling setParent\n", .{@intFromPtr(this)});
|
||||
this.setParent(this.parent);
|
||||
mlog("BaseWindowsPipeWriter startWithPipe(0x{d}) calling startWithCurrentPipe\n", .{@intFromPtr(this)});
|
||||
return this.startWithCurrentPipe();
|
||||
}
|
||||
|
||||
pub fn startSync(this: *WindowsPipeWriter, fd: bun.FileDescriptor, _: bool) bun.sys.Maybe(void) {
|
||||
mlog("BaseWindowsPipeWriter startSync(0x{d}, fd={})\n", .{ @intFromPtr(this), fd });
|
||||
bun.assert(this.source == null);
|
||||
const source = Source{
|
||||
.sync_file = Source.openFile(fd),
|
||||
};
|
||||
mlog("BaseWindowsPipeWriter startSync(0x{d}) created sync_file source\n", .{@intFromPtr(this)});
|
||||
source.setData(this);
|
||||
this.source = source;
|
||||
this.setParent(this.parent);
|
||||
mlog("BaseWindowsPipeWriter startSync(0x{d}) calling startWithCurrentPipe\n", .{@intFromPtr(this)});
|
||||
return this.startWithCurrentPipe();
|
||||
}
|
||||
|
||||
pub fn startWithFile(this: *WindowsPipeWriter, fd: bun.FileDescriptor) bun.sys.Maybe(void) {
|
||||
mlog("BaseWindowsPipeWriter startWithFile(0x{d}, fd={})\n", .{ @intFromPtr(this), fd });
|
||||
bun.assert(this.source == null);
|
||||
const source: bun.io.Source = .{ .file = Source.openFile(fd) };
|
||||
mlog("BaseWindowsPipeWriter startWithFile(0x{d}) created file source\n", .{@intFromPtr(this)});
|
||||
source.setData(this);
|
||||
this.source = source;
|
||||
this.setParent(this.parent);
|
||||
mlog("BaseWindowsPipeWriter startWithFile(0x{d}) calling startWithCurrentPipe\n", .{@intFromPtr(this)});
|
||||
return this.startWithCurrentPipe();
|
||||
}
|
||||
|
||||
pub fn start(this: *WindowsPipeWriter, rawfd: anytype, _: bool) bun.sys.Maybe(void) {
|
||||
const FDType = @TypeOf(rawfd);
|
||||
mlog("BaseWindowsPipeWriter start(0x{d}, FDType={s})\n", .{ @intFromPtr(this), @typeName(FDType) });
|
||||
const fd = switch (FDType) {
|
||||
bun.FileDescriptor => rawfd,
|
||||
*bun.MovableIfWindowsFd => rawfd.get().?,
|
||||
else => @compileError("Expected `bun.FileDescriptor` or `*bun.MovableIfWindowsFd` but got: " ++ @typeName(rawfd)),
|
||||
};
|
||||
mlog("BaseWindowsPipeWriter start(0x{d}) resolved fd={}\n", .{ @intFromPtr(this), fd });
|
||||
bun.assert(this.source == null);
|
||||
const source = switch (Source.open(uv.Loop.get(), fd)) {
|
||||
.result => |source| source,
|
||||
.err => |err| return .{ .err = err },
|
||||
.result => |src| blk: {
|
||||
mlog("BaseWindowsPipeWriter start(0x{d}) Source.open succeeded\n", .{@intFromPtr(this)});
|
||||
break :blk src;
|
||||
},
|
||||
.err => |err| {
|
||||
mlog("BaseWindowsPipeWriter start(0x{d}) Source.open failed: {}\n", .{ @intFromPtr(this), err });
|
||||
return .{ .err = err };
|
||||
},
|
||||
};
|
||||
// Creating a uv_pipe/uv_tty takes ownership of the file descriptor
|
||||
// TODO: Change the type of the parameter and update all places to
|
||||
// use MovableFD
|
||||
if (switch (source) {
|
||||
const should_take_ownership = switch (source) {
|
||||
.pipe, .tty => true,
|
||||
else => false,
|
||||
} and FDType == *bun.MovableIfWindowsFd) {
|
||||
} and FDType == *bun.MovableIfWindowsFd;
|
||||
if (should_take_ownership) {
|
||||
mlog("BaseWindowsPipeWriter start(0x{d}) taking ownership of MovableFD\n", .{@intFromPtr(this)});
|
||||
_ = rawfd.take();
|
||||
} else {
|
||||
mlog("BaseWindowsPipeWriter start(0x{d}) not taking ownership (source={s}, FDType={s})\n", .{ @intFromPtr(this), @tagName(source), @typeName(FDType) });
|
||||
}
|
||||
source.setData(this);
|
||||
this.source = source;
|
||||
mlog("BaseWindowsPipeWriter start(0x{d}) calling setParent\n", .{@intFromPtr(this)});
|
||||
this.setParent(this.parent);
|
||||
mlog("BaseWindowsPipeWriter start(0x{d}) calling startWithCurrentPipe\n", .{@intFromPtr(this)});
|
||||
return this.startWithCurrentPipe();
|
||||
}
|
||||
|
||||
pub fn setPipe(this: *WindowsPipeWriter, pipe: *uv.Pipe) void {
|
||||
mlog("BaseWindowsPipeWriter setPipe(0x{d}, pipe=0x{d})\n", .{ @intFromPtr(this), @intFromPtr(pipe) });
|
||||
this.source = .{ .pipe = pipe };
|
||||
this.setParent(this.parent);
|
||||
}
|
||||
|
||||
pub fn getStream(this: *const WindowsPipeWriter) ?*uv.uv_stream_t {
|
||||
const source = this.source orelse return null;
|
||||
if (source == .file) return null;
|
||||
return source.toStream();
|
||||
mlog("BaseWindowsPipeWriter getStream(0x{d})\n", .{@intFromPtr(this)});
|
||||
const source = this.source orelse {
|
||||
mlog("BaseWindowsPipeWriter getStream(0x{d}) -> null (no source)\n", .{@intFromPtr(this)});
|
||||
return null;
|
||||
};
|
||||
if (source == .file) {
|
||||
mlog("BaseWindowsPipeWriter getStream(0x{d}) -> null (file source)\n", .{@intFromPtr(this)});
|
||||
return null;
|
||||
}
|
||||
const stream = source.toStream();
|
||||
mlog("BaseWindowsPipeWriter getStream(0x{d}) -> stream=0x{d}\n", .{ @intFromPtr(this), @intFromPtr(stream) });
|
||||
return stream;
|
||||
}
|
||||
};
|
||||
}
|
||||
@@ -973,16 +1051,19 @@ pub fn WindowsBufferedWriter(Parent: type, function_table: anytype) type {
|
||||
pub const getStream = internals.getStream;
|
||||
|
||||
fn onCloseSource(this: *WindowsWriter) void {
|
||||
mlog("WindowsBufferedWriter onCloseSource(0x{d})\n", .{@intFromPtr(this)});
|
||||
if (onClose) |onCloseFn| {
|
||||
onCloseFn(this.parent);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn memoryCost(this: *const WindowsWriter) usize {
|
||||
mlog("WindowsBufferedWriter memoryCost(0x{d})\n", .{@intFromPtr(this)});
|
||||
return @sizeOf(@This()) + this.write_buffer.len;
|
||||
}
|
||||
|
||||
pub fn startWithCurrentPipe(this: *WindowsWriter) bun.sys.Maybe(void) {
|
||||
mlog("WindowsBufferedWriter startWithCurrentPipe(0x{d})\n", .{@intFromPtr(this)});
|
||||
bun.assert(this.source != null);
|
||||
this.is_done = false;
|
||||
this.write();
|
||||
@@ -991,18 +1072,23 @@ pub fn WindowsBufferedWriter(Parent: type, function_table: anytype) type {
|
||||
|
||||
fn onWriteComplete(this: *WindowsWriter, status: uv.ReturnCode) void {
|
||||
const written = this.pending_payload_size;
|
||||
mlog("WindowsBufferedWriter onWriteComplete(0x{d}, written={}, status={})\n", .{ @intFromPtr(this), written, status.int() });
|
||||
this.pending_payload_size = 0;
|
||||
if (status.toError(.write)) |err| {
|
||||
mlog("There was an error during write: {}\n", .{err});
|
||||
this.close();
|
||||
onError(this.parent, err);
|
||||
return;
|
||||
}
|
||||
|
||||
const pending = this.getBufferInternal();
|
||||
const has_pending_data = (pending.len - written) == 0;
|
||||
mlog("has_pending_data: {}, is_done: {}\n", .{ has_pending_data, this.is_done });
|
||||
onWrite(this.parent, @intCast(written), if (this.is_done and !has_pending_data) .drained else .pending);
|
||||
// is_done can be changed inside onWrite
|
||||
if (this.is_done and !has_pending_data) {
|
||||
// already done and end was called
|
||||
mlog("WindowsBufferedWriter onWriteComplete(0x{d}) LIFECYCLE: closing writer after completion (is_done={} no_pending_data={})\n", .{ @intFromPtr(this), this.is_done, !has_pending_data });
|
||||
this.close();
|
||||
return;
|
||||
}
|
||||
@@ -1019,6 +1105,7 @@ pub fn WindowsBufferedWriter(Parent: type, function_table: anytype) type {
|
||||
return;
|
||||
}
|
||||
const this = bun.cast(*WindowsWriter, fs.data);
|
||||
mlog("WindowsWriter onFsWriteComplete(0x{d}, result={})\n", .{ @intFromPtr(this), result.int() });
|
||||
|
||||
fs.deinit();
|
||||
if (result.toError(.write)) |err| {
|
||||
@@ -1031,53 +1118,78 @@ pub fn WindowsBufferedWriter(Parent: type, function_table: anytype) type {
|
||||
}
|
||||
|
||||
pub fn write(this: *WindowsWriter) void {
|
||||
mlog("WindowsBufferedWriter write(0x{d}) called\n", .{@intFromPtr(this)});
|
||||
const buffer = this.getBufferInternal();
|
||||
mlog("WindowsBufferedWriter write(0x{d}) buffer.len={} is_done={} pending_payload_size={}\n", .{ @intFromPtr(this), buffer.len, this.is_done, this.pending_payload_size });
|
||||
// if we are already done or if we have some pending payload we just wait until next write
|
||||
if (this.is_done or this.pending_payload_size > 0 or buffer.len == 0) {
|
||||
mlog("WindowsBufferedWriter write(0x{d}) exiting early: is_done={} payload_size={} buffer.len={}\n", .{ @intFromPtr(this), this.is_done, this.pending_payload_size, buffer.len });
|
||||
return;
|
||||
}
|
||||
|
||||
const pipe = this.source orelse return;
|
||||
const pipe = this.source orelse {
|
||||
mlog("WindowsBufferedWriter write(0x{d}) no source, returning\n", .{@intFromPtr(this)});
|
||||
return;
|
||||
};
|
||||
mlog("WindowsBufferedWriter write(0x{d}) source type={s}\n", .{ @intFromPtr(this), @tagName(pipe) });
|
||||
switch (pipe) {
|
||||
.sync_file => {
|
||||
mlog("WindowsBufferedWriter write(0x{d}) ERROR: sync_file path reached\n", .{@intFromPtr(this)});
|
||||
@panic("This code path shouldn't be reached - sync_file in PipeWriter.zig");
|
||||
},
|
||||
.file => |file| {
|
||||
mlog("WindowsBufferedWriter write(0x{d}) writing to file, buffer.len={}\n", .{ @intFromPtr(this), buffer.len });
|
||||
this.pending_payload_size = buffer.len;
|
||||
file.fs.deinit();
|
||||
file.fs.setData(this);
|
||||
this.write_buffer = uv.uv_buf_t.init(buffer);
|
||||
|
||||
mlog("WindowsBufferedWriter write(0x{d}) calling uv_fs_write\n", .{@intFromPtr(this)});
|
||||
if (uv.uv_fs_write(uv.Loop.get(), &file.fs, file.file, @ptrCast(&this.write_buffer), 1, -1, onFsWriteComplete).toError(.write)) |err| {
|
||||
mlog("WindowsBufferedWriter write(0x{d}) uv_fs_write failed: {}\n", .{ @intFromPtr(this), err });
|
||||
this.close();
|
||||
onError(this.parent, err);
|
||||
} else {
|
||||
mlog("WindowsBufferedWriter write(0x{d}) uv_fs_write initiated successfully\n", .{@intFromPtr(this)});
|
||||
}
|
||||
},
|
||||
else => {
|
||||
mlog("WindowsBufferedWriter write(0x{d}) writing to stream, buffer.len={}\n", .{ @intFromPtr(this), buffer.len });
|
||||
// the buffered version should always have a stable ptr
|
||||
this.pending_payload_size = buffer.len;
|
||||
this.write_buffer = uv.uv_buf_t.init(buffer);
|
||||
mlog("WindowsBufferedWriter write(0x{d}) calling write_req.write\n", .{@intFromPtr(this)});
|
||||
if (this.write_req.write(pipe.toStream(), &this.write_buffer, this, onWriteComplete).asErr()) |write_err| {
|
||||
mlog("WindowsBufferedWriter write(0x{d}) write_req.write failed: {}\n", .{ @intFromPtr(this), write_err });
|
||||
this.close();
|
||||
onError(this.parent, write_err);
|
||||
} else {
|
||||
mlog("WindowsBufferedWriter write(0x{d}) write_req.write initiated successfully\n", .{@intFromPtr(this)});
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn getBufferInternal(this: *WindowsWriter) []const u8 {
|
||||
return getBuffer(this.parent);
|
||||
const buffer = getBuffer(this.parent);
|
||||
mlog("WindowsBufferedWriter getBufferInternal(0x{d}) -> buffer.len={}\n", .{ @intFromPtr(this), buffer.len });
|
||||
return buffer;
|
||||
}
|
||||
|
||||
pub fn end(this: *WindowsWriter) void {
|
||||
mlog("WindowsBufferedWriter end(0x{d}) LIFECYCLE: end() called, pending_payload_size={}\n", .{ @intFromPtr(this), this.pending_payload_size });
|
||||
if (this.is_done) {
|
||||
mlog("WindowsBufferedWriter end(0x{d}) LIFECYCLE: already done, ignoring\n", .{@intFromPtr(this)});
|
||||
return;
|
||||
}
|
||||
|
||||
this.is_done = true;
|
||||
if (this.pending_payload_size == 0) {
|
||||
// will auto close when pending stuff get written
|
||||
mlog("WindowsBufferedWriter end(0x{d}) LIFECYCLE: closing immediately (no pending data)\n", .{@intFromPtr(this)});
|
||||
this.close();
|
||||
} else {
|
||||
mlog("WindowsBufferedWriter end(0x{d}) LIFECYCLE: waiting for pending write to complete before closing\n", .{@intFromPtr(this)});
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -1253,34 +1365,47 @@ pub fn WindowsStreamingWriter(comptime Parent: type, function_table: anytype) ty
|
||||
pub const getStream = internals.getStream;
|
||||
|
||||
pub fn memoryCost(this: *const WindowsWriter) usize {
|
||||
return @sizeOf(@This()) + this.current_payload.memoryCost() + this.outgoing.memoryCost();
|
||||
const cost = @sizeOf(@This()) + this.current_payload.memoryCost() + this.outgoing.memoryCost();
|
||||
mlog("WindowsStreamingWriter memoryCost(0x{d}) = {d}\n", .{ @intFromPtr(this), cost });
|
||||
return cost;
|
||||
}
|
||||
|
||||
fn onCloseSource(this: *WindowsWriter) void {
|
||||
mlog("WindowsStreamingWriter onCloseSource(0x{d}) closed_without_reporting={}\n", .{ @intFromPtr(this), this.closed_without_reporting });
|
||||
this.source = null;
|
||||
if (this.closed_without_reporting) {
|
||||
mlog("WindowsStreamingWriter onCloseSource(0x{d}) early return due to closed_without_reporting\n", .{@intFromPtr(this)});
|
||||
this.closed_without_reporting = false;
|
||||
return;
|
||||
}
|
||||
mlog("WindowsStreamingWriter onCloseSource(0x{d}) calling onClose\n", .{@intFromPtr(this)});
|
||||
onClose(this.parent);
|
||||
}
|
||||
|
||||
pub fn startWithCurrentPipe(this: *WindowsWriter) bun.sys.Maybe(void) {
|
||||
mlog("WindowsStreamingWriter startWithCurrentPipe(0x{d}) source_is_null={}\n", .{ @intFromPtr(this), this.source == null });
|
||||
bun.assert(this.source != null);
|
||||
this.is_done = false;
|
||||
mlog("WindowsStreamingWriter startWithCurrentPipe(0x{d}) success, is_done={}\n", .{ @intFromPtr(this), this.is_done });
|
||||
return .success;
|
||||
}
|
||||
|
||||
pub fn hasPendingData(this: *const WindowsWriter) bool {
|
||||
return (this.outgoing.isNotEmpty() or this.current_payload.isNotEmpty());
|
||||
const has_pending = (this.outgoing.isNotEmpty() or this.current_payload.isNotEmpty());
|
||||
mlog("WindowsStreamingWriter hasPendingData(0x{d}) = {} (outgoing={}, current_payload={})\n", .{ @intFromPtr(this), has_pending, this.outgoing.isNotEmpty(), this.current_payload.isNotEmpty() });
|
||||
return has_pending;
|
||||
}
|
||||
|
||||
fn isDone(this: *WindowsWriter) bool {
|
||||
// done is flags andd no more data queued? so we are done!
|
||||
return this.is_done and !this.hasPendingData();
|
||||
const pending = this.hasPendingData();
|
||||
const done = this.is_done and !pending;
|
||||
mlog("WindowsStreamingWriter isDone(0x{d}) = {} (is_done={}, hasPendingData={})\n", .{ @intFromPtr(this), done, this.is_done, pending });
|
||||
return done;
|
||||
}
|
||||
|
||||
fn onWriteComplete(this: *WindowsWriter, status: uv.ReturnCode) void {
|
||||
mlog("WindowsStreamingWriter onWriteComplete(0x{d}, status={})\n", .{ @intFromPtr(this), status.int() });
|
||||
if (status.toError(.write)) |err| {
|
||||
this.last_write_result = .{ .err = err };
|
||||
log("onWrite() = {s}", .{err.name()});
|
||||
@@ -1323,27 +1448,34 @@ pub fn WindowsStreamingWriter(comptime Parent: type, function_table: anytype) ty
|
||||
|
||||
fn onFsWriteComplete(fs: *uv.fs_t) callconv(.C) void {
|
||||
const result = fs.result;
|
||||
mlog("WindowsStreamingWriter onFsWriteComplete() result={}\n", .{result.int()});
|
||||
if (result.int() == uv.UV_ECANCELED) {
|
||||
mlog("WindowsStreamingWriter onFsWriteComplete() CANCELED path\n", .{});
|
||||
fs.deinit();
|
||||
return;
|
||||
}
|
||||
const this = bun.cast(*WindowsWriter, fs.data);
|
||||
mlog("WindowsStreamingWriter onFsWriteComplete(0x{d}, result={})\n", .{ @intFromPtr(this), result.int() });
|
||||
|
||||
fs.deinit();
|
||||
if (result.toError(.write)) |err| {
|
||||
mlog("WindowsStreamingWriter onFsWriteComplete(0x{d}) ERROR path: {}\n", .{ @intFromPtr(this), err });
|
||||
this.close();
|
||||
onError(this.parent, err);
|
||||
return;
|
||||
}
|
||||
|
||||
mlog("WindowsStreamingWriter onFsWriteComplete(0x{d}) SUCCESS path: calling onWriteComplete\n", .{@intFromPtr(this)});
|
||||
this.onWriteComplete(.zero);
|
||||
}
|
||||
|
||||
/// this tries to send more data returning if we are writable or not after this
|
||||
fn processSend(this: *WindowsWriter) void {
|
||||
mlog("WindowsStreamingWriter processSend(0x{d}) called\n", .{@intFromPtr(this)});
|
||||
log("processSend", .{});
|
||||
if (this.current_payload.isNotEmpty()) {
|
||||
// we have some pending async request, the next outgoing data will be processed after this finish
|
||||
mlog("WindowsStreamingWriter processSend(0x{d}) PENDING path: current_payload not empty, size={}\n", .{ @intFromPtr(this), this.current_payload.size() });
|
||||
this.last_write_result = .{ .pending = 0 };
|
||||
return;
|
||||
}
|
||||
@@ -1351,11 +1483,14 @@ pub fn WindowsStreamingWriter(comptime Parent: type, function_table: anytype) ty
|
||||
const bytes = this.outgoing.slice();
|
||||
// nothing todo (we assume we are writable until we try to write something)
|
||||
if (bytes.len == 0) {
|
||||
mlog("WindowsStreamingWriter processSend(0x{d}) EMPTY path: no outgoing data\n", .{@intFromPtr(this)});
|
||||
this.last_write_result = .{ .wrote = 0 };
|
||||
return;
|
||||
}
|
||||
|
||||
mlog("WindowsStreamingWriter processSend(0x{d}) processing {} bytes\n", .{ @intFromPtr(this), bytes.len });
|
||||
var pipe = this.source orelse {
|
||||
mlog("WindowsStreamingWriter processSend(0x{d}) ERROR: no source pipe\n", .{@intFromPtr(this)});
|
||||
const err = bun.sys.Error.fromCode(bun.sys.E.PIPE, .pipe);
|
||||
this.last_write_result = .{ .err = err };
|
||||
onError(this.parent, err);
|
||||
@@ -1364,77 +1499,100 @@ pub fn WindowsStreamingWriter(comptime Parent: type, function_table: anytype) ty
|
||||
};
|
||||
|
||||
// current payload is empty we can just swap with outgoing
|
||||
mlog("WindowsStreamingWriter processSend(0x{d}) swapping buffers\n", .{@intFromPtr(this)});
|
||||
const temp = this.current_payload;
|
||||
this.current_payload = this.outgoing;
|
||||
this.outgoing = temp;
|
||||
switch (pipe) {
|
||||
.sync_file => {
|
||||
mlog("WindowsStreamingWriter processSend(0x{d}) PANIC: sync_file should not be reachable\n", .{@intFromPtr(this)});
|
||||
@panic("sync_file pipe write should not be reachable");
|
||||
},
|
||||
.file => |file| {
|
||||
mlog("WindowsStreamingWriter processSend(0x{d}) FILE path: calling uv_fs_write\n", .{@intFromPtr(this)});
|
||||
file.fs.deinit();
|
||||
file.fs.setData(this);
|
||||
this.write_buffer = uv.uv_buf_t.init(bytes);
|
||||
|
||||
if (uv.uv_fs_write(uv.Loop.get(), &file.fs, file.file, @ptrCast(&this.write_buffer), 1, -1, onFsWriteComplete).toError(.write)) |err| {
|
||||
mlog("WindowsStreamingWriter processSend(0x{d}) FILE ERROR: uv_fs_write failed: {}\n", .{ @intFromPtr(this), err });
|
||||
this.last_write_result = .{ .err = err };
|
||||
onError(this.parent, err);
|
||||
this.closeWithoutReporting();
|
||||
return;
|
||||
}
|
||||
mlog("WindowsStreamingWriter processSend(0x{d}) FILE: uv_fs_write queued successfully\n", .{@intFromPtr(this)});
|
||||
},
|
||||
else => {
|
||||
// enqueue the write
|
||||
mlog("WindowsStreamingWriter processSend(0x{d}) STREAM path: calling write_req.write\n", .{@intFromPtr(this)});
|
||||
this.write_buffer = uv.uv_buf_t.init(bytes);
|
||||
if (this.write_req.write(pipe.toStream(), &this.write_buffer, this, onWriteComplete).asErr()) |err| {
|
||||
mlog("WindowsStreamingWriter processSend(0x{d}) STREAM ERROR: write_req.write failed: {}\n", .{ @intFromPtr(this), err });
|
||||
this.last_write_result = .{ .err = err };
|
||||
onError(this.parent, err);
|
||||
this.closeWithoutReporting();
|
||||
return;
|
||||
}
|
||||
mlog("WindowsStreamingWriter processSend(0x{d}) STREAM: write_req.write queued successfully\n", .{@intFromPtr(this)});
|
||||
},
|
||||
}
|
||||
mlog("WindowsStreamingWriter processSend(0x{d}) setting last_write_result to pending\n", .{@intFromPtr(this)});
|
||||
this.last_write_result = .{ .pending = 0 };
|
||||
}
|
||||
|
||||
const WindowsWriter = @This();
|
||||
|
||||
fn closeWithoutReporting(this: *WindowsWriter) void {
|
||||
mlog("WindowsStreamingWriter closeWithoutReporting(0x{d}) fd={}, closed_without_reporting={}\n", .{ @intFromPtr(this), this.getFd().cast(), this.closed_without_reporting });
|
||||
if (this.getFd() != bun.invalid_fd) {
|
||||
bun.assert(!this.closed_without_reporting);
|
||||
mlog("WindowsStreamingWriter closeWithoutReporting(0x{d}) setting flag and calling close\n", .{@intFromPtr(this)});
|
||||
this.closed_without_reporting = true;
|
||||
this.close();
|
||||
} else {
|
||||
mlog("WindowsStreamingWriter closeWithoutReporting(0x{d}) invalid fd, skipping close\n", .{@intFromPtr(this)});
|
||||
}
|
||||
}
|
||||
|
||||
pub fn deinit(this: *WindowsWriter) void {
|
||||
mlog("WindowsStreamingWriter deinit(0x{d}) outgoing_size={}, current_payload_size={}\n", .{ @intFromPtr(this), this.outgoing.size(), this.current_payload.size() });
|
||||
// clean both buffers if needed
|
||||
this.outgoing.deinit();
|
||||
this.current_payload.deinit();
|
||||
mlog("WindowsStreamingWriter deinit(0x{d}) buffers cleaned, calling closeWithoutReporting\n", .{@intFromPtr(this)});
|
||||
this.closeWithoutReporting();
|
||||
}
|
||||
|
||||
fn writeInternal(this: *WindowsWriter, buffer: anytype, comptime writeFn: anytype) WriteResult {
|
||||
mlog("WindowsStreamingWriter writeInternal(0x{d}) buffer_len={}, is_done={}\n", .{ @intFromPtr(this), @as(usize, buffer.len), this.is_done });
|
||||
if (this.is_done) {
|
||||
mlog("WindowsStreamingWriter writeInternal(0x{d}) DONE path: already ended\n", .{@intFromPtr(this)});
|
||||
return .{ .done = 0 };
|
||||
}
|
||||
|
||||
if (this.source != null and this.source.? == .sync_file) {
|
||||
mlog("WindowsStreamingWriter writeInternal(0x{d}) SYNC_FILE path\n", .{@intFromPtr(this)});
|
||||
defer this.outgoing.reset();
|
||||
var remain = StreamBuffer.writeOrFallback(&this.outgoing, buffer, comptime writeFn) catch {
|
||||
mlog("WindowsStreamingWriter writeInternal(0x{d}) SYNC_FILE OOM error\n", .{@intFromPtr(this)});
|
||||
return .{ .err = bun.sys.Error.oom };
|
||||
};
|
||||
const initial_len = remain.len;
|
||||
const fd: bun.FD = .fromUV(this.source.?.sync_file.file);
|
||||
mlog("WindowsStreamingWriter writeInternal(0x{d}) SYNC_FILE writing {} bytes to fd={}\n", .{ @intFromPtr(this), initial_len, fd.cast() });
|
||||
|
||||
while (remain.len > 0) {
|
||||
switch (fd.write(remain)) {
|
||||
.err => |err| {
|
||||
mlog("WindowsStreamingWriter writeInternal(0x{d}) SYNC_FILE write error: {}\n", .{ @intFromPtr(this), err });
|
||||
return .{ .err = err };
|
||||
},
|
||||
.result => |wrote| {
|
||||
mlog("WindowsStreamingWriter writeInternal(0x{d}) SYNC_FILE wrote {} bytes, {} remaining\n", .{ @intFromPtr(this), wrote, remain.len - wrote });
|
||||
remain = remain[wrote..];
|
||||
if (wrote == 0) {
|
||||
mlog("WindowsStreamingWriter writeInternal(0x{d}) SYNC_FILE wrote 0, breaking\n", .{@intFromPtr(this)});
|
||||
break;
|
||||
}
|
||||
},
|
||||
@@ -1442,6 +1600,7 @@ pub fn WindowsStreamingWriter(comptime Parent: type, function_table: anytype) ty
|
||||
}
|
||||
|
||||
const wrote = initial_len - remain.len;
|
||||
mlog("WindowsStreamingWriter writeInternal(0x{d}) SYNC_FILE total wrote={}\n", .{ @intFromPtr(this), wrote });
|
||||
if (wrote == 0) {
|
||||
return .{ .done = wrote };
|
||||
}
|
||||
@@ -1449,56 +1608,76 @@ pub fn WindowsStreamingWriter(comptime Parent: type, function_table: anytype) ty
|
||||
}
|
||||
|
||||
const had_buffered_data = this.outgoing.isNotEmpty();
|
||||
mlog("WindowsStreamingWriter writeInternal(0x{d}) ASYNC path: had_buffered_data={}, outgoing_size={}\n", .{ @intFromPtr(this), had_buffered_data, this.outgoing.size() });
|
||||
(if (comptime @TypeOf(writeFn) == @TypeOf(&StreamBuffer.writeLatin1) and writeFn == &StreamBuffer.writeLatin1)
|
||||
writeFn(&this.outgoing, buffer, true)
|
||||
else
|
||||
writeFn(&this.outgoing, buffer)) catch {
|
||||
mlog("WindowsStreamingWriter writeInternal(0x{d}) ASYNC OOM error during buffer write\n", .{@intFromPtr(this)});
|
||||
return .{ .err = bun.sys.Error.oom };
|
||||
};
|
||||
if (had_buffered_data) {
|
||||
mlog("WindowsStreamingWriter writeInternal(0x{d}) ASYNC had buffered data, returning pending\n", .{@intFromPtr(this)});
|
||||
return .{ .pending = 0 };
|
||||
}
|
||||
mlog("WindowsStreamingWriter writeInternal(0x{d}) ASYNC calling processSend\n", .{@intFromPtr(this)});
|
||||
this.processSend();
|
||||
return this.last_write_result;
|
||||
}
|
||||
|
||||
pub fn writeUTF16(this: *WindowsWriter, buf: []const u16) WriteResult {
|
||||
mlog("WindowsStreamingWriter writeUTF16(0x{d}, buf.len={})\n", .{ @intFromPtr(this), buf.len });
|
||||
return writeInternal(this, buf, &StreamBuffer.writeUTF16);
|
||||
}
|
||||
|
||||
pub fn writeLatin1(this: *WindowsWriter, buffer: []const u8) WriteResult {
|
||||
mlog("WindowsStreamingWriter writeLatin1(0x{d}, buffer.len={})\n", .{ @intFromPtr(this), buffer.len });
|
||||
return writeInternal(this, buffer, &StreamBuffer.writeLatin1);
|
||||
}
|
||||
|
||||
pub fn write(this: *WindowsWriter, buffer: []const u8) WriteResult {
|
||||
mlog("WindowsStreamingWriter write(0x{d}, buffer.len={})\n", .{ @intFromPtr(this), buffer.len });
|
||||
return writeInternal(this, buffer, &StreamBuffer.write);
|
||||
}
|
||||
|
||||
pub fn flush(this: *WindowsWriter) WriteResult {
|
||||
mlog("WindowsStreamingWriter flush(0x{d}) is_done={}\n", .{ @intFromPtr(this), this.is_done });
|
||||
if (this.is_done) {
|
||||
mlog("WindowsStreamingWriter flush(0x{d}) DONE path: already ended\n", .{@intFromPtr(this)});
|
||||
return .{ .done = 0 };
|
||||
}
|
||||
if (!this.hasPendingData()) {
|
||||
const has_pending = this.hasPendingData();
|
||||
if (!has_pending) {
|
||||
mlog("WindowsStreamingWriter flush(0x{d}) NO_PENDING path: no data to flush\n", .{@intFromPtr(this)});
|
||||
return .{ .wrote = 0 };
|
||||
}
|
||||
|
||||
mlog("WindowsStreamingWriter flush(0x{d}) calling processSend\n", .{@intFromPtr(this)});
|
||||
this.processSend();
|
||||
return this.last_write_result;
|
||||
}
|
||||
|
||||
pub fn end(this: *WindowsWriter) void {
|
||||
mlog("WindowsStreamingWriter end(0x{d}) is_done={}, owns_fd={}\n", .{ @intFromPtr(this), this.is_done, this.owns_fd });
|
||||
if (this.is_done) {
|
||||
mlog("WindowsStreamingWriter end(0x{d}) already done, returning\n", .{@intFromPtr(this)});
|
||||
return;
|
||||
}
|
||||
|
||||
this.closed_without_reporting = false;
|
||||
this.is_done = true;
|
||||
|
||||
if (!this.hasPendingData()) {
|
||||
const has_pending = this.hasPendingData();
|
||||
mlog("WindowsStreamingWriter end(0x{d}) hasPendingData={}\n", .{ @intFromPtr(this), has_pending });
|
||||
if (!has_pending) {
|
||||
if (!this.owns_fd) {
|
||||
mlog("WindowsStreamingWriter end(0x{d}) not owning fd, returning\n", .{@intFromPtr(this)});
|
||||
return;
|
||||
}
|
||||
mlog("WindowsStreamingWriter end(0x{d}) calling close\n", .{@intFromPtr(this)});
|
||||
this.close();
|
||||
} else {
|
||||
mlog("WindowsStreamingWriter end(0x{d}) has pending data, will close after writes complete\n", .{@intFromPtr(this)});
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
70
src/mlog.zig
Normal file
70
src/mlog.zig
Normal file
@@ -0,0 +1,70 @@
|
||||
const std = @import("std");
|
||||
const c = @cImport({
|
||||
@cInclude("stdlib.h");
|
||||
});
|
||||
|
||||
const Logger = struct {
|
||||
var log_file: ?std.fs.File = null;
|
||||
var proc_id: std.os.windows.DWORD = 0;
|
||||
var name_buf: [64]u8 = undefined;
|
||||
var name: []u8 = undefined;
|
||||
|
||||
export fn cleanup() void {
|
||||
if (Logger.log_file) |*file| {
|
||||
file.sync() catch {
|
||||
@panic("Failed to flush mlog.txt: {any}\n");
|
||||
};
|
||||
|
||||
file.close();
|
||||
Logger.log_file = null;
|
||||
}
|
||||
|
||||
std.debug.print("Saved output to {s}.\n", .{Logger.name});
|
||||
}
|
||||
|
||||
fn getAndOpenFile() *std.fs.File {
|
||||
if (Logger.log_file) |*file| {
|
||||
return file;
|
||||
}
|
||||
|
||||
Logger.proc_id = std.os.windows.GetCurrentProcessId();
|
||||
|
||||
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
|
||||
defer _ = gpa.deinit();
|
||||
|
||||
var cwd = std.fs.cwd();
|
||||
|
||||
Logger.name = std.fmt.bufPrint(&name_buf, "mlog_{}.txt", .{Logger.proc_id}) catch {
|
||||
@panic("Failed to format mlog filename: {any}\n");
|
||||
};
|
||||
|
||||
Logger.log_file = cwd.openFile(Logger.name, .{
|
||||
.mode = .read_write,
|
||||
}) catch
|
||||
cwd.createFile(
|
||||
Logger.name,
|
||||
.{
|
||||
.read = true,
|
||||
.truncate = false,
|
||||
},
|
||||
) catch
|
||||
@panic("Failed to create mlog.txt: {any}\n");
|
||||
|
||||
_ = c.atexit(cleanup);
|
||||
|
||||
return &(Logger.log_file orelse unreachable);
|
||||
}
|
||||
|
||||
pub fn log(comptime fmt: []const u8, args: anytype) void {
|
||||
const file = Logger.getAndOpenFile();
|
||||
|
||||
const writer = file.writer();
|
||||
|
||||
const nanos = std.time.nanoTimestamp();
|
||||
std.fmt.format(writer, "[{}] (pid {}) " ++ fmt, .{ nanos, Logger.proc_id } ++ args) catch {
|
||||
@panic("Failed to write to mlog.txt: {any}\n");
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
pub const log = Logger.log;
|
||||
@@ -1,3 +1,4 @@
|
||||
const mlog = @import("../mlog.zig").log;
|
||||
// const IPC = @import("../bun.js/ipc.zig");
|
||||
|
||||
pub const Stdio = util.Stdio;
|
||||
@@ -1106,6 +1107,7 @@ pub const PipeReader = struct {
|
||||
.out_type = out_type,
|
||||
});
|
||||
log("PipeReader(0x{x}, {s}) create()", .{ @intFromPtr(this), @tagName(this.out_type) });
|
||||
mlog("PipeReader(0x{x}, {s}) create()\n", .{ @intFromPtr(this), @tagName(this.out_type) });
|
||||
|
||||
if (capture) |cap| {
|
||||
this.captured_writer.writer = cap.refSelf();
|
||||
@@ -1131,10 +1133,12 @@ pub const PipeReader = struct {
|
||||
}
|
||||
|
||||
pub fn start(this: *PipeReader, process: *ShellSubprocess, event_loop: jsc.EventLoopHandle) bun.sys.Maybe(void) {
|
||||
mlog("PipeReader(0x{x}, {s}) start()\n", .{ @intFromPtr(this), @tagName(this.out_type) });
|
||||
// this.ref();
|
||||
this.process = process;
|
||||
this.event_loop = event_loop;
|
||||
if (Environment.isWindows) {
|
||||
mlog("Starting with current pipe...\n", .{});
|
||||
return this.reader.startWithCurrentPipe();
|
||||
}
|
||||
|
||||
@@ -1161,6 +1165,7 @@ pub const PipeReader = struct {
|
||||
var this: *PipeReader = @ptrCast(@alignCast(ptr));
|
||||
this.buffered_output.append(chunk);
|
||||
log("PipeReader(0x{x}, {s}) onReadChunk(chunk_len={d}, has_more={s})", .{ @intFromPtr(this), @tagName(this.out_type), chunk.len, @tagName(has_more) });
|
||||
mlog("PipeReader(0x{x}, {s}) onReadChunk(chunk_len={d}, has_more={s})\n", .{ @intFromPtr(this), @tagName(this.out_type), chunk.len, @tagName(has_more) });
|
||||
|
||||
this.captured_writer.doWrite(chunk);
|
||||
|
||||
@@ -1179,6 +1184,7 @@ pub const PipeReader = struct {
|
||||
}
|
||||
|
||||
pub fn onReaderDone(this: *PipeReader) void {
|
||||
mlog("The reader is done...\n", .{});
|
||||
log("onReaderDone(0x{x}, {s})", .{ @intFromPtr(this), @tagName(this.out_type) });
|
||||
const owned = this.toOwnedSlice();
|
||||
this.state = .{ .done = owned };
|
||||
@@ -1267,6 +1273,7 @@ pub const PipeReader = struct {
|
||||
}
|
||||
|
||||
pub fn toReadableStream(this: *PipeReader, globalObject: *jsc.JSGlobalObject) jsc.JSValue {
|
||||
mlog("PipeReader(0x{x}, {s}) toReadableStream()\n", .{ @intFromPtr(this), @tagName(this.out_type) });
|
||||
defer this.deinit();
|
||||
|
||||
switch (this.state) {
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
//!
|
||||
//! Sometimes this namespace is referred to as "Syscall", prefer "bun.sys"/"sys"
|
||||
|
||||
const mlog = @import("./mlog.zig").log;
|
||||
const This = @This();
|
||||
|
||||
//
|
||||
@@ -1550,6 +1551,7 @@ pub fn write(fd: bun.FileDescriptor, bytes: []const u8) Maybe(usize) {
|
||||
// "WriteFile sets this value to zero before doing any work or error checking."
|
||||
var bytes_written: u32 = undefined;
|
||||
bun.assert(bytes.len > 0);
|
||||
mlog("We're about to WriteFile({}, {})\n", .{ fd, adjusted_len });
|
||||
const rc = kernel32.WriteFile(
|
||||
fd.cast(),
|
||||
bytes.ptr,
|
||||
|
||||
@@ -442,13 +442,13 @@ for (let [gcTick, label] of [
|
||||
}
|
||||
|
||||
const fixtures = [
|
||||
[helloWorld, "hello"],
|
||||
//[helloWorld, "hello"],
|
||||
[huge, hugeString],
|
||||
] as const;
|
||||
|
||||
for (const [callback, fixture] of fixtures) {
|
||||
describe(fixture.slice(0, 12), () => {
|
||||
describe("should should allow reading stdout", () => {
|
||||
describe("should allow reading stdout", () => {
|
||||
it("before exit", async () => {
|
||||
const process = callback();
|
||||
const output = await process.stdout.text();
|
||||
|
||||
Reference in New Issue
Block a user