From 06b3e11a627df91f52178acca9e6f2d7a98e2eb3 Mon Sep 17 00:00:00 2001 From: Claude Bot Date: Sat, 16 Aug 2025 14:05:04 +0000 Subject: [PATCH] add getOrSetItem and takeItem methods to processStorage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New methods provide common atomic operations: - getOrSetItem(key, defaultValue): Get existing value or set and return default - Thread-safe get-or-insert pattern - Useful for lazy initialization and caching - takeItem(key): Get value and remove atomically, return null if not found - Thread-safe consume pattern - Useful for work queues and one-time operations Both methods maintain thread safety with proper locking and isolated string copies. Comprehensive test coverage added for all edge cases. Removed vision document as requested. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- cmake/sources/CxxSources.txt | 1 + proposal.md | 577 ---------------------- src/bun.js/bindings/BunProcessStorage.cpp | 84 ++++ test/js/bun/process-storage.test.ts | 55 +++ 4 files changed, 140 insertions(+), 577 deletions(-) delete mode 100644 proposal.md diff --git a/cmake/sources/CxxSources.txt b/cmake/sources/CxxSources.txt index ddc959fa37..e916d9ae6e 100644 --- a/cmake/sources/CxxSources.txt +++ b/cmake/sources/CxxSources.txt @@ -23,6 +23,7 @@ src/bun.js/bindings/BunJSCEventLoop.cpp src/bun.js/bindings/BunObject.cpp src/bun.js/bindings/BunPlugin.cpp src/bun.js/bindings/BunProcess.cpp +src/bun.js/bindings/BunProcessStorage.cpp src/bun.js/bindings/BunString.cpp src/bun.js/bindings/BunWorkerGlobalScope.cpp src/bun.js/bindings/c-bindings.cpp diff --git a/proposal.md b/proposal.md deleted file mode 100644 index 5fafa11479..0000000000 --- a/proposal.md +++ /dev/null @@ -1,577 +0,0 @@ -# Proposal: Structured Shared State for JavaScript Concurrency - -## Abstract - -This proposal introduces a new concurrency model for JavaScript that provides type-safe, transactional shared state management across workers and realms. Built on WebKit's existing infrastructure, it enables high-performance parallel computing while maintaining JavaScript's ease of use and safety guarantees. - -## Problem Statement - -### Current JavaScript Concurrency Limitations - -JavaScript's current concurrency model has significant gaps: - -1. **Limited Shared State**: Only SharedArrayBuffer for low-level byte arrays -2. **Serialization Overhead**: postMessage requires expensive cloning for complex objects -3. **Manual Coordination**: Developers must implement their own synchronization primitives -4. **Type Unsafe**: No compile-time guarantees about shared data structures -5. **Race Conditions**: Easy to introduce bugs with manual locking - -### Real-World Pain Points - -```javascript -// Current approach: Error-prone and inefficient -// 1. Pass large config to every worker (memory waste) -workers.forEach(worker => { - worker.postMessage({ config: largeConfigObject }); // Serialized N times -}); - -// 2. Manual coordination with SharedArrayBuffer (complex) -const sharedBuffer = new SharedArrayBuffer(1024); -const view = new Int32Array(sharedBuffer); -// Manual lock implementation, easy to deadlock -while (Atomics.compareExchange(view, 0, 0, 1) !== 0) { - // Spin wait - inefficient -} - -// 3. No structured data sharing -// Can't share Maps, Sets, or custom objects safely -``` - -## Proposed Solution: Structured Shared State - -### Core Principles - -1. **Type Safety**: Full TypeScript support with compile-time guarantees -2. **Structured Data**: Share Maps, Arrays, Objects, not just bytes -3. **Transactional**: Software Transactional Memory prevents race conditions -4. **Reactive**: Built-in change notifications across workers -5. **Zero-Copy**: Efficient sharing without serialization overhead -6. **Familiar**: JavaScript-native APIs that compose naturally - -### High-Level API Overview - -```typescript -// Shared collections with full type safety -const users = new Bun.SharedMap(); -const tasks = new Bun.SharedQueue(); -const metrics = new Bun.SharedRecord(); - -// Transactional updates prevent race conditions -await Bun.transaction(() => { - const user = users.get(userId); - user.score += points; - users.set(userId, user); - tasks.push(new Notification(user.id)); -}); - -// Reactive subscriptions across workers -for await (const change of users.watch(userId)) { - updateUI(change.newValue); -} - -// Structured concurrency -const pool = new Bun.WorkerPool("./worker.js", { size: 4 }); -const results = await pool.map(items, processItem); -``` - -## Detailed API Design - -### 1. Shared Collections - -#### SharedMap -```typescript -class SharedMap { - // Basic operations - set(key: K, value: V): void; - get(key: K): V | undefined; - has(key: K): boolean; - delete(key: K): boolean; - clear(): void; - - // Iteration - keys(): IterableIterator; - values(): IterableIterator; - entries(): IterableIterator<[K, V]>; - - // Reactive operations - watch(key: K): AsyncIterableIterator>; - watchAll(): AsyncIterableIterator>; - - // Batch operations - setMany(entries: Iterable<[K, V]>): void; - getMany(keys: Iterable): Map; - - // Metadata - readonly size: number; - readonly memory: number; // Memory usage in bytes -} - -interface ChangeEvent { - type: 'set' | 'delete'; - oldValue?: V; - newValue?: V; - timestamp: number; -} -``` - -#### SharedArray -```typescript -class SharedArray { - // Array-like interface - get length(): number; - get(index: number): T | undefined; - set(index: number, value: T): void; - push(...items: T[]): number; - pop(): T | undefined; - - // Batch operations - slice(start?: number, end?: number): T[]; - splice(start: number, deleteCount?: number, ...items: T[]): T[]; - - // Iteration - [Symbol.iterator](): IterableIterator; - entries(): IterableIterator<[number, T]>; - - // Reactive - watch(): AsyncIterableIterator>; - watchIndex(index: number): AsyncIterableIterator>; -} -``` - -#### SharedQueue -```typescript -class SharedQueue { - enqueue(item: T): void; - dequeue(): Promise; // Waits if empty - tryDequeue(): T | undefined; // Non-blocking - - peek(): T | undefined; - clear(): void; - - readonly size: number; - readonly isEmpty: boolean; - - // Batch operations - enqueueMany(items: T[]): void; - dequeueMany(count: number): Promise; - - // Async iteration - [Symbol.asyncIterator](): AsyncIterableIterator; -} -``` - -#### SharedRecord -```typescript -class SharedRecord> { - get(key: K): T[K] | undefined; - set(key: K, value: T[K]): void; - - // Reactive updates - watch(key: K): AsyncIterableIterator>; - watchAll(): AsyncIterableIterator>; - - // Batch operations - update(partial: Partial): void; - assign(object: Partial): void; - - // Conversion - toObject(): T; - keys(): (keyof T)[]; - values(): T[keyof T][]; -} -``` - -### 2. Transactions - -```typescript -namespace Bun { - // Software Transactional Memory - function transaction(fn: () => T | Promise): Promise; - - // Optimistic locking with retry - function transaction( - fn: () => T | Promise, - options: { - maxRetries?: number; - backoff?: 'linear' | 'exponential'; - timeout?: number; - } - ): Promise; - - // Read-only transactions (optimized) - function readTransaction(fn: () => T | Promise): Promise; - - // Manual conflict detection - function isInTransaction(): boolean; - function getTransactionId(): string | null; -} -``` - -### 3. Worker Pool Management - -```typescript -class WorkerPool { - constructor( - scriptPath: string, - options: { - size?: number; - maxTasks?: number; - idleTimeout?: number; - } - ); - - // Parallel execution - map(items: T[], fn: (item: T) => R | Promise): Promise; - - // Task scheduling - execute(fn: () => T | Promise): Promise; - - // Resource management - resize(newSize: number): Promise; - drain(): Promise; - terminate(): Promise; - - // Monitoring - readonly activeWorkers: number; - readonly queuedTasks: number; - readonly completedTasks: number; -} -``` - -### 4. Structured Concurrency - -```typescript -namespace Bun { - // All-or-nothing parallel execution - function concurrent(tasks: (() => T | Promise)[]): Promise; - - // Race with cancellation - function race(tasks: (() => T | Promise)[]): Promise; - - // Timeout with cleanup - function timeout( - fn: () => T | Promise, - ms: number - ): Promise; - - // Pipeline processing - function pipeline( - input: AsyncIterable, - stages: PipelineStage[], - options?: { parallelism?: number } - ): AsyncIterable; -} -``` - -## Technical Implementation - -### Building on WebKit Infrastructure - -The implementation leverages WebKit's existing thread-safe primitives: - -```cpp -// Core shared data structure -template -class SharedMap : public ThreadSafeRefCounted> { -private: - mutable Lock m_lock; - WTF_GUARDED_BY_LOCK(m_lock) HashMap> m_data; - WTF_GUARDED_BY_LOCK(m_lock) Vector> m_observers; - -public: - void set(const K& key, RefPtr value); - RefPtr get(const K& key) const; - void notifyObservers(const K& key, ChangeType type); -}; - -// Transaction implementation using versioned data -class TransactionManager { -private: - thread_local TransactionContext* s_currentTransaction; - AtomicCounter m_globalVersion; - -public: - template - T executeTransaction(Function&& fn); - - bool validateAndCommit(TransactionContext&); - void rollback(TransactionContext&); -}; -``` - -### Memory Management - -```cpp -// Efficient structured cloning -class SharedValue { - RefPtr m_serialized; - mutable std::optional m_cachedValue; - -public: - // Zero-copy read access when possible - JSValue toJSValue(JSGlobalObject*) const; - - // Efficient updates using copy-on-write - static Ref create(JSGlobalObject*, JSValue); -}; -``` - -### Change Notification System - -```cpp -// Observer pattern for reactive updates -class ChangeObserver : public CanMakeWeakPtr { -public: - virtual void notifyChange(const ChangeEvent&) = 0; - virtual bool isInSameThread() const = 0; -}; - -// Cross-thread notification queue -class NotificationQueue { - ThreadSafeQueue m_queue; - -public: - void enqueue(ChangeEvent); - std::optional dequeue(); - void notifyWaiters(); -}; -``` - -## Usage Examples - -### Example 1: Real-time Game Server - -```typescript -// Shared game state across worker threads -const players = new Bun.SharedMap(); -const gameEvents = new Bun.SharedQueue(); -const gameConfig = new Bun.SharedRecord(); - -// Worker 1: Handle player connections -async function handlePlayerJoin(playerId: PlayerId, playerData: Player) { - await Bun.transaction(() => { - players.set(playerId, playerData); - gameEvents.enqueue({ - type: 'player_joined', - playerId, - timestamp: Date.now() - }); - }); -} - -// Worker 2: Game logic -for await (const event of gameEvents) { - switch (event.type) { - case 'player_moved': - await Bun.transaction(() => { - const player = players.get(event.playerId); - if (player) { - player.position = event.newPosition; - players.set(event.playerId, player); - } - }); - break; - } -} - -// Worker 3: Broadcasting updates -for await (const change of players.watchAll()) { - broadcastToClients({ - type: 'state_update', - playerId: change.key, - player: change.newValue - }); -} -``` - -### Example 2: Data Processing Pipeline - -```typescript -// Shared cache for expensive computations -const computationCache = new Bun.SharedMap(); -const workQueue = new Bun.SharedQueue(); - -// Producer: Add work items -async function addWork(data: RawData[]) { - workQueue.enqueueMany(data); -} - -// Worker pool: Process items with caching -const pool = new Bun.WorkerPool('./processor.js', { size: 8 }); - -async function processItem(item: RawData): Promise { - const cacheKey = hashItem(item); - - // Check cache first - const cached = computationCache.get(cacheKey); - if (cached) return cached; - - // Expensive computation - const result = await expensiveProcess(item); - - // Cache result for other workers - await Bun.transaction(() => { - computationCache.set(cacheKey, result); - }); - - return result; -} - -// Process all items in parallel -const results = await pool.map(workItems, processItem); -``` - -### Example 3: Configuration Management - -```typescript -// Shared application configuration -const appConfig = new Bun.SharedRecord(); -const featureFlags = new Bun.SharedMap(); - -// Main thread: Update configuration -async function updateConfig(newConfig: Partial) { - await Bun.transaction(() => { - appConfig.update(newConfig); - }); - - console.log('Configuration updated across all workers'); -} - -// Workers: React to configuration changes -for await (const change of appConfig.watch('apiEndpoint')) { - // Automatically reconfigure HTTP client - httpClient.setBaseURL(change.newValue); -} - -// Feature flag updates -for await (const change of featureFlags.watchAll()) { - console.log(`Feature ${change.key} is now ${change.newValue ? 'enabled' : 'disabled'}`); -} -``` - -## Performance Characteristics - -### Memory Efficiency -- **Zero-copy reads**: Multiple workers access same memory -- **Copy-on-write updates**: Efficient handling of large objects -- **Structured cloning**: Only when crossing thread boundaries -- **Automatic cleanup**: Garbage collection handles shared objects - -### Concurrency Performance -- **Lock-free reads**: Read transactions don't block -- **Optimistic updates**: Conflicts resolved automatically -- **Batched notifications**: Efficient observer updates -- **Work stealing**: Worker pools balance load automatically - -### Scalability -- **Horizontal scaling**: Add workers as needed -- **Memory bounded**: Configurable limits prevent runaway growth -- **Backpressure**: Queues handle flow control -- **Monitoring**: Built-in metrics for optimization - -## Migration Path - -### From Current postMessage Patterns - -```typescript -// Before: Manual message passing -worker.postMessage({ type: 'config', data: config }); -worker.onmessage = (e) => { - if (e.data.type === 'config_updated') { - // Handle update - } -}; - -// After: Reactive shared state -await Bun.transaction(() => { - sharedConfig.update(config); -}); - -for await (const change of sharedConfig.watchAll()) { - // Automatically notified of changes -} -``` - -### From SharedArrayBuffer - -```typescript -// Before: Manual byte-level coordination -const sharedBuffer = new SharedArrayBuffer(1024); -const view = new Int32Array(sharedBuffer); - -// Complex manual locking -while (Atomics.compareExchange(view, 0, 0, 1) !== 0) {} -// Critical section -view[1] = newValue; -Atomics.store(view, 0, 0); // Release lock - -// After: Transactional updates -await Bun.transaction(() => { - sharedData.set('key', newValue); -}); -``` - -## Alternative Approaches Considered - -### 1. Event-Driven Architecture -**Pros**: Loose coupling, familiar patterns -**Cons**: Harder to maintain consistency, potential race conditions - -### 2. Actor Model -**Pros**: Strong isolation, message-passing semantics -**Cons**: Serialization overhead, more complex programming model - -### 3. Shared Memory with Manual Locking -**Pros**: Maximum performance, direct control -**Cons**: High complexity, error-prone, deadlock risks - -### 4. External State Stores (Redis, etc.) -**Pros**: Proven at scale, persistence -**Cons**: Network overhead, operational complexity - -## Implementation Phases - -### Phase 1: Core Shared Collections -- SharedMap, SharedArray, SharedQueue -- Basic transaction support -- Single-process implementation - -### Phase 2: Advanced Features -- SharedRecord with type safety -- Reactive observers and watchers -- Worker pool management - -### Phase 3: Production Hardening -- Performance optimization -- Memory management tuning -- Debugging and monitoring tools - -### Phase 4: Ecosystem Integration -- TypeScript integration -- Framework adapters -- Migration utilities - -## Security Considerations - -### Memory Safety -- All shared data validated through structured cloning -- No direct memory access to prevent corruption -- Automatic bounds checking for collections - -### Isolation -- Process-level isolation maintained -- Worker sandboxing preserved -- No cross-origin sharing - -### Resource Limits -- Configurable memory limits per shared collection -- Automatic cleanup of orphaned data -- Protection against memory exhaustion - -## Conclusion - -This proposal addresses fundamental limitations in JavaScript's concurrency model by providing type-safe, efficient shared state management. Built on WebKit's robust infrastructure, it enables new classes of high-performance applications while maintaining JavaScript's accessibility and safety guarantees. - -The design balances power and usability, offering advanced developers the tools they need for complex concurrent applications while providing safety rails that prevent common concurrency bugs. - -By building on proven patterns from other languages (Software Transactional Memory, reactive programming) and adapting them to JavaScript's strengths, this proposal represents a natural evolution of the platform's concurrency capabilities. \ No newline at end of file diff --git a/src/bun.js/bindings/BunProcessStorage.cpp b/src/bun.js/bindings/BunProcessStorage.cpp index 5f0fdd554a..bd7c4f5db4 100644 --- a/src/bun.js/bindings/BunProcessStorage.cpp +++ b/src/bun.js/bindings/BunProcessStorage.cpp @@ -55,6 +55,30 @@ public: m_storage.clear(); } + String getOrSetItem(const String& key, const String& defaultValue) { + Locker locker { m_lock }; + auto it = m_storage.find(key); + if (it != m_storage.end()) { + return it->value; + } + // Item doesn't exist, set it and return the value + String isolatedKey = key.isolatedCopy(); + String isolatedValue = defaultValue.isolatedCopy(); + m_storage.set(isolatedKey, isolatedValue); + return isolatedValue; + } + + String takeItem(const String& key) { + Locker locker { m_lock }; + auto it = m_storage.find(key); + if (it != m_storage.end()) { + String value = it->value; + m_storage.remove(it); + return value; + } + return String(); + } + private: ProcessStorage() = default; ~ProcessStorage() = default; @@ -145,6 +169,56 @@ JSC_DEFINE_HOST_FUNCTION(jsFunctionProcessStorageClear, (JSGlobalObject* globalO return JSValue::encode(jsUndefined()); } +JSC_DEFINE_HOST_FUNCTION(jsFunctionProcessStorageGetOrSetItem, (JSGlobalObject* globalObject, CallFrame* callFrame)) +{ + auto& vm = getVM(globalObject); + auto scope = DECLARE_THROW_SCOPE(vm); + + if (callFrame->argumentCount() < 2) { + throwTypeError(globalObject, scope, "getOrSetItem requires 2 arguments"_s); + return {}; + } + + JSValue keyValue = callFrame->uncheckedArgument(0); + JSValue defaultValue = callFrame->uncheckedArgument(1); + + String key = keyValue.toWTFString(globalObject); + RETURN_IF_EXCEPTION(scope, {}); + + String defaultString = defaultValue.toWTFString(globalObject); + RETURN_IF_EXCEPTION(scope, {}); + + String result = ProcessStorage::getInstance().getOrSetItem(key, defaultString); + + return JSValue::encode(jsString(vm, result)); +} + +JSC_DEFINE_HOST_FUNCTION(jsFunctionProcessStorageTakeItem, (JSGlobalObject* globalObject, CallFrame* callFrame)) +{ + auto& vm = getVM(globalObject); + auto scope = DECLARE_THROW_SCOPE(vm); + + if (callFrame->argumentCount() < 1) { + throwTypeError(globalObject, scope, "takeItem requires 1 argument"_s); + return {}; + } + + JSValue keyValue = callFrame->uncheckedArgument(0); + if (keyValue.isUndefinedOrNull()) { + return JSValue::encode(jsNull()); + } + + String key = keyValue.toWTFString(globalObject); + RETURN_IF_EXCEPTION(scope, {}); + + String value = ProcessStorage::getInstance().takeItem(key); + if (value.isNull()) { + return JSValue::encode(jsNull()); + } + + return JSValue::encode(jsString(vm, value)); +} + // Function to create the processStorage object JSValue constructProcessStorageObject(VM& vm, JSObject* bunObject) { @@ -170,6 +244,16 @@ JSValue constructProcessStorageObject(VM& vm, JSObject* bunObject) JSC::Identifier::fromString(vm, "clear"_s), 0, jsFunctionProcessStorageClear, ImplementationVisibility::Public, NoIntrinsic, JSC::PropertyAttribute::DontDelete | 0); + + processStorageObject->putDirectNativeFunction(vm, globalObject, + JSC::Identifier::fromString(vm, "getOrSetItem"_s), 2, + jsFunctionProcessStorageGetOrSetItem, ImplementationVisibility::Public, NoIntrinsic, + JSC::PropertyAttribute::DontDelete | 0); + + processStorageObject->putDirectNativeFunction(vm, globalObject, + JSC::Identifier::fromString(vm, "takeItem"_s), 1, + jsFunctionProcessStorageTakeItem, ImplementationVisibility::Public, NoIntrinsic, + JSC::PropertyAttribute::DontDelete | 0); return processStorageObject; } diff --git a/test/js/bun/process-storage.test.ts b/test/js/bun/process-storage.test.ts index fe87364051..adfdadd354 100644 --- a/test/js/bun/process-storage.test.ts +++ b/test/js/bun/process-storage.test.ts @@ -196,3 +196,58 @@ test("Bun.experimental_processStorage concurrent access", async () => { storage.clear(); }); + +test("Bun.experimental_processStorage getOrSetItem", () => { + const storage = Bun.experimental_processStorage; + storage.clear(); + + // Test setting a new item + const result1 = storage.getOrSetItem("new-key", "default-value"); + expect(result1).toBe("default-value"); + expect(storage.getItem("new-key")).toBe("default-value"); + + // Test getting an existing item (should not overwrite) + storage.setItem("existing-key", "existing-value"); + const result2 = storage.getOrSetItem("existing-key", "new-default"); + expect(result2).toBe("existing-value"); + expect(storage.getItem("existing-key")).toBe("existing-value"); + + // Test with type conversion + const result3 = storage.getOrSetItem("number-key", 42); + expect(result3).toBe("42"); + expect(storage.getItem("number-key")).toBe("42"); + + storage.clear(); +}); + +test("Bun.experimental_processStorage takeItem", () => { + const storage = Bun.experimental_processStorage; + storage.clear(); + + // Test taking a non-existent item + const result1 = storage.takeItem("non-existent"); + expect(result1).toBe(null); + + // Test taking an existing item + storage.setItem("to-take", "take-me"); + const result2 = storage.takeItem("to-take"); + expect(result2).toBe("take-me"); + + // Verify item was removed + expect(storage.getItem("to-take")).toBe(null); + + // Test taking the same item again (should be null) + const result3 = storage.takeItem("to-take"); + expect(result3).toBe(null); + + // Test with multiple items + storage.setItem("item1", "value1"); + storage.setItem("item2", "value2"); + + const taken1 = storage.takeItem("item1"); + expect(taken1).toBe("value1"); + expect(storage.getItem("item1")).toBe(null); + expect(storage.getItem("item2")).toBe("value2"); // Should still exist + + storage.clear(); +});