Skip to content
This repository was archived by the owner on Jan 7, 2025. It is now read-only.

Commit 87a515d

Browse files
committed
fixed all from/into bugs
1 parent c82f078 commit 87a515d

File tree

3 files changed

+98
-99
lines changed

3 files changed

+98
-99
lines changed

optd-datafusion-bridge/src/from_optd.rs

Lines changed: 42 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ fn from_optd_schema(optd_schema: &OptdSchema) -> Schema {
5656

5757
impl OptdPlanContext<'_> {
5858
#[async_recursion]
59-
async fn from_optd_table_scan(
59+
async fn conv_from_optd_table_scan(
6060
&mut self,
6161
node: PhysicalScan,
6262
) -> Result<Arc<dyn ExecutionPlan + 'static>> {
@@ -66,12 +66,12 @@ impl OptdPlanContext<'_> {
6666
Ok(plan)
6767
}
6868

69-
fn from_optd_sort_order_expr(
69+
fn conv_from_optd_sort_order_expr(
7070
&mut self,
7171
sort_expr: SortOrderExpr,
7272
context: &SchemaRef,
7373
) -> Result<physical_expr::PhysicalSortExpr> {
74-
let expr = self.from_optd_expr(sort_expr.child(), context)?;
74+
let expr = self.conv_from_optd_expr(sort_expr.child(), context)?;
7575
Ok(physical_expr::PhysicalSortExpr {
7676
expr,
7777
options: match sort_expr.order() {
@@ -87,7 +87,7 @@ impl OptdPlanContext<'_> {
8787
})
8888
}
8989

90-
fn from_optd_agg_expr(
90+
fn conv_from_optd_agg_expr(
9191
&mut self,
9292
expr: Expr,
9393
context: &SchemaRef,
@@ -101,7 +101,7 @@ impl OptdPlanContext<'_> {
101101
.children()
102102
.to_vec()
103103
.into_iter()
104-
.map(|expr| self.from_optd_expr(expr, context))
104+
.map(|expr| self.conv_from_optd_expr(expr, context))
105105
.collect::<Result<Vec<_>>>()?;
106106
Ok(create_aggregate_expr(
107107
&func,
@@ -113,7 +113,7 @@ impl OptdPlanContext<'_> {
113113
)?)
114114
}
115115

116-
fn from_optd_expr(&mut self, expr: Expr, context: &SchemaRef) -> Result<Arc<dyn PhysicalExpr>> {
116+
fn conv_from_optd_expr(&mut self, expr: Expr, context: &SchemaRef) -> Result<Arc<dyn PhysicalExpr>> {
117117
match expr.typ() {
118118
OptRelNodeTyp::ColumnRef => {
119119
let expr = ColumnRefExpr::from_rel_node(expr.into_rel_node()).unwrap();
@@ -147,7 +147,7 @@ impl OptdPlanContext<'_> {
147147
.children()
148148
.to_vec()
149149
.into_iter()
150-
.map(|expr| self.from_optd_expr(expr, context))
150+
.map(|expr| self.conv_from_optd_expr(expr, context))
151151
.collect::<Result<Vec<_>>>()?;
152152
match func {
153153
FuncType::Scalar(func) => {
@@ -175,13 +175,13 @@ impl OptdPlanContext<'_> {
175175
OptRelNodeTyp::LogOp(typ) => {
176176
let expr = LogOpExpr::from_rel_node(expr.into_rel_node()).unwrap();
177177
let mut children = expr.children().to_vec().into_iter();
178-
let first_expr = self.from_optd_expr(children.next().unwrap(), context)?;
178+
let first_expr = self.conv_from_optd_expr(children.next().unwrap(), context)?;
179179
let op = match typ {
180180
LogOpType::And => datafusion::logical_expr::Operator::And,
181181
LogOpType::Or => datafusion::logical_expr::Operator::Or,
182182
};
183183
children.try_fold(first_expr, |acc, expr| {
184-
let expr = self.from_optd_expr(expr, context)?;
184+
let expr = self.conv_from_optd_expr(expr, context)?;
185185
Ok(
186186
Arc::new(datafusion::physical_plan::expressions::BinaryExpr::new(
187187
acc, op, expr,
@@ -191,8 +191,8 @@ impl OptdPlanContext<'_> {
191191
}
192192
OptRelNodeTyp::BinOp(op) => {
193193
let expr = BinOpExpr::from_rel_node(expr.into_rel_node()).unwrap();
194-
let left = self.from_optd_expr(expr.left_child(), context)?;
195-
let right = self.from_optd_expr(expr.right_child(), context)?;
194+
let left = self.conv_from_optd_expr(expr.left_child(), context)?;
195+
let right = self.conv_from_optd_expr(expr.right_child(), context)?;
196196
let op = match op {
197197
BinOpType::Eq => Operator::Eq,
198198
BinOpType::Neq => Operator::NotEq,
@@ -216,19 +216,19 @@ impl OptdPlanContext<'_> {
216216
}
217217

218218
#[async_recursion]
219-
async fn from_optd_projection(
219+
async fn conv_from_optd_projection(
220220
&mut self,
221221
node: PhysicalProjection,
222222
) -> Result<Arc<dyn ExecutionPlan + 'static>> {
223-
let input_exec = self.from_optd_plan_node(node.child()).await?;
223+
let input_exec = self.conv_from_optd_plan_node(node.child()).await?;
224224
let physical_exprs = node
225225
.exprs()
226226
.to_vec()
227227
.into_iter()
228228
.enumerate()
229229
.map(|(idx, expr)| {
230230
Ok((
231-
self.from_optd_expr(expr, &input_exec.schema())?,
231+
self.conv_from_optd_expr(expr, &input_exec.schema())?,
232232
format!("col{}", idx),
233233
))
234234
})
@@ -241,12 +241,12 @@ impl OptdPlanContext<'_> {
241241
}
242242

243243
#[async_recursion]
244-
async fn from_optd_filter(
244+
async fn conv_from_optd_filter(
245245
&mut self,
246246
node: PhysicalFilter,
247247
) -> Result<Arc<dyn ExecutionPlan + 'static>> {
248-
let input_exec = self.from_optd_plan_node(node.child()).await?;
249-
let physical_expr = self.from_optd_expr(node.cond(), &input_exec.schema())?;
248+
let input_exec = self.conv_from_optd_plan_node(node.child()).await?;
249+
let physical_expr = self.conv_from_optd_expr(node.cond(), &input_exec.schema())?;
250250
Ok(
251251
Arc::new(datafusion::physical_plan::filter::FilterExec::try_new(
252252
physical_expr,
@@ -256,17 +256,17 @@ impl OptdPlanContext<'_> {
256256
}
257257

258258
#[async_recursion]
259-
async fn from_optd_sort(
259+
async fn conv_from_optd_sort(
260260
&mut self,
261261
node: PhysicalSort,
262262
) -> Result<Arc<dyn ExecutionPlan + 'static>> {
263-
let input_exec = self.from_optd_plan_node(node.child()).await?;
263+
let input_exec = self.conv_from_optd_plan_node(node.child()).await?;
264264
let physical_exprs = node
265265
.exprs()
266266
.to_vec()
267267
.into_iter()
268268
.map(|expr| {
269-
self.from_optd_sort_order_expr(
269+
self.conv_from_optd_sort_order_expr(
270270
SortOrderExpr::from_rel_node(expr.into_rel_node()).unwrap(),
271271
&input_exec.schema(),
272272
)
@@ -281,24 +281,24 @@ impl OptdPlanContext<'_> {
281281
}
282282

283283
#[async_recursion]
284-
async fn from_optd_agg(
284+
async fn conv_from_optd_agg(
285285
&mut self,
286286
node: PhysicalAgg,
287287
) -> Result<Arc<dyn ExecutionPlan + 'static>> {
288-
let input_exec = self.from_optd_plan_node(node.child()).await?;
288+
let input_exec = self.conv_from_optd_plan_node(node.child()).await?;
289289
let agg_exprs = node
290290
.aggrs()
291291
.to_vec()
292292
.into_iter()
293-
.map(|expr| self.from_optd_agg_expr(expr, &input_exec.schema()))
293+
.map(|expr| self.conv_from_optd_agg_expr(expr, &input_exec.schema()))
294294
.collect::<Result<Vec<_>>>()?;
295295
let group_exprs = node
296296
.groups()
297297
.to_vec()
298298
.into_iter()
299299
.map(|expr| {
300300
Ok((
301-
self.from_optd_expr(expr, &input_exec.schema())?,
301+
self.conv_from_optd_expr(expr, &input_exec.schema())?,
302302
"<agg_expr>".to_string(),
303303
))
304304
})
@@ -320,12 +320,12 @@ impl OptdPlanContext<'_> {
320320
}
321321

322322
#[async_recursion]
323-
async fn from_optd_nested_loop_join(
323+
async fn conv_from_optd_nested_loop_join(
324324
&mut self,
325325
node: PhysicalNestedLoopJoin,
326326
) -> Result<Arc<dyn ExecutionPlan + 'static>> {
327-
let left_exec = self.from_optd_plan_node(node.left()).await?;
328-
let right_exec = self.from_optd_plan_node(node.right()).await?;
327+
let left_exec = self.conv_from_optd_plan_node(node.left()).await?;
328+
let right_exec = self.conv_from_optd_plan_node(node.right()).await?;
329329
let filter_schema = {
330330
let fields = left_exec
331331
.schema()
@@ -337,7 +337,7 @@ impl OptdPlanContext<'_> {
337337
Schema::new_with_metadata(fields, HashMap::new())
338338
};
339339

340-
let physical_expr = self.from_optd_expr(node.cond(), &Arc::new(filter_schema.clone()))?;
340+
let physical_expr = self.conv_from_optd_expr(node.cond(), &Arc::new(filter_schema.clone()))?;
341341

342342
if let JoinType::Cross = node.join_type() {
343343
return Ok(Arc::new(CrossJoinExec::new(left_exec, right_exec))
@@ -375,12 +375,12 @@ impl OptdPlanContext<'_> {
375375
}
376376

377377
#[async_recursion]
378-
async fn from_optd_hash_join(
378+
async fn conv_from_optd_hash_join(
379379
&mut self,
380380
node: PhysicalHashJoin,
381381
) -> Result<Arc<dyn ExecutionPlan + 'static>> {
382-
let left_exec = self.from_optd_plan_node(node.left()).await?;
383-
let right_exec = self.from_optd_plan_node(node.right()).await?;
382+
let left_exec = self.conv_from_optd_plan_node(node.left()).await?;
383+
let right_exec = self.conv_from_optd_plan_node(node.right()).await?;
384384
let join_type = match node.join_type() {
385385
JoinType::Inner => datafusion::logical_expr::JoinType::Inner,
386386
_ => unimplemented!(),
@@ -421,7 +421,7 @@ impl OptdPlanContext<'_> {
421421
}
422422

423423
#[async_recursion]
424-
async fn from_optd_plan_node(&mut self, node: PlanNode) -> Result<Arc<dyn ExecutionPlan>> {
424+
async fn conv_from_optd_plan_node(&mut self, node: PlanNode) -> Result<Arc<dyn ExecutionPlan>> {
425425
let mut schema = OptdSchema(vec![]);
426426
if node.typ() == OptRelNodeTyp::PhysicalEmptyRelation {
427427
schema = node.schema(self.optimizer.unwrap().optd_optimizer());
@@ -430,38 +430,38 @@ impl OptdPlanContext<'_> {
430430
let rel_node_dbg = rel_node.clone();
431431
let result = match &rel_node.typ {
432432
OptRelNodeTyp::PhysicalScan => {
433-
self.from_optd_table_scan(PhysicalScan::from_rel_node(rel_node).unwrap())
433+
self.conv_from_optd_table_scan(PhysicalScan::from_rel_node(rel_node).unwrap())
434434
.await
435435
}
436436
OptRelNodeTyp::PhysicalProjection => {
437-
self.from_optd_projection(PhysicalProjection::from_rel_node(rel_node).unwrap())
437+
self.conv_from_optd_projection(PhysicalProjection::from_rel_node(rel_node).unwrap())
438438
.await
439439
}
440440
OptRelNodeTyp::PhysicalFilter => {
441-
self.from_optd_filter(PhysicalFilter::from_rel_node(rel_node).unwrap())
441+
self.conv_from_optd_filter(PhysicalFilter::from_rel_node(rel_node).unwrap())
442442
.await
443443
}
444444
OptRelNodeTyp::PhysicalSort => {
445-
self.from_optd_sort(PhysicalSort::from_rel_node(rel_node).unwrap())
445+
self.conv_from_optd_sort(PhysicalSort::from_rel_node(rel_node).unwrap())
446446
.await
447447
}
448448
OptRelNodeTyp::PhysicalAgg => {
449-
self.from_optd_agg(PhysicalAgg::from_rel_node(rel_node).unwrap())
449+
self.conv_from_optd_agg(PhysicalAgg::from_rel_node(rel_node).unwrap())
450450
.await
451451
}
452452
OptRelNodeTyp::PhysicalNestedLoopJoin(_) => {
453-
self.from_optd_nested_loop_join(
453+
self.conv_from_optd_nested_loop_join(
454454
PhysicalNestedLoopJoin::from_rel_node(rel_node).unwrap(),
455455
)
456456
.await
457457
}
458458
OptRelNodeTyp::PhysicalHashJoin(_) => {
459-
self.from_optd_hash_join(PhysicalHashJoin::from_rel_node(rel_node).unwrap())
459+
self.conv_from_optd_hash_join(PhysicalHashJoin::from_rel_node(rel_node).unwrap())
460460
.await
461461
}
462462
OptRelNodeTyp::PhysicalCollector(_) => {
463463
let node = PhysicalCollector::from_rel_node(rel_node).unwrap();
464-
let child = self.from_optd_plan_node(node.child()).await?;
464+
let child = self.conv_from_optd_plan_node(node.child()).await?;
465465
Ok(Arc::new(CollectorExec::new(
466466
child,
467467
node.group_id(),
@@ -481,8 +481,8 @@ impl OptdPlanContext<'_> {
481481
result.with_context(|| format!("when processing {}", rel_node_dbg))
482482
}
483483

484-
pub async fn from_optd(&mut self, root_rel: OptRelNodeRef) -> Result<Arc<dyn ExecutionPlan>> {
485-
self.from_optd_plan_node(PlanNode::from_rel_node(root_rel).unwrap())
484+
pub async fn conv_from_optd(&mut self, root_rel: OptRelNodeRef) -> Result<Arc<dyn ExecutionPlan>> {
485+
self.conv_from_optd_plan_node(PlanNode::from_rel_node(root_rel).unwrap())
486486
.await
487487
}
488488
}

0 commit comments

Comments
 (0)