|
16 | 16 | // under the License.
|
17 | 17 |
|
18 | 18 | use datafusion_common::{internal_err, not_impl_err, plan_err, DataFusionError, Result};
|
19 |
| -use datafusion_expr::{expr::Alias, Expr, JoinConstraint, JoinType, LogicalPlan}; |
| 19 | +use datafusion_expr::{ |
| 20 | + expr::Alias, Distinct, Expr, JoinConstraint, JoinType, LogicalPlan, |
| 21 | +}; |
20 | 22 | use sqlparser::ast::{self, SetExpr};
|
21 | 23 |
|
22 | 24 | use crate::unparser::utils::unproject_agg_exprs;
|
@@ -271,8 +273,39 @@ impl Unparser<'_> {
|
271 | 273 | relation,
|
272 | 274 | )
|
273 | 275 | }
|
274 |
| - LogicalPlan::Distinct(_distinct) => { |
275 |
| - not_impl_err!("Unsupported operator: {plan:?}") |
| 276 | + LogicalPlan::Distinct(distinct) => { |
| 277 | + let (select_distinct, input) = match distinct { |
| 278 | + Distinct::All(input) => (ast::Distinct::Distinct, input.as_ref()), |
| 279 | + Distinct::On(on) => { |
| 280 | + let exprs = on |
| 281 | + .on_expr |
| 282 | + .iter() |
| 283 | + .map(|e| self.expr_to_sql(e)) |
| 284 | + .collect::<Result<Vec<_>>>()?; |
| 285 | + let items = on |
| 286 | + .select_expr |
| 287 | + .iter() |
| 288 | + .map(|e| self.select_item_to_sql(e)) |
| 289 | + .collect::<Result<Vec<_>>>()?; |
| 290 | + match &on.sort_expr { |
| 291 | + Some(sort_expr) => { |
| 292 | + if let Some(query_ref) = query { |
| 293 | + query_ref |
| 294 | + .order_by(self.sort_to_sql(sort_expr.clone())?); |
| 295 | + } else { |
| 296 | + return internal_err!( |
| 297 | + "Sort operator only valid in a statement context." |
| 298 | + ); |
| 299 | + } |
| 300 | + } |
| 301 | + None => {} |
| 302 | + } |
| 303 | + select.projection(items); |
| 304 | + (ast::Distinct::On(exprs), on.input.as_ref()) |
| 305 | + } |
| 306 | + }; |
| 307 | + select.distinct(Some(select_distinct)); |
| 308 | + self.select_to_sql_recursively(input, query, select, relation) |
276 | 309 | }
|
277 | 310 | LogicalPlan::Join(join) => {
|
278 | 311 | match join.join_constraint {
|
|
0 commit comments