mirror of
https://github.com/oven-sh/bun
synced 2026-02-16 13:51:47 +00:00
Fix file stream error handling and EPIPE propagation across multiple components
This commit is contained in:
@@ -40,7 +40,7 @@ static StringView extractCookieName(const StringView& cookie)
|
||||
{
|
||||
auto nameEnd = cookie.find('=');
|
||||
if (nameEnd == notFound)
|
||||
return String();
|
||||
return StringView();
|
||||
return cookie.substring(0, nameEnd);
|
||||
}
|
||||
|
||||
|
||||
@@ -484,6 +484,10 @@ pub fn construct(this: *FileSink, _: std.mem.Allocator) void {
|
||||
|
||||
pub fn write(this: *@This(), data: streams.Result) streams.Result.Writable {
|
||||
if (this.done) {
|
||||
// Check if we have a pending PIPE error from onAttachedProcessExit
|
||||
if (this.pending.result == .err and this.pending.result.err.getErrno() == .PIPE) {
|
||||
return .{ .err = this.pending.result.err };
|
||||
}
|
||||
return .{ .done = {} };
|
||||
}
|
||||
|
||||
@@ -492,6 +496,10 @@ pub fn write(this: *@This(), data: streams.Result) streams.Result.Writable {
|
||||
pub const writeBytes = write;
|
||||
pub fn writeLatin1(this: *@This(), data: streams.Result) streams.Result.Writable {
|
||||
if (this.done) {
|
||||
// Check if we have a pending PIPE error from onAttachedProcessExit
|
||||
if (this.pending.result == .err and this.pending.result.err.getErrno() == .PIPE) {
|
||||
return .{ .err = this.pending.result.err };
|
||||
}
|
||||
return .{ .done = {} };
|
||||
}
|
||||
|
||||
@@ -499,6 +507,10 @@ pub fn writeLatin1(this: *@This(), data: streams.Result) streams.Result.Writable
|
||||
}
|
||||
pub fn writeUTF16(this: *@This(), data: streams.Result) streams.Result.Writable {
|
||||
if (this.done) {
|
||||
// Check if we have a pending PIPE error from onAttachedProcessExit
|
||||
if (this.pending.result == .err and this.pending.result.err.getErrno() == .PIPE) {
|
||||
return .{ .err = this.pending.result.err };
|
||||
}
|
||||
return .{ .done = {} };
|
||||
}
|
||||
|
||||
|
||||
@@ -53,10 +53,8 @@ pub fn PosixPipeWriter(
|
||||
return .{ .pending = offset };
|
||||
}
|
||||
|
||||
if (err.getErrno() == .PIPE) {
|
||||
return .{ .done = offset };
|
||||
}
|
||||
|
||||
// Don't treat EPIPE as done - return it as an error
|
||||
// so it can be properly handled by the stream
|
||||
return .{ .err = err };
|
||||
},
|
||||
|
||||
@@ -326,7 +324,7 @@ pub fn PosixBufferedWriter(Parent: type, function_table: anytype) type {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn updateRef(this: *const PosixWriter, event_loop: anytype, value: bool) void {
|
||||
pub fn updateRef(this: *PosixWriter, event_loop: anytype, value: bool) void {
|
||||
const poll = this.getPoll() orelse return;
|
||||
poll.setKeepingProcessAlive(event_loop, value);
|
||||
}
|
||||
|
||||
@@ -559,18 +559,40 @@ function _write(data, encoding, cb) {
|
||||
const fileSink = this[kWriteStreamFastPath];
|
||||
|
||||
if (fileSink && fileSink !== true) {
|
||||
const maybePromise = fileSink.write(data);
|
||||
if ($isPromise(maybePromise)) {
|
||||
maybePromise
|
||||
.then(() => {
|
||||
this.emit("drain"); // Emit drain event
|
||||
cb(null);
|
||||
})
|
||||
.catch(cb);
|
||||
return false; // Indicate backpressure
|
||||
} else {
|
||||
cb(null);
|
||||
return true; // No backpressure
|
||||
try {
|
||||
const maybePromise = fileSink.write(data);
|
||||
if ($isPromise(maybePromise)) {
|
||||
// Check if it's a rejected promise (synchronous error)
|
||||
const state = $getPromiseInternalField(maybePromise, $promiseFieldFlags);
|
||||
if ((state & $promiseStateFulfilled) === $promiseStateRejected) {
|
||||
// Synchronous error - get the rejection reason
|
||||
const err = $getPromiseInternalField(maybePromise, $promiseFieldReactionsOrResult);
|
||||
// Emit error event on next tick for consistency with Node.js
|
||||
process.nextTick(() => {
|
||||
this.emit("error", err);
|
||||
});
|
||||
cb(err);
|
||||
return false;
|
||||
}
|
||||
// Normal async promise
|
||||
maybePromise
|
||||
.then(() => {
|
||||
this.emit("drain"); // Emit drain event
|
||||
cb(null);
|
||||
})
|
||||
.catch(cb);
|
||||
return false; // Indicate backpressure
|
||||
} else {
|
||||
cb(null);
|
||||
return true; // No backpressure
|
||||
}
|
||||
} catch (err) {
|
||||
// Emit error event on next tick for consistency with Node.js
|
||||
process.nextTick(() => {
|
||||
this.emit("error", err);
|
||||
});
|
||||
cb(err);
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
writeAll.$call(this, data, data.length, this.pos, er => {
|
||||
@@ -603,16 +625,38 @@ function underscoreWriteFast(this: FSStream, data: any, encoding: any, cb: any)
|
||||
|
||||
const maybePromise = fileSink.write(data);
|
||||
if ($isPromise(maybePromise)) {
|
||||
maybePromise.then(() => {
|
||||
cb(null);
|
||||
this.emit("drain");
|
||||
}, cb);
|
||||
// Check if it's a rejected promise (synchronous error)
|
||||
const state = $getPromiseInternalField(maybePromise, $promiseFieldFlags);
|
||||
if ((state & $promiseStateFulfilled) === $promiseStateRejected) {
|
||||
// Synchronous error - get the rejection reason
|
||||
const err = $getPromiseInternalField(maybePromise, $promiseFieldReactionsOrResult);
|
||||
// Emit error event on next tick for consistency with Node.js
|
||||
process.nextTick(() => {
|
||||
this.emit("error", err);
|
||||
});
|
||||
if (cb) cb(err);
|
||||
return false;
|
||||
}
|
||||
// Normal async promise
|
||||
maybePromise.then(
|
||||
() => {
|
||||
cb(null);
|
||||
this.emit("drain");
|
||||
},
|
||||
err => {
|
||||
if (cb) cb(err);
|
||||
},
|
||||
);
|
||||
return false;
|
||||
} else {
|
||||
if (cb) process.nextTick(cb, null);
|
||||
return true;
|
||||
}
|
||||
} catch (e) {
|
||||
// Emit error event on next tick for consistency with Node.js
|
||||
process.nextTick(() => {
|
||||
this.emit("error", e);
|
||||
});
|
||||
if (cb) process.nextTick(cb, e);
|
||||
return false;
|
||||
}
|
||||
@@ -634,18 +678,40 @@ function writeFast(this: FSStream, data: any, encoding: any, cb: any) {
|
||||
|
||||
const fileSink = this[kWriteStreamFastPath];
|
||||
if (fileSink && fileSink !== true) {
|
||||
const maybePromise = fileSink.write(data);
|
||||
if ($isPromise(maybePromise)) {
|
||||
maybePromise
|
||||
.then(() => {
|
||||
this.emit("drain"); // Emit drain event
|
||||
cb(null);
|
||||
})
|
||||
.catch(cb);
|
||||
return false; // Indicate backpressure
|
||||
} else {
|
||||
cb(null);
|
||||
return true; // No backpressure
|
||||
try {
|
||||
const maybePromise = fileSink.write(data);
|
||||
if ($isPromise(maybePromise)) {
|
||||
// Check if it's a rejected promise (synchronous error)
|
||||
const state = $getPromiseInternalField(maybePromise, $promiseFieldFlags);
|
||||
if ((state & $promiseStateFulfilled) === $promiseStateRejected) {
|
||||
// Synchronous error - get the rejection reason
|
||||
const err = $getPromiseInternalField(maybePromise, $promiseFieldReactionsOrResult);
|
||||
// Emit error event on next tick for consistency with Node.js
|
||||
process.nextTick(() => {
|
||||
this.emit("error", err);
|
||||
});
|
||||
cb(err);
|
||||
return false;
|
||||
}
|
||||
// Normal async promise
|
||||
maybePromise
|
||||
.then(() => {
|
||||
this.emit("drain"); // Emit drain event
|
||||
cb(null);
|
||||
})
|
||||
.catch(cb);
|
||||
return false; // Indicate backpressure
|
||||
} else {
|
||||
cb(null);
|
||||
return true; // No backpressure
|
||||
}
|
||||
} catch (err) {
|
||||
// Emit error event on next tick for consistency with Node.js
|
||||
process.nextTick(() => {
|
||||
this.emit("error", err);
|
||||
});
|
||||
cb(err);
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
const result: any = this._write(data, encoding, cb);
|
||||
|
||||
27
test/js/node/test/parallel/test-stdout-close-catch.js
Normal file
27
test/js/node/test/parallel/test-stdout-close-catch.js
Normal file
@@ -0,0 +1,27 @@
|
||||
'use strict';
|
||||
const common = require('../common');
|
||||
const assert = require('assert');
|
||||
const child_process = require('child_process');
|
||||
const fixtures = require('../common/fixtures');
|
||||
const { getSystemErrorName } = require('util');
|
||||
|
||||
const testScript = fixtures.path('catch-stdout-error.js');
|
||||
|
||||
const child = child_process.exec(
|
||||
...common.escapePOSIXShell`"${process.execPath}" "${testScript}" | "${process.execPath}" -pe "process.stdin.on('data' , () => process.exit(1))"`
|
||||
);
|
||||
let output = '';
|
||||
|
||||
child.stderr.on('data', function(c) {
|
||||
output += c;
|
||||
});
|
||||
|
||||
|
||||
child.on('close', common.mustCall(function(code) {
|
||||
output = JSON.parse(output);
|
||||
|
||||
assert.strictEqual(output.code, 'EPIPE');
|
||||
assert.strictEqual(getSystemErrorName(output.errno), 'EPIPE');
|
||||
assert.strictEqual(output.syscall, 'write');
|
||||
console.log('ok');
|
||||
}));
|
||||
Reference in New Issue
Block a user