fix(websocket): buffer messages when no listener is attached

Fixes #26560

WebSocket messages sent immediately after handshake were lost when the
`onmessage` handler was not set at the time of message arrival. Browsers
queue these messages until a handler is attached, but Bun was discarding
them.

Changes:
- Add message queue to WebSocket class for buffering messages
- When a message arrives and no listener is attached, queue it instead
  of dispatching to nothing
- When a message listener is first added (via onmessage or
  addEventListener), flush all queued messages
- Support both text and binary message buffering

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Claude Bot
2026-01-29 15:57:36 +00:00
parent 7ebfdf97a8
commit 203e5e9914
3 changed files with 409 additions and 86 deletions

View File

@@ -185,6 +185,7 @@ WebSocket::WebSocket(ScriptExecutionContext& context)
m_state = CONNECTING;
m_hasPendingActivity.store(true);
m_rejectUnauthorized = Bun__getTLSRejectUnauthorizedValue() != 0;
onDidChangeListener = &WebSocket::onDidChangeListenerImpl;
}
WebSocket::~WebSocket()
@@ -1265,22 +1266,17 @@ void WebSocket::didReceiveMessage(String&& message)
// }
// }
if (this->hasEventListeners("message"_s)) {
// the main reason for dispatching on a separate tick is to handle when you haven't yet attached an event listener
if (m_hasMessageEventListener) {
// Dispatch immediately if we have a listener
this->incPendingActivityCount();
dispatchEvent(MessageEvent::create(WTF::move(message), m_url.string()));
this->decPendingActivityCount();
return;
}
if (auto* context = scriptExecutionContext()) {
this->incPendingActivityCount();
context->postTask([this, message_ = WTF::move(message), protectedThis = Ref { *this }](ScriptExecutionContext& context) {
ASSERT(scriptExecutionContext());
protectedThis->dispatchEvent(MessageEvent::create(message_, protectedThis->m_url.string()));
protectedThis->decPendingActivityCount();
});
}
// Queue the message to be delivered when a listener is attached
// This mimics browser behavior where messages are buffered until onmessage is set
m_pendingMessages.append(QueuedTextMessage { WTF::move(message) });
// });
}
@@ -1296,95 +1292,57 @@ void WebSocket::didReceiveBinaryData(const AtomString& eventName, const std::spa
// if (auto* inspector = m_channel->channelInspector())
// inspector->didReceiveWebSocketFrame(WebSocketChannelInspector::createFrame(binaryData.data(), binaryData.size(), WebSocketFrame::OpCode::OpCodeBinary));
// }
// For "message" events, check if we have a listener and queue if not.
// For "ping" and "pong" events, they have their own listeners, so we dispatch immediately.
bool isMessageEvent = eventName == eventNames().messageEvent;
bool hasListener = isMessageEvent ? m_hasMessageEventListener : this->hasEventListeners(eventName);
if (!hasListener && isMessageEvent) {
// Queue the binary message to be delivered when a listener is attached
Vector<uint8_t> dataCopy(binaryData.size());
memcpy(dataCopy.begin(), binaryData.data(), binaryData.size());
m_pendingMessages.append(QueuedBinaryMessage { eventName, WTF::move(dataCopy) });
return;
}
// Dispatch immediately if we have a listener
switch (m_binaryType) {
case BinaryType::Blob:
if (this->hasEventListeners(eventName)) {
// the main reason for dispatching on a separate tick is to handle when you haven't yet attached an event listener
this->incPendingActivityCount();
RefPtr<Blob> blob = Blob::create(binaryData, scriptExecutionContext()->jsGlobalObject());
dispatchEvent(MessageEvent::create(eventName, blob.releaseNonNull(), m_url.string()));
this->decPendingActivityCount();
return;
}
if (auto* context = scriptExecutionContext()) {
RefPtr<Blob> blob = Blob::create(binaryData, context->jsGlobalObject());
context->postTask([this, name = eventName, blob = blob.releaseNonNull(), protectedThis = Ref { *this }](ScriptExecutionContext& context) {
ASSERT(scriptExecutionContext());
protectedThis->dispatchEvent(MessageEvent::create(name, blob, protectedThis->m_url.string()));
protectedThis->decPendingActivityCount();
});
}
case BinaryType::Blob: {
this->incPendingActivityCount();
RefPtr<Blob> blob = Blob::create(binaryData, scriptExecutionContext()->jsGlobalObject());
dispatchEvent(MessageEvent::create(eventName, blob.releaseNonNull(), m_url.string()));
this->decPendingActivityCount();
break;
}
case BinaryType::ArrayBuffer: {
if (this->hasEventListeners(eventName)) {
// the main reason for dispatching on a separate tick is to handle when you haven't yet attached an event listener
this->incPendingActivityCount();
dispatchEvent(MessageEvent::create(eventName, ArrayBuffer::create(binaryData), m_url.string()));
this->decPendingActivityCount();
return;
}
if (auto* context = scriptExecutionContext()) {
auto arrayBuffer = JSC::ArrayBuffer::create(binaryData);
this->incPendingActivityCount();
context->postTask([this, name = eventName, buffer = WTF::move(arrayBuffer), protectedThis = Ref { *this }](ScriptExecutionContext& context) {
ASSERT(scriptExecutionContext());
protectedThis->dispatchEvent(MessageEvent::create(name, buffer, m_url.string()));
protectedThis->decPendingActivityCount();
});
}
this->incPendingActivityCount();
dispatchEvent(MessageEvent::create(eventName, ArrayBuffer::create(binaryData), m_url.string()));
this->decPendingActivityCount();
break;
}
case BinaryType::NodeBuffer: {
this->incPendingActivityCount();
auto scope = DECLARE_TOP_EXCEPTION_SCOPE(scriptExecutionContext()->vm());
JSUint8Array* buffer = createBuffer(scriptExecutionContext()->jsGlobalObject(), binaryData);
if (this->hasEventListeners(eventName)) {
// the main reason for dispatching on a separate tick is to handle when you haven't yet attached an event listener
this->incPendingActivityCount();
auto scope = DECLARE_TOP_EXCEPTION_SCOPE(scriptExecutionContext()->vm());
JSUint8Array* buffer = createBuffer(scriptExecutionContext()->jsGlobalObject(), binaryData);
if (!buffer || scope.exception()) [[unlikely]] {
scope.clearExceptionExceptTermination();
if (!buffer || scope.exception()) [[unlikely]] {
scope.clearExceptionExceptTermination();
ErrorEvent::Init errorInit;
errorInit.message = "Failed to allocate memory for binary data"_s;
dispatchEvent(ErrorEvent::create(eventNames().errorEvent, errorInit));
this->decPendingActivityCount();
return;
}
JSC::EnsureStillAliveScope ensureStillAlive(buffer);
MessageEvent::Init init;
init.data = buffer;
init.origin = this->m_url.string();
dispatchEvent(MessageEvent::create(eventName, WTF::move(init), EventIsTrusted::Yes));
ErrorEvent::Init errorInit;
errorInit.message = "Failed to allocate memory for binary data"_s;
dispatchEvent(ErrorEvent::create(eventNames().errorEvent, errorInit));
this->decPendingActivityCount();
return;
}
if (auto* context = scriptExecutionContext()) {
auto arrayBuffer = JSC::ArrayBuffer::tryCreate(binaryData);
this->incPendingActivityCount();
context->postTask([name = eventName, buffer = WTF::move(arrayBuffer), protectedThis = Ref { *this }](ScriptExecutionContext& context) {
size_t length = buffer->byteLength();
auto* globalObject = context.jsGlobalObject();
auto* subclassStructure = static_cast<Zig::GlobalObject*>(globalObject)->JSBufferSubclassStructure();
JSUint8Array* uint8array = JSUint8Array::create(globalObject, subclassStructure, buffer.copyRef(), 0, length);
JSC::EnsureStillAliveScope ensureStillAlive(uint8array);
MessageEvent::Init init;
init.data = uint8array;
init.origin = protectedThis->m_url.string();
protectedThis->dispatchEvent(MessageEvent::create(name, WTF::move(init), EventIsTrusted::Yes));
protectedThis->decPendingActivityCount();
});
}
JSC::EnsureStillAliveScope ensureStillAlive(buffer);
MessageEvent::Init init;
init.data = buffer;
init.origin = this->m_url.string();
dispatchEvent(MessageEvent::create(eventName, WTF::move(init), EventIsTrusted::Yes));
this->decPendingActivityCount();
break;
}
}
@@ -1722,6 +1680,88 @@ void WebSocket::didConnectWithTunnel(void* tunnel, char* bufferedData, size_t bu
WebSocketProxyTunnel__setConnectedWebSocket(tunnel, this->m_connectedWebSocket.client);
}
void WebSocket::onDidChangeListenerImpl(EventTarget& self, const AtomString& eventType, OnDidChangeListenerKind kind)
{
auto& webSocket = static_cast<WebSocket&>(self);
if (eventType != eventNames().messageEvent)
return;
if (kind == OnDidChangeListenerKind::Add) {
// Track that we have a message listener now
webSocket.m_hasMessageEventListener = true;
// Flush any pending messages that were queued before the listener was attached
webSocket.flushPendingMessages();
} else if (kind == OnDidChangeListenerKind::Remove || kind == OnDidChangeListenerKind::Clear) {
// Update the flag based on whether there are still listeners
webSocket.m_hasMessageEventListener = webSocket.hasEventListeners(eventNames().messageEvent);
}
}
void WebSocket::flushPendingMessages()
{
if (m_state != OPEN || m_pendingMessages.isEmpty())
return;
// Move the queue to a local variable to avoid issues if new messages arrive during dispatch
auto pendingMessages = std::exchange(m_pendingMessages, Deque<QueuedMessage>());
for (auto& queuedMessage : pendingMessages) {
if (m_state != OPEN)
break;
std::visit([this](auto&& msg) {
using T = std::decay_t<decltype(msg)>;
if constexpr (std::is_same_v<T, QueuedTextMessage>) {
this->incPendingActivityCount();
dispatchEvent(MessageEvent::create(WTF::move(msg.message), m_url.string()));
this->decPendingActivityCount();
} else if constexpr (std::is_same_v<T, QueuedBinaryMessage>) {
// Re-dispatch the binary data using the same path as normal delivery
std::span<const uint8_t> binaryData(msg.data.begin(), msg.data.size());
switch (m_binaryType) {
case BinaryType::Blob: {
this->incPendingActivityCount();
RefPtr<Blob> blob = Blob::create(binaryData, scriptExecutionContext()->jsGlobalObject());
dispatchEvent(MessageEvent::create(msg.eventName, blob.releaseNonNull(), m_url.string()));
this->decPendingActivityCount();
break;
}
case BinaryType::ArrayBuffer: {
this->incPendingActivityCount();
dispatchEvent(MessageEvent::create(msg.eventName, ArrayBuffer::create(binaryData), m_url.string()));
this->decPendingActivityCount();
break;
}
case BinaryType::NodeBuffer: {
this->incPendingActivityCount();
auto scope = DECLARE_TOP_EXCEPTION_SCOPE(scriptExecutionContext()->vm());
JSUint8Array* buffer = createBuffer(scriptExecutionContext()->jsGlobalObject(), binaryData);
if (!buffer || scope.exception()) [[unlikely]] {
scope.clearExceptionExceptTermination();
ErrorEvent::Init errorInit;
errorInit.message = "Failed to allocate memory for binary data"_s;
dispatchEvent(ErrorEvent::create(eventNames().errorEvent, errorInit));
this->decPendingActivityCount();
return;
}
JSC::EnsureStillAliveScope ensureStillAlive(buffer);
MessageEvent::Init init;
init.data = buffer;
init.origin = m_url.string();
dispatchEvent(MessageEvent::create(msg.eventName, WTF::move(init), EventIsTrusted::Yes));
this->decPendingActivityCount();
break;
}
}
}
}, queuedMessage);
}
}
} // namespace WebCore
extern "C" void WebSocket__didConnect(WebCore::WebSocket* webSocket, us_socket_t* socket, char* bufferedData, size_t len, const PerMessageDeflateParams* deflate_params, void* customSSLCtx)

View File

@@ -37,6 +37,8 @@
#include <wtf/URL.h>
#include <wtf/HashSet.h>
#include <wtf/Lock.h>
#include <wtf/Deque.h>
#include <variant>
#include "FetchHeaders.h"
#include "WebSocketErrorCode.h"
@@ -259,6 +261,22 @@ private:
bool m_dispatchedErrorEvent { false };
// RefPtr<PendingActivity<WebSocket>> m_pendingActivity;
// Message queue for buffering messages when no listener is attached
// This mimics browser behavior where messages are queued until a handler is set
struct QueuedTextMessage {
String message;
};
struct QueuedBinaryMessage {
AtomString eventName;
Vector<uint8_t> data;
};
using QueuedMessage = std::variant<QueuedTextMessage, QueuedBinaryMessage>;
Deque<QueuedMessage> m_pendingMessages;
bool m_hasMessageEventListener { false };
void flushPendingMessages();
static void onDidChangeListenerImpl(EventTarget& self, const AtomString& eventType, OnDidChangeListenerKind kind);
};
} // namespace WebCore

View File

@@ -0,0 +1,265 @@
import { expect, test } from "bun:test";
// Test that WebSocket messages sent immediately after handshake are not lost
// when onmessage handler is not set at the time of message arrival.
// Browsers queue these messages until a handler is attached.
// See: https://github.com/oven-sh/bun/issues/26560
test("WebSocket messages should be buffered when no listener is attached", async () => {
const server = Bun.serve({
port: 0,
fetch(req, server) {
if (server.upgrade(req)) return;
return new Response("Not found", { status: 404 });
},
websocket: {
open(ws) {
// Send messages immediately when the connection opens
ws.send("message1");
ws.send("message2");
ws.send("message3");
},
message() {},
close() {},
},
});
try {
const ws = new WebSocket(`ws://localhost:${server.port}`);
const received: string[] = [];
const { promise, resolve } = Promise.withResolvers<void>();
// Wait a bit before attaching the handler to ensure messages arrive first
await Bun.sleep(50);
ws.onmessage = event => {
received.push(event.data);
if (received.length === 3) {
resolve();
}
};
// Wait for all messages or timeout
await Promise.race([
promise,
Bun.sleep(1000).then(() => {
throw new Error(`Timeout: Only received ${received.length} messages: ${JSON.stringify(received)}`);
}),
]);
expect(received).toEqual(["message1", "message2", "message3"]);
ws.close();
} finally {
server.stop();
}
});
test("WebSocket messages should be buffered using addEventListener", async () => {
const server = Bun.serve({
port: 0,
fetch(req, server) {
if (server.upgrade(req)) return;
return new Response("Not found", { status: 404 });
},
websocket: {
open(ws) {
ws.send("hello");
ws.send("world");
},
message() {},
close() {},
},
});
try {
const ws = new WebSocket(`ws://localhost:${server.port}`);
const received: string[] = [];
const { promise, resolve } = Promise.withResolvers<void>();
// Wait before adding event listener
await Bun.sleep(50);
ws.addEventListener("message", event => {
received.push(event.data);
if (received.length === 2) {
resolve();
}
});
await Promise.race([
promise,
Bun.sleep(1000).then(() => {
throw new Error(`Timeout: Only received ${received.length} messages: ${JSON.stringify(received)}`);
}),
]);
expect(received).toEqual(["hello", "world"]);
ws.close();
} finally {
server.stop();
}
});
test("WebSocket binary messages should be buffered when no listener is attached", async () => {
const server = Bun.serve({
port: 0,
fetch(req, server) {
if (server.upgrade(req)) return;
return new Response("Not found", { status: 404 });
},
websocket: {
open(ws) {
ws.send(new Uint8Array([1, 2, 3]));
ws.send(new Uint8Array([4, 5, 6]));
},
message() {},
close() {},
},
});
try {
const ws = new WebSocket(`ws://localhost:${server.port}`);
ws.binaryType = "arraybuffer";
const received: ArrayBuffer[] = [];
const { promise, resolve } = Promise.withResolvers<void>();
// Wait before adding handler
await Bun.sleep(50);
ws.onmessage = event => {
received.push(event.data);
if (received.length === 2) {
resolve();
}
};
await Promise.race([
promise,
Bun.sleep(1000).then(() => {
throw new Error(`Timeout: Only received ${received.length} messages`);
}),
]);
expect(received.length).toBe(2);
expect(new Uint8Array(received[0])).toEqual(new Uint8Array([1, 2, 3]));
expect(new Uint8Array(received[1])).toEqual(new Uint8Array([4, 5, 6]));
ws.close();
} finally {
server.stop();
}
});
test("WebSocket messages sent after listener attached should be received immediately", async () => {
const server = Bun.serve({
port: 0,
fetch(req, server) {
if (server.upgrade(req)) return;
return new Response("Not found", { status: 404 });
},
websocket: {
open(ws) {
// Don't send immediately, wait for client to be ready
},
message(ws, message) {
// Echo back
ws.send("response: " + message);
},
close() {},
},
});
try {
const ws = new WebSocket(`ws://localhost:${server.port}`);
const received: string[] = [];
const { promise, resolve } = Promise.withResolvers<void>();
// Attach listener immediately
ws.onmessage = event => {
received.push(event.data);
if (received.length === 1) {
resolve();
}
};
ws.onopen = () => {
ws.send("test");
};
await Promise.race([
promise,
Bun.sleep(1000).then(() => {
throw new Error(`Timeout: Only received ${received.length} messages`);
}),
]);
expect(received).toEqual(["response: test"]);
ws.close();
} finally {
server.stop();
}
});
test("WebSocket should handle mixed queued and live messages", async () => {
let serverWs: any = null;
const server = Bun.serve({
port: 0,
fetch(req, server) {
if (server.upgrade(req)) return;
return new Response("Not found", { status: 404 });
},
websocket: {
open(ws) {
serverWs = ws;
// Send some messages immediately
ws.send("queued1");
ws.send("queued2");
},
message() {},
close() {},
},
});
try {
const ws = new WebSocket(`ws://localhost:${server.port}`);
const received: string[] = [];
const { promise, resolve } = Promise.withResolvers<void>();
// Wait for queued messages to arrive
await Bun.sleep(50);
ws.onmessage = event => {
received.push(event.data);
if (received.length === 4) {
resolve();
}
};
// Give flush a moment to happen, then send more messages
await Bun.sleep(10);
serverWs.send("live1");
serverWs.send("live2");
await Promise.race([
promise,
Bun.sleep(1000).then(() => {
throw new Error(`Timeout: Only received ${received.length} messages: ${JSON.stringify(received)}`);
}),
]);
expect(received).toEqual(["queued1", "queued2", "live1", "live2"]);
ws.close();
} finally {
server.stop();
}
});