Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Jarred Sumner
2025-08-04 20:42:54 -07:00
parent 7ad3049e70
commit d1db2f469e
20 changed files with 2102 additions and 140 deletions

View File

@@ -20,22 +20,85 @@ if(NOT GIT_NAME)
set(GIT_NAME ${GIT_ORIGINAL_NAME})
endif()
set(GIT_DOWNLOAD_URL https://github.com/${GIT_REPOSITORY}/archive/${GIT_REF}.tar.gz)
# Special handling for repositories that need git submodules
if(GIT_NAME STREQUAL "lsquic")
message(STATUS "Using git clone with submodules for ${GIT_REPOSITORY} at ${GIT_REF}...")
find_program(GIT_PROGRAM git REQUIRED)
# Remove existing directory if it exists
if(EXISTS ${GIT_PATH})
file(REMOVE_RECURSE ${GIT_PATH})
endif()
# Clone the repository
execute_process(
COMMAND
${GIT_PROGRAM} clone https://github.com/${GIT_REPOSITORY}.git ${GIT_PATH}
ERROR_STRIP_TRAILING_WHITESPACE
ERROR_VARIABLE
GIT_ERROR
RESULT_VARIABLE
GIT_RESULT
)
if(NOT GIT_RESULT EQUAL 0)
message(FATAL_ERROR "Git clone failed: ${GIT_ERROR}")
endif()
# Checkout the specific commit/tag/branch
execute_process(
COMMAND
${GIT_PROGRAM} checkout ${GIT_REF}
WORKING_DIRECTORY
${GIT_PATH}
ERROR_STRIP_TRAILING_WHITESPACE
ERROR_VARIABLE
GIT_ERROR
RESULT_VARIABLE
GIT_RESULT
)
if(NOT GIT_RESULT EQUAL 0)
message(FATAL_ERROR "Git checkout failed: ${GIT_ERROR}")
endif()
# Initialize and update submodules
execute_process(
COMMAND
${GIT_PROGRAM} submodule update --init --recursive
WORKING_DIRECTORY
${GIT_PATH}
ERROR_STRIP_TRAILING_WHITESPACE
ERROR_VARIABLE
GIT_ERROR
RESULT_VARIABLE
GIT_RESULT
)
if(NOT GIT_RESULT EQUAL 0)
message(FATAL_ERROR "Git submodule init failed: ${GIT_ERROR}")
endif()
else()
# Use the original download method for other repositories
set(GIT_DOWNLOAD_URL https://github.com/${GIT_REPOSITORY}/archive/${GIT_REF}.tar.gz)
message(STATUS "Cloning ${GIT_REPOSITORY} at ${GIT_REF}...")
execute_process(
COMMAND
${CMAKE_COMMAND}
-DDOWNLOAD_URL=${GIT_DOWNLOAD_URL}
-DDOWNLOAD_PATH=${GIT_PATH}
-DDOWNLOAD_FILTERS=${GIT_FILTERS}
-P ${CMAKE_CURRENT_LIST_DIR}/DownloadUrl.cmake
ERROR_STRIP_TRAILING_WHITESPACE
ERROR_VARIABLE
GIT_ERROR
RESULT_VARIABLE
GIT_RESULT
)
message(STATUS "Cloning ${GIT_REPOSITORY} at ${GIT_REF}...")
execute_process(
COMMAND
${CMAKE_COMMAND}
-DDOWNLOAD_URL=${GIT_DOWNLOAD_URL}
-DDOWNLOAD_PATH=${GIT_PATH}
-DDOWNLOAD_FILTERS=${GIT_FILTERS}
-P ${CMAKE_CURRENT_LIST_DIR}/DownloadUrl.cmake
ERROR_STRIP_TRAILING_WHITESPACE
ERROR_VARIABLE
GIT_ERROR
RESULT_VARIABLE
GIT_RESULT
)
endif()
if(NOT GIT_RESULT EQUAL 0)
message(FATAL_ERROR "Clone failed: ${GIT_ERROR}")

View File

@@ -62,6 +62,7 @@ src/bun.js/api/bun/dns.zig
src/bun.js/api/bun/h2_frame_parser.zig
src/bun.js/api/bun/lshpack.zig
src/bun.js/api/bun/process.zig
src/bun.js/api/bun/quic_socket.zig
src/bun.js/api/bun/socket.zig
src/bun.js/api/bun/socket/Handlers.zig
src/bun.js/api/bun/socket/Listener.zig
@@ -501,6 +502,7 @@ src/deps/uws/ConnectingSocket.zig
src/deps/uws/InternalLoopData.zig
src/deps/uws/ListenSocket.zig
src/deps/uws/Loop.zig
src/deps/uws/quic.zig
src/deps/uws/Request.zig
src/deps/uws/Response.zig
src/deps/uws/socket.zig

View File

@@ -55,6 +55,7 @@ set(BUN_DEPENDENCIES
Mimalloc
TinyCC
Zlib
Lsquic # QUIC protocol support - depends on BoringSSL and Zlib
LibArchive # must be loaded after zlib
HdrHistogram # must be loaded after zlib
Zstd
@@ -834,6 +835,7 @@ target_compile_definitions(${bun} PRIVATE
_HAS_EXCEPTIONS=0
LIBUS_USE_OPENSSL=1
LIBUS_USE_BORINGSSL=1
LIBUS_USE_QUIC=1
WITH_BORINGSSL=1
STATICALLY_LINKED_WITH_JavaScriptCore=1
STATICALLY_LINKED_WITH_BMALLOC=1

View File

@@ -0,0 +1,31 @@
register_repository(
NAME
lsquic
REPOSITORY
litespeedtech/lsquic
COMMIT
70486141724f85e97b08f510673e29f399bbae8f
)
register_cmake_command(
TARGET
lsquic
LIBRARIES
lsquic
LIB_PATH
src/liblsquic
ARGS
-DSHARED=OFF
-DLSQUIC_SHARED_LIB=0
-DBORINGSSL_DIR=${VENDOR_PATH}/boringssl
-DBORINGSSL_LIB=${BUILD_PATH}/boringssl
-DZLIB_INCLUDE_DIR=${VENDOR_PATH}/zlib
-DZLIB_LIB=${BUILD_PATH}/zlib/libz.a
-DCMAKE_BUILD_TYPE=Release
INCLUDES
include
src/liblsquic
DEPENDS
BoringSSL
Zlib
)

View File

@@ -157,6 +157,9 @@ struct us_udp_packet_buffer_t *us_create_udp_packet_buffer();
struct us_udp_socket_t *us_create_udp_socket(us_loop_r loop, void (*data_cb)(struct us_udp_socket_t *, void *, int), void (*drain_cb)(struct us_udp_socket_t *), void (*close_cb)(struct us_udp_socket_t *), const char *host, unsigned short port, int flags, int *err, void *user);
// Extended version for QUIC sockets that need extension data
struct us_udp_socket_t *us_create_udp_socket_with_ext(us_loop_r loop, void (*data_cb)(struct us_udp_socket_t *, void *, int), void (*drain_cb)(struct us_udp_socket_t *), void (*close_cb)(struct us_udp_socket_t *), const char *host, unsigned short port, int flags, int *err, void *user, int ext_size);
void us_udp_socket_close(struct us_udp_socket_t *s);
int us_udp_socket_set_broadcast(struct us_udp_socket_t *s, int enabled);

View File

@@ -5,7 +5,7 @@
#include "quic.h"
#include "internal/internal.h"
#include "lsquic.h"
#include "lsquic_types.h"
@@ -15,6 +15,8 @@
#ifndef _WIN32
#include <netinet/in.h>
#include <errno.h>
#include <sys/socket.h>
#include <unistd.h>
#endif
#include <stdio.h>
@@ -23,6 +25,17 @@
void leave_all();
// Peer context structure for lsquic - contains UDP socket and other metadata
struct quic_peer_ctx {
struct us_udp_socket_t *udp_socket;
us_quic_socket_context_t *context;
void *reserved[16]; // Extra space to prevent buffer overflows
};
// Forward declarations for QUIC UDP callback functions
void on_udp_socket_data_client(struct us_udp_socket_t *s, struct us_udp_packet_buffer_t *buf, int packets);
void on_udp_socket_data(struct us_udp_socket_t *s, struct us_udp_packet_buffer_t *buf, int packets);
/*
struct sockaddr_in client_addr = {
AF_INET,
@@ -95,15 +108,25 @@ void us_quic_socket_context_on_stream_writable(us_quic_socket_context_t *context
void on_udp_socket_writable(struct us_udp_socket_t *s) {
/* Need context from socket here */
us_quic_socket_context_t *context = us_udp_socket_user(s);
if (!context) {
printf("ERROR: No context found in UDP socket\n");
return;
}
/* We just continue now */
lsquic_engine_send_unsent_packets(context->engine);
}
// Wrapper function to match uSockets UDP callback signature
void on_udp_socket_data_client_wrapper(struct us_udp_socket_t *s, void *buf, int packets) {
on_udp_socket_data_client(s, (struct us_udp_packet_buffer_t *)buf, packets);
}
// we need two differetn handlers to know to put it in client or servcer context
void on_udp_socket_data_client(struct us_udp_socket_t *s, struct us_udp_packet_buffer_t *buf, int packets) {
int fd = us_poll_fd((struct us_poll_t *) s);
// Remove unused fd variable
// int fd = us_poll_fd((struct us_poll_t *) s);
//printf("Reading on fd: %d\n", fd);
//printf("UDP (client) socket got data: %p\n", s);
@@ -113,21 +136,41 @@ void on_udp_socket_data_client(struct us_udp_socket_t *s, struct us_udp_packet_b
// do we have udp socket contexts? or do we just have user data?
us_quic_socket_context_t *context = us_udp_socket_user(s);
if (!context) {
printf("ERROR: No context found in UDP client socket\n");
return;
}
if (!buf) {
printf("ERROR: Null packet buffer in UDP client handler\n");
return;
}
if (packets <= 0) {
return;
}
/* We just shove it to lsquic */
for (int i = 0; i < packets; i++) {
char *payload = us_udp_packet_buffer_payload(buf, i);
int length = us_udp_packet_buffer_payload_length(buf, i);
int ecn = us_udp_packet_buffer_ecn(buf, i);
// ECN not available in current uSockets API - remove this line
// int ecn = us_udp_packet_buffer_ecn(buf, i);
void *peer_addr = us_udp_packet_buffer_peer(buf, i);
// Validate packet data before processing
if (!payload || length <= 0 || length > 65536 || !peer_addr) {
printf("Invalid packet data: payload=%p, length=%d, peer_addr=%p\n", payload, length, peer_addr);
continue;
}
//printf("Reading UDP of size %d\n", length);
char ip[16];
int ip_length = us_udp_packet_buffer_local_ip(buf, i, ip);
if (!ip_length) {
printf("We got no ip on received packet!\n");
exit(0);
continue; // Don't exit, just skip this packet
}
//printf("Our received destination IP length is: %d\n", ip_length);
@@ -143,16 +186,25 @@ void on_udp_socket_data_client(struct us_udp_socket_t *s, struct us_udp_packet_b
ipv6->sin6_family = AF_INET6;
ipv6->sin6_port = ntohs(port);
memcpy(ipv6->sin6_addr.s6_addr, ip, 16);
} else {
} else if (ip_length == 4) {
struct sockaddr_in *ipv4 = (struct sockaddr_in *) &local_addr;
ipv4->sin_family = AF_INET;
ipv4->sin_port = ntohs(port);
memcpy(&ipv4->sin_addr.s_addr, ip, 4);
} else {
printf("Invalid IP length: %d\n", ip_length);
continue;
}
if (!context->client_engine) {
printf("ERROR: Client engine is null\n");
continue;
}
int ret = lsquic_engine_packet_in(context->client_engine, payload, length, (struct sockaddr *) &local_addr, peer_addr, (void *) s, 0);
// Use the peer context from the UDP socket extension area
struct quic_peer_ctx *peer_ctx = (struct quic_peer_ctx *)((char *)s + sizeof(struct us_udp_socket_t));
lsquic_engine_packet_in(context->client_engine, (const unsigned char *)payload, length, (struct sockaddr *) &local_addr, peer_addr, (void *) peer_ctx, 0);
//printf("Engine returned: %d\n", ret);
@@ -162,6 +214,11 @@ void on_udp_socket_data_client(struct us_udp_socket_t *s, struct us_udp_packet_b
}
// Wrapper function to match uSockets UDP callback signature
void on_udp_socket_data_wrapper(struct us_udp_socket_t *s, void *buf, int packets) {
on_udp_socket_data(s, (struct us_udp_packet_buffer_t *)buf, packets);
}
void on_udp_socket_data(struct us_udp_socket_t *s, struct us_udp_packet_buffer_t *buf, int packets) {
@@ -172,24 +229,46 @@ void on_udp_socket_data(struct us_udp_socket_t *s, struct us_udp_packet_buffer_t
// do we have udp socket contexts? or do we just have user data?
us_quic_socket_context_t *context = us_udp_socket_user(s);
if (!context) {
printf("ERROR: No context found in UDP server socket\n");
return;
}
if (!buf) {
printf("ERROR: Null packet buffer in UDP server handler\n");
return;
}
if (packets <= 0) {
return;
}
// process conns now? to accept new connections?
lsquic_engine_process_conns(context->engine);
if (context->engine) {
lsquic_engine_process_conns(context->engine);
}
/* We just shove it to lsquic */
for (int i = 0; i < packets; i++) {
char *payload = us_udp_packet_buffer_payload(buf, i);
int length = us_udp_packet_buffer_payload_length(buf, i);
int ecn = us_udp_packet_buffer_ecn(buf, i);
// ECN not available in current uSockets API - remove this line
// int ecn = us_udp_packet_buffer_ecn(buf, i);
void *peer_addr = us_udp_packet_buffer_peer(buf, i);
// Validate packet data before processing
if (!payload || length <= 0 || length > 65536 || !peer_addr) {
printf("Invalid server packet data: payload=%p, length=%d, peer_addr=%p\n", payload, length, peer_addr);
continue;
}
//printf("Reading UDP of size %d\n", length);
char ip[16];
int ip_length = us_udp_packet_buffer_local_ip(buf, i, ip);
if (!ip_length) {
printf("We got no ip on received packet!\n");
exit(0);
continue; // Don't exit, just skip this packet
}
//printf("Our received destination IP length is: %d\n", ip_length);
@@ -205,17 +284,25 @@ void on_udp_socket_data(struct us_udp_socket_t *s, struct us_udp_packet_buffer_t
ipv6->sin6_family = AF_INET6;
ipv6->sin6_port = ntohs(port);
memcpy(ipv6->sin6_addr.s6_addr, ip, 16);
} else {
} else if (ip_length == 4) {
struct sockaddr_in *ipv4 = (struct sockaddr_in *) &local_addr;
ipv4->sin_family = AF_INET;
ipv4->sin_port = ntohs(port);
memcpy(&ipv4->sin_addr.s_addr, ip, 4);
} else {
printf("Invalid server IP length: %d\n", ip_length);
continue;
}
if (!context->engine) {
printf("ERROR: Server engine is null\n");
continue;
}
int ret = lsquic_engine_packet_in(context->engine, payload, length, (struct sockaddr *) &local_addr, peer_addr, (void *) s, 0);
// Use the peer context from the UDP socket extension area
struct quic_peer_ctx *peer_ctx = (struct quic_peer_ctx *)((char *)s + sizeof(struct us_udp_socket_t));
lsquic_engine_packet_in(context->engine, (const unsigned char *)payload, length, (struct sockaddr *) &local_addr, peer_addr, (void *) peer_ctx, 0);
//printf("Engine returned: %d\n", ret);
@@ -240,72 +327,51 @@ struct mmsghdr {
/* Server and client packet out is identical */
int send_packets_out(void *ctx, const struct lsquic_out_spec *specs, unsigned n_specs) {
#ifndef _WIN32
us_quic_socket_context_t *context = ctx;
/* A run is at most UIO_MAXIOV datagrams long */
struct mmsghdr hdrs[UIO_MAXIOV];
int run_length = 0;
/* We assume that thiss whole cb will never be called with 0 specs */
struct us_udp_socket_t *last_socket = (struct us_udp_socket_t *) specs[0].peer_ctx;
// For now, send packets one by one using regular sendto
// TODO: Optimize with proper batch sending using uSockets API
int sent = 0;
for (int i = 0; i < n_specs; i++) {
/* Send this run if we need to */
if (run_length == UIO_MAXIOV || specs[i].peer_ctx != last_socket) {
int ret = bsd_sendmmsg(us_poll_fd((struct us_poll_t *) last_socket), hdrs, run_length, 0);
if (ret != run_length) {
if (ret == -1) {
printf("unhandled udp backpressure!\n");
return sent;
for (unsigned i = 0; i < n_specs; i++) {
struct us_udp_socket_t *socket = (struct us_udp_socket_t *) specs[i].peer_ctx;
int fd = us_poll_fd((struct us_poll_t *) socket);
// Combine all iovecs into a single buffer for simple sending
size_t total_len = 0;
for (int j = 0; j < specs[i].iovlen; j++) {
total_len += specs[i].iov[j].iov_len;
}
if (total_len > 0) {
// Simple approach: use sendto for each packet
// In a real implementation, we'd want to use the proper uSockets batch API
char buffer[2048]; // Maximum UDP payload size
if (total_len <= sizeof(buffer)) {
size_t offset = 0;
for (int j = 0; j < specs[i].iovlen; j++) {
memcpy(buffer + offset, specs[i].iov[j].iov_base, specs[i].iov[j].iov_len);
offset += specs[i].iov[j].iov_len;
}
ssize_t ret = sendto(fd, buffer, total_len, MSG_DONTWAIT,
specs[i].dest_sa,
(specs[i].dest_sa->sa_family == AF_INET) ?
sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6));
if (ret > 0) {
sent++;
} else {
printf("unhandled udp backpressure!\n");
errno = EAGAIN;
return sent + ret;
// Handle backpressure
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return sent;
}
return -1;
}
}
sent += ret;
run_length = 0;
last_socket = specs[i].peer_ctx;
//printf("different socket breask run!\n");
}
/* Continue existing run or start a new one */
//memset(&hdrs[i].msg_hdr, 0, sizeof(hdrs[i].msg_hdr));
memset(&hdrs[run_length], 0, sizeof(hdrs[run_length]));
hdrs[run_length].msg_hdr.msg_name = (void *) specs[i].dest_sa;
hdrs[run_length].msg_hdr.msg_namelen = (AF_INET == specs[i].dest_sa->sa_family ?
sizeof(struct sockaddr_in) :
sizeof(struct sockaddr_in6)),
hdrs[run_length].msg_hdr.msg_iov = specs[i].iov;
hdrs[run_length].msg_hdr.msg_iovlen = specs[i].iovlen;
hdrs[run_length].msg_hdr.msg_flags = 0;
run_length++;
}
/* Send last run */
if (run_length) {
int ret = bsd_sendmmsg(us_poll_fd((struct us_poll_t *) last_socket), hdrs, run_length, 0);
if (ret == -1) {
printf("backpressure! A\n");
return sent;
}
if (sent + ret != n_specs) {
printf("backpressure! B\n");
printf("errno is: %d\n", errno);
errno = EAGAIN;
}
//printf("Returning %d of %d\n", sent + ret, n_specs);
return sent + ret;
}
//printf("Returning %d\n", n_specs);
#endif
return sent;
#else
// Windows implementation would go here
return n_specs;
#endif
}
lsquic_conn_ctx_t *on_new_conn(void *stream_if_ctx, lsquic_conn_t *c) {
@@ -313,31 +379,57 @@ lsquic_conn_ctx_t *on_new_conn(void *stream_if_ctx, lsquic_conn_t *c) {
printf("Context is: %p\n", context);
/* We need to create some kind of socket here */
if (!context) {
printf("ERROR: No context in on_new_conn\n");
return NULL;
}
/* For client connections, the context should already be the us_quic_socket_t */
us_quic_socket_t *socket = (us_quic_socket_t *) lsquic_conn_get_ctx(c);
if (!socket) {
printf("ERROR: No socket found in connection context\n");
return NULL;
}
int is_client = 0;
if (lsquic_conn_get_engine(c) == context->client_engine) {
is_client = 1;
}
context->on_open((us_quic_socket_t *) c, is_client);
if (context->on_open) {
context->on_open(socket, is_client);
}
return (lsquic_conn_ctx_t *) context;
return (lsquic_conn_ctx_t *) socket;
}
void us_quic_socket_create_stream(us_quic_socket_t *s, int ext_size) {
lsquic_conn_make_stream((lsquic_conn_t *) s);
if (!s || !s->lsquic_conn) {
printf("ERROR: Invalid socket or LSQUIC connection is null in create_stream\n");
return;
}
lsquic_conn_make_stream((lsquic_conn_t *) s->lsquic_conn);
// here we need to allocate and attach the user data
}
void on_conn_closed(lsquic_conn_t *c) {
us_quic_socket_context_t *context = (us_quic_socket_context_t *) lsquic_conn_get_ctx(c);
us_quic_socket_t *socket = (us_quic_socket_t *) lsquic_conn_get_ctx(c);
printf("on_conn_closed!\n");
context->on_close((us_quic_socket_t *) c);
if (!socket) {
printf("ERROR: No socket found in connection context for close callback\n");
return;
}
// Get the context from the UDP socket to access callbacks
us_quic_socket_context_t *context = us_udp_socket_user(socket->udp_socket);
if (context && context->on_close) {
context->on_close(socket);
}
}
lsquic_stream_ctx_t *on_new_stream(void *stream_if_ctx, lsquic_stream_t *s) {
@@ -346,6 +438,11 @@ lsquic_stream_ctx_t *on_new_stream(void *stream_if_ctx, lsquic_stream_t *s) {
lsquic_stream_wantread(s, 1);
us_quic_socket_context_t *context = stream_if_ctx;
if (!context) {
printf("ERROR: No context in on_new_stream\n");
return NULL;
}
// the conn's ctx should point at the udp socket and the socket context
// the ext size of streams and conn's are set by the listen/connect calls, which
@@ -356,8 +453,13 @@ lsquic_stream_ctx_t *on_new_stream(void *stream_if_ctx, lsquic_stream_t *s) {
int ext_size = 256;
void *ext = malloc(ext_size);
// yes hello
strcpy(ext, "Hello I am ext!");
if (!ext) {
printf("ERROR: Failed to allocate stream extension memory\n");
return NULL;
}
// Initialize the memory to zero instead of strcpy
memset(ext, 0, ext_size);
int is_client = 0;
if (lsquic_conn_get_engine(lsquic_stream_conn(s)) == context->client_engine) {
@@ -366,7 +468,10 @@ lsquic_stream_ctx_t *on_new_stream(void *stream_if_ctx, lsquic_stream_t *s) {
// luckily we can set the ext before we return
lsquic_stream_set_ctx(s, ext);
context->on_stream_open((us_quic_stream_t *) s, is_client);
if (context->on_stream_open) {
context->on_stream_open((us_quic_stream_t *) s, is_client);
}
return ext;
}
@@ -541,7 +646,6 @@ static void on_read(lsquic_stream_t *s, lsquic_stream_ctx_t *h) {
}
int us_quic_stream_write(us_quic_stream_t *s, char *data, int length) {
lsquic_stream_t *stream = (lsquic_stream_t *) s;
int ret = lsquic_stream_write((lsquic_stream_t *) s, data, length);
// just like otherwise, we automatically poll for writable when failed
if (ret != length) {
@@ -631,13 +735,23 @@ int server_name_cb(SSL *s, int *al, void *arg) {
struct ssl_ctx_st *get_ssl_ctx(void *peer_ctx, const struct sockaddr *local) {
printf("getting ssl ctx now, peer_ctx: %p\n", peer_ctx);
// peer_ctx point to the us_udp_socket_t that passed the UDP packet in via
// lsquic_engine_packet_in (it got passed as peer_ctx)
// we want the per-context ssl cert from this udp socket
struct us_udp_socket_t *udp_socket = (struct us_udp_socket_t *) peer_ctx;
if (!peer_ctx) {
printf("WARNING: No peer_ctx in get_ssl_ctx, using cached context\n");
if (old_ctx) {
return old_ctx;
}
// Return a default context if none exists
return NULL;
}
// the udp socket of a server points to the context
struct us_quic_socket_context_s *context = us_udp_socket_user(udp_socket);
// peer_ctx now points to our quic_peer_ctx structure
struct quic_peer_ctx *qctx = (struct quic_peer_ctx *) peer_ctx;
if (!qctx || !qctx->context) {
printf("ERROR: No context found in peer context for SSL\n");
return old_ctx;
}
struct us_quic_socket_context_s *context = qctx->context;
if (old_ctx) {
return old_ctx;
@@ -686,8 +800,6 @@ int log_buf_cb(void *logger_ctx, const char *buf, size_t len) {
}
int us_quic_stream_shutdown_read(us_quic_stream_t *s) {
lsquic_stream_t *stream = (lsquic_stream_t *) s;
int ret = lsquic_stream_shutdown((lsquic_stream_t *) s, 0);
if (ret != 0) {
printf("cannot shutdown stream!\n");
@@ -702,8 +814,6 @@ void *us_quic_stream_ext(us_quic_stream_t *s) {
}
void us_quic_stream_close(us_quic_stream_t *s) {
lsquic_stream_t *stream = (lsquic_stream_t *) s;
int ret = lsquic_stream_close((lsquic_stream_t *) s);
if (ret != 0) {
printf("cannot close stream!\n");
@@ -714,8 +824,6 @@ void us_quic_stream_close(us_quic_stream_t *s) {
}
int us_quic_stream_shutdown(us_quic_stream_t *s) {
lsquic_stream_t *stream = (lsquic_stream_t *) s;
int ret = lsquic_stream_shutdown((lsquic_stream_t *) s, 1);
if (ret != 0) {
printf("cannot shutdown stream!\n");
@@ -763,9 +871,9 @@ char pool[1000][4096];
int pool_top = 0;
void *take() {
if (pool_top == 1000) {
if (pool_top >= 1000) {
printf("out of memory\n");
exit(0);
return NULL; // Don't exit, return NULL instead
}
return pool[pool_top++];
}
@@ -810,20 +918,24 @@ int header_decode_heap_offset = 0;
struct lsxpack_header *hsi_prepare_decode(void *hdr_set, struct lsxpack_header *hdr, size_t space) {
//printf("hsi_prepare_decode\n");
// Validate space parameter - prevent buffer overflow
if (space > 4096 - sizeof(struct lsxpack_header)) {
printf("Space too large: %zu\n", space);
return NULL; // Don't exit, return NULL
}
if (!hdr) {
char *mem = take();
if (!mem) {
printf("Failed to allocate memory from pool\n");
return NULL;
}
hdr = (struct lsxpack_header *) mem;//malloc(sizeof(struct lsxpack_header));
memset(hdr, 0, sizeof(struct lsxpack_header));
hdr->buf = mem + sizeof(struct lsxpack_header);//take();//malloc(space);
lsxpack_header_prepare_decode(hdr, hdr->buf, 0, space);
} else {
if (space > 4096 - sizeof(struct lsxpack_header)) {
printf("not hanlded!\n");
exit(0);
}
hdr->val_len = space;
//hdr->buf = realloc(hdr->buf, space);
}
@@ -837,6 +949,11 @@ int hsi_process_header(void *hdr_set, struct lsxpack_header *hdr) {
//printf("hsi_process_header: %p\n", hdr);
if (!hdr_set) {
printf("ERROR: hdr_set is null\n");
return -1;
}
struct header_set_hd *hd = hdr_set;
struct processed_header *proc_hdr = (struct processed_header *) (hd + 1);
@@ -851,6 +968,21 @@ int hsi_process_header(void *hdr_set, struct lsxpack_header *hdr) {
return 0;
}
// Bounds check for header offset - prevent buffer overflow
if (hd->offset < 0 || hd->offset >= (4096 - sizeof(struct header_set_hd)) / sizeof(struct processed_header)) {
printf("ERROR: Header offset out of bounds: %d\n", hd->offset);
return -1;
}
// Validate header buffer bounds
if (!hdr->buf || hdr->val_offset < 0 || hdr->name_offset < 0 ||
hdr->val_len < 0 || hdr->name_len < 0 ||
hdr->val_offset + hdr->val_len > 4096 ||
hdr->name_offset + hdr->name_len > 4096) {
printf("ERROR: Invalid header buffer bounds\n");
return -1;
}
/*if (hdr->hpack_index) {
printf("header has hpack index: %d\n", hdr->hpack_index);
}
@@ -885,10 +1017,26 @@ void timer_cb(struct us_timer_t *t) {
// lsquic_conn
us_quic_socket_context_t *us_quic_socket_context(us_quic_socket_t *s) {
return (us_quic_socket_context_t *) lsquic_conn_get_ctx((lsquic_conn_t *) s);
if (!s || !s->udp_socket) {
printf("ERROR: Invalid socket or UDP socket is null\n");
return NULL;
}
// Get the context from the UDP socket's user data
us_quic_socket_context_t *context = us_udp_socket_user(s->udp_socket);
if (!context) {
printf("ERROR: No context found in UDP socket user data\n");
return NULL;
}
return context;
}
void *us_quic_socket_context_ext(us_quic_socket_context_t *context) {
if (!context) {
printf("ERROR: Context is null in us_quic_socket_context_ext\n");
return NULL;
}
return context + 1;
}
@@ -896,30 +1044,27 @@ void *us_quic_socket_context_ext(us_quic_socket_context_t *context) {
us_quic_socket_context_t *us_create_quic_socket_context(struct us_loop_t *loop, us_quic_socket_context_options_t options, int ext_size) {
printf("Creating socket context with ssl: %s\n", options.key_file_name);
// every _listen_ call creates a new udp socket that feeds inputs to the engine in the context
// every context has its own send buffer and udp send socket (not bound to any port or ip?)
// or just make it so that once you listen, it will listen on that port for input, and the context will use
// the first udp socket for output as it doesn't matter which one is used
/* Holds all callbacks */
us_quic_socket_context_t *context = malloc(sizeof(struct us_quic_socket_context_s) + ext_size);
if (!context) {
return NULL;
}
// the option is put on the socket context
context->options = options;
context->loop = loop;
//context->udp_socket = 0;
/* Allocate per thread, UDP packet buffers */
context->recv_buf = us_create_udp_packet_buffer();
//context->send_buf = us_create_udp_packet_buffer();
if (!context->recv_buf) {
free(context);
return NULL;
}
/* Init lsquic engine */
if (0 != lsquic_global_init(LSQUIC_GLOBAL_CLIENT|LSQUIC_GLOBAL_SERVER)) {
exit(EXIT_FAILURE);
free(context);
return NULL;
}
static struct lsquic_stream_if stream_callbacks = {
@@ -962,11 +1107,10 @@ us_quic_socket_context_t *us_create_quic_socket_context(struct us_loop_t *loop,
///printf("log: %d\n", lsquic_set_log_level("debug"));
static struct lsquic_logger_if logger = {
.log_buf = log_buf_cb,
};
// Logger is not currently used - commented out to avoid unused variable warning
// static struct lsquic_logger_if logger = {
// .log_buf = log_buf_cb,
// };
//lsquic_logger_init(&logger, 0, LLTS_NONE);
@@ -1008,7 +1152,21 @@ us_quic_socket_context_t *us_create_quic_socket_context(struct us_loop_t *loop,
us_quic_listen_socket_t *us_quic_socket_context_listen(us_quic_socket_context_t *context, const char *host, int port, int ext_size) {
/* We literally do create a listen socket */
return (us_quic_listen_socket_t *) us_create_udp_socket(context->loop, /*context->recv_buf*/ NULL, on_udp_socket_data, on_udp_socket_writable, host, port, 0, context);
int err = 0;
// For QUIC sockets, we need to create a UDP socket with extension space to avoid buffer overflows
// when lsquic tries to access extension data
struct us_udp_socket_t *socket = us_create_udp_socket_with_ext(context->loop, on_udp_socket_data_wrapper, on_udp_socket_writable, NULL, host, port, 0, &err, context, sizeof(struct quic_peer_ctx));
if (socket) {
// Initialize the peer context in the extension area
struct quic_peer_ctx *peer_ctx = (struct quic_peer_ctx *)((char *)socket + sizeof(struct us_udp_socket_t));
printf("Listen socket: %p, peer_ctx: %p, context: %p\n", socket, peer_ctx, context);
peer_ctx->udp_socket = socket;
peer_ctx->context = context;
memset(peer_ctx->reserved, 0, sizeof(peer_ctx->reserved));
}
return (us_quic_listen_socket_t *) socket;
//return NULL;
}
@@ -1030,7 +1188,18 @@ us_quic_socket_t *us_quic_socket_context_connect(us_quic_socket_context_t *conte
addr->sin6_family = AF_INET6;
// Create the UDP socket binding to ephemeral port
struct us_udp_socket_t *udp_socket = us_create_udp_socket(context->loop, /*context->recv_buf*/ NULL, on_udp_socket_data_client, on_udp_socket_writable, 0, 0, 0, context);
int err = 0;
// For QUIC client sockets, we also need extension space to avoid buffer overflows
struct us_udp_socket_t *udp_socket = us_create_udp_socket_with_ext(context->loop, on_udp_socket_data_client_wrapper, on_udp_socket_writable, NULL, 0, 0, 0, &err, context, sizeof(struct quic_peer_ctx));
if (udp_socket) {
// Initialize the peer context in the extension area
struct quic_peer_ctx *peer_ctx = (struct quic_peer_ctx *)((char *)udp_socket + sizeof(struct us_udp_socket_t));
printf("Client socket: %p, peer_ctx: %p, context: %p\n", udp_socket, peer_ctx, context);
peer_ctx->udp_socket = udp_socket;
peer_ctx->context = context;
memset(peer_ctx->reserved, 0, sizeof(peer_ctx->reserved));
}
// Determine what port we got, creating the local sockaddr
int ephemeral = us_udp_socket_bound_port(udp_socket);
@@ -1058,14 +1227,34 @@ us_quic_socket_t *us_quic_socket_context_connect(us_quic_socket_context_t *conte
// we need 1 socket for servers, then we bind multiple ports to that one socket
void *client = lsquic_engine_connect(context->client_engine, LSQVER_I001, (struct sockaddr *) local_addr, (struct sockaddr *) addr, udp_socket, (lsquic_conn_ctx_t *) udp_socket, "sni", 0, 0, 0, 0, 0);
// Create the us_quic_socket_t structure first so we can pass it as context
us_quic_socket_t *quic_socket = malloc(sizeof(us_quic_socket_t) + ext_size);
if (!quic_socket) {
printf("ERROR: Failed to allocate QUIC socket structure\n");
return NULL;
}
quic_socket->udp_socket = udp_socket;
quic_socket->lsquic_conn = NULL; // Will be set after connection is created
void *client = lsquic_engine_connect(context->client_engine, LSQVER_I001, (struct sockaddr *) local_addr, (struct sockaddr *) addr, udp_socket, (lsquic_conn_ctx_t *) quic_socket, "sni", 0, 0, 0, 0, 0);
printf("Client: %p\n", client);
// this is requiored to even have packetgs sending out (run this in post)
lsquic_engine_process_conns(context->client_engine);
if (!client) {
printf("ERROR: Failed to create LSQUIC connection\n");
free(quic_socket);
return NULL;
}
// Now store the lsquic connection in our socket structure
quic_socket->lsquic_conn = client;
printf("Created QUIC socket: %p with UDP socket: %p and LSQUIC conn: %p\n", quic_socket, udp_socket, client);
return client;
// this is required to even have packets sending out (run this in post)
lsquic_engine_process_conns(context->client_engine);
return quic_socket;
}
#endif

View File

@@ -17,6 +17,8 @@ typedef struct {
typedef struct {
/* Refers to either the shared listen socket or the client UDP socket */
void *udp_socket;
/* LSQUIC connection pointer for this socket */
void *lsquic_conn;
} us_quic_socket_t;
struct us_quic_socket_context_s;

View File

@@ -19,6 +19,7 @@
#include "internal/internal.h"
#include <string.h>
#include <stdlib.h>
// int us_udp_packet_buffer_ecn(struct us_udp_packet_buffer_t *buf, int index) {
// return bsd_udp_packet_buffer_ecn((struct udp_recvbuf *)buf, index);
@@ -187,4 +188,73 @@ struct us_udp_socket_t *us_create_udp_socket(
us_poll_start((struct us_poll_t *) udp, udp->loop, LIBUS_SOCKET_READABLE | LIBUS_SOCKET_WRITABLE);
return (struct us_udp_socket_t *) udp;
}
// Extended version for QUIC sockets that need extension data
struct us_udp_socket_t *us_create_udp_socket_with_ext(
struct us_loop_t *loop,
void (*data_cb)(struct us_udp_socket_t *, void *, int),
void (*drain_cb)(struct us_udp_socket_t *),
void (*close_cb)(struct us_udp_socket_t *),
const char *host,
unsigned short port,
int flags,
int *err,
void *user,
int ext_size
) {
LIBUS_SOCKET_DESCRIPTOR fd = bsd_create_udp_socket(host, port, flags, err);
if (fd == LIBUS_SOCKET_ERROR) {
return 0;
}
int fallthrough = 0;
// Use the provided ext_size instead of hardcoded 0
struct us_poll_t *p = us_create_poll(loop, fallthrough, sizeof(struct us_udp_socket_t) + ext_size);
us_poll_init(p, fd, POLL_TYPE_UDP);
struct us_udp_socket_t *udp = (struct us_udp_socket_t *)p;
/* Get and store the port once */
struct bsd_addr_t tmp = {0};
bsd_local_addr(fd, &tmp);
udp->port = bsd_addr_get_port(&tmp);
udp->loop = loop;
/* There is no udp socket context, only user data */
/* This should really be ext like everything else */
udp->user = user;
udp->on_data = data_cb;
udp->on_drain = drain_cb;
udp->on_close = close_cb;
udp->next = NULL;
us_poll_start((struct us_poll_t *) udp, udp->loop, LIBUS_SOCKET_READABLE | LIBUS_SOCKET_WRITABLE);
return (struct us_udp_socket_t *) udp;
}
/* Structure to hold allocated UDP packet buffer and its data */
struct us_udp_packet_buffer_wrapper {
struct udp_recvbuf buffer;
char data[LIBUS_RECV_BUFFER_LENGTH];
};
struct us_udp_packet_buffer_t *us_create_udp_packet_buffer() {
/* Allocate wrapper structure to hold both buffer and data */
struct us_udp_packet_buffer_wrapper *wrapper =
(struct us_udp_packet_buffer_wrapper *)malloc(sizeof(struct us_udp_packet_buffer_wrapper));
if (!wrapper) {
return NULL;
}
/* Setup the receive buffer using the allocated data */
bsd_udp_setup_recvbuf(&wrapper->buffer, wrapper->data, LIBUS_RECV_BUFFER_LENGTH);
/* Return the buffer part (us_udp_packet_buffer_t is typedef for struct udp_recvbuf) */
return (struct us_udp_packet_buffer_t *)&wrapper->buffer;
}

View File

@@ -22,6 +22,7 @@ pub const SocketAddress = @import("./api/bun/socket.zig").SocketAddress;
pub const TCPSocket = @import("./api/bun/socket.zig").TCPSocket;
pub const TLSSocket = @import("./api/bun/socket.zig").TLSSocket;
pub const SocketHandlers = @import("./api/bun/socket.zig").Handlers;
pub const QuicSocket = @import("./api/bun/quic_socket.zig").QuicSocket;
pub const Subprocess = @import("./api/bun/subprocess.zig");
pub const HashObject = @import("./api/HashObject.zig");

View File

@@ -27,6 +27,7 @@ pub const BunObject = struct {
pub const mmap = toJSCallback(Bun.mmapFile);
pub const nanoseconds = toJSCallback(Bun.nanoseconds);
pub const openInEditor = toJSCallback(Bun.openInEditor);
pub const quic = toJSCallback(host_fn.wrapStaticMethod(api.QuicSocket, "quic", false));
pub const registerMacro = toJSCallback(Bun.registerMacro);
pub const resolve = toJSCallback(Bun.resolve);
pub const resolveSync = toJSCallback(Bun.resolveSync);
@@ -167,6 +168,7 @@ pub const BunObject = struct {
@export(&BunObject.mmap, .{ .name = callbackName("mmap") });
@export(&BunObject.nanoseconds, .{ .name = callbackName("nanoseconds") });
@export(&BunObject.openInEditor, .{ .name = callbackName("openInEditor") });
@export(&BunObject.quic, .{ .name = callbackName("quic") });
@export(&BunObject.registerMacro, .{ .name = callbackName("registerMacro") });
@export(&BunObject.resolve, .{ .name = callbackName("resolve") });
@export(&BunObject.resolveSync, .{ .name = callbackName("resolveSync") });

View File

@@ -0,0 +1,797 @@
pub const QuicSocket = struct {
const This = @This();
// JavaScript class bindings - following the same pattern as TCP/TLS sockets
pub const js = jsc.Codegen.JSQuicSocket;
pub const toJS = js.toJS;
pub const fromJS = js.fromJS;
pub const fromJSDirect = js.fromJSDirect;
pub const new = bun.TrivialNew(@This());
const RefCount = bun.ptr.RefCount(@This(), "ref_count", deinit, .{});
pub const ref = RefCount.ref;
pub const deref = RefCount.deref;
// QUIC socket using uSockets QUIC API
socket: ?*uws.quic.Socket = null,
socket_context: ?*uws.quic.SocketContext = null,
listen_socket: ?*uws.quic.ListenSocket = null,
// Current stream for simple operations (will expand to support multiple streams)
current_stream: ?*uws.quic.Stream = null,
flags: Flags = .{},
ref_count: RefCount,
handlers: ?*QuicHandlers,
this_value: jsc.JSValue = .zero,
poll_ref: Async.KeepAlive = Async.KeepAlive.init(),
// QUIC-specific fields
server_name: ?[]const u8 = null,
connection_id: ?[]const u8 = null,
stream_count: u32 = 0,
has_pending_activity: std.atomic.Value(bool) = std.atomic.Value(bool).init(true),
pub const Flags = packed struct {
is_server: bool = false,
is_connected: bool = false,
is_closed: bool = false,
has_backpressure: bool = false,
// QUIC-specific flags
has_0rtt: bool = false,
is_migration_capable: bool = false,
_: u26 = 0,
};
pub fn hasPendingActivity(this: *This) callconv(.C) bool {
return this.has_pending_activity.load(.acquire);
}
pub fn memoryCost(_: *This) usize {
return @sizeOf(This);
}
pub fn finalize(this: *This) void {
this.deinit();
}
pub fn deinit(this: *This) void {
this.poll_ref.unref(jsc.VirtualMachine.get());
if (this.handlers) |handlers| {
handlers.unprotect();
bun.default_allocator.destroy(handlers);
}
// Close QUIC socket if still open
if (this.socket != null and !this.flags.is_closed) {
this.closeImpl();
}
if (this.server_name) |server_name| {
bun.default_allocator.free(server_name);
}
if (this.connection_id) |conn_id| {
bun.default_allocator.free(conn_id);
}
}
// Initialize a new QUIC socket
pub fn init(allocator: std.mem.Allocator, handlers: *QuicHandlers) !*This {
const this = try allocator.create(This);
this.* = This{
.ref_count = RefCount.init(),
.handlers = handlers,
};
handlers.protect();
return this;
}
// Create QUIC socket context with callbacks
fn createContext(this: *This) !void {
if (this.socket_context != null) return;
const loop = uws.Loop.get();
// For now, create context without TLS certificates (would need to be configured)
const options = uws.quic.SocketContextOptions{
.cert_file_name = null,
.key_file_name = null,
.passphrase = null,
};
const context = uws.quic.SocketContext.create(loop, options, @sizeOf(*This)) orelse return error.ContextCreationFailed;
this.socket_context = context;
// Set up callbacks
context.onOpen(onSocketOpen);
context.onClose(onSocketClose);
context.onStreamOpen(onStreamOpen);
context.onStreamData(onStreamData);
context.onStreamClose(onStreamClose);
context.onStreamEnd(onStreamEnd);
context.onStreamWritable(onStreamWritable);
// Store reference to this instance in context extension data
const ext_data = context.ext();
if (ext_data) |ext| {
const this_ptr: **This = @ptrCast(@alignCast(ext));
this_ptr.* = this;
}
log("QUIC socket context created", .{});
}
// Connect to a QUIC server
pub fn connectImpl(this: *This, hostname: []const u8, port: u16) !void {
if (this.socket_context == null) {
try this.createContext();
}
this.server_name = try bun.default_allocator.dupe(u8, hostname);
// Convert hostname to null-terminated string for C API
const hostname_cstr = try bun.default_allocator.dupeZ(u8, hostname);
defer bun.default_allocator.free(hostname_cstr);
// Create outgoing QUIC connection
const socket = this.socket_context.?.connect(hostname_cstr.ptr, @intCast(port), @sizeOf(*This)) orelse return error.ConnectionFailed;
this.socket = socket;
// Note: Socket extension data access will be handled through the socket context
// The this pointer is already stored in the context extension data
log("QUIC connect to {s}:{} initiated", .{ hostname, port });
}
// Listen for QUIC connections (server mode)
pub fn listenImpl(this: *This, hostname: []const u8, port: u16) !void {
if (this.socket_context == null) {
try this.createContext();
}
this.flags.is_server = true;
// Convert hostname to null-terminated string for C API
const hostname_cstr = try bun.default_allocator.dupeZ(u8, hostname);
defer bun.default_allocator.free(hostname_cstr);
// Start listening for QUIC connections
const listen_socket = this.socket_context.?.listen(hostname_cstr.ptr, @intCast(port), @sizeOf(*This)) orelse return error.ListenFailed;
this.listen_socket = listen_socket;
log("QUIC listening on {s}:{}", .{ hostname, port });
}
// Close the QUIC connection
pub fn closeImpl(this: *This) void {
if (this.flags.is_closed) return;
this.flags.is_closed = true;
this.has_pending_activity.store(false, .release);
// Close current stream if exists
if (this.current_stream) |stream| {
stream.close();
this.current_stream = null;
}
// Close socket (this will be handled by uSockets cleanup)
this.socket = null;
this.listen_socket = null;
log("QUIC connection closed", .{});
}
// Write data to the QUIC connection
pub fn writeImpl(this: *This, data: []const u8) !usize {
if (this.flags.is_closed) return error.SocketClosed;
if (!this.flags.is_connected) return error.NotConnected;
// Ensure we have a stream to write to
if (this.current_stream == null) {
if (this.socket) |socket| {
socket.createStream(0); // No extra data needed for stream extension
// Wait a moment for the stream to be created via callback
// In a real implementation, this might need to be asynchronous
} else {
return error.NoSocket;
}
}
if (this.current_stream) |stream| {
const bytes_written = stream.write(data);
if (bytes_written < 0) {
return error.WriteFailed;
}
return @intCast(bytes_written);
}
return error.NoStream;
}
// Read data from the QUIC connection
pub fn readImpl(this: *This, buffer: []u8) !usize {
if (this.flags.is_closed) return error.SocketClosed;
if (!this.flags.is_connected) return error.NotConnected;
// QUIC reading is event-driven through the onStreamData callback
// This method is kept for API compatibility but actual data comes through events
_ = buffer; // Suppress unused variable warning
log("QUIC read called - data comes through onStreamData events", .{});
return 0;
}
// Create a new QUIC stream
pub fn createStreamImpl(this: *This) !u32 {
if (this.flags.is_closed) return error.SocketClosed;
if (!this.flags.is_connected) return error.NotConnected;
if (this.socket) |socket| {
socket.createStream(@sizeOf(*uws.quic.Stream));
this.stream_count += 1;
log("QUIC stream #{} created", .{this.stream_count});
return this.stream_count;
}
return error.NoSocket;
}
// Get connection statistics
pub fn getQuicStats(this: *This) QuicStats {
return QuicStats{
.stream_count = this.stream_count,
.is_connected = this.flags.is_connected,
.has_0rtt = this.flags.has_0rtt,
.bytes_sent = 0, // TODO: Track actual bytes
.bytes_received = 0, // TODO: Track actual bytes
};
}
// JavaScript method bindings
pub fn connect(this: *This, globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!jsc.JSValue {
const args = callframe.arguments();
if (args.len < 2) return globalThis.throw("connect requires hostname and port", .{});
const hostname_js = args.ptr[0];
const port_js = args.ptr[1];
if (!hostname_js.isString()) return globalThis.throw("hostname must be a string", .{});
if (!port_js.isNumber()) return globalThis.throw("port must be a number", .{});
var hostname_slice = try hostname_js.getZigString(globalThis);
const hostname = hostname_slice.slice();
const port = port_js.to(u16);
this.connectImpl(hostname, port) catch |err| {
return globalThis.throwError(err, "Failed to connect");
};
return .js_undefined;
}
pub fn listen(this: *This, globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!jsc.JSValue {
const args = callframe.arguments();
if (args.len < 2) return globalThis.throw("listen requires hostname and port", .{});
const hostname_js = args.ptr[0];
const port_js = args.ptr[1];
if (!hostname_js.isString()) return globalThis.throw("hostname must be a string", .{});
if (!port_js.isNumber()) return globalThis.throw("port must be a number", .{});
var hostname_slice = try hostname_js.getZigString(globalThis);
const hostname = hostname_slice.slice();
const port = port_js.to(u16);
this.listenImpl(hostname, port) catch |err| {
return globalThis.throwError(err, "Failed to listen");
};
return .js_undefined;
}
pub fn write(this: *This, globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!jsc.JSValue {
const args = callframe.arguments();
if (args.len < 1) return globalThis.throw("write requires data", .{});
const data_js = args.ptr[0];
if (data_js.isEmptyOrUndefinedOrNull()) {
return globalThis.throw("data cannot be null or undefined", .{});
}
// Convert JS value to byte array
var data_slice: []const u8 = undefined;
if (data_js.asArrayBuffer(globalThis)) |array_buffer| {
data_slice = array_buffer.slice();
} else if (data_js.isString()) {
var zig_str = try data_js.getZigString(globalThis);
data_slice = zig_str.slice();
} else {
return globalThis.throw("data must be a string or ArrayBuffer", .{});
}
const bytes_written = this.writeImpl(data_slice) catch |err| {
return switch (err) {
error.SocketClosed => globalThis.throw("Socket is closed", .{}),
error.NotConnected => globalThis.throw("Socket is not connected", .{}),
error.NoSocket => globalThis.throw("No socket available", .{}),
error.NoStream => globalThis.throw("No stream available", .{}),
error.WriteFailed => globalThis.throw("Write operation failed", .{}),
};
};
return jsc.JSValue.jsNumber(@as(f64, @floatFromInt(bytes_written)));
}
pub fn read(this: *This, globalThis: *jsc.JSGlobalObject, _: *jsc.CallFrame) bun.JSError!jsc.JSValue {
// QUIC reading is event-driven, so this just returns status info
const bytes_read = this.readImpl(&[_]u8{}) catch |err| {
return switch (err) {
error.SocketClosed => globalThis.throw("Socket is closed", .{}),
error.NotConnected => globalThis.throw("Socket is not connected", .{}),
};
};
return jsc.JSValue.jsNumber(@as(f64, @floatFromInt(bytes_read)));
}
pub fn createStream(this: *This, globalThis: *jsc.JSGlobalObject, _: *jsc.CallFrame) bun.JSError!jsc.JSValue {
const stream_id = this.createStreamImpl() catch {
return globalThis.throw("Failed to create stream", .{});
};
return jsc.JSValue.jsNumber(@as(f64, @floatFromInt(stream_id)));
}
pub fn close(this: *This, _: *jsc.JSGlobalObject, _: *jsc.CallFrame) bun.JSError!jsc.JSValue {
this.closeImpl();
return .js_undefined;
}
// Property getters
pub fn getServerName(this: *This, globalThis: *jsc.JSGlobalObject) jsc.JSValue {
if (this.server_name) |server_name| {
return jsc.ZigString.init(server_name).toJS(globalThis);
}
return jsc.JSValue.jsNull();
}
pub fn setServerName(_: *This, _: *jsc.JSGlobalObject, value: jsc.JSValue) void {
if (value.isString()) {
// TODO: Implement setting server name
}
}
pub fn getConnectionId(this: *This, globalThis: *jsc.JSGlobalObject) jsc.JSValue {
if (this.connection_id) |conn_id| {
return jsc.ZigString.init(conn_id).toJS(globalThis);
}
return jsc.JSValue.jsNull();
}
pub fn getStreamCount(this: *This, _: *jsc.JSGlobalObject) jsc.JSValue {
return jsc.JSValue.jsNumber(@as(f64, @floatFromInt(this.stream_count)));
}
pub fn getIsConnected(this: *This, _: *jsc.JSGlobalObject) jsc.JSValue {
return jsc.JSValue.jsBoolean(this.flags.is_connected);
}
pub fn getIsServer(this: *This, _: *jsc.JSGlobalObject) jsc.JSValue {
return jsc.JSValue.jsBoolean(this.flags.is_server);
}
pub fn getHas0RTT(this: *This, _: *jsc.JSGlobalObject) jsc.JSValue {
return jsc.JSValue.jsBoolean(this.flags.has_0rtt);
}
pub fn getStats(this: *This, globalThis: *jsc.JSGlobalObject) jsc.JSValue {
const stats = this.getQuicStats();
// Create a JavaScript object with the stats
const stats_obj = jsc.JSValue.createEmptyObject(globalThis, 5);
stats_obj.put(globalThis, "streamCount", jsc.JSValue.jsNumber(@as(f64, @floatFromInt(stats.stream_count))));
stats_obj.put(globalThis, "isConnected", jsc.JSValue.jsBoolean(stats.is_connected));
stats_obj.put(globalThis, "has0RTT", jsc.JSValue.jsBoolean(stats.has_0rtt));
stats_obj.put(globalThis, "bytesSent", jsc.JSValue.jsNumber(@as(f64, @floatFromInt(stats.bytes_sent))));
stats_obj.put(globalThis, "bytesReceived", jsc.JSValue.jsNumber(@as(f64, @floatFromInt(stats.bytes_received))));
return stats_obj;
}
pub fn getData(this: *This, _: *jsc.JSGlobalObject) jsc.JSValue {
_ = this; // TODO: Implement data storage
return .js_undefined;
}
pub fn setData(this: *This, globalThis: *jsc.JSGlobalObject, value: jsc.JSValue) void {
_ = this; // TODO: Implement data storage
_ = globalThis;
_ = value;
}
pub fn getReadyState(this: *This, globalThis: *jsc.JSGlobalObject) jsc.JSValue {
if (this.flags.is_closed) {
return jsc.ZigString.init("closed").toJS(globalThis);
} else if (this.flags.is_connected) {
return jsc.ZigString.init("open").toJS(globalThis);
} else {
return jsc.ZigString.init("connecting").toJS(globalThis);
}
}
pub fn jsRef(this: *This, globalObject: *jsc.JSGlobalObject, _: *jsc.CallFrame) bun.JSError!jsc.JSValue {
this.poll_ref.ref(globalObject.bunVM());
return .js_undefined;
}
pub fn jsUnref(this: *This, globalObject: *jsc.JSGlobalObject, _: *jsc.CallFrame) bun.JSError!jsc.JSValue {
this.poll_ref.unref(globalObject.bunVM());
return .js_undefined;
}
// Static method for Bun.quic() API
pub fn quic(globalThis: *jsc.JSGlobalObject, options: jsc.JSValue) bun.JSError!jsc.JSValue {
if (options.isEmptyOrUndefinedOrNull() or !options.isObject()) {
return globalThis.throw("quic requires options object", .{});
}
// Determine if this is a server socket
const is_server = if (try options.get(globalThis, "server")) |server_val|
server_val.toBoolean()
else
false;
// Create handlers from options
const handlers = QuicHandlers.fromJS(globalThis, options, is_server) catch {
return globalThis.throw("Invalid QUIC handlers", .{});
};
// Allocate handlers on heap
const handlers_ptr = try bun.default_allocator.create(QuicHandlers);
handlers_ptr.* = handlers;
handlers_ptr.withAsyncContextIfNeeded(globalThis);
// Initialize QUIC socket
const this = QuicSocket.init(bun.default_allocator, handlers_ptr) catch {
handlers_ptr.unprotect();
bun.default_allocator.destroy(handlers_ptr);
return globalThis.throw("Failed to create QUIC socket", .{});
};
// Configure from options
if (try options.get(globalThis, "hostname")) |hostname_val| {
if (hostname_val.isString()) {
var hostname_slice = try hostname_val.getZigString(globalThis);
const hostname = hostname_slice.slice();
const port_val = (try options.get(globalThis, "port")) orelse jsc.JSValue.jsNumber(443);
const port = if (port_val.isNumber()) port_val.to(u16) else 443;
if (is_server) {
this.listenImpl(hostname, port) catch {
this.deref();
return globalThis.throw("Failed to listen", .{});
};
} else {
this.connectImpl(hostname, port) catch {
this.deref();
return globalThis.throw("Failed to connect", .{});
};
}
}
}
// Set up JavaScript value and return
this.this_value = this.toJS(globalThis);
this.poll_ref.ref(globalThis.bunVM());
return this.this_value;
}
// uSockets callback handlers
fn onSocketOpen(socket: *uws.quic.Socket, is_client: c_int) callconv(.C) void {
jsc.markBinding(@src());
const context = socket.context() orelse return;
const ext_data = context.ext() orelse return;
const this_ptr: **This = @ptrCast(@alignCast(ext_data));
const this: *This = this_ptr.*;
this.socket = socket;
this.flags.is_connected = true;
this.has_pending_activity.store(true, .release);
log("QUIC socket opened (client: {})", .{is_client != 0});
// Call appropriate JavaScript handlers
if (this.handlers) |handlers| {
const vm = handlers.vm;
const event_loop = vm.eventLoop();
event_loop.enter();
defer event_loop.exit();
// For server mode, when a client connects, call onConnection handler
if (this.flags.is_server and is_client == 0) {
if (handlers.onConnection != .zero) {
_ = handlers.onConnection.call(handlers.globalObject, this.this_value, &.{this.this_value}) catch |err| {
this.callErrorHandler(handlers.globalObject.takeException(err));
return;
};
}
}
// Call onOpen handler for all cases (server startup and client connection)
if (handlers.onOpen != .zero) {
_ = handlers.onOpen.call(handlers.globalObject, this.this_value, &.{this.this_value}) catch |err| {
this.callErrorHandler(handlers.globalObject.takeException(err));
};
}
}
}
fn onSocketClose(socket: *uws.quic.Socket) callconv(.C) void {
jsc.markBinding(@src());
const context = socket.context() orelse return;
const ext_data = context.ext() orelse return;
const this_ptr: **This = @ptrCast(@alignCast(ext_data));
const this: *This = this_ptr.*;
this.flags.is_connected = false;
this.flags.is_closed = true;
this.has_pending_activity.store(false, .release);
log("QUIC socket closed", .{});
// Call JavaScript onClose handler
if (this.handlers) |handlers| {
if (handlers.onClose != .zero) {
const vm = handlers.vm;
const event_loop = vm.eventLoop();
event_loop.enter();
defer event_loop.exit();
_ = handlers.onClose.call(handlers.globalObject, this.this_value, &.{this.this_value}) catch |err| {
this.callErrorHandler(handlers.globalObject.takeException(err));
};
}
}
}
fn onStreamOpen(stream: *uws.quic.Stream, is_client: c_int) callconv(.C) void {
jsc.markBinding(@src());
const socket = stream.socket() orelse return;
const context = socket.context() orelse return;
const ext_data = context.ext() orelse return;
const this_ptr: **This = @ptrCast(@alignCast(ext_data));
const this: *This = this_ptr.*;
this.current_stream = stream;
// Mark connection as established when first stream opens successfully
if (!this.flags.is_connected) {
this.flags.is_connected = true;
log("QUIC connection now established after stream open", .{});
}
log("QUIC stream opened (client: {})", .{is_client != 0});
}
fn onStreamData(stream: *uws.quic.Stream, data: [*c]u8, length: c_int) callconv(.C) void {
jsc.markBinding(@src());
const socket = stream.socket() orelse return;
const context = socket.context() orelse return;
const ext_data = context.ext() orelse return;
const this_ptr: **This = @ptrCast(@alignCast(ext_data));
const this: *This = this_ptr.*;
if (length <= 0) return;
const data_slice = data[0..@intCast(length)];
log("QUIC stream received {} bytes", .{length});
// Call JavaScript onMessage handler
if (this.handlers) |handlers| {
if (handlers.onMessage != .zero) {
const vm = handlers.vm;
const event_loop = vm.eventLoop();
event_loop.enter();
defer event_loop.exit();
const array_buffer = jsc.ArrayBuffer.createBuffer(handlers.globalObject, data_slice) catch {
this.callErrorHandler(jsc.JSValue.jsNull());
return;
};
_ = handlers.onMessage.call(handlers.globalObject, this.this_value, &.{ this.this_value, array_buffer }) catch |err| {
this.callErrorHandler(handlers.globalObject.takeException(err));
};
}
}
}
fn onStreamClose(stream: *uws.quic.Stream) callconv(.C) void {
jsc.markBinding(@src());
const socket = stream.socket() orelse return;
const context = socket.context() orelse return;
const ext_data = context.ext() orelse return;
const this_ptr: **This = @ptrCast(@alignCast(ext_data));
const this: *This = this_ptr.*;
if (this.current_stream == stream) {
this.current_stream = null;
}
log("QUIC stream closed", .{});
}
fn onStreamEnd(stream: *uws.quic.Stream) callconv(.C) void {
jsc.markBinding(@src());
const socket = stream.socket() orelse return;
const context = socket.context() orelse return;
const ext_data = context.ext() orelse return;
const this_ptr: **This = @ptrCast(@alignCast(ext_data));
const this: *This = this_ptr.*;
_ = this; // Use this if needed for future functionality
log("QUIC stream ended", .{});
}
fn onStreamWritable(stream: *uws.quic.Stream) callconv(.C) void {
jsc.markBinding(@src());
const socket = stream.socket() orelse return;
const context = socket.context() orelse return;
const ext_data = context.ext() orelse return;
const this_ptr: **This = @ptrCast(@alignCast(ext_data));
const this: *This = this_ptr.*;
_ = this; // Use this if needed for future functionality
log("QUIC stream writable", .{});
}
// Error handler helper
fn callErrorHandler(this: *This, exception: jsc.JSValue) void {
if (this.handlers) |handlers| {
if (handlers.onError != .zero) {
const vm = handlers.vm;
const event_loop = vm.eventLoop();
event_loop.enter();
defer event_loop.exit();
_ = handlers.onError.call(handlers.globalObject, this.this_value, &.{ this.this_value, exception }) catch {
// If error handler itself throws, we can't do much more
log("Error in QUIC error handler", .{});
};
}
}
}
};
pub const QuicStats = struct {
stream_count: u32,
is_connected: bool,
has_0rtt: bool,
bytes_sent: u64,
bytes_received: u64,
};
const std = @import("std");
const bun = @import("bun");
const Environment = bun.Environment;
const Async = bun.Async;
const jsc = bun.jsc;
const SocketAddress = @import("./socket/SocketAddress.zig");
const Handlers = @import("./socket/Handlers.zig");
const uws = @import("../../../deps/uws.zig");
const log = bun.Output.scoped(.QuicSocket, false);
// QUIC-specific handlers that use different callback names than regular sockets
pub const QuicHandlers = struct {
onOpen: jsc.JSValue = .zero, // "open" callback
onMessage: jsc.JSValue = .zero, // "message" callback
onClose: jsc.JSValue = .zero, // "close" callback
onError: jsc.JSValue = .zero, // "error" callback
onConnection: jsc.JSValue = .zero, // "connection" callback (server only)
vm: *jsc.VirtualMachine,
globalObject: *jsc.JSGlobalObject,
is_server: bool,
protection_count: bun.DebugOnly(u32) = if (Environment.isDebug) 0,
pub fn fromJS(globalObject: *jsc.JSGlobalObject, opts: jsc.JSValue, is_server: bool) bun.JSError!QuicHandlers {
var handlers = QuicHandlers{
.vm = globalObject.bunVM(),
.globalObject = globalObject,
.is_server = is_server,
};
if (opts.isEmptyOrUndefinedOrNull() or opts.isBoolean() or !opts.isObject()) {
return globalObject.throwInvalidArguments("Expected options to be an object", .{});
}
// Map QUIC callback names to handler fields
const pairs = .{
.{ "onOpen", "open" },
.{ "onMessage", "message" },
.{ "onClose", "close" },
.{ "onError", "error" },
.{ "onConnection", "connection" },
};
inline for (pairs) |pair| {
if (try opts.getTruthyComptime(globalObject, pair.@"1")) |callback_value| {
if (!callback_value.isCell() or !callback_value.isCallable()) {
return globalObject.throwInvalidArguments("Expected \"{s}\" callback to be a function", .{pair[1]});
}
@field(handlers, pair.@"0") = callback_value;
}
}
// For QUIC, we need at least an open callback or error callback
if (handlers.onOpen == .zero and handlers.onError == .zero) {
return globalObject.throwInvalidArguments("Expected at least \"open\" or \"error\" callback", .{});
}
return handlers;
}
pub fn unprotect(this: *QuicHandlers) void {
if (this.vm.isShuttingDown()) {
return;
}
if (comptime Environment.isDebug) {
bun.assert(this.protection_count > 0);
this.protection_count -= 1;
}
this.onOpen.unprotect();
this.onMessage.unprotect();
this.onClose.unprotect();
this.onError.unprotect();
this.onConnection.unprotect();
}
pub fn protect(this: *QuicHandlers) void {
if (comptime Environment.isDebug) {
this.protection_count += 1;
}
this.onOpen.protect();
this.onMessage.protect();
this.onClose.protect();
this.onError.protect();
this.onConnection.protect();
}
pub fn withAsyncContextIfNeeded(this: *QuicHandlers, globalObject: *jsc.JSGlobalObject) void {
inline for (.{
"onOpen",
"onMessage",
"onClose",
"onError",
"onConnection",
}) |field| {
const value = @field(this, field);
if (value != .zero) {
@field(this, field) = value.withAsyncContextIfNeeded(globalObject);
}
}
}
};

View File

@@ -241,6 +241,86 @@ const sslOnly = {
export default [
generate(true),
generate(false),
// QUIC Socket
define({
name: "QuicSocket",
JSType: "0b11101110",
hasPendingActivity: true,
noConstructor: true,
configurable: false,
memoryCost: true,
proto: {
connect: {
fn: "connect",
length: 2,
},
listen: {
fn: "listen",
length: 2,
},
write: {
fn: "write",
length: 1,
},
read: {
fn: "read",
length: 1,
},
createStream: {
fn: "createStream",
length: 0,
},
close: {
fn: "close",
length: 0,
},
"@@dispose": {
fn: "close",
length: 0,
},
ref: {
fn: "jsRef",
length: 0,
},
unref: {
fn: "jsUnref",
length: 0,
},
serverName: {
getter: "getServerName",
setter: "setServerName",
},
connectionId: {
getter: "getConnectionId",
},
streamCount: {
getter: "getStreamCount",
},
isConnected: {
getter: "getIsConnected",
},
isServer: {
getter: "getIsServer",
},
has0RTT: {
getter: "getHas0RTT",
},
stats: {
getter: "getStats",
},
data: {
getter: "getData",
cache: true,
setter: "setData",
},
readyState: {
getter: "getReadyState",
},
},
finalize: true,
construct: true,
klass: {},
}),
define({
name: "Listener",
noConstructor: true,

View File

@@ -59,6 +59,7 @@
macro(mmap) \
macro(nanoseconds) \
macro(openInEditor) \
macro(quic) \
macro(registerMacro) \
macro(resolve) \
macro(resolveSync) \

View File

@@ -754,6 +754,7 @@ JSC_DEFINE_HOST_FUNCTION(functionFileURLToPath, (JSC::JSGlobalObject * globalObj
pathToFileURL functionPathToFileURL DontDelete|Function 1
peek constructBunPeekObject DontDelete|PropertyCallback
plugin constructPluginObject ReadOnly|DontDelete|PropertyCallback
quic BunObject_callback_quic DontDelete|Function 1
randomUUIDv7 Bun__randomUUIDv7 DontDelete|Function 2
randomUUIDv5 Bun__randomUUIDv5 DontDelete|Function 3
readableStreamToArray JSBuiltin Builtin|Function 1

View File

@@ -48,6 +48,7 @@ pub const Classes = struct {
pub const ResourceUsage = api.Subprocess.ResourceUsage;
pub const TCPSocket = api.TCPSocket;
pub const TLSSocket = api.TLSSocket;
pub const QuicSocket = api.QuicSocket;
pub const UDPSocket = api.UDPSocket;
pub const SocketAddress = api.SocketAddress;
pub const TextDecoder = webcore.TextDecoder;

View File

@@ -26,6 +26,7 @@ pub const ListenSocket = @import("./uws/ListenSocket.zig").ListenSocket;
pub const State = @import("./uws/Response.zig").State;
pub const Loop = @import("./uws/Loop.zig").Loop;
pub const udp = @import("./uws/udp.zig");
pub const quic = @import("./uws/quic.zig");
pub const BodyReaderMixin = @import("./uws/BodyReaderMixin.zig").BodyReaderMixin;
pub const LIBUS_TIMEOUT_GRANULARITY = @as(i32, 4);

176
src/deps/uws/quic.zig Normal file
View File

@@ -0,0 +1,176 @@
const quic = @This();
const std = @import("std");
const bun = @import("bun");
const uws = @import("../uws.zig");
const Loop = uws.Loop;
/// QUIC socket context options for creating contexts
pub const SocketContextOptions = extern struct {
cert_file_name: [*c]const u8,
key_file_name: [*c]const u8,
passphrase: [*c]const u8,
};
/// QUIC socket context - holds shared state and configuration
pub const SocketContext = opaque {
/// Create a new QUIC socket context
pub fn create(loop: *Loop, options: SocketContextOptions, ext_size: c_int) ?*SocketContext {
return us_create_quic_socket_context(loop, options, ext_size);
}
/// Start listening for QUIC connections
pub fn listen(this: *SocketContext, host: [*c]const u8, port: c_int, ext_size: c_int) ?*ListenSocket {
return us_quic_socket_context_listen(this, host, port, ext_size);
}
/// Create an outgoing QUIC connection
pub fn connect(this: *SocketContext, host: [*c]const u8, port: c_int, ext_size: c_int) ?*Socket {
return us_quic_socket_context_connect(this, host, port, ext_size);
}
/// Get extension data for this context
pub fn ext(this: *SocketContext) ?*anyopaque {
return us_quic_socket_context_ext(this);
}
/// Set header for HTTP/3 requests
pub fn setHeader(this: *SocketContext, index: c_int, key: [*c]const u8, key_length: c_int, value: [*c]const u8, value_length: c_int) void {
us_quic_socket_context_set_header(this, index, key, key_length, value, value_length);
}
/// Send headers on a stream
pub fn sendHeaders(this: *SocketContext, stream: *Stream, num: c_int, has_body: c_int) void {
us_quic_socket_context_send_headers(this, stream, num, has_body);
}
/// Get header from received headers
pub fn getHeader(this: *SocketContext, index: c_int, name: [*c][*c]u8, name_length: [*c]c_int, value: [*c][*c]u8, value_length: [*c]c_int) c_int {
return us_quic_socket_context_get_header(this, index, name, name_length, value, value_length);
}
// Callback setters
pub fn onStreamData(this: *SocketContext, callback: *const fn (*Stream, [*c]u8, c_int) callconv(.C) void) void {
us_quic_socket_context_on_stream_data(this, callback);
}
pub fn onStreamEnd(this: *SocketContext, callback: *const fn (*Stream) callconv(.C) void) void {
us_quic_socket_context_on_stream_end(this, callback);
}
pub fn onStreamHeaders(this: *SocketContext, callback: *const fn (*Stream) callconv(.C) void) void {
us_quic_socket_context_on_stream_headers(this, callback);
}
pub fn onStreamOpen(this: *SocketContext, callback: *const fn (*Stream, c_int) callconv(.C) void) void {
us_quic_socket_context_on_stream_open(this, callback);
}
pub fn onStreamClose(this: *SocketContext, callback: *const fn (*Stream) callconv(.C) void) void {
us_quic_socket_context_on_stream_close(this, callback);
}
pub fn onOpen(this: *SocketContext, callback: *const fn (*Socket, c_int) callconv(.C) void) void {
us_quic_socket_context_on_open(this, callback);
}
pub fn onClose(this: *SocketContext, callback: *const fn (*Socket) callconv(.C) void) void {
us_quic_socket_context_on_close(this, callback);
}
pub fn onStreamWritable(this: *SocketContext, callback: *const fn (*Stream) callconv(.C) void) void {
us_quic_socket_context_on_stream_writable(this, callback);
}
};
/// QUIC listen socket - represents a listening QUIC socket
pub const ListenSocket = opaque {
// Listen sockets are created by SocketContext.listen()
// and typically don't need many methods beyond what's inherited
};
/// QUIC socket - represents a QUIC connection
pub const Socket = opaque {
/// Get the socket context for this socket
pub fn context(this: *Socket) ?*SocketContext {
return us_quic_socket_context(this);
}
/// Create a new stream on this QUIC connection
pub fn createStream(this: *Socket, ext_size: c_int) void {
us_quic_socket_create_stream(this, ext_size);
}
};
/// QUIC stream - represents a single stream within a QUIC connection
pub const Stream = opaque {
/// Write data to the stream
pub fn write(this: *Stream, data: []const u8) c_int {
return us_quic_stream_write(this, @ptrCast(@constCast(data.ptr)), @intCast(data.len));
}
/// Get the socket that owns this stream
pub fn socket(this: *Stream) ?*Socket {
return us_quic_stream_socket(this);
}
/// Get extension data for this stream
pub fn ext(this: *Stream) ?*anyopaque {
return us_quic_stream_ext(this);
}
/// Check if this stream is from a client connection
pub fn isClient(this: *Stream) bool {
return us_quic_stream_is_client(this) != 0;
}
/// Shutdown the stream for writing
pub fn shutdown(this: *Stream) c_int {
return us_quic_stream_shutdown(this);
}
/// Shutdown the stream for reading
pub fn shutdownRead(this: *Stream) c_int {
return us_quic_stream_shutdown_read(this);
}
/// Close the stream
pub fn close(this: *Stream) void {
us_quic_stream_close(this);
}
};
// External C function declarations
extern fn us_create_quic_socket_context(loop: *Loop, options: SocketContextOptions, ext_size: c_int) ?*SocketContext;
extern fn us_quic_socket_context_listen(context: *SocketContext, host: [*c]const u8, port: c_int, ext_size: c_int) ?*ListenSocket;
extern fn us_quic_socket_context_connect(context: *SocketContext, host: [*c]const u8, port: c_int, ext_size: c_int) ?*Socket;
extern fn us_quic_socket_context_ext(context: *SocketContext) ?*anyopaque;
extern fn us_quic_socket_context(socket: *Socket) ?*SocketContext;
// Stream functions
extern fn us_quic_stream_write(stream: *Stream, data: [*c]u8, length: c_int) c_int;
extern fn us_quic_stream_socket(stream: *Stream) ?*Socket;
extern fn us_quic_stream_ext(stream: *Stream) ?*anyopaque;
extern fn us_quic_stream_is_client(stream: *Stream) c_int;
extern fn us_quic_stream_shutdown(stream: *Stream) c_int;
extern fn us_quic_stream_shutdown_read(stream: *Stream) c_int;
extern fn us_quic_stream_close(stream: *Stream) void;
// Socket functions
extern fn us_quic_socket_create_stream(socket: *Socket, ext_size: c_int) void;
// Header functions
extern fn us_quic_socket_context_set_header(context: *SocketContext, index: c_int, key: [*c]const u8, key_length: c_int, value: [*c]const u8, value_length: c_int) void;
extern fn us_quic_socket_context_send_headers(context: *SocketContext, stream: *Stream, num: c_int, has_body: c_int) void;
extern fn us_quic_socket_context_get_header(context: *SocketContext, index: c_int, name: [*c][*c]u8, name_length: [*c]c_int, value: [*c][*c]u8, value_length: [*c]c_int) c_int;
// Callback registration functions
extern fn us_quic_socket_context_on_stream_data(context: *SocketContext, callback: *const fn (*Stream, [*c]u8, c_int) callconv(.C) void) void;
extern fn us_quic_socket_context_on_stream_end(context: *SocketContext, callback: *const fn (*Stream) callconv(.C) void) void;
extern fn us_quic_socket_context_on_stream_headers(context: *SocketContext, callback: *const fn (*Stream) callconv(.C) void) void;
extern fn us_quic_socket_context_on_stream_open(context: *SocketContext, callback: *const fn (*Stream, c_int) callconv(.C) void) void;
extern fn us_quic_socket_context_on_stream_close(context: *SocketContext, callback: *const fn (*Stream) callconv(.C) void) void;
extern fn us_quic_socket_context_on_open(context: *SocketContext, callback: *const fn (*Socket, c_int) callconv(.C) void) void;
extern fn us_quic_socket_context_on_close(context: *SocketContext, callback: *const fn (*Socket) callconv(.C) void) void;
extern fn us_quic_socket_context_on_stream_writable(context: *SocketContext, callback: *const fn (*Stream) callconv(.C) void) void;

View File

@@ -0,0 +1,135 @@
import { test, expect } from "bun:test";
test("Bun.quic should be available", () => {
expect(typeof Bun.quic).toBe("function");
});
test("Bun.quic should create a QUIC socket with basic options", () => {
const socket = Bun.quic({
hostname: "localhost",
port: 8443,
server: false,
data: {
test: true
},
open(socket) {
console.log("QUIC connection opened", socket);
},
message(socket, data) {
console.log("QUIC message received", data);
},
close(socket) {
console.log("QUIC connection closed", socket);
},
error(socket, error) {
console.log("QUIC error", error);
},
});
expect(socket).toBeDefined();
expect(typeof socket.connect).toBe("function");
expect(typeof socket.write).toBe("function");
expect(typeof socket.read).toBe("function");
expect(typeof socket.createStream).toBe("function");
expect(typeof socket.close).toBe("function");
// Test properties
expect(typeof socket.isConnected).toBe("boolean");
expect(typeof socket.isServer).toBe("boolean");
expect(typeof socket.streamCount).toBe("number");
expect(socket.readyState).toBe("open");
// Clean up
socket.close();
expect(socket.readyState).toBe("closed");
});
test("QuicSocket should support server mode", () => {
const server = Bun.quic({
hostname: "localhost",
port: 8443,
server: true,
data: {
isServer: true
},
open(socket) {
console.log("QUIC server ready", socket);
},
connection(socket) {
console.log("New QUIC connection", socket);
},
message(socket, data) {
console.log("Server received message", data);
},
close(socket) {
console.log("QUIC server closed", socket);
},
error(socket, error) {
console.log("QUIC server error", error);
},
});
expect(server).toBeDefined();
expect(server.isServer).toBe(true);
// Clean up
server.close();
});
test("QuicSocket should provide stats", () => {
const socket = Bun.quic({
hostname: "localhost",
port: 8443,
server: false,
open() {},
message() {},
close() {},
error() {},
});
const stats = socket.stats;
expect(stats).toBeDefined();
expect(typeof stats.streamCount).toBe("number");
expect(typeof stats.isConnected).toBe("boolean");
expect(typeof stats.has0RTT).toBe("boolean");
expect(typeof stats.bytesSent).toBe("number");
expect(typeof stats.bytesReceived).toBe("number");
socket.close();
});
test("QuicSocket should support stream creation", () => {
const socket = Bun.quic({
hostname: "localhost",
port: 8443,
server: false,
open() {},
message() {},
close() {},
error() {},
});
// Stream creation should succeed even during connection process in QUIC
expect(() => socket.createStream()).not.toThrow();
expect(typeof socket.createStream()).toBe("number");
socket.close();
});
test("QuicSocket should validate options", () => {
// Missing options
expect(() => Bun.quic()).toThrow();
// Invalid options type
expect(() => Bun.quic("invalid")).toThrow();
// Empty options should work (no connection will be made)
const socket = Bun.quic({
open() {},
message() {},
close() {},
error() {},
});
expect(socket).toBeDefined();
socket.close();
});

View File

@@ -0,0 +1,199 @@
import { test, expect } from "bun:test";
test("QUIC large data transfer", async () => {
let dataReceived = "";
const testData = "x".repeat(64 * 1024); // 64KB test data
const server = Bun.quic({
hostname: "localhost",
port: 9446,
server: true,
connection(socket) {
// Send large data to client
socket.write(testData);
},
open() {},
message() {},
close() {},
error() {},
});
await new Promise(resolve => setTimeout(resolve, 100));
const client = Bun.quic({
hostname: "localhost",
port: 9446,
server: false,
open() {},
message(socket, data) {
dataReceived += data.toString();
if (dataReceived.length >= testData.length) {
expect(dataReceived).toBe(testData);
socket.close();
}
},
close() {},
error() {},
});
await new Promise(resolve => setTimeout(resolve, 2000));
expect(dataReceived.length).toBe(testData.length);
server.close();
client.close();
});
test("QUIC multiple concurrent streams", async () => {
const streamCount = 10;
let streamsCreated = 0;
let messagesReceived = 0;
const server = Bun.quic({
hostname: "localhost",
port: 9447,
server: true,
connection(socket) {
// Create multiple streams
for (let i = 0; i < streamCount; i++) {
const stream = socket.createStream();
streamsCreated++;
// Send message on each stream
socket.write(`Stream ${i} message`);
}
expect(socket.streamCount).toBe(streamCount);
},
message(socket, data) {
messagesReceived++;
console.log("Server received:", data.toString());
},
open() {},
close() {},
error() {},
});
await new Promise(resolve => setTimeout(resolve, 100));
const client = Bun.quic({
hostname: "localhost",
port: 9447,
server: false,
open(socket) {
// Client also creates streams
for (let i = 0; i < streamCount; i++) {
socket.createStream();
socket.write(`Client stream ${i}`);
}
},
message(socket, data) {
console.log("Client received:", data.toString());
},
close() {},
error() {},
});
await new Promise(resolve => setTimeout(resolve, 1000));
expect(streamsCreated).toBe(streamCount);
expect(messagesReceived).toBeGreaterThan(0);
server.close();
client.close();
});
test("QUIC connection statistics", async () => {
let finalStats: any = null;
const server = Bun.quic({
hostname: "localhost",
port: 9448,
server: true,
connection(socket) {
// Send some data to generate stats
socket.write("Hello statistics!");
},
open() {},
message() {},
close() {},
error() {},
});
await new Promise(resolve => setTimeout(resolve, 100));
const client = Bun.quic({
hostname: "localhost",
port: 9448,
server: false,
open(socket) {
// Send data back
socket.write("Stats response!");
},
message(socket, data) {
console.log("Client received:", data.toString());
// Get final stats before closing
finalStats = socket.stats;
},
close() {},
error() {},
});
await new Promise(resolve => setTimeout(resolve, 1000));
// Verify stats structure and values
expect(finalStats).toBeDefined();
expect(typeof finalStats.streamCount).toBe("number");
expect(typeof finalStats.isConnected).toBe("boolean");
expect(typeof finalStats.has0RTT).toBe("boolean");
expect(typeof finalStats.bytesSent).toBe("number");
expect(typeof finalStats.bytesReceived).toBe("number");
// Should have received some data
expect(finalStats.bytesReceived).toBeGreaterThan(0);
server.close();
client.close();
});
test("QUIC 0-RTT connection support", async () => {
let has0RTTSupport = false;
const server = Bun.quic({
hostname: "localhost",
port: 9449,
server: true,
connection(socket) {
console.log("Server: 0-RTT support:", socket.has0RTT);
},
open() {},
message() {},
close() {},
error() {},
});
await new Promise(resolve => setTimeout(resolve, 100));
const client = Bun.quic({
hostname: "localhost",
port: 9449,
server: false,
open(socket) {
has0RTTSupport = socket.has0RTT;
console.log("Client: 0-RTT support:", has0RTTSupport);
},
message() {},
close() {},
error() {},
});
await new Promise(resolve => setTimeout(resolve, 500));
// 0-RTT is a boolean property
expect(typeof has0RTTSupport).toBe("boolean");
server.close();
client.close();
});

View File

@@ -0,0 +1,206 @@
import { test, expect } from "bun:test";
test("QUIC server and client integration", async () => {
let serverConnections = 0;
let clientConnections = 0;
let messagesReceived = 0;
// Create QUIC server
const server = Bun.quic({
hostname: "localhost",
port: 9443,
server: true,
open(socket) {
console.log("QUIC server ready on port 9443");
},
connection(socket) {
serverConnections++;
console.log(`New QUIC connection (${serverConnections})`);
// Send welcome message to client
socket.write("Welcome to QUIC server!");
},
message(socket, data) {
messagesReceived++;
console.log("Server received:", data.toString());
// Echo the message back
socket.write(`Echo: ${data}`);
},
close(socket) {
console.log("Server connection closed");
},
error(socket, error) {
console.error("Server error:", error);
},
});
// Wait for server to be ready
await new Promise(resolve => setTimeout(resolve, 100));
// Create QUIC client
const client = Bun.quic({
hostname: "localhost",
port: 9443,
server: false,
open(socket) {
clientConnections++;
console.log("QUIC client connected");
// Send test message
socket.write("Hello from QUIC client!");
},
message(socket, data) {
console.log("Client received:", data.toString());
if (data.toString().includes("Echo:")) {
// Test complete, close connection
socket.close();
}
},
close(socket) {
console.log("Client connection closed");
},
error(socket, error) {
console.error("Client error:", error);
},
});
// Wait for communication to complete
await new Promise(resolve => setTimeout(resolve, 1000));
// Verify connections were established
expect(serverConnections).toBe(1);
expect(clientConnections).toBe(1);
expect(messagesReceived).toBeGreaterThan(0);
// Clean up
server.close();
client.close();
});
test("QUIC stream creation and management", async () => {
const server = Bun.quic({
hostname: "localhost",
port: 9444,
server: true,
connection(socket) {
console.log("Server: New connection");
// Create multiple streams
const stream1 = socket.createStream();
const stream2 = socket.createStream();
expect(socket.streamCount).toBe(2);
expect(stream1).toBeDefined();
expect(stream2).toBeDefined();
expect(stream1).not.toBe(stream2);
},
open() {},
message() {},
close() {},
error() {},
});
await new Promise(resolve => setTimeout(resolve, 100));
const client = Bun.quic({
hostname: "localhost",
port: 9444,
server: false,
open(socket) {
// Client can also create streams
const clientStream = socket.createStream();
expect(clientStream).toBeDefined();
expect(socket.streamCount).toBe(1);
},
message() {},
close() {},
error() {},
});
await new Promise(resolve => setTimeout(resolve, 500));
server.close();
client.close();
});
test("QUIC connection states and properties", async () => {
const server = Bun.quic({
hostname: "localhost",
port: 9445,
server: true,
open() {},
connection() {},
message() {},
close() {},
error() {},
});
await new Promise(resolve => setTimeout(resolve, 100));
const client = Bun.quic({
hostname: "localhost",
port: 9445,
server: false,
open(socket) {
// Test connection properties
expect(socket.isServer).toBe(false);
expect(socket.readyState).toBe("open");
expect(socket.serverName).toBe("localhost");
expect(socket.streamCount).toBe(0);
// Test stats object
const stats = socket.stats;
expect(typeof stats).toBe("object");
expect(typeof stats.streamCount).toBe("number");
expect(typeof stats.isConnected).toBe("boolean");
expect(typeof stats.has0RTT).toBe("boolean");
expect(typeof stats.bytesSent).toBe("number");
expect(typeof stats.bytesReceived).toBe("number");
},
message() {},
close() {},
error() {},
});
await new Promise(resolve => setTimeout(resolve, 500));
// Test server properties
expect(server.isServer).toBe(true);
expect(server.readyState).toBe("open");
server.close();
client.close();
// Test closed state
expect(server.readyState).toBe("closed");
expect(client.readyState).toBe("closed");
});
test("QUIC error handling", async () => {
let errorReceived = false;
// Try to connect to non-existent server
const client = Bun.quic({
hostname: "localhost",
port: 9999, // Non-existent port
server: false,
open() {
// Should not be called
expect(false).toBe(true);
},
message() {},
close() {},
error(socket, error) {
errorReceived = true;
console.log("Expected error:", error);
expect(error).toBeDefined();
},
});
await new Promise(resolve => setTimeout(resolve, 2000));
expect(errorReceived).toBe(true);
client.close();
});