From 0f67ea255e4b6b590c42586ab52593c520b32476 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Wed, 14 Jan 2026 08:49:41 +0100 Subject: [PATCH 01/13] feat: Support configuring the external endpoint of Trino clusters for ackUri rewriting --- CHANGELOG.md | 4 + trino-lb-core/src/config.rs | 4 + trino-lb-core/src/trino_api.rs | 117 +++++++++++++++++- trino-lb-core/src/trino_query.rs | 6 + ...8b812633632a68235f2fa26f80a9c5352ded.json} | 5 +- ...9250d852451790bb2509b260fafdb268b6b1.json} | 12 +- trino-lb-persistence/src/postgres/README.md | 16 +++ ...0101314410_add_trino_external_endpoint.sql | 3 + trino-lb-persistence/src/postgres/mod.rs | 18 ++- trino-lb/src/cluster_group_manager.rs | 2 + trino-lb/src/http_server/v1/statement.rs | 11 +- trino-lb/src/scaling/mod.rs | 1 + 12 files changed, 186 insertions(+), 13 deletions(-) rename trino-lb-persistence/.sqlx/{query-dcdff617008bd9e5db76c1aa6099da4728bca33d362d6ff990e14b7e4b17288b.json => query-1f8cc8bc6093d2fc198d0b2849568b812633632a68235f2fa26f80a9c5352ded.json} (60%) rename trino-lb-persistence/.sqlx/{query-d6b1cc38b5e80b51b157d1989beec66a68126c1606a1ae39caeed6ecd380b9f1.json => query-e727412091850e7ac76082775abe9250d852451790bb2509b260fafdb268b6b1.json} (66%) create mode 100644 trino-lb-persistence/src/postgres/README.md create mode 100644 trino-lb-persistence/src/postgres/migrations/20260101314410_add_trino_external_endpoint.sql diff --git a/CHANGELOG.md b/CHANGELOG.md index cae06d0..8ff1458 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,9 @@ All notable changes to this project will be documented in this file. - Support activating and deactivation Trino clusters via API calls to `/admin/clusters/{cluster_name}/activate` and `/admin/clusters/{cluster_name}/deactivate` respectively. For this to work you need to authenticate yourself at trino-lb via basic auth ([#95]). - Expose cluster statistics at `/admin/clusters/{cluster_name}/status` and `/admin/clusters/status` ([#95]). +- Support configuring an external endpoint of Trino clusters. + This is used to update the segments ackUris to, as sometimes Trino get's confused and put's the wrong endpoint (namely the one of trino-lb) in there. + Please note that this runs a database migration on Postgres ([#XX]). ### Changed @@ -28,6 +31,7 @@ All notable changes to this project will be documented in this file. [#86]: https://github.com/stackabletech/trino-lb/pull/86 [#91]: https://github.com/stackabletech/trino-lb/pull/91 [#95]: https://github.com/stackabletech/trino-lb/pull/95 +[#XX]: https://github.com/stackabletech/trino-lb/pull/XX ## [0.5.0] - 2025-03-14 diff --git a/trino-lb-core/src/config.rs b/trino-lb-core/src/config.rs index 19c4287..71745ca 100644 --- a/trino-lb-core/src/config.rs +++ b/trino-lb-core/src/config.rs @@ -191,6 +191,10 @@ pub struct TrinoClusterGroupConfig { pub struct TrinoClusterConfig { pub name: String, pub endpoint: Url, + + /// Public endpoint of the Trino cluster. + /// This can e.g. be used to change segment ackUris to. + pub external_endpoint: Option, pub credentials: TrinoClusterCredentialsConfig, } diff --git a/trino-lb-core/src/trino_api.rs b/trino-lb-core/src/trino_api.rs index d03dcc5..1c020d0 100644 --- a/trino-lb-core/src/trino_api.rs +++ b/trino-lb-core/src/trino_api.rs @@ -5,7 +5,7 @@ use std::{ use prusto::{QueryError, Warning}; use serde::{Deserialize, Serialize}; -use serde_json::value::RawValue; +use serde_json::{Value, value::RawValue}; use snafu::{ResultExt, Snafu}; use tracing::instrument; use url::Url; @@ -23,6 +23,12 @@ pub enum Error { #[snafu(display("Failed to parse nextUri Trino send us"))] ParseNextUriFromTrino { source: url::ParseError }, + #[snafu(display("Failed to parse segment ackUri Trino send us"))] + ParseSegmentAckUriFromTrino { source: url::ParseError }, + + #[snafu(display("Failed to change segment ackUri to point to external Trino address"))] + ChangeSegmentAckUriToTrino { source: url::ParseError }, + #[snafu(display( "Failed to determine the elapsed time of a queued query. Are all system clocks of trino-lb instances in sync?" ))] @@ -48,7 +54,7 @@ pub struct TrinoQueryApiResponse { pub partial_cancel_uri: Option, pub columns: Option>, - pub data: Option>, + pub data: Option>, pub error: Option, pub warnings: Vec, @@ -156,29 +162,81 @@ impl TrinoQueryApiResponse { }) } + /// Changes the following references in the query (if they exist) + /// + /// 1. nextUri to point to trino-lb + /// 2. In case the `external_trino_addr` is set, segments ackUri to point to the external + /// address of Trino. Trino sometimes get's confused (likely by some HTTP) headers and put's the + /// trino-lb address into the ackUri (but the requests should go to Trino directly). #[instrument( skip(self), fields(trino_lb_addr = %trino_lb_addr), )] - pub fn change_next_uri_to_trino_lb(&mut self, trino_lb_addr: &Url) -> Result<(), Error> { + pub fn update_trino_references( + &mut self, + trino_lb_addr: &Url, + external_trino_addr: Option<&Url>, + ) -> Result<(), Error> { + // Point nextUri to trino-lb if let Some(next_uri) = &self.next_uri { let next_uri = Url::parse(next_uri).context(ParseNextUriFromTrinoSnafu)?; self.next_uri = Some(change_next_uri_to_trino_lb(&next_uri, trino_lb_addr).to_string()); } + // Point segment ackUris to Trino + if let Some(external_trino_addr) = external_trino_addr + && let Some(data) = self.data.as_deref_mut() + { + change_segment_ack_uris_to_trino(data, external_trino_addr)?; + } + Ok(()) } } +#[instrument( + fields(next_uri = %next_uri, trino_lb_addr = %trino_lb_addr), +)] fn change_next_uri_to_trino_lb(next_uri: &Url, trino_lb_addr: &Url) -> Url { let mut result = trino_lb_addr.clone(); result.set_path(next_uri.path()); result } +#[instrument( + skip(data), + fields(external_trino_addr = %external_trino_addr), +)] +fn change_segment_ack_uris_to_trino( + data: &mut Value, + external_trino_addr: &Url, +) -> Result<(), Error> { + let Some(segments) = data.get_mut("segments").and_then(Value::as_array_mut) else { + return Ok(()); + }; + + for segment in segments { + if let Some("spooled") = segment.get("type").and_then(Value::as_str) + && let Some(ack_uri) = segment.get_mut("ackUri") + && let Some(ack_uri_str) = ack_uri.as_str() + { + let parsed_ack_uri = ack_uri_str + .parse::() + .context(ParseSegmentAckUriFromTrinoSnafu)?; + let mut result = external_trino_addr.clone(); + result.set_path(parsed_ack_uri.path()); + + *ack_uri = Value::String(result.to_string()); + } + } + + Ok(()) +} + #[cfg(test)] mod tests { use rstest::rstest; + use serde_json::json; use super::*; @@ -210,4 +268,57 @@ mod tests { let result = change_next_uri_to_trino_lb(&next_uri, &trino_lb_addr); assert_eq!(result.to_string(), expected); } + + #[test] + fn test_change_segment_ack_uris_to_trino() { + let mut data = json!({ + "encoding": "json+zstd", + "segments": [ + { + "type": "spooled", + "uri": "https://minio:9000/trino/spooling/01KCAH1KEE432S8VXFDJTZYTTT.json%2Bzstd?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20251212T121622Z&X-Amz-SignedHeaders=host%3Bx-amz-server-side-encryption-customer-algorithm%3Bx-amz-server-side-encryption-customer-key%3Bx-amz-server-side-encryption-customer-key-md5&X-Amz-Credential=minioAccessKey%2F20251212%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Expires=3600&X-Amz-Signature=331b80bdae6c92352d12985ae8863dddbc72c755d49466c1aeeb732cd08b7d8d", + "ackUri": "https://trino-client-spooling-coordinator:8443/v1/spooled/ack/LYp8Bg6PDoTuUO86fmNMQNhtC0xryOhvWpL2LXhwLI4=", + "metadata": { + "segmentSize": 2716023, + "uncompressedSize": 7706400, + "rowsCount": 43761, + "expiresAt": "2025-12-13T01:16:21.454", + "rowOffset": 10952 + }, + "headers": { + "x-amz-server-side-encryption-customer-algorithm": [ + "AES256" + ], + "x-amz-server-side-encryption-customer-key": [ + "iemW0eosEhVVn+QR3q/OApysz8ieRCzAHngdoJFlbHY=" + ], + "x-amz-server-side-encryption-customer-key-MD5": [ + "D1VfXAwD/ffApNMNf3gBig==" + ] + } + } + ] + }); + let external_trino_addr = "https://trino-external:1234" + .parse() + .expect("static URL is always valid"); + + change_segment_ack_uris_to_trino(&mut data, &external_trino_addr).unwrap(); + + let segment = data + .get("segments") + .unwrap() + .as_array() + .unwrap() + .first() + .unwrap(); + assert_eq!( + segment.get("uri").unwrap(), + "https://minio:9000/trino/spooling/01KCAH1KEE432S8VXFDJTZYTTT.json%2Bzstd?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20251212T121622Z&X-Amz-SignedHeaders=host%3Bx-amz-server-side-encryption-customer-algorithm%3Bx-amz-server-side-encryption-customer-key%3Bx-amz-server-side-encryption-customer-key-md5&X-Amz-Credential=minioAccessKey%2F20251212%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Expires=3600&X-Amz-Signature=331b80bdae6c92352d12985ae8863dddbc72c755d49466c1aeeb732cd08b7d8d" + ); + assert_eq!( + segment.get("ackUri").unwrap(), + "https://trino-external:1234/v1/spooled/ack/LYp8Bg6PDoTuUO86fmNMQNhtC0xryOhvWpL2LXhwLI4=" + ); + } } diff --git a/trino-lb-core/src/trino_query.rs b/trino-lb-core/src/trino_query.rs index 9701ac8..ebf0d0e 100644 --- a/trino-lb-core/src/trino_query.rs +++ b/trino-lb-core/src/trino_query.rs @@ -52,6 +52,10 @@ pub struct TrinoQuery { /// Endpoint of the Trino cluster the query is running on. pub trino_endpoint: Url, + /// (Optionally, if configured) public endpoint of the Trino cluster. + /// This can e.g. be used to change segment ackUris to. + pub trino_external_endpoint: Option, + /// The time the query was submitted to trino-lb. pub creation_time: SystemTime, @@ -80,6 +84,7 @@ impl TrinoQuery { trino_cluster: TrinoClusterName, trino_query_id: TrinoQueryId, trino_endpoint: Url, + trino_external_endpoint: Option, creation_time: SystemTime, delivered_time: SystemTime, ) -> Self { @@ -87,6 +92,7 @@ impl TrinoQuery { id: trino_query_id, trino_cluster, trino_endpoint, + trino_external_endpoint, creation_time, delivered_time, } diff --git a/trino-lb-persistence/.sqlx/query-dcdff617008bd9e5db76c1aa6099da4728bca33d362d6ff990e14b7e4b17288b.json b/trino-lb-persistence/.sqlx/query-1f8cc8bc6093d2fc198d0b2849568b812633632a68235f2fa26f80a9c5352ded.json similarity index 60% rename from trino-lb-persistence/.sqlx/query-dcdff617008bd9e5db76c1aa6099da4728bca33d362d6ff990e14b7e4b17288b.json rename to trino-lb-persistence/.sqlx/query-1f8cc8bc6093d2fc198d0b2849568b812633632a68235f2fa26f80a9c5352ded.json index a184cf3..a2abdf4 100644 --- a/trino-lb-persistence/.sqlx/query-dcdff617008bd9e5db76c1aa6099da4728bca33d362d6ff990e14b7e4b17288b.json +++ b/trino-lb-persistence/.sqlx/query-1f8cc8bc6093d2fc198d0b2849568b812633632a68235f2fa26f80a9c5352ded.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "INSERT INTO queries (id, trino_cluster, trino_endpoint, creation_time, delivered_time)\n VALUES ($1, $2, $3, $4, $5)", + "query": "INSERT INTO queries (id, trino_cluster, trino_endpoint, trino_external_endpoint, creation_time, delivered_time)\n VALUES ($1, $2, $3, $4, $5, $6)", "describe": { "columns": [], "parameters": { @@ -8,11 +8,12 @@ "Varchar", "Varchar", "Varchar", + "Varchar", "Timestamptz", "Timestamptz" ] }, "nullable": [] }, - "hash": "dcdff617008bd9e5db76c1aa6099da4728bca33d362d6ff990e14b7e4b17288b" + "hash": "1f8cc8bc6093d2fc198d0b2849568b812633632a68235f2fa26f80a9c5352ded" } diff --git a/trino-lb-persistence/.sqlx/query-d6b1cc38b5e80b51b157d1989beec66a68126c1606a1ae39caeed6ecd380b9f1.json b/trino-lb-persistence/.sqlx/query-e727412091850e7ac76082775abe9250d852451790bb2509b260fafdb268b6b1.json similarity index 66% rename from trino-lb-persistence/.sqlx/query-d6b1cc38b5e80b51b157d1989beec66a68126c1606a1ae39caeed6ecd380b9f1.json rename to trino-lb-persistence/.sqlx/query-e727412091850e7ac76082775abe9250d852451790bb2509b260fafdb268b6b1.json index 45ce62e..cb5a1ab 100644 --- a/trino-lb-persistence/.sqlx/query-d6b1cc38b5e80b51b157d1989beec66a68126c1606a1ae39caeed6ecd380b9f1.json +++ b/trino-lb-persistence/.sqlx/query-e727412091850e7ac76082775abe9250d852451790bb2509b260fafdb268b6b1.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "SELECT id, trino_cluster, trino_endpoint, creation_time, delivered_time\n FROM queries\n WHERE id = $1", + "query": "SELECT id, trino_cluster, trino_endpoint, trino_external_endpoint, creation_time, delivered_time\n FROM queries\n WHERE id = $1", "describe": { "columns": [ { @@ -20,11 +20,16 @@ }, { "ordinal": 3, + "name": "trino_external_endpoint", + "type_info": "Varchar" + }, + { + "ordinal": 4, "name": "creation_time", "type_info": "Timestamptz" }, { - "ordinal": 4, + "ordinal": 5, "name": "delivered_time", "type_info": "Timestamptz" } @@ -38,9 +43,10 @@ false, false, false, + true, false, false ] }, - "hash": "d6b1cc38b5e80b51b157d1989beec66a68126c1606a1ae39caeed6ecd380b9f1" + "hash": "e727412091850e7ac76082775abe9250d852451790bb2509b260fafdb268b6b1" } diff --git a/trino-lb-persistence/src/postgres/README.md b/trino-lb-persistence/src/postgres/README.md new file mode 100644 index 0000000..cb48fc5 --- /dev/null +++ b/trino-lb-persistence/src/postgres/README.md @@ -0,0 +1,16 @@ +First start a postgres: + +```bash +docker run --rm -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=admin postgres +``` + +Afterwards you set the `DATABASE_URL` env var and prepare stuff for offline compilation: + +```bash +export DATABASE_URL=postgres://postgres:postgres@localhost/postgres + +cd trino-lb-persistence + +cargo sqlx migrate run --source src/postgres/migrations +cargo sqlx prepare --workspace +``` diff --git a/trino-lb-persistence/src/postgres/migrations/20260101314410_add_trino_external_endpoint.sql b/trino-lb-persistence/src/postgres/migrations/20260101314410_add_trino_external_endpoint.sql new file mode 100644 index 0000000..aff5474 --- /dev/null +++ b/trino-lb-persistence/src/postgres/migrations/20260101314410_add_trino_external_endpoint.sql @@ -0,0 +1,3 @@ +ALTER TABLE queries + -- nullable, as it's Option<&str> +ADD trino_external_endpoint VARCHAR; diff --git a/trino-lb-persistence/src/postgres/mod.rs b/trino-lb-persistence/src/postgres/mod.rs index 710eb31..7d08f25 100644 --- a/trino-lb-persistence/src/postgres/mod.rs +++ b/trino-lb-persistence/src/postgres/mod.rs @@ -92,6 +92,9 @@ pub enum Error { #[snafu(display("Failed to parse endpoint url of cluster from stored query"))] ParseClusterEndpointFromStoredQuery { source: url::ParseError }, + #[snafu(display("Failed to parse external endpoint url of cluster from stored query"))] + ParseClusterExternalEndpointFromStoredQuery { source: url::ParseError }, + #[snafu(display("Failed to convert max query counter to u64, as it is too high"))] ConvertMaxAllowedQueryCounterToU64 { source: TryFromIntError }, @@ -204,11 +207,12 @@ impl Persistence for PostgresPersistence { #[instrument(skip(self, query))] async fn store_query(&self, query: TrinoQuery) -> Result<(), super::Error> { query!( - r#"INSERT INTO queries (id, trino_cluster, trino_endpoint, creation_time, delivered_time) - VALUES ($1, $2, $3, $4, $5)"#, + r#"INSERT INTO queries (id, trino_cluster, trino_endpoint, trino_external_endpoint, creation_time, delivered_time) + VALUES ($1, $2, $3, $4, $5, $6)"#, query.id, query.trino_cluster, query.trino_endpoint.as_str(), + query.trino_external_endpoint.as_ref().map(Url::as_str), Into::>::into(query.creation_time), Into::>::into(query.delivered_time), ) @@ -222,7 +226,7 @@ impl Persistence for PostgresPersistence { #[instrument(skip(self))] async fn load_query(&self, query_id: &TrinoQueryId) -> Result { let result = query!( - r#"SELECT id, trino_cluster, trino_endpoint, creation_time, delivered_time + r#"SELECT id, trino_cluster, trino_endpoint, trino_external_endpoint, creation_time, delivered_time FROM queries WHERE id = $1"#, query_id, @@ -231,11 +235,19 @@ impl Persistence for PostgresPersistence { .await .context(LoadQuerySnafu)?; + let trino_external_endpoint = match result.trino_external_endpoint { + Some(trino_external_endpoint) => Some( + Url::parse(&trino_external_endpoint) + .context(ParseClusterExternalEndpointFromStoredQuerySnafu)?, + ), + None => None, + }; let query = TrinoQuery { id: result.id, trino_cluster: result.trino_cluster, trino_endpoint: Url::parse(&result.trino_endpoint) .context(ParseClusterEndpointFromStoredQuerySnafu)?, + trino_external_endpoint, creation_time: result.creation_time.into(), delivered_time: result.delivered_time.into(), }; diff --git a/trino-lb/src/cluster_group_manager.rs b/trino-lb/src/cluster_group_manager.rs index b98f419..1156399 100644 --- a/trino-lb/src/cluster_group_manager.rs +++ b/trino-lb/src/cluster_group_manager.rs @@ -86,6 +86,7 @@ pub struct TrinoCluster { pub name: String, pub max_running_queries: u64, pub endpoint: Url, + pub external_endpoint: Option, } #[derive(Clone, Debug, Serialize)] @@ -145,6 +146,7 @@ impl ClusterGroupManager { name: cluster_name, max_running_queries: group_config.max_running_queries, endpoint: cluster_config.endpoint.clone(), + external_endpoint: cluster_config.external_endpoint.clone(), }) } groups.insert(group_name.clone(), group); diff --git a/trino-lb/src/http_server/v1/statement.rs b/trino-lb/src/http_server/v1/statement.rs index be55120..ffed593 100644 --- a/trino-lb/src/http_server/v1/statement.rs +++ b/trino-lb/src/http_server/v1/statement.rs @@ -304,6 +304,7 @@ async fn queue_or_hand_over_query( cluster.name.clone(), trino_query_api_response.id.clone(), cluster.endpoint.clone(), + cluster.external_endpoint.clone(), *creation_time, SystemTime::now(), ); @@ -316,7 +317,10 @@ async fn queue_or_hand_over_query( )?; trino_query_api_response - .change_next_uri_to_trino_lb(&state.config.trino_lb.external_address) + .update_trino_references( + &state.config.trino_lb.external_address, + cluster.external_endpoint.as_ref(), + ) .context(ModifyNextUriSnafu)?; info!( @@ -458,7 +462,10 @@ async fn handle_query_running_on_trino( if trino_query_api_response.next_uri.is_some() { // Change the nextUri to actually point to trino-lb instead of Trino. trino_query_api_response - .change_next_uri_to_trino_lb(&state.config.trino_lb.external_address) + .update_trino_references( + &state.config.trino_lb.external_address, + query.trino_external_endpoint.as_ref(), + ) .context(ModifyNextUriSnafu)?; } else { info!(%query_id, "Query completed (no next_uri send)"); diff --git a/trino-lb/src/scaling/mod.rs b/trino-lb/src/scaling/mod.rs index 9fdf66f..8c83010 100644 --- a/trino-lb/src/scaling/mod.rs +++ b/trino-lb/src/scaling/mod.rs @@ -200,6 +200,7 @@ impl Scaler { name: cluster_name, max_running_queries: group_config.max_running_queries, endpoint: cluster_config.endpoint.clone(), + external_endpoint: cluster_config.external_endpoint.clone(), }) } groups.insert(group_name.clone(), group); From f72c9add47a07caeb25cf18058292110260ed267 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Wed, 14 Jan 2026 09:35:12 +0100 Subject: [PATCH 02/13] Remove unnecessary file --- _TEST/query-finished.json | 79 --------------------------------------- 1 file changed, 79 deletions(-) delete mode 100644 _TEST/query-finished.json diff --git a/_TEST/query-finished.json b/_TEST/query-finished.json deleted file mode 100644 index 3135491..0000000 --- a/_TEST/query-finished.json +++ /dev/null @@ -1,79 +0,0 @@ -{ - "id": "20251212_121505_00013_em2hb", - "nextUri": null, - "infoUri": "https://trino-client-spooling-coordinator:8443/ui/query.html?20251212_121505_00013_em2hb", - "partialCancelUri": null, - "columns": [ - { - "name": "_col0", - "type": "bigint", - "typeSignature": { - "rawType": "bigint", - "arguments": [] - } - } - ], - "data": null, - "error": null, - "warnings": [], - "stats": { - "completedSplits": 41, - "cpuTimeMillis": 8768, - "elapsedTimeMillis": 10002, - "nodes": 1, - "peakMemoryBytes": 2896, - "physicalInputBytes": 0, - "processedBytes": 0, - "processedRows": 15000000, - "progressPercentage": 100.0, - "queuedSplits": 0, - "queuedTimeMillis": 0, - "queued": false, - "rootStage": { - "completedSplits": 17, - "coordinatorOnly": false, - "cpuTimeMillis": 8, - "done": true, - "failedTasks": 0, - "nodes": 1, - "physicalInputBytes": 0, - "processedBytes": 1056, - "processedRows": 24, - "queuedSplits": 0, - "runningSplits": 0, - "stageId": "0", - "state": "FINISHED", - "subStages": [ - { - "completedSplits": 24, - "coordinatorOnly": false, - "cpuTimeMillis": 8760, - "done": true, - "failedTasks": 0, - "nodes": 1, - "physicalInputBytes": 0, - "processedBytes": 0, - "processedRows": 15000000, - "queuedSplits": 0, - "runningSplits": 0, - "stageId": "1", - "state": "FINISHED", - "subStages": [], - "totalSplits": 24, - "wallTimeMillis": 207600 - } - ], - "totalSplits": 17, - "wallTimeMillis": 195 - }, - "runningPercentage": 0.0, - "runningSplits": 0, - "scheduled": true, - "spilledBytes": 0, - "state": "FINISHED", - "totalSplits": 41, - "wallTimeMillis": 207795 - }, - "updateType": null, - "updateCount": null -} From ec1c67a20d70b89fe9642109754971146d098f24 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Wed, 14 Jan 2026 09:55:27 +0100 Subject: [PATCH 03/13] changelog --- CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 840563f..213d713 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,7 @@ All notable changes to this project will be documented in this file. - Expose cluster statistics at `/admin/clusters/{cluster_name}/status` and `/admin/clusters/status` ([#95]). - Support configuring an external endpoint of Trino clusters. This is used to update the segments ackUris to, as sometimes Trino get's confused and put's the wrong endpoint (namely the one of trino-lb) in there. - Please note that this runs a database migration on Postgres ([#XX]). + Please note that this runs a database migration on Postgres ([#100]). ### Changed @@ -33,7 +33,7 @@ All notable changes to this project will be documented in this file. [#91]: https://github.com/stackabletech/trino-lb/pull/91 [#95]: https://github.com/stackabletech/trino-lb/pull/95 [#98]: https://github.com/stackabletech/trino-lb/pull/98 -[#XX]: https://github.com/stackabletech/trino-lb/pull/XX +[#100]: https://github.com/stackabletech/trino-lb/pull/100 ## [0.5.0] - 2025-03-14 From 7c17b4eb7c7b6acee701083e201a7635d9815d13 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Wed, 14 Jan 2026 11:46:53 +0100 Subject: [PATCH 04/13] fix trailing whitespace --- trino-lb-persistence/src/postgres/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/trino-lb-persistence/src/postgres/README.md b/trino-lb-persistence/src/postgres/README.md index cb48fc5..84e8e90 100644 --- a/trino-lb-persistence/src/postgres/README.md +++ b/trino-lb-persistence/src/postgres/README.md @@ -1,7 +1,7 @@ First start a postgres: ```bash -docker run --rm -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=admin postgres +docker run --rm -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=admin postgres ``` Afterwards you set the `DATABASE_URL` env var and prepare stuff for offline compilation: From 26e2840830bfbf1299291935ad642a0003eb2111 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Wed, 14 Jan 2026 11:50:34 +0100 Subject: [PATCH 05/13] markdown linter --- trino-lb-persistence/src/postgres/README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/trino-lb-persistence/src/postgres/README.md b/trino-lb-persistence/src/postgres/README.md index 84e8e90..20827c2 100644 --- a/trino-lb-persistence/src/postgres/README.md +++ b/trino-lb-persistence/src/postgres/README.md @@ -1,3 +1,5 @@ +# Postgres sqlx stuff + First start a postgres: ```bash From 2615b9c5bb2be131afac88f30f035a493ae679a5 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Tue, 20 Jan 2026 10:34:41 +0100 Subject: [PATCH 06/13] fix: Correctly proxy HEAD requests --- CHANGELOG.md | 2 + trino-lb/src/cluster_group_manager.rs | 32 ++++++++++- trino-lb/src/http_server/v1/statement.rs | 67 ++++++++++++++++++++++-- 3 files changed, 97 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 213d713..a115d2f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,8 @@ All notable changes to this project will be documented in this file. - Set connection and response timeout for Redis connections ([#85]). - Only remove queries from the persistence in case they don't send a `nextUri` and are in state `FINISHED` ([#98]). +- Correctly proxy HEAD requests to `/v1/statement/executing/{queryId}/{slug}/{token}`. + Previously, we would GET (instead of HEAD) the URL at the Trino cluster, which resulted in trino-lb dropping the HTTP body, causing problems ([#100]). [#68]: https://github.com/stackabletech/trino-lb/pull/68 [#85]: https://github.com/stackabletech/trino-lb/pull/85 diff --git a/trino-lb/src/cluster_group_manager.rs b/trino-lb/src/cluster_group_manager.rs index 1156399..49f78ea 100644 --- a/trino-lb/src/cluster_group_manager.rs +++ b/trino-lb/src/cluster_group_manager.rs @@ -35,6 +35,9 @@ pub enum Error { #[snafu(display("Failed to contact Trino API to post query"))] ContactTrinoPostQuery { source: reqwest::Error }, + #[snafu(display("Failed to call Trino HEAD URL {url:?}"))] + CallTrinoHeadUrl { source: reqwest::Error, url: Url }, + #[snafu(display("Failed to decode Trino API response"))] DecodeTrinoResponse { source: reqwest::Error }, @@ -232,7 +235,7 @@ impl ClusterGroupManager { .get(next_uri) .headers(headers) .send() - .instrument(info_span!("Send HTTP get to Trino")) + .instrument(info_span!("Send HTTP GET to Trino")) .await .context(ContactTrinoPostQuerySnafu)?; let headers = response.headers(); @@ -249,6 +252,33 @@ impl ClusterGroupManager { Ok((trino_query_api_response, headers)) } + /// Sometimes the trino-client HEADs a /executing/xxx endpoint instead of GETing it. + /// We need to proxy this as a HEAD request as well. + #[instrument( + skip(self), + fields(head_uri = %head_uri, headers = ?headers.sanitize()) + )] + pub async fn send_head_to_trino( + &self, + head_uri: Url, + mut headers: HeaderMap, + ) -> Result { + add_current_context_to_client_request(tracing::Span::current().context(), &mut headers); + + let response = self + .http_client + .head(head_uri.clone()) + .headers(headers) + .send() + .instrument(info_span!("Send HTTP HEAD to Trino")) + .await + .with_context(|_| CallTrinoHeadUrlSnafu { url: head_uri })?; + let headers = response.headers(); + let headers = filter_to_trino_headers(headers); + + Ok(headers) + } + #[instrument( skip(self), fields(request_headers = ?request_headers.sanitize()) diff --git a/trino-lb/src/http_server/v1/statement.rs b/trino-lb/src/http_server/v1/statement.rs index 2c03dad..6e37d54 100644 --- a/trino-lb/src/http_server/v1/statement.rs +++ b/trino-lb/src/http_server/v1/statement.rs @@ -8,6 +8,7 @@ use std::{ use axum::{ Json, + body::Body, extract::{Path, State}, response::{IntoResponse, Response}, }; @@ -93,6 +94,11 @@ pub enum Error { source: cluster_group_manager::Error, }, + #[snafu(display("Failed to send HEAD request to trino"))] + SendHeadToTrino { + source: cluster_group_manager::Error, + }, + #[snafu(display( "Failed to decrement the query counter query trino cluster {trino_cluster:?}" ))] @@ -216,22 +222,45 @@ pub async fn get_trino_queued_statement( /// Trino cluster and currently running. /// /// In case the nextUri is null, the query will be stopped and removed from trino-lb. +/// +/// Please note that sometimes the Trino clients also HEAD this endpoint, in which case we need some +/// special handling. #[instrument( - name = "GET /v1/statement/executing/{queryId}/{slug}/{token}", + name = "GET (or HEAD) /v1/statement/executing/{queryId}/{slug}/{token}", skip(state, headers) )] pub async fn get_trino_executing_statement( + method: http::Method, headers: HeaderMap, State(state): State>, Path((query_id, _, _)): Path<(TrinoQueryId, String, u64)>, uri: Uri, -) -> Result<(HeaderMap, Json), Error> { +) -> Result { + if method == http::Method::HEAD { + state.metrics.http_counter.add( + 1, + &[KeyValue::new("resource", "head_trino_executing_statement")], + ); + + let headers = handle_head_request_to_trino(&state, headers, query_id, uri.path()).await?; + + // For a HEAD request we don't need (nor can) return a body. + let mut response = Response::new(Body::empty()); + *response.status_mut() = StatusCode::OK; + *response.headers_mut() = headers; + return Ok(response); + } state.metrics.http_counter.add( 1, &[KeyValue::new("resource", "get_trino_executing_statement")], ); - handle_query_running_on_trino(&state, headers, query_id, uri.path()).await + let (headers, body) = + handle_query_running_on_trino(&state, headers, query_id, uri.path()).await?; + + let mut response = body.into_response(); + *response.headers_mut() = headers; + Ok(response) } #[instrument(skip( @@ -495,6 +524,38 @@ async fn handle_query_running_on_trino( Ok((trino_headers, Json(trino_query_api_response))) } +async fn handle_head_request_to_trino( + state: &Arc, + headers: HeaderMap, + query_id: TrinoQueryId, + requested_path: &str, +) -> Result { + let query = + state + .persistence + .load_query(&query_id) + .await + .context(LoadQueryFromPersistenceSnafu { + query_id: query_id.clone(), + })?; + + let headers = state + .cluster_group_manager + .send_head_to_trino( + query.trino_endpoint.join(requested_path).context( + JoinRequestPathToTrinoEndpointSnafu { + requested_path, + trino_endpoint: query.trino_endpoint.clone(), + }, + )?, + headers, + ) + .await + .context(SendHeadToTrinoSnafu)?; + + Ok(headers) +} + /// This function get's asked to delete the queued query. /// IMPORTANT: It does not check that the user is authorized to delete the queued query. Instead we assume that the /// random part of the queryId trino-lb generates provides sufficient protection, as other clients can not extract From 5557722491726c7e57310ab975dcd6ad135aa465 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Tue, 20 Jan 2026 10:57:08 +0100 Subject: [PATCH 07/13] fix: Only append HTTP headers --- trino-lb/src/http_server/v1/statement.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/trino-lb/src/http_server/v1/statement.rs b/trino-lb/src/http_server/v1/statement.rs index 6e37d54..87d9a76 100644 --- a/trino-lb/src/http_server/v1/statement.rs +++ b/trino-lb/src/http_server/v1/statement.rs @@ -259,7 +259,9 @@ pub async fn get_trino_executing_statement( handle_query_running_on_trino(&state, headers, query_id, uri.path()).await?; let mut response = body.into_response(); - *response.headers_mut() = headers; + // We can not simply replace the headers, as otherwise e.g. "Content-Type: application/json" + // would be missing + response.headers_mut().extend(headers); Ok(response) } From cfc1edeafe8115ad65c07e108d490ea1ff550103 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Tue, 3 Feb 2026 13:55:14 +0100 Subject: [PATCH 08/13] Update trino-lb-core/src/trino_api.rs Co-authored-by: Techassi --- trino-lb-core/src/trino_api.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/trino-lb-core/src/trino_api.rs b/trino-lb-core/src/trino_api.rs index b3c06c4..b7b01e6 100644 --- a/trino-lb-core/src/trino_api.rs +++ b/trino-lb-core/src/trino_api.rs @@ -182,9 +182,9 @@ impl TrinoQueryApiResponse { /// Changes the following references in the query (if they exist) /// /// 1. nextUri to point to trino-lb - /// 2. In case the `external_trino_addr` is set, segments ackUri to point to the external - /// address of Trino. Trino sometimes get's confused (likely by some HTTP) headers and put's the - /// trino-lb address into the ackUri (but the requests should go to Trino directly). + /// 2. In case the `external_trino_addr` is set, update the segments ackUri to point to the + /// external address of Trino. Trino sometimes gets confused (likely by some HTTP headers) and + /// puts the trino-lb address into the ackUri (but the requests should go to Trino directly). #[instrument( skip(self), fields(trino_lb_addr = %trino_lb_addr), From ee5a8d492890c611291f39e50df4415346a5f941 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Tue, 3 Feb 2026 13:58:21 +0100 Subject: [PATCH 09/13] Serde as Url directly (instead as String) --- trino-lb-core/src/trino_api.rs | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/trino-lb-core/src/trino_api.rs b/trino-lb-core/src/trino_api.rs index b7b01e6..034db81 100644 --- a/trino-lb-core/src/trino_api.rs +++ b/trino-lb-core/src/trino_api.rs @@ -49,9 +49,9 @@ pub struct TrinoQueryApiResponse { pub id: TrinoQueryId, /// Normally this will always be set, only the last call will not return a `next_uri`. - pub next_uri: Option, - pub info_uri: String, - pub partial_cancel_uri: Option, + pub next_uri: Option, + pub info_uri: Url, + pub partial_cancel_uri: Option, pub columns: Option>, pub data: Option>, @@ -102,8 +102,7 @@ impl TrinoQueryApiResponse { )] pub fn change_next_uri_to_trino_lb(&mut self, trino_lb_addr: &Url) -> Result<(), Error> { if let Some(next_uri) = &self.next_uri { - let next_uri = Url::parse(next_uri).context(ParseNextUriFromTrinoSnafu)?; - self.next_uri = Some(change_next_uri_to_trino_lb(&next_uri, trino_lb_addr).to_string()); + self.next_uri = Some(change_next_uri_to_trino_lb(next_uri, trino_lb_addr)); } Ok(()) @@ -138,15 +137,13 @@ impl TrinoQueryApiResponse { )) .context(JoinApiPathToTrinoLbUrlSnafu { trino_lb_addr: trino_lb_addr.clone(), - })? - .to_string(), + })?, ), info_uri: trino_lb_addr .join(&format!("ui/query.html?{query_id}")) .context(JoinApiPathToTrinoLbUrlSnafu { trino_lb_addr: trino_lb_addr.clone(), - })? - .to_string(), + })?, partial_cancel_uri: None, columns: None, data: None, @@ -196,8 +193,7 @@ impl TrinoQueryApiResponse { ) -> Result<(), Error> { // Point nextUri to trino-lb if let Some(next_uri) = &self.next_uri { - let next_uri = Url::parse(next_uri).context(ParseNextUriFromTrinoSnafu)?; - self.next_uri = Some(change_next_uri_to_trino_lb(&next_uri, trino_lb_addr).to_string()); + self.next_uri = Some(change_next_uri_to_trino_lb(next_uri, trino_lb_addr)); } // Point segment ackUris to Trino From 875fb249e45f38a61e2897ecefdf7ede1ef10c88 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Tue, 3 Feb 2026 13:59:29 +0100 Subject: [PATCH 10/13] Remove unused error --- trino-lb-core/src/trino_api.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/trino-lb-core/src/trino_api.rs b/trino-lb-core/src/trino_api.rs index 034db81..361de83 100644 --- a/trino-lb-core/src/trino_api.rs +++ b/trino-lb-core/src/trino_api.rs @@ -20,9 +20,6 @@ pub enum Error { trino_lb_addr: Url, }, - #[snafu(display("Failed to parse nextUri Trino send us"))] - ParseNextUriFromTrino { source: url::ParseError }, - #[snafu(display("Failed to parse segment ackUri Trino send us"))] ParseSegmentAckUriFromTrino { source: url::ParseError }, From f40cabd3fe22851bf42524382e30a09d6316cdb3 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Thu, 5 Feb 2026 12:31:21 +0100 Subject: [PATCH 11/13] Pass Url instead of &Url --- trino-lb-core/src/trino_api.rs | 10 +++++----- trino-lb/src/http_server/v1/statement.rs | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/trino-lb-core/src/trino_api.rs b/trino-lb-core/src/trino_api.rs index 361de83..a848362 100644 --- a/trino-lb-core/src/trino_api.rs +++ b/trino-lb-core/src/trino_api.rs @@ -97,7 +97,7 @@ impl TrinoQueryApiResponse { skip(self), fields(trino_lb_addr = %trino_lb_addr), )] - pub fn change_next_uri_to_trino_lb(&mut self, trino_lb_addr: &Url) -> Result<(), Error> { + pub fn change_next_uri_to_trino_lb(&mut self, trino_lb_addr: Url) -> Result<(), Error> { if let Some(next_uri) = &self.next_uri { self.next_uri = Some(change_next_uri_to_trino_lb(next_uri, trino_lb_addr)); } @@ -185,7 +185,7 @@ impl TrinoQueryApiResponse { )] pub fn update_trino_references( &mut self, - trino_lb_addr: &Url, + trino_lb_addr: Url, external_trino_addr: Option<&Url>, ) -> Result<(), Error> { // Point nextUri to trino-lb @@ -207,8 +207,8 @@ impl TrinoQueryApiResponse { #[instrument( fields(next_uri = %next_uri, trino_lb_addr = %trino_lb_addr), )] -fn change_next_uri_to_trino_lb(next_uri: &Url, trino_lb_addr: &Url) -> Url { - let mut result = trino_lb_addr.clone(); +fn change_next_uri_to_trino_lb(next_uri: &Url, trino_lb_addr: Url) -> Url { + let mut result = trino_lb_addr; result.set_path(next_uri.path()); result } @@ -275,7 +275,7 @@ mod tests { ) { let next_uri = Url::parse(&next_uri).unwrap(); let trino_lb_addr = Url::parse(&trino_lb_addr).unwrap(); - let result = change_next_uri_to_trino_lb(&next_uri, &trino_lb_addr); + let result = change_next_uri_to_trino_lb(&next_uri, trino_lb_addr); assert_eq!(result.to_string(), expected); } diff --git a/trino-lb/src/http_server/v1/statement.rs b/trino-lb/src/http_server/v1/statement.rs index 87d9a76..7cf455f 100644 --- a/trino-lb/src/http_server/v1/statement.rs +++ b/trino-lb/src/http_server/v1/statement.rs @@ -349,7 +349,7 @@ async fn queue_or_hand_over_query( trino_query_api_response .update_trino_references( - &state.config.trino_lb.external_address, + state.config.trino_lb.external_address.clone(), cluster.external_endpoint.as_ref(), ) .context(ModifyNextUriSnafu)?; @@ -517,7 +517,7 @@ async fn handle_query_running_on_trino( // Change the nextUri to actually point to trino-lb instead of Trino. trino_query_api_response .update_trino_references( - &state.config.trino_lb.external_address, + state.config.trino_lb.external_address.clone(), query.trino_external_endpoint.as_ref(), ) .context(ModifyNextUriSnafu)?; From e9b150be73a2c163673c7132a09fa62eb7c1e195 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Thu, 5 Feb 2026 12:42:21 +0100 Subject: [PATCH 12/13] Convert HTTP method handling to match --- trino-lb/src/http_server/v1/statement.rs | 62 +++++++++++++++--------- 1 file changed, 39 insertions(+), 23 deletions(-) diff --git a/trino-lb/src/http_server/v1/statement.rs b/trino-lb/src/http_server/v1/statement.rs index 7cf455f..bfdecda 100644 --- a/trino-lb/src/http_server/v1/statement.rs +++ b/trino-lb/src/http_server/v1/statement.rs @@ -133,6 +133,12 @@ pub enum Error { requested_path: String, trino_endpoint: Url, }, + + #[snafu(display("Unexpected HTTP method {actual}, expected on of {expected:?}"))] + UnexpectedHttpMethod { + actual: http::Method, + expected: Vec, + }, } impl IntoResponse for Error { @@ -236,33 +242,43 @@ pub async fn get_trino_executing_statement( Path((query_id, _, _)): Path<(TrinoQueryId, String, u64)>, uri: Uri, ) -> Result { - if method == http::Method::HEAD { - state.metrics.http_counter.add( - 1, - &[KeyValue::new("resource", "head_trino_executing_statement")], - ); + match method { + http::Method::GET => { + state.metrics.http_counter.add( + 1, + &[KeyValue::new("resource", "get_trino_executing_statement")], + ); - let headers = handle_head_request_to_trino(&state, headers, query_id, uri.path()).await?; + let (headers, body) = + handle_query_running_on_trino(&state, headers, query_id, uri.path()).await?; - // For a HEAD request we don't need (nor can) return a body. - let mut response = Response::new(Body::empty()); - *response.status_mut() = StatusCode::OK; - *response.headers_mut() = headers; - return Ok(response); - } - state.metrics.http_counter.add( - 1, - &[KeyValue::new("resource", "get_trino_executing_statement")], - ); + let mut response = body.into_response(); + // We can not simply replace the headers, as otherwise e.g. "Content-Type: application/json" + // would be missing + response.headers_mut().extend(headers); + Ok(response) + } + http::Method::HEAD => { + state.metrics.http_counter.add( + 1, + &[KeyValue::new("resource", "head_trino_executing_statement")], + ); - let (headers, body) = - handle_query_running_on_trino(&state, headers, query_id, uri.path()).await?; + let headers = + handle_head_request_to_trino(&state, headers, query_id, uri.path()).await?; - let mut response = body.into_response(); - // We can not simply replace the headers, as otherwise e.g. "Content-Type: application/json" - // would be missing - response.headers_mut().extend(headers); - Ok(response) + // For a HEAD request we don't need (nor can) return a body. + let mut response = Response::new(Body::empty()); + *response.status_mut() = StatusCode::OK; + *response.headers_mut() = headers; + Ok(response) + } + _ => UnexpectedHttpMethodSnafu { + actual: method, + expected: vec![http::Method::GET, http::Method::HEAD], + } + .fail(), + } } #[instrument(skip( From 54b235de523e0bf3df3b07f8a33b0975ea429625 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Thu, 5 Feb 2026 12:53:37 +0100 Subject: [PATCH 13/13] external_trino_addr -> external_trino_endpoint --- trino-lb-core/src/trino_api.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/trino-lb-core/src/trino_api.rs b/trino-lb-core/src/trino_api.rs index a848362..645f38d 100644 --- a/trino-lb-core/src/trino_api.rs +++ b/trino-lb-core/src/trino_api.rs @@ -176,7 +176,7 @@ impl TrinoQueryApiResponse { /// Changes the following references in the query (if they exist) /// /// 1. nextUri to point to trino-lb - /// 2. In case the `external_trino_addr` is set, update the segments ackUri to point to the + /// 2. In case the `external_trino_endpoint` is set, update the segments ackUri to point to the /// external address of Trino. Trino sometimes gets confused (likely by some HTTP headers) and /// puts the trino-lb address into the ackUri (but the requests should go to Trino directly). #[instrument( @@ -186,7 +186,7 @@ impl TrinoQueryApiResponse { pub fn update_trino_references( &mut self, trino_lb_addr: Url, - external_trino_addr: Option<&Url>, + external_trino_endpoint: Option<&Url>, ) -> Result<(), Error> { // Point nextUri to trino-lb if let Some(next_uri) = &self.next_uri { @@ -194,10 +194,10 @@ impl TrinoQueryApiResponse { } // Point segment ackUris to Trino - if let Some(external_trino_addr) = external_trino_addr + if let Some(external_trino_endpoint) = external_trino_endpoint && let Some(data) = self.data.as_deref_mut() { - change_segment_ack_uris_to_trino(data, external_trino_addr)?; + change_segment_ack_uris_to_trino(data, external_trino_endpoint)?; } Ok(()) @@ -215,11 +215,11 @@ fn change_next_uri_to_trino_lb(next_uri: &Url, trino_lb_addr: Url) -> Url { #[instrument( skip(data), - fields(external_trino_addr = %external_trino_addr), + fields(external_trino_endpoint = %external_trino_endpoint), )] fn change_segment_ack_uris_to_trino( data: &mut Value, - external_trino_addr: &Url, + external_trino_endpoint: &Url, ) -> Result<(), Error> { let Some(segments) = data.get_mut("segments").and_then(Value::as_array_mut) else { return Ok(()); @@ -233,7 +233,7 @@ fn change_segment_ack_uris_to_trino( let parsed_ack_uri = ack_uri_str .parse::() .context(ParseSegmentAckUriFromTrinoSnafu)?; - let mut result = external_trino_addr.clone(); + let mut result = external_trino_endpoint.clone(); result.set_path(parsed_ack_uri.path()); *ack_uri = Value::String(result.to_string()); @@ -309,11 +309,11 @@ mod tests { } ] }); - let external_trino_addr = "https://trino-external:1234" + let external_trino_endpoint = "https://trino-external:1234" .parse() .expect("static URL is always valid"); - change_segment_ack_uris_to_trino(&mut data, &external_trino_addr).unwrap(); + change_segment_ack_uris_to_trino(&mut data, &external_trino_endpoint).unwrap(); let segment = data .get("segments")