Compare commits

...

3 Commits

Author SHA1 Message Date
Claude Bot
1349b98d43 Fix write3 to only re-subscribe on partial writes or EAGAIN/EWOULDBLOCK
Don't re-subscribe to poll for fatal errors like ECONNRESET, EPIPE, etc.
Only re-subscribe when:
- Partial write (0 <= written < length)
- Flow control error (EAGAIN/EWOULDBLOCK)

This matches the correct behavior where fatal errors should not trigger
poll re-subscription.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-05 07:20:17 +00:00
autofix-ci[bot]
3cc9084ff2 [autofix.ci] apply automated fixes 2025-10-05 06:35:57 +00:00
Claude Bot
7d874f41ec Implement us_socket_write3 with errno-based error reporting
Add a new socket write function that returns -errno on error instead of
re-subscribing to poll and returning 0. This enables high-quality error
reporting through the stack.

Changes:
- Add us_socket_write3() in socket.c that returns -errno on errors
- Add us_internal_ssl_socket_write3() in openssl.c for SSL sockets
- Add Zig bindings in us_socket_t.zig and socket.zig
- Update HTTP client in http.zig to use write3() with Maybe(usize)
- Fix header include order for _GNU_SOURCE and sys/types.h
- Add proper Windows/POSIX errno mapping in write3 implementations

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-05 00:00:19 +00:00
8 changed files with 223 additions and 24 deletions

View File

@@ -1795,6 +1795,77 @@ int us_internal_ssl_socket_write(struct us_internal_ssl_socket_t *s,
return 0;
}
/* Same as us_internal_ssl_socket_write but returns -errno on error instead of 0 */
ssize_t us_internal_ssl_socket_write3(struct us_internal_ssl_socket_t *s,
const char *data, int length) {
if (us_socket_is_closed(0, &s->s) || us_internal_ssl_socket_is_shut_down(s) || length == 0) {
return 0;
}
struct us_internal_ssl_socket_context_t *context =
(struct us_internal_ssl_socket_context_t *)us_socket_context(0, &s->s);
struct us_loop_t *loop = us_socket_context_loop(0, &context->sc);
struct loop_ssl_data *loop_ssl_data =
(struct loop_ssl_data *)loop->data.ssl_data;
loop_ssl_data->ssl_read_input_length = 0;
loop_ssl_data->ssl_socket = &s->s;
int written = SSL_write(s->ssl, data, length);
if (written > 0) {
return written;
}
int err = SSL_get_error(s->ssl, written);
if (err == SSL_ERROR_WANT_READ) {
// here we need to trigger writable event next ssl_read!
s->ssl_write_wants_read = 1;
return -EAGAIN;
} else if (err == SSL_ERROR_WANT_WRITE) {
return -EAGAIN;
} else if (err == SSL_ERROR_SSL || err == SSL_ERROR_SYSCALL) {
// these two errors may add to the error queue, which is per thread and
// must be cleared
ERR_clear_error();
s->fatal_error = 1;
// Get the underlying error
unsigned long ssl_err = ERR_peek_error();
if (ssl_err != 0) {
return -EIO;
}
// Check for underlying socket error
#ifdef _WIN32
int wsa_error = WSAGetLastError();
switch (wsa_error) {
case WSAEWOULDBLOCK: return -EWOULDBLOCK;
case WSAECONNRESET: return -ECONNRESET;
case WSAECONNABORTED: return -ECONNABORTED;
case WSAENETDOWN: return -ENETDOWN;
case WSAENETRESET: return -ENETRESET;
case WSAENOTCONN: return -ENOTCONN;
case WSAESHUTDOWN: return -ESHUTDOWN;
case WSAETIMEDOUT: return -ETIMEDOUT;
default: return -EIO;
}
#else
if (errno != 0) {
return -errno;
}
return -EIO;
#endif
} else if (err == SSL_ERROR_ZERO_RETURN) {
// Connection closed
return -ECONNRESET;
}
return -EAGAIN;
}
void *us_internal_ssl_socket_ext(struct us_internal_ssl_socket_t *s) {
return s + 1;
}

View File

@@ -418,6 +418,8 @@ struct us_socket_t *us_internal_ssl_socket_context_connect_unix(
int us_internal_ssl_socket_write(us_internal_ssl_socket_r s,
const char *data, int length);
ssize_t us_internal_ssl_socket_write3(us_internal_ssl_socket_r s,
const char *data, int length);
int us_internal_ssl_socket_raw_write(us_internal_ssl_socket_r s,
const char *data, int length);

View File

@@ -39,6 +39,8 @@
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
/* For ssize_t and mmsghdr */
#include <sys/types.h>
/* For socklen_t */
#include <sys/socket.h>
#include <netdb.h>

View File

@@ -16,6 +16,14 @@
*/
// clang-format off
#pragma once
/* We need GNU features for mmsghdr and other definitions */
#ifndef _WIN32
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#endif
#ifndef us_calloc
#define us_calloc calloc
#endif
@@ -84,6 +92,9 @@
#endif
#include "stddef.h"
#ifndef _WIN32
#include <sys/types.h>
#endif
#ifdef __cplusplus
extern "C" {
@@ -423,6 +434,10 @@ void *us_socket_get_native_handle(int ssl, us_socket_r s) nonnull_fn_decl;
* Will call the on_writable callback of active socket context on failure to write everything off in one go. */
int us_socket_write(int ssl, us_socket_r s, const char * nonnull_arg data, int length) nonnull_fn_decl;
/* Same as us_socket_write but returns -errno on error instead of 0 and does not re-subscribe to poll.
* Returns actual bytes written on success, or -errno on error. */
ssize_t us_socket_write3(int ssl, us_socket_r s, const char * nonnull_arg data, int length) nonnull_fn_decl;
/* Special path for non-SSL sockets. Used to send header and payload in one go. Works like us_socket_write. */
int us_socket_write2(int ssl, us_socket_r s, const char *header, int header_length, const char *payload, int payload_length) nonnull_fn_decl;

View File

@@ -376,6 +376,64 @@ int us_socket_write(int ssl, struct us_socket_t *s, const char *data, int length
return written < 0 ? 0 : written;
}
/* Same as us_socket_write but returns -errno on error instead of 0 */
ssize_t us_socket_write3(int ssl, struct us_socket_t *s, const char *data, int length) {
#ifndef LIBUS_NO_SSL
if (ssl) {
return us_internal_ssl_socket_write3((struct us_internal_ssl_socket_t *) s, data, length);
}
#endif
if (us_socket_is_closed(ssl, s) || us_socket_is_shut_down(ssl, s)) {
return 0;
}
ssize_t written = bsd_send(us_poll_fd(&s->p), data, length);
if (written < 0) {
// Handle errors by converting to -errno
int error_code;
#ifdef _WIN32
// On Windows with winsock, we need to convert WSA error to errno-like value
int wsa_error = WSAGetLastError();
switch (wsa_error) {
case WSAEWOULDBLOCK: error_code = EWOULDBLOCK; break;
case WSAECONNRESET: error_code = ECONNRESET; break;
case WSAECONNABORTED: error_code = ECONNABORTED; break;
case WSAENETDOWN: error_code = ENETDOWN; break;
case WSAENETRESET: error_code = ENETRESET; break;
case WSAENOTCONN: error_code = ENOTCONN; break;
case WSAESHUTDOWN: error_code = ESHUTDOWN; break;
case WSAETIMEDOUT: error_code = ETIMEDOUT; break;
case WSAEACCES: error_code = EACCES; break;
case WSAEFAULT: error_code = EFAULT; break;
case WSAEINVAL: error_code = EINVAL; break;
case WSAEMSGSIZE: error_code = EMSGSIZE; break;
case WSAENETUNREACH: error_code = ENETUNREACH; break;
case WSAENOBUFS: error_code = ENOBUFS; break;
default: error_code = EIO; break;
}
#else
error_code = errno;
#endif
// Only re-subscribe for flow control errors (EAGAIN/EWOULDBLOCK)
if (error_code == EAGAIN || error_code == EWOULDBLOCK) {
s->context->loop->data.last_write_failed = 1;
us_poll_change(&s->p, s->context->loop, LIBUS_SOCKET_READABLE | LIBUS_SOCKET_WRITABLE);
}
return -error_code;
}
// Re-subscribe on partial writes (incomplete but successful)
if (written < length) {
s->context->loop->data.last_write_failed = 1;
us_poll_change(&s->p, s->context->loop, LIBUS_SOCKET_READABLE | LIBUS_SOCKET_WRITABLE);
}
return written;
}
#if !defined(_WIN32)
/* Send a message with data and an attached file descriptor, for use in IPC. Returns the number of bytes written. If that
number is less than the length, the file descriptor was not sent. */

View File

@@ -326,6 +326,16 @@ pub fn NewSocketHandler(comptime is_ssl: bool) type {
};
}
/// Same as write but returns -errno on error instead of 0 and does not re-subscribe to poll.
pub fn write3(this: ThisSocket, data: []const u8) isize {
return switch (this.socket) {
.upgradedDuplex => |socket| @intCast(socket.encodeAndWrite(data)),
.pipe => |pipe| if (comptime Environment.isWindows) @intCast(pipe.encodeAndWrite(data)) else 0,
.connected => |socket| socket.write3(is_ssl, data),
.connecting, .detached => 0,
};
}
pub fn writeFd(this: ThisSocket, data: []const u8, file_descriptor: bun.FileDescriptor) i32 {
return switch (this.socket) {
.upgradedDuplex, .pipe => this.write(data),

View File

@@ -132,6 +132,14 @@ pub const us_socket_t = opaque {
return rc;
}
/// Same as write but returns -errno on error instead of 0 and does not re-subscribe to poll.
/// Returns actual bytes written on success, or -errno on error.
pub fn write3(this: *us_socket_t, ssl: bool, data: []const u8) isize {
const rc = c.us_socket_write3(@intFromBool(ssl), this, data.ptr, @intCast(data.len));
debug("us_socket_write3({p}, {d}) = {d}", .{ this, data.len, rc });
return rc;
}
pub fn writeFd(this: *us_socket_t, data: []const u8, file_descriptor: bun.FD) i32 {
if (bun.Environment.isWindows) @compileError("TODO: implement writeFd on Windows");
const rc = c.us_socket_ipc_write_fd(this, data.ptr, @intCast(data.len), file_descriptor.native());
@@ -199,6 +207,7 @@ pub const c = struct {
pub extern fn us_socket_context(ssl: i32, s: ?*us_socket_t) ?*SocketContext;
pub extern fn us_socket_write(ssl: i32, s: ?*us_socket_t, data: [*c]const u8, length: i32) i32;
pub extern fn us_socket_write3(ssl: i32, s: ?*us_socket_t, data: [*c]const u8, length: i32) isize;
pub extern fn us_socket_ipc_write_fd(s: ?*us_socket_t, data: [*c]const u8, length: i32, fd: i32) i32;
pub extern fn us_socket_write2(ssl: i32, *us_socket_t, header: ?[*]const u8, len: usize, payload: ?[*]const u8, usize) i32;
pub extern fn us_socket_raw_write(ssl: i32, s: ?*us_socket_t, data: [*c]const u8, length: i32) i32;

View File

@@ -966,7 +966,18 @@ noinline fn sendInitialRequestPayload(this: *HTTPClient, comptime is_first_call:
assert(!socket.isShutdown());
assert(!socket.isClosed());
}
const amount = try writeToSocket(is_ssl, socket, to_send);
const amount = switch (writeToSocket(is_ssl, socket, to_send)) {
.result => |amt| amt,
.err => |err| {
_ = err; // errno information is available
this.closeAndFail(error.WriteFailed, is_ssl, socket);
return .{
.has_sent_headers = false,
.has_sent_body = false,
.try_sending_more_data = false,
};
},
};
if (comptime is_first_call) {
if (amount == 0) {
// don't worry about it
@@ -1006,29 +1017,38 @@ pub fn flushStream(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPCont
this.writeToStream(is_ssl, socket, "");
}
/// Write data to the socket (Just a error wrapper to easly handle amount written and error handling)
fn writeToSocket(comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket, data: []const u8) !usize {
const amount = socket.write(data);
/// Write data to the socket returning Maybe(usize) with high-quality error information
fn writeToSocket(comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket, data: []const u8) Maybe(usize) {
const amount = socket.write3(data);
if (amount < 0) {
return error.WriteFailed;
// write3 returns -errno on error
const errno_value: bun.sys.E = @enumFromInt(-amount);
const err = bun.sys.Error.fromCode(errno_value, .write);
return .{ .err = err };
}
return @intCast(amount);
return .{ .result = @intCast(amount) };
}
/// Write data to the socket and buffer the unwritten data if there is backpressure
fn writeToSocketWithBufferFallback(comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket, buffer: *bun.io.StreamBuffer, data: []const u8) !usize {
const amount = try writeToSocket(is_ssl, socket, data);
fn writeToSocketWithBufferFallback(comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket, buffer: *bun.io.StreamBuffer, data: []const u8) Maybe(usize) {
const amount = switch (writeToSocket(is_ssl, socket, data)) {
.result => |amt| amt,
.err => |err| return .{ .err = err },
};
if (amount < data.len) {
bun.handleOom(buffer.write(data[@intCast(amount)..]));
}
return amount;
return .{ .result = amount };
}
/// Write buffered data to the socket returning true if there is backpressure
fn writeToStreamUsingBuffer(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket, buffer: *bun.io.StreamBuffer, data: []const u8) !bool {
fn writeToStreamUsingBuffer(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket, buffer: *bun.io.StreamBuffer, data: []const u8) Maybe(bool) {
const to_send = buffer.slice();
if (to_send.len > 0) {
const amount = try writeToSocket(is_ssl, socket, to_send);
const amount = switch (writeToSocket(is_ssl, socket, to_send)) {
.result => |amt| amt,
.err => |err| return .{ .err = err },
};
this.state.request_sent_len += amount;
buffer.cursor += amount;
if (amount < to_send.len) {
@@ -1037,7 +1057,7 @@ fn writeToStreamUsingBuffer(this: *HTTPClient, comptime is_ssl: bool, socket: Ne
bun.handleOom(buffer.write(data));
}
// failed to send everything so we have backpressure
return true;
return .{ .result = true };
}
if (buffer.isEmpty()) {
buffer.reset();
@@ -1047,13 +1067,16 @@ fn writeToStreamUsingBuffer(this: *HTTPClient, comptime is_ssl: bool, socket: Ne
// ok we flushed all pending data so we can reset the backpressure
if (data.len > 0) {
// no backpressure everything was sended so we can just try to send
const sent = try writeToSocketWithBufferFallback(is_ssl, socket, buffer, data);
const sent = switch (writeToSocketWithBufferFallback(is_ssl, socket, buffer, data)) {
.result => |amt| amt,
.err => |err| return .{ .err = err },
};
this.state.request_sent_len += sent;
// if we didn't send all the data we have backpressure
return sent < data.len;
return .{ .result = sent < data.len };
}
// no data to send so we are done
return false;
return .{ .result = false };
}
pub fn writeToStream(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket, data: []const u8) void {
@@ -1082,12 +1105,16 @@ pub fn writeToStream(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPCo
}
// to simplify things here the buffer contains the raw data we just need to flush to the socket it
const has_backpressure = writeToStreamUsingBuffer(this, is_ssl, socket, buffer, data) catch |err| {
// we got some critical error so we need to fail and close the connection
stream_buffer.release();
stream.detach();
this.closeAndFail(err, is_ssl, socket);
return;
const has_backpressure = switch (writeToStreamUsingBuffer(this, is_ssl, socket, buffer, data)) {
.result => |bp| bp,
.err => |err| {
// we got some critical error so we need to fail and close the connection
_ = err; // errno information is available but closeAndFail expects anyerror
stream_buffer.release();
stream.detach();
this.closeAndFail(error.WriteFailed, is_ssl, socket);
return;
},
};
if (has_backpressure) {
@@ -1189,9 +1216,13 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
.bytes => {
const to_send = this.state.request_body;
if (to_send.len > 0) {
const sent = writeToSocket(is_ssl, socket, to_send) catch |err| {
this.closeAndFail(err, is_ssl, socket);
return;
const sent = switch (writeToSocket(is_ssl, socket, to_send)) {
.result => |s| s,
.err => |err| {
_ = err; // errno information is available
this.closeAndFail(error.WriteFailed, is_ssl, socket);
return;
},
};
this.state.request_sent_len += sent;
@@ -2552,6 +2583,7 @@ const strings = bun.strings;
const uws = bun.uws;
const Arena = bun.allocators.MimallocArena;
const BoringSSL = bun.BoringSSL.c;
const Maybe = bun.sys.Maybe;
const api = bun.schema.api;
const SSLConfig = bun.api.server.ServerConfig.SSLConfig;