Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,12 @@ Block ScanHashMapAfterProbeBlockInputStream::readImpl()
else
fillColumnsUsingCurrentPartition<false, true>(columns_left, columns_right, row_counter_column);
break;
case ASTTableJoin::Kind::Full:
if (parent.has_other_condition)
fillColumnsUsingCurrentPartition<true, false>(columns_left, columns_right, row_counter_column);
else
fillColumnsUsingCurrentPartition<false, false>(columns_left, columns_right, row_counter_column);
break;
case ASTTableJoin::Kind::RightAnti:
case ASTTableJoin::Kind::RightOuter:
if (parent.has_other_condition)
Expand Down
116 changes: 56 additions & 60 deletions dbms/src/Debug/MockExecutor/JoinBinder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,63 @@
#include <Debug/MockExecutor/ExecutorBinder.h>
#include <Debug/MockExecutor/JoinBinder.h>
#include <Flash/Coprocessor/DAGCodec.h>
#include <Flash/Coprocessor/JoinInterpreterHelper.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTTablesInSelectQuery.h>

namespace DB::mock
{
namespace
{
void appendJoinSchema(DAGSchema & output_schema, const DAGSchema & input_schema, bool make_nullable)
{
for (const auto & field : input_schema)
{
if (make_nullable && field.second.hasNotNullFlag())
output_schema.push_back(toNullableDAGColumnInfo(field));
else
output_schema.push_back(field);
}
}

void buildLeftSideJoinSchema(DAGSchema & schema, const DAGSchema & left_schema, tipb::JoinType tp)
{
appendJoinSchema(schema, left_schema, JoinInterpreterHelper::makeLeftJoinSideNullable(tp));
}

void buildRightSideJoinSchema(DAGSchema & schema, const DAGSchema & right_schema, tipb::JoinType tp)
{
/// Note: for semi join, the right table column is ignored
/// but for (anti) left outer semi join, a 1/0 (uint8) field is pushed back
/// indicating whether right table has matching row(s), see comment in ASTTableJoin::Kind for details.
if (tp == tipb::JoinType::TypeLeftOuterSemiJoin || tp == tipb::JoinType::TypeAntiLeftOuterSemiJoin)
{
tipb::FieldType field_type{};
field_type.set_tp(TiDB::TypeTiny);
field_type.set_charset("binary");
field_type.set_collate(TiDB::ITiDBCollator::BINARY);
field_type.set_flag(0);
field_type.set_flen(-1);
field_type.set_decimal(-1);
schema.push_back(std::make_pair("", TiDB::fieldTypeToColumnInfo(field_type)));
}
else if (tp != tipb::JoinType::TypeSemiJoin && tp != tipb::JoinType::TypeAntiSemiJoin)
{
appendJoinSchema(schema, right_schema, JoinInterpreterHelper::makeRightJoinSideNullable(tp));
}
}

DAGSchema buildOtherConditionSchema(
const DAGSchema & left_schema,
const DAGSchema & right_schema,
tipb::JoinType join_type)
{
DAGSchema merged_children_schema;
appendJoinSchema(merged_children_schema, left_schema, JoinInterpreterHelper::makeLeftJoinSideNullable(join_type));
appendJoinSchema(merged_children_schema, right_schema, JoinInterpreterHelper::makeRightJoinSideNullable(join_type));
return merged_children_schema;
}
} // namespace

void JoinBinder::addRuntimeFilter(MockRuntimeFilter & rf)
{
Expand Down Expand Up @@ -95,22 +147,8 @@ void JoinBinder::columnPrune(std::unordered_set<String> & used_columns)

/// update output schema
output_schema.clear();

for (auto & field : children[0]->output_schema)
{
if (tp == tipb::TypeRightOuterJoin && field.second.hasNotNullFlag())
output_schema.push_back(toNullableDAGColumnInfo(field));
else
output_schema.push_back(field);
}

for (auto & field : children[1]->output_schema)
{
if (tp == tipb::TypeLeftOuterJoin && field.second.hasNotNullFlag())
output_schema.push_back(toNullableDAGColumnInfo(field));
else
output_schema.push_back(field);
}
buildLeftSideJoinSchema(output_schema, children[0]->output_schema, tp);
buildRightSideJoinSchema(output_schema, children[1]->output_schema, tp);
}

void JoinBinder::fillJoinKeyAndFieldType(
Expand Down Expand Up @@ -187,11 +225,8 @@ bool JoinBinder::toTiPBExecutor(
astToPB(children[1]->output_schema, expr, cond, collator_id, context);
}

DAGSchema merged_children_schema{children[0]->output_schema};
merged_children_schema.insert(
merged_children_schema.end(),
children[1]->output_schema.begin(),
children[1]->output_schema.end());
DAGSchema merged_children_schema
= buildOtherConditionSchema(children[0]->output_schema, children[1]->output_schema, tp);

for (const auto & expr : other_conds)
{
Expand Down Expand Up @@ -293,45 +328,6 @@ void JoinBinder::toMPPSubPlan(
exchange_map[right_exchange_receiver->name] = std::make_pair(right_exchange_receiver, right_exchange_sender);
}

static void buildLeftSideJoinSchema(DAGSchema & schema, const DAGSchema & left_schema, tipb::JoinType tp)
{
for (const auto & field : left_schema)
{
if (tp == tipb::JoinType::TypeRightOuterJoin && field.second.hasNotNullFlag())
schema.push_back(toNullableDAGColumnInfo(field));
else
schema.push_back(field);
}
}

static void buildRightSideJoinSchema(DAGSchema & schema, const DAGSchema & right_schema, tipb::JoinType tp)
{
/// Note: for semi join, the right table column is ignored
/// but for (anti) left outer semi join, a 1/0 (uint8) field is pushed back
/// indicating whether right table has matching row(s), see comment in ASTTableJoin::Kind for details.
if (tp == tipb::JoinType::TypeLeftOuterSemiJoin || tp == tipb::JoinType::TypeAntiLeftOuterSemiJoin)
{
tipb::FieldType field_type{};
field_type.set_tp(TiDB::TypeTiny);
field_type.set_charset("binary");
field_type.set_collate(TiDB::ITiDBCollator::BINARY);
field_type.set_flag(0);
field_type.set_flen(-1);
field_type.set_decimal(-1);
schema.push_back(std::make_pair("", TiDB::fieldTypeToColumnInfo(field_type)));
}
else if (tp != tipb::JoinType::TypeSemiJoin && tp != tipb::JoinType::TypeAntiSemiJoin)
{
for (const auto & field : right_schema)
{
if (tp == tipb::JoinType::TypeLeftOuterJoin && field.second.hasNotNullFlag())
schema.push_back(toNullableDAGColumnInfo(field));
else
schema.push_back(field);
}
}
}

// compileJoin constructs a mocked Join executor node, note that all conditional expression params can be default
ExecutorBinderPtr compileJoin(
size_t & executor_index,
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -847,6 +847,8 @@ String getJoinTypeName(const tipb::JoinType & tp)
return "LeftOuterJoin";
case tipb::JoinType::TypeRightOuterJoin:
return "RightOuterJoin";
case tipb::JoinType::TypeFullOuterJoin:
return "FullOuterJoin";
case tipb::JoinType::TypeLeftOuterSemiJoin:
return "LeftOuterSemiJoin";
case tipb::JoinType::TypeAntiSemiJoin:
Expand Down
17 changes: 11 additions & 6 deletions dbms/src/Flash/Coprocessor/JoinInterpreterHelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ std::pair<ASTTableJoin::Kind, size_t> getJoinKindAndBuildSideIndex(
/// 3. for non-cross left/right outer join, there is no problem in this swap.
/// 4. for cross left outer join, the build side is always right, needn't and can't swap.
/// 5. for cross right outer join, the build side is always left, so it will always swap and change to cross left outer join.
/// 6. for non-cross full outer join, keep full join kind and respect inner_idx as build side.
/// note that whatever the build side is, we can't support cross-right-outer join now.
static const std::unordered_map<
std::pair<tipb::JoinType, size_t>,
Expand All @@ -72,6 +73,8 @@ std::pair<ASTTableJoin::Kind, size_t> getJoinKindAndBuildSideIndex(
{{tipb::JoinType::TypeLeftOuterJoin, 1}, {ASTTableJoin::Kind::LeftOuter, 1}},
{{tipb::JoinType::TypeRightOuterJoin, 0}, {ASTTableJoin::Kind::LeftOuter, 0}},
{{tipb::JoinType::TypeRightOuterJoin, 1}, {ASTTableJoin::Kind::RightOuter, 1}},
{{tipb::JoinType::TypeFullOuterJoin, 0}, {ASTTableJoin::Kind::Full, 0}},
{{tipb::JoinType::TypeFullOuterJoin, 1}, {ASTTableJoin::Kind::Full, 1}},
{{tipb::JoinType::TypeSemiJoin, 0}, {ASTTableJoin::Kind::RightSemi, 0}},
{{tipb::JoinType::TypeSemiJoin, 1}, {ASTTableJoin::Kind::Semi, 1}},
{{tipb::JoinType::TypeAntiSemiJoin, 0}, {ASTTableJoin::Kind::RightAnti, 0}},
Expand Down Expand Up @@ -103,6 +106,8 @@ std::pair<ASTTableJoin::Kind, size_t> getJoinKindAndBuildSideIndex(
{{tipb::JoinType::TypeAntiLeftOuterSemiJoin, 1}, {ASTTableJoin::Kind::NullAware_LeftOuterAnti, 1}}};

RUNTIME_ASSERT(inner_index == 0 || inner_index == 1);
if (unlikely(tipb_join_type == tipb::JoinType::TypeFullOuterJoin && join_keys_size == 0))
throw TiFlashException("Cartesian full outer join is not supported yet", Errors::Coprocessor::BadRequest);
const auto & join_type_map = [is_null_aware, join_keys_size]() {
if (is_null_aware)
{
Expand Down Expand Up @@ -295,8 +300,8 @@ NamesAndTypes TiFlashJoin::genColumnsForOtherJoinFilter(
column_set_for_origin_columns.emplace(p.name);
}
};
append_origin_columns(left_cols, join.join_type() == tipb::JoinType::TypeRightOuterJoin);
append_origin_columns(right_cols, join.join_type() == tipb::JoinType::TypeLeftOuterJoin);
append_origin_columns(left_cols, makeLeftJoinSideNullable(join.join_type()));
append_origin_columns(right_cols, makeRightJoinSideNullable(join.join_type()));

/// append the columns generated by probe side prepare join actions.
/// the new columns are
Expand All @@ -310,8 +315,8 @@ NamesAndTypes TiFlashJoin::genColumnsForOtherJoinFilter(
columns_for_other_join_filter.emplace_back(p.name, make_nullable ? makeNullable(p.type) : p.type);
}
};
bool make_nullable = build_side_index == 1 ? join.join_type() == tipb::JoinType::TypeRightOuterJoin
: join.join_type() == tipb::JoinType::TypeLeftOuterJoin;
bool make_nullable = build_side_index == 1 ? makeLeftJoinSideNullable(join.join_type())
: makeRightJoinSideNullable(join.join_type());
append_new_columns(probe_prepare_join_actions->getSampleBlock(), make_nullable);

return columns_for_other_join_filter;
Expand All @@ -330,11 +335,11 @@ NamesAndTypes TiFlashJoin::genJoinOutputColumns(
}
};

append_output_columns(left_cols, join.join_type() == tipb::JoinType::TypeRightOuterJoin);
append_output_columns(left_cols, makeLeftJoinSideNullable(join.join_type()));
if (!isSemiFamily() && !isLeftOuterSemiFamily())
{
/// for (left outer) semi join, the columns from right table will be ignored
append_output_columns(right_cols, join.join_type() == tipb::JoinType::TypeLeftOuterJoin);
append_output_columns(right_cols, makeRightJoinSideNullable(join.join_type()));
}

if (!match_helper_name.empty())
Expand Down
14 changes: 12 additions & 2 deletions dbms/src/Flash/Coprocessor/JoinInterpreterHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,9 @@ struct JoinNonEqualConditions
/// Validate this JoinNonEqualConditions and return error message if any.
const char * validate(ASTTableJoin::Kind kind) const
{
if unlikely (!left_filter_column.empty() && !isLeftOuterJoin(kind))
if unlikely (!left_filter_column.empty() && !(isLeftOuterJoin(kind) || kind == ASTTableJoin::Kind::Full))
return "non left join with left conditions";
if unlikely (!right_filter_column.empty() && !isRightOuterJoin(kind))
if unlikely (!right_filter_column.empty() && !(isRightOuterJoin(kind) || kind == ASTTableJoin::Kind::Full))
return "non right join with right conditions";

if unlikely ((!other_cond_name.empty() || !other_eq_cond_from_in_name.empty()) && other_cond_expr == nullptr)
Expand All @@ -119,6 +119,16 @@ struct JoinNonEqualConditions

namespace JoinInterpreterHelper
{
constexpr bool makeLeftJoinSideNullable(tipb::JoinType join_type)
{
return join_type == tipb::JoinType::TypeRightOuterJoin || join_type == tipb::JoinType::TypeFullOuterJoin;
}

constexpr bool makeRightJoinSideNullable(tipb::JoinType join_type)
{
return join_type == tipb::JoinType::TypeLeftOuterJoin || join_type == tipb::JoinType::TypeFullOuterJoin;
}

struct TiFlashJoin
{
TiFlashJoin(const tipb::Join & join_, bool is_test);
Expand Down
9 changes: 5 additions & 4 deletions dbms/src/Flash/Coprocessor/collectOutputFieldTypes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <Flash/Coprocessor/DAGCodec.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Flash/Coprocessor/JoinInterpreterHelper.h>
#include <Flash/Coprocessor/collectOutputFieldTypes.h>
#include <TiDB/Schema/TiDB.h>
#include <common/types.h>
Expand Down Expand Up @@ -172,9 +173,9 @@ bool collectForJoin(std::vector<tipb::FieldType> & output_field_types, const tip
// collect output_field_types for join self
for (auto & field_type : children_output_field_types[0])
{
if (executor.join().join_type() == tipb::JoinType::TypeRightOuterJoin)
if (JoinInterpreterHelper::makeLeftJoinSideNullable(executor.join().join_type()))
{
/// the type of left column for right join is always nullable
/// the type of left column for right/full join is always nullable
auto updated_field_type = field_type;
updated_field_type.set_flag(
static_cast<UInt32>(updated_field_type.flag()) & (~static_cast<UInt32>(TiDB::ColumnFlagNotNull)));
Expand Down Expand Up @@ -210,9 +211,9 @@ bool collectForJoin(std::vector<tipb::FieldType> & output_field_types, const tip
/// for semi/anti semi join, the right table column is ignored
for (auto & field_type : children_output_field_types[1])
{
if (executor.join().join_type() == tipb::JoinType::TypeLeftOuterJoin)
if (JoinInterpreterHelper::makeRightJoinSideNullable(executor.join().join_type()))
{
/// the type of right column for left join is always nullable
/// the type of right column for left/full join is always nullable
auto updated_field_type = field_type;
updated_field_type.set_flag(
updated_field_type.flag() & (~static_cast<UInt32>(TiDB::ColumnFlagNotNull)));
Expand Down
Loading