Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ All notable changes to this project will be documented in this file.

- Support activating and deactivation Trino clusters via API calls to `/admin/clusters/{cluster_name}/activate` and `/admin/clusters/{cluster_name}/deactivate` respectively. For this to work you need to authenticate yourself at trino-lb via basic auth ([#95]).
- Expose cluster statistics at `/admin/clusters/{cluster_name}/status` and `/admin/clusters/status` ([#95]).
- Support configuring an external endpoint of Trino clusters.
This is used to update the segments ackUris to, as sometimes Trino get's confused and put's the wrong endpoint (namely the one of trino-lb) in there.
Please note that this runs a database migration on Postgres ([#100]).

### Changed

Expand All @@ -23,13 +26,16 @@ All notable changes to this project will be documented in this file.

- Set connection and response timeout for Redis connections ([#85]).
- Only remove queries from the persistence in case they don't send a `nextUri` and are in state `FINISHED` ([#98]).
- Correctly proxy HEAD requests to `/v1/statement/executing/{queryId}/{slug}/{token}`.
Previously, we would GET (instead of HEAD) the URL at the Trino cluster, which resulted in trino-lb dropping the HTTP body, causing problems ([#100]).

[#68]: https://github.com/stackabletech/trino-lb/pull/68
[#85]: https://github.com/stackabletech/trino-lb/pull/85
[#86]: https://github.com/stackabletech/trino-lb/pull/86
[#91]: https://github.com/stackabletech/trino-lb/pull/91
[#95]: https://github.com/stackabletech/trino-lb/pull/95
[#98]: https://github.com/stackabletech/trino-lb/pull/98
[#100]: https://github.com/stackabletech/trino-lb/pull/100

## [0.5.0] - 2025-03-14

Expand Down
4 changes: 4 additions & 0 deletions trino-lb-core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ pub struct TrinoClusterGroupConfig {
pub struct TrinoClusterConfig {
pub name: String,
pub endpoint: Url,

/// Public endpoint of the Trino cluster.
/// This can e.g. be used to change segment ackUris to.
pub external_endpoint: Option<Url>,
pub credentials: TrinoClusterCredentialsConfig,
}

Expand Down
151 changes: 134 additions & 17 deletions trino-lb-core/src/trino_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{

use prusto::{QueryError, Warning};
use serde::{Deserialize, Serialize};
use serde_json::value::RawValue;
use serde_json::{Value, value::RawValue};
use snafu::{ResultExt, Snafu};
use tracing::instrument;
use url::Url;
Expand All @@ -20,8 +20,11 @@ pub enum Error {
trino_lb_addr: Url,
},

#[snafu(display("Failed to parse nextUri Trino send us"))]
ParseNextUriFromTrino { source: url::ParseError },
#[snafu(display("Failed to parse segment ackUri Trino send us"))]
ParseSegmentAckUriFromTrino { source: url::ParseError },

#[snafu(display("Failed to change segment ackUri to point to external Trino address"))]
ChangeSegmentAckUriToTrino { source: url::ParseError },

#[snafu(display(
"Failed to determine the elapsed time of a queued query. Are all system clocks of trino-lb instances in sync?"
Expand All @@ -43,12 +46,12 @@ pub struct TrinoQueryApiResponse {
pub id: TrinoQueryId,

/// Normally this will always be set, only the last call will not return a `next_uri`.
pub next_uri: Option<String>,
pub info_uri: String,
pub partial_cancel_uri: Option<String>,
pub next_uri: Option<Url>,
pub info_uri: Url,
pub partial_cancel_uri: Option<Url>,

pub columns: Option<Box<RawValue>>,
pub data: Option<Box<RawValue>>,
pub data: Option<Box<Value>>,

pub error: Option<QueryError>,
pub warnings: Vec<Warning>,
Expand Down Expand Up @@ -94,10 +97,9 @@ impl TrinoQueryApiResponse {
skip(self),
fields(trino_lb_addr = %trino_lb_addr),
)]
pub fn change_next_uri_to_trino_lb(&mut self, trino_lb_addr: &Url) -> Result<(), Error> {
pub fn change_next_uri_to_trino_lb(&mut self, trino_lb_addr: Url) -> Result<(), Error> {
if let Some(next_uri) = &self.next_uri {
let next_uri = Url::parse(next_uri).context(ParseNextUriFromTrinoSnafu)?;
self.next_uri = Some(change_next_uri_to_trino_lb(&next_uri, trino_lb_addr).to_string());
self.next_uri = Some(change_next_uri_to_trino_lb(next_uri, trino_lb_addr));
}

Ok(())
Expand Down Expand Up @@ -132,15 +134,13 @@ impl TrinoQueryApiResponse {
))
.context(JoinApiPathToTrinoLbUrlSnafu {
trino_lb_addr: trino_lb_addr.clone(),
})?
.to_string(),
})?,
),
info_uri: trino_lb_addr
.join(&format!("ui/query.html?{query_id}"))
.context(JoinApiPathToTrinoLbUrlSnafu {
trino_lb_addr: trino_lb_addr.clone(),
})?
.to_string(),
})?,
partial_cancel_uri: None,
columns: None,
data: None,
Expand Down Expand Up @@ -172,17 +172,81 @@ impl TrinoQueryApiResponse {
update_count: None,
})
}

/// Changes the following references in the query (if they exist)
///
/// 1. nextUri to point to trino-lb
/// 2. In case the `external_trino_endpoint` is set, update the segments ackUri to point to the
/// external address of Trino. Trino sometimes gets confused (likely by some HTTP headers) and
/// puts the trino-lb address into the ackUri (but the requests should go to Trino directly).
#[instrument(
skip(self),
fields(trino_lb_addr = %trino_lb_addr),
)]
pub fn update_trino_references(
&mut self,
trino_lb_addr: Url,
external_trino_endpoint: Option<&Url>,
) -> Result<(), Error> {
// Point nextUri to trino-lb
if let Some(next_uri) = &self.next_uri {
self.next_uri = Some(change_next_uri_to_trino_lb(next_uri, trino_lb_addr));
}

// Point segment ackUris to Trino
if let Some(external_trino_endpoint) = external_trino_endpoint
&& let Some(data) = self.data.as_deref_mut()
{
change_segment_ack_uris_to_trino(data, external_trino_endpoint)?;
}

Ok(())
}
}

fn change_next_uri_to_trino_lb(next_uri: &Url, trino_lb_addr: &Url) -> Url {
let mut result = trino_lb_addr.clone();
#[instrument(
fields(next_uri = %next_uri, trino_lb_addr = %trino_lb_addr),
)]
fn change_next_uri_to_trino_lb(next_uri: &Url, trino_lb_addr: Url) -> Url {
let mut result = trino_lb_addr;
result.set_path(next_uri.path());
result
}

#[instrument(
skip(data),
fields(external_trino_endpoint = %external_trino_endpoint),
)]
fn change_segment_ack_uris_to_trino(
data: &mut Value,
external_trino_endpoint: &Url,
) -> Result<(), Error> {
let Some(segments) = data.get_mut("segments").and_then(Value::as_array_mut) else {
return Ok(());
};

for segment in segments {
if let Some("spooled") = segment.get("type").and_then(Value::as_str)
&& let Some(ack_uri) = segment.get_mut("ackUri")
&& let Some(ack_uri_str) = ack_uri.as_str()
{
let parsed_ack_uri = ack_uri_str
.parse::<Url>()
.context(ParseSegmentAckUriFromTrinoSnafu)?;
let mut result = external_trino_endpoint.clone();
result.set_path(parsed_ack_uri.path());

*ack_uri = Value::String(result.to_string());
}
}

Ok(())
}

#[cfg(test)]
mod tests {
use rstest::rstest;
use serde_json::json;

use super::*;

Expand Down Expand Up @@ -211,7 +275,60 @@ mod tests {
) {
let next_uri = Url::parse(&next_uri).unwrap();
let trino_lb_addr = Url::parse(&trino_lb_addr).unwrap();
let result = change_next_uri_to_trino_lb(&next_uri, &trino_lb_addr);
let result = change_next_uri_to_trino_lb(&next_uri, trino_lb_addr);
assert_eq!(result.to_string(), expected);
}

#[test]
fn test_change_segment_ack_uris_to_trino() {
let mut data = json!({
"encoding": "json+zstd",
"segments": [
{
"type": "spooled",
"uri": "https://minio:9000/trino/spooling/01KCAH1KEE432S8VXFDJTZYTTT.json%2Bzstd?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20251212T121622Z&X-Amz-SignedHeaders=host%3Bx-amz-server-side-encryption-customer-algorithm%3Bx-amz-server-side-encryption-customer-key%3Bx-amz-server-side-encryption-customer-key-md5&X-Amz-Credential=minioAccessKey%2F20251212%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Expires=3600&X-Amz-Signature=331b80bdae6c92352d12985ae8863dddbc72c755d49466c1aeeb732cd08b7d8d",
"ackUri": "https://trino-client-spooling-coordinator:8443/v1/spooled/ack/LYp8Bg6PDoTuUO86fmNMQNhtC0xryOhvWpL2LXhwLI4=",
"metadata": {
"segmentSize": 2716023,
"uncompressedSize": 7706400,
"rowsCount": 43761,
"expiresAt": "2025-12-13T01:16:21.454",
"rowOffset": 10952
},
"headers": {
"x-amz-server-side-encryption-customer-algorithm": [
"AES256"
],
"x-amz-server-side-encryption-customer-key": [
"iemW0eosEhVVn+QR3q/OApysz8ieRCzAHngdoJFlbHY="
],
"x-amz-server-side-encryption-customer-key-MD5": [
"D1VfXAwD/ffApNMNf3gBig=="
]
}
}
]
});
let external_trino_endpoint = "https://trino-external:1234"
.parse()
.expect("static URL is always valid");

change_segment_ack_uris_to_trino(&mut data, &external_trino_endpoint).unwrap();

let segment = data
.get("segments")
.unwrap()
.as_array()
.unwrap()
.first()
.unwrap();
assert_eq!(
segment.get("uri").unwrap(),
"https://minio:9000/trino/spooling/01KCAH1KEE432S8VXFDJTZYTTT.json%2Bzstd?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20251212T121622Z&X-Amz-SignedHeaders=host%3Bx-amz-server-side-encryption-customer-algorithm%3Bx-amz-server-side-encryption-customer-key%3Bx-amz-server-side-encryption-customer-key-md5&X-Amz-Credential=minioAccessKey%2F20251212%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Expires=3600&X-Amz-Signature=331b80bdae6c92352d12985ae8863dddbc72c755d49466c1aeeb732cd08b7d8d"
);
assert_eq!(
segment.get("ackUri").unwrap(),
"https://trino-external:1234/v1/spooled/ack/LYp8Bg6PDoTuUO86fmNMQNhtC0xryOhvWpL2LXhwLI4="
);
}
}
6 changes: 6 additions & 0 deletions trino-lb-core/src/trino_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ pub struct TrinoQuery {
/// Endpoint of the Trino cluster the query is running on.
pub trino_endpoint: Url,

/// (Optionally, if configured) public endpoint of the Trino cluster.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Slight reword. The fact that this option is optional implies that it only takes effect when configured.

Suggested change
/// (Optionally, if configured) public endpoint of the Trino cluster.
/// Optional public endpoint of the Trino cluster.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer my variant as it makes it a bit more clear.
Every Trino always has a public endpoint. The question is, if trino-lb was configured to know about it or not

/// This can e.g. be used to change segment ackUris to.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Change to... what? I feel like this sentence needs to better communicate what actually gets updated/changed.

pub trino_external_endpoint: Option<Url>,

/// The time the query was submitted to trino-lb.
pub creation_time: SystemTime,

Expand Down Expand Up @@ -80,13 +84,15 @@ impl TrinoQuery {
trino_cluster: TrinoClusterName,
trino_query_id: TrinoQueryId,
trino_endpoint: Url,
trino_external_endpoint: Option<Url>,
creation_time: SystemTime,
delivered_time: SystemTime,
) -> Self {
TrinoQuery {
id: trino_query_id,
trino_cluster,
trino_endpoint,
trino_external_endpoint,
creation_time,
delivered_time,
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions trino-lb-persistence/src/postgres/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Postgres sqlx stuff

First start a postgres:

```bash
docker run --rm -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=admin postgres
```

Afterwards you set the `DATABASE_URL` env var and prepare stuff for offline compilation:

```bash
export DATABASE_URL=postgres://postgres:postgres@localhost/postgres

cd trino-lb-persistence

cargo sqlx migrate run --source src/postgres/migrations
cargo sqlx prepare --workspace
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ALTER TABLE queries
-- nullable, as it's Option<&str>
ADD trino_external_endpoint VARCHAR;
Loading
Loading