Compare commits

...

3 Commits

Author SHA1 Message Date
Ciro Spaciari
3645624579 emit end and close when aborted 2024-05-21 13:10:19 -03:00
Ciro Spaciari
0eff3c5d08 aborted behavior 2024-05-21 13:06:31 -03:00
Ciro Spaciari
4d7ea6c1b6 remove streamError emits 2024-05-21 11:35:33 -03:00
3 changed files with 33 additions and 32 deletions

View File

@@ -1726,7 +1726,7 @@ pub const H2FrameParser = struct {
if (stream.signal) |_signal| {
return JSC.JSValue.jsBoolean(_signal.aborted());
}
return JSC.JSValue.jsBoolean(true);
return JSC.JSValue.jsBoolean(stream.rstCode == @intFromEnum(ErrorCode.CANCEL));
}
pub fn getStreamState(this: *H2FrameParser, globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSValue {
JSC.markBinding(@src());

View File

@@ -654,31 +654,33 @@ function emitStreamNT(self, streams, streamId) {
function emitStreamErrorNT(self, streams, streamId, error, destroy) {
const stream = streams.get(streamId);
const error_instance = streamErrorFromCode(error);
if (stream) {
if (!stream[bunHTTP2Closed]) {
stream[bunHTTP2Closed] = true;
}
stream.rstCode = error;
stream.emit("error", error_instance);
if (destroy) stream.destroy(error_instance);
if (constants.NGHTTP2_CANCEL != error && !stream.aborted) {
const error_instance = streamErrorFromCode(error);
stream.emit("error", error_instance);
if (destroy) stream.destroy(error_instance, error);
} else {
if (destroy) stream.destroy();
}
}
self.emit("streamError", error_instance);
}
function emitAbortedNT(self, streams, streamId, error) {
const stream = streams.get(streamId);
const error_instance = streamErrorFromCode(constants.NGHTTP2_CANCEL);
if (stream) {
if (!stream[bunHTTP2Closed]) {
stream[bunHTTP2Closed] = true;
}
stream.rstCode = constants.NGHTTP2_CANCEL;
stream.emit("aborted", error);
stream.emit("error", error_instance);
stream.emit("aborted");
stream.destroy();
stream.emit("end");
stream.emit("close");
}
self.emit("streamError", error_instance);
}
class ClientHttp2Session extends Http2Session {
/// close indicates that we called closed
@@ -720,13 +722,14 @@ class ClientHttp2Session extends Http2Session {
if (!self) return;
var stream = self.#streams.get(streamId);
if (stream) {
const error_instance = streamErrorFromCode(error);
if (!stream[bunHTTP2Closed]) {
stream[bunHTTP2Closed] = true;
}
stream.rstCode = error;
stream.emit("error", error_instance);
self.emit("streamError", error_instance);
if (error !== constants.NGHTTP2_CANCEL && !stream.aborted) {
stream.rstCode = error;
const error_instance = streamErrorFromCode(error);
stream.emit("error", error_instance);
}
} else {
process.nextTick(emitStreamErrorNT, self, self.#streams, streamId, error);
}
@@ -834,15 +837,15 @@ class ClientHttp2Session extends Http2Session {
if (!self) return;
var stream = self.#streams.get(streamId);
if (stream) {
const error_instance = streamErrorFromCode(constants.NGHTTP2_CANCEL);
if (!stream[bunHTTP2Closed]) {
stream[bunHTTP2Closed] = true;
}
stream.rstCode = constants.NGHTTP2_CANCEL;
stream.emit("aborted", error);
stream.emit("error", error_instance);
self.emit("streamError", error_instance);
stream.emit("aborted");
stream.destroy();
stream.emit("end");
stream.emit("close");
} else {
process.nextTick(emitAbortedNT, self, self.#streams, streamId, error);
}
@@ -863,7 +866,7 @@ class ClientHttp2Session extends Http2Session {
if (errorCode !== 0) {
for (let [_, stream] of self.#streams) {
stream.rstCode = errorCode;
stream.destroy(sessionErrorFromCode(errorCode));
stream.destroy(sessionErrorFromCode(errorCode), errorCode);
}
}
self[bunHTTP2Socket]?.end();
@@ -1155,7 +1158,7 @@ class ClientHttp2Session extends Http2Session {
this[bunHTTP2Socket] = null;
// this should not be needed since RST + GOAWAY should be sent
for (let [_, stream] of this.#streams) {
if (error) {
if (error && code != constants.NGHTTP2_CANCEL) {
stream.emit("error", error);
}
stream.destroy();

View File

@@ -81,6 +81,7 @@ function doHttp2Request(url, headers, payload, options, request_options) {
data += chunk;
});
req.on("error", reject);
req.on("aborted", reject);
req.on("end", () => {
resolve({ data, headers: response_headers });
client.close();
@@ -636,8 +637,8 @@ describe("Client Basics", () => {
await promise;
expect("unreachable").toBe(true);
} catch (err) {
expect(err.code).toBe("ERR_HTTP2_STREAM_ERROR");
expect(err.message).toBe("Stream closed with error code 8");
// aborted event should not pass any arguments
expect(err).toBeUndefined();
}
});
it("aborted event should work with abortController", async () => {
@@ -647,16 +648,16 @@ describe("Client Basics", () => {
client.on("error", reject);
const req = client.request({ ":path": "/" }, { signal: abortController.signal });
req.on("aborted", resolve);
// error should not be called when aborted
req.on("error", reject);
req.on("end", () => {
resolve();
client.close();
});
abortController.abort();
const result = await promise;
expect(result).toBeDefined();
expect(result.name).toBe("AbortError");
expect(result.message).toBe("The operation was aborted.");
expect(result.code).toBe(20);
// https://nodejs.org/api/http2.html#event-aborted
// aborted event will not pass any arguments
await promise;
expect(req.aborted).toBeTrue();
expect(req.rstCode).toBe(8);
});
@@ -666,15 +667,12 @@ describe("Client Basics", () => {
client.on("error", reject);
const req = client.request({ ":path": "/" }, { signal: AbortSignal.abort() });
req.on("aborted", resolve);
req.on("error", reject);
req.on("end", () => {
resolve();
client.close();
});
const result = await promise;
expect(result).toBeDefined();
expect(result.name).toBe("AbortError");
expect(result.message).toBe("The operation was aborted.");
expect(result.code).toBe(20);
await promise;
expect(req.rstCode).toBe(8);
expect(req.aborted).toBeTrue();
});