Skip to content

Commit c234833

Browse files
getChanalamb
andauthored
Fix: Avoid recursive external error wrapping (#14371)
* Fix: Avoid recursive external error wrapping in type conversion. * cargo fmt * test TableProvider * Revert "test TableProvider" This reverts commit 127705e. * create WrappedError * Replace `ExternalError` with `WrappedError` in `RepartitionExec` * Replace `ExternalError` with `WrappedError` in Join Exec * sqllogictest * rename SharedError * Update comments and rename to DataFusionError::Shared * Improve API * fix clippy --------- Co-authored-by: Andrew Lamb <[email protected]>
1 parent 4f6062e commit c234833

File tree

8 files changed

+88
-13
lines changed

8 files changed

+88
-13
lines changed

datafusion/common/src/error.rs

Lines changed: 68 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,12 @@ pub enum DataFusionError {
136136
/// human-readable messages, and locations in the source query that relate
137137
/// to the error in some way.
138138
Diagnostic(Box<Diagnostic>, Box<DataFusionError>),
139+
/// A [`DataFusionError`] which shares an underlying [`DataFusionError`].
140+
///
141+
/// This is useful when the same underlying [`DataFusionError`] is passed
142+
/// to multiple receivers. For example, when the source of a repartition
143+
/// errors and the error is propagated to multiple consumers.
144+
Shared(Arc<DataFusionError>),
139145
}
140146

141147
#[macro_export]
@@ -262,6 +268,17 @@ impl From<DataFusionError> for ArrowError {
262268
}
263269
}
264270

271+
impl From<&Arc<DataFusionError>> for DataFusionError {
272+
fn from(e: &Arc<DataFusionError>) -> Self {
273+
if let DataFusionError::Shared(e_inner) = e.as_ref() {
274+
// don't re-wrap
275+
DataFusionError::Shared(Arc::clone(e_inner))
276+
} else {
277+
DataFusionError::Shared(Arc::clone(e))
278+
}
279+
}
280+
}
281+
265282
#[cfg(feature = "parquet")]
266283
impl From<ParquetError> for DataFusionError {
267284
fn from(e: ParquetError) -> Self {
@@ -298,7 +315,16 @@ impl From<ParserError> for DataFusionError {
298315

299316
impl From<GenericError> for DataFusionError {
300317
fn from(err: GenericError) -> Self {
301-
DataFusionError::External(err)
318+
// If the error is already a DataFusionError, not wrapping it.
319+
if err.is::<DataFusionError>() {
320+
if let Ok(e) = err.downcast::<DataFusionError>() {
321+
*e
322+
} else {
323+
unreachable!()
324+
}
325+
} else {
326+
DataFusionError::External(err)
327+
}
302328
}
303329
}
304330

@@ -334,6 +360,7 @@ impl Error for DataFusionError {
334360
DataFusionError::Context(_, e) => Some(e.as_ref()),
335361
DataFusionError::Substrait(_) => None,
336362
DataFusionError::Diagnostic(_, e) => Some(e.as_ref()),
363+
DataFusionError::Shared(e) => Some(e.as_ref()),
337364
}
338365
}
339366
}
@@ -448,6 +475,7 @@ impl DataFusionError {
448475
DataFusionError::Context(_, _) => "",
449476
DataFusionError::Substrait(_) => "Substrait error: ",
450477
DataFusionError::Diagnostic(_, _) => "",
478+
DataFusionError::Shared(_) => "",
451479
}
452480
}
453481

@@ -489,6 +517,7 @@ impl DataFusionError {
489517
}
490518
DataFusionError::Substrait(ref desc) => Cow::Owned(desc.to_string()),
491519
DataFusionError::Diagnostic(_, ref err) => Cow::Owned(err.to_string()),
520+
DataFusionError::Shared(ref desc) => Cow::Owned(desc.to_string()),
492521
}
493522
}
494523

@@ -713,7 +742,7 @@ pub fn unqualified_field_not_found(name: &str, schema: &DFSchema) -> DataFusionE
713742
mod test {
714743
use std::sync::Arc;
715744

716-
use crate::error::DataFusionError;
745+
use crate::error::{DataFusionError, GenericError};
717746
use arrow::error::ArrowError;
718747

719748
#[test]
@@ -867,6 +896,43 @@ mod test {
867896
);
868897
}
869898

899+
#[test]
900+
fn external_error() {
901+
// assert not wrapping DataFusionError
902+
let generic_error: GenericError =
903+
Box::new(DataFusionError::Plan("test".to_string()));
904+
let datafusion_error: DataFusionError = generic_error.into();
905+
println!("{}", datafusion_error.strip_backtrace());
906+
assert_eq!(
907+
datafusion_error.strip_backtrace(),
908+
"Error during planning: test"
909+
);
910+
911+
// assert wrapping other Error
912+
let generic_error: GenericError =
913+
Box::new(std::io::Error::new(std::io::ErrorKind::Other, "io error"));
914+
let datafusion_error: DataFusionError = generic_error.into();
915+
println!("{}", datafusion_error.strip_backtrace());
916+
assert_eq!(
917+
datafusion_error.strip_backtrace(),
918+
"External error: io error"
919+
);
920+
}
921+
922+
#[test]
923+
fn external_error_no_recursive() {
924+
let generic_error_1: GenericError =
925+
Box::new(std::io::Error::new(std::io::ErrorKind::Other, "io error"));
926+
let external_error_1: DataFusionError = generic_error_1.into();
927+
let generic_error_2: GenericError = Box::new(external_error_1);
928+
let external_error_2: DataFusionError = generic_error_2.into();
929+
930+
println!("{}", external_error_2);
931+
assert!(external_error_2
932+
.to_string()
933+
.starts_with("External error: io error"));
934+
}
935+
870936
/// Model what happens when implementing SendableRecordBatchStream:
871937
/// DataFusion code needs to return an ArrowError
872938
fn return_arrow_error() -> arrow::error::Result<()> {

datafusion/physical-plan/src/joins/cross_join.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -867,7 +867,7 @@ mod tests {
867867

868868
assert_contains!(
869869
err.to_string(),
870-
"External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: CrossJoinExec"
870+
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: CrossJoinExec"
871871
);
872872

873873
Ok(())

datafusion/physical-plan/src/joins/hash_join.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4014,7 +4014,7 @@ mod tests {
40144014
// Asserting that operator-level reservation attempting to overallocate
40154015
assert_contains!(
40164016
err.to_string(),
4017-
"External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput"
4017+
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput"
40184018
);
40194019

40204020
assert_contains!(
@@ -4095,7 +4095,7 @@ mod tests {
40954095
// Asserting that stream-level reservation attempting to overallocate
40964096
assert_contains!(
40974097
err.to_string(),
4098-
"External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput[1]"
4098+
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput[1]"
40994099

41004100
);
41014101

datafusion/physical-plan/src/joins/nested_loop_join.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1514,7 +1514,7 @@ pub(crate) mod tests {
15141514

15151515
assert_contains!(
15161516
err.to_string(),
1517-
"External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: NestedLoopJoinLoad[0]"
1517+
"Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: NestedLoopJoinLoad[0]"
15181518
);
15191519
}
15201520

datafusion/physical-plan/src/joins/utils.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1077,7 +1077,7 @@ impl<T: 'static> OnceFut<T> {
10771077
OnceFutState::Ready(r) => Poll::Ready(
10781078
r.as_ref()
10791079
.map(|r| r.as_ref())
1080-
.map_err(|e| DataFusionError::External(Box::new(Arc::clone(e)))),
1080+
.map_err(DataFusionError::from),
10811081
),
10821082
}
10831083
}
@@ -1091,10 +1091,9 @@ impl<T: 'static> OnceFut<T> {
10911091

10921092
match &self.state {
10931093
OnceFutState::Pending(_) => unreachable!(),
1094-
OnceFutState::Ready(r) => Poll::Ready(
1095-
r.clone()
1096-
.map_err(|e| DataFusionError::External(Box::new(e))),
1097-
),
1094+
OnceFutState::Ready(r) => {
1095+
Poll::Ready(r.clone().map_err(DataFusionError::Shared))
1096+
}
10981097
}
10991098
}
11001099
}

datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -911,11 +911,12 @@ impl RepartitionExec {
911911
}
912912
// Error from running input task
913913
Ok(Err(e)) => {
914+
// send the same Arc'd error to all output partitions
914915
let e = Arc::new(e);
915916

916917
for (_, tx) in txs {
917918
// wrap it because need to send error to all output partitions
918-
let err = Err(DataFusionError::External(Box::new(Arc::clone(&e))));
919+
let err = Err(DataFusionError::from(&e));
919920
tx.send(Some(err)).await.ok();
920921
}
921922
}

datafusion/sqllogictest/test_files/aggregate.slt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ statement error DataFusion error: Error during planning: Failed to coerce argume
142142
SELECT approx_percentile_cont_with_weight(c3, c2, c1) FROM aggregate_test_100
143143

144144
# csv_query_approx_percentile_cont_with_histogram_bins
145-
statement error DataFusion error: External error: This feature is not implemented: Tdigest max_size value for 'APPROX_PERCENTILE_CONT' must be UInt > 0 literal \(got data type Int64\)\.
145+
statement error DataFusion error: This feature is not implemented: Tdigest max_size value for 'APPROX_PERCENTILE_CONT' must be UInt > 0 literal \(got data type Int64\)\.
146146
SELECT c1, approx_percentile_cont(c3, 0.95, -1000) AS c3_p95 FROM aggregate_test_100 GROUP BY 1 ORDER BY 1
147147

148148
statement error DataFusion error: Error during planning: Failed to coerce arguments to satisfy a call to 'approx_percentile_cont' function: coercion from \[Int16, Float64, Utf8\] to the signature OneOf(.*) failed(.|\n)*

datafusion/sqllogictest/test_files/errors.slt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,3 +161,12 @@ create table records (timestamp timestamp, value float) as values (
161161
'2021-01-01 00:00:00', 1.0,
162162
'2021-01-01 00:00:00', 2.0
163163
);
164+
165+
statement ok
166+
CREATE TABLE tab0(col0 INTEGER, col1 INTEGER, col2 INTEGER);
167+
168+
statement ok
169+
INSERT INTO tab0 VALUES(83,0,38);
170+
171+
query error DataFusion error: Arrow error: Divide by zero error
172+
SELECT DISTINCT - 84 FROM tab0 AS cor0 WHERE NOT + 96 / + col1 <= NULL GROUP BY col1, col0;

0 commit comments

Comments
 (0)