From 52ee3a3a5a27efe59e5894b5f694480486b15c16 Mon Sep 17 00:00:00 2001 From: Qingping Hou Date: Fri, 19 Mar 2021 23:01:59 -0700 Subject: [PATCH] fix windows build also move rename temp file clean up logic into put_obj method in preparation for moving the temp file creation logic into optimistic delta commit loop. --- .github/workflows/build.yml | 9 +- Cargo.lock | 56 ++++++++++- python/Cargo.toml | 2 +- rust/Cargo.toml | 1 + rust/src/delta.rs | 24 ++--- rust/src/storage/file/mod.rs | 37 ++++++- rust/src/storage/file/rename.rs | 167 +++++++++++++++++++------------- rust/src/storage/mod.rs | 10 +- 8 files changed, 223 insertions(+), 83 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index f349fdfd88..d490733b51 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -12,8 +12,15 @@ defaults: jobs: build: - runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + os: + - ubuntu-latest + - macOS-10.15 + - windows-2019 + runs-on: ${{ matrix.os }} steps: - uses: actions/checkout@v2 # the whole target dir is over 400MB cache limit, so we have to split up diff --git a/Cargo.lock b/Cargo.lock index 089575286c..eeb4bd0a09 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -622,6 +622,7 @@ dependencies = [ "serde", "serde_json", "serial_test", + "tempdir", "thiserror", "tokio", "tokio-stream", @@ -631,7 +632,7 @@ dependencies = [ [[package]] name = "deltalake-python" -version = "0.4.2" +version = "0.4.3" dependencies = [ "arrow", "deltalake", @@ -846,6 +847,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "fuchsia-cprng" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" + [[package]] name = "futures" version = "0.3.13" @@ -1889,6 +1896,19 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293" +dependencies = [ + "fuchsia-cprng", + "libc", + "rand_core 0.3.1", + "rdrand", + "winapi", +] + [[package]] name = "rand" version = "0.7.3" @@ -1934,6 +1954,21 @@ dependencies = [ "rand_core 0.6.2", ] +[[package]] +name = "rand_core" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b" +dependencies = [ + "rand_core 0.4.2", +] + +[[package]] +name = "rand_core" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc" + [[package]] name = "rand_core" version = "0.5.1" @@ -1970,6 +2005,15 @@ dependencies = [ "rand_core 0.6.2", ] +[[package]] +name = "rdrand" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2" +dependencies = [ + "rand_core 0.3.1", +] + [[package]] name = "redox_syscall" version = "0.1.57" @@ -2619,6 +2663,16 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "tempdir" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15f2b5fb00ccdf689e0149d1b1b3c03fead81c2b37735d812fa8bddbbf41b6d8" +dependencies = [ + "rand 0.4.6", + "remove_dir_all", +] + [[package]] name = "tempfile" version = "3.2.0" diff --git a/python/Cargo.toml b/python/Cargo.toml index 9c80aeeeeb..2f4be967c9 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-python" -version = "0.4.2" +version = "0.4.3" authors = ["Qingping Hou "] homepage = "https://github.com/delta-io/delta-rs" license = "Apache-2.0" diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 869e9fd8e9..52721e6d5e 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -59,3 +59,4 @@ s3 = ["rusoto_core", "rusoto_credential", "rusoto_s3", "rusoto_sts"] utime = "0.3" serial_test = "*" pretty_assertions = "0" +tempdir = "0" diff --git a/rust/src/delta.rs b/rust/src/delta.rs index 8d5401542b..87a4e54d91 100644 --- a/rust/src/delta.rs +++ b/rust/src/delta.rs @@ -8,6 +8,7 @@ use std::io::{BufRead, BufReader, Cursor}; use arrow::error::ArrowError; use chrono::{DateTime, FixedOffset, Utc}; use futures::StreamExt; +use lazy_static::lazy_static; use log::debug; use parquet::errors::ParquetError; use parquet::file::{ @@ -290,23 +291,22 @@ impl DeltaTable { &self, version: DeltaDataTypeVersion, ) -> Result, DeltaTableError> { - let mut cp: Option = None; - let root = self.storage.join_path(r"^*", "_delta_log"); - let regex_checkpoint = self - .storage - .join_path(&root, r"(\d{20})\.checkpoint\.parquet$"); - let regex_checkpoint_parts = self - .storage - .join_path(&root, r"(\d{20})\.checkpoint\.\d{10}\.(\d{10})\.parquet$"); - let re_checkpoint = Regex::new(®ex_checkpoint).unwrap(); - let re_checkpoint_parts = Regex::new(®ex_checkpoint_parts).unwrap(); + lazy_static! { + static ref CHECKPOINT_REGEX: Regex = + Regex::new(r#"^*[/\\]_delta_log[/\\](\d{20})\.checkpoint\.parquet$"#).unwrap(); + static ref CHECKPOINT_PARTS_REGEX: Regex = Regex::new( + r#"^*[/\\]_delta_log[/\\](\d{20})\.checkpoint\.\d{10}\.(\d{10})\.parquet$"# + ) + .unwrap(); + } + let mut cp: Option = None; let mut stream = self.storage.list_objs(&self.log_path).await?; while let Some(obj_meta) = stream.next().await { // Exit early if any objects can't be listed. let obj_meta = obj_meta?; - if let Some(captures) = re_checkpoint.captures(&obj_meta.path) { + if let Some(captures) = CHECKPOINT_REGEX.captures(&obj_meta.path) { let curr_ver_str = captures.get(1).unwrap().as_str(); let curr_ver: DeltaDataTypeVersion = curr_ver_str.parse().unwrap(); if curr_ver > version { @@ -323,7 +323,7 @@ impl DeltaTable { continue; } - if let Some(captures) = re_checkpoint_parts.captures(&obj_meta.path) { + if let Some(captures) = CHECKPOINT_PARTS_REGEX.captures(&obj_meta.path) { let curr_ver_str = captures.get(1).unwrap().as_str(); let curr_ver: DeltaDataTypeVersion = curr_ver_str.parse().unwrap(); if curr_ver > version { diff --git a/rust/src/storage/file/mod.rs b/rust/src/storage/file/mod.rs index a88c045128..dd91afcc23 100644 --- a/rust/src/storage/file/mod.rs +++ b/rust/src/storage/file/mod.rs @@ -12,6 +12,8 @@ use uuid::Uuid; mod rename; +/// NOTE: The file storage backend is not multi-writer safe in Windows due to lack of support for +/// native atomic file rename system call. #[derive(Default, Debug)] pub struct FileStorageBackend { root: String, @@ -74,7 +76,12 @@ impl StorageBackend for FileStorageBackend { async fn put_obj(&self, path: &str, obj_bytes: &[u8]) -> Result<(), StorageError> { let tmp_path = create_tmp_file_with_retry(self, path, obj_bytes).await?; - rename::rename(&tmp_path, path) + if let Err(e) = rename::atomic_rename(&tmp_path, path) { + fs::remove_file(tmp_path).await.unwrap_or(()); + return Err(e); + } + + Ok(()) } } @@ -119,6 +126,34 @@ async fn create_tmp_file_with_retry( mod tests { use super::*; + #[tokio::test] + async fn put_obj() { + let tmp_dir = tempdir::TempDir::new("rename_test").unwrap(); + let backend = FileStorageBackend::new(tmp_dir.path().to_str().unwrap()); + let tmp_dir_path = tmp_dir.path(); + let new_file_path = tmp_dir_path.join("new_file"); + + if let Err(e) = backend + .put_obj(new_file_path.to_str().unwrap(), b"hello") + .await + { + panic!(format!("Expect put_obj to return Ok, got Err: {:#?}", e)); + } + + // second try should result in already exists error + assert!(matches!( + backend.put_obj(new_file_path.to_str().unwrap(), b"hello").await, + Err(StorageError::AlreadyExists(s)) if s == new_file_path.to_str().unwrap(), + )); + + // make sure rename failure doesn't leave any dangling temp file + let paths = std::fs::read_dir(tmp_dir_path) + .unwrap() + .map(|entry| entry.unwrap().path()) + .collect::>(); + assert_eq!(paths, [new_file_path]); + } + #[test] fn join_multiple_paths() { let backend = FileStorageBackend::new("./"); diff --git a/rust/src/storage/file/rename.rs b/rust/src/storage/file/rename.rs index c455f2f0ed..037d04ce2b 100644 --- a/rust/src/storage/file/rename.rs +++ b/rust/src/storage/file/rename.rs @@ -1,61 +1,81 @@ use crate::StorageError; -use std::ffi::CString; - -#[cfg(target_os = "linux")] -const RENAME_NOREPLACE: libc::c_uint = 1; - -#[cfg(target_os = "linux")] -extern "C" { - fn renameat2( - olddirfd: libc::c_int, - oldpath: *const libc::c_char, - newdirfd: libc::c_int, - newpath: *const libc::c_char, - flags: libc::c_uint, - ) -> libc::c_int; -} - -pub fn rename(from: &str, to: &str) -> Result<(), StorageError> { - let ret = unsafe { - let from = to_c_string(from)?; - let to = to_c_string(to)?; - platform_specific_rename(from.as_ptr(), to.as_ptr()) - }; - if ret != 0 { - std::fs::remove_file(from).unwrap_or(()); +#[cfg(windows)] +mod imp { + use super::*; - let e = errno::errno(); - if let libc::EEXIST = e.0 { - return Err(StorageError::AlreadyExists(String::from(to))); + pub fn atomic_rename(from: &str, to: &str) -> Result<(), StorageError> { + // doing best effort in windows since there is no native atomic rename support + if std::fs::metadata(to).is_ok() { + return Err(StorageError::AlreadyExists(to.to_string())); } + std::fs::rename(from, to).map_err(|e| { + StorageError::other_std_io_err(format!("failed to rename {} to {}: {}", from, to, e)) + }) + } +} - return Err(StorageError::Io { - source: custom_io_error(format!("{}", e)), - }); +#[cfg(unix)] +mod imp { + use super::*; + use std::ffi::CString; + + #[cfg(target_os = "linux")] + const RENAME_NOREPLACE: libc::c_uint = 1; + + #[cfg(target_os = "linux")] + extern "C" { + fn renameat2( + olddirfd: libc::c_int, + oldpath: *const libc::c_char, + newdirfd: libc::c_int, + newpath: *const libc::c_char, + flags: libc::c_uint, + ) -> libc::c_int; } - Ok(()) -} + fn to_c_string(p: &str) -> Result { + CString::new(p).map_err(|e| StorageError::Generic(format!("{}", e))) + } + + pub fn atomic_rename(from: &str, to: &str) -> Result<(), StorageError> { + let ret = unsafe { + let from = to_c_string(from)?; + let to = to_c_string(to)?; + platform_specific_rename(from.as_ptr(), to.as_ptr()) + }; + + if ret != 0 { + let e = errno::errno(); + if let libc::EEXIST = e.0 { + return Err(StorageError::AlreadyExists(String::from(to))); + } -unsafe fn platform_specific_rename(from: *const libc::c_char, to: *const libc::c_char) -> i32 { - cfg_if::cfg_if! { - if #[cfg(target_os = "linux")] { - renameat2(libc::AT_FDCWD, from, libc::AT_FDCWD, to, RENAME_NOREPLACE) - } else if #[cfg(target_os = "macos")] { - libc::renamex_np(from, to, libc::RENAME_EXCL) - } else { - unimplemented!() + return Err(StorageError::other_std_io_err(format!( + "failed to rename {} to {}: {}", + from, to, e + ))); } + + Ok(()) } -} -fn to_c_string(p: &str) -> Result { - CString::new(p).map_err(|e| StorageError::Generic(format!("{}", e))) + unsafe fn platform_specific_rename(from: *const libc::c_char, to: *const libc::c_char) -> i32 { + cfg_if::cfg_if! { + if #[cfg(target_os = "linux")] { + renameat2(libc::AT_FDCWD, from, libc::AT_FDCWD, to, RENAME_NOREPLACE) + } else if #[cfg(target_os = "macos")] { + libc::renamex_np(from, to, libc::RENAME_EXCL) + } else { + unimplemented!() + } + } + } } -fn custom_io_error(desc: String) -> std::io::Error { - std::io::Error::new(std::io::ErrorKind::Other, desc) +#[inline] +pub fn atomic_rename(from: &str, to: &str) -> Result<(), StorageError> { + imp::atomic_rename(from, to) } #[cfg(test)] @@ -64,41 +84,56 @@ mod tests { use std::fs::File; use std::io::Write; use std::path::{Path, PathBuf}; - use uuid::Uuid; #[test] - fn test_rename() { - let tmp = std::env::temp_dir().join(Uuid::new_v4().to_string()); - std::fs::create_dir_all(&tmp).unwrap(); - let a = create_file(&tmp, "a"); - let b = create_file(&tmp, "b"); - let c = &tmp.join("c"); + fn test_atomic_rename() { + let tmp_dir = tempdir::TempDir::new("test_atomic_rename").unwrap(); + let a = create_file(&tmp_dir.path(), "a"); + let b = create_file(&tmp_dir.path(), "b"); + let c = &tmp_dir.path().join("c"); + + // unsuccessful move not_exists to C, not_exists is missing + match atomic_rename("not_exists", c.to_str().unwrap()) { + Err(StorageError::Io { source: e }) => { + cfg_if::cfg_if! { + if #[cfg(target_os = "windows")] { + assert_eq!( + e.to_string(), + format!( + "failed to rename not_exists to {}: The system cannot find the file specified. (os error 2)", + c.to_str().unwrap() + ) + ); + } else { + assert_eq!( + e.to_string(), + format!( + "failed to rename not_exists to {}: No such file or directory", + c.to_str().unwrap() + ) + ); + } + } + } + Err(e) => panic!(format!("expect std::io::Error, got: {:#}", e)), + Ok(()) => panic!("expect rename to fail with Err, but got Ok"), + } // successful move A to C assert!(a.exists()); assert!(!c.exists()); - rename(a.to_str().unwrap(), c.to_str().unwrap()).unwrap(); + atomic_rename(a.to_str().unwrap(), c.to_str().unwrap()).unwrap(); assert!(!a.exists()); assert!(c.exists()); - // unsuccessful move B to C, C already exists, B is deleted afterwards + // unsuccessful move B to C, C already exists, B is not deleted assert!(b.exists()); - match rename(b.to_str().unwrap(), c.to_str().unwrap()) { + match atomic_rename(b.to_str().unwrap(), c.to_str().unwrap()) { Err(StorageError::AlreadyExists(p)) => assert_eq!(p, c.to_str().unwrap()), _ => panic!("unexpected"), } - assert!(!b.exists()); + assert!(b.exists()); assert_eq!(std::fs::read_to_string(c).unwrap(), "a"); - - // unsuccessful move B to C, B is missing - match rename(b.to_str().unwrap(), c.to_str().unwrap()) { - Err(StorageError::Io { source: e }) => { - assert_eq!(e.to_string(), "No such file or directory") - } - _ => panic!("unexpected"), - } - - std::fs::remove_dir_all(&tmp).unwrap(); } fn create_file(dir: &Path, name: &str) -> PathBuf { diff --git a/rust/src/storage/mod.rs b/rust/src/storage/mod.rs index 8724b8bea1..49fa95492a 100644 --- a/rust/src/storage/mod.rs +++ b/rust/src/storage/mod.rs @@ -15,7 +15,7 @@ pub mod file; #[cfg(feature = "s3")] pub mod s3; -#[derive(thiserror::Error, Debug)] +#[derive(thiserror::Error, Debug, PartialEq)] pub enum UriError { #[error("Invalid URI scheme: {0}")] InvalidScheme(String), @@ -205,6 +205,14 @@ pub enum StorageError { }, } +impl StorageError { + pub fn other_std_io_err(desc: String) -> Self { + Self::Io { + source: std::io::Error::new(std::io::ErrorKind::Other, desc), + } + } +} + impl From for StorageError { fn from(error: std::io::Error) -> Self { match error.kind() {