From d922b311b4ff697f5e0c964ddc7572324d2d3c01 Mon Sep 17 00:00:00 2001 From: QP Hou Date: Tue, 1 Jun 2021 17:35:04 -0700 Subject: [PATCH] optimize vacuum operation (#258) * optimize vacuum operation Avoid repeatedly creating strings and vectors at runtime. Turn O(n) path lookup into O(1) with Hashset. Early exit in stale file check loop. * rename table uri to table path everywhere for consistency * change path in public methods to uri * bump python version --- python/Cargo.toml | 4 +- python/Makefile | 4 +- python/deltalake/table.py | 26 ++- python/src/lib.rs | 18 +- ruby/spec/deltalake_spec.rb | 6 +- ruby/src/lib.rs | 26 +-- rust/README.md | 8 + rust/src/bin/delta-inspect.rs | 29 ++- rust/src/delta.rs | 272 ++++++++++++++++++--------- rust/src/delta_datafusion.rs | 2 +- rust/src/storage/azure.rs | 19 ++ rust/src/storage/file/mod.rs | 39 ++++ rust/src/storage/mod.rs | 69 +++---- rust/src/storage/s3/mod.rs | 21 +++ rust/tests/read_delta_test.rs | 18 +- rust/tests/read_simple_table_test.rs | 4 +- rust/tests/write_exploration.rs | 2 +- 17 files changed, 371 insertions(+), 196 deletions(-) diff --git a/python/Cargo.toml b/python/Cargo.toml index 4019323d4f..d3a3867787 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-python" -version = "0.4.9" +version = "0.5.0" authors = ["Qingping Hou "] homepage = "https://github.com/delta-io/delta-rs" license = "Apache-2.0" @@ -54,4 +54,4 @@ requires-dist = [ "sphinx-rtd-theme; extra == 'devel'", "toml; extra == 'devel'", ] -provides-extra = ["pandas", "devel"] \ No newline at end of file +provides-extra = ["pandas", "devel"] diff --git a/python/Makefile b/python/Makefile index 71d9c7ef02..9e781d080f 100644 --- a/python/Makefile +++ b/python/Makefile @@ -23,7 +23,7 @@ format: ## Format the code $(info --- Rust format ---) cargo fmt $(info --- Python format ---) - black . + black deltalake tests *.py isort . 
.PHONY: check-rust @@ -66,4 +66,4 @@ clean: ## Run clean .PHONY: help help: - @grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}' \ No newline at end of file + @grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}' diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 2daece84d8..8e78c22b72 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -1,4 +1,5 @@ import os +import warnings from dataclasses import dataclass from typing import Any, List, Optional, Tuple from urllib.parse import urlparse @@ -61,15 +62,15 @@ def __str__(self) -> str: class DeltaTable: """Create a DeltaTable instance.""" - def __init__(self, table_path: str, version: Optional[int] = None): + def __init__(self, table_uri: str, version: Optional[int] = None): """ Create the Delta Table from a path with an optional version. Multiple StorageBackends are currently supported: AWS S3, Azure Data Lake Storage Gen2 and local URI. - :param table_path: the path of the DeltaTable + :param table_uri: the path of the DeltaTable :param version: version of the DeltaTable """ - self._table = RawDeltaTable(table_path, version=version) + self._table = RawDeltaTable(table_uri, version=version) self._metadata = Metadata(self._table) def version(self) -> int: @@ -123,9 +124,22 @@ def file_paths(self) -> List[str]: """ Get the list of files with an absolute path. - :return: list of the .parquet files with an absolute path referenced for the current version of the DeltaTable + :return: list of the .parquet files with an absolute URI referenced for the current version of the DeltaTable """ - return self._table.file_paths() + warnings.warn( + "Call to deprecated method file_paths. 
Please use file_uris instead.", + category=DeprecationWarning, + stacklevel=2, + ) + return self.file_uris() + + def file_uris(self) -> List[str]: + """ + Get the list of files with an absolute path. + + :return: list of the .parquet files with an absolute URI referenced for the current version of the DeltaTable + """ + return self._table.file_uris() def load_version(self, version: int) -> None: """ @@ -182,7 +196,7 @@ def to_pyarrow_dataset( :return: the PyArrow dataset in PyArrow """ if partitions is None: - file_paths = self._table.file_paths() + file_paths = self._table.file_uris() else: file_paths = self._table.files_by_partitions(partitions) paths = [urlparse(curr_file) for curr_file in file_paths] diff --git a/python/src/lib.rs b/python/src/lib.rs index 38c4bd474a..f07d42c8d9 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -62,19 +62,17 @@ struct RawDeltaTableMetaData { #[pymethods] impl RawDeltaTable { #[new] - fn new(table_path: &str, version: Option) -> PyResult { + fn new(table_uri: &str, version: Option) -> PyResult { let table = match version { - None => rt()?.block_on(deltalake::open_table(table_path)), - Some(version) => { - rt()?.block_on(deltalake::open_table_with_version(table_path, version)) - } + None => rt()?.block_on(deltalake::open_table(table_uri)), + Some(version) => rt()?.block_on(deltalake::open_table_with_version(table_uri, version)), } .map_err(PyDeltaTableError::from_raw)?; Ok(RawDeltaTable { _table: table }) } - pub fn table_path(&self) -> PyResult<&str> { - Ok(&self._table.table_path) + pub fn table_uri(&self) -> PyResult<&str> { + Ok(&self._table.table_uri) } pub fn version(&self) -> PyResult { @@ -121,7 +119,7 @@ impl RawDeltaTable { match partition_filters { Ok(filters) => Ok(self ._table - .get_file_paths_by_partitions(&filters) + .get_file_uris_by_partitions(&filters) .map_err(PyDeltaTableError::from_raw)?), Err(err) => Err(PyDeltaTableError::from_raw(err)), } @@ -135,8 +133,8 @@ impl RawDeltaTable { .collect()) } - 
pub fn file_paths(&self) -> PyResult> { - Ok(self._table.get_file_paths()) + pub fn file_uris(&self) -> PyResult> { + Ok(self._table.get_file_uris()) } pub fn schema_json(&self) -> PyResult { diff --git a/ruby/spec/deltalake_spec.rb b/ruby/spec/deltalake_spec.rb index 771d82ceae..ed33200b59 100644 --- a/ruby/spec/deltalake_spec.rb +++ b/ruby/spec/deltalake_spec.rb @@ -5,13 +5,13 @@ describe Deltalake do describe '#open_table' do - let(:table_path) do + let(:table_uri) do File.expand_path('../rust/tests/data/simple_table') end - subject(:table) { Deltalake.open_table(table_path) } + subject(:table) { Deltalake.open_table(table_uri) } - its(:table_path) { should eq(table_path) } + its(:table_uri) { should eq(table_uri) } its(:version) { should eq 4 } describe '#files' do diff --git a/ruby/src/lib.rs b/ruby/src/lib.rs index 1a149f883b..18f077f9e4 100644 --- a/ruby/src/lib.rs +++ b/ruby/src/lib.rs @@ -13,23 +13,23 @@ use rutie::{AnyObject, Array, Class, Integer, Object, RString}; use std::sync::Arc; pub struct TableData { - table_path: String, + table_uri: String, actual: Arc, } impl TableData { - fn new(table_path: String) -> Self { - println!("initializing with {}", table_path); + fn new(table_uri: String) -> Self { + println!("initializing with {}", table_uri); let rt = tokio::runtime::Runtime::new().unwrap(); - let table = rt.block_on(deltalake::open_table(&table_path)).unwrap(); + let table = rt.block_on(deltalake::open_table(&table_uri)).unwrap(); let actual = Arc::new(table); - Self { table_path, actual } + Self { table_uri, actual } } - fn table_path(&self) -> &str { - &self.table_path + fn table_uri(&self) -> &str { + &self.table_uri } fn version(&self) -> i64 { @@ -51,15 +51,15 @@ class!(Table); methods!( Table, rtself, - fn ruby_table_new(table_path: RString) -> AnyObject { - let table_data = TableData::new(table_path.unwrap().to_string()); + fn ruby_table_new(table_uri: RString) -> AnyObject { + let table_data = 
TableData::new(table_uri.unwrap().to_string()); Class::from_existing("Table").wrap_data(table_data, &*TABLE_DATA_WRAPPER) }, - fn ruby_table_path() -> RString { - let table_path = rtself.get_data(&*TABLE_DATA_WRAPPER).table_path(); + fn ruby_table_uri() -> RString { + let table_uri = rtself.get_data(&*TABLE_DATA_WRAPPER).table_uri(); - RString::new_utf8(table_path) + RString::new_utf8(table_uri) }, fn ruby_version() -> Integer { let version = rtself.get_data(&*TABLE_DATA_WRAPPER).version(); @@ -87,7 +87,7 @@ pub extern "C" fn Init_table() { Class::new("Table", Some(&data_class)).define(|klass| { klass.def_self("new", ruby_table_new); - klass.def("table_path", ruby_table_path); + klass.def("table_uri", ruby_table_uri); klass.def("version", ruby_version); klass.def("files", ruby_files); }); diff --git a/rust/README.md b/rust/README.md index 61437b8440..260cde89c6 100644 --- a/rust/README.md +++ b/rust/README.md @@ -39,3 +39,11 @@ Optional cargo package features - `s3` - enable the S3 storage backend to work with Delta Tables in AWS S3. - `azure` - enable the Azure storage backend to work with Delta Tables in Azure Data Lake Storage Gen2 accounts. - `datafusion-ext` - enable the `datafusion::datasource::TableProvider` trait implementation for Delta Tables, allowing them to be queried using [DataFusion](https://github.com/apache/arrow/tree/master/rust/datafusion). + + +Development +----------- + +To run s3 integration tests from local machine, we use docker-compose to stand +up AWS local stack. To spin up the test environment run `docker-compose up` in +the root of the `delta-rs` repo. 
diff --git a/rust/src/bin/delta-inspect.rs b/rust/src/bin/delta-inspect.rs index a19c53c646..f0399bdbfc 100644 --- a/rust/src/bin/delta-inspect.rs +++ b/rust/src/bin/delta-inspect.rs @@ -16,18 +16,18 @@ async fn main() -> anyhow::Result<()> { App::new("info") .about("dump table metadata info") .setting(AppSettings::ArgRequiredElseHelp) - .args(&[Arg::new("path").about("Table path").required(true)]), + .args(&[Arg::new("uri").about("Table URI").required(true)]), ) .subcommand( App::new("files") .setting(AppSettings::ArgRequiredElseHelp) .about("output list of files for a given version, defalt to latest") .args(&[ - Arg::new("path").about("Table path").required(true), - Arg::new("full_path") - .about("Display files in full path") + Arg::new("uri").about("Table URI").required(true), + Arg::new("full_uri") + .about("Display files in full URI") .takes_value(false) - .long("full-path") + .long("full-uri") .short('f'), Arg::new("version") .takes_value(true) @@ -40,29 +40,26 @@ async fn main() -> anyhow::Result<()> { match matches.subcommand() { Some(("files", files_matches)) => { - let table_path = files_matches.value_of("path").unwrap(); + let table_uri = files_matches.value_of("uri").unwrap(); let table = match files_matches.value_of_t::("version") { - Ok(v) => deltalake::open_table_with_version(table_path, v).await?, + Ok(v) => deltalake::open_table_with_version(table_uri, v).await?, Err(clap::Error { kind: clap::ErrorKind::ArgumentNotFound, .. 
- }) => deltalake::open_table(table_path).await?, + }) => deltalake::open_table(table_uri).await?, Err(e) => e.exit(), }; - if files_matches.is_present("full_path") { - table - .get_file_paths() - .iter() - .for_each(|f| println!("{}", f)); + if files_matches.is_present("full_uri") { + table.get_file_uris().iter().for_each(|f| println!("{}", f)); } else { - table.get_files().iter().for_each(|f| println!("{}", f)); + table.get_files_iter().for_each(|f| println!("{}", f)); }; } Some(("info", info_matches)) => { - let table_path = info_matches.value_of("path").unwrap(); - let table = deltalake::open_table(table_path).await?; + let table_uri = info_matches.value_of("uri").unwrap(); + let table = deltalake::open_table(table_uri).await?; println!("{}", table); } _ => unreachable!(), diff --git a/rust/src/delta.rs b/rust/src/delta.rs index 4e558bff38..506d95de14 100644 --- a/rust/src/delta.rs +++ b/rust/src/delta.rs @@ -3,7 +3,7 @@ // Reference: https://github.com/delta-io/delta/blob/master/PROTOCOL.md use std::cmp::Ordering; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::fmt; use std::io::{BufRead, BufReader, Cursor}; @@ -28,7 +28,7 @@ use super::action::{Action, DeltaOperation}; use super::partitions::{DeltaTablePartition, PartitionFilter}; use super::schema::*; use super::storage; -use super::storage::{StorageBackend, StorageError, UriError}; +use super::storage::{parse_uri, StorageBackend, StorageError, UriError}; use uuid::Uuid; /// Metadata for a checkpoint file @@ -158,11 +158,14 @@ pub enum DeltaTableError { /// The invalid partition filter used. 
partition_filter: String, }, - /// Error returned when Vacuume retention period is below the safe threshold + /// Error returned when Vacuum retention period is below the safe threshold #[error( "Invalid retention period, retention for Vacuum must be greater than 1 week (168 hours)" )] InvalidVacuumRetentionPeriod, + /// Generic Delta Table error + #[error("Generic DeltaTable error: {0}")] + Generic(String), } /// Delta table metadata @@ -276,12 +279,29 @@ struct DeltaTableState { current_metadata: Option, } +#[inline] +/// Return path relative to parent_path +fn extract_rel_path<'a, 'b>( + parent_path: &'b str, + path: &'a str, +) -> Result<&'a str, DeltaTableError> { + if path.starts_with(&parent_path) { + // plus one to account for path separator + Ok(&path[parent_path.len() + 1..]) + } else { + Err(DeltaTableError::Generic(format!( + "Parent path `{}` is not a prefix of path `{}`", + parent_path, path + ))) + } +} + /// In memory representation of a Delta Table pub struct DeltaTable { /// The version of the table as of the most recent loaded Delta log entry. pub version: DeltaDataTypeVersion, - /// The path the DeltaTable was loaded from. - pub table_path: String, + /// The URI the DeltaTable was loaded from. 
+ pub table_uri: String, state: DeltaTableState, @@ -290,26 +310,26 @@ pub struct DeltaTable { storage: Box, last_check_point: Option, - log_path: String, + log_uri: String, version_timestamp: HashMap, } impl DeltaTable { - fn version_to_log_path(&self, version: DeltaDataTypeVersion) -> String { + fn commit_uri_from_version(&self, version: DeltaDataTypeVersion) -> String { let version = format!("{:020}.json", version); - self.storage.join_path(&self.log_path, &version) + self.storage.join_path(&self.log_uri, &version) } - fn tmp_commit_log_path(&self, token: &str) -> String { + fn tmp_commit_uri(&self, token: &str) -> String { let path = format!("_commit_{}.json", token); - self.storage.join_path(&self.log_path, &path) + self.storage.join_path(&self.log_uri, &path) } fn get_checkpoint_data_paths(&self, check_point: &CheckPoint) -> Vec { let checkpoint_prefix_pattern = format!("{:020}", check_point.version); let checkpoint_prefix = self .storage - .join_path(&self.log_path, &checkpoint_prefix_pattern); + .join_path(&self.log_uri, &checkpoint_prefix_pattern); let mut checkpoint_data_paths = Vec::new(); match check_point.parts { @@ -332,7 +352,7 @@ impl DeltaTable { } async fn get_last_checkpoint(&self) -> Result { - let last_checkpoint_path = self.storage.join_path(&self.log_path, "_last_checkpoint"); + let last_checkpoint_path = self.storage.join_path(&self.log_uri, "_last_checkpoint"); let data = self.storage.get_obj(&last_checkpoint_path).await?; Ok(serde_json::from_slice(&data)?) @@ -352,7 +372,7 @@ impl DeltaTable { } let mut cp: Option = None; - let mut stream = self.storage.list_objs(&self.log_path).await?; + let mut stream = self.storage.list_objs(&self.log_uri).await?; while let Some(obj_meta) = stream.next().await { // Exit early if any objects can't be listed. 
@@ -410,8 +430,8 @@ impl DeltaTable { } async fn apply_log(&mut self, version: DeltaDataTypeVersion) -> Result<(), ApplyLogError> { - let log_path = self.version_to_log_path(version); - let commit_log_bytes = self.storage.get_obj(&log_path).await?; + let commit_uri = self.commit_uri_from_version(version); + let commit_log_bytes = self.storage.get_obj(&commit_uri).await?; let reader = BufReader::new(Cursor::new(commit_log_bytes)); self.apply_log_from_bufread(reader) @@ -457,7 +477,7 @@ impl DeltaTable { loop { match self .storage - .head_obj(&self.version_to_log_path(version)) + .head_obj(&self.commit_uri_from_version(version)) .await { Ok(meta) => { @@ -561,8 +581,8 @@ impl DeltaTable { version: DeltaDataTypeVersion, ) -> Result<(), DeltaTableError> { // check if version is valid - let log_path = self.version_to_log_path(version); - match self.storage.head_obj(&log_path).await { + let commit_uri = self.commit_uri_from_version(version); + match self.storage.head_obj(&commit_uri).await { Ok(_) => {} Err(StorageError::NotFound) => { return Err(DeltaTableError::InvalidVersion(version)); @@ -604,7 +624,7 @@ impl DeltaTable { None => { let meta = self .storage - .head_obj(&self.version_to_log_path(version)) + .head_obj(&self.commit_uri_from_version(version)) .await?; let ts = meta.modified.timestamp(); // also cache timestamp for version @@ -652,15 +672,27 @@ impl DeltaTable { Ok(files) } - /// Return the full file paths as strings for the partition(s) + /// Return the file uris as strings for the partition(s) + #[deprecated( + since = "0.4.0", + note = "Please use the get_file_uris_by_partitions function instead" + )] pub fn get_file_paths_by_partitions( &self, filters: &[PartitionFilter<&str>], + ) -> Result, DeltaTableError> { + self.get_file_uris_by_partitions(filters) + } + + /// Return the file uris as strings for the partition(s) + pub fn get_file_uris_by_partitions( + &self, + filters: &[PartitionFilter<&str>], ) -> Result, DeltaTableError> { let files = 
self.get_files_by_partitions(filters)?; Ok(files .iter() - .map(|fname| self.storage.join_path(&self.table_path, fname)) + .map(|fname| self.storage.join_path(&self.table_uri, fname)) .collect()) } @@ -681,12 +713,30 @@ impl DeltaTable { self.get_files_iter().collect() } - /// Returns a copy of the file paths present in the loaded state. + /// Returns file names present in the loaded state in HashSet + pub fn get_file_set(&self) -> HashSet<&str> { + self.state + .files + .iter() + .map(|add| add.path.as_str()) + .collect() + } + + /// Returns a URIs for all active files present in the current table version. + #[deprecated( + since = "0.4.0", + note = "Please use the get_file_uris function instead" + )] pub fn get_file_paths(&self) -> Vec { + self.get_file_uris() + } + + /// Returns a URIs for all active files present in the current table version. + pub fn get_file_uris(&self) -> Vec { self.state .files .iter() - .map(|add| self.storage.join_path(&self.table_path, &add.path)) + .map(|add| self.storage.join_path(&self.table_uri, &add.path)) .collect() } @@ -721,7 +771,7 @@ impl DeltaTable { } /// List files no longer referenced by a Delta table and are older than the retention threshold. - fn get_stale_files(&self, retention_hours: u64) -> Result, DeltaTableError> { + fn get_stale_files(&self, retention_hours: u64) -> Result, DeltaTableError> { if retention_hours < 168 { return Err(DeltaTableError::InvalidVacuumRetentionPeriod); } @@ -736,8 +786,8 @@ impl DeltaTable { .get_tombstones() .iter() .filter(|tombstone| tombstone.deletion_timestamp < delete_before_timestamp) - .map(|tombstone| self.storage.join_path(&self.table_path, &tombstone.path)) - .collect::>()) + .map(|tombstone| tombstone.path.as_str()) + .collect::>()) } /// Whether a path should be hidden for delta-related file operations, such as Vacuum. @@ -745,26 +795,17 @@ impl DeltaTable { /// deleted even if they'd normally be hidden. 
The _db_index directory contains (bloom filter) /// indexes and these must be deleted when the data they are tied to is deleted. fn is_hidden_directory(&self, path_name: &str) -> Result { - Ok( - (path_name.starts_with(&self.storage.join_path(&self.table_path, ".")) - || path_name.starts_with(&self.storage.join_path(&self.table_path, "_"))) - && !path_name - .starts_with(&self.storage.join_path(&self.table_path, "_delta_index")) - && !path_name - .starts_with(&self.storage.join_path(&self.table_path, "_change_data")) - && !self - .state - .current_metadata - .as_ref() - .ok_or(DeltaTableError::NoMetadata)? - .partition_columns - .iter() - .any(|partition_column| { - path_name.starts_with( - &self.storage.join_path(&self.table_path, partition_column), - ) - }), - ) + Ok((path_name.starts_with('.') || path_name.starts_with('_')) + && !path_name.starts_with("_delta_index") + && !path_name.starts_with("_change_data") + && !self + .state + .current_metadata + .as_ref() + .ok_or(DeltaTableError::NoMetadata)? + .partition_columns + .iter() + .any(|partition_column| path_name.starts_with(partition_column))) } /// Run the Vacuum command on the Delta Table: delete files no longer referenced by a Delta table and are older than the retention threshold. 
@@ -774,33 +815,50 @@ impl DeltaTable { retention_hours: u64, dry_run: bool, ) -> Result, DeltaTableError> { - let tombstones_path = self.get_stale_files(retention_hours)?; + let expired_tombstones = self.get_stale_files(retention_hours)?; + let valid_files = self.get_file_set(); + + let mut files_to_delete = vec![]; + let mut all_files = self.storage.list_objs(&self.table_uri).await?; + + // TODO: table_path is currently only used in vacuum, consider precalcualte it during table + // struct initialization if it ends up being used in other hot paths + let table_path = parse_uri(&self.table_uri)?.path(); - let mut tombstones = vec![]; - let mut all_files = self.storage.list_objs(&self.table_path).await?; while let Some(obj_meta) = all_files.next().await { let obj_meta = obj_meta?; - let is_not_valid_file = !self.get_file_paths().contains(&obj_meta.path); - let is_valid_tombstone = tombstones_path.contains(&obj_meta.path); - let is_not_hidden_directory = !self.is_hidden_directory(&obj_meta.path)?; - if is_not_valid_file && is_valid_tombstone && is_not_hidden_directory { - tombstones.push(obj_meta.path); + // We can't use self.table_uri as the prefix to extract relative path because + // obj_meta.path is not a URI. For example, for S3 objects, obj_meta.path is just the + // object key without `s3://` and bucket name. + let rel_path = extract_rel_path(&table_path, &obj_meta.path)?; + + if valid_files.contains(rel_path) // file is still being tracked in table + || !expired_tombstones.contains(rel_path) // file is not an expired tombstone + || self.is_hidden_directory(rel_path)? 
+ { + continue; } + + files_to_delete.push(obj_meta.path); } if dry_run { - return Ok(tombstones); + return Ok(files_to_delete); } - for tombstone in &tombstones { - match self.storage.delete_obj(&tombstone).await { + for rel_path in &files_to_delete { + match self + .storage + .delete_obj(&self.storage.join_path(&self.table_uri, rel_path)) + .await + { Ok(_) => continue, Err(StorageError::NotFound) => continue, Err(err) => return Err(DeltaTableError::StorageError { source: err }), } } - Ok(tombstones) + Ok(files_to_delete) } /// Return table schema parsed from transaction log. Return None if table hasn't been loaded or @@ -830,17 +888,18 @@ impl DeltaTable { /// NOTE: This is for advanced users. If you don't know why you need to use this method, please /// call one of the `open_table` helper methods instead. pub fn new( - table_path: &str, + table_uri: &str, storage_backend: Box, ) -> Result { - let log_path_normalized = storage_backend.join_path(table_path, "_delta_log"); + let table_uri = storage_backend.trim_path(table_uri); + let log_uri_normalized = storage_backend.join_path(&table_uri, "_delta_log"); Ok(Self { version: 0, state: DeltaTableState::default(), storage: storage_backend, - table_path: table_path.to_string(), + table_uri, last_check_point: None, - log_path: log_path_normalized, + log_uri: log_uri_normalized, version_timestamp: HashMap::new(), }) } @@ -888,7 +947,7 @@ impl DeltaTable { impl fmt::Display for DeltaTable { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - writeln!(f, "DeltaTable({})", self.table_path)?; + writeln!(f, "DeltaTable({})", self.table_uri)?; writeln!(f, "\tversion: {}", self.version)?; match self.state.current_metadata.as_ref() { Some(metadata) => { @@ -909,7 +968,7 @@ impl fmt::Display for DeltaTable { impl std::fmt::Debug for DeltaTable { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { - write!(f, "DeltaTable <{}>", self.table_path) + write!(f, "DeltaTable <{}>", self.table_uri) } } 
@@ -1099,15 +1158,15 @@ impl<'a> DeltaTransaction<'a> { } let path = self.generate_parquet_filename(partitions); - let storage_path = self + let parquet_uri = self .delta_table .storage - .join_path(&self.delta_table.table_path, &path); + .join_path(&self.delta_table.table_uri, &path); - debug!("Writing a parquet file to {}", &storage_path); + debug!("Writing a parquet file to {}", &parquet_uri); self.delta_table .storage - .put_obj(&storage_path, &bytes) + .put_obj(&parquet_uri, &bytes) .await .map_err(|source| DeltaTransactionError::Storage { source })?; @@ -1199,8 +1258,8 @@ impl<'a> DeltaTransaction<'a> { // TODO: create a CommitInfo action and prepend it to actions. let log_entry = log_entry_from_actions(&self.actions)?; - let tmp_log_path = self.prepare_commit(log_entry.as_bytes()).await?; - let version = self.try_commit(&tmp_log_path, version).await?; + let tmp_commit_uri = self.prepare_commit(log_entry.as_bytes()).await?; + let version = self.try_commit(&tmp_commit_uri, version).await?; self.delta_table.update().await?; @@ -1213,11 +1272,11 @@ impl<'a> DeltaTransaction<'a> { ) -> Result { let mut attempt_number: u32 = 0; - let tmp_log_path = self.prepare_commit(log_entry).await?; + let tmp_commit_uri = self.prepare_commit(log_entry).await?; loop { let version = self.next_attempt_version().await?; - let commit_result = self.try_commit(&tmp_log_path, version).await; + let commit_result = self.try_commit(&tmp_commit_uri, version).await; match commit_result { Ok(v) => { @@ -1250,28 +1309,28 @@ impl<'a> DeltaTransaction<'a> { log_entry: &[u8], ) -> Result { let token = Uuid::new_v4().to_string(); - let tmp_log_path = self.delta_table.tmp_commit_log_path(&token); + let tmp_commit_uri = self.delta_table.tmp_commit_uri(&token); self.delta_table .storage - .put_obj(&tmp_log_path, log_entry) + .put_obj(&tmp_commit_uri, log_entry) .await?; - Ok(tmp_log_path) + Ok(tmp_commit_uri) } async fn try_commit( &mut self, - tmp_log_path: &str, + tmp_commit_uri: &str, version: 
DeltaDataTypeVersion, ) -> Result { - let log_path = self.delta_table.version_to_log_path(version); + let commit_uri = self.delta_table.commit_uri_from_version(version); // move temporary commit file to delta log directory // rely on storage to fail if the file already exists - self.delta_table .storage - .rename_obj(tmp_log_path, &log_path) + .rename_obj(tmp_commit_uri, &commit_uri) .await?; Ok(version) @@ -1340,9 +1399,9 @@ fn process_action( /// Creates and loads a DeltaTable from the given path with current metadata. /// Infers the storage backend to use from the scheme in the given table path. -pub async fn open_table(table_path: &str) -> Result { - let storage_backend = storage::get_backend_for_uri(table_path)?; - let mut table = DeltaTable::new(table_path, storage_backend)?; +pub async fn open_table(table_uri: &str) -> Result { + let storage_backend = storage::get_backend_for_uri(table_uri)?; + let mut table = DeltaTable::new(table_uri, storage_backend)?; table.load().await?; Ok(table) @@ -1351,11 +1410,11 @@ pub async fn open_table(table_path: &str) -> Result /// Creates a DeltaTable from the given path and loads it with the metadata from the given version. /// Infers the storage backend to use from the scheme in the given table path. pub async fn open_table_with_version( - table_path: &str, + table_uri: &str, version: DeltaDataTypeVersion, ) -> Result { - let storage_backend = storage::get_backend_for_uri(table_path)?; - let mut table = DeltaTable::new(table_path, storage_backend)?; + let storage_backend = storage::get_backend_for_uri(table_uri)?; + let mut table = DeltaTable::new(table_uri, storage_backend)?; table.load_version(version).await?; Ok(table) @@ -1364,10 +1423,10 @@ pub async fn open_table_with_version( /// Creates a DeltaTable from the given path. /// Loads metadata from the version appropriate based on the given ISO-8601/RFC-3339 timestamp. /// Infers the storage backend to use from the scheme in the given table path. 
-pub async fn open_table_with_ds(table_path: &str, ds: &str) -> Result { +pub async fn open_table_with_ds(table_uri: &str, ds: &str) -> Result { let datetime = DateTime::::from(DateTime::::parse_from_rfc3339(ds)?); - let storage_backend = storage::get_backend_for_uri(table_path)?; - let mut table = DeltaTable::new(table_path, storage_backend)?; + let storage_backend = storage::get_backend_for_uri(table_uri)?; + let mut table = DeltaTable::new(table_uri, storage_backend)?; table.load_with_datetime(datetime).await?; Ok(table) @@ -1380,9 +1439,8 @@ pub fn crate_version() -> &'static str { #[cfg(test)] mod tests { - use super::action; - use super::action::Action; - use super::{process_action, DeltaTableState}; + use super::*; + use pretty_assertions::assert_eq; use std::collections::HashMap; #[test] @@ -1412,4 +1470,38 @@ mod tests { assert_eq!(2, *state.app_transaction_version.get("abc").unwrap()); assert_eq!(1, *state.app_transaction_version.get("xyz").unwrap()); } + + #[cfg(feature = "s3")] + #[test] + fn normalize_table_uri() { + for table_uri in [ + "s3://tests/data/delta-0.8.0/", + "s3://tests/data/delta-0.8.0//", + "s3://tests/data/delta-0.8.0", + ] + .iter() + { + let be = storage::get_backend_for_uri(table_uri).unwrap(); + let table = DeltaTable::new(table_uri, be).unwrap(); + assert_eq!(table.table_uri, "s3://tests/data/delta-0.8.0"); + } + } + + #[test] + fn rel_path() { + assert!(matches!( + extract_rel_path("data/delta-0.8.0", "data/delta-0.8.0/abc/123"), + Ok("abc/123"), + )); + + assert!(matches!( + extract_rel_path("data/delta-0.8.0", "data/delta-0.8.0/abc.json"), + Ok("abc.json"), + )); + + assert!(matches!( + extract_rel_path("data/delta-0.8.0", "tests/abc.json"), + Err(DeltaTableError::Generic(_)), + )); + } } diff --git a/rust/src/delta_datafusion.rs b/rust/src/delta_datafusion.rs index d1ea62a3b3..d2bc18d84c 100644 --- a/rust/src/delta_datafusion.rs +++ b/rust/src/delta_datafusion.rs @@ -57,7 +57,7 @@ impl TableProvider for delta::DeltaTable { let 
schema = >::try_from( delta::DeltaTable::schema(&self).unwrap(), )?; - let filenames = self.get_file_paths(); + let filenames = self.get_file_uris(); let partitions = filenames .into_iter() diff --git a/rust/src/storage/azure.rs b/rust/src/storage/azure.rs index 7cc3b65978..6d11506469 100644 --- a/rust/src/storage/azure.rs +++ b/rust/src/storage/azure.rs @@ -201,3 +201,22 @@ impl StorageBackend for AdlsGen2Backend { unimplemented!("delete_obj not implemented for azure"); } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_azure_object_uri() { + let uri = parse_uri("abfss://fs@sa.dfs.core.windows.net/foo").unwrap(); + assert_eq!(uri.path(), "foo"); + assert_eq!( + uri.into_adlsgen2_object().unwrap(), + AdlsGen2Object { + account_name: "sa", + file_system: "fs", + path: "foo", + } + ); + } +} diff --git a/rust/src/storage/file/mod.rs b/rust/src/storage/file/mod.rs index 96c6941898..75b8736eeb 100644 --- a/rust/src/storage/file/mod.rs +++ b/rust/src/storage/file/mod.rs @@ -44,6 +44,7 @@ impl FileStorageBackend { #[async_trait::async_trait] impl StorageBackend for FileStorageBackend { + #[inline] fn join_path(&self, path: &str, path_to_join: &str) -> String { let new_path = Path::new(path); new_path @@ -53,6 +54,7 @@ impl StorageBackend for FileStorageBackend { .unwrap() } + #[inline] fn join_paths(&self, paths: &[&str]) -> String { let mut iter = paths.iter(); let mut path = PathBuf::from(iter.next().unwrap_or(&"")); @@ -60,6 +62,11 @@ impl StorageBackend for FileStorageBackend { path.into_os_string().into_string().unwrap() } + #[inline] + fn trim_path(&self, path: &str) -> String { + path.trim_end_matches(std::path::MAIN_SEPARATOR).to_string() + } + async fn head_obj(&self, path: &str) -> Result { let attr = fs::metadata(path).await?; @@ -116,6 +123,7 @@ impl StorageBackend for FileStorageBackend { #[cfg(test)] mod tests { + use super::super::parse_uri; use super::*; #[tokio::test] @@ -173,4 +181,35 @@ mod tests { 
assert_eq!(&backend.join_paths(&["foo"]), "foo",); assert_eq!(&backend.join_paths(&[]), "",); } + + #[test] + fn trim_path() { + let be = FileStorageBackend::new("root"); + let path = be.join_paths(&["foo", "bar"]); + assert_eq!(be.trim_path(&path), path); + assert_eq!( + be.trim_path(&format!("{}{}", path, std::path::MAIN_SEPARATOR)), + path, + ); + assert_eq!( + be.trim_path(&format!( + "{}{}{}", + path, + std::path::MAIN_SEPARATOR, + std::path::MAIN_SEPARATOR + )), + path, + ); + } + + #[test] + fn test_parse_uri() { + let uri = parse_uri("foo/bar").unwrap(); + assert_eq!(uri.path(), "foo/bar"); + assert_eq!(uri.into_localpath().unwrap(), "foo/bar"); + + let uri2 = parse_uri("file:///foo/bar").unwrap(); + assert_eq!(uri2.path(), "/foo/bar"); + assert_eq!(uri2.into_localpath().unwrap(), "/foo/bar"); + } } diff --git a/rust/src/storage/mod.rs b/rust/src/storage/mod.rs index 7bf6c166c9..19670457ce 100644 --- a/rust/src/storage/mod.rs +++ b/rust/src/storage/mod.rs @@ -122,6 +122,18 @@ impl<'a> Uri<'a> { Uri::AdlsGen2Object(x) => Err(UriError::ExpectedSLocalPathUri(format!("{}", x))), } } + + /// Return URI path component as String + #[inline] + pub fn path(&self) -> String { + match self { + Uri::LocalPath(x) => x.to_string(), + #[cfg(feature = "s3")] + Uri::S3Object(x) => x.key.to_string(), + #[cfg(feature = "azure")] + Uri::AdlsGen2Object(x) => x.path.to_string(), + } + } } /// Parses the URI and returns a variant of the Uri enum for the appropriate storage backend based @@ -325,7 +337,13 @@ impl From for StorageError { /// Describes metadata of a storage object. pub struct ObjectMeta { - /// The path where the object is stored. + /// The path where the object is stored. This is the path component of the object URI. + /// + /// For example: + /// * path for `s3://bucket/foo/bar` should be `foo/bar`. + /// * path for `dir/foo/bar` should be `dir/foo/bar`. + /// + /// Given a table URI, object URI can be constructed by joining table URI with object path. 
pub path: String,
     /// The last time the object was modified in the storage backend.
     // The timestamp of a commit comes from the remote storage `lastModifiedTime`, and can be
@@ -338,6 +356,7 @@ pub struct ObjectMeta {
 #[async_trait::async_trait]
 pub trait StorageBackend: Send + Sync + Debug {
     /// Create a new path by appending `path_to_join` as a new component to `path`.
+    #[inline]
     fn join_path(&self, path: &str, path_to_join: &str) -> String {
         let normalized_path = path.trim_end_matches('/');
         format!("{}/{}", normalized_path, path_to_join)
@@ -345,6 +364,7 @@ pub trait StorageBackend: Send + Sync + Debug {
 
     /// More efficient path join for multiple path components. Use this method if you need to
     /// combine more than two path components.
+    #[inline]
     fn join_paths(&self, paths: &[&str]) -> String {
         paths
             .iter()
@@ -353,6 +373,12 @@ pub trait StorageBackend: Send + Sync + Debug {
         .join("/")
     }
 
+    /// Returns trimmed path with trailing path separator removed.
+    #[inline]
+    fn trim_path(&self, path: &str) -> String {
+        path.trim_end_matches('/').to_string()
+    }
+
     /// Fetch object metadata without reading the actual content
     async fn head_obj(&self, path: &str) -> Result;
 
@@ -394,44 +420,3 @@ pub fn get_backend_for_uri(uri: &str) -> Result, Storage
         Uri::AdlsGen2Object(obj) => Ok(Box::new(azure::AdlsGen2Backend::new(obj.file_system)?)),
     }
 }
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test]
-    fn test_parse_uri_local_file() {
-        let uri = parse_uri("foo/bar").unwrap();
-        assert_eq!(uri.into_localpath().unwrap(), "foo/bar");
-
-        let uri2 = parse_uri("file:///foo/bar").unwrap();
-        assert_eq!(uri2.into_localpath().unwrap(), "/foo/bar");
-    }
-
-    #[cfg(feature = "s3")]
-    #[test]
-    fn test_parse_s3_object_uri() {
-        let uri = parse_uri("s3://foo/bar").unwrap();
-        assert_eq!(
-            uri.into_s3object().unwrap(),
-            s3::S3Object {
-                bucket: "foo",
-                key: "bar",
-            }
-        );
-    }
-
-    #[cfg(feature = "azure")]
-    #[test]
-    fn test_parse_azure_object_uri() {
-        let uri = 
parse_uri("abfss://fs@sa.dfs.core.windows.net/foo").unwrap(); - assert_eq!( - uri.into_adlsgen2_object().unwrap(), - azure::AdlsGen2Object { - account_name: "sa", - file_system: "fs", - path: "foo", - } - ); - } -} diff --git a/rust/src/storage/s3/mod.rs b/rust/src/storage/s3/mod.rs index fcdecc3a8d..876f6e3671 100644 --- a/rust/src/storage/s3/mod.rs +++ b/rust/src/storage/s3/mod.rs @@ -581,4 +581,25 @@ mod tests { assert_eq!(&backend.join_paths(&["foo"]), "foo",); assert_eq!(&backend.join_paths(&[]), "",); } + + #[test] + fn trim_path() { + let be = S3StorageBackend::new().unwrap(); + assert_eq!(be.trim_path("s3://foo/bar"), "s3://foo/bar"); + assert_eq!(be.trim_path("s3://foo/bar/"), "s3://foo/bar"); + assert_eq!(be.trim_path("/foo/bar//"), "/foo/bar"); + } + + #[test] + fn parse_s3_object_uri() { + let uri = parse_uri("s3://foo/bar/baz").unwrap(); + assert_eq!(uri.path(), "bar/baz"); + assert_eq!( + uri.into_s3object().unwrap(), + S3Object { + bucket: "foo", + key: "bar/baz", + } + ); + } } diff --git a/rust/tests/read_delta_test.rs b/rust/tests/read_delta_test.rs index 4a1db5beef..20c4663e43 100644 --- a/rust/tests/read_delta_test.rs +++ b/rust/tests/read_delta_test.rs @@ -152,7 +152,7 @@ async fn read_delta_8_0_table_with_partitions() { #[cfg(unix)] assert_eq!( - table.get_file_paths_by_partitions(&filters).unwrap(), + table.get_file_uris_by_partitions(&filters).unwrap(), vec![ "./tests/data/delta-0.8.0-partitioned/year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet".to_string(), "./tests/data/delta-0.8.0-partitioned/year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet".to_string() @@ -160,7 +160,7 @@ async fn read_delta_8_0_table_with_partitions() { ); #[cfg(windows)] assert_eq!( - table.get_file_paths_by_partitions(&filters).unwrap(), + table.get_file_uris_by_partitions(&filters).unwrap(), vec![ 
"./tests/data/delta-0.8.0-partitioned\\year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet".to_string(), "./tests/data/delta-0.8.0-partitioned\\year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet".to_string() @@ -210,12 +210,12 @@ async fn read_delta_8_0_table_with_partitions() { #[tokio::test] async fn vacuum_delta_8_0_table() { - let mut table = deltalake::open_table("./tests/data/delta-0.8.0") + let backend = FileStorageBackend::new(""); + let mut table = deltalake::open_table(&backend.join_paths(&["tests", "data", "delta-0.8.0"])) .await .unwrap(); let retention_hours = 1; - let backend = FileStorageBackend::new("./tests/data/delta-0.8.0"); let dry_run = true; assert!(matches!( @@ -227,10 +227,12 @@ async fn vacuum_delta_8_0_table() { assert_eq!( table.vacuum(retention_hours, dry_run).await.unwrap(), - vec![backend.join_path( - "./tests/data/delta-0.8.0", - "part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet" - )] + vec![backend.join_paths(&[ + "tests", + "data", + "delta-0.8.0", + "part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet", + ])] ); let retention_hours = SystemTime::now() diff --git a/rust/tests/read_simple_table_test.rs b/rust/tests/read_simple_table_test.rs index 04e845c3c9..71af0d7422 100644 --- a/rust/tests/read_simple_table_test.rs +++ b/rust/tests/read_simple_table_test.rs @@ -44,7 +44,7 @@ async fn read_simple_table() { "./tests/data/simple_table/part-00007-3a0e4727-de0d-41b6-81ef-5223cf40f025-c000.snappy.parquet".to_string(), "./tests/data/simple_table/part-00000-2befed33-c358-4768-a43c-3eda0d2a499d-c000.snappy.parquet".to_string(), ]; - assert_eq!(table.get_file_paths(), paths); + assert_eq!(table.get_file_uris(), paths); } #[cfg(windows)] { @@ -55,7 +55,7 @@ async fn read_simple_table() { "./tests/data/simple_table\\part-00007-3a0e4727-de0d-41b6-81ef-5223cf40f025-c000.snappy.parquet".to_string(), 
"./tests/data/simple_table\\part-00000-2befed33-c358-4768-a43c-3eda0d2a499d-c000.snappy.parquet".to_string(), ]; - assert_eq!(table.get_file_paths(), paths); + assert_eq!(table.get_file_uris(), paths); } } diff --git a/rust/tests/write_exploration.rs b/rust/tests/write_exploration.rs index 6a64c4cfed..e3638dbcf8 100644 --- a/rust/tests/write_exploration.rs +++ b/rust/tests/write_exploration.rs @@ -359,7 +359,7 @@ async fn smoke_test() { let mut delta_table = deltalake::open_table("./tests/data/write_exploration") .await .unwrap(); - let delta_writer = DeltaWriter::for_table_path(delta_table.table_path.clone()) + let delta_writer = DeltaWriter::for_table_path(delta_table.table_uri.clone()) .await .unwrap();