diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5f31d25..a115d2f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,9 @@ All notable changes to this project will be documented in this file.
 - Support activating and deactivating Trino clusters via API calls to `/admin/clusters/{cluster_name}/activate` and `/admin/clusters/{cluster_name}/deactivate` respectively.
   For this to work you need to authenticate yourself at trino-lb via basic auth ([#95]).
 - Expose cluster statistics at `/admin/clusters/{cluster_name}/status` and `/admin/clusters/status` ([#95]).
+- Support configuring an external endpoint of Trino clusters.
+  This is used as the target when rewriting segment ackUris, as sometimes Trino gets confused and puts the wrong endpoint (namely the one of trino-lb) in there.
+  Please note that this runs a database migration on Postgres ([#100]).

 ### Changed

@@ -23,6 +26,8 @@ All notable changes to this project will be documented in this file.
 - Set connection and response timeout for Redis connections ([#85]).
 - Only remove queries from the persistence in case they don't send a `nextUri` and are in state `FINISHED` ([#98]).
+- Correctly proxy HEAD requests to `/v1/statement/executing/{queryId}/{slug}/{token}`.
+  Previously, we sent a GET (instead of a HEAD) to the Trino cluster, which resulted in trino-lb dropping the HTTP body, causing problems ([#100]).

 [#68]: https://github.com/stackabletech/trino-lb/pull/68
 [#85]: https://github.com/stackabletech/trino-lb/pull/85
@@ -30,6 +35,7 @@ All notable changes to this project will be documented in this file.
 [#91]: https://github.com/stackabletech/trino-lb/pull/91
 [#95]: https://github.com/stackabletech/trino-lb/pull/95
 [#98]: https://github.com/stackabletech/trino-lb/pull/98
+[#100]: https://github.com/stackabletech/trino-lb/pull/100

 ## [0.5.0] - 2025-03-14

diff --git a/trino-lb-core/src/config.rs b/trino-lb-core/src/config.rs
index 19c4287..71745ca 100644
--- a/trino-lb-core/src/config.rs
+++ b/trino-lb-core/src/config.rs
@@ -191,6 +191,10 @@ pub struct TrinoClusterGroupConfig {
 pub struct TrinoClusterConfig {
     pub name: String,
     pub endpoint: Url,
+
+    /// Public endpoint of the Trino cluster.
+    /// This can e.g. be used as the target when rewriting segment ackUris.
+    pub external_endpoint: Option<Url>,
     pub credentials: TrinoClusterCredentialsConfig,
 }

diff --git a/trino-lb-core/src/trino_api.rs b/trino-lb-core/src/trino_api.rs
index 4e9c831..645f38d 100644
--- a/trino-lb-core/src/trino_api.rs
+++ b/trino-lb-core/src/trino_api.rs
@@ -5,7 +5,7 @@ use std::{
 use prusto::{QueryError, Warning};
 use serde::{Deserialize, Serialize};
-use serde_json::value::RawValue;
+use serde_json::{Value, value::RawValue};
 use snafu::{ResultExt, Snafu};
 use tracing::instrument;
 use url::Url;
@@ -20,8 +20,11 @@ pub enum Error {
         trino_lb_addr: Url,
     },

-    #[snafu(display("Failed to parse nextUri Trino send us"))]
-    ParseNextUriFromTrino { source: url::ParseError },
+    #[snafu(display("Failed to parse segment ackUri Trino sent us"))]
+    ParseSegmentAckUriFromTrino { source: url::ParseError },
+
+    #[snafu(display("Failed to change segment ackUri to point to external Trino address"))]
+    ChangeSegmentAckUriToTrino { source: url::ParseError },

     #[snafu(display(
         "Failed to determine the elapsed time of a queued query. Are all system clocks of trino-lb instances in sync?"
@@ -43,12 +46,12 @@ pub struct TrinoQueryApiResponse {
     pub id: TrinoQueryId,

     /// Normally this will always be set, only the last call will not return a `next_uri`.
-    pub next_uri: Option<String>,
-    pub info_uri: String,
-    pub partial_cancel_uri: Option<String>,
+    pub next_uri: Option<Url>,
+    pub info_uri: Url,
+    pub partial_cancel_uri: Option<Url>,
     pub columns: Option<Box<RawValue>>,
-    pub data: Option<Box<RawValue>>,
+    pub data: Option<Box<Value>>,
     pub error: Option<QueryError>,
     pub warnings: Vec<Warning>,
@@ -94,10 +97,9 @@ impl TrinoQueryApiResponse {
         skip(self),
         fields(trino_lb_addr = %trino_lb_addr),
     )]
-    pub fn change_next_uri_to_trino_lb(&mut self, trino_lb_addr: &Url) -> Result<(), Error> {
+    pub fn change_next_uri_to_trino_lb(&mut self, trino_lb_addr: Url) -> Result<(), Error> {
         if let Some(next_uri) = &self.next_uri {
-            let next_uri = Url::parse(next_uri).context(ParseNextUriFromTrinoSnafu)?;
-            self.next_uri = Some(change_next_uri_to_trino_lb(&next_uri, trino_lb_addr).to_string());
+            self.next_uri = Some(change_next_uri_to_trino_lb(next_uri, trino_lb_addr));
         }

         Ok(())
@@ -132,15 +134,13 @@ impl TrinoQueryApiResponse {
                 ))
                 .context(JoinApiPathToTrinoLbUrlSnafu {
                     trino_lb_addr: trino_lb_addr.clone(),
-                })?
-                .to_string(),
+                })?,
             ),
             info_uri: trino_lb_addr
                 .join(&format!("ui/query.html?{query_id}"))
                 .context(JoinApiPathToTrinoLbUrlSnafu {
                     trino_lb_addr: trino_lb_addr.clone(),
-                })?
-                .to_string(),
+                })?,
             partial_cancel_uri: None,
             columns: None,
             data: None,
@@ -172,17 +172,81 @@ impl TrinoQueryApiResponse {
             update_count: None,
         })
     }
+
+    /// Changes the following references in the query (if they exist):
+    ///
+    /// 1. nextUri to point to trino-lb
+    /// 2. In case the `external_trino_endpoint` is set, update the segment ackUris to point to the
+    ///    external address of Trino. Trino sometimes gets confused (likely by some HTTP headers) and
+    ///    puts the trino-lb address into the ackUri (but the requests should go to Trino directly).
+    #[instrument(
+        skip(self),
+        fields(trino_lb_addr = %trino_lb_addr),
+    )]
+    pub fn update_trino_references(
+        &mut self,
+        trino_lb_addr: Url,
+        external_trino_endpoint: Option<&Url>,
+    ) -> Result<(), Error> {
+        // Point nextUri to trino-lb
+        if let Some(next_uri) = &self.next_uri {
+            self.next_uri = Some(change_next_uri_to_trino_lb(next_uri, trino_lb_addr));
+        }
+
+        // Point segment ackUris to Trino
+        if let Some(external_trino_endpoint) = external_trino_endpoint
+            && let Some(data) = self.data.as_deref_mut()
+        {
+            change_segment_ack_uris_to_trino(data, external_trino_endpoint)?;
+        }
+
+        Ok(())
+    }
 }

-fn change_next_uri_to_trino_lb(next_uri: &Url, trino_lb_addr: &Url) -> Url {
-    let mut result = trino_lb_addr.clone();
+#[instrument(
+    fields(next_uri = %next_uri, trino_lb_addr = %trino_lb_addr),
+)]
+fn change_next_uri_to_trino_lb(next_uri: &Url, trino_lb_addr: Url) -> Url {
+    let mut result = trino_lb_addr;
     result.set_path(next_uri.path());

     result
 }

+#[instrument(
+    skip(data),
+    fields(external_trino_endpoint = %external_trino_endpoint),
+)]
+fn change_segment_ack_uris_to_trino(
+    data: &mut Value,
+    external_trino_endpoint: &Url,
+) -> Result<(), Error> {
+    let Some(segments) = data.get_mut("segments").and_then(Value::as_array_mut) else {
+        return Ok(());
+    };
+
+    for segment in segments {
+        if let Some("spooled") = segment.get("type").and_then(Value::as_str)
+            && let Some(ack_uri) = segment.get_mut("ackUri")
+            && let Some(ack_uri_str) = ack_uri.as_str()
+        {
+            let parsed_ack_uri = ack_uri_str
+                .parse::<Url>()
+                .context(ParseSegmentAckUriFromTrinoSnafu)?;
+            let mut result = external_trino_endpoint.clone();
+            result.set_path(parsed_ack_uri.path());
+
+            *ack_uri = Value::String(result.to_string());
+        }
+    }
+
+    Ok(())
+}
+
 #[cfg(test)]
 mod tests {
     use rstest::rstest;
+    use serde_json::json;

     use super::*;
@@ -211,7 +275,60 @@ mod tests {
     ) {
         let next_uri = Url::parse(&next_uri).unwrap();
         let trino_lb_addr = Url::parse(&trino_lb_addr).unwrap();
-        let result = change_next_uri_to_trino_lb(&next_uri, &trino_lb_addr);
+        let result = change_next_uri_to_trino_lb(&next_uri, trino_lb_addr);

         assert_eq!(result.to_string(), expected);
     }
+
+    #[test]
+    fn test_change_segment_ack_uris_to_trino() {
+        let mut data = json!({
+            "encoding": "json+zstd",
+            "segments": [
+                {
+                    "type": "spooled",
+                    "uri": "https://minio:9000/trino/spooling/01KCAH1KEE432S8VXFDJTZYTTT.json%2Bzstd?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20251212T121622Z&X-Amz-SignedHeaders=host%3Bx-amz-server-side-encryption-customer-algorithm%3Bx-amz-server-side-encryption-customer-key%3Bx-amz-server-side-encryption-customer-key-md5&X-Amz-Credential=minioAccessKey%2F20251212%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Expires=3600&X-Amz-Signature=331b80bdae6c92352d12985ae8863dddbc72c755d49466c1aeeb732cd08b7d8d",
+                    "ackUri": "https://trino-client-spooling-coordinator:8443/v1/spooled/ack/LYp8Bg6PDoTuUO86fmNMQNhtC0xryOhvWpL2LXhwLI4=",
+                    "metadata": {
+                        "segmentSize": 2716023,
+                        "uncompressedSize": 7706400,
+                        "rowsCount": 43761,
+                        "expiresAt": "2025-12-13T01:16:21.454",
+                        "rowOffset": 10952
+                    },
+                    "headers": {
+                        "x-amz-server-side-encryption-customer-algorithm": [
+                            "AES256"
+                        ],
+                        "x-amz-server-side-encryption-customer-key": [
+                            "iemW0eosEhVVn+QR3q/OApysz8ieRCzAHngdoJFlbHY="
+                        ],
+                        "x-amz-server-side-encryption-customer-key-MD5": [
+                            "D1VfXAwD/ffApNMNf3gBig=="
+                        ]
+                    }
+                }
+            ]
+        });
+        let external_trino_endpoint = "https://trino-external:1234"
+            .parse()
+            .expect("static URL is always valid");
+
+        change_segment_ack_uris_to_trino(&mut data, &external_trino_endpoint).unwrap();
+
+        let segment = data
+            .get("segments")
+            .unwrap()
+            .as_array()
+            .unwrap()
+            .first()
+            .unwrap();
+        assert_eq!(
+            segment.get("uri").unwrap(),
+            "https://minio:9000/trino/spooling/01KCAH1KEE432S8VXFDJTZYTTT.json%2Bzstd?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20251212T121622Z&X-Amz-SignedHeaders=host%3Bx-amz-server-side-encryption-customer-algorithm%3Bx-amz-server-side-encryption-customer-key%3Bx-amz-server-side-encryption-customer-key-md5&X-Amz-Credential=minioAccessKey%2F20251212%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Expires=3600&X-Amz-Signature=331b80bdae6c92352d12985ae8863dddbc72c755d49466c1aeeb732cd08b7d8d"
+        );
+        assert_eq!(
+            segment.get("ackUri").unwrap(),
+            "https://trino-external:1234/v1/spooled/ack/LYp8Bg6PDoTuUO86fmNMQNhtC0xryOhvWpL2LXhwLI4="
+        );
+    }
 }
diff --git a/trino-lb-core/src/trino_query.rs b/trino-lb-core/src/trino_query.rs
index 9701ac8..ebf0d0e 100644
--- a/trino-lb-core/src/trino_query.rs
+++ b/trino-lb-core/src/trino_query.rs
@@ -52,6 +52,10 @@ pub struct TrinoQuery {
     /// Endpoint of the Trino cluster the query is running on.
     pub trino_endpoint: Url,

+    /// (Optionally, if configured) public endpoint of the Trino cluster.
+    /// This can e.g. be used as the target when rewriting segment ackUris.
+    pub trino_external_endpoint: Option<Url>,
+
     /// The time the query was submitted to trino-lb.
     pub creation_time: SystemTime,
@@ -80,6 +84,7 @@ impl TrinoQuery {
         trino_cluster: TrinoClusterName,
         trino_query_id: TrinoQueryId,
         trino_endpoint: Url,
+        trino_external_endpoint: Option<Url>,
         creation_time: SystemTime,
         delivered_time: SystemTime,
     ) -> Self {
@@ -87,6 +92,7 @@ impl TrinoQuery {
             id: trino_query_id,
             trino_cluster,
             trino_endpoint,
+            trino_external_endpoint,
             creation_time,
             delivered_time,
         }
diff --git a/trino-lb-persistence/.sqlx/query-dcdff617008bd9e5db76c1aa6099da4728bca33d362d6ff990e14b7e4b17288b.json b/trino-lb-persistence/.sqlx/query-1f8cc8bc6093d2fc198d0b2849568b812633632a68235f2fa26f80a9c5352ded.json
similarity index 60%
rename from trino-lb-persistence/.sqlx/query-dcdff617008bd9e5db76c1aa6099da4728bca33d362d6ff990e14b7e4b17288b.json
rename to trino-lb-persistence/.sqlx/query-1f8cc8bc6093d2fc198d0b2849568b812633632a68235f2fa26f80a9c5352ded.json
index a184cf3..a2abdf4 100644
--- a/trino-lb-persistence/.sqlx/query-dcdff617008bd9e5db76c1aa6099da4728bca33d362d6ff990e14b7e4b17288b.json
+++ b/trino-lb-persistence/.sqlx/query-1f8cc8bc6093d2fc198d0b2849568b812633632a68235f2fa26f80a9c5352ded.json
@@ -1,6 +1,6 @@
 {
   "db_name": "PostgreSQL",
-  "query": "INSERT INTO queries (id, trino_cluster, trino_endpoint, creation_time, delivered_time)\n VALUES ($1, $2, $3, $4, $5)",
+  "query": "INSERT INTO queries (id, trino_cluster, trino_endpoint, trino_external_endpoint, creation_time, delivered_time)\n VALUES ($1, $2, $3, $4, $5, $6)",
   "describe": {
     "columns": [],
     "parameters": {
@@ -8,11 +8,12 @@
       "Varchar",
       "Varchar",
       "Varchar",
+      "Varchar",
       "Timestamptz",
       "Timestamptz"
     ]
   },
   "nullable": []
 },
-  "hash": "dcdff617008bd9e5db76c1aa6099da4728bca33d362d6ff990e14b7e4b17288b"
+  "hash": "1f8cc8bc6093d2fc198d0b2849568b812633632a68235f2fa26f80a9c5352ded"
 }
diff --git a/trino-lb-persistence/.sqlx/query-d6b1cc38b5e80b51b157d1989beec66a68126c1606a1ae39caeed6ecd380b9f1.json b/trino-lb-persistence/.sqlx/query-e727412091850e7ac76082775abe9250d852451790bb2509b260fafdb268b6b1.json
similarity index 66%
rename from trino-lb-persistence/.sqlx/query-d6b1cc38b5e80b51b157d1989beec66a68126c1606a1ae39caeed6ecd380b9f1.json
rename to trino-lb-persistence/.sqlx/query-e727412091850e7ac76082775abe9250d852451790bb2509b260fafdb268b6b1.json
index 45ce62e..cb5a1ab 100644
--- a/trino-lb-persistence/.sqlx/query-d6b1cc38b5e80b51b157d1989beec66a68126c1606a1ae39caeed6ecd380b9f1.json
+++ b/trino-lb-persistence/.sqlx/query-e727412091850e7ac76082775abe9250d852451790bb2509b260fafdb268b6b1.json
@@ -1,6 +1,6 @@
 {
   "db_name": "PostgreSQL",
-  "query": "SELECT id, trino_cluster, trino_endpoint, creation_time, delivered_time\n FROM queries\n WHERE id = $1",
+  "query": "SELECT id, trino_cluster, trino_endpoint, trino_external_endpoint, creation_time, delivered_time\n FROM queries\n WHERE id = $1",
   "describe": {
     "columns": [
       {
@@ -20,11 +20,16 @@
       },
       {
         "ordinal": 3,
+        "name": "trino_external_endpoint",
+        "type_info": "Varchar"
+      },
+      {
+        "ordinal": 4,
         "name": "creation_time",
         "type_info": "Timestamptz"
       },
       {
-        "ordinal": 4,
+        "ordinal": 5,
         "name": "delivered_time",
         "type_info": "Timestamptz"
       }
@@ -38,9 +43,10 @@
       false,
       false,
       false,
+      true,
       false,
       false
     ]
   },
-  "hash": "d6b1cc38b5e80b51b157d1989beec66a68126c1606a1ae39caeed6ecd380b9f1"
+  "hash": "e727412091850e7ac76082775abe9250d852451790bb2509b260fafdb268b6b1"
 }
diff --git a/trino-lb-persistence/src/postgres/README.md b/trino-lb-persistence/src/postgres/README.md
new file mode 100644
index 0000000..20827c2
--- /dev/null
+++ b/trino-lb-persistence/src/postgres/README.md
@@ -0,0 +1,18 @@
+# Postgres sqlx setup
+
+First start a Postgres:
+
+```bash
+docker run --rm -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=admin postgres
+```
+
+Afterwards, set the `DATABASE_URL` env var and prepare the query metadata for offline compilation:
+
+```bash
+export DATABASE_URL=postgres://postgres:postgres@localhost/postgres
+
+cd trino-lb-persistence
+
+cargo sqlx migrate run --source src/postgres/migrations
+cargo sqlx prepare --workspace
+```
diff --git a/trino-lb-persistence/src/postgres/migrations/20260101314410_add_trino_external_endpoint.sql b/trino-lb-persistence/src/postgres/migrations/20260101314410_add_trino_external_endpoint.sql
new file mode 100644
index 0000000..aff5474
--- /dev/null
+++ b/trino-lb-persistence/src/postgres/migrations/20260101314410_add_trino_external_endpoint.sql
@@ -0,0 +1,3 @@
+ALTER TABLE queries
+    -- nullable, as it's Option<&str>
+ADD trino_external_endpoint VARCHAR;
diff --git a/trino-lb-persistence/src/postgres/mod.rs b/trino-lb-persistence/src/postgres/mod.rs
index 710eb31..7d08f25 100644
--- a/trino-lb-persistence/src/postgres/mod.rs
+++ b/trino-lb-persistence/src/postgres/mod.rs
@@ -92,6 +92,9 @@ pub enum Error {
     #[snafu(display("Failed to parse endpoint url of cluster from stored query"))]
     ParseClusterEndpointFromStoredQuery { source: url::ParseError },

+    #[snafu(display("Failed to parse external endpoint url of cluster from stored query"))]
+    ParseClusterExternalEndpointFromStoredQuery { source: url::ParseError },
+
     #[snafu(display("Failed to convert max query counter to u64, as it is too high"))]
     ConvertMaxAllowedQueryCounterToU64 { source: TryFromIntError },

@@ -204,11 +207,12 @@ impl Persistence for PostgresPersistence {
     #[instrument(skip(self, query))]
     async fn store_query(&self, query: TrinoQuery) -> Result<(), super::Error> {
         query!(
-            r#"INSERT INTO queries (id, trino_cluster, trino_endpoint, creation_time, delivered_time)
-            VALUES ($1, $2, $3, $4, $5)"#,
+            r#"INSERT INTO queries (id, trino_cluster, trino_endpoint, trino_external_endpoint, creation_time, delivered_time)
+            VALUES ($1, $2, $3, $4, $5, $6)"#,
             query.id,
             query.trino_cluster,
             query.trino_endpoint.as_str(),
+            query.trino_external_endpoint.as_ref().map(Url::as_str),
             Into::<DateTime<Utc>>::into(query.creation_time),
             Into::<DateTime<Utc>>::into(query.delivered_time),
         )
@@ -222,7 +226,7 @@ impl Persistence for PostgresPersistence {
     #[instrument(skip(self))]
     async fn load_query(&self, query_id: &TrinoQueryId) -> Result<TrinoQuery, super::Error> {
         let result = query!(
-            r#"SELECT id, trino_cluster, trino_endpoint, creation_time, delivered_time
+            r#"SELECT id, trino_cluster, trino_endpoint, trino_external_endpoint, creation_time, delivered_time
             FROM queries
             WHERE id = $1"#,
             query_id,
         )
@@ -231,11 +235,19 @@
         .await
         .context(LoadQuerySnafu)?;

+        let trino_external_endpoint = match result.trino_external_endpoint {
+            Some(trino_external_endpoint) => Some(
+                Url::parse(&trino_external_endpoint)
+                    .context(ParseClusterExternalEndpointFromStoredQuerySnafu)?,
+            ),
+            None => None,
+        };
         let query = TrinoQuery {
             id: result.id,
             trino_cluster: result.trino_cluster,
             trino_endpoint: Url::parse(&result.trino_endpoint)
                 .context(ParseClusterEndpointFromStoredQuerySnafu)?,
+            trino_external_endpoint,
             creation_time: result.creation_time.into(),
             delivered_time: result.delivered_time.into(),
         };
diff --git a/trino-lb/src/cluster_group_manager.rs b/trino-lb/src/cluster_group_manager.rs
index b98f419..49f78ea 100644
--- a/trino-lb/src/cluster_group_manager.rs
+++ b/trino-lb/src/cluster_group_manager.rs
@@ -35,6 +35,9 @@ pub enum Error {
     #[snafu(display("Failed to contact Trino API to post query"))]
     ContactTrinoPostQuery { source: reqwest::Error },

+    #[snafu(display("Failed to call Trino HEAD URL {url:?}"))]
+    CallTrinoHeadUrl { source: reqwest::Error, url: Url },
+
     #[snafu(display("Failed to decode Trino API response"))]
     DecodeTrinoResponse { source: reqwest::Error },

@@ -86,6 +89,7 @@ pub struct TrinoCluster {
     pub name: String,
     pub max_running_queries: u64,
     pub endpoint: Url,
+    pub external_endpoint: Option<Url>,
 }

 #[derive(Clone, Debug, Serialize)]
@@ -145,6 +149,7 @@ impl ClusterGroupManager {
                     name: cluster_name,
                     max_running_queries: group_config.max_running_queries,
                     endpoint: cluster_config.endpoint.clone(),
+                    external_endpoint: cluster_config.external_endpoint.clone(),
                 })
             }
             groups.insert(group_name.clone(), group);
@@ -230,7 +235,7 @@ impl ClusterGroupManager {
             .get(next_uri)
             .headers(headers)
             .send()
-            .instrument(info_span!("Send HTTP get to Trino"))
+            .instrument(info_span!("Send HTTP GET to Trino"))
             .await
             .context(ContactTrinoPostQuerySnafu)?;
         let headers = response.headers();
@@ -247,6 +252,33 @@ impl ClusterGroupManager {
         Ok((trino_query_api_response, headers))
     }

+    /// Sometimes the Trino client HEADs a /executing/xxx endpoint instead of GETing it.
+    /// We need to proxy this as a HEAD request as well.
+    #[instrument(
+        skip(self),
+        fields(head_uri = %head_uri, headers = ?headers.sanitize())
+    )]
+    pub async fn send_head_to_trino(
+        &self,
+        head_uri: Url,
+        mut headers: HeaderMap,
+    ) -> Result<HeaderMap, Error> {
+        add_current_context_to_client_request(tracing::Span::current().context(), &mut headers);
+
+        let response = self
+            .http_client
+            .head(head_uri.clone())
+            .headers(headers)
+            .send()
+            .instrument(info_span!("Send HTTP HEAD to Trino"))
+            .await
+            .with_context(|_| CallTrinoHeadUrlSnafu { url: head_uri })?;
+        let headers = response.headers();
+        let headers = filter_to_trino_headers(headers);
+
+        Ok(headers)
+    }
+
     #[instrument(
         skip(self),
         fields(request_headers = ?request_headers.sanitize())
diff --git a/trino-lb/src/http_server/v1/statement.rs b/trino-lb/src/http_server/v1/statement.rs
index 47f1ae2..bfdecda 100644
--- a/trino-lb/src/http_server/v1/statement.rs
+++ b/trino-lb/src/http_server/v1/statement.rs
@@ -8,6 +8,7 @@ use std::{

 use axum::{
     Json,
+    body::Body,
     extract::{Path, State},
     response::{IntoResponse, Response},
 };
@@ -93,6 +94,11 @@ pub enum Error {
         source: cluster_group_manager::Error,
     },

+    #[snafu(display("Failed to send HEAD request to Trino"))]
+    SendHeadToTrino {
+        source: cluster_group_manager::Error,
+    },
+
     #[snafu(display(
         "Failed to decrement the query counter query trino cluster {trino_cluster:?}"
     ))]
@@ -127,6 +133,12 @@ pub enum Error {
         requested_path: String,
         trino_endpoint: Url,
     },
+
+    #[snafu(display("Unexpected HTTP method {actual}, expected one of {expected:?}"))]
+    UnexpectedHttpMethod {
+        actual: http::Method,
+        expected: Vec<http::Method>,
+    },
 }

 impl IntoResponse for Error {
@@ -216,22 +228,57 @@ pub async fn get_trino_queued_statement(
 /// Trino cluster and currently running.
 ///
 /// In case the nextUri is null, the query will be stopped and removed from trino-lb.
+///
+/// Please note that sometimes Trino clients also HEAD this endpoint, in which case we need some
+/// special handling.
 #[instrument(
-    name = "GET /v1/statement/executing/{queryId}/{slug}/{token}",
+    name = "GET (or HEAD) /v1/statement/executing/{queryId}/{slug}/{token}",
     skip(state, headers)
 )]
 pub async fn get_trino_executing_statement(
+    method: http::Method,
     headers: HeaderMap,
     State(state): State<Arc<AppState>>,
     Path((query_id, _, _)): Path<(TrinoQueryId, String, u64)>,
     uri: Uri,
-) -> Result<(HeaderMap, Json<TrinoQueryApiResponse>), Error> {
-    state.metrics.http_counter.add(
-        1,
-        &[KeyValue::new("resource", "get_trino_executing_statement")],
-    );
+) -> Result<Response, Error> {
+    match method {
+        http::Method::GET => {
+            state.metrics.http_counter.add(
+                1,
+                &[KeyValue::new("resource", "get_trino_executing_statement")],
+            );

-    handle_query_running_on_trino(&state, headers, query_id, uri.path()).await
+            let (headers, body) =
+                handle_query_running_on_trino(&state, headers, query_id, uri.path()).await?;
+
+            let mut response = body.into_response();
+            // We cannot simply replace the headers, as otherwise e.g. "Content-Type: application/json"
+            // would be missing
+            response.headers_mut().extend(headers);
+
+            Ok(response)
+        }
+        http::Method::HEAD => {
+            state.metrics.http_counter.add(
+                1,
+                &[KeyValue::new("resource", "head_trino_executing_statement")],
+            );
+
+            let headers =
+                handle_head_request_to_trino(&state, headers, query_id, uri.path()).await?;
+
+            // For a HEAD request we don't need to (nor can we) return a body.
+            let mut response = Response::new(Body::empty());
+            *response.status_mut() = StatusCode::OK;
+            *response.headers_mut() = headers;
+
+            Ok(response)
+        }
+        _ => UnexpectedHttpMethodSnafu {
+            actual: method,
+            expected: vec![http::Method::GET, http::Method::HEAD],
+        }
+        .fail(),
+    }
 }

 #[instrument(skip(
@@ -304,6 +351,7 @@ async fn queue_or_hand_over_query(
             cluster.name.clone(),
             trino_query_api_response.id.clone(),
             cluster.endpoint.clone(),
+            cluster.external_endpoint.clone(),
             *creation_time,
             SystemTime::now(),
         );
@@ -316,7 +364,10 @@ async fn queue_or_hand_over_query(
         )?;

         trino_query_api_response
-            .change_next_uri_to_trino_lb(&state.config.trino_lb.external_address)
+            .update_trino_references(
+                state.config.trino_lb.external_address.clone(),
+                cluster.external_endpoint.as_ref(),
+            )
             .context(ModifyNextUriSnafu)?;

         info!(
@@ -481,13 +532,48 @@ async fn handle_query_running_on_trino(
     } else {
         // Change the nextUri to actually point to trino-lb instead of Trino.
         trino_query_api_response
-            .change_next_uri_to_trino_lb(&state.config.trino_lb.external_address)
+            .update_trino_references(
+                state.config.trino_lb.external_address.clone(),
+                query.trino_external_endpoint.as_ref(),
+            )
             .context(ModifyNextUriSnafu)?;
     }

     Ok((trino_headers, Json(trino_query_api_response)))
 }

+async fn handle_head_request_to_trino(
+    state: &Arc<AppState>,
+    headers: HeaderMap,
+    query_id: TrinoQueryId,
+    requested_path: &str,
+) -> Result<HeaderMap, Error> {
+    let query =
+        state
+            .persistence
+            .load_query(&query_id)
+            .await
+            .context(LoadQueryFromPersistenceSnafu {
+                query_id: query_id.clone(),
+            })?;
+
+    let headers = state
+        .cluster_group_manager
+        .send_head_to_trino(
+            query.trino_endpoint.join(requested_path).context(
+                JoinRequestPathToTrinoEndpointSnafu {
+                    requested_path,
+                    trino_endpoint: query.trino_endpoint.clone(),
+                },
+            )?,
+            headers,
+        )
+        .await
+        .context(SendHeadToTrinoSnafu)?;
+
+    Ok(headers)
+}
+
 /// This function gets asked to delete the queued query.
 /// IMPORTANT: It does not check that the user is authorized to delete the queued query. Instead we assume that the
 /// random part of the queryId trino-lb generates provides sufficient protection, as other clients can not extract
diff --git a/trino-lb/src/scaling/mod.rs b/trino-lb/src/scaling/mod.rs
index 9fdf66f..8c83010 100644
--- a/trino-lb/src/scaling/mod.rs
+++ b/trino-lb/src/scaling/mod.rs
@@ -200,6 +200,7 @@ impl Scaler {
                     name: cluster_name,
                     max_running_queries: group_config.max_running_queries,
                     endpoint: cluster_config.endpoint.clone(),
+                    external_endpoint: cluster_config.external_endpoint.clone(),
                 })
             }
             groups.insert(group_name.clone(), group);
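To make the new configuration option concrete, here is a minimal sketch of a cluster entry built against the `TrinoClusterConfig` shape from `trino-lb-core/src/config.rs` above. This is only an illustration, not part of the change: the `TrinoClusterCredentialsConfig` fields (`username`, `password`) and the example hostnames are assumptions, since the diff does not show them.

```rust
use trino_lb_core::config::{TrinoClusterConfig, TrinoClusterCredentialsConfig};
use url::Url;

// Illustrative sketch only: credentials fields and hostnames are assumed,
// they are not shown in this diff.
fn example_cluster_config() -> TrinoClusterConfig {
    TrinoClusterConfig {
        name: "trino-1".to_owned(),
        // Endpoint trino-lb itself talks to (queries and proxied GET/HEAD requests go here).
        endpoint: Url::parse("https://trino-1-coordinator:8443").expect("valid URL"),
        // New: public endpoint that spooled segment ackUris get rewritten to,
        // so clients acknowledge segments directly at Trino instead of at trino-lb.
        external_endpoint: Some(
            Url::parse("https://trino-1.example.com:8443").expect("valid URL"),
        ),
        credentials: TrinoClusterCredentialsConfig {
            username: "admin".to_owned(),
            password: "admin".to_owned(),
        },
    }
}
```

In a deployed trino-lb configuration file the same information would sit in the cluster's entry next to its existing endpoint and credentials; the exact key spelling used there is not specified by this diff.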