fix(net/tls) fix backpressure pause on socket (#15543)

This commit is contained in:
Ciro Spaciari
2024-12-03 12:53:48 -08:00
committed by GitHub
parent da3d64b1ef
commit a2e2d114e9
4 changed files with 93 additions and 3 deletions

View File

@@ -2153,6 +2153,8 @@ us_socket_context_on_socket_connect_error(
socket->ssl_read_wants_write = 0;
socket->fatal_error = 0;
socket->handshake_state = HANDSHAKE_PENDING;
// always resume the socket
us_socket_resume(1, &socket->s);
return socket;
}

View File

@@ -1457,7 +1457,8 @@ fn NewSocket(comptime ssl: bool) type {
JSC.markBinding(@src());
log("resume", .{});
if (this.flags.is_paused) {
// we should not allow pausing/resuming a wrapped socket because a wrapped socket is 2 sockets and this can cause issues
if (this.wrapped == .none and this.flags.is_paused) {
this.flags.is_paused = !this.socket.resumeStream();
}
return .undefined;
@@ -1466,9 +1467,11 @@ fn NewSocket(comptime ssl: bool) type {
JSC.markBinding(@src());
log("pause", .{});
if (!this.flags.is_paused) {
// we should not allow pausing/resuming a wrapped socket because a wrapped socket is 2 sockets and this can cause issues
if (this.wrapped == .none and !this.flags.is_paused) {
this.flags.is_paused = this.socket.pauseStream();
}
return .undefined;
}

View File

@@ -0,0 +1,62 @@
import net from "net";
import tls from "tls";
import { once } from "events";
import { tls as certs } from "harness";
import { test, expect } from "bun:test";
test("should be able to upgrade a paused socket and also have backpressure on it #15438", async () => {
// enought to trigger backpressure
const payload = Buffer.alloc(16 * 1024 * 4, "b").toString("utf8");
const server = tls.createServer(certs, socket => {
// echo
socket.on("data", data => {
socket.write(data);
});
});
await once(server.listen(0, "127.0.0.1"), "listening");
const socket = net.connect({
port: (server.address() as net.AddressInfo).port,
host: "127.0.0.1",
});
await once(socket, "connect");
// pause raw socket
socket.pause();
const tlsSocket = tls.connect({
ca: certs.cert,
servername: "localhost",
socket,
});
await once(tlsSocket, "secureConnect");
// do http request using tls socket
async function doWrite(socket: net.Socket) {
let downloadedBody = 0;
const { promise, resolve, reject } = Promise.withResolvers();
function onData(data: Buffer) {
downloadedBody += data.byteLength;
if (downloadedBody === payload.length * 2) {
resolve();
}
}
socket.pause();
socket.write(payload);
socket.write(payload, () => {
socket.on("data", onData);
socket.resume();
});
await promise;
socket.off("data", onData);
}
for (let i = 0; i < 100; i++) {
// upgrade the tlsSocket
await doWrite(tlsSocket);
}
expect().pass();
});

View File

@@ -11,7 +11,30 @@ describe.skipIf(!databaseUrl)("postgres", () => {
const [{ version }] = await sql`SELECT version()`;
expect(version).toMatch(/PostgreSQL/);
} finally {
sql.end();
await sql.end();
}
});
test("should be able to resume after backpressure pause on upgraded handler #15438", async () => {
const sql = postgres(databaseUrl!);
try {
const batch = [];
for (let i = 0; i < 1000; i++) {
batch.push(
(async sql => {
const [{ version }] = await sql`SELECT version()`;
expect(version).toMatch(/PostgreSQL/);
})(sql),
);
if (batch.length === 50) {
await Promise.all(batch);
}
}
if (batch.length > 0) {
await Promise.all(batch);
}
} finally {
await sql.end();
}
});