fix(net/tls) fix pg hang on end + hanging on query (#6487)

* fix pg hang on end + hanging on query

* remove dummy function

* fix node-stream

* add test

* fix test

* return error in test

* fix test use once instead of on

* fix OOM

* generated

* 💅

* 💅
This commit is contained in:
Ciro Spaciari
2023-10-14 20:16:49 -03:00
committed by GitHub
parent 9b5e66453b
commit a87aa2fafe
3 changed files with 36 additions and 8 deletions

View File

@@ -66,6 +66,9 @@ const bunSocketServerOptions = Symbol.for("::bunnetserveroptions::");
const bunSocketInternal = Symbol.for("::bunnetsocketinternal::");
const bunTLSConnectOptions = Symbol.for("::buntlsconnectoptions::");
function closeNT(self) {
self.emit("close");
}
function endNT(socket, callback, err) {
socket.end();
callback(err);
@@ -320,7 +323,7 @@ const Socket = (function (InternalSocket) {
this._parent = this;
this._parentWrap = this;
this.#pendingRead = undefined;
this.#upgraded = false;
this.#upgraded = null;
if (socket instanceof Socket) {
this.#socket = socket;
}
@@ -355,6 +358,14 @@ const Socket = (function (InternalSocket) {
Socket.#Drain(socket);
}
#closeRawConnection() {
const connection = this.#upgraded;
connection[bunSocketInternal] = null;
connection.unref();
connection.destroy();
process.nextTick(closeNT, connection);
}
connect(port, host, connectListener) {
var path;
var connection = this.#socket;
@@ -455,7 +466,7 @@ const Socket = (function (InternalSocket) {
if (socket) {
this.connecting = true;
this.#upgraded = true;
this.#upgraded = connection;
const result = socket.upgradeTLS({
data: this,
tls,
@@ -466,6 +477,7 @@ const Socket = (function (InternalSocket) {
// replace socket
connection[bunSocketInternal] = raw;
raw.timeout(raw.timeout);
this.once("end", this.#closeRawConnection);
raw.connecting = false;
this[bunSocketInternal] = tls;
} else {
@@ -479,7 +491,7 @@ const Socket = (function (InternalSocket) {
if (!socket) return;
this.connecting = true;
this.#upgraded = true;
this.#upgraded = connection;
const result = socket.upgradeTLS({
data: this,
tls,
@@ -491,6 +503,7 @@ const Socket = (function (InternalSocket) {
// replace socket
connection[bunSocketInternal] = raw;
raw.timeout(raw.timeout);
this.once("end", this.#closeRawConnection);
raw.connecting = false;
this[bunSocketInternal] = tls;
} else {
@@ -537,6 +550,7 @@ const Socket = (function (InternalSocket) {
_final(callback) {
this[bunSocketInternal]?.end();
callback();
process.nextTick(closeNT, this);
}
get localAddress() {
@@ -559,8 +573,10 @@ const Socket = (function (InternalSocket) {
const queue = this.#readQueue;
let chunk;
while ((chunk = queue.peek())) {
if (!this.push(chunk)) return;
const can_continue = !this.push(chunk);
// always remove from queue push will queue it internally if needed
queue.shift();
if (!can_continue) break;
}
}

File diff suppressed because one or more lines are too long

View File

@@ -1,5 +1,5 @@
import { test, expect, describe } from "bun:test";
import { Pool } from "pg";
import { Pool, Client } from "pg";
import { parse } from "pg-connection-string";
import postgres from "postgres";
@@ -19,6 +19,18 @@ describe("pg", () => {
pool.end();
}
});
it("should execute big query and end connection", async () => {
const client = new Client({
connectionString: CONNECTION_STRING,
ssl: { rejectUnauthorized: false },
});
await client.connect();
const res = await client.query(`SELECT * FROM users LIMIT 1000`);
expect(res.rows.length).toBeGreaterThanOrEqual(300);
await client.end();
}, 5000);
});
describe("postgres", () => {