Compare commits

...

6 Commits

Author SHA1 Message Date
Jarred Sumner
3d78b2bd7e Use StringDecoder 2023-12-12 01:17:45 -08:00
Jarred Sumner
ea585803e7 Merge branch 'main' into jarred/fix-console-stuck 2023-12-12 09:18:17 +01:00
Jarred Sumner
44e40ac44f Merge branch 'main' into jarred/fix-console-stuck 2023-12-12 07:30:52 +01:00
Jarred Sumner
9306d5fd1d Merge branch 'main' into jarred/fix-console-stuck 2023-12-07 09:46:33 +01:00
Jarred Sumner
19cbf7e177 Update 02499.test.ts 2023-12-06 23:27:22 -08:00
Jarred Sumner
c271e7af70 Fixes https://github.com/oven-sh/bun/issues/7398 2023-12-06 08:50:02 -08:00
2 changed files with 70 additions and 96 deletions

View File

@@ -1,97 +1,86 @@
$overriddenName = "[Symbol.asyncIterator]";
export function asyncIterator(this: Console) {
var stream = Bun.stdin.stream();
var decoder = new TextDecoder("utf-8", { fatal: false });
const StringDecoder: typeof import("string_decoder").StringDecoder = require("node:string_decoder").StringDecoder;
var indexOf = Bun.indexOfLine;
var actualChunk: Uint8Array;
var i: number = -1;
var idx: number;
var last: number;
var done: boolean;
var value: Uint8Array[];
var value_len: number;
var pendingChunk: Uint8Array | undefined;
async function* ConsoleAsyncIterator() {
var reader = stream.getReader();
var stream = Bun.stdin.stream();
const decoder = new StringDecoder("utf8");
var deferredError;
var done = false;
var value: Uint8Array[];
var pendingChunk: Uint8Array | undefined;
var reader: ReadableStreamDefaultReader | undefined = stream.getReader();
try {
if (i !== -1) {
last = i + 1;
i = indexOf(actualChunk, last);
while (i !== -1) {
yield decoder.decode(actualChunk.subarray(last, i));
last = i + 1;
i = indexOf(actualChunk, last);
}
for (idx++; idx < value_len; idx++) {
actualChunk = value[idx];
if (pendingChunk) {
actualChunk = Buffer.concat([pendingChunk, actualChunk]);
pendingChunk = undefined;
}
last = 0;
// TODO: "\r", 0x4048, 0x4049, 0x404A, 0x404B, 0x404C, 0x404D, 0x404E, 0x404F
i = indexOf(actualChunk, last);
while (i !== -1) {
yield decoder.decode(actualChunk.subarray(last, i));
last = i + 1;
i = indexOf(actualChunk, last);
}
i = -1;
pendingChunk = actualChunk.subarray(last);
}
actualChunk = undefined!;
}
var text = "";
while (true) {
const firstResult = reader.readMany();
reader ??= stream.getReader();
const firstResult = reader!.readMany();
if ($isPromise(firstResult)) {
({ done, value } = await firstResult);
} else {
({ done, value } = firstResult);
}
reader.releaseLock();
stream.cancel().finally(() => {});
reader = undefined;
stream = Bun.stdin.stream();
if (!done) {
for (let chunk of value) {
text += decoder.write(chunk);
}
} else if (value && value.length > 0) {
const valueLength = value.length - 1;
for (let i = 0; i < valueLength; i++) {
text += decoder.write(value[i]);
}
text += decoder.end(value[valueLength]);
} else {
text += decoder.end();
}
{
let i = 0,
textLength = text.length;
while (i < textLength) {
let j = text.indexOf("\n", i);
if (j === -1) {
break;
}
const next = j + 1;
if (text[j - 1] === "\r") {
j--;
}
yield text.slice(i, j);
i = next;
}
if (i < textLength) {
text = text.slice(i);
} else {
text = "";
}
}
if (done) {
if (pendingChunk) {
yield decoder.decode(pendingChunk);
if (text.length > 0) {
yield text;
}
return;
}
// we assume it was given line-by-line
for (idx = 0, value_len = value.length; idx < value_len; idx++) {
actualChunk = value[idx];
if (pendingChunk) {
actualChunk = Buffer.concat([pendingChunk, actualChunk]);
pendingChunk = undefined;
}
last = 0;
// TODO: "\r", 0x4048, 0x4049, 0x404A, 0x404B, 0x404C, 0x404D, 0x404E, 0x404F
i = indexOf(actualChunk, last);
while (i !== -1) {
// This yield may end the function, in that case we need to be able to recover state
// if the iterator was fired up again.
yield decoder.decode(actualChunk.subarray(last, i));
last = i + 1;
i = indexOf(actualChunk, last);
}
i = -1;
pendingChunk = actualChunk.subarray(last);
}
actualChunk = undefined!;
}
} catch (e) {
deferredError = e;
} finally {
reader.releaseLock();
reader?.releaseLock?.();
if (deferredError) {
throw deferredError;

View File

@@ -4,14 +4,9 @@ import { mkdirSync, rmSync, writeFileSync, readFileSync, mkdtempSync } from "fs"
import { tmpdir } from "os";
import { dirname, join } from "path";
import { sleep, spawn, spawnSync, which } from "bun";
import { StringDecoder } from "node:string_decoder";
// https://github.com/oven-sh/bun/issues/2499
it("onAborted() and onWritable are not called after receiving an empty response body due to a promise rejection", async testDone => {
var timeout = AbortSignal.timeout(10_000);
timeout.onabort = e => {
testDone(new Error("Test timed out, which means it failed"));
};
const invalidJSON = Buffer.from("invalid json");
// We want to test that the server isn't keeping the connection open in a
@@ -38,31 +33,23 @@ it("onAborted() and onWritable are not called after receiving an empty response
bunProcess = spawn({
cmd: [bunExe(), "run", join(import.meta.dir, "./02499-repro.ts")],
stdin: "pipe",
stderr: "ignore",
stderr: "inherit",
stdout: "pipe",
env: bunEnv,
});
const reader = bunProcess.stdout.getReader();
let hostname, port;
var text = "";
{
const chunks: Buffer[] = [];
var decoder = new TextDecoder();
var decoder = new StringDecoder();
while (!hostname && !port) {
var { value, done } = await reader.read();
if (done) break;
if (chunks.length > 0) {
chunks.push(value!);
}
text = decoder.write(value!);
try {
if (chunks.length > 0) {
value = Buffer.concat(chunks);
}
({ hostname, port } = JSON.parse(decoder.decode(value).trim()));
} catch {
chunks.push(value!);
}
({ hostname, port } = JSON.parse(text.trim()));
} catch {}
}
}
@@ -72,22 +59,20 @@ it("onAborted() and onWritable are not called after receiving an empty response
keepalive: false,
method: "POST",
timeout: true,
signal: timeout,
});
} catch (e) {}
bunProcess.stdin?.write("--CLOSE--");
const wrote = bunProcess.stdin?.write("--CLOSE--");
await bunProcess.stdin?.flush();
await bunProcess.stdin?.end();
expect(await bunProcess.exited).toBe(0);
console.count("Completed");
} catch (e) {
timeout.onabort = () => {};
testDone(e);
throw e;
} finally {
bunProcess?.kill(9);
}
}
timeout.onabort = () => {};
testDone();
}, 30_000);
}, 60_000);