Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions packages/appkit/src/core/appkit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,35 @@ export class AppKit<TPlugins extends InputPluginMap> {
client?: WorkspaceClient;
} = {},
): Promise<PluginMap<T>> {
const rawPlugins = config.plugins as T;

// Collect plugin-contributed trace exporter headers before telemetry init
const traceExporterHeaders: Record<string, string> = {
...config?.telemetry?.traceExporterHeaders,
};
for (const entry of rawPlugins) {
if (typeof entry.plugin.appendTraceHeaders === "function") {
Object.assign(
traceExporterHeaders,
entry.plugin.appendTraceHeaders(
entry.config as Parameters<
typeof entry.plugin.appendTraceHeaders
>[0],
),
);
}
}

// Initialize core services
TelemetryManager.initialize(config?.telemetry);
await TelemetryManager.initialize({
...config?.telemetry,
traceExporterHeaders:
Object.keys(traceExporterHeaders).length > 0
? traceExporterHeaders
: undefined,
});
await CacheManager.getInstance(config?.cache);

const rawPlugins = config.plugins as T;

// Collect manifest resources via registry
const registry = new ResourceRegistry();
registry.collectResources(rawPlugins);
Expand Down
10 changes: 6 additions & 4 deletions packages/appkit/src/plugins/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,12 @@ export class ServerPlugin extends Plugin {
this.serverApplication = express();
this.server = null;
this.serverExtensions = [];
this.telemetry.registerInstrumentations([
instrumentations.http,
instrumentations.express,
]);
if (config.enableDefaultTelemetry !== false) {
this.telemetry.registerInstrumentations([
instrumentations.http(),
instrumentations.express(),
]);
}
}

/** Setup the server plugin. */
Expand Down
4 changes: 2 additions & 2 deletions packages/appkit/src/plugins/server/tests/server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ vi.mock("../../../telemetry", () => ({
}),
},
instrumentations: {
http: {},
express: {},
http: () => ({}),
express: () => ({}),
},
}));

Expand Down
6 changes: 6 additions & 0 deletions packages/appkit/src/plugins/server/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,10 @@ export interface ServerConfig extends BasePluginConfig {
staticPath?: string;
autoStart?: boolean;
host?: string;
/**
* Register HTTP and Express OpenTelemetry instrumentations.
* When false, no HTTP/Express spans are created regardless of other
* telemetry settings. Default: true.
*/
enableDefaultTelemetry?: boolean;
}
86 changes: 41 additions & 45 deletions packages/appkit/src/telemetry/instrumentations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,56 +4,52 @@ import { HttpInstrumentation } from "@opentelemetry/instrumentation-http";
import { shouldIgnoreRequest } from "../utils/path-exclusions";

/**
* Registry of pre-configured instrumentations for common use cases.
* These can be selectively registered by plugins that need them.
*
* While instrumentations are generally safe to re-register,
* the recommended approach is to register them once in a corresponding plugin constructor.
* Factory functions that create pre-configured instrumentations on demand.
* Lazy creation avoids side-effects at import time (e.g. module patching
* triggered by instrumentation constructors).
*/
export const instrumentations: Record<string, Instrumentation> = {
http: new HttpInstrumentation({
// Filter out requests before creating spans - this is the most efficient approach
ignoreIncomingRequestHook: shouldIgnoreRequest,
export const instrumentations = {
http: (): Instrumentation =>
new HttpInstrumentation({
ignoreIncomingRequestHook: shouldIgnoreRequest,

applyCustomAttributesOnSpan(span: any, request: any) {
let spanName: string | null = null;
applyCustomAttributesOnSpan(span: any, request: any) {
let spanName: string | null = null;

if (request.route) {
const baseUrl = request.baseUrl || "";
const url = request.url?.split("?")[0] || "";
const fullPath = baseUrl + url;
if (fullPath) {
spanName = `${request.method} ${fullPath}`;
if (request.route) {
const baseUrl = request.baseUrl || "";
const url = request.url?.split("?")[0] || "";
const fullPath = baseUrl + url;
if (fullPath) {
spanName = `${request.method} ${fullPath}`;
}
} else if (request.url) {
// No Express route (e.g., static assets) - use the raw URL path
// Remove query string for cleaner trace names
const path = request.url.split("?")[0];
spanName = `${request.method} ${path}`;
}
} else if (request.url) {
// No Express route (e.g., static assets) - use the raw URL path
// Remove query string for cleaner trace names
const path = request.url.split("?")[0];
spanName = `${request.method} ${path}`;
}

if (spanName) {
span.updateName(spanName);
}
},
}),
express: new ExpressInstrumentation({
requestHook: (span: any, info: any) => {
const req = info.request;

// Only update span name for route handlers (layerType: request_handler)
// This ensures we're not renaming middleware spans
if (info.layerType === "request_handler" && req.route) {
// Combine baseUrl with url to get full path with actual parameter values
// e.g., baseUrl="/api/analytics" + url="/query/spend_data" = "/api/analytics/query/spend_data"
const baseUrl = req.baseUrl || "";
const url = req.url?.split("?")[0] || "";
const fullPath = baseUrl + url;
if (fullPath) {
const spanName = `${req.method} ${fullPath}`;
if (spanName) {
span.updateName(spanName);
}
}
},
}),
},
}),

express: (): Instrumentation =>
new ExpressInstrumentation({
requestHook: (span: any, info: any) => {
const req = info.request;

if (info.layerType === "request_handler" && req.route) {
const baseUrl = req.baseUrl || "";
const url = req.url?.split("?")[0] || "";
const fullPath = baseUrl + url;
if (fullPath) {
const spanName = `${req.method} ${fullPath}`;
span.updateName(spanName);
}
}
},
}),
};
56 changes: 51 additions & 5 deletions packages/appkit/src/telemetry/telemetry-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,24 +60,31 @@ export class TelemetryManager {
return TelemetryManager.instance;
}

static initialize(config: Partial<TelemetryConfig> = {}): void {
static async initialize(
config: Partial<TelemetryConfig> = {},
): Promise<void> {
const instance = TelemetryManager.getInstance();
instance._initialize(config);
await instance._initialize(config);
}

private _initialize(config: Partial<TelemetryConfig>): void {
private async _initialize(config: Partial<TelemetryConfig>): Promise<void> {
if (this.sdk) return;

if (!process.env.OTEL_EXPORTER_OTLP_ENDPOINT) {
return;
}

try {
const traceHeaders = await this.buildTraceExporterHeaders(
config.headers,
config.traceExporterHeaders,
);

this.sdk = new NodeSDK({
resource: this.createResource(config),
autoDetectResources: false,
sampler: new AppKitSampler(),
traceExporter: new OTLPTraceExporter({ headers: config.headers }),
traceExporter: new OTLPTraceExporter({ headers: traceHeaders }),
metricReaders: [
new PeriodicExportingMetricReader({
exporter: new OTLPMetricExporter({ headers: config.headers }),
Expand All @@ -91,7 +98,8 @@ export class TelemetryManager {
new OTLPLogExporter({ headers: config.headers }),
),
],
instrumentations: this.getDefaultInstrumentations(),
instrumentations:
config.instrumentations ?? this.getDefaultInstrumentations(),
});

this.sdk.start();
Expand Down Expand Up @@ -130,6 +138,44 @@ export class TelemetryManager {
return initialResource.merge(detectedResource);
}

/**
* Builds headers for the trace exporter by merging (in priority order):
* 1. Base `headers` from TelemetryConfig
* 2. Databricks auth (auto-resolved when DATABRICKS_HOST is set)
* 3. Plugin-contributed `traceExporterHeaders` (e.g. MLflow experiment ID)
*/
private async buildTraceExporterHeaders(
configHeaders?: Record<string, string>,
pluginHeaders?: Record<string, string>,
): Promise<Record<string, string>> {
const headers: Record<string, string> = { ...configHeaders };

if (process.env.DATABRICKS_HOST && !headers.authorization) {
try {
const { WorkspaceClient } = await import(
"@databricks/sdk-experimental"
);
const client = new WorkspaceClient({});
const authHeaders = new Headers();
await client.config.authenticate(authHeaders);
authHeaders.forEach((value, key) => {
headers[key] = value;
});
} catch (err) {
logger.warn(
"Could not obtain Databricks auth for trace exporter: %O",
err,
);
}
}

if (pluginHeaders) {
Object.assign(headers, pluginHeaders);
}

return headers;
}

private getDefaultInstrumentations(): Instrumentation[] {
return [
...getNodeAutoInstrumentations({
Expand Down
14 changes: 8 additions & 6 deletions packages/appkit/src/telemetry/tests/telemetry-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ describe("TelemetryManager", () => {

beforeEach(() => {
originalEnv = { ...process.env };
// Prevent TelemetryManager from attempting Databricks auth during tests
delete process.env.DATABRICKS_HOST;
vi.clearAllMocks();
// @ts-expect-error - accessing private static property for testing
TelemetryManager.instance = undefined;
Expand All @@ -75,7 +77,7 @@ describe("TelemetryManager", () => {
const { detectResources } = await import("@opentelemetry/resources");
vi.clearAllMocks();

TelemetryManager.initialize({
await TelemetryManager.initialize({
serviceName: "test-service-config",
});

Expand All @@ -85,7 +87,7 @@ describe("TelemetryManager", () => {
test("should initialize providers and create telemetry instances", async () => {
process.env.OTEL_EXPORTER_OTLP_ENDPOINT = "http://localhost:4318";

TelemetryManager.initialize({
await TelemetryManager.initialize({
serviceName: "integration-test",
serviceVersion: "1.0.0",
});
Expand Down Expand Up @@ -118,7 +120,7 @@ describe("TelemetryManager", () => {
test("should support disabled telemetry config", async () => {
process.env.OTEL_EXPORTER_OTLP_ENDPOINT = "";

TelemetryManager.initialize({
await TelemetryManager.initialize({
serviceName: "disabled-test",
serviceVersion: "1.0.0",
});
Expand Down Expand Up @@ -161,7 +163,7 @@ describe("TelemetryManager", () => {
);
vi.clearAllMocks();

TelemetryManager.initialize({
await TelemetryManager.initialize({
headers: {
Authorization: "Bearer token",
"Custom-Header": "value",
Expand All @@ -182,7 +184,7 @@ describe("TelemetryManager", () => {
test("should create and execute spans with real tracer", async () => {
process.env.OTEL_EXPORTER_OTLP_ENDPOINT = "http://localhost:4318";

TelemetryManager.initialize({
await TelemetryManager.initialize({
serviceName: "span-test",
serviceVersion: "1.0.0",
});
Expand All @@ -207,7 +209,7 @@ describe("TelemetryManager", () => {
test("should handle span errors", async () => {
process.env.OTEL_EXPORTER_OTLP_ENDPOINT = "http://localhost:4318";

TelemetryManager.initialize({
await TelemetryManager.initialize({
serviceName: "error-test",
serviceVersion: "1.0.0",
});
Expand Down
6 changes: 6 additions & 0 deletions packages/appkit/src/telemetry/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ export interface TelemetryConfig {
instrumentations?: Instrumentation[];
exportIntervalMs?: number;
headers?: Record<string, string>;
/**
* Additional headers to include on the OTLP **trace** exporter only.
* Merged on top of `headers` and any auto-resolved Databricks auth.
* Typically populated automatically by plugins via `appendTraceHeaders`.
*/
traceExporterHeaders?: Record<string, string>;
}

/**
Expand Down
5 changes: 5 additions & 0 deletions packages/shared/src/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ export type PluginConstructor<
* Use this when resource requirements depend on plugin configuration.
*/
getResourceRequirements?(config: C): ResourceRequirement[];
/**
* Returns extra headers to append to the OTLP trace exporter.
* Called by `_createApp` before TelemetryManager initializes.
*/
appendTraceHeaders?(config: C): Record<string, string>;
};

/**
Expand Down