Compare commits

...

19 Commits

Author SHA1 Message Date
cirospaciari
1ba11c0de3 1 more lock test 2023-09-29 22:15:31 -03:00
cirospaciari
1ca8d0fb53 missing return 2023-09-29 22:04:30 -03:00
cirospaciari
28450dbd91 merge fix 2023-09-29 21:49:59 -03:00
Ciro Spaciari
56ca927ea3 oops 2 2023-09-29 21:43:11 -03:00
cirospaciari
306bae4c39 oops 2023-09-29 21:43:11 -03:00
cirospaciari
88d522b6a0 remove this because im not sure of it 2023-09-29 21:43:11 -03:00
cirospaciari
1cec0c5662 fix AbortSignal.timeout 2023-09-29 21:43:10 -03:00
cirospaciari
dd0a360839 fix tests 2023-09-29 21:41:59 -03:00
cirospaciari
bdbef97b24 remove comments 2023-09-29 21:41:59 -03:00
cirospaciari
22160d8926 more tests 2023-09-29 21:41:59 -03:00
cirospaciari
5544e919fc test 2023-09-29 21:40:59 -03:00
cirospaciari
4408cc797f undo 2023-09-29 21:40:59 -03:00
cirospaciari
aabbe3c741 rebase 2023-09-29 21:40:59 -03:00
cirospaciari
68d2d1c7e0 make readable stream locked when used 2023-09-29 21:40:58 -03:00
cirospaciari
5562cb40fc solve conflicts 2023-09-29 21:40:14 -03:00
cirospaciari
b57375bd2e fix mistake 2023-09-29 21:38:54 -03:00
cirospaciari
bdeaf98a4d fix http module hanging 2023-09-29 21:38:53 -03:00
cirospaciari
d68398b53f add tests and more fixes 2023-09-29 21:35:43 -03:00
cirospaciari
2c3136cb9d revert fail and allow metadata deinit 2023-09-29 21:35:43 -03:00
29 changed files with 293 additions and 60 deletions

View File

@@ -414,14 +414,18 @@ struct us_timer_t *us_create_timer(struct us_loop_t *loop, int fallthrough, unsi
#endif
#ifdef LIBUS_USE_EPOLL
void us_timer_close(struct us_timer_t *timer) {
void us_timer_close(struct us_timer_t *timer, int fallthrough) {
struct us_internal_callback_t *cb = (struct us_internal_callback_t *) timer;
us_poll_stop(&cb->p, cb->loop);
close(us_poll_fd(&cb->p));
/* (regular) sockets are the only polls which are not freed immediately */
us_poll_free((struct us_poll_t *) timer, cb->loop);
if(fallthrough){
us_free(timer);
}else {
us_poll_free((struct us_poll_t *) timer, cb->loop);
}
}
void us_timer_set(struct us_timer_t *t, void (*cb)(struct us_timer_t *t), int ms, int repeat_ms) {
@@ -438,7 +442,7 @@ void us_timer_set(struct us_timer_t *t, void (*cb)(struct us_timer_t *t), int ms
us_poll_start((struct us_poll_t *) t, internal_cb->loop, LIBUS_SOCKET_READABLE);
}
#else
void us_timer_close(struct us_timer_t *timer) {
void us_timer_close(struct us_timer_t *timer, int fallthrough) {
struct us_internal_callback_t *internal_cb = (struct us_internal_callback_t *) timer;
struct kevent64_s event;
@@ -446,7 +450,11 @@ void us_timer_close(struct us_timer_t *timer) {
kevent64(internal_cb->loop->fd, &event, 1, NULL, 0, 0, NULL);
/* (regular) sockets are the only polls which are not freed immediately */
us_poll_free((struct us_poll_t *) timer, internal_cb->loop);
if(fallthrough){
us_free(timer);
}else {
us_poll_free((struct us_poll_t *) timer, internal_cb->loop);
}
}
void us_timer_set(struct us_timer_t *t, void (*cb)(struct us_timer_t *t), int ms, int repeat_ms) {

View File

@@ -132,7 +132,7 @@ struct us_timer_t *us_create_timer(struct us_loop_t *loop, int fallthrough, unsi
void *us_timer_ext(struct us_timer_t *timer);
/* */
void us_timer_close(struct us_timer_t *timer);
void us_timer_close(struct us_timer_t *timer, int fallthrough);
/* Arm a timer with a delay from now and eventually a repeat delay.
* Specify 0 as repeat delay to disable repeating. Specify both 0 to disarm. */

View File

@@ -47,7 +47,7 @@ void us_internal_loop_data_free(struct us_loop_t *loop) {
free(loop->data.recv_buf);
us_timer_close(loop->data.sweep_timer);
us_timer_close(loop->data.sweep_timer, 0);
us_internal_async_close(loop->data.wakeup_async);
}

View File

@@ -14,7 +14,7 @@ void uws_timer_close(struct us_timer_t *timer)
struct timer_handler_data *data;
memcpy(&data, us_timer_ext(t), sizeof(struct timer_handler_data *));
free(data);
us_timer_close(t);
us_timer_close(t, 0);
}
//Timer create helper
struct us_timer_t *uws_create_timer(int ms, int repeat_ms, void (*handler)(void *data), void *data)
@@ -47,7 +47,7 @@ struct us_timer_t *uws_create_timer(int ms, int repeat_ms, void (*handler)(void
if (!data->repeat)
{
free(data);
us_timer_close(t);
us_timer_close(t, 0);
}
},
ms, repeat_ms);

View File

@@ -19,7 +19,7 @@ void uws_timer_close(struct us_timer_t *timer)
struct timer_handler_data *data;
memcpy(&data, us_timer_ext(t), sizeof(struct timer_handler_data *));
free(data);
us_timer_close(t);
us_timer_close(t, 0);
}
//Timer create helper
struct us_timer_t *uws_create_timer(int ms, int repeat_ms, void (*handler)(void *data), void *data)
@@ -52,7 +52,7 @@ struct us_timer_t *uws_create_timer(int ms, int repeat_ms, void (*handler)(void
if (!data->repeat)
{
free(data);
us_timer_close(t);
us_timer_close(t, 0);
}
},
ms, repeat_ms);

View File

@@ -62,7 +62,7 @@ void uws_timer_close(struct us_timer_t *timer)
struct timer_handler_data *data;
memcpy(&data, us_timer_ext(t), sizeof(struct timer_handler_data *));
free(data);
us_timer_close(t);
us_timer_close(t, 0);
}
//Timer create helper
struct us_timer_t *uws_create_timer(int ms, int repeat_ms, void (*handler)(void *data), void *data)
@@ -95,7 +95,7 @@ struct us_timer_t *uws_create_timer(int ms, int repeat_ms, void (*handler)(void
if (!data->repeat)
{
free(data);
us_timer_close(t);
us_timer_close(t, 0);
}
},
ms, repeat_ms);

View File

@@ -90,7 +90,7 @@ int main() {
delete upgradeData;
us_timer_close(t);
us_timer_close(t, 0);
}, 5000, 0);
},

View File

@@ -126,7 +126,7 @@ public:
LoopData *loopData = (LoopData *) us_loop_ext((us_loop_t *) this);
/* Stop and free dateTimer first */
us_timer_close(loopData->dateTimer);
us_timer_close(loopData->dateTimer, 0);
loopData->~LoopData();
/* uSockets will track whether this loop is owned by us or a borrowed alien loop */

View File

@@ -3600,7 +3600,7 @@ pub const Timer = struct {
this.poll_ref.unref(vm);
this.timer.deinit();
this.timer.deinit(false);
// balance double unreffing in doUnref
vm.event_loop_handle.?.num_polls += @as(i32, @intFromBool(this.did_unref_timer));

View File

@@ -317,9 +317,9 @@ pub const FFI = struct {
};
};
};
var size = symbols.values().len;
if(size >= 63) {
if (size >= 63) {
size = 0;
}
var obj = JSC.JSValue.createEmptyObject(global, size);

View File

@@ -3674,6 +3674,7 @@ void GlobalObject::addBuiltinGlobals(JSC::VM& vm)
putDirectBuiltinFunction(vm, this, builtinNames.createFIFOPrivateName(), streamInternalsCreateFIFOCodeGenerator(vm), PropertyAttribute::Builtin | PropertyAttribute::DontDelete | PropertyAttribute::ReadOnly);
putDirectBuiltinFunction(vm, this, builtinNames.createEmptyReadableStreamPrivateName(), readableStreamCreateEmptyReadableStreamCodeGenerator(vm), PropertyAttribute::Builtin | PropertyAttribute::DontDelete | PropertyAttribute::ReadOnly);
putDirectBuiltinFunction(vm, this, builtinNames.createUsedReadableStreamPrivateName(), readableStreamCreateUsedReadableStreamCodeGenerator(vm), PropertyAttribute::Builtin | PropertyAttribute::DontDelete | PropertyAttribute::ReadOnly);
putDirectBuiltinFunction(vm, this, builtinNames.consumeReadableStreamPrivateName(), readableStreamConsumeReadableStreamCodeGenerator(vm), PropertyAttribute::Builtin | PropertyAttribute::DontDelete | PropertyAttribute::ReadOnly);
putDirectBuiltinFunction(vm, this, builtinNames.createNativeReadableStreamPrivateName(), readableStreamCreateNativeReadableStreamCodeGenerator(vm), PropertyAttribute::Builtin | PropertyAttribute::DontDelete | PropertyAttribute::ReadOnly);
putDirectBuiltinFunction(vm, this, builtinNames.requireESMPrivateName(), importMetaObjectRequireESMCodeGenerator(vm), PropertyAttribute::Builtin | PropertyAttribute::DontDelete | PropertyAttribute::ReadOnly);

View File

@@ -2165,6 +2165,14 @@ JSC__JSValue ReadableStream__empty(Zig::GlobalObject* globalObject)
return JSValue::encode(JSC::call(globalObject, function, JSC::ArgList(), "ReadableStream.create"_s));
}
JSC__JSValue ReadableStream__used(Zig::GlobalObject* globalObject)
{
auto& vm = globalObject->vm();
auto clientData = WebCore::clientData(vm);
auto* function = globalObject->getDirect(vm, clientData->builtinNames().createUsedReadableStreamPrivateName()).getObject();
return JSValue::encode(JSC::call(globalObject, function, JSC::ArgList(), "ReadableStream.create"_s));
}
JSC__JSValue JSC__JSValue__createRangeError(const ZigString* message, const ZigString* arg1,
JSC__JSGlobalObject* globalObject)
{

View File

@@ -1148,7 +1148,7 @@ pub const EventLoop = struct {
pub fn callTask(timer: *uws.Timer) callconv(.C) void {
var task = Task.from(timer.as(*anyopaque));
timer.deinit();
defer timer.deinit(true);
JSC.VirtualMachine.get().enqueueTask(task);
}

View File

@@ -67,7 +67,7 @@ fn callSync(comptime FunctionEnum: NodeFSFunctionEnum) NodeFSFunction {
args,
comptime Flavor.sync,
);
switch (result) {
.err => |err| {
globalObject.throwValue(JSC.JSValue.c(err.toJS(globalObject)));

View File

@@ -97,7 +97,7 @@ pub const StatWatcherScheduler = struct {
prev = next;
} else {
if (this.head.load(.Monotonic) == null) {
this.timer.?.deinit();
this.timer.?.deinit(false);
this.timer = null;
// The scheduler is not deinit here, but it will get reused.
}

View File

@@ -1434,7 +1434,6 @@ pub const TestRunnerTask = struct {
vm.clearOnException();
this.ref.unref(vm);
// there is a double free here involving async before/after callbacks
//
// Fortunately:

View File

@@ -466,7 +466,10 @@ pub const Body = struct {
JSC.markBinding(@src());
switch (this.*) {
.Used, .Empty => {
.Used => {
return JSC.WebCore.ReadableStream.used(globalThis);
},
.Empty => {
return JSC.WebCore.ReadableStream.empty(globalThis);
},
.Null => {
@@ -493,6 +496,12 @@ pub const Body = struct {
if (locked.readable) |readable| {
return readable.value;
}
if (locked.promise != null) {
//TODO: make this closed
return JSC.WebCore.ReadableStream.used(globalThis);
}
var drain_result: JSC.WebCore.DrainResult = .{
.estimated_size = 0,
};
@@ -1104,8 +1113,7 @@ pub fn BodyMixin(comptime Type: type) type {
var body: *Body.Value = this.getBodyValue();
if (body.* == .Used) {
// TODO: make this closed
return JSC.WebCore.ReadableStream.empty(globalThis);
return JSC.WebCore.ReadableStream.used(globalThis);
}
return body.toReadableStream(globalThis);

View File

@@ -778,27 +778,30 @@ pub const Fetch = struct {
if (!success) {
const err = this.onReject();
err.ensureStillAlive();
// if we are streaming update with error
if (this.readable_stream_ref.get()) |readable| {
readable.ptr.Bytes.onData(
.{
.err = .{ .JSValue = err },
},
bun.default_allocator,
);
return;
}
// if we are buffering resolve the promise
if (this.response.get()) |response_js| {
if (response_js.as(Response)) |response| {
const body = response.body;
if (body.value == .Locked) {
if (body.value.Locked.readable) |readable| {
readable.ptr.Bytes.onData(
.{
.err = .{ .JSValue = err },
},
bun.default_allocator,
);
return;
if (body.value.Locked.promise) |promise_| {
const promise = promise_.asAnyPromise().?;
promise.reject(globalThis, err);
}
}
response.body.value.toErrorInstance(err, globalThis);
return;
}
}
globalThis.throwValue(err);
return;
}
@@ -1708,7 +1711,7 @@ pub const Fetch = struct {
if (decompress.isBoolean()) {
disable_decompression = !decompress.asBoolean();
} else if (decompress.isNumber()) {
disable_keepalive = decompress.to(i32) == 0;
disable_decompression = decompress.to(i32) == 0;
}
}
@@ -1901,7 +1904,7 @@ pub const Fetch = struct {
if (decompress.isBoolean()) {
disable_decompression = !decompress.asBoolean();
} else if (decompress.isNumber()) {
disable_keepalive = decompress.to(i32) == 0;
disable_decompression = decompress.to(i32) == 0;
}
}

View File

@@ -225,6 +225,7 @@ pub const ReadableStream = struct {
extern fn ReadableStream__isDisturbed(possibleReadableStream: JSValue, globalObject: *JSGlobalObject) bool;
extern fn ReadableStream__isLocked(possibleReadableStream: JSValue, globalObject: *JSGlobalObject) bool;
extern fn ReadableStream__empty(*JSGlobalObject) JSC.JSValue;
extern fn ReadableStream__used(*JSGlobalObject) JSC.JSValue;
extern fn ReadableStream__cancel(stream: JSValue, *JSGlobalObject) void;
extern fn ReadableStream__abort(stream: JSValue, *JSGlobalObject) void;
extern fn ReadableStream__detach(stream: JSValue, *JSGlobalObject) void;
@@ -367,6 +368,12 @@ pub const ReadableStream = struct {
return ReadableStream__empty(globalThis);
}
pub fn used(globalThis: *JSGlobalObject) JSC.JSValue {
JSC.markBinding(@src());
return ReadableStream__used(globalThis);
}
const Base = @import("../../ast/base.zig");
pub const StreamTag = enum(usize) {
invalid = 0,
@@ -3596,6 +3603,9 @@ pub const ByteStream = struct {
this.buffer = try std.ArrayList(u8).initCapacity(bun.default_allocator, chunk.len);
this.buffer.appendSliceAssumeCapacity(chunk);
},
.err => {
this.pending.result = .{ .err = stream.err };
},
else => unreachable,
}
return;
@@ -3605,6 +3615,9 @@ pub const ByteStream = struct {
.temporary_and_done, .temporary => {
try this.buffer.appendSlice(chunk);
},
.err => {
this.pending.result = .{ .err = stream.err };
},
// We don't support the rest of these yet
else => unreachable,
}

View File

@@ -647,18 +647,12 @@ pub const Timer = opaque {
pub fn create(loop: *Loop, ptr: anytype) *Timer {
const Type = @TypeOf(ptr);
// never fallthrough poll
// the problem is uSockets hardcodes it on the other end
// so we can never free non-fallthrough polls
return us_create_timer(loop, 0, @sizeOf(Type));
}
pub fn createFallthrough(loop: *Loop, ptr: anytype) *Timer {
const Type = @TypeOf(ptr);
// never fallthrough poll
// the problem is uSockets hardcodes it on the other end
// so we can never free non-fallthrough polls
return us_create_timer(loop, 1, @sizeOf(Type));
}
@@ -669,9 +663,9 @@ pub const Timer = opaque {
@as(*@TypeOf(ptr), @ptrCast(@alignCast(value_ptr))).* = ptr;
}
pub fn deinit(this: *Timer) void {
pub fn deinit(this: *Timer, comptime fallthrough: bool) void {
debug("Timer.deinit()", .{});
us_timer_close(this);
us_timer_close(this, @intFromBool(fallthrough));
}
pub fn ext(this: *Timer, comptime Type: type) ?*Type {
@@ -915,7 +909,7 @@ const uintmax_t = c_ulong;
extern fn us_create_timer(loop: ?*Loop, fallthrough: i32, ext_size: c_uint) *Timer;
extern fn us_timer_ext(timer: ?*Timer) *?*anyopaque;
extern fn us_timer_close(timer: ?*Timer) void;
extern fn us_timer_close(timer: ?*Timer, fallthrough: i32) void;
extern fn us_timer_set(timer: ?*Timer, cb: ?*const fn (*Timer) callconv(.C) void, ms: i32, repeat_ms: i32) void;
extern fn us_timer_loop(t: ?*Timer) ?*Loop;
pub const us_socket_context_options_t = extern struct {

View File

@@ -751,10 +751,14 @@ pub const HTTPThread = struct {
if (socket_async_http_abort_tracker.fetchSwapRemove(http.async_http_id)) |socket_ptr| {
if (http.client.isHTTPS()) {
const socket = uws.SocketTLS.from(socket_ptr.value);
socket.shutdown();
if (socket.isEstablished() and !socket.isClosed() and !socket.isShutdown()) {
socket.shutdownRead();
}
} else {
const socket = uws.SocketTCP.from(socket_ptr.value);
socket.shutdown();
if (socket.isEstablished() and !socket.isClosed() and !socket.isShutdown()) {
socket.shutdownRead();
}
}
}
}
@@ -1099,7 +1103,7 @@ pub fn onClose(
}
if (in_progress) {
client.fail(error.ConnectionClosed);
client.closeAndFail(error.ConnectionClosed, is_ssl, socket);
}
}
pub fn onTimeout(
@@ -1350,10 +1354,6 @@ pub const InternalState = struct {
reader.deinit();
}
// if we are holding a cloned_metadata we need to deinit it
// this should never happen because we should always return the metadata to the user
std.debug.assert(this.cloned_metadata == null);
// just in case we check and free to avoid leaks
if (this.cloned_metadata != null) {
this.cloned_metadata.?.deinit(allocator);
this.cloned_metadata = null;
@@ -1562,6 +1562,9 @@ pub fn isKeepAlivePossible(this: *HTTPClient) bool {
if (this.http_proxy != null and this.url.isHTTPS()) {
return false;
}
if (this.signals.get(.aborted)) {
return false;
}
return !this.disable_keepalive;
}
return false;
@@ -1868,6 +1871,9 @@ pub const AsyncHTTP = struct {
if (this.http_proxy != null and this.url.isHTTPS()) {
return false;
}
if (this.signals.get(.aborted)) {
return false;
}
// check state
if (this.state.allow_keepalive and !this.disable_keepalive) return true;
}
@@ -2893,6 +2899,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
}
pub fn closeAndAbort(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void {
log("closeAndAbort", .{});
this.closeAndFail(error.Aborted, comptime is_ssl, socket);
}
@@ -2901,9 +2908,6 @@ fn fail(this: *HTTPClient, err: anyerror) void {
_ = socket_async_http_abort_tracker.swapRemove(this.async_http_id);
}
this.state.reset(this.allocator);
this.proxy_tunneling = false;
this.state.request_stage = .fail;
this.state.response_stage = .fail;
this.state.fail = err;
@@ -2911,6 +2915,9 @@ fn fail(this: *HTTPClient, err: anyerror) void {
const callback = this.result_callback;
const result = this.toResult();
this.state.reset(this.allocator);
this.proxy_tunneling = false;
callback.run(result);
}

View File

@@ -68,6 +68,7 @@ using namespace JSC;
macro(cork) \
macro(createCommonJSModule) \
macro(createEmptyReadableStream) \
macro(createUsedReadableStream) \
macro(createFIFO) \
macro(createInternalModuleById) \
macro(createNativeReadableStream) \

View File

@@ -290,6 +290,15 @@ export function createEmptyReadableStream() {
return stream;
}
$linkTimeConstant;
export function createUsedReadableStream() {
var stream = new ReadableStream({
pull() {},
} as any);
stream.getReader();
return stream;
}
$linkTimeConstant;
export function createNativeReadableStream(nativePtr, nativeType, autoAllocateChunkSize) {
return new ReadableStream({

View File

@@ -662,7 +662,7 @@ class IncomingMessage extends Readable {
if (this.#aborted) return;
if (done) {
this.push(null);
this.destroy();
process.nextTick(destroyBodyStreamNT, this);
break;
}
for (var v of value) {

File diff suppressed because one or more lines are too long

View File

@@ -1250,6 +1250,14 @@ const int s_readableStreamCreateNativeReadableStreamCodeLength = 215;
static const JSC::Intrinsic s_readableStreamCreateNativeReadableStreamCodeIntrinsic = JSC::NoIntrinsic;
const char* const s_readableStreamCreateNativeReadableStreamCode = "(function (nativePtr, nativeType, autoAllocateChunkSize) {\"use strict\";\n return new @ReadableStream({\n @lazy: !0,\n @bunNativeType: nativeType,\n @bunNativePtr: nativePtr,\n autoAllocateChunkSize\n });\n})\n";
// createUsedReadableStream
const JSC::ConstructAbility s_readableStreamCreateUsedReadableStreamCodeConstructAbility = JSC::ConstructAbility::CannotConstruct;
const JSC::ConstructorKind s_readableStreamCreateUsedReadableStreamCodeConstructorKind = JSC::ConstructorKind::None;
const JSC::ImplementationVisibility s_readableStreamCreateUsedReadableStreamCodeImplementationVisibility = JSC::ImplementationVisibility::Private;
const int s_readableStreamCreateUsedReadableStreamCodeLength = 130;
static const JSC::Intrinsic s_readableStreamCreateUsedReadableStreamCodeIntrinsic = JSC::NoIntrinsic;
const char* const s_readableStreamCreateUsedReadableStreamCode = "(function () {\"use strict\";\n var stream = new @ReadableStream({\n pull() {\n }\n });\n return stream.getReader(), stream;\n})\n";
// getReader
const JSC::ConstructAbility s_readableStreamGetReaderCodeConstructAbility = JSC::ConstructAbility::CannotConstruct;
const JSC::ConstructorKind s_readableStreamGetReaderCodeConstructorKind = JSC::ConstructorKind::None;

View File

@@ -2382,6 +2382,14 @@ extern const JSC::ConstructAbility s_readableStreamCreateNativeReadableStreamCod
extern const JSC::ConstructorKind s_readableStreamCreateNativeReadableStreamCodeConstructorKind;
extern const JSC::ImplementationVisibility s_readableStreamCreateNativeReadableStreamCodeImplementationVisibility;
// createUsedReadableStream
#define WEBCORE_BUILTIN_READABLESTREAM_CREATEUSEDREADABLESTREAM 1
extern const char* const s_readableStreamCreateUsedReadableStreamCode;
extern const int s_readableStreamCreateUsedReadableStreamCodeLength;
extern const JSC::ConstructAbility s_readableStreamCreateUsedReadableStreamCodeConstructAbility;
extern const JSC::ConstructorKind s_readableStreamCreateUsedReadableStreamCodeConstructorKind;
extern const JSC::ImplementationVisibility s_readableStreamCreateUsedReadableStreamCodeImplementationVisibility;
// getReader
#define WEBCORE_BUILTIN_READABLESTREAM_GETREADER 1
extern const char* const s_readableStreamGetReaderCode;
@@ -2499,6 +2507,7 @@ extern const JSC::ImplementationVisibility s_readableStreamValuesCodeImplementat
macro(consumeReadableStream, readableStreamConsumeReadableStream, 3) \
macro(createEmptyReadableStream, readableStreamCreateEmptyReadableStream, 0) \
macro(createNativeReadableStream, readableStreamCreateNativeReadableStream, 3) \
macro(createUsedReadableStream, readableStreamCreateUsedReadableStream, 0) \
macro(getReader, readableStreamGetReader, 1) \
macro(initializeReadableStream, readableStreamInitializeReadableStream, 3) \
macro(lazyAsyncIterator, readableStreamLazyAsyncIterator, 0) \
@@ -2519,6 +2528,7 @@ extern const JSC::ImplementationVisibility s_readableStreamValuesCodeImplementat
macro(readableStreamConsumeReadableStreamCode, consumeReadableStream, ASCIILiteral(), s_readableStreamConsumeReadableStreamCodeLength) \
macro(readableStreamCreateEmptyReadableStreamCode, createEmptyReadableStream, ASCIILiteral(), s_readableStreamCreateEmptyReadableStreamCodeLength) \
macro(readableStreamCreateNativeReadableStreamCode, createNativeReadableStream, ASCIILiteral(), s_readableStreamCreateNativeReadableStreamCodeLength) \
macro(readableStreamCreateUsedReadableStreamCode, createUsedReadableStream, ASCIILiteral(), s_readableStreamCreateUsedReadableStreamCodeLength) \
macro(readableStreamGetReaderCode, getReader, ASCIILiteral(), s_readableStreamGetReaderCodeLength) \
macro(readableStreamInitializeReadableStreamCode, initializeReadableStream, ASCIILiteral(), s_readableStreamInitializeReadableStreamCodeLength) \
macro(readableStreamLazyAsyncIteratorCode, lazyAsyncIterator, ASCIILiteral(), s_readableStreamLazyAsyncIteratorCodeLength) \
@@ -2539,6 +2549,7 @@ extern const JSC::ImplementationVisibility s_readableStreamValuesCodeImplementat
macro(consumeReadableStream) \
macro(createEmptyReadableStream) \
macro(createNativeReadableStream) \
macro(createUsedReadableStream) \
macro(getReader) \
macro(initializeReadableStream) \
macro(lazyAsyncIterator) \

View File

@@ -265,6 +265,28 @@ describe("TextDecoder", () => {
});
});
describe("TextDecoder ignoreBOM", () => {
it.each([
{
encoding: "utf-8",
bytes: [0xef, 0xbb, 0xbf, 0x61, 0x62, 0x63],
},
{
encoding: "utf-16le",
bytes: [0xff, 0xfe, 0x61, 0x00, 0x62, 0x00, 0x63, 0x00],
},
])("should ignoreBOM for: %o", ({ encoding, bytes }) => {
const BOM = "\uFEFF";
const array = new Uint8Array(bytes);
const decoder_ignore_bom = new TextDecoder(encoding, { ignoreBOM: true });
expect(decoder_ignore_bom.decode(array)).toStrictEqual(`${BOM}abc`);
const decoder_not_ignore_bom = new TextDecoder(encoding, { ignoreBOM: false });
expect(decoder_not_ignore_bom.decode(array)).toStrictEqual("abc");
});
});
it("truncated sequences", () => {
const assert_equals = (a, b) => expect(a).toBe(b);

View File

@@ -28,6 +28,147 @@ const smallText = Buffer.from("Hello".repeat(16));
const empty = Buffer.alloc(0);
describe("fetch() with streaming", () => {
it(`should be able to fail properly when reading from readable stream`, async () => {
let server: Server | null = null;
try {
server = Bun.serve({
port: 0,
fetch(req) {
return new Response(
new ReadableStream({
async start(controller) {
controller.enqueue("Hello, World!");
await Bun.sleep(10);
controller.enqueue("Hello, World!");
await Bun.sleep(10);
controller.enqueue("Hello, World!");
await Bun.sleep(10);
controller.enqueue("Hello, World!");
await Bun.sleep(10);
controller.close();
},
}),
{
status: 200,
headers: {
"Content-Type": "text/plain",
},
},
);
},
});
const server_url = `http://${server.hostname}:${server.port}`;
const res = await fetch(server_url, { signal: AbortSignal.timeout(20) });
try {
const reader = res.body?.getReader();
while (true) {
const { done } = await reader?.read();
if (done) break;
}
expect(true).toBe("unreachable");
} catch (err: any) {
if (err.name !== "TimeoutError") throw err;
expect(err.message).toBe("The operation timed out.");
}
} finally {
server?.stop();
}
});
it(`should be locked after start buffering`, async () => {
let server: Server | null = null;
try {
server = Bun.serve({
port: 0,
fetch(req) {
return new Response(
new ReadableStream({
async start(controller) {
controller.enqueue("Hello, World!");
await Bun.sleep(10);
controller.enqueue("Hello, World!");
await Bun.sleep(10);
controller.enqueue("Hello, World!");
await Bun.sleep(10);
controller.enqueue("Hello, World!");
await Bun.sleep(10);
controller.close();
},
}),
{
status: 200,
headers: {
"Content-Type": "text/plain",
},
},
);
},
});
const server_url = `http://${server.hostname}:${server.port}`;
const res = await fetch(server_url);
try {
const promise = res.text(); // start buffering
res.body?.getReader(); // get a reader
const result = await promise; // should throw the right error
expect(result).toBe("unreachable");
} catch (err: any) {
if (err.name !== "TypeError") throw err;
expect(err.message).toBe("ReadableStream is locked");
}
} finally {
server?.stop();
}
});
it(`should be locked after start buffering when calling getReader`, async () => {
let server: Server | null = null;
try {
server = Bun.serve({
port: 0,
fetch(req) {
return new Response(
new ReadableStream({
async start(controller) {
controller.enqueue("Hello, World!");
await Bun.sleep(10);
controller.enqueue("Hello, World!");
await Bun.sleep(10);
controller.enqueue("Hello, World!");
await Bun.sleep(10);
controller.enqueue("Hello, World!");
await Bun.sleep(10);
controller.close();
},
}),
{
status: 200,
headers: {
"Content-Type": "text/plain",
},
},
);
},
});
const server_url = `http://${server.hostname}:${server.port}`;
const res = await fetch(server_url);
try {
const body = res.body as ReadableStream<Uint8Array>;
const promise = res.text(); // start buffering
body.getReader(); // get a reader
const result = await promise; // should throw the right error
expect(result).toBe("unreachable");
} catch (err: any) {
if (err.name !== "TypeError") throw err;
expect(err.message).toBe("ReadableStream is locked");
}
} finally {
server?.stop();
}
});
it("can deflate with and without headers #4478", async () => {
let server: Server | null = null;
try {