Skip to content

Commit 85be9d0

Browse files
committed
Push the runtime filter down to the table AM (access method).
1 parent 8226b7d commit 85be9d0

15 files changed

Lines changed: 807 additions & 66 deletions

File tree

src/backend/executor/nodeHash.c

Lines changed: 48 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2584,6 +2584,7 @@ ExecHashTableExplainEnd(PlanState *planstate, struct StringInfoData *buf)
25842584
Instrumentation *jinstrument = hjstate->js.ps.instrument;
25852585
int total_buckets;
25862586
int i;
2587+
HashState *hashState = (HashState *) innerPlanState(hjstate);
25872588

25882589
if (!hashtable ||
25892590
!hashtable->stats ||
@@ -2598,11 +2599,13 @@ ExecHashTableExplainEnd(PlanState *planstate, struct StringInfoData *buf)
25982599

25992600
if (!hashtable->eagerlyReleased)
26002601
{
2601-
HashState *hashState = (HashState *) innerPlanState(hjstate);
2602-
26032602
/* Report on batch in progress, in case the join is being ended early. */
26042603
ExecHashTableExplainBatchEnd(hashState, hashtable);
26052604
}
2605+
if (gp_enable_runtime_filter_pushdown && hashState->filters)
2606+
{
2607+
ExecRFExplainEnd(hashState, buf);
2608+
}
26062609

26072610
/* Report actual work_mem high water mark. */
26082611
jinstrument->workmemused = Max(jinstrument->workmemused, stats->workmem_max);
@@ -4161,52 +4164,83 @@ PushdownRuntimeFilter(HashState *node)
41614164
scankeys = NIL;
41624165

41634166
attr_filter = lfirst(lc);
4164-
if (attr_filter->empty ||
4167+
if (attr_filter->empty || attr_filter->hasnulls ||
41654168
(!IsA(attr_filter->target, SeqScanState) &&
41664169
!IsA(attr_filter->target, DynamicSeqScanState)))
41674170
continue;
41684171

4172+
SeqScanState *sss = castNode(SeqScanState, attr_filter->target);
41694173
/* bloom filter */
41704174
sk = (ScanKey)palloc0(sizeof(ScanKeyData));
41714175
sk->sk_flags = SK_BLOOM_FILTER;
41724176
sk->sk_attno = attr_filter->lattno;
41734177
sk->sk_subtype = INT8OID;
41744178
sk->sk_argument = PointerGetDatum(attr_filter->blm_filter);
4179+
sk->sk_collation = attr_filter->collation;
41754180
scankeys = lappend(scankeys, sk);
41764181

4182+
if (attr_filter->n_distinct > 0)
4183+
{
4184+
int64 range = attr_filter->max - attr_filter->min + 1;
4185+
if ((range / attr_filter->n_distinct) > gp_runtime_filter_selectivity_threshold)
4186+
{
4187+
/* push previous scankeys */
4188+
sss->filters = list_concat(sss->filters, scankeys);
4189+
continue;
4190+
}
4191+
}
41774192
/* range filter */
41784193
sk = (ScanKey)palloc0(sizeof(ScanKeyData));
41794194
sk->sk_flags = 0;
41804195
sk->sk_attno = attr_filter->lattno;
41814196
sk->sk_strategy = BTGreaterEqualStrategyNumber;
4182-
sk->sk_subtype = INT8OID;
4197+
sk->sk_subtype = attr_filter->vartype;
41834198
sk->sk_argument = attr_filter->min;
4199+
sk->sk_collation = attr_filter->collation;
41844200
scankeys = lappend(scankeys, sk);
41854201

41864202
sk = (ScanKey)palloc0(sizeof(ScanKeyData));
41874203
sk->sk_flags = 0;
41884204
sk->sk_attno = attr_filter->lattno;
41894205
sk->sk_strategy = BTLessEqualStrategyNumber;
4190-
sk->sk_subtype = INT8OID;
4206+
sk->sk_subtype = attr_filter->vartype;
41914207
sk->sk_argument = attr_filter->max;
4208+
sk->sk_collation = attr_filter->collation;
41924209
scankeys = lappend(scankeys, sk);
41934210

41944211
/* append new runtime filters to target node */
41954212
if (IsA(attr_filter->target, SeqScanState))
41964213
{
41974214
SeqScanState *sss = castNode(SeqScanState, attr_filter->target);
4198-
sss->filters = list_concat(sss->filters, scankeys);
4215+
if (sss->ss.ss_currentScanDesc != NULL)
4216+
{
4217+
/* if seqscan is started, we can't pushdown the runtime filter */
4218+
list_free_deep(scankeys);
4219+
}
4220+
else
4221+
{
4222+
sss->filters = list_concat(sss->filters, scankeys);
4223+
}
41994224
}
42004225
else if (IsA(attr_filter->target, DynamicSeqScanState))
42014226
{
42024227
DynamicSeqScanState *dsss = castNode(DynamicSeqScanState, attr_filter->target);
4203-
dsss->filters = list_concat(dsss->filters, scankeys);
4228+
if (dsss->ss.ss_currentScanDesc != NULL)
4229+
{
4230+
/* if dynamic seqscan is started, we can't pushdown the runtime filter */
4231+
list_free_deep(scankeys);
4232+
}
4233+
else
4234+
{
4235+
dsss->filters = list_concat(dsss->filters, scankeys);
4236+
}
42044237
}
42054238
else
42064239
{
42074240
/* never reach here */
42084241
pg_unreachable();
42094242
}
4243+
42104244
}
42114245
}
42124246

@@ -4221,10 +4255,15 @@ AddTupleValuesIntoRF(HashState *node, TupleTableSlot *slot)
42214255
foreach (lc, node->filters)
42224256
{
42234257
attr_filter = (AttrFilter *) lfirst(lc);
4258+
if (attr_filter->hasnulls)
4259+
continue;
42244260

42254261
val = slot_getattr(slot, attr_filter->rattno, &isnull);
42264262
if (isnull)
4263+
{
4264+
attr_filter->hasnulls = true;
42274265
continue;
4266+
}
42284267

42294268
attr_filter->empty = false;
42304269

@@ -4274,6 +4313,7 @@ ResetRuntimeFilter(HashState *node)
42744313
{
42754314
attr_filter = lfirst(lc);
42764315
attr_filter->empty = true;
4316+
attr_filter->hasnulls = false;
42774317

42784318
if (IsA(attr_filter->target, SeqScanState))
42794319
{
@@ -4304,7 +4344,7 @@ ResetRuntimeFilter(HashState *node)
43044344

43054345
attr_filter->blm_filter = bloom_create_aggresive(node->ps.plan->plan_rows,
43064346
work_mem,
4307-
random());
4347+
gp_session_id);
43084348

43094349
StaticAssertDecl(sizeof(LONG_MAX) == sizeof(Datum), "sizeof(LONG_MAX) should be equal to sizeof(Datum)");
43104350
StaticAssertDecl(sizeof(LONG_MIN) == sizeof(Datum), "sizeof(LONG_MIN) should be equal to sizeof(Datum)");

src/backend/executor/nodeHashjoin.c

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,8 @@
110110

111111
#include "access/htup_details.h"
112112
#include "access/parallel.h"
113+
#include "catalog/pg_statistic.h"
114+
#include "catalog/pg_namespace.h"
113115
#include "executor/executor.h"
114116
#include "executor/hashjoin.h"
115117
#include "executor/instrument.h" /* Instrumentation */
@@ -118,9 +120,12 @@
118120
#include "executor/nodeRuntimeFilter.h"
119121
#include "miscadmin.h"
120122
#include "pgstat.h"
123+
#include "utils/datum.h"
121124
#include "utils/guc.h"
122125
#include "utils/fmgroids.h"
126+
#include "utils/lsyscache.h"
123127
#include "utils/memutils.h"
128+
#include "utils/rel.h"
124129
#include "utils/sharedtuplestore.h"
125130

126131
#include "cdb/cdbvars.h"
@@ -168,10 +173,10 @@ static bool IsEqualOp(Expr *expr);
168173
static bool CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno);
169174
static bool CheckTargetNode(PlanState *node,
170175
AttrNumber attno,
171-
AttrNumber *lattno);
176+
AttrNumber *lattno, Oid *collation, Oid *var_type);
172177
static List *FindTargetNodes(HashJoinState *hjstate,
173178
AttrNumber attno,
174-
AttrNumber *lattno);
179+
AttrNumber *lattno, Oid *collation, Oid *var_type);
175180
static AttrFilter *CreateAttrFilter(PlanState *target,
176181
AttrNumber lattno,
177182
AttrNumber rattno,
@@ -2192,6 +2197,8 @@ CreateRuntimeFilter(HashJoinState* hjstate)
21922197
AttrFilter *attr_filter;
21932198
ListCell *lc;
21942199
List *targets;
2200+
Oid var_type;
2201+
Oid collation;
21952202

21962203
/*
21972204
* A build-side Bloom filter tells us if a row is definitely not in the build
@@ -2232,7 +2239,7 @@ CreateRuntimeFilter(HashJoinState* hjstate)
22322239
if (lattno < 1 || rattno < 1)
22332240
continue;
22342241

2235-
targets = FindTargetNodes(hjstate, lattno, &lattno);
2242+
targets = FindTargetNodes(hjstate, lattno, &lattno, &collation, &var_type);
22362243
if (lattno == -1 || targets == NULL)
22372244
continue;
22382245

@@ -2243,6 +2250,8 @@ CreateRuntimeFilter(HashJoinState* hjstate)
22432250

22442251
attr_filter = CreateAttrFilter(target, lattno, rattno,
22452252
hstate->ps.plan->plan_rows);
2253+
attr_filter->vartype = var_type;
2254+
attr_filter->collation = collation;
22462255
if (attr_filter->blm_filter)
22472256
hstate->filters = lappend(hstate->filters, attr_filter);
22482257
else
@@ -2329,7 +2338,7 @@ CheckEqualArgs(Expr *expr, AttrNumber *lattno, AttrNumber *rattno)
23292338
}
23302339

23312340
static bool
2332-
CheckTargetNode(PlanState *node, AttrNumber attno, AttrNumber *lattno)
2341+
CheckTargetNode(PlanState *node, AttrNumber attno, AttrNumber *lattno, Oid *collation, Oid *var_type)
23332342
{
23342343
Var *var;
23352344
TargetEntry *te;
@@ -2360,6 +2369,8 @@ CheckTargetNode(PlanState *node, AttrNumber attno, AttrNumber *lattno)
23602369
return false;
23612370

23622371
*lattno = var->varattno;
2372+
*collation = var->varcollid;
2373+
*var_type = var->vartype;
23632374

23642375
return true;
23652376
}
@@ -2372,7 +2383,7 @@ CheckTargetNode(PlanState *node, AttrNumber attno, AttrNumber *lattno)
23722383
* SeqScan <- target
23732384
*/
23742385
static List *
2375-
FindTargetNodes(HashJoinState *hjstate, AttrNumber attno, AttrNumber *lattno)
2386+
FindTargetNodes(HashJoinState *hjstate, AttrNumber attno, AttrNumber *lattno, Oid *collation, Oid *var_type)
23762387
{
23772388
Var *var;
23782389
PlanState *child, *parent;
@@ -2396,7 +2407,7 @@ FindTargetNodes(HashJoinState *hjstate, AttrNumber attno, AttrNumber *lattno)
23962407
* [result]
23972408
* seqscan | dynamicseqscan
23982409
*/
2399-
if (!CheckTargetNode(child, attno, lattno))
2410+
if (!CheckTargetNode(child, attno, lattno, collation, var_type))
24002411
return NULL;
24012412

24022413
targetNodes = lappend(targetNodes, child);
@@ -2414,7 +2425,7 @@ FindTargetNodes(HashJoinState *hjstate, AttrNumber attno, AttrNumber *lattno)
24142425
for (int i = 0; i < as->as_nplans; i++)
24152426
{
24162427
child = as->appendplans[i];
2417-
if (!CheckTargetNode(child, attno, lattno))
2428+
if (!CheckTargetNode(child, attno, lattno, collation, var_type))
24182429
return NULL;
24192430

24202431
targetNodes = lappend(targetNodes, child);
@@ -2462,12 +2473,25 @@ CreateAttrFilter(PlanState *target, AttrNumber lattno, AttrNumber rattno,
24622473
{
24632474
AttrFilter *attr_filter = palloc0(sizeof(AttrFilter));
24642475
attr_filter->empty = true;
2476+
attr_filter->hasnulls = false;
24652477
attr_filter->target = target;
24662478

24672479
attr_filter->lattno = lattno;
24682480
attr_filter->rattno = rattno;
2481+
attr_filter->n_distinct = 0.0;
24692482

2470-
attr_filter->blm_filter = bloom_create_aggresive(plan_rows, work_mem, random());
2483+
attr_filter->blm_filter = bloom_create_aggresive(plan_rows, work_mem, gp_session_id);
2484+
2485+
if (target && IsA(target, SeqScanState))
2486+
{
2487+
HeapTuple statstuple;
2488+
SeqScanState *scan = (SeqScanState *)target;
2489+
statstuple = get_att_stats(RelationGetRelid(scan->ss.ss_currentRelation), lattno);
2490+
if (HeapTupleIsValid(statstuple))
2491+
{
2492+
attr_filter->n_distinct = ((Form_pg_statistic) GETSTRUCT(statstuple))->stadistinct;
2493+
}
2494+
}
24712495

24722496
StaticAssertDecl(sizeof(LONG_MAX) == sizeof(Datum), "sizeof(LONG_MAX) should be equal to sizeof(Datum)");
24732497
StaticAssertDecl(sizeof(LONG_MIN) == sizeof(Datum), "sizeof(LONG_MIN) should be equal to sizeof(Datum)");

src/backend/executor/nodeRuntimeFilter.c

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,18 @@
2828

2929
#include "postgres.h"
3030

31+
#include "access/tableam.h"
3132
#include "catalog/pg_type.h"
33+
#include "catalog/pg_statistic.h"
34+
#include "catalog/pg_namespace.h"
3235
#include "executor/executor.h"
3336
#include "executor/hashjoin.h"
3437
#include "executor/nodeRuntimeFilter.h"
3538
#include "lib/bloomfilter.h"
3639
#include "miscadmin.h"
3740
#include "nodes/nodeFuncs.h"
3841
#include "nodes/pg_list.h"
42+
#include "utils/datum.h"
3943
#include "utils/lsyscache.h"
4044

4145
#include "cdb/cdbvars.h"
@@ -325,4 +329,28 @@ RFFillTupleValues(RuntimeFilterState *rfstate, List *values)
325329
rfstate->value_buf[idx] = *dp;
326330
idx++;
327331
}
328-
}
332+
}
333+
334+
void
335+
ExecRFExplainEnd(HashState *hashState, struct StringInfoData *buf)
336+
{
337+
ListCell *lc;
338+
AttrFilter *attr_filter;
339+
SeqScanState *sss;
340+
341+
if (!hashState->filters)
342+
return;
343+
344+
foreach (lc, hashState->filters)
345+
{
346+
attr_filter = lfirst(lc);
347+
if (attr_filter->empty || attr_filter->hasnulls)
348+
continue;
349+
350+
sss = castNode(SeqScanState, attr_filter->target);
351+
appendStringInfo(buf, "RF: %s attrno: %d, range[%ld, %ld], n_distinct: %.2f\n",
352+
RelationGetRelationName(sss->ss.ss_currentRelation),
353+
attr_filter->lattno, (int64_t) attr_filter->min,
354+
(int64_t) attr_filter->max, attr_filter->n_distinct);
355+
}
356+
}

0 commit comments

Comments
 (0)