feat: configurable column encoding for parquet checkpoint files
Signed-off-by: Daniel Münch <[email protected]>
dmunch committed Feb 13, 2025
1 parent b3efdfc commit 4ddbb45
Showing 3 changed files with 125 additions and 18 deletions.
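A note on usage before the per-file diffs: with this change, opting a table out of RLE checkpoints is a matter of setting the new `delta-rs.checkpoint.useRunLengthEncoding` property and writing a checkpoint. A minimal sketch built from the APIs exercised in the test below (`DeltaOps::set_tbl_properties`, `checkpoints::create_checkpoint`); the table path and the `tokio` entry point are placeholders, not part of this commit:

```rust
use std::collections::HashMap;

use deltalake_core::{checkpoints, DeltaOps, DeltaTableError};

#[tokio::main]
async fn main() -> Result<(), DeltaTableError> {
    // Placeholder path; any existing Delta table works here.
    let table = deltalake_core::open_table("./my-table").await?;

    // Opt this table out of run length encoding for future checkpoints.
    let table = DeltaOps(table)
        .set_tbl_properties()
        .with_properties(HashMap::from([(
            "delta-rs.checkpoint.useRunLengthEncoding".to_string(),
            "false".to_string(),
        )]))
        .await?;

    // Checkpoints are now written with dictionary encoding disabled and
    // PLAIN column encoding, so readers without RLE support can consume them.
    checkpoints::create_checkpoint(&table, None).await?;
    Ok(())
}
```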
25 changes: 16 additions & 9 deletions crates/core/src/protocol/checkpoints.rs
@@ -13,6 +13,7 @@ use itertools::Itertools;
 use object_store::{Error, ObjectStore};
 use parquet::arrow::ArrowWriter;
 use parquet::basic::Compression;
+use parquet::basic::Encoding;
 use parquet::errors::ParquetError;
 use parquet::file::properties::WriterProperties;
 use regex::Regex;
@@ -354,17 +355,23 @@ fn parquet_bytes_from_state(
     );
 
     debug!("Writing to checkpoint parquet buffer...");
+
+    let writer_properties = if state.table_config().use_checkpoint_rle() {
+        WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build()
+    } else {
+        WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .set_dictionary_enabled(false)
+            .set_encoding(Encoding::PLAIN)
+            .build()
+    };
+
     // Write the Checkpoint parquet file.
     let mut bytes = vec![];
-    let mut writer = ArrowWriter::try_new(
-        &mut bytes,
-        arrow_schema.clone(),
-        Some(
-            WriterProperties::builder()
-                .set_compression(Compression::SNAPPY)
-                .build(),
-        ),
-    )?;
+    let mut writer =
+        ArrowWriter::try_new(&mut bytes, arrow_schema.clone(), Some(writer_properties))?;
     let mut decoder = ReaderBuilder::new(arrow_schema)
         .with_batch_size(CHECKPOINT_RECORD_BATCH_SIZE)
         .build_decoder()?;
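An aside on the hunk above, not part of this change: disabling dictionary encoding file-wide is the coarsest option. Should finer control ever be needed, the parquet crate also accepts per-column overrides on the same builder; a sketch, where the column path "add.path" is purely illustrative:

```rust
use parquet::basic::{Compression, Encoding};
use parquet::file::properties::WriterProperties;
use parquet::schema::types::ColumnPath;

/// Hypothetical variant: switch off dictionary/RLE for one column only,
/// leaving the writer defaults in place for every other column.
fn per_column_plain_properties() -> WriterProperties {
    WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .set_column_dictionary_enabled(ColumnPath::from("add.path"), false)
        .set_column_encoding(ColumnPath::from("add.path"), Encoding::PLAIN)
        .build()
}
```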
13 changes: 13 additions & 0 deletions crates/core/src/table/config.rs
@@ -36,6 +36,10 @@ pub enum TableProperty {
     /// stats_parsed column and to write partition values as a struct for partitionValues_parsed.
     CheckpointWriteStatsAsStruct,
 
+    /// true for Delta Lake to write checkpoint files using run length encoding (RLE).
+    /// Some readers don't support run length encoding (e.g. Fabric) so this can be disabled.
+    CheckpointUseRunLengthEncoding,
+
     /// Whether column mapping is enabled for Delta table columns and the corresponding
     /// Parquet columns that use different names.
     ColumnMappingMode,
@@ -126,6 +130,7 @@ impl AsRef<str> for TableProperty {
             Self::AutoOptimizeOptimizeWrite => "delta.autoOptimize.optimizeWrite",
             Self::CheckpointWriteStatsAsJson => "delta.checkpoint.writeStatsAsJson",
             Self::CheckpointWriteStatsAsStruct => "delta.checkpoint.writeStatsAsStruct",
+            Self::CheckpointUseRunLengthEncoding => "delta-rs.checkpoint.useRunLengthEncoding",
             Self::CheckpointPolicy => "delta.checkpointPolicy",
             Self::ColumnMappingMode => "delta.columnMapping.mode",
             Self::DataSkippingNumIndexedCols => "delta.dataSkippingNumIndexedCols",
@@ -158,6 +163,7 @@ impl FromStr for TableProperty {
             "delta.autoOptimize.optimizeWrite" => Ok(Self::AutoOptimizeOptimizeWrite),
             "delta.checkpoint.writeStatsAsJson" => Ok(Self::CheckpointWriteStatsAsJson),
             "delta.checkpoint.writeStatsAsStruct" => Ok(Self::CheckpointWriteStatsAsStruct),
+            "delta-rs.checkpoint.useRunLengthEncoding" => Ok(Self::CheckpointUseRunLengthEncoding),
             "delta.checkpointPolicy" => Ok(Self::CheckpointPolicy),
             "delta.columnMapping.mode" => Ok(Self::ColumnMappingMode),
             "delta.dataSkippingNumIndexedCols" => Ok(Self::DataSkippingNumIndexedCols),
@@ -238,6 +244,13 @@ impl TableConfig<'_> {
             bool,
             false
         ),
+        (
+            "true for Delta Lake to write checkpoint files using run length encoding (RLE)",
+            TableProperty::CheckpointUseRunLengthEncoding,
+            use_checkpoint_rle,
+            bool,
+            true
+        ),
         (
             "The target file size in bytes or higher units for file tuning",
             TableProperty::TargetFileSize,
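For context, the five-field tuple above feeds the macro in this file that generates a typed accessor; it is that generated `use_checkpoint_rle` method which the checkpoint writer calls, with a default of `true` so existing tables keep their RLE behaviour. A standalone sketch of the lookup it performs, assuming the table configuration is exposed as string key/value pairs; this is not the macro's literal expansion:

```rust
use std::collections::HashMap;

// Rough equivalent of the generated accessor (assumed shape, for illustration).
fn use_checkpoint_rle(config: &HashMap<String, String>) -> bool {
    config
        .get("delta-rs.checkpoint.useRunLengthEncoding")
        .and_then(|value| value.parse::<bool>().ok())
        // Absent or unparsable values fall back to `true`, preserving the
        // pre-change behaviour of always writing RLE checkpoints.
        .unwrap_or(true)
}
```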
105 changes: 96 additions & 9 deletions crates/core/tests/checkpoint_writer.rs
@@ -6,8 +6,11 @@ use deltalake_core::protocol::DeltaOperation;
 
 mod simple_checkpoint {
     use deltalake_core::*;
+    use parquet::basic::Encoding;
+    use parquet::file::reader::{FileReader, SerializedFileReader};
     use pretty_assertions::assert_eq;
-    use std::fs;
+    use regex::Regex;
+    use std::fs::{self, File};
     use std::path::{Path, PathBuf};
 
#[tokio::test]
@@ -31,6 +34,9 @@ mod simple_checkpoint {
         let checkpoint_path = log_path.join("00000000000000000005.checkpoint.parquet");
         assert!(checkpoint_path.as_path().exists());
 
+        // Check that the checkpoint does use run length encoding
+        assert_column_rle_encoding(checkpoint_path, true);
+
         // _last_checkpoint should exist and point to the correct version
         let version = get_last_checkpoint_version(&log_path);
         assert_eq!(5, version);
@@ -42,6 +48,9 @@ mod simple_checkpoint {
         let checkpoint_path = log_path.join("00000000000000000010.checkpoint.parquet");
         assert!(checkpoint_path.as_path().exists());
 
+        // Check that the checkpoint does use run length encoding
+        assert_column_rle_encoding(checkpoint_path, true);
+
         // _last_checkpoint should exist and point to the correct version
         let version = get_last_checkpoint_version(&log_path);
         assert_eq!(10, version);
@@ -53,6 +62,77 @@ mod simple_checkpoint {
         assert_eq!(12, files.count());
     }
 
+    #[tokio::test]
+    async fn checkpoint_run_length_encoding_test() {
+        let table_location = "../test/tests/data/checkpoints";
+        let table_path = PathBuf::from(table_location);
+        let log_path = table_path.join("_delta_log");
+
+        // Delete checkpoint files from previous runs
+        cleanup_checkpoint_files(log_path.as_path());
+
+        // Load the delta table
+        let base_table = deltalake_core::open_table(table_location).await.unwrap();
+
+        // Set the table properties to disable run length encoding;
+        // this alters the table version and should be done in a more principled way
+        let table = DeltaOps(base_table)
+            .set_tbl_properties()
+            .with_properties(std::collections::HashMap::<String, String>::from([(
+                "delta-rs.checkpoint.useRunLengthEncoding".to_string(),
+                "false".to_string(),
+            )]))
+            .await
+            .unwrap();
+
+        // Write a checkpoint
+        checkpoints::create_checkpoint(&table, None).await.unwrap();
+
+        // checkpoint should exist
+        let checkpoint_path = log_path.join("00000000000000000013.checkpoint.parquet");
+        assert!(checkpoint_path.as_path().exists());
+
+        // Check that the checkpoint does not use run length encoding
+        assert_column_rle_encoding(checkpoint_path, false);
+
+        // _last_checkpoint should exist and point to the correct version
+        let version = get_last_checkpoint_version(&log_path);
+        assert_eq!(table.version(), version);
+
+        // delta table should load just fine with the checkpoint in place
+        let table = deltalake_core::open_table(table_location).await.unwrap();
+        let files = table.get_files_iter().unwrap();
+        assert_eq!(12, files.count());
+    }
+
+    fn assert_column_rle_encoding(file_path: PathBuf, should_be_rle: bool) {
+        let file = File::open(&file_path).unwrap();
+        let reader = SerializedFileReader::new(file).unwrap();
+        let meta = reader.metadata();
+        let mut found_rle = false;
+
+        for i in 0..meta.num_row_groups() {
+            let row_group = meta.row_group(i);
+            for j in 0..row_group.num_columns() {
+                let column_chunk = row_group.column(j);
+
+                for encoding in column_chunk.encodings() {
+                    if *encoding == Encoding::RLE_DICTIONARY {
+                        found_rle = true;
+                    }
+                }
+            }
+        }
+
+        if should_be_rle {
+            assert!(found_rle, "Expected RLE_DICTIONARY encoding");
+        } else {
+            assert!(!found_rle, "Expected no RLE_DICTIONARY encoding");
+        }
+    }
+
     fn get_last_checkpoint_version(log_path: &Path) -> i64 {
         let last_checkpoint_path = log_path.join("_last_checkpoint");
         assert!(last_checkpoint_path.as_path().exists());
@@ -69,15 +149,22 @@ mod simple_checkpoint {
     }
 
     fn cleanup_checkpoint_files(log_path: &Path) {
-        let paths = fs::read_dir(log_path).unwrap();
-        for d in paths.flatten() {
-            let path = d.path();
-
-            if path.file_name().unwrap() == "_last_checkpoint"
-                || (path.extension().is_some() && path.extension().unwrap() == "parquet")
-            {
-                fs::remove_file(path).unwrap();
-            }
+        let re = Regex::new(r"^(\d{20})\.json$").unwrap();
+        for entry in fs::read_dir(log_path).unwrap().flatten() {
+            let path = entry.path();
+            let filename = match path.file_name().and_then(|n| n.to_str()) {
+                Some(name) => name,
+                None => continue,
+            };
+
+            // Keep the original commit JSONs (versions 0-12); remove checkpoints,
+            // _last_checkpoint, and any later commits left over from previous runs.
+            if let Some(caps) = re.captures(filename) {
+                if let Ok(num) = caps[1].parse::<u64>() {
+                    if num <= 12 {
+                        continue;
+                    }
+                }
+            }
+            let _ = fs::remove_file(path);
         }
     }
 }