Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
395 changes: 394 additions & 1 deletion src/backend/cdb/cdbmutate.c

Large diffs are not rendered by default.

151 changes: 69 additions & 82 deletions src/backend/gporca/libgpopt/src/base/CUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
//---------------------------------------------------------------------------
#include "gpopt/base/CUtils.h"

#include "gpos/common/CBitSet.h"
#include "gpos/common/CBitSetIter.h"
#include "gpos/common/clibwrapper.h"
#include "gpos/common/syslibwrapper.h"
#include "gpos/io/CFileDescriptor.h"
Expand Down Expand Up @@ -978,107 +980,89 @@ CUtils::FHasCTEAnchor(CExpression *pexpr)
return false;
}

// Classify a distribution type: returns true for the strict-replicated,
// tainted-replicated and universal distribution types, false for every
// other type.
static BOOL
FReplicatedLikeDistribution(CDistributionSpec::EDistributionType edt)
{
	switch (edt)
	{
		case CDistributionSpec::EdtStrictReplicated:
		case CDistributionSpec::EdtTaintedReplicated:
		case CDistributionSpec::EdtUniversal:
			return true;

		default:
			return false;
	}
}

struct SCTEInfo
{
ULONG cteId;
ULONG sliceId;

SCTEInfo(ULONG cte_id, ULONG slice_id) : cteId(cte_id), sliceId(slice_id)
{
}
};

typedef CDynamicPtrArray<SCTEInfo, CleanupDelete<SCTEInfo> > CTEInfoArray;

// Walk the physical tree, recording the slice id of every replicated
// CTE Producer and every CTE Consumer. Slices are delimited by Motion
// nodes: each non-scalar child of a Motion lives in a fresh slice --
// same motId-stack idea as in apply_shareinput_xslice.
// Collect the CTE ids of every CTE Consumer and CTE Producer found beneath the
// given expression. Scalar subtrees are skipped: a Consumer inside a scalar
// subquery runs in its own SubPlan slice and is repaired separately by the
// local-materialization pass in apply_shareinput_xslice, so it must not be
// treated as living "beneath" the current Motion here.
static void
CollectCTESlices(CMemoryPool *mp, CExpression *pexpr, ULONG curSlice,
ULONG *pNextSlice, CTEInfoArray *prodInfos,
CTEInfoArray *consInfos)
CollectConsumersAndProducers(CExpression *pexpr, CBitSet *pbsConsumers,
CBitSet *pbsProducers)
{
GPOS_CHECK_STACK_SIZE;
GPOS_ASSERT(nullptr != pexpr);

COperator *pop = pexpr->Pop();

if (COperator::EopPhysicalCTEProducer == pop->Eopid())
if (COperator::EopPhysicalCTEConsumer == pop->Eopid())
{
// Producer's distribution comes from its only child -- inspect
// it there. Skip non-replicated Producers; they cannot trigger
// the cross-slice issue we are checking for.
GPOS_ASSERT(1 == pexpr->Arity());
CExpression *pexprChild = (*pexpr)[0];
CDrvdPropPlan *pdpplan =
CDrvdPropPlan::Pdpplan(pexprChild->PdpDerive());

if (FReplicatedLikeDistribution(pdpplan->Pds()->Edt()))
{
prodInfos->Append(GPOS_NEW(mp) SCTEInfo(
CPhysicalCTEProducer::PopConvert(pop)->UlCTEId(), curSlice));
}
pbsConsumers->ExchangeSet(
CPhysicalCTEConsumer::PopConvert(pop)->UlCTEId());
}
else if (COperator::EopPhysicalCTEConsumer == pop->Eopid())
else if (COperator::EopPhysicalCTEProducer == pop->Eopid())
{
// Consumer is a leaf -- record (cteId, curSlice) and let the
// caller decide later, once the whole tree has been walked.
consInfos->Append(GPOS_NEW(mp) SCTEInfo(
CPhysicalCTEConsumer::PopConvert(pop)->UlCTEId(), curSlice));
pbsProducers->ExchangeSet(
CPhysicalCTEProducer::PopConvert(pop)->UlCTEId());
}

BOOL isMotion = CUtils::FPhysicalMotion(pop);

for (ULONG ul = 0; ul < pexpr->Arity(); ul++)
{
CExpression *pexprChild = (*pexpr)[ul];

if (pexprChild->Pop()->FScalar())
{
continue;
}

ULONG childSlice = curSlice;
if (isMotion)
{
(*pNextSlice)++;
childSlice = *pNextSlice;
}

CollectCTESlices(mp, pexprChild, childSlice, pNextSlice, prodInfos,
consInfos);
CollectConsumersAndProducers(pexprChild, pbsConsumers, pbsProducers);
}
}

// True if some CTE Consumer beneath pexprMotion has no matching CTE Producer
// beneath the same Motion -- i.e. the Consumer reads data produced on the
// other side of the Motion (a different slice).
static BOOL
FFoundCrossSlice(const CTEInfoArray *consInfos, const CTEInfoArray *prodInfos)
FHasUnpairedCTEConsumer(CMemoryPool *mp, CExpression *pexprMotion)
{
for (ULONG ic = 0; ic < consInfos->Size(); ic++)
{
SCTEInfo *cons = (*consInfos)[ic];
CBitSet *pbsConsumers = GPOS_NEW(mp) CBitSet(mp);
CBitSet *pbsProducers = GPOS_NEW(mp) CBitSet(mp);

CollectConsumersAndProducers(pexprMotion, pbsConsumers, pbsProducers);

for (ULONG ip = 0; ip < prodInfos->Size(); ip++)
BOOL fUnpaired = false;
CBitSetIter bsiter(*pbsConsumers);
while (bsiter.Advance())
{
if (!pbsProducers->Get(bsiter.Bit()))
{
SCTEInfo *prod = (*prodInfos)[ip];
if (prod->cteId == cons->cteId && prod->sliceId != cons->sliceId)
{
return true;
}
fUnpaired = true;
break;
}
}
return false;

pbsConsumers->Release();
pbsProducers->Release();

return fUnpaired;
}

//---------------------------------------------------------------------------
// @function:
// CUtils::FHasCrossSliceReplicatedCTEConsumer
//
// @doc:
// Detect a CTE Consumer placed beneath a duplicate-hazard Motion (a
// Motion whose input is strict-replicated / universal) whose Producer is
// on the other side of that Motion. That topology depends on the
// cross-slice shared-scan protocol and hangs at execution.
//
// Unlike a CTE referenced from a scalar subquery -- whose cross-slice
// Consumer is repaired locally by apply_shareinput_xslice -- this
// broadcast/duplicate-hazard topology cannot be repaired, so the caller
// falls back to the Postgres optimizer.
//
// Mirrors greengage 51fe92e: it deliberately does NOT trigger for the
// scalar-subquery case (whose Motions are plain Gather / Redistribute,
// not duplicate-hazard), leaving that case to apply_shareinput_xslice.
//---------------------------------------------------------------------------
BOOL
CUtils::FHasCrossSliceReplicatedCTEConsumer(CMemoryPool *mp, CExpression *pexpr)
{
Expand All @@ -1087,19 +1071,22 @@ CUtils::FHasCrossSliceReplicatedCTEConsumer(CMemoryPool *mp, CExpression *pexpr)
return false;
}

CTEInfoArray *prodInfos = GPOS_NEW(mp) CTEInfoArray(mp);
CTEInfoArray *consInfos = GPOS_NEW(mp) CTEInfoArray(mp);
ULONG nextSlice = 0;

CollectCTESlices(mp, pexpr, 0 /*curSlice*/, &nextSlice, prodInfos,
consInfos);

BOOL cross = FFoundCrossSlice(consInfos, prodInfos);
if (CUtils::FPhysicalMotion(pexpr->Pop()) &&
CUtils::FDuplicateHazardMotion(pexpr) &&
FHasUnpairedCTEConsumer(mp, pexpr))
{
return true;
}

prodInfos->Release();
consInfos->Release();
for (ULONG ul = 0; ul < pexpr->Arity(); ul++)
{
if (FHasCrossSliceReplicatedCTEConsumer(mp, (*pexpr)[ul]))
{
return true;
}
}

return cross;
return false;
}

//---------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion src/backend/optimizer/plan/orca.c
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ optimize_query(Query *parse, int cursorOptions, ParamListInfo boundParams, Optim
collect_shareinput_producers(root, result->planTree);

/* Post-process ShareInputScan nodes */
(void) apply_shareinput_xslice(result->planTree, root);
(void) apply_shareinput_xslice(result->planTree, root, true /* is_orca */);

/*
* Fix ShareInputScans for EXPLAIN, like in standard_planner(). For all
Expand Down
2 changes: 1 addition & 1 deletion src/backend/optimizer/plan/planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,7 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
* share input. Need to mark material nodes that are split across
* multiple slices.
*/
top_plan = apply_shareinput_xslice(top_plan, root);
top_plan = apply_shareinput_xslice(top_plan, root, false /* is_orca */);
}

/* build the PlannedStmt result */
Expand Down
2 changes: 1 addition & 1 deletion src/include/cdb/cdbmutate.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ cdbmutate_warn_ctid_without_segid(struct PlannerInfo *root, struct RelOptInfo *r
extern Plan *apply_shareinput_dag_to_tree(PlannerInfo *root, Plan *plan);
extern void collect_shareinput_producers(PlannerInfo *root, Plan *plan);
extern Plan *replace_shareinput_targetlists(PlannerInfo *root, Plan *plan);
extern Plan *apply_shareinput_xslice(Plan *plan, PlannerInfo *root);
extern Plan *apply_shareinput_xslice(Plan *plan, PlannerInfo *root, bool is_orca);

extern List *getExprListFromTargetList(List *tlist, int numCols, AttrNumber *colIdx);
extern void remove_unused_initplans(Plan *plan, PlannerInfo *root);
Expand Down
22 changes: 22 additions & 0 deletions src/include/nodes/pathnodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,28 @@ typedef struct ApplyShareInputContext
ApplyShareInputContextPerShare *shared_inputs; /* one for each share */
Bitmapset *qdShares; /* share_ids that are referenced from QD slices */

bool is_orca_plan; /* true when post-processing an ORCA plan */
bool walking_subplan; /* true while walking a SubPlan tree */

/*
* Track already-inlined cross-slice producers so that a second consumer
* of the same CTE in the same slice can share the inlined copy instead of
* creating yet another independent scan.
*
* Each entry maps (orig_share_id, slice) -> new_share_id.
*/
int *inlined_orig_ids; /* original share_id */
int *inlined_mot_ids; /* slice (motId) where it was inlined */
int *inlined_new_ids; /* new share_id of the inlined producer */
int inlined_count;

/*
* Consumer reference counts for original producers, used to detect
* orphaned producers after inlining (those with zero remaining consumers).
*/
int *consumer_counts; /* consumer count per producer */
int orig_shared_input_count; /* shared_input_count before inlining */

} ApplyShareInputContext;

/*----------
Expand Down
80 changes: 17 additions & 63 deletions src/test/regress/expected/qp_orca_fallback.out
Original file line number Diff line number Diff line change
Expand Up @@ -314,70 +314,24 @@ INSERT INTO tbl1 (iscalctrg, iscalcdetail)
ANALYZE tbl1;
ANALYZE tbl2;
-- end_ignore
-- Case 1: walker triggers fallback. With scalar subqueries on the CTE
-- ORCA produces a plan whose CTE Producer is replicated and Consumers
-- live on a different slice -- the walker raises ExmiExpr2DXLUnsupported
-- and trace_fallback DETAIL says "CTE Consumer placed on a different
-- slice than its replicated Producer".
EXPLAIN (COSTS OFF)
WITH t2 AS (SELECT id, refrcode FROM tbl2 WHERE referenceid = 101991)
SELECT p.iscalctrg,
(SELECT refrcode FROM t2 WHERE refrcode = p.iscalctrg LIMIT 1) AS r,
(SELECT refrcode FROM t2 WHERE refrcode = p.iscalcdetail LIMIT 1) AS r1
FROM tbl1 p
LIMIT 1;
QUERY PLAN
----------------------------------------------------------------------------------------
Gather Motion 1:1 (slice1; segments: 1)
-> Limit
-> Seq Scan on tbl1 p
SubPlan 1
-> Limit
-> Result
Filter: ((tbl2.refrcode)::text = (p.iscalctrg)::text)
-> Materialize
-> Seq Scan on tbl2
Filter: (referenceid = '101991'::numeric)
SubPlan 2
-> Limit
-> Result
Filter: ((tbl2_1.refrcode)::text = (p.iscalcdetail)::text)
-> Materialize
-> Seq Scan on tbl2 tbl2_1
Filter: (referenceid = '101991'::numeric)
Optimizer: Postgres query optimizer
(18 rows)

-- Case 2: walker correctly stays silent. The same CTE referenced from a
-- JOIN: ORCA pins the Producer body to a single segment with a One-Time
-- Filter (gp_execution_segment() = N), so the Producer's child
-- distribution is EdtSingleton, not replicated -- the walker skips it.
EXPLAIN (COSTS OFF)
WITH t1 AS (SELECT * FROM tbl1),
t2 AS (SELECT id, refrcode FROM tbl2 WHERE referenceid = 101991)
SELECT p.* FROM t1 p
JOIN t2 r ON p.iscalctrg = r.refrcode
JOIN t2 r1 ON p.iscalcdetail = r1.refrcode
LIMIT 1;
QUERY PLAN
---------------------------------------------------------------------------------
Gather Motion 1:1 (slice1; segments: 1)
-> Limit
-> Hash Join
Hash Cond: ((tbl1.iscalcdetail)::text = (r1.refrcode)::text)
-> Hash Join
Hash Cond: ((tbl2.refrcode)::text = (tbl1.iscalctrg)::text)
-> Seq Scan on tbl2
Filter: (referenceid = '101991'::numeric)
-> Hash
-> Seq Scan on tbl1
-> Hash
-> Subquery Scan on r1
-> Seq Scan on tbl2 tbl2_1
Filter: (referenceid = '101991'::numeric)
Optimizer: Postgres query optimizer
(15 rows)
-- The native scalar-subquery case and the silent JOIN case live in the
-- shared_scan test. A replicated CTE joined to a DISTRIBUTED table: ORCA pins
-- the producer to one segment here too, so it is handled natively (no fallback)
-- and the result is correct.
CREATE TABLE dist_t (a int, b int) DISTRIBUTED BY (a);
INSERT INTO dist_t SELECT i, i % 5 FROM generate_series(1, 50000) i;
ANALYZE dist_t;
WITH cte AS (SELECT id, refrcode FROM tbl2 WHERE referenceid = 101991)
SELECT count(*)
FROM dist_t d
JOIN cte c1 ON d.b = c1.id
JOIN cte c2 ON d.a = c2.id;
count
-------
40000
(1 row)

DROP TABLE dist_t;
DROP TABLE tbl1, tbl2;
-- start_ignore
-- FIXME: gpcheckcat fails due to mismatching distribution policy if this table isn't dropped
Expand Down
Loading
Loading