mirror of
https://github.com/oven-sh/bun
synced 2026-02-12 11:59:00 +00:00
implement a custom websocket client
This commit is contained in:
committed by
Jarred Sumner
parent
ab888d2ebe
commit
dda85d92c9
@@ -63,18 +63,18 @@ static uWS::WebSocketContext<SSL, false, WebCore::WebSocket*>* registerWebSocket
|
||||
auto* opts = ctx->getExt();
|
||||
|
||||
/* Maximum message size we can receive */
|
||||
static unsigned int maxPayloadLength = 128 * 1024 * 1024;
|
||||
unsigned int maxPayloadLength = 16 * 1024;
|
||||
/* 2 minutes timeout is good */
|
||||
static unsigned short idleTimeout = 120;
|
||||
unsigned short idleTimeout = 120;
|
||||
/* 64kb backpressure is probably good */
|
||||
static unsigned int maxBackpressure = 128 * 1024 * 1024;
|
||||
static bool closeOnBackpressureLimit = false;
|
||||
unsigned int maxBackpressure = 64 * 1024;
|
||||
bool closeOnBackpressureLimit = false;
|
||||
/* This one depends on kernel timeouts and is a bad default */
|
||||
static bool resetIdleTimeoutOnSend = false;
|
||||
bool resetIdleTimeoutOnSend = false;
|
||||
/* A good default, esp. for newcomers */
|
||||
static bool sendPingsAutomatically = true;
|
||||
bool sendPingsAutomatically = false;
|
||||
/* Maximum socket lifetime in seconds before forced closure (defaults to disabled) */
|
||||
static unsigned short maxLifetime = 0;
|
||||
unsigned short maxLifetime = 0;
|
||||
|
||||
opts->maxPayloadLength = maxPayloadLength;
|
||||
opts->maxBackpressure = maxBackpressure;
|
||||
|
||||
@@ -210,6 +210,10 @@ pub const ZigString = extern struct {
|
||||
return JSC.JSValue.fromRef(slice_).getZigString(ctx.ptr());
|
||||
}
|
||||
|
||||
pub fn from16Slice(slice_: []const u16) ZigString {
|
||||
return from16(slice_.ptr, slice_.len);
|
||||
}
|
||||
|
||||
pub fn from16(slice_: [*]const u16, len: usize) ZigString {
|
||||
var str = init(@ptrCast([*]const u8, slice_)[0..len]);
|
||||
str.markUTF16();
|
||||
|
||||
@@ -189,6 +189,8 @@ pub const JSArrayBufferSink = JSC.WebCore.ArrayBufferSink.JSSink;
|
||||
// WebSocket
|
||||
pub const WebSocketHTTPClient = @import("../../../http/websocket_http_client.zig").WebSocketHTTPClient;
|
||||
pub const WebSocketHTTSPClient = @import("../../../http/websocket_http_client.zig").WebSocketHTTPSClient;
|
||||
pub const WebSocketClient = @import("../../../http/websocket_http_client.zig").WebSocketClient;
|
||||
pub const WebSocketClientTLS = @import("../../../http/websocket_http_client.zig").WebSocketClientTLS;
|
||||
|
||||
pub fn Errorable(comptime Type: type) type {
|
||||
return extern struct {
|
||||
@@ -2510,6 +2512,8 @@ pub const Formatter = ZigConsoleClient.Formatter;
|
||||
comptime {
|
||||
WebSocketHTTPClient.shim.ref();
|
||||
WebSocketHTTSPClient.shim.ref();
|
||||
WebSocketClient.shim.ref();
|
||||
WebSocketClientTLS.shim.ref();
|
||||
|
||||
if (!is_bindgen) {
|
||||
_ = Process.getTitle;
|
||||
|
||||
@@ -420,8 +420,6 @@ ExceptionOr<void> WebSocket::send(const String& message)
|
||||
if (utf8.length() > 0)
|
||||
this->sendWebSocketData<false>(utf8.data(), utf8.length());
|
||||
|
||||
delete utf8;
|
||||
|
||||
return {};
|
||||
}
|
||||
|
||||
@@ -490,31 +488,34 @@ void WebSocket::sendWebSocketData(const char* baseAddress, size_t length)
|
||||
if constexpr (isBinary)
|
||||
opCode = uWS::OpCode::BINARY;
|
||||
|
||||
switch (m_connectedWebSocketKind) {
|
||||
case ConnectedWebSocketKind::Client: {
|
||||
this->m_connectedWebSocket.client->send({ baseAddress, length }, opCode);
|
||||
this->m_bufferedAmount = this->m_connectedWebSocket.client->getBufferedAmount();
|
||||
break;
|
||||
}
|
||||
case ConnectedWebSocketKind::ClientSSL: {
|
||||
this->m_connectedWebSocket.clientSSL->send({ baseAddress, length }, opCode);
|
||||
this->m_bufferedAmount = this->m_connectedWebSocket.clientSSL->getBufferedAmount();
|
||||
break;
|
||||
}
|
||||
case ConnectedWebSocketKind::Server: {
|
||||
this->m_connectedWebSocket.server->send({ baseAddress, length }, opCode);
|
||||
this->m_bufferedAmount = this->m_connectedWebSocket.server->getBufferedAmount();
|
||||
break;
|
||||
}
|
||||
case ConnectedWebSocketKind::ServerSSL: {
|
||||
this->m_connectedWebSocket.serverSSL->send({ baseAddress, length }, opCode);
|
||||
this->m_bufferedAmount = this->m_connectedWebSocket.serverSSL->getBufferedAmount();
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
RELEASE_ASSERT_NOT_REACHED();
|
||||
}
|
||||
}
|
||||
this->m_connectedWebSocket.client->cork(
|
||||
[&]() {
|
||||
switch (m_connectedWebSocketKind) {
|
||||
case ConnectedWebSocketKind::Client: {
|
||||
this->m_connectedWebSocket.client->send({ baseAddress, length }, opCode);
|
||||
this->m_bufferedAmount = this->m_connectedWebSocket.client->getBufferedAmount();
|
||||
break;
|
||||
}
|
||||
case ConnectedWebSocketKind::ClientSSL: {
|
||||
this->m_connectedWebSocket.clientSSL->send({ baseAddress, length }, opCode);
|
||||
this->m_bufferedAmount = this->m_connectedWebSocket.clientSSL->getBufferedAmount();
|
||||
break;
|
||||
}
|
||||
case ConnectedWebSocketKind::Server: {
|
||||
this->m_connectedWebSocket.server->send({ baseAddress, length }, opCode);
|
||||
this->m_bufferedAmount = this->m_connectedWebSocket.server->getBufferedAmount();
|
||||
break;
|
||||
}
|
||||
case ConnectedWebSocketKind::ServerSSL: {
|
||||
this->m_connectedWebSocket.serverSSL->send({ baseAddress, length }, opCode);
|
||||
this->m_bufferedAmount = this->m_connectedWebSocket.serverSSL->getBufferedAmount();
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
RELEASE_ASSERT_NOT_REACHED();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
ExceptionOr<void> WebSocket::close(std::optional<unsigned short> optionalCode, const String& reason)
|
||||
@@ -856,16 +857,18 @@ void WebSocket::didConnect(us_socket_t* socket, char* bufferedData, size_t buffe
|
||||
|
||||
this->didConnect();
|
||||
}
|
||||
void WebSocket::didFailToConnect(int32_t code)
|
||||
void WebSocket::didFailWithErrorCode(int32_t code)
|
||||
{
|
||||
m_state = CLOSED;
|
||||
|
||||
// this means we already handled it
|
||||
if (this->m_upgradeClient == nullptr) {
|
||||
if (this->m_upgradeClient == nullptr && this->m_connectedWebSocketKind == ConnectedWebSocketKind::None) {
|
||||
return;
|
||||
}
|
||||
|
||||
this->m_upgradeClient = nullptr;
|
||||
this->m_connectedWebSocketKind = ConnectedWebSocketKind::None;
|
||||
this->m_connectedWebSocket.client = nullptr;
|
||||
|
||||
switch (code) {
|
||||
// cancel
|
||||
@@ -982,7 +985,7 @@ extern "C" void WebSocket__didConnect(WebCore::WebSocket* webSocket, us_socket_t
|
||||
{
|
||||
webSocket->didConnect(socket, bufferedData, len);
|
||||
}
|
||||
extern "C" void WebSocket__didFailToConnect(WebCore::WebSocket* webSocket, int32_t errorCode)
|
||||
extern "C" void WebSocket__didFailWithErrorCode(WebCore::WebSocket* webSocket, int32_t errorCode)
|
||||
{
|
||||
webSocket->didFailToConnect(errorCode);
|
||||
webSocket->didFailWithErrorCode(errorCode);
|
||||
}
|
||||
@@ -97,7 +97,7 @@ public:
|
||||
void didConnect();
|
||||
void didClose(unsigned unhandledBufferedAmount, unsigned short code, const String& reason);
|
||||
void didConnect(us_socket_t* socket, char* bufferedData, size_t bufferedDataSize);
|
||||
void didFailToConnect(int32_t code);
|
||||
void didFailWithErrorCode(int32_t code);
|
||||
|
||||
void didReceiveMessage(String&& message);
|
||||
void didReceiveData(const char* data, size_t length);
|
||||
|
||||
@@ -7,12 +7,14 @@ const Syscall = @import("./node/syscall.zig");
|
||||
const JSC = @import("javascript_core");
|
||||
const std = @import("std");
|
||||
const BoringSSL = @import("boringssl");
|
||||
boring_ssl_engine: ?*BoringSSL.ENGINE = null,
|
||||
const WebSocketClientMask = @import("../../http/websocket_http_client.zig").Mask;
|
||||
|
||||
boring_ssl_engine: ?*BoringSSL.ENGINE = null,
|
||||
editor_context: EditorContext = EditorContext{},
|
||||
stderr_store: ?*Blob.Store = null,
|
||||
stdin_store: ?*Blob.Store = null,
|
||||
stdout_store: ?*Blob.Store = null,
|
||||
websocket_mask: WebSocketClientMask = WebSocketClientMask{},
|
||||
|
||||
// TODO: make this per JSGlobalObject instead of global
|
||||
// This does not handle ShadowRealm correctly!
|
||||
|
||||
Reference in New Issue
Block a user