fix server end of stream, fix fetch not streaming without content-length or chunked encoding, fix case when stream do not return a promise on pull (#6086)

This commit is contained in:
Ciro Spaciari
2023-09-26 23:31:20 -03:00
committed by GitHub
parent dc55492698
commit 648d5aecf3
6 changed files with 131 additions and 32 deletions

View File

@@ -439,6 +439,24 @@ public:
return {internalEnd(data, totalSize, true, true, closeConnection), hasResponded()};
}
/* Write the end of chunked encoded stream */
bool sendTerminatingChunk(bool closeConnection = false) {
writeStatus(HTTP_200_OK);
HttpResponseData<SSL> *httpResponseData = getHttpResponseData();
if (!(httpResponseData->state & HttpResponseData<SSL>::HTTP_WRITE_CALLED)) {
/* Write mark on first call to write */
writeMark();
writeHeader("Transfer-Encoding", "chunked");
httpResponseData->state |= HttpResponseData<SSL>::HTTP_WRITE_CALLED;
}
/* Terminating 0 chunk */
Super::write("\r\n0\r\n\r\n", 7);
return internalEnd({nullptr, 0}, 0, false, false, closeConnection);
}
/* Write parts of the response in chunking fashion. Starts timeout if failed. */
bool write(std::string_view data) {
writeStatus(HTTP_200_OK);

View File

@@ -1443,6 +1443,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
pub fn endStream(this: *RequestContext, closeConnection: bool) void {
ctxLog("endStream", .{});
if (this.resp) |resp| {
if (this.flags.is_waiting_for_request_body) {
this.flags.is_waiting_for_request_body = false;
@@ -1729,6 +1730,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
this: *RequestContext,
headers: *JSC.FetchHeaders,
) void {
ctxLog("writeHeaders", .{});
headers.fastRemove(.ContentLength);
headers.fastRemove(.TransferEncoding);
if (!ssl_enabled) headers.fastRemove(.StrictTransportSecurity);
@@ -2100,6 +2102,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
fn doRenderStream(pair: *StreamPair) void {
ctxLog("doRenderStream", .{});
var this = pair.this;
var stream = pair.stream;
if (this.resp == null or this.flags.aborted) {
@@ -2223,6 +2226,14 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
},
}
return;
} else {
// if is not a promise we treat it as Error
streamLog("returned an error", .{});
if (!this.flags.aborted) resp.clearAborted();
response_stream.detach();
this.sink = null;
response_stream.sink.destroy();
return this.handleReject(assignment_result);
}
}
@@ -2232,6 +2243,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
defer stream.value.unprotect();
response_stream.sink.markDone();
this.finalizeForAbort();
response_stream.sink.onFirstWrite = null;
response_stream.sink.finalize();
return;
@@ -2255,7 +2267,12 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
this.setAbortHandler();
streamLog("is in progress, but did not return a Promise. Finalizing request context", .{});
this.finalize();
response_stream.sink.onFirstWrite = null;
response_stream.sink.ctx = null;
response_stream.detach();
stream.cancel(globalThis);
response_stream.sink.markDone();
this.renderMissing();
}
const streamLog = Output.scoped(.ReadableStream, false);
@@ -2455,7 +2472,6 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}
streamLog("onResolve({any})", .{wrote_anything});
//aborted so call finalizeForAbort
if (req.flags.aborted or req.resp == null) {
req.finalizeForAbort();

View File

@@ -2634,6 +2634,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
}
fn flushFromJSNoWait(this: *@This()) JSC.Node.Maybe(JSValue) {
log("flushFromJSNoWait", .{});
if (this.hasBackpressure() or this.done) {
return .{ .result = JSValue.jsNumberFromInt32(0) };
}

View File

@@ -1040,14 +1040,14 @@ extern "C"
uWS::HttpResponse<true> *uwsRes = (uWS::HttpResponse<true> *)res;
uwsRes->getHttpResponseData()->onWritable = nullptr;
uwsRes->onAborted(nullptr);
uwsRes->endWithoutBody(std::nullopt, close_connection);
uwsRes->sendTerminatingChunk(close_connection);
}
else
{
uWS::HttpResponse<false> *uwsRes = (uWS::HttpResponse<false> *)res;
uwsRes->getHttpResponseData()->onWritable = nullptr;
uwsRes->onAborted(nullptr);
uwsRes->endWithoutBody(std::nullopt, close_connection);
uwsRes->sendTerminatingChunk(close_connection);
}
}

View File

@@ -1071,17 +1071,24 @@ pub fn onClose(
const in_progress = client.state.stage != .done and client.state.stage != .fail;
// if the peer closed after a full chunk, treat this
// as if the transfer had complete, browsers appear to ignore
// a missing 0\r\n chunk
if (in_progress and client.state.isChunkedEncoding()) {
if (picohttp.phr_decode_chunked_is_in_data(&client.state.chunked_decoder) == 0) {
var buf = client.state.getBodyBuffer();
if (buf.list.items.len > 0) {
client.state.received_last_chunk = true;
client.progressUpdate(comptime is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket);
return;
if (in_progress) {
// if the peer closed after a full chunk, treat this
// as if the transfer had complete, browsers appear to ignore
// a missing 0\r\n chunk
if (client.state.isChunkedEncoding()) {
if (picohttp.phr_decode_chunked_is_in_data(&client.state.chunked_decoder) == 0) {
var buf = client.state.getBodyBuffer();
if (buf.list.items.len > 0) {
client.state.received_last_chunk = true;
client.progressUpdate(comptime is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket);
return;
}
}
} else if (client.state.content_length == null and client.state.response_stage == .body) {
// no content length informed so we are done here
client.state.received_last_chunk = true;
client.progressUpdate(comptime is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket);
return;
}
}
@@ -1121,12 +1128,31 @@ pub fn onConnectError(
pub fn onEnd(
client: *HTTPClient,
comptime is_ssl: bool,
_: NewHTTPContext(is_ssl).HTTPSocket,
socket: NewHTTPContext(is_ssl).HTTPSocket,
) void {
log("onEnd {s}\n", .{client.url.href});
if (client.state.stage != .done and client.state.stage != .fail)
client.fail(error.ConnectionClosed);
const in_progress = client.state.stage != .done and client.state.stage != .fail;
if (in_progress) {
// if the peer closed after a full chunk, treat this
// as if the transfer had complete, browsers appear to ignore
// a missing 0\r\n chunk
if (client.state.isChunkedEncoding()) {
if (picohttp.phr_decode_chunked_is_in_data(&client.state.chunked_decoder) == 0) {
var buf = client.state.getBodyBuffer();
if (buf.list.items.len > 0) {
client.state.received_last_chunk = true;
client.progressUpdate(comptime is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket);
return;
}
}
} else if (client.state.content_length == null and client.state.response_stage == .body) {
// no content length informed so we are done here
client.state.received_last_chunk = true;
client.progressUpdate(comptime is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket);
return;
}
}
client.fail(error.ConnectionClosed);
}
pub inline fn getAllocator() std.mem.Allocator {
@@ -1369,8 +1395,8 @@ pub const InternalState = struct {
return this.total_body_received >= content_length;
}
// TODO: in future to handle Content-Type: text/event-stream we should be done only when Close/End/Timeout connection
return true;
// Content-Type: text/event-stream we should be done only when Close/End/Timeout connection
return this.received_last_chunk;
}
fn decompressConst(this: *InternalState, buffer: []const u8, body_out_str: *MutableString) !void {
@@ -2695,6 +2721,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
}
if (!can_continue) {
log("onData: can_continue is false", .{});
// this means that the request ended
// clone metadata and return the progress at this point
this.cloneMetadata();
@@ -3089,10 +3116,10 @@ const preallocate_max = 1024 * 1024 * 256;
pub fn handleResponseBody(this: *HTTPClient, incoming_data: []const u8, is_only_buffer: bool) !bool {
std.debug.assert(this.state.transfer_encoding == .identity);
const content_length = this.state.content_length orelse 0;
const content_length = this.state.content_length;
// is it exactly as much as we need?
if (is_only_buffer and incoming_data.len >= content_length) {
try handleResponseBodyFromSinglePacket(this, incoming_data[0..content_length]);
if (is_only_buffer and content_length != null and incoming_data.len >= content_length.?) {
try handleResponseBodyFromSinglePacket(this, incoming_data[0..content_length.?]);
return true;
} else {
return handleResponseBodyFromMultiplePackets(this, incoming_data);
@@ -3135,16 +3162,19 @@ fn handleResponseBodyFromSinglePacket(this: *HTTPClient, incoming_data: []const
fn handleResponseBodyFromMultiplePackets(this: *HTTPClient, incoming_data: []const u8) !bool {
var buffer = this.state.getBodyBuffer();
const content_length = this.state.content_length orelse 0;
const content_length = this.state.content_length;
if (buffer.list.items.len == 0 and
content_length > 0 and incoming_data.len < preallocate_max)
{
if (buffer.list.items.len == 0 and incoming_data.len < preallocate_max) {
buffer.list.ensureTotalCapacityPrecise(buffer.allocator, incoming_data.len) catch {};
}
const remaining_content_length = content_length -| this.state.total_body_received;
var remainder = incoming_data[0..@min(incoming_data.len, remaining_content_length)];
var remainder: []const u8 = undefined;
if (content_length != null) {
const remaining_content_length = content_length.? -| this.state.total_body_received;
remainder = incoming_data[0..@min(incoming_data.len, remaining_content_length)];
} else {
remainder = incoming_data;
}
_ = try buffer.write(remainder);
@@ -3157,7 +3187,7 @@ fn handleResponseBodyFromMultiplePackets(this: *HTTPClient, incoming_data: []con
}
// done or streaming
const is_done = this.state.total_body_received >= content_length;
const is_done = content_length != null and this.state.total_body_received >= content_length.?;
if (is_done or this.signals.get(.body_streaming)) {
const processed = try this.state.processBodyBuffer(buffer.*);
@@ -3545,7 +3575,12 @@ pub fn handleResponseMetadata(
}
this.state.response_stage = if (this.state.transfer_encoding == .chunked) .body_chunk else .body;
const content_length = this.state.content_length orelse 0;
const content_length = this.state.content_length;
if (content_length) |length| {
log("handleResponseMetadata: content_length is {} and transfer_encoding {}", .{ length, this.state.transfer_encoding });
} else {
log("handleResponseMetadata: content_length is null and transfer_encoding {}", .{this.state.transfer_encoding});
}
// if no body is expected we should stop processing
return this.method.hasBody() and (content_length > 0 or this.state.transfer_encoding == .chunked);
return this.method.hasBody() and (content_length == null or content_length.? > 0 or this.state.transfer_encoding == .chunked);
}

View File

@@ -13,6 +13,35 @@ var port = 0;
];
const useRequestObjectValues = [true, false];
test("Should not crash when not returning a promise when stream is in progress", async () => {
var called = false;
await runInServer(
{
async fetch() {
var stream = new ReadableStream({
type: "direct",
pull(controller) {
controller.write("hey");
setTimeout(() => {
controller.end();
}, 100);
},
});
return new Response(stream);
},
},
async url => {
called = true;
expect(await fetch(url).then(res => res.text())).toContain(
"Welcome to Bun! To get started, return a Response object.",
);
},
);
expect(called).toBe(true);
});
for (let RequestPrototypeMixin of BodyMixin) {
for (let useRequestObject of useRequestObjectValues) {
describe(`Request.prototoype.${RequestPrototypeMixin.name}() ${