mirror of
https://github.com/oven-sh/bun
synced 2026-02-09 10:28:47 +00:00
[streams] speed up Readable in some cases (#1708)
If `encoding` is set, no `Buffer`s would be exposed thus `Uint8Array` can be used directly. - fix data corruption in `BufferList.concat()` - fix segfaults in `BufferList.join()`
This commit is contained in:
@@ -53,10 +53,17 @@ JSC::JSValue JSBufferList::concat(JSC::VM& vm, JSC::JSGlobalObject* lexicalGloba
|
||||
size_t i = 0;
|
||||
for (auto iter = m_deque.begin(); iter != m_deque.end(); ++iter) {
|
||||
auto array = JSC::jsDynamicCast<JSC::JSUint8Array*>(iter->get());
|
||||
if (!array)
|
||||
continue;
|
||||
size_t length = array->byteLength();
|
||||
uint8Array->setFromTypedArray(lexicalGlobalObject, i, array, 0, length, JSC::CopyType::Unobservable);
|
||||
if (UNLIKELY(!array)) {
|
||||
return throwTypeError(lexicalGlobalObject, throwScope, "concat can only be called when all buffers are Uint8Array"_s);
|
||||
}
|
||||
const size_t length = array->byteLength();
|
||||
if (UNLIKELY(i + length > n)) {
|
||||
return throwRangeError(lexicalGlobalObject, throwScope, "specified size too small to fit all buffers"_s);
|
||||
}
|
||||
if (UNLIKELY(!uint8Array->setFromTypedArray(lexicalGlobalObject, i, array, 0, length, JSC::CopyType::Unobservable))) {
|
||||
return throwOutOfMemoryError(lexicalGlobalObject, throwScope);
|
||||
}
|
||||
i += length;
|
||||
}
|
||||
|
||||
RELEASE_AND_RETURN(throwScope, uint8Array);
|
||||
@@ -68,16 +75,18 @@ JSC::JSValue JSBufferList::join(JSC::VM& vm, JSC::JSGlobalObject* lexicalGlobalO
|
||||
if (length() == 0) {
|
||||
RELEASE_AND_RETURN(throwScope, JSC::jsEmptyString(vm));
|
||||
}
|
||||
bool needSeq = false;
|
||||
const bool needSeq = seq->length() != 0;
|
||||
const auto end = m_deque.end();
|
||||
JSRopeString::RopeBuilder<RecordOverflow> ropeBuilder(vm);
|
||||
for (auto iter = m_deque.begin(); iter != m_deque.end(); ++iter) {
|
||||
auto str = JSC::jsCast<JSC::JSString*>(iter->get());
|
||||
for (auto iter = m_deque.begin(); ;) {
|
||||
auto str = iter->get().toString(lexicalGlobalObject);
|
||||
if (!ropeBuilder.append(str))
|
||||
return throwOutOfMemoryError(lexicalGlobalObject, throwScope);
|
||||
if (++iter == end)
|
||||
break;
|
||||
if (needSeq)
|
||||
if (!ropeBuilder.append(seq))
|
||||
return throwOutOfMemoryError(lexicalGlobalObject, throwScope);
|
||||
if (!ropeBuilder.append(str))
|
||||
return throwOutOfMemoryError(lexicalGlobalObject, throwScope);
|
||||
needSeq = seq->length() != 0;
|
||||
}
|
||||
RELEASE_AND_RETURN(throwScope, ropeBuilder.release());
|
||||
}
|
||||
@@ -142,11 +151,13 @@ JSC::JSValue JSBufferList::_getBuffer(JSC::VM& vm, JSC::JSGlobalObject* lexicalG
|
||||
for (auto iter = m_deque.begin(); iter != m_deque.end() && n > 0; ++iter) {
|
||||
JSC::JSUint8Array* array = JSC::jsDynamicCast<JSC::JSUint8Array*>(iter->get());
|
||||
if (UNLIKELY(!array)) {
|
||||
return throwOutOfMemoryError(lexicalGlobalObject, throwScope, "_getBuffer can only be called when all buffers are Uint8Array"_s);
|
||||
return throwTypeError(lexicalGlobalObject, throwScope, "_getBuffer can only be called when all buffers are Uint8Array"_s);
|
||||
}
|
||||
size_t length = array->byteLength();
|
||||
if (length > n) {
|
||||
uint8Array->setFromTypedArray(lexicalGlobalObject, offset, array, 0, n, JSC::CopyType::Unobservable);
|
||||
if (UNLIKELY(!uint8Array->setFromTypedArray(lexicalGlobalObject, offset, array, 0, n, JSC::CopyType::Unobservable))) {
|
||||
return throwOutOfMemoryError(lexicalGlobalObject, throwScope);
|
||||
}
|
||||
// create a new array of size length - n.
|
||||
// is there a faster way to do this?
|
||||
auto arrayBuffer = JSC::ArrayBuffer::tryCreateUninitialized(length - n, 1);
|
||||
@@ -160,7 +171,9 @@ JSC::JSValue JSBufferList::_getBuffer(JSC::VM& vm, JSC::JSGlobalObject* lexicalG
|
||||
memcpy(newArray->typedVector(), array->typedVector() + n, length - n);
|
||||
iter->set(vm, this, newArray);
|
||||
} else {
|
||||
uint8Array->setFromTypedArray(lexicalGlobalObject, offset, array, 0, length, JSC::CopyType::Unobservable);
|
||||
if (UNLIKELY(!uint8Array->setFromTypedArray(lexicalGlobalObject, offset, array, 0, length, JSC::CopyType::Unobservable))) {
|
||||
return throwOutOfMemoryError(lexicalGlobalObject, throwScope);
|
||||
}
|
||||
m_deque.removeFirst();
|
||||
}
|
||||
n -= static_cast<int32_t>(length);
|
||||
|
||||
@@ -2949,7 +2949,9 @@ var require_readable = __commonJS({
|
||||
} else if (chunk instanceof Buffer) {
|
||||
encoding = "";
|
||||
} else if (Stream._isUint8Array(chunk)) {
|
||||
chunk = Stream._uint8ArrayToBuffer(chunk);
|
||||
if (addToFront || !state.decoder) {
|
||||
chunk = Stream._uint8ArrayToBuffer(chunk);
|
||||
}
|
||||
encoding = "";
|
||||
} else if (chunk != null) {
|
||||
err = new ERR_INVALID_ARG_TYPE(
|
||||
|
||||
111
test/bun.js/node-stream-uint8array.test.ts
Normal file
111
test/bun.js/node-stream-uint8array.test.ts
Normal file
@@ -0,0 +1,111 @@
|
||||
import { beforeEach, describe, expect, it } from "bun:test";
|
||||
import { Readable, Writable } from "stream";
|
||||
|
||||
const ABC = new Uint8Array([0x41, 0x42, 0x43]);
|
||||
const DEF = new Uint8Array([0x44, 0x45, 0x46]);
|
||||
const GHI = new Uint8Array([0x47, 0x48, 0x49]);
|
||||
|
||||
describe("Writable", () => {
|
||||
let called;
|
||||
|
||||
function logCall(fn, id) {
|
||||
return function() {
|
||||
called[id] = (called[id] || 0) + 1;
|
||||
return fn.apply(this, arguments);
|
||||
};
|
||||
}
|
||||
|
||||
beforeEach(() => {
|
||||
called = [];
|
||||
});
|
||||
|
||||
it("should perform simple operations", () => {
|
||||
let n = 0;
|
||||
const writable = new Writable({
|
||||
write: logCall((chunk, encoding, cb) => {
|
||||
expect(chunk instanceof Buffer).toBe(true);
|
||||
if (n++ === 0) {
|
||||
expect(String(chunk)).toBe("ABC");
|
||||
} else {
|
||||
expect(String(chunk)).toBe("DEF");
|
||||
}
|
||||
|
||||
cb();
|
||||
}, 0),
|
||||
});
|
||||
|
||||
writable.write(ABC);
|
||||
writable.end(DEF);
|
||||
expect(called).toEqual([ 2 ]);
|
||||
});
|
||||
|
||||
it("should pass in Uint8Array in object mode", () => {
|
||||
const writable = new Writable({
|
||||
objectMode: true,
|
||||
write: logCall((chunk, encoding, cb) => {
|
||||
expect(chunk instanceof Buffer).toBe(false);
|
||||
expect(chunk instanceof Uint8Array).toBe(true);
|
||||
expect(chunk).toStrictEqual(ABC);
|
||||
expect(encoding).toBe("utf8");
|
||||
cb();
|
||||
}, 0),
|
||||
});
|
||||
|
||||
writable.end(ABC);
|
||||
expect(called).toEqual([ 1 ]);
|
||||
});
|
||||
|
||||
it("should handle multiple writes carried out via writev()", () => {
|
||||
let callback;
|
||||
|
||||
const writable = new Writable({
|
||||
write: logCall((chunk, encoding, cb) => {
|
||||
expect(chunk instanceof Buffer).toBe(true);
|
||||
expect(encoding).toBe("buffer");
|
||||
expect(String(chunk)).toBe("ABC");
|
||||
callback = cb;
|
||||
}, 0),
|
||||
writev: logCall((chunks, cb) => {
|
||||
expect(chunks.length).toBe(2);
|
||||
expect(chunks[0].encoding).toBe("buffer");
|
||||
expect(chunks[1].encoding).toBe("buffer");
|
||||
expect(chunks[0].chunk + chunks[1].chunk).toBe("DEFGHI");
|
||||
}, 1),
|
||||
});
|
||||
|
||||
writable.write(ABC);
|
||||
writable.write(DEF);
|
||||
writable.end(GHI);
|
||||
callback();
|
||||
expect(called).toEqual([ 1, 1 ]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("Readable", () => {
|
||||
it("should perform simple operations", () => {
|
||||
const readable = new Readable({
|
||||
read() {}
|
||||
});
|
||||
|
||||
readable.push(DEF);
|
||||
readable.unshift(ABC);
|
||||
|
||||
const buf = readable.read();
|
||||
expect(buf instanceof Buffer).toBe(true);
|
||||
expect([ ...buf ]).toEqual([ ...ABC, ...DEF ]);
|
||||
});
|
||||
|
||||
it("should work with setEncoding()", () => {
|
||||
const readable = new Readable({
|
||||
read() {}
|
||||
});
|
||||
|
||||
readable.setEncoding("utf8");
|
||||
|
||||
readable.push(DEF);
|
||||
readable.unshift(ABC);
|
||||
|
||||
const out = readable.read();
|
||||
expect(out).toBe("ABCDEF");
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user