fix server end of stream, fix fetch not streaming without content-length or chunked encoding, fix case when stream do not return a promise on pull (#6086)

This commit is contained in:
Ciro Spaciari
2023-09-26 23:31:20 -03:00
committed by GitHub
parent dc55492698
commit 648d5aecf3
6 changed files with 131 additions and 32 deletions

View File

@@ -439,6 +439,24 @@ public:
return {internalEnd(data, totalSize, true, true, closeConnection), hasResponded()}; return {internalEnd(data, totalSize, true, true, closeConnection), hasResponded()};
} }
/* Write the end of chunked encoded stream */
bool sendTerminatingChunk(bool closeConnection = false) {
writeStatus(HTTP_200_OK);
HttpResponseData<SSL> *httpResponseData = getHttpResponseData();
if (!(httpResponseData->state & HttpResponseData<SSL>::HTTP_WRITE_CALLED)) {
/* Write mark on first call to write */
writeMark();
writeHeader("Transfer-Encoding", "chunked");
httpResponseData->state |= HttpResponseData<SSL>::HTTP_WRITE_CALLED;
}
/* Terminating 0 chunk */
Super::write("\r\n0\r\n\r\n", 7);
return internalEnd({nullptr, 0}, 0, false, false, closeConnection);
}
/* Write parts of the response in chunking fashion. Starts timeout if failed. */ /* Write parts of the response in chunking fashion. Starts timeout if failed. */
bool write(std::string_view data) { bool write(std::string_view data) {
writeStatus(HTTP_200_OK); writeStatus(HTTP_200_OK);

View File

@@ -1443,6 +1443,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
} }
pub fn endStream(this: *RequestContext, closeConnection: bool) void { pub fn endStream(this: *RequestContext, closeConnection: bool) void {
ctxLog("endStream", .{});
if (this.resp) |resp| { if (this.resp) |resp| {
if (this.flags.is_waiting_for_request_body) { if (this.flags.is_waiting_for_request_body) {
this.flags.is_waiting_for_request_body = false; this.flags.is_waiting_for_request_body = false;
@@ -1729,6 +1730,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
this: *RequestContext, this: *RequestContext,
headers: *JSC.FetchHeaders, headers: *JSC.FetchHeaders,
) void { ) void {
ctxLog("writeHeaders", .{});
headers.fastRemove(.ContentLength); headers.fastRemove(.ContentLength);
headers.fastRemove(.TransferEncoding); headers.fastRemove(.TransferEncoding);
if (!ssl_enabled) headers.fastRemove(.StrictTransportSecurity); if (!ssl_enabled) headers.fastRemove(.StrictTransportSecurity);
@@ -2100,6 +2102,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
} }
fn doRenderStream(pair: *StreamPair) void { fn doRenderStream(pair: *StreamPair) void {
ctxLog("doRenderStream", .{});
var this = pair.this; var this = pair.this;
var stream = pair.stream; var stream = pair.stream;
if (this.resp == null or this.flags.aborted) { if (this.resp == null or this.flags.aborted) {
@@ -2223,6 +2226,14 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
}, },
} }
return; return;
} else {
// if is not a promise we treat it as Error
streamLog("returned an error", .{});
if (!this.flags.aborted) resp.clearAborted();
response_stream.detach();
this.sink = null;
response_stream.sink.destroy();
return this.handleReject(assignment_result);
} }
} }
@@ -2232,6 +2243,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
defer stream.value.unprotect(); defer stream.value.unprotect();
response_stream.sink.markDone(); response_stream.sink.markDone();
this.finalizeForAbort(); this.finalizeForAbort();
response_stream.sink.onFirstWrite = null;
response_stream.sink.finalize(); response_stream.sink.finalize();
return; return;
@@ -2255,7 +2267,12 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
this.setAbortHandler(); this.setAbortHandler();
streamLog("is in progress, but did not return a Promise. Finalizing request context", .{}); streamLog("is in progress, but did not return a Promise. Finalizing request context", .{});
this.finalize(); response_stream.sink.onFirstWrite = null;
response_stream.sink.ctx = null;
response_stream.detach();
stream.cancel(globalThis);
response_stream.sink.markDone();
this.renderMissing();
} }
const streamLog = Output.scoped(.ReadableStream, false); const streamLog = Output.scoped(.ReadableStream, false);
@@ -2455,7 +2472,6 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
} }
streamLog("onResolve({any})", .{wrote_anything}); streamLog("onResolve({any})", .{wrote_anything});
//aborted so call finalizeForAbort //aborted so call finalizeForAbort
if (req.flags.aborted or req.resp == null) { if (req.flags.aborted or req.resp == null) {
req.finalizeForAbort(); req.finalizeForAbort();

View File

@@ -2634,6 +2634,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
} }
fn flushFromJSNoWait(this: *@This()) JSC.Node.Maybe(JSValue) { fn flushFromJSNoWait(this: *@This()) JSC.Node.Maybe(JSValue) {
log("flushFromJSNoWait", .{});
if (this.hasBackpressure() or this.done) { if (this.hasBackpressure() or this.done) {
return .{ .result = JSValue.jsNumberFromInt32(0) }; return .{ .result = JSValue.jsNumberFromInt32(0) };
} }

View File

@@ -1040,14 +1040,14 @@ extern "C"
uWS::HttpResponse<true> *uwsRes = (uWS::HttpResponse<true> *)res; uWS::HttpResponse<true> *uwsRes = (uWS::HttpResponse<true> *)res;
uwsRes->getHttpResponseData()->onWritable = nullptr; uwsRes->getHttpResponseData()->onWritable = nullptr;
uwsRes->onAborted(nullptr); uwsRes->onAborted(nullptr);
uwsRes->endWithoutBody(std::nullopt, close_connection); uwsRes->sendTerminatingChunk(close_connection);
} }
else else
{ {
uWS::HttpResponse<false> *uwsRes = (uWS::HttpResponse<false> *)res; uWS::HttpResponse<false> *uwsRes = (uWS::HttpResponse<false> *)res;
uwsRes->getHttpResponseData()->onWritable = nullptr; uwsRes->getHttpResponseData()->onWritable = nullptr;
uwsRes->onAborted(nullptr); uwsRes->onAborted(nullptr);
uwsRes->endWithoutBody(std::nullopt, close_connection); uwsRes->sendTerminatingChunk(close_connection);
} }
} }

View File

@@ -1071,17 +1071,24 @@ pub fn onClose(
const in_progress = client.state.stage != .done and client.state.stage != .fail; const in_progress = client.state.stage != .done and client.state.stage != .fail;
// if the peer closed after a full chunk, treat this if (in_progress) {
// as if the transfer had complete, browsers appear to ignore // if the peer closed after a full chunk, treat this
// a missing 0\r\n chunk // as if the transfer had complete, browsers appear to ignore
if (in_progress and client.state.isChunkedEncoding()) { // a missing 0\r\n chunk
if (picohttp.phr_decode_chunked_is_in_data(&client.state.chunked_decoder) == 0) { if (client.state.isChunkedEncoding()) {
var buf = client.state.getBodyBuffer(); if (picohttp.phr_decode_chunked_is_in_data(&client.state.chunked_decoder) == 0) {
if (buf.list.items.len > 0) { var buf = client.state.getBodyBuffer();
client.state.received_last_chunk = true; if (buf.list.items.len > 0) {
client.progressUpdate(comptime is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket); client.state.received_last_chunk = true;
return; client.progressUpdate(comptime is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket);
return;
}
} }
} else if (client.state.content_length == null and client.state.response_stage == .body) {
// no content length informed so we are done here
client.state.received_last_chunk = true;
client.progressUpdate(comptime is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket);
return;
} }
} }
@@ -1121,12 +1128,31 @@ pub fn onConnectError(
pub fn onEnd( pub fn onEnd(
client: *HTTPClient, client: *HTTPClient,
comptime is_ssl: bool, comptime is_ssl: bool,
_: NewHTTPContext(is_ssl).HTTPSocket, socket: NewHTTPContext(is_ssl).HTTPSocket,
) void { ) void {
log("onEnd {s}\n", .{client.url.href}); log("onEnd {s}\n", .{client.url.href});
const in_progress = client.state.stage != .done and client.state.stage != .fail;
if (client.state.stage != .done and client.state.stage != .fail) if (in_progress) {
client.fail(error.ConnectionClosed); // if the peer closed after a full chunk, treat this
// as if the transfer had complete, browsers appear to ignore
// a missing 0\r\n chunk
if (client.state.isChunkedEncoding()) {
if (picohttp.phr_decode_chunked_is_in_data(&client.state.chunked_decoder) == 0) {
var buf = client.state.getBodyBuffer();
if (buf.list.items.len > 0) {
client.state.received_last_chunk = true;
client.progressUpdate(comptime is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket);
return;
}
}
} else if (client.state.content_length == null and client.state.response_stage == .body) {
// no content length informed so we are done here
client.state.received_last_chunk = true;
client.progressUpdate(comptime is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket);
return;
}
}
client.fail(error.ConnectionClosed);
} }
pub inline fn getAllocator() std.mem.Allocator { pub inline fn getAllocator() std.mem.Allocator {
@@ -1369,8 +1395,8 @@ pub const InternalState = struct {
return this.total_body_received >= content_length; return this.total_body_received >= content_length;
} }
// TODO: in future to handle Content-Type: text/event-stream we should be done only when Close/End/Timeout connection // Content-Type: text/event-stream we should be done only when Close/End/Timeout connection
return true; return this.received_last_chunk;
} }
fn decompressConst(this: *InternalState, buffer: []const u8, body_out_str: *MutableString) !void { fn decompressConst(this: *InternalState, buffer: []const u8, body_out_str: *MutableString) !void {
@@ -2695,6 +2721,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
} }
if (!can_continue) { if (!can_continue) {
log("onData: can_continue is false", .{});
// this means that the request ended // this means that the request ended
// clone metadata and return the progress at this point // clone metadata and return the progress at this point
this.cloneMetadata(); this.cloneMetadata();
@@ -3089,10 +3116,10 @@ const preallocate_max = 1024 * 1024 * 256;
pub fn handleResponseBody(this: *HTTPClient, incoming_data: []const u8, is_only_buffer: bool) !bool { pub fn handleResponseBody(this: *HTTPClient, incoming_data: []const u8, is_only_buffer: bool) !bool {
std.debug.assert(this.state.transfer_encoding == .identity); std.debug.assert(this.state.transfer_encoding == .identity);
const content_length = this.state.content_length orelse 0; const content_length = this.state.content_length;
// is it exactly as much as we need? // is it exactly as much as we need?
if (is_only_buffer and incoming_data.len >= content_length) { if (is_only_buffer and content_length != null and incoming_data.len >= content_length.?) {
try handleResponseBodyFromSinglePacket(this, incoming_data[0..content_length]); try handleResponseBodyFromSinglePacket(this, incoming_data[0..content_length.?]);
return true; return true;
} else { } else {
return handleResponseBodyFromMultiplePackets(this, incoming_data); return handleResponseBodyFromMultiplePackets(this, incoming_data);
@@ -3135,16 +3162,19 @@ fn handleResponseBodyFromSinglePacket(this: *HTTPClient, incoming_data: []const
fn handleResponseBodyFromMultiplePackets(this: *HTTPClient, incoming_data: []const u8) !bool { fn handleResponseBodyFromMultiplePackets(this: *HTTPClient, incoming_data: []const u8) !bool {
var buffer = this.state.getBodyBuffer(); var buffer = this.state.getBodyBuffer();
const content_length = this.state.content_length orelse 0; const content_length = this.state.content_length;
if (buffer.list.items.len == 0 and if (buffer.list.items.len == 0 and incoming_data.len < preallocate_max) {
content_length > 0 and incoming_data.len < preallocate_max)
{
buffer.list.ensureTotalCapacityPrecise(buffer.allocator, incoming_data.len) catch {}; buffer.list.ensureTotalCapacityPrecise(buffer.allocator, incoming_data.len) catch {};
} }
const remaining_content_length = content_length -| this.state.total_body_received; var remainder: []const u8 = undefined;
var remainder = incoming_data[0..@min(incoming_data.len, remaining_content_length)]; if (content_length != null) {
const remaining_content_length = content_length.? -| this.state.total_body_received;
remainder = incoming_data[0..@min(incoming_data.len, remaining_content_length)];
} else {
remainder = incoming_data;
}
_ = try buffer.write(remainder); _ = try buffer.write(remainder);
@@ -3157,7 +3187,7 @@ fn handleResponseBodyFromMultiplePackets(this: *HTTPClient, incoming_data: []con
} }
// done or streaming // done or streaming
const is_done = this.state.total_body_received >= content_length; const is_done = content_length != null and this.state.total_body_received >= content_length.?;
if (is_done or this.signals.get(.body_streaming)) { if (is_done or this.signals.get(.body_streaming)) {
const processed = try this.state.processBodyBuffer(buffer.*); const processed = try this.state.processBodyBuffer(buffer.*);
@@ -3545,7 +3575,12 @@ pub fn handleResponseMetadata(
} }
this.state.response_stage = if (this.state.transfer_encoding == .chunked) .body_chunk else .body; this.state.response_stage = if (this.state.transfer_encoding == .chunked) .body_chunk else .body;
const content_length = this.state.content_length orelse 0; const content_length = this.state.content_length;
if (content_length) |length| {
log("handleResponseMetadata: content_length is {} and transfer_encoding {}", .{ length, this.state.transfer_encoding });
} else {
log("handleResponseMetadata: content_length is null and transfer_encoding {}", .{this.state.transfer_encoding});
}
// if no body is expected we should stop processing // if no body is expected we should stop processing
return this.method.hasBody() and (content_length > 0 or this.state.transfer_encoding == .chunked); return this.method.hasBody() and (content_length == null or content_length.? > 0 or this.state.transfer_encoding == .chunked);
} }

View File

@@ -13,6 +13,35 @@ var port = 0;
]; ];
const useRequestObjectValues = [true, false]; const useRequestObjectValues = [true, false];
test("Should not crash when not returning a promise when stream is in progress", async () => {
var called = false;
await runInServer(
{
async fetch() {
var stream = new ReadableStream({
type: "direct",
pull(controller) {
controller.write("hey");
setTimeout(() => {
controller.end();
}, 100);
},
});
return new Response(stream);
},
},
async url => {
called = true;
expect(await fetch(url).then(res => res.text())).toContain(
"Welcome to Bun! To get started, return a Response object.",
);
},
);
expect(called).toBe(true);
});
for (let RequestPrototypeMixin of BodyMixin) { for (let RequestPrototypeMixin of BodyMixin) {
for (let useRequestObject of useRequestObjectValues) { for (let useRequestObject of useRequestObjectValues) {
describe(`Request.prototoype.${RequestPrototypeMixin.name}() ${ describe(`Request.prototoype.${RequestPrototypeMixin.name}() ${