mirror of
https://github.com/oven-sh/bun
synced 2026-02-16 05:42:43 +00:00
fix server end of stream, fix fetch not streaming without content-length or chunked encoding, fix case when stream do not return a promise on pull (#6086)
This commit is contained in:
@@ -439,6 +439,24 @@ public:
|
|||||||
return {internalEnd(data, totalSize, true, true, closeConnection), hasResponded()};
|
return {internalEnd(data, totalSize, true, true, closeConnection), hasResponded()};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Write the end of chunked encoded stream */
|
||||||
|
bool sendTerminatingChunk(bool closeConnection = false) {
|
||||||
|
writeStatus(HTTP_200_OK);
|
||||||
|
HttpResponseData<SSL> *httpResponseData = getHttpResponseData();
|
||||||
|
if (!(httpResponseData->state & HttpResponseData<SSL>::HTTP_WRITE_CALLED)) {
|
||||||
|
/* Write mark on first call to write */
|
||||||
|
writeMark();
|
||||||
|
|
||||||
|
writeHeader("Transfer-Encoding", "chunked");
|
||||||
|
httpResponseData->state |= HttpResponseData<SSL>::HTTP_WRITE_CALLED;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Terminating 0 chunk */
|
||||||
|
Super::write("\r\n0\r\n\r\n", 7);
|
||||||
|
|
||||||
|
return internalEnd({nullptr, 0}, 0, false, false, closeConnection);
|
||||||
|
}
|
||||||
|
|
||||||
/* Write parts of the response in chunking fashion. Starts timeout if failed. */
|
/* Write parts of the response in chunking fashion. Starts timeout if failed. */
|
||||||
bool write(std::string_view data) {
|
bool write(std::string_view data) {
|
||||||
writeStatus(HTTP_200_OK);
|
writeStatus(HTTP_200_OK);
|
||||||
|
|||||||
@@ -1443,6 +1443,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn endStream(this: *RequestContext, closeConnection: bool) void {
|
pub fn endStream(this: *RequestContext, closeConnection: bool) void {
|
||||||
|
ctxLog("endStream", .{});
|
||||||
if (this.resp) |resp| {
|
if (this.resp) |resp| {
|
||||||
if (this.flags.is_waiting_for_request_body) {
|
if (this.flags.is_waiting_for_request_body) {
|
||||||
this.flags.is_waiting_for_request_body = false;
|
this.flags.is_waiting_for_request_body = false;
|
||||||
@@ -1729,6 +1730,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
|
|||||||
this: *RequestContext,
|
this: *RequestContext,
|
||||||
headers: *JSC.FetchHeaders,
|
headers: *JSC.FetchHeaders,
|
||||||
) void {
|
) void {
|
||||||
|
ctxLog("writeHeaders", .{});
|
||||||
headers.fastRemove(.ContentLength);
|
headers.fastRemove(.ContentLength);
|
||||||
headers.fastRemove(.TransferEncoding);
|
headers.fastRemove(.TransferEncoding);
|
||||||
if (!ssl_enabled) headers.fastRemove(.StrictTransportSecurity);
|
if (!ssl_enabled) headers.fastRemove(.StrictTransportSecurity);
|
||||||
@@ -2100,6 +2102,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn doRenderStream(pair: *StreamPair) void {
|
fn doRenderStream(pair: *StreamPair) void {
|
||||||
|
ctxLog("doRenderStream", .{});
|
||||||
var this = pair.this;
|
var this = pair.this;
|
||||||
var stream = pair.stream;
|
var stream = pair.stream;
|
||||||
if (this.resp == null or this.flags.aborted) {
|
if (this.resp == null or this.flags.aborted) {
|
||||||
@@ -2223,6 +2226,14 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
|
} else {
|
||||||
|
// if is not a promise we treat it as Error
|
||||||
|
streamLog("returned an error", .{});
|
||||||
|
if (!this.flags.aborted) resp.clearAborted();
|
||||||
|
response_stream.detach();
|
||||||
|
this.sink = null;
|
||||||
|
response_stream.sink.destroy();
|
||||||
|
return this.handleReject(assignment_result);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2232,6 +2243,7 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
|
|||||||
defer stream.value.unprotect();
|
defer stream.value.unprotect();
|
||||||
response_stream.sink.markDone();
|
response_stream.sink.markDone();
|
||||||
this.finalizeForAbort();
|
this.finalizeForAbort();
|
||||||
|
response_stream.sink.onFirstWrite = null;
|
||||||
|
|
||||||
response_stream.sink.finalize();
|
response_stream.sink.finalize();
|
||||||
return;
|
return;
|
||||||
@@ -2255,7 +2267,12 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
|
|||||||
|
|
||||||
this.setAbortHandler();
|
this.setAbortHandler();
|
||||||
streamLog("is in progress, but did not return a Promise. Finalizing request context", .{});
|
streamLog("is in progress, but did not return a Promise. Finalizing request context", .{});
|
||||||
this.finalize();
|
response_stream.sink.onFirstWrite = null;
|
||||||
|
response_stream.sink.ctx = null;
|
||||||
|
response_stream.detach();
|
||||||
|
stream.cancel(globalThis);
|
||||||
|
response_stream.sink.markDone();
|
||||||
|
this.renderMissing();
|
||||||
}
|
}
|
||||||
|
|
||||||
const streamLog = Output.scoped(.ReadableStream, false);
|
const streamLog = Output.scoped(.ReadableStream, false);
|
||||||
@@ -2455,7 +2472,6 @@ fn NewRequestContext(comptime ssl_enabled: bool, comptime debug_mode: bool, comp
|
|||||||
}
|
}
|
||||||
|
|
||||||
streamLog("onResolve({any})", .{wrote_anything});
|
streamLog("onResolve({any})", .{wrote_anything});
|
||||||
|
|
||||||
//aborted so call finalizeForAbort
|
//aborted so call finalizeForAbort
|
||||||
if (req.flags.aborted or req.resp == null) {
|
if (req.flags.aborted or req.resp == null) {
|
||||||
req.finalizeForAbort();
|
req.finalizeForAbort();
|
||||||
|
|||||||
@@ -2634,6 +2634,7 @@ pub fn HTTPServerWritable(comptime ssl: bool) type {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn flushFromJSNoWait(this: *@This()) JSC.Node.Maybe(JSValue) {
|
fn flushFromJSNoWait(this: *@This()) JSC.Node.Maybe(JSValue) {
|
||||||
|
log("flushFromJSNoWait", .{});
|
||||||
if (this.hasBackpressure() or this.done) {
|
if (this.hasBackpressure() or this.done) {
|
||||||
return .{ .result = JSValue.jsNumberFromInt32(0) };
|
return .{ .result = JSValue.jsNumberFromInt32(0) };
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1040,14 +1040,14 @@ extern "C"
|
|||||||
uWS::HttpResponse<true> *uwsRes = (uWS::HttpResponse<true> *)res;
|
uWS::HttpResponse<true> *uwsRes = (uWS::HttpResponse<true> *)res;
|
||||||
uwsRes->getHttpResponseData()->onWritable = nullptr;
|
uwsRes->getHttpResponseData()->onWritable = nullptr;
|
||||||
uwsRes->onAborted(nullptr);
|
uwsRes->onAborted(nullptr);
|
||||||
uwsRes->endWithoutBody(std::nullopt, close_connection);
|
uwsRes->sendTerminatingChunk(close_connection);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
uWS::HttpResponse<false> *uwsRes = (uWS::HttpResponse<false> *)res;
|
uWS::HttpResponse<false> *uwsRes = (uWS::HttpResponse<false> *)res;
|
||||||
uwsRes->getHttpResponseData()->onWritable = nullptr;
|
uwsRes->getHttpResponseData()->onWritable = nullptr;
|
||||||
uwsRes->onAborted(nullptr);
|
uwsRes->onAborted(nullptr);
|
||||||
uwsRes->endWithoutBody(std::nullopt, close_connection);
|
uwsRes->sendTerminatingChunk(close_connection);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1071,17 +1071,24 @@ pub fn onClose(
|
|||||||
|
|
||||||
const in_progress = client.state.stage != .done and client.state.stage != .fail;
|
const in_progress = client.state.stage != .done and client.state.stage != .fail;
|
||||||
|
|
||||||
// if the peer closed after a full chunk, treat this
|
if (in_progress) {
|
||||||
// as if the transfer had complete, browsers appear to ignore
|
// if the peer closed after a full chunk, treat this
|
||||||
// a missing 0\r\n chunk
|
// as if the transfer had complete, browsers appear to ignore
|
||||||
if (in_progress and client.state.isChunkedEncoding()) {
|
// a missing 0\r\n chunk
|
||||||
if (picohttp.phr_decode_chunked_is_in_data(&client.state.chunked_decoder) == 0) {
|
if (client.state.isChunkedEncoding()) {
|
||||||
var buf = client.state.getBodyBuffer();
|
if (picohttp.phr_decode_chunked_is_in_data(&client.state.chunked_decoder) == 0) {
|
||||||
if (buf.list.items.len > 0) {
|
var buf = client.state.getBodyBuffer();
|
||||||
client.state.received_last_chunk = true;
|
if (buf.list.items.len > 0) {
|
||||||
client.progressUpdate(comptime is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket);
|
client.state.received_last_chunk = true;
|
||||||
return;
|
client.progressUpdate(comptime is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket);
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
} else if (client.state.content_length == null and client.state.response_stage == .body) {
|
||||||
|
// no content length informed so we are done here
|
||||||
|
client.state.received_last_chunk = true;
|
||||||
|
client.progressUpdate(comptime is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1121,12 +1128,31 @@ pub fn onConnectError(
|
|||||||
pub fn onEnd(
|
pub fn onEnd(
|
||||||
client: *HTTPClient,
|
client: *HTTPClient,
|
||||||
comptime is_ssl: bool,
|
comptime is_ssl: bool,
|
||||||
_: NewHTTPContext(is_ssl).HTTPSocket,
|
socket: NewHTTPContext(is_ssl).HTTPSocket,
|
||||||
) void {
|
) void {
|
||||||
log("onEnd {s}\n", .{client.url.href});
|
log("onEnd {s}\n", .{client.url.href});
|
||||||
|
const in_progress = client.state.stage != .done and client.state.stage != .fail;
|
||||||
if (client.state.stage != .done and client.state.stage != .fail)
|
if (in_progress) {
|
||||||
client.fail(error.ConnectionClosed);
|
// if the peer closed after a full chunk, treat this
|
||||||
|
// as if the transfer had complete, browsers appear to ignore
|
||||||
|
// a missing 0\r\n chunk
|
||||||
|
if (client.state.isChunkedEncoding()) {
|
||||||
|
if (picohttp.phr_decode_chunked_is_in_data(&client.state.chunked_decoder) == 0) {
|
||||||
|
var buf = client.state.getBodyBuffer();
|
||||||
|
if (buf.list.items.len > 0) {
|
||||||
|
client.state.received_last_chunk = true;
|
||||||
|
client.progressUpdate(comptime is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if (client.state.content_length == null and client.state.response_stage == .body) {
|
||||||
|
// no content length informed so we are done here
|
||||||
|
client.state.received_last_chunk = true;
|
||||||
|
client.progressUpdate(comptime is_ssl, if (is_ssl) &http_thread.https_context else &http_thread.http_context, socket);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
client.fail(error.ConnectionClosed);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub inline fn getAllocator() std.mem.Allocator {
|
pub inline fn getAllocator() std.mem.Allocator {
|
||||||
@@ -1369,8 +1395,8 @@ pub const InternalState = struct {
|
|||||||
return this.total_body_received >= content_length;
|
return this.total_body_received >= content_length;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: in future to handle Content-Type: text/event-stream we should be done only when Close/End/Timeout connection
|
// Content-Type: text/event-stream we should be done only when Close/End/Timeout connection
|
||||||
return true;
|
return this.received_last_chunk;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn decompressConst(this: *InternalState, buffer: []const u8, body_out_str: *MutableString) !void {
|
fn decompressConst(this: *InternalState, buffer: []const u8, body_out_str: *MutableString) !void {
|
||||||
@@ -2695,6 +2721,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (!can_continue) {
|
if (!can_continue) {
|
||||||
|
log("onData: can_continue is false", .{});
|
||||||
// this means that the request ended
|
// this means that the request ended
|
||||||
// clone metadata and return the progress at this point
|
// clone metadata and return the progress at this point
|
||||||
this.cloneMetadata();
|
this.cloneMetadata();
|
||||||
@@ -3089,10 +3116,10 @@ const preallocate_max = 1024 * 1024 * 256;
|
|||||||
|
|
||||||
pub fn handleResponseBody(this: *HTTPClient, incoming_data: []const u8, is_only_buffer: bool) !bool {
|
pub fn handleResponseBody(this: *HTTPClient, incoming_data: []const u8, is_only_buffer: bool) !bool {
|
||||||
std.debug.assert(this.state.transfer_encoding == .identity);
|
std.debug.assert(this.state.transfer_encoding == .identity);
|
||||||
const content_length = this.state.content_length orelse 0;
|
const content_length = this.state.content_length;
|
||||||
// is it exactly as much as we need?
|
// is it exactly as much as we need?
|
||||||
if (is_only_buffer and incoming_data.len >= content_length) {
|
if (is_only_buffer and content_length != null and incoming_data.len >= content_length.?) {
|
||||||
try handleResponseBodyFromSinglePacket(this, incoming_data[0..content_length]);
|
try handleResponseBodyFromSinglePacket(this, incoming_data[0..content_length.?]);
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
return handleResponseBodyFromMultiplePackets(this, incoming_data);
|
return handleResponseBodyFromMultiplePackets(this, incoming_data);
|
||||||
@@ -3135,16 +3162,19 @@ fn handleResponseBodyFromSinglePacket(this: *HTTPClient, incoming_data: []const
|
|||||||
|
|
||||||
fn handleResponseBodyFromMultiplePackets(this: *HTTPClient, incoming_data: []const u8) !bool {
|
fn handleResponseBodyFromMultiplePackets(this: *HTTPClient, incoming_data: []const u8) !bool {
|
||||||
var buffer = this.state.getBodyBuffer();
|
var buffer = this.state.getBodyBuffer();
|
||||||
const content_length = this.state.content_length orelse 0;
|
const content_length = this.state.content_length;
|
||||||
|
|
||||||
if (buffer.list.items.len == 0 and
|
if (buffer.list.items.len == 0 and incoming_data.len < preallocate_max) {
|
||||||
content_length > 0 and incoming_data.len < preallocate_max)
|
|
||||||
{
|
|
||||||
buffer.list.ensureTotalCapacityPrecise(buffer.allocator, incoming_data.len) catch {};
|
buffer.list.ensureTotalCapacityPrecise(buffer.allocator, incoming_data.len) catch {};
|
||||||
}
|
}
|
||||||
|
|
||||||
const remaining_content_length = content_length -| this.state.total_body_received;
|
var remainder: []const u8 = undefined;
|
||||||
var remainder = incoming_data[0..@min(incoming_data.len, remaining_content_length)];
|
if (content_length != null) {
|
||||||
|
const remaining_content_length = content_length.? -| this.state.total_body_received;
|
||||||
|
remainder = incoming_data[0..@min(incoming_data.len, remaining_content_length)];
|
||||||
|
} else {
|
||||||
|
remainder = incoming_data;
|
||||||
|
}
|
||||||
|
|
||||||
_ = try buffer.write(remainder);
|
_ = try buffer.write(remainder);
|
||||||
|
|
||||||
@@ -3157,7 +3187,7 @@ fn handleResponseBodyFromMultiplePackets(this: *HTTPClient, incoming_data: []con
|
|||||||
}
|
}
|
||||||
|
|
||||||
// done or streaming
|
// done or streaming
|
||||||
const is_done = this.state.total_body_received >= content_length;
|
const is_done = content_length != null and this.state.total_body_received >= content_length.?;
|
||||||
if (is_done or this.signals.get(.body_streaming)) {
|
if (is_done or this.signals.get(.body_streaming)) {
|
||||||
const processed = try this.state.processBodyBuffer(buffer.*);
|
const processed = try this.state.processBodyBuffer(buffer.*);
|
||||||
|
|
||||||
@@ -3545,7 +3575,12 @@ pub fn handleResponseMetadata(
|
|||||||
}
|
}
|
||||||
|
|
||||||
this.state.response_stage = if (this.state.transfer_encoding == .chunked) .body_chunk else .body;
|
this.state.response_stage = if (this.state.transfer_encoding == .chunked) .body_chunk else .body;
|
||||||
const content_length = this.state.content_length orelse 0;
|
const content_length = this.state.content_length;
|
||||||
|
if (content_length) |length| {
|
||||||
|
log("handleResponseMetadata: content_length is {} and transfer_encoding {}", .{ length, this.state.transfer_encoding });
|
||||||
|
} else {
|
||||||
|
log("handleResponseMetadata: content_length is null and transfer_encoding {}", .{this.state.transfer_encoding});
|
||||||
|
}
|
||||||
// if no body is expected we should stop processing
|
// if no body is expected we should stop processing
|
||||||
return this.method.hasBody() and (content_length > 0 or this.state.transfer_encoding == .chunked);
|
return this.method.hasBody() and (content_length == null or content_length.? > 0 or this.state.transfer_encoding == .chunked);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -13,6 +13,35 @@ var port = 0;
|
|||||||
];
|
];
|
||||||
const useRequestObjectValues = [true, false];
|
const useRequestObjectValues = [true, false];
|
||||||
|
|
||||||
|
test("Should not crash when not returning a promise when stream is in progress", async () => {
|
||||||
|
var called = false;
|
||||||
|
await runInServer(
|
||||||
|
{
|
||||||
|
async fetch() {
|
||||||
|
var stream = new ReadableStream({
|
||||||
|
type: "direct",
|
||||||
|
pull(controller) {
|
||||||
|
controller.write("hey");
|
||||||
|
setTimeout(() => {
|
||||||
|
controller.end();
|
||||||
|
}, 100);
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
return new Response(stream);
|
||||||
|
},
|
||||||
|
},
|
||||||
|
async url => {
|
||||||
|
called = true;
|
||||||
|
expect(await fetch(url).then(res => res.text())).toContain(
|
||||||
|
"Welcome to Bun! To get started, return a Response object.",
|
||||||
|
);
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
expect(called).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
for (let RequestPrototypeMixin of BodyMixin) {
|
for (let RequestPrototypeMixin of BodyMixin) {
|
||||||
for (let useRequestObject of useRequestObjectValues) {
|
for (let useRequestObject of useRequestObjectValues) {
|
||||||
describe(`Request.prototoype.${RequestPrototypeMixin.name}() ${
|
describe(`Request.prototoype.${RequestPrototypeMixin.name}() ${
|
||||||
|
|||||||
Reference in New Issue
Block a user