71 changes: 63 additions & 8 deletions crates/iceberg/src/io/file_io.rs
@@ -33,6 +33,30 @@ use crate::{Error, ErrorKind, Result};
/// like Cloudflare R2 requires all chunk sizes to be consistent except for the last.
pub const IO_CHUNK_SIZE: &str = "io.write.chunk-size";

/// Configuration property for setting the timeout duration for IO operations.
///
/// This timeout is applied to individual operations such as read, write, and delete.
/// Value should be in seconds. If not set, uses OpenDAL's default timeout.
pub const IO_TIMEOUT_SECONDS: &str = "io.timeout";

/// Configuration property for setting the maximum number of retries for IO operations.
///
/// This controls how many times an operation will be retried upon failure.
/// If not set, uses OpenDAL's default retry count.
pub const IO_MAX_RETRIES: &str = "io.max-retries";

/// Configuration property for setting the minimum retry delay in milliseconds.
///
/// This controls the minimum delay between retry attempts.
/// If not set, uses OpenDAL's default minimum delay.
pub const IO_RETRY_MIN_DELAY_MS: &str = "io.retry.min-delay";

/// Configuration property for setting the maximum retry delay in milliseconds.
///
/// This controls the maximum delay between retry attempts.
/// If not set, uses OpenDAL's default maximum delay.
pub const IO_RETRY_MAX_DELAY_MS: &str = "io.retry.max-delay";
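
A minimal usage sketch of the four new keys (the "memory" scheme and the specific values are arbitrary examples; the test added near the end of this file follows the same pattern):

    let file_io = FileIOBuilder::new("memory")
        .with_prop(IO_TIMEOUT_SECONDS, "30")      // per-operation IO timeout, in seconds
        .with_prop(IO_MAX_RETRIES, "5")           // retry attempts on transient failures
        .with_prop(IO_RETRY_MIN_DELAY_MS, "100")  // minimum backoff between retries, in ms
        .with_prop(IO_RETRY_MAX_DELAY_MS, "5000") // maximum backoff between retries, in ms
        .build()?;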

/// FileIO implementation, used to manipulate files in underlying storage.
///
/// # Note
@@ -96,7 +120,9 @@ impl FileIO {
///
/// * path: It should be an *absolute* path starting with the scheme string used to construct [`FileIO`].
pub async fn delete(&self, path: impl AsRef<str>) -> Result<()> {
let (op, relative_path) = self.inner.create_operator(&path)?;
let (op, relative_path) = self
.inner
.create_operator_with_config(&path, &self.builder.props)?;
Ok(op.delete(relative_path).await?)
}

@@ -106,7 +132,9 @@
///
/// * path: It should be an *absolute* path starting with the scheme string used to construct [`FileIO`].
pub async fn remove_all(&self, path: impl AsRef<str>) -> Result<()> {
let (op, relative_path) = self.inner.create_operator(&path)?;
let (op, relative_path) = self
.inner
.create_operator_with_config(&path, &self.builder.props)?;
Ok(op.remove_all(relative_path).await?)
}

@@ -116,7 +144,9 @@
///
/// * path: It should be an *absolute* path starting with the scheme string used to construct [`FileIO`].
pub async fn exists(&self, path: impl AsRef<str>) -> Result<bool> {
let (op, relative_path) = self.inner.create_operator(&path)?;
let (op, relative_path) = self
.inner
.create_operator_with_config(&path, &self.builder.props)?;
Ok(op.exists(relative_path).await?)
}

@@ -126,7 +156,9 @@
///
/// * path: It should be an *absolute* path starting with the scheme string used to construct [`FileIO`].
pub fn new_input(&self, path: impl AsRef<str>) -> Result<InputFile> {
let (op, relative_path) = self.inner.create_operator(&path)?;
let (op, relative_path) = self
.inner
.create_operator_with_config(&path, &self.builder.props)?;
let path = path.as_ref().to_string();
let relative_path_pos = path.len() - relative_path.len();
Ok(InputFile {
@@ -142,16 +174,18 @@
///
/// * path: It should be an *absolute* path starting with the scheme string used to construct [`FileIO`].
pub fn new_output(&self, path: impl AsRef<str>) -> Result<OutputFile> {
let (op, relative_path) = self.inner.create_operator(&path)?;
let (op, relative_path) = self
.inner
.create_operator_with_config(&path, &self.builder.props)?;
let path = path.as_ref().to_string();
let relative_path_pos = path.len() - relative_path.len();

// ADLS requires append mode for writes
#[cfg(feature = "storage-azdls")]
let append_file = matches!(self.inner.as_ref(), Storage::Azdls { .. });
#[cfg(not(feature = "storage-azdls"))]
let append_file = false;

Ok(OutputFile {
op,
path,
@@ -426,7 +460,10 @@ mod tests {
use tempfile::TempDir;

use super::{FileIO, FileIOBuilder};
use crate::io::IO_CHUNK_SIZE;
use crate::io::{
IO_CHUNK_SIZE, IO_MAX_RETRIES, IO_RETRY_MAX_DELAY_MS, IO_RETRY_MIN_DELAY_MS,
IO_TIMEOUT_SECONDS,
};

fn create_local_file_io() -> FileIO {
FileIOBuilder::new_fs_io().build().unwrap()
@@ -574,4 +611,22 @@
let output_file = io.new_output(&path).unwrap();
assert_eq!(Some(32 * 1024 * 1024), output_file.chunk_size);
}

#[test]
fn test_file_io_builder_with_timeout_and_retry_config() {
let builder = FileIOBuilder::new("memory")
.with_prop(IO_TIMEOUT_SECONDS, "30")
.with_prop(IO_MAX_RETRIES, "5")
.with_prop(IO_RETRY_MIN_DELAY_MS, "100")
.with_prop(IO_RETRY_MAX_DELAY_MS, "5000");

assert_eq!(builder.props.get(IO_TIMEOUT_SECONDS).unwrap(), "30");
assert_eq!(builder.props.get(IO_MAX_RETRIES).unwrap(), "5");
assert_eq!(builder.props.get(IO_RETRY_MIN_DELAY_MS).unwrap(), "100");
assert_eq!(builder.props.get(IO_RETRY_MAX_DELAY_MS).unwrap(), "5000");

// Verify that FileIO can be built with these configurations
let file_io = builder.build();
assert!(file_io.is_ok());
}
}
48 changes: 44 additions & 4 deletions crates/iceberg/src/io/storage.rs
@@ -15,7 +15,9 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use opendal::layers::{RetryLayer, TimeoutLayer};
#[cfg(feature = "storage-azblob")]
@@ -32,7 +34,9 @@ use opendal::{Operator, Scheme};

#[cfg(feature = "storage-azdls")]
use super::AzureStorageScheme;
use super::FileIOBuilder;
use super::{
FileIOBuilder, IO_MAX_RETRIES, IO_RETRY_MAX_DELAY_MS, IO_RETRY_MIN_DELAY_MS, IO_TIMEOUT_SECONDS,
};
use crate::{Error, ErrorKind};

/// The storage carries all supported storage services in iceberg
@@ -116,21 +120,23 @@ impl Storage {
}
}

/// Creates operator from path.
/// Creates operator from path with configuration.
///
/// # Arguments
///
/// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`].
/// * config: Configuration properties for timeout and retry settings.
///
/// # Returns
///
/// The return value consists of two parts:
///
/// * An [`opendal::Operator`] instance used to operate on the file.
/// * Relative path to the root uri of [`opendal::Operator`].
pub(crate) fn create_operator<'a>(
pub(crate) fn create_operator_with_config<'a>(
&self,
path: &'a impl AsRef<str>,
config: &HashMap<String, String>,
) -> crate::Result<(Operator, &'a str)> {
let path = path.as_ref();
let (operator, relative_path): (Operator, &str) = match self {
@@ -237,7 +243,41 @@

// Transient errors are common for object stores; however there's no
// harm in retrying temporary failures for other storage backends as well.
let operator = operator.layer(TimeoutLayer::new()).layer(RetryLayer::new());

// Configure timeout layer
let operator = if let Some(timeout_str) = config.get(IO_TIMEOUT_SECONDS) {
if let Ok(timeout_secs) = timeout_str.parse::<u64>() {
operator
.layer(TimeoutLayer::new().with_io_timeout(Duration::from_secs(timeout_secs)))
} else {
operator.layer(TimeoutLayer::new())
}
} else {
operator.layer(TimeoutLayer::new())
};

// Configure retry layer
let mut retry_layer = RetryLayer::new();

if let Some(max_retries_str) = config.get(IO_MAX_RETRIES) {
if let Ok(max_retries) = max_retries_str.parse::<usize>() {
retry_layer = retry_layer.with_max_times(max_retries);
}
}

if let Some(min_delay_str) = config.get(IO_RETRY_MIN_DELAY_MS) {
if let Ok(min_delay_ms) = min_delay_str.parse::<u64>() {
retry_layer = retry_layer.with_min_delay(Duration::from_millis(min_delay_ms));
}
}

if let Some(max_delay_str) = config.get(IO_RETRY_MAX_DELAY_MS) {
if let Ok(max_delay_ms) = max_delay_str.parse::<u64>() {
retry_layer = retry_layer.with_max_delay(Duration::from_millis(max_delay_ms));
}
}

let operator = operator.layer(retry_layer);

Ok((operator, relative_path))
}
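
For context, these properties feed the two layers applied above; composed directly on an opendal operator, the result looks roughly like the sketch below (assuming opendal's in-memory service is available; the layer builders are the same ones used in this change, and the values are arbitrary):

    use std::time::Duration;

    use opendal::layers::{RetryLayer, TimeoutLayer};
    use opendal::{services, Operator};

    fn layered_operator() -> opendal::Result<Operator> {
        let op = Operator::new(services::Memory::default())?.finish();
        // Timeout layer first, then retry with bounded backoff, mirroring
        // create_operator_with_config.
        Ok(op
            .layer(TimeoutLayer::new().with_io_timeout(Duration::from_secs(30)))
            .layer(
                RetryLayer::new()
                    .with_max_times(5)
                    .with_min_delay(Duration::from_millis(100))
                    .with_max_delay(Duration::from_millis(5000)),
            ))
    }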