Fix biggest issue with HTTPS client!

commit 170e58a99d
parent f3c4bfcbae
Author: Jarred Sumner
Date: 2022-02-01 18:51:25 -08:00
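
The change, as the diff reads: the client previously kept a single reusable recv_completion/send_completion pair on the AsyncBIO, serialized by read_wait/send_wait state flags, so at most one read and one write could be tracked at a time. This commit replaces the flags with pending_reads/pending_sends counters and allocates a pooled Packet per I/O operation; each callback then recovers its own Packet from the *Completion embedded in it (applied twice in the real code: Completion -> Packet -> Packet.Pool.Node). A minimal, self-contained sketch of that recovery trick, using simplified stand-in types rather than Bun's real ones:

    const std = @import("std");

    // Simplified stand-in; Bun's real Completion lives in its AsyncIO layer.
    const Completion = struct {
        user_data: usize = 0,
    };

    const Packet = struct {
        completion: Completion = .{},
        min: u32 = 0,
    };

    // The event loop only hands the callback a *Completion. Because the
    // Completion is embedded in a Packet, @fieldParentPtr recovers the Packet.
    fn packetFromCompletion(completion: *Completion) *Packet {
        return @fieldParentPtr(Packet, "completion", completion);
    }

    test "recover the enclosing Packet from its completion field" {
        var packet = Packet{ .min = 16 };
        try std.testing.expect(packetFromCompletion(&packet.completion) == &packet);
    }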


@@ -15,20 +15,27 @@ const fail = -3;
 const connection_closed = -2;
 const pending = -1;
 const OK = 0;
+const ObjectPool = @import("../pool.zig").ObjectPool;
+const Packet = struct {
+    completion: Completion,
+    min: u32 = 0,
+    owned_slice: []u8 = &[_]u8{},
+    pub const Pool = ObjectPool(Packet, null, false);
+};
 bio: *boring.BIO = undefined,
 socket_fd: std.os.socket_t = 0,
 allocator: std.mem.Allocator,
-read_wait: Wait = Wait.pending,
-send_wait: Wait = Wait.pending,
+pending_reads: u32 = 0,
+pending_sends: u32 = 0,
 recv_buffer: ?*BufferPool.Node = null,
 recv_completion: Completion = undefined,
 send_buffer: ?*BufferPool.Node = null,
 send_completion: Completion = undefined,
 write_error: c_int = 0,
 socket_recv_len: c_int = 0,
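
Each scheduled read or write now takes one of these Packets from an ObjectPool, recording in `min` how many bytes the caller asked for so the completion callback can decide whether to resubmit. ../pool.zig is not part of this diff; a free-list pool with the same get/release shape might look roughly like this (an illustrative guess, not the real ObjectPool):

    const std = @import("std");

    fn MiniPool(comptime T: type) type {
        return struct {
            pub const Node = struct {
                data: T,
                next: ?*Node = null,

                // Push the node back onto the shared free list.
                pub fn release(node: *Node) void {
                    node.next = head;
                    head = node;
                }
            };

            var head: ?*Node = null;

            // Pop a recycled node if one exists, else allocate a fresh one.
            pub fn get(allocator: std.mem.Allocator) *Node {
                if (head) |node| {
                    head = node.next;
                    node.next = null;
                    return node;
                }
                var node = allocator.create(Node) catch unreachable;
                node.* = Node{ .data = undefined }; // caller initializes .data
                return node;
            }
        };
    }

The real ObjectPool(Packet, null, false) presumably threads extra options through its second and third arguments; the sketch elides them.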
@@ -89,8 +96,6 @@ fn instance(allocator: std.mem.Allocator) *AsyncBIO {
     if (head) |head_| {
         var next = head_.next;
         var ret = head_;
-        ret.read_wait = .pending;
-        ret.send_wait = .pending;
         head = next;
         return ret;
@@ -99,8 +104,6 @@ fn instance(allocator: std.mem.Allocator) *AsyncBIO {
     var bio = allocator.create(AsyncBIO) catch unreachable;
     bio.* = AsyncBIO{
         .allocator = allocator,
-        .read_wait = .pending,
-        .send_wait = .pending,
     };
     return bio;
@@ -111,8 +114,6 @@ pub fn release(this: *AsyncBIO) void {
         this.next = head_;
     }
-    this.read_wait = .pending;
-    this.send_wait = .pending;
     this.socket_send_len = 0;
     this.socket_recv_len = 0;
     this.bio_write_offset = 0;
@@ -154,7 +155,10 @@ const WaitResult = enum {
     send,
 };
-pub fn doSocketRead(this: *AsyncBIO, _: *Completion, result_: AsyncIO.RecvError!usize) void {
+pub fn doSocketRead(this: *AsyncBIO, completion: *Completion, result_: AsyncIO.RecvError!usize) void {
+    var ctx = @fieldParentPtr(Packet.Pool.Node, "data", @fieldParentPtr(Packet, "completion", completion));
+    ctx.release();
     const socket_recv_len = @intCast(
         c_int,
         result_ catch |err| {
@@ -169,10 +173,10 @@ pub fn doSocketRead(this: *AsyncBIO, _: *Completion, result_: AsyncIO.RecvError!
         Output.prettyErrorln("onRead: {d}", .{socket_recv_len});
         Output.flush();
     }
+    if (socket_recv_len == 0) {
+        this.socket_recv_eof = true;
+    }
-    this.read_wait = .pending;
     // if (socket_recv_len == 0) {
@@ -184,26 +188,28 @@ pub fn doSocketRead(this: *AsyncBIO, _: *Completion, result_: AsyncIO.RecvError!
     // this.scheduleSocketRead();
 }
-pub fn doSocketWrite(this: *AsyncBIO, _: *Completion, result_: AsyncIO.SendError!usize) void {
+pub fn doSocketWrite(this: *AsyncBIO, completion: *Completion, result_: AsyncIO.SendError!usize) void {
+    var ctx = @fieldParentPtr(Packet.Pool.Node, "data", @fieldParentPtr(Packet, "completion", completion));
+    defer ctx.release();
     const socket_send_len = @truncate(
         u32,
         result_ catch |err| {
             this.socket_send_error = err;
-            this.send_wait = .pending;
             this.onSocketWriteComplete();
             return;
         },
     );
     this.socket_send_len += socket_send_len;
-    if (socket_send_len == 0 or this.writeBuf().len == 0) {
-        this.send_wait = .pending;
+    const remain = ctx.data.min - @minimum(ctx.data.min, socket_send_len);
+    if (socket_send_len == 0 or remain == 0) {
         this.onSocketWriteComplete();
         return;
     }
-    this.send_wait = .pending;
-    this.scheduleSocketWrite();
+    this.scheduleSocketWrite(completion.operation.slice()[remain..]);
 }
 fn onSocketReadComplete(this: *AsyncBIO) void {
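
The rewritten doSocketWrite no longer consults a shared writeBuf(); it compares the bytes actually sent against the packet's `min` and, on a short send, schedules another write for the unsent tail. The same `remain` arithmetic, pulled out into a standalone helper (hypothetical name; here the final `remain` bytes of the buffer are what gets resubmitted):

    const std = @import("std");

    /// Slice still to be sent after a short send(), or null when done.
    fn unsentTail(buf: []const u8, requested: u32, sent: u32) ?[]const u8 {
        const remain = requested - @minimum(requested, sent);
        if (sent == 0 or remain == 0) return null; // EOF, or fully flushed
        return buf[buf.len - remain ..];
    }

    test "a short send leaves a tail to resubmit" {
        const buf = "hello world";
        const tail = unsentTail(buf, buf.len, 6).?;
        try std.testing.expectEqualStrings("world", tail);
    }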
@@ -213,49 +219,39 @@ fn onSocketReadComplete(this: *AsyncBIO) void {
 }
 inline fn readBuf(this: *AsyncBIO) []u8 {
-    return this.recv_buffer.?.data[this.bio_read_offset..];
-}
-inline fn writeBuf(this: *AsyncBIO) []const u8 {
-    return this.send_buffer.?.data[this.socket_send_len..this.bio_write_offset];
+    return this.recv_buffer.?.data[@intCast(u32, this.socket_recv_len)..];
 }
 pub fn hasPendingReadData(this: *AsyncBIO) bool {
-    return this.socket_recv_len > 0;
+    return this.socket_recv_len - @intCast(c_int, this.bio_read_offset) > 0;
 }
-pub fn scheduleSocketRead(this: *AsyncBIO) void {
-    this.read_wait = .suspended;
+pub fn scheduleSocketRead(this: *AsyncBIO, min: u32) void {
     if (this.recv_buffer == null) {
         this.recv_buffer = BufferPool.get(getAllocator());
     }
-    AsyncIO.global.recv(*AsyncBIO, this, doSocketRead, &this.recv_completion, this.socket_fd, this.readBuf());
+    var packet = Packet.Pool.get(getAllocator());
+    packet.data.min = @truncate(u32, min);
+    AsyncIO.global.recv(*AsyncBIO, this, doSocketRead, &packet.data.completion, this.socket_fd, this.readBuf());
 }
-pub fn scheduleSocketWrite(this: *AsyncBIO) void {
-    this.send_wait = .suspended;
-    AsyncIO.global.send(*AsyncBIO, this, doSocketWrite, &this.send_completion, this.socket_fd, this.writeBuf(), SOCKET_FLAGS);
+pub fn scheduleSocketWrite(this: *AsyncBIO, buf: []const u8) void {
+    var packet = Packet.Pool.get(getAllocator());
+    packet.data.min = @truncate(u32, buf.len);
+    AsyncIO.global.send(*AsyncBIO, this, doSocketWrite, &packet.data.completion, this.socket_fd, buf, SOCKET_FLAGS);
 }
 fn handleSocketReadComplete(
     this: *AsyncBIO,
 ) void {
-    this.read_wait = .pending;
     if (this.socket_recv_len <= 0) {
         if (this.recv_buffer) |buf| {
             buf.release();
             this.recv_buffer = null;
         }
     }
+    this.pending_reads -|= 1;
 }
 pub fn onSocketWriteComplete(
     this: *AsyncBIO,
 ) void {
-    assert(this.send_wait == .pending);
     this.handleSocketWriteComplete();
     this.nextFrame();
 }
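
`-|=` is Zig's saturating subtract-and-assign: if a completion ever fired twice, the counter would clamp at zero instead of tripping the integer-underflow safety check. For instance:

    const std = @import("std");

    test "saturating decrement never wraps below zero" {
        var pending_reads: u32 = 1;
        pending_reads -|= 1; // normal completion: 1 -> 0
        pending_reads -|= 1; // spurious extra completion: stays 0, no underflow
        try std.testing.expectEqual(@as(u32, 0), pending_reads);
    }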
@@ -263,21 +259,23 @@ pub fn onSocketWriteComplete(
 pub fn handleSocketWriteComplete(
     this: *AsyncBIO,
 ) void {
-    this.send_wait = .completed;
+    this.pending_sends -|= 1;
     if (extremely_verbose) {
         Output.prettyErrorln("onWrite: {d}", .{this.socket_send_len});
         Output.flush();
     }
-    // if (this.send_buffer) |buf| {
-    //     buf.release();
-    //     this.send_buffer = null;
+    if (this.pending_sends == 0) {
+        if (this.send_buffer) |buf| {
+            buf.release();
+            this.send_buffer = null;
-    //     // this might be incorrect!
-    //     this.bio_write_offset = 0;
-    //     this.socket_send_len = 0;
-    // }
+            // this might be incorrect!
+            this.bio_write_offset = 0;
+            this.socket_send_len = 0;
+        }
+    }
 }
 pub const Bio = struct {
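
With several sends potentially in flight, the send buffer can only be recycled once the last one finishes, which is what the new pending_sends == 0 guard enforces. Modeled in isolation (stand-in types, not the real BufferPool):

    const std = @import("std");

    const SendState = struct {
        pending_sends: u32 = 0,
        send_buffer: ?*u32 = null, // stand-in for ?*BufferPool.Node

        fn onWriteComplete(this: *SendState) void {
            this.pending_sends -|= 1;
            if (this.pending_sends == 0) {
                // Safe to recycle: no in-flight completion still uses it.
                this.send_buffer = null;
            }
        }
    };

    test "buffer survives until the last pending send completes" {
        var dummy: u32 = 0;
        var state = SendState{ .pending_sends = 2, .send_buffer = &dummy };
        state.onWriteComplete();
        try std.testing.expect(state.send_buffer != null);
        state.onWriteComplete();
        try std.testing.expect(state.send_buffer == null);
    }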
@@ -320,6 +318,9 @@ pub const Bio = struct {
             return -1;
         }
+        const remaining_in_send_buffer = buffer_pool_len - this.bio_write_offset;
+        const total_remaining = remaining_in_send_buffer - @minimum(remaining_in_send_buffer, len);
         if (this.send_buffer == null) {
             this.send_buffer = BufferPool.get(getAllocator());
         }
@@ -335,7 +336,8 @@ pub const Bio = struct {
         @memcpy(data.ptr, ptr, to_copy);
         this.bio_write_offset += to_copy;
-        this.scheduleSocketWrite();
+        this.scheduleSocketWrite(data[0..to_copy]);
+        this.pending_sends += 1;
         return @intCast(c_int, to_copy);
     }
@@ -382,17 +384,15 @@ pub const Bio = struct {
             // overreading, but issuing one is more efficient. SSL sockets are not
             // reused after shutdown for non-SSL traffic, so overreading is fine.
             assert(bio_read_offset == 0);
-            this.scheduleSocketRead();
+            if (this.pending_reads == 0) {
+                this.scheduleSocketRead(len__);
+                this.pending_reads += 1;
+            }
             boring.BIO_set_retry_read(this_bio);
             return pending;
         }
-        if (socket_recv_len == pending) {
-            boring.BIO_set_retry_read(this_bio);
-            return -1;
-        }
         // If the last Read() failed, report the error.
         if (socket_recv_len < 0) {
             if (extremely_verbose) Output.prettyErrorln("Unexpected ssl error: {d}", .{socket_recv_len});
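
Both read paths above exit through the same custom-BIO convention when nothing is ready: arm the retry flag (boring.BIO_set_retry_read) and return a negative value, which BoringSSL reports as SSL_ERROR_WANT_READ rather than a hard error. A toy model of that contract (FakeBio is purely illustrative):

    const std = @import("std");

    const FakeBio = struct {
        retry_read: bool = false, // stands in for boring.BIO_set_retry_read
        buffered: []const u8 = "",

        fn read(this: *FakeBio, out: []u8) c_int {
            if (this.buffered.len == 0) {
                this.retry_read = true;
                return -1; // "would block", not a failure
            }
            const n = @minimum(out.len, this.buffered.len);
            std.mem.copy(u8, out[0..n], this.buffered[0..n]);
            this.buffered = this.buffered[n..];
            return @intCast(c_int, n);
        }
    };

    test "an empty buffer reports retry instead of error" {
        var bio = FakeBio{};
        var out: [8]u8 = undefined;
        try std.testing.expectEqual(@as(c_int, -1), bio.read(&out));
        try std.testing.expect(bio.retry_read);
    }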
@@ -402,16 +402,25 @@ pub const Bio = struct {
         const socket_recv_len_ = @intCast(u32, socket_recv_len);
         // Report the result of the last Read() if non-empty.
         if (!(bio_read_offset < socket_recv_len_)) {
             return 0;
         }
         const len = @minimum(len__, socket_recv_len_ - bio_read_offset);
-        var data = @ptrCast([*]const u8, this.recv_buffer.?.data[bio_read_offset..].ptr);
-        @memcpy(ptr, data, len);
+        var bytes = this.recv_buffer.?.data[bio_read_offset..socket_recv_len_];
+        if (len__ > @truncate(u32, bytes.len)) {
+            if (this.pending_reads == 0) {
+                this.pending_reads += 1;
+                this.scheduleSocketRead(len);
+            }
+            boring.BIO_set_retry_read(this_bio);
+            return -1;
+        }
+        @memcpy(ptr, bytes.ptr, len);
         bio_read_offset += len;
-        if (bio_read_offset == socket_recv_len_) {
+        if (bio_read_offset == socket_recv_len_ and this.pending_reads == 0) {
             // The read buffer is empty.
+            // we can reset the pointer back to the beginning of the buffer
+            // if there is more data to read, we will ask for another
             bio_read_offset = 0;
             socket_recv_len = 0;