Compare commits

...

6 Commits

Author SHA1 Message Date
Marko Vejnovic
f746aa7c91 More random debugging 2025-09-24 16:21:33 -07:00
Marko Vejnovic
aa5a301a1a Comment fold 2025-09-24 13:43:54 -07:00
Marko Vejnovic
3564991a89 Fix libuv reading. 2025-09-24 13:43:45 -07:00
Marko Vejnovic
6e305460c1 State of debugging 2025-09-24 13:35:47 -07:00
Marko Vejnovic
ddec6fc3ce Lack of progress 2025-09-23 18:38:16 -07:00
Marko Vejnovic
57b7876286 Uprev libuv to 1.51.0 2025-09-22 18:45:00 -07:00
18 changed files with 503 additions and 67 deletions

View File

@@ -4,7 +4,7 @@ register_repository(
REPOSITORY
libuv/libuv
COMMIT
da527d8d2a908b824def74382761566371439003
5152db2cbfeb5582e9c27c5ea1dba2cd9e10759b
)
if(WIN32)

View File

@@ -0,0 +1,7 @@
let count = 0;
process.stdin.on("data", chunk => {
count += chunk.length;
});
process.stdin.on("end", () => {
console.log(count);
});

1
repro-hell/reader.ts Normal file
View File

@@ -0,0 +1 @@
process.stdin.pipe(process.stdout);

View File

@@ -1,3 +1,5 @@
const mlog = @import("../../../../mlog.zig").log;
pub fn NewStaticPipeWriter(comptime ProcessType: type) type {
return struct {
const This = @This();
@@ -35,6 +37,7 @@ pub fn NewStaticPipeWriter(comptime ProcessType: type) type {
}
pub fn close(this: *This) void {
mlog("StaticPipeWriter(0x{x}) close()\n", .{@intFromPtr(this)});
log("StaticPipeWriter(0x{x}) close()", .{@intFromPtr(this)});
this.writer.close();
}
@@ -60,6 +63,14 @@ pub fn NewStaticPipeWriter(comptime ProcessType: type) type {
}
pub fn start(this: *This) bun.sys.Maybe(void) {
mlog(
"StaticPipeWriter(0x{x}, writer={s}) start() LIFECYCLE: buffer_size={d}\n",
.{
@intFromPtr(this),
@typeName(@TypeOf(this.writer)),
this.source.slice().len,
},
);
log("StaticPipeWriter(0x{x}) start()", .{@intFromPtr(this)});
this.ref();
this.buffer = this.source.slice();
@@ -83,19 +94,25 @@ pub fn NewStaticPipeWriter(comptime ProcessType: type) type {
pub fn onWrite(this: *This, amount: usize, status: bun.io.WriteStatus) void {
log("StaticPipeWriter(0x{x}) onWrite(amount={d} {})", .{ @intFromPtr(this), amount, status });
mlog("StaticPipeWriter(0x{x}) onWrite(amount={d} {}) LIFECYCLE: buffer_remaining_before={d}\n", .{ @intFromPtr(this), amount, status, this.buffer.len });
this.buffer = this.buffer[@min(amount, this.buffer.len)..];
const remaining_after = this.buffer.len;
mlog("StaticPipeWriter(0x{x}) LIFECYCLE: buffer_remaining_after={d} will_close={}\n", .{ @intFromPtr(this), remaining_after, (status == .end_of_file or remaining_after == 0) });
if (status == .end_of_file or this.buffer.len == 0) {
mlog("StaticPipeWriter(0x{x}) LIFECYCLE: closing writer (finished sending all data)\n", .{@intFromPtr(this)});
this.writer.close();
}
}
pub fn onError(this: *This, err: bun.sys.Error) void {
log("StaticPipeWriter(0x{x}) onError(err={any})", .{ @intFromPtr(this), err });
mlog("StaticPipeWriter(0x{x}) onError(err={any})\n", .{ @intFromPtr(this), err });
this.source.detach();
}
pub fn onClose(this: *This) void {
log("StaticPipeWriter(0x{x}) onClose()", .{@intFromPtr(this)});
mlog("StaticPipeWriter(0x{x}) onClose() LIFECYCLE: stdin writer closed, notifying subprocess\n", .{@intFromPtr(this)});
this.source.detach();
this.process.onCloseIO(.stdin);
}

View File

@@ -31,10 +31,10 @@
*/
#define UV_VERSION_MAJOR 1
#define UV_VERSION_MINOR 50
#define UV_VERSION_PATCH 1
#define UV_VERSION_IS_RELEASE 0
#define UV_VERSION_SUFFIX "dev"
#define UV_VERSION_MINOR 51
#define UV_VERSION_PATCH 0
#define UV_VERSION_IS_RELEASE 1
#define UV_VERSION_SUFFIX ""
#define UV_VERSION_HEX ((UV_VERSION_MAJOR << 16) | \
(UV_VERSION_MINOR << 8) | \

View File

@@ -1,3 +1,4 @@
const mlog = @import("../mlog.zig").log;
const RareData = @This();
websocket_deflate: ?*WebSocketDeflate.RareData = null,
@@ -376,6 +377,7 @@ pub fn stdout(rare: *RareData) *Blob.Store {
pub fn stdin(rare: *RareData) *Blob.Store {
bun.analytics.Features.@"Bun.stdin" += 1;
mlog("RareData stdin() called - creating/returning stdin Blob.Store\n", .{});
return rare.stdin_store orelse brk: {
var mode: bun.Mode = 0;
const fd = bun.FD.fromUV(0);
@@ -397,6 +399,7 @@ pub fn stdin(rare: *RareData) *Blob.Store {
},
},
});
mlog("RareData stdin() created new Blob.Store for fd=0, is_atty={?}, mode={?}\n", .{ store.data.file.is_atty, mode });
rare.stdin_store = store;
break :brk store;
};

View File

@@ -3,6 +3,7 @@
//! objects that reference the filesystem (Blob.Store.File). This is how
//! operations like writing `Store.File` to another `Store.File` knows to use a
//! basic file copy instead of a naive read write loop.
const mlog = @import("../../mlog.zig").log;
const Blob = @This();
@@ -1997,6 +1998,7 @@ pub fn getStream(
recommended_chunk_size = @as(SizeType, @intCast(@max(0, @as(i52, @truncate(arguments[0].toInt64())))));
}
mlog("Blob.stream() creating ReadableStream for blob, chunk_size={}, store_type={s}\n", .{ recommended_chunk_size, if (this.store) |store| @tagName(store.data) else "null" });
const stream = try jsc.WebCore.ReadableStream.fromBlobCopyRef(
globalThis,
this,

View File

@@ -1,3 +1,4 @@
const mlog = @import("../../mlog.zig").log;
const FileReader = @This();
const log = Output.scoped(.FileReader, .visible);
@@ -175,6 +176,8 @@ pub fn setup(
}
pub fn onStart(this: *FileReader) streams.Start {
mlog("FileReader onStart()\n", .{});
this.reader.setParent(this);
const was_lazy = this.lazy != .none;
var pollable = false;
@@ -218,6 +221,7 @@ pub fn onStart(this: *FileReader) streams.Start {
_ = this.parent().incrementCount();
this.waiting_for_onReaderDone = true;
if (this.start_offset) |offset| {
mlog("FileReader starting with offset: fd={}, offset={}, pollable={}\n", .{ this.fd, offset, pollable });
switch (this.reader.startFileOffset(this.fd, pollable, offset)) {
.result => {},
.err => |e| {
@@ -225,6 +229,7 @@ pub fn onStart(this: *FileReader) streams.Start {
},
}
} else {
mlog("FileReader starting: fd={}, pollable={}, max_size={?}\n", .{ this.fd, pollable, this.max_size });
switch (this.reader.start(this.fd, pollable)) {
.result => {},
.err => |e| {
@@ -304,6 +309,7 @@ pub fn deinit(this: *FileReader) void {
pub fn onReadChunk(this: *@This(), init_buf: []const u8, state: bun.io.ReadState) bool {
var buf = init_buf;
log("onReadChunk() = {d} ({s}) - read_inside_on_pull: {s}", .{ buf.len, @tagName(state), @tagName(this.read_inside_on_pull) });
mlog("FileReader onReadChunk(len={d}, state={s} - read_inside_on_pull: {s})\n", .{ buf.len, @tagName(state), @tagName(this.read_inside_on_pull) });
if (this.done) {
this.reader.close();
@@ -312,10 +318,14 @@ pub fn onReadChunk(this: *@This(), init_buf: []const u8, state: bun.io.ReadState
var close = false;
defer if (close) this.reader.close();
var hasMore = state != .eof;
mlog("FileReader onReadChunk: hasMore={} (state={s})\n", .{ hasMore, @tagName(state) });
if (buf.len > 0) {
if (this.max_size) |max_size| {
if (this.total_readed >= max_size) return false;
if (this.total_readed >= max_size) {
mlog("FileReader onReadChunk: returning false (max_size reached)\n", .{});
return false;
}
const len = @min(max_size - this.total_readed, buf.len);
if (buf.len > len) {
buf = buf[0..len];
@@ -327,9 +337,12 @@ pub fn onReadChunk(this: *@This(), init_buf: []const u8, state: bun.io.ReadState
hasMore = false;
}
}
} else {
mlog("FileReader onReadChunk: 0-byte read, hasMore={}, reader.isDone={}\n", .{ hasMore, this.reader.isDone() });
}
const reader_buffer = this.reader.buffer();
mlog("FileReader onReadChunk: read_inside_on_pull={s}, buf.len={d}\n", .{ @tagName(this.read_inside_on_pull), buf.len });
if (this.read_inside_on_pull != .none) {
switch (this.read_inside_on_pull) {
.js => |in_progress| {
@@ -358,6 +371,14 @@ pub fn onReadChunk(this: *@This(), init_buf: []const u8, state: bun.io.ReadState
}
if (buf.len == 0) {
// Certain readers (such as pipes) may return 0-byte reads even when
// not at EOF. Consequently, we need to check whether the reader is
// actually done or not.
if (!this.reader.isDone()) {
// If the reader is not done, we still want to keep reading.
return true;
}
if (this.buffered.items.len == 0) {
this.buffered.clearAndFree(bun.default_allocator);
this.buffered = reader_buffer.moveToUnmanaged();
@@ -375,6 +396,7 @@ pub fn onReadChunk(this: *@This(), init_buf: []const u8, state: bun.io.ReadState
} else {
this.pending.result = .{ .done = {} };
}
mlog("FileReader onReadChunk: returning false (true EOF with 0-byte read)\n", .{});
return false;
}
@@ -394,6 +416,7 @@ pub fn onReadChunk(this: *@This(), init_buf: []const u8, state: bun.io.ReadState
.{ .into_array_and_done = into_array }
else
.{ .into_array = into_array };
mlog("FileReader onReadChunk: returning {} (pending_view path)\n", .{!was_done});
return !was_done;
}
@@ -407,6 +430,7 @@ pub fn onReadChunk(this: *@This(), init_buf: []const u8, state: bun.io.ReadState
reader_buffer.clearRetainingCapacity();
this.pending.result = .{ .temporary = .fromBorrowedSliceDangerous(buf) };
}
mlog("FileReader onReadChunk: returning {} (slice in reader buffer)\n", .{!was_done});
return !was_done;
}
@@ -415,6 +439,7 @@ pub fn onReadChunk(this: *@This(), init_buf: []const u8, state: bun.io.ReadState
.{ .temporary_and_done = .fromBorrowedSliceDangerous(buf) }
else
.{ .temporary = .fromBorrowedSliceDangerous(buf) };
mlog("FileReader onReadChunk: returning {} (slice not in buffered)\n", .{!was_done});
return !was_done;
}
@@ -427,6 +452,7 @@ pub fn onReadChunk(this: *@This(), init_buf: []const u8, state: bun.io.ReadState
.{ .owned_and_done = .moveFromList(&buffered) }
else
.{ .owned = .moveFromList(&buffered) };
mlog("FileReader onReadChunk: returning {} (buffered path)\n", .{!was_done});
return !was_done;
} else if (!bun.isSliceInBuffer(buf, this.buffered.allocatedSlice())) {
bun.handleOom(this.buffered.appendSlice(bun.default_allocator, buf));
@@ -436,9 +462,11 @@ pub fn onReadChunk(this: *@This(), init_buf: []const u8, state: bun.io.ReadState
}
// For pipes, we have to keep pulling or the other process will block.
return this.read_inside_on_pull != .temporary and
const should_continue = this.read_inside_on_pull != .temporary and
!(this.buffered.items.len + reader_buffer.items.len >= this.highwater_mark and
!this.reader.flags.pollable);
mlog("FileReader onReadChunk: final return should_continue={}, read_inside_on_pull={s}, buffered_len={d}\n", .{ should_continue, @tagName(this.read_inside_on_pull), this.buffered.items.len });
return should_continue;
}
fn isPulling(this: *const FileReader) bool {
@@ -446,6 +474,7 @@ fn isPulling(this: *const FileReader) bool {
}
pub fn onPull(this: *FileReader, buffer: []u8, array: jsc.JSValue) streams.Result {
mlog("FileReader onPull(buffer_len={d}) called\n", .{buffer.len});
array.ensureStillAlive();
defer array.ensureStillAlive();
const drained = this.drain();
@@ -461,20 +490,25 @@ pub fn onPull(this: *FileReader, buffer: []u8, array: jsc.JSValue) streams.Resul
this.buffered.clearAndFree(bun.default_allocator);
if (this.reader.isDone()) {
mlog("FileReader onPull(buffer_len={d}) returning INTO_ARRAY_AND_DONE with {d} bytes\n", .{ buffer.len, drained.len });
return .{ .into_array_and_done = .{ .value = array, .len = drained.len } };
} else {
mlog("FileReader onPull(buffer_len={d}) returning INTO_ARRAY with {d} bytes\n", .{ buffer.len, drained.len });
return .{ .into_array = .{ .value = array, .len = drained.len } };
}
}
if (this.reader.isDone()) {
mlog("FileReader onPull(buffer_len={d}) returning OWNED_AND_DONE with {d} bytes\n", .{ buffer.len, drained.len });
return .{ .owned_and_done = drained };
} else {
mlog("FileReader onPull(buffer_len={d}) returning OWNED with {d} bytes\n", .{ buffer.len, drained.len });
return .{ .owned = drained };
}
}
if (this.reader.isDone()) {
mlog("FileReader onPull(buffer_len={d}) returning DONE (no drained data)\n", .{buffer.len});
return .{ .done = {} };
}
@@ -521,7 +555,7 @@ pub fn onPull(this: *FileReader, buffer: []u8, array: jsc.JSValue) streams.Resul
if (this.reader.isDone()) {
log("onPull({d}) = done", .{buffer.len});
mlog("FileReader onPull(buffer_len={d}) returning DONE\n", .{buffer.len});
return .{ .done = {} };
}
}
@@ -530,7 +564,7 @@ pub fn onPull(this: *FileReader, buffer: []u8, array: jsc.JSValue) streams.Resul
this.pending_view = buffer;
log("onPull({d}) = pending", .{buffer.len});
mlog("FileReader onPull(buffer_len={d}) returning PENDING\n", .{buffer.len});
return .{ .pending = &this.pending };
}
@@ -563,37 +597,41 @@ fn consumeReaderBuffer(this: *FileReader) void {
pub fn onReaderDone(this: *FileReader) void {
log("onReaderDone()", .{});
if (!this.isPulling()) {
this.consumeReaderBuffer();
if (this.pending.state == .pending) {
if (this.buffered.items.len > 0) {
this.pending.result = .{ .owned_and_done = bun.ByteList.moveFromList(&this.buffered) };
} else {
this.pending.result = .{ .done = {} };
}
this.buffered = .{};
this.pending.run();
} else if (this.buffered.items.len > 0) {
const this_value = this.parent().this_jsvalue;
const globalThis = this.parent().globalThis;
if (this_value != .zero) {
if (Source.js.onDrainCallbackGetCached(this_value)) |cb| {
const buffered = this.buffered;
this.buffered = .{};
this.parent().incrementCount();
defer _ = this.parent().decrementCount();
this.eventLoop().js.runCallback(
cb,
globalThis,
.js_undefined,
&.{
jsc.ArrayBuffer.fromBytes(buffered.items, .Uint8Array).toJS(globalThis) catch |err| {
this.pending.result = .{ .err = .{ .WeakJSValue = globalThis.takeException(err) } };
return;
},
mlog("FileReader onReaderDone()\n", .{});
if (this.isPulling()) {
return;
}
this.consumeReaderBuffer();
if (this.pending.state == .pending) {
if (this.buffered.items.len > 0) {
this.pending.result = .{ .owned_and_done = bun.ByteList.moveFromList(&this.buffered) };
} else {
this.pending.result = .{ .done = {} };
}
this.buffered = .{};
this.pending.run();
} else if (this.buffered.items.len > 0) {
const this_value = this.parent().this_jsvalue;
const globalThis = this.parent().globalThis;
if (this_value != .zero) {
if (Source.js.onDrainCallbackGetCached(this_value)) |cb| {
const buffered = this.buffered;
this.buffered = .{};
this.parent().incrementCount();
defer _ = this.parent().decrementCount();
this.eventLoop().js.runCallback(
cb,
globalThis,
.js_undefined,
&.{
jsc.ArrayBuffer.fromBytes(buffered.items, .Uint8Array).toJS(globalThis) catch |err| {
this.pending.result = .{ .err = .{ .WeakJSValue = globalThis.takeException(err) } };
return;
},
);
}
},
);
}
}
}

View File

@@ -1,3 +1,4 @@
const mlog = @import("../../mlog.zig").log;
const ReadableStream = @This();
value: JSValue,
@@ -116,6 +117,7 @@ pub fn toAnyBlob(
}
pub fn done(this: *const ReadableStream, globalThis: *JSGlobalObject) void {
mlog("ReadableStream(0x{x}) done()\n", .{@intFromPtr(this)});
jsc.markBinding(@src());
// done is called when we are done consuming the stream
// cancel actually mark the stream source as done
@@ -136,6 +138,7 @@ pub fn done(this: *const ReadableStream, globalThis: *JSGlobalObject) void {
}
pub fn cancel(this: *const ReadableStream, globalThis: *JSGlobalObject) void {
mlog("ReadableStream(0x{x}) cancel()\n", .{@intFromPtr(this)});
jsc.markBinding(@src());
// cancel the stream
ReadableStream__cancel(this.value, globalThis);
@@ -144,6 +147,7 @@ pub fn cancel(this: *const ReadableStream, globalThis: *JSGlobalObject) void {
}
pub fn abort(this: *const ReadableStream, globalThis: *JSGlobalObject) void {
mlog("ReadableStream(0x{x}) abort()\n", .{@intFromPtr(this)});
jsc.markBinding(@src());
// for now we are just calling cancel should be fine
this.cancel(globalThis);
@@ -313,6 +317,8 @@ pub fn fromBlobCopyRef(globalThis: *JSGlobalObject, blob: *const Blob, recommend
return reader.toReadableStream(globalThis);
},
.file => {
const fd = if (store.data.file.pathlike == .fd) store.data.file.pathlike.fd else bun.invalid_fd;
mlog("ReadableStream.fromBlobCopyRef FILE case: fd={}, start_offset={}, max_size={?}, blob_size={}\n", .{ fd, blob.offset, if (blob.size != Blob.max_size) blob.size else null, blob.size });
var reader = webcore.FileReader.Source.new(.{
.globalThis = globalThis,
.context = .{
@@ -326,7 +332,7 @@ pub fn fromBlobCopyRef(globalThis: *JSGlobalObject, blob: *const Blob, recommend
},
});
store.ref();
mlog("ReadableStream.fromBlobCopyRef created FileReader.Source, returning stream\n", .{});
return reader.toReadableStream(globalThis);
},
.s3 => |*s3| {

View File

@@ -143,10 +143,10 @@ pub const UV__ENODATA = -@as(c_int, 4024);
pub const UV__EUNATCH = -@as(c_int, 4023);
pub const UV_VERSION_H = "";
pub const UV_VERSION_MAJOR = @as(c_int, 1);
pub const UV_VERSION_MINOR = @as(c_int, 46);
pub const UV_VERSION_PATCH = @as(c_int, 1);
pub const UV_VERSION_MINOR = @as(c_int, 51);
pub const UV_VERSION_PATCH = @as(c_int, 0);
pub const UV_VERSION_IS_RELEASE = @as(c_int, 0);
pub const UV_VERSION_SUFFIX = "dev";
pub const UV_VERSION_SUFFIX = "";
pub const UV_VERSION_HEX = ((UV_VERSION_MAJOR << @as(c_int, 16)) | (UV_VERSION_MINOR << @as(c_int, 8))) | UV_VERSION_PATCH;
pub const UV_THREADPOOL_H_ = "";
@@ -2981,7 +2981,9 @@ fn StreamMixin(comptime Type: type) type {
req.readStop();
error_cb(context_data, ReturnCodeI64.init(nreads).errEnum() orelse bun.sys.E.CANCELED);
} else {
read_cb(context_data, buffer.slice());
const actual_bytes_read = @as(usize, @intCast(nreads));
bun.sys.syslog("libuv uvReadcb: nreads={d} buffer.len={d} actual_bytes={d}", .{ nreads, buffer.len, actual_bytes_read });
read_cb(context_data, buffer.base[0..actual_bytes_read]);
}
}
};

View File

@@ -79,29 +79,71 @@ fn onPipeClose(this: *WindowsNamedPipe) void {
}
fn onReadAlloc(this: *WindowsNamedPipe, suggested_size: usize) []u8 {
mlog("WindowsNamedPipe onReadAlloc(0x{d}) suggested_size={}\n", .{ @intFromPtr(this), suggested_size });
var available = this.incoming.unusedCapacitySlice();
mlog("WindowsNamedPipe onReadAlloc(0x{d}) initial_available_len={} incoming_len={} incoming_cap={}\n", .{ @intFromPtr(this), available.len, this.incoming.len, this.incoming.cap });
if (available.len < suggested_size) {
mlog("WindowsNamedPipe onReadAlloc(0x{d}) need to expand: available={} < suggested={}\n", .{ @intFromPtr(this), available.len, suggested_size });
bun.handleOom(this.incoming.ensureUnusedCapacity(bun.default_allocator, suggested_size));
available = this.incoming.unusedCapacitySlice();
mlog("WindowsNamedPipe onReadAlloc(0x{d}) after expansion: available_len={} incoming_len={} incoming_cap={}\n", .{ @intFromPtr(this), available.len, this.incoming.len, this.incoming.cap });
}
return available.ptr[0..suggested_size];
const result_slice = available.ptr[0..suggested_size];
mlog("WindowsNamedPipe onReadAlloc(0x{d}) returning slice: ptr=0x{d} len={}\n", .{ @intFromPtr(this), @intFromPtr(result_slice.ptr), result_slice.len });
return result_slice;
}
fn onRead(this: *WindowsNamedPipe, buffer: []const u8) void {
log("onRead ({})", .{buffer.len});
mlog("WindowsNamedPipe onRead(0x{d}) buffer.len={} buffer.ptr=0x{d}\n", .{ @intFromPtr(this), buffer.len, @intFromPtr(buffer.ptr) });
mlog("WindowsNamedPipe onRead(0x{d}) BEFORE: incoming_len={} incoming_cap={}\n", .{ @intFromPtr(this), this.incoming.len, this.incoming.cap });
// Log the first and last few bytes of the buffer to see what we received
if (buffer.len > 0) {
const preview_len = @min(buffer.len, 32);
if (preview_len > 0) {
mlog("WindowsNamedPipe onRead(0x{d}) buffer first {} bytes content logged\n", .{ @intFromPtr(this), preview_len });
}
if (buffer.len > 64) {
const end_preview_len = @min(32, buffer.len);
mlog("WindowsNamedPipe onRead(0x{d}) buffer last {} bytes content logged\n", .{ @intFromPtr(this), end_preview_len });
}
}
this.incoming.len += @as(u32, @truncate(buffer.len));
mlog("WindowsNamedPipe onRead(0x{d}) AFTER adding buffer: incoming_len={} incoming_cap={}\n", .{ @intFromPtr(this), this.incoming.len, this.incoming.cap });
bun.assert(this.incoming.len <= this.incoming.cap);
bun.assert(bun.isSliceInBuffer(buffer, this.incoming.allocatedSlice()));
const data = this.incoming.slice();
mlog("WindowsNamedPipe onRead(0x{d}) final data.len={} data.ptr=0x{d}\n", .{ @intFromPtr(this), data.len, @intFromPtr(data.ptr) });
// Log content of final data to see what gets passed to handlers
if (data.len > 0) {
const data_preview_len = @min(data.len, 32);
mlog("WindowsNamedPipe onRead(0x{d}) final data first {} bytes content logged\n", .{ @intFromPtr(this), data_preview_len });
if (data.len > 64) {
const data_end_preview_len = @min(32, data.len);
mlog("WindowsNamedPipe onRead(0x{d}) final data last {} bytes content logged\n", .{ @intFromPtr(this), data_end_preview_len });
}
}
this.resetTimeout();
if (this.wrapper) |*wrapper| {
mlog("WindowsNamedPipe onRead(0x{d}) calling wrapper.receiveData with {} bytes\n", .{ @intFromPtr(this), data.len });
wrapper.receiveData(data);
} else {
mlog("WindowsNamedPipe onRead(0x{d}) calling handlers.onData with {} bytes\n", .{ @intFromPtr(this), data.len });
this.handlers.onData(this.handlers.ctx, data);
}
mlog("WindowsNamedPipe onRead(0x{d}) resetting incoming.len to 0\n", .{@intFromPtr(this)});
this.incoming.len = 0;
}
@@ -207,13 +249,18 @@ fn internalWrite(this: *WindowsNamedPipe, encoded_data: []const u8) void {
}
pub fn resumeStream(this: *WindowsNamedPipe) bool {
mlog("WindowsNamedPipe resumeStream(0x{d}) called\n", .{@intFromPtr(this)});
const stream = this.writer.getStream() orelse {
mlog("WindowsNamedPipe resumeStream(0x{d}) failed: no stream\n", .{@intFromPtr(this)});
return false;
};
mlog("WindowsNamedPipe resumeStream(0x{d}) got stream=0x{d}, calling readStart\n", .{ @intFromPtr(this), @intFromPtr(stream) });
const readStartResult = stream.readStart(this, onReadAlloc, onReadError, onRead);
if (readStartResult == .err) {
mlog("WindowsNamedPipe resumeStream(0x{d}) readStart failed: {}\n", .{ @intFromPtr(this), readStartResult.err });
return false;
}
mlog("WindowsNamedPipe resumeStream(0x{d}) readStart succeeded\n", .{@intFromPtr(this)});
return true;
}
@@ -274,27 +321,37 @@ pub fn from(
};
}
fn onConnect(this: *WindowsNamedPipe, status: uv.ReturnCode) void {
mlog("WindowsNamedPipe onConnect(0x{d}) status={}\n", .{ @intFromPtr(this), status.int() });
if (this.pipe) |pipe| {
mlog("WindowsNamedPipe onConnect(0x{d}) unreferencing pipe\n", .{@intFromPtr(this)});
_ = pipe.unref();
}
if (status.toError(.connect)) |err| {
mlog("WindowsNamedPipe onConnect(0x{d}) connect failed: {}\n", .{ @intFromPtr(this), err });
this.onError(err);
return;
}
mlog("WindowsNamedPipe onConnect(0x{d}) connect successful, setting disconnected=false\n", .{@intFromPtr(this)});
this.flags.disconnected = false;
if (this.start(true)) {
mlog("WindowsNamedPipe onConnect(0x{d}) start() successful, isTLS={}\n", .{ @intFromPtr(this), this.isTLS() });
if (this.isTLS()) {
if (this.wrapper) |*wrapper| {
mlog("WindowsNamedPipe onConnect(0x{d}) TLS: calling wrapper.start()\n", .{@intFromPtr(this)});
// trigger onOpen and start the handshake
wrapper.start();
}
} else {
mlog("WindowsNamedPipe onConnect(0x{d}) no TLS: calling onOpen()\n", .{@intFromPtr(this)});
// trigger onOpen
this.onOpen();
}
} else {
mlog("WindowsNamedPipe onConnect(0x{d}) start() failed\n", .{@intFromPtr(this)});
}
mlog("WindowsNamedPipe onConnect(0x{d}) calling flush()\n", .{@intFromPtr(this)});
this.flush();
}
@@ -431,27 +488,37 @@ pub fn startTLS(this: *WindowsNamedPipe, ssl_options: jsc.API.ServerConfig.SSLCo
}
pub fn start(this: *WindowsNamedPipe, is_client: bool) bool {
mlog("WindowsNamedPipe start(0x{d}) is_client={}\n", .{ @intFromPtr(this), is_client });
this.flags.is_client = is_client;
if (this.pipe == null) {
mlog("WindowsNamedPipe start(0x{d}) failed: pipe is null\n", .{@intFromPtr(this)});
return false;
}
mlog("WindowsNamedPipe start(0x{d}) unreferencing pipe and setting up writer\n", .{@intFromPtr(this)});
_ = this.pipe.?.unref();
this.writer.setParent(this);
const startPipeResult = this.writer.startWithPipe(this.pipe.?);
if (startPipeResult == .err) {
mlog("WindowsNamedPipe start(0x{d}) writer.startWithPipe failed: {}\n", .{ @intFromPtr(this), startPipeResult.err });
this.onError(startPipeResult.err);
return false;
}
mlog("WindowsNamedPipe start(0x{d}) writer.startWithPipe succeeded\n", .{@intFromPtr(this)});
const stream = this.writer.getStream() orelse {
mlog("WindowsNamedPipe start(0x{d}) writer.getStream() returned null\n", .{@intFromPtr(this)});
this.onError(bun.sys.Error.fromCode(bun.sys.E.PIPE, .read));
return false;
};
mlog("WindowsNamedPipe start(0x{d}) got stream=0x{d}, calling readStart\n", .{ @intFromPtr(this), @intFromPtr(stream) });
const readStartResult = stream.readStart(this, onReadAlloc, onReadError, onRead);
if (readStartResult == .err) {
mlog("WindowsNamedPipe start(0x{d}) readStart failed: {}\n", .{ @intFromPtr(this), readStartResult.err });
this.onError(readStartResult.err);
return false;
}
mlog("WindowsNamedPipe start(0x{d}) readStart succeeded\n", .{@intFromPtr(this)});
return true;
}
@@ -574,6 +641,7 @@ pub fn deinit(this: *WindowsNamedPipe) void {
pub const CertError = UpgradedDuplex.CertError;
const WrapperType = SSLWrapper(*WindowsNamedPipe);
const log = bun.Output.scoped(.WindowsNamedPipe, .visible);
const mlog = @import("../../mlog.zig").log;
const std = @import("std");
const SSLWrapper = @import("../../bun.js/api/bun/ssl_wrapper.zig").SSLWrapper;

View File

@@ -595,7 +595,10 @@ pub const FD = packed struct(backing_int) {
pub const truncate = bun.sys.ftruncate;
pub const unlinkat = bun.sys.unlinkat;
pub const updateNonblocking = bun.sys.updateNonblocking;
pub const write = bun.sys.write;
const Error = @import("./sys/Error.zig");
pub fn write(fd: bun.FileDescriptor, bytes: []const u8) bun.api.node.Maybe(usize, Error) {
return bun.sys.write(fd, bytes);
}
pub const writeNonblocking = bun.sys.writeNonblocking;
pub const writev = bun.sys.writev;

View File

@@ -1,3 +1,5 @@
const mlog = @import("../mlog.zig").log;
// This is a runtime type instead of comptime due to bugs in Zig.
// https://github.com/ziglang/zig/issues/18664
const BufferedReaderVTable = struct {
@@ -749,6 +751,7 @@ pub const WindowsBufferedReader = struct {
}
pub fn from(to: *WindowsBufferedReader, other: anytype, parent: anytype) void {
mlog("WindowsBufferedReader from(to=0x{d})\n", .{@intFromPtr(to)});
bun.assert(other.source != null and to.source == null);
to.* = .{
.vtable = to.vtable,
@@ -770,11 +773,13 @@ pub const WindowsBufferedReader = struct {
return source.getFd();
}
pub fn watch(_: *WindowsBufferedReader) void {
pub fn watch(this: *WindowsBufferedReader) void {
mlog("WindowsBufferedReader watch(0x{d})\n", .{@intFromPtr(this)});
// No-op on windows.
}
pub fn setParent(this: *WindowsBufferedReader, parent: anytype) void {
mlog("WindowsBufferedReader setParent(0x{d})\n", .{@intFromPtr(this)});
this.parent = parent;
if (!this.flags.is_done) {
if (this.source) |source| {
@@ -860,6 +865,7 @@ pub const WindowsBufferedReader = struct {
}
pub fn startWithCurrentPipe(this: *WindowsBufferedReader) bun.sys.Maybe(void) {
mlog("WindowsBufferedReader startWithCurrentPipe(0x{d})\n", .{@intFromPtr(this)});
bun.assert(!this.source.?.isClosed());
this.source.?.setData(this);
this.buffer().clearRetainingCapacity();
@@ -868,11 +874,13 @@ pub const WindowsBufferedReader = struct {
}
pub fn startWithPipe(this: *WindowsBufferedReader, pipe: *uv.Pipe) bun.sys.Maybe(void) {
mlog("WindowsBufferedReader startWithPipe(0x{d})\n", .{@intFromPtr(this)});
this.source = .{ .pipe = pipe };
return this.startWithCurrentPipe();
}
pub fn start(this: *WindowsBufferedReader, fd: bun.FileDescriptor, _: bool) bun.sys.Maybe(void) {
mlog("WindowsBufferedReader start(0x{d}, fd={})\n", .{ @intFromPtr(this), fd });
bun.assert(this.source == null);
const source = switch (Source.open(uv.Loop.get(), fd)) {
.err => |err| return .{ .err = err },
@@ -884,12 +892,14 @@ pub const WindowsBufferedReader = struct {
}
pub fn startFileOffset(this: *WindowsBufferedReader, fd: bun.FileDescriptor, poll: bool, offset: usize) bun.sys.Maybe(void) {
mlog("WindowsBufferedReader startFileOffset(0x{d}, fd={}, poll={}, offset={})\n", .{ @intFromPtr(this), fd, poll, offset });
this._offset = offset;
this.flags.use_pread = true;
return this.start(fd, poll);
}
pub fn deinit(this: *WindowsBufferedReader) void {
mlog("WindowsBufferedReader deinit(0x{d})\n", .{@intFromPtr(this)});
MaxBuf.removeFromPipereader(&this.maxbuf);
this.buffer().deinit();
const source = this.source orelse return;
@@ -901,6 +911,7 @@ pub const WindowsBufferedReader = struct {
}
pub fn setRawMode(this: *WindowsBufferedReader, value: bool) bun.sys.Maybe(void) {
mlog("WindowsBufferedReader setRawMode(0x{d}, value={})\n", .{ @intFromPtr(this), value });
const source = this.source orelse return .{
.err = .{
.errno = @intFromEnum(bun.sys.E.BADF),
@@ -923,6 +934,14 @@ pub const WindowsBufferedReader = struct {
const nread_int = nread.int();
bun.sys.syslog("onStreamRead(0x{d}) = {d}", .{ @intFromPtr(this), nread_int });
mlog("WindowsBufferedReader onStreamRead(0x{d}) = {d}\n", .{ @intFromPtr(this), nread_int });
// DEBUG: Track cumulative reads to diagnose partial read issue after libuv 1.51.0 upgrade
const current_buffer_size = this._buffer.items.len;
const bytes_this_read = if (nread_int > 0) @as(usize, @intCast(nread_int)) else 0;
const total_after_read = current_buffer_size + bytes_this_read;
mlog("WindowsBufferedReader READ_TRACKING(0x{d}) this_read={d} buffer_before={d} total_after={d} flags(done={} paused={})\n",
.{ @intFromPtr(this), bytes_this_read, current_buffer_size, total_after_read, this.flags.is_done, this.flags.is_paused });
// NOTE: pipes/tty need to call stopReading on errors (yeah)
switch (nread_int) {
@@ -946,6 +965,10 @@ pub const WindowsBufferedReader = struct {
const len: usize = @intCast(nread_int);
var slice = buf.slice();
this.onRead(.{ .result = len }, slice[0..len], .progress);
// DEBUG: Track state after onRead to see if it affects further reading
mlog("WindowsBufferedReader POST_READ(0x{d}) flags_after_onRead(done={} paused={}) buffer_final_size={d}\n",
.{ @intFromPtr(this), this.flags.is_done, this.flags.is_paused, this._buffer.items.len });
},
}
}
@@ -1012,6 +1035,7 @@ pub const WindowsBufferedReader = struct {
}
pub fn startReading(this: *WindowsBufferedReader) bun.sys.Maybe(void) {
mlog("WindowsBufferedReader startReading(0x{d})\n", .{@intFromPtr(this)});
if (this.flags.is_done or !this.flags.is_paused) return .success;
this.flags.is_paused = false;
const source: Source = this.source orelse return .{ .err = bun.sys.Error.fromCode(bun.sys.E.BADF, .read) };
@@ -1039,6 +1063,7 @@ pub const WindowsBufferedReader = struct {
}
pub fn stopReading(this: *WindowsBufferedReader) bun.sys.Maybe(void) {
mlog("WindowsBufferedReader stopReading(0x{d})\n", .{@intFromPtr(this)});
if (this.flags.is_done or this.flags.is_paused) return .success;
this.flags.is_paused = true;
const source = this.source orelse return .success;
@@ -1054,6 +1079,7 @@ pub const WindowsBufferedReader = struct {
}
pub fn closeImpl(this: *WindowsBufferedReader, comptime callDone: bool) void {
mlog("WindowsBufferedReader closeImpl(0x{d}, callDone={})\n", .{ @intFromPtr(this), callDone });
if (this.source) |source| {
switch (source) {
.sync_file, .file => |file| {
@@ -1088,6 +1114,7 @@ pub const WindowsBufferedReader = struct {
}
pub fn close(this: *WindowsBufferedReader) void {
mlog("WindowsBufferedReader close(0x{d})\n", .{@intFromPtr(this)});
_ = this.stopReading();
this.closeImpl(true);
}
@@ -1109,6 +1136,7 @@ pub const WindowsBufferedReader = struct {
}
pub fn onRead(this: *WindowsBufferedReader, amount: bun.sys.Maybe(usize), slice: []u8, hasMore: ReadState) void {
mlog("WindowsBufferedReader onRead(0x{d}, slice.len={})\n", .{ @intFromPtr(this), slice.len });
if (amount == .err) {
this.onError(amount.err);
return;
@@ -1139,14 +1167,17 @@ pub const WindowsBufferedReader = struct {
}
pub fn pause(this: *WindowsBufferedReader) void {
mlog("WindowsBufferedReader pause(0x{d})\n", .{@intFromPtr(this)});
_ = this.stopReading();
}
pub fn unpause(this: *WindowsBufferedReader) void {
mlog("WindowsBufferedReader unpause(0x{d})\n", .{@intFromPtr(this)});
_ = this.startReading();
}
pub fn read(this: *WindowsBufferedReader) void {
mlog("WindowsBufferedReader read(0x{d})\n", .{@intFromPtr(this)});
// we cannot sync read pipes on Windows so we just check if we are paused to resume the reading
this.unpause();
}

View File

@@ -1,3 +1,4 @@
const mlog = @import("../mlog.zig").log;
const log = bun.Output.scoped(.PipeWriter, .hidden);
pub const WriteResult = union(enum) {
@@ -781,155 +782,232 @@ fn BaseWindowsPipeWriter(
) type {
return struct {
pub fn getFd(this: *const WindowsPipeWriter) bun.FileDescriptor {
const pipe = this.source orelse return bun.invalid_fd;
return pipe.getFd();
mlog("BaseWindowsPipeWriter getFd(0x{d})\n", .{@intFromPtr(this)});
const pipe = this.source orelse {
mlog("BaseWindowsPipeWriter getFd(0x{d}) -> no source, returning invalid_fd\n", .{@intFromPtr(this)});
return bun.invalid_fd;
};
const fd = pipe.getFd();
mlog("BaseWindowsPipeWriter getFd(0x{d}) -> fd={}\n", .{ @intFromPtr(this), fd });
return fd;
}
pub fn hasRef(this: *const WindowsPipeWriter) bool {
mlog("BaseWindowsPipeWriter hasRef(0x{d}) is_done={}\n", .{ @intFromPtr(this), this.is_done });
if (this.is_done) {
mlog("BaseWindowsPipeWriter hasRef(0x{d}) -> false (is_done)\n", .{@intFromPtr(this)});
return false;
}
if (this.source) |pipe| return pipe.hasRef();
if (this.source) |pipe| {
const has_ref = pipe.hasRef();
mlog("BaseWindowsPipeWriter hasRef(0x{d}) -> {} (from pipe)\n", .{ @intFromPtr(this), has_ref });
return has_ref;
}
mlog("BaseWindowsPipeWriter hasRef(0x{d}) -> false (no source)\n", .{@intFromPtr(this)});
return false;
}
pub fn enableKeepingProcessAlive(this: *WindowsPipeWriter, event_loop: anytype) void {
mlog("BaseWindowsPipeWriter enableKeepingProcessAlive(0x{d})\n", .{@intFromPtr(this)});
this.updateRef(event_loop, true);
}
pub fn disableKeepingProcessAlive(this: *WindowsPipeWriter, event_loop: anytype) void {
mlog("BaseWindowsPipeWriter disableKeepingProcessAlive(0x{d})\n", .{@intFromPtr(this)});
this.updateRef(event_loop, false);
}
fn onFileClose(handle: *uv.fs_t) callconv(.C) void {
mlog("BaseWindowsPipeWriter onFileClose() handle=0x{d}\n", .{@intFromPtr(handle)});
const file = bun.cast(*Source.File, handle.data);
mlog("BaseWindowsPipeWriter onFileClose() file=0x{d}, cleaning up\n", .{@intFromPtr(file)});
handle.deinit();
bun.default_allocator.destroy(file);
}
fn onPipeClose(handle: *uv.Pipe) callconv(.C) void {
mlog("BaseWindowsPipeWriter onPipeClose() handle=0x{d}\n", .{@intFromPtr(handle)});
const this = bun.cast(*uv.Pipe, handle.data);
mlog("BaseWindowsPipeWriter onPipeClose() pipe=0x{d}, destroying\n", .{@intFromPtr(this)});
bun.default_allocator.destroy(this);
}
fn onTTYClose(handle: *uv.uv_tty_t) callconv(.C) void {
mlog("BaseWindowsPipeWriter onTTYClose() handle=0x{d}\n", .{@intFromPtr(handle)});
const this = bun.cast(*uv.uv_tty_t, handle.data);
mlog("BaseWindowsPipeWriter onTTYClose() tty=0x{d}, destroying\n", .{@intFromPtr(this)});
bun.default_allocator.destroy(this);
}
pub fn close(this: *WindowsPipeWriter) void {
mlog("BaseWindowsPipeWriter close(0x{d}) is_done={} owns_fd={}\n", .{ @intFromPtr(this), this.is_done, this.owns_fd });
this.is_done = true;
if (this.source) |source| {
mlog("BaseWindowsPipeWriter close(0x{d}) closing source type={s}\n", .{ @intFromPtr(this), @tagName(source) });
switch (source) {
.sync_file, .file => |file| {
mlog("BaseWindowsPipeWriter close(0x{d}) handling file/sync_file, calling fs.cancel()\n", .{@intFromPtr(this)});
// always cancel the current one
file.fs.cancel();
if (this.owns_fd) {
mlog("BaseWindowsPipeWriter close(0x{d}) owns_fd=true, calling uv_fs_close\n", .{@intFromPtr(this)});
// always use close_fs here because we can have a operation in progress
file.close_fs.data = file;
_ = uv.uv_fs_close(uv.Loop.get(), &file.close_fs, file.file, onFileClose);
} else {
mlog("BaseWindowsPipeWriter close(0x{d}) owns_fd=false, skipping uv_fs_close\n", .{@intFromPtr(this)});
}
},
.pipe => |pipe| {
mlog("BaseWindowsPipeWriter close(0x{d}) handling pipe, calling pipe.close()\n", .{@intFromPtr(this)});
pipe.data = pipe;
pipe.close(onPipeClose);
},
.tty => |tty| {
mlog("BaseWindowsPipeWriter close(0x{d}) handling tty, calling tty.close()\n", .{@intFromPtr(this)});
tty.data = tty;
tty.close(onTTYClose);
},
}
this.source = null;
mlog("BaseWindowsPipeWriter close(0x{d}) calling onCloseSource()\n", .{@intFromPtr(this)});
this.onCloseSource();
} else {
mlog("BaseWindowsPipeWriter close(0x{d}) no source to close\n", .{@intFromPtr(this)});
}
}
pub fn updateRef(this: *WindowsPipeWriter, _: anytype, value: bool) void {
mlog("BaseWindowsPipeWriter updateRef(0x{d}, value={})\n", .{ @intFromPtr(this), value });
if (this.source) |pipe| {
if (value) {
mlog("BaseWindowsPipeWriter updateRef(0x{d}) calling pipe.ref()\n", .{@intFromPtr(this)});
pipe.ref();
} else {
mlog("BaseWindowsPipeWriter updateRef(0x{d}) calling pipe.unref()\n", .{@intFromPtr(this)});
pipe.unref();
}
} else {
mlog("BaseWindowsPipeWriter updateRef(0x{d}) no source to update\n", .{@intFromPtr(this)});
}
}
pub fn setParent(this: *WindowsPipeWriter, parent: *Parent) void {
mlog("BaseWindowsPipeWriter setParent(0x{d}, parent=0x{d}) is_done={}\n", .{ @intFromPtr(this), @intFromPtr(parent), this.is_done });
this.parent = parent;
if (!this.is_done) {
if (this.source) |pipe| {
mlog("BaseWindowsPipeWriter setParent(0x{d}) calling pipe.setData\n", .{@intFromPtr(this)});
pipe.setData(this);
} else {
mlog("BaseWindowsPipeWriter setParent(0x{d}) no source to setData\n", .{@intFromPtr(this)});
}
} else {
mlog("BaseWindowsPipeWriter setParent(0x{d}) skipping setData (is_done=true)\n", .{@intFromPtr(this)});
}
}
pub fn watch(_: *WindowsPipeWriter) void {
pub fn watch(this: *WindowsPipeWriter) void {
mlog("BaseWindowsPipeWriter watch(0x{d}) - no-op\n", .{@intFromPtr(this)});
// no-op
}
pub fn startWithPipe(this: *WindowsPipeWriter, pipe: *uv.Pipe) bun.sys.Maybe(void) {
mlog("BaseWindowsPipeWriter startWithPipe(0x{d}, pipe=0x{d})\n", .{ @intFromPtr(this), @intFromPtr(pipe) });
bun.assert(this.source == null);
this.source = .{ .pipe = pipe };
mlog("BaseWindowsPipeWriter startWithPipe(0x{d}) calling setParent\n", .{@intFromPtr(this)});
this.setParent(this.parent);
mlog("BaseWindowsPipeWriter startWithPipe(0x{d}) calling startWithCurrentPipe\n", .{@intFromPtr(this)});
return this.startWithCurrentPipe();
}
pub fn startSync(this: *WindowsPipeWriter, fd: bun.FileDescriptor, _: bool) bun.sys.Maybe(void) {
mlog("BaseWindowsPipeWriter startSync(0x{d}, fd={})\n", .{ @intFromPtr(this), fd });
bun.assert(this.source == null);
const source = Source{
.sync_file = Source.openFile(fd),
};
mlog("BaseWindowsPipeWriter startSync(0x{d}) created sync_file source\n", .{@intFromPtr(this)});
source.setData(this);
this.source = source;
this.setParent(this.parent);
mlog("BaseWindowsPipeWriter startSync(0x{d}) calling startWithCurrentPipe\n", .{@intFromPtr(this)});
return this.startWithCurrentPipe();
}
pub fn startWithFile(this: *WindowsPipeWriter, fd: bun.FileDescriptor) bun.sys.Maybe(void) {
mlog("BaseWindowsPipeWriter startWithFile(0x{d}, fd={})\n", .{ @intFromPtr(this), fd });
bun.assert(this.source == null);
const source: bun.io.Source = .{ .file = Source.openFile(fd) };
mlog("BaseWindowsPipeWriter startWithFile(0x{d}) created file source\n", .{@intFromPtr(this)});
source.setData(this);
this.source = source;
this.setParent(this.parent);
mlog("BaseWindowsPipeWriter startWithFile(0x{d}) calling startWithCurrentPipe\n", .{@intFromPtr(this)});
return this.startWithCurrentPipe();
}
pub fn start(this: *WindowsPipeWriter, rawfd: anytype, _: bool) bun.sys.Maybe(void) {
const FDType = @TypeOf(rawfd);
mlog("BaseWindowsPipeWriter start(0x{d}, FDType={s})\n", .{ @intFromPtr(this), @typeName(FDType) });
const fd = switch (FDType) {
bun.FileDescriptor => rawfd,
*bun.MovableIfWindowsFd => rawfd.get().?,
else => @compileError("Expected `bun.FileDescriptor` or `*bun.MovableIfWindowsFd` but got: " ++ @typeName(rawfd)),
};
mlog("BaseWindowsPipeWriter start(0x{d}) resolved fd={}\n", .{ @intFromPtr(this), fd });
bun.assert(this.source == null);
const source = switch (Source.open(uv.Loop.get(), fd)) {
.result => |source| source,
.err => |err| return .{ .err = err },
.result => |src| blk: {
mlog("BaseWindowsPipeWriter start(0x{d}) Source.open succeeded\n", .{@intFromPtr(this)});
break :blk src;
},
.err => |err| {
mlog("BaseWindowsPipeWriter start(0x{d}) Source.open failed: {}\n", .{ @intFromPtr(this), err });
return .{ .err = err };
},
};
// Creating a uv_pipe/uv_tty takes ownership of the file descriptor
// TODO: Change the type of the parameter and update all places to
// use MovableFD
if (switch (source) {
const should_take_ownership = switch (source) {
.pipe, .tty => true,
else => false,
} and FDType == *bun.MovableIfWindowsFd) {
} and FDType == *bun.MovableIfWindowsFd;
if (should_take_ownership) {
mlog("BaseWindowsPipeWriter start(0x{d}) taking ownership of MovableFD\n", .{@intFromPtr(this)});
_ = rawfd.take();
} else {
mlog("BaseWindowsPipeWriter start(0x{d}) not taking ownership (source={s}, FDType={s})\n", .{ @intFromPtr(this), @tagName(source), @typeName(FDType) });
}
source.setData(this);
this.source = source;
mlog("BaseWindowsPipeWriter start(0x{d}) calling setParent\n", .{@intFromPtr(this)});
this.setParent(this.parent);
mlog("BaseWindowsPipeWriter start(0x{d}) calling startWithCurrentPipe\n", .{@intFromPtr(this)});
return this.startWithCurrentPipe();
}
pub fn setPipe(this: *WindowsPipeWriter, pipe: *uv.Pipe) void {
mlog("BaseWindowsPipeWriter setPipe(0x{d}, pipe=0x{d})\n", .{ @intFromPtr(this), @intFromPtr(pipe) });
this.source = .{ .pipe = pipe };
this.setParent(this.parent);
}
pub fn getStream(this: *const WindowsPipeWriter) ?*uv.uv_stream_t {
const source = this.source orelse return null;
if (source == .file) return null;
return source.toStream();
mlog("BaseWindowsPipeWriter getStream(0x{d})\n", .{@intFromPtr(this)});
const source = this.source orelse {
mlog("BaseWindowsPipeWriter getStream(0x{d}) -> null (no source)\n", .{@intFromPtr(this)});
return null;
};
if (source == .file) {
mlog("BaseWindowsPipeWriter getStream(0x{d}) -> null (file source)\n", .{@intFromPtr(this)});
return null;
}
const stream = source.toStream();
mlog("BaseWindowsPipeWriter getStream(0x{d}) -> stream=0x{d}\n", .{ @intFromPtr(this), @intFromPtr(stream) });
return stream;
}
};
}
@@ -973,16 +1051,19 @@ pub fn WindowsBufferedWriter(Parent: type, function_table: anytype) type {
pub const getStream = internals.getStream;
fn onCloseSource(this: *WindowsWriter) void {
mlog("WindowsBufferedWriter onCloseSource(0x{d})\n", .{@intFromPtr(this)});
if (onClose) |onCloseFn| {
onCloseFn(this.parent);
}
}
pub fn memoryCost(this: *const WindowsWriter) usize {
mlog("WindowsBufferedWriter memoryCost(0x{d})\n", .{@intFromPtr(this)});
return @sizeOf(@This()) + this.write_buffer.len;
}
pub fn startWithCurrentPipe(this: *WindowsWriter) bun.sys.Maybe(void) {
mlog("WindowsBufferedWriter startWithCurrentPipe(0x{d})\n", .{@intFromPtr(this)});
bun.assert(this.source != null);
this.is_done = false;
this.write();
@@ -991,18 +1072,23 @@ pub fn WindowsBufferedWriter(Parent: type, function_table: anytype) type {
fn onWriteComplete(this: *WindowsWriter, status: uv.ReturnCode) void {
const written = this.pending_payload_size;
mlog("WindowsBufferedWriter onWriteComplete(0x{d}, written={}, status={})\n", .{ @intFromPtr(this), written, status.int() });
this.pending_payload_size = 0;
if (status.toError(.write)) |err| {
mlog("There was an error during write: {}\n", .{err});
this.close();
onError(this.parent, err);
return;
}
const pending = this.getBufferInternal();
const has_pending_data = (pending.len - written) == 0;
mlog("has_pending_data: {}, is_done: {}\n", .{ has_pending_data, this.is_done });
onWrite(this.parent, @intCast(written), if (this.is_done and !has_pending_data) .drained else .pending);
// is_done can be changed inside onWrite
if (this.is_done and !has_pending_data) {
// already done and end was called
mlog("WindowsBufferedWriter onWriteComplete(0x{d}) LIFECYCLE: closing writer after completion (is_done={} no_pending_data={})\n", .{ @intFromPtr(this), this.is_done, !has_pending_data });
this.close();
return;
}
@@ -1019,6 +1105,7 @@ pub fn WindowsBufferedWriter(Parent: type, function_table: anytype) type {
return;
}
const this = bun.cast(*WindowsWriter, fs.data);
mlog("WindowsWriter onFsWriteComplete(0x{d}, result={})\n", .{ @intFromPtr(this), result.int() });
fs.deinit();
if (result.toError(.write)) |err| {
@@ -1031,53 +1118,78 @@ pub fn WindowsBufferedWriter(Parent: type, function_table: anytype) type {
}
pub fn write(this: *WindowsWriter) void {
mlog("WindowsBufferedWriter write(0x{d}) called\n", .{@intFromPtr(this)});
const buffer = this.getBufferInternal();
mlog("WindowsBufferedWriter write(0x{d}) buffer.len={} is_done={} pending_payload_size={}\n", .{ @intFromPtr(this), buffer.len, this.is_done, this.pending_payload_size });
// if we are already done or if we have some pending payload we just wait until next write
if (this.is_done or this.pending_payload_size > 0 or buffer.len == 0) {
mlog("WindowsBufferedWriter write(0x{d}) exiting early: is_done={} payload_size={} buffer.len={}\n", .{ @intFromPtr(this), this.is_done, this.pending_payload_size, buffer.len });
return;
}
const pipe = this.source orelse return;
const pipe = this.source orelse {
mlog("WindowsBufferedWriter write(0x{d}) no source, returning\n", .{@intFromPtr(this)});
return;
};
mlog("WindowsBufferedWriter write(0x{d}) source type={s}\n", .{ @intFromPtr(this), @tagName(pipe) });
switch (pipe) {
.sync_file => {
mlog("WindowsBufferedWriter write(0x{d}) ERROR: sync_file path reached\n", .{@intFromPtr(this)});
@panic("This code path shouldn't be reached - sync_file in PipeWriter.zig");
},
.file => |file| {
mlog("WindowsBufferedWriter write(0x{d}) writing to file, buffer.len={}\n", .{ @intFromPtr(this), buffer.len });
this.pending_payload_size = buffer.len;
file.fs.deinit();
file.fs.setData(this);
this.write_buffer = uv.uv_buf_t.init(buffer);
mlog("WindowsBufferedWriter write(0x{d}) calling uv_fs_write\n", .{@intFromPtr(this)});
if (uv.uv_fs_write(uv.Loop.get(), &file.fs, file.file, @ptrCast(&this.write_buffer), 1, -1, onFsWriteComplete).toError(.write)) |err| {
mlog("WindowsBufferedWriter write(0x{d}) uv_fs_write failed: {}\n", .{ @intFromPtr(this), err });
this.close();
onError(this.parent, err);
} else {
mlog("WindowsBufferedWriter write(0x{d}) uv_fs_write initiated successfully\n", .{@intFromPtr(this)});
}
},
else => {
mlog("WindowsBufferedWriter write(0x{d}) writing to stream, buffer.len={}\n", .{ @intFromPtr(this), buffer.len });
// the buffered version should always have a stable ptr
this.pending_payload_size = buffer.len;
this.write_buffer = uv.uv_buf_t.init(buffer);
mlog("WindowsBufferedWriter write(0x{d}) calling write_req.write\n", .{@intFromPtr(this)});
if (this.write_req.write(pipe.toStream(), &this.write_buffer, this, onWriteComplete).asErr()) |write_err| {
mlog("WindowsBufferedWriter write(0x{d}) write_req.write failed: {}\n", .{ @intFromPtr(this), write_err });
this.close();
onError(this.parent, write_err);
} else {
mlog("WindowsBufferedWriter write(0x{d}) write_req.write initiated successfully\n", .{@intFromPtr(this)});
}
},
}
}
fn getBufferInternal(this: *WindowsWriter) []const u8 {
return getBuffer(this.parent);
const buffer = getBuffer(this.parent);
mlog("WindowsBufferedWriter getBufferInternal(0x{d}) -> buffer.len={}\n", .{ @intFromPtr(this), buffer.len });
return buffer;
}
pub fn end(this: *WindowsWriter) void {
mlog("WindowsBufferedWriter end(0x{d}) LIFECYCLE: end() called, pending_payload_size={}\n", .{ @intFromPtr(this), this.pending_payload_size });
if (this.is_done) {
mlog("WindowsBufferedWriter end(0x{d}) LIFECYCLE: already done, ignoring\n", .{@intFromPtr(this)});
return;
}
this.is_done = true;
if (this.pending_payload_size == 0) {
// will auto close when pending stuff get written
mlog("WindowsBufferedWriter end(0x{d}) LIFECYCLE: closing immediately (no pending data)\n", .{@intFromPtr(this)});
this.close();
} else {
mlog("WindowsBufferedWriter end(0x{d}) LIFECYCLE: waiting for pending write to complete before closing\n", .{@intFromPtr(this)});
}
}
};
@@ -1253,34 +1365,47 @@ pub fn WindowsStreamingWriter(comptime Parent: type, function_table: anytype) ty
pub const getStream = internals.getStream;
pub fn memoryCost(this: *const WindowsWriter) usize {
return @sizeOf(@This()) + this.current_payload.memoryCost() + this.outgoing.memoryCost();
const cost = @sizeOf(@This()) + this.current_payload.memoryCost() + this.outgoing.memoryCost();
mlog("WindowsStreamingWriter memoryCost(0x{d}) = {d}\n", .{ @intFromPtr(this), cost });
return cost;
}
fn onCloseSource(this: *WindowsWriter) void {
mlog("WindowsStreamingWriter onCloseSource(0x{d}) closed_without_reporting={}\n", .{ @intFromPtr(this), this.closed_without_reporting });
this.source = null;
if (this.closed_without_reporting) {
mlog("WindowsStreamingWriter onCloseSource(0x{d}) early return due to closed_without_reporting\n", .{@intFromPtr(this)});
this.closed_without_reporting = false;
return;
}
mlog("WindowsStreamingWriter onCloseSource(0x{d}) calling onClose\n", .{@intFromPtr(this)});
onClose(this.parent);
}
pub fn startWithCurrentPipe(this: *WindowsWriter) bun.sys.Maybe(void) {
mlog("WindowsStreamingWriter startWithCurrentPipe(0x{d}) source_is_null={}\n", .{ @intFromPtr(this), this.source == null });
bun.assert(this.source != null);
this.is_done = false;
mlog("WindowsStreamingWriter startWithCurrentPipe(0x{d}) success, is_done={}\n", .{ @intFromPtr(this), this.is_done });
return .success;
}
pub fn hasPendingData(this: *const WindowsWriter) bool {
return (this.outgoing.isNotEmpty() or this.current_payload.isNotEmpty());
const has_pending = (this.outgoing.isNotEmpty() or this.current_payload.isNotEmpty());
mlog("WindowsStreamingWriter hasPendingData(0x{d}) = {} (outgoing={}, current_payload={})\n", .{ @intFromPtr(this), has_pending, this.outgoing.isNotEmpty(), this.current_payload.isNotEmpty() });
return has_pending;
}
fn isDone(this: *WindowsWriter) bool {
// done is flags andd no more data queued? so we are done!
return this.is_done and !this.hasPendingData();
const pending = this.hasPendingData();
const done = this.is_done and !pending;
mlog("WindowsStreamingWriter isDone(0x{d}) = {} (is_done={}, hasPendingData={})\n", .{ @intFromPtr(this), done, this.is_done, pending });
return done;
}
fn onWriteComplete(this: *WindowsWriter, status: uv.ReturnCode) void {
mlog("WindowsStreamingWriter onWriteComplete(0x{d}, status={})\n", .{ @intFromPtr(this), status.int() });
if (status.toError(.write)) |err| {
this.last_write_result = .{ .err = err };
log("onWrite() = {s}", .{err.name()});
@@ -1323,27 +1448,34 @@ pub fn WindowsStreamingWriter(comptime Parent: type, function_table: anytype) ty
fn onFsWriteComplete(fs: *uv.fs_t) callconv(.C) void {
const result = fs.result;
mlog("WindowsStreamingWriter onFsWriteComplete() result={}\n", .{result.int()});
if (result.int() == uv.UV_ECANCELED) {
mlog("WindowsStreamingWriter onFsWriteComplete() CANCELED path\n", .{});
fs.deinit();
return;
}
const this = bun.cast(*WindowsWriter, fs.data);
mlog("WindowsStreamingWriter onFsWriteComplete(0x{d}, result={})\n", .{ @intFromPtr(this), result.int() });
fs.deinit();
if (result.toError(.write)) |err| {
mlog("WindowsStreamingWriter onFsWriteComplete(0x{d}) ERROR path: {}\n", .{ @intFromPtr(this), err });
this.close();
onError(this.parent, err);
return;
}
mlog("WindowsStreamingWriter onFsWriteComplete(0x{d}) SUCCESS path: calling onWriteComplete\n", .{@intFromPtr(this)});
this.onWriteComplete(.zero);
}
/// this tries to send more data returning if we are writable or not after this
fn processSend(this: *WindowsWriter) void {
mlog("WindowsStreamingWriter processSend(0x{d}) called\n", .{@intFromPtr(this)});
log("processSend", .{});
if (this.current_payload.isNotEmpty()) {
// we have some pending async request, the next outgoing data will be processed after this finish
mlog("WindowsStreamingWriter processSend(0x{d}) PENDING path: current_payload not empty, size={}\n", .{ @intFromPtr(this), this.current_payload.size() });
this.last_write_result = .{ .pending = 0 };
return;
}
@@ -1351,11 +1483,14 @@ pub fn WindowsStreamingWriter(comptime Parent: type, function_table: anytype) ty
const bytes = this.outgoing.slice();
// nothing todo (we assume we are writable until we try to write something)
if (bytes.len == 0) {
mlog("WindowsStreamingWriter processSend(0x{d}) EMPTY path: no outgoing data\n", .{@intFromPtr(this)});
this.last_write_result = .{ .wrote = 0 };
return;
}
mlog("WindowsStreamingWriter processSend(0x{d}) processing {} bytes\n", .{ @intFromPtr(this), bytes.len });
var pipe = this.source orelse {
mlog("WindowsStreamingWriter processSend(0x{d}) ERROR: no source pipe\n", .{@intFromPtr(this)});
const err = bun.sys.Error.fromCode(bun.sys.E.PIPE, .pipe);
this.last_write_result = .{ .err = err };
onError(this.parent, err);
@@ -1364,77 +1499,100 @@ pub fn WindowsStreamingWriter(comptime Parent: type, function_table: anytype) ty
};
// current payload is empty we can just swap with outgoing
mlog("WindowsStreamingWriter processSend(0x{d}) swapping buffers\n", .{@intFromPtr(this)});
const temp = this.current_payload;
this.current_payload = this.outgoing;
this.outgoing = temp;
switch (pipe) {
.sync_file => {
mlog("WindowsStreamingWriter processSend(0x{d}) PANIC: sync_file should not be reachable\n", .{@intFromPtr(this)});
@panic("sync_file pipe write should not be reachable");
},
.file => |file| {
mlog("WindowsStreamingWriter processSend(0x{d}) FILE path: calling uv_fs_write\n", .{@intFromPtr(this)});
file.fs.deinit();
file.fs.setData(this);
this.write_buffer = uv.uv_buf_t.init(bytes);
if (uv.uv_fs_write(uv.Loop.get(), &file.fs, file.file, @ptrCast(&this.write_buffer), 1, -1, onFsWriteComplete).toError(.write)) |err| {
mlog("WindowsStreamingWriter processSend(0x{d}) FILE ERROR: uv_fs_write failed: {}\n", .{ @intFromPtr(this), err });
this.last_write_result = .{ .err = err };
onError(this.parent, err);
this.closeWithoutReporting();
return;
}
mlog("WindowsStreamingWriter processSend(0x{d}) FILE: uv_fs_write queued successfully\n", .{@intFromPtr(this)});
},
else => {
// enqueue the write
mlog("WindowsStreamingWriter processSend(0x{d}) STREAM path: calling write_req.write\n", .{@intFromPtr(this)});
this.write_buffer = uv.uv_buf_t.init(bytes);
if (this.write_req.write(pipe.toStream(), &this.write_buffer, this, onWriteComplete).asErr()) |err| {
mlog("WindowsStreamingWriter processSend(0x{d}) STREAM ERROR: write_req.write failed: {}\n", .{ @intFromPtr(this), err });
this.last_write_result = .{ .err = err };
onError(this.parent, err);
this.closeWithoutReporting();
return;
}
mlog("WindowsStreamingWriter processSend(0x{d}) STREAM: write_req.write queued successfully\n", .{@intFromPtr(this)});
},
}
mlog("WindowsStreamingWriter processSend(0x{d}) setting last_write_result to pending\n", .{@intFromPtr(this)});
this.last_write_result = .{ .pending = 0 };
}
const WindowsWriter = @This();
fn closeWithoutReporting(this: *WindowsWriter) void {
mlog("WindowsStreamingWriter closeWithoutReporting(0x{d}) fd={}, closed_without_reporting={}\n", .{ @intFromPtr(this), this.getFd().cast(), this.closed_without_reporting });
if (this.getFd() != bun.invalid_fd) {
bun.assert(!this.closed_without_reporting);
mlog("WindowsStreamingWriter closeWithoutReporting(0x{d}) setting flag and calling close\n", .{@intFromPtr(this)});
this.closed_without_reporting = true;
this.close();
} else {
mlog("WindowsStreamingWriter closeWithoutReporting(0x{d}) invalid fd, skipping close\n", .{@intFromPtr(this)});
}
}
pub fn deinit(this: *WindowsWriter) void {
mlog("WindowsStreamingWriter deinit(0x{d}) outgoing_size={}, current_payload_size={}\n", .{ @intFromPtr(this), this.outgoing.size(), this.current_payload.size() });
// clean both buffers if needed
this.outgoing.deinit();
this.current_payload.deinit();
mlog("WindowsStreamingWriter deinit(0x{d}) buffers cleaned, calling closeWithoutReporting\n", .{@intFromPtr(this)});
this.closeWithoutReporting();
}
fn writeInternal(this: *WindowsWriter, buffer: anytype, comptime writeFn: anytype) WriteResult {
mlog("WindowsStreamingWriter writeInternal(0x{d}) buffer_len={}, is_done={}\n", .{ @intFromPtr(this), @as(usize, buffer.len), this.is_done });
if (this.is_done) {
mlog("WindowsStreamingWriter writeInternal(0x{d}) DONE path: already ended\n", .{@intFromPtr(this)});
return .{ .done = 0 };
}
if (this.source != null and this.source.? == .sync_file) {
mlog("WindowsStreamingWriter writeInternal(0x{d}) SYNC_FILE path\n", .{@intFromPtr(this)});
defer this.outgoing.reset();
var remain = StreamBuffer.writeOrFallback(&this.outgoing, buffer, comptime writeFn) catch {
mlog("WindowsStreamingWriter writeInternal(0x{d}) SYNC_FILE OOM error\n", .{@intFromPtr(this)});
return .{ .err = bun.sys.Error.oom };
};
const initial_len = remain.len;
const fd: bun.FD = .fromUV(this.source.?.sync_file.file);
mlog("WindowsStreamingWriter writeInternal(0x{d}) SYNC_FILE writing {} bytes to fd={}\n", .{ @intFromPtr(this), initial_len, fd.cast() });
while (remain.len > 0) {
switch (fd.write(remain)) {
.err => |err| {
mlog("WindowsStreamingWriter writeInternal(0x{d}) SYNC_FILE write error: {}\n", .{ @intFromPtr(this), err });
return .{ .err = err };
},
.result => |wrote| {
mlog("WindowsStreamingWriter writeInternal(0x{d}) SYNC_FILE wrote {} bytes, {} remaining\n", .{ @intFromPtr(this), wrote, remain.len - wrote });
remain = remain[wrote..];
if (wrote == 0) {
mlog("WindowsStreamingWriter writeInternal(0x{d}) SYNC_FILE wrote 0, breaking\n", .{@intFromPtr(this)});
break;
}
},
@@ -1442,6 +1600,7 @@ pub fn WindowsStreamingWriter(comptime Parent: type, function_table: anytype) ty
}
const wrote = initial_len - remain.len;
mlog("WindowsStreamingWriter writeInternal(0x{d}) SYNC_FILE total wrote={}\n", .{ @intFromPtr(this), wrote });
if (wrote == 0) {
return .{ .done = wrote };
}
@@ -1449,56 +1608,76 @@ pub fn WindowsStreamingWriter(comptime Parent: type, function_table: anytype) ty
}
const had_buffered_data = this.outgoing.isNotEmpty();
mlog("WindowsStreamingWriter writeInternal(0x{d}) ASYNC path: had_buffered_data={}, outgoing_size={}\n", .{ @intFromPtr(this), had_buffered_data, this.outgoing.size() });
(if (comptime @TypeOf(writeFn) == @TypeOf(&StreamBuffer.writeLatin1) and writeFn == &StreamBuffer.writeLatin1)
writeFn(&this.outgoing, buffer, true)
else
writeFn(&this.outgoing, buffer)) catch {
mlog("WindowsStreamingWriter writeInternal(0x{d}) ASYNC OOM error during buffer write\n", .{@intFromPtr(this)});
return .{ .err = bun.sys.Error.oom };
};
if (had_buffered_data) {
mlog("WindowsStreamingWriter writeInternal(0x{d}) ASYNC had buffered data, returning pending\n", .{@intFromPtr(this)});
return .{ .pending = 0 };
}
mlog("WindowsStreamingWriter writeInternal(0x{d}) ASYNC calling processSend\n", .{@intFromPtr(this)});
this.processSend();
return this.last_write_result;
}
pub fn writeUTF16(this: *WindowsWriter, buf: []const u16) WriteResult {
mlog("WindowsStreamingWriter writeUTF16(0x{d}, buf.len={})\n", .{ @intFromPtr(this), buf.len });
return writeInternal(this, buf, &StreamBuffer.writeUTF16);
}
pub fn writeLatin1(this: *WindowsWriter, buffer: []const u8) WriteResult {
mlog("WindowsStreamingWriter writeLatin1(0x{d}, buffer.len={})\n", .{ @intFromPtr(this), buffer.len });
return writeInternal(this, buffer, &StreamBuffer.writeLatin1);
}
pub fn write(this: *WindowsWriter, buffer: []const u8) WriteResult {
mlog("WindowsStreamingWriter write(0x{d}, buffer.len={})\n", .{ @intFromPtr(this), buffer.len });
return writeInternal(this, buffer, &StreamBuffer.write);
}
pub fn flush(this: *WindowsWriter) WriteResult {
mlog("WindowsStreamingWriter flush(0x{d}) is_done={}\n", .{ @intFromPtr(this), this.is_done });
if (this.is_done) {
mlog("WindowsStreamingWriter flush(0x{d}) DONE path: already ended\n", .{@intFromPtr(this)});
return .{ .done = 0 };
}
if (!this.hasPendingData()) {
const has_pending = this.hasPendingData();
if (!has_pending) {
mlog("WindowsStreamingWriter flush(0x{d}) NO_PENDING path: no data to flush\n", .{@intFromPtr(this)});
return .{ .wrote = 0 };
}
mlog("WindowsStreamingWriter flush(0x{d}) calling processSend\n", .{@intFromPtr(this)});
this.processSend();
return this.last_write_result;
}
pub fn end(this: *WindowsWriter) void {
mlog("WindowsStreamingWriter end(0x{d}) is_done={}, owns_fd={}\n", .{ @intFromPtr(this), this.is_done, this.owns_fd });
if (this.is_done) {
mlog("WindowsStreamingWriter end(0x{d}) already done, returning\n", .{@intFromPtr(this)});
return;
}
this.closed_without_reporting = false;
this.is_done = true;
if (!this.hasPendingData()) {
const has_pending = this.hasPendingData();
mlog("WindowsStreamingWriter end(0x{d}) hasPendingData={}\n", .{ @intFromPtr(this), has_pending });
if (!has_pending) {
if (!this.owns_fd) {
mlog("WindowsStreamingWriter end(0x{d}) not owning fd, returning\n", .{@intFromPtr(this)});
return;
}
mlog("WindowsStreamingWriter end(0x{d}) calling close\n", .{@intFromPtr(this)});
this.close();
} else {
mlog("WindowsStreamingWriter end(0x{d}) has pending data, will close after writes complete\n", .{@intFromPtr(this)});
}
}
};

70
src/mlog.zig Normal file
View File

@@ -0,0 +1,70 @@
const std = @import("std");
const c = @cImport({
@cInclude("stdlib.h");
});
const Logger = struct {
var log_file: ?std.fs.File = null;
var proc_id: std.os.windows.DWORD = 0;
var name_buf: [64]u8 = undefined;
var name: []u8 = undefined;
export fn cleanup() void {
if (Logger.log_file) |*file| {
file.sync() catch {
@panic("Failed to flush mlog.txt: {any}\n");
};
file.close();
Logger.log_file = null;
}
std.debug.print("Saved output to {s}.\n", .{Logger.name});
}
fn getAndOpenFile() *std.fs.File {
if (Logger.log_file) |*file| {
return file;
}
Logger.proc_id = std.os.windows.GetCurrentProcessId();
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
defer _ = gpa.deinit();
var cwd = std.fs.cwd();
Logger.name = std.fmt.bufPrint(&name_buf, "mlog_{}.txt", .{Logger.proc_id}) catch {
@panic("Failed to format mlog filename: {any}\n");
};
Logger.log_file = cwd.openFile(Logger.name, .{
.mode = .read_write,
}) catch
cwd.createFile(
Logger.name,
.{
.read = true,
.truncate = false,
},
) catch
@panic("Failed to create mlog.txt: {any}\n");
_ = c.atexit(cleanup);
return &(Logger.log_file orelse unreachable);
}
pub fn log(comptime fmt: []const u8, args: anytype) void {
const file = Logger.getAndOpenFile();
const writer = file.writer();
const nanos = std.time.nanoTimestamp();
std.fmt.format(writer, "[{}] (pid {}) " ++ fmt, .{ nanos, Logger.proc_id } ++ args) catch {
@panic("Failed to write to mlog.txt: {any}\n");
};
}
};
pub const log = Logger.log;

View File

@@ -1,3 +1,4 @@
const mlog = @import("../mlog.zig").log;
// const IPC = @import("../bun.js/ipc.zig");
pub const Stdio = util.Stdio;
@@ -1106,6 +1107,7 @@ pub const PipeReader = struct {
.out_type = out_type,
});
log("PipeReader(0x{x}, {s}) create()", .{ @intFromPtr(this), @tagName(this.out_type) });
mlog("PipeReader(0x{x}, {s}) create()\n", .{ @intFromPtr(this), @tagName(this.out_type) });
if (capture) |cap| {
this.captured_writer.writer = cap.refSelf();
@@ -1131,10 +1133,12 @@ pub const PipeReader = struct {
}
pub fn start(this: *PipeReader, process: *ShellSubprocess, event_loop: jsc.EventLoopHandle) bun.sys.Maybe(void) {
mlog("PipeReader(0x{x}, {s}) start()\n", .{ @intFromPtr(this), @tagName(this.out_type) });
// this.ref();
this.process = process;
this.event_loop = event_loop;
if (Environment.isWindows) {
mlog("Starting with current pipe...\n", .{});
return this.reader.startWithCurrentPipe();
}
@@ -1161,6 +1165,7 @@ pub const PipeReader = struct {
var this: *PipeReader = @ptrCast(@alignCast(ptr));
this.buffered_output.append(chunk);
log("PipeReader(0x{x}, {s}) onReadChunk(chunk_len={d}, has_more={s})", .{ @intFromPtr(this), @tagName(this.out_type), chunk.len, @tagName(has_more) });
mlog("PipeReader(0x{x}, {s}) onReadChunk(chunk_len={d}, has_more={s})\n", .{ @intFromPtr(this), @tagName(this.out_type), chunk.len, @tagName(has_more) });
this.captured_writer.doWrite(chunk);
@@ -1179,6 +1184,7 @@ pub const PipeReader = struct {
}
pub fn onReaderDone(this: *PipeReader) void {
mlog("The reader is done...\n", .{});
log("onReaderDone(0x{x}, {s})", .{ @intFromPtr(this), @tagName(this.out_type) });
const owned = this.toOwnedSlice();
this.state = .{ .done = owned };
@@ -1267,6 +1273,7 @@ pub const PipeReader = struct {
}
pub fn toReadableStream(this: *PipeReader, globalObject: *jsc.JSGlobalObject) jsc.JSValue {
mlog("PipeReader(0x{x}, {s}) toReadableStream()\n", .{ @intFromPtr(this), @tagName(this.out_type) });
defer this.deinit();
switch (this.state) {

View File

@@ -4,6 +4,7 @@
//!
//! Sometimes this namespace is referred to as "Syscall", prefer "bun.sys"/"sys"
const mlog = @import("./mlog.zig").log;
const This = @This();
//
@@ -1550,6 +1551,7 @@ pub fn write(fd: bun.FileDescriptor, bytes: []const u8) Maybe(usize) {
// "WriteFile sets this value to zero before doing any work or error checking."
var bytes_written: u32 = undefined;
bun.assert(bytes.len > 0);
mlog("We're about to WriteFile({}, {})\n", .{ fd, adjusted_len });
const rc = kernel32.WriteFile(
fd.cast(),
bytes.ptr,

View File

@@ -442,13 +442,13 @@ for (let [gcTick, label] of [
}
const fixtures = [
[helloWorld, "hello"],
//[helloWorld, "hello"],
[huge, hugeString],
] as const;
for (const [callback, fixture] of fixtures) {
describe(fixture.slice(0, 12), () => {
describe("should should allow reading stdout", () => {
describe("should allow reading stdout", () => {
it("before exit", async () => {
const process = callback();
const output = await process.stdout.text();