Skip to content

Commit 8ee33c1

Browse files
mbasmanovafacebook-github-bot
authored andcommitted
fix: TestIndexSource::initOutputProjections (facebookincubator#13855)
Summary: Pull Request resolved: facebookincubator#13855 TestIndexSource didn't support using column assignments in TableScan plan node to project columns under different names. Also, introduce TestIndexConnectorFactory::registerConnector helper method to reduce boiler plate. Reviewed By: xiaoxmeng Differential Revision: D77173098 fbshipit-source-id: f6cabde2f173506fe4382e83c2360b16256a9270
1 parent aad4438 commit 8ee33c1

File tree

6 files changed

+112
-79
lines changed

6 files changed

+112
-79
lines changed

velox/exec/tests/IndexLookupJoinTest.cpp

Lines changed: 28 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -80,25 +80,14 @@ class IndexLookupJoinTest : public IndexLookupJoinTestBase,
8080
connector::hive::HiveColumnHandle::registerSerDe();
8181
Type::registerSerDe();
8282
core::ITypedExpr::registerSerDe();
83-
connector::registerConnectorFactory(
84-
std::make_shared<TestIndexConnectorFactory>());
85-
std::shared_ptr<connector::Connector> connector =
86-
connector::getConnectorFactory(kTestIndexConnectorName)
87-
->newConnector(
88-
kTestIndexConnectorName,
89-
{},
90-
nullptr,
91-
connectorCpuExecutor_.get());
92-
connector::registerConnector(connector);
83+
TestIndexConnectorFactory::registerConnector(connectorCpuExecutor_.get());
9384

9485
keyType_ = ROW({"u0", "u1", "u2"}, {BIGINT(), BIGINT(), BIGINT()});
9586
valueType_ = ROW({"u3", "u4", "u5"}, {BIGINT(), BIGINT(), VARCHAR()});
9687
tableType_ = concat(keyType_, valueType_);
9788
probeType_ = ROW(
9889
{"t0", "t1", "t2", "t3", "t4", "t5"},
9990
{BIGINT(), BIGINT(), BIGINT(), BIGINT(), ARRAY(BIGINT()), VARCHAR()});
100-
101-
TestIndexTableHandle::registerSerDe();
10291
}
10392

10493
void TearDown() override {
@@ -115,13 +104,25 @@ class IndexLookupJoinTest : public IndexLookupJoinTestBase,
115104

116105
// Makes index table handle with the specified index table and async lookup
117106
// flag.
118-
std::shared_ptr<TestIndexTableHandle> makeIndexTableHandle(
107+
static std::shared_ptr<TestIndexTableHandle> makeIndexTableHandle(
119108
const std::shared_ptr<TestIndexTable>& indexTable,
120109
bool asyncLookup) {
121110
return std::make_shared<TestIndexTableHandle>(
122111
kTestIndexConnectorName, indexTable, asyncLookup);
123112
}
124113

114+
static std::
115+
unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>>
116+
makeIndexColumnHandles(const std::vector<std::string>& names) {
117+
std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>>
118+
handles;
119+
for (const auto& name : names) {
120+
handles.emplace(name, std::make_shared<TestIndexColumnHandle>(name));
121+
}
122+
123+
return handles;
124+
}
125+
125126
const std::unique_ptr<folly::CPUThreadPoolExecutor> connectorCpuExecutor_{
126127
std::make_unique<folly::CPUThreadPoolExecutor>(128)};
127128
};
@@ -173,8 +174,7 @@ TEST_P(IndexLookupJoinTest, joinCondition) {
173174
TEST_P(IndexLookupJoinTest, planNodeAndSerde) {
174175
TestIndexTableHandle::registerSerDe();
175176

176-
auto indexConnectorHandle = std::make_shared<TestIndexTableHandle>(
177-
kTestIndexConnectorName, nullptr, true);
177+
auto indexConnectorHandle = makeIndexTableHandle(nullptr, true);
178178

179179
auto left = makeRowVector(
180180
{"t0", "t1", "t2", "t3", "t4"},
@@ -201,15 +201,15 @@ TEST_P(IndexLookupJoinTest, planNodeAndSerde) {
201201
auto planBuilder = PlanBuilder();
202202
auto nonIndexTableScan = std::dynamic_pointer_cast<const core::TableScanNode>(
203203
PlanBuilder::TableScanBuilder(planBuilder)
204-
.outputType(std::dynamic_pointer_cast<const RowType>(right->type()))
204+
.outputType(asRowType(right->type()))
205205
.endTableScan()
206206
.planNode());
207207
VELOX_CHECK_NOT_NULL(nonIndexTableScan);
208208

209209
auto indexTableScan = std::dynamic_pointer_cast<const core::TableScanNode>(
210210
PlanBuilder::TableScanBuilder(planBuilder)
211211
.tableHandle(indexConnectorHandle)
212-
.outputType(std::dynamic_pointer_cast<const RowType>(right->type()))
212+
.outputType(asRowType(right->type()))
213213
.endTableScan()
214214
.planNode());
215215
VELOX_CHECK_NOT_NULL(indexTableScan);
@@ -735,13 +735,11 @@ TEST_P(IndexLookupJoinTest, equalJoin) {
735735
const auto indexTableHandle =
736736
makeIndexTableHandle(indexTable, GetParam().asyncLookup);
737737
auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
738-
std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>>
739-
columnHandles;
740738
const auto indexScanNode = makeIndexScanNode(
741739
planNodeIdGenerator,
742740
indexTableHandle,
743741
makeScanOutputType(testData.scanOutputColumns),
744-
columnHandles);
742+
makeIndexColumnHandles(testData.scanOutputColumns));
745743

746744
auto plan = makeLookupPlan(
747745
planNodeIdGenerator,
@@ -1193,13 +1191,11 @@ TEST_P(IndexLookupJoinTest, betweenJoinCondition) {
11931191
const auto indexTableHandle =
11941192
makeIndexTableHandle(indexTable, GetParam().asyncLookup);
11951193
auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
1196-
std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>>
1197-
columnHandles;
11981194
const auto indexScanNode = makeIndexScanNode(
11991195
planNodeIdGenerator,
12001196
indexTableHandle,
12011197
makeScanOutputType(testData.lookupOutputColumns),
1202-
columnHandles);
1198+
makeIndexColumnHandles(testData.lookupOutputColumns));
12031199

12041200
auto plan = makeLookupPlan(
12051201
planNodeIdGenerator,
@@ -1517,13 +1513,11 @@ TEST_P(IndexLookupJoinTest, inJoinCondition) {
15171513
const auto indexTableHandle =
15181514
makeIndexTableHandle(indexTable, GetParam().asyncLookup);
15191515
auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
1520-
std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>>
1521-
columnHandles;
15221516
const auto indexScanNode = makeIndexScanNode(
15231517
planNodeIdGenerator,
15241518
indexTableHandle,
15251519
makeScanOutputType(testData.lookupOutputColumns),
1526-
columnHandles);
1520+
makeIndexColumnHandles(testData.lookupOutputColumns));
15271521

15281522
auto plan = makeLookupPlan(
15291523
planNodeIdGenerator,
@@ -1568,13 +1562,11 @@ DEBUG_ONLY_TEST_P(IndexLookupJoinTest, connectorError) {
15681562
const auto indexTableHandle =
15691563
makeIndexTableHandle(indexTable, GetParam().asyncLookup);
15701564
auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
1571-
std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>>
1572-
columnHandles;
15731565
const auto indexScanNode = makeIndexScanNode(
15741566
planNodeIdGenerator,
15751567
indexTableHandle,
15761568
makeScanOutputType({"u0", "u1", "u2", "u5"}),
1577-
columnHandles);
1569+
makeIndexColumnHandles({"u0", "u1", "u2", "u5"}));
15781570

15791571
auto plan = makeLookupPlan(
15801572
planNodeIdGenerator,
@@ -1637,13 +1629,11 @@ DEBUG_ONLY_TEST_P(IndexLookupJoinTest, prefetch) {
16371629
const auto indexTableHandle =
16381630
makeIndexTableHandle(indexTable, GetParam().asyncLookup);
16391631
auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
1640-
std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>>
1641-
columnHandles;
16421632
const auto indexScanNode = makeIndexScanNode(
16431633
planNodeIdGenerator,
16441634
indexTableHandle,
16451635
makeScanOutputType({"u0", "u1", "u2", "u3", "u5"}),
1646-
columnHandles);
1636+
makeIndexColumnHandles({"u0", "u1", "u2", "u3", "u5"}));
16471637

16481638
auto plan = makeLookupPlan(
16491639
planNodeIdGenerator,
@@ -1739,13 +1729,12 @@ TEST_P(IndexLookupJoinTest, outputBatchSizeWithInnerJoin) {
17391729
const auto indexTableHandle =
17401730
makeIndexTableHandle(indexTable, GetParam().asyncLookup);
17411731
auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
1742-
std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>>
1743-
columnHandles;
1732+
17441733
const auto indexScanNode = makeIndexScanNode(
17451734
planNodeIdGenerator,
17461735
indexTableHandle,
17471736
makeScanOutputType({"u0", "u1", "u2", "u5"}),
1748-
columnHandles);
1737+
makeIndexColumnHandles({"u0", "u1", "u2", "u5"}));
17491738

17501739
auto plan = makeLookupPlan(
17511740
planNodeIdGenerator,
@@ -1847,13 +1836,11 @@ TEST_P(IndexLookupJoinTest, outputBatchSizeWithLeftJoin) {
18471836
const auto indexTableHandle =
18481837
makeIndexTableHandle(indexTable, GetParam().asyncLookup);
18491838
auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
1850-
std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>>
1851-
columnHandles;
18521839
const auto indexScanNode = makeIndexScanNode(
18531840
planNodeIdGenerator,
18541841
indexTableHandle,
18551842
makeScanOutputType({"u0", "u1", "u2", "u5"}),
1856-
columnHandles);
1843+
makeIndexColumnHandles({"u0", "u1", "u2", "u5"}));
18571844

18581845
auto plan = makeLookupPlan(
18591846
planNodeIdGenerator,
@@ -1919,13 +1906,11 @@ DEBUG_ONLY_TEST_P(IndexLookupJoinTest, runtimeStats) {
19191906
const auto indexTableHandle =
19201907
makeIndexTableHandle(indexTable, GetParam().asyncLookup);
19211908
auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
1922-
std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>>
1923-
columnHandles;
19241909
const auto indexScanNode = makeIndexScanNode(
19251910
planNodeIdGenerator,
19261911
indexTableHandle,
19271912
makeScanOutputType({"u0", "u1", "u2", "u3", "u5"}),
1928-
columnHandles);
1913+
makeIndexColumnHandles({"u0", "u1", "u2", "u3", "u5"}));
19291914

19301915
auto plan = makeLookupPlan(
19311916
planNodeIdGenerator,
@@ -2009,13 +1994,11 @@ TEST_P(IndexLookupJoinTest, barrier) {
20091994
const auto indexTableHandle =
20101995
makeIndexTableHandle(indexTable, GetParam().asyncLookup);
20111996
auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
2012-
std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>>
2013-
columnHandles;
20141997
const auto indexScanNode = makeIndexScanNode(
20151998
planNodeIdGenerator,
20161999
indexTableHandle,
20172000
makeScanOutputType({"u0", "u1", "u2", "u3", "u5"}),
2018-
columnHandles);
2001+
makeIndexColumnHandles({"u0", "u1", "u2", "u3", "u5"}));
20192002

20202003
auto plan = makeLookupPlan(
20212004
planNodeIdGenerator,
@@ -2084,13 +2067,11 @@ TEST_P(IndexLookupJoinTest, joinFuzzer) {
20842067
std::random_device rd;
20852068
std::mt19937 g(rd());
20862069
std::shuffle(scanOutput.begin(), scanOutput.end(), g);
2087-
std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>>
2088-
columnHandles;
20892070
const auto indexScanNode = makeIndexScanNode(
20902071
planNodeIdGenerator,
20912072
indexTableHandle,
20922073
makeScanOutputType(scanOutput),
2093-
columnHandles);
2074+
makeIndexColumnHandles(scanOutput));
20942075

20952076
auto plan = makeLookupPlan(
20962077
planNodeIdGenerator,

velox/exec/tests/utils/IndexLookupJoinTestBase.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ IndexLookupJoinTestBase::makeIndexScanNode(
283283
const std::shared_ptr<facebook::velox::connector::ConnectorTableHandle>
284284
indexTableHandle,
285285
const facebook::velox::RowTypePtr& outputType,
286-
std::unordered_map<
286+
const std::unordered_map<
287287
std::string,
288288
std::shared_ptr<facebook::velox::connector::ColumnHandle>>&
289289
assignments) {

velox/exec/tests/utils/IndexLookupJoinTestBase.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ class IndexLookupJoinTestBase
125125
const std::shared_ptr<facebook::velox::connector::ConnectorTableHandle>
126126
indexTableHandle,
127127
const facebook::velox::RowTypePtr& outputType,
128-
std::unordered_map<
128+
const std::unordered_map<
129129
std::string,
130130
std::shared_ptr<facebook::velox::connector::ColumnHandle>>&
131131
assignments);

velox/exec/tests/utils/TestIndexStorageConnector.cpp

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,9 @@ TestIndexSource::TestIndexSource(
176176
size_t numEqualJoinKeys,
177177
const core::TypedExprPtr& joinConditionExpr,
178178
const std::shared_ptr<TestIndexTableHandle>& tableHandle,
179+
const std::unordered_map<
180+
std::string,
181+
std::shared_ptr<TestIndexColumnHandle>>& columnHandles,
179182
connector::ConnectorQueryCtx* connectorQueryCtx,
180183
folly::Executor* executor)
181184
: tableHandle_(tableHandle),
@@ -192,7 +195,9 @@ TestIndexSource::TestIndexSource(
192195
: nullptr),
193196
pool_(connectorQueryCtx_->memoryPool()->shared_from_this()),
194197
executor_(executor) {
195-
VELOX_CHECK_NOT_NULL(executor_);
198+
if (tableHandle_->asyncLookup()) {
199+
VELOX_CHECK_NOT_NULL(executor_);
200+
}
196201
VELOX_CHECK_LE(outputType_->size(), valueType_->size() + keyType_->size());
197202
VELOX_CHECK_LE(numEqualJoinKeys_, keyType_->size());
198203
for (int i = 0; i < numEqualJoinKeys_; ++i) {
@@ -202,7 +207,7 @@ TestIndexSource::TestIndexSource(
202207
keyType_->toString(),
203208
inputType_->toString());
204209
}
205-
initOutputProjections();
210+
initOutputProjections(columnHandles);
206211
initConditionProjections();
207212
}
208213

@@ -260,20 +265,25 @@ void TestIndexSource::initConditionProjections() {
260265
conditionInputType_ = ROW(std::move(names), std::move(types));
261266
}
262267

263-
void TestIndexSource::initOutputProjections() {
268+
void TestIndexSource::initOutputProjections(
269+
const std::unordered_map<
270+
std::string,
271+
std::shared_ptr<TestIndexColumnHandle>>& columnHandles) {
264272
VELOX_CHECK(lookupOutputProjections_.empty());
273+
265274
lookupOutputProjections_.reserve(outputType_->size());
266275
for (auto outputChannel = 0; outputChannel < outputType_->size();
267276
++outputChannel) {
268277
const auto outputName = outputType_->nameOf(outputChannel);
269-
if (valueType_->containsChild(outputName)) {
270-
const auto tableValueChannel = valueType_->getChildIdx(outputName);
278+
const auto columnName = columnHandles.at(outputName)->name();
279+
if (valueType_->containsChild(columnName)) {
280+
const auto tableValueChannel = valueType_->getChildIdx(columnName);
271281
// The hash table layout is: index columns, value columns.
272282
lookupOutputProjections_.emplace_back(
273283
keyType_->size() + tableValueChannel, outputChannel);
274284
continue;
275285
}
276-
const auto tableKeyChannel = keyType_->getChildIdx(outputName);
286+
const auto tableKeyChannel = keyType_->getChildIdx(columnName);
277287
lookupOutputProjections_.emplace_back(tableKeyChannel, outputChannel);
278288
}
279289
VELOX_CHECK_EQ(lookupOutputProjections_.size(), outputType_->size());
@@ -547,12 +557,24 @@ std::shared_ptr<connector::IndexSource> TestIndexConnector::createIndexSource(
547557
const auto& indexTable = testIndexTableHandle->indexTable();
548558
auto joinConditionExpr =
549559
toJoinConditionExpr(joinConditions, indexTable, inputType, columnHandles);
560+
561+
std::unordered_map<std::string, std::shared_ptr<TestIndexColumnHandle>>
562+
testColumnHandles;
563+
for (const auto& [name, handle] : columnHandles) {
564+
auto testColumnHandle =
565+
std::dynamic_pointer_cast<TestIndexColumnHandle>(handle);
566+
VELOX_CHECK_NOT_NULL(testColumnHandle);
567+
568+
testColumnHandles.emplace(name, testColumnHandle);
569+
}
570+
550571
return std::make_shared<TestIndexSource>(
551572
inputType,
552573
outputType,
553574
numJoinKeys,
554-
std::move(joinConditionExpr),
555-
std::move(testIndexTableHandle),
575+
joinConditionExpr,
576+
testIndexTableHandle,
577+
testColumnHandles,
556578
connectorQueryCtx,
557579
executor_);
558580
}

0 commit comments

Comments
 (0)