Compare commits

...

2 Commits

Author SHA1 Message Date
Jarred Sumner
44b23ac296 Update streams.ts 2025-01-27 22:45:26 -08:00
Jarred Sumner
03e8142608 Update streams.ts 2025-01-27 22:42:53 -08:00

View File

@@ -47,6 +47,7 @@ const kWriteFastSimpleBuffering = Symbol("writeFastSimpleBuffering");
// using `node:fs`, `Bun.file(...).writer()` is used instead.
const kWriteStreamFastPath = Symbol("kWriteStreamFastPath");
const kFs = Symbol("kFs");
const kPendingWrites = Symbol("kPendingWrites");
const {
read: fileHandlePrototypeRead,
@@ -470,6 +471,7 @@ function WriteStream(this: FSStream, path: string | null, options?: any): void {
this.start = start;
this.pos = undefined;
this.bytesWritten = 0;
this[kPendingWrites] = 0;
if (start !== undefined) {
validateInteger(start, "start", 0);
@@ -561,16 +563,19 @@ function _write(data, encoding, cb) {
const fileSink = this[kWriteStreamFastPath];
if (fileSink && fileSink !== true) {
this[kPendingWrites]++;
const maybePromise = fileSink.write(data);
if ($isPromise(maybePromise)) {
maybePromise
.then(() => {
this[kPendingWrites]--;
this.emit("drain"); // Emit drain event
cb(null);
})
.catch(cb);
return false; // Indicate backpressure
} else {
this[kPendingWrites]--;
cb(null);
return true; // No backpressure
}
@@ -700,16 +705,33 @@ writeStreamPrototype._writev = function (data, cb) {
}
};
writeStreamPrototype._destroy = function (err, cb) {
const sink = this[kWriteStreamFastPath];
function WriteStreamPrototype_close(self: FSStream, err: any, cb: any) {
const sink = self[kWriteStreamFastPath];
if (sink && sink !== true) {
const end = sink.end(err);
const end = err ? sink.end(err) : sink.end();
if ($isPromise(end)) {
end.then(() => cb(err), cb);
end.then(() => {
close(self, err, cb);
});
return;
}
}
close(this, err, cb);
close(self, err, cb);
}
writeStreamPrototype._destroy = function (err, cb) {
// Wait for pending writes to complete before closing
if ((this[kPendingWrites] | 0) > 0) {
const sink = this[kWriteStreamFastPath];
const flushResult = sink.flush();
if ($isPromise(flushResult)) {
flushResult.finally(() => WriteStreamPrototype_close(this, err, cb));
} else {
WriteStreamPrototype_close(this, err, cb);
}
} else {
close(this, err, cb);
}
};
writeStreamPrototype.close = function (this: FSStream, cb) {