From 604c83c8a6bbbfc8732b8ce7f4e0d2824d694ef6 Mon Sep 17 00:00:00 2001
From: robobun
Date: Mon, 29 Dec 2025 20:02:18 -0800
Subject: [PATCH] =?UTF-8?q?perf(ipc):=20fix=20O(n=C2=B2)=20JSON=20scanning?=
 =?UTF-8?q?=20for=20large=20chunked=20messages=20(#25743)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## Summary

- Fix an O(n²) performance bug in JSON-mode IPC when receiving large messages that arrive in chunks
- Add a `JSONLineBuffer` (selected via a new `IncomingBuffer` union) that tracks newline positions to avoid re-scanning
- Each byte is now scanned exactly once (on arrival, or when the preceding message is consumed)

## Problem

When data arrives in chunks in JSON mode, `decodeIPCMessage` was calling `indexOfChar(data, '\n')` on the ENTIRE accumulated buffer every time. For a 10MB message arriving in 160 chunks of 64KB:

- Chunk 1: scan 64KB
- Chunk 2: scan 128KB
- Chunk 3: scan 192KB
- ...
- Chunk 160: scan 10MB

Total: ~800MB scanned for one 10MB message.

## Solution

Introduced a `JSONLineBuffer` struct that:

1. Tracks `newline_pos: ?u32` - the position of a known upcoming newline (if any)
2. On `append(bytes)`: only scans the new chunk for `\n`, and only if no position is already cached
3. On `consume(bytes)`: updates or re-scans as needed after a message is processed

This ensures O(n) scanning instead of O(n²). A standalone sketch of the scanning strategy follows the benchmark diff below.

## Test plan

- [x] `bun run zig:check-all` passes (all platforms compile)
- [x] `bun bd test test/js/bun/spawn/spawn.ipc.test.ts` - 4 tests pass
- [x] `bun bd test test/js/node/child_process/child_process_ipc.test.js` - 1 test passes
- [x] `bun bd test test/js/bun/spawn/bun-ipc-inherit.test.ts` - 1 test passes
- [x] `bun bd test test/js/bun/spawn/spawn.ipc.bun-node.test.ts` - 1 test passes
- [x] `bun bd test test/js/bun/spawn/spawn.ipc.node-bun.test.ts` - 1 test passes
- [x] `bun bd test test/js/node/child_process/child_process_ipc_large_disconnect.test.js` - 1 test passes
- [x] Manual verification with `child-process-send-cb-more.js` (32KB messages)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Bot
Co-authored-by: Claude
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Jarred Sumner
---
 bench/snippets/ipc-json-child.mjs |   4 +
 bench/snippets/ipc-json.mjs       |  45 ++++
 src/bun.js/JSONLineBuffer.zig     | 135 ++++++++++
 src/bun.js/ipc.zig                | 427 +++++++++++++++++++-----------
 4 files changed, 456 insertions(+), 155 deletions(-)
 create mode 100644 bench/snippets/ipc-json-child.mjs
 create mode 100644 bench/snippets/ipc-json.mjs
 create mode 100644 src/bun.js/JSONLineBuffer.zig

diff --git a/bench/snippets/ipc-json-child.mjs b/bench/snippets/ipc-json-child.mjs
new file mode 100644
index 0000000000..4807baa14c
--- /dev/null
+++ b/bench/snippets/ipc-json-child.mjs
@@ -0,0 +1,4 @@
+// Child process for IPC benchmarks - echoes messages back to parent
+process.on("message", message => {
+  process.send(message);
+});
diff --git a/bench/snippets/ipc-json.mjs b/bench/snippets/ipc-json.mjs
new file mode 100644
index 0000000000..2daa1cd59e
--- /dev/null
+++ b/bench/snippets/ipc-json.mjs
@@ -0,0 +1,45 @@
+import { fork } from "node:child_process";
+import path from "node:path";
+import { fileURLToPath } from "node:url";
+import { bench, run } from "../runner.mjs";
+
+const __dirname = path.dirname(fileURLToPath(import.meta.url));
+const childPath = path.join(__dirname, "ipc-json-child.mjs");
+
+const smallMessage = { type: "ping", id: 1 };
+const largeString = Buffer.alloc(10 * 1024 * 1024, "A").toString();
+const largeMessage = { type: "ping", id: 1, data: largeString };
+
+async function runBenchmark(message, count) {
+  let received = 0;
+  const { promise, resolve } = Promise.withResolvers();
+
+  const child = fork(childPath, [], {
+    stdio: ["ignore", "ignore", "ignore", "ipc"],
+    serialization: "json",
+  });
+
+  child.on("message", () => {
+    received++;
+    if (received >= count) {
+      resolve();
+    }
+  });
+
+  for (let i = 0; i < count; i++) {
+    child.send(message);
+  }
+
+  await promise;
+  child.kill();
+}
+
+bench("ipc json - small messages (1000 roundtrips)", async () => {
+  await runBenchmark(smallMessage, 1000);
+});
+
+bench("ipc json - 10MB messages (10 roundtrips)", async () => {
+  await runBenchmark(largeMessage, 10);
+});
+
+await run();
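As promised in the solution section, here is a standalone sketch of the scanning strategy. It is not part of the patch: `LineScanner` is a hypothetical, simplified stand-in for `JSONLineBuffer`, built on a managed `std.ArrayList` (Zig 0.14-era APIs), but it shows the key invariant - `scanned_pos` only moves forward, so each appended byte is examined for `\n` exactly once:

```zig
const std = @import("std");

/// Hypothetical, simplified stand-in for JSONLineBuffer (illustration only).
const LineScanner = struct {
    data: std.ArrayList(u8),
    scanned_pos: usize = 0, // bytes before this index have already been checked
    newline_pos: ?usize = null, // first known newline, if any

    fn append(self: *LineScanner, bytes: []const u8) !void {
        try self.data.appendSlice(bytes);
        if (self.newline_pos != null) return; // delimiter already cached
        // Scan only the unscanned tail, never the whole accumulated buffer.
        if (std.mem.indexOfScalarPos(u8, self.data.items, self.scanned_pos, '\n')) |idx| {
            self.newline_pos = idx;
            self.scanned_pos = idx + 1;
        } else {
            self.scanned_pos = self.data.items.len;
        }
    }
};

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    var buf = LineScanner{ .data = std.ArrayList(u8).init(gpa.allocator()) };
    defer buf.data.deinit();

    // 160 chunks of 64 KiB with no '\n': rescanning from the start on every
    // chunk would touch ~800 MB in total; this version touches each byte once.
    const chunk = [_]u8{'A'} ** (64 * 1024);
    for (0..160) |_| try buf.append(&chunk);
    try buf.append("\n");
    std.debug.print("newline at index {?d}\n", .{buf.newline_pos});
}
```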
"ping", id: 1, data: largeString }; + +async function runBenchmark(message, count) { + let received = 0; + const { promise, resolve } = Promise.withResolvers(); + + const child = fork(childPath, [], { + stdio: ["ignore", "ignore", "ignore", "ipc"], + serialization: "json", + }); + + child.on("message", () => { + received++; + if (received >= count) { + resolve(); + } + }); + + for (let i = 0; i < count; i++) { + child.send(message); + } + + await promise; + child.kill(); +} + +bench("ipc json - small messages (1000 roundtrips)", async () => { + await runBenchmark(smallMessage, 1000); +}); + +bench("ipc json - 10MB messages (10 roundtrips)", async () => { + await runBenchmark(largeMessage, 10); +}); + +await run(); diff --git a/src/bun.js/JSONLineBuffer.zig b/src/bun.js/JSONLineBuffer.zig new file mode 100644 index 0000000000..b4b2833005 --- /dev/null +++ b/src/bun.js/JSONLineBuffer.zig @@ -0,0 +1,135 @@ +/// Buffer for newline-delimited data that tracks scan positions to avoid O(n²) scanning. +/// Each byte is scanned exactly once. We track: +/// - newline_pos: position of first known newline (if any) +/// - scanned_pos: how far we've scanned (bytes before this have been checked) +/// - head: offset into the buffer where unconsumed data starts (avoids copying on each consume) +/// +/// When data arrives, we only scan the NEW bytes. +/// When we consume a message, we just advance `head` instead of copying. +/// Compaction only happens when head exceeds a threshold. +pub const JSONLineBuffer = struct { + data: bun.ByteList = .{}, + /// Offset into data where unconsumed content starts. + head: u32 = 0, + /// Position of a known upcoming newline relative to head, if any. + newline_pos: ?u32 = null, + /// How far we've scanned for newlines relative to head. + scanned_pos: u32 = 0, + + /// Compact the buffer when head exceeds this threshold. + const compaction_threshold = 16 * 1024 * 1024; // 16 MB + + /// Get the active (unconsumed) portion of the buffer. + fn activeSlice(self: *const @This()) []const u8 { + return self.data.slice()[self.head..]; + } + + /// Scan for newline in unscanned portion of the buffer. + fn scanForNewline(self: *@This()) void { + if (self.newline_pos != null) return; + const slice = self.activeSlice(); + if (self.scanned_pos >= slice.len) return; + + const unscanned = slice[self.scanned_pos..]; + if (bun.strings.indexOfChar(unscanned, '\n')) |local_idx| { + bun.debugAssert(local_idx <= std.math.maxInt(u32)); + const pos = self.scanned_pos +| @as(u32, @intCast(local_idx)); + self.newline_pos = pos; + self.scanned_pos = pos +| 1; // Only scanned up to (and including) the newline + } else { + bun.debugAssert(slice.len <= std.math.maxInt(u32)); + self.scanned_pos = @intCast(slice.len); // No newline, scanned everything + } + } + + /// Compact the buffer by moving data to the front. Called when head exceeds threshold. + fn compact(self: *@This()) void { + if (self.head == 0) return; + const slice = self.activeSlice(); + bun.copy(u8, self.data.ptr[0..slice.len], slice); + bun.debugAssert(slice.len <= std.math.maxInt(u32)); + self.data.len = @intCast(slice.len); + self.head = 0; + } + + /// Append bytes to the buffer, scanning only new data for newline. + pub fn append(self: *@This(), bytes: []const u8) void { + _ = bun.handleOom(self.data.write(bun.default_allocator, bytes)); + self.scanForNewline(); + } + + /// Returns the next complete message (up to and including newline) if available. 
diff --git a/src/bun.js/ipc.zig b/src/bun.js/ipc.zig
index fa00ec05a5..2cfe41a4da 100644
--- a/src/bun.js/ipc.zig
+++ b/src/bun.js/ipc.zig
@@ -1,5 +1,28 @@
 pub const log = Output.scoped(.IPC, .visible);
 
+/// Union type that switches between simple ByteList (for advanced mode)
+/// and JSONLineBuffer (for JSON mode with optimized newline tracking).
+const IncomingBuffer = union(enum) {
+    /// For advanced mode - uses length-prefix, no scanning needed
+    advanced: bun.ByteList,
+    /// For JSON mode - tracks newline positions to avoid O(n²) scanning
+    json: JSONLineBuffer,
+
+    pub fn init(mode: Mode) IncomingBuffer {
+        return switch (mode) {
+            .advanced => .{ .advanced = .{} },
+            .json => .{ .json = .{} },
+        };
+    }
+
+    pub fn deinit(self: *@This()) void {
+        switch (self.*) {
+            .advanced => |*b| b.deinit(bun.default_allocator),
+            .json => |*b| b.deinit(),
+        }
+    }
+};
+
 const IsInternal = enum { internal, external };
 const SerializeAndSendResult = enum {
     success,
@@ -162,69 +185,73 @@ const json = struct {
     // 2 is internal
     // ["[{\d\.] is regular
-    pub fn decodeIPCMessage(data: []const u8, globalThis: *jsc.JSGlobalObject) IPCDecodeError!DecodeIPCMessageResult {
+    pub fn decodeIPCMessage(data: []const u8, globalThis: *jsc.JSGlobalObject, known_newline: ?u32) IPCDecodeError!DecodeIPCMessageResult {
         // { "foo": "bar"}
         // tag is 1 or 2
-        if (bun.strings.indexOfChar(data, '\n')) |idx| {
-            var json_data = data[0..idx];
-            // bounds-check for the following json_data[0]
-            // TODO: should we return NotEnoughBytes?
-            if (json_data.len == 0) return error.InvalidFormat;
+        const idx: u32 = known_newline orelse idx: {
+            const found = bun.strings.indexOfChar(data, '\n') orelse
+                return IPCDecodeError.NotEnoughBytes;
+            // Individual IPC messages should not exceed 4GB, and idx+1 must not overflow
+            if (found >= std.math.maxInt(u32)) return IPCDecodeError.InvalidFormat;
+            break :idx @intCast(found);
+        };
 
-            var kind: enum { regular, internal } = .regular;
-            if (json_data[0] == 2) {
-                // internal message
-                json_data = json_data[1..];
-                kind = .internal;
-            }
+        var json_data = data[0..idx];
+        // An empty payload (newline with no preceding data) is invalid JSON.
+        if (json_data.len == 0) return error.InvalidFormat;
 
-            const is_ascii = bun.strings.isAllASCII(json_data);
-            var was_ascii_string_freed = false;
-
-            // Use ExternalString to avoid copying data if possible.
-            // This is only possible for ascii data, as that fits into latin1
-            // otherwise we have to convert it utf-8 into utf16-le.
-            var str = if (is_ascii) ascii: {
-
-                // .dead if `json_data` exceeds max length
-                const s = bun.String.createExternal(*bool, json_data, true, &was_ascii_string_freed, jsonIPCDataStringFreeCB);
-                if (s.tag == .Dead) {
-                    @branchHint(.unlikely);
-                    return IPCDecodeError.OutOfMemory;
-                }
-                break :ascii s;
-            } else bun.String.borrowUTF8(json_data);
-
-            defer {
-                str.deref();
-                if (is_ascii and !was_ascii_string_freed) {
-                    @panic("Expected ascii string to be freed by ExternalString, but it wasn't. This is a bug in Bun.");
-                }
-            }
-
-            const deserialized = str.toJSByParseJSON(globalThis) catch |e| switch (e) {
-                error.JSError => {
-                    globalThis.clearException();
-                    return IPCDecodeError.InvalidFormat;
-                },
-                error.JSTerminated => {
-                    globalThis.clearException();
-                    return IPCDecodeError.InvalidFormat;
-                },
-                error.OutOfMemory => return bun.outOfMemory(),
-            };
-
-            return switch (kind) {
-                .regular => .{
-                    .bytes_consumed = idx + 1,
-                    .message = .{ .data = deserialized },
-                },
-                .internal => .{
-                    .bytes_consumed = idx + 1,
-                    .message = .{ .internal = deserialized },
-                },
-            };
+        var kind: enum { regular, internal } = .regular;
+        if (json_data[0] == 2) {
+            // internal message
+            json_data = json_data[1..];
+            kind = .internal;
         }
-
-        return IPCDecodeError.NotEnoughBytes;
+
+        const is_ascii = bun.strings.isAllASCII(json_data);
+        var was_ascii_string_freed = false;
+
+        // Use ExternalString to avoid copying data if possible.
+        // This is only possible for ascii data, as that fits into latin1
+        // otherwise we have to convert it utf-8 into utf16-le.
+        var str = if (is_ascii) ascii: {
+
+            // .dead if `json_data` exceeds max length
+            const s = bun.String.createExternal(*bool, json_data, true, &was_ascii_string_freed, jsonIPCDataStringFreeCB);
+            if (s.tag == .Dead) {
+                @branchHint(.unlikely);
+                return IPCDecodeError.OutOfMemory;
+            }
+            break :ascii s;
+        } else bun.String.borrowUTF8(json_data);
+
+        defer {
+            str.deref();
+            if (is_ascii and !was_ascii_string_freed) {
+                @panic("Expected ascii string to be freed by ExternalString, but it wasn't. This is a bug in Bun.");
This is a bug in Bun."); + } + } + + const deserialized = str.toJSByParseJSON(globalThis) catch |e| switch (e) { + error.JSError => { + globalThis.clearException(); + return IPCDecodeError.InvalidFormat; + }, + error.JSTerminated => { + globalThis.clearException(); + return IPCDecodeError.InvalidFormat; + }, + error.OutOfMemory => return bun.outOfMemory(), + }; + + return switch (kind) { + .regular => .{ + .bytes_consumed = @intCast(idx + 1), + .message = .{ .data = deserialized }, + }, + .internal => .{ + .bytes_consumed = @intCast(idx + 1), + .message = .{ .internal = deserialized }, + }, + }; } pub fn serialize(writer: *bun.io.StreamBuffer, global: *jsc.JSGlobalObject, value: JSValue, is_internal: IsInternal) !usize { @@ -258,9 +285,11 @@ const json = struct { }; /// Given potentially unfinished buffer `data`, attempt to decode and process a message from it. -pub fn decodeIPCMessage(mode: Mode, data: []const u8, global: *jsc.JSGlobalObject) IPCDecodeError!DecodeIPCMessageResult { +/// For JSON mode, `known_newline` can be provided to avoid re-scanning for the newline delimiter. +pub fn decodeIPCMessage(mode: Mode, data: []const u8, global: *jsc.JSGlobalObject, known_newline: ?u32) IPCDecodeError!DecodeIPCMessageResult { return switch (mode) { - inline else => |t| @field(@This(), @tagName(t)).decodeIPCMessage(data, global), + .advanced => advanced.decodeIPCMessage(data, global), + .json => json.decodeIPCMessage(data, global, known_newline), }; } @@ -420,7 +449,7 @@ pub const SendQueue = struct { has_written_version: if (Environment.allow_assert) u1 else u0 = 0, mode: Mode, internal_msg_queue: node_cluster_binding.InternalMsgHolder = .{}, - incoming: bun.ByteList = .{}, // Maybe we should use StreamBuffer here as well + incoming: IncomingBuffer, incoming_fd: ?bun.FileDescriptor = null, socket: SocketUnion, @@ -455,7 +484,13 @@ pub const SendQueue = struct { pub fn init(mode: Mode, owner: SendQueueOwner, socket: SocketUnion) @This() { log("SendQueue#init", .{}); - return .{ .queue = .init(bun.default_allocator), .mode = mode, .owner = owner, .socket = socket }; + return .{ + .queue = .init(bun.default_allocator), + .mode = mode, + .owner = owner, + .socket = socket, + .incoming = IncomingBuffer.init(mode), + }; } pub fn deinit(self: *@This()) void { log("SendQueue#deinit", .{}); @@ -465,7 +500,7 @@ pub const SendQueue = struct { for (self.queue.items) |*item| item.deinit(); self.queue.deinit(); self.internal_msg_queue.deinit(); - self.incoming.deinit(bun.default_allocator); + self.incoming.deinit(); if (self.waiting_for_ack) |*waiting| waiting.deinit(); // if there is a close next tick task, cancel it so it doesn't get called and then UAF @@ -1136,67 +1171,99 @@ fn onData2(send_queue: *SendQueue, all_data: []const u8) void { // Decode the message with just the temporary buffer, and if that // fails (not enough bytes) then we allocate to .ipc_buffer - if (send_queue.incoming.len == 0) { - while (true) { - const result = decodeIPCMessage(send_queue.mode, data, globalThis) catch |e| switch (e) { - error.NotEnoughBytes => { - _ = bun.handleOom(send_queue.incoming.write(bun.default_allocator, data)); - log("hit NotEnoughBytes", .{}); - return; - }, - error.InvalidFormat, error.JSError, error.JSTerminated => { - send_queue.closeSocket(.failure, .user); - return; - }, - error.OutOfMemory => { - Output.printErrorln("IPC message is too long.", .{}); - send_queue.closeSocket(.failure, .user); - return; - }, - }; + switch (send_queue.incoming) { + .json => |*json_buf| { + // JSON mode: append to 
+            // then process complete messages using next().
+            json_buf.append(data);
 
-            handleIPCMessage(send_queue, result.message, globalThis);
+            while (json_buf.next()) |msg| {
+                const result = decodeIPCMessage(.json, msg.data, globalThis, msg.newline_pos) catch |e| switch (e) {
+                    error.NotEnoughBytes => {
+                        log("hit NotEnoughBytes", .{});
+                        return;
+                    },
+                    error.InvalidFormat, error.JSError, error.JSTerminated => {
+                        send_queue.closeSocket(.failure, .user);
+                        return;
+                    },
+                    error.OutOfMemory => {
+                        Output.printErrorln("IPC message is too long.", .{});
+                        send_queue.closeSocket(.failure, .user);
+                        return;
+                    },
+                };
 
-            if (result.bytes_consumed < data.len) {
-                data = data[result.bytes_consumed..];
-            } else {
-                return;
+                handleIPCMessage(send_queue, result.message, globalThis);
+                json_buf.consume(result.bytes_consumed);
             }
-        }
-    }
+        },
+        .advanced => |*adv_buf| {
+            // Advanced mode: uses length-prefix, no newline scanning needed.
+            // Try to decode directly first, only buffer if needed.
+            if (adv_buf.len == 0) {
+                while (true) {
+                    const result = decodeIPCMessage(.advanced, data, globalThis, null) catch |e| switch (e) {
+                        error.NotEnoughBytes => {
+                            _ = bun.handleOom(adv_buf.write(bun.default_allocator, data));
+                            log("hit NotEnoughBytes", .{});
+                            return;
+                        },
+                        error.InvalidFormat, error.JSError, error.JSTerminated => {
+                            send_queue.closeSocket(.failure, .user);
+                            return;
+                        },
+                        error.OutOfMemory => {
+                            Output.printErrorln("IPC message is too long.", .{});
+                            send_queue.closeSocket(.failure, .user);
+                            return;
+                        },
+                    };
 
-    _ = bun.handleOom(send_queue.incoming.write(bun.default_allocator, data));
+                    handleIPCMessage(send_queue, result.message, globalThis);
 
-    var slice = send_queue.incoming.slice();
-    while (true) {
-        const result = decodeIPCMessage(send_queue.mode, slice, globalThis) catch |e| switch (e) {
-            error.NotEnoughBytes => {
-                // copy the remaining bytes to the start of the buffer
-                bun.copy(u8, send_queue.incoming.ptr[0..slice.len], slice);
-                send_queue.incoming.len = @truncate(slice.len);
-                log("hit NotEnoughBytes2", .{});
-                return;
-            },
-            error.InvalidFormat, error.JSError, error.JSTerminated => {
-                send_queue.closeSocket(.failure, .user);
-                return;
-            },
-            error.OutOfMemory => {
-                Output.printErrorln("IPC message is too long.", .{});
-                send_queue.closeSocket(.failure, .user);
-                return;
-            },
-        };
+                    if (result.bytes_consumed < data.len) {
+                        data = data[result.bytes_consumed..];
+                    } else {
+                        return;
+                    }
+                }
+            }
 
-        handleIPCMessage(send_queue, result.message, globalThis);
+            // Buffer has existing data, append and process
+            _ = bun.handleOom(adv_buf.write(bun.default_allocator, data));
+            var slice = adv_buf.slice();
+            while (true) {
+                const result = decodeIPCMessage(.advanced, slice, globalThis, null) catch |e| switch (e) {
+                    error.NotEnoughBytes => {
+                        // copy the remaining bytes to the start of the buffer
+                        bun.copy(u8, adv_buf.ptr[0..slice.len], slice);
+                        bun.debugAssert(slice.len <= std.math.maxInt(u32));
+                        adv_buf.len = @intCast(slice.len);
+                        log("hit NotEnoughBytes2", .{});
+                        return;
+                    },
+                    error.InvalidFormat, error.JSError, error.JSTerminated => {
+                        send_queue.closeSocket(.failure, .user);
+                        return;
+                    },
+                    error.OutOfMemory => {
+                        Output.printErrorln("IPC message is too long.", .{});
+                        send_queue.closeSocket(.failure, .user);
+                        return;
+                    },
+                };
 
-        if (result.bytes_consumed < slice.len) {
-            slice = slice[result.bytes_consumed..];
-        } else {
-            // clear the buffer
-            send_queue.incoming.len = 0;
-            return;
-        }
+                handleIPCMessage(send_queue, result.message, globalThis);
+
+                if (result.bytes_consumed < slice.len) {
+                    slice = slice[result.bytes_consumed..];
+                } else {
+                    adv_buf.len = 0;
+                    return;
+                }
+            }
+        },
     }
 }
@@ -1303,13 +1370,26 @@ pub const IPCHandlers = struct {
 
     pub const WindowsNamedPipe = struct {
         fn onReadAlloc(send_queue: *SendQueue, suggested_size: usize) []u8 {
-            var available = send_queue.incoming.unusedCapacitySlice();
-            if (available.len < suggested_size) {
-                bun.handleOom(send_queue.incoming.ensureUnusedCapacity(bun.default_allocator, suggested_size));
-                available = send_queue.incoming.unusedCapacitySlice();
+            switch (send_queue.incoming) {
+                .json => |*json_buf| {
+                    var available = json_buf.unusedCapacitySlice();
+                    if (available.len < suggested_size) {
+                        json_buf.ensureUnusedCapacity(suggested_size);
+                        available = json_buf.unusedCapacitySlice();
+                    }
+                    log("NewNamedPipeIPCHandler#onReadAlloc {d}", .{suggested_size});
+                    return available.ptr[0..suggested_size];
+                },
+                .advanced => |*adv_buf| {
+                    var available = adv_buf.unusedCapacitySlice();
+                    if (available.len < suggested_size) {
+                        bun.handleOom(adv_buf.ensureUnusedCapacity(bun.default_allocator, suggested_size));
+                        available = adv_buf.unusedCapacitySlice();
+                    }
+                    log("NewNamedPipeIPCHandler#onReadAlloc {d}", .{suggested_size});
+                    return available.ptr[0..suggested_size];
+                },
             }
-            log("NewNamedPipeIPCHandler#onReadAlloc {d}", .{suggested_size});
-            return available.ptr[0..suggested_size];
         }
 
         fn onReadError(send_queue: *SendQueue, err: bun.sys.E) void {
@@ -1323,41 +1403,77 @@
             const loop = globalThis.bunVM().eventLoop();
             loop.enter();
             defer loop.exit();
-            send_queue.incoming.len += @as(u32, @truncate(buffer.len));
-            var slice = send_queue.incoming.slice();
-
-            bun.assert(send_queue.incoming.len <= send_queue.incoming.cap);
-            bun.assert(bun.isSliceInBuffer(buffer, send_queue.incoming.allocatedSlice()));
-
-            while (true) {
-                const result = decodeIPCMessage(send_queue.mode, slice, globalThis) catch |e| switch (e) {
-                    error.NotEnoughBytes => {
-                        // copy the remaining bytes to the start of the buffer
-                        bun.copy(u8, send_queue.incoming.ptr[0..slice.len], slice);
-                        send_queue.incoming.len = @truncate(slice.len);
-                        log("hit NotEnoughBytes3", .{});
-                        return;
-                    },
-                    error.InvalidFormat, error.JSError, error.JSTerminated => {
-                        send_queue.closeSocket(.failure, .user);
-                        return;
-                    },
-                    error.OutOfMemory => {
-                        Output.printErrorln("IPC message is too long.", .{});
-                        send_queue.closeSocket(.failure, .user);
-                        return;
-                    },
-                };
-
-                handleIPCMessage(send_queue, result.message, globalThis);
-
-                if (result.bytes_consumed < slice.len) {
-                    slice = slice[result.bytes_consumed..];
-                } else {
-                    // clear the buffer
-                    send_queue.incoming.len = 0;
-                    return;
-                }
-            }
+            switch (send_queue.incoming) {
+                .json => |*json_buf| {
+                    // For JSON mode on Windows, use notifyWritten to update length and scan for newlines
+                    bun.assert(json_buf.data.len + buffer.len <= json_buf.data.cap);
+                    bun.assert(bun.isSliceInBuffer(buffer, json_buf.data.allocatedSlice()));
+
+                    json_buf.notifyWritten(buffer);
+
+                    // Process complete messages using next() - avoids O(n²) re-scanning
+                    while (json_buf.next()) |msg| {
+                        const result = decodeIPCMessage(.json, msg.data, globalThis, msg.newline_pos) catch |e| switch (e) {
+                            error.NotEnoughBytes => {
+                                log("hit NotEnoughBytes3", .{});
+                                return;
+                            },
+                            error.InvalidFormat, error.JSError, error.JSTerminated => {
+                                send_queue.closeSocket(.failure, .user);
+                                return;
+                            },
+                            error.OutOfMemory => {
+                                Output.printErrorln("IPC message is too long.", .{});
+                                send_queue.closeSocket(.failure, .user);
+                                return;
+                            },
+                        };
+
+                        handleIPCMessage(send_queue, result.message, globalThis);
+                        json_buf.consume(result.bytes_consumed);
+                    }
+                },
+                .advanced => |*adv_buf| {
+                    adv_buf.len +|= @as(u32, @intCast(buffer.len));
+                    var slice = adv_buf.slice();
+
+                    bun.assert(adv_buf.len <= adv_buf.cap);
+                    bun.assert(bun.isSliceInBuffer(buffer, adv_buf.allocatedSlice()));
+
+                    while (true) {
+                        const result = decodeIPCMessage(.advanced, slice, globalThis, null) catch |e| switch (e) {
+                            error.NotEnoughBytes => {
+                                // copy the remaining bytes to the start of the buffer
+                                bun.copy(u8, adv_buf.ptr[0..slice.len], slice);
+                                // slice.len is guaranteed <= adv_buf.len (u32) since it's derived from adv_buf.slice()
+                                bun.debugAssert(slice.len <= std.math.maxInt(u32));
+                                adv_buf.len = @intCast(slice.len);
+                                log("hit NotEnoughBytes3", .{});
+                                return;
+                            },
+                            error.InvalidFormat, error.JSError, error.JSTerminated => {
+                                send_queue.closeSocket(.failure, .user);
+                                return;
+                            },
+                            error.OutOfMemory => {
+                                Output.printErrorln("IPC message is too long.", .{});
+                                send_queue.closeSocket(.failure, .user);
+                                return;
+                            },
+                        };
+
+                        handleIPCMessage(send_queue, result.message, globalThis);
+
+                        if (result.bytes_consumed < slice.len) {
+                            slice = slice[result.bytes_consumed..];
+                        } else {
+                            // clear the buffer
+                            adv_buf.len = 0;
+                            return;
+                        }
+                    }
+                },
+            }
         }
 
@@ -1380,6 +1496,7 @@ const string = []const u8;
 
 const node_cluster_binding = @import("./node/node_cluster_binding.zig");
 const std = @import("std");
+const JSONLineBuffer = @import("./JSONLineBuffer.zig").JSONLineBuffer;
 
 const bun = @import("bun");
 const Environment = bun.Environment;
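As a quick cross-check of the figures quoted in the commit message (not part of the patch): the naive strategy rescans the whole accumulated buffer on every chunk, while the tracked strategy scans each byte once.

```zig
const std = @import("std");

pub fn main() void {
    const chunk: u64 = 64 * 1024; // 64KB chunks, as in the problem description
    var naive: u64 = 0;
    for (1..161) |i| naive += chunk * i; // chunk i rescans the whole i * 64KB buffer
    const tracked: u64 = chunk * 160; // each byte examined exactly once
    // Prints "naive: ~805 MB, tracked: ~10 MB" - i.e. the "~800MB" from the summary.
    std.debug.print("naive: ~{d} MB, tracked: ~{d} MB\n", .{ naive / (1024 * 1024), tracked / (1024 * 1024) });
}
```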