mirror of
https://github.com/oven-sh/bun
synced 2026-02-05 08:28:55 +00:00
Compare commits
8 Commits
ciro/fix-a
...
jarred/sup
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fae8dda372 | ||
|
|
9c2c62d8e4 | ||
|
|
b366866d61 | ||
|
|
1b4c25ff6c | ||
|
|
bca9fedee4 | ||
|
|
e1cd128078 | ||
|
|
4720ae907f | ||
|
|
2924329ab0 |
@@ -1714,9 +1714,9 @@ fn NewSocket(comptime ssl: bool) type {
|
||||
if (args.ptr[0].as(JSC.WebCore.Blob)) |blob| {
|
||||
break :getter JSC.WebCore.AnyBlob{ .Blob = blob.* };
|
||||
} else if (args.ptr[0].as(JSC.WebCore.Response)) |response| {
|
||||
response.body.value.toBlobIfPossible();
|
||||
response.getBodyValue().toBlobIfPossible();
|
||||
|
||||
if (response.body.value.tryUseAsAnyBlob()) |blob| {
|
||||
if (response.getBodyValue().tryUseAsAnyBlob()) |blob| {
|
||||
break :getter blob;
|
||||
}
|
||||
|
||||
|
||||
@@ -457,7 +457,7 @@ pub const HTMLRewriter = struct {
|
||||
result.url = original.url.clone();
|
||||
result.status_text = original.status_text.clone();
|
||||
|
||||
var input = original.body.value.useAsAnyBlob();
|
||||
var input = original.getBodyValue().useAsAnyBlob();
|
||||
sink.input = input;
|
||||
|
||||
const is_pending = input.needsToReadFile();
|
||||
@@ -474,22 +474,24 @@ pub const HTMLRewriter = struct {
|
||||
}
|
||||
|
||||
pub fn onFinishedLoading(sink: *BufferOutputSink, bytes: JSC.WebCore.Blob.Store.ReadFile.ResultType) void {
|
||||
var body = sink.response.getBodyValue();
|
||||
|
||||
switch (bytes) {
|
||||
.err => |err| {
|
||||
if (sink.response.body.value == .Locked and @intFromPtr(sink.response.body.value.Locked.task) == @intFromPtr(sink) and
|
||||
sink.response.body.value.Locked.promise == null)
|
||||
if (body.* == .Locked and @intFromPtr(body.Locked.task) == @intFromPtr(sink) and
|
||||
body.Locked.promise == null)
|
||||
{
|
||||
sink.response.body.value = .{ .Empty = {} };
|
||||
body.* = .{ .Empty = {} };
|
||||
// is there a pending promise?
|
||||
// we will need to reject it
|
||||
} else if (sink.response.body.value == .Locked and @intFromPtr(sink.response.body.value.Locked.task) == @intFromPtr(sink) and
|
||||
sink.response.body.value.Locked.promise != null)
|
||||
} else if (body.* == .Locked and @intFromPtr(body.Locked.task) == @intFromPtr(sink) and
|
||||
body.Locked.promise != null)
|
||||
{
|
||||
sink.response.body.value.Locked.onReceiveValue = null;
|
||||
sink.response.body.value.Locked.task = null;
|
||||
body.Locked.onReceiveValue = null;
|
||||
body.Locked.task = null;
|
||||
}
|
||||
|
||||
sink.response.body.value.toErrorInstance(err.toErrorInstance(sink.global), sink.global);
|
||||
body.toErrorInstance(err.toErrorInstance(sink.global), sink.global);
|
||||
sink.rewriter.end() catch {};
|
||||
sink.deinit();
|
||||
return;
|
||||
@@ -518,7 +520,7 @@ pub const HTMLRewriter = struct {
|
||||
bun.default_allocator.destroy(sink);
|
||||
|
||||
if (is_async) {
|
||||
response.body.value.toErrorInstance(throwLOLHTMLError(global), global);
|
||||
response.getBodyValue().toErrorInstance(throwLOLHTMLError(global), global);
|
||||
|
||||
return null;
|
||||
} else {
|
||||
@@ -532,7 +534,7 @@ pub const HTMLRewriter = struct {
|
||||
sink.deinit();
|
||||
|
||||
if (is_async) {
|
||||
response.body.value.toErrorInstance(throwLOLHTMLError(global), global);
|
||||
response.getBodyValue().toErrorInstance(throwLOLHTMLError(global), global);
|
||||
return null;
|
||||
} else {
|
||||
return throwLOLHTMLError(global);
|
||||
@@ -545,16 +547,16 @@ pub const HTMLRewriter = struct {
|
||||
pub const Sync = enum { suspended, pending, done };
|
||||
|
||||
pub fn done(this: *BufferOutputSink) void {
|
||||
var prev_value = this.response.body.value;
|
||||
var prev_value = this.response.getBodyValue().*;
|
||||
var bytes = this.bytes.toOwnedSliceLeaky();
|
||||
this.response.body.value = JSC.WebCore.Body.Value.createBlobValue(
|
||||
this.response.getBodyValue().* = JSC.WebCore.Body.Value.createBlobValue(
|
||||
bytes,
|
||||
bun.default_allocator,
|
||||
|
||||
true,
|
||||
);
|
||||
prev_value.resolve(
|
||||
&this.response.body.value,
|
||||
this.response.getBodyValue(),
|
||||
this.global,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -1466,9 +1466,9 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
|
||||
}
|
||||
|
||||
if (this.response_ptr) |response| {
|
||||
if (response.body.value == .Locked) {
|
||||
if (response.body.value.Locked.readable) |*readable| {
|
||||
response.body.value.Locked.readable = null;
|
||||
if (response.getBodyValue().* == .Locked) {
|
||||
if (response.getBodyValue().Locked.readable) |*readable| {
|
||||
response.getBodyValue().Locked.readable = null;
|
||||
readable.abort(this.server.globalThis);
|
||||
}
|
||||
}
|
||||
@@ -2049,7 +2049,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
|
||||
streamLog("promise still Pending", .{});
|
||||
// TODO: should this timeout?
|
||||
this.setAbortHandler();
|
||||
this.response_ptr.?.body.value = .{
|
||||
this.response_ptr.?.getBodyValue().* = .{
|
||||
.Locked = .{
|
||||
.readable = stream,
|
||||
.global = this.server.globalThis,
|
||||
@@ -2154,9 +2154,9 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
|
||||
ctx.response_jsvalue = response_value;
|
||||
ctx.response_jsvalue.ensureStillAlive();
|
||||
ctx.flags.response_protected = false;
|
||||
response.body.value.toBlobIfPossible();
|
||||
response.getBodyValue().toBlobIfPossible();
|
||||
|
||||
switch (response.body.value) {
|
||||
switch (response.getBodyValue().*) {
|
||||
.Blob => |*blob| {
|
||||
if (blob.needsToReadFile()) {
|
||||
response_value.protect();
|
||||
@@ -2205,8 +2205,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
|
||||
ctx.response_jsvalue.ensureStillAlive();
|
||||
ctx.flags.response_protected = false;
|
||||
ctx.response_ptr = response;
|
||||
response.body.value.toBlobIfPossible();
|
||||
switch (response.body.value) {
|
||||
response.getBodyValue().toBlobIfPossible();
|
||||
switch (response.getBodyValue().*) {
|
||||
.Blob => |*blob| {
|
||||
if (blob.needsToReadFile()) {
|
||||
fulfilled_value.protect();
|
||||
@@ -2278,9 +2278,9 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
|
||||
}
|
||||
|
||||
if (req.response_ptr) |resp| {
|
||||
if (resp.body.value == .Locked) {
|
||||
resp.body.value.Locked.readable.?.done();
|
||||
resp.body.value = .{ .Used = {} };
|
||||
if (resp.getBodyValue().* == .Locked) {
|
||||
resp.getBodyValue().Locked.readable.?.done();
|
||||
resp.getBodyValue().* = .{ .Used = {} };
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2339,9 +2339,9 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
|
||||
}
|
||||
|
||||
if (req.response_ptr) |resp| {
|
||||
if (resp.body.value == .Locked) {
|
||||
resp.body.value.Locked.readable.?.done();
|
||||
resp.body.value = .{ .Used = {} };
|
||||
if (resp.getBodyValue().* == .Locked) {
|
||||
resp.getBodyValue().Locked.readable.?.done();
|
||||
resp.getBodyValue().* = .{ .Used = {} };
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2561,7 +2561,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
|
||||
return;
|
||||
}
|
||||
var response = this.response_ptr.?;
|
||||
this.doRenderWithBody(&response.body.value);
|
||||
this.doRenderWithBody(response.getBodyValue());
|
||||
}
|
||||
|
||||
pub fn renderProductionError(this: *RequestContext, status: u16) void {
|
||||
@@ -4914,7 +4914,7 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type {
|
||||
existing_request = Request{
|
||||
.url = bun.String.create(url.href),
|
||||
.headers = headers,
|
||||
.body = JSC.WebCore.InitRequestBodyValue(body) catch unreachable,
|
||||
.body = JSC.WebCore.createBodyValueRef(body) catch unreachable,
|
||||
.method = method,
|
||||
};
|
||||
} else if (first_arg.as(Request)) |request_| {
|
||||
@@ -5296,7 +5296,7 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type {
|
||||
var ctx = this.request_pool_allocator.tryGet() catch @panic("ran out of memory");
|
||||
ctx.create(this, req, resp);
|
||||
var request_object = this.allocator.create(JSC.WebCore.Request) catch unreachable;
|
||||
var body = JSC.WebCore.InitRequestBodyValue(.{ .Null = {} }) catch unreachable;
|
||||
var body = JSC.WebCore.createBodyValueRef(.{ .Null = {} }) catch unreachable;
|
||||
|
||||
ctx.request_body = body;
|
||||
const js_signal = JSC.WebCore.AbortSignal.create(this.globalThis);
|
||||
@@ -5398,7 +5398,7 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type {
|
||||
var ctx = this.request_pool_allocator.tryGet() catch @panic("ran out of memory");
|
||||
ctx.create(this, req, resp);
|
||||
var request_object = this.allocator.create(JSC.WebCore.Request) catch unreachable;
|
||||
var body = JSC.WebCore.InitRequestBodyValue(.{ .Null = {} }) catch unreachable;
|
||||
var body = JSC.WebCore.createBodyValueRef(.{ .Null = {} }) catch unreachable;
|
||||
|
||||
ctx.request_body = body;
|
||||
const js_signal = JSC.WebCore.AbortSignal.create(this.globalThis);
|
||||
|
||||
154
src/bun.js/bindings/ReadableStreamBindings.cpp
Normal file
154
src/bun.js/bindings/ReadableStreamBindings.cpp
Normal file
@@ -0,0 +1,154 @@
|
||||
#include "root.h"
|
||||
#include "ZigGlobalObject.h"
|
||||
#include "JSReadableStream.h"
|
||||
#include "ReadableStream.h"
|
||||
#include "BunClientData.h"
|
||||
|
||||
using namespace WebCore;
|
||||
using namespace JSC;
|
||||
|
||||
extern "C" void ReadableStream__cancel(JSC__JSValue possibleReadableStream, Zig::GlobalObject* globalObject)
|
||||
{
|
||||
auto* readableStream = jsDynamicCast<JSReadableStream*>(JSC::JSValue::decode(possibleReadableStream));
|
||||
if (UNLIKELY(!readableStream))
|
||||
return;
|
||||
|
||||
if (!ReadableStream(*globalObject, *readableStream).isLocked()) {
|
||||
return;
|
||||
}
|
||||
|
||||
WebCore::Exception exception { AbortError };
|
||||
ReadableStream(*globalObject, *readableStream).cancel(exception);
|
||||
}
|
||||
|
||||
extern "C" void ReadableStream__detach(JSC__JSValue possibleReadableStream, Zig::GlobalObject* globalObject)
|
||||
{
|
||||
auto* readableStream = jsDynamicCast<JSReadableStream*>(JSC::JSValue::decode(possibleReadableStream));
|
||||
if (UNLIKELY(!readableStream))
|
||||
return;
|
||||
auto& vm = globalObject->vm();
|
||||
auto clientData = WebCore::clientData(vm);
|
||||
readableStream->putDirect(vm, clientData->builtinNames().bunNativePtrPrivateName(), JSC::jsUndefined(), 0);
|
||||
readableStream->putDirect(vm, clientData->builtinNames().bunNativeTypePrivateName(), JSC::jsUndefined(), 0);
|
||||
}
|
||||
|
||||
extern "C" bool ReadableStream__isDisturbed(JSC__JSValue possibleReadableStream, Zig::GlobalObject* globalObject)
|
||||
{
|
||||
ASSERT(globalObject);
|
||||
return ReadableStream::isDisturbed(globalObject, jsDynamicCast<WebCore::JSReadableStream*>(JSC::JSValue::decode(possibleReadableStream)));
|
||||
}
|
||||
|
||||
extern "C" bool ReadableStream__isLocked(JSC__JSValue possibleReadableStream, Zig::GlobalObject* globalObject)
|
||||
{
|
||||
ASSERT(globalObject);
|
||||
WebCore::JSReadableStream* stream = jsDynamicCast<WebCore::JSReadableStream*>(JSValue::decode(possibleReadableStream));
|
||||
return stream != nullptr && ReadableStream::isLocked(globalObject, stream);
|
||||
}
|
||||
|
||||
extern "C" int32_t ReadableStreamTag__tagged(Zig::GlobalObject* globalObject, JSC__JSValue possibleReadableStream, JSValue* ptr)
|
||||
{
|
||||
ASSERT(globalObject);
|
||||
JSC::JSObject* object = JSValue::decode(possibleReadableStream).getObject();
|
||||
if (!object || !object->inherits<JSReadableStream>()) {
|
||||
*ptr = JSC::JSValue();
|
||||
return -1;
|
||||
}
|
||||
|
||||
auto* readableStream = jsCast<JSReadableStream*>(object);
|
||||
auto& vm = globalObject->vm();
|
||||
auto& builtinNames = WebCore::clientData(vm)->builtinNames();
|
||||
int32_t num = 0;
|
||||
if (JSValue numberValue = readableStream->getDirect(vm, builtinNames.bunNativeTypePrivateName())) {
|
||||
num = numberValue.toInt32(globalObject);
|
||||
}
|
||||
|
||||
// If this type is outside the expected range, it means something is wrong.
|
||||
if (UNLIKELY(!(num > 0 && num < 5))) {
|
||||
*ptr = JSC::JSValue();
|
||||
return 0;
|
||||
}
|
||||
|
||||
*ptr = readableStream->getDirect(vm, builtinNames.bunNativePtrPrivateName());
|
||||
return num;
|
||||
}
|
||||
|
||||
extern "C" bool ReadableStream__tee(JSC__JSValue stream, Zig::GlobalObject* globalObject, JSC__JSValue* value1, JSC__JSValue* value2)
|
||||
{
|
||||
ASSERT(globalObject);
|
||||
|
||||
auto& vm = globalObject->vm();
|
||||
|
||||
auto clientData = WebCore::clientData(vm);
|
||||
auto& builtinNames = WebCore::builtinNames(vm);
|
||||
|
||||
auto* function = globalObject->builtinInternalFunctions().readableStreamInternals().m_readableStreamTeeFunction.get();
|
||||
RELEASE_ASSERT(function);
|
||||
JSC::MarkedArgumentBuffer arguments = JSC::MarkedArgumentBuffer();
|
||||
arguments.append(JSValue::decode(stream));
|
||||
arguments.append(jsBoolean(false));
|
||||
|
||||
auto callData = JSC::getCallData(function);
|
||||
|
||||
auto scope = DECLARE_CATCH_SCOPE(vm);
|
||||
|
||||
JSValue result = call(globalObject, function, callData, JSC::jsUndefined(), arguments);
|
||||
|
||||
if (scope.exception()) {
|
||||
scope.clearException();
|
||||
return false;
|
||||
}
|
||||
|
||||
auto* array = jsDynamicCast<JSC::JSArray*>(result);
|
||||
if (UNLIKELY(!array))
|
||||
return false;
|
||||
|
||||
*value1 = JSValue::encode(array->getDirectIndex(globalObject, 0));
|
||||
if (UNLIKELY(scope.exception())) {
|
||||
scope.clearException();
|
||||
return false;
|
||||
}
|
||||
*value2 = JSValue::encode(array->getDirectIndex(globalObject, 1));
|
||||
if (UNLIKELY(scope.exception())) {
|
||||
scope.clearException();
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
extern "C" JSC__JSValue ReadableStream__consume(Zig::GlobalObject* globalObject, JSC__JSValue stream, JSC__JSValue nativeType, JSC__JSValue nativePtr)
|
||||
{
|
||||
ASSERT(globalObject);
|
||||
|
||||
auto& vm = globalObject->vm();
|
||||
auto scope = DECLARE_CATCH_SCOPE(vm);
|
||||
|
||||
auto clientData = WebCore::clientData(vm);
|
||||
auto& builtinNames = WebCore::builtinNames(vm);
|
||||
|
||||
auto function = globalObject->getDirect(vm, builtinNames.consumeReadableStreamPrivateName()).getObject();
|
||||
JSC::MarkedArgumentBuffer arguments = JSC::MarkedArgumentBuffer();
|
||||
arguments.append(JSValue::decode(nativePtr));
|
||||
arguments.append(JSValue::decode(nativeType));
|
||||
arguments.append(JSValue::decode(stream));
|
||||
|
||||
auto callData = JSC::getCallData(function);
|
||||
return JSC::JSValue::encode(call(globalObject, function, callData, JSC::jsUndefined(), arguments));
|
||||
}
|
||||
|
||||
extern "C" JSC__JSValue ZigGlobalObject__createNativeReadableStream(Zig::GlobalObject* globalObject, JSC__JSValue nativeType, JSC__JSValue nativePtr)
|
||||
{
|
||||
auto& vm = globalObject->vm();
|
||||
auto scope = DECLARE_THROW_SCOPE(vm);
|
||||
|
||||
auto clientData = WebCore::clientData(vm);
|
||||
auto& builtinNames = WebCore::builtinNames(vm);
|
||||
|
||||
auto function = globalObject->getDirect(vm, builtinNames.createNativeReadableStreamPrivateName()).getObject();
|
||||
JSC::MarkedArgumentBuffer arguments = JSC::MarkedArgumentBuffer();
|
||||
arguments.append(JSValue::decode(nativeType));
|
||||
arguments.append(JSValue::decode(nativePtr));
|
||||
|
||||
auto callData = JSC::getCallData(function);
|
||||
return JSC::JSValue::encode(call(globalObject, function, callData, JSC::jsUndefined(), arguments));
|
||||
}
|
||||
@@ -2133,114 +2133,6 @@ JSC_DEFINE_HOST_FUNCTION(isAbortSignal, (JSGlobalObject*, CallFrame* callFrame))
|
||||
return JSValue::encode(jsBoolean(callFrame->uncheckedArgument(0).inherits<JSAbortSignal>()));
|
||||
}
|
||||
|
||||
extern "C" void ReadableStream__cancel(JSC__JSValue possibleReadableStream, Zig::GlobalObject* globalObject);
|
||||
extern "C" void ReadableStream__cancel(JSC__JSValue possibleReadableStream, Zig::GlobalObject* globalObject)
|
||||
{
|
||||
auto* readableStream = jsDynamicCast<JSReadableStream*>(JSC::JSValue::decode(possibleReadableStream));
|
||||
if (UNLIKELY(!readableStream))
|
||||
return;
|
||||
|
||||
if (!ReadableStream(*globalObject, *readableStream).isLocked()) {
|
||||
return;
|
||||
}
|
||||
|
||||
WebCore::Exception exception { AbortError };
|
||||
ReadableStream(*globalObject, *readableStream).cancel(exception);
|
||||
}
|
||||
|
||||
extern "C" void ReadableStream__detach(JSC__JSValue possibleReadableStream, Zig::GlobalObject* globalObject);
|
||||
extern "C" void ReadableStream__detach(JSC__JSValue possibleReadableStream, Zig::GlobalObject* globalObject)
|
||||
{
|
||||
auto* readableStream = jsDynamicCast<JSReadableStream*>(JSC::JSValue::decode(possibleReadableStream));
|
||||
if (UNLIKELY(!readableStream))
|
||||
return;
|
||||
auto& vm = globalObject->vm();
|
||||
auto clientData = WebCore::clientData(vm);
|
||||
readableStream->putDirect(vm, clientData->builtinNames().bunNativePtrPrivateName(), JSC::jsUndefined(), 0);
|
||||
readableStream->putDirect(vm, clientData->builtinNames().bunNativeTypePrivateName(), JSC::jsUndefined(), 0);
|
||||
}
|
||||
extern "C" bool ReadableStream__isDisturbed(JSC__JSValue possibleReadableStream, Zig::GlobalObject* globalObject);
|
||||
extern "C" bool ReadableStream__isDisturbed(JSC__JSValue possibleReadableStream, Zig::GlobalObject* globalObject)
|
||||
{
|
||||
ASSERT(globalObject);
|
||||
return ReadableStream::isDisturbed(globalObject, jsDynamicCast<WebCore::JSReadableStream*>(JSC::JSValue::decode(possibleReadableStream)));
|
||||
}
|
||||
|
||||
extern "C" bool ReadableStream__isLocked(JSC__JSValue possibleReadableStream, Zig::GlobalObject* globalObject);
|
||||
extern "C" bool ReadableStream__isLocked(JSC__JSValue possibleReadableStream, Zig::GlobalObject* globalObject)
|
||||
{
|
||||
ASSERT(globalObject);
|
||||
WebCore::JSReadableStream* stream = jsDynamicCast<WebCore::JSReadableStream*>(JSValue::decode(possibleReadableStream));
|
||||
return stream != nullptr && ReadableStream::isLocked(globalObject, stream);
|
||||
}
|
||||
|
||||
extern "C" int32_t ReadableStreamTag__tagged(Zig::GlobalObject* globalObject, JSC__JSValue possibleReadableStream, JSValue* ptr);
|
||||
extern "C" int32_t ReadableStreamTag__tagged(Zig::GlobalObject* globalObject, JSC__JSValue possibleReadableStream, JSValue* ptr)
|
||||
{
|
||||
ASSERT(globalObject);
|
||||
JSC::JSObject* object = JSValue::decode(possibleReadableStream).getObject();
|
||||
if (!object || !object->inherits<JSReadableStream>()) {
|
||||
*ptr = JSC::JSValue();
|
||||
return -1;
|
||||
}
|
||||
|
||||
auto* readableStream = jsCast<JSReadableStream*>(object);
|
||||
auto& vm = globalObject->vm();
|
||||
auto& builtinNames = WebCore::clientData(vm)->builtinNames();
|
||||
int32_t num = 0;
|
||||
if (JSValue numberValue = readableStream->getDirect(vm, builtinNames.bunNativeTypePrivateName())) {
|
||||
num = numberValue.toInt32(globalObject);
|
||||
}
|
||||
|
||||
// If this type is outside the expected range, it means something is wrong.
|
||||
if (UNLIKELY(!(num > 0 && num < 5))) {
|
||||
*ptr = JSC::JSValue();
|
||||
return 0;
|
||||
}
|
||||
|
||||
*ptr = readableStream->getDirect(vm, builtinNames.bunNativePtrPrivateName());
|
||||
return num;
|
||||
}
|
||||
|
||||
extern "C" JSC__JSValue ReadableStream__consume(Zig::GlobalObject* globalObject, JSC__JSValue stream, JSC__JSValue nativeType, JSC__JSValue nativePtr);
|
||||
extern "C" JSC__JSValue ReadableStream__consume(Zig::GlobalObject* globalObject, JSC__JSValue stream, JSC__JSValue nativeType, JSC__JSValue nativePtr)
|
||||
{
|
||||
ASSERT(globalObject);
|
||||
|
||||
auto& vm = globalObject->vm();
|
||||
auto scope = DECLARE_CATCH_SCOPE(vm);
|
||||
|
||||
auto clientData = WebCore::clientData(vm);
|
||||
auto& builtinNames = WebCore::builtinNames(vm);
|
||||
|
||||
auto function = globalObject->getDirect(vm, builtinNames.consumeReadableStreamPrivateName()).getObject();
|
||||
JSC::MarkedArgumentBuffer arguments = JSC::MarkedArgumentBuffer();
|
||||
arguments.append(JSValue::decode(nativePtr));
|
||||
arguments.append(JSValue::decode(nativeType));
|
||||
arguments.append(JSValue::decode(stream));
|
||||
|
||||
auto callData = JSC::getCallData(function);
|
||||
return JSC::JSValue::encode(call(globalObject, function, callData, JSC::jsUndefined(), arguments));
|
||||
}
|
||||
|
||||
extern "C" JSC__JSValue ZigGlobalObject__createNativeReadableStream(Zig::GlobalObject* globalObject, JSC__JSValue nativeType, JSC__JSValue nativePtr);
|
||||
extern "C" JSC__JSValue ZigGlobalObject__createNativeReadableStream(Zig::GlobalObject* globalObject, JSC__JSValue nativeType, JSC__JSValue nativePtr)
|
||||
{
|
||||
auto& vm = globalObject->vm();
|
||||
auto scope = DECLARE_THROW_SCOPE(vm);
|
||||
|
||||
auto clientData = WebCore::clientData(vm);
|
||||
auto& builtinNames = WebCore::builtinNames(vm);
|
||||
|
||||
auto function = globalObject->getDirect(vm, builtinNames.createNativeReadableStreamPrivateName()).getObject();
|
||||
JSC::MarkedArgumentBuffer arguments = JSC::MarkedArgumentBuffer();
|
||||
arguments.append(JSValue::decode(nativeType));
|
||||
arguments.append(JSValue::decode(nativePtr));
|
||||
|
||||
auto callData = JSC::getCallData(function);
|
||||
return JSC::JSValue::encode(call(globalObject, function, callData, JSC::jsUndefined(), arguments));
|
||||
}
|
||||
|
||||
static inline EncodedJSValue flattenArrayOfBuffersIntoArrayBuffer(JSGlobalObject* lexicalGlobalObject, JSValue arrayValue)
|
||||
{
|
||||
auto& vm = lexicalGlobalObject->vm();
|
||||
@@ -2645,7 +2537,6 @@ static inline JSC__JSValue ZigGlobalObject__readableStreamToArrayBufferBody(Zig:
|
||||
RELEASE_AND_RETURN(throwScope, JSC::JSValue::encode(promise));
|
||||
}
|
||||
|
||||
extern "C" JSC__JSValue ZigGlobalObject__readableStreamToArrayBuffer(Zig::GlobalObject* globalObject, JSC__JSValue readableStreamValue);
|
||||
extern "C" JSC__JSValue ZigGlobalObject__readableStreamToArrayBuffer(Zig::GlobalObject* globalObject, JSC__JSValue readableStreamValue)
|
||||
{
|
||||
return ZigGlobalObject__readableStreamToArrayBufferBody(reinterpret_cast<Zig::GlobalObject*>(globalObject), readableStreamValue);
|
||||
|
||||
@@ -884,7 +884,8 @@ pub const Blob = struct {
|
||||
// TODO: implement a writeev() fast path
|
||||
var source_blob: Blob = brk: {
|
||||
if (data.as(Response)) |response| {
|
||||
switch (response.body.value) {
|
||||
var body = response.getBodyValue();
|
||||
switch (body.*) {
|
||||
.WTFStringImpl,
|
||||
.InternalBlob,
|
||||
.Used,
|
||||
@@ -892,13 +893,13 @@ pub const Blob = struct {
|
||||
.Blob,
|
||||
.Null,
|
||||
=> {
|
||||
break :brk response.body.use();
|
||||
break :brk body.use();
|
||||
},
|
||||
.Error => {
|
||||
destination_blob.detach();
|
||||
const err = response.body.value.Error;
|
||||
const err = body.Error;
|
||||
JSC.C.JSValueUnprotect(ctx, err.asObjectRef());
|
||||
_ = response.body.value.use();
|
||||
_ = body.use();
|
||||
return JSC.JSPromise.rejectedPromiseValue(ctx.ptr(), err).asObjectRef();
|
||||
},
|
||||
.Locked => {
|
||||
@@ -910,8 +911,8 @@ pub const Blob = struct {
|
||||
.promise = promise,
|
||||
};
|
||||
|
||||
response.body.value.Locked.task = task;
|
||||
response.body.value.Locked.onReceiveValue = WriteFileWaitFromLockedValueTask.thenWrap;
|
||||
body.Locked.task = task;
|
||||
body.Locked.onReceiveValue = WriteFileWaitFromLockedValueTask.thenWrap;
|
||||
|
||||
return promise.asValue(ctx.ptr()).asObjectRef();
|
||||
},
|
||||
@@ -919,7 +920,8 @@ pub const Blob = struct {
|
||||
}
|
||||
|
||||
if (data.as(Request)) |request| {
|
||||
switch (request.body.value) {
|
||||
var body = request.getBodyValue();
|
||||
switch (body.*) {
|
||||
.WTFStringImpl,
|
||||
.InternalBlob,
|
||||
.Used,
|
||||
@@ -927,13 +929,13 @@ pub const Blob = struct {
|
||||
.Blob,
|
||||
.Null,
|
||||
=> {
|
||||
break :brk request.body.value.use();
|
||||
break :brk body.use();
|
||||
},
|
||||
.Error => {
|
||||
destination_blob.detach();
|
||||
const err = request.body.value.Error;
|
||||
const err = body.Error;
|
||||
JSC.C.JSValueUnprotect(ctx, err.asObjectRef());
|
||||
_ = request.body.value.use();
|
||||
_ = body.use();
|
||||
return JSC.JSPromise.rejectedPromiseValue(ctx.ptr(), err).asObjectRef();
|
||||
},
|
||||
.Locked => {
|
||||
@@ -945,8 +947,8 @@ pub const Blob = struct {
|
||||
.promise = promise,
|
||||
};
|
||||
|
||||
request.body.value.Locked.task = task;
|
||||
request.body.value.Locked.onReceiveValue = WriteFileWaitFromLockedValueTask.thenWrap;
|
||||
body.Locked.task = task;
|
||||
body.Locked.onReceiveValue = WriteFileWaitFromLockedValueTask.thenWrap;
|
||||
|
||||
return promise.asValue(ctx.ptr()).asObjectRef();
|
||||
},
|
||||
|
||||
@@ -236,7 +236,7 @@ pub const Body = struct {
|
||||
pub fn setPromise(value: *PendingValue, globalThis: *JSC.JSGlobalObject, action: Action) JSValue {
|
||||
value.action = action;
|
||||
|
||||
if (value.readable) |readable| {
|
||||
if (value.readable) |*readable| {
|
||||
// switch (readable.ptr) {
|
||||
// .JavaScript
|
||||
// }
|
||||
@@ -935,9 +935,12 @@ pub const Body = struct {
|
||||
JSC.C.JSValueUnprotect(VirtualMachine.get().global, this.Error.asObjectRef());
|
||||
}
|
||||
}
|
||||
const debug = Output.scoped(.body, false);
|
||||
|
||||
pub fn clone(this: *Value, globalThis: *JSC.JSGlobalObject) Value {
|
||||
if (this.* == .InternalBlob) {
|
||||
var internal_blob = this.InternalBlob;
|
||||
debug("InternalBlob -> Blob", .{});
|
||||
this.* = .{
|
||||
.Blob = Blob.init(
|
||||
internal_blob.toOwnedSlice(),
|
||||
@@ -966,6 +969,48 @@ pub const Body = struct {
|
||||
|
||||
return Value{ .Empty = {} };
|
||||
}
|
||||
|
||||
pub fn cloneAllowTee(this: *Value, teed: *Value, did_tee: *bool, globalThis: *JSC.JSGlobalObject) Value {
|
||||
if (this.* == .Locked) {
|
||||
if (this.Locked.toAnyBlob()) |blob| {
|
||||
debug("Locked -> {s}", .{@tagName(blob)});
|
||||
|
||||
this.* = switch (blob) {
|
||||
.Blob => .{ .Blob = blob.Blob },
|
||||
.InternalBlob => .{ .InternalBlob = blob.InternalBlob },
|
||||
.WTFStringImpl => .{ .WTFStringImpl = blob.WTFStringImpl },
|
||||
// .InlineBlob => .{ .InlineBlob = blob.InlineBlob },
|
||||
};
|
||||
} else if (this.Locked.promise == null and !this.Locked.deinit) {
|
||||
debug("Locked -> ReadableStream", .{});
|
||||
_ = this.toReadableStream(globalThis);
|
||||
if (this.Locked.readable) |*readable| {
|
||||
if (!readable.isLocked(globalThis)) {
|
||||
if (readable.tee(globalThis)) |branches| {
|
||||
did_tee.* = true;
|
||||
teed.* = Value{
|
||||
.Locked = .{
|
||||
.readable = branches[0],
|
||||
.global = globalThis,
|
||||
},
|
||||
};
|
||||
|
||||
return Value{
|
||||
.Locked = .{
|
||||
.readable = branches[1],
|
||||
.global = globalThis,
|
||||
},
|
||||
};
|
||||
}
|
||||
} else {
|
||||
debug("ReadableStream is locked", .{});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return this.clone(globalThis);
|
||||
}
|
||||
};
|
||||
|
||||
pub fn @"404"(_: js.JSContextRef) Body {
|
||||
|
||||
@@ -58,7 +58,7 @@ const BodyValueHiveAllocator = bun.HiveArray(BodyValueRef, body_value_pool_size)
|
||||
|
||||
var body_value_hive_allocator = BodyValueHiveAllocator.init(bun.default_allocator);
|
||||
|
||||
pub fn InitRequestBodyValue(value: Body.Value) !*BodyValueRef {
|
||||
pub fn createBodyValueRef(value: Body.Value) !*BodyValueRef {
|
||||
return try BodyValueRef.init(value, &body_value_hive_allocator);
|
||||
}
|
||||
// https://developer.mozilla.org/en-US/docs/Web/API/Request
|
||||
@@ -68,6 +68,7 @@ pub const Request = struct {
|
||||
headers: ?*FetchHeaders = null,
|
||||
signal: ?*AbortSignal = null,
|
||||
body: *BodyValueRef,
|
||||
teed_body: ?*BodyValueRef = null,
|
||||
method: Method = Method.GET,
|
||||
uws_request: ?*uws.Request = null,
|
||||
https: bool = false,
|
||||
@@ -102,9 +103,9 @@ pub const Request = struct {
|
||||
}
|
||||
}
|
||||
|
||||
if (this.body.value == .Blob) {
|
||||
if (this.body.value.Blob.content_type.len > 0)
|
||||
return ZigString.Slice.fromUTF8NeverFree(this.body.value.Blob.content_type);
|
||||
if (this.getBodyValue().* == .Blob) {
|
||||
if (this.getBodyValue().Blob.content_type.len > 0)
|
||||
return ZigString.Slice.fromUTF8NeverFree(this.getBodyValue().Blob.content_type);
|
||||
}
|
||||
|
||||
return null;
|
||||
@@ -149,22 +150,23 @@ pub const Request = struct {
|
||||
try formatter.writeIndent(Writer, writer);
|
||||
try writer.writeAll(comptime Output.prettyFmt("<r>headers<d>:<r> ", enable_ansi_colors));
|
||||
formatter.printAs(.Private, Writer, writer, this.getHeaders(formatter.globalThis), .DOMWrapper, enable_ansi_colors);
|
||||
var body = this.getBodyValue();
|
||||
|
||||
if (this.body.value == .Blob) {
|
||||
if (body.* == .Blob) {
|
||||
try writer.writeAll("\n");
|
||||
try formatter.writeIndent(Writer, writer);
|
||||
try this.body.value.Blob.writeFormat(Formatter, formatter, writer, enable_ansi_colors);
|
||||
} else if (this.body.value == .InternalBlob or this.body.value == .WTFStringImpl) {
|
||||
try body.Blob.writeFormat(Formatter, formatter, writer, enable_ansi_colors);
|
||||
} else if (body.* == .InternalBlob or body.* == .WTFStringImpl) {
|
||||
try writer.writeAll("\n");
|
||||
try formatter.writeIndent(Writer, writer);
|
||||
const size = this.body.value.size();
|
||||
const size = body.size();
|
||||
if (size == 0) {
|
||||
try Blob.initEmpty(undefined).writeFormat(Formatter, formatter, writer, enable_ansi_colors);
|
||||
} else {
|
||||
try Blob.writeFormatForSize(size, writer, enable_ansi_colors);
|
||||
}
|
||||
} else if (this.body.value == .Locked) {
|
||||
if (this.body.value.Locked.readable) |stream| {
|
||||
} else if (body.* == .Locked) {
|
||||
if (body.Locked.readable) |stream| {
|
||||
try writer.writeAll("\n");
|
||||
try formatter.writeIndent(Writer, writer);
|
||||
formatter.printAs(.Object, Writer, writer, stream.value, stream.value.jsType(), enable_ansi_colors);
|
||||
@@ -179,7 +181,7 @@ pub const Request = struct {
|
||||
pub fn fromRequestContext(ctx: *RequestContext) !Request {
|
||||
var req = Request{
|
||||
.url = bun.String.create(ctx.full_url),
|
||||
.body = try InitRequestBodyValue(.{ .Null = {} }),
|
||||
.body = try createBodyValueRef(.{ .Null = {} }),
|
||||
.method = ctx.method,
|
||||
.headers = FetchHeaders.createFromPicoHeaders(ctx.request.headers),
|
||||
};
|
||||
@@ -281,6 +283,9 @@ pub const Request = struct {
|
||||
pub fn finalize(this: *Request) callconv(.C) void {
|
||||
this.finalizeWithoutDeinit();
|
||||
_ = this.body.unref();
|
||||
if (this.teed_body) |teed| {
|
||||
_ = teed.unref();
|
||||
}
|
||||
bun.default_allocator.destroy(this);
|
||||
}
|
||||
|
||||
@@ -439,7 +444,7 @@ pub const Request = struct {
|
||||
arguments: []const JSC.JSValue,
|
||||
) ?Request {
|
||||
var req = Request{
|
||||
.body = InitRequestBodyValue(.{ .Null = {} }) catch {
|
||||
.body = createBodyValueRef(.{ .Null = {} }) catch {
|
||||
return null;
|
||||
},
|
||||
};
|
||||
@@ -545,10 +550,10 @@ pub const Request = struct {
|
||||
}
|
||||
|
||||
if (!fields.contains(.body)) {
|
||||
switch (response.body.value) {
|
||||
switch (response.getBodyValue().*) {
|
||||
.Null, .Empty, .Used => {},
|
||||
else => {
|
||||
req.body.value = response.body.value.clone(globalThis);
|
||||
req.getBodyValue().* = response.getBodyValue().*.clone(globalThis);
|
||||
fields.insert(.body);
|
||||
},
|
||||
}
|
||||
@@ -675,7 +680,7 @@ pub const Request = struct {
|
||||
pub fn getBodyValue(
|
||||
this: *Request,
|
||||
) *Body.Value {
|
||||
return &this.body.value;
|
||||
return if (this.teed_body) |teed| &teed.value else &this.body.value;
|
||||
}
|
||||
|
||||
pub fn getFetchHeaders(
|
||||
@@ -743,10 +748,26 @@ pub const Request = struct {
|
||||
_ = allocator;
|
||||
this.ensureURL() catch {};
|
||||
|
||||
var body = InitRequestBodyValue(this.body.value.clone(globalThis)) catch {
|
||||
var teed_value: Body.Value = undefined;
|
||||
var did_tee = false;
|
||||
var reference_value = this.getBodyValue();
|
||||
|
||||
var body = createBodyValueRef(reference_value.cloneAllowTee(&teed_value, &did_tee, globalThis)) catch {
|
||||
globalThis.throw("Failed to clone request", .{});
|
||||
return;
|
||||
};
|
||||
if (did_tee) {
|
||||
var new_teed_body = createBodyValueRef(teed_value) catch {
|
||||
globalThis.throw("Failed to clone request", .{});
|
||||
_ = body.unref();
|
||||
return;
|
||||
};
|
||||
if (this.teed_body != null) {
|
||||
_ = this.teed_body.?.unref();
|
||||
}
|
||||
this.teed_body = new_teed_body;
|
||||
}
|
||||
|
||||
var original_url = req.url;
|
||||
|
||||
req.* = Request{
|
||||
|
||||
@@ -58,7 +58,12 @@ pub const Response = struct {
|
||||
pub usingnamespace JSC.Codegen.JSResponse;
|
||||
|
||||
allocator: std.mem.Allocator,
|
||||
/// Avoid accessing .body directly, use .getBodyValue() instead
|
||||
body: Body,
|
||||
/// When the Response has been cloned and it was originally a stream
|
||||
/// We have to preserve the pointer of the original Body
|
||||
/// Therefore, we have a separate pointer and it becomes the "public" body
|
||||
teed_body_value: ?*JSC.WebCore.BodyValueRef = null,
|
||||
url: bun.String = bun.String.empty,
|
||||
status_text: bun.String = bun.String.empty,
|
||||
redirected: bool = false,
|
||||
@@ -94,7 +99,7 @@ pub const Response = struct {
|
||||
pub fn getBodyValue(
|
||||
this: *Response,
|
||||
) *Body.Value {
|
||||
return &this.body.value;
|
||||
return if (this.teed_body_value) |body| &body.value else &this.body.value;
|
||||
}
|
||||
|
||||
pub fn getFetchHeaders(
|
||||
@@ -254,9 +259,30 @@ pub const Response = struct {
|
||||
allocator: std.mem.Allocator,
|
||||
globalThis: *JSGlobalObject,
|
||||
) void {
|
||||
var teed_value: Body.Value = undefined;
|
||||
var did_tee = false;
|
||||
var reference_value = this.getBodyValue();
|
||||
|
||||
var value = reference_value.cloneAllowTee(&teed_value, &did_tee, globalThis);
|
||||
|
||||
if (did_tee) {
|
||||
var new_teed_body = JSC.WebCore.createBodyValueRef(teed_value) catch {
|
||||
globalThis.throw("Failed to clone Response", .{});
|
||||
value.deinit();
|
||||
return;
|
||||
};
|
||||
if (this.teed_body_value) |teed| {
|
||||
_ = teed.unref();
|
||||
}
|
||||
this.teed_body_value = new_teed_body;
|
||||
}
|
||||
|
||||
new_response.* = Response{
|
||||
.allocator = allocator,
|
||||
.body = this.body.clone(globalThis),
|
||||
.body = .{
|
||||
.init = this.body.init.clone(globalThis),
|
||||
.value = value,
|
||||
},
|
||||
.url = this.url.clone(),
|
||||
.status_text = this.status_text.clone(),
|
||||
.redirected = this.redirected,
|
||||
@@ -281,6 +307,10 @@ pub const Response = struct {
|
||||
this: *Response,
|
||||
) callconv(.C) void {
|
||||
this.body.deinit(this.allocator);
|
||||
if (this.teed_body_value) |teed_body_value| {
|
||||
this.teed_body_value = null;
|
||||
_ = teed_body_value.unref();
|
||||
}
|
||||
|
||||
var allocator = this.allocator;
|
||||
|
||||
@@ -304,15 +334,15 @@ pub const Response = struct {
|
||||
return MimeType.byExtension(request_ctx.url.extname).value;
|
||||
}
|
||||
}
|
||||
|
||||
switch (response.body.value) {
|
||||
var body = @constCast(response).getBodyValue();
|
||||
switch (body.*) {
|
||||
.Blob => |blob| {
|
||||
if (blob.content_type.len > 0) {
|
||||
return blob.content_type;
|
||||
}
|
||||
|
||||
// auto-detect HTML if unspecified
|
||||
if (strings.hasPrefixComptime(response.body.value.slice(), "<!DOCTYPE html>")) {
|
||||
if (strings.hasPrefixComptime(body.slice(), "<!DOCTYPE html>")) {
|
||||
return MimeType.html.value;
|
||||
}
|
||||
|
||||
@@ -327,11 +357,11 @@ pub const Response = struct {
|
||||
},
|
||||
.InternalBlob => {
|
||||
// auto-detect HTML if unspecified
|
||||
if (strings.hasPrefixComptime(response.body.value.slice(), "<!DOCTYPE html>")) {
|
||||
if (strings.hasPrefixComptime(body.slice(), "<!DOCTYPE html>")) {
|
||||
return MimeType.html.value;
|
||||
}
|
||||
|
||||
return response.body.value.InternalBlob.contentType();
|
||||
return body.InternalBlob.contentType();
|
||||
},
|
||||
.Null, .Used, .Locked, .Empty, .Error => return default.value,
|
||||
}
|
||||
|
||||
@@ -114,20 +114,37 @@ pub const ReadableStream = struct {
|
||||
|
||||
pub fn cancel(this: *const ReadableStream, globalThis: *JSGlobalObject) void {
|
||||
JSC.markBinding(@src());
|
||||
this.value.unprotect();
|
||||
ReadableStream__cancel(this.value, globalThis);
|
||||
const value = this.value;
|
||||
value.ensureStillAlive();
|
||||
defer value.unprotect();
|
||||
ReadableStream__cancel(value, globalThis);
|
||||
}
|
||||
|
||||
pub fn abort(this: *const ReadableStream, globalThis: *JSGlobalObject) void {
|
||||
JSC.markBinding(@src());
|
||||
this.value.unprotect();
|
||||
ReadableStream__cancel(this.value, globalThis);
|
||||
const value = this.value;
|
||||
defer value.unprotect();
|
||||
ReadableStream__cancel(value, globalThis);
|
||||
}
|
||||
|
||||
pub fn detach(this: *const ReadableStream, globalThis: *JSGlobalObject) void {
|
||||
JSC.markBinding(@src());
|
||||
this.value.unprotect();
|
||||
ReadableStream__detach(this.value, globalThis);
|
||||
const value = this.value;
|
||||
defer value.unprotect();
|
||||
ReadableStream__detach(value, globalThis);
|
||||
}
|
||||
|
||||
pub fn tee(this: *ReadableStream, globalThis: *JSGlobalObject) ?[2]ReadableStream {
|
||||
JSC.markBinding(@src());
|
||||
const value = this.value;
|
||||
var new_value1 = JSC.JSValue.zero;
|
||||
var new_value2 = JSC.JSValue.zero;
|
||||
_ = ReadableStream__tee(value, globalThis, &new_value1, &new_value2);
|
||||
|
||||
return [2]ReadableStream{
|
||||
.{ .ptr = .{ .JavaScript = {} }, .value = new_value1 },
|
||||
.{ .ptr = .{ .JavaScript = {} }, .value = new_value2 },
|
||||
};
|
||||
}
|
||||
|
||||
pub const Tag = enum(i32) {
|
||||
@@ -180,6 +197,7 @@ pub const ReadableStream = struct {
|
||||
extern fn ReadableStream__cancel(stream: JSValue, *JSGlobalObject) void;
|
||||
extern fn ReadableStream__abort(stream: JSValue, *JSGlobalObject) void;
|
||||
extern fn ReadableStream__detach(stream: JSValue, *JSGlobalObject) void;
|
||||
extern fn ReadableStream__tee(stream: JSValue, *JSGlobalObject, result1: *JSValue, result2: *JSValue) bool;
|
||||
extern fn ReadableStream__fromBlob(
|
||||
*JSGlobalObject,
|
||||
store: *anyopaque,
|
||||
|
||||
@@ -500,11 +500,13 @@ export function readableStreamTee(stream, shouldClone) {
|
||||
const reader = new $ReadableStreamDefaultReader(stream);
|
||||
|
||||
const teeState = {
|
||||
closedOrErrored: false,
|
||||
stream: stream,
|
||||
canceled1: false,
|
||||
canceled2: false,
|
||||
reason1: undefined,
|
||||
reason2: undefined,
|
||||
reading: false,
|
||||
readAgain: false,
|
||||
};
|
||||
|
||||
teeState.cancelPromiseCapability = $newPromiseCapability(Promise);
|
||||
@@ -523,10 +525,8 @@ export function readableStreamTee(stream, shouldClone) {
|
||||
const branch2 = new $ReadableStream(branch2Source);
|
||||
|
||||
$getByIdDirectPrivate(reader, "closedPromiseCapability").promise.$then(undefined, function (e) {
|
||||
if (teeState.closedOrErrored) return;
|
||||
$readableStreamDefaultControllerError(branch1.$readableStreamController, e);
|
||||
$readableStreamDefaultControllerError(branch2.$readableStreamController, e);
|
||||
teeState.closedOrErrored = true;
|
||||
if (!teeState.canceled1 || !teeState.canceled2) teeState.cancelPromiseCapability.resolve.$call();
|
||||
});
|
||||
|
||||
@@ -538,26 +538,64 @@ export function readableStreamTee(stream, shouldClone) {
|
||||
}
|
||||
|
||||
export function readableStreamTeePullFunction(teeState, reader, shouldClone) {
|
||||
return function () {
|
||||
Promise.prototype.$then.$call($readableStreamDefaultReaderRead(reader), function (result) {
|
||||
$assert($isObject(result));
|
||||
$assert(typeof result.done === "boolean");
|
||||
if (result.done && !teeState.closedOrErrored) {
|
||||
if (!teeState.canceled1) $readableStreamDefaultControllerClose(teeState.branch1.$readableStreamController);
|
||||
if (!teeState.canceled2) $readableStreamDefaultControllerClose(teeState.branch2.$readableStreamController);
|
||||
teeState.closedOrErrored = true;
|
||||
if (!teeState.canceled1 || !teeState.canceled2) teeState.cancelPromiseCapability.resolve.$call();
|
||||
}
|
||||
if (teeState.closedOrErrored) return;
|
||||
if (!teeState.canceled1)
|
||||
$readableStreamDefaultControllerEnqueue(teeState.branch1.$readableStreamController, result.value);
|
||||
if (!teeState.canceled2)
|
||||
$readableStreamDefaultControllerEnqueue(
|
||||
teeState.branch2.$readableStreamController,
|
||||
shouldClone ? $structuredCloneForStream(result.value) : result.value,
|
||||
);
|
||||
});
|
||||
var pullAlgorithm;
|
||||
pullAlgorithm = function () {
|
||||
if (teeState.reading) {
|
||||
teeState.readAgain = true;
|
||||
return Promise.resolve();
|
||||
}
|
||||
teeState.reading = true;
|
||||
Promise.prototype.$then.$call(
|
||||
$readableStreamDefaultReaderRead(reader),
|
||||
function (result) {
|
||||
$assert($isObject(result));
|
||||
$assert(typeof result.done === "boolean");
|
||||
if (result.done) {
|
||||
// close steps.
|
||||
teeState.reading = false;
|
||||
if (!teeState.canceled1) $readableStreamDefaultControllerClose(teeState.branch1.$readableStreamController);
|
||||
if (!teeState.canceled2) $readableStreamDefaultControllerClose(teeState.branch2.$readableStreamController);
|
||||
if (!teeState.canceled1 || !teeState.canceled2) teeState.cancelPromiseCapability.resolve.$call();
|
||||
return;
|
||||
}
|
||||
// chunk steps.
|
||||
teeState.readAgain = false;
|
||||
let chunk1 = result.value;
|
||||
let chunk2 = chunk1;
|
||||
if (!teeState.canceled2 && shouldClone && typeof chunk1 !== "string") {
|
||||
try {
|
||||
chunk2 = $structuredCloneForStream(chunk1);
|
||||
} catch (e) {
|
||||
$readableStreamDefaultControllerError(teeState.branch1.$readableStreamController, e);
|
||||
$readableStreamDefaultControllerError(teeState.branch2.$readableStreamController, e);
|
||||
$readableStreamCancel(teeState.stream, e).$then(
|
||||
teeState.cancelPromiseCapability.resolve,
|
||||
teeState.cancelPromiseCapability.reject,
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (!teeState.canceled1) {
|
||||
$readableStreamDefaultControllerEnqueue(teeState.branch1.$readableStreamController, chunk1);
|
||||
}
|
||||
if (!teeState.canceled2) {
|
||||
$readableStreamDefaultControllerEnqueue(teeState.branch2.$readableStreamController, chunk2);
|
||||
}
|
||||
teeState.reading = false;
|
||||
|
||||
Promise.resolve().$then(() => {
|
||||
if (teeState.readAgain) pullAlgorithm();
|
||||
});
|
||||
},
|
||||
() => {
|
||||
// error steps.
|
||||
teeState.reading = false;
|
||||
},
|
||||
);
|
||||
return Promise.resolve();
|
||||
};
|
||||
return pullAlgorithm;
|
||||
}
|
||||
|
||||
export function readableStreamTeeBranch1CancelFunction(teeState, stream) {
|
||||
@@ -566,8 +604,8 @@ export function readableStreamTeeBranch1CancelFunction(teeState, stream) {
|
||||
teeState.reason1 = r;
|
||||
if (teeState.canceled2) {
|
||||
$readableStreamCancel(stream, [teeState.reason1, teeState.reason2]).$then(
|
||||
teeState.cancelPromiseCapability.$resolve,
|
||||
teeState.cancelPromiseCapability.$reject,
|
||||
teeState.cancelPromiseCapability.resolve,
|
||||
teeState.cancelPromiseCapability.reject,
|
||||
);
|
||||
}
|
||||
return teeState.cancelPromiseCapability.promise;
|
||||
@@ -580,8 +618,8 @@ export function readableStreamTeeBranch2CancelFunction(teeState, stream) {
|
||||
teeState.reason2 = r;
|
||||
if (teeState.canceled1) {
|
||||
$readableStreamCancel(stream, [teeState.reason1, teeState.reason2]).$then(
|
||||
teeState.cancelPromiseCapability.$resolve,
|
||||
teeState.cancelPromiseCapability.$reject,
|
||||
teeState.cancelPromiseCapability.resolve,
|
||||
teeState.cancelPromiseCapability.reject,
|
||||
);
|
||||
}
|
||||
return teeState.cancelPromiseCapability.promise;
|
||||
|
||||
@@ -241,7 +241,7 @@ export function transformStreamDefaultControllerPerformTransform(controller, chu
|
||||
const transformPromise = $getByIdDirectPrivate(controller, "transformAlgorithm").$call(undefined, chunk);
|
||||
transformPromise.$then(
|
||||
() => {
|
||||
promiseCapability.$resolve();
|
||||
promiseCapability.resolve();
|
||||
},
|
||||
r => {
|
||||
$transformStreamError($getByIdDirectPrivate(controller, "stream"), r);
|
||||
@@ -286,7 +286,7 @@ export function transformStreamDefaultSinkWriteAlgorithm(stream, chunk) {
|
||||
$assert(state === "writable");
|
||||
$transformStreamDefaultControllerPerformTransform(controller, chunk).$then(
|
||||
() => {
|
||||
promiseCapability.$resolve();
|
||||
promiseCapability.resolve();
|
||||
},
|
||||
e => {
|
||||
promiseCapability.reject.$call(undefined, e);
|
||||
@@ -329,7 +329,7 @@ export function transformStreamDefaultSinkCloseAlgorithm(stream) {
|
||||
// FIXME: Update readableStreamDefaultControllerClose to make this check.
|
||||
if ($readableStreamDefaultControllerCanCloseOrEnqueue(readableController))
|
||||
$readableStreamDefaultControllerClose(readableController);
|
||||
promiseCapability.$resolve();
|
||||
promiseCapability.resolve();
|
||||
},
|
||||
r => {
|
||||
$transformStreamError($getByIdDirectPrivate(controller, "stream"), r);
|
||||
|
||||
@@ -3328,16 +3328,94 @@ var require_readable = __commonJS({
|
||||
};
|
||||
var webStreamsAdapters = {
|
||||
newStreamReadableFromReadableStream,
|
||||
|
||||
newReadableStreamFromStreamReadable(streamReadable, options = {}) {
|
||||
// Not using the internal/streams/utils isReadableNodeStream utility
|
||||
// here because it will return false if streamReadable is a Duplex
|
||||
// whose readable option is false. For a Duplex that is not readable,
|
||||
// we want it to pass this check but return a closed ReadableStream.
|
||||
if (typeof streamReadable?._readableState !== "object") {
|
||||
throw new ERR_INVALID_ARG_TYPE("streamReadable", "stream.Readable", streamReadable);
|
||||
}
|
||||
var { isDestroyed, isReadable } = require_utils();
|
||||
|
||||
if (isDestroyed(streamReadable) || !isReadable(streamReadable)) {
|
||||
const readable = new ReadableStream();
|
||||
readable.cancel();
|
||||
return readable;
|
||||
}
|
||||
|
||||
const objectMode = streamReadable.readableObjectMode;
|
||||
const highWaterMark = streamReadable.readableHighWaterMark;
|
||||
|
||||
const evaluateStrategyOrFallback = strategy => {
|
||||
// If there is a strategy available, use it
|
||||
if (strategy) return strategy;
|
||||
|
||||
if (objectMode) {
|
||||
// When running in objectMode explicitly but no strategy, we just fall
|
||||
// back to CountQueuingStrategy
|
||||
return new CountQueuingStrategy({ highWaterMark });
|
||||
}
|
||||
|
||||
// When not running in objectMode explicitly, we just fall
|
||||
// back to a minimal strategy that just specifies the highWaterMark
|
||||
// and no size algorithm. Using a ByteLengthQueuingStrategy here
|
||||
// is unnecessary.
|
||||
return { highWaterMark };
|
||||
};
|
||||
|
||||
const strategy = evaluateStrategyOrFallback(options?.strategy);
|
||||
|
||||
let controller;
|
||||
|
||||
function onData(chunk) {
|
||||
controller.enqueue(chunk);
|
||||
if (controller.desiredSize <= 0) streamReadable.pause();
|
||||
}
|
||||
|
||||
streamReadable.pause();
|
||||
|
||||
const cleanup = finished(streamReadable, error => {
|
||||
if (error?.code === "ERR_STREAM_PREMATURE_CLOSE") {
|
||||
const err = new AbortError(undefined, { cause: error });
|
||||
error = err;
|
||||
}
|
||||
|
||||
cleanup();
|
||||
// This is a protection against non-standard, legacy streams
|
||||
// that happen to emit an error event again after finished is called.
|
||||
streamReadable.on("error", () => {});
|
||||
if (error) return controller.error(error);
|
||||
controller.close();
|
||||
});
|
||||
|
||||
streamReadable.on("data", onData);
|
||||
|
||||
return new ReadableStream(
|
||||
{
|
||||
start(c) {
|
||||
controller = c;
|
||||
},
|
||||
|
||||
pull() {
|
||||
streamReadable.resume();
|
||||
},
|
||||
|
||||
cancel(reason) {
|
||||
destroy(streamReadable, reason);
|
||||
},
|
||||
},
|
||||
strategy,
|
||||
);
|
||||
},
|
||||
};
|
||||
function lazyWebStreams() {
|
||||
if (webStreamsAdapters === void 0) webStreamsAdapters = {};
|
||||
return webStreamsAdapters;
|
||||
}
|
||||
|
||||
Readable.fromWeb = function (readableStream, options) {
|
||||
return lazyWebStreams().newStreamReadableFromReadableStream(readableStream, options);
|
||||
return webStreamsAdapters.newStreamReadableFromReadableStream(readableStream, options);
|
||||
};
|
||||
Readable.toWeb = function (streamReadable) {
|
||||
return lazyWebStreams().newReadableStreamFromStreamReadable(streamReadable);
|
||||
Readable.toWeb = function (streamReadable, options) {
|
||||
return webStreamsAdapters.newReadableStreamFromStreamReadable(streamReadable, options);
|
||||
};
|
||||
Readable.wrap = function (src, options) {
|
||||
var _ref, _src$readableObjectMo;
|
||||
|
||||
28
src/js/out/WebCoreJSBuiltins.cpp
generated
28
src/js/out/WebCoreJSBuiltins.cpp
generated
@@ -580,9 +580,9 @@ const char* const s_transformStreamInternalsTransformStreamDefaultControllerErro
|
||||
const JSC::ConstructAbility s_transformStreamInternalsTransformStreamDefaultControllerPerformTransformCodeConstructAbility = JSC::ConstructAbility::CannotConstruct;
|
||||
const JSC::ConstructorKind s_transformStreamInternalsTransformStreamDefaultControllerPerformTransformCodeConstructorKind = JSC::ConstructorKind::None;
|
||||
const JSC::ImplementationVisibility s_transformStreamInternalsTransformStreamDefaultControllerPerformTransformCodeImplementationVisibility = JSC::ImplementationVisibility::Public;
|
||||
const int s_transformStreamInternalsTransformStreamDefaultControllerPerformTransformCodeLength = 275;
|
||||
const int s_transformStreamInternalsTransformStreamDefaultControllerPerformTransformCodeLength = 274;
|
||||
static const JSC::Intrinsic s_transformStreamInternalsTransformStreamDefaultControllerPerformTransformCodeIntrinsic = JSC::NoIntrinsic;
|
||||
const char* const s_transformStreamInternalsTransformStreamDefaultControllerPerformTransformCode = "(function (d,g){\"use strict\";const _=@newPromiseCapability(@Promise);return @getByIdDirectPrivate(d,\"transformAlgorithm\").@call(@undefined,g).@then(()=>{_.@resolve()},(f)=>{@transformStreamError(@getByIdDirectPrivate(d,\"stream\"),f),_.reject.@call(@undefined,f)}),_.promise})\n";
|
||||
const char* const s_transformStreamInternalsTransformStreamDefaultControllerPerformTransformCode = "(function (d,g){\"use strict\";const _=@newPromiseCapability(@Promise);return @getByIdDirectPrivate(d,\"transformAlgorithm\").@call(@undefined,g).@then(()=>{_.resolve()},(f)=>{@transformStreamError(@getByIdDirectPrivate(d,\"stream\"),f),_.reject.@call(@undefined,f)}),_.promise})\n";
|
||||
|
||||
// transformStreamDefaultControllerTerminate
|
||||
const JSC::ConstructAbility s_transformStreamInternalsTransformStreamDefaultControllerTerminateCodeConstructAbility = JSC::ConstructAbility::CannotConstruct;
|
||||
@@ -596,9 +596,9 @@ const char* const s_transformStreamInternalsTransformStreamDefaultControllerTerm
|
||||
const JSC::ConstructAbility s_transformStreamInternalsTransformStreamDefaultSinkWriteAlgorithmCodeConstructAbility = JSC::ConstructAbility::CannotConstruct;
|
||||
const JSC::ConstructorKind s_transformStreamInternalsTransformStreamDefaultSinkWriteAlgorithmCodeConstructorKind = JSC::ConstructorKind::None;
|
||||
const JSC::ImplementationVisibility s_transformStreamInternalsTransformStreamDefaultSinkWriteAlgorithmCodeImplementationVisibility = JSC::ImplementationVisibility::Public;
|
||||
const int s_transformStreamInternalsTransformStreamDefaultSinkWriteAlgorithmCodeLength = 759;
|
||||
const int s_transformStreamInternalsTransformStreamDefaultSinkWriteAlgorithmCodeLength = 758;
|
||||
static const JSC::Intrinsic s_transformStreamInternalsTransformStreamDefaultSinkWriteAlgorithmCodeIntrinsic = JSC::NoIntrinsic;
|
||||
const char* const s_transformStreamInternalsTransformStreamDefaultSinkWriteAlgorithmCode = "(function (d,q){\"use strict\";const j=@getByIdDirectPrivate(d,\"internalWritable\");@assert(@getByIdDirectPrivate(j,\"state\")===\"writable\");const v=@getByIdDirectPrivate(d,\"controller\");if(@getByIdDirectPrivate(d,\"backpressure\")){const _=@newPromiseCapability(@Promise),x=@getByIdDirectPrivate(d,\"backpressureChangePromise\");return @assert(x!==@undefined),x.promise.@then(()=>{const f=@getByIdDirectPrivate(j,\"state\");if(f===\"erroring\"){_.reject.@call(@undefined,@getByIdDirectPrivate(j,\"storedError\"));return}@assert(f===\"writable\"),@transformStreamDefaultControllerPerformTransform(v,q).@then(()=>{_.@resolve()},(z)=>{_.reject.@call(@undefined,z)})},(f)=>{_.reject.@call(@undefined,f)}),_.promise}return @transformStreamDefaultControllerPerformTransform(v,q)})\n";
|
||||
const char* const s_transformStreamInternalsTransformStreamDefaultSinkWriteAlgorithmCode = "(function (d,q){\"use strict\";const j=@getByIdDirectPrivate(d,\"internalWritable\");@assert(@getByIdDirectPrivate(j,\"state\")===\"writable\");const v=@getByIdDirectPrivate(d,\"controller\");if(@getByIdDirectPrivate(d,\"backpressure\")){const _=@newPromiseCapability(@Promise),x=@getByIdDirectPrivate(d,\"backpressureChangePromise\");return @assert(x!==@undefined),x.promise.@then(()=>{const f=@getByIdDirectPrivate(j,\"state\");if(f===\"erroring\"){_.reject.@call(@undefined,@getByIdDirectPrivate(j,\"storedError\"));return}@assert(f===\"writable\"),@transformStreamDefaultControllerPerformTransform(v,q).@then(()=>{_.resolve()},(z)=>{_.reject.@call(@undefined,z)})},(f)=>{_.reject.@call(@undefined,f)}),_.promise}return @transformStreamDefaultControllerPerformTransform(v,q)})\n";
|
||||
|
||||
// transformStreamDefaultSinkAbortAlgorithm
|
||||
const JSC::ConstructAbility s_transformStreamInternalsTransformStreamDefaultSinkAbortAlgorithmCodeConstructAbility = JSC::ConstructAbility::CannotConstruct;
|
||||
@@ -612,9 +612,9 @@ const char* const s_transformStreamInternalsTransformStreamDefaultSinkAbortAlgor
|
||||
const JSC::ConstructAbility s_transformStreamInternalsTransformStreamDefaultSinkCloseAlgorithmCodeConstructAbility = JSC::ConstructAbility::CannotConstruct;
|
||||
const JSC::ConstructorKind s_transformStreamInternalsTransformStreamDefaultSinkCloseAlgorithmCodeConstructorKind = JSC::ConstructorKind::None;
|
||||
const JSC::ImplementationVisibility s_transformStreamInternalsTransformStreamDefaultSinkCloseAlgorithmCodeImplementationVisibility = JSC::ImplementationVisibility::Public;
|
||||
const int s_transformStreamInternalsTransformStreamDefaultSinkCloseAlgorithmCodeLength = 786;
|
||||
const int s_transformStreamInternalsTransformStreamDefaultSinkCloseAlgorithmCodeLength = 785;
|
||||
static const JSC::Intrinsic s_transformStreamInternalsTransformStreamDefaultSinkCloseAlgorithmCodeIntrinsic = JSC::NoIntrinsic;
|
||||
const char* const s_transformStreamInternalsTransformStreamDefaultSinkCloseAlgorithmCode = "(function (j){\"use strict\";const _=@getByIdDirectPrivate(j,\"readable\"),I=@getByIdDirectPrivate(j,\"controller\"),k=@getByIdDirectPrivate(_,\"readableStreamController\"),q=@getByIdDirectPrivate(I,\"flushAlgorithm\");@assert(q!==@undefined);const u=@getByIdDirectPrivate(I,\"flushAlgorithm\").@call();@transformStreamDefaultControllerClearAlgorithms(I);const S=@newPromiseCapability(@Promise);return u.@then(()=>{if(@getByIdDirectPrivate(_,\"state\")===@streamErrored){S.reject.@call(@undefined,@getByIdDirectPrivate(_,\"storedError\"));return}if(@readableStreamDefaultControllerCanCloseOrEnqueue(k))@readableStreamDefaultControllerClose(k);S.@resolve()},(v)=>{@transformStreamError(@getByIdDirectPrivate(I,\"stream\"),v),S.reject.@call(@undefined,@getByIdDirectPrivate(_,\"storedError\"))}),S.promise})\n";
|
||||
const char* const s_transformStreamInternalsTransformStreamDefaultSinkCloseAlgorithmCode = "(function (S){\"use strict\";const _=@getByIdDirectPrivate(S,\"readable\"),u=@getByIdDirectPrivate(S,\"controller\"),c=@getByIdDirectPrivate(_,\"readableStreamController\"),g=@getByIdDirectPrivate(u,\"flushAlgorithm\");@assert(g!==@undefined);const j=@getByIdDirectPrivate(u,\"flushAlgorithm\").@call();@transformStreamDefaultControllerClearAlgorithms(u);const I=@newPromiseCapability(@Promise);return j.@then(()=>{if(@getByIdDirectPrivate(_,\"state\")===@streamErrored){I.reject.@call(@undefined,@getByIdDirectPrivate(_,\"storedError\"));return}if(@readableStreamDefaultControllerCanCloseOrEnqueue(c))@readableStreamDefaultControllerClose(c);I.resolve()},(k)=>{@transformStreamError(@getByIdDirectPrivate(u,\"stream\"),k),I.reject.@call(@undefined,@getByIdDirectPrivate(_,\"storedError\"))}),I.promise})\n";
|
||||
|
||||
// transformStreamDefaultSourcePullAlgorithm
|
||||
const JSC::ConstructAbility s_transformStreamInternalsTransformStreamDefaultSourcePullAlgorithmCodeConstructAbility = JSC::ConstructAbility::CannotConstruct;
|
||||
@@ -1506,33 +1506,33 @@ const char* const s_readableStreamInternalsPipeToFinalizeCode = "(function (l){\
|
||||
const JSC::ConstructAbility s_readableStreamInternalsReadableStreamTeeCodeConstructAbility = JSC::ConstructAbility::CannotConstruct;
|
||||
const JSC::ConstructorKind s_readableStreamInternalsReadableStreamTeeCodeConstructorKind = JSC::ConstructorKind::None;
|
||||
const JSC::ImplementationVisibility s_readableStreamInternalsReadableStreamTeeCodeImplementationVisibility = JSC::ImplementationVisibility::Public;
|
||||
const int s_readableStreamInternalsReadableStreamTeeCodeLength = 1102;
|
||||
const int s_readableStreamInternalsReadableStreamTeeCodeLength = 1067;
|
||||
static const JSC::Intrinsic s_readableStreamInternalsReadableStreamTeeCodeIntrinsic = JSC::NoIntrinsic;
|
||||
const char* const s_readableStreamInternalsReadableStreamTeeCode = "(function (i,q){\"use strict\";@assert(@isReadableStream(i)),@assert(typeof q===\"boolean\");var v=@getByIdDirectPrivate(i,\"start\");if(v)@putByIdDirectPrivate(i,\"start\",@undefined),v();const w=new @ReadableStreamDefaultReader(i),_={closedOrErrored:!1,canceled1:!1,canceled2:!1,reason1:@undefined,reason2:@undefined};_.cancelPromiseCapability=@newPromiseCapability(@Promise);const x=@readableStreamTeePullFunction(_,w,q),f={};@putByIdDirectPrivate(f,\"pull\",x),@putByIdDirectPrivate(f,\"cancel\",@readableStreamTeeBranch1CancelFunction(_,i));const g={};@putByIdDirectPrivate(g,\"pull\",x),@putByIdDirectPrivate(g,\"cancel\",@readableStreamTeeBranch2CancelFunction(_,i));const j=new @ReadableStream(f),k=new @ReadableStream(g);return @getByIdDirectPrivate(w,\"closedPromiseCapability\").promise.@then(@undefined,function(y){if(_.closedOrErrored)return;if(@readableStreamDefaultControllerError(j.@readableStreamController,y),@readableStreamDefaultControllerError(k.@readableStreamController,y),_.closedOrErrored=!0,!_.canceled1||!_.canceled2)_.cancelPromiseCapability.resolve.@call()}),_.branch1=j,_.branch2=k,[j,k]})\n";
|
||||
const char* const s_readableStreamInternalsReadableStreamTeeCode = "(function (f,k){\"use strict\";@assert(@isReadableStream(f)),@assert(typeof k===\"boolean\");var q=@getByIdDirectPrivate(f,\"start\");if(q)@putByIdDirectPrivate(f,\"start\",@undefined),q();const v=new @ReadableStreamDefaultReader(f),i={stream:f,canceled1:!1,canceled2:!1,reason1:@undefined,reason2:@undefined,reading:!1,readAgain:!1};i.cancelPromiseCapability=@newPromiseCapability(@Promise);const w=@readableStreamTeePullFunction(i,v,k),_={};@putByIdDirectPrivate(_,\"pull\",w),@putByIdDirectPrivate(_,\"cancel\",@readableStreamTeeBranch1CancelFunction(i,f));const D={};@putByIdDirectPrivate(D,\"pull\",w),@putByIdDirectPrivate(D,\"cancel\",@readableStreamTeeBranch2CancelFunction(i,f));const g=new @ReadableStream(_),j=new @ReadableStream(D);return @getByIdDirectPrivate(v,\"closedPromiseCapability\").promise.@then(@undefined,function(x){if(@readableStreamDefaultControllerError(g.@readableStreamController,x),@readableStreamDefaultControllerError(j.@readableStreamController,x),!i.canceled1||!i.canceled2)i.cancelPromiseCapability.resolve.@call()}),i.branch1=g,i.branch2=j,[g,j]})\n";
|
||||
|
||||
// readableStreamTeePullFunction
|
||||
const JSC::ConstructAbility s_readableStreamInternalsReadableStreamTeePullFunctionCodeConstructAbility = JSC::ConstructAbility::CannotConstruct;
|
||||
const JSC::ConstructorKind s_readableStreamInternalsReadableStreamTeePullFunctionCodeConstructorKind = JSC::ConstructorKind::None;
|
||||
const JSC::ImplementationVisibility s_readableStreamInternalsReadableStreamTeePullFunctionCodeImplementationVisibility = JSC::ImplementationVisibility::Public;
|
||||
const int s_readableStreamInternalsReadableStreamTeePullFunctionCodeLength = 763;
|
||||
const int s_readableStreamInternalsReadableStreamTeePullFunctionCodeLength = 1235;
|
||||
static const JSC::Intrinsic s_readableStreamInternalsReadableStreamTeePullFunctionCodeIntrinsic = JSC::NoIntrinsic;
|
||||
const char* const s_readableStreamInternalsReadableStreamTeePullFunctionCode = "(function (_,c,f){\"use strict\";return function(){@Promise.prototype.@then.@call(@readableStreamDefaultReaderRead(c),function(i){if(@assert(@isObject(i)),@assert(typeof i.done===\"boolean\"),i.done&&!_.closedOrErrored){if(!_.canceled1)@readableStreamDefaultControllerClose(_.branch1.@readableStreamController);if(!_.canceled2)@readableStreamDefaultControllerClose(_.branch2.@readableStreamController);if(_.closedOrErrored=!0,!_.canceled1||!_.canceled2)_.cancelPromiseCapability.resolve.@call()}if(_.closedOrErrored)return;if(!_.canceled1)@readableStreamDefaultControllerEnqueue(_.branch1.@readableStreamController,i.value);if(!_.canceled2)@readableStreamDefaultControllerEnqueue(_.branch2.@readableStreamController,f\?@structuredCloneForStream(i.value):i.value)})}})\n";
|
||||
const char* const s_readableStreamInternalsReadableStreamTeePullFunctionCode = "(function (_,q,v){\"use strict\";var d=function(){if(_.reading)return _.readAgain=!0,@Promise.resolve();return _.reading=!0,@Promise.prototype.@then.@call(@readableStreamDefaultReaderRead(q),function(b){if(@assert(@isObject(b)),@assert(typeof b.done===\"boolean\"),b.done){if(_.reading=!1,!_.canceled1)@readableStreamDefaultControllerClose(_.branch1.@readableStreamController);if(!_.canceled2)@readableStreamDefaultControllerClose(_.branch2.@readableStreamController);if(!_.canceled1||!_.canceled2)_.cancelPromiseCapability.resolve.@call();return}_.readAgain=!1;let f=b.value,j=f;if(!_.canceled2&&v&&typeof f!==\"string\")try{j=@structuredCloneForStream(f)}catch(i){@readableStreamDefaultControllerError(_.branch1.@readableStreamController,i),@readableStreamDefaultControllerError(_.branch2.@readableStreamController,i),@readableStreamCancel(_.stream,i).@then(_.cancelPromiseCapability.resolve,_.cancelPromiseCapability.reject);return}if(!_.canceled1)@readableStreamDefaultControllerEnqueue(_.branch1.@readableStreamController,f);if(!_.canceled2)@readableStreamDefaultControllerEnqueue(_.branch2.@readableStreamController,j);_.reading=!1,@Promise.resolve().@then(()=>{if(_.readAgain)d()})},()=>{_.reading=!1}),@Promise.resolve()};return d})\n";
|
||||
|
||||
// readableStreamTeeBranch1CancelFunction
|
||||
const JSC::ConstructAbility s_readableStreamInternalsReadableStreamTeeBranch1CancelFunctionCodeConstructAbility = JSC::ConstructAbility::CannotConstruct;
|
||||
const JSC::ConstructorKind s_readableStreamInternalsReadableStreamTeeBranch1CancelFunctionCodeConstructorKind = JSC::ConstructorKind::None;
|
||||
const JSC::ImplementationVisibility s_readableStreamInternalsReadableStreamTeeBranch1CancelFunctionCodeImplementationVisibility = JSC::ImplementationVisibility::Public;
|
||||
const int s_readableStreamInternalsReadableStreamTeeBranch1CancelFunctionCodeLength = 257;
|
||||
const int s_readableStreamInternalsReadableStreamTeeBranch1CancelFunctionCodeLength = 255;
|
||||
static const JSC::Intrinsic s_readableStreamInternalsReadableStreamTeeBranch1CancelFunctionCodeIntrinsic = JSC::NoIntrinsic;
|
||||
const char* const s_readableStreamInternalsReadableStreamTeeBranch1CancelFunctionCode = "(function (d,i){\"use strict\";return function(n){if(d.canceled1=!0,d.reason1=n,d.canceled2)@readableStreamCancel(i,[d.reason1,d.reason2]).@then(d.cancelPromiseCapability.@resolve,d.cancelPromiseCapability.@reject);return d.cancelPromiseCapability.promise}})\n";
|
||||
const char* const s_readableStreamInternalsReadableStreamTeeBranch1CancelFunctionCode = "(function (c,d){\"use strict\";return function(n){if(c.canceled1=!0,c.reason1=n,c.canceled2)@readableStreamCancel(d,[c.reason1,c.reason2]).@then(c.cancelPromiseCapability.resolve,c.cancelPromiseCapability.reject);return c.cancelPromiseCapability.promise}})\n";
|
||||
|
||||
// readableStreamTeeBranch2CancelFunction
|
||||
const JSC::ConstructAbility s_readableStreamInternalsReadableStreamTeeBranch2CancelFunctionCodeConstructAbility = JSC::ConstructAbility::CannotConstruct;
|
||||
const JSC::ConstructorKind s_readableStreamInternalsReadableStreamTeeBranch2CancelFunctionCodeConstructorKind = JSC::ConstructorKind::None;
|
||||
const JSC::ImplementationVisibility s_readableStreamInternalsReadableStreamTeeBranch2CancelFunctionCodeImplementationVisibility = JSC::ImplementationVisibility::Public;
|
||||
const int s_readableStreamInternalsReadableStreamTeeBranch2CancelFunctionCodeLength = 257;
|
||||
const int s_readableStreamInternalsReadableStreamTeeBranch2CancelFunctionCodeLength = 255;
|
||||
static const JSC::Intrinsic s_readableStreamInternalsReadableStreamTeeBranch2CancelFunctionCodeIntrinsic = JSC::NoIntrinsic;
|
||||
const char* const s_readableStreamInternalsReadableStreamTeeBranch2CancelFunctionCode = "(function (d,i){\"use strict\";return function(n){if(d.canceled2=!0,d.reason2=n,d.canceled1)@readableStreamCancel(i,[d.reason1,d.reason2]).@then(d.cancelPromiseCapability.@resolve,d.cancelPromiseCapability.@reject);return d.cancelPromiseCapability.promise}})\n";
|
||||
const char* const s_readableStreamInternalsReadableStreamTeeBranch2CancelFunctionCode = "(function (c,d){\"use strict\";return function(n){if(c.canceled2=!0,c.reason2=n,c.canceled1)@readableStreamCancel(d,[c.reason1,c.reason2]).@then(c.cancelPromiseCapability.resolve,c.cancelPromiseCapability.reject);return c.cancelPromiseCapability.promise}})\n";
|
||||
|
||||
// isReadableStream
|
||||
const JSC::ConstructAbility s_readableStreamInternalsIsReadableStreamCodeConstructAbility = JSC::ConstructAbility::CannotConstruct;
|
||||
|
||||
File diff suppressed because one or more lines are too long
@@ -4,15 +4,15 @@ describe("Server", () => {
|
||||
test("should not allow Bun.serve without first argument being a object", () => {
|
||||
expect(() => {
|
||||
//@ts-ignore
|
||||
const server = Bun.serve();
|
||||
server.stop(true);
|
||||
Bun.serve();
|
||||
throw new Error("Should not reach here");
|
||||
}).toThrow("Bun.serve expects an object");
|
||||
|
||||
[undefined, null, 1, "string", true, false, Symbol("symbol")].forEach(value => {
|
||||
expect(() => {
|
||||
//@ts-ignore
|
||||
const server = Bun.serve(value);
|
||||
server.stop(true);
|
||||
Bun.serve(value);
|
||||
throw new Error("Should not reach here");
|
||||
}).toThrow("Bun.serve expects an object");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1074,3 +1074,216 @@ it("does propagate type for Blob", async () => {
|
||||
|
||||
server.stop(true);
|
||||
});
|
||||
|
||||
it("request.url should log successfully", async () => {
|
||||
const fixture = resolve(import.meta.dir, "./fetch.js.txt");
|
||||
const textToExpect = readFileSync(fixture, "utf-8");
|
||||
var expected;
|
||||
await runTest(
|
||||
{
|
||||
fetch(req) {
|
||||
expect(Bun.inspect(req).includes(expected)).toBe(true);
|
||||
return new Response(file(fixture));
|
||||
},
|
||||
},
|
||||
async server => {
|
||||
expected = `http://localhost:${server.port}/helloooo`;
|
||||
const response = await fetch(expected);
|
||||
expect(response.url).toBe(expected);
|
||||
expect(await response.text()).toBe(textToExpect);
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
it("request.url should be based on the Host header", async () => {
|
||||
const fixture = resolve(import.meta.dir, "./fetch.js.txt");
|
||||
const textToExpect = readFileSync(fixture, "utf-8");
|
||||
await runTest(
|
||||
{
|
||||
fetch(req) {
|
||||
expect(req.url).toBe("http://example.com/helloooo");
|
||||
return new Response(file(fixture));
|
||||
},
|
||||
},
|
||||
async server => {
|
||||
const expected = `http://${server.hostname}:${server.port}/helloooo`;
|
||||
const response = await fetch(expected, {
|
||||
headers: {
|
||||
Host: "example.com",
|
||||
},
|
||||
});
|
||||
expect(response.url).toBe(expected);
|
||||
expect(await response.text()).toBe(textToExpect);
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
it("request.json content correct", async () => {
|
||||
const textToExpect = "OK";
|
||||
const requestBody = { test: "123" };
|
||||
await runTest(
|
||||
{
|
||||
async fetch(req) {
|
||||
setTimeout(async () => {
|
||||
try {
|
||||
await req.json();
|
||||
} catch (e: any) {
|
||||
expect(e.message).toBe("Body already used");
|
||||
}
|
||||
}, 0);
|
||||
const body = await req.json();
|
||||
expect(body).toEqual(requestBody);
|
||||
return new Response(textToExpect);
|
||||
},
|
||||
},
|
||||
async server => {
|
||||
const expected = `http://${server.hostname}:${server.port}/helloooo`;
|
||||
const response = await fetch(expected, {
|
||||
method: "POST",
|
||||
body: JSON.stringify(requestBody),
|
||||
});
|
||||
expect(await response.text()).toBe(textToExpect);
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
it("request.clone json() content correct", async () => {
|
||||
const textToExpect = "OK";
|
||||
const requestBody = { test: "123" };
|
||||
await runTest(
|
||||
{
|
||||
async fetch(req) {
|
||||
const body = await req.clone().json();
|
||||
console.log("here");
|
||||
expect(body).toEqual(requestBody);
|
||||
console.log("there");
|
||||
try {
|
||||
const body2 = await req.json();
|
||||
expect(body2).toEqual(requestBody);
|
||||
} catch (e) {
|
||||
throw e;
|
||||
}
|
||||
|
||||
return new Response(textToExpect);
|
||||
},
|
||||
},
|
||||
async server => {
|
||||
const expected = `http://${server.hostname}:${server.port}/helloooo`;
|
||||
const response = await fetch(expected, {
|
||||
method: "POST",
|
||||
body: JSON.stringify(requestBody),
|
||||
});
|
||||
expect(await response.text()).toBe(textToExpect);
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
it("request.clone json() after requesting body", async () => {
|
||||
const textToExpect = "OK";
|
||||
const requestBody = { test: "123" };
|
||||
await runTest(
|
||||
{
|
||||
async fetch(req) {
|
||||
const body = req.body;
|
||||
expect(body instanceof ReadableStream).toBe(true);
|
||||
const cloned = await req.clone().json();
|
||||
expect(cloned).toEqual(requestBody);
|
||||
const cloned2 = await req.clone().json();
|
||||
expect(cloned2).toEqual(requestBody);
|
||||
return new Response(textToExpect);
|
||||
},
|
||||
},
|
||||
async server => {
|
||||
const expected = `http://${server.hostname}:${server.port}/helloooo`;
|
||||
const response = await fetch(expected, {
|
||||
method: "POST",
|
||||
body: JSON.stringify(requestBody),
|
||||
});
|
||||
expect(await response.text()).toBe(textToExpect);
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
it("request.clone json() twice trigger used body", async () => {
|
||||
const textToExpect = "OK";
|
||||
const requestBody = { test: "123" };
|
||||
await runTest(
|
||||
{
|
||||
async fetch(req) {
|
||||
const cloned = req.clone();
|
||||
const body1 = await cloned.json();
|
||||
try {
|
||||
const body2 = await cloned.json();
|
||||
} catch (e: any) {
|
||||
expect(e.message).toBe("Body already used");
|
||||
}
|
||||
expect(JSON.stringify(body1)).toBe(JSON.stringify(requestBody));
|
||||
return new Response(textToExpect);
|
||||
},
|
||||
},
|
||||
async server => {
|
||||
const expected = `http://${server.hostname}:${server.port}/helloooo`;
|
||||
const response = await fetch(expected, {
|
||||
method: "POST",
|
||||
body: JSON.stringify(requestBody),
|
||||
});
|
||||
expect(await response.text()).toBe(textToExpect);
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
it("request.clone not allowed when body is used", async () => {
|
||||
const textToExpect = "OK";
|
||||
const requestBody = { test: "123" };
|
||||
await runTest(
|
||||
{
|
||||
async fetch(req) {
|
||||
const body = await req.json();
|
||||
expect(body).toEqual(requestBody);
|
||||
try {
|
||||
req.clone();
|
||||
expect(1).toBe(2);
|
||||
} catch (e: any) {
|
||||
expect(e.message).toBe("Body already used");
|
||||
}
|
||||
return new Response(textToExpect);
|
||||
},
|
||||
},
|
||||
async server => {
|
||||
const expected = `http://${server.hostname}:${server.port}/helloooo`;
|
||||
const response = await fetch(expected, {
|
||||
method: "POST",
|
||||
body: JSON.stringify(requestBody),
|
||||
});
|
||||
expect(await response.text()).toBe(textToExpect);
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
it("request.clone text() content correct", async () => {
|
||||
const textToExpect = "OK";
|
||||
const requestBody = { test: "123" };
|
||||
await runTest(
|
||||
{
|
||||
async fetch(req) {
|
||||
const cloned = req.clone();
|
||||
const body1 = await cloned.text();
|
||||
try {
|
||||
const body2 = await cloned.text();
|
||||
} catch (e: any) {
|
||||
expect(e.message).toBe("Body already used");
|
||||
}
|
||||
expect(body1).toBe(JSON.stringify(requestBody));
|
||||
return new Response(textToExpect);
|
||||
},
|
||||
},
|
||||
async server => {
|
||||
const expected = `http://${server.hostname}:${server.port}/helloooo`;
|
||||
const response = await fetch(expected, {
|
||||
method: "POST",
|
||||
body: JSON.stringify(requestBody),
|
||||
});
|
||||
expect(await response.text()).toBe(textToExpect);
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
@@ -13,126 +13,139 @@ var port = 0;
|
||||
];
|
||||
const useRequestObjectValues = [true, false];
|
||||
|
||||
for (let RequestPrototypeMixin of BodyMixin) {
|
||||
for (let useRequestObject of useRequestObjectValues) {
|
||||
describe(`Request.prototoype.${RequestPrototypeMixin.name}() ${
|
||||
useRequestObject ? "fetch(req)" : "fetch(url)"
|
||||
}`, () => {
|
||||
const inputFixture = [
|
||||
[JSON.stringify("Hello World"), JSON.stringify("Hello World")],
|
||||
[JSON.stringify("Hello World 123"), Buffer.from(JSON.stringify("Hello World 123")).buffer],
|
||||
[JSON.stringify("Hello World 456"), Buffer.from(JSON.stringify("Hello World 456"))],
|
||||
[
|
||||
JSON.stringify("EXTREMELY LONG VERY LONG STRING WOW SO LONG YOU WONT BELIEVE IT! ".repeat(100)),
|
||||
Buffer.from(
|
||||
for (let doClone of [0, "before", "after"]) {
|
||||
for (let RequestPrototypeMixin of BodyMixin) {
|
||||
for (let useRequestObject of useRequestObjectValues) {
|
||||
describe(`${doClone ? `[clone - ${doClone}] ` : ""}Request.prototoype.${RequestPrototypeMixin.name}() ${
|
||||
useRequestObject ? "fetch(req)" : "fetch(url)"
|
||||
}`, () => {
|
||||
const inputFixture = [
|
||||
[JSON.stringify("Hello World"), JSON.stringify("Hello World")],
|
||||
[JSON.stringify("Hello World 123"), Buffer.from(JSON.stringify("Hello World 123")).buffer],
|
||||
[JSON.stringify("Hello World 456"), Buffer.from(JSON.stringify("Hello World 456"))],
|
||||
[
|
||||
JSON.stringify("EXTREMELY LONG VERY LONG STRING WOW SO LONG YOU WONT BELIEVE IT! ".repeat(100)),
|
||||
),
|
||||
],
|
||||
[
|
||||
JSON.stringify("EXTREMELY LONG 🔥 UTF16 🔥 VERY LONG STRING WOW SO LONG YOU WONT BELIEVE IT! ".repeat(100)),
|
||||
Buffer.from(
|
||||
Buffer.from(
|
||||
JSON.stringify("EXTREMELY LONG VERY LONG STRING WOW SO LONG YOU WONT BELIEVE IT! ".repeat(100)),
|
||||
),
|
||||
],
|
||||
[
|
||||
JSON.stringify(
|
||||
"EXTREMELY LONG 🔥 UTF16 🔥 VERY LONG STRING WOW SO LONG YOU WONT BELIEVE IT! ".repeat(100),
|
||||
),
|
||||
),
|
||||
],
|
||||
];
|
||||
Buffer.from(
|
||||
JSON.stringify(
|
||||
"EXTREMELY LONG 🔥 UTF16 🔥 VERY LONG STRING WOW SO LONG YOU WONT BELIEVE IT! ".repeat(100),
|
||||
),
|
||||
),
|
||||
],
|
||||
];
|
||||
|
||||
for (const [name, input] of inputFixture) {
|
||||
test(`${name.slice(0, Math.min(name.length ?? name.byteLength, 64))}`, async () => {
|
||||
await runInServer(
|
||||
{
|
||||
async fetch(req) {
|
||||
var result = await RequestPrototypeMixin.call(req);
|
||||
if (RequestPrototypeMixin === Request.prototype.json) {
|
||||
result = JSON.stringify(result);
|
||||
}
|
||||
if (typeof result === "string") {
|
||||
expect(result.length).toBe(name.length);
|
||||
expect(result).toBe(name);
|
||||
} else if (result && result instanceof Blob) {
|
||||
expect(result.size).toBe(new TextEncoder().encode(name).byteLength);
|
||||
expect(await result.text()).toBe(name);
|
||||
} else {
|
||||
expect(result.byteLength).toBe(Buffer.from(input).byteLength);
|
||||
expect(Bun.SHA1.hash(result, "base64")).toBe(Bun.SHA1.hash(input, "base64"));
|
||||
}
|
||||
return new Response(result, {
|
||||
headers: req.headers,
|
||||
});
|
||||
for (const [name, input] of inputFixture) {
|
||||
test(`${name.slice(0, Math.min(name.length ?? name.byteLength, 64))}`, async () => {
|
||||
await runInServer(
|
||||
{
|
||||
async fetch(req) {
|
||||
if (doClone === "after") {
|
||||
await 1;
|
||||
}
|
||||
|
||||
if (doClone) {
|
||||
req = req.clone();
|
||||
}
|
||||
|
||||
var result = await RequestPrototypeMixin.call(req);
|
||||
|
||||
if (RequestPrototypeMixin === Request.prototype.json) {
|
||||
result = JSON.stringify(result);
|
||||
}
|
||||
if (typeof result === "string") {
|
||||
expect(result.length).toBe(name.length);
|
||||
expect(result).toBe(name);
|
||||
} else if (result && result instanceof Blob) {
|
||||
expect(result.size).toBe(new TextEncoder().encode(name).byteLength);
|
||||
expect(await result.text()).toBe(name);
|
||||
} else {
|
||||
expect(result.byteLength).toBe(Buffer.from(input).byteLength);
|
||||
expect(Bun.SHA1.hash(result, "base64")).toBe(Bun.SHA1.hash(input, "base64"));
|
||||
}
|
||||
return new Response(result, {
|
||||
headers: req.headers,
|
||||
});
|
||||
},
|
||||
},
|
||||
},
|
||||
async url => {
|
||||
var response;
|
||||
async url => {
|
||||
var response;
|
||||
|
||||
// once, then batch of 5
|
||||
// once, then batch of 5
|
||||
|
||||
if (useRequestObject) {
|
||||
response = await fetch(
|
||||
new Request({
|
||||
body: input,
|
||||
method: "POST",
|
||||
url: url,
|
||||
headers: {
|
||||
"content-type": "text/plain",
|
||||
},
|
||||
}),
|
||||
);
|
||||
} else {
|
||||
response = await fetch(url, {
|
||||
body: input,
|
||||
method: "POST",
|
||||
headers: {
|
||||
"content-type": "text/plain",
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
expect(response.status).toBe(200);
|
||||
expect(response.headers.get("content-length")).toBe(String(Buffer.from(input).byteLength));
|
||||
expect(response.headers.get("content-type")).toBe("text/plain");
|
||||
expect(await response.text()).toBe(name);
|
||||
|
||||
var promises = new Array(5);
|
||||
for (let i = 0; i < 5; i++) {
|
||||
if (useRequestObject) {
|
||||
promises[i] = await fetch(
|
||||
response = await fetch(
|
||||
new Request({
|
||||
body: input,
|
||||
method: "POST",
|
||||
url: url,
|
||||
headers: {
|
||||
"content-type": "text/plain",
|
||||
"x-counter": i,
|
||||
},
|
||||
}),
|
||||
);
|
||||
} else {
|
||||
promises[i] = await fetch(url, {
|
||||
response = await fetch(url, {
|
||||
body: input,
|
||||
method: "POST",
|
||||
headers: {
|
||||
"content-type": "text/plain",
|
||||
"x-counter": i,
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
const results = await Promise.all(promises);
|
||||
for (let i = 0; i < 5; i++) {
|
||||
const response = results[i];
|
||||
expect(response.status).toBe(200);
|
||||
expect(response.headers.get("content-length")).toBe(String(Buffer.from(input).byteLength));
|
||||
expect(response.headers.get("content-type")).toBe("text/plain");
|
||||
expect(response.headers.get("x-counter")).toBe(String(i));
|
||||
expect(await response.text()).toBe(name);
|
||||
}
|
||||
},
|
||||
);
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
var promises = new Array(5);
|
||||
for (let i = 0; i < 5; i++) {
|
||||
if (useRequestObject) {
|
||||
promises[i] = await fetch(
|
||||
new Request({
|
||||
body: input,
|
||||
method: "POST",
|
||||
url: url,
|
||||
headers: {
|
||||
"content-type": "text/plain",
|
||||
"x-counter": i,
|
||||
},
|
||||
}),
|
||||
);
|
||||
} else {
|
||||
promises[i] = await fetch(url, {
|
||||
body: input,
|
||||
method: "POST",
|
||||
headers: {
|
||||
"content-type": "text/plain",
|
||||
"x-counter": i,
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
const results = await Promise.all(promises);
|
||||
for (let i = 0; i < 5; i++) {
|
||||
const response = results[i];
|
||||
expect(response.status).toBe(200);
|
||||
expect(response.headers.get("content-length")).toBe(String(Buffer.from(input).byteLength));
|
||||
expect(response.headers.get("content-type")).toBe("text/plain");
|
||||
expect(response.headers.get("x-counter")).toBe(String(i));
|
||||
expect(await response.text()).toBe(name);
|
||||
}
|
||||
},
|
||||
);
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -334,8 +347,212 @@ describe("reader", function () {
|
||||
},
|
||||
);
|
||||
|
||||
for (let position of ["clone-begin", "clone-end"]) {
|
||||
for (let isDirectStream of [true, false]) {
|
||||
it(`streaming back ${thisArray.constructor.name}(${
|
||||
thisArray.byteLength ?? thisArray.size
|
||||
}:${inputLength}) from a request.clone().body at ${position} (${
|
||||
isDirectStream ? "direct" : "web"
|
||||
})`, async () => {
|
||||
var huge = thisArray;
|
||||
gc();
|
||||
|
||||
const expectedHash =
|
||||
huge instanceof Blob
|
||||
? Bun.SHA1.hash(new Uint8Array(await huge.arrayBuffer()), "base64")
|
||||
: Bun.SHA1.hash(huge, "base64");
|
||||
const expectedSize = huge instanceof Blob ? huge.size : huge.byteLength;
|
||||
|
||||
const out = await runInServer(
|
||||
{
|
||||
async fetch(req) {
|
||||
try {
|
||||
if (withDelay) await 1;
|
||||
|
||||
if (position === "clone-begin") {
|
||||
req = req.clone();
|
||||
}
|
||||
|
||||
expect(req.headers.get("x-custom")).toBe("hello");
|
||||
expect(req.headers.get("content-type")).toBe("text/plain");
|
||||
expect(req.headers.get("user-agent")).toBe(navigator.userAgent);
|
||||
|
||||
gc();
|
||||
expect(req.headers.get("x-custom")).toBe("hello");
|
||||
expect(req.headers.get("content-type")).toBe("text/plain");
|
||||
expect(req.headers.get("user-agent")).toBe(navigator.userAgent);
|
||||
|
||||
if (position === "clone-end") {
|
||||
await 1;
|
||||
|
||||
req = req.clone();
|
||||
}
|
||||
|
||||
return new Response(req.body, {
|
||||
headers: req.headers,
|
||||
});
|
||||
} catch (e) {
|
||||
console.error(e);
|
||||
throw e;
|
||||
}
|
||||
},
|
||||
},
|
||||
async url => {
|
||||
gc();
|
||||
const response = await fetch(url, {
|
||||
body: huge,
|
||||
method: "POST",
|
||||
headers: {
|
||||
"content-type": "text/plain",
|
||||
"x-custom": "hello",
|
||||
"x-typed-array": thisArray.constructor.name,
|
||||
},
|
||||
});
|
||||
huge = undefined;
|
||||
expect(response.status).toBe(200);
|
||||
const response_body = new Uint8Array(await response.arrayBuffer());
|
||||
|
||||
expect(response_body.byteLength).toBe(expectedSize);
|
||||
expect(Bun.SHA1.hash(response_body, "base64")).toBe(expectedHash);
|
||||
|
||||
gc();
|
||||
if (!response.headers.has("content-type")) {
|
||||
console.error(Object.fromEntries(response.headers.entries()));
|
||||
}
|
||||
|
||||
expect(response.headers.get("content-type")).toBe("text/plain");
|
||||
gc();
|
||||
},
|
||||
);
|
||||
gc();
|
||||
return out;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
for (let position of ["tee-begin", "tee-end"]) {
|
||||
for (let isDirectStream of [true, false]) {
|
||||
it(`streaming back ${thisArray.constructor.name}(${
|
||||
thisArray.byteLength ?? thisArray.size
|
||||
}:${inputLength}) from a request.body.tee() at ${position} (${
|
||||
isDirectStream ? "direct" : "web"
|
||||
})`, async () => {
|
||||
var huge = thisArray;
|
||||
var called = false;
|
||||
gc();
|
||||
|
||||
const expectedHash =
|
||||
huge instanceof Blob
|
||||
? Bun.SHA1.hash(new Uint8Array(await huge.arrayBuffer()), "base64")
|
||||
: Bun.SHA1.hash(huge, "base64");
|
||||
const expectedSize = huge instanceof Blob ? huge.size : huge.byteLength;
|
||||
|
||||
const out = await runInServer(
|
||||
{
|
||||
async fetch(req) {
|
||||
try {
|
||||
var reader;
|
||||
|
||||
if (withDelay) await 1;
|
||||
|
||||
if (position === "tee-begin") {
|
||||
reader = req.body.tee()[1].getReader();
|
||||
}
|
||||
|
||||
if (position === "tee-end") {
|
||||
await 1;
|
||||
reader = req.body.tee()[1].getReader();
|
||||
}
|
||||
|
||||
expect(req.headers.get("x-custom")).toBe("hello");
|
||||
expect(req.headers.get("content-type")).toBe("text/plain");
|
||||
expect(req.headers.get("user-agent")).toBe(navigator.userAgent);
|
||||
|
||||
gc();
|
||||
expect(req.headers.get("x-custom")).toBe("hello");
|
||||
expect(req.headers.get("content-type")).toBe("text/plain");
|
||||
expect(req.headers.get("user-agent")).toBe(navigator.userAgent);
|
||||
|
||||
const direct = {
|
||||
type: "direct",
|
||||
async pull(controller) {
|
||||
if (withDelay) await 1;
|
||||
|
||||
while (true) {
|
||||
const { done, value } = await reader.read();
|
||||
if (done) {
|
||||
called = true;
|
||||
controller.end();
|
||||
|
||||
return;
|
||||
}
|
||||
controller.write(value);
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
const web = {
|
||||
async start() {
|
||||
if (withDelay) await 1;
|
||||
},
|
||||
async pull(controller) {
|
||||
while (true) {
|
||||
const { done, value } = await reader.read();
|
||||
if (done) {
|
||||
called = true;
|
||||
controller.close();
|
||||
return;
|
||||
}
|
||||
controller.enqueue(value);
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
return new Response(new ReadableStream(isDirectStream ? direct : web), {
|
||||
headers: req.headers,
|
||||
});
|
||||
} catch (e) {
|
||||
console.error(e);
|
||||
throw e;
|
||||
}
|
||||
},
|
||||
},
|
||||
async url => {
|
||||
gc();
|
||||
const response = await fetch(url, {
|
||||
body: huge,
|
||||
method: "POST",
|
||||
headers: {
|
||||
"content-type": "text/plain",
|
||||
"x-custom": "hello",
|
||||
"x-typed-array": thisArray.constructor.name,
|
||||
},
|
||||
});
|
||||
huge = undefined;
|
||||
expect(response.status).toBe(200);
|
||||
const response_body = new Uint8Array(await response.arrayBuffer());
|
||||
|
||||
expect(response_body.byteLength).toBe(expectedSize);
|
||||
expect(Bun.SHA1.hash(response_body, "base64")).toBe(expectedHash);
|
||||
|
||||
gc();
|
||||
if (!response.headers.has("content-type")) {
|
||||
console.error(Object.fromEntries(response.headers.entries()));
|
||||
}
|
||||
|
||||
expect(response.headers.get("content-type")).toBe("text/plain");
|
||||
gc();
|
||||
},
|
||||
);
|
||||
expect(called).toBe(true);
|
||||
gc();
|
||||
return out;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
for (let isDirectStream of [true, false]) {
|
||||
const positions = ["begin", "end"];
|
||||
const positions = ["begin", "end"] as const;
|
||||
const inner = thisArray => {
|
||||
for (let position of positions) {
|
||||
it(`streaming back ${thisArray.constructor.name}(${
|
||||
|
||||
@@ -171,20 +171,34 @@ for (const { body, fn } of bodyTypes) {
|
||||
const streams = [
|
||||
{
|
||||
label: "empty stream",
|
||||
stream: () => new ReadableStream(),
|
||||
stream: () => {
|
||||
var stream = new ReadableStream();
|
||||
stream.cancel();
|
||||
return stream;
|
||||
},
|
||||
content: "",
|
||||
skip: true, // hangs
|
||||
},
|
||||
{
|
||||
label: "custom stream",
|
||||
stream: () =>
|
||||
new ReadableStream({
|
||||
start(controller) {
|
||||
controller.enqueue("hello\n");
|
||||
async start(controller) {
|
||||
await controller.enqueue("hello\n");
|
||||
await controller.close();
|
||||
},
|
||||
}),
|
||||
content: "hello\n",
|
||||
},
|
||||
{
|
||||
label: "custom stream (pull)",
|
||||
stream: () =>
|
||||
new ReadableStream({
|
||||
async pull(controller) {
|
||||
await controller.enqueue("hello\n");
|
||||
await controller.close();
|
||||
},
|
||||
}),
|
||||
content: "hello\n",
|
||||
skip: true, // hangs
|
||||
},
|
||||
{
|
||||
label: "direct stream",
|
||||
@@ -197,7 +211,6 @@ for (const { body, fn } of bodyTypes) {
|
||||
},
|
||||
}),
|
||||
content: "bye\n",
|
||||
skip: true, // hangs
|
||||
},
|
||||
{
|
||||
label: "Bun.file() stream",
|
||||
@@ -230,17 +243,23 @@ for (const { body, fn } of bodyTypes) {
|
||||
content: /Example Domain/,
|
||||
},
|
||||
];
|
||||
for (const { label, stream, content, skip } of streams) {
|
||||
const it = skip ? test.skip : test;
|
||||
it(label, async () => {
|
||||
expect(async () => fn(await stream())).not.toThrow();
|
||||
const text = await fn(await stream()).text();
|
||||
if (typeof content === "string") {
|
||||
expect(text).toBe(content);
|
||||
} else {
|
||||
expect(content.test(text)).toBe(true);
|
||||
}
|
||||
});
|
||||
for (let clone of [false, true]) {
|
||||
for (const { label, stream, content, skip = false } of streams) {
|
||||
const it = skip ? test.skip : test;
|
||||
it((clone ? "clone -> " : "") + label, async () => {
|
||||
expect(async () => fn(await stream())).not.toThrow();
|
||||
let call = fn(await stream());
|
||||
if (clone) {
|
||||
call = call.clone();
|
||||
}
|
||||
const text = await call.text();
|
||||
if (typeof content === "string") {
|
||||
expect(text).toBe(content);
|
||||
} else {
|
||||
expect(content.test(text)).toBe(true);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
test(body.name, async () => {
|
||||
|
||||
Reference in New Issue
Block a user