Compare commits

...

1 Commits

Author SHA1 Message Date
Cursor Agent
9a1e0754ef Fix file stream error handling and EPIPE propagation across multiple components 2025-06-06 00:02:46 +00:00
5 changed files with 137 additions and 34 deletions

View File

@@ -40,7 +40,7 @@ static StringView extractCookieName(const StringView& cookie)
{
auto nameEnd = cookie.find('=');
if (nameEnd == notFound)
return String();
return StringView();
return cookie.substring(0, nameEnd);
}

View File

@@ -484,6 +484,10 @@ pub fn construct(this: *FileSink, _: std.mem.Allocator) void {
pub fn write(this: *@This(), data: streams.Result) streams.Result.Writable {
if (this.done) {
// Check if we have a pending PIPE error from onAttachedProcessExit
if (this.pending.result == .err and this.pending.result.err.getErrno() == .PIPE) {
return .{ .err = this.pending.result.err };
}
return .{ .done = {} };
}
@@ -492,6 +496,10 @@ pub fn write(this: *@This(), data: streams.Result) streams.Result.Writable {
pub const writeBytes = write;
pub fn writeLatin1(this: *@This(), data: streams.Result) streams.Result.Writable {
if (this.done) {
// Check if we have a pending PIPE error from onAttachedProcessExit
if (this.pending.result == .err and this.pending.result.err.getErrno() == .PIPE) {
return .{ .err = this.pending.result.err };
}
return .{ .done = {} };
}
@@ -499,6 +507,10 @@ pub fn writeLatin1(this: *@This(), data: streams.Result) streams.Result.Writable
}
pub fn writeUTF16(this: *@This(), data: streams.Result) streams.Result.Writable {
if (this.done) {
// Check if we have a pending PIPE error from onAttachedProcessExit
if (this.pending.result == .err and this.pending.result.err.getErrno() == .PIPE) {
return .{ .err = this.pending.result.err };
}
return .{ .done = {} };
}

View File

@@ -53,10 +53,8 @@ pub fn PosixPipeWriter(
return .{ .pending = offset };
}
if (err.getErrno() == .PIPE) {
return .{ .done = offset };
}
// Don't treat EPIPE as done - return it as an error
// so it can be properly handled by the stream
return .{ .err = err };
},
@@ -326,7 +324,7 @@ pub fn PosixBufferedWriter(Parent: type, function_table: anytype) type {
}
}
pub fn updateRef(this: *const PosixWriter, event_loop: anytype, value: bool) void {
pub fn updateRef(this: *PosixWriter, event_loop: anytype, value: bool) void {
const poll = this.getPoll() orelse return;
poll.setKeepingProcessAlive(event_loop, value);
}

View File

@@ -559,18 +559,40 @@ function _write(data, encoding, cb) {
const fileSink = this[kWriteStreamFastPath];
if (fileSink && fileSink !== true) {
const maybePromise = fileSink.write(data);
if ($isPromise(maybePromise)) {
maybePromise
.then(() => {
this.emit("drain"); // Emit drain event
cb(null);
})
.catch(cb);
return false; // Indicate backpressure
} else {
cb(null);
return true; // No backpressure
try {
const maybePromise = fileSink.write(data);
if ($isPromise(maybePromise)) {
// Check if it's a rejected promise (synchronous error)
const state = $getPromiseInternalField(maybePromise, $promiseFieldFlags);
if ((state & $promiseStateFulfilled) === $promiseStateRejected) {
// Synchronous error - get the rejection reason
const err = $getPromiseInternalField(maybePromise, $promiseFieldReactionsOrResult);
// Emit error event on next tick for consistency with Node.js
process.nextTick(() => {
this.emit("error", err);
});
cb(err);
return false;
}
// Normal async promise
maybePromise
.then(() => {
this.emit("drain"); // Emit drain event
cb(null);
})
.catch(cb);
return false; // Indicate backpressure
} else {
cb(null);
return true; // No backpressure
}
} catch (err) {
// Emit error event on next tick for consistency with Node.js
process.nextTick(() => {
this.emit("error", err);
});
cb(err);
return false;
}
} else {
writeAll.$call(this, data, data.length, this.pos, er => {
@@ -603,16 +625,38 @@ function underscoreWriteFast(this: FSStream, data: any, encoding: any, cb: any)
const maybePromise = fileSink.write(data);
if ($isPromise(maybePromise)) {
maybePromise.then(() => {
cb(null);
this.emit("drain");
}, cb);
// Check if it's a rejected promise (synchronous error)
const state = $getPromiseInternalField(maybePromise, $promiseFieldFlags);
if ((state & $promiseStateFulfilled) === $promiseStateRejected) {
// Synchronous error - get the rejection reason
const err = $getPromiseInternalField(maybePromise, $promiseFieldReactionsOrResult);
// Emit error event on next tick for consistency with Node.js
process.nextTick(() => {
this.emit("error", err);
});
if (cb) cb(err);
return false;
}
// Normal async promise
maybePromise.then(
() => {
cb(null);
this.emit("drain");
},
err => {
if (cb) cb(err);
},
);
return false;
} else {
if (cb) process.nextTick(cb, null);
return true;
}
} catch (e) {
// Emit error event on next tick for consistency with Node.js
process.nextTick(() => {
this.emit("error", e);
});
if (cb) process.nextTick(cb, e);
return false;
}
@@ -634,18 +678,40 @@ function writeFast(this: FSStream, data: any, encoding: any, cb: any) {
const fileSink = this[kWriteStreamFastPath];
if (fileSink && fileSink !== true) {
const maybePromise = fileSink.write(data);
if ($isPromise(maybePromise)) {
maybePromise
.then(() => {
this.emit("drain"); // Emit drain event
cb(null);
})
.catch(cb);
return false; // Indicate backpressure
} else {
cb(null);
return true; // No backpressure
try {
const maybePromise = fileSink.write(data);
if ($isPromise(maybePromise)) {
// Check if it's a rejected promise (synchronous error)
const state = $getPromiseInternalField(maybePromise, $promiseFieldFlags);
if ((state & $promiseStateFulfilled) === $promiseStateRejected) {
// Synchronous error - get the rejection reason
const err = $getPromiseInternalField(maybePromise, $promiseFieldReactionsOrResult);
// Emit error event on next tick for consistency with Node.js
process.nextTick(() => {
this.emit("error", err);
});
cb(err);
return false;
}
// Normal async promise
maybePromise
.then(() => {
this.emit("drain"); // Emit drain event
cb(null);
})
.catch(cb);
return false; // Indicate backpressure
} else {
cb(null);
return true; // No backpressure
}
} catch (err) {
// Emit error event on next tick for consistency with Node.js
process.nextTick(() => {
this.emit("error", err);
});
cb(err);
return false;
}
} else {
const result: any = this._write(data, encoding, cb);

View File

@@ -0,0 +1,27 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const child_process = require('child_process');
const fixtures = require('../common/fixtures');
const { getSystemErrorName } = require('util');
const testScript = fixtures.path('catch-stdout-error.js');
const child = child_process.exec(
...common.escapePOSIXShell`"${process.execPath}" "${testScript}" | "${process.execPath}" -pe "process.stdin.on('data' , () => process.exit(1))"`
);
let output = '';
child.stderr.on('data', function(c) {
output += c;
});
child.on('close', common.mustCall(function(code) {
output = JSON.parse(output);
assert.strictEqual(output.code, 'EPIPE');
assert.strictEqual(getSystemErrorName(output.errno), 'EPIPE');
assert.strictEqual(output.syscall, 'write');
console.log('ok');
}));