Compare commits


2 Commits

Author SHA1 Message Date
Jarred Sumner
22f4d46b54 Merge branch 'main' into claude/fix-direct-stream-cancel-abort 2026-03-02 01:43:39 -08:00
Claude Bot
353d95935d fix: don't call direct ReadableStream cancel on normal completion, but do on abort
The previous attempt (#27214) fixed cancel being incorrectly called on
normal stream completion, but broke the abort path. The native abort
path (SinkSignal.close in Sink.zig) was discarding the error parameter
and always passing js_undefined, making abort indistinguishable from
normal completion at the JS layer. This meant cancel was never called
from native abort paths, causing async generators used as HTTP response
bodies to leak when clients disconnect.
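The leak mechanism described above can be sketched in plain TypeScript (hypothetical names, not Bun internals): an async generator's `finally` block only runs when the consumer calls `return()` or `throw()` on it, so if the abort path never cancels the stream, cleanup is skipped.

```typescript
// Hypothetical sketch, not Bun's actual code: why abort must surface
// to the JS layer so the generator can be cancelled.
async function* eventBody(onCleanup: () => void) {
  try {
    while (true) {
      yield "data: tick\n\n";
    }
  } finally {
    // Runs only if the consumer calls return()/throw() on the iterator,
    // i.e. only if the stream is actually cancelled on disconnect.
    onCleanup();
  }
}

async function demo(): Promise<boolean> {
  let cleaned = false;
  const it = eventBody(() => {
    cleaned = true;
  });
  await it.next(); // start the generator, receive the first chunk
  await it.return(undefined); // simulates cancel on client disconnect
  return cleaned;
}
```

If `it.return()` is never called (the broken abort path), `onCleanup` never fires and whatever the generator holds open leaks.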

This fix addresses both issues:
- SinkSignal.close now forwards error information as a truthy JSValue
  instead of always passing js_undefined
- The abort() functions and abort-related code paths in streams.zig now
  pass a non-null Syscall.Error (CONNRESET) to signal.close()
- The JS close() guard uses reason !== undefined (not truthiness) to
  correctly handle both cases
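The guard in the last bullet can be illustrated with a small sketch (hypothetical function, not Bun's actual code): `reason !== undefined` treats any defined value, including falsy ones like `null`, `0`, or `false`, as an abort/error, while only `undefined` signals normal completion. A truthiness check would misclassify falsy abort reasons as normal completion.

```typescript
// Hypothetical sketch of the close() guard described above.
// Native abort passes a truthy value; normal completion passes undefined.
function classifyClose(reason: unknown): "aborted" | "completed" {
  return reason !== undefined ? "aborted" : "completed";
}
```

For example, `classifyClose(true)` yields `"aborted"` and `classifyClose(undefined)` yields `"completed"`, while `classifyClose(null)` still correctly yields `"aborted"` where a truthiness check would not.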

Closes #17175

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-01 10:03:33 +00:00
153 changed files with 1114 additions and 1562 deletions


@@ -400,7 +400,6 @@
"/guides/http/file-uploads",
"/guides/http/fetch-unix",
"/guides/http/stream-iterator",
"/guides/http/sse",
"/guides/http/stream-node-streams-in-bun"
]
},


@@ -1,91 +0,0 @@
---
title: Server-Sent Events (SSE) with Bun
sidebarTitle: Server-Sent Events
mode: center
---
[Server-Sent Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events) let you push a stream of text events to the browser over a single HTTP response. The client consumes them via [`EventSource`](https://developer.mozilla.org/en-US/docs/Web/API/EventSource).
In Bun, you can implement an SSE endpoint by returning a `Response` whose body is a streaming source and setting the `Content-Type` header to `text/event-stream`.
<Note>
`Bun.serve` closes idle connections after **10 seconds** by default. A quiet SSE stream counts as idle, so the
examples below call `server.timeout(req, 0)` to disable the timeout for the stream. See
[`idleTimeout`](/runtime/http/server#idletimeout) for details.
</Note>
## Using an async generator
In Bun, `new Response` accepts an async generator function directly. This is usually the simplest way to write an SSE endpoint — each `yield` flushes a chunk to the client, and if the client disconnects, the generator's `finally` block runs so you can clean up.
```ts server.ts icon="/icons/typescript.svg"
Bun.serve({
port: 3000,
routes: {
"/events": (req, server) => {
// SSE streams are often quiet between events. By default,
// Bun.serve closes connections after 10 seconds of inactivity.
// Disable the idle timeout for this request so the stream
// stays open indefinitely.
server.timeout(req, 0);
return new Response(
async function* () {
yield `data: connected at ${Date.now()}\n\n`;
// Emit a tick every 5 seconds until the client disconnects.
// When the client goes away, the generator is returned
// (cancelled) and this loop stops automatically.
while (true) {
await Bun.sleep(5000);
yield `data: tick ${Date.now()}\n\n`;
}
},
{
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
},
},
);
},
},
});
```
## Using a `ReadableStream`
If your events originate from callbacks — message brokers, timers, external pushes — rather than a linear `await` flow, a `ReadableStream` often fits better. When the client disconnects, Bun calls the stream's `cancel()` method automatically, so you can release any resources you set up in `start()`.
```ts server.ts icon="/icons/typescript.svg"
Bun.serve({
port: 3000,
routes: {
"/events": (req, server) => {
server.timeout(req, 0);
let timer: Timer;
const stream = new ReadableStream({
start(controller) {
controller.enqueue(`data: connected at ${Date.now()}\n\n`);
timer = setInterval(() => {
controller.enqueue(`data: tick ${Date.now()}\n\n`);
}, 5000);
},
cancel() {
// Called automatically when the client disconnects.
clearInterval(timer);
},
});
return new Response(stream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
},
});
},
},
});
```


@@ -171,14 +171,12 @@ Unlike unix domain sockets, abstract namespace sockets are not bound to the file
## idleTimeout
By default, `Bun.serve` closes connections after **10 seconds** of inactivity. A connection is considered idle when there is no data being sent or received — this includes in-flight requests where your handler is still running but hasn't written any bytes to the response yet. Browsers and `fetch()` clients will see this as a connection reset.
To configure this, set the `idleTimeout` field (in seconds). The maximum value is `255`, and `0` disables the timeout entirely.
To configure the idle timeout, set the `idleTimeout` field in Bun.serve.
```ts
Bun.serve({
// 30 seconds (default is 10)
idleTimeout: 30,
// 10 seconds:
idleTimeout: 10,
fetch(req) {
return new Response("Bun!");
@@ -186,11 +184,7 @@ Bun.serve({
});
```
<Note>
**Streaming & Server-Sent Events** — The idle timer applies while a response is being streamed. If your stream goes
quiet for longer than `idleTimeout`, the connection will be closed mid-response. For long-lived streams, disable the
timeout for that request with [`server.timeout(req, 0)`](#server-timeout-request-seconds).
</Note>
This is the maximum amount of time a connection is allowed to be idle before the server closes it. A connection is idling if there is no data sent or received.
---
@@ -302,12 +296,12 @@ This is useful for development and hot reloading. Only `fetch`, `error`, and `ro
### `server.timeout(Request, seconds)`
Override the idle timeout for an individual request. Pass `0` to disable the timeout entirely for that request.
Set a custom idle timeout for individual requests:
```ts
const server = Bun.serve({
async fetch(req, server) {
// Give this request up to 60 seconds of inactivity instead of the default 10
// Set 60 second timeout for this request
server.timeout(req, 60);
// If they take longer than 60 seconds to send the body, the request will be aborted
@@ -318,28 +312,7 @@ const server = Bun.serve({
});
```
This is the recommended way to keep long-lived streaming responses (like Server-Sent Events) alive without raising the global `idleTimeout` for every request:
```ts
Bun.serve({
routes: {
"/events": (req, server) => {
// Disable the idle timeout for this streaming response.
// Otherwise the connection will be closed if no bytes
// are sent for 10 seconds (the default idleTimeout).
server.timeout(req, 0);
return new Response(
async function* () {
yield "data: hello\n\n";
// events can arrive sporadically without the connection being killed
},
{ headers: { "Content-Type": "text/event-stream" } },
);
},
},
});
```
Pass `0` to disable the timeout for a request.
### `server.requestIP(Request)`


@@ -201,10 +201,6 @@ export const GuidesList = () => {
title: "Streaming HTTP Server with Async Iterators",
href: "/guides/http/stream-iterator",
},
{
title: "Server-Sent Events (SSE)",
href: "/guides/http/sse",
},
{
title: "Streaming HTTP Server with Node.js Streams",
href: "/guides/http/stream-node-streams-in-bun",


@@ -32,102 +32,120 @@ namespace uWS {
constexpr uint64_t STATE_HAS_SIZE = 1ull << (sizeof(uint64_t) * 8 - 1);//0x8000000000000000;
constexpr uint64_t STATE_IS_CHUNKED = 1ull << (sizeof(uint64_t) * 8 - 2);//0x4000000000000000;
constexpr uint64_t STATE_IS_CHUNKED_EXTENSION = 1ull << (sizeof(uint64_t) * 8 - 3);//0x2000000000000000;
constexpr uint64_t STATE_WAITING_FOR_LF = 1ull << (sizeof(uint64_t) * 8 - 4);//0x1000000000000000;
constexpr uint64_t STATE_SIZE_MASK = ~(STATE_HAS_SIZE | STATE_IS_CHUNKED | STATE_IS_CHUNKED_EXTENSION | STATE_WAITING_FOR_LF);//0x0FFFFFFFFFFFFFFF;
constexpr uint64_t STATE_SIZE_MASK = ~(STATE_HAS_SIZE | STATE_IS_CHUNKED | STATE_IS_CHUNKED_EXTENSION);//0x1FFFFFFFFFFFFFFF;
constexpr uint64_t STATE_IS_ERROR = ~0ull;//0xFFFFFFFFFFFFFFFF;
/* Overflow guard: if any of bits 55-59 are set before the next *16, one more
* hex digit (plus the +2 for the trailing CRLF of chunk-data) would carry into
* STATE_WAITING_FOR_LF at bit 60. Limits chunk size to 14 hex digits (~72 PB). */
constexpr uint64_t STATE_SIZE_OVERFLOW = 0x1Full << (sizeof(uint64_t) * 8 - 9);//0x0F80000000000000;
constexpr uint64_t STATE_SIZE_OVERFLOW = 0x0Full << (sizeof(uint64_t) * 8 - 8);//0x0F00000000000000;
inline uint64_t chunkSize(uint64_t state) {
return state & STATE_SIZE_MASK;
}
/* Parses the chunk-size line: HEXDIG+ [;ext...] CRLF
*
* Returns the new state. On return, exactly one of:
* - state has STATE_HAS_SIZE set (success, data advanced past LF)
* - state == STATE_IS_ERROR (malformed input)
* - data is empty (short read; flags persist for resume)
*
* Resume flags:
* STATE_WAITING_FOR_LF -> saw '\r' on previous call, need '\n'
* STATE_IS_CHUNKED_EXTENSION -> mid-extension, skip hex parsing on resume
*
* Structure follows upstream uWS (scan-for-LF) with strict CRLF validation
* added. Every byte is consumed in a forward scan so TCP segment boundaries
* splitting the line at any point are handled by construction.
*
* RFC 7230 4.1.1:
* chunk = chunk-size [ chunk-ext ] CRLF chunk-data CRLF
* chunk-size = 1*HEXDIG
* chunk-ext = *( ";" chunk-ext-name [ "=" chunk-ext-val ] )
* chunk-ext-name = token
* chunk-ext-val = token / quoted-string (TODO: quoted-string unsupported)
*/
inline uint64_t consumeHexNumber(std::string_view &data, uint64_t state) {
/* Resume: '\r' was the last byte of the previous segment. Rare path,
* use data directly to avoid the p/len load on the hot path. */
if (state & STATE_WAITING_FOR_LF) [[unlikely]] {
if (!data.length()) return state;
if (data[0] != '\n') return STATE_IS_ERROR;
data.remove_prefix(1);
return ((state & ~(STATE_WAITING_FOR_LF | STATE_IS_CHUNKED_EXTENSION)) + 2)
| STATE_HAS_SIZE | STATE_IS_CHUNKED;
}
inline bool isParsingChunkedExtension(uint64_t state) {
return (state & STATE_IS_CHUNKED_EXTENSION) != 0;
}
/* Load pointer+length into locals so the loops operate in registers.
* Without this, Clang writes back to the string_view on every iteration.
* Error paths skip the writeback: HttpParser returns immediately on
* STATE_IS_ERROR and never reads data. */
const char *p = data.data();
size_t len = data.length();
/* Reads the hex chunk-size until CR or until data is exhausted. Updates state in place and advances `data` past the consumed bytes. */
inline void consumeHexNumber(std::string_view &data, uint64_t &state) {
/* Hex digits. Skipped when resuming mid-extension so that extension bytes
* like 'a' aren't misparsed as hex. */
if (!(state & STATE_IS_CHUNKED_EXTENSION)) {
while (len) {
unsigned char c = (unsigned char) *p;
if (c <= 32 || c == ';') break; /* fall through to drain loop */
unsigned int d = c | 0x20; /* fold A-F -> a-f; '0'..'9' unchanged */
unsigned int n;
if ((unsigned)(d - '0') < 10) [[likely]] n = d - '0';
else if ((unsigned)(d - 'a') < 6) n = d - 'a' + 10;
else return STATE_IS_ERROR;
if (chunkSize(state) & STATE_SIZE_OVERFLOW) [[unlikely]] return STATE_IS_ERROR;
state = ((state & STATE_SIZE_MASK) * 16ull + n) | STATE_IS_CHUNKED;
++p; --len;
}
}
/* RFC 9110 §5.5 Field Values (TL;DR: any byte above 31 is allowed; '\r', '\n', and ';' are special depending on context) */
/* Drain [;ext...] \r \n. Upstream-style forward scan for LF, with strict
* validation: only >32 bytes (extension) and exactly one '\r' immediately
* before '\n' are allowed. */
while (len) {
unsigned char c = (unsigned char) *p;
if (c == '\n') return STATE_IS_ERROR; /* bare LF */
++p; --len;
if (c == '\r') {
if (!len) {
data = std::string_view(p, len);
return state | STATE_WAITING_FOR_LF;
if(!isParsingChunkedExtension(state)){
/* Parse hex digits: consume every byte above 32 that is not ';' */
while (data.length() && data[0] > 32 && data[0] != ';') {
unsigned char digit = (unsigned char)data[0];
unsigned int number;
if (digit >= '0' && digit <= '9') {
number = digit - '0';
} else if (digit >= 'a' && digit <= 'f') {
number = digit - 'a' + 10;
} else if (digit >= 'A' && digit <= 'F') {
number = digit - 'A' + 10;
} else {
state = STATE_IS_ERROR;
return;
}
if (*p != '\n') return STATE_IS_ERROR;
++p; --len;
data = std::string_view(p, len);
return ((state & ~STATE_IS_CHUNKED_EXTENSION) + 2)
| STATE_HAS_SIZE | STATE_IS_CHUNKED;
if ((chunkSize(state) & STATE_SIZE_OVERFLOW)) {
state = STATE_IS_ERROR;
return;
}
// extract state bits
uint64_t bits = /*state &*/ STATE_IS_CHUNKED;
state = (state & STATE_SIZE_MASK) * 16ull + number;
state |= bits;
data.remove_prefix(1);
}
if (c <= 32) return STATE_IS_ERROR;
state |= STATE_IS_CHUNKED_EXTENSION;
}
data = std::string_view(p, len);
return state; /* short read */
auto len = data.length();
if(len) {
// consume extension
if(data[0] == ';' || isParsingChunkedExtension(state)) {
// mark that we are parsing chunked extension
state |= STATE_IS_CHUNKED_EXTENSION;
/* we got a chunk extension, so consume it */
while(data.length()) {
if(data[0] == '\r') {
// we are done parsing extension
state &= ~STATE_IS_CHUNKED_EXTENSION;
break;
}
/* RFC 9110: Token format (TL;DR: any byte below 32 is not allowed)
* TODO: add support for quoted-strings values (RFC 9110: 3.2.6. Quoted-String)
* Example of chunked encoding with extensions:
*
* 4;key=value\r\n
* Wiki\r\n
* 5;foo=bar;baz=quux\r\n
* pedia\r\n
* 0\r\n
* \r\n
*
* The chunk size is in hex (4, 5, 0), followed by optional
* semicolon-separated extensions. Extensions consist of a key
* (token) and optional value. The value may be a token or a
* quoted string. The chunk data follows the CRLF after the
* extensions and must be exactly the size specified.
*
* RFC 7230 Section 4.1.1 defines chunk extensions as:
* chunk-ext = *( ";" chunk-ext-name [ "=" chunk-ext-val ] )
* chunk-ext-name = token
* chunk-ext-val = token / quoted-string
*/
if(data[0] <= 32) {
state = STATE_IS_ERROR;
return;
}
data.remove_prefix(1);
}
}
if(data.length() >= 2) {
/* Consume \r\n */
if((data[0] != '\r' || data[1] != '\n')) {
state = STATE_IS_ERROR;
return;
}
state += 2; // account for the trailing \r\n
state |= STATE_HAS_SIZE | STATE_IS_CHUNKED;
data.remove_prefix(2);
}
}
// short read
}
inline void decChunkSize(uint64_t &state, uint64_t by) {
//unsigned int bits = state & STATE_IS_CHUNKED;
state = (state & ~STATE_SIZE_MASK) | (chunkSize(state) - by);
//state |= bits;
}
inline bool hasChunkSize(uint64_t state) {
@@ -169,8 +187,8 @@ namespace uWS {
}
if (!hasChunkSize(state)) {
state = consumeHexNumber(data, state);
if (isParsingInvalidChunkedEncoding(state)) [[unlikely]] {
consumeHexNumber(data, state);
if (isParsingInvalidChunkedEncoding(state)) {
return std::nullopt;
}
if (hasChunkSize(state) && chunkSize(state) == 2) {
@@ -186,10 +204,6 @@ namespace uWS {
return std::string_view(nullptr, 0);
}
if (!hasChunkSize(state)) [[unlikely]] {
/* Incomplete chunk-size line — need more data from the network. */
return std::nullopt;
}
continue;
}


@@ -138,7 +138,7 @@ pub fn doReadFile(this: *Blob, comptime Function: anytype, global: *JSGlobalObje
promise_value.ensureStillAlive();
handler.promise.strong.set(global, promise_value);
read_file.ReadFileUV.start(handler.globalThis.bunVM().eventLoop(), this.store.?, this.offset, this.size, Handler, handler);
read_file.ReadFileUV.start(handler.globalThis.bunVM().uvLoop(), this.store.?, this.offset, this.size, Handler, handler);
return promise_value;
}
@@ -180,7 +180,7 @@ pub fn NewInternalReadFileHandler(comptime Context: type, comptime Function: any
pub fn doReadFileInternal(this: *Blob, comptime Handler: type, ctx: Handler, comptime Function: anytype, global: *JSGlobalObject) void {
if (Environment.isWindows) {
const ReadFileHandler = NewInternalReadFileHandler(Handler, Function);
return read_file.ReadFileUV.start(global.bunVM().eventLoop(), this.store.?, this.offset, this.size, ReadFileHandler, ctx);
return read_file.ReadFileUV.start(libuv.Loop.get(), this.store.?, this.offset, this.size, ReadFileHandler, ctx);
}
const file_read = read_file.ReadFile.createWithCtx(
bun.default_allocator,
@@ -3134,7 +3134,7 @@ pub fn getStat(this: *Blob, globalThis: *jsc.JSGlobalObject, callback: *jsc.Call
.encoded_slice = switch (path_like) {
// it's already converted to utf8
.encoded_slice => |slice| try slice.toOwned(bun.default_allocator),
else => try ZigString.fromUTF8(path_like.slice()).toSliceClone(bun.default_allocator),
else => try ZigString.init(path_like.slice()).toSliceClone(bun.default_allocator),
},
},
}, globalThis.bunVM());


@@ -227,8 +227,11 @@ pub fn JSSink(comptime SinkType: type, comptime abi_name: []const u8) type {
return streams.Signal.initWithType(SinkSignal, @as(*SinkSignal, @ptrFromInt(@as(usize, @bitCast(@intFromEnum(cpp))))));
}
pub fn close(this: *@This(), _: ?Syscall.Error) void {
onClose(@as(SinkSignal, @bitCast(@intFromPtr(this))).cpp, .js_undefined);
pub fn close(this: *@This(), err: ?Syscall.Error) void {
// Pass a truthy value for abort/error (non-null err) and js_undefined for normal
// completion. This allows the JS close callback to distinguish between the two cases.
const reason: JSValue = if (err != null) .true else .js_undefined;
onClose(@as(SinkSignal, @bitCast(@intFromPtr(this))).cpp, reason);
}
pub fn ready(this: *@This(), _: ?Blob.SizeType, _: ?Blob.SizeType) void {


@@ -263,7 +263,7 @@ pub const File = struct {
.path = .{
.encoded_slice = switch (path_like) {
.encoded_slice => |slice| try slice.toOwned(bun.default_allocator),
else => try jsc.ZigString.fromUTF8(path_like.slice()).toSliceClone(bun.default_allocator),
else => try jsc.ZigString.init(path_like.slice()).toSliceClone(bun.default_allocator),
},
},
}, globalThis.bunVM()),


@@ -523,7 +523,6 @@ pub const ReadFileUV = struct {
pub const doClose = FileCloser(@This()).doClose;
loop: *libuv.Loop,
event_loop: *jsc.EventLoop,
file_store: FileStore,
byte_store: ByteStore = ByteStore{ .allocator = bun.default_allocator },
store: *Store,
@@ -544,11 +543,10 @@ pub const ReadFileUV = struct {
req: libuv.fs_t = std.mem.zeroes(libuv.fs_t),
pub fn start(event_loop: *jsc.EventLoop, store: *Store, off: SizeType, max_len: SizeType, comptime Handler: type, handler: *anyopaque) void {
pub fn start(loop: *libuv.Loop, store: *Store, off: SizeType, max_len: SizeType, comptime Handler: type, handler: *anyopaque) void {
log("ReadFileUV.start", .{});
var this = bun.new(ReadFileUV, .{
.loop = event_loop.virtual_machine.uvLoop(),
.event_loop = event_loop,
.loop = loop,
.file_store = store.data.file,
.store = store,
.offset = off,
@@ -557,20 +555,15 @@ pub const ReadFileUV = struct {
.on_complete_fn = @ptrCast(&Handler.run),
});
store.ref();
// Keep the event loop alive while the async operation is pending
event_loop.refConcurrently();
this.getFd(onFileOpen);
}
pub fn finalize(this: *ReadFileUV) void {
log("ReadFileUV.finalize", .{});
const event_loop = this.event_loop;
defer {
this.store.deref();
this.req.deinit();
bun.destroy(this);
// Release the event loop reference now that we're done
event_loop.unrefConcurrently();
log("ReadFileUV.finalize destroy", .{});
}


@@ -824,7 +824,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
// onWritable reset backpressure state to allow flushing
this.has_backpressure = false;
if (this.aborted) {
this.signal.close(null);
this.signal.close(Syscall.Error.fromCode(.CONNRESET, .close));
this.flushPromise() catch {}; // TODO: properly propagate exception upwards
this.finalize();
return false;
@@ -880,7 +880,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
pub fn start(this: *@This(), stream_start: Start) bun.sys.Maybe(void) {
if (this.aborted or this.res == null or this.res.?.hasResponded()) {
this.markDone();
this.signal.close(null);
this.signal.close(if (this.aborted) Syscall.Error.fromCode(.CONNRESET, .close) else null);
return .success;
}
@@ -986,7 +986,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
if (this.res == null or this.res.?.hasResponded()) {
this.markDone();
this.signal.close(null);
this.signal.close(if (this.aborted) Syscall.Error.fromCode(.CONNRESET, .close) else null);
}
return .success;
@@ -1040,7 +1040,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
}
if (this.res == null or this.res.?.hasResponded()) {
this.signal.close(null);
this.signal.close(if (this.aborted) Syscall.Error.fromCode(.CONNRESET, .close) else null);
this.markDone();
return .{ .done = {} };
}
@@ -1098,7 +1098,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
}
if (this.res == null or this.res.?.hasResponded()) {
this.signal.close(null);
this.signal.close(if (this.aborted) Syscall.Error.fromCode(.CONNRESET, .close) else null);
this.markDone();
return .{ .done = {} };
}
@@ -1168,7 +1168,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
if (this.done or this.res == null or this.res.?.hasResponded()) {
this.requested_end = true;
this.signal.close(null);
this.signal.close(if (this.aborted) Syscall.Error.fromCode(.CONNRESET, .close) else null);
this.markDone();
this.finalize();
return .{ .result = jsc.JSValue.jsNumber(0) };
@@ -1212,7 +1212,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
this.aborted = true;
this.signal.close(null);
this.signal.close(Syscall.Error.fromCode(.CONNRESET, .close));
this.flushPromise() catch {}; // TODO: properly propagate exception upwards
this.finalize();
@@ -1433,7 +1433,7 @@ pub const NetworkSink = struct {
pub fn abort(this: *@This()) void {
this.ended = true;
this.done = true;
this.signal.close(null);
this.signal.close(Syscall.Error.fromCode(.CONNRESET, .close));
this.cancel = true;
this.finalize();
}


@@ -270,14 +270,14 @@ pub const Mask = struct {
while (true) {
if (image == null) {
if (input.tryParse(Image.parse, .{}).asValue()) |value| {
if (@call(.auto, @field(Image, "parse"), .{input}).asValue()) |value| {
image = value;
continue;
}
}
if (position == null) {
if (input.tryParse(Position.parse, .{}).asValue()) |value| {
if (Position.parse(input).asValue()) |value| {
position = value;
size = input.tryParse(struct {
pub inline fn parseFn(i: *css.Parser) css.Result(BackgroundSize) {
@@ -290,35 +290,35 @@ pub const Mask = struct {
}
if (repeat == null) {
if (input.tryParse(BackgroundRepeat.parse, .{}).asValue()) |value| {
if (BackgroundRepeat.parse(input).asValue()) |value| {
repeat = value;
continue;
}
}
if (origin == null) {
if (input.tryParse(GeometryBox.parse, .{}).asValue()) |value| {
if (GeometryBox.parse(input).asValue()) |value| {
origin = value;
continue;
}
}
if (clip == null) {
if (input.tryParse(MaskClip.parse, .{}).asValue()) |value| {
if (MaskClip.parse(input).asValue()) |value| {
clip = value;
continue;
}
}
if (composite == null) {
if (input.tryParse(MaskComposite.parse, .{}).asValue()) |value| {
if (MaskComposite.parse(input).asValue()) |value| {
composite = value;
continue;
}
}
if (mode == null) {
if (input.tryParse(MaskMode.parse, .{}).asValue()) |value| {
if (MaskMode.parse(input).asValue()) |value| {
mode = value;
continue;
}


@@ -683,22 +683,29 @@ export function readDirectStream(stream, sink, underlyingSource) {
$putByIdDirectPrivate(stream, "underlyingSource", null); // doing this causes isReadableStreamDefaultController to return false
$putByIdDirectPrivate(stream, "start", undefined);
function close(stream, reason) {
const cancelFn = underlyingSource?.cancel;
if (cancelFn) {
try {
var prom = cancelFn.$call(underlyingSource, reason);
if ($isPromise(prom)) {
$markPromiseAsHandled(prom);
}
} catch {}
underlyingSource = undefined;
// reason !== undefined means an abort/error (native passes a truthy value for abort).
// reason === undefined means normal stream completion.
// Only call the user's cancel callback on abort/error, not on normal completion.
if (reason !== undefined) {
const cancelFn = underlyingSource?.cancel;
if (cancelFn) {
try {
// Pass undefined to the cancel callback to preserve existing abort semantics
// (readableStreamFromAsyncIterator relies on cancel(undefined) to call iter.return()
// instead of iter.throw() for abort).
var prom = cancelFn.$call(underlyingSource, undefined);
if ($isPromise(prom)) {
$markPromiseAsHandled(prom);
}
} catch {}
}
}
underlyingSource = undefined;
if (stream) {
$putByIdDirectPrivate(stream, "readableStreamController", undefined);
$putByIdDirectPrivate(stream, "reader", undefined);
if (reason) {
if (reason !== undefined) {
$putByIdDirectPrivate(stream, "state", $streamErrored);
$putByIdDirectPrivate(stream, "storedError", reason);
} else {


@@ -26,7 +26,6 @@ pub const RedisError = error{
UnsupportedProtocol,
ConnectionTimeout,
IdleTimeout,
NestingDepthExceeded,
};
pub fn valkeyErrorToJS(globalObject: *jsc.JSGlobalObject, message: ?[]const u8, err: RedisError) jsc.JSValue {
@@ -56,7 +55,6 @@ pub fn valkeyErrorToJS(globalObject: *jsc.JSGlobalObject, message: ?[]const u8,
error.InvalidResponseType => .REDIS_INVALID_RESPONSE_TYPE,
error.ConnectionTimeout => .REDIS_CONNECTION_TIMEOUT,
error.IdleTimeout => .REDIS_IDLE_TIMEOUT,
error.NestingDepthExceeded => .REDIS_INVALID_RESPONSE,
error.JSError => return globalObject.takeException(error.JSError),
error.OutOfMemory => globalObject.throwOutOfMemory() catch return globalObject.takeException(error.JSError),
error.JSTerminated => return globalObject.takeException(error.JSTerminated),
@@ -422,16 +420,7 @@ pub const ValkeyReader = struct {
};
}
/// Maximum allowed nesting depth for RESP aggregate types.
/// This limits recursion to prevent excessive stack usage from
/// deeply nested responses.
const max_nesting_depth = 128;
pub fn readValue(self: *ValkeyReader, allocator: std.mem.Allocator) RedisError!RESPValue {
return self.readValueWithDepth(allocator, 0);
}
fn readValueWithDepth(self: *ValkeyReader, allocator: std.mem.Allocator, depth: usize) RedisError!RESPValue {
const type_byte = try self.readByte();
return switch (RESPType.fromByte(type_byte) orelse return error.InvalidResponseType) {
@@ -462,7 +451,6 @@ pub const ValkeyReader = struct {
return RESPValue{ .BulkString = owned };
},
.Array => {
if (depth >= max_nesting_depth) return error.NestingDepthExceeded;
const len = try self.readInteger();
if (len < 0) return RESPValue{ .Array = &[_]RESPValue{} };
const array = try allocator.alloc(RESPValue, @as(usize, @intCast(len)));
@@ -474,7 +462,7 @@ pub const ValkeyReader = struct {
}
}
while (i < len) : (i += 1) {
array[i] = try self.readValueWithDepth(allocator, depth + 1);
array[i] = try self.readValue(allocator);
}
return RESPValue{ .Array = array };
},
@@ -507,7 +495,6 @@ pub const ValkeyReader = struct {
return RESPValue{ .VerbatimString = try self.readVerbatimString(allocator) };
},
.Map => {
if (depth >= max_nesting_depth) return error.NestingDepthExceeded;
const len = try self.readInteger();
if (len < 0) return error.InvalidMap;
@@ -521,15 +508,11 @@ pub const ValkeyReader = struct {
}
while (i < len) : (i += 1) {
var key = try self.readValueWithDepth(allocator, depth + 1);
errdefer key.deinit(allocator);
const value = try self.readValueWithDepth(allocator, depth + 1);
entries[i] = .{ .key = key, .value = value };
entries[i] = .{ .key = try self.readValue(allocator), .value = try self.readValue(allocator) };
}
return RESPValue{ .Map = entries };
},
.Set => {
if (depth >= max_nesting_depth) return error.NestingDepthExceeded;
const len = try self.readInteger();
if (len < 0) return error.InvalidSet;
@@ -542,12 +525,11 @@ pub const ValkeyReader = struct {
}
}
while (i < len) : (i += 1) {
set[i] = try self.readValueWithDepth(allocator, depth + 1);
set[i] = try self.readValue(allocator);
}
return RESPValue{ .Set = set };
},
.Attribute => {
if (depth >= max_nesting_depth) return error.NestingDepthExceeded;
const len = try self.readInteger();
if (len < 0) return error.InvalidAttribute;
@@ -560,9 +542,9 @@ pub const ValkeyReader = struct {
}
}
while (i < len) : (i += 1) {
var key = try self.readValueWithDepth(allocator, depth + 1);
var key = try self.readValue(allocator);
errdefer key.deinit(allocator);
const value = try self.readValueWithDepth(allocator, depth + 1);
const value = try self.readValue(allocator);
attrs[i] = .{ .key = key, .value = value };
}
@@ -571,7 +553,7 @@ pub const ValkeyReader = struct {
errdefer {
allocator.destroy(value_ptr);
}
value_ptr.* = try self.readValueWithDepth(allocator, depth + 1);
value_ptr.* = try self.readValue(allocator);
return RESPValue{ .Attribute = .{
.attributes = attrs,
@@ -579,13 +561,11 @@ pub const ValkeyReader = struct {
} };
},
.Push => {
if (depth >= max_nesting_depth) return error.NestingDepthExceeded;
const len = try self.readInteger();
if (len < 0 or len == 0) return error.InvalidPush;
// First element is the push type
var push_type = try self.readValueWithDepth(allocator, depth + 1);
defer push_type.deinit(allocator);
const push_type = try self.readValue(allocator);
var push_type_str: []const u8 = "";
switch (push_type) {
@@ -614,7 +594,7 @@ pub const ValkeyReader = struct {
}
}
while (i < len - 1) : (i += 1) {
data[i] = try self.readValueWithDepth(allocator, depth + 1);
data[i] = try self.readValue(allocator);
}
return RESPValue{ .Push = .{


@@ -1,44 +0,0 @@
import { describe, expect } from "bun:test";
import { itBundled } from "../expectBundled";
describe("css", () => {
itBundled("css/mask-geometry-box-preserved", {
files: {
"index.css": /* css */ `
.test-a::after {
mask: linear-gradient(#fff 0 0) padding-box, linear-gradient(#fff 0 0);
}
.test-b::after {
mask: linear-gradient(#fff 0 0) content-box, linear-gradient(#fff 0 0);
}
`,
},
outdir: "/out",
entryPoints: ["/index.css"],
onAfterBundle(api) {
const output = api.readFile("/out/index.css");
expect(output).toContain("padding-box");
expect(output).toContain("content-box");
expect(output).toContain(".test-a");
expect(output).toContain(".test-b");
expect(output).not.toContain(".test-a:after, .test-b:after");
},
});
itBundled("css/webkit-mask-geometry-box-preserved", {
files: {
"index.css": /* css */ `
.test-c::after {
-webkit-mask: linear-gradient(#fff 0 0) padding-box, linear-gradient(#fff 0 0);
-webkit-mask-composite: xor;
}
`,
},
outdir: "/out",
entryPoints: ["/index.css"],
onAfterBundle(api) {
const output = api.readFile("/out/index.css");
expect(output).toContain("padding-box");
},
});
});

Some files were not shown because too many files have changed in this diff.