mirror of
https://github.com/oven-sh/bun
synced 2026-02-02 15:08:46 +00:00
@@ -4382,15 +4382,10 @@ pub const ServerWebSocket = struct {
|
||||
return .zero;
|
||||
}
|
||||
|
||||
if (this.isClosed() and !publish_to_self) {
|
||||
// We can't access the socket context on a closed socket.
|
||||
return JSValue.jsNumber(0);
|
||||
}
|
||||
|
||||
if (message_value.asArrayBuffer(globalThis)) |array_buffer| {
|
||||
const buffer = array_buffer.slice();
|
||||
|
||||
const result = if (!publish_to_self)
|
||||
const result = if (!publish_to_self and !this.isClosed())
|
||||
this.websocket().publish(topic_slice.slice(), buffer, .binary, compress)
|
||||
else
|
||||
uws.AnyWebSocket.publishWithOptions(ssl, app, topic_slice.slice(), buffer, .binary, compress);
|
||||
@@ -4408,7 +4403,7 @@ pub const ServerWebSocket = struct {
|
||||
|
||||
const buffer = string_slice.slice();
|
||||
|
||||
const result = if (!publish_to_self)
|
||||
const result = if (!publish_to_self and !this.isClosed())
|
||||
this.websocket().publish(topic_slice.slice(), buffer, .text, compress)
|
||||
else
|
||||
uws.AnyWebSocket.publishWithOptions(ssl, app, topic_slice.slice(), buffer, .text, compress);
|
||||
@@ -4469,17 +4464,12 @@ pub const ServerWebSocket = struct {
|
||||
return .zero;
|
||||
}
|
||||
|
||||
if (this.isClosed() and !publish_to_self) {
|
||||
// Can't publish on a closed socket.
|
||||
return JSValue.jsNumber(0);
|
||||
}
|
||||
|
||||
var string_slice = message_value.toSlice(globalThis, bun.default_allocator);
|
||||
defer string_slice.deinit();
|
||||
|
||||
const buffer = string_slice.slice();
|
||||
|
||||
const result = if (!publish_to_self)
|
||||
const result = if (!publish_to_self and !this.isClosed())
|
||||
this.websocket().publish(topic_slice.slice(), buffer, .text, compress)
|
||||
else
|
||||
uws.AnyWebSocket.publishWithOptions(ssl, app, topic_slice.slice(), buffer, .text, compress);
|
||||
@@ -4540,18 +4530,13 @@ pub const ServerWebSocket = struct {
|
||||
return .zero;
|
||||
}
|
||||
|
||||
if (this.isClosed() and !publish_to_self) {
|
||||
// Can't publish on a closed socket.
|
||||
return JSValue.jsNumber(0);
|
||||
}
|
||||
|
||||
const array_buffer = message_value.asArrayBuffer(globalThis) orelse {
|
||||
globalThis.throw("publishBinary expects an ArrayBufferView", .{});
|
||||
return .zero;
|
||||
};
|
||||
const buffer = array_buffer.slice();
|
||||
|
||||
const result = if (!publish_to_self)
|
||||
const result = if (!publish_to_self and !this.isClosed())
|
||||
this.websocket().publish(topic_slice.slice(), buffer, .binary, compress)
|
||||
else
|
||||
uws.AnyWebSocket.publishWithOptions(ssl, app, topic_slice.slice(), buffer, .binary, compress);
|
||||
@@ -4591,12 +4576,7 @@ pub const ServerWebSocket = struct {
|
||||
return JSC.JSValue.jsNumber(0);
|
||||
}
|
||||
|
||||
if (this.isClosed() and !publish_to_self) {
|
||||
// We can't access the socket context on a closed socket.
|
||||
return JSValue.jsNumber(0);
|
||||
}
|
||||
|
||||
const result = if (!publish_to_self)
|
||||
const result = if (!publish_to_self and !this.isClosed())
|
||||
this.websocket().publish(topic_slice.slice(), buffer, .binary, compress)
|
||||
else
|
||||
uws.AnyWebSocket.publishWithOptions(ssl, app, topic_slice.slice(), buffer, .binary, compress);
|
||||
@@ -4639,12 +4619,7 @@ pub const ServerWebSocket = struct {
|
||||
return JSC.JSValue.jsNumber(0);
|
||||
}
|
||||
|
||||
if (this.isClosed() and !publish_to_self) {
|
||||
// We can't access the socket context on a closed socket.
|
||||
return JSValue.jsNumber(0);
|
||||
}
|
||||
|
||||
const result = if (!publish_to_self)
|
||||
const result = if (!publish_to_self and !this.isClosed())
|
||||
this.websocket().publish(topic_slice.slice(), buffer, .text, compress)
|
||||
else
|
||||
uws.AnyWebSocket.publishWithOptions(ssl, app, topic_slice.slice(), buffer, .text, compress);
|
||||
@@ -6052,6 +6027,9 @@ pub fn NewServer(comptime NamespaceType: type, comptime ssl_enabled_: bool, comp
|
||||
if (!abrupt) {
|
||||
listener.close();
|
||||
} else if (!this.flags.terminated) {
|
||||
if (this.config.websocket) |*ws| {
|
||||
ws.handler.app = null;
|
||||
}
|
||||
this.flags.terminated = true;
|
||||
this.app.close();
|
||||
}
|
||||
|
||||
@@ -86,6 +86,73 @@ afterEach(() => {
|
||||
}
|
||||
});
|
||||
|
||||
// publish on a closed websocket
|
||||
// connecct 2 websocket clients to one server
|
||||
// wait for one to call close callback
|
||||
// publish to the other client
|
||||
// the other client should not receive the message
|
||||
// the server should not crash
|
||||
// https://github.com/oven-sh/bun/issues/4443
|
||||
it("websocket/4443", async () => {
|
||||
var serverSockets: ServerWebSocket<unknown>[] = [];
|
||||
var onFirstConnected = Promise.withResolvers();
|
||||
var onSecondMessageEchoedBack = Promise.withResolvers();
|
||||
using server = Bun.serve({
|
||||
port: 0,
|
||||
websocket: {
|
||||
open(ws) {
|
||||
serverSockets.push(ws);
|
||||
ws.subscribe("test");
|
||||
if (serverSockets.length === 2) {
|
||||
onFirstConnected.resolve();
|
||||
}
|
||||
},
|
||||
message(ws, message) {
|
||||
onSecondMessageEchoedBack.resolve();
|
||||
ws.close();
|
||||
},
|
||||
close(ws) {
|
||||
ws.publish("test", "close");
|
||||
},
|
||||
},
|
||||
fetch(req, server) {
|
||||
server.upgrade(req);
|
||||
return new Response();
|
||||
},
|
||||
});
|
||||
|
||||
var clients = [];
|
||||
var closedCount = 0;
|
||||
var onClientsOpened = Promise.withResolvers();
|
||||
|
||||
var { promise, resolve } = Promise.withResolvers();
|
||||
for (let i = 0; i < 2; i++) {
|
||||
const ws = new WebSocket(`ws://${server.hostname}:${server.port}`);
|
||||
ws.binaryType = "arraybuffer";
|
||||
|
||||
const clientSocket = new WebSocket(`ws://${server.hostname}:${server.port}`);
|
||||
clientSocket.binaryType = "arraybuffer";
|
||||
clientSocket.onopen = () => {
|
||||
clients.push(clientSocket);
|
||||
if (clients.length === 2) {
|
||||
onClientsOpened.resolve();
|
||||
}
|
||||
};
|
||||
clientSocket.onmessage = e => {
|
||||
clientSocket.send(e.data);
|
||||
};
|
||||
clientSocket.onclose = () => {
|
||||
if (closedCount++ === 1) {
|
||||
resolve();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
await Promise.all([onFirstConnected.promise, onClientsOpened.promise]);
|
||||
clients[0].close();
|
||||
await promise;
|
||||
});
|
||||
|
||||
describe("Server", () => {
|
||||
test("subscribe", done => ({
|
||||
open(ws) {
|
||||
|
||||
Reference in New Issue
Block a user