mirror of
https://github.com/oven-sh/bun
synced 2026-02-16 22:01:47 +00:00
more fixes
This commit is contained in:
@@ -3129,6 +3129,7 @@ pub const H2FrameParser = struct {
|
||||
return JSC.JSValue.jsBoolean(true);
|
||||
}
|
||||
pub fn rstStream(this: *H2FrameParser, globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) bun.JSError!JSValue {
|
||||
log("rstStream", .{});
|
||||
JSC.markBinding(@src());
|
||||
const args_list = callframe.arguments_old(2);
|
||||
if (args_list.len < 2) {
|
||||
@@ -3146,14 +3147,9 @@ pub const H2FrameParser = struct {
|
||||
return globalObject.throw("Invalid stream id", .{});
|
||||
}
|
||||
|
||||
var stream = this.streams.getPtr(stream_id) orelse {
|
||||
const stream = this.streams.getPtr(stream_id) orelse {
|
||||
return globalObject.throw("Invalid stream id", .{});
|
||||
};
|
||||
|
||||
if (!stream.canSendData() and !stream.canReceiveData()) {
|
||||
return JSC.JSValue.jsBoolean(false);
|
||||
}
|
||||
|
||||
if (!error_arg.isNumber()) {
|
||||
return globalObject.throw("Invalid ErrorCode", .{});
|
||||
}
|
||||
@@ -3467,7 +3463,13 @@ pub const H2FrameParser = struct {
|
||||
const identifier = stream.getIdentifier();
|
||||
identifier.ensureStillAlive();
|
||||
stream.freeResources(this, false);
|
||||
stream.rstCode = @intFromEnum(ErrorCode.COMPRESSION_ERROR);
|
||||
stream.rstCode = @intFromEnum(ErrorCode.FRAME_SIZE_ERROR);
|
||||
this.dispatchWith2Extra(
|
||||
.onFrameError,
|
||||
identifier,
|
||||
JSC.JSValue.jsNumber(@intFromEnum(FrameType.HTTP_FRAME_HEADERS)),
|
||||
JSC.JSValue.jsNumber(@intFromEnum(ErrorCode.FRAME_SIZE_ERROR)),
|
||||
);
|
||||
this.dispatchWithExtra(.onStreamError, identifier, JSC.JSValue.jsNumber(stream.rstCode));
|
||||
return .undefined;
|
||||
};
|
||||
@@ -3498,8 +3500,16 @@ pub const H2FrameParser = struct {
|
||||
const identifier = stream.getIdentifier();
|
||||
identifier.ensureStillAlive();
|
||||
stream.freeResources(this, false);
|
||||
stream.rstCode = @intFromEnum(ErrorCode.COMPRESSION_ERROR);
|
||||
stream.rstCode = @intFromEnum(ErrorCode.FRAME_SIZE_ERROR);
|
||||
this.dispatchWith2Extra(
|
||||
.onFrameError,
|
||||
identifier,
|
||||
JSC.JSValue.jsNumber(@intFromEnum(FrameType.HTTP_FRAME_HEADERS)),
|
||||
JSC.JSValue.jsNumber(@intFromEnum(ErrorCode.FRAME_SIZE_ERROR)),
|
||||
);
|
||||
|
||||
this.dispatchWithExtra(.onStreamError, identifier, JSC.JSValue.jsNumber(stream.rstCode));
|
||||
|
||||
return .undefined;
|
||||
};
|
||||
}
|
||||
@@ -3703,9 +3713,6 @@ pub const H2FrameParser = struct {
|
||||
var it = StreamResumableIterator.init(this);
|
||||
while (it.next()) |stream| {
|
||||
// this is the oposite logic of emitErrorToallStreams, in this case we wanna to cancel this streams
|
||||
if (this.isServer) {
|
||||
if (stream.id % 2 == 0) continue;
|
||||
} else if (stream.id % 2 != 0) continue;
|
||||
if (stream.state != .CLOSED) {
|
||||
const old_state = stream.state;
|
||||
stream.state = .CLOSED;
|
||||
|
||||
@@ -1880,6 +1880,7 @@ class Http2Stream extends Duplex {
|
||||
validateFunction(callback, "callback");
|
||||
this.once("close", callback);
|
||||
}
|
||||
this.push(null);
|
||||
const { ending } = this._writableState;
|
||||
if (!ending) {
|
||||
// If the writable side of the Http2Stream is still open, emit the
|
||||
@@ -2987,7 +2988,7 @@ class ServerHttp2Session extends Http2Session {
|
||||
|
||||
// Gracefully closes the Http2Session, allowing any existing streams to complete on their own and preventing new Http2Stream instances from being created. Once closed, http2session.destroy() might be called if there are no open Http2Stream instances.
|
||||
// If specified, the callback function is registered as a handler for the 'close' event.
|
||||
close(callback: Function) {
|
||||
close(callback?: Function) {
|
||||
this.#closed = true;
|
||||
|
||||
if (typeof callback === "function") {
|
||||
|
||||
@@ -0,0 +1,58 @@
|
||||
'use strict';
|
||||
|
||||
const common = require('../common');
|
||||
if (!common.hasCrypto)
|
||||
common.skip('missing crypto');
|
||||
const h2 = require('http2');
|
||||
const tls = require('tls');
|
||||
const fixtures = require('../common/fixtures');
|
||||
const { Duplex } = require('stream');
|
||||
|
||||
const server = h2.createSecureServer({
|
||||
key: fixtures.readKey('agent1-key.pem'),
|
||||
cert: fixtures.readKey('agent1-cert.pem')
|
||||
});
|
||||
|
||||
class JSSocket extends Duplex {
|
||||
constructor(socket) {
|
||||
super({ emitClose: true });
|
||||
socket.on('close', () => this.destroy());
|
||||
socket.on('data', (data) => this.push(data));
|
||||
this.socket = socket;
|
||||
}
|
||||
|
||||
_write(data, encoding, callback) {
|
||||
this.socket.write(data, encoding, callback);
|
||||
}
|
||||
|
||||
_read(size) {
|
||||
}
|
||||
|
||||
_final(cb) {
|
||||
cb();
|
||||
}
|
||||
}
|
||||
|
||||
server.listen(0, common.mustCall(function() {
|
||||
const socket = tls.connect({
|
||||
rejectUnauthorized: false,
|
||||
host: 'localhost',
|
||||
port: this.address().port,
|
||||
ALPNProtocols: ['h2']
|
||||
}, () => {
|
||||
const proxy = new JSSocket(socket);
|
||||
const client = h2.connect(`https://127.0.0.1:${this.address().port}`, {
|
||||
createConnection: () => proxy
|
||||
});
|
||||
const req = client.request();
|
||||
|
||||
server.on('request', () => {
|
||||
socket.destroy();
|
||||
});
|
||||
|
||||
req.on('close', common.mustCall(() => {
|
||||
client.close();
|
||||
server.close();
|
||||
}));
|
||||
});
|
||||
}));
|
||||
@@ -0,0 +1,28 @@
|
||||
'use strict';
|
||||
|
||||
const common = require('../common');
|
||||
if (!common.hasCrypto)
|
||||
common.skip('missing crypto');
|
||||
const http2 = require('http2');
|
||||
|
||||
let client;
|
||||
let req;
|
||||
const server = http2.createServer();
|
||||
server.on('stream', common.mustCall((stream) => {
|
||||
stream.on('error', common.mustCall(() => {
|
||||
client.close();
|
||||
stream.on('close', common.mustCall(() => {
|
||||
server.close();
|
||||
}));
|
||||
}));
|
||||
|
||||
req.close(2);
|
||||
}));
|
||||
server.listen(0, common.mustCall(() => {
|
||||
client = http2.connect(`http://127.0.0.1:${server.address().port}`);
|
||||
req = client.request();
|
||||
req.resume();
|
||||
req.on('error', common.mustCall(() => {
|
||||
req.on('close', common.mustCall());
|
||||
}));
|
||||
}));
|
||||
@@ -73,7 +73,7 @@ describe("Call propagation", () => {
|
||||
proxyServer.forceShutdown();
|
||||
});
|
||||
describe("Cancellation", () => {
|
||||
it.todo("should work with unary requests", done => {
|
||||
it("should work with unary requests", done => {
|
||||
done = multiDone(done, 2);
|
||||
// eslint-disable-next-line prefer-const
|
||||
let call: grpc.ClientUnaryCall;
|
||||
@@ -119,7 +119,7 @@ describe("Call propagation", () => {
|
||||
done();
|
||||
});
|
||||
});
|
||||
it.todo("Should work with server streaming requests", done => {
|
||||
it("Should work with server streaming requests", done => {
|
||||
done = multiDone(done, 2);
|
||||
// eslint-disable-next-line prefer-const
|
||||
let call: grpc.ClientReadableStream<unknown>;
|
||||
|
||||
Reference in New Issue
Block a user