From 033fc38feb27100f43f47edad87ee95e89916a41 Mon Sep 17 00:00:00 2001 From: Claude Bot Date: Tue, 30 Dec 2025 04:25:25 +0000 Subject: [PATCH] feat(ReadableStream): add .jsonl() method for parsing newline-delimited JSON MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a new `.jsonl()` method to ReadableStream that returns an async iterable for parsing newline-delimited JSON (JSONL/NDJSON) streams. Example usage: ```js for await (const object of response.body.jsonl()) { console.log(object); } ``` The implementation: - Reads from the stream chunk by chunk as text - Buffers incomplete lines across chunks - Parses complete lines as JSON and yields each object - Handles trailing content without a final newline - Skips empty/whitespace-only lines 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- packages/bun-types/overrides.d.ts | 14 + src/bun.js/bindings/ZigGlobalObject.h | 2 + .../bindings/webcore/JSReadableStream.cpp | 14 + .../bindings/webcore/ReadableStream.cpp | 20 ++ src/js/builtins/ReadableStream.ts | 48 ++++ .../web/streams/readablestream-jsonl.test.ts | 248 ++++++++++++++++++ 6 files changed, 346 insertions(+) create mode 100644 test/js/web/streams/readablestream-jsonl.test.ts diff --git a/packages/bun-types/overrides.d.ts b/packages/bun-types/overrides.d.ts index 88cd98da39..43d36c56b2 100644 --- a/packages/bun-types/overrides.d.ts +++ b/packages/bun-types/overrides.d.ts @@ -31,6 +31,20 @@ declare module "stream/web" { * Consume as a Blob */ blob(): Promise; + + /** + * Consume as newline-delimited JSON (JSONL/NDJSON). + * + * Returns an async iterable that yields parsed JSON objects for each line. + * + * @example + * ```ts + * for await (const obj of response.body.jsonl()) { + * console.log(obj); + * } + * ``` + */ + jsonl(): AsyncIterable; } } diff --git a/src/bun.js/bindings/ZigGlobalObject.h b/src/bun.js/bindings/ZigGlobalObject.h index 98043cb034..41c0bf219d 100644 --- a/src/bun.js/bindings/ZigGlobalObject.h +++ b/src/bun.js/bindings/ZigGlobalObject.h @@ -464,6 +464,7 @@ public: V(public, WriteBarrier, m_readableStreamToBytes) \ V(public, WriteBarrier, m_readableStreamToBlob) \ V(public, WriteBarrier, m_readableStreamToJSON) \ + V(public, WriteBarrier, m_readableStreamToJSONL) \ V(public, WriteBarrier, m_readableStreamToText) \ V(public, WriteBarrier, m_readableStreamToFormData) \ \ @@ -828,6 +829,7 @@ extern "C" JSC::EncodedJSValue ZigGlobalObject__readableStreamToText(Zig::Global extern "C" JSC::EncodedJSValue ZigGlobalObject__readableStreamToArrayBuffer(Zig::GlobalObject* globalObject, JSC::EncodedJSValue readableStreamValue); extern "C" JSC::EncodedJSValue ZigGlobalObject__readableStreamToBytes(Zig::GlobalObject* globalObject, JSC::EncodedJSValue readableStreamValue); extern "C" JSC::EncodedJSValue ZigGlobalObject__readableStreamToJSON(Zig::GlobalObject* globalObject, JSC::EncodedJSValue readableStreamValue); +extern "C" JSC::EncodedJSValue ZigGlobalObject__readableStreamToJSONL(Zig::GlobalObject* globalObject, JSC::EncodedJSValue readableStreamValue); extern "C" JSC::EncodedJSValue ZigGlobalObject__readableStreamToBlob(Zig::GlobalObject* globalObject, JSC::EncodedJSValue readableStreamValue); #endif diff --git a/src/bun.js/bindings/webcore/JSReadableStream.cpp b/src/bun.js/bindings/webcore/JSReadableStream.cpp index 1b5c4fc84d..a649d7ace1 100644 --- a/src/bun.js/bindings/webcore/JSReadableStream.cpp +++ b/src/bun.js/bindings/webcore/JSReadableStream.cpp @@ -136,6 +136,19 @@ JSC_DEFINE_HOST_FUNCTION(jsReadableStreamProtoFuncBlob, (JSGlobalObject * global return ZigGlobalObject__readableStreamToBlob(defaultGlobalObject(globalObject), JSValue::encode(thisObject)); } + +JSC_DEFINE_HOST_FUNCTION(jsReadableStreamProtoFuncJSONL, (JSGlobalObject * globalObject, CallFrame* callFrame)) +{ + JSReadableStream* thisObject = jsDynamicCast(callFrame->thisValue()); + if (!thisObject) [[unlikely]] { + auto& vm = globalObject->vm(); + auto scope = DECLARE_THROW_SCOPE(vm); + throwThisTypeError(*globalObject, scope, "ReadableStream"_s, "jsonl"_s); + return {}; + } + + return ZigGlobalObject__readableStreamToJSONL(defaultGlobalObject(globalObject), JSValue::encode(thisObject)); +} using JSReadableStreamDOMConstructor = JSDOMBuiltinConstructor; template<> const ClassInfo JSReadableStreamDOMConstructor::s_info = { "ReadableStream"_s, &Base::s_info, nullptr, nullptr, CREATE_METHOD_TABLE(JSReadableStreamDOMConstructor) }; @@ -169,6 +182,7 @@ static const HashTableValue JSReadableStreamPrototypeTableValues[] = { { "cancel"_s, static_cast(JSC::PropertyAttribute::Function | JSC::PropertyAttribute::Builtin), NoIntrinsic, { HashTableValue::BuiltinGeneratorType, readableStreamCancelCodeGenerator, 0 } }, { "getReader"_s, static_cast(JSC::PropertyAttribute::Function | JSC::PropertyAttribute::Builtin), NoIntrinsic, { HashTableValue::BuiltinGeneratorType, readableStreamGetReaderCodeGenerator, 0 } }, { "json"_s, static_cast(JSC::PropertyAttribute::Function), NoIntrinsic, { HashTableValue::NativeFunctionType, jsReadableStreamProtoFuncJSON, 0 } }, + { "jsonl"_s, static_cast(JSC::PropertyAttribute::Function), NoIntrinsic, { HashTableValue::NativeFunctionType, jsReadableStreamProtoFuncJSONL, 0 } }, { "locked"_s, static_cast(JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::Accessor | JSC::PropertyAttribute::Builtin), NoIntrinsic, { HashTableValue::BuiltinAccessorType, readableStreamLockedCodeGenerator, 0 } }, { "pipeThrough"_s, static_cast(JSC::PropertyAttribute::Function | JSC::PropertyAttribute::Builtin), NoIntrinsic, { HashTableValue::BuiltinGeneratorType, readableStreamPipeThroughCodeGenerator, 2 } }, { "pipeTo"_s, static_cast(JSC::PropertyAttribute::Function | JSC::PropertyAttribute::Builtin), NoIntrinsic, { HashTableValue::BuiltinGeneratorType, readableStreamPipeToCodeGenerator, 1 } }, diff --git a/src/bun.js/bindings/webcore/ReadableStream.cpp b/src/bun.js/bindings/webcore/ReadableStream.cpp index f7efa38ace..e0d79d5093 100644 --- a/src/bun.js/bindings/webcore/ReadableStream.cpp +++ b/src/bun.js/bindings/webcore/ReadableStream.cpp @@ -633,6 +633,26 @@ extern "C" JSC::EncodedJSValue ZigGlobalObject__readableStreamToBlob(Zig::Global return JSC::JSValue::encode(call(globalObject, function, callData, JSC::jsUndefined(), arguments)); } +extern "C" JSC::EncodedJSValue ZigGlobalObject__readableStreamToJSONL(Zig::GlobalObject* globalObject, JSC::EncodedJSValue readableStreamValue) +{ + auto& vm = JSC::getVM(globalObject); + + JSC::JSFunction* function = nullptr; + if (auto readableStreamToJSONL = globalObject->m_readableStreamToJSONL.get()) { + function = readableStreamToJSONL; + } else { + function = JSFunction::create(vm, globalObject, static_cast(readableStreamReadableStreamToJSONLCodeGenerator(vm)), globalObject); + + globalObject->m_readableStreamToJSONL.set(vm, globalObject, function); + } + + JSC::MarkedArgumentBuffer arguments = JSC::MarkedArgumentBuffer(); + arguments.append(JSValue::decode(readableStreamValue)); + + auto callData = JSC::getCallData(function); + return JSC::JSValue::encode(call(globalObject, function, callData, JSC::jsUndefined(), arguments)); +} + JSC_DEFINE_HOST_FUNCTION(functionReadableStreamToArrayBuffer, (JSC::JSGlobalObject * globalObject, JSC::CallFrame* callFrame)) { auto& vm = JSC::getVM(globalObject); diff --git a/src/js/builtins/ReadableStream.ts b/src/js/builtins/ReadableStream.ts index c4c17f5cab..4ba4c65bc8 100644 --- a/src/js/builtins/ReadableStream.ts +++ b/src/js/builtins/ReadableStream.ts @@ -330,6 +330,54 @@ export function readableStreamToJSON(stream: ReadableStream): unknown { return text.then(globalThis.JSON.parse); } +$linkTimeConstant; +export function readableStreamToJSONL(stream: ReadableStream): AsyncIterable { + if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream); + if ($isReadableStreamLocked(stream)) throw $ERR_INVALID_STATE_TypeError("ReadableStream is locked"); + + var JSONLIterator = async function* JSONLIterator(stream) { + const reader = stream.getReader(); + const decoder = new TextDecoder("utf-8"); + let buffer = ""; + + try { + while (true) { + const { done, value } = await reader.read(); + + if (done) { + // Process any remaining content in the buffer + if (buffer.length > 0) { + const trimmed = buffer.trim(); + if (trimmed.length > 0) { + yield globalThis.JSON.parse(trimmed); + } + } + return; + } + + // Decode the chunk and append to buffer + buffer += typeof value === "string" ? value : decoder.decode(value, { stream: true }); + + // Process complete lines + let newlineIndex; + while ((newlineIndex = buffer.indexOf("\n")) !== -1) { + const line = buffer.slice(0, newlineIndex); + buffer = buffer.slice(newlineIndex + 1); + + const trimmed = line.trim(); + if (trimmed.length > 0) { + yield globalThis.JSON.parse(trimmed); + } + } + } + } finally { + reader.releaseLock(); + } + }; + + return JSONLIterator(stream); +} + $linkTimeConstant; export function readableStreamToBlob(stream: ReadableStream): Promise { if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream); diff --git a/test/js/web/streams/readablestream-jsonl.test.ts b/test/js/web/streams/readablestream-jsonl.test.ts new file mode 100644 index 0000000000..bc79cbd225 --- /dev/null +++ b/test/js/web/streams/readablestream-jsonl.test.ts @@ -0,0 +1,248 @@ +import { describe, expect, test } from "bun:test"; +import { bunEnv, bunExe } from "harness"; + +describe("ReadableStream.jsonl()", () => { + test("basic jsonl parsing", async () => { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode('{"a": 1}\n{"b": 2}\n{"c": 3}\n')); + controller.close(); + }, + }); + + const results: unknown[] = []; + for await (const obj of stream.jsonl()) { + results.push(obj); + } + + expect(results).toEqual([{ a: 1 }, { b: 2 }, { c: 3 }]); + }); + + test("handles chunks split across JSON boundaries", async () => { + const stream = new ReadableStream({ + start(controller) { + // Split {"a": 1}\n{"b": 2} across chunks + controller.enqueue(new TextEncoder().encode('{"a":')); + controller.enqueue(new TextEncoder().encode(' 1}\n{"b"')); + controller.enqueue(new TextEncoder().encode(": 2}\n")); + controller.close(); + }, + }); + + const results: unknown[] = []; + for await (const obj of stream.jsonl()) { + results.push(obj); + } + + expect(results).toEqual([{ a: 1 }, { b: 2 }]); + }); + + test("handles trailing content without newline", async () => { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode('{"a": 1}\n{"b": 2}')); + controller.close(); + }, + }); + + const results: unknown[] = []; + for await (const obj of stream.jsonl()) { + results.push(obj); + } + + expect(results).toEqual([{ a: 1 }, { b: 2 }]); + }); + + test("handles empty lines", async () => { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode('{"a": 1}\n\n{"b": 2}\n\n')); + controller.close(); + }, + }); + + const results: unknown[] = []; + for await (const obj of stream.jsonl()) { + results.push(obj); + } + + expect(results).toEqual([{ a: 1 }, { b: 2 }]); + }); + + test("handles whitespace-only lines", async () => { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode('{"a": 1}\n \n{"b": 2}\n')); + controller.close(); + }, + }); + + const results: unknown[] = []; + for await (const obj of stream.jsonl()) { + results.push(obj); + } + + expect(results).toEqual([{ a: 1 }, { b: 2 }]); + }); + + test("parses complex JSON objects", async () => { + const obj1 = { name: "test", values: [1, 2, 3], nested: { a: "b" } }; + const obj2 = { unicode: "日本語", emoji: "🎉" }; + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode(JSON.stringify(obj1) + "\n" + JSON.stringify(obj2) + "\n")); + controller.close(); + }, + }); + + const results: unknown[] = []; + for await (const obj of stream.jsonl()) { + results.push(obj); + } + + expect(results).toEqual([obj1, obj2]); + }); + + test("throws on invalid JSON", async () => { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode('{"a": 1}\n{invalid json}\n')); + controller.close(); + }, + }); + + const results: unknown[] = []; + let error: Error | null = null; + try { + for await (const obj of stream.jsonl()) { + results.push(obj); + } + } catch (e) { + error = e as Error; + } + + expect(results).toEqual([{ a: 1 }]); + expect(error).toBeInstanceOf(SyntaxError); + }); + + test("works with Response.body", async () => { + using server = Bun.serve({ + port: 0, + fetch() { + return new Response('{"x": 1}\n{"y": 2}\n{"z": 3}\n', { + headers: { "content-type": "application/x-ndjson" }, + }); + }, + }); + + const response = await fetch(server.url); + const results: unknown[] = []; + for await (const obj of response.body!.jsonl()) { + results.push(obj); + } + + expect(results).toEqual([{ x: 1 }, { y: 2 }, { z: 3 }]); + }); + + test("throws TypeError for non-ReadableStream", () => { + // @ts-expect-error - testing runtime error + expect(() => ReadableStream.prototype.jsonl.call({})).toThrow(TypeError); + }); + + test("throws TypeError when stream is locked", async () => { + const stream = new ReadableStream({ + start(controller) { + controller.close(); + }, + }); + + // Lock the stream + stream.getReader(); + + expect(() => stream.jsonl()).toThrow(TypeError); + }); + + test("handles string chunks", async () => { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue('{"a": 1}\n'); + controller.enqueue('{"b": 2}\n'); + controller.close(); + }, + }); + + const results: unknown[] = []; + for await (const obj of stream.jsonl()) { + results.push(obj); + } + + expect(results).toEqual([{ a: 1 }, { b: 2 }]); + }); + + test("handles empty stream", async () => { + const stream = new ReadableStream({ + start(controller) { + controller.close(); + }, + }); + + const results: unknown[] = []; + for await (const obj of stream.jsonl()) { + results.push(obj); + } + + expect(results).toEqual([]); + }); + + test("handles stream with only whitespace", async () => { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode(" \n\n \n")); + controller.close(); + }, + }); + + const results: unknown[] = []; + for await (const obj of stream.jsonl()) { + results.push(obj); + } + + expect(results).toEqual([]); + }); + + test("works with Bun.spawn stdout", async () => { + const jsonData = [{ line: 1 }, { line: 2 }, { line: 3 }]; + const script = jsonData.map(d => `console.log(JSON.stringify(${JSON.stringify(d)}))`).join(";"); + + await using proc = Bun.spawn({ + cmd: [bunExe(), "-e", script], + env: bunEnv, + stdout: "pipe", + }); + + const results: unknown[] = []; + for await (const obj of proc.stdout.jsonl()) { + results.push(obj); + } + + expect(results).toEqual(jsonData); + expect(await proc.exited).toBe(0); + }); + + test("handles CRLF line endings", async () => { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode('{"a": 1}\r\n{"b": 2}\r\n')); + controller.close(); + }, + }); + + const results: unknown[] = []; + for await (const obj of stream.jsonl()) { + results.push(obj); + } + + // The \r will be included in the trim, so this should work + expect(results).toEqual([{ a: 1 }, { b: 2 }]); + }); +});