Mirror of https://github.com/oven-sh/bun (synced 2026-02-04 16:08:53 +00:00)

Compare commits: dylan/pyth...ciro/duple (3 commits)

| Author | SHA1 | Date |
|---|---|---|
|  | 8fa8c12656 |  |
|  | 8d32d8ce21 |  |
|  | 484e762ad7 |  |
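The commits compared here add a `FetchTaskletChunkedRequestSink` so that `fetch` can stream a request body over the wire instead of buffering it first. As context for the diff below, a minimal usage sketch (the URL is a placeholder; the shape mirrors the duplex tests added at the bottom of this compare):

```ts
// Sketch only: streaming a request body with fetch, as the new duplex tests exercise.
const body = new ReadableStream({
  start(controller) {
    controller.enqueue(new TextEncoder().encode("Hello\n"));
    controller.close();
  },
});

const response = await fetch("http://localhost:3000/", {
  method: "POST",
  body,
  duplex: "half", // required when the request body is a stream
});
console.log(await response.text());
```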
@@ -9,9 +9,10 @@ enum SinkID : uint8_t {
    HTMLRewriterSink = 3,
    HTTPResponseSink = 4,
    HTTPSResponseSink = 5,
+   FetchTaskletChunkedRequestSink = 6,
};

-static constexpr unsigned numberOfSinkIDs = 7;
+static constexpr unsigned numberOfSinkIDs = 8;
}

@@ -3127,6 +3127,12 @@ void GlobalObject::finishCreation(VM& vm)
|
||||
init.set(prototype);
|
||||
});
|
||||
|
||||
m_JSFetchTaskletChunkedRequestControllerPrototype.initLater(
|
||||
[](const JSC::LazyProperty<JSC::JSGlobalObject, JSC::JSObject>::Initializer& init) {
|
||||
auto* prototype = createJSSinkControllerPrototype(init.vm, init.owner, WebCore::SinkID::FetchTaskletChunkedRequestSink);
|
||||
init.set(prototype);
|
||||
});
|
||||
|
||||
m_performanceObject.initLater(
|
||||
[](const JSC::LazyProperty<JSC::JSGlobalObject, JSC::JSObject>::Initializer& init) {
|
||||
auto* globalObject = reinterpret_cast<Zig::GlobalObject*>(init.owner);
|
||||
@@ -3255,6 +3261,16 @@ void GlobalObject::finishCreation(VM& vm)
|
||||
init.setConstructor(constructor);
|
||||
});
|
||||
|
||||
m_JSFetchTaskletChunkedRequestSinkClassStructure.initLater(
|
||||
[](LazyClassStructure::Initializer& init) {
|
||||
auto* prototype = createJSSinkPrototype(init.vm, init.global, WebCore::SinkID::FetchTaskletChunkedRequestSink);
|
||||
auto* structure = JSFetchTaskletChunkedRequestSink::createStructure(init.vm, init.global, prototype);
|
||||
auto* constructor = JSFetchTaskletChunkedRequestSinkConstructor::create(init.vm, init.global, JSFetchTaskletChunkedRequestSinkConstructor::createStructure(init.vm, init.global, init.global->functionPrototype()), jsCast<JSObject*>(prototype));
|
||||
init.setPrototype(prototype);
|
||||
init.setStructure(structure);
|
||||
init.setConstructor(constructor);
|
||||
});
|
||||
|
||||
m_JSBufferClassStructure.initLater(
|
||||
[](LazyClassStructure::Initializer& init) {
|
||||
auto prototype = WebCore::createBufferPrototype(init.vm, init.global);
|
||||
@@ -3810,6 +3826,8 @@ void GlobalObject::visitChildrenImpl(JSCell* cell, Visitor& visitor)
|
||||
thisObject->m_JSHTTPResponseSinkClassStructure.visit(visitor);
|
||||
thisObject->m_JSHTTPSResponseControllerPrototype.visit(visitor);
|
||||
thisObject->m_JSHTTPSResponseSinkClassStructure.visit(visitor);
|
||||
thisObject->m_JSFetchTaskletChunkedRequestSinkClassStructure.visit(visitor);
|
||||
thisObject->m_JSFetchTaskletChunkedRequestControllerPrototype.visit(visitor);
|
||||
thisObject->m_JSSocketAddressStructure.visit(visitor);
|
||||
thisObject->m_JSSQLStatementStructure.visit(visitor);
|
||||
thisObject->m_V8GlobalInternals.visit(visitor);
|
||||
@@ -4349,6 +4367,10 @@ GlobalObject::PromiseFunctions GlobalObject::promiseHandlerID(Zig::FFIFunction h
|
||||
return GlobalObject::PromiseFunctions::Bun__onResolveEntryPointResult;
|
||||
} else if (handler == Bun__onRejectEntryPointResult) {
|
||||
return GlobalObject::PromiseFunctions::Bun__onRejectEntryPointResult;
|
||||
} else if (handler == Bun__FetchTasklet__onResolveRequestStream) {
|
||||
return GlobalObject::PromiseFunctions::Bun__FetchTasklet__onResolveRequestStream;
|
||||
} else if (handler == Bun__FetchTasklet__onRejectRequestStream) {
|
||||
return GlobalObject::PromiseFunctions::Bun__FetchTasklet__onRejectRequestStream;
|
||||
} else {
|
||||
RELEASE_ASSERT_NOT_REACHED();
|
||||
}
|
||||
|
||||
@@ -210,6 +210,11 @@ public:
|
||||
JSC::JSValue HTTPSResponseSinkPrototype() const { return m_JSHTTPSResponseSinkClassStructure.prototypeInitializedOnMainThread(this); }
|
||||
JSC::JSValue JSReadableHTTPSResponseSinkControllerPrototype() const { return m_JSHTTPSResponseControllerPrototype.getInitializedOnMainThread(this); }
|
||||
|
||||
JSC::Structure* FetchTaskletChunkedRequestSinkStructure() const { return m_JSFetchTaskletChunkedRequestSinkClassStructure.getInitializedOnMainThread(this); }
|
||||
JSC::JSObject* FetchTaskletChunkedRequestSink() { return m_JSFetchTaskletChunkedRequestSinkClassStructure.constructorInitializedOnMainThread(this); }
|
||||
JSC::JSValue FetchTaskletChunkedRequestSinkPrototype() const { return m_JSFetchTaskletChunkedRequestSinkClassStructure.prototypeInitializedOnMainThread(this); }
|
||||
JSC::JSValue JSReadableFetchTaskletChunkedRequestSinkControllerPrototype() const { return m_JSFetchTaskletChunkedRequestControllerPrototype.getInitializedOnMainThread(this); }
|
||||
|
||||
JSC::Structure* JSBufferListStructure() const { return m_JSBufferListClassStructure.getInitializedOnMainThread(this); }
|
||||
JSC::JSObject* JSBufferList() { return m_JSBufferListClassStructure.constructorInitializedOnMainThread(this); }
|
||||
JSC::JSValue JSBufferListPrototype() const { return m_JSBufferListClassStructure.prototypeInitializedOnMainThread(this); }
|
||||
@@ -329,8 +334,10 @@ public:
|
||||
Bun__BodyValueBufferer__onResolveStream,
|
||||
Bun__onResolveEntryPointResult,
|
||||
Bun__onRejectEntryPointResult,
|
||||
Bun__FetchTasklet__onRejectRequestStream,
|
||||
Bun__FetchTasklet__onResolveRequestStream,
|
||||
};
|
||||
-    static constexpr size_t promiseFunctionsSize = 24;
+    static constexpr size_t promiseFunctionsSize = 26;
|
||||
|
||||
static PromiseFunctions promiseHandlerID(SYSV_ABI EncodedJSValue (*handler)(JSC__JSGlobalObject* arg0, JSC__CallFrame* arg1));
|
||||
|
||||
@@ -504,6 +511,8 @@ public:
|
||||
LazyClassStructure m_JSFileSinkClassStructure;
|
||||
LazyClassStructure m_JSHTTPResponseSinkClassStructure;
|
||||
LazyClassStructure m_JSHTTPSResponseSinkClassStructure;
|
||||
LazyClassStructure m_JSFetchTaskletChunkedRequestSinkClassStructure;
|
||||
|
||||
LazyClassStructure m_JSStringDecoderClassStructure;
|
||||
LazyClassStructure m_NapiClassStructure;
|
||||
LazyClassStructure m_callSiteStructure;
|
||||
@@ -533,6 +542,7 @@ public:
|
||||
LazyProperty<JSGlobalObject, JSMap> m_esmRegistryMap;
|
||||
LazyProperty<JSGlobalObject, JSObject> m_JSArrayBufferControllerPrototype;
|
||||
LazyProperty<JSGlobalObject, JSObject> m_JSHTTPSResponseControllerPrototype;
|
||||
LazyProperty<JSGlobalObject, JSObject> m_JSFetchTaskletChunkedRequestControllerPrototype;
|
||||
LazyProperty<JSGlobalObject, JSObject> m_JSFileSinkControllerPrototype;
|
||||
LazyProperty<JSGlobalObject, JSObject> m_subtleCryptoObject;
|
||||
LazyProperty<JSGlobalObject, Structure> m_JSHTTPResponseController;
|
||||
|
||||
@@ -140,6 +140,7 @@ pub const JSArrayBufferSink = JSC.WebCore.ArrayBufferSink.JSSink;
|
||||
pub const JSHTTPSResponseSink = JSC.WebCore.HTTPSResponseSink.JSSink;
|
||||
pub const JSHTTPResponseSink = JSC.WebCore.HTTPResponseSink.JSSink;
|
||||
pub const JSFileSink = JSC.WebCore.FileSink.JSSink;
|
||||
pub const JSFetchTaskletChunkedRequestSink = JSC.WebCore.FetchTaskletChunkedRequestSink.JSSink;
|
||||
|
||||
// WebSocket
|
||||
pub const WebSocketHTTPClient = @import("../../http/websocket_http_client.zig").WebSocketHTTPClient;
|
||||
@@ -962,6 +963,7 @@ comptime {
|
||||
JSArrayBufferSink.shim.ref();
|
||||
JSHTTPResponseSink.shim.ref();
|
||||
JSHTTPSResponseSink.shim.ref();
|
||||
JSFetchTaskletChunkedRequestSink.shim.ref();
|
||||
JSFileSink.shim.ref();
|
||||
_ = ZigString__free;
|
||||
|
||||
30  src/bun.js/bindings/headers.h  (generated)
@@ -1,11 +1,5 @@
// clang-format off
//-- GENERATED FILE. Do not edit --
//
// To regenerate this file, run:
//
//   make headers
//
//-- GENERATED FILE. Do not edit --
// This file used to be generated but is now hard-coded.
#pragma once

#include <stddef.h>
@@ -691,6 +685,25 @@ BUN_DECLARE_HOST_FUNCTION(FileSink__start);
|
||||
ZIG_DECL void FileSink__updateRef(void* arg0, bool arg1);
|
||||
BUN_DECLARE_HOST_FUNCTION(FileSink__write);
|
||||
|
||||
#endif
|
||||
CPP_DECL JSC__JSValue FetchTaskletChunkedRequestSink__assignToStream(JSC__JSGlobalObject* arg0, JSC__JSValue JSValue1, void* arg2, void** arg3);
|
||||
CPP_DECL JSC__JSValue FetchTaskletChunkedRequestSink__createObject(JSC__JSGlobalObject* arg0, void* arg1, uintptr_t destructor);
|
||||
CPP_DECL void FetchTaskletChunkedRequestSink__detachPtr(JSC__JSValue JSValue0);
|
||||
CPP_DECL void* FetchTaskletChunkedRequestSink__fromJS(JSC__JSGlobalObject* arg0, JSC__JSValue JSValue1);
|
||||
CPP_DECL void FetchTaskletChunkedRequestSink__onClose(JSC__JSValue JSValue0, JSC__JSValue JSValue1);
|
||||
CPP_DECL void FetchTaskletChunkedRequestSink__onReady(JSC__JSValue JSValue0, JSC__JSValue JSValue1, JSC__JSValue JSValue2);
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
||||
ZIG_DECL JSC__JSValue FetchTaskletChunkedRequestSink__close(JSC__JSGlobalObject* arg0, void* arg1);
|
||||
BUN_DECLARE_HOST_FUNCTION(FetchTaskletChunkedRequestSink__construct);
|
||||
BUN_DECLARE_HOST_FUNCTION(FetchTaskletChunkedRequestSink__end);
|
||||
ZIG_DECL JSC__JSValue SYSV_ABI SYSV_ABI FetchTaskletChunkedRequestSink__endWithSink(void* arg0, JSC__JSGlobalObject* arg1);
|
||||
ZIG_DECL void FetchTaskletChunkedRequestSink__finalize(void* arg0);
|
||||
BUN_DECLARE_HOST_FUNCTION(FetchTaskletChunkedRequestSink__flush);
|
||||
BUN_DECLARE_HOST_FUNCTION(FetchTaskletChunkedRequestSink__start);
|
||||
ZIG_DECL void FetchTaskletChunkedRequestSink__updateRef(void* arg0, bool arg1);
|
||||
BUN_DECLARE_HOST_FUNCTION(FetchTaskletChunkedRequestSink__write);
|
||||
#endif
|
||||
|
||||
#ifdef __cplusplus
|
||||
@@ -852,4 +865,7 @@ CPP_DECL bool JSC__CustomGetterSetter__isSetterNull(JSC__CustomGetterSetter *arg
|
||||
BUN_DECLARE_HOST_FUNCTION(Bun__onResolveEntryPointResult);
|
||||
BUN_DECLARE_HOST_FUNCTION(Bun__onRejectEntryPointResult);
|
||||
|
||||
BUN_DECLARE_HOST_FUNCTION(Bun__FetchTasklet__onResolveRequestStream);
|
||||
BUN_DECLARE_HOST_FUNCTION(Bun__FetchTasklet__onRejectRequestStream);
|
||||
|
||||
#endif
|
||||
7  src/bun.js/bindings/headers.zig  (generated)
@@ -379,6 +379,13 @@ pub extern fn FileSink__setDestroyCallback(JSValue0: JSC__JSValue, callback: usi
|
||||
pub extern fn FileSink__fromJS(arg0: *bindings.JSGlobalObject, JSValue1: JSC__JSValue) ?*anyopaque;
|
||||
pub extern fn FileSink__onClose(JSValue0: JSC__JSValue, JSValue1: JSC__JSValue) void;
|
||||
pub extern fn FileSink__onReady(JSValue0: JSC__JSValue, JSValue1: JSC__JSValue, JSValue2: JSC__JSValue) void;
|
||||
pub extern fn FetchTaskletChunkedRequestSink__assignToStream(arg0: *bindings.JSGlobalObject, JSValue1: JSC__JSValue, arg2: ?*anyopaque, arg3: [*c]*anyopaque) JSC__JSValue;
|
||||
pub extern fn FetchTaskletChunkedRequestSink__createObject(arg0: *bindings.JSGlobalObject, arg1: ?*anyopaque, onDestroyPtrTag: usize) JSC__JSValue;
|
||||
pub extern fn FetchTaskletChunkedRequestSink__detachPtr(JSValue0: JSC__JSValue) void;
|
||||
pub extern fn FetchTaskletChunkedRequestSink__setDestroyCallback(JSValue0: JSC__JSValue, callback: usize) void;
|
||||
pub extern fn FetchTaskletChunkedRequestSink__fromJS(arg0: *bindings.JSGlobalObject, JSValue1: JSC__JSValue) ?*anyopaque;
|
||||
pub extern fn FetchTaskletChunkedRequestSink__onClose(JSValue0: JSC__JSValue, JSValue1: JSC__JSValue) void;
|
||||
pub extern fn FetchTaskletChunkedRequestSink__onReady(JSValue0: JSC__JSValue, JSValue1: JSC__JSValue, JSValue2: JSC__JSValue) void;
|
||||
pub extern fn ZigException__fromException(arg0: [*c]bindings.Exception) ZigException;
|
||||
|
||||
pub const JSC__GetterSetter = bindings.GetterSetter;
|
||||
|
||||
@@ -743,7 +743,7 @@ pub const DeferredTaskQueue = struct {
|
||||
};
|
||||
|
||||
if (!this.map.values()[i](key)) {
|
||||
this.map.swapRemoveAt(i);
|
||||
_ = this.map.swapRemove(key);
|
||||
last = this.map.count();
|
||||
} else {
|
||||
i += 1;
|
||||
|
||||
@@ -158,6 +158,18 @@ pub const Body = struct {
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
pub fn isDisturbed2(this: *const PendingValue, globalObject: *JSC.JSGlobalObject) bool {
|
||||
if (this.promise != null) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (this.readable.get()) |readable| {
|
||||
return readable.isDisturbed(globalObject);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
pub fn isStreamingOrBuffering(this: *PendingValue) bool {
|
||||
return this.readable.held.has() or (this.promise != null and !this.promise.?.isEmptyOrUndefinedOrNull());
|
||||
}
|
||||
|
||||
@@ -765,14 +765,17 @@ pub const Fetch = struct {
|
||||
};
|
||||
|
||||
pub const FetchTasklet = struct {
|
||||
const log = Output.scoped(.FetchTasklet, false);
|
||||
pub const FetchTaskletStream = JSC.WebCore.FetchTaskletChunkedRequestSink;
|
||||
|
||||
sink: ?*FetchTaskletStream.JSSink = null,
|
||||
http: ?*http.AsyncHTTP = null,
|
||||
result: http.HTTPClientResult = .{},
|
||||
metadata: ?http.HTTPResponseMetadata = null,
|
||||
javascript_vm: *VirtualMachine = undefined,
|
||||
global_this: *JSGlobalObject = undefined,
|
||||
request_body: HTTPRequestBody = undefined,
|
||||
|
||||
/// buffer being used by AsyncHTTP
|
||||
response_buffer: MutableString = undefined,
|
||||
/// buffer used to stream response to JS
|
||||
@@ -814,6 +817,7 @@ pub const Fetch = struct {
|
||||
hostname: ?[]u8 = null,
|
||||
is_waiting_body: bool = false,
|
||||
is_waiting_abort: bool = false,
|
||||
is_waiting_request_stream_start: bool = false,
|
||||
mutex: Mutex,
|
||||
|
||||
tracker: JSC.AsyncTaskTracker,
|
||||
@@ -849,6 +853,9 @@ pub const Fetch = struct {
|
||||
pub const HTTPRequestBody = union(enum) {
|
||||
AnyBlob: AnyBlob,
|
||||
Sendfile: http.Sendfile,
|
||||
ReadableStream: JSC.WebCore.ReadableStream.Strong,
|
||||
|
||||
pub const Empty: HTTPRequestBody = .{ .AnyBlob = .{ .Blob = .{} } };
|
||||
|
||||
pub fn store(this: *HTTPRequestBody) ?*JSC.WebCore.Blob.Store {
|
||||
return switch (this.*) {
|
||||
@@ -867,6 +874,9 @@ pub const Fetch = struct {
|
||||
pub fn detach(this: *HTTPRequestBody) void {
|
||||
switch (this.*) {
|
||||
.AnyBlob => this.AnyBlob.detach(),
|
||||
.ReadableStream => |*stream| {
|
||||
stream.deinit();
|
||||
},
|
||||
.Sendfile => {
|
||||
if (@max(this.Sendfile.offset, this.Sendfile.remain) > 0)
|
||||
_ = bun.sys.close(this.Sendfile.fd);
|
||||
@@ -875,12 +885,71 @@ pub const Fetch = struct {
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn fromJS(globalThis: *JSGlobalObject, value: JSValue) bun.JSError!HTTPRequestBody {
|
||||
var body_value = try Body.Value.fromJS(globalThis, value);
|
||||
if (body_value == .Used or (body_value == .Locked and (body_value.Locked.action != .none or body_value.Locked.isDisturbed2(globalThis)))) {
|
||||
return globalThis.ERR_BODY_ALREADY_USED("body already used", .{}).throw();
|
||||
}
|
||||
if (body_value == .Locked) {
|
||||
if (body_value.Locked.readable.has()) {
|
||||
// just grab the ref
|
||||
return FetchTasklet.HTTPRequestBody{ .ReadableStream = body_value.Locked.readable };
|
||||
}
|
||||
const readable = body_value.toReadableStream(globalThis);
|
||||
if (!readable.isEmptyOrUndefinedOrNull() and body_value == .Locked and body_value.Locked.readable.has()) {
|
||||
return FetchTasklet.HTTPRequestBody{ .ReadableStream = body_value.Locked.readable };
|
||||
}
|
||||
}
|
||||
return FetchTasklet.HTTPRequestBody{ .AnyBlob = body_value.useAsAnyBlob() };
|
||||
}
|
||||
|
||||
pub fn needsToReadFile(this: *HTTPRequestBody) bool {
|
||||
return switch (this.*) {
|
||||
.AnyBlob => |blob| blob.needsToReadFile(),
|
||||
else => false,
|
||||
};
|
||||
}
|
||||
|
||||
pub fn hasContentTypeFromUser(this: *HTTPRequestBody) bool {
|
||||
return switch (this.*) {
|
||||
.AnyBlob => |blob| blob.hasContentTypeFromUser(),
|
||||
else => false,
|
||||
};
|
||||
}
|
||||
|
||||
pub fn getAnyBlob(this: *HTTPRequestBody) ?*AnyBlob {
|
||||
return switch (this.*) {
|
||||
.AnyBlob => &this.AnyBlob,
|
||||
else => null,
|
||||
};
|
||||
}
|
||||
|
||||
pub fn hasBody(this: *HTTPRequestBody) bool {
|
||||
return switch (this.*) {
|
||||
.AnyBlob => |blob| blob.size() > 0,
|
||||
.ReadableStream => |*stream| stream.has(),
|
||||
.Sendfile => true,
|
||||
};
|
||||
}
|
||||
};
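`HTTPRequestBody.fromJS` above keeps a `ReadableStream`-backed body as a live stream (including bodies that `toReadableStream` can wrap, such as async generators) and falls back to `useAsAnyBlob()` for everything else. A hedged sketch of the streaming path from the JavaScript side, mirroring the `iterator` duplex test added later in this diff (placeholder URL; assumes the server echoes the body back, as the tests do):

```ts
// Sketch only: an async generator body takes the ReadableStream path in fromJS.
const response = await fetch("http://localhost:3000/", {
  method: "POST",
  duplex: "half",
  body: async function* () {
    yield "Hello";
    yield " ";
    yield "World!";
  },
});
console.log(await response.text()); // "Hello World!" if the server echoes the request body
```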
|
||||
|
||||
pub fn init(_: std.mem.Allocator) anyerror!FetchTasklet {
|
||||
return FetchTasklet{};
|
||||
}
|
||||
|
||||
fn clearSink(this: *FetchTasklet) void {
|
||||
if (this.sink) |wrapper| {
|
||||
this.sink = null;
|
||||
|
||||
wrapper.sink.done = true;
|
||||
wrapper.sink.ended = true;
|
||||
wrapper.sink.finalize();
|
||||
wrapper.detach();
|
||||
wrapper.sink.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
fn clearData(this: *FetchTasklet) void {
|
||||
log("clearData", .{});
|
||||
const allocator = this.memory_reporter.allocator();
|
||||
@@ -923,7 +992,9 @@ pub const Fetch = struct {
|
||||
this.readable_stream_ref.deinit();
|
||||
|
||||
this.scheduled_response_buffer.deinit();
|
||||
-            this.request_body.detach();
+            if (this.request_body != .ReadableStream or this.is_waiting_request_stream_start) {
+                this.request_body.detach();
+            }
|
||||
|
||||
this.abort_reason.deinit();
|
||||
this.check_server_identity.deinit();
|
||||
@@ -965,6 +1036,147 @@ pub const Fetch = struct {
|
||||
return null;
|
||||
}
|
||||
|
||||
pub fn onResolveRequestStream(globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) bun.JSError!JSC.JSValue {
|
||||
var args = callframe.arguments_old(2);
|
||||
var this: *@This() = args.ptr[args.len - 1].asPromisePtr(@This());
|
||||
defer this.deref();
|
||||
if (this.request_body == .ReadableStream) {
|
||||
var readable_stream_ref = this.request_body.ReadableStream;
|
||||
this.request_body.ReadableStream = .{};
|
||||
defer readable_stream_ref.deinit();
|
||||
if (readable_stream_ref.get()) |stream| {
|
||||
stream.done(globalThis);
|
||||
this.clearSink();
|
||||
}
|
||||
}
|
||||
|
||||
return JSValue.jsUndefined();
|
||||
}
|
||||
|
||||
pub fn onRejectRequestStream(globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) bun.JSError!JSC.JSValue {
|
||||
const args = callframe.arguments_old(2);
|
||||
var this = args.ptr[args.len - 1].asPromisePtr(@This());
|
||||
defer this.deref();
|
||||
const err = args.ptr[0];
|
||||
if (this.request_body == .ReadableStream) {
|
||||
var readable_stream_ref = this.request_body.ReadableStream;
|
||||
this.request_body.ReadableStream = .{};
|
||||
defer readable_stream_ref.deinit();
|
||||
if (readable_stream_ref.get()) |stream| {
|
||||
stream.cancel(globalThis);
|
||||
this.clearSink();
|
||||
}
|
||||
}
|
||||
|
||||
this.abortListener(err);
|
||||
return JSValue.jsUndefined();
|
||||
}
|
||||
pub const shim = JSC.Shimmer("Bun", "FetchTasklet", @This());
|
||||
|
||||
pub const Export = shim.exportFunctions(.{
|
||||
.onResolveRequestStream = onResolveRequestStream,
|
||||
.onRejectRequestStream = onRejectRequestStream,
|
||||
});
|
||||
comptime {
|
||||
const jsonResolveRequestStream = JSC.toJSHostFunction(onResolveRequestStream);
|
||||
@export(jsonResolveRequestStream, .{ .name = Export[0].symbol_name });
|
||||
const jsonRejectRequestStream = JSC.toJSHostFunction(onRejectRequestStream);
|
||||
@export(jsonRejectRequestStream, .{ .name = Export[1].symbol_name });
|
||||
}
|
||||
|
||||
pub fn startRequestStream(this: *FetchTasklet) void {
|
||||
this.is_waiting_request_stream_start = false;
|
||||
bun.assert(this.request_body == .ReadableStream);
|
||||
if (this.request_body.ReadableStream.get()) |stream| {
|
||||
this.ref(); // lets only unref when sink is done
|
||||
|
||||
const globalThis = this.global_this;
|
||||
var response_stream = bun.default_allocator.create(FetchTaskletStream.JSSink) catch unreachable;
|
||||
response_stream.* = FetchTaskletStream.JSSink{
|
||||
.sink = .{
|
||||
.task = this,
|
||||
.buffer = .{},
|
||||
.globalThis = globalThis,
|
||||
},
|
||||
};
|
||||
var signal = &response_stream.sink.signal;
|
||||
this.sink = response_stream;
|
||||
|
||||
signal.* = FetchTaskletStream.JSSink.SinkSignal.init(JSValue.zero);
|
||||
|
||||
// explicitly set it to a dead pointer
|
||||
// we use this memory address to disable signals being sent
|
||||
signal.clear();
|
||||
bun.assert(signal.isDead());
|
||||
|
||||
// We are already corked!
|
||||
const assignment_result: JSValue = FetchTaskletStream.JSSink.assignToStream(
|
||||
globalThis,
|
||||
stream.value,
|
||||
response_stream,
|
||||
@as(**anyopaque, @ptrCast(&signal.ptr)),
|
||||
);
|
||||
|
||||
assignment_result.ensureStillAlive();
|
||||
|
||||
// assert that it was updated
|
||||
bun.assert(!signal.isDead());
|
||||
|
||||
if (assignment_result.toError()) |err_value| {
|
||||
response_stream.detach();
|
||||
this.sink = null;
|
||||
response_stream.sink.destroy();
|
||||
return this.abortListener(err_value);
|
||||
}
|
||||
|
||||
if (!assignment_result.isEmptyOrUndefinedOrNull()) {
|
||||
this.javascript_vm.drainMicrotasks();
|
||||
|
||||
assignment_result.ensureStillAlive();
|
||||
// it returns a Promise when it goes through ReadableStreamDefaultReader
|
||||
if (assignment_result.asAnyPromise()) |promise| {
|
||||
switch (promise.status(globalThis.vm())) {
|
||||
.pending => {
|
||||
this.ref();
|
||||
assignment_result.then(
|
||||
globalThis,
|
||||
this,
|
||||
onResolveRequestStream,
|
||||
onRejectRequestStream,
|
||||
);
|
||||
},
|
||||
.fulfilled => {
|
||||
var readable_stream_ref = this.request_body.ReadableStream;
|
||||
this.request_body.ReadableStream = .{};
|
||||
defer {
|
||||
stream.done(globalThis);
|
||||
this.clearSink();
|
||||
readable_stream_ref.deinit();
|
||||
}
|
||||
},
|
||||
.rejected => {
|
||||
var readable_stream_ref = this.request_body.ReadableStream;
|
||||
this.request_body.ReadableStream = .{};
|
||||
defer {
|
||||
stream.cancel(globalThis);
|
||||
this.clearSink();
|
||||
readable_stream_ref.deinit();
|
||||
}
|
||||
|
||||
this.abortListener(promise.result(globalThis.vm()));
|
||||
},
|
||||
}
|
||||
return;
|
||||
} else {
|
||||
// if is not a promise we treat it as Error
|
||||
response_stream.detach();
|
||||
this.sink = null;
|
||||
response_stream.sink.destroy();
|
||||
return this.abortListener(assignment_result);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
pub fn onBodyReceived(this: *FetchTasklet) void {
|
||||
const success = this.result.isSuccess();
|
||||
const globalThis = this.global_this;
|
||||
@@ -1141,11 +1353,17 @@ pub const Fetch = struct {
|
||||
this.deref();
|
||||
}
|
||||
}
|
||||
if (this.is_waiting_request_stream_start and this.result.can_stream) {
|
||||
// start streaming
|
||||
this.startRequestStream();
|
||||
}
|
||||
// if we already respond the metadata and still need to process the body
|
||||
if (this.is_waiting_body) {
|
||||
this.onBodyReceived();
|
||||
return;
|
||||
}
|
||||
if (this.metadata == null and this.result.isSuccess()) return;
|
||||
|
||||
// if we abort because of cert error
|
||||
// we wait the Http Client because we already have the response
|
||||
// we just need to deinit
|
||||
@@ -1195,7 +1413,6 @@ pub const Fetch = struct {
|
||||
this.promise.deinit();
|
||||
}
|
||||
const success = this.result.isSuccess();
|
||||
|
||||
const result = switch (success) {
|
||||
true => JSC.Strong.create(this.onResolve(), globalThis),
|
||||
false => brk: {
|
||||
@@ -1713,7 +1930,18 @@ pub const Fetch = struct {
|
||||
.tls_props = fetch_options.ssl_config,
|
||||
},
|
||||
);
|
||||
|
||||
// enable streaming the write side
|
||||
const isStream = fetch_tasklet.request_body == .ReadableStream;
|
||||
fetch_tasklet.http.?.client.flags.is_streaming_request_body = isStream;
|
||||
fetch_tasklet.is_waiting_request_stream_start = isStream;
|
||||
if (isStream) {
|
||||
fetch_tasklet.http.?.request_body = .{
|
||||
.stream = .{
|
||||
.buffer = .{},
|
||||
.ended = false,
|
||||
},
|
||||
};
|
||||
}
|
||||
// TODO is this necessary? the http client already sets the redirect type,
|
||||
// so manually setting it here seems redundant
|
||||
if (fetch_options.redirect_type != FetchRedirect.follow) {
|
||||
@@ -1740,6 +1968,22 @@ pub const Fetch = struct {
|
||||
log("abortListener", .{});
|
||||
reason.ensureStillAlive();
|
||||
this.abort_reason.set(this.global_this, reason);
|
||||
this.abortTask();
|
||||
if (this.sink) |wrapper| {
|
||||
wrapper.sink.abort();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
pub fn sendRequestData(this: *FetchTasklet, data: []const u8, ended: bool) void {
|
||||
if (this.http) |http_| {
|
||||
http.http_thread.scheduleRequestWrite(http_, data, ended);
|
||||
} else if (data.len != 3) {
|
||||
bun.default_allocator.free(data);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn abortTask(this: *FetchTasklet) void {
|
||||
this.signal_store.aborted.store(true, .monotonic);
|
||||
this.tracker.didCancel(this.global_this);
|
||||
|
||||
@@ -2017,9 +2261,7 @@ pub const Fetch = struct {
|
||||
// which is important for FormData.
|
||||
// https://github.com/oven-sh/bun/issues/2264
|
||||
//
|
||||
-            var body: AnyBlob = AnyBlob{
-                .Blob = .{},
-            };
+            var body: FetchTasklet.HTTPRequestBody = FetchTasklet.HTTPRequestBody.Empty;
|
||||
|
||||
var disable_timeout = false;
|
||||
var disable_keepalive = false;
|
||||
@@ -2559,8 +2801,7 @@ pub const Fetch = struct {
|
||||
if (options_object) |options| {
|
||||
if (options.fastGet(globalThis, .body)) |body__| {
|
||||
if (!body__.isUndefined()) {
|
||||
-                        var body_value = try Body.Value.fromJS(ctx, body__);
-                        break :extract_body body_value.useAsAnyBlob();
+                        break :extract_body try FetchTasklet.HTTPRequestBody.fromJS(ctx, body__);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2575,20 +2816,29 @@ pub const Fetch = struct {
|
||||
return globalThis.ERR_BODY_ALREADY_USED("Request body already used", .{}).throw();
|
||||
}
|
||||
|
||||
break :extract_body req.body.value.useAsAnyBlob();
|
||||
if (req.body.value == .Locked) {
|
||||
if (req.body.value.Locked.readable.has()) {
|
||||
break :extract_body FetchTasklet.HTTPRequestBody{ .ReadableStream = JSC.WebCore.ReadableStream.Strong.init(req.body.value.Locked.readable.get().?, globalThis) };
|
||||
}
|
||||
const readable = req.body.value.toReadableStream(globalThis);
|
||||
if (!readable.isEmptyOrUndefinedOrNull() and req.body.value == .Locked and req.body.value.Locked.readable.has()) {
|
||||
break :extract_body FetchTasklet.HTTPRequestBody{ .ReadableStream = JSC.WebCore.ReadableStream.Strong.init(req.body.value.Locked.readable.get().?, globalThis) };
|
||||
}
|
||||
}
|
||||
|
||||
break :extract_body FetchTasklet.HTTPRequestBody{ .AnyBlob = req.body.value.useAsAnyBlob() };
|
||||
}
|
||||
|
||||
if (request_init_object) |req| {
|
||||
if (req.fastGet(globalThis, .body)) |body__| {
|
||||
if (!body__.isUndefined()) {
|
||||
-                        var body_value = try Body.Value.fromJS(ctx, body__);
-                        break :extract_body body_value.useAsAnyBlob();
+                        break :extract_body try FetchTasklet.HTTPRequestBody.fromJS(ctx, body__);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
break :extract_body null;
|
||||
-        } orelse AnyBlob{ .Blob = .{} };
+        } orelse FetchTasklet.HTTPRequestBody.Empty;
|
||||
|
||||
if (globalThis.hasException()) {
|
||||
is_error = true;
|
||||
@@ -2682,7 +2932,7 @@ pub const Fetch = struct {
|
||||
hostname = _hostname.toOwnedSliceZ(allocator) catch bun.outOfMemory();
|
||||
}
|
||||
|
||||
-                break :extract_headers Headers.from(headers_, allocator, .{ .body = &body }) catch bun.outOfMemory();
+                break :extract_headers Headers.from(headers_, allocator, .{ .body = body.getAnyBlob() }) catch bun.outOfMemory();
|
||||
}
|
||||
|
||||
break :extract_headers headers;
|
||||
@@ -2816,27 +3066,25 @@ pub const Fetch = struct {
|
||||
}
|
||||
}
|
||||
|
||||
-        if (!method.hasRequestBody() and body.size() > 0) {
+        if (!method.hasRequestBody() and body.hasBody()) {
|
||||
const err = JSC.toTypeError(.ERR_INVALID_ARG_VALUE, fetch_error_unexpected_body, .{}, ctx);
|
||||
is_error = true;
|
||||
return JSPromise.rejectedPromiseValue(globalThis, err);
|
||||
}
|
||||
|
||||
-        if (headers == null and body.size() > 0 and body.hasContentTypeFromUser()) {
+        if (headers == null and body.hasBody() and body.hasContentTypeFromUser()) {
|
||||
headers = Headers.from(
|
||||
null,
|
||||
allocator,
|
||||
-                .{ .body = &body },
+                .{ .body = body.getAnyBlob() },
|
||||
) catch bun.outOfMemory();
|
||||
}
|
||||
|
||||
-        var http_body = FetchTasklet.HTTPRequestBody{
-            .AnyBlob = body,
-        };
+        var http_body = body;
|
||||
|
||||
if (body.needsToReadFile()) {
|
||||
prepare_body: {
|
||||
-                const opened_fd_res: JSC.Maybe(bun.FileDescriptor) = switch (body.Blob.store.?.data.file.pathlike) {
+                const opened_fd_res: JSC.Maybe(bun.FileDescriptor) = switch (body.store().?.data.file.pathlike) {
|
||||
.fd => |fd| bun.sys.dup(fd),
|
||||
.path => |path| bun.sys.open(path.sliceZ(&globalThis.bunVM().nodeFS().sync_error_buf), if (Environment.isWindows) bun.O.RDONLY else bun.O.RDONLY | bun.O.NOCTTY, 0),
|
||||
};
|
||||
@@ -2870,7 +3118,7 @@ pub const Fetch = struct {
|
||||
break :use_sendfile;
|
||||
}
|
||||
|
||||
-                    const original_size = body.Blob.size;
+                    const original_size = body.AnyBlob.Blob.size;
|
||||
const stat_size = @as(Blob.SizeType, @intCast(stat.size));
|
||||
const blob_size = if (bun.isRegularFile(stat.mode))
|
||||
stat_size
|
||||
@@ -2880,8 +3128,8 @@ pub const Fetch = struct {
|
||||
http_body = .{
|
||||
.Sendfile = .{
|
||||
.fd = opened_fd,
|
||||
-                            .remain = body.Blob.offset + original_size,
-                            .offset = body.Blob.offset,
+                            .remain = body.AnyBlob.Blob.offset + original_size,
+                            .offset = body.AnyBlob.Blob.offset,
|
||||
.content_size = blob_size,
|
||||
},
|
||||
};
|
||||
@@ -2902,13 +3150,13 @@ pub const Fetch = struct {
|
||||
.{
|
||||
.encoding = .buffer,
|
||||
.path = .{ .fd = opened_fd },
|
||||
-                        .offset = body.Blob.offset,
-                        .max_size = body.Blob.size,
+                        .offset = body.AnyBlob.Blob.offset,
+                        .max_size = body.AnyBlob.Blob.size,
|
||||
},
|
||||
.sync,
|
||||
);
|
||||
|
||||
-                if (body.Blob.store.?.data.file.pathlike == .path) {
+                if (body.store().?.data.file.pathlike == .path) {
|
||||
_ = bun.sys.close(opened_fd);
|
||||
}
|
||||
|
||||
@@ -2922,8 +3170,8 @@ pub const Fetch = struct {
|
||||
},
|
||||
.result => |result| {
|
||||
body.detach();
|
||||
-                    body.from(std.ArrayList(u8).fromOwnedSlice(allocator, @constCast(result.slice())));
-                    http_body = .{ .AnyBlob = body };
+                    body.AnyBlob.from(std.ArrayList(u8).fromOwnedSlice(allocator, @constCast(result.slice())));
+                    http_body = .{ .AnyBlob = body.AnyBlob };
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -2993,9 +3241,7 @@ pub const Fetch = struct {
|
||||
body.detach();
|
||||
} else {
|
||||
// These are single-use, and have effectively been moved to the FetchTasklet.
|
||||
-            body = .{
-                .Blob = .{},
-            };
+            body = FetchTasklet.HTTPRequestBody.Empty;
|
||||
}
|
||||
proxy = null;
|
||||
url_proxy_buffer = "";
|
||||
|
||||
@@ -470,6 +470,7 @@ pub const StreamStart = union(Tag) {
|
||||
FileSink: FileSinkOptions,
|
||||
HTTPSResponseSink: void,
|
||||
HTTPResponseSink: void,
|
||||
FetchTaskletChunkedRequestSink: void,
|
||||
ready: void,
|
||||
owned_and_done: bun.ByteList,
|
||||
done: bun.ByteList,
|
||||
@@ -496,6 +497,7 @@ pub const StreamStart = union(Tag) {
|
||||
FileSink,
|
||||
HTTPSResponseSink,
|
||||
HTTPResponseSink,
|
||||
FetchTaskletChunkedRequestSink,
|
||||
ready,
|
||||
owned_and_done,
|
||||
done,
|
||||
@@ -646,7 +648,7 @@ pub const StreamStart = union(Tag) {
|
||||
},
|
||||
};
|
||||
},
|
||||
-        .HTTPSResponseSink, .HTTPResponseSink => {
+        .FetchTaskletChunkedRequestSink, .HTTPSResponseSink, .HTTPResponseSink => {
|
||||
var empty = true;
|
||||
var chunk_size: JSC.WebCore.Blob.SizeType = 2048;
|
||||
|
||||
@@ -2600,7 +2602,266 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
|
||||
}
|
||||
pub const HTTPSResponseSink = HTTPServerWritable(true);
|
||||
pub const HTTPResponseSink = HTTPServerWritable(false);
|
||||
pub const FetchTaskletChunkedRequestSink = struct {
|
||||
task: ?*JSC.WebCore.Fetch.FetchTasklet = null,
|
||||
signal: Signal = .{},
|
||||
globalThis: *JSGlobalObject = undefined,
|
||||
highWaterMark: Blob.SizeType = 2048,
|
||||
buffer: bun.io.StreamBuffer,
|
||||
ended: bool = false,
|
||||
done: bool = false,
|
||||
auto_flusher: AutoFlusher = AutoFlusher{},
|
||||
|
||||
fn unregisterAutoFlusher(this: *@This()) void {
|
||||
if (this.auto_flusher.registered)
|
||||
AutoFlusher.unregisterDeferredMicrotaskWithTypeUnchecked(@This(), this, this.globalThis.bunVM());
|
||||
}
|
||||
|
||||
fn registerAutoFlusher(this: *@This()) void {
|
||||
if (!this.auto_flusher.registered)
|
||||
AutoFlusher.registerDeferredMicrotaskWithTypeUnchecked(@This(), this, this.globalThis.bunVM());
|
||||
}
|
||||
|
||||
pub fn onAutoFlush(this: *@This()) bool {
|
||||
if (this.done) {
|
||||
this.auto_flusher.registered = false;
|
||||
return false;
|
||||
}
|
||||
|
||||
_ = this.internalFlush() catch 0;
|
||||
return false;
|
||||
}
|
||||
|
||||
pub fn start(this: *@This(), stream_start: StreamStart) JSC.Maybe(void) {
|
||||
if (this.ended) {
|
||||
return .{ .result = {} };
|
||||
}
|
||||
switch (stream_start) {
|
||||
.chunk_size => |chunk_size| {
|
||||
if (chunk_size > 0) {
|
||||
this.highWaterMark = chunk_size;
|
||||
}
|
||||
},
|
||||
else => {},
|
||||
}
|
||||
this.ended = false;
|
||||
this.signal.start();
|
||||
return .{ .result = {} };
|
||||
}
|
||||
|
||||
pub fn connect(this: *@This(), signal: Signal) void {
|
||||
this.signal = signal;
|
||||
}
|
||||
pub fn sink(this: *@This()) Sink {
|
||||
return Sink.init(this);
|
||||
}
|
||||
pub fn finalize(this: *@This()) void {
|
||||
var buffer = this.buffer;
|
||||
this.buffer = .{};
|
||||
buffer.deinit();
|
||||
|
||||
if (this.task) |task| {
|
||||
this.task = null;
|
||||
task.deref();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn send(this: *@This(), data: []const u8, is_last: bool) !void {
|
||||
if (this.done) return;
|
||||
|
||||
if (this.task) |task| {
|
||||
if (is_last) this.done = true;
|
||||
|
||||
if (data.len == 0) {
|
||||
task.sendRequestData(bun.http.end_of_chunked_http1_1_encoding_response_body, true);
|
||||
return;
|
||||
}
|
||||
|
||||
// chunk encoding is really simple
|
||||
if (is_last) {
|
||||
const chunk = std.fmt.allocPrint(bun.default_allocator, "{x}\r\n{s}\r\n0\r\n\r\n", .{ data.len, data }) catch return error.OOM;
|
||||
task.sendRequestData(chunk, true);
|
||||
} else {
|
||||
const chunk = std.fmt.allocPrint(bun.default_allocator, "{x}\r\n{s}\r\n", .{ data.len, data }) catch return error.OOM;
|
||||
task.sendRequestData(chunk, false);
|
||||
}
|
||||
}
|
||||
}
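The `send` helper above frames each write as an HTTP/1.1 chunk: the payload length in hexadecimal, CRLF, the payload, CRLF, and a final `0\r\n\r\n` terminator once the body has ended. For illustration only, the same framing in TypeScript (assumes the payload is a plain byte string):

```ts
// Sketch only: the chunked transfer-encoding framing produced by send() above.
function frameChunk(payload: string, isLast: boolean): string {
  if (payload.length === 0) {
    // An empty final write just emits the terminator.
    return "0\r\n\r\n";
  }
  const framed = payload.length.toString(16) + "\r\n" + payload + "\r\n";
  return isLast ? framed + "0\r\n\r\n" : framed;
}
```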
|
||||
|
||||
pub fn internalFlush(this: *@This()) !usize {
|
||||
this.unregisterAutoFlusher();
|
||||
if (this.done) return 0;
|
||||
var flushed: usize = 0;
|
||||
// we need to respect the max len for the chunk
|
||||
while (this.buffer.isNotEmpty()) {
|
||||
const bytes = this.buffer.slice();
|
||||
const len: u32 = @min(bytes.len, std.math.maxInt(u32));
|
||||
try this.send(bytes, this.buffer.list.items.len - (this.buffer.cursor + len) == 0 and this.ended);
|
||||
flushed += len;
|
||||
this.buffer.cursor = len;
|
||||
if (this.buffer.isEmpty()) {
|
||||
this.buffer.reset();
|
||||
}
|
||||
}
|
||||
if (this.ended and !this.done) {
|
||||
try this.send("", true);
|
||||
this.finalize();
|
||||
}
|
||||
return flushed;
|
||||
}
|
||||
|
||||
pub fn flush(this: *@This()) JSC.Maybe(void) {
|
||||
_ = this.internalFlush() catch 0;
|
||||
return .{ .result = {} };
|
||||
}
|
||||
pub fn flushFromJS(this: *@This(), globalThis: *JSGlobalObject, _: bool) JSC.Maybe(JSValue) {
|
||||
return .{ .result = JSC.JSPromise.resolvedPromiseValue(globalThis, JSValue.jsNumber(this.internalFlush() catch 0)) };
|
||||
}
|
||||
pub fn destroy(this: *@This()) void {
|
||||
this.finalize();
|
||||
bun.default_allocator.destroy(this);
|
||||
}
|
||||
|
||||
pub fn abort(this: *@This()) void {
|
||||
this.ended = true;
|
||||
this.done = true;
|
||||
this.signal.close(null);
|
||||
this.finalize();
|
||||
}
|
||||
|
||||
pub fn write(this: *@This(), data: StreamResult) StreamResult.Writable {
|
||||
if (this.ended) {
|
||||
return .{ .owned = 0 };
|
||||
}
|
||||
const bytes = data.slice();
|
||||
const len = @as(Blob.SizeType, @truncate(bytes.len));
|
||||
|
||||
if (this.buffer.size() == 0 and len >= this.highWaterMark) {
|
||||
// fast path:
|
||||
// - large-ish chunk
|
||||
this.send(bytes, false) catch {
|
||||
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
|
||||
};
|
||||
return .{ .owned = len };
|
||||
} else if (this.buffer.size() + len >= this.highWaterMark) {
|
||||
_ = this.buffer.write(bytes) catch {
|
||||
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
|
||||
};
|
||||
_ = this.internalFlush() catch {
|
||||
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
|
||||
};
|
||||
return .{ .owned = len };
|
||||
} else {
|
||||
// queue the data wait until highWaterMark is reached or the auto flusher kicks in
|
||||
this.buffer.write(bytes) catch {
|
||||
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
|
||||
};
|
||||
}
|
||||
this.registerAutoFlusher();
|
||||
return .{ .owned = len };
|
||||
}
|
||||
|
||||
pub const writeBytes = write;
|
||||
pub fn writeLatin1(this: *@This(), data: StreamResult) StreamResult.Writable {
|
||||
if (this.ended) {
|
||||
return .{ .owned = 0 };
|
||||
}
|
||||
|
||||
const bytes = data.slice();
|
||||
const len = @as(Blob.SizeType, @truncate(bytes.len));
|
||||
|
||||
if (this.buffer.size() == 0 and len >= this.highWaterMark) {
|
||||
// common case
|
||||
if (strings.isAllASCII(bytes)) {
|
||||
// fast path:
|
||||
// - large-ish chunk
|
||||
this.send(bytes, false) catch {
|
||||
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
|
||||
};
|
||||
return .{ .owned = len };
|
||||
}
|
||||
|
||||
this.buffer.writeLatin1(bytes) catch {
|
||||
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
|
||||
};
|
||||
|
||||
_ = this.internalFlush() catch {
|
||||
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
|
||||
};
|
||||
return .{ .owned = len };
|
||||
} else if (this.buffer.size() + len >= this.highWaterMark) {
|
||||
// kinda fast path:
|
||||
// - combined chunk is large enough to flush automatically
|
||||
this.buffer.writeLatin1(bytes) catch {
|
||||
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
|
||||
};
|
||||
_ = this.internalFlush() catch {
|
||||
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
|
||||
};
|
||||
return .{ .owned = len };
|
||||
} else {
|
||||
this.buffer.writeLatin1(bytes) catch {
|
||||
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
|
||||
};
|
||||
}
|
||||
|
||||
this.registerAutoFlusher();
|
||||
|
||||
return .{ .owned = len };
|
||||
}
|
||||
pub fn writeUTF16(this: *@This(), data: StreamResult) StreamResult.Writable {
|
||||
if (this.ended) {
|
||||
return .{ .owned = 0 };
|
||||
}
|
||||
const bytes = data.slice();
|
||||
// we must always buffer UTF-16
|
||||
// we assume the case of all-ascii UTF-16 string is pretty uncommon
|
||||
this.buffer.writeUTF16(@alignCast(std.mem.bytesAsSlice(u16, bytes))) catch {
|
||||
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
|
||||
};
|
||||
|
||||
const readable = this.buffer.slice();
|
||||
if (readable.len >= this.highWaterMark) {
|
||||
_ = this.internalFlush() catch {
|
||||
return .{ .err = Syscall.Error.fromCode(.NOMEM, .write) };
|
||||
};
|
||||
return .{ .owned = @as(Blob.SizeType, @intCast(bytes.len)) };
|
||||
}
|
||||
|
||||
this.registerAutoFlusher();
|
||||
return .{ .owned = @as(Blob.SizeType, @intCast(bytes.len)) };
|
||||
}
|
||||
|
||||
pub fn end(this: *@This(), err: ?Syscall.Error) JSC.Maybe(void) {
|
||||
if (this.ended) {
|
||||
return .{ .result = {} };
|
||||
}
|
||||
|
||||
// send EOF
|
||||
this.ended = true;
|
||||
// flush everything and send EOF
|
||||
_ = this.internalFlush() catch 0;
|
||||
|
||||
this.signal.close(err);
|
||||
return .{ .result = {} };
|
||||
}
|
||||
pub fn endFromJS(this: *@This(), _: *JSGlobalObject) JSC.Maybe(JSValue) {
|
||||
if (this.ended) {
|
||||
return .{ .result = JSC.JSValue.jsNumber(0) };
|
||||
}
|
||||
|
||||
if (this.done) {
|
||||
this.ended = true;
|
||||
this.signal.close(null);
|
||||
this.finalize();
|
||||
return .{ .result = JSC.JSValue.jsNumber(0) };
|
||||
}
|
||||
_ = this.end(null);
|
||||
return .{ .result = JSC.JSValue.jsNumber(0) };
|
||||
}
|
||||
const name = "FetchTaskletChunkedRequestSink";
|
||||
pub const JSSink = NewJSSink(@This(), name);
|
||||
};
|
||||
pub const BufferedReadableStreamAction = enum {
|
||||
text,
|
||||
arrayBuffer,
|
||||
|
||||
@@ -1,6 +1,12 @@
|
||||
import { join, resolve } from "path";
|
||||
|
||||
const classes = ["ArrayBufferSink", "FileSink", "HTTPResponseSink", "HTTPSResponseSink"];
|
||||
const classes = [
|
||||
"ArrayBufferSink",
|
||||
"FileSink",
|
||||
"HTTPResponseSink",
|
||||
"HTTPSResponseSink",
|
||||
"FetchTaskletChunkedRequestSink",
|
||||
];
|
||||
|
||||
function names(name) {
|
||||
return {
|
||||
|
||||
219  src/http.zig
@@ -69,7 +69,7 @@ var shared_request_headers_buf: [256]picohttp.Header = undefined;
|
||||
// this doesn't need to be stack memory because it is immediately cloned after use
|
||||
var shared_response_headers_buf: [256]picohttp.Header = undefined;
|
||||
|
||||
-const end_of_chunked_http1_1_encoding_response_body = "0\r\n\r\n";
+pub const end_of_chunked_http1_1_encoding_response_body = "0\r\n\r\n";
|
||||
|
||||
pub const Signals = struct {
|
||||
header_progress: ?*std.atomic.Value(bool) = null,
|
||||
@@ -118,11 +118,31 @@ pub const FetchRedirect = enum(u8) {
|
||||
pub const HTTPRequestBody = union(enum) {
|
||||
bytes: []const u8,
|
||||
sendfile: Sendfile,
|
||||
stream: struct {
|
||||
buffer: bun.io.StreamBuffer,
|
||||
ended: bool,
|
||||
|
||||
pub fn hasEnded(this: *@This()) bool {
|
||||
return this.ended and this.buffer.isEmpty();
|
||||
}
|
||||
},
|
||||
|
||||
pub fn isStream(this: *const HTTPRequestBody) bool {
|
||||
return this.* == .stream;
|
||||
}
|
||||
|
||||
pub fn deinit(this: *HTTPRequestBody) void {
|
||||
switch (this.*) {
|
||||
.sendfile, .bytes => {},
|
||||
.stream => |*stream| stream.buffer.deinit(),
|
||||
}
|
||||
}
|
||||
pub fn len(this: *const HTTPRequestBody) usize {
|
||||
return switch (this.*) {
|
||||
.bytes => this.bytes.len,
|
||||
.sendfile => this.sendfile.content_size,
|
||||
// unknown amount
|
||||
.stream => std.math.maxInt(usize),
|
||||
};
|
||||
}
|
||||
};
|
||||
@@ -555,6 +575,13 @@ fn NewHTTPContext(comptime ssl: bool) type {
|
||||
return ActiveSocket.from(bun.cast(**anyopaque, ptr).*);
|
||||
}
|
||||
|
||||
pub fn getTaggedFromSocket(socket: HTTPSocket) ActiveSocket {
|
||||
if (socket.ext(anyopaque)) |ctx| {
|
||||
return getTagged(ctx);
|
||||
}
|
||||
return ActiveSocket.init(&dead_socket);
|
||||
}
|
||||
|
||||
pending_sockets: HiveArray(PooledSocket, pool_size) = HiveArray(PooledSocket, pool_size).init(),
|
||||
us_socket_context: *uws.SocketContext,
|
||||
|
||||
@@ -1001,6 +1028,7 @@ fn NewHTTPContext(comptime ssl: bool) type {
|
||||
const UnboundedQueue = @import("./bun.js/unbounded_queue.zig").UnboundedQueue;
|
||||
const Queue = UnboundedQueue(AsyncHTTP, .next);
|
||||
const ShutdownQueue = UnboundedQueue(AsyncHTTP, .next);
|
||||
const RequestWriteQueue = UnboundedQueue(AsyncHTTP, .next);
|
||||
|
||||
pub const HTTPThread = struct {
|
||||
loop: *JSC.MiniEventLoop,
|
||||
@@ -1010,7 +1038,10 @@ pub const HTTPThread = struct {
|
||||
queued_tasks: Queue = Queue{},
|
||||
|
||||
queued_shutdowns: std.ArrayListUnmanaged(ShutdownMessage) = std.ArrayListUnmanaged(ShutdownMessage){},
|
||||
queued_writes: std.ArrayListUnmanaged(WriteMessage) = std.ArrayListUnmanaged(WriteMessage){},
|
||||
|
||||
queued_shutdowns_lock: bun.Lock = .{},
|
||||
queued_writes_lock: bun.Lock = .{},
|
||||
|
||||
queued_proxy_deref: std.ArrayListUnmanaged(*ProxyTunnel) = std.ArrayListUnmanaged(*ProxyTunnel){},
|
||||
|
||||
@@ -1020,7 +1051,14 @@ pub const HTTPThread = struct {
|
||||
lazy_libdeflater: ?*LibdeflateState = null,
|
||||
|
||||
const threadlog = Output.scoped(.HTTPThread, true);
|
||||
|
||||
const WriteMessage = struct {
|
||||
data: []const u8,
|
||||
async_http_id: u32,
|
||||
flags: packed struct {
|
||||
is_tls: bool,
|
||||
ended: bool,
|
||||
},
|
||||
};
|
||||
const ShutdownMessage = struct {
|
||||
async_http_id: u32,
|
||||
is_tls: bool,
|
||||
@@ -1207,6 +1245,57 @@ pub const HTTPThread = struct {
|
||||
}
|
||||
this.queued_shutdowns.clearRetainingCapacity();
|
||||
}
|
||||
{
|
||||
this.queued_writes_lock.lock();
|
||||
defer this.queued_writes_lock.unlock();
|
||||
for (this.queued_writes.items) |write| {
|
||||
const ended = write.flags.ended;
|
||||
defer if (!strings.eqlComptime(write.data, end_of_chunked_http1_1_encoding_response_body)) {
|
||||
// "0\r\n\r\n" is always a static so no need to free
|
||||
bun.default_allocator.free(write.data);
|
||||
};
|
||||
if (socket_async_http_abort_tracker.get(write.async_http_id)) |socket_ptr| {
|
||||
if (write.flags.is_tls) {
|
||||
const socket = uws.SocketTLS.fromAny(socket_ptr);
|
||||
if (socket.isClosed() or socket.isShutdown()) {
|
||||
continue;
|
||||
}
|
||||
const tagged = NewHTTPContext(true).getTaggedFromSocket(socket);
|
||||
if (tagged.get(HTTPClient)) |client| {
|
||||
if (client.state.original_request_body == .stream) {
|
||||
var stream = &client.state.original_request_body.stream;
|
||||
stream.buffer.write(write.data) catch {};
|
||||
stream.ended = ended;
|
||||
}
|
||||
client.onWritable(
|
||||
false,
|
||||
true,
|
||||
socket,
|
||||
);
|
||||
}
|
||||
} else {
|
||||
const socket = uws.SocketTCP.fromAny(socket_ptr);
|
||||
if (socket.isClosed() or socket.isShutdown()) {
|
||||
continue;
|
||||
}
|
||||
const tagged = NewHTTPContext(false).getTaggedFromSocket(socket);
|
||||
if (tagged.get(HTTPClient)) |client| {
|
||||
if (client.state.original_request_body == .stream) {
|
||||
var stream = &client.state.original_request_body.stream;
|
||||
stream.buffer.write(write.data) catch {};
|
||||
stream.ended = ended;
|
||||
}
|
||||
client.onWritable(
|
||||
false,
|
||||
false,
|
||||
socket,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
this.queued_writes.clearRetainingCapacity();
|
||||
}
|
||||
|
||||
while (this.queued_proxy_deref.popOrNull()) |http| {
|
||||
http.deref();
|
||||
@@ -1282,6 +1371,23 @@ pub const HTTPThread = struct {
|
||||
this.loop.loop.wakeup();
|
||||
}
|
||||
|
||||
pub fn scheduleRequestWrite(this: *@This(), http: *AsyncHTTP, data: []const u8, ended: bool) void {
|
||||
{
|
||||
this.queued_writes_lock.lock();
|
||||
defer this.queued_writes_lock.unlock();
|
||||
this.queued_writes.append(bun.default_allocator, .{
|
||||
.async_http_id = http.async_http_id,
|
||||
.data = data,
|
||||
.flags = .{
|
||||
.is_tls = http.client.isHTTPS(),
|
||||
.ended = ended,
|
||||
},
|
||||
}) catch bun.outOfMemory();
|
||||
}
|
||||
if (this.has_awoken.load(.monotonic))
|
||||
this.loop.loop.wakeup();
|
||||
}
|
||||
|
||||
pub fn scheduleProxyDeref(this: *@This(), proxy: *ProxyTunnel) void {
|
||||
// this is always called on the http thread
|
||||
{
|
||||
@@ -1847,6 +1953,7 @@ pub const InternalState = struct {
|
||||
info.deinit(bun.default_allocator);
|
||||
}
|
||||
|
||||
this.original_request_body.deinit();
|
||||
this.* = .{
|
||||
.body_out_str = body_msg,
|
||||
.compressed_body = MutableString{ .allocator = default_allocator, .list = .{} },
|
||||
@@ -2000,6 +2107,7 @@ pub const Flags = packed struct {
|
||||
proxy_tunneling: bool = false,
|
||||
reject_unauthorized: bool = true,
|
||||
is_preconnect_only: bool = false,
|
||||
is_streaming_request_body: bool = false,
|
||||
};
|
||||
|
||||
// TODO: reduce the size of this struct
|
||||
@@ -2131,6 +2239,7 @@ pub const Encoding = enum {
|
||||
|
||||
const host_header_name = "Host";
|
||||
const content_length_header_name = "Content-Length";
|
||||
const chunked_encoded_header = picohttp.Header{ .name = "Transfer-Encoding", .value = "chunked" };
|
||||
const connection_header = picohttp.Header{ .name = "Connection", .value = "keep-alive" };
|
||||
const connection_closing_header = picohttp.Header{ .name = "Connection", .value = "close" };
|
||||
const accept_header = picohttp.Header{ .name = "Accept", .value = "*/*" };
|
||||
@@ -2745,10 +2854,14 @@ pub fn buildRequest(this: *HTTPClient, body_len: usize) picohttp.Request {
|
||||
}
|
||||
|
||||
if (body_len > 0 or this.method.hasRequestBody()) {
|
||||
-        request_headers_buf[header_count] = .{
-            .name = content_length_header_name,
-            .value = std.fmt.bufPrint(&this.request_content_len_buf, "{d}", .{body_len}) catch "0",
-        };
+        if (this.flags.is_streaming_request_body) {
+            request_headers_buf[header_count] = chunked_encoded_header;
+        } else {
+            request_headers_buf[header_count] = .{
+                .name = content_length_header_name,
+                .value = std.fmt.bufPrint(&this.request_content_len_buf, "{d}", .{body_len}) catch "0",
+            };
+        }
|
||||
header_count += 1;
|
||||
}
|
||||
|
||||
@@ -2766,8 +2879,16 @@ pub fn doRedirect(
|
||||
ctx: *NewHTTPContext(is_ssl),
|
||||
socket: NewHTTPContext(is_ssl).HTTPSocket,
|
||||
) void {
|
||||
if (this.state.original_request_body == .stream) {
|
||||
// we cannot follow redirect from a stream right now
|
||||
// NOTE: we can use .tee(), reset the readable stream and cancel/wait pending write requests before redirecting. node.js just errors here so we just closeAndFail too.
|
||||
this.closeAndFail(error.UnexpectedRedirect, is_ssl, socket);
|
||||
return;
|
||||
}
|
||||
|
||||
this.unix_socket_path.deinit();
|
||||
this.unix_socket_path = JSC.ZigString.Slice.empty;
|
||||
// TODO: what we do with stream body?
|
||||
const request_body = if (this.state.flags.resend_request_body_on_redirect and this.state.original_request_body == .bytes)
|
||||
this.state.original_request_body.bytes
|
||||
else
|
||||
@@ -3032,6 +3153,10 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
|
||||
this.state.request_stage = .proxy_handshake;
|
||||
} else {
|
||||
this.state.request_stage = .body;
|
||||
if (this.flags.is_streaming_request_body) {
|
||||
// lets signal to start streaming the body
|
||||
this.progressUpdate(is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket);
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
@@ -3041,11 +3166,15 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
|
||||
this.state.request_stage = .proxy_handshake;
|
||||
} else {
|
||||
this.state.request_stage = .body;
|
||||
if (this.flags.is_streaming_request_body) {
|
||||
// lets signal to start streaming the body
|
||||
this.progressUpdate(is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket);
|
||||
}
|
||||
}
|
||||
assert(
|
||||
-            // we should have leftover data OR we use sendfile()
+            // we should have leftover data OR we use sendfile/stream
                (this.state.original_request_body == .bytes and this.state.request_body.len > 0) or
-                this.state.original_request_body == .sendfile,
+                this.state.original_request_body == .sendfile or this.state.original_request_body == .stream,
|
||||
);
|
||||
|
||||
// we sent everything, but there's some body leftover
|
||||
@@ -3076,6 +3205,28 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
|
||||
return;
|
||||
}
|
||||
},
|
||||
.stream => {
|
||||
var stream = &this.state.original_request_body.stream;
|
||||
// to simplify things, the buffer here holds raw data; we just need to flush it to the socket
|
||||
if (stream.buffer.isNotEmpty()) {
|
||||
const to_send = stream.buffer.slice();
|
||||
const amount = socket.write(to_send, true);
|
||||
if (amount < 0) {
|
||||
// this.closeAndFail(error.WriteFailed, is_ssl, socket);
|
||||
return;
|
||||
}
|
||||
this.state.request_sent_len += @as(usize, @intCast(amount));
|
||||
stream.buffer.cursor = @intCast(amount);
|
||||
if (stream.buffer.isEmpty()) {
|
||||
stream.buffer.reset();
|
||||
}
|
||||
}
|
||||
if (stream.hasEnded()) {
|
||||
this.state.request_stage = .done;
|
||||
stream.buffer.deinit();
|
||||
return;
|
||||
}
|
||||
},
|
||||
.sendfile => |*sendfile| {
|
||||
if (comptime is_ssl) {
|
||||
@panic("sendfile is only supported without SSL. This code should never have been reached!");
|
||||
@@ -3098,21 +3249,44 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
|
||||
}
|
||||
},
|
||||
.proxy_body => {
|
||||
if (this.state.original_request_body != .bytes) {
|
||||
@panic("sendfile is only supported without SSL. This code should never have been reached!");
|
||||
}
|
||||
if (this.proxy_tunnel) |proxy| {
|
||||
this.setTimeout(socket, 5);
|
||||
switch (this.state.original_request_body) {
|
||||
.bytes => {
|
||||
this.setTimeout(socket, 5);
|
||||
|
||||
const to_send = this.state.request_body;
|
||||
const amount = proxy.writeData(to_send) catch return; // just wait and retry when onWritable! if closed internally will call proxy.onClose
|
||||
const to_send = this.state.request_body;
|
||||
const amount = proxy.writeData(to_send) catch return; // just wait and retry when onWritable! if closed internally will call proxy.onClose
|
||||
|
||||
this.state.request_sent_len += @as(usize, @intCast(amount));
|
||||
this.state.request_body = this.state.request_body[@as(usize, @intCast(amount))..];
|
||||
this.state.request_sent_len += @as(usize, @intCast(amount));
|
||||
this.state.request_body = this.state.request_body[@as(usize, @intCast(amount))..];
|
||||
|
||||
if (this.state.request_body.len == 0) {
|
||||
this.state.request_stage = .done;
|
||||
return;
|
||||
if (this.state.request_body.len == 0) {
|
||||
this.state.request_stage = .done;
|
||||
return;
|
||||
}
|
||||
},
|
||||
.stream => {
|
||||
var stream = &this.state.original_request_body.stream;
|
||||
|
||||
// to simplify things, the buffer here holds raw data; we just need to flush it to the socket
|
||||
if (stream.buffer.isNotEmpty()) {
|
||||
const to_send = stream.buffer.slice();
|
||||
const amount = proxy.writeData(to_send) catch return; // just wait and retry when onWritable! if closed internally will call proxy.onClose
|
||||
this.state.request_sent_len += amount;
|
||||
stream.buffer.cursor = @truncate(amount);
|
||||
if (stream.buffer.isEmpty()) {
|
||||
stream.buffer.reset();
|
||||
}
|
||||
}
|
||||
if (stream.hasEnded()) {
|
||||
this.state.request_stage = .done;
|
||||
stream.buffer.deinit();
|
||||
return;
|
||||
}
|
||||
},
|
||||
.sendfile => {
|
||||
@panic("sendfile is only supported without SSL. This code should never have been reached!");
|
||||
},
|
||||
}
|
||||
}
|
||||
},
|
||||
@@ -3175,6 +3349,10 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
|
||||
|
||||
if (has_sent_headers) {
|
||||
this.state.request_stage = .proxy_body;
|
||||
if (this.flags.is_streaming_request_body) {
|
||||
// lets signal to start streaming the body
|
||||
this.progressUpdate(is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket);
|
||||
}
|
||||
assert(this.state.request_body.len > 0);
|
||||
|
||||
// we sent everything, but there's some body leftover
|
||||
@@ -3558,6 +3736,7 @@ pub const HTTPClientResult = struct {
|
||||
body: ?*MutableString = null,
|
||||
has_more: bool = false,
|
||||
redirected: bool = false,
|
||||
can_stream: bool = false,
|
||||
|
||||
fail: ?anyerror = null,
|
||||
|
||||
@@ -3665,6 +3844,8 @@ pub fn toResult(this: *HTTPClient) HTTPClientResult {
|
||||
.has_more = certificate_info != null or (this.state.fail == null and !this.state.isDone()),
|
||||
.body_size = body_size,
|
||||
.certificate_info = certificate_info,
|
||||
// we can stream the request_body at this stage
|
||||
.can_stream = (this.state.request_stage == .body or this.state.request_stage == .proxy_body) and this.flags.is_streaming_request_body,
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
13  test/js/web/fetch/fetch-leak-test-fixture-5.js  (generated)
@@ -66,6 +66,19 @@ function getBody() {
|
||||
case "urlsearchparams":
|
||||
body = getURLSearchParams();
|
||||
break;
|
||||
case "iterator":
|
||||
body = async function* iter() {
|
||||
yield (cachedBody ??= getString());
|
||||
};
|
||||
break;
|
||||
case "stream":
|
||||
body = new ReadableStream({
|
||||
start(c) {
|
||||
c.enqueue((cachedBody ??= getBuffer()));
|
||||
c.close();
|
||||
},
|
||||
});
|
||||
break;
|
||||
default:
|
||||
throw new Error(`Invalid type: ${type}`);
|
||||
}
|
||||
|
||||
@@ -99,7 +99,7 @@ describe("fetch doesn't leak", () => {
|
||||
}
|
||||
});
|
||||
|
||||
describe.each(["FormData", "Blob", "Buffer", "String", "URLSearchParams"])("Sending %s", type => {
|
||||
describe.each(["FormData", "Blob", "Buffer", "String", "URLSearchParams", "stream", "iterator"])("Sending %s", type => {
|
||||
test(
|
||||
"does not leak",
|
||||
async () => {
|
||||
|
||||
@@ -6,7 +6,7 @@ import { mkfifo } from "mkfifo";
|
||||
import net from "net";
|
||||
import { join } from "path";
|
||||
import { gzipSync } from "zlib";
|
||||
|
||||
import { Readable } from "stream";
|
||||
const tmp_dir = tmpdirSync();
|
||||
|
||||
const fixture = readFileSync(join(import.meta.dir, "fetch.js.txt"), "utf8").replaceAll("\r\n", "\n");
|
||||
@@ -2074,3 +2074,188 @@ describe("fetch Response life cycle", () => {
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("fetch should allow duplex", () => {
|
||||
it("should allow duplex streaming", async () => {
|
||||
using server = Bun.serve({
|
||||
port: 0,
|
||||
async fetch(req) {
|
||||
return new Response(req.body);
|
||||
},
|
||||
});
|
||||
const intervalStream = new ReadableStream({
|
||||
start(c) {
|
||||
let count = 0;
|
||||
const timer = setInterval(() => {
|
||||
c.enqueue("Hello\n");
|
||||
if (count === 5) {
|
||||
clearInterval(timer);
|
||||
c.close();
|
||||
}
|
||||
count++;
|
||||
}, 20);
|
||||
},
|
||||
}).pipeThrough(new TextEncoderStream());
|
||||
|
||||
const resp = await fetch(server.url, {
|
||||
method: "POST",
|
||||
body: intervalStream,
|
||||
duplex: "half",
|
||||
});
|
||||
|
||||
const reader = resp.body.pipeThrough(new TextDecoderStream()).getReader();
|
||||
var result = "";
|
||||
while (true) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) break;
|
||||
result += value;
|
||||
}
|
||||
expect(result).toBe("Hello\n".repeat(6));
|
||||
});
|
||||
|
||||
it("should allow duplex extending Readable (sync)", async () => {
|
||||
class HelloWorldStream extends Readable {
|
||||
constructor(options) {
|
||||
super(options);
|
||||
this.chunks = ["Hello", " ", "World!"];
|
||||
this.index = 0;
|
||||
}
|
||||
|
||||
_read(size) {
|
||||
if (this.index < this.chunks.length) {
|
||||
this.push(this.chunks[this.index]);
|
||||
this.index++;
|
||||
} else {
|
||||
this.push(null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
using server = Bun.serve({
|
||||
port: 0,
|
||||
async fetch(req) {
|
||||
return new Response(req.body);
|
||||
},
|
||||
});
|
||||
const response = await fetch(server.url, {
|
||||
body: new HelloWorldStream(),
|
||||
method: "POST",
|
||||
duplex: "half",
|
||||
});
|
||||
|
||||
expect(await response.text()).toBe("Hello World!");
|
||||
});
|
||||
it("should allow duplex extending Readable (async)", async () => {
|
||||
class HelloWorldStream extends Readable {
|
||||
constructor(options) {
|
||||
super(options);
|
||||
this.chunks = ["Hello", " ", "World!"];
|
||||
this.index = 0;
|
||||
}
|
||||
|
||||
_read(size) {
|
||||
setTimeout(() => {
|
||||
if (this.index < this.chunks.length) {
|
||||
this.push(this.chunks[this.index]);
|
||||
this.index++;
|
||||
} else {
|
||||
this.push(null);
|
||||
}
|
||||
}, 20);
|
||||
}
|
||||
}
|
||||
|
||||
using server = Bun.serve({
|
||||
port: 0,
|
||||
async fetch(req) {
|
||||
return new Response(req.body);
|
||||
},
|
||||
});
|
||||
const response = await fetch(server.url, {
|
||||
body: new HelloWorldStream(),
|
||||
method: "POST",
|
||||
duplex: "half",
|
||||
});
|
||||
|
||||
expect(await response.text()).toBe("Hello World!");
|
||||
});
|
||||
|
||||
it("should allow duplex using async iterator (async)", async () => {
|
||||
using server = Bun.serve({
|
||||
port: 0,
|
||||
async fetch(req) {
|
||||
return new Response(req.body);
|
||||
},
|
||||
});
|
||||
const response = await fetch(server.url, {
|
||||
body: async function* iter() {
|
||||
yield "Hello";
|
||||
await Bun.sleep(20);
|
||||
yield " ";
|
||||
await Bun.sleep(20);
|
||||
yield "World!";
|
||||
},
|
||||
method: "POST",
|
||||
duplex: "half",
|
||||
});
|
||||
|
||||
expect(await response.text()).toBe("Hello World!");
|
||||
});
|
||||
|
||||
it("should fail in redirects .follow when using duplex", async () => {
|
||||
using server = Bun.serve({
|
||||
port: 0,
|
||||
async fetch(req) {
|
||||
if (req.url.indexOf("/redirect") === -1) {
|
||||
return Response.redirect("/");
|
||||
}
|
||||
return new Response(req.body);
|
||||
},
|
||||
});
|
||||
|
||||
expect(async () => {
|
||||
const response = await fetch(server.url, {
|
||||
body: async function* iter() {
|
||||
yield "Hello";
|
||||
await Bun.sleep(20);
|
||||
yield " ";
|
||||
await Bun.sleep(20);
|
||||
yield "World!";
|
||||
},
|
||||
method: "POST",
|
||||
duplex: "half",
|
||||
});
|
||||
|
||||
await response.text();
|
||||
}).toThrow();
|
||||
});
|
||||
|
||||
it("should work in redirects .manual when using duplex", async () => {
|
||||
using server = Bun.serve({
|
||||
port: 0,
|
||||
async fetch(req) {
|
||||
if (req.url.indexOf("/redirect") === -1) {
|
||||
return Response.redirect("/");
|
||||
}
|
||||
return new Response(req.body);
|
||||
},
|
||||
});
|
||||
|
||||
expect(async () => {
|
||||
const response = await fetch(server.url, {
|
||||
body: async function* iter() {
|
||||
yield "Hello";
|
||||
await Bun.sleep(20);
|
||||
yield " ";
|
||||
await Bun.sleep(20);
|
||||
yield "World!";
|
||||
},
|
||||
method: "POST",
|
||||
duplex: "half",
|
||||
redirect: "manual",
|
||||
});
|
||||
|
||||
await response.text();
|
||||
}).not.toThrow();
|
||||
});
|
||||
});