Skip to content

Commit be8aeee

Browse files
committed
[HSTACK] Bugfix apache#10978: Inconsistent behavior in HashJoin Projections
1 parent d5ca830 commit be8aeee

File tree

2 files changed

+91
-5
lines changed

2 files changed

+91
-5
lines changed

datafusion/physical-plan/src/joins/hash_join.rs

+26-3
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ use std::{any::Any, vec};
2626

2727
use super::utils::{
2828
asymmetric_join_output_partitioning, get_final_indices_from_shared_bitmap,
29-
reorder_output_after_swap, swap_join_projection,
29+
project_index_to_exprs, remap_join_projections_join_to_output,
30+
swap_join_projection, swap_reverting_projection,
3031
};
3132
use super::{
3233
utils::{OnceAsync, OnceFut},
@@ -625,7 +626,29 @@ impl HashJoinExec {
625626
{
626627
Ok(Arc::new(new_join))
627628
} else {
628-
reorder_output_after_swap(Arc::new(new_join), &left.schema(), &right.schema())
629+
// TODO avoid adding ProjectionExec again and again, only adding Final Projection
630+
// ADR: FIXME the projection inside the hash join functionality is not consistent
631+
// see https://github.com/apache/datafusion/commit/afddb321e9a98ffc1947005c38b6b50a6ef2a401
632+
// Failing to do the below code will create a projection exec with a projection that is
633+
// possibly outside the schema.
634+
let actual_projection = if new_join.projection.is_some() {
635+
let tmp = remap_join_projections_join_to_output(
636+
new_join.left().clone(),
637+
new_join.right().clone(),
638+
new_join.join_type(),
639+
new_join.projection.clone(),
640+
)?
641+
.unwrap();
642+
project_index_to_exprs(&tmp, &new_join.schema())
643+
} else {
644+
swap_reverting_projection(&left.schema(), &right.schema())
645+
};
646+
// let swap_proj = swap_reverting_projection(&left.schema(), &right.schema());
647+
648+
let proj = ProjectionExec::try_new(actual_projection, Arc::new(new_join))?;
649+
Ok(Arc::new(proj))
650+
651+
// reorder_output_after_swap(Arc::new(new_join), &left.schema(), &right.schema())
629652
}
630653
}
631654
}
@@ -1655,7 +1678,7 @@ mod tests {
16551678
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
16561679
use datafusion_expr::Operator;
16571680
use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
1658-
use datafusion_physical_expr::PhysicalExpr;
1681+
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
16591682
use hashbrown::HashTable;
16601683
use rstest::*;
16611684
use rstest_reuse::*;

datafusion/physical-plan/src/joins/utils.rs

+65-2
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,15 @@ use arrow::compute;
4141
use arrow::datatypes::{
4242
ArrowNativeType, Field, Schema, SchemaBuilder, UInt32Type, UInt64Type,
4343
};
44+
use arrow_schema::SchemaRef;
4445
use datafusion_common::cast::as_boolean_array;
4546
use datafusion_common::stats::Precision;
4647
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
4748
use datafusion_common::{
4849
plan_err, DataFusionError, JoinSide, JoinType, Result, SharedResult,
4950
};
5051
use datafusion_expr::interval_arithmetic::Interval;
51-
use datafusion_physical_expr::equivalence::add_offset_to_expr;
52+
use datafusion_physical_expr::equivalence::{add_offset_to_expr, ProjectionMapping};
5253
use datafusion_physical_expr::expressions::Column;
5354
use datafusion_physical_expr::utils::{collect_columns, merge_vectors};
5455
use datafusion_physical_expr::{
@@ -62,6 +63,7 @@ use crate::projection::ProjectionExec;
6263
use futures::future::{BoxFuture, Shared};
6364
use futures::{ready, FutureExt};
6465
use parking_lot::Mutex;
66+
use crate::common::can_project;
6567

6668
/// Maps a `u64` hash value based on the build side ["on" values] to a list of indices with this key's value.
6769
///
@@ -649,6 +651,66 @@ pub fn build_join_schema(
649651
(fields.finish().with_metadata(metadata), column_indices)
650652
}
651653

654+
/// This assumes that the projections are relative to the join schema.
655+
/// We need to redo them to point to the actual hash join output schema
656+
pub fn remap_join_projections_join_to_output(
657+
left: Arc<dyn ExecutionPlan>,
658+
right: Arc<dyn ExecutionPlan>,
659+
join_type: &JoinType,
660+
projection: Option<Vec<usize>>,
661+
) -> Result<Option<Vec<usize>>> {
662+
match projection {
663+
Some(ref projection) => {
664+
let (join_schema, _) = build_join_schema(
665+
left.schema().as_ref(),
666+
right.schema().as_ref(),
667+
join_type
668+
);
669+
670+
let join_schema = Arc::new(join_schema);
671+
can_project(&join_schema, Some(projection.clone()).as_ref())?;
672+
673+
let projection_exprs = project_index_to_exprs(
674+
&projection.clone(),
675+
&join_schema
676+
);
677+
let projection_mapping =
678+
ProjectionMapping::try_new(&projection_exprs, &join_schema)?;
679+
680+
// projection mapping contains from and to, get the second one
681+
let dest_physical_exprs = projection_mapping.map.iter().map(|(_, t)| t.clone()).collect::<Vec<_>>();
682+
let dest_columns = dest_physical_exprs.iter().map(|pe| pe.as_any().downcast_ref::<Column>()).collect::<Vec<_>>();
683+
let output = dest_physical_exprs.iter().enumerate().map(|(idx, _)| {
684+
// :Vec<(Arc<dyn PhysicalExpr>, String)>
685+
// (pe.clone(), dest_column.name().to_owned())
686+
let dest_column = dest_columns.get(idx).unwrap().unwrap();
687+
dest_column.index()
688+
}).collect::<Vec<_>>();
689+
Ok(Some(output))
690+
},
691+
None => Ok(None)
692+
}
693+
}
694+
695+
pub fn project_index_to_exprs(
696+
projection_index: &[usize],
697+
schema: &SchemaRef,
698+
) -> Vec<(Arc<dyn PhysicalExpr>, String)> {
699+
projection_index
700+
.iter()
701+
.map(|index| {
702+
let field = schema.field(*index);
703+
(
704+
Arc::new(Column::new(
705+
field.name(),
706+
*index,
707+
)) as Arc<dyn PhysicalExpr>,
708+
field.name().to_owned(),
709+
)
710+
})
711+
.collect::<Vec<_>>()
712+
}
713+
652714
/// A [`OnceAsync`] runs an `async` closure once, where multiple calls to
653715
/// [`OnceAsync::once`] return a [`OnceFut`] that resolves to the result of the
654716
/// same computation.
@@ -1755,6 +1817,7 @@ pub(crate) fn reorder_output_after_swap(
17551817
left_schema: &Schema,
17561818
right_schema: &Schema,
17571819
) -> Result<Arc<dyn ExecutionPlan>> {
1820+
//////////////////////
17581821
let proj = ProjectionExec::try_new(
17591822
swap_reverting_projection(left_schema, right_schema),
17601823
plan,
@@ -1767,7 +1830,7 @@ pub(crate) fn reorder_output_after_swap(
17671830
///
17681831
/// Returns the expressions that will allow to swap back the values from the
17691832
/// original left as the first columns and those on the right next.
1770-
fn swap_reverting_projection(
1833+
pub fn swap_reverting_projection(
17711834
left_schema: &Schema,
17721835
right_schema: &Schema,
17731836
) -> Vec<(Arc<dyn PhysicalExpr>, String)> {

0 commit comments

Comments
 (0)