Compare commits

...

8 Commits

Author SHA1 Message Date
Dylan Conway
d8b5176cc0 fix 2023-09-12 21:27:30 -07:00
Dylan Conway
ec251863d1 track registered one shot fds 2023-09-12 20:47:14 -07:00
Dylan Conway
85bf54a3ac Merge branch 'main' into dylan/cli-fixes 2023-09-11 20:38:47 -07:00
Dylan Conway
58d5771589 Merge branch 'main' into dylan/cli-fixes 2023-09-11 18:21:14 -07:00
Dylan Conway
5884a5cec2 make sure epoll is deleted 2023-09-11 18:18:25 -07:00
Dylan Conway
00be978c3a delete on rearm 2023-09-11 12:10:19 -07:00
Dylan Conway
bd359df23c Merge branch 'main' into dylan/cli-fixes 2023-09-11 12:02:02 -07:00
Dylan Conway
6bc5098e7e eof and check for undefined 2023-09-03 22:56:25 -07:00
7 changed files with 90 additions and 19 deletions

View File

@@ -2168,10 +2168,19 @@ pub const FilePoll = struct {
var event = linux.epoll_event{ .events = flags, .data = .{ .u64 = @intFromPtr(Pollable.init(this).ptr()) } };
var op: u32 = if (this.isRegistered() or this.flags.contains(.needs_rearm)) linux.EPOLL.CTL_MOD else linux.EPOLL.CTL_ADD;
if (op == linux.EPOLL.CTL_ADD and this.flags.contains(.one_shot)) {
var gpe = JSC.VirtualMachine.get().registered_one_shot_epoll_fds.getOrPut(fd) catch unreachable;
if (gpe.found_existing) {
op = linux.EPOLL.CTL_MOD;
}
}
const ctl = linux.epoll_ctl(
watcher_fd,
if (this.isRegistered() or this.flags.contains(.needs_rearm)) linux.EPOLL.CTL_MOD else linux.EPOLL.CTL_ADD,
@as(std.os.fd_t, @intCast(fd)),
op,
@intCast(fd),
&event,
);
this.flags.insert(.was_ever_registered);
@@ -2322,10 +2331,11 @@ pub const FilePoll = struct {
log("unregister: {s} ({d})", .{ @tagName(flag), fd });
if (comptime Environment.isLinux) {
_ = JSC.VirtualMachine.get().registered_one_shot_epoll_fds.remove(fd);
const ctl = linux.epoll_ctl(
watcher_fd,
linux.EPOLL.CTL_DEL,
@as(std.os.fd_t, @intCast(fd)),
@intCast(fd),
null,
);

View File

@@ -526,6 +526,8 @@ pub const VirtualMachine = struct {
debugger: ?Debugger = null,
has_started_debugger: bool = false,
registered_one_shot_epoll_fds: std.AutoHashMap(u64, void),
pub const OnUnhandledRejection = fn (*VirtualMachine, globalObject: *JSC.JSGlobalObject, JSC.JSValue) void;
pub const OnException = fn (*ZigException) void;
@@ -1068,6 +1070,7 @@ pub const VirtualMachine = struct {
.file_blobs = JSC.WebCore.Blob.Store.Map.init(allocator),
.standalone_module_graph = opts.graph.?,
.parser_arena = @import("root").bun.ArenaAllocator.init(allocator),
.registered_one_shot_epoll_fds = std.AutoHashMap(u64, void).init(allocator),
};
vm.source_mappings = .{ .map = &vm.saved_source_map_table };
vm.regular_event_loop.tasks = EventLoop.Queue.init(
@@ -1170,6 +1173,7 @@ pub const VirtualMachine = struct {
.ref_strings_mutex = Lock.init(),
.file_blobs = JSC.WebCore.Blob.Store.Map.init(allocator),
.parser_arena = @import("root").bun.ArenaAllocator.init(allocator),
.registered_one_shot_epoll_fds = std.AutoHashMap(u64, void).init(allocator),
};
vm.source_mappings = .{ .map = &vm.saved_source_map_table };
vm.regular_event_loop.tasks = EventLoop.Queue.init(
@@ -1301,6 +1305,7 @@ pub const VirtualMachine = struct {
.parser_arena = @import("root").bun.ArenaAllocator.init(allocator),
.standalone_module_graph = worker.parent.standalone_module_graph,
.worker = worker,
.registered_one_shot_epoll_fds = std.AutoHashMap(u64, void).init(allocator),
};
vm.source_mappings = .{ .map = &vm.saved_source_map_table };
vm.regular_event_loop.tasks = EventLoop.Queue.init(
@@ -1922,6 +1927,7 @@ pub const VirtualMachine = struct {
// TODO:
pub fn deinit(this: *VirtualMachine) void {
this.source_mappings.deinit();
this.registered_one_shot_epoll_fds.deinit();
}
pub const ExceptionList = std.ArrayList(Api.JsException);

View File

@@ -3919,12 +3919,7 @@ pub const FIFO = struct {
}
}
if (comptime Environment.isLinux) {
if (available == 0) {
std.debug.assert(this.poll_ref == null);
return .pending;
}
} else if (available == std.math.maxInt(@TypeOf(available)) and this.poll_ref == null) {
if (available == std.math.maxInt(@TypeOf(available)) and this.poll_ref == null) {
// we don't know if it's readable or not
return switch (bun.isReadable(this.fd)) {
.hup => {
@@ -3994,6 +3989,12 @@ pub const FIFO = struct {
if (this.to_read) |*to_read| {
to_read.* = to_read.* -| @as(u32, @truncate(read_result.read.len));
}
if (this.poll_ref) |poll_ref| {
if (poll_ref.flags.contains(.eof)) {
this.close();
return;
}
}
}
this.pending.result = read_result.toStream(
@@ -4123,11 +4124,12 @@ pub const FIFO = struct {
// otherwise we might block the process
poll.flags.remove(.readable);
}
if (result == 0) {
poll.flags.insert(.eof);
}
}
if (result == 0) {
return .{ .read = buf[0..0] };
}
return .{ .read = buf[0..result] };
},
}

View File

@@ -147,11 +147,10 @@ export function getStdinStream(fd) {
try {
var done: any, value: any;
const read = reader?.readMany();
if (!read) return;
if ($isPromise(read)) {
({ done, value } = await read);
} else {
// @ts-expect-error
({ done, value } = read);
}

View File

@@ -119,6 +119,60 @@ Object.defineProperty(WriteStream, "prototype", {
const Real = require("node:fs").WriteStream.prototype;
Object.defineProperty(WriteStream, "prototype", { value: Real });
WriteStream.prototype[Symbol.asyncIterator] = async function* () {
const stream = Bun.file(this.fd).stream();
var reader = stream.getReader();
// TODO: use builtin
var deferredError;
var indexOf = Bun.indexOfLine;
try {
while (true) {
var done, value;
var pendingChunk;
const firstResult = reader.readMany();
if ($isPromise(firstResult)) {
({ done, value } = await firstResult);
} else {
({ done, value } = firstResult);
}
if (done) {
if (pendingChunk) {
yield pendingChunk;
}
return;
}
var actualChunk;
// we assume it was given line-by-line
for (const chunk of value) {
actualChunk = chunk;
if (pendingChunk) {
actualChunk = Buffer.concat([pendingChunk, chunk]);
pendingChunk = null;
}
var last = 0;
// TODO: "\r", 0x4048, 0x4049, 0x404A, 0x404B, 0x404C, 0x404D, 0x404E, 0x404F
var i = indexOf(actualChunk, last) + 1;
while (i !== -1) {
yield actualChunk.subarray(last, i);
last = i + 1;
i = indexOf(actualChunk, last);
}
if (i != -1) {
pendingChunk = actualChunk.subarray(last);
}
}
}
} catch (e) {
deferredError = e;
} finally {
reader.releaseLock();
if (deferredError) {
throw deferredError;
}
}
};
WriteStream.prototype._refreshSize = function () {
const oldCols = this.columns;
const oldRows = this.rows;

File diff suppressed because one or more lines are too long

View File

@@ -654,9 +654,9 @@ const char* const s_processObjectInternalsGetStdioWriteStreamCode = "(function (
const JSC::ConstructAbility s_processObjectInternalsGetStdinStreamCodeConstructAbility = JSC::ConstructAbility::CannotConstruct;
const JSC::ConstructorKind s_processObjectInternalsGetStdinStreamCodeConstructorKind = JSC::ConstructorKind::None;
const JSC::ImplementationVisibility s_processObjectInternalsGetStdinStreamCodeImplementationVisibility = JSC::ImplementationVisibility::Public;
const int s_processObjectInternalsGetStdinStreamCodeLength = 1386;
const int s_processObjectInternalsGetStdinStreamCodeLength = 1402;
static const JSC::Intrinsic s_processObjectInternalsGetStdinStreamCodeIntrinsic = JSC::NoIntrinsic;
const char* const s_processObjectInternalsGetStdinStreamCode = "(function (fd){\"use strict\";var reader,readerRef;function ref(){reader\?\?=@Bun.stdin.stream().getReader(),readerRef\?\?=setInterval(()=>{},1<<30)}function unref(){if(readerRef)clearInterval(readerRef),readerRef=@undefined;if(reader)reader.cancel(),reader=@undefined}const stream=new((@getInternalField(@internalModuleRegistry,44))||(@createInternalModuleById(44))).ReadStream(fd),originalOn=stream.on;stream.on=function(event,listener){if(event===\"readable\")ref();return originalOn.call(this,event,listener)},stream.fd=fd;const originalPause=stream.pause;stream.pause=function(){return unref(),originalPause.call(this)};const originalResume=stream.resume;stream.resume=function(){return ref(),originalResume.call(this)};async function internalRead(stream2){try{var done,value;const read=reader\?.readMany();if(@isPromise(read))({done,value}=await read);else({done,value}=read);if(!done){stream2.push(value[0]);const length=value.length;for(let i=1;i<length;i++)stream2.push(value[i])}else stream2.emit(\"end\"),stream2.pause()}catch(err){stream2.destroy(err)}}return stream._read=function(size){internalRead(this)},stream.on(\"resume\",()=>{ref(),stream._undestroy()}),stream._readableState.reading=!1,stream.on(\"pause\",()=>{process.nextTick(()=>{if(!stream.readableFlowing)stream._readableState.reading=!1})}),stream.on(\"close\",()=>{process.nextTick(()=>{stream.destroy(),unref()})}),stream})\n";
const char* const s_processObjectInternalsGetStdinStreamCode = "(function (fd){\"use strict\";var reader,readerRef;function ref(){reader\?\?=@Bun.stdin.stream().getReader(),readerRef\?\?=setInterval(()=>{},1<<30)}function unref(){if(readerRef)clearInterval(readerRef),readerRef=@undefined;if(reader)reader.cancel(),reader=@undefined}const stream=new((@getInternalField(@internalModuleRegistry,44))||(@createInternalModuleById(44))).ReadStream(fd),originalOn=stream.on;stream.on=function(event,listener){if(event===\"readable\")ref();return originalOn.call(this,event,listener)},stream.fd=fd;const originalPause=stream.pause;stream.pause=function(){return unref(),originalPause.call(this)};const originalResume=stream.resume;stream.resume=function(){return ref(),originalResume.call(this)};async function internalRead(stream2){try{var done,value;const read=reader\?.readMany();if(!read)return;if(@isPromise(read))({done,value}=await read);else({done,value}=read);if(!done){stream2.push(value[0]);const length=value.length;for(let i=1;i<length;i++)stream2.push(value[i])}else stream2.emit(\"end\"),stream2.pause()}catch(err){stream2.destroy(err)}}return stream._read=function(size){internalRead(this)},stream.on(\"resume\",()=>{ref(),stream._undestroy()}),stream._readableState.reading=!1,stream.on(\"pause\",()=>{process.nextTick(()=>{if(!stream.readableFlowing)stream._readableState.reading=!1})}),stream.on(\"close\",()=>{process.nextTick(()=>{stream.destroy(),unref()})}),stream})\n";
// initializeNextTickQueue
const JSC::ConstructAbility s_processObjectInternalsInitializeNextTickQueueCodeConstructAbility = JSC::ConstructAbility::CannotConstruct;