diff --git a/src/js/builtins/ConsoleObject.ts b/src/js/builtins/ConsoleObject.ts index 9699c36b33..709c4198c2 100644 --- a/src/js/builtins/ConsoleObject.ts +++ b/src/js/builtins/ConsoleObject.ts @@ -1,54 +1,21 @@ $overriddenName = "[Symbol.asyncIterator]"; export function asyncIterator(this: Console) { - var stream = Bun.stdin.stream(); - const StringDecoder: typeof import("string_decoder").StringDecoder = require("node:string_decoder").StringDecoder; - const decoder = new StringDecoder("utf8"); - var indexOf = Bun.indexOfLine; - var actualChunk: Uint8Array; - var i: number = -1; - var idx: number; - var last: number; - var done: boolean; - var value: Uint8Array[]; - var value_len: number; - var pendingChunk: Uint8Array | undefined; - var reader: ReadableStreamDefaultReader | undefined = stream.getReader(); + async function* ConsoleAsyncIterator() { + var stream = Bun.stdin.stream(); + + const decoder = new StringDecoder("utf8"); + var deferredError; + var done = false; + var value: Uint8Array[]; + var pendingChunk: Uint8Array | undefined; + var reader: ReadableStreamDefaultReader | undefined = stream.getReader(); + try { - if (i !== -1) { - last = i + 1; - i = indexOf(actualChunk, last); - - while (i !== -1) { - yield decoder.write(actualChunk.subarray(last, i)); - last = i + 1; - i = indexOf(actualChunk, last); - } - - for (idx++; idx < value_len; idx++) { - actualChunk = value[idx]; - if (pendingChunk) { - actualChunk = Buffer.concat([pendingChunk, actualChunk]); - pendingChunk = undefined; - } - - last = 0; - // TODO: "\r", 0x4048, 0x4049, 0x404A, 0x404B, 0x404C, 0x404D, 0x404E, 0x404F - i = indexOf(actualChunk, last); - while (i !== -1) { - yield decoder.write(actualChunk.subarray(last, i)); - last = i + 1; - i = indexOf(actualChunk, last); - } - i = -1; - - pendingChunk = actualChunk.subarray(last); - } - actualChunk = undefined!; - } + var text = ""; while (true) { reader ??= stream.getReader(); @@ -64,36 +31,51 @@ export function asyncIterator(this: Console) { reader = undefined; stream = Bun.stdin.stream(); + if (!done) { + for (let chunk of value) { + text += decoder.write(chunk); + } + } else if (value && value.length > 0) { + const valueLength = value.length - 1; + for (let i = 0; i < valueLength; i++) { + text += decoder.write(value[i]); + } + text += decoder.end(value[valueLength]); + } else { + text += decoder.end(); + } + + { + let i = 0, + textLength = text.length; + + while (i < textLength) { + let j = text.indexOf("\n", i); + if (j === -1) { + break; + } + + const next = j + 1; + if (text[j - 1] === "\r") { + j--; + } + yield text.slice(i, j); + i = next; + } + + if (i < textLength) { + text = text.slice(i); + } else { + text = ""; + } + } + if (done) { - if (pendingChunk) { - yield decoder.write(pendingChunk); + if (text.length > 0) { + yield text; } return; } - - // we assume it was given line-by-line - for (idx = 0, value_len = value.length; idx < value_len; idx++) { - actualChunk = value[idx]; - if (pendingChunk) { - actualChunk = Buffer.concat([pendingChunk, actualChunk]); - pendingChunk = undefined; - } - - last = 0; - // TODO: "\r", 0x4048, 0x4049, 0x404A, 0x404B, 0x404C, 0x404D, 0x404E, 0x404F - i = indexOf(actualChunk, last); - while (i !== -1) { - // This yield may end the function, in that case we need to be able to recover state - // if the iterator was fired up again. - yield decoder.write(actualChunk.subarray(last, i)); - last = i + 1; - i = indexOf(actualChunk, last); - } - i = -1; - - pendingChunk = actualChunk.subarray(last); - } - actualChunk = undefined!; } } catch (e) { deferredError = e; diff --git a/test/regression/issue/02499.test.ts b/test/regression/issue/02499.test.ts index 0468b06682..d9754366e1 100644 --- a/test/regression/issue/02499.test.ts +++ b/test/regression/issue/02499.test.ts @@ -4,14 +4,9 @@ import { mkdirSync, rmSync, writeFileSync, readFileSync, mkdtempSync } from "fs" import { tmpdir } from "os"; import { dirname, join } from "path"; import { sleep, spawn, spawnSync, which } from "bun"; - +import { StringDecoder } from "node:string_decoder"; // https://github.com/oven-sh/bun/issues/2499 it("onAborted() and onWritable are not called after receiving an empty response body due to a promise rejection", async testDone => { - var timeout = AbortSignal.timeout(10_000); - timeout.onabort = e => { - testDone(new Error("Test timed out, which means it failed")); - }; - const invalidJSON = Buffer.from("invalid json"); // We want to test that the server isn't keeping the connection open in a @@ -45,24 +40,16 @@ it("onAborted() and onWritable are not called after receiving an empty response const reader = bunProcess.stdout.getReader(); let hostname, port; + var text = ""; { - const chunks: Buffer[] = []; - var decoder = new TextDecoder(); + var decoder = new StringDecoder(); while (!hostname && !port) { var { value, done } = await reader.read(); if (done) break; - if (chunks.length > 0) { - chunks.push(value!); - } + text = decoder.write(value!); try { - if (chunks.length > 0) { - value = Buffer.concat(chunks); - } - - ({ hostname, port } = JSON.parse(decoder.decode(value).trim())); - } catch { - chunks.push(value!); - } + ({ hostname, port } = JSON.parse(text.trim())); + } catch {} } } @@ -72,22 +59,20 @@ it("onAborted() and onWritable are not called after receiving an empty response keepalive: false, method: "POST", timeout: true, - signal: timeout, }); } catch (e) {} - bunProcess.stdin?.write("--CLOSE--"); + const wrote = bunProcess.stdin?.write("--CLOSE--"); await bunProcess.stdin?.flush(); await bunProcess.stdin?.end(); expect(await bunProcess.exited).toBe(0); + console.count("Completed"); } catch (e) { - timeout.onabort = () => {}; testDone(e); throw e; } finally { bunProcess?.kill(9); } } - timeout.onabort = () => {}; testDone(); -}, 30_000); +}, 60_000);