Skip to content

Commit 4d4e4ad

Browse files
authored
Merge pull request mariadb-corporation#2741 from mariadb-corporation/MDEV-25080-CS-dev
MDEV-25080 Allow pushdown of queries involving UNIONs in outer select to ColumnStore
2 parents b267186 + 8bf545b commit 4d4e4ad

21 files changed

+985
-198
lines changed

.drone.jsonnet

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
local events = ['pull_request', 'cron'];
22

33
local servers = {
4-
develop: ['10.6-enterprise'],
5-
'develop-22.08': ['10.6-enterprise'],
4+
develop: ['10.6-MENT-1667'],
5+
'develop-22.08': ['10.6-MENT-1667'],
66
};
77

88
local platforms = {
@@ -108,7 +108,7 @@ local testPreparation(platform) =
108108
platform_map[platform];
109109

110110

111-
local Pipeline(branch, platform, event, arch='amd64', server='10.6-enterprise') = {
111+
local Pipeline(branch, platform, event, arch='amd64', server='10.6-MENT-1667') = {
112112
local pkg_format = if (std.split(platform, ':')[0] == 'centos' || std.split(platform, ':')[0] == 'rockylinux') then 'rpm' else 'deb',
113113
local init = if (pkg_format == 'rpm') then '/usr/lib/systemd/systemd' else 'systemd',
114114
local mtr_path = if (pkg_format == 'rpm') then '/usr/share/mysql-test' else '/usr/share/mysql/mysql-test',
@@ -125,7 +125,7 @@ local Pipeline(branch, platform, event, arch='amd64', server='10.6-enterprise')
125125
local container_tags = if (event == 'cron') then [brancht + std.strReplace(event, '_', '-') + '${DRONE_BUILD_NUMBER}', brancht] else [brancht + std.strReplace(event, '_', '-') + '${DRONE_BUILD_NUMBER}'],
126126
local container_version = branchp + event + '/${DRONE_BUILD_NUMBER}/' + server + '/' + arch,
127127

128-
local server_remote = if (std.endsWith(server, 'enterprise')) then 'https://github.com/mariadb-corporation/MariaDBEnterprise' else 'https://github.com/MariaDB/server',
128+
local server_remote = if (std.endsWith(server, 'enterprise') || std.endsWith(server, '10.6-MENT-1667')) then 'https://github.com/mariadb-corporation/MariaDBEnterprise' else 'https://github.com/MariaDB/server',
129129

130130
local sccache_arch = if (arch == 'amd64') then 'x86_64' else 'aarch64',
131131
local get_sccache = 'curl -L -o sccache.tar.gz https://github.com/mozilla/sccache/releases/download/v0.3.0/sccache-v0.3.0-' + sccache_arch + '-unknown-linux-musl.tar.gz ' +
@@ -693,8 +693,8 @@ local FinalPipeline(branch, event) = {
693693
'failure',
694694
],
695695
} + (if event == 'cron' then { cron: ['nightly-' + std.strReplace(branch, '.', '-')] } else {}),
696-
depends_on: std.map(function(p) std.join(' ', [branch, p, event, 'amd64', '10.6-enterprise']), platforms.develop) +
697-
std.map(function(p) std.join(' ', [branch, p, event, 'arm64', '10.6-enterprise']), platforms_arm.develop),
696+
depends_on: std.map(function(p) std.join(' ', [branch, p, event, 'amd64', '10.6-MENT-1667']), platforms.develop) +
697+
std.map(function(p) std.join(' ', [branch, p, event, 'arm64', '10.6-MENT-1667']), platforms_arm.develop),
698698
};
699699

700700

@@ -719,10 +719,10 @@ local FinalPipeline(branch, event) = {
719719
] +
720720

721721
[
722-
Pipeline(any_branch, p, 'custom', 'amd64', '10.6-enterprise')
722+
Pipeline(any_branch, p, 'custom', 'amd64', '10.6-MENT-1667')
723723
for p in platforms_custom
724724
] +
725725
[
726-
Pipeline(any_branch, p, 'custom', 'arm64', '10.6-enterprise')
726+
Pipeline(any_branch, p, 'custom', 'arm64', '10.6-MENT-1667')
727727
for p in platforms_arm_custom
728728
]

dbcon/execplan/calpontsystemcatalog.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6393,12 +6393,13 @@ boost::any CalpontSystemCatalog::ColType::convertColumnData(const std::string& d
63936393
}
63946394

63956395
CalpontSystemCatalog::ColType CalpontSystemCatalog::ColType::convertUnionColType(
6396-
vector<CalpontSystemCatalog::ColType>& types)
6396+
vector<CalpontSystemCatalog::ColType>& types,
6397+
unsigned int& rc)
63976398
{
63986399
idbassert(types.size());
63996400
CalpontSystemCatalog::ColType unionedType = types[0];
64006401
for (uint64_t i = 1; i < types.size(); i++)
6401-
dataconvert::DataConvert::joinColTypeForUnion(unionedType, types[i]);
6402+
dataconvert::DataConvert::joinColTypeForUnion(unionedType, types[i], rc);
64026403
return unionedType;
64036404
}
64046405

dbcon/execplan/calpontsystemcatalog.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ class CalpontSystemCatalog : public datatypes::SystemCatalog
306306
return !(*this == t);
307307
}
308308

309-
static ColType convertUnionColType(std::vector<ColType>&);
309+
static ColType convertUnionColType(std::vector<ColType>&, unsigned int&);
310310
};
311311

312312
/** the structure of a table infomation

dbcon/joblist/jlf_tuplejoblist.cpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5102,11 +5102,18 @@ SJSTEP unionQueries(JobStepVector& queries, uint64_t distinctUnionNum, JobInfo&
51025102
unionStep->inputAssociation(jsaToUnion);
51035103
unionStep->outputAssociation(jsa);
51045104

5105+
// This return code in the call to convertUnionColType() below would
5106+
// always be 0. This is because convertUnionColType() is also called
5107+
// in the connector code in getSelectPlan()/getGroupPlan() which handle
5108+
// the non-zero return code scenarios from this function call and error
5109+
// out, in which case, the execution does not even get to ExeMgr.
5110+
unsigned int dummyUnionedTypeRc = 0;
5111+
51055112
// get unioned column types
51065113
for (uint64_t j = 0; j < colCount; ++j)
51075114
{
51085115
CalpontSystemCatalog::ColType colType =
5109-
CalpontSystemCatalog::ColType::convertUnionColType(queryColTypes[j]);
5116+
CalpontSystemCatalog::ColType::convertUnionColType(queryColTypes[j], dummyUnionedTypeRc);
51105117
types.push_back(colType.colDataType);
51115118
csNums.push_back(colType.charsetNumber);
51125119
scale.push_back(colType.scale);

dbcon/joblist/tupleunion.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1766,12 +1766,14 @@ void TupleUnion::writeNull(Row* out, uint32_t col)
17661766
{
17671767
case 1: out->setUintField<1>(joblist::TINYINTNULL, col); break;
17681768

1769-
case 2: out->setUintField<1>(joblist::SMALLINTNULL, col); break;
1769+
case 2: out->setUintField<2>(joblist::SMALLINTNULL, col); break;
17701770

17711771
case 4: out->setUintField<4>(joblist::INTNULL, col); break;
17721772

17731773
case 8: out->setUintField<8>(joblist::BIGINTNULL, col); break;
17741774

1775+
case 16: out->setInt128Field(datatypes::Decimal128Null, col); break;
1776+
17751777
default:
17761778
{
17771779
}

dbcon/mysql/ha_mcs.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ group_by_handler* create_columnstore_group_by_handler(THD* thd, Query* query);
4545

4646
derived_handler* create_columnstore_derived_handler(THD* thd, TABLE_LIST* derived);
4747

48-
select_handler* create_columnstore_select_handler(THD* thd, SELECT_LEX* sel);
48+
select_handler* create_columnstore_select_handler(THD* thd, SELECT_LEX* sel_lex, SELECT_LEX_UNIT* sel_unit);
49+
select_handler* create_columnstore_unit_handler(THD* thd, SELECT_LEX_UNIT* sel_unit);
4950

5051
/* Variables for example share methods */
5152

@@ -1835,6 +1836,7 @@ static int columnstore_init_func(void* p)
18351836
mcs_hton->create_group_by = create_columnstore_group_by_handler;
18361837
mcs_hton->create_derived = create_columnstore_derived_handler;
18371838
mcs_hton->create_select = create_columnstore_select_handler;
1839+
mcs_hton->create_unit = create_columnstore_unit_handler;
18381840
mcs_hton->db_type = DB_TYPE_AUTOASSIGN;
18391841

18401842
#ifdef HAVE_PSI_INTERFACE

dbcon/mysql/ha_mcs_execplan.cpp

Lines changed: 42 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6682,7 +6682,8 @@ void setExecutionParams(gp_walk_info& gwi, SCSEP& csep)
66826682
* RETURNS
66836683
* error id as an int
66846684
***********************************************************/
6685-
int processFrom(bool& isUnion, SELECT_LEX& select_lex, gp_walk_info& gwi, SCSEP& csep)
6685+
int processFrom(bool& isUnion, SELECT_LEX& select_lex, gp_walk_info& gwi, SCSEP& csep,
6686+
bool isSelectHandlerTop, bool isSelectLexUnit)
66866687
{
66876688
// populate table map and trigger syscolumn cache for all the tables (@bug 1637).
66886689
// all tables on FROM list must have at least one col in colmap
@@ -6819,9 +6820,9 @@ int processFrom(bool& isUnion, SELECT_LEX& select_lex, gp_walk_info& gwi, SCSEP&
68196820
bool unionSel = false;
68206821
// UNION master unit check
68216822
// Existed pushdown handlers won't get in this scope
6822-
// except UNION pushdown that is to come.
6823+
// MDEV-25080 Union pushdown would enter this scope
68236824
// is_unit_op() give a segv for derived_handler's SELECT_LEX
6824-
if (!isUnion && select_lex.master_unit()->is_unit_op())
6825+
if (!isUnion && (!isSelectHandlerTop || isSelectLexUnit) && select_lex.master_unit()->is_unit_op())
68256826
{
68266827
// MCOL-2178 isUnion member only assigned, never used
68276828
// MIGR::infinidb_vtable.isUnion = true;
@@ -7383,7 +7384,8 @@ void buildInToExistsFilter(gp_walk_info& gwi, SELECT_LEX& select_lex)
73837384
* error id as an int
73847385
***********************************************************/
73857386
int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool isUnion,
7386-
bool isSelectHandlerTop, const std::vector<COND*>& condStack)
7387+
bool isSelectHandlerTop, bool isSelectLexUnit,
7388+
const std::vector<COND*>& condStack)
73877389
{
73887390
#ifdef DEBUG_WALK_COND
73897391
cerr << "getSelectPlan()" << endl;
@@ -7411,13 +7413,12 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i
74117413
CalpontSelectExecutionPlan::SelectList derivedTbList;
74127414
// @bug 1796. Remember table order on the FROM list.
74137415
gwi.clauseType = FROM;
7414-
if ((rc = processFrom(isUnion, select_lex, gwi, csep)))
7416+
if ((rc = processFrom(isUnion, select_lex, gwi, csep, isSelectHandlerTop,
7417+
isSelectLexUnit)))
74157418
{
74167419
return rc;
74177420
}
74187421

7419-
bool unionSel = (!isUnion && select_lex.master_unit()->is_unit_op()) ? true : false;
7420-
74217422
gwi.clauseType = WHERE;
74227423
if ((rc = processWhere(select_lex, gwi, csep, condStack)))
74237424
{
@@ -7860,25 +7861,32 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i
78607861
// @bug4388 normalize the project coltypes for union main select list
78617862
if (!csep->unionVec().empty())
78627863
{
7864+
unsigned int unionedTypeRc = 0;
7865+
78637866
for (uint32_t i = 0; i < gwi.returnedCols.size(); i++)
78647867
{
78657868
vector<CalpontSystemCatalog::ColType> coltypes;
78667869

78677870
for (uint32_t j = 0; j < csep->unionVec().size(); j++)
78687871
{
7869-
coltypes.push_back(dynamic_cast<CalpontSelectExecutionPlan*>(csep->unionVec()[j].get())
7870-
->returnedCols()[i]
7871-
->resultType());
7872+
CalpontSelectExecutionPlan* unionCsep =
7873+
dynamic_cast<CalpontSelectExecutionPlan*>(csep->unionVec()[j].get());
7874+
coltypes.push_back(unionCsep->returnedCols()[i]->resultType());
78727875

78737876
// @bug5976. set hasAggregate true for the main column if
78747877
// one corresponding union column has aggregate
7875-
if (dynamic_cast<CalpontSelectExecutionPlan*>(csep->unionVec()[j].get())
7876-
->returnedCols()[i]
7877-
->hasAggregate())
7878+
if (unionCsep->returnedCols()[i]->hasAggregate())
78787879
gwi.returnedCols[i]->hasAggregate(true);
78797880
}
78807881

7881-
gwi.returnedCols[i]->resultType(CalpontSystemCatalog::ColType::convertUnionColType(coltypes));
7882+
gwi.returnedCols[i]->resultType(CalpontSystemCatalog::ColType::convertUnionColType(coltypes, unionedTypeRc));
7883+
7884+
if (unionedTypeRc != 0)
7885+
{
7886+
gwi.parseErrorText = IDBErrorInfo::instance()->errorMsg(unionedTypeRc);
7887+
setError(gwi.thd, ER_CHECK_NOT_IMPLEMENTED, gwi.parseErrorText, gwi);
7888+
return ER_CHECK_NOT_IMPLEMENTED;
7889+
}
78827890
}
78837891
}
78847892

@@ -8047,6 +8055,8 @@ int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, bool i
80478055

80488056
SRCP minSc; // min width projected column. for count(*) use
80498057

8058+
bool unionSel = (!isUnion && select_lex.master_unit()->is_unit_op()) ? true : false;
8059+
80508060
// Group by list. not valid for union main query
80518061
if (!unionSel)
80528062
{
@@ -8805,9 +8815,10 @@ int cs_get_derived_plan(ha_columnstore_derived_handler* handler, THD* thd, SCSEP
88058815
return 0;
88068816
}
88078817

8808-
int cs_get_select_plan(ha_columnstore_select_handler* handler, THD* thd, SCSEP& csep, gp_walk_info& gwi)
8818+
int cs_get_select_plan(ha_columnstore_select_handler* handler, THD* thd, SCSEP& csep, gp_walk_info& gwi,
8819+
bool isSelectLexUnit)
88098820
{
8810-
SELECT_LEX& select_lex = *handler->select;
8821+
SELECT_LEX& select_lex = handler->select_lex ? *handler->select_lex : *handler->lex_unit->first_select();
88118822

88128823
if (select_lex.where)
88138824
{
@@ -8819,7 +8830,7 @@ int cs_get_select_plan(ha_columnstore_select_handler* handler, THD* thd, SCSEP&
88198830
convertOuterJoinToInnerJoin(&select_lex.top_join_list, gwi.tableOnExprList, gwi.condList,
88208831
handler->tableOuterJoinMap);
88218832

8822-
int status = getSelectPlan(gwi, select_lex, csep, false, true);
8833+
int status = getSelectPlan(gwi, select_lex, csep, false, true, isSelectLexUnit);
88238834

88248835
if (status > 0)
88258836
return ER_INTERNAL_ERROR;
@@ -9683,25 +9694,32 @@ int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, SCSEP& csep, cal_gro
96839694
// @bug4388 normalize the project coltypes for union main select list
96849695
if (!csep->unionVec().empty())
96859696
{
9697+
unsigned int unionedTypeRc = 0;
9698+
96869699
for (uint32_t i = 0; i < gwi.returnedCols.size(); i++)
96879700
{
96889701
vector<CalpontSystemCatalog::ColType> coltypes;
96899702

96909703
for (uint32_t j = 0; j < csep->unionVec().size(); j++)
96919704
{
9692-
coltypes.push_back(dynamic_cast<CalpontSelectExecutionPlan*>(csep->unionVec()[j].get())
9693-
->returnedCols()[i]
9694-
->resultType());
9705+
CalpontSelectExecutionPlan* unionCsep =
9706+
dynamic_cast<CalpontSelectExecutionPlan*>(csep->unionVec()[j].get());
9707+
coltypes.push_back(unionCsep->returnedCols()[i]->resultType());
96959708

96969709
// @bug5976. set hasAggregate true for the main column if
96979710
// one corresponding union column has aggregate
9698-
if (dynamic_cast<CalpontSelectExecutionPlan*>(csep->unionVec()[j].get())
9699-
->returnedCols()[i]
9700-
->hasAggregate())
9711+
if (unionCsep->returnedCols()[i]->hasAggregate())
97019712
gwi.returnedCols[i]->hasAggregate(true);
97029713
}
97039714

9704-
gwi.returnedCols[i]->resultType(CalpontSystemCatalog::ColType::convertUnionColType(coltypes));
9715+
gwi.returnedCols[i]->resultType(CalpontSystemCatalog::ColType::convertUnionColType(coltypes, unionedTypeRc));
9716+
9717+
if (unionedTypeRc != 0)
9718+
{
9719+
gwi.parseErrorText = IDBErrorInfo::instance()->errorMsg(unionedTypeRc);
9720+
setError(gwi.thd, ER_CHECK_NOT_IMPLEMENTED, gwi.parseErrorText, gwi);
9721+
return ER_CHECK_NOT_IMPLEMENTED;
9722+
}
97059723
}
97069724
}
97079725

dbcon/mysql/ha_mcs_impl.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1392,7 +1392,7 @@ uint32_t doUpdateDelete(THD* thd, gp_walk_info& gwi, const std::vector<COND*>& c
13921392

13931393
gwi.clauseType = WHERE;
13941394

1395-
if (getSelectPlan(gwi, select_lex, updateCP, false, false, condStack) !=
1395+
if (getSelectPlan(gwi, select_lex, updateCP, false, false, false, condStack) !=
13961396
0) //@Bug 3030 Modify the error message for unsupported functions
13971397
{
13981398
if (gwi.cs_vtable_is_update_with_derive)
@@ -4886,7 +4886,7 @@ int ha_mcs_impl_group_by_end(TABLE* table)
48864886
* RETURN:
48874887
* rc as int
48884888
***********************************************************/
4889-
int ha_mcs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table)
4889+
int ha_mcs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table, bool isSelectLexUnit)
48904890
{
48914891
IDEBUG(cout << "pushdown_init for table " << endl);
48924892
THD* thd = current_thd;
@@ -5076,7 +5076,7 @@ int ha_mcs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table)
50765076
if (handler_info->hndl_type == mcs_handler_types_t::SELECT)
50775077
{
50785078
sh = reinterpret_cast<ha_columnstore_select_handler*>(handler_info->hndl_ptr);
5079-
status = cs_get_select_plan(sh, thd, csep, gwi);
5079+
status = cs_get_select_plan(sh, thd, csep, gwi, isSelectLexUnit);
50805080
}
50815081
else if (handler_info->hndl_type == DERIVED)
50825082
{

dbcon/mysql/ha_mcs_impl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ extern int ha_mcs_impl_direct_update_delete_rows(bool execute, ha_rows* affected
4444
const std::vector<COND*>& condStack);
4545
extern int ha_mcs_impl_delete_row();
4646
extern int ha_mcs_impl_rnd_pos(uchar* buf, uchar* pos);
47-
extern int ha_mcs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table);
47+
extern int ha_mcs_impl_pushdown_init(mcs_handler_info* handler_info, TABLE* table, bool isSelectLexUnit = false);
4848
extern int ha_mcs_impl_select_next(uchar* buf, TABLE* table, long timeZone);
4949
extern int ha_mcs_impl_group_by_init(mcs_handler_info* handler_info, TABLE* table);
5050
extern int ha_mcs_impl_group_by_next(TABLE* table, long timeZone);

dbcon/mysql/ha_mcs_impl_if.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -397,9 +397,9 @@ int cp_get_group_plan(THD* thd, execplan::SCSEP& csep, cal_impl_if::cal_group_in
397397
int cs_get_derived_plan(ha_columnstore_derived_handler* handler, THD* thd, execplan::SCSEP& csep,
398398
gp_walk_info& gwi);
399399
int cs_get_select_plan(ha_columnstore_select_handler* handler, THD* thd, execplan::SCSEP& csep,
400-
gp_walk_info& gwi);
400+
gp_walk_info& gwi, bool isSelectLexUnit);
401401
int getSelectPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, execplan::SCSEP& csep, bool isUnion = false,
402-
bool isSelectHandlerTop = false,
402+
bool isSelectHandlerTop = false, bool isSelectLexUnit = false,
403403
const std::vector<COND*>& condStack = std::vector<COND*>());
404404
int getGroupPlan(gp_walk_info& gwi, SELECT_LEX& select_lex, execplan::SCSEP& csep, cal_group_info& gi,
405405
bool isUnion = false);

0 commit comments

Comments
 (0)