mirror of
https://github.com/oven-sh/bun
synced 2026-02-02 15:08:46 +00:00
Add ServerWebSocket.subscriptions getter (#24299)
## Summary
Adds a `subscriptions` getter to `ServerWebSocket` that returns an array
of all topics the WebSocket is currently subscribed to.
## Implementation
- Added `getTopicsCount()` and `iterateTopics()` helpers to uWS
WebSocket
- Implemented C++ function `uws_ws_get_topics_as_js_array` that:
- Uses `JSC::MarkedArgumentBuffer` to protect values from GC
- Constructs JSArray directly in C++ for efficiency
- Uses template pattern for SSL/TCP variants
- Properly handles iterator locks with explicit scopes
- Exposed as `subscriptions` getter property on ServerWebSocket
- Returns empty array when WebSocket is closed (not null)
## API
```typescript
const server = Bun.serve({
websocket: {
open(ws) {
ws.subscribe("chat");
ws.subscribe("notifications");
console.log(ws.subscriptions); // ["chat", "notifications"]
ws.unsubscribe("chat");
console.log(ws.subscriptions); // ["notifications"]
}
}
});
```
## Test Coverage
Added 5 comprehensive test cases covering:
- Basic subscription/unsubscription flow
- All subscriptions removed
- Behavior after WebSocket close
- Duplicate subscriptions (should only appear once)
- Multiple subscribe/unsubscribe cycles
All tests pass with 24 assertions.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
---------
Co-authored-by: Claude Bot <claude-bot@bun.sh>
Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
10
packages/bun-types/serve.d.ts
vendored
10
packages/bun-types/serve.d.ts
vendored
@@ -202,6 +202,16 @@ declare module "bun" {
|
||||
*/
|
||||
isSubscribed(topic: string): boolean;
|
||||
|
||||
/**
|
||||
* Returns an array of all topics the client is currently subscribed to.
|
||||
*
|
||||
* @example
|
||||
* ws.subscribe("chat");
|
||||
* ws.subscribe("notifications");
|
||||
* console.log(ws.subscriptions); // ["chat", "notifications"]
|
||||
*/
|
||||
readonly subscriptions: string[];
|
||||
|
||||
/**
|
||||
* Batches `send()` and `publish()` operations, which makes it faster to send data.
|
||||
*
|
||||
|
||||
@@ -312,6 +312,9 @@ export default [
|
||||
fn: "isSubscribed",
|
||||
length: 1,
|
||||
},
|
||||
subscriptions: {
|
||||
getter: "getSubscriptions",
|
||||
},
|
||||
remoteAddress: {
|
||||
getter: "getRemoteAddress",
|
||||
cache: true,
|
||||
|
||||
@@ -1232,6 +1232,18 @@ pub fn isSubscribed(
|
||||
return JSValue.jsBoolean(this.websocket().isSubscribed(topic.slice()));
|
||||
}
|
||||
|
||||
pub fn getSubscriptions(
|
||||
this: *ServerWebSocket,
|
||||
globalThis: *jsc.JSGlobalObject,
|
||||
) bun.JSError!JSValue {
|
||||
if (this.isClosed()) {
|
||||
return try JSValue.createEmptyArray(globalThis, 0);
|
||||
}
|
||||
|
||||
// Get the JSValue directly from C++
|
||||
return this.websocket().getTopicsAsJSArray(globalThis);
|
||||
}
|
||||
|
||||
pub fn getRemoteAddress(
|
||||
this: *ServerWebSocket,
|
||||
globalThis: *jsc.JSGlobalObject,
|
||||
|
||||
47
src/bun.js/bindings/uws_bindings.cpp
Normal file
47
src/bun.js/bindings/uws_bindings.cpp
Normal file
@@ -0,0 +1,47 @@
|
||||
// clang-format off
|
||||
#include "root.h"
|
||||
|
||||
#include "JavaScriptCore/JSGlobalObject.h"
|
||||
#include "JavaScriptCore/JSArray.h"
|
||||
#include "JavaScriptCore/ObjectConstructor.h"
|
||||
#include "wtf/text/WTFString.h"
|
||||
#include <bun-uws/src/App.h>
|
||||
#include <span>
|
||||
#include <string_view>
|
||||
|
||||
typedef void uws_websocket_t;
|
||||
|
||||
using TLSWebSocket = uWS::WebSocket<true, true, void *>;
|
||||
using TCPWebSocket = uWS::WebSocket<false, true, void *>;
|
||||
|
||||
// Template helpers (must be outside extern "C")
|
||||
template<bool isSSL>
|
||||
static JSC::EncodedJSValue uws_ws_get_topics_as_js_array_impl(uws_websocket_t *ws, void* globalObject) {
|
||||
JSC::JSGlobalObject* global = reinterpret_cast<JSC::JSGlobalObject*>(globalObject);
|
||||
JSC::VM& vm = global->vm();
|
||||
|
||||
using WebSocketType = typename std::conditional<isSSL, TLSWebSocket, TCPWebSocket>::type;
|
||||
WebSocketType *uws = reinterpret_cast<WebSocketType*>(ws);
|
||||
|
||||
JSC::MarkedArgumentBuffer args;
|
||||
{
|
||||
// Scope ensures the iterator lock is released before constructArray
|
||||
uws->iterateTopics([&](std::string_view topic) {
|
||||
auto str = WTF::String::fromUTF8ReplacingInvalidSequences(std::span {
|
||||
reinterpret_cast<const unsigned char*>(topic.data()),
|
||||
topic.length()
|
||||
});
|
||||
args.append(JSC::jsString(vm, str));
|
||||
});
|
||||
}
|
||||
|
||||
return JSC::JSValue::encode(JSC::constructArray(global, static_cast<JSC::ArrayAllocationProfile*>(nullptr), args));
|
||||
}
|
||||
|
||||
extern "C" JSC::EncodedJSValue uws_ws_get_topics_as_js_array(int ssl, uws_websocket_t *ws, void* globalObject) {
|
||||
if (ssl) {
|
||||
return uws_ws_get_topics_as_js_array_impl<true>(ws, globalObject);
|
||||
} else {
|
||||
return uws_ws_get_topics_as_js_array_impl<false>(ws, globalObject);
|
||||
}
|
||||
}
|
||||
@@ -49,6 +49,9 @@ pub fn NewWebSocket(comptime ssl_flag: c_int) type {
|
||||
pub fn isSubscribed(this: *WebSocket, topic: []const u8) bool {
|
||||
return c.uws_ws_is_subscribed(ssl_flag, this.raw(), topic.ptr, topic.len);
|
||||
}
|
||||
pub fn getTopicsAsJSArray(this: *WebSocket, globalObject: *JSGlobalObject) JSValue {
|
||||
return c.uws_ws_get_topics_as_js_array(ssl_flag, this.raw(), globalObject);
|
||||
}
|
||||
|
||||
pub fn publish(this: *WebSocket, topic: []const u8, message: []const u8) bool {
|
||||
return c.uws_ws_publish(ssl_flag, this.raw(), topic.ptr, topic.len, message.ptr, message.len);
|
||||
@@ -162,6 +165,12 @@ pub const AnyWebSocket = union(enum) {
|
||||
.tcp => c.uws_ws_is_subscribed(0, this.raw(), topic.ptr, topic.len),
|
||||
};
|
||||
}
|
||||
pub fn getTopicsAsJSArray(this: AnyWebSocket, globalObject: *JSGlobalObject) JSValue {
|
||||
return switch (this) {
|
||||
.ssl => c.uws_ws_get_topics_as_js_array(1, this.raw(), globalObject),
|
||||
.tcp => c.uws_ws_get_topics_as_js_array(0, this.raw(), globalObject),
|
||||
};
|
||||
}
|
||||
// pub fn iterateTopics(this: AnyWebSocket) {
|
||||
// return uws_ws_iterate_topics(ssl_flag, this.raw(), callback: ?*const fn ([*c]const u8, usize, ?*anyopaque) callconv(.C) void, user_data: ?*anyopaque) void;
|
||||
// }
|
||||
@@ -338,6 +347,7 @@ pub const c = struct {
|
||||
pub extern fn uws_ws_unsubscribe(ssl: i32, ws: ?*RawWebSocket, topic: [*c]const u8, length: usize) bool;
|
||||
pub extern fn uws_ws_is_subscribed(ssl: i32, ws: ?*RawWebSocket, topic: [*c]const u8, length: usize) bool;
|
||||
pub extern fn uws_ws_iterate_topics(ssl: i32, ws: ?*RawWebSocket, callback: ?*const fn ([*c]const u8, usize, ?*anyopaque) callconv(.C) void, user_data: ?*anyopaque) void;
|
||||
pub extern fn uws_ws_get_topics_as_js_array(ssl: i32, ws: *RawWebSocket, globalObject: *JSGlobalObject) JSValue;
|
||||
pub extern fn uws_ws_publish(ssl: i32, ws: ?*RawWebSocket, topic: [*]const u8, topic_length: usize, message: [*]const u8, message_length: usize) bool;
|
||||
pub extern fn uws_ws_publish_with_options(ssl: i32, ws: ?*RawWebSocket, topic: [*c]const u8, topic_length: usize, message: [*c]const u8, message_length: usize, opcode: Opcode, compress: bool) bool;
|
||||
pub extern fn uws_ws_get_buffered_amount(ssl: i32, ws: ?*RawWebSocket) usize;
|
||||
@@ -351,6 +361,9 @@ const bun = @import("bun");
|
||||
const std = @import("std");
|
||||
const uws_app_t = @import("./App.zig").uws_app_t;
|
||||
|
||||
const JSGlobalObject = bun.jsc.JSGlobalObject;
|
||||
const JSValue = bun.jsc.JSValue;
|
||||
|
||||
const uws = bun.uws;
|
||||
const NewApp = uws.NewApp;
|
||||
const Opcode = uws.Opcode;
|
||||
|
||||
@@ -168,6 +168,225 @@ describe("Server", () => {
|
||||
},
|
||||
}));
|
||||
|
||||
it("subscriptions - basic usage", async () => {
|
||||
const { promise, resolve } = Promise.withResolvers();
|
||||
const { promise: onClosePromise, resolve: onClose } = Promise.withResolvers();
|
||||
|
||||
using server = serve({
|
||||
port: 0,
|
||||
fetch(req, server) {
|
||||
if (server.upgrade(req)) {
|
||||
return;
|
||||
}
|
||||
return new Response("Not a websocket");
|
||||
},
|
||||
websocket: {
|
||||
open(ws) {
|
||||
// Initially no subscriptions
|
||||
const initialSubs = ws.subscriptions;
|
||||
expect(Array.isArray(initialSubs)).toBeTrue();
|
||||
expect(initialSubs.length).toBe(0);
|
||||
|
||||
// Subscribe to multiple topics
|
||||
ws.subscribe("topic1");
|
||||
ws.subscribe("topic2");
|
||||
ws.subscribe("topic3");
|
||||
const threeSubs = ws.subscriptions;
|
||||
expect(threeSubs.length).toBe(3);
|
||||
expect(threeSubs).toContain("topic1");
|
||||
expect(threeSubs).toContain("topic2");
|
||||
expect(threeSubs).toContain("topic3");
|
||||
|
||||
// Unsubscribe from one
|
||||
ws.unsubscribe("topic2");
|
||||
const finalSubs = ws.subscriptions;
|
||||
|
||||
resolve(finalSubs);
|
||||
ws.close();
|
||||
},
|
||||
close() {
|
||||
onClose();
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
const ws = new WebSocket(`ws://localhost:${server.port}`);
|
||||
ws.onclose = () => onClose();
|
||||
|
||||
const [subscriptions] = await Promise.all([promise, onClosePromise]);
|
||||
expect(subscriptions.length).toBe(2);
|
||||
expect(subscriptions).toContain("topic1");
|
||||
expect(subscriptions).toContain("topic3");
|
||||
expect(subscriptions).not.toContain("topic2");
|
||||
});
|
||||
|
||||
it("subscriptions - all unsubscribed", async () => {
|
||||
const { promise, resolve } = Promise.withResolvers();
|
||||
const { promise: onClosePromise, resolve: onClose } = Promise.withResolvers();
|
||||
|
||||
using server = serve({
|
||||
port: 0,
|
||||
fetch(req, server) {
|
||||
if (server.upgrade(req)) {
|
||||
return;
|
||||
}
|
||||
return new Response("Not a websocket");
|
||||
},
|
||||
websocket: {
|
||||
open(ws) {
|
||||
// Subscribe to topics
|
||||
ws.subscribe("topic1");
|
||||
ws.subscribe("topic2");
|
||||
ws.subscribe("topic3");
|
||||
expect(ws.subscriptions.length).toBe(3);
|
||||
|
||||
// Unsubscribe from all
|
||||
ws.unsubscribe("topic1");
|
||||
ws.unsubscribe("topic2");
|
||||
ws.unsubscribe("topic3");
|
||||
const finalSubs = ws.subscriptions;
|
||||
|
||||
resolve(finalSubs);
|
||||
ws.close();
|
||||
},
|
||||
close() {
|
||||
onClose();
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
const ws = new WebSocket(`ws://localhost:${server.port}`);
|
||||
ws.onclose = () => onClose();
|
||||
|
||||
const [subscriptions] = await Promise.all([promise, onClosePromise]);
|
||||
expect(subscriptions).toEqual([]);
|
||||
expect(subscriptions.length).toBe(0);
|
||||
});
|
||||
|
||||
it("subscriptions - after close", async () => {
|
||||
const { promise, resolve } = Promise.withResolvers();
|
||||
const { promise: onClosePromise, resolve: onClose } = Promise.withResolvers();
|
||||
|
||||
using server = serve({
|
||||
port: 0,
|
||||
fetch(req, server) {
|
||||
if (server.upgrade(req)) {
|
||||
return;
|
||||
}
|
||||
return new Response("Not a websocket");
|
||||
},
|
||||
websocket: {
|
||||
open(ws) {
|
||||
ws.subscribe("topic1");
|
||||
ws.subscribe("topic2");
|
||||
expect(ws.subscriptions.length).toBe(2);
|
||||
ws.close();
|
||||
},
|
||||
close(ws) {
|
||||
// After close, should return empty array
|
||||
const subsAfterClose = ws.subscriptions;
|
||||
resolve(subsAfterClose);
|
||||
onClose();
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
const ws = new WebSocket(`ws://localhost:${server.port}`);
|
||||
ws.onclose = () => onClose();
|
||||
|
||||
const [subscriptions] = await Promise.all([promise, onClosePromise]);
|
||||
expect(subscriptions).toStrictEqual([]);
|
||||
});
|
||||
|
||||
it("subscriptions - duplicate subscriptions", async () => {
|
||||
const { promise, resolve } = Promise.withResolvers();
|
||||
const { promise: onClosePromise, resolve: onClose } = Promise.withResolvers();
|
||||
|
||||
using server = serve({
|
||||
port: 0,
|
||||
fetch(req, server) {
|
||||
if (server.upgrade(req)) {
|
||||
return;
|
||||
}
|
||||
return new Response("Not a websocket");
|
||||
},
|
||||
websocket: {
|
||||
open(ws) {
|
||||
// Subscribe to same topic multiple times
|
||||
ws.subscribe("topic1");
|
||||
ws.subscribe("topic1");
|
||||
ws.subscribe("topic1");
|
||||
const subs = ws.subscriptions;
|
||||
|
||||
resolve(subs);
|
||||
ws.close();
|
||||
},
|
||||
close() {
|
||||
onClose();
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
const ws = new WebSocket(`ws://localhost:${server.port}`);
|
||||
ws.onclose = () => onClose();
|
||||
|
||||
const [subscriptions] = await Promise.all([promise, onClosePromise]);
|
||||
// Should only have one instance of topic1
|
||||
expect(subscriptions.length).toBe(1);
|
||||
expect(subscriptions).toContain("topic1");
|
||||
});
|
||||
|
||||
it("subscriptions - multiple cycles", async () => {
|
||||
const { promise, resolve } = Promise.withResolvers();
|
||||
const { promise: onClosePromise, resolve: onClose } = Promise.withResolvers();
|
||||
|
||||
using server = serve({
|
||||
port: 0,
|
||||
fetch(req, server) {
|
||||
if (server.upgrade(req)) {
|
||||
return;
|
||||
}
|
||||
return new Response("Not a websocket");
|
||||
},
|
||||
websocket: {
|
||||
open(ws) {
|
||||
// First cycle
|
||||
ws.subscribe("topic1");
|
||||
expect(ws.subscriptions).toEqual(["topic1"]);
|
||||
|
||||
ws.unsubscribe("topic1");
|
||||
expect(ws.subscriptions.length).toBe(0);
|
||||
|
||||
// Second cycle with different topics
|
||||
ws.subscribe("topic2");
|
||||
ws.subscribe("topic3");
|
||||
expect(ws.subscriptions.length).toBe(2);
|
||||
|
||||
ws.unsubscribe("topic2");
|
||||
expect(ws.subscriptions).toEqual(["topic3"]);
|
||||
|
||||
// Third cycle - resubscribe to topic1
|
||||
ws.subscribe("topic1");
|
||||
const finalSubs = ws.subscriptions;
|
||||
|
||||
resolve(finalSubs);
|
||||
ws.close();
|
||||
},
|
||||
close() {
|
||||
onClose();
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
const ws = new WebSocket(`ws://localhost:${server.port}`);
|
||||
ws.onclose = () => onClose();
|
||||
|
||||
const [subscriptions] = await Promise.all([promise, onClosePromise]);
|
||||
expect(subscriptions.length).toBe(2);
|
||||
expect(subscriptions).toContain("topic1");
|
||||
expect(subscriptions).toContain("topic3");
|
||||
});
|
||||
|
||||
describe("websocket", () => {
|
||||
test("open", done => ({
|
||||
open(ws) {
|
||||
|
||||
Reference in New Issue
Block a user