mirror of
https://github.com/oven-sh/bun
synced 2026-02-03 07:28:53 +00:00
Compare commits
22 Commits
fix-node-h
...
jarred/tim
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1ab4966bfd | ||
|
|
002fd74993 | ||
|
|
b319c5f7fe | ||
|
|
2c7ea40b65 | ||
|
|
ba66bf14da | ||
|
|
07d6e31baa | ||
|
|
33de5926de | ||
|
|
8db87098bb | ||
|
|
4975b1fd2e | ||
|
|
133b0045fc | ||
|
|
bab5fe7588 | ||
|
|
cd5c6693b5 | ||
|
|
022c7cf500 | ||
|
|
0ab7ed84cb | ||
|
|
1a0badd830 | ||
|
|
87f070acac | ||
|
|
5d763fd71e | ||
|
|
6d9e2d437a | ||
|
|
a444dcebb8 | ||
|
|
cd05556689 | ||
|
|
ef4435d987 | ||
|
|
318eb7bcbf |
@@ -348,11 +348,14 @@ private:
|
||||
return (us_socket_t *) asyncSocket;
|
||||
}
|
||||
|
||||
/* It is okay to uncork a closed socket and we need to */
|
||||
((AsyncSocket<SSL> *) s)->uncork();
|
||||
if (returnedSocket != nullptr) {
|
||||
/* It is okay to uncork a closed socket, and we need to */
|
||||
((AsyncSocket<SSL> *) s)->uncork();
|
||||
return s;
|
||||
}
|
||||
|
||||
/* We cannot return nullptr to the underlying stack in any case */
|
||||
return s;
|
||||
// It's okay if this returns nullptr.
|
||||
return static_cast<us_socket_t*>(nullptr);
|
||||
});
|
||||
|
||||
/* Handle HTTP write out (note: SSL_read may trigger this spuriously, the app need to handle spurious calls) */
|
||||
|
||||
@@ -24,6 +24,7 @@
|
||||
#include "LoopData.h"
|
||||
#include <libusockets.h>
|
||||
#include <iostream>
|
||||
#include "AsyncSocket.h"
|
||||
|
||||
extern "C" int bun_is_exiting();
|
||||
|
||||
@@ -148,6 +149,10 @@ public:
|
||||
getLazyLoop().loop = nullptr;
|
||||
}
|
||||
|
||||
static LoopData* data(struct us_loop_t *loop) {
|
||||
return (LoopData *) us_loop_ext(loop);
|
||||
}
|
||||
|
||||
void addPostHandler(void *key, MoveOnlyFunction<void(Loop *)> &&handler) {
|
||||
LoopData *loopData = (LoopData *) us_loop_ext((us_loop_t *) this);
|
||||
|
||||
|
||||
@@ -63,6 +63,7 @@ public:
|
||||
}
|
||||
delete [] corkBuffer;
|
||||
}
|
||||
|
||||
void* getCorkedSocket() {
|
||||
return this->corkedSocket;
|
||||
}
|
||||
|
||||
@@ -55,11 +55,11 @@ pub const KeepAlive = struct {
|
||||
this.status = .inactive;
|
||||
|
||||
if (comptime @TypeOf(event_loop_ctx_) == JSC.EventLoopHandle) {
|
||||
event_loop_ctx_.loop().subActive(1);
|
||||
event_loop_ctx_.loop().unrefCount(1);
|
||||
return;
|
||||
}
|
||||
const event_loop_ctx = JSC.AbstractVM(event_loop_ctx_);
|
||||
event_loop_ctx.platformEventLoop().subActive(1);
|
||||
event_loop_ctx.platformEventLoop().unrefCount(1);
|
||||
}
|
||||
|
||||
/// From another thread, Prevent a poll from keeping the process alive.
|
||||
|
||||
@@ -30,6 +30,7 @@ pub const All = struct {
|
||||
timers: TimerHeap = .{
|
||||
.context = {},
|
||||
},
|
||||
timer_ref: bun.Async.KeepAlive = .{},
|
||||
active_timer_count: i32 = 0,
|
||||
uv_timer: if (Environment.isWindows) uv.Timer else void =
|
||||
if (Environment.isWindows) std.mem.zeroes(uv.Timer) else {},
|
||||
@@ -109,17 +110,17 @@ pub const All = struct {
|
||||
|
||||
this.active_timer_count = new;
|
||||
|
||||
if (old <= 0 and new > 0) {
|
||||
if (comptime Environment.isWindows) {
|
||||
this.uv_timer.ref();
|
||||
if (comptime Environment.isPosix) {
|
||||
if (new > 0) {
|
||||
this.timer_ref.ref(vm);
|
||||
} else {
|
||||
vm.uwsLoop().ref();
|
||||
this.timer_ref.unref(vm);
|
||||
}
|
||||
} else if (old > 0 and new <= 0) {
|
||||
if (comptime Environment.isWindows) {
|
||||
} else if (Environment.isWindows) {
|
||||
if (old <= 0 and new > 0) {
|
||||
this.uv_timer.ref();
|
||||
} else if (old > 0 and new <= 0) {
|
||||
this.uv_timer.unref();
|
||||
} else {
|
||||
vm.uwsLoop().unref();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -628,7 +628,7 @@ pub const Listener = struct {
|
||||
|
||||
this.listener = .{
|
||||
// we need to add support for the backlog parameter on listen here we use the default value of nodejs
|
||||
.namedPipe = WindowsNamedPipeListeningContext.listen(globalObject, pipe_name, 511, ssl, this) catch {
|
||||
.namedPipe = WindowsNamedPipeListeningContext.listen(globalObject, pipe_name, 511, &ssl.?, this) catch {
|
||||
exception.* = JSC.toInvalidArguments("Failed to listen at {s}", .{pipe_name}, globalObject).asObjectRef();
|
||||
this.deinit();
|
||||
return .zero;
|
||||
@@ -644,7 +644,7 @@ pub const Listener = struct {
|
||||
}
|
||||
}
|
||||
const ctx_opts: uws.us_bun_socket_context_options_t = if (ssl != null)
|
||||
JSC.API.ServerConfig.SSLConfig.asUSockets(ssl.?)
|
||||
JSC.API.ServerConfig.SSLConfig.asUSockets(&ssl.?)
|
||||
else
|
||||
.{};
|
||||
|
||||
@@ -1057,13 +1057,13 @@ pub const Listener = struct {
|
||||
}
|
||||
const vm = globalObject.bunVM();
|
||||
|
||||
const socket_config = SocketConfig.fromJS(vm, opts, globalObject, exception) orelse {
|
||||
var socket_config: SocketConfig = SocketConfig.fromJS(vm, opts, globalObject, exception) orelse {
|
||||
return .zero;
|
||||
};
|
||||
|
||||
var hostname_or_unix = socket_config.hostname_or_unix;
|
||||
const port = socket_config.port;
|
||||
var ssl = socket_config.ssl;
|
||||
var ssl = if (socket_config.ssl) |*ssl_config| ssl_config else null;
|
||||
var handlers = socket_config.handlers;
|
||||
var default_data = socket_config.default_data;
|
||||
|
||||
@@ -1178,8 +1178,8 @@ pub const Listener = struct {
|
||||
}
|
||||
}
|
||||
|
||||
const ctx_opts: uws.us_bun_socket_context_options_t = if (ssl != null)
|
||||
JSC.API.ServerConfig.SSLConfig.asUSockets(ssl.?)
|
||||
const ctx_opts: uws.us_bun_socket_context_options_t = if (ssl) |ssl_config|
|
||||
JSC.API.ServerConfig.SSLConfig.asUSockets(ssl_config)
|
||||
else
|
||||
.{};
|
||||
|
||||
@@ -3581,10 +3581,14 @@ pub const DuplexUpgradeContext = struct {
|
||||
fn onError(this: *DuplexUpgradeContext, err_value: JSC.JSValue) void {
|
||||
if (this.is_open) {
|
||||
if (this.tls) |tls| {
|
||||
this.tls = null;
|
||||
defer tls.deref();
|
||||
tls.handleError(err_value);
|
||||
}
|
||||
} else {
|
||||
if (this.tls) |tls| {
|
||||
this.tls = null;
|
||||
defer tls.deref();
|
||||
tls.handleConnectError(@intFromEnum(bun.C.SystemErrno.ECONNREFUSED));
|
||||
}
|
||||
}
|
||||
@@ -3602,6 +3606,8 @@ pub const DuplexUpgradeContext = struct {
|
||||
const socket = TLSSocket.Socket.fromDuplex(&this.upgrade);
|
||||
|
||||
if (this.tls) |tls| {
|
||||
defer tls.deref();
|
||||
this.tls = null;
|
||||
tls.onClose(socket, 0, null);
|
||||
}
|
||||
|
||||
@@ -3611,28 +3617,31 @@ pub const DuplexUpgradeContext = struct {
|
||||
fn runEvent(this: *DuplexUpgradeContext) void {
|
||||
switch (this.task_event) {
|
||||
.StartTLS => {
|
||||
if (this.ssl_config) |config| {
|
||||
this.upgrade.startTLS(config, true) catch |err| {
|
||||
switch (err) {
|
||||
error.OutOfMemory => {
|
||||
bun.outOfMemory();
|
||||
},
|
||||
else => {
|
||||
const errno = @intFromEnum(bun.C.SystemErrno.ECONNREFUSED);
|
||||
if (this.tls) |tls| {
|
||||
const socket = TLSSocket.Socket.fromDuplex(&this.upgrade);
|
||||
const tls = this.tls.?;
|
||||
defer tls.deref();
|
||||
var config = this.ssl_config orelse return;
|
||||
defer config.deinit();
|
||||
this.ssl_config = null;
|
||||
|
||||
tls.handleConnectError(errno);
|
||||
tls.onClose(socket, errno, null);
|
||||
}
|
||||
},
|
||||
}
|
||||
};
|
||||
this.ssl_config.?.deinit();
|
||||
this.ssl_config = null;
|
||||
}
|
||||
this.upgrade.startTLS(&config, true) catch |err| {
|
||||
switch (err) {
|
||||
error.OutOfMemory => {
|
||||
bun.outOfMemory();
|
||||
},
|
||||
else => {
|
||||
const errno = @intFromEnum(bun.C.SystemErrno.ECONNREFUSED);
|
||||
tls.handleConnectError(errno);
|
||||
},
|
||||
}
|
||||
};
|
||||
},
|
||||
.Close => {
|
||||
defer {
|
||||
if (this.tls) |tls| {
|
||||
tls.deref();
|
||||
}
|
||||
}
|
||||
|
||||
this.upgrade.close();
|
||||
},
|
||||
}
|
||||
@@ -3640,15 +3649,27 @@ pub const DuplexUpgradeContext = struct {
|
||||
|
||||
fn deinitInNextTick(this: *DuplexUpgradeContext) void {
|
||||
this.task_event = .Close;
|
||||
if (this.tls) |tls| {
|
||||
tls.ref();
|
||||
}
|
||||
this.vm.enqueueTask(JSC.Task.init(&this.task));
|
||||
}
|
||||
|
||||
fn startTLS(this: *DuplexUpgradeContext) void {
|
||||
this.task_event = .StartTLS;
|
||||
if (this.tls) |tls| {
|
||||
tls.ref();
|
||||
}
|
||||
|
||||
this.vm.enqueueTask(JSC.Task.init(&this.task));
|
||||
}
|
||||
|
||||
fn deinit(this: *DuplexUpgradeContext) void {
|
||||
if (this.ssl_config) |*config| {
|
||||
config.deinit();
|
||||
this.ssl_config = null;
|
||||
}
|
||||
|
||||
if (this.tls) |tls| {
|
||||
this.tls = null;
|
||||
tls.deref();
|
||||
@@ -3699,7 +3720,7 @@ pub const WindowsNamedPipeListeningContext = if (Environment.isWindows) struct {
|
||||
this.uvPipe.close(onPipeClosed);
|
||||
}
|
||||
|
||||
pub fn listen(globalThis: *JSC.JSGlobalObject, path: []const u8, backlog: i32, ssl_config: ?JSC.API.ServerConfig.SSLConfig, listener: *Listener) !*WindowsNamedPipeListeningContext {
|
||||
pub fn listen(globalThis: *JSC.JSGlobalObject, path: []const u8, backlog: i32, ssl_config: ?*const JSC.API.ServerConfig.SSLConfig, listener: *Listener) !*WindowsNamedPipeListeningContext {
|
||||
const this = WindowsNamedPipeListeningContext.new(.{
|
||||
.globalThis = globalThis,
|
||||
.vm = globalThis.bunVM(),
|
||||
@@ -3755,7 +3776,7 @@ pub const WindowsNamedPipeListeningContext = if (Environment.isWindows) struct {
|
||||
fn deinitInNextTick(this: *WindowsNamedPipeListeningContext) void {
|
||||
bun.assert(this.task_event != .deinit);
|
||||
this.task_event = .deinit;
|
||||
this.vm.enqueueTask(JSC.Task.init(&this.task));
|
||||
this.vm.enqueueImmediateTask(JSC.Task.init(&this.task));
|
||||
}
|
||||
|
||||
fn deinit(this: *WindowsNamedPipeListeningContext) void {
|
||||
@@ -3935,7 +3956,7 @@ pub const WindowsNamedPipeContext = if (Environment.isWindows) struct {
|
||||
fn deinitInNextTick(this: *WindowsNamedPipeContext) void {
|
||||
bun.assert(this.task_event != .deinit);
|
||||
this.task_event = .deinit;
|
||||
this.vm.enqueueTask(JSC.Task.init(&this.task));
|
||||
this.vm.enqueueImmediateTask(JSC.Task.init(&this.task));
|
||||
}
|
||||
|
||||
fn create(globalThis: *JSC.JSGlobalObject, socket: SocketType) *WindowsNamedPipeContext {
|
||||
@@ -3975,7 +3996,7 @@ pub const WindowsNamedPipeContext = if (Environment.isWindows) struct {
|
||||
return this;
|
||||
}
|
||||
|
||||
pub fn open(globalThis: *JSC.JSGlobalObject, fd: bun.FileDescriptor, ssl_config: ?JSC.API.ServerConfig.SSLConfig, socket: SocketType) !*uws.WindowsNamedPipe {
|
||||
pub fn open(globalThis: *JSC.JSGlobalObject, fd: bun.FileDescriptor, ssl_config: ?*const JSC.API.ServerConfig.SSLConfig, socket: SocketType) !*uws.WindowsNamedPipe {
|
||||
// TODO: reuse the same context for multiple connections when possibles
|
||||
|
||||
const this = WindowsNamedPipeContext.create(globalThis, socket);
|
||||
@@ -3996,7 +4017,7 @@ pub const WindowsNamedPipeContext = if (Environment.isWindows) struct {
|
||||
return &this.named_pipe;
|
||||
}
|
||||
|
||||
pub fn connect(globalThis: *JSC.JSGlobalObject, path: []const u8, ssl_config: ?JSC.API.ServerConfig.SSLConfig, socket: SocketType) !*uws.WindowsNamedPipe {
|
||||
pub fn connect(globalThis: *JSC.JSGlobalObject, path: []const u8, ssl_config: ?*const JSC.API.ServerConfig.SSLConfig, socket: SocketType) !*uws.WindowsNamedPipe {
|
||||
// TODO: reuse the same context for multiple connections when possibles
|
||||
|
||||
const this = WindowsNamedPipeContext.create(globalThis, socket);
|
||||
|
||||
@@ -93,7 +93,7 @@ pub fn SSLWrapper(comptime T: type) type {
|
||||
};
|
||||
}
|
||||
|
||||
pub fn init(ssl_options: JSC.API.ServerConfig.SSLConfig, is_client: bool, handlers: Handlers) !This {
|
||||
pub fn init(ssl_options: *const JSC.API.ServerConfig.SSLConfig, is_client: bool, handlers: Handlers) !This {
|
||||
BoringSSL.load();
|
||||
|
||||
const ctx_opts: uws.us_bun_socket_context_options_t = JSC.API.ServerConfig.SSLConfig.asUSockets(ssl_options);
|
||||
|
||||
@@ -584,7 +584,7 @@ pub const ServerConfig = struct {
|
||||
|
||||
const log = Output.scoped(.SSLConfig, false);
|
||||
|
||||
pub fn asUSockets(this: SSLConfig) uws.us_bun_socket_context_options_t {
|
||||
pub fn asUSockets(this: *const SSLConfig) uws.us_bun_socket_context_options_t {
|
||||
var ctx_opts: uws.us_bun_socket_context_options_t = .{};
|
||||
|
||||
if (this.key_file_name != null)
|
||||
@@ -1953,7 +1953,11 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
|
||||
|
||||
fn drainMicrotasks(this: *const RequestContext) void {
|
||||
if (this.isAsync()) return;
|
||||
if (this.server) |server| server.vm.drainMicrotasks();
|
||||
if (this.server) |server| {
|
||||
if (Environment.isDebug) server.vm.eventLoop().debug.enter();
|
||||
defer if (Environment.isDebug) server.vm.eventLoop().debug.exit();
|
||||
server.vm.drainMicrotasks();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn setAbortHandler(this: *RequestContext) void {
|
||||
@@ -2400,7 +2404,9 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
|
||||
var any_js_calls = false;
|
||||
var vm = this.server.?.vm;
|
||||
const globalThis = this.server.?.globalThis;
|
||||
if (comptime Environment.isDebug) vm.eventLoop().debug.enter();
|
||||
defer {
|
||||
defer if (comptime Environment.isDebug) vm.eventLoop().debug.exit();
|
||||
// This is a task in the event loop.
|
||||
// If we called into JavaScript, we must drain the microtask queue
|
||||
if (any_js_calls) {
|
||||
@@ -2424,9 +2430,11 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
|
||||
|
||||
this.detachResponse();
|
||||
var any_js_calls = false;
|
||||
var vm = this.server.?.vm;
|
||||
var vm: *JSC.VirtualMachine = this.server.?.vm;
|
||||
const globalThis = this.server.?.globalThis;
|
||||
if (comptime Environment.isDebug) vm.eventLoop().debug.enter();
|
||||
defer {
|
||||
defer if (comptime Environment.isDebug) vm.eventLoop().debug.exit();
|
||||
// This is a task in the event loop.
|
||||
// If we called into JavaScript, we must drain the microtask queue
|
||||
if (any_js_calls) {
|
||||
@@ -2993,6 +3001,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
|
||||
// it returns a Promise when it goes through ReadableStreamDefaultReader
|
||||
if (assignment_result.asAnyPromise()) |promise| {
|
||||
streamLog("returned a promise", .{});
|
||||
if (comptime Environment.isDebug) JSC.VirtualMachine.get().eventLoop().debug.enter();
|
||||
defer if (comptime Environment.isDebug) JSC.VirtualMachine.get().eventLoop().debug.exit();
|
||||
this.drainMicrotasks();
|
||||
|
||||
switch (promise.status(globalThis.vm())) {
|
||||
@@ -6547,7 +6557,7 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp
|
||||
},
|
||||
.tracker = JSC.AsyncTaskTracker.init(vm),
|
||||
});
|
||||
event_loop.enqueueTask(JSC.Task.init(task));
|
||||
event_loop.enqueueImmediateTask(JSC.Task.init(task));
|
||||
}
|
||||
if (this.pending_requests == 0 and this.listener == null and this.flags.has_js_deinited and !this.hasActiveWebSockets()) {
|
||||
if (this.config.websocket) |*ws| {
|
||||
@@ -6602,7 +6612,7 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp
|
||||
|
||||
const task = bun.default_allocator.create(JSC.AnyTask) catch unreachable;
|
||||
task.* = JSC.AnyTask.New(ThisServer, deinit).init(this);
|
||||
this.vm.enqueueTask(JSC.Task.init(task));
|
||||
this.vm.enqueueImmediateTask(JSC.Task.init(task));
|
||||
}
|
||||
|
||||
pub fn deinit(this: *ThisServer) void {
|
||||
@@ -7007,6 +7017,14 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp
|
||||
return;
|
||||
}
|
||||
|
||||
if (ctx.resp != null) {
|
||||
if (resp.isClosed()) {
|
||||
ctx.resp = null;
|
||||
ctx.deref();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (ctx.shouldRenderMissing()) {
|
||||
ctx.renderMissing();
|
||||
return;
|
||||
|
||||
@@ -4252,6 +4252,10 @@ GlobalObject::PromiseFunctions GlobalObject::promiseHandlerID(Zig::FFIFunction h
|
||||
return GlobalObject::PromiseFunctions::Bun__onResolveEntryPointResult;
|
||||
} else if (handler == Bun__onRejectEntryPointResult) {
|
||||
return GlobalObject::PromiseFunctions::Bun__onRejectEntryPointResult;
|
||||
} else if (handler == Bun__Expect__onReject) {
|
||||
return GlobalObject::PromiseFunctions::Bun__Expect__onReject;
|
||||
} else if (handler == Bun__Expect__onResolve) {
|
||||
return GlobalObject::PromiseFunctions::Bun__Expect__onResolve;
|
||||
} else {
|
||||
RELEASE_ASSERT_NOT_REACHED();
|
||||
}
|
||||
|
||||
@@ -354,8 +354,10 @@ public:
|
||||
Bun__BodyValueBufferer__onResolveStream,
|
||||
Bun__onResolveEntryPointResult,
|
||||
Bun__onRejectEntryPointResult,
|
||||
Bun__Expect__onReject,
|
||||
Bun__Expect__onResolve,
|
||||
};
|
||||
static constexpr size_t promiseFunctionsSize = 24;
|
||||
static constexpr size_t promiseFunctionsSize = 26;
|
||||
|
||||
static PromiseFunctions promiseHandlerID(SYSV_ABI EncodedJSValue (*handler)(JSC__JSGlobalObject* arg0, JSC__CallFrame* arg1));
|
||||
|
||||
|
||||
@@ -2826,6 +2826,9 @@ pub const AnyPromise = union(enum) {
|
||||
inline else => |promise| promise.isHandled(vm),
|
||||
};
|
||||
}
|
||||
pub fn then(this: AnyPromise, globalThis: *JSGlobalObject, ctx: JSValue, onFulfilled: JSNativeFn, onRejected: JSNativeFn) void {
|
||||
this.asValue(globalThis)._then(globalThis, ctx, onFulfilled, onRejected);
|
||||
}
|
||||
pub fn setHandled(this: AnyPromise, vm: *VM) void {
|
||||
switch (this) {
|
||||
inline else => |promise| promise.setHandled(vm),
|
||||
|
||||
2
src/bun.js/bindings/headers.h
generated
2
src/bun.js/bindings/headers.h
generated
@@ -852,5 +852,7 @@ CPP_DECL bool JSC__CustomGetterSetter__isSetterNull(JSC__CustomGetterSetter *arg
|
||||
|
||||
BUN_DECLARE_HOST_FUNCTION(Bun__onResolveEntryPointResult);
|
||||
BUN_DECLARE_HOST_FUNCTION(Bun__onRejectEntryPointResult);
|
||||
BUN_DECLARE_HOST_FUNCTION(Bun__Expect__onReject);
|
||||
BUN_DECLARE_HOST_FUNCTION(Bun__Expect__onResolve);
|
||||
|
||||
#endif
|
||||
@@ -375,10 +375,9 @@ static inline JSC::EncodedJSValue jsTextEncoderPrototypeFunction_encodeBody(JSC:
|
||||
{
|
||||
auto& vm = JSC::getVM(lexicalGlobalObject);
|
||||
auto throwScope = DECLARE_THROW_SCOPE(vm);
|
||||
UNUSED_PARAM(throwScope);
|
||||
UNUSED_PARAM(callFrame);
|
||||
EnsureStillAliveScope argument0 = callFrame->argument(0);
|
||||
JSC::JSString* input = argument0.value().toStringOrNull(lexicalGlobalObject);
|
||||
JSC::JSString* input = argument0.value().toString(lexicalGlobalObject);
|
||||
RETURN_IF_EXCEPTION(throwScope, {});
|
||||
JSC::EncodedJSValue res;
|
||||
String str;
|
||||
if (input->is8Bit()) {
|
||||
@@ -397,11 +396,6 @@ static inline JSC::EncodedJSValue jsTextEncoderPrototypeFunction_encodeBody(JSC:
|
||||
res = TextEncoder__encode16(lexicalGlobalObject, str.span16().data(), str.length());
|
||||
}
|
||||
|
||||
if (UNLIKELY(JSC::JSValue::decode(res).isObject() && JSC::JSValue::decode(res).getObject()->isErrorInstance())) {
|
||||
throwScope.throwException(lexicalGlobalObject, JSC::JSValue::decode(res));
|
||||
return {};
|
||||
}
|
||||
|
||||
RELEASE_AND_RETURN(throwScope, res);
|
||||
}
|
||||
|
||||
|
||||
@@ -821,7 +821,10 @@ pub const EventLoop = struct {
|
||||
defer this.debug.exit();
|
||||
|
||||
if (count == 1) {
|
||||
this.drainMicrotasksWithGlobal(this.global, this.virtual_machine.jsc);
|
||||
const vm = this.virtual_machine;
|
||||
const global = this.global;
|
||||
const jsc = vm.jsc;
|
||||
this.drainTasksWithoutRejection(vm, global, jsc);
|
||||
}
|
||||
|
||||
this.entered_event_loop_count -= 1;
|
||||
@@ -1479,6 +1482,32 @@ pub const EventLoop = struct {
|
||||
this.virtual_machine.gc_controller.processGCTimer();
|
||||
}
|
||||
|
||||
pub fn drainTasksWithoutRejection(this: *EventLoop, ctx: *JSC.VirtualMachine, global: *JSC.JSGlobalObject, js_vm: *JSC.VM) void {
|
||||
while (true) {
|
||||
while (this.tickWithCount(ctx) > 0) {
|
||||
this.tickConcurrent();
|
||||
} else {
|
||||
this.drainMicrotasksWithGlobal(global, js_vm);
|
||||
this.tickConcurrent();
|
||||
if (this.tasks.count > 0) continue;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
pub fn drainTasks(this: *EventLoop, ctx: *JSC.VirtualMachine, global: *JSC.JSGlobalObject, js_vm: *JSC.VM) void {
|
||||
while (true) {
|
||||
while (this.tickWithCount(ctx) > 0) : (global.handleRejectedPromises()) {
|
||||
this.tickConcurrent();
|
||||
} else {
|
||||
this.drainMicrotasksWithGlobal(global, js_vm);
|
||||
this.tickConcurrent();
|
||||
if (this.tasks.count > 0) continue;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
pub fn tick(this: *EventLoop) void {
|
||||
JSC.markBinding(@src());
|
||||
{
|
||||
@@ -1496,16 +1525,7 @@ pub const EventLoop = struct {
|
||||
const global = ctx.global;
|
||||
const global_vm = ctx.jsc;
|
||||
|
||||
while (true) {
|
||||
while (this.tickWithCount(ctx) > 0) : (this.global.handleRejectedPromises()) {
|
||||
this.tickConcurrent();
|
||||
} else {
|
||||
this.drainMicrotasksWithGlobal(global, global_vm);
|
||||
this.tickConcurrent();
|
||||
if (this.tasks.count > 0) continue;
|
||||
}
|
||||
break;
|
||||
}
|
||||
this.drainTasks(ctx, global, global_vm);
|
||||
|
||||
while (this.tickWithCount(ctx) > 0) {
|
||||
this.tickConcurrent();
|
||||
|
||||
@@ -1083,7 +1083,7 @@ pub const VirtualMachine = struct {
|
||||
onUnhandledRejection: *const OnUnhandledRejection = undefined,
|
||||
count: usize = 0,
|
||||
|
||||
pub fn apply(this: *UnhandledRejectionScope, vm: *JSC.VirtualMachine) void {
|
||||
pub fn apply(this: *const UnhandledRejectionScope, vm: *JSC.VirtualMachine) void {
|
||||
vm.onUnhandledRejection = this.onUnhandledRejection;
|
||||
vm.onUnhandledRejectionCtx = this.ctx;
|
||||
vm.unhandled_error_counter = this.count;
|
||||
@@ -1570,10 +1570,6 @@ pub const VirtualMachine = struct {
|
||||
this.eventLoop().waitForPromise(promise);
|
||||
}
|
||||
|
||||
pub fn waitForTasks(this: *VirtualMachine) void {
|
||||
this.eventLoop().waitForTasks();
|
||||
}
|
||||
|
||||
pub const MacroMap = std.AutoArrayHashMap(i32, js.JSObjectRef);
|
||||
|
||||
pub fn enableMacroMode(this: *VirtualMachine) void {
|
||||
@@ -2562,7 +2558,7 @@ pub const VirtualMachine = struct {
|
||||
pub const main_file_name: string = "bun:main";
|
||||
|
||||
pub fn drainMicrotasks(this: *VirtualMachine) void {
|
||||
this.eventLoop().drainMicrotasks();
|
||||
this.eventLoop().drainTasksWithoutRejection(this, this.global, this.jsc);
|
||||
}
|
||||
|
||||
pub fn processFetchLog(globalThis: *JSGlobalObject, specifier: bun.String, referrer: bun.String, log: *logger.Log, ret: *ErrorableResolvedSource, err: anyerror) void {
|
||||
|
||||
@@ -62,6 +62,11 @@ pub const Expect = struct {
|
||||
flags: Flags = .{},
|
||||
parent: ParentScope = .{ .global = {} },
|
||||
custom_label: bun.String = bun.String.empty,
|
||||
promise_status: enum {
|
||||
none,
|
||||
fulfilled,
|
||||
rejected,
|
||||
} = .none,
|
||||
|
||||
pub const TestScope = struct {
|
||||
test_id: TestRunner.Test.ID,
|
||||
@@ -204,22 +209,63 @@ pub const Expect = struct {
|
||||
const matcher_params = switch (Output.enable_ansi_colors) {
|
||||
inline else => |colors| comptime Output.prettyFmt(matcher_params_fmt, colors),
|
||||
};
|
||||
return processPromise(this.custom_label, this.flags, globalThis, value, matcher_name, matcher_params, false);
|
||||
return processPromise(this.custom_label, thisValue, this.flags, globalThis, value, matcher_name, matcher_params, false);
|
||||
}
|
||||
|
||||
export fn Bun__Expect__onReject(globalThis: *JSGlobalObject, callframe: *CallFrame) callconv(JSC.conv) JSValue {
|
||||
const arguments = callframe.argumentsUndef(2).all();
|
||||
const value = arguments[1];
|
||||
|
||||
const this: *Expect = Expect.fromJS(value) orelse {
|
||||
return .undefined;
|
||||
};
|
||||
this.promise_status = .rejected;
|
||||
Expect.capturedValueSetCached(value, globalThis, arguments[0]);
|
||||
|
||||
return arguments[0];
|
||||
}
|
||||
|
||||
export fn Bun__Expect__onResolve(globalThis: *JSGlobalObject, callframe: *CallFrame) callconv(JSC.conv) JSValue {
|
||||
const arguments = callframe.argumentsUndef(2).all();
|
||||
const value = arguments[1];
|
||||
|
||||
const this: *Expect = Expect.fromJS(value) orelse {
|
||||
return .undefined;
|
||||
};
|
||||
this.promise_status = .fulfilled;
|
||||
Expect.capturedValueSetCached(value, globalThis, arguments[0]);
|
||||
|
||||
return arguments[0];
|
||||
}
|
||||
|
||||
/// Processes the async flags (resolves/rejects), waiting for the async value if needed.
|
||||
/// If no flags, returns the original value
|
||||
/// If either flag is set, waits for the result, and returns either it as a JSValue, or null if the expectation failed (in which case if silent is false, also throws a js exception)
|
||||
pub fn processPromise(custom_label: bun.String, flags: Expect.Flags, globalThis: *JSGlobalObject, value: JSValue, matcher_name: anytype, matcher_params: anytype, comptime silent: bool) ?JSValue {
|
||||
pub fn processPromise(custom_label: bun.String, thisValue: JSValue, flags: Expect.Flags, globalThis: *JSGlobalObject, value: JSValue, matcher_name: anytype, matcher_params: anytype, comptime silent: bool) ?JSValue {
|
||||
switch (flags.promise) {
|
||||
inline .resolves, .rejects => |resolution| {
|
||||
if (value.asAnyPromise()) |promise| {
|
||||
const vm = globalThis.vm();
|
||||
promise.setHandled(vm);
|
||||
|
||||
globalThis.bunVM().waitForPromise(promise);
|
||||
const jsc_vm = globalThis.bunVM();
|
||||
var strong = JSC.Strong{};
|
||||
defer strong.deinit();
|
||||
|
||||
if (promise.status(vm) == .pending) {
|
||||
strong = JSC.Strong.create(promise.asValue(globalThis), globalThis);
|
||||
promise.then(globalThis, thisValue, &Bun__Expect__onResolve, &Bun__Expect__onReject);
|
||||
|
||||
const prev_rejection_scope = jsc_vm.unhandledRejectionScope();
|
||||
defer prev_rejection_scope.apply(jsc_vm);
|
||||
|
||||
var new_rejection_scope = prev_rejection_scope;
|
||||
new_rejection_scope.onUnhandledRejection = JSC.VirtualMachine.onQuietUnhandledRejectionHandler;
|
||||
new_rejection_scope.apply(jsc_vm);
|
||||
|
||||
jsc_vm.waitForPromise(promise);
|
||||
}
|
||||
|
||||
const newValue = promise.result(vm);
|
||||
switch (promise.status(vm)) {
|
||||
.fulfilled => switch (resolution) {
|
||||
.resolves => {},
|
||||
@@ -248,8 +294,7 @@ pub const Expect = struct {
|
||||
.pending => unreachable,
|
||||
}
|
||||
|
||||
newValue.ensureStillAlive();
|
||||
return newValue;
|
||||
return promise.result(vm);
|
||||
} else {
|
||||
if (!silent) {
|
||||
var formatter = JSC.ConsoleObject.Formatter{ .globalThis = globalThis, .quote_strings = true };
|
||||
@@ -315,7 +360,7 @@ pub const Expect = struct {
|
||||
outFlags.* = flags.encode();
|
||||
|
||||
// (note that matcher_name/matcher_args are not used because silent=true)
|
||||
if (processPromise(bun.String.empty, flags, globalThis, value.*, "", "", true)) |result| {
|
||||
if (processPromise(bun.String.empty, .undefined, flags, globalThis, value.*, "", "", true)) |result| {
|
||||
value.* = result;
|
||||
return true;
|
||||
}
|
||||
@@ -2330,9 +2375,6 @@ pub const Expect = struct {
|
||||
var vm = globalThis.bunVM();
|
||||
var return_value: JSValue = .zero;
|
||||
|
||||
// Drain existing unhandled rejections
|
||||
vm.global.handleRejectedPromises();
|
||||
|
||||
var scope = vm.unhandledRejectionScope();
|
||||
const prev_unhandled_pending_rejection_to_capture = vm.unhandled_pending_rejection_to_capture;
|
||||
vm.unhandled_pending_rejection_to_capture = &return_value;
|
||||
@@ -4673,9 +4715,13 @@ pub const Expect = struct {
|
||||
// support for async matcher results
|
||||
if (result.asAnyPromise()) |promise| {
|
||||
const vm = globalThis.vm();
|
||||
var strong = JSC.Strong{};
|
||||
defer strong.deinit();
|
||||
promise.setHandled(vm);
|
||||
|
||||
globalThis.bunVM().waitForPromise(promise);
|
||||
if (promise.status(vm) == .pending) {
|
||||
strong = JSC.Strong.create(promise.asValue(globalThis), globalThis);
|
||||
globalThis.bunVM().waitForPromise(promise);
|
||||
}
|
||||
|
||||
result = promise.result(vm);
|
||||
result.ensureStillAlive();
|
||||
@@ -4805,7 +4851,7 @@ pub const Expect = struct {
|
||||
globalThis.throw("Internal consistency error: failed to retrieve the captured value", .{});
|
||||
return .zero;
|
||||
};
|
||||
value = Expect.processPromise(expect.custom_label, expect.flags, globalThis, value, matcher_name, matcher_params, false) orelse return .zero;
|
||||
value = Expect.processPromise(expect.custom_label, thisValue, expect.flags, globalThis, value, matcher_name, matcher_params, false) orelse return .zero;
|
||||
value.ensureStillAlive();
|
||||
|
||||
incrementExpectCallCounter();
|
||||
|
||||
@@ -59,13 +59,14 @@ pub const TextEncoder = struct {
|
||||
const uint8array = JSC.JSValue.createUninitializedUint8Array(globalThis, result.written);
|
||||
bun.assert(result.written <= buf.len);
|
||||
bun.assert(result.read == slice.len);
|
||||
const array_buffer = uint8array.asArrayBuffer(globalThis).?;
|
||||
const array_buffer = uint8array.asArrayBuffer(globalThis) orelse return .zero;
|
||||
bun.assert(result.written == array_buffer.len);
|
||||
@memcpy(array_buffer.byteSlice()[0..result.written], buf[0..result.written]);
|
||||
return uint8array;
|
||||
} else {
|
||||
const bytes = strings.allocateLatin1IntoUTF8(globalThis.bunVM().allocator, []const u8, slice) catch {
|
||||
return JSC.toInvalidArguments("Out of memory", .{}, globalThis);
|
||||
globalThis.throwOutOfMemory();
|
||||
return .zero;
|
||||
};
|
||||
bun.assert(bytes.len >= slice.len);
|
||||
return ArrayBuffer.fromBytes(bytes, .Uint8Array).toJSUnchecked(globalThis, null);
|
||||
@@ -112,7 +113,8 @@ pub const TextEncoder = struct {
|
||||
@TypeOf(slice),
|
||||
slice,
|
||||
) catch {
|
||||
return JSC.toInvalidArguments("Out of memory", .{}, globalThis);
|
||||
globalThis.throwOutOfMemory();
|
||||
return .zero;
|
||||
};
|
||||
return ArrayBuffer.fromBytes(bytes, .Uint8Array).toJSUnchecked(globalThis, null);
|
||||
}
|
||||
|
||||
@@ -119,6 +119,18 @@ extern "C"
|
||||
}
|
||||
}
|
||||
|
||||
extern "C" void uws_res_clear_corked_socket(us_loop_t *loop) {
|
||||
uWS::LoopData *loopData = uWS::Loop::data(loop);
|
||||
void *corkedSocket = loopData->getCorkedSocket();
|
||||
if (corkedSocket) {
|
||||
if (loopData->isCorkedSSL()) {
|
||||
((uWS::AsyncSocket<true> *) corkedSocket)->uncork();
|
||||
} else {
|
||||
((uWS::AsyncSocket<false> *) corkedSocket)->uncork();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void uws_app_delete(int ssl, uws_app_t *app, const char *pattern, uws_method_handler handler, void *user_data)
|
||||
{
|
||||
if (ssl)
|
||||
@@ -1609,6 +1621,10 @@ size_t uws_req_get_header(uws_req_t *res, const char *lower_case_header,
|
||||
}
|
||||
}
|
||||
|
||||
int uws_res_is_closed(int ssl, uws_res_r res) {
|
||||
return us_socket_is_closed(ssl, (us_socket_t *)res);
|
||||
}
|
||||
|
||||
__attribute__((callback (corker, ctx)))
|
||||
void uws_res_cork(int ssl, uws_res_r res, void *ctx,
|
||||
void (*corker)(void *ctx)) nonnull_fn_decl;
|
||||
|
||||
@@ -414,7 +414,7 @@ pub const UpgradedDuplex = struct {
|
||||
return array;
|
||||
}
|
||||
|
||||
pub fn startTLS(this: *UpgradedDuplex, ssl_options: JSC.API.ServerConfig.SSLConfig, is_client: bool) !void {
|
||||
pub fn startTLS(this: *UpgradedDuplex, ssl_options: *const JSC.API.ServerConfig.SSLConfig, is_client: bool) !void {
|
||||
this.wrapper = try WrapperType.init(ssl_options, is_client, .{
|
||||
.ctx = this,
|
||||
.onOpen = UpgradedDuplex.onOpen,
|
||||
@@ -525,25 +525,26 @@ pub const UpgradedDuplex = struct {
|
||||
}
|
||||
|
||||
this.origin.deinit();
|
||||
if (this.onDataCallback.get()) |callback| {
|
||||
if (this.onDataCallback.trySwap()) |callback| {
|
||||
JSC.setFunctionData(callback, null);
|
||||
this.onDataCallback.deinit();
|
||||
}
|
||||
if (this.onEndCallback.get()) |callback| {
|
||||
if (this.onEndCallback.trySwap()) |callback| {
|
||||
JSC.setFunctionData(callback, null);
|
||||
this.onEndCallback.deinit();
|
||||
}
|
||||
if (this.onWritableCallback.get()) |callback| {
|
||||
if (this.onWritableCallback.trySwap()) |callback| {
|
||||
JSC.setFunctionData(callback, null);
|
||||
this.onWritableCallback.deinit();
|
||||
}
|
||||
if (this.onCloseCallback.get()) |callback| {
|
||||
if (this.onCloseCallback.trySwap()) |callback| {
|
||||
JSC.setFunctionData(callback, null);
|
||||
this.onCloseCallback.deinit();
|
||||
}
|
||||
var ssl_error = this.ssl_error;
|
||||
ssl_error.deinit();
|
||||
this.ssl_error = .{};
|
||||
|
||||
this.onDataCallback.deinit();
|
||||
this.onEndCallback.deinit();
|
||||
this.onWritableCallback.deinit();
|
||||
this.onCloseCallback.deinit();
|
||||
}
|
||||
};
|
||||
|
||||
@@ -856,7 +857,7 @@ pub const WindowsNamedPipe = if (Environment.isWindows) struct {
|
||||
}
|
||||
return .{ .result = {} };
|
||||
}
|
||||
pub fn open(this: *WindowsNamedPipe, fd: bun.FileDescriptor, ssl_options: ?JSC.API.ServerConfig.SSLConfig) JSC.Maybe(void) {
|
||||
pub fn open(this: *WindowsNamedPipe, fd: bun.FileDescriptor, ssl_options: ?*const JSC.API.ServerConfig.SSLConfig) JSC.Maybe(void) {
|
||||
bun.assert(this.pipe != null);
|
||||
this.flags.disconnected = true;
|
||||
|
||||
@@ -892,7 +893,7 @@ pub const WindowsNamedPipe = if (Environment.isWindows) struct {
|
||||
return .{ .result = {} };
|
||||
}
|
||||
|
||||
pub fn connect(this: *WindowsNamedPipe, path: []const u8, ssl_options: ?JSC.API.ServerConfig.SSLConfig) JSC.Maybe(void) {
|
||||
pub fn connect(this: *WindowsNamedPipe, path: []const u8, ssl_options: ?*const JSC.API.ServerConfig.SSLConfig) JSC.Maybe(void) {
|
||||
bun.assert(this.pipe != null);
|
||||
this.flags.disconnected = true;
|
||||
// ref because we are connecting
|
||||
@@ -924,7 +925,7 @@ pub const WindowsNamedPipe = if (Environment.isWindows) struct {
|
||||
this.connect_req.data = this;
|
||||
return this.pipe.?.connect(&this.connect_req, path, this, onConnect);
|
||||
}
|
||||
pub fn startTLS(this: *WindowsNamedPipe, ssl_options: JSC.API.ServerConfig.SSLConfig, is_client: bool) !void {
|
||||
pub fn startTLS(this: *WindowsNamedPipe, ssl_options: *const JSC.API.ServerConfig.SSLConfig, is_client: bool) !void {
|
||||
this.flags.is_ssl = true;
|
||||
if (this.start(is_client)) {
|
||||
this.wrapper = try WrapperType.init(ssl_options, is_client, .{
|
||||
@@ -2372,6 +2373,10 @@ pub const PosixLoop = extern struct {
|
||||
|
||||
const log = bun.Output.scoped(.Loop, false);
|
||||
|
||||
pub fn uncork(this: *PosixLoop) void {
|
||||
uws_res_clear_corked_socket(this);
|
||||
}
|
||||
|
||||
pub fn iterationNumber(this: *const PosixLoop) u64 {
|
||||
return this.internal_loop_data.iteration_nr;
|
||||
}
|
||||
@@ -3483,6 +3488,10 @@ pub fn NewApp(comptime ssl: bool) type {
|
||||
uws_res_end(ssl_flag, res.downcast(), data.ptr, data.len, close_connection);
|
||||
}
|
||||
|
||||
pub fn isClosed(res: *Response) bool {
|
||||
return uws_res_is_closed(ssl_flag, res.downcast()) != 0;
|
||||
}
|
||||
|
||||
pub fn tryEnd(res: *Response, data: []const u8, total: usize, close_: bool) bool {
|
||||
return uws_res_try_end(ssl_flag, res.downcast(), data.ptr, data.len, total, close_);
|
||||
}
|
||||
@@ -4105,6 +4114,10 @@ pub const WindowsLoop = extern struct {
|
||||
pre: *uv.uv_prepare_t,
|
||||
check: *uv.uv_check_t,
|
||||
|
||||
pub fn uncork(this: *PosixLoop) void {
|
||||
uws_res_clear_corked_socket(this);
|
||||
}
|
||||
|
||||
pub fn get() *WindowsLoop {
|
||||
return uws_get_loop_with_native(bun.windows.libuv.Loop.get());
|
||||
}
|
||||
@@ -4420,3 +4433,5 @@ pub fn onThreadExit() void {
|
||||
}
|
||||
|
||||
extern fn uws_app_clear_routes(ssl_flag: c_int, app: *uws_app_t) void;
|
||||
extern fn uws_res_clear_corked_socket(loop: *Loop) void;
|
||||
extern fn uws_res_is_closed(ssl_flag: c_int, res: *anyopaque) i32;
|
||||
|
||||
37
src/http.zig
37
src/http.zig
@@ -414,14 +414,14 @@ const ProxyTunnel = struct {
|
||||
}
|
||||
}
|
||||
|
||||
fn start(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket, ssl_options: JSC.API.ServerConfig.SSLConfig) void {
|
||||
fn start(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket, ssl_options: *const JSC.API.ServerConfig.SSLConfig) void {
|
||||
const proxy_tunnel = ProxyTunnel.new(.{});
|
||||
|
||||
var custom_options = ssl_options;
|
||||
var custom_options = ssl_options.*;
|
||||
// we always request the cert so we can verify it and also we manually abort the connection if the hostname doesn't match
|
||||
custom_options.reject_unauthorized = 0;
|
||||
custom_options.request_cert = 1;
|
||||
proxy_tunnel.wrapper = SSLWrapper(*HTTPClient).init(custom_options, true, .{
|
||||
proxy_tunnel.wrapper = SSLWrapper(*HTTPClient).init(&custom_options, true, .{
|
||||
.onOpen = ProxyTunnel.onOpen,
|
||||
.onData = ProxyTunnel.onData,
|
||||
.onHandshake = ProxyTunnel.onHandshake,
|
||||
@@ -1007,7 +1007,7 @@ pub const HTTPThread = struct {
|
||||
|
||||
queued_tasks: Queue = Queue{},
|
||||
|
||||
queued_shutdowns: std.ArrayListUnmanaged(ShutdownMessage) = std.ArrayListUnmanaged(ShutdownMessage){},
|
||||
queued_shutdowns: ShutdownMessage.List = ShutdownMessage.List{},
|
||||
queued_shutdowns_lock: bun.Lock = .{},
|
||||
|
||||
queued_proxy_deref: std.ArrayListUnmanaged(*ProxyTunnel) = std.ArrayListUnmanaged(*ProxyTunnel){},
|
||||
@@ -1022,6 +1022,8 @@ pub const HTTPThread = struct {
|
||||
const ShutdownMessage = struct {
|
||||
async_http_id: u32,
|
||||
is_tls: bool,
|
||||
|
||||
pub const List = std.ArrayListUnmanaged(ShutdownMessage);
|
||||
};
|
||||
|
||||
pub const LibdeflateState = struct {
|
||||
@@ -1188,10 +1190,19 @@ pub const HTTPThread = struct {
|
||||
}
|
||||
|
||||
fn drainEvents(this: *@This()) void {
|
||||
{
|
||||
this.queued_shutdowns_lock.lock();
|
||||
defer this.queued_shutdowns_lock.unlock();
|
||||
for (this.queued_shutdowns.items) |http| {
|
||||
while (true) {
|
||||
// We need to make sure we don't hold the lock while we call close because
|
||||
// close calls the socket close callback, which reads the list of queued shutdowns
|
||||
// Meaning we can end up in a deadlock if we don't swap the list and release the lock
|
||||
var queued_shutdowns = ShutdownMessage.List{};
|
||||
{
|
||||
this.queued_shutdowns_lock.lock();
|
||||
defer this.queued_shutdowns_lock.unlock();
|
||||
queued_shutdowns = this.queued_shutdowns;
|
||||
this.queued_shutdowns = .{};
|
||||
}
|
||||
|
||||
for (queued_shutdowns.items) |http| {
|
||||
if (socket_async_http_abort_tracker.fetchSwapRemove(http.async_http_id)) |socket_ptr| {
|
||||
if (http.is_tls) {
|
||||
const socket = uws.SocketTLS.fromAny(socket_ptr.value);
|
||||
@@ -1203,7 +1214,13 @@ pub const HTTPThread = struct {
|
||||
}
|
||||
}
|
||||
}
|
||||
this.queued_shutdowns.clearRetainingCapacity();
|
||||
|
||||
// If we had any queued shutdowns, let's loop back around and try again
|
||||
if (queued_shutdowns.items.len > 0) {
|
||||
queued_shutdowns.deinit(bun.default_allocator);
|
||||
continue;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
while (this.queued_proxy_deref.popOrNull()) |http| {
|
||||
@@ -3198,7 +3215,7 @@ pub fn closeAndFail(this: *HTTPClient, err: anyerror, comptime is_ssl: bool, soc
|
||||
|
||||
fn startProxyHandshake(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void {
|
||||
// if we have options we pass them (ca, reject_unauthorized, etc) otherwise use the default
|
||||
const ssl_options = if (this.tls_props != null) this.tls_props.?.* else JSC.API.ServerConfig.SSLConfig.zero;
|
||||
const ssl_options = if (this.tls_props != null) this.tls_props.? else &JSC.API.ServerConfig.SSLConfig.zero;
|
||||
ProxyTunnel.start(this, is_ssl, socket, ssl_options);
|
||||
}
|
||||
|
||||
|
||||
@@ -2920,13 +2920,18 @@ pub const PostgresSQLConnection = struct {
|
||||
|
||||
pub fn onDrain(this: *PostgresSQLConnection) void {
|
||||
var vm = this.globalObject.bunVM();
|
||||
defer vm.drainMicrotasks();
|
||||
const event_loop = vm.eventLoop();
|
||||
event_loop.enter();
|
||||
defer event_loop.exit();
|
||||
this.flushData();
|
||||
}
|
||||
|
||||
pub fn onData(this: *PostgresSQLConnection, data: []const u8) void {
|
||||
var vm = this.globalObject.bunVM();
|
||||
defer vm.drainMicrotasks();
|
||||
const event_loop = vm.eventLoop();
|
||||
event_loop.enter();
|
||||
defer event_loop.exit();
|
||||
|
||||
if (this.read_buffer.remaining().len == 0) {
|
||||
var consumed: usize = 0;
|
||||
var offset: usize = 0;
|
||||
|
||||
@@ -346,7 +346,7 @@ describe("bun test", () => {
|
||||
await expect(sleep(1)).resolves.toBeUndefined();
|
||||
});
|
||||
test("timeout", async () => {
|
||||
await expect(sleep(64)).resolves.toBeUndefined();
|
||||
await expect(await sleep(64)).resolves.toBeUndefined();
|
||||
});
|
||||
`,
|
||||
});
|
||||
|
||||
@@ -149,7 +149,7 @@ describe("Server", () => {
|
||||
port: 0,
|
||||
});
|
||||
|
||||
const response = await fetch(`http://${server.hostname}:${server.port}`);
|
||||
const response = await fetch(server.url);
|
||||
expect(await response.text()).toBe("Hello");
|
||||
});
|
||||
|
||||
@@ -199,7 +199,7 @@ describe("Server", () => {
|
||||
});
|
||||
|
||||
try {
|
||||
await fetch(`http://${server.hostname}:${server.port}`, { signal: abortController.signal });
|
||||
await fetch(server.url, { signal: abortController.signal });
|
||||
} catch (err: any) {
|
||||
expect(err).toBeDefined();
|
||||
expect(err?.name).toBe("AbortError");
|
||||
@@ -229,7 +229,7 @@ describe("Server", () => {
|
||||
});
|
||||
|
||||
try {
|
||||
await fetch(`http://${server.hostname}:${server.port}`, { signal: abortController.signal });
|
||||
await fetch(server.url, { signal: abortController.signal });
|
||||
} catch {
|
||||
fetchAborted = true;
|
||||
}
|
||||
@@ -255,6 +255,7 @@ describe("Server", () => {
|
||||
type: "direct",
|
||||
async pull(controller) {
|
||||
abortController.abort();
|
||||
await Bun.sleep(0);
|
||||
|
||||
const buffer = await Bun.file(import.meta.dir + "/fixture.html.gz").arrayBuffer();
|
||||
controller.write(buffer);
|
||||
@@ -278,7 +279,7 @@ describe("Server", () => {
|
||||
});
|
||||
|
||||
try {
|
||||
await fetch(`http://${server.hostname}:${server.port}`, { signal: abortController.signal });
|
||||
await fetch(server.url, { signal: abortController.signal }).then(r => r.text());
|
||||
} catch {}
|
||||
await Bun.sleep(10);
|
||||
expect(signalOnServer).toBe(true);
|
||||
@@ -331,6 +332,7 @@ describe("Server", () => {
|
||||
new ReadableStream({
|
||||
async pull(controller) {
|
||||
abortController.abort();
|
||||
await Bun.sleep(0);
|
||||
|
||||
const buffer = await Bun.file(import.meta.dir + "/fixture.html.gz").arrayBuffer();
|
||||
controller.enqueue(buffer);
|
||||
@@ -353,8 +355,11 @@ describe("Server", () => {
|
||||
});
|
||||
|
||||
try {
|
||||
await fetch(`http://${server.hostname}:${server.port}`, { signal: abortController.signal });
|
||||
} catch {}
|
||||
await fetch(server.url, { signal: abortController.signal }).then(r => r.text());
|
||||
expect.unreachable();
|
||||
} catch (err: any) {
|
||||
expect(err.message).toContain("The operation was aborted");
|
||||
}
|
||||
await Bun.sleep(10);
|
||||
expect(signalOnServer).toBe(true);
|
||||
}
|
||||
@@ -551,7 +556,6 @@ test("should be able to await server.stop()", async () => {
|
||||
});
|
||||
|
||||
test("should be able to await server.stop(true) with keep alive", async () => {
|
||||
const { promise, resolve } = Promise.withResolvers();
|
||||
const ready = Promise.withResolvers();
|
||||
const received = Promise.withResolvers();
|
||||
using server = Bun.serve({
|
||||
@@ -565,8 +569,11 @@ test("should be able to await server.stop(true) with keep alive", async () => {
|
||||
},
|
||||
});
|
||||
|
||||
// Start the request
|
||||
const responsePromise = fetch(server.url);
|
||||
var didRequestFail = false;
|
||||
// Send a request
|
||||
const responsePromise = fetch(server.url)
|
||||
.then(r => r.text())
|
||||
.catch(() => (didRequestFail = true));
|
||||
// Wait for the server to receive it.
|
||||
await received.promise;
|
||||
// Stop listening for new connections
|
||||
@@ -574,13 +581,16 @@ test("should be able to await server.stop(true) with keep alive", async () => {
|
||||
// Continue the request
|
||||
ready.resolve();
|
||||
|
||||
if (didRequestFail) {
|
||||
throw new Error("Request failed too early.");
|
||||
}
|
||||
|
||||
// Wait for the server to stop
|
||||
await stopped;
|
||||
|
||||
// It should fail before the server responds
|
||||
expect(async () => {
|
||||
await (await responsePromise).text();
|
||||
}).toThrow();
|
||||
await responsePromise;
|
||||
expect(didRequestFail).toBe(true);
|
||||
|
||||
// Ensure the server is completely stopped
|
||||
expect(async () => await fetch(server.url)).toThrow();
|
||||
|
||||
@@ -499,27 +499,22 @@ describe("streaming", () => {
|
||||
});
|
||||
|
||||
it("throw on pull after writing should not call the error handler", async () => {
|
||||
let subprocess;
|
||||
|
||||
afterAll(() => {
|
||||
subprocess?.kill();
|
||||
});
|
||||
|
||||
const onMessage = mock(async href => {
|
||||
var onMessage = mock(async href => {
|
||||
const url = new URL("write", href);
|
||||
const response = await fetch(url);
|
||||
expect(response.status).toBe(402);
|
||||
expect(response.headers.get("X-Hey")).toBe("123");
|
||||
expect(response.text()).resolves.toBe("");
|
||||
expect(await response.text()).toBe("");
|
||||
subprocess.kill();
|
||||
});
|
||||
|
||||
subprocess = Bun.spawn({
|
||||
await using subprocess = Bun.spawn({
|
||||
cwd: import.meta.dirname,
|
||||
cmd: [bunExe(), "readable-stream-throws.fixture.js"],
|
||||
env: bunEnv,
|
||||
stdout: "ignore",
|
||||
stderr: "pipe",
|
||||
stdin: "inherit",
|
||||
ipc: onMessage,
|
||||
});
|
||||
|
||||
@@ -1956,7 +1951,7 @@ it("we should always send date", async () => {
|
||||
it("should allow use of custom timeout", async () => {
|
||||
using server = Bun.serve({
|
||||
port: 0,
|
||||
idleTimeout: 8, // uws precision is in seconds, and lower than 4 seconds is not reliable its timer is not that accurate
|
||||
idleTimeout: 7, // uws precision is in seconds, and lower than 4 seconds is not reliable its timer is not that accurate
|
||||
async fetch(req) {
|
||||
const url = new URL(req.url);
|
||||
return new Response(
|
||||
@@ -1978,12 +1973,21 @@ it("should allow use of custom timeout", async () => {
|
||||
},
|
||||
});
|
||||
async function testTimeout(pathname: string, success: boolean) {
|
||||
const res = await fetch(new URL(pathname, server.url.origin));
|
||||
expect(res.status).toBe(200);
|
||||
const promise = fetch(new URL(pathname, server.url.origin)).then(r => {
|
||||
expect(r.status).toBe(200);
|
||||
return r.text();
|
||||
});
|
||||
|
||||
if (success) {
|
||||
expect(res.text()).resolves.toBe("Hello, World!");
|
||||
return expect(await promise).toBe("Hello, World!");
|
||||
} else {
|
||||
expect(res.text()).rejects.toThrow(/The socket connection was closed unexpectedly./);
|
||||
return promise
|
||||
.then(() => {
|
||||
throw new Error("Expected timeout");
|
||||
})
|
||||
.catch(err => {
|
||||
expect(err.message).toContain("The socket connection was closed unexpectedly.");
|
||||
});
|
||||
}
|
||||
}
|
||||
await Promise.all([testTimeout("/ok", true), testTimeout("/timeout", false)]);
|
||||
@@ -2071,5 +2075,5 @@ it("allow custom timeout per request", async () => {
|
||||
expect(server.timeout).toBeFunction();
|
||||
const res = await fetch(new URL("/long-timeout", server.url.origin));
|
||||
expect(res.status).toBe(200);
|
||||
expect(res.text()).resolves.toBe("Hello, World!");
|
||||
expect(await res.text()).toBe("Hello, World!");
|
||||
}, 20_000);
|
||||
|
||||
@@ -15,6 +15,8 @@ $.cwd(process.cwd());
|
||||
$.nothrow();
|
||||
|
||||
const DEFAULT_THRESHOLD = process.platform === "darwin" ? 100 * (1 << 20) : 150 * (1 << 20);
|
||||
const aaa128kb = Buffer.alloc(128 * 1024, "a").toString();
|
||||
const aaa2kb = Buffer.alloc(2 * 1024, "a").toString();
|
||||
|
||||
const TESTS: [name: string, builder: () => TestBuilder, runs?: number][] = [
|
||||
["redirect_file", () => TestBuilder.command`echo hello > test.txt`.fileEquals("test.txt", "hello\n")],
|
||||
@@ -143,37 +145,17 @@ describe("fd leak", () => {
|
||||
memLeakTest("Buffer", () => TestBuilder.command`cat ${import.meta.filename} > ${Buffer.alloc(1 << 20)}`, 100);
|
||||
memLeakTest(
|
||||
"Blob_something",
|
||||
() =>
|
||||
TestBuilder.command`cat < ${new Blob([
|
||||
Array(128 * 1024)
|
||||
.fill("a")
|
||||
.join(""),
|
||||
])}`.stdout(str =>
|
||||
expect(str).toEqual(
|
||||
Array(128 * 1024)
|
||||
.fill("a")
|
||||
.join(""),
|
||||
),
|
||||
),
|
||||
100,
|
||||
);
|
||||
memLeakTest(
|
||||
"Blob_nothing",
|
||||
() =>
|
||||
TestBuilder.command`echo hi < ${new Blob([
|
||||
Array(128 * 1024)
|
||||
.fill("a")
|
||||
.join(""),
|
||||
])}`.stdout("hi\n"),
|
||||
() => TestBuilder.command`cat < ${new Blob([aaa128kb])}`.stdout(str => expect(str).toEqual(aaa128kb)),
|
||||
100,
|
||||
);
|
||||
memLeakTest("Blob_nothing", () => TestBuilder.command`echo hi < ${new Blob([aaa128kb])}`.stdout("hi\n"), 100);
|
||||
memLeakTest("String", () => TestBuilder.command`echo ${Array(4096).fill("a").join("")}`.stdout(() => {}), 100);
|
||||
|
||||
describe("#11816", async () => {
|
||||
function doit(builtin: boolean) {
|
||||
test(builtin ? "builtin" : "external", async () => {
|
||||
const files = tempDirWithFiles("hi", {
|
||||
"input.txt": Array(2048).fill("a").join(""),
|
||||
"input.txt": aaa2kb,
|
||||
});
|
||||
for (let j = 0; j < 10; j++) {
|
||||
const promises = [];
|
||||
@@ -206,7 +188,7 @@ describe("fd leak", () => {
|
||||
function doit(builtin: boolean) {
|
||||
test(builtin ? "builtin" : "external", async () => {
|
||||
const files = tempDirWithFiles("hi", {
|
||||
"input.txt": Array(2048).fill("a").join(""),
|
||||
"input.txt": aaa2kb,
|
||||
});
|
||||
// wrapping in a function
|
||||
// because of an optimization
|
||||
|
||||
@@ -1904,27 +1904,29 @@ it("should emit events in the right order", async () => {
|
||||
it("destroy should end download", async () => {
|
||||
// just simulate some file that will take forever to download
|
||||
const payload = Buffer.alloc(128 * 1024, "X");
|
||||
let byteLengths = {};
|
||||
using server = Bun.serve({
|
||||
port: 0,
|
||||
async fetch(req) {
|
||||
let running = true;
|
||||
req.signal.onabort = () => (running = false);
|
||||
const index = new URL(req.url).searchParams.get("index");
|
||||
byteLengths[index] = 0;
|
||||
return new Response(async function* () {
|
||||
while (running) {
|
||||
byteLengths[index] += payload.byteLength;
|
||||
yield payload;
|
||||
await Bun.sleep(10);
|
||||
}
|
||||
});
|
||||
},
|
||||
});
|
||||
let index = 0;
|
||||
for (let i = 0; i < 5; i++) {
|
||||
let sendedByteLength = 0;
|
||||
using server = Bun.serve({
|
||||
port: 0,
|
||||
async fetch(req) {
|
||||
let running = true;
|
||||
req.signal.onabort = () => (running = false);
|
||||
return new Response(async function* () {
|
||||
while (running) {
|
||||
sendedByteLength += payload.byteLength;
|
||||
yield payload;
|
||||
await Bun.sleep(10);
|
||||
}
|
||||
});
|
||||
},
|
||||
});
|
||||
|
||||
async function run() {
|
||||
async function run(index: number) {
|
||||
let receivedByteLength = 0;
|
||||
let { promise, resolve } = Promise.withResolvers();
|
||||
const req = request(server.url, res => {
|
||||
const req = request(`${server.url}/?index=${index}`, res => {
|
||||
res.on("data", data => {
|
||||
receivedByteLength += data.length;
|
||||
if (resolve) {
|
||||
@@ -1939,7 +1941,7 @@ it("destroy should end download", async () => {
|
||||
await Bun.sleep(10);
|
||||
const initialByteLength = receivedByteLength;
|
||||
// we should receive the same amount of data we sent
|
||||
expect(initialByteLength).toBeLessThanOrEqual(sendedByteLength);
|
||||
expect(initialByteLength).toBeLessThanOrEqual(byteLengths[index]);
|
||||
await Bun.sleep(10);
|
||||
// we should not receive more data after destroy
|
||||
expect(initialByteLength).toBe(receivedByteLength);
|
||||
@@ -1947,7 +1949,7 @@ it("destroy should end download", async () => {
|
||||
}
|
||||
|
||||
const runCount = 50;
|
||||
const runs = Array.from({ length: runCount }, run);
|
||||
const runs = Array.from({ length: runCount }, () => run(index++));
|
||||
await Promise.all(runs);
|
||||
Bun.gc(true);
|
||||
await Bun.sleep(10);
|
||||
|
||||
@@ -27,7 +27,8 @@ test("HTTP server deletes parser after write", async () => {
|
||||
});
|
||||
|
||||
await new Promise(resolve => {
|
||||
req.end(resolve);
|
||||
req.once("close", resolve);
|
||||
req.end();
|
||||
});
|
||||
|
||||
server.close();
|
||||
|
||||
@@ -7,13 +7,9 @@ const http = require("http");
|
||||
|
||||
let server;
|
||||
|
||||
beforeAll(() => {
|
||||
server = http.createServer(handle);
|
||||
});
|
||||
beforeAll(() => {});
|
||||
|
||||
afterAll(() => {
|
||||
server.close();
|
||||
});
|
||||
afterAll(() => {});
|
||||
|
||||
function handle(req, res) {
|
||||
res.on("error", jest.fn());
|
||||
@@ -37,11 +33,18 @@ function handle(req, res) {
|
||||
});
|
||||
}
|
||||
|
||||
test("http server write end after end", done => {
|
||||
test("http server write end after end", async () => {
|
||||
server = http.createServer(handle);
|
||||
const { promise, resolve, reject } = Promise.withResolvers();
|
||||
server.listen(0, () => {
|
||||
http.get(`http://localhost:${server.address().port}`);
|
||||
done();
|
||||
http
|
||||
.get(`http://localhost:${server.address().port}`)
|
||||
.once("close", () => {
|
||||
resolve();
|
||||
})
|
||||
.once("error", reject);
|
||||
});
|
||||
await promise;
|
||||
});
|
||||
|
||||
//<#END_FILE: test-http-server-write-end-after-end.js
|
||||
|
||||
@@ -492,7 +492,7 @@ test("fetching with Request object - issue #1527", async () => {
|
||||
body,
|
||||
});
|
||||
|
||||
expect(fetch(request)).resolves.pass();
|
||||
expect(async () => await fetch(request)).resolves.pass();
|
||||
} finally {
|
||||
server.closeAllConnections();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user