Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ All notable changes to this project will be documented in this file.

- Support activating and deactivation Trino clusters via API calls to `/admin/clusters/{cluster_name}/activate` and `/admin/clusters/{cluster_name}/deactivate` respectively. For this to work you need to authenticate yourself at trino-lb via basic auth ([#95]).
- Expose cluster statistics at `/admin/clusters/{cluster_name}/status` and `/admin/clusters/status` ([#95]).
- Support configuring an external endpoint of Trino clusters.
This is used to update the segments ackUris to, as sometimes Trino get's confused and put's the wrong endpoint (namely the one of trino-lb) in there.
Please note that this runs a database migration on Postgres ([#100]).

### Changed

Expand All @@ -23,13 +26,16 @@ All notable changes to this project will be documented in this file.

- Set connection and response timeout for Redis connections ([#85]).
- Only remove queries from the persistence in case they don't send a `nextUri` and are in state `FINISHED` ([#98]).
- Correctly proxy HEAD requests to `/v1/statement/executing/{queryId}/{slug}/{token}`.
Previously, we would GET (instead of HEAD) the URL at the Trino cluster, which resulted in trino-lb dropping the HTTP body, causing problems ([#100]).

[#68]: https://github.com/stackabletech/trino-lb/pull/68
[#85]: https://github.com/stackabletech/trino-lb/pull/85
[#86]: https://github.com/stackabletech/trino-lb/pull/86
[#91]: https://github.com/stackabletech/trino-lb/pull/91
[#95]: https://github.com/stackabletech/trino-lb/pull/95
[#98]: https://github.com/stackabletech/trino-lb/pull/98
[#100]: https://github.com/stackabletech/trino-lb/pull/100

## [0.5.0] - 2025-03-14

Expand Down
4 changes: 4 additions & 0 deletions trino-lb-core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ pub struct TrinoClusterGroupConfig {
pub struct TrinoClusterConfig {
pub name: String,
pub endpoint: Url,

/// Public endpoint of the Trino cluster.
/// This can e.g. be used to change segment ackUris to.
pub external_endpoint: Option<Url>,
pub credentials: TrinoClusterCredentialsConfig,
}

Expand Down
128 changes: 126 additions & 2 deletions trino-lb-core/src/trino_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{

use prusto::{QueryError, Warning};
use serde::{Deserialize, Serialize};
use serde_json::value::RawValue;
use serde_json::{Value, value::RawValue};
use snafu::{ResultExt, Snafu};
use tracing::instrument;
use url::Url;
Expand All @@ -23,6 +23,12 @@ pub enum Error {
#[snafu(display("Failed to parse nextUri Trino send us"))]
ParseNextUriFromTrino { source: url::ParseError },

#[snafu(display("Failed to parse segment ackUri Trino send us"))]
ParseSegmentAckUriFromTrino { source: url::ParseError },

#[snafu(display("Failed to change segment ackUri to point to external Trino address"))]
ChangeSegmentAckUriToTrino { source: url::ParseError },

#[snafu(display(
"Failed to determine the elapsed time of a queued query. Are all system clocks of trino-lb instances in sync?"
))]
Expand All @@ -48,7 +54,7 @@ pub struct TrinoQueryApiResponse {
pub partial_cancel_uri: Option<String>,

pub columns: Option<Box<RawValue>>,
pub data: Option<Box<RawValue>>,
pub data: Option<Box<Value>>,

pub error: Option<QueryError>,
pub warnings: Vec<Warning>,
Expand Down Expand Up @@ -172,17 +178,82 @@ impl TrinoQueryApiResponse {
update_count: None,
})
}

/// Changes the following references in the query (if they exist)
///
/// 1. nextUri to point to trino-lb
/// 2. In case the `external_trino_addr` is set, segments ackUri to point to the external
/// address of Trino. Trino sometimes get's confused (likely by some HTTP) headers and put's the
/// trino-lb address into the ackUri (but the requests should go to Trino directly).
Comment on lines +185 to +187
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: Something seems to be missing here.

Suggested change
/// 2. In case the `external_trino_addr` is set, segments ackUri to point to the external
/// address of Trino. Trino sometimes get's confused (likely by some HTTP) headers and put's the
/// trino-lb address into the ackUri (but the requests should go to Trino directly).
/// 2. In case the `external_trino_addr` is set, update the segments ackUri to point to the
/// external address of Trino. Trino sometimes gets confused (likely by some HTTP headers) and
/// puts the trino-lb address into the ackUri (but the requests should go to Trino directly).

#[instrument(
skip(self),
fields(trino_lb_addr = %trino_lb_addr),
)]
pub fn update_trino_references(
&mut self,
trino_lb_addr: &Url,
external_trino_addr: Option<&Url>,
Comment on lines +194 to +195
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Why are these parameters called *_addr if they contain URLs? I think their names should be updated to reflect this.

) -> Result<(), Error> {
// Point nextUri to trino-lb
if let Some(next_uri) = &self.next_uri {
let next_uri = Url::parse(next_uri).context(ParseNextUriFromTrinoSnafu)?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: Immediately parsing this as a URL when deserializing TrinoQueryApiResponse might be a better idea and also gets rid of the to_string call below.

self.next_uri = Some(change_next_uri_to_trino_lb(&next_uri, trino_lb_addr).to_string());
}

// Point segment ackUris to Trino
if let Some(external_trino_addr) = external_trino_addr
&& let Some(data) = self.data.as_deref_mut()
{
change_segment_ack_uris_to_trino(data, external_trino_addr)?;
}

Ok(())
}
}

#[instrument(
fields(next_uri = %next_uri, trino_lb_addr = %trino_lb_addr),
)]
fn change_next_uri_to_trino_lb(next_uri: &Url, trino_lb_addr: &Url) -> Url {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: We should not take &Url for trino_lb_addr here if the first thing we do is to clone it.

let mut result = trino_lb_addr.clone();
result.set_path(next_uri.path());
result
}

#[instrument(
skip(data),
fields(external_trino_addr = %external_trino_addr),
)]
fn change_segment_ack_uris_to_trino(
data: &mut Value,
external_trino_addr: &Url,
) -> Result<(), Error> {
let Some(segments) = data.get_mut("segments").and_then(Value::as_array_mut) else {
return Ok(());
};

for segment in segments {
if let Some("spooled") = segment.get("type").and_then(Value::as_str)
&& let Some(ack_uri) = segment.get_mut("ackUri")
&& let Some(ack_uri_str) = ack_uri.as_str()
{
let parsed_ack_uri = ack_uri_str
.parse::<Url>()
.context(ParseSegmentAckUriFromTrinoSnafu)?;
let mut result = external_trino_addr.clone();
result.set_path(parsed_ack_uri.path());

*ack_uri = Value::String(result.to_string());
}
}

Ok(())
}

#[cfg(test)]
mod tests {
use rstest::rstest;
use serde_json::json;

use super::*;

Expand Down Expand Up @@ -214,4 +285,57 @@ mod tests {
let result = change_next_uri_to_trino_lb(&next_uri, &trino_lb_addr);
assert_eq!(result.to_string(), expected);
}

#[test]
fn test_change_segment_ack_uris_to_trino() {
let mut data = json!({
"encoding": "json+zstd",
"segments": [
{
"type": "spooled",
"uri": "https://minio:9000/trino/spooling/01KCAH1KEE432S8VXFDJTZYTTT.json%2Bzstd?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20251212T121622Z&X-Amz-SignedHeaders=host%3Bx-amz-server-side-encryption-customer-algorithm%3Bx-amz-server-side-encryption-customer-key%3Bx-amz-server-side-encryption-customer-key-md5&X-Amz-Credential=minioAccessKey%2F20251212%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Expires=3600&X-Amz-Signature=331b80bdae6c92352d12985ae8863dddbc72c755d49466c1aeeb732cd08b7d8d",
"ackUri": "https://trino-client-spooling-coordinator:8443/v1/spooled/ack/LYp8Bg6PDoTuUO86fmNMQNhtC0xryOhvWpL2LXhwLI4=",
"metadata": {
"segmentSize": 2716023,
"uncompressedSize": 7706400,
"rowsCount": 43761,
"expiresAt": "2025-12-13T01:16:21.454",
"rowOffset": 10952
},
"headers": {
"x-amz-server-side-encryption-customer-algorithm": [
"AES256"
],
"x-amz-server-side-encryption-customer-key": [
"iemW0eosEhVVn+QR3q/OApysz8ieRCzAHngdoJFlbHY="
],
"x-amz-server-side-encryption-customer-key-MD5": [
"D1VfXAwD/ffApNMNf3gBig=="
]
}
}
]
});
let external_trino_addr = "https://trino-external:1234"
.parse()
.expect("static URL is always valid");

change_segment_ack_uris_to_trino(&mut data, &external_trino_addr).unwrap();

let segment = data
.get("segments")
.unwrap()
.as_array()
.unwrap()
.first()
.unwrap();
assert_eq!(
segment.get("uri").unwrap(),
"https://minio:9000/trino/spooling/01KCAH1KEE432S8VXFDJTZYTTT.json%2Bzstd?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20251212T121622Z&X-Amz-SignedHeaders=host%3Bx-amz-server-side-encryption-customer-algorithm%3Bx-amz-server-side-encryption-customer-key%3Bx-amz-server-side-encryption-customer-key-md5&X-Amz-Credential=minioAccessKey%2F20251212%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Expires=3600&X-Amz-Signature=331b80bdae6c92352d12985ae8863dddbc72c755d49466c1aeeb732cd08b7d8d"
);
assert_eq!(
segment.get("ackUri").unwrap(),
"https://trino-external:1234/v1/spooled/ack/LYp8Bg6PDoTuUO86fmNMQNhtC0xryOhvWpL2LXhwLI4="
);
}
}
6 changes: 6 additions & 0 deletions trino-lb-core/src/trino_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ pub struct TrinoQuery {
/// Endpoint of the Trino cluster the query is running on.
pub trino_endpoint: Url,

/// (Optionally, if configured) public endpoint of the Trino cluster.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Slight reword. The fact that this option is optional implies that it only takes effect when configured.

Suggested change
/// (Optionally, if configured) public endpoint of the Trino cluster.
/// Optional public endpoint of the Trino cluster.

/// This can e.g. be used to change segment ackUris to.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Change to... what? I feel like this sentence needs to better communicate what actually gets updated/changed.

pub trino_external_endpoint: Option<Url>,

/// The time the query was submitted to trino-lb.
pub creation_time: SystemTime,

Expand Down Expand Up @@ -80,13 +84,15 @@ impl TrinoQuery {
trino_cluster: TrinoClusterName,
trino_query_id: TrinoQueryId,
trino_endpoint: Url,
trino_external_endpoint: Option<Url>,
creation_time: SystemTime,
delivered_time: SystemTime,
) -> Self {
TrinoQuery {
id: trino_query_id,
trino_cluster,
trino_endpoint,
trino_external_endpoint,
creation_time,
delivered_time,
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions trino-lb-persistence/src/postgres/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Postgres sqlx stuff

First start a postgres:

```bash
docker run --rm -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=admin postgres
```

Afterwards you set the `DATABASE_URL` env var and prepare stuff for offline compilation:

```bash
export DATABASE_URL=postgres://postgres:postgres@localhost/postgres

cd trino-lb-persistence

cargo sqlx migrate run --source src/postgres/migrations
cargo sqlx prepare --workspace
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ALTER TABLE queries
-- nullable, as it's Option<&str>
ADD trino_external_endpoint VARCHAR;
18 changes: 15 additions & 3 deletions trino-lb-persistence/src/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ pub enum Error {
#[snafu(display("Failed to parse endpoint url of cluster from stored query"))]
ParseClusterEndpointFromStoredQuery { source: url::ParseError },

#[snafu(display("Failed to parse external endpoint url of cluster from stored query"))]
ParseClusterExternalEndpointFromStoredQuery { source: url::ParseError },

#[snafu(display("Failed to convert max query counter to u64, as it is too high"))]
ConvertMaxAllowedQueryCounterToU64 { source: TryFromIntError },

Expand Down Expand Up @@ -204,11 +207,12 @@ impl Persistence for PostgresPersistence {
#[instrument(skip(self, query))]
async fn store_query(&self, query: TrinoQuery) -> Result<(), super::Error> {
query!(
r#"INSERT INTO queries (id, trino_cluster, trino_endpoint, creation_time, delivered_time)
VALUES ($1, $2, $3, $4, $5)"#,
r#"INSERT INTO queries (id, trino_cluster, trino_endpoint, trino_external_endpoint, creation_time, delivered_time)
VALUES ($1, $2, $3, $4, $5, $6)"#,
query.id,
query.trino_cluster,
query.trino_endpoint.as_str(),
query.trino_external_endpoint.as_ref().map(Url::as_str),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Can we pass Url directly instead? Or does query! not support that?

Into::<DateTime<Utc>>::into(query.creation_time),
Into::<DateTime<Utc>>::into(query.delivered_time),
)
Expand All @@ -222,7 +226,7 @@ impl Persistence for PostgresPersistence {
#[instrument(skip(self))]
async fn load_query(&self, query_id: &TrinoQueryId) -> Result<TrinoQuery, super::Error> {
let result = query!(
r#"SELECT id, trino_cluster, trino_endpoint, creation_time, delivered_time
r#"SELECT id, trino_cluster, trino_endpoint, trino_external_endpoint, creation_time, delivered_time
FROM queries
WHERE id = $1"#,
query_id,
Expand All @@ -231,11 +235,19 @@ impl Persistence for PostgresPersistence {
.await
.context(LoadQuerySnafu)?;

let trino_external_endpoint = match result.trino_external_endpoint {
Some(trino_external_endpoint) => Some(
Url::parse(&trino_external_endpoint)
.context(ParseClusterExternalEndpointFromStoredQuerySnafu)?,
Comment on lines +240 to +241
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: Such a bummer that have to do this every time. Can we maybe natively support Url to be used as part of query!?

),
None => None,
};
let query = TrinoQuery {
id: result.id,
trino_cluster: result.trino_cluster,
trino_endpoint: Url::parse(&result.trino_endpoint)
.context(ParseClusterEndpointFromStoredQuerySnafu)?,
trino_external_endpoint,
creation_time: result.creation_time.into(),
delivered_time: result.delivered_time.into(),
};
Expand Down
Loading
Loading