Skip to content

Commit 84c9409

Browse files
authored
Fix stack overflow calculating projected orderings (#12759)
* Fix stack overflow calculating projected orderings * fix docs
1 parent 9b492c6 commit 84c9409

File tree

1 file changed

+121
-41
lines changed

1 file changed

+121
-41
lines changed

datafusion/physical-expr/src/equivalence/properties.rs

Lines changed: 121 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1295,21 +1295,30 @@ fn construct_prefix_orderings(
12951295
relevant_sort_expr: &PhysicalSortExpr,
12961296
dependency_map: &DependencyMap,
12971297
) -> Vec<LexOrdering> {
1298+
let mut dep_enumerator = DependencyEnumerator::new();
12981299
dependency_map[relevant_sort_expr]
12991300
.dependencies
13001301
.iter()
1301-
.flat_map(|dep| construct_orderings(dep, dependency_map))
1302+
.flat_map(|dep| dep_enumerator.construct_orderings(dep, dependency_map))
13021303
.collect()
13031304
}
13041305

1305-
/// Given a set of relevant dependencies (`relevant_deps`) and a map of dependencies
1306-
/// (`dependency_map`), this function generates all possible prefix orderings
1307-
/// based on the given dependencies.
1306+
/// Generates all possible orderings where dependencies are satisfied for the
1307+
/// current projection expression.
1308+
///
1309+
/// # Examaple
1310+
/// If `dependences` is `a + b ASC` and the dependency map holds dependencies
1311+
/// * `a ASC` --> `[c ASC]`
1312+
/// * `b ASC` --> `[d DESC]`,
1313+
///
1314+
/// This function generates these two sort orders
1315+
/// * `[c ASC, d DESC, a + b ASC]`
1316+
/// * `[d DESC, c ASC, a + b ASC]`
13081317
///
13091318
/// # Parameters
13101319
///
1311-
/// * `dependencies` - A reference to the dependencies.
1312-
/// * `dependency_map` - A reference to the map of dependencies for expressions.
1320+
/// * `dependencies` - Set of relevant expressions.
1321+
/// * `dependency_map` - Map of dependencies for expressions that may appear in `dependencies`
13131322
///
13141323
/// # Returns
13151324
///
@@ -1335,11 +1344,6 @@ fn generate_dependency_orderings(
13351344
return vec![vec![]];
13361345
}
13371346

1338-
// Generate all possible orderings where dependencies are satisfied for the
1339-
// current projection expression. For example, if expression is `a + b ASC`,
1340-
// and the dependency for `a ASC` is `[c ASC]`, the dependency for `b ASC`
1341-
// is `[d DESC]`, then we generate `[c ASC, d DESC, a + b ASC]` and
1342-
// `[d DESC, c ASC, a + b ASC]`.
13431347
relevant_prefixes
13441348
.into_iter()
13451349
.multi_cartesian_product()
@@ -1421,7 +1425,7 @@ struct DependencyNode {
14211425
}
14221426

14231427
impl DependencyNode {
1424-
// Insert dependency to the state (if exists).
1428+
/// Insert dependency to the state (if exists).
14251429
fn insert_dependency(&mut self, dependency: Option<&PhysicalSortExpr>) {
14261430
if let Some(dep) = dependency {
14271431
self.dependencies.insert(dep.clone());
@@ -1437,38 +1441,69 @@ impl DependencyNode {
14371441
type DependencyMap = IndexMap<PhysicalSortExpr, DependencyNode>;
14381442
type Dependencies = IndexSet<PhysicalSortExpr>;
14391443

1440-
/// This function recursively analyzes the dependencies of the given sort
1441-
/// expression within the given dependency map to construct lexicographical
1442-
/// orderings that include the sort expression and its dependencies.
1443-
///
1444-
/// # Parameters
1445-
///
1446-
/// - `referred_sort_expr`: A reference to the sort expression (`PhysicalSortExpr`)
1447-
/// for which lexicographical orderings satisfying its dependencies are to be
1448-
/// constructed.
1449-
/// - `dependency_map`: A reference to the `DependencyMap` that contains
1450-
/// dependencies for different `PhysicalSortExpr`s.
1451-
///
1452-
/// # Returns
1453-
///
1454-
/// A vector of lexicographical orderings (`Vec<LexOrdering>`) based on the given
1455-
/// sort expression and its dependencies.
1456-
fn construct_orderings(
1457-
referred_sort_expr: &PhysicalSortExpr,
1458-
dependency_map: &DependencyMap,
1459-
) -> Vec<LexOrdering> {
1460-
// We are sure that `referred_sort_expr` is inside `dependency_map`.
1461-
let node = &dependency_map[referred_sort_expr];
1462-
// Since we work on intermediate nodes, we are sure `val.target_sort_expr`
1463-
// exists.
1464-
let target_sort_expr = node.target_sort_expr.clone().unwrap();
1465-
if node.dependencies.is_empty() {
1466-
vec![vec![target_sort_expr]]
1467-
} else {
1444+
/// Contains a mapping of all dependencies we have processed for each sort expr
1445+
struct DependencyEnumerator<'a> {
1446+
/// Maps `expr` --> `[exprs]` that have previously been processed
1447+
seen: IndexMap<&'a PhysicalSortExpr, IndexSet<&'a PhysicalSortExpr>>,
1448+
}
1449+
1450+
impl<'a> DependencyEnumerator<'a> {
1451+
fn new() -> Self {
1452+
Self {
1453+
seen: IndexMap::new(),
1454+
}
1455+
}
1456+
1457+
/// Insert a new dependency,
1458+
///
1459+
/// returns false if the dependency was already in the map
1460+
/// returns true if the dependency was newly inserted
1461+
fn insert(
1462+
&mut self,
1463+
target: &'a PhysicalSortExpr,
1464+
dep: &'a PhysicalSortExpr,
1465+
) -> bool {
1466+
self.seen.entry(target).or_default().insert(dep)
1467+
}
1468+
1469+
/// This function recursively analyzes the dependencies of the given sort
1470+
/// expression within the given dependency map to construct lexicographical
1471+
/// orderings that include the sort expression and its dependencies.
1472+
///
1473+
/// # Parameters
1474+
///
1475+
/// - `referred_sort_expr`: A reference to the sort expression (`PhysicalSortExpr`)
1476+
/// for which lexicographical orderings satisfying its dependencies are to be
1477+
/// constructed.
1478+
/// - `dependency_map`: A reference to the `DependencyMap` that contains
1479+
/// dependencies for different `PhysicalSortExpr`s.
1480+
///
1481+
/// # Returns
1482+
///
1483+
/// A vector of lexicographical orderings (`Vec<LexOrdering>`) based on the given
1484+
/// sort expression and its dependencies.
1485+
fn construct_orderings(
1486+
&mut self,
1487+
referred_sort_expr: &'a PhysicalSortExpr,
1488+
dependency_map: &'a DependencyMap,
1489+
) -> Vec<LexOrdering> {
1490+
// We are sure that `referred_sort_expr` is inside `dependency_map`.
1491+
let node = &dependency_map[referred_sort_expr];
1492+
// Since we work on intermediate nodes, we are sure `val.target_sort_expr`
1493+
// exists.
1494+
let target_sort_expr = node.target_sort_expr.as_ref().unwrap();
1495+
if node.dependencies.is_empty() {
1496+
return vec![vec![target_sort_expr.clone()]];
1497+
};
1498+
14681499
node.dependencies
14691500
.iter()
14701501
.flat_map(|dep| {
1471-
let mut orderings = construct_orderings(dep, dependency_map);
1502+
let mut orderings = if self.insert(target_sort_expr, dep) {
1503+
self.construct_orderings(dep, dependency_map)
1504+
} else {
1505+
vec![]
1506+
};
14721507
for ordering in orderings.iter_mut() {
14731508
ordering.push(target_sort_expr.clone())
14741509
}
@@ -1763,6 +1798,51 @@ mod tests {
17631798
Ok(())
17641799
}
17651800

1801+
#[test]
1802+
fn project_equivalence_properties_test_multi() -> Result<()> {
1803+
// test multiple input orderings with equivalence properties
1804+
let input_schema = Arc::new(Schema::new(vec![
1805+
Field::new("a", DataType::Int64, true),
1806+
Field::new("b", DataType::Int64, true),
1807+
Field::new("c", DataType::Int64, true),
1808+
Field::new("d", DataType::Int64, true),
1809+
]));
1810+
1811+
let mut input_properties = EquivalenceProperties::new(Arc::clone(&input_schema));
1812+
// add equivalent ordering [a, b, c, d]
1813+
input_properties.add_new_ordering(vec![
1814+
parse_sort_expr("a", &input_schema),
1815+
parse_sort_expr("b", &input_schema),
1816+
parse_sort_expr("c", &input_schema),
1817+
parse_sort_expr("d", &input_schema),
1818+
]);
1819+
1820+
// add equivalent ordering [a, c, b, d]
1821+
input_properties.add_new_ordering(vec![
1822+
parse_sort_expr("a", &input_schema),
1823+
parse_sort_expr("c", &input_schema),
1824+
parse_sort_expr("b", &input_schema), // NB b and c are swapped
1825+
parse_sort_expr("d", &input_schema),
1826+
]);
1827+
1828+
// simply project all the columns in order
1829+
let proj_exprs = vec![
1830+
(col("a", &input_schema)?, "a".to_string()),
1831+
(col("b", &input_schema)?, "b".to_string()),
1832+
(col("c", &input_schema)?, "c".to_string()),
1833+
(col("d", &input_schema)?, "d".to_string()),
1834+
];
1835+
let projection_mapping = ProjectionMapping::try_new(&proj_exprs, &input_schema)?;
1836+
let out_properties = input_properties.project(&projection_mapping, input_schema);
1837+
1838+
assert_eq!(
1839+
out_properties.to_string(),
1840+
"order: [[a@0 ASC,c@2 ASC,b@1 ASC,d@3 ASC], [a@0 ASC,b@1 ASC,c@2 ASC,d@3 ASC]]"
1841+
);
1842+
1843+
Ok(())
1844+
}
1845+
17661846
#[test]
17671847
fn test_join_equivalence_properties() -> Result<()> {
17681848
let schema = create_test_schema()?;

0 commit comments

Comments
 (0)