Compare commits

...

3 Commits

Author SHA1 Message Date
Ciro Spaciari
8fa8c12656 leak tests 2024-12-04 06:25:06 -08:00
Ciro Spaciari
8d32d8ce21 maybe 2024-12-04 06:07:40 -08:00
Ciro Spaciari
484e762ad7 enable duplex on fetch 2024-12-04 05:00:51 -08:00
15 changed files with 1027 additions and 65 deletions

View File

@@ -9,9 +9,10 @@ enum SinkID : uint8_t {
HTMLRewriterSink = 3,
HTTPResponseSink = 4,
HTTPSResponseSink = 5,
FetchTaskletChunkedRequestSink = 6,
};
static constexpr unsigned numberOfSinkIDs
= 7;
= 8;
}

View File

@@ -3127,6 +3127,12 @@ void GlobalObject::finishCreation(VM& vm)
init.set(prototype);
});
m_JSFetchTaskletChunkedRequestControllerPrototype.initLater(
[](const JSC::LazyProperty<JSC::JSGlobalObject, JSC::JSObject>::Initializer& init) {
auto* prototype = createJSSinkControllerPrototype(init.vm, init.owner, WebCore::SinkID::FetchTaskletChunkedRequestSink);
init.set(prototype);
});
m_performanceObject.initLater(
[](const JSC::LazyProperty<JSC::JSGlobalObject, JSC::JSObject>::Initializer& init) {
auto* globalObject = reinterpret_cast<Zig::GlobalObject*>(init.owner);
@@ -3255,6 +3261,16 @@ void GlobalObject::finishCreation(VM& vm)
init.setConstructor(constructor);
});
m_JSFetchTaskletChunkedRequestSinkClassStructure.initLater(
[](LazyClassStructure::Initializer& init) {
auto* prototype = createJSSinkPrototype(init.vm, init.global, WebCore::SinkID::FetchTaskletChunkedRequestSink);
auto* structure = JSFetchTaskletChunkedRequestSink::createStructure(init.vm, init.global, prototype);
auto* constructor = JSFetchTaskletChunkedRequestSinkConstructor::create(init.vm, init.global, JSFetchTaskletChunkedRequestSinkConstructor::createStructure(init.vm, init.global, init.global->functionPrototype()), jsCast<JSObject*>(prototype));
init.setPrototype(prototype);
init.setStructure(structure);
init.setConstructor(constructor);
});
m_JSBufferClassStructure.initLater(
[](LazyClassStructure::Initializer& init) {
auto prototype = WebCore::createBufferPrototype(init.vm, init.global);
@@ -3810,6 +3826,8 @@ void GlobalObject::visitChildrenImpl(JSCell* cell, Visitor& visitor)
thisObject->m_JSHTTPResponseSinkClassStructure.visit(visitor);
thisObject->m_JSHTTPSResponseControllerPrototype.visit(visitor);
thisObject->m_JSHTTPSResponseSinkClassStructure.visit(visitor);
thisObject->m_JSFetchTaskletChunkedRequestSinkClassStructure.visit(visitor);
thisObject->m_JSFetchTaskletChunkedRequestControllerPrototype.visit(visitor);
thisObject->m_JSSocketAddressStructure.visit(visitor);
thisObject->m_JSSQLStatementStructure.visit(visitor);
thisObject->m_V8GlobalInternals.visit(visitor);
@@ -4349,6 +4367,10 @@ GlobalObject::PromiseFunctions GlobalObject::promiseHandlerID(Zig::FFIFunction h
return GlobalObject::PromiseFunctions::Bun__onResolveEntryPointResult;
} else if (handler == Bun__onRejectEntryPointResult) {
return GlobalObject::PromiseFunctions::Bun__onRejectEntryPointResult;
} else if (handler == Bun__FetchTasklet__onResolveRequestStream) {
return GlobalObject::PromiseFunctions::Bun__FetchTasklet__onResolveRequestStream;
} else if (handler == Bun__FetchTasklet__onRejectRequestStream) {
return GlobalObject::PromiseFunctions::Bun__FetchTasklet__onRejectRequestStream;
} else {
RELEASE_ASSERT_NOT_REACHED();
}

View File

@@ -210,6 +210,11 @@ public:
JSC::JSValue HTTPSResponseSinkPrototype() const { return m_JSHTTPSResponseSinkClassStructure.prototypeInitializedOnMainThread(this); }
JSC::JSValue JSReadableHTTPSResponseSinkControllerPrototype() const { return m_JSHTTPSResponseControllerPrototype.getInitializedOnMainThread(this); }
JSC::Structure* FetchTaskletChunkedRequestSinkStructure() const { return m_JSFetchTaskletChunkedRequestSinkClassStructure.getInitializedOnMainThread(this); }
JSC::JSObject* FetchTaskletChunkedRequestSink() { return m_JSFetchTaskletChunkedRequestSinkClassStructure.constructorInitializedOnMainThread(this); }
JSC::JSValue FetchTaskletChunkedRequestSinkPrototype() const { return m_JSFetchTaskletChunkedRequestSinkClassStructure.prototypeInitializedOnMainThread(this); }
JSC::JSValue JSReadableFetchTaskletChunkedRequestSinkControllerPrototype() const { return m_JSFetchTaskletChunkedRequestControllerPrototype.getInitializedOnMainThread(this); }
JSC::Structure* JSBufferListStructure() const { return m_JSBufferListClassStructure.getInitializedOnMainThread(this); }
JSC::JSObject* JSBufferList() { return m_JSBufferListClassStructure.constructorInitializedOnMainThread(this); }
JSC::JSValue JSBufferListPrototype() const { return m_JSBufferListClassStructure.prototypeInitializedOnMainThread(this); }
@@ -329,8 +334,10 @@ public:
Bun__BodyValueBufferer__onResolveStream,
Bun__onResolveEntryPointResult,
Bun__onRejectEntryPointResult,
Bun__FetchTasklet__onRejectRequestStream,
Bun__FetchTasklet__onResolveRequestStream,
};
static constexpr size_t promiseFunctionsSize = 24;
static constexpr size_t promiseFunctionsSize = 26;
static PromiseFunctions promiseHandlerID(SYSV_ABI EncodedJSValue (*handler)(JSC__JSGlobalObject* arg0, JSC__CallFrame* arg1));
@@ -504,6 +511,8 @@ public:
LazyClassStructure m_JSFileSinkClassStructure;
LazyClassStructure m_JSHTTPResponseSinkClassStructure;
LazyClassStructure m_JSHTTPSResponseSinkClassStructure;
LazyClassStructure m_JSFetchTaskletChunkedRequestSinkClassStructure;
LazyClassStructure m_JSStringDecoderClassStructure;
LazyClassStructure m_NapiClassStructure;
LazyClassStructure m_callSiteStructure;
@@ -533,6 +542,7 @@ public:
LazyProperty<JSGlobalObject, JSMap> m_esmRegistryMap;
LazyProperty<JSGlobalObject, JSObject> m_JSArrayBufferControllerPrototype;
LazyProperty<JSGlobalObject, JSObject> m_JSHTTPSResponseControllerPrototype;
LazyProperty<JSGlobalObject, JSObject> m_JSFetchTaskletChunkedRequestControllerPrototype;
LazyProperty<JSGlobalObject, JSObject> m_JSFileSinkControllerPrototype;
LazyProperty<JSGlobalObject, JSObject> m_subtleCryptoObject;
LazyProperty<JSGlobalObject, Structure> m_JSHTTPResponseController;

View File

@@ -140,6 +140,7 @@ pub const JSArrayBufferSink = JSC.WebCore.ArrayBufferSink.JSSink;
pub const JSHTTPSResponseSink = JSC.WebCore.HTTPSResponseSink.JSSink;
pub const JSHTTPResponseSink = JSC.WebCore.HTTPResponseSink.JSSink;
pub const JSFileSink = JSC.WebCore.FileSink.JSSink;
pub const JSFetchTaskletChunkedRequestSink = JSC.WebCore.FetchTaskletChunkedRequestSink.JSSink;
// WebSocket
pub const WebSocketHTTPClient = @import("../../http/websocket_http_client.zig").WebSocketHTTPClient;
@@ -962,6 +963,7 @@ comptime {
JSArrayBufferSink.shim.ref();
JSHTTPResponseSink.shim.ref();
JSHTTPSResponseSink.shim.ref();
JSFetchTaskletChunkedRequestSink.shim.ref();
JSFileSink.shim.ref();
JSFileSink.shim.ref();
_ = ZigString__free;

View File

@@ -1,11 +1,5 @@
// clang-format off
//-- GENERATED FILE. Do not edit --
//
// To regenerate this file, run:
//
// make headers
//
//-- GENERATED FILE. Do not edit --
// This file used to be generated but now in hard coded.
#pragma once
#include <stddef.h>
@@ -691,6 +685,25 @@ BUN_DECLARE_HOST_FUNCTION(FileSink__start);
ZIG_DECL void FileSink__updateRef(void* arg0, bool arg1);
BUN_DECLARE_HOST_FUNCTION(FileSink__write);
#endif
CPP_DECL JSC__JSValue FetchTaskletChunkedRequestSink__assignToStream(JSC__JSGlobalObject* arg0, JSC__JSValue JSValue1, void* arg2, void** arg3);
CPP_DECL JSC__JSValue FetchTaskletChunkedRequestSink__createObject(JSC__JSGlobalObject* arg0, void* arg1, uintptr_t destructor);
CPP_DECL void FetchTaskletChunkedRequestSink__detachPtr(JSC__JSValue JSValue0);
CPP_DECL void* FetchTaskletChunkedRequestSink__fromJS(JSC__JSGlobalObject* arg0, JSC__JSValue JSValue1);
CPP_DECL void FetchTaskletChunkedRequestSink__onClose(JSC__JSValue JSValue0, JSC__JSValue JSValue1);
CPP_DECL void FetchTaskletChunkedRequestSink__onReady(JSC__JSValue JSValue0, JSC__JSValue JSValue1, JSC__JSValue JSValue2);
#ifdef __cplusplus
ZIG_DECL JSC__JSValue FetchTaskletChunkedRequestSink__close(JSC__JSGlobalObject* arg0, void* arg1);
BUN_DECLARE_HOST_FUNCTION(FetchTaskletChunkedRequestSink__construct);
BUN_DECLARE_HOST_FUNCTION(FetchTaskletChunkedRequestSink__end);
ZIG_DECL JSC__JSValue SYSV_ABI SYSV_ABI FetchTaskletChunkedRequestSink__endWithSink(void* arg0, JSC__JSGlobalObject* arg1);
ZIG_DECL void FetchTaskletChunkedRequestSink__finalize(void* arg0);
BUN_DECLARE_HOST_FUNCTION(FetchTaskletChunkedRequestSink__flush);
BUN_DECLARE_HOST_FUNCTION(FetchTaskletChunkedRequestSink__start);
ZIG_DECL void FetchTaskletChunkedRequestSink__updateRef(void* arg0, bool arg1);
BUN_DECLARE_HOST_FUNCTION(FetchTaskletChunkedRequestSink__write);
#endif
#ifdef __cplusplus
@@ -852,4 +865,7 @@ CPP_DECL bool JSC__CustomGetterSetter__isSetterNull(JSC__CustomGetterSetter *arg
BUN_DECLARE_HOST_FUNCTION(Bun__onResolveEntryPointResult);
BUN_DECLARE_HOST_FUNCTION(Bun__onRejectEntryPointResult);
BUN_DECLARE_HOST_FUNCTION(Bun__FetchTasklet__onResolveRequestStream);
BUN_DECLARE_HOST_FUNCTION(Bun__FetchTasklet__onRejectRequestStream);
#endif

View File

@@ -379,6 +379,13 @@ pub extern fn FileSink__setDestroyCallback(JSValue0: JSC__JSValue, callback: usi
pub extern fn FileSink__fromJS(arg0: *bindings.JSGlobalObject, JSValue1: JSC__JSValue) ?*anyopaque;
pub extern fn FileSink__onClose(JSValue0: JSC__JSValue, JSValue1: JSC__JSValue) void;
pub extern fn FileSink__onReady(JSValue0: JSC__JSValue, JSValue1: JSC__JSValue, JSValue2: JSC__JSValue) void;
pub extern fn FetchTaskletChunkedRequestSink__assignToStream(arg0: *bindings.JSGlobalObject, JSValue1: JSC__JSValue, arg2: ?*anyopaque, arg3: [*c]*anyopaque) JSC__JSValue;
pub extern fn FetchTaskletChunkedRequestSink__createObject(arg0: *bindings.JSGlobalObject, arg1: ?*anyopaque, onDestroyPtrTag: usize) JSC__JSValue;
pub extern fn FetchTaskletChunkedRequestSink__detachPtr(JSValue0: JSC__JSValue) void;
pub extern fn FetchTaskletChunkedRequestSink__setDestroyCallback(JSValue0: JSC__JSValue, callback: usize) void;
pub extern fn FetchTaskletChunkedRequestSink__fromJS(arg0: *bindings.JSGlobalObject, JSValue1: JSC__JSValue) ?*anyopaque;
pub extern fn FetchTaskletChunkedRequestSink__onClose(JSValue0: JSC__JSValue, JSValue1: JSC__JSValue) void;
pub extern fn FetchTaskletChunkedRequestSink__onReady(JSValue0: JSC__JSValue, JSValue1: JSC__JSValue, JSValue2: JSC__JSValue) void;
pub extern fn ZigException__fromException(arg0: [*c]bindings.Exception) ZigException;
pub const JSC__GetterSetter = bindings.GetterSetter;

View File

@@ -743,7 +743,7 @@ pub const DeferredTaskQueue = struct {
};
if (!this.map.values()[i](key)) {
this.map.swapRemoveAt(i);
_ = this.map.swapRemove(key);
last = this.map.count();
} else {
i += 1;

View File

@@ -158,6 +158,18 @@ pub const Body = struct {
return false;
}
pub fn isDisturbed2(this: *const PendingValue, globalObject: *JSC.JSGlobalObject) bool {
if (this.promise != null) {
return true;
}
if (this.readable.get()) |readable| {
return readable.isDisturbed(globalObject);
}
return false;
}
pub fn isStreamingOrBuffering(this: *PendingValue) bool {
return this.readable.held.has() or (this.promise != null and !this.promise.?.isEmptyOrUndefinedOrNull());
}

View File

@@ -765,14 +765,17 @@ pub const Fetch = struct {
};
pub const FetchTasklet = struct {
const log = Output.scoped(.FetchTasklet, false);
pub const FetchTaskletStream = JSC.WebCore.FetchTaskletChunkedRequestSink;
const log = Output.scoped(.FetchTasklet, false);
sink: ?*FetchTaskletStream.JSSink = null,
http: ?*http.AsyncHTTP = null,
result: http.HTTPClientResult = .{},
metadata: ?http.HTTPResponseMetadata = null,
javascript_vm: *VirtualMachine = undefined,
global_this: *JSGlobalObject = undefined,
request_body: HTTPRequestBody = undefined,
/// buffer being used by AsyncHTTP
response_buffer: MutableString = undefined,
/// buffer used to stream response to JS
@@ -814,6 +817,7 @@ pub const Fetch = struct {
hostname: ?[]u8 = null,
is_waiting_body: bool = false,
is_waiting_abort: bool = false,
is_waiting_request_stream_start: bool = false,
mutex: Mutex,
tracker: JSC.AsyncTaskTracker,
@@ -849,6 +853,9 @@ pub const Fetch = struct {
pub const HTTPRequestBody = union(enum) {
AnyBlob: AnyBlob,
Sendfile: http.Sendfile,
ReadableStream: JSC.WebCore.ReadableStream.Strong,
pub const Empty: HTTPRequestBody = .{ .AnyBlob = .{ .Blob = .{} } };
pub fn store(this: *HTTPRequestBody) ?*JSC.WebCore.Blob.Store {
return switch (this.*) {
@@ -867,6 +874,9 @@ pub const Fetch = struct {
pub fn detach(this: *HTTPRequestBody) void {
switch (this.*) {
.AnyBlob => this.AnyBlob.detach(),
.ReadableStream => |*stream| {
stream.deinit();
},
.Sendfile => {
if (@max(this.Sendfile.offset, this.Sendfile.remain) > 0)
_ = bun.sys.close(this.Sendfile.fd);
@@ -875,12 +885,71 @@ pub const Fetch = struct {
},
}
}
pub fn fromJS(globalThis: *JSGlobalObject, value: JSValue) bun.JSError!HTTPRequestBody {
var body_value = try Body.Value.fromJS(globalThis, value);
if (body_value == .Used or (body_value == .Locked and (body_value.Locked.action != .none or body_value.Locked.isDisturbed2(globalThis)))) {
return globalThis.ERR_BODY_ALREADY_USED("body already used", .{}).throw();
}
if (body_value == .Locked) {
if (body_value.Locked.readable.has()) {
// just grab the ref
return FetchTasklet.HTTPRequestBody{ .ReadableStream = body_value.Locked.readable };
}
const readable = body_value.toReadableStream(globalThis);
if (!readable.isEmptyOrUndefinedOrNull() and body_value == .Locked and body_value.Locked.readable.has()) {
return FetchTasklet.HTTPRequestBody{ .ReadableStream = body_value.Locked.readable };
}
}
return FetchTasklet.HTTPRequestBody{ .AnyBlob = body_value.useAsAnyBlob() };
}
pub fn needsToReadFile(this: *HTTPRequestBody) bool {
return switch (this.*) {
.AnyBlob => |blob| blob.needsToReadFile(),
else => false,
};
}
pub fn hasContentTypeFromUser(this: *HTTPRequestBody) bool {
return switch (this.*) {
.AnyBlob => |blob| blob.hasContentTypeFromUser(),
else => false,
};
}
pub fn getAnyBlob(this: *HTTPRequestBody) ?*AnyBlob {
return switch (this.*) {
.AnyBlob => &this.AnyBlob,
else => null,
};
}
pub fn hasBody(this: *HTTPRequestBody) bool {
return switch (this.*) {
.AnyBlob => |blob| blob.size() > 0,
.ReadableStream => |*stream| stream.has(),
.Sendfile => true,
};
}
};
pub fn init(_: std.mem.Allocator) anyerror!FetchTasklet {
return FetchTasklet{};
}
fn clearSink(this: *FetchTasklet) void {
if (this.sink) |wrapper| {
this.sink = null;
wrapper.sink.done = true;
wrapper.sink.ended = true;
wrapper.sink.finalize();
wrapper.detach();
wrapper.sink.destroy();
}
}
fn clearData(this: *FetchTasklet) void {
log("clearData", .{});
const allocator = this.memory_reporter.allocator();
@@ -923,7 +992,9 @@ pub const Fetch = struct {
this.readable_stream_ref.deinit();
this.scheduled_response_buffer.deinit();
this.request_body.detach();
if (this.request_body != .ReadableStream or this.is_waiting_request_stream_start) {
this.request_body.detach();
}
this.abort_reason.deinit();
this.check_server_identity.deinit();
@@ -965,6 +1036,147 @@ pub const Fetch = struct {
return null;
}
pub fn onResolveRequestStream(globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) bun.JSError!JSC.JSValue {
var args = callframe.arguments_old(2);
var this: *@This() = args.ptr[args.len - 1].asPromisePtr(@This());
defer this.deref();
if (this.request_body == .ReadableStream) {
var readable_stream_ref = this.request_body.ReadableStream;
this.request_body.ReadableStream = .{};
defer readable_stream_ref.deinit();
if (readable_stream_ref.get()) |stream| {
stream.done(globalThis);
this.clearSink();
}
}
return JSValue.jsUndefined();
}
pub fn onRejectRequestStream(globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) bun.JSError!JSC.JSValue {
const args = callframe.arguments_old(2);
var this = args.ptr[args.len - 1].asPromisePtr(@This());
defer this.deref();
const err = args.ptr[0];
if (this.request_body == .ReadableStream) {
var readable_stream_ref = this.request_body.ReadableStream;
this.request_body.ReadableStream = .{};
defer readable_stream_ref.deinit();
if (readable_stream_ref.get()) |stream| {
stream.cancel(globalThis);
this.clearSink();
}
}
this.abortListener(err);
return JSValue.jsUndefined();
}
pub const shim = JSC.Shimmer("Bun", "FetchTasklet", @This());
pub const Export = shim.exportFunctions(.{
.onResolveRequestStream = onResolveRequestStream,
.onRejectRequestStream = onRejectRequestStream,
});
comptime {
const jsonResolveRequestStream = JSC.toJSHostFunction(onResolveRequestStream);
@export(jsonResolveRequestStream, .{ .name = Export[0].symbol_name });
const jsonRejectRequestStream = JSC.toJSHostFunction(onRejectRequestStream);
@export(jsonRejectRequestStream, .{ .name = Export[1].symbol_name });
}
pub fn startRequestStream(this: *FetchTasklet) void {
this.is_waiting_request_stream_start = false;
bun.assert(this.request_body == .ReadableStream);
if (this.request_body.ReadableStream.get()) |stream| {
this.ref(); // lets only unref when sink is done
const globalThis = this.global_this;
var response_stream = bun.default_allocator.create(FetchTaskletStream.JSSink) catch unreachable;
response_stream.* = FetchTaskletStream.JSSink{
.sink = .{
.task = this,
.buffer = .{},
.globalThis = globalThis,
},
};
var signal = &response_stream.sink.signal;
this.sink = response_stream;
signal.* = FetchTaskletStream.JSSink.SinkSignal.init(JSValue.zero);
// explicitly set it to a dead pointer
// we use this memory address to disable signals being sent
signal.clear();
bun.assert(signal.isDead());
// We are already corked!
const assignment_result: JSValue = FetchTaskletStream.JSSink.assignToStream(
globalThis,
stream.value,
response_stream,
@as(**anyopaque, @ptrCast(&signal.ptr)),
);
assignment_result.ensureStillAlive();
// assert that it was updated
bun.assert(!signal.isDead());
if (assignment_result.toError()) |err_value| {
response_stream.detach();
this.sink = null;
response_stream.sink.destroy();
return this.abortListener(err_value);
}
if (!assignment_result.isEmptyOrUndefinedOrNull()) {
this.javascript_vm.drainMicrotasks();
assignment_result.ensureStillAlive();
// it returns a Promise when it goes through ReadableStreamDefaultReader
if (assignment_result.asAnyPromise()) |promise| {
switch (promise.status(globalThis.vm())) {
.pending => {
this.ref();
assignment_result.then(
globalThis,
this,
onResolveRequestStream,
onRejectRequestStream,
);
},
.fulfilled => {
var readable_stream_ref = this.request_body.ReadableStream;
this.request_body.ReadableStream = .{};
defer {
stream.done(globalThis);
this.clearSink();
readable_stream_ref.deinit();
}
},
.rejected => {
var readable_stream_ref = this.request_body.ReadableStream;
this.request_body.ReadableStream = .{};
defer {
stream.cancel(globalThis);
this.clearSink();
readable_stream_ref.deinit();
}
this.abortListener(promise.result(globalThis.vm()));
},
}
return;
} else {
// if is not a promise we treat it as Error
response_stream.detach();
this.sink = null;
response_stream.sink.destroy();
return this.abortListener(assignment_result);
}
}
}
}
pub fn onBodyReceived(this: *FetchTasklet) void {
const success = this.result.isSuccess();
const globalThis = this.global_this;
@@ -1141,11 +1353,17 @@ pub const Fetch = struct {
this.deref();
}
}
if (this.is_waiting_request_stream_start and this.result.can_stream) {
// start streaming
this.startRequestStream();
}
// if we already respond the metadata and still need to process the body
if (this.is_waiting_body) {
this.onBodyReceived();
return;
}
if (this.metadata == null and this.result.isSuccess()) return;
// if we abort because of cert error
// we wait the Http Client because we already have the response
// we just need to deinit
@@ -1195,7 +1413,6 @@ pub const Fetch = struct {
this.promise.deinit();
}
const success = this.result.isSuccess();
const result = switch (success) {
true => JSC.Strong.create(this.onResolve(), globalThis),
false => brk: {
@@ -1713,7 +1930,18 @@ pub const Fetch = struct {
.tls_props = fetch_options.ssl_config,
},
);
// enable streaming the write side
const isStream = fetch_tasklet.request_body == .ReadableStream;
fetch_tasklet.http.?.client.flags.is_streaming_request_body = isStream;
fetch_tasklet.is_waiting_request_stream_start = isStream;
if (isStream) {
fetch_tasklet.http.?.request_body = .{
.stream = .{
.buffer = .{},
.ended = false,
},
};
}
// TODO is this necessary? the http client already sets the redirect type,
// so manually setting it here seems redundant
if (fetch_options.redirect_type != FetchRedirect.follow) {
@@ -1740,6 +1968,22 @@ pub const Fetch = struct {
log("abortListener", .{});
reason.ensureStillAlive();
this.abort_reason.set(this.global_this, reason);
this.abortTask();
if (this.sink) |wrapper| {
wrapper.sink.abort();
return;
}
}
pub fn sendRequestData(this: *FetchTasklet, data: []const u8, ended: bool) void {
if (this.http) |http_| {
http.http_thread.scheduleRequestWrite(http_, data, ended);
} else if (data.len != 3) {
bun.default_allocator.free(data);
}
}
pub fn abortTask(this: *FetchTasklet) void {
this.signal_store.aborted.store(true, .monotonic);
this.tracker.didCancel(this.global_this);
@@ -2017,9 +2261,7 @@ pub const Fetch = struct {
// which is important for FormData.
// https://github.com/oven-sh/bun/issues/2264
//
var body: AnyBlob = AnyBlob{
.Blob = .{},
};
var body: FetchTasklet.HTTPRequestBody = FetchTasklet.HTTPRequestBody.Empty;
var disable_timeout = false;
var disable_keepalive = false;
@@ -2559,8 +2801,7 @@ pub const Fetch = struct {
if (options_object) |options| {
if (options.fastGet(globalThis, .body)) |body__| {
if (!body__.isUndefined()) {
var body_value = try Body.Value.fromJS(ctx, body__);
break :extract_body body_value.useAsAnyBlob();
break :extract_body try FetchTasklet.HTTPRequestBody.fromJS(ctx, body__);
}
}
@@ -2575,20 +2816,29 @@ pub const Fetch = struct {
return globalThis.ERR_BODY_ALREADY_USED("Request body already used", .{}).throw();
}
break :extract_body req.body.value.useAsAnyBlob();
if (req.body.value == .Locked) {
if (req.body.value.Locked.readable.has()) {
break :extract_body FetchTasklet.HTTPRequestBody{ .ReadableStream = JSC.WebCore.ReadableStream.Strong.init(req.body.value.Locked.readable.get().?, globalThis) };
}
const readable = req.body.value.toReadableStream(globalThis);
if (!readable.isEmptyOrUndefinedOrNull() and req.body.value == .Locked and req.body.value.Locked.readable.has()) {
break :extract_body FetchTasklet.HTTPRequestBody{ .ReadableStream = JSC.WebCore.ReadableStream.Strong.init(req.body.value.Locked.readable.get().?, globalThis) };
}
}
break :extract_body FetchTasklet.HTTPRequestBody{ .AnyBlob = req.body.value.useAsAnyBlob() };
}
if (request_init_object) |req| {
if (req.fastGet(globalThis, .body)) |body__| {
if (!body__.isUndefined()) {
var body_value = try Body.Value.fromJS(ctx, body__);
break :extract_body body_value.useAsAnyBlob();
break :extract_body try FetchTasklet.HTTPRequestBody.fromJS(ctx, body__);
}
}
}
break :extract_body null;
} orelse AnyBlob{ .Blob = .{} };
} orelse FetchTasklet.HTTPRequestBody.Empty;
if (globalThis.hasException()) {
is_error = true;
@@ -2682,7 +2932,7 @@ pub const Fetch = struct {
hostname = _hostname.toOwnedSliceZ(allocator) catch bun.outOfMemory();
}
break :extract_headers Headers.from(headers_, allocator, .{ .body = &body }) catch bun.outOfMemory();
break :extract_headers Headers.from(headers_, allocator, .{ .body = body.getAnyBlob() }) catch bun.outOfMemory();
}
break :extract_headers headers;
@@ -2816,27 +3066,25 @@ pub const Fetch = struct {
}
}
if (!method.hasRequestBody() and body.size() > 0) {
if (!method.hasRequestBody() and body.hasBody()) {
const err = JSC.toTypeError(.ERR_INVALID_ARG_VALUE, fetch_error_unexpected_body, .{}, ctx);
is_error = true;
return JSPromise.rejectedPromiseValue(globalThis, err);
}
if (headers == null and body.size() > 0 and body.hasContentTypeFromUser()) {
if (headers == null and body.hasBody() and body.hasContentTypeFromUser()) {
headers = Headers.from(
null,
allocator,
.{ .body = &body },
.{ .body = body.getAnyBlob() },
) catch bun.outOfMemory();
}
var http_body = FetchTasklet.HTTPRequestBody{
.AnyBlob = body,
};
var http_body = body;
if (body.needsToReadFile()) {
prepare_body: {
const opened_fd_res: JSC.Maybe(bun.FileDescriptor) = switch (body.Blob.store.?.data.file.pathlike) {
const opened_fd_res: JSC.Maybe(bun.FileDescriptor) = switch (body.store().?.data.file.pathlike) {
.fd => |fd| bun.sys.dup(fd),
.path => |path| bun.sys.open(path.sliceZ(&globalThis.bunVM().nodeFS().sync_error_buf), if (Environment.isWindows) bun.O.RDONLY else bun.O.RDONLY | bun.O.NOCTTY, 0),
};
@@ -2870,7 +3118,7 @@ pub const Fetch = struct {
break :use_sendfile;
}
const original_size = body.Blob.size;
const original_size = body.AnyBlob.Blob.size;
const stat_size = @as(Blob.SizeType, @intCast(stat.size));
const blob_size = if (bun.isRegularFile(stat.mode))
stat_size
@@ -2880,8 +3128,8 @@ pub const Fetch = struct {
http_body = .{
.Sendfile = .{
.fd = opened_fd,
.remain = body.Blob.offset + original_size,
.offset = body.Blob.offset,
.remain = body.AnyBlob.Blob.offset + original_size,
.offset = body.AnyBlob.Blob.offset,
.content_size = blob_size,
},
};
@@ -2902,13 +3150,13 @@ pub const Fetch = struct {
.{
.encoding = .buffer,
.path = .{ .fd = opened_fd },
.offset = body.Blob.offset,
.max_size = body.Blob.size,
.offset = body.AnyBlob.Blob.offset,
.max_size = body.AnyBlob.Blob.size,
},
.sync,
);
if (body.Blob.store.?.data.file.pathlike == .path) {
if (body.store().?.data.file.pathlike == .path) {
_ = bun.sys.close(opened_fd);
}
@@ -2922,8 +3170,8 @@ pub const Fetch = struct {
},
.result => |result| {
body.detach();
body.from(std.ArrayList(u8).fromOwnedSlice(allocator, @constCast(result.slice())));
http_body = .{ .AnyBlob = body };
body.AnyBlob.from(std.ArrayList(u8).fromOwnedSlice(allocator, @constCast(result.slice())));
http_body = .{ .AnyBlob = body.AnyBlob };
},
}
}
@@ -2993,9 +3241,7 @@ pub const Fetch = struct {
body.detach();
} else {
// These are single-use, and have effectively been moved to the FetchTasklet.
body = .{
.Blob = .{},
};
body = FetchTasklet.HTTPRequestBody.Empty;
}
proxy = null;
url_proxy_buffer = "";

View File

@@ -470,6 +470,7 @@ pub const StreamStart = union(Tag) {
FileSink: FileSinkOptions,
HTTPSResponseSink: void,
HTTPResponseSink: void,
FetchTaskletChunkedRequestSink: void,
ready: void,
owned_and_done: bun.ByteList,
done: bun.ByteList,
@@ -496,6 +497,7 @@ pub const StreamStart = union(Tag) {
FileSink,
HTTPSResponseSink,
HTTPResponseSink,
FetchTaskletChunkedRequestSink,
ready,
owned_and_done,
done,
@@ -646,7 +648,7 @@ pub const StreamStart = union(Tag) {
},
};
},
.HTTPSResponseSink, .HTTPResponseSink => {
.FetchTaskletChunkedRequestSink, .HTTPSResponseSink, .HTTPResponseSink => {
var empty = true;
var chunk_size: JSC.WebCore.Blob.SizeType = 2048;
@@ -2600,7 +2602,266 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
}
pub const HTTPSResponseSink = HTTPServerWritable(true);
pub const HTTPResponseSink = HTTPServerWritable(false);
pub const FetchTaskletChunkedRequestSink = struct {
task: ?*JSC.WebCore.Fetch.FetchTasklet = null,
signal: Signal = .{},
globalThis: *JSGlobalObject = undefined,
highWaterMark: Blob.SizeType = 2048,
buffer: bun.io.StreamBuffer,
ended: bool = false,
done: bool = false,
auto_flusher: AutoFlusher = AutoFlusher{},
fn unregisterAutoFlusher(this: *@This()) void {
if (this.auto_flusher.registered)
AutoFlusher.unregisterDeferredMicrotaskWithTypeUnchecked(@This(), this, this.globalThis.bunVM());
}
fn registerAutoFlusher(this: *@This()) void {
if (!this.auto_flusher.registered)
AutoFlusher.registerDeferredMicrotaskWithTypeUnchecked(@This(), this, this.globalThis.bunVM());
}
pub fn onAutoFlush(this: *@This()) bool {
if (this.done) {
this.auto_flusher.registered = false;
return false;
}
_ = this.internalFlush() catch 0;
return false;
}
pub fn start(this: *@This(), stream_start: StreamStart) JSC.Maybe(void) {
if (this.ended) {
return .{ .result = {} };
}
switch (stream_start) {
.chunk_size => |chunk_size| {
if (chunk_size > 0) {
this.highWaterMark = chunk_size;
}
},
else => {},
}
this.ended = false;
this.signal.start();
return .{ .result = {} };
}
pub fn connect(this: *@This(), signal: Signal) void {
this.signal = signal;
}
pub fn sink(this: *@This()) Sink {
return Sink.init(this);
}
pub fn finalize(this: *@This()) void {
var buffer = this.buffer;
this.buffer = .{};
buffer.deinit();
if (this.task) |task| {
this.task = null;
task.deref();
}
}
pub fn send(this: *@This(), data: []const u8, is_last: bool) !void {
if (this.done) return;
if (this.task) |task| {
if (is_last) this.done = true;
if (data.len == 0) {
task.sendRequestData(bun.http.end_of_chunked_http1_1_encoding_response_body, true);
return;
}
// chunk encoding is really simple
if (is_last) {
const chunk = std.fmt.allocPrint(bun.default_allocator, "{x}\r\n{s}\r\n0\r\n\r\n", .{ data.len, data }) catch return error.OOM;
task.sendRequestData(chunk, true);
} else {
const chunk = std.fmt.allocPrint(bun.default_allocator, "{x}\r\n{s}\r\n", .{ data.len, data }) catch return error.OOM;
task.sendRequestData(chunk, false);
}
}
}
pub fn internalFlush(this: *@This()) !usize {
this.unregisterAutoFlusher();
if (this.done) return 0;
var flushed: usize = 0;
// we need to respect the max len for the chunk
while (this.buffer.isNotEmpty()) {
const bytes = this.buffer.slice();
const len: u32 = @min(bytes.len, std.math.maxInt(u32));
try this.send(bytes, this.buffer.list.items.len - (this.buffer.cursor + len) == 0 and this.ended);
flushed += len;
this.buffer.cursor = len;
if (this.buffer.isEmpty()) {
this.buffer.reset();
}
}
if (this.ended and !this.done) {
try this.send("", true);
this.finalize();
}
return flushed;
}
pub fn flush(this: *@This()) JSC.Maybe(void) {
_ = this.internalFlush() catch 0;
return .{ .result = {} };
}
pub fn flushFromJS(this: *@This(), globalThis: *JSGlobalObject, _: bool) JSC.Maybe(JSValue) {
return .{ .result = JSC.JSPromise.resolvedPromiseValue(globalThis, JSValue.jsNumber(this.internalFlush() catch 0)) };
}
pub fn destroy(this: *@This()) void {
this.finalize();
bun.default_allocator.destroy(this);
}
pub fn abort(this: *@This()) void {
this.ended = true;
this.done = true;
this.signal.close(null);
this.finalize();
}
pub fn write(this: *@This(), data: StreamResult) StreamResult.Writable {
if (this.ended) {
return .{ .owned = 0 };
}
const bytes = data.slice();
const len = @as(Blob.SizeType, @truncate(bytes.len));
if (this.buffer.size() == 0 and len >= this.highWaterMark) {
// fast path:
// - large-ish chunk
this.send(bytes, false) catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
return .{ .owned = len };
} else if (this.buffer.size() + len >= this.highWaterMark) {
_ = this.buffer.write(bytes) catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
_ = this.internalFlush() catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
return .{ .owned = len };
} else {
// queue the data wait until highWaterMark is reached or the auto flusher kicks in
this.buffer.write(bytes) catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
}
this.registerAutoFlusher();
return .{ .owned = len };
}
pub const writeBytes = write;
pub fn writeLatin1(this: *@This(), data: StreamResult) StreamResult.Writable {
if (this.ended) {
return .{ .owned = 0 };
}
const bytes = data.slice();
const len = @as(Blob.SizeType, @truncate(bytes.len));
if (this.buffer.size() == 0 and len >= this.highWaterMark) {
// common case
if (strings.isAllASCII(bytes)) {
// fast path:
// - large-ish chunk
this.send(bytes, false) catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
return .{ .owned = len };
}
this.buffer.writeLatin1(bytes) catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
_ = this.internalFlush() catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
return .{ .owned = len };
} else if (this.buffer.size() + len >= this.highWaterMark) {
// kinda fast path:
// - combined chunk is large enough to flush automatically
this.buffer.writeLatin1(bytes) catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
_ = this.internalFlush() catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
return .{ .owned = len };
} else {
this.buffer.writeLatin1(bytes) catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
}
this.registerAutoFlusher();
return .{ .owned = len };
}
pub fn writeUTF16(this: *@This(), data: StreamResult) StreamResult.Writable {
if (this.ended) {
return .{ .owned = 0 };
}
const bytes = data.slice();
// we must always buffer UTF-16
// we assume the case of all-ascii UTF-16 string is pretty uncommon
this.buffer.writeUTF16(@alignCast(std.mem.bytesAsSlice(u16, bytes))) catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
const readable = this.buffer.slice();
if (readable.len >= this.highWaterMark) {
_ = this.internalFlush() catch {
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
};
return .{ .owned = @as(Blob.SizeType, @intCast(bytes.len)) };
}
this.registerAutoFlusher();
return .{ .owned = @as(Blob.SizeType, @intCast(bytes.len)) };
}
pub fn end(this: *@This(), err: ?Syscall.Error) JSC.Maybe(void) {
if (this.ended) {
return .{ .result = {} };
}
// send EOF
this.ended = true;
// flush everything and send EOF
_ = this.internalFlush() catch 0;
this.signal.close(err);
return .{ .result = {} };
}
pub fn endFromJS(this: *@This(), _: *JSGlobalObject) JSC.Maybe(JSValue) {
if (this.ended) {
return .{ .result = JSC.JSValue.jsNumber(0) };
}
if (this.done) {
this.ended = true;
this.signal.close(null);
this.finalize();
return .{ .result = JSC.JSValue.jsNumber(0) };
}
_ = this.end(null);
return .{ .result = JSC.JSValue.jsNumber(0) };
}
const name = "FetchTaskletChunkedRequestSink";
pub const JSSink = NewJSSink(@This(), name);
};
pub const BufferedReadableStreamAction = enum {
text,
arrayBuffer,

View File

@@ -1,6 +1,12 @@
import { join, resolve } from "path";
const classes = ["ArrayBufferSink", "FileSink", "HTTPResponseSink", "HTTPSResponseSink"];
const classes = [
"ArrayBufferSink",
"FileSink",
"HTTPResponseSink",
"HTTPSResponseSink",
"FetchTaskletChunkedRequestSink",
];
function names(name) {
return {

View File

@@ -69,7 +69,7 @@ var shared_request_headers_buf: [256]picohttp.Header = undefined;
// this doesn't need to be stack memory because it is immediately cloned after use
var shared_response_headers_buf: [256]picohttp.Header = undefined;
const end_of_chunked_http1_1_encoding_response_body = "0\r\n\r\n";
pub const end_of_chunked_http1_1_encoding_response_body = "0\r\n\r\n";
pub const Signals = struct {
header_progress: ?*std.atomic.Value(bool) = null,
@@ -118,11 +118,31 @@ pub const FetchRedirect = enum(u8) {
pub const HTTPRequestBody = union(enum) {
bytes: []const u8,
sendfile: Sendfile,
stream: struct {
buffer: bun.io.StreamBuffer,
ended: bool,
pub fn hasEnded(this: *@This()) bool {
return this.ended and this.buffer.isEmpty();
}
},
pub fn isStream(this: *const HTTPRequestBody) bool {
return this.* == .stream;
}
pub fn deinit(this: *HTTPRequestBody) void {
switch (this.*) {
.sendfile, .bytes => {},
.stream => |*stream| stream.buffer.deinit(),
}
}
pub fn len(this: *const HTTPRequestBody) usize {
return switch (this.*) {
.bytes => this.bytes.len,
.sendfile => this.sendfile.content_size,
// unknow amounts
.stream => std.math.maxInt(usize),
};
}
};
@@ -555,6 +575,13 @@ fn NewHTTPContext(comptime ssl: bool) type {
return ActiveSocket.from(bun.cast(**anyopaque, ptr).*);
}
pub fn getTaggedFromSocket(socket: HTTPSocket) ActiveSocket {
if (socket.ext(anyopaque)) |ctx| {
return getTagged(ctx);
}
return ActiveSocket.init(&dead_socket);
}
pending_sockets: HiveArray(PooledSocket, pool_size) = HiveArray(PooledSocket, pool_size).init(),
us_socket_context: *uws.SocketContext,
@@ -1001,6 +1028,7 @@ fn NewHTTPContext(comptime ssl: bool) type {
const UnboundedQueue = @import("./bun.js/unbounded_queue.zig").UnboundedQueue;
const Queue = UnboundedQueue(AsyncHTTP, .next);
const ShutdownQueue = UnboundedQueue(AsyncHTTP, .next);
const RequestWriteQueue = UnboundedQueue(AsyncHTTP, .next);
pub const HTTPThread = struct {
loop: *JSC.MiniEventLoop,
@@ -1010,7 +1038,10 @@ pub const HTTPThread = struct {
queued_tasks: Queue = Queue{},
queued_shutdowns: std.ArrayListUnmanaged(ShutdownMessage) = std.ArrayListUnmanaged(ShutdownMessage){},
queued_writes: std.ArrayListUnmanaged(WriteMessage) = std.ArrayListUnmanaged(WriteMessage){},
queued_shutdowns_lock: bun.Lock = .{},
queued_writes_lock: bun.Lock = .{},
queued_proxy_deref: std.ArrayListUnmanaged(*ProxyTunnel) = std.ArrayListUnmanaged(*ProxyTunnel){},
@@ -1020,7 +1051,14 @@ pub const HTTPThread = struct {
lazy_libdeflater: ?*LibdeflateState = null,
const threadlog = Output.scoped(.HTTPThread, true);
const WriteMessage = struct {
data: []const u8,
async_http_id: u32,
flags: packed struct {
is_tls: bool,
ended: bool,
},
};
const ShutdownMessage = struct {
async_http_id: u32,
is_tls: bool,
@@ -1207,6 +1245,57 @@ pub const HTTPThread = struct {
}
this.queued_shutdowns.clearRetainingCapacity();
}
{
this.queued_writes_lock.lock();
defer this.queued_writes_lock.unlock();
for (this.queued_writes.items) |write| {
const ended = write.flags.ended;
defer if (!strings.eqlComptime(write.data, end_of_chunked_http1_1_encoding_response_body)) {
// "0\r\n\r\n" is always a static so no need to free
bun.default_allocator.free(write.data);
};
if (socket_async_http_abort_tracker.get(write.async_http_id)) |socket_ptr| {
if (write.flags.is_tls) {
const socket = uws.SocketTLS.fromAny(socket_ptr);
if (socket.isClosed() or socket.isShutdown()) {
continue;
}
const tagged = NewHTTPContext(true).getTaggedFromSocket(socket);
if (tagged.get(HTTPClient)) |client| {
if (client.state.original_request_body == .stream) {
var stream = &client.state.original_request_body.stream;
stream.buffer.write(write.data) catch {};
stream.ended = ended;
}
client.onWritable(
false,
true,
socket,
);
}
} else {
const socket = uws.SocketTCP.fromAny(socket_ptr);
if (socket.isClosed() or socket.isShutdown()) {
continue;
}
const tagged = NewHTTPContext(false).getTaggedFromSocket(socket);
if (tagged.get(HTTPClient)) |client| {
if (client.state.original_request_body == .stream) {
var stream = &client.state.original_request_body.stream;
stream.buffer.write(write.data) catch {};
stream.ended = ended;
}
client.onWritable(
false,
false,
socket,
);
}
}
}
}
this.queued_writes.clearRetainingCapacity();
}
while (this.queued_proxy_deref.popOrNull()) |http| {
http.deref();
@@ -1282,6 +1371,23 @@ pub const HTTPThread = struct {
this.loop.loop.wakeup();
}
pub fn scheduleRequestWrite(this: *@This(), http: *AsyncHTTP, data: []const u8, ended: bool) void {
{
this.queued_writes_lock.lock();
defer this.queued_writes_lock.unlock();
this.queued_writes.append(bun.default_allocator, .{
.async_http_id = http.async_http_id,
.data = data,
.flags = .{
.is_tls = http.client.isHTTPS(),
.ended = ended,
},
}) catch bun.outOfMemory();
}
if (this.has_awoken.load(.monotonic))
this.loop.loop.wakeup();
}
pub fn scheduleProxyDeref(this: *@This(), proxy: *ProxyTunnel) void {
// this is always called on the http thread
{
@@ -1847,6 +1953,7 @@ pub const InternalState = struct {
info.deinit(bun.default_allocator);
}
this.original_request_body.deinit();
this.* = .{
.body_out_str = body_msg,
.compressed_body = MutableString{ .allocator = default_allocator, .list = .{} },
@@ -2000,6 +2107,7 @@ pub const Flags = packed struct {
proxy_tunneling: bool = false,
reject_unauthorized: bool = true,
is_preconnect_only: bool = false,
is_streaming_request_body: bool = false,
};
// TODO: reduce the size of this struct
@@ -2131,6 +2239,7 @@ pub const Encoding = enum {
const host_header_name = "Host";
const content_length_header_name = "Content-Length";
const chunked_encoded_header = picohttp.Header{ .name = "Transfer-Encoding", .value = "chunked" };
const connection_header = picohttp.Header{ .name = "Connection", .value = "keep-alive" };
const connection_closing_header = picohttp.Header{ .name = "Connection", .value = "close" };
const accept_header = picohttp.Header{ .name = "Accept", .value = "*/*" };
@@ -2745,10 +2854,14 @@ pub fn buildRequest(this: *HTTPClient, body_len: usize) picohttp.Request {
}
if (body_len > 0 or this.method.hasRequestBody()) {
request_headers_buf[header_count] = .{
.name = content_length_header_name,
.value = std.fmt.bufPrint(&this.request_content_len_buf, "{d}", .{body_len}) catch "0",
};
if (this.flags.is_streaming_request_body) {
request_headers_buf[header_count] = chunked_encoded_header;
} else {
request_headers_buf[header_count] = .{
.name = content_length_header_name,
.value = std.fmt.bufPrint(&this.request_content_len_buf, "{d}", .{body_len}) catch "0",
};
}
header_count += 1;
}
@@ -2766,8 +2879,16 @@ pub fn doRedirect(
ctx: *NewHTTPContext(is_ssl),
socket: NewHTTPContext(is_ssl).HTTPSocket,
) void {
if (this.state.original_request_body == .stream) {
// we cannot follow redirect from a stream right now
// NOTE: we can use .tee(), reset the readable stream and cancel/wait pending write requests before redirecting. node.js just errors here so we just closeAndFail too.
this.closeAndFail(error.UnexpectedRedirect, is_ssl, socket);
return;
}
this.unix_socket_path.deinit();
this.unix_socket_path = JSC.ZigString.Slice.empty;
// TODO: what we do with stream body?
const request_body = if (this.state.flags.resend_request_body_on_redirect and this.state.original_request_body == .bytes)
this.state.original_request_body.bytes
else
@@ -3032,6 +3153,10 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
this.state.request_stage = .proxy_handshake;
} else {
this.state.request_stage = .body;
if (this.flags.is_streaming_request_body) {
// lets signal to start streaming the body
this.progressUpdate(is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket);
}
}
return;
}
@@ -3041,11 +3166,15 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
this.state.request_stage = .proxy_handshake;
} else {
this.state.request_stage = .body;
if (this.flags.is_streaming_request_body) {
// lets signal to start streaming the body
this.progressUpdate(is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket);
}
}
assert(
// we should have leftover data OR we use sendfile()
// we should have leftover data OR we use sendfile/stream
(this.state.original_request_body == .bytes and this.state.request_body.len > 0) or
this.state.original_request_body == .sendfile,
this.state.original_request_body == .sendfile or this.state.original_request_body == .stream,
);
// we sent everything, but there's some body leftover
@@ -3076,6 +3205,28 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
return;
}
},
.stream => {
var stream = &this.state.original_request_body.stream;
// to simplify things here the buffer contains the raw data we just need to flush to the socket it
if (stream.buffer.isNotEmpty()) {
const to_send = stream.buffer.slice();
const amount = socket.write(to_send, true);
if (amount < 0) {
// this.closeAndFail(error.WriteFailed, is_ssl, socket);
return;
}
this.state.request_sent_len += @as(usize, @intCast(amount));
stream.buffer.cursor = @intCast(amount);
if (stream.buffer.isEmpty()) {
stream.buffer.reset();
}
}
if (stream.hasEnded()) {
this.state.request_stage = .done;
stream.buffer.deinit();
return;
}
},
.sendfile => |*sendfile| {
if (comptime is_ssl) {
@panic("sendfile is only supported without SSL. This code should never have been reached!");
@@ -3098,21 +3249,44 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
}
},
.proxy_body => {
if (this.state.original_request_body != .bytes) {
@panic("sendfile is only supported without SSL. This code should never have been reached!");
}
if (this.proxy_tunnel) |proxy| {
this.setTimeout(socket, 5);
switch (this.state.original_request_body) {
.bytes => {
this.setTimeout(socket, 5);
const to_send = this.state.request_body;
const amount = proxy.writeData(to_send) catch return; // just wait and retry when onWritable! if closed internally will call proxy.onClose
const to_send = this.state.request_body;
const amount = proxy.writeData(to_send) catch return; // just wait and retry when onWritable! if closed internally will call proxy.onClose
this.state.request_sent_len += @as(usize, @intCast(amount));
this.state.request_body = this.state.request_body[@as(usize, @intCast(amount))..];
this.state.request_sent_len += @as(usize, @intCast(amount));
this.state.request_body = this.state.request_body[@as(usize, @intCast(amount))..];
if (this.state.request_body.len == 0) {
this.state.request_stage = .done;
return;
if (this.state.request_body.len == 0) {
this.state.request_stage = .done;
return;
}
},
.stream => {
var stream = &this.state.original_request_body.stream;
// to simplify things here the buffer contains the raw data we just need to flush to the socket it
if (stream.buffer.isNotEmpty()) {
const to_send = stream.buffer.slice();
const amount = proxy.writeData(to_send) catch return; // just wait and retry when onWritable! if closed internally will call proxy.onClose
this.state.request_sent_len += amount;
stream.buffer.cursor = @truncate(amount);
if (stream.buffer.isEmpty()) {
stream.buffer.reset();
}
}
if (stream.hasEnded()) {
this.state.request_stage = .done;
stream.buffer.deinit();
return;
}
},
.sendfile => {
@panic("sendfile is only supported without SSL. This code should never have been reached!");
},
}
}
},
@@ -3175,6 +3349,10 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
if (has_sent_headers) {
this.state.request_stage = .proxy_body;
if (this.flags.is_streaming_request_body) {
// lets signal to start streaming the body
this.progressUpdate(is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket);
}
assert(this.state.request_body.len > 0);
// we sent everything, but there's some body leftover
@@ -3558,6 +3736,7 @@ pub const HTTPClientResult = struct {
body: ?*MutableString = null,
has_more: bool = false,
redirected: bool = false,
can_stream: bool = false,
fail: ?anyerror = null,
@@ -3665,6 +3844,8 @@ pub fn toResult(this: *HTTPClient) HTTPClientResult {
.has_more = certificate_info != null or (this.state.fail == null and !this.state.isDone()),
.body_size = body_size,
.certificate_info = certificate_info,
// we can stream the request_body at this stage
.can_stream = (this.state.request_stage == .body or this.state.request_stage == .proxy_body) and this.flags.is_streaming_request_body,
};
}

View File

@@ -66,6 +66,19 @@ function getBody() {
case "urlsearchparams":
body = getURLSearchParams();
break;
case "iterator":
body = async function* iter() {
yield (cachedBody ??= getString());
};
break;
case "stream":
body = new ReadableStream({
start(c) {
c.enqueue((cachedBody ??= getBuffer()));
c.close();
},
});
break;
default:
throw new Error(`Invalid type: ${type}`);
}

View File

@@ -99,7 +99,7 @@ describe("fetch doesn't leak", () => {
}
});
describe.each(["FormData", "Blob", "Buffer", "String", "URLSearchParams"])("Sending %s", type => {
describe.each(["FormData", "Blob", "Buffer", "String", "URLSearchParams", "stream", "iterator"])("Sending %s", type => {
test(
"does not leak",
async () => {

View File

@@ -6,7 +6,7 @@ import { mkfifo } from "mkfifo";
import net from "net";
import { join } from "path";
import { gzipSync } from "zlib";
import { Readable } from "stream";
const tmp_dir = tmpdirSync();
const fixture = readFileSync(join(import.meta.dir, "fetch.js.txt"), "utf8").replaceAll("\r\n", "\n");
@@ -2074,3 +2074,188 @@ describe("fetch Response life cycle", () => {
}
});
});
describe("fetch should allow duplex", () => {
it("should allow duplex streaming", async () => {
using server = Bun.serve({
port: 0,
async fetch(req) {
return new Response(req.body);
},
});
const intervalStream = new ReadableStream({
start(c) {
let count = 0;
const timer = setInterval(() => {
c.enqueue("Hello\n");
if (count === 5) {
clearInterval(timer);
c.close();
}
count++;
}, 20);
},
}).pipeThrough(new TextEncoderStream());
const resp = await fetch(server.url, {
method: "POST",
body: intervalStream,
duplex: "half",
});
const reader = resp.body.pipeThrough(new TextDecoderStream()).getReader();
var result = "";
while (true) {
const { value, done } = await reader.read();
if (done) break;
result += value;
}
expect(result).toBe("Hello\n".repeat(6));
});
it("should allow duplex extending Readable (sync)", async () => {
class HelloWorldStream extends Readable {
constructor(options) {
super(options);
this.chunks = ["Hello", " ", "World!"];
this.index = 0;
}
_read(size) {
if (this.index < this.chunks.length) {
this.push(this.chunks[this.index]);
this.index++;
} else {
this.push(null);
}
}
}
using server = Bun.serve({
port: 0,
async fetch(req) {
return new Response(req.body);
},
});
const response = await fetch(server.url, {
body: new HelloWorldStream(),
method: "POST",
duplex: "half",
});
expect(await response.text()).toBe("Hello World!");
});
it("should allow duplex extending Readable (async)", async () => {
class HelloWorldStream extends Readable {
constructor(options) {
super(options);
this.chunks = ["Hello", " ", "World!"];
this.index = 0;
}
_read(size) {
setTimeout(() => {
if (this.index < this.chunks.length) {
this.push(this.chunks[this.index]);
this.index++;
} else {
this.push(null);
}
}, 20);
}
}
using server = Bun.serve({
port: 0,
async fetch(req) {
return new Response(req.body);
},
});
const response = await fetch(server.url, {
body: new HelloWorldStream(),
method: "POST",
duplex: "half",
});
expect(await response.text()).toBe("Hello World!");
});
it("should allow duplex using async iterator (async)", async () => {
using server = Bun.serve({
port: 0,
async fetch(req) {
return new Response(req.body);
},
});
const response = await fetch(server.url, {
body: async function* iter() {
yield "Hello";
await Bun.sleep(20);
yield " ";
await Bun.sleep(20);
yield "World!";
},
method: "POST",
duplex: "half",
});
expect(await response.text()).toBe("Hello World!");
});
it("should fail in redirects .follow when using duplex", async () => {
using server = Bun.serve({
port: 0,
async fetch(req) {
if (req.url.indexOf("/redirect") === -1) {
return Response.redirect("/");
}
return new Response(req.body);
},
});
expect(async () => {
const response = await fetch(server.url, {
body: async function* iter() {
yield "Hello";
await Bun.sleep(20);
yield " ";
await Bun.sleep(20);
yield "World!";
},
method: "POST",
duplex: "half",
});
await response.text();
}).toThrow();
});
it("should work in redirects .manual when using duplex", async () => {
using server = Bun.serve({
port: 0,
async fetch(req) {
if (req.url.indexOf("/redirect") === -1) {
return Response.redirect("/");
}
return new Response(req.body);
},
});
expect(async () => {
const response = await fetch(server.url, {
body: async function* iter() {
yield "Hello";
await Bun.sleep(20);
yield " ";
await Bun.sleep(20);
yield "World!";
},
method: "POST",
duplex: "half",
redirect: "manual",
});
await response.text();
}).not.toThrow();
});
});