Compare commits

...

1 Commits

Author SHA1 Message Date
cirospaciari
748eb45476 estimated size 2024-10-17 22:33:23 -07:00
5 changed files with 89 additions and 36 deletions

View File

@@ -734,6 +734,8 @@ pub const H2FrameParser = struct {
autouncork_registered: bool = false,
has_nonnative_backpressure: bool = false,
ref_count: u8 = 1,
has_pending_activity: std.atomic.Value(bool) = std.atomic.Value(bool).init(true),
reported_estimated_size: usize = 0,
threadlocal var shared_request_buffer: [16384]u8 = undefined;
/// The streams hashmap may mutate when growing we use this when we need to make sure its safe to iterate over it
@@ -919,6 +921,7 @@ pub const H2FrameParser = struct {
.max => return @min(maxLen - frameLen, 255),
}
}
pub fn flushQueue(this: *Stream, client: *H2FrameParser, written: *usize) FlushState {
if (this.canSendData()) {
// flush one frame
@@ -943,6 +946,7 @@ pub const H2FrameParser = struct {
} else {
// flush with some payload
client.queuedDataSize -= frame.len;
const padding = this.getPadding(frame.len, MAX_PAYLOAD_SIZE_WITHOUT_FRAME - 1);
const payload_size = frame.len + (if (padding != 0) padding + 1 else 0);
var flags: u8 = if (frame.end_stream and !this.waitForTrailers) @intFromEnum(DataFrameFlags.END_STREAM) else 0;
@@ -1051,12 +1055,13 @@ pub const H2FrameParser = struct {
};
if (bytes.len > 0) {
@memcpy(frame.buffer[0..bytes.len], bytes);
client.globalThis.vm().reportExtraMemory(bytes.len);
}
log("dataFrame enqueued {}", .{frame.len});
this.dataFrameQueue.enqueue(frame, client.allocator);
client.outboundQueueSize += 1;
client.queuedDataSize += frame.len;
client.calculateEstimatedByteSize();
}
pub fn init(streamIdentifier: u32, initialWindowSize: u32) Stream {
@@ -1132,6 +1137,8 @@ pub const H2FrameParser = struct {
frame.deinit(client.allocator);
client.outboundQueueSize -= 1;
}
client.calculateEstimatedByteSize();
}
/// this can be called multiple times
pub fn freeResources(this: *Stream, client: *H2FrameParser, comptime finalizing: bool) void {
@@ -1473,6 +1480,7 @@ pub const H2FrameParser = struct {
this.writeBuffer.len = MAX_BUFFER_SIZE;
this.writeBuffer.shrinkAndFree(this.allocator, MAX_BUFFER_SIZE);
this.writeBuffer.clearRetainingCapacity();
this.calculateEstimatedByteSize();
}
log("_genericFlush {}", .{buffer.len});
} else {
@@ -1483,6 +1491,8 @@ pub const H2FrameParser = struct {
pub fn _genericWrite(this: *H2FrameParser, comptime T: type, socket: T, bytes: []const u8) bool {
log("_genericWrite {}", .{bytes.len});
defer this.calculateEstimatedByteSize();
const buffer = this.writeBuffer.slice()[this.writeBufferOffset..];
if (buffer.len > 0) {
@@ -1494,7 +1504,6 @@ pub const H2FrameParser = struct {
// we still have more to buffer and even more now
_ = this.writeBuffer.write(this.allocator, bytes) catch bun.outOfMemory();
this.globalThis.vm().reportExtraMemory(bytes.len);
log("_genericWrite flushed {} and buffered more {}", .{ written, bytes.len });
return false;
@@ -1510,7 +1519,6 @@ pub const H2FrameParser = struct {
const pending = bytes[written..];
// ops not all data was sent, lets buffer again
_ = this.writeBuffer.write(this.allocator, pending) catch bun.outOfMemory();
this.globalThis.vm().reportExtraMemory(pending.len);
log("_genericWrite buffered more {}", .{pending.len});
return false;
@@ -1521,6 +1529,7 @@ pub const H2FrameParser = struct {
this.writeBuffer.len = MAX_BUFFER_SIZE;
this.writeBuffer.shrinkAndFree(this.allocator, MAX_BUFFER_SIZE);
this.writeBuffer.clearRetainingCapacity();
this.calculateEstimatedByteSize();
}
return true;
}
@@ -1530,7 +1539,6 @@ pub const H2FrameParser = struct {
const pending = bytes[written..];
// ops not all data was sent, lets buffer again
_ = this.writeBuffer.write(this.allocator, pending) catch bun.outOfMemory();
this.globalThis.vm().reportExtraMemory(pending.len);
return false;
}
@@ -1540,6 +1548,8 @@ pub const H2FrameParser = struct {
fn flushStreamQueue(this: *H2FrameParser) usize {
log("flushStreamQueue {}", .{this.outboundQueueSize});
var written: usize = 0;
defer this.calculateEstimatedByteSize();
// try to send as much as we can until we reach backpressure
while (this.outboundQueueSize > 0) {
var it = StreamResumableIterator.init(this);
@@ -1575,6 +1585,7 @@ pub const H2FrameParser = struct {
this.writeBuffer.len = MAX_BUFFER_SIZE;
this.writeBuffer.shrinkAndFree(this.allocator, MAX_BUFFER_SIZE);
this.writeBuffer.clearRetainingCapacity();
this.calculateEstimatedByteSize();
}
}
const output_value = this.handlers.binary_type.toJS(bytes, this.handlers.globalObject);
@@ -1602,10 +1613,11 @@ pub const H2FrameParser = struct {
.tls_writeonly, .tls => |socket| this._genericWrite(*TLSSocket, socket, bytes),
.tcp_writeonly, .tcp => |socket| this._genericWrite(*TCPSocket, socket, bytes),
else => {
defer this.calculateEstimatedByteSize();
if (this.has_nonnative_backpressure) {
// we should not invoke JS when we have backpressure is cheaper to keep it queued here
_ = this.writeBuffer.write(this.allocator, bytes) catch bun.outOfMemory();
this.globalThis.vm().reportExtraMemory(bytes.len);
return false;
}
@@ -1617,7 +1629,6 @@ pub const H2FrameParser = struct {
-1 => {
// dropped
_ = this.writeBuffer.write(this.allocator, bytes) catch bun.outOfMemory();
this.globalThis.vm().reportExtraMemory(bytes.len);
this.has_nonnative_backpressure = true;
},
0 => {
@@ -1648,6 +1659,7 @@ pub const H2FrameParser = struct {
if (bytes.len > 0) {
_ = this._write(bytes);
}
CORKED_H2 = null;
}
}
}
@@ -1704,7 +1716,7 @@ pub const H2FrameParser = struct {
if (this.remainingLength > 0) {
// buffer more data
_ = this.readBuffer.appendSlice(payload) catch bun.outOfMemory();
this.globalThis.vm().reportExtraMemory(payload.len);
this.calculateEstimatedByteSize();
return null;
} else if (this.remainingLength < 0) {
@@ -1717,7 +1729,8 @@ pub const H2FrameParser = struct {
if (this.readBuffer.list.items.len > 0) {
// return buffered data
_ = this.readBuffer.appendSlice(payload) catch bun.outOfMemory();
this.globalThis.vm().reportExtraMemory(payload.len);
this.calculateEstimatedByteSize();
return .{
.data = this.readBuffer.list.items,
@@ -2198,6 +2211,7 @@ pub const H2FrameParser = struct {
const settings = this.remoteSettings orelse this.localSettings;
const entry = this.streams.getOrPut(streamIdentifier) catch bun.outOfMemory();
entry.value_ptr.* = Stream.init(streamIdentifier, settings.initialWindowSize);
this.calculateEstimatedByteSize();
const ctx_value = this.strong_ctx.get() orelse return entry.value_ptr;
const callback = this.handlers.onStreamStart;
if (callback != .zero) {
@@ -2256,7 +2270,8 @@ pub const H2FrameParser = struct {
if (total < FrameHeader.byteSize) {
// buffer more data
_ = this.readBuffer.appendSlice(bytes) catch bun.outOfMemory();
this.globalThis.vm().reportExtraMemory(bytes.len);
this.calculateEstimatedByteSize();
return bytes.len;
}
@@ -2296,7 +2311,7 @@ pub const H2FrameParser = struct {
if (bytes.len < FrameHeader.byteSize) {
// buffer more dheaderata
this.readBuffer.appendSlice(bytes) catch bun.outOfMemory();
this.globalThis.vm().reportExtraMemory(bytes.len);
this.calculateEstimatedByteSize();
return bytes.len;
}
@@ -3111,6 +3126,27 @@ pub const H2FrameParser = struct {
this.dispatchWithExtra(.onStreamEnd, identifier, JSC.JSValue.jsNumber(@intFromEnum(stream.state)));
return .undefined;
}
fn getNextStreamID(this: *H2FrameParser) u32 {
var stream_id: u32 = this.lastStreamID;
if (this.isServer) {
if (stream_id % 2 == 0) {
stream_id += 2;
} else {
stream_id += 1;
}
} else {
if (stream_id % 2 == 0) {
stream_id += 1;
} else if (stream_id == 0) {
stream_id = 1;
} else {
stream_id += 2;
}
}
return stream_id;
}
pub fn writeStream(this: *H2FrameParser, globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) JSValue {
JSC.markBinding(@src());
const args = callframe.argumentsUndef(5);
@@ -3168,26 +3204,6 @@ pub const H2FrameParser = struct {
return JSC.JSValue.jsBoolean(true);
}
fn getNextStreamID(this: *H2FrameParser) u32 {
var stream_id: u32 = this.lastStreamID;
if (this.isServer) {
if (stream_id % 2 == 0) {
stream_id += 2;
} else {
stream_id += 1;
}
} else {
if (stream_id % 2 == 0) {
stream_id += 1;
} else if (stream_id == 0) {
stream_id = 1;
} else {
stream_id += 2;
}
}
return stream_id;
}
pub fn hasNativeRead(this: *H2FrameParser, _: *JSC.JSGlobalObject, _: *JSC.CallFrame) JSValue {
return JSC.JSValue.jsBoolean(this.native_socket == .tcp or this.native_socket == .tls);
}
@@ -3817,6 +3833,7 @@ pub const H2FrameParser = struct {
});
}
};
this.calculateEstimatedByteSize();
// check if socket is provided, and if it is a valid native socket
if (options.get(globalObject, "native")) |socket_js| {
if (JSTLSSocket.fromJS(socket_js)) |socket| {
@@ -3894,14 +3911,17 @@ pub const H2FrameParser = struct {
}
return this;
}
pub fn detachFromJS(this: *H2FrameParser, _: *JSC.JSGlobalObject, _: *JSC.CallFrame) JSValue {
pub fn detachFromJS(this: *H2FrameParser, _: *JSC.JSGlobalObject, _: *JSC.CallFrame, this_value: JSC.JSValue) JSValue {
JSC.markBinding(@src());
this.detach(false);
_ = H2FrameParser.dangerouslySetPtr(this_value, null);
this.deref();
return .undefined;
}
/// be careful when calling detach be sure that the socket is closed and the parser not accesible anymore
/// this function can be called multiple times, it will erase stream info
pub fn detach(this: *H2FrameParser, comptime finalizing: bool) void {
log("detach {s}", .{ if(this.isServer) "server" else "client" });
this.flushCorked();
this.detachNativeSocket();
this.strong_ctx.deinit();
@@ -3922,8 +3942,12 @@ pub const H2FrameParser = struct {
stream.freeResources(this, finalizing);
}
var streams = this.streams;
defer streams.deinit();
this.streams = bun.U32HashMap(Stream).init(bun.default_allocator);
streams.deinit();
this.has_pending_activity.store(false, .release);
this.calculateEstimatedByteSize();
}
pub fn deinit(this: *H2FrameParser) void {
@@ -3939,10 +3963,28 @@ pub const H2FrameParser = struct {
this.detach(true);
}
pub fn estimatedSize(this: ?*H2FrameParser) callconv(.C) usize {
if(this == null) {
return 0;
}
return this.?.reported_estimated_size;
}
pub fn calculateEstimatedByteSize(this: *H2FrameParser) void {
this.reported_estimated_size = @sizeOf(H2FrameParser) + this.writeBuffer.len + this.queuedDataSize + (this.streams.capacity() * @sizeOf(Stream));
}
pub fn hasPendingActivity(this: ?*H2FrameParser) callconv(.C) bool {
@fence(.acquire);
if(this == null) {
return false;
}
return this.?.has_pending_activity.load(.acquire);
}
pub fn finalize(
this: *H2FrameParser,
) void {
log("finalize", .{});
log("finalize {s}", .{ if(this.isServer) "server" else "client" });
this.deref();
}
};

View File

@@ -4,6 +4,11 @@ export default [
define({
name: "H2FrameParser",
JSType: "0b11101110",
finalize: true,
construct: true,
hasPendingActivity: true,
estimatedSize: true,
klass: {},
proto: {
request: {
fn: "request",
@@ -40,6 +45,7 @@ export default [
detach: {
fn: "detachFromJS",
length: 0,
passThis: true,
},
rstStream: {
fn: "rstStream",
@@ -106,8 +112,5 @@ export default [
length: 0,
},
},
finalize: true,
construct: true,
klass: {},
}),
];

View File

@@ -17,6 +17,7 @@ WTF_MAKE_ISO_ALLOCATED_IMPL(StrongRef);
extern "C" void Bun__StrongRef__delete(Bun::StrongRef* strongRef)
{
strongRef->m_cell.clear();
delete strongRef;
}

View File

@@ -28,6 +28,11 @@ public:
{
}
~StrongRef()
{
m_cell.clear();
}
JSC::Strong<JSC::Unknown> m_cell;
};

View File

@@ -2388,9 +2388,11 @@ class ServerHttp2Session extends Http2Session {
this.#parser = null;
}
this.close();
this[bunHTTP2Socket] = null;
}
#onError(error: Error) {
this[bunHTTP2Socket] = null;
this.destroy(error);
}