From 36c5bfa3895c84f37f1b2a4e01de5ed4f0050fd0 Mon Sep 17 00:00:00 2001 From: lovasoa Date: Sat, 14 Mar 2026 17:24:32 +0100 Subject: [PATCH 01/27] feat(telemetry): initialize OpenTelemetry metrics provider --- Cargo.lock | 7 +++++++ Cargo.toml | 5 +++-- src/telemetry.rs | 21 ++++++++++++++++++++- 3 files changed, 30 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 905a003c..113f68a4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3376,6 +3376,12 @@ dependencies = [ "tonic-prost", ] +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e62e29dfe041afb8ed2a6c9737ab57db4907285d999ef8ad3a59092a36bdc846" + [[package]] name = "opentelemetry_sdk" version = "0.31.0" @@ -4612,6 +4618,7 @@ dependencies = [ "openidconnect", "opentelemetry", "opentelemetry-otlp", + "opentelemetry-semantic-conventions", "opentelemetry_sdk", "password-hash", "percent-encoding", diff --git a/Cargo.toml b/Cargo.toml index 83d26619..059d0c28 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -88,8 +88,9 @@ tracing-opentelemetry = "0.32" tracing-actix-web = { version = "0.7", default-features = false, features = ["opentelemetry_0_31"] } tracing-log = "0.2" opentelemetry = "0.31" -opentelemetry_sdk = { version = "0.31", features = ["rt-tokio-current-thread"] } -opentelemetry-otlp = { version = "0.31", features = ["http-proto", "grpc-tonic"] } +opentelemetry_sdk = { version = "0.31", features = ["metrics", "rt-tokio-current-thread"] } +opentelemetry-otlp = { version = "0.31", features = ["http-proto", "grpc-tonic", "metrics"] } +opentelemetry-semantic-conventions = "0.31" [features] diff --git a/src/telemetry.rs b/src/telemetry.rs index 2c11921d..ad12b851 100644 --- a/src/telemetry.rs +++ b/src/telemetry.rs @@ -10,8 +10,10 @@ use std::env; use std::sync::OnceLock; use opentelemetry_sdk::trace::SdkTracerProvider; +use opentelemetry_sdk::metrics::SdkMeterProvider; static TRACER_PROVIDER: OnceLock = OnceLock::new(); +static METER_PROVIDER: OnceLock = OnceLock::new(); /// Initializes logging / tracing. Returns `true` if `OTel` was activated. #[must_use] @@ -35,6 +37,11 @@ pub fn shutdown_telemetry() { eprintln!("Error shutting down tracer provider: {e}"); } } + if let Some(provider) = METER_PROVIDER.get() { + if let Err(e) = provider.shutdown() { + eprintln!("Error shutting down meter provider: {e}"); + } + } } /// Tracing subscriber without `OTel` export — logfmt output only. @@ -71,6 +78,17 @@ fn init_otel_tracing() { global::set_tracer_provider(provider.clone()); let _ = TRACER_PROVIDER.set(provider); + // OTLP Metric exporter + let metric_exporter = opentelemetry_otlp::MetricExporter::builder() + .with_http() + .build() + .expect("Failed to build OTLP metric exporter"); + + let reader = opentelemetry_sdk::metrics::PeriodicReader::builder(metric_exporter).build(); + let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); + global::set_meter_provider(meter_provider.clone()); + let _ = METER_PROVIDER.set(meter_provider.clone()); + let otel_layer = tracing_opentelemetry::layer() .with_tracer(tracer) .with_location(false); @@ -78,7 +96,8 @@ fn init_otel_tracing() { let subscriber = tracing_subscriber::registry() .with(default_env_filter()) .with(logfmt::LogfmtLayer::new()) - .with(otel_layer); + .with(otel_layer) + .with(tracing_opentelemetry::MetricsLayer::new(meter_provider)); set_global_subscriber(subscriber); } From 9d73989d760f634932f56a0b4aef9e6d6d4d803b Mon Sep 17 00:00:00 2001 From: lovasoa Date: Sat, 14 Mar 2026 17:24:35 +0100 Subject: [PATCH 02/27] feat(metrics): implement and integrate HTTP request duration metrics --- src/webserver/http.rs | 1 + src/webserver/http_metrics.rs | 92 +++++++++++++++++++++++++++++++++++ src/webserver/mod.rs | 1 + 3 files changed, 94 insertions(+) create mode 100644 src/webserver/http_metrics.rs diff --git a/src/webserver/http.rs b/src/webserver/http.rs index 9098b550..b0ea9a8c 100644 --- a/src/webserver/http.rs +++ b/src/webserver/http.rs @@ -553,6 +553,7 @@ pub fn create_app( // when receiving a request outside of the prefix, redirect to the prefix .default_service(fn_service(default_prefix_redirect)) .wrap(OidcMiddleware::new(&app_state)) + .wrap(super::http_metrics::HttpMetrics) .wrap(TracingLogger::::new()) .wrap(default_headers()) .wrap(middleware::Condition::new( diff --git a/src/webserver/http_metrics.rs b/src/webserver/http_metrics.rs new file mode 100644 index 00000000..fe09173d --- /dev/null +++ b/src/webserver/http_metrics.rs @@ -0,0 +1,92 @@ +use std::future::{ready, Ready}; +use std::time::Instant; + +use actix_web::{ + dev::{forward_ready, Service, ServiceRequest, ServiceResponse, Transform}, + Error, +}; +use futures_util::future::LocalBoxFuture; +use opentelemetry::{global, KeyValue}; +use opentelemetry::metrics::Histogram; +use tracing_actix_web::root_span_macro::private::{http_method_str, http_scheme}; + +pub struct HttpMetrics; + +impl Transform for HttpMetrics +where + S: Service, Error = Error> + 'static, + S::Future: 'static, +{ + type Response = ServiceResponse; + type Error = Error; + type Transform = HttpMetricsMiddleware; + type InitError = (); + type Future = Ready>; + + fn new_transform(&self, service: S) -> Self::Future { + let histogram = global::meter("sqlpage") + .f64_histogram("http.server.request.duration") + .with_unit("s") + .with_description("Duration of HTTP requests processed by the server.") + .build(); + + ready(Ok(HttpMetricsMiddleware { + service, + histogram, + })) + } +} + +pub struct HttpMetricsMiddleware { + service: S, + histogram: Histogram, +} + +impl Service for HttpMetricsMiddleware +where + S: Service, Error = Error> + 'static, + S::Future: 'static, +{ + type Response = ServiceResponse; + type Error = Error; + type Future = LocalBoxFuture<'static, Result>; + + forward_ready!(service); + + fn call(&self, req: ServiceRequest) -> Self::Future { + let start_time = Instant::now(); + let method = http_method_str(req.method()).to_string(); + let connection_info = req.connection_info(); + let scheme = http_scheme(connection_info.scheme()).to_string(); + let host = connection_info.host().to_string(); + drop(connection_info); + + // We get the route pattern. In Actix, req.match_pattern() returns the matched route + let route = req.match_pattern().unwrap_or_else(|| req.path().to_string()); + + let histogram = self.histogram.clone(); + let fut = self.service.call(req); + + Box::pin(async move { + let res = fut.await?; + let duration = start_time.elapsed().as_secs_f64(); + let status = res.status().as_u16(); + + let mut attributes = vec![ + KeyValue::new("http.request.method", method), + KeyValue::new("http.response.status_code", status.to_string()), + KeyValue::new("http.route", route), + KeyValue::new("url.scheme", scheme), + KeyValue::new("server.address", host), + ]; + + if status >= 500 { + attributes.push(KeyValue::new("error.type", status.to_string())); + } + + histogram.record(duration, &attributes); + + Ok(res) + }) + } +} diff --git a/src/webserver/mod.rs b/src/webserver/mod.rs index c640970c..1f834974 100644 --- a/src/webserver/mod.rs +++ b/src/webserver/mod.rs @@ -34,6 +34,7 @@ pub mod database; mod error; pub mod error_with_status; pub mod http; +pub mod http_metrics; pub mod http_client; pub mod http_request_info; mod https; From 594392d20411b2222c356d07d647ecfd0b4f54f0 Mon Sep 17 00:00:00 2001 From: lovasoa Date: Sat, 14 Mar 2026 17:24:38 +0100 Subject: [PATCH 03/27] feat(metrics): instrument database query durations --- src/webserver/database/execute_queries.rs | 25 ++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/src/webserver/database/execute_queries.rs b/src/webserver/database/execute_queries.rs index fed185f5..6082fd64 100644 --- a/src/webserver/database/execute_queries.rs +++ b/src/webserver/database/execute_queries.rs @@ -93,10 +93,13 @@ pub fn stream_query_results_with_conn<'a>( request.server_timing.record("bind_params"); let connection = take_connection(&request.app_state.db, db_connection, request).await?; log::trace!("Executing query {:?}", query.sql); + let operation_name = query.sql.split_whitespace().next().unwrap_or("").to_uppercase(); + let db_system_name = request.app_state.db.info.database_type.otel_name(); let query_span = tracing::info_span!( "db.query", db.query.text = query.sql, - db.system.name = request.app_state.db.info.database_type.otel_name(), + db.system.name = db_system_name, + db.operation.name = operation_name, code.file.path = %source_file.display(), code.line.number = source_line_number(stmt.query_position.start.line), otel.status_code = tracing::field::Empty, @@ -105,6 +108,8 @@ pub fn stream_query_results_with_conn<'a>( db.response.returned_rows = tracing::field::Empty, ); record_query_params(&query_span, &query.param_values); + + let start_time = std::time::Instant::now(); let mut stream = connection.fetch_many(query); let mut error = None; let mut returned_rows: i64 = 0; @@ -130,11 +135,29 @@ pub fn stream_query_results_with_conn<'a>( } } drop(stream); + + let duration = start_time.elapsed().as_secs_f64(); + let histogram = opentelemetry::global::meter("sqlpage") + .f64_histogram("db.client.operation.duration") + .with_unit("s") + .with_description("Duration of executing SQL queries.") + .build(); + + let mut attributes = vec![ + opentelemetry::KeyValue::new("db.system.name", db_system_name), + opentelemetry::KeyValue::new("db.operation.name", operation_name), + ]; + if let Some(error) = error { + attributes.push(opentelemetry::KeyValue::new("otel.status_code", "ERROR")); + attributes.push(opentelemetry::KeyValue::new("error.type", error.to_string())); + histogram.record(duration, &attributes); record_db_query_error(&query_span, returned_rows, &error); try_rollback_transaction(connection).await; yield DbItem::Error(error); } else { + attributes.push(opentelemetry::KeyValue::new("otel.status_code", "OK")); + histogram.record(duration, &attributes); record_db_query_success(&query_span, returned_rows); } }, From 03f1b7b71d84b885872be020e86edd73d0b7684d Mon Sep 17 00:00:00 2001 From: lovasoa Date: Sat, 14 Mar 2026 17:26:05 +0100 Subject: [PATCH 04/27] refactor(database): add OTel name mapping helper --- src/webserver/database/mod.rs | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/src/webserver/database/mod.rs b/src/webserver/database/mod.rs index 950096db..2849c37a 100644 --- a/src/webserver/database/mod.rs +++ b/src/webserver/database/mod.rs @@ -63,7 +63,12 @@ impl SupportedDatabase { /// See #[must_use] pub fn otel_name(self) -> &'static str { - match self { + Self::otel_name_from_kind(self) + } + + #[must_use] + pub fn otel_name_from_kind(kind: impl Into) -> &'static str { + match kind.into() { Self::Sqlite => "sqlite", Self::Duckdb => "duckdb", Self::Oracle => "oracle.db", @@ -76,6 +81,18 @@ impl SupportedDatabase { } } +impl From for SupportedDatabase { + fn from(kind: AnyKind) -> Self { + match kind { + AnyKind::Postgres => Self::Postgres, + AnyKind::MySql => Self::MySql, + AnyKind::Sqlite => Self::Sqlite, + AnyKind::Mssql => Self::Mssql, + AnyKind::Odbc => Self::Generic, + } + } +} + pub struct Database { pub connection: sqlx::AnyPool, pub info: DbInfo, From 1692b5c6ee023c153e32ca86d53c71b9873c25b9 Mon Sep 17 00:00:00 2001 From: lovasoa Date: Sat, 14 Mar 2026 17:27:02 +0100 Subject: [PATCH 05/27] feat(metrics): instrument database connection pool --- src/webserver/database/connect.rs | 113 +++++++++++++------------ src/webserver/database/mod.rs | 1 + src/webserver/database/pool_metrics.rs | 47 ++++++++++ 3 files changed, 108 insertions(+), 53 deletions(-) create mode 100644 src/webserver/database/pool_metrics.rs diff --git a/src/webserver/database/connect.rs b/src/webserver/database/connect.rs index 573df2fa..2ef506c3 100644 --- a/src/webserver/database/connect.rs +++ b/src/webserver/database/connect.rs @@ -70,6 +70,7 @@ impl Database { } fn create_pool_options(config: &AppConfig, kind: AnyKind) -> PoolOptions { + let db_system_name = SupportedDatabase::otel_name_from_kind(kind); let mut pool_options = PoolOptions::new() .max_connections(if let Some(max) = config.max_database_pool_connections { max @@ -93,40 +94,49 @@ impl Database { .acquire_timeout(Duration::from_secs_f64( config.database_connection_acquire_timeout_seconds, )); - pool_options = add_on_return_to_pool(config, pool_options); - pool_options = add_on_connection_handler(config, pool_options); + pool_options = add_on_return_to_pool(config, pool_options, db_system_name); + pool_options = add_on_connection_handler(config, pool_options, db_system_name); + pool_options = pool_options.before_acquire(move |_, _| { + super::pool_metrics::on_acquire(db_system_name); + Box::pin(async move { Ok(true) }) + }); pool_options } } -fn add_on_return_to_pool(config: &AppConfig, pool_options: PoolOptions) -> PoolOptions { +fn add_on_return_to_pool( + config: &AppConfig, + pool_options: PoolOptions, + db_system_name: &'static str, +) -> PoolOptions { let on_disconnect_file = config.configuration_directory.join(ON_RESET_FILE); - if !on_disconnect_file.exists() { - log::debug!( - "Not creating a custom SQL connection cleanup handler because {} does not exist", - on_disconnect_file.display() - ); - return pool_options; - } - log::info!( - "Creating a custom SQL connection cleanup handler from {}", - on_disconnect_file.display() - ); - let sql = match std::fs::read_to_string(&on_disconnect_file) { - Ok(sql) => std::sync::Arc::new(sql), - Err(e) => { - log::error!( - "Unable to read the file {}: {}", - on_disconnect_file.display(), - e - ); - return pool_options; + let sql = if on_disconnect_file.exists() { + match std::fs::read_to_string(&on_disconnect_file) { + Ok(sql) => Some(std::sync::Arc::new(sql)), + Err(e) => { + log::error!( + "Unable to read the file {}: {}", + on_disconnect_file.display(), + e + ); + None + } } + } else { + None }; - log::trace!("The custom SQL connection cleanup handler is:\n{sql}"); - let sql = sql.clone(); - pool_options - .after_release(move |conn, meta| on_return_to_pool(conn, meta, std::sync::Arc::clone(&sql))) + + pool_options.after_release(move |conn, meta| { + let sql = sql.clone(); + super::pool_metrics::on_release(db_system_name); + Box::pin(async move { + if let Some(sql) = sql { + on_return_to_pool(conn, meta, sql).await + } else { + Ok(true) + } + }) + }) } fn on_return_to_pool( @@ -152,37 +162,34 @@ fn on_return_to_pool( fn add_on_connection_handler( config: &AppConfig, pool_options: PoolOptions, + db_system_name: &'static str, ) -> PoolOptions { let on_connect_file = config.configuration_directory.join(ON_CONNECT_FILE); - if !on_connect_file.exists() { - log::debug!( - "Not creating a custom SQL database connection handler because {} does not exist", - on_connect_file.display() - ); - return pool_options; - } - log::info!( - "Creating a custom SQL database connection handler from {}", - on_connect_file.display() - ); - let sql = match std::fs::read_to_string(&on_connect_file) { - Ok(sql) => std::sync::Arc::new(sql), - Err(e) => { - log::error!( - "Unable to read the file {}: {}", - on_connect_file.display(), - e - ); - return pool_options; + let sql = if on_connect_file.exists() { + match std::fs::read_to_string(&on_connect_file) { + Ok(sql) => Some(std::sync::Arc::new(sql)), + Err(e) => { + log::error!( + "Unable to read the file {}: {}", + on_connect_file.display(), + e + ); + None + } } + } else { + None }; - log::trace!("The custom SQL database connection handler is:\n{sql}"); - pool_options.after_connect(move |conn, _metadata| { - log::debug!("Running {} on new connection", on_connect_file.display()); - let sql = std::sync::Arc::clone(&sql); + + pool_options.after_connect(move |conn, _| { + let sql = sql.clone(); + super::pool_metrics::on_connect(db_system_name); Box::pin(async move { - let r = conn.execute(sql.as_str()).await?; - log::debug!("Finished running connection handler on new connection: {r:?}"); + if let Some(sql) = sql { + log::debug!("Running connection handler on new connection"); + let r = conn.execute(sql.as_str()).await?; + log::debug!("Finished running connection handler on new connection: {r:?}"); + } Ok(()) }) }) diff --git a/src/webserver/database/mod.rs b/src/webserver/database/mod.rs index 2849c37a..26db4650 100644 --- a/src/webserver/database/mod.rs +++ b/src/webserver/database/mod.rs @@ -5,6 +5,7 @@ pub mod execute_queries; pub mod migrations; mod sql; mod sqlpage_functions; +mod pool_metrics; mod syntax_tree; mod error_highlighting; diff --git a/src/webserver/database/pool_metrics.rs b/src/webserver/database/pool_metrics.rs new file mode 100644 index 00000000..77e122bf --- /dev/null +++ b/src/webserver/database/pool_metrics.rs @@ -0,0 +1,47 @@ +use opentelemetry::{global, KeyValue}; +use opentelemetry::metrics::UpDownCounter; + +fn get_counter() -> UpDownCounter { + global::meter("sqlpage") + .i64_up_down_counter("db.client.connection.count") + .with_unit("{connection}") + .with_description("Number of connections in the database pool.") + .build() +} + +pub fn on_acquire(system_name: &'static str) { + let counter = get_counter(); + counter.add(1, &[ + KeyValue::new("db.system.name", system_name), + KeyValue::new("db.client.connection.pool.name", "sqlpage"), + KeyValue::new("db.client.connection.state", "used"), + ]); + counter.add(-1, &[ + KeyValue::new("db.system.name", system_name), + KeyValue::new("db.client.connection.pool.name", "sqlpage"), + KeyValue::new("db.client.connection.state", "idle"), + ]); +} + +pub fn on_release(system_name: &'static str) { + let counter = get_counter(); + counter.add(-1, &[ + KeyValue::new("db.system.name", system_name), + KeyValue::new("db.client.connection.pool.name", "sqlpage"), + KeyValue::new("db.client.connection.state", "used"), + ]); + counter.add(1, &[ + KeyValue::new("db.system.name", system_name), + KeyValue::new("db.client.connection.pool.name", "sqlpage"), + KeyValue::new("db.client.connection.state", "idle"), + ]); +} + +pub fn on_connect(system_name: &'static str) { + let counter = get_counter(); + counter.add(1, &[ + KeyValue::new("db.system.name", system_name), + KeyValue::new("db.client.connection.pool.name", "sqlpage"), + KeyValue::new("db.client.connection.state", "idle"), + ]); +} From feb0a7715ccb2ac4f361714a084cd5cd8a0dda19 Mon Sep 17 00:00:00 2001 From: lovasoa Date: Sat, 14 Mar 2026 17:27:08 +0100 Subject: [PATCH 06/27] feat(telemetry): update OTel collector to receive OTLP metrics --- examples/telemetry/otel-collector.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/telemetry/otel-collector.yaml b/examples/telemetry/otel-collector.yaml index 032d2919..e1deba27 100644 --- a/examples/telemetry/otel-collector.yaml +++ b/examples/telemetry/otel-collector.yaml @@ -122,7 +122,7 @@ service: processors: [batch] exporters: [otlp_grpc/tempo] metrics: - receivers: [postgresql] + receivers: [otlp, postgresql] processors: [batch] exporters: [prometheus] logs/sqlpage: From dc98ea3d16c50c311b9a68c27bd16701976f1f93 Mon Sep 17 00:00:00 2001 From: lovasoa Date: Sat, 14 Mar 2026 17:27:27 +0100 Subject: [PATCH 07/27] feat(telemetry): add metrics panels to Grafana dashboard --- examples/telemetry/grafana/sqlpage-home.json | 103 +++++++++++++------ 1 file changed, 69 insertions(+), 34 deletions(-) diff --git a/examples/telemetry/grafana/sqlpage-home.json b/examples/telemetry/grafana/sqlpage-home.json index 6c19ad36..60c0900a 100644 --- a/examples/telemetry/grafana/sqlpage-home.json +++ b/examples/telemetry/grafana/sqlpage-home.json @@ -39,7 +39,7 @@ "showLineNumbers": false, "showMiniMap": false }, - "content": "

Recent SQLPage traces, logs, and PostgreSQL metrics

Open http://localhost and interact with the app. New requests will appear here automatically.

The trace table shows recent requests. Click any trace ID to open the full span waterfall in Grafana. PostgreSQL slow-query explain plans appear in the PostgreSQL Logs panel and link back to the same trace via the extracted trace ID. The metrics panels come from the OpenTelemetry PostgreSQL receiver via Prometheus.

", + "content": "

SQLPage Observability

Open http://localhost and interact with the app. New requests will appear here automatically.

This dashboard shows traces, logs, and application metrics exported by SQLPage. Trace waterfalls link to PostgreSQL logs via trace IDs. Metrics include HTTP durations, DB query latencies, and connection pool states.

", "mode": "html" }, "pluginVersion": "12.4.0", @@ -54,7 +54,39 @@ "fieldConfig": { "defaults": { "color": { - "mode": "thresholds" + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 2, + "pointSize": 4, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } }, "mappings": [], "thresholds": { @@ -63,38 +95,31 @@ { "color": "green", "value": null - }, - { - "color": "orange", - "value": 10 } ] }, - "unit": "none" + "unit": "s" }, "overrides": [] }, "gridPos": { - "h": 4, + "h": 8, "w": 12, "x": 0, "y": 4 }, - "id": 4, + "id": 10, "options": { - "colorMode": "value", - "graphMode": "none", - "justifyMode": "auto", - "orientation": "auto", - "percentChangeColorMode": "standard", - "reduceOptions": { - "calcs": ["lastNotNull"], - "fields": "", - "values": false + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true }, - "showPercentChange": false, - "textMode": "auto", - "wideLayout": true + "tooltip": { + "mode": "single", + "sort": "none" + } }, "pluginVersion": "12.4.0", "targets": [ @@ -103,12 +128,22 @@ "type": "prometheus", "uid": "prometheus" }, - "expr": "sum(postgresql_backends)", + "expr": "histogram_quantile(0.95, sum(rate(http_server_request_duration_seconds_bucket[5m])) by (le, http_route))", + "legendFormat": "HTTP P95 {{http_route}}", "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "expr": "histogram_quantile(0.95, sum(rate(db_client_operation_duration_seconds_bucket[5m])) by (le, db_operation_name))", + "legendFormat": "DB P95 {{db_operation_name}}", + "refId": "B" } ], - "title": "PostgreSQL Backends", - "type": "stat" + "title": "Request & Query Latency (P95)", + "type": "timeseries" }, { "datasource": { @@ -162,17 +197,17 @@ } ] }, - "unit": "bytes" + "unit": "none" }, "overrides": [] }, "gridPos": { - "h": 4, + "h": 8, "w": 12, "x": 12, "y": 4 }, - "id": 5, + "id": 11, "options": { "legend": { "calcs": [], @@ -192,12 +227,12 @@ "type": "prometheus", "uid": "prometheus" }, - "expr": "sum(postgresql_db_size) by (postgresql_database_name)", - "legendFormat": "{{postgresql_database_name}}", + "expr": "sum(db_client_connection_count) by (db_client_connection_state)", + "legendFormat": "{{db_client_connection_state}}", "refId": "A" } ], - "title": "PostgreSQL Database Size", + "title": "SQLPage DB Connection Pool", "type": "timeseries" }, { @@ -279,10 +314,10 @@ ] }, "gridPos": { - "h": 12, + "h": 8, "w": 24, "x": 0, - "y": 8 + "y": 12 }, "id": 2, "options": { @@ -428,7 +463,7 @@ "refresh": "5s", "schemaVersion": 41, "style": "dark", - "tags": ["sqlpage", "tracing", "logs"], + "tags": ["sqlpage", "tracing", "logs", "metrics"], "templating": { "list": [] }, @@ -440,5 +475,5 @@ "timezone": "browser", "title": "SQLPage Observability Home", "uid": "sqlpage-tracing-home", - "version": 5 + "version": 6 } From e5117f4e357edb0e11c9d0d8aa65e3926ea5afec Mon Sep 17 00:00:00 2001 From: lovasoa Date: Sat, 14 Mar 2026 17:31:33 +0100 Subject: [PATCH 08/27] refactor(metrics): clean up database instrumentation with helper functions --- src/webserver/database/execute_queries.rs | 109 ++++++++++++---------- 1 file changed, 59 insertions(+), 50 deletions(-) diff --git a/src/webserver/database/execute_queries.rs b/src/webserver/database/execute_queries.rs index 6082fd64..d58ac66a 100644 --- a/src/webserver/database/execute_queries.rs +++ b/src/webserver/database/execute_queries.rs @@ -46,16 +46,58 @@ fn source_line_number(line: usize) -> i64 { i64::try_from(line).unwrap_or(i64::MAX) } -fn record_db_query_success(span: &tracing::Span, returned_rows: i64) { +fn record_db_query_success(span: &tracing::Span, returned_rows: i64, start_time: std::time::Instant, db_system_name: &'static str, operation_name: String) { span.record("db.response.returned_rows", returned_rows); span.record("otel.status_code", "OK"); + let duration = start_time.elapsed().as_secs_f64(); + let histogram = opentelemetry::global::meter("sqlpage") + .f64_histogram("db.client.operation.duration") + .with_unit("s") + .with_description("Duration of executing SQL queries.") + .build(); + let attributes = [ + opentelemetry::KeyValue::new("db.system.name", db_system_name), + opentelemetry::KeyValue::new("db.operation.name", operation_name), + opentelemetry::KeyValue::new("otel.status_code", "OK"), + ]; + histogram.record(duration, &attributes); } -fn record_db_query_error(span: &tracing::Span, returned_rows: i64, error: &anyhow::Error) { +fn record_db_query_error(span: &tracing::Span, returned_rows: i64, error: &anyhow::Error, start_time: std::time::Instant, db_system_name: &'static str, operation_name: String) { span.record("db.response.returned_rows", returned_rows); span.record("otel.status_code", "ERROR"); span.record("exception.message", tracing::field::display(error)); span.record("exception.details", tracing::field::debug(error)); + let duration = start_time.elapsed().as_secs_f64(); + let histogram = opentelemetry::global::meter("sqlpage") + .f64_histogram("db.client.operation.duration") + .with_unit("s") + .with_description("Duration of executing SQL queries.") + .build(); + let attributes = [ + opentelemetry::KeyValue::new("db.system.name", db_system_name), + opentelemetry::KeyValue::new("db.operation.name", operation_name), + opentelemetry::KeyValue::new("otel.status_code", "ERROR"), + opentelemetry::KeyValue::new("error.type", error.to_string()), + ]; + histogram.record(duration, &attributes); +} + +fn create_db_query_span(sql: &str, source_file: &Path, line: usize, db_system_name: &'static str) -> (tracing::Span, String) { + let operation_name = sql.split_whitespace().next().unwrap_or("").to_uppercase(); + let span = tracing::info_span!( + "db.query", + db.query.text = sql, + db.system.name = db_system_name, + db.operation.name = operation_name, + code.file.path = %source_file.display(), + code.line.number = source_line_number(line), + otel.status_code = tracing::field::Empty, + exception.message = tracing::field::Empty, + exception.details = tracing::field::Empty, + db.response.returned_rows = tracing::field::Empty, + ); + (span, operation_name) } impl Database { @@ -93,20 +135,8 @@ pub fn stream_query_results_with_conn<'a>( request.server_timing.record("bind_params"); let connection = take_connection(&request.app_state.db, db_connection, request).await?; log::trace!("Executing query {:?}", query.sql); - let operation_name = query.sql.split_whitespace().next().unwrap_or("").to_uppercase(); let db_system_name = request.app_state.db.info.database_type.otel_name(); - let query_span = tracing::info_span!( - "db.query", - db.query.text = query.sql, - db.system.name = db_system_name, - db.operation.name = operation_name, - code.file.path = %source_file.display(), - code.line.number = source_line_number(stmt.query_position.start.line), - otel.status_code = tracing::field::Empty, - exception.message = tracing::field::Empty, - exception.details = tracing::field::Empty, - db.response.returned_rows = tracing::field::Empty, - ); + let (query_span, operation_name) = create_db_query_span(&query.sql, source_file, stmt.query_position.start.line, db_system_name); record_query_params(&query_span, &query.param_values); let start_time = std::time::Instant::now(); @@ -135,30 +165,12 @@ pub fn stream_query_results_with_conn<'a>( } } drop(stream); - - let duration = start_time.elapsed().as_secs_f64(); - let histogram = opentelemetry::global::meter("sqlpage") - .f64_histogram("db.client.operation.duration") - .with_unit("s") - .with_description("Duration of executing SQL queries.") - .build(); - - let mut attributes = vec![ - opentelemetry::KeyValue::new("db.system.name", db_system_name), - opentelemetry::KeyValue::new("db.operation.name", operation_name), - ]; - if let Some(error) = error { - attributes.push(opentelemetry::KeyValue::new("otel.status_code", "ERROR")); - attributes.push(opentelemetry::KeyValue::new("error.type", error.to_string())); - histogram.record(duration, &attributes); - record_db_query_error(&query_span, returned_rows, &error); + record_db_query_error(&query_span, returned_rows, &error, start_time, db_system_name, operation_name); try_rollback_transaction(connection).await; yield DbItem::Error(error); } else { - attributes.push(opentelemetry::KeyValue::new("otel.status_code", "OK")); - histogram.record(duration, &attributes); - record_db_query_success(&query_span, returned_rows); + record_db_query_success(&query_span, returned_rows, start_time, db_system_name, operation_name); } }, ParsedStatement::SetVariable { variable, value} => { @@ -294,35 +306,32 @@ async fn execute_set_variable_query<'a>( query.sql ); - let query_span = tracing::info_span!( - "db.query", - db.query.text = query.sql, - db.system.name = request.app_state.db.info.database_type.otel_name(), - code.file.path = %source_file.display(), - code.line.number = source_line_number(statement.query_position.start.line), - otel.status_code = tracing::field::Empty, - exception.message = tracing::field::Empty, - exception.details = tracing::field::Empty, - db.response.returned_rows = tracing::field::Empty, + let db_system_name = request.app_state.db.info.database_type.otel_name(); + let (query_span, operation_name) = create_db_query_span( + &query.sql, + source_file, + statement.query_position.start.line, + db_system_name, ); record_query_params(&query_span, &query.param_values); + let start_time = std::time::Instant::now(); let value = match connection .fetch_optional(query) .instrument(query_span.clone()) .await { Ok(Some(row)) => { - record_db_query_success(&query_span, 1_i64); + record_db_query_success(&query_span, 1_i64, start_time, db_system_name, operation_name); row_to_string(&row) } Ok(None) => { - record_db_query_success(&query_span, 0_i64); + record_db_query_success(&query_span, 0_i64, start_time, db_system_name, operation_name); None } Err(e) => { try_rollback_transaction(connection).await; let err = display_stmt_db_error(source_file, statement, e); - record_db_query_error(&query_span, 0_i64, &err); + record_db_query_error(&query_span, 0_i64, &err, start_time, db_system_name, operation_name); return Err(err); } }; @@ -821,7 +830,7 @@ mod tests { exception.details = tracing::field::Empty, db.response.returned_rows = tracing::field::Empty, ); - record_db_query_success(&span, 3); + record_db_query_success(&span, 3, std::time::Instant::now(), "sqlite", "SELECT".to_string()); drop(span); }); @@ -842,7 +851,7 @@ mod tests { db.response.returned_rows = tracing::field::Empty, ); let error = anyhow!("query failed").context("while executing SELECT 1"); - record_db_query_error(&span, 2, &error); + record_db_query_error(&span, 2, &error, std::time::Instant::now(), "sqlite", "SELECT".to_string()); drop(span); }); From 9d969d43801773c63bc19eaabe0a75ef44014ed7 Mon Sep 17 00:00:00 2001 From: lovasoa Date: Sat, 14 Mar 2026 17:35:15 +0100 Subject: [PATCH 09/27] fix(metrics): ensure correct db.system.name for ODBC connections by reusing discovery logic --- src/webserver/database/connect.rs | 35 ++++++++++++++------------ src/webserver/database/mod.rs | 11 ++++++++ src/webserver/database/pool_metrics.rs | 19 ++++++++------ 3 files changed, 41 insertions(+), 24 deletions(-) diff --git a/src/webserver/database/connect.rs b/src/webserver/database/connect.rs index 2ef506c3..2725b378 100644 --- a/src/webserver/database/connect.rs +++ b/src/webserver/database/connect.rs @@ -10,7 +10,7 @@ use anyhow::Context; use futures_util::future::BoxFuture; use sqlx::odbc::OdbcConnectOptions; use sqlx::{ - any::{Any, AnyConnectOptions, AnyKind}, + any::{Any, AnyConnectOptions, AnyKind, AnyConnection}, pool::PoolOptions, sqlite::{Function, SqliteConnectOptions, SqliteFunctionCtx}, ConnectOptions, Connection, Executor, @@ -38,12 +38,9 @@ impl Database { set_custom_connect_options(&mut connect_options, config); log::debug!("Connecting to database: {database_url}"); let mut retries = config.database_connection_retries; - let db_kind = connect_options.kind(); - let pool = loop { - match Self::create_pool_options(config, db_kind) - .connect_with(connect_options.clone()) - .await - { + + let mut conn: AnyConnection = loop { + match AnyConnection::connect_with(&connect_options).await { Ok(c) => break c, Err(e) => { if retries == 0 { @@ -56,8 +53,17 @@ impl Database { } } }; - let dbms_name: String = pool.acquire().await?.dbms_name().await?; + let dbms_name: String = conn.dbms_name().await?; let database_type = SupportedDatabase::from_dbms_name(&dbms_name); + super::set_discovered_db_type(database_type); + drop(conn); + + let db_kind = connect_options.kind(); + let pool = Self::create_pool_options(config, db_kind) + .connect_with(connect_options) + .await + .with_context(|| format!("Unable to open connection pool to {database_url}"))?; + log::debug!("Initialized {dbms_name:?} database pool: {pool:#?}"); Ok(Database { connection: pool, @@ -70,7 +76,6 @@ impl Database { } fn create_pool_options(config: &AppConfig, kind: AnyKind) -> PoolOptions { - let db_system_name = SupportedDatabase::otel_name_from_kind(kind); let mut pool_options = PoolOptions::new() .max_connections(if let Some(max) = config.max_database_pool_connections { max @@ -94,10 +99,10 @@ impl Database { .acquire_timeout(Duration::from_secs_f64( config.database_connection_acquire_timeout_seconds, )); - pool_options = add_on_return_to_pool(config, pool_options, db_system_name); - pool_options = add_on_connection_handler(config, pool_options, db_system_name); + pool_options = add_on_return_to_pool(config, pool_options); + pool_options = add_on_connection_handler(config, pool_options); pool_options = pool_options.before_acquire(move |_, _| { - super::pool_metrics::on_acquire(db_system_name); + super::pool_metrics::on_acquire(); Box::pin(async move { Ok(true) }) }); pool_options @@ -107,7 +112,6 @@ impl Database { fn add_on_return_to_pool( config: &AppConfig, pool_options: PoolOptions, - db_system_name: &'static str, ) -> PoolOptions { let on_disconnect_file = config.configuration_directory.join(ON_RESET_FILE); let sql = if on_disconnect_file.exists() { @@ -128,7 +132,7 @@ fn add_on_return_to_pool( pool_options.after_release(move |conn, meta| { let sql = sql.clone(); - super::pool_metrics::on_release(db_system_name); + super::pool_metrics::on_release(); Box::pin(async move { if let Some(sql) = sql { on_return_to_pool(conn, meta, sql).await @@ -162,7 +166,6 @@ fn on_return_to_pool( fn add_on_connection_handler( config: &AppConfig, pool_options: PoolOptions, - db_system_name: &'static str, ) -> PoolOptions { let on_connect_file = config.configuration_directory.join(ON_CONNECT_FILE); let sql = if on_connect_file.exists() { @@ -183,7 +186,7 @@ fn add_on_connection_handler( pool_options.after_connect(move |conn, _| { let sql = sql.clone(); - super::pool_metrics::on_connect(db_system_name); + super::pool_metrics::on_connect(); Box::pin(async move { if let Some(sql) = sql { log::debug!("Running connection handler on new connection"); diff --git a/src/webserver/database/mod.rs b/src/webserver/database/mod.rs index 26db4650..16ad7f23 100644 --- a/src/webserver/database/mod.rs +++ b/src/webserver/database/mod.rs @@ -99,6 +99,17 @@ pub struct Database { pub info: DbInfo, } +static DB_TYPE: std::sync::OnceLock = std::sync::OnceLock::new(); + +pub fn set_discovered_db_type(db_type: SupportedDatabase) { + let _ = DB_TYPE.set(db_type); +} + +#[must_use] +pub fn get_discovered_db_type() -> SupportedDatabase { + DB_TYPE.get().copied().unwrap_or(SupportedDatabase::Generic) +} + #[derive(Debug, Clone)] pub struct DbInfo { pub dbms_name: String, diff --git a/src/webserver/database/pool_metrics.rs b/src/webserver/database/pool_metrics.rs index 77e122bf..dc3530c0 100644 --- a/src/webserver/database/pool_metrics.rs +++ b/src/webserver/database/pool_metrics.rs @@ -9,38 +9,41 @@ fn get_counter() -> UpDownCounter { .build() } -pub fn on_acquire(system_name: &'static str) { +pub fn on_acquire() { let counter = get_counter(); + let db_system_name = super::get_discovered_db_type().otel_name(); counter.add(1, &[ - KeyValue::new("db.system.name", system_name), + KeyValue::new("db.system.name", db_system_name), KeyValue::new("db.client.connection.pool.name", "sqlpage"), KeyValue::new("db.client.connection.state", "used"), ]); counter.add(-1, &[ - KeyValue::new("db.system.name", system_name), + KeyValue::new("db.system.name", db_system_name), KeyValue::new("db.client.connection.pool.name", "sqlpage"), KeyValue::new("db.client.connection.state", "idle"), ]); } -pub fn on_release(system_name: &'static str) { +pub fn on_release() { let counter = get_counter(); + let db_system_name = super::get_discovered_db_type().otel_name(); counter.add(-1, &[ - KeyValue::new("db.system.name", system_name), + KeyValue::new("db.system.name", db_system_name), KeyValue::new("db.client.connection.pool.name", "sqlpage"), KeyValue::new("db.client.connection.state", "used"), ]); counter.add(1, &[ - KeyValue::new("db.system.name", system_name), + KeyValue::new("db.system.name", db_system_name), KeyValue::new("db.client.connection.pool.name", "sqlpage"), KeyValue::new("db.client.connection.state", "idle"), ]); } -pub fn on_connect(system_name: &'static str) { +pub fn on_connect() { let counter = get_counter(); + let db_system_name = super::get_discovered_db_type().otel_name(); counter.add(1, &[ - KeyValue::new("db.system.name", system_name), + KeyValue::new("db.system.name", db_system_name), KeyValue::new("db.client.connection.pool.name", "sqlpage"), KeyValue::new("db.client.connection.state", "idle"), ]); From 4ec275a69c16e405b16c75b38b33b39a46bed92a Mon Sep 17 00:00:00 2001 From: lovasoa Date: Sat, 14 Mar 2026 17:37:16 +0100 Subject: [PATCH 10/27] fix(metrics): remove global OnceLock and pass database type explicitly to pool callbacks --- src/webserver/database/connect.rs | 18 ++++++++++-------- src/webserver/database/mod.rs | 11 ----------- src/webserver/database/pool_metrics.rs | 9 +++------ 3 files changed, 13 insertions(+), 25 deletions(-) diff --git a/src/webserver/database/connect.rs b/src/webserver/database/connect.rs index 2725b378..b392fc02 100644 --- a/src/webserver/database/connect.rs +++ b/src/webserver/database/connect.rs @@ -55,11 +55,10 @@ impl Database { }; let dbms_name: String = conn.dbms_name().await?; let database_type = SupportedDatabase::from_dbms_name(&dbms_name); - super::set_discovered_db_type(database_type); drop(conn); let db_kind = connect_options.kind(); - let pool = Self::create_pool_options(config, db_kind) + let pool = Self::create_pool_options(config, db_kind, database_type) .connect_with(connect_options) .await .with_context(|| format!("Unable to open connection pool to {database_url}"))?; @@ -75,7 +74,7 @@ impl Database { }) } - fn create_pool_options(config: &AppConfig, kind: AnyKind) -> PoolOptions { + fn create_pool_options(config: &AppConfig, kind: AnyKind, database_type: SupportedDatabase) -> PoolOptions { let mut pool_options = PoolOptions::new() .max_connections(if let Some(max) = config.max_database_pool_connections { max @@ -99,10 +98,11 @@ impl Database { .acquire_timeout(Duration::from_secs_f64( config.database_connection_acquire_timeout_seconds, )); - pool_options = add_on_return_to_pool(config, pool_options); - pool_options = add_on_connection_handler(config, pool_options); + let db_system_name = database_type.otel_name(); + pool_options = add_on_return_to_pool(config, pool_options, db_system_name); + pool_options = add_on_connection_handler(config, pool_options, db_system_name); pool_options = pool_options.before_acquire(move |_, _| { - super::pool_metrics::on_acquire(); + super::pool_metrics::on_acquire(db_system_name); Box::pin(async move { Ok(true) }) }); pool_options @@ -112,6 +112,7 @@ impl Database { fn add_on_return_to_pool( config: &AppConfig, pool_options: PoolOptions, + db_system_name: &'static str, ) -> PoolOptions { let on_disconnect_file = config.configuration_directory.join(ON_RESET_FILE); let sql = if on_disconnect_file.exists() { @@ -132,7 +133,7 @@ fn add_on_return_to_pool( pool_options.after_release(move |conn, meta| { let sql = sql.clone(); - super::pool_metrics::on_release(); + super::pool_metrics::on_release(db_system_name); Box::pin(async move { if let Some(sql) = sql { on_return_to_pool(conn, meta, sql).await @@ -166,6 +167,7 @@ fn on_return_to_pool( fn add_on_connection_handler( config: &AppConfig, pool_options: PoolOptions, + db_system_name: &'static str, ) -> PoolOptions { let on_connect_file = config.configuration_directory.join(ON_CONNECT_FILE); let sql = if on_connect_file.exists() { @@ -186,7 +188,7 @@ fn add_on_connection_handler( pool_options.after_connect(move |conn, _| { let sql = sql.clone(); - super::pool_metrics::on_connect(); + super::pool_metrics::on_connect(db_system_name); Box::pin(async move { if let Some(sql) = sql { log::debug!("Running connection handler on new connection"); diff --git a/src/webserver/database/mod.rs b/src/webserver/database/mod.rs index 16ad7f23..26db4650 100644 --- a/src/webserver/database/mod.rs +++ b/src/webserver/database/mod.rs @@ -99,17 +99,6 @@ pub struct Database { pub info: DbInfo, } -static DB_TYPE: std::sync::OnceLock = std::sync::OnceLock::new(); - -pub fn set_discovered_db_type(db_type: SupportedDatabase) { - let _ = DB_TYPE.set(db_type); -} - -#[must_use] -pub fn get_discovered_db_type() -> SupportedDatabase { - DB_TYPE.get().copied().unwrap_or(SupportedDatabase::Generic) -} - #[derive(Debug, Clone)] pub struct DbInfo { pub dbms_name: String, diff --git a/src/webserver/database/pool_metrics.rs b/src/webserver/database/pool_metrics.rs index dc3530c0..562c7fe9 100644 --- a/src/webserver/database/pool_metrics.rs +++ b/src/webserver/database/pool_metrics.rs @@ -9,9 +9,8 @@ fn get_counter() -> UpDownCounter { .build() } -pub fn on_acquire() { +pub fn on_acquire(db_system_name: &'static str) { let counter = get_counter(); - let db_system_name = super::get_discovered_db_type().otel_name(); counter.add(1, &[ KeyValue::new("db.system.name", db_system_name), KeyValue::new("db.client.connection.pool.name", "sqlpage"), @@ -24,9 +23,8 @@ pub fn on_acquire() { ]); } -pub fn on_release() { +pub fn on_release(db_system_name: &'static str) { let counter = get_counter(); - let db_system_name = super::get_discovered_db_type().otel_name(); counter.add(-1, &[ KeyValue::new("db.system.name", db_system_name), KeyValue::new("db.client.connection.pool.name", "sqlpage"), @@ -39,9 +37,8 @@ pub fn on_release() { ]); } -pub fn on_connect() { +pub fn on_connect(db_system_name: &'static str) { let counter = get_counter(); - let db_system_name = super::get_discovered_db_type().otel_name(); counter.add(1, &[ KeyValue::new("db.system.name", db_system_name), KeyValue::new("db.client.connection.pool.name", "sqlpage"), From 43368e6852fc33a9f51dd4ed91fb98c5e325f8d9 Mon Sep 17 00:00:00 2001 From: lovasoa Date: Sat, 14 Mar 2026 17:57:05 +0100 Subject: [PATCH 11/27] refactor(metrics): use OpenTelemetry semantic convention constants --- src/webserver/database/execute_queries.rs | 56 ++++++++++++++--------- src/webserver/database/pool_metrics.rs | 31 +++++++------ src/webserver/http_metrics.rs | 13 +++--- 3 files changed, 58 insertions(+), 42 deletions(-) diff --git a/src/webserver/database/execute_queries.rs b/src/webserver/database/execute_queries.rs index d58ac66a..a5cacdb6 100644 --- a/src/webserver/database/execute_queries.rs +++ b/src/webserver/database/execute_queries.rs @@ -46,9 +46,23 @@ fn source_line_number(line: usize) -> i64 { i64::try_from(line).unwrap_or(i64::MAX) } +use opentelemetry_semantic_conventions::attribute as otel; + +fn record_query_params(span: &tracing::Span, params: &[Option]) { + use tracing_opentelemetry::OpenTelemetrySpanExt; + for (idx, value) in params.iter().enumerate() { + let key = opentelemetry::Key::new(format!("{}.{idx}", otel::DB_QUERY_PARAMETER)); + let otel_value = match value { + Some(v) => opentelemetry::Value::String(v.clone().into()), + None => opentelemetry::Value::String("NULL".into()), + }; + span.set_attribute(key, otel_value); + } +} + fn record_db_query_success(span: &tracing::Span, returned_rows: i64, start_time: std::time::Instant, db_system_name: &'static str, operation_name: String) { - span.record("db.response.returned_rows", returned_rows); - span.record("otel.status_code", "OK"); + span.record(otel::DB_RESPONSE_RETURNED_ROWS, returned_rows); + span.record(otel::OTEL_STATUS_CODE, "OK"); let duration = start_time.elapsed().as_secs_f64(); let histogram = opentelemetry::global::meter("sqlpage") .f64_histogram("db.client.operation.duration") @@ -56,17 +70,17 @@ fn record_db_query_success(span: &tracing::Span, returned_rows: i64, start_time: .with_description("Duration of executing SQL queries.") .build(); let attributes = [ - opentelemetry::KeyValue::new("db.system.name", db_system_name), - opentelemetry::KeyValue::new("db.operation.name", operation_name), - opentelemetry::KeyValue::new("otel.status_code", "OK"), + opentelemetry::KeyValue::new(otel::DB_SYSTEM_NAME, db_system_name), + opentelemetry::KeyValue::new(otel::DB_OPERATION_NAME, operation_name), + opentelemetry::KeyValue::new(otel::OTEL_STATUS_CODE, "OK"), ]; histogram.record(duration, &attributes); } fn record_db_query_error(span: &tracing::Span, returned_rows: i64, error: &anyhow::Error, start_time: std::time::Instant, db_system_name: &'static str, operation_name: String) { - span.record("db.response.returned_rows", returned_rows); - span.record("otel.status_code", "ERROR"); - span.record("exception.message", tracing::field::display(error)); + span.record(otel::DB_RESPONSE_RETURNED_ROWS, returned_rows); + span.record(otel::OTEL_STATUS_CODE, "ERROR"); + span.record(otel::EXCEPTION_MESSAGE, tracing::field::display(error)); span.record("exception.details", tracing::field::debug(error)); let duration = start_time.elapsed().as_secs_f64(); let histogram = opentelemetry::global::meter("sqlpage") @@ -75,10 +89,10 @@ fn record_db_query_error(span: &tracing::Span, returned_rows: i64, error: &anyho .with_description("Duration of executing SQL queries.") .build(); let attributes = [ - opentelemetry::KeyValue::new("db.system.name", db_system_name), - opentelemetry::KeyValue::new("db.operation.name", operation_name), - opentelemetry::KeyValue::new("otel.status_code", "ERROR"), - opentelemetry::KeyValue::new("error.type", error.to_string()), + opentelemetry::KeyValue::new(otel::DB_SYSTEM_NAME, db_system_name), + opentelemetry::KeyValue::new(otel::DB_OPERATION_NAME, operation_name), + opentelemetry::KeyValue::new(otel::OTEL_STATUS_CODE, "ERROR"), + opentelemetry::KeyValue::new(otel::ERROR_TYPE, error.to_string()), ]; histogram.record(duration, &attributes); } @@ -87,15 +101,15 @@ fn create_db_query_span(sql: &str, source_file: &Path, line: usize, db_system_na let operation_name = sql.split_whitespace().next().unwrap_or("").to_uppercase(); let span = tracing::info_span!( "db.query", - db.query.text = sql, - db.system.name = db_system_name, - db.operation.name = operation_name, - code.file.path = %source_file.display(), - code.line.number = source_line_number(line), - otel.status_code = tracing::field::Empty, - exception.message = tracing::field::Empty, - exception.details = tracing::field::Empty, - db.response.returned_rows = tracing::field::Empty, + { otel::DB_QUERY_TEXT } = sql, + { otel::DB_SYSTEM_NAME } = db_system_name, + { otel::DB_OPERATION_NAME } = operation_name, + { otel::CODE_FILE_PATH } = %source_file.display(), + { otel::CODE_LINE_NUMBER } = source_line_number(line), + { otel::OTEL_STATUS_CODE } = tracing::field::Empty, + { otel::EXCEPTION_MESSAGE } = tracing::field::Empty, + "exception.details" = tracing::field::Empty, + { otel::DB_RESPONSE_RETURNED_ROWS } = tracing::field::Empty, ); (span, operation_name) } diff --git a/src/webserver/database/pool_metrics.rs b/src/webserver/database/pool_metrics.rs index 562c7fe9..1f9d12f3 100644 --- a/src/webserver/database/pool_metrics.rs +++ b/src/webserver/database/pool_metrics.rs @@ -1,5 +1,6 @@ use opentelemetry::{global, KeyValue}; use opentelemetry::metrics::UpDownCounter; +use opentelemetry_semantic_conventions::attribute as otel; fn get_counter() -> UpDownCounter { global::meter("sqlpage") @@ -12,36 +13,36 @@ fn get_counter() -> UpDownCounter { pub fn on_acquire(db_system_name: &'static str) { let counter = get_counter(); counter.add(1, &[ - KeyValue::new("db.system.name", db_system_name), - KeyValue::new("db.client.connection.pool.name", "sqlpage"), - KeyValue::new("db.client.connection.state", "used"), + KeyValue::new(otel::DB_SYSTEM_NAME, db_system_name), + KeyValue::new(otel::DB_CLIENT_CONNECTION_POOL_NAME, "sqlpage"), + KeyValue::new(otel::DB_CLIENT_CONNECTION_STATE, "used"), ]); counter.add(-1, &[ - KeyValue::new("db.system.name", db_system_name), - KeyValue::new("db.client.connection.pool.name", "sqlpage"), - KeyValue::new("db.client.connection.state", "idle"), + KeyValue::new(otel::DB_SYSTEM_NAME, db_system_name), + KeyValue::new(otel::DB_CLIENT_CONNECTION_POOL_NAME, "sqlpage"), + KeyValue::new(otel::DB_CLIENT_CONNECTION_STATE, "idle"), ]); } pub fn on_release(db_system_name: &'static str) { let counter = get_counter(); counter.add(-1, &[ - KeyValue::new("db.system.name", db_system_name), - KeyValue::new("db.client.connection.pool.name", "sqlpage"), - KeyValue::new("db.client.connection.state", "used"), + KeyValue::new(otel::DB_SYSTEM_NAME, db_system_name), + KeyValue::new(otel::DB_CLIENT_CONNECTION_POOL_NAME, "sqlpage"), + KeyValue::new(otel::DB_CLIENT_CONNECTION_STATE, "used"), ]); counter.add(1, &[ - KeyValue::new("db.system.name", db_system_name), - KeyValue::new("db.client.connection.pool.name", "sqlpage"), - KeyValue::new("db.client.connection.state", "idle"), + KeyValue::new(otel::DB_SYSTEM_NAME, db_system_name), + KeyValue::new(otel::DB_CLIENT_CONNECTION_POOL_NAME, "sqlpage"), + KeyValue::new(otel::DB_CLIENT_CONNECTION_STATE, "idle"), ]); } pub fn on_connect(db_system_name: &'static str) { let counter = get_counter(); counter.add(1, &[ - KeyValue::new("db.system.name", db_system_name), - KeyValue::new("db.client.connection.pool.name", "sqlpage"), - KeyValue::new("db.client.connection.state", "idle"), + KeyValue::new(otel::DB_SYSTEM_NAME, db_system_name), + KeyValue::new(otel::DB_CLIENT_CONNECTION_POOL_NAME, "sqlpage"), + KeyValue::new(otel::DB_CLIENT_CONNECTION_STATE, "idle"), ]); } diff --git a/src/webserver/http_metrics.rs b/src/webserver/http_metrics.rs index fe09173d..4b0a6e31 100644 --- a/src/webserver/http_metrics.rs +++ b/src/webserver/http_metrics.rs @@ -8,6 +8,7 @@ use actix_web::{ use futures_util::future::LocalBoxFuture; use opentelemetry::{global, KeyValue}; use opentelemetry::metrics::Histogram; +use opentelemetry_semantic_conventions::attribute as otel; use tracing_actix_web::root_span_macro::private::{http_method_str, http_scheme}; pub struct HttpMetrics; @@ -73,15 +74,15 @@ where let status = res.status().as_u16(); let mut attributes = vec![ - KeyValue::new("http.request.method", method), - KeyValue::new("http.response.status_code", status.to_string()), - KeyValue::new("http.route", route), - KeyValue::new("url.scheme", scheme), - KeyValue::new("server.address", host), + KeyValue::new(otel::HTTP_REQUEST_METHOD, method), + KeyValue::new(otel::HTTP_RESPONSE_STATUS_CODE, status.to_string()), + KeyValue::new(otel::HTTP_ROUTE, route), + KeyValue::new(otel::URL_SCHEME, scheme), + KeyValue::new(otel::SERVER_ADDRESS, host), ]; if status >= 500 { - attributes.push(KeyValue::new("error.type", status.to_string())); + attributes.push(KeyValue::new(otel::ERROR_TYPE, status.to_string())); } histogram.record(duration, &attributes); From 461f348722b52b5d1ce3f6e31bcb128f626b3b74 Mon Sep 17 00:00:00 2001 From: lovasoa Date: Sat, 14 Mar 2026 18:02:22 +0100 Subject: [PATCH 12/27] refactor(metrics): eliminate all OTel convention string literals in favor of constants --- Cargo.toml | 2 +- src/telemetry.rs | 17 +++++----- src/webserver/database/execute_queries.rs | 29 +++++----------- src/webserver/database/pool_metrics.rs | 3 +- .../database/sqlpage_functions/functions.rs | 29 ++++++++-------- src/webserver/http.rs | 33 ++++++++++--------- src/webserver/http_metrics.rs | 3 +- src/webserver/oidc.rs | 5 +-- 8 files changed, 58 insertions(+), 63 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 059d0c28..0336cd84 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -90,7 +90,7 @@ tracing-log = "0.2" opentelemetry = "0.31" opentelemetry_sdk = { version = "0.31", features = ["metrics", "rt-tokio-current-thread"] } opentelemetry-otlp = { version = "0.31", features = ["http-proto", "grpc-tonic", "metrics"] } -opentelemetry-semantic-conventions = "0.31" +opentelemetry-semantic-conventions = { version = "0.31", features = ["semconv_experimental"] } [features] diff --git a/src/telemetry.rs b/src/telemetry.rs index ad12b851..5bbfd185 100644 --- a/src/telemetry.rs +++ b/src/telemetry.rs @@ -172,14 +172,15 @@ mod logfmt { } } +use opentelemetry_semantic_conventions::attribute as otel; /// Fields we pick from spans, in display order. /// (`span_field_name`, `logfmt_key`) const SPAN_FIELDS: &[(&str, &str)] = &[ - ("http.method", "method"), - ("http.target", "path"), - ("http.status_code", "status"), + (otel::HTTP_REQUEST_METHOD, "method"), + (otel::HTTP_TARGET, "path"), + (otel::HTTP_RESPONSE_STATUS_CODE, "status"), ("sqlpage.file", "file"), - ("http.client_ip", "client_ip"), + (otel::HTTP_CLIENT_IP, "client_ip"), ]; /// All-zeros trace ID means no real trace context. @@ -486,8 +487,8 @@ mod logfmt { fn debug_logs_include_unmapped_span_fields() { let mut buf = String::new(); let span_fields = HashMap::from([ - ("http.method", "GET".to_string()), - ("http.route", "/users/:id".to_string()), + (otel::HTTP_REQUEST_METHOD, "GET".to_string()), + (otel::HTTP_ROUTE, "/users/:id".to_string()), ("otel.kind", "server".to_string()), ]); @@ -500,8 +501,8 @@ mod logfmt { fn info_logs_keep_only_mapped_span_fields_when_not_in_debug_mode() { let mut buf = String::new(); let span_fields = HashMap::from([ - ("http.method", "GET".to_string()), - ("http.route", "/users/:id".to_string()), + (otel::HTTP_REQUEST_METHOD, "GET".to_string()), + (otel::HTTP_ROUTE, "/users/:id".to_string()), ("otel.kind", "server".to_string()), ]); diff --git a/src/webserver/database/execute_queries.rs b/src/webserver/database/execute_queries.rs index a5cacdb6..d3ad2495 100644 --- a/src/webserver/database/execute_queries.rs +++ b/src/webserver/database/execute_queries.rs @@ -30,23 +30,12 @@ use sqlx::{ pub type DbConn = Option>; -fn record_query_params(span: &tracing::Span, params: &[Option]) { - use tracing_opentelemetry::OpenTelemetrySpanExt; - for (idx, value) in params.iter().enumerate() { - let key = opentelemetry::Key::new(format!("db.query.parameter.{idx}")); - let otel_value = match value { - Some(v) => opentelemetry::Value::String(v.clone().into()), - None => opentelemetry::Value::String("NULL".into()), - }; - span.set_attribute(key, otel_value); - } -} - fn source_line_number(line: usize) -> i64 { i64::try_from(line).unwrap_or(i64::MAX) } use opentelemetry_semantic_conventions::attribute as otel; +use opentelemetry_semantic_conventions::metric as otel_metric; fn record_query_params(span: &tracing::Span, params: &[Option]) { use tracing_opentelemetry::OpenTelemetrySpanExt; @@ -65,7 +54,7 @@ fn record_db_query_success(span: &tracing::Span, returned_rows: i64, start_time: span.record(otel::OTEL_STATUS_CODE, "OK"); let duration = start_time.elapsed().as_secs_f64(); let histogram = opentelemetry::global::meter("sqlpage") - .f64_histogram("db.client.operation.duration") + .f64_histogram(otel_metric::DB_CLIENT_OPERATION_DURATION) .with_unit("s") .with_description("Duration of executing SQL queries.") .build(); @@ -84,7 +73,7 @@ fn record_db_query_error(span: &tracing::Span, returned_rows: i64, error: &anyho span.record("exception.details", tracing::field::debug(error)); let duration = start_time.elapsed().as_secs_f64(); let histogram = opentelemetry::global::meter("sqlpage") - .f64_histogram("db.client.operation.duration") + .f64_histogram(otel_metric::DB_CLIENT_OPERATION_DURATION) .with_unit("s") .with_description("Duration of executing SQL queries.") .build(); @@ -848,9 +837,9 @@ mod tests { drop(span); }); - assert_eq!(fields["otel.status_code"], "OK"); - assert_eq!(fields["db.response.returned_rows"], "3"); - assert!(!fields.contains_key("exception.message")); + assert_eq!(fields[otel::OTEL_STATUS_CODE], "OK"); + assert_eq!(fields[otel::DB_RESPONSE_RETURNED_ROWS], "3"); + assert!(!fields.contains_key(otel::EXCEPTION_MESSAGE)); assert!(!fields.contains_key("exception.details")); } @@ -869,9 +858,9 @@ mod tests { drop(span); }); - assert_eq!(fields["otel.status_code"], "ERROR"); - assert_eq!(fields["db.response.returned_rows"], "2"); - assert!(fields["exception.message"].contains("while executing SELECT 1")); + assert_eq!(fields[otel::OTEL_STATUS_CODE], "ERROR"); + assert_eq!(fields[otel::DB_RESPONSE_RETURNED_ROWS], "2"); + assert!(fields[otel::EXCEPTION_MESSAGE].contains("while executing SELECT 1")); assert!(fields["exception.details"].contains("query failed")); } } diff --git a/src/webserver/database/pool_metrics.rs b/src/webserver/database/pool_metrics.rs index 1f9d12f3..6c4b6138 100644 --- a/src/webserver/database/pool_metrics.rs +++ b/src/webserver/database/pool_metrics.rs @@ -1,10 +1,11 @@ use opentelemetry::{global, KeyValue}; use opentelemetry::metrics::UpDownCounter; use opentelemetry_semantic_conventions::attribute as otel; +use opentelemetry_semantic_conventions::metric as otel_metric; fn get_counter() -> UpDownCounter { global::meter("sqlpage") - .i64_up_down_counter("db.client.connection.count") + .i64_up_down_counter(otel_metric::DB_CLIENT_CONNECTION_COUNT) .with_unit("{connection}") .with_description("Number of connections in the database pool.") .build() diff --git a/src/webserver/database/sqlpage_functions/functions.rs b/src/webserver/database/sqlpage_functions/functions.rs index c3dbd218..00c0b86d 100644 --- a/src/webserver/database/sqlpage_functions/functions.rs +++ b/src/webserver/database/sqlpage_functions/functions.rs @@ -15,6 +15,7 @@ use futures_util::StreamExt; use mime_guess::mime; use std::fmt::Write; use std::{borrow::Cow, ffi::OsStr, str::FromStr}; +use opentelemetry_semantic_conventions::attribute as otel; use tracing::Instrument; super::function_definition_macro::sqlpage_functions! { @@ -216,11 +217,11 @@ async fn fetch( let method = http_request.method.as_deref().unwrap_or("GET"); let fetch_span = tracing::info_span!( "http.client", - otel.name = format!("{method}"), - http.request.method = method, - url.full = %http_request.url, - http.request.body.size = tracing::field::Empty, - http.response.status_code = tracing::field::Empty, + "otel.name" = format!("{method}"), + { otel::HTTP_REQUEST_METHOD } = method, + { otel::URL_FULL } = %http_request.url, + { otel::HTTP_REQUEST_BODY_SIZE } = tracing::field::Empty, + { otel::HTTP_RESPONSE_STATUS_CODE } = tracing::field::Empty, ); async { @@ -232,7 +233,7 @@ async fn fetch( let mut response = if let Some(body) = &http_request.body { let (body, req) = prepare_request_body(body, req)?; tracing::Span::current().record( - "http.request.body.size", + otel::HTTP_REQUEST_BODY_SIZE, i64::try_from(body.len()).unwrap_or(i64::MAX), ); req.send_body(body) @@ -243,7 +244,7 @@ async fn fetch( .map_err(|e| anyhow!("Unable to fetch {}: {e}", http_request.url))?; tracing::Span::current().record( - "http.response.status_code", + otel::HTTP_RESPONSE_STATUS_CODE, i64::from(response.status().as_u16()), ); @@ -319,11 +320,11 @@ async fn fetch_with_meta( let method = http_request.method.as_deref().unwrap_or("GET"); let fetch_span = tracing::info_span!( "http.client", - otel.name = format!("{method}"), - http.request.method = method, - url.full = %http_request.url, - http.request.body.size = tracing::field::Empty, - http.response.status_code = tracing::field::Empty, + "otel.name" = format!("{method}"), + { otel::HTTP_REQUEST_METHOD } = method, + { otel::URL_FULL } = %http_request.url, + { otel::HTTP_REQUEST_BODY_SIZE } = tracing::field::Empty, + { otel::HTTP_RESPONSE_STATUS_CODE } = tracing::field::Empty, ); async { @@ -335,7 +336,7 @@ async fn fetch_with_meta( let response_result = if let Some(body) = &http_request.body { let (body, req) = prepare_request_body(body, req)?; tracing::Span::current().record( - "http.request.body.size", + otel::HTTP_REQUEST_BODY_SIZE, i64::try_from(body.len()).unwrap_or(i64::MAX), ); req.send_body(body).await @@ -350,7 +351,7 @@ async fn fetch_with_meta( Ok(mut response) => { let status = response.status(); tracing::Span::current() - .record("http.response.status_code", i64::from(status.as_u16())); + .record(otel::HTTP_RESPONSE_STATUS_CODE, i64::from(status.as_u16())); obj.serialize_entry("status", &status.as_u16())?; let mut has_error = false; if status.is_server_error() { diff --git a/src/webserver/http.rs b/src/webserver/http.rs index b0ea9a8c..93062c81 100644 --- a/src/webserver/http.rs +++ b/src/webserver/http.rs @@ -17,6 +17,7 @@ use actix_web::http::header::{ContentType, Header, HttpDate, IfModifiedSince, La use actix_web::http::{header, StatusCode}; use actix_web::web::PayloadConfig; use actix_web::{dev::ServiceResponse, middleware, web, App, Error, HttpResponse, HttpServer}; +use opentelemetry_semantic_conventions::attribute as otel; use tracing::{Instrument, Span}; use tracing_actix_web::{DefaultRootSpanBuilder, RootSpanBuilder, TracingLogger}; @@ -258,7 +259,7 @@ async fn render_sql( let exec_span = tracing::info_span!( "sqlpage.file", otel.name = %sql_execution_span_name(&source_path), - code.file.path = %source_path.display(), + { otel::CODE_FILE_PATH } = %source_path.display(), ); actix_web::rt::spawn(tracing::Instrument::instrument( async move { @@ -342,24 +343,24 @@ impl RootSpanBuilder for SqlPageRootSpanBuilder { let span = tracing::span!( tracing::Level::INFO, "HTTP request", - http.method = %http_method, - http.route = %http_route, - http.flavor = %tracing_actix_web::root_span_macro::private::http_flavor(request.version()), - http.scheme = %tracing_actix_web::root_span_macro::private::http_scheme(connection_info.scheme()), - http.host = %connection_info.host(), - http.client_ip = %request.connection_info().realip_remote_addr().unwrap_or(""), - http.user_agent = %user_agent, - http.target = %request + { otel::HTTP_REQUEST_METHOD } = %http_method, + { otel::HTTP_ROUTE } = %http_route, + { otel::HTTP_FLAVOR } = %tracing_actix_web::root_span_macro::private::http_flavor(request.version()), + { otel::URL_SCHEME } = %tracing_actix_web::root_span_macro::private::http_scheme(connection_info.scheme()), + { otel::HTTP_HOST } = %connection_info.host(), + { otel::HTTP_CLIENT_IP } = %request.connection_info().realip_remote_addr().unwrap_or(""), + { otel::HTTP_USER_AGENT } = %user_agent, + { otel::HTTP_TARGET } = %request .uri() .path_and_query() .map_or("", actix_web::http::uri::PathAndQuery::as_str), - http.status_code = tracing::field::Empty, - otel.name = %otel_name, - otel.kind = "server", - otel.status_code = tracing::field::Empty, + { otel::HTTP_RESPONSE_STATUS_CODE } = tracing::field::Empty, + "otel.name" = %otel_name, + "otel.kind" = "server", + { otel::OTEL_STATUS_CODE } = tracing::field::Empty, request_id = %request_id, - exception.message = tracing::field::Empty, - exception.details = tracing::field::Empty, + { otel::EXCEPTION_MESSAGE } = tracing::field::Empty, + "exception.details" = tracing::field::Empty, ); std::mem::drop(connection_info); tracing_actix_web::root_span_macro::private::set_otel_parent(request, &span); @@ -392,7 +393,7 @@ async fn process_sql_request( let sql_file = { let span = tracing::info_span!( "sqlpage.file.load", - code.file.path = %sql_path.display(), + { otel::CODE_FILE_PATH } = %sql_path.display(), ); app_state .sql_file_cache diff --git a/src/webserver/http_metrics.rs b/src/webserver/http_metrics.rs index 4b0a6e31..21aff485 100644 --- a/src/webserver/http_metrics.rs +++ b/src/webserver/http_metrics.rs @@ -9,6 +9,7 @@ use futures_util::future::LocalBoxFuture; use opentelemetry::{global, KeyValue}; use opentelemetry::metrics::Histogram; use opentelemetry_semantic_conventions::attribute as otel; +use opentelemetry_semantic_conventions::metric as otel_metric; use tracing_actix_web::root_span_macro::private::{http_method_str, http_scheme}; pub struct HttpMetrics; @@ -26,7 +27,7 @@ where fn new_transform(&self, service: S) -> Self::Future { let histogram = global::meter("sqlpage") - .f64_histogram("http.server.request.duration") + .f64_histogram(otel_metric::HTTP_SERVER_REQUEST_DURATION) .with_unit("s") .with_description("Duration of HTTP requests processed by the server.") .build(); diff --git a/src/webserver/oidc.rs b/src/webserver/oidc.rs index f1ce269c..b432108e 100644 --- a/src/webserver/oidc.rs +++ b/src/webserver/oidc.rs @@ -35,6 +35,7 @@ use openidconnect::{ StandardTokenResponse, }; use serde::{Deserialize, Serialize}; +use opentelemetry_semantic_conventions::attribute as otel; use tracing::Instrument; use super::error::anyhow_err_to_actix_resp; @@ -727,8 +728,8 @@ async fn exchange_code_for_token( ) -> anyhow::Result { let span = tracing::info_span!( "http.client", - otel.name = "POST token_endpoint", - http.request.method = "POST", + "otel.name" = "POST token_endpoint", + { otel::HTTP_REQUEST_METHOD } = "POST", ); let token_response = oidc_client .exchange_code(openidconnect::AuthorizationCode::new( From 090b8c329d137cd89ad2037abcaa8d3aa79ea537 Mon Sep 17 00:00:00 2001 From: lovasoa Date: Sat, 14 Mar 2026 18:03:18 +0100 Subject: [PATCH 13/27] fix(telemetry): use standard YAML list notation in OTel collector config to fix parsing error --- examples/telemetry/otel-collector.yaml | 49 ++++++++++++++++++-------- 1 file changed, 34 insertions(+), 15 deletions(-) diff --git a/examples/telemetry/otel-collector.yaml b/examples/telemetry/otel-collector.yaml index e1deba27..36325199 100644 --- a/examples/telemetry/otel-collector.yaml +++ b/examples/telemetry/otel-collector.yaml @@ -118,22 +118,41 @@ exporters: service: pipelines: traces: - receivers: [otlp] - processors: [batch] - exporters: [otlp_grpc/tempo] + receivers: + - otlp + processors: + - batch + exporters: + - otlp_grpc/tempo metrics: - receivers: [otlp, postgresql] - processors: [batch] - exporters: [prometheus] + receivers: + - otlp + - postgresql + processors: + - batch + exporters: + - prometheus logs/sqlpage: - receivers: [syslog/sqlpage] - processors: [transform/sqlpage_logs, batch] - exporters: [otlp_http/loki] + receivers: + - syslog/sqlpage + processors: + - transform/sqlpage_logs + - batch + exporters: + - otlp_http/loki logs/postgresql: - receivers: [filelog/postgresql] - processors: [transform/postgresql_logs, batch] - exporters: [otlp_http/loki] + receivers: + - filelog/postgresql + processors: + - transform/postgresql_logs + - batch + exporters: + - otlp_http/loki logs/nginx: - receivers: [syslog/nginx] - processors: [transform/nginx_logs, batch] - exporters: [otlp_http/loki] + receivers: + - syslog/nginx + processors: + - transform/nginx_logs + - batch + exporters: + - otlp_http/loki From 8d19fc726bda28882d18761a46e463259ba4d8a1 Mon Sep 17 00:00:00 2001 From: lovasoa Date: Sat, 14 Mar 2026 18:24:44 +0100 Subject: [PATCH 14/27] fix(telemetry): fix clippy warnings and use latest OTel attribute names --- src/lib.rs | 5 ++++ src/telemetry.rs | 4 +-- src/telemetry_metrics.rs | 31 +++++++++++++++++++++++ src/webserver/database/execute_queries.rs | 4 +-- src/webserver/http.rs | 15 +++++------ src/webserver/http_metrics.rs | 28 ++++++++------------ 6 files changed, 58 insertions(+), 29 deletions(-) create mode 100644 src/telemetry_metrics.rs diff --git a/src/lib.rs b/src/lib.rs index a4babf3c..464a8695 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -77,6 +77,7 @@ pub mod file_cache; pub mod filesystem; pub mod render; pub mod telemetry; +pub mod telemetry_metrics; pub mod template_helpers; pub mod templates; pub mod utils; @@ -90,6 +91,7 @@ use file_cache::FileCache; use std::path::{Path, PathBuf}; use std::sync::Arc; use templates::AllTemplates; +use telemetry_metrics::TelemetryMetrics; use webserver::Database; /// `TEMPLATES_DIR` is the directory where .handlebars files are stored @@ -108,6 +110,7 @@ pub struct AppState { file_system: FileSystem, config: AppConfig, pub oidc_state: Option>, + pub telemetry_metrics: TelemetryMetrics, } impl AppState { @@ -133,6 +136,7 @@ impl AppState { ); let oidc_state = crate::webserver::oidc::initialize_oidc_state(config).await?; + let telemetry_metrics = TelemetryMetrics::new(); Ok(AppState { db, @@ -141,6 +145,7 @@ impl AppState { file_system, config: config.clone(), oidc_state, + telemetry_metrics, }) } } diff --git a/src/telemetry.rs b/src/telemetry.rs index 5bbfd185..6192c15b 100644 --- a/src/telemetry.rs +++ b/src/telemetry.rs @@ -177,10 +177,10 @@ use opentelemetry_semantic_conventions::attribute as otel; /// (`span_field_name`, `logfmt_key`) const SPAN_FIELDS: &[(&str, &str)] = &[ (otel::HTTP_REQUEST_METHOD, "method"), - (otel::HTTP_TARGET, "path"), + (otel::URL_PATH, "path"), (otel::HTTP_RESPONSE_STATUS_CODE, "status"), ("sqlpage.file", "file"), - (otel::HTTP_CLIENT_IP, "client_ip"), + (otel::CLIENT_ADDRESS, "client_ip"), ]; /// All-zeros trace ID means no real trace context. diff --git a/src/telemetry_metrics.rs b/src/telemetry_metrics.rs new file mode 100644 index 00000000..54073210 --- /dev/null +++ b/src/telemetry_metrics.rs @@ -0,0 +1,31 @@ +use opentelemetry::global; +use opentelemetry::metrics::Histogram; +use opentelemetry_semantic_conventions::metric as otel_metric; + +#[derive(Clone)] +pub struct TelemetryMetrics { + pub http_request_duration: Histogram, + pub db_query_duration: Histogram, +} + +impl TelemetryMetrics { + #[must_use] + pub fn new() -> Self { + let meter = global::meter("sqlpage"); + let http_request_duration = meter + .f64_histogram(otel_metric::HTTP_SERVER_REQUEST_DURATION) + .with_unit("s") + .with_description("Duration of HTTP requests processed by the server.") + .build(); + let db_query_duration = meter + .f64_histogram(otel_metric::DB_CLIENT_OPERATION_DURATION) + .with_unit("s") + .with_description("Duration of executing SQL queries.") + .build(); + + Self { + http_request_duration, + db_query_duration, + } + } +} diff --git a/src/webserver/database/execute_queries.rs b/src/webserver/database/execute_queries.rs index d3ad2495..fed669c1 100644 --- a/src/webserver/database/execute_queries.rs +++ b/src/webserver/database/execute_queries.rs @@ -139,7 +139,7 @@ pub fn stream_query_results_with_conn<'a>( let connection = take_connection(&request.app_state.db, db_connection, request).await?; log::trace!("Executing query {:?}", query.sql); let db_system_name = request.app_state.db.info.database_type.otel_name(); - let (query_span, operation_name) = create_db_query_span(&query.sql, source_file, stmt.query_position.start.line, db_system_name); + let (query_span, operation_name) = create_db_query_span(query.sql, source_file, stmt.query_position.start.line, db_system_name); record_query_params(&query_span, &query.param_values); let start_time = std::time::Instant::now(); @@ -311,7 +311,7 @@ async fn execute_set_variable_query<'a>( let db_system_name = request.app_state.db.info.database_type.otel_name(); let (query_span, operation_name) = create_db_query_span( - &query.sql, + query.sql, source_file, statement.query_position.start.line, db_system_name, diff --git a/src/webserver/http.rs b/src/webserver/http.rs index 93062c81..1d0e31e7 100644 --- a/src/webserver/http.rs +++ b/src/webserver/http.rs @@ -345,15 +345,14 @@ impl RootSpanBuilder for SqlPageRootSpanBuilder { "HTTP request", { otel::HTTP_REQUEST_METHOD } = %http_method, { otel::HTTP_ROUTE } = %http_route, - { otel::HTTP_FLAVOR } = %tracing_actix_web::root_span_macro::private::http_flavor(request.version()), + { otel::NETWORK_PROTOCOL_NAME } = "http", + { otel::NETWORK_PROTOCOL_VERSION } = %tracing_actix_web::root_span_macro::private::http_flavor(request.version()), { otel::URL_SCHEME } = %tracing_actix_web::root_span_macro::private::http_scheme(connection_info.scheme()), - { otel::HTTP_HOST } = %connection_info.host(), - { otel::HTTP_CLIENT_IP } = %request.connection_info().realip_remote_addr().unwrap_or(""), - { otel::HTTP_USER_AGENT } = %user_agent, - { otel::HTTP_TARGET } = %request - .uri() - .path_and_query() - .map_or("", actix_web::http::uri::PathAndQuery::as_str), + { otel::SERVER_ADDRESS } = %connection_info.host(), + { otel::CLIENT_ADDRESS } = %request.connection_info().realip_remote_addr().unwrap_or(""), + { otel::USER_AGENT_ORIGINAL } = %user_agent, + { otel::URL_PATH } = %request.path(), + { otel::URL_QUERY } = %request.query_string(), { otel::HTTP_RESPONSE_STATUS_CODE } = tracing::field::Empty, "otel.name" = %otel_name, "otel.kind" = "server", diff --git a/src/webserver/http_metrics.rs b/src/webserver/http_metrics.rs index 21aff485..2e54292a 100644 --- a/src/webserver/http_metrics.rs +++ b/src/webserver/http_metrics.rs @@ -3,15 +3,15 @@ use std::time::Instant; use actix_web::{ dev::{forward_ready, Service, ServiceRequest, ServiceResponse, Transform}, - Error, + web, Error, }; use futures_util::future::LocalBoxFuture; -use opentelemetry::{global, KeyValue}; -use opentelemetry::metrics::Histogram; +use opentelemetry::KeyValue; use opentelemetry_semantic_conventions::attribute as otel; -use opentelemetry_semantic_conventions::metric as otel_metric; use tracing_actix_web::root_span_macro::private::{http_method_str, http_scheme}; +use crate::AppState; + pub struct HttpMetrics; impl Transform for HttpMetrics @@ -26,22 +26,12 @@ where type Future = Ready>; fn new_transform(&self, service: S) -> Self::Future { - let histogram = global::meter("sqlpage") - .f64_histogram(otel_metric::HTTP_SERVER_REQUEST_DURATION) - .with_unit("s") - .with_description("Duration of HTTP requests processed by the server.") - .build(); - - ready(Ok(HttpMetricsMiddleware { - service, - histogram, - })) + ready(Ok(HttpMetricsMiddleware { service })) } } pub struct HttpMetricsMiddleware { service: S, - histogram: Histogram, } impl Service for HttpMetricsMiddleware @@ -66,7 +56,6 @@ where // We get the route pattern. In Actix, req.match_pattern() returns the matched route let route = req.match_pattern().unwrap_or_else(|| req.path().to_string()); - let histogram = self.histogram.clone(); let fut = self.service.call(req); Box::pin(async move { @@ -86,7 +75,12 @@ where attributes.push(KeyValue::new(otel::ERROR_TYPE, status.to_string())); } - histogram.record(duration, &attributes); + if let Some(app_state) = res.request().app_data::>() { + app_state + .telemetry_metrics + .http_request_duration + .record(duration, &attributes); + } Ok(res) }) From d85bbcef6eea96c5caa67a0a124003a7b286f3ee Mon Sep 17 00:00:00 2001 From: lovasoa Date: Sat, 14 Mar 2026 18:24:59 +0100 Subject: [PATCH 15/27] fix(telemetry): add Default impl for TelemetryMetrics --- src/telemetry_metrics.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/telemetry_metrics.rs b/src/telemetry_metrics.rs index 54073210..da4bfc40 100644 --- a/src/telemetry_metrics.rs +++ b/src/telemetry_metrics.rs @@ -8,6 +8,12 @@ pub struct TelemetryMetrics { pub db_query_duration: Histogram, } +impl Default for TelemetryMetrics { + fn default() -> Self { + Self::new() + } +} + impl TelemetryMetrics { #[must_use] pub fn new() -> Self { From cb6eb499561b0926894bdb8e783268e39c57d2e3 Mon Sep 17 00:00:00 2001 From: lovasoa Date: Sat, 14 Mar 2026 18:32:35 +0100 Subject: [PATCH 16/27] Add app-scoped telemetry metrics for HTTP --- src/lib.rs | 2 +- src/webserver/http_metrics.rs | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 464a8695..06991f15 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -90,8 +90,8 @@ use crate::webserver::oidc::OidcState; use file_cache::FileCache; use std::path::{Path, PathBuf}; use std::sync::Arc; -use templates::AllTemplates; use telemetry_metrics::TelemetryMetrics; +use templates::AllTemplates; use webserver::Database; /// `TEMPLATES_DIR` is the directory where .handlebars files are stored diff --git a/src/webserver/http_metrics.rs b/src/webserver/http_metrics.rs index 2e54292a..ea8bb4b8 100644 --- a/src/webserver/http_metrics.rs +++ b/src/webserver/http_metrics.rs @@ -52,10 +52,12 @@ where let scheme = http_scheme(connection_info.scheme()).to_string(); let host = connection_info.host().to_string(); drop(connection_info); - + // We get the route pattern. In Actix, req.match_pattern() returns the matched route - let route = req.match_pattern().unwrap_or_else(|| req.path().to_string()); - + let route = req + .match_pattern() + .unwrap_or_else(|| req.path().to_string()); + let fut = self.service.call(req); Box::pin(async move { From 3a8ac44133142238fefd88ede89c088c00d3167b Mon Sep 17 00:00:00 2001 From: lovasoa Date: Sat, 14 Mar 2026 18:32:39 +0100 Subject: [PATCH 17/27] Refactor db query metrics recording --- src/webserver/database/execute_queries.rs | 140 ++++++++++++++-------- 1 file changed, 92 insertions(+), 48 deletions(-) diff --git a/src/webserver/database/execute_queries.rs b/src/webserver/database/execute_queries.rs index fed669c1..65d862e1 100644 --- a/src/webserver/database/execute_queries.rs +++ b/src/webserver/database/execute_queries.rs @@ -34,8 +34,8 @@ fn source_line_number(line: usize) -> i64 { i64::try_from(line).unwrap_or(i64::MAX) } +use crate::telemetry_metrics::TelemetryMetrics; use opentelemetry_semantic_conventions::attribute as otel; -use opentelemetry_semantic_conventions::metric as otel_metric; fn record_query_params(span: &tracing::Span, params: &[Option]) { use tracing_opentelemetry::OpenTelemetrySpanExt; @@ -49,44 +49,68 @@ fn record_query_params(span: &tracing::Span, params: &[Option]) { } } -fn record_db_query_success(span: &tracing::Span, returned_rows: i64, start_time: std::time::Instant, db_system_name: &'static str, operation_name: String) { - span.record(otel::DB_RESPONSE_RETURNED_ROWS, returned_rows); - span.record(otel::OTEL_STATUS_CODE, "OK"); - let duration = start_time.elapsed().as_secs_f64(); - let histogram = opentelemetry::global::meter("sqlpage") - .f64_histogram(otel_metric::DB_CLIENT_OPERATION_DURATION) - .with_unit("s") - .with_description("Duration of executing SQL queries.") - .build(); - let attributes = [ - opentelemetry::KeyValue::new(otel::DB_SYSTEM_NAME, db_system_name), - opentelemetry::KeyValue::new(otel::DB_OPERATION_NAME, operation_name), - opentelemetry::KeyValue::new(otel::OTEL_STATUS_CODE, "OK"), - ]; - histogram.record(duration, &attributes); +struct DbQueryMetricsContext<'a> { + span: tracing::Span, + start_time: std::time::Instant, + db_system_name: &'static str, + operation_name: String, + metrics: &'a TelemetryMetrics, } -fn record_db_query_error(span: &tracing::Span, returned_rows: i64, error: &anyhow::Error, start_time: std::time::Instant, db_system_name: &'static str, operation_name: String) { - span.record(otel::DB_RESPONSE_RETURNED_ROWS, returned_rows); - span.record(otel::OTEL_STATUS_CODE, "ERROR"); - span.record(otel::EXCEPTION_MESSAGE, tracing::field::display(error)); - span.record("exception.details", tracing::field::debug(error)); - let duration = start_time.elapsed().as_secs_f64(); - let histogram = opentelemetry::global::meter("sqlpage") - .f64_histogram(otel_metric::DB_CLIENT_OPERATION_DURATION) - .with_unit("s") - .with_description("Duration of executing SQL queries.") - .build(); - let attributes = [ - opentelemetry::KeyValue::new(otel::DB_SYSTEM_NAME, db_system_name), - opentelemetry::KeyValue::new(otel::DB_OPERATION_NAME, operation_name), - opentelemetry::KeyValue::new(otel::OTEL_STATUS_CODE, "ERROR"), - opentelemetry::KeyValue::new(otel::ERROR_TYPE, error.to_string()), - ]; - histogram.record(duration, &attributes); +impl<'a> DbQueryMetricsContext<'a> { + fn new( + span: tracing::Span, + operation_name: String, + db_system_name: &'static str, + metrics: &'a TelemetryMetrics, + ) -> Self { + Self { + span, + start_time: std::time::Instant::now(), + db_system_name, + operation_name, + metrics, + } + } + + fn record_success(&self, returned_rows: i64) { + self.span + .record(otel::DB_RESPONSE_RETURNED_ROWS, returned_rows); + self.span.record(otel::OTEL_STATUS_CODE, "OK"); + let duration = self.start_time.elapsed().as_secs_f64(); + let attributes = [ + opentelemetry::KeyValue::new(otel::DB_SYSTEM_NAME, self.db_system_name), + opentelemetry::KeyValue::new(otel::DB_OPERATION_NAME, self.operation_name.clone()), + opentelemetry::KeyValue::new(otel::OTEL_STATUS_CODE, "OK"), + ]; + self.metrics.db_query_duration.record(duration, &attributes); + } + + fn record_error(&self, returned_rows: i64, error: &anyhow::Error) { + self.span + .record(otel::DB_RESPONSE_RETURNED_ROWS, returned_rows); + self.span.record(otel::OTEL_STATUS_CODE, "ERROR"); + self.span + .record(otel::EXCEPTION_MESSAGE, tracing::field::display(error)); + self.span + .record("exception.details", tracing::field::debug(error)); + let duration = self.start_time.elapsed().as_secs_f64(); + let attributes = [ + opentelemetry::KeyValue::new(otel::DB_SYSTEM_NAME, self.db_system_name), + opentelemetry::KeyValue::new(otel::DB_OPERATION_NAME, self.operation_name.clone()), + opentelemetry::KeyValue::new(otel::OTEL_STATUS_CODE, "ERROR"), + opentelemetry::KeyValue::new(otel::ERROR_TYPE, error.to_string()), + ]; + self.metrics.db_query_duration.record(duration, &attributes); + } } -fn create_db_query_span(sql: &str, source_file: &Path, line: usize, db_system_name: &'static str) -> (tracing::Span, String) { +fn create_db_query_span( + sql: &str, + source_file: &Path, + line: usize, + db_system_name: &'static str, +) -> (tracing::Span, String) { let operation_name = sql.split_whitespace().next().unwrap_or("").to_uppercase(); let span = tracing::info_span!( "db.query", @@ -139,10 +163,19 @@ pub fn stream_query_results_with_conn<'a>( let connection = take_connection(&request.app_state.db, db_connection, request).await?; log::trace!("Executing query {:?}", query.sql); let db_system_name = request.app_state.db.info.database_type.otel_name(); - let (query_span, operation_name) = create_db_query_span(query.sql, source_file, stmt.query_position.start.line, db_system_name); - record_query_params(&query_span, &query.param_values); - - let start_time = std::time::Instant::now(); + let (query_span, operation_name) = create_db_query_span( + query.sql, + source_file, + stmt.query_position.start.line, + db_system_name, + ); + let query_metrics = DbQueryMetricsContext::new( + query_span.clone(), + operation_name, + db_system_name, + &request.app_state.telemetry_metrics, + ); + record_query_params(&query_metrics.span, &query.param_values); let mut stream = connection.fetch_many(query); let mut error = None; let mut returned_rows: i64 = 0; @@ -169,11 +202,11 @@ pub fn stream_query_results_with_conn<'a>( } drop(stream); if let Some(error) = error { - record_db_query_error(&query_span, returned_rows, &error, start_time, db_system_name, operation_name); + query_metrics.record_error(returned_rows, &error); try_rollback_transaction(connection).await; yield DbItem::Error(error); } else { - record_db_query_success(&query_span, returned_rows, start_time, db_system_name, operation_name); + query_metrics.record_success(returned_rows); } }, ParsedStatement::SetVariable { variable, value} => { @@ -316,25 +349,30 @@ async fn execute_set_variable_query<'a>( statement.query_position.start.line, db_system_name, ); - record_query_params(&query_span, &query.param_values); - let start_time = std::time::Instant::now(); + let query_metrics = DbQueryMetricsContext::new( + query_span.clone(), + operation_name, + db_system_name, + &request.app_state.telemetry_metrics, + ); + record_query_params(&query_metrics.span, &query.param_values); let value = match connection .fetch_optional(query) .instrument(query_span.clone()) .await { Ok(Some(row)) => { - record_db_query_success(&query_span, 1_i64, start_time, db_system_name, operation_name); + query_metrics.record_success(1_i64); row_to_string(&row) } Ok(None) => { - record_db_query_success(&query_span, 0_i64, start_time, db_system_name, operation_name); + query_metrics.record_success(0_i64); None } Err(e) => { try_rollback_transaction(connection).await; let err = display_stmt_db_error(source_file, statement, e); - record_db_query_error(&query_span, 0_i64, &err, start_time, db_system_name, operation_name); + query_metrics.record_error(0_i64, &err); return Err(err); } }; @@ -833,7 +871,10 @@ mod tests { exception.details = tracing::field::Empty, db.response.returned_rows = tracing::field::Empty, ); - record_db_query_success(&span, 3, std::time::Instant::now(), "sqlite", "SELECT".to_string()); + let metrics = crate::telemetry_metrics::TelemetryMetrics::new(); + let query_metrics = + DbQueryMetricsContext::new(span.clone(), "SELECT".to_string(), "sqlite", &metrics); + query_metrics.record_success(3); drop(span); }); @@ -854,7 +895,10 @@ mod tests { db.response.returned_rows = tracing::field::Empty, ); let error = anyhow!("query failed").context("while executing SELECT 1"); - record_db_query_error(&span, 2, &error, std::time::Instant::now(), "sqlite", "SELECT".to_string()); + let metrics = crate::telemetry_metrics::TelemetryMetrics::new(); + let query_metrics = + DbQueryMetricsContext::new(span.clone(), "SELECT".to_string(), "sqlite", &metrics); + query_metrics.record_error(2, &error); drop(span); }); From a8682e5508f717fcbd2a6d9724673ff0577b4ab7 Mon Sep 17 00:00:00 2001 From: lovasoa Date: Sat, 14 Mar 2026 18:32:48 +0100 Subject: [PATCH 18/27] Use semantic convention attribute names --- src/telemetry.rs | 4 ++-- src/webserver/database/sqlpage_functions/functions.rs | 2 +- src/webserver/mod.rs | 2 +- src/webserver/oidc.rs | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/telemetry.rs b/src/telemetry.rs index 6192c15b..06519e08 100644 --- a/src/telemetry.rs +++ b/src/telemetry.rs @@ -9,8 +9,8 @@ use std::env; use std::sync::OnceLock; -use opentelemetry_sdk::trace::SdkTracerProvider; use opentelemetry_sdk::metrics::SdkMeterProvider; +use opentelemetry_sdk::trace::SdkTracerProvider; static TRACER_PROVIDER: OnceLock = OnceLock::new(); static METER_PROVIDER: OnceLock = OnceLock::new(); @@ -172,7 +172,7 @@ mod logfmt { } } -use opentelemetry_semantic_conventions::attribute as otel; + use opentelemetry_semantic_conventions::attribute as otel; /// Fields we pick from spans, in display order. /// (`span_field_name`, `logfmt_key`) const SPAN_FIELDS: &[(&str, &str)] = &[ diff --git a/src/webserver/database/sqlpage_functions/functions.rs b/src/webserver/database/sqlpage_functions/functions.rs index 00c0b86d..e756b0d9 100644 --- a/src/webserver/database/sqlpage_functions/functions.rs +++ b/src/webserver/database/sqlpage_functions/functions.rs @@ -13,9 +13,9 @@ use crate::webserver::{ use anyhow::{anyhow, Context}; use futures_util::StreamExt; use mime_guess::mime; +use opentelemetry_semantic_conventions::attribute as otel; use std::fmt::Write; use std::{borrow::Cow, ffi::OsStr, str::FromStr}; -use opentelemetry_semantic_conventions::attribute as otel; use tracing::Instrument; super::function_definition_macro::sqlpage_functions! { diff --git a/src/webserver/mod.rs b/src/webserver/mod.rs index 1f834974..a692616d 100644 --- a/src/webserver/mod.rs +++ b/src/webserver/mod.rs @@ -34,8 +34,8 @@ pub mod database; mod error; pub mod error_with_status; pub mod http; -pub mod http_metrics; pub mod http_client; +pub mod http_metrics; pub mod http_request_info; mod https; pub mod request_variables; diff --git a/src/webserver/oidc.rs b/src/webserver/oidc.rs index b432108e..8e26ad43 100644 --- a/src/webserver/oidc.rs +++ b/src/webserver/oidc.rs @@ -34,8 +34,8 @@ use openidconnect::{ EmptyExtraTokenFields, IdTokenFields, IdTokenVerifier, StandardErrorResponse, StandardTokenResponse, }; -use serde::{Deserialize, Serialize}; use opentelemetry_semantic_conventions::attribute as otel; +use serde::{Deserialize, Serialize}; use tracing::Instrument; use super::error::anyhow_err_to_actix_resp; From 508282034224f3f91a35b8267816f734a9f5bf4d Mon Sep 17 00:00:00 2001 From: lovasoa Date: Sat, 14 Mar 2026 18:32:53 +0100 Subject: [PATCH 19/27] Track db pool metrics during lifecycle --- src/webserver/database/connect.rs | 8 ++- src/webserver/database/mod.rs | 2 +- src/webserver/database/pool_metrics.rs | 67 ++++++++++++++++---------- 3 files changed, 48 insertions(+), 29 deletions(-) diff --git a/src/webserver/database/connect.rs b/src/webserver/database/connect.rs index b392fc02..67dd441f 100644 --- a/src/webserver/database/connect.rs +++ b/src/webserver/database/connect.rs @@ -10,7 +10,7 @@ use anyhow::Context; use futures_util::future::BoxFuture; use sqlx::odbc::OdbcConnectOptions; use sqlx::{ - any::{Any, AnyConnectOptions, AnyKind, AnyConnection}, + any::{Any, AnyConnectOptions, AnyConnection, AnyKind}, pool::PoolOptions, sqlite::{Function, SqliteConnectOptions, SqliteFunctionCtx}, ConnectOptions, Connection, Executor, @@ -74,7 +74,11 @@ impl Database { }) } - fn create_pool_options(config: &AppConfig, kind: AnyKind, database_type: SupportedDatabase) -> PoolOptions { + fn create_pool_options( + config: &AppConfig, + kind: AnyKind, + database_type: SupportedDatabase, + ) -> PoolOptions { let mut pool_options = PoolOptions::new() .max_connections(if let Some(max) = config.max_database_pool_connections { max diff --git a/src/webserver/database/mod.rs b/src/webserver/database/mod.rs index 26db4650..832dc62a 100644 --- a/src/webserver/database/mod.rs +++ b/src/webserver/database/mod.rs @@ -3,9 +3,9 @@ mod connect; mod csv_import; pub mod execute_queries; pub mod migrations; +mod pool_metrics; mod sql; mod sqlpage_functions; -mod pool_metrics; mod syntax_tree; mod error_highlighting; diff --git a/src/webserver/database/pool_metrics.rs b/src/webserver/database/pool_metrics.rs index 6c4b6138..f3a2eab3 100644 --- a/src/webserver/database/pool_metrics.rs +++ b/src/webserver/database/pool_metrics.rs @@ -1,5 +1,5 @@ -use opentelemetry::{global, KeyValue}; use opentelemetry::metrics::UpDownCounter; +use opentelemetry::{global, KeyValue}; use opentelemetry_semantic_conventions::attribute as otel; use opentelemetry_semantic_conventions::metric as otel_metric; @@ -13,37 +13,52 @@ fn get_counter() -> UpDownCounter { pub fn on_acquire(db_system_name: &'static str) { let counter = get_counter(); - counter.add(1, &[ - KeyValue::new(otel::DB_SYSTEM_NAME, db_system_name), - KeyValue::new(otel::DB_CLIENT_CONNECTION_POOL_NAME, "sqlpage"), - KeyValue::new(otel::DB_CLIENT_CONNECTION_STATE, "used"), - ]); - counter.add(-1, &[ - KeyValue::new(otel::DB_SYSTEM_NAME, db_system_name), - KeyValue::new(otel::DB_CLIENT_CONNECTION_POOL_NAME, "sqlpage"), - KeyValue::new(otel::DB_CLIENT_CONNECTION_STATE, "idle"), - ]); + counter.add( + 1, + &[ + KeyValue::new(otel::DB_SYSTEM_NAME, db_system_name), + KeyValue::new(otel::DB_CLIENT_CONNECTION_POOL_NAME, "sqlpage"), + KeyValue::new(otel::DB_CLIENT_CONNECTION_STATE, "used"), + ], + ); + counter.add( + -1, + &[ + KeyValue::new(otel::DB_SYSTEM_NAME, db_system_name), + KeyValue::new(otel::DB_CLIENT_CONNECTION_POOL_NAME, "sqlpage"), + KeyValue::new(otel::DB_CLIENT_CONNECTION_STATE, "idle"), + ], + ); } pub fn on_release(db_system_name: &'static str) { let counter = get_counter(); - counter.add(-1, &[ - KeyValue::new(otel::DB_SYSTEM_NAME, db_system_name), - KeyValue::new(otel::DB_CLIENT_CONNECTION_POOL_NAME, "sqlpage"), - KeyValue::new(otel::DB_CLIENT_CONNECTION_STATE, "used"), - ]); - counter.add(1, &[ - KeyValue::new(otel::DB_SYSTEM_NAME, db_system_name), - KeyValue::new(otel::DB_CLIENT_CONNECTION_POOL_NAME, "sqlpage"), - KeyValue::new(otel::DB_CLIENT_CONNECTION_STATE, "idle"), - ]); + counter.add( + -1, + &[ + KeyValue::new(otel::DB_SYSTEM_NAME, db_system_name), + KeyValue::new(otel::DB_CLIENT_CONNECTION_POOL_NAME, "sqlpage"), + KeyValue::new(otel::DB_CLIENT_CONNECTION_STATE, "used"), + ], + ); + counter.add( + 1, + &[ + KeyValue::new(otel::DB_SYSTEM_NAME, db_system_name), + KeyValue::new(otel::DB_CLIENT_CONNECTION_POOL_NAME, "sqlpage"), + KeyValue::new(otel::DB_CLIENT_CONNECTION_STATE, "idle"), + ], + ); } pub fn on_connect(db_system_name: &'static str) { let counter = get_counter(); - counter.add(1, &[ - KeyValue::new(otel::DB_SYSTEM_NAME, db_system_name), - KeyValue::new(otel::DB_CLIENT_CONNECTION_POOL_NAME, "sqlpage"), - KeyValue::new(otel::DB_CLIENT_CONNECTION_STATE, "idle"), - ]); + counter.add( + 1, + &[ + KeyValue::new(otel::DB_SYSTEM_NAME, db_system_name), + KeyValue::new(otel::DB_CLIENT_CONNECTION_POOL_NAME, "sqlpage"), + KeyValue::new(otel::DB_CLIENT_CONNECTION_STATE, "idle"), + ], + ); } From ef0d1cec8e996b1a058d287d82a7528aee18fa88 Mon Sep 17 00:00:00 2001 From: lovasoa Date: Sat, 14 Mar 2026 18:32:57 +0100 Subject: [PATCH 20/27] Adjust telemetry docker compose --- examples/telemetry/docker-compose.yml | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/examples/telemetry/docker-compose.yml b/examples/telemetry/docker-compose.yml index fcc2b776..53d0ccb2 100644 --- a/examples/telemetry/docker-compose.yml +++ b/examples/telemetry/docker-compose.yml @@ -113,8 +113,10 @@ services: - "1514:1514/udp" - "1516:1516/udp" depends_on: - - tempo - - postgres + tempo: + condition: service_started + postgres: + condition: service_started loki: condition: service_healthy From 33e8632ac78550100f1616f31d4bb4498c30bbc9 Mon Sep 17 00:00:00 2001 From: lovasoa Date: Sat, 14 Mar 2026 18:49:27 +0100 Subject: [PATCH 21/27] fix(metrics): configure explicit histogram boundaries and fix database latency measurement --- src/telemetry.rs | 26 +++++++++++++++++- src/webserver/database/execute_queries.rs | 33 ++++++++++++++++------- 2 files changed, 49 insertions(+), 10 deletions(-) diff --git a/src/telemetry.rs b/src/telemetry.rs index 06519e08..a322417e 100644 --- a/src/telemetry.rs +++ b/src/telemetry.rs @@ -85,7 +85,31 @@ fn init_otel_tracing() { .expect("Failed to build OTLP metric exporter"); let reader = opentelemetry_sdk::metrics::PeriodicReader::builder(metric_exporter).build(); - let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); + let meter_provider = SdkMeterProvider::builder() + .with_reader(reader) + .with_view(|instrument: &opentelemetry_sdk::metrics::Instrument| { + if instrument.name() == "http.server.request.duration" + || instrument.name() == "db.client.operation.duration" + { + Some( + opentelemetry_sdk::metrics::Stream::builder() + .with_aggregation( + opentelemetry_sdk::metrics::Aggregation::ExplicitBucketHistogram { + boundaries: vec![ + 0.001, 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, + 1.0, 2.5, 5.0, 7.5, 10.0, + ], + record_min_max: true, + }, + ) + .build() + .expect("Failed to build metrics stream"), + ) + } else { + None + } + }) + .build(); global::set_meter_provider(meter_provider.clone()); let _ = METER_PROVIDER.set(meter_provider.clone()); diff --git a/src/webserver/database/execute_queries.rs b/src/webserver/database/execute_queries.rs index 65d862e1..b136f16b 100644 --- a/src/webserver/database/execute_queries.rs +++ b/src/webserver/database/execute_queries.rs @@ -51,7 +51,7 @@ fn record_query_params(span: &tracing::Span, params: &[Option]) { struct DbQueryMetricsContext<'a> { span: tracing::Span, - start_time: std::time::Instant, + duration: std::time::Duration, db_system_name: &'static str, operation_name: String, metrics: &'a TelemetryMetrics, @@ -66,24 +66,29 @@ impl<'a> DbQueryMetricsContext<'a> { ) -> Self { Self { span, - start_time: std::time::Instant::now(), + duration: std::time::Duration::ZERO, db_system_name, operation_name, metrics, } } + fn add_duration(&mut self, duration: std::time::Duration) { + self.duration += duration; + } + fn record_success(&self, returned_rows: i64) { self.span .record(otel::DB_RESPONSE_RETURNED_ROWS, returned_rows); self.span.record(otel::OTEL_STATUS_CODE, "OK"); - let duration = self.start_time.elapsed().as_secs_f64(); let attributes = [ opentelemetry::KeyValue::new(otel::DB_SYSTEM_NAME, self.db_system_name), opentelemetry::KeyValue::new(otel::DB_OPERATION_NAME, self.operation_name.clone()), opentelemetry::KeyValue::new(otel::OTEL_STATUS_CODE, "OK"), ]; - self.metrics.db_query_duration.record(duration, &attributes); + self.metrics + .db_query_duration + .record(self.duration.as_secs_f64(), &attributes); } fn record_error(&self, returned_rows: i64, error: &anyhow::Error) { @@ -94,14 +99,15 @@ impl<'a> DbQueryMetricsContext<'a> { .record(otel::EXCEPTION_MESSAGE, tracing::field::display(error)); self.span .record("exception.details", tracing::field::debug(error)); - let duration = self.start_time.elapsed().as_secs_f64(); let attributes = [ opentelemetry::KeyValue::new(otel::DB_SYSTEM_NAME, self.db_system_name), opentelemetry::KeyValue::new(otel::DB_OPERATION_NAME, self.operation_name.clone()), opentelemetry::KeyValue::new(otel::OTEL_STATUS_CODE, "ERROR"), opentelemetry::KeyValue::new(otel::ERROR_TYPE, error.to_string()), ]; - self.metrics.db_query_duration.record(duration, &attributes); + self.metrics + .db_query_duration + .record(self.duration.as_secs_f64(), &attributes); } } @@ -169,7 +175,7 @@ pub fn stream_query_results_with_conn<'a>( stmt.query_position.start.line, db_system_name, ); - let query_metrics = DbQueryMetricsContext::new( + let mut query_metrics = DbQueryMetricsContext::new( query_span.clone(), operation_name, db_system_name, @@ -179,7 +185,12 @@ pub fn stream_query_results_with_conn<'a>( let mut stream = connection.fetch_many(query); let mut error = None; let mut returned_rows: i64 = 0; - while let Some(elem) = stream.next().instrument(query_span.clone()).await { + loop { + let start_next = std::time::Instant::now(); + let next_elem = stream.next().instrument(query_span.clone()).await; + query_metrics.add_duration(start_next.elapsed()); + let Some(elem) = next_elem else { break; }; + let mut query_result = parse_single_sql_result(source_file, stmt, elem); if let DbItem::Error(e) = query_result { error = Some(e); @@ -349,27 +360,31 @@ async fn execute_set_variable_query<'a>( statement.query_position.start.line, db_system_name, ); - let query_metrics = DbQueryMetricsContext::new( + let mut query_metrics = DbQueryMetricsContext::new( query_span.clone(), operation_name, db_system_name, &request.app_state.telemetry_metrics, ); record_query_params(&query_metrics.span, &query.param_values); + let start_time = std::time::Instant::now(); let value = match connection .fetch_optional(query) .instrument(query_span.clone()) .await { Ok(Some(row)) => { + query_metrics.add_duration(start_time.elapsed()); query_metrics.record_success(1_i64); row_to_string(&row) } Ok(None) => { + query_metrics.add_duration(start_time.elapsed()); query_metrics.record_success(0_i64); None } Err(e) => { + query_metrics.add_duration(start_time.elapsed()); try_rollback_transaction(connection).await; let err = display_stmt_db_error(source_file, statement, e); query_metrics.record_error(0_i64, &err); From 0c46af7721e0bd803509b871ad4d9cb69ca2f0ad Mon Sep 17 00:00:00 2001 From: lovasoa Date: Sat, 14 Mar 2026 19:12:06 +0100 Subject: [PATCH 22/27] Enable metrics views for explicit histograms --- Cargo.toml | 2 +- src/telemetry.rs | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 0336cd84..e51cc5bd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -88,7 +88,7 @@ tracing-opentelemetry = "0.32" tracing-actix-web = { version = "0.7", default-features = false, features = ["opentelemetry_0_31"] } tracing-log = "0.2" opentelemetry = "0.31" -opentelemetry_sdk = { version = "0.31", features = ["metrics", "rt-tokio-current-thread"] } +opentelemetry_sdk = { version = "0.31", features = ["metrics", "rt-tokio-current-thread", "spec_unstable_metrics_views"] } opentelemetry-otlp = { version = "0.31", features = ["http-proto", "grpc-tonic", "metrics"] } opentelemetry-semantic-conventions = { version = "0.31", features = ["semconv_experimental"] } diff --git a/src/telemetry.rs b/src/telemetry.rs index a322417e..46a47757 100644 --- a/src/telemetry.rs +++ b/src/telemetry.rs @@ -88,9 +88,7 @@ fn init_otel_tracing() { let meter_provider = SdkMeterProvider::builder() .with_reader(reader) .with_view(|instrument: &opentelemetry_sdk::metrics::Instrument| { - if instrument.name() == "http.server.request.duration" - || instrument.name() == "db.client.operation.duration" - { + if instrument.kind() == opentelemetry_sdk::metrics::InstrumentKind::Histogram { Some( opentelemetry_sdk::metrics::Stream::builder() .with_aggregation( From 2848690e9eb270daa3ab68b7a864c6d4a2ea02e4 Mon Sep 17 00:00:00 2001 From: lovasoa Date: Sat, 14 Mar 2026 20:13:38 +0100 Subject: [PATCH 23/27] Use observable gauge for pool connection count --- src/lib.rs | 2 +- src/telemetry_metrics.rs | 67 +++++++++++++++++++++-- src/webserver/database/connect.rs | 26 ++------- src/webserver/database/execute_queries.rs | 19 ++++++- src/webserver/database/mod.rs | 1 - src/webserver/database/pool_metrics.rs | 64 ---------------------- 6 files changed, 86 insertions(+), 93 deletions(-) delete mode 100644 src/webserver/database/pool_metrics.rs diff --git a/src/lib.rs b/src/lib.rs index 06991f15..34e3e8d7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -136,7 +136,7 @@ impl AppState { ); let oidc_state = crate::webserver::oidc::initialize_oidc_state(config).await?; - let telemetry_metrics = TelemetryMetrics::new(); + let telemetry_metrics = TelemetryMetrics::new(db.info.database_type.otel_name()); Ok(AppState { db, diff --git a/src/telemetry_metrics.rs b/src/telemetry_metrics.rs index da4bfc40..7fd5143d 100644 --- a/src/telemetry_metrics.rs +++ b/src/telemetry_metrics.rs @@ -1,23 +1,37 @@ use opentelemetry::global; -use opentelemetry::metrics::Histogram; +use opentelemetry::metrics::{Histogram, ObservableGauge}; +use opentelemetry_semantic_conventions::attribute as otel; use opentelemetry_semantic_conventions::metric as otel_metric; +use std::sync::atomic::{AtomicI64, Ordering}; +use std::sync::Arc; + +struct PoolConnectionSnapshot { + used: AtomicI64, + idle: AtomicI64, +} -#[derive(Clone)] pub struct TelemetryMetrics { pub http_request_duration: Histogram, pub db_query_duration: Histogram, + _pool_connection_count: ObservableGauge, + pool_snapshot: Arc, + db_system_name: &'static str, } impl Default for TelemetryMetrics { fn default() -> Self { - Self::new() + Self::new("other_sql") } } impl TelemetryMetrics { #[must_use] - pub fn new() -> Self { + pub fn new(db_system_name: &'static str) -> Self { let meter = global::meter("sqlpage"); + let pool_snapshot = Arc::new(PoolConnectionSnapshot { + used: AtomicI64::new(0), + idle: AtomicI64::new(0), + }); let http_request_duration = meter .f64_histogram(otel_metric::HTTP_SERVER_REQUEST_DURATION) .with_unit("s") @@ -28,10 +42,55 @@ impl TelemetryMetrics { .with_unit("s") .with_description("Duration of executing SQL queries.") .build(); + let snapshot_ref = Arc::clone(&pool_snapshot); + let pool_connection_count = meter + .i64_observable_gauge(otel_metric::DB_CLIENT_CONNECTION_COUNT) + .with_unit("{connection}") + .with_description("Number of connections in the database pool.") + .with_callback(move |observer| { + let used = snapshot_ref.used.load(Ordering::Relaxed); + let idle = snapshot_ref.idle.load(Ordering::Relaxed); + observer.observe( + used, + &[ + opentelemetry::KeyValue::new(otel::DB_SYSTEM_NAME, db_system_name), + opentelemetry::KeyValue::new( + otel::DB_CLIENT_CONNECTION_POOL_NAME, + "sqlpage", + ), + opentelemetry::KeyValue::new(otel::DB_CLIENT_CONNECTION_STATE, "used"), + ], + ); + observer.observe( + idle, + &[ + opentelemetry::KeyValue::new(otel::DB_SYSTEM_NAME, db_system_name), + opentelemetry::KeyValue::new( + otel::DB_CLIENT_CONNECTION_POOL_NAME, + "sqlpage", + ), + opentelemetry::KeyValue::new(otel::DB_CLIENT_CONNECTION_STATE, "idle"), + ], + ); + }) + .build(); Self { http_request_duration, db_query_duration, + _pool_connection_count: pool_connection_count, + pool_snapshot, + db_system_name, + } + } + + pub fn record_pool_snapshot(&self, db_system_name: &'static str, size: u32, idle: usize) { + if db_system_name != self.db_system_name { + return; } + let idle = i64::try_from(idle).unwrap_or(i64::MAX); + let used = i64::from(size).saturating_sub(idle); + self.pool_snapshot.used.store(used, Ordering::Relaxed); + self.pool_snapshot.idle.store(idle, Ordering::Relaxed); } } diff --git a/src/webserver/database/connect.rs b/src/webserver/database/connect.rs index 67dd441f..0cac02b0 100644 --- a/src/webserver/database/connect.rs +++ b/src/webserver/database/connect.rs @@ -58,7 +58,7 @@ impl Database { drop(conn); let db_kind = connect_options.kind(); - let pool = Self::create_pool_options(config, db_kind, database_type) + let pool = Self::create_pool_options(config, db_kind) .connect_with(connect_options) .await .with_context(|| format!("Unable to open connection pool to {database_url}"))?; @@ -74,11 +74,7 @@ impl Database { }) } - fn create_pool_options( - config: &AppConfig, - kind: AnyKind, - database_type: SupportedDatabase, - ) -> PoolOptions { + fn create_pool_options(config: &AppConfig, kind: AnyKind) -> PoolOptions { let mut pool_options = PoolOptions::new() .max_connections(if let Some(max) = config.max_database_pool_connections { max @@ -102,22 +98,13 @@ impl Database { .acquire_timeout(Duration::from_secs_f64( config.database_connection_acquire_timeout_seconds, )); - let db_system_name = database_type.otel_name(); - pool_options = add_on_return_to_pool(config, pool_options, db_system_name); - pool_options = add_on_connection_handler(config, pool_options, db_system_name); - pool_options = pool_options.before_acquire(move |_, _| { - super::pool_metrics::on_acquire(db_system_name); - Box::pin(async move { Ok(true) }) - }); + pool_options = add_on_return_to_pool(config, pool_options); + pool_options = add_on_connection_handler(config, pool_options); pool_options } } -fn add_on_return_to_pool( - config: &AppConfig, - pool_options: PoolOptions, - db_system_name: &'static str, -) -> PoolOptions { +fn add_on_return_to_pool(config: &AppConfig, pool_options: PoolOptions) -> PoolOptions { let on_disconnect_file = config.configuration_directory.join(ON_RESET_FILE); let sql = if on_disconnect_file.exists() { match std::fs::read_to_string(&on_disconnect_file) { @@ -137,7 +124,6 @@ fn add_on_return_to_pool( pool_options.after_release(move |conn, meta| { let sql = sql.clone(); - super::pool_metrics::on_release(db_system_name); Box::pin(async move { if let Some(sql) = sql { on_return_to_pool(conn, meta, sql).await @@ -171,7 +157,6 @@ fn on_return_to_pool( fn add_on_connection_handler( config: &AppConfig, pool_options: PoolOptions, - db_system_name: &'static str, ) -> PoolOptions { let on_connect_file = config.configuration_directory.join(ON_CONNECT_FILE); let sql = if on_connect_file.exists() { @@ -192,7 +177,6 @@ fn add_on_connection_handler( pool_options.after_connect(move |conn, _| { let sql = sql.clone(); - super::pool_metrics::on_connect(db_system_name); Box::pin(async move { if let Some(sql) = sql { log::debug!("Running connection handler on new connection"); diff --git a/src/webserver/database/execute_queries.rs b/src/webserver/database/execute_queries.rs index b136f16b..615021a0 100644 --- a/src/webserver/database/execute_queries.rs +++ b/src/webserver/database/execute_queries.rs @@ -214,10 +214,12 @@ pub fn stream_query_results_with_conn<'a>( drop(stream); if let Some(error) = error { query_metrics.record_error(returned_rows, &error); + record_pool_snapshot(request); try_rollback_transaction(connection).await; yield DbItem::Error(error); } else { query_metrics.record_success(returned_rows); + record_pool_snapshot(request); } }, ParsedStatement::SetVariable { variable, value} => { @@ -376,11 +378,13 @@ async fn execute_set_variable_query<'a>( Ok(Some(row)) => { query_metrics.add_duration(start_time.elapsed()); query_metrics.record_success(1_i64); + record_pool_snapshot(request); row_to_string(&row) } Ok(None) => { query_metrics.add_duration(start_time.elapsed()); query_metrics.record_success(0_i64); + record_pool_snapshot(request); None } Err(e) => { @@ -388,6 +392,7 @@ async fn execute_set_variable_query<'a>( try_rollback_transaction(connection).await; let err = display_stmt_db_error(source_file, statement, e); query_metrics.record_error(0_i64, &err); + record_pool_snapshot(request); return Err(err); } }; @@ -445,6 +450,15 @@ fn vars_and_name<'a, 'b>( } } +fn record_pool_snapshot(request: &ExecutionContext) { + let db = &request.app_state.db; + request.app_state.telemetry_metrics.record_pool_snapshot( + db.info.database_type.otel_name(), + db.connection.size(), + db.connection.num_idle(), + ); +} + async fn take_connection<'a>( db: &'a Database, conn: &'a mut DbConn, @@ -462,6 +476,7 @@ async fn take_connection<'a>( *conn = Some(c); let connection = conn.as_mut().unwrap(); set_trace_context(connection, db).await; + record_pool_snapshot(request); Ok(connection) } Err(e) => { @@ -886,7 +901,7 @@ mod tests { exception.details = tracing::field::Empty, db.response.returned_rows = tracing::field::Empty, ); - let metrics = crate::telemetry_metrics::TelemetryMetrics::new(); + let metrics = crate::telemetry_metrics::TelemetryMetrics::new("sqlite"); let query_metrics = DbQueryMetricsContext::new(span.clone(), "SELECT".to_string(), "sqlite", &metrics); query_metrics.record_success(3); @@ -910,7 +925,7 @@ mod tests { db.response.returned_rows = tracing::field::Empty, ); let error = anyhow!("query failed").context("while executing SELECT 1"); - let metrics = crate::telemetry_metrics::TelemetryMetrics::new(); + let metrics = crate::telemetry_metrics::TelemetryMetrics::new("sqlite"); let query_metrics = DbQueryMetricsContext::new(span.clone(), "SELECT".to_string(), "sqlite", &metrics); query_metrics.record_error(2, &error); diff --git a/src/webserver/database/mod.rs b/src/webserver/database/mod.rs index 832dc62a..2849c37a 100644 --- a/src/webserver/database/mod.rs +++ b/src/webserver/database/mod.rs @@ -3,7 +3,6 @@ mod connect; mod csv_import; pub mod execute_queries; pub mod migrations; -mod pool_metrics; mod sql; mod sqlpage_functions; mod syntax_tree; diff --git a/src/webserver/database/pool_metrics.rs b/src/webserver/database/pool_metrics.rs deleted file mode 100644 index f3a2eab3..00000000 --- a/src/webserver/database/pool_metrics.rs +++ /dev/null @@ -1,64 +0,0 @@ -use opentelemetry::metrics::UpDownCounter; -use opentelemetry::{global, KeyValue}; -use opentelemetry_semantic_conventions::attribute as otel; -use opentelemetry_semantic_conventions::metric as otel_metric; - -fn get_counter() -> UpDownCounter { - global::meter("sqlpage") - .i64_up_down_counter(otel_metric::DB_CLIENT_CONNECTION_COUNT) - .with_unit("{connection}") - .with_description("Number of connections in the database pool.") - .build() -} - -pub fn on_acquire(db_system_name: &'static str) { - let counter = get_counter(); - counter.add( - 1, - &[ - KeyValue::new(otel::DB_SYSTEM_NAME, db_system_name), - KeyValue::new(otel::DB_CLIENT_CONNECTION_POOL_NAME, "sqlpage"), - KeyValue::new(otel::DB_CLIENT_CONNECTION_STATE, "used"), - ], - ); - counter.add( - -1, - &[ - KeyValue::new(otel::DB_SYSTEM_NAME, db_system_name), - KeyValue::new(otel::DB_CLIENT_CONNECTION_POOL_NAME, "sqlpage"), - KeyValue::new(otel::DB_CLIENT_CONNECTION_STATE, "idle"), - ], - ); -} - -pub fn on_release(db_system_name: &'static str) { - let counter = get_counter(); - counter.add( - -1, - &[ - KeyValue::new(otel::DB_SYSTEM_NAME, db_system_name), - KeyValue::new(otel::DB_CLIENT_CONNECTION_POOL_NAME, "sqlpage"), - KeyValue::new(otel::DB_CLIENT_CONNECTION_STATE, "used"), - ], - ); - counter.add( - 1, - &[ - KeyValue::new(otel::DB_SYSTEM_NAME, db_system_name), - KeyValue::new(otel::DB_CLIENT_CONNECTION_POOL_NAME, "sqlpage"), - KeyValue::new(otel::DB_CLIENT_CONNECTION_STATE, "idle"), - ], - ); -} - -pub fn on_connect(db_system_name: &'static str) { - let counter = get_counter(); - counter.add( - 1, - &[ - KeyValue::new(otel::DB_SYSTEM_NAME, db_system_name), - KeyValue::new(otel::DB_CLIENT_CONNECTION_POOL_NAME, "sqlpage"), - KeyValue::new(otel::DB_CLIENT_CONNECTION_STATE, "idle"), - ], - ); -} From 539abd8552d1b451697724bfc4490a4513e5002e Mon Sep 17 00:00:00 2001 From: lovasoa Date: Sat, 14 Mar 2026 22:15:48 +0100 Subject: [PATCH 24/27] Read pool gauge directly from sqlx pool --- src/lib.rs | 3 +- src/telemetry_metrics.rs | 61 ++++++++++++----------- src/webserver/database/execute_queries.rs | 19 +------ 3 files changed, 35 insertions(+), 48 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 34e3e8d7..d0c13b3f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -136,7 +136,8 @@ impl AppState { ); let oidc_state = crate::webserver::oidc::initialize_oidc_state(config).await?; - let telemetry_metrics = TelemetryMetrics::new(db.info.database_type.otel_name()); + let telemetry_metrics = + TelemetryMetrics::new(&db.connection, db.info.database_type.otel_name()); Ok(AppState { db, diff --git a/src/telemetry_metrics.rs b/src/telemetry_metrics.rs index 7fd5143d..89974438 100644 --- a/src/telemetry_metrics.rs +++ b/src/telemetry_metrics.rs @@ -2,36 +2,47 @@ use opentelemetry::global; use opentelemetry::metrics::{Histogram, ObservableGauge}; use opentelemetry_semantic_conventions::attribute as otel; use opentelemetry_semantic_conventions::metric as otel_metric; -use std::sync::atomic::{AtomicI64, Ordering}; -use std::sync::Arc; - -struct PoolConnectionSnapshot { - used: AtomicI64, - idle: AtomicI64, -} +use sqlx::AnyPool; pub struct TelemetryMetrics { pub http_request_duration: Histogram, pub db_query_duration: Histogram, _pool_connection_count: ObservableGauge, - pool_snapshot: Arc, - db_system_name: &'static str, } impl Default for TelemetryMetrics { fn default() -> Self { - Self::new("other_sql") + let meter = global::meter("sqlpage"); + let http_request_duration = meter + .f64_histogram(otel_metric::HTTP_SERVER_REQUEST_DURATION) + .with_unit("s") + .with_description("Duration of HTTP requests processed by the server.") + .build(); + let db_query_duration = meter + .f64_histogram(otel_metric::DB_CLIENT_OPERATION_DURATION) + .with_unit("s") + .with_description("Duration of executing SQL queries.") + .build(); + // This default is only used in tests that don't touch pool metrics. + let pool_connection_count = meter + .i64_observable_gauge(otel_metric::DB_CLIENT_CONNECTION_COUNT) + .with_unit("{connection}") + .with_description("Number of connections in the database pool.") + .with_callback(|_| {}) + .build(); + + Self { + http_request_duration, + db_query_duration, + _pool_connection_count: pool_connection_count, + } } } impl TelemetryMetrics { #[must_use] - pub fn new(db_system_name: &'static str) -> Self { + pub fn new(pool: &AnyPool, db_system_name: &'static str) -> Self { let meter = global::meter("sqlpage"); - let pool_snapshot = Arc::new(PoolConnectionSnapshot { - used: AtomicI64::new(0), - idle: AtomicI64::new(0), - }); let http_request_duration = meter .f64_histogram(otel_metric::HTTP_SERVER_REQUEST_DURATION) .with_unit("s") @@ -42,14 +53,16 @@ impl TelemetryMetrics { .with_unit("s") .with_description("Duration of executing SQL queries.") .build(); - let snapshot_ref = Arc::clone(&pool_snapshot); + let pool_ref = pool.clone(); let pool_connection_count = meter .i64_observable_gauge(otel_metric::DB_CLIENT_CONNECTION_COUNT) .with_unit("{connection}") .with_description("Number of connections in the database pool.") .with_callback(move |observer| { - let used = snapshot_ref.used.load(Ordering::Relaxed); - let idle = snapshot_ref.idle.load(Ordering::Relaxed); + let size = pool_ref.size(); + let idle_u32 = u32::try_from(pool_ref.num_idle()).unwrap_or(u32::MAX); + let used = i64::from(size.saturating_sub(idle_u32)); + let idle = i64::from(idle_u32); observer.observe( used, &[ @@ -79,18 +92,6 @@ impl TelemetryMetrics { http_request_duration, db_query_duration, _pool_connection_count: pool_connection_count, - pool_snapshot, - db_system_name, - } - } - - pub fn record_pool_snapshot(&self, db_system_name: &'static str, size: u32, idle: usize) { - if db_system_name != self.db_system_name { - return; } - let idle = i64::try_from(idle).unwrap_or(i64::MAX); - let used = i64::from(size).saturating_sub(idle); - self.pool_snapshot.used.store(used, Ordering::Relaxed); - self.pool_snapshot.idle.store(idle, Ordering::Relaxed); } } diff --git a/src/webserver/database/execute_queries.rs b/src/webserver/database/execute_queries.rs index 615021a0..2e29b4e8 100644 --- a/src/webserver/database/execute_queries.rs +++ b/src/webserver/database/execute_queries.rs @@ -214,12 +214,10 @@ pub fn stream_query_results_with_conn<'a>( drop(stream); if let Some(error) = error { query_metrics.record_error(returned_rows, &error); - record_pool_snapshot(request); try_rollback_transaction(connection).await; yield DbItem::Error(error); } else { query_metrics.record_success(returned_rows); - record_pool_snapshot(request); } }, ParsedStatement::SetVariable { variable, value} => { @@ -378,13 +376,11 @@ async fn execute_set_variable_query<'a>( Ok(Some(row)) => { query_metrics.add_duration(start_time.elapsed()); query_metrics.record_success(1_i64); - record_pool_snapshot(request); row_to_string(&row) } Ok(None) => { query_metrics.add_duration(start_time.elapsed()); query_metrics.record_success(0_i64); - record_pool_snapshot(request); None } Err(e) => { @@ -392,7 +388,6 @@ async fn execute_set_variable_query<'a>( try_rollback_transaction(connection).await; let err = display_stmt_db_error(source_file, statement, e); query_metrics.record_error(0_i64, &err); - record_pool_snapshot(request); return Err(err); } }; @@ -450,15 +445,6 @@ fn vars_and_name<'a, 'b>( } } -fn record_pool_snapshot(request: &ExecutionContext) { - let db = &request.app_state.db; - request.app_state.telemetry_metrics.record_pool_snapshot( - db.info.database_type.otel_name(), - db.connection.size(), - db.connection.num_idle(), - ); -} - async fn take_connection<'a>( db: &'a Database, conn: &'a mut DbConn, @@ -476,7 +462,6 @@ async fn take_connection<'a>( *conn = Some(c); let connection = conn.as_mut().unwrap(); set_trace_context(connection, db).await; - record_pool_snapshot(request); Ok(connection) } Err(e) => { @@ -901,7 +886,7 @@ mod tests { exception.details = tracing::field::Empty, db.response.returned_rows = tracing::field::Empty, ); - let metrics = crate::telemetry_metrics::TelemetryMetrics::new("sqlite"); + let metrics = crate::telemetry_metrics::TelemetryMetrics::default(); let query_metrics = DbQueryMetricsContext::new(span.clone(), "SELECT".to_string(), "sqlite", &metrics); query_metrics.record_success(3); @@ -925,7 +910,7 @@ mod tests { db.response.returned_rows = tracing::field::Empty, ); let error = anyhow!("query failed").context("while executing SELECT 1"); - let metrics = crate::telemetry_metrics::TelemetryMetrics::new("sqlite"); + let metrics = crate::telemetry_metrics::TelemetryMetrics::default(); let query_metrics = DbQueryMetricsContext::new(span.clone(), "SELECT".to_string(), "sqlite", &metrics); query_metrics.record_error(2, &error); From 8f8fc09d21dff598441be91d9784211c3099cc47 Mon Sep 17 00:00:00 2001 From: lovasoa Date: Sat, 14 Mar 2026 23:51:34 +0100 Subject: [PATCH 25/27] telemetry: export OTEL metrics every second in example stack --- examples/telemetry/docker-compose.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/examples/telemetry/docker-compose.yml b/examples/telemetry/docker-compose.yml index 53d0ccb2..e4640c06 100644 --- a/examples/telemetry/docker-compose.yml +++ b/examples/telemetry/docker-compose.yml @@ -23,6 +23,7 @@ services: environment: - DATABASE_URL=postgres://sqlpage:sqlpage@postgres:5432/sqlpage - OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4318 + - OTEL_METRIC_EXPORT_INTERVAL=1000 - OTEL_SERVICE_NAME=sqlpage volumes: - ./website:/var/www From acd38a717036d3b55a6de1d0e25e4b99ad76fe42 Mon Sep 17 00:00:00 2001 From: lovasoa Date: Sat, 14 Mar 2026 23:52:25 +0100 Subject: [PATCH 26/27] Improve Grafana trace list links and readability --- examples/telemetry/grafana/sqlpage-home.json | 89 ++++++++++++++++++-- 1 file changed, 84 insertions(+), 5 deletions(-) diff --git a/examples/telemetry/grafana/sqlpage-home.json b/examples/telemetry/grafana/sqlpage-home.json index 60c0900a..3cd35cb9 100644 --- a/examples/telemetry/grafana/sqlpage-home.json +++ b/examples/telemetry/grafana/sqlpage-home.json @@ -258,8 +258,8 @@ }, "properties": [ { - "id": "custom.width", - "value": 300 + "id": "custom.hidden", + "value": true } ] }, @@ -283,7 +283,35 @@ "properties": [ { "id": "custom.width", - "value": 140 + "value": 120 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "traceName" + }, + "properties": [ + { + "id": "custom.width", + "value": 520 + }, + { + "id": "custom.cellOptions", + "value": { + "type": "data-links" + } + }, + { + "id": "links", + "value": [ + { + "targetBlank": false, + "title": "${__value.text}", + "url": "/a/grafana-exploretraces-app/explore?traceId=${__data.fields.traceID}" + } + ] } ] }, @@ -370,7 +398,7 @@ "renameByName": { "startTime": "Start time", "traceDuration": "Duration", - "traceID": "Trace ID", + "traceID": "Trace", "traceName": "Route", "traceService": "Service" } @@ -379,6 +407,36 @@ ], "type": "table" }, + { + "datasource": { + "type": "tempo", + "uid": "tempo" + }, + "gridPos": { + "h": 10, + "w": 24, + "x": 0, + "y": 30 + }, + "id": 12, + "pluginVersion": "12.4.0", + "targets": [ + { + "datasource": { + "type": "tempo", + "uid": "tempo" + }, + "limit": 20, + "query": "$traceId", + "queryType": "traceql", + "refId": "A", + "tableType": "traces" + } + ], + "timeFrom": "1h", + "title": "Selected Trace", + "type": "traces" + }, { "datasource": { "type": "loki", @@ -465,7 +523,28 @@ "style": "dark", "tags": ["sqlpage", "tracing", "logs", "metrics"], "templating": { - "list": [] + "list": [ + { + "current": { + "selected": true, + "text": "", + "value": "" + }, + "hide": 2, + "label": "Trace ID", + "name": "traceId", + "options": [ + { + "selected": true, + "text": "", + "value": "" + } + ], + "query": "", + "skipUrlSync": false, + "type": "textbox" + } + ] }, "time": { "from": "now-1h", From 9fe421d0bcb020603526b88d9bdd00780414502c Mon Sep 17 00:00:00 2001 From: lovasoa Date: Sun, 15 Mar 2026 09:11:23 +0100 Subject: [PATCH 27/27] Restore removed DB connection debug logs --- src/webserver/database/connect.rs | 30 +++++++++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/src/webserver/database/connect.rs b/src/webserver/database/connect.rs index 0cac02b0..dfb49654 100644 --- a/src/webserver/database/connect.rs +++ b/src/webserver/database/connect.rs @@ -107,8 +107,15 @@ impl Database { fn add_on_return_to_pool(config: &AppConfig, pool_options: PoolOptions) -> PoolOptions { let on_disconnect_file = config.configuration_directory.join(ON_RESET_FILE); let sql = if on_disconnect_file.exists() { + log::info!( + "Creating a custom SQL connection cleanup handler from {}", + on_disconnect_file.display() + ); match std::fs::read_to_string(&on_disconnect_file) { - Ok(sql) => Some(std::sync::Arc::new(sql)), + Ok(sql) => { + log::trace!("The custom SQL connection cleanup handler is:\n{sql}"); + Some(std::sync::Arc::new(sql)) + } Err(e) => { log::error!( "Unable to read the file {}: {}", @@ -119,6 +126,10 @@ fn add_on_return_to_pool(config: &AppConfig, pool_options: PoolOptions) -> } } } else { + log::debug!( + "Not creating a custom SQL connection cleanup handler because {} does not exist", + on_disconnect_file.display() + ); None }; @@ -159,9 +170,17 @@ fn add_on_connection_handler( pool_options: PoolOptions, ) -> PoolOptions { let on_connect_file = config.configuration_directory.join(ON_CONNECT_FILE); + let on_connect_file_display = on_connect_file.display().to_string(); let sql = if on_connect_file.exists() { + log::info!( + "Creating a custom SQL database connection handler from {}", + on_connect_file.display() + ); match std::fs::read_to_string(&on_connect_file) { - Ok(sql) => Some(std::sync::Arc::new(sql)), + Ok(sql) => { + log::trace!("The custom SQL database connection handler is:\n{sql}"); + Some(std::sync::Arc::new(sql)) + } Err(e) => { log::error!( "Unable to read the file {}: {}", @@ -172,14 +191,19 @@ fn add_on_connection_handler( } } } else { + log::debug!( + "Not creating a custom SQL database connection handler because {} does not exist", + on_connect_file.display() + ); None }; pool_options.after_connect(move |conn, _| { let sql = sql.clone(); + let on_connect_file_display = on_connect_file_display.clone(); Box::pin(async move { if let Some(sql) = sql { - log::debug!("Running connection handler on new connection"); + log::debug!("Running {on_connect_file_display} on new connection"); let r = conn.execute(sql.as_str()).await?; log::debug!("Finished running connection handler on new connection: {r:?}"); }