Skip to content

Commit

Permalink
serialize fsck files_removed metric as a string for compatibility with Spark
Browse files Browse the repository at this point in the history

Signed-off-by: Liam Murphy <[email protected]>
  • Loading branch information
liamphmurphy authored and rtyler committed Feb 18, 2025
1 parent 1b629c6 commit 76480f9
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 2 deletions.
24 changes: 23 additions & 1 deletion crates/core/src/operations/filesystem_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use futures::future::BoxFuture;
use futures::StreamExt;
pub use object_store::path::Path;
use object_store::ObjectStore;
use serde::Serialize;
use serde::{de::Error as DeError, Deserialize, Deserializer, Serialize, Serializer};
use url::{ParseError, Url};
use uuid::Uuid;

Expand Down Expand Up @@ -56,6 +56,10 @@ pub struct FileSystemCheckMetrics {
/// Was this a dry run
pub dry_run: bool,
/// Files that were removed successfully
#[serde(
serialize_with = "serialize_vec_string",
deserialize_with = "deserialize_vec_string"
)]
pub files_removed: Vec<String>,
}

Expand All @@ -66,6 +70,24 @@ struct FileSystemCheckPlan {
pub files_to_remove: Vec<Add>,
}

/// Custom serializer that writes the metric's file list as a single JSON
/// *string* (e.g. `"[\"a.parquet\"]"`) instead of a JSON array, because
/// Spark expects every `operationMetrics` value to be a string.
///
/// Takes `&[String]` rather than `&Vec<String>` (clippy `ptr_arg`): serde's
/// generated call passes `&self.files_removed`, which deref-coerces to a
/// slice, so the `serialize_with` attribute keeps working unchanged.
fn serialize_vec_string<S>(value: &[String], serializer: S) -> Result<S::Ok, S::Error>
where
    S: Serializer,
{
    // Encode the list itself as JSON, then emit that text as one string value.
    let json_string = serde_json::to_string(value).map_err(serde::ser::Error::custom)?;
    serializer.serialize_str(&json_string)
}

// Custom deserializer that reverses `serialize_vec_string`: reads the field
// as a single JSON string and parses that string back into a `Vec<String>`.
// (The string form exists for Spark compatibility, which expects metric
// values to be strings.) Fails with a custom serde error if the inner text
// is not a valid JSON array of strings.
fn deserialize_vec_string<'de, D>(deserializer: D) -> Result<Vec<String>, D::Error>
where
    D: Deserializer<'de>,
{
    // First pull out the raw string value, then JSON-decode its contents.
    let s: String = Deserialize::deserialize(deserializer)?;
    serde_json::from_str(&s).map_err(DeError::custom)
}

fn is_absolute_path(path: &str) -> DeltaResult<bool> {
match Url::parse(path) {
Ok(_) => Ok(true),
Expand Down
8 changes: 7 additions & 1 deletion python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@
NOT_SUPPORTED_READER_VERSION = 2
SUPPORTED_READER_FEATURES = {"timestampNtz"}

FSCK_METRICS_FILES_REMOVED_LABEL = "files_removed"

FilterLiteralType = Tuple[str, str, Any]
FilterConjunctionType = List[FilterLiteralType]
FilterDNFType = List[FilterConjunctionType]
Expand Down Expand Up @@ -1432,7 +1434,11 @@ def repair(
commit_properties,
post_commithook_properties,
)
return json.loads(metrics)
deserialized_metrics = json.loads(metrics)
deserialized_metrics[FSCK_METRICS_FILES_REMOVED_LABEL] = json.loads(
deserialized_metrics[FSCK_METRICS_FILES_REMOVED_LABEL]
)
return deserialized_metrics

def transaction_versions(self) -> Dict[str, Transaction]:
    """Return the table's recorded transactions, keyed by string id.

    Thin wrapper that delegates directly to the native (Rust) table
    binding. NOTE(review): the keys are presumably application ids used
    for idempotent writes — confirm against the Rust
    ``transaction_versions`` implementation.
    """
    return self._table.transaction_versions()
Expand Down
29 changes: 29 additions & 0 deletions python/tests/pyspark_integration/test_write_to_pyspark.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Tests that deltalake(delta-rs) can write to tables written by PySpark."""

import os
import pathlib

import pyarrow as pa
Expand Down Expand Up @@ -169,3 +170,31 @@ def test_spark_read_z_ordered_history(tmp_path: pathlib.Path):
)

assert latest_operation_metrics["operationMetrics"] is not None


@pytest.mark.pyspark
@pytest.mark.integration
def test_spark_read_repair_run(tmp_path):
    """Verify Spark can read a table's history after delta-rs runs repair (FSCK)."""
    # Build a small two-column table: ten rows of id "1" with int32 values.
    batch = pa.Table.from_arrays(
        [
            pa.array(["1"] * 10, type=pa.string()),
            pa.array([10, 20, 30, 40, 50, 60, 70, 80, 90, 100], type=pa.int32()),
        ],
        names=["id", "value"],
    )

    # Append twice so the table contains more than one data file.
    write_deltalake(tmp_path, batch, mode="append")
    write_deltalake(tmp_path, batch, mode="append")

    # Delete one data file from disk so repair() has something to remove,
    # then run a non-dry-run repair to commit the FSCK operation.
    table = DeltaTable(tmp_path)
    os.remove(table.file_uris()[0])
    table.repair(dry_run=False)

    spark = get_spark()
    history_df = spark.sql(f"DESCRIBE HISTORY '{tmp_path}'")

    # Spark must be able to parse the FSCK commit's operationMetrics.
    newest = (
        history_df.orderBy(history_df.version.desc()).select("operationMetrics").first()
    )
    assert newest["operationMetrics"] is not None

0 comments on commit 76480f9

Please sign in to comment.