feat: configurable column encoding for parquet checkpoint files #3214

Merged · 2 commits · Feb 15, 2025
crates/core/src/protocol/checkpoints.rs — 25 changes: 16 additions & 9 deletions
@@ -13,6 +13,7 @@ use itertools::Itertools;
 use object_store::{Error, ObjectStore};
 use parquet::arrow::ArrowWriter;
 use parquet::basic::Compression;
+use parquet::basic::Encoding;
 use parquet::errors::ParquetError;
 use parquet::file::properties::WriterProperties;
 use regex::Regex;
@@ -354,17 +355,23 @@ fn parquet_bytes_from_state(
     );

     debug!("Writing to checkpoint parquet buffer...");
+
+    let writer_properties = if state.table_config().use_checkpoint_rle() {
+        WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build()
+    } else {
+        WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .set_dictionary_enabled(false)
+            .set_encoding(Encoding::PLAIN)
+            .build()
+    };

     // Write the Checkpoint parquet file.
     let mut bytes = vec![];
-    let mut writer = ArrowWriter::try_new(
-        &mut bytes,
-        arrow_schema.clone(),
-        Some(
-            WriterProperties::builder()
-                .set_compression(Compression::SNAPPY)
-                .build(),
-        ),
-    )?;
+    let mut writer =
+        ArrowWriter::try_new(&mut bytes, arrow_schema.clone(), Some(writer_properties))?;
     let mut decoder = ReaderBuilder::new(arrow_schema)
         .with_batch_size(CHECKPOINT_RECORD_BATCH_SIZE)
         .build_decoder()?;
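The conditional above is the heart of the change: when use_checkpoint_rle() returns false, dictionary encoding is disabled and the column encoding is pinned to PLAIN, which keeps RLE_DICTIONARY pages out of the checkpoint entirely. A minimal standalone sketch of that decision follows; the helper name checkpoint_writer_properties is ours for illustration, not part of the PR.

use parquet::basic::{Compression, Encoding};
use parquet::file::properties::WriterProperties;

// Sketch of the two writer-properties profiles the checkpoint writer
// chooses between (helper name is hypothetical).
fn checkpoint_writer_properties(use_rle: bool) -> WriterProperties {
    let builder = WriterProperties::builder().set_compression(Compression::SNAPPY);
    if use_rle {
        // Default path: dictionary encoding stays enabled, so columns may be
        // written with RLE_DICTIONARY pages.
        builder.build()
    } else {
        // Compatibility path for readers that reject RLE_DICTIONARY:
        // disable dictionaries and pin the fallback encoding to PLAIN.
        builder
            .set_dictionary_enabled(false)
            .set_encoding(Encoding::PLAIN)
            .build()
    }
}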
crates/core/src/table/config.rs — 13 changes: 13 additions & 0 deletions
@@ -36,6 +36,10 @@ pub enum TableProperty {
     /// stats_parsed column and to write partition values as a struct for partitionValues_parsed.
     CheckpointWriteStatsAsStruct,

+    /// true for Delta Lake to write checkpoint files using run length encoding (RLE).
+    /// Some readers don't support run length encoding (e.g. Fabric) so this can be disabled.
+    CheckpointUseRunLengthEncoding,
+
     /// Whether column mapping is enabled for Delta table columns and the corresponding
     /// Parquet columns that use different names.
     ColumnMappingMode,
@@ -126,6 +130,7 @@ impl AsRef<str> for TableProperty {
             Self::AutoOptimizeOptimizeWrite => "delta.autoOptimize.optimizeWrite",
             Self::CheckpointWriteStatsAsJson => "delta.checkpoint.writeStatsAsJson",
             Self::CheckpointWriteStatsAsStruct => "delta.checkpoint.writeStatsAsStruct",
+            Self::CheckpointUseRunLengthEncoding => "delta-rs.checkpoint.useRunLengthEncoding",
             Self::CheckpointPolicy => "delta.checkpointPolicy",
             Self::ColumnMappingMode => "delta.columnMapping.mode",
             Self::DataSkippingNumIndexedCols => "delta.dataSkippingNumIndexedCols",
@@ -158,6 +163,7 @@ impl FromStr for TableProperty {
             "delta.autoOptimize.optimizeWrite" => Ok(Self::AutoOptimizeOptimizeWrite),
             "delta.checkpoint.writeStatsAsJson" => Ok(Self::CheckpointWriteStatsAsJson),
             "delta.checkpoint.writeStatsAsStruct" => Ok(Self::CheckpointWriteStatsAsStruct),
+            "delta-rs.checkpoint.useRunLengthEncoding" => Ok(Self::CheckpointUseRunLengthEncoding),
             "delta.checkpointPolicy" => Ok(Self::CheckpointPolicy),
             "delta.columnMapping.mode" => Ok(Self::ColumnMappingMode),
             "delta.dataSkippingNumIndexedCols" => Ok(Self::DataSkippingNumIndexedCols),
@@ -238,6 +244,13 @@ impl TableConfig<'_> {
            bool,
            false
        ),
+        (
+            "true for Delta Lake to write checkpoint files using run length encoding (RLE)",
+            TableProperty::CheckpointUseRunLengthEncoding,
+            use_checkpoint_rle,
+            bool,
+            true
+        ),
        (
            "The target file size in bytes or higher units for file tuning",
            TableProperty::TargetFileSize,
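As a usage note, opting a table out of RLE checkpoints is just a matter of setting the new property and writing a checkpoint, which the integration test below exercises end to end. A condensed sketch of that flow, assuming a placeholder table URI and a boxed error type for brevity (the function name is ours):

use std::collections::HashMap;

use deltalake_core::{checkpoints, DeltaOps};

// Hypothetical helper mirroring the integration test's flow.
async fn write_plain_encoded_checkpoint() -> Result<(), Box<dyn std::error::Error>> {
    let table = deltalake_core::open_table("path/to/table").await?;

    // Persist the table property; checkpoints written afterwards pick it up.
    let table = DeltaOps(table)
        .set_tbl_properties()
        .with_properties(HashMap::from([(
            "delta-rs.checkpoint.useRunLengthEncoding".to_string(),
            "false".to_string(),
        )]))
        .await?;

    // This checkpoint is written with dictionaries disabled and PLAIN encoding.
    checkpoints::create_checkpoint(&table, None).await?;
    Ok(())
}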
crates/core/tests/checkpoint_writer.rs — 108 changes: 99 additions & 9 deletions
@@ -6,11 +6,16 @@ use deltalake_core::protocol::DeltaOperation;

 mod simple_checkpoint {
     use deltalake_core::*;
+    use parquet::basic::Encoding;
+    use parquet::file::reader::{FileReader, SerializedFileReader};
     use pretty_assertions::assert_eq;
-    use std::fs;
+    use regex::Regex;
+    use serial_test::serial;
+    use std::fs::{self, File};
     use std::path::{Path, PathBuf};

     #[tokio::test]
+    #[serial]
     async fn simple_checkpoint_test() {
         let table_location = "../test/tests/data/checkpoints";
         let table_path = PathBuf::from(table_location);
@@ -31,6 +36,9 @@ mod simple_checkpoint {
         let checkpoint_path = log_path.join("00000000000000000005.checkpoint.parquet");
         assert!(checkpoint_path.as_path().exists());

+        // Check that the checkpoint does use run length encoding
+        assert_column_rle_encoding(checkpoint_path, true);
+
         // _last_checkpoint should exist and point to the correct version
         let version = get_last_checkpoint_version(&log_path);
         assert_eq!(5, version);
@@ -42,6 +50,9 @@
         let checkpoint_path = log_path.join("00000000000000000010.checkpoint.parquet");
         assert!(checkpoint_path.as_path().exists());

+        // Check that the checkpoint does use run length encoding
+        assert_column_rle_encoding(checkpoint_path, true);
+
         // _last_checkpoint should exist and point to the correct version
         let version = get_last_checkpoint_version(&log_path);
         assert_eq!(10, version);
@@ -53,6 +64,78 @@
         assert_eq!(12, files.count());
     }

+    #[tokio::test]
+    #[serial]
+    async fn checkpoint_run_length_encoding_test() {
+        let table_location = "../test/tests/data/checkpoints";
+        let table_path = PathBuf::from(table_location);
+        let log_path = table_path.join("_delta_log");
+
+        // Delete checkpoint files from previous runs
+        cleanup_checkpoint_files(log_path.as_path());
+
+        // Load the delta table
+        let base_table = deltalake_core::open_table(table_location).await.unwrap();
+
+        // Set the table properties to disable run length encoding
+        // this alters table version and should be done in a more principled way
+        let table = DeltaOps(base_table)
+            .set_tbl_properties()
+            .with_properties(std::collections::HashMap::<String, String>::from([(
+                "delta-rs.checkpoint.useRunLengthEncoding".to_string(),
+                "false".to_string(),
+            )]))
+            .await
+            .unwrap();
+
+        // Write a checkpoint
+        checkpoints::create_checkpoint(&table, None).await.unwrap();
+
+        // checkpoint should exist
+        let checkpoint_path = log_path.join("00000000000000000013.checkpoint.parquet");
+        assert!(checkpoint_path.as_path().exists());
+
+        // Check that the checkpoint does not use run length encoding
+        assert_column_rle_encoding(checkpoint_path, false);
+
+        // _last_checkpoint should exist and point to the correct version
+        let version = get_last_checkpoint_version(&log_path);
+        assert_eq!(table.version(), version);
+
+        // delta table should load just fine with the checkpoint in place
+        let table_result = deltalake_core::open_table(table_location).await.unwrap();
+        let table = table_result;
+        let files = table.get_files_iter().unwrap();
+        assert_eq!(12, files.count());
+    }
+
+    fn assert_column_rle_encoding(file_path: PathBuf, should_be_rle: bool) {
+        let file = File::open(&file_path).unwrap();
+        let reader = SerializedFileReader::new(file).unwrap();
+        let meta = reader.metadata();
+        let mut found_rle = false;
+
+        for i in 0..meta.num_row_groups() {
+            let row_group = meta.row_group(i);
+            for j in 0..row_group.num_columns() {
+                let column_chunk: &parquet::file::metadata::ColumnChunkMetaData =
+                    row_group.column(j);
+
+                for encoding in column_chunk.encodings() {
+                    if *encoding == Encoding::RLE_DICTIONARY {
+                        found_rle = true;
+                    }
+                }
+            }
+        }
+
+        if should_be_rle {
+            assert!(found_rle, "Expected RLE_DICTIONARY encoding");
+        } else {
+            assert!(!found_rle, "Expected no RLE_DICTIONARY encoding");
+        }
+    }
+
     fn get_last_checkpoint_version(log_path: &Path) -> i64 {
         let last_checkpoint_path = log_path.join("_last_checkpoint");
         assert!(last_checkpoint_path.as_path().exists());
@@ -69,15 +152,22 @@
     }

     fn cleanup_checkpoint_files(log_path: &Path) {
-        let paths = fs::read_dir(log_path).unwrap();
-        for d in paths.flatten() {
-            let path = d.path();
-
-            if path.file_name().unwrap() == "_last_checkpoint"
-                || (path.extension().is_some() && path.extension().unwrap() == "parquet")
-            {
-                fs::remove_file(path).unwrap();
-            }
+        let re = Regex::new(r"^(\d{20})\.json$").unwrap();
+        for entry in fs::read_dir(log_path).unwrap().flatten() {
+            let path = entry.path();
+            let filename = match path.file_name().and_then(|n| n.to_str()) {
+                Some(name) => name,
+                None => continue,
+            };
+
+            if let Some(caps) = re.captures(filename) {
+                if let Ok(num) = caps[1].parse::<u64>() {
+                    if num <= 12 {
+                        continue;
+                    }
+                }
+            }
+            let _ = fs::remove_file(path);
         }
     }
 }
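To confirm what a given checkpoint file actually contains, the same parquet metadata API used by assert_column_rle_encoding above works as a quick standalone diagnostic. A sketch; print_column_encodings is our name for it, and the path argument is a placeholder:

use std::fs::File;

use parquet::file::reader::{FileReader, SerializedFileReader};

// Hypothetical diagnostic: print the encodings of every column chunk.
fn print_column_encodings(path: &str) -> Result<(), Box<dyn std::error::Error>> {
    let reader = SerializedFileReader::new(File::open(path)?)?;
    let meta = reader.metadata();
    for i in 0..meta.num_row_groups() {
        let row_group = meta.row_group(i);
        for j in 0..row_group.num_columns() {
            let column = row_group.column(j);
            // A checkpoint written with useRunLengthEncoding = false should not
            // list RLE_DICTIONARY for any column here.
            println!("{}: {:?}", column.column_path().string(), column.encodings());
        }
    }
    Ok(())
}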