This commit is contained in:
Ciro Spaciari
2025-05-16 21:45:19 -07:00
parent d3ba69ba47
commit 43cff4bc42

View File

@@ -113,7 +113,10 @@ const UInt31WithReserved = packed struct(u32) {
uint31: u31 = 0,
pub fn from(value: u32) UInt31WithReserved {
return @bitCast(value);
return .{
.reserved = false,
.uint31 = @truncate(value),
};
}
pub fn toUInt32(value: UInt31WithReserved) u32 {
@@ -966,11 +969,12 @@ pub const H2FrameParser = struct {
break :brk dataHeader.write(@TypeOf(writer), writer);
} else {
const frame_slice = frame.slice();
const max_size = @min(frame_slice.len, this.remoteWindowSize - this.remoteUsedWindowSize, this.remoteWindowSize - this.remoteUsedWindowSize);
const max_size = @min(@min(frame_slice.len, this.remoteWindowSize - this.remoteUsedWindowSize, client.remoteWindowSize - client.remoteUsedWindowSize), MAX_PAYLOAD_SIZE_WITHOUT_FRAME);
if (max_size == 0) {
is_flow_control_limited = true;
log("dataFrame flow control limited", .{});
// we are flow control limited
return .no_action;
// we are flow control limited lets return backpressure if is limited in the connection so we short circuit the flush
return if (client.remoteWindowSize == client.remoteUsedWindowSize) .backpressure else .no_action;
}
if (max_size < frame_slice.len) {
is_flow_control_limited = true;
@@ -979,6 +983,8 @@ pub const H2FrameParser = struct {
const able_to_send = frame_slice[0..max_size];
client.queuedDataSize -= able_to_send.len;
written.* += frame_slice.len;
this.remoteUsedWindowSize += able_to_send.len;
client.remoteUsedWindowSize += able_to_send.len;
log("dataFrame partial flushed {} {}", .{ able_to_send.len, frame.end_stream });
@@ -1008,6 +1014,8 @@ pub const H2FrameParser = struct {
// flush with some payload
client.queuedDataSize -= frame_slice.len;
written.* += frame_slice.len;
this.remoteUsedWindowSize += frame_slice.len;
client.remoteUsedWindowSize += frame_slice.len;
log("dataFrame flushed {} {}", .{ frame_slice.len, frame.end_stream });
const padding = this.getPadding(frame_slice.len, MAX_PAYLOAD_SIZE_WITHOUT_FRAME - 1);
@@ -1235,17 +1243,20 @@ pub const H2FrameParser = struct {
}
fn incrementWindowSizeIfNeeded(this: *H2FrameParser) void {
if (this.usedWindowSize == this.windowSize) {
this.windowSize += WINDOW_INCREMENT_SIZE;
this.sendWindowUpdate(0, UInt31WithReserved.from(WINDOW_INCREMENT_SIZE));
}
var total_increment: u32 = 0;
var it = this.streams.valueIterator();
while (it.next()) |stream| {
if (stream.usedWindowSize == stream.windowSize) {
stream.windowSize += WINDOW_INCREMENT_SIZE;
this.sendWindowUpdate(stream.id, UInt31WithReserved.from(WINDOW_INCREMENT_SIZE));
total_increment += 1;
}
}
if (this.usedWindowSize == this.windowSize and total_increment > 0) {
this.windowSize += WINDOW_INCREMENT_SIZE * total_increment; // we will need at least this many increments to send all the streams
this.sendWindowUpdate(0, UInt31WithReserved.from(WINDOW_INCREMENT_SIZE * total_increment));
}
}
pub fn setSettings(this: *H2FrameParser, settings: FullSettingsPayload) bool {
@@ -1625,14 +1636,18 @@ pub const H2FrameParser = struct {
fn flushStreamQueue(this: *H2FrameParser) usize {
log("flushStreamQueue {}", .{this.outboundQueueSize});
var written: usize = 0;
// try to send as much as we can until we reach backpressure
while (this.outboundQueueSize > 0) {
var something_was_flushed = true;
// try to send as much as we can until we reach backpressure or until we can't flush anymore
while (this.outboundQueueSize > 0 and something_was_flushed) {
var it = StreamResumableIterator.init(this);
something_was_flushed = false;
while (it.next()) |stream| {
// reach backpressure
const result = stream.flushQueue(this, &written);
switch (result) {
.flushed, .no_action => continue, // we can continue
.flushed => something_was_flushed = true,
.no_action => continue, // we can continue
.backpressure => return written, // backpressure we need to return
}
}
@@ -3202,7 +3217,7 @@ pub const H2FrameParser = struct {
while (offset < payload.len) {
// max frame size will always be at least 16384 (but we need to respect the flow control)
var max_size = @min(@min(MAX_PAYLOAD_SIZE_WITHOUT_FRAME, this.remoteWindowSize - stream.remoteUsedWindowSize), stream.remoteWindowSize - stream.remoteUsedWindowSize);
var max_size = @min(@min(MAX_PAYLOAD_SIZE_WITHOUT_FRAME, this.remoteWindowSize - this.remoteUsedWindowSize), stream.remoteWindowSize - stream.remoteUsedWindowSize);
var is_flow_control_limited = false;
if (max_size == 0) {
is_flow_control_limited = true;
@@ -3211,6 +3226,7 @@ pub const H2FrameParser = struct {
}
const size = @min(payload.len - offset, max_size);
defer if (!is_flow_control_limited) {
log("remoteUsedWindowSize += {} {} {}", .{ size, stream.remoteUsedWindowSize, this.remoteUsedWindowSize });
stream.remoteUsedWindowSize += size;
this.remoteUsedWindowSize += size;
};