From 7114bec23a8f511113b783886f87ee01d0fe9e2a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Thu, 26 Jun 2025 23:06:03 -0400 Subject: [PATCH 1/7] add distribution metric type and method --- src/metrics.ts | 19 +++++++++++++++++++ src/types.ts | 9 ++++++++- 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/src/metrics.ts b/src/metrics.ts index e63e784..47ff92b 100644 --- a/src/metrics.ts +++ b/src/metrics.ts @@ -4,6 +4,7 @@ import { type GaugeMetricPayload, type HistogramMetricPayload, type HistogramOptions, + type DistributionMetricPayload, METRICS_CHANNEL_NAME, MetricType, type Tags, @@ -69,8 +70,26 @@ export function histogram( metricsChannel.publish(payload); } +/** + * Record a distribution metric + * @param name - The metric name + * @param value - The distribution value + * @param tags - Optional tags + */ +export function distribution(name: string, value: number, tags: Tags = {}): void { + const payload: DistributionMetricPayload = { + type: MetricType.DISTRIBUTION, + name, + value, + tags, + }; + + metricsChannel.publish(payload); +} + export default { count, gauge, histogram, + distribution, } diff --git a/src/types.ts b/src/types.ts index eed7b39..8b10eca 100644 --- a/src/types.ts +++ b/src/types.ts @@ -4,6 +4,7 @@ export enum MetricType { COUNT = "COUNT", GAUGE = "GAUGE", HISTOGRAM = "HISTOGRAM", + DISTRIBUTION = "DISTRIBUTION", } export type Tags = Record; @@ -47,9 +48,15 @@ export interface HistogramMetricPayload extends BaseMetricPayload { options: HistogramOptions; } +export interface DistributionMetricPayload extends BaseMetricPayload { + type: MetricType.DISTRIBUTION; + value: number; +} + export type MetricPayload = | CountMetricPayload | GaugeMetricPayload - | HistogramMetricPayload; + | HistogramMetricPayload + | DistributionMetricPayload; export type ExportedMetricPayload = MetricPayload & { timestamp: number }; \ No newline at end of file From 3acbef85eccef78667b29e00bf4afdc344c3424a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Thu, 26 Jun 2025 23:06:48 -0400 Subject: [PATCH 2/7] use distribution metrics in Datadog also converts count and histograms to distributions due to their nature in Serverless environments and how aggregation is done --- src/sinks/metrics/datadog.ts | 162 +++++++++++++++++++++++++++++------ 1 file changed, 135 insertions(+), 27 deletions(-) diff --git a/src/sinks/metrics/datadog.ts b/src/sinks/metrics/datadog.ts index 89dde20..392e2e2 100644 --- a/src/sinks/metrics/datadog.ts +++ b/src/sinks/metrics/datadog.ts @@ -1,6 +1,10 @@ -import type { ExportedMetricPayload } from "../../types"; +import { MetricType, type ExportedMetricPayload, type Tags } from "../../types"; import { env } from "cloudflare:workers"; import type { MetricSink } from "../sink"; + +const DISTRIBUTION_POINTS_ENDPOINT_PATH: string = "api/v1/distribution_points"; +const METRICS_SERIES_ENDPOINT_PATH: string = "api/v1/series"; + export interface DatadogMetricSinkOptions { /** * Datadog API key @@ -13,9 +17,14 @@ export interface DatadogMetricSinkOptions { site?: string; /** - * Custom endpoint URL override (for testing or proxies) + * Custom distribution points endpoint URL override (for testing or proxies) + */ + distributionPointsEndpoint?: string; + + /** + * Custom metrics series endpoint URL override (for testing or proxies) */ - endpoint?: string; + metricsSeriesEndpoint?: string; } /** @@ -25,24 +34,27 @@ export class DatadogMetricSink implements MetricSink { private readonly options: { apiKey: string; site: string; - endpoint: string; + distributionPointsEndpoint: string; + metricsSeriesEndpoint: string; }; constructor(options?: DatadogMetricSinkOptions) { // @ts-ignore - const apiKey = options?.apiKey || env.DD_API_KEY || env.DATADOG_API_KEY; + let apiKey = options?.apiKey || env.DD_API_KEY || env.DATADOG_API_KEY; if (!apiKey || apiKey.length === 0) { console.error("Datadog API key was not found. Provide it in the sink options or set the DD_API_KEY environment variable. Metrics will not be sent to Datadog."); } // @ts-ignore - const site = options?.site || env.DD_SITE || "datadoghq.com"; - const endpoint = options?.endpoint || `https://api.${site}/api/v1/series`; + let site = options?.site || env.DD_SITE || "datadoghq.com"; + let distributionPointsEndpoint = options?.distributionPointsEndpoint || `https://api.${site}/${DISTRIBUTION_POINTS_ENDPOINT_PATH}`; + let metricsSeriesEndpoint = options?.metricsSeriesEndpoint || `https://api.${site}/${METRICS_SERIES_ENDPOINT_PATH}`; this.options = { apiKey, site, - endpoint, + distributionPointsEndpoint, + metricsSeriesEndpoint, }; } @@ -55,9 +67,12 @@ export class DatadogMetricSink implements MetricSink { } try { - const datadogMetrics = payloads.map((payload) => - this.transformMetric(payload), - ); + // Filter out worker metrics, since Datadog is currently getting this metrics through an integration + // For now, Datadog only accepts custom metrics. + const payloadsWithoutWorkerMetrics = payloads.filter((payload) => !payload.name.startsWith('worker.')); + + const datadogMetrics = payloadsWithoutWorkerMetrics.map((payload) => this.transformMetric(payload)); + await this.sendToDatadog(datadogMetrics); } catch (error) { throw new Error(`Failed to send metrics to Datadog: ${error instanceof Error ? error.message : String(error)}`); @@ -67,37 +82,123 @@ export class DatadogMetricSink implements MetricSink { /** * Transform a metric payload to Datadog format */ - private transformMetric(payload: ExportedMetricPayload): DatadogMetric { - const formattedTags = Object.entries(payload.tags).map( - ([key, value]) => `${key}:${value}`, - ); - - const metricType = payload.type.toLowerCase(); - - return { - metric: payload.name, - type: metricType, - points: [[Math.floor(payload.timestamp / 1000), payload.value]], - tags: formattedTags, - }; + private transformMetric(payload: ExportedMetricPayload): DatadogMetric | DatadogPoint { + const tags = this.formatTags(payload.tags); + switch (payload.type) { + case MetricType.GAUGE: + return { + metric: payload.name, + type: 'gauge', + points: [[Math.floor(payload.timestamp / 1000), payload.value]], + tags, + } as DatadogMetric; + case MetricType.DISTRIBUTION: + // In Serverless, count and histogram metrics need to be sent as distribution metrics. + // Distributions metrics are stateless by design, no local aggregation is needed. + case MetricType.HISTOGRAM: + case MetricType.COUNT: + default: + return { + metric: payload.name, + type: 'distribution', + points: [[Math.floor(payload.timestamp / 1000), [payload.value]]], + tags, + } as DatadogPoint; + } + } + + /** + * Format tags returns a list of tags in the format `key:value`, + * and adds the following tags: + * - `worker_script:${scriptName}` + * - `execution_model:${executionModel}` + * - `version:${versionId}` + * - `trigger:${trigger}` + * - `region:earth` + */ + private formatTags(tags: Tags): string[] { + const { + scriptName, + executionModel, + versionId, + trigger, + ...customTags + } = tags; + + let formattedTags = Object.entries(customTags) + .filter(([_, value]) => value !== undefined && value !== null) + .map( + ([key, value]) => `${key}:${value}`, + ); + + if (scriptName != null) { + formattedTags.push(`worker_script:${scriptName}`); + } + + if (executionModel != null) { + formattedTags.push(`execution_model:${executionModel}`); + } + + if (versionId != null) { + formattedTags.push(`version:${versionId}`); + } + + if (trigger != null) { + formattedTags.push(`trigger:${trigger}`); + } + + formattedTags.push(`region:earth`); + + return formattedTags; } /** * Send metrics to Datadog API */ - private async sendToDatadog(metrics: DatadogMetric[]): Promise { + private async sendToDatadog(metrics: (DatadogMetric | DatadogPoint)[]): Promise { if (!this.options.apiKey || this.options.apiKey.length === 0) { console.warn(`Datadog API key was not found. Dropping ${metrics.length} metrics.`); return; } + + const distributionMetrics: DatadogPoint[] = metrics.filter((metric) => metric.type === 'distribution') as DatadogPoint[]; + // Gauge metrics are sent as metrics series + const gaugeMetrics: DatadogMetric[] = metrics.filter((metric) => metric.type === 'gauge') as DatadogMetric[]; - const response = await fetch(this.options.endpoint, { + if (distributionMetrics.length > 0) { + try { + await this.sendDistributionMetrics(distributionMetrics); + } catch (error) { + throw new Error(`Distribution metrics failed to send:\n ${error instanceof Error ? error.message : String(error)}`); + } + } + + if (gaugeMetrics.length > 0) { + try { + await this.sendMetricsSeries(gaugeMetrics); + } catch (error) { + throw new Error(`Gauge metrics failed to send:\n ${error instanceof Error ? error.message : String(error)}`); + } + } + } + + private async sendMetricsSeries(metrics: DatadogMetric[]): Promise { + this.postRequest(this.options.metricsSeriesEndpoint, JSON.stringify({ series: metrics })); + } + + private async sendDistributionMetrics(metrics: DatadogPoint[]): Promise { + await this.postRequest(this.options.distributionPointsEndpoint, JSON.stringify({ series: metrics })); + } + + private async postRequest(endpoint: string, body: string): Promise { + console.log(body); + const response = await fetch(endpoint, { method: "POST", headers: { "Content-Type": "application/json", "DD-API-KEY": this.options.apiKey, }, - body: JSON.stringify({ series: metrics }), + body, }); if (!response.ok) { @@ -113,3 +214,10 @@ interface DatadogMetric { points: [number, number][]; // [timestamp, value] tags: string[]; } + +interface DatadogPoint { + metric: string; + type: string; + points: [number, number[]][]; // [timestamp, [values]] + tags: string[]; +} From 2da6d88985b3c4022897f122efe1c247000e3197 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com> Date: Mon, 30 Jun 2025 14:01:13 -0400 Subject: [PATCH 3/7] consolidate Datadog metric to be `DatadogMetric` with different types for the points --- src/sinks/metrics/datadog.ts | 49 ++++++++++++++++++------------------ 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/src/sinks/metrics/datadog.ts b/src/sinks/metrics/datadog.ts index 392e2e2..6e558de 100644 --- a/src/sinks/metrics/datadog.ts +++ b/src/sinks/metrics/datadog.ts @@ -82,29 +82,34 @@ export class DatadogMetricSink implements MetricSink { /** * Transform a metric payload to Datadog format */ - private transformMetric(payload: ExportedMetricPayload): DatadogMetric | DatadogPoint { + private transformMetric(payload: ExportedMetricPayload): DatadogMetric { const tags = this.formatTags(payload.tags); + + let type; + let value: number | number[]; switch (payload.type) { case MetricType.GAUGE: - return { - metric: payload.name, - type: 'gauge', - points: [[Math.floor(payload.timestamp / 1000), payload.value]], - tags, - } as DatadogMetric; + type = 'gauge'; + value = payload.value; + break; case MetricType.DISTRIBUTION: // In Serverless, count and histogram metrics need to be sent as distribution metrics. // Distributions metrics are stateless by design, no local aggregation is needed. case MetricType.HISTOGRAM: case MetricType.COUNT: + type = 'distribution'; + value = [payload.value]; + break; default: - return { - metric: payload.name, - type: 'distribution', - points: [[Math.floor(payload.timestamp / 1000), [payload.value]]], - tags, - } as DatadogPoint; + throw new Error(`Unsupported metric type: ${payload}`); } + + return { + metric: payload.name, + type, + points: [[Math.floor(payload.timestamp / 1000), value]], + tags, + } as DatadogMetric; } /** @@ -155,15 +160,15 @@ export class DatadogMetricSink implements MetricSink { /** * Send metrics to Datadog API */ - private async sendToDatadog(metrics: (DatadogMetric | DatadogPoint)[]): Promise { + private async sendToDatadog(metrics: DatadogMetric[]): Promise { if (!this.options.apiKey || this.options.apiKey.length === 0) { console.warn(`Datadog API key was not found. Dropping ${metrics.length} metrics.`); return; } - const distributionMetrics: DatadogPoint[] = metrics.filter((metric) => metric.type === 'distribution') as DatadogPoint[]; + const distributionMetrics: DatadogMetric[] = metrics.filter((metric) => metric.type === 'distribution'); // Gauge metrics are sent as metrics series - const gaugeMetrics: DatadogMetric[] = metrics.filter((metric) => metric.type === 'gauge') as DatadogMetric[]; + const gaugeMetrics: DatadogMetric[] = metrics.filter((metric) => metric.type === 'gauge'); if (distributionMetrics.length > 0) { try { @@ -186,7 +191,7 @@ export class DatadogMetricSink implements MetricSink { this.postRequest(this.options.metricsSeriesEndpoint, JSON.stringify({ series: metrics })); } - private async sendDistributionMetrics(metrics: DatadogPoint[]): Promise { + private async sendDistributionMetrics(metrics: DatadogMetric[]): Promise { await this.postRequest(this.options.distributionPointsEndpoint, JSON.stringify({ series: metrics })); } @@ -208,16 +213,12 @@ export class DatadogMetricSink implements MetricSink { } } +type DatadogPoint = [number, number[]] | [number, number]; // [timestamp, [values]] (distribution) or [timestamp, value] (count, histogram, gauge) + interface DatadogMetric { metric: string; type: string; - points: [number, number][]; // [timestamp, value] + points: DatadogPoint[]; tags: string[]; } -interface DatadogPoint { - metric: string; - type: string; - points: [number, number[]][]; // [timestamp, [values]] - tags: string[]; -} From 917d4d6a82b6ed9b7a4f76cf1af572232c0a8556 Mon Sep 17 00:00:00 2001 From: Ankcorn Date: Mon, 30 Jun 2025 22:44:31 +0100 Subject: [PATCH 4/7] refactor metrics db to remove aggregation of the histogram --- src/metricsDb.test.ts | 145 +++++------------------------------------- src/metricsDb.ts | 56 +++++++--------- src/types.ts | 12 +++- 3 files changed, 48 insertions(+), 165 deletions(-) diff --git a/src/metricsDb.test.ts b/src/metricsDb.test.ts index 525aded..bf4f96d 100644 --- a/src/metricsDb.test.ts +++ b/src/metricsDb.test.ts @@ -1,6 +1,7 @@ import { describe, it, expect, beforeEach } from "vitest"; import { MetricsDb } from "./metricsDb"; import { + EmittedMetricPayload, MetricType, type ExportedMetricPayload, type HistogramAggregates, @@ -65,16 +66,12 @@ describe("MetricsDb", () => { }); it("should store a histogram metric", () => { - const metric: ExportedMetricPayload = { + const metric: EmittedMetricPayload = { type: MetricType.HISTOGRAM, name: "test.histogram", value: 150, tags: { endpoint: "/api/users" }, timestamp: Date.now(), - options: { - percentiles: [0.5, 0.95], - aggregates: ["max", "min"] as HistogramAggregates[], - }, }; metricsDb.storeMetric(metric); @@ -84,10 +81,8 @@ describe("MetricsDb", () => { expect(metrics[0]).toEqual({ type: MetricType.HISTOGRAM, name: "test.histogram", - value: [150], + value: [{ value: 150, time: metric.timestamp }], tags: { endpoint: "/api/users" }, - percentiles: [0.5, 0.95], - aggregates: ["max", "min"], lastUpdated: metric.timestamp, }); }); @@ -97,10 +92,6 @@ describe("MetricsDb", () => { type: MetricType.HISTOGRAM, name: "test.histogram", tags: { endpoint: "/api/users" }, - options: { - percentiles: [0.5], - aggregates: ["max"] as HistogramAggregates[], - }, }; metricsDb.storeMetric({ @@ -117,8 +108,7 @@ describe("MetricsDb", () => { expect(metricsDb.getMetricCount()).toBe(1); const metrics = metricsDb.getAllMetrics(); - expect(metrics[0].value).toEqual([100, 200]); - expect(metrics[0].lastUpdated).toBe(2000); + expect(metrics[0].value).toEqual([{ value: 100, time: 1000 }, { value: 200, time: 2000 }]); }); it("should group metric keys based on name, type, and tags", () => { @@ -237,52 +227,7 @@ describe("MetricsDb", () => { name: "test.histogram", value: 100, tags: { endpoint: "/api" }, - timestamp: 1000, - options: { - percentiles: [0.5, 0.95], - }, - }); - - metricsDb.storeMetric({ - type: MetricType.HISTOGRAM, - name: "test.histogram", - value: 200, - tags: { endpoint: "/api" }, - timestamp: 2000, - options: { - percentiles: [0.5, 0.95], - }, - }); - - const payloads = metricsDb.toMetricPayloads(); - - expect(payloads).toHaveLength(2); - expect(payloads).toContainEqual({ - type: MetricType.GAUGE, - name: "test.histogram.p50", - value: 100, // 50th percentile of [100, 200] - tags: { endpoint: "/api" }, - timestamp: 2000, - }); - expect(payloads).toContainEqual({ - type: MetricType.GAUGE, - name: "test.histogram.p95", - value: 200, // 95th percentile of [100, 200] - tags: { endpoint: "/api" }, - timestamp: 2000, - }); - }); - - it("should export histogram aggregates as appropriate metric types", () => { - metricsDb.storeMetric({ - type: MetricType.HISTOGRAM, - name: "test.histogram", - value: 100, - tags: { endpoint: "/api" }, - timestamp: 1000, - options: { - aggregates: ["count", "max", "min", "avg"] as HistogramAggregates[], - }, + timestamp: 1000 }); metricsDb.storeMetric({ @@ -290,80 +235,23 @@ describe("MetricsDb", () => { name: "test.histogram", value: 200, tags: { endpoint: "/api" }, - timestamp: 2000, - options: { - aggregates: ["count", "max", "min", "avg"] as HistogramAggregates[], - }, + timestamp: 2000 }); const payloads = metricsDb.toMetricPayloads(); - expect(payloads).toHaveLength(4); - - // Count should be COUNT type - expect(payloads).toContainEqual({ - type: MetricType.COUNT, - name: "test.histogram.count", - value: 2, - tags: { endpoint: "/api" }, - timestamp: 2000, - }); - - // Other aggregates should be GAUGE type - expect(payloads).toContainEqual({ - type: MetricType.GAUGE, - name: "test.histogram.max", - value: 200, - tags: { endpoint: "/api" }, - timestamp: 2000, - }); - - expect(payloads).toContainEqual({ - type: MetricType.GAUGE, - name: "test.histogram.min", - value: 100, - tags: { endpoint: "/api" }, - timestamp: 2000, - }); - - expect(payloads).toContainEqual({ - type: MetricType.GAUGE, - name: "test.histogram.avg", - value: 150, - tags: { endpoint: "/api" }, - timestamp: 2000, - }); - }); - - it("should handle histogram with both percentiles and aggregates", () => { - metricsDb.storeMetric({ + expect(payloads).toHaveLength(1); + expect(payloads[0]).toEqual({ type: MetricType.HISTOGRAM, name: "test.histogram", - value: 100, - tags: { endpoint: "/api" }, - timestamp: 1000, - options: { - percentiles: [0.5], - aggregates: ["count"] as HistogramAggregates[], - }, - }); - - const payloads = metricsDb.toMetricPayloads(); - - expect(payloads).toHaveLength(2); - expect(payloads).toContainEqual({ - type: MetricType.GAUGE, - name: "test.histogram.p50", - value: 100, - tags: { endpoint: "/api" }, - timestamp: 1000, - }); - expect(payloads).toContainEqual({ - type: MetricType.COUNT, - name: "test.histogram.count", - value: 1, + value: [{ + value: 100, + time: 1000, + }, { + value: 200, + time: 2000, + }], tags: { endpoint: "/api" }, - timestamp: 1000, }); }); @@ -372,14 +260,13 @@ describe("MetricsDb", () => { type: MetricType.HISTOGRAM, name: "test.histogram", value: 100, - options: {}, tags: { endpoint: "/api" }, timestamp: 1000, }); const payloads = metricsDb.toMetricPayloads(); - expect(payloads).toHaveLength(0); + expect(payloads).toHaveLength(1); }); }); diff --git a/src/metricsDb.ts b/src/metricsDb.ts index f901be0..b524bcd 100644 --- a/src/metricsDb.ts +++ b/src/metricsDb.ts @@ -5,6 +5,7 @@ import { type ExportedMetricPayload, MetricType, type Tags, + EmittedMetricPayload, } from "./types"; interface BaseStoredMetric { @@ -25,9 +26,10 @@ interface StoredGaugeMetric extends BaseStoredMetric { interface StoredHistogramMetric extends BaseStoredMetric { type: MetricType.HISTOGRAM; - value: number[]; - percentiles?: number[]; - aggregates?: HistogramAggregates[]; + value: { + time: number; + value: number; + }[]; } type StoredMetric = @@ -49,7 +51,7 @@ export class MetricsDb { return `${metric.name}:${metric.type}:${tagKey}`; } - public storeMetric(metric: ExportedMetricPayload): void { + public storeMetric(metric: EmittedMetricPayload): void { const key = this.getMetricKey(metric); const existingMetric = this.metrics.get(key); @@ -82,15 +84,20 @@ export class MetricsDb { case MetricType.HISTOGRAM: { const existingValue = existingMetric - ? (existingMetric.value as number[]) + ? (existingMetric.value as { value: number; time: number }[]) : []; this.metrics.set(key, { type: metric.type, name: metric.name, tags: metric.tags, - percentiles: metric.options?.percentiles, - aggregates: metric.options?.aggregates, - value: [...existingValue, metric.value], + + value: [ + ...existingValue, + { + value: Number(metric.value), + time: metric.timestamp, + }, + ], lastUpdated: metric.timestamp, }); } @@ -98,7 +105,7 @@ export class MetricsDb { } public storeMetrics( - metrics: (MetricPayload & { timestamp: number })[], + metrics: (MetricPayload & { timestamp: number })[] ): void { for (const metric of metrics) { this.storeMetric(metric); @@ -140,30 +147,13 @@ export class MetricsDb { }); break; case MetricType.HISTOGRAM: { - const sortedArray = [...metric.value].sort(); - - for (const percentile of metric.percentiles || []) { - const value = calculatePercentile(sortedArray, percentile); - payloads.push({ - type: MetricType.GAUGE, - name: `${metric.name}.p${Math.round(percentile * 100)}`, - value: value, - tags: metric.tags, - timestamp: metric.lastUpdated, - }); - } - - for (const aggregate of metric.aggregates || []) { - const value = calculateHistogramValue(aggregate, metric.value); - - payloads.push({ - type: aggregate === "count" ? MetricType.COUNT : MetricType.GAUGE, - name: `${metric.name}.${aggregate}`, - value: value, - tags: metric.tags, - timestamp: metric.lastUpdated, - }); - } + payloads.push({ + type: MetricType.HISTOGRAM, + name: metric.name, + value: metric.value, + tags: metric.tags, + }); + break; } } } diff --git a/src/types.ts b/src/types.ts index eed7b39..746f54d 100644 --- a/src/types.ts +++ b/src/types.ts @@ -11,7 +11,7 @@ export type Tags = Record; interface BaseMetricPayload { type: MetricType; name: string; - value: number; + value: unknown; tags: Tags; } @@ -44,7 +44,6 @@ export interface GaugeMetricPayload extends BaseMetricPayload { export interface HistogramMetricPayload extends BaseMetricPayload { type: MetricType.HISTOGRAM; value: number; - options: HistogramOptions; } export type MetricPayload = @@ -52,4 +51,11 @@ export type MetricPayload = | GaugeMetricPayload | HistogramMetricPayload; -export type ExportedMetricPayload = MetricPayload & { timestamp: number }; \ No newline at end of file +export type EmittedMetricPayload = MetricPayload & { timestamp: number }; + +export interface ExportedHistogramPayload extends BaseMetricPayload { + type: MetricType.HISTOGRAM; + value: { time: number, value: number }[]; +} + +export type ExportedMetricPayload = (CountMetricPayload | GaugeMetricPayload) & { timestamp: number } | ExportedHistogramPayload \ No newline at end of file From 1a4986567721873ee6029c015c738b8986260799 Mon Sep 17 00:00:00 2001 From: Ankcorn Date: Mon, 30 Jun 2025 23:11:50 +0100 Subject: [PATCH 5/7] integrate @duncanista's work for datadog distribution types --- src/metrics.test.ts | 8 +------ src/metrics.ts | 22 ----------------- src/metricsDb.test.ts | 4 +--- src/metricsDb.ts | 4 +--- src/sinks/metrics/datadog.ts | 46 +++++++++++++++--------------------- 5 files changed, 22 insertions(+), 62 deletions(-) diff --git a/src/metrics.test.ts b/src/metrics.test.ts index 1bfea91..a04f21c 100644 --- a/src/metrics.test.ts +++ b/src/metrics.test.ts @@ -64,12 +64,7 @@ describe("metrics", () => { describe("histogram", () => { it("should publish histogram metric with options and tags", () => { - const options = { - aggregates: ["max", "min", "avg"] as HistogramOptions["aggregates"], - percentiles: [0.5, 0.95, 0.99], - }; - - metrics.histogram("test.histogram", 150, options, { endpoint: "/api/users" }); + metrics.histogram("test.histogram", 150, { endpoint: "/api/users" }); expect(receivedMessages).toHaveLength(1); expect(receivedMessages[0]).toEqual({ @@ -77,7 +72,6 @@ describe("metrics", () => { name: "test.histogram", value: 150, tags: { endpoint: "/api/users" }, - options, }); }); }); diff --git a/src/metrics.ts b/src/metrics.ts index 47ff92b..b3cd05d 100644 --- a/src/metrics.ts +++ b/src/metrics.ts @@ -3,8 +3,6 @@ import { type CountMetricPayload, type GaugeMetricPayload, type HistogramMetricPayload, - type HistogramOptions, - type DistributionMetricPayload, METRICS_CHANNEL_NAME, MetricType, type Tags, @@ -56,7 +54,6 @@ export function gauge(name: string, value: number, tags: Tags = {}): void { export function histogram( name: string, value: number, - options: HistogramOptions = {}, tags: Tags = {}, ): void { const payload: HistogramMetricPayload = { @@ -64,24 +61,6 @@ export function histogram( name, value, tags, - options, - }; - - metricsChannel.publish(payload); -} - -/** - * Record a distribution metric - * @param name - The metric name - * @param value - The distribution value - * @param tags - Optional tags - */ -export function distribution(name: string, value: number, tags: Tags = {}): void { - const payload: DistributionMetricPayload = { - type: MetricType.DISTRIBUTION, - name, - value, - tags, }; metricsChannel.publish(payload); @@ -91,5 +70,4 @@ export default { count, gauge, histogram, - distribution, } diff --git a/src/metricsDb.test.ts b/src/metricsDb.test.ts index bf4f96d..791de44 100644 --- a/src/metricsDb.test.ts +++ b/src/metricsDb.test.ts @@ -1,10 +1,8 @@ import { describe, it, expect, beforeEach } from "vitest"; import { MetricsDb } from "./metricsDb"; import { - EmittedMetricPayload, + type EmittedMetricPayload, MetricType, - type ExportedMetricPayload, - type HistogramAggregates, } from "./types"; describe("MetricsDb", () => { diff --git a/src/metricsDb.ts b/src/metricsDb.ts index b524bcd..90a1e84 100644 --- a/src/metricsDb.ts +++ b/src/metricsDb.ts @@ -1,11 +1,9 @@ -import { calculateHistogramValue, calculatePercentile } from "./utils/maths"; import { - type HistogramAggregates, type MetricPayload, type ExportedMetricPayload, MetricType, type Tags, - EmittedMetricPayload, + type EmittedMetricPayload, } from "./types"; interface BaseStoredMetric { diff --git a/src/sinks/metrics/datadog.ts b/src/sinks/metrics/datadog.ts index 6e558de..4237b21 100644 --- a/src/sinks/metrics/datadog.ts +++ b/src/sinks/metrics/datadog.ts @@ -46,9 +46,9 @@ export class DatadogMetricSink implements MetricSink { } // @ts-ignore - let site = options?.site || env.DD_SITE || "datadoghq.com"; - let distributionPointsEndpoint = options?.distributionPointsEndpoint || `https://api.${site}/${DISTRIBUTION_POINTS_ENDPOINT_PATH}`; - let metricsSeriesEndpoint = options?.metricsSeriesEndpoint || `https://api.${site}/${METRICS_SERIES_ENDPOINT_PATH}`; + const site = options?.site || env.DD_SITE || "datadoghq.com"; + const distributionPointsEndpoint = options?.distributionPointsEndpoint || `https://api.${site}/${DISTRIBUTION_POINTS_ENDPOINT_PATH}`; + const metricsSeriesEndpoint = options?.metricsSeriesEndpoint || `https://api.${site}/${METRICS_SERIES_ENDPOINT_PATH}`; this.options = { apiKey, @@ -85,31 +85,23 @@ export class DatadogMetricSink implements MetricSink { private transformMetric(payload: ExportedMetricPayload): DatadogMetric { const tags = this.formatTags(payload.tags); - let type; - let value: number | number[]; - switch (payload.type) { - case MetricType.GAUGE: - type = 'gauge'; - value = payload.value; - break; - case MetricType.DISTRIBUTION: - // In Serverless, count and histogram metrics need to be sent as distribution metrics. - // Distributions metrics are stateless by design, no local aggregation is needed. - case MetricType.HISTOGRAM: - case MetricType.COUNT: - type = 'distribution'; - value = [payload.value]; - break; - default: - throw new Error(`Unsupported metric type: ${payload}`); - } + const metricType = payload.type; + + if(metricType === MetricType.HISTOGRAM) { + return { + metric: payload.name, + type: 'distribution', + points: payload.value.map((value) => [Math.floor(value.time / 1000), [value.value]]), + tags: tags, + } + } return { metric: payload.name, - type, - points: [[Math.floor(payload.timestamp / 1000), value]], - tags, - } as DatadogMetric; + type: metricType.toLowerCase(), + points: [[Math.floor(payload.timestamp / 1000), payload.value]], + tags: tags, + }; } /** @@ -130,7 +122,7 @@ export class DatadogMetricSink implements MetricSink { ...customTags } = tags; - let formattedTags = Object.entries(customTags) + const formattedTags = Object.entries(customTags) .filter(([_, value]) => value !== undefined && value !== null) .map( ([key, value]) => `${key}:${value}`, @@ -152,7 +144,7 @@ export class DatadogMetricSink implements MetricSink { formattedTags.push(`trigger:${trigger}`); } - formattedTags.push(`region:earth`); + formattedTags.push("region:earth"); return formattedTags; } From 678df911531dac80c16b624d216bcc5dc5259b17 Mon Sep 17 00:00:00 2001 From: Ankcorn Date: Mon, 30 Jun 2025 23:18:46 +0100 Subject: [PATCH 6/7] fix mistake in sending metrics logic --- src/sinks/metrics/datadog.ts | 120 +++++++++++++++++++++-------------- 1 file changed, 73 insertions(+), 47 deletions(-) diff --git a/src/sinks/metrics/datadog.ts b/src/sinks/metrics/datadog.ts index 4237b21..9c01de7 100644 --- a/src/sinks/metrics/datadog.ts +++ b/src/sinks/metrics/datadog.ts @@ -40,15 +40,21 @@ export class DatadogMetricSink implements MetricSink { constructor(options?: DatadogMetricSinkOptions) { // @ts-ignore - let apiKey = options?.apiKey || env.DD_API_KEY || env.DATADOG_API_KEY; + const apiKey = options?.apiKey || env.DD_API_KEY || env.DATADOG_API_KEY; if (!apiKey || apiKey.length === 0) { - console.error("Datadog API key was not found. Provide it in the sink options or set the DD_API_KEY environment variable. Metrics will not be sent to Datadog."); + console.error( + "Datadog API key was not found. Provide it in the sink options or set the DD_API_KEY environment variable. Metrics will not be sent to Datadog." + ); } // @ts-ignore const site = options?.site || env.DD_SITE || "datadoghq.com"; - const distributionPointsEndpoint = options?.distributionPointsEndpoint || `https://api.${site}/${DISTRIBUTION_POINTS_ENDPOINT_PATH}`; - const metricsSeriesEndpoint = options?.metricsSeriesEndpoint || `https://api.${site}/${METRICS_SERIES_ENDPOINT_PATH}`; + const distributionPointsEndpoint = + options?.distributionPointsEndpoint || + `https://api.${site}/${DISTRIBUTION_POINTS_ENDPOINT_PATH}`; + const metricsSeriesEndpoint = + options?.metricsSeriesEndpoint || + `https://api.${site}/${METRICS_SERIES_ENDPOINT_PATH}`; this.options = { apiKey, @@ -65,17 +71,25 @@ export class DatadogMetricSink implements MetricSink { if (!payloads || payloads.length === 0) { return; } - + try { // Filter out worker metrics, since Datadog is currently getting this metrics through an integration // For now, Datadog only accepts custom metrics. - const payloadsWithoutWorkerMetrics = payloads.filter((payload) => !payload.name.startsWith('worker.')); + const payloadsWithoutWorkerMetrics = payloads.filter( + (payload) => !payload.name.startsWith("worker.") + ); - const datadogMetrics = payloadsWithoutWorkerMetrics.map((payload) => this.transformMetric(payload)); + const datadogMetrics = payloadsWithoutWorkerMetrics.map((payload) => + this.transformMetric(payload) + ); await this.sendToDatadog(datadogMetrics); } catch (error) { - throw new Error(`Failed to send metrics to Datadog: ${error instanceof Error ? error.message : String(error)}`); + throw new Error( + `Failed to send metrics to Datadog: ${ + error instanceof Error ? error.message : String(error) + }` + ); } } @@ -84,17 +98,19 @@ export class DatadogMetricSink implements MetricSink { */ private transformMetric(payload: ExportedMetricPayload): DatadogMetric { const tags = this.formatTags(payload.tags); - + const metricType = payload.type; - - if(metricType === MetricType.HISTOGRAM) { + + if (metricType === MetricType.HISTOGRAM) { return { metric: payload.name, - type: 'distribution', - points: payload.value.map((value) => [Math.floor(value.time / 1000), [value.value]]), + type: "distribution", + points: payload.value.map((value) => [ + Math.floor(value.time / 1000), + [value.value], + ]), tags: tags, - } - + }; } return { metric: payload.name, @@ -114,19 +130,12 @@ export class DatadogMetricSink implements MetricSink { * - `region:earth` */ private formatTags(tags: Tags): string[] { - const { - scriptName, - executionModel, - versionId, - trigger, - ...customTags - } = tags; - + const { scriptName, executionModel, versionId, trigger, ...customTags } = + tags; + const formattedTags = Object.entries(customTags) .filter(([_, value]) => value !== undefined && value !== null) - .map( - ([key, value]) => `${key}:${value}`, - ); + .map(([key, value]) => `${key}:${value}`); if (scriptName != null) { formattedTags.push(`worker_script:${scriptName}`); @@ -154,37 +163,55 @@ export class DatadogMetricSink implements MetricSink { */ private async sendToDatadog(metrics: DatadogMetric[]): Promise { if (!this.options.apiKey || this.options.apiKey.length === 0) { - console.warn(`Datadog API key was not found. Dropping ${metrics.length} metrics.`); + console.warn( + `Datadog API key was not found. Dropping ${metrics.length} metrics.` + ); return; } - const distributionMetrics: DatadogMetric[] = metrics.filter((metric) => metric.type === 'distribution'); - // Gauge metrics are sent as metrics series - const gaugeMetrics: DatadogMetric[] = metrics.filter((metric) => metric.type === 'gauge'); - - if (distributionMetrics.length > 0) { - try { - await this.sendDistributionMetrics(distributionMetrics); - } catch (error) { - throw new Error(`Distribution metrics failed to send:\n ${error instanceof Error ? error.message : String(error)}`); - } + const distributionMetrics: DatadogMetric[] = metrics.filter( + (metric) => metric.type === "histogram" + ); + // Other metrics are sent as metrics series + const otherMetrics: DatadogMetric[] = metrics.filter( + (metric) => metric.type !== "histogram" + ); + + const promises = [ + distributionMetrics.length > 0 && + this.sendDistributionMetrics(distributionMetrics), + otherMetrics.length > 0 && this.sendMetricsSeries(otherMetrics), + ].filter(Boolean); + + if (promises.length === 0) { + return; } - if (gaugeMetrics.length > 0) { - try { - await this.sendMetricsSeries(gaugeMetrics); - } catch (error) { - throw new Error(`Gauge metrics failed to send:\n ${error instanceof Error ? error.message : String(error)}`); - } + try { + await Promise.all(promises); + } catch (error) { + throw new Error( + `Failed to send metrics to Datadog: ${ + error instanceof Error ? error.message : String(error) + }` + ); } } private async sendMetricsSeries(metrics: DatadogMetric[]): Promise { - this.postRequest(this.options.metricsSeriesEndpoint, JSON.stringify({ series: metrics })); + this.postRequest( + this.options.metricsSeriesEndpoint, + JSON.stringify({ series: metrics }) + ); } - private async sendDistributionMetrics(metrics: DatadogMetric[]): Promise { - await this.postRequest(this.options.distributionPointsEndpoint, JSON.stringify({ series: metrics })); + private async sendDistributionMetrics( + metrics: DatadogMetric[] + ): Promise { + await this.postRequest( + this.options.distributionPointsEndpoint, + JSON.stringify({ series: metrics }) + ); } private async postRequest(endpoint: string, body: string): Promise { @@ -213,4 +240,3 @@ interface DatadogMetric { points: DatadogPoint[]; tags: string[]; } - From 4ad3604d4d95e8d356fbc3401e202dc881cec244 Mon Sep 17 00:00:00 2001 From: Ankcorn Date: Mon, 30 Jun 2025 23:29:48 +0100 Subject: [PATCH 7/7] use claude to prototype an exponential histogram implementation --- src/sinks/metrics/otel-metrics-types.ts | 30 ++++++++ src/sinks/metrics/otel.ts | 95 +++++++++++++++++++++++-- 2 files changed, 119 insertions(+), 6 deletions(-) diff --git a/src/sinks/metrics/otel-metrics-types.ts b/src/sinks/metrics/otel-metrics-types.ts index d487044..db00cad 100644 --- a/src/sinks/metrics/otel-metrics-types.ts +++ b/src/sinks/metrics/otel-metrics-types.ts @@ -32,12 +32,36 @@ export interface Sum { isMonotonic: boolean; } +export interface ExponentialHistogramDataPoint { + attributes: KeyValue[]; + timeUnixNano: string; + startTimeUnixNano?: string; + count: string; + sum?: number; + scale: number; + zeroCount: string; + positive?: { + offset: number; + bucketCounts: string[]; + }; + negative?: { + offset: number; + bucketCounts: string[]; + }; +} + +export interface ExponentialHistogram { + dataPoints: ExponentialHistogramDataPoint[]; + aggregationTemporality: AggregationTemporality; +} + export interface Metric { name: string; description?: string; unit?: string; gauge?: Gauge; sum?: Sum; + exponentialHistogram?: ExponentialHistogram; } export interface InstrumentationScope { @@ -73,3 +97,9 @@ export function isGaugeMetric( export function isSumMetric(metric: Metric): metric is Metric & { sum: Sum } { return metric.sum !== undefined; } + +export function isExponentialHistogramMetric( + metric: Metric, +): metric is Metric & { exponentialHistogram: ExponentialHistogram } { + return metric.exponentialHistogram !== undefined; +} diff --git a/src/sinks/metrics/otel.ts b/src/sinks/metrics/otel.ts index 89b5e9b..7b9cb97 100644 --- a/src/sinks/metrics/otel.ts +++ b/src/sinks/metrics/otel.ts @@ -44,13 +44,15 @@ export class OtelMetricSink implements MetricSink { await this.exportMetrics(otlpPayload); } catch (error) { throw new Error( - `Failed to send metrics to OTEL collector: ${error instanceof Error ? error.message : String(error)}`, + `Failed to send metrics to OTEL collector: ${ + error instanceof Error ? error.message : String(error) + }` ); } } private buildOTLPPayload( - metrics: ExportedMetricPayload[], + metrics: ExportedMetricPayload[] ): OTLPMetricsPayload { const otlpMetrics: (ResourceMetrics | undefined)[] = metrics.map( (payload) => { @@ -93,7 +95,7 @@ export class OtelMetricSink implements MetricSink { scopeMetrics: [scopeMetrics], }; return resourceMetrics; - }, + } ); return { @@ -117,13 +119,13 @@ export class OtelMetricSink implements MetricSink { if (!response.ok) { const errorText = await response.text(); throw new Error( - `HTTP ${response.status} ${response.statusText}: ${errorText}`, + `HTTP ${response.status} ${response.statusText}: ${errorText}` ); } } private convertTagsToAttributes( - tags?: Record, + tags?: Record ): KeyValue[] { return Object.entries(tags || {}) .filter(([_, value]) => value !== undefined && value !== null) @@ -137,8 +139,23 @@ export class OtelMetricSink implements MetricSink { private payloadToMetric( payload: ExportedMetricPayload, - attributes: KeyValue[], + attributes: KeyValue[] ) { + if (payload.type === MetricType.HISTOGRAM) { + return { + name: payload.name, + exponentialHistogram: { + dataPoints: [ + this.buildExponentialHistogramDataPoint( + payload.value, + attributes + ), + ], + aggregationTemporality: + AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA, + }, + }; + } const timeUnixNano = this.timestampToNanos(payload.timestamp); if (payload.type === MetricType.COUNT) { return { @@ -173,6 +190,72 @@ export class OtelMetricSink implements MetricSink { } } + private buildExponentialHistogramDataPoint( + values: { time: number; value: number }[], + attributes: KeyValue[] + ) { + // Sort values to calculate buckets + const sortedValues = values.map((v) => v.value).sort((a, b) => a - b); + const count = String(sortedValues.length); + const sum = sortedValues.reduce((acc, val) => acc + val, 0); + + // Use scale 0 for simplicity (base-2 exponential buckets) + const scale = 0; + const zeroCount = String(sortedValues.filter((v) => v === 0).length); + + // Build positive buckets for exponential histogram + const positive = this.buildExponentialBuckets( + sortedValues.filter((v) => v > 0) + ); + const negative = this.buildExponentialBuckets( + sortedValues.filter((v) => v < 0).map((v) => Math.abs(v)) + ); + + return { + attributes, + count, + startTimeUnixNano: this.timestampToNanos( + values.length > 0 ? values[0].time : Date.now() + ), + timeUnixNano: this.timestampToNanos( + values.length > 0 ? values[values.length - 1].time : Date.now() + ), + sum, + scale, + zeroCount, + ...(positive.bucketCounts.length > 0 && { positive }), + ...(negative.bucketCounts.length > 0 && { negative }), + }; + } + + private buildExponentialBuckets(values: number[]) { + if (values.length === 0) { + return { offset: 0, bucketCounts: [] }; + } + + // For scale 0, bucket boundaries are powers of 2: [1, 2), [2, 4), [4, 8), etc. + const buckets = new Map(); + + for (const value of values) { + // Calculate bucket index for scale 0: floor(log2(value)) + const bucketIndex = value <= 0 ? 0 : Math.floor(Math.log2(value)); + buckets.set(bucketIndex, (buckets.get(bucketIndex) || 0) + 1); + } + + const minBucket = Math.min(...buckets.keys()); + const maxBucket = Math.max(...buckets.keys()); + const bucketCounts: string[] = []; + + for (let i = minBucket; i <= maxBucket; i++) { + bucketCounts.push(String(buckets.get(i) || 0)); + } + + return { + offset: minBucket, + bucketCounts, + }; + } + private timestampToNanos(timestampMs: number): string { // Convert milliseconds to nanoseconds using BigInt to avoid precision loss return String(BigInt(Math.round(timestampMs)) * BigInt(1000000));