Skip to content

Commit

Permalink
[#25151] YSQL: Simplify DmlRead pushdown binding
Browse files Browse the repository at this point in the history
Summary:
There are several places in code where pushdown is bound to Dml. The code looks like

```
if (rel_pushdown != NULL)
{
	YbDmlAppendQuals(rel_pushdown->quals, true /* is_primary */, ybScan->handle);
	YbDmlAppendColumnRefs(rel_pushdown->colrefs, true /* is_primary */, ybScan->handle);
}
if (idx_pushdown != NULL)
{
	YbDmlAppendQuals(idx_pushdown->quals, false /* is_primary */, ybScan->handle);
	YbDmlAppendColumnRefs(idx_pushdown->colrefs, false /* is_primary */, ybScan->handle);
}
```

It is reasonable to create helper function to make same code look like

```
YbApplyPrimaryPushdown(ybScan->handle, rel_pushdown);
YbApplySecondaryIndexPushdown(ybScan->handle, idx_pushdown);
```

**Note:**
In context of this diff some code polishing is performed in the `PgDml`/`PgDmlRead`/`PgDmlWrite`:
- `AppendQual` is moved from `PgDml` to `PgDmlRead`
- 2 virtual methods `PgDml::ClearColRefPBs` and `PgDml::AllocColRefPB` are substituted with the single one `PgDml::ColRefPBs` which returns reference to `ArenaList`
Jira: DB-14312

Test Plan: Jenkins

Reviewers: amartsinchyk, telgersma

Reviewed By: telgersma

Subscribers: yql

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D40401
  • Loading branch information
d-uspenskiy committed Dec 4, 2024
1 parent a0a91b6 commit f093177
Show file tree
Hide file tree
Showing 15 changed files with 110 additions and 192 deletions.
102 changes: 48 additions & 54 deletions src/postgres/src/backend/access/yb_access/yb_scan.c
Original file line number Diff line number Diff line change
Expand Up @@ -2811,63 +2811,72 @@ YbDmlAppendTargets(List *colrefs, YBCPgStatement handle)
}
}

/*
* YbDmlAppendQuals
*
* Add storage filter expressions to the statement.
* The expression are pushed down to DocDB and used to filter rows early to
* avoid sending them across network.
* Set is_primary to false if the filter expression is to apply to secondary
* index. In this case Var nodes must be properly adjusted to refer the index
* columns rather than main relation columns.
* For primary key scan or sequential scan is_primary should be true.
*/
void
YbDmlAppendQuals(List *quals, bool is_primary, YBCPgStatement handle)
YbAppendPrimaryColumnRef(YBCPgStatement dml, YBCPgExpr colref)
{
ListCell *lc;

foreach(lc, quals)
{
Expr *expr = (Expr *) lfirst(lc);
/* Create new PgExpr wrapper for the expression */
YBCPgExpr yb_expr = YBCNewEvalExprCall(handle, expr);
/* Add the PgExpr to the statement */
HandleYBStatus(YbPgDmlAppendQual(handle, yb_expr, is_primary));
}
HandleYBStatus(YbPgDmlAppendColumnRef(dml, colref, true /* is_primary */));
}

/*
* YbDmlAppendColumnRefs
* YbDmlAppendColumnRefsImpl
*
* Add the list of column references used by pushed down expressions to the
* statement.
* The colref list is expected to be the list of YbExprColrefDesc nodes.
* Set is_primary to false if the filter expression is to apply to secondary
* index. In this case attno field values must be properly adjusted to refer
* the index columns rather than main relation columns.
* For primary key scan or sequential scan is_primary should be true.
*/
void
YbDmlAppendColumnRefs(List *colrefs, bool is_primary, YBCPgStatement handle)
static void
YbAppendColumnRefsImpl(YBCPgStatement dml, List *colrefs, bool is_primary)
{
ListCell *lc;

foreach(lc, colrefs)
{
YbExprColrefDesc *param = lfirst_node(YbExprColrefDesc, lc);
YBCPgTypeAttrs type_attrs = { param->typmod };
/* Create new PgExpr wrapper for the column reference */
YBCPgExpr yb_expr = YBCNewColumnRef(handle,
param->attno,
param->typid,
param->collid,
&type_attrs);
/* Add the PgExpr to the statement */
HandleYBStatus(YbPgDmlAppendColumnRef(handle, yb_expr, is_primary));
HandleYBStatus(YbPgDmlAppendColumnRef(
dml,
YBCNewColumnRef(
dml, param->attno, param->typid, param->collid, &type_attrs),
is_primary));
}
}

void
YbAppendPrimaryColumnRefs(YBCPgStatement dml, List *colrefs)
{
YbAppendColumnRefsImpl(dml, colrefs, true /* is_primary */);
}

static void
YbApplyPushdownImpl(
YBCPgStatement dml, const PushdownExprs *pushdown, bool is_primary)
{
if (!pushdown)
return;

YbAppendColumnRefsImpl(dml, pushdown->colrefs, is_primary);

ListCell *lc;
foreach(lc, pushdown->quals)
{
Expr *expr = lfirst(lc);
HandleYBStatus(YbPgDmlAppendQual(
dml, YBCNewEvalExprCall(dml, expr), is_primary));
}
}

void
YbApplyPrimaryPushdown(YBCPgStatement dml, const PushdownExprs *pushdown)
{
YbApplyPushdownImpl(dml, pushdown, true /* is_primary */);
}

void
YbApplySecondaryIndexPushdown(YBCPgStatement dml, const PushdownExprs *pushdown)
{
YbApplyPushdownImpl(dml, pushdown, false /* is_primary */);
}

/*
* Begin a scan for
* SELECT <Targets> FROM <relation> USING <index> WHERE <Binds>
Expand Down Expand Up @@ -2961,23 +2970,8 @@ ybcBeginScan(Relation relation,
else
ybcSetupTargets(ybScan, &scan_plan, pg_scan_plan);

/*
* Set up pushdown expressions.
*/
if (rel_pushdown != NULL)
{
YbDmlAppendQuals(rel_pushdown->quals, true /* is_primary */,
ybScan->handle);
YbDmlAppendColumnRefs(rel_pushdown->colrefs, true /* is_primary */,
ybScan->handle);
}
if (idx_pushdown != NULL)
{
YbDmlAppendQuals(idx_pushdown->quals, false /* is_primary */,
ybScan->handle);
YbDmlAppendColumnRefs(idx_pushdown->colrefs, false /* is_primary */,
ybScan->handle);
}
YbApplyPrimaryPushdown(ybScan->handle, rel_pushdown);
YbApplySecondaryIndexPushdown(ybScan->handle, idx_pushdown);

/*
* Set the current syscatalog version (will check that we are up to
Expand Down
23 changes: 2 additions & 21 deletions src/postgres/src/backend/access/ybgin/ybginscan.c
Original file line number Diff line number Diff line change
Expand Up @@ -112,27 +112,8 @@ ybginrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
YBCIsRegionLocal(scan->heapRelation),
&ybso->handle));

/* Add any pushdown expression to the main table scan. */
if (scan->yb_rel_pushdown != NULL)
{
YbDmlAppendQuals(scan->yb_rel_pushdown->quals,
true /* is_primary */,
ybso->handle);
YbDmlAppendColumnRefs(scan->yb_rel_pushdown->colrefs,
true /* is_primary */,
ybso->handle);
}

/* Add any pushdown expression to the index relation scan. */
if (scan->yb_idx_pushdown != NULL)
{
YbDmlAppendQuals(scan->yb_idx_pushdown->quals,
false /* is_primary */,
ybso->handle);
YbDmlAppendColumnRefs(scan->yb_idx_pushdown->colrefs,
false /* is_primary */,
ybso->handle);
}
YbApplyPrimaryPushdown(ybso->handle, scan->yb_rel_pushdown);
YbApplySecondaryIndexPushdown(ybso->handle, scan->yb_idx_pushdown);

/* Initialize ybgin scan opaque is_exec_done. */
ybso->is_exec_done = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,12 +270,9 @@ YbIncrementMasterDBCatalogVersionTableEntryImpl(
YBCPgExpr ybc_expr = YBCNewEvalExprCall(update_stmt, (Expr *) expr);

HandleYBStatus(YBCPgDmlAssignColumn(update_stmt, attnum, ybc_expr));
yb_expr = YBCNewColumnRef(update_stmt,
attnum,
INT8OID,
InvalidOid,
&type_attrs);
HandleYBStatus(YbPgDmlAppendColumnRef(update_stmt, yb_expr, true));
yb_expr = YBCNewColumnRef(
update_stmt, attnum, INT8OID, InvalidOid, &type_attrs);
YbAppendPrimaryColumnRef(update_stmt, yb_expr);

/* If breaking change set the latest breaking version to the same expression. */
if (is_breaking_change)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,9 @@ YbIncrementMasterDBLogicalClientVersionTableEntryImpl(
YBCPgExpr ybc_expr = YBCNewEvalExprCall(update_stmt, (Expr *) expr);

HandleYBStatus(YBCPgDmlAssignColumn(update_stmt, attnum, ybc_expr));
yb_expr = YBCNewColumnRef(update_stmt,
attnum,
INT8OID,
InvalidOid,
&type_attrs);
HandleYBStatus(YbPgDmlAppendColumnRef(update_stmt, yb_expr, true));
yb_expr = YBCNewColumnRef(
update_stmt, attnum, INT8OID, InvalidOid, &type_attrs);
YbAppendPrimaryColumnRef(update_stmt, yb_expr);

int rows_affected_count = 0;

Expand All @@ -222,7 +219,7 @@ bool YbIncrementMasterLogicalClientVersionTableEntry()

Datum YbGetMasterLogicalClientVersionTableEntryYbctid(Relation logical_client_version_rel,
Oid db_oid)
{
{
/*
* Construct virtual slot (db_oid, null) for computing ybctid using
* YBCComputeYBTupleIdFromSlot. Note that db_oid is the primary key so we
Expand Down
5 changes: 1 addition & 4 deletions src/postgres/src/backend/executor/nodeYbBitmapTablescan.c
Original file line number Diff line number Diff line change
Expand Up @@ -297,10 +297,7 @@ CreateYbBitmapTableScanDesc(YbBitmapTableScanState *scanstate)
scanstate->ss.ps.state);
if (recheck_pushdown)
{
YbDmlAppendQuals(recheck_pushdown->quals,
true /* is_primary */, ybScan->handle);
YbDmlAppendColumnRefs(recheck_pushdown->colrefs,
true /* is_primary */, ybScan->handle);
YbApplyPrimaryPushdown(ybScan->handle, recheck_pushdown);
pfree(recheck_pushdown);
}
}
Expand Down
3 changes: 1 addition & 2 deletions src/postgres/src/backend/executor/ybcModifyTable.c
Original file line number Diff line number Diff line change
Expand Up @@ -1070,8 +1070,7 @@ YBCExecuteUpdate(ResultRelInfo *resultRelInfo,
YbDmlAppendTargets(mt_plan->ybReturningColumns, update_stmt);

/* Column references to prepare data to evaluate pushed down expressions */
YbDmlAppendColumnRefs(mt_plan->ybColumnRefs, true /* is_primary */,
update_stmt);
YbAppendPrimaryColumnRefs(update_stmt, mt_plan->ybColumnRefs);

/* Execute the statement. */

Expand Down
16 changes: 5 additions & 11 deletions src/postgres/src/backend/executor/ybc_fdw.c
Original file line number Diff line number Diff line change
Expand Up @@ -488,19 +488,19 @@ ybcIterateForeignScan(ForeignScanState *node)
*
* - YbSeqNext
* - YbInstantiatePushdownParams
* - YbDmlAppendQuals/YbDmlAppendColumnRefs
* - YbApplyPrimaryPushdown/YbApplySecondaryIndexPushdown
* - IndexScan/IndexNextWithReorder/ExecReScanIndexScan
* - YbInstantiatePushdownParams
* - index_rescan
* - YbDmlAppendQuals/YbDmlAppendColumnRefs
* - YbApplyPrimaryPushdown/YbApplySecondaryIndexPushdown
* - IndexOnlyScan/ExecReScanIndexOnlyScan
* - YbInstantiatePushdownParams
* - index_rescan
* - YbDmlAppendQuals/YbDmlAppendColumnRefs
* - YbApplyPrimaryPushdown/YbApplySecondaryIndexPushdown
* - ForeignNext
* - ybcIterateForeignScan (impl of IterateForeignScan)
* - YbInstantiatePushdownParams
* - YbDmlAppendQuals/YbDmlAppendColumnRefs
* - YbApplyPrimaryPushdown/YbApplySecondaryIndexPushdown
*
* Reasoning:
*
Expand Down Expand Up @@ -555,13 +555,7 @@ ybcIterateForeignScan(ForeignScanState *node)
*/
if (!ybc_state->is_exec_done) {
ybcSetupScanTargets(node);
if (pushdown != NULL)
{
YbDmlAppendQuals(pushdown->quals, true /* is_primary */,
ybc_state->handle);
YbDmlAppendColumnRefs(pushdown->colrefs, true /* is_primary */,
ybc_state->handle);
}
YbApplyPrimaryPushdown(ybc_state->handle, pushdown);
HandleYBStatus(YBCPgExecSelect(ybc_state->handle, ybc_state->exec_params));
ybc_state->is_exec_done = true;
}
Expand Down
16 changes: 10 additions & 6 deletions src/postgres/src/include/access/yb_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -282,12 +282,16 @@ extern void YbDmlAppendTargetsAggregate(List *aggrefs, TupleDesc tupdesc,
Relation index, bool xs_want_itup,
YBCPgStatement handle);
extern void YbDmlAppendTargets(List *colrefs, YBCPgStatement handle);
/* Add quals to the given statement. */
extern void YbDmlAppendQuals(List *quals, bool is_primary,
YBCPgStatement handle);
/* Add column references to the given statement. */
extern void YbDmlAppendColumnRefs(List *colrefs, bool is_primary,
YBCPgStatement handle);

extern void YbAppendPrimaryColumnRef(YBCPgStatement dml, YBCPgExpr colref);

extern void YbAppendPrimaryColumnRefs(YBCPgStatement dml, List *colrefs);

extern void YbApplyPrimaryPushdown(
YBCPgStatement dml, const PushdownExprs *pushdown);

extern void YbApplySecondaryIndexPushdown(
YBCPgStatement dml, const PushdownExprs *pushdown);

/*
* The ybc_idx API is used to process the following SELECT.
Expand Down
32 changes: 8 additions & 24 deletions src/yb/yql/pggate/pg_dml.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,23 +66,6 @@ Status PgDml::AppendTargetPB(PgExpr* target) {
return target->PrepareForRead(this, AllocTargetPB());
}

Status PgDml::AppendQual(PgExpr* qual, bool is_primary) {
if (!is_primary) {
DCHECK(secondary_index_query_) << "The secondary index query is expected";
return secondary_index_query_->AppendQual(qual, true);
}

// Allocate associated protobuf.
auto* expr_pb = AllocQualPB();

// Populate the expr_pb with data from the qual expression.
// Side effect of PrepareForRead is to call PrepareColumnForRead on "this" being passed in
// for any column reference found in the expression. However, the serialized Postgres expressions,
// the only kind of Postgres expressions supported as quals, can not be searched.
// Their column references should be explicitly appended with AppendColumnRef()
return qual->PrepareForRead(this, expr_pb);
}

Status PgDml::AppendColumnRef(PgColumnRef* colref, bool is_primary) {
if (!is_primary) {
DCHECK(secondary_index_query_) << "The secondary index query is expected";
Expand Down Expand Up @@ -170,21 +153,22 @@ void PgDml::ColumnRefsToPB(LWPgsqlColumnRefsPB* column_refs) {

void PgDml::ColRefsToPB() {
// Remove previously set column references in case if the statement is being reexecuted
ClearColRefPBs();
auto& col_refs = ColRefPBs();
col_refs.clear();
for (const auto& col : target_.columns()) {
// Only used columns are added to the request
if (col.read_requested() || col.write_requested()) {
// Allocate a protobuf entry
auto* col_ref = AllocColRefPB();
auto& col_ref = col_refs.emplace_back();
// Add DocDB identifier
col_ref->set_column_id(col.id());
col_ref.set_column_id(col.id());
// Add Postgres identifier
col_ref->set_attno(col.attr_num());
col_ref.set_attno(col.attr_num());
// Add Postgres type information, if defined
if (col.has_pg_type_info()) {
col_ref->set_typid(col.pg_typid());
col_ref->set_typmod(col.pg_typmod());
col_ref->set_collid(col.pg_collid());
col_ref.set_typid(col.pg_typid());
col_ref.set_typmod(col.pg_typmod());
col_ref.set_collid(col.pg_collid());
}
}
}
Expand Down
Loading

0 comments on commit f093177

Please sign in to comment.