Compare commits

...

10 Commits

Author SHA1 Message Date
Jarred Sumner
1d611e33b4 Update .gitignore 2022-09-01 01:08:06 -07:00
Jarred Sumner
ca9c87f9d7 Merge branch 'jarred/usockets-direction' into jarred/new-http 2022-08-31 22:38:05 -07:00
Jarred Sumner
a626a07ee8 wip 2022-08-31 22:37:27 -07:00
Jarred Sumner
3e37f5a8eb wip 2022-08-31 21:10:23 -07:00
Jarred Sumner
4528b9938f wip 2022-08-31 19:31:02 -07:00
Jarred Sumner
d1ae89f7c9 using usockets 2022-08-31 19:19:31 -07:00
Jarred Sumner
3752da9b49 wip not good yet 2022-08-31 04:12:27 -07:00
Jarred Sumner
34ca3c72f0 it sort of runs 2022-08-30 02:00:05 -07:00
Jarred Sumner
fb98c16866 wip 2022-08-29 03:25:29 -07:00
Jarred Sumner
aced91c3fc Rename http.zig to bun_dev_http_server.zig 2022-08-28 22:16:15 -07:00
27 changed files with 1456 additions and 152 deletions

2
.gitignore vendored
View File

@@ -104,3 +104,5 @@ src/runtime.version
*.database
*.db
misctools/machbench
misctools/toyhttpserver
misctools/toyhttpserver-lite

13
.vscode/launch.json generated vendored
View File

@@ -99,8 +99,17 @@
{
"type": "lldb",
"request": "launch",
"name": "fetch debug",
"program": "${workspaceFolder}/misctools/fetch",
"name": "toyhttpserver debug",
"program": "${workspaceFolder}/misctools/toyhttpserver",
"args": ["https://example.com", "--verbose", "3001"],
"cwd": "${workspaceFolder}",
"console": "internalConsole"
},
{
"type": "lldb",
"request": "launch",
"name": "toyhttpserver lite debug",
"program": "${workspaceFolder}/misctools/toyhttpserver-lite",
"args": ["https://example.com", "--verbose"],
"cwd": "${workspaceFolder}",
"console": "internalConsole"

View File

@@ -794,6 +794,28 @@ httpbench-release: $(IO_FILES)
$(CXX) $(PACKAGE_DIR)/httpbench.o -g $(OPTIMIZATION_LEVEL) -o ./misctools/httpbench $(IO_FILES) $(DEFAULT_LINKER_FLAGS) -lc $(MINIMUM_ARCHIVE_FILES)
rm -rf $(PACKAGE_DIR)/httpbench.o
.PHONY: toyhttpserver-debug
toyhttpserver-debug:
$(ZIG) build toyhttpserver-obj
$(CXX) $(DEBUG_PACKAGE_DIR)/toyhttpserver.o -g $(OPTIMIZATION_LEVEL) -o ./misctools/toyhttpserver $(DEBUG_IO_FILES) $(DEFAULT_LINKER_FLAGS) -lc $(ARCHIVE_FILES)
.PHONY: toyhttpserver
toyhttpserver:
$(ZIG) build -Drelease-fast toyhttpserver-obj
$(CXX) $(PACKAGE_DIR)/toyhttpserver.o -g $(OPTIMIZATION_LEVEL) -o ./misctools/toyhttpserver $(IO_FILES) $(DEFAULT_LINKER_FLAGS) -lc $(ARCHIVE_FILES)
rm -rf $(PACKAGE_DIR)/toyhttpserver.o
.PHONY: toyhttpserver-lite-debug
toyhttpserver-lite-debug:
$(ZIG) build toyhttpserver-lite-obj
$(CXX) $(DEBUG_PACKAGE_DIR)/toyhttpserver-lite.o -g $(OPTIMIZATION_LEVEL) -o ./misctools/toyhttpserver-lite $(DEBUG_IO_FILES) $(DEFAULT_LINKER_FLAGS) -lc $(ARCHIVE_FILES)
.PHONY: toyhttpserver-lite
toyhttpserver-lite:
$(ZIG) build -Drelease-fast toyhttpserver-lite-obj
$(CXX) $(PACKAGE_DIR)/toyhttpserver-lite.o -g $(OPTIMIZATION_LEVEL) -o ./misctools/toyhttpserver-lite $(IO_FILES) $(DEFAULT_LINKER_FLAGS) -lc $(ARCHIVE_FILES)
rm -rf $(PACKAGE_DIR)/toyhttpserver-lite.o
.PHONY: check-glibc-version-dependency
check-glibc-version-dependency:
@objdump -T $(RELEASE_BUN) | ((grep -qe "GLIBC_2.3[0-9]") && { echo "Glibc 2.3X detected, this will break the binary"; exit 1; }) || true

View File

@@ -535,6 +535,50 @@ pub fn build(b: *std.build.Builder) !void {
defer headers_step.dependOn(&headers_obj.step);
try configureObjectStep(b, headers_obj, target, obj.main_pkg_path.?);
var opts = b.addOptions();
opts.addOption(
bool,
"bindgen",
true,
);
opts.addOption(
bool,
"baseline",
is_baseline,
);
opts.addOption([:0]const u8, "sha", git_sha);
opts.addOption(bool, "is_canary", is_canary);
headers_obj.addOptions("build_options", opts);
}
{
const headers_step = b.step("toyhttpserver-obj", "Build ToyHTTP bench");
var headers_obj: *std.build.LibExeObjStep = b.addObject("toyhttpserver", "src/http_server.zig");
defer headers_step.dependOn(&headers_obj.step);
try configureObjectStep(b, headers_obj, target, obj.main_pkg_path.?);
var opts = b.addOptions();
opts.addOption(
bool,
"bindgen",
true,
);
opts.addOption(
bool,
"baseline",
is_baseline,
);
opts.addOption([:0]const u8, "sha", git_sha);
opts.addOption(bool, "is_canary", is_canary);
headers_obj.addOptions("build_options", opts);
}
{
const headers_step = b.step("toyhttpserver-lite-obj", "Build ToyHTTP Server Single-Threaded");
var headers_obj: *std.build.LibExeObjStep = b.addObject("toyhttpserver-lite", "src/http_server.zig");
defer headers_step.dependOn(&headers_obj.step);
try configureObjectStep(b, headers_obj, target, obj.main_pkg_path.?);
var opts = b.addOptions();
opts.addOption(
bool,
"bindgen",
@@ -546,6 +590,7 @@ pub fn build(b: *std.build.Builder) !void {
"baseline",
is_baseline,
);
opts.addOption(bool, "toy_single_threaded_http_server", true);
opts.addOption([:0]const u8, "sha", git_sha);
opts.addOption(bool, "is_canary", is_canary);
headers_obj.addOptions("build_options", opts);

View File

@@ -25,7 +25,7 @@ const js_printer = @import("../../js_printer.zig");
const js_parser = @import("../../js_parser.zig");
const js_ast = @import("../../js_ast.zig");
const hash_map = @import("../../hash_map.zig");
const http = @import("../../http.zig");
const http = @import("../../bun_dev_http_server.zig");
const NodeFallbackModules = @import("../../node_fallbacks.zig");
const ImportKind = ast.ImportKind;
const Analytics = @import("../../analytics/analytics_thread.zig");

View File

@@ -25,7 +25,7 @@ const js_printer = @import("../../js_printer.zig");
const js_parser = @import("../../js_parser.zig");
const js_ast = @import("../../js_ast.zig");
const hash_map = @import("../../hash_map.zig");
const http = @import("../../http.zig");
const http = @import("../../bun_dev_http_server.zig");
const NodeFallbackModules = @import("../../node_fallbacks.zig");
const ImportKind = ast.ImportKind;
const Analytics = @import("../../analytics/analytics_thread.zig");

View File

@@ -1,7 +1,7 @@
const std = @import("std");
const Api = @import("../../api/schema.zig").Api;
const FilesystemRouter = @import("../../router.zig");
const http = @import("../../http.zig");
const http = @import("../../bun_dev_http_server.zig");
const JavaScript = @import("../javascript.zig");
const QueryStringMap = @import("../../url.zig").QueryStringMap;
const CombinedScanner = @import("../../url.zig").CombinedScanner;

View File

@@ -1,7 +1,7 @@
const std = @import("std");
const Api = @import("../../api/schema.zig").Api;
const FilesystemRouter = @import("../../router.zig");
const http = @import("../../http.zig");
const http = @import("../../bun_dev_http_server.zig");
const JavaScript = @import("../javascript.zig");
const QueryStringMap = @import("../../url.zig").QueryStringMap;
const CombinedScanner = @import("../../url.zig").CombinedScanner;

View File

@@ -25,7 +25,7 @@ const js_printer = @import("../../js_printer.zig");
const js_parser = @import("../../js_parser.zig");
const js_ast = @import("../../js_ast.zig");
const hash_map = @import("../../hash_map.zig");
const http = @import("../../http.zig");
const http = @import("../../bun_dev_http_server.zig");
const NodeFallbackModules = @import("../../node_fallbacks.zig");
const ImportKind = ast.ImportKind;
const Analytics = @import("../../analytics/analytics_thread.zig");
@@ -75,7 +75,7 @@ const URL = @import("../../url.zig").URL;
const Transpiler = @import("./transpiler.zig");
const VirtualMachine = @import("../javascript.zig").VirtualMachine;
const IOTask = JSC.IOTask;
const is_bindgen = JSC.is_bindgen;
const is_bindgen = @import("std").meta.globalOption("bindgen", bool) orelse false;
const uws = @import("uws");
const Fallback = Runtime.Fallback;
const MimeType = HTTP.MimeType;
@@ -106,8 +106,8 @@ pub const ServerConfig = struct {
max_request_body_size: usize = 1024 * 1024 * 128,
development: bool = false,
onError: JSC.JSValue = JSC.JSValue.zero,
onRequest: JSC.JSValue = JSC.JSValue.zero,
onError: if (is_bindgen) void else JSC.JSValue = if (is_bindgen) void{} else JSC.JSValue.zero,
onRequest: if (is_bindgen) void else JSC.JSValue = if (is_bindgen) void{} else JSC.JSValue.zero,
pub const SSLConfig = struct {
server_name: [*c]const u8 = null,

View File

@@ -1,7 +1,7 @@
const std = @import("std");
const Api = @import("../../api/schema.zig").Api;
const FilesystemRouter = @import("../../router.zig");
const http = @import("../../http.zig");
const http = @import("../../bun_dev_http_server.zig");
const JavaScript = @import("../javascript.zig");
const QueryStringMap = @import("../../url.zig").QueryStringMap;
const CombinedScanner = @import("../../url.zig").CombinedScanner;

View File

@@ -20,7 +20,7 @@ const options = @import("../options.zig");
const Bundler = @import("../bundler.zig").ServeBundler;
const js_printer = @import("../js_printer.zig");
const hash_map = @import("../hash_map.zig");
const http = @import("../http.zig");
const http = @import("../bun_dev_http_server.zig");
pub const DefaultBunDefines = struct {
pub const Keys = struct {

View File

@@ -35,7 +35,7 @@ const js_printer = @import("../js_printer.zig");
const js_parser = @import("../js_parser.zig");
const js_ast = @import("../js_ast.zig");
const hash_map = @import("../hash_map.zig");
const http = @import("../http.zig");
const http = @import("../bun_dev_http_server.zig");
const NodeFallbackModules = @import("../node_fallbacks.zig");
const ImportKind = ast.ImportKind;
const Analytics = @import("../analytics/analytics_thread.zig");

View File

@@ -1,7 +1,7 @@
const std = @import("std");
const Api = @import("../../api/schema.zig").Api;
const RequestContext = @import("../../http.zig").RequestContext;
const MimeType = @import("../../http.zig").MimeType;
const RequestContext = @import("../../bun_dev_http_server.zig").RequestContext;
const MimeType = @import("../../bun_dev_http_server.zig").MimeType;
const ZigURL = @import("../../url.zig").URL;
const HTTPClient = @import("http");
const NetworkThread = HTTPClient.NetworkThread;

View File

@@ -1,7 +1,7 @@
const std = @import("std");
const Api = @import("../../api/schema.zig").Api;
const RequestContext = @import("../../http.zig").RequestContext;
const MimeType = @import("../../http.zig").MimeType;
const RequestContext = @import("../../bun_dev_http_server.zig").RequestContext;
const MimeType = @import("../../bun_dev_http_server.zig").MimeType;
const ZigURL = @import("../../url.zig").URL;
const HTTPClient = @import("http");
const NetworkThread = HTTPClient.NetworkThread;

View File

@@ -1,8 +1,8 @@
const std = @import("std");
const Api = @import("../../api/schema.zig").Api;
const bun = @import("../../global.zig");
const RequestContext = @import("../../http.zig").RequestContext;
const MimeType = @import("../../http.zig").MimeType;
const RequestContext = @import("../../bun_dev_http_server.zig").RequestContext;
const MimeType = @import("../../bun_dev_http_server.zig").MimeType;
const ZigURL = @import("../../url.zig").URL;
const HTTPClient = @import("http");
const NetworkThread = HTTPClient.NetworkThread;

View File

@@ -1,8 +1,8 @@
const std = @import("std");
const Api = @import("../../api/schema.zig").Api;
const bun = @import("../../global.zig");
const RequestContext = @import("../../http.zig").RequestContext;
const MimeType = @import("../../http.zig").MimeType;
const RequestContext = @import("../../bun_dev_http_server.zig").RequestContext;
const MimeType = @import("../../bun_dev_http_server.zig").MimeType;
const ZigURL = @import("../../url.zig").URL;
const HTTPClient = @import("http");
const NetworkThread = HTTPClient.NetworkThread;

View File

@@ -1,4 +1,7 @@
// const c = @import("./c.zig");
// This is the HTTP server used by "bun dev"
// It is not suitable for benchmarking.
// It is not designed to be fast.
const std = @import("std");
const bun = @import("global.zig");
const string = bun.string;
@@ -3641,6 +3644,7 @@ pub const Server = struct {
var req = picohttp.Request.parse(req_buf_node.data[0..read_size], &req_headers_buf) catch |err| {
_ = conn.client.write(RequestContext.printStatusLine(400) ++ "\r\n\r\n", SOCKET_FLAGS) catch {};
_ = Syscall.close(conn.client.socket.fd);
Output.printErrorln("ERR: {s}", .{@errorName(err)});
return;
};

View File

@@ -1,4 +1,4 @@
const Server = @import("../http.zig").Server;
const Server = @import("../bun_dev_http_server.zig").Server;
const Command = @import("../cli.zig").Command;
const Global = @import("../global.zig").Global;
pub const DevCommand = struct {

View File

@@ -13,6 +13,18 @@ pub const Header = struct {
name: []const u8,
value: []const u8,
pub fn count(this: *const Header, builder: *StringBuilder) void {
builder.count(this.name);
builder.count(this.value);
}
pub fn clone(this: *const Header, builder: *StringBuilder) Header {
return .{
.name = builder.append(this.name),
.value = builder.append(this.value),
};
}
pub fn isMultiline(self: Header) bool {
return @ptrToInt(self.name.ptr) == 0;
}
@@ -39,11 +51,37 @@ pub const Header = struct {
}
};
const StringBuilder = @import("../string_builder.zig");
pub const Request = struct {
method: []const u8,
path: []const u8,
minor_version: usize,
headers: []const Header,
bytes_read: u32 = 0,
pub fn count(this: Request, builder: *StringBuilder) void {
builder.count(this.method);
builder.count(this.path);
for (this.headers) |header| {
header.count(builder);
}
}
pub fn clone(this: *const Request, headers: []Header, builder: *StringBuilder) Request {
for (this.headers) |header, i| {
headers[i] = header.clone(builder);
}
return .{
.method = builder.append(this.method),
.path = builder.append(this.path),
.minor_version = this.minor_version,
.headers = headers,
.bytes_read = this.bytes_read,
};
}
pub fn format(self: Request, comptime _: []const u8, _: fmt.FormatOptions, writer: anytype) !void {
try fmt.format(writer, "{s} {s}\n", .{ self.method, self.path });
@@ -83,6 +121,7 @@ pub const Request = struct {
.path = path,
.minor_version = @intCast(usize, minor_version),
.headers = src[0..num_headers],
.bytes_read = @intCast(u32, @maximum(rc, 0)),
},
};
}

View File

@@ -17,6 +17,10 @@ pub fn NewSocketHandler(comptime ssl: bool) type {
socket: *Socket,
const ThisSocket = @This();
pub fn handle(this: ThisSocket) ?*anyopaque {
return us_socket_get_native_handle(comptime ssl_int, this.socket);
}
pub fn isEstablished(this: ThisSocket) bool {
return us_socket_is_established(comptime ssl_int, this.socket) > 0;
}
@@ -36,7 +40,7 @@ pub fn NewSocketHandler(comptime ssl: bool) type {
return us_socket_context(
comptime ssl_int,
this.socket,
);
).?;
}
pub fn flush(this: ThisSocket) void {
return us_socket_flush(
@@ -101,6 +105,56 @@ pub fn NewSocketHandler(comptime ssl: bool) type {
);
}
pub fn attach(fd: c_int, ctx: *us_socket_context_t) ?ThisSocket {
if (us_socket_attach(comptime ssl_int, fd, ctx)) |socket| {
return ThisSocket{ .socket = socket };
} else {
return null;
}
}
pub fn detach(this: ThisSocket) c_int {
const handle_ = this.handle().?;
var socket_ = this.socket;
if (us_socket_detach(comptime ssl_int, socket_) == null)
return -1;
return @intCast(c_int, @ptrToInt(handle_));
}
pub fn listen(
host: []const u8,
port: c_int,
socket_ctx: *us_socket_context_t,
comptime Context: type,
ctx: Context,
comptime socket_field_name: []const u8,
) ?*Context {
// var stack_fallback = std.heap.stackFallback(1024, bun.default_allocator);
// var allocator = stack_fallback.get();
// var host_ = allocator.dupeZ(u8, host) catch return null;
// defer allocator.free(host_);
_ = host;
var socket = us_socket_context_listen(
comptime ssl_int,
socket_ctx,
null,
port,
0,
@sizeOf(Context),
) orelse return null;
const socket_ = ThisSocket{ .socket = @ptrCast(*Socket, socket) };
var holder = socket_.ext(Context) orelse {
if (comptime bun.Environment.allow_assert) unreachable;
_ = us_socket_close_connecting(comptime ssl_int, socket_.socket);
return null;
};
holder.* = ctx;
@field(holder.*, socket_field_name) = socket;
return holder;
}
pub fn connect(
host: []const u8,
port: c_int,
@@ -316,7 +370,7 @@ extern fn us_socket_context_add_server_name(ssl: c_int, context: ?*us_socket_con
extern fn us_socket_context_remove_server_name(ssl: c_int, context: ?*us_socket_context_t, hostname_pattern: [*c]const u8) void;
extern fn us_socket_context_on_server_name(ssl: c_int, context: ?*us_socket_context_t, cb: ?fn (?*us_socket_context_t, [*c]const u8) callconv(.C) void) void;
extern fn us_socket_context_get_native_handle(ssl: c_int, context: ?*us_socket_context_t) ?*anyopaque;
extern fn us_create_socket_context(ssl: c_int, loop: ?*Loop, ext_size: c_int, options: us_socket_context_options_t) ?*us_socket_context_t;
pub extern fn us_create_socket_context(ssl: c_int, loop: ?*Loop, ext_size: c_int, options: us_socket_context_options_t) ?*us_socket_context_t;
extern fn us_socket_context_free(ssl: c_int, context: ?*us_socket_context_t) void;
extern fn us_socket_context_on_open(ssl: c_int, context: ?*us_socket_context_t, on_open: fn (*Socket, c_int, [*c]u8, c_int) callconv(.C) ?*Socket) void;
extern fn us_socket_context_on_close(ssl: c_int, context: ?*us_socket_context_t, on_close: fn (*Socket, c_int, ?*anyopaque) callconv(.C) ?*Socket) void;
@@ -325,7 +379,7 @@ extern fn us_socket_context_on_writable(ssl: c_int, context: ?*us_socket_context
extern fn us_socket_context_on_timeout(ssl: c_int, context: ?*us_socket_context_t, on_timeout: fn (*Socket) callconv(.C) ?*Socket) void;
extern fn us_socket_context_on_connect_error(ssl: c_int, context: ?*us_socket_context_t, on_connect_error: fn (*Socket, c_int) callconv(.C) ?*Socket) void;
extern fn us_socket_context_on_end(ssl: c_int, context: ?*us_socket_context_t, on_end: fn (*Socket) callconv(.C) ?*Socket) void;
extern fn us_socket_context_ext(ssl: c_int, context: ?*us_socket_context_t) ?*anyopaque;
pub extern fn us_socket_context_ext(ssl: c_int, context: ?*us_socket_context_t) ?*anyopaque;
extern fn us_socket_context_listen(ssl: c_int, context: ?*us_socket_context_t, host: [*c]const u8, port: c_int, options: c_int, socket_ext_size: c_int) ?*listen_socket_t;
@@ -391,8 +445,8 @@ pub const Poll = opaque {
pub const write_flag = if (Environment.isLinux) std.os.linux.EPOLL.OUT else 2;
};
pub fn deinit(self: *Poll) void {
us_poll_free(self);
pub fn deinit(self: *Poll, loop: *Loop) void {
us_poll_free(self, loop);
}
// (void* userData, int fd, int events, int error, struct us_poll_t *poll)
@@ -503,7 +557,11 @@ pub const Request = opaque {
extern fn uws_req_get_parameter(res: *Request, index: c_ushort, dest: *[*]const u8) usize;
};
const listen_socket_t = opaque {};
pub const listen_socket_t = opaque {
pub fn close(this: *listen_socket_t, comptime ssl: bool) void {
us_listen_socket_close(comptime @as(c_int, @boolToInt(ssl)), this);
}
};
extern fn us_listen_socket_close(ssl: c_int, ls: *listen_socket_t) void;
pub fn NewApp(comptime ssl: bool) type {
@@ -1148,3 +1206,5 @@ pub const uws_app_listen_config_t = extern struct {
};
extern fn us_socket_mark_needs_more_not_ssl(socket: ?*uws_res) void;
extern fn us_socket_detach(ssl: c_int, socket: *Socket) ?*Socket;
extern fn us_socket_attach(ssl: c_int, client_fd: c_int, ctx: *us_socket_context_t) ?*Socket;

94
src/hive_array.zig Normal file
View File

@@ -0,0 +1,94 @@
const std = @import("std");
const assert = std.debug.assert;
const mem = std.mem;
const testing = std.testing;
/// An array that efficiently tracks which elements are in use.
/// The pointers are intended to be stable
/// Sorta related to https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2021/p0447r15.html
pub fn HiveArray(comptime T: type, comptime capacity: u16) type {
return struct {
const Self = @This();
buffer: [capacity]T = undefined,
available: std.bit_set.IntegerBitSet(capacity) = std.bit_set.IntegerBitSet(capacity).initFull(),
pub const size = capacity;
pub fn init() Self {
return .{};
}
pub fn get(self: *Self) ?*T {
const index = self.available.findFirstSet() orelse return null;
self.available.unset(index);
return &self.buffer[index];
}
pub fn indexOf(self: *const Self, value: *const T) ?u63 {
const start = &self.buffer;
const end = @ptrCast([*]const T, start) + capacity;
if (!(@ptrToInt(value) >= @ptrToInt(start) and @ptrToInt(value) < @ptrToInt(end)))
return null;
// aligned to the size of T
const index = (@ptrToInt(value) - @ptrToInt(start)) / @sizeOf(T);
assert(index < capacity);
return @truncate(u63, index);
}
pub fn in(self: *const Self, value: *const T) bool {
const start = &self.buffer;
const end = @ptrCast([*]const T, start) + capacity;
return (@ptrToInt(value) >= @ptrToInt(start) and @ptrToInt(value) < @ptrToInt(end));
}
pub fn put(self: *Self, value: *T) bool {
const index = self.indexOf(value) orelse return false;
assert(!self.available.isSet(index));
assert(&self.buffer[index] == value);
value.* = undefined;
self.available.set(index);
return true;
}
};
}
test "HiveArray" {
const size = 64;
// Choose an integer with a weird alignment
const Int = u127;
var a = HiveArray(Int, size).init();
{
var b = a.get().?;
try testing.expect(a.get().? != b);
try testing.expectEqual(a.indexOf(b), 0);
try testing.expect(a.put(b));
try testing.expect(a.get().? == b);
var c = a.get().?;
c.* = 123;
var d: Int = 12345;
try testing.expect(a.put(&d) == false);
try testing.expect(a.in(&d) == false);
}
a.available = @TypeOf(a.available).initFull();
{
var i: u63 = 0;
while (i < size) {
var b = a.get().?;
try testing.expectEqual(a.indexOf(b), i);
try testing.expect(a.put(b));
try testing.expect(a.get().? == b);
i = i + 1;
}
i = 0;
while (i < size) : (i += 1) {
try testing.expect(a.get() == null);
}
}
}

623
src/http_server.zig Normal file
View File

@@ -0,0 +1,623 @@
const std = @import("std");
const bun = @import("global.zig");
const string = bun.string;
const Output = bun.Output;
const Global = bun.Global;
const Environment = bun.Environment;
const strings = bun.strings;
const MutableString = bun.MutableString;
const FeatureFlags = bun.FeatureFlags;
const picohttp = @import("picohttp");
const Header = picohttp.Header;
const HTTPRequest = picohttp.Request;
const StaticResponse = picohttp.Response;
pub const Headers = picohttp.Headers;
pub const MimeType = @import("./http/mime_type.zig");
const HiveArray = @import("./hive_array.zig").HiveArray;
const ObjectPool = @import("./pool.zig").ObjectPool;
const StringPointer = @import("./api/schema.zig").Api.StringPointer;
const StringBuilder = @import("./string_builder.zig");
const Lock = @import("./lock.zig").Lock;
const log = Output.scoped(.HTTPServer, false);
const uWS = @import("uws");
const adjustUlimit = @import("./fs.zig").FileSystem.RealFS.adjustUlimit;
const ServerConfig = struct {
port: u16 = 3001,
host: []const u8 = "0.0.0.0",
reuse_port: bool = true,
};
const AsyncIO = @import("io");
pub const constants = struct {
pub const OPEN_SOCKET_FLAGS = std.os.SOCK.CLOEXEC | std.os.SO.REUSEADDR | std.os.SO.REUSEPORT;
pub const SOCKET_BACKLOG = 1024;
};
const FallbackBufferPool = ObjectPool([16384]u8, null, false, 1024);
const IncomingRequest = struct {
http_request: HTTPRequest,
body_chunk: []const u8 = "",
fd: fd_t = 0,
bytes: []u8,
pub fn freeData(this: *IncomingRequest, allocator: std.mem.Allocator) void {
if (this.bytes.len > 0)
allocator.free(this.bytes);
this.bytes.len = 0;
this.bytes.ptr = undefined;
this.body_chunk = "";
if (this.http_request.headers.len > 0)
allocator.free(this.http_request.headers);
this.http_request.headers.len = 0;
this.http_request.headers.ptr = undefined;
}
pub fn create(allocator: std.mem.Allocator, request_recv: []const u8, fd: fd_t, request: HTTPRequest) !IncomingRequest {
var body_chunk = request_recv[@minimum(request.bytes_read, request_recv.len)..];
var string_builder = StringBuilder{};
request.count(&string_builder);
if (body_chunk.len > 0) string_builder.count(body_chunk);
try string_builder.allocate(allocator);
var headers = try allocator.alloc(Header, request.headers.len);
const new_request = request.clone(headers, &string_builder);
return IncomingRequest{
.http_request = new_request,
.body_chunk = if (body_chunk.len > 0) string_builder.append(body_chunk) else "",
.fd = fd,
.bytes = string_builder.ptr.?[0..string_builder.cap],
};
}
};
const fd_t = std.os.fd_t;
pub const RequestHandler = struct {
ctx: *anyopaque,
onRequest: fn (ctx: *anyopaque, conn: *Connection, incoming: IncomingRequest) bool,
pub fn New(comptime HandlerType: type, comptime Function: anytype) type {
return struct {
pub fn init(handler: *HandlerType) RequestHandler {
return RequestHandler{
.ctx = handler,
.onRequest = onRequest,
};
}
pub fn onRequest(ctx: *anyopaque, conn: *Connection, incoming: IncomingRequest) bool {
if (@typeInfo(@TypeOf(Function)).Fn.return_type.? == void) {
Function(@ptrCast(*HandlerType, @alignCast(@alignOf(HandlerType), ctx)), conn, incoming);
return true;
}
return Function(@ptrCast(*HandlerType, @alignCast(@alignOf(HandlerType), ctx)), conn, incoming);
}
};
}
};
pub const Server = struct {
listener: *uWS.listen_socket_t,
ctx: *uWS.us_socket_context_t,
status: Status = Status.open,
handler: RequestHandler,
shutdown_requested: bool = false,
loop: *uWS.Loop = undefined,
const PendingSocketsList = std.fifo.LinearFifo(u32, .{ .Static = constants.SOCKET_BACKLOG });
pub fn quiet(this: *Server) void {
if (this.status != .open)
return;
this.status = .closing;
this.listener.close(false);
this.status = .closed;
}
pub const Status = enum {
open,
closing,
closed,
};
pub fn shutdown(this: *Server) void {
if (this.shutdown_requested)
return;
this.shutdown_requested = true;
log("shutdown");
this.quiet();
}
pub fn boot() void {}
pub fn start(config: ServerConfig, handler: RequestHandler) !*Server {
log("start port: {d}", .{config.port});
var server = try bun.default_allocator.create(Server);
var ctx = server.createContext() orelse return error.OutOfMemory;
uWS.SocketTCP.configure(
ctx,
Connection,
Connection.onOpen,
Connection.onClose,
Connection.onData,
Connection.onWritable,
Connection.onTimeout,
Connection.onConnectError,
Connection.onEnd,
);
server.* = .{
.listener = undefined,
.ctx = ctx,
.handler = handler,
.status = .open,
.loop = uWS.Loop.get().?,
};
if (uWS.SocketTCP.listen(config.host, config.port, ctx, *Server, server, "listener") == null) {
return error.ListenFailed;
}
server.* = .{
.listener = server.listener,
.ctx = ctx,
.handler = handler,
.status = .open,
.loop = uWS.Loop.get().?,
};
return server;
}
pub fn createContext(server: *Server) ?*uWS.us_socket_context_t {
var loop = uWS.Loop.get().?;
var ctx = uWS.us_create_socket_context(0, loop, @sizeOf(*Server), .{}) orelse return null;
var ptr = @ptrCast(**Server, @alignCast(@alignOf(*Server), uWS.us_socket_context_ext(0, ctx).?));
ptr.* = server;
return ctx;
}
pub fn dispatch(this: *Server, connection: *Connection, incoming_request: IncomingRequest) void {
if (this.handler.onRequest(this.handler.ctx, connection, incoming_request)) {
return;
}
_ = connection.socket.write(bad_request, false);
connection.socket.close(0, null);
}
};
const CompletionSwapper = struct {
first: AsyncIO.Completion = undefined,
second: AsyncIO.Completion = undefined,
which: u1 = 0,
pub fn get(this: *CompletionSwapper) *AsyncIO.Completion {
if (this.which == 0) {
this.which = 1;
this.first = undefined;
return &this.first;
} else {
this.which = 0;
this.second = undefined;
return &this.second;
}
}
};
const CRLF = [2]u8{ '\r', '\n' };
const request_header_fields_too_large = "431 Request Header Fields Too Large" ++
CRLF ++
"Connection: keep-alive" ++
CRLF ++
"Server: bun" ++
CRLF ++
"Content-Type: text/plain" ++
CRLF ++
"Content-Length: 0" ++
CRLF ++
CRLF;
const bad_request = "400 Bad Request" ++
CRLF ++
"Connection: keep-alive" ++
CRLF ++
"Server: bun" ++
CRLF ++
"Content-Type: text/plain" ++
CRLF ++
"Content-Length: 0" ++
CRLF ++
CRLF;
const hello_world = "HTTP/1.1 200 OK" ++
CRLF ++
"Connection: keep-alive" ++
CRLF ++
"Server: bun" ++
CRLF ++
"Content-Type: text/plain" ++
CRLF ++
"Content-Length: 13" ++
CRLF ++ CRLF ++
"Hello, world!";
pub const Connection = struct {
socket: uWS.SocketTCP,
incoming_request: IncomingRequest = undefined,
is_writable: bool = false,
has_received: bool = false,
has_incoming_request: bool = false,
pub fn onOpen(this: *Connection, socket: uWS.SocketTCP) void {
this.socket = socket;
socket.timeout(30);
this.is_writable = false;
log("Client connected", .{});
}
fn dispatch(this: *Connection, incoming_request: IncomingRequest) void {
this.has_received = false;
this.is_writable = false;
this.server().dispatch(this, incoming_request);
return;
}
pub fn onClose(this: *Connection, socket: uWS.SocketTCP, _: c_int, _: ?*anyopaque) void {
_ = this;
_ = socket;
log("Client disconnected", .{});
}
pub fn onWritable(this: *Connection, socket: uWS.SocketTCP) void {
_ = this;
_ = socket;
this.is_writable = true;
}
pub fn onData(this: *Connection, socket: uWS.SocketTCP, data: []const u8) void {
_ = this;
_ = socket;
_ = data;
socket.timeout(30);
var headers: [512]picohttp.Header = undefined;
const request = HTTPRequest.parse(data, &headers) catch |err| {
switch (err) {
error.BadRequest => {
log("onRecv bad request", .{});
this.socket.close(0, null);
return;
},
error.ShortRead => {
return;
},
}
};
const fd = @intCast(fd_t, @ptrToInt(socket.handle().?));
// if (this.has_incoming_request) {
// this.incoming_request.freeData(bun.default_allocator);
// }
this.has_received = true;
this.has_incoming_request = true;
this.dispatch(IncomingRequest.create(bun.default_allocator, data, fd, request) catch {
log("Dropping request due to OOM!", .{});
this.socket.close(0, null);
return;
});
}
pub fn onTimeout(this: *Connection, socket: uWS.SocketTCP) void {
_ = this;
_ = socket;
socket.close(0, null);
}
pub fn onConnectError(this: *Connection, socket: uWS.SocketTCP, code: c_int) void {
_ = this;
_ = socket;
_ = code;
}
pub fn onEnd(this: *Connection, socket: uWS.SocketTCP) void {
_ = this;
_ = socket;
socket.shutdown();
socket.close(0, null);
}
pub inline fn server(this: Connection) *Server {
return @ptrCast(**Server, @alignCast(@alignOf(*Server), uWS.us_socket_context_ext(0, this.socket.context())).?).*;
}
};
const NetworkThread = @import("./network_thread.zig");
pub const ToySingleThreadedHTTPServer = struct {
pub const Handler = RequestHandler.New(ToySingleThreadedHTTPServer, onRequest);
server: *Server,
pub fn onRequest(
this: *ToySingleThreadedHTTPServer,
connection: *Connection,
_: IncomingRequest,
) bool {
_ = this;
const wrote = connection.socket.write(hello_world, true);
if (wrote < hello_world.len) {
log("onRequest: write failed", .{});
connection.socket.close(0, null);
return false;
}
// incoming.freeData(bun.default_allocator);
return true;
}
pub fn startServer(toy: *ToySingleThreadedHTTPServer) void {
var toy_config = ServerConfig{
.port = std.fmt.parseInt(u16, std.os.getenv("PORT") orelse "3001", 10) catch 3001,
};
defer Output.prettyln("Server started on port {d}", .{toy_config.port});
defer Output.flush();
toy.server = Server.start(toy_config, RequestHandler.New(ToySingleThreadedHTTPServer, onRequest).init(toy)) catch unreachable;
toy.server.loop.run();
}
pub fn main() anyerror!void {
var http = try bun.default_allocator.create(ToySingleThreadedHTTPServer);
http.* = .{ .server = undefined };
var stdout_ = std.io.getStdOut();
var stderr_ = std.io.getStdErr();
var output_source = Output.Source.init(stdout_, stderr_);
Output.Source.set(&output_source);
_ = try adjustUlimit();
defer Output.flush();
startServer(http);
}
};
pub const ToyHTTPServer = struct {
pub const Handler = RequestHandler.New(*ToyHTTPServer, onRequest);
const Fifo = std.fifo.LinearFifo(IncomingRequest, .{ .Static = 4096 });
server: *Server,
first_list: std.BoundedArray(IncomingRequest, 8096) = std.BoundedArray(IncomingRequest, 8096).init(0) catch unreachable,
second_list: std.BoundedArray(IncomingRequest, 8096) = std.BoundedArray(IncomingRequest, 8096).init(0) catch unreachable,
first_is_active: bool = true,
incoming_list: *std.BoundedArray(IncomingRequest, 8096) = undefined,
outgoing_list: std.atomic.Atomic(*std.BoundedArray(IncomingRequest, 8096)) = undefined,
active: Fifo,
lock: Lock = Lock.init(),
loop: *uWS.Loop,
has_scheduled: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(0),
ctx: *uWS.us_socket_context_t = undefined,
waker: AsyncIO.Waker = undefined,
// active_requests: HiveArray(WritableSocket, 1024) = HiveArray(WritableSocket, 1024).init(),
pub fn onRequest(
this: *ToyHTTPServer,
_: *Connection,
incoming: IncomingRequest,
) bool {
{
this.lock.lock();
defer this.lock.unlock();
this.outgoing_list.loadUnchecked().appendAssumeCapacity(incoming);
}
if (this.outgoing_list.loadUnchecked().len > 20)
this.waker.wake() catch unreachable;
return true;
}
pub fn drain(this: *ToyHTTPServer) void {
var ctx = this.ctx;
{
this.lock.lock();
defer {
this.lock.unlock();
}
if (this.first_is_active) {
this.first_is_active = false;
this.second_list.len = 0;
this.incoming_list = this.outgoing_list.value;
this.outgoing_list.value = &this.second_list;
} else {
this.first_is_active = true;
this.first_list.len = 0;
this.incoming_list = this.outgoing_list.value;
this.outgoing_list.value = &this.first_list;
}
}
const slice = this.incoming_list.slice();
for (slice) |incoming| {
const sent = AsyncIO.darwin.@"sendto"(incoming.fd, hello_world, hello_world.len, 0, null, 0);
if (sent < hello_world.len) {
var socket = uWS.SocketTCP.attach(incoming.fd, ctx) orelse continue;
_ = socket.write(hello_world, true);
}
}
this.incoming_list.len = 0;
}
// pub fn dispatch(this: *ToyHTTPServer, socket: *WritableSocket, _: IncomingRequest) void {
// this.server.takeAsync(socket.socket.detach());
// }
pub const WritableSocket = struct {
socket: uWS.SocketTCP,
incoming_request: IncomingRequest = undefined,
is_writable: bool = false,
has_received: bool = false,
has_incoming_request: bool = false,
pub fn onOpen(_: *WritableSocket, _: uWS.SocketTCP) void {
// this.socket = socket;
// socket.timeout(30);
// this.is_writable = false;
// log("Client connected", .{});
}
pub fn dispatch(this: *WritableSocket) void {
this.has_received = false;
this.is_writable = false;
// this.server().dispatch(this, this.incoming_request);
return;
}
pub fn onClose(this: *WritableSocket, socket: uWS.SocketTCP, _: c_int, _: ?*anyopaque) void {
_ = this;
_ = socket;
log("Client disconnected", .{});
}
pub fn onWritable(this: *WritableSocket, socket: uWS.SocketTCP) void {
_ = this;
_ = socket;
this.is_writable = true;
}
pub fn onData(this: *WritableSocket, socket: uWS.SocketTCP, data: []const u8) void {
_ = this;
_ = socket;
_ = data;
// socket.timeout(30);
// var headers: [512]picohttp.Header = undefined;
// const request = HTTPRequest.parse(data, &headers) catch |err| {
// switch (err) {
// error.BadRequest => {
// log("onRecv bad request", .{});
// this.socket.close(0, null);
// return;
// },
// error.ShortRead => {
// return;
// },
// }
// };
// const fd = @intCast(fd_t, @ptrToInt(socket.handle().?));
// if (this.has_incoming_request) {
// this.incoming_request.freeData(bun.default_allocator);
// }
// this.incoming_request = IncomingRequest.create(bun.default_allocator, data, fd, request) catch {
// log("Dropping request due to OOM!", .{});
// this.socket.close(0, null);
// return;
// };
// this.has_received = true;
// this.has_incoming_request = true;
// this.dispatch();
}
pub fn onTimeout(this: *WritableSocket, socket: uWS.SocketTCP) void {
_ = this;
_ = socket;
socket.close(0, null);
}
pub fn onConnectError(this: *WritableSocket, socket: uWS.SocketTCP, code: c_int) void {
_ = this;
_ = socket;
_ = code;
}
pub fn onEnd(this: *WritableSocket, socket: uWS.SocketTCP) void {
_ = this;
_ = socket;
socket.shutdown();
socket.close(0, null);
}
pub inline fn server(this: WritableSocket) *ToyHTTPServer {
return @ptrCast(**ToyHTTPServer, @alignCast(@alignOf(*ToyHTTPServer), uWS.us_socket_context_ext(0, this.socket.context())).?).*;
}
};
fn scheduleWakeup(this: *ToyHTTPServer) void {
if (this.has_scheduled.load(.Monotonic) == 0) return;
this.waker.wake() catch unreachable;
}
pub fn startServer(toy: *ToyHTTPServer) void {
Output.Source.configureNamedThread("ToyHTTPServer");
var toy_config = ServerConfig{
.port = std.fmt.parseInt(u16, std.os.getenv("PORT") orelse "3001", 10) catch 3001,
};
defer Output.prettyln("Server started on port {d}", .{toy_config.port});
defer Output.flush();
toy.server = Server.start(toy_config, RequestHandler.New(ToyHTTPServer, onRequest).init(toy)) catch unreachable;
_ = toy.server.loop.addPostHandler(*ToyHTTPServer, toy, scheduleWakeup);
toy.server.loop.run();
}
pub fn main() anyerror!void {
var http = try bun.default_allocator.create(ToyHTTPServer);
var stdout_ = std.io.getStdOut();
var stderr_ = std.io.getStdErr();
var output_source = Output.Source.init(stdout_, stderr_);
_ = try adjustUlimit();
Output.Source.set(&output_source);
defer Output.flush();
http.* = .{
.active = Fifo.init(),
.server = undefined,
.loop = uWS.Loop.get().?,
.waker = AsyncIO.Waker.init(bun.default_allocator) catch unreachable,
};
http.incoming_list = &http.first_list;
http.outgoing_list.value = &http.second_list;
http.ctx = uWS.us_create_socket_context(0, http.loop, 8, .{}).?;
uWS.SocketTCP.configure(
http.ctx,
WritableSocket,
WritableSocket.onOpen,
WritableSocket.onClose,
WritableSocket.onData,
WritableSocket.onWritable,
WritableSocket.onTimeout,
WritableSocket.onConnectError,
WritableSocket.onEnd,
);
@ptrCast(**anyopaque, @alignCast(@alignOf(*anyopaque), uWS.us_socket_context_ext(0, http.ctx).?)).* = @ptrCast(*anyopaque, &http);
_ = http.loop.addPostHandler(*ToyHTTPServer, http, drain);
var thread = std.Thread.spawn(.{}, startServer, .{http}) catch unreachable;
http.drain();
thread.detach();
while (true) {
_ = http.waker.wait() catch 0;
http.drain();
}
}
};
pub const main = if (@hasDecl(@import("build_options"), "toy_single_threaded_http_server"))
ToySingleThreadedHTTPServer.main
else
ToyHTTPServer.main;
test "ToyHTTPServer" {
try ToyHTTPServer.main();
}

View File

@@ -7,6 +7,15 @@
// errno
#include <errno.h>
#include <fcntl.h>
#include <netdb.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <sys/types.h>
extern "C" int close$NOCANCEL(int fd);
extern "C" mach_port_t io_darwin_create_machport(uint64_t wakeup, int32_t fd,
void *wakeup_buffer_,
size_t nbytes) {
@@ -62,4 +71,104 @@ extern "C" bool io_darwin_schedule_wakeup(mach_port_t waker) {
return true;
}
#ifndef fd_t
#define fd_t int
#endif
static fd_t apple_no_sigpipe(fd_t fd) {
int no_sigpipe = 1;
setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &no_sigpipe, sizeof(int));
return fd;
}
static fd_t bsd_set_nonblocking(fd_t fd) {
#ifdef _WIN32
/* Libuv will set windows sockets as non-blocking */
#else
fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
#endif
return fd;
}
static void bsd_socket_nodelay(fd_t fd, int enabled) {
setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (void *)&enabled, sizeof(enabled));
}
static fd_t bsd_create_socket(int domain, int type, int protocol) {
// returns INVALID_SOCKET on error
int flags = 0;
#if defined(SOCK_CLOEXEC) && defined(SOCK_NONBLOCK)
flags = SOCK_CLOEXEC | SOCK_NONBLOCK;
#endif
fd_t created_fd = socket(domain, type | flags, protocol);
return bsd_set_nonblocking(apple_no_sigpipe(created_fd));
}
extern "C" int io_darwin_create_listen_socket(const char *host,
const char *port, bool reuse) {
struct addrinfo hints, *result;
memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_flags = AI_PASSIVE;
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
if (getaddrinfo(host, port, &hints, &result)) {
return -1;
}
fd_t listenFd = -1;
struct addrinfo *listenAddr;
for (struct addrinfo *a = result; a && listenFd == -1; a = a->ai_next) {
if (a->ai_family == AF_INET6) {
listenFd =
bsd_create_socket(a->ai_family, a->ai_socktype, a->ai_protocol);
listenAddr = a;
}
}
for (struct addrinfo *a = result; a && listenFd == -1; a = a->ai_next) {
if (a->ai_family == AF_INET) {
listenFd =
bsd_create_socket(a->ai_family, a->ai_socktype, a->ai_protocol);
listenAddr = a;
}
}
if (listenFd == -1) {
freeaddrinfo(result);
return -1;
}
if (reuse) {
/* Otherwise, always enable SO_REUSEPORT and SO_REUSEADDR _unless_ options
* specify otherwise */
#if /*defined(__linux) &&*/ defined(SO_REUSEPORT)
int optval = 1;
setsockopt(listenFd, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval));
#endif
int enabled = 1;
setsockopt(listenFd, SOL_SOCKET, SO_REUSEADDR, (int *)&enabled,
sizeof(enabled));
}
#ifdef IPV6_V6ONLY
int disabled = 0;
setsockopt(listenFd, IPPROTO_IPV6, IPV6_V6ONLY, (int *)&disabled,
sizeof(disabled));
#endif
if (bind(listenFd, listenAddr->ai_addr, (socklen_t)listenAddr->ai_addrlen) ||
listen(listenFd, 512)) {
close$NOCANCEL(listenFd);
freeaddrinfo(result);
return -1;
}
freeaddrinfo(result);
return listenFd;
}
#endif

View File

@@ -260,6 +260,53 @@ const mem = std.mem;
const assert = std.debug.assert;
const c = std.c;
pub const darwin = struct {
pub const SO_DEBUG = @as(c_int, 0x0001);
pub const SO_ACCEPTCONN = @as(c_int, 0x0002);
pub const SO_REUSEADDR = @as(c_int, 0x0004);
pub const SO_KEEPALIVE = @as(c_int, 0x0008);
pub const SO_DONTROUTE = @as(c_int, 0x0010);
pub const SO_BROADCAST = @as(c_int, 0x0020);
pub const SO_USELOOPBACK = @as(c_int, 0x0040);
pub const SO_LINGER = @as(c_int, 0x0080);
pub const SO_OOBINLINE = @as(c_int, 0x0100);
pub const SO_REUSEPORT = @as(c_int, 0x0200);
pub const SO_TIMESTAMP = @as(c_int, 0x0400);
pub const SO_TIMESTAMP_MONOTONIC = @as(c_int, 0x0800);
pub const SO_DONTTRUNC = @as(c_int, 0x2000);
pub const SO_WANTMORE = @as(c_int, 0x4000);
pub const SO_WANTOOBFLAG = @import("std").zig.c_translation.promoteIntLiteral(c_int, 0x8000, .hexadecimal);
pub const SO_SNDBUF = @as(c_int, 0x1001);
pub const SO_RCVBUF = @as(c_int, 0x1002);
pub const SO_SNDLOWAT = @as(c_int, 0x1003);
pub const SO_RCVLOWAT = @as(c_int, 0x1004);
pub const SO_SNDTIMEO = @as(c_int, 0x1005);
pub const SO_RCVTIMEO = @as(c_int, 0x1006);
pub const SO_ERROR = @as(c_int, 0x1007);
pub const SO_TYPE = @as(c_int, 0x1008);
pub const SO_LABEL = @as(c_int, 0x1010);
pub const SO_PEERLABEL = @as(c_int, 0x1011);
pub const SO_NREAD = @as(c_int, 0x1020);
pub const SO_NKE = @as(c_int, 0x1021);
pub const SO_NOSIGPIPE = @as(c_int, 0x1022);
pub const SO_NOADDRERR = @as(c_int, 0x1023);
pub const SO_NWRITE = @as(c_int, 0x1024);
pub const SO_REUSESHAREUID = @as(c_int, 0x1025);
pub const SO_NOTIFYCONFLICT = @as(c_int, 0x1026);
pub const SO_UPCALLCLOSEWAIT = @as(c_int, 0x1027);
pub const SO_LINGER_SEC = @as(c_int, 0x1080);
pub const SO_RANDOMPORT = @as(c_int, 0x1082);
pub const SO_NP_EXTENSIONS = @as(c_int, 0x1083);
pub const SO_NUMRCVPKT = @as(c_int, 0x1112);
pub const SO_NET_SERVICE_TYPE = @as(c_int, 0x1116);
pub const SO_NETSVC_MARKING_LEVEL = @as(c_int, 0x1119);
pub const TCP_NODELAY = 0x01;
pub const TCP_MAXSEG = 0x02;
pub const TCP_NOPUSH = 0x04;
pub const TCP_NOOPT = 0x08;
pub const TCP_KEEPALIVE = 0x10;
pub const TCP_CONNECTIONTIMEOUT = 0x20;
pub usingnamespace os.darwin;
pub extern "c" fn @"recvfrom$NOCANCEL"(sockfd: c.fd_t, noalias buf: *anyopaque, len: usize, flags: u32, noalias src_addr: ?*c.sockaddr, noalias addrlen: ?*c.socklen_t) isize;
pub extern "c" fn @"sendto$NOCANCEL"(sockfd: c.fd_t, buf: *const anyopaque, len: usize, flags: u32, dest_addr: ?*const c.sockaddr, addrlen: c.socklen_t) isize;
@@ -267,12 +314,37 @@ pub const darwin = struct {
pub extern "c" fn @"sendmsg$NOCANCEL"(sockfd: c.fd_t, msg: *const std.x.os.Socket.Message, flags: c_int) isize;
pub extern "c" fn @"recvmsg$NOCANCEL"(sockfd: c.fd_t, msg: *std.x.os.Socket.Message, flags: c_int) isize;
pub extern "c" fn @"connect$NOCANCEL"(sockfd: c.fd_t, sock_addr: *const c.sockaddr, addrlen: c.socklen_t) c_int;
pub extern "c" fn @"accept$NOCANCEL"(sockfd: c.fd_t, noalias addr: ?*c.sockaddr, noalias addrlen: ?*c.socklen_t) c_int;
pub extern "c" fn @"accept4$NOCANCEL"(sockfd: c.fd_t, noalias addr: ?*c.sockaddr, noalias addrlen: ?*c.socklen_t, flags: c_uint) c_int;
pub extern "c" fn @"open$NOCANCEL"(path: [*:0]const u8, oflag: c_uint, ...) c_int;
pub extern "c" fn @"read$NOCANCEL"(fd: c.fd_t, buf: [*]u8, nbyte: usize) isize;
pub extern "c" fn @"pread$NOCANCEL"(fd: c.fd_t, buf: [*]u8, nbyte: usize, offset: c.off_t) isize;
pub extern "c" fn @"recv$NOCANCEL"(sockfd: c.fd_t, arg1: ?*anyopaque, arg2: usize, arg3: c_int) isize;
pub extern "c" fn @"accept$NOCANCEL"(sockfd: c.fd_t, noalias addr: ?*c.sockaddr, noalias addrlen: ?*c.socklen_t) c_int;
pub fn @"kevent64$NOCANCEL"(
kq: c_int,
changelist: [*]const Kevent64,
nchanges: c_int,
eventlist: [*]Kevent64,
nevents: c_int,
flags: c_uint,
timeout_: ?*const os.timespec,
) c_int {
while (true) {
const ret = os.system.kevent64(kq, changelist, nchanges, eventlist, nevents, flags, timeout_);
if (ret == -1) {
const err = std.c.getErrno(ret);
if (err == .INTR) {
continue;
}
return -1;
}
return ret;
}
unreachable;
}
};
const kevent64 = darwin.@"kevent64$NOCANCEL";
pub const OpenError = error{
/// In WASI, this error may occur when the file descriptor does
/// not hold the required rights to open a new resource relative to it.
@@ -451,7 +523,7 @@ pub const Syscall = struct {
}
pub fn socket(domain: u32, socket_type: u32, protocol: u32) SocketError!socket_t {
const filtered_sock_type = socket_type & ~@as(u32, os.SOCK.NONBLOCK | os.SOCK.CLOEXEC);
const filtered_sock_type = socket_type & ~@as(u32, os.SOCK.NONBLOCK | os.SOCK.CLOEXEC | std.os.SO.REUSEADDR | std.os.SO.REUSEPORT);
const rc = darwin.socket(domain, filtered_sock_type, protocol);
switch (darwin.getErrno(rc)) {
.SUCCESS => {
@@ -478,26 +550,26 @@ const Time = @import("./time.zig").Time;
const IO = @This();
pub const Callback = struct {
ctx: *anyopaque,
callback: fn (*anyopaque) void,
};
time: Time = .{},
io_inflight: usize = 0,
timeouts: FIFO(Completion) = .{},
completed: FIFO(Completion) = .{},
io_pending: FIFO(Completion) = .{},
last_event_fd: std.atomic.Atomic(u32) = std.atomic.Atomic(u32).init(32),
pending_count: usize = 0,
waker: Waker = undefined,
pub fn hasNoWork(this: *IO) bool {
return this.pending_count == 0 and this.io_inflight == 0 and this.io_pending.peek() == null and this.completed.peek() == null and this.timeouts.peek() == null;
}
pub fn init(_: u12, _: u32, waker: Waker) !IO {
return IO{
.waker = waker,
};
}
pub const Waker = struct {
pub const MachPortWaker = struct {
kq: os.fd_t,
machport: *anyopaque = undefined,
machport_buf: []u8 = &.{},
@@ -513,7 +585,7 @@ pub const Waker = struct {
pub fn wait(this: Waker) !usize {
var events = zeroed;
const count = std.os.system.kevent64(
const count = kevent64(
this.kq,
&events,
0,
@@ -571,7 +643,7 @@ pub const UserFilterWaker = struct {
events[0].data = 0;
events[0].fflags = c.NOTE_TRIGGER;
events[0].udata = 0;
const errno = std.os.system.kevent64(
const errno = kevent64(
this.kq,
&events,
1,
@@ -596,7 +668,7 @@ pub const UserFilterWaker = struct {
events[0].data = 0;
events[0].udata = 0;
const errno = std.os.system.kevent64(
const errno = kevent64(
this.kq,
&events,
1,
@@ -623,7 +695,7 @@ pub const UserFilterWaker = struct {
events[0].data = 0;
events[0].udata = 0;
var timespec = default_timespec;
const errno = std.os.system.kevent64(
const errno = kevent64(
kq,
&events,
1,
@@ -642,6 +714,8 @@ pub const UserFilterWaker = struct {
}
};
pub const Waker = MachPortWaker;
pub fn deinit(self: *IO) void {
assert(self.waker.kq > -1);
os.close(self.waker.kq);
@@ -653,7 +727,7 @@ pub fn tick(self: *IO) !void {
return self.flush(.no_wait);
}
const Kevent64 = std.os.system.kevent64_s;
const Kevent64 = std.os.darwin.kevent64_s;
/// Pass all queued submissions to the kernel and run for `nanoseconds`.
/// The `nanoseconds` argument is a u63 to allow coercion to the i64 used
@@ -695,8 +769,12 @@ pub fn wait(self: *IO, context: anytype, comptime function: anytype) void {
}
fn flush(self: *IO, comptime _: @Type(.EnumLiteral)) !void {
return flush_(self);
}
fn flush_(self: *IO) !void {
var io_pending = self.io_pending.peek();
var events: [2048]Kevent64 = undefined;
var events: [4096]Kevent64 = undefined;
// Check timeouts and fill events with completions in io_pending
// (they will be submitted through kevent).
@@ -704,20 +782,8 @@ fn flush(self: *IO, comptime _: @Type(.EnumLiteral)) !void {
const next_timeout = self.flush_timeouts();
// Flush any timeouts
{
var completed = self.completed;
self.completed = .{};
if (completed.pop()) |first| {
(first.callback)(self, first);
while (completed.pop()) |completion|
(completion.callback)(self, completion);
return;
}
}
const change_events = self.flush_io(&events, &io_pending);
var change_events = self.flush_io(&events, &io_pending);
// Zero timeouts for kevent() implies a non-blocking poll
var ts = default_timespec;
@@ -727,49 +793,76 @@ fn flush(self: *IO, comptime _: @Type(.EnumLiteral)) !void {
ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), timeout_ns % std.time.ns_per_s);
ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), timeout_ns / std.time.ns_per_s);
}
while (true) {
const new_events_ = kevent64(
self.waker.kq,
&events,
@intCast(c_int, change_events),
&events,
@intCast(c_int, events.len),
0,
if (next_timeout != null) &ts else null,
);
const new_events_ = std.os.system.kevent64(
self.waker.kq,
&events,
@intCast(c_int, change_events),
&events,
@intCast(c_int, events.len),
0,
if (next_timeout != null) &ts else null,
);
if (new_events_ < 0) {
return std.debug.panic("kevent() failed {s}", .{@tagName(std.c.getErrno(new_events_))});
}
const new_events = @intCast(usize, new_events_);
if (new_events_ < 0) {
return std.debug.panic("kevent() failed {s}", .{@tagName(std.c.getErrno(new_events_))});
}
const new_events = @intCast(usize, new_events_);
// Mark the io events submitted only after kevent() successfully processed them
self.io_pending.out = io_pending;
if (io_pending == null) {
self.io_pending.in = null;
}
var new_io_inflight_events = new_events;
self.io_inflight += change_events;
for (events[0..new_events]) |kevent| {
if (kevent.filter == c.EVFILT_MACHPORT) {
new_io_inflight_events -= 1;
continue;
// Mark the io events submitted only after kevent() successfully processed them
self.io_pending.out = io_pending;
if (io_pending == null) {
self.io_pending.in = null;
}
const completion = @intToPtr(*Completion, kevent.udata);
completion.next = null;
self.completed.push(completion);
var new_io_inflight_events = new_events;
self.io_inflight += change_events;
for (events[0..new_events]) |kevent| {
if (kevent.filter == c.EVFILT_MACHPORT or kevent.filter == c.EVFILT_USER) {
new_io_inflight_events -= 1;
continue;
}
const completion = @intToPtr(*Completion, kevent.udata);
switch (completion.operation) {
.accept => |*accept| {
accept.backlog = @intCast(@TypeOf(accept.backlog), kevent.data);
},
.send => |*send| {
send.disconnected = kevent.fflags & c.EV_EOF != 0;
},
.recv => |*recv| {
recv.available = @intCast(u32, @truncate(i33, kevent.data));
},
else => {},
}
completion.next = null;
self.completed.push(completion);
}
// subtract machport events from io_inflight
self.io_inflight -= @minimum(change_events, new_io_inflight_events);
change_events = self.flush_io(&events, &io_pending);
if (change_events == 0 and new_events < events.len) {
break;
}
}
// subtract machport events from io_inflight
self.io_inflight -= new_io_inflight_events;
{
var completed = self.completed;
self.completed = .{};
if (completed.pop()) |first| {
var current = first.next;
(first.callback)(self, first);
var completed = self.completed;
self.completed = .{};
while (completed.pop()) |completion| {
(completion.callback)(self, completion);
while (current) |completion| {
var prev_next = completion.next;
(completion.callback)(self, completion);
current = prev_next;
}
}
}
}
@@ -780,8 +873,8 @@ fn flush_io(_: *IO, events: []Kevent64, io_pending_top: *?*Completion) usize {
const event_info = switch (completion.operation) {
.accept => |op| [3]c_int{
op.socket,
c.EVFILT_READ,
c.EV_ADD | c.EV_ENABLE | c.EV_ONESHOT,
c.EVFILT_READ | c.EV_CLEAR,
c.EV_ADD | c.EV_ENABLE,
},
.connect => |op| [3]c_int{
op.socket,
@@ -806,7 +899,7 @@ fn flush_io(_: *IO, events: []Kevent64, io_pending_top: *?*Completion) usize {
.send => |op| [3]c_int{
op.socket,
c.EVFILT_WRITE,
c.EV_ADD | c.EV_ENABLE | c.EV_ONESHOT,
c.EV_ADD | c.EV_ENABLE | c.EV_ONESHOT | c.EV_EOF,
},
.event => |op| [3]c_int{
op.fd,
@@ -871,6 +964,7 @@ pub const Completion = struct {
const Operation = union(enum) {
accept: struct {
socket: os.socket_t,
backlog: u32 = 0,
},
close: struct {
fd: os.fd_t,
@@ -894,12 +988,14 @@ const Operation = union(enum) {
socket: os.socket_t,
buf: [*]u8,
len: u32,
available: u32 = 0,
},
send: struct {
socket: os.socket_t,
buf: [*]const u8,
len: u32,
flags: u32 = 0,
disconnected: bool = false,
},
timeout: struct {
expires: u64,
@@ -935,7 +1031,7 @@ fn submit(
operation_data: anytype,
comptime OperationImpl: type,
) void {
submitWithIncrementPending(self, context, callback, completion, operation_tag, operation_data, OperationImpl, true);
submitWithIncrementPending(self, context, callback, completion, operation_tag, operation_data, OperationImpl);
}
fn submitWithIncrementPending(
@@ -946,10 +1042,7 @@ fn submitWithIncrementPending(
comptime operation_tag: std.meta.Tag(Operation),
operation_data: anytype,
comptime OperationImpl: type,
comptime increment_pending: bool,
) void {
if (comptime increment_pending)
self.pending_count += 1;
const Context = @TypeOf(context);
const onCompleteFn = struct {
fn onComplete(
@@ -976,9 +1069,6 @@ fn submitWithIncrementPending(
else => {},
}
if (comptime increment_pending)
io.pending_count -= 1;
// Complete the Completion
return callback(
@intToPtr(Context, @ptrToInt(_completion.context)),
@@ -997,11 +1087,11 @@ fn submitWithIncrementPending(
switch (operation_tag) {
.timeout => self.timeouts.push(completion),
else => self.completed.push(completion),
else => self.io_pending.push(completion),
}
}
pub const AcceptError = os.AcceptError || os.SetSockOptError;
pub const AcceptError = os.AcceptError || Errno;
// -- NOT DONE YET
pub fn eventfd(self: *IO) os.fd_t {
@@ -1092,13 +1182,7 @@ pub fn accept(
// darwin doesn't support os.MSG.NOSIGNAL,
// but instead a socket option to avoid SIGPIPE.
Syscall.setsockopt(fd, os.SOL_SOCKET, os.SO_NOSIGPIPE, &mem.toBytes(@as(c_int, 1))) catch |err| return switch (err) {
error.TimeoutTooBig => unreachable,
error.PermissionDenied => error.NetworkSubsystemFailed,
error.AlreadyConnected => error.NetworkSubsystemFailed,
error.InvalidProtocolOption => error.ProtocolFailure,
else => |e| e,
};
Syscall.setsockopt(fd, os.SOL.SOCKET, os.SO.NOSIGPIPE, &mem.toBytes(@as(c_int, 1))) catch {};
return fd;
}
@@ -1106,6 +1190,67 @@ pub fn accept(
);
}
pub fn acceptNow(
self: *IO,
comptime Context: type,
context: Context,
comptime callback: fn (
context: Context,
completion: *Completion,
result: AcceptError!os.socket_t,
) void,
completion: *Completion,
socket: os.socket_t,
) void {
const accepter = struct {
fn doOperation(op: anytype) AcceptError!os.socket_t {
const fd = darwin.@"accept$NOCANCEL"(
op.socket,
null,
null,
);
if (fd < 0) {
switch (std.c.getErrno(fd)) {
.SUCCESS => unreachable,
.INTR => unreachable,
.AGAIN => return error.WouldBlock,
.CONNABORTED => return error.ConnectionAborted,
.INVAL => return error.SocketNotListening,
.MFILE => return error.ProcessFdQuotaExceeded,
.NFILE => return error.SystemFdQuotaExceeded,
.NOBUFS => return error.SystemResources,
.NOMEM => return error.SystemResources,
.PROTO => return error.ProtocolFailure,
.PERM => return error.BlockedByFirewall,
else => |err| return asError(err),
}
}
errdefer {
Syscall.close(fd) catch {};
}
const foo = Syscall.fcntl(fd, std.os.F.SETFL, (Syscall.fcntl(fd, std.os.F.GETFL, 0) catch 0) | std.os.O.NONBLOCK) catch 0;
_ = foo;
// darwin doesn't support os.MSG.NOSIGNAL,
// but instead a socket option to avoid SIGPIPE.
Syscall.setsockopt(fd, os.SOL.SOCKET, os.SO.NOSIGPIPE, &mem.toBytes(@as(c_int, 1))) catch {};
return fd;
}
};
self.submit(
context,
callback,
completion,
.accept,
.{
.socket = socket,
},
accepter,
);
}
pub const CloseError = error{
FileDescriptorInvalid,
DiskQuota,
@@ -1396,7 +1541,7 @@ pub fn recv(
},
struct {
fn doOperation(op: anytype) RecvError!usize {
const rc = system.@"recvfrom$NOCANCEL"(op.socket, op.buf, op.len, 0, null, null);
const rc = system.@"recv$NOCANCEL"(op.socket, op.buf, @minimum(op.len, op.available), 0);
return switch (system.getErrno(rc)) {
.SUCCESS => @intCast(usize, rc),
.AGAIN => error.WouldBlock,
@@ -1410,6 +1555,64 @@ pub fn recv(
);
}
pub fn recvNow(
self: *IO,
comptime Context: type,
context: Context,
comptime callback: fn (
context: Context,
completion: *Completion,
result: RecvError!usize,
) void,
completion: *Completion,
socket: os.socket_t,
buffer: []u8,
) void {
assert(socket > 0);
const receiver = struct {
fn doOperation(op: anytype) RecvError!usize {
const rc = system.@"recvfrom$NOCANCEL"(op.socket, op.buf, op.len, 0, null, null);
return switch (system.getErrno(rc)) {
.SUCCESS => @intCast(usize, rc),
.AGAIN => error.WouldBlock,
.NOMEM => error.SystemResources,
.CONNREFUSED => error.ConnectionRefused,
.CONNRESET => error.ConnectionResetByPeer,
else => |err| asError(err),
};
}
};
const op: Operation = .{ .recv = .{
.socket = socket,
.buf = buffer.ptr,
.len = @intCast(u32, buffer_limit(buffer.len)),
} };
const result = receiver.doOperation(op.recv) catch {
self.submit(
context,
callback,
completion,
.recv,
.{
.socket = socket,
.buf = buffer.ptr,
.len = @intCast(u32, buffer_limit(buffer.len)),
},
receiver,
);
return;
};
completion.* = .{
.next = null,
.context = context,
.callback = undefined,
.operation = op,
};
callback(context, completion, result);
}
pub const SendError = error{
AccessDenied,
AddressFamilyNotSupported,
@@ -1442,6 +1645,34 @@ pub fn send(
buffer: []const u8,
_: u32,
) void {
assert(socket > 0);
const sender = struct {
fn doOperation(op: anytype) SendError!usize {
const rc = system.@"sendto$NOCANCEL"(op.socket, op.buf, op.len, op.flags, null, 0);
return switch (system.getErrno(rc)) {
.SUCCESS => @intCast(usize, rc),
.ACCES => error.AccessDenied,
.AGAIN => error.WouldBlock,
.ALREADY => error.FastOpenAlreadyInProgress,
.CONNRESET => error.ConnectionResetByPeer,
.MSGSIZE => error.MessageTooBig,
.NOBUFS => error.SystemResources,
.NOMEM => error.SystemResources,
.PIPE => error.BrokenPipe,
.AFNOSUPPORT => error.AddressFamilyNotSupported,
.LOOP => error.SymLinkLoop,
.NAMETOOLONG => error.NameTooLong,
.NOENT => error.FileNotFound,
.NOTDIR => error.NotDir,
.HOSTUNREACH => error.NetworkUnreachable,
.NETUNREACH => error.NetworkUnreachable,
.NOTCONN => error.SocketNotConnected,
.NETDOWN => error.NetworkSubsystemFailed,
else => |err| asError(err),
};
}
};
self.submit(
context,
callback,
@@ -1453,35 +1684,84 @@ pub fn send(
.len = @intCast(u32, buffer_limit(buffer.len)),
.flags = 0,
},
struct {
fn doOperation(op: anytype) SendError!usize {
const rc = system.@"sendto$NOCANCEL"(op.socket, op.buf, op.len, op.flags, null, 0);
return switch (system.getErrno(rc)) {
.SUCCESS => @intCast(usize, rc),
.ACCES => error.AccessDenied,
.AGAIN => error.WouldBlock,
.ALREADY => error.FastOpenAlreadyInProgress,
.CONNRESET => error.ConnectionResetByPeer,
.MSGSIZE => error.MessageTooBig,
.NOBUFS => error.SystemResources,
.NOMEM => error.SystemResources,
.PIPE => error.BrokenPipe,
.AFNOSUPPORT => error.AddressFamilyNotSupported,
.LOOP => error.SymLinkLoop,
.NAMETOOLONG => error.NameTooLong,
.NOENT => error.FileNotFound,
.NOTDIR => error.NotDir,
.HOSTUNREACH => error.NetworkUnreachable,
.NETUNREACH => error.NetworkUnreachable,
.NOTCONN => error.SocketNotConnected,
.NETDOWN => error.NetworkSubsystemFailed,
else => |err| asError(err),
};
}
},
sender,
);
}
pub fn sendNow(
self: *IO,
comptime Context: type,
context: Context,
comptime callback: fn (
context: Context,
completion: *Completion,
result: SendError!usize,
) void,
completion: *Completion,
socket: os.socket_t,
buffer: []const u8,
_: u32,
) void {
return send(self, Context, context, callback, completion, socket, buffer, 0);
// assert(socket > 0);
// const sender = struct {
// fn doOperation(op: anytype) SendError!usize {
// const rc = system.@"sendto$NOCANCEL"(op.socket, op.buf, op.len, op.flags, null, 0);
// return switch (system.getErrno(rc)) {
// .SUCCESS => @intCast(usize, rc),
// .ACCES => error.AccessDenied,
// .AGAIN => error.WouldBlock,
// .ALREADY => error.FastOpenAlreadyInProgress,
// .CONNRESET => error.ConnectionResetByPeer,
// .MSGSIZE => error.MessageTooBig,
// .NOBUFS => error.SystemResources,
// .NOMEM => error.SystemResources,
// .PIPE => error.BrokenPipe,
// .AFNOSUPPORT => error.AddressFamilyNotSupported,
// .LOOP => error.SymLinkLoop,
// .NAMETOOLONG => error.NameTooLong,
// .NOENT => error.FileNotFound,
// .NOTDIR => error.NotDir,
// .HOSTUNREACH => error.NetworkUnreachable,
// .NETUNREACH => error.NetworkUnreachable,
// .NOTCONN => error.SocketNotConnected,
// .NETDOWN => error.NetworkSubsystemFailed,
// else => |err| asError(err),
// };
// }
// };
// const op: Operation = .{ .send = .{
// .socket = socket,
// .buf = buffer.ptr,
// .len = @intCast(u32, buffer_limit(buffer.len)),
// .flags = 0,
// } };
// const result = sender.doOperation(op.send) catch {
// self.submit(
// context,
// callback,
// completion,
// .send,
// .{
// .socket = socket,
// .buf = buffer.ptr,
// .len = @intCast(u32, buffer_limit(buffer.len)),
// .flags = 0,
// },
// sender,
// );
// return;
// };
// completion.* = .{
// .operation = op,
// .context = context,
// .callback = undefined,
// .next = null,
// };
// callback(context, completion, result);
}
pub const TimeoutError = error{Canceled} || Errno;
pub fn timeout(
@@ -1577,13 +1857,14 @@ pub fn write(
}
pub fn openSocket(family: u32, sock_type: u32, protocol: u32) !os.socket_t {
const fd = try Syscall.socket(family, sock_type | os.SOCK.NONBLOCK, protocol);
const fd = try Syscall.socket(family, sock_type | os.SOCK.NONBLOCK | os.SOCK.CLOEXEC, protocol);
errdefer {
Syscall.close(fd) catch {};
}
// darwin doesn't support os.MSG.NOSIGNAL, but instead a socket option to avoid SIGPIPE.
try Syscall.setsockopt(fd, os.SOL.SOCKET, os.SO.NOSIGPIPE, &mem.toBytes(@as(c_int, 1)));
try Syscall.setsockopt(fd, os.SOL.SOCKET, os.SO.NOSIGPIPE | os.SO.NOSIGPIPE | darwin.TCP_NODELAY, &mem.toBytes(@as(c_int, 1)));
return fd;
}
@@ -1604,3 +1885,19 @@ fn buffer_limit(buffer_len: usize) usize {
pub var global: IO = undefined;
pub var global_loaded: bool = false;
extern fn io_darwin_create_listen_socket(host: [*c]const u8, port: [*c]const u8, reuse: bool) c_int;
pub fn createListenSocket(
host: []const u8,
port: u16,
reuse: bool,
) c_int {
var host_: [1024]u8 = undefined;
@memcpy(&host_, host.ptr, host.len);
host_[host.len] = 0;
var port_: [16]u8 = undefined;
var port_string = std.fmt.bufPrintZ(&port_, "{d}", .{port}) catch return -1;
var sentinled = host_[0..host.len :0];
return io_darwin_create_listen_socket(sentinled, port_string, reuse);
}

View File

@@ -995,7 +995,7 @@ pub const Waker = struct {
pub fn wait(this: Waker) !u64 {
var bytes: usize = 0;
_ = std.os.read(this.fd, @ptrCast(*[8]u8, &bytes)) catch 0;
_ = try std.os.read(this.fd, @ptrCast(*[8]u8, &bytes));
return @intCast(u64, bytes);
}

View File

@@ -26,7 +26,7 @@ const BrowserMap = @import("./package_json.zig").BrowserMap;
const CacheSet = cache.Set;
const DataURL = @import("./data_url.zig").DataURL;
pub const DirInfo = @import("./dir_info.zig");
const HTTPWatcher = if (Environment.isTest or Environment.isWasm) void else @import("../http.zig").Watcher;
const HTTPWatcher = if (Environment.isTest or Environment.isWasm) void else @import("../bun_dev_http_server.zig").Watcher;
const Wyhash = std.hash.Wyhash;
const ResolvePath = @import("./resolve_path.zig");
const NodeFallbackModules = @import("../node_fallbacks.zig");

View File

@@ -15,9 +15,9 @@ debug_only_checker: DebugHashTable = DebugHashTable{},
pub fn count(this: *StringBuilder, slice: string) void {
this.cap += slice.len;
if (comptime Env.allow_assert) {
_ = this.debug_only_checker.getOrPut(bun.default_allocator, bun.hash(slice)) catch unreachable;
}
// if (comptime Env.allow_assert) {
// _ = this.debug_only_checker.getOrPut(bun.default_allocator, bun.hash(slice)) catch unreachable;
// }
}
pub fn allocate(this: *StringBuilder, allocator: Allocator) !void {
@@ -29,10 +29,10 @@ pub fn allocate(this: *StringBuilder, allocator: Allocator) !void {
pub fn deinit(this: *StringBuilder, allocator: Allocator) void {
if (this.ptr == null or this.cap == 0) return;
allocator.free(this.ptr.?[0..this.cap]);
if (comptime Env.allow_assert) {
this.debug_only_checker.deinit(bun.default_allocator);
this.debug_only_checker = .{};
}
// if (comptime Env.allow_assert) {
// this.debug_only_checker.deinit(bun.default_allocator);
// this.debug_only_checker = .{};
// }
}
pub fn append(this: *StringBuilder, slice: string) string {
@@ -41,9 +41,9 @@ pub fn append(this: *StringBuilder, slice: string) string {
assert(this.ptr != null); // must call allocate first
}
if (comptime Env.allow_assert) {
assert(this.debug_only_checker.contains(bun.hash(slice)));
}
// if (comptime Env.allow_assert) {
// assert(this.debug_only_checker.contains(bun.hash(slice)));
// }
bun.copy(u8, this.ptr.?[this.len..this.cap], slice);
const result = this.ptr.?[this.len..this.cap][0..slice.len];