Skip to content

Commit

Permalink
fix windows build
Browse files Browse the repository at this point in the history
also move rename temp file clean up logic into put_obj method in
preparation for moving the temp file creation logic into optimistic
delta commit loop.
  • Loading branch information
Qingping Hou authored and houqp committed Mar 21, 2021
1 parent 0d8b23f commit 52ee3a3
Show file tree
Hide file tree
Showing 8 changed files with 223 additions and 83 deletions.
9 changes: 8 additions & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,15 @@ defaults:

jobs:
build:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
os:
- ubuntu-latest
- macOS-10.15
- windows-2019

runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v2
# the whole target dir is over 400MB cache limit, so we have to split up
Expand Down
56 changes: 55 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion python/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-python"
version = "0.4.2"
version = "0.4.3"
authors = ["Qingping Hou <[email protected]>"]
homepage = "https://github.com/delta-io/delta-rs"
license = "Apache-2.0"
Expand Down
1 change: 1 addition & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,4 @@ s3 = ["rusoto_core", "rusoto_credential", "rusoto_s3", "rusoto_sts"]
utime = "0.3"
serial_test = "*"
pretty_assertions = "0"
tempdir = "0"
24 changes: 12 additions & 12 deletions rust/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::io::{BufRead, BufReader, Cursor};
use arrow::error::ArrowError;
use chrono::{DateTime, FixedOffset, Utc};
use futures::StreamExt;
use lazy_static::lazy_static;
use log::debug;
use parquet::errors::ParquetError;
use parquet::file::{
Expand Down Expand Up @@ -290,23 +291,22 @@ impl DeltaTable {
&self,
version: DeltaDataTypeVersion,
) -> Result<Option<CheckPoint>, DeltaTableError> {
let mut cp: Option<CheckPoint> = None;
let root = self.storage.join_path(r"^*", "_delta_log");
let regex_checkpoint = self
.storage
.join_path(&root, r"(\d{20})\.checkpoint\.parquet$");
let regex_checkpoint_parts = self
.storage
.join_path(&root, r"(\d{20})\.checkpoint\.\d{10}\.(\d{10})\.parquet$");
let re_checkpoint = Regex::new(&regex_checkpoint).unwrap();
let re_checkpoint_parts = Regex::new(&regex_checkpoint_parts).unwrap();
lazy_static! {
static ref CHECKPOINT_REGEX: Regex =
Regex::new(r#"^*[/\\]_delta_log[/\\](\d{20})\.checkpoint\.parquet$"#).unwrap();
static ref CHECKPOINT_PARTS_REGEX: Regex = Regex::new(
r#"^*[/\\]_delta_log[/\\](\d{20})\.checkpoint\.\d{10}\.(\d{10})\.parquet$"#
)
.unwrap();
}

let mut cp: Option<CheckPoint> = None;
let mut stream = self.storage.list_objs(&self.log_path).await?;

while let Some(obj_meta) = stream.next().await {
// Exit early if any objects can't be listed.
let obj_meta = obj_meta?;
if let Some(captures) = re_checkpoint.captures(&obj_meta.path) {
if let Some(captures) = CHECKPOINT_REGEX.captures(&obj_meta.path) {
let curr_ver_str = captures.get(1).unwrap().as_str();
let curr_ver: DeltaDataTypeVersion = curr_ver_str.parse().unwrap();
if curr_ver > version {
Expand All @@ -323,7 +323,7 @@ impl DeltaTable {
continue;
}

if let Some(captures) = re_checkpoint_parts.captures(&obj_meta.path) {
if let Some(captures) = CHECKPOINT_PARTS_REGEX.captures(&obj_meta.path) {
let curr_ver_str = captures.get(1).unwrap().as_str();
let curr_ver: DeltaDataTypeVersion = curr_ver_str.parse().unwrap();
if curr_ver > version {
Expand Down
37 changes: 36 additions & 1 deletion rust/src/storage/file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use uuid::Uuid;

mod rename;

/// NOTE: The file storage backend is not multi-writer safe in Windows due to lack of support for
/// native atomic file rename system call.
#[derive(Default, Debug)]
pub struct FileStorageBackend {
root: String,
Expand Down Expand Up @@ -74,7 +76,12 @@ impl StorageBackend for FileStorageBackend {
async fn put_obj(&self, path: &str, obj_bytes: &[u8]) -> Result<(), StorageError> {
let tmp_path = create_tmp_file_with_retry(self, path, obj_bytes).await?;

rename::rename(&tmp_path, path)
if let Err(e) = rename::atomic_rename(&tmp_path, path) {
fs::remove_file(tmp_path).await.unwrap_or(());
return Err(e);
}

Ok(())
}
}

Expand Down Expand Up @@ -119,6 +126,34 @@ async fn create_tmp_file_with_retry(
mod tests {
use super::*;

#[tokio::test]
async fn put_obj() {
let tmp_dir = tempdir::TempDir::new("rename_test").unwrap();
let backend = FileStorageBackend::new(tmp_dir.path().to_str().unwrap());
let tmp_dir_path = tmp_dir.path();
let new_file_path = tmp_dir_path.join("new_file");

if let Err(e) = backend
.put_obj(new_file_path.to_str().unwrap(), b"hello")
.await
{
panic!(format!("Expect put_obj to return Ok, got Err: {:#?}", e));
}

// second try should result in already exists error
assert!(matches!(
backend.put_obj(new_file_path.to_str().unwrap(), b"hello").await,
Err(StorageError::AlreadyExists(s)) if s == new_file_path.to_str().unwrap(),
));

// make sure rename failure doesn't leave any dangling temp file
let paths = std::fs::read_dir(tmp_dir_path)
.unwrap()
.map(|entry| entry.unwrap().path())
.collect::<Vec<_>>();
assert_eq!(paths, [new_file_path]);
}

#[test]
fn join_multiple_paths() {
let backend = FileStorageBackend::new("./");
Expand Down
Loading

0 comments on commit 52ee3a3

Please sign in to comment.