Move streams to it's own file

This commit is contained in:
Jarred Sumner
2022-06-03 04:44:11 -07:00
parent 102553dca6
commit e5322eb63b
7 changed files with 1506 additions and 1157 deletions

View File

@@ -3,20 +3,16 @@ pub const Environment = @import("env.zig");
pub const use_mimalloc = !Environment.isTest;
/// For sizes less than 8 MB, allocate via mimalloc
pub const default_allocator: std.mem.Allocator = if (!use_mimalloc)
std.heap.c_allocator
else
@import("./memory_allocator.zig").c_allocator;
/// For sizes larger than 8 MB, allocate via mmap() instead of malloc().
pub const huge_allocator: std.mem.Allocator = if (!use_mimalloc)
std.heap.c_allocator
else
@import("./memory_allocator.zig").huge_allocator;
/// For sizes larger than 8 MB, allocate via mmap() instead of malloc().
/// For sizes less than 8 MB, allocate via mimalloc
pub const auto_allocator: std.mem.Allocator = if (!use_mimalloc)
std.heap.c_allocator
else

View File

@@ -1359,7 +1359,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
}
pub fn onRequestData(this: *RequestContext) void {
pub fn onPull(this: *RequestContext) void {
if (this.req.header("content-length")) |content_length| {
const len = std.fmt.parseInt(usize, content_length, 10) catch 0;
if (len == 0) {
@@ -1396,8 +1396,8 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
this.resp.onData(*RequestContext, onBodyChunk, this);
}
pub fn onRequestDataCallback(this: *anyopaque) void {
onRequestData(bun.cast(*RequestContext, this));
pub fn onPullCallback(this: *anyopaque) void {
onPull(bun.cast(*RequestContext, this));
}
};
}
@@ -1700,7 +1700,7 @@ pub fn NewServer(comptime ssl_enabled_: bool, comptime debug_mode_: bool) type {
.Locked = .{
.task = ctx,
.global = this.globalThis,
.onRequestData = RequestContext.onRequestDataCallback,
.onPull = RequestContext.onPullCallback,
},
},
};

View File

@@ -88,6 +88,165 @@ function initializeReadableStream(underlyingSource, strategy)
return this;
}
@globalPrivate
function readableStreamToArray(stream) {
"use strict";
if (@getByIdDirectPrivate(stream, "state") === @streamClosed) {
return null;
}
var reader = stream.getReader();
var manyResult = reader.readMany();
var processManyResult = (0, (async function(manyResult) {
if (result.done) {
return null;
}
var chunks = result.value;
while (true) {
var thisResult = await reader.read();
if (thisResult.done) {
return chunks;
}
chunks.push(thisResult.value);
}
return chunks;
}));
if (manyResult && @isPromise(manyResult)) {
return manyResult.then(processManyResult);
}
if (manyResult && manyResult.done) {
return null;
}
return processManyResult(manyResult);
}
@globalPrivate
function consumeReadableStream(nativePtr, nativeType, inputStream) {
"use strict";
const symbol = Symbol.for("Bun.consumeReadableStreamPrototype");
var cached = globalThis[symbol];
if (!cached) {
cached = globalThis[symbol] = [];
}
var Prototype = cached[nativeType];
if (Prototype === @undefined) {
var [doRead, doError, doReadMany, doClose, onClose, deinit] = globalThis[Symbol.for("Bun.lazy")](nativeType);
Prototype = class NativeReadableStreamSink {
constructor(reader, ptr) {
this.#ptr = ptr;
this.#reader = reader;
this.#didClose = false;
this.handleError = this._handleError.bind(this);
this.handleClosed = this._handleClosed.bind(this);
this.processResult = this._processResult.bind(this);
reader.closed.then(this.handleClosed, this.handleError);
}
handleError;
handleClosed;
_handleClosed() {
if (this.#didClose) return;
this.#didClose = true;
var ptr = this.#ptr;
this.#ptr = 0;
doClose(ptr);
deinit(ptr);
}
_handleError(error) {
if (this.#didClose) return;
this.#didClose = true;
var ptr = this.#ptr;
this.#ptr = 0;
doError(ptr, error);
deinit(ptr);
}
#ptr;
#didClose = false;
#reader;
_handleReadMany({value, done, size}) {
if (done) {
this.handleClosed();
return;
}
if (this.#didClose) return;
doReadMany(this.#ptr, value, done, size);
}
read() {
if (!this.#ptr) return @throwTypeError("ReadableStreamSink is already closed");
return this.processResult(this.#reader.read());
}
_processResult(result) {
if (result && @isPromise(result)) {
const flags = @getPromiseInternalField(result, @promiseFieldFlags);
if (flags & @promiseStateFulfilled) {
const fulfilledValue = @getPromiseInternalField(result, @promiseFieldReactionsOrResult);
if (fulfilledValue) {
result = fulfilledValue;
}
}
}
if (result && @isPromise(result)) {
result.then(this.processResult, this.handleError);
return null;
}
if (result.done) {
this.handleClosed();
return 0;
} else if (result.value) {
return result.value;
} else {
return -1;
}
}
readMany() {
if (!this.#ptr) return @throwTypeError("ReadableStreamSink is already closed");
return this.processResult(this.#reader.readMany());
}
};
const minlength = nativeType + 1;
if (cached.length < minlength) {
cached.length = minlength;
}
@putByValDirect(cached, nativeType, Prototype);
}
if (@isReadableStreamLocked(inputStream)) {
@throwTypeError("Cannot start reading from a locked stream");
}
return new Prototype(inputStream.getReader(), nativePtr);
}
@globalPrivate
function createEmptyReadableStream() {
var stream = new @ReadableStream({

View File

@@ -732,7 +732,7 @@ function readableStreamDefaultControllerEnqueue(controller, chunk)
// this is checked by callers
@assert(@readableStreamDefaultControllerCanCloseOrEnqueue(controller));
if (@isReadableStreamLocked(stream) && @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests").isNotEmpty) {
if (@isReadableStreamLocked(stream) && @getByIdDirectPrivate(@getByIdDirectPrivate(stream, "reader"), "readRequests")?.isNotEmpty()) {
@readableStreamFulfillReadRequest(stream, chunk, false);
@readableStreamDefaultControllerCallPullIfNeeded(controller);
return;

View File

@@ -1,5 +1,7 @@
pub usingnamespace @import("./webcore/response.zig");
pub usingnamespace @import("./webcore/encoding.zig");
pub usingnamespace @import("./webcore/streams.zig");
const JSC = @import("../../jsc.zig");
const std = @import("std");

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff