[eventsource] SSE Client (#3074)

* fix flush

* remove logs

* add HTTP/1.1 eventsource

* fix parse spec

* multiple data in one event

* get lastEventId for reconnection

* fix parsing add reconnect

* fix reconnection retry

* add retry option

* move eventsource to builtins

* remove duplicate interface on globals.d.ts

* move test to TS

* fmt

* allow no Content-Length or Transfer Encoding

* udpate builtins

* hardcoded

* merge

* revert /src/out

* updated

* Update .gitignore

* Make the tests fail

* Cleanup EventSource getter

* fixup

* fixup TS

* fmt

* update builtins

* fix tests

* Clear existing timeouts

* Add `ref` and `unref` methods

* Use `super` to make prototype pollution slightly harder

* Reduce test timeout

* Regenerate builtins

* prettier + ref/unref

* Outdated

* forgot to commit this

---------

Co-authored-by: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com>
This commit is contained in:
Ciro Spaciari
2023-06-18 00:54:50 -03:00
committed by GitHub
parent 065713aeca
commit b2af1984ed
8 changed files with 886 additions and 2 deletions

2
.gitignore vendored
View File

@@ -121,3 +121,5 @@ cold-jsc-start
cold-jsc-start.d
/test.ts
src/js/out/modules_dev

View File

@@ -3197,3 +3197,82 @@ declare module "*.txt" {
var text: string;
export = text;
}
interface EventSourceEventMap {
error: Event;
message: MessageEvent;
open: Event;
}
interface EventSource extends EventTarget {
onerror: ((this: EventSource, ev: ErrorEvent) => any) | null;
onmessage: ((this: EventSource, ev: MessageEvent) => any) | null;
onopen: ((this: EventSource, ev: Event) => any) | null;
/** Returns the state of this EventSource object's connection. It can have the values described below. */
readonly readyState: number;
/** Returns the URL providing the event stream. */
readonly url: string;
/** Returns true if the credentials mode for connection requests to the URL providing the event stream is set to "include", and false otherwise.
*
* Not supported in Bun
*
*/
readonly withCredentials: boolean;
/** Aborts any instances of the fetch algorithm started for this EventSource object, and sets the readyState attribute to CLOSED. */
close(): void;
readonly CLOSED: number;
readonly CONNECTING: number;
readonly OPEN: number;
addEventListener<K extends keyof EventSourceEventMap>(
type: K,
listener: (this: EventSource, ev: EventSourceEventMap[K]) => any,
options?: boolean | AddEventListenerOptions,
): void;
addEventListener(
type: string,
listener: (this: EventSource, event: MessageEvent) => any,
options?: boolean | AddEventListenerOptions,
): void;
addEventListener(
type: string,
listener: EventListenerOrEventListenerObject,
options?: boolean | AddEventListenerOptions,
): void;
removeEventListener<K extends keyof EventSourceEventMap>(
type: K,
listener: (this: EventSource, ev: EventSourceEventMap[K]) => any,
options?: boolean | EventListenerOptions,
): void;
removeEventListener(
type: string,
listener: (this: EventSource, event: MessageEvent) => any,
options?: boolean | EventListenerOptions,
): void;
removeEventListener(
type: string,
listener: EventListenerOrEventListenerObject,
options?: boolean | EventListenerOptions,
): void;
/**
* Keep the event loop alive while connection is open or reconnecting
*
* Not available in browsers
*/
ref(): void;
/**
* Do not keep the event loop alive while connection is open or reconnecting
*
* Not available in browsers
*/
unref(): void;
}
declare var EventSource: {
prototype: EventSource;
new (url: string | URL, eventSourceInitDict?: EventSourceInit): EventSource;
readonly CLOSED: number;
readonly CONNECTING: number;
readonly OPEN: number;
};

View File

@@ -3241,6 +3241,55 @@ JSC_DEFINE_CUSTOM_GETTER(functionBuildMessageGetter, (JSGlobalObject * globalObj
return JSValue::encode(reinterpret_cast<Zig::GlobalObject*>(globalObject)->JSBuildMessageConstructor());
}
JSC_DEFINE_CUSTOM_GETTER(
EventSource_getter, (JSGlobalObject * globalObject, EncodedJSValue thisValue, PropertyName property))
{
auto& vm = globalObject->vm();
auto scope = DECLARE_THROW_SCOPE(vm);
// If "this" is not the Global object, just return undefined
// you should not be able to reset the global object's EventSource if you muck around with prototypes
if (JSValue::decode(thisValue) != globalObject)
return JSValue::encode(JSC::jsUndefined());
JSC::JSFunction* getSourceEvent = JSC::JSFunction::create(vm, eventSourceGetEventSourceCodeGenerator(vm), globalObject);
RETURN_IF_EXCEPTION(scope, {});
JSC::MarkedArgumentBuffer args;
auto clientData = WebCore::clientData(vm);
JSC::CallData callData = JSC::getCallData(getSourceEvent);
NakedPtr<JSC::Exception> returnedException = nullptr;
auto result = JSC::call(globalObject, getSourceEvent, callData, globalObject->globalThis(), args, returnedException);
RETURN_IF_EXCEPTION(scope, {});
if (returnedException) {
throwException(globalObject, scope, returnedException.get());
}
RETURN_IF_EXCEPTION(scope, {});
if (LIKELY(result)) {
globalObject->putDirect(vm, property, result, 0);
}
RELEASE_AND_RETURN(scope, JSValue::encode(result));
}
JSC_DEFINE_CUSTOM_SETTER(EventSource_setter,
(JSC::JSGlobalObject * globalObject, JSC::EncodedJSValue thisValue,
JSC::EncodedJSValue value, JSC::PropertyName property))
{
if (JSValue::decode(thisValue) != globalObject) {
return false;
}
auto& vm = globalObject->vm();
globalObject->putDirect(vm, property, JSValue::decode(value), 0);
return true;
}
EncodedJSValue GlobalObject::assignToStream(JSValue stream, JSValue controller)
{
JSC::VM& vm = this->vm();
@@ -3538,6 +3587,8 @@ void GlobalObject::addBuiltinGlobals(JSC::VM& vm)
putDirectCustomAccessor(vm, JSC::Identifier::fromString(vm, "$_BunCommonJSModule_$"_s), JSC::CustomGetterSetter::create(vm, BunCommonJSModule_getter, nullptr),
JSC::PropertyAttribute::DontEnum | JSC::PropertyAttribute::DontDelete | JSC::PropertyAttribute::ReadOnly);
putDirectCustomAccessor(vm, JSC::Identifier::fromString(vm, "EventSource"_s), JSC::CustomGetterSetter::create(vm, EventSource_getter, EventSource_setter), 0);
auto bufferAccessor = JSC::CustomGetterSetter::create(vm, JSBuffer_getter, JSBuffer_setter);
auto realBufferAccessor = JSC::CustomGetterSetter::create(vm, JSBuffer_privateGetter, nullptr);

View File

@@ -0,0 +1,500 @@
/*
* Copyright 2023 Codeblog Corp. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
* OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
export function getEventSource() {
type Socket = Awaited<ReturnType<typeof Bun.connect<EventSource>>>;
class EventSource extends EventTarget {
#url;
#state;
#onerror;
#onmessage;
#onopen;
#is_tls = false;
#socket: Socket | null = null;
#data_buffer = "";
#send_buffer = "";
#lastEventID = "";
#reconnect = true;
#content_length = 0; // 0 means chunked -1 means not informed aka no auto end
#received_length = 0;
#reconnection_time = 0;
#reconnection_timer: Timer | null = null;
static #ConnectNextTick(self: EventSource) {
self.#connect();
}
static #SendRequest(socket: Socket, url: URL) {
const self = socket.data;
const last_event_header = self.#lastEventID ? `Last-Event-ID: ${self.#lastEventID}\r\n` : "";
const request = `GET ${url.pathname}${url.search} HTTP/1.1\r\nHost: bun\r\nContent-type: text/event-stream\r\nContent-length: 0\r\n${last_event_header}\r\n`;
const sended = socket.write(request);
if (sended !== request.length) {
self.#send_buffer = request.substring(sended);
}
}
static #ProcessChunk(self: EventSource, chunks: string, offset: number) {
for (;;) {
if (offset >= chunks.length) {
return;
}
let chunk_end_idx = -1;
let start_idx = chunks.indexOf("\r\n", offset);
const chunk_start_idx = start_idx + 2;
if (start_idx > 0) {
if (self.#content_length === 0) {
const chunk_size = parseInt(chunks.substring(offset, start_idx), 16);
if (chunk_size === 0) {
// no more chunks
self.#state = 2;
self.#socket?.end();
return;
}
chunk_end_idx = chunk_start_idx + chunk_size;
} else {
//not chunked
chunk_end_idx = chunks.length;
}
} else {
// wait for the chunk if is chunked
if (self.#data_buffer.length === 0) {
self.#data_buffer += chunks.substring(offset);
return;
}
chunk_end_idx = chunks.length;
}
// check for chunk end
let chunk = chunks.substring(chunk_start_idx, chunk_end_idx);
offset = chunk_end_idx + 2;
let chunk_offset = 0;
// wait for data end
let event_idx = chunk.indexOf("\n\n");
if (event_idx == -1) {
// wait for more data
self.#data_buffer += chunks.substring(chunk_start_idx);
return;
}
// combine data
if (self.#data_buffer.length) {
self.#data_buffer += chunk;
chunk = self.#data_buffer;
self.#data_buffer = "";
}
let more_events = true;
while (more_events) {
const event_data = chunk.substring(chunk_offset, event_idx);
let type;
let data = "";
let id;
let event_line_idx = 0;
let retry = -1;
for (;;) {
let idx = event_data.indexOf("\n", event_line_idx);
if (idx === -1) {
if (event_line_idx >= event_data.length) {
break;
}
idx = event_data.length;
}
const line = event_data.substring(event_line_idx, idx);
if (line.startsWith("data:")) {
if (data.length) {
data += `\n${line.substring(5).trim()}`;
} else {
data = line.substring(5).trim();
}
} else if (line.startsWith("event:")) {
type = line.substring(6).trim();
} else if (line.startsWith("id:")) {
id = line.substring(3).trim();
} else if (line.startsWith("retry:")) {
retry = parseInt(line.substring(6).trim(), 10);
if (isNaN(retry)) {
retry = -1;
}
}
event_line_idx = idx + 1;
}
self.#lastEventID = id || "";
if (retry >= 0) {
self.#reconnection_time = retry;
}
if (data || id || type) {
self.dispatchEvent(
new MessageEvent(type || "message", {
data: data || "",
origin: self.#url.origin,
// @ts-ignore
source: self,
lastEventId: id,
}),
);
}
// no more events
if (chunk.length === event_idx + 2) {
more_events = false;
break;
}
const next_event_idx = chunk.indexOf("\n\n", event_idx + 1);
if (next_event_idx === -1) {
break;
}
chunk_offset = event_idx;
event_idx = next_event_idx;
}
}
}
static #Handlers = {
open(socket: Socket) {
const self = socket.data;
self.#socket = socket;
if (!self.#is_tls) {
EventSource.#SendRequest(socket, self.#url);
}
},
handshake(socket: Socket, success: boolean, verifyError: Error) {
const self = socket.data;
if (success) {
EventSource.#SendRequest(socket, self.#url);
} else {
self.#state = 2;
self.dispatchEvent(new ErrorEvent("error", { error: verifyError }));
socket.end();
}
},
data(socket: Socket, buffer: Buffer) {
const self = socket.data;
switch (self.#state) {
case 0: {
let text = buffer.toString();
const headers_idx = text.indexOf("\r\n\r\n");
if (headers_idx === -1) {
// wait headers
self.#data_buffer += text;
return;
}
if (self.#data_buffer.length) {
self.#data_buffer += text;
text = self.#data_buffer;
self.#data_buffer = "";
}
const headers = text.substring(0, headers_idx);
const status_idx = headers.indexOf("\r\n");
if (status_idx === -1) {
self.#state = 2;
self.dispatchEvent(new ErrorEvent("error", { error: new Error("Invalid HTTP request") }));
socket.end();
return;
}
const status = headers.substring(0, status_idx);
if (status !== "HTTP/1.1 200 OK") {
self.#state = 2;
self.dispatchEvent(new ErrorEvent("error", { error: new Error(status) }));
socket.end();
return;
}
let start_idx = status_idx + 1;
let mime_type_ok = false;
let content_length = -1;
for (;;) {
let header_idx = headers.indexOf("\r\n", start_idx);
// No text/event-stream mime type
if (header_idx === -1) {
if (start_idx >= headers.length) {
if (!mime_type_ok) {
self.#state = 2;
self.dispatchEvent(
new ErrorEvent("error", {
error: new Error(
`EventSource's response has no MIME type and "text/event-stream" is required. Aborting the connection.`,
),
}),
);
socket.end();
}
return;
}
header_idx = headers.length;
}
const header = headers.substring(start_idx + 1, header_idx);
const header_name_idx = header.indexOf(":");
const header_name = header.substring(0, header_name_idx);
const is_content_type =
header_name.localeCompare("content-type", undefined, { sensitivity: "accent" }) === 0;
start_idx = header_idx + 1;
if (is_content_type) {
if (header.endsWith(" text/event-stream")) {
mime_type_ok = true;
} else {
// wrong mime type
self.#state = 2;
self.dispatchEvent(
new ErrorEvent("error", {
error: new Error(
`EventSource's response has a MIME type that is not "text/event-stream". Aborting the connection.`,
),
}),
);
socket.end();
return;
}
} else {
const is_content_length =
header_name.localeCompare("content-length", undefined, { sensitivity: "accent" }) === 0;
if (is_content_length) {
content_length = parseInt(header.substring(header_name_idx + 1).trim(), 10);
if (isNaN(content_length) || content_length <= 0) {
self.dispatchEvent(
new ErrorEvent("error", {
error: new Error(`EventSource's Content-Length is invalid. Aborting the connection.`),
}),
);
socket.end();
return;
}
if (mime_type_ok) {
break;
}
} else {
const is_transfer_encoding =
header_name.localeCompare("transfer-encoding", undefined, { sensitivity: "accent" }) === 0;
if (is_transfer_encoding) {
if (header.substring(header_name_idx + 1).trim() !== "chunked") {
self.dispatchEvent(
new ErrorEvent("error", {
error: new Error(`EventSource's Transfer-Encoding is invalid. Aborting the connection.`),
}),
);
socket.end();
return;
}
content_length = 0;
if (mime_type_ok) {
break;
}
}
}
}
}
self.#content_length = content_length;
self.#state = 1;
self.dispatchEvent(new Event("open"));
const chunks = text.substring(headers_idx + 4);
EventSource.#ProcessChunk(self, chunks, 0);
if (self.#content_length > 0) {
self.#received_length += chunks.length;
if (self.#received_length >= self.#content_length) {
self.#state = 2;
socket.end();
}
}
return;
}
case 1:
EventSource.#ProcessChunk(self, buffer.toString(), 2);
if (self.#content_length > 0) {
self.#received_length += buffer.byteLength;
if (self.#received_length >= self.#content_length) {
self.#state = 2;
socket.end();
}
}
return;
default:
break;
}
},
drain(socket: Socket) {
const self = socket.data;
if (self.#state === 0) {
const request = self.#data_buffer;
if (request.length) {
const sended = socket.write(request);
if (sended !== request.length) {
socket.data.#send_buffer = request.substring(sended);
} else {
socket.data.#send_buffer = "";
}
}
}
},
close: EventSource.#Close,
end(socket: Socket) {
EventSource.#Close(socket).dispatchEvent(
new ErrorEvent("error", { error: new Error("Connection closed by server") }),
);
},
timeout(socket: Socket) {
EventSource.#Close(socket).dispatchEvent(new ErrorEvent("error", { error: new Error("Timeout") }));
},
binaryType: "buffer",
};
static #Close(socket: Socket) {
const self = socket.data;
self.#socket = null;
self.#received_length = 0;
self.#state = 2;
if (self.#reconnect) {
if (self.#reconnection_timer) {
clearTimeout(self.#reconnection_timer);
}
self.#reconnection_timer = setTimeout(EventSource.#ConnectNextTick, self.#reconnection_time, self);
}
return self;
}
constructor(url: string, options = undefined) {
super();
const uri = new URL(url);
this.#is_tls = uri.protocol === "https:";
this.#url = uri;
this.#state = 2;
process.nextTick(EventSource.#ConnectNextTick, this);
}
// Not web standard
ref() {
this.#reconnection_timer?.ref();
this.#socket?.ref();
}
// Not web standard
unref() {
this.#reconnection_timer?.unref();
this.#socket?.unref();
}
#connect() {
if (this.#state !== 2) return;
const uri = this.#url;
const is_tls = this.#is_tls;
this.#state = 0;
//@ts-ignore
Bun.connect({
data: this,
socket: EventSource.#Handlers,
hostname: uri.hostname,
port: parseInt(uri.port || (is_tls ? "443" : "80"), 10),
tls: is_tls
? {
requestCert: true,
rejectUnauthorized: false,
}
: false,
}).catch(err => {
super.dispatchEvent(new ErrorEvent("error", { error: err }));
if (this.#reconnect) {
if (this.#reconnection_timer) {
this.#reconnection_timer.unref?.();
}
this.#reconnection_timer = setTimeout(EventSource.#ConnectNextTick, 1000, this);
}
});
}
get url() {
return this.#url.href;
}
get readyState() {
return this.#state;
}
close() {
this.#reconnect = false;
this.#state = 2;
this.#socket?.unref();
this.#socket?.end();
}
get onopen() {
return this.#onopen;
}
get onerror() {
return this.#onerror;
}
get onmessage() {
return this.#onmessage;
}
set onopen(cb) {
if (this.#onopen) {
super.removeEventListener("close", this.#onopen);
}
super.addEventListener("open", cb);
this.#onopen = cb;
}
set onerror(cb) {
if (this.#onerror) {
super.removeEventListener("error", this.#onerror);
}
super.addEventListener("error", cb);
this.#onerror = cb;
}
set onmessage(cb) {
if (this.#onmessage) {
super.removeEventListener("message", this.#onmessage);
}
super.addEventListener("message", cb);
this.#onmessage = cb;
}
}
Object.defineProperty(EventSource.prototype, "CONNECTING", {
enumerable: true,
value: 0,
});
Object.defineProperty(EventSource.prototype, "OPEN", {
enumerable: true,
value: 1,
});
Object.defineProperty(EventSource.prototype, "CLOSED", {
enumerable: true,
value: 2,
});
EventSource[Symbol.for("CommonJS")] = 0;
return EventSource;
}

File diff suppressed because one or more lines are too long

View File

@@ -5400,6 +5400,84 @@ inline void WritableStreamDefaultControllerBuiltinsWrapper::exportNames()
WEBCORE_FOREACH_WRITABLESTREAMDEFAULTCONTROLLER_BUILTIN_FUNCTION_NAME(EXPORT_FUNCTION_NAME)
#undef EXPORT_FUNCTION_NAME
}
/* EventSource.ts */
// getEventSource
#define WEBCORE_BUILTIN_EVENTSOURCE_GETEVENTSOURCE 1
extern const char* const s_eventSourceGetEventSourceCode;
extern const int s_eventSourceGetEventSourceCodeLength;
extern const JSC::ConstructAbility s_eventSourceGetEventSourceCodeConstructAbility;
extern const JSC::ConstructorKind s_eventSourceGetEventSourceCodeConstructorKind;
extern const JSC::ImplementationVisibility s_eventSourceGetEventSourceCodeImplementationVisibility;
#define WEBCORE_FOREACH_EVENTSOURCE_BUILTIN_DATA(macro) \
macro(getEventSource, eventSourceGetEventSource, 0) \
#define WEBCORE_FOREACH_EVENTSOURCE_BUILTIN_CODE(macro) \
macro(eventSourceGetEventSourceCode, getEventSource, ASCIILiteral(), s_eventSourceGetEventSourceCodeLength) \
#define WEBCORE_FOREACH_EVENTSOURCE_BUILTIN_FUNCTION_NAME(macro) \
macro(getEventSource) \
#define DECLARE_BUILTIN_GENERATOR(codeName, functionName, overriddenName, argumentCount) \
JSC::FunctionExecutable* codeName##Generator(JSC::VM&);
WEBCORE_FOREACH_EVENTSOURCE_BUILTIN_CODE(DECLARE_BUILTIN_GENERATOR)
#undef DECLARE_BUILTIN_GENERATOR
class EventSourceBuiltinsWrapper : private JSC::WeakHandleOwner {
public:
explicit EventSourceBuiltinsWrapper(JSC::VM& vm)
: m_vm(vm)
WEBCORE_FOREACH_EVENTSOURCE_BUILTIN_FUNCTION_NAME(INITIALIZE_BUILTIN_NAMES)
#define INITIALIZE_BUILTIN_SOURCE_MEMBERS(name, functionName, overriddenName, length) , m_##name##Source(JSC::makeSource(StringImpl::createWithoutCopying(s_##name, length), { }))
WEBCORE_FOREACH_EVENTSOURCE_BUILTIN_CODE(INITIALIZE_BUILTIN_SOURCE_MEMBERS)
#undef INITIALIZE_BUILTIN_SOURCE_MEMBERS
{
}
#define EXPOSE_BUILTIN_EXECUTABLES(name, functionName, overriddenName, length) \
JSC::UnlinkedFunctionExecutable* name##Executable(); \
const JSC::SourceCode& name##Source() const { return m_##name##Source; }
WEBCORE_FOREACH_EVENTSOURCE_BUILTIN_CODE(EXPOSE_BUILTIN_EXECUTABLES)
#undef EXPOSE_BUILTIN_EXECUTABLES
WEBCORE_FOREACH_EVENTSOURCE_BUILTIN_FUNCTION_NAME(DECLARE_BUILTIN_IDENTIFIER_ACCESSOR)
void exportNames();
private:
JSC::VM& m_vm;
WEBCORE_FOREACH_EVENTSOURCE_BUILTIN_FUNCTION_NAME(DECLARE_BUILTIN_NAMES)
#define DECLARE_BUILTIN_SOURCE_MEMBERS(name, functionName, overriddenName, length) \
JSC::SourceCode m_##name##Source;\
JSC::Weak<JSC::UnlinkedFunctionExecutable> m_##name##Executable;
WEBCORE_FOREACH_EVENTSOURCE_BUILTIN_CODE(DECLARE_BUILTIN_SOURCE_MEMBERS)
#undef DECLARE_BUILTIN_SOURCE_MEMBERS
};
#define DEFINE_BUILTIN_EXECUTABLES(name, functionName, overriddenName, length) \
inline JSC::UnlinkedFunctionExecutable* EventSourceBuiltinsWrapper::name##Executable() \
{\
if (!m_##name##Executable) {\
JSC::Identifier executableName = functionName##PublicName();\
if (overriddenName)\
executableName = JSC::Identifier::fromString(m_vm, overriddenName);\
m_##name##Executable = JSC::Weak<JSC::UnlinkedFunctionExecutable>(JSC::createBuiltinExecutable(m_vm, m_##name##Source, executableName, s_##name##ImplementationVisibility, s_##name##ConstructorKind, s_##name##ConstructAbility), this, &m_##name##Executable);\
}\
return m_##name##Executable.get();\
}
WEBCORE_FOREACH_EVENTSOURCE_BUILTIN_CODE(DEFINE_BUILTIN_EXECUTABLES)
#undef DEFINE_BUILTIN_EXECUTABLES
inline void EventSourceBuiltinsWrapper::exportNames()
{
#define EXPORT_FUNCTION_NAME(name) m_vm.propertyNames->appendExternalName(name##PublicName(), name##PrivateName());
WEBCORE_FOREACH_EVENTSOURCE_BUILTIN_FUNCTION_NAME(EXPORT_FUNCTION_NAME)
#undef EXPORT_FUNCTION_NAME
}
class JSBuiltinFunctions {
public:
explicit JSBuiltinFunctions(JSC::VM& vm)
@@ -5427,6 +5505,7 @@ public:
, m_readableStreamDefaultControllerBuiltins(m_vm)
, m_readableByteStreamInternalsBuiltins(m_vm)
, m_writableStreamDefaultControllerBuiltins(m_vm)
, m_eventSourceBuiltins(m_vm)
{
m_writableStreamInternalsBuiltins.exportNames();
@@ -5458,6 +5537,7 @@ public:
ReadableStreamDefaultControllerBuiltinsWrapper& readableStreamDefaultControllerBuiltins() { return m_readableStreamDefaultControllerBuiltins; }
ReadableByteStreamInternalsBuiltinsWrapper& readableByteStreamInternalsBuiltins() { return m_readableByteStreamInternalsBuiltins; }
WritableStreamDefaultControllerBuiltinsWrapper& writableStreamDefaultControllerBuiltins() { return m_writableStreamDefaultControllerBuiltins; }
EventSourceBuiltinsWrapper& eventSourceBuiltins() { return m_eventSourceBuiltins; }
private:
JSC::VM& m_vm;
@@ -5484,6 +5564,7 @@ private:
ReadableStreamDefaultControllerBuiltinsWrapper m_readableStreamDefaultControllerBuiltins;
ReadableByteStreamInternalsBuiltinsWrapper m_readableByteStreamInternalsBuiltins;
WritableStreamDefaultControllerBuiltinsWrapper m_writableStreamDefaultControllerBuiltins;
EventSourceBuiltinsWrapper m_eventSourceBuiltins;
;
};

View File

@@ -2,13 +2,13 @@ function family() {
return Promise.resolve(familySync());
}
function familySync() {
return null;
return GLIBC;
}
function versionAsync() {
return Promise.resolve(version());
}
function version() {
return null;
return "2.29";
}
function isNonGlibcLinuxSync() {
return !1;

View File

@@ -0,0 +1,153 @@
function sse(req: Request) {
const signal = req.signal;
return new Response(
new ReadableStream({
type: "direct",
async pull(controller) {
while (!signal.aborted) {
await controller.write(`data:Hello, World!\n\n`);
await controller.write(`event: bun\ndata: Hello, World!\n\n`);
await controller.write(`event: lines\ndata: Line 1!\ndata: Line 2!\n\n`);
await controller.write(`event: id_test\nid:1\n\n`);
await controller.flush();
await Bun.sleep(100);
}
controller.close();
},
}),
{ status: 200, headers: { "Content-Type": "text/event-stream" } },
);
}
function sse_unstable(req: Request) {
const signal = req.signal;
let id = parseInt(req.headers.get("last-event-id") || "0", 10);
return new Response(
new ReadableStream({
type: "direct",
async pull(controller) {
if (!signal.aborted) {
await controller.write(`id:${++id}\ndata: Hello, World!\nretry:100\n\n`);
await controller.flush();
}
controller.close();
},
}),
{ status: 200, headers: { "Content-Type": "text/event-stream" } },
);
}
function sseServer(
done: (err?: unknown) => void,
pathname: string,
callback: (evtSource: EventSource, done: (err?: unknown) => void) => void,
) {
const server = Bun.serve({
port: 0,
fetch(req) {
if (new URL(req.url).pathname === "/stream") {
return sse(req);
}
if (new URL(req.url).pathname === "/unstable") {
return sse_unstable(req);
}
return new Response("Hello, World!");
},
});
let evtSource: EventSource | undefined;
try {
evtSource = new EventSource(`http://localhost:${server.port}${pathname}`);
callback(evtSource, err => {
try {
done(err);
evtSource?.close();
} catch (err) {
done(err);
} finally {
server.stop(true);
}
});
} catch (err) {
evtSource?.close();
server.stop(true);
done(err);
}
}
import { describe, expect, it } from "bun:test";
describe("events", () => {
it("should call open", done => {
sseServer(done, "/stream", (evtSource, done) => {
evtSource.onopen = () => {
done();
};
evtSource.onerror = err => {
done(err);
};
});
});
it("should call message", done => {
sseServer(done, "/stream", (evtSource, done) => {
evtSource.onmessage = e => {
expect(e.data).toBe("Hello, World!");
done();
};
});
});
it("should call custom event", done => {
sseServer(done, "/stream", (evtSource, done) => {
evtSource.addEventListener("bun", e => {
expect(e.data).toBe("Hello, World!");
done();
});
});
});
it("should call event with multiple lines", done => {
sseServer(done, "/stream", (evtSource, done) => {
evtSource.addEventListener("lines", e => {
expect(e.data).toBe("Line 1!\nLine 2!");
done();
});
});
});
it("should receive id", done => {
sseServer(done, "/stream", (evtSource, done) => {
evtSource.addEventListener("id_test", e => {
expect(e.lastEventId).toBe("1");
done();
});
});
});
it("should reconnect with id", done => {
sseServer(done, "/unstable", (evtSource, done) => {
const ids: string[] = [];
evtSource.onmessage = e => {
ids.push(e.lastEventId);
if (ids.length === 2) {
for (let i = 0; i < 2; i++) {
expect(ids[i]).toBe((i + 1).toString());
}
done();
}
};
});
});
it("should call error", done => {
sseServer(done, "/", (evtSource, done) => {
evtSource.onerror = e => {
expect(e.error.message).toBe(
`EventSource's response has a MIME type that is not "text/event-stream". Aborting the connection.`,
);
done();
};
});
});
});