Compare commits

...

2 Commits

Author SHA1 Message Date
autofix-ci[bot]
152e975eb4 [autofix.ci] apply automated fixes 2025-11-13 02:10:46 +00:00
Claude Bot
d01fa4f193 Fix hang when writing custom ReadableStreams to files
When writing a Response or Request with a locked body (streaming)
to a file using Bun.write(), the operation would hang indefinitely.
This occurred because the locked body was never converted to a
ReadableStream and piped to the file destination.

The fix ensures that:
1. Locked body values are converted to ReadableStreams via toReadableStream()
2. The resulting ReadableStream is piped directly to the file using pipeReadableStreamToBlob()
3. This approach works for both regular files and S3 destinations
4. The correct number of bytes written is now returned

This resolves hangs in three scenarios:
- Writing custom JS ReadableStreams wrapped in Response
- Writing Response bodies from fetch() wrapped in new Response
- Writing teed ReadableStreams to files
2025-11-13 01:57:22 +00:00
2 changed files with 146 additions and 47 deletions

View File

@@ -1359,17 +1359,19 @@ pub fn writeFileInternal(globalThis: *jsc.JSGlobalObject, path_or_blob_: *PathOr
return jsc.JSPromise.dangerouslyCreateRejectedPromiseValueWithoutNotifyingVM(globalThis, err_ref.toJS(globalThis));
},
.Locked => {
if (destination_blob.isS3()) {
const s3 = &destination_blob.store.?.data.s3;
var aws_options = try s3.getCredentialsWithOptions(options.extra_options, globalThis);
defer aws_options.deinit();
_ = try bodyValue.toReadableStream(globalThis);
// Convert the locked body to a ReadableStream
_ = try bodyValue.toReadableStream(globalThis);
if (response.getBodyReadableStream(globalThis) orelse bodyValue.Locked.readable.get(globalThis)) |readable| {
if (readable.isDisturbed(globalThis)) {
destination_blob.detach();
return globalThis.throwInvalidArguments("ReadableStream has already been used", .{});
}
if (response.getBodyReadableStream(globalThis) orelse bodyValue.Locked.readable.get(globalThis)) |readable| {
if (readable.isDisturbed(globalThis)) {
destination_blob.detach();
return globalThis.throwInvalidArguments("ReadableStream has already been used", .{});
}
if (destination_blob.isS3()) {
const s3 = &destination_blob.store.?.data.s3;
var aws_options = try s3.getCredentialsWithOptions(options.extra_options, globalThis);
defer aws_options.deinit();
const proxy = globalThis.bunVM().transpiler.env.getHttpProxy(true, null);
const proxy_url = if (proxy) |p| p.href else null;
@@ -1387,18 +1389,13 @@ pub fn writeFileInternal(globalThis: *jsc.JSGlobalObject, path_or_blob_: *PathOr
undefined,
);
}
destination_blob.detach();
return globalThis.throwInvalidArguments("ReadableStream has already been used", .{});
// For regular file destinations, pipe the ReadableStream to the file
return destination_blob.pipeReadableStreamToBlob(globalThis, readable, options.extra_options);
}
var task = bun.new(WriteFileWaitFromLockedValueTask, .{
.globalThis = globalThis,
.file_blob = destination_blob,
.promise = jsc.JSPromise.Strong.init(globalThis),
.mkdirp_if_not_exists = options.mkdirp_if_not_exists orelse true,
});
bodyValue.Locked.task = task;
bodyValue.Locked.onReceiveValue = WriteFileWaitFromLockedValueTask.thenWrap;
return task.promise.value();
destination_blob.detach();
return globalThis.throwInvalidArguments("ReadableStream has already been used", .{});
},
}
}
@@ -1421,16 +1418,19 @@ pub fn writeFileInternal(globalThis: *jsc.JSGlobalObject, path_or_blob_: *PathOr
return jsc.JSPromise.dangerouslyCreateRejectedPromiseValueWithoutNotifyingVM(globalThis, err_ref.toJS(globalThis));
},
.Locked => |locked| {
if (destination_blob.isS3()) {
const s3 = &destination_blob.store.?.data.s3;
var aws_options = try s3.getCredentialsWithOptions(options.extra_options, globalThis);
defer aws_options.deinit();
_ = try bodyValue.toReadableStream(globalThis);
if (request.getBodyReadableStream(globalThis) orelse locked.readable.get(globalThis)) |readable| {
if (readable.isDisturbed(globalThis)) {
destination_blob.detach();
return globalThis.throwInvalidArguments("ReadableStream has already been used", .{});
}
// Convert the locked body to a ReadableStream
_ = try bodyValue.toReadableStream(globalThis);
if (request.getBodyReadableStream(globalThis) orelse locked.readable.get(globalThis)) |readable| {
if (readable.isDisturbed(globalThis)) {
destination_blob.detach();
return globalThis.throwInvalidArguments("ReadableStream has already been used", .{});
}
if (destination_blob.isS3()) {
const s3 = &destination_blob.store.?.data.s3;
var aws_options = try s3.getCredentialsWithOptions(options.extra_options, globalThis);
defer aws_options.deinit();
const proxy = globalThis.bunVM().transpiler.env.getHttpProxy(true, null);
const proxy_url = if (proxy) |p| p.href else null;
return S3.uploadStream(
@@ -1447,20 +1447,13 @@ pub fn writeFileInternal(globalThis: *jsc.JSGlobalObject, path_or_blob_: *PathOr
undefined,
);
}
destination_blob.detach();
return globalThis.throwInvalidArguments("ReadableStream has already been used", .{});
// For regular file destinations, pipe the ReadableStream to the file
return destination_blob.pipeReadableStreamToBlob(globalThis, readable, options.extra_options);
}
var task = bun.new(WriteFileWaitFromLockedValueTask, .{
.globalThis = globalThis,
.file_blob = destination_blob,
.promise = jsc.JSPromise.Strong.init(globalThis),
.mkdirp_if_not_exists = options.mkdirp_if_not_exists orelse true,
});
bodyValue.Locked.task = task;
bodyValue.Locked.onReceiveValue = WriteFileWaitFromLockedValueTask.thenWrap;
return task.promise.value();
destination_blob.detach();
return globalThis.throwInvalidArguments("ReadableStream has already been used", .{});
},
}
}
@@ -2338,6 +2331,7 @@ pub const FileStreamWrapper = struct {
pub fn onFileStreamResolveRequestStream(globalThis: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!jsc.JSValue {
var args = callframe.arguments_old(2);
var this = args.ptr[args.len - 1].asPromisePtr(FileStreamWrapper);
const written = this.sink.written;
defer this.deinit();
var strong = this.readable_stream_ref;
defer strong.deinit();
@@ -2345,7 +2339,7 @@ pub fn onFileStreamResolveRequestStream(globalThis: *jsc.JSGlobalObject, callfra
if (strong.get(globalThis)) |stream| {
stream.done(globalThis);
}
try this.promise.resolve(globalThis, jsc.JSValue.jsNumber(0));
try this.promise.resolve(globalThis, jsc.JSValue.jsNumberFromUint64(written));
return .js_undefined;
}
@@ -2554,9 +2548,10 @@ pub fn pipeReadableStreamToBlob(this: *Blob, globalThis: *jsc.JSGlobalObject, re
return promise_value;
},
.fulfilled => {
const written = file_sink.written;
file_sink.deref();
readable_stream.done(globalThis);
return jsc.JSPromise.resolvedPromiseValue(globalThis, jsc.JSValue.jsNumber(0));
return jsc.JSPromise.resolvedPromiseValue(globalThis, jsc.JSValue.jsNumberFromUint64(written));
},
.rejected => {
file_sink.deref();
@@ -2574,9 +2569,10 @@ pub fn pipeReadableStreamToBlob(this: *Blob, globalThis: *jsc.JSGlobalObject, re
return jsc.JSPromise.dangerouslyCreateRejectedPromiseValueWithoutNotifyingVM(globalThis, assignment_result);
}
}
const written = file_sink.written;
file_sink.deref();
return jsc.JSPromise.resolvedPromiseValue(globalThis, jsc.JSValue.jsNumber(0));
return jsc.JSPromise.resolvedPromiseValue(globalThis, jsc.JSValue.jsNumberFromUint64(written));
}
pub fn getWriter(
@@ -4772,7 +4768,6 @@ export fn Blob__deref(self: *Blob) void {
}
const WriteFilePromise = write_file.WriteFilePromise;
const WriteFileWaitFromLockedValueTask = write_file.WriteFileWaitFromLockedValueTask;
const NewReadFileHandler = read_file.NewReadFileHandler;
const string = []const u8;

View File

@@ -0,0 +1,104 @@
import { expect, test } from "bun:test";
import { bunEnv, bunExe, tempDir } from "harness";
test("custom ReadableStream can be written to file", async () => {
using dir = tempDir("issue-24659", {
"custom-stream.ts": `
const stream = new ReadableStream({
start(controller) {
const chunks = ['A', 'B', 'C', 'D', 'E'];
for (const chunk of chunks) controller.enqueue(chunk);
controller.close();
},
});
console.log('Writing to file');
const size = await Bun.write('text.txt', new Response(stream));
console.log(\`Wrote \${size} bytes\`);
`,
});
await using proc = Bun.spawn({
cmd: [bunExe(), "custom-stream.ts"],
env: bunEnv,
cwd: String(dir),
stderr: "pipe",
stdout: "pipe",
});
const [stdout, stderr, exitCode] = await Promise.all([proc.stdout.text(), proc.stderr.text(), proc.exited]);
expect(stdout).toContain("Writing to file");
expect(stdout).toContain("Wrote 5 bytes");
expect(exitCode).toBe(0);
});
test("wrapped Response body can be written to file", async () => {
using dir = tempDir("issue-24659-wrapped", {
"wrapped-stream.ts": `
const server = Bun.serve({
port: 0,
fetch() {
return new Response("Hello World");
},
});
const res = await fetch(\`http://localhost:\${server.port}\`);
const stream = res.body!;
console.log('Writing to file');
const size = await Bun.write('example.txt', new Response(stream));
console.log(\`Wrote \${size} bytes\`);
server.stop();
`,
});
await using proc = Bun.spawn({
cmd: [bunExe(), "wrapped-stream.ts"],
env: bunEnv,
cwd: String(dir),
stderr: "pipe",
stdout: "pipe",
});
const [stdout, stderr, exitCode] = await Promise.all([proc.stdout.text(), proc.stderr.text(), proc.exited]);
expect(stdout).toContain("Writing to file");
expect(stdout).toContain("Wrote 11 bytes");
expect(exitCode).toBe(0);
});
test("teed ReadableStream can be written to file", async () => {
using dir = tempDir("issue-24659-tee", {
"tee-stream.ts": `
const stream = new ReadableStream({
start(controller) {
const chunks = ['A', 'B', 'C', 'D', 'E'];
for (const chunk of chunks) controller.enqueue(chunk);
controller.close();
},
});
const [newStream, _] = stream.tee();
console.log('Writing to file');
const size = await Bun.write('tee.txt', new Response(newStream));
console.log(\`Wrote \${size} bytes\`);
`,
});
await using proc = Bun.spawn({
cmd: [bunExe(), "tee-stream.ts"],
env: bunEnv,
cwd: String(dir),
stderr: "pipe",
stdout: "pipe",
});
const [stdout, stderr, exitCode] = await Promise.all([proc.stdout.text(), proc.stderr.text(), proc.exited]);
expect(stdout).toContain("Writing to file");
expect(stdout).toContain("Wrote 5 bytes");
expect(exitCode).toBe(0);
});