From 71fec1f541eceae5df1fdee03a75207d14db2214 Mon Sep 17 00:00:00 2001 From: Jarred Sumner Date: Sun, 26 Jan 2025 05:17:01 -0800 Subject: [PATCH] Fix test-http-chunk-problem on macOS --- src/js/builtins/ConsoleObject.ts | 56 +++++++++++--------------- src/js/internal/webstreams_adapters.ts | 26 +++--------- src/js/node/http.ts | 35 ++++++++-------- 3 files changed, 46 insertions(+), 71 deletions(-) diff --git a/src/js/builtins/ConsoleObject.ts b/src/js/builtins/ConsoleObject.ts index 7377e5710c..6b10d8be17 100644 --- a/src/js/builtins/ConsoleObject.ts +++ b/src/js/builtins/ConsoleObject.ts @@ -55,12 +55,7 @@ export function asyncIterator(this: Console) { } while (true) { - const firstResult = reader.readMany(); - if ($isPromise(firstResult)) { - ({ done, value } = await firstResult); - } else { - ({ done, value } = firstResult); - } + ({ done, value } = await reader.read()); if (done) { if (pendingChunk) { @@ -69,33 +64,30 @@ export function asyncIterator(this: Console) { return; } - // we assume it was given line-by-line - for (idx = 0, value_len = value.length; idx < value_len; idx++) { - actualChunk = value[idx]; - if (pendingChunk) { - actualChunk = Buffer.concat([pendingChunk, actualChunk]); - pendingChunk = undefined; - } - - last = 0; - // TODO: "\r", 0x4048, 0x4049, 0x404A, 0x404B, 0x404C, 0x404D, 0x404E, 0x404F - i = indexOf(actualChunk, last); - while (i !== -1) { - // This yield may end the function, in that case we need to be able to recover state - // if the iterator was fired up again. - yield decoder.decode( - actualChunk.subarray( - last, - process.platform === "win32" ? (actualChunk[i - 1] === 0x0d /* \r */ ? i - 1 : i) : i, - ), - ); - last = i + 1; - i = indexOf(actualChunk, last); - } - i = -1; - - pendingChunk = actualChunk.subarray(last); + actualChunk = value as unknown as Uint8Array; + if (pendingChunk) { + actualChunk = Buffer.concat([pendingChunk, actualChunk]); + pendingChunk = undefined; } + + last = 0; + // TODO: "\r", 0x4048, 0x4049, 0x404A, 0x404B, 0x404C, 0x404D, 0x404E, 0x404F + i = indexOf(actualChunk, last); + while (i !== -1) { + // This yield may end the function, in that case we need to be able to recover state + // if the iterator was fired up again. + yield decoder.decode( + actualChunk.subarray( + last, + process.platform === "win32" ? (actualChunk[i - 1] === 0x0d /* \r */ ? i - 1 : i) : i, + ), + ); + last = i + 1; + i = indexOf(actualChunk, last); + } + i = -1; + + pendingChunk = actualChunk.subarray(last); actualChunk = undefined!; } } catch (e) { diff --git a/src/js/internal/webstreams_adapters.ts b/src/js/internal/webstreams_adapters.ts index f69d1bb1b4..0e9f7e58bb 100644 --- a/src/js/internal/webstreams_adapters.ts +++ b/src/js/internal/webstreams_adapters.ts @@ -101,19 +101,11 @@ class ReadableFromWeb extends Readable { var deferredError; try { do { - var done = false, - value; - const firstResult = reader.readMany(); + var { done, value } = await reader.read(); - if ($isPromise(firstResult)) { - ({ done, value } = await firstResult); - - if (this.#closed) { - this.#pendingChunks.push(...value); - return; - } - } else { - ({ done, value } = firstResult); + if (this.#closed) { + this.#pendingChunks.push(value); + return; } if (done) { @@ -121,17 +113,9 @@ class ReadableFromWeb extends Readable { return; } - if (!this.push(value[0])) { - this.#pendingChunks = value.slice(1); + if (!this.push(value)) { return; } - - for (let i = 1, count = value.length; i < count; i++) { - if (!this.push(value[i])) { - this.#pendingChunks = value.slice(i + 1); - return; - } - } } while (!this.#closed); } catch (e) { deferredError = e; diff --git a/src/js/node/http.ts b/src/js/node/http.ts index 2068f73ecb..96f64f97ae 100644 --- a/src/js/node/http.ts +++ b/src/js/node/http.ts @@ -857,7 +857,7 @@ IncomingMessage.prototype = { this.complete = true; this.push(null); } else if (this[bodyStreamSymbol] == null) { - const reader = this[reqSymbol].body?.getReader() as ReadableStreamDefaultReader; + const reader = this[reqSymbol]?.body?.getReader() as ReadableStreamDefaultReader; if (!reader) { this.complete = true; this.push(null); @@ -968,27 +968,19 @@ $setPrototypeDirect.$call(IncomingMessage.prototype, Readable.prototype); $setPrototypeDirect.$call(IncomingMessage, Readable); async function consumeStream(self, reader: ReadableStreamDefaultReader) { - var done = false, - value, - aborted = false; + var aborted = false; try { while (true) { - const result = reader.readMany(); - if ($isPromise(result)) { - ({ done, value } = await result); - } else { - ({ done, value } = result); + var { done, value } = await reader.read(); + if (done) { + break; } + if (self.destroyed || (aborted = self[abortedSymbol])) { break; } - for (var v of value) { - self.push(v); - } - if (self.destroyed || (aborted = self[abortedSymbol]) || done) { - break; - } + self.push(value); } } catch (err) { if (aborted || self.destroyed) return; @@ -1546,12 +1538,19 @@ class ClientRequest extends OutgoingMessage { _write(chunk, encoding, callback) { if (this.#controller) { + let promise; if (typeof chunk === "string") { - this.#controller.write(Buffer.from(chunk, encoding)); + promise = this.#controller.write(Buffer.from(chunk, encoding)); } else { - this.#controller.write(chunk); + promise = this.#controller.write(chunk); + } + if ($isPromise(promise)) { + promise.then(() => { + callback(null); + }); + } else { + callback(null); } - process.nextTick(callback); return; } if (!this.#bodyChunks) {