From a2e2d114e9b6fc807fa7eca587ef1a52a41493e5 Mon Sep 17 00:00:00 2001 From: Ciro Spaciari Date: Tue, 3 Dec 2024 12:53:48 -0800 Subject: [PATCH] fix(net/tls) fix backpressure pause on socket (#15543) --- packages/bun-usockets/src/crypto/openssl.c | 2 + src/bun.js/api/bun/socket.zig | 7 ++- test/js/node/tls/node-tls-upgrade.test.ts | 62 +++++++++++++++++++ test/js/third_party/postgres/postgres.test.ts | 25 +++++++- 4 files changed, 93 insertions(+), 3 deletions(-) create mode 100644 test/js/node/tls/node-tls-upgrade.test.ts diff --git a/packages/bun-usockets/src/crypto/openssl.c b/packages/bun-usockets/src/crypto/openssl.c index e5f7e55ffc..5b22659d34 100644 --- a/packages/bun-usockets/src/crypto/openssl.c +++ b/packages/bun-usockets/src/crypto/openssl.c @@ -2153,6 +2153,8 @@ us_socket_context_on_socket_connect_error( socket->ssl_read_wants_write = 0; socket->fatal_error = 0; socket->handshake_state = HANDSHAKE_PENDING; + // always resume the socket + us_socket_resume(1, &socket->s); return socket; } diff --git a/src/bun.js/api/bun/socket.zig b/src/bun.js/api/bun/socket.zig index e14405120b..08007ae75b 100644 --- a/src/bun.js/api/bun/socket.zig +++ b/src/bun.js/api/bun/socket.zig @@ -1457,7 +1457,8 @@ fn NewSocket(comptime ssl: bool) type { JSC.markBinding(@src()); log("resume", .{}); - if (this.flags.is_paused) { + // we should not allow pausing/resuming a wrapped socket because a wrapped socket is 2 sockets and this can cause issues + if (this.wrapped == .none and this.flags.is_paused) { this.flags.is_paused = !this.socket.resumeStream(); } return .undefined; @@ -1466,9 +1467,11 @@ fn NewSocket(comptime ssl: bool) type { JSC.markBinding(@src()); log("pause", .{}); - if (!this.flags.is_paused) { + // we should not allow pausing/resuming a wrapped socket because a wrapped socket is 2 sockets and this can cause issues + if (this.wrapped == .none and !this.flags.is_paused) { this.flags.is_paused = this.socket.pauseStream(); } + return .undefined; } diff --git a/test/js/node/tls/node-tls-upgrade.test.ts b/test/js/node/tls/node-tls-upgrade.test.ts new file mode 100644 index 0000000000..a48f5c3e77 --- /dev/null +++ b/test/js/node/tls/node-tls-upgrade.test.ts @@ -0,0 +1,62 @@ +import net from "net"; +import tls from "tls"; +import { once } from "events"; +import { tls as certs } from "harness"; +import { test, expect } from "bun:test"; + +test("should be able to upgrade a paused socket and also have backpressure on it #15438", async () => { + // enought to trigger backpressure + const payload = Buffer.alloc(16 * 1024 * 4, "b").toString("utf8"); + + const server = tls.createServer(certs, socket => { + // echo + socket.on("data", data => { + socket.write(data); + }); + }); + + await once(server.listen(0, "127.0.0.1"), "listening"); + + const socket = net.connect({ + port: (server.address() as net.AddressInfo).port, + host: "127.0.0.1", + }); + await once(socket, "connect"); + + // pause raw socket + socket.pause(); + + const tlsSocket = tls.connect({ + ca: certs.cert, + servername: "localhost", + socket, + }); + await once(tlsSocket, "secureConnect"); + + // do http request using tls socket + async function doWrite(socket: net.Socket) { + let downloadedBody = 0; + const { promise, resolve, reject } = Promise.withResolvers(); + function onData(data: Buffer) { + downloadedBody += data.byteLength; + if (downloadedBody === payload.length * 2) { + resolve(); + } + } + socket.pause(); + socket.write(payload); + socket.write(payload, () => { + socket.on("data", onData); + socket.resume(); + }); + + await promise; + socket.off("data", onData); + } + for (let i = 0; i < 100; i++) { + // upgrade the tlsSocket + await doWrite(tlsSocket); + } + + expect().pass(); +}); diff --git a/test/js/third_party/postgres/postgres.test.ts b/test/js/third_party/postgres/postgres.test.ts index ef60a79989..3800a062bc 100644 --- a/test/js/third_party/postgres/postgres.test.ts +++ b/test/js/third_party/postgres/postgres.test.ts @@ -11,7 +11,30 @@ describe.skipIf(!databaseUrl)("postgres", () => { const [{ version }] = await sql`SELECT version()`; expect(version).toMatch(/PostgreSQL/); } finally { - sql.end(); + await sql.end(); + } + }); + + test("should be able to resume after backpressure pause on upgraded handler #15438", async () => { + const sql = postgres(databaseUrl!); + try { + const batch = []; + for (let i = 0; i < 1000; i++) { + batch.push( + (async sql => { + const [{ version }] = await sql`SELECT version()`; + expect(version).toMatch(/PostgreSQL/); + })(sql), + ); + if (batch.length === 50) { + await Promise.all(batch); + } + } + if (batch.length > 0) { + await Promise.all(batch); + } + } finally { + await sql.end(); } });