Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 102 additions & 5 deletions crates/catalog/sql/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -917,11 +917,55 @@ impl Catalog for SqlCatalog {
.build()?)
}

async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
Err(Error::new(
ErrorKind::FeatureUnsupported,
"Updating a table is not supported yet",
))
/// Updates an existing table within the SQL catalog.
async fn update_table(&self, commit: TableCommit) -> Result<Table> {
let table_ident = commit.identifier().clone();
let current_table = self.load_table(&table_ident).await?;
let current_metadata_location = current_table.metadata_location_result()?.to_string();

let staged_table = commit.apply(current_table)?;
let staged_metadata_location = staged_table.metadata_location_result()?;

staged_table
.metadata()
.write_to(staged_table.file_io(), &staged_metadata_location)
.await?;

let update_result = self
.execute(
&format!(
"UPDATE {CATALOG_TABLE_NAME}
SET {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?, {CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP} = ?
WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
AND {CATALOG_FIELD_TABLE_NAME} = ?
AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
AND (
{CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
)
AND {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?"
),
vec![
Some(staged_metadata_location),
Some(current_metadata_location.as_str()),
Some(&self.name),
Some(table_ident.name()),
Some(&table_ident.namespace().join(".")),
Some(current_metadata_location.as_str()),
],
None,
)
.await?;

if update_result.rows_affected() == 0 {
return Err(Error::new(
ErrorKind::CatalogCommitConflicts,
format!("Commit conflicted for table: {table_ident}"),
)
.with_retryable(true));
}

Ok(staged_table)
}
}

Expand All @@ -932,6 +976,7 @@ mod tests {

use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
use iceberg::table::Table;
use iceberg::transaction::{ApplyTransactionAction, Transaction};
use iceberg::{Catalog, CatalogBuilder, Namespace, NamespaceIdent, TableCreation, TableIdent};
use itertools::Itertools;
use regex::Regex;
Expand Down Expand Up @@ -2293,4 +2338,56 @@ mod tests {
assert_eq!(table.identifier(), expected_table.identifier());
assert_eq!(table.metadata_location(), Some(metadata_location.as_str()));
}

#[tokio::test]
async fn test_update_table() {
let warehouse_loc = temp_path();
let catalog = new_sql_catalog(warehouse_loc).await;

// Create a test namespace and table
let namespace_ident = NamespaceIdent::new("ns1".into());
create_namespace(&catalog, &namespace_ident).await;
let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
create_table(&catalog, &table_ident).await;

let table = catalog.load_table(&table_ident).await.unwrap();

// Store the original metadata location for comparison
let original_metadata_location = table.metadata_location().unwrap().to_string();

// Create a transaction to update the table
let tx = Transaction::new(&table);
let tx = tx
.update_table_properties()
.set("test_property".to_string(), "test_value".to_string())
.apply(tx)
.unwrap();

// Commit the transaction to the catalog
let updated_table = tx.commit(&catalog).await.unwrap();

// Verify the update was successful
assert_eq!(
updated_table.metadata().properties().get("test_property"),
Some(&"test_value".to_string())
);
// Verify the metadata location has been updated
assert_ne!(
updated_table.metadata_location().unwrap(),
original_metadata_location.as_str()
);

// Load the table again from the catalog to verify changes were persisted
let reloaded = catalog.load_table(&table_ident).await.unwrap();

// Verify the reloaded table matches the updated table
assert_eq!(
reloaded.metadata().properties().get("test_property"),
Some(&"test_value".to_string())
);
assert_eq!(
reloaded.metadata_location(),
updated_table.metadata_location()
);
}
}
Loading