fix server end of stream, fix fetch not streaming without content-length or chunked encoding, fix case where stream does not return a promise on pull (#6086)
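
The three fixes in the title interact in a scenario like the one below: a streamed response with no Content-Length must be chunk-encoded and properly terminated, and a `pull()` that does not return a promise must not wedge the request. This is an illustrative repro sketch, not code from the patch; the timeout and logging are assumptions.

```ts
// Hedged repro sketch (not from the patch). Serve a "direct" ReadableStream whose
// pull() returns nothing and ends later; the response carries no Content-Length,
// so the server must frame it with chunked encoding and send the terminating chunk
// for the client fetch() below to finish reading.
const server = Bun.serve({
  fetch() {
    const stream = new ReadableStream({
      type: "direct",
      pull(controller) {
        controller.write("hey");
        setTimeout(() => controller.end(), 100); // end of stream arrives later
      },
    });
    return new Response(stream); // no Content-Length header set
  },
});

const text = await fetch(`http://localhost:${server.port}/`).then(r => r.text());
console.log(text);
server.stop();
```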
@@ -439,6 +439,24 @@ public:
        return {internalEnd(data, totalSize, true, true, closeConnection), hasResponded()};
    }

    /* Write the end of chunked encoded stream */
    bool sendTerminatingChunk(bool closeConnection = false) {
        writeStatus(HTTP_200_OK);
        HttpResponseData<SSL> *httpResponseData = getHttpResponseData();
        if (!(httpResponseData->state & HttpResponseData<SSL>::HTTP_WRITE_CALLED)) {
            /* Write mark on first call to write */
            writeMark();

            writeHeader("Transfer-Encoding", "chunked");
            httpResponseData->state |= HttpResponseData<SSL>::HTTP_WRITE_CALLED;
        }

        /* Terminating 0 chunk */
        Super::write("\r\n0\r\n\r\n", 7);

        return internalEnd({nullptr, 0}, 0, false, false, closeConnection);
    }

    /* Write parts of the response in chunking fashion. Starts timeout if failed. */
    bool write(std::string_view data) {
        writeStatus(HTTP_200_OK);

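For reference, the terminating sequence written above is the standard HTTP/1.1 chunked-framing epilogue: each chunk is `<size in hex>\r\n<data>\r\n`, and the body ends with a zero-length chunk. The leading `\r\n` in the literal appears to close the preceding chunk's data before the final `0\r\n\r\n`. A small TypeScript sketch of the framing (helper names are made up; it assumes ASCII data so string length equals byte length):

```ts
const enc = new TextEncoder();

// "<hex length>\r\n<data>\r\n": one chunk of a chunked body (ASCII-only sketch).
const chunk = (data: string) => enc.encode(data.length.toString(16) + "\r\n" + data + "\r\n");

// The zero-length chunk that marks the end of the body.
const terminator = enc.encode("0\r\n\r\n");

console.log(chunk("hey")); // bytes for "3\r\nhey\r\n"
console.log(terminator);   // bytes for "0\r\n\r\n"
```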
@@ -1443,6 +1443,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
        }

        pub fn endStream(this: *RequestContext, closeConnection: bool) void {
            ctxLog("endStream", .{});
            if (this.resp) |resp| {
                if (this.flags.is_waiting_for_request_body) {
                    this.flags.is_waiting_for_request_body = false;

@@ -1729,6 +1730,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
            this: *RequestContext,
            headers: *JSC.FetchHeaders,
        ) void {
            ctxLog("writeHeaders", .{});
            headers.fastRemove(.ContentLength);
            headers.fastRemove(.TransferEncoding);
            if (!ssl_enabled) headers.fastRemove(.StrictTransportSecurity);

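The hunk above strips caller-supplied framing headers before the server writes its own. A rough illustration of why (illustrative snippet, not from the patch): once the body turns out to be a stream, the payload length is not known up front, so a user-set `Content-Length` or `Transfer-Encoding` would conflict with the framing the server chooses itself.

```ts
// Illustrative only: these two headers are the kind removed by
// headers.fastRemove(.ContentLength) / headers.fastRemove(.TransferEncoding) above,
// because the real framing is decided by the server for a streamed body.
const response = new Response(
  new ReadableStream({
    pull(controller) {
      controller.enqueue(new TextEncoder().encode("partial data"));
      controller.close();
    },
  }),
  { headers: { "Content-Length": "12", "Transfer-Encoding": "identity" } },
);
```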
@@ -2100,6 +2102,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
        }

        fn doRenderStream(pair: *StreamPair) void {
            ctxLog("doRenderStream", .{});
            var this = pair.this;
            var stream = pair.stream;
            if (this.resp == null or this.flags.aborted) {

@@ -2223,6 +2226,14 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
                        },
                    }
                    return;
                } else {
                    // if is not a promise we treat it as Error
                    streamLog("returned an error", .{});
                    if (!this.flags.aborted) resp.clearAborted();
                    response_stream.detach();
                    this.sink = null;
                    response_stream.sink.destroy();
                    return this.handleReject(assignment_result);
                }
            }

@@ -2232,6 +2243,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
                defer stream.value.unprotect();
                response_stream.sink.markDone();
                this.finalizeForAbort();
                response_stream.sink.onFirstWrite = null;

                response_stream.sink.finalize();
                return;

@@ -2255,7 +2267,12 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp

            this.setAbortHandler();
            streamLog("is in progress, but did not return a Promise. Finalizing request context", .{});
            this.finalize();
            response_stream.sink.onFirstWrite = null;
            response_stream.sink.ctx = null;
            response_stream.detach();
            stream.cancel(globalThis);
            response_stream.sink.markDone();
            this.renderMissing();
        }

        const streamLog = Output.scoped(.ReadableStream, false);

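The hunks above separate a `pull()` that returns a promise from one that does not. Per the new else branch and the `renderMissing()` call, a stream that is still in progress but returned nothing no longer leaves the request hanging; the context is finalized (in the regression test at the bottom of this diff, the client then receives the default "Welcome to Bun!" page). A sketch of the two shapes of `pull()` involved (illustrative, not from the patch):

```ts
// pull() returns nothing: the write happens now, completion arrives later via a
// timer. This is the shape the new code path has to finalize gracefully.
const nonPromisePull = new ReadableStream({
  type: "direct",
  pull(controller) {
    controller.write("hey");
    setTimeout(() => controller.end(), 100);
  },
});

// pull() returns a promise (async function): the server can await it and only then
// end the response (the path that already worked).
const promisePull = new ReadableStream({
  type: "direct",
  async pull(controller) {
    controller.write("hey");
    controller.end();
  },
});
```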
@@ -2455,7 +2472,6 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
            }

            streamLog("onResolve({any})", .{wrote_anything});

            //aborted so call finalizeForAbort
            if (req.flags.aborted or req.resp == null) {
                req.finalizeForAbort();

@@ -2634,6 +2634,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
        }

        fn flushFromJSNoWait(this: *@This()) JSC.Node.Maybe(JSValue) {
            log("flushFromJSNoWait", .{});
            if (this.hasBackpressure() or this.done) {
                return .{ .result = JSValue.jsNumberFromInt32(0) };
            }

@@ -1040,14 +1040,14 @@ extern "C"
        uWS::HttpResponse<true> *uwsRes = (uWS::HttpResponse<true> *)res;
        uwsRes->getHttpResponseData()->onWritable = nullptr;
        uwsRes->onAborted(nullptr);
        uwsRes->endWithoutBody(std::nullopt, close_connection);
        uwsRes->sendTerminatingChunk(close_connection);
    }
    else
    {
        uWS::HttpResponse<false> *uwsRes = (uWS::HttpResponse<false> *)res;
        uwsRes->getHttpResponseData()->onWritable = nullptr;
        uwsRes->onAborted(nullptr);
        uwsRes->endWithoutBody(std::nullopt, close_connection);
        uwsRes->sendTerminatingChunk(close_connection);
    }
}

@@ -1071,17 +1071,24 @@ pub fn onClose(

    const in_progress = client.state.stage != .done and client.state.stage != .fail;

    // if the peer closed after a full chunk, treat this
    // as if the transfer had complete, browsers appear to ignore
    // a missing 0\r\n chunk
    if (in_progress and client.state.isChunkedEncoding()) {
        if (picohttp.phr_decode_chunked_is_in_data(&client.state.chunked_decoder) == 0) {
            var buf = client.state.getBodyBuffer();
            if (buf.list.items.len > 0) {
                client.state.received_last_chunk = true;
                client.progressUpdate(comptime is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket);
                return;
    if (in_progress) {
        // if the peer closed after a full chunk, treat this
        // as if the transfer had complete, browsers appear to ignore
        // a missing 0\r\n chunk
        if (client.state.isChunkedEncoding()) {
            if (picohttp.phr_decode_chunked_is_in_data(&client.state.chunked_decoder) == 0) {
                var buf = client.state.getBodyBuffer();
                if (buf.list.items.len > 0) {
                    client.state.received_last_chunk = true;
                    client.progressUpdate(comptime is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket);
                    return;
                }
            }
        } else if (client.state.content_length == null and client.state.response_stage == .body) {
            // no content length informed so we are done here
            client.state.received_last_chunk = true;
            client.progressUpdate(comptime is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket);
            return;
        }
    }

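The reworked `onClose` above treats two close scenarios as a normal end of body while a request is still in progress: a chunked response whose final `0\r\n` chunk never arrived but whose last chunk was complete, and a response with no Content-Length at all, where the peer closing the connection is the only end-of-body signal. From the client's point of view (illustrative sketch; the URL is a placeholder, not something from the patch):

```ts
// Hypothetical endpoint that writes a body with neither Content-Length nor
// Transfer-Encoding and then closes the socket (HTTP/1.0-style framing).
const res = await fetch("http://localhost:8080/legacy-endpoint");

// With the change above, the close is recorded as received_last_chunk and the body
// promise resolves instead of the request failing with error.ConnectionClosed.
const body = await res.text();
console.log(body.length);
```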
@@ -1121,12 +1128,31 @@ pub fn onConnectError(
pub fn onEnd(
    client: *HTTPClient,
    comptime is_ssl: bool,
    _: NewHTTPContext(is_ssl).HTTPSocket,
    socket: NewHTTPContext(is_ssl).HTTPSocket,
) void {
    log("onEnd {s}\n", .{client.url.href});

    if (client.state.stage != .done and client.state.stage != .fail)
        client.fail(error.ConnectionClosed);
    const in_progress = client.state.stage != .done and client.state.stage != .fail;
    if (in_progress) {
        // if the peer closed after a full chunk, treat this
        // as if the transfer had complete, browsers appear to ignore
        // a missing 0\r\n chunk
        if (client.state.isChunkedEncoding()) {
            if (picohttp.phr_decode_chunked_is_in_data(&client.state.chunked_decoder) == 0) {
                var buf = client.state.getBodyBuffer();
                if (buf.list.items.len > 0) {
                    client.state.received_last_chunk = true;
                    client.progressUpdate(comptime is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket);
                    return;
                }
            }
        } else if (client.state.content_length == null and client.state.response_stage == .body) {
            // no content length informed so we are done here
            client.state.received_last_chunk = true;
            client.progressUpdate(comptime is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket);
            return;
        }
    }
    client.fail(error.ConnectionClosed);
}

pub inline fn getAllocator() std.mem.Allocator {

@@ -1369,8 +1395,8 @@ pub const InternalState = struct {
            return this.total_body_received >= content_length;
        }

        // TODO: in future to handle Content-Type: text/event-stream we should be done only when Close/End/Timeout connection
        return true;
        // Content-Type: text/event-stream we should be done only when Close/End/Timeout connection
        return this.received_last_chunk;
    }

    fn decompressConst(this: *InternalState, buffer: []const u8, body_out_str: *MutableString) !void {

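A TypeScript restatement of the `isDone` logic shown above may help; the field names are simplified stand-ins for the Zig struct members, so treat this as a sketch rather than the actual implementation:

```ts
function isDone(state: {
  contentLength?: number;     // this.content_length, now optional
  totalBodyReceived: number;  // this.total_body_received
  receivedLastChunk: boolean; // this.received_last_chunk
}): boolean {
  if (state.contentLength !== undefined) {
    return state.totalBodyReceived >= state.contentLength;
  }
  // No Content-Length: completion is signalled by the last chunk or the connection
  // ending, not by having consumed some expected number of bytes.
  return state.receivedLastChunk;
}
```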
@@ -2695,6 +2721,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
    }

    if (!can_continue) {
        log("onData: can_continue is false", .{});
        // this means that the request ended
        // clone metadata and return the progress at this point
        this.cloneMetadata();

@@ -3089,10 +3116,10 @@ const preallocate_max = 1024 * 1024 * 256;

pub fn handleResponseBody(this: *HTTPClient, incoming_data: []const u8, is_only_buffer: bool) !bool {
    std.debug.assert(this.state.transfer_encoding == .identity);
    const content_length = this.state.content_length orelse 0;
    const content_length = this.state.content_length;
    // is it exactly as much as we need?
    if (is_only_buffer and incoming_data.len >= content_length) {
        try handleResponseBodyFromSinglePacket(this, incoming_data[0..content_length]);
    if (is_only_buffer and content_length != null and incoming_data.len >= content_length.?) {
        try handleResponseBodyFromSinglePacket(this, incoming_data[0..content_length.?]);
        return true;
    } else {
        return handleResponseBodyFromMultiplePackets(this, incoming_data);

@@ -3135,16 +3162,19 @@ fn handleResponseBodyFromSinglePacket(this: *HTTPClient, incoming_data: []const

fn handleResponseBodyFromMultiplePackets(this: *HTTPClient, incoming_data: []const u8) !bool {
    var buffer = this.state.getBodyBuffer();
    const content_length = this.state.content_length orelse 0;
    const content_length = this.state.content_length;

    if (buffer.list.items.len == 0 and
        content_length > 0 and incoming_data.len < preallocate_max)
    {
    if (buffer.list.items.len == 0 and incoming_data.len < preallocate_max) {
        buffer.list.ensureTotalCapacityPrecise(buffer.allocator, incoming_data.len) catch {};
    }

    const remaining_content_length = content_length -| this.state.total_body_received;
    var remainder = incoming_data[0..@min(incoming_data.len, remaining_content_length)];
    var remainder: []const u8 = undefined;
    if (content_length != null) {
        const remaining_content_length = content_length.? -| this.state.total_body_received;
        remainder = incoming_data[0..@min(incoming_data.len, remaining_content_length)];
    } else {
        remainder = incoming_data;
    }

    _ = try buffer.write(remainder);

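The same optional Content-Length treatment drives the remainder computation above: with a known length, only the bytes still owed count as body; without one, the whole packet does. A simplified TypeScript restatement (names are stand-ins for the Zig locals):

```ts
function bodyPortion(
  incoming: Uint8Array,
  totalBodyReceived: number,
  contentLength?: number,
): Uint8Array {
  if (contentLength === undefined) {
    return incoming; // no Content-Length: every received byte belongs to the body
  }
  // `-|` in the Zig code is a saturating subtraction, hence the Math.max guard.
  const remaining = Math.max(contentLength - totalBodyReceived, 0);
  return incoming.subarray(0, Math.min(incoming.length, remaining));
}
```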
@@ -3157,7 +3187,7 @@ fn handleResponseBodyFromMultiplePackets(this: *HTTPClient, incoming_data: []con
    }

    // done or streaming
    const is_done = this.state.total_body_received >= content_length;
    const is_done = content_length != null and this.state.total_body_received >= content_length.?;
    if (is_done or this.signals.get(.body_streaming)) {
        const processed = try this.state.processBodyBuffer(buffer.*);

@@ -3545,7 +3575,12 @@ pub fn handleResponseMetadata(
    }

    this.state.response_stage = if (this.state.transfer_encoding == .chunked) .body_chunk else .body;
    const content_length = this.state.content_length orelse 0;
    const content_length = this.state.content_length;
    if (content_length) |length| {
        log("handleResponseMetadata: content_length is {} and transfer_encoding {}", .{ length, this.state.transfer_encoding });
    } else {
        log("handleResponseMetadata: content_length is null and transfer_encoding {}", .{this.state.transfer_encoding});
    }
    // if no body is expected we should stop processing
    return this.method.hasBody() and (content_length > 0 or this.state.transfer_encoding == .chunked);
    return this.method.hasBody() and (content_length == null or content_length.? > 0 or this.state.transfer_encoding == .chunked);
}

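The changed return value above decides whether the client keeps reading a body after the headers. Restated in TypeScript (sketch only; parameter names are mine):

```ts
function expectsBody(
  methodAllowsBody: boolean, // this.method.hasBody()
  contentLength: number | undefined,
  chunked: boolean, // this.state.transfer_encoding == .chunked
): boolean {
  // Keep reading when the length is unknown (read until the connection ends),
  // known and positive, or the response is chunked; stop only for an explicit
  // zero-length, non-chunked body.
  return methodAllowsBody && (contentLength === undefined || contentLength > 0 || chunked);
}
```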
@@ -13,6 +13,35 @@ var port = 0;
];
const useRequestObjectValues = [true, false];

test("Should not crash when not returning a promise when stream is in progress", async () => {
  var called = false;
  await runInServer(
    {
      async fetch() {
        var stream = new ReadableStream({
          type: "direct",
          pull(controller) {
            controller.write("hey");
            setTimeout(() => {
              controller.end();
            }, 100);
          },
        });

        return new Response(stream);
      },
    },
    async url => {
      called = true;
      expect(await fetch(url).then(res => res.text())).toContain(
        "Welcome to Bun! To get started, return a Response object.",
      );
    },
  );

  expect(called).toBe(true);
});

for (let RequestPrototypeMixin of BodyMixin) {
  for (let useRequestObject of useRequestObjectValues) {
    describe(`Request.prototoype.${RequestPrototypeMixin.name}() ${