mirror of
https://github.com/oven-sh/bun
synced 2026-02-02 15:08:46 +00:00
[bun install] async http request works I think?
This commit is contained in:
9
.vscode/launch.json
vendored
9
.vscode/launch.json
vendored
@@ -1,6 +1,15 @@
|
||||
{
|
||||
"version": "0.2.0",
|
||||
"configurations": [
|
||||
{
|
||||
"type": "lldb",
|
||||
"request": "launch",
|
||||
"name": "HTTP bench",
|
||||
"program": "${workspaceFolder}/misctools/http_bench",
|
||||
"args": ["http://example.com"],
|
||||
"cwd": "${workspaceFolder}",
|
||||
"console": "internalConsole"
|
||||
},
|
||||
{
|
||||
"type": "lldb",
|
||||
"request": "launch",
|
||||
|
||||
11
Makefile
11
Makefile
@@ -425,6 +425,17 @@ fetch-debug:
|
||||
src/deps/picohttpparser.o \
|
||||
$(LIBCRYPTO_STATIC_LIB)
|
||||
|
||||
|
||||
httpbench-debug:
|
||||
cd misctools; $(ZIG) build-obj ./http_bench.zig -fcompiler-rt -lc --main-pkg-path ../ --pkg-begin io ../$(IO_FILE) --pkg-end
|
||||
$(CXX) ./misctools/http_bench.o -g -o ./misctools/http_bench $(DEFAULT_LINKER_FLAGS) -lc \
|
||||
src/deps/mimalloc/libmimalloc.a \
|
||||
src/deps/zlib/libz.a \
|
||||
src/deps/libarchive.a \
|
||||
src/deps/libs2n.a \
|
||||
src/deps/picohttpparser.o \
|
||||
$(LIBCRYPTO_STATIC_LIB)
|
||||
|
||||
s2n-mac:
|
||||
cd $(DEPS_DIR)/s2n-tls; \
|
||||
make clean; \
|
||||
|
||||
2
misctools/.gitignore
vendored
2
misctools/.gitignore
vendored
@@ -1,3 +1,5 @@
|
||||
*.tgz
|
||||
tgz
|
||||
readlink-getfd
|
||||
readlink-realpath
|
||||
http_bench
|
||||
|
||||
@@ -2,7 +2,7 @@ const std = @import("std");
|
||||
usingnamespace @import("../src/global.zig");
|
||||
const clap = @import("../src/deps/zig-clap/clap.zig");
|
||||
|
||||
const HTTPClient = @import("../src/http_client.zig");
|
||||
const HTTPClient = @import("../src/http/http_client_async.zig");
|
||||
const URL = @import("../src/query_string_map.zig").URL;
|
||||
const Headers = @import("../src/javascript/jsc/webcore/response.zig").Headers;
|
||||
const Method = @import("../src/http/method.zig").Method;
|
||||
|
||||
250
misctools/http_bench.zig
Normal file
250
misctools/http_bench.zig
Normal file
@@ -0,0 +1,250 @@
|
||||
const std = @import("std");
|
||||
usingnamespace @import("../src/global.zig");
|
||||
const clap = @import("../src/deps/zig-clap/clap.zig");
|
||||
|
||||
const URL = @import("../src/query_string_map.zig").URL;
|
||||
const Headers = @import("../src/javascript/jsc/webcore/response.zig").Headers;
|
||||
const Method = @import("../src/http/method.zig").Method;
|
||||
const ColonListType = @import("../src/cli/colon_list_type.zig").ColonListType;
|
||||
const HeadersTuple = ColonListType(string, noop_resolver);
|
||||
const path_handler = @import("../src/resolver/resolve_path.zig");
|
||||
|
||||
fn noop_resolver(in: string) !string {
|
||||
return in;
|
||||
}
|
||||
|
||||
const VERSION = "0.0.0";
|
||||
|
||||
const params = [_]clap.Param(clap.Help){
|
||||
clap.parseParam("-v, --verbose Show headers & status code") catch unreachable,
|
||||
clap.parseParam("-H, --header <STR>... Add a header") catch unreachable,
|
||||
clap.parseParam("-r, --max-redirects <STR> Maximum number of redirects to follow (default: 128)") catch unreachable,
|
||||
clap.parseParam("-b, --body <STR> HTTP request body as a string") catch unreachable,
|
||||
clap.parseParam("-f, --file <STR> File path to load as body") catch unreachable,
|
||||
clap.parseParam("-n, --count <INT> How many runs? Default 10") catch unreachable,
|
||||
clap.parseParam("-t, --timeout <INT> Max duration per request") catch unreachable,
|
||||
clap.parseParam("-r, --retry <INT> Max retry count") catch unreachable,
|
||||
clap.parseParam("--no-gzip Disable gzip") catch unreachable,
|
||||
clap.parseParam("--no-deflate Disable deflate") catch unreachable,
|
||||
clap.parseParam("--no-compression Disable gzip & deflate") catch unreachable,
|
||||
clap.parseParam("--version Print the version and exit") catch unreachable,
|
||||
clap.parseParam("--turbo Skip sending TLS shutdown signals") catch unreachable,
|
||||
clap.parseParam("<POS>... ") catch unreachable,
|
||||
};
|
||||
|
||||
const MethodNames = std.ComptimeStringMap(Method, .{
|
||||
.{ "GET", Method.GET },
|
||||
.{ "get", Method.GET },
|
||||
|
||||
.{ "POST", Method.POST },
|
||||
.{ "post", Method.POST },
|
||||
|
||||
.{ "PUT", Method.PUT },
|
||||
.{ "put", Method.PUT },
|
||||
|
||||
.{ "PATCH", Method.PATCH },
|
||||
.{ "patch", Method.PATCH },
|
||||
|
||||
.{ "OPTIONS", Method.OPTIONS },
|
||||
.{ "options", Method.OPTIONS },
|
||||
|
||||
.{ "HEAD", Method.HEAD },
|
||||
.{ "head", Method.HEAD },
|
||||
});
|
||||
|
||||
var file_path_buf: [std.fs.MAX_PATH_BYTES + 1]u8 = undefined;
|
||||
var cwd_buf: [std.fs.MAX_PATH_BYTES + 1]u8 = undefined;
|
||||
|
||||
pub const Arguments = struct {
|
||||
url: URL,
|
||||
method: Method,
|
||||
verbose: bool = false,
|
||||
headers: Headers.Entries,
|
||||
headers_buf: string,
|
||||
body: string = "",
|
||||
turbo: bool = false,
|
||||
count: usize = 10,
|
||||
timeout: usize = 0,
|
||||
|
||||
pub fn parse(allocator: *std.mem.Allocator) !Arguments {
|
||||
var diag = clap.Diagnostic{};
|
||||
|
||||
var args = clap.parse(clap.Help, ¶ms, .{
|
||||
.diagnostic = &diag,
|
||||
.allocator = allocator,
|
||||
}) catch |err| {
|
||||
// Report useful error and exit
|
||||
diag.report(Output.errorWriter(), err) catch {};
|
||||
return err;
|
||||
};
|
||||
|
||||
var positionals = args.positionals();
|
||||
var raw_args: std.ArrayListUnmanaged(string) = undefined;
|
||||
|
||||
if (positionals.len > 0) {
|
||||
raw_args = .{ .capacity = positionals.len, .items = @intToPtr([*][]const u8, @ptrToInt(positionals.ptr))[0..positionals.len] };
|
||||
} else {
|
||||
raw_args = .{};
|
||||
}
|
||||
|
||||
if (args.flag("--version")) {
|
||||
try Output.writer().writeAll(VERSION);
|
||||
std.os.exit(0);
|
||||
}
|
||||
|
||||
var method = Method.GET;
|
||||
var url: URL = .{};
|
||||
var body_string: string = args.option("--body") orelse "";
|
||||
|
||||
if (args.option("--file")) |file_path| {
|
||||
if (file_path.len > 0) {
|
||||
var cwd = try std.process.getCwd(&cwd_buf);
|
||||
var parts = [_]string{std.mem.span(file_path)};
|
||||
var absolute_path = path_handler.joinAbsStringBuf(cwd, &file_path_buf, &parts, .auto);
|
||||
file_path_buf[absolute_path.len] = 0;
|
||||
file_path_buf[absolute_path.len + 1] = 0;
|
||||
var absolute_path_len = absolute_path.len;
|
||||
var absolute_path_ = file_path_buf[0..absolute_path_len :0];
|
||||
|
||||
var body_file = std.fs.openFileAbsoluteZ(absolute_path_, .{ .read = true }) catch |err| {
|
||||
Output.printErrorln("<r><red>{s}<r> opening file {s}", .{ @errorName(err), absolute_path });
|
||||
Output.flush();
|
||||
std.os.exit(1);
|
||||
};
|
||||
|
||||
var file_contents = body_file.readToEndAlloc(allocator, try body_file.getEndPos()) catch |err| {
|
||||
Output.printErrorln("<r><red>{s}<r> reading file {s}", .{ @errorName(err), absolute_path });
|
||||
Output.flush();
|
||||
std.os.exit(1);
|
||||
};
|
||||
body_string = file_contents;
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
var raw_arg_i: usize = 0;
|
||||
while (raw_arg_i < raw_args.items.len) : (raw_arg_i += 1) {
|
||||
const arg = raw_args.items[raw_arg_i];
|
||||
if (MethodNames.get(std.mem.span(arg))) |method_| {
|
||||
method = method_;
|
||||
_ = raw_args.swapRemove(raw_arg_i);
|
||||
}
|
||||
}
|
||||
|
||||
if (raw_args.items.len == 0) {
|
||||
Output.prettyErrorln("<r><red>error<r><d>:<r> <b>Missing URL<r>\n\nExample:\n<r><b>fetch GET https://example.com<r>\n\n<b>fetch example.com/foo<r>\n\n", .{});
|
||||
Output.flush();
|
||||
std.os.exit(1);
|
||||
}
|
||||
|
||||
const url_position = raw_args.items.len - 1;
|
||||
url = URL.parse(raw_args.swapRemove(url_position));
|
||||
if (!url.isAbsolute()) {
|
||||
Output.prettyErrorln("<r><red>error<r><d>:<r> <b>Invalid URL<r>\n\nExample:\n<r><b>fetch GET https://example.com<r>\n\n<b>fetch example.com/foo<r>\n\n", .{});
|
||||
Output.flush();
|
||||
std.os.exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
return Arguments{
|
||||
.url = url,
|
||||
.method = method,
|
||||
.verbose = args.flag("--verbose"),
|
||||
.headers = .{},
|
||||
.headers_buf = "",
|
||||
.body = body_string,
|
||||
.turbo = args.flag("--turbo"),
|
||||
.timeout = std.fmt.parseInt(usize, args.option("--timeout") orelse "0", 10) catch |err| {
|
||||
Output.prettyErrorln("<r><red>{s}<r> parsing timeout", .{@errorName(err)});
|
||||
Output.flush();
|
||||
std.os.exit(1);
|
||||
},
|
||||
.count = std.fmt.parseInt(usize, args.option("--count") orelse "10", 10) catch |err| {
|
||||
Output.prettyErrorln("<r><red>{s}<r> parsing count", .{@errorName(err)});
|
||||
Output.flush();
|
||||
std.os.exit(1);
|
||||
},
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
const NetworkThread = @import("../src/http/network_thread.zig");
|
||||
const HTTP = @import("../src/http/http_client_async.zig");
|
||||
|
||||
var stdout_: std.fs.File = undefined;
|
||||
var stderr_: std.fs.File = undefined;
|
||||
pub fn main() anyerror!void {
|
||||
stdout_ = std.io.getStdOut();
|
||||
stderr_ = std.io.getStdErr();
|
||||
var output_source = Output.Source.init(stdout_, stderr_);
|
||||
Output.Source.set(&output_source);
|
||||
|
||||
defer Output.flush();
|
||||
|
||||
var args = try Arguments.parse(default_allocator);
|
||||
|
||||
var channel = try default_allocator.create(HTTP.HTTPChannel);
|
||||
channel.* = HTTP.HTTPChannel.init();
|
||||
|
||||
try channel.buffer.ensureCapacity(args.count);
|
||||
|
||||
try NetworkThread.init();
|
||||
|
||||
var i: usize = 0;
|
||||
while (i < args.count) : (i += 1) {
|
||||
var response_body = try default_allocator.create(MutableString);
|
||||
response_body.* = try MutableString.init(default_allocator, 1024);
|
||||
var request_body = try default_allocator.create(MutableString);
|
||||
request_body.* = try MutableString.init(default_allocator, 0);
|
||||
|
||||
var async_http = try default_allocator.create(HTTP.AsyncHTTP);
|
||||
async_http.* = try HTTP.AsyncHTTP.init(
|
||||
default_allocator,
|
||||
args.method,
|
||||
args.url,
|
||||
args.headers,
|
||||
args.headers_buf,
|
||||
request_body,
|
||||
response_body,
|
||||
args.timeout,
|
||||
);
|
||||
async_http.channel = channel;
|
||||
async_http.schedule(default_allocator);
|
||||
}
|
||||
|
||||
var read_count: usize = 0;
|
||||
var success_count: usize = 0;
|
||||
var fail_count: usize = 0;
|
||||
var timer = try std.time.Timer.start();
|
||||
while (read_count < args.count) {
|
||||
while (channel.tryReadItem() catch null) |http| {
|
||||
read_count += 1;
|
||||
|
||||
Output.printElapsed(@floatCast(f64, @intToFloat(f128, http.elapsed) / std.time.ns_per_ms));
|
||||
if (http.response) |resp| {
|
||||
if (resp.status_code == 200) {
|
||||
success_count += 1;
|
||||
} else {
|
||||
fail_count += 1;
|
||||
}
|
||||
Output.printError(" {}\n", .{resp});
|
||||
} else if (http.err) |err| {
|
||||
fail_count += 1;
|
||||
Output.printError(" err: {s}\n", .{@errorName(err)});
|
||||
} else {
|
||||
fail_count += 1;
|
||||
Output.prettyError(" Uh-oh: {s}\n", .{@tagName(http.state.loadUnchecked())});
|
||||
}
|
||||
|
||||
Output.flush();
|
||||
}
|
||||
}
|
||||
|
||||
Output.printElapsed(@floatCast(f64, @intToFloat(f128, timer.read()) / std.time.ns_per_ms));
|
||||
Output.prettyErrorln("Completed {d}\n Success: <green>{d}<r>\n Failure: <red>{d}<r>\n", .{
|
||||
read_count,
|
||||
success_count,
|
||||
fail_count,
|
||||
});
|
||||
Output.flush();
|
||||
}
|
||||
32
src/http.zig
32
src/http.zig
@@ -1540,7 +1540,7 @@ pub const RequestContext = struct {
|
||||
ctx.appendHeader("Sec-WebSocket-Protocol", "bun-hmr");
|
||||
try ctx.writeStatus(101);
|
||||
try ctx.flushHeaders();
|
||||
// Output.prettyln("<r><green>101<r><d> Hot Module Reloading connected.<r>", .{});
|
||||
// Output.prettyErrorln("<r><green>101<r><d> Hot Module Reloading connected.<r>", .{});
|
||||
// Output.flush();
|
||||
Analytics.Features.hot_module_reloading = true;
|
||||
|
||||
@@ -1591,7 +1591,7 @@ pub const RequestContext = struct {
|
||||
var frame = handler.websocket.read() catch |err| {
|
||||
switch (err) {
|
||||
error.ConnectionClosed => {
|
||||
// Output.prettyln("Websocket closed.", .{});
|
||||
// Output.prettyErrorln("Websocket closed.", .{});
|
||||
handler.tombstone = true;
|
||||
is_socket_closed = true;
|
||||
continue;
|
||||
@@ -1604,7 +1604,7 @@ pub const RequestContext = struct {
|
||||
};
|
||||
switch (frame.header.opcode) {
|
||||
.Close => {
|
||||
// Output.prettyln("Websocket closed.", .{});
|
||||
// Output.prettyErrorln("Websocket closed.", .{});
|
||||
is_socket_closed = true;
|
||||
return;
|
||||
},
|
||||
@@ -1639,7 +1639,7 @@ pub const RequestContext = struct {
|
||||
},
|
||||
.success => {
|
||||
if (build_result.timestamp > cmd.timestamp) {
|
||||
Output.prettyln(
|
||||
Output.prettyErrorln(
|
||||
"<r><b><green>{d}ms<r> <d>built<r> <b>{s}<r><b> <r><d>({d}+ LOC)",
|
||||
.{
|
||||
build_result.timestamp - cmd.timestamp,
|
||||
@@ -2553,7 +2553,7 @@ pub const Server = struct {
|
||||
);
|
||||
|
||||
if (comptime FeatureFlags.verbose_watcher) {
|
||||
Output.prettyln("<r><d>File changed: {s}<r>", .{ctx.bundler.fs.relativeTo(file_path)});
|
||||
Output.prettyErrorln("<r><d>File changed: {s}<r>", .{ctx.bundler.fs.relativeTo(file_path)});
|
||||
}
|
||||
} else {
|
||||
const change_message = Api.WebsocketMessageFileChangeNotification{
|
||||
@@ -2566,12 +2566,12 @@ pub const Server = struct {
|
||||
const change_buf = content_fbs.getWritten();
|
||||
const written_buf = filechange_buf[0 .. header.len + change_buf.len];
|
||||
RequestContext.WebsocketHandler.broadcast(written_buf) catch |err| {
|
||||
Output.prettyln("Error writing change notification: {s}<r>", .{@errorName(err)});
|
||||
Output.prettyErrorln("Error writing change notification: {s}<r>", .{@errorName(err)});
|
||||
};
|
||||
if (comptime is_emoji_enabled) {
|
||||
Output.prettyln("<r>📜 <d>File change: {s}<r>", .{ctx.bundler.fs.relativeTo(file_path)});
|
||||
Output.prettyErrorln("<r>📜 <d>File change: {s}<r>", .{ctx.bundler.fs.relativeTo(file_path)});
|
||||
} else {
|
||||
Output.prettyln("<r> <d>File change: {s}<r>", .{ctx.bundler.fs.relativeTo(file_path)});
|
||||
Output.prettyErrorln("<r> <d>File change: {s}<r>", .{ctx.bundler.fs.relativeTo(file_path)});
|
||||
}
|
||||
}
|
||||
},
|
||||
@@ -2583,9 +2583,9 @@ pub const Server = struct {
|
||||
// ctx.watcher.removeAtIndex(event.index, hashes[event.index], parent_hashes, .directory);
|
||||
|
||||
if (comptime is_emoji_enabled) {
|
||||
Output.prettyln("<r>📁 <d>Dir change: {s}<r>", .{ctx.bundler.fs.relativeTo(file_path)});
|
||||
Output.prettyErrorln("<r>📁 <d>Dir change: {s}<r>", .{ctx.bundler.fs.relativeTo(file_path)});
|
||||
} else {
|
||||
Output.prettyln("<r> <d>Dir change: {s}<r>", .{ctx.bundler.fs.relativeTo(file_path)});
|
||||
Output.prettyErrorln("<r> <d>Dir change: {s}<r>", .{ctx.bundler.fs.relativeTo(file_path)});
|
||||
}
|
||||
},
|
||||
}
|
||||
@@ -2839,13 +2839,13 @@ pub const Server = struct {
|
||||
200, 304, 101 => {},
|
||||
|
||||
201...303, 305...399 => {
|
||||
Output.prettyln("<r><green>{d}<r><d> {s} <r>{s}<d> as {s}<r>", .{ status, @tagName(req_ctx.method), req.path, req_ctx.mime_type.value });
|
||||
Output.prettyErrorln("<r><green>{d}<r><d> {s} <r>{s}<d> as {s}<r>", .{ status, @tagName(req_ctx.method), req.path, req_ctx.mime_type.value });
|
||||
},
|
||||
400...499 => {
|
||||
Output.prettyln("<r><yellow>{d}<r><d> {s} <r>{s}<d> as {s}<r>", .{ status, @tagName(req_ctx.method), req.path, req_ctx.mime_type.value });
|
||||
Output.prettyErrorln("<r><yellow>{d}<r><d> {s} <r>{s}<d> as {s}<r>", .{ status, @tagName(req_ctx.method), req.path, req_ctx.mime_type.value });
|
||||
},
|
||||
else => {
|
||||
Output.prettyln("<r><red>{d}<r><d> {s} <r>{s}<d> as {s}<r>", .{ status, @tagName(req_ctx.method), req.path, req_ctx.mime_type.value });
|
||||
Output.prettyErrorln("<r><red>{d}<r><d> {s} <r>{s}<d> as {s}<r>", .{ status, @tagName(req_ctx.method), req.path, req_ctx.mime_type.value });
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -2861,13 +2861,13 @@ pub const Server = struct {
|
||||
200, 304, 101 => {},
|
||||
|
||||
201...303, 305...399 => {
|
||||
Output.prettyln("<r><green>{d}<r><d> <r>{s}<d> {s} as {s}<r>", .{ status, @tagName(req_ctx.method), req.path, req_ctx.mime_type.value });
|
||||
Output.prettyErrorln("<r><green>{d}<r><d> <r>{s}<d> {s} as {s}<r>", .{ status, @tagName(req_ctx.method), req.path, req_ctx.mime_type.value });
|
||||
},
|
||||
400...499 => {
|
||||
Output.prettyln("<r><yellow>{d}<r><d> <r>{s}<d> {s} as {s}<r>", .{ status, @tagName(req_ctx.method), req.path, req_ctx.mime_type.value });
|
||||
Output.prettyErrorln("<r><yellow>{d}<r><d> <r>{s}<d> {s} as {s}<r>", .{ status, @tagName(req_ctx.method), req.path, req_ctx.mime_type.value });
|
||||
},
|
||||
else => {
|
||||
Output.prettyln("<r><red>{d}<r><d> <r>{s}<d> {s} as {s}<r>", .{ status, @tagName(req_ctx.method), req.path, req_ctx.mime_type.value });
|
||||
Output.prettyErrorln("<r><red>{d}<r><d> <r>{s}<d> {s} as {s}<r>", .{ status, @tagName(req_ctx.method), req.path, req_ctx.mime_type.value });
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
1310
src/http/http_client_async.zig
Normal file
1310
src/http/http_client_async.zig
Normal file
File diff suppressed because it is too large
Load Diff
25
src/http/network_thread.zig
Normal file
25
src/http/network_thread.zig
Normal file
@@ -0,0 +1,25 @@
|
||||
const ThreadPool = @import("../thread_pool.zig");
|
||||
const Batch = ThreadPool.Batch;
|
||||
const std = @import("std");
|
||||
const AsyncIO = @import("io");
|
||||
|
||||
const NetworkThread = @This();
|
||||
|
||||
/// Single-thread in this pool
|
||||
pool: ThreadPool,
|
||||
|
||||
pub var global: NetworkThread = undefined;
|
||||
pub var global_loaded: bool = false;
|
||||
|
||||
pub fn init() !void {
|
||||
AsyncIO.global = try AsyncIO.init(0, 0);
|
||||
AsyncIO.global_loaded = true;
|
||||
|
||||
global = NetworkThread{
|
||||
.pool = ThreadPool.init(.{ .max_threads = 1, .stack_size = 64 * 1024 * 1024 }),
|
||||
};
|
||||
|
||||
global.pool.io = &AsyncIO.global;
|
||||
|
||||
global_loaded = true;
|
||||
}
|
||||
@@ -259,16 +259,8 @@ pub fn buildRequest(this: *const HTTPClient, body_len: usize) picohttp.Request {
|
||||
pub fn connect(
|
||||
this: *HTTPClient,
|
||||
) !tcp.Client {
|
||||
var client: tcp.Client = try tcp.Client.init(tcp.Domain.ip, .{ .close_on_exec = true });
|
||||
const port = this.url.getPortAuto();
|
||||
client.setNoDelay(true) catch {};
|
||||
client.setReadBufferSize(http_req_buf.len) catch {};
|
||||
client.setQuickACK(true) catch {};
|
||||
|
||||
if (this.timeout > 0) {
|
||||
client.setReadTimeout(this.timeout) catch {};
|
||||
client.setWriteTimeout(this.timeout) catch {};
|
||||
}
|
||||
var client: tcp.Client = undefined;
|
||||
|
||||
// if (this.url.isLocalhost()) {
|
||||
// try client.connect(
|
||||
@@ -277,7 +269,16 @@ pub fn connect(
|
||||
// } else {
|
||||
// } else if (this.url.isDomainName()) {
|
||||
var stream = try std.net.tcpConnectToHost(default_allocator, this.url.hostname, port);
|
||||
client.socket = std.x.os.Socket.from(stream.handle);
|
||||
client = tcp.Client{ .socket = std.x.os.Socket.from(stream.handle) };
|
||||
|
||||
if (this.timeout > 0) {
|
||||
client.setReadTimeout(this.timeout) catch {};
|
||||
client.setWriteTimeout(this.timeout) catch {};
|
||||
}
|
||||
|
||||
client.setNoDelay(true) catch {};
|
||||
client.setReadBufferSize(http_req_buf.len) catch {};
|
||||
client.setQuickACK(true) catch {};
|
||||
|
||||
// }
|
||||
// } else if (this.url.getIPv4Address()) |ip_addr| {
|
||||
|
||||
@@ -1,5 +1,20 @@
|
||||
const std = @import("std");
|
||||
const os = std.os;
|
||||
const os = struct {
|
||||
pub usingnamespace std.os;
|
||||
pub const EINTR = 4;
|
||||
pub const EAGAIN = 35;
|
||||
pub const EBADF = 9;
|
||||
pub const ECONNRESET = 54;
|
||||
pub const EFAULT = 14;
|
||||
pub const EINVAL = 22;
|
||||
pub const EIO = 5;
|
||||
pub const EISDIR = 21;
|
||||
pub const ENOBUFS = 55;
|
||||
pub const ENOMEM = 12;
|
||||
pub const ENXIO = 6;
|
||||
pub const EOVERFLOW = 84;
|
||||
pub const ESPIPE = 29;
|
||||
};
|
||||
const mem = std.mem;
|
||||
const assert = std.debug.assert;
|
||||
|
||||
@@ -486,7 +501,7 @@ pub fn read(
|
||||
op.len,
|
||||
@bitCast(isize, op.offset),
|
||||
);
|
||||
return switch (os.errno(rc)) {
|
||||
return switch (@enumToInt(os.errno(rc))) {
|
||||
0 => @intCast(usize, rc),
|
||||
os.EINTR => continue,
|
||||
os.EAGAIN => error.WouldBlock,
|
||||
@@ -501,7 +516,7 @@ pub fn read(
|
||||
os.ENXIO => error.Unseekable,
|
||||
os.EOVERFLOW => error.Unseekable,
|
||||
os.ESPIPE => error.Unseekable,
|
||||
else => |err| os.unexpectedErrno(err),
|
||||
else => error.Unexpected,
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -663,3 +678,6 @@ fn buffer_limit(buffer_len: usize) usize {
|
||||
};
|
||||
return std.math.min(limit, buffer_len);
|
||||
}
|
||||
|
||||
pub var global: IO = undefined;
|
||||
pub var global_loaded: bool = false;
|
||||
|
||||
@@ -872,3 +872,6 @@ pub fn write(
|
||||
pub fn openSocket(family: u32, sock_type: u32, protocol: u32) !os.socket_t {
|
||||
return os.socket(family, sock_type, protocol);
|
||||
}
|
||||
|
||||
pub var global: IO = undefined;
|
||||
pub var global_loaded: bool = false;
|
||||
|
||||
@@ -17,6 +17,8 @@ pub const MainPanicHandler = panicky.NewPanicHandler(std.builtin.default_panic);
|
||||
const js = @import("javascript/jsc/bindings/bindings.zig");
|
||||
usingnamespace @import("javascript/jsc/javascript.zig");
|
||||
|
||||
pub const io_mode = .blocking;
|
||||
|
||||
pub fn panic(msg: []const u8, error_return_trace: ?*std.builtin.StackTrace) noreturn {
|
||||
MainPanicHandler.handle_panic(msg, error_return_trace);
|
||||
}
|
||||
|
||||
37
src/pool.zig
Normal file
37
src/pool.zig
Normal file
@@ -0,0 +1,37 @@
|
||||
const std = @import("std");
|
||||
|
||||
pub fn ObjectPool(comptime Type: type, comptime Init: (fn (allocator: *std.mem.Allocator) anyerror!Type)) type {
|
||||
return struct {
|
||||
const LinkedList = std.SinglyLinkedList(Type);
|
||||
// mimalloc crashes on realloc across threads
|
||||
threadlocal var list: LinkedList = undefined;
|
||||
threadlocal var loaded: bool = false;
|
||||
pub fn get(allocator: *std.mem.Allocator) *LinkedList.Node {
|
||||
if (loaded) {
|
||||
if (list.popFirst()) |node| {
|
||||
node.data.reset();
|
||||
return node;
|
||||
}
|
||||
}
|
||||
|
||||
var new_node = allocator.create(LinkedList.Node) catch unreachable;
|
||||
new_node.* = LinkedList.Node{
|
||||
.data = Init(
|
||||
allocator,
|
||||
) catch unreachable,
|
||||
};
|
||||
|
||||
return new_node;
|
||||
}
|
||||
|
||||
pub fn release(node: *LinkedList.Node) void {
|
||||
if (loaded) {
|
||||
list.prepend(node);
|
||||
return;
|
||||
}
|
||||
|
||||
list = LinkedList{ .first = node };
|
||||
loaded = true;
|
||||
}
|
||||
};
|
||||
}
|
||||
@@ -4,10 +4,13 @@
|
||||
const std = @import("std");
|
||||
const ThreadPool = @This();
|
||||
const Futex = @import("./futex.zig");
|
||||
const AsyncIO = @import("io");
|
||||
|
||||
const assert = std.debug.assert;
|
||||
const Atomic = std.atomic.Atomic;
|
||||
|
||||
io: ?*AsyncIO = null,
|
||||
|
||||
stack_size: u32,
|
||||
max_threads: u32,
|
||||
sync: Atomic(u32) = Atomic(u32).init(@bitCast(u32, Sync{})),
|
||||
@@ -15,6 +18,7 @@ idle_event: Event = .{},
|
||||
join_event: Event = .{},
|
||||
run_queue: Node.Queue = .{},
|
||||
threads: Atomic(?*Thread) = Atomic(?*Thread).init(null),
|
||||
name: []const u8 = "",
|
||||
|
||||
const Sync = packed struct {
|
||||
/// Tracks the number of threads not searching for Tasks
|
||||
@@ -172,6 +176,7 @@ noinline fn notifySlow(self: *ThreadPool, is_waking: bool) void {
|
||||
if (can_wake and sync.spawned < self.max_threads) {
|
||||
const spawn_config = std.Thread.SpawnConfig{ .stack_size = self.stack_size };
|
||||
const thread = std.Thread.spawn(spawn_config, Thread.run, .{self}) catch return self.unregister(null);
|
||||
// if (self.name.len > 0) thread.setName(self.name) catch {};
|
||||
return thread.detach();
|
||||
}
|
||||
|
||||
@@ -230,7 +235,14 @@ noinline fn wait(self: *ThreadPool, _is_waking: bool) error{Shutdown}!bool {
|
||||
|
||||
// Wait for a signal by either notify() or shutdown() without wasting cpu cycles.
|
||||
// TODO: Add I/O polling here.
|
||||
if (self.io) |io| {
|
||||
io.tick() catch unreachable;
|
||||
}
|
||||
} else {
|
||||
if (self.io) |io| {
|
||||
while (true) io.run_for_ns(std.time.ns_per_ms * 1000) catch {};
|
||||
}
|
||||
|
||||
self.idle_event.wait();
|
||||
sync = @bitCast(Sync, self.sync.load(.Monotonic));
|
||||
}
|
||||
@@ -315,6 +327,8 @@ fn join(self: *ThreadPool) void {
|
||||
thread.join_event.notify();
|
||||
}
|
||||
|
||||
const Output = @import("./global.zig").Output;
|
||||
|
||||
const Thread = struct {
|
||||
next: ?*Thread = null,
|
||||
target: ?*Thread = null,
|
||||
@@ -326,6 +340,8 @@ const Thread = struct {
|
||||
|
||||
/// Thread entry point which runs a worker for the ThreadPool
|
||||
fn run(thread_pool: *ThreadPool) void {
|
||||
Output.Source.configureThread();
|
||||
|
||||
var self = Thread{};
|
||||
current = &self;
|
||||
|
||||
@@ -344,6 +360,7 @@ const Thread = struct {
|
||||
const task = @fieldParentPtr(Task, "node", result.node);
|
||||
(task.callback)(task);
|
||||
}
|
||||
Output.flush();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -369,6 +386,9 @@ const Thread = struct {
|
||||
}
|
||||
|
||||
// TODO: add optimistic I/O polling here
|
||||
if (thread_pool.io) |io| {
|
||||
io.tick() catch {};
|
||||
}
|
||||
|
||||
// Then try work stealing from other threads
|
||||
var num_threads: u32 = @bitCast(Sync, thread_pool.sync.load(.Monotonic)).spawned;
|
||||
|
||||
Reference in New Issue
Block a user