diff --git a/src/bun_dev_http_server.zig b/src/bun_dev_http_server.zig index 37eca79204..0732c460cf 100644 --- a/src/bun_dev_http_server.zig +++ b/src/bun_dev_http_server.zig @@ -3644,6 +3644,7 @@ pub const Server = struct { var req = picohttp.Request.parse(req_buf_node.data[0..read_size], &req_headers_buf) catch |err| { _ = conn.client.write(RequestContext.printStatusLine(400) ++ "\r\n\r\n", SOCKET_FLAGS) catch {}; _ = Syscall.close(conn.client.socket.fd); + Output.printErrorln("ERR: {s}", .{@errorName(err)}); return; }; diff --git a/src/hive_array.zig b/src/hive_array.zig new file mode 100644 index 0000000000..60eb4f0265 --- /dev/null +++ b/src/hive_array.zig @@ -0,0 +1,94 @@ +const std = @import("std"); +const assert = std.debug.assert; +const mem = std.mem; +const testing = std.testing; + +/// An array that efficiently tracks which elements are in use. +/// The pointers are intended to be stable +/// Sorta related to https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2021/p0447r15.html +pub fn HiveArray(comptime T: type, comptime capacity: u16) type { + return struct { + const Self = @This(); + buffer: [capacity]T = undefined, + available: std.bit_set.IntegerBitSet(capacity) = std.bit_set.IntegerBitSet(capacity).initFull(), + pub const size = capacity; + + pub fn init() Self { + return .{}; + } + + pub fn get(self: *Self) ?*T { + const index = self.available.findFirstSet() orelse return null; + self.available.unset(index); + return &self.buffer[index]; + } + + pub fn indexOf(self: *const Self, value: *const T) ?u63 { + const start = &self.buffer; + const end = @ptrCast([*]const T, start) + capacity; + if (!(@ptrToInt(value) >= @ptrToInt(start) and @ptrToInt(value) < @ptrToInt(end))) + return null; + + // aligned to the size of T + const index = (@ptrToInt(value) - @ptrToInt(start)) / @sizeOf(T); + assert(index < capacity); + return @truncate(u63, index); + } + + pub fn in(self: *const Self, value: *const T) bool { + const start = &self.buffer; + const end = @ptrCast([*]const T, start) + capacity; + return (@ptrToInt(value) >= @ptrToInt(start) and @ptrToInt(value) < @ptrToInt(end)); + } + + pub fn put(self: *Self, value: *T) bool { + const index = self.indexOf(value) orelse return false; + + assert(!self.available.isSet(index)); + assert(&self.buffer[index] == value); + + value.* = undefined; + + self.available.set(index); + return true; + } + }; +} + +test "HiveArray" { + const size = 64; + + // Choose an integer with a weird alignment + const Int = u127; + + var a = HiveArray(Int, size).init(); + + { + var b = a.get().?; + try testing.expect(a.get().? != b); + try testing.expectEqual(a.indexOf(b), 0); + try testing.expect(a.put(b)); + try testing.expect(a.get().? == b); + var c = a.get().?; + c.* = 123; + var d: Int = 12345; + try testing.expect(a.put(&d) == false); + try testing.expect(a.in(&d) == false); + } + + a.available = @TypeOf(a.available).initFull(); + { + var i: u63 = 0; + while (i < size) { + var b = a.get().?; + try testing.expectEqual(a.indexOf(b), i); + try testing.expect(a.put(b)); + try testing.expect(a.get().? == b); + i = i + 1; + } + i = 0; + while (i < size) : (i += 1) { + try testing.expect(a.get() == null); + } + } +} diff --git a/src/http_server.zig b/src/http_server.zig new file mode 100644 index 0000000000..90eaf0f2b6 --- /dev/null +++ b/src/http_server.zig @@ -0,0 +1,207 @@ +const std = @import("std"); +const bun = @import("global.zig"); +const string = bun.string; +const Output = bun.Output; +const Global = bun.Global; +const Environment = bun.Environment; +const strings = bun.strings; +const MutableString = bun.MutableString; +const FeatureFlags = bun.FeatureFlags; +const picohttp = @import("picohttp"); +const Header = picohttp.Header; +const IncomingRequest = picohttp.Request; +const StaticResponse = picohttp.Response; +pub const Headers = picohttp.Headers; +pub const MimeType = @import("./http/mime_type.zig"); +const Syscall = @import("./bun.js/node/syscall.zig"); +const HiveArray = @import("./hive_array.zig").HiveArray; +const JSC = @import("./jsc.zig"); +const ObjectPool = @import("./pool.zig").ObjectPool; + +const log = Output.scoped(.HTTPServer, false); + +const ServerConfig = @import("./bun.js/api/server.zig").ServerConfig; +const AsyncIO = @import("io"); +pub const constants = struct { + pub const OPEN_SOCKET_FLAGS = std.os.SOCK.CLOEXEC; + pub const PADDING = 64; + pub const RECV_BUFFER_LENGTH = (1024 * 512) - (PADDING * 2); + pub const SOCKET_BACKLOG = 1024; +}; + +const FallbackBuffer = std.BoundedArray(u8, 16384); +const FallbackBufferPool = ObjectPool(FallbackBuffer, null, false, 256); + +const SocketList = HiveArray(Socket, constants.SOCKET_BACKLOG); + +const fd_t = JSC.Node.FileDescriptor; + +pub const Server = struct { + recv_buffer_bytes: [constants.RECV_BUFFER_LENGTH]u8 align(constants.PADDING) = undefined, + recv_buffer: []u8 = &.{}, + listener: fd_t, + accept_completion: AsyncIO.Completion = undefined, + accept_connections: bool = true, + sockets: SocketList = SocketList.init(), + + pub fn start(config: ServerConfig) !*Server { + const socket = try AsyncIO.openSocket(std.os.af.INET, constants.OPEN_SOCKET_FLAGS | std.os.SOCK.STREAM, std.os.IPPROTO.TCP); + errdefer std.os.close(socket); + var listener: std.x.net.tcp.Listener = .{ + .socket = .{ + .fd = socket, + }, + }; + listener.setFastOpen(true) catch {}; + listener.setReuseAddress(true) catch {}; + listener.setReusePort(true) catch {}; + listener.setKeepAlive(false) catch {}; + try listener.bind(std.x.net.ip.Address.initIPv4(std.x.os.IPv4.unspecified, config.port)); + var server = try bun.default_allocator.create(Server); + server.* = .{ + .listener = socket, + }; + server.recv_buffer = &server.recv_buffer_bytes; + server.enqueueAccept(); + return server; + } + + pub fn enqueueAccept(server: *Server) void { + AsyncIO.global.accept(*Server, server, onAccept, &server.accept_completion, server.listener); + } + + pub fn onAccept( + this: *Server, + _: *AsyncIO.Completion, + result_: AsyncIO.AcceptError!std.os.socket_t, + ) void { + const fd = result_ catch |err| { + log("onAccept error: {s}", .{@errorName(err)}); + return; + }; + + if (!this.accept_connections) { + log("onAccept closing fd: {d} because accept_connections is false", .{fd}); + std.os.close(fd); + return; + } + + var socket = this.sockets.get() orelse { + log("onAccept closing fd: {d} because no sockets available", .{fd}); + std.os.close(fd); + return; + }; + + socket.* = .{ + .fd = fd, + }; + + socket.enqueueRecv(); + this.enqueueAccept(); + } +}; + +const CompletionSwapper = struct { + first: AsyncIO.Completion = undefined, + second: AsyncIO.Completion = undefined, + which: u1 = 0, + + pub fn get(this: *CompletionSwapper) *AsyncIO.Completion { + if (this.which == 0) { + this.which = 1; + return &this.first; + } else { + this.which = 0; + return &this.second; + } + } +}; + +const request_header_fields_too_large = "431 Request Header Fields Too Large" ++ + "\r\n" ++ + "Connection: close" ++ + "\r\n" ++ + "Server: bun" ++ + "\r\n" ++ + "Content-Type: text/plain" ++ + "\r\n" ++ + "Content-Length: 0" ++ + "\r\n" ++ + "\r\n"; + +const bad_request = "400 Bad Request" ++ + "\r\n" ++ + "Connection: close" ++ + "\r\n" ++ + "Server: bun" ++ + "\r\n" ++ + "Content-Type: text/plain" ++ + "\r\n" ++ + "Content-Length: 0" ++ + "\r\n" ++ + "\r\n"; + +pub const Socket = struct { + fd: fd_t = 0, + read_slice: []u8 = &.{}, + recv_completion: CompletionSwapper = CompletionSwapper{}, + + pub fn enqueueRecv(this: *Socket) void { + this.setTimeout(); + AsyncIO.global.recv(*Socket, this, Socket.onRecv, this.recv_completion.get(), this.fd, this.getNextBuffer()); + } + + pub fn onRecv( + this: *Socket, + completion: *AsyncIO.Completion, + read_: AsyncIO.RecvError!usize, + ) void { + const read = read_ catch |err| { + log("onRecv error: {s}", .{@errorName(err)}); + this.close(); + return; + }; + + if (read == 0) { + log("onRecv disconnected socket", .{}); + this.close(); + return; + } + + this.consume(completion.operation.recv.buf[0..read]); + + var headers: [512]picohttp.Header = undefined; + var data = this.getData() catch |err| { + switch (err) { + error.TooBig => { + log("onRecv TooBig", .{}); + this.server().sendError(this.fd, request_header_fields_too_large); + this.reset(); + return; + }, + } + }; + const request = IncomingRequest.parse(data.slice(), &headers) catch |err| { + switch (err) { + error.BadRequest => { + log("onRecv bad request", .{}); + this.server().sendError(this.fd, bad_request); + this.reset(); + return; + }, + error.ShortRead => { + this.enqueueRecv(); + return; + }, + } + }; + log("onRecv request: {any}", .{request}); + this.cancelTimeout(); + this.server().dispatch(this.fd, request, data); + this.reset(); + } + + pub fn server(this: *Socket) *Server { + return @fieldParentPtr(Server, "sockets", @fieldParentPtr(SocketList, "data", this)); + } +}; diff --git a/src/io/io_darwin.zig b/src/io/io_darwin.zig index 7906dd6550..2498bae57d 100644 --- a/src/io/io_darwin.zig +++ b/src/io/io_darwin.zig @@ -1092,7 +1092,7 @@ pub fn accept( // darwin doesn't support os.MSG.NOSIGNAL, // but instead a socket option to avoid SIGPIPE. - Syscall.setsockopt(fd, os.SOL_SOCKET, os.SO_NOSIGPIPE, &mem.toBytes(@as(c_int, 1))) catch |err| return switch (err) { + Syscall.setsockopt(fd, os.SOL.SOCKET, os.SO.NOSIGPIPE, &mem.toBytes(@as(c_int, 1))) catch |err| return switch (err) { error.TimeoutTooBig => unreachable, error.PermissionDenied => error.NetworkSubsystemFailed, error.AlreadyConnected => error.NetworkSubsystemFailed,