diff --git a/cmake/scripts/GitClone.cmake b/cmake/scripts/GitClone.cmake index d02f0228b3..1fbfa42862 100644 --- a/cmake/scripts/GitClone.cmake +++ b/cmake/scripts/GitClone.cmake @@ -20,22 +20,85 @@ if(NOT GIT_NAME) set(GIT_NAME ${GIT_ORIGINAL_NAME}) endif() -set(GIT_DOWNLOAD_URL https://github.com/${GIT_REPOSITORY}/archive/${GIT_REF}.tar.gz) +# Special handling for repositories that need git submodules +if(GIT_NAME STREQUAL "lsquic") + message(STATUS "Using git clone with submodules for ${GIT_REPOSITORY} at ${GIT_REF}...") + + find_program(GIT_PROGRAM git REQUIRED) + + # Remove existing directory if it exists + if(EXISTS ${GIT_PATH}) + file(REMOVE_RECURSE ${GIT_PATH}) + endif() + + # Clone the repository + execute_process( + COMMAND + ${GIT_PROGRAM} clone https://github.com/${GIT_REPOSITORY}.git ${GIT_PATH} + ERROR_STRIP_TRAILING_WHITESPACE + ERROR_VARIABLE + GIT_ERROR + RESULT_VARIABLE + GIT_RESULT + ) + + if(NOT GIT_RESULT EQUAL 0) + message(FATAL_ERROR "Git clone failed: ${GIT_ERROR}") + endif() + + # Checkout the specific commit/tag/branch + execute_process( + COMMAND + ${GIT_PROGRAM} checkout ${GIT_REF} + WORKING_DIRECTORY + ${GIT_PATH} + ERROR_STRIP_TRAILING_WHITESPACE + ERROR_VARIABLE + GIT_ERROR + RESULT_VARIABLE + GIT_RESULT + ) + + if(NOT GIT_RESULT EQUAL 0) + message(FATAL_ERROR "Git checkout failed: ${GIT_ERROR}") + endif() + + # Initialize and update submodules + execute_process( + COMMAND + ${GIT_PROGRAM} submodule update --init --recursive + WORKING_DIRECTORY + ${GIT_PATH} + ERROR_STRIP_TRAILING_WHITESPACE + ERROR_VARIABLE + GIT_ERROR + RESULT_VARIABLE + GIT_RESULT + ) + + if(NOT GIT_RESULT EQUAL 0) + message(FATAL_ERROR "Git submodule init failed: ${GIT_ERROR}") + endif() + +else() + # Use the original download method for other repositories + set(GIT_DOWNLOAD_URL https://github.com/${GIT_REPOSITORY}/archive/${GIT_REF}.tar.gz) -message(STATUS "Cloning ${GIT_REPOSITORY} at ${GIT_REF}...") -execute_process( - COMMAND - ${CMAKE_COMMAND} - -DDOWNLOAD_URL=${GIT_DOWNLOAD_URL} - -DDOWNLOAD_PATH=${GIT_PATH} - -DDOWNLOAD_FILTERS=${GIT_FILTERS} - -P ${CMAKE_CURRENT_LIST_DIR}/DownloadUrl.cmake - ERROR_STRIP_TRAILING_WHITESPACE - ERROR_VARIABLE - GIT_ERROR - RESULT_VARIABLE - GIT_RESULT -) + message(STATUS "Cloning ${GIT_REPOSITORY} at ${GIT_REF}...") + execute_process( + COMMAND + ${CMAKE_COMMAND} + -DDOWNLOAD_URL=${GIT_DOWNLOAD_URL} + -DDOWNLOAD_PATH=${GIT_PATH} + -DDOWNLOAD_FILTERS=${GIT_FILTERS} + -P ${CMAKE_CURRENT_LIST_DIR}/DownloadUrl.cmake + ERROR_STRIP_TRAILING_WHITESPACE + ERROR_VARIABLE + GIT_ERROR + RESULT_VARIABLE + GIT_RESULT + ) +endif() if(NOT GIT_RESULT EQUAL 0) message(FATAL_ERROR "Clone failed: ${GIT_ERROR}") diff --git a/cmake/sources/ZigSources.txt b/cmake/sources/ZigSources.txt index 960b7a2f3b..c7938eb07d 100644 --- a/cmake/sources/ZigSources.txt +++ b/cmake/sources/ZigSources.txt @@ -62,6 +62,7 @@ src/bun.js/api/bun/dns.zig src/bun.js/api/bun/h2_frame_parser.zig src/bun.js/api/bun/lshpack.zig src/bun.js/api/bun/process.zig +src/bun.js/api/bun/quic_socket.zig src/bun.js/api/bun/socket.zig src/bun.js/api/bun/socket/Handlers.zig src/bun.js/api/bun/socket/Listener.zig @@ -501,6 +502,7 @@ src/deps/uws/ConnectingSocket.zig src/deps/uws/InternalLoopData.zig src/deps/uws/ListenSocket.zig src/deps/uws/Loop.zig +src/deps/uws/quic.zig src/deps/uws/Request.zig src/deps/uws/Response.zig src/deps/uws/socket.zig diff --git a/cmake/targets/BuildBun.cmake b/cmake/targets/BuildBun.cmake index 43d3b88055..ba5bc19a6e 100644 --- a/cmake/targets/BuildBun.cmake +++ b/cmake/targets/BuildBun.cmake @@ -55,6 +55,7 @@ set(BUN_DEPENDENCIES Mimalloc TinyCC Zlib + Lsquic # QUIC protocol support - depends on BoringSSL and Zlib LibArchive # must be loaded after zlib HdrHistogram # must be loaded after zlib Zstd @@ -834,6 +835,7 @@ target_compile_definitions(${bun} PRIVATE _HAS_EXCEPTIONS=0 LIBUS_USE_OPENSSL=1 LIBUS_USE_BORINGSSL=1 + LIBUS_USE_QUIC=1 WITH_BORINGSSL=1 STATICALLY_LINKED_WITH_JavaScriptCore=1 STATICALLY_LINKED_WITH_BMALLOC=1 diff --git a/cmake/targets/BuildLsquic.cmake b/cmake/targets/BuildLsquic.cmake new file mode 100644 index 0000000000..d7e4591a2b --- /dev/null +++ b/cmake/targets/BuildLsquic.cmake @@ -0,0 +1,31 @@ +register_repository( + NAME + lsquic + REPOSITORY + litespeedtech/lsquic + COMMIT + 70486141724f85e97b08f510673e29f399bbae8f +) + +register_cmake_command( + TARGET + lsquic + LIBRARIES + lsquic + LIB_PATH + src/liblsquic + ARGS + -DSHARED=OFF + -DLSQUIC_SHARED_LIB=0 + -DBORINGSSL_DIR=${VENDOR_PATH}/boringssl + -DBORINGSSL_LIB=${BUILD_PATH}/boringssl + -DZLIB_INCLUDE_DIR=${VENDOR_PATH}/zlib + -DZLIB_LIB=${BUILD_PATH}/zlib/libz.a + -DCMAKE_BUILD_TYPE=Release + INCLUDES + include + src/liblsquic + DEPENDS + BoringSSL + Zlib +) \ No newline at end of file diff --git a/packages/bun-usockets/src/libusockets.h b/packages/bun-usockets/src/libusockets.h index a5156f700c..ab8fc7f85c 100644 --- a/packages/bun-usockets/src/libusockets.h +++ b/packages/bun-usockets/src/libusockets.h @@ -157,6 +157,9 @@ struct us_udp_packet_buffer_t *us_create_udp_packet_buffer(); struct us_udp_socket_t *us_create_udp_socket(us_loop_r loop, void (*data_cb)(struct us_udp_socket_t *, void *, int), void (*drain_cb)(struct us_udp_socket_t *), void (*close_cb)(struct us_udp_socket_t *), const char *host, unsigned short port, int flags, int *err, void *user); +// Extended version for QUIC sockets that need extension data +struct us_udp_socket_t *us_create_udp_socket_with_ext(us_loop_r loop, void (*data_cb)(struct us_udp_socket_t *, void *, int), void (*drain_cb)(struct us_udp_socket_t *), void (*close_cb)(struct us_udp_socket_t *), const char *host, unsigned short port, int flags, int *err, void *user, int ext_size); + void us_udp_socket_close(struct us_udp_socket_t *s); int us_udp_socket_set_broadcast(struct us_udp_socket_t *s, int enabled); diff --git a/packages/bun-usockets/src/quic.c b/packages/bun-usockets/src/quic.c index d4d94c7041..db05dcdfdd 100644 --- a/packages/bun-usockets/src/quic.c +++ b/packages/bun-usockets/src/quic.c @@ -5,7 +5,7 @@ #include "quic.h" - +#include "internal/internal.h" #include "lsquic.h" #include "lsquic_types.h" @@ -15,6 +15,8 @@ #ifndef _WIN32 #include #include +#include +#include #endif #include @@ -23,6 +25,17 @@ void leave_all(); +// Peer context structure for lsquic - contains UDP socket and other metadata +struct quic_peer_ctx { + struct us_udp_socket_t *udp_socket; + us_quic_socket_context_t *context; + void *reserved[16]; // Extra space to prevent buffer overflows +}; + +// Forward declarations for QUIC UDP callback functions +void on_udp_socket_data_client(struct us_udp_socket_t *s, struct us_udp_packet_buffer_t *buf, int packets); +void on_udp_socket_data(struct us_udp_socket_t *s, struct us_udp_packet_buffer_t *buf, int packets); + /* struct sockaddr_in client_addr = { AF_INET, @@ -95,15 +108,25 @@ void us_quic_socket_context_on_stream_writable(us_quic_socket_context_t *context void on_udp_socket_writable(struct us_udp_socket_t *s) { /* Need context from socket here */ us_quic_socket_context_t *context = us_udp_socket_user(s); + if (!context) { + printf("ERROR: No context found in UDP socket\n"); + return; + } /* We just continue now */ lsquic_engine_send_unsent_packets(context->engine); } +// Wrapper function to match uSockets UDP callback signature +void on_udp_socket_data_client_wrapper(struct us_udp_socket_t *s, void *buf, int packets) { + on_udp_socket_data_client(s, (struct us_udp_packet_buffer_t *)buf, packets); +} + // we need two differetn handlers to know to put it in client or servcer context void on_udp_socket_data_client(struct us_udp_socket_t *s, struct us_udp_packet_buffer_t *buf, int packets) { - int fd = us_poll_fd((struct us_poll_t *) s); + // Remove unused fd variable + // int fd = us_poll_fd((struct us_poll_t *) s); //printf("Reading on fd: %d\n", fd); //printf("UDP (client) socket got data: %p\n", s); @@ -113,21 +136,41 @@ void on_udp_socket_data_client(struct us_udp_socket_t *s, struct us_udp_packet_b // do we have udp socket contexts? or do we just have user data? us_quic_socket_context_t *context = us_udp_socket_user(s); + if (!context) { + printf("ERROR: No context found in UDP client socket\n"); + return; + } + + if (!buf) { + printf("ERROR: Null packet buffer in UDP client handler\n"); + return; + } + + if (packets <= 0) { + return; + } /* We just shove it to lsquic */ for (int i = 0; i < packets; i++) { char *payload = us_udp_packet_buffer_payload(buf, i); int length = us_udp_packet_buffer_payload_length(buf, i); - int ecn = us_udp_packet_buffer_ecn(buf, i); + // ECN not available in current uSockets API - remove this line + // int ecn = us_udp_packet_buffer_ecn(buf, i); void *peer_addr = us_udp_packet_buffer_peer(buf, i); + // Validate packet data before processing + if (!payload || length <= 0 || length > 65536 || !peer_addr) { + printf("Invalid packet data: payload=%p, length=%d, peer_addr=%p\n", payload, length, peer_addr); + continue; + } + //printf("Reading UDP of size %d\n", length); char ip[16]; int ip_length = us_udp_packet_buffer_local_ip(buf, i, ip); if (!ip_length) { printf("We got no ip on received packet!\n"); - exit(0); + continue; // Don't exit, just skip this packet } //printf("Our received destination IP length is: %d\n", ip_length); @@ -143,16 +186,25 @@ void on_udp_socket_data_client(struct us_udp_socket_t *s, struct us_udp_packet_b ipv6->sin6_family = AF_INET6; ipv6->sin6_port = ntohs(port); memcpy(ipv6->sin6_addr.s6_addr, ip, 16); - } else { + } else if (ip_length == 4) { struct sockaddr_in *ipv4 = (struct sockaddr_in *) &local_addr; ipv4->sin_family = AF_INET; ipv4->sin_port = ntohs(port); memcpy(&ipv4->sin_addr.s_addr, ip, 4); + } else { + printf("Invalid IP length: %d\n", ip_length); + continue; } + if (!context->client_engine) { + printf("ERROR: Client engine is null\n"); + continue; + } - int ret = lsquic_engine_packet_in(context->client_engine, payload, length, (struct sockaddr *) &local_addr, peer_addr, (void *) s, 0); + // Use the peer context from the UDP socket extension area + struct quic_peer_ctx *peer_ctx = (struct quic_peer_ctx *)((char *)s + sizeof(struct us_udp_socket_t)); + lsquic_engine_packet_in(context->client_engine, (const unsigned char *)payload, length, (struct sockaddr *) &local_addr, peer_addr, (void *) peer_ctx, 0); //printf("Engine returned: %d\n", ret); @@ -162,6 +214,11 @@ void on_udp_socket_data_client(struct us_udp_socket_t *s, struct us_udp_packet_b } +// Wrapper function to match uSockets UDP callback signature +void on_udp_socket_data_wrapper(struct us_udp_socket_t *s, void *buf, int packets) { + on_udp_socket_data(s, (struct us_udp_packet_buffer_t *)buf, packets); +} + void on_udp_socket_data(struct us_udp_socket_t *s, struct us_udp_packet_buffer_t *buf, int packets) { @@ -172,24 +229,46 @@ void on_udp_socket_data(struct us_udp_socket_t *s, struct us_udp_packet_buffer_t // do we have udp socket contexts? or do we just have user data? us_quic_socket_context_t *context = us_udp_socket_user(s); + if (!context) { + printf("ERROR: No context found in UDP server socket\n"); + return; + } + + if (!buf) { + printf("ERROR: Null packet buffer in UDP server handler\n"); + return; + } + + if (packets <= 0) { + return; + } // process conns now? to accept new connections? - lsquic_engine_process_conns(context->engine); + if (context->engine) { + lsquic_engine_process_conns(context->engine); + } /* We just shove it to lsquic */ for (int i = 0; i < packets; i++) { char *payload = us_udp_packet_buffer_payload(buf, i); int length = us_udp_packet_buffer_payload_length(buf, i); - int ecn = us_udp_packet_buffer_ecn(buf, i); + // ECN not available in current uSockets API - remove this line + // int ecn = us_udp_packet_buffer_ecn(buf, i); void *peer_addr = us_udp_packet_buffer_peer(buf, i); + // Validate packet data before processing + if (!payload || length <= 0 || length > 65536 || !peer_addr) { + printf("Invalid server packet data: payload=%p, length=%d, peer_addr=%p\n", payload, length, peer_addr); + continue; + } + //printf("Reading UDP of size %d\n", length); char ip[16]; int ip_length = us_udp_packet_buffer_local_ip(buf, i, ip); if (!ip_length) { printf("We got no ip on received packet!\n"); - exit(0); + continue; // Don't exit, just skip this packet } //printf("Our received destination IP length is: %d\n", ip_length); @@ -205,17 +284,25 @@ void on_udp_socket_data(struct us_udp_socket_t *s, struct us_udp_packet_buffer_t ipv6->sin6_family = AF_INET6; ipv6->sin6_port = ntohs(port); memcpy(ipv6->sin6_addr.s6_addr, ip, 16); - } else { - + } else if (ip_length == 4) { struct sockaddr_in *ipv4 = (struct sockaddr_in *) &local_addr; ipv4->sin_family = AF_INET; ipv4->sin_port = ntohs(port); memcpy(&ipv4->sin_addr.s_addr, ip, 4); + } else { + printf("Invalid server IP length: %d\n", ip_length); + continue; } + if (!context->engine) { + printf("ERROR: Server engine is null\n"); + continue; + } - int ret = lsquic_engine_packet_in(context->engine, payload, length, (struct sockaddr *) &local_addr, peer_addr, (void *) s, 0); + // Use the peer context from the UDP socket extension area + struct quic_peer_ctx *peer_ctx = (struct quic_peer_ctx *)((char *)s + sizeof(struct us_udp_socket_t)); + lsquic_engine_packet_in(context->engine, (const unsigned char *)payload, length, (struct sockaddr *) &local_addr, peer_addr, (void *) peer_ctx, 0); //printf("Engine returned: %d\n", ret); @@ -240,72 +327,51 @@ struct mmsghdr { /* Server and client packet out is identical */ int send_packets_out(void *ctx, const struct lsquic_out_spec *specs, unsigned n_specs) { #ifndef _WIN32 - us_quic_socket_context_t *context = ctx; - - /* A run is at most UIO_MAXIOV datagrams long */ - struct mmsghdr hdrs[UIO_MAXIOV]; - int run_length = 0; - - /* We assume that thiss whole cb will never be called with 0 specs */ - struct us_udp_socket_t *last_socket = (struct us_udp_socket_t *) specs[0].peer_ctx; - + // For now, send packets one by one using regular sendto + // TODO: Optimize with proper batch sending using uSockets API int sent = 0; - for (int i = 0; i < n_specs; i++) { - /* Send this run if we need to */ - if (run_length == UIO_MAXIOV || specs[i].peer_ctx != last_socket) { - int ret = bsd_sendmmsg(us_poll_fd((struct us_poll_t *) last_socket), hdrs, run_length, 0); - if (ret != run_length) { - if (ret == -1) { - printf("unhandled udp backpressure!\n"); - return sent; + for (unsigned i = 0; i < n_specs; i++) { + struct us_udp_socket_t *socket = (struct us_udp_socket_t *) specs[i].peer_ctx; + int fd = us_poll_fd((struct us_poll_t *) socket); + + // Combine all iovecs into a single buffer for simple sending + size_t total_len = 0; + for (int j = 0; j < specs[i].iovlen; j++) { + total_len += specs[i].iov[j].iov_len; + } + + if (total_len > 0) { + // Simple approach: use sendto for each packet + // In a real implementation, we'd want to use the proper uSockets batch API + char buffer[2048]; // Maximum UDP payload size + if (total_len <= sizeof(buffer)) { + size_t offset = 0; + for (int j = 0; j < specs[i].iovlen; j++) { + memcpy(buffer + offset, specs[i].iov[j].iov_base, specs[i].iov[j].iov_len); + offset += specs[i].iov[j].iov_len; + } + + ssize_t ret = sendto(fd, buffer, total_len, MSG_DONTWAIT, + specs[i].dest_sa, + (specs[i].dest_sa->sa_family == AF_INET) ? + sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6)); + if (ret > 0) { + sent++; } else { - printf("unhandled udp backpressure!\n"); - errno = EAGAIN; - return sent + ret; + // Handle backpressure + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return sent; + } + return -1; } } - sent += ret; - run_length = 0; - last_socket = specs[i].peer_ctx; - //printf("different socket breask run!\n"); } - - /* Continue existing run or start a new one */ - //memset(&hdrs[i].msg_hdr, 0, sizeof(hdrs[i].msg_hdr)); - memset(&hdrs[run_length], 0, sizeof(hdrs[run_length])); - - hdrs[run_length].msg_hdr.msg_name = (void *) specs[i].dest_sa; - hdrs[run_length].msg_hdr.msg_namelen = (AF_INET == specs[i].dest_sa->sa_family ? - sizeof(struct sockaddr_in) : - sizeof(struct sockaddr_in6)), - hdrs[run_length].msg_hdr.msg_iov = specs[i].iov; - hdrs[run_length].msg_hdr.msg_iovlen = specs[i].iovlen; - hdrs[run_length].msg_hdr.msg_flags = 0; - - run_length++; } - - /* Send last run */ - if (run_length) { - int ret = bsd_sendmmsg(us_poll_fd((struct us_poll_t *) last_socket), hdrs, run_length, 0); - if (ret == -1) { - printf("backpressure! A\n"); - return sent; - } - if (sent + ret != n_specs) { - printf("backpressure! B\n"); - printf("errno is: %d\n", errno); - errno = EAGAIN; - } - //printf("Returning %d of %d\n", sent + ret, n_specs); - return sent + ret; - } - - //printf("Returning %d\n", n_specs); - -#endif - + return sent; +#else + // Windows implementation would go here return n_specs; +#endif } lsquic_conn_ctx_t *on_new_conn(void *stream_if_ctx, lsquic_conn_t *c) { @@ -313,31 +379,57 @@ lsquic_conn_ctx_t *on_new_conn(void *stream_if_ctx, lsquic_conn_t *c) { printf("Context is: %p\n", context); - /* We need to create some kind of socket here */ + if (!context) { + printf("ERROR: No context in on_new_conn\n"); + return NULL; + } + + /* For client connections, the context should already be the us_quic_socket_t */ + us_quic_socket_t *socket = (us_quic_socket_t *) lsquic_conn_get_ctx(c); + if (!socket) { + printf("ERROR: No socket found in connection context\n"); + return NULL; + } int is_client = 0; if (lsquic_conn_get_engine(c) == context->client_engine) { is_client = 1; } - context->on_open((us_quic_socket_t *) c, is_client); + if (context->on_open) { + context->on_open(socket, is_client); + } - return (lsquic_conn_ctx_t *) context; + return (lsquic_conn_ctx_t *) socket; } void us_quic_socket_create_stream(us_quic_socket_t *s, int ext_size) { - lsquic_conn_make_stream((lsquic_conn_t *) s); + if (!s || !s->lsquic_conn) { + printf("ERROR: Invalid socket or LSQUIC connection is null in create_stream\n"); + return; + } + + lsquic_conn_make_stream((lsquic_conn_t *) s->lsquic_conn); // here we need to allocate and attach the user data } void on_conn_closed(lsquic_conn_t *c) { - us_quic_socket_context_t *context = (us_quic_socket_context_t *) lsquic_conn_get_ctx(c); + us_quic_socket_t *socket = (us_quic_socket_t *) lsquic_conn_get_ctx(c); printf("on_conn_closed!\n"); - context->on_close((us_quic_socket_t *) c); + if (!socket) { + printf("ERROR: No socket found in connection context for close callback\n"); + return; + } + + // Get the context from the UDP socket to access callbacks + us_quic_socket_context_t *context = us_udp_socket_user(socket->udp_socket); + if (context && context->on_close) { + context->on_close(socket); + } } lsquic_stream_ctx_t *on_new_stream(void *stream_if_ctx, lsquic_stream_t *s) { @@ -346,6 +438,11 @@ lsquic_stream_ctx_t *on_new_stream(void *stream_if_ctx, lsquic_stream_t *s) { lsquic_stream_wantread(s, 1); us_quic_socket_context_t *context = stream_if_ctx; + + if (!context) { + printf("ERROR: No context in on_new_stream\n"); + return NULL; + } // the conn's ctx should point at the udp socket and the socket context // the ext size of streams and conn's are set by the listen/connect calls, which @@ -356,8 +453,13 @@ lsquic_stream_ctx_t *on_new_stream(void *stream_if_ctx, lsquic_stream_t *s) { int ext_size = 256; void *ext = malloc(ext_size); - // yes hello - strcpy(ext, "Hello I am ext!"); + if (!ext) { + printf("ERROR: Failed to allocate stream extension memory\n"); + return NULL; + } + + // Initialize the memory to zero instead of strcpy + memset(ext, 0, ext_size); int is_client = 0; if (lsquic_conn_get_engine(lsquic_stream_conn(s)) == context->client_engine) { @@ -366,7 +468,10 @@ lsquic_stream_ctx_t *on_new_stream(void *stream_if_ctx, lsquic_stream_t *s) { // luckily we can set the ext before we return lsquic_stream_set_ctx(s, ext); - context->on_stream_open((us_quic_stream_t *) s, is_client); + + if (context->on_stream_open) { + context->on_stream_open((us_quic_stream_t *) s, is_client); + } return ext; } @@ -541,7 +646,6 @@ static void on_read(lsquic_stream_t *s, lsquic_stream_ctx_t *h) { } int us_quic_stream_write(us_quic_stream_t *s, char *data, int length) { - lsquic_stream_t *stream = (lsquic_stream_t *) s; int ret = lsquic_stream_write((lsquic_stream_t *) s, data, length); // just like otherwise, we automatically poll for writable when failed if (ret != length) { @@ -631,13 +735,23 @@ int server_name_cb(SSL *s, int *al, void *arg) { struct ssl_ctx_st *get_ssl_ctx(void *peer_ctx, const struct sockaddr *local) { printf("getting ssl ctx now, peer_ctx: %p\n", peer_ctx); - // peer_ctx point to the us_udp_socket_t that passed the UDP packet in via - // lsquic_engine_packet_in (it got passed as peer_ctx) - // we want the per-context ssl cert from this udp socket - struct us_udp_socket_t *udp_socket = (struct us_udp_socket_t *) peer_ctx; + if (!peer_ctx) { + printf("WARNING: No peer_ctx in get_ssl_ctx, using cached context\n"); + if (old_ctx) { + return old_ctx; + } + // Return a default context if none exists + return NULL; + } - // the udp socket of a server points to the context - struct us_quic_socket_context_s *context = us_udp_socket_user(udp_socket); + // peer_ctx now points to our quic_peer_ctx structure + struct quic_peer_ctx *qctx = (struct quic_peer_ctx *) peer_ctx; + if (!qctx || !qctx->context) { + printf("ERROR: No context found in peer context for SSL\n"); + return old_ctx; + } + + struct us_quic_socket_context_s *context = qctx->context; if (old_ctx) { return old_ctx; @@ -686,8 +800,6 @@ int log_buf_cb(void *logger_ctx, const char *buf, size_t len) { } int us_quic_stream_shutdown_read(us_quic_stream_t *s) { - lsquic_stream_t *stream = (lsquic_stream_t *) s; - int ret = lsquic_stream_shutdown((lsquic_stream_t *) s, 0); if (ret != 0) { printf("cannot shutdown stream!\n"); @@ -702,8 +814,6 @@ void *us_quic_stream_ext(us_quic_stream_t *s) { } void us_quic_stream_close(us_quic_stream_t *s) { - lsquic_stream_t *stream = (lsquic_stream_t *) s; - int ret = lsquic_stream_close((lsquic_stream_t *) s); if (ret != 0) { printf("cannot close stream!\n"); @@ -714,8 +824,6 @@ void us_quic_stream_close(us_quic_stream_t *s) { } int us_quic_stream_shutdown(us_quic_stream_t *s) { - lsquic_stream_t *stream = (lsquic_stream_t *) s; - int ret = lsquic_stream_shutdown((lsquic_stream_t *) s, 1); if (ret != 0) { printf("cannot shutdown stream!\n"); @@ -763,9 +871,9 @@ char pool[1000][4096]; int pool_top = 0; void *take() { - if (pool_top == 1000) { + if (pool_top >= 1000) { printf("out of memory\n"); - exit(0); + return NULL; // Don't exit, return NULL instead } return pool[pool_top++]; } @@ -810,20 +918,24 @@ int header_decode_heap_offset = 0; struct lsxpack_header *hsi_prepare_decode(void *hdr_set, struct lsxpack_header *hdr, size_t space) { //printf("hsi_prepare_decode\n"); + + // Validate space parameter - prevent buffer overflow + if (space > 4096 - sizeof(struct lsxpack_header)) { + printf("Space too large: %zu\n", space); + return NULL; // Don't exit, return NULL + } if (!hdr) { char *mem = take(); + if (!mem) { + printf("Failed to allocate memory from pool\n"); + return NULL; + } hdr = (struct lsxpack_header *) mem;//malloc(sizeof(struct lsxpack_header)); memset(hdr, 0, sizeof(struct lsxpack_header)); hdr->buf = mem + sizeof(struct lsxpack_header);//take();//malloc(space); lsxpack_header_prepare_decode(hdr, hdr->buf, 0, space); } else { - - if (space > 4096 - sizeof(struct lsxpack_header)) { - printf("not hanlded!\n"); - exit(0); - } - hdr->val_len = space; //hdr->buf = realloc(hdr->buf, space); } @@ -837,6 +949,11 @@ int hsi_process_header(void *hdr_set, struct lsxpack_header *hdr) { //printf("hsi_process_header: %p\n", hdr); + if (!hdr_set) { + printf("ERROR: hdr_set is null\n"); + return -1; + } + struct header_set_hd *hd = hdr_set; struct processed_header *proc_hdr = (struct processed_header *) (hd + 1); @@ -851,6 +968,21 @@ int hsi_process_header(void *hdr_set, struct lsxpack_header *hdr) { return 0; } + // Bounds check for header offset - prevent buffer overflow + if (hd->offset < 0 || hd->offset >= (4096 - sizeof(struct header_set_hd)) / sizeof(struct processed_header)) { + printf("ERROR: Header offset out of bounds: %d\n", hd->offset); + return -1; + } + + // Validate header buffer bounds + if (!hdr->buf || hdr->val_offset < 0 || hdr->name_offset < 0 || + hdr->val_len < 0 || hdr->name_len < 0 || + hdr->val_offset + hdr->val_len > 4096 || + hdr->name_offset + hdr->name_len > 4096) { + printf("ERROR: Invalid header buffer bounds\n"); + return -1; + } + /*if (hdr->hpack_index) { printf("header has hpack index: %d\n", hdr->hpack_index); } @@ -885,10 +1017,26 @@ void timer_cb(struct us_timer_t *t) { // lsquic_conn us_quic_socket_context_t *us_quic_socket_context(us_quic_socket_t *s) { - return (us_quic_socket_context_t *) lsquic_conn_get_ctx((lsquic_conn_t *) s); + if (!s || !s->udp_socket) { + printf("ERROR: Invalid socket or UDP socket is null\n"); + return NULL; + } + + // Get the context from the UDP socket's user data + us_quic_socket_context_t *context = us_udp_socket_user(s->udp_socket); + if (!context) { + printf("ERROR: No context found in UDP socket user data\n"); + return NULL; + } + + return context; } void *us_quic_socket_context_ext(us_quic_socket_context_t *context) { + if (!context) { + printf("ERROR: Context is null in us_quic_socket_context_ext\n"); + return NULL; + } return context + 1; } @@ -896,30 +1044,27 @@ void *us_quic_socket_context_ext(us_quic_socket_context_t *context) { us_quic_socket_context_t *us_create_quic_socket_context(struct us_loop_t *loop, us_quic_socket_context_options_t options, int ext_size) { - printf("Creating socket context with ssl: %s\n", options.key_file_name); - - // every _listen_ call creates a new udp socket that feeds inputs to the engine in the context - // every context has its own send buffer and udp send socket (not bound to any port or ip?) - - // or just make it so that once you listen, it will listen on that port for input, and the context will use - // the first udp socket for output as it doesn't matter which one is used - /* Holds all callbacks */ us_quic_socket_context_t *context = malloc(sizeof(struct us_quic_socket_context_s) + ext_size); + if (!context) { + return NULL; + } // the option is put on the socket context context->options = options; - context->loop = loop; - //context->udp_socket = 0; /* Allocate per thread, UDP packet buffers */ context->recv_buf = us_create_udp_packet_buffer(); - //context->send_buf = us_create_udp_packet_buffer(); + if (!context->recv_buf) { + free(context); + return NULL; + } /* Init lsquic engine */ if (0 != lsquic_global_init(LSQUIC_GLOBAL_CLIENT|LSQUIC_GLOBAL_SERVER)) { - exit(EXIT_FAILURE); + free(context); + return NULL; } static struct lsquic_stream_if stream_callbacks = { @@ -962,11 +1107,10 @@ us_quic_socket_context_t *us_create_quic_socket_context(struct us_loop_t *loop, ///printf("log: %d\n", lsquic_set_log_level("debug")); - static struct lsquic_logger_if logger = { - .log_buf = log_buf_cb, - }; - - + // Logger is not currently used - commented out to avoid unused variable warning + // static struct lsquic_logger_if logger = { + // .log_buf = log_buf_cb, + // }; //lsquic_logger_init(&logger, 0, LLTS_NONE); @@ -1008,7 +1152,21 @@ us_quic_socket_context_t *us_create_quic_socket_context(struct us_loop_t *loop, us_quic_listen_socket_t *us_quic_socket_context_listen(us_quic_socket_context_t *context, const char *host, int port, int ext_size) { /* We literally do create a listen socket */ - return (us_quic_listen_socket_t *) us_create_udp_socket(context->loop, /*context->recv_buf*/ NULL, on_udp_socket_data, on_udp_socket_writable, host, port, 0, context); + int err = 0; + // For QUIC sockets, we need to create a UDP socket with extension space to avoid buffer overflows + // when lsquic tries to access extension data + struct us_udp_socket_t *socket = us_create_udp_socket_with_ext(context->loop, on_udp_socket_data_wrapper, on_udp_socket_writable, NULL, host, port, 0, &err, context, sizeof(struct quic_peer_ctx)); + + if (socket) { + // Initialize the peer context in the extension area + struct quic_peer_ctx *peer_ctx = (struct quic_peer_ctx *)((char *)socket + sizeof(struct us_udp_socket_t)); + printf("Listen socket: %p, peer_ctx: %p, context: %p\n", socket, peer_ctx, context); + peer_ctx->udp_socket = socket; + peer_ctx->context = context; + memset(peer_ctx->reserved, 0, sizeof(peer_ctx->reserved)); + } + + return (us_quic_listen_socket_t *) socket; //return NULL; } @@ -1030,7 +1188,18 @@ us_quic_socket_t *us_quic_socket_context_connect(us_quic_socket_context_t *conte addr->sin6_family = AF_INET6; // Create the UDP socket binding to ephemeral port - struct us_udp_socket_t *udp_socket = us_create_udp_socket(context->loop, /*context->recv_buf*/ NULL, on_udp_socket_data_client, on_udp_socket_writable, 0, 0, 0, context); + int err = 0; + // For QUIC client sockets, we also need extension space to avoid buffer overflows + struct us_udp_socket_t *udp_socket = us_create_udp_socket_with_ext(context->loop, on_udp_socket_data_client_wrapper, on_udp_socket_writable, NULL, 0, 0, 0, &err, context, sizeof(struct quic_peer_ctx)); + + if (udp_socket) { + // Initialize the peer context in the extension area + struct quic_peer_ctx *peer_ctx = (struct quic_peer_ctx *)((char *)udp_socket + sizeof(struct us_udp_socket_t)); + printf("Client socket: %p, peer_ctx: %p, context: %p\n", udp_socket, peer_ctx, context); + peer_ctx->udp_socket = udp_socket; + peer_ctx->context = context; + memset(peer_ctx->reserved, 0, sizeof(peer_ctx->reserved)); + } // Determine what port we got, creating the local sockaddr int ephemeral = us_udp_socket_bound_port(udp_socket); @@ -1058,14 +1227,34 @@ us_quic_socket_t *us_quic_socket_context_connect(us_quic_socket_context_t *conte // we need 1 socket for servers, then we bind multiple ports to that one socket - void *client = lsquic_engine_connect(context->client_engine, LSQVER_I001, (struct sockaddr *) local_addr, (struct sockaddr *) addr, udp_socket, (lsquic_conn_ctx_t *) udp_socket, "sni", 0, 0, 0, 0, 0); + // Create the us_quic_socket_t structure first so we can pass it as context + us_quic_socket_t *quic_socket = malloc(sizeof(us_quic_socket_t) + ext_size); + if (!quic_socket) { + printf("ERROR: Failed to allocate QUIC socket structure\n"); + return NULL; + } + + quic_socket->udp_socket = udp_socket; + quic_socket->lsquic_conn = NULL; // Will be set after connection is created + + void *client = lsquic_engine_connect(context->client_engine, LSQVER_I001, (struct sockaddr *) local_addr, (struct sockaddr *) addr, udp_socket, (lsquic_conn_ctx_t *) quic_socket, "sni", 0, 0, 0, 0, 0); printf("Client: %p\n", client); - // this is requiored to even have packetgs sending out (run this in post) - lsquic_engine_process_conns(context->client_engine); + if (!client) { + printf("ERROR: Failed to create LSQUIC connection\n"); + free(quic_socket); + return NULL; + } + + // Now store the lsquic connection in our socket structure + quic_socket->lsquic_conn = client; + printf("Created QUIC socket: %p with UDP socket: %p and LSQUIC conn: %p\n", quic_socket, udp_socket, client); - return client; + // this is required to even have packets sending out (run this in post) + lsquic_engine_process_conns(context->client_engine); + + return quic_socket; } #endif diff --git a/packages/bun-usockets/src/quic.h b/packages/bun-usockets/src/quic.h index 6d33d27b61..244bc7cc41 100644 --- a/packages/bun-usockets/src/quic.h +++ b/packages/bun-usockets/src/quic.h @@ -17,6 +17,8 @@ typedef struct { typedef struct { /* Refers to either the shared listen socket or the client UDP socket */ void *udp_socket; + /* LSQUIC connection pointer for this socket */ + void *lsquic_conn; } us_quic_socket_t; struct us_quic_socket_context_s; diff --git a/packages/bun-usockets/src/udp.c b/packages/bun-usockets/src/udp.c index aa4069976d..7a15a59372 100644 --- a/packages/bun-usockets/src/udp.c +++ b/packages/bun-usockets/src/udp.c @@ -19,6 +19,7 @@ #include "internal/internal.h" #include +#include // int us_udp_packet_buffer_ecn(struct us_udp_packet_buffer_t *buf, int index) { // return bsd_udp_packet_buffer_ecn((struct udp_recvbuf *)buf, index); @@ -187,4 +188,73 @@ struct us_udp_socket_t *us_create_udp_socket( us_poll_start((struct us_poll_t *) udp, udp->loop, LIBUS_SOCKET_READABLE | LIBUS_SOCKET_WRITABLE); return (struct us_udp_socket_t *) udp; +} + +// Extended version for QUIC sockets that need extension data +struct us_udp_socket_t *us_create_udp_socket_with_ext( + struct us_loop_t *loop, + void (*data_cb)(struct us_udp_socket_t *, void *, int), + void (*drain_cb)(struct us_udp_socket_t *), + void (*close_cb)(struct us_udp_socket_t *), + const char *host, + unsigned short port, + int flags, + int *err, + void *user, + int ext_size +) { + + LIBUS_SOCKET_DESCRIPTOR fd = bsd_create_udp_socket(host, port, flags, err); + if (fd == LIBUS_SOCKET_ERROR) { + return 0; + } + + int fallthrough = 0; + + // Use the provided ext_size instead of hardcoded 0 + struct us_poll_t *p = us_create_poll(loop, fallthrough, sizeof(struct us_udp_socket_t) + ext_size); + us_poll_init(p, fd, POLL_TYPE_UDP); + + struct us_udp_socket_t *udp = (struct us_udp_socket_t *)p; + + /* Get and store the port once */ + struct bsd_addr_t tmp = {0}; + bsd_local_addr(fd, &tmp); + udp->port = bsd_addr_get_port(&tmp); + udp->loop = loop; + + /* There is no udp socket context, only user data */ + /* This should really be ext like everything else */ + udp->user = user; + + udp->on_data = data_cb; + udp->on_drain = drain_cb; + udp->on_close = close_cb; + udp->next = NULL; + + us_poll_start((struct us_poll_t *) udp, udp->loop, LIBUS_SOCKET_READABLE | LIBUS_SOCKET_WRITABLE); + + return (struct us_udp_socket_t *) udp; +} + +/* Structure to hold allocated UDP packet buffer and its data */ +struct us_udp_packet_buffer_wrapper { + struct udp_recvbuf buffer; + char data[LIBUS_RECV_BUFFER_LENGTH]; +}; + +struct us_udp_packet_buffer_t *us_create_udp_packet_buffer() { + /* Allocate wrapper structure to hold both buffer and data */ + struct us_udp_packet_buffer_wrapper *wrapper = + (struct us_udp_packet_buffer_wrapper *)malloc(sizeof(struct us_udp_packet_buffer_wrapper)); + + if (!wrapper) { + return NULL; + } + + /* Setup the receive buffer using the allocated data */ + bsd_udp_setup_recvbuf(&wrapper->buffer, wrapper->data, LIBUS_RECV_BUFFER_LENGTH); + + /* Return the buffer part (us_udp_packet_buffer_t is typedef for struct udp_recvbuf) */ + return (struct us_udp_packet_buffer_t *)&wrapper->buffer; } \ No newline at end of file diff --git a/src/bun.js/api.zig b/src/bun.js/api.zig index fd1f1b66e6..0ab4f34ea4 100644 --- a/src/bun.js/api.zig +++ b/src/bun.js/api.zig @@ -22,6 +22,7 @@ pub const SocketAddress = @import("./api/bun/socket.zig").SocketAddress; pub const TCPSocket = @import("./api/bun/socket.zig").TCPSocket; pub const TLSSocket = @import("./api/bun/socket.zig").TLSSocket; pub const SocketHandlers = @import("./api/bun/socket.zig").Handlers; +pub const QuicSocket = @import("./api/bun/quic_socket.zig").QuicSocket; pub const Subprocess = @import("./api/bun/subprocess.zig"); pub const HashObject = @import("./api/HashObject.zig"); diff --git a/src/bun.js/api/BunObject.zig b/src/bun.js/api/BunObject.zig index 62713db2a3..105f613487 100644 --- a/src/bun.js/api/BunObject.zig +++ b/src/bun.js/api/BunObject.zig @@ -27,6 +27,7 @@ pub const BunObject = struct { pub const mmap = toJSCallback(Bun.mmapFile); pub const nanoseconds = toJSCallback(Bun.nanoseconds); pub const openInEditor = toJSCallback(Bun.openInEditor); + pub const quic = toJSCallback(host_fn.wrapStaticMethod(api.QuicSocket, "quic", false)); pub const registerMacro = toJSCallback(Bun.registerMacro); pub const resolve = toJSCallback(Bun.resolve); pub const resolveSync = toJSCallback(Bun.resolveSync); @@ -167,6 +168,7 @@ pub const BunObject = struct { @export(&BunObject.mmap, .{ .name = callbackName("mmap") }); @export(&BunObject.nanoseconds, .{ .name = callbackName("nanoseconds") }); @export(&BunObject.openInEditor, .{ .name = callbackName("openInEditor") }); + @export(&BunObject.quic, .{ .name = callbackName("quic") }); @export(&BunObject.registerMacro, .{ .name = callbackName("registerMacro") }); @export(&BunObject.resolve, .{ .name = callbackName("resolve") }); @export(&BunObject.resolveSync, .{ .name = callbackName("resolveSync") }); diff --git a/src/bun.js/api/bun/quic_socket.zig b/src/bun.js/api/bun/quic_socket.zig new file mode 100644 index 0000000000..2adcca262c --- /dev/null +++ b/src/bun.js/api/bun/quic_socket.zig @@ -0,0 +1,797 @@ +pub const QuicSocket = struct { + const This = @This(); + + // JavaScript class bindings - following the same pattern as TCP/TLS sockets + pub const js = jsc.Codegen.JSQuicSocket; + pub const toJS = js.toJS; + pub const fromJS = js.fromJS; + pub const fromJSDirect = js.fromJSDirect; + + pub const new = bun.TrivialNew(@This()); + + const RefCount = bun.ptr.RefCount(@This(), "ref_count", deinit, .{}); + pub const ref = RefCount.ref; + pub const deref = RefCount.deref; + + // QUIC socket using uSockets QUIC API + socket: ?*uws.quic.Socket = null, + socket_context: ?*uws.quic.SocketContext = null, + listen_socket: ?*uws.quic.ListenSocket = null, + // Current stream for simple operations (will expand to support multiple streams) + current_stream: ?*uws.quic.Stream = null, + + flags: Flags = .{}, + ref_count: RefCount, + handlers: ?*QuicHandlers, + this_value: jsc.JSValue = .zero, + poll_ref: Async.KeepAlive = Async.KeepAlive.init(), + + // QUIC-specific fields + server_name: ?[]const u8 = null, + connection_id: ?[]const u8 = null, + stream_count: u32 = 0, + + has_pending_activity: std.atomic.Value(bool) = std.atomic.Value(bool).init(true), + + pub const Flags = packed struct { + is_server: bool = false, + is_connected: bool = false, + is_closed: bool = false, + has_backpressure: bool = false, + + // QUIC-specific flags + has_0rtt: bool = false, + is_migration_capable: bool = false, + + _: u26 = 0, + }; + + pub fn hasPendingActivity(this: *This) callconv(.C) bool { + return this.has_pending_activity.load(.acquire); + } + + pub fn memoryCost(_: *This) usize { + return @sizeOf(This); + } + + pub fn finalize(this: *This) void { + this.deinit(); + } + + pub fn deinit(this: *This) void { + this.poll_ref.unref(jsc.VirtualMachine.get()); + + if (this.handlers) |handlers| { + handlers.unprotect(); + bun.default_allocator.destroy(handlers); + } + + // Close QUIC socket if still open + if (this.socket != null and !this.flags.is_closed) { + this.closeImpl(); + } + + if (this.server_name) |server_name| { + bun.default_allocator.free(server_name); + } + + if (this.connection_id) |conn_id| { + bun.default_allocator.free(conn_id); + } + } + + // Initialize a new QUIC socket + pub fn init(allocator: std.mem.Allocator, handlers: *QuicHandlers) !*This { + const this = try allocator.create(This); + this.* = This{ + .ref_count = RefCount.init(), + .handlers = handlers, + }; + handlers.protect(); + return this; + } + + // Create QUIC socket context with callbacks + fn createContext(this: *This) !void { + if (this.socket_context != null) return; + + const loop = uws.Loop.get(); + + // For now, create context without TLS certificates (would need to be configured) + const options = uws.quic.SocketContextOptions{ + .cert_file_name = null, + .key_file_name = null, + .passphrase = null, + }; + + const context = uws.quic.SocketContext.create(loop, options, @sizeOf(*This)) orelse return error.ContextCreationFailed; + + this.socket_context = context; + + // Set up callbacks + context.onOpen(onSocketOpen); + context.onClose(onSocketClose); + context.onStreamOpen(onStreamOpen); + context.onStreamData(onStreamData); + context.onStreamClose(onStreamClose); + context.onStreamEnd(onStreamEnd); + context.onStreamWritable(onStreamWritable); + + // Store reference to this instance in context extension data + const ext_data = context.ext(); + if (ext_data) |ext| { + const this_ptr: **This = @ptrCast(@alignCast(ext)); + this_ptr.* = this; + } + + log("QUIC socket context created", .{}); + } + + // Connect to a QUIC server + pub fn connectImpl(this: *This, hostname: []const u8, port: u16) !void { + if (this.socket_context == null) { + try this.createContext(); + } + + this.server_name = try bun.default_allocator.dupe(u8, hostname); + + // Convert hostname to null-terminated string for C API + const hostname_cstr = try bun.default_allocator.dupeZ(u8, hostname); + defer bun.default_allocator.free(hostname_cstr); + + // Create outgoing QUIC connection + const socket = this.socket_context.?.connect(hostname_cstr.ptr, @intCast(port), @sizeOf(*This)) orelse return error.ConnectionFailed; + + this.socket = socket; + + // Note: Socket extension data access will be handled through the socket context + // The this pointer is already stored in the context extension data + + log("QUIC connect to {s}:{} initiated", .{ hostname, port }); + } + + // Listen for QUIC connections (server mode) + pub fn listenImpl(this: *This, hostname: []const u8, port: u16) !void { + if (this.socket_context == null) { + try this.createContext(); + } + + this.flags.is_server = true; + + // Convert hostname to null-terminated string for C API + const hostname_cstr = try bun.default_allocator.dupeZ(u8, hostname); + defer bun.default_allocator.free(hostname_cstr); + + // Start listening for QUIC connections + const listen_socket = this.socket_context.?.listen(hostname_cstr.ptr, @intCast(port), @sizeOf(*This)) orelse return error.ListenFailed; + + this.listen_socket = listen_socket; + + log("QUIC listening on {s}:{}", .{ hostname, port }); + } + + // Close the QUIC connection + pub fn closeImpl(this: *This) void { + if (this.flags.is_closed) return; + this.flags.is_closed = true; + this.has_pending_activity.store(false, .release); + + // Close current stream if exists + if (this.current_stream) |stream| { + stream.close(); + this.current_stream = null; + } + + // Close socket (this will be handled by uSockets cleanup) + this.socket = null; + this.listen_socket = null; + + log("QUIC connection closed", .{}); + } + + // Write data to the QUIC connection + pub fn writeImpl(this: *This, data: []const u8) !usize { + if (this.flags.is_closed) return error.SocketClosed; + if (!this.flags.is_connected) return error.NotConnected; + + // Ensure we have a stream to write to + if (this.current_stream == null) { + if (this.socket) |socket| { + socket.createStream(0); // No extra data needed for stream extension + // Wait a moment for the stream to be created via callback + // In a real implementation, this might need to be asynchronous + } else { + return error.NoSocket; + } + } + + if (this.current_stream) |stream| { + const bytes_written = stream.write(data); + if (bytes_written < 0) { + return error.WriteFailed; + } + return @intCast(bytes_written); + } + + return error.NoStream; + } + + // Read data from the QUIC connection + pub fn readImpl(this: *This, buffer: []u8) !usize { + if (this.flags.is_closed) return error.SocketClosed; + if (!this.flags.is_connected) return error.NotConnected; + + // QUIC reading is event-driven through the onStreamData callback + // This method is kept for API compatibility but actual data comes through events + _ = buffer; // Suppress unused variable warning + log("QUIC read called - data comes through onStreamData events", .{}); + return 0; + } + + // Create a new QUIC stream + pub fn createStreamImpl(this: *This) !u32 { + if (this.flags.is_closed) return error.SocketClosed; + if (!this.flags.is_connected) return error.NotConnected; + + if (this.socket) |socket| { + socket.createStream(@sizeOf(*uws.quic.Stream)); + this.stream_count += 1; + log("QUIC stream #{} created", .{this.stream_count}); + return this.stream_count; + } + + return error.NoSocket; + } + + // Get connection statistics + pub fn getQuicStats(this: *This) QuicStats { + return QuicStats{ + .stream_count = this.stream_count, + .is_connected = this.flags.is_connected, + .has_0rtt = this.flags.has_0rtt, + .bytes_sent = 0, // TODO: Track actual bytes + .bytes_received = 0, // TODO: Track actual bytes + }; + } + + // JavaScript method bindings + pub fn connect(this: *This, globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!jsc.JSValue { + const args = callframe.arguments(); + if (args.len < 2) return globalThis.throw("connect requires hostname and port", .{}); + + const hostname_js = args.ptr[0]; + const port_js = args.ptr[1]; + + if (!hostname_js.isString()) return globalThis.throw("hostname must be a string", .{}); + if (!port_js.isNumber()) return globalThis.throw("port must be a number", .{}); + + var hostname_slice = try hostname_js.getZigString(globalThis); + const hostname = hostname_slice.slice(); + const port = port_js.to(u16); + + this.connectImpl(hostname, port) catch |err| { + return globalThis.throwError(err, "Failed to connect"); + }; + + return .js_undefined; + } + + pub fn listen(this: *This, globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!jsc.JSValue { + const args = callframe.arguments(); + if (args.len < 2) return globalThis.throw("listen requires hostname and port", .{}); + + const hostname_js = args.ptr[0]; + const port_js = args.ptr[1]; + + if (!hostname_js.isString()) return globalThis.throw("hostname must be a string", .{}); + if (!port_js.isNumber()) return globalThis.throw("port must be a number", .{}); + + var hostname_slice = try hostname_js.getZigString(globalThis); + const hostname = hostname_slice.slice(); + const port = port_js.to(u16); + + this.listenImpl(hostname, port) catch |err| { + return globalThis.throwError(err, "Failed to listen"); + }; + + return .js_undefined; + } + + pub fn write(this: *This, globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!jsc.JSValue { + const args = callframe.arguments(); + if (args.len < 1) return globalThis.throw("write requires data", .{}); + + const data_js = args.ptr[0]; + if (data_js.isEmptyOrUndefinedOrNull()) { + return globalThis.throw("data cannot be null or undefined", .{}); + } + + // Convert JS value to byte array + var data_slice: []const u8 = undefined; + + if (data_js.asArrayBuffer(globalThis)) |array_buffer| { + data_slice = array_buffer.slice(); + } else if (data_js.isString()) { + var zig_str = try data_js.getZigString(globalThis); + data_slice = zig_str.slice(); + } else { + return globalThis.throw("data must be a string or ArrayBuffer", .{}); + } + + const bytes_written = this.writeImpl(data_slice) catch |err| { + return switch (err) { + error.SocketClosed => globalThis.throw("Socket is closed", .{}), + error.NotConnected => globalThis.throw("Socket is not connected", .{}), + error.NoSocket => globalThis.throw("No socket available", .{}), + error.NoStream => globalThis.throw("No stream available", .{}), + error.WriteFailed => globalThis.throw("Write operation failed", .{}), + }; + }; + + return jsc.JSValue.jsNumber(@as(f64, @floatFromInt(bytes_written))); + } + + pub fn read(this: *This, globalThis: *jsc.JSGlobalObject, _: *jsc.CallFrame) bun.JSError!jsc.JSValue { + // QUIC reading is event-driven, so this just returns status info + const bytes_read = this.readImpl(&[_]u8{}) catch |err| { + return switch (err) { + error.SocketClosed => globalThis.throw("Socket is closed", .{}), + error.NotConnected => globalThis.throw("Socket is not connected", .{}), + }; + }; + + return jsc.JSValue.jsNumber(@as(f64, @floatFromInt(bytes_read))); + } + + pub fn createStream(this: *This, globalThis: *jsc.JSGlobalObject, _: *jsc.CallFrame) bun.JSError!jsc.JSValue { + const stream_id = this.createStreamImpl() catch { + return globalThis.throw("Failed to create stream", .{}); + }; + + return jsc.JSValue.jsNumber(@as(f64, @floatFromInt(stream_id))); + } + + pub fn close(this: *This, _: *jsc.JSGlobalObject, _: *jsc.CallFrame) bun.JSError!jsc.JSValue { + this.closeImpl(); + return .js_undefined; + } + + // Property getters + pub fn getServerName(this: *This, globalThis: *jsc.JSGlobalObject) jsc.JSValue { + if (this.server_name) |server_name| { + return jsc.ZigString.init(server_name).toJS(globalThis); + } + return jsc.JSValue.jsNull(); + } + + pub fn setServerName(_: *This, _: *jsc.JSGlobalObject, value: jsc.JSValue) void { + if (value.isString()) { + // TODO: Implement setting server name + } + } + + pub fn getConnectionId(this: *This, globalThis: *jsc.JSGlobalObject) jsc.JSValue { + if (this.connection_id) |conn_id| { + return jsc.ZigString.init(conn_id).toJS(globalThis); + } + return jsc.JSValue.jsNull(); + } + + pub fn getStreamCount(this: *This, _: *jsc.JSGlobalObject) jsc.JSValue { + return jsc.JSValue.jsNumber(@as(f64, @floatFromInt(this.stream_count))); + } + + pub fn getIsConnected(this: *This, _: *jsc.JSGlobalObject) jsc.JSValue { + return jsc.JSValue.jsBoolean(this.flags.is_connected); + } + + pub fn getIsServer(this: *This, _: *jsc.JSGlobalObject) jsc.JSValue { + return jsc.JSValue.jsBoolean(this.flags.is_server); + } + + pub fn getHas0RTT(this: *This, _: *jsc.JSGlobalObject) jsc.JSValue { + return jsc.JSValue.jsBoolean(this.flags.has_0rtt); + } + + pub fn getStats(this: *This, globalThis: *jsc.JSGlobalObject) jsc.JSValue { + const stats = this.getQuicStats(); + + // Create a JavaScript object with the stats + const stats_obj = jsc.JSValue.createEmptyObject(globalThis, 5); + stats_obj.put(globalThis, "streamCount", jsc.JSValue.jsNumber(@as(f64, @floatFromInt(stats.stream_count)))); + stats_obj.put(globalThis, "isConnected", jsc.JSValue.jsBoolean(stats.is_connected)); + stats_obj.put(globalThis, "has0RTT", jsc.JSValue.jsBoolean(stats.has_0rtt)); + stats_obj.put(globalThis, "bytesSent", jsc.JSValue.jsNumber(@as(f64, @floatFromInt(stats.bytes_sent)))); + stats_obj.put(globalThis, "bytesReceived", jsc.JSValue.jsNumber(@as(f64, @floatFromInt(stats.bytes_received)))); + + return stats_obj; + } + + pub fn getData(this: *This, _: *jsc.JSGlobalObject) jsc.JSValue { + _ = this; // TODO: Implement data storage + return .js_undefined; + } + + pub fn setData(this: *This, globalThis: *jsc.JSGlobalObject, value: jsc.JSValue) void { + _ = this; // TODO: Implement data storage + _ = globalThis; + _ = value; + } + + pub fn getReadyState(this: *This, globalThis: *jsc.JSGlobalObject) jsc.JSValue { + if (this.flags.is_closed) { + return jsc.ZigString.init("closed").toJS(globalThis); + } else if (this.flags.is_connected) { + return jsc.ZigString.init("open").toJS(globalThis); + } else { + return jsc.ZigString.init("connecting").toJS(globalThis); + } + } + + pub fn jsRef(this: *This, globalObject: *jsc.JSGlobalObject, _: *jsc.CallFrame) bun.JSError!jsc.JSValue { + this.poll_ref.ref(globalObject.bunVM()); + return .js_undefined; + } + + pub fn jsUnref(this: *This, globalObject: *jsc.JSGlobalObject, _: *jsc.CallFrame) bun.JSError!jsc.JSValue { + this.poll_ref.unref(globalObject.bunVM()); + return .js_undefined; + } + + // Static method for Bun.quic() API + pub fn quic(globalThis: *jsc.JSGlobalObject, options: jsc.JSValue) bun.JSError!jsc.JSValue { + if (options.isEmptyOrUndefinedOrNull() or !options.isObject()) { + return globalThis.throw("quic requires options object", .{}); + } + + // Determine if this is a server socket + const is_server = if (try options.get(globalThis, "server")) |server_val| + server_val.toBoolean() + else + false; + + // Create handlers from options + const handlers = QuicHandlers.fromJS(globalThis, options, is_server) catch { + return globalThis.throw("Invalid QUIC handlers", .{}); + }; + + // Allocate handlers on heap + const handlers_ptr = try bun.default_allocator.create(QuicHandlers); + handlers_ptr.* = handlers; + handlers_ptr.withAsyncContextIfNeeded(globalThis); + + // Initialize QUIC socket + const this = QuicSocket.init(bun.default_allocator, handlers_ptr) catch { + handlers_ptr.unprotect(); + bun.default_allocator.destroy(handlers_ptr); + return globalThis.throw("Failed to create QUIC socket", .{}); + }; + + // Configure from options + if (try options.get(globalThis, "hostname")) |hostname_val| { + if (hostname_val.isString()) { + var hostname_slice = try hostname_val.getZigString(globalThis); + const hostname = hostname_slice.slice(); + + const port_val = (try options.get(globalThis, "port")) orelse jsc.JSValue.jsNumber(443); + const port = if (port_val.isNumber()) port_val.to(u16) else 443; + + if (is_server) { + this.listenImpl(hostname, port) catch { + this.deref(); + return globalThis.throw("Failed to listen", .{}); + }; + } else { + this.connectImpl(hostname, port) catch { + this.deref(); + return globalThis.throw("Failed to connect", .{}); + }; + } + } + } + + // Set up JavaScript value and return + this.this_value = this.toJS(globalThis); + this.poll_ref.ref(globalThis.bunVM()); + + return this.this_value; + } + + // uSockets callback handlers + fn onSocketOpen(socket: *uws.quic.Socket, is_client: c_int) callconv(.C) void { + jsc.markBinding(@src()); + + const context = socket.context() orelse return; + const ext_data = context.ext() orelse return; + const this_ptr: **This = @ptrCast(@alignCast(ext_data)); + const this: *This = this_ptr.*; + + this.socket = socket; + this.flags.is_connected = true; + this.has_pending_activity.store(true, .release); + + log("QUIC socket opened (client: {})", .{is_client != 0}); + + // Call appropriate JavaScript handlers + if (this.handlers) |handlers| { + const vm = handlers.vm; + const event_loop = vm.eventLoop(); + event_loop.enter(); + defer event_loop.exit(); + + // For server mode, when a client connects, call onConnection handler + if (this.flags.is_server and is_client == 0) { + if (handlers.onConnection != .zero) { + _ = handlers.onConnection.call(handlers.globalObject, this.this_value, &.{this.this_value}) catch |err| { + this.callErrorHandler(handlers.globalObject.takeException(err)); + return; + }; + } + } + + // Call onOpen handler for all cases (server startup and client connection) + if (handlers.onOpen != .zero) { + _ = handlers.onOpen.call(handlers.globalObject, this.this_value, &.{this.this_value}) catch |err| { + this.callErrorHandler(handlers.globalObject.takeException(err)); + }; + } + } + } + + fn onSocketClose(socket: *uws.quic.Socket) callconv(.C) void { + jsc.markBinding(@src()); + + const context = socket.context() orelse return; + const ext_data = context.ext() orelse return; + const this_ptr: **This = @ptrCast(@alignCast(ext_data)); + const this: *This = this_ptr.*; + + this.flags.is_connected = false; + this.flags.is_closed = true; + this.has_pending_activity.store(false, .release); + + log("QUIC socket closed", .{}); + + // Call JavaScript onClose handler + if (this.handlers) |handlers| { + if (handlers.onClose != .zero) { + const vm = handlers.vm; + const event_loop = vm.eventLoop(); + event_loop.enter(); + defer event_loop.exit(); + + _ = handlers.onClose.call(handlers.globalObject, this.this_value, &.{this.this_value}) catch |err| { + this.callErrorHandler(handlers.globalObject.takeException(err)); + }; + } + } + } + + fn onStreamOpen(stream: *uws.quic.Stream, is_client: c_int) callconv(.C) void { + jsc.markBinding(@src()); + + const socket = stream.socket() orelse return; + const context = socket.context() orelse return; + const ext_data = context.ext() orelse return; + const this_ptr: **This = @ptrCast(@alignCast(ext_data)); + const this: *This = this_ptr.*; + + this.current_stream = stream; + + // Mark connection as established when first stream opens successfully + if (!this.flags.is_connected) { + this.flags.is_connected = true; + log("QUIC connection now established after stream open", .{}); + } + + log("QUIC stream opened (client: {})", .{is_client != 0}); + } + + fn onStreamData(stream: *uws.quic.Stream, data: [*c]u8, length: c_int) callconv(.C) void { + jsc.markBinding(@src()); + + const socket = stream.socket() orelse return; + const context = socket.context() orelse return; + const ext_data = context.ext() orelse return; + const this_ptr: **This = @ptrCast(@alignCast(ext_data)); + const this: *This = this_ptr.*; + + if (length <= 0) return; + + const data_slice = data[0..@intCast(length)]; + log("QUIC stream received {} bytes", .{length}); + + // Call JavaScript onMessage handler + if (this.handlers) |handlers| { + if (handlers.onMessage != .zero) { + const vm = handlers.vm; + const event_loop = vm.eventLoop(); + event_loop.enter(); + defer event_loop.exit(); + + const array_buffer = jsc.ArrayBuffer.createBuffer(handlers.globalObject, data_slice) catch { + this.callErrorHandler(jsc.JSValue.jsNull()); + return; + }; + _ = handlers.onMessage.call(handlers.globalObject, this.this_value, &.{ this.this_value, array_buffer }) catch |err| { + this.callErrorHandler(handlers.globalObject.takeException(err)); + }; + } + } + } + + fn onStreamClose(stream: *uws.quic.Stream) callconv(.C) void { + jsc.markBinding(@src()); + + const socket = stream.socket() orelse return; + const context = socket.context() orelse return; + const ext_data = context.ext() orelse return; + const this_ptr: **This = @ptrCast(@alignCast(ext_data)); + const this: *This = this_ptr.*; + + if (this.current_stream == stream) { + this.current_stream = null; + } + + log("QUIC stream closed", .{}); + } + + fn onStreamEnd(stream: *uws.quic.Stream) callconv(.C) void { + jsc.markBinding(@src()); + + const socket = stream.socket() orelse return; + const context = socket.context() orelse return; + const ext_data = context.ext() orelse return; + const this_ptr: **This = @ptrCast(@alignCast(ext_data)); + const this: *This = this_ptr.*; + + _ = this; // Use this if needed for future functionality + + log("QUIC stream ended", .{}); + } + + fn onStreamWritable(stream: *uws.quic.Stream) callconv(.C) void { + jsc.markBinding(@src()); + + const socket = stream.socket() orelse return; + const context = socket.context() orelse return; + const ext_data = context.ext() orelse return; + const this_ptr: **This = @ptrCast(@alignCast(ext_data)); + const this: *This = this_ptr.*; + + _ = this; // Use this if needed for future functionality + + log("QUIC stream writable", .{}); + } + + // Error handler helper + fn callErrorHandler(this: *This, exception: jsc.JSValue) void { + if (this.handlers) |handlers| { + if (handlers.onError != .zero) { + const vm = handlers.vm; + const event_loop = vm.eventLoop(); + event_loop.enter(); + defer event_loop.exit(); + + _ = handlers.onError.call(handlers.globalObject, this.this_value, &.{ this.this_value, exception }) catch { + // If error handler itself throws, we can't do much more + log("Error in QUIC error handler", .{}); + }; + } + } + } +}; + +pub const QuicStats = struct { + stream_count: u32, + is_connected: bool, + has_0rtt: bool, + bytes_sent: u64, + bytes_received: u64, +}; + +const std = @import("std"); +const bun = @import("bun"); +const Environment = bun.Environment; +const Async = bun.Async; + +const jsc = bun.jsc; +const SocketAddress = @import("./socket/SocketAddress.zig"); +const Handlers = @import("./socket/Handlers.zig"); +const uws = @import("../../../deps/uws.zig"); +const log = bun.Output.scoped(.QuicSocket, false); + +// QUIC-specific handlers that use different callback names than regular sockets +pub const QuicHandlers = struct { + onOpen: jsc.JSValue = .zero, // "open" callback + onMessage: jsc.JSValue = .zero, // "message" callback + onClose: jsc.JSValue = .zero, // "close" callback + onError: jsc.JSValue = .zero, // "error" callback + onConnection: jsc.JSValue = .zero, // "connection" callback (server only) + + vm: *jsc.VirtualMachine, + globalObject: *jsc.JSGlobalObject, + is_server: bool, + + protection_count: bun.DebugOnly(u32) = if (Environment.isDebug) 0, + + pub fn fromJS(globalObject: *jsc.JSGlobalObject, opts: jsc.JSValue, is_server: bool) bun.JSError!QuicHandlers { + var handlers = QuicHandlers{ + .vm = globalObject.bunVM(), + .globalObject = globalObject, + .is_server = is_server, + }; + + if (opts.isEmptyOrUndefinedOrNull() or opts.isBoolean() or !opts.isObject()) { + return globalObject.throwInvalidArguments("Expected options to be an object", .{}); + } + + // Map QUIC callback names to handler fields + const pairs = .{ + .{ "onOpen", "open" }, + .{ "onMessage", "message" }, + .{ "onClose", "close" }, + .{ "onError", "error" }, + .{ "onConnection", "connection" }, + }; + + inline for (pairs) |pair| { + if (try opts.getTruthyComptime(globalObject, pair.@"1")) |callback_value| { + if (!callback_value.isCell() or !callback_value.isCallable()) { + return globalObject.throwInvalidArguments("Expected \"{s}\" callback to be a function", .{pair[1]}); + } + + @field(handlers, pair.@"0") = callback_value; + } + } + + // For QUIC, we need at least an open callback or error callback + if (handlers.onOpen == .zero and handlers.onError == .zero) { + return globalObject.throwInvalidArguments("Expected at least \"open\" or \"error\" callback", .{}); + } + + return handlers; + } + + pub fn unprotect(this: *QuicHandlers) void { + if (this.vm.isShuttingDown()) { + return; + } + + if (comptime Environment.isDebug) { + bun.assert(this.protection_count > 0); + this.protection_count -= 1; + } + this.onOpen.unprotect(); + this.onMessage.unprotect(); + this.onClose.unprotect(); + this.onError.unprotect(); + this.onConnection.unprotect(); + } + + pub fn protect(this: *QuicHandlers) void { + if (comptime Environment.isDebug) { + this.protection_count += 1; + } + this.onOpen.protect(); + this.onMessage.protect(); + this.onClose.protect(); + this.onError.protect(); + this.onConnection.protect(); + } + + pub fn withAsyncContextIfNeeded(this: *QuicHandlers, globalObject: *jsc.JSGlobalObject) void { + inline for (.{ + "onOpen", + "onMessage", + "onClose", + "onError", + "onConnection", + }) |field| { + const value = @field(this, field); + if (value != .zero) { + @field(this, field) = value.withAsyncContextIfNeeded(globalObject); + } + } + } +}; diff --git a/src/bun.js/api/sockets.classes.ts b/src/bun.js/api/sockets.classes.ts index 95500fbe9e..9f0ffee3cf 100644 --- a/src/bun.js/api/sockets.classes.ts +++ b/src/bun.js/api/sockets.classes.ts @@ -241,6 +241,86 @@ const sslOnly = { export default [ generate(true), generate(false), + // QUIC Socket + define({ + name: "QuicSocket", + JSType: "0b11101110", + hasPendingActivity: true, + noConstructor: true, + configurable: false, + memoryCost: true, + proto: { + connect: { + fn: "connect", + length: 2, + }, + listen: { + fn: "listen", + length: 2, + }, + write: { + fn: "write", + length: 1, + }, + read: { + fn: "read", + length: 1, + }, + createStream: { + fn: "createStream", + length: 0, + }, + close: { + fn: "close", + length: 0, + }, + "@@dispose": { + fn: "close", + length: 0, + }, + ref: { + fn: "jsRef", + length: 0, + }, + unref: { + fn: "jsUnref", + length: 0, + }, + serverName: { + getter: "getServerName", + setter: "setServerName", + }, + connectionId: { + getter: "getConnectionId", + }, + streamCount: { + getter: "getStreamCount", + }, + isConnected: { + getter: "getIsConnected", + }, + isServer: { + getter: "getIsServer", + }, + has0RTT: { + getter: "getHas0RTT", + }, + stats: { + getter: "getStats", + }, + data: { + getter: "getData", + cache: true, + setter: "setData", + }, + readyState: { + getter: "getReadyState", + }, + }, + finalize: true, + construct: true, + klass: {}, + }), define({ name: "Listener", noConstructor: true, diff --git a/src/bun.js/bindings/BunObject+exports.h b/src/bun.js/bindings/BunObject+exports.h index e3966cdc97..8c8d185b1a 100644 --- a/src/bun.js/bindings/BunObject+exports.h +++ b/src/bun.js/bindings/BunObject+exports.h @@ -59,6 +59,7 @@ macro(mmap) \ macro(nanoseconds) \ macro(openInEditor) \ + macro(quic) \ macro(registerMacro) \ macro(resolve) \ macro(resolveSync) \ diff --git a/src/bun.js/bindings/BunObject.cpp b/src/bun.js/bindings/BunObject.cpp index 790ad54588..18fd9b4ba8 100644 --- a/src/bun.js/bindings/BunObject.cpp +++ b/src/bun.js/bindings/BunObject.cpp @@ -754,6 +754,7 @@ JSC_DEFINE_HOST_FUNCTION(functionFileURLToPath, (JSC::JSGlobalObject * globalObj pathToFileURL functionPathToFileURL DontDelete|Function 1 peek constructBunPeekObject DontDelete|PropertyCallback plugin constructPluginObject ReadOnly|DontDelete|PropertyCallback + quic BunObject_callback_quic DontDelete|Function 1 randomUUIDv7 Bun__randomUUIDv7 DontDelete|Function 2 randomUUIDv5 Bun__randomUUIDv5 DontDelete|Function 3 readableStreamToArray JSBuiltin Builtin|Function 1 diff --git a/src/bun.js/bindings/generated_classes_list.zig b/src/bun.js/bindings/generated_classes_list.zig index d5fd4778bc..87af774979 100644 --- a/src/bun.js/bindings/generated_classes_list.zig +++ b/src/bun.js/bindings/generated_classes_list.zig @@ -48,6 +48,7 @@ pub const Classes = struct { pub const ResourceUsage = api.Subprocess.ResourceUsage; pub const TCPSocket = api.TCPSocket; pub const TLSSocket = api.TLSSocket; + pub const QuicSocket = api.QuicSocket; pub const UDPSocket = api.UDPSocket; pub const SocketAddress = api.SocketAddress; pub const TextDecoder = webcore.TextDecoder; diff --git a/src/deps/uws.zig b/src/deps/uws.zig index 9050e98730..546c38e3ad 100644 --- a/src/deps/uws.zig +++ b/src/deps/uws.zig @@ -26,6 +26,7 @@ pub const ListenSocket = @import("./uws/ListenSocket.zig").ListenSocket; pub const State = @import("./uws/Response.zig").State; pub const Loop = @import("./uws/Loop.zig").Loop; pub const udp = @import("./uws/udp.zig"); +pub const quic = @import("./uws/quic.zig"); pub const BodyReaderMixin = @import("./uws/BodyReaderMixin.zig").BodyReaderMixin; pub const LIBUS_TIMEOUT_GRANULARITY = @as(i32, 4); diff --git a/src/deps/uws/quic.zig b/src/deps/uws/quic.zig new file mode 100644 index 0000000000..ee74d7461b --- /dev/null +++ b/src/deps/uws/quic.zig @@ -0,0 +1,176 @@ +const quic = @This(); + +const std = @import("std"); +const bun = @import("bun"); +const uws = @import("../uws.zig"); + +const Loop = uws.Loop; + +/// QUIC socket context options for creating contexts +pub const SocketContextOptions = extern struct { + cert_file_name: [*c]const u8, + key_file_name: [*c]const u8, + passphrase: [*c]const u8, +}; + +/// QUIC socket context - holds shared state and configuration +pub const SocketContext = opaque { + /// Create a new QUIC socket context + pub fn create(loop: *Loop, options: SocketContextOptions, ext_size: c_int) ?*SocketContext { + return us_create_quic_socket_context(loop, options, ext_size); + } + + /// Start listening for QUIC connections + pub fn listen(this: *SocketContext, host: [*c]const u8, port: c_int, ext_size: c_int) ?*ListenSocket { + return us_quic_socket_context_listen(this, host, port, ext_size); + } + + /// Create an outgoing QUIC connection + pub fn connect(this: *SocketContext, host: [*c]const u8, port: c_int, ext_size: c_int) ?*Socket { + return us_quic_socket_context_connect(this, host, port, ext_size); + } + + /// Get extension data for this context + pub fn ext(this: *SocketContext) ?*anyopaque { + return us_quic_socket_context_ext(this); + } + + /// Set header for HTTP/3 requests + pub fn setHeader(this: *SocketContext, index: c_int, key: [*c]const u8, key_length: c_int, value: [*c]const u8, value_length: c_int) void { + us_quic_socket_context_set_header(this, index, key, key_length, value, value_length); + } + + /// Send headers on a stream + pub fn sendHeaders(this: *SocketContext, stream: *Stream, num: c_int, has_body: c_int) void { + us_quic_socket_context_send_headers(this, stream, num, has_body); + } + + /// Get header from received headers + pub fn getHeader(this: *SocketContext, index: c_int, name: [*c][*c]u8, name_length: [*c]c_int, value: [*c][*c]u8, value_length: [*c]c_int) c_int { + return us_quic_socket_context_get_header(this, index, name, name_length, value, value_length); + } + + // Callback setters + pub fn onStreamData(this: *SocketContext, callback: *const fn (*Stream, [*c]u8, c_int) callconv(.C) void) void { + us_quic_socket_context_on_stream_data(this, callback); + } + + pub fn onStreamEnd(this: *SocketContext, callback: *const fn (*Stream) callconv(.C) void) void { + us_quic_socket_context_on_stream_end(this, callback); + } + + pub fn onStreamHeaders(this: *SocketContext, callback: *const fn (*Stream) callconv(.C) void) void { + us_quic_socket_context_on_stream_headers(this, callback); + } + + pub fn onStreamOpen(this: *SocketContext, callback: *const fn (*Stream, c_int) callconv(.C) void) void { + us_quic_socket_context_on_stream_open(this, callback); + } + + pub fn onStreamClose(this: *SocketContext, callback: *const fn (*Stream) callconv(.C) void) void { + us_quic_socket_context_on_stream_close(this, callback); + } + + pub fn onOpen(this: *SocketContext, callback: *const fn (*Socket, c_int) callconv(.C) void) void { + us_quic_socket_context_on_open(this, callback); + } + + pub fn onClose(this: *SocketContext, callback: *const fn (*Socket) callconv(.C) void) void { + us_quic_socket_context_on_close(this, callback); + } + + pub fn onStreamWritable(this: *SocketContext, callback: *const fn (*Stream) callconv(.C) void) void { + us_quic_socket_context_on_stream_writable(this, callback); + } +}; + +/// QUIC listen socket - represents a listening QUIC socket +pub const ListenSocket = opaque { + // Listen sockets are created by SocketContext.listen() + // and typically don't need many methods beyond what's inherited +}; + +/// QUIC socket - represents a QUIC connection +pub const Socket = opaque { + /// Get the socket context for this socket + pub fn context(this: *Socket) ?*SocketContext { + return us_quic_socket_context(this); + } + + /// Create a new stream on this QUIC connection + pub fn createStream(this: *Socket, ext_size: c_int) void { + us_quic_socket_create_stream(this, ext_size); + } +}; + +/// QUIC stream - represents a single stream within a QUIC connection +pub const Stream = opaque { + /// Write data to the stream + pub fn write(this: *Stream, data: []const u8) c_int { + return us_quic_stream_write(this, @ptrCast(@constCast(data.ptr)), @intCast(data.len)); + } + + /// Get the socket that owns this stream + pub fn socket(this: *Stream) ?*Socket { + return us_quic_stream_socket(this); + } + + /// Get extension data for this stream + pub fn ext(this: *Stream) ?*anyopaque { + return us_quic_stream_ext(this); + } + + /// Check if this stream is from a client connection + pub fn isClient(this: *Stream) bool { + return us_quic_stream_is_client(this) != 0; + } + + /// Shutdown the stream for writing + pub fn shutdown(this: *Stream) c_int { + return us_quic_stream_shutdown(this); + } + + /// Shutdown the stream for reading + pub fn shutdownRead(this: *Stream) c_int { + return us_quic_stream_shutdown_read(this); + } + + /// Close the stream + pub fn close(this: *Stream) void { + us_quic_stream_close(this); + } +}; + +// External C function declarations +extern fn us_create_quic_socket_context(loop: *Loop, options: SocketContextOptions, ext_size: c_int) ?*SocketContext; +extern fn us_quic_socket_context_listen(context: *SocketContext, host: [*c]const u8, port: c_int, ext_size: c_int) ?*ListenSocket; +extern fn us_quic_socket_context_connect(context: *SocketContext, host: [*c]const u8, port: c_int, ext_size: c_int) ?*Socket; +extern fn us_quic_socket_context_ext(context: *SocketContext) ?*anyopaque; +extern fn us_quic_socket_context(socket: *Socket) ?*SocketContext; + +// Stream functions +extern fn us_quic_stream_write(stream: *Stream, data: [*c]u8, length: c_int) c_int; +extern fn us_quic_stream_socket(stream: *Stream) ?*Socket; +extern fn us_quic_stream_ext(stream: *Stream) ?*anyopaque; +extern fn us_quic_stream_is_client(stream: *Stream) c_int; +extern fn us_quic_stream_shutdown(stream: *Stream) c_int; +extern fn us_quic_stream_shutdown_read(stream: *Stream) c_int; +extern fn us_quic_stream_close(stream: *Stream) void; + +// Socket functions +extern fn us_quic_socket_create_stream(socket: *Socket, ext_size: c_int) void; + +// Header functions +extern fn us_quic_socket_context_set_header(context: *SocketContext, index: c_int, key: [*c]const u8, key_length: c_int, value: [*c]const u8, value_length: c_int) void; +extern fn us_quic_socket_context_send_headers(context: *SocketContext, stream: *Stream, num: c_int, has_body: c_int) void; +extern fn us_quic_socket_context_get_header(context: *SocketContext, index: c_int, name: [*c][*c]u8, name_length: [*c]c_int, value: [*c][*c]u8, value_length: [*c]c_int) c_int; + +// Callback registration functions +extern fn us_quic_socket_context_on_stream_data(context: *SocketContext, callback: *const fn (*Stream, [*c]u8, c_int) callconv(.C) void) void; +extern fn us_quic_socket_context_on_stream_end(context: *SocketContext, callback: *const fn (*Stream) callconv(.C) void) void; +extern fn us_quic_socket_context_on_stream_headers(context: *SocketContext, callback: *const fn (*Stream) callconv(.C) void) void; +extern fn us_quic_socket_context_on_stream_open(context: *SocketContext, callback: *const fn (*Stream, c_int) callconv(.C) void) void; +extern fn us_quic_socket_context_on_stream_close(context: *SocketContext, callback: *const fn (*Stream) callconv(.C) void) void; +extern fn us_quic_socket_context_on_open(context: *SocketContext, callback: *const fn (*Socket, c_int) callconv(.C) void) void; +extern fn us_quic_socket_context_on_close(context: *SocketContext, callback: *const fn (*Socket) callconv(.C) void) void; +extern fn us_quic_socket_context_on_stream_writable(context: *SocketContext, callback: *const fn (*Stream) callconv(.C) void) void; diff --git a/test/js/bun/quic/quic-basic.test.ts b/test/js/bun/quic/quic-basic.test.ts new file mode 100644 index 0000000000..3b330f6c04 --- /dev/null +++ b/test/js/bun/quic/quic-basic.test.ts @@ -0,0 +1,135 @@ +import { test, expect } from "bun:test"; + +test("Bun.quic should be available", () => { + expect(typeof Bun.quic).toBe("function"); +}); + +test("Bun.quic should create a QUIC socket with basic options", () => { + const socket = Bun.quic({ + hostname: "localhost", + port: 8443, + server: false, + data: { + test: true + }, + open(socket) { + console.log("QUIC connection opened", socket); + }, + message(socket, data) { + console.log("QUIC message received", data); + }, + close(socket) { + console.log("QUIC connection closed", socket); + }, + error(socket, error) { + console.log("QUIC error", error); + }, + }); + + expect(socket).toBeDefined(); + expect(typeof socket.connect).toBe("function"); + expect(typeof socket.write).toBe("function"); + expect(typeof socket.read).toBe("function"); + expect(typeof socket.createStream).toBe("function"); + expect(typeof socket.close).toBe("function"); + + // Test properties + expect(typeof socket.isConnected).toBe("boolean"); + expect(typeof socket.isServer).toBe("boolean"); + expect(typeof socket.streamCount).toBe("number"); + expect(socket.readyState).toBe("open"); + + // Clean up + socket.close(); + expect(socket.readyState).toBe("closed"); +}); + +test("QuicSocket should support server mode", () => { + const server = Bun.quic({ + hostname: "localhost", + port: 8443, + server: true, + data: { + isServer: true + }, + open(socket) { + console.log("QUIC server ready", socket); + }, + connection(socket) { + console.log("New QUIC connection", socket); + }, + message(socket, data) { + console.log("Server received message", data); + }, + close(socket) { + console.log("QUIC server closed", socket); + }, + error(socket, error) { + console.log("QUIC server error", error); + }, + }); + + expect(server).toBeDefined(); + expect(server.isServer).toBe(true); + + // Clean up + server.close(); +}); + +test("QuicSocket should provide stats", () => { + const socket = Bun.quic({ + hostname: "localhost", + port: 8443, + server: false, + open() {}, + message() {}, + close() {}, + error() {}, + }); + + const stats = socket.stats; + expect(stats).toBeDefined(); + expect(typeof stats.streamCount).toBe("number"); + expect(typeof stats.isConnected).toBe("boolean"); + expect(typeof stats.has0RTT).toBe("boolean"); + expect(typeof stats.bytesSent).toBe("number"); + expect(typeof stats.bytesReceived).toBe("number"); + + socket.close(); +}); + +test("QuicSocket should support stream creation", () => { + const socket = Bun.quic({ + hostname: "localhost", + port: 8443, + server: false, + open() {}, + message() {}, + close() {}, + error() {}, + }); + + // Stream creation should succeed even during connection process in QUIC + expect(() => socket.createStream()).not.toThrow(); + expect(typeof socket.createStream()).toBe("number"); + + socket.close(); +}); + +test("QuicSocket should validate options", () => { + // Missing options + expect(() => Bun.quic()).toThrow(); + + // Invalid options type + expect(() => Bun.quic("invalid")).toThrow(); + + // Empty options should work (no connection will be made) + const socket = Bun.quic({ + open() {}, + message() {}, + close() {}, + error() {}, + }); + expect(socket).toBeDefined(); + socket.close(); +}); \ No newline at end of file diff --git a/test/js/bun/quic/quic-performance.test.ts b/test/js/bun/quic/quic-performance.test.ts new file mode 100644 index 0000000000..3ac32b7027 --- /dev/null +++ b/test/js/bun/quic/quic-performance.test.ts @@ -0,0 +1,199 @@ +import { test, expect } from "bun:test"; + +test("QUIC large data transfer", async () => { + let dataReceived = ""; + const testData = "x".repeat(64 * 1024); // 64KB test data + + const server = Bun.quic({ + hostname: "localhost", + port: 9446, + server: true, + connection(socket) { + // Send large data to client + socket.write(testData); + }, + open() {}, + message() {}, + close() {}, + error() {}, + }); + + await new Promise(resolve => setTimeout(resolve, 100)); + + const client = Bun.quic({ + hostname: "localhost", + port: 9446, + server: false, + open() {}, + message(socket, data) { + dataReceived += data.toString(); + + if (dataReceived.length >= testData.length) { + expect(dataReceived).toBe(testData); + socket.close(); + } + }, + close() {}, + error() {}, + }); + + await new Promise(resolve => setTimeout(resolve, 2000)); + + expect(dataReceived.length).toBe(testData.length); + + server.close(); + client.close(); +}); + +test("QUIC multiple concurrent streams", async () => { + const streamCount = 10; + let streamsCreated = 0; + let messagesReceived = 0; + + const server = Bun.quic({ + hostname: "localhost", + port: 9447, + server: true, + connection(socket) { + // Create multiple streams + for (let i = 0; i < streamCount; i++) { + const stream = socket.createStream(); + streamsCreated++; + + // Send message on each stream + socket.write(`Stream ${i} message`); + } + + expect(socket.streamCount).toBe(streamCount); + }, + message(socket, data) { + messagesReceived++; + console.log("Server received:", data.toString()); + }, + open() {}, + close() {}, + error() {}, + }); + + await new Promise(resolve => setTimeout(resolve, 100)); + + const client = Bun.quic({ + hostname: "localhost", + port: 9447, + server: false, + open(socket) { + // Client also creates streams + for (let i = 0; i < streamCount; i++) { + socket.createStream(); + socket.write(`Client stream ${i}`); + } + }, + message(socket, data) { + console.log("Client received:", data.toString()); + }, + close() {}, + error() {}, + }); + + await new Promise(resolve => setTimeout(resolve, 1000)); + + expect(streamsCreated).toBe(streamCount); + expect(messagesReceived).toBeGreaterThan(0); + + server.close(); + client.close(); +}); + +test("QUIC connection statistics", async () => { + let finalStats: any = null; + + const server = Bun.quic({ + hostname: "localhost", + port: 9448, + server: true, + connection(socket) { + // Send some data to generate stats + socket.write("Hello statistics!"); + }, + open() {}, + message() {}, + close() {}, + error() {}, + }); + + await new Promise(resolve => setTimeout(resolve, 100)); + + const client = Bun.quic({ + hostname: "localhost", + port: 9448, + server: false, + open(socket) { + // Send data back + socket.write("Stats response!"); + }, + message(socket, data) { + console.log("Client received:", data.toString()); + + // Get final stats before closing + finalStats = socket.stats; + }, + close() {}, + error() {}, + }); + + await new Promise(resolve => setTimeout(resolve, 1000)); + + // Verify stats structure and values + expect(finalStats).toBeDefined(); + expect(typeof finalStats.streamCount).toBe("number"); + expect(typeof finalStats.isConnected).toBe("boolean"); + expect(typeof finalStats.has0RTT).toBe("boolean"); + expect(typeof finalStats.bytesSent).toBe("number"); + expect(typeof finalStats.bytesReceived).toBe("number"); + + // Should have received some data + expect(finalStats.bytesReceived).toBeGreaterThan(0); + + server.close(); + client.close(); +}); + +test("QUIC 0-RTT connection support", async () => { + let has0RTTSupport = false; + + const server = Bun.quic({ + hostname: "localhost", + port: 9449, + server: true, + connection(socket) { + console.log("Server: 0-RTT support:", socket.has0RTT); + }, + open() {}, + message() {}, + close() {}, + error() {}, + }); + + await new Promise(resolve => setTimeout(resolve, 100)); + + const client = Bun.quic({ + hostname: "localhost", + port: 9449, + server: false, + open(socket) { + has0RTTSupport = socket.has0RTT; + console.log("Client: 0-RTT support:", has0RTTSupport); + }, + message() {}, + close() {}, + error() {}, + }); + + await new Promise(resolve => setTimeout(resolve, 500)); + + // 0-RTT is a boolean property + expect(typeof has0RTTSupport).toBe("boolean"); + + server.close(); + client.close(); +}); \ No newline at end of file diff --git a/test/js/bun/quic/quic-server-client.test.ts b/test/js/bun/quic/quic-server-client.test.ts new file mode 100644 index 0000000000..3eb217b792 --- /dev/null +++ b/test/js/bun/quic/quic-server-client.test.ts @@ -0,0 +1,206 @@ +import { test, expect } from "bun:test"; + +test("QUIC server and client integration", async () => { + let serverConnections = 0; + let clientConnections = 0; + let messagesReceived = 0; + + // Create QUIC server + const server = Bun.quic({ + hostname: "localhost", + port: 9443, + server: true, + open(socket) { + console.log("QUIC server ready on port 9443"); + }, + connection(socket) { + serverConnections++; + console.log(`New QUIC connection (${serverConnections})`); + + // Send welcome message to client + socket.write("Welcome to QUIC server!"); + }, + message(socket, data) { + messagesReceived++; + console.log("Server received:", data.toString()); + + // Echo the message back + socket.write(`Echo: ${data}`); + }, + close(socket) { + console.log("Server connection closed"); + }, + error(socket, error) { + console.error("Server error:", error); + }, + }); + + // Wait for server to be ready + await new Promise(resolve => setTimeout(resolve, 100)); + + // Create QUIC client + const client = Bun.quic({ + hostname: "localhost", + port: 9443, + server: false, + open(socket) { + clientConnections++; + console.log("QUIC client connected"); + + // Send test message + socket.write("Hello from QUIC client!"); + }, + message(socket, data) { + console.log("Client received:", data.toString()); + + if (data.toString().includes("Echo:")) { + // Test complete, close connection + socket.close(); + } + }, + close(socket) { + console.log("Client connection closed"); + }, + error(socket, error) { + console.error("Client error:", error); + }, + }); + + // Wait for communication to complete + await new Promise(resolve => setTimeout(resolve, 1000)); + + // Verify connections were established + expect(serverConnections).toBe(1); + expect(clientConnections).toBe(1); + expect(messagesReceived).toBeGreaterThan(0); + + // Clean up + server.close(); + client.close(); +}); + +test("QUIC stream creation and management", async () => { + const server = Bun.quic({ + hostname: "localhost", + port: 9444, + server: true, + connection(socket) { + console.log("Server: New connection"); + + // Create multiple streams + const stream1 = socket.createStream(); + const stream2 = socket.createStream(); + + expect(socket.streamCount).toBe(2); + expect(stream1).toBeDefined(); + expect(stream2).toBeDefined(); + expect(stream1).not.toBe(stream2); + }, + open() {}, + message() {}, + close() {}, + error() {}, + }); + + await new Promise(resolve => setTimeout(resolve, 100)); + + const client = Bun.quic({ + hostname: "localhost", + port: 9444, + server: false, + open(socket) { + // Client can also create streams + const clientStream = socket.createStream(); + expect(clientStream).toBeDefined(); + expect(socket.streamCount).toBe(1); + }, + message() {}, + close() {}, + error() {}, + }); + + await new Promise(resolve => setTimeout(resolve, 500)); + + server.close(); + client.close(); +}); + +test("QUIC connection states and properties", async () => { + const server = Bun.quic({ + hostname: "localhost", + port: 9445, + server: true, + open() {}, + connection() {}, + message() {}, + close() {}, + error() {}, + }); + + await new Promise(resolve => setTimeout(resolve, 100)); + + const client = Bun.quic({ + hostname: "localhost", + port: 9445, + server: false, + open(socket) { + // Test connection properties + expect(socket.isServer).toBe(false); + expect(socket.readyState).toBe("open"); + expect(socket.serverName).toBe("localhost"); + expect(socket.streamCount).toBe(0); + + // Test stats object + const stats = socket.stats; + expect(typeof stats).toBe("object"); + expect(typeof stats.streamCount).toBe("number"); + expect(typeof stats.isConnected).toBe("boolean"); + expect(typeof stats.has0RTT).toBe("boolean"); + expect(typeof stats.bytesSent).toBe("number"); + expect(typeof stats.bytesReceived).toBe("number"); + }, + message() {}, + close() {}, + error() {}, + }); + + await new Promise(resolve => setTimeout(resolve, 500)); + + // Test server properties + expect(server.isServer).toBe(true); + expect(server.readyState).toBe("open"); + + server.close(); + client.close(); + + // Test closed state + expect(server.readyState).toBe("closed"); + expect(client.readyState).toBe("closed"); +}); + +test("QUIC error handling", async () => { + let errorReceived = false; + + // Try to connect to non-existent server + const client = Bun.quic({ + hostname: "localhost", + port: 9999, // Non-existent port + server: false, + open() { + // Should not be called + expect(false).toBe(true); + }, + message() {}, + close() {}, + error(socket, error) { + errorReceived = true; + console.log("Expected error:", error); + expect(error).toBeDefined(); + }, + }); + + await new Promise(resolve => setTimeout(resolve, 2000)); + + expect(errorReceived).toBe(true); + client.close(); +}); \ No newline at end of file