Use StringDecoder

This commit is contained in:
Jarred Sumner
2023-12-12 01:17:45 -08:00
parent ea585803e7
commit 3d78b2bd7e
2 changed files with 61 additions and 94 deletions

View File

@@ -1,54 +1,21 @@
$overriddenName = "[Symbol.asyncIterator]";
export function asyncIterator(this: Console) {
var stream = Bun.stdin.stream();
const StringDecoder: typeof import("string_decoder").StringDecoder = require("node:string_decoder").StringDecoder;
const decoder = new StringDecoder("utf8");
var indexOf = Bun.indexOfLine;
var actualChunk: Uint8Array;
var i: number = -1;
var idx: number;
var last: number;
var done: boolean;
var value: Uint8Array[];
var value_len: number;
var pendingChunk: Uint8Array | undefined;
var reader: ReadableStreamDefaultReader | undefined = stream.getReader();
async function* ConsoleAsyncIterator() {
var stream = Bun.stdin.stream();
const decoder = new StringDecoder("utf8");
var deferredError;
var done = false;
var value: Uint8Array[];
var pendingChunk: Uint8Array | undefined;
var reader: ReadableStreamDefaultReader | undefined = stream.getReader();
try {
if (i !== -1) {
last = i + 1;
i = indexOf(actualChunk, last);
while (i !== -1) {
yield decoder.write(actualChunk.subarray(last, i));
last = i + 1;
i = indexOf(actualChunk, last);
}
for (idx++; idx < value_len; idx++) {
actualChunk = value[idx];
if (pendingChunk) {
actualChunk = Buffer.concat([pendingChunk, actualChunk]);
pendingChunk = undefined;
}
last = 0;
// TODO: "\r", 0x4048, 0x4049, 0x404A, 0x404B, 0x404C, 0x404D, 0x404E, 0x404F
i = indexOf(actualChunk, last);
while (i !== -1) {
yield decoder.write(actualChunk.subarray(last, i));
last = i + 1;
i = indexOf(actualChunk, last);
}
i = -1;
pendingChunk = actualChunk.subarray(last);
}
actualChunk = undefined!;
}
var text = "";
while (true) {
reader ??= stream.getReader();
@@ -64,36 +31,51 @@ export function asyncIterator(this: Console) {
reader = undefined;
stream = Bun.stdin.stream();
if (!done) {
for (let chunk of value) {
text += decoder.write(chunk);
}
} else if (value && value.length > 0) {
const valueLength = value.length - 1;
for (let i = 0; i < valueLength; i++) {
text += decoder.write(value[i]);
}
text += decoder.end(value[valueLength]);
} else {
text += decoder.end();
}
{
let i = 0,
textLength = text.length;
while (i < textLength) {
let j = text.indexOf("\n", i);
if (j === -1) {
break;
}
const next = j + 1;
if (text[j - 1] === "\r") {
j--;
}
yield text.slice(i, j);
i = next;
}
if (i < textLength) {
text = text.slice(i);
} else {
text = "";
}
}
if (done) {
if (pendingChunk) {
yield decoder.write(pendingChunk);
if (text.length > 0) {
yield text;
}
return;
}
// we assume it was given line-by-line
for (idx = 0, value_len = value.length; idx < value_len; idx++) {
actualChunk = value[idx];
if (pendingChunk) {
actualChunk = Buffer.concat([pendingChunk, actualChunk]);
pendingChunk = undefined;
}
last = 0;
// TODO: "\r", 0x4048, 0x4049, 0x404A, 0x404B, 0x404C, 0x404D, 0x404E, 0x404F
i = indexOf(actualChunk, last);
while (i !== -1) {
// This yield may end the function, in that case we need to be able to recover state
// if the iterator was fired up again.
yield decoder.write(actualChunk.subarray(last, i));
last = i + 1;
i = indexOf(actualChunk, last);
}
i = -1;
pendingChunk = actualChunk.subarray(last);
}
actualChunk = undefined!;
}
} catch (e) {
deferredError = e;

View File

@@ -4,14 +4,9 @@ import { mkdirSync, rmSync, writeFileSync, readFileSync, mkdtempSync } from "fs"
import { tmpdir } from "os";
import { dirname, join } from "path";
import { sleep, spawn, spawnSync, which } from "bun";
import { StringDecoder } from "node:string_decoder";
// https://github.com/oven-sh/bun/issues/2499
it("onAborted() and onWritable are not called after receiving an empty response body due to a promise rejection", async testDone => {
var timeout = AbortSignal.timeout(10_000);
timeout.onabort = e => {
testDone(new Error("Test timed out, which means it failed"));
};
const invalidJSON = Buffer.from("invalid json");
// We want to test that the server isn't keeping the connection open in a
@@ -45,24 +40,16 @@ it("onAborted() and onWritable are not called after receiving an empty response
const reader = bunProcess.stdout.getReader();
let hostname, port;
var text = "";
{
const chunks: Buffer[] = [];
var decoder = new TextDecoder();
var decoder = new StringDecoder();
while (!hostname && !port) {
var { value, done } = await reader.read();
if (done) break;
if (chunks.length > 0) {
chunks.push(value!);
}
text = decoder.write(value!);
try {
if (chunks.length > 0) {
value = Buffer.concat(chunks);
}
({ hostname, port } = JSON.parse(decoder.decode(value).trim()));
} catch {
chunks.push(value!);
}
({ hostname, port } = JSON.parse(text.trim()));
} catch {}
}
}
@@ -72,22 +59,20 @@ it("onAborted() and onWritable are not called after receiving an empty response
keepalive: false,
method: "POST",
timeout: true,
signal: timeout,
});
} catch (e) {}
bunProcess.stdin?.write("--CLOSE--");
const wrote = bunProcess.stdin?.write("--CLOSE--");
await bunProcess.stdin?.flush();
await bunProcess.stdin?.end();
expect(await bunProcess.exited).toBe(0);
console.count("Completed");
} catch (e) {
timeout.onabort = () => {};
testDone(e);
throw e;
} finally {
bunProcess?.kill(9);
}
}
timeout.onabort = () => {};
testDone();
}, 30_000);
}, 60_000);