Compare commits

...

1 Commits

Author SHA1 Message Date
Claude Bot
482ac9bca4 fix: support JS ReadableStream in HTMLRewriter.transform()
Previously, calling HTMLRewriter.transform() on a Response backed by a
JavaScript-created ReadableStream would throw ERR_STREAM_CANNOT_PIPE.
This was because the ValueBufferer explicitly rejected .JavaScript and
.Direct stream types with an UnsupportedStreamType error.

Fix this by using readableStreamToArrayBuffer to buffer the stream
contents through the JS runtime, which properly handles all
ReadableStream types. The resolved ArrayBuffer bytes are then passed
to onFinishedBuffering for HTMLRewriter processing.

Closes #14216

Co-Authored-By: Claude <noreply@anthropic.com>
2026-02-20 02:53:43 +00:00
2 changed files with 144 additions and 64 deletions

View File

@@ -1561,7 +1561,7 @@ pub const ValueBufferer = struct {
/// Promise fulfillment callback; it is the resolve handler passed to
/// `.then(...)` alongside `onRejectStream` elsewhere in this struct.
/// The sink pointer is recovered from the last call argument, and the first
/// argument is forwarded to `handleResolveStream` with `is_async = true`.
// NOTE(review): this listing is a flattened diff. The one-argument
// `handleResolveStream(true)` call looks like the removed line and the
// two-argument call its replacement; confirm against the real file.
pub fn onResolveStream(_: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!jsc.JSValue {
var args = callframe.arguments_old(2);
// The last argument carries the `ValueBufferer` pointer.
var sink: *@This() = args.ptr[args.len - 1].asPromisePtr(@This());
sink.handleResolveStream(true);
sink.handleResolveStream(args.ptr[0], true);
return .js_undefined;
}
@@ -1584,81 +1584,52 @@ pub const ValueBufferer = struct {
sink.onFinishedBuffering(sink.ctx, "", .{ .JSValue = ref }, is_async);
}
/// Completes buffering by calling `onFinishedBuffering` exactly once with the
/// first available byte source, checked in this order:
///   1. the bytes accumulated in `js_sink`, when one is set;
///   2. the contents of `resolved_value` when it is an ArrayBuffer;
///   3. an empty slice otherwise.
/// `is_async` is passed through unchanged to `onFinishedBuffering`.
// NOTE(review): this listing is a flattened diff. Both the old one-parameter
// and the new two-parameter signature lines are present, as are old/new log
// lines; confirm against the real file.
fn handleResolveStream(sink: *@This(), is_async: bool) void {
fn handleResolveStream(sink: *@This(), resolved_value: JSValue, is_async: bool) void {
if (sink.js_sink) |wrapper| {
const bytes = wrapper.sink.bytes.slice();
log("handleResolveStream {}", .{bytes.len});
log("handleResolveStream js_sink {}", .{bytes.len});
sink.onFinishedBuffering(sink.ctx, bytes, null, is_async);
} else if (resolved_value.asArrayBuffer(sink.global)) |array_buffer| {
// NOTE(review): `bytes` presumably views memory owned by the JS
// ArrayBuffer; confirm `onFinishedBuffering` copies or fully consumes it
// before the value can be collected.
const bytes = array_buffer.slice();
log("handleResolveStream arraybuffer {}", .{bytes.len});
sink.onFinishedBuffering(sink.ctx, bytes, null, is_async);
} else {
log("handleResolveStream no sink", .{});
log("handleResolveStream empty", .{});
sink.onFinishedBuffering(sink.ctx, "", null, is_async);
}
}
fn createJSSink(sink: *@This(), stream: jsc.WebCore.ReadableStream) !void {
/// Buffer a JavaScript-created ReadableStream by using the JS runtime's
/// readableStreamToArrayBuffer, which returns a Promise<ArrayBuffer>.
fn bufferJSReadableStream(sink: *@This(), stream: jsc.WebCore.ReadableStream) !void {
stream.value.ensureStillAlive();
var allocator = sink.allocator;
var buffer_stream = try allocator.create(ArrayBufferSink.JSSink);
var globalThis = sink.global;
buffer_stream.* = ArrayBufferSink.JSSink{
.sink = ArrayBufferSink{
.bytes = bun.ByteList.empty,
.allocator = allocator,
.next = null,
},
};
var signal = &buffer_stream.sink.signal;
sink.js_sink = buffer_stream;
const globalThis = sink.global;
signal.* = ArrayBufferSink.JSSink.SinkSignal.init(JSValue.zero);
const promise_value = globalThis.readableStreamToArrayBuffer(stream.value);
promise_value.ensureStillAlive();
// explicitly set it to a dead pointer
// we use this memory address to disable signals being sent
signal.clear();
assert(signal.isDead());
const assignment_result: JSValue = ArrayBufferSink.JSSink.assignToStream(
globalThis,
stream.value,
buffer_stream,
@as(**anyopaque, @ptrCast(&signal.ptr)),
);
assignment_result.ensureStillAlive();
// assert that it was updated
assert(!signal.isDead());
if (assignment_result.isError()) {
if (promise_value.toError()) |_| {
return error.PipeFailed;
}
if (!assignment_result.isEmptyOrUndefinedOrNull()) {
assignment_result.ensureStillAlive();
// it returns a Promise when it goes through ReadableStreamDefaultReader
if (assignment_result.asAnyPromise()) |promise| {
switch (promise.status()) {
.Pending => {
assignment_result.then(
globalThis,
sink,
onResolveStream,
onRejectStream,
);
},
.Fulfilled => {
defer stream.value.unprotect();
sink.handleResolveStream(false);
},
.Rejected => {
defer stream.value.unprotect();
sink.handleRejectStream(promise.result(globalThis.vm()), false);
},
}
return;
if (promise_value.asAnyPromise()) |promise| {
switch (promise.status()) {
.pending => {
promise_value.then(
globalThis,
sink,
onResolveStream,
onRejectStream,
) catch {};
},
.fulfilled => {
sink.handleResolveStream(promise.result(globalThis.vm()), false);
},
.rejected => {
sink.handleRejectStream(promise.result(globalThis.vm()), false);
},
}
return;
}
return error.PipeFailed;
@@ -1694,9 +1665,7 @@ pub const ValueBufferer = struct {
// toBlobIfPossible should've caught this
.Blob, .File => unreachable,
.JavaScript, .Direct => {
// this is broken right now
// return sink.createJSSink(stream);
return error.UnsupportedStreamType;
return sink.bufferJSReadableStream(stream);
},
.Bytes => |byte_stream| {
assert(byte_stream.pipe.ctx == null);

View File

@@ -0,0 +1,111 @@
import { expect, test } from "bun:test";
// https://github.com/oven-sh/bun/issues/14216
// HTMLRewriter should work with JavaScript-created ReadableStreams
// A user-constructed stream delivering the whole document as one string chunk
// must be rewritable; the body handler tags <body> with a class attribute.
test("HTMLRewriter.transform() works with a JS ReadableStream", async () => {
  const rewriter = new HTMLRewriter();
  rewriter.on("body", {
    element: node => {
      node.setAttribute("class", "modified");
    },
  });

  const source = new ReadableStream({
    start: controller => {
      controller.enqueue("<html><body>Hello world</body></html>");
      controller.close();
    },
  });

  const output = await rewriter.transform(new Response(source)).text();
  expect(output).toBe('<html><body class="modified">Hello world</body></html>');
});
// End-tag handlers registered from an element handler must still fire when the
// input comes from a JS-created stream: a <span> is injected before </body>.
test("HTMLRewriter.transform() works with a JS ReadableStream and onEndTag", async () => {
  const rewriter = new HTMLRewriter();
  rewriter.on("body", {
    element: node => {
      node.onEndTag(closingTag => {
        closingTag.before("<span>injected</span>", { html: true });
      });
    },
  });

  const source = new ReadableStream({
    start: controller => {
      controller.enqueue("<html><body>Hello world</body></html>");
      controller.close();
    },
  });

  const output = await rewriter.transform(new Response(source)).text();
  expect(output).toBe("<html><body>Hello world<span>injected</span></body></html>");
});
// The document is split across several enqueued string chunks; the rewritten
// output must be the same as if it had arrived in a single piece.
test("HTMLRewriter.transform() works with a multi-chunk JS ReadableStream", async () => {
  const pieces = ["<html><body>", "Hello ", "world", "</body></html>"];

  const source = new ReadableStream({
    start: controller => {
      for (const piece of pieces) {
        controller.enqueue(piece);
      }
      controller.close();
    },
  });

  const rewriter = new HTMLRewriter();
  rewriter.on("body", {
    element: node => {
      node.setAttribute("class", "modified");
    },
  });

  const output = await rewriter.transform(new Response(source)).text();
  expect(output).toBe('<html><body class="modified">Hello world</body></html>');
});
// Chunks may be bytes rather than strings: a UTF-8 encoded Uint8Array is
// enqueued and, with no handlers registered, must pass through unchanged.
test("HTMLRewriter.transform() works with a binary-chunk JS ReadableStream", async () => {
  const payload = new TextEncoder().encode("<html><body>Binary</body></html>");

  const source = new ReadableStream({
    start: controller => {
      controller.enqueue(payload);
      controller.close();
    },
  });

  const transformed = new HTMLRewriter().transform(new Response(source));
  expect(await transformed.text()).toBe("<html><body>Binary</body></html>");
});
// Data is produced lazily from an async pull() callback instead of start();
// with no handlers registered the document must pass through unchanged.
test("HTMLRewriter.transform() works with an async pull-based JS ReadableStream", async () => {
  const source = new ReadableStream({
    pull: async controller => {
      controller.enqueue("<html><body>Async</body></html>");
      controller.close();
    },
  });

  const transformed = new HTMLRewriter().transform(new Response(source));
  expect(await transformed.text()).toBe("<html><body>Async</body></html>");
});
// A stream that closes without enqueuing anything must yield an empty body.
test("HTMLRewriter.transform() works with an empty JS ReadableStream", async () => {
  const source = new ReadableStream({
    start: controller => {
      controller.close();
    },
  });

  const transformed = new HTMLRewriter().transform(new Response(source));
  expect(await transformed.text()).toBe("");
});