Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,23 @@ package org.apache.flink.table.planner.plan.nodes.logical

import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.nodes.common.CommonCalc
import org.apache.flink.table.planner.plan.utils.FlinkRexUtil

import org.apache.calcite.plan._
import org.apache.calcite.rel.{RelCollation, RelCollationTraitDef, RelNode}
import org.apache.calcite.rel.{RelCollation, RelCollationTraitDef, RelNode, RelWriter}
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.rel.core.Calc
import org.apache.calcite.rel.logical.LogicalCalc
import org.apache.calcite.rel.metadata.RelMdCollation
import org.apache.calcite.rex.RexProgram
import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode, RexProgram}
import org.apache.calcite.sql.SqlKind

import java.util
import java.util.function.Supplier

import scala.collection.JavaConversions._

/**
* Sub-class of [[Calc]] that is a relational expression which computes project expressions and also
* filters in Flink.
Expand All @@ -48,6 +52,113 @@ class FlinkLogicalCalc(
new FlinkLogicalCalc(cluster, traitSet, child, program)
}

override def explainTerms(pw: RelWriter): RelWriter = {
val inputFieldNames = calcProgram.getInputRowType.getFieldNames.toList
val localExprs = calcProgram.getExprList.toList

// Format the where condition with field names if present
val formattedWhere = if (calcProgram.getCondition != null) {
val condition = calcProgram.expandLocalRef(calcProgram.getCondition)
formatFilterCondition(condition, inputFieldNames)
} else {
null
}

pw.input("input", getInput)
.item(
"select",
projectionToString(
org.apache.flink.table.planner.plan.utils.RelExplainUtil.preferExpressionFormat(pw),
pw.getDetailLevel))
.itemIf("where", formattedWhere, formattedWhere != null)
}

/**
* Formats a filter condition into a readable string with field names. Converts expressions like
* ">($2, 100)" into "amount > 100"
*/
private def formatFilterCondition(condition: RexNode, fieldNames: List[String]): String = {

condition match {
case call: RexCall if call.getKind == SqlKind.GREATER_THAN =>
// Handle greater than: field > value
formatBinaryCondition(call, fieldNames, " > ")

case call: RexCall if call.getKind == SqlKind.LESS_THAN =>
// Handle less than: field < value
formatBinaryCondition(call, fieldNames, " < ")

case call: RexCall if call.getKind == SqlKind.GREATER_THAN_OR_EQUAL =>
// Handle greater than or equal: field >= value
formatBinaryCondition(call, fieldNames, " >= ")

case call: RexCall if call.getKind == SqlKind.LESS_THAN_OR_EQUAL =>
// Handle less than or equal: field <= value
formatBinaryCondition(call, fieldNames, " <= ")

case call: RexCall if call.getKind == SqlKind.EQUALS =>
// Handle equals: field = value
formatBinaryCondition(call, fieldNames, " = ")

case call: RexCall if call.getKind == SqlKind.NOT_EQUALS =>
// Handle not equals: field <> value
formatBinaryCondition(call, fieldNames, " <> ")

case call: RexCall if call.getKind == SqlKind.AND =>
// Handle composite conditions: cond1 AND cond2
val operands = call.getOperands
val formattedOperands = operands.map {
operand => formatFilterCondition(operand, fieldNames)
}
formattedOperands.mkString(" AND ")

case call: RexCall if call.getKind == SqlKind.OR =>
// Handle OR conditions
val operands = call.getOperands
val formattedOperands = operands.map {
operand => formatFilterCondition(operand, fieldNames)
}
formattedOperands.mkString(" OR ")

case call: RexCall if call.getKind == SqlKind.NOT =>
// Handle NOT condition
val operand = call.getOperands.get(0)
s"NOT ${formatFilterCondition(operand, fieldNames)}"

case _ =>
// Fallback to default formatting for complex expressions
FlinkRexUtil.getExpressionString(condition, fieldNames)
}
}

/** Formats a binary condition (e.g., field > value) */
private def formatBinaryCondition(
call: RexCall,
fieldNames: List[String],
operator: String): String = {
val operands = call.getOperands
if (operands.size == 2) {
val left = formatOperand(operands.get(0), fieldNames)
val right = formatOperand(operands.get(1), fieldNames)
s"$left$operator$right"
} else {
FlinkRexUtil.getExpressionString(call, fieldNames)
}
}

/** Formats a single operand (field reference or literal) */
private def formatOperand(operand: RexNode, fieldNames: List[String]): String = {

operand match {
case inputRef: RexInputRef =>
val index = inputRef.getIndex
fieldNames(index)
case _ =>
// For literals and complex expressions, use default formatting
FlinkRexUtil.getExpressionString(operand, fieldNames)
}
}

}

private class FlinkLogicalCalcConverter(config: Config) extends ConverterRule(config) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,18 @@ package org.apache.flink.table.planner.plan.nodes.logical

import org.apache.flink.table.planner.JList
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.utils.FlinkRexUtil

import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.{RelNode, RelWriter}
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.convert.ConverterRule.Config
import org.apache.calcite.rel.core.{CorrelationId, Join, JoinRelType}
import org.apache.calcite.rel.hint.RelHint
import org.apache.calcite.rel.logical.LogicalJoin
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rex.RexNode
import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
import org.apache.calcite.sql.SqlKind

import scala.collection.JavaConversions._

Expand Down Expand Up @@ -76,6 +78,94 @@ class FlinkLogicalJoin(
}
}

override def explainTerms(pw: RelWriter): RelWriter = {
val leftFieldCount = getLeft.getRowType.getFieldCount
val rightFieldCount = getRight.getRowType.getFieldCount
val leftFieldNames = getLeft.getRowType.getFieldNames.toList
val rightFieldNames = getRight.getRowType.getFieldNames.toList

// Format join condition with field names
val conditionStr = formatJoinCondition(
getCondition,
leftFieldNames,
rightFieldNames,
leftFieldCount
)

pw.input("left", getLeft)
.input("right", getRight)
.item("Type", joinType.toString)
.item("Condition", conditionStr)
}

/**
* Formats a join condition into a readable string with field names. Converts expressions like
* "=($0, $3)" into "orders.user_id = users.id"
*/
private def formatJoinCondition(
condition: RexNode,
leftFieldNames: List[String],
rightFieldNames: List[String],
leftFieldCount: Int): String = {

condition match {
case call: RexCall if call.getKind == SqlKind.EQUALS =>
// Handle simple equality: field1 = field2
val operands = call.getOperands
if (operands.size == 2) {
val left = formatOperand(operands.get(0), leftFieldNames, rightFieldNames, leftFieldCount)
val right =
formatOperand(operands.get(1), leftFieldNames, rightFieldNames, leftFieldCount)
s"$left = $right"
} else {
FlinkRexUtil.getExpressionString(condition, leftFieldNames ++ rightFieldNames)
}

case call: RexCall if call.getKind == SqlKind.AND =>
// Handle composite conditions: cond1 AND cond2
val operands = call.getOperands
val formattedOperands = operands.map {
operand => formatJoinCondition(operand, leftFieldNames, rightFieldNames, leftFieldCount)
}
formattedOperands.mkString(" AND ")

case call: RexCall if call.getKind == SqlKind.OR =>
// Handle OR conditions
val operands = call.getOperands
val formattedOperands = operands.map {
operand => formatJoinCondition(operand, leftFieldNames, rightFieldNames, leftFieldCount)
}
formattedOperands.mkString(" OR ")

case _ =>
// Fallback to default formatting for complex expressions
FlinkRexUtil.getExpressionString(condition, leftFieldNames ++ rightFieldNames)
}
}

/** Formats a single operand (field reference) with table context. */
private def formatOperand(
operand: RexNode,
leftFieldNames: List[String],
rightFieldNames: List[String],
leftFieldCount: Int): String = {

operand match {
case inputRef: RexInputRef =>
val index = inputRef.getIndex
if (index < leftFieldCount) {
// Field from left table
leftFieldNames(index)
} else {
// Field from right table
rightFieldNames(index - leftFieldCount)
}
case _ =>
// For complex expressions, use default formatting
FlinkRexUtil.getExpressionString(operand, leftFieldNames ++ rightFieldNames)
}
}

}

/** Support all joins. */
Expand Down