Compare commits

...

4 Commits

Author SHA1 Message Date
Jarred Sumner
f711fd6107 Try this 2025-01-26 06:02:59 -08:00
Jarred Sumner
1143da6d76 Update http.ts 2025-01-26 05:42:51 -08:00
Jarred Sumner
66a6bf771b Fix backpressure signaling 2025-01-26 05:40:07 -08:00
Jarred Sumner
71fec1f541 Fix test-http-chunk-problem on macOS 2025-01-26 05:17:01 -08:00
6 changed files with 128 additions and 105 deletions

View File

@@ -3630,10 +3630,17 @@ pub const FileSink = struct {
this.fd = fd;
this.is_socket = std.posix.S.ISSOCK(stat.mode);
this.nonblocking = is_nonblocking_tty or (this.pollable and switch (options.input_path) {
.path => true,
.fd => |fd_| bun.FDTag.get(fd_) == .none,
});
if (!this.nonblocking and this.is_socket) {
if (bun.sys.setNonblocking(fd) == .result) {
this.nonblocking = true;
}
}
},
}
} else if (comptime Environment.isWindows) {

View File

@@ -137,7 +137,7 @@ pub const windows_bunx_fast_path = true;
// This causes strange bugs where writing via console.log (sync) has a different
// order than via Bun.file.writer() so we turn it off until there's a unified,
// buffered writer abstraction shared throughout Bun
pub const nonblocking_stdout_and_stderr_on_posix = false;
pub const nonblocking_stdout_and_stderr_on_posix = true;
pub const postgresql = env.is_canary or env.isDebug;

View File

@@ -55,12 +55,7 @@ export function asyncIterator(this: Console) {
}
while (true) {
const firstResult = reader.readMany();
if ($isPromise(firstResult)) {
({ done, value } = await firstResult);
} else {
({ done, value } = firstResult);
}
({ done, value } = await reader.read());
if (done) {
if (pendingChunk) {
@@ -69,33 +64,30 @@ export function asyncIterator(this: Console) {
return;
}
// we assume it was given line-by-line
for (idx = 0, value_len = value.length; idx < value_len; idx++) {
actualChunk = value[idx];
if (pendingChunk) {
actualChunk = Buffer.concat([pendingChunk, actualChunk]);
pendingChunk = undefined;
}
last = 0;
// TODO: "\r", 0x4048, 0x4049, 0x404A, 0x404B, 0x404C, 0x404D, 0x404E, 0x404F
i = indexOf(actualChunk, last);
while (i !== -1) {
// This yield may end the function, in that case we need to be able to recover state
// if the iterator was fired up again.
yield decoder.decode(
actualChunk.subarray(
last,
process.platform === "win32" ? (actualChunk[i - 1] === 0x0d /* \r */ ? i - 1 : i) : i,
),
);
last = i + 1;
i = indexOf(actualChunk, last);
}
i = -1;
pendingChunk = actualChunk.subarray(last);
actualChunk = value as unknown as Uint8Array;
if (pendingChunk) {
actualChunk = Buffer.concat([pendingChunk, actualChunk]);
pendingChunk = undefined;
}
last = 0;
// TODO: "\r", 0x4048, 0x4049, 0x404A, 0x404B, 0x404C, 0x404D, 0x404E, 0x404F
i = indexOf(actualChunk, last);
while (i !== -1) {
// This yield may end the function, in that case we need to be able to recover state
// if the iterator was fired up again.
yield decoder.decode(
actualChunk.subarray(
last,
process.platform === "win32" ? (actualChunk[i - 1] === 0x0d /* \r */ ? i - 1 : i) : i,
),
);
last = i + 1;
i = indexOf(actualChunk, last);
}
i = -1;
pendingChunk = actualChunk.subarray(last);
actualChunk = undefined!;
}
} catch (e) {

View File

@@ -101,19 +101,11 @@ class ReadableFromWeb extends Readable {
var deferredError;
try {
do {
var done = false,
value;
const firstResult = reader.readMany();
var { done, value } = await reader.read();
if ($isPromise(firstResult)) {
({ done, value } = await firstResult);
if (this.#closed) {
this.#pendingChunks.push(...value);
return;
}
} else {
({ done, value } = firstResult);
if (this.#closed) {
this.#pendingChunks.push(value);
return;
}
if (done) {
@@ -121,17 +113,9 @@ class ReadableFromWeb extends Readable {
return;
}
if (!this.push(value[0])) {
this.#pendingChunks = value.slice(1);
if (!this.push(value)) {
return;
}
for (let i = 1, count = value.length; i < count; i++) {
if (!this.push(value[i])) {
this.#pendingChunks = value.slice(i + 1);
return;
}
}
} while (!this.#closed);
} catch (e) {
deferredError = e;

View File

@@ -857,7 +857,7 @@ IncomingMessage.prototype = {
this.complete = true;
this.push(null);
} else if (this[bodyStreamSymbol] == null) {
const reader = this[reqSymbol].body?.getReader() as ReadableStreamDefaultReader;
const reader = this[reqSymbol]?.body?.getReader() as ReadableStreamDefaultReader;
if (!reader) {
this.complete = true;
this.push(null);
@@ -968,27 +968,19 @@ $setPrototypeDirect.$call(IncomingMessage.prototype, Readable.prototype);
$setPrototypeDirect.$call(IncomingMessage, Readable);
async function consumeStream(self, reader: ReadableStreamDefaultReader) {
var done = false,
value,
aborted = false;
var aborted = false;
try {
while (true) {
const result = reader.readMany();
if ($isPromise(result)) {
({ done, value } = await result);
} else {
({ done, value } = result);
var { done, value } = await reader.read();
if (done) {
break;
}
if (self.destroyed || (aborted = self[abortedSymbol])) {
break;
}
for (var v of value) {
self.push(v);
}
if (self.destroyed || (aborted = self[abortedSymbol]) || done) {
break;
}
self.push(value);
}
} catch (err) {
if (aborted || self.destroyed) return;
@@ -1246,15 +1238,24 @@ ServerResponse.prototype._implicitHeader = function () {
ServerResponse.prototype._write = function (chunk, encoding, callback) {
if (this[firstWriteSymbol] === undefined && !this.headersSent) {
this[firstWriteSymbol] = chunk;
this[firstWriteSymbol] =
encoding && encoding !== "utf-8" && encoding !== "utf8" && typeof chunk === "string"
? Buffer.from(chunk, encoding)
: chunk;
callback();
return;
}
ensureReadableStreamController.$call(this, controller => {
controller.write(chunk);
callback();
});
ensureReadableStreamController.$call(
this,
(controller, chunk, encoding, callback) => {
controller.write(chunk);
callback();
},
chunk,
encoding,
callback,
);
};
ServerResponse.prototype._writev = function (chunks, callback) {
@@ -1264,18 +1265,30 @@ ServerResponse.prototype._writev = function (chunks, callback) {
return;
}
ensureReadableStreamController.$call(this, controller => {
for (const chunk of chunks) {
controller.write(chunk.chunk);
}
callback();
});
ensureReadableStreamController.$call(
this,
(controller, chunks, _, callback) => {
let promise;
for (const chunk of chunks) {
promise = controller.write(chunk.chunk);
}
if (promise && $isPromise(promise)) {
promise.then(() => {
callback();
});
} else {
callback();
}
},
chunks,
undefined,
callback,
);
};
function ensureReadableStreamController(run) {
function ensureReadableStreamController(run, chunk, encoding, callback) {
const thisController = this[controllerSymbol];
if (thisController) return run(thisController);
if (thisController) return run(thisController, chunk, encoding, callback);
this.headersSent = true;
let firstWrite = this[firstWriteSymbol];
this[controllerSymbol] = undefined;
@@ -1285,9 +1298,18 @@ function ensureReadableStreamController(run) {
type: "direct",
pull: controller => {
this[controllerSymbol] = controller;
if (firstWrite) controller.write(firstWrite);
firstWrite = undefined;
run(controller);
let promise = undefined;
if (firstWrite) {
promise = controller.write(firstWrite);
firstWrite = undefined;
}
if (promise && $isPromise(promise)) {
promise.then(() => {
run(controller, chunk, encoding, callback);
});
} else {
run(controller, chunk, encoding, callback);
}
if (!this[finishedSymbol]) {
const { promise, resolve } = $newPromiseCapability(GlobalPromise);
this[deferredSymbol] = resolve;
@@ -1338,19 +1360,25 @@ ServerResponse.prototype._final = function (callback) {
}
this[finishedSymbol] = true;
ensureReadableStreamController.$call(this, controller => {
controller.end();
if (shouldEmitClose) {
req.complete = true;
process.nextTick(emitRequestCloseNT, req);
}
callback();
const deferred = this[deferredSymbol];
if (deferred) {
this[deferredSymbol] = undefined;
deferred();
}
});
ensureReadableStreamController.$call(
this,
(controller, chunk, encoding, callback) => {
controller.end();
if (shouldEmitClose) {
req.complete = true;
process.nextTick(emitRequestCloseNT, req);
}
callback();
const deferred = this[deferredSymbol];
if (deferred) {
this[deferredSymbol] = undefined;
deferred();
}
},
undefined,
undefined,
callback,
);
};
ServerResponse.prototype.writeProcessing = function () {
@@ -1545,13 +1573,25 @@ class ClientRequest extends OutgoingMessage {
}
_write(chunk, encoding, callback) {
if (this.#controller) {
const controller = this.#controller;
if (controller) {
let promise;
if (typeof chunk === "string") {
this.#controller.write(Buffer.from(chunk, encoding));
if (encoding === "utf-8" || encoding === "utf8" || !encoding) {
promise = controller.write(chunk);
} else {
promise = controller.write(Buffer.from(chunk, encoding));
}
} else {
this.#controller.write(chunk);
promise = controller.write(chunk);
}
if ($isPromise(promise)) {
promise.then(() => {
callback(null);
});
} else {
callback(null);
}
process.nextTick(callback);
return;
}
if (!this.#bodyChunks) {

View File

@@ -3517,8 +3517,8 @@ pub fn dupWithFlags(fd: bun.FileDescriptor, flags: i32) Maybe(bun.FileDescriptor
}
if (flags != 0) {
const fd_flags: ArgType = @intCast(syscall.fcntl(@intCast(out), @as(i32, std.posix.F.GETFD), @as(ArgType, 0)));
_ = syscall.fcntl(@intCast(out), @as(i32, std.posix.F.SETFD), @as(ArgType, @intCast(fd_flags | @as(ArgType, @intCast(flags)))));
const fd_flags: ArgType = @intCast(syscall.fcntl(@intCast(out), @as(i32, std.posix.F.GETFL), @as(ArgType, 0)));
_ = syscall.fcntl(@intCast(out), @as(i32, std.posix.F.SETFL), @as(ArgType, @intCast(fd_flags | @as(ArgType, @intCast(flags)))));
}
return Maybe(bun.FileDescriptor){