Compare commits

...

2 Commits

Author SHA1 Message Date
cirospaciari
543c7f7e56 todos 2024-06-12 17:48:15 -03:00
cirospaciari
fea79acde8 wip 2024-06-12 17:48:15 -03:00
3 changed files with 84 additions and 20 deletions

View File

@@ -4,6 +4,7 @@ const JSC = bun.JSC;
pub const WeakRefType = enum(u32) {
None = 0,
FetchResponse = 1,
FetchResponseStream = 2,
};
const WeakImpl = opaque {
pub fn init(globalThis: *JSC.JSGlobalObject, value: JSC.JSValue, refType: WeakRefType, ctx: ?*anyopaque) *WeakImpl {

View File

@@ -10,12 +10,14 @@ namespace Bun {
enum class WeakRefType : uint32_t {
None = 0,
FetchResponse = 1,
FetchResponseStream = 2,
};
typedef void (*WeakRefFinalizeFn)(void* context);
#define FOR_EACH_WEAK_REF_TYPE(macro) \
macro(FetchResponse)
macro(FetchResponse) \
macro(FetchResponseStream)
#define DECLARE_WEAK_REF_OWNER(X) \
extern "C" void Bun__##X##_finalize(void* context);
@@ -32,6 +34,9 @@ public:
case WeakRefType::FetchResponse:
Bun__FetchResponse_finalize(context);
break;
case WeakRefType::FetchResponseStream:
Bun__FetchResponseStream_finalize(context);
break;
default:
break;
}

View File

@@ -24,7 +24,6 @@ const Properties = @import("../base.zig").Properties;
const castObj = @import("../base.zig").castObj;
const getAllocator = @import("../base.zig").getAllocator;
const GetJSPrivateData = @import("../base.zig").GetJSPrivateData;
const Environment = @import("../../env.zig");
const ZigString = JSC.ZigString;
const IdentityContext = @import("../../identity_context.zig").IdentityContext;
@@ -34,6 +33,7 @@ const JSError = JSC.JSError;
const JSGlobalObject = JSC.JSGlobalObject;
const NullableAllocator = bun.NullableAllocator;
const DataURL = @import("../../resolver/data_url.zig").DataURL;
const ReadableStream = @import("./streams.zig").ReadableStream;
const SSLConfig = @import("../api/server.zig").ServerConfig.SSLConfig;
@@ -735,7 +735,7 @@ pub const Fetch = struct {
native_response: ?*Response = null,
ignore_data: bool = false,
/// stream strong ref if any is available
readable_stream_ref: JSC.WebCore.ReadableStream.Strong = .{},
readable_stream: JSC.Weak(FetchTasklet) = .{},
request_headers: Headers = Headers{ .allocator = undefined },
promise: JSC.JSPromise.Strong,
concurrent_task: JSC.ConcurrentTask = .{},
@@ -860,7 +860,7 @@ pub const Fetch = struct {
response.unref();
}
this.readable_stream_ref.deinit();
this.readable_stream.deinit();
this.scheduled_response_buffer.deinit();
this.request_body.detach();
@@ -894,6 +894,19 @@ pub const Fetch = struct {
bun.default_allocator.destroy(reporter);
}
fn getReadableStream(this: *FetchTasklet) ?ReadableStream {
// TODO: add native ref here to check the native source
// we need to check if we have a underlining source with pending resolution
// if (this.native_readable_stream) |readable_stream| {
// return readable_stream;
// }
if (this.readable_stream.get()) |readable_stream_js| {
return ReadableStream.fromJS(readable_stream_js, this.global_this);
}
return null;
}
fn getCurrentResponse(this: *FetchTasklet) ?*Response {
// we need a body to resolve the promise when buffering
if (this.native_response) |response| {
@@ -933,7 +946,7 @@ pub const Fetch = struct {
const err = this.onReject();
err.ensureStillAlive();
// if we are streaming update with error
if (this.readable_stream_ref.get()) |readable| {
if (this.getReadableStream()) |readable| {
if (readable.ptr == .Bytes) {
readable.ptr.Bytes.onData(
.{
@@ -957,7 +970,7 @@ pub const Fetch = struct {
return;
}
if (this.readable_stream_ref.get()) |readable| {
if (this.getReadableStream()) |readable| {
if (readable.ptr == .Bytes) {
readable.ptr.Bytes.size_hint = this.getSizeHint();
// body can be marked as used but we still need to pipe the data
@@ -973,8 +986,8 @@ pub const Fetch = struct {
bun.default_allocator,
);
} else {
var prev = this.readable_stream_ref;
this.readable_stream_ref = .{};
var prev = this.readable_stream;
this.readable_stream = .{};
defer prev.deinit();
readable.ptr.Bytes.onData(
.{
@@ -1378,7 +1391,9 @@ pub const Fetch = struct {
pub fn onReadableStreamAvailable(ctx: *anyopaque, readable: JSC.WebCore.ReadableStream) void {
const this = bun.cast(*FetchTasklet, ctx);
this.readable_stream_ref = JSC.WebCore.ReadableStream.Strong.init(readable, this.global_this);
this.readable_stream = JSC.Weak(FetchTasklet).create(readable.value, this.global_this, .FetchResponseStream, this);
// we need to check if we have a underlining source with pending resolution
// TODO: add a native ref here to check the native source
}
pub fn onStartStreamingRequestBodyCallback(ctx: *anyopaque) JSC.WebCore.DrainResult {
@@ -1496,7 +1511,7 @@ pub const Fetch = struct {
const vm = this.global_this.bunVM();
this.poll_ref.unref(vm);
// clean any remaining refereces
this.readable_stream_ref.deinit();
this.readable_stream.deinit();
this.response.deinit();
if (this.native_response) |response| {
@@ -1507,37 +1522,80 @@ pub const Fetch = struct {
this.ignore_data = true;
}
export fn Bun__FetchResponseStream_finalize(this: *FetchTasklet) callconv(.C) void {
if (this.native_response) |response| {
const body = response.body;
// Four scenarios:
//
// 1. We are already done.
// 2. We are streaming, in which case
// 2a. if we have no promise pending we should ignore the body.
// 2b. if we have a promise pending we should keep loading the body.
// 3. We were buffering, in which case
// 3a. if we have no promise, we should ignore the body.
// 3b. if we have a promise, we should keep loading the body.
// 4. We never started buffering, in which case we should ignore the body.
//
if (body.value != .Locked) {
// Scenario 1
return;
}
// if(this.native_readable_stream) |readable_stream| {
// TODO: check for pending stream reads to solve and if none we should cancel the reader here
// Scenario 2
// return;
// }
if (body.value.Locked.promise) |promise| {
if (promise.isEmptyOrUndefinedOrNull()) {
// Scenario 3a.
this.ignoreRemainingResponseBody();
}
} else {
// Scenario 4.
this.ignoreRemainingResponseBody();
}
}
}
export fn Bun__FetchResponse_finalize(this: *FetchTasklet) callconv(.C) void {
log("onResponseFinalize", .{});
if (this.native_response) |response| {
const body = response.body;
// Three scenarios:
// Four scenarios:
//
// 1. We are streaming, in which case we should not ignore the body.
// 2. We were buffering, in which case
// 2a. if we have no promise, we should ignore the body.
// 2b. if we have a promise, we should keep loading the body.
// 3. We never started buffering, in which case we should ignore the body.
// 1. We are already done.
// 2. We are streaming, in which case we should not ignore the body.
// 3. We were buffering, in which case
// 3a. if we have no promise, we should ignore the body.
// 3b. if we have a promise, we should keep loading the body.
// 4. We never started buffering, in which case we should ignore the body.
//
// Note: We cannot call .get() on the ReadableStreamRef. This is called inside a finalizer.
if (body.value != .Locked or this.readable_stream_ref.held.has()) {
// Scenario 1 or 3.
if (body.value != .Locked or this.readable_stream.has()) {
// Scenario 1 or 2.
return;
}
// if(this.native_readable_stream) |readable_stream| {
// TODO: check for pending stream reads to solve and if none we should cancel the reader here
// return;
// }
if (body.value.Locked.promise) |promise| {
if (promise.isEmptyOrUndefinedOrNull()) {
// Scenario 2b.
// Scenario 3a.
this.ignoreRemainingResponseBody();
}
} else {
// Scenario 3.
// Scenario 4.
this.ignoreRemainingResponseBody();
}
}
}
comptime {
_ = Bun__FetchResponse_finalize;
_ = Bun__FetchResponseStream_finalize;
}
pub fn onResolve(this: *FetchTasklet) JSValue {