This commit is contained in:
Jarred Sumner
2022-01-26 14:37:44 -08:00
parent 047501999d
commit 9322cec8f2
5 changed files with 207 additions and 161 deletions

View File

@@ -1,14 +1,17 @@
import fs from "fs";
const response = await fetch("http://example.com/");
const text = await response.text();
const urls = ["https://example.com", "http://example.com"];
for (let url of urls) {
const response = await fetch(url);
const text = await response.text();
if (
fs.readFileSync(
import.meta.path.substring(0, import.meta.path.lastIndexOf("/")) +
"/fetch.js.txt",
"utf8"
) !== text
) {
throw new Error("Expected fetch.js.txt to match snapshot");
if (
fs.readFileSync(
import.meta.path.substring(0, import.meta.path.lastIndexOf("/")) +
"/fetch.js.txt",
"utf8"
) !== text
) {
throw new Error("Expected fetch.js.txt to match snapshot");
}
}

View File

@@ -8,26 +8,38 @@ const Output = @import("../global.zig").Output;
const extremely_verbose = @import("../http_client_async.zig").extremely_verbose;
const SOCKET_FLAGS = @import("../http_client_async.zig").SOCKET_FLAGS;
const getAllocator = @import("../http_client_async.zig").getAllocator;
const assert = std.debug.assert;
const BufferPool = AsyncMessage.BufferPool;
const fail = -3;
const connection_closed = -2;
const pending = -1;
const OK = 0;
bio: *boring.BIO = undefined,
socket_fd: std.os.socket_t = 0,
allocator: std.mem.Allocator,
read_buf_len: usize = 0,
allocator: std.mem.Allocator,
read_wait: Wait = Wait.pending,
send_wait: Wait = Wait.pending,
recv_completion: AsyncIO.Completion = undefined,
send_completion: AsyncIO.Completion = undefined,
write_buffer: ?*AsyncMessage = null,
recv_buffer: ?*BufferPool.Node = null,
recv_completion: Completion = undefined,
last_send_result: AsyncIO.SendError!usize = 0,
send_buffer: ?*BufferPool.Node = null,
send_completion: Completion = undefined,
write_error: c_int = 0,
socket_recv_len: c_int = 0,
bio_read_offset: u32 = 0,
socket_send_error: ?anyerror = null,
socket_recv_error: ?anyerror = null,
last_read_result: AsyncIO.RecvError!usize = 0,
next: ?*AsyncBIO = null,
pending_frame: PendingFrame = PendingFrame.init(),
pending_frame: PendingFrame = PendingFrame.init(),
pub const PendingFrame = std.fifo.LinearFifo(anyframe, .{ .Static = 8 });
pub inline fn pushPendingFrame(this: *AsyncBIO, frame: anyframe) void {
@@ -38,6 +50,12 @@ pub inline fn popPendingFrame(this: *AsyncBIO) ?anyframe {
return this.pending_frame.readItem();
}
pub fn nextFrame(this: *AsyncBIO) void {
if (this.pending_frame.readItem()) |frame| {
resume frame;
}
}
var method: ?*boring.BIO_METHOD = null;
var head: ?*AsyncBIO = null;
@@ -77,11 +95,14 @@ pub fn release(this: *AsyncBIO) void {
}
this.read_wait = .pending;
this.last_read_result = 0;
this.send_wait = .pending;
this.last_read_result = 0;
this.pending_frame = PendingFrame.init();
if (this.recv_buffer) |recv| {
recv.release();
this.recv_buffer = null;
}
if (this.write_buffer) |write| {
write.release();
this.write_buffer = null;
@@ -110,79 +131,72 @@ const WaitResult = enum {
send,
};
const Sender = struct {
pub fn onSend(this: *AsyncBIO, _: *Completion, result: AsyncIO.SendError!usize) void {
this.last_send_result = result;
this.send_wait = .completed;
this.write_buffer.?.sent += @truncate(u32, result catch 0);
pub fn doSocketRead(this: *AsyncBIO, _: *Completion, result_: AsyncIO.RecvError!usize) void {
const socket_recv_len = @truncate(
c_int,
result_ catch |err| {
this.socket_recv_error = err;
this.socket_recv_len = fail;
this.onSocketReadComplete();
return;
},
);
this.socket_recv_len += socket_recv_len;
if (extremely_verbose) {
const read_result = result catch @as(usize, 999);
Output.prettyErrorln("onSend: {d}", .{read_result});
Output.flush();
}
if (this.pending_frame.readItem()) |frame| {
resume frame;
}
}
};
pub fn enqueueSend(
self: *AsyncBIO,
) void {
if (self.write_buffer == null) return;
var to_write = self.write_buffer.?.slice();
if (to_write.len == 0) {
if (socket_recv_len == 0) {
this.onSocketReadComplete();
return;
}
self.last_send_result = 0;
this.read_wait = .pending;
this.scheduleSocketRead();
}
AsyncIO.global.send(
*AsyncBIO,
self,
Sender.onSend,
&self.send_completion,
self.socket_fd,
to_write,
SOCKET_FLAGS,
);
self.send_wait = .suspended;
if (extremely_verbose) {
Output.prettyErrorln("enqueueSend: {d}", .{to_write.len});
Output.flush();
fn onSocketReadComplete(this: *AsyncBIO) void {
assert(this.read_wait == .suspended);
this.handleSocketReadComplete();
this.nextFrame();
}
inline fn readBuf(this: *AsyncBIO) []u8 {
return this.recv_buffer.?.data[this.bio_read_offset..];
}
pub fn scheduleSocketRead(this: *AsyncBIO) void {
assert(this.read_wait == .pending);
this.read_wait = .suspended;
AsyncIO.global.recv(*AsyncBIO, this, this.doSocketRead, &this.recv_completion, this.socket_fd, this.readBuf());
}
fn handleSocketReadComplete(
this: *AsyncBIO,
) void {
this.read_wait = .completed;
if (this.socket_recv_len <= 0) {
if (this.recv_buffer) |buf| {
buf.release();
this.recv_buffer = null;
}
}
}
const Reader = struct {
pub fn onRead(this: *AsyncBIO, _: *Completion, result: AsyncIO.RecvError!usize) void {
this.last_read_result = result;
this.read_wait = .completed;
if (extremely_verbose) {
const read_result = result catch @as(usize, 999);
Output.prettyErrorln("onRead: {d}", .{read_result});
Output.flush();
}
if (this.pending_frame.readItem()) |frame| {
resume frame;
}
}
};
pub fn onSocketWriteComplete(this: *AsyncBIO, _: *Completion, result: AsyncIO.SendError!usize) void {
assert(this.send_wait == .pending);
this.handleSocketWriteComplete(result);
this.nextFrame();
}
pub fn enqueueRead(self: *AsyncBIO, read_buf: []u8, off: u64) void {
var read_buffer = read_buf[off..];
if (read_buffer.len == 0) {
return;
}
self.last_read_result = 0;
AsyncIO.global.recv(*AsyncBIO, self, Reader.onRead, &self.recv_completion, self.socket_fd, read_buffer);
self.read_wait = .suspended;
if (extremely_verbose) {
Output.prettyErrorln("enqueuedRead: {d}", .{read_buf.len});
Output.flush();
}
pub fn handleSocketWriteComplete(this: *AsyncBIO, result: AsyncIO.SendError!usize) void {
// this.last_socket_recv_len = result;
// this.read_wait = .completed;
// if (extremely_verbose) {
// const socket_recv_len = result catch @as(usize, 999);
// Output.prettyErrorln("onRead: {d}", .{socket_recv_len});
// Output.flush();
// }
}
pub const Bio = struct {
@@ -205,90 +219,100 @@ pub const Bio = struct {
return 0;
}
pub fn write(this_bio: *boring.BIO, ptr: [*c]const u8, len: c_int) callconv(.C) c_int {
std.debug.assert(@ptrToInt(ptr) > 0 and len >= 0);
if (len < 0) return len;
assert(@ptrToInt(ptr) > 0);
boring.BIO_clear_retry_flags(this_bio);
var this = cast(this_bio);
var buf = ptr[0..@intCast(usize, len)];
boring.BIO_clear_flags(this_bio, boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY);
if (this.socket_send_error != null) {
if (extremely_verbose) {
Output.prettyErrorln("write: {s}", .{@errorName(this.socket_send_error.?)});
Output.flush();
}
return -1;
}
}
if (len <= 0) {
return 0;
pub fn read(this_bio: *boring.BIO, ptr: [*c]u8, len_: c_int) callconv(.C) c_int {
if (len_ < 0) return len_;
const len__: u32 = @intCast(u32, len_);
assert(@ptrToInt(ptr) > 0);
boring.BIO_clear_retry_flags(this_bio);
var this = cast(this_bio);
var socket_recv_len = this.socket_recv_len;
var bio_read_offset = this.bio_read_offset;
defer {
this.bio_read_offset = bio_read_offset;
this.socket_recv_len = socket_recv_len;
}
var this = cast(this_bio);
if (this.read_wait == .suspended) {
boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY));
if (this.socket_recv_error) |socket_err| {
if (extremely_verbose) Output.prettyErrorln("SSL read error: {s}", .{@errorName(socket_err)});
return -1;
}
switch (this.send_wait) {
.pending => {
var write_buffer = this.write_buffer orelse brk: {
this.write_buffer = AsyncMessage.get(getAllocator());
break :brk this.write_buffer.?;
};
_ = write_buffer.writeAll(buf);
boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY));
return -1;
},
.suspended => {
boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY));
return -1;
},
.completed => {
this.send_wait = .pending;
const written = this.last_send_result catch |err| {
Output.prettyErrorln("HTTPS error: {s}", .{@errorName(err)});
Output.flush();
boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY));
return -1;
};
this.last_send_result = 0;
return @intCast(c_int, written);
},
// If there is no result available synchronously, report any Write() errors
// that were observed. Otherwise the application may have encountered a socket
// error while writing that would otherwise not be reported until the
// application attempted to write again - which it may never do. See
// https://crbug.com/249848.
if ((this.write_error != OK or this.write_error != pending) and (socket_recv_len == OK or socket_recv_len == pending)) {
return -1;
}
unreachable;
}
pub fn read(this_bio: *boring.BIO, ptr: [*c]u8, len: c_int) callconv(.C) c_int {
std.debug.assert(@ptrToInt(ptr) > 0 and len >= 0);
var this = cast(this_bio);
var buf = ptr[0..@maximum(@intCast(usize, len), this.read_buf_len)];
boring.BIO_clear_flags(this_bio, boring.BIO_FLAGS_RWS | boring.BIO_FLAGS_SHOULD_RETRY);
switch (this.read_wait) {
.pending => {
this.enqueueRead(buf, 0);
boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_WRITE | boring.BIO_FLAGS_SHOULD_RETRY));
return -1;
},
.suspended => {
boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_WRITE | boring.BIO_FLAGS_SHOULD_RETRY));
return -1;
},
.completed => {
this.read_wait = .pending;
const read_len = this.last_read_result catch |err| {
Output.prettyErrorln("HTTPS error: {s}", .{@errorName(err)});
Output.flush();
boring.BIO_set_flags(this_bio, (boring.BIO_FLAGS_WRITE | boring.BIO_FLAGS_SHOULD_RETRY));
return -1;
};
this.last_read_result = 0;
return @intCast(c_int, read_len);
},
if (socket_recv_len == 0) {
// Instantiate the read buffer and read from the socket. Although only |len|
// bytes were requested, intentionally read to the full buffer size. The SSL
// layer reads the record header and body in separate reads to avoid
// overreading, but issuing one is more efficient. SSL sockets are not
// reused after shutdown for non-SSL traffic, so overreading is fine.
assert(bio_read_offset == 0);
this.scheduleSocketRead();
socket_recv_len = pending;
}
unreachable;
if (socket_recv_len == pending) {
boring.BIO_set_retry_read(this_bio);
return -1;
}
// If the last Read() failed, report the error.
if (socket_recv_len < 0) {
if (extremely_verbose) Output.prettyErrorln("Unexpected ssl error: {d}", .{socket_recv_len});
return -1;
}
const socket_recv_len_ = @intCast(u32, socket_recv_len);
// Report the result of the last Read() if non-empty.
if (!(bio_read_offset < socket_recv_len_)) return 0;
const len = @minimum(len__, socket_recv_len_ - bio_read_offset);
var data = @ptrCast([*]const u8, &this.recv_buffer.?.data[bio_read_offset]);
@memcpy(ptr, data, len);
bio_read_offset += len;
if (bio_read_offset == socket_recv_len_) {
// The read buffer is empty.
bio_read_offset = 0;
socket_recv_len = 0;
if (this.recv_buffer) |buf| {
buf.release();
this.recv_buffer = null;
}
}
return @intCast(c_int, len);
}
// https://chromium.googlesource.com/chromium/src/+/refs/heads/main/net/socket/socket_bio_adapter.cc#376
pub fn ctrl(_: *boring.BIO, cmd: c_int, _: c_long, _: ?*anyopaque) callconv(.C) c_long {
return switch (cmd) {
boring.BIO_CTRL_PENDING, boring.BIO_CTRL_WPENDING => 0,
else => 1,
// The SSL stack requires BIOs handle BIO_flush.
boring.BIO_CTRL_FLUSH => 1,
else => 0,
};
}
};

View File

@@ -3,7 +3,8 @@ const ObjectPool = @import("../pool.zig").ObjectPool;
const AsyncIO = @import("io");
pub const buffer_pool_len = std.math.maxInt(u16) - 64;
pub const BufferPool = ObjectPool([buffer_pool_len]u8, null, false);
pub const BufferPoolBytes = [buffer_pool_len]u8;
pub const BufferPool = ObjectPool(BufferPoolBytes, null, false);
const AsyncMessage = @This();

View File

@@ -641,6 +641,29 @@ pub const Bun = struct {
return ZigString.init(stream.buffer[0..stream.pos]).toValueGC(ctx.ptr()).asObjectRef();
}
// pub fn resolvePath(
// _: void,
// ctx: js.JSContextRef,
// _: js.JSObjectRef,
// _: js.JSObjectRef,
// arguments: []const js.JSValueRef,
// _: js.ExceptionRef,
// ) js.JSValueRef {
// if (arguments.len == 0) return ZigString.Empty.toValue(ctx.ptr()).asObjectRef();
// var zig_str: ZigString = ZigString.Empty;
// JSValue.toZigString(JSValue.fromRef(arguments[0]), &zig_str, ctx.ptr());
// var buf: [std.fs.MAX_PATH_BYTES]u8 = undefined;
// var stack = std.heap.stackFallback(32 * @sizeOf(string), VirtualMachine.vm.allocator);
// var allocator = stack.get();
// var parts = allocator.alloc(string, arguments.len) catch {};
// defer allocator.free(parts);
// const to = zig_str.slice();
// var parts = .{to};
// const value = ZigString.init(VirtualMachine.vm.bundler.fs.absBuf(&parts, &buf)).toValueGC(ctx.ptr());
// return value.asObjectRef();
// }
pub const Class = NewClass(
void,
.{

5
types.d.ts vendored
View File

@@ -1,5 +0,0 @@
interface BunNodeModule extends NodeJS.Module {
requireFirst(...id: string[]): any;
}
declare var module: BunNodeModule;