Work on HTTP/2 implementation

Added partial HTTP/2 support to fetch(). The implementation sends requests successfully but responses don't complete properly yet.

Changes:
- Added callback invocation when HTTP/2 streams end
- Fixed state transitions to set all required fields to .done
- Added CONTINUATION frame support for multi-frame headers
- Enhanced HPACK error reporting with specific error types
- Fixed header block accumulation logic

Current issues:
- HPACK decoder fails with certain header encodings from Node.js servers
- No flow control (WINDOW_UPDATE frames) yet
- No decompression support for HTTP/2 streams

The code compiles but HTTP/2 requests still don't work end-to-end. More work needed on HPACK compatibility.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Claude
2025-08-07 01:02:26 +02:00
parent 0f99389d8b
commit ac74fdd74c
5 changed files with 565 additions and 7 deletions

View File

@@ -30,6 +30,18 @@ pub const HPACK = extern struct {
pub fn decode(self: *HPACK, src: []const u8) !DecodeResult {
var header: lshpack_header = .{};
const offset = lshpack_wrapper_decode(self, src.ptr, src.len, &header);
// Check for error codes returned by the C++ wrapper
if (offset >= std.math.maxInt(usize) - 10) {
const error_code = std.math.maxInt(usize) - offset;
return switch (error_code) {
1 => error.BadHPACKData, // LSHPACK_ERR_BAD_DATA
2 => error.HPACKHeaderTooLarge, // LSHPACK_ERR_TOO_LARGE
3 => error.HPACKNeedMoreBuffer, // LSHPACK_ERR_MORE_BUF
else => error.UnableToDecode,
};
}
if (offset == 0) return error.UnableToDecode;
if (header.name_len == 0) return error.EmptyHeaderName;
@@ -63,4 +75,5 @@ extern fn lshpack_wrapper_decode(self: *HPACK, src: [*]const u8, src_len: usize,
extern fn lshpack_wrapper_encode(self: *HPACK, name: [*]const u8, name_len: usize, value: [*]const u8, value_len: usize, never_index: c_int, buffer: [*]u8, buffer_len: usize, buffer_offset: usize) usize;
const bun = @import("bun");
const std = @import("std");
const mimalloc = bun.mimalloc;

View File

@@ -311,8 +311,15 @@ size_t lshpack_wrapper_decode(lshpack_wrapper* self,
const unsigned char* s = src;
auto rc = lshpack_dec_decode(&self->dec, &s, s + src_len, &hdr);
if (rc != 0)
return 0;
if (rc != 0) {
// For now, handle specific errors more gracefully
// LSHPACK_ERR_BAD_DATA (-1), LSHPACK_ERR_TOO_LARGE (-2), LSHPACK_ERR_MORE_BUF (-3)
// Return a special error value that encodes the lshpack error
// We use SIZE_MAX to indicate errors, with the low bits containing the error code
// This allows the Zig code to distinguish between different error types
return SIZE_MAX - (size_t)(-rc); // Convert negative error to positive offset from SIZE_MAX
}
output->name = lsxpack_header_get_name(&hdr);
output->name_len = hdr.name_len;

View File

@@ -536,6 +536,10 @@ protocol: HTTPProtocol = .unspecified,
http2_hpack_decoder: ?*@import("bun.js/api/bun/lshpack.zig").HPACK = null,
http2_next_stream_id: u32 = 1,
http2_settings_acked: bool = false,
// State for accumulating HPACK header blocks across frames
http2_header_block_buffer: ?[]u8 = null,
http2_header_block_len: usize = 0,
http2_expecting_continuation: bool = false,
pub fn deinit(this: *HTTPClient) void {
if (this.redirect.len > 0) {
@@ -564,12 +568,20 @@ pub fn deinit(this: *HTTPClient) void {
decoder.deinit();
this.http2_hpack_decoder = null;
}
// Clean up HTTP/2 header block buffer
if (this.http2_header_block_buffer) |buffer| {
bun.default_allocator.free(buffer);
this.http2_header_block_buffer = null;
}
}
pub fn upgradeToHTTP2(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) !void {
// This method handles the transition from HTTP/1.1 to HTTP/2 client
// when ALPN negotiation indicates HTTP/2 support
log("upgradeToHTTP2 called, should_use_http2={}, http2_attempted={}", .{this.should_use_http2, this.http2_attempted});
if (!this.should_use_http2) {
return error.HTTP2NotNegotiated;
}
@@ -583,6 +595,7 @@ pub fn upgradeToHTTP2(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPC
// Set HTTP/2 flag in the internal state
this.state.flags.is_http2 = true;
log("Set is_http2 flag to true", .{});
// Initialize HPACK decoder
const lshpack = @import("bun.js/api/bun/lshpack.zig");
@@ -711,9 +724,18 @@ pub fn handleHTTP2Data(
socket: NewHTTPContext(is_ssl).HTTPSocket,
) void {
_ = ctx;
log("handleHTTP2Data: {} bytes", .{incoming_data.len});
// Debug: print first few bytes to see what we're receiving
if (incoming_data.len > 0) {
const preview_len = @min(incoming_data.len, 20);
var hex_buf: [60]u8 = undefined;
var hex_pos: usize = 0;
for (incoming_data[0..preview_len]) |byte| {
const chars = std.fmt.bufPrint(hex_buf[hex_pos..], "{x:0>2} ", .{byte}) catch break;
hex_pos += chars.len;
}
log(" First bytes (hex): {s}", .{hex_buf[0..hex_pos]});
}
// Parse HTTP/2 frames
var data = incoming_data;
@@ -746,7 +768,71 @@ pub fn handleHTTP2Data(
this.closeAndFail(error.HTTP2ProtocolError, is_ssl, socket);
return;
}
// TODO: Handle data frame properly
// Parse DATA frame flags
const end_stream = (frame_flags & 0x01) != 0;
const padded = (frame_flags & 0x08) != 0;
log(" DATA flags: end_stream={}, padded={}", .{end_stream, padded});
var data_payload = frame_payload;
var padding_length: u8 = 0;
// Handle padding
if (padded) {
if (data_payload.len < 1) {
log("Protocol error: PADDED flag set but no padding length", .{});
this.closeAndFail(error.HTTP2ProtocolError, is_ssl, socket);
return;
}
padding_length = data_payload[0];
data_payload = data_payload[1..];
if (padding_length >= data_payload.len) {
log("Protocol error: Invalid padding length", .{});
this.closeAndFail(error.HTTP2ProtocolError, is_ssl, socket);
return;
}
data_payload = data_payload[0..data_payload.len - padding_length];
}
log(" DATA payload: {} bytes", .{data_payload.len});
// Append data to response buffer
if (data_payload.len > 0) {
// Process the body data similar to HTTP/1.1
const report_progress = this.handleResponseBody(data_payload, false) catch |err| {
this.closeAndFail(err, is_ssl, socket);
return;
};
if (report_progress) {
this.progressUpdate(is_ssl, ctx, socket);
}
}
// If END_STREAM is set, the response is complete
if (end_stream) {
log(" END_STREAM flag set, response complete", .{});
this.state.flags.http2_stream_ended = true;
this.state.response_stage = .done;
this.state.request_stage = .done;
this.state.stage = .done;
// Finalize the response
if (this.state.pending_response) |*resp| {
_ = this.handleResponseMetadata(resp) catch |err| {
this.closeAndFail(err, is_ssl, socket);
return;
};
}
// Call the result callback to notify fetch that the response is complete
const callback = this.result_callback;
const result = this.toResult();
callback.run(@fieldParentPtr("client", this), result);
return;
}
},
0x01 => { // HEADERS frame
log("Received HEADERS frame on stream {}", .{stream_id});
@@ -755,7 +841,202 @@ pub fn handleHTTP2Data(
this.closeAndFail(error.HTTP2ProtocolError, is_ssl, socket);
return;
}
// TODO: Handle headers frame with HPACK decompression
// Parse HEADERS frame flags
const end_stream = (frame_flags & 0x01) != 0;
const end_headers = (frame_flags & 0x04) != 0;
const padded = (frame_flags & 0x08) != 0;
const priority = (frame_flags & 0x20) != 0;
log(" HEADERS flags: end_stream={}, end_headers={}, padded={}, priority={}",
.{end_stream, end_headers, padded, priority});
log(" Frame payload length: {}", .{frame_payload.len});
var headers_data = frame_payload;
var padding_length: u8 = 0;
// Handle padding
if (padded) {
if (headers_data.len < 1) {
log("Protocol error: PADDED flag set but no padding length", .{});
this.closeAndFail(error.HTTP2ProtocolError, is_ssl, socket);
return;
}
padding_length = headers_data[0];
headers_data = headers_data[1..];
if (padding_length >= headers_data.len) {
log("Protocol error: Invalid padding length", .{});
this.closeAndFail(error.HTTP2ProtocolError, is_ssl, socket);
return;
}
headers_data = headers_data[0..headers_data.len - padding_length];
}
// Handle priority
if (priority) {
if (headers_data.len < 5) {
log("Protocol error: PRIORITY flag set but insufficient data", .{});
this.closeAndFail(error.HTTP2ProtocolError, is_ssl, socket);
return;
}
// Skip priority data (5 bytes)
headers_data = headers_data[5..];
}
log(" Headers data length after processing: {}", .{headers_data.len});
// If END_HEADERS is set, we have the complete header block
if (end_headers) {
// Decode headers directly if we don't have accumulated data
const headers_to_decode = if (this.http2_header_block_len > 0) blk: {
// We have accumulated data from previous frames, add this and decode all
this.accumulateHeaderBlock(headers_data) catch |err| {
log("Failed to accumulate final header block: {}", .{err});
this.closeAndFail(err, is_ssl, socket);
return;
};
break :blk this.getCompleteHeaderBlock();
} else headers_data; // First and only HEADERS frame
log(" Complete header block length: {}", .{headers_to_decode.len});
const decode_result = this.decodeHTTP2Headers(headers_to_decode) catch |err| {
log("Failed to decode HTTP/2 headers: {}", .{err});
this.clearHeaderBlock();
this.closeAndFail(err, is_ssl, socket);
return;
};
// Clear the accumulated header block
this.clearHeaderBlock();
// Update response with headers
if (decode_result.status_code) |code| {
// Build response object with headers
// Create status text buffer
var status_text_buf: [32]u8 = undefined;
const status_text = std.fmt.bufPrint(&status_text_buf, "{}", .{code}) catch "200";
this.state.pending_response = picohttp.Response{
.status = status_text,
.status_code = code,
.headers = .{
.list = shared_response_headers_buf[0..decode_result.header_count],
},
};
// Mark headers complete if END_HEADERS is set
if (end_headers) {
this.state.response_stage = .body;
this.state.flags.http2_headers_complete = true;
// Call progressUpdate to notify about headers
this.progressUpdate(is_ssl, ctx, socket);
}
// If END_STREAM is also set, the response is complete
if (end_stream) {
this.state.flags.http2_stream_ended = true;
this.state.response_stage = .done;
this.state.request_stage = .done;
this.state.stage = .done;
// Finalize the response
if (this.state.pending_response) |*resp| {
_ = this.handleResponseMetadata(resp) catch |err| {
this.closeAndFail(err, is_ssl, socket);
return;
};
}
// Call the result callback to notify fetch that the response is complete
const callback = this.result_callback;
const result = this.toResult();
callback.run(@fieldParentPtr("client", this), result);
return;
}
}
} else {
// END_HEADERS is not set, accumulate and wait for CONTINUATION frames
this.accumulateHeaderBlock(headers_data) catch |err| {
log("Failed to accumulate header block: {}", .{err});
this.closeAndFail(err, is_ssl, socket);
return;
};
log(" Header block incomplete, waiting for CONTINUATION frames", .{});
this.http2_expecting_continuation = true;
}
},
0x09 => { // CONTINUATION frame
log("Received CONTINUATION frame on stream {}", .{stream_id});
if (stream_id == 0) {
log("Protocol error: CONTINUATION frame on stream 0", .{});
this.closeAndFail(error.HTTP2ProtocolError, is_ssl, socket);
return;
}
if (!this.http2_expecting_continuation) {
log("Protocol error: Unexpected CONTINUATION frame", .{});
this.closeAndFail(error.HTTP2ProtocolError, is_ssl, socket);
return;
}
// Parse CONTINUATION frame flags
const end_headers = (frame_flags & 0x04) != 0;
log(" CONTINUATION flags: end_headers={}", .{end_headers});
log(" Frame payload length: {}", .{frame_payload.len});
// Accumulate the continuation header data
this.accumulateHeaderBlock(frame_payload) catch |err| {
log("Failed to accumulate continuation header block: {}", .{err});
this.clearHeaderBlock();
this.closeAndFail(err, is_ssl, socket);
return;
};
// If this is the end of the header block, decode it
if (end_headers) {
log(" Header block complete with CONTINUATION frame", .{});
const complete_header_block = this.getCompleteHeaderBlock();
log(" Complete header block length: {}", .{complete_header_block.len});
const decode_result = this.decodeHTTP2Headers(complete_header_block) catch |err| {
log("Failed to decode HTTP/2 headers from CONTINUATION: {}", .{err});
this.clearHeaderBlock();
this.closeAndFail(err, is_ssl, socket);
return;
};
// Clear the accumulated header block
this.clearHeaderBlock();
// Update response with headers (similar to HEADERS frame)
if (decode_result.status_code) |code| {
// Build response object with headers
// Create status text buffer
var status_text_buf: [32]u8 = undefined;
const status_text = std.fmt.bufPrint(&status_text_buf, "{}", .{code}) catch "200";
this.state.pending_response = picohttp.Response{
.status = status_text,
.status_code = code,
.headers = .{
.list = shared_response_headers_buf[0..decode_result.header_count],
},
};
// Mark headers complete
this.state.response_stage = .body;
this.state.flags.http2_headers_complete = true;
// Call progressUpdate to notify about headers
this.progressUpdate(is_ssl, ctx, socket);
}
}
},
0x03 => { // RST_STREAM frame
log("Received RST_STREAM frame on stream {}", .{stream_id});
@@ -875,6 +1156,134 @@ pub fn handleHTTP2Data(
}
}
fn accumulateHeaderBlock(this: *HTTPClient, headers_data: []const u8) !void {
if (headers_data.len == 0) return;
if (this.http2_header_block_buffer == null) {
// Initialize buffer with some initial capacity
this.http2_header_block_buffer = try bun.default_allocator.alloc(u8, 8192);
this.http2_header_block_len = 0;
}
const buffer = this.http2_header_block_buffer.?;
const needed_size = this.http2_header_block_len + headers_data.len;
if (needed_size > buffer.len) {
// Resize buffer
const new_size = needed_size * 2; // Double the size to avoid frequent reallocations
const new_buffer = try bun.default_allocator.realloc(buffer, new_size);
this.http2_header_block_buffer = new_buffer;
}
// Append the new header data
const final_buffer = this.http2_header_block_buffer.?;
@memcpy(final_buffer[this.http2_header_block_len..this.http2_header_block_len + headers_data.len], headers_data);
this.http2_header_block_len += headers_data.len;
}
fn getCompleteHeaderBlock(this: *HTTPClient) []const u8 {
if (this.http2_header_block_buffer) |buffer| {
return buffer[0..this.http2_header_block_len];
}
return &.{};
}
fn clearHeaderBlock(this: *HTTPClient) void {
this.http2_header_block_len = 0;
this.http2_expecting_continuation = false;
}
const HTTP2HeaderDecodeResult = struct {
status_code: ?u16,
header_count: usize,
};
fn decodeHTTP2Headers(this: *HTTPClient, headers_data: []const u8) !HTTP2HeaderDecodeResult {
const hpack = this.http2_hpack_decoder orelse {
log("No HPACK decoder available", .{});
return error.NoHPACKDecoder;
};
// Initialize response headers if this is the first HEADERS frame
if (!this.state.flags.http2_headers_received) {
this.state.flags.http2_headers_received = true;
this.state.response_stage = .headers;
// Clear the shared response headers buffer
shared_response_headers_buf = undefined;
}
// Decode headers
var header_offset: usize = 0;
var header_count: usize = 0;
var status_code: ?u16 = null;
// Create temporary buffer for header strings
var header_buf: [8192]u8 = undefined;
var header_buf_used: usize = 0;
while (header_offset < headers_data.len and header_count < shared_response_headers_buf.len) {
// Debug: show what we're about to decode
if (header_offset == 0) {
const preview_len = @min(headers_data.len, 20);
var hex_buf: [60]u8 = undefined;
var hex_pos: usize = 0;
for (headers_data[0..preview_len]) |byte| {
const chars = std.fmt.bufPrint(hex_buf[hex_pos..], "{x:0>2} ", .{byte}) catch break;
hex_pos += chars.len;
}
log(" HPACK data to decode (hex): {s}", .{hex_buf[0..hex_pos]});
}
const result = hpack.decode(headers_data[header_offset..]) catch |err| {
log("HPACK decode error: {} (offset={}, remaining_len={})", .{err, header_offset, headers_data.len - header_offset});
// Print the problematic byte
if (header_offset < headers_data.len) {
log(" Error at byte: 0x{x:0>2}", .{headers_data[header_offset]});
}
return err;
};
const name = result.name;
const value = result.value;
const bytes_consumed = result.next;
log(" Decoded header: {s} = {s}", .{name, value});
// Handle pseudo-headers
if (strings.eqlComptime(name, ":status")) {
status_code = std.fmt.parseInt(u16, value, 10) catch 200;
log(" Response status: {?}", .{status_code});
} else if (!strings.hasPrefixComptime(name, ":")) {
// Regular header - add to shared buffer
if (header_buf_used + name.len + value.len < header_buf.len) {
// Copy name to buffer
const name_start = header_buf_used;
@memcpy(header_buf[name_start..name_start + name.len], name);
header_buf_used += name.len;
// Copy value to buffer
const value_start = header_buf_used;
@memcpy(header_buf[value_start..value_start + value.len], value);
header_buf_used += value.len;
shared_response_headers_buf[header_count] = .{
.name = header_buf[name_start..name_start + name.len],
.value = header_buf[value_start..value_start + value.len],
};
header_count += 1;
}
}
header_offset += bytes_consumed;
}
return HTTP2HeaderDecodeResult{
.status_code = status_code,
.header_count = header_count,
};
}
pub fn processHTTP2Settings(this: *HTTPClient, payload: []const u8, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void {
_ = this;
_ = socket;
@@ -2102,7 +2511,7 @@ pub fn onData(
ctx: *NewHTTPContext(is_ssl),
socket: NewHTTPContext(is_ssl).HTTPSocket,
) void {
log("onData {}", .{incoming_data.len});
bun.Output.prettyErrorln("[HTTPClient] onData {} bytes, is_http2={}, should_use_http2={}", .{incoming_data.len, this.state.flags.is_http2, this.should_use_http2});
if (this.signals.get(.aborted)) {
this.closeAndAbort(is_ssl, socket);
return;
@@ -2117,6 +2526,7 @@ pub fn onData(
// Handle HTTP/2 frames if negotiated
if (this.state.flags.is_http2) {
log("Routing to handleHTTP2Data", .{});
this.handleHTTP2Data(is_ssl, incoming_data, ctx, socket);
return;
}

59
test-http2-fetch.ts Normal file
View File

@@ -0,0 +1,59 @@
// Test HTTP/2 with node:http2 server and fetch() client
import * as http2 from "node:http2";
import * as fs from "node:fs";
// Create HTTP/2 server
const server = http2.createSecureServer({
allowHTTP1: false,
settings: {
enablePush: false,
},
key: fs.readFileSync("/home/claude/bun/test/js/bun/http/fixtures/cert.key"),
cert: fs.readFileSync("/home/claude/bun/test/js/bun/http/fixtures/cert.pem"),
});
server.on("error", (err) => console.error("Server error:", err));
server.on("stream", (stream, headers) => {
console.log("Server received headers:", headers);
stream.respond({
":status": 200,
"content-type": "application/json",
"x-test-header": "http2-works",
});
stream.end(JSON.stringify({
message: "Hello from HTTP/2 server",
method: headers[":method"],
path: headers[":path"],
protocol: "HTTP/2",
}));
});
server.listen(0, async () => {
const port = server.address().port;
console.log(`HTTP/2 server listening on port ${port}`);
try {
// Use fetch() to make request to HTTP/2 server
const response = await fetch(`https://localhost:${port}/test`, {
headers: {
"User-Agent": "Bun/fetch-test"
},
// @ts-ignore - Bun-specific option
tls: { rejectUnauthorized: false }
});
console.log("Fetch response status:", response.status);
console.log("Fetch response headers:", Object.fromEntries(response.headers.entries()));
const data = await response.json();
console.log("Fetch response data:", data);
} catch (err) {
console.error("Fetch error:", err);
} finally {
server.close();
}
});

69
test-http2.ts Normal file
View File

@@ -0,0 +1,69 @@
// Test HTTP/2 server using node:http2 module
import * as http2 from "node:http2";
import * as fs from "node:fs";
// Create a self-signed certificate for testing
const serverOptions = {
allowHTTP1: false,
settings: {
enablePush: false,
}
};
// Create HTTP/2 server
const server = http2.createSecureServer({
...serverOptions,
key: fs.readFileSync("/home/claude/bun/test/js/bun/http/fixtures/cert.key"),
cert: fs.readFileSync("/home/claude/bun/test/js/bun/http/fixtures/cert.pem"),
});
server.on("error", (err) => console.error("Server error:", err));
server.on("stream", (stream, headers) => {
console.log("Received request headers:", headers);
// Respond with HTTP/2
stream.respond({
":status": 200,
"content-type": "text/plain",
"x-custom-header": "test-value",
});
stream.write("Hello from HTTP/2 server!\n");
stream.write("Method: " + headers[":method"] + "\n");
stream.write("Path: " + headers[":path"] + "\n");
stream.end();
});
server.listen(0, () => {
const port = server.address().port;
console.log(`HTTP/2 server listening on port ${port}`);
// Create client to test
const client = http2.connect(`https://localhost:${port}`, {
rejectUnauthorized: false
});
const req = client.request({
":path": "/test",
":method": "GET",
});
req.on("response", (headers) => {
console.log("Client received response headers:", headers);
console.log("Status:", headers[":status"]);
});
let data = "";
req.on("data", (chunk) => {
data += chunk;
});
req.on("end", () => {
console.log("Client received data:", data);
client.close();
server.close();
});
req.end();
});