Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ queries = { path = "crates/queries" }
async-trait = { version = "0.1.84" }
aws-config = { version = "1.5.17" }
aws-sdk-dynamodb = { version = "1.100.0" }
aws-credential-types = { version = "1.2.1", features = ["hardcoded-credentials"]}
aws-sdk-s3tables = { version = "1.47.0" }
aws-credential-types = { version = "1.2.11", features = ["hardcoded-credentials"]}
axum = { version = "0.8.1", features = ["multipart", "macros"] }
axum-macros = "0.5"
bytes = { version = "1.8.0" }
Expand Down
3 changes: 2 additions & 1 deletion crates/catalog-metastore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ error-stack = { path = "../error-stack" }

async-trait = { workspace = true }
aws-config = { workspace = true }
aws-credential-types = { version = "1.2.11" }
aws-sdk-s3tables = { workspace = true }
aws-credential-types = { workspace = true }
chrono = { workspace = true }
dashmap = { workspace = true }
iceberg-rust = { workspace = true }
Expand Down
68 changes: 65 additions & 3 deletions crates/catalog-metastore/src/metastore_bootstrap_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ use crate::{
SchemaIdent, TableFormat, TableIdent, Volume, VolumeIdent, VolumeType,
};
use aws_config::meta::credentials::CredentialsProviderChain;
use aws_credential_types::provider::ProvideCredentials;
use aws_config::{BehaviorVersion, Region};
use aws_credential_types::Credentials;
use aws_credential_types::provider::{ProvideCredentials, SharedCredentialsProvider};
use aws_sdk_s3tables::Client as S3TablesClient;
use iceberg_rust::spec::table_metadata::TableMetadata;
use iceberg_rust::spec::util::strip_prefix;
use serde::Deserialize;
Expand Down Expand Up @@ -426,8 +429,14 @@ async fn load_volume_from_env() -> Result<Option<VolumeEntry>, ConfigError> {
"s3tables" | "s3_tables" | "s3-tables" => {
let arn = env::var("VOLUME_ARN").map_err(|_| missing_var_error("VOLUME_ARN"))?;

let credentials =
credentials_from_env_or_provider("VOLUME_ACCESS_KEY", "VOLUME_SECRET_KEY").await?;
let credentials = credentials_from_env_or_provider(
"VOLUME_ACCESS_KEY",
"VOLUME_SECRET_KEY",
"VOLUME_AWS_SESSION_TOKEN",
)
.await?;

validate_s3tables_credentials(&arn, &credentials).await?;

VolumeType::S3Tables(S3TablesVolume {
endpoint: None,
Expand All @@ -445,6 +454,7 @@ async fn load_volume_from_env() -> Result<Option<VolumeEntry>, ConfigError> {
let credentials = AwsCredentials::AccessKey(AwsAccessKeyCredentials {
aws_access_key_id: access_key,
aws_secret_access_key: secret_key,
aws_session_token: None,
});

VolumeType::S3(
Expand Down Expand Up @@ -479,11 +489,14 @@ async fn load_volume_from_env() -> Result<Option<VolumeEntry>, ConfigError> {
async fn credentials_from_env_or_provider(
access_key_env: &str,
secret_key_env: &str,
session_token_env: &str,
) -> Result<AwsCredentials, ConfigError> {
if let (Ok(access_key), Ok(secret_key)) = (env::var(access_key_env), env::var(secret_key_env)) {
let session_token = env::var(session_token_env).ok();
return Ok(AwsCredentials::AccessKey(AwsAccessKeyCredentials {
aws_access_key_id: access_key,
aws_secret_access_key: secret_key,
aws_session_token: session_token,
}));
}

Expand All @@ -506,5 +519,54 @@ async fn credentials_from_env_or_provider(
Ok(AwsCredentials::AccessKey(AwsAccessKeyCredentials {
aws_access_key_id: creds.access_key_id().to_string(),
aws_secret_access_key: creds.secret_access_key().to_string(),
aws_session_token: creds.session_token().map(std::string::ToString::to_string),
}))
}

/// Checks that `credentials` can access the S3 Tables bucket identified by `arn`.
///
/// Builds an S3 Tables client from the access key, secret key and optional session
/// token held in `credentials`, then sends a `GetTableBucket` request for `arn`.
///
/// The region is taken from the fourth colon-separated field of `arn`; when that
/// field is absent or blank, `us-east-1` is used instead.
///
/// # Errors
///
/// Returns [`ConfigError::EnvConfig`] when `credentials` is the
/// [`AwsCredentials::Token`] variant, or when the `GetTableBucket` request fails.
async fn validate_s3tables_credentials(
    arn: &str,
    credentials: &AwsCredentials,
) -> Result<(), ConfigError> {
    let (access_key, secret_key, token) = match credentials {
        AwsCredentials::AccessKey(creds) => (
            creds.aws_access_key_id.clone(),
            creds.aws_secret_access_key.clone(),
            creds.aws_session_token.clone(),
        ),
        AwsCredentials::Token(_) => {
            return Err(ConfigError::EnvConfig {
                reason: "S3 Tables validation requires access key credentials".to_string(),
            });
        }
    };

    // Trim before the emptiness check so the value handed to `Region::new` is the
    // same one that was validated (no surrounding whitespace).
    let region = arn
        .split(':')
        .nth(3)
        .map(str::trim)
        .filter(|value| !value.is_empty())
        .unwrap_or("us-east-1")
        .to_string();

    let config = aws_config::defaults(BehaviorVersion::latest())
        .credentials_provider(SharedCredentialsProvider::new(Credentials::from_keys(
            access_key, secret_key, token,
        )))
        .region(Region::new(region))
        .load()
        .await;
    let client = S3TablesClient::new(&config);

    client
        .get_table_bucket()
        .table_bucket_arn(arn)
        .send()
        .await
        .map_err(|error| {
            // `as_service_error()` yields `None` when the failure is not a service
            // error; fall back to the whole SDK error so the message never reads
            // just "None" and the underlying cause stays visible.
            let detail = match error.as_service_error() {
                Some(service_error) => format!("{service_error:?}"),
                None => format!("{error:?}"),
            };
            ConfigError::EnvConfig {
                reason: format!("Failed to validate S3 Tables credentials for {arn}: {detail}"),
            }
        })?;

    Ok(())
}
2 changes: 2 additions & 0 deletions crates/catalog-metastore/src/models/volumes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ pub struct AwsAccessKeyCredentials {
pub aws_access_key_id: String,
#[validate(regex(path = aws_secret_access_key_regex_func(), message = "AWS Secret access key is expected to be 40 chars Base64-like string with uppercase, lowercase, digits, and +/= .\n"))]
pub aws_secret_access_key: String,
pub aws_session_token: Option<String>,
}

impl std::fmt::Display for AwsAccessKeyCredentials {
Expand All @@ -70,6 +71,7 @@ impl std::fmt::Debug for AwsAccessKeyCredentials {
f.debug_struct("AwsAccessKeyCredentials")
.field("aws_access_key_id", &self.aws_access_key_id)
.field("aws_secret_access_key", &"**********")
.field("aws_session_token", &"**********")
.finish()
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/catalog/src/catalog_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ impl EmbucketCatalogList {
AwsCredentials::AccessKey(ref creds) => (
Some(creds.aws_access_key_id.clone()),
Some(creds.aws_secret_access_key.clone()),
None,
creds.aws_session_token.clone(),
),
AwsCredentials::Token(ref t) => (None, None, Some(t.clone())),
};
Expand Down
3 changes: 3 additions & 0 deletions crates/executor/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1500,6 +1500,7 @@ impl UserQuery {
credentials: AwsCredentials::AccessKey(AwsAccessKeyCredentials {
aws_access_key_id: key_id,
aws_secret_access_key: secret_key,
aws_session_token: None,
}),
arn: params.aws_access_point_arn.unwrap_or_default(),
client_options: None,
Expand All @@ -1517,6 +1518,7 @@ impl UserQuery {
let aws_credentials = AwsCredentials::AccessKey(AwsAccessKeyCredentials {
aws_access_key_id: key_id,
aws_secret_access_key: secret_key,
aws_session_token: None,
});

Volume::new(
Expand Down Expand Up @@ -2844,6 +2846,7 @@ impl UserQuery {
Some(AwsCredentials::AccessKey(AwsAccessKeyCredentials {
aws_access_key_id: access_key.to_string(),
aws_secret_access_key: secret_key.to_string(),
aws_session_token: None,
}))
};

Expand Down