Skip to content

Commit ca06b28

Browse files
zhangyue-hashdatamy-ship-it
authored and committed
Push the runtime filter from HashJoin down to SeqScan.
+----------+  AttrFilter   +------+  ScanKey   +---------+
| HashJoin | ------------> | Hash | ---------> | SeqScan |
+----------+               +------+            +---------+

If "gp_enable_runtime_filter_pushdown" is on, three steps are run:

Step 1. In ExecInitHashJoin(), try to find the mapping between the Var in the hashclauses and the Var in the SeqScan. If one is found, save the mapping in an AttrFilter and push it to the Hash node.
Step 2. Create the range/bloom filters in the AttrFilter while building the hash table; when the build finishes, these filters are converted to a list of ScanKeys and pushed down to the SeqScan.
Step 3. The ScanKeys are used to filter slots in the SeqScan.

TODO:
1. support single-node or utility mode;
2. support null value filter;
3. support Motion, SharedScan as target node;
4. support join quals like: t1.c1 = t2.c1+5;
1 parent e1c99e4 commit ca06b28

File tree

14 files changed

+1106
-8
lines changed

14 files changed

+1106
-8
lines changed

src/backend/commands/explain.c

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,9 @@ static void show_incremental_sort_info(IncrementalSortState *incrsortstate,
139139
static void show_hash_info(HashState *hashstate, ExplainState *es);
140140
static void show_runtime_filter_info(RuntimeFilterState *rfstate,
141141
ExplainState *es);
142+
static void show_pushdown_runtime_filter_info(const char *qlabel,
143+
PlanState *planstate,
144+
ExplainState *es);
142145
static void show_memoize_info(MemoizeState *mstate, List *ancestors,
143146
ExplainState *es);
144147
static void show_hashagg_info(AggState *hashstate, ExplainState *es);
@@ -2495,6 +2498,10 @@ ExplainNode(PlanState *planstate, List *ancestors,
24952498
/* fall through to print additional fields the same as SeqScan */
24962499
/* FALLTHROUGH */
24972500
case T_SeqScan:
2501+
if (gp_enable_runtime_filter_pushdown && IsA(planstate, SeqScanState))
2502+
show_pushdown_runtime_filter_info("Rows Removed by Pushdown Runtime Filter",
2503+
planstate, es);
2504+
/* FALLTHROUGH */
24982505
case T_DynamicSeqScan:
24992506
case T_ValuesScan:
25002507
case T_CteScan:
@@ -4365,6 +4372,24 @@ show_instrumentation_count(const char *qlabel, int which,
43654372
}
43664373
}
43674374

4375+
/*
4376+
* If it's EXPLAIN ANALYZE, show instrumentation information with pushdown
4377+
* runtime filter.
4378+
*/
4379+
static void
4380+
show_pushdown_runtime_filter_info(const char *qlabel,
4381+
PlanState *planstate,
4382+
ExplainState *es)
4383+
{
4384+
Assert(gp_enable_runtime_filter_pushdown && IsA(planstate, SeqScanState));
4385+
4386+
if (!es->analyze || !planstate->instrument)
4387+
return;
4388+
4389+
if (planstate->instrument->prf_work)
4390+
ExplainPropertyFloat(qlabel, NULL, planstate->instrument->nfilteredPRF, 0, es);
4391+
}
4392+
43684393
/*
43694394
* Show extra information for a ForeignScan node.
43704395
*/

src/backend/commands/explain_gp.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ typedef struct CdbExplain_StatInst
5656
double nloops; /* # of run cycles for this node */
5757
double nfiltered1;
5858
double nfiltered2;
59+
bool prf_work;
60+
double nfilteredPRF;
5961
double execmemused; /* executor memory used (bytes) */
6062
double workmemused; /* work_mem actually used (bytes) */
6163
double workmemwanted; /* work_mem to avoid workfile i/o (bytes) */
@@ -885,6 +887,8 @@ cdbexplain_collectStatsFromNode(PlanState *planstate, CdbExplain_SendStatCtx *ct
885887
si->nloops = instr->nloops;
886888
si->nfiltered1 = instr->nfiltered1;
887889
si->nfiltered2 = instr->nfiltered2;
890+
si->prf_work = instr->prf_work;
891+
si->nfilteredPRF = instr->nfilteredPRF;
888892
si->workmemused = instr->workmemused;
889893
si->workmemwanted = instr->workmemwanted;
890894
si->workfileCreated = instr->workfileCreated;
@@ -1188,6 +1192,8 @@ cdbexplain_depositStatsToNode(PlanState *planstate, CdbExplain_RecvStatCtx *ctx)
11881192
instr->nloops = nodeAcc->nsimax->nloops;
11891193
instr->nfiltered1 = nodeAcc->nsimax->nfiltered1;
11901194
instr->nfiltered2 = nodeAcc->nsimax->nfiltered2;
1195+
instr->prf_work = nodeAcc->nsimax->prf_work;
1196+
instr->nfilteredPRF = nodeAcc->nsimax->nfilteredPRF;
11911197
instr->execmemused = nodeAcc->nsimax->execmemused;
11921198
instr->workmemused = nodeAcc->nsimax->workmemused;
11931199
instr->workmemwanted = nodeAcc->nsimax->workmemwanted;

src/backend/executor/nodeHash.c

Lines changed: 158 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,10 @@ static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable,
101101
size_t size);
102102
static void ExecParallelHashMergeCounters(HashJoinTable hashtable);
103103
static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable);
104-
104+
static void AddTupleValuesIntoRF(HashState *node, TupleTableSlot *slot);
105+
static void PushdownRuntimeFilter(HashState *node);
106+
static void FreeRuntimeFilter(HashState *node);
107+
static void ResetRuntimeFilter(HashState *node);
105108

106109
/* ----------------------------------------------------------------
107110
* ExecHash
@@ -192,7 +195,15 @@ MultiExecPrivateHash(HashState *node)
192195
{
193196
slot = ExecProcNode(outerNode);
194197
if (TupIsNull(slot))
198+
{
199+
if (gp_enable_runtime_filter_pushdown && node->filters)
200+
PushdownRuntimeFilter(node);
195201
break;
202+
}
203+
204+
if (gp_enable_runtime_filter_pushdown && node->filters)
205+
AddTupleValuesIntoRF(node, slot);
206+
196207
/* We have to compute the hash value */
197208
econtext->ecxt_outertuple = slot;
198209
bool hashkeys_null = false;
@@ -334,6 +345,7 @@ MultiExecParallelHash(HashState *node)
334345
slot = ExecProcNode(outerNode);
335346
if (TupIsNull(slot))
336347
break;
348+
337349
econtext->ecxt_outertuple = slot;
338350
if (ExecHashGetHashValue(node, hashtable, econtext, hashkeys,
339351
false, hashtable->keepNulls,
@@ -511,6 +523,9 @@ ExecEndHash(HashState *node)
511523
*/
512524
outerPlan = outerPlanState(node);
513525
ExecEndNode(outerPlan);
526+
527+
if (node->filters)
528+
FreeRuntimeFilter(node);
514529
}
515530

516531

@@ -2519,6 +2534,9 @@ ExecReScanHash(HashState *node)
25192534
*/
25202535
if (node->ps.lefttree->chgParam == NULL)
25212536
ExecReScan(node->ps.lefttree);
2537+
2538+
if (gp_enable_runtime_filter_pushdown && node->filters)
2539+
ResetRuntimeFilter(node);
25222540
}
25232541

25242542

@@ -4125,3 +4143,142 @@ get_hash_mem(void)
41254143

41264144
return (int) mem_limit;
41274145
}
4146+
4147+
/*
4148+
* Convert AttrFilter to ScanKeyData and send these runtime filters to the
4149+
* target node(seqscan).
4150+
*/
4151+
void
4152+
PushdownRuntimeFilter(HashState *node)
4153+
{
4154+
ListCell *lc;
4155+
List *scankeys;
4156+
ScanKey sk;
4157+
AttrFilter *attr_filter;
4158+
4159+
foreach (lc, node->filters)
4160+
{
4161+
scankeys = NIL;
4162+
4163+
attr_filter = lfirst(lc);
4164+
if (!IsA(attr_filter->target, SeqScanState) || attr_filter->empty)
4165+
continue;
4166+
4167+
/* bloom filter */
4168+
sk = (ScanKey)palloc0(sizeof(ScanKeyData));
4169+
sk->sk_flags = SK_BLOOM_FILTER;
4170+
sk->sk_attno = attr_filter->lattno;
4171+
sk->sk_subtype = INT8OID;
4172+
sk->sk_argument = PointerGetDatum(attr_filter->blm_filter);
4173+
scankeys = lappend(scankeys, sk);
4174+
4175+
/* range filter */
4176+
sk = (ScanKey)palloc0(sizeof(ScanKeyData));
4177+
sk->sk_flags = 0;
4178+
sk->sk_attno = attr_filter->lattno;
4179+
sk->sk_strategy = BTGreaterEqualStrategyNumber;
4180+
sk->sk_subtype = INT8OID;
4181+
sk->sk_argument = attr_filter->min;
4182+
scankeys = lappend(scankeys, sk);
4183+
4184+
sk = (ScanKey)palloc0(sizeof(ScanKeyData));
4185+
sk->sk_flags = 0;
4186+
sk->sk_attno = attr_filter->lattno;
4187+
sk->sk_strategy = BTLessEqualStrategyNumber;
4188+
sk->sk_subtype = INT8OID;
4189+
sk->sk_argument = attr_filter->max;
4190+
scankeys = lappend(scankeys, sk);
4191+
4192+
/* append new runtime filters to target node */
4193+
SeqScanState *sss = castNode(SeqScanState, attr_filter->target);
4194+
sss->filters = list_concat(sss->filters, scankeys);
4195+
}
4196+
}
4197+
4198+
static void
4199+
AddTupleValuesIntoRF(HashState *node, TupleTableSlot *slot)
4200+
{
4201+
Datum val;
4202+
bool isnull;
4203+
ListCell *lc;
4204+
AttrFilter *attr_filter;
4205+
4206+
foreach (lc, node->filters)
4207+
{
4208+
attr_filter = (AttrFilter *) lfirst(lc);
4209+
4210+
val = slot_getattr(slot, attr_filter->rattno, &isnull);
4211+
if (isnull)
4212+
continue;
4213+
4214+
attr_filter->empty = false;
4215+
4216+
if ((int64_t)val < (int64_t)attr_filter->min)
4217+
attr_filter->min = val;
4218+
4219+
if ((int64_t)val > (int64_t)attr_filter->max)
4220+
attr_filter->max = val;
4221+
4222+
if (attr_filter->blm_filter)
4223+
bloom_add_element(attr_filter->blm_filter, (unsigned char *)&val, sizeof(Datum));
4224+
}
4225+
}
4226+
4227+
void
4228+
FreeRuntimeFilter(HashState *node)
4229+
{
4230+
ListCell *lc;
4231+
AttrFilter *attr_filter;
4232+
4233+
if (!node->filters)
4234+
return;
4235+
4236+
foreach (lc, node->filters)
4237+
{
4238+
attr_filter = lfirst(lc);
4239+
if (attr_filter->blm_filter)
4240+
bloom_free(attr_filter->blm_filter);
4241+
}
4242+
4243+
list_free_deep(node->filters);
4244+
node->filters = NIL;
4245+
}
4246+
4247+
void
4248+
ResetRuntimeFilter(HashState *node)
4249+
{
4250+
ListCell *lc;
4251+
AttrFilter *attr_filter;
4252+
SeqScanState *sss;
4253+
4254+
if (!node->filters)
4255+
return;
4256+
4257+
foreach (lc, node->filters)
4258+
{
4259+
attr_filter = lfirst(lc);
4260+
attr_filter->empty = true;
4261+
4262+
if (IsA(attr_filter->target, SeqScanState))
4263+
{
4264+
sss = castNode(SeqScanState, attr_filter->target);
4265+
if (sss->filters)
4266+
{
4267+
list_free_deep(sss->filters);
4268+
sss->filters = NIL;
4269+
}
4270+
}
4271+
4272+
if (attr_filter->blm_filter)
4273+
bloom_free(attr_filter->blm_filter);
4274+
4275+
attr_filter->blm_filter = bloom_create_aggresive(node->ps.plan->plan_rows,
4276+
work_mem,
4277+
random());
4278+
4279+
StaticAssertDecl(sizeof(LONG_MAX) == sizeof(Datum), "sizeof(LONG_MAX) should be equal to sizeof(Datum)");
4280+
StaticAssertDecl(sizeof(LONG_MIN) == sizeof(Datum), "sizeof(LONG_MIN) should be equal to sizeof(Datum)");
4281+
attr_filter->min = LONG_MAX;
4282+
attr_filter->max = LONG_MIN;
4283+
}
4284+
}

0 commit comments

Comments
 (0)