Compare commits

...

1 Commits

Author SHA1 Message Date
Claude Bot
820e5bad91 fix: call cancel callback on "direct" ReadableStream cancel
When a "direct" type ReadableStream's `.cancel()` method was called,
the user-provided cancel callback was never invoked. This happened in
two scenarios:

1. Lazy streams (never read from): The controller was null so
   readableStreamCancel returned immediately without checking for the
   underlyingSource's cancel callback.

2. Initialized streams (read from): The direct stream controller
   objects didn't have a $cancel property, so the cancel path in
   readableStreamCancel was skipped and onCloseDirectStream was called
   instead, which returned early because the stream state was already
   set to closed.

Fix by:
- Adding onCancelDirectStream handler and $cancel property to both
  direct stream controller objects (initializeArrayStream and
  initializeArrayBufferStream)
- Handling the lazy case in readableStreamCancel by looking up the
  underlyingSource directly from the stream when controller is null

Closes #18315

Co-Authored-By: Claude <noreply@anthropic.com>
2026-02-20 02:26:22 +00:00
2 changed files with 111 additions and 1 deletions

View File

@@ -1138,6 +1138,22 @@ export function tryUseReadableStreamBufferedFastPath(stream, method) {
}
}
export function onCancelDirectStream(controller, reason) {
var underlyingSource = controller.$underlyingSource;
if (underlyingSource) {
var cancelFn = underlyingSource.cancel;
if (typeof cancelFn === "function") {
try {
var result = cancelFn.$call(underlyingSource, reason);
if ($isPromise(result)) return result;
} catch (e) {
return Promise.$reject(e);
}
}
}
return Promise.$resolve();
}
export function onCloseDirectStream(reason) {
var stream = this.$controlledReadableStream;
if (!stream || $getByIdDirectPrivate(stream, "state") !== $streamReadable) return;
@@ -1428,6 +1444,7 @@ export function initializeArrayStream(underlyingSource, _highWaterMark: number)
var controller = {
$underlyingSource: underlyingSource,
$pull: $onPullDirectStream,
$cancel: $onCancelDirectStream,
$controlledReadableStream: this,
$sink: sink,
close: $onCloseDirectStream,
@@ -1464,6 +1481,7 @@ export function initializeArrayBufferStream(underlyingSource, highWaterMark: num
var controller = {
$underlyingSource: underlyingSource,
$pull: $onPullDirectStream,
$cancel: $onCancelDirectStream,
$controlledReadableStream: this,
$sink: sink,
close: $onCloseDirectStream,
@@ -1598,7 +1616,24 @@ export function readableStreamCancel(stream: ReadableStream, reason: any) {
$readableStreamClose(stream);
const controller = $getByIdDirectPrivate(stream, "readableStreamController");
if (controller === null) return Promise.$resolve();
if (controller === null) {
// For lazy direct streams that were never read from, the controller hasn't been
// created yet but the underlyingSource is stored on the stream itself.
const underlyingSource = $getByIdDirectPrivate(stream, "underlyingSource");
if (underlyingSource) {
const cancelFn = underlyingSource.cancel;
if (typeof cancelFn === "function") {
try {
const result = cancelFn.$call(underlyingSource, reason);
if ($isPromise(result)) return result.$then(function () {});
return Promise.$resolve();
} catch (e) {
return Promise.$reject(e);
}
}
}
return Promise.$resolve();
}
const cancel = controller.$cancel;
if (cancel) return cancel(controller, reason).$then(function () {});

View File

@@ -0,0 +1,75 @@
import { expect, test } from "bun:test";
test("cancel callback of 'direct' readable stream is called (lazy, never read)", async () => {
let cancelled = false;
let cancelReason: unknown;
const sourceStream = new ReadableStream({
type: "direct",
pull() {},
cancel(reason) {
cancelled = true;
cancelReason = reason;
},
});
const reason = new Error("test cancel");
await sourceStream.cancel(reason);
expect(cancelled).toBe(true);
expect(cancelReason).toBe(reason);
});
test("cancel callback of 'direct' readable stream is called (after reading)", async () => {
let cancelled = false;
let cancelReason: unknown;
const sourceStream = new ReadableStream({
type: "direct",
async pull(controller) {
controller.write("hello");
controller.flush();
// Keep the stream open so it can be cancelled
await new Promise(() => {});
},
cancel(reason) {
cancelled = true;
cancelReason = reason;
},
});
const reader = sourceStream.getReader();
// Read one chunk to trigger controller initialization
const chunk = await reader.read();
expect(chunk.done).toBe(false);
const reason = new Error("test cancel");
await reader.cancel(reason);
expect(cancelled).toBe(true);
expect(cancelReason).toBe(reason);
});
test("cancel callback of 'direct' readable stream works with async cancel", async () => {
let cancelled = false;
const sourceStream = new ReadableStream({
type: "direct",
pull() {},
async cancel() {
await Bun.sleep(1);
cancelled = true;
},
});
await sourceStream.cancel();
expect(cancelled).toBe(true);
});
test("cancel callback of 'direct' readable stream without cancel callback doesn't throw", async () => {
const sourceStream = new ReadableStream({
type: "direct",
pull() {},
});
// Should not throw
await sourceStream.cancel();
});