Compare commits

...

17 Commits

Author SHA1 Message Date
Ciro Spaciari
dc85530b30 Merge branch 'main' into ciro/fix-Bun.write 2024-12-17 14:18:30 -08:00
Ciro Spaciari
c1812f98c8 Merge branch 'main' into ciro/fix-Bun.write 2024-12-16 11:20:03 -08:00
Jarred Sumner
038c818463 Merge branch 'main' into ciro/fix-Bun.write 2024-12-15 07:19:51 -08:00
Jarred Sumner
2158a21fa0 Merge branch 'main' into ciro/fix-Bun.write 2024-12-12 01:38:45 -08:00
Ciro Spaciari
4389d20017 Merge branch 'main' into ciro/fix-Bun.write 2024-12-11 11:21:50 -08:00
Jarred Sumner
483083b6e0 Merge branch 'main' into ciro/fix-Bun.write 2024-12-10 20:35:52 -08:00
Jarred Sumner
613b94527d Merge branch 'main' into ciro/fix-Bun.write 2024-12-09 20:59:46 -08:00
Ciro Spaciari
843954c3e6 Merge branch 'main' into ciro/fix-Bun.write 2024-12-09 18:58:29 -08:00
Ciro Spaciari
17e6205c2c Merge branch 'main' into ciro/fix-Bun.write 2024-12-09 16:21:19 -08:00
Ciro Spaciari
79c4d016eb Update src/bun.js/webcore/streams.zig 2024-12-09 16:20:54 -08:00
cirospaciari
b831435cbf fix windows 2024-12-09 15:47:56 -08:00
Jarred Sumner
c76c787da6 Merge branch 'main' into ciro/fix-Bun.write 2024-12-08 09:36:37 -08:00
Jarred Sumner
5dd6edf386 Add more tests 2024-12-06 19:13:16 -08:00
Ciro Spaciari
c85140ce6d opsie 2024-12-06 18:55:29 -08:00
Ciro Spaciari
18b498a2b4 fix 2024-12-06 18:52:47 -08:00
Ciro Spaciari
a9dd69f91e use a loop 2024-12-06 15:09:00 -08:00
Ciro Spaciari
3b6537af02 one more attempt to deflaky this 2024-12-06 15:01:19 -08:00
8 changed files with 341 additions and 19 deletions

View File

@@ -4371,6 +4371,10 @@ GlobalObject::PromiseFunctions GlobalObject::promiseHandlerID(Zig::FFIFunction h
return GlobalObject::PromiseFunctions::Bun__FetchTasklet__onResolveRequestStream;
} else if (handler == Bun__FetchTasklet__onRejectRequestStream) {
return GlobalObject::PromiseFunctions::Bun__FetchTasklet__onRejectRequestStream;
} else if (handler == Bun__BlobToFileSink__onResolveStream) {
return GlobalObject::PromiseFunctions::Bun__BlobToFileSink__onResolveStream;
} else if (handler == Bun__BlobToFileSink__onRejectStream) {
return GlobalObject::PromiseFunctions::Bun__BlobToFileSink__onRejectStream;
} else {
RELEASE_ASSERT_NOT_REACHED();
}

View File

@@ -336,8 +336,10 @@ public:
Bun__onRejectEntryPointResult,
Bun__FetchTasklet__onRejectRequestStream,
Bun__FetchTasklet__onResolveRequestStream,
Bun__BlobToFileSink__onResolveStream,
Bun__BlobToFileSink__onRejectStream,
};
static constexpr size_t promiseFunctionsSize = 26;
static constexpr size_t promiseFunctionsSize = 28;
static PromiseFunctions promiseHandlerID(SYSV_ABI EncodedJSValue (*handler)(JSC__JSGlobalObject* arg0, JSC__CallFrame* arg1));

View File

@@ -973,6 +973,8 @@ comptime {
BodyValueBuffererContext.shim.ref();
_ = Bun__LoadLibraryBunString;
JSC.WebCore.BlobToFileSink.shim.ref();
}
}

View File

@@ -869,4 +869,7 @@ BUN_DECLARE_HOST_FUNCTION(Bun__onRejectEntryPointResult);
BUN_DECLARE_HOST_FUNCTION(Bun__FetchTasklet__onResolveRequestStream);
BUN_DECLARE_HOST_FUNCTION(Bun__FetchTasklet__onRejectRequestStream);
BUN_DECLARE_HOST_FUNCTION(Bun__BlobToFileSink__onResolveStream);
BUN_DECLARE_HOST_FUNCTION(Bun__BlobToFileSink__onRejectStream);
#endif

View File

@@ -73,7 +73,7 @@ const NewReadFileHandler = @import("./blob/ReadFile.zig").NewReadFileHandler;
const WriteFile = @import("./blob/WriteFile.zig").WriteFile;
const ReadFile = @import("./blob/ReadFile.zig").ReadFile;
const WriteFileWindows = @import("./blob/WriteFile.zig").WriteFileWindows;
const FileSink = JSC.WebCore.FileSink;
pub const Blob = struct {
const bloblog = Output.scoped(.Blob, false);
@@ -1141,6 +1141,27 @@ pub const Blob = struct {
return JSC.JSPromise.rejectedPromiseValue(globalThis, err_ref.toJS(globalThis));
},
.Locked => {
if ((response.body.value == .Locked and (response.body.value.Locked.action != .none or response.body.value.Locked.isDisturbed(Response, globalThis, data)))) {
destination_blob.detach();
return JSC.JSPromise.rejectedPromiseValue(globalThis, globalThis.ERR_BODY_ALREADY_USED("Response body already used", .{}).toJS());
}
if (response.body.value.Locked.readable.get()) |stream| {
if (stream.isDisturbed(globalThis)) {
destination_blob.detach();
return JSC.JSPromise.rejectedPromiseValue(globalThis, globalThis.ERR_BODY_ALREADY_USED("Response body already used", .{}).toJS());
}
if (Environment.isWindows) {
// TODO: make FileSink work on every case on Windows
if (destination_blob.store.?.data.file.pathlike == .path) {
return BlobToFileSink.consume(globalThis, destination_blob, stream);
}
} else {
return BlobToFileSink.consume(globalThis, destination_blob, stream);
}
}
// TODO: removing this using toReadableStream in a followup, fixing HTMLRewriter.transform will be needed.
var task = bun.new(WriteFileWaitFromLockedValueTask, .{
.globalThis = globalThis,
.file_blob = destination_blob,
@@ -1172,6 +1193,25 @@ pub const Blob = struct {
return JSC.JSPromise.rejectedPromiseValue(globalThis, err_ref.toJS(globalThis));
},
.Locked => {
if ((request.body.value == .Locked and (request.body.value.Locked.action != .none or request.body.value.Locked.isDisturbed(Request, globalThis, data)))) {
destination_blob.detach();
return JSC.JSPromise.rejectedPromiseValue(globalThis, globalThis.ERR_BODY_ALREADY_USED("Request body already used", .{}).toJS());
}
if (request.body.value.Locked.readable.get()) |stream| {
if (stream.isDisturbed(globalThis)) {
destination_blob.detach();
return JSC.JSPromise.rejectedPromiseValue(globalThis, globalThis.ERR_BODY_ALREADY_USED("Response body already used", .{}).toJS());
}
if (Environment.isWindows) {
// TODO: make FileSink work on every case on Windows
if (destination_blob.store.?.data.file.pathlike == .path) {
return BlobToFileSink.consume(globalThis, destination_blob, stream);
}
} else {
return BlobToFileSink.consume(globalThis, destination_blob, stream);
}
}
var task = bun.new(WriteFileWaitFromLockedValueTask, .{
.globalThis = globalThis,
.file_blob = destination_blob,
@@ -1181,7 +1221,6 @@ pub const Blob = struct {
request.body.value.Locked.task = task;
request.body.value.Locked.onReceiveValue = WriteFileWaitFromLockedValueTask.thenWrap;
return task.promise.value();
},
}
@@ -4771,6 +4810,157 @@ pub const Blob = struct {
}
};
pub const BlobToFileSink = struct {
blob: Blob,
sink: FileSink.JSSink,
stream: JSC.WebCore.ReadableStream.Strong,
pub usingnamespace bun.New(BlobToFileSink);
pub fn deinit(this: *@This()) void {
this.sink.sink.finalize();
this.blob.detach();
this.stream.deinit();
this.destroy();
}
pub fn onResolveStream(_: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) bun.JSError!JSC.JSValue {
var args = callframe.arguments_old(2);
var this: *@This() = args.ptr[args.len - 1].asPromisePtr(@This());
this.deinit();
return JSValue.jsUndefined();
}
pub fn onRejectStream(_: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) bun.JSError!JSC.JSValue {
const args = callframe.arguments_old(2);
var this = args.ptr[args.len - 1].asPromisePtr(@This());
this.deinit();
return JSValue.jsUndefined();
}
pub const shim = JSC.Shimmer("Bun", "BlobToFileSink", @This());
pub const Export = shim.exportFunctions(.{
.onResolveStream = onResolveStream,
.onRejectStream = onRejectStream,
});
comptime {
const jsonResolveRequestStream = JSC.toJSHostFunction(onResolveStream);
@export(jsonResolveRequestStream, .{ .name = Export[0].symbol_name });
const jsonRejectRequestStream = JSC.toJSHostFunction(onRejectStream);
@export(jsonRejectRequestStream, .{ .name = Export[1].symbol_name });
}
pub fn consume(globalThis: *JSGlobalObject, destination_blob: Blob, stream: JSC.WebCore.ReadableStream) JSValue {
bun.assert(destination_blob.store != null);
if (destination_blob.store.?.data != .file) {
return JSC.JSPromise.rejectedPromiseValue(globalThis, globalThis.createInvalidArgs("Blob is read-only", .{}));
}
// lets do something similar to ReadableStream piping in a Bun.file(filename).writer()
var this = BlobToFileSink.new(.{
.blob = destination_blob,
.sink = .{
.sink = .{
.event_loop_handle = JSC.EventLoopHandle.init(JSC.VirtualMachine.get().eventLoop()),
.fd = bun.invalid_fd,
},
},
.stream = .{},
});
this.sink.sink.writer.setParent(&this.sink.sink);
const path = destination_blob.store.?.data.file.pathlike;
const input_path: JSC.WebCore.PathOrFileDescriptor = brk: {
if (path == .fd) {
break :brk .{ .fd = path.fd };
} else {
break :brk .{
.path = ZigString.Slice.fromUTF8NeverFree(
path.path.slice(),
).clone(
globalThis.allocator(),
) catch bun.outOfMemory(),
};
}
};
defer input_path.deinit();
switch (this.sink.sink.start(.{
.FileSink = .{
.input_path = input_path,
},
})) {
.err => |err| {
this.deinit();
return JSC.JSPromise.rejectedPromiseValue(globalThis, err.toJSC(globalThis));
},
else => {},
}
var signal = &this.sink.sink.signal;
signal.* = FileSink.JSSink.SinkSignal.init(JSValue.zero);
// explicitly set it to a dead pointer
// we use this memory address to disable signals being sent
signal.clear();
bun.assert(signal.isDead());
// We are already corked!
const assignment_result: JSValue = FileSink.JSSink.assignToStream(
globalThis,
stream.value,
&this.sink,
@as(**anyopaque, @ptrCast(&signal.ptr)),
);
assignment_result.ensureStillAlive();
// assert that it was updated
bun.assert(!signal.isDead());
if (assignment_result.toError()) |err_value| {
this.deinit();
return JSC.JSPromise.rejectedPromiseValue(globalThis, err_value);
}
if (!assignment_result.isEmptyOrUndefinedOrNull()) {
globalThis.bunVM().drainMicrotasks();
assignment_result.ensureStillAlive();
// it returns a Promise when it goes through ReadableStreamDefaultReader
if (assignment_result.asAnyPromise()) |promise| {
switch (promise.status(globalThis.vm())) {
.pending => {
this.stream = JSC.WebCore.ReadableStream.Strong.init(stream, globalThis);
assignment_result.then(
globalThis,
this,
onResolveStream,
onRejectStream,
);
return assignment_result;
},
.fulfilled, .rejected => {
this.deinit();
return assignment_result;
},
}
} else {
// if is not a promise we treat it as Error
this.deinit();
return JSC.JSPromise.rejectedPromiseValue(globalThis, assignment_result);
}
}
this.deinit();
return JSC.JSPromise.rejectedPromiseValue(globalThis, globalThis.ERR_BODY_ALREADY_USED("body already used", .{}).toJS());
}
};
pub const AnyBlob = union(enum) {
Blob: Blob,
// InlineBlob: InlineBlob,

View File

@@ -1230,11 +1230,11 @@ pub fn WindowsStreamingWriter(
onWrite(this.parent, written, if (done) .drained else .pending);
// process pending outgoing data if any
this.processSend();
// TODO: should we report writable?
if (onWritable) |onWritableFn| {
onWritableFn(this.parent);
if (!done and this.processSend()) {
// TODO: should we report writable?
if (onWritable) |onWritableFn| {
onWritableFn(this.parent);
}
}
}
@@ -1257,19 +1257,20 @@ pub fn WindowsStreamingWriter(
}
/// this tries to send more data returning if we are writable or not after this
fn processSend(this: *WindowsWriter) void {
/// returns true if not closed, is unsafe to access this after this returns false
fn processSend(this: *WindowsWriter) bool {
log("processSend", .{});
if (this.current_payload.isNotEmpty()) {
// we have some pending async request, the next outgoing data will be processed after this finish
this.last_write_result = .{ .pending = 0 };
return;
return true;
}
const bytes = this.outgoing.slice();
// nothing todo (we assume we are writable until we try to write something)
if (bytes.len == 0) {
this.last_write_result = .{ .wrote = 0 };
return;
return true;
}
var pipe = this.source orelse {
@@ -1277,7 +1278,7 @@ pub fn WindowsStreamingWriter(
this.last_write_result = .{ .err = err };
onError(this.parent, err);
this.closeWithoutReporting();
return;
return false;
};
// current payload is empty we can just swap with outgoing
@@ -1297,7 +1298,7 @@ pub fn WindowsStreamingWriter(
this.last_write_result = .{ .err = err };
onError(this.parent, err);
this.closeWithoutReporting();
return;
return false;
}
},
else => {
@@ -1307,11 +1308,12 @@ pub fn WindowsStreamingWriter(
this.last_write_result = .{ .err = err };
onError(this.parent, err);
this.closeWithoutReporting();
return;
return false;
}
},
}
this.last_write_result = .{ .pending = 0 };
return true;
}
const WindowsWriter = @This();
@@ -1372,8 +1374,11 @@ pub fn WindowsStreamingWriter(
if (had_buffered_data) {
return .{ .pending = 0 };
}
this.processSend();
return this.last_write_result;
if (this.processSend()) {
return this.last_write_result;
} else {
return .{ .done = 0 };
}
}
pub fn writeUTF16(this: *WindowsWriter, buf: []const u16) WriteResult {
@@ -1396,8 +1401,11 @@ pub fn WindowsStreamingWriter(
return .{ .wrote = 0 };
}
this.processSend();
return this.last_write_result;
if (this.processSend()) {
return this.last_write_result;
} else {
return .{ .wrote = 0 };
}
}
pub fn end(this: *WindowsWriter) void {

View File

@@ -1328,6 +1328,21 @@ pub fn openatA(dirfd: bun.FileDescriptor, file_path: []const u8, flags: bun.Mode
}
pub fn openA(file_path: []const u8, flags: bun.Mode, perm: bun.Mode) Maybe(bun.FileDescriptor) {
if (comptime Environment.isWindows) {
if (file_path.len > bun.MAX_PATH_BYTES) {
return .{
.err = .{
.errno = @intFromEnum(bun.C.E.NAMETOOLONG),
.syscall = .open,
},
};
}
var buffer: [bun.MAX_PATH_BYTES + 1]u8 = undefined;
@memcpy(buffer[0..file_path.len], file_path);
buffer[file_path.len] = 0;
return sys_uv.open(buffer[0..file_path.len :0], flags, perm);
}
// this is what open() does anyway.
return openatA(bun.toFD((std.fs.cwd().fd)), file_path, flags, perm);
}
@@ -2922,6 +2937,17 @@ pub fn openNullDevice() Maybe(bun.FileDescriptor) {
pub fn dupWithFlags(fd: bun.FileDescriptor, flags: i32) Maybe(bun.FileDescriptor) {
if (comptime Environment.isWindows) {
if (Environment.isWindows) {
if (!fd.isValid()) {
// we cannot dupe an invalid fd
return .{
.err = .{
.errno = @intFromEnum(bun.C.SystemErrno.EINVAL),
.syscall = .dup,
},
};
}
}
var target: windows.HANDLE = undefined;
const process = kernel32.GetCurrentProcess();
const out = kernel32.DuplicateHandle(

View File

@@ -1,6 +1,6 @@
import { describe, expect, it, test } from "bun:test";
import fs, { mkdirSync } from "fs";
import { bunEnv, bunExe, gcTick, isWindows, withoutAggressiveGC } from "harness";
import { bunEnv, bunExe, gcTick, isWindows, tempDirWithFiles, withoutAggressiveGC } from "harness";
import { tmpdir } from "os";
import path, { join } from "path";
const tmpbase = tmpdir() + path.sep;
@@ -527,3 +527,90 @@ if (isWindows && !IS_UV_FS_COPYFILE_DISABLED) {
expect(await exited).toBe(0);
}, 10000);
}
describe("Bun.write() with Response(ReadableStream)", () => {
it("should write Response(req.body) from HTTP request", async () => {
const tempdir = tempDirWithFiles("response and request", {
"response.txt": "",
});
const response_filename = join(tempdir, "response.txt");
using server = Bun.serve({
port: 0,
async fetch(req) {
await Bun.write(response_filename, new Response(req.body));
return new Response("ok");
},
});
await fetch(server.url, { method: "POST", body: "Bun" });
expect(await Bun.file(response_filename).text()).toBe("Bun");
});
it("should write Response(ReadableStream) from string chunks", async () => {
const tempdir = tempDirWithFiles("stream chunks", {
"output.txt": "",
});
const output_file = join(tempdir, "output.txt");
const stream = new ReadableStream({
start(controller) {
controller.enqueue("Hello");
controller.enqueue(" ");
controller.enqueue("World");
controller.close();
},
});
await Bun.write(output_file, new Response(stream));
expect(await Bun.file(output_file).text()).toBe("Hello World");
});
it("should write Response(ReadableStream) from string chunks with delay in pull", async () => {
const tempdir = tempDirWithFiles("stream chunks", {
"output.txt": "",
});
const output_file = join(tempdir, "output.txt");
const stream = new ReadableStream({
async pull(controller) {
await Bun.sleep(0);
controller.enqueue("Hello");
await Bun.sleep(0);
controller.enqueue(" ");
await Bun.sleep(0);
controller.enqueue("World");
controller.close();
},
});
await Bun.write(output_file, new Response(stream));
expect(await Bun.file(output_file).text()).toBe("Hello World");
});
it("should write Response(ReadableStream) from Uint8Array chunks", async () => {
const tempdir = tempDirWithFiles("binary chunks", {
"output.bin": "",
});
const output_file = join(tempdir, "output.bin");
const encoder = new TextEncoder();
const stream = new ReadableStream({
async pull(controller) {
await Bun.sleep(0);
controller.enqueue(encoder.encode("Binary"));
await Bun.sleep(0);
controller.enqueue(encoder.encode("Data"));
controller.close();
},
});
await Bun.write(output_file, new Response(stream));
expect(await Bun.file(output_file).text()).toBe("BinaryData");
});
it("should handle empty streams", async () => {
const tempdir = tempDirWithFiles("empty stream", {
"empty.txt": "",
});
const output_file = join(tempdir, "empty.txt");
const stream = new ReadableStream({
start(controller) {
controller.close();
},
});
await Bun.write(output_file, new Response(stream));
expect(await Bun.file(output_file).text()).toBe("");
});
});