diff --git a/src/bun.js/bindings/webcore/WebSocket.cpp b/src/bun.js/bindings/webcore/WebSocket.cpp index 67d994b077..728e0120ad 100644 --- a/src/bun.js/bindings/webcore/WebSocket.cpp +++ b/src/bun.js/bindings/webcore/WebSocket.cpp @@ -185,6 +185,7 @@ WebSocket::WebSocket(ScriptExecutionContext& context) m_state = CONNECTING; m_hasPendingActivity.store(true); m_rejectUnauthorized = Bun__getTLSRejectUnauthorizedValue() != 0; + onDidChangeListener = &WebSocket::onDidChangeListenerImpl; } WebSocket::~WebSocket() @@ -1265,22 +1266,17 @@ void WebSocket::didReceiveMessage(String&& message) // } // } - if (this->hasEventListeners("message"_s)) { - // the main reason for dispatching on a separate tick is to handle when you haven't yet attached an event listener + if (m_hasMessageEventListener) { + // Dispatch immediately if we have a listener this->incPendingActivityCount(); dispatchEvent(MessageEvent::create(WTF::move(message), m_url.string())); this->decPendingActivityCount(); return; } - if (auto* context = scriptExecutionContext()) { - this->incPendingActivityCount(); - context->postTask([this, message_ = WTF::move(message), protectedThis = Ref { *this }](ScriptExecutionContext& context) { - ASSERT(scriptExecutionContext()); - protectedThis->dispatchEvent(MessageEvent::create(message_, protectedThis->m_url.string())); - protectedThis->decPendingActivityCount(); - }); - } + // Queue the message to be delivered when a listener is attached + // This mimics browser behavior where messages are buffered until onmessage is set + m_pendingMessages.append(QueuedTextMessage { WTF::move(message) }); // }); } @@ -1296,95 +1292,57 @@ void WebSocket::didReceiveBinaryData(const AtomString& eventName, const std::spa // if (auto* inspector = m_channel->channelInspector()) // inspector->didReceiveWebSocketFrame(WebSocketChannelInspector::createFrame(binaryData.data(), binaryData.size(), WebSocketFrame::OpCode::OpCodeBinary)); // } + + // For "message" events, check if we have a listener and queue if not. + // For "ping" and "pong" events, they have their own listeners, so we dispatch immediately. + bool isMessageEvent = eventName == eventNames().messageEvent; + bool hasListener = isMessageEvent ? m_hasMessageEventListener : this->hasEventListeners(eventName); + + if (!hasListener && isMessageEvent) { + // Queue the binary message to be delivered when a listener is attached + Vector dataCopy(binaryData.size()); + memcpy(dataCopy.begin(), binaryData.data(), binaryData.size()); + m_pendingMessages.append(QueuedBinaryMessage { eventName, WTF::move(dataCopy) }); + return; + } + + // Dispatch immediately if we have a listener switch (m_binaryType) { - case BinaryType::Blob: - if (this->hasEventListeners(eventName)) { - // the main reason for dispatching on a separate tick is to handle when you haven't yet attached an event listener - this->incPendingActivityCount(); - RefPtr blob = Blob::create(binaryData, scriptExecutionContext()->jsGlobalObject()); - dispatchEvent(MessageEvent::create(eventName, blob.releaseNonNull(), m_url.string())); - this->decPendingActivityCount(); - return; - } - - if (auto* context = scriptExecutionContext()) { - RefPtr blob = Blob::create(binaryData, context->jsGlobalObject()); - context->postTask([this, name = eventName, blob = blob.releaseNonNull(), protectedThis = Ref { *this }](ScriptExecutionContext& context) { - ASSERT(scriptExecutionContext()); - protectedThis->dispatchEvent(MessageEvent::create(name, blob, protectedThis->m_url.string())); - protectedThis->decPendingActivityCount(); - }); - } - + case BinaryType::Blob: { + this->incPendingActivityCount(); + RefPtr blob = Blob::create(binaryData, scriptExecutionContext()->jsGlobalObject()); + dispatchEvent(MessageEvent::create(eventName, blob.releaseNonNull(), m_url.string())); + this->decPendingActivityCount(); break; + } case BinaryType::ArrayBuffer: { - if (this->hasEventListeners(eventName)) { - // the main reason for dispatching on a separate tick is to handle when you haven't yet attached an event listener - this->incPendingActivityCount(); - dispatchEvent(MessageEvent::create(eventName, ArrayBuffer::create(binaryData), m_url.string())); - this->decPendingActivityCount(); - return; - } - - if (auto* context = scriptExecutionContext()) { - auto arrayBuffer = JSC::ArrayBuffer::create(binaryData); - this->incPendingActivityCount(); - context->postTask([this, name = eventName, buffer = WTF::move(arrayBuffer), protectedThis = Ref { *this }](ScriptExecutionContext& context) { - ASSERT(scriptExecutionContext()); - protectedThis->dispatchEvent(MessageEvent::create(name, buffer, m_url.string())); - protectedThis->decPendingActivityCount(); - }); - } - + this->incPendingActivityCount(); + dispatchEvent(MessageEvent::create(eventName, ArrayBuffer::create(binaryData), m_url.string())); + this->decPendingActivityCount(); break; } case BinaryType::NodeBuffer: { + this->incPendingActivityCount(); + auto scope = DECLARE_TOP_EXCEPTION_SCOPE(scriptExecutionContext()->vm()); + JSUint8Array* buffer = createBuffer(scriptExecutionContext()->jsGlobalObject(), binaryData); - if (this->hasEventListeners(eventName)) { - // the main reason for dispatching on a separate tick is to handle when you haven't yet attached an event listener - this->incPendingActivityCount(); - auto scope = DECLARE_TOP_EXCEPTION_SCOPE(scriptExecutionContext()->vm()); - JSUint8Array* buffer = createBuffer(scriptExecutionContext()->jsGlobalObject(), binaryData); + if (!buffer || scope.exception()) [[unlikely]] { + scope.clearExceptionExceptTermination(); - if (!buffer || scope.exception()) [[unlikely]] { - scope.clearExceptionExceptTermination(); - - ErrorEvent::Init errorInit; - errorInit.message = "Failed to allocate memory for binary data"_s; - dispatchEvent(ErrorEvent::create(eventNames().errorEvent, errorInit)); - this->decPendingActivityCount(); - return; - } - - JSC::EnsureStillAliveScope ensureStillAlive(buffer); - MessageEvent::Init init; - init.data = buffer; - init.origin = this->m_url.string(); - - dispatchEvent(MessageEvent::create(eventName, WTF::move(init), EventIsTrusted::Yes)); + ErrorEvent::Init errorInit; + errorInit.message = "Failed to allocate memory for binary data"_s; + dispatchEvent(ErrorEvent::create(eventNames().errorEvent, errorInit)); this->decPendingActivityCount(); return; } - if (auto* context = scriptExecutionContext()) { - auto arrayBuffer = JSC::ArrayBuffer::tryCreate(binaryData); - - this->incPendingActivityCount(); - - context->postTask([name = eventName, buffer = WTF::move(arrayBuffer), protectedThis = Ref { *this }](ScriptExecutionContext& context) { - size_t length = buffer->byteLength(); - auto* globalObject = context.jsGlobalObject(); - auto* subclassStructure = static_cast(globalObject)->JSBufferSubclassStructure(); - JSUint8Array* uint8array = JSUint8Array::create(globalObject, subclassStructure, buffer.copyRef(), 0, length); - JSC::EnsureStillAliveScope ensureStillAlive(uint8array); - MessageEvent::Init init; - init.data = uint8array; - init.origin = protectedThis->m_url.string(); - protectedThis->dispatchEvent(MessageEvent::create(name, WTF::move(init), EventIsTrusted::Yes)); - protectedThis->decPendingActivityCount(); - }); - } + JSC::EnsureStillAliveScope ensureStillAlive(buffer); + MessageEvent::Init init; + init.data = buffer; + init.origin = this->m_url.string(); + dispatchEvent(MessageEvent::create(eventName, WTF::move(init), EventIsTrusted::Yes)); + this->decPendingActivityCount(); break; } } @@ -1722,6 +1680,88 @@ void WebSocket::didConnectWithTunnel(void* tunnel, char* bufferedData, size_t bu WebSocketProxyTunnel__setConnectedWebSocket(tunnel, this->m_connectedWebSocket.client); } +void WebSocket::onDidChangeListenerImpl(EventTarget& self, const AtomString& eventType, OnDidChangeListenerKind kind) +{ + auto& webSocket = static_cast(self); + + if (eventType != eventNames().messageEvent) + return; + + if (kind == OnDidChangeListenerKind::Add) { + // Track that we have a message listener now + webSocket.m_hasMessageEventListener = true; + // Flush any pending messages that were queued before the listener was attached + webSocket.flushPendingMessages(); + } else if (kind == OnDidChangeListenerKind::Remove || kind == OnDidChangeListenerKind::Clear) { + // Update the flag based on whether there are still listeners + webSocket.m_hasMessageEventListener = webSocket.hasEventListeners(eventNames().messageEvent); + } +} + +void WebSocket::flushPendingMessages() +{ + if (m_state != OPEN || m_pendingMessages.isEmpty()) + return; + + // Move the queue to a local variable to avoid issues if new messages arrive during dispatch + auto pendingMessages = std::exchange(m_pendingMessages, Deque()); + + for (auto& queuedMessage : pendingMessages) { + if (m_state != OPEN) + break; + + std::visit([this](auto&& msg) { + using T = std::decay_t; + if constexpr (std::is_same_v) { + this->incPendingActivityCount(); + dispatchEvent(MessageEvent::create(WTF::move(msg.message), m_url.string())); + this->decPendingActivityCount(); + } else if constexpr (std::is_same_v) { + // Re-dispatch the binary data using the same path as normal delivery + std::span binaryData(msg.data.begin(), msg.data.size()); + switch (m_binaryType) { + case BinaryType::Blob: { + this->incPendingActivityCount(); + RefPtr blob = Blob::create(binaryData, scriptExecutionContext()->jsGlobalObject()); + dispatchEvent(MessageEvent::create(msg.eventName, blob.releaseNonNull(), m_url.string())); + this->decPendingActivityCount(); + break; + } + case BinaryType::ArrayBuffer: { + this->incPendingActivityCount(); + dispatchEvent(MessageEvent::create(msg.eventName, ArrayBuffer::create(binaryData), m_url.string())); + this->decPendingActivityCount(); + break; + } + case BinaryType::NodeBuffer: { + this->incPendingActivityCount(); + auto scope = DECLARE_TOP_EXCEPTION_SCOPE(scriptExecutionContext()->vm()); + JSUint8Array* buffer = createBuffer(scriptExecutionContext()->jsGlobalObject(), binaryData); + + if (!buffer || scope.exception()) [[unlikely]] { + scope.clearExceptionExceptTermination(); + ErrorEvent::Init errorInit; + errorInit.message = "Failed to allocate memory for binary data"_s; + dispatchEvent(ErrorEvent::create(eventNames().errorEvent, errorInit)); + this->decPendingActivityCount(); + return; + } + + JSC::EnsureStillAliveScope ensureStillAlive(buffer); + MessageEvent::Init init; + init.data = buffer; + init.origin = m_url.string(); + + dispatchEvent(MessageEvent::create(msg.eventName, WTF::move(init), EventIsTrusted::Yes)); + this->decPendingActivityCount(); + break; + } + } + } + }, queuedMessage); + } +} + } // namespace WebCore extern "C" void WebSocket__didConnect(WebCore::WebSocket* webSocket, us_socket_t* socket, char* bufferedData, size_t len, const PerMessageDeflateParams* deflate_params, void* customSSLCtx) diff --git a/src/bun.js/bindings/webcore/WebSocket.h b/src/bun.js/bindings/webcore/WebSocket.h index b993aab6c3..b2c08a91bb 100644 --- a/src/bun.js/bindings/webcore/WebSocket.h +++ b/src/bun.js/bindings/webcore/WebSocket.h @@ -37,6 +37,8 @@ #include #include #include +#include +#include #include "FetchHeaders.h" #include "WebSocketErrorCode.h" @@ -259,6 +261,22 @@ private: bool m_dispatchedErrorEvent { false }; // RefPtr> m_pendingActivity; + + // Message queue for buffering messages when no listener is attached + // This mimics browser behavior where messages are queued until a handler is set + struct QueuedTextMessage { + String message; + }; + struct QueuedBinaryMessage { + AtomString eventName; + Vector data; + }; + using QueuedMessage = std::variant; + Deque m_pendingMessages; + bool m_hasMessageEventListener { false }; + + void flushPendingMessages(); + static void onDidChangeListenerImpl(EventTarget& self, const AtomString& eventType, OnDidChangeListenerKind kind); }; } // namespace WebCore diff --git a/test/regression/issue/26560.test.ts b/test/regression/issue/26560.test.ts new file mode 100644 index 0000000000..8587600e2c --- /dev/null +++ b/test/regression/issue/26560.test.ts @@ -0,0 +1,265 @@ +import { expect, test } from "bun:test"; + +// Test that WebSocket messages sent immediately after handshake are not lost +// when onmessage handler is not set at the time of message arrival. +// Browsers queue these messages until a handler is attached. +// See: https://github.com/oven-sh/bun/issues/26560 + +test("WebSocket messages should be buffered when no listener is attached", async () => { + const server = Bun.serve({ + port: 0, + fetch(req, server) { + if (server.upgrade(req)) return; + return new Response("Not found", { status: 404 }); + }, + websocket: { + open(ws) { + // Send messages immediately when the connection opens + ws.send("message1"); + ws.send("message2"); + ws.send("message3"); + }, + message() {}, + close() {}, + }, + }); + + try { + const ws = new WebSocket(`ws://localhost:${server.port}`); + + const received: string[] = []; + const { promise, resolve } = Promise.withResolvers(); + + // Wait a bit before attaching the handler to ensure messages arrive first + await Bun.sleep(50); + + ws.onmessage = event => { + received.push(event.data); + if (received.length === 3) { + resolve(); + } + }; + + // Wait for all messages or timeout + await Promise.race([ + promise, + Bun.sleep(1000).then(() => { + throw new Error(`Timeout: Only received ${received.length} messages: ${JSON.stringify(received)}`); + }), + ]); + + expect(received).toEqual(["message1", "message2", "message3"]); + + ws.close(); + } finally { + server.stop(); + } +}); + +test("WebSocket messages should be buffered using addEventListener", async () => { + const server = Bun.serve({ + port: 0, + fetch(req, server) { + if (server.upgrade(req)) return; + return new Response("Not found", { status: 404 }); + }, + websocket: { + open(ws) { + ws.send("hello"); + ws.send("world"); + }, + message() {}, + close() {}, + }, + }); + + try { + const ws = new WebSocket(`ws://localhost:${server.port}`); + + const received: string[] = []; + const { promise, resolve } = Promise.withResolvers(); + + // Wait before adding event listener + await Bun.sleep(50); + + ws.addEventListener("message", event => { + received.push(event.data); + if (received.length === 2) { + resolve(); + } + }); + + await Promise.race([ + promise, + Bun.sleep(1000).then(() => { + throw new Error(`Timeout: Only received ${received.length} messages: ${JSON.stringify(received)}`); + }), + ]); + + expect(received).toEqual(["hello", "world"]); + + ws.close(); + } finally { + server.stop(); + } +}); + +test("WebSocket binary messages should be buffered when no listener is attached", async () => { + const server = Bun.serve({ + port: 0, + fetch(req, server) { + if (server.upgrade(req)) return; + return new Response("Not found", { status: 404 }); + }, + websocket: { + open(ws) { + ws.send(new Uint8Array([1, 2, 3])); + ws.send(new Uint8Array([4, 5, 6])); + }, + message() {}, + close() {}, + }, + }); + + try { + const ws = new WebSocket(`ws://localhost:${server.port}`); + ws.binaryType = "arraybuffer"; + + const received: ArrayBuffer[] = []; + const { promise, resolve } = Promise.withResolvers(); + + // Wait before adding handler + await Bun.sleep(50); + + ws.onmessage = event => { + received.push(event.data); + if (received.length === 2) { + resolve(); + } + }; + + await Promise.race([ + promise, + Bun.sleep(1000).then(() => { + throw new Error(`Timeout: Only received ${received.length} messages`); + }), + ]); + + expect(received.length).toBe(2); + expect(new Uint8Array(received[0])).toEqual(new Uint8Array([1, 2, 3])); + expect(new Uint8Array(received[1])).toEqual(new Uint8Array([4, 5, 6])); + + ws.close(); + } finally { + server.stop(); + } +}); + +test("WebSocket messages sent after listener attached should be received immediately", async () => { + const server = Bun.serve({ + port: 0, + fetch(req, server) { + if (server.upgrade(req)) return; + return new Response("Not found", { status: 404 }); + }, + websocket: { + open(ws) { + // Don't send immediately, wait for client to be ready + }, + message(ws, message) { + // Echo back + ws.send("response: " + message); + }, + close() {}, + }, + }); + + try { + const ws = new WebSocket(`ws://localhost:${server.port}`); + + const received: string[] = []; + const { promise, resolve } = Promise.withResolvers(); + + // Attach listener immediately + ws.onmessage = event => { + received.push(event.data); + if (received.length === 1) { + resolve(); + } + }; + + ws.onopen = () => { + ws.send("test"); + }; + + await Promise.race([ + promise, + Bun.sleep(1000).then(() => { + throw new Error(`Timeout: Only received ${received.length} messages`); + }), + ]); + + expect(received).toEqual(["response: test"]); + + ws.close(); + } finally { + server.stop(); + } +}); + +test("WebSocket should handle mixed queued and live messages", async () => { + let serverWs: any = null; + + const server = Bun.serve({ + port: 0, + fetch(req, server) { + if (server.upgrade(req)) return; + return new Response("Not found", { status: 404 }); + }, + websocket: { + open(ws) { + serverWs = ws; + // Send some messages immediately + ws.send("queued1"); + ws.send("queued2"); + }, + message() {}, + close() {}, + }, + }); + + try { + const ws = new WebSocket(`ws://localhost:${server.port}`); + + const received: string[] = []; + const { promise, resolve } = Promise.withResolvers(); + + // Wait for queued messages to arrive + await Bun.sleep(50); + + ws.onmessage = event => { + received.push(event.data); + if (received.length === 4) { + resolve(); + } + }; + + // Give flush a moment to happen, then send more messages + await Bun.sleep(10); + serverWs.send("live1"); + serverWs.send("live2"); + + await Promise.race([ + promise, + Bun.sleep(1000).then(() => { + throw new Error(`Timeout: Only received ${received.length} messages: ${JSON.stringify(received)}`); + }), + ]); + + expect(received).toEqual(["queued1", "queued2", "live1", "live2"]); + + ws.close(); + } finally { + server.stop(); + } +});