Compare commits

...

1 Commits

Author SHA1 Message Date
Claude Bot
9062e7ad92 Add workspace-local process manager with bun start/stop/list/logs
Implements a lightweight process manager for managing background processes
within a workspace. Each workspace (cwd) gets its own daemon that manages
processes via Unix socket IPC.

Commands:
- bun start [script] - Start a process in the background
- bun stop [name] - Stop a running process
- bun list - List all running processes in this workspace
- bun logs [name] [-f] - View process logs (with optional follow mode)

Implementation details:
- Uses uSockets for efficient Unix socket IPC between client and daemon
- Daemon auto-spawns on first command and runs in the background
- Processes are workspace-isolated based on cwd hash
- Logs are captured to /tmp/bun-logs/{workspace-hash}/
- Simple process supervision with pid tracking

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-30 21:32:10 +00:00
6 changed files with 1204 additions and 1 deletions

View File

@@ -91,6 +91,7 @@ pub const PackCommand = @import("./cli/pack_command.zig").PackCommand;
pub const AuditCommand = @import("./cli/audit_command.zig").AuditCommand;
pub const InitCommand = @import("./cli/init_command.zig").InitCommand;
pub const WhyCommand = @import("./cli/why_command.zig").WhyCommand;
pub const ProcessManagerCommand = @import("./cli/process_manager_command.zig").ProcessManagerCommand;
pub const Arguments = @import("./cli/Arguments.zig");
@@ -617,8 +618,11 @@ pub const Command = struct {
RootCommandMatcher.case("logout") => .ReservedCommand,
RootCommandMatcher.case("whoami") => .PackageManagerCommand,
RootCommandMatcher.case("prune") => .ReservedCommand,
RootCommandMatcher.case("list") => .ReservedCommand,
RootCommandMatcher.case("list") => .ListCommand,
RootCommandMatcher.case("why") => .WhyCommand,
RootCommandMatcher.case("start") => .StartCommand,
RootCommandMatcher.case("stop") => .StopCommand,
RootCommandMatcher.case("logs") => .LogsCommand,
RootCommandMatcher.case("-e") => .AutoCommand,
@@ -790,6 +794,26 @@ pub const Command = struct {
try WhyCommand.exec(ctx);
return;
},
.StartCommand => {
const ctx = try Command.init(allocator, log, .StartCommand);
try ProcessManagerCommand.exec(ctx);
return;
},
.StopCommand => {
const ctx = try Command.init(allocator, log, .StopCommand);
try ProcessManagerCommand.exec(ctx);
return;
},
.ListCommand => {
const ctx = try Command.init(allocator, log, .ListCommand);
try ProcessManagerCommand.exec(ctx);
return;
},
.LogsCommand => {
const ctx = try Command.init(allocator, log, .LogsCommand);
try ProcessManagerCommand.exec(ctx);
return;
},
.BunxCommand => {
const ctx = try Command.init(allocator, log, .BunxCommand);
@@ -963,6 +987,10 @@ pub const Command = struct {
PublishCommand,
AuditCommand,
WhyCommand,
StartCommand,
StopCommand,
ListCommand,
LogsCommand,
/// Used by crash reports.
///
@@ -1000,6 +1028,10 @@ pub const Command = struct {
.PublishCommand => 'k',
.AuditCommand => 'A',
.WhyCommand => 'W',
.StartCommand => 'S',
.StopCommand => 's',
.ListCommand => 'L',
.LogsCommand => 'O',
};
}

View File

@@ -0,0 +1,388 @@
const std = @import("std");
const bun = @import("bun");
const uws = bun.uws;
const Environment = bun.Environment;
const Command = @import("../../cli.zig").Command;
const Protocol = @import("./protocol.zig");
const Manager = @import("./manager.zig");
const Output = bun.Output;
const Global = bun.Global;
const strings = bun.strings;
var path_buf: bun.PathBuffer = undefined;
fn getSocketPath(allocator: std.mem.Allocator, cwd: []const u8) ![]const u8 {
const hash = std.hash.Wyhash.hash(0, cwd);
if (Environment.isLinux) {
return try std.fmt.allocPrint(allocator, "\x00bun-pm-{x}", .{hash});
} else if (Environment.isMac) {
return try std.fmt.allocPrint(allocator, "/tmp/bun-pm-{x}.sock", .{hash});
} else if (Environment.isWindows) {
return try std.fmt.allocPrint(allocator, "\\\\.\\pipe\\bun-pm-{x}", .{hash});
}
unreachable;
}
const ClientContext = struct {
socket: uws.NewSocketHandler(false),
allocator: std.mem.Allocator,
response_buffer: std.ArrayList(u8),
command_to_send: []const u8, // JSON serialized command
done: bool = false,
error_occurred: bool = false,
pub fn onOpen(this: *ClientContext, socket: uws.NewSocketHandler(false)) void {
_ = socket;
// CRITICAL: Send command immediately when connected
_ = this.socket.write(this.command_to_send);
// Shutdown write side to signal we're done sending
this.socket.shutdown();
}
pub fn onData(this: *ClientContext, socket: uws.NewSocketHandler(false), data: []const u8) void {
_ = socket;
// Accumulate response data
this.response_buffer.appendSlice(data) catch {
this.error_occurred = true;
this.socket.close(.failure);
};
}
pub fn onClose(this: *ClientContext, socket: uws.NewSocketHandler(false), code: i32, reason: ?*anyopaque) void {
_ = socket;
_ = code;
_ = reason;
// Connection closed - we're done
this.done = true;
}
pub fn onEnd(this: *ClientContext, socket: uws.NewSocketHandler(false)) void {
_ = socket;
// Server finished sending, close our side
this.socket.close(.normal);
}
pub fn onConnectError(this: *ClientContext, socket: uws.NewSocketHandler(false), code: i32) void {
_ = socket;
_ = code;
this.error_occurred = true;
this.done = true;
}
pub fn onWritable(this: *ClientContext, socket: uws.NewSocketHandler(false)) void {
_ = this;
_ = socket;
}
pub fn onTimeout(this: *ClientContext, socket: uws.NewSocketHandler(false)) void {
socket.close(.failure);
this.error_occurred = true;
}
pub fn onLongTimeout(this: *ClientContext, socket: uws.NewSocketHandler(false)) void {
socket.close(.failure);
this.error_occurred = true;
}
pub fn onHandshake(this: *ClientContext, socket: uws.NewSocketHandler(false), success: i32, verify_error: uws.us_bun_verify_error_t) void {
_ = this;
_ = socket;
_ = success;
_ = verify_error;
}
};
fn sendCommandAndWaitForResponse(
allocator: std.mem.Allocator,
socket_path: []const u8,
cmd: Protocol.Command,
) !Protocol.Response {
// 1. Get event loop
const loop = uws.Loop.get();
// 2. Create socket context
const socket_ctx = uws.SocketContext.createNoSSLContext(loop, @sizeOf(*ClientContext)) orelse
return error.SocketContextFailed;
defer socket_ctx.deinit(false);
// 3. Configure callbacks
const Socket = uws.NewSocketHandler(false);
Socket.configure(socket_ctx, true, *ClientContext, ClientContext);
// 4. Serialize command to JSON
var cmd_buf = std.ArrayList(u8).init(allocator);
defer cmd_buf.deinit();
try std.json.stringify(cmd, .{}, cmd_buf.writer());
// 5. Create client context
var client = ClientContext{
.socket = .{ .socket = .{ .detached = {} } },
.allocator = allocator,
.response_buffer = std.ArrayList(u8).init(allocator),
.command_to_send = cmd_buf.items,
};
defer client.response_buffer.deinit();
// 6. Convert socket path to null-terminated
const socket_path_z = try allocator.dupeZ(u8, socket_path);
defer allocator.free(socket_path_z);
// 7. Connect to Unix socket
client.socket = Socket.connectUnixAnon(
socket_path_z,
socket_ctx,
&client,
false,
) catch {
return error.ManagerNotRunning;
};
// 8. Run event loop until connection closes
while (!client.done) {
loop.tick();
}
// 9. Check for errors
if (client.error_occurred) {
return error.ConnectionFailed;
}
// 10. Parse JSON response
const parsed = try std.json.parseFromSlice(
Protocol.Response,
allocator,
client.response_buffer.items,
.{},
);
return parsed.value;
}
// Command implementations
pub fn startCommand(ctx: Command.Context) !void {
if (ctx.positionals.len < 2) {
Output.errGeneric("Usage: bun start [script]", .{});
Global.exit(1);
}
const script_name = ctx.positionals[1];
const cwd = try bun.getcwd(&path_buf);
const socket_path = try getSocketPath(ctx.allocator, cwd);
const hash = std.hash.Wyhash.hash(0, cwd);
const cmd = Protocol.Command{
.start = .{
.name = script_name,
.script = script_name,
.cwd = cwd,
},
};
const response = sendCommandAndWaitForResponse(ctx.allocator, socket_path, cmd) catch |err| {
if (err == error.ManagerNotRunning) {
// Spawn manager and retry
try Manager.spawnManager(socket_path, hash, ctx.allocator);
std.time.sleep(100 * std.time.ns_per_ms);
const retry_response = try sendCommandAndWaitForResponse(ctx.allocator, socket_path, cmd);
handleResponse(retry_response, script_name);
return;
}
return err;
};
handleResponse(response, script_name);
}
pub fn stopCommand(ctx: Command.Context) !void {
if (ctx.positionals.len < 2) {
Output.errGeneric("Usage: bun stop [name]", .{});
Global.exit(1);
}
const name = ctx.positionals[1];
const cwd = try bun.getcwd(&path_buf);
const socket_path = try getSocketPath(ctx.allocator, cwd);
const cmd = Protocol.Command{ .stop = .{ .name = name } };
const response = try sendCommandAndWaitForResponse(ctx.allocator, socket_path, cmd);
handleResponse(response, name);
}
pub fn listCommand(ctx: Command.Context) !void {
const cwd = try bun.getcwd(&path_buf);
const socket_path = try getSocketPath(ctx.allocator, cwd);
const cmd = Protocol.Command.list;
const response = sendCommandAndWaitForResponse(ctx.allocator, socket_path, cmd) catch |err| {
if (err == error.ManagerNotRunning) {
Output.prettyln("No processes running in this workspace", .{});
return;
}
return err;
};
switch (response) {
.process_list => |list| {
if (list.len == 0) {
Output.prettyln("No processes running", .{});
return;
}
Output.prettyln("\n<b>NAME{s: <20}PID{s: <10}COMMAND{s: <30}UPTIME<r>", .{ "", "", "" });
for (list) |proc| {
const uptime = formatUptime(proc.uptime);
Output.prettyln("{s: <20}{d: <10}{s: <30}{s}", .{ proc.name, proc.pid, proc.script, uptime });
}
Output.prettyln("", .{});
},
else => {
Output.errGeneric("Unexpected response from manager", .{});
Global.exit(1);
},
}
}
pub fn logsCommand(ctx: Command.Context) !void {
if (ctx.positionals.len < 2) {
Output.errGeneric("Usage: bun logs [name] [-f]", .{});
Global.exit(1);
}
const name = ctx.positionals[1];
// Parse -f flag
const follow = blk: {
for (ctx.positionals[2..]) |arg| {
if (strings.eqlComptime(arg, "-f")) break :blk true;
}
break :blk false;
};
const cwd = try bun.getcwd(&path_buf);
const socket_path = try getSocketPath(ctx.allocator, cwd);
const cmd = Protocol.Command{ .logs = .{ .name = name, .follow = follow } };
const response = try sendCommandAndWaitForResponse(ctx.allocator, socket_path, cmd);
switch (response) {
.log_path => |paths| {
if (follow) {
try tailLogsFollow(paths.stdout, paths.stderr);
} else {
try catLogs(paths.stdout, paths.stderr);
}
},
.err => |e| {
Output.errGeneric("{s}", .{e.message});
Global.exit(1);
},
else => unreachable,
}
}
fn handleResponse(response: Protocol.Response, name: []const u8) void {
switch (response) {
.success => |s| {
Output.prettyln("<green>✓<r> {s}: <b>{s}<r>", .{ s.message, name });
},
.err => |e| {
Output.errGeneric("{s}", .{e.message});
Global.exit(1);
},
else => {
Output.errGeneric("Unexpected response", .{});
Global.exit(1);
},
}
}
fn formatUptime(seconds: i64) []const u8 {
var buf: [32]u8 = undefined;
if (seconds < 60) {
return std.fmt.bufPrint(&buf, "{d}s", .{seconds}) catch "?";
} else if (seconds < 3600) {
const mins = @divFloor(seconds, 60);
return std.fmt.bufPrint(&buf, "{d}m", .{mins}) catch "?";
} else if (seconds < 86400) {
const hours = @divFloor(seconds, 3600);
return std.fmt.bufPrint(&buf, "{d}h", .{hours}) catch "?";
} else {
const days = @divFloor(seconds, 86400);
return std.fmt.bufPrint(&buf, "{d}d", .{days}) catch "?";
}
}
fn catLogs(stdout_path: []const u8, stderr_path: []const u8) !void {
// Convert paths to zero-terminated
var stdout_buf: bun.PathBuffer = undefined;
const stdout_path_z = try std.fmt.bufPrintZ(&stdout_buf, "{s}", .{stdout_path});
var stderr_buf: bun.PathBuffer = undefined;
const stderr_path_z = try std.fmt.bufPrintZ(&stderr_buf, "{s}", .{stderr_path});
// Read and print stdout
const stdout_file = try bun.sys.open(stdout_path_z, bun.O.RDONLY, 0).unwrap();
defer _ = stdout_file.close();
var buf: [4096]u8 = undefined;
while (true) {
const n = try bun.sys.read(stdout_file, &buf).unwrap();
if (n == 0) break;
try bun.Output.writer().writeAll(buf[0..n]);
}
// Read and print stderr
const stderr_file = try bun.sys.open(stderr_path_z, bun.O.RDONLY, 0).unwrap();
defer _ = stderr_file.close();
while (true) {
const n = try bun.sys.read(stderr_file, &buf).unwrap();
if (n == 0) break;
try bun.Output.writer().writeAll(buf[0..n]);
}
}
fn tailLogsFollow(stdout_path: []const u8, stderr_path: []const u8) !void {
// Convert paths to zero-terminated
var stdout_buf: bun.PathBuffer = undefined;
const stdout_path_z = try std.fmt.bufPrintZ(&stdout_buf, "{s}", .{stdout_path});
var stderr_buf: bun.PathBuffer = undefined;
const stderr_path_z = try std.fmt.bufPrintZ(&stderr_buf, "{s}", .{stderr_path});
// Open files
const stdout_file = try bun.sys.open(stdout_path_z, bun.O.RDONLY, 0).unwrap();
defer _ = stdout_file.close();
const stderr_file = try bun.sys.open(stderr_path_z, bun.O.RDONLY, 0).unwrap();
defer _ = stderr_file.close();
// Seek to end
_ = try bun.sys.lseek(stdout_file, 0, std.posix.SEEK.END).unwrap();
_ = try bun.sys.lseek(stderr_file, 0, std.posix.SEEK.END).unwrap();
var buf: [4096]u8 = undefined;
// Poll for new data
while (true) {
// Try stdout
const n_out = bun.sys.read(stdout_file, &buf).unwrap() catch 0;
if (n_out > 0) {
try bun.Output.writer().writeAll(buf[0..n_out]);
}
// Try stderr
const n_err = bun.sys.read(stderr_file, &buf).unwrap() catch 0;
if (n_err > 0) {
try bun.Output.writer().writeAll(buf[0..n_err]);
}
// Sleep briefly if no new data
if (n_out == 0 and n_err == 0) {
std.time.sleep(100 * std.time.ns_per_ms);
}
}
}

View File

@@ -0,0 +1,384 @@
const std = @import("std");
const bun = @import("bun");
const uws = bun.uws;
const Environment = bun.Environment;
const Protocol = @import("./protocol.zig");
const Output = bun.Output;
const Global = bun.Global;
const strings = bun.strings;
const SpawnResult = bun.spawn.SpawnResult;
const Subprocess = bun.jsc.Subprocess;
pub const ProcessManager = struct {
allocator: std.mem.Allocator,
loop: *uws.Loop,
socket_context: *uws.SocketContext,
listen_socket: *uws.ListenSocket,
processes: std.StringHashMap(*ManagedProcess),
workspace_hash: u64,
log_dir: []const u8,
socket_path: []const u8,
active_clients: u32 = 0,
pub fn init(allocator: std.mem.Allocator, socket_path: []const u8, workspace_hash: u64) !*ProcessManager {
const loop = uws.Loop.get();
// Create socket context
const ctx = uws.SocketContext.createNoSSLContext(loop, @sizeOf(*ProcessManager)) orelse
return error.SocketContextFailed;
const self = try allocator.create(ProcessManager);
self.* = .{
.allocator = allocator,
.loop = loop,
.socket_context = ctx,
.listen_socket = undefined,
.processes = std.StringHashMap(*ManagedProcess).init(allocator),
.workspace_hash = workspace_hash,
.log_dir = try std.fmt.allocPrint(allocator, "/tmp/bun-logs/{x}", .{workspace_hash}),
.socket_path = try allocator.dupe(u8, socket_path),
};
// Store manager pointer in context extension
const ctx_ext = ctx.ext(false, *ProcessManager).?;
ctx_ext.* = self;
// Create log directory
const log_dir_z = try allocator.dupeZ(u8, self.log_dir);
defer allocator.free(log_dir_z);
_ = bun.sys.mkdir(log_dir_z, 0o755).unwrap() catch {};
// Listen on Unix socket
var listen_err: c_int = 0;
const socket_path_z = try allocator.dupeZ(u8, socket_path);
defer allocator.free(socket_path_z);
self.listen_socket = ctx.listenUnix(
false,
socket_path_z,
socket_path.len,
0,
@sizeOf(ClientHandler),
&listen_err,
) orelse {
if (listen_err == @intFromEnum(std.posix.E.ADDRINUSE)) return error.AddressInUse;
return error.ListenFailed;
};
// Configure callbacks
ctx.onOpen(false, onClientOpen);
ctx.onData(false, onClientData);
ctx.onClose(false, onClientClose);
ctx.onEnd(false, onClientEnd);
return self;
}
pub fn run(self: *ProcessManager) void {
// CRITICAL: Loop until no processes AND no active clients
while (self.processes.count() > 0 or self.active_clients > 0) {
self.loop.tick();
}
self.cleanup();
}
fn cleanup(self: *ProcessManager) void {
// Kill remaining processes
var iter = self.processes.iterator();
while (iter.next()) |entry| {
std.posix.kill(entry.value_ptr.*.pid, std.posix.SIG.KILL) catch {};
}
self.socket_context.close(false);
// Remove socket file on macOS
if (Environment.isMac and !strings.hasPrefix(self.socket_path, "\x00")) {
_ = bun.sys.unlink(self.socket_path);
}
self.allocator.free(self.log_dir);
self.allocator.free(self.socket_path);
}
fn handleCommand(self: *ProcessManager, cmd: Protocol.Command) Protocol.Response {
return switch (cmd) {
.start => |s| self.handleStart(s.name, s.script, s.cwd),
.stop => |s| self.handleStop(s.name),
.list => self.handleList(),
.logs => |l| self.handleLogs(l.name),
};
}
fn handleStart(self: *ProcessManager, name: []const u8, script: []const u8, cwd: []const u8) Protocol.Response {
self.startProcess(name, script, cwd) catch |err| {
return .{ .err = .{ .message = @errorName(err) } };
};
return .{ .success = .{ .message = "Started" } };
}
fn handleStop(self: *ProcessManager, name: []const u8) Protocol.Response {
if (self.processes.get(name)) |proc| {
std.posix.kill(proc.pid, std.posix.SIG.TERM) catch {};
return .{ .success = .{ .message = "Stopped" } };
}
return .{ .err = .{ .message = "Process not found" } };
}
fn handleList(self: *ProcessManager) Protocol.Response {
var list = std.ArrayList(Protocol.ProcessInfo).init(self.allocator);
var iter = self.processes.iterator();
while (iter.next()) |entry| {
const pid = entry.value_ptr.*.pid;
list.append(.{
.name = entry.value_ptr.*.name,
.pid = pid,
.script = entry.value_ptr.*.script,
.uptime = std.time.timestamp() - entry.value_ptr.*.started_at,
}) catch return .{ .err = .{ .message = "OutOfMemory" } };
}
return .{ .process_list = list.toOwnedSlice() catch &.{} };
}
fn handleLogs(self: *ProcessManager, name: []const u8) Protocol.Response {
if (self.processes.contains(name)) {
const stdout_path = std.fmt.allocPrint(
self.allocator,
"{s}/{s}-stdout.log",
.{ self.log_dir, name },
) catch return .{ .err = .{ .message = "OutOfMemory" } };
const stderr_path = std.fmt.allocPrint(
self.allocator,
"{s}/{s}-stderr.log",
.{ self.log_dir, name },
) catch return .{ .err = .{ .message = "OutOfMemory" } };
return .{ .log_path = .{ .stdout = stdout_path, .stderr = stderr_path } };
}
return .{ .err = .{ .message = "Process not found" } };
}
fn startProcess(self: *ProcessManager, name: []const u8, script: []const u8, cwd: []const u8) !void {
if (self.processes.contains(name)) {
return error.ProcessAlreadyExists;
}
// Create log files
const stdout_path = try std.fmt.allocPrintZ(self.allocator, "{s}/{s}-stdout.log", .{ self.log_dir, name });
defer self.allocator.free(stdout_path);
const stderr_path = try std.fmt.allocPrintZ(self.allocator, "{s}/{s}-stderr.log", .{ self.log_dir, name });
defer self.allocator.free(stderr_path);
const stdout_fd = try bun.sys.open(
stdout_path,
bun.O.WRONLY | bun.O.CREAT | bun.O.APPEND,
0o644,
).unwrap();
const stderr_fd = try bun.sys.open(
stderr_path,
bun.O.WRONLY | bun.O.CREAT | bun.O.APPEND,
0o644,
).unwrap();
// Build argv
const bun_exe = try bun.selfExePath();
var argv = std.ArrayList(?[*:0]const u8).init(self.allocator);
defer argv.deinit();
try argv.append(try self.allocator.dupeZ(u8, bun_exe));
try argv.append(try self.allocator.dupeZ(u8, "run"));
try argv.append(try self.allocator.dupeZ(u8, script));
try argv.append(null); // null terminator
const envp = std.os.environ;
// Spawn process
const spawn_options = bun.spawn.SpawnOptions{
.cwd = cwd,
.stdin = .ignore,
.stdout = .{ .pipe = stdout_fd },
.stderr = .{ .pipe = stderr_fd },
.detached = false,
.windows = if (Environment.isWindows) .{
.loop = bun.jsc.EventLoopHandle.init(self.loop),
} else undefined,
};
const maybe_result = try bun.spawn.spawnProcess(
&spawn_options,
@ptrCast(argv.items.ptr),
@ptrCast(envp),
);
const result = try maybe_result.unwrap();
// Create managed process
const managed = try self.allocator.create(ManagedProcess);
managed.* = .{
.name = try self.allocator.dupe(u8, name),
.pid = result.pid,
.log_stdout = stdout_fd,
.log_stderr = stderr_fd,
.script = try self.allocator.dupe(u8, script),
.cwd = try self.allocator.dupe(u8, cwd),
.started_at = std.time.timestamp(),
.manager = self,
};
// Add to map
try self.processes.put(try self.allocator.dupe(u8, name), managed);
}
};
const ManagedProcess = struct {
name: []const u8,
pid: std.posix.pid_t,
log_stdout: bun.FileDescriptor,
log_stderr: bun.FileDescriptor,
script: []const u8,
cwd: []const u8,
started_at: i64,
manager: *ProcessManager,
};
const ClientHandler = struct {
buffer: std.ArrayList(u8),
manager: *ProcessManager,
};
fn onClientOpen(
socket: *uws.Socket,
is_client: i32,
ip: [*c]u8,
ip_len: i32,
) callconv(.C) ?*uws.Socket {
_ = is_client;
_ = ip;
_ = ip_len;
const ctx = socket.context(false);
const manager = ctx.ext(false, *ProcessManager).?.*;
manager.active_clients += 1;
const handler = @as(*ClientHandler, @ptrCast(@alignCast(socket.ext(false))));
handler.* = .{
.buffer = std.ArrayList(u8).init(manager.allocator),
.manager = manager,
};
return socket;
}
fn onClientData(
socket: *uws.Socket,
data_ptr: [*c]u8,
data_len: i32,
) callconv(.C) ?*uws.Socket {
const handler = @as(*ClientHandler, @ptrCast(@alignCast(socket.ext(false))));
const data = data_ptr[0..@intCast(data_len)];
handler.buffer.appendSlice(data) catch {
socket.close(false, .failure);
return null;
};
return socket;
}
fn onClientEnd(
socket: *uws.Socket,
) callconv(.C) ?*uws.Socket {
const handler = @as(*ClientHandler, @ptrCast(@alignCast(socket.ext(false))));
// Parse command
const cmd = std.json.parseFromSlice(
Protocol.Command,
handler.manager.allocator,
handler.buffer.items,
.{},
) catch {
socket.close(false, .failure);
return null;
};
defer cmd.deinit();
// Handle command
const response = handler.manager.handleCommand(cmd.value);
// Serialize response
var response_buf = std.ArrayList(u8).init(handler.manager.allocator);
defer response_buf.deinit();
std.json.stringify(response, .{}, response_buf.writer()) catch {
socket.close(false, .failure);
return null;
};
// Send response
_ = socket.write(false, response_buf.items);
// Close
socket.close(false, .normal);
return null;
}
fn onClientClose(
socket: *uws.Socket,
code: i32,
reason: ?*anyopaque,
) callconv(.C) ?*uws.Socket {
_ = code;
_ = reason;
const handler = @as(*ClientHandler, @ptrCast(@alignCast(socket.ext(false))));
handler.manager.active_clients -= 1;
handler.buffer.deinit();
return socket;
}
pub fn spawnManager(socket_path: []const u8, workspace_hash: u64, allocator: std.mem.Allocator) !void {
if (Environment.isWindows) {
return try spawnManagerWindows(socket_path, workspace_hash, allocator);
}
const pid = std.c.fork();
if (pid < 0) return error.ForkFailed;
if (pid > 0) return;
_ = std.os.linux.setsid();
// Close all FDs except 0,1,2
const max_fd = if (Environment.isLinux) 1024 else 256;
var fd: i32 = 3;
while (fd < max_fd) : (fd += 1) {
_ = bun.FD.fromNative(fd).close();
}
// Redirect stdio to /dev/null
const null_fd = try bun.sys.open("/dev/null", bun.O.RDWR, 0).unwrap();
try std.posix.dup2(null_fd.cast(), bun.FD.stdin().cast());
try std.posix.dup2(null_fd.cast(), bun.FD.stdout().cast());
try std.posix.dup2(null_fd.cast(), bun.FD.stderr().cast());
if (null_fd.cast() > 2) _ = null_fd.close();
// Run manager
const manager = ProcessManager.init(allocator, socket_path, workspace_hash) catch {
Global.exit(1);
};
manager.run();
Global.exit(0);
}
fn spawnManagerWindows(socket_path: []const u8, workspace_hash: u64, allocator: std.mem.Allocator) !void {
// TODO: Windows implementation
_ = socket_path;
_ = workspace_hash;
_ = allocator;
return error.NotImplemented;
}

View File

@@ -0,0 +1,38 @@
const std = @import("std");
pub const Command = union(enum) {
start: struct {
name: []const u8,
script: []const u8,
cwd: []const u8,
},
stop: struct {
name: []const u8,
},
list: void,
logs: struct {
name: []const u8,
follow: bool,
},
};
pub const Response = union(enum) {
success: struct {
message: []const u8,
},
err: struct {
message: []const u8,
},
process_list: []ProcessInfo,
log_path: struct {
stdout: []const u8,
stderr: []const u8,
},
};
pub const ProcessInfo = struct {
name: []const u8,
pid: i32,
script: []const u8,
uptime: i64,
};

View File

@@ -0,0 +1,66 @@
const std = @import("std");
const bun = @import("bun");
const Command = @import("../cli.zig").Command;
const strings = bun.strings;
const Output = bun.Output;
const Global = bun.Global;
const Client = @import("./process_manager/client.zig");
pub const ProcessManagerCommand = struct {
pub fn exec(ctx: Command.Context) !void {
const args = ctx.positionals;
if (args.len == 0) {
printHelp();
Global.exit(1);
}
const subcommand = args[0];
if (strings.eqlComptime(subcommand, "start")) {
try Client.startCommand(ctx);
} else if (strings.eqlComptime(subcommand, "stop")) {
try Client.stopCommand(ctx);
} else if (strings.eqlComptime(subcommand, "list")) {
try Client.listCommand(ctx);
} else if (strings.eqlComptime(subcommand, "logs")) {
try Client.logsCommand(ctx);
} else {
Output.errGeneric("Unknown subcommand: {s}", .{subcommand});
printHelp();
Global.exit(1);
}
}
fn printHelp() void {
const help_text =
\\<b>Usage:<r>
\\
\\ <b><green>bun start<r> <cyan>[script]<r>
\\ Start a process in the background
\\
\\ <b><green>bun stop<r> <cyan>[name]<r>
\\ Stop a running process
\\
\\ <b><green>bun list<r>
\\ List all running processes in this workspace
\\
\\ <b><green>bun logs<r> <cyan>[name]<r> [-f]
\\ Show logs for a process (-f to follow)
\\
\\<b>Examples:<r>
\\
\\ bun start dev # Start "dev" script from package.json
\\ bun start ./server.js # Start a file directly
\\ bun list # See what's running
\\ bun logs dev # View dev logs
\\ bun logs dev -f # Follow dev logs
\\ bun stop dev # Stop dev process
\\
;
Output.pretty(help_text, .{});
Output.flush();
}
};

View File

@@ -0,0 +1,295 @@
import { describe, expect, test } from "bun:test";
import { bunEnv, bunExe, normalizeBunSnapshot, tempDir } from "harness";
describe("bun process manager", () => {
test("bun start - starts a process", async () => {
using dir = tempDir("process-manager-start", {
"server.js": `
console.log("Server started");
setInterval(() => {}, 1000); // Keep alive
`,
});
await using proc = Bun.spawn({
cmd: [bunExe(), "start", "server.js"],
env: bunEnv,
cwd: String(dir),
stdout: "pipe",
stderr: "pipe",
});
const [stdout, stderr, exitCode] = await Promise.all([proc.stdout.text(), proc.stderr.text(), proc.exited]);
expect(normalizeBunSnapshot(stdout, dir)).toMatchInlineSnapshot(`"✓ Started: server.js"`);
expect(exitCode).toBe(0);
// Clean up - stop the process
const stopProc = Bun.spawn({
cmd: [bunExe(), "stop", "server.js"],
env: bunEnv,
cwd: String(dir),
});
await stopProc.exited;
});
test("bun list - lists running processes", async () => {
using dir = tempDir("process-manager-list", {
"worker.js": `
console.log("Worker started");
setInterval(() => {}, 1000);
`,
});
// Start a process first
const startProc = Bun.spawn({
cmd: [bunExe(), "start", "worker.js"],
env: bunEnv,
cwd: String(dir),
});
await startProc.exited;
// Now list processes
await using listProc = Bun.spawn({
cmd: [bunExe(), "list"],
env: bunEnv,
cwd: String(dir),
stdout: "pipe",
stderr: "pipe",
});
const [stdout, stderr, exitCode] = await Promise.all([
listProc.stdout.text(),
listProc.stderr.text(),
listProc.exited,
]);
// Should show the worker process
expect(stdout).toContain("worker.js");
expect(stdout).toContain("NAME");
expect(stdout).toContain("PID");
expect(exitCode).toBe(0);
// Clean up
const stopProc = Bun.spawn({
cmd: [bunExe(), "stop", "worker.js"],
env: bunEnv,
cwd: String(dir),
});
await stopProc.exited;
});
test("bun stop - stops a running process", async () => {
using dir = tempDir("process-manager-stop", {
"service.js": `
console.log("Service running");
setInterval(() => {}, 1000);
`,
});
// Start a process
const startProc = Bun.spawn({
cmd: [bunExe(), "start", "service.js"],
env: bunEnv,
cwd: String(dir),
});
await startProc.exited;
// Stop the process
await using stopProc = Bun.spawn({
cmd: [bunExe(), "stop", "service.js"],
env: bunEnv,
cwd: String(dir),
stdout: "pipe",
stderr: "pipe",
});
const [stdout, stderr, exitCode] = await Promise.all([
stopProc.stdout.text(),
stopProc.stderr.text(),
stopProc.exited,
]);
expect(normalizeBunSnapshot(stdout, dir)).toMatchInlineSnapshot(`"✓ Stopped: service.js"`);
expect(exitCode).toBe(0);
// Verify it's not in the list anymore
const listProc = Bun.spawn({
cmd: [bunExe(), "list"],
env: bunEnv,
cwd: String(dir),
stdout: "pipe",
});
const listOutput = await listProc.stdout.text();
await listProc.exited;
// Should either show no processes or not include service.js
if (!listOutput.includes("No processes")) {
expect(listOutput).not.toContain("service.js");
}
});
test("bun logs - shows process logs", async () => {
using dir = tempDir("process-manager-logs", {
"logger.js": `
console.log("Log message 1");
console.error("Error message 1");
console.log("Log message 2");
`,
});
// Start and let it finish
const startProc = Bun.spawn({
cmd: [bunExe(), "start", "logger.js"],
env: bunEnv,
cwd: String(dir),
});
await startProc.exited;
// Wait a bit for logs to be written
await Bun.sleep(100);
// Check logs
await using logsProc = Bun.spawn({
cmd: [bunExe(), "logs", "logger.js"],
env: bunEnv,
cwd: String(dir),
stdout: "pipe",
stderr: "pipe",
});
const [stdout, stderr, exitCode] = await Promise.all([
logsProc.stdout.text(),
logsProc.stderr.text(),
logsProc.exited,
]);
expect(stdout).toContain("Log message 1");
expect(stdout).toContain("Log message 2");
expect(exitCode).toBe(0);
// Clean up
const stopProc = Bun.spawn({
cmd: [bunExe(), "stop", "logger.js"],
env: bunEnv,
cwd: String(dir),
});
await stopProc.exited;
});
test("bun start - prevents duplicate process names", async () => {
using dir = tempDir("process-manager-duplicate", {
"app.js": `
console.log("App started");
setInterval(() => {}, 1000);
`,
});
// Start first process
const start1 = Bun.spawn({
cmd: [bunExe(), "start", "app.js"],
env: bunEnv,
cwd: String(dir),
});
await start1.exited;
// Try to start again with same name
await using start2 = Bun.spawn({
cmd: [bunExe(), "start", "app.js"],
env: bunEnv,
cwd: String(dir),
stdout: "pipe",
stderr: "pipe",
});
const [stdout, stderr, exitCode] = await Promise.all([start2.stdout.text(), start2.stderr.text(), start2.exited]);
expect(exitCode).not.toBe(0);
expect(stderr.toLowerCase()).toMatch(/already|exists/);
// Clean up
const stopProc = Bun.spawn({
cmd: [bunExe(), "stop", "app.js"],
env: bunEnv,
cwd: String(dir),
});
await stopProc.exited;
});
test("bun list - shows empty list when no processes running", async () => {
using dir = tempDir("process-manager-empty");
await using listProc = Bun.spawn({
cmd: [bunExe(), "list"],
env: bunEnv,
cwd: String(dir),
stdout: "pipe",
stderr: "pipe",
});
const [stdout, stderr, exitCode] = await Promise.all([
listProc.stdout.text(),
listProc.stderr.text(),
listProc.exited,
]);
expect(stdout.toLowerCase()).toMatch(/no processes|not running/);
expect(exitCode).toBe(0);
});
test("workspace isolation - processes in different directories are separate", async () => {
using dir1 = tempDir("process-manager-ws1", {
"proc.js": `setInterval(() => {}, 1000);`,
});
using dir2 = tempDir("process-manager-ws2", {
"proc.js": `setInterval(() => {}, 1000);`,
});
// Start process in dir1
const start1 = Bun.spawn({
cmd: [bunExe(), "start", "proc.js"],
env: bunEnv,
cwd: String(dir1),
});
await start1.exited;
// Start process in dir2
const start2 = Bun.spawn({
cmd: [bunExe(), "start", "proc.js"],
env: bunEnv,
cwd: String(dir2),
});
await start2.exited;
// List in dir1 should only show dir1's process
const list1 = Bun.spawn({
cmd: [bunExe(), "list"],
env: bunEnv,
cwd: String(dir1),
stdout: "pipe",
});
const out1 = await list1.stdout.text();
await list1.exited;
// List in dir2 should only show dir2's process
const list2 = Bun.spawn({
cmd: [bunExe(), "list"],
env: bunEnv,
cwd: String(dir2),
stdout: "pipe",
});
const out2 = await list2.stdout.text();
await list2.exited;
// Both should show exactly one process
const count1 = (out1.match(/proc\.js/g) || []).length;
const count2 = (out2.match(/proc\.js/g) || []).length;
expect(count1).toBe(1);
expect(count2).toBe(1);
// Clean up
Bun.spawn({ cmd: [bunExe(), "stop", "proc.js"], env: bunEnv, cwd: String(dir1) });
Bun.spawn({ cmd: [bunExe(), "stop", "proc.js"], env: bunEnv, cwd: String(dir2) });
});
});