mirror of
https://github.com/oven-sh/bun
synced 2026-02-16 13:51:47 +00:00
fix graceful shutdown
This commit is contained in:
@@ -38,6 +38,7 @@ const bunTLSConnectOptions = Symbol.for("::buntlsconnectoptions::");
|
||||
const bunSocketServerOptions = Symbol.for("::bunnetserveroptions::");
|
||||
const kInfoHeaders = Symbol("sent-info-headers");
|
||||
const kProxySocket = Symbol("proxySocket");
|
||||
const kSessions = Symbol("sessions");
|
||||
const kQuotedString = /^[\x09\x20-\x5b\x5d-\x7e\x80-\xff]*$/;
|
||||
const MAX_ADDITIONAL_SETTINGS = 10;
|
||||
const Stream = require("node:stream");
|
||||
@@ -2623,7 +2624,6 @@ class ServerHttp2Session extends Http2Session {
|
||||
if (state === 7) {
|
||||
markStreamClosed(stream);
|
||||
self.#connections--;
|
||||
|
||||
stream.destroy();
|
||||
if (self.#connections === 0 && self.#closed) {
|
||||
self.destroy();
|
||||
@@ -2824,6 +2824,9 @@ class ServerHttp2Session extends Http2Session {
|
||||
constructor(socket: TLSSocket | Socket, options?: Http2ConnectOptions, server?: Http2Server) {
|
||||
super();
|
||||
this[kServer] = server;
|
||||
if (server) {
|
||||
server[kSessions].add(this);
|
||||
}
|
||||
this.#connected = true;
|
||||
if (socket instanceof TLSSocket) {
|
||||
// server will receive the preface to know if is or not h2
|
||||
@@ -2996,6 +2999,10 @@ class ServerHttp2Session extends Http2Session {
|
||||
}
|
||||
|
||||
destroy(error: Error | number | undefined = NGHTTP2_NO_ERROR, code?: number) {
|
||||
const server = this[kServer];
|
||||
if (server) {
|
||||
server[kSessions].delete(this);
|
||||
}
|
||||
if (typeof error === "number") {
|
||||
code = error;
|
||||
error = code !== NGHTTP2_NO_ERROR ? $ERR_HTTP2_SESSION_ERROR(code) : undefined;
|
||||
@@ -3074,7 +3081,6 @@ class ClientHttp2Session extends Http2Session {
|
||||
stream.emit("aborted");
|
||||
}
|
||||
self.#connections--;
|
||||
|
||||
process.nextTick(emitStreamErrorNT, self, stream, error, true, self.#connections === 0 && self.#closed);
|
||||
},
|
||||
streamError(self: ClientHttp2Session, stream: ClientHttp2Stream, error: number) {
|
||||
@@ -3102,7 +3108,6 @@ class ClientHttp2Session extends Http2Session {
|
||||
if (state === 7) {
|
||||
markStreamClosed(stream);
|
||||
self.#connections--;
|
||||
|
||||
stream.destroy();
|
||||
if (self.#connections === 0 && self.#closed) {
|
||||
self.destroy();
|
||||
@@ -3673,6 +3678,19 @@ function sessionOnTimeout() {
|
||||
this.destroy();
|
||||
}
|
||||
}
|
||||
/**
|
||||
* This function closes all active sessions gracefully.
|
||||
* @param {*} server the underlying server whose sessions to be closed
|
||||
*/
|
||||
function closeAllSessions(server: Http2Server | Http2SecureServer) {
|
||||
const sessions = server[kSessions];
|
||||
if (sessions.size > 0) {
|
||||
for (const session of sessions) {
|
||||
session.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function connectionListener(socket: Socket) {
|
||||
const options = this[bunSocketServerOptions] || {};
|
||||
if (socket.alpnProtocol === false || socket.alpnProtocol === "http/1.1") {
|
||||
@@ -3712,6 +3730,7 @@ function connectionListener(socket: Socket) {
|
||||
}
|
||||
// setup session
|
||||
const session = new ServerHttp2Session(socket, options, this);
|
||||
|
||||
session.on("error", sessionOnError);
|
||||
const timeout = this.timeout;
|
||||
if (timeout) session.setTimeout(timeout, sessionOnTimeout);
|
||||
@@ -3759,14 +3778,16 @@ function initializeOptions(options) {
|
||||
|
||||
class Http2Server extends net.Server {
|
||||
timeout = 0;
|
||||
[kSessions] = new SafeSet();
|
||||
constructor(options, onRequestHandler) {
|
||||
if (typeof options === "function") {
|
||||
onRequestHandler = options;
|
||||
options = {};
|
||||
}
|
||||
options = initializeOptions(options);
|
||||
|
||||
super(options, connectionListener);
|
||||
this[kSessions] = new SafeSet();
|
||||
|
||||
this.setMaxListeners(0);
|
||||
|
||||
this.on("newListener", setupCompat);
|
||||
@@ -3788,6 +3809,11 @@ class Http2Server extends net.Server {
|
||||
options.settings = { ...options.settings, ...settings };
|
||||
}
|
||||
}
|
||||
|
||||
close() {
|
||||
super.close();
|
||||
closeAllSessions(this);
|
||||
}
|
||||
}
|
||||
|
||||
Http2Server.prototype[EventEmitter.captureRejectionSymbol] = function (err, event, ...args) {
|
||||
@@ -3831,6 +3857,7 @@ function emitFrameErrorEventNT(stream, frameType, errorCode) {
|
||||
}
|
||||
class Http2SecureServer extends tls.Server {
|
||||
timeout = 0;
|
||||
[kSessions] = new SafeSet();
|
||||
constructor(options, onRequestHandler) {
|
||||
//TODO: add 'http/1.1' on ALPNProtocols list after allowHTTP1 support
|
||||
if (typeof options !== "undefined") {
|
||||
@@ -3854,6 +3881,7 @@ class Http2SecureServer extends tls.Server {
|
||||
validateUint32(options.maxSessionRejectedStreams, "maxSessionRejectedStreams");
|
||||
}
|
||||
super(options, connectionListener);
|
||||
this[kSessions] = new SafeSet();
|
||||
this.setMaxListeners(0);
|
||||
this.on("newListener", setupCompat);
|
||||
if (typeof onRequestHandler === "function") {
|
||||
@@ -3874,6 +3902,10 @@ class Http2SecureServer extends tls.Server {
|
||||
options.settings = { ...options.settings, ...settings };
|
||||
}
|
||||
}
|
||||
close() {
|
||||
super.close();
|
||||
closeAllSessions(this);
|
||||
}
|
||||
}
|
||||
function createServer(options, onRequestHandler) {
|
||||
return new Http2Server(options, onRequestHandler);
|
||||
|
||||
Reference in New Issue
Block a user