Compare commits

...

22 Commits

Author SHA1 Message Date
Jarred Sumner
1ab4966bfd Make this test faster in debug builds 2024-10-20 18:55:02 -07:00
Jarred Sumner
002fd74993 Deflake a couple tests 2024-10-20 18:44:19 -07:00
Jarred Sumner
b319c5f7fe Fix a test failure 2024-10-20 18:11:58 -07:00
Jarred Sumner
2c7ea40b65 fix 2024-10-20 17:22:24 -07:00
Jarred Sumner
ba66bf14da Update uws.zig 2024-10-20 17:14:43 -07:00
Jarred Sumner
07d6e31baa windows 2024-10-20 17:13:46 -07:00
Jarred Sumner
33de5926de Merge branch 'main' into jarred/timer 2024-10-20 17:08:31 -07:00
Jarred Sumner
8db87098bb wip 2024-10-18 23:01:53 -07:00
Jarred Sumner
4975b1fd2e Revert "Experiment: keep flushing immediate queue"
This reverts commit cd5c6693b5.
2024-10-17 23:53:15 -07:00
Jarred Sumner
133b0045fc Fix test 2024-10-16 11:30:29 -07:00
Jarred Sumner
bab5fe7588 Merge branch 'main' into jarred/timer 2024-10-16 11:12:05 -07:00
Jarred Sumner
cd5c6693b5 Experiment: keep flushing immediate queue 2024-10-16 01:20:58 -07:00
Jarred Sumner
022c7cf500 Update socket.zig 2024-10-16 00:30:38 -07:00
Jarred Sumner
0ab7ed84cb Fixups 2024-10-16 00:28:07 -07:00
Jarred Sumner
1a0badd830 Fix edgecase 2024-10-15 23:31:17 -07:00
Jarred Sumner
87f070acac Fix edgecase with AbortSignal 2024-10-15 21:58:20 -07:00
Jarred Sumner
5d763fd71e fix bug 2024-10-15 20:39:58 -07:00
Jarred Sumner
6d9e2d437a hmm 2024-10-15 19:58:42 -07:00
Jarred Sumner
a444dcebb8 some tests 2024-10-15 19:17:41 -07:00
Jarred Sumner
cd05556689 auto uncork 2024-10-15 18:35:01 -07:00
Jarred Sumner
ef4435d987 fixup 2024-10-15 18:04:03 -07:00
Jarred Sumner
318eb7bcbf Fix num_polls never going down 2024-10-15 18:00:24 -07:00
29 changed files with 372 additions and 199 deletions

View File

@@ -348,11 +348,14 @@ private:
return (us_socket_t *) asyncSocket;
}
/* It is okay to uncork a closed socket and we need to */
((AsyncSocket<SSL> *) s)->uncork();
if (returnedSocket != nullptr) {
/* It is okay to uncork a closed socket, and we need to */
((AsyncSocket<SSL> *) s)->uncork();
return s;
}
/* We cannot return nullptr to the underlying stack in any case */
return s;
// It's okay if this returns nullptr.
return static_cast<us_socket_t*>(nullptr);
});
/* Handle HTTP write out (note: SSL_read may trigger this spuriously, the app need to handle spurious calls) */

View File

@@ -24,6 +24,7 @@
#include "LoopData.h"
#include <libusockets.h>
#include <iostream>
#include "AsyncSocket.h"
extern "C" int bun_is_exiting();
@@ -148,6 +149,10 @@ public:
getLazyLoop().loop = nullptr;
}
static LoopData* data(struct us_loop_t *loop) {
return (LoopData *) us_loop_ext(loop);
}
void addPostHandler(void *key, MoveOnlyFunction<void(Loop *)> &&handler) {
LoopData *loopData = (LoopData *) us_loop_ext((us_loop_t *) this);

View File

@@ -63,6 +63,7 @@ public:
}
delete [] corkBuffer;
}
void* getCorkedSocket() {
return this->corkedSocket;
}

View File

@@ -55,11 +55,11 @@ pub const KeepAlive = struct {
this.status = .inactive;
if (comptime @TypeOf(event_loop_ctx_) == JSC.EventLoopHandle) {
event_loop_ctx_.loop().subActive(1);
event_loop_ctx_.loop().unrefCount(1);
return;
}
const event_loop_ctx = JSC.AbstractVM(event_loop_ctx_);
event_loop_ctx.platformEventLoop().subActive(1);
event_loop_ctx.platformEventLoop().unrefCount(1);
}
/// From another thread, Prevent a poll from keeping the process alive.

View File

@@ -30,6 +30,7 @@ pub const All = struct {
timers: TimerHeap = .{
.context = {},
},
timer_ref: bun.Async.KeepAlive = .{},
active_timer_count: i32 = 0,
uv_timer: if (Environment.isWindows) uv.Timer else void =
if (Environment.isWindows) std.mem.zeroes(uv.Timer) else {},
@@ -109,17 +110,17 @@ pub const All = struct {
this.active_timer_count = new;
if (old <= 0 and new > 0) {
if (comptime Environment.isWindows) {
this.uv_timer.ref();
if (comptime Environment.isPosix) {
if (new > 0) {
this.timer_ref.ref(vm);
} else {
vm.uwsLoop().ref();
this.timer_ref.unref(vm);
}
} else if (old > 0 and new <= 0) {
if (comptime Environment.isWindows) {
} else if (Environment.isWindows) {
if (old <= 0 and new > 0) {
this.uv_timer.ref();
} else if (old > 0 and new <= 0) {
this.uv_timer.unref();
} else {
vm.uwsLoop().unref();
}
}
}

View File

@@ -628,7 +628,7 @@ pub const Listener = struct {
this.listener = .{
// we need to add support for the backlog parameter on listen here we use the default value of nodejs
.namedPipe = WindowsNamedPipeListeningContext.listen(globalObject, pipe_name, 511, ssl, this) catch {
.namedPipe = WindowsNamedPipeListeningContext.listen(globalObject, pipe_name, 511, &ssl.?, this) catch {
exception.* = JSC.toInvalidArguments("Failed to listen at {s}", .{pipe_name}, globalObject).asObjectRef();
this.deinit();
return .zero;
@@ -644,7 +644,7 @@ pub const Listener = struct {
}
}
const ctx_opts: uws.us_bun_socket_context_options_t = if (ssl != null)
JSC.API.ServerConfig.SSLConfig.asUSockets(ssl.?)
JSC.API.ServerConfig.SSLConfig.asUSockets(&ssl.?)
else
.{};
@@ -1057,13 +1057,13 @@ pub const Listener = struct {
}
const vm = globalObject.bunVM();
const socket_config = SocketConfig.fromJS(vm, opts, globalObject, exception) orelse {
var socket_config: SocketConfig = SocketConfig.fromJS(vm, opts, globalObject, exception) orelse {
return .zero;
};
var hostname_or_unix = socket_config.hostname_or_unix;
const port = socket_config.port;
var ssl = socket_config.ssl;
var ssl = if (socket_config.ssl) |*ssl_config| ssl_config else null;
var handlers = socket_config.handlers;
var default_data = socket_config.default_data;
@@ -1178,8 +1178,8 @@ pub const Listener = struct {
}
}
const ctx_opts: uws.us_bun_socket_context_options_t = if (ssl != null)
JSC.API.ServerConfig.SSLConfig.asUSockets(ssl.?)
const ctx_opts: uws.us_bun_socket_context_options_t = if (ssl) |ssl_config|
JSC.API.ServerConfig.SSLConfig.asUSockets(ssl_config)
else
.{};
@@ -3581,10 +3581,14 @@ pub const DuplexUpgradeContext = struct {
fn onError(this: *DuplexUpgradeContext, err_value: JSC.JSValue) void {
if (this.is_open) {
if (this.tls) |tls| {
this.tls = null;
defer tls.deref();
tls.handleError(err_value);
}
} else {
if (this.tls) |tls| {
this.tls = null;
defer tls.deref();
tls.handleConnectError(@intFromEnum(bun.C.SystemErrno.ECONNREFUSED));
}
}
@@ -3602,6 +3606,8 @@ pub const DuplexUpgradeContext = struct {
const socket = TLSSocket.Socket.fromDuplex(&this.upgrade);
if (this.tls) |tls| {
defer tls.deref();
this.tls = null;
tls.onClose(socket, 0, null);
}
@@ -3611,28 +3617,31 @@ pub const DuplexUpgradeContext = struct {
fn runEvent(this: *DuplexUpgradeContext) void {
switch (this.task_event) {
.StartTLS => {
if (this.ssl_config) |config| {
this.upgrade.startTLS(config, true) catch |err| {
switch (err) {
error.OutOfMemory => {
bun.outOfMemory();
},
else => {
const errno = @intFromEnum(bun.C.SystemErrno.ECONNREFUSED);
if (this.tls) |tls| {
const socket = TLSSocket.Socket.fromDuplex(&this.upgrade);
const tls = this.tls.?;
defer tls.deref();
var config = this.ssl_config orelse return;
defer config.deinit();
this.ssl_config = null;
tls.handleConnectError(errno);
tls.onClose(socket, errno, null);
}
},
}
};
this.ssl_config.?.deinit();
this.ssl_config = null;
}
this.upgrade.startTLS(&config, true) catch |err| {
switch (err) {
error.OutOfMemory => {
bun.outOfMemory();
},
else => {
const errno = @intFromEnum(bun.C.SystemErrno.ECONNREFUSED);
tls.handleConnectError(errno);
},
}
};
},
.Close => {
defer {
if (this.tls) |tls| {
tls.deref();
}
}
this.upgrade.close();
},
}
@@ -3640,15 +3649,27 @@ pub const DuplexUpgradeContext = struct {
fn deinitInNextTick(this: *DuplexUpgradeContext) void {
this.task_event = .Close;
if (this.tls) |tls| {
tls.ref();
}
this.vm.enqueueTask(JSC.Task.init(&this.task));
}
fn startTLS(this: *DuplexUpgradeContext) void {
this.task_event = .StartTLS;
if (this.tls) |tls| {
tls.ref();
}
this.vm.enqueueTask(JSC.Task.init(&this.task));
}
fn deinit(this: *DuplexUpgradeContext) void {
if (this.ssl_config) |*config| {
config.deinit();
this.ssl_config = null;
}
if (this.tls) |tls| {
this.tls = null;
tls.deref();
@@ -3699,7 +3720,7 @@ pub const WindowsNamedPipeListeningContext = if (Environment.isWindows) struct {
this.uvPipe.close(onPipeClosed);
}
pub fn listen(globalThis: *JSC.JSGlobalObject, path: []const u8, backlog: i32, ssl_config: ?JSC.API.ServerConfig.SSLConfig, listener: *Listener) !*WindowsNamedPipeListeningContext {
pub fn listen(globalThis: *JSC.JSGlobalObject, path: []const u8, backlog: i32, ssl_config: ?*const JSC.API.ServerConfig.SSLConfig, listener: *Listener) !*WindowsNamedPipeListeningContext {
const this = WindowsNamedPipeListeningContext.new(.{
.globalThis = globalThis,
.vm = globalThis.bunVM(),
@@ -3755,7 +3776,7 @@ pub const WindowsNamedPipeListeningContext = if (Environment.isWindows) struct {
fn deinitInNextTick(this: *WindowsNamedPipeListeningContext) void {
bun.assert(this.task_event != .deinit);
this.task_event = .deinit;
this.vm.enqueueTask(JSC.Task.init(&this.task));
this.vm.enqueueImmediateTask(JSC.Task.init(&this.task));
}
fn deinit(this: *WindowsNamedPipeListeningContext) void {
@@ -3935,7 +3956,7 @@ pub const WindowsNamedPipeContext = if (Environment.isWindows) struct {
fn deinitInNextTick(this: *WindowsNamedPipeContext) void {
bun.assert(this.task_event != .deinit);
this.task_event = .deinit;
this.vm.enqueueTask(JSC.Task.init(&this.task));
this.vm.enqueueImmediateTask(JSC.Task.init(&this.task));
}
fn create(globalThis: *JSC.JSGlobalObject, socket: SocketType) *WindowsNamedPipeContext {
@@ -3975,7 +3996,7 @@ pub const WindowsNamedPipeContext = if (Environment.isWindows) struct {
return this;
}
pub fn open(globalThis: *JSC.JSGlobalObject, fd: bun.FileDescriptor, ssl_config: ?JSC.API.ServerConfig.SSLConfig, socket: SocketType) !*uws.WindowsNamedPipe {
pub fn open(globalThis: *JSC.JSGlobalObject, fd: bun.FileDescriptor, ssl_config: ?*const JSC.API.ServerConfig.SSLConfig, socket: SocketType) !*uws.WindowsNamedPipe {
// TODO: reuse the same context for multiple connections when possibles
const this = WindowsNamedPipeContext.create(globalThis, socket);
@@ -3996,7 +4017,7 @@ pub const WindowsNamedPipeContext = if (Environment.isWindows) struct {
return &this.named_pipe;
}
pub fn connect(globalThis: *JSC.JSGlobalObject, path: []const u8, ssl_config: ?JSC.API.ServerConfig.SSLConfig, socket: SocketType) !*uws.WindowsNamedPipe {
pub fn connect(globalThis: *JSC.JSGlobalObject, path: []const u8, ssl_config: ?*const JSC.API.ServerConfig.SSLConfig, socket: SocketType) !*uws.WindowsNamedPipe {
// TODO: reuse the same context for multiple connections when possibles
const this = WindowsNamedPipeContext.create(globalThis, socket);

View File

@@ -93,7 +93,7 @@ pub fn SSLWrapper(comptime T: type) type {
};
}
pub fn init(ssl_options: JSC.API.ServerConfig.SSLConfig, is_client: bool, handlers: Handlers) !This {
pub fn init(ssl_options: *const JSC.API.ServerConfig.SSLConfig, is_client: bool, handlers: Handlers) !This {
BoringSSL.load();
const ctx_opts: uws.us_bun_socket_context_options_t = JSC.API.ServerConfig.SSLConfig.asUSockets(ssl_options);

View File

@@ -584,7 +584,7 @@ pub const ServerConfig = struct {
const log = Output.scoped(.SSLConfig, false);
pub fn asUSockets(this: SSLConfig) uws.us_bun_socket_context_options_t {
pub fn asUSockets(this: *const SSLConfig) uws.us_bun_socket_context_options_t {
var ctx_opts: uws.us_bun_socket_context_options_t = .{};
if (this.key_file_name != null)
@@ -1953,7 +1953,11 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
fn drainMicrotasks(this: *const RequestContext) void {
if (this.isAsync()) return;
if (this.server) |server| server.vm.drainMicrotasks();
if (this.server) |server| {
if (Environment.isDebug) server.vm.eventLoop().debug.enter();
defer if (Environment.isDebug) server.vm.eventLoop().debug.exit();
server.vm.drainMicrotasks();
}
}
pub fn setAbortHandler(this: *RequestContext) void {
@@ -2400,7 +2404,9 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
var any_js_calls = false;
var vm = this.server.?.vm;
const globalThis = this.server.?.globalThis;
if (comptime Environment.isDebug) vm.eventLoop().debug.enter();
defer {
defer if (comptime Environment.isDebug) vm.eventLoop().debug.exit();
// This is a task in the event loop.
// If we called into JavaScript, we must drain the microtask queue
if (any_js_calls) {
@@ -2424,9 +2430,11 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
this.detachResponse();
var any_js_calls = false;
var vm = this.server.?.vm;
var vm: *JSC.VirtualMachine = this.server.?.vm;
const globalThis = this.server.?.globalThis;
if (comptime Environment.isDebug) vm.eventLoop().debug.enter();
defer {
defer if (comptime Environment.isDebug) vm.eventLoop().debug.exit();
// This is a task in the event loop.
// If we called into JavaScript, we must drain the microtask queue
if (any_js_calls) {
@@ -2993,6 +3001,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
// it returns a Promise when it goes through ReadableStreamDefaultReader
if (assignment_result.asAnyPromise()) |promise| {
streamLog("returned a promise", .{});
if (comptime Environment.isDebug) JSC.VirtualMachine.get().eventLoop().debug.enter();
defer if (comptime Environment.isDebug) JSC.VirtualMachine.get().eventLoop().debug.exit();
this.drainMicrotasks();
switch (promise.status(globalThis.vm())) {
@@ -6547,7 +6557,7 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp
},
.tracker = JSC.AsyncTaskTracker.init(vm),
});
event_loop.enqueueTask(JSC.Task.init(task));
event_loop.enqueueImmediateTask(JSC.Task.init(task));
}
if (this.pending_requests == 0 and this.listener == null and this.flags.has_js_deinited and !this.hasActiveWebSockets()) {
if (this.config.websocket) |*ws| {
@@ -6602,7 +6612,7 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp
const task = bun.default_allocator.create(JSC.AnyTask) catch unreachable;
task.* = JSC.AnyTask.New(ThisServer, deinit).init(this);
this.vm.enqueueTask(JSC.Task.init(task));
this.vm.enqueueImmediateTask(JSC.Task.init(task));
}
pub fn deinit(this: *ThisServer) void {
@@ -7007,6 +7017,14 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp
return;
}
if (ctx.resp != null) {
if (resp.isClosed()) {
ctx.resp = null;
ctx.deref();
return;
}
}
if (ctx.shouldRenderMissing()) {
ctx.renderMissing();
return;

View File

@@ -4252,6 +4252,10 @@ GlobalObject::PromiseFunctions GlobalObject::promiseHandlerID(Zig::FFIFunction h
return GlobalObject::PromiseFunctions::Bun__onResolveEntryPointResult;
} else if (handler == Bun__onRejectEntryPointResult) {
return GlobalObject::PromiseFunctions::Bun__onRejectEntryPointResult;
} else if (handler == Bun__Expect__onReject) {
return GlobalObject::PromiseFunctions::Bun__Expect__onReject;
} else if (handler == Bun__Expect__onResolve) {
return GlobalObject::PromiseFunctions::Bun__Expect__onResolve;
} else {
RELEASE_ASSERT_NOT_REACHED();
}

View File

@@ -354,8 +354,10 @@ public:
Bun__BodyValueBufferer__onResolveStream,
Bun__onResolveEntryPointResult,
Bun__onRejectEntryPointResult,
Bun__Expect__onReject,
Bun__Expect__onResolve,
};
static constexpr size_t promiseFunctionsSize = 24;
static constexpr size_t promiseFunctionsSize = 26;
static PromiseFunctions promiseHandlerID(SYSV_ABI EncodedJSValue (*handler)(JSC__JSGlobalObject* arg0, JSC__CallFrame* arg1));

View File

@@ -2826,6 +2826,9 @@ pub const AnyPromise = union(enum) {
inline else => |promise| promise.isHandled(vm),
};
}
pub fn then(this: AnyPromise, globalThis: *JSGlobalObject, ctx: JSValue, onFulfilled: JSNativeFn, onRejected: JSNativeFn) void {
this.asValue(globalThis)._then(globalThis, ctx, onFulfilled, onRejected);
}
pub fn setHandled(this: AnyPromise, vm: *VM) void {
switch (this) {
inline else => |promise| promise.setHandled(vm),

View File

@@ -852,5 +852,7 @@ CPP_DECL bool JSC__CustomGetterSetter__isSetterNull(JSC__CustomGetterSetter *arg
BUN_DECLARE_HOST_FUNCTION(Bun__onResolveEntryPointResult);
BUN_DECLARE_HOST_FUNCTION(Bun__onRejectEntryPointResult);
BUN_DECLARE_HOST_FUNCTION(Bun__Expect__onReject);
BUN_DECLARE_HOST_FUNCTION(Bun__Expect__onResolve);
#endif

View File

@@ -375,10 +375,9 @@ static inline JSC::EncodedJSValue jsTextEncoderPrototypeFunction_encodeBody(JSC:
{
auto& vm = JSC::getVM(lexicalGlobalObject);
auto throwScope = DECLARE_THROW_SCOPE(vm);
UNUSED_PARAM(throwScope);
UNUSED_PARAM(callFrame);
EnsureStillAliveScope argument0 = callFrame->argument(0);
JSC::JSString* input = argument0.value().toStringOrNull(lexicalGlobalObject);
JSC::JSString* input = argument0.value().toString(lexicalGlobalObject);
RETURN_IF_EXCEPTION(throwScope, {});
JSC::EncodedJSValue res;
String str;
if (input->is8Bit()) {
@@ -397,11 +396,6 @@ static inline JSC::EncodedJSValue jsTextEncoderPrototypeFunction_encodeBody(JSC:
res = TextEncoder__encode16(lexicalGlobalObject, str.span16().data(), str.length());
}
if (UNLIKELY(JSC::JSValue::decode(res).isObject() && JSC::JSValue::decode(res).getObject()->isErrorInstance())) {
throwScope.throwException(lexicalGlobalObject, JSC::JSValue::decode(res));
return {};
}
RELEASE_AND_RETURN(throwScope, res);
}

View File

@@ -821,7 +821,10 @@ pub const EventLoop = struct {
defer this.debug.exit();
if (count == 1) {
this.drainMicrotasksWithGlobal(this.global, this.virtual_machine.jsc);
const vm = this.virtual_machine;
const global = this.global;
const jsc = vm.jsc;
this.drainTasksWithoutRejection(vm, global, jsc);
}
this.entered_event_loop_count -= 1;
@@ -1479,6 +1482,32 @@ pub const EventLoop = struct {
this.virtual_machine.gc_controller.processGCTimer();
}
pub fn drainTasksWithoutRejection(this: *EventLoop, ctx: *JSC.VirtualMachine, global: *JSC.JSGlobalObject, js_vm: *JSC.VM) void {
while (true) {
while (this.tickWithCount(ctx) > 0) {
this.tickConcurrent();
} else {
this.drainMicrotasksWithGlobal(global, js_vm);
this.tickConcurrent();
if (this.tasks.count > 0) continue;
}
break;
}
}
pub fn drainTasks(this: *EventLoop, ctx: *JSC.VirtualMachine, global: *JSC.JSGlobalObject, js_vm: *JSC.VM) void {
while (true) {
while (this.tickWithCount(ctx) > 0) : (global.handleRejectedPromises()) {
this.tickConcurrent();
} else {
this.drainMicrotasksWithGlobal(global, js_vm);
this.tickConcurrent();
if (this.tasks.count > 0) continue;
}
break;
}
}
pub fn tick(this: *EventLoop) void {
JSC.markBinding(@src());
{
@@ -1496,16 +1525,7 @@ pub const EventLoop = struct {
const global = ctx.global;
const global_vm = ctx.jsc;
while (true) {
while (this.tickWithCount(ctx) > 0) : (this.global.handleRejectedPromises()) {
this.tickConcurrent();
} else {
this.drainMicrotasksWithGlobal(global, global_vm);
this.tickConcurrent();
if (this.tasks.count > 0) continue;
}
break;
}
this.drainTasks(ctx, global, global_vm);
while (this.tickWithCount(ctx) > 0) {
this.tickConcurrent();

View File

@@ -1083,7 +1083,7 @@ pub const VirtualMachine = struct {
onUnhandledRejection: *const OnUnhandledRejection = undefined,
count: usize = 0,
pub fn apply(this: *UnhandledRejectionScope, vm: *JSC.VirtualMachine) void {
pub fn apply(this: *const UnhandledRejectionScope, vm: *JSC.VirtualMachine) void {
vm.onUnhandledRejection = this.onUnhandledRejection;
vm.onUnhandledRejectionCtx = this.ctx;
vm.unhandled_error_counter = this.count;
@@ -1570,10 +1570,6 @@ pub const VirtualMachine = struct {
this.eventLoop().waitForPromise(promise);
}
pub fn waitForTasks(this: *VirtualMachine) void {
this.eventLoop().waitForTasks();
}
pub const MacroMap = std.AutoArrayHashMap(i32, js.JSObjectRef);
pub fn enableMacroMode(this: *VirtualMachine) void {
@@ -2562,7 +2558,7 @@ pub const VirtualMachine = struct {
pub const main_file_name: string = "bun:main";
pub fn drainMicrotasks(this: *VirtualMachine) void {
this.eventLoop().drainMicrotasks();
this.eventLoop().drainTasksWithoutRejection(this, this.global, this.jsc);
}
pub fn processFetchLog(globalThis: *JSGlobalObject, specifier: bun.String, referrer: bun.String, log: *logger.Log, ret: *ErrorableResolvedSource, err: anyerror) void {

View File

@@ -62,6 +62,11 @@ pub const Expect = struct {
flags: Flags = .{},
parent: ParentScope = .{ .global = {} },
custom_label: bun.String = bun.String.empty,
promise_status: enum {
none,
fulfilled,
rejected,
} = .none,
pub const TestScope = struct {
test_id: TestRunner.Test.ID,
@@ -204,22 +209,63 @@ pub const Expect = struct {
const matcher_params = switch (Output.enable_ansi_colors) {
inline else => |colors| comptime Output.prettyFmt(matcher_params_fmt, colors),
};
return processPromise(this.custom_label, this.flags, globalThis, value, matcher_name, matcher_params, false);
return processPromise(this.custom_label, thisValue, this.flags, globalThis, value, matcher_name, matcher_params, false);
}
export fn Bun__Expect__onReject(globalThis: *JSGlobalObject, callframe: *CallFrame) callconv(JSC.conv) JSValue {
const arguments = callframe.argumentsUndef(2).all();
const value = arguments[1];
const this: *Expect = Expect.fromJS(value) orelse {
return .undefined;
};
this.promise_status = .rejected;
Expect.capturedValueSetCached(value, globalThis, arguments[0]);
return arguments[0];
}
export fn Bun__Expect__onResolve(globalThis: *JSGlobalObject, callframe: *CallFrame) callconv(JSC.conv) JSValue {
const arguments = callframe.argumentsUndef(2).all();
const value = arguments[1];
const this: *Expect = Expect.fromJS(value) orelse {
return .undefined;
};
this.promise_status = .fulfilled;
Expect.capturedValueSetCached(value, globalThis, arguments[0]);
return arguments[0];
}
/// Processes the async flags (resolves/rejects), waiting for the async value if needed.
/// If no flags, returns the original value
/// If either flag is set, waits for the result, and returns either it as a JSValue, or null if the expectation failed (in which case if silent is false, also throws a js exception)
pub fn processPromise(custom_label: bun.String, flags: Expect.Flags, globalThis: *JSGlobalObject, value: JSValue, matcher_name: anytype, matcher_params: anytype, comptime silent: bool) ?JSValue {
pub fn processPromise(custom_label: bun.String, thisValue: JSValue, flags: Expect.Flags, globalThis: *JSGlobalObject, value: JSValue, matcher_name: anytype, matcher_params: anytype, comptime silent: bool) ?JSValue {
switch (flags.promise) {
inline .resolves, .rejects => |resolution| {
if (value.asAnyPromise()) |promise| {
const vm = globalThis.vm();
promise.setHandled(vm);
globalThis.bunVM().waitForPromise(promise);
const jsc_vm = globalThis.bunVM();
var strong = JSC.Strong{};
defer strong.deinit();
if (promise.status(vm) == .pending) {
strong = JSC.Strong.create(promise.asValue(globalThis), globalThis);
promise.then(globalThis, thisValue, &Bun__Expect__onResolve, &Bun__Expect__onReject);
const prev_rejection_scope = jsc_vm.unhandledRejectionScope();
defer prev_rejection_scope.apply(jsc_vm);
var new_rejection_scope = prev_rejection_scope;
new_rejection_scope.onUnhandledRejection = JSC.VirtualMachine.onQuietUnhandledRejectionHandler;
new_rejection_scope.apply(jsc_vm);
jsc_vm.waitForPromise(promise);
}
const newValue = promise.result(vm);
switch (promise.status(vm)) {
.fulfilled => switch (resolution) {
.resolves => {},
@@ -248,8 +294,7 @@ pub const Expect = struct {
.pending => unreachable,
}
newValue.ensureStillAlive();
return newValue;
return promise.result(vm);
} else {
if (!silent) {
var formatter = JSC.ConsoleObject.Formatter{ .globalThis = globalThis, .quote_strings = true };
@@ -315,7 +360,7 @@ pub const Expect = struct {
outFlags.* = flags.encode();
// (note that matcher_name/matcher_args are not used because silent=true)
if (processPromise(bun.String.empty, flags, globalThis, value.*, "", "", true)) |result| {
if (processPromise(bun.String.empty, .undefined, flags, globalThis, value.*, "", "", true)) |result| {
value.* = result;
return true;
}
@@ -2330,9 +2375,6 @@ pub const Expect = struct {
var vm = globalThis.bunVM();
var return_value: JSValue = .zero;
// Drain existing unhandled rejections
vm.global.handleRejectedPromises();
var scope = vm.unhandledRejectionScope();
const prev_unhandled_pending_rejection_to_capture = vm.unhandled_pending_rejection_to_capture;
vm.unhandled_pending_rejection_to_capture = &return_value;
@@ -4673,9 +4715,13 @@ pub const Expect = struct {
// support for async matcher results
if (result.asAnyPromise()) |promise| {
const vm = globalThis.vm();
var strong = JSC.Strong{};
defer strong.deinit();
promise.setHandled(vm);
globalThis.bunVM().waitForPromise(promise);
if (promise.status(vm) == .pending) {
strong = JSC.Strong.create(promise.asValue(globalThis), globalThis);
globalThis.bunVM().waitForPromise(promise);
}
result = promise.result(vm);
result.ensureStillAlive();
@@ -4805,7 +4851,7 @@ pub const Expect = struct {
globalThis.throw("Internal consistency error: failed to retrieve the captured value", .{});
return .zero;
};
value = Expect.processPromise(expect.custom_label, expect.flags, globalThis, value, matcher_name, matcher_params, false) orelse return .zero;
value = Expect.processPromise(expect.custom_label, thisValue, expect.flags, globalThis, value, matcher_name, matcher_params, false) orelse return .zero;
value.ensureStillAlive();
incrementExpectCallCounter();

View File

@@ -59,13 +59,14 @@ pub const TextEncoder = struct {
const uint8array = JSC.JSValue.createUninitializedUint8Array(globalThis, result.written);
bun.assert(result.written <= buf.len);
bun.assert(result.read == slice.len);
const array_buffer = uint8array.asArrayBuffer(globalThis).?;
const array_buffer = uint8array.asArrayBuffer(globalThis) orelse return .zero;
bun.assert(result.written == array_buffer.len);
@memcpy(array_buffer.byteSlice()[0..result.written], buf[0..result.written]);
return uint8array;
} else {
const bytes = strings.allocateLatin1IntoUTF8(globalThis.bunVM().allocator, []const u8, slice) catch {
return JSC.toInvalidArguments("Out of memory", .{}, globalThis);
globalThis.throwOutOfMemory();
return .zero;
};
bun.assert(bytes.len >= slice.len);
return ArrayBuffer.fromBytes(bytes, .Uint8Array).toJSUnchecked(globalThis, null);
@@ -112,7 +113,8 @@ pub const TextEncoder = struct {
@TypeOf(slice),
slice,
) catch {
return JSC.toInvalidArguments("Out of memory", .{}, globalThis);
globalThis.throwOutOfMemory();
return .zero;
};
return ArrayBuffer.fromBytes(bytes, .Uint8Array).toJSUnchecked(globalThis, null);
}

View File

@@ -119,6 +119,18 @@ extern "C"
}
}
extern "C" void uws_res_clear_corked_socket(us_loop_t *loop) {
uWS::LoopData *loopData = uWS::Loop::data(loop);
void *corkedSocket = loopData->getCorkedSocket();
if (corkedSocket) {
if (loopData->isCorkedSSL()) {
((uWS::AsyncSocket<true> *) corkedSocket)->uncork();
} else {
((uWS::AsyncSocket<false> *) corkedSocket)->uncork();
}
}
}
void uws_app_delete(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data)
{
if (ssl)
@@ -1609,6 +1621,10 @@ size_t uws_req_get_header(uws_req_t *res, const char *lower_case_header,
}
}
int uws_res_is_closed(int ssl, uws_res_r res) {
return us_socket_is_closed(ssl, (us_socket_t *)res);
}
__attribute__((callback (corker, ctx)))
void uws_res_cork(int ssl, uws_res_r res, void *ctx,
void (*corker)(void *ctx)) nonnull_fn_decl;

View File

@@ -414,7 +414,7 @@ pub const UpgradedDuplex = struct {
return array;
}
pub fn startTLS(this: *UpgradedDuplex, ssl_options: JSC.API.ServerConfig.SSLConfig, is_client: bool) !void {
pub fn startTLS(this: *UpgradedDuplex, ssl_options: *const JSC.API.ServerConfig.SSLConfig, is_client: bool) !void {
this.wrapper = try WrapperType.init(ssl_options, is_client, .{
.ctx = this,
.onOpen = UpgradedDuplex.onOpen,
@@ -525,25 +525,26 @@ pub const UpgradedDuplex = struct {
}
this.origin.deinit();
if (this.onDataCallback.get()) |callback| {
if (this.onDataCallback.trySwap()) |callback| {
JSC.setFunctionData(callback, null);
this.onDataCallback.deinit();
}
if (this.onEndCallback.get()) |callback| {
if (this.onEndCallback.trySwap()) |callback| {
JSC.setFunctionData(callback, null);
this.onEndCallback.deinit();
}
if (this.onWritableCallback.get()) |callback| {
if (this.onWritableCallback.trySwap()) |callback| {
JSC.setFunctionData(callback, null);
this.onWritableCallback.deinit();
}
if (this.onCloseCallback.get()) |callback| {
if (this.onCloseCallback.trySwap()) |callback| {
JSC.setFunctionData(callback, null);
this.onCloseCallback.deinit();
}
var ssl_error = this.ssl_error;
ssl_error.deinit();
this.ssl_error = .{};
this.onDataCallback.deinit();
this.onEndCallback.deinit();
this.onWritableCallback.deinit();
this.onCloseCallback.deinit();
}
};
@@ -856,7 +857,7 @@ pub const WindowsNamedPipe = if (Environment.isWindows) struct {
}
return .{ .result = {} };
}
pub fn open(this: *WindowsNamedPipe, fd: bun.FileDescriptor, ssl_options: ?JSC.API.ServerConfig.SSLConfig) JSC.Maybe(void) {
pub fn open(this: *WindowsNamedPipe, fd: bun.FileDescriptor, ssl_options: ?*const JSC.API.ServerConfig.SSLConfig) JSC.Maybe(void) {
bun.assert(this.pipe != null);
this.flags.disconnected = true;
@@ -892,7 +893,7 @@ pub const WindowsNamedPipe = if (Environment.isWindows) struct {
return .{ .result = {} };
}
pub fn connect(this: *WindowsNamedPipe, path: []const u8, ssl_options: ?JSC.API.ServerConfig.SSLConfig) JSC.Maybe(void) {
pub fn connect(this: *WindowsNamedPipe, path: []const u8, ssl_options: ?*const JSC.API.ServerConfig.SSLConfig) JSC.Maybe(void) {
bun.assert(this.pipe != null);
this.flags.disconnected = true;
// ref because we are connecting
@@ -924,7 +925,7 @@ pub const WindowsNamedPipe = if (Environment.isWindows) struct {
this.connect_req.data = this;
return this.pipe.?.connect(&this.connect_req, path, this, onConnect);
}
pub fn startTLS(this: *WindowsNamedPipe, ssl_options: JSC.API.ServerConfig.SSLConfig, is_client: bool) !void {
pub fn startTLS(this: *WindowsNamedPipe, ssl_options: *const JSC.API.ServerConfig.SSLConfig, is_client: bool) !void {
this.flags.is_ssl = true;
if (this.start(is_client)) {
this.wrapper = try WrapperType.init(ssl_options, is_client, .{
@@ -2372,6 +2373,10 @@ pub const PosixLoop = extern struct {
const log = bun.Output.scoped(.Loop, false);
pub fn uncork(this: *PosixLoop) void {
uws_res_clear_corked_socket(this);
}
pub fn iterationNumber(this: *const PosixLoop) u64 {
return this.internal_loop_data.iteration_nr;
}
@@ -3483,6 +3488,10 @@ pub fn NewApp(comptime ssl: bool) type {
uws_res_end(ssl_flag, res.downcast(), data.ptr, data.len, close_connection);
}
pub fn isClosed(res: *Response) bool {
return uws_res_is_closed(ssl_flag, res.downcast()) != 0;
}
pub fn tryEnd(res: *Response, data: []const u8, total: usize, close_: bool) bool {
return uws_res_try_end(ssl_flag, res.downcast(), data.ptr, data.len, total, close_);
}
@@ -4105,6 +4114,10 @@ pub const WindowsLoop = extern struct {
pre: *uv.uv_prepare_t,
check: *uv.uv_check_t,
pub fn uncork(this: *PosixLoop) void {
uws_res_clear_corked_socket(this);
}
pub fn get() *WindowsLoop {
return uws_get_loop_with_native(bun.windows.libuv.Loop.get());
}
@@ -4420,3 +4433,5 @@ pub fn onThreadExit() void {
}
extern fn uws_app_clear_routes(ssl_flag: c_int, app: *uws_app_t) void;
extern fn uws_res_clear_corked_socket(loop: *Loop) void;
extern fn uws_res_is_closed(ssl_flag: c_int, res: *anyopaque) i32;

View File

@@ -414,14 +414,14 @@ const ProxyTunnel = struct {
}
}
fn start(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket, ssl_options: JSC.API.ServerConfig.SSLConfig) void {
fn start(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket, ssl_options: *const JSC.API.ServerConfig.SSLConfig) void {
const proxy_tunnel = ProxyTunnel.new(.{});
var custom_options = ssl_options;
var custom_options = ssl_options.*;
// we always request the cert so we can verify it and also we manually abort the connection if the hostname doesn't match
custom_options.reject_unauthorized = 0;
custom_options.request_cert = 1;
proxy_tunnel.wrapper = SSLWrapper(*HTTPClient).init(custom_options, true, .{
proxy_tunnel.wrapper = SSLWrapper(*HTTPClient).init(&custom_options, true, .{
.onOpen = ProxyTunnel.onOpen,
.onData = ProxyTunnel.onData,
.onHandshake = ProxyTunnel.onHandshake,
@@ -1007,7 +1007,7 @@ pub const HTTPThread = struct {
queued_tasks: Queue = Queue{},
queued_shutdowns: std.ArrayListUnmanaged(ShutdownMessage) = std.ArrayListUnmanaged(ShutdownMessage){},
queued_shutdowns: ShutdownMessage.List = ShutdownMessage.List{},
queued_shutdowns_lock: bun.Lock = .{},
queued_proxy_deref: std.ArrayListUnmanaged(*ProxyTunnel) = std.ArrayListUnmanaged(*ProxyTunnel){},
@@ -1022,6 +1022,8 @@ pub const HTTPThread = struct {
const ShutdownMessage = struct {
async_http_id: u32,
is_tls: bool,
pub const List = std.ArrayListUnmanaged(ShutdownMessage);
};
pub const LibdeflateState = struct {
@@ -1188,10 +1190,19 @@ pub const HTTPThread = struct {
}
fn drainEvents(this: *@This()) void {
{
this.queued_shutdowns_lock.lock();
defer this.queued_shutdowns_lock.unlock();
for (this.queued_shutdowns.items) |http| {
while (true) {
// We need to make sure we don't hold the lock while we call close because
// close calls the socket close callback, which reads the list of queued shutdowns
// Meaning we can end up in a deadlock if we don't swap the list and release the lock
var queued_shutdowns = ShutdownMessage.List{};
{
this.queued_shutdowns_lock.lock();
defer this.queued_shutdowns_lock.unlock();
queued_shutdowns = this.queued_shutdowns;
this.queued_shutdowns = .{};
}
for (queued_shutdowns.items) |http| {
if (socket_async_http_abort_tracker.fetchSwapRemove(http.async_http_id)) |socket_ptr| {
if (http.is_tls) {
const socket = uws.SocketTLS.fromAny(socket_ptr.value);
@@ -1203,7 +1214,13 @@ pub const HTTPThread = struct {
}
}
}
this.queued_shutdowns.clearRetainingCapacity();
// If we had any queued shutdowns, let's loop back around and try again
if (queued_shutdowns.items.len > 0) {
queued_shutdowns.deinit(bun.default_allocator);
continue;
}
break;
}
while (this.queued_proxy_deref.popOrNull()) |http| {
@@ -3198,7 +3215,7 @@ pub fn closeAndFail(this: *HTTPClient, err: anyerror, comptime is_ssl: bool, soc
fn startProxyHandshake(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void {
// if we have options we pass them (ca, reject_unauthorized, etc) otherwise use the default
const ssl_options = if (this.tls_props != null) this.tls_props.?.* else JSC.API.ServerConfig.SSLConfig.zero;
const ssl_options = if (this.tls_props != null) this.tls_props.? else &JSC.API.ServerConfig.SSLConfig.zero;
ProxyTunnel.start(this, is_ssl, socket, ssl_options);
}

View File

@@ -2920,13 +2920,18 @@ pub const PostgresSQLConnection = struct {
pub fn onDrain(this: *PostgresSQLConnection) void {
var vm = this.globalObject.bunVM();
defer vm.drainMicrotasks();
const event_loop = vm.eventLoop();
event_loop.enter();
defer event_loop.exit();
this.flushData();
}
pub fn onData(this: *PostgresSQLConnection, data: []const u8) void {
var vm = this.globalObject.bunVM();
defer vm.drainMicrotasks();
const event_loop = vm.eventLoop();
event_loop.enter();
defer event_loop.exit();
if (this.read_buffer.remaining().len == 0) {
var consumed: usize = 0;
var offset: usize = 0;

View File

@@ -346,7 +346,7 @@ describe("bun test", () => {
await expect(sleep(1)).resolves.toBeUndefined();
});
test("timeout", async () => {
await expect(sleep(64)).resolves.toBeUndefined();
await expect(await sleep(64)).resolves.toBeUndefined();
});
`,
});

View File

@@ -149,7 +149,7 @@ describe("Server", () => {
port: 0,
});
const response = await fetch(`http://${server.hostname}:${server.port}`);
const response = await fetch(server.url);
expect(await response.text()).toBe("Hello");
});
@@ -199,7 +199,7 @@ describe("Server", () => {
});
try {
await fetch(`http://${server.hostname}:${server.port}`, { signal: abortController.signal });
await fetch(server.url, { signal: abortController.signal });
} catch (err: any) {
expect(err).toBeDefined();
expect(err?.name).toBe("AbortError");
@@ -229,7 +229,7 @@ describe("Server", () => {
});
try {
await fetch(`http://${server.hostname}:${server.port}`, { signal: abortController.signal });
await fetch(server.url, { signal: abortController.signal });
} catch {
fetchAborted = true;
}
@@ -255,6 +255,7 @@ describe("Server", () => {
type: "direct",
async pull(controller) {
abortController.abort();
await Bun.sleep(0);
const buffer = await Bun.file(import.meta.dir + "/fixture.html.gz").arrayBuffer();
controller.write(buffer);
@@ -278,7 +279,7 @@ describe("Server", () => {
});
try {
await fetch(`http://${server.hostname}:${server.port}`, { signal: abortController.signal });
await fetch(server.url, { signal: abortController.signal }).then(r => r.text());
} catch {}
await Bun.sleep(10);
expect(signalOnServer).toBe(true);
@@ -331,6 +332,7 @@ describe("Server", () => {
new ReadableStream({
async pull(controller) {
abortController.abort();
await Bun.sleep(0);
const buffer = await Bun.file(import.meta.dir + "/fixture.html.gz").arrayBuffer();
controller.enqueue(buffer);
@@ -353,8 +355,11 @@ describe("Server", () => {
});
try {
await fetch(`http://${server.hostname}:${server.port}`, { signal: abortController.signal });
} catch {}
await fetch(server.url, { signal: abortController.signal }).then(r => r.text());
expect.unreachable();
} catch (err: any) {
expect(err.message).toContain("The operation was aborted");
}
await Bun.sleep(10);
expect(signalOnServer).toBe(true);
}
@@ -551,7 +556,6 @@ test("should be able to await server.stop()", async () => {
});
test("should be able to await server.stop(true) with keep alive", async () => {
const { promise, resolve } = Promise.withResolvers();
const ready = Promise.withResolvers();
const received = Promise.withResolvers();
using server = Bun.serve({
@@ -565,8 +569,11 @@ test("should be able to await server.stop(true) with keep alive", async () => {
},
});
// Start the request
const responsePromise = fetch(server.url);
var didRequestFail = false;
// Send a request
const responsePromise = fetch(server.url)
.then(r => r.text())
.catch(() => (didRequestFail = true));
// Wait for the server to receive it.
await received.promise;
// Stop listening for new connections
@@ -574,13 +581,16 @@ test("should be able to await server.stop(true) with keep alive", async () => {
// Continue the request
ready.resolve();
if (didRequestFail) {
throw new Error("Request failed too early.");
}
// Wait for the server to stop
await stopped;
// It should fail before the server responds
expect(async () => {
await (await responsePromise).text();
}).toThrow();
await responsePromise;
expect(didRequestFail).toBe(true);
// Ensure the server is completely stopped
expect(async () => await fetch(server.url)).toThrow();

View File

@@ -499,27 +499,22 @@ describe("streaming", () => {
});
it("throw on pull after writing should not call the error handler", async () => {
let subprocess;
afterAll(() => {
subprocess?.kill();
});
const onMessage = mock(async href => {
var onMessage = mock(async href => {
const url = new URL("write", href);
const response = await fetch(url);
expect(response.status).toBe(402);
expect(response.headers.get("X-Hey")).toBe("123");
expect(response.text()).resolves.toBe("");
expect(await response.text()).toBe("");
subprocess.kill();
});
subprocess = Bun.spawn({
await using subprocess = Bun.spawn({
cwd: import.meta.dirname,
cmd: [bunExe(), "readable-stream-throws.fixture.js"],
env: bunEnv,
stdout: "ignore",
stderr: "pipe",
stdin: "inherit",
ipc: onMessage,
});
@@ -1956,7 +1951,7 @@ it("we should always send date", async () => {
it("should allow use of custom timeout", async () => {
using server = Bun.serve({
port: 0,
idleTimeout: 8, // uws precision is in seconds, and lower than 4 seconds is not reliable its timer is not that accurate
idleTimeout: 7, // uws precision is in seconds, and lower than 4 seconds is not reliable its timer is not that accurate
async fetch(req) {
const url = new URL(req.url);
return new Response(
@@ -1978,12 +1973,21 @@ it("should allow use of custom timeout", async () => {
},
});
async function testTimeout(pathname: string, success: boolean) {
const res = await fetch(new URL(pathname, server.url.origin));
expect(res.status).toBe(200);
const promise = fetch(new URL(pathname, server.url.origin)).then(r => {
expect(r.status).toBe(200);
return r.text();
});
if (success) {
expect(res.text()).resolves.toBe("Hello, World!");
return expect(await promise).toBe("Hello, World!");
} else {
expect(res.text()).rejects.toThrow(/The socket connection was closed unexpectedly./);
return promise
.then(() => {
throw new Error("Expected timeout");
})
.catch(err => {
expect(err.message).toContain("The socket connection was closed unexpectedly.");
});
}
}
await Promise.all([testTimeout("/ok", true), testTimeout("/timeout", false)]);
@@ -2071,5 +2075,5 @@ it("allow custom timeout per request", async () => {
expect(server.timeout).toBeFunction();
const res = await fetch(new URL("/long-timeout", server.url.origin));
expect(res.status).toBe(200);
expect(res.text()).resolves.toBe("Hello, World!");
expect(await res.text()).toBe("Hello, World!");
}, 20_000);

View File

@@ -15,6 +15,8 @@ $.cwd(process.cwd());
$.nothrow();
const DEFAULT_THRESHOLD = process.platform === "darwin" ? 100 * (1 << 20) : 150 * (1 << 20);
const aaa128kb = Buffer.alloc(128 * 1024, "a").toString();
const aaa2kb = Buffer.alloc(2 * 1024, "a").toString();
const TESTS: [name: string, builder: () => TestBuilder, runs?: number][] = [
["redirect_file", () => TestBuilder.command`echo hello > test.txt`.fileEquals("test.txt", "hello\n")],
@@ -143,37 +145,17 @@ describe("fd leak", () => {
memLeakTest("Buffer", () => TestBuilder.command`cat ${import.meta.filename} > ${Buffer.alloc(1 << 20)}`, 100);
memLeakTest(
"Blob_something",
() =>
TestBuilder.command`cat < ${new Blob([
Array(128 * 1024)
.fill("a")
.join(""),
])}`.stdout(str =>
expect(str).toEqual(
Array(128 * 1024)
.fill("a")
.join(""),
),
),
100,
);
memLeakTest(
"Blob_nothing",
() =>
TestBuilder.command`echo hi < ${new Blob([
Array(128 * 1024)
.fill("a")
.join(""),
])}`.stdout("hi\n"),
() => TestBuilder.command`cat < ${new Blob([aaa128kb])}`.stdout(str => expect(str).toEqual(aaa128kb)),
100,
);
memLeakTest("Blob_nothing", () => TestBuilder.command`echo hi < ${new Blob([aaa128kb])}`.stdout("hi\n"), 100);
memLeakTest("String", () => TestBuilder.command`echo ${Array(4096).fill("a").join("")}`.stdout(() => {}), 100);
describe("#11816", async () => {
function doit(builtin: boolean) {
test(builtin ? "builtin" : "external", async () => {
const files = tempDirWithFiles("hi", {
"input.txt": Array(2048).fill("a").join(""),
"input.txt": aaa2kb,
});
for (let j = 0; j < 10; j++) {
const promises = [];
@@ -206,7 +188,7 @@ describe("fd leak", () => {
function doit(builtin: boolean) {
test(builtin ? "builtin" : "external", async () => {
const files = tempDirWithFiles("hi", {
"input.txt": Array(2048).fill("a").join(""),
"input.txt": aaa2kb,
});
// wrapping in a function
// because of an optimization

View File

@@ -1904,27 +1904,29 @@ it("should emit events in the right order", async () => {
it("destroy should end download", async () => {
// just simulate some file that will take forever to download
const payload = Buffer.alloc(128 * 1024, "X");
let byteLengths = {};
using server = Bun.serve({
port: 0,
async fetch(req) {
let running = true;
req.signal.onabort = () => (running = false);
const index = new URL(req.url).searchParams.get("index");
byteLengths[index] = 0;
return new Response(async function* () {
while (running) {
byteLengths[index] += payload.byteLength;
yield payload;
await Bun.sleep(10);
}
});
},
});
let index = 0;
for (let i = 0; i < 5; i++) {
let sendedByteLength = 0;
using server = Bun.serve({
port: 0,
async fetch(req) {
let running = true;
req.signal.onabort = () => (running = false);
return new Response(async function* () {
while (running) {
sendedByteLength += payload.byteLength;
yield payload;
await Bun.sleep(10);
}
});
},
});
async function run() {
async function run(index: number) {
let receivedByteLength = 0;
let { promise, resolve } = Promise.withResolvers();
const req = request(server.url, res => {
const req = request(`${server.url}/?index=${index}`, res => {
res.on("data", data => {
receivedByteLength += data.length;
if (resolve) {
@@ -1939,7 +1941,7 @@ it("destroy should end download", async () => {
await Bun.sleep(10);
const initialByteLength = receivedByteLength;
// we should receive the same amount of data we sent
expect(initialByteLength).toBeLessThanOrEqual(sendedByteLength);
expect(initialByteLength).toBeLessThanOrEqual(byteLengths[index]);
await Bun.sleep(10);
// we should not receive more data after destroy
expect(initialByteLength).toBe(receivedByteLength);
@@ -1947,7 +1949,7 @@ it("destroy should end download", async () => {
}
const runCount = 50;
const runs = Array.from({ length: runCount }, run);
const runs = Array.from({ length: runCount }, () => run(index++));
await Promise.all(runs);
Bun.gc(true);
await Bun.sleep(10);

View File

@@ -27,7 +27,8 @@ test("HTTP server deletes parser after write", async () => {
});
await new Promise(resolve => {
req.end(resolve);
req.once("close", resolve);
req.end();
});
server.close();

View File

@@ -7,13 +7,9 @@ const http = require("http");
let server;
beforeAll(() => {
server = http.createServer(handle);
});
beforeAll(() => {});
afterAll(() => {
server.close();
});
afterAll(() => {});
function handle(req, res) {
res.on("error", jest.fn());
@@ -37,11 +33,18 @@ function handle(req, res) {
});
}
test("http server write end after end", done => {
test("http server write end after end", async () => {
server = http.createServer(handle);
const { promise, resolve, reject } = Promise.withResolvers();
server.listen(0, () => {
http.get(`http://localhost:${server.address().port}`);
done();
http
.get(`http://localhost:${server.address().port}`)
.once("close", () => {
resolve();
})
.once("error", reject);
});
await promise;
});
//<#END_FILE: test-http-server-write-end-after-end.js

View File

@@ -492,7 +492,7 @@ test("fetching with Request object - issue #1527", async () => {
body,
});
expect(fetch(request)).resolves.pass();
expect(async () => await fetch(request)).resolves.pass();
} finally {
server.closeAllConnections();
}