Compare commits

...

1 Commits

Author SHA1 Message Date
Claude Bot
3773115d31 fix(HTMLRewriter): support transforming Responses from ReadableStreams
When a Response was created from a user-constructed ReadableStream
(e.g., `new Response(new ReadableStream({...}))`), HTMLRewriter.transform()
would fail with "ERR_STREAM_CANNOT_PIPE" because the BodyValueBufferer
returned UnsupportedStreamType for JavaScript-tagged streams.

Fix by consuming JavaScript/Direct ReadableStreams through the existing
readableStreamToArrayBuffer() JS API, which returns a Promise that
resolves with the complete stream data as an ArrayBuffer.

Closes #11758

Co-Authored-By: Claude <noreply@anthropic.com>
2026-02-20 02:37:10 +00:00
5 changed files with 204 additions and 4 deletions

View File

@@ -3588,6 +3588,10 @@ GlobalObject::PromiseFunctions GlobalObject::promiseHandlerID(Zig::FFIFunction h
return GlobalObject::PromiseFunctions::Bun__FileSink__onResolveStream;
} else if (handler == Bun__FileSink__onRejectStream) {
return GlobalObject::PromiseFunctions::Bun__FileSink__onRejectStream;
} else if (handler == Bun__BodyValueBufferer__onResolveStreamToArrayBuffer) {
return GlobalObject::PromiseFunctions::Bun__BodyValueBufferer__onResolveStreamToArrayBuffer;
} else if (handler == Bun__BodyValueBufferer__onRejectStreamToArrayBuffer) {
return GlobalObject::PromiseFunctions::Bun__BodyValueBufferer__onRejectStreamToArrayBuffer;
} else {
RELEASE_ASSERT_NOT_REACHED();
}

View File

@@ -389,8 +389,10 @@ public:
Bun__FileStreamWrapper__onResolveRequestStream,
Bun__FileSink__onResolveStream,
Bun__FileSink__onRejectStream,
Bun__BodyValueBufferer__onResolveStreamToArrayBuffer,
Bun__BodyValueBufferer__onRejectStreamToArrayBuffer,
};
static constexpr size_t promiseFunctionsSize = 36;
static constexpr size_t promiseFunctionsSize = 38;
static PromiseFunctions promiseHandlerID(SYSV_ABI EncodedJSValue (*handler)(JSC::JSGlobalObject* arg0, JSC::CallFrame* arg1));

View File

@@ -741,6 +741,8 @@ BUN_DECLARE_HOST_FUNCTION(Bun__HTTPRequestContextDebugTLS__onResolveStream);
BUN_DECLARE_HOST_FUNCTION(Bun__BodyValueBufferer__onRejectStream);
BUN_DECLARE_HOST_FUNCTION(Bun__BodyValueBufferer__onResolveStream);
BUN_DECLARE_HOST_FUNCTION(Bun__BodyValueBufferer__onResolveStreamToArrayBuffer);
BUN_DECLARE_HOST_FUNCTION(Bun__BodyValueBufferer__onRejectStreamToArrayBuffer);
#endif

View File

@@ -1664,6 +1664,65 @@ pub const ValueBufferer = struct {
return error.PipeFailed;
}
/// Use the JavaScript-level readableStreamToArrayBuffer() to consume a
/// JavaScript or Direct ReadableStream, then deliver the result via
/// onFinishedBuffering.
fn bufferJSReadableStream(sink: *@This(), stream: jsc.WebCore.ReadableStream) !void {
const promise_value = sink.global.readableStreamToArrayBuffer(stream.value);
if (promise_value == .zero) {
return error.PipeFailed;
}
if (promise_value.asAnyPromise()) |promise| {
switch (promise.status()) {
.pending => {
promise_value.then(
sink.global,
sink,
onResolveStreamToArrayBuffer,
onRejectStreamToArrayBuffer,
) catch return error.PipeFailed;
return;
},
.fulfilled => {
sink.handleResolveStreamArrayBuffer(promise.result(sink.global.vm()), false);
return;
},
.rejected => {
sink.handleRejectStream(promise.result(sink.global.vm()), false);
return;
},
}
}
return error.PipeFailed;
}
fn onResolveStreamToArrayBuffer(_: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!jsc.JSValue {
const args = callframe.arguments_old(2);
var sink: *@This() = args.ptr[args.len - 1].asPromisePtr(@This());
const resolved_value = args.ptr[0];
sink.handleResolveStreamArrayBuffer(resolved_value, true);
return .js_undefined;
}
fn onRejectStreamToArrayBuffer(_: *jsc.JSGlobalObject, callframe: *jsc.CallFrame) bun.JSError!jsc.JSValue {
const args = callframe.arguments_old(2);
var sink: *@This() = args.ptr[args.len - 1].asPromisePtr(@This());
const err = args.ptr[0];
sink.handleRejectStream(err, true);
return .js_undefined;
}
fn handleResolveStreamArrayBuffer(sink: *@This(), resolved_value: JSValue, is_async: bool) void {
if (resolved_value.asArrayBuffer(sink.global)) |array_buffer| {
const bytes = array_buffer.slice();
log("handleResolveStreamArrayBuffer {}", .{bytes.len});
sink.onFinishedBuffering(sink.ctx, bytes, null, is_async);
} else {
log("handleResolveStreamArrayBuffer no array buffer", .{});
sink.onFinishedBuffering(sink.ctx, "", null, is_async);
}
}
fn bufferLockedBodyValue(sink: *@This(), value: *jsc.WebCore.Body.Value, owned_readable_stream: ?jsc.WebCore.ReadableStream) !void {
assert(value.* == .Locked);
const locked = &value.Locked;
@@ -1694,9 +1753,7 @@ pub const ValueBufferer = struct {
// toBlobIfPossible should've caught this
.Blob, .File => unreachable,
.JavaScript, .Direct => {
// this is broken right now
// return sink.createJSSink(stream);
return error.UnsupportedStreamType;
return sink.bufferJSReadableStream(stream);
},
.Bytes => |byte_stream| {
assert(byte_stream.pipe.ctx == null);
@@ -1758,6 +1815,10 @@ pub const ValueBufferer = struct {
@export(&jsonResolveStream, .{ .name = "Bun__BodyValueBufferer__onResolveStream" });
const jsonRejectStream = jsc.toJSHostFn(onRejectStream);
@export(&jsonRejectStream, .{ .name = "Bun__BodyValueBufferer__onRejectStream" });
const jsonResolveStreamToArrayBuffer = jsc.toJSHostFn(onResolveStreamToArrayBuffer);
@export(&jsonResolveStreamToArrayBuffer, .{ .name = "Bun__BodyValueBufferer__onResolveStreamToArrayBuffer" });
const jsonRejectStreamToArrayBuffer = jsc.toJSHostFn(onRejectStreamToArrayBuffer);
@export(&jsonRejectStreamToArrayBuffer, .{ .name = "Bun__BodyValueBufferer__onRejectStreamToArrayBuffer" });
}
};

View File

@@ -0,0 +1,131 @@
import { expect, test } from "bun:test";
// https://github.com/oven-sh/bun/issues/11758
// HTMLRewriter should be able to transform Responses created from ReadableStreams
test("HTMLRewriter transforms Response from ReadableStream", async () => {
const rewriter = new HTMLRewriter();
rewriter.on("b", {
element(element) {
element.before("<h1>", { html: true });
element.after("</h1>", { html: true });
element.removeAndKeepContent();
},
});
const response = rewriter.transform(
new Response(
new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode("<b>hello world</b>"));
controller.close();
},
}),
{ headers: { "content-type": "text/html" } },
),
);
const text = await response.text();
expect(text).toBe("<h1>hello world</h1>");
});
test("HTMLRewriter transforms Response from ReadableStream with multiple chunks", async () => {
const rewriter = new HTMLRewriter();
rewriter.on("p", {
element(element) {
element.setAttribute("class", "modified");
},
});
const response = rewriter.transform(
new Response(
new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode("<p>chunk one</p>"));
controller.enqueue(new TextEncoder().encode("<p>chunk two</p>"));
controller.close();
},
}),
{ headers: { "content-type": "text/html" } },
),
);
const text = await response.text();
expect(text).toBe('<p class="modified">chunk one</p><p class="modified">chunk two</p>');
});
test("HTMLRewriter transforms Response from async ReadableStream", async () => {
const rewriter = new HTMLRewriter();
rewriter.on("span", {
element(element) {
element.setInnerContent("replaced");
},
});
const response = rewriter.transform(
new Response(
new ReadableStream({
async start(controller) {
controller.enqueue(new TextEncoder().encode("<div><span>original</span></div>"));
controller.close();
},
}),
{ headers: { "content-type": "text/html" } },
),
);
const text = await response.text();
expect(text).toBe("<div><span>replaced</span></div>");
});
test("HTMLRewriter transforms Response from ReadableStream with pull", async () => {
const rewriter = new HTMLRewriter();
rewriter.on("em", {
element(element) {
element.tagName = "strong";
},
});
let pullCount = 0;
const response = rewriter.transform(
new Response(
new ReadableStream({
pull(controller) {
if (pullCount === 0) {
controller.enqueue(new TextEncoder().encode("<em>emphasis</em>"));
pullCount++;
} else {
controller.close();
}
},
}),
{ headers: { "content-type": "text/html" } },
),
);
const text = await response.text();
expect(text).toBe("<strong>emphasis</strong>");
});
test("HTMLRewriter handles empty ReadableStream", async () => {
const rewriter = new HTMLRewriter();
rewriter.on("b", {
element(element) {
element.remove();
},
});
const response = rewriter.transform(
new Response(
new ReadableStream({
start(controller) {
controller.close();
},
}),
{ headers: { "content-type": "text/html" } },
),
);
const text = await response.text();
expect(text).toBe("");
});