|
19 | 19 |
|
20 | 20 | use std::cmp::Ordering;
|
21 | 21 | use std::collections::{BTreeSet, HashSet};
|
22 |
| -use std::ops::Deref; |
23 | 22 | use std::sync::Arc;
|
24 | 23 |
|
25 | 24 | use crate::expr::{Alias, Sort, WildcardOptions, WindowFunction, WindowFunctionParams};
|
@@ -696,165 +695,11 @@ pub fn exprlist_to_fields<'a>(
|
696 | 695 | plan: &LogicalPlan,
|
697 | 696 | ) -> Result<Vec<(Option<TableReference>, Arc<Field>)>> {
|
698 | 697 | // Look for exact match in plan's output schema
|
699 |
| - let wildcard_schema = find_base_plan(plan).schema(); |
700 | 698 | let input_schema = plan.schema();
|
701 |
| - let result = exprs |
702 |
| - .into_iter() |
703 |
| - .map(|e| match e { |
704 |
| - #[expect(deprecated)] |
705 |
| - Expr::Wildcard { qualifier, options } => match qualifier { |
706 |
| - None => { |
707 |
| - let mut excluded = exclude_using_columns(plan)?; |
708 |
| - excluded.extend(get_excluded_columns( |
709 |
| - options.exclude.as_ref(), |
710 |
| - options.except.as_ref(), |
711 |
| - wildcard_schema, |
712 |
| - None, |
713 |
| - )?); |
714 |
| - Ok(wildcard_schema |
715 |
| - .iter() |
716 |
| - .filter(|(q, f)| { |
717 |
| - !excluded.contains(&Column::new(q.cloned(), f.name())) |
718 |
| - }) |
719 |
| - .map(|(q, f)| (q.cloned(), Arc::clone(f))) |
720 |
| - .collect::<Vec<_>>()) |
721 |
| - } |
722 |
| - Some(qualifier) => { |
723 |
| - let excluded: Vec<String> = get_excluded_columns( |
724 |
| - options.exclude.as_ref(), |
725 |
| - options.except.as_ref(), |
726 |
| - wildcard_schema, |
727 |
| - Some(qualifier), |
728 |
| - )? |
729 |
| - .into_iter() |
730 |
| - .map(|c| c.flat_name()) |
731 |
| - .collect(); |
732 |
| - Ok(wildcard_schema |
733 |
| - .fields_with_qualified(qualifier) |
734 |
| - .into_iter() |
735 |
| - .filter_map(|field| { |
736 |
| - let flat_name = format!("{}.{}", qualifier, field.name()); |
737 |
| - if excluded.contains(&flat_name) { |
738 |
| - None |
739 |
| - } else { |
740 |
| - Some(( |
741 |
| - Some(qualifier.clone()), |
742 |
| - Arc::new(field.to_owned()), |
743 |
| - )) |
744 |
| - } |
745 |
| - }) |
746 |
| - .collect::<Vec<_>>()) |
747 |
| - } |
748 |
| - }, |
749 |
| - _ => Ok(vec![e.to_field(input_schema)?]), |
750 |
| - }) |
751 |
| - .collect::<Result<Vec<_>>>()? |
752 |
| - .into_iter() |
753 |
| - .flatten() |
754 |
| - .collect(); |
755 |
| - Ok(result) |
756 |
| -} |
757 |
| - |
758 |
| -/// Find the suitable base plan to expand the wildcard expression recursively. |
759 |
| -/// When planning [LogicalPlan::Window] and [LogicalPlan::Aggregate], we will generate |
760 |
| -/// an intermediate plan based on the relation plan (e.g. [LogicalPlan::TableScan], [LogicalPlan::Subquery], ...). |
761 |
| -/// If we expand a wildcard expression basing the intermediate plan, we could get some duplicate fields. |
762 |
| -pub fn find_base_plan(input: &LogicalPlan) -> &LogicalPlan { |
763 |
| - match input { |
764 |
| - LogicalPlan::Window(window) => find_base_plan(&window.input), |
765 |
| - LogicalPlan::Aggregate(agg) => find_base_plan(&agg.input), |
766 |
| - // [SqlToRel::try_process_unnest] will convert Expr(Unnest(Expr)) to Projection/Unnest/Projection |
767 |
| - // We should expand the wildcard expression based on the input plan of the inner Projection. |
768 |
| - LogicalPlan::Unnest(unnest) => { |
769 |
| - if let LogicalPlan::Projection(projection) = unnest.input.deref() { |
770 |
| - find_base_plan(&projection.input) |
771 |
| - } else { |
772 |
| - input |
773 |
| - } |
774 |
| - } |
775 |
| - LogicalPlan::Filter(filter) => { |
776 |
| - if filter.having { |
777 |
| - // If a filter is used for a having clause, its input plan is an aggregation. |
778 |
| - // We should expand the wildcard expression based on the aggregation's input plan. |
779 |
| - find_base_plan(&filter.input) |
780 |
| - } else { |
781 |
| - input |
782 |
| - } |
783 |
| - } |
784 |
| - _ => input, |
785 |
| - } |
786 |
| -} |
787 |
| - |
788 |
| -/// Count the number of real fields. We should expand the wildcard expression to get the actual number. |
789 |
| -pub fn exprlist_len( |
790 |
| - exprs: &[Expr], |
791 |
| - schema: &DFSchemaRef, |
792 |
| - wildcard_schema: Option<&DFSchemaRef>, |
793 |
| -) -> Result<usize> { |
794 | 699 | exprs
|
795 |
| - .iter() |
796 |
| - .map(|e| match e { |
797 |
| - #[expect(deprecated)] |
798 |
| - Expr::Wildcard { |
799 |
| - qualifier: None, |
800 |
| - options, |
801 |
| - } => { |
802 |
| - let excluded = get_excluded_columns( |
803 |
| - options.exclude.as_ref(), |
804 |
| - options.except.as_ref(), |
805 |
| - wildcard_schema.unwrap_or(schema), |
806 |
| - None, |
807 |
| - )? |
808 |
| - .into_iter() |
809 |
| - .collect::<HashSet<Column>>(); |
810 |
| - Ok( |
811 |
| - get_exprs_except_skipped(wildcard_schema.unwrap_or(schema), excluded) |
812 |
| - .len(), |
813 |
| - ) |
814 |
| - } |
815 |
| - #[expect(deprecated)] |
816 |
| - Expr::Wildcard { |
817 |
| - qualifier: Some(qualifier), |
818 |
| - options, |
819 |
| - } => { |
820 |
| - let related_wildcard_schema = wildcard_schema.as_ref().map_or_else( |
821 |
| - || Ok(Arc::clone(schema)), |
822 |
| - |schema| { |
823 |
| - // Eliminate the fields coming from other tables. |
824 |
| - let qualified_fields = schema |
825 |
| - .fields() |
826 |
| - .iter() |
827 |
| - .enumerate() |
828 |
| - .filter_map(|(idx, field)| { |
829 |
| - let (maybe_table_ref, _) = schema.qualified_field(idx); |
830 |
| - if maybe_table_ref.is_none_or(|q| q == qualifier) { |
831 |
| - Some((maybe_table_ref.cloned(), Arc::clone(field))) |
832 |
| - } else { |
833 |
| - None |
834 |
| - } |
835 |
| - }) |
836 |
| - .collect::<Vec<_>>(); |
837 |
| - let metadata = schema.metadata().clone(); |
838 |
| - DFSchema::new_with_metadata(qualified_fields, metadata) |
839 |
| - .map(Arc::new) |
840 |
| - }, |
841 |
| - )?; |
842 |
| - let excluded = get_excluded_columns( |
843 |
| - options.exclude.as_ref(), |
844 |
| - options.except.as_ref(), |
845 |
| - related_wildcard_schema.as_ref(), |
846 |
| - Some(qualifier), |
847 |
| - )? |
848 |
| - .into_iter() |
849 |
| - .collect::<HashSet<Column>>(); |
850 |
| - Ok( |
851 |
| - get_exprs_except_skipped(related_wildcard_schema.as_ref(), excluded) |
852 |
| - .len(), |
853 |
| - ) |
854 |
| - } |
855 |
| - _ => Ok(1), |
856 |
| - }) |
857 |
| - .sum() |
| 700 | + .into_iter() |
| 701 | + .map(|e| e.to_field(input_schema)) |
| 702 | + .collect() |
858 | 703 | }
|
859 | 704 |
|
860 | 705 | /// Convert an expression into Column expression if it's already provided as input plan.
|
|
0 commit comments