mirror of
https://github.com/oven-sh/bun
synced 2026-02-09 10:28:47 +00:00
fix(windows) socket and timers/performance tests (#9651)
* WIP missing keepalive * cleanup * is a Bun.sleep bug? * no bun sleep * fix exception * revert * fix setTimeout/Bun.sleep * add Bun.sleep keepalive test * fixes * one more bonus fix * fix early firing of timers * use localhost and pass the server.hostname * opsie
This commit is contained in:
@@ -3797,7 +3797,8 @@ pub const Timer = struct {
|
||||
if (vm.isInspectorEnabled()) {
|
||||
Debugger.willDispatchAsyncCall(globalThis, .DOMTimer, Timeout.ID.asyncID(.{ .id = this.id, .kind = kind }));
|
||||
}
|
||||
|
||||
vm.eventLoop().enter();
|
||||
defer vm.eventLoop().exit();
|
||||
const result = callback.callWithGlobalThis(
|
||||
globalThis,
|
||||
args,
|
||||
@@ -4053,8 +4054,7 @@ pub const Timer = struct {
|
||||
if (this.cancelled) {
|
||||
_ = uv.uv_timer_stop(&this.timer);
|
||||
}
|
||||
// libuv runs on the same thread
|
||||
return this.runFromJSThread();
|
||||
this.runFromJSThread();
|
||||
}
|
||||
|
||||
fn onRequest(req: *bun.io.Request) bun.io.Action {
|
||||
@@ -4146,6 +4146,8 @@ pub const Timer = struct {
|
||||
_ = this.scheduled_count.fetchAdd(1, .Monotonic);
|
||||
const ms: usize = @max(interval orelse this.interval, 1);
|
||||
if (Environment.isWindows) {
|
||||
// we MUST update the timer so we avoid early firing
|
||||
uv.uv_update_time(uv.Loop.get());
|
||||
if (uv.uv_timer_start(&this.timer, TimerReference.onUVRequest, @intCast(ms), 0) != 0) @panic("unable to start timer");
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -1038,18 +1038,8 @@ pub const Listener = struct {
|
||||
TLSSocket.dataSetCached(tls.getThisValue(globalObject), globalObject, default_data);
|
||||
|
||||
tls.doConnect(connection, socket_context) catch {
|
||||
handlers_ptr.unprotect();
|
||||
socket_context.deinit(true);
|
||||
handlers.vm.allocator.destroy(handlers_ptr);
|
||||
handlers.promise.deinit();
|
||||
bun.default_allocator.destroy(tls);
|
||||
const err = JSC.SystemError{
|
||||
.message = bun.String.static("Failed to connect"),
|
||||
.syscall = bun.String.static("connect"),
|
||||
.code = if (port == null) bun.String.static("ENOENT") else bun.String.static("ECONNREFUSED"),
|
||||
};
|
||||
exception.* = err.toErrorInstance(globalObject).asObjectRef();
|
||||
return .zero;
|
||||
tls.handleConnectError(@intFromEnum(if (port == null) bun.C.SystemErrno.ENOENT else bun.C.SystemErrno.ECONNREFUSED));
|
||||
return promise_value;
|
||||
};
|
||||
tls.poll_ref.ref(handlers.vm);
|
||||
|
||||
@@ -1069,18 +1059,8 @@ pub const Listener = struct {
|
||||
TCPSocket.dataSetCached(tcp.getThisValue(globalObject), globalObject, default_data);
|
||||
|
||||
tcp.doConnect(connection, socket_context) catch {
|
||||
handlers_ptr.unprotect();
|
||||
socket_context.deinit(false);
|
||||
handlers.vm.allocator.destroy(handlers_ptr);
|
||||
handlers.promise.deinit();
|
||||
bun.default_allocator.destroy(tcp);
|
||||
const err = JSC.SystemError{
|
||||
.message = bun.String.static("Failed to connect"),
|
||||
.syscall = bun.String.static("connect"),
|
||||
.code = if (port == null) bun.String.static("ENOENT") else bun.String.static("ECONNREFUSED"),
|
||||
};
|
||||
exception.* = err.toErrorInstance(globalObject).asObjectRef();
|
||||
return .zero;
|
||||
tcp.handleConnectError(@intFromEnum(if (port == null) bun.C.SystemErrno.ENOENT else bun.C.SystemErrno.ECONNREFUSED));
|
||||
return promise_value;
|
||||
};
|
||||
tcp.poll_ref.ref(handlers.vm);
|
||||
|
||||
@@ -1253,8 +1233,7 @@ fn NewSocket(comptime ssl: bool) type {
|
||||
_ = handlers.callErrorHandler(this_value, &[_]JSC.JSValue{ this_value, err_value });
|
||||
}
|
||||
}
|
||||
pub fn onConnectError(this: *This, _: Socket, errno: c_int) void {
|
||||
JSC.markBinding(@src());
|
||||
fn handleConnectError(this: *This, errno: c_int) void {
|
||||
log("onConnectError({d})", .{errno});
|
||||
if (this.detached) return;
|
||||
this.detached = true;
|
||||
@@ -1270,10 +1249,9 @@ fn NewSocket(comptime ssl: bool) type {
|
||||
.errno = errno,
|
||||
.message = bun.String.static("Failed to connect"),
|
||||
.syscall = bun.String.static("connect"),
|
||||
|
||||
// For some reason errno is 0 which causes this to be success.
|
||||
// Unix socket case wont hit this callback because it instantly errors.
|
||||
.code = bun.String.static("ECONNREFUSED"),
|
||||
// Unix socket emits ENOENT
|
||||
.code = if (errno == @intFromEnum(bun.C.SystemErrno.ENOENT)) bun.String.static("ENOENT") else bun.String.static("ECONNREFUSED"),
|
||||
// .code = bun.String.static(@tagName(bun.sys.getErrno(errno))),
|
||||
// .code = bun.String.static(@tagName(@as(bun.C.E, @enumFromInt(errno)))),
|
||||
};
|
||||
@@ -1311,6 +1289,10 @@ fn NewSocket(comptime ssl: bool) type {
|
||||
this.has_pending_activity.store(false, .Release);
|
||||
}
|
||||
}
|
||||
pub fn onConnectError(this: *This, _: Socket, errno: c_int) void {
|
||||
JSC.markBinding(@src());
|
||||
this.handleConnectError(errno);
|
||||
}
|
||||
|
||||
pub fn markActive(this: *This) void {
|
||||
if (!this.is_active) {
|
||||
|
||||
@@ -1206,7 +1206,7 @@ pub const PipeReader = struct {
|
||||
const owned = this.toOwnedSlice();
|
||||
this.state = .{ .done = owned };
|
||||
if (!this.isDone()) return;
|
||||
// we need to ref because the process might be done and deref inside signalDoneToCmd before we call onCloseIO
|
||||
// we need to ref because the process might be done and deref inside signalDoneToCmd and we wanna to keep it alive to check this.process
|
||||
this.ref();
|
||||
defer this.deref();
|
||||
this.trySignalDoneToCmd();
|
||||
@@ -1325,6 +1325,9 @@ pub const PipeReader = struct {
|
||||
bun.default_allocator.free(this.state.done);
|
||||
}
|
||||
this.state = .{ .err = err.toSystemError() };
|
||||
// we need to ref because the process might be done and deref inside signalDoneToCmd and we wanna to keep it alive to check this.process
|
||||
this.ref();
|
||||
defer this.deref();
|
||||
this.trySignalDoneToCmd();
|
||||
if (this.process) |process| {
|
||||
// this.process = null;
|
||||
|
||||
@@ -1,11 +1,12 @@
|
||||
(async () => {
|
||||
const port = process.argv[2] ? parseInt(process.argv[2]) : null;
|
||||
const hostname = process.argv[3] ? process.argv[3] : "localhost";
|
||||
await Bun.sleep(10);
|
||||
// failed connection
|
||||
console.log("test 1: failed connection");
|
||||
try {
|
||||
const socket = await Bun.connect({
|
||||
hostname: "localhost",
|
||||
hostname: hostname,
|
||||
port: 9999,
|
||||
socket: { data() {} },
|
||||
});
|
||||
@@ -15,7 +16,7 @@
|
||||
console.log("test 2: failed connection [tls]");
|
||||
try {
|
||||
const socket = await Bun.connect({
|
||||
hostname: "localhost",
|
||||
hostname: hostname,
|
||||
port: 9999,
|
||||
socket: { data() {} },
|
||||
tls: true,
|
||||
@@ -26,7 +27,7 @@
|
||||
// successful connection
|
||||
console.log("test 3: successful connection");
|
||||
const socket = await Bun.connect({
|
||||
hostname: "localhost",
|
||||
hostname: hostname,
|
||||
port,
|
||||
socket: { data() {} },
|
||||
});
|
||||
@@ -35,7 +36,7 @@
|
||||
// successful connection tls
|
||||
console.log("test 4: successful connection [tls]");
|
||||
const socket2 = await Bun.connect({
|
||||
hostname: "localhost",
|
||||
hostname: hostname,
|
||||
port,
|
||||
socket: { data() {} },
|
||||
});
|
||||
|
||||
13
test/js/bun/net/socket-huge-fixture.js
generated
13
test/js/bun/net/socket-huge-fixture.js
generated
@@ -19,14 +19,19 @@ var server = listen({
|
||||
open(socket) {
|
||||
console.time("send 1 GB (server)");
|
||||
socket.data.sent = socket.write(huge);
|
||||
if (socket.data.sent === huge.length) {
|
||||
console.timeEnd("send 1 GB (server)");
|
||||
socket.shutdown();
|
||||
serverResolve();
|
||||
}
|
||||
},
|
||||
async drain(socket) {
|
||||
socket.data.sent += socket.write(huge.subarray(socket.data.sent));
|
||||
// console.error("Sent", socket.data.sent, "bytes");
|
||||
|
||||
if (socket.data.sent === huge.length) {
|
||||
console.timeEnd("send 1 GB (server)");
|
||||
socket.shutdown();
|
||||
server.stop(true);
|
||||
serverResolve();
|
||||
}
|
||||
},
|
||||
@@ -35,7 +40,7 @@ var server = listen({
|
||||
|
||||
const socket = await connect({
|
||||
port: server.port,
|
||||
hostname: "localhost",
|
||||
hostname: server.hostname,
|
||||
data: { received: 0 },
|
||||
socket: {
|
||||
open(socket) {
|
||||
@@ -45,7 +50,7 @@ const socket = await connect({
|
||||
|
||||
data(socket, data) {
|
||||
socket.data.received += data.length;
|
||||
console.log("Received", data.length, "bytes");
|
||||
// console.error("Received", data.length, "bytes");
|
||||
received.update(data);
|
||||
|
||||
if (socket.data.received === huge.length) {
|
||||
@@ -58,7 +63,7 @@ const socket = await connect({
|
||||
});
|
||||
|
||||
await Promise.all([clientPromise, serverPromise]);
|
||||
server.stop();
|
||||
server.stop(true);
|
||||
socket.end();
|
||||
|
||||
if (received.digest("hex") !== Bun.SHA256.hash(huge, "hex")) {
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
// @known-failing-on-windows: 1 failing
|
||||
import { expect, it } from "bun:test";
|
||||
import { bunEnv, bunExe, expectMaxObjectTypeCount } from "harness";
|
||||
import { bunEnv, bunExe, expectMaxObjectTypeCount, isWindows } from "harness";
|
||||
import { connect, fileURLToPath, SocketHandler, spawn } from "bun";
|
||||
|
||||
it("should coerce '0' to 0", async () => {
|
||||
@@ -67,7 +67,7 @@ it("connect without top level await should keep process alive", async () => {
|
||||
port: 0,
|
||||
});
|
||||
const proc = Bun.spawn({
|
||||
cmd: [bunExe(), "keep-event-loop-alive.js", String(server.port)],
|
||||
cmd: [bunExe(), "keep-event-loop-alive.js", String(server.port), server.hostname],
|
||||
cwd: import.meta.dir,
|
||||
env: bunEnv,
|
||||
});
|
||||
@@ -159,8 +159,10 @@ it("should reject on connection error, calling both connectError() and rejecting
|
||||
|
||||
it("should not leak memory when connect() fails", async () => {
|
||||
await (async () => {
|
||||
var promises = new Array(100);
|
||||
for (let i = 0; i < 100; i++) {
|
||||
// windows can take more than a second per connection
|
||||
const quantity = isWindows ? 10 : 50;
|
||||
var promises = new Array(quantity);
|
||||
for (let i = 0; i < quantity; i++) {
|
||||
promises[i] = connect({
|
||||
hostname: "localhost",
|
||||
port: 55555,
|
||||
@@ -179,8 +181,8 @@ it("should not leak memory when connect() fails", async () => {
|
||||
promises.length = 0;
|
||||
})();
|
||||
|
||||
await expectMaxObjectTypeCount(expect, "TCPSocket", 50, 100);
|
||||
});
|
||||
await expectMaxObjectTypeCount(expect, "TCPSocket", 50, 50);
|
||||
}, 60_000);
|
||||
|
||||
// this also tests we mark the promise as handled if connectError() is called
|
||||
it("should handle connection error", done => {
|
||||
@@ -222,13 +224,9 @@ it("should handle connection error", done => {
|
||||
});
|
||||
|
||||
it("should not leak memory when connect() fails again", async () => {
|
||||
await expectMaxObjectTypeCount(expect, "TCPSocket", 5, 100);
|
||||
await expectMaxObjectTypeCount(expect, "TCPSocket", 5, 50);
|
||||
});
|
||||
|
||||
it("should allow large amounts of data to be sent and received", async () => {
|
||||
expect([fileURLToPath(new URL("./socket-huge-fixture.js", import.meta.url))]).toRun();
|
||||
}, 10_000);
|
||||
|
||||
it("socket.timeout works", async () => {
|
||||
try {
|
||||
const { promise, resolve } = Promise.withResolvers<any>();
|
||||
@@ -250,7 +248,7 @@ it("socket.timeout works", async () => {
|
||||
port: 0,
|
||||
});
|
||||
var client = await connect({
|
||||
hostname: "localhost",
|
||||
hostname: server.hostname,
|
||||
port: server.port,
|
||||
socket: {
|
||||
timeout(socket) {
|
||||
@@ -270,3 +268,7 @@ it("socket.timeout works", async () => {
|
||||
server!.stop(true);
|
||||
}
|
||||
}, 10_000);
|
||||
|
||||
it("should allow large amounts of data to be sent and received", async () => {
|
||||
expect([fileURLToPath(new URL("./socket-huge-fixture.js", import.meta.url))]).toRun();
|
||||
}, 60_000);
|
||||
|
||||
4
test/js/bun/util/sleep-keepalive.ts
Normal file
4
test/js/bun/util/sleep-keepalive.ts
Normal file
@@ -0,0 +1,4 @@
|
||||
(async () => {
|
||||
await Bun.sleep(10);
|
||||
console.log("event loop was not killed");
|
||||
})();
|
||||
@@ -58,3 +58,17 @@ test("sleep should saturate timeout values", async () => {
|
||||
|
||||
await allExited;
|
||||
});
|
||||
|
||||
test("sleep should keep the event loop alive", async () => {
|
||||
const proc = Bun.spawn({
|
||||
cmd: [bunExe(), "sleep-keepalive.ts"],
|
||||
stderr: "inherit",
|
||||
stdout: "pipe",
|
||||
stdin: "inherit",
|
||||
env: bunEnv,
|
||||
cwd: import.meta.dir,
|
||||
});
|
||||
await proc.exited;
|
||||
expect(proc.exitCode).toBe(0);
|
||||
expect(await new Response(proc.stdout).text()).toContain("event loop was not killed");
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user