From 589f941aea154ee4a0b4f72aa09d5c782ff772a9 Mon Sep 17 00:00:00 2001 From: Ashcon Partovi Date: Fri, 26 Apr 2024 15:25:24 -0700 Subject: [PATCH] UDP support (#7271) Co-authored-by: Georgijs Vilums Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> Co-authored-by: gvilums Co-authored-by: Jarred Sumner Co-authored-by: Jarred-Sumner Co-authored-by: Georgijs <48869301+gvilums@users.noreply.github.com> Co-authored-by: Georgijs Vilums <=> --- packages/bun-types/bun.d.ts | 85 ++ packages/bun-usockets/src/bsd.c | 420 ++++--- .../bun-usockets/src/eventing/epoll_kqueue.c | 4 +- packages/bun-usockets/src/eventing/libuv.c | 4 +- .../src/internal/eventing/epoll_kqueue.h | 6 +- packages/bun-usockets/src/internal/internal.h | 28 +- .../bun-usockets/src/internal/loop_data.h | 2 + .../src/internal/networking/bsd.h | 67 +- packages/bun-usockets/src/libusockets.h | 11 +- packages/bun-usockets/src/loop.c | 50 +- packages/bun-usockets/src/udp.c | 167 +-- src/bun.js/api/BunObject.zig | 2 + src/bun.js/api/bun/subprocess.zig | 1 - src/bun.js/api/bun/udp_socket.zig | 770 ++++++++++++ src/bun.js/api/sockets.classes.ts | 59 + src/bun.js/bindings/BunObject+exports.h | 1 + src/bun.js/bindings/BunObject.cpp | 1 + src/bun.js/bindings/bindings.zig | 2 +- .../bindings/generated_classes_list.zig | 1 + src/bun.js/bindings/helpers.h | 2 +- src/bun.js/node/node_os.zig | 15 +- src/deps/c_ares.zig | 3 - src/deps/uws.zig | 82 +- src/js/node/dgram.ts | 1079 ++++++++++++++++- src/jsc.zig | 1 + src/string.zig | 11 + src/string_immutable.zig | 20 + src/sys.zig | 2 + test/js/bun/udp/dgram.test.ts | 185 +++ test/js/bun/udp/testdata.ts | 79 ++ test/js/bun/udp/udp_socket.test.ts | 215 ++++ 31 files changed, 3066 insertions(+), 309 deletions(-) create mode 100644 src/bun.js/api/bun/udp_socket.zig create mode 100644 test/js/bun/udp/dgram.test.ts create mode 100644 test/js/bun/udp/testdata.ts create mode 100644 test/js/bun/udp/udp_socket.test.ts diff --git a/packages/bun-types/bun.d.ts b/packages/bun-types/bun.d.ts index 25c6741883..927e357957 100644 --- a/packages/bun-types/bun.d.ts +++ b/packages/bun-types/bun.d.ts @@ -3973,6 +3973,91 @@ declare module "bun" { function listen(options: TCPSocketListenOptions): TCPSocketListener; function listen(options: UnixSocketOptions): UnixSocketListener; + namespace udp { + type Data = string | ArrayBufferView | ArrayBufferLike; + + export interface SocketHandler { + data?( + socket: Socket, + data: BinaryTypeList[DataBinaryType], + port: number, + address: string, + ): void | Promise; + drain?(socket: Socket): void | Promise; + error?(socket: Socket, error: Error): void | Promise; + } + + export interface ConnectedSocketHandler { + data?( + socket: ConnectedSocket, + data: BinaryTypeList[DataBinaryType], + port: number, + address: string, + ): void | Promise; + drain?(socket: ConnectedSocket): void | Promise; + error?(socket: ConnectedSocket, error: Error): void | Promise; + } + + export interface SocketOptions { + hostname?: string; + port?: number; + binaryType?: DataBinaryType; + socket?: SocketHandler; + } + + export interface ConnectSocketOptions { + hostname?: string; + port?: number; + binaryType?: DataBinaryType; + socket?: ConnectedSocketHandler; + connect: { + hostname: string; + port: number; + }; + } + + export interface BaseUDPSocket { + readonly hostname: string; + readonly port: number; + readonly address: SocketAddress; + readonly binaryType: BinaryType; + readonly closed: boolean; + ref(): void; + unref(): void; + close(): void; + } + + export interface ConnectedSocket extends BaseUDPSocket { + readonly remoteAddress: SocketAddress; + sendMany(packets: readonly Data[]): number; + send(data: Data): boolean; + reload(handler: ConnectedSocketHandler): void; + } + + export interface Socket extends BaseUDPSocket { + sendMany(packets: readonly (Data | string | number)[]): number; + send(data: Data, port: number, address: string): boolean; + reload(handler: SocketHandler): void; + } + } + + /** + * Create a UDP socket + * + * @param options The options to use when creating the server + * @param options.socket The socket handler to use + * @param options.hostname The hostname to listen on + * @param options.port The port to listen on + * @param options.binaryType The binary type to use for the socket + * @param options.connect The hostname and port to connect to + */ + export function udpSocket( + options: udp.SocketOptions, + ): Promise>; + export function udpSocket( + options: udp.ConnectSocketOptions, + ): Promise>; + namespace SpawnOptions { /** * Option for stdout/stderr diff --git a/packages/bun-usockets/src/bsd.c b/packages/bun-usockets/src/bsd.c index 7fdd748d48..0e9c435ffa 100644 --- a/packages/bun-usockets/src/bsd.c +++ b/packages/bun-usockets/src/bsd.c @@ -26,7 +26,6 @@ #include #ifndef _WIN32 -//#define _GNU_SOURCE #include #include #include @@ -38,107 +37,185 @@ #include #endif -/* Internal structure of packet buffer */ -struct us_internal_udp_packet_buffer { -#if defined(_WIN32) || defined(__APPLE__) - char *buf[LIBUS_UDP_MAX_NUM]; - size_t len[LIBUS_UDP_MAX_NUM]; - struct sockaddr_storage addr[LIBUS_UDP_MAX_NUM]; -#else - struct mmsghdr msgvec[LIBUS_UDP_MAX_NUM]; - struct iovec iov[LIBUS_UDP_MAX_NUM]; - struct sockaddr_storage addr[LIBUS_UDP_MAX_NUM]; - char control[LIBUS_UDP_MAX_NUM][256]; +#if defined(__APPLE__) && defined(__aarch64__) +#define HAS_MSGX #endif -}; /* We need to emulate sendmmsg, recvmmsg on platform who don't have it */ -int bsd_sendmmsg(LIBUS_SOCKET_DESCRIPTOR fd, void *msgvec, unsigned int vlen, int flags) { -#if defined(__APPLE__) - -struct mmsghdr { - struct msghdr msg_hdr; /* Message header */ - unsigned int msg_len; /* Number of bytes transmitted */ -}; - - struct mmsghdr *hdrs = (struct mmsghdr *) msgvec; - - for (int i = 0; i < vlen; i++) { - int ret = sendmsg(fd, &hdrs[i].msg_hdr, flags); - if (ret == -1) { - if (i) { - return i; +int bsd_sendmmsg(LIBUS_SOCKET_DESCRIPTOR fd, struct udp_sendbuf* sendbuf, int flags) { +#if defined(_WIN32)// || defined(__APPLE__) + for (int i = 0; i < sendbuf->num; i++) { + while (1) { + int ret = 0; + struct sockaddr *addr = (struct sockaddr *)sendbuf->addresses[i]; + if (!addr || addr->sa_family == AF_UNSPEC) { + ret = send(fd, sendbuf->payloads[i], sendbuf->lengths[i], flags); + } else if (addr->sa_family == AF_INET) { + socklen_t len = sizeof(struct sockaddr_in); + ret = sendto(fd, sendbuf->payloads[i], sendbuf->lengths[i], flags, addr, len); + } else if (addr->sa_family == AF_INET6) { + socklen_t len = sizeof(struct sockaddr_in6); + ret = sendto(fd, sendbuf->payloads[i], sendbuf->lengths[i], flags, addr, len); } else { + errno = EAFNOSUPPORT; return -1; } - } else { - hdrs[i].msg_len = ret; + if (ret < 0) { + if (errno == EINTR) continue; + if (errno == EAGAIN || errno == EWOULDBLOCK) return i; + return ret; + } + break; } } - - return vlen; - -#elif defined(_WIN32) - - struct us_internal_udp_packet_buffer *packet_buffer = (struct us_internal_udp_packet_buffer *) msgvec; - - /* Let's just use sendto here */ - /* Winsock does not have sendmsg, while macOS has, however, we simply use sendto since both macOS and Winsock has it. - * Besides, you should use Linux either way to get best performance with the sendmmsg */ - - - // while we do not get error, send next - - for (int i = 0; i < LIBUS_UDP_MAX_NUM; i++) { - // need to support ipv6 addresses also! - int ret = sendto(fd, packet_buffer->buf[i], packet_buffer->len[i], flags, (struct sockaddr *)&packet_buffer->addr[i], sizeof(struct sockaddr_in)); - - if (ret == -1) { - // if we fail then we need to buffer up, no that's not our problem - // we do need to register poll out though and have a callback for it - return i; + return sendbuf->num; +#elif defined(__APPLE__) + // TODO figure out why sendmsg_x fails when one of the messages is empty + // so that we can get rid of this code. + // One of the weird things is that once a non-empty message has been sent on the socket, + // empty messages start working as well. Bizzare. +#ifdef HAS_MSGX + if (sendbuf->has_empty) { +#endif + for (int i = 0; i < sendbuf->num; i++) { + while (1) { + ssize_t ret = sendmsg(fd, &sendbuf->msgvec[i].msg_hdr, flags); + if (ret < 0) { + if (errno == EINTR) continue; + if (errno == EAGAIN || errno == EWOULDBLOCK) return i; + return ret; + } + break; + } } - - //printf("sendto: %d\n", ret); + return sendbuf->num; +#ifdef HAS_MSGX } - - return LIBUS_UDP_MAX_NUM; // one message + while (1) { + int ret = sendmsg_x(fd, sendbuf->msgvec, sendbuf->num, flags); + if (ret >= 0 || errno != EINTR) return ret; + } +#endif #else - return sendmmsg(fd, (struct mmsghdr *)msgvec, vlen, flags | MSG_NOSIGNAL); + while (1) { + int ret = sendmmsg(fd, sendbuf->msgvec, sendbuf->num, flags | MSG_NOSIGNAL); + if (ret >= 0 || errno != EINTR) return ret; + } #endif } -int bsd_recvmmsg(LIBUS_SOCKET_DESCRIPTOR fd, void *msgvec, unsigned int vlen, int flags, void *timeout) { -#if defined(_WIN32) || defined(__APPLE__) - struct us_internal_udp_packet_buffer *packet_buffer = (struct us_internal_udp_packet_buffer *) msgvec; - - - for (int i = 0; i < LIBUS_UDP_MAX_NUM; i++) { - socklen_t addr_len = sizeof(struct sockaddr_storage); - int ret = recvfrom(fd, packet_buffer->buf[i], LIBUS_UDP_MAX_SIZE, flags, (struct sockaddr *)&packet_buffer->addr[i], &addr_len); - - if (ret == -1) { - return i; +int bsd_recvmmsg(LIBUS_SOCKET_DESCRIPTOR fd, struct udp_recvbuf *recvbuf, int flags) { +#if defined(_WIN32) + socklen_t addr_len = sizeof(struct sockaddr_storage); + while (1) { + ssize_t ret = recvfrom(fd, recvbuf->buf, LIBUS_RECV_BUFFER_LENGTH, flags, (struct sockaddr *)&recvbuf->addr, &addr_len); + if (ret < 0) { + if (errno == EINTR) continue; + return ret; } - - packet_buffer->len[i] = ret; + recvbuf->recvlen = ret; + return 1; + } +#elif defined(__APPLE__) +#ifdef HAS_MSGX + while (1) { + int ret = recvmsg_x(fd, recvbuf->msgvec, LIBUS_UDP_RECV_COUNT, flags); + if (ret >= 0 || errno != EINTR) return ret; } - - return LIBUS_UDP_MAX_NUM; #else - // we need to set controllen for ip packet - for (int i = 0; i < vlen; i++) { - ((struct mmsghdr *)msgvec)[i].msg_hdr.msg_controllen = 256; + for (int i = 0; i < LIBUS_UDP_RECV_COUNT; ++i) { + while (1) { + ssize_t ret = recvmsg(fd, &recvbuf->msgvec[i].msg_hdr, flags); + if (ret < 0) { + if (errno == EINTR) continue; + if (errno == EAGAIN || errno == EWOULDBLOCK) return i; + return ret; + } + recvbuf->msgvec[i].msg_len = ret; + break; + } } + return LIBUS_UDP_RECV_COUNT; +#endif +#else + while (1) { + int ret = recvmmsg(fd, (struct mmsghdr *)&recvbuf->msgvec, LIBUS_UDP_RECV_COUNT, flags, 0); + if (ret >= 0 || errno != EINTR) return ret; + } +#endif +} - return recvmmsg(fd, (struct mmsghdr *)msgvec, vlen, flags, 0); +void bsd_udp_setup_recvbuf(struct udp_recvbuf *recvbuf, void *databuf, size_t databuflen) { +#if defined(_WIN32) + recvbuf->buf = databuf; + recvbuf->buflen = databuflen; +#else + // assert(databuflen > LIBUS_UDP_MAX_SIZE * LIBUS_UDP_RECV_COUNT); + + for (int i = 0; i < LIBUS_UDP_RECV_COUNT; i++) { + recvbuf->iov[i].iov_base = (char*)databuf + i * LIBUS_UDP_MAX_SIZE; + recvbuf->iov[i].iov_len = LIBUS_UDP_MAX_SIZE; + + recvbuf->msgvec[i].msg_hdr.msg_name = &recvbuf->addr[i]; + recvbuf->msgvec[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); + + recvbuf->msgvec[i].msg_hdr.msg_iov = &recvbuf->iov[i]; + recvbuf->msgvec[i].msg_hdr.msg_iovlen = 1; + + recvbuf->msgvec[i].msg_hdr.msg_control = recvbuf->control[i]; + recvbuf->msgvec[i].msg_hdr.msg_controllen = 256; + } +#endif +} + +int bsd_udp_setup_sendbuf(struct udp_sendbuf *buf, size_t bufsize, void** payloads, size_t* lengths, void** addresses, int num) { +#if defined(_WIN32) + buf->payloads = payloads; + buf->lengths = lengths; + buf->addresses = addresses; + buf->num = num; + return num; +#else + buf->has_empty = 0; + struct mmsghdr *msgvec = buf->msgvec; + // todo check this math + size_t count = (bufsize - sizeof(struct udp_sendbuf)) / (sizeof(struct mmsghdr) + sizeof(struct iovec)); + if (count > num) { + count = num; + } + struct iovec *iov = (struct iovec *) (msgvec + count); + for (int i = 0; i < count; i++) { + struct sockaddr *addr = (struct sockaddr *)addresses[i]; + socklen_t addr_len = 0; + if (addr) { + addr_len = addr->sa_family == AF_INET ? sizeof(struct sockaddr_in) + : addr->sa_family == AF_INET6 ? sizeof(struct sockaddr_in6) + : 0; + } + iov[i].iov_base = payloads[i]; + iov[i].iov_len = lengths[i]; + msgvec[i].msg_hdr.msg_name = addresses[i]; + msgvec[i].msg_hdr.msg_namelen = addr_len; + msgvec[i].msg_hdr.msg_control = NULL; + msgvec[i].msg_hdr.msg_controllen = 0; + msgvec[i].msg_hdr.msg_iov = iov + i; + msgvec[i].msg_hdr.msg_iovlen = 1; + msgvec[i].msg_hdr.msg_flags = 0; + msgvec[i].msg_len = 0; + + if (lengths[i] == 0) { + buf->has_empty = 1; + } + } + buf->num = count; + return count; #endif } // this one is needed for knowing the destination addr of udp packet // an udp socket can only bind to one port, and that port never changes // this function returns ONLY the IP address, not any port -int bsd_udp_packet_buffer_local_ip(void *msgvec, int index, char *ip) { +int bsd_udp_packet_buffer_local_ip(struct udp_recvbuf *msgvec, int index, char *ip) { #if defined(_WIN32) || defined(__APPLE__) return 0; // not supported #else @@ -163,99 +240,30 @@ int bsd_udp_packet_buffer_local_ip(void *msgvec, int index, char *ip) { #endif } -char *bsd_udp_packet_buffer_peer(void *msgvec, int index) { -#if defined(_WIN32) || defined(__APPLE__) - struct us_internal_udp_packet_buffer *packet_buffer = (struct us_internal_udp_packet_buffer *) msgvec; - return (char *)&packet_buffer->addr[index]; +char *bsd_udp_packet_buffer_peer(struct udp_recvbuf *msgvec, int index) { +#if defined(_WIN32) + return (char *)&msgvec->addr; #else return ((struct mmsghdr *) msgvec)[index].msg_hdr.msg_name; #endif } -char *bsd_udp_packet_buffer_payload(void *msgvec, int index) { -#if defined(_WIN32) || defined(__APPLE__) - struct us_internal_udp_packet_buffer *packet_buffer = (struct us_internal_udp_packet_buffer *) msgvec; - return packet_buffer->buf[index]; +char *bsd_udp_packet_buffer_payload(struct udp_recvbuf *msgvec, int index) { +#if defined(_WIN32) + return msgvec->buf; #else return ((struct mmsghdr *) msgvec)[index].msg_hdr.msg_iov[0].iov_base; #endif } -int bsd_udp_packet_buffer_payload_length(void *msgvec, int index) { -#if defined(_WIN32) || defined(__APPLE__) - struct us_internal_udp_packet_buffer *packet_buffer = (struct us_internal_udp_packet_buffer *) msgvec; - return packet_buffer->len[index]; +int bsd_udp_packet_buffer_payload_length(struct udp_recvbuf *msgvec, int index) { +#if defined(_WIN32) + return msgvec->recvlen; #else return ((struct mmsghdr *) msgvec)[index].msg_len; #endif } -void bsd_udp_buffer_set_packet_payload(struct us_udp_packet_buffer_t *send_buf, int index, int offset, void *payload, int length, void *peer_addr) { -#if defined(_WIN32) || defined(__APPLE__) - struct us_internal_udp_packet_buffer *packet_buffer = (struct us_internal_udp_packet_buffer *) send_buf; - - memcpy(packet_buffer->buf[index], payload, length); - memcpy(&packet_buffer->addr[index], peer_addr, sizeof(struct sockaddr_storage)); - - packet_buffer->len[index] = length; -#else - //printf("length: %d, offset: %d\n", length, offset); - - struct mmsghdr *ss = (struct mmsghdr *) send_buf; - - // copy the peer address - memcpy(ss[index].msg_hdr.msg_name, peer_addr, /*ss[index].msg_hdr.msg_namelen*/ sizeof(struct sockaddr_in)); - - // set control length to 0 - ss[index].msg_hdr.msg_controllen = 0; - - // copy the payload - - ss[index].msg_hdr.msg_iov->iov_len = length + offset; - - - memcpy(((char *) ss[index].msg_hdr.msg_iov->iov_base) + offset, payload, length); -#endif -} - -/* The maximum UDP payload size is 64kb, but in IPV6 you can have jumbopackets larger than so. - * We do not support those jumbo packets currently, but will safely ignore them. - * Any sane sender would assume we don't support them if we consistently drop them. - * Therefore a udp_packet_buffer_t will be 64 MB in size (64kb * 1024). */ -void *bsd_create_udp_packet_buffer() { -#if defined(_WIN32) || defined(__APPLE__) - struct us_internal_udp_packet_buffer *b = malloc(sizeof(struct us_internal_udp_packet_buffer) + LIBUS_UDP_MAX_SIZE * LIBUS_UDP_MAX_NUM); - - for (int i = 0; i < LIBUS_UDP_MAX_NUM; i++) { - b->buf[i] = ((char *) b) + sizeof(struct us_internal_udp_packet_buffer) + LIBUS_UDP_MAX_SIZE * i; - } - - return (struct us_udp_packet_buffer_t *) b; -#else - /* Allocate 64kb times 1024 */ - struct us_internal_udp_packet_buffer *b = malloc(sizeof(struct us_internal_udp_packet_buffer) + LIBUS_UDP_MAX_SIZE * LIBUS_UDP_MAX_NUM); - - for (int n = 0; n < LIBUS_UDP_MAX_NUM; ++n) { - - b->iov[n].iov_base = &((char *) (b + 1))[n * LIBUS_UDP_MAX_SIZE]; - b->iov[n].iov_len = LIBUS_UDP_MAX_SIZE; - - b->msgvec[n].msg_hdr = (struct msghdr) { - .msg_name = &b->addr, - .msg_namelen = sizeof (struct sockaddr_storage), - - .msg_iov = &b->iov[n], - .msg_iovlen = 1, - - .msg_control = b->control[n], - .msg_controllen = 256, - }; - } - - return (struct us_udp_packet_buffer_t *) b; -#endif -} - LIBUS_SOCKET_DESCRIPTOR apple_no_sigpipe(LIBUS_SOCKET_DESCRIPTOR fd) { #ifdef __APPLE__ if (fd != LIBUS_SOCKET_ERROR) { @@ -766,10 +774,10 @@ LIBUS_SOCKET_DESCRIPTOR bsd_create_udp_socket(const char *host, int port) { if (setsockopt(listenFd, IPPROTO_IPV6, IPV6_RECVPKTINFO, (void *) &enabled, sizeof(enabled)) == -1) { if (errno == 92) { if (setsockopt(listenFd, IPPROTO_IP, IP_PKTINFO, (void *) &enabled, sizeof(enabled)) != 0) { - printf("Error setting IPv4 pktinfo!\n"); + //printf("Error setting IPv4 pktinfo!\n"); } } else { - printf("Error setting IPv6 pktinfo!\n"); + //printf("Error setting IPv6 pktinfo!\n"); } } @@ -777,10 +785,10 @@ LIBUS_SOCKET_DESCRIPTOR bsd_create_udp_socket(const char *host, int port) { if (setsockopt(listenFd, IPPROTO_IPV6, IPV6_RECVTCLASS, (void *) &enabled, sizeof(enabled)) == -1) { if (errno == 92) { if (setsockopt(listenFd, IPPROTO_IP, IP_RECVTOS, (void *) &enabled, sizeof(enabled)) != 0) { - printf("Error setting IPv4 ECN!\n"); + //printf("Error setting IPv4 ECN!\n"); } } else { - printf("Error setting IPv6 ECN!\n"); + //printf("Error setting IPv6 ECN!\n"); } } @@ -795,37 +803,83 @@ LIBUS_SOCKET_DESCRIPTOR bsd_create_udp_socket(const char *host, int port) { return listenFd; } -int bsd_udp_packet_buffer_ecn(void *msgvec, int index) { +int bsd_connect_udp_socket(LIBUS_SOCKET_DESCRIPTOR fd, const char *host, int port) { + struct addrinfo hints, *result; + memset(&hints, 0, sizeof(struct addrinfo)); -#if defined(_WIN32) || defined(__APPLE__) - printf("ECN not supported!\n"); -#else - // we should iterate all control messages once, after recvmmsg and then only fetch them with these functions - struct msghdr *mh = &((struct mmsghdr *) msgvec)[index].msg_hdr; - for (struct cmsghdr *cmsg = CMSG_FIRSTHDR(mh); cmsg != NULL; cmsg = CMSG_NXTHDR(mh, cmsg)) { - // do we need to get TOS from ipv6 also? - if (cmsg->cmsg_level == IPPROTO_IP) { - if (cmsg->cmsg_type == IP_TOS) { - uint8_t tos = *(uint8_t *)CMSG_DATA(cmsg); - return tos & 3; - } - } + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_DGRAM; - if (cmsg->cmsg_level == IPPROTO_IPV6) { - if (cmsg->cmsg_type == IPV6_TCLASS) { - // is this correct? - uint8_t tos = *(uint8_t *)CMSG_DATA(cmsg); - return tos & 3; - } + char port_string[16]; + snprintf(port_string, 16, "%d", port); + + if (getaddrinfo(host, port_string, &hints, &result)) { + return -1; + } + + if (result == NULL) { + return -1; + } + + for (struct addrinfo *rp = result; rp != NULL; rp = rp->ai_next) { + if (connect(fd, rp->ai_addr, rp->ai_addrlen) == 0) { + freeaddrinfo(result); + return 0; } } -#endif - printf("We got no ECN!\n"); - - return 0; // no ecn defaults to 0 + freeaddrinfo(result); + return LIBUS_SOCKET_ERROR; } +int bsd_disconnect_udp_socket(LIBUS_SOCKET_DESCRIPTOR fd) { + struct sockaddr addr; + memset(&addr, 0, sizeof(addr)); + addr.sa_family = AF_UNSPEC; + #ifdef __APPLE__ + addr.sa_len = sizeof(addr); + #endif + + int res = connect(fd, &addr, sizeof(addr)); + // EAFNOSUPPORT is harmless in this case - we just want to disconnect + if (res == 0 || errno == EAFNOSUPPORT) { + return 0; + } else { + return -1; + } +} + +// int bsd_udp_packet_buffer_ecn(void *msgvec, int index) { + +// #if defined(_WIN32) || defined(__APPLE__) +// errno = ENOSYS; +// return -1; +// #else +// // we should iterate all control messages once, after recvmmsg and then only fetch them with these functions +// struct msghdr *mh = &((struct mmsghdr *) msgvec)[index].msg_hdr; +// for (struct cmsghdr *cmsg = CMSG_FIRSTHDR(mh); cmsg != NULL; cmsg = CMSG_NXTHDR(mh, cmsg)) { +// // do we need to get TOS from ipv6 also? +// if (cmsg->cmsg_level == IPPROTO_IP) { +// if (cmsg->cmsg_type == IP_TOS) { +// uint8_t tos = *(uint8_t *)CMSG_DATA(cmsg); +// return tos & 3; +// } +// } + +// if (cmsg->cmsg_level == IPPROTO_IPV6) { +// if (cmsg->cmsg_type == IPV6_TCLASS) { +// // is this correct? +// uint8_t tos = *(uint8_t *)CMSG_DATA(cmsg); +// return tos & 3; +// } +// } +// } +// #endif + +// //printf("We got no ECN!\n"); +// return 0; // no ecn defaults to 0 +// } + static int bsd_do_connect_raw(struct addrinfo *rp, int fd) { do { diff --git a/packages/bun-usockets/src/eventing/epoll_kqueue.c b/packages/bun-usockets/src/eventing/epoll_kqueue.c index ca511e0161..3e60c90452 100644 --- a/packages/bun-usockets/src/eventing/epoll_kqueue.c +++ b/packages/bun-usockets/src/eventing/epoll_kqueue.c @@ -89,12 +89,12 @@ LIBUS_SOCKET_DESCRIPTOR us_poll_fd(struct us_poll_t *p) { /* Returns any of listen socket, socket, shut down socket or callback */ int us_internal_poll_type(struct us_poll_t *p) { - return p->state.poll_type & 3; + return p->state.poll_type & POLL_TYPE_KIND_MASK; } /* Bug: doesn't really SET, rather read and change, so needs to be inited first! */ void us_internal_poll_set_type(struct us_poll_t *p, int poll_type) { - p->state.poll_type = poll_type | (p->state.poll_type & 12); + p->state.poll_type = poll_type | (p->state.poll_type & POLL_TYPE_POLLING_MASK); } /* Timer */ diff --git a/packages/bun-usockets/src/eventing/libuv.c b/packages/bun-usockets/src/eventing/libuv.c index e737cd8b61..e5d10c4a41 100644 --- a/packages/bun-usockets/src/eventing/libuv.c +++ b/packages/bun-usockets/src/eventing/libuv.c @@ -127,10 +127,10 @@ int us_poll_events(struct us_poll_t *p) { unsigned int us_internal_accept_poll_event(struct us_poll_t *p) { return 0; } -int us_internal_poll_type(struct us_poll_t *p) { return p->poll_type & 3; } +int us_internal_poll_type(struct us_poll_t *p) { return p->poll_type & POLL_TYPE_KIND_MASK; } void us_internal_poll_set_type(struct us_poll_t *p, int poll_type) { - p->poll_type = poll_type | (p->poll_type & 12); + p->poll_type = poll_type | (p->poll_type & POLL_TYPE_POLLING_MASK); } LIBUS_SOCKET_DESCRIPTOR us_poll_fd(struct us_poll_t *p) { return p->fd; } diff --git a/packages/bun-usockets/src/internal/eventing/epoll_kqueue.h b/packages/bun-usockets/src/internal/eventing/epoll_kqueue.h index ae46e91e7d..19bfbc85f7 100644 --- a/packages/bun-usockets/src/internal/eventing/epoll_kqueue.h +++ b/packages/bun-usockets/src/internal/eventing/epoll_kqueue.h @@ -64,9 +64,11 @@ struct us_loop_t { struct us_poll_t { alignas(LIBUS_EXT_ALIGNMENT) struct { - signed int fd : 28; // we could have this unsigned if we wanted to, -1 should never be used - unsigned int poll_type : 4; + signed int fd : 27; // we could have this unsigned if we wanted to, -1 should never be used + unsigned int poll_type : 5; } state; }; +#undef FD_BITS + #endif // EPOLL_KQUEUE_H diff --git a/packages/bun-usockets/src/internal/internal.h b/packages/bun-usockets/src/internal/internal.h index 76074ecce5..29224ce014 100644 --- a/packages/bun-usockets/src/internal/internal.h +++ b/packages/bun-usockets/src/internal/internal.h @@ -54,17 +54,23 @@ void us_internal_loop_update_pending_ready_polls(struct us_loop_t *loop, /* Poll type and what it polls for */ enum { - /* Two first bits */ + /* Three first bits */ POLL_TYPE_SOCKET = 0, POLL_TYPE_SOCKET_SHUT_DOWN = 1, POLL_TYPE_SEMI_SOCKET = 2, POLL_TYPE_CALLBACK = 3, + POLL_TYPE_UDP = 4, /* Two last bits */ - POLL_TYPE_POLLING_OUT = 4, - POLL_TYPE_POLLING_IN = 8 + POLL_TYPE_POLLING_OUT = 8, + POLL_TYPE_POLLING_IN = 16, }; +#define POLL_TYPE_BITSIZE 5 // make sure to update epoll_kqueue.h if you change this +#define POLL_TYPE_KIND_MASK 0b111 +#define POLL_TYPE_POLLING_MASK 0b11000 +#define POLL_TYPE_MASK (POLL_TYPE_KIND_MASK | POLL_TYPE_POLLING_MASK) + /* Loop related */ void us_internal_dispatch_ready_poll(struct us_poll_t *p, int error, int events); @@ -123,6 +129,22 @@ struct us_wrapped_socket_context_t { struct us_socket_events_t old_events; }; +struct us_udp_socket_t { + alignas(LIBUS_EXT_ALIGNMENT) struct us_poll_t p; + void (*on_data)(struct us_udp_socket_t *, void *, int); + void (*on_drain)(struct us_udp_socket_t *); + void (*on_close)(struct us_udp_socket_t *); + void *user; + struct us_loop_t *loop; + /* An UDP socket can only ever be bound to one single port regardless of how + * many interfaces it may listen to. Therefore we cache the port after creation + * and use it to build a proper and full sockaddr_in or sockaddr_in6 for every received packet */ + uint16_t port; + uint16_t closed : 1; + uint16_t connected : 1; + struct us_udp_socket_t *next; +}; + #if defined(LIBUS_USE_KQUEUE) /* Internal callback types are polls just like sockets */ struct us_internal_callback_t { diff --git a/packages/bun-usockets/src/internal/loop_data.h b/packages/bun-usockets/src/internal/loop_data.h index 468c5235b2..7cdcd1979d 100644 --- a/packages/bun-usockets/src/internal/loop_data.h +++ b/packages/bun-usockets/src/internal/loop_data.h @@ -25,9 +25,11 @@ struct us_internal_loop_data_t { struct us_socket_context_t *head; struct us_socket_context_t *iterator; char *recv_buf; + char *send_buf; void *ssl_data; void (*pre_cb)(struct us_loop_t *); void (*post_cb)(struct us_loop_t *); + struct us_udp_socket_t *closed_udp_head; struct us_socket_t *closed_head; struct us_socket_t *low_prio_head; int low_prio_budget; diff --git a/packages/bun-usockets/src/internal/networking/bsd.h b/packages/bun-usockets/src/internal/networking/bsd.h index 314421ab81..91d62700fd 100644 --- a/packages/bun-usockets/src/internal/networking/bsd.h +++ b/packages/bun-usockets/src/internal/networking/bsd.h @@ -45,7 +45,6 @@ #endif #define LIBUS_UDP_MAX_SIZE (64 * 1024) -#define LIBUS_UDP_MAX_NUM 1024 struct bsd_addr_t { struct sockaddr_storage mem; @@ -55,15 +54,61 @@ struct bsd_addr_t { int port; }; -int bsd_sendmmsg(LIBUS_SOCKET_DESCRIPTOR fd, void *msgvec, unsigned int vlen, int flags); -int bsd_recvmmsg(LIBUS_SOCKET_DESCRIPTOR fd, void *msgvec, unsigned int vlen, int flags, void *timeout); -int bsd_udp_packet_buffer_payload_length(void *msgvec, int index); -char *bsd_udp_packet_buffer_payload(void *msgvec, int index); -char *bsd_udp_packet_buffer_peer(void *msgvec, int index); -int bsd_udp_packet_buffer_local_ip(void *msgvec, int index, char *ip); -int bsd_udp_packet_buffer_ecn(void *msgvec, int index); -void *bsd_create_udp_packet_buffer(); -void bsd_udp_buffer_set_packet_payload(struct us_udp_packet_buffer_t *send_buf, int index, int offset, void *payload, int length, void *peer_addr); +#ifdef _WIN32 +// on windows we can only receive one packet at a time +#define LIBUS_UDP_RECV_COUNT 1 +#else +// on unix we can receive at most as many packets as fit into the receive buffer +#define LIBUS_UDP_RECV_COUNT (LIBUS_RECV_BUFFER_LENGTH / LIBUS_UDP_MAX_SIZE) +#endif + +#ifdef __APPLE__ +// a.k.a msghdr_x +struct mmsghdr { + struct msghdr msg_hdr; + size_t msg_len; /* byte length of buffer in msg_iov */ +}; + +ssize_t sendmsg_x(int s, struct mmsghdr *msgp, u_int cnt, int flags); +ssize_t recvmsg_x(int s, struct mmsghdr *msgp, u_int cnt, int flags); +#endif + +struct udp_recvbuf { +#if defined(_WIN32) + char *buf; + size_t buflen; + size_t recvlen; + struct sockaddr_storage addr; +#else + struct mmsghdr msgvec[LIBUS_UDP_RECV_COUNT]; + struct iovec iov[LIBUS_UDP_RECV_COUNT]; + struct sockaddr_storage addr[LIBUS_UDP_RECV_COUNT]; + char control[LIBUS_UDP_RECV_COUNT][256]; +#endif +}; + +struct udp_sendbuf { +#ifdef _WIN32 + void **payloads; + size_t *lengths; + void **addresses; + int num; +#else + int num; + char has_empty; + struct mmsghdr msgvec[]; +#endif +}; + +int bsd_sendmmsg(LIBUS_SOCKET_DESCRIPTOR fd, struct udp_sendbuf* sendbuf, int flags); +int bsd_recvmmsg(LIBUS_SOCKET_DESCRIPTOR fd, struct udp_recvbuf *recvbuf, int flags); +void bsd_udp_setup_recvbuf(struct udp_recvbuf *recvbuf, void *databuf, size_t databuflen); +int bsd_udp_setup_sendbuf(struct udp_sendbuf *buf, size_t bufsize, void** payloads, size_t* lengths, void** addresses, int num); +int bsd_udp_packet_buffer_payload_length(struct udp_recvbuf *msgvec, int index); +char *bsd_udp_packet_buffer_payload(struct udp_recvbuf *msgvec, int index); +char *bsd_udp_packet_buffer_peer(struct udp_recvbuf *msgvec, int index); +int bsd_udp_packet_buffer_local_ip(struct udp_recvbuf *msgvec, int index, char *ip); +// int bsd_udp_packet_buffer_ecn(struct udp_recvbuf *msgvec, int index); LIBUS_SOCKET_DESCRIPTOR apple_no_sigpipe(LIBUS_SOCKET_DESCRIPTOR fd); LIBUS_SOCKET_DESCRIPTOR bsd_set_nonblocking(LIBUS_SOCKET_DESCRIPTOR fd); @@ -101,6 +146,8 @@ LIBUS_SOCKET_DESCRIPTOR bsd_create_listen_socket_unix(const char *path, size_t p /* Creates an UDP socket bound to the hostname and port */ LIBUS_SOCKET_DESCRIPTOR bsd_create_udp_socket(const char *host, int port); +int bsd_connect_udp_socket(LIBUS_SOCKET_DESCRIPTOR fd, const char *host, int port); +int bsd_disconnect_udp_socket(LIBUS_SOCKET_DESCRIPTOR fd); LIBUS_SOCKET_DESCRIPTOR bsd_create_connect_socket(const char *host, int port, const char *source_host, int options); diff --git a/packages/bun-usockets/src/libusockets.h b/packages/bun-usockets/src/libusockets.h index 7f4e39865c..a082766cdc 100644 --- a/packages/bun-usockets/src/libusockets.h +++ b/packages/bun-usockets/src/libusockets.h @@ -38,6 +38,9 @@ /* 512kb shared receive buffer */ #define LIBUS_RECV_BUFFER_LENGTH 524288 + +/* Small 16KB shared send buffer for UDP packet metadata */ +#define LIBUS_SEND_BUFFER_LENGTH (1 << 14) /* A timeout granularity of 4 seconds means give or take 4 seconds from set timeout */ #define LIBUS_TIMEOUT_GRANULARITY 4 /* 32 byte padding of receive buffer ends */ @@ -102,14 +105,14 @@ int us_udp_socket_bound_port(struct us_udp_socket_t *s); char *us_udp_packet_buffer_peer(struct us_udp_packet_buffer_t *buf, int index); /* Peeks ECN of received packet */ -int us_udp_packet_buffer_ecn(struct us_udp_packet_buffer_t *buf, int index); +// int us_udp_packet_buffer_ecn(struct us_udp_packet_buffer_t *buf, int index); /* Receives a set of packets into specified packet buffer */ int us_udp_socket_receive(struct us_udp_socket_t *s, struct us_udp_packet_buffer_t *buf); void us_udp_buffer_set_packet_payload(struct us_udp_packet_buffer_t *send_buf, int index, int offset, void *payload, int length, void *peer_addr); -int us_udp_socket_send(struct us_udp_socket_t *s, struct us_udp_packet_buffer_t *buf, int num); +int us_udp_socket_send(struct us_udp_socket_t *s, void** payloads, size_t* lengths, void** addresses, int num); /* Allocates a packet buffer that is reuable per thread. Mutated by us_udp_socket_receive. */ struct us_udp_packet_buffer_t *us_create_udp_packet_buffer(); @@ -121,7 +124,9 @@ struct us_udp_packet_buffer_t *us_create_udp_packet_buffer(); //struct us_udp_socket_t *us_create_udp_socket(struct us_loop_t *loop, void (*data_cb)(struct us_udp_socket_t *, struct us_udp_packet_buffer_t *, int), void (*drain_cb)(struct us_udp_socket_t *), char *host, unsigned short port); -struct us_udp_socket_t *us_create_udp_socket(struct us_loop_t *loop, struct us_udp_packet_buffer_t *buf, void (*data_cb)(struct us_udp_socket_t *, struct us_udp_packet_buffer_t *, int), void (*drain_cb)(struct us_udp_socket_t *), const char *host, unsigned short port, void *user); +struct us_udp_socket_t *us_create_udp_socket(struct us_loop_t *loop, void (*data_cb)(struct us_udp_socket_t *, void *, int), void (*drain_cb)(struct us_udp_socket_t *), void (*close_cb)(struct us_udp_socket_t *), const char *host, unsigned short port, void *user); + +void us_udp_socket_close(struct us_udp_socket_t *s); /* This one is ugly, should be ext! not user */ void *us_udp_socket_user(struct us_udp_socket_t *s); diff --git a/packages/bun-usockets/src/loop.c b/packages/bun-usockets/src/loop.c index 193cc850cc..780c4a5f78 100644 --- a/packages/bun-usockets/src/loop.c +++ b/packages/bun-usockets/src/loop.c @@ -22,16 +22,16 @@ #include #endif -#include - /* The loop has 2 fallthrough polls */ void us_internal_loop_data_init(struct us_loop_t *loop, void (*wakeup_cb)(struct us_loop_t *loop), void (*pre_cb)(struct us_loop_t *loop), void (*post_cb)(struct us_loop_t *loop)) { loop->data.sweep_timer = us_create_timer(loop, 1, 0); loop->data.recv_buf = malloc(LIBUS_RECV_BUFFER_LENGTH + LIBUS_RECV_BUFFER_PADDING * 2); + loop->data.send_buf = malloc(LIBUS_SEND_BUFFER_LENGTH); loop->data.ssl_data = 0; loop->data.head = 0; loop->data.iterator = 0; + loop->data.closed_udp_head = 0; loop->data.closed_head = 0; loop->data.low_prio_head = 0; loop->data.low_prio_budget = 0; @@ -50,6 +50,7 @@ void us_internal_loop_data_free(struct us_loop_t *loop) { #endif free(loop->data.recv_buf); + free(loop->data.send_buf); us_timer_close(loop->data.sweep_timer, 0); us_internal_async_close(loop->data.wakeup_async); @@ -175,6 +176,14 @@ void us_internal_free_closed_sockets(struct us_loop_t *loop) { } loop->data.closed_head = 0; } + if (loop->data.closed_udp_head) { + for (struct us_udp_socket_t *s = loop->data.closed_udp_head; s; ) { + struct us_udp_socket_t *next = s->next; + us_poll_free((struct us_poll_t *) s, loop); + s = next; + } + loop->data.closed_udp_head = 0; + } } void sweep_timer_cb(struct us_internal_callback_t *cb) { @@ -397,6 +406,43 @@ void us_internal_dispatch_ready_poll(struct us_poll_t *p, int error, int events) } break; } + case POLL_TYPE_UDP: { + struct us_udp_socket_t *u = (struct us_udp_socket_t *) p; + if (u->closed) { + break; + } + + if (events & LIBUS_SOCKET_WRITABLE && !error) { + u->on_drain(u); + if (u->closed) { + break; + } + // We only poll for writable after a read has failed, and only send one drain notification. + // Otherwise we would receive a writable event on every tick of the event loop. + us_poll_change(&u->p, u->loop, us_poll_events(&u->p) & LIBUS_SOCKET_READABLE); + } + if (events & LIBUS_SOCKET_READABLE) { + struct udp_recvbuf recvbuf; + bsd_udp_setup_recvbuf(&recvbuf, u->loop->data.recv_buf, LIBUS_RECV_BUFFER_LENGTH); + while (1) { + int npackets = bsd_recvmmsg(us_poll_fd(p), &recvbuf, MSG_DONTWAIT); + if (npackets > 0) { + u->on_data(u, &recvbuf, npackets); + if (u->closed) { + break; + } + } else if (npackets == LIBUS_SOCKET_ERROR && bsd_would_block()) { + // break receive loop when we receive EAGAIN or similar + break; + } else if (npackets == LIBUS_SOCKET_ERROR && !bsd_would_block()) { + // close the socket on error + us_udp_socket_close(u); + break; + } + } + } + break; + } } } diff --git a/packages/bun-usockets/src/udp.c b/packages/bun-usockets/src/udp.c index 70a7d3a003..90db9c6a86 100644 --- a/packages/bun-usockets/src/udp.c +++ b/packages/bun-usockets/src/udp.c @@ -18,130 +18,145 @@ #include "libusockets.h" #include "internal/internal.h" -#include -#include #include -int us_udp_packet_buffer_ecn(struct us_udp_packet_buffer_t *buf, int index) { - return bsd_udp_packet_buffer_ecn(buf, index); -} +// int us_udp_packet_buffer_ecn(struct us_udp_packet_buffer_t *buf, int index) { +// return bsd_udp_packet_buffer_ecn((struct udp_recvbuf *)buf, index); +// } int us_udp_packet_buffer_local_ip(struct us_udp_packet_buffer_t *buf, int index, char *ip) { - return bsd_udp_packet_buffer_local_ip(buf, index, ip); + return bsd_udp_packet_buffer_local_ip((struct udp_recvbuf *)buf, index, ip); } char *us_udp_packet_buffer_peer(struct us_udp_packet_buffer_t *buf, int index) { - return bsd_udp_packet_buffer_peer(buf, index); + return bsd_udp_packet_buffer_peer((struct udp_recvbuf *)buf, index); } char *us_udp_packet_buffer_payload(struct us_udp_packet_buffer_t *buf, int index) { - return bsd_udp_packet_buffer_payload(buf, index); + return bsd_udp_packet_buffer_payload((struct udp_recvbuf *)buf, index); } int us_udp_packet_buffer_payload_length(struct us_udp_packet_buffer_t *buf, int index) { - return bsd_udp_packet_buffer_payload_length(buf, index); + return bsd_udp_packet_buffer_payload_length((struct udp_recvbuf *)buf, index); } -// what should we return? number of sent datagrams? -int us_udp_socket_send(struct us_udp_socket_t *s, struct us_udp_packet_buffer_t *buf, int num) { +int us_udp_socket_send(struct us_udp_socket_t *s, void** payloads, size_t* lengths, void** addresses, int num) { + if (num == 0) return 0; int fd = us_poll_fd((struct us_poll_t *) s); - // we need to poll out if we failed + struct udp_sendbuf *buf = (struct udp_sendbuf *)s->loop->data.send_buf; - return bsd_sendmmsg(fd, buf, num, 0); + int total_sent = 0; + while (total_sent < num) { + int count = bsd_udp_setup_sendbuf(buf, LIBUS_SEND_BUFFER_LENGTH, payloads, lengths, addresses, num); + payloads += count; + lengths += count; + addresses += count; + num -= count; + // TODO nohang flag? + int sent = bsd_sendmmsg(fd, buf, MSG_DONTWAIT); + if (sent < 0) { + return sent; + } + total_sent += sent; + if (0 <= sent && sent < num) { + // if we couldn't send all packets, register a writable event so we can call the drain callback + us_poll_change((struct us_poll_t *) s, s->loop, LIBUS_SOCKET_READABLE | LIBUS_SOCKET_WRITABLE); + } + } + return total_sent; } -int us_udp_socket_receive(struct us_udp_socket_t *s, struct us_udp_packet_buffer_t *buf) { - int fd = us_poll_fd((struct us_poll_t *) s); - return bsd_recvmmsg(fd, buf, LIBUS_UDP_MAX_NUM, 0, 0); -} - -void us_udp_buffer_set_packet_payload(struct us_udp_packet_buffer_t *send_buf, int index, int offset, void *payload, int length, void *peer_addr) { - bsd_udp_buffer_set_packet_payload(send_buf, index, offset, payload, length, peer_addr); -} - -struct us_udp_packet_buffer_t *us_create_udp_packet_buffer() { - return (struct us_udp_packet_buffer_t *) bsd_create_udp_packet_buffer(); -} - -struct us_internal_udp_t { - struct us_internal_callback_t cb; - struct us_udp_packet_buffer_t *receive_buf; - void (*data_cb)(struct us_udp_socket_t *, struct us_udp_packet_buffer_t *, int); - void (*drain_cb)(struct us_udp_socket_t *); - void *user; - /* An UDP socket can only ever be bound to one single port regardless of how - * many interfaces it may listen to. Therefore we cache the port after creation - * and use it to build a proper and full sockaddr_in or sockaddr_in6 for every received packet */ - int port; -}; - int us_udp_socket_bound_port(struct us_udp_socket_t *s) { - return ((struct us_internal_udp_t *) s)->port; + return ((struct us_udp_socket_t *) s)->port; } -/* Internal wrapper, move from here */ -void internal_on_udp_read(struct us_udp_socket_t *s) { +void us_udp_socket_bound_ip(struct us_udp_socket_t *s, char *buf, int *length) { + struct bsd_addr_t addr; + if (bsd_local_addr(us_poll_fd((struct us_poll_t *)s), &addr) || *length < bsd_addr_get_ip_length(&addr)) { + *length = 0; + } else { + *length = bsd_addr_get_ip_length(&addr); + memcpy(buf, bsd_addr_get_ip(&addr), *length); + } +} - // lookup receive buffer and callback here - struct us_internal_udp_t *udp = (struct us_internal_udp_t *) s; - - int packets = us_udp_socket_receive(s, udp->receive_buf); - //printf("Packets: %d\n", packets); - - // we need to get the socket data and lookup its callback here - - - udp->data_cb(s, udp->receive_buf, packets); +void us_udp_socket_remote_ip(struct us_udp_socket_t *s, char *buf, int *length) { + struct bsd_addr_t addr; + if (bsd_remote_addr(us_poll_fd((struct us_poll_t *)s), &addr) || *length < bsd_addr_get_ip_length(&addr)) { + *length = 0; + } else { + *length = bsd_addr_get_ip_length(&addr); + memcpy(buf, bsd_addr_get_ip(&addr), *length); + } } void *us_udp_socket_user(struct us_udp_socket_t *s) { - struct us_internal_udp_t *udp = (struct us_internal_udp_t *) s; + struct us_udp_socket_t *udp = (struct us_udp_socket_t *) s; return udp->user; } -struct us_udp_socket_t *us_create_udp_socket(struct us_loop_t *loop, struct us_udp_packet_buffer_t *buf, void (*data_cb)(struct us_udp_socket_t *, struct us_udp_packet_buffer_t *, int), void (*drain_cb)(struct us_udp_socket_t *), const char *host, unsigned short port, void *user) { - +void us_udp_socket_close(struct us_udp_socket_t *s) { + struct us_loop_t *loop = s->loop; + struct us_poll_t *p = (struct us_poll_t *) s; + us_poll_stop(p, loop); + bsd_close_socket(us_poll_fd(p)); + s->closed = 1; + s->next = loop->data.closed_udp_head; + loop->data.closed_udp_head = s; + s->on_close(s); +} + +int us_udp_socket_connect(struct us_udp_socket_t *s, const char* host, unsigned short port) { + return bsd_connect_udp_socket(us_poll_fd((struct us_poll_t *)s), host, port); +} + +int us_udp_socket_disconnect(struct us_udp_socket_t *s) { + return bsd_disconnect_udp_socket(us_poll_fd((struct us_poll_t *)s)); +} + +struct us_udp_socket_t *us_create_udp_socket( + struct us_loop_t *loop, + void (*data_cb)(struct us_udp_socket_t *, void *, int), + void (*drain_cb)(struct us_udp_socket_t *), + void (*close_cb)(struct us_udp_socket_t *), + const char *host, + unsigned short port, + void *user +) { + LIBUS_SOCKET_DESCRIPTOR fd = bsd_create_udp_socket(host, port); if (fd == LIBUS_SOCKET_ERROR) { return 0; } - /* If buf is 0 then create one here */ - if (!buf) { - buf = us_create_udp_packet_buffer(); - } - int ext_size = 0; int fallthrough = 0; - struct us_poll_t *p = us_create_poll(loop, fallthrough, sizeof(struct us_internal_udp_t) + ext_size); - us_poll_init(p, fd, POLL_TYPE_CALLBACK); + struct us_poll_t *p = us_create_poll(loop, fallthrough, sizeof(struct us_udp_socket_t) + ext_size); + us_poll_init(p, fd, POLL_TYPE_UDP); - struct us_internal_udp_t *cb = (struct us_internal_udp_t *) p; - cb->cb.loop = loop; - cb->cb.cb_expects_the_loop = 0; - cb->cb.leave_poll_ready = 1; + struct us_udp_socket_t *udp = (struct us_udp_socket_t *)p; /* Get and store the port once */ struct bsd_addr_t tmp; bsd_local_addr(fd, &tmp); - cb->port = bsd_addr_get_port(&tmp); - - printf("The port of UDP is: %d\n", cb->port); + udp->port = bsd_addr_get_port(&tmp); + udp->loop = loop; /* There is no udp socket context, only user data */ /* This should really be ext like everything else */ - cb->user = user; + udp->user = user; - cb->data_cb = data_cb; - cb->receive_buf = buf; - cb->drain_cb = drain_cb; + udp->closed = 0; + udp->connected = 0; + udp->on_data = data_cb; + udp->on_drain = drain_cb; + udp->on_close = close_cb; + udp->next = NULL; - cb->cb.cb = (void (*)(struct us_internal_callback_t *)) internal_on_udp_read; - - us_poll_start((struct us_poll_t *) cb, cb->cb.loop, LIBUS_SOCKET_READABLE); + us_poll_start((struct us_poll_t *) udp, udp->loop, LIBUS_SOCKET_READABLE | LIBUS_SOCKET_WRITABLE); - return (struct us_udp_socket_t *) cb; + return (struct us_udp_socket_t *) udp; } \ No newline at end of file diff --git a/src/bun.js/api/BunObject.zig b/src/bun.js/api/BunObject.zig index 713ba248a8..7f714fd9da 100644 --- a/src/bun.js/api/BunObject.zig +++ b/src/bun.js/api/BunObject.zig @@ -21,6 +21,7 @@ pub const BunObject = struct { pub const inflateSync = JSC.wrapStaticMethod(JSZlib, "inflateSync", true); pub const jest = @import("../test/jest.zig").Jest.call; pub const listen = JSC.wrapStaticMethod(JSC.API.Listener, "listen", false); + pub const udpSocket = JSC.wrapStaticMethod(JSC.API.UDPSocket, "udpSocket", false); pub const mmap = Bun.mmapFile; pub const nanoseconds = Bun.nanoseconds; pub const openInEditor = Bun.openInEditor; @@ -134,6 +135,7 @@ pub const BunObject = struct { @export(BunObject.inflateSync, .{ .name = callbackName("inflateSync") }); @export(BunObject.jest, .{ .name = callbackName("jest") }); @export(BunObject.listen, .{ .name = callbackName("listen") }); + @export(BunObject.udpSocket, .{ .name = callbackName("udpSocket") }); @export(BunObject.mmap, .{ .name = callbackName("mmap") }); @export(BunObject.nanoseconds, .{ .name = callbackName("nanoseconds") }); @export(BunObject.openInEditor, .{ .name = callbackName("openInEditor") }); diff --git a/src/bun.js/api/bun/subprocess.zig b/src/bun.js/api/bun/subprocess.zig index 020b2040d4..9133bc3c61 100644 --- a/src/bun.js/api/bun/subprocess.zig +++ b/src/bun.js/api/bun/subprocess.zig @@ -1707,7 +1707,6 @@ pub const Subprocess = struct { if (arg.len == 0) { continue; } - argv.appendAssumeCapacity(arg.toOwnedSliceZ(allocator) catch { globalThis.throwOutOfMemory(); return .zero; diff --git a/src/bun.js/api/bun/udp_socket.zig b/src/bun.js/api/bun/udp_socket.zig new file mode 100644 index 0000000000..583c5935d1 --- /dev/null +++ b/src/bun.js/api/bun/udp_socket.zig @@ -0,0 +1,770 @@ +const std = @import("std"); +const uws = @import("../../../deps/uws.zig"); +const bun = @import("root").bun; + +const strings = bun.strings; +const default_allocator = bun.default_allocator; +const Output = bun.Output; +const Async = bun.Async; +const JSC = bun.JSC; +const CallFrame = JSC.CallFrame; +const JSGlobalObject = JSC.JSGlobalObject; +const JSValue = JSC.JSValue; + +const log = Output.scoped(.UdpSocket, false); + +const INET6_ADDRSTRLEN = if (bun.Environment.isWindows) 65 else 46; + +extern fn ntohs(nshort: u16) u16; +extern fn htonl(hlong: u32) u32; +extern fn htons(hshort: u16) u16; +extern fn inet_ntop(af: c_int, src: ?*const anyopaque, dst: [*c]u8, size: c_int) ?[*:0]const u8; +extern fn inet_pton(af: c_int, src: [*c]const u8, dst: ?*anyopaque) c_int; +extern fn JSSocketAddress__create(global: *JSGlobalObject, address: JSValue, port: i32, v6: bool) JSValue; + +fn onClose(socket: *uws.udp.Socket) callconv(.C) void { + JSC.markBinding(@src()); + + const this: *UDPSocket = bun.cast(*UDPSocket, socket.user().?); + this.closed = true; + this.poll_ref.unref(this.globalThis.bunVM()); + _ = this.js_refcount.fetchSub(1, .Monotonic); +} + +fn onDrain(socket: *uws.udp.Socket) callconv(.C) void { + JSC.markBinding(@src()); + + const this: *UDPSocket = bun.cast(*UDPSocket, socket.user().?); + const callback = this.config.on_drain; + if (callback == .zero) return; + + const result = callback.callWithThis(this.globalThis, this.thisValue, &[_]JSValue{this.thisValue}); + if (result.toError()) |err| { + _ = this.callErrorHandler(.zero, &[_]JSValue{err}); + } +} + +fn onData(socket: *uws.udp.Socket, buf: *uws.udp.PacketBuffer, packets: c_int) callconv(.C) void { + JSC.markBinding(@src()); + + const udpSocket: *UDPSocket = bun.cast(*UDPSocket, socket.user().?); + const callback = udpSocket.config.on_data; + if (callback == .zero) return; + + const globalThis = udpSocket.globalThis; + + var i: c_int = 0; + while (i < packets) : (i += 1) { + const peer = buf.getPeer(i); + + var addr_buf: [INET6_ADDRSTRLEN + 1:0]u8 = undefined; + var hostname: ?[*:0]const u8 = null; + var port: u16 = 0; + + switch (peer.family) { + std.os.AF.INET => { + const peer4: *std.os.sockaddr.in = @ptrCast(peer); + hostname = inet_ntop(peer.family, &peer4.addr, &addr_buf, addr_buf.len); + port = ntohs(peer4.port); + }, + std.os.AF.INET6 => { + const peer6: *std.os.sockaddr.in6 = @ptrCast(peer); + hostname = inet_ntop(peer.family, &peer6.addr, &addr_buf, addr_buf.len); + port = ntohs(peer6.port); + }, + else => continue, + } + + if (hostname == null or port == 0) { + continue; + } + + const slice = buf.getPayload(i); + + const loop = udpSocket.vm.eventLoop(); + loop.enter(); + defer loop.exit(); + _ = udpSocket.js_refcount.fetchAdd(1, .Monotonic); + defer _ = udpSocket.js_refcount.fetchSub(1, .Monotonic); + + const result = callback.callWithThis(globalThis, udpSocket.thisValue, &[_]JSValue{ + udpSocket.thisValue, + udpSocket.config.binary_type.toJS(slice, globalThis), + JSC.jsNumber(port), + JSC.ZigString.init(std.mem.span(hostname.?)).toValueAuto(globalThis), + }); + + if (result.toError()) |err| { + _ = udpSocket.callErrorHandler(.zero, &[_]JSValue{err}); + } + } +} + +pub const UDPSocketConfig = struct { + const This = @This(); + const handlers = .{ + .{ "data", "on_data" }, + .{ "drain", "on_drain" }, + .{ "error", "on_error" }, + }; + + const ConnectConfig = struct { + port: u16, + address: [:0]u8, + }; + + hostname: [:0]u8, + connect: ?ConnectConfig = null, + port: u16, + binary_type: JSC.BinaryType = .Buffer, + on_data: JSValue = .zero, + on_drain: JSValue = .zero, + on_error: JSValue = .zero, + + pub fn fromJS(globalThis: *JSGlobalObject, options: JSValue) ?This { + if (options.isEmptyOrUndefinedOrNull() or !options.isObject()) { + globalThis.throwInvalidArguments("Expected an object", .{}); + return null; + } + + const hostname = brk: { + if (options.getTruthy(globalThis, "hostname")) |value| { + if (!value.isString()) { + globalThis.throwInvalidArguments("Expected \"hostname\" to be a string", .{}); + return null; + } + const str = value.toBunString(globalThis); + defer str.deref(); + break :brk str.toOwnedSliceZ(default_allocator) catch bun.outOfMemory(); + } else { + break :brk default_allocator.dupeZ(u8, "0.0.0.0") catch bun.outOfMemory(); + } + }; + defer if (globalThis.hasException()) default_allocator.free(hostname); + + const port: u16 = brk: { + if (options.getTruthy(globalThis, "port")) |value| { + const number = value.coerceToInt32(globalThis); + break :brk if (number < 1 or number > 0xffff) 0 else @intCast(number); + } else { + break :brk 0; + } + }; + + var config = This{ + .hostname = hostname, + .port = port, + }; + + if (options.getTruthy(globalThis, "socket")) |socket| { + if (!socket.isObject()) { + globalThis.throwInvalidArguments("Expected \"socket\" to be an object", .{}); + return null; + } + + if (options.getTruthy(globalThis, "binaryType")) |value| { + if (!value.isString()) { + globalThis.throwInvalidArguments("Expected \"socket.binaryType\" to be a string", .{}); + return null; + } + + config.binary_type = JSC.BinaryType.fromJSValue(globalThis, value) orelse { + globalThis.throwInvalidArguments("Expected \"socket.binaryType\" to be 'arraybuffer', 'uint8array', or 'buffer'", .{}); + return null; + }; + } + + inline for (handlers) |handler| { + if (socket.getTruthyComptime(globalThis, handler.@"0")) |value| { + if (!value.isCell() or !value.isCallable(globalThis.vm())) { + globalThis.throwInvalidArguments("Expected \"socket.{s}\" to be a function", .{handler.@"0"}); + return null; + } + @field(config, handler.@"1") = value; + } + } + } + + defer { + if (globalThis.hasException()) { + if (config.connect) |connect| { + default_allocator.free(connect.address); + } + } + } + + if (options.getTruthy(globalThis, "connect")) |connect| { + if (!connect.isObject()) { + globalThis.throwInvalidArguments("Expected \"connect\" to be an object", .{}); + return null; + } + + const connect_host_js = connect.getTruthy(globalThis, "hostname") orelse { + globalThis.throwInvalidArguments("Expected \"connect.hostname\" to be a string", .{}); + return null; + }; + + if (!connect_host_js.isString()) { + globalThis.throwInvalidArguments("Expected \"connect.hostname\" to be a string", .{}); + return null; + } + + const connect_port_js = connect.getTruthy(globalThis, "port") orelse { + globalThis.throwInvalidArguments("Expected \"connect.port\" to be an integer", .{}); + return null; + }; + const connect_port = connect_port_js.coerceToInt32(globalThis); + + const str = connect_host_js.toBunString(globalThis); + defer str.deref(); + const connect_host = str.toOwnedSliceZ(default_allocator) catch bun.outOfMemory(); + + config.connect = .{ + .port = if (connect_port < 1 or connect_port > 0xffff) 0 else @as(u16, @intCast(connect_port)), + .address = connect_host, + }; + } + + config.protect(); + + return config; + } + + pub fn protect(this: This) void { + inline for (handlers) |handler| { + @field(this, handler.@"1").protect(); + } + } + + pub fn unprotect(this: This) void { + inline for (handlers) |handler| { + @field(this, handler.@"1").unprotect(); + } + } + + pub fn deinit(this: This) void { + this.unprotect(); + default_allocator.free(this.hostname); + if (this.connect) |val| { + default_allocator.free(val.address); + } + } +}; + +pub const UDPSocket = struct { + const This = @This(); + + config: UDPSocketConfig, + + socket: *uws.udp.Socket, + loop: *uws.Loop, + + globalThis: *JSGlobalObject, + thisValue: JSValue = .zero, + + ref: JSC.Ref = JSC.Ref.init(), + poll_ref: Async.KeepAlive = Async.KeepAlive.init(), + // if marked as closed the socket pointer may be stale + closed: bool = false, + connect_info: ?ConnectInfo = null, + vm: *JSC.VirtualMachine, + js_refcount: std.atomic.Value(usize) = std.atomic.Value(usize).init(1), + + const ConnectInfo = struct { + port: u16, + }; + + pub usingnamespace JSC.Codegen.JSUDPSocket; + + pub fn constructor(globalThis: *JSGlobalObject, _: *CallFrame) callconv(.C) ?*This { + globalThis.throw("Cannot construct UDPSocket", .{}); + return null; + } + + pub fn hasPendingActivity(this: *This) callconv(.C) bool { + return this.js_refcount.load(.Monotonic) > 0; + } + + pub usingnamespace bun.New(@This()); + + pub fn udpSocket(globalThis: *JSGlobalObject, options: JSValue) JSValue { + log("udpSocket", .{}); + + const config = UDPSocketConfig.fromJS(globalThis, options) orelse { + return .zero; + }; + + const vm = globalThis.bunVM(); + var this = This.new(.{ + .socket = undefined, + .config = config, + .globalThis = globalThis, + .loop = uws.Loop.get(), + .vm = vm, + }); + + // also cleans up config + defer { + if (globalThis.hasException()) { + this.closed = true; + this.deinit(); + } + } + + if (uws.udp.Socket.create( + this.loop, + onData, + onDrain, + onClose, + config.hostname, + config.port, + this, + )) |socket| { + this.socket = socket; + } else { + globalThis.throw("Failed to bind socket", .{}); + return .zero; + } + + if (config.connect) |connect| { + const ret = this.socket.connect(connect.address, connect.port); + if (ret != 0) { + if (JSC.Maybe(void).errnoSys(ret, .connect)) |err| { + globalThis.throwValue(err.toJS(globalThis)); + return .zero; + } + } + this.connect_info = .{ .port = connect.port }; + } + + this.poll_ref.ref(vm); + const thisValue = this.toJS(globalThis); + thisValue.ensureStillAlive(); + this.thisValue = thisValue; + return JSC.JSPromise.resolvedPromiseValue(globalThis, thisValue); + } + + pub fn callErrorHandler( + this: *This, + thisValue: JSValue, + err: []const JSValue, + ) bool { + const callback = this.config.on_error; + const globalThis = this.globalThis; + const vm = globalThis.bunVM(); + + if (callback == .zero) { + if (err.len > 0) + vm.onUnhandledError(globalThis, err[0]); + + return false; + } + + const result = callback.callWithThis(globalThis, thisValue, err); + if (result.isAnyError()) { + vm.onUnhandledError(globalThis, result); + } + + return true; + } + + pub fn sendMany(this: *This, globalThis: *JSGlobalObject, callframe: *CallFrame) callconv(.C) JSValue { + if (this.closed) { + globalThis.throw("Socket is closed", .{}); + return .zero; + } + const arguments = callframe.arguments(1); + if (arguments.len != 1) { + globalThis.throwInvalidArguments("Expected 1 argument, got {}", .{arguments.len}); + return .zero; + } + + const arg = arguments.ptr[0]; + if (!arg.jsType().isArray()) { + globalThis.throwInvalidArgumentType("sendMany", "first argument", "array"); + return .zero; + } + + const array_len = arg.getLength(globalThis); + if (this.connect_info == null and array_len % 3 != 0) { + globalThis.throwInvalidArguments("Expected 3 arguments for each packet", .{}); + return .zero; + } + + const len = if (this.connect_info == null) array_len / 3 else array_len; + + var arena = std.heap.ArenaAllocator.init(bun.default_allocator); + defer arena.deinit(); + const alloc = arena.allocator(); + + var payloads = alloc.alloc([*]const u8, len) catch bun.outOfMemory(); + var lens = alloc.alloc(usize, len) catch bun.outOfMemory(); + var addr_ptrs = alloc.alloc(?*const anyopaque, len) catch bun.outOfMemory(); + var addrs = alloc.alloc(std.os.sockaddr.storage, len) catch bun.outOfMemory(); + + var iter = arg.arrayIterator(globalThis); + + var i: u16 = 0; + var port: JSValue = .zero; + while (iter.next()) |val| : (i += 1) { + if (i >= array_len) { + globalThis.throwInvalidArguments("Mismatch between array length property and number of items", .{}); + return .zero; + } + const slice_idx = if (this.connect_info == null) i / 3 else i; + if (this.connect_info != null or i % 3 == 0) { + const slice = brk: { + if (val.asArrayBuffer(globalThis)) |arrayBuffer| { + break :brk arrayBuffer.slice(); + } else if (val.isString()) { + break :brk val.toString(globalThis).toSlice(globalThis, alloc).slice(); + } else { + globalThis.throwInvalidArguments("Expected ArrayBufferView or string as payload", .{}); + return .zero; + } + }; + payloads[slice_idx] = slice.ptr; + lens[slice_idx] = slice.len; + } + if (this.connect_info != null) { + addr_ptrs[slice_idx] = null; + continue; + } + if (i % 3 == 1) { + port = val; + continue; + } + if (i % 3 == 2) { + if (!this.parseAddr(globalThis, port, val, &addrs[slice_idx])) { + globalThis.throwInvalidArguments("Invalid address", .{}); + return .zero; + } + addr_ptrs[slice_idx] = &addrs[slice_idx]; + } + } + if (i != array_len) { + globalThis.throwInvalidArguments("Mismatch between array length property and number of items", .{}); + return .zero; + } + const res = this.socket.send(payloads, lens, addr_ptrs); + if (bun.JSC.Maybe(void).errnoSys(res, .send)) |err| { + globalThis.throwValue(err.toJS(globalThis)); + return .zero; + } + return JSValue.jsNumber(res); + } + + pub fn send( + this: *This, + globalThis: *JSGlobalObject, + callframe: *CallFrame, + ) callconv(.C) JSValue { + if (this.closed) { + globalThis.throw("Socket is closed", .{}); + return .zero; + } + const arguments = callframe.arguments(3); + const dst: ?Destination = brk: { + if (this.connect_info != null) { + if (arguments.len == 1) { + break :brk null; + } + if (arguments.len == 3) { + globalThis.throwInvalidArguments("Cannot specify destination on connected socket", .{}); + return .zero; + } + globalThis.throwInvalidArguments("Expected 1 argument, got {}", .{arguments.len}); + return .zero; + } else { + if (arguments.len != 3) { + globalThis.throwInvalidArguments("Expected 3 arguments, got {}", .{arguments.len}); + return .zero; + } + break :brk .{ + .port = arguments.ptr[1], + .address = arguments.ptr[2], + }; + } + }; + + const payload_arg = arguments.ptr[0]; + var payload = brk: { + if (payload_arg.asArrayBuffer(globalThis)) |array_buffer| { + break :brk bun.JSC.ZigString.Slice{ + .ptr = array_buffer.ptr, + .len = array_buffer.len, + }; + } else if (payload_arg.isString()) { + break :brk payload_arg.asString().toSlice(globalThis, bun.default_allocator); + } else { + globalThis.throwInvalidArguments("Expected ArrayBufferView or string as first argument", .{}); + return .zero; + } + }; + defer payload.deinit(); + + var addr: std.os.sockaddr.storage = std.mem.zeroes(std.os.sockaddr.storage); + const addr_ptr = brk: { + if (dst) |dest| { + if (!this.parseAddr(globalThis, dest.port, dest.address, &addr)) { + globalThis.throwInvalidArguments("Invalid address", .{}); + return .zero; + } + break :brk &addr; + } else { + break :brk null; + } + }; + + const res = this.socket.send(&.{payload.ptr}, &.{payload.len}, &.{addr_ptr}); + if (bun.JSC.Maybe(void).errnoSys(res, .send)) |err| { + globalThis.throwValue(err.toJS(globalThis)); + return .zero; + } + return JSValue.jsBoolean(res > 0); + } + + fn parseAddr( + this: *This, + globalThis: *JSGlobalObject, + port_val: JSValue, + address_val: JSValue, + storage: *std.os.sockaddr.storage, + ) bool { + _ = this; + const number = port_val.coerceToInt32(globalThis); + const port: u16 = if (number < 1 or number > 0xffff) 0 else @intCast(number); + + const str = address_val.toBunString(globalThis); + defer str.deref(); + const address_slice = str.toOwnedSliceZ(default_allocator) catch bun.outOfMemory(); + defer default_allocator.free(address_slice); + + var addr4: *std.os.sockaddr.in = @ptrCast(storage); + if (inet_pton(std.os.AF.INET, address_slice.ptr, &addr4.addr) == 1) { + addr4.port = htons(@truncate(port)); + addr4.family = std.os.AF.INET; + } else { + var addr6: *std.os.sockaddr.in6 = @ptrCast(storage); + if (inet_pton(std.os.AF.INET6, address_slice.ptr, &addr6.addr) == 1) { + addr6.port = htons(@truncate(port)); + addr6.family = std.os.AF.INET6; + } else { + return false; + } + } + + return true; + } + + const Destination = struct { + port: JSValue, + address: JSValue, + }; + + pub fn ref(this: *This, globalThis: *JSC.JSGlobalObject, _: *JSC.CallFrame) callconv(.C) JSValue { + if (!this.closed) { + this.poll_ref.ref(globalThis.bunVM()); + } + + return .undefined; + } + + pub fn unref(this: *This, globalThis: *JSC.JSGlobalObject, _: *JSC.CallFrame) callconv(.C) JSValue { + if (!this.closed) { + this.poll_ref.unref(globalThis.bunVM()); + } + + return .undefined; + } + + pub fn close( + this: *This, + _: *JSGlobalObject, + _: *CallFrame, + ) callconv(.C) JSValue { + if (!this.closed) this.socket.close(); + + return .undefined; + } + + pub fn reload(this: *This, globalThis: *JSGlobalObject, callframe: *CallFrame) callconv(.C) JSValue { + const args = callframe.arguments(1); + + if (args.len < 1) { + globalThis.throwInvalidArguments("Expected 1 argument", .{}); + return .zero; + } + + const options = args.ptr[0]; + const config = UDPSocketConfig.fromJS(globalThis, options) orelse { + return .zero; + }; + + config.protect(); + var previous_config = this.config; + previous_config.unprotect(); + this.config = config; + + return .undefined; + } + + pub fn getClosed(this: *This, _: *JSGlobalObject) callconv(.C) JSValue { + return JSValue.jsBoolean(this.closed); + } + + pub fn getHostname(this: *This, _: *JSGlobalObject) callconv(.C) JSValue { + const hostname = JSC.ZigString.init(this.config.hostname); + return hostname.toValueGC(this.globalThis); + } + + pub fn getPort(this: *This, _: *JSGlobalObject) callconv(.C) JSValue { + if (this.closed) return .undefined; + return JSValue.jsNumber(this.socket.boundPort()); + } + + fn addressToString(globalThis: *JSGlobalObject, address_bytes: []const u8) JSValue { + var text_buf: [512]u8 = undefined; + const address: std.net.Address = switch (address_bytes.len) { + 4 => std.net.Address.initIp4(address_bytes[0..4].*, 0), + 16 => std.net.Address.initIp6(address_bytes[0..16].*, 0, 0, 0), + else => return .undefined, + }; + + const slice = bun.fmt.formatIp(address, &text_buf) catch unreachable; + return bun.String.createLatin1(slice).toJS(globalThis); + } + + pub fn getAddress(this: *This, globalThis: *JSGlobalObject) callconv(.C) JSValue { + if (this.closed) return .undefined; + var buf: [64]u8 = [_]u8{0} ** 64; + var length: i32 = 64; + this.socket.boundIp(&buf, &length); + + const address_bytes = buf[0..@as(usize, @intCast(length))]; + const port = this.socket.boundPort(); + return JSSocketAddress__create( + globalThis, + addressToString(globalThis, address_bytes), + @intCast(port), + length == 16, + ); + } + + pub fn getRemoteAddress(this: *This, globalThis: *JSC.JSGlobalObject) callconv(.C) JSC.JSValue { + if (this.closed) return .undefined; + const connect_info = this.connect_info orelse return .undefined; + var buf: [64]u8 = [_]u8{0} ** 64; + var length: i32 = 64; + this.socket.remoteIp(&buf, &length); + + const address_bytes = buf[0..@as(usize, @intCast(length))]; + return JSSocketAddress__create( + globalThis, + addressToString(globalThis, address_bytes), + connect_info.port, + length == 16, + ); + } + + pub fn getBinaryType( + this: *This, + globalThis: *JSGlobalObject, + ) callconv(.C) JSValue { + return switch (this.config.binary_type) { + .Buffer => JSC.ZigString.init("buffer").toValueGC(globalThis), + .Uint8Array => JSC.ZigString.init("uint8array").toValueGC(globalThis), + .ArrayBuffer => JSC.ZigString.init("arraybuffer").toValueGC(globalThis), + else => @panic("Invalid binary type"), + }; + } + + pub fn finalize(this: *This) callconv(.C) void { + log("Finalize {*}", .{this}); + this.deinit(); + } + + pub fn deinit(this: *This) void { + // finalize is only called when js_refcount reaches 0 + // js_refcount can only reach 0 when the socket is closed + bun.assert(this.closed); + + this.config.deinit(); + this.destroy(); + } + + pub fn jsConnect(globalThis: *JSC.JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSC.JSValue { + const args = callFrame.arguments(2); + + const this = callFrame.this().as(UDPSocket) orelse { + globalThis.throwInvalidArguments("Expected UDPSocket as 'this'", .{}); + return .zero; + }; + + if (this.connect_info != null) { + globalThis.throw("Socket is already connected", .{}); + return .zero; + } + + if (this.closed) { + globalThis.throw("Socket is closed", .{}); + return .zero; + } + + if (args.len < 2) { + globalThis.throwInvalidArguments("Expected 2 arguments", .{}); + return .zero; + } + + const str = args.ptr[0].toBunString(globalThis); + defer str.deref(); + const connect_host = str.toOwnedSliceZ(default_allocator) catch bun.outOfMemory(); + defer default_allocator.free(connect_host); + + const connect_port_js = args.ptr[1]; + + if (!connect_port_js.isNumber()) { + globalThis.throwInvalidArguments("Expected \"port\" to be an integer", .{}); + return .zero; + } + + const connect_port = connect_port_js.asInt32(); + const port: u16 = if (connect_port < 1 or connect_port > 0xffff) 0 else @as(u16, @intCast(connect_port)); + + if (this.socket.connect(connect_host, port) == -1) { + globalThis.throw("Failed to connect socket", .{}); + return .zero; + } + this.connect_info = .{ + .port = port, + }; + // TODO reset cached remoteAddress property + + return .undefined; + } + + pub fn jsDisconnect(globalObject: *JSC.JSGlobalObject, callFrame: *JSC.CallFrame) callconv(.C) JSC.JSValue { + const this = callFrame.this().as(UDPSocket) orelse { + globalObject.throwInvalidArguments("Expected UDPSocket as 'this'", .{}); + return .zero; + }; + + if (this.connect_info == null) { + globalObject.throw("Socket is not connected", .{}); + return .zero; + } + + if (this.closed) { + globalObject.throw("Socket is closed", .{}); + return .zero; + } + + if (this.socket.disconnect() == -1) { + globalObject.throw("Failed to disconnect socket", .{}); + return .zero; + } + this.connect_info = null; + + return .undefined; + } +}; diff --git a/src/bun.js/api/sockets.classes.ts b/src/bun.js/api/sockets.classes.ts index a3db312476..4a46355719 100644 --- a/src/bun.js/api/sockets.classes.ts +++ b/src/bun.js/api/sockets.classes.ts @@ -218,4 +218,63 @@ export default [ construct: true, klass: {}, }), + + define({ + name: "UDPSocket", + noConstructor: true, + JSType: "0b11101110", + finalize: true, + construct: true, + hasPendingActivity: true, + proto: { + send: { + fn: "send", + length: 3, + }, + sendMany: { + fn: "sendMany", + length: 3, + }, + close: { + fn: "close", + length: 0, + }, + reload: { + fn: "reload", + length: 1, + }, + ref: { + fn: "ref", + length: 0, + }, + unref: { + fn: "unref", + length: 0, + }, + hostname: { + getter: "getHostname", + cache: true, + }, + port: { + getter: "getPort", + cache: true, + }, + address: { + getter: "getAddress", + cache: true, + }, + remoteAddress: { + getter: "getRemoteAddress", + cache: true, + }, + binaryType: { + getter: "getBinaryType", + cache: true, + }, + closed: { + getter: "getClosed", + }, + }, + klass: {}, + }), ]; diff --git a/src/bun.js/bindings/BunObject+exports.h b/src/bun.js/bindings/BunObject+exports.h index 37087710a4..ed64a72518 100644 --- a/src/bun.js/bindings/BunObject+exports.h +++ b/src/bun.js/bindings/BunObject+exports.h @@ -48,6 +48,7 @@ macro(inflateSync) \ macro(jest) \ macro(listen) \ + macro(udpSocket) \ macro(mmap) \ macro(nanoseconds) \ macro(openInEditor) \ diff --git a/src/bun.js/bindings/BunObject.cpp b/src/bun.js/bindings/BunObject.cpp index 6f5d78352b..afb215e92c 100644 --- a/src/bun.js/bindings/BunObject.cpp +++ b/src/bun.js/bindings/BunObject.cpp @@ -566,6 +566,7 @@ JSC_DEFINE_HOST_FUNCTION(functionFileURLToPath, (JSC::JSGlobalObject * globalObj isMainThread constructIsMainThread ReadOnly|DontDelete|PropertyCallback jest BunObject_callback_jest DontEnum|DontDelete|Function 1 listen BunObject_callback_listen DontDelete|Function 1 + udpSocket BunObject_callback_udpSocket DontDelete|Function 1 main BunObject_getter_wrap_main DontDelete|PropertyCallback mmap BunObject_callback_mmap DontDelete|Function 1 nanoseconds functionBunNanoseconds DontDelete|Function 0 diff --git a/src/bun.js/bindings/bindings.zig b/src/bun.js/bindings/bindings.zig index 5260e34497..c3f40c8c03 100644 --- a/src/bun.js/bindings/bindings.zig +++ b/src/bun.js/bindings/bindings.zig @@ -441,7 +441,7 @@ pub const ZigString = extern struct { }; } - pub const empty = Slice{ .ptr = undefined, .len = 0 }; + pub const empty = Slice{ .ptr = "", .len = 0 }; pub inline fn isAllocated(this: Slice) bool { return !this.allocator.isNull(); diff --git a/src/bun.js/bindings/generated_classes_list.zig b/src/bun.js/bindings/generated_classes_list.zig index 50e3a8960c..f154e6c85c 100644 --- a/src/bun.js/bindings/generated_classes_list.zig +++ b/src/bun.js/bindings/generated_classes_list.zig @@ -52,6 +52,7 @@ pub const Classes = struct { pub const ResourceUsage = JSC.ResourceUsage; pub const TCPSocket = JSC.API.TCPSocket; pub const TLSSocket = JSC.API.TLSSocket; + pub const UDPSocket = JSC.API.UDPSocket; pub const TextDecoder = JSC.WebCore.TextDecoder; pub const Timeout = JSC.API.Bun.Timer.TimerObject; pub const BuildArtifact = JSC.API.BuildArtifact; diff --git a/src/bun.js/bindings/helpers.h b/src/bun.js/bindings/helpers.h index 7982f624cf..b6f4b34ec9 100644 --- a/src/bun.js/bindings/helpers.h +++ b/src/bun.js/bindings/helpers.h @@ -246,7 +246,7 @@ static const JSC::JSValue toJSStringValueGC(ZigString str, JSC::JSGlobalObject* return JSC::JSValue(toJSStringGC(str, global)); } -static const ZigString ZigStringEmpty = ZigString { nullptr, 0 }; +static const ZigString ZigStringEmpty = ZigString { (unsigned char*)"", 0 }; static const unsigned char __dot_char = '.'; static const ZigString ZigStringCwd = ZigString { &__dot_char, 1 }; static const BunString BunStringCwd = BunString { BunStringTag::StaticZigString, ZigStringCwd }; diff --git a/src/bun.js/node/node_os.zig b/src/bun.js/node/node_os.zig index 9cb4b6bc57..da1036bf4f 100644 --- a/src/bun.js/node/node_os.zig +++ b/src/bun.js/node/node_os.zig @@ -541,11 +541,16 @@ pub const OS = struct { // hex characters and 5 for the colon separators var mac_buf: [17]u8 = undefined; const addr_data = if (comptime Environment.isLinux) ll_addr.addr else if (comptime Environment.isMac) ll_addr.sdl_data[ll_addr.sdl_nlen..] else @compileError("unreachable"); - const mac = std.fmt.bufPrint(&mac_buf, "{x:0>2}:{x:0>2}:{x:0>2}:{x:0>2}:{x:0>2}:{x:0>2}", .{ - addr_data[0], addr_data[1], addr_data[2], - addr_data[3], addr_data[4], addr_data[5], - }) catch unreachable; - interface.put(globalThis, JSC.ZigString.static("mac"), JSC.ZigString.init(mac).withEncoding().toValueGC(globalThis)); + if (addr_data.len < 6) { + const mac = "00:00:00:00:00:00"; + interface.put(globalThis, JSC.ZigString.static("mac"), JSC.ZigString.init(mac).withEncoding().toValueGC(globalThis)); + } else { + const mac = std.fmt.bufPrint(&mac_buf, "{x:0>2}:{x:0>2}:{x:0>2}:{x:0>2}:{x:0>2}:{x:0>2}", .{ + addr_data[0], addr_data[1], addr_data[2], + addr_data[3], addr_data[4], addr_data[5], + }) catch unreachable; + interface.put(globalThis, JSC.ZigString.static("mac"), JSC.ZigString.init(mac).withEncoding().toValueGC(globalThis)); + } } else { const mac = "00:00:00:00:00:00"; interface.put(globalThis, JSC.ZigString.static("mac"), JSC.ZigString.init(mac).withEncoding().toValueGC(globalThis)); diff --git a/src/deps/c_ares.zig b/src/deps/c_ares.zig index 3a10195ea4..1f4dbaab3b 100644 --- a/src/deps/c_ares.zig +++ b/src/deps/c_ares.zig @@ -548,9 +548,6 @@ pub const Channel = opaque { var host_buf: [1024]u8 = undefined; var port_buf: [52]u8 = undefined; const host_ptr: ?[*:0]const u8 = brk: { - if (!(host.len > 0 and !bun.strings.eqlComptime(host, "0.0.0.0") and !bun.strings.eqlComptime(host, "::0"))) { - break :brk null; - } const len = @min(host.len, host_buf.len - 1); @memcpy(host_buf[0..len], host[0..len]); host_buf[len] = 0; diff --git a/src/deps/uws.zig b/src/deps/uws.zig index 356855dba2..3a33807682 100644 --- a/src/deps/uws.zig +++ b/src/deps/uws.zig @@ -30,9 +30,11 @@ pub const InternalLoopData = extern struct { head: ?*SocketContext, iterator: ?*SocketContext, recv_buf: [*]u8, + send_buf: [*]u8, ssl_data: ?*anyopaque, pre_cb: ?*fn (?*Loop) callconv(.C) void, post_cb: ?*fn (?*Loop) callconv(.C) void, + closed_udp_head: ?*udp.Socket, closed_head: ?*Socket, low_prio_head: ?*Socket, low_prio_budget: i32, @@ -1300,8 +1302,8 @@ pub const Poll = opaque { pub const write_flag = if (Environment.isLinux) std.os.linux.EPOLL.OUT else 2; }; - pub fn deinit(self: *Poll) void { - us_poll_free(self); + pub fn deinit(self: *Poll, loop: *Loop) void { + us_poll_free(self, loop); } // (void* userData, int fd, int events, int error, struct us_poll_t *poll) @@ -2697,3 +2699,79 @@ pub fn newSocketFromPair(ctx: *SocketContext, ext_size: c_int, fds: *[2]LIBUS_SO } extern fn us_socket_get_error(ssl_flag: c_int, socket: *Socket) c_int; + +pub const udp = struct { + pub const Socket = opaque { + const This = @This(); + + pub fn create(loop: *Loop, data_cb: *const fn (*This, *PacketBuffer, c_int) callconv(.C) void, drain_cb: *const fn (*This) callconv(.C) void, close_cb: *const fn (*This) callconv(.C) void, host: [*c]const u8, port: c_ushort, user_data: ?*anyopaque) ?*This { + return us_create_udp_socket(loop, data_cb, drain_cb, close_cb, host, port, user_data); + } + + pub fn send(this: *This, payloads: []const [*]const u8, lengths: []const usize, addresses: []const ?*const anyopaque) c_int { + bun.assert(payloads.len == lengths.len and payloads.len == addresses.len); + return us_udp_socket_send(this, payloads.ptr, lengths.ptr, addresses.ptr, @intCast(payloads.len)); + } + + pub fn user(this: *This) ?*anyopaque { + return us_udp_socket_user(this); + } + + pub fn bind(this: *This, hostname: [*c]const u8, port: c_uint) c_int { + return us_udp_socket_bind(this, hostname, port); + } + + pub fn boundPort(this: *This) c_int { + return us_udp_socket_bound_port(this); + } + + pub fn boundIp(this: *This, buf: [*c]u8, length: *i32) void { + return us_udp_socket_bound_ip(this, buf, length); + } + + pub fn remoteIp(this: *This, buf: [*c]u8, length: *i32) void { + return us_udp_socket_remote_ip(this, buf, length); + } + + pub fn close(this: *This) void { + return us_udp_socket_close(this); + } + + pub fn connect(this: *This, hostname: [*c]const u8, port: c_uint) c_int { + return us_udp_socket_connect(this, hostname, port); + } + + pub fn disconnect(this: *This) c_int { + return us_udp_socket_disconnect(this); + } + }; + + extern fn us_create_udp_socket(loop: ?*Loop, data_cb: *const fn (*udp.Socket, *PacketBuffer, c_int) callconv(.C) void, drain_cb: *const fn (*udp.Socket) callconv(.C) void, close_cb: *const fn (*udp.Socket) callconv(.C) void, host: [*c]const u8, port: c_ushort, user_data: ?*anyopaque) ?*udp.Socket; + extern fn us_udp_socket_connect(socket: ?*udp.Socket, hostname: [*c]const u8, port: c_uint) c_int; + extern fn us_udp_socket_disconnect(socket: ?*udp.Socket) c_int; + extern fn us_udp_socket_send(socket: ?*udp.Socket, [*c]const [*c]const u8, [*c]const usize, [*c]const ?*const anyopaque, c_int) c_int; + extern fn us_udp_socket_user(socket: ?*udp.Socket) ?*anyopaque; + extern fn us_udp_socket_bind(socket: ?*udp.Socket, hostname: [*c]const u8, port: c_uint) c_int; + extern fn us_udp_socket_bound_port(socket: ?*udp.Socket) c_int; + extern fn us_udp_socket_bound_ip(socket: ?*udp.Socket, buf: [*c]u8, length: [*c]i32) void; + extern fn us_udp_socket_remote_ip(socket: ?*udp.Socket, buf: [*c]u8, length: [*c]i32) void; + extern fn us_udp_socket_close(socket: ?*udp.Socket) void; + + pub const PacketBuffer = opaque { + const This = @This(); + + pub fn getPeer(this: *This, index: c_int) *std.os.sockaddr.storage { + return us_udp_packet_buffer_peer(this, index); + } + + pub fn getPayload(this: *This, index: c_int) []u8 { + const payload = us_udp_packet_buffer_payload(this, index); + const len = us_udp_packet_buffer_payload_length(this, index); + return payload[0..@as(usize, @intCast(len))]; + } + }; + + extern fn us_udp_packet_buffer_peer(buf: ?*PacketBuffer, index: c_int) *std.os.sockaddr.storage; + extern fn us_udp_packet_buffer_payload(buf: ?*PacketBuffer, index: c_int) [*]u8; + extern fn us_udp_packet_buffer_payload_length(buf: ?*PacketBuffer, index: c_int) c_int; +}; diff --git a/src/js/node/dgram.ts b/src/js/node/dgram.ts index b7abe9156a..6e2e115bd5 100644 --- a/src/js/node/dgram.ts +++ b/src/js/node/dgram.ts @@ -1,23 +1,1076 @@ -// Hardcoded module "node:dgram" -// This is a stub! None of this is actually implemented yet. +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +const BIND_STATE_UNBOUND = 0; +const BIND_STATE_BINDING = 1; +const BIND_STATE_BOUND = 2; + +const CONNECT_STATE_DISCONNECTED = 0; +const CONNECT_STATE_CONNECTING = 1; +const CONNECT_STATE_CONNECTED = 2; + +const RECV_BUFFER = true; +const SEND_BUFFER = false; + +const kStateSymbol = Symbol("state symbol"); +const async_id_symbol = Symbol("async_id_symbol"); + const { hideFromStack, throwNotImplemented } = require("internal/shared"); -function createSocket() { - throwNotImplemented("node:dgram createSocket", 1630); +const { + FunctionPrototypeBind, + ObjectSetPrototypeOf, + SymbolAsyncDispose, + SymbolDispose, + StringPrototypeTrim, + NumberIsNaN, +} = require("internal/primordials"); + +const EventEmitter = require("node:events"); + +class ERR_OUT_OF_RANGE extends Error { + constructor(argumentName, range, received) { + super(`The value of "${argumentName}" is out of range. It must be ${range}. Received ${received}`); + this.code = "ERR_OUT_OF_RANGE"; + } } -function Socket() { - throwNotImplemented("node:dgram Socket", 1630); +class ERR_BUFFER_OUT_OF_BOUNDS extends Error { + constructor() { + super("Buffer offset or length is out of bounds"); + this.code = "ERR_BUFFER_OUT_OF_BOUNDS"; + } } -function _createSocketHandle() { - throwNotImplemented("node:dgram _createSocketHandle", 1630); +class ERR_INVALID_ARG_TYPE extends Error { + constructor(argName, expected, actual) { + super(`The "${argName}" argument must be of type ${expected}. Received type ${typeof actual}`); + this.code = "ERR_INVALID_ARG_TYPE"; + } } -export default { - createSocket, - Socket, - _createSocketHandle, +class ERR_MISSING_ARGS extends Error { + constructor(argName) { + super(`The "${argName}" argument is required`); + this.code = "ERR_MISSING_ARGS"; + } +} + +class ERR_SOCKET_ALREADY_BOUND extends Error { + constructor() { + super("Socket is already bound"); + this.code = "ERR_SOCKET_ALREADY_BOUND"; + } +} + +class ERR_SOCKET_BAD_BUFFER_SIZE extends Error { + constructor() { + super("Buffer size must be a number"); + this.code = "ERR_SOCKET_BAD_BUFFER_SIZE"; + } +} + +class ERR_SOCKET_BUFFER_SIZE extends Error { + constructor(ctx) { + super(`Invalid buffer size: ${ctx}`); + this.code = "ERR_SOCKET_BUFFER_SIZE"; + } +} + +class ERR_SOCKET_DGRAM_IS_CONNECTED extends Error { + constructor() { + super("Socket is connected"); + this.code = "ERR_SOCKET_DGRAM_IS_CONNECTED"; + } +} + +class ERR_SOCKET_DGRAM_NOT_CONNECTED extends Error { + constructor() { + super("Socket is not connected"); + this.code = "ERR_SOCKET_DGRAM_NOT_CONNECTED"; + } +} + +class ERR_SOCKET_BAD_PORT extends Error { + constructor(name, port, allowZero) { + super(`Invalid ${name}: ${port}. Ports must be >= 0 and <= 65535. ${allowZero ? "0" : ""}`); + this.code = "ERR_SOCKET_BAD_PORT"; + } +} + +class ERR_SOCKET_DGRAM_NOT_RUNNING extends Error { + constructor() { + super("Socket is not running"); + this.code = "ERR_SOCKET_DGRAM_NOT_RUNNING"; + } +} + +function isInt32(value) { + return value === (value | 0); +} + +function validateAbortSignal(signal, name) { + if (signal !== undefined && (signal === null || typeof signal !== "object" || !("aborted" in signal))) { + throw new ERR_INVALID_ARG_TYPE(name, "AbortSignal", signal); + } +} +hideFromStack(validateAbortSignal); + +function validateString(value, name) { + if (typeof value !== "string") throw new ERR_INVALID_ARG_TYPE(name, "string", value); +} +hideFromStack(validateString); + +function validateNumber(value, name, min = undefined, max) { + if (typeof value !== "number") throw new ERR_INVALID_ARG_TYPE(name, "number", value); + + if ( + (min != null && value < min) || + (max != null && value > max) || + ((min != null || max != null) && NumberIsNaN(value)) + ) { + throw new ERR_OUT_OF_RANGE( + name, + `${min != null ? `>= ${min}` : ""}${min != null && max != null ? " && " : ""}${max != null ? `<= ${max}` : ""}`, + value, + ); + } +} +hideFromStack(validateNumber); + +function validatePort(port, name = "Port", allowZero = true) { + if ( + (typeof port !== "number" && typeof port !== "string") || + (typeof port === "string" && StringPrototypeTrim(port).length === 0) || + +port !== +port >>> 0 || + port > 0xffff || + (port === 0 && !allowZero) + ) { + throw new ERR_SOCKET_BAD_PORT(name, port, allowZero); + } + return port | 0; +} +hideFromStack(validatePort); + +function validateFunction(value, name) { + if (typeof value !== "function") throw new ERR_INVALID_ARG_TYPE(name, "Function", value); +} +hideFromStack(validateFunction); + +// placeholder +function defaultTriggerAsyncIdScope(triggerAsyncId, block, ...args) { + return block.$apply(null, args); +} + +function lookup4(lookup, address, callback) { + return lookup(address || "127.0.0.1", 4, callback); +} + +function lookup6(lookup, address, callback) { + return lookup(address || "::1", 6, callback); +} + +let dns; + +function newHandle(type, lookup) { + if (lookup === undefined) { + if (dns === undefined) { + dns = require("node:dns"); + } + + lookup = dns.lookup; + } else { + validateFunction(lookup, "lookup"); + } + + const handle = {}; + if (type === "udp4") { + handle.lookup = FunctionPrototypeBind(lookup4, handle, lookup); + } else if (type === "udp6") { + handle.lookup = FunctionPrototypeBind(lookup6, handle, lookup); + } else { + throw new ERR_SOCKET_BAD_TYPE(); + } + + return handle; +} + +let udpSocketChannel; + +function Socket(type, listener) { + EventEmitter.$call(this); + let lookup; + let recvBufferSize; + let sendBufferSize; + + let options; + if (type !== null && typeof type === "object") { + options = type; + type = options.type; + lookup = options.lookup; + recvBufferSize = options.recvBufferSize; + sendBufferSize = options.sendBufferSize; + } + + const handle = newHandle(type, lookup); + + // this[async_id_symbol] = handle.getAsyncId(); + this.type = type; + + if (typeof listener === "function") this.on("message", listener); + + this[kStateSymbol] = { + handle, + receiving: false, + bindState: BIND_STATE_UNBOUND, + connectState: CONNECT_STATE_DISCONNECTED, + queue: undefined, + reuseAddr: options && options.reuseAddr, // Use UV_UDP_REUSEADDR if true. + ipv6Only: options && options.ipv6Only, + recvBufferSize, + sendBufferSize, + }; + + if (options?.signal !== undefined) { + const { signal } = options; + validateAbortSignal(signal, "options.signal"); + const onAborted = () => { + if (this[kStateSymbol].handle) this.close(); + }; + if (signal.aborted) { + onAborted(); + } else { + const disposable = EventEmitter.addAbortListener(signal, onAborted); + this.once("close", disposable[SymbolDispose]); + } + } + if (!udpSocketChannel) { + udpSocketChannel = require("node:diagnostics_channel").channel("udp.socket"); + } + if (udpSocketChannel.hasSubscribers) { + udpSocketChannel.publish({ + socket: this, + }); + } +} +Socket.prototype = {}; +ObjectSetPrototypeOf(Socket.prototype, EventEmitter.prototype); +ObjectSetPrototypeOf(Socket, EventEmitter); + +function createSocket(type, listener) { + return new Socket(type, listener); +} + +function bufferSize(self, size, buffer) { + if (size >>> 0 !== size) throw new ERR_SOCKET_BAD_BUFFER_SIZE(); + + const ctx = {}; + // const ret = self[kStateSymbol].handle.bufferSize(size, buffer, ctx); + const ret = 1 << 19; // common buffer for all sockets is fixed at 512KiB + if (ret === undefined) { + throw new ERR_SOCKET_BUFFER_SIZE(ctx); + } + return ret; +} + +Socket.prototype.bind = function (port_, address_ /* , callback */) { + let port = port_; + + const state = this[kStateSymbol]; + + if (state.bindState !== BIND_STATE_UNBOUND) throw new ERR_SOCKET_ALREADY_BOUND(); + + state.bindState = BIND_STATE_BINDING; + + const cb = arguments.length && arguments[arguments.length - 1]; + if (typeof cb === "function") { + function removeListeners() { + this.removeListener("error", removeListeners); + this.removeListener("listening", onListening); + } + + function onListening() { + removeListeners.$call(this); + cb.$call(this); + } + + this.on("error", removeListeners); + this.on("listening", onListening); + } + + if (port !== null && typeof port === "object" && typeof port.recvStart === "function") { + throwNotImplemented("Socket.prototype.bind(handle)"); + /* + replaceHandle(this, port); + startListening(this); + return this; + */ + } + + // Open an existing fd instead of creating a new one. + if (port !== null && typeof port === "object" && isInt32(port.fd) && port.fd > 0) { + throwNotImplemented("Socket.prototype.bind({ fd })"); + /* + const fd = port.fd; + const exclusive = !!port.exclusive; + const state = this[kStateSymbol]; + + const type = guessHandleType(fd); + if (type !== 'UDP') + throw new ERR_INVALID_FD_TYPE(type); + const err = state.handle.open(fd); + + if (err) + throw new ErrnoException(err, 'open'); + + startListening(this); + return this; + */ + } + + let address; + let exclusive; + + if (port !== null && typeof port === "object") { + address = port.address || ""; + exclusive = !!port.exclusive; + port = port.port; + } else { + address = typeof address_ === "function" ? "" : address_; + exclusive = false; + } + + // Defaulting address for bind to all interfaces + if (!address) { + if (this.type === "udp4") address = "0.0.0.0"; + else address = "::"; + } + + // Resolve address first + state.handle.lookup(address, (err, ip) => { + if (!state.handle) return; // Handle has been closed in the mean time + + if (err) { + state.bindState = BIND_STATE_UNBOUND; + this.emit("error", err); + return; + } + + let flags = 0; + if (state.reuseAddr) flags |= 0; //UV_UDP_REUSEADDR; + if (state.ipv6Only) flags |= 0; //UV_UDP_IPV6ONLY; + + // TODO flags + const family = this.type === "udp4" ? "IPv4" : "IPv6"; + Bun.udpSocket({ + hostname: ip, + port: port || 0, + socket: { + data: (_socket, data, port, address) => { + this.emit("message", data, { + port: port, + address: address, + size: data.length, + // TODO check if this is correct + family, + }); + }, + error: (_socket, error) => { + this.emit("error", error); + }, + }, + }).$then( + socket => { + state.handle.socket = socket; + state.receiving = true; + state.bindState = BIND_STATE_BOUND; + + this.emit("listening"); + }, + err => { + state.bindState = BIND_STATE_UNBOUND; + this.emit("error", err); + }, + ); + }); + + return this; }; -hideFromStack(createSocket, Socket, _createSocketHandle); +Socket.prototype.connect = function (port, address, callback) { + port = validatePort(port, "Port", false); + if (typeof address === "function") { + callback = address; + address = ""; + } else if (address === undefined) { + address = ""; + } + + validateString(address, "address"); + + const state = this[kStateSymbol]; + + if (state.connectState !== CONNECT_STATE_DISCONNECTED) throw new ERR_SOCKET_DGRAM_IS_CONNECTED(); + + state.connectState = CONNECT_STATE_CONNECTING; + if (state.bindState === BIND_STATE_UNBOUND) this.bind({ port: 0, exclusive: true }, null); + + if (state.bindState !== BIND_STATE_BOUND) { + enqueue(this, FunctionPrototypeBind(_connect, this, port, address, callback)); + return; + } + + _connect.$apply(this, [port, address, callback]); +}; + +function _connect(port, address, callback) { + const state = this[kStateSymbol]; + if (callback) this.once("connect", callback); + + const afterDns = (ex, ip) => { + defaultTriggerAsyncIdScope(this[async_id_symbol], doConnect, ex, this, ip, address, port, callback); + }; + + state.handle.lookup(address, afterDns); +} + +const connectFn = $newZigFunction("udp_socket.zig", "UDPSocket.jsConnect", 2); + +function doConnect(ex, self, ip, address, port, callback) { + const state = self[kStateSymbol]; + if (!state.handle) return; + + if (!ex) { + try { + connectFn.$call(state.handle.socket, ip, port); + } catch (e) { + ex = e; + } + } + + if (ex) { + state.connectState = CONNECT_STATE_DISCONNECTED; + return process.nextTick(() => { + if (callback) { + self.removeListener("connect", callback); + callback(ex); + } else { + self.emit("error", ex); + } + }); + } + + state.connectState = CONNECT_STATE_CONNECTED; + process.nextTick(() => self.emit("connect")); +} + +const disconnectFn = $newZigFunction("udp_socket.zig", "UDPSocket.jsDisconnect", 0); + +Socket.prototype.disconnect = function () { + const state = this[kStateSymbol]; + if (state.connectState !== CONNECT_STATE_CONNECTED) throw new ERR_SOCKET_DGRAM_NOT_CONNECTED(); + + disconnectFn.$call(state.handle.socket); + state.connectState = CONNECT_STATE_DISCONNECTED; +}; + +// Thin wrapper around `send`, here for compatibility with dgram_legacy.js +Socket.prototype.sendto = function (buffer, offset, length, port, address, callback) { + validateNumber(offset, "offset"); + validateNumber(length, "length"); + validateNumber(port, "port"); + validateString(address, "address"); + + this.send(buffer, offset, length, port, address, callback); +}; + +function sliceBuffer(buffer, offset, length) { + if (typeof buffer === "string") { + buffer = Buffer.from(buffer); + } else if (!ArrayBuffer.isView(buffer)) { + throw new ERR_INVALID_ARG_TYPE("buffer", ["Buffer", "TypedArray", "DataView", "string"], buffer); + } + + offset = offset >>> 0; + length = length >>> 0; + if (offset > buffer.byteLength) { + throw new ERR_BUFFER_OUT_OF_BOUNDS("offset"); + } + + if (offset + length > buffer.byteLength) { + throw new ERR_BUFFER_OUT_OF_BOUNDS("length"); + } + + return Buffer.from(buffer.buffer, buffer.byteOffset + offset, length); +} + +function fixBufferList(list) { + const newlist = new Array(list.length); + + for (let i = 0, l = list.length; i < l; i++) { + const buf = list[i]; + if (typeof buf === "string") newlist[i] = Buffer.from(buf); + else if (!ArrayBuffer.isView(buf)) return null; + else newlist[i] = Buffer.from(buf.buffer, buf.byteOffset, buf.byteLength); + } + + return newlist; +} + +function enqueue(self, toEnqueue) { + const state = self[kStateSymbol]; + + // If the send queue hasn't been initialized yet, do it, and install an + // event handler that flushes the send queue after binding is done. + if (state.queue === undefined) { + state.queue = []; + self.once(EventEmitter.errorMonitor, onListenError); + self.once("listening", onListenSuccess); + } + state.queue.push(toEnqueue); +} + +function onListenSuccess() { + this.removeListener(EventEmitter.errorMonitor, onListenError); + clearQueue.$call(this); +} + +function onListenError(err) { + this.removeListener("listening", onListenSuccess); + this[kStateSymbol].queue = undefined; +} + +function clearQueue() { + const state = this[kStateSymbol]; + const queue = state.queue; + state.queue = undefined; + + // Flush the send queue. + for (const queueEntry of queue) queueEntry(); +} + +// valid combinations +// For connectionless sockets +// send(buffer, offset, length, port, address, callback) +// send(buffer, offset, length, port, address) +// send(buffer, offset, length, port, callback) +// send(buffer, offset, length, port) +// send(bufferOrList, port, address, callback) +// send(bufferOrList, port, address) +// send(bufferOrList, port, callback) +// send(bufferOrList, port) +// For connected sockets +// send(buffer, offset, length, callback) +// send(buffer, offset, length) +// send(bufferOrList, callback) +// send(bufferOrList) +Socket.prototype.send = function (buffer, offset, length, port, address, callback) { + let list; + const state = this[kStateSymbol]; + const connected = state.connectState === CONNECT_STATE_CONNECTED; + if (!connected) { + if (address || (port && typeof port !== "function")) { + buffer = sliceBuffer(buffer, offset, length); + } else { + callback = port; + port = offset; + address = length; + } + } else { + if (typeof length === "number") { + buffer = sliceBuffer(buffer, offset, length); + if (typeof port === "function") { + callback = port; + port = null; + } + } else { + callback = offset; + } + + if (port || address) throw new ERR_SOCKET_DGRAM_IS_CONNECTED(); + } + + if (!Array.isArray(buffer)) { + if (typeof buffer === "string") { + list = [Buffer.from(buffer)]; + } else if (!ArrayBuffer.isView(buffer)) { + throw new ERR_INVALID_ARG_TYPE("buffer", ["Buffer", "TypedArray", "DataView", "string"], buffer); + } else { + list = [buffer]; + } + } else if (!(list = fixBufferList(buffer))) { + throw new ERR_INVALID_ARG_TYPE("buffer list arguments", ["Buffer", "TypedArray", "DataView", "string"], buffer); + } + + if (!connected) port = validatePort(port, "Port", false); + + // Normalize callback so it's either a function or undefined but not anything + // else. + if (typeof callback !== "function") callback = undefined; + + if (typeof address === "function") { + callback = address; + address = undefined; + } else if (address != null) { + validateString(address, "address"); + } + + if (state.bindState === BIND_STATE_UNBOUND) this.bind({ port: 0, exclusive: true }, null); + + if (list.length === 0) list.push(Buffer.alloc(0)); + + // If the socket hasn't been bound yet, push the outbound packet onto the + // send queue and send after binding is complete. + if (state.bindState !== BIND_STATE_BOUND) { + enqueue(this, FunctionPrototypeBind(this.send, this, list, port, address, callback)); + return; + } + + const afterDns = (ex, ip) => { + defaultTriggerAsyncIdScope(this[async_id_symbol], doSend, ex, this, ip, list, address, port, callback); + }; + + if (!connected) { + state.handle.lookup(address, afterDns); + } else { + afterDns(null, null); + } +}; + +function doSend(ex, self, ip, list, address, port, callback) { + const state = self[kStateSymbol]; + + if (ex) { + if (typeof callback === "function") { + process.nextTick(callback, ex); + return; + } + + process.nextTick(() => self.emit("error", ex)); + return; + } + if (!state.handle) { + return; + } + const socket = state.handle.socket; + if (!socket) { + return; + } + + let err = null; + let success = false; + const data = Buffer.concat(list); + try { + if (port) { + success = socket.send(data, port, ip); + } else { + success = socket.send(data); + } + } catch (e) { + err = e; + } + // TODO check if this makes sense + if (callback) { + if (err) { + process.nextTick(callback, err); + } else { + const sent = success ? data.length : 0; + process.nextTick(callback, null, sent); + } + } + + /* + const req = new SendWrap(); + req.list = list; // Keep reference alive. + req.address = address; + req.port = port; + if (callback) { + req.callback = callback; + req.oncomplete = afterSend; + } + + let err; + if (port) + err = state.handle.send(req, list, list.length, port, ip, !!callback); + else + err = state.handle.send(req, list, list.length, !!callback); + + if (err >= 1) { + // Synchronous finish. The return code is msg_length + 1 so that we can + // distinguish between synchronous success and asynchronous success. + if (callback) + process.nextTick(callback, null, err - 1); + return; + } + + if (err && callback) { + // Don't emit as error, dgram_legacy.js compatibility + const ex = new ExceptionWithHostPort(err, 'send', address, port); + process.nextTick(callback, ex); + } + */ +} + +/* +function afterSend(err, sent) { + if (err) { + err = new ExceptionWithHostPort(err, 'send', this.address, this.port); + } else { + err = null; + } + + this.callback(err, sent); +} +*/ + +Socket.prototype.close = function (callback) { + const state = this[kStateSymbol]; + const queue = state.queue; + + if (typeof callback === "function") this.on("close", callback); + + if (queue !== undefined) { + queue.push(FunctionPrototypeBind(this.close, this)); + return this; + } + + state.receiving = false; + state.handle.socket?.close(); + state.handle.socket = undefined; + defaultTriggerAsyncIdScope(this[async_id_symbol], process.nextTick, socketCloseNT, this); + + return this; +}; + +Socket.prototype[SymbolAsyncDispose] = async function () { + if (!this[kStateSymbol].handle.socket) { + return; + } + return new Promise((resolve, reject) => { + this.close(err => { + if (err) { + reject(err); + } else { + resolve(); + } + }); + }); +}; + +function socketCloseNT(self) { + self.emit("close"); +} + +Socket.prototype.address = function () { + const addr = this[kStateSymbol].handle.socket?.address; + if (!addr) throw new ERR_SOCKET_DGRAM_NOT_RUNNING(); + return addr; +}; + +Socket.prototype.remoteAddress = function () { + const state = this[kStateSymbol]; + const socket = state.handle.socket; + + if (!socket) throw new ERR_SOCKET_DGRAM_NOT_RUNNING(); + + if (state.connectState !== CONNECT_STATE_CONNECTED) throw new ERR_SOCKET_DGRAM_NOT_CONNECTED(); + + if (!socket.remoteAddress) throw new ERR_SOCKET_DGRAM_NOT_CONNECTED(); + + return socket.remoteAddress; +}; + +Socket.prototype.setBroadcast = function (arg) { + throwNotImplemented("setBroadcast", 10381); + /* + const err = this[kStateSymbol].handle.setBroadcast(arg ? 1 : 0); + if (err) { + throw new ErrnoException(err, 'setBroadcast'); + } + */ +}; + +Socket.prototype.setTTL = function (ttl) { + throwNotImplemented("setTTL", 10381); + /* + validateNumber(ttl, 'ttl'); + + const err = this[kStateSymbol].handle.setTTL(ttl); + if (err) { + throw new ErrnoException(err, 'setTTL'); + } + + return ttl; + */ +}; + +Socket.prototype.setMulticastTTL = function (ttl) { + throwNotImplemented("setMulticastTTL", 10381); + /* + validateNumber(ttl, 'ttl'); + + const err = this[kStateSymbol].handle.setMulticastTTL(ttl); + if (err) { + throw new ErrnoException(err, 'setMulticastTTL'); + } + + return ttl; + */ +}; + +Socket.prototype.setMulticastLoopback = function (arg) { + throwNotImplemented("setMulticastLoopback", 10381); + /* + const err = this[kStateSymbol].handle.setMulticastLoopback(arg ? 1 : 0); + if (err) { + throw new ErrnoException(err, 'setMulticastLoopback'); + } + + return arg; // 0.4 compatibility + */ +}; + +Socket.prototype.setMulticastInterface = function (interfaceAddress) { + throwNotImplemented("setMulticastInterface", 10381); + /* + validateString(interfaceAddress, 'interfaceAddress'); + + const err = this[kStateSymbol].handle.setMulticastInterface(interfaceAddress); + if (err) { + throw new ErrnoException(err, 'setMulticastInterface'); + } + */ +}; + +Socket.prototype.addMembership = function (multicastAddress, interfaceAddress) { + throwNotImplemented("addMembership", 10381); + /* + if (!multicastAddress) { + throw new ERR_MISSING_ARGS('multicastAddress'); + } + + const { handle } = this[kStateSymbol]; + const err = handle.addMembership(multicastAddress, interfaceAddress); + if (err) { + throw new ErrnoException(err, 'addMembership'); + } + */ +}; + +Socket.prototype.dropMembership = function (multicastAddress, interfaceAddress) { + throwNotImplemented("dropMembership", 10381); + /* + if (!multicastAddress) { + throw new ERR_MISSING_ARGS('multicastAddress'); + } + + const { handle } = this[kStateSymbol]; + const err = handle.dropMembership(multicastAddress, interfaceAddress); + if (err) { + throw new ErrnoException(err, 'dropMembership'); + } + */ +}; + +Socket.prototype.addSourceSpecificMembership = function (sourceAddress, groupAddress, interfaceAddress) { + throwNotImplemented("addSourceSpecificMembership", 10381); + /* + validateString(sourceAddress, 'sourceAddress'); + validateString(groupAddress, 'groupAddress'); + + const err = + this[kStateSymbol].handle.addSourceSpecificMembership(sourceAddress, + groupAddress, + interfaceAddress); + if (err) { + throw new ErrnoException(err, 'addSourceSpecificMembership'); + } + */ +}; + +Socket.prototype.dropSourceSpecificMembership = function (sourceAddress, groupAddress, interfaceAddress) { + throwNotImplemented("dropSourceSpecificMembership", 10381); + /* + validateString(sourceAddress, 'sourceAddress'); + validateString(groupAddress, 'groupAddress'); + + const err = + this[kStateSymbol].handle.dropSourceSpecificMembership(sourceAddress, + groupAddress, + interfaceAddress); + if (err) { + throw new ErrnoException(err, 'dropSourceSpecificMembership'); + } + */ +}; + +Socket.prototype.ref = function () { + const socket = this[kStateSymbol].handle?.socket; + + if (socket) socket.ref(); + + return this; +}; + +Socket.prototype.unref = function () { + const socket = this[kStateSymbol].handle?.socket; + + if (socket) socket.unref(); + + return this; +}; + +Socket.prototype.setRecvBufferSize = function (size) { + bufferSize(this, size, RECV_BUFFER); +}; + +Socket.prototype.setSendBufferSize = function (size) { + bufferSize(this, size, SEND_BUFFER); +}; + +Socket.prototype.getRecvBufferSize = function () { + return bufferSize(this, 0, RECV_BUFFER); +}; + +Socket.prototype.getSendBufferSize = function () { + return bufferSize(this, 0, SEND_BUFFER); +}; + +Socket.prototype.getSendQueueSize = function () { + return 0; + // return this[kStateSymbol].handle.getSendQueueSize(); +}; + +Socket.prototype.getSendQueueCount = function () { + return 0; + // return this[kStateSymbol].handle.getSendQueueCount(); +}; + +// Deprecated private APIs. +/* +ObjectDefineProperty(Socket.prototype, '_handle', { + __proto__: null, + get: deprecate(function() { + return this[kStateSymbol].handle; + }, 'Socket.prototype._handle is deprecated', 'DEP0112'), + set: deprecate(function(val) { + this[kStateSymbol].handle = val; + }, 'Socket.prototype._handle is deprecated', 'DEP0112'), +}); + + +ObjectDefineProperty(Socket.prototype, '_receiving', { + __proto__: null, + get: deprecate(function() { + return this[kStateSymbol].receiving; + }, 'Socket.prototype._receiving is deprecated', 'DEP0112'), + set: deprecate(function(val) { + this[kStateSymbol].receiving = val; + }, 'Socket.prototype._receiving is deprecated', 'DEP0112'), +}); + + +ObjectDefineProperty(Socket.prototype, '_bindState', { + __proto__: null, + get: deprecate(function() { + return this[kStateSymbol].bindState; + }, 'Socket.prototype._bindState is deprecated', 'DEP0112'), + set: deprecate(function(val) { + this[kStateSymbol].bindState = val; + }, 'Socket.prototype._bindState is deprecated', 'DEP0112'), +}); + + +ObjectDefineProperty(Socket.prototype, '_queue', { + __proto__: null, + get: deprecate(function() { + return this[kStateSymbol].queue; + }, 'Socket.prototype._queue is deprecated', 'DEP0112'), + set: deprecate(function(val) { + this[kStateSymbol].queue = val; + }, 'Socket.prototype._queue is deprecated', 'DEP0112'), +}); + + +ObjectDefineProperty(Socket.prototype, '_reuseAddr', { + __proto__: null, + get: deprecate(function() { + return this[kStateSymbol].reuseAddr; + }, 'Socket.prototype._reuseAddr is deprecated', 'DEP0112'), + set: deprecate(function(val) { + this[kStateSymbol].reuseAddr = val; + }, 'Socket.prototype._reuseAddr is deprecated', 'DEP0112'), +}); + + +Socket.prototype._healthCheck = deprecate(function() { + healthCheck(this); +}, 'Socket.prototype._healthCheck() is deprecated', 'DEP0112'); + + +Socket.prototype._stopReceiving = deprecate(function() { + stopReceiving(this); +}, 'Socket.prototype._stopReceiving() is deprecated', 'DEP0112'); + +function _createSocketHandle(address, port, addressType, fd, flags) { + const handle = newHandle(addressType); + let err; + + if (isInt32(fd) && fd > 0) { + const type = guessHandleType(fd); + if (type !== 'UDP') { + err = UV_EINVAL; + } else { + err = handle.open(fd); + } + } else if (port || address) { + err = handle.bind(address, port || 0, flags); + } + + if (err) { + handle.close(); + return err; + } + + return handle; +} + + +// Legacy alias on the C++ wrapper object. This is not public API, so we may +// want to runtime-deprecate it at some point. There's no hurry, though. +ObjectDefineProperty(UDP.prototype, 'owner', { + __proto__: null, + get() { return this[owner_symbol]; }, + set(v) { return this[owner_symbol] = v; }, +}); +*/ + +export default { + /* + _createSocketHandle: deprecate( + _createSocketHandle, + 'dgram._createSocketHandle() is deprecated', + 'DEP0112', + ), + */ + createSocket, + Socket, +}; diff --git a/src/jsc.zig b/src/jsc.zig index f6122942db..da30bd4960 100644 --- a/src/jsc.zig +++ b/src/jsc.zig @@ -46,6 +46,7 @@ pub const API = struct { pub const MatchedRoute = @import("./bun.js/api/filesystem_router.zig").MatchedRoute; pub const TCPSocket = @import("./bun.js/api/bun/socket.zig").TCPSocket; pub const TLSSocket = @import("./bun.js/api/bun/socket.zig").TLSSocket; + pub const UDPSocket = @import("./bun.js/api/bun/udp_socket.zig").UDPSocket; pub const Listener = @import("./bun.js/api/bun/socket.zig").Listener; pub const H2FrameParser = @import("./bun.js/api/bun/h2_frame_parser.zig").H2FrameParser; }; diff --git a/src/string.zig b/src/string.zig index 5172be01f1..d36e7139f0 100644 --- a/src/string.zig +++ b/src/string.zig @@ -158,6 +158,17 @@ pub const WTFStringImplStruct = extern struct { ); } + pub fn toOwnedSliceZ(this: WTFStringImpl, allocator: std.mem.Allocator) [:0]u8 { + if (this.is8Bit()) { + if (bun.strings.toUTF8FromLatin1Z(allocator, this.latin1Slice()) catch bun.outOfMemory()) |utf8| { + return utf8.items[0 .. utf8.items.len - 1 :0]; + } + + return allocator.dupeZ(u8, this.latin1Slice()) catch bun.outOfMemory(); + } + return bun.strings.toUTF8AllocZ(allocator, this.utf16Slice()) catch bun.outOfMemory(); + } + pub fn toUTF8IfNeeded(this: WTFStringImpl, allocator: std.mem.Allocator) ?ZigString.Slice { if (this.is8Bit()) { if (bun.strings.toUTF8FromLatin1(allocator, this.latin1Slice()) catch bun.outOfMemory()) |utf8| { diff --git a/src/string_immutable.zig b/src/string_immutable.zig index 38c01c4d80..3c41274e46 100644 --- a/src/string_immutable.zig +++ b/src/string_immutable.zig @@ -995,6 +995,13 @@ pub fn toUTF8Alloc(allocator: std.mem.Allocator, js: []const u16) ![]u8 { return try toUTF8AllocWithType(allocator, []const u16, js); } +pub fn toUTF8AllocZ(allocator: std.mem.Allocator, js: []const u16) ![:0]u8 { + var list = std.ArrayList(u8).init(allocator); + try toUTF8AppendToList(&list, js); + try list.append(0); + return list.items[0 .. list.items.len - 1 :0]; +} + pub inline fn appendUTF8MachineWordToUTF16MachineWord(output: *[@sizeOf(usize) / 2]u16, input: *const [@sizeOf(usize) / 2]u8) void { output[0 .. @sizeOf(usize) / 2].* = @as( [4]u16, @@ -1859,6 +1866,19 @@ pub fn toUTF8FromLatin1(allocator: std.mem.Allocator, latin1: []const u8) !?std. return try allocateLatin1IntoUTF8WithList(list, 0, []const u8, latin1); } +pub fn toUTF8FromLatin1Z(allocator: std.mem.Allocator, latin1: []const u8) !?std.ArrayList(u8) { + if (bun.JSC.is_bindgen) + unreachable; + + if (isAllASCII(latin1)) + return null; + + const list = try std.ArrayList(u8).initCapacity(allocator, latin1.len + 1); + var list1 = try allocateLatin1IntoUTF8WithList(list, 0, []const u8, latin1); + try list1.append(0); + return list1; +} + pub fn toUTF8ListWithTypeBun(list: *std.ArrayList(u8), comptime Type: type, utf16: Type) !std.ArrayList(u8) { var utf16_remaining = utf16; diff --git a/src/sys.zig b/src/sys.zig index 6d98986145..5f3e345e5c 100644 --- a/src/sys.zig +++ b/src/sys.zig @@ -67,6 +67,7 @@ pub const Tag = enum(u8) { dup, access, + connect, chmod, chown, clonefile, @@ -113,6 +114,7 @@ pub const Tag = enum(u8) { recv, send, sendfile, + sendmmsg, splice, rmdir, truncate, diff --git a/test/js/bun/udp/dgram.test.ts b/test/js/bun/udp/dgram.test.ts new file mode 100644 index 0000000000..44184bddcb --- /dev/null +++ b/test/js/bun/udp/dgram.test.ts @@ -0,0 +1,185 @@ +import { createSocket } from "dgram"; +import { describe, test, expect, it } from "bun:test"; + +import { nodeDataCases } from "./testdata"; + +describe("createSocket()", () => { + test("connect", done => { + const PORT = 12345; + const client = createSocket("udp4"); + client.on("close", done); + + client.connect(PORT, () => { + const remoteAddr = client.remoteAddress(); + expect(remoteAddr.port).toBe(PORT); + expect(() => client.connect(PORT)).toThrow(); + + client.disconnect(); + expect(() => client.disconnect()).toThrow(); + + expect(() => client.remoteAddress()).toThrow(); + + client.once("connect", () => client.close()); + client.connect(PORT); + }); + }); + + test("IPv4 address", done => { + const socket = createSocket("udp4"); + + socket.on("listening", () => { + const address = socket.address(); + + expect(address.address).toBe("127.0.0.1"); + expect(address.port).toBeNumber(); + expect(address.port).toBeFinite(); + expect(address.port).toBeGreaterThan(0); + expect(address.family).toBe("IPv4"); + socket.close(done); + }); + + socket.on("error", err => { + socket.close(done); + expect(err).toBeNull(); + }); + + socket.bind(0, "127.0.0.1"); + }); + + test("IPv6 address", done => { + const socket = createSocket("udp6"); + const localhost = "::1"; + + socket.on("listening", () => { + const address = socket.address(); + + expect(address.address).toBe(localhost); + expect(address.port).toBeNumber(); + expect(address.port).toBeFinite(); + expect(address.port).toBeGreaterThan(0); + expect(address.family).toBe("IPv6"); + socket.close(done); + }); + + socket.on("error", err => { + socket.close(done); + expect(err).toBeNull(); + }); + + socket.bind(0, localhost); + }); + + const validateRecv = (server, data, rinfo, bytes) => { + expect(data).toHaveLength(bytes.length); + expect(data).toStrictEqual(Buffer.from(bytes)); + expect(rinfo.port).toBeInteger(); + expect(rinfo.port).toBeWithin(1, 65535 + 1); + expect(rinfo.address).toBeString(); + expect(rinfo.address).not.toBeEmpty(); + expect(rinfo.port).not.toBe(server.address().port); + }; + + for (const { label, data, bytes } of nodeDataCases) { + test(`send ${label}`, done => { + const client = createSocket("udp4"); + const closed = { closed: false }; + client.on("close", () => { + closed.closed = true; + }); + const server = createSocket("udp4"); + client.on("error", err => { + expect(err).toBeNull(); + }); + server.on("error", err => { + expect(err).toBeNull(); + }); + server.on("message", (data, rinfo) => { + validateRecv(server, data, rinfo, bytes); + + server.close(); + client.close(); + done(); + }); + function sendRec() { + if (!closed.closed) { + client.send(data, server.address().port, "127.0.0.1", () => { + setTimeout(sendRec, 100); + }); + } + } + server.on("listening", () => { + sendRec(); + }); + server.bind(); + }); + + test(`send connected ${label}`, done => { + const client = createSocket("udp4"); + const closed = { closed: false }; + client.on("close", () => { + closed.closed = true; + }); + const server = createSocket("udp4"); + client.on("error", err => { + expect(err).toBeNull(); + }); + server.on("error", err => { + expect(err).toBeNull(); + }); + server.on("message", (data, rinfo) => { + validateRecv(server, data, rinfo, bytes); + + server.close(); + client.close(); + done(); + }); + function sendRec() { + if (!closed.closed) { + client.send(data, () => { + setTimeout(sendRec, 100); + }); + } + } + server.on("listening", () => { + const addr = server.address(); + client.connect(addr.port, "127.0.0.1", () => { + sendRec(); + }); + }); + server.bind(); + }); + + test(`send array ${label}`, done => { + const client = createSocket("udp4"); + const closed = { closed: false }; + client.on("close", () => { + closed.closed = true; + }); + const server = createSocket("udp4"); + client.on("error", err => { + expect(err).toBeNull(); + }); + server.on("error", err => { + expect(err).toBeNull(); + }); + server.on("message", (data, rinfo) => { + validateRecv(server, data, rinfo, [bytes, bytes, bytes].flat()); + + server.close(); + client.close(); + done(); + }); + function sendRec() { + if (!closed.closed) { + client.send([data, data, data], server.address().port, "127.0.0.1", () => { + setTimeout(sendRec, 100); + }); + } + } + server.on("listening", () => { + sendRec(); + }); + server.bind(); + }); + } +}); diff --git a/test/js/bun/udp/testdata.ts b/test/js/bun/udp/testdata.ts new file mode 100644 index 0000000000..157867feec --- /dev/null +++ b/test/js/bun/udp/testdata.ts @@ -0,0 +1,79 @@ +export const nodeDataTypes = [ + { + binaryType: "buffer", + type: Buffer, + }, + { + binaryType: "uint8array", + type: Uint8Array, + }, +]; + +export const dataTypes = [ + ...nodeDataTypes, + { + binaryType: undefined, + type: Buffer, + }, + { + binaryType: "arraybuffer", + type: ArrayBuffer, + }, +]; + +export const nodeDataCases = [ + { + label: "string (ascii)", + data: "ascii", + bytes: [0x61, 0x73, 0x63, 0x69, 0x69], + }, + { + label: "string (latin1)", + data: "latin1-©", + bytes: [0x6c, 0x61, 0x74, 0x69, 0x6e, 0x31, 0x2d, 0xc2, 0xa9], + }, + { + label: "string (utf-8)", + data: "utf8-😶", + bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0x98, 0xb6], + }, + { + label: "string (empty)", + data: "", + bytes: [], + }, + { + label: "Uint8Array (utf-8)", + data: new TextEncoder().encode("utf8-🙂"), + bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0x99, 0x82], + }, + { + label: "Uint8Array (empty)", + data: new Uint8Array(), + bytes: [], + }, + { + label: "Buffer (utf-8)", + data: Buffer.from("utf8-🤩"), + bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0xa4, 0xa9], + }, + { + label: "Buffer (empty)", + data: Buffer.from([]), + bytes: [], + }, +]; + +export const dataCases = [ + ...nodeDataCases, + { + label: "ArrayBuffer (utf-8)", + data: new TextEncoder().encode("utf8-🙃").buffer, + bytes: [0x75, 0x74, 0x66, 0x38, 0x2d, 0xf0, 0x9f, 0x99, 0x83], + }, + { + label: "ArrayBuffer (empty)", + data: new ArrayBuffer(0), + bytes: [], + }, +]; diff --git a/test/js/bun/udp/udp_socket.test.ts b/test/js/bun/udp/udp_socket.test.ts new file mode 100644 index 0000000000..8775fdc25c --- /dev/null +++ b/test/js/bun/udp/udp_socket.test.ts @@ -0,0 +1,215 @@ +import { udpSocket } from "bun"; +import { describe, test, expect, it } from "bun:test"; +import { randomPort } from "harness"; +import { dataTypes, dataCases } from "./testdata"; + +describe("udpSocket()", () => { + test("can create a socket", async () => { + const socket = await udpSocket({}); + expect(socket).toBeInstanceOf(Object); + expect(socket.port).toBeInteger(); + expect(socket.port).toBeWithin(1, 65535 + 1); + expect(socket.port).toBe(socket.port); // test that property is cached + expect(socket.hostname).toBeString(); + expect(socket.hostname).toBe(socket.hostname); // test that property is cached + expect(socket.address).toEqual({ + address: socket.hostname, + family: socket.hostname === "::" ? "IPv6" : "IPv4", + port: socket.port, + }); + expect(socket.address).toBe(socket.address); // test that property is cached + expect(socket.binaryType).toBe("buffer"); + expect(socket.binaryType).toBe(socket.binaryType); // test that property is cached + expect(socket.ref).toBeFunction(); + expect(socket.unref).toBeFunction(); + expect(socket.send).toBeFunction(); + expect(socket.close).toBeFunction(); + socket.close(); + }); + + test("can create a socket with given port", async () => { + const port = randomPort(); + const socket = await udpSocket({ port }); + expect(socket.port).toBe(port); + expect(socket.address).toMatchObject({ port: socket.port }); + socket.close(); + }); + + test("can create a socket with a random port", async () => { + const socket = await udpSocket({ port: 0 }); + expect(socket.port).toBeInteger(); + expect(socket.port).toBeWithin(1, 65535 + 1); + expect(socket.address).toMatchObject({ port: socket.port }); + socket.close(); + }); + + describe.each([{ hostname: "localhost" }, { hostname: "127.0.0.1" }, { hostname: "::1" }])( + "can create a socket with given hostname", + ({ hostname }) => { + test(hostname, async () => { + const socket = await udpSocket({ hostname }); + expect(socket.hostname).toBe(hostname); + expect(socket.port).toBeInteger(); + expect(socket.port).toBeWithin(1, 65535 + 1); + expect(socket.address).toMatchObject({ port: socket.port }); + socket.close(); + }); + }, + ); + + const validateRecv = (socket, data, port, address, binaryType, bytes) => { + expect(socket).toBeInstanceOf(Object); + expect(socket.binaryType).toBe(binaryType || "buffer"); + expect(data).toHaveLength(bytes.length); + if (data instanceof ArrayBuffer) { + expect(new Uint8Array(data)).toStrictEqual(new Uint8Array(bytes)); + } else { + expect(Buffer.from(data)).toStrictEqual(Buffer.from(bytes)); + } + expect(port).toBeInteger(); + expect(port).toBeWithin(1, 65535 + 1); + expect(port).not.toBe(socket.port); + expect(address).toBeString(); + expect(address).not.toBeEmpty(); + }; + + const validateSend = res => { + expect(res).toBeBoolean(); + }; + + const validateSendMany = (res, count) => { + expect(res).toBeNumber(); + expect(res).toBeGreaterThanOrEqual(0); + expect(res).toBeLessThanOrEqual(count); + }; + + for (const { binaryType, type } of dataTypes) { + for (const { label, data, bytes } of dataCases) { + test(`send ${label} (${binaryType || "undefined"})`, async done => { + const client = await udpSocket({}); + const server = await udpSocket({ + binaryType: binaryType, + socket: { + data(socket, data, port, address) { + validateRecv(socket, data, port, address, binaryType, bytes); + + server.close(); + client.close(); + done(); + }, + }, + }); + + // handle unreliable transmission in UDP + function sendRec() { + if (!client.closed) { + validateSend(client.send(data, server.port, "127.0.0.1")); + setTimeout(sendRec, 10); + } + } + sendRec(); + }); + + test(`send connected ${label} (${binaryType || "undefined"})`, async done => { + let client; + const server = await udpSocket({ + binaryType: binaryType, + socket: { + data(socket, data, port, address) { + validateRecv(socket, data, port, address, binaryType, bytes); + + server.close(); + client.close(); + done(); + }, + }, + }); + client = await udpSocket({ + connect: { + port: server.port, + hostname: "127.0.0.1", + }, + }); + + // handle unreliable transmission in UDP + function sendRec() { + if (!client.closed) { + validateSend(client.send(data)); + setTimeout(sendRec, 10); + } + } + sendRec(); + }); + + test(`sendMany ${label} (${binaryType || "undefined"})`, async done => { + const client = await udpSocket({}); + let count = 0; + const server = await udpSocket({ + binaryType: binaryType, + socket: { + data(socket, data, port, address) { + validateRecv(socket, data, port, address, binaryType, bytes); + + count += 1; + if (count === 100) { + server.close(); + client.close(); + done(); + } + }, + }, + }); + + const payload = Array(100).fill([data, server.port, "127.0.0.1"]).flat(); + + // handle unreliable transmission in UDP + function sendRec() { + if (!client.closed) { + validateSendMany(client.sendMany(payload), 100); + setTimeout(sendRec, 10); + } + } + sendRec(); + }); + + test(`sendMany connected ${label} (${binaryType || "undefined"})`, async done => { + // const client = await udpSocket({}); + let client; + let count = 0; + const server = await udpSocket({ + binaryType: binaryType, + socket: { + data(socket, data, port, address) { + validateRecv(socket, data, port, address, binaryType, bytes); + + count += 1; + if (count === 100) { + server.close(); + client.close(); + done(); + } + }, + }, + }); + + client = await udpSocket({ + connect: { + port: server.port, + hostname: "127.0.0.1", + }, + }); + + const payload = Array(100).fill(data); + + // handle unreliable transmission in UDP + function sendRec() { + if (!client.closed) { + validateSendMany(client.sendMany(payload), 100); + setTimeout(sendRec, 10); + } + } + sendRec(); + }); + } + } +});