fix(node:http) resume when reading and avoid unnecessary pause/resumes calls (#18804)
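For context (not part of the diff): the change makes socket pause/resume idempotent in uSockets by tracking an is_paused flag, only pauses the request handle when the request actually has a body, and makes the server socket's _read resume the underlying handle so buffered body bytes flow again. A minimal node:http consumer that exercises this path is sketched below; the port and handlers are illustrative, nothing here is taken from the PR.

// Sketch only: shows the pause/resume path this commit optimizes.
import http from "node:http";

const server = http.createServer((req, res) => {
  if (req.method !== "POST") {
    // No body: with this change the handle is not paused at all,
    // so there is no pause/resume round trip for simple GETs.
    res.end("ok");
    return;
  }

  // With a body, the handle stays paused until the stream is read;
  // attaching a "data" listener triggers _read, which resumes the socket.
  let received = 0;
  req.on("data", chunk => {
    received += chunk.length;
  });
  req.on("end", () => res.end(`received ${received} bytes`));
});

server.listen(0, () => {
  console.log("listening on port", server.address().port);
});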
@@ -369,9 +369,11 @@ struct us_listen_socket_t *us_socket_context_listen(int ssl, struct us_socket_co
     ls->s.context = context;
     ls->s.timeout = 255;
     ls->s.long_timeout = 255;
-    ls->s.low_prio_state = 0;
+    ls->s.flags.low_prio_state = 0;
+    ls->s.flags.is_paused = 0;
+
     ls->s.next = 0;
-    ls->s.allow_half_open = (options & LIBUS_SOCKET_ALLOW_HALF_OPEN);
+    ls->s.flags.allow_half_open = (options & LIBUS_SOCKET_ALLOW_HALF_OPEN);
     us_internal_socket_context_link_listen_socket(context, ls);

     ls->socket_ext_size = socket_ext_size;
@@ -401,10 +403,10 @@ struct us_listen_socket_t *us_socket_context_listen_unix(int ssl, struct us_sock
     ls->s.context = context;
     ls->s.timeout = 255;
     ls->s.long_timeout = 255;
-    ls->s.low_prio_state = 0;
+    ls->s.flags.low_prio_state = 0;
     ls->s.next = 0;
-    ls->s.allow_half_open = (options & LIBUS_SOCKET_ALLOW_HALF_OPEN);
-
+    ls->s.flags.allow_half_open = (options & LIBUS_SOCKET_ALLOW_HALF_OPEN);
+    ls->s.flags.is_paused = 0;
     us_internal_socket_context_link_listen_socket(context, ls);

     ls->socket_ext_size = socket_ext_size;
@@ -432,9 +434,11 @@ struct us_socket_t* us_socket_context_connect_resolved_dns(struct us_socket_cont
     socket->context = context;
     socket->timeout = 255;
     socket->long_timeout = 255;
-    socket->low_prio_state = 0;
+    socket->flags.low_prio_state = 0;
+    socket->flags.allow_half_open = (options & LIBUS_SOCKET_ALLOW_HALF_OPEN);
+    socket->flags.is_paused = 0;
     socket->connect_state = NULL;
-    socket->allow_half_open = (options & LIBUS_SOCKET_ALLOW_HALF_OPEN);

+
     us_internal_socket_context_link_socket(context, socket);

@@ -556,8 +560,9 @@ int start_connections(struct us_connecting_socket_t *c, int count) {
     s->context = c->context;
     s->timeout = c->timeout;
     s->long_timeout = c->long_timeout;
-    s->low_prio_state = 0;
-    s->allow_half_open = (c->options & LIBUS_SOCKET_ALLOW_HALF_OPEN);
+    s->flags.low_prio_state = 0;
+    s->flags.allow_half_open = (c->options & LIBUS_SOCKET_ALLOW_HALF_OPEN);
+    s->flags.is_paused = 0;
     /* Link it into context so that timeout fires properly */
     us_internal_socket_context_link_socket(s->context, s);

@@ -731,9 +736,10 @@ struct us_socket_t *us_socket_context_connect_unix(int ssl, struct us_socket_con
     connect_socket->context = context;
     connect_socket->timeout = 255;
     connect_socket->long_timeout = 255;
-    connect_socket->low_prio_state = 0;
+    connect_socket->flags.low_prio_state = 0;
     connect_socket->connect_state = NULL;
-    connect_socket->allow_half_open = (options & LIBUS_SOCKET_ALLOW_HALF_OPEN);
+    connect_socket->flags.allow_half_open = (options & LIBUS_SOCKET_ALLOW_HALF_OPEN);
+    connect_socket->flags.is_paused = 0;
     us_internal_socket_context_link_socket(context, connect_socket);

     return connect_socket;
@@ -764,7 +770,7 @@ struct us_socket_t *us_socket_context_adopt_socket(int ssl, struct us_socket_con
         return s;
     }

-    if (s->low_prio_state != 1) {
+    if (s->flags.low_prio_state != 1) {
         /* We need to be sure that we still holding a reference*/
         us_socket_context_ref(ssl, context);
         /* This properly updates the iterator if in on_timeout */
@@ -788,7 +794,7 @@ struct us_socket_t *us_socket_context_adopt_socket(int ssl, struct us_socket_con
     new_s->timeout = 255;
     new_s->long_timeout = 255;

-    if (new_s->low_prio_state == 1) {
+    if (new_s->flags.low_prio_state == 1) {
         /* update pointers in low-priority queue */
         if (!new_s->prev) new_s->context->loop->data.low_prio_head = new_s;
         else new_s->prev->next = new_s;

@@ -70,6 +70,7 @@ void us_internal_loop_update_pending_ready_polls(struct us_loop_t *loop,
 #define IS_EINTR(rc) (rc == -1 && errno == EINTR)
 #define LIBUS_ERR errno
 #endif
+#include <stdbool.h>
 /* Poll type and what it polls for */
 enum {
     /* Three first bits */
@@ -161,14 +162,22 @@ us_internal_ssl_socket_close(us_internal_ssl_socket_r s, int code,
 int us_internal_handle_dns_results(us_loop_r loop);

 /* Sockets are polls */

+struct us_socket_flags {
+    /* If true, the readable side is paused */
+    bool is_paused: 1;
+    /* Allow to stay alive after FIN/EOF */
+    bool allow_half_open: 1;
+    /* 0 = not in low-prio queue, 1 = is in low-prio queue, 2 = was in low-prio queue in this iteration */
+    unsigned char low_prio_state: 2;
+
+} __attribute__((packed));
+
 struct us_socket_t {
     alignas(LIBUS_EXT_ALIGNMENT) struct us_poll_t p; // 4 bytes
     unsigned char timeout; // 1 byte
     unsigned char long_timeout; // 1 byte
-    unsigned char
-        low_prio_state; /* 0 = not in low-prio queue, 1 = is in low-prio queue, 2
-                           = was in low-prio queue in this iteration */
-    unsigned char allow_half_open; /* Allow to stay alive after FIN/EOF */
+    struct us_socket_flags flags;
+
     struct us_socket_context_t *context;
     struct us_socket_t *prev, *next;

@@ -164,7 +164,7 @@ void us_internal_handle_low_priority_sockets(struct us_loop_t *loop) {
         us_internal_socket_context_link_socket(s->context, s);
         us_poll_change(&s->p, us_socket_context(0, s)->loop, us_poll_events(&s->p) | LIBUS_SOCKET_READABLE);

-        s->low_prio_state = 2;
+        s->flags.low_prio_state = 2;
     }
 }

@@ -310,9 +310,9 @@ void us_internal_dispatch_ready_poll(struct us_poll_t *p, int error, int eof, in
         s->connect_state = NULL;
         s->timeout = 255;
         s->long_timeout = 255;
-        s->low_prio_state = 0;
-        s->allow_half_open = listen_socket->s.allow_half_open;
-
+        s->flags.low_prio_state = 0;
+        s->flags.allow_half_open = listen_socket->s.flags.allow_half_open;
+        s->flags.is_paused = 0;

         /* We always use nodelay */
         bsd_socket_nodelay(client_fd, 1);
@@ -358,8 +358,8 @@ void us_internal_dispatch_ready_poll(struct us_poll_t *p, int error, int eof, in
          * SSL handshakes are CPU intensive, so we limit the number of handshakes per loop iteration, and move the rest
          * to the low-priority queue */
         if (s->context->is_low_prio(s)) {
-            if (s->low_prio_state == 2) {
-                s->low_prio_state = 0; /* Socket has been delayed and now it's time to process incoming data for one iteration */
+            if (s->flags.low_prio_state == 2) {
+                s->flags.low_prio_state = 0; /* Socket has been delayed and now it's time to process incoming data for one iteration */
             } else if (s->context->loop->data.low_prio_budget > 0) {
                 s->context->loop->data.low_prio_budget--; /* Still having budget for this iteration - do normal processing */
             } else {
@@ -375,7 +375,7 @@ void us_internal_dispatch_ready_poll(struct us_poll_t *p, int error, int eof, in
                 if (s->next) s->next->prev = s;
                 s->context->loop->data.low_prio_head = s;

-                s->low_prio_state = 1;
+                s->flags.low_prio_state = 1;

                 break;
             }
@@ -439,7 +439,7 @@ void us_internal_dispatch_ready_poll(struct us_poll_t *p, int error, int eof, in
                 s = us_socket_close(0, s, LIBUS_SOCKET_CLOSE_CODE_CLEAN_SHUTDOWN, NULL);
                 return;
             }
-            if(s->allow_half_open) {
+            if(s->flags.allow_half_open) {
                 /* We got a Error but is EOF and we allow half open so stop polling for readable and keep going*/
                 us_poll_change(&s->p, us_socket_context(0, s)->loop, us_poll_events(&s->p) & LIBUS_SOCKET_WRITABLE);
                 s = s->context->on_end(s);

@@ -187,7 +187,7 @@ struct us_socket_t *us_socket_close(int ssl, struct us_socket_t *s, int code, vo
         /* make sure the context is alive until the callback ends */
         us_socket_context_ref(ssl, s->context);

-        if (s->low_prio_state == 1) {
+        if (s->flags.low_prio_state == 1) {
             /* Unlink this socket from the low-priority queue */
             if (!s->prev) s->context->loop->data.low_prio_head = s->next;
             else s->prev->next = s->next;
@@ -196,7 +196,7 @@ struct us_socket_t *us_socket_close(int ssl, struct us_socket_t *s, int code, vo

             s->prev = 0;
             s->next = 0;
-            s->low_prio_state = 0;
+            s->flags.low_prio_state = 0;
             us_socket_context_unref(ssl, s->context);
         } else {
             us_internal_socket_context_unlink_socket(ssl, s->context, s);
@@ -247,7 +247,7 @@ struct us_socket_t *us_socket_close(int ssl, struct us_socket_t *s, int code, vo
 // - does not close
 struct us_socket_t *us_socket_detach(int ssl, struct us_socket_t *s) {
     if (!us_socket_is_closed(0, s)) {
-        if (s->low_prio_state == 1) {
+        if (s->flags.low_prio_state == 1) {
             /* Unlink this socket from the low-priority queue */
             if (!s->prev) s->context->loop->data.low_prio_head = s->next;
             else s->prev->next = s->next;
@@ -256,7 +256,7 @@ struct us_socket_t *us_socket_detach(int ssl, struct us_socket_t *s) {

             s->prev = 0;
             s->next = 0;
-            s->low_prio_state = 0;
+            s->flags.low_prio_state = 0;
             us_socket_context_unref(ssl, s->context);

         } else {
@@ -286,7 +286,7 @@ struct us_socket_t *us_socket_attach(int ssl, LIBUS_SOCKET_DESCRIPTOR client_fd,

     s->context = ctx;
     s->timeout = 0;
-    s->low_prio_state = 0;
+    s->flags.low_prio_state = 0;

     /* We always use nodelay */
     bsd_socket_nodelay(client_fd, 1);
@@ -339,7 +339,9 @@ struct us_socket_t *us_socket_from_fd(struct us_socket_context_t *ctx, int socke
     s->context = ctx;
     s->timeout = 0;
     s->long_timeout = 0;
-    s->low_prio_state = 0;
+    s->flags.low_prio_state = 0;
+    s->flags.is_paused = 0;
+    s->flags.allow_half_open = 0;

     /* We always use nodelay */
     bsd_socket_nodelay(fd, 1);
@@ -565,13 +567,17 @@ struct us_loop_t *us_connecting_socket_get_loop(struct us_connecting_socket_t *c
 }

 void us_socket_pause(int ssl, struct us_socket_t *s) {
+    if(s->flags.is_paused) return;
     // closed cannot be paused because it is already closed
     if(us_socket_is_closed(ssl, s)) return;
     // we are readable and writable so we can just pause readable side
     us_poll_change(&s->p, s->context->loop, LIBUS_SOCKET_WRITABLE);
+    s->flags.is_paused = 1;
 }

 void us_socket_resume(int ssl, struct us_socket_t *s) {
+    if(!s->flags.is_paused) return;
+    s->flags.is_paused = 0;
     // closed cannot be resumed
     if(us_socket_is_closed(ssl, s)) return;

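The guard added above is the usual idempotent pause/resume pattern: a flag records the paused state so repeated calls become no-ops. A standalone JavaScript sketch of the same idea follows; the class and method names are illustrative and are not the uSockets API.

// Illustrative only: mirrors the is_paused guard added to us_socket_pause/us_socket_resume.
class PausableSource {
  #paused = false;

  pause() {
    if (this.#paused) return; // already paused: skip the poll update
    this.#paused = true;
    // ...stop polling for readable events here...
  }

  resume() {
    if (!this.#paused) return; // not paused: nothing to undo
    this.#paused = false;
    // ...start polling for readable events again...
  }
}

const src = new PausableSource();
src.pause();
src.pause();  // second call is a no-op thanks to the flag
src.resume();
src.resume(); // likewise a no-op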
@@ -1,9 +1,10 @@
 raw_response: uws.AnyResponse,

 ref_count: u32 = 1,
+flags: Flags = .{},
+
 js_ref: JSC.Ref = .{},

-flags: Flags = .{},
 body_read_state: BodyReadState = .none,
 body_read_ref: JSC.Ref = .{},
 promise: JSC.Strong = .empty,
@@ -97,6 +98,15 @@ pub fn getServerSocketValue(this: *NodeHTTPResponse) JSC.JSValue {
     return Bun__getNodeHTTPServerSocketThisValue(this.raw_response == .SSL, this.raw_response.socket());
 }

+pub fn pauseSocket(this: *NodeHTTPResponse) void {
+    log("pauseSocket", .{});
+    this.raw_response.pause();
+}
+
+pub fn resumeSocket(this: *NodeHTTPResponse) void {
+    log("resumeSocket", .{});
+    this.raw_response.@"resume"();
+}
 pub fn upgrade(this: *NodeHTTPResponse, data_value: JSValue, sec_websocket_protocol: ZigString, sec_websocket_extensions: ZigString) bool {
     const upgrade_ctx = this.upgrade_context.context orelse return false;
     const ws_handler = this.server.webSocketHandler() orelse return false;
@@ -104,7 +114,7 @@ pub fn upgrade(this: *NodeHTTPResponse, data_value: JSValue, sec_websocket_proto
     if (socketValue == .zero) {
         return false;
     }
-    this.raw_response.@"resume"();
+    resumeSocket(this);

     defer {
         this.setOnAbortedHandler();
@@ -226,8 +236,7 @@ pub fn shouldRequestBePending(this: *const NodeHTTPResponse) bool {
     return true;
 }

-pub fn dumpRequestBody(this: *NodeHTTPResponse, globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame, thisValue: JSC.JSValue) bun.JSError!JSC.JSValue {
-    _ = callframe; // autofix
+pub fn dumpRequestBody(this: *NodeHTTPResponse, globalObject: *JSC.JSGlobalObject, _: *JSC.CallFrame, thisValue: JSC.JSValue) bun.JSError!JSC.JSValue {
     if (this.buffered_request_body_data_during_pause.len > 0) {
         this.buffered_request_body_data_during_pause.deinitWithAllocator(bun.default_allocator);
     }
@@ -467,9 +476,7 @@ fn writeHeadInternal(response: uws.AnyResponse, globalObject: *JSC.JSGlobalObjec
     }
 }

-pub fn writeContinue(this: *NodeHTTPResponse, globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) bun.JSError!JSC.JSValue {
-    const arguments = callframe.arguments_old(1).slice();
-    _ = arguments; // autofix
+pub fn writeContinue(this: *NodeHTTPResponse, globalObject: *JSC.JSGlobalObject, _: *JSC.CallFrame) bun.JSError!JSC.JSValue {
     if (this.isDone()) {
         return .undefined;
     }
@@ -532,11 +539,9 @@ pub fn onTimeout(this: *NodeHTTPResponse, _: uws.AnyResponse) void {
     this.handleAbortOrTimeout(.timeout, .zero);
 }

-pub fn doPause(this: *NodeHTTPResponse, globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame, thisValue: JSC.JSValue) bun.JSError!JSC.JSValue {
-    _ = globalObject; // autofix
-    _ = callframe; // autofix
-
-    if (this.flags.request_has_completed or this.flags.socket_closed) {
+pub fn doPause(this: *NodeHTTPResponse, _: *JSC.JSGlobalObject, _: *JSC.CallFrame, thisValue: JSC.JSValue) bun.JSError!JSC.JSValue {
+    log("doPause", .{});
+    if (this.flags.request_has_completed or this.flags.socket_closed or this.flags.ended) {
         return .false;
     }
     if (this.body_read_ref.has and NodeHTTPResponse.onDataGetCached(thisValue) == null) {
@@ -546,7 +551,7 @@ pub fn doPause(this: *NodeHTTPResponse, globalObject: *JSC.JSGlobalObject, callf

     if (!Environment.isWindows) {
         // TODO: figure out why windows is not emitting EOF with UV_DISCONNECT
-        this.raw_response.pause();
+        pauseSocket(this);
     }
     return .true;
 }
@@ -564,9 +569,9 @@ fn drainBufferedRequestBodyFromPause(this: *NodeHTTPResponse, globalObject: *JSC
     return null;
 }

-pub fn doResume(this: *NodeHTTPResponse, globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) bun.JSError!JSC.JSValue {
-    _ = callframe; // autofix
-    if (this.flags.request_has_completed or this.flags.socket_closed) {
+pub fn doResume(this: *NodeHTTPResponse, globalObject: *JSC.JSGlobalObject, _: *JSC.CallFrame) bun.JSError!JSC.JSValue {
+    log("doResume", .{});
+    if (this.flags.request_has_completed or this.flags.socket_closed or this.flags.ended) {
         return .false;
     }

@@ -580,7 +585,7 @@ pub fn doResume(this: *NodeHTTPResponse, globalObject: *JSC.JSGlobalObject, call
         result = buffered_data;
     }

-    this.raw_response.@"resume"();
+    resumeSocket(this);
     return result;
 }

@@ -648,9 +653,7 @@ pub export fn Bun__NodeHTTPRequest__onReject(globalObject: *JSC.JSGlobalObject,
     return .undefined;
 }

-pub fn abort(this: *NodeHTTPResponse, globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) bun.JSError!JSC.JSValue {
-    _ = globalObject; // autofix
-    _ = callframe; // autofix
+pub fn abort(this: *NodeHTTPResponse, _: *JSC.JSGlobalObject, _: *JSC.CallFrame) bun.JSError!JSC.JSValue {
     if (this.isDone()) {
         return .undefined;
     }
@@ -660,7 +663,7 @@ pub fn abort(this: *NodeHTTPResponse, globalObject: *JSC.JSGlobalObject, callfra
     if (state.isHttpEndCalled()) {
         return .undefined;
     }
-    this.raw_response.@"resume"();
+    resumeSocket(this);
     this.raw_response.clearOnData();
     this.raw_response.clearOnWritable();
     this.raw_response.clearTimeout();
@@ -983,7 +986,7 @@ pub fn write(this: *NodeHTTPResponse, globalObject: *JSC.JSGlobalObject, callfra
 pub fn end(this: *NodeHTTPResponse, globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) bun.JSError!JSC.JSValue {
     const arguments = callframe.arguments_old(3).slice();
     //We dont wanna a paused socket when we call end, so is important to resume the socket
-    this.raw_response.@"resume"();
+    resumeSocket(this);
     return writeOrEnd(this, globalObject, arguments, callframe.this(), true);
 }

@@ -431,7 +431,32 @@ const NodeHTTPServerSocket = class Socket extends Duplex {
     return this.connecting;
   }

-  _read(size) {}
+  #resumeSocket() {
+    const handle = this[kHandle];
+    const response = handle?.response;
+    if (response) {
+      const resumed = response.resume();
+      if (resumed && resumed !== true) {
+        const bodyReadState = handle.hasBody;
+
+        const message = this._httpMessage;
+        const req = message?.req;
+
+        if ((bodyReadState & NodeHTTPBodyReadState.done) !== 0) {
+          emitServerSocketEOFNT(this, req);
+        }
+        if (req) {
+          req.push(resumed);
+        }
+        this.push(resumed);
+      }
+    }
+  }
+
+  _read(size) {
+    // https://github.com/nodejs/node/blob/13e3aef053776be9be262f210dc438ecec4a3c8d/lib/net.js#L725-L737
+    this.#resumeSocket();
+  }

   get readyState() {
     if (this.connecting) return "opening";
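The rewritten _read above follows the approach linked from Node's net.js: reading from the Duplex resumes the native handle, and any body bytes buffered while it was paused are pushed into both the socket stream and the associated request. From the consumer side, an explicitly paused request starts flowing again as soon as it is read; a small illustrative sketch (handler and timing are not from the PR):

// Consumer-side sketch, not part of the diff: pausing the request pauses the
// underlying handle; attaching a reader later resumes it and flushes any
// bytes that were buffered during the pause.
import http from "node:http";

http
  .createServer((req, res) => {
    req.pause(); // stop reading from the socket for now

    setTimeout(() => {
      req.on("data", chunk => {
        // _read -> resume: chunks buffered during the pause arrive here
        console.log("got", chunk.length, "bytes");
      });
      req.on("end", () => res.end("done"));
    }, 100);
  })
  .listen(0);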
@@ -492,22 +517,16 @@ const NodeHTTPServerSocket = class Socket extends Duplex {
   _write(chunk, encoding, callback) {}

   pause() {
-    const message = this._httpMessage;
     const handle = this[kHandle];
     const response = handle?.response;
-    if (response && message) {
+    if (response) {
       response.pause();
     }
     return super.pause();
   }

   resume() {
-    const message = this._httpMessage;
-    const handle = this[kHandle];
-    const response = handle?.response;
-    if (response && message) {
-      response.resume();
-    }
+    this.#resumeSocket();
     return super.resume();
   }

@@ -953,7 +972,9 @@ const ServerPrototype = {
       isNextIncomingMessageHTTPS = prevIsNextIncomingMessageHTTPS;
       handle.onabort = onServerRequestEvent.bind(socket);
       // start buffering data if any, the user will need to resume() or .on("data") to read it
-      handle.pause();
+      if (hasBody) {
+        handle.pause();
+      }
       drainMicrotasks();

       let capturedError;
@@ -985,7 +1006,6 @@ const ServerPrototype = {

       socket[kRequest] = http_req;
       const is_upgrade = http_req.headers.upgrade;
-
       if (!is_upgrade) {
         if (canUseInternalAssignSocket) {
           // ~10% performance improvement in JavaScriptCore due to avoiding .once("close", ...) and removing a listener
@@ -994,7 +1014,6 @@ const ServerPrototype = {
           http_res.assignSocket(socket);
         }
       }
-
       function onClose() {
         didFinish = true;
         resolveFunction && resolveFunction();
@@ -1206,6 +1225,21 @@ function emitEOFIncomingMessage(self) {
   process.nextTick(emitEOFIncomingMessageOuter, self);
 }

+function emitServerSocketEOF(self, req) {
+  self.push(null);
+  if (req) {
+    req.push(null);
+    req.complete = true;
+  }
+}
+
+function emitServerSocketEOFNT(self, req) {
+  if (req) {
+    req[eofInProgress] = true;
+  }
+  process.nextTick(emitServerSocketEOF, self);
+}
+
 function hasServerResponseFinished(self, chunk, callback) {
   const finished = self.finished;

@@ -1380,6 +1414,12 @@ const IncomingMessagePrototype = {
       this._consuming = true;
     }

+    const socket = this.socket;
+    if (socket && socket.readable) {
+      //https://github.com/nodejs/node/blob/13e3aef053776be9be262f210dc438ecec4a3c8d/lib/_http_incoming.js#L211-L213
+      socket.resume();
+    }
+
     if (this[eofInProgress]) {
       // There is a nextTick pending that will emit EOF
       return;
@@ -2215,6 +2255,7 @@ const ServerResponsePrototype = {
       socket.removeListener("close", onServerResponseClose);
       socket._httpMessage = null;
     }

+    this.socket = null;
   },

test/js/third_party/astro/astro-post.test.js (vendored)
@@ -1,20 +1,50 @@
 import { preview, build } from "astro";
-import { expect, test } from "bun:test";
+import { beforeAll, expect, test, afterAll, describe } from "bun:test";
 import { join } from "path";
-test("is able todo a POST request to an astro action", async () => {
-  await build({
-    root: join(import.meta.dirname, "fixtures"),
-    devOutput: false,
-    logLevel: "error",
+import { nodeExe, bunEnv } from "harness";
+
+const fixtureDir = join(import.meta.dirname, "fixtures");
+function postNodeFormData(port) {
+  const result = Bun.spawnSync({
+    cmd: [nodeExe(), join(fixtureDir, "node-form-data.fetch.fixture.js"), port?.toString()],
+    env: bunEnv,
+    stdio: ["inherit", "inherit", "inherit"],
   });
-  const previewServer = await preview({
-    root: join(import.meta.dirname, "fixtures"),
-    port: 0,
-    logLevel: "error",
+  expect(result.exitCode).toBe(0);
+}
+function postNodeAction(port) {
+  const result = Bun.spawnSync({
+    cmd: [nodeExe(), join(fixtureDir, "node-action.fetch.fixture.js"), port?.toString()],
+    env: bunEnv,
+    stdio: ["inherit", "inherit", "inherit"],
   });
+  expect(result.exitCode).toBe(0);
+}
+
+describe("astro", async () => {
+  let previewServer;
+  let origin;
+
+  beforeAll(async () => {
+    await build({
+      root: fixtureDir,
+      devOutput: false,
+      logLevel: "error",
+    });
+    previewServer = await preview({
+      root: fixtureDir,
+      port: 0,
+      logLevel: "error",
+    });
+    origin = `http://localhost:${previewServer.port}`;
+  });
+  afterAll(async () => {
+    await previewServer.stop();
+  });

-  try {
-    const r = await fetch(`http://localhost:${previewServer.port}/_actions/getGreeting/`, {
+  test("is able todo a POST request to an astro action using bun", async () => {
+    const r = await fetch(`${origin}/_actions/getGreeting/`, {
+      body: '{"name":"World"}',
       headers: {
         accept: "application/json",
         "accept-language": "en-US,en;q=0.9,es;q=0.8",
@@ -25,17 +55,40 @@ test("is able todo a POST request to an astro action", async () => {
         "sec-fetch-dest": "empty",
         "sec-fetch-mode": "cors",
         "sec-fetch-site": "same-origin",
-        Referer: "http://localhost:4321/",
+        Referer: origin,
         "Referrer-Policy": "strict-origin-when-cross-origin",
       },
-      body: '{"name":"World"}',
       method: "POST",
     });
     expect(r.status).toBe(200);
     const text = await r.text();
     expect(text).toBe('["Hello, World!"]');
-  } finally {
-    // Stop the server if needed
-    await previewServer.stop();
-  }
 });
+
+  test("is able todo a POST request to an astro action using node", async () => {
+    postNodeAction(previewServer.port);
+  });
+
+  test("is able to post form data to an astro using bun", async () => {
+    const formData = new FormData();
+    formData.append("name", "John Doe");
+    formData.append("email", "john.doe@example.com");
+    const r = await fetch(`${origin}/form-data`, {
+      "body": formData,
+      "headers": {
+        "origin": origin,
+      },
+      "method": "POST",
+    });
+
+    expect(r.status).toBe(200);
+    const text = await r.json();
+    expect(text).toEqual({
+      name: "John Doe",
+      email: "john.doe@example.com",
+    });
+  });
+  test("is able to post form data to an astro using node", async () => {
+    postNodeFormData(previewServer.port);
+  });
+});

test/js/third_party/astro/fixtures/node-action.fetch.fixture.js (vendored, new file)
@@ -0,0 +1,31 @@
+const previewServerPort = parseInt(process.argv[2], 10);
+function expect(value) {
+  return {
+    toBe: expected => {
+      if (value !== expected) {
+        throw new Error(`Expected ${value} to be ${expected}`);
+      }
+    },
+  };
+}
+const origin = `http://localhost:${previewServerPort}`;
+const r = await fetch(`${origin}/_actions/getGreeting/`, {
+  body: '{"name":"World"}',
+  headers: {
+    accept: "application/json",
+    "accept-language": "en-US,en;q=0.9,es;q=0.8",
+    "content-type": "application/json",
+    "sec-ch-ua": '"Chromium";v="134", "Not:A-Brand";v="24", "Google Chrome";v="134"',
+    "sec-ch-ua-mobile": "?0",
+    "sec-ch-ua-platform": '"macOS"',
+    "sec-fetch-dest": "empty",
+    "sec-fetch-mode": "cors",
+    "sec-fetch-site": "same-origin",
+    Referer: origin,
+    "Referrer-Policy": "strict-origin-when-cross-origin",
+  },
+  method: "POST",
+});
+expect(r.status).toBe(200);
+const text = await r.text();
+expect(text).toBe('["Hello, World!"]');

test/js/third_party/astro/fixtures/node-form-data.fetch.fixture.js (vendored, new file)
@@ -0,0 +1,30 @@
+const previewServerPort = parseInt(process.argv[2], 10);
+function expect(value) {
+  return {
+    toBe: expected => {
+      if (value !== expected) {
+        throw new Error(`Expected ${value} to be ${expected}`);
+      }
+    },
+  };
+}
+const formData = new FormData();
+formData.append("name", "John Doe");
+formData.append("email", "john.doe@example.com");
+const origin = `http://localhost:${previewServerPort}`;
+const r = await fetch(`${origin}/form-data`, {
+  "body": formData,
+  "headers": {
+    "origin": origin,
+  },
+  "method": "POST",
+});
+
+expect(r.status).toBe(200);
+const text = await r.text();
+expect(text).toBe(
+  JSON.stringify({
+    name: "John Doe",
+    email: "john.doe@example.com",
+  }),
+);

test/js/third_party/astro/fixtures/src/layouts/Layout.astro (vendored, new file)
@@ -0,0 +1,22 @@
+<!doctype html>
+<html lang="en">
+  <head>
+    <meta charset="UTF-8" />
+    <meta name="viewport" content="width=device-width" />
+    <link rel="icon" type="image/svg+xml" href="/favicon.svg" />
+    <meta name="generator" content={Astro.generator} />
+    <title>Astro Basics</title>
+  </head>
+  <body>
+    <slot />
+  </body>
+</html>
+
+<style>
+  html,
+  body {
+    margin: 0;
+    width: 100%;
+    height: 100%;
+  }
+</style>

test/js/third_party/astro/fixtures/src/pages/form-data.ts (vendored, new file)
@@ -0,0 +1,16 @@
+import type { APIRoute } from "astro";
+
+export const POST: APIRoute = async function ({ request }) {
+  try {
+    const formData = await request.formData();
+
+    return new Response(JSON.stringify(Object.fromEntries(formData)), {
+      status: 200,
+      headers: {
+        "Content-Type": "application/json",
+      },
+    });
+  } catch (error) {
+    return new Response("Error", { status: 500 });
+  }
+};

test/js/third_party/astro/fixtures/src/pages/index.astro (vendored, new file)
@@ -0,0 +1,13 @@
+---
+import Layout from '../layouts/Layout.astro';
+---
+
+<Layout>
+  <form action="/form-data" method="POST"
+    style="display: flex; flex-direction: column; gap: 8px; padding: 16px; max-width: 300px;"
+  >
+    <input type="text" name="name" value="John Doe" />
+    <input type="email" name="email" value="john.doe@example.com" />
+    <input type="submit" value="Submit" />
+  </form>
+</Layout>