This commit is contained in:
Alistair Smith
2025-05-30 13:42:32 -07:00
parent 10551fcc90
commit 8a8a23c5d6
3 changed files with 14 additions and 8 deletions

View File

@@ -244,7 +244,7 @@ void MessagePort::close()
removeAllEventListeners();
}
// Helper function to process a batch of messages and recursively post tasks for remaining messages
// at MOST 1000 messages per event loop tick so we dont starve the event loop
void MessagePort::processMessageBatch(ScriptExecutionContext& context, Vector<MessageWithMessagePorts>&& messages, Function<void()>&& completionCallback) {
constexpr size_t maxMessagesPerTick = 1000;
size_t messageCount = messages.size();
@@ -267,9 +267,9 @@ void MessagePort::processMessageBatch(ScriptExecutionContext& context, Vector<Me
scope.clearExceptionExceptTermination();
}
if (Zig::GlobalObject::scriptExecutionStatus(globalObject, globalObject) == ScriptExecutionStatus::Running) {
// if (Zig::GlobalObject::scriptExecutionStatus(globalObject, globalObject) == ScriptExecutionStatus::Running) {
globalObject->drainMicrotasks();
}
// }
}
if (messageCount > maxMessagesPerTick) {
@@ -279,10 +279,11 @@ void MessagePort::processMessageBatch(ScriptExecutionContext& context, Vector<Me
remainingMessages.append(WTFMove(messages[i]));
}
context.postTask(
[protectedThis = Ref{*this}, remaining = WTFMove(remainingMessages), completionCallback = WTFMove(completionCallback)](ScriptExecutionContext& ctx) mutable {
context.postImmediateCppTask(
[protectedThis = Ref { *this }, remaining = WTFMove(remainingMessages), completionCallback = WTFMove(completionCallback)](ScriptExecutionContext& ctx) mutable {
protectedThis->processMessageBatch(ctx, WTFMove(remaining), WTFMove(completionCallback));
});
}
);
} else {
completionCallback();
}

View File

@@ -532,6 +532,12 @@ pub fn enqueueImmediateTask(this: *EventLoop, task: *Timer.ImmediateObject) void
bun.handleOom(this.immediate_tasks.append(bun.default_allocator, task));
}
pub fn enqueueImmediateCppTask(this: *EventLoop, task: *jsc.CppTask) void {
// For now, just enqueue as a regular task
// TODO: implement proper immediate C++ task handling
this.enqueueTask(jsc.Task.init(task));
}
pub fn ensureWaker(this: *EventLoop) void {
jsc.markBinding(@src());
if (this.virtual_machine.event_loop_handle == null) {

View File

@@ -81,8 +81,7 @@ pub export fn Bun__queueTask(global: *JSGlobalObject, task: *jsc.CppTask) void {
pub export fn Bun__queueImmediateCppTask(global: *JSGlobalObject, task: *jsc.CppTask) void {
jsc.markBinding(@src());
// TODO: implement enqueueImmediateCppTask
global.bunVM().eventLoop().enqueueTask(jsc.Task.init(task));
global.bunVM().eventLoop().enqueueImmediateCppTask(task);
}
pub export fn Bun__queueTaskWithTimeout(global: *JSGlobalObject, task: *jsc.CppTask, milliseconds: i32) void {