Compare commits

...

6 Commits

Author SHA1 Message Date
Alistair Smith
a178c240e4 Merge branch 'main' into claude/add-http-diagnostics-channel-events 2025-11-06 13:42:45 -08:00
Alistair Smith
318eb75469 for now, skip server events 2025-11-06 13:13:11 -08:00
Alistair Smith
912fced1e8 use the correct parallel test 2025-11-06 12:28:53 -08:00
Alistair Smith
ba27ee2d19 Merge branch 'main' into claude/add-http-diagnostics-channel-events 2025-11-05 08:34:35 -08:00
Claude Bot
ba64e0ab1e Add Node.js parallel test for HTTP client diagnostics_channel events
This test verifies that all four HTTP client diagnostics_channel events
are properly emitted:
- http.client.request.created
- http.client.request.start
- http.client.request.error
- http.client.response.finish

Based on Node.js's test-diagnostics-channel-http.js but focused only on
client events (server events not yet implemented in Bun).

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-04 16:40:06 +00:00
Claude Bot
f688d1eed0 Add HTTP client diagnostics_channel event support
Implements diagnostics_channel events for HTTP client requests to match
Node.js behavior. This enables observability tools like Sentry to monitor
HTTP requests via diagnostics channels.

Events added:
- http.client.request.created: Emitted when ClientRequest is created
- http.client.request.start: Emitted when fetch() starts
- http.client.request.error: Emitted on request errors
- http.client.response.finish: Emitted when response completes

Fixes support for Sentry's HTTP instrumentation which relies on these
events to generate spans.

Reference: https://github.com/getsentry/sentry-javascript/issues/17779

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-04 16:37:01 +00:00
3 changed files with 409 additions and 0 deletions

View File

@@ -6,6 +6,13 @@ const { isValidTLSArray } = require("internal/tls");
const { validateHeaderName } = require("node:_http_common");
const { getTimerDuration } = require("internal/timers");
const { ConnResetException } = require("internal/shared");
const dc = require("node:diagnostics_channel");
const onClientRequestCreatedChannel = dc.channel("http.client.request.created");
const onClientRequestStartChannel = dc.channel("http.client.request.start");
const onClientRequestErrorChannel = dc.channel("http.client.request.error");
const onClientResponseFinishChannel = dc.channel("http.client.response.finish");
const {
kBodyChunks,
abortedSymbol,
@@ -62,6 +69,12 @@ const StringPrototypeToUpperCase = String.prototype.toUpperCase;
function emitErrorEventNT(self, err) {
if (self.destroyed) return;
if (onClientRequestErrorChannel.hasSubscribers) {
onClientRequestErrorChannel.publish({
request: self,
error: err,
});
}
if (self.listenerCount("error") > 0) {
self.emit("error", err);
}
@@ -376,6 +389,13 @@ function ClientRequest(input, options, cb) {
fetchOptions.unix = socketPath;
}
// Emit diagnostics channel event for request start
if (onClientRequestStartChannel.hasSubscribers) {
onClientRequestStartChannel.publish({
request: this,
});
}
//@ts-ignore
this[kFetchRequest] = fetch(url, fetchOptions).then(response => {
if (this.aborted) {
@@ -409,6 +429,17 @@ function ClientRequest(input, options, cb) {
callback?.();
}, msecs);
};
// Emit diagnostics channel event when response headers are received
// Note: Despite the name "response.finish", Node.js emits this when
// response headers arrive, not when the body completes
if (onClientResponseFinishChannel.hasSubscribers) {
onClientResponseFinishChannel.publish({
request: this,
response: res,
});
}
process.nextTick(
(self, res) => {
// If the user did not listen for the 'response' event, then they
@@ -464,6 +495,13 @@ function ClientRequest(input, options, cb) {
if (!!$debug) globalReportError(err);
if (onClientRequestErrorChannel.hasSubscribers) {
onClientRequestErrorChannel.publish({
request: this,
error: err,
});
}
try {
this.emit("error", err);
} catch (_err) {
@@ -561,6 +599,12 @@ function ClientRequest(input, options, cb) {
};
} catch (err) {
if (!!$debug) globalReportError(err);
if (onClientRequestErrorChannel.hasSubscribers) {
onClientRequestErrorChannel.publish({
request: this,
error: err,
});
}
this.emit("error", err);
} finally {
process.nextTick(maybeEmitFinish.bind(this));
@@ -925,6 +969,13 @@ function ClientRequest(input, options, cb) {
this.removeAllListeners("timeout");
}
};
// Emit diagnostics channel event for request creation
if (onClientRequestCreatedChannel.hasSubscribers) {
onClientRequestCreatedChannel.publish({
request: this,
});
}
}
const ClientRequestPrototype = {

View File

@@ -0,0 +1,255 @@
import { expect, test } from "bun:test";
import * as diagnostics_channel from "node:diagnostics_channel";
import * as http from "node:http";
test("http.client diagnostics_channel events are emitted", async () => {
const events: string[] = [];
const requestData: any[] = [];
// Subscribe to all HTTP client diagnostics channels
const createdChannel = diagnostics_channel.channel("http.client.request.created");
const startChannel = diagnostics_channel.channel("http.client.request.start");
const errorChannel = diagnostics_channel.channel("http.client.request.error");
const finishChannel = diagnostics_channel.channel("http.client.response.finish");
createdChannel.subscribe(({ request }) => {
events.push("created");
requestData.push({ event: "created", hasRequest: !!request });
});
startChannel.subscribe(({ request }) => {
events.push("start");
requestData.push({ event: "start", hasRequest: !!request });
});
errorChannel.subscribe(({ request, error }) => {
events.push("error");
requestData.push({ event: "error", hasRequest: !!request, hasError: !!error });
});
finishChannel.subscribe(({ request, response }) => {
events.push("finish");
requestData.push({ event: "finish", hasRequest: !!request, hasResponse: !!response });
});
// Create a simple HTTP server
const server = http.createServer((req, res) => {
res.writeHead(200, { "Content-Type": "text/plain" });
res.end("Hello World");
});
await new Promise<void>(resolve => {
server.listen(0, () => resolve());
});
const address = server.address();
if (!address || typeof address === "string") {
throw new Error("Failed to get server address");
}
// Make an HTTP request
await new Promise<void>((resolve, reject) => {
const req = http.request(
{
hostname: "localhost",
port: address.port,
path: "/",
method: "GET",
},
res => {
let data = "";
res.on("data", chunk => {
data += chunk;
});
res.on("end", () => {
expect(data).toBe("Hello World");
resolve();
});
},
);
req.on("error", reject);
req.end();
});
// Close the server
await new Promise<void>(resolve => {
server.close(() => resolve());
});
// Verify events were emitted in the correct order
expect(events).toEqual(["created", "start", "finish"]);
// Verify each event had the required data
expect(requestData[0]).toEqual({ event: "created", hasRequest: true });
expect(requestData[1]).toEqual({ event: "start", hasRequest: true });
expect(requestData[2]).toEqual({ event: "finish", hasRequest: true, hasResponse: true });
});
test("http.client.request.error diagnostics_channel event is emitted on error", async () => {
const events: string[] = [];
const errorData: any[] = [];
const createdChannel = diagnostics_channel.channel("http.client.request.created");
const startChannel = diagnostics_channel.channel("http.client.request.start");
const errorChannel = diagnostics_channel.channel("http.client.request.error");
const finishChannel = diagnostics_channel.channel("http.client.response.finish");
createdChannel.subscribe(() => events.push("created"));
startChannel.subscribe(() => events.push("start"));
errorChannel.subscribe(({ request, error }) => {
events.push("error");
errorData.push({ hasRequest: !!request, hasError: !!error, errorMessage: error?.message });
});
finishChannel.subscribe(() => events.push("finish"));
// Make a request to a non-existent server
await new Promise<void>((resolve, reject) => {
const req = http.request(
{
hostname: "localhost",
port: 1, // Port 1 should refuse connection
path: "/",
method: "GET",
},
() => {
reject(new Error("Should not receive response"));
},
);
req.on("error", err => {
// Error is expected
expect(err).toBeDefined();
resolve();
});
req.end();
});
// Verify created and start events were emitted, followed by error
expect(events).toContain("created");
expect(events).toContain("start");
expect(events).toContain("error");
// Verify finish was not emitted (since request errored)
expect(events).not.toContain("finish");
// Verify error event had the required data
expect(errorData.length).toBeGreaterThan(0);
expect(errorData[0].hasRequest).toBe(true);
expect(errorData[0].hasError).toBe(true);
});
test("http.get also emits diagnostics_channel events", async () => {
const events: string[] = [];
const createdChannel = diagnostics_channel.channel("http.client.request.created");
const startChannel = diagnostics_channel.channel("http.client.request.start");
const finishChannel = diagnostics_channel.channel("http.client.response.finish");
createdChannel.subscribe(() => events.push("created"));
startChannel.subscribe(() => events.push("start"));
finishChannel.subscribe(() => events.push("finish"));
// Create a simple HTTP server
const server = http.createServer((req, res) => {
res.writeHead(200, { "Content-Type": "text/plain" });
res.end("OK");
});
await new Promise<void>(resolve => {
server.listen(0, () => resolve());
});
const address = server.address();
if (!address || typeof address === "string") {
throw new Error("Failed to get server address");
}
// Make an HTTP GET request
await new Promise<void>((resolve, reject) => {
http.get(`http://localhost:${address.port}/`, res => {
res.on("data", () => {});
res.on("end", () => resolve());
res.on("error", reject);
});
});
// Close the server
await new Promise<void>(resolve => {
server.close(() => resolve());
});
// Verify events were emitted
expect(events).toEqual(["created", "start", "finish"]);
});
test("diagnostics_channel events work with POST requests with body", async () => {
const events: string[] = [];
const createdChannel = diagnostics_channel.channel("http.client.request.created");
const startChannel = diagnostics_channel.channel("http.client.request.start");
const finishChannel = diagnostics_channel.channel("http.client.response.finish");
createdChannel.subscribe(() => events.push("created"));
startChannel.subscribe(() => events.push("start"));
finishChannel.subscribe(() => events.push("finish"));
// Create a simple HTTP server
const server = http.createServer((req, res) => {
let body = "";
req.on("data", chunk => {
body += chunk;
});
req.on("end", () => {
res.writeHead(200, { "Content-Type": "text/plain" });
res.end(`Received: ${body}`);
});
});
await new Promise<void>(resolve => {
server.listen(0, () => resolve());
});
const address = server.address();
if (!address || typeof address === "string") {
throw new Error("Failed to get server address");
}
// Make an HTTP POST request with body
await new Promise<void>((resolve, reject) => {
const req = http.request(
{
hostname: "localhost",
port: address.port,
path: "/",
method: "POST",
headers: {
"Content-Type": "application/json",
},
},
res => {
let data = "";
res.on("data", chunk => {
data += chunk;
});
res.on("end", () => {
expect(data).toBe('Received: {"test":"data"}');
resolve();
});
},
);
req.on("error", reject);
req.write('{"test":"data"}');
req.end();
});
// Close the server
await new Promise<void>(resolve => {
server.close(() => resolve());
});
// Verify events were emitted
expect(events).toEqual(["created", "start", "finish"]);
});

View File

@@ -0,0 +1,103 @@
'use strict';
const common = require('../common');
const { addresses } = require('../common/internet');
const assert = require('assert');
const http = require('http');
const net = require('net');
const dc = require('diagnostics_channel');
const isHTTPServer = (server) => server instanceof http.Server;
const isIncomingMessage = (object) => object instanceof http.IncomingMessage;
const isOutgoingMessage = (object) => object instanceof http.OutgoingMessage;
const isNetSocket = (socket) => socket instanceof net.Socket;
const isError = (error) => error instanceof Error;
dc.subscribe('http.client.request.start', common.mustCall(({ request }) => {
assert.strictEqual(isOutgoingMessage(request), true);
}, 2));
dc.subscribe('http.client.request.error', common.mustCall(({ request, error }) => {
assert.strictEqual(isOutgoingMessage(request), true);
assert.strictEqual(isError(error), true);
}));
dc.subscribe('http.client.response.finish', common.mustCall(({
request,
response
}) => {
assert.strictEqual(isOutgoingMessage(request), true);
assert.strictEqual(isIncomingMessage(response), true);
}));
// TODO: Implement HTTP server diagnostics channel events These server-side
// events are not yet implemented in Bun because:
// 1. The implementation requires careful handling of socket lifecycle and event
// timing
// 2. NodeHTTPServerSocket extends Duplex (not net.Socket) which complicates
// instanceof checks
// 3. The finish event handler setup needs to work correctly across all code
// paths (upgrade, expect headers, etc.)
// 4. Working around this by patching prototype chains or extending net.Socket
// is too brittle, just for the sake of making this test file pass fully
//
// Server events to implement:
// - http.server.request.start: Emitted when server receives a request
// - http.server.response.created: Emitted when ServerResponse is created
// - http.server.response.finish: Emitted when response finishes
// dc.subscribe('http.server.request.start', common.mustCall(({
// request,
// response,
// socket,
// server,
// }) => {
// assert.strictEqual(isIncomingMessage(request), true);
// assert.strictEqual(isOutgoingMessage(response), true);
// assert.strictEqual(isNetSocket(socket), true);
// assert.strictEqual(isHTTPServer(server), true);
// }));
//
// dc.subscribe('http.server.response.finish', common.mustCall(({
// request,
// response,
// socket,
// server,
// }) => {
// assert.strictEqual(isIncomingMessage(request), true);
// assert.strictEqual(isOutgoingMessage(response), true);
// assert.strictEqual(isNetSocket(socket), true);
// assert.strictEqual(isHTTPServer(server), true);
// }));
//
// dc.subscribe('http.server.response.created', common.mustCall(({
// request,
// response,
// }) => {
// assert.strictEqual(isIncomingMessage(request), true);
// assert.strictEqual(isOutgoingMessage(response), true);
// }));
dc.subscribe('http.client.request.created', common.mustCall(({ request }) => {
assert.strictEqual(isOutgoingMessage(request), true);
assert.strictEqual(isHTTPServer(server), true);
}, 2));
const server = http.createServer(common.mustCall((req, res) => {
res.end('done');
}));
server.listen(async () => {
const { port } = server.address();
const invalidRequest = http.get({
host: addresses.INVALID_HOST,
});
await new Promise((resolve) => {
invalidRequest.on('error', resolve);
});
http.get(`http://localhost:${port}`, (res) => {
res.resume();
res.on('end', () => {
server.close();
});
});
});