mirror of
https://github.com/oven-sh/bun
synced 2026-02-28 20:40:59 +01:00
Compare commits
19 Commits
claude/fix
...
ciro/fetch
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1ba11c0de3 | ||
|
|
1ca8d0fb53 | ||
|
|
28450dbd91 | ||
|
|
56ca927ea3 | ||
|
|
306bae4c39 | ||
|
|
88d522b6a0 | ||
|
|
1cec0c5662 | ||
|
|
dd0a360839 | ||
|
|
bdbef97b24 | ||
|
|
22160d8926 | ||
|
|
5544e919fc | ||
|
|
4408cc797f | ||
|
|
aabbe3c741 | ||
|
|
68d2d1c7e0 | ||
|
|
5562cb40fc | ||
|
|
b57375bd2e | ||
|
|
bdeaf98a4d | ||
|
|
d68398b53f | ||
|
|
2c3136cb9d |
@@ -414,14 +414,18 @@ struct us_timer_t *us_create_timer(struct us_loop_t *loop, int fallthrough, unsi
|
||||
#endif
|
||||
|
||||
#ifdef LIBUS_USE_EPOLL
|
||||
void us_timer_close(struct us_timer_t *timer) {
|
||||
void us_timer_close(struct us_timer_t *timer, int fallthrough) {
|
||||
struct us_internal_callback_t *cb = (struct us_internal_callback_t *) timer;
|
||||
|
||||
us_poll_stop(&cb->p, cb->loop);
|
||||
close(us_poll_fd(&cb->p));
|
||||
|
||||
/* (regular) sockets are the only polls which are not freed immediately */
|
||||
us_poll_free((struct us_poll_t *) timer, cb->loop);
|
||||
if(fallthrough){
|
||||
us_free(timer);
|
||||
}else {
|
||||
us_poll_free((struct us_poll_t *) timer, cb->loop);
|
||||
}
|
||||
}
|
||||
|
||||
void us_timer_set(struct us_timer_t *t, void (*cb)(struct us_timer_t *t), int ms, int repeat_ms) {
|
||||
@@ -438,7 +442,7 @@ void us_timer_set(struct us_timer_t *t, void (*cb)(struct us_timer_t *t), int ms
|
||||
us_poll_start((struct us_poll_t *) t, internal_cb->loop, LIBUS_SOCKET_READABLE);
|
||||
}
|
||||
#else
|
||||
void us_timer_close(struct us_timer_t *timer) {
|
||||
void us_timer_close(struct us_timer_t *timer, int fallthrough) {
|
||||
struct us_internal_callback_t *internal_cb = (struct us_internal_callback_t *) timer;
|
||||
|
||||
struct kevent64_s event;
|
||||
@@ -446,7 +450,11 @@ void us_timer_close(struct us_timer_t *timer) {
|
||||
kevent64(internal_cb->loop->fd, &event, 1, NULL, 0, 0, NULL);
|
||||
|
||||
/* (regular) sockets are the only polls which are not freed immediately */
|
||||
us_poll_free((struct us_poll_t *) timer, internal_cb->loop);
|
||||
if(fallthrough){
|
||||
us_free(timer);
|
||||
}else {
|
||||
us_poll_free((struct us_poll_t *) timer, internal_cb->loop);
|
||||
}
|
||||
}
|
||||
|
||||
void us_timer_set(struct us_timer_t *t, void (*cb)(struct us_timer_t *t), int ms, int repeat_ms) {
|
||||
|
||||
@@ -132,7 +132,7 @@ struct us_timer_t *us_create_timer(struct us_loop_t *loop, int fallthrough, unsi
|
||||
void *us_timer_ext(struct us_timer_t *timer);
|
||||
|
||||
/* */
|
||||
void us_timer_close(struct us_timer_t *timer);
|
||||
void us_timer_close(struct us_timer_t *timer, int fallthrough);
|
||||
|
||||
/* Arm a timer with a delay from now and eventually a repeat delay.
|
||||
* Specify 0 as repeat delay to disable repeating. Specify both 0 to disarm. */
|
||||
|
||||
@@ -47,7 +47,7 @@ void us_internal_loop_data_free(struct us_loop_t *loop) {
|
||||
|
||||
free(loop->data.recv_buf);
|
||||
|
||||
us_timer_close(loop->data.sweep_timer);
|
||||
us_timer_close(loop->data.sweep_timer, 0);
|
||||
us_internal_async_close(loop->data.wakeup_async);
|
||||
}
|
||||
|
||||
|
||||
@@ -14,7 +14,7 @@ void uws_timer_close(struct us_timer_t *timer)
|
||||
struct timer_handler_data *data;
|
||||
memcpy(&data, us_timer_ext(t), sizeof(struct timer_handler_data *));
|
||||
free(data);
|
||||
us_timer_close(t);
|
||||
us_timer_close(t, 0);
|
||||
}
|
||||
//Timer create helper
|
||||
struct us_timer_t *uws_create_timer(int ms, int repeat_ms, void (*handler)(void *data), void *data)
|
||||
@@ -47,7 +47,7 @@ struct us_timer_t *uws_create_timer(int ms, int repeat_ms, void (*handler)(void
|
||||
if (!data->repeat)
|
||||
{
|
||||
free(data);
|
||||
us_timer_close(t);
|
||||
us_timer_close(t, 0);
|
||||
}
|
||||
},
|
||||
ms, repeat_ms);
|
||||
|
||||
@@ -19,7 +19,7 @@ void uws_timer_close(struct us_timer_t *timer)
|
||||
struct timer_handler_data *data;
|
||||
memcpy(&data, us_timer_ext(t), sizeof(struct timer_handler_data *));
|
||||
free(data);
|
||||
us_timer_close(t);
|
||||
us_timer_close(t, 0);
|
||||
}
|
||||
//Timer create helper
|
||||
struct us_timer_t *uws_create_timer(int ms, int repeat_ms, void (*handler)(void *data), void *data)
|
||||
@@ -52,7 +52,7 @@ struct us_timer_t *uws_create_timer(int ms, int repeat_ms, void (*handler)(void
|
||||
if (!data->repeat)
|
||||
{
|
||||
free(data);
|
||||
us_timer_close(t);
|
||||
us_timer_close(t, 0);
|
||||
}
|
||||
},
|
||||
ms, repeat_ms);
|
||||
|
||||
@@ -62,7 +62,7 @@ void uws_timer_close(struct us_timer_t *timer)
|
||||
struct timer_handler_data *data;
|
||||
memcpy(&data, us_timer_ext(t), sizeof(struct timer_handler_data *));
|
||||
free(data);
|
||||
us_timer_close(t);
|
||||
us_timer_close(t, 0);
|
||||
}
|
||||
//Timer create helper
|
||||
struct us_timer_t *uws_create_timer(int ms, int repeat_ms, void (*handler)(void *data), void *data)
|
||||
@@ -95,7 +95,7 @@ struct us_timer_t *uws_create_timer(int ms, int repeat_ms, void (*handler)(void
|
||||
if (!data->repeat)
|
||||
{
|
||||
free(data);
|
||||
us_timer_close(t);
|
||||
us_timer_close(t, 0);
|
||||
}
|
||||
},
|
||||
ms, repeat_ms);
|
||||
|
||||
@@ -90,7 +90,7 @@ int main() {
|
||||
|
||||
delete upgradeData;
|
||||
|
||||
us_timer_close(t);
|
||||
us_timer_close(t, 0);
|
||||
}, 5000, 0);
|
||||
|
||||
},
|
||||
|
||||
@@ -126,7 +126,7 @@ public:
|
||||
LoopData *loopData = (LoopData *) us_loop_ext((us_loop_t *) this);
|
||||
|
||||
/* Stop and free dateTimer first */
|
||||
us_timer_close(loopData->dateTimer);
|
||||
us_timer_close(loopData->dateTimer, 0);
|
||||
|
||||
loopData->~LoopData();
|
||||
/* uSockets will track whether this loop is owned by us or a borrowed alien loop */
|
||||
|
||||
@@ -3600,7 +3600,7 @@ pub const Timer = struct {
|
||||
|
||||
this.poll_ref.unref(vm);
|
||||
|
||||
this.timer.deinit();
|
||||
this.timer.deinit(false);
|
||||
|
||||
// balance double unreffing in doUnref
|
||||
vm.event_loop_handle.?.num_polls += @as(i32, @intFromBool(this.did_unref_timer));
|
||||
|
||||
@@ -317,9 +317,9 @@ pub const FFI = struct {
|
||||
};
|
||||
};
|
||||
};
|
||||
|
||||
|
||||
var size = symbols.values().len;
|
||||
if(size >= 63) {
|
||||
if (size >= 63) {
|
||||
size = 0;
|
||||
}
|
||||
var obj = JSC.JSValue.createEmptyObject(global, size);
|
||||
|
||||
@@ -3674,6 +3674,7 @@ void GlobalObject::addBuiltinGlobals(JSC::VM& vm)
|
||||
|
||||
putDirectBuiltinFunction(vm, this, builtinNames.createFIFOPrivateName(), streamInternalsCreateFIFOCodeGenerator(vm), PropertyAttribute::Builtin | PropertyAttribute::DontDelete | PropertyAttribute::ReadOnly);
|
||||
putDirectBuiltinFunction(vm, this, builtinNames.createEmptyReadableStreamPrivateName(), readableStreamCreateEmptyReadableStreamCodeGenerator(vm), PropertyAttribute::Builtin | PropertyAttribute::DontDelete | PropertyAttribute::ReadOnly);
|
||||
putDirectBuiltinFunction(vm, this, builtinNames.createUsedReadableStreamPrivateName(), readableStreamCreateUsedReadableStreamCodeGenerator(vm), PropertyAttribute::Builtin | PropertyAttribute::DontDelete | PropertyAttribute::ReadOnly);
|
||||
putDirectBuiltinFunction(vm, this, builtinNames.consumeReadableStreamPrivateName(), readableStreamConsumeReadableStreamCodeGenerator(vm), PropertyAttribute::Builtin | PropertyAttribute::DontDelete | PropertyAttribute::ReadOnly);
|
||||
putDirectBuiltinFunction(vm, this, builtinNames.createNativeReadableStreamPrivateName(), readableStreamCreateNativeReadableStreamCodeGenerator(vm), PropertyAttribute::Builtin | PropertyAttribute::DontDelete | PropertyAttribute::ReadOnly);
|
||||
putDirectBuiltinFunction(vm, this, builtinNames.requireESMPrivateName(), importMetaObjectRequireESMCodeGenerator(vm), PropertyAttribute::Builtin | PropertyAttribute::DontDelete | PropertyAttribute::ReadOnly);
|
||||
|
||||
@@ -2165,6 +2165,14 @@ JSC__JSValue ReadableStream__empty(Zig::GlobalObject* globalObject)
|
||||
return JSValue::encode(JSC::call(globalObject, function, JSC::ArgList(), "ReadableStream.create"_s));
|
||||
}
|
||||
|
||||
JSC__JSValue ReadableStream__used(Zig::GlobalObject* globalObject)
|
||||
{
|
||||
auto& vm = globalObject->vm();
|
||||
auto clientData = WebCore::clientData(vm);
|
||||
auto* function = globalObject->getDirect(vm, clientData->builtinNames().createUsedReadableStreamPrivateName()).getObject();
|
||||
return JSValue::encode(JSC::call(globalObject, function, JSC::ArgList(), "ReadableStream.create"_s));
|
||||
}
|
||||
|
||||
JSC__JSValue JSC__JSValue__createRangeError(const ZigString* message, const ZigString* arg1,
|
||||
JSC__JSGlobalObject* globalObject)
|
||||
{
|
||||
|
||||
@@ -1148,7 +1148,7 @@ pub const EventLoop = struct {
|
||||
|
||||
pub fn callTask(timer: *uws.Timer) callconv(.C) void {
|
||||
var task = Task.from(timer.as(*anyopaque));
|
||||
timer.deinit();
|
||||
defer timer.deinit(true);
|
||||
|
||||
JSC.VirtualMachine.get().enqueueTask(task);
|
||||
}
|
||||
|
||||
@@ -67,7 +67,7 @@ fn callSync(comptime FunctionEnum: NodeFSFunctionEnum) NodeFSFunction {
|
||||
args,
|
||||
comptime Flavor.sync,
|
||||
);
|
||||
|
||||
|
||||
switch (result) {
|
||||
.err => |err| {
|
||||
globalObject.throwValue(JSC.JSValue.c(err.toJS(globalObject)));
|
||||
|
||||
@@ -97,7 +97,7 @@ pub const StatWatcherScheduler = struct {
|
||||
prev = next;
|
||||
} else {
|
||||
if (this.head.load(.Monotonic) == null) {
|
||||
this.timer.?.deinit();
|
||||
this.timer.?.deinit(false);
|
||||
this.timer = null;
|
||||
// The scheduler is not deinit here, but it will get reused.
|
||||
}
|
||||
|
||||
@@ -1434,7 +1434,6 @@ pub const TestRunnerTask = struct {
|
||||
vm.clearOnException();
|
||||
|
||||
this.ref.unref(vm);
|
||||
|
||||
// there is a double free here involving async before/after callbacks
|
||||
//
|
||||
// Fortunately:
|
||||
|
||||
@@ -466,7 +466,10 @@ pub const Body = struct {
|
||||
JSC.markBinding(@src());
|
||||
|
||||
switch (this.*) {
|
||||
.Used, .Empty => {
|
||||
.Used => {
|
||||
return JSC.WebCore.ReadableStream.used(globalThis);
|
||||
},
|
||||
.Empty => {
|
||||
return JSC.WebCore.ReadableStream.empty(globalThis);
|
||||
},
|
||||
.Null => {
|
||||
@@ -493,6 +496,12 @@ pub const Body = struct {
|
||||
if (locked.readable) |readable| {
|
||||
return readable.value;
|
||||
}
|
||||
|
||||
if (locked.promise != null) {
|
||||
//TODO: make this closed
|
||||
return JSC.WebCore.ReadableStream.used(globalThis);
|
||||
}
|
||||
|
||||
var drain_result: JSC.WebCore.DrainResult = .{
|
||||
.estimated_size = 0,
|
||||
};
|
||||
@@ -1104,8 +1113,7 @@ pub fn BodyMixin(comptime Type: type) type {
|
||||
var body: *Body.Value = this.getBodyValue();
|
||||
|
||||
if (body.* == .Used) {
|
||||
// TODO: make this closed
|
||||
return JSC.WebCore.ReadableStream.empty(globalThis);
|
||||
return JSC.WebCore.ReadableStream.used(globalThis);
|
||||
}
|
||||
|
||||
return body.toReadableStream(globalThis);
|
||||
|
||||
@@ -778,27 +778,30 @@ pub const Fetch = struct {
|
||||
if (!success) {
|
||||
const err = this.onReject();
|
||||
err.ensureStillAlive();
|
||||
// if we are streaming update with error
|
||||
if (this.readable_stream_ref.get()) |readable| {
|
||||
readable.ptr.Bytes.onData(
|
||||
.{
|
||||
.err = .{ .JSValue = err },
|
||||
},
|
||||
bun.default_allocator,
|
||||
);
|
||||
return;
|
||||
}
|
||||
// if we are buffering resolve the promise
|
||||
if (this.response.get()) |response_js| {
|
||||
if (response_js.as(Response)) |response| {
|
||||
const body = response.body;
|
||||
if (body.value == .Locked) {
|
||||
if (body.value.Locked.readable) |readable| {
|
||||
readable.ptr.Bytes.onData(
|
||||
.{
|
||||
.err = .{ .JSValue = err },
|
||||
},
|
||||
bun.default_allocator,
|
||||
);
|
||||
return;
|
||||
if (body.value.Locked.promise) |promise_| {
|
||||
const promise = promise_.asAnyPromise().?;
|
||||
promise.reject(globalThis, err);
|
||||
}
|
||||
}
|
||||
|
||||
response.body.value.toErrorInstance(err, globalThis);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
globalThis.throwValue(err);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -1708,7 +1711,7 @@ pub const Fetch = struct {
|
||||
if (decompress.isBoolean()) {
|
||||
disable_decompression = !decompress.asBoolean();
|
||||
} else if (decompress.isNumber()) {
|
||||
disable_keepalive = decompress.to(i32) == 0;
|
||||
disable_decompression = decompress.to(i32) == 0;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1901,7 +1904,7 @@ pub const Fetch = struct {
|
||||
if (decompress.isBoolean()) {
|
||||
disable_decompression = !decompress.asBoolean();
|
||||
} else if (decompress.isNumber()) {
|
||||
disable_keepalive = decompress.to(i32) == 0;
|
||||
disable_decompression = decompress.to(i32) == 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -225,6 +225,7 @@ pub const ReadableStream = struct {
|
||||
extern fn ReadableStream__isDisturbed(possibleReadableStream: JSValue, globalObject: *JSGlobalObject) bool;
|
||||
extern fn ReadableStream__isLocked(possibleReadableStream: JSValue, globalObject: *JSGlobalObject) bool;
|
||||
extern fn ReadableStream__empty(*JSGlobalObject) JSC.JSValue;
|
||||
extern fn ReadableStream__used(*JSGlobalObject) JSC.JSValue;
|
||||
extern fn ReadableStream__cancel(stream: JSValue, *JSGlobalObject) void;
|
||||
extern fn ReadableStream__abort(stream: JSValue, *JSGlobalObject) void;
|
||||
extern fn ReadableStream__detach(stream: JSValue, *JSGlobalObject) void;
|
||||
@@ -367,6 +368,12 @@ pub const ReadableStream = struct {
|
||||
return ReadableStream__empty(globalThis);
|
||||
}
|
||||
|
||||
pub fn used(globalThis: *JSGlobalObject) JSC.JSValue {
|
||||
JSC.markBinding(@src());
|
||||
|
||||
return ReadableStream__used(globalThis);
|
||||
}
|
||||
|
||||
const Base = @import("../../ast/base.zig");
|
||||
pub const StreamTag = enum(usize) {
|
||||
invalid = 0,
|
||||
@@ -3596,6 +3603,9 @@ pub const ByteStream = struct {
|
||||
this.buffer = try std.ArrayList(u8).initCapacity(bun.default_allocator, chunk.len);
|
||||
this.buffer.appendSliceAssumeCapacity(chunk);
|
||||
},
|
||||
.err => {
|
||||
this.pending.result = .{ .err = stream.err };
|
||||
},
|
||||
else => unreachable,
|
||||
}
|
||||
return;
|
||||
@@ -3605,6 +3615,9 @@ pub const ByteStream = struct {
|
||||
.temporary_and_done, .temporary => {
|
||||
try this.buffer.appendSlice(chunk);
|
||||
},
|
||||
.err => {
|
||||
this.pending.result = .{ .err = stream.err };
|
||||
},
|
||||
// We don't support the rest of these yet
|
||||
else => unreachable,
|
||||
}
|
||||
|
||||
@@ -647,18 +647,12 @@ pub const Timer = opaque {
|
||||
pub fn create(loop: *Loop, ptr: anytype) *Timer {
|
||||
const Type = @TypeOf(ptr);
|
||||
|
||||
// never fallthrough poll
|
||||
// the problem is uSockets hardcodes it on the other end
|
||||
// so we can never free non-fallthrough polls
|
||||
return us_create_timer(loop, 0, @sizeOf(Type));
|
||||
}
|
||||
|
||||
pub fn createFallthrough(loop: *Loop, ptr: anytype) *Timer {
|
||||
const Type = @TypeOf(ptr);
|
||||
|
||||
// never fallthrough poll
|
||||
// the problem is uSockets hardcodes it on the other end
|
||||
// so we can never free non-fallthrough polls
|
||||
return us_create_timer(loop, 1, @sizeOf(Type));
|
||||
}
|
||||
|
||||
@@ -669,9 +663,9 @@ pub const Timer = opaque {
|
||||
@as(*@TypeOf(ptr), @ptrCast(@alignCast(value_ptr))).* = ptr;
|
||||
}
|
||||
|
||||
pub fn deinit(this: *Timer) void {
|
||||
pub fn deinit(this: *Timer, comptime fallthrough: bool) void {
|
||||
debug("Timer.deinit()", .{});
|
||||
us_timer_close(this);
|
||||
us_timer_close(this, @intFromBool(fallthrough));
|
||||
}
|
||||
|
||||
pub fn ext(this: *Timer, comptime Type: type) ?*Type {
|
||||
@@ -915,7 +909,7 @@ const uintmax_t = c_ulong;
|
||||
|
||||
extern fn us_create_timer(loop: ?*Loop, fallthrough: i32, ext_size: c_uint) *Timer;
|
||||
extern fn us_timer_ext(timer: ?*Timer) *?*anyopaque;
|
||||
extern fn us_timer_close(timer: ?*Timer) void;
|
||||
extern fn us_timer_close(timer: ?*Timer, fallthrough: i32) void;
|
||||
extern fn us_timer_set(timer: ?*Timer, cb: ?*const fn (*Timer) callconv(.C) void, ms: i32, repeat_ms: i32) void;
|
||||
extern fn us_timer_loop(t: ?*Timer) ?*Loop;
|
||||
pub const us_socket_context_options_t = extern struct {
|
||||
|
||||
@@ -751,10 +751,14 @@ pub const HTTPThread = struct {
|
||||
if (socket_async_http_abort_tracker.fetchSwapRemove(http.async_http_id)) |socket_ptr| {
|
||||
if (http.client.isHTTPS()) {
|
||||
const socket = uws.SocketTLS.from(socket_ptr.value);
|
||||
socket.shutdown();
|
||||
if (socket.isEstablished() and !socket.isClosed() and !socket.isShutdown()) {
|
||||
socket.shutdownRead();
|
||||
}
|
||||
} else {
|
||||
const socket = uws.SocketTCP.from(socket_ptr.value);
|
||||
socket.shutdown();
|
||||
if (socket.isEstablished() and !socket.isClosed() and !socket.isShutdown()) {
|
||||
socket.shutdownRead();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1099,7 +1103,7 @@ pub fn onClose(
|
||||
}
|
||||
|
||||
if (in_progress) {
|
||||
client.fail(error.ConnectionClosed);
|
||||
client.closeAndFail(error.ConnectionClosed, is_ssl, socket);
|
||||
}
|
||||
}
|
||||
pub fn onTimeout(
|
||||
@@ -1350,10 +1354,6 @@ pub const InternalState = struct {
|
||||
reader.deinit();
|
||||
}
|
||||
|
||||
// if we are holding a cloned_metadata we need to deinit it
|
||||
// this should never happen because we should always return the metadata to the user
|
||||
std.debug.assert(this.cloned_metadata == null);
|
||||
// just in case we check and free to avoid leaks
|
||||
if (this.cloned_metadata != null) {
|
||||
this.cloned_metadata.?.deinit(allocator);
|
||||
this.cloned_metadata = null;
|
||||
@@ -1562,6 +1562,9 @@ pub fn isKeepAlivePossible(this: *HTTPClient) bool {
|
||||
if (this.http_proxy != null and this.url.isHTTPS()) {
|
||||
return false;
|
||||
}
|
||||
if (this.signals.get(.aborted)) {
|
||||
return false;
|
||||
}
|
||||
return !this.disable_keepalive;
|
||||
}
|
||||
return false;
|
||||
@@ -1868,6 +1871,9 @@ pub const AsyncHTTP = struct {
|
||||
if (this.http_proxy != null and this.url.isHTTPS()) {
|
||||
return false;
|
||||
}
|
||||
if (this.signals.get(.aborted)) {
|
||||
return false;
|
||||
}
|
||||
// check state
|
||||
if (this.state.allow_keepalive and !this.disable_keepalive) return true;
|
||||
}
|
||||
@@ -2893,6 +2899,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
|
||||
}
|
||||
|
||||
pub fn closeAndAbort(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void {
|
||||
log("closeAndAbort", .{});
|
||||
this.closeAndFail(error.Aborted, comptime is_ssl, socket);
|
||||
}
|
||||
|
||||
@@ -2901,9 +2908,6 @@ fn fail(this: *HTTPClient, err: anyerror) void {
|
||||
_ = socket_async_http_abort_tracker.swapRemove(this.async_http_id);
|
||||
}
|
||||
|
||||
this.state.reset(this.allocator);
|
||||
this.proxy_tunneling = false;
|
||||
|
||||
this.state.request_stage = .fail;
|
||||
this.state.response_stage = .fail;
|
||||
this.state.fail = err;
|
||||
@@ -2911,6 +2915,9 @@ fn fail(this: *HTTPClient, err: anyerror) void {
|
||||
|
||||
const callback = this.result_callback;
|
||||
const result = this.toResult();
|
||||
this.state.reset(this.allocator);
|
||||
this.proxy_tunneling = false;
|
||||
|
||||
callback.run(result);
|
||||
}
|
||||
|
||||
|
||||
@@ -68,6 +68,7 @@ using namespace JSC;
|
||||
macro(cork) \
|
||||
macro(createCommonJSModule) \
|
||||
macro(createEmptyReadableStream) \
|
||||
macro(createUsedReadableStream) \
|
||||
macro(createFIFO) \
|
||||
macro(createInternalModuleById) \
|
||||
macro(createNativeReadableStream) \
|
||||
|
||||
@@ -290,6 +290,15 @@ export function createEmptyReadableStream() {
|
||||
return stream;
|
||||
}
|
||||
|
||||
$linkTimeConstant;
|
||||
export function createUsedReadableStream() {
|
||||
var stream = new ReadableStream({
|
||||
pull() {},
|
||||
} as any);
|
||||
stream.getReader();
|
||||
return stream;
|
||||
}
|
||||
|
||||
$linkTimeConstant;
|
||||
export function createNativeReadableStream(nativePtr, nativeType, autoAllocateChunkSize) {
|
||||
return new ReadableStream({
|
||||
|
||||
@@ -662,7 +662,7 @@ class IncomingMessage extends Readable {
|
||||
if (this.#aborted) return;
|
||||
if (done) {
|
||||
this.push(null);
|
||||
this.destroy();
|
||||
process.nextTick(destroyBodyStreamNT, this);
|
||||
break;
|
||||
}
|
||||
for (var v of value) {
|
||||
|
||||
File diff suppressed because one or more lines are too long
8
src/js/out/WebCoreJSBuiltins.cpp
generated
8
src/js/out/WebCoreJSBuiltins.cpp
generated
@@ -1250,6 +1250,14 @@ const int s_readableStreamCreateNativeReadableStreamCodeLength = 215;
|
||||
static const JSC::Intrinsic s_readableStreamCreateNativeReadableStreamCodeIntrinsic = JSC::NoIntrinsic;
|
||||
const char* const s_readableStreamCreateNativeReadableStreamCode = "(function (nativePtr, nativeType, autoAllocateChunkSize) {\"use strict\";\n return new @ReadableStream({\n @lazy: !0,\n @bunNativeType: nativeType,\n @bunNativePtr: nativePtr,\n autoAllocateChunkSize\n });\n})\n";
|
||||
|
||||
// createUsedReadableStream
|
||||
const JSC::ConstructAbility s_readableStreamCreateUsedReadableStreamCodeConstructAbility = JSC::ConstructAbility::CannotConstruct;
|
||||
const JSC::ConstructorKind s_readableStreamCreateUsedReadableStreamCodeConstructorKind = JSC::ConstructorKind::None;
|
||||
const JSC::ImplementationVisibility s_readableStreamCreateUsedReadableStreamCodeImplementationVisibility = JSC::ImplementationVisibility::Private;
|
||||
const int s_readableStreamCreateUsedReadableStreamCodeLength = 130;
|
||||
static const JSC::Intrinsic s_readableStreamCreateUsedReadableStreamCodeIntrinsic = JSC::NoIntrinsic;
|
||||
const char* const s_readableStreamCreateUsedReadableStreamCode = "(function () {\"use strict\";\n var stream = new @ReadableStream({\n pull() {\n }\n });\n return stream.getReader(), stream;\n})\n";
|
||||
|
||||
// getReader
|
||||
const JSC::ConstructAbility s_readableStreamGetReaderCodeConstructAbility = JSC::ConstructAbility::CannotConstruct;
|
||||
const JSC::ConstructorKind s_readableStreamGetReaderCodeConstructorKind = JSC::ConstructorKind::None;
|
||||
|
||||
11
src/js/out/WebCoreJSBuiltins.h
generated
11
src/js/out/WebCoreJSBuiltins.h
generated
@@ -2382,6 +2382,14 @@ extern const JSC::ConstructAbility s_readableStreamCreateNativeReadableStreamCod
|
||||
extern const JSC::ConstructorKind s_readableStreamCreateNativeReadableStreamCodeConstructorKind;
|
||||
extern const JSC::ImplementationVisibility s_readableStreamCreateNativeReadableStreamCodeImplementationVisibility;
|
||||
|
||||
// createUsedReadableStream
|
||||
#define WEBCORE_BUILTIN_READABLESTREAM_CREATEUSEDREADABLESTREAM 1
|
||||
extern const char* const s_readableStreamCreateUsedReadableStreamCode;
|
||||
extern const int s_readableStreamCreateUsedReadableStreamCodeLength;
|
||||
extern const JSC::ConstructAbility s_readableStreamCreateUsedReadableStreamCodeConstructAbility;
|
||||
extern const JSC::ConstructorKind s_readableStreamCreateUsedReadableStreamCodeConstructorKind;
|
||||
extern const JSC::ImplementationVisibility s_readableStreamCreateUsedReadableStreamCodeImplementationVisibility;
|
||||
|
||||
// getReader
|
||||
#define WEBCORE_BUILTIN_READABLESTREAM_GETREADER 1
|
||||
extern const char* const s_readableStreamGetReaderCode;
|
||||
@@ -2499,6 +2507,7 @@ extern const JSC::ImplementationVisibility s_readableStreamValuesCodeImplementat
|
||||
macro(consumeReadableStream, readableStreamConsumeReadableStream, 3) \
|
||||
macro(createEmptyReadableStream, readableStreamCreateEmptyReadableStream, 0) \
|
||||
macro(createNativeReadableStream, readableStreamCreateNativeReadableStream, 3) \
|
||||
macro(createUsedReadableStream, readableStreamCreateUsedReadableStream, 0) \
|
||||
macro(getReader, readableStreamGetReader, 1) \
|
||||
macro(initializeReadableStream, readableStreamInitializeReadableStream, 3) \
|
||||
macro(lazyAsyncIterator, readableStreamLazyAsyncIterator, 0) \
|
||||
@@ -2519,6 +2528,7 @@ extern const JSC::ImplementationVisibility s_readableStreamValuesCodeImplementat
|
||||
macro(readableStreamConsumeReadableStreamCode, consumeReadableStream, ASCIILiteral(), s_readableStreamConsumeReadableStreamCodeLength) \
|
||||
macro(readableStreamCreateEmptyReadableStreamCode, createEmptyReadableStream, ASCIILiteral(), s_readableStreamCreateEmptyReadableStreamCodeLength) \
|
||||
macro(readableStreamCreateNativeReadableStreamCode, createNativeReadableStream, ASCIILiteral(), s_readableStreamCreateNativeReadableStreamCodeLength) \
|
||||
macro(readableStreamCreateUsedReadableStreamCode, createUsedReadableStream, ASCIILiteral(), s_readableStreamCreateUsedReadableStreamCodeLength) \
|
||||
macro(readableStreamGetReaderCode, getReader, ASCIILiteral(), s_readableStreamGetReaderCodeLength) \
|
||||
macro(readableStreamInitializeReadableStreamCode, initializeReadableStream, ASCIILiteral(), s_readableStreamInitializeReadableStreamCodeLength) \
|
||||
macro(readableStreamLazyAsyncIteratorCode, lazyAsyncIterator, ASCIILiteral(), s_readableStreamLazyAsyncIteratorCodeLength) \
|
||||
@@ -2539,6 +2549,7 @@ extern const JSC::ImplementationVisibility s_readableStreamValuesCodeImplementat
|
||||
macro(consumeReadableStream) \
|
||||
macro(createEmptyReadableStream) \
|
||||
macro(createNativeReadableStream) \
|
||||
macro(createUsedReadableStream) \
|
||||
macro(getReader) \
|
||||
macro(initializeReadableStream) \
|
||||
macro(lazyAsyncIterator) \
|
||||
|
||||
@@ -265,6 +265,28 @@ describe("TextDecoder", () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe("TextDecoder ignoreBOM", () => {
|
||||
it.each([
|
||||
{
|
||||
encoding: "utf-8",
|
||||
bytes: [0xef, 0xbb, 0xbf, 0x61, 0x62, 0x63],
|
||||
},
|
||||
{
|
||||
encoding: "utf-16le",
|
||||
bytes: [0xff, 0xfe, 0x61, 0x00, 0x62, 0x00, 0x63, 0x00],
|
||||
},
|
||||
])("should ignoreBOM for: %o", ({ encoding, bytes }) => {
|
||||
const BOM = "\uFEFF";
|
||||
const array = new Uint8Array(bytes);
|
||||
|
||||
const decoder_ignore_bom = new TextDecoder(encoding, { ignoreBOM: true });
|
||||
expect(decoder_ignore_bom.decode(array)).toStrictEqual(`${BOM}abc`);
|
||||
|
||||
const decoder_not_ignore_bom = new TextDecoder(encoding, { ignoreBOM: false });
|
||||
expect(decoder_not_ignore_bom.decode(array)).toStrictEqual("abc");
|
||||
});
|
||||
});
|
||||
|
||||
it("truncated sequences", () => {
|
||||
const assert_equals = (a, b) => expect(a).toBe(b);
|
||||
|
||||
|
||||
@@ -28,6 +28,147 @@ const smallText = Buffer.from("Hello".repeat(16));
|
||||
const empty = Buffer.alloc(0);
|
||||
|
||||
describe("fetch() with streaming", () => {
|
||||
it(`should be able to fail properly when reading from readable stream`, async () => {
|
||||
let server: Server | null = null;
|
||||
try {
|
||||
server = Bun.serve({
|
||||
port: 0,
|
||||
fetch(req) {
|
||||
return new Response(
|
||||
new ReadableStream({
|
||||
async start(controller) {
|
||||
controller.enqueue("Hello, World!");
|
||||
await Bun.sleep(10);
|
||||
controller.enqueue("Hello, World!");
|
||||
await Bun.sleep(10);
|
||||
controller.enqueue("Hello, World!");
|
||||
await Bun.sleep(10);
|
||||
controller.enqueue("Hello, World!");
|
||||
await Bun.sleep(10);
|
||||
controller.close();
|
||||
},
|
||||
}),
|
||||
{
|
||||
status: 200,
|
||||
headers: {
|
||||
"Content-Type": "text/plain",
|
||||
},
|
||||
},
|
||||
);
|
||||
},
|
||||
});
|
||||
|
||||
const server_url = `http://${server.hostname}:${server.port}`;
|
||||
const res = await fetch(server_url, { signal: AbortSignal.timeout(20) });
|
||||
try {
|
||||
const reader = res.body?.getReader();
|
||||
while (true) {
|
||||
const { done } = await reader?.read();
|
||||
if (done) break;
|
||||
}
|
||||
expect(true).toBe("unreachable");
|
||||
} catch (err: any) {
|
||||
if (err.name !== "TimeoutError") throw err;
|
||||
expect(err.message).toBe("The operation timed out.");
|
||||
}
|
||||
} finally {
|
||||
server?.stop();
|
||||
}
|
||||
});
|
||||
|
||||
it(`should be locked after start buffering`, async () => {
|
||||
let server: Server | null = null;
|
||||
try {
|
||||
server = Bun.serve({
|
||||
port: 0,
|
||||
fetch(req) {
|
||||
return new Response(
|
||||
new ReadableStream({
|
||||
async start(controller) {
|
||||
controller.enqueue("Hello, World!");
|
||||
await Bun.sleep(10);
|
||||
controller.enqueue("Hello, World!");
|
||||
await Bun.sleep(10);
|
||||
controller.enqueue("Hello, World!");
|
||||
await Bun.sleep(10);
|
||||
controller.enqueue("Hello, World!");
|
||||
await Bun.sleep(10);
|
||||
controller.close();
|
||||
},
|
||||
}),
|
||||
{
|
||||
status: 200,
|
||||
headers: {
|
||||
"Content-Type": "text/plain",
|
||||
},
|
||||
},
|
||||
);
|
||||
},
|
||||
});
|
||||
|
||||
const server_url = `http://${server.hostname}:${server.port}`;
|
||||
const res = await fetch(server_url);
|
||||
try {
|
||||
const promise = res.text(); // start buffering
|
||||
res.body?.getReader(); // get a reader
|
||||
const result = await promise; // should throw the right error
|
||||
expect(result).toBe("unreachable");
|
||||
} catch (err: any) {
|
||||
if (err.name !== "TypeError") throw err;
|
||||
expect(err.message).toBe("ReadableStream is locked");
|
||||
}
|
||||
} finally {
|
||||
server?.stop();
|
||||
}
|
||||
});
|
||||
|
||||
it(`should be locked after start buffering when calling getReader`, async () => {
|
||||
let server: Server | null = null;
|
||||
try {
|
||||
server = Bun.serve({
|
||||
port: 0,
|
||||
fetch(req) {
|
||||
return new Response(
|
||||
new ReadableStream({
|
||||
async start(controller) {
|
||||
controller.enqueue("Hello, World!");
|
||||
await Bun.sleep(10);
|
||||
controller.enqueue("Hello, World!");
|
||||
await Bun.sleep(10);
|
||||
controller.enqueue("Hello, World!");
|
||||
await Bun.sleep(10);
|
||||
controller.enqueue("Hello, World!");
|
||||
await Bun.sleep(10);
|
||||
controller.close();
|
||||
},
|
||||
}),
|
||||
{
|
||||
status: 200,
|
||||
headers: {
|
||||
"Content-Type": "text/plain",
|
||||
},
|
||||
},
|
||||
);
|
||||
},
|
||||
});
|
||||
|
||||
const server_url = `http://${server.hostname}:${server.port}`;
|
||||
const res = await fetch(server_url);
|
||||
try {
|
||||
const body = res.body as ReadableStream<Uint8Array>;
|
||||
const promise = res.text(); // start buffering
|
||||
body.getReader(); // get a reader
|
||||
const result = await promise; // should throw the right error
|
||||
expect(result).toBe("unreachable");
|
||||
} catch (err: any) {
|
||||
if (err.name !== "TypeError") throw err;
|
||||
expect(err.message).toBe("ReadableStream is locked");
|
||||
}
|
||||
} finally {
|
||||
server?.stop();
|
||||
}
|
||||
});
|
||||
|
||||
it("can deflate with and without headers #4478", async () => {
|
||||
let server: Server | null = null;
|
||||
try {
|
||||
|
||||
Reference in New Issue
Block a user