diff --git a/src/bun.js/api/bun/h2_frame_parser.zig b/src/bun.js/api/bun/h2_frame_parser.zig index 6c96a5e837..eac72c3865 100644 --- a/src/bun.js/api/bun/h2_frame_parser.zig +++ b/src/bun.js/api/bun/h2_frame_parser.zig @@ -3129,6 +3129,7 @@ pub const H2FrameParser = struct { return JSC.JSValue.jsBoolean(true); } pub fn rstStream(this: *H2FrameParser, globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) bun.JSError!JSValue { + log("rstStream", .{}); JSC.markBinding(@src()); const args_list = callframe.arguments_old(2); if (args_list.len < 2) { @@ -3146,14 +3147,9 @@ pub const H2FrameParser = struct { return globalObject.throw("Invalid stream id", .{}); } - var stream = this.streams.getPtr(stream_id) orelse { + const stream = this.streams.getPtr(stream_id) orelse { return globalObject.throw("Invalid stream id", .{}); }; - - if (!stream.canSendData() and !stream.canReceiveData()) { - return JSC.JSValue.jsBoolean(false); - } - if (!error_arg.isNumber()) { return globalObject.throw("Invalid ErrorCode", .{}); } @@ -3467,7 +3463,13 @@ pub const H2FrameParser = struct { const identifier = stream.getIdentifier(); identifier.ensureStillAlive(); stream.freeResources(this, false); - stream.rstCode = @intFromEnum(ErrorCode.COMPRESSION_ERROR); + stream.rstCode = @intFromEnum(ErrorCode.FRAME_SIZE_ERROR); + this.dispatchWith2Extra( + .onFrameError, + identifier, + JSC.JSValue.jsNumber(@intFromEnum(FrameType.HTTP_FRAME_HEADERS)), + JSC.JSValue.jsNumber(@intFromEnum(ErrorCode.FRAME_SIZE_ERROR)), + ); this.dispatchWithExtra(.onStreamError, identifier, JSC.JSValue.jsNumber(stream.rstCode)); return .undefined; }; @@ -3498,8 +3500,16 @@ pub const H2FrameParser = struct { const identifier = stream.getIdentifier(); identifier.ensureStillAlive(); stream.freeResources(this, false); - stream.rstCode = @intFromEnum(ErrorCode.COMPRESSION_ERROR); + stream.rstCode = @intFromEnum(ErrorCode.FRAME_SIZE_ERROR); + this.dispatchWith2Extra( + .onFrameError, + identifier, + JSC.JSValue.jsNumber(@intFromEnum(FrameType.HTTP_FRAME_HEADERS)), + JSC.JSValue.jsNumber(@intFromEnum(ErrorCode.FRAME_SIZE_ERROR)), + ); + this.dispatchWithExtra(.onStreamError, identifier, JSC.JSValue.jsNumber(stream.rstCode)); + return .undefined; }; } @@ -3703,9 +3713,6 @@ pub const H2FrameParser = struct { var it = StreamResumableIterator.init(this); while (it.next()) |stream| { // this is the oposite logic of emitErrorToallStreams, in this case we wanna to cancel this streams - if (this.isServer) { - if (stream.id % 2 == 0) continue; - } else if (stream.id % 2 != 0) continue; if (stream.state != .CLOSED) { const old_state = stream.state; stream.state = .CLOSED; diff --git a/src/js/node/http2.ts b/src/js/node/http2.ts index d908e5f87a..1e6d5e893c 100644 --- a/src/js/node/http2.ts +++ b/src/js/node/http2.ts @@ -1880,6 +1880,7 @@ class Http2Stream extends Duplex { validateFunction(callback, "callback"); this.once("close", callback); } + this.push(null); const { ending } = this._writableState; if (!ending) { // If the writable side of the Http2Stream is still open, emit the @@ -2987,7 +2988,7 @@ class ServerHttp2Session extends Http2Session { // Gracefully closes the Http2Session, allowing any existing streams to complete on their own and preventing new Http2Stream instances from being created. Once closed, http2session.destroy() might be called if there are no open Http2Stream instances. // If specified, the callback function is registered as a handler for the 'close' event. - close(callback: Function) { + close(callback?: Function) { this.#closed = true; if (typeof callback === "function") { diff --git a/test/js/node/test/parallel/test-http2-client-jsstream-destroy.js b/test/js/node/test/parallel/test-http2-client-jsstream-destroy.js new file mode 100644 index 0000000000..329cf74c72 --- /dev/null +++ b/test/js/node/test/parallel/test-http2-client-jsstream-destroy.js @@ -0,0 +1,58 @@ +'use strict'; + +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); +const h2 = require('http2'); +const tls = require('tls'); +const fixtures = require('../common/fixtures'); +const { Duplex } = require('stream'); + +const server = h2.createSecureServer({ + key: fixtures.readKey('agent1-key.pem'), + cert: fixtures.readKey('agent1-cert.pem') +}); + +class JSSocket extends Duplex { + constructor(socket) { + super({ emitClose: true }); + socket.on('close', () => this.destroy()); + socket.on('data', (data) => this.push(data)); + this.socket = socket; + } + + _write(data, encoding, callback) { + this.socket.write(data, encoding, callback); + } + + _read(size) { + } + + _final(cb) { + cb(); + } +} + +server.listen(0, common.mustCall(function() { + const socket = tls.connect({ + rejectUnauthorized: false, + host: 'localhost', + port: this.address().port, + ALPNProtocols: ['h2'] + }, () => { + const proxy = new JSSocket(socket); + const client = h2.connect(`https://127.0.0.1:${this.address().port}`, { + createConnection: () => proxy + }); + const req = client.request(); + + server.on('request', () => { + socket.destroy(); + }); + + req.on('close', common.mustCall(() => { + client.close(); + server.close(); + })); + }); +})); diff --git a/test/js/node/test/parallel/test-http2-stream-destroy-event-order.js b/test/js/node/test/parallel/test-http2-stream-destroy-event-order.js new file mode 100644 index 0000000000..b25ccdc015 --- /dev/null +++ b/test/js/node/test/parallel/test-http2-stream-destroy-event-order.js @@ -0,0 +1,28 @@ +'use strict'; + +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); +const http2 = require('http2'); + +let client; +let req; +const server = http2.createServer(); +server.on('stream', common.mustCall((stream) => { + stream.on('error', common.mustCall(() => { + client.close(); + stream.on('close', common.mustCall(() => { + server.close(); + })); + })); + + req.close(2); +})); +server.listen(0, common.mustCall(() => { + client = http2.connect(`http://127.0.0.1:${server.address().port}`); + req = client.request(); + req.resume(); + req.on('error', common.mustCall(() => { + req.on('close', common.mustCall()); + })); +})); diff --git a/test/js/third_party/grpc-js/test-call-propagation.test.ts b/test/js/third_party/grpc-js/test-call-propagation.test.ts index 99bb8c093a..5db8ab7655 100644 --- a/test/js/third_party/grpc-js/test-call-propagation.test.ts +++ b/test/js/third_party/grpc-js/test-call-propagation.test.ts @@ -73,7 +73,7 @@ describe("Call propagation", () => { proxyServer.forceShutdown(); }); describe("Cancellation", () => { - it.todo("should work with unary requests", done => { + it("should work with unary requests", done => { done = multiDone(done, 2); // eslint-disable-next-line prefer-const let call: grpc.ClientUnaryCall; @@ -119,7 +119,7 @@ describe("Call propagation", () => { done(); }); }); - it.todo("Should work with server streaming requests", done => { + it("Should work with server streaming requests", done => { done = multiDone(done, 2); // eslint-disable-next-line prefer-const let call: grpc.ClientReadableStream;