mirror of
https://github.com/oven-sh/bun
synced 2026-03-02 13:31:01 +01:00
Compare commits
2 Commits
main
...
claude/fix
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
22f4d46b54 | ||
|
|
353d95935d |
@@ -400,7 +400,6 @@
|
||||
"/guides/http/file-uploads",
|
||||
"/guides/http/fetch-unix",
|
||||
"/guides/http/stream-iterator",
|
||||
"/guides/http/sse",
|
||||
"/guides/http/stream-node-streams-in-bun"
|
||||
]
|
||||
},
|
||||
|
||||
@@ -1,91 +0,0 @@
|
||||
---
|
||||
title: Server-Sent Events (SSE) with Bun
|
||||
sidebarTitle: Server-Sent Events
|
||||
mode: center
|
||||
---
|
||||
|
||||
[Server-Sent Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events) let you push a stream of text events to the browser over a single HTTP response. The client consumes them via [`EventSource`](https://developer.mozilla.org/en-US/docs/Web/API/EventSource).
|
||||
|
||||
In Bun, you can implement an SSE endpoint by returning a `Response` whose body is a streaming source and setting the `Content-Type` header to `text/event-stream`.
|
||||
|
||||
<Note>
|
||||
`Bun.serve` closes idle connections after **10 seconds** by default. A quiet SSE stream counts as idle, so the
|
||||
examples below call `server.timeout(req, 0)` to disable the timeout for the stream. See
|
||||
[`idleTimeout`](/runtime/http/server#idletimeout) for details.
|
||||
</Note>
|
||||
|
||||
## Using an async generator
|
||||
|
||||
In Bun, `new Response` accepts an async generator function directly. This is usually the simplest way to write an SSE endpoint — each `yield` flushes a chunk to the client, and if the client disconnects, the generator's `finally` block runs so you can clean up.
|
||||
|
||||
```ts server.ts icon="/icons/typescript.svg"
|
||||
Bun.serve({
|
||||
port: 3000,
|
||||
routes: {
|
||||
"/events": (req, server) => {
|
||||
// SSE streams are often quiet between events. By default,
|
||||
// Bun.serve closes connections after 10 seconds of inactivity.
|
||||
// Disable the idle timeout for this request so the stream
|
||||
// stays open indefinitely.
|
||||
server.timeout(req, 0);
|
||||
|
||||
return new Response(
|
||||
async function* () {
|
||||
yield `data: connected at ${Date.now()}\n\n`;
|
||||
|
||||
// Emit a tick every 5 seconds until the client disconnects.
|
||||
// When the client goes away, the generator is returned
|
||||
// (cancelled) and this loop stops automatically.
|
||||
while (true) {
|
||||
await Bun.sleep(5000);
|
||||
yield `data: tick ${Date.now()}\n\n`;
|
||||
}
|
||||
},
|
||||
{
|
||||
headers: {
|
||||
"Content-Type": "text/event-stream",
|
||||
"Cache-Control": "no-cache",
|
||||
},
|
||||
},
|
||||
);
|
||||
},
|
||||
},
|
||||
});
|
||||
```
|
||||
|
||||
## Using a `ReadableStream`
|
||||
|
||||
If your events originate from callbacks — message brokers, timers, external pushes — rather than a linear `await` flow, a `ReadableStream` often fits better. When the client disconnects, Bun calls the stream's `cancel()` method automatically, so you can release any resources you set up in `start()`.
|
||||
|
||||
```ts server.ts icon="/icons/typescript.svg"
|
||||
Bun.serve({
|
||||
port: 3000,
|
||||
routes: {
|
||||
"/events": (req, server) => {
|
||||
server.timeout(req, 0);
|
||||
|
||||
let timer: Timer;
|
||||
const stream = new ReadableStream({
|
||||
start(controller) {
|
||||
controller.enqueue(`data: connected at ${Date.now()}\n\n`);
|
||||
|
||||
timer = setInterval(() => {
|
||||
controller.enqueue(`data: tick ${Date.now()}\n\n`);
|
||||
}, 5000);
|
||||
},
|
||||
cancel() {
|
||||
// Called automatically when the client disconnects.
|
||||
clearInterval(timer);
|
||||
},
|
||||
});
|
||||
|
||||
return new Response(stream, {
|
||||
headers: {
|
||||
"Content-Type": "text/event-stream",
|
||||
"Cache-Control": "no-cache",
|
||||
},
|
||||
});
|
||||
},
|
||||
},
|
||||
});
|
||||
```
|
||||
@@ -171,14 +171,12 @@ Unlike unix domain sockets, abstract namespace sockets are not bound to the file
|
||||
|
||||
## idleTimeout
|
||||
|
||||
By default, `Bun.serve` closes connections after **10 seconds** of inactivity. A connection is considered idle when there is no data being sent or received — this includes in-flight requests where your handler is still running but hasn't written any bytes to the response yet. Browsers and `fetch()` clients will see this as a connection reset.
|
||||
|
||||
To configure this, set the `idleTimeout` field (in seconds). The maximum value is `255`, and `0` disables the timeout entirely.
|
||||
To configure the idle timeout, set the `idleTimeout` field in Bun.serve.
|
||||
|
||||
```ts
|
||||
Bun.serve({
|
||||
// 30 seconds (default is 10)
|
||||
idleTimeout: 30,
|
||||
// 10 seconds:
|
||||
idleTimeout: 10,
|
||||
|
||||
fetch(req) {
|
||||
return new Response("Bun!");
|
||||
@@ -186,11 +184,7 @@ Bun.serve({
|
||||
});
|
||||
```
|
||||
|
||||
<Note>
|
||||
**Streaming & Server-Sent Events** — The idle timer applies while a response is being streamed. If your stream goes
|
||||
quiet for longer than `idleTimeout`, the connection will be closed mid-response. For long-lived streams, disable the
|
||||
timeout for that request with [`server.timeout(req, 0)`](#server-timeout-request-seconds).
|
||||
</Note>
|
||||
This is the maximum amount of time a connection is allowed to be idle before the server closes it. A connection is idling if there is no data sent or received.
|
||||
|
||||
---
|
||||
|
||||
@@ -302,12 +296,12 @@ This is useful for development and hot reloading. Only `fetch`, `error`, and `ro
|
||||
|
||||
### `server.timeout(Request, seconds)`
|
||||
|
||||
Override the idle timeout for an individual request. Pass `0` to disable the timeout entirely for that request.
|
||||
Set a custom idle timeout for individual requests:
|
||||
|
||||
```ts
|
||||
const server = Bun.serve({
|
||||
async fetch(req, server) {
|
||||
// Give this request up to 60 seconds of inactivity instead of the default 10
|
||||
// Set 60 second timeout for this request
|
||||
server.timeout(req, 60);
|
||||
|
||||
// If they take longer than 60 seconds to send the body, the request will be aborted
|
||||
@@ -318,28 +312,7 @@ const server = Bun.serve({
|
||||
});
|
||||
```
|
||||
|
||||
This is the recommended way to keep long-lived streaming responses (like Server-Sent Events) alive without raising the global `idleTimeout` for every request:
|
||||
|
||||
```ts
|
||||
Bun.serve({
|
||||
routes: {
|
||||
"/events": (req, server) => {
|
||||
// Disable the idle timeout for this streaming response.
|
||||
// Otherwise the connection will be closed if no bytes
|
||||
// are sent for 10 seconds (the default idleTimeout).
|
||||
server.timeout(req, 0);
|
||||
|
||||
return new Response(
|
||||
async function* () {
|
||||
yield "data: hello\n\n";
|
||||
// events can arrive sporadically without the connection being killed
|
||||
},
|
||||
{ headers: { "Content-Type": "text/event-stream" } },
|
||||
);
|
||||
},
|
||||
},
|
||||
});
|
||||
```
|
||||
Pass `0` to disable the timeout for a request.
|
||||
|
||||
### `server.requestIP(Request)`
|
||||
|
||||
|
||||
@@ -201,10 +201,6 @@ export const GuidesList = () => {
|
||||
title: "Streaming HTTP Server with Async Iterators",
|
||||
href: "/guides/http/stream-iterator",
|
||||
},
|
||||
{
|
||||
title: "Server-Sent Events (SSE)",
|
||||
href: "/guides/http/sse",
|
||||
},
|
||||
{
|
||||
title: "Streaming HTTP Server with Node.js Streams",
|
||||
href: "/guides/http/stream-node-streams-in-bun",
|
||||
|
||||
@@ -32,102 +32,120 @@ namespace uWS {
|
||||
constexpr uint64_t STATE_HAS_SIZE = 1ull << (sizeof(uint64_t) * 8 - 1);//0x8000000000000000;
|
||||
constexpr uint64_t STATE_IS_CHUNKED = 1ull << (sizeof(uint64_t) * 8 - 2);//0x4000000000000000;
|
||||
constexpr uint64_t STATE_IS_CHUNKED_EXTENSION = 1ull << (sizeof(uint64_t) * 8 - 3);//0x2000000000000000;
|
||||
constexpr uint64_t STATE_WAITING_FOR_LF = 1ull << (sizeof(uint64_t) * 8 - 4);//0x1000000000000000;
|
||||
constexpr uint64_t STATE_SIZE_MASK = ~(STATE_HAS_SIZE | STATE_IS_CHUNKED | STATE_IS_CHUNKED_EXTENSION | STATE_WAITING_FOR_LF);//0x0FFFFFFFFFFFFFFF;
|
||||
constexpr uint64_t STATE_SIZE_MASK = ~(STATE_HAS_SIZE | STATE_IS_CHUNKED | STATE_IS_CHUNKED_EXTENSION);//0x1FFFFFFFFFFFFFFF;
|
||||
constexpr uint64_t STATE_IS_ERROR = ~0ull;//0xFFFFFFFFFFFFFFFF;
|
||||
/* Overflow guard: if any of bits 55-59 are set before the next *16, one more
|
||||
* hex digit (plus the +2 for the trailing CRLF of chunk-data) would carry into
|
||||
* STATE_WAITING_FOR_LF at bit 60. Limits chunk size to 14 hex digits (~72 PB). */
|
||||
constexpr uint64_t STATE_SIZE_OVERFLOW = 0x1Full << (sizeof(uint64_t) * 8 - 9);//0x0F80000000000000;
|
||||
constexpr uint64_t STATE_SIZE_OVERFLOW = 0x0Full << (sizeof(uint64_t) * 8 - 8);//0x0F00000000000000;
|
||||
|
||||
inline uint64_t chunkSize(uint64_t state) {
|
||||
return state & STATE_SIZE_MASK;
|
||||
}
|
||||
|
||||
/* Parses the chunk-size line: HEXDIG+ [;ext...] CRLF
|
||||
*
|
||||
* Returns the new state. On return, exactly one of:
|
||||
* - state has STATE_HAS_SIZE set (success, data advanced past LF)
|
||||
* - state == STATE_IS_ERROR (malformed input)
|
||||
* - data is empty (short read; flags persist for resume)
|
||||
*
|
||||
* Resume flags:
|
||||
* STATE_WAITING_FOR_LF -> saw '\r' on previous call, need '\n'
|
||||
* STATE_IS_CHUNKED_EXTENSION -> mid-extension, skip hex parsing on resume
|
||||
*
|
||||
* Structure follows upstream uWS (scan-for-LF) with strict CRLF validation
|
||||
* added. Every byte is consumed in a forward scan so TCP segment boundaries
|
||||
* splitting the line at any point are handled by construction.
|
||||
*
|
||||
* RFC 7230 4.1.1:
|
||||
* chunk = chunk-size [ chunk-ext ] CRLF chunk-data CRLF
|
||||
* chunk-size = 1*HEXDIG
|
||||
* chunk-ext = *( ";" chunk-ext-name [ "=" chunk-ext-val ] )
|
||||
* chunk-ext-name = token
|
||||
* chunk-ext-val = token / quoted-string (TODO: quoted-string unsupported)
|
||||
*/
|
||||
inline uint64_t consumeHexNumber(std::string_view &data, uint64_t state) {
|
||||
/* Resume: '\r' was the last byte of the previous segment. Rare path,
|
||||
* use data directly to avoid the p/len load on the hot path. */
|
||||
if (state & STATE_WAITING_FOR_LF) [[unlikely]] {
|
||||
if (!data.length()) return state;
|
||||
if (data[0] != '\n') return STATE_IS_ERROR;
|
||||
data.remove_prefix(1);
|
||||
return ((state & ~(STATE_WAITING_FOR_LF | STATE_IS_CHUNKED_EXTENSION)) + 2)
|
||||
| STATE_HAS_SIZE | STATE_IS_CHUNKED;
|
||||
}
|
||||
inline bool isParsingChunkedExtension(uint64_t state) {
|
||||
return (state & STATE_IS_CHUNKED_EXTENSION) != 0;
|
||||
}
|
||||
|
||||
/* Load pointer+length into locals so the loops operate in registers.
|
||||
* Without this, Clang writes back to the string_view on every iteration.
|
||||
* Error paths skip the writeback: HttpParser returns immediately on
|
||||
* STATE_IS_ERROR and never reads data. */
|
||||
const char *p = data.data();
|
||||
size_t len = data.length();
|
||||
/* Reads hex number until CR or out of data to consume. Updates state. Returns bytes consumed. */
|
||||
inline void consumeHexNumber(std::string_view &data, uint64_t &state) {
|
||||
|
||||
/* Hex digits. Skipped when resuming mid-extension so that extension bytes
|
||||
* like 'a' aren't misparsed as hex. */
|
||||
if (!(state & STATE_IS_CHUNKED_EXTENSION)) {
|
||||
while (len) {
|
||||
unsigned char c = (unsigned char) *p;
|
||||
if (c <= 32 || c == ';') break; /* fall through to drain loop */
|
||||
unsigned int d = c | 0x20; /* fold A-F -> a-f; '0'..'9' unchanged */
|
||||
unsigned int n;
|
||||
if ((unsigned)(d - '0') < 10) [[likely]] n = d - '0';
|
||||
else if ((unsigned)(d - 'a') < 6) n = d - 'a' + 10;
|
||||
else return STATE_IS_ERROR;
|
||||
if (chunkSize(state) & STATE_SIZE_OVERFLOW) [[unlikely]] return STATE_IS_ERROR;
|
||||
state = ((state & STATE_SIZE_MASK) * 16ull + n) | STATE_IS_CHUNKED;
|
||||
++p; --len;
|
||||
}
|
||||
}
|
||||
/* RFC 9110: 5.5 Field Values (TLDR; anything above 31 is allowed \r, \n ; depending on context)*/
|
||||
|
||||
/* Drain [;ext...] \r \n. Upstream-style forward scan for LF, with strict
|
||||
* validation: only >32 bytes (extension) and exactly one '\r' immediately
|
||||
* before '\n' are allowed. */
|
||||
while (len) {
|
||||
unsigned char c = (unsigned char) *p;
|
||||
if (c == '\n') return STATE_IS_ERROR; /* bare LF */
|
||||
++p; --len;
|
||||
if (c == '\r') {
|
||||
if (!len) {
|
||||
data = std::string_view(p, len);
|
||||
return state | STATE_WAITING_FOR_LF;
|
||||
if(!isParsingChunkedExtension(state)){
|
||||
/* Consume everything higher than 32 and not ; (extension)*/
|
||||
while (data.length() && data[0] > 32 && data[0] != ';') {
|
||||
|
||||
unsigned char digit = (unsigned char)data[0];
|
||||
unsigned int number;
|
||||
if (digit >= '0' && digit <= '9') {
|
||||
number = digit - '0';
|
||||
} else if (digit >= 'a' && digit <= 'f') {
|
||||
number = digit - 'a' + 10;
|
||||
} else if (digit >= 'A' && digit <= 'F') {
|
||||
number = digit - 'A' + 10;
|
||||
} else {
|
||||
state = STATE_IS_ERROR;
|
||||
return;
|
||||
}
|
||||
if (*p != '\n') return STATE_IS_ERROR;
|
||||
++p; --len;
|
||||
data = std::string_view(p, len);
|
||||
return ((state & ~STATE_IS_CHUNKED_EXTENSION) + 2)
|
||||
| STATE_HAS_SIZE | STATE_IS_CHUNKED;
|
||||
|
||||
if ((chunkSize(state) & STATE_SIZE_OVERFLOW)) {
|
||||
state = STATE_IS_ERROR;
|
||||
return;
|
||||
}
|
||||
|
||||
// extract state bits
|
||||
uint64_t bits = /*state &*/ STATE_IS_CHUNKED;
|
||||
|
||||
state = (state & STATE_SIZE_MASK) * 16ull + number;
|
||||
|
||||
state |= bits;
|
||||
data.remove_prefix(1);
|
||||
}
|
||||
if (c <= 32) return STATE_IS_ERROR;
|
||||
state |= STATE_IS_CHUNKED_EXTENSION;
|
||||
}
|
||||
data = std::string_view(p, len);
|
||||
return state; /* short read */
|
||||
|
||||
auto len = data.length();
|
||||
if(len) {
|
||||
// consume extension
|
||||
if(data[0] == ';' || isParsingChunkedExtension(state)) {
|
||||
// mark that we are parsing chunked extension
|
||||
state |= STATE_IS_CHUNKED_EXTENSION;
|
||||
/* we got chunk extension lets remove it*/
|
||||
while(data.length()) {
|
||||
if(data[0] == '\r') {
|
||||
// we are done parsing extension
|
||||
state &= ~STATE_IS_CHUNKED_EXTENSION;
|
||||
break;
|
||||
}
|
||||
/* RFC 9110: Token format (TLDR; anything bellow 32 is not allowed)
|
||||
* TODO: add support for quoted-strings values (RFC 9110: 3.2.6. Quoted-String)
|
||||
* Example of chunked encoding with extensions:
|
||||
*
|
||||
* 4;key=value\r\n
|
||||
* Wiki\r\n
|
||||
* 5;foo=bar;baz=quux\r\n
|
||||
* pedia\r\n
|
||||
* 0\r\n
|
||||
* \r\n
|
||||
*
|
||||
* The chunk size is in hex (4, 5, 0), followed by optional
|
||||
* semicolon-separated extensions. Extensions consist of a key
|
||||
* (token) and optional value. The value may be a token or a
|
||||
* quoted string. The chunk data follows the CRLF after the
|
||||
* extensions and must be exactly the size specified.
|
||||
*
|
||||
* RFC 7230 Section 4.1.1 defines chunk extensions as:
|
||||
* chunk-ext = *( ";" chunk-ext-name [ "=" chunk-ext-val ] )
|
||||
* chunk-ext-name = token
|
||||
* chunk-ext-val = token / quoted-string
|
||||
*/
|
||||
if(data[0] <= 32) {
|
||||
state = STATE_IS_ERROR;
|
||||
return;
|
||||
}
|
||||
|
||||
data.remove_prefix(1);
|
||||
}
|
||||
}
|
||||
if(data.length() >= 2) {
|
||||
/* Consume \r\n */
|
||||
if((data[0] != '\r' || data[1] != '\n')) {
|
||||
state = STATE_IS_ERROR;
|
||||
return;
|
||||
}
|
||||
state += 2; // include the two last /r/n
|
||||
state |= STATE_HAS_SIZE | STATE_IS_CHUNKED;
|
||||
|
||||
data.remove_prefix(2);
|
||||
}
|
||||
}
|
||||
// short read
|
||||
}
|
||||
|
||||
inline void decChunkSize(uint64_t &state, uint64_t by) {
|
||||
|
||||
//unsigned int bits = state & STATE_IS_CHUNKED;
|
||||
|
||||
state = (state & ~STATE_SIZE_MASK) | (chunkSize(state) - by);
|
||||
|
||||
//state |= bits;
|
||||
}
|
||||
|
||||
inline bool hasChunkSize(uint64_t state) {
|
||||
@@ -169,8 +187,8 @@ namespace uWS {
|
||||
}
|
||||
|
||||
if (!hasChunkSize(state)) {
|
||||
state = consumeHexNumber(data, state);
|
||||
if (isParsingInvalidChunkedEncoding(state)) [[unlikely]] {
|
||||
consumeHexNumber(data, state);
|
||||
if (isParsingInvalidChunkedEncoding(state)) {
|
||||
return std::nullopt;
|
||||
}
|
||||
if (hasChunkSize(state) && chunkSize(state) == 2) {
|
||||
@@ -186,10 +204,6 @@ namespace uWS {
|
||||
|
||||
return std::string_view(nullptr, 0);
|
||||
}
|
||||
if (!hasChunkSize(state)) [[unlikely]] {
|
||||
/* Incomplete chunk-size line — need more data from the network. */
|
||||
return std::nullopt;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
@@ -138,7 +138,7 @@ pub fn doReadFile(this: *Blob, comptime Function: anytype, global: *JSGlobalObje
|
||||
promise_value.ensureStillAlive();
|
||||
handler.promise.strong.set(global, promise_value);
|
||||
|
||||
read_file.ReadFileUV.start(handler.globalThis.bunVM().eventLoop(), this.store.?, this.offset, this.size, Handler, handler);
|
||||
read_file.ReadFileUV.start(handler.globalThis.bunVM().uvLoop(), this.store.?, this.offset, this.size, Handler, handler);
|
||||
|
||||
return promise_value;
|
||||
}
|
||||
@@ -180,7 +180,7 @@ pub fn NewInternalReadFileHandler(comptime Context: type, comptime Function: any
|
||||
pub fn doReadFileInternal(this: *Blob, comptime Handler: type, ctx: Handler, comptime Function: anytype, global: *JSGlobalObject) void {
|
||||
if (Environment.isWindows) {
|
||||
const ReadFileHandler = NewInternalReadFileHandler(Handler, Function);
|
||||
return read_file.ReadFileUV.start(global.bunVM().eventLoop(), this.store.?, this.offset, this.size, ReadFileHandler, ctx);
|
||||
return read_file.ReadFileUV.start(libuv.Loop.get(), this.store.?, this.offset, this.size, ReadFileHandler, ctx);
|
||||
}
|
||||
const file_read = read_file.ReadFile.createWithCtx(
|
||||
bun.default_allocator,
|
||||
@@ -3134,7 +3134,7 @@ pub fn getStat(this: *Blob, globalThis: *jsc.JSGlobalObject, callback: *jsc.Call
|
||||
.encoded_slice = switch (path_like) {
|
||||
// it's already converted to utf8
|
||||
.encoded_slice => |slice| try slice.toOwned(bun.default_allocator),
|
||||
else => try ZigString.fromUTF8(path_like.slice()).toSliceClone(bun.default_allocator),
|
||||
else => try ZigString.init(path_like.slice()).toSliceClone(bun.default_allocator),
|
||||
},
|
||||
},
|
||||
}, globalThis.bunVM());
|
||||
|
||||
@@ -227,8 +227,11 @@ pub fn JSSink(comptime SinkType: type, comptime abi_name: []const u8) type {
|
||||
return streams.Signal.initWithType(SinkSignal, @as(*SinkSignal, @ptrFromInt(@as(usize, @bitCast(@intFromEnum(cpp))))));
|
||||
}
|
||||
|
||||
pub fn close(this: *@This(), _: ?Syscall.Error) void {
|
||||
onClose(@as(SinkSignal, @bitCast(@intFromPtr(this))).cpp, .js_undefined);
|
||||
pub fn close(this: *@This(), err: ?Syscall.Error) void {
|
||||
// Pass a truthy value for abort/error (non-null err) and js_undefined for normal
|
||||
// completion. This allows the JS close callback to distinguish between the two cases.
|
||||
const reason: JSValue = if (err != null) .true else .js_undefined;
|
||||
onClose(@as(SinkSignal, @bitCast(@intFromPtr(this))).cpp, reason);
|
||||
}
|
||||
|
||||
pub fn ready(this: *@This(), _: ?Blob.SizeType, _: ?Blob.SizeType) void {
|
||||
|
||||
@@ -263,7 +263,7 @@ pub const File = struct {
|
||||
.path = .{
|
||||
.encoded_slice = switch (path_like) {
|
||||
.encoded_slice => |slice| try slice.toOwned(bun.default_allocator),
|
||||
else => try jsc.ZigString.fromUTF8(path_like.slice()).toSliceClone(bun.default_allocator),
|
||||
else => try jsc.ZigString.init(path_like.slice()).toSliceClone(bun.default_allocator),
|
||||
},
|
||||
},
|
||||
}, globalThis.bunVM()),
|
||||
|
||||
@@ -523,7 +523,6 @@ pub const ReadFileUV = struct {
|
||||
pub const doClose = FileCloser(@This()).doClose;
|
||||
|
||||
loop: *libuv.Loop,
|
||||
event_loop: *jsc.EventLoop,
|
||||
file_store: FileStore,
|
||||
byte_store: ByteStore = ByteStore{ .allocator = bun.default_allocator },
|
||||
store: *Store,
|
||||
@@ -544,11 +543,10 @@ pub const ReadFileUV = struct {
|
||||
|
||||
req: libuv.fs_t = std.mem.zeroes(libuv.fs_t),
|
||||
|
||||
pub fn start(event_loop: *jsc.EventLoop, store: *Store, off: SizeType, max_len: SizeType, comptime Handler: type, handler: *anyopaque) void {
|
||||
pub fn start(loop: *libuv.Loop, store: *Store, off: SizeType, max_len: SizeType, comptime Handler: type, handler: *anyopaque) void {
|
||||
log("ReadFileUV.start", .{});
|
||||
var this = bun.new(ReadFileUV, .{
|
||||
.loop = event_loop.virtual_machine.uvLoop(),
|
||||
.event_loop = event_loop,
|
||||
.loop = loop,
|
||||
.file_store = store.data.file,
|
||||
.store = store,
|
||||
.offset = off,
|
||||
@@ -557,20 +555,15 @@ pub const ReadFileUV = struct {
|
||||
.on_complete_fn = @ptrCast(&Handler.run),
|
||||
});
|
||||
store.ref();
|
||||
// Keep the event loop alive while the async operation is pending
|
||||
event_loop.refConcurrently();
|
||||
this.getFd(onFileOpen);
|
||||
}
|
||||
|
||||
pub fn finalize(this: *ReadFileUV) void {
|
||||
log("ReadFileUV.finalize", .{});
|
||||
const event_loop = this.event_loop;
|
||||
defer {
|
||||
this.store.deref();
|
||||
this.req.deinit();
|
||||
bun.destroy(this);
|
||||
// Release the event loop reference now that we're done
|
||||
event_loop.unrefConcurrently();
|
||||
log("ReadFileUV.finalize destroy", .{});
|
||||
}
|
||||
|
||||
|
||||
@@ -824,7 +824,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
|
||||
// onWritable reset backpressure state to allow flushing
|
||||
this.has_backpressure = false;
|
||||
if (this.aborted) {
|
||||
this.signal.close(null);
|
||||
this.signal.close(Syscall.Error.fromCode(.CONNRESET, .close));
|
||||
this.flushPromise() catch {}; // TODO: properly propagate exception upwards
|
||||
this.finalize();
|
||||
return false;
|
||||
@@ -880,7 +880,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
|
||||
pub fn start(this: *@This(), stream_start: Start) bun.sys.Maybe(void) {
|
||||
if (this.aborted or this.res == null or this.res.?.hasResponded()) {
|
||||
this.markDone();
|
||||
this.signal.close(null);
|
||||
this.signal.close(if (this.aborted) Syscall.Error.fromCode(.CONNRESET, .close) else null);
|
||||
return .success;
|
||||
}
|
||||
|
||||
@@ -986,7 +986,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
|
||||
|
||||
if (this.res == null or this.res.?.hasResponded()) {
|
||||
this.markDone();
|
||||
this.signal.close(null);
|
||||
this.signal.close(if (this.aborted) Syscall.Error.fromCode(.CONNRESET, .close) else null);
|
||||
}
|
||||
|
||||
return .success;
|
||||
@@ -1040,7 +1040,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
|
||||
}
|
||||
|
||||
if (this.res == null or this.res.?.hasResponded()) {
|
||||
this.signal.close(null);
|
||||
this.signal.close(if (this.aborted) Syscall.Error.fromCode(.CONNRESET, .close) else null);
|
||||
this.markDone();
|
||||
return .{ .done = {} };
|
||||
}
|
||||
@@ -1098,7 +1098,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
|
||||
}
|
||||
|
||||
if (this.res == null or this.res.?.hasResponded()) {
|
||||
this.signal.close(null);
|
||||
this.signal.close(if (this.aborted) Syscall.Error.fromCode(.CONNRESET, .close) else null);
|
||||
this.markDone();
|
||||
return .{ .done = {} };
|
||||
}
|
||||
@@ -1168,7 +1168,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
|
||||
|
||||
if (this.done or this.res == null or this.res.?.hasResponded()) {
|
||||
this.requested_end = true;
|
||||
this.signal.close(null);
|
||||
this.signal.close(if (this.aborted) Syscall.Error.fromCode(.CONNRESET, .close) else null);
|
||||
this.markDone();
|
||||
this.finalize();
|
||||
return .{ .result = jsc.JSValue.jsNumber(0) };
|
||||
@@ -1212,7 +1212,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
|
||||
|
||||
this.aborted = true;
|
||||
|
||||
this.signal.close(null);
|
||||
this.signal.close(Syscall.Error.fromCode(.CONNRESET, .close));
|
||||
|
||||
this.flushPromise() catch {}; // TODO: properly propagate exception upwards
|
||||
this.finalize();
|
||||
@@ -1433,7 +1433,7 @@ pub const NetworkSink = struct {
|
||||
pub fn abort(this: *@This()) void {
|
||||
this.ended = true;
|
||||
this.done = true;
|
||||
this.signal.close(null);
|
||||
this.signal.close(Syscall.Error.fromCode(.CONNRESET, .close));
|
||||
this.cancel = true;
|
||||
this.finalize();
|
||||
}
|
||||
|
||||
@@ -270,14 +270,14 @@ pub const Mask = struct {
|
||||
|
||||
while (true) {
|
||||
if (image == null) {
|
||||
if (input.tryParse(Image.parse, .{}).asValue()) |value| {
|
||||
if (@call(.auto, @field(Image, "parse"), .{input}).asValue()) |value| {
|
||||
image = value;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (position == null) {
|
||||
if (input.tryParse(Position.parse, .{}).asValue()) |value| {
|
||||
if (Position.parse(input).asValue()) |value| {
|
||||
position = value;
|
||||
size = input.tryParse(struct {
|
||||
pub inline fn parseFn(i: *css.Parser) css.Result(BackgroundSize) {
|
||||
@@ -290,35 +290,35 @@ pub const Mask = struct {
|
||||
}
|
||||
|
||||
if (repeat == null) {
|
||||
if (input.tryParse(BackgroundRepeat.parse, .{}).asValue()) |value| {
|
||||
if (BackgroundRepeat.parse(input).asValue()) |value| {
|
||||
repeat = value;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (origin == null) {
|
||||
if (input.tryParse(GeometryBox.parse, .{}).asValue()) |value| {
|
||||
if (GeometryBox.parse(input).asValue()) |value| {
|
||||
origin = value;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (clip == null) {
|
||||
if (input.tryParse(MaskClip.parse, .{}).asValue()) |value| {
|
||||
if (MaskClip.parse(input).asValue()) |value| {
|
||||
clip = value;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (composite == null) {
|
||||
if (input.tryParse(MaskComposite.parse, .{}).asValue()) |value| {
|
||||
if (MaskComposite.parse(input).asValue()) |value| {
|
||||
composite = value;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (mode == null) {
|
||||
if (input.tryParse(MaskMode.parse, .{}).asValue()) |value| {
|
||||
if (MaskMode.parse(input).asValue()) |value| {
|
||||
mode = value;
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -683,22 +683,29 @@ export function readDirectStream(stream, sink, underlyingSource) {
|
||||
$putByIdDirectPrivate(stream, "underlyingSource", null); // doing this causes isReadableStreamDefaultController to return false
|
||||
$putByIdDirectPrivate(stream, "start", undefined);
|
||||
function close(stream, reason) {
|
||||
const cancelFn = underlyingSource?.cancel;
|
||||
if (cancelFn) {
|
||||
try {
|
||||
var prom = cancelFn.$call(underlyingSource, reason);
|
||||
if ($isPromise(prom)) {
|
||||
$markPromiseAsHandled(prom);
|
||||
}
|
||||
} catch {}
|
||||
|
||||
underlyingSource = undefined;
|
||||
// reason !== undefined means an abort/error (native passes a truthy value for abort).
|
||||
// reason === undefined means normal stream completion.
|
||||
// Only call the user's cancel callback on abort/error, not on normal completion.
|
||||
if (reason !== undefined) {
|
||||
const cancelFn = underlyingSource?.cancel;
|
||||
if (cancelFn) {
|
||||
try {
|
||||
// Pass undefined to the cancel callback to preserve existing abort semantics
|
||||
// (readableStreamFromAsyncIterator relies on cancel(undefined) to call iter.return()
|
||||
// instead of iter.throw() for abort).
|
||||
var prom = cancelFn.$call(underlyingSource, undefined);
|
||||
if ($isPromise(prom)) {
|
||||
$markPromiseAsHandled(prom);
|
||||
}
|
||||
} catch {}
|
||||
}
|
||||
}
|
||||
underlyingSource = undefined;
|
||||
|
||||
if (stream) {
|
||||
$putByIdDirectPrivate(stream, "readableStreamController", undefined);
|
||||
$putByIdDirectPrivate(stream, "reader", undefined);
|
||||
if (reason) {
|
||||
if (reason !== undefined) {
|
||||
$putByIdDirectPrivate(stream, "state", $streamErrored);
|
||||
$putByIdDirectPrivate(stream, "storedError", reason);
|
||||
} else {
|
||||
|
||||
@@ -26,7 +26,6 @@ pub const RedisError = error{
|
||||
UnsupportedProtocol,
|
||||
ConnectionTimeout,
|
||||
IdleTimeout,
|
||||
NestingDepthExceeded,
|
||||
};
|
||||
|
||||
pub fn valkeyErrorToJS(globalObject: *jsc.JSGlobalObject, message: ?[]const u8, err: RedisError) jsc.JSValue {
|
||||
@@ -56,7 +55,6 @@ pub fn valkeyErrorToJS(globalObject: *jsc.JSGlobalObject, message: ?[]const u8,
|
||||
error.InvalidResponseType => .REDIS_INVALID_RESPONSE_TYPE,
|
||||
error.ConnectionTimeout => .REDIS_CONNECTION_TIMEOUT,
|
||||
error.IdleTimeout => .REDIS_IDLE_TIMEOUT,
|
||||
error.NestingDepthExceeded => .REDIS_INVALID_RESPONSE,
|
||||
error.JSError => return globalObject.takeException(error.JSError),
|
||||
error.OutOfMemory => globalObject.throwOutOfMemory() catch return globalObject.takeException(error.JSError),
|
||||
error.JSTerminated => return globalObject.takeException(error.JSTerminated),
|
||||
@@ -422,16 +420,7 @@ pub const ValkeyReader = struct {
|
||||
};
|
||||
}
|
||||
|
||||
/// Maximum allowed nesting depth for RESP aggregate types.
|
||||
/// This limits recursion to prevent excessive stack usage from
|
||||
/// deeply nested responses.
|
||||
const max_nesting_depth = 128;
|
||||
|
||||
pub fn readValue(self: *ValkeyReader, allocator: std.mem.Allocator) RedisError!RESPValue {
|
||||
return self.readValueWithDepth(allocator, 0);
|
||||
}
|
||||
|
||||
fn readValueWithDepth(self: *ValkeyReader, allocator: std.mem.Allocator, depth: usize) RedisError!RESPValue {
|
||||
const type_byte = try self.readByte();
|
||||
|
||||
return switch (RESPType.fromByte(type_byte) orelse return error.InvalidResponseType) {
|
||||
@@ -462,7 +451,6 @@ pub const ValkeyReader = struct {
|
||||
return RESPValue{ .BulkString = owned };
|
||||
},
|
||||
.Array => {
|
||||
if (depth >= max_nesting_depth) return error.NestingDepthExceeded;
|
||||
const len = try self.readInteger();
|
||||
if (len < 0) return RESPValue{ .Array = &[_]RESPValue{} };
|
||||
const array = try allocator.alloc(RESPValue, @as(usize, @intCast(len)));
|
||||
@@ -474,7 +462,7 @@ pub const ValkeyReader = struct {
|
||||
}
|
||||
}
|
||||
while (i < len) : (i += 1) {
|
||||
array[i] = try self.readValueWithDepth(allocator, depth + 1);
|
||||
array[i] = try self.readValue(allocator);
|
||||
}
|
||||
return RESPValue{ .Array = array };
|
||||
},
|
||||
@@ -507,7 +495,6 @@ pub const ValkeyReader = struct {
|
||||
return RESPValue{ .VerbatimString = try self.readVerbatimString(allocator) };
|
||||
},
|
||||
.Map => {
|
||||
if (depth >= max_nesting_depth) return error.NestingDepthExceeded;
|
||||
const len = try self.readInteger();
|
||||
if (len < 0) return error.InvalidMap;
|
||||
|
||||
@@ -521,15 +508,11 @@ pub const ValkeyReader = struct {
|
||||
}
|
||||
|
||||
while (i < len) : (i += 1) {
|
||||
var key = try self.readValueWithDepth(allocator, depth + 1);
|
||||
errdefer key.deinit(allocator);
|
||||
const value = try self.readValueWithDepth(allocator, depth + 1);
|
||||
entries[i] = .{ .key = key, .value = value };
|
||||
entries[i] = .{ .key = try self.readValue(allocator), .value = try self.readValue(allocator) };
|
||||
}
|
||||
return RESPValue{ .Map = entries };
|
||||
},
|
||||
.Set => {
|
||||
if (depth >= max_nesting_depth) return error.NestingDepthExceeded;
|
||||
const len = try self.readInteger();
|
||||
if (len < 0) return error.InvalidSet;
|
||||
|
||||
@@ -542,12 +525,11 @@ pub const ValkeyReader = struct {
|
||||
}
|
||||
}
|
||||
while (i < len) : (i += 1) {
|
||||
set[i] = try self.readValueWithDepth(allocator, depth + 1);
|
||||
set[i] = try self.readValue(allocator);
|
||||
}
|
||||
return RESPValue{ .Set = set };
|
||||
},
|
||||
.Attribute => {
|
||||
if (depth >= max_nesting_depth) return error.NestingDepthExceeded;
|
||||
const len = try self.readInteger();
|
||||
if (len < 0) return error.InvalidAttribute;
|
||||
|
||||
@@ -560,9 +542,9 @@ pub const ValkeyReader = struct {
|
||||
}
|
||||
}
|
||||
while (i < len) : (i += 1) {
|
||||
var key = try self.readValueWithDepth(allocator, depth + 1);
|
||||
var key = try self.readValue(allocator);
|
||||
errdefer key.deinit(allocator);
|
||||
const value = try self.readValueWithDepth(allocator, depth + 1);
|
||||
const value = try self.readValue(allocator);
|
||||
attrs[i] = .{ .key = key, .value = value };
|
||||
}
|
||||
|
||||
@@ -571,7 +553,7 @@ pub const ValkeyReader = struct {
|
||||
errdefer {
|
||||
allocator.destroy(value_ptr);
|
||||
}
|
||||
value_ptr.* = try self.readValueWithDepth(allocator, depth + 1);
|
||||
value_ptr.* = try self.readValue(allocator);
|
||||
|
||||
return RESPValue{ .Attribute = .{
|
||||
.attributes = attrs,
|
||||
@@ -579,13 +561,11 @@ pub const ValkeyReader = struct {
|
||||
} };
|
||||
},
|
||||
.Push => {
|
||||
if (depth >= max_nesting_depth) return error.NestingDepthExceeded;
|
||||
const len = try self.readInteger();
|
||||
if (len < 0 or len == 0) return error.InvalidPush;
|
||||
|
||||
// First element is the push type
|
||||
var push_type = try self.readValueWithDepth(allocator, depth + 1);
|
||||
defer push_type.deinit(allocator);
|
||||
const push_type = try self.readValue(allocator);
|
||||
var push_type_str: []const u8 = "";
|
||||
|
||||
switch (push_type) {
|
||||
@@ -614,7 +594,7 @@ pub const ValkeyReader = struct {
|
||||
}
|
||||
}
|
||||
while (i < len - 1) : (i += 1) {
|
||||
data[i] = try self.readValueWithDepth(allocator, depth + 1);
|
||||
data[i] = try self.readValue(allocator);
|
||||
}
|
||||
|
||||
return RESPValue{ .Push = .{
|
||||
|
||||
@@ -1,44 +0,0 @@
|
||||
import { describe, expect } from "bun:test";
|
||||
import { itBundled } from "../expectBundled";
|
||||
|
||||
describe("css", () => {
|
||||
itBundled("css/mask-geometry-box-preserved", {
|
||||
files: {
|
||||
"index.css": /* css */ `
|
||||
.test-a::after {
|
||||
mask: linear-gradient(#fff 0 0) padding-box, linear-gradient(#fff 0 0);
|
||||
}
|
||||
.test-b::after {
|
||||
mask: linear-gradient(#fff 0 0) content-box, linear-gradient(#fff 0 0);
|
||||
}
|
||||
`,
|
||||
},
|
||||
outdir: "/out",
|
||||
entryPoints: ["/index.css"],
|
||||
onAfterBundle(api) {
|
||||
const output = api.readFile("/out/index.css");
|
||||
expect(output).toContain("padding-box");
|
||||
expect(output).toContain("content-box");
|
||||
expect(output).toContain(".test-a");
|
||||
expect(output).toContain(".test-b");
|
||||
expect(output).not.toContain(".test-a:after, .test-b:after");
|
||||
},
|
||||
});
|
||||
|
||||
itBundled("css/webkit-mask-geometry-box-preserved", {
|
||||
files: {
|
||||
"index.css": /* css */ `
|
||||
.test-c::after {
|
||||
-webkit-mask: linear-gradient(#fff 0 0) padding-box, linear-gradient(#fff 0 0);
|
||||
-webkit-mask-composite: xor;
|
||||
}
|
||||
`,
|
||||
},
|
||||
outdir: "/out",
|
||||
entryPoints: ["/index.css"],
|
||||
onAfterBundle(api) {
|
||||
const output = api.readFile("/out/index.css");
|
||||
expect(output).toContain("padding-box");
|
||||
},
|
||||
});
|
||||
});
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user