mirror of
https://github.com/oven-sh/bun
synced 2026-02-11 11:29:02 +00:00
feat(ReadableStream): add .jsonl() method for parsing newline-delimited JSON
Adds a new `.jsonl()` method to ReadableStream that returns an async iterable
for parsing newline-delimited JSON (JSONL/NDJSON) streams.
Example usage:
```js
for await (const object of response.body.jsonl()) {
console.log(object);
}
```
The implementation:
- Reads from the stream chunk by chunk as text
- Buffers incomplete lines across chunks
- Parses complete lines as JSON and yields each object
- Handles trailing content without a final newline
- Skips empty/whitespace-only lines
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
14
packages/bun-types/overrides.d.ts
vendored
14
packages/bun-types/overrides.d.ts
vendored
@@ -31,6 +31,20 @@ declare module "stream/web" {
|
||||
* Consume as a Blob
|
||||
*/
|
||||
blob(): Promise<Blob>;
|
||||
|
||||
/**
|
||||
* Consume as newline-delimited JSON (JSONL/NDJSON).
|
||||
*
|
||||
* Returns an async iterable that yields parsed JSON objects for each line.
|
||||
*
|
||||
* @example
|
||||
* ```ts
|
||||
* for await (const obj of response.body.jsonl()) {
|
||||
* console.log(obj);
|
||||
* }
|
||||
* ```
|
||||
*/
|
||||
jsonl(): AsyncIterable<unknown>;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -464,6 +464,7 @@ public:
|
||||
V(public, WriteBarrier<JSFunction>, m_readableStreamToBytes) \
|
||||
V(public, WriteBarrier<JSFunction>, m_readableStreamToBlob) \
|
||||
V(public, WriteBarrier<JSFunction>, m_readableStreamToJSON) \
|
||||
V(public, WriteBarrier<JSFunction>, m_readableStreamToJSONL) \
|
||||
V(public, WriteBarrier<JSFunction>, m_readableStreamToText) \
|
||||
V(public, WriteBarrier<JSFunction>, m_readableStreamToFormData) \
|
||||
\
|
||||
@@ -828,6 +829,7 @@ extern "C" JSC::EncodedJSValue ZigGlobalObject__readableStreamToText(Zig::Global
|
||||
extern "C" JSC::EncodedJSValue ZigGlobalObject__readableStreamToArrayBuffer(Zig::GlobalObject* globalObject, JSC::EncodedJSValue readableStreamValue);
|
||||
extern "C" JSC::EncodedJSValue ZigGlobalObject__readableStreamToBytes(Zig::GlobalObject* globalObject, JSC::EncodedJSValue readableStreamValue);
|
||||
extern "C" JSC::EncodedJSValue ZigGlobalObject__readableStreamToJSON(Zig::GlobalObject* globalObject, JSC::EncodedJSValue readableStreamValue);
|
||||
extern "C" JSC::EncodedJSValue ZigGlobalObject__readableStreamToJSONL(Zig::GlobalObject* globalObject, JSC::EncodedJSValue readableStreamValue);
|
||||
extern "C" JSC::EncodedJSValue ZigGlobalObject__readableStreamToBlob(Zig::GlobalObject* globalObject, JSC::EncodedJSValue readableStreamValue);
|
||||
|
||||
#endif
|
||||
|
||||
@@ -136,6 +136,19 @@ JSC_DEFINE_HOST_FUNCTION(jsReadableStreamProtoFuncBlob, (JSGlobalObject * global
|
||||
|
||||
return ZigGlobalObject__readableStreamToBlob(defaultGlobalObject(globalObject), JSValue::encode(thisObject));
|
||||
}
|
||||
|
||||
// ReadableStream.prototype.jsonl: forwards the receiver to the Zig-side JSONL
// consumer. Throws a TypeError when `this` is not a ReadableStream.
JSC_DEFINE_HOST_FUNCTION(jsReadableStreamProtoFuncJSONL, (JSGlobalObject * globalObject, CallFrame* callFrame))
{
    if (auto* stream = jsDynamicCast<JSReadableStream*>(callFrame->thisValue()))
        return ZigGlobalObject__readableStreamToJSONL(defaultGlobalObject(globalObject), JSValue::encode(stream));

    // Receiver was not a ReadableStream (e.g. the method was .call()'d on a
    // foreign object) — raise the standard "this" TypeError for "jsonl".
    auto& vm = globalObject->vm();
    auto scope = DECLARE_THROW_SCOPE(vm);
    throwThisTypeError(*globalObject, scope, "ReadableStream"_s, "jsonl"_s);
    return {};
}
|
||||
using JSReadableStreamDOMConstructor = JSDOMBuiltinConstructor<JSReadableStream>;
|
||||
|
||||
template<> const ClassInfo JSReadableStreamDOMConstructor::s_info = { "ReadableStream"_s, &Base::s_info, nullptr, nullptr, CREATE_METHOD_TABLE(JSReadableStreamDOMConstructor) };
|
||||
@@ -169,6 +182,7 @@ static const HashTableValue JSReadableStreamPrototypeTableValues[] = {
|
||||
{ "cancel"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function | JSC::PropertyAttribute::Builtin), NoIntrinsic, { HashTableValue::BuiltinGeneratorType, readableStreamCancelCodeGenerator, 0 } },
|
||||
{ "getReader"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function | JSC::PropertyAttribute::Builtin), NoIntrinsic, { HashTableValue::BuiltinGeneratorType, readableStreamGetReaderCodeGenerator, 0 } },
|
||||
{ "json"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function), NoIntrinsic, { HashTableValue::NativeFunctionType, jsReadableStreamProtoFuncJSON, 0 } },
|
||||
{ "jsonl"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function), NoIntrinsic, { HashTableValue::NativeFunctionType, jsReadableStreamProtoFuncJSONL, 0 } },
|
||||
{ "locked"_s, static_cast<unsigned>(JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::Accessor | JSC::PropertyAttribute::Builtin), NoIntrinsic, { HashTableValue::BuiltinAccessorType, readableStreamLockedCodeGenerator, 0 } },
|
||||
{ "pipeThrough"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function | JSC::PropertyAttribute::Builtin), NoIntrinsic, { HashTableValue::BuiltinGeneratorType, readableStreamPipeThroughCodeGenerator, 2 } },
|
||||
{ "pipeTo"_s, static_cast<unsigned>(JSC::PropertyAttribute::Function | JSC::PropertyAttribute::Builtin), NoIntrinsic, { HashTableValue::BuiltinGeneratorType, readableStreamPipeToCodeGenerator, 1 } },
|
||||
|
||||
@@ -633,6 +633,26 @@ extern "C" JSC::EncodedJSValue ZigGlobalObject__readableStreamToBlob(Zig::Global
|
||||
return JSC::JSValue::encode(call(globalObject, function, callData, JSC::jsUndefined(), arguments));
|
||||
}
|
||||
|
||||
// C ABI entry point: invokes the builtin readableStreamToJSONL helper on the
// given stream value and returns the resulting async iterable.
extern "C" JSC::EncodedJSValue ZigGlobalObject__readableStreamToJSONL(Zig::GlobalObject* globalObject, JSC::EncodedJSValue readableStreamValue)
{
    auto& vm = JSC::getVM(globalObject);

    // Lazily materialize the builtin helper and cache it on the global object
    // so subsequent calls reuse the same JSFunction.
    JSC::JSFunction* helper = globalObject->m_readableStreamToJSONL.get();
    if (!helper) {
        helper = JSFunction::create(vm, globalObject, static_cast<JSC::FunctionExecutable*>(readableStreamReadableStreamToJSONLCodeGenerator(vm)), globalObject);
        globalObject->m_readableStreamToJSONL.set(vm, globalObject, helper);
    }

    JSC::MarkedArgumentBuffer args;
    args.append(JSValue::decode(readableStreamValue));

    auto callData = JSC::getCallData(helper);
    return JSC::JSValue::encode(call(globalObject, helper, callData, JSC::jsUndefined(), args));
}
|
||||
|
||||
JSC_DEFINE_HOST_FUNCTION(functionReadableStreamToArrayBuffer, (JSC::JSGlobalObject * globalObject, JSC::CallFrame* callFrame))
|
||||
{
|
||||
auto& vm = JSC::getVM(globalObject);
|
||||
|
||||
@@ -330,6 +330,54 @@ export function readableStreamToJSON(stream: ReadableStream): unknown {
|
||||
return text.then(globalThis.JSON.parse);
|
||||
}
|
||||
|
||||
$linkTimeConstant;
|
||||
export function readableStreamToJSONL(stream: ReadableStream): AsyncIterable<unknown> {
|
||||
if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream);
|
||||
if ($isReadableStreamLocked(stream)) throw $ERR_INVALID_STATE_TypeError("ReadableStream is locked");
|
||||
|
||||
var JSONLIterator = async function* JSONLIterator(stream) {
|
||||
const reader = stream.getReader();
|
||||
const decoder = new TextDecoder("utf-8");
|
||||
let buffer = "";
|
||||
|
||||
try {
|
||||
while (true) {
|
||||
const { done, value } = await reader.read();
|
||||
|
||||
if (done) {
|
||||
// Process any remaining content in the buffer
|
||||
if (buffer.length > 0) {
|
||||
const trimmed = buffer.trim();
|
||||
if (trimmed.length > 0) {
|
||||
yield globalThis.JSON.parse(trimmed);
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Decode the chunk and append to buffer
|
||||
buffer += typeof value === "string" ? value : decoder.decode(value, { stream: true });
|
||||
|
||||
// Process complete lines
|
||||
let newlineIndex;
|
||||
while ((newlineIndex = buffer.indexOf("\n")) !== -1) {
|
||||
const line = buffer.slice(0, newlineIndex);
|
||||
buffer = buffer.slice(newlineIndex + 1);
|
||||
|
||||
const trimmed = line.trim();
|
||||
if (trimmed.length > 0) {
|
||||
yield globalThis.JSON.parse(trimmed);
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
reader.releaseLock();
|
||||
}
|
||||
};
|
||||
|
||||
return JSONLIterator(stream);
|
||||
}
|
||||
|
||||
$linkTimeConstant;
|
||||
export function readableStreamToBlob(stream: ReadableStream): Promise<Blob> {
|
||||
if (!$isReadableStream(stream)) throw $ERR_INVALID_ARG_TYPE("stream", "ReadableStream", typeof stream);
|
||||
|
||||
248
test/js/web/streams/readablestream-jsonl.test.ts
Normal file
248
test/js/web/streams/readablestream-jsonl.test.ts
Normal file
@@ -0,0 +1,248 @@
|
||||
import { describe, expect, test } from "bun:test";
|
||||
import { bunEnv, bunExe } from "harness";
|
||||
|
||||
describe("ReadableStream.jsonl()", () => {
|
||||
test("basic jsonl parsing", async () => {
|
||||
const stream = new ReadableStream({
|
||||
start(controller) {
|
||||
controller.enqueue(new TextEncoder().encode('{"a": 1}\n{"b": 2}\n{"c": 3}\n'));
|
||||
controller.close();
|
||||
},
|
||||
});
|
||||
|
||||
const results: unknown[] = [];
|
||||
for await (const obj of stream.jsonl()) {
|
||||
results.push(obj);
|
||||
}
|
||||
|
||||
expect(results).toEqual([{ a: 1 }, { b: 2 }, { c: 3 }]);
|
||||
});
|
||||
|
||||
test("handles chunks split across JSON boundaries", async () => {
|
||||
const stream = new ReadableStream({
|
||||
start(controller) {
|
||||
// Split {"a": 1}\n{"b": 2} across chunks
|
||||
controller.enqueue(new TextEncoder().encode('{"a":'));
|
||||
controller.enqueue(new TextEncoder().encode(' 1}\n{"b"'));
|
||||
controller.enqueue(new TextEncoder().encode(": 2}\n"));
|
||||
controller.close();
|
||||
},
|
||||
});
|
||||
|
||||
const results: unknown[] = [];
|
||||
for await (const obj of stream.jsonl()) {
|
||||
results.push(obj);
|
||||
}
|
||||
|
||||
expect(results).toEqual([{ a: 1 }, { b: 2 }]);
|
||||
});
|
||||
|
||||
test("handles trailing content without newline", async () => {
|
||||
const stream = new ReadableStream({
|
||||
start(controller) {
|
||||
controller.enqueue(new TextEncoder().encode('{"a": 1}\n{"b": 2}'));
|
||||
controller.close();
|
||||
},
|
||||
});
|
||||
|
||||
const results: unknown[] = [];
|
||||
for await (const obj of stream.jsonl()) {
|
||||
results.push(obj);
|
||||
}
|
||||
|
||||
expect(results).toEqual([{ a: 1 }, { b: 2 }]);
|
||||
});
|
||||
|
||||
test("handles empty lines", async () => {
|
||||
const stream = new ReadableStream({
|
||||
start(controller) {
|
||||
controller.enqueue(new TextEncoder().encode('{"a": 1}\n\n{"b": 2}\n\n'));
|
||||
controller.close();
|
||||
},
|
||||
});
|
||||
|
||||
const results: unknown[] = [];
|
||||
for await (const obj of stream.jsonl()) {
|
||||
results.push(obj);
|
||||
}
|
||||
|
||||
expect(results).toEqual([{ a: 1 }, { b: 2 }]);
|
||||
});
|
||||
|
||||
test("handles whitespace-only lines", async () => {
|
||||
const stream = new ReadableStream({
|
||||
start(controller) {
|
||||
controller.enqueue(new TextEncoder().encode('{"a": 1}\n \n{"b": 2}\n'));
|
||||
controller.close();
|
||||
},
|
||||
});
|
||||
|
||||
const results: unknown[] = [];
|
||||
for await (const obj of stream.jsonl()) {
|
||||
results.push(obj);
|
||||
}
|
||||
|
||||
expect(results).toEqual([{ a: 1 }, { b: 2 }]);
|
||||
});
|
||||
|
||||
test("parses complex JSON objects", async () => {
|
||||
const obj1 = { name: "test", values: [1, 2, 3], nested: { a: "b" } };
|
||||
const obj2 = { unicode: "日本語", emoji: "🎉" };
|
||||
const stream = new ReadableStream({
|
||||
start(controller) {
|
||||
controller.enqueue(new TextEncoder().encode(JSON.stringify(obj1) + "\n" + JSON.stringify(obj2) + "\n"));
|
||||
controller.close();
|
||||
},
|
||||
});
|
||||
|
||||
const results: unknown[] = [];
|
||||
for await (const obj of stream.jsonl()) {
|
||||
results.push(obj);
|
||||
}
|
||||
|
||||
expect(results).toEqual([obj1, obj2]);
|
||||
});
|
||||
|
||||
test("throws on invalid JSON", async () => {
|
||||
const stream = new ReadableStream({
|
||||
start(controller) {
|
||||
controller.enqueue(new TextEncoder().encode('{"a": 1}\n{invalid json}\n'));
|
||||
controller.close();
|
||||
},
|
||||
});
|
||||
|
||||
const results: unknown[] = [];
|
||||
let error: Error | null = null;
|
||||
try {
|
||||
for await (const obj of stream.jsonl()) {
|
||||
results.push(obj);
|
||||
}
|
||||
} catch (e) {
|
||||
error = e as Error;
|
||||
}
|
||||
|
||||
expect(results).toEqual([{ a: 1 }]);
|
||||
expect(error).toBeInstanceOf(SyntaxError);
|
||||
});
|
||||
|
||||
test("works with Response.body", async () => {
|
||||
using server = Bun.serve({
|
||||
port: 0,
|
||||
fetch() {
|
||||
return new Response('{"x": 1}\n{"y": 2}\n{"z": 3}\n', {
|
||||
headers: { "content-type": "application/x-ndjson" },
|
||||
});
|
||||
},
|
||||
});
|
||||
|
||||
const response = await fetch(server.url);
|
||||
const results: unknown[] = [];
|
||||
for await (const obj of response.body!.jsonl()) {
|
||||
results.push(obj);
|
||||
}
|
||||
|
||||
expect(results).toEqual([{ x: 1 }, { y: 2 }, { z: 3 }]);
|
||||
});
|
||||
|
||||
test("throws TypeError for non-ReadableStream", () => {
|
||||
// @ts-expect-error - testing runtime error
|
||||
expect(() => ReadableStream.prototype.jsonl.call({})).toThrow(TypeError);
|
||||
});
|
||||
|
||||
test("throws TypeError when stream is locked", async () => {
|
||||
const stream = new ReadableStream({
|
||||
start(controller) {
|
||||
controller.close();
|
||||
},
|
||||
});
|
||||
|
||||
// Lock the stream
|
||||
stream.getReader();
|
||||
|
||||
expect(() => stream.jsonl()).toThrow(TypeError);
|
||||
});
|
||||
|
||||
test("handles string chunks", async () => {
|
||||
const stream = new ReadableStream({
|
||||
start(controller) {
|
||||
controller.enqueue('{"a": 1}\n');
|
||||
controller.enqueue('{"b": 2}\n');
|
||||
controller.close();
|
||||
},
|
||||
});
|
||||
|
||||
const results: unknown[] = [];
|
||||
for await (const obj of stream.jsonl()) {
|
||||
results.push(obj);
|
||||
}
|
||||
|
||||
expect(results).toEqual([{ a: 1 }, { b: 2 }]);
|
||||
});
|
||||
|
||||
test("handles empty stream", async () => {
|
||||
const stream = new ReadableStream({
|
||||
start(controller) {
|
||||
controller.close();
|
||||
},
|
||||
});
|
||||
|
||||
const results: unknown[] = [];
|
||||
for await (const obj of stream.jsonl()) {
|
||||
results.push(obj);
|
||||
}
|
||||
|
||||
expect(results).toEqual([]);
|
||||
});
|
||||
|
||||
test("handles stream with only whitespace", async () => {
|
||||
const stream = new ReadableStream({
|
||||
start(controller) {
|
||||
controller.enqueue(new TextEncoder().encode(" \n\n \n"));
|
||||
controller.close();
|
||||
},
|
||||
});
|
||||
|
||||
const results: unknown[] = [];
|
||||
for await (const obj of stream.jsonl()) {
|
||||
results.push(obj);
|
||||
}
|
||||
|
||||
expect(results).toEqual([]);
|
||||
});
|
||||
|
||||
test("works with Bun.spawn stdout", async () => {
|
||||
const jsonData = [{ line: 1 }, { line: 2 }, { line: 3 }];
|
||||
const script = jsonData.map(d => `console.log(JSON.stringify(${JSON.stringify(d)}))`).join(";");
|
||||
|
||||
await using proc = Bun.spawn({
|
||||
cmd: [bunExe(), "-e", script],
|
||||
env: bunEnv,
|
||||
stdout: "pipe",
|
||||
});
|
||||
|
||||
const results: unknown[] = [];
|
||||
for await (const obj of proc.stdout.jsonl()) {
|
||||
results.push(obj);
|
||||
}
|
||||
|
||||
expect(results).toEqual(jsonData);
|
||||
expect(await proc.exited).toBe(0);
|
||||
});
|
||||
|
||||
test("handles CRLF line endings", async () => {
|
||||
const stream = new ReadableStream({
|
||||
start(controller) {
|
||||
controller.enqueue(new TextEncoder().encode('{"a": 1}\r\n{"b": 2}\r\n'));
|
||||
controller.close();
|
||||
},
|
||||
});
|
||||
|
||||
const results: unknown[] = [];
|
||||
for await (const obj of stream.jsonl()) {
|
||||
results.push(obj);
|
||||
}
|
||||
|
||||
// The \r will be included in the trim, so this should work
|
||||
expect(results).toEqual([{ a: 1 }, { b: 2 }]);
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user