Merged
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

32 changes: 30 additions & 2 deletions crates/iceberg/src/spec/values.rs
@@ -403,12 +403,26 @@ impl Datum {
                 }
             }
             PrimitiveType::Int => PrimitiveLiteral::Int(i32::from_le_bytes(bytes.try_into()?)),
-            PrimitiveType::Long => PrimitiveLiteral::Long(i64::from_le_bytes(bytes.try_into()?)),
+            PrimitiveType::Long => {

[Review comment — a contributor]
Relevant section from the spec: https://iceberg.apache.org/spec/#schema-evolution

[Reply — @Fokko (author), May 19, 2025]
For now I did only the V2 type promotions. I've added tests for the float and decimal cases as well 👍 Decimal works with the current code, since it is not stored in a fixed number of bytes, in contrast to int/long.


+                if bytes.len() == 4 {
+                    // In the case of an evolved field
+                    PrimitiveLiteral::Long(i32::from_le_bytes(bytes.try_into()?) as i64)
+                } else {
+                    PrimitiveLiteral::Long(i64::from_le_bytes(bytes.try_into()?))
+                }
+            }
             PrimitiveType::Float => {
                 PrimitiveLiteral::Float(OrderedFloat(f32::from_le_bytes(bytes.try_into()?)))
             }
             PrimitiveType::Double => {
-                PrimitiveLiteral::Double(OrderedFloat(f64::from_le_bytes(bytes.try_into()?)))
+                if bytes.len() == 4 {
+                    // In the case of an evolved field
+                    PrimitiveLiteral::Double(OrderedFloat(
+                        f32::from_le_bytes(bytes.try_into()?) as f64
+                    ))
+                } else {
+                    PrimitiveLiteral::Double(OrderedFloat(f64::from_le_bytes(bytes.try_into()?)))
+                }
             }
             PrimitiveType::Date => PrimitiveLiteral::Int(i32::from_le_bytes(bytes.try_into()?)),
             PrimitiveType::Time => PrimitiveLiteral::Long(i64::from_le_bytes(bytes.try_into()?)),
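
Background on the dispatch above: under the Iceberg schema-evolution rules, `int` may be promoted to `long` and `float` to `double`, but values serialized before the promotion keep their original 4-byte little-endian encoding, so the reader branches on the buffer width. A minimal standalone sketch of the same idea for the long case (the function name and error handling are illustrative, not the crate's API); decimal needs no such branch because, per the author's comment above, it is not stored in a fixed number of bytes:

use std::array::TryFromSliceError;

/// Decode a `long` value that may have been written before an
/// `int` -> `long` promotion (illustrative helper, not the crate's API).
fn read_long_bound(bytes: &[u8]) -> Result<i64, TryFromSliceError> {
    if bytes.len() == 4 {
        // Written while the column was still an `int`: 4 LE bytes.
        Ok(i32::from_le_bytes(bytes.try_into()?) as i64)
    } else {
        // Written after the promotion: 8 LE bytes.
        Ok(i64::from_le_bytes(bytes.try_into()?))
    }
}

fn main() -> Result<(), TryFromSliceError> {
    // The same logical value decodes identically from either width.
    assert_eq!(read_long_bound(&32i32.to_le_bytes())?, 32);
    assert_eq!(read_long_bound(&32i64.to_le_bytes())?, 32);
    Ok(())
}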
@@ -3172,6 +3186,13 @@ mod tests {
         check_avro_bytes_serde(bytes, Datum::long(32), &PrimitiveType::Long);
     }
 
+    #[test]
+    fn avro_bytes_long_from_int() {
+        let bytes = vec![32u8, 0u8, 0u8, 0u8];
+
+        check_avro_bytes_serde(bytes, Datum::long(32), &PrimitiveType::Long);
+    }
+
     #[test]
     fn avro_bytes_float() {
         let bytes = vec![0u8, 0u8, 128u8, 63u8];
@@ -3186,6 +3207,13 @@
         check_avro_bytes_serde(bytes, Datum::double(1.0), &PrimitiveType::Double);
     }
 
+    #[test]
+    fn avro_bytes_double_from_float() {
+        let bytes = vec![0u8, 0u8, 128u8, 63u8];
+
+        check_avro_bytes_serde(bytes, Datum::double(1.0), &PrimitiveType::Double);
+    }
+
     #[test]
     fn avro_bytes_string() {
         let bytes = vec![105u8, 99u8, 101u8, 98u8, 101u8, 114u8, 103u8];
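
A quick sanity check on the payloads reused by the two promotion tests (an aside, not part of the PR): the four bytes are the little-endian encodings of `32i32` and `1.0f32`, which the promotion path widens to `long` and `double`:

fn main() {
    // [32, 0, 0, 0] is 32i32 in little-endian; widened, it is still 32.
    assert_eq!(i32::from_le_bytes([32, 0, 0, 0]), 32);
    assert_eq!(i32::from_le_bytes([32, 0, 0, 0]) as i64, 32i64);

    // [0, 0, 128, 63] is 0x3F80_0000, i.e. 1.0f32; widened, it is 1.0f64.
    assert_eq!(f32::from_le_bytes([0, 0, 128, 63]), 1.0f32);
    assert_eq!(f32::from_le_bytes([0, 0, 128, 63]) as f64, 1.0f64);
}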
1 change: 1 addition & 0 deletions crates/integration_tests/Cargo.toml
@@ -37,3 +37,4 @@ iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
 parquet = { workspace = true }
 tokio = { workspace = true }
 uuid = { workspace = true }
+ordered-float = "2.10.1"
8 changes: 8 additions & 0 deletions crates/integration_tests/testdata/spark/provision.py
@@ -120,6 +120,14 @@
spark.sql("ALTER TABLE rest.default.test_promote_column ALTER COLUMN foo TYPE bigint")
spark.sql("INSERT INTO rest.default.test_promote_column VALUES (25)")

# Create a table, and do some evolution on a partition column
spark.sql("CREATE OR REPLACE TABLE rest.default.test_promote_partition_column (foo int, bar float, baz decimal(4, 2)) USING iceberg PARTITIONED BY (foo)")
spark.sql("INSERT INTO rest.default.test_promote_partition_column VALUES (19, 19.25, 19.25)")
spark.sql("ALTER TABLE rest.default.test_promote_partition_column ALTER COLUMN foo TYPE bigint")
spark.sql("ALTER TABLE rest.default.test_promote_partition_column ALTER COLUMN bar TYPE double")
spark.sql("ALTER TABLE rest.default.test_promote_partition_column ALTER COLUMN baz TYPE decimal(6, 2)")
spark.sql("INSERT INTO rest.default.test_promote_partition_column VALUES (25, 22.25, 22.25)")

# Create a table with various types
spark.sql("""
CREATE OR REPLACE TABLE rest.default.types_test USING ICEBERG AS
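
Note: the three ALTER COLUMN statements on test_promote_partition_column above exercise exactly the type promotions the Iceberg spec permits: int to bigint, float to double, and widening a decimal's precision while keeping its scale (decimal(4, 2) to decimal(6, 2)).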
@@ -17,12 +17,13 @@

 //! Integration tests for rest catalog.
 
-use arrow_array::{Int64Array, StringArray};
+use arrow_array::{Decimal128Array, Float64Array, Int64Array, StringArray};
 use futures::TryStreamExt;
 use iceberg::expr::Reference;
 use iceberg::spec::Datum;
 use iceberg::{Catalog, TableIdent};
 use iceberg_catalog_rest::RestCatalog;
+use ordered_float::OrderedFloat;
 
 use crate::get_shared_containers;

@@ -98,4 +99,80 @@ async fn test_evolved_schema() {
     actual.sort();
 
     assert_eq!(actual, vec![19, 25]);
+
+    // Evolve a partitioned column
+    let table = rest_catalog
+        .load_table(&TableIdent::from_strs(["default", "test_promote_partition_column"]).unwrap())
+        .await
+        .unwrap();
+    let scan = table.scan().build();
+    let batch_stream = scan.unwrap().to_arrow().await.unwrap();
+
+    let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+    let mut actual_foo = vec![
+        batches[0]
+            .column_by_name("foo")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .unwrap()
+            .value(0),
+        batches[1]
+            .column_by_name("foo")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .unwrap()
+            .value(0),
+    ];
+
+    actual_foo.sort();
+
+    assert_eq!(actual_foo, vec![19, 25]);
+
+    let mut actual_bar = vec![
+        OrderedFloat(
+            batches[0]
+                .column_by_name("bar")
+                .unwrap()
+                .as_any()
+                .downcast_ref::<Float64Array>()
+                .unwrap()
+                .value(0),
+        ),
+        OrderedFloat(
+            batches[1]
+                .column_by_name("bar")
+                .unwrap()
+                .as_any()
+                .downcast_ref::<Float64Array>()
+                .unwrap()
+                .value(0),
+        ),
+    ];
+
+    actual_bar.sort();
+
+    assert_eq!(actual_bar, vec![19.25, 22.25]);
+
+    let mut actual_baz = vec![
+        batches[0]
+            .column_by_name("baz")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Decimal128Array>()
+            .unwrap()
+            .value(0),
+        batches[1]
+            .column_by_name("baz")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Decimal128Array>()
+            .unwrap()
+            .value(0),
+    ];
+
+    actual_baz.sort();
+
+    assert_eq!(actual_baz, vec![1925, 2225]);
 }
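
One subtlety in the final assertion (an aside, not part of the PR): `Decimal128Array::value` returns the raw unscaled `i128`, so with scale 2 the value `19.25` is stored as `1925`, which is why the test compares against `vec![1925, 2225]`. A quick illustration:

use arrow_array::Decimal128Array;

fn main() {
    // 19.25 at precision 6 / scale 2 is stored as the unscaled integer 1925.
    let arr = Decimal128Array::from(vec![1925i128])
        .with_precision_and_scale(6, 2)
        .unwrap();
    assert_eq!(arr.value(0), 1925); // raw unscaled i128
    assert_eq!(arr.value_as_string(0), "19.25"); // scaled rendering
}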