perf(ipc): fix O(n²) JSON scanning for large chunked messages (#25743)
## Summary

- Fix an O(n²) performance bug in JSON mode IPC when receiving large messages that arrive in chunks
- Add a `JSONLineBuffer` wrapper that tracks newline positions to avoid re-scanning
- Each byte is now scanned exactly once (on arrival, or when the preceding message is consumed)

## Problem

When data arrives in chunks in JSON mode, `decodeIPCMessage` was calling `indexOfChar(data, '\n')` on the ENTIRE accumulated buffer every time. For a 10MB message arriving in 160 chunks of 64KB:

- Chunk 1: scan 64KB
- Chunk 2: scan 128KB
- Chunk 3: scan 192KB
- ...
- Chunk 160: scan 10MB

Total: ~800MB scanned for one 10MB message.

## Solution

Introduced a `JSONLineBuffer` struct that:

1. Tracks `newline_pos: ?u32` - the position of a known upcoming newline (if any)
2. On `append(bytes)`: only scans the new chunk for `\n`, and only if no position is cached
3. On `consume(bytes)`: updates or re-scans as needed after message processing

This ensures O(n) scanning instead of O(n²).

## Test plan

- [x] `bun run zig:check-all` passes (all platforms compile)
- [x] `bun bd test test/js/bun/spawn/spawn.ipc.test.ts` - 4 tests pass
- [x] `bun bd test test/js/node/child_process/child_process_ipc.test.js` - 1 test passes
- [x] `bun bd test test/js/bun/spawn/bun-ipc-inherit.test.ts` - 1 test passes
- [x] `bun bd test test/js/bun/spawn/spawn.ipc.bun-node.test.ts` - 1 test passes
- [x] `bun bd test test/js/bun/spawn/spawn.ipc.node-bun.test.ts` - 1 test passes
- [x] `bun bd test test/js/node/child_process/child_process_ipc_large_disconnect.test.js` - 1 test passes
- [x] Manual verification with `child-process-send-cb-more.js` (32KB messages)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Bot <claude-bot@bun.sh>
Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Jarred Sumner <jarred@jarredsumner.com>
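As a sanity check on the numbers in the Problem section, here is a throwaway sketch (plain JavaScript, not part of the patch) that totals the bytes touched under the old re-scan-everything behavior versus the new scan-once behavior:

```js
// Hypothetical cost model for a 10MB message arriving in 64KB chunks.
const CHUNK = 64 * 1024;
const TOTAL = 10 * 1024 * 1024;
const chunks = TOTAL / CHUNK; // 160

// Old behavior: indexOfChar runs over the entire accumulated buffer per chunk.
let rescanned = 0;
for (let i = 1; i <= chunks; i++) rescanned += i * CHUNK;

// New behavior: each byte is scanned exactly once.
const scannedOnce = TOTAL;

console.log(`old: ~${Math.round(rescanned / 1024 ** 2)} MB scanned`); // ~805 MB
console.log(`new: ${scannedOnce / 1024 ** 2} MB scanned`); // 10 MB
```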
bench/snippets/ipc-json-child.mjs (new file, +4)
@@ -0,0 +1,4 @@
// Child process for IPC benchmarks - echoes messages back to parent
process.on("message", message => {
  process.send(message);
});
bench/snippets/ipc-json.mjs (new file, +45)
@@ -0,0 +1,45 @@
import { fork } from "node:child_process";
import path from "node:path";
import { fileURLToPath } from "node:url";
import { bench, run } from "../runner.mjs";

const __dirname = path.dirname(fileURLToPath(import.meta.url));
const childPath = path.join(__dirname, "ipc-json-child.mjs");

const smallMessage = { type: "ping", id: 1 };
const largeString = Buffer.alloc(10 * 1024 * 1024, "A").toString();
const largeMessage = { type: "ping", id: 1, data: largeString };

async function runBenchmark(message, count) {
  let received = 0;
  const { promise, resolve } = Promise.withResolvers();

  const child = fork(childPath, [], {
    stdio: ["ignore", "ignore", "ignore", "ipc"],
    serialization: "json",
  });

  child.on("message", () => {
    received++;
    if (received >= count) {
      resolve();
    }
  });

  for (let i = 0; i < count; i++) {
    child.send(message);
  }

  await promise;
  child.kill();
}

bench("ipc json - small messages (1000 roundtrips)", async () => {
  await runBenchmark(smallMessage, 1000);
});

bench("ipc json - 10MB messages (10 roundtrips)", async () => {
  await runBenchmark(largeMessage, 10);
});

await run();
src/bun.js/JSONLineBuffer.zig (new file, +135)
@@ -0,0 +1,135 @@
/// Buffer for newline-delimited data that tracks scan positions to avoid O(n²) scanning.
/// Each byte is scanned exactly once. We track:
/// - newline_pos: position of first known newline (if any)
/// - scanned_pos: how far we've scanned (bytes before this have been checked)
/// - head: offset into the buffer where unconsumed data starts (avoids copying on each consume)
///
/// When data arrives, we only scan the NEW bytes.
/// When we consume a message, we just advance `head` instead of copying.
/// Compaction only happens when head exceeds a threshold.
pub const JSONLineBuffer = struct {
    data: bun.ByteList = .{},
    /// Offset into data where unconsumed content starts.
    head: u32 = 0,
    /// Position of a known upcoming newline relative to head, if any.
    newline_pos: ?u32 = null,
    /// How far we've scanned for newlines relative to head.
    scanned_pos: u32 = 0,

    /// Compact the buffer when head exceeds this threshold.
    const compaction_threshold = 16 * 1024 * 1024; // 16 MB

    /// Get the active (unconsumed) portion of the buffer.
    fn activeSlice(self: *const @This()) []const u8 {
        return self.data.slice()[self.head..];
    }

    /// Scan for newline in unscanned portion of the buffer.
    fn scanForNewline(self: *@This()) void {
        if (self.newline_pos != null) return;
        const slice = self.activeSlice();
        if (self.scanned_pos >= slice.len) return;

        const unscanned = slice[self.scanned_pos..];
        if (bun.strings.indexOfChar(unscanned, '\n')) |local_idx| {
            bun.debugAssert(local_idx <= std.math.maxInt(u32));
            const pos = self.scanned_pos +| @as(u32, @intCast(local_idx));
            self.newline_pos = pos;
            self.scanned_pos = pos +| 1; // Only scanned up to (and including) the newline
        } else {
            bun.debugAssert(slice.len <= std.math.maxInt(u32));
            self.scanned_pos = @intCast(slice.len); // No newline, scanned everything
        }
    }

    /// Compact the buffer by moving data to the front. Called when head exceeds threshold.
    fn compact(self: *@This()) void {
        if (self.head == 0) return;
        const slice = self.activeSlice();
        bun.copy(u8, self.data.ptr[0..slice.len], slice);
        bun.debugAssert(slice.len <= std.math.maxInt(u32));
        self.data.len = @intCast(slice.len);
        self.head = 0;
    }

    /// Append bytes to the buffer, scanning only new data for newline.
    pub fn append(self: *@This(), bytes: []const u8) void {
        _ = bun.handleOom(self.data.write(bun.default_allocator, bytes));
        self.scanForNewline();
    }

    /// Returns the next complete message (up to and including newline) if available.
    pub fn next(self: *const @This()) ?struct { data: []const u8, newline_pos: u32 } {
        const pos = self.newline_pos orelse return null;
        return .{
            .data = self.activeSlice()[0 .. pos + 1],
            .newline_pos = pos,
        };
    }

    /// Consume bytes from the front of the buffer after processing a message.
    /// Just advances head offset - no copying until compaction threshold is reached.
    pub fn consume(self: *@This(), bytes: u32) void {
        self.head +|= bytes;

        // Adjust scanned_pos (subtract consumed bytes, but don't go negative)
        self.scanned_pos = if (bytes >= self.scanned_pos) 0 else self.scanned_pos - bytes;

        // Adjust newline_pos
        if (self.newline_pos) |pos| {
            if (bytes > pos) {
                // Consumed past the known newline - clear it and scan for next
                self.newline_pos = null;
                self.scanForNewline();
            } else {
                self.newline_pos = pos - bytes;
            }
        }

        // Check if we've consumed everything
        if (self.head >= self.data.len) {
            // Free memory if capacity exceeds threshold, otherwise just reset
            if (self.data.cap >= compaction_threshold) {
                self.data.deinit(bun.default_allocator);
                self.data = .{};
            } else {
                self.data.len = 0;
            }
            self.head = 0;
            self.scanned_pos = 0;
            self.newline_pos = null;
            return;
        }

        // Compact if head exceeds threshold to avoid unbounded memory growth
        if (self.head >= compaction_threshold) {
            self.compact();
        }
    }

    pub fn isEmpty(self: *const @This()) bool {
        return self.head >= self.data.len;
    }

    pub fn unusedCapacitySlice(self: *@This()) []u8 {
        return self.data.unusedCapacitySlice();
    }

    pub fn ensureUnusedCapacity(self: *@This(), additional: usize) void {
        bun.handleOom(self.data.ensureUnusedCapacity(bun.default_allocator, additional));
    }

    /// Notify the buffer that data was written directly (e.g., via pre-allocated slice).
    pub fn notifyWritten(self: *@This(), new_data: []const u8) void {
        bun.debugAssert(new_data.len <= std.math.maxInt(u32));
        self.data.len +|= @as(u32, @intCast(new_data.len));
        self.scanForNewline();
    }

    pub fn deinit(self: *@This()) void {
        self.data.deinit(bun.default_allocator);
    }
};

const bun = @import("bun");
const std = @import("std");
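To make the bookkeeping above easier to follow, here is a minimal JavaScript model of the same state machine (field names mirror the Zig struct; `head` is folded into string slicing, and compaction is omitted). This is an illustration of the invariant, not the real implementation:

```js
// Minimal model of JSONLineBuffer's single-scan invariant.
class LineBuffer {
  constructor() {
    this.data = "";         // models bun.ByteList (head folded in via slicing)
    this.newlinePos = null; // position of the first known '\n', if any
    this.scannedPos = 0;    // everything before this offset has been scanned
  }

  scanForNewline() {
    if (this.newlinePos !== null || this.scannedPos >= this.data.length) return;
    const idx = this.data.indexOf("\n", this.scannedPos); // starts at scannedPos
    if (idx !== -1) {
      this.newlinePos = idx;
      this.scannedPos = idx + 1; // scanned up to and including the newline
    } else {
      this.scannedPos = this.data.length; // no newline; everything scanned
    }
  }

  append(bytes) {
    this.data += bytes;
    this.scanForNewline(); // only looks at bytes past scannedPos
  }

  next() {
    if (this.newlinePos === null) return null;
    return { data: this.data.slice(0, this.newlinePos + 1), newlinePos: this.newlinePos };
  }

  consume(n) {
    this.data = this.data.slice(n); // the Zig version just advances `head` instead
    this.scannedPos = Math.max(0, this.scannedPos - n);
    if (this.newlinePos !== null) {
      if (n > this.newlinePos) {
        this.newlinePos = null;
        this.scanForNewline(); // look for the next message's delimiter
      } else {
        this.newlinePos -= n;
      }
    }
  }
}
```

The key property: the scan always starts at `scannedPos`, so across any sequence of `append` and `consume` calls each byte is examined at most once.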
src/bun.js/ipc.zig
@@ -1,5 +1,28 @@
 pub const log = Output.scoped(.IPC, .visible);

+/// Union type that switches between simple ByteList (for advanced mode)
+/// and JSONLineBuffer (for JSON mode with optimized newline tracking).
+const IncomingBuffer = union(enum) {
+    /// For advanced mode - uses length-prefix, no scanning needed
+    advanced: bun.ByteList,
+    /// For JSON mode - tracks newline positions to avoid O(n²) scanning
+    json: JSONLineBuffer,
+
+    pub fn init(mode: Mode) IncomingBuffer {
+        return switch (mode) {
+            .advanced => .{ .advanced = .{} },
+            .json => .{ .json = .{} },
+        };
+    }
+
+    pub fn deinit(self: *@This()) void {
+        switch (self.*) {
+            .advanced => |*b| b.deinit(bun.default_allocator),
+            .json => |*b| b.deinit(),
+        }
+    }
+};
+
 const IsInternal = enum { internal, external };
 const SerializeAndSendResult = enum {
     success,
@@ -162,69 +185,73 @@ const json = struct {
     // 2 is internal
     // ["[{\d\.] is regular

-    pub fn decodeIPCMessage(data: []const u8, globalThis: *jsc.JSGlobalObject) IPCDecodeError!DecodeIPCMessageResult {
-        // <tag>{ "foo": "bar"} // tag is 1 or 2
-        if (bun.strings.indexOfChar(data, '\n')) |idx| {
-            var json_data = data[0..idx];
-            // bounds-check for the following json_data[0]
-            // TODO: should we return NotEnoughBytes?
-            if (json_data.len == 0) return error.InvalidFormat;
-
-            var kind: enum { regular, internal } = .regular;
-            if (json_data[0] == 2) {
-                // internal message
-                json_data = json_data[1..];
-                kind = .internal;
-            }
-
-            const is_ascii = bun.strings.isAllASCII(json_data);
-            var was_ascii_string_freed = false;
-
-            // Use ExternalString to avoid copying data if possible.
-            // This is only possible for ascii data, as that fits into latin1
-            // otherwise we have to convert it utf-8 into utf16-le.
-            var str = if (is_ascii) ascii: {
-                // .dead if `json_data` exceeds max length
-                const s = bun.String.createExternal(*bool, json_data, true, &was_ascii_string_freed, jsonIPCDataStringFreeCB);
-                if (s.tag == .Dead) {
-                    @branchHint(.unlikely);
-                    return IPCDecodeError.OutOfMemory;
-                }
-                break :ascii s;
-            } else bun.String.borrowUTF8(json_data);
-
-            defer {
-                str.deref();
-                if (is_ascii and !was_ascii_string_freed) {
-                    @panic("Expected ascii string to be freed by ExternalString, but it wasn't. This is a bug in Bun.");
-                }
-            }
-
-            const deserialized = str.toJSByParseJSON(globalThis) catch |e| switch (e) {
-                error.JSError => {
-                    globalThis.clearException();
-                    return IPCDecodeError.InvalidFormat;
-                },
-                error.JSTerminated => {
-                    globalThis.clearException();
-                    return IPCDecodeError.InvalidFormat;
-                },
-                error.OutOfMemory => return bun.outOfMemory(),
-            };
-
-            return switch (kind) {
-                .regular => .{
-                    .bytes_consumed = idx + 1,
-                    .message = .{ .data = deserialized },
-                },
-                .internal => .{
-                    .bytes_consumed = idx + 1,
-                    .message = .{ .internal = deserialized },
-                },
-            };
-        }
-
-        return IPCDecodeError.NotEnoughBytes;
-    }
+    pub fn decodeIPCMessage(data: []const u8, globalThis: *jsc.JSGlobalObject, known_newline: ?u32) IPCDecodeError!DecodeIPCMessageResult {
+        // <tag>{ "foo": "bar"} // tag is 1 or 2
+        const idx: u32 = known_newline orelse idx: {
+            const found = bun.strings.indexOfChar(data, '\n') orelse
+                return IPCDecodeError.NotEnoughBytes;
+            // Individual IPC messages should not exceed 4GB, and idx+1 must not overflow
+            if (found >= std.math.maxInt(u32)) return IPCDecodeError.InvalidFormat;
+            break :idx @intCast(found);
+        };
+
+        var json_data = data[0..idx];
+        // An empty payload (newline with no preceding data) is invalid JSON.
+        if (json_data.len == 0) return error.InvalidFormat;
+
+        var kind: enum { regular, internal } = .regular;
+        if (json_data[0] == 2) {
+            // internal message
+            json_data = json_data[1..];
+            kind = .internal;
+        }
+
+        const is_ascii = bun.strings.isAllASCII(json_data);
+        var was_ascii_string_freed = false;
+
+        // Use ExternalString to avoid copying data if possible.
+        // This is only possible for ascii data, as that fits into latin1
+        // otherwise we have to convert it utf-8 into utf16-le.
+        var str = if (is_ascii) ascii: {
+            // .dead if `json_data` exceeds max length
+            const s = bun.String.createExternal(*bool, json_data, true, &was_ascii_string_freed, jsonIPCDataStringFreeCB);
+            if (s.tag == .Dead) {
+                @branchHint(.unlikely);
+                return IPCDecodeError.OutOfMemory;
+            }
+            break :ascii s;
+        } else bun.String.borrowUTF8(json_data);
+
+        defer {
+            str.deref();
+            if (is_ascii and !was_ascii_string_freed) {
+                @panic("Expected ascii string to be freed by ExternalString, but it wasn't. This is a bug in Bun.");
+            }
+        }
+
+        const deserialized = str.toJSByParseJSON(globalThis) catch |e| switch (e) {
+            error.JSError => {
+                globalThis.clearException();
+                return IPCDecodeError.InvalidFormat;
+            },
+            error.JSTerminated => {
+                globalThis.clearException();
+                return IPCDecodeError.InvalidFormat;
+            },
+            error.OutOfMemory => return bun.outOfMemory(),
+        };
+
+        return switch (kind) {
+            .regular => .{
+                .bytes_consumed = @intCast(idx + 1),
+                .message = .{ .data = deserialized },
+            },
+            .internal => .{
+                .bytes_consumed = @intCast(idx + 1),
+                .message = .{ .internal = deserialized },
+            },
+        };
+    }

     pub fn serialize(writer: *bun.io.StreamBuffer, global: *jsc.JSGlobalObject, value: JSValue, is_internal: IsInternal) !usize {
@@ -258,9 +285,11 @@ const json = struct {
 };

 /// Given potentially unfinished buffer `data`, attempt to decode and process a message from it.
-pub fn decodeIPCMessage(mode: Mode, data: []const u8, global: *jsc.JSGlobalObject) IPCDecodeError!DecodeIPCMessageResult {
+/// For JSON mode, `known_newline` can be provided to avoid re-scanning for the newline delimiter.
+pub fn decodeIPCMessage(mode: Mode, data: []const u8, global: *jsc.JSGlobalObject, known_newline: ?u32) IPCDecodeError!DecodeIPCMessageResult {
     return switch (mode) {
-        inline else => |t| @field(@This(), @tagName(t)).decodeIPCMessage(data, global),
+        .advanced => advanced.decodeIPCMessage(data, global),
+        .json => json.decodeIPCMessage(data, global, known_newline),
     };
 }
@@ -420,7 +449,7 @@ pub const SendQueue = struct {
     has_written_version: if (Environment.allow_assert) u1 else u0 = 0,
     mode: Mode,
     internal_msg_queue: node_cluster_binding.InternalMsgHolder = .{},
-    incoming: bun.ByteList = .{}, // Maybe we should use StreamBuffer here as well
+    incoming: IncomingBuffer,
    incoming_fd: ?bun.FileDescriptor = null,

     socket: SocketUnion,
@@ -455,7 +484,13 @@ pub const SendQueue = struct {
     pub fn init(mode: Mode, owner: SendQueueOwner, socket: SocketUnion) @This() {
         log("SendQueue#init", .{});
-        return .{ .queue = .init(bun.default_allocator), .mode = mode, .owner = owner, .socket = socket };
+        return .{
+            .queue = .init(bun.default_allocator),
+            .mode = mode,
+            .owner = owner,
+            .socket = socket,
+            .incoming = IncomingBuffer.init(mode),
+        };
     }
     pub fn deinit(self: *@This()) void {
         log("SendQueue#deinit", .{});
@@ -465,7 +500,7 @@ pub const SendQueue = struct {
         for (self.queue.items) |*item| item.deinit();
         self.queue.deinit();
         self.internal_msg_queue.deinit();
-        self.incoming.deinit(bun.default_allocator);
+        self.incoming.deinit();
         if (self.waiting_for_ack) |*waiting| waiting.deinit();

         // if there is a close next tick task, cancel it so it doesn't get called and then UAF
@@ -1136,67 +1171,99 @@ fn onData2(send_queue: *SendQueue, all_data: []const u8) void {

-    // Decode the message with just the temporary buffer, and if that
-    // fails (not enough bytes) then we allocate to .ipc_buffer
-    if (send_queue.incoming.len == 0) {
-        while (true) {
-            const result = decodeIPCMessage(send_queue.mode, data, globalThis) catch |e| switch (e) {
-                error.NotEnoughBytes => {
-                    _ = bun.handleOom(send_queue.incoming.write(bun.default_allocator, data));
-                    log("hit NotEnoughBytes", .{});
-                    return;
-                },
-                error.InvalidFormat, error.JSError, error.JSTerminated => {
-                    send_queue.closeSocket(.failure, .user);
-                    return;
-                },
-                error.OutOfMemory => {
-                    Output.printErrorln("IPC message is too long.", .{});
-                    send_queue.closeSocket(.failure, .user);
-                    return;
-                },
-            };
-
-            handleIPCMessage(send_queue, result.message, globalThis);
-
-            if (result.bytes_consumed < data.len) {
-                data = data[result.bytes_consumed..];
-            } else {
-                return;
-            }
-        }
-    }
-
-    _ = bun.handleOom(send_queue.incoming.write(bun.default_allocator, data));
-
-    var slice = send_queue.incoming.slice();
-    while (true) {
-        const result = decodeIPCMessage(send_queue.mode, slice, globalThis) catch |e| switch (e) {
-            error.NotEnoughBytes => {
-                // copy the remaining bytes to the start of the buffer
-                bun.copy(u8, send_queue.incoming.ptr[0..slice.len], slice);
-                send_queue.incoming.len = @truncate(slice.len);
-                log("hit NotEnoughBytes2", .{});
-                return;
-            },
-            error.InvalidFormat, error.JSError, error.JSTerminated => {
-                send_queue.closeSocket(.failure, .user);
-                return;
-            },
-            error.OutOfMemory => {
-                Output.printErrorln("IPC message is too long.", .{});
-                send_queue.closeSocket(.failure, .user);
-                return;
-            },
-        };
-
-        handleIPCMessage(send_queue, result.message, globalThis);
-
-        if (result.bytes_consumed < slice.len) {
-            slice = slice[result.bytes_consumed..];
-        } else {
-            // clear the buffer
-            send_queue.incoming.len = 0;
-            return;
-        }
-    }
+    switch (send_queue.incoming) {
+        .json => |*json_buf| {
+            // JSON mode: append to buffer (scans only new data for newline),
+            // then process complete messages using next().
+            json_buf.append(data);
+
+            while (json_buf.next()) |msg| {
+                const result = decodeIPCMessage(.json, msg.data, globalThis, msg.newline_pos) catch |e| switch (e) {
+                    error.NotEnoughBytes => {
+                        log("hit NotEnoughBytes", .{});
+                        return;
+                    },
+                    error.InvalidFormat, error.JSError, error.JSTerminated => {
+                        send_queue.closeSocket(.failure, .user);
+                        return;
+                    },
+                    error.OutOfMemory => {
+                        Output.printErrorln("IPC message is too long.", .{});
+                        send_queue.closeSocket(.failure, .user);
+                        return;
+                    },
+                };
+
+                handleIPCMessage(send_queue, result.message, globalThis);
+                json_buf.consume(result.bytes_consumed);
+            }
+        },
+        .advanced => |*adv_buf| {
+            // Advanced mode: uses length-prefix, no newline scanning needed.
+            // Try to decode directly first, only buffer if needed.
+            if (adv_buf.len == 0) {
+                while (true) {
+                    const result = decodeIPCMessage(.advanced, data, globalThis, null) catch |e| switch (e) {
+                        error.NotEnoughBytes => {
+                            _ = bun.handleOom(adv_buf.write(bun.default_allocator, data));
+                            log("hit NotEnoughBytes", .{});
+                            return;
+                        },
+                        error.InvalidFormat, error.JSError, error.JSTerminated => {
+                            send_queue.closeSocket(.failure, .user);
+                            return;
+                        },
+                        error.OutOfMemory => {
+                            Output.printErrorln("IPC message is too long.", .{});
+                            send_queue.closeSocket(.failure, .user);
+                            return;
+                        },
+                    };
+
+                    handleIPCMessage(send_queue, result.message, globalThis);
+
+                    if (result.bytes_consumed < data.len) {
+                        data = data[result.bytes_consumed..];
+                    } else {
+                        return;
+                    }
+                }
+            }
+
+            // Buffer has existing data, append and process
+            _ = bun.handleOom(adv_buf.write(bun.default_allocator, data));
+            var slice = adv_buf.slice();
+            while (true) {
+                const result = decodeIPCMessage(.advanced, slice, globalThis, null) catch |e| switch (e) {
+                    error.NotEnoughBytes => {
+                        // copy the remaining bytes to the start of the buffer
+                        bun.copy(u8, adv_buf.ptr[0..slice.len], slice);
+                        bun.debugAssert(slice.len <= std.math.maxInt(u32));
+                        adv_buf.len = @intCast(slice.len);
+                        log("hit NotEnoughBytes2", .{});
+                        return;
+                    },
+                    error.InvalidFormat, error.JSError, error.JSTerminated => {
+                        send_queue.closeSocket(.failure, .user);
+                        return;
+                    },
+                    error.OutOfMemory => {
+                        Output.printErrorln("IPC message is too long.", .{});
+                        send_queue.closeSocket(.failure, .user);
+                        return;
+                    },
+                };
+
+                handleIPCMessage(send_queue, result.message, globalThis);
+
+                if (result.bytes_consumed < slice.len) {
+                    slice = slice[result.bytes_consumed..];
+                } else {
+                    adv_buf.len = 0;
+                    return;
+                }
+            }
+        },
+    }
 }
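Seen from the caller's side, the JSON branch above reduces to "append the chunk, then drain complete lines". A sketch of that drive loop against the `LineBuffer` model from the earlier sketch (hypothetical helper; `JSON.parse` stands in for `decodeIPCMessage`):

```js
// Drain loop over the LineBuffer model defined in the earlier sketch.
function onJsonChunk(buf, chunk, handleMessage) {
  buf.append(chunk); // scans only the newly arrived bytes
  // Drain every complete newline-terminated message currently buffered.
  for (let msg = buf.next(); msg !== null; msg = buf.next()) {
    // The delimiter position is already known, so the decoder never re-scans.
    handleMessage(JSON.parse(msg.data));
    buf.consume(msg.newlinePos + 1);
  }
}
```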
@@ -1303,13 +1370,26 @@ pub const IPCHandlers = struct {

     pub const WindowsNamedPipe = struct {
         fn onReadAlloc(send_queue: *SendQueue, suggested_size: usize) []u8 {
-            var available = send_queue.incoming.unusedCapacitySlice();
-            if (available.len < suggested_size) {
-                bun.handleOom(send_queue.incoming.ensureUnusedCapacity(bun.default_allocator, suggested_size));
-                available = send_queue.incoming.unusedCapacitySlice();
-            }
-            log("NewNamedPipeIPCHandler#onReadAlloc {d}", .{suggested_size});
-            return available.ptr[0..suggested_size];
+            switch (send_queue.incoming) {
+                .json => |*json_buf| {
+                    var available = json_buf.unusedCapacitySlice();
+                    if (available.len < suggested_size) {
+                        json_buf.ensureUnusedCapacity(suggested_size);
+                        available = json_buf.unusedCapacitySlice();
+                    }
+                    log("NewNamedPipeIPCHandler#onReadAlloc {d}", .{suggested_size});
+                    return available.ptr[0..suggested_size];
+                },
+                .advanced => |*adv_buf| {
+                    var available = adv_buf.unusedCapacitySlice();
+                    if (available.len < suggested_size) {
+                        bun.handleOom(adv_buf.ensureUnusedCapacity(bun.default_allocator, suggested_size));
+                        available = adv_buf.unusedCapacitySlice();
+                    }
+                    log("NewNamedPipeIPCHandler#onReadAlloc {d}", .{suggested_size});
+                    return available.ptr[0..suggested_size];
+                },
+            }
         }

         fn onReadError(send_queue: *SendQueue, err: bun.sys.E) void {
@@ -1323,41 +1403,77 @@ pub const IPCHandlers = struct {
             const loop = globalThis.bunVM().eventLoop();
             loop.enter();
             defer loop.exit();
-            send_queue.incoming.len += @as(u32, @truncate(buffer.len));
-            var slice = send_queue.incoming.slice();
-
-            bun.assert(send_queue.incoming.len <= send_queue.incoming.cap);
-            bun.assert(bun.isSliceInBuffer(buffer, send_queue.incoming.allocatedSlice()));
-
-            while (true) {
-                const result = decodeIPCMessage(send_queue.mode, slice, globalThis) catch |e| switch (e) {
-                    error.NotEnoughBytes => {
-                        // copy the remaining bytes to the start of the buffer
-                        bun.copy(u8, send_queue.incoming.ptr[0..slice.len], slice);
-                        send_queue.incoming.len = @truncate(slice.len);
-                        log("hit NotEnoughBytes3", .{});
-                        return;
-                    },
-                    error.InvalidFormat, error.JSError, error.JSTerminated => {
-                        send_queue.closeSocket(.failure, .user);
-                        return;
-                    },
-                    error.OutOfMemory => {
-                        Output.printErrorln("IPC message is too long.", .{});
-                        send_queue.closeSocket(.failure, .user);
-                        return;
-                    },
-                };
-
-                handleIPCMessage(send_queue, result.message, globalThis);
-
-                if (result.bytes_consumed < slice.len) {
-                    slice = slice[result.bytes_consumed..];
-                } else {
-                    // clear the buffer
-                    send_queue.incoming.len = 0;
-                    return;
-                }
-            }
+            switch (send_queue.incoming) {
+                .json => |*json_buf| {
+                    // For JSON mode on Windows, use notifyWritten to update length and scan for newlines
+                    bun.assert(json_buf.data.len + buffer.len <= json_buf.data.cap);
+                    bun.assert(bun.isSliceInBuffer(buffer, json_buf.data.allocatedSlice()));
+
+                    json_buf.notifyWritten(buffer);
+
+                    // Process complete messages using next() - avoids O(n²) re-scanning
+                    while (json_buf.next()) |msg| {
+                        const result = decodeIPCMessage(.json, msg.data, globalThis, msg.newline_pos) catch |e| switch (e) {
+                            error.NotEnoughBytes => {
+                                log("hit NotEnoughBytes3", .{});
+                                return;
+                            },
+                            error.InvalidFormat, error.JSError, error.JSTerminated => {
+                                send_queue.closeSocket(.failure, .user);
+                                return;
+                            },
+                            error.OutOfMemory => {
+                                Output.printErrorln("IPC message is too long.", .{});
+                                send_queue.closeSocket(.failure, .user);
+                                return;
+                            },
+                        };
+
+                        handleIPCMessage(send_queue, result.message, globalThis);
+                        json_buf.consume(result.bytes_consumed);
+                    }
+                },
+                .advanced => |*adv_buf| {
+                    adv_buf.len +|= @as(u32, @intCast(buffer.len));
+                    var slice = adv_buf.slice();
+
+                    bun.assert(adv_buf.len <= adv_buf.cap);
+                    bun.assert(bun.isSliceInBuffer(buffer, adv_buf.allocatedSlice()));
+
+                    while (true) {
+                        const result = decodeIPCMessage(.advanced, slice, globalThis, null) catch |e| switch (e) {
+                            error.NotEnoughBytes => {
+                                // copy the remaining bytes to the start of the buffer
+                                bun.copy(u8, adv_buf.ptr[0..slice.len], slice);
+                                // slice.len is guaranteed <= adv_buf.len (u32) since it's derived from adv_buf.slice()
+                                bun.debugAssert(slice.len <= std.math.maxInt(u32));
+                                adv_buf.len = @intCast(slice.len);
+                                log("hit NotEnoughBytes3", .{});
+                                return;
+                            },
+                            error.InvalidFormat, error.JSError, error.JSTerminated => {
+                                send_queue.closeSocket(.failure, .user);
+                                return;
+                            },
+                            error.OutOfMemory => {
+                                Output.printErrorln("IPC message is too long.", .{});
+                                send_queue.closeSocket(.failure, .user);
+                                return;
+                            },
+                        };
+
+                        handleIPCMessage(send_queue, result.message, globalThis);
+
+                        if (result.bytes_consumed < slice.len) {
+                            slice = slice[result.bytes_consumed..];
+                        } else {
+                            // clear the buffer
+                            adv_buf.len = 0;
+                            return;
+                        }
+                    }
+                },
+            }
         }
@@ -1380,6 +1496,7 @@ const string = []const u8;
 const node_cluster_binding = @import("./node/node_cluster_binding.zig");
 const std = @import("std");
+const JSONLineBuffer = @import("./JSONLineBuffer.zig").JSONLineBuffer;

 const bun = @import("bun");
 const Environment = bun.Environment;