mirror of
https://github.com/oven-sh/bun
synced 2026-02-03 07:28:53 +00:00
Compare commits
3 Commits
kai/memory
...
jarred/sub
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2c36c8746c | ||
|
|
a0ec7265fb | ||
|
|
1b6c5c6aba |
@@ -98,6 +98,11 @@ src/bun.js/api/bun/spawn.zig
|
||||
src/bun.js/api/bun/spawn/stdio.zig
|
||||
src/bun.js/api/bun/ssl_wrapper.zig
|
||||
src/bun.js/api/bun/subprocess.zig
|
||||
src/bun.js/api/bun/subprocess/Readable.zig
|
||||
src/bun.js/api/bun/subprocess/ResourceUsage.zig
|
||||
src/bun.js/api/bun/subprocess/StaticPipeWriter.zig
|
||||
src/bun.js/api/bun/subprocess/SubprocessPipeReader.zig
|
||||
src/bun.js/api/bun/subprocess/Writable.zig
|
||||
src/bun.js/api/bun/udp_socket.zig
|
||||
src/bun.js/api/bun/x509.zig
|
||||
src/bun.js/api/BunObject.zig
|
||||
@@ -130,6 +135,7 @@ src/bun.js/api/server/StaticRoute.zig
|
||||
src/bun.js/api/server/WebSocketServerContext.zig
|
||||
src/bun.js/api/streams.classes.zig
|
||||
src/bun.js/api/Timer.zig
|
||||
src/bun.js/api/Timer/DateHeaderTimer.zig
|
||||
src/bun.js/api/Timer/EventLoopTimer.zig
|
||||
src/bun.js/api/Timer/ImmediateObject.zig
|
||||
src/bun.js/api/Timer/TimeoutObject.zig
|
||||
|
||||
@@ -82,19 +82,6 @@ private:
|
||||
|
||||
static Loop *create(void *hint) {
|
||||
Loop *loop = ((Loop *) us_create_loop(hint, wakeupCb, preCb, postCb, sizeof(LoopData)))->init();
|
||||
|
||||
/* We also need some timers (should live off the one 4 second timer rather) */
|
||||
LoopData *loopData = (LoopData *) us_loop_ext((struct us_loop_t *) loop);
|
||||
loopData->dateTimer = us_create_timer((struct us_loop_t *) loop, 1, sizeof(LoopData *));
|
||||
loopData->updateDate();
|
||||
|
||||
memcpy(us_timer_ext(loopData->dateTimer), &loopData, sizeof(LoopData *));
|
||||
us_timer_set(loopData->dateTimer, [](struct us_timer_t *t) {
|
||||
LoopData *loopData;
|
||||
memcpy(&loopData, us_timer_ext(t), sizeof(LoopData *));
|
||||
loopData->updateDate();
|
||||
}, 1000, 1000);
|
||||
|
||||
return loop;
|
||||
}
|
||||
|
||||
@@ -146,10 +133,7 @@ public:
|
||||
/* Freeing the default loop should be done once */
|
||||
void free() {
|
||||
LoopData *loopData = (LoopData *) us_loop_ext((us_loop_t *) this);
|
||||
|
||||
/* Stop and free dateTimer first */
|
||||
us_timer_close(loopData->dateTimer, 1);
|
||||
|
||||
|
||||
loopData->~LoopData();
|
||||
/* uSockets will track whether this loop is owned by us or a borrowed alien loop */
|
||||
us_loop_free((us_loop_t *) this);
|
||||
|
||||
@@ -151,8 +151,6 @@ public:
|
||||
ZlibContext *zlibContext = nullptr;
|
||||
InflationStream *inflationStream = nullptr;
|
||||
DeflationStream *deflationStream = nullptr;
|
||||
|
||||
us_timer_t *dateTimer;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -310,7 +310,11 @@ pub const VMHolder = struct {
|
||||
};
|
||||
|
||||
pub inline fn get() *VirtualMachine {
|
||||
return VMHolder.vm.?;
|
||||
return getOrNull().?;
|
||||
}
|
||||
|
||||
pub inline fn getOrNull() ?*VirtualMachine {
|
||||
return VMHolder.vm;
|
||||
}
|
||||
|
||||
pub fn getMainThreadVM() ?*VirtualMachine {
|
||||
|
||||
@@ -46,6 +46,9 @@ pub const All = struct {
|
||||
}
|
||||
} = .{},
|
||||
|
||||
/// Updates the "Date" header.
|
||||
date_header_timer: DateHeaderTimer = .{},
|
||||
|
||||
pub fn init() @This() {
|
||||
return .{
|
||||
.thread_id = std.Thread.getCurrentId(),
|
||||
@@ -199,6 +202,27 @@ pub const All = struct {
|
||||
return VirtualMachine.get().timer.last_id;
|
||||
}
|
||||
|
||||
fn isDateTimerActive(this: *const All) bool {
|
||||
return this.date_header_timer.event_loop_timer.state == .ACTIVE;
|
||||
}
|
||||
|
||||
pub fn updateDateHeaderTimerIfNecessary(this: *All, loop: *const uws.Loop, vm: *VirtualMachine) void {
|
||||
if (loop.shouldEnableDateHeaderTimer()) {
|
||||
if (!this.isDateTimerActive()) {
|
||||
this.date_header_timer.enable(
|
||||
vm,
|
||||
// Be careful to avoid adding extra calls to bun.timespec.now()
|
||||
// when it's not needed.
|
||||
&bun.timespec.now(),
|
||||
);
|
||||
}
|
||||
} else {
|
||||
// don't un-schedule it here.
|
||||
// it's better to wake up an extra 1 time after a second idle
|
||||
// than to have to check a date potentially on every single HTTP request.
|
||||
}
|
||||
}
|
||||
|
||||
pub fn getTimeout(this: *All, spec: *timespec, vm: *VirtualMachine) bool {
|
||||
var maybe_now: ?timespec = null;
|
||||
while (this.timers.peek()) |min| {
|
||||
@@ -571,6 +595,8 @@ pub const ID = extern struct {
|
||||
/// A timer created by WTF code and invoked by Bun's event loop
|
||||
pub const WTFTimer = @import("./Timer/WTFTimer.zig");
|
||||
|
||||
pub const DateHeaderTimer = @import("./Timer/DateHeaderTimer.zig");
|
||||
|
||||
pub const internal_bindings = struct {
|
||||
/// Node.js has some tests that check whether timers fire at the right time. They check this
|
||||
/// with the internal binding `getLibuvNow()`, which returns an integer in milliseconds. This
|
||||
@@ -605,3 +631,4 @@ const jsc = bun.jsc;
|
||||
const JSGlobalObject = jsc.JSGlobalObject;
|
||||
const JSValue = jsc.JSValue;
|
||||
const VirtualMachine = jsc.VirtualMachine;
|
||||
const uws = bun.uws;
|
||||
|
||||
80
src/bun.js/api/Timer/DateHeaderTimer.zig
Normal file
80
src/bun.js/api/Timer/DateHeaderTimer.zig
Normal file
@@ -0,0 +1,80 @@
|
||||
/// DateHeaderTimer manages the periodic updating of the "Date" header in Bun.serve().
|
||||
///
|
||||
/// This timer ensures that HTTP responses include an up-to-date Date header by
|
||||
/// updating the date every second when there are active connections.
|
||||
///
|
||||
/// Behavior:
|
||||
/// - When sweep_timer_count > 0 (active connections), the timer should be running
|
||||
/// - When sweep_timer_count = 0 (no connections), the timer doesn't get rescheduled.
|
||||
/// - If the timer was already running, no changes are made.
|
||||
/// - If the timer was not running and needs to start:
|
||||
/// - If the last update was > 1 second ago, update the date immediately and schedule next update
|
||||
/// - If the last update was < 1 second ago, just schedule the next update
|
||||
///
|
||||
/// Note that we only check for potential updates ot this timer once per event loop tick.
|
||||
const DateHeaderTimer = @This();
|
||||
|
||||
event_loop_timer: jsc.API.Timer.EventLoopTimer = .{
|
||||
.tag = .DateHeaderTimer,
|
||||
.next = .epoch,
|
||||
},
|
||||
|
||||
/// Schedule the "Date"" header timer.
|
||||
///
|
||||
/// The logic handles two scenarios:
|
||||
/// 1. If the timer was recently updated (< 1 second ago), just reschedule it
|
||||
/// 2. If the timer is stale (> 1 second since last update), update the date immediately and reschedule
|
||||
pub fn enable(this: *DateHeaderTimer, vm: *VirtualMachine, now: *const bun.timespec) void {
|
||||
bun.debugAssert(this.event_loop_timer.state != .ACTIVE);
|
||||
|
||||
const last_update = this.event_loop_timer.next;
|
||||
const elapsed = now.duration(&last_update).ms();
|
||||
|
||||
// If the last update was more than 1 second ago, the date is stale
|
||||
if (elapsed >= std.time.ms_per_s) {
|
||||
// Update the date immediately since it's stale
|
||||
log("updating stale timer & rescheduling for 1 second later", .{});
|
||||
|
||||
// updateDate() is an expensive function.
|
||||
vm.uwsLoop().updateDate();
|
||||
|
||||
vm.timer.update(&this.event_loop_timer, &now.addMs(std.time.ms_per_s));
|
||||
} else {
|
||||
// The date was updated recently, just reschedule for the next second
|
||||
log("rescheduling timer", .{});
|
||||
vm.timer.insert(&this.event_loop_timer);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn run(this: *DateHeaderTimer, vm: *VirtualMachine) void {
|
||||
this.event_loop_timer.state = .FIRED;
|
||||
const loop = vm.uwsLoop();
|
||||
const now = bun.timespec.now();
|
||||
|
||||
// Record when we last ran it.
|
||||
this.event_loop_timer.next = now;
|
||||
log("run", .{});
|
||||
|
||||
// updateDate() is an expensive function.
|
||||
loop.updateDate();
|
||||
|
||||
if (loop.internal_loop_data.sweep_timer_count > 0) {
|
||||
// Reschedule it automatically for 1 second later.
|
||||
this.event_loop_timer.next = now.addMs(std.time.ms_per_s);
|
||||
vm.timer.insert(&this.event_loop_timer);
|
||||
}
|
||||
}
|
||||
|
||||
pub export fn Bun__internal_ensureDateHeaderTimerIsEnabled(loop: *uws.Loop) callconv(.C) void {
|
||||
if (jsc.VirtualMachine.getOrNull()) |vm| {
|
||||
vm.timer.updateDateHeaderTimerIfNecessary(loop, vm);
|
||||
}
|
||||
}
|
||||
|
||||
const bun = @import("bun");
|
||||
const jsc = bun.jsc;
|
||||
const VirtualMachine = jsc.VirtualMachine;
|
||||
const std = @import("std");
|
||||
|
||||
const uws = bun.uws;
|
||||
const log = bun.Output.scoped(.DateHeaderTimer, .visible);
|
||||
@@ -65,6 +65,7 @@ pub const Tag = if (Environment.isWindows) enum {
|
||||
DevServerSweepSourceMaps,
|
||||
DevServerMemoryVisualizerTick,
|
||||
AbortSignalTimeout,
|
||||
DateHeaderTimer,
|
||||
|
||||
pub fn Type(comptime T: Tag) type {
|
||||
return switch (T) {
|
||||
@@ -86,6 +87,7 @@ pub const Tag = if (Environment.isWindows) enum {
|
||||
.DevServerMemoryVisualizerTick,
|
||||
=> bun.bake.DevServer,
|
||||
.AbortSignalTimeout => jsc.WebCore.AbortSignal.Timeout,
|
||||
.DateHeaderTimer => jsc.API.Timer.DateHeaderTimer,
|
||||
};
|
||||
}
|
||||
} else enum {
|
||||
@@ -105,6 +107,7 @@ pub const Tag = if (Environment.isWindows) enum {
|
||||
DevServerSweepSourceMaps,
|
||||
DevServerMemoryVisualizerTick,
|
||||
AbortSignalTimeout,
|
||||
DateHeaderTimer,
|
||||
|
||||
pub fn Type(comptime T: Tag) type {
|
||||
return switch (T) {
|
||||
@@ -125,6 +128,7 @@ pub const Tag = if (Environment.isWindows) enum {
|
||||
.DevServerMemoryVisualizerTick,
|
||||
=> bun.bake.DevServer,
|
||||
.AbortSignalTimeout => jsc.WebCore.AbortSignal.Timeout,
|
||||
.DateHeaderTimer => jsc.API.Timer.DateHeaderTimer,
|
||||
};
|
||||
}
|
||||
};
|
||||
@@ -194,6 +198,11 @@ pub fn fire(self: *Self, now: *const timespec, vm: *VirtualMachine) Arm {
|
||||
timeout.run(vm);
|
||||
return .disarm;
|
||||
},
|
||||
.DateHeaderTimer => {
|
||||
const date_header_timer = @as(*jsc.API.Timer.DateHeaderTimer, @fieldParentPtr("event_loop_timer", self));
|
||||
date_header_timer.run(vm);
|
||||
return .disarm;
|
||||
},
|
||||
inline else => |t| {
|
||||
if (@FieldType(t.Type(), "event_loop_timer") != Self) {
|
||||
@compileError(@typeName(t.Type()) ++ " has wrong type for 'event_loop_timer'");
|
||||
|
||||
@@ -84,69 +84,7 @@ pub inline fn assertStdioResult(result: StdioResult) void {
|
||||
}
|
||||
}
|
||||
|
||||
pub const ResourceUsage = struct {
|
||||
pub const js = jsc.Codegen.JSResourceUsage;
|
||||
pub const toJS = ResourceUsage.js.toJS;
|
||||
pub const fromJS = ResourceUsage.js.fromJS;
|
||||
pub const fromJSDirect = ResourceUsage.js.fromJSDirect;
|
||||
|
||||
rusage: Rusage,
|
||||
|
||||
pub fn getCPUTime(this: *ResourceUsage, globalObject: *JSGlobalObject) bun.JSError!JSValue {
|
||||
var cpu = jsc.JSValue.createEmptyObjectWithNullPrototype(globalObject);
|
||||
const rusage = this.rusage;
|
||||
|
||||
const usrTime = try JSValue.fromTimevalNoTruncate(globalObject, rusage.utime.usec, rusage.utime.sec);
|
||||
const sysTime = try JSValue.fromTimevalNoTruncate(globalObject, rusage.stime.usec, rusage.stime.sec);
|
||||
|
||||
cpu.put(globalObject, jsc.ZigString.static("user"), usrTime);
|
||||
cpu.put(globalObject, jsc.ZigString.static("system"), sysTime);
|
||||
cpu.put(globalObject, jsc.ZigString.static("total"), JSValue.bigIntSum(globalObject, usrTime, sysTime));
|
||||
|
||||
return cpu;
|
||||
}
|
||||
|
||||
pub fn getMaxRSS(this: *ResourceUsage, _: *JSGlobalObject) JSValue {
|
||||
return jsc.JSValue.jsNumber(this.rusage.maxrss);
|
||||
}
|
||||
|
||||
pub fn getSharedMemorySize(this: *ResourceUsage, _: *JSGlobalObject) JSValue {
|
||||
return jsc.JSValue.jsNumber(this.rusage.ixrss);
|
||||
}
|
||||
|
||||
pub fn getSwapCount(this: *ResourceUsage, _: *JSGlobalObject) JSValue {
|
||||
return jsc.JSValue.jsNumber(this.rusage.nswap);
|
||||
}
|
||||
|
||||
pub fn getOps(this: *ResourceUsage, globalObject: *JSGlobalObject) JSValue {
|
||||
var ops = jsc.JSValue.createEmptyObjectWithNullPrototype(globalObject);
|
||||
ops.put(globalObject, jsc.ZigString.static("in"), jsc.JSValue.jsNumber(this.rusage.inblock));
|
||||
ops.put(globalObject, jsc.ZigString.static("out"), jsc.JSValue.jsNumber(this.rusage.oublock));
|
||||
return ops;
|
||||
}
|
||||
|
||||
pub fn getMessages(this: *ResourceUsage, globalObject: *JSGlobalObject) JSValue {
|
||||
var msgs = jsc.JSValue.createEmptyObjectWithNullPrototype(globalObject);
|
||||
msgs.put(globalObject, jsc.ZigString.static("sent"), jsc.JSValue.jsNumber(this.rusage.msgsnd));
|
||||
msgs.put(globalObject, jsc.ZigString.static("received"), jsc.JSValue.jsNumber(this.rusage.msgrcv));
|
||||
return msgs;
|
||||
}
|
||||
|
||||
pub fn getSignalCount(this: *ResourceUsage, _: *JSGlobalObject) JSValue {
|
||||
return jsc.JSValue.jsNumber(this.rusage.nsignals);
|
||||
}
|
||||
|
||||
pub fn getContextSwitches(this: *ResourceUsage, globalObject: *JSGlobalObject) JSValue {
|
||||
var ctx = jsc.JSValue.createEmptyObjectWithNullPrototype(globalObject);
|
||||
ctx.put(globalObject, jsc.ZigString.static("voluntary"), jsc.JSValue.jsNumber(this.rusage.nvcsw));
|
||||
ctx.put(globalObject, jsc.ZigString.static("involuntary"), jsc.JSValue.jsNumber(this.rusage.nivcsw));
|
||||
return ctx;
|
||||
}
|
||||
|
||||
pub fn finalize(this: *ResourceUsage) callconv(.C) void {
|
||||
bun.default_allocator.destroy(this);
|
||||
}
|
||||
};
|
||||
pub const ResourceUsage = @import("./subprocess/ResourceUsage.zig");
|
||||
|
||||
pub fn appendEnvpFromJS(globalThis: *jsc.JSGlobalObject, object: *jsc.JSObject, envp: *std.ArrayList(?[*:0]const u8), PATH: *[]const u8) bun.JSError!void {
|
||||
var object_iter = try jsc.JSPropertyIterator(.{ .skip_empty_name = false, .include_value = true }).init(globalThis, object);
|
||||
@@ -207,27 +145,24 @@ pub fn resourceUsage(
|
||||
return this.createResourceUsageObject(globalObject);
|
||||
}
|
||||
|
||||
pub fn createResourceUsageObject(this: *Subprocess, globalObject: *JSGlobalObject) JSValue {
|
||||
const pid_rusage = this.pid_rusage orelse brk: {
|
||||
if (Environment.isWindows) {
|
||||
if (this.process.poller == .uv) {
|
||||
this.pid_rusage = PosixSpawn.process.uv_getrusage(&this.process.poller.uv);
|
||||
break :brk this.pid_rusage.?;
|
||||
pub fn createResourceUsageObject(this: *Subprocess, globalObject: *JSGlobalObject) bun.JSError!JSValue {
|
||||
return ResourceUsage.create(
|
||||
brk: {
|
||||
if (this.pid_rusage != null) {
|
||||
break :brk &this.pid_rusage.?;
|
||||
}
|
||||
}
|
||||
|
||||
return .js_undefined;
|
||||
};
|
||||
if (Environment.isWindows) {
|
||||
if (this.process.poller == .uv) {
|
||||
this.pid_rusage = PosixSpawn.process.uv_getrusage(&this.process.poller.uv);
|
||||
break :brk &this.pid_rusage.?;
|
||||
}
|
||||
}
|
||||
|
||||
const resource_usage = ResourceUsage{
|
||||
.rusage = pid_rusage,
|
||||
};
|
||||
|
||||
var result = bun.default_allocator.create(ResourceUsage) catch {
|
||||
return globalObject.throwOutOfMemoryValue();
|
||||
};
|
||||
result.* = resource_usage;
|
||||
return result.toJS(globalObject);
|
||||
return .js_undefined;
|
||||
},
|
||||
globalObject,
|
||||
);
|
||||
}
|
||||
|
||||
pub fn hasExited(this: *const Subprocess) bool {
|
||||
@@ -357,183 +292,8 @@ pub fn constructor(globalObject: *jsc.JSGlobalObject, _: *jsc.CallFrame) bun.JSE
|
||||
return globalObject.throw("Cannot construct Subprocess", .{});
|
||||
}
|
||||
|
||||
const Readable = union(enum) {
|
||||
fd: bun.FileDescriptor,
|
||||
memfd: bun.FileDescriptor,
|
||||
pipe: *PipeReader,
|
||||
inherit: void,
|
||||
ignore: void,
|
||||
closed: void,
|
||||
/// Eventually we will implement Readables created from blobs and array buffers.
|
||||
/// When we do that, `buffer` will be borrowed from those objects.
|
||||
///
|
||||
/// When a buffered `pipe` finishes reading from its file descriptor,
|
||||
/// the owning `Readable` will be convered into this variant and the pipe's
|
||||
/// buffer will be taken as an owned `CowString`.
|
||||
buffer: CowString,
|
||||
|
||||
pub fn memoryCost(this: *const Readable) usize {
|
||||
return switch (this.*) {
|
||||
.pipe => @sizeOf(PipeReader) + this.pipe.memoryCost(),
|
||||
.buffer => this.buffer.length(),
|
||||
else => 0,
|
||||
};
|
||||
}
|
||||
|
||||
pub fn hasPendingActivity(this: *const Readable) bool {
|
||||
return switch (this.*) {
|
||||
.pipe => this.pipe.hasPendingActivity(),
|
||||
else => false,
|
||||
};
|
||||
}
|
||||
|
||||
pub fn ref(this: *Readable) void {
|
||||
switch (this.*) {
|
||||
.pipe => {
|
||||
this.pipe.updateRef(true);
|
||||
},
|
||||
else => {},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn unref(this: *Readable) void {
|
||||
switch (this.*) {
|
||||
.pipe => {
|
||||
this.pipe.updateRef(false);
|
||||
},
|
||||
else => {},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn init(stdio: Stdio, event_loop: *jsc.EventLoop, process: *Subprocess, result: StdioResult, allocator: std.mem.Allocator, max_size: ?*MaxBuf, is_sync: bool) Readable {
|
||||
_ = allocator; // autofix
|
||||
_ = is_sync; // autofix
|
||||
assertStdioResult(result);
|
||||
|
||||
if (comptime Environment.isPosix) {
|
||||
if (stdio == .pipe) {
|
||||
_ = bun.sys.setNonblocking(result.?);
|
||||
}
|
||||
}
|
||||
|
||||
return switch (stdio) {
|
||||
.inherit => Readable{ .inherit = {} },
|
||||
.ignore, .ipc, .path => Readable{ .ignore = {} },
|
||||
.fd => |fd| if (Environment.isPosix) Readable{ .fd = result.? } else Readable{ .fd = fd },
|
||||
.memfd => if (Environment.isPosix) Readable{ .memfd = stdio.memfd } else Readable{ .ignore = {} },
|
||||
.dup2 => |dup2| if (Environment.isPosix) Output.panic("TODO: implement dup2 support in Stdio readable", .{}) else Readable{ .fd = dup2.out.toFd() },
|
||||
.pipe => Readable{ .pipe = PipeReader.create(event_loop, process, result, max_size) },
|
||||
.array_buffer, .blob => Output.panic("TODO: implement ArrayBuffer & Blob support in Stdio readable", .{}),
|
||||
.capture => Output.panic("TODO: implement capture support in Stdio readable", .{}),
|
||||
.readable_stream => Readable{ .ignore = {} }, // ReadableStream is handled separately
|
||||
};
|
||||
}
|
||||
|
||||
pub fn onClose(this: *Readable, _: ?bun.sys.Error) void {
|
||||
this.* = .closed;
|
||||
}
|
||||
|
||||
pub fn onReady(_: *Readable, _: ?jsc.WebCore.Blob.SizeType, _: ?jsc.WebCore.Blob.SizeType) void {}
|
||||
|
||||
pub fn onStart(_: *Readable) void {}
|
||||
|
||||
pub fn close(this: *Readable) void {
|
||||
switch (this.*) {
|
||||
.memfd => |fd| {
|
||||
this.* = .{ .closed = {} };
|
||||
fd.close();
|
||||
},
|
||||
.fd => |_| {
|
||||
this.* = .{ .closed = {} };
|
||||
},
|
||||
.pipe => {
|
||||
this.pipe.close();
|
||||
},
|
||||
else => {},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn finalize(this: *Readable) void {
|
||||
switch (this.*) {
|
||||
.memfd => |fd| {
|
||||
this.* = .{ .closed = {} };
|
||||
fd.close();
|
||||
},
|
||||
.fd => {
|
||||
this.* = .{ .closed = {} };
|
||||
},
|
||||
.pipe => |pipe| {
|
||||
defer pipe.detach();
|
||||
this.* = .{ .closed = {} };
|
||||
},
|
||||
.buffer => |*buf| {
|
||||
buf.deinit(bun.default_allocator);
|
||||
},
|
||||
else => {},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn toJS(this: *Readable, globalThis: *jsc.JSGlobalObject, exited: bool) bun.JSError!JSValue {
|
||||
_ = exited; // autofix
|
||||
switch (this.*) {
|
||||
// should only be reachable when the entire output is buffered.
|
||||
.memfd => return this.toBufferedValue(globalThis),
|
||||
|
||||
.fd => |fd| {
|
||||
return fd.toJS(globalThis);
|
||||
},
|
||||
.pipe => |pipe| {
|
||||
defer pipe.detach();
|
||||
this.* = .{ .closed = {} };
|
||||
return pipe.toJS(globalThis);
|
||||
},
|
||||
.buffer => |*buffer| {
|
||||
defer this.* = .{ .closed = {} };
|
||||
|
||||
if (buffer.length() == 0) {
|
||||
return jsc.WebCore.ReadableStream.empty(globalThis);
|
||||
}
|
||||
|
||||
const own = try buffer.takeSlice(bun.default_allocator);
|
||||
return jsc.WebCore.ReadableStream.fromOwnedSlice(globalThis, own, 0);
|
||||
},
|
||||
else => {
|
||||
return .js_undefined;
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn toBufferedValue(this: *Readable, globalThis: *jsc.JSGlobalObject) bun.JSError!JSValue {
|
||||
switch (this.*) {
|
||||
.fd => |fd| {
|
||||
return fd.toJS(globalThis);
|
||||
},
|
||||
.memfd => |fd| {
|
||||
if (comptime !Environment.isPosix) {
|
||||
Output.panic("memfd is only supported on Linux", .{});
|
||||
}
|
||||
this.* = .{ .closed = {} };
|
||||
return jsc.ArrayBuffer.toJSBufferFromMemfd(fd, globalThis);
|
||||
},
|
||||
.pipe => |pipe| {
|
||||
defer pipe.detach();
|
||||
this.* = .{ .closed = {} };
|
||||
return pipe.toBuffer(globalThis);
|
||||
},
|
||||
.buffer => |*buf| {
|
||||
defer this.* = .{ .closed = {} };
|
||||
const own = buf.takeSlice(bun.default_allocator) catch {
|
||||
return globalThis.throwOutOfMemory();
|
||||
};
|
||||
|
||||
return jsc.MarkedArrayBuffer.fromBytes(own, bun.default_allocator, .Uint8Array).toNodeBuffer(globalThis);
|
||||
},
|
||||
else => {
|
||||
return .js_undefined;
|
||||
},
|
||||
}
|
||||
}
|
||||
};
|
||||
pub const PipeReader = @import("./subprocess/SubprocessPipeReader.zig");
|
||||
pub const Readable = @import("./subprocess/Readable.zig").Readable;
|
||||
|
||||
pub fn getStderr(this: *Subprocess, globalThis: *JSGlobalObject) bun.JSError!JSValue {
|
||||
this.observable_getters.insert(.stderr);
|
||||
@@ -810,670 +570,9 @@ pub const Source = union(enum) {
|
||||
}
|
||||
};
|
||||
|
||||
pub const NewStaticPipeWriter = @import("./subprocess/StaticPipeWriter.zig").NewStaticPipeWriter;
|
||||
pub const StaticPipeWriter = NewStaticPipeWriter(Subprocess);
|
||||
|
||||
pub fn NewStaticPipeWriter(comptime ProcessType: type) type {
|
||||
return struct {
|
||||
const This = @This();
|
||||
|
||||
ref_count: WriterRefCount,
|
||||
writer: IOWriter = .{},
|
||||
stdio_result: StdioResult,
|
||||
source: Source = .{ .detached = {} },
|
||||
process: *ProcessType = undefined,
|
||||
event_loop: jsc.EventLoopHandle,
|
||||
buffer: []const u8 = "",
|
||||
|
||||
// It seems there is a bug in the Zig compiler. We'll get back to this one later
|
||||
const WriterRefCount = bun.ptr.RefCount(@This(), "ref_count", _deinit, .{});
|
||||
pub const ref = WriterRefCount.ref;
|
||||
pub const deref = WriterRefCount.deref;
|
||||
|
||||
const print = bun.Output.scoped(.StaticPipeWriter, .visible);
|
||||
|
||||
pub const IOWriter = bun.io.BufferedWriter(@This(), struct {
|
||||
pub const onWritable = null;
|
||||
pub const getBuffer = This.getBuffer;
|
||||
pub const onClose = This.onClose;
|
||||
pub const onError = This.onError;
|
||||
pub const onWrite = This.onWrite;
|
||||
});
|
||||
pub const Poll = IOWriter;
|
||||
|
||||
pub fn updateRef(this: *This, add: bool) void {
|
||||
this.writer.updateRef(this.event_loop, add);
|
||||
}
|
||||
|
||||
pub fn getBuffer(this: *This) []const u8 {
|
||||
return this.buffer;
|
||||
}
|
||||
|
||||
pub fn close(this: *This) void {
|
||||
log("StaticPipeWriter(0x{x}) close()", .{@intFromPtr(this)});
|
||||
this.writer.close();
|
||||
}
|
||||
|
||||
pub fn flush(this: *This) void {
|
||||
if (this.buffer.len > 0)
|
||||
this.writer.write();
|
||||
}
|
||||
|
||||
pub fn create(event_loop: anytype, subprocess: *ProcessType, result: StdioResult, source: Source) *This {
|
||||
const this = bun.new(This, .{
|
||||
.ref_count = .init(),
|
||||
.event_loop = jsc.EventLoopHandle.init(event_loop),
|
||||
.process = subprocess,
|
||||
.stdio_result = result,
|
||||
.source = source,
|
||||
});
|
||||
if (Environment.isWindows) {
|
||||
this.writer.setPipe(this.stdio_result.buffer);
|
||||
}
|
||||
this.writer.setParent(this);
|
||||
return this;
|
||||
}
|
||||
|
||||
pub fn start(this: *This) bun.sys.Maybe(void) {
|
||||
log("StaticPipeWriter(0x{x}) start()", .{@intFromPtr(this)});
|
||||
this.ref();
|
||||
this.buffer = this.source.slice();
|
||||
if (Environment.isWindows) {
|
||||
return this.writer.startWithCurrentPipe();
|
||||
}
|
||||
switch (this.writer.start(this.stdio_result.?, true)) {
|
||||
.err => |err| {
|
||||
return .{ .err = err };
|
||||
},
|
||||
.result => {
|
||||
if (comptime Environment.isPosix) {
|
||||
const poll = this.writer.handle.poll;
|
||||
poll.flags.insert(.socket);
|
||||
}
|
||||
|
||||
return .success;
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn onWrite(this: *This, amount: usize, status: bun.io.WriteStatus) void {
|
||||
log("StaticPipeWriter(0x{x}) onWrite(amount={d} {})", .{ @intFromPtr(this), amount, status });
|
||||
this.buffer = this.buffer[@min(amount, this.buffer.len)..];
|
||||
if (status == .end_of_file or this.buffer.len == 0) {
|
||||
this.writer.close();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn onError(this: *This, err: bun.sys.Error) void {
|
||||
log("StaticPipeWriter(0x{x}) onError(err={any})", .{ @intFromPtr(this), err });
|
||||
this.source.detach();
|
||||
}
|
||||
|
||||
pub fn onClose(this: *This) void {
|
||||
log("StaticPipeWriter(0x{x}) onClose()", .{@intFromPtr(this)});
|
||||
this.source.detach();
|
||||
this.process.onCloseIO(.stdin);
|
||||
}
|
||||
|
||||
fn _deinit(this: *This) void {
|
||||
this.writer.end();
|
||||
this.source.detach();
|
||||
bun.destroy(this);
|
||||
}
|
||||
|
||||
pub fn memoryCost(this: *const This) usize {
|
||||
return @sizeOf(@This()) + this.source.memoryCost() + this.writer.memoryCost();
|
||||
}
|
||||
|
||||
pub fn loop(this: *This) *uws.Loop {
|
||||
return this.event_loop.loop();
|
||||
}
|
||||
|
||||
pub fn watch(this: *This) void {
|
||||
if (this.buffer.len > 0) {
|
||||
this.writer.watch();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn eventLoop(this: *This) jsc.EventLoopHandle {
|
||||
return this.event_loop;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
pub const PipeReader = struct {
|
||||
const RefCount = bun.ptr.RefCount(@This(), "ref_count", PipeReader.deinit, .{});
|
||||
pub const ref = PipeReader.RefCount.ref;
|
||||
pub const deref = PipeReader.RefCount.deref;
|
||||
|
||||
reader: IOReader = undefined,
|
||||
process: ?*Subprocess = null,
|
||||
event_loop: *jsc.EventLoop = undefined,
|
||||
ref_count: PipeReader.RefCount,
|
||||
state: union(enum) {
|
||||
pending: void,
|
||||
done: []u8,
|
||||
err: bun.sys.Error,
|
||||
} = .{ .pending = {} },
|
||||
stdio_result: StdioResult,
|
||||
pub const IOReader = bun.io.BufferedReader;
|
||||
pub const Poll = IOReader;
|
||||
|
||||
pub fn memoryCost(this: *const PipeReader) usize {
|
||||
return this.reader.memoryCost();
|
||||
}
|
||||
|
||||
pub fn hasPendingActivity(this: *const PipeReader) bool {
|
||||
if (this.state == .pending)
|
||||
return true;
|
||||
|
||||
return this.reader.hasPendingActivity();
|
||||
}
|
||||
|
||||
pub fn detach(this: *PipeReader) void {
|
||||
this.process = null;
|
||||
this.deref();
|
||||
}
|
||||
|
||||
pub fn create(event_loop: *jsc.EventLoop, process: *Subprocess, result: StdioResult, limit: ?*MaxBuf) *PipeReader {
|
||||
var this = bun.new(PipeReader, .{
|
||||
.ref_count = .init(),
|
||||
.process = process,
|
||||
.reader = IOReader.init(@This()),
|
||||
.event_loop = event_loop,
|
||||
.stdio_result = result,
|
||||
});
|
||||
MaxBuf.addToPipereader(limit, &this.reader.maxbuf);
|
||||
if (Environment.isWindows) {
|
||||
this.reader.source = .{ .pipe = this.stdio_result.buffer };
|
||||
}
|
||||
|
||||
this.reader.setParent(this);
|
||||
return this;
|
||||
}
|
||||
|
||||
pub fn readAll(this: *PipeReader) void {
|
||||
if (this.state == .pending)
|
||||
this.reader.read();
|
||||
}
|
||||
|
||||
pub fn start(this: *PipeReader, process: *Subprocess, event_loop: *jsc.EventLoop) bun.sys.Maybe(void) {
|
||||
this.ref();
|
||||
this.process = process;
|
||||
this.event_loop = event_loop;
|
||||
if (Environment.isWindows) {
|
||||
return this.reader.startWithCurrentPipe();
|
||||
}
|
||||
|
||||
switch (this.reader.start(this.stdio_result.?, true)) {
|
||||
.err => |err| {
|
||||
return .{ .err = err };
|
||||
},
|
||||
.result => {
|
||||
if (comptime Environment.isPosix) {
|
||||
const poll = this.reader.handle.poll;
|
||||
poll.flags.insert(.socket);
|
||||
this.reader.flags.socket = true;
|
||||
this.reader.flags.nonblocking = true;
|
||||
this.reader.flags.pollable = true;
|
||||
poll.flags.insert(.nonblocking);
|
||||
}
|
||||
|
||||
return .success;
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub const toJS = toReadableStream;
|
||||
|
||||
pub fn onReaderDone(this: *PipeReader) void {
|
||||
const owned = this.toOwnedSlice();
|
||||
this.state = .{ .done = owned };
|
||||
if (this.process) |process| {
|
||||
this.process = null;
|
||||
process.onCloseIO(this.kind(process));
|
||||
this.deref();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn kind(reader: *const PipeReader, process: *const Subprocess) StdioKind {
|
||||
if (process.stdout == .pipe and process.stdout.pipe == reader) {
|
||||
return .stdout;
|
||||
}
|
||||
|
||||
if (process.stderr == .pipe and process.stderr.pipe == reader) {
|
||||
return .stderr;
|
||||
}
|
||||
|
||||
@panic("We should be either stdout or stderr");
|
||||
}
|
||||
|
||||
pub fn toOwnedSlice(this: *PipeReader) []u8 {
|
||||
if (this.state == .done) {
|
||||
return this.state.done;
|
||||
}
|
||||
// we do not use .toOwnedSlice() because we don't want to reallocate memory.
|
||||
const out = this.reader._buffer;
|
||||
this.reader._buffer.items = &.{};
|
||||
this.reader._buffer.capacity = 0;
|
||||
|
||||
if (out.capacity > 0 and out.items.len == 0) {
|
||||
out.deinit();
|
||||
return &.{};
|
||||
}
|
||||
|
||||
return out.items;
|
||||
}
|
||||
|
||||
pub fn updateRef(this: *PipeReader, add: bool) void {
|
||||
this.reader.updateRef(add);
|
||||
}
|
||||
|
||||
pub fn watch(this: *PipeReader) void {
|
||||
if (!this.reader.isDone())
|
||||
this.reader.watch();
|
||||
}
|
||||
|
||||
pub fn toReadableStream(this: *PipeReader, globalObject: *jsc.JSGlobalObject) bun.JSError!jsc.JSValue {
|
||||
defer this.detach();
|
||||
|
||||
switch (this.state) {
|
||||
.pending => {
|
||||
const stream = jsc.WebCore.ReadableStream.fromPipe(globalObject, this, &this.reader);
|
||||
this.state = .{ .done = &.{} };
|
||||
return stream;
|
||||
},
|
||||
.done => |bytes| {
|
||||
this.state = .{ .done = &.{} };
|
||||
return jsc.WebCore.ReadableStream.fromOwnedSlice(globalObject, bytes, 0);
|
||||
},
|
||||
.err => |err| {
|
||||
_ = err;
|
||||
const empty = try jsc.WebCore.ReadableStream.empty(globalObject);
|
||||
jsc.WebCore.ReadableStream.cancel(&(try jsc.WebCore.ReadableStream.fromJS(empty, globalObject)).?, globalObject);
|
||||
return empty;
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn toBuffer(this: *PipeReader, globalThis: *jsc.JSGlobalObject) jsc.JSValue {
|
||||
switch (this.state) {
|
||||
.done => |bytes| {
|
||||
defer this.state = .{ .done = &.{} };
|
||||
return jsc.MarkedArrayBuffer.fromBytes(bytes, bun.default_allocator, .Uint8Array).toNodeBuffer(globalThis);
|
||||
},
|
||||
else => {
|
||||
return .js_undefined;
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn onReaderError(this: *PipeReader, err: bun.sys.Error) void {
|
||||
if (this.state == .done) {
|
||||
bun.default_allocator.free(this.state.done);
|
||||
}
|
||||
this.state = .{ .err = err };
|
||||
if (this.process) |process|
|
||||
process.onCloseIO(this.kind(process));
|
||||
}
|
||||
|
||||
pub fn close(this: *PipeReader) void {
|
||||
switch (this.state) {
|
||||
.pending => {
|
||||
this.reader.close();
|
||||
},
|
||||
.done => {},
|
||||
.err => {},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn eventLoop(this: *PipeReader) *jsc.EventLoop {
|
||||
return this.event_loop;
|
||||
}
|
||||
|
||||
pub fn loop(this: *PipeReader) *uws.Loop {
|
||||
return this.event_loop.virtual_machine.uwsLoop();
|
||||
}
|
||||
|
||||
fn deinit(this: *PipeReader) void {
|
||||
if (comptime Environment.isPosix) {
|
||||
bun.assert(this.reader.isDone());
|
||||
}
|
||||
|
||||
if (comptime Environment.isWindows) {
|
||||
bun.assert(this.reader.source == null or this.reader.source.?.isClosed());
|
||||
}
|
||||
|
||||
if (this.state == .done) {
|
||||
bun.default_allocator.free(this.state.done);
|
||||
}
|
||||
|
||||
this.reader.deinit();
|
||||
bun.destroy(this);
|
||||
}
|
||||
};
|
||||
|
||||
const Writable = union(enum) {
|
||||
pipe: *jsc.WebCore.FileSink,
|
||||
fd: bun.FileDescriptor,
|
||||
buffer: *StaticPipeWriter,
|
||||
memfd: bun.FileDescriptor,
|
||||
inherit: void,
|
||||
ignore: void,
|
||||
|
||||
pub fn memoryCost(this: *const Writable) usize {
|
||||
return switch (this.*) {
|
||||
.pipe => |pipe| pipe.memoryCost(),
|
||||
.buffer => |buffer| buffer.memoryCost(),
|
||||
// TODO: memfd
|
||||
else => 0,
|
||||
};
|
||||
}
|
||||
|
||||
pub fn hasPendingActivity(this: *const Writable) bool {
|
||||
return switch (this.*) {
|
||||
.pipe => false,
|
||||
|
||||
// we mark them as .ignore when they are closed, so this must be true
|
||||
.buffer => true,
|
||||
else => false,
|
||||
};
|
||||
}
|
||||
|
||||
pub fn ref(this: *Writable) void {
|
||||
switch (this.*) {
|
||||
.pipe => {
|
||||
this.pipe.updateRef(true);
|
||||
},
|
||||
.buffer => {
|
||||
this.buffer.updateRef(true);
|
||||
},
|
||||
else => {},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn unref(this: *Writable) void {
|
||||
switch (this.*) {
|
||||
.pipe => {
|
||||
this.pipe.updateRef(false);
|
||||
},
|
||||
.buffer => {
|
||||
this.buffer.updateRef(false);
|
||||
},
|
||||
else => {},
|
||||
}
|
||||
}
|
||||
|
||||
// When the stream has closed we need to be notified to prevent a use-after-free
|
||||
// We can test for this use-after-free by enabling hot module reloading on a file and then saving it twice
|
||||
pub fn onClose(this: *Writable, _: ?bun.sys.Error) void {
|
||||
const process: *Subprocess = @fieldParentPtr("stdin", this);
|
||||
|
||||
if (process.this_jsvalue != .zero) {
|
||||
if (js.stdinGetCached(process.this_jsvalue)) |existing_value| {
|
||||
jsc.WebCore.FileSink.JSSink.setDestroyCallback(existing_value, 0);
|
||||
}
|
||||
}
|
||||
|
||||
switch (this.*) {
|
||||
.buffer => {
|
||||
this.buffer.deref();
|
||||
},
|
||||
.pipe => {
|
||||
this.pipe.deref();
|
||||
},
|
||||
else => {},
|
||||
}
|
||||
|
||||
process.onStdinDestroyed();
|
||||
|
||||
this.* = .{
|
||||
.ignore = {},
|
||||
};
|
||||
}
|
||||
pub fn onReady(_: *Writable, _: ?jsc.WebCore.Blob.SizeType, _: ?jsc.WebCore.Blob.SizeType) void {}
|
||||
pub fn onStart(_: *Writable) void {}
|
||||
|
||||
pub fn init(
|
||||
stdio: *Stdio,
|
||||
event_loop: *jsc.EventLoop,
|
||||
subprocess: *Subprocess,
|
||||
result: StdioResult,
|
||||
promise_for_stream: *jsc.JSValue,
|
||||
) !Writable {
|
||||
assertStdioResult(result);
|
||||
|
||||
if (Environment.isWindows) {
|
||||
switch (stdio.*) {
|
||||
.pipe, .readable_stream => {
|
||||
if (result == .buffer) {
|
||||
const pipe = jsc.WebCore.FileSink.createWithPipe(event_loop, result.buffer);
|
||||
|
||||
switch (pipe.writer.startWithCurrentPipe()) {
|
||||
.result => {},
|
||||
.err => |err| {
|
||||
_ = err; // autofix
|
||||
pipe.deref();
|
||||
if (stdio.* == .readable_stream) {
|
||||
stdio.readable_stream.cancel(event_loop.global);
|
||||
}
|
||||
return error.UnexpectedCreatingStdin;
|
||||
},
|
||||
}
|
||||
pipe.writer.setParent(pipe);
|
||||
subprocess.weak_file_sink_stdin_ptr = pipe;
|
||||
subprocess.ref();
|
||||
subprocess.flags.deref_on_stdin_destroyed = true;
|
||||
subprocess.flags.has_stdin_destructor_called = false;
|
||||
|
||||
if (stdio.* == .readable_stream) {
|
||||
const assign_result = pipe.assignToStream(&stdio.readable_stream, event_loop.global);
|
||||
if (assign_result.toError()) |err| {
|
||||
pipe.deref();
|
||||
subprocess.deref();
|
||||
return event_loop.global.throwValue(err);
|
||||
}
|
||||
promise_for_stream.* = assign_result;
|
||||
}
|
||||
|
||||
return Writable{
|
||||
.pipe = pipe,
|
||||
};
|
||||
}
|
||||
return Writable{ .inherit = {} };
|
||||
},
|
||||
|
||||
.blob => |blob| {
|
||||
return Writable{
|
||||
.buffer = StaticPipeWriter.create(event_loop, subprocess, result, .{ .blob = blob }),
|
||||
};
|
||||
},
|
||||
.array_buffer => |array_buffer| {
|
||||
return Writable{
|
||||
.buffer = StaticPipeWriter.create(event_loop, subprocess, result, .{ .array_buffer = array_buffer }),
|
||||
};
|
||||
},
|
||||
.fd => |fd| {
|
||||
return Writable{ .fd = fd };
|
||||
},
|
||||
.dup2 => |dup2| {
|
||||
return Writable{ .fd = dup2.to.toFd() };
|
||||
},
|
||||
.inherit => {
|
||||
return Writable{ .inherit = {} };
|
||||
},
|
||||
.memfd, .path, .ignore => {
|
||||
return Writable{ .ignore = {} };
|
||||
},
|
||||
.ipc, .capture => {
|
||||
return Writable{ .ignore = {} };
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
if (comptime Environment.isPosix) {
|
||||
if (stdio.* == .pipe) {
|
||||
_ = bun.sys.setNonblocking(result.?);
|
||||
}
|
||||
}
|
||||
|
||||
switch (stdio.*) {
|
||||
.dup2 => @panic("TODO dup2 stdio"),
|
||||
.pipe, .readable_stream => {
|
||||
const pipe = jsc.WebCore.FileSink.create(event_loop, result.?);
|
||||
|
||||
switch (pipe.writer.start(pipe.fd, true)) {
|
||||
.result => {},
|
||||
.err => |err| {
|
||||
_ = err; // autofix
|
||||
pipe.deref();
|
||||
if (stdio.* == .readable_stream) {
|
||||
stdio.readable_stream.cancel(event_loop.global);
|
||||
}
|
||||
|
||||
return error.UnexpectedCreatingStdin;
|
||||
},
|
||||
}
|
||||
|
||||
pipe.writer.handle.poll.flags.insert(.socket);
|
||||
|
||||
subprocess.weak_file_sink_stdin_ptr = pipe;
|
||||
subprocess.ref();
|
||||
subprocess.flags.has_stdin_destructor_called = false;
|
||||
subprocess.flags.deref_on_stdin_destroyed = true;
|
||||
|
||||
if (stdio.* == .readable_stream) {
|
||||
const assign_result = pipe.assignToStream(&stdio.readable_stream, event_loop.global);
|
||||
if (assign_result.toError()) |err| {
|
||||
pipe.deref();
|
||||
subprocess.deref();
|
||||
return event_loop.global.throwValue(err);
|
||||
}
|
||||
promise_for_stream.* = assign_result;
|
||||
}
|
||||
|
||||
return Writable{
|
||||
.pipe = pipe,
|
||||
};
|
||||
},
|
||||
|
||||
.blob => |blob| {
|
||||
return Writable{
|
||||
.buffer = StaticPipeWriter.create(event_loop, subprocess, result, .{ .blob = blob }),
|
||||
};
|
||||
},
|
||||
.array_buffer => |array_buffer| {
|
||||
return Writable{
|
||||
.buffer = StaticPipeWriter.create(event_loop, subprocess, result, .{ .array_buffer = array_buffer }),
|
||||
};
|
||||
},
|
||||
.memfd => |memfd| {
|
||||
bun.assert(memfd != bun.invalid_fd);
|
||||
return Writable{ .memfd = memfd };
|
||||
},
|
||||
.fd => {
|
||||
return Writable{ .fd = result.? };
|
||||
},
|
||||
.inherit => {
|
||||
return Writable{ .inherit = {} };
|
||||
},
|
||||
.path, .ignore => {
|
||||
return Writable{ .ignore = {} };
|
||||
},
|
||||
.ipc, .capture => {
|
||||
return Writable{ .ignore = {} };
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn toJS(this: *Writable, globalThis: *jsc.JSGlobalObject, subprocess: *Subprocess) JSValue {
|
||||
return switch (this.*) {
|
||||
.fd => |fd| fd.toJS(globalThis),
|
||||
.memfd, .ignore => .js_undefined,
|
||||
.buffer, .inherit => .js_undefined,
|
||||
.pipe => |pipe| {
|
||||
this.* = .{ .ignore = {} };
|
||||
if (subprocess.process.hasExited() and !subprocess.flags.has_stdin_destructor_called) {
|
||||
// onAttachedProcessExit() can call deref on the
|
||||
// subprocess. Since we never called ref(), it would be
|
||||
// unbalanced to do so, leading to a use-after-free.
|
||||
// So, let's not do that.
|
||||
// https://github.com/oven-sh/bun/pull/14092
|
||||
bun.debugAssert(!subprocess.flags.deref_on_stdin_destroyed);
|
||||
const debug_ref_count = if (Environment.isDebug) subprocess.ref_count else 0;
|
||||
pipe.onAttachedProcessExit(&subprocess.process.status);
|
||||
if (Environment.isDebug) {
|
||||
bun.debugAssert(subprocess.ref_count.get() == debug_ref_count.get());
|
||||
}
|
||||
return pipe.toJS(globalThis);
|
||||
} else {
|
||||
subprocess.flags.has_stdin_destructor_called = false;
|
||||
subprocess.weak_file_sink_stdin_ptr = pipe;
|
||||
subprocess.ref();
|
||||
subprocess.flags.deref_on_stdin_destroyed = true;
|
||||
if (@intFromPtr(pipe.signal.ptr) == @intFromPtr(subprocess)) {
|
||||
pipe.signal.clear();
|
||||
}
|
||||
return pipe.toJSWithDestructor(
|
||||
globalThis,
|
||||
jsc.WebCore.Sink.DestructorPtr.init(subprocess),
|
||||
);
|
||||
}
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
pub fn finalize(this: *Writable) void {
|
||||
const subprocess: *Subprocess = @fieldParentPtr("stdin", this);
|
||||
if (subprocess.this_jsvalue != .zero) {
|
||||
if (jsc.Codegen.JSSubprocess.stdinGetCached(subprocess.this_jsvalue)) |existing_value| {
|
||||
jsc.WebCore.FileSink.JSSink.setDestroyCallback(existing_value, 0);
|
||||
}
|
||||
}
|
||||
|
||||
return switch (this.*) {
|
||||
.pipe => |pipe| {
|
||||
if (pipe.signal.ptr == @as(*anyopaque, @ptrCast(this))) {
|
||||
pipe.signal.clear();
|
||||
}
|
||||
|
||||
pipe.deref();
|
||||
|
||||
this.* = .{ .ignore = {} };
|
||||
},
|
||||
.buffer => {
|
||||
this.buffer.updateRef(false);
|
||||
this.buffer.deref();
|
||||
},
|
||||
.memfd => |fd| {
|
||||
fd.close();
|
||||
this.* = .{ .ignore = {} };
|
||||
},
|
||||
.ignore => {},
|
||||
.fd, .inherit => {},
|
||||
};
|
||||
}
|
||||
|
||||
pub fn close(this: *Writable) void {
|
||||
switch (this.*) {
|
||||
.pipe => |pipe| {
|
||||
_ = pipe.end(null);
|
||||
},
|
||||
.memfd => |fd| {
|
||||
fd.close();
|
||||
this.* = .{ .ignore = {} };
|
||||
},
|
||||
.fd => {
|
||||
this.* = .{ .ignore = {} };
|
||||
},
|
||||
.buffer => {
|
||||
this.buffer.close();
|
||||
},
|
||||
.ignore => {},
|
||||
.inherit => {},
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
pub fn memoryCost(this: *const Subprocess) usize {
|
||||
return @sizeOf(@This()) +
|
||||
this.process.memoryCost() +
|
||||
@@ -2618,7 +1717,7 @@ pub fn spawnMaybeSync(
|
||||
const exitCode = subprocess.getExitCode(globalThis);
|
||||
const stdout = try subprocess.stdout.toBufferedValue(globalThis);
|
||||
const stderr = try subprocess.stderr.toBufferedValue(globalThis);
|
||||
const resource_usage: JSValue = if (!globalThis.hasException()) subprocess.createResourceUsageObject(globalThis) else .zero;
|
||||
const resource_usage: JSValue = if (!globalThis.hasException()) try subprocess.createResourceUsageObject(globalThis) else .zero;
|
||||
const exitedDueToTimeout = subprocess.event_loop_timer.state == .FIRED;
|
||||
const exitedDueToMaxBuffer = subprocess.exited_due_to_maxbuf;
|
||||
const resultPid = jsc.JSValue.jsNumberFromInt32(subprocess.pid());
|
||||
@@ -2717,7 +1816,8 @@ pub fn getGlobalThis(this: *Subprocess) ?*jsc.JSGlobalObject {
|
||||
|
||||
const IPClog = Output.scoped(.IPC, .visible);
|
||||
|
||||
const StdioResult = if (Environment.isWindows) bun.spawn.WindowsSpawnResult.StdioResult else ?bun.FileDescriptor;
|
||||
pub const StdioResult = if (Environment.isWindows) bun.spawn.WindowsSpawnResult.StdioResult else ?bun.FileDescriptor;
|
||||
pub const Writable = @import("./subprocess/Writable.zig").Writable;
|
||||
|
||||
pub const MaxBuf = bun.io.MaxBuf;
|
||||
|
||||
|
||||
195
src/bun.js/api/bun/subprocess/Readable.zig
Normal file
195
src/bun.js/api/bun/subprocess/Readable.zig
Normal file
@@ -0,0 +1,195 @@
|
||||
pub const Readable = union(enum) {
|
||||
fd: bun.FileDescriptor,
|
||||
memfd: bun.FileDescriptor,
|
||||
pipe: *PipeReader,
|
||||
inherit: void,
|
||||
ignore: void,
|
||||
closed: void,
|
||||
/// Eventually we will implement Readables created from blobs and array buffers.
|
||||
/// When we do that, `buffer` will be borrowed from those objects.
|
||||
///
|
||||
/// When a buffered `pipe` finishes reading from its file descriptor,
|
||||
/// the owning `Readable` will be convered into this variant and the pipe's
|
||||
/// buffer will be taken as an owned `CowString`.
|
||||
buffer: CowString,
|
||||
|
||||
pub fn memoryCost(this: *const Readable) usize {
|
||||
return switch (this.*) {
|
||||
.pipe => @sizeOf(PipeReader) + this.pipe.memoryCost(),
|
||||
.buffer => this.buffer.length(),
|
||||
else => 0,
|
||||
};
|
||||
}
|
||||
|
||||
pub fn hasPendingActivity(this: *const Readable) bool {
|
||||
return switch (this.*) {
|
||||
.pipe => this.pipe.hasPendingActivity(),
|
||||
else => false,
|
||||
};
|
||||
}
|
||||
|
||||
pub fn ref(this: *Readable) void {
|
||||
switch (this.*) {
|
||||
.pipe => {
|
||||
this.pipe.updateRef(true);
|
||||
},
|
||||
else => {},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn unref(this: *Readable) void {
|
||||
switch (this.*) {
|
||||
.pipe => {
|
||||
this.pipe.updateRef(false);
|
||||
},
|
||||
else => {},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn init(stdio: Stdio, event_loop: *jsc.EventLoop, process: *Subprocess, result: StdioResult, allocator: std.mem.Allocator, max_size: ?*MaxBuf, is_sync: bool) Readable {
|
||||
_ = allocator; // autofix
|
||||
_ = is_sync; // autofix
|
||||
Subprocess.assertStdioResult(result);
|
||||
|
||||
if (comptime Environment.isPosix) {
|
||||
if (stdio == .pipe) {
|
||||
_ = bun.sys.setNonblocking(result.?);
|
||||
}
|
||||
}
|
||||
|
||||
return switch (stdio) {
|
||||
.inherit => Readable{ .inherit = {} },
|
||||
.ignore, .ipc, .path => Readable{ .ignore = {} },
|
||||
.fd => |fd| if (Environment.isPosix) Readable{ .fd = result.? } else Readable{ .fd = fd },
|
||||
.memfd => if (Environment.isPosix) Readable{ .memfd = stdio.memfd } else Readable{ .ignore = {} },
|
||||
.dup2 => |dup2| if (Environment.isPosix) Output.panic("TODO: implement dup2 support in Stdio readable", .{}) else Readable{ .fd = dup2.out.toFd() },
|
||||
.pipe => Readable{ .pipe = PipeReader.create(event_loop, process, result, max_size) },
|
||||
.array_buffer, .blob => Output.panic("TODO: implement ArrayBuffer & Blob support in Stdio readable", .{}),
|
||||
.capture => Output.panic("TODO: implement capture support in Stdio readable", .{}),
|
||||
.readable_stream => Readable{ .ignore = {} }, // ReadableStream is handled separately
|
||||
};
|
||||
}
|
||||
|
||||
pub fn onClose(this: *Readable, _: ?bun.sys.Error) void {
|
||||
this.* = .closed;
|
||||
}
|
||||
|
||||
pub fn onReady(_: *Readable, _: ?jsc.WebCore.Blob.SizeType, _: ?jsc.WebCore.Blob.SizeType) void {}
|
||||
|
||||
pub fn onStart(_: *Readable) void {}
|
||||
|
||||
pub fn close(this: *Readable) void {
|
||||
switch (this.*) {
|
||||
.memfd => |fd| {
|
||||
this.* = .{ .closed = {} };
|
||||
fd.close();
|
||||
},
|
||||
.fd => |_| {
|
||||
this.* = .{ .closed = {} };
|
||||
},
|
||||
.pipe => {
|
||||
this.pipe.close();
|
||||
},
|
||||
else => {},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn finalize(this: *Readable) void {
|
||||
switch (this.*) {
|
||||
.memfd => |fd| {
|
||||
this.* = .{ .closed = {} };
|
||||
fd.close();
|
||||
},
|
||||
.fd => {
|
||||
this.* = .{ .closed = {} };
|
||||
},
|
||||
.pipe => |pipe| {
|
||||
defer pipe.detach();
|
||||
this.* = .{ .closed = {} };
|
||||
},
|
||||
.buffer => |*buf| {
|
||||
buf.deinit(bun.default_allocator);
|
||||
},
|
||||
else => {},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn toJS(this: *Readable, globalThis: *jsc.JSGlobalObject, exited: bool) bun.JSError!JSValue {
|
||||
_ = exited; // autofix
|
||||
switch (this.*) {
|
||||
// should only be reachable when the entire output is buffered.
|
||||
.memfd => return this.toBufferedValue(globalThis),
|
||||
|
||||
.fd => |fd| {
|
||||
return fd.toJS(globalThis);
|
||||
},
|
||||
.pipe => |pipe| {
|
||||
defer pipe.detach();
|
||||
this.* = .{ .closed = {} };
|
||||
return pipe.toJS(globalThis);
|
||||
},
|
||||
.buffer => |*buffer| {
|
||||
defer this.* = .{ .closed = {} };
|
||||
|
||||
if (buffer.length() == 0) {
|
||||
return jsc.WebCore.ReadableStream.empty(globalThis);
|
||||
}
|
||||
|
||||
const own = try buffer.takeSlice(bun.default_allocator);
|
||||
return jsc.WebCore.ReadableStream.fromOwnedSlice(globalThis, own, 0);
|
||||
},
|
||||
else => {
|
||||
return .js_undefined;
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn toBufferedValue(this: *Readable, globalThis: *jsc.JSGlobalObject) bun.JSError!JSValue {
|
||||
switch (this.*) {
|
||||
.fd => |fd| {
|
||||
return fd.toJS(globalThis);
|
||||
},
|
||||
.memfd => |fd| {
|
||||
if (comptime !Environment.isPosix) {
|
||||
Output.panic("memfd is only supported on Linux", .{});
|
||||
}
|
||||
this.* = .{ .closed = {} };
|
||||
return jsc.ArrayBuffer.toJSBufferFromMemfd(fd, globalThis);
|
||||
},
|
||||
.pipe => |pipe| {
|
||||
defer pipe.detach();
|
||||
this.* = .{ .closed = {} };
|
||||
return pipe.toBuffer(globalThis);
|
||||
},
|
||||
.buffer => |*buf| {
|
||||
defer this.* = .{ .closed = {} };
|
||||
const own = buf.takeSlice(bun.default_allocator) catch {
|
||||
return globalThis.throwOutOfMemory();
|
||||
};
|
||||
|
||||
return jsc.MarkedArrayBuffer.fromBytes(own, bun.default_allocator, .Uint8Array).toNodeBuffer(globalThis);
|
||||
},
|
||||
else => {
|
||||
return .js_undefined;
|
||||
},
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const std = @import("std");
|
||||
|
||||
const bun = @import("bun");
|
||||
const Environment = bun.Environment;
|
||||
const Output = bun.Output;
|
||||
const default_allocator = bun.default_allocator;
|
||||
const CowString = bun.ptr.CowString;
|
||||
const Stdio = bun.spawn.Stdio;
|
||||
|
||||
const jsc = bun.jsc;
|
||||
const JSGlobalObject = jsc.JSGlobalObject;
|
||||
const JSValue = jsc.JSValue;
|
||||
|
||||
const Subprocess = jsc.API.Subprocess;
|
||||
const MaxBuf = Subprocess.MaxBuf;
|
||||
const PipeReader = Subprocess.PipeReader;
|
||||
const StdioResult = Subprocess.StdioResult;
|
||||
75
src/bun.js/api/bun/subprocess/ResourceUsage.zig
Normal file
75
src/bun.js/api/bun/subprocess/ResourceUsage.zig
Normal file
@@ -0,0 +1,75 @@
|
||||
const ResourceUsage = @This();
|
||||
|
||||
pub const js = jsc.Codegen.JSResourceUsage;
|
||||
pub const toJS = ResourceUsage.js.toJS;
|
||||
pub const fromJS = ResourceUsage.js.fromJS;
|
||||
pub const fromJSDirect = ResourceUsage.js.fromJSDirect;
|
||||
|
||||
rusage: Rusage,
|
||||
|
||||
pub fn create(rusage: *const Rusage, globalObject: *JSGlobalObject) bun.JSError!JSValue {
|
||||
return bun.new(ResourceUsage, .{ .rusage = rusage.* }).toJS(globalObject);
|
||||
}
|
||||
|
||||
pub fn getCPUTime(this: *ResourceUsage, globalObject: *JSGlobalObject) bun.JSError!JSValue {
|
||||
var cpu = jsc.JSValue.createEmptyObjectWithNullPrototype(globalObject);
|
||||
const rusage = this.rusage;
|
||||
|
||||
const usrTime = try JSValue.fromTimevalNoTruncate(globalObject, rusage.utime.usec, rusage.utime.sec);
|
||||
const sysTime = try JSValue.fromTimevalNoTruncate(globalObject, rusage.stime.usec, rusage.stime.sec);
|
||||
|
||||
cpu.put(globalObject, jsc.ZigString.static("user"), usrTime);
|
||||
cpu.put(globalObject, jsc.ZigString.static("system"), sysTime);
|
||||
cpu.put(globalObject, jsc.ZigString.static("total"), JSValue.bigIntSum(globalObject, usrTime, sysTime));
|
||||
|
||||
return cpu;
|
||||
}
|
||||
|
||||
pub fn getMaxRSS(this: *ResourceUsage, _: *JSGlobalObject) JSValue {
|
||||
return jsc.JSValue.jsNumber(this.rusage.maxrss);
|
||||
}
|
||||
|
||||
pub fn getSharedMemorySize(this: *ResourceUsage, _: *JSGlobalObject) JSValue {
|
||||
return jsc.JSValue.jsNumber(this.rusage.ixrss);
|
||||
}
|
||||
|
||||
pub fn getSwapCount(this: *ResourceUsage, _: *JSGlobalObject) JSValue {
|
||||
return jsc.JSValue.jsNumber(this.rusage.nswap);
|
||||
}
|
||||
|
||||
pub fn getOps(this: *ResourceUsage, globalObject: *JSGlobalObject) JSValue {
|
||||
var ops = jsc.JSValue.createEmptyObjectWithNullPrototype(globalObject);
|
||||
ops.put(globalObject, jsc.ZigString.static("in"), jsc.JSValue.jsNumber(this.rusage.inblock));
|
||||
ops.put(globalObject, jsc.ZigString.static("out"), jsc.JSValue.jsNumber(this.rusage.oublock));
|
||||
return ops;
|
||||
}
|
||||
|
||||
pub fn getMessages(this: *ResourceUsage, globalObject: *JSGlobalObject) JSValue {
|
||||
var msgs = jsc.JSValue.createEmptyObjectWithNullPrototype(globalObject);
|
||||
msgs.put(globalObject, jsc.ZigString.static("sent"), jsc.JSValue.jsNumber(this.rusage.msgsnd));
|
||||
msgs.put(globalObject, jsc.ZigString.static("received"), jsc.JSValue.jsNumber(this.rusage.msgrcv));
|
||||
return msgs;
|
||||
}
|
||||
|
||||
pub fn getSignalCount(this: *ResourceUsage, _: *JSGlobalObject) JSValue {
|
||||
return jsc.JSValue.jsNumber(this.rusage.nsignals);
|
||||
}
|
||||
|
||||
pub fn getContextSwitches(this: *ResourceUsage, globalObject: *JSGlobalObject) JSValue {
|
||||
var ctx = jsc.JSValue.createEmptyObjectWithNullPrototype(globalObject);
|
||||
ctx.put(globalObject, jsc.ZigString.static("voluntary"), jsc.JSValue.jsNumber(this.rusage.nvcsw));
|
||||
ctx.put(globalObject, jsc.ZigString.static("involuntary"), jsc.JSValue.jsNumber(this.rusage.nivcsw));
|
||||
return ctx;
|
||||
}
|
||||
|
||||
pub fn finalize(this: *ResourceUsage) callconv(.C) void {
|
||||
bun.default_allocator.destroy(this);
|
||||
}
|
||||
|
||||
const bun = @import("bun");
|
||||
const default_allocator = bun.default_allocator;
|
||||
const Rusage = bun.spawn.Rusage;
|
||||
|
||||
const jsc = bun.jsc;
|
||||
const JSGlobalObject = jsc.JSGlobalObject;
|
||||
const JSValue = jsc.JSValue;
|
||||
139
src/bun.js/api/bun/subprocess/StaticPipeWriter.zig
Normal file
139
src/bun.js/api/bun/subprocess/StaticPipeWriter.zig
Normal file
@@ -0,0 +1,139 @@
|
||||
pub fn NewStaticPipeWriter(comptime ProcessType: type) type {
|
||||
return struct {
|
||||
const This = @This();
|
||||
|
||||
ref_count: WriterRefCount,
|
||||
writer: IOWriter = .{},
|
||||
stdio_result: StdioResult,
|
||||
source: Source = .{ .detached = {} },
|
||||
process: *ProcessType = undefined,
|
||||
event_loop: jsc.EventLoopHandle,
|
||||
buffer: []const u8 = "",
|
||||
|
||||
// It seems there is a bug in the Zig compiler. We'll get back to this one later
|
||||
const WriterRefCount = bun.ptr.RefCount(@This(), "ref_count", _deinit, .{});
|
||||
pub const ref = WriterRefCount.ref;
|
||||
pub const deref = WriterRefCount.deref;
|
||||
|
||||
const print = bun.Output.scoped(.StaticPipeWriter, .visible);
|
||||
|
||||
pub const IOWriter = bun.io.BufferedWriter(@This(), struct {
|
||||
pub const onWritable = null;
|
||||
pub const getBuffer = This.getBuffer;
|
||||
pub const onClose = This.onClose;
|
||||
pub const onError = This.onError;
|
||||
pub const onWrite = This.onWrite;
|
||||
});
|
||||
pub const Poll = IOWriter;
|
||||
|
||||
pub fn updateRef(this: *This, add: bool) void {
|
||||
this.writer.updateRef(this.event_loop, add);
|
||||
}
|
||||
|
||||
pub fn getBuffer(this: *This) []const u8 {
|
||||
return this.buffer;
|
||||
}
|
||||
|
||||
pub fn close(this: *This) void {
|
||||
log("StaticPipeWriter(0x{x}) close()", .{@intFromPtr(this)});
|
||||
this.writer.close();
|
||||
}
|
||||
|
||||
pub fn flush(this: *This) void {
|
||||
if (this.buffer.len > 0)
|
||||
this.writer.write();
|
||||
}
|
||||
|
||||
pub fn create(event_loop: anytype, subprocess: *ProcessType, result: StdioResult, source: Source) *This {
|
||||
const this = bun.new(This, .{
|
||||
.ref_count = .init(),
|
||||
.event_loop = jsc.EventLoopHandle.init(event_loop),
|
||||
.process = subprocess,
|
||||
.stdio_result = result,
|
||||
.source = source,
|
||||
});
|
||||
if (Environment.isWindows) {
|
||||
this.writer.setPipe(this.stdio_result.buffer);
|
||||
}
|
||||
this.writer.setParent(this);
|
||||
return this;
|
||||
}
|
||||
|
||||
pub fn start(this: *This) bun.sys.Maybe(void) {
|
||||
log("StaticPipeWriter(0x{x}) start()", .{@intFromPtr(this)});
|
||||
this.ref();
|
||||
this.buffer = this.source.slice();
|
||||
if (Environment.isWindows) {
|
||||
return this.writer.startWithCurrentPipe();
|
||||
}
|
||||
switch (this.writer.start(this.stdio_result.?, true)) {
|
||||
.err => |err| {
|
||||
return .{ .err = err };
|
||||
},
|
||||
.result => {
|
||||
if (comptime Environment.isPosix) {
|
||||
const poll = this.writer.handle.poll;
|
||||
poll.flags.insert(.socket);
|
||||
}
|
||||
|
||||
return .success;
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn onWrite(this: *This, amount: usize, status: bun.io.WriteStatus) void {
|
||||
log("StaticPipeWriter(0x{x}) onWrite(amount={d} {})", .{ @intFromPtr(this), amount, status });
|
||||
this.buffer = this.buffer[@min(amount, this.buffer.len)..];
|
||||
if (status == .end_of_file or this.buffer.len == 0) {
|
||||
this.writer.close();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn onError(this: *This, err: bun.sys.Error) void {
|
||||
log("StaticPipeWriter(0x{x}) onError(err={any})", .{ @intFromPtr(this), err });
|
||||
this.source.detach();
|
||||
}
|
||||
|
||||
pub fn onClose(this: *This) void {
|
||||
log("StaticPipeWriter(0x{x}) onClose()", .{@intFromPtr(this)});
|
||||
this.source.detach();
|
||||
this.process.onCloseIO(.stdin);
|
||||
}
|
||||
|
||||
fn _deinit(this: *This) void {
|
||||
this.writer.end();
|
||||
this.source.detach();
|
||||
bun.destroy(this);
|
||||
}
|
||||
|
||||
pub fn memoryCost(this: *const This) usize {
|
||||
return @sizeOf(@This()) + this.source.memoryCost() + this.writer.memoryCost();
|
||||
}
|
||||
|
||||
pub fn loop(this: *This) *uws.Loop {
|
||||
return this.event_loop.loop();
|
||||
}
|
||||
|
||||
pub fn watch(this: *This) void {
|
||||
if (this.buffer.len > 0) {
|
||||
this.writer.watch();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn eventLoop(this: *This) jsc.EventLoopHandle {
|
||||
return this.event_loop;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
const log = Output.scoped(.StaticPipeWriter, .hidden);
|
||||
|
||||
const bun = @import("bun");
|
||||
const Environment = bun.Environment;
|
||||
const Output = bun.Output;
|
||||
const jsc = bun.jsc;
|
||||
const uws = bun.uws;
|
||||
|
||||
const Subprocess = jsc.API.Subprocess;
|
||||
const Source = Subprocess.Source;
|
||||
const StdioResult = Subprocess.StdioResult;
|
||||
225
src/bun.js/api/bun/subprocess/SubprocessPipeReader.zig
Normal file
225
src/bun.js/api/bun/subprocess/SubprocessPipeReader.zig
Normal file
@@ -0,0 +1,225 @@
|
||||
const PipeReader = @This();
|
||||
|
||||
const RefCount = bun.ptr.RefCount(@This(), "ref_count", PipeReader.deinit, .{});
|
||||
pub const ref = PipeReader.RefCount.ref;
|
||||
pub const deref = PipeReader.RefCount.deref;
|
||||
|
||||
reader: IOReader = undefined,
|
||||
process: ?*Subprocess = null,
|
||||
event_loop: *jsc.EventLoop = undefined,
|
||||
ref_count: PipeReader.RefCount,
|
||||
state: union(enum) {
|
||||
pending: void,
|
||||
done: []u8,
|
||||
err: bun.sys.Error,
|
||||
} = .{ .pending = {} },
|
||||
stdio_result: StdioResult,
|
||||
pub const IOReader = bun.io.BufferedReader;
|
||||
pub const Poll = IOReader;
|
||||
|
||||
pub fn memoryCost(this: *const PipeReader) usize {
|
||||
return this.reader.memoryCost();
|
||||
}
|
||||
|
||||
pub fn hasPendingActivity(this: *const PipeReader) bool {
|
||||
if (this.state == .pending)
|
||||
return true;
|
||||
|
||||
return this.reader.hasPendingActivity();
|
||||
}
|
||||
|
||||
pub fn detach(this: *PipeReader) void {
|
||||
this.process = null;
|
||||
this.deref();
|
||||
}
|
||||
|
||||
pub fn create(event_loop: *jsc.EventLoop, process: *Subprocess, result: StdioResult, limit: ?*MaxBuf) *PipeReader {
|
||||
var this = bun.new(PipeReader, .{
|
||||
.ref_count = .init(),
|
||||
.process = process,
|
||||
.reader = IOReader.init(@This()),
|
||||
.event_loop = event_loop,
|
||||
.stdio_result = result,
|
||||
});
|
||||
MaxBuf.addToPipereader(limit, &this.reader.maxbuf);
|
||||
if (Environment.isWindows) {
|
||||
this.reader.source = .{ .pipe = this.stdio_result.buffer };
|
||||
}
|
||||
|
||||
this.reader.setParent(this);
|
||||
return this;
|
||||
}
|
||||
|
||||
pub fn readAll(this: *PipeReader) void {
|
||||
if (this.state == .pending)
|
||||
this.reader.read();
|
||||
}
|
||||
|
||||
pub fn start(this: *PipeReader, process: *Subprocess, event_loop: *jsc.EventLoop) bun.sys.Maybe(void) {
|
||||
this.ref();
|
||||
this.process = process;
|
||||
this.event_loop = event_loop;
|
||||
if (Environment.isWindows) {
|
||||
return this.reader.startWithCurrentPipe();
|
||||
}
|
||||
|
||||
switch (this.reader.start(this.stdio_result.?, true)) {
|
||||
.err => |err| {
|
||||
return .{ .err = err };
|
||||
},
|
||||
.result => {
|
||||
if (comptime Environment.isPosix) {
|
||||
const poll = this.reader.handle.poll;
|
||||
poll.flags.insert(.socket);
|
||||
this.reader.flags.socket = true;
|
||||
this.reader.flags.nonblocking = true;
|
||||
this.reader.flags.pollable = true;
|
||||
poll.flags.insert(.nonblocking);
|
||||
}
|
||||
|
||||
return .success;
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub const toJS = toReadableStream;
|
||||
|
||||
pub fn onReaderDone(this: *PipeReader) void {
|
||||
const owned = this.toOwnedSlice();
|
||||
this.state = .{ .done = owned };
|
||||
if (this.process) |process| {
|
||||
this.process = null;
|
||||
process.onCloseIO(this.kind(process));
|
||||
this.deref();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn kind(reader: *const PipeReader, process: *const Subprocess) StdioKind {
|
||||
if (process.stdout == .pipe and process.stdout.pipe == reader) {
|
||||
return .stdout;
|
||||
}
|
||||
|
||||
if (process.stderr == .pipe and process.stderr.pipe == reader) {
|
||||
return .stderr;
|
||||
}
|
||||
|
||||
@panic("We should be either stdout or stderr");
|
||||
}
|
||||
|
||||
pub fn toOwnedSlice(this: *PipeReader) []u8 {
|
||||
if (this.state == .done) {
|
||||
return this.state.done;
|
||||
}
|
||||
// we do not use .toOwnedSlice() because we don't want to reallocate memory.
|
||||
const out = this.reader._buffer;
|
||||
this.reader._buffer.items = &.{};
|
||||
this.reader._buffer.capacity = 0;
|
||||
|
||||
if (out.capacity > 0 and out.items.len == 0) {
|
||||
out.deinit();
|
||||
return &.{};
|
||||
}
|
||||
|
||||
return out.items;
|
||||
}
|
||||
|
||||
pub fn updateRef(this: *PipeReader, add: bool) void {
|
||||
this.reader.updateRef(add);
|
||||
}
|
||||
|
||||
pub fn watch(this: *PipeReader) void {
|
||||
if (!this.reader.isDone())
|
||||
this.reader.watch();
|
||||
}
|
||||
|
||||
pub fn toReadableStream(this: *PipeReader, globalObject: *jsc.JSGlobalObject) bun.JSError!jsc.JSValue {
|
||||
defer this.detach();
|
||||
|
||||
switch (this.state) {
|
||||
.pending => {
|
||||
const stream = jsc.WebCore.ReadableStream.fromPipe(globalObject, this, &this.reader);
|
||||
this.state = .{ .done = &.{} };
|
||||
return stream;
|
||||
},
|
||||
.done => |bytes| {
|
||||
this.state = .{ .done = &.{} };
|
||||
return jsc.WebCore.ReadableStream.fromOwnedSlice(globalObject, bytes, 0);
|
||||
},
|
||||
.err => |err| {
|
||||
_ = err;
|
||||
const empty = try jsc.WebCore.ReadableStream.empty(globalObject);
|
||||
jsc.WebCore.ReadableStream.cancel(&(try jsc.WebCore.ReadableStream.fromJS(empty, globalObject)).?, globalObject);
|
||||
return empty;
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn toBuffer(this: *PipeReader, globalThis: *jsc.JSGlobalObject) jsc.JSValue {
|
||||
switch (this.state) {
|
||||
.done => |bytes| {
|
||||
defer this.state = .{ .done = &.{} };
|
||||
return jsc.MarkedArrayBuffer.fromBytes(bytes, bun.default_allocator, .Uint8Array).toNodeBuffer(globalThis);
|
||||
},
|
||||
else => {
|
||||
return .js_undefined;
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn onReaderError(this: *PipeReader, err: bun.sys.Error) void {
|
||||
if (this.state == .done) {
|
||||
bun.default_allocator.free(this.state.done);
|
||||
}
|
||||
this.state = .{ .err = err };
|
||||
if (this.process) |process|
|
||||
process.onCloseIO(this.kind(process));
|
||||
}
|
||||
|
||||
pub fn close(this: *PipeReader) void {
|
||||
switch (this.state) {
|
||||
.pending => {
|
||||
this.reader.close();
|
||||
},
|
||||
.done => {},
|
||||
.err => {},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn eventLoop(this: *PipeReader) *jsc.EventLoop {
|
||||
return this.event_loop;
|
||||
}
|
||||
|
||||
pub fn loop(this: *PipeReader) *uws.Loop {
|
||||
return this.event_loop.virtual_machine.uwsLoop();
|
||||
}
|
||||
|
||||
fn deinit(this: *PipeReader) void {
|
||||
if (comptime Environment.isPosix) {
|
||||
bun.assert(this.reader.isDone());
|
||||
}
|
||||
|
||||
if (comptime Environment.isWindows) {
|
||||
bun.assert(this.reader.source == null or this.reader.source.?.isClosed());
|
||||
}
|
||||
|
||||
if (this.state == .done) {
|
||||
bun.default_allocator.free(this.state.done);
|
||||
}
|
||||
|
||||
this.reader.deinit();
|
||||
bun.destroy(this);
|
||||
}
|
||||
|
||||
const bun = @import("bun");
|
||||
const Environment = bun.Environment;
|
||||
const default_allocator = bun.default_allocator;
|
||||
const uws = bun.uws;
|
||||
|
||||
const jsc = bun.jsc;
|
||||
const JSGlobalObject = jsc.JSGlobalObject;
|
||||
const JSValue = jsc.JSValue;
|
||||
|
||||
const Subprocess = jsc.API.Subprocess;
|
||||
const MaxBuf = Subprocess.MaxBuf;
|
||||
const StdioKind = Subprocess.StdioKind;
|
||||
const StdioResult = Subprocess.StdioResult;
|
||||
334
src/bun.js/api/bun/subprocess/Writable.zig
Normal file
334
src/bun.js/api/bun/subprocess/Writable.zig
Normal file
@@ -0,0 +1,334 @@
|
||||
pub const Writable = union(enum) {
|
||||
pipe: *jsc.WebCore.FileSink,
|
||||
fd: bun.FileDescriptor,
|
||||
buffer: *StaticPipeWriter,
|
||||
memfd: bun.FileDescriptor,
|
||||
inherit: void,
|
||||
ignore: void,
|
||||
|
||||
pub fn memoryCost(this: *const Writable) usize {
|
||||
return switch (this.*) {
|
||||
.pipe => |pipe| pipe.memoryCost(),
|
||||
.buffer => |buffer| buffer.memoryCost(),
|
||||
// TODO: memfd
|
||||
else => 0,
|
||||
};
|
||||
}
|
||||
|
||||
pub fn hasPendingActivity(this: *const Writable) bool {
|
||||
return switch (this.*) {
|
||||
.pipe => false,
|
||||
|
||||
// we mark them as .ignore when they are closed, so this must be true
|
||||
.buffer => true,
|
||||
else => false,
|
||||
};
|
||||
}
|
||||
|
||||
pub fn ref(this: *Writable) void {
|
||||
switch (this.*) {
|
||||
.pipe => {
|
||||
this.pipe.updateRef(true);
|
||||
},
|
||||
.buffer => {
|
||||
this.buffer.updateRef(true);
|
||||
},
|
||||
else => {},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn unref(this: *Writable) void {
|
||||
switch (this.*) {
|
||||
.pipe => {
|
||||
this.pipe.updateRef(false);
|
||||
},
|
||||
.buffer => {
|
||||
this.buffer.updateRef(false);
|
||||
},
|
||||
else => {},
|
||||
}
|
||||
}
|
||||
|
||||
// When the stream has closed we need to be notified to prevent a use-after-free
|
||||
// We can test for this use-after-free by enabling hot module reloading on a file and then saving it twice
|
||||
pub fn onClose(this: *Writable, _: ?bun.sys.Error) void {
|
||||
const process: *Subprocess = @fieldParentPtr("stdin", this);
|
||||
|
||||
if (process.this_jsvalue != .zero) {
|
||||
if (js.stdinGetCached(process.this_jsvalue)) |existing_value| {
|
||||
jsc.WebCore.FileSink.JSSink.setDestroyCallback(existing_value, 0);
|
||||
}
|
||||
}
|
||||
|
||||
switch (this.*) {
|
||||
.buffer => {
|
||||
this.buffer.deref();
|
||||
},
|
||||
.pipe => {
|
||||
this.pipe.deref();
|
||||
},
|
||||
else => {},
|
||||
}
|
||||
|
||||
process.onStdinDestroyed();
|
||||
|
||||
this.* = .{
|
||||
.ignore = {},
|
||||
};
|
||||
}
|
||||
pub fn onReady(_: *Writable, _: ?jsc.WebCore.Blob.SizeType, _: ?jsc.WebCore.Blob.SizeType) void {}
|
||||
pub fn onStart(_: *Writable) void {}
|
||||
|
||||
pub fn init(
|
||||
stdio: *Stdio,
|
||||
event_loop: *jsc.EventLoop,
|
||||
subprocess: *Subprocess,
|
||||
result: StdioResult,
|
||||
promise_for_stream: *jsc.JSValue,
|
||||
) !Writable {
|
||||
Subprocess.assertStdioResult(result);
|
||||
|
||||
if (Environment.isWindows) {
|
||||
switch (stdio.*) {
|
||||
.pipe, .readable_stream => {
|
||||
if (result == .buffer) {
|
||||
const pipe = jsc.WebCore.FileSink.createWithPipe(event_loop, result.buffer);
|
||||
|
||||
switch (pipe.writer.startWithCurrentPipe()) {
|
||||
.result => {},
|
||||
.err => |err| {
|
||||
_ = err; // autofix
|
||||
pipe.deref();
|
||||
if (stdio.* == .readable_stream) {
|
||||
stdio.readable_stream.cancel(event_loop.global);
|
||||
}
|
||||
return error.UnexpectedCreatingStdin;
|
||||
},
|
||||
}
|
||||
pipe.writer.setParent(pipe);
|
||||
subprocess.weak_file_sink_stdin_ptr = pipe;
|
||||
subprocess.ref();
|
||||
subprocess.flags.deref_on_stdin_destroyed = true;
|
||||
subprocess.flags.has_stdin_destructor_called = false;
|
||||
|
||||
if (stdio.* == .readable_stream) {
|
||||
const assign_result = pipe.assignToStream(&stdio.readable_stream, event_loop.global);
|
||||
if (assign_result.toError()) |err| {
|
||||
pipe.deref();
|
||||
subprocess.deref();
|
||||
return event_loop.global.throwValue(err);
|
||||
}
|
||||
promise_for_stream.* = assign_result;
|
||||
}
|
||||
|
||||
return Writable{
|
||||
.pipe = pipe,
|
||||
};
|
||||
}
|
||||
return Writable{ .inherit = {} };
|
||||
},
|
||||
|
||||
.blob => |blob| {
|
||||
return Writable{
|
||||
.buffer = StaticPipeWriter.create(event_loop, subprocess, result, .{ .blob = blob }),
|
||||
};
|
||||
},
|
||||
.array_buffer => |array_buffer| {
|
||||
return Writable{
|
||||
.buffer = StaticPipeWriter.create(event_loop, subprocess, result, .{ .array_buffer = array_buffer }),
|
||||
};
|
||||
},
|
||||
.fd => |fd| {
|
||||
return Writable{ .fd = fd };
|
||||
},
|
||||
.dup2 => |dup2| {
|
||||
return Writable{ .fd = dup2.to.toFd() };
|
||||
},
|
||||
.inherit => {
|
||||
return Writable{ .inherit = {} };
|
||||
},
|
||||
.memfd, .path, .ignore => {
|
||||
return Writable{ .ignore = {} };
|
||||
},
|
||||
.ipc, .capture => {
|
||||
return Writable{ .ignore = {} };
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
if (comptime Environment.isPosix) {
|
||||
if (stdio.* == .pipe) {
|
||||
_ = bun.sys.setNonblocking(result.?);
|
||||
}
|
||||
}
|
||||
|
||||
switch (stdio.*) {
|
||||
.dup2 => @panic("TODO dup2 stdio"),
|
||||
.pipe, .readable_stream => {
|
||||
const pipe = jsc.WebCore.FileSink.create(event_loop, result.?);
|
||||
|
||||
switch (pipe.writer.start(pipe.fd, true)) {
|
||||
.result => {},
|
||||
.err => |err| {
|
||||
_ = err; // autofix
|
||||
pipe.deref();
|
||||
if (stdio.* == .readable_stream) {
|
||||
stdio.readable_stream.cancel(event_loop.global);
|
||||
}
|
||||
|
||||
return error.UnexpectedCreatingStdin;
|
||||
},
|
||||
}
|
||||
|
||||
pipe.writer.handle.poll.flags.insert(.socket);
|
||||
|
||||
subprocess.weak_file_sink_stdin_ptr = pipe;
|
||||
subprocess.ref();
|
||||
subprocess.flags.has_stdin_destructor_called = false;
|
||||
subprocess.flags.deref_on_stdin_destroyed = true;
|
||||
|
||||
if (stdio.* == .readable_stream) {
|
||||
const assign_result = pipe.assignToStream(&stdio.readable_stream, event_loop.global);
|
||||
if (assign_result.toError()) |err| {
|
||||
pipe.deref();
|
||||
subprocess.deref();
|
||||
return event_loop.global.throwValue(err);
|
||||
}
|
||||
promise_for_stream.* = assign_result;
|
||||
}
|
||||
|
||||
return Writable{
|
||||
.pipe = pipe,
|
||||
};
|
||||
},
|
||||
|
||||
.blob => |blob| {
|
||||
return Writable{
|
||||
.buffer = StaticPipeWriter.create(event_loop, subprocess, result, .{ .blob = blob }),
|
||||
};
|
||||
},
|
||||
.array_buffer => |array_buffer| {
|
||||
return Writable{
|
||||
.buffer = StaticPipeWriter.create(event_loop, subprocess, result, .{ .array_buffer = array_buffer }),
|
||||
};
|
||||
},
|
||||
.memfd => |memfd| {
|
||||
bun.assert(memfd != bun.invalid_fd);
|
||||
return Writable{ .memfd = memfd };
|
||||
},
|
||||
.fd => {
|
||||
return Writable{ .fd = result.? };
|
||||
},
|
||||
.inherit => {
|
||||
return Writable{ .inherit = {} };
|
||||
},
|
||||
.path, .ignore => {
|
||||
return Writable{ .ignore = {} };
|
||||
},
|
||||
.ipc, .capture => {
|
||||
return Writable{ .ignore = {} };
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn toJS(this: *Writable, globalThis: *jsc.JSGlobalObject, subprocess: *Subprocess) JSValue {
|
||||
return switch (this.*) {
|
||||
.fd => |fd| fd.toJS(globalThis),
|
||||
.memfd, .ignore => .js_undefined,
|
||||
.buffer, .inherit => .js_undefined,
|
||||
.pipe => |pipe| {
|
||||
this.* = .{ .ignore = {} };
|
||||
if (subprocess.process.hasExited() and !subprocess.flags.has_stdin_destructor_called) {
|
||||
// onAttachedProcessExit() can call deref on the
|
||||
// subprocess. Since we never called ref(), it would be
|
||||
// unbalanced to do so, leading to a use-after-free.
|
||||
// So, let's not do that.
|
||||
// https://github.com/oven-sh/bun/pull/14092
|
||||
bun.debugAssert(!subprocess.flags.deref_on_stdin_destroyed);
|
||||
const debug_ref_count = if (Environment.isDebug) subprocess.ref_count else 0;
|
||||
pipe.onAttachedProcessExit(&subprocess.process.status);
|
||||
if (Environment.isDebug) {
|
||||
bun.debugAssert(subprocess.ref_count.get() == debug_ref_count.get());
|
||||
}
|
||||
return pipe.toJS(globalThis);
|
||||
} else {
|
||||
subprocess.flags.has_stdin_destructor_called = false;
|
||||
subprocess.weak_file_sink_stdin_ptr = pipe;
|
||||
subprocess.ref();
|
||||
subprocess.flags.deref_on_stdin_destroyed = true;
|
||||
if (@intFromPtr(pipe.signal.ptr) == @intFromPtr(subprocess)) {
|
||||
pipe.signal.clear();
|
||||
}
|
||||
return pipe.toJSWithDestructor(
|
||||
globalThis,
|
||||
jsc.WebCore.Sink.DestructorPtr.init(subprocess),
|
||||
);
|
||||
}
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
pub fn finalize(this: *Writable) void {
|
||||
const subprocess: *Subprocess = @fieldParentPtr("stdin", this);
|
||||
if (subprocess.this_jsvalue != .zero) {
|
||||
if (jsc.Codegen.JSSubprocess.stdinGetCached(subprocess.this_jsvalue)) |existing_value| {
|
||||
jsc.WebCore.FileSink.JSSink.setDestroyCallback(existing_value, 0);
|
||||
}
|
||||
}
|
||||
|
||||
return switch (this.*) {
|
||||
.pipe => |pipe| {
|
||||
if (pipe.signal.ptr == @as(*anyopaque, @ptrCast(this))) {
|
||||
pipe.signal.clear();
|
||||
}
|
||||
|
||||
pipe.deref();
|
||||
|
||||
this.* = .{ .ignore = {} };
|
||||
},
|
||||
.buffer => {
|
||||
this.buffer.updateRef(false);
|
||||
this.buffer.deref();
|
||||
},
|
||||
.memfd => |fd| {
|
||||
fd.close();
|
||||
this.* = .{ .ignore = {} };
|
||||
},
|
||||
.ignore => {},
|
||||
.fd, .inherit => {},
|
||||
};
|
||||
}
|
||||
|
||||
pub fn close(this: *Writable) void {
|
||||
switch (this.*) {
|
||||
.pipe => |pipe| {
|
||||
_ = pipe.end(null);
|
||||
},
|
||||
.memfd => |fd| {
|
||||
fd.close();
|
||||
this.* = .{ .ignore = {} };
|
||||
},
|
||||
.fd => {
|
||||
this.* = .{ .ignore = {} };
|
||||
},
|
||||
.buffer => {
|
||||
this.buffer.close();
|
||||
},
|
||||
.ignore => {},
|
||||
.inherit => {},
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const bun = @import("bun");
|
||||
const Environment = bun.Environment;
|
||||
const Stdio = bun.spawn.Stdio;
|
||||
|
||||
const jsc = bun.jsc;
|
||||
const JSGlobalObject = jsc.JSGlobalObject;
|
||||
const JSValue = jsc.JSValue;
|
||||
|
||||
const Subprocess = jsc.API.Subprocess;
|
||||
const StaticPipeWriter = Subprocess.StaticPipeWriter;
|
||||
const StdioResult = Subprocess.StdioResult;
|
||||
const js = Subprocess.js;
|
||||
@@ -326,8 +326,8 @@ pub fn usocketsLoop(this: *const EventLoop) *uws.Loop {
|
||||
}
|
||||
|
||||
pub fn autoTick(this: *EventLoop) void {
|
||||
var loop = this.usocketsLoop();
|
||||
var ctx = this.virtual_machine;
|
||||
const loop = this.usocketsLoop();
|
||||
const ctx = this.virtual_machine;
|
||||
|
||||
this.tickImmediateTasks(ctx);
|
||||
if (comptime Environment.isPosix) {
|
||||
@@ -350,6 +350,8 @@ pub fn autoTick(this: *EventLoop) void {
|
||||
}
|
||||
}
|
||||
|
||||
ctx.timer.updateDateHeaderTimerIfNecessary(loop, ctx);
|
||||
|
||||
this.runImminentGCTimer();
|
||||
|
||||
if (loop.isActive()) {
|
||||
@@ -378,8 +380,8 @@ pub fn autoTick(this: *EventLoop) void {
|
||||
}
|
||||
|
||||
pub fn tickPossiblyForever(this: *EventLoop) void {
|
||||
var ctx = this.virtual_machine;
|
||||
var loop = this.usocketsLoop();
|
||||
const ctx = this.virtual_machine;
|
||||
const loop = this.usocketsLoop();
|
||||
|
||||
if (comptime Environment.isPosix) {
|
||||
const pending_unref = ctx.pending_unref_counter;
|
||||
@@ -429,6 +431,8 @@ pub fn autoTickActive(this: *EventLoop) void {
|
||||
}
|
||||
}
|
||||
|
||||
ctx.timer.updateDateHeaderTimerIfNecessary(loop, ctx);
|
||||
|
||||
if (loop.isActive()) {
|
||||
this.processGCTimer();
|
||||
var timespec: bun.timespec = undefined;
|
||||
|
||||
@@ -23,6 +23,11 @@ using TCPWebSocket = uWS::WebSocket<false, true, void *>;
|
||||
extern "C"
|
||||
{
|
||||
|
||||
void uws_loop_date_header_timer_update(us_loop_t *loop) {
|
||||
uWS::LoopData *loopData = uWS::Loop::data(loop);
|
||||
loopData->updateDate();
|
||||
}
|
||||
|
||||
uws_app_t *uws_create_app(int ssl, struct us_bun_socket_context_options_t options)
|
||||
{
|
||||
if (ssl)
|
||||
|
||||
@@ -29,6 +29,10 @@ pub const InternalLoopData = extern struct {
|
||||
return this.recv_buf[0..LIBUS_RECV_BUFFER_LENGTH];
|
||||
}
|
||||
|
||||
pub fn shouldEnableDateHeaderTimer(this: *const InternalLoopData) bool {
|
||||
return this.sweep_timer_count > 0;
|
||||
}
|
||||
|
||||
pub fn setParentEventLoop(this: *InternalLoopData, parent: jsc.EventLoopHandle) void {
|
||||
switch (parent) {
|
||||
.js => |ptr| {
|
||||
|
||||
@@ -31,6 +31,10 @@ pub const PosixLoop = extern struct {
|
||||
c.uws_res_clear_corked_socket(this);
|
||||
}
|
||||
|
||||
pub fn updateDate(this: *PosixLoop) void {
|
||||
c.uws_loop_date_header_timer_update(this);
|
||||
}
|
||||
|
||||
pub fn iterationNumber(this: *const PosixLoop) u64 {
|
||||
return this.internal_loop_data.iteration_nr;
|
||||
}
|
||||
@@ -157,6 +161,10 @@ pub const PosixLoop = extern struct {
|
||||
pub fn run(this: *PosixLoop) void {
|
||||
c.us_loop_run(this);
|
||||
}
|
||||
|
||||
pub fn shouldEnableDateHeaderTimer(this: *const PosixLoop) bool {
|
||||
return this.internal_loop_data.shouldEnableDateHeaderTimer();
|
||||
}
|
||||
};
|
||||
|
||||
pub const WindowsLoop = extern struct {
|
||||
@@ -169,6 +177,10 @@ pub const WindowsLoop = extern struct {
|
||||
pre: *uv.uv_prepare_t,
|
||||
check: *uv.uv_check_t,
|
||||
|
||||
pub fn shouldEnableDateHeaderTimer(this: *const WindowsLoop) bool {
|
||||
return this.internal_loop_data.shouldEnableDateHeaderTimer();
|
||||
}
|
||||
|
||||
pub fn uncork(this: *PosixLoop) void {
|
||||
c.uws_res_clear_corked_socket(this);
|
||||
}
|
||||
@@ -245,6 +257,10 @@ pub const WindowsLoop = extern struct {
|
||||
c.uws_loop_defer(this, user_data, Handler.callback);
|
||||
}
|
||||
|
||||
pub fn updateDate(this: *Loop) void {
|
||||
c.uws_loop_date_header_timer_update(this);
|
||||
}
|
||||
|
||||
fn NewHandler(comptime UserType: type, comptime callback_fn: fn (UserType) void) type {
|
||||
return struct {
|
||||
loop: *Loop,
|
||||
@@ -287,6 +303,7 @@ const c = struct {
|
||||
pub extern fn uws_get_loop_with_native(*anyopaque) *WindowsLoop;
|
||||
pub extern fn uws_loop_defer(loop: *Loop, ctx: *anyopaque, cb: *const (fn (ctx: *anyopaque) callconv(.C) void)) void;
|
||||
pub extern fn uws_res_clear_corked_socket(loop: *Loop) void;
|
||||
pub extern fn uws_loop_date_header_timer_update(loop: *Loop) void;
|
||||
};
|
||||
|
||||
const log = bun.Output.scoped(.Loop, .visible);
|
||||
|
||||
55
test/js/bun/http/bun-serve-date.test.ts
Normal file
55
test/js/bun/http/bun-serve-date.test.ts
Normal file
@@ -0,0 +1,55 @@
|
||||
import { test, expect } from "bun:test";
|
||||
|
||||
test("Date header is not updated every request", async () => {
|
||||
const twoSecondsAgo = new Date(Date.now() - 2 * 1000);
|
||||
await using server = Bun.serve({
|
||||
port: 0,
|
||||
fetch() {
|
||||
return new Response("OK");
|
||||
},
|
||||
});
|
||||
|
||||
// Make multiple requests in quick succession
|
||||
const responses = await Promise.all([
|
||||
fetch(server.url),
|
||||
fetch(server.url),
|
||||
fetch(server.url),
|
||||
fetch(server.url),
|
||||
fetch(server.url),
|
||||
]);
|
||||
|
||||
// All responses should have the same Date header since they were made within the same second
|
||||
const dates = responses.map(r => r.headers.get("Date"));
|
||||
const uniqueDates = new Set(dates);
|
||||
|
||||
// Should only have 1 unique date value since all requests were made rapidly
|
||||
expect(uniqueDates.size).toBe(1);
|
||||
expect(dates[0]).toBeTruthy();
|
||||
|
||||
for (const delay of [250, 250, 250, 250, 250]) {
|
||||
await Bun.sleep(delay);
|
||||
const laterResponses = await Promise.all([
|
||||
fetch(server.url),
|
||||
fetch(server.url),
|
||||
fetch(server.url),
|
||||
fetch(server.url),
|
||||
fetch(server.url),
|
||||
]);
|
||||
const laterDates = laterResponses.map(r => r.headers.get("Date"));
|
||||
const laterUniqueDates = new Set(laterDates);
|
||||
expect(laterUniqueDates.size).toBe(1);
|
||||
uniqueDates.add([...laterUniqueDates][0]);
|
||||
}
|
||||
|
||||
// There should only really be two, but I don't trust timers to be SUPER accurate.
|
||||
expect(uniqueDates.size).toBeLessThan(4);
|
||||
|
||||
for (const date of [...uniqueDates]) {
|
||||
const d = new Date(date!);
|
||||
const stamp = d.getTime();
|
||||
expect(Number.isFinite(stamp)).toBe(true);
|
||||
expect(stamp).toBeGreaterThan(0);
|
||||
expect(stamp).toBeGreaterThan(twoSecondsAgo.getTime());
|
||||
expect(stamp).toBeLessThan(Date.now() + 100);
|
||||
}
|
||||
});
|
||||
Reference in New Issue
Block a user