Compare commits

...

2 Commits

Author SHA1 Message Date
Claude Bot
0d7441d808 fix(server): always pause socket after onData, only resume in onPull
The previous backpressure implementation only paused the HTTP socket
when data overflowed into ByteStream's internal buffer. However, when
the pipeTo loop was fast enough to always have a pending pull ready,
data went through the "fast path" (directly copied to the pending
buffer) without ever triggering a pause. This meant the socket was
never actually throttled.

Fix by always calling pause() after onData delivers data, even on the
fast path. The socket is only resumed when the consumer's next onPull
arrives (either when setting up a pending read or when the internal
buffer is fully drained). This creates a natural throttle that matches
the consumer's processing speed.

Also rewrite the test to:
- Use process.memoryUsage().rss from within the server (cross-platform)
- Use 512MB payload for clear differentiation between fixed/unfixed
- Remove Linux-only /proc/pid/statm dependency
- Use faster 5ms write delay (down from 500ms) for quicker completion

Co-Authored-By: Claude <noreply@anthropic.com>
2026-02-19 04:28:51 +00:00
Claude Bot
bd7fd33745 fix(server): propagate backpressure from request body stream to HTTP socket
When `Bun.serve()` receives a request body and pipes it to a slow
`WritableStream` via `pipeTo()`, data was buffered without limit because
backpressure from the consumer never propagated back to the HTTP socket.
This caused unbounded memory growth proportional to the request body size.

Add a backpressure callback to ByteStream that pauses/resumes the
underlying HTTP socket via uWebSockets' pause()/resume() API. When the
ByteStream's internal buffer accumulates data (no pending JS read), the
socket is paused. When the JS consumer pulls data, the socket is resumed.

Closes #27104

Co-Authored-By: Claude <noreply@anthropic.com>
2026-02-18 19:34:31 +00:00
3 changed files with 246 additions and 1 deletion

View File

@@ -1372,6 +1372,10 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
fn detachResponse(this: *RequestContext) void {
this.request_body_buf.clearAndFree(bun.default_allocator);
// Clear the backpressure callback before detaching the response,
// so the ByteStream cannot try to pause/resume a freed response.
this.clearRequestBodyBackpressure();
if (this.resp) |resp| {
this.resp = null;
@@ -2395,6 +2399,10 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
bun.default_allocator,
) catch {}; // TODO: properly propagate exception upwards
} else {
// This is the last chunk - clear the backpressure callback
// since the RequestContext may be freed after this point.
readable.ptr.Bytes.backpressure.clear();
var strong = this.request_body_readable_stream_ref;
this.request_body_readable_stream_ref = .{};
defer strong.deinit();
@@ -2523,6 +2531,35 @@ pub fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool,
var this = bun.cast(*RequestContext, ptr);
bun.debugAssert(this.request_body_readable_stream_ref.held.impl == null);
this.request_body_readable_stream_ref = jsc.WebCore.ReadableStream.Strong.init(readable, globalThis);
// Wire up backpressure so ByteStream can pause/resume the HTTP socket
// when the JS consumer is not keeping up with incoming data.
readable.ptr.Bytes.backpressure = .{
.ctx = this,
.onBackpressure = onRequestBodyBackpressure,
};
}
/// Backpressure callback installed on the request body's ByteStream.
/// Pauses or resumes the underlying HTTP socket depending on whether the
/// JS consumer is keeping up with incoming data. No-op once the response
/// has been detached (`this.resp == null`).
fn onRequestBodyBackpressure(ctx: *anyopaque, paused: bool) void {
const this = bun.cast(*RequestContext, ctx);
// Response already detached — there is no socket left to throttle.
const resp = this.resp orelse return;
if (paused) {
ctxLog("backpressure: pausing HTTP socket", .{});
resp.pause();
} else {
ctxLog("backpressure: resuming HTTP socket", .{});
resp.@"resume"();
}
}
/// Clear the backpressure callback on the request body's ByteStream,
/// preventing it from referencing this RequestContext after it's freed.
/// Safe to call when the server or the readable stream is already gone.
fn clearRequestBodyBackpressure(this: *RequestContext) void {
const server = this.server orelse return;
const readable = this.request_body_readable_stream_ref.get(server.globalThis) orelse return;
readable.ptr.Bytes.backpressure.clear();
}
pub fn onStartBufferingCallback(this: *anyopaque) void {

View File

@@ -15,6 +15,31 @@ highWaterMark: Blob.SizeType = 0,
pipe: Pipe = .{},
size_hint: Blob.SizeType = 0,
buffer_action: ?BufferAction = null,
backpressure: Backpressure = .{},
/// Callback interface for propagating backpressure to the data source
/// (e.g., pausing/resuming an HTTP socket when the stream consumer is slow).
pub const Backpressure = struct {
ctx: ?*anyopaque = null,
onBackpressure: ?*const fn (ctx: *anyopaque, paused: bool) void = null,

// Invoke the installed callback with the given paused state.
// Silently does nothing when no context or callback is set.
fn notify(this: *const Backpressure, paused: bool) void {
const ctx = this.ctx orelse return;
const cb = this.onBackpressure orelse return;
cb(ctx, paused);
}

/// Ask the source to stop producing data.
pub fn pause(this: *const Backpressure) void {
this.notify(true);
}

/// Tell the source it may produce data again.
pub fn @"resume"(this: *const Backpressure) void {
this.notify(false);
}

/// Detach the callback so it can no longer be invoked
/// (resets both fields to their null defaults).
pub fn clear(this: *Backpressure) void {
this.* = .{};
}
};
pub const Source = webcore.ReadableStream.NewSource(
@This(),
@@ -201,8 +226,14 @@ pub fn onData(
}
const remaining = chunk[to_copy.len..];
if (remaining.len > 0 and chunk.len > 0)
if (remaining.len > 0 and chunk.len > 0) {
this.append(stream, to_copy.len, chunk, allocator) catch @panic("Out of memory while copying request body");
}
// Pause the source after delivering data. The source will be resumed
// when the consumer's next onPull arrives, which creates a natural
// throttle matching the consumer's processing speed.
this.backpressure.pause();
log("ByteStream.onData pending.run()", .{});
@@ -214,6 +245,10 @@ pub fn onData(
log("ByteStream.onData no action just append", .{});
this.append(stream, 0, chunk, allocator) catch @panic("Out of memory while copying request body");
// No consumer is actively reading (pending state is not pending), so data is
// accumulating in our internal buffer. Signal backpressure to pause the source.
this.backpressure.pause();
}
pub fn append(
@@ -299,8 +334,11 @@ pub fn onPull(this: *@This(), buffer: []u8, view: jsc.JSValue) streams.Result {
if (this.offset + to_write == this.buffer.items.len) {
this.offset = 0;
this.buffer.items.len = 0;
// Buffer fully drained - resume the source so more data can flow in.
this.backpressure.@"resume"();
} else {
this.offset += to_write;
// Buffer still has data - keep the source paused until fully drained.
}
if (this.has_received_last_chunk and remaining_in_buffer.len == 0) {
@@ -329,6 +367,9 @@ pub fn onPull(this: *@This(), buffer: []u8, view: jsc.JSValue) streams.Result {
};
}
// Consumer is waiting for data - ensure source is resumed.
this.backpressure.@"resume"();
this.pending_buffer = buffer;
this.setValue(view);
@@ -343,6 +384,9 @@ pub fn onCancel(this: *@This()) void {
if (this.buffer.capacity > 0) this.buffer.clearAndFree();
this.done = true;
this.pending_value.deinit();
// Resume the source before clearing the callback so the socket doesn't stay paused.
this.backpressure.@"resume"();
this.backpressure.clear();
if (view != .zero) {
this.pending_buffer = &.{};
@@ -367,6 +411,10 @@ pub fn deinit(this: *@This()) void {
jsc.markBinding(@src());
if (this.buffer.capacity > 0) this.buffer.clearAndFree();
// Resume the source before tearing down so the socket doesn't stay paused.
this.backpressure.@"resume"();
this.backpressure.clear();
this.pending_value.deinit();
if (!this.done) {
this.done = true;

View File

@@ -0,0 +1,160 @@
import { expect, test } from "bun:test";
import { bunEnv, bunExe, tempDir } from "harness";
// Test that Bun.serve() propagates backpressure from a slow WritableStream
// consumer back to the HTTP socket, preventing unbounded memory growth.
// Uses separate processes to ensure independent sender/server event loops.
// The server tracks peak RSS from within the process (cross-platform).
test("Bun.serve request body backpressure limits memory growth", async () => {
// Use 512MB payload to clearly distinguish backpressure from no-backpressure.
// With backpressure: RSS bounded by kernel TCP buffers (~10MB) + stream overhead.
// Without: server buffers the full payload.
const PAYLOAD_SIZE = 512 * 1024 * 1024;
const PAYLOAD_MB = PAYLOAD_SIZE / (1024 * 1024);
// Both programs are written to a temp dir and run as separate bun processes;
// the template-literal bodies below ARE the files' source code.
using dir = tempDir("backpressure-test", {
"server.ts": `
let initialRss = 0;
let peakRss = 0;
function trackMemory() {
const rss = process.memoryUsage().rss;
if (rss > peakRss) peakRss = rss;
}
const server = Bun.serve({
hostname: "127.0.0.1",
port: 0,
idleTimeout: 255,
async fetch(req) {
initialRss = process.memoryUsage().rss;
peakRss = initialRss;
let totalReceived = 0;
const memInterval = setInterval(trackMemory, 25);
const writeStream = new WritableStream({
async write(value) {
totalReceived += value.length;
trackMemory();
// Simulate a slow consumer - 5ms per chunk is enough to create
// backpressure with a fast sender.
await new Promise(res => setTimeout(res, 5));
trackMemory();
},
}, { highWaterMark: 1 });
await req.body!.pipeTo(writeStream).catch((e) => {
console.error("pipeTo error:", e);
});
clearInterval(memInterval);
trackMemory();
const rssGrowthMB = (peakRss - initialRss) / (1024 * 1024);
return new Response(JSON.stringify({
totalReceived,
rssGrowthMB: +rssGrowthMB.toFixed(1),
}));
},
});
console.log("READY:" + server.port);
`,
"sender.ts": `
const port = process.argv[2];
const PAYLOAD_SIZE = ${PAYLOAD_SIZE};
const chunkSize = 256 * 1024; // 256KB chunks for fast sending
const totalChunks = PAYLOAD_SIZE / chunkSize;
let chunksSent = 0;
const body = new ReadableStream({
pull(controller) {
if (chunksSent >= totalChunks) {
controller.close();
return;
}
controller.enqueue(new Uint8Array(chunkSize));
chunksSent++;
},
});
const response = await fetch("http://127.0.0.1:" + port, {
method: "POST",
body,
// @ts-ignore
duplex: "half",
});
const result = await response.json();
console.log("RESULT:" + JSON.stringify(result));
`,
});
// Start server
await using serverProc = Bun.spawn({
cmd: [bunExe(), "run", "server.ts"],
cwd: String(dir),
env: bunEnv,
stdout: "pipe",
stderr: "pipe",
});
// Wait for server to be ready and get port.
// The server picks an ephemeral port (port: 0) and announces it on stdout
// as "READY:<port>", so we accumulate stdout until that marker appears.
const reader = serverProc.stdout.getReader();
let port: string = "";
let accumulated = "";
while (true) {
const { value, done } = await reader.read();
if (done) break;
// The marker is pure ASCII, so per-chunk decoding cannot split a character.
accumulated += new TextDecoder().decode(value);
const match = accumulated.match(/READY:(\d+)/);
if (match) {
port = match[1];
break;
}
}
reader.releaseLock();
// Fail fast with a clear assertion if the server never became ready.
expect(port).not.toBe("");
// Start sender
await using senderProc = Bun.spawn({
cmd: [bunExe(), "run", "sender.ts", port],
cwd: String(dir),
env: bunEnv,
stdout: "pipe",
stderr: "pipe",
});
// Collect the sender's full output and exit code before asserting anything,
// so a failure report includes its stderr.
const [senderStdout, senderStderr, senderExitCode] = await Promise.all([
senderProc.stdout.text(),
senderProc.stderr.text(),
senderProc.exited,
]);
if (senderStderr) console.log("sender stderr:", senderStderr);
// Verify the sender got a successful response
expect(senderStdout).toContain("RESULT:");
const resultMatch = senderStdout.match(/RESULT:(.+)/);
// Safe to assert non-null: the toContain() above guarantees a match.
const result = JSON.parse(resultMatch![1]);
console.log(`Server RSS growth: ${result.rssGrowthMB}MB (payload: ${PAYLOAD_MB}MB)`);
// The server must have received every byte; backpressure must not drop data.
expect(result.totalReceived).toBe(PAYLOAD_SIZE);
// With backpressure: RSS growth bounded regardless of payload size (~80-120MB
// from kernel TCP buffers, stream overhead, and GC not immediately releasing pages).
// Without backpressure: RSS growth approaches or exceeds the payload size (512MB+).
// Threshold of 200MB catches the no-backpressure case while accommodating the
// fixed overhead from TCP buffers + debug build + CI variability.
expect(result.rssGrowthMB).toBeLessThan(200);
expect(senderExitCode).toBe(0);
// Clean up server
serverProc.kill();
// Generous 120s timeout: 512MB through a deliberately throttled consumer
// can take a while, especially on slow CI machines.
}, 120_000);