fix(streams): pipeTo now responds to AbortSignal

When the AbortSignal's abort event fires during a pipeTo operation,
the abort algorithm needs to resolve the pending read promise before
calling pipeToShutdownWithAction. Without this, the shutdown waits
indefinitely for a read operation that will never complete because
the ReadableStream is blocked.

The fix resolves the pendingReadPromiseCapability before invoking
pipeToShutdownWithAction, following the same pattern used by
pipeToErrorsMustBePropagatedForward and similar error handlers.

Fixes #26392

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Claude Bot
2026-01-23 13:10:55 +00:00
parent 827c7091d9
commit 42a97e89eb
2 changed files with 173 additions and 0 deletions

View File

@@ -258,6 +258,9 @@ export function readableStreamPipeToWritableStream(
if (signal !== undefined) {
const algorithm = reason => {
// Resolve the pending read promise to unblock any pending read operation.
// This allows the shutdown to proceed without waiting for the read to complete.
pipeState.pendingReadPromiseCapability.resolve.$call(undefined, false);
$pipeToShutdownWithAction(
pipeState,
() => {

View File

@@ -0,0 +1,170 @@
import assert from "node:assert";
import { test } from "node:test";
// https://github.com/oven-sh/bun/issues/26392
// ReadableStream.prototype.pipeTo does not respond to AbortSignal
test("pipeTo responds to AbortSignal", async () => {
const abortController = new AbortController();
let cancelCalled = false;
let abortCalled = false;
// Promise that resolves when the pipe has started (first write received)
const { promise: pipeStartedPromise, resolve: pipeStarted } = Promise.withResolvers<void>();
const pipePromise = new ReadableStream({
start(controller) {
// Keep the stream open - don't close it
controller.enqueue("data");
},
cancel(reason) {
cancelCalled = true;
assert(reason instanceof DOMException);
assert.strictEqual(reason.name, "AbortError");
},
}).pipeTo(
new WritableStream({
write() {
// Signal that the pipe has started processing data
pipeStarted();
},
abort(reason) {
abortCalled = true;
assert(reason instanceof DOMException);
assert.strictEqual(reason.name, "AbortError");
},
}),
{ signal: abortController.signal },
);
// Wait for the pipe to actually start processing
await pipeStartedPromise;
// Abort the signal
abortController.abort();
// The promise should reject with an AbortError
await assert.rejects(pipePromise, (err: Error) => {
assert(err instanceof DOMException);
assert.strictEqual(err.name, "AbortError");
return true;
});
// Both cancel and abort should have been called
assert.strictEqual(cancelCalled, true);
assert.strictEqual(abortCalled, true);
});
test("pipeTo with already aborted signal", async () => {
const abortController = new AbortController();
abortController.abort();
let cancelCalled = false;
let abortCalled = false;
const pipePromise = new ReadableStream({
start(controller) {
controller.enqueue("data");
},
cancel() {
cancelCalled = true;
},
}).pipeTo(
new WritableStream({
abort() {
abortCalled = true;
},
}),
{ signal: abortController.signal },
);
await assert.rejects(pipePromise, (err: Error) => {
assert(err instanceof DOMException);
assert.strictEqual(err.name, "AbortError");
return true;
});
assert.strictEqual(cancelCalled, true);
assert.strictEqual(abortCalled, true);
});
test("pipeTo with preventCancel respects AbortSignal", async () => {
const abortController = new AbortController();
let cancelCalled = false;
let abortCalled = false;
// Promise that resolves when the pipe has started (first write received)
const { promise: pipeStartedPromise, resolve: pipeStarted } = Promise.withResolvers<void>();
const pipePromise = new ReadableStream({
start(controller) {
controller.enqueue("data");
},
cancel() {
cancelCalled = true;
},
}).pipeTo(
new WritableStream({
write() {
pipeStarted();
},
abort() {
abortCalled = true;
},
}),
{ signal: abortController.signal, preventCancel: true },
);
await pipeStartedPromise;
abortController.abort();
await assert.rejects(pipePromise, (err: Error) => {
assert(err instanceof DOMException);
assert.strictEqual(err.name, "AbortError");
return true;
});
// cancel should NOT be called because preventCancel is true
assert.strictEqual(cancelCalled, false);
assert.strictEqual(abortCalled, true);
});
test("pipeTo with preventAbort respects AbortSignal", async () => {
const abortController = new AbortController();
let cancelCalled = false;
let abortCalled = false;
// Promise that resolves when the pipe has started (first write received)
const { promise: pipeStartedPromise, resolve: pipeStarted } = Promise.withResolvers<void>();
const pipePromise = new ReadableStream({
start(controller) {
controller.enqueue("data");
},
cancel() {
cancelCalled = true;
},
}).pipeTo(
new WritableStream({
write() {
pipeStarted();
},
abort() {
abortCalled = true;
},
}),
{ signal: abortController.signal, preventAbort: true },
);
await pipeStartedPromise;
abortController.abort();
await assert.rejects(pipePromise, (err: Error) => {
assert(err instanceof DOMException);
assert.strictEqual(err.name, "AbortError");
return true;
});
assert.strictEqual(cancelCalled, true);
// abort should NOT be called because preventAbort is true
assert.strictEqual(abortCalled, false);
});