From 7cdcd34f5811eeada95a89bcb85b6e534baa875d Mon Sep 17 00:00:00 2001 From: robobun Date: Fri, 1 Aug 2025 02:05:56 -0700 Subject: [PATCH] Add Blob support for WebSocket binaryType (#21471) --- src/bun.js/bindings/blob.h | 8 + .../bindings/webcore/JSMessageEventCustom.cpp | 9 +- src/bun.js/bindings/webcore/JSWebSocket.cpp | 63 ++---- src/bun.js/bindings/webcore/MessageEvent.cpp | 3 +- src/bun.js/bindings/webcore/MessageEvent.h | 5 +- src/bun.js/bindings/webcore/WebSocket.cpp | 119 +++++++--- src/bun.js/bindings/webcore/WebSocket.h | 12 +- src/bun.js/webcore/Blob.zig | 27 +++ src/http/websocket_client.zig | 43 ++++ test/js/web/websocket/websocket-blob.test.ts | 212 ++++++++++++++++++ 10 files changed, 418 insertions(+), 83 deletions(-) create mode 100644 test/js/web/websocket/websocket-blob.test.ts diff --git a/src/bun.js/bindings/blob.h b/src/bun.js/bindings/blob.h index 66eed55cb2..9e59790070 100644 --- a/src/bun.js/bindings/blob.h +++ b/src/bun.js/bindings/blob.h @@ -9,6 +9,9 @@ namespace WebCore { extern "C" void* Blob__dupeFromJS(JSC::EncodedJSValue impl); extern "C" void* Blob__dupe(void* impl); extern "C" void Blob__destroy(void* impl); +extern "C" void* Blob__getDataPtr(JSC::EncodedJSValue blob); +extern "C" size_t Blob__getSize(JSC::EncodedJSValue blob); +extern "C" void* Blob__fromBytes(JSC::JSGlobalObject* globalThis, const void* ptr, size_t len); class Blob : public RefCounted { public: @@ -26,6 +29,11 @@ public: return adoptRef(*new Blob(implPtr)); } + static RefPtr create(std::span bytes, JSC::JSGlobalObject* globalThis) + { + return adoptRef(*new Blob(Blob__fromBytes(globalThis, bytes.data(), bytes.size()))); + } + static RefPtr create(void* ptr) { void* implPtr = Blob__dupe(ptr); diff --git a/src/bun.js/bindings/webcore/JSMessageEventCustom.cpp b/src/bun.js/bindings/webcore/JSMessageEventCustom.cpp index 66dfccb263..e67b3b4b56 100644 --- a/src/bun.js/bindings/webcore/JSMessageEventCustom.cpp +++ b/src/bun.js/bindings/webcore/JSMessageEventCustom.cpp @@ -40,6 +40,7 @@ // #include "JSMessagePort.h" #include #include +#include "blob.h" namespace WebCore { @@ -57,9 +58,13 @@ JSC::JSValue JSMessageEvent::data(JSC::JSGlobalObject& lexicalGlobalObject) cons auto throwScope = DECLARE_THROW_SCOPE(lexicalGlobalObject.vm()); return cachedPropertyValue(throwScope, lexicalGlobalObject, *this, wrapped().cachedData(), [this, &lexicalGlobalObject](JSC::ThrowScope&) { return WTF::switchOn( - wrapped().data(), [this](MessageEvent::JSValueTag) -> JSC::JSValue { return wrapped().jsData().getValue(JSC::jsNull()); }, [this, &lexicalGlobalObject](const Ref& data) { + wrapped().data(), [this](MessageEvent::JSValueTag) -> JSC::JSValue { return wrapped().jsData().getValue(JSC::jsNull()); }, + [this, &lexicalGlobalObject](const Ref& data) { // FIXME: Is it best to handle errors by returning null rather than throwing an exception? - return data->deserialize(lexicalGlobalObject, globalObject(), wrapped().ports(), SerializationErrorMode::NonThrowing); }, [&lexicalGlobalObject](const String& data) { return toJS(lexicalGlobalObject, data); }, [this, &lexicalGlobalObject](const Ref& data) { return toJS>(lexicalGlobalObject, *globalObject(), data); }); + return data->deserialize(lexicalGlobalObject, globalObject(), wrapped().ports(), SerializationErrorMode::NonThrowing); }, + [&lexicalGlobalObject](const String& data) { return toJS(lexicalGlobalObject, data); }, + [this, &lexicalGlobalObject](const Ref& data) { return toJS>(lexicalGlobalObject, *globalObject(), data); }, + [this, &lexicalGlobalObject](const Ref& data) { return toJS>(lexicalGlobalObject, *globalObject(), data); }); }); } diff --git a/src/bun.js/bindings/webcore/JSWebSocket.cpp b/src/bun.js/bindings/webcore/JSWebSocket.cpp index b6e032ab6a..c9475097f9 100644 --- a/src/bun.js/bindings/webcore/JSWebSocket.cpp +++ b/src/bun.js/bindings/webcore/JSWebSocket.cpp @@ -26,7 +26,7 @@ #include "ExtendedDOMClientIsoSubspaces.h" #include "ExtendedDOMIsoSubspaces.h" #include "IDLTypes.h" -// #include "JSBlob.h" +#include "ZigGeneratedClasses.h" #include "JSDOMAttribute.h" #include "JSDOMBinding.h" #include "JSDOMConstructor.h" @@ -632,19 +632,6 @@ static inline JSC::EncodedJSValue jsWebSocketPrototypeFunction_send2Body(JSC::JS RELEASE_AND_RETURN(throwScope, JSValue::encode(toJS(*lexicalGlobalObject, throwScope, [&]() -> decltype(auto) { return impl.send(data.releaseNonNull()); }))); } -// static inline JSC::EncodedJSValue jsWebSocketPrototypeFunction_send3Body(JSC::JSGlobalObject* lexicalGlobalObject, JSC::CallFrame* callFrame, typename IDLOperation::ClassParameter castedThis) -// { -// auto& vm = JSC::getVM(lexicalGlobalObject); -// auto throwScope = DECLARE_THROW_SCOPE(vm); -// UNUSED_PARAM(throwScope); -// UNUSED_PARAM(callFrame); -// auto& impl = castedThis->wrapped(); -// EnsureStillAliveScope argument0 = callFrame->uncheckedArgument(0); -// auto data = convert>(*lexicalGlobalObject, argument0.value(), [](JSC::JSGlobalObject& lexicalGlobalObject, JSC::ThrowScope& scope) { throwArgumentTypeError(lexicalGlobalObject, scope, 0, "data"_s, "WebSocket"_s, "send"_s, "Blob"_s); }); -// RETURN_IF_EXCEPTION(throwScope, {}); -// RELEASE_AND_RETURN(throwScope, JSValue::encode(toJS(*lexicalGlobalObject, throwScope, [&]() -> decltype(auto) { return impl.send(*data); }))); -// } - static inline JSC::EncodedJSValue jsWebSocketPrototypeFunction_send4Body(JSC::JSGlobalObject* lexicalGlobalObject, JSC::CallFrame* callFrame, typename IDLOperation::ClassParameter castedThis) { auto& vm = JSC::getVM(lexicalGlobalObject); @@ -671,8 +658,12 @@ static inline JSC::EncodedJSValue jsWebSocketPrototypeFunction_sendOverloadDispa RELEASE_AND_RETURN(throwScope, (jsWebSocketPrototypeFunction_send1Body(lexicalGlobalObject, callFrame, castedThis))); if (distinguishingArg.isObject() && asObject(distinguishingArg)->inherits()) RELEASE_AND_RETURN(throwScope, (jsWebSocketPrototypeFunction_send2Body(lexicalGlobalObject, callFrame, castedThis))); - // if (distinguishingArg.isObject() && asObject(distinguishingArg)->inherits()) - // RELEASE_AND_RETURN(throwScope, (jsWebSocketPrototypeFunction_send3Body(lexicalGlobalObject, callFrame, castedThis))); + if (distinguishingArg.isObject()) { + if (auto* blob = jsDynamicCast(distinguishingArg)) { + RELEASE_AND_RETURN(throwScope, JSValue::encode(toJS(*lexicalGlobalObject, throwScope, [&]() -> decltype(auto) { return castedThis->wrapped().send(blob); }))); + } + } + RELEASE_AND_RETURN(throwScope, (jsWebSocketPrototypeFunction_send4Body(lexicalGlobalObject, callFrame, castedThis))); } return argsCount < 1 ? throwVMError(lexicalGlobalObject, throwScope, createNotEnoughArgumentsError(lexicalGlobalObject)) : throwVMTypeError(lexicalGlobalObject, throwScope); @@ -740,19 +731,6 @@ static inline JSC::EncodedJSValue jsWebSocketPrototypeFunction_ping3Body(JSC::JS RELEASE_AND_RETURN(throwScope, JSValue::encode(toJS(*lexicalGlobalObject, throwScope, [&]() -> decltype(auto) { return impl.ping(data.releaseNonNull()); }))); } -// static inline JSC::EncodedJSValue jsWebSocketPrototypeFunction_ping4Body(JSC::JSGlobalObject* lexicalGlobalObject, JSC::CallFrame* callFrame, typename IDLOperation::ClassParameter castedThis) -// { -// auto& vm = JSC::getVM(lexicalGlobalObject); -// auto throwScope = DECLARE_THROW_SCOPE(vm); -// UNUSED_PARAM(throwScope); -// UNUSED_PARAM(callFrame); -// auto& impl = castedThis->wrapped(); -// EnsureStillAliveScope argument0 = callFrame->uncheckedArgument(0); -// auto data = convert>(*lexicalGlobalObject, argument0.value(), [](JSC::JSGlobalObject& lexicalGlobalObject, JSC::ThrowScope& scope) { throwArgumentTypeError(lexicalGlobalObject, scope, 0, "data"_s, "WebSocket"_s, "ping"_s, "Blob"_s); }); -// RETURN_IF_EXCEPTION(throwScope, {}); -// RELEASE_AND_RETURN(throwScope, JSValue::encode(toJS(*lexicalGlobalObject, throwScope, [&]() -> decltype(auto) { return impl.ping(*data); }))); -// } - static inline JSC::EncodedJSValue jsWebSocketPrototypeFunction_ping5Body(JSC::JSGlobalObject* lexicalGlobalObject, JSC::CallFrame* callFrame, typename IDLOperation::ClassParameter castedThis) { auto& vm = JSC::getVM(lexicalGlobalObject); @@ -781,8 +759,11 @@ static inline JSC::EncodedJSValue jsWebSocketPrototypeFunction_pingOverloadDispa RELEASE_AND_RETURN(throwScope, (jsWebSocketPrototypeFunction_ping2Body(lexicalGlobalObject, callFrame, castedThis))); if (distinguishingArg.isObject() && asObject(distinguishingArg)->inherits()) RELEASE_AND_RETURN(throwScope, (jsWebSocketPrototypeFunction_ping3Body(lexicalGlobalObject, callFrame, castedThis))); - // if (distinguishingArg.isObject() && asObject(distinguishingArg)->inherits()) - // RELEASE_AND_RETURN(throwScope, (jsWebSocketPrototypeFunction_ping4Body(lexicalGlobalObject, callFrame, castedThis))); + if (distinguishingArg.isObject()) { + if (auto* blob = jsDynamicCast(distinguishingArg)) { + RELEASE_AND_RETURN(throwScope, JSValue::encode(toJS(*lexicalGlobalObject, throwScope, [&]() -> decltype(auto) { return castedThis->wrapped().ping(blob); }))); + } + } RELEASE_AND_RETURN(throwScope, (jsWebSocketPrototypeFunction_ping5Body(lexicalGlobalObject, callFrame, castedThis))); } return throwVMTypeError(lexicalGlobalObject, throwScope); @@ -829,19 +810,6 @@ static inline JSC::EncodedJSValue jsWebSocketPrototypeFunction_pong3Body(JSC::JS RELEASE_AND_RETURN(throwScope, JSValue::encode(toJS(*lexicalGlobalObject, throwScope, [&]() -> decltype(auto) { return impl.pong(data.releaseNonNull()); }))); } -// static inline JSC::EncodedJSValue jsWebSocketPrototypeFunction_pong4Body(JSC::JSGlobalObject* lexicalGlobalObject, JSC::CallFrame* callFrame, typename IDLOperation::ClassParameter castedThis) -// { -// auto& vm = JSC::getVM(lexicalGlobalObject); -// auto throwScope = DECLARE_THROW_SCOPE(vm); -// UNUSED_PARAM(throwScope); -// UNUSED_PARAM(callFrame); -// auto& impl = castedThis->wrapped(); -// EnsureStillAliveScope argument0 = callFrame->uncheckedArgument(0); -// auto data = convert>(*lexicalGlobalObject, argument0.value(), [](JSC::JSGlobalObject& lexicalGlobalObject, JSC::ThrowScope& scope) { throwArgumentTypeError(lexicalGlobalObject, scope, 0, "data"_s, "WebSocket"_s, "pong"_s, "Blob"_s); }); -// RETURN_IF_EXCEPTION(throwScope, {}); -// RELEASE_AND_RETURN(throwScope, JSValue::encode(toJS(*lexicalGlobalObject, throwScope, [&]() -> decltype(auto) { return impl.pong(*data); }))); -// } - static inline JSC::EncodedJSValue jsWebSocketPrototypeFunction_pong5Body(JSC::JSGlobalObject* lexicalGlobalObject, JSC::CallFrame* callFrame, typename IDLOperation::ClassParameter castedThis) { auto& vm = JSC::getVM(lexicalGlobalObject); @@ -870,8 +838,11 @@ static inline JSC::EncodedJSValue jsWebSocketPrototypeFunction_pongOverloadDispa RELEASE_AND_RETURN(throwScope, (jsWebSocketPrototypeFunction_pong2Body(lexicalGlobalObject, callFrame, castedThis))); if (distinguishingArg.isObject() && asObject(distinguishingArg)->inherits()) RELEASE_AND_RETURN(throwScope, (jsWebSocketPrototypeFunction_pong3Body(lexicalGlobalObject, callFrame, castedThis))); - // if (distinguishingArg.isObject() && asObject(distinguishingArg)->inherits()) - // RELEASE_AND_RETURN(throwScope, (jsWebSocketPrototypeFunction_pong4Body(lexicalGlobalObject, callFrame, castedThis))); + if (distinguishingArg.isObject()) { + if (auto* blob = jsDynamicCast(distinguishingArg)) { + RELEASE_AND_RETURN(throwScope, JSValue::encode(toJS(*lexicalGlobalObject, throwScope, [&]() -> decltype(auto) { return castedThis->wrapped().pong(blob); }))); + } + } RELEASE_AND_RETURN(throwScope, (jsWebSocketPrototypeFunction_pong5Body(lexicalGlobalObject, callFrame, castedThis))); } return throwVMTypeError(lexicalGlobalObject, throwScope); diff --git a/src/bun.js/bindings/webcore/MessageEvent.cpp b/src/bun.js/bindings/webcore/MessageEvent.cpp index 8fdc58adb4..c8a00d99ed 100644 --- a/src/bun.js/bindings/webcore/MessageEvent.cpp +++ b/src/bun.js/bindings/webcore/MessageEvent.cpp @@ -34,6 +34,7 @@ #include "JSMessageEvent.h" #include #include +#include "blob.h" namespace WebCore { @@ -149,7 +150,7 @@ size_t MessageEvent::memoryCost() const m_data, [](JSValueTag) -> size_t { return 0; }, [](const Ref& data) -> size_t { return data->memoryCost(); }, [](const String& string) -> size_t { return string.sizeInBytes(); }, - // [](const Ref& blob) -> size_t { return blob->size(); }, + [](const Ref& blob) -> size_t { return blob->memoryCost(); }, [](const Ref& buffer) -> size_t { return buffer->byteLength(); }); } diff --git a/src/bun.js/bindings/webcore/MessageEvent.h b/src/bun.js/bindings/webcore/MessageEvent.h index 3d65710df4..3c7bbe0daa 100644 --- a/src/bun.js/bindings/webcore/MessageEvent.h +++ b/src/bun.js/bindings/webcore/MessageEvent.h @@ -39,14 +39,15 @@ namespace WebCore { +class Blob; + class MessageEvent final : public Event { WTF_MAKE_TZONE_ALLOCATED(MessageEvent); public: struct JSValueTag { }; - // using DataType = std::variant, String, Ref, Ref>; - using DataType = std::variant, String, Ref>; + using DataType = std::variant, String, Ref, Ref>; static Ref create(const AtomString& type, DataType&&, const String& origin = {}, const String& lastEventId = {}, RefPtr&& = nullptr, Vector>&& = {}); static Ref create(DataType&&, const String& origin = {}, const String& lastEventId = {}, RefPtr&& = nullptr, Vector>&& = {}); diff --git a/src/bun.js/bindings/webcore/WebSocket.cpp b/src/bun.js/bindings/webcore/WebSocket.cpp index d337a2f7ab..e1cb1f424a 100644 --- a/src/bun.js/bindings/webcore/WebSocket.cpp +++ b/src/bun.js/bindings/webcore/WebSocket.cpp @@ -33,7 +33,8 @@ #include "WebSocket.h" #include "WebSocketDeflate.h" #include "headers.h" -// #include "Blob.h" +#include "blob.h" +#include "ZigGeneratedClasses.h" #include "CloseEvent.h" // #include "ContentSecurityPolicy.h" // #include "DOMWindow.h" @@ -564,22 +565,27 @@ ExceptionOr WebSocket::send(ArrayBufferView& arrayBufferView) return {}; } -// ExceptionOr WebSocket::send(Blob& binaryData) -// { -// LOG(Network, "WebSocket %p send() Sending Blob '%s'", this, binaryData.url().stringCenterEllipsizedToLength().utf8().data()); -// if (m_state == CONNECTING) -// return Exception { InvalidStateError }; -// if (m_state == CLOSING || m_state == CLOSED) { -// unsigned payloadSize = static_cast(binaryData.size()); -// m_bufferedAmountAfterClose = saturateAdd(m_bufferedAmountAfterClose, payloadSize); -// m_bufferedAmountAfterClose = saturateAdd(m_bufferedAmountAfterClose, getFramingOverhead(payloadSize)); -// return {}; -// } -// m_bufferedAmount = saturateAdd(m_bufferedAmount, binaryData.size()); -// ASSERT(m_channel); -// m_channel->send(binaryData); -// return {}; -// } +WebCore::ExceptionOr WebCore::WebSocket::send(WebCore::JSBlob* blob) +{ + if (m_state == CONNECTING) + return Exception { InvalidStateError }; + if (m_state == CLOSING || m_state == CLOSED) { + return {}; + } + + // Get the blob data and send it using existing binary data path + void* dataPtr = Blob__getDataPtr(JSC::JSValue::encode(blob)); + size_t dataSize = Blob__getSize(JSC::JSValue::encode(blob)); + + if (dataPtr && dataSize > 0) { + this->sendWebSocketData(static_cast(dataPtr), dataSize, Opcode::Binary); + } else { + // Send empty frame for empty blobs + this->sendWebSocketData(nullptr, 0, Opcode::Binary); + } + + return {}; +} void WebSocket::sendWebSocketData(const char* baseAddress, size_t length, const Opcode op) { @@ -957,10 +963,10 @@ String WebSocket::binaryType() const ExceptionOr WebSocket::setBinaryType(const String& binaryType) { - // if (binaryType == "blob"_s) { - // m_binaryType = BinaryType::Blob; - // return {}; - // } + if (binaryType == "blob"_s) { + m_binaryType = BinaryType::Blob; + return {}; + } if (binaryType == "arraybuffer"_s) { m_binaryType = BinaryType::ArrayBuffer; return {}; @@ -1103,10 +1109,26 @@ void WebSocket::didReceiveBinaryData(const AtomString& eventName, const std::spa // inspector->didReceiveWebSocketFrame(WebSocketChannelInspector::createFrame(binaryData.data(), binaryData.size(), WebSocketFrame::OpCode::OpCodeBinary)); // } switch (m_binaryType) { - // case BinaryType::Blob: - // // FIXME: We just received the data from NetworkProcess, and are sending it back. This is inefficient. - // dispatchEvent(MessageEvent::create(Blob::create(scriptExecutionContext(), WTFMove(binaryData), emptyString()), SecurityOrigin::create(m_url)->toString())); - // break; + case BinaryType::Blob: + if (this->hasEventListeners(eventName)) { + // the main reason for dispatching on a separate tick is to handle when you haven't yet attached an event listener + this->incPendingActivityCount(); + RefPtr blob = Blob::create(binaryData, scriptExecutionContext()->jsGlobalObject()); + dispatchEvent(MessageEvent::create(eventName, blob.releaseNonNull(), m_url.string())); + this->decPendingActivityCount(); + return; + } + + if (auto* context = scriptExecutionContext()) { + RefPtr blob = Blob::create(binaryData, context->jsGlobalObject()); + context->postTask([this, name = eventName, blob = blob.releaseNonNull(), protectedThis = Ref { *this }](ScriptExecutionContext& context) { + ASSERT(scriptExecutionContext()); + protectedThis->dispatchEvent(MessageEvent::create(name, blob, protectedThis->m_url.string())); + protectedThis->decPendingActivityCount(); + }); + } + + break; case BinaryType::ArrayBuffer: { if (this->hasEventListeners(eventName)) { // the main reason for dispatching on a separate tick is to handle when you haven't yet attached an event listener @@ -1177,9 +1199,6 @@ void WebSocket::didReceiveBinaryData(const AtomString& eventName, const std::spa break; } - case BinaryType::Blob: { - // TODO: Blob is not supported currently. - } } // }); } @@ -1533,3 +1552,47 @@ extern "C" void WebSocket__decrementPendingActivity(WebCore::WebSocket* webSocke { webSocket->decPendingActivityCount(); } + +WebCore::ExceptionOr WebCore::WebSocket::ping(WebCore::JSBlob* blob) +{ + if (m_state == CONNECTING) + return Exception { InvalidStateError }; + if (m_state == CLOSING || m_state == CLOSED) { + return {}; + } + + // Get the blob data and send it using existing binary data path + void* dataPtr = Blob__getDataPtr(JSC::JSValue::encode(blob)); + size_t dataSize = Blob__getSize(JSC::JSValue::encode(blob)); + + if (dataPtr && dataSize > 0) { + this->sendWebSocketData(static_cast(dataPtr), dataSize, Opcode::Ping); + } else { + // Send empty frame for empty blobs + this->sendWebSocketData(nullptr, 0, Opcode::Ping); + } + + return {}; +} + +WebCore::ExceptionOr WebCore::WebSocket::pong(WebCore::JSBlob* blob) +{ + if (m_state == CONNECTING) + return Exception { InvalidStateError }; + if (m_state == CLOSING || m_state == CLOSED) { + return {}; + } + + // Get the blob data and send it using existing binary data path + void* dataPtr = Blob__getDataPtr(JSC::JSValue::encode(blob)); + size_t dataSize = Blob__getSize(JSC::JSValue::encode(blob)); + + if (dataPtr && dataSize > 0) { + this->sendWebSocketData(static_cast(dataPtr), dataSize, Opcode::Pong); + } else { + // Send empty frame for empty blobs + this->sendWebSocketData(nullptr, 0, Opcode::Pong); + } + + return {}; +} diff --git a/src/bun.js/bindings/webcore/WebSocket.h b/src/bun.js/bindings/webcore/WebSocket.h index eecbcf4e4a..9bdfa028e9 100644 --- a/src/bun.js/bindings/webcore/WebSocket.h +++ b/src/bun.js/bindings/webcore/WebSocket.h @@ -40,6 +40,10 @@ #include "FetchHeaders.h" #include "WebSocketErrorCode.h" +namespace WebCore { +class JSBlob; +} + namespace uWS { template struct WebSocket; @@ -52,7 +56,7 @@ class ArrayBufferView; namespace WebCore { -// class Blob; +class Blob; class WebSocket final : public RefCounted, public EventTargetWithInlineData, public ContextDestructionObserver { WTF_MAKE_TZONE_ALLOCATED(WebSocket); @@ -95,19 +99,19 @@ public: ExceptionOr send(const String& message); ExceptionOr send(JSC::ArrayBuffer&); ExceptionOr send(JSC::ArrayBufferView&); - // ExceptionOr send(Blob&); + ExceptionOr send(JSBlob*); ExceptionOr ping(); ExceptionOr ping(const String& message); ExceptionOr ping(JSC::ArrayBuffer&); ExceptionOr ping(JSC::ArrayBufferView&); - // ExceptionOr ping(Blob&); + ExceptionOr ping(JSBlob*); ExceptionOr pong(); ExceptionOr pong(const String& message); ExceptionOr pong(JSC::ArrayBuffer&); ExceptionOr pong(JSC::ArrayBufferView&); - // ExceptionOr ping(Blob&); + ExceptionOr pong(JSBlob*); ExceptionOr close(std::optional code, const String& reason); ExceptionOr terminate(); diff --git a/src/bun.js/webcore/Blob.zig b/src/bun.js/webcore/Blob.zig index 4a1869ab53..059d11e4cb 100644 --- a/src/bun.js/webcore/Blob.zig +++ b/src/bun.js/webcore/Blob.zig @@ -3010,6 +3010,33 @@ export fn Bun__Blob__getSizeForBindings(this: *Blob) callconv(.C) u64 { return this.getSizeForBindings(); } +export fn Blob__getDataPtr(value: jsc.JSValue) callconv(.C) ?*anyopaque { + const blob = Blob.fromJS(value) orelse return null; + const data = blob.sharedView(); + if (data.len == 0) return null; + return @constCast(data.ptr); +} + +export fn Blob__getSize(value: jsc.JSValue) callconv(.C) usize { + const blob = Blob.fromJS(value) orelse return 0; + const data = blob.sharedView(); + return data.len; +} + +export fn Blob__fromBytes(globalThis: *jsc.JSGlobalObject, ptr: ?[*]const u8, len: usize) callconv(.C) *Blob { + if (ptr == null or len == 0) { + const blob = new(initEmpty(globalThis)); + blob.allocator = bun.default_allocator; + return blob; + } + + const bytes = bun.default_allocator.dupe(u8, ptr.?[0..len]) catch bun.outOfMemory(); + const store = Store.init(bytes, bun.default_allocator); + var blob = initWithStore(store, globalThis); + blob.allocator = bun.default_allocator; + return new(blob); +} + pub fn getStat(this: *Blob, globalThis: *jsc.JSGlobalObject, callback: *jsc.CallFrame) bun.JSError!jsc.JSValue { const store = this.store orelse return .js_undefined; // TODO: make this async for files diff --git a/src/http/websocket_client.zig b/src/http/websocket_client.zig index 4d681d9ee7..e6ef2d8f37 100644 --- a/src/http/websocket_client.zig +++ b/src/http/websocket_client.zig @@ -984,6 +984,48 @@ pub fn NewWebSocketClient(comptime ssl: bool) type { return !this.tcp.isClosed() and !this.tcp.isShutdown(); } + pub fn writeBlob( + this: *WebSocket, + blob_value: jsc.JSValue, + op: u8, + ) callconv(.C) void { + if (!this.hasTCP() or op > 0xF) { + this.dispatchAbruptClose(ErrorCode.ended); + return; + } + + const opcode: Opcode = @enumFromInt(op); + + // Cast the JSValue to a Blob + if (blob_value.as(jsc.WebCore.Blob)) |blob| { + // Get the shared view of the blob data + const data = blob.sharedView(); + if (data.len == 0) { + // Empty blob, send empty frame + const bytes = Copy{ .bytes = &[0]u8{} }; + _ = this.sendData(bytes, !this.hasBackpressure(), opcode); + return; + } + + // Send the blob data similar to writeBinaryData + const bytes = Copy{ .bytes = data }; + + // Fast path for small blobs + const frame_size = WebsocketHeader.frameSizeIncludingMask(data.len); + if (!this.hasBackpressure() and frame_size < stack_frame_size) { + var inline_buf: [stack_frame_size]u8 = undefined; + bytes.copy(this.globalThis, inline_buf[0..frame_size], data.len, opcode); + _ = this.enqueueEncodedBytes(this.tcp, inline_buf[0..frame_size]); + return; + } + + _ = this.sendData(bytes, !this.hasBackpressure(), opcode); + } else { + // Invalid blob, close connection + this.dispatchAbruptClose(ErrorCode.ended); + } + } + pub fn writeString( this: *WebSocket, str_: *const jsc.ZigString, @@ -1216,6 +1258,7 @@ pub fn NewWebSocketClient(comptime ssl: bool) type { @export(&memoryCost, .{ .name = "Bun__" ++ name ++ "__memoryCost" }); @export(®ister, .{ .name = "Bun__" ++ name ++ "__register" }); @export(&writeBinaryData, .{ .name = "Bun__" ++ name ++ "__writeBinaryData" }); + @export(&writeBlob, .{ .name = "Bun__" ++ name ++ "__writeBlob" }); @export(&writeString, .{ .name = "Bun__" ++ name ++ "__writeString" }); } } diff --git a/test/js/web/websocket/websocket-blob.test.ts b/test/js/web/websocket/websocket-blob.test.ts new file mode 100644 index 0000000000..c229c5d4b5 --- /dev/null +++ b/test/js/web/websocket/websocket-blob.test.ts @@ -0,0 +1,212 @@ +import { expect, test } from "bun:test"; + +test("WebSocket should send Blob data", async () => { + await using server = Bun.serve({ + port: 0, + websocket: { + open(ws) { + console.log("Server: WebSocket opened"); + }, + message(ws, message) { + console.log("Server received:", message); + // Echo back text messages + ws.send(message); + }, + close(ws) { + console.log("Server: WebSocket closed"); + }, + }, + fetch(req, server) { + if (server.upgrade(req)) { + return undefined; + } + return new Response("Upgrade failed", { status: 500 }); + }, + }); + + const url = `ws://localhost:${server.port}`; + + const { promise, resolve, reject } = Promise.withResolvers(); + const ws = new WebSocket(url); + ws.binaryType = "blob"; + let messageReceived = false; + + ws.onopen = () => { + console.log("Client: WebSocket opened"); + + // Create a blob with test data + const testData = new Uint8Array([72, 101, 108, 108, 111]); // "Hello" in bytes + const blob = new Blob([testData], { type: "application/octet-stream" }); + + console.log("Sending blob with length:", blob.size); + ws.send(blob); + }; + + ws.onmessage = async event => { + console.log("Client received message:", event.data); + messageReceived = true; + + if (event.data instanceof Blob) { + const received = new Uint8Array(await event.data.arrayBuffer()); + console.log("Received bytes:", Array.from(received)); + + // Verify we received the correct data + expect(received).toEqual(new Uint8Array([72, 101, 108, 108, 111])); + ws.close(); + resolve(); + } else { + ws.close(); + reject(new Error("Expected blob data, got: " + typeof event.data)); + } + }; + + ws.onerror = error => { + console.error("WebSocket error:", error); + ws.close(); + reject(error); + }; + + ws.onclose = event => { + console.log("Client: WebSocket closed", event.code, event.reason); + if (!messageReceived) { + reject(new Error("Connection closed without receiving message")); + } + }; + + await promise; +}); + +test("WebSocket should send empty Blob", async () => { + await using server = Bun.serve({ + port: 0, + websocket: { + message(ws, message) { + // Echo back the message + ws.send(message); + }, + }, + fetch(req, server) { + if (server.upgrade(req)) { + return undefined; + } + return new Response("Upgrade failed", { status: 500 }); + }, + }); + + const url = `ws://localhost:${server.port}`; + + const { promise, resolve, reject } = Promise.withResolvers(); + const ws = new WebSocket(url); + ws.binaryType = "blob"; + let messageReceived = false; + + ws.onopen = () => { + // Create an empty blob + const blob = new Blob([], { type: "application/octet-stream" }); + + console.log("Sending empty blob with length:", blob.size); + ws.send(blob); + }; + + ws.onmessage = async event => { + console.log("Client received message:", event.data); + messageReceived = true; + + if (event.data instanceof Blob) { + const received = new Uint8Array(await event.data.arrayBuffer()); + console.log("Received bytes length:", received.length); + + // Verify we received empty data + expect(received.length).toBe(0); + ws.close(); + resolve(); + } else { + ws.close(); + reject(new Error("Expected blob data, got: " + typeof event.data)); + } + }; + + ws.onerror = error => { + console.error("WebSocket error:", error); + ws.close(); + reject(error); + }; + + ws.onclose = event => { + console.log("Client: WebSocket closed", event.code, event.reason); + if (!messageReceived) { + reject(new Error("Connection closed without receiving message")); + } + }; + + await promise; +}); + +test("WebSocket should ping with Blob", async () => { + await using server = Bun.serve({ + port: 0, + websocket: { + ping(ws, data) { + console.log("Server received ping with data:", data); + // Respond with pong containing the same data + ws.pong(data); + }, + }, + fetch(req, server) { + if (server.upgrade(req)) { + return undefined; + } + return new Response("Upgrade failed", { status: 500 }); + }, + }); + + const url = `ws://localhost:${server.port}`; + + const { promise, resolve, reject } = Promise.withResolvers(); + const ws = new WebSocket(url); + ws.binaryType = "blob"; + let pongReceived = false; + + ws.onopen = () => { + console.log("Client: WebSocket opened"); + + // Create a blob with ping data + const pingData = new Uint8Array([80, 73, 78, 71]); // "PING" in bytes + const blob = new Blob([pingData], { type: "application/octet-stream" }); + + console.log("Sending ping with blob"); + ws.ping(blob); + }; + + ws.addEventListener("pong", async (event: any) => { + console.log("Client received pong:", event.data); + pongReceived = true; + + if (event.data instanceof Blob) { + const received = new Uint8Array(await event.data.arrayBuffer()); + + // Verify we received the correct ping data back + expect(new Uint8Array(received)).toEqual(new Uint8Array([80, 73, 78, 71])); + ws.close(); + resolve(); + } else { + ws.close(); + reject(new Error("Expected blob data in pong, got: " + typeof event.data)); + } + }); + + ws.onerror = error => { + console.error("WebSocket error:", error); + ws.close(); + reject(error); + }; + + ws.onclose = event => { + console.log("Client: WebSocket closed", event.code, event.reason); + if (!pongReceived) { + reject(new Error("Connection closed without receiving pong")); + } + }; + + await promise; +});