Skip to content

Commit 1685873

Browse files
committed
feat(rest): support AWS SIGv4
Signed-off-by: xxchan <[email protected]>
1 parent c34982a commit 1685873

File tree

5 files changed

+85
-16
lines changed

5 files changed

+85
-16
lines changed

Cargo.lock

Lines changed: 5 additions & 13 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@ datafusion-cli = "45"
6464
datafusion-sqllogictest = "45"
6565
derive_builder = "0.20"
6666
dirs = "6"
67-
expect-test = "1"
6867
enum-ordinalize = "4.3.0"
68+
expect-test = "1"
6969
faststr = "0.2.31"
7070
fnv = "1.0.7"
7171
fs-err = "3.1.0"
@@ -93,6 +93,7 @@ port_scanner = "0.1.5"
9393
pretty_assertions = "1.4"
9494
rand = "0.8.5"
9595
regex = "1.10.5"
96+
reqsign = { version = "0.16.3" }
9697
reqwest = { version = "0.12.12", default-features = false, features = ["json"] }
9798
roaring = { version = "0.10", git = "https://github.com/RoaringBitmap/roaring-rs.git" }
9899
rust_decimal = "1.36"

crates/catalog/rest/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ tokio = { workspace = true, features = ["sync"] }
4343
tracing = { workspace = true }
4444
typed-builder = { workspace = true }
4545
uuid = { workspace = true, features = ["v4"] }
46+
reqsign = { workspace = true }
4647

4748
[dev-dependencies]
4849
ctor = { workspace = true }

crates/catalog/rest/src/catalog.rs

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use iceberg::{
2828
TableIdent,
2929
};
3030
use itertools::Itertools;
31+
use reqsign::{AwsConfig, AwsDefaultLoader, AwsV4Signer};
3132
use reqwest::header::{
3233
HeaderMap, HeaderName, HeaderValue, {self},
3334
};
@@ -84,6 +85,30 @@ impl RestCatalogConfig {
8485
}
8586
}
8687

88+
pub(crate) fn get_signer(&self) -> Result<Option<(AwsDefaultLoader, AwsV4Signer)>> {
89+
if let Some("true") = self.props.get("rest.sigv4-enabled").map(|s| s.as_str()) {
90+
let Some(signing_region) = self.props.get("rest.signing-region") else {
91+
return Err(Error::new(
92+
ErrorKind::Unexpected,
93+
"rest.signing-region is not set when rest.sigv4-enabled is true",
94+
));
95+
};
96+
let Some(signing_name) = self.props.get("rest.signing-name") else {
97+
return Err(Error::new(
98+
ErrorKind::Unexpected,
99+
"rest.signing-name is not set when rest.sigv4-enabled is true",
100+
));
101+
};
102+
103+
let config = AwsConfig::default().from_profile().from_env();
104+
println!("access_key_id {:?}", config.access_key_id);
105+
let loader = AwsDefaultLoader::new(self.client().unwrap_or_default(), config);
106+
let signer = AwsV4Signer::new(signing_name, signing_region);
107+
Ok(Some((loader, signer)))
108+
} else {
109+
Ok(None)
110+
}
111+
}
87112
fn namespaces_endpoint(&self) -> String {
88113
self.url_prefixed(&["namespaces"])
89114
}
@@ -306,6 +331,13 @@ impl RestCatalog {
306331
None => None,
307332
};
308333

334+
if let Some(warehouse_path) = warehouse_path {
335+
if warehouse_path.starts_with("arn:aws:") {
336+
let file_io = FileIOBuilder::new("s3").with_props(&props).build()?;
337+
return Ok(file_io);
338+
}
339+
}
340+
309341
let file_io = match warehouse_path.or(metadata_location) {
310342
Some(url) => FileIO::from_path(url)?.with_props(props).build()?,
311343
None => {
@@ -612,7 +644,10 @@ impl Catalog for RestCatalog {
612644
/// provided locally to the `RestCatalog` will take precedence.
613645
async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
614646
let context = self.context().await?;
615-
647+
println!(
648+
"table_endpoint: {:?}",
649+
context.config.table_endpoint(table_ident)
650+
);
616651
let request = context
617652
.client
618653
.request(Method::GET, context.config.table_endpoint(table_ident))

crates/catalog/rest/src/client.rs

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@
1818
use std::collections::HashMap;
1919
use std::fmt::{Debug, Formatter};
2020

21-
use http::StatusCode;
21+
use http::{HeaderValue, StatusCode};
2222
use iceberg::{Error, ErrorKind, Result};
23+
use reqsign::{AwsDefaultLoader, AwsV4Signer};
2324
use reqwest::header::HeaderMap;
2425
use reqwest::{Client, IntoUrl, Method, Request, RequestBuilder, Response};
2526
use serde::de::DeserializeOwned;
@@ -43,6 +44,8 @@ pub(crate) struct HttpClient {
4344
extra_headers: HeaderMap,
4445
/// Extra oauth parameters to be added to each authentication request.
4546
extra_oauth_params: HashMap<String, String>,
47+
48+
signer: Option<(AwsDefaultLoader, AwsV4Signer)>,
4649
}
4750

4851
impl Debug for HttpClient {
@@ -65,6 +68,7 @@ impl HttpClient {
6568
credential: cfg.credential(),
6669
extra_headers,
6770
extra_oauth_params: cfg.extra_oauth_params(),
71+
signer: cfg.get_signer()?,
6872
})
6973
}
7074

@@ -88,6 +92,7 @@ impl HttpClient {
8892
extra_oauth_params: (!cfg.extra_oauth_params().is_empty())
8993
.then(|| cfg.extra_oauth_params())
9094
.unwrap_or(self.extra_oauth_params),
95+
signer: cfg.get_signer()?,
9196
})
9297
}
9398

@@ -220,6 +225,39 @@ impl HttpClient {
220225
/// Executes the given `Request` and returns a `Response`.
221226
pub async fn execute(&self, mut request: Request) -> Result<Response> {
222227
request.headers_mut().extend(self.extra_headers.clone());
228+
229+
if let Some((loader, signer)) = &self.signer {
230+
match loader.load().await {
231+
Ok(Some(credential)) => {
232+
const EMPTY_STRING_SHA256: &str =
233+
"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
234+
request.headers_mut().insert(
235+
"x-amz-content-sha256",
236+
HeaderValue::from_str(EMPTY_STRING_SHA256).unwrap(),
237+
);
238+
if let Err(e) = signer.sign(&mut request, &credential) {
239+
return Err(Error::new(
240+
ErrorKind::Unexpected,
241+
"Failed to sign request for sigv4 signing",
242+
)
243+
.with_source(e));
244+
}
245+
}
246+
Ok(None) => {
247+
return Err(Error::new(
248+
ErrorKind::Unexpected,
249+
"Credential not found for sigv4 signing",
250+
));
251+
}
252+
Err(e) => {
253+
return Err(Error::new(
254+
ErrorKind::Unexpected,
255+
"Failed to load credential for sigv4 signing",
256+
)
257+
.with_source(e));
258+
}
259+
}
260+
}
223261
Ok(self.client.execute(request).await?)
224262
}
225263

@@ -255,6 +293,7 @@ pub(crate) async fn deserialize_catalog_response<R: DeserializeOwned>(
255293
/// codes that all endpoints share (400, 404, etc.).
256294
pub(crate) async fn deserialize_unexpected_catalog_error(response: Response) -> Error {
257295
let (status, headers) = (response.status(), response.headers().clone());
296+
let url = response.url().to_string();
258297
let bytes = match response.bytes().await {
259298
Ok(bytes) => bytes,
260299
Err(err) => return err.into(),
@@ -264,4 +303,5 @@ pub(crate) async fn deserialize_unexpected_catalog_error(response: Response) ->
264303
.with_context("status", status.to_string())
265304
.with_context("headers", format!("{:?}", headers))
266305
.with_context("json", String::from_utf8_lossy(&bytes))
306+
.with_context("url", url)
267307
}

0 commit comments

Comments
 (0)