fix stdin stream unref and resuming (#4250)

* fix stream unref and resuming stream

* fix `child-process-stdio` test
This commit is contained in:
Dylan Conway
2023-08-21 23:39:41 -07:00
committed by GitHub
parent 3a45f2c71b
commit 44e4d5852a
4 changed files with 26 additions and 19 deletions

View File

@@ -87,11 +87,8 @@ export function getStdioWriteStream(fd) {
}
export function getStdinStream(fd) {
var { destroy } = require("node:stream");
var reader: ReadableStreamDefaultReader | undefined;
var readerRef;
var unrefOnRead = false;
function ref() {
reader ??= Bun.stdin.stream().getReader();
// TODO: remove this. likely we are dereferencing the stream
@@ -104,6 +101,10 @@ export function getStdinStream(fd) {
clearInterval(readerRef);
readerRef = undefined;
}
if (reader) {
reader.cancel();
reader = undefined;
}
}
const tty = require("node:tty");
@@ -123,7 +124,6 @@ export function getStdinStream(fd) {
// and does not apply to the underlying Stream implementation.
if (event === "readable") {
ref();
unrefOnRead = true;
}
return originalOn.call(this, event, listener);
};
@@ -163,7 +163,7 @@ export function getStdinStream(fd) {
stream.push(value[i]);
}
} else {
stream.push(null);
stream.emit("end");
stream.pause();
}
} catch (err) {
@@ -172,22 +172,28 @@ export function getStdinStream(fd) {
}
stream._read = function (size) {
if (unrefOnRead) {
unref();
unrefOnRead = false;
}
internalRead(this);
};
stream.on("resume", () => {
ref();
stream._undestroy();
});
stream._readableState.reading = false;
stream.on("pause", () => {
process.nextTick(() => {
destroy(stream);
if (!stream.readableFlowing) {
stream._readableState.reading = false;
}
});
});
stream.on("close", () => {
process.nextTick(() => {
reader?.cancel();
stream.destroy();
unref();
});
});

View File

@@ -1608,11 +1608,12 @@ var _Interface = class Interface extends InterfaceConstructor {
}
[kSetRawMode](flag) {
const mode = flag ? 1 : 0;
const mode = flag + 0;
const wasInRawMode = this.input.isRaw;
if (typeof this.input.setRawMode === "function") {
this.input.setRawMode(mode);
var setRawMode = this.input.setRawMode;
if (typeof setRawMode === "function") {
setRawMode.call(this.input, mode);
}
return wasInRawMode;

File diff suppressed because one or more lines are too long

View File

@@ -654,9 +654,9 @@ const char* const s_processObjectInternalsGetStdioWriteStreamCode = "(function (
const JSC::ConstructAbility s_processObjectInternalsGetStdinStreamCodeConstructAbility = JSC::ConstructAbility::CannotConstruct;
const JSC::ConstructorKind s_processObjectInternalsGetStdinStreamCodeConstructorKind = JSC::ConstructorKind::None;
const JSC::ImplementationVisibility s_processObjectInternalsGetStdinStreamCodeImplementationVisibility = JSC::ImplementationVisibility::Public;
const int s_processObjectInternalsGetStdinStreamCodeLength = 1358;
const int s_processObjectInternalsGetStdinStreamCodeLength = 1386;
static const JSC::Intrinsic s_processObjectInternalsGetStdinStreamCodeIntrinsic = JSC::NoIntrinsic;
const char* const s_processObjectInternalsGetStdinStreamCode = "(function (fd){\"use strict\";var{destroy}=@getInternalField(@internalModuleRegistry,35)||@createInternalModuleById(35),reader,readerRef,unrefOnRead=!1;function ref(){reader\?\?=@Bun.stdin.stream().getReader(),readerRef\?\?=setInterval(()=>{},1<<30)}function unref(){if(readerRef)clearInterval(readerRef),readerRef=@undefined}const stream=new((@getInternalField(@internalModuleRegistry,42))||(@createInternalModuleById(42))).ReadStream(fd),originalOn=stream.on;stream.on=function(event,listener){if(event===\"readable\")ref(),unrefOnRead=!0;return originalOn.call(this,event,listener)},stream.fd=fd;const originalPause=stream.pause;stream.pause=function(){return unref(),originalPause.call(this)};const originalResume=stream.resume;stream.resume=function(){return ref(),originalResume.call(this)};async function internalRead(stream2){try{var done,value;const read=reader\?.readMany();if(@isPromise(read))({done,value}=await read);else({done,value}=read);if(!done){stream2.push(value[0]);const length=value.length;for(let i=1;i<length;i++)stream2.push(value[i])}else stream2.push(null),stream2.pause()}catch(err){stream2.destroy(err)}}return stream._read=function(size){if(unrefOnRead)unref(),unrefOnRead=!1;internalRead(this)},stream.on(\"pause\",()=>{process.nextTick(()=>{destroy(stream)})}),stream.on(\"close\",()=>{process.nextTick(()=>{reader\?.cancel()})}),stream})\n";
const char* const s_processObjectInternalsGetStdinStreamCode = "(function (fd){\"use strict\";var reader,readerRef;function ref(){reader\?\?=@Bun.stdin.stream().getReader(),readerRef\?\?=setInterval(()=>{},1<<30)}function unref(){if(readerRef)clearInterval(readerRef),readerRef=@undefined;if(reader)reader.cancel(),reader=@undefined}const stream=new((@getInternalField(@internalModuleRegistry,42))||(@createInternalModuleById(42))).ReadStream(fd),originalOn=stream.on;stream.on=function(event,listener){if(event===\"readable\")ref();return originalOn.call(this,event,listener)},stream.fd=fd;const originalPause=stream.pause;stream.pause=function(){return unref(),originalPause.call(this)};const originalResume=stream.resume;stream.resume=function(){return ref(),originalResume.call(this)};async function internalRead(stream2){try{var done,value;const read=reader\?.readMany();if(@isPromise(read))({done,value}=await read);else({done,value}=read);if(!done){stream2.push(value[0]);const length=value.length;for(let i=1;i<length;i++)stream2.push(value[i])}else stream2.emit(\"end\"),stream2.pause()}catch(err){stream2.destroy(err)}}return stream._read=function(size){internalRead(this)},stream.on(\"resume\",()=>{ref(),stream._undestroy()}),stream._readableState.reading=!1,stream.on(\"pause\",()=>{process.nextTick(()=>{if(!stream.readableFlowing)stream._readableState.reading=!1})}),stream.on(\"close\",()=>{process.nextTick(()=>{stream.destroy(),unref()})}),stream})\n";
#define DEFINE_BUILTIN_GENERATOR(codeName, functionName, overriddenName, argumentCount) \
JSC::FunctionExecutable* codeName##Generator(JSC::VM& vm) \