diff --git a/src/backend/cdb/cdbmutate.c b/src/backend/cdb/cdbmutate.c index 4b98f58dcd8..36ed4994d32 100644 --- a/src/backend/cdb/cdbmutate.c +++ b/src/backend/cdb/cdbmutate.c @@ -35,6 +35,7 @@ #include "commands/trigger.h" #include "catalog/pg_operator.h" #include "catalog/pg_proc.h" +#include "catalog/gp_distribution_policy.h" #include "cdb/cdbhash.h" #include "cdb/cdbllize.h" @@ -963,6 +964,11 @@ shareinput_mutator_xslice_1(Node *node, PlannerInfo *root, bool fPop) /* Remember information about the slice that this instance appears in. */ if (shared) ctxt->shared_inputs[sisc->share_id].producer_slice_id = motId; + else if (sisc->share_id < ctxt->orig_shared_input_count) + { + /* Consumer: count references to original producers. */ + ctxt->consumer_counts[sisc->share_id]++; + } share_info->participant_slices = bms_add_member(share_info->participant_slices, motId); sisc->this_slice_id = motId; @@ -971,6 +977,248 @@ shareinput_mutator_xslice_1(Node *node, PlannerInfo *root, bool fPop) return true; } +/* + * Is 'plan' safe to materialize locally in each consumer slice? + * + * A local copy is only valid if the subtree yields identical output on every + * segment. That holds when the subtree contains no Motion (nothing is moved + * between segments) and every base-relation scan is over a DISTRIBUTED + * REPLICATED table. Anything else -- a Motion, a scan over a distributed + * table, a non-relation scan (VALUES/function/etc.), or an unresolvable nested + * shared scan -- means the per-segment output may differ, so we must not copy. + * + * This generalizes the "single base-table scan" case to any replicated + * producer subtree (UNION ALL / Append, aggregates, joins of replicated + * tables, partitioned scans, ...). + */ +static bool +shareinput_subtree_is_replicated(Plan *plan, List *rtable) +{ + ListCell *lc; + + if (plan == NULL) + return true; + + /* A Motion moves rows between segments: output is not purely local. */ + if (IsA(plan, Motion)) + return false; + + switch (nodeTag(plan)) + { + case T_SeqScan: + case T_IndexScan: + case T_IndexOnlyScan: + case T_BitmapHeapScan: + case T_TidScan: + case T_DynamicSeqScan: + case T_DynamicIndexScan: + case T_DynamicIndexOnlyScan: + case T_DynamicBitmapHeapScan: + { + Index scanrelid = ((Scan *) plan)->scanrelid; + RangeTblEntry *rte; + GpPolicy *policy; + bool replicated; + + if (scanrelid == 0 || scanrelid > (Index) list_length(rtable)) + return false; + rte = rt_fetch(scanrelid, rtable); + if (rte->rtekind != RTE_RELATION) + return false; + policy = GpPolicyFetch(rte->relid); + replicated = (policy != NULL && + policy->ptype == POLICYTYPE_REPLICATED); + if (policy) + pfree(policy); + return replicated; + } + + /* Scans that are not replicated base relations -- not safe to copy. */ + case T_ValuesScan: + case T_FunctionScan: + case T_TableFuncScan: + case T_CteScan: + case T_WorkTableScan: + case T_NamedTuplestoreScan: + case T_ForeignScan: + case T_SampleScan: + return false; + + case T_SubqueryScan: + return shareinput_subtree_is_replicated(((SubqueryScan *) plan)->subplan, + rtable); + + case T_ShareInputScan: + /* + * A nested shared scan consumer (no subtree) refers to another CTE + * whose locality we cannot resolve here -- be conservative. A + * nested producer (with a subtree) is validated through its + * lefttree below. + */ + if (plan->lefttree == NULL) + return false; + break; + + default: + break; + } + + /* Recurse into all child plans; every one must be replicated-safe. */ + if (IsA(plan, Append)) + { + foreach(lc, ((Append *) plan)->appendplans) + if (!shareinput_subtree_is_replicated((Plan *) lfirst(lc), rtable)) + return false; + } + else if (IsA(plan, MergeAppend)) + { + foreach(lc, ((MergeAppend *) plan)->mergeplans) + if (!shareinput_subtree_is_replicated((Plan *) lfirst(lc), rtable)) + return false; + } + else if (IsA(plan, Sequence)) + { + foreach(lc, ((Sequence *) plan)->subplans) + if (!shareinput_subtree_is_replicated((Plan *) lfirst(lc), rtable)) + return false; + } + else if (IsA(plan, BitmapAnd)) + { + foreach(lc, ((BitmapAnd *) plan)->bitmapplans) + if (!shareinput_subtree_is_replicated((Plan *) lfirst(lc), rtable)) + return false; + } + else if (IsA(plan, BitmapOr)) + { + foreach(lc, ((BitmapOr *) plan)->bitmapplans) + if (!shareinput_subtree_is_replicated((Plan *) lfirst(lc), rtable)) + return false; + } + else + { + if (!shareinput_subtree_is_replicated(plan->lefttree, rtable)) + return false; + if (!shareinput_subtree_is_replicated(plan->righttree, rtable)) + return false; + } + + return true; +} + +/* + * Try to repair a cross-slice ShareInputScan consumer that reads a CTE built + * over a DISTRIBUTED REPLICATED table. + * + * When such a CTE is referenced from a scalar subquery, ORCA places the + * SharedScan producer and consumer in different slices. The cross-slice + * temp-file protocol then expects the producer to have written its tuplestore + * on every consumer segment, which is not the case, and execution hangs. + * + * Since the source is replicated, every segment already holds the full data, + * so a local copy of the producer's subtree (any replicated subtree -- see + * shareinput_subtree_is_replicated) is equivalent. Give this consumer its own + * copy with a fresh share_id, turning + * it into an intra-slice producer that materializes locally instead of reading + * cross-slice temp files. Sibling consumers of the same CTE in the same slice + * reuse this copy (tracked by (orig_share_id, motId) -> new_share_id), so the + * CTE is materialized once and read by all references. + * + * Returns true if the consumer was inlined. + */ +static bool +shareinput_inline_replicated_consumer(ShareInputScan *sisc, int motId, + PlannerInfo *root, + ApplyShareInputContext *ctxt) +{ + PlannerGlobal *glob = root->glob; + int origShareId = sisc->share_id; + Plan *producerChild; + Plan *newChild; + int newShareId; + int k; + + /* + * Have we already inlined a consumer for this same original share_id in + * this same slice? If so, make this consumer a reader of the existing + * inlined producer instead of creating another copy. + */ + for (k = 0; k < ctxt->inlined_count; k++) + { + if (ctxt->inlined_orig_ids[k] == origShareId && + ctxt->inlined_mot_ids[k] == motId) + { + if (origShareId < ctxt->orig_shared_input_count) + ctxt->consumer_counts[origShareId]--; + + sisc->share_id = ctxt->inlined_new_ids[k]; + sisc->cross_slice = false; + sisc->producer_slice_id = motId; + sisc->nconsumers = 0; + return true; + } + } + + /* Locate the producer's subtree. */ + if (origShareId >= ctxt->shared_input_count) + return false; + producerChild = ctxt->shared_plans[origShareId]; + if (producerChild == NULL) + return false; + + /* + * Only inline when the whole producer subtree is replicated, i.e. it yields + * identical output on every segment (no Motion, all base scans replicated). + * Then the local copy is equivalent to the cross-slice shared scan. + */ + if (!shareinput_subtree_is_replicated(producerChild, glob->finalrtable)) + return false; + + /* + * Deep-copy the producer's subtree and turn this consumer into a local + * intra-slice producer with a fresh share_id. + */ + if (origShareId < ctxt->orig_shared_input_count) + ctxt->consumer_counts[origShareId]--; + + newChild = (Plan *) copyObject(producerChild); + newShareId = ctxt->shared_input_count; + + sisc->scan.plan.lefttree = newChild; + sisc->share_id = newShareId; + sisc->cross_slice = false; + sisc->producer_slice_id = motId; + sisc->nconsumers = 0; + + /* + * Register the new producer subtree so that replace_shareinput_targetlists() + * can build an RTE for it. This grows shared_plans / shared_input_count. + */ + shareinput_save_producer(sisc, ctxt); + + /* Record the mapping so sibling consumers in this slice can reuse it. */ + if (ctxt->inlined_count == 0) + { + ctxt->inlined_orig_ids = palloc(sizeof(int)); + ctxt->inlined_mot_ids = palloc(sizeof(int)); + ctxt->inlined_new_ids = palloc(sizeof(int)); + } + else + { + ctxt->inlined_orig_ids = repalloc(ctxt->inlined_orig_ids, + (ctxt->inlined_count + 1) * sizeof(int)); + ctxt->inlined_mot_ids = repalloc(ctxt->inlined_mot_ids, + (ctxt->inlined_count + 1) * sizeof(int)); + ctxt->inlined_new_ids = repalloc(ctxt->inlined_new_ids, + (ctxt->inlined_count + 1) * sizeof(int)); + } + ctxt->inlined_orig_ids[ctxt->inlined_count] = origShareId; + ctxt->inlined_mot_ids[ctxt->inlined_count] = motId; + ctxt->inlined_new_ids[ctxt->inlined_count] = newShareId; + ctxt->inlined_count++; + + return true; +} + /* * Second pass: * 1. Mark shareinput scans with multiple consumer slices as cross-slice. @@ -1006,6 +1254,22 @@ shareinput_mutator_xslice_2(Node *node, PlannerInfo *root, bool fPop) pershare = &ctxt->shared_inputs[sisc->share_id]; + /* + * A cross-slice consumer of a replicated-table CTE inside a SubPlan + * cannot use the cross-slice temp-file protocol (it hangs). Give it a + * local copy of the producer's subtree instead. If that succeeds the + * consumer becomes an intra-slice producer and the generic cross-slice + * bookkeeping below must be skipped. + */ + if (ctxt->is_orca_plan && + ctxt->walking_subplan && + plan->lefttree == NULL && + pershare->producer_slice_id != motId && + shareinput_inline_replicated_consumer(sisc, motId, root, ctxt)) + { + return true; + } + if (bms_num_members(pershare->participant_slices) > 1) { Assert(!sisc->cross_slice); @@ -1040,12 +1304,110 @@ shareinput_mutator_xslice_2(Node *node, PlannerInfo *root, bool fPop) return true; } +/* + * cleanup_orphaned_producers + * After inlining cross-slice replicated CTE consumers, some original + * producers may have lost all of their consumers. Remove those orphaned + * ShareInputScan producers from Sequence nodes to eliminate unnecessary + * scans, and collapse Sequence nodes that end up with a single child. + */ +static Plan * +cleanup_orphaned_producers(Plan *plan, ApplyShareInputContext *ctxt) +{ + if (plan == NULL) + return NULL; + + if (IsA(plan, Sequence)) + { + Sequence *seq = (Sequence *) plan; + List *newplans = NIL; + ListCell *lc; + + foreach(lc, seq->subplans) + { + Plan *child = (Plan *) lfirst(lc); + + child = cleanup_orphaned_producers(child, ctxt); + + /* Skip orphaned ShareInputScan producers (those with a subtree). */ + if (IsA(child, ShareInputScan) && child->lefttree != NULL) + { + ShareInputScan *sisc = (ShareInputScan *) child; + + if (sisc->share_id < ctxt->orig_shared_input_count && + ctxt->consumer_counts[sisc->share_id] == 0) + continue; + } + + /* + * Flatten nested Sequences: if a child is itself a Sequence, + * splice its children into this level. + */ + if (IsA(child, Sequence)) + { + Sequence *inner = (Sequence *) child; + ListCell *lc2; + + foreach(lc2, inner->subplans) + newplans = lappend(newplans, lfirst(lc2)); + } + else + newplans = lappend(newplans, child); + } + + seq->subplans = newplans; + return plan; + } + + if (IsA(plan, Append)) + { + ListCell *lc; + + foreach(lc, ((Append *) plan)->appendplans) + lfirst(lc) = cleanup_orphaned_producers((Plan *) lfirst(lc), ctxt); + } + else if (IsA(plan, MergeAppend)) + { + ListCell *lc; + + foreach(lc, ((MergeAppend *) plan)->mergeplans) + lfirst(lc) = cleanup_orphaned_producers((Plan *) lfirst(lc), ctxt); + } + else if (IsA(plan, BitmapAnd)) + { + ListCell *lc; + + foreach(lc, ((BitmapAnd *) plan)->bitmapplans) + lfirst(lc) = cleanup_orphaned_producers((Plan *) lfirst(lc), ctxt); + } + else if (IsA(plan, BitmapOr)) + { + ListCell *lc; + + foreach(lc, ((BitmapOr *) plan)->bitmapplans) + lfirst(lc) = cleanup_orphaned_producers((Plan *) lfirst(lc), ctxt); + } + else if (IsA(plan, SubqueryScan)) + { + SubqueryScan *sub = (SubqueryScan *) plan; + + sub->subplan = cleanup_orphaned_producers(sub->subplan, ctxt); + } + else + { + plan->lefttree = cleanup_orphaned_producers(plan->lefttree, ctxt); + plan->righttree = cleanup_orphaned_producers(plan->righttree, ctxt); + } + + return plan; +} + /* * Scan through the plan tree and make note of which Share Input Scans * are cross-slice. */ Plan * -apply_shareinput_xslice(Plan *plan, PlannerInfo *root) +apply_shareinput_xslice(Plan *plan, PlannerInfo *root, bool is_orca) { PlannerGlobal *glob = root->glob; ApplyShareInputContext *ctxt = &glob->share; @@ -1067,6 +1429,25 @@ apply_shareinput_xslice(Plan *plan, PlannerInfo *root) ctxt->shared_inputs = palloc0(ctxt->shared_input_count * sizeof(ApplyShareInputContextPerShare)); + /* + * The cross-slice replicated-CTE inlining below structurally rewrites the + * plan tree (adds local producers, drops orphaned ones). That is only safe + * -- and only needed -- for ORCA plans, where this pass runs before + * replace_shareinput_targetlists() and slice-table construction. In the + * standard planner those steps have already run by the time we get here, and + * the Postgres fallback never produces the problematic cross-slice + * replicated shared scan in the first place (it uses InitPlans), so the + * transformation is disabled there. + */ + ctxt->is_orca_plan = is_orca; + ctxt->walking_subplan = false; + ctxt->inlined_orig_ids = NULL; + ctxt->inlined_mot_ids = NULL; + ctxt->inlined_new_ids = NULL; + ctxt->inlined_count = 0; + ctxt->orig_shared_input_count = ctxt->shared_input_count; + ctxt->consumer_counts = palloc0(ctxt->shared_input_count * sizeof(int)); + shareinput_pushmot(ctxt, 0); /* @@ -1089,7 +1470,9 @@ apply_shareinput_xslice(Plan *plan, PlannerInfo *root) int slice_id = glob->subplan_sliceIds[subplan_id]; shareinput_pushmot(ctxt, slice_id); + ctxt->walking_subplan = true; shareinput_walker(shareinput_mutator_xslice_1, (Node *) subplan, subroot); + ctxt->walking_subplan = false; shareinput_popmot(ctxt); subplan_id++; } @@ -1104,12 +1487,22 @@ apply_shareinput_xslice(Plan *plan, PlannerInfo *root) int slice_id = glob->subplan_sliceIds[subplan_id]; shareinput_pushmot(ctxt, slice_id); + ctxt->walking_subplan = true; shareinput_walker(shareinput_mutator_xslice_2, (Node *) subplan, subroot); + ctxt->walking_subplan = false; shareinput_popmot(ctxt); subplan_id++; } shareinput_walker(shareinput_mutator_xslice_2, (Node *) plan, root); + /* + * Cleanup: remove orphaned ShareInputScan producers from Sequence nodes. + * After inlining, some original producers may have lost all consumers; + * keeping them would cause unnecessary scans at execution time. + */ + if (ctxt->inlined_count > 0) + plan = cleanup_orphaned_producers(plan, ctxt); + return plan; } diff --git a/src/backend/gporca/libgpopt/src/base/CUtils.cpp b/src/backend/gporca/libgpopt/src/base/CUtils.cpp index 4776c602fcd..9175bfacc67 100644 --- a/src/backend/gporca/libgpopt/src/base/CUtils.cpp +++ b/src/backend/gporca/libgpopt/src/base/CUtils.cpp @@ -10,6 +10,8 @@ //--------------------------------------------------------------------------- #include "gpopt/base/CUtils.h" +#include "gpos/common/CBitSet.h" +#include "gpos/common/CBitSetIter.h" #include "gpos/common/clibwrapper.h" #include "gpos/common/syslibwrapper.h" #include "gpos/io/CFileDescriptor.h" @@ -978,107 +980,89 @@ CUtils::FHasCTEAnchor(CExpression *pexpr) return false; } -// True if the distribution is replicated-like. -static BOOL -FReplicatedLikeDistribution(CDistributionSpec::EDistributionType edt) -{ - return (CDistributionSpec::EdtStrictReplicated == edt || - CDistributionSpec::EdtTaintedReplicated == edt || - CDistributionSpec::EdtUniversal == edt); -} - -struct SCTEInfo -{ - ULONG cteId; - ULONG sliceId; - - SCTEInfo(ULONG cte_id, ULONG slice_id) : cteId(cte_id), sliceId(slice_id) - { - } -}; - -typedef CDynamicPtrArray > CTEInfoArray; - -// Walk the physical tree, recording the slice id of every replicated -// CTE Producer and every CTE Consumer. Slices are delimited by Motion -// nodes: each non-scalar child of a Motion lives in a fresh slice -- -// same motId-stack idea as in apply_shareinput_xslice. +// Collect the CTE ids of every CTE Consumer and CTE Producer found beneath the +// given expression. Scalar subtrees are skipped: a Consumer inside a scalar +// subquery runs in its own SubPlan slice and is repaired separately by the +// local-materialization pass in apply_shareinput_xslice, so it must not be +// treated as living "beneath" the current Motion here. static void -CollectCTESlices(CMemoryPool *mp, CExpression *pexpr, ULONG curSlice, - ULONG *pNextSlice, CTEInfoArray *prodInfos, - CTEInfoArray *consInfos) +CollectConsumersAndProducers(CExpression *pexpr, CBitSet *pbsConsumers, + CBitSet *pbsProducers) { GPOS_CHECK_STACK_SIZE; GPOS_ASSERT(nullptr != pexpr); COperator *pop = pexpr->Pop(); - if (COperator::EopPhysicalCTEProducer == pop->Eopid()) + if (COperator::EopPhysicalCTEConsumer == pop->Eopid()) { - // Producer's distribution comes from its only child -- inspect - // it there. Skip non-replicated Producers; they cannot trigger - // the cross-slice issue we are checking for. - GPOS_ASSERT(1 == pexpr->Arity()); - CExpression *pexprChild = (*pexpr)[0]; - CDrvdPropPlan *pdpplan = - CDrvdPropPlan::Pdpplan(pexprChild->PdpDerive()); - - if (FReplicatedLikeDistribution(pdpplan->Pds()->Edt())) - { - prodInfos->Append(GPOS_NEW(mp) SCTEInfo( - CPhysicalCTEProducer::PopConvert(pop)->UlCTEId(), curSlice)); - } + pbsConsumers->ExchangeSet( + CPhysicalCTEConsumer::PopConvert(pop)->UlCTEId()); } - else if (COperator::EopPhysicalCTEConsumer == pop->Eopid()) + else if (COperator::EopPhysicalCTEProducer == pop->Eopid()) { - // Consumer is a leaf -- record (cteId, curSlice) and let the - // caller decide later, once the whole tree has been walked. - consInfos->Append(GPOS_NEW(mp) SCTEInfo( - CPhysicalCTEConsumer::PopConvert(pop)->UlCTEId(), curSlice)); + pbsProducers->ExchangeSet( + CPhysicalCTEProducer::PopConvert(pop)->UlCTEId()); } - BOOL isMotion = CUtils::FPhysicalMotion(pop); - for (ULONG ul = 0; ul < pexpr->Arity(); ul++) { CExpression *pexprChild = (*pexpr)[ul]; - if (pexprChild->Pop()->FScalar()) { continue; } - - ULONG childSlice = curSlice; - if (isMotion) - { - (*pNextSlice)++; - childSlice = *pNextSlice; - } - - CollectCTESlices(mp, pexprChild, childSlice, pNextSlice, prodInfos, - consInfos); + CollectConsumersAndProducers(pexprChild, pbsConsumers, pbsProducers); } } +// True if some CTE Consumer beneath pexprMotion has no matching CTE Producer +// beneath the same Motion -- i.e. the Consumer reads data produced on the +// other side of the Motion (a different slice). static BOOL -FFoundCrossSlice(const CTEInfoArray *consInfos, const CTEInfoArray *prodInfos) +FHasUnpairedCTEConsumer(CMemoryPool *mp, CExpression *pexprMotion) { - for (ULONG ic = 0; ic < consInfos->Size(); ic++) - { - SCTEInfo *cons = (*consInfos)[ic]; + CBitSet *pbsConsumers = GPOS_NEW(mp) CBitSet(mp); + CBitSet *pbsProducers = GPOS_NEW(mp) CBitSet(mp); + + CollectConsumersAndProducers(pexprMotion, pbsConsumers, pbsProducers); - for (ULONG ip = 0; ip < prodInfos->Size(); ip++) + BOOL fUnpaired = false; + CBitSetIter bsiter(*pbsConsumers); + while (bsiter.Advance()) + { + if (!pbsProducers->Get(bsiter.Bit())) { - SCTEInfo *prod = (*prodInfos)[ip]; - if (prod->cteId == cons->cteId && prod->sliceId != cons->sliceId) - { - return true; - } + fUnpaired = true; + break; } } - return false; + + pbsConsumers->Release(); + pbsProducers->Release(); + + return fUnpaired; } +//--------------------------------------------------------------------------- +// @function: +// CUtils::FHasCrossSliceReplicatedCTEConsumer +// +// @doc: +// Detect a CTE Consumer placed beneath a duplicate-hazard Motion (a +// Motion whose input is strict-replicated / universal) whose Producer is +// on the other side of that Motion. That topology depends on the +// cross-slice shared-scan protocol and hangs at execution. +// +// Unlike a CTE referenced from a scalar subquery -- whose cross-slice +// Consumer is repaired locally by apply_shareinput_xslice -- this +// broadcast/duplicate-hazard topology cannot be repaired, so the caller +// falls back to the Postgres optimizer. +// +// Mirrors greengage 51fe92e: it deliberately does NOT trigger for the +// scalar-subquery case (whose Motions are plain Gather / Redistribute, +// not duplicate-hazard), leaving that case to apply_shareinput_xslice. +//--------------------------------------------------------------------------- BOOL CUtils::FHasCrossSliceReplicatedCTEConsumer(CMemoryPool *mp, CExpression *pexpr) { @@ -1087,19 +1071,22 @@ CUtils::FHasCrossSliceReplicatedCTEConsumer(CMemoryPool *mp, CExpression *pexpr) return false; } - CTEInfoArray *prodInfos = GPOS_NEW(mp) CTEInfoArray(mp); - CTEInfoArray *consInfos = GPOS_NEW(mp) CTEInfoArray(mp); - ULONG nextSlice = 0; - - CollectCTESlices(mp, pexpr, 0 /*curSlice*/, &nextSlice, prodInfos, - consInfos); - - BOOL cross = FFoundCrossSlice(consInfos, prodInfos); + if (CUtils::FPhysicalMotion(pexpr->Pop()) && + CUtils::FDuplicateHazardMotion(pexpr) && + FHasUnpairedCTEConsumer(mp, pexpr)) + { + return true; + } - prodInfos->Release(); - consInfos->Release(); + for (ULONG ul = 0; ul < pexpr->Arity(); ul++) + { + if (FHasCrossSliceReplicatedCTEConsumer(mp, (*pexpr)[ul])) + { + return true; + } + } - return cross; + return false; } //--------------------------------------------------------------------------- diff --git a/src/backend/optimizer/plan/orca.c b/src/backend/optimizer/plan/orca.c index cc5a70639cf..20797507e27 100644 --- a/src/backend/optimizer/plan/orca.c +++ b/src/backend/optimizer/plan/orca.c @@ -336,7 +336,7 @@ optimize_query(Query *parse, int cursorOptions, ParamListInfo boundParams, Optim collect_shareinput_producers(root, result->planTree); /* Post-process ShareInputScan nodes */ - (void) apply_shareinput_xslice(result->planTree, root); + (void) apply_shareinput_xslice(result->planTree, root, true /* is_orca */); /* * Fix ShareInputScans for EXPLAIN, like in standard_planner(). For all diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 58602135656..427167b4516 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -794,7 +794,7 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, * share input. need to mark material nodes that are split acrossed * multi slices. */ - top_plan = apply_shareinput_xslice(top_plan, root); + top_plan = apply_shareinput_xslice(top_plan, root, false /* is_orca */); } /* build the PlannedStmt result */ diff --git a/src/include/cdb/cdbmutate.h b/src/include/cdb/cdbmutate.h index f7e29e6e027..efd7d8a5d32 100644 --- a/src/include/cdb/cdbmutate.h +++ b/src/include/cdb/cdbmutate.h @@ -41,7 +41,7 @@ cdbmutate_warn_ctid_without_segid(struct PlannerInfo *root, struct RelOptInfo *r extern Plan *apply_shareinput_dag_to_tree(PlannerInfo *root, Plan *plan); extern void collect_shareinput_producers(PlannerInfo *root, Plan *plan); extern Plan *replace_shareinput_targetlists(PlannerInfo *root, Plan *plan); -extern Plan *apply_shareinput_xslice(Plan *plan, PlannerInfo *root); +extern Plan *apply_shareinput_xslice(Plan *plan, PlannerInfo *root, bool is_orca); extern List *getExprListFromTargetList(List *tlist, int numCols, AttrNumber *colIdx); extern void remove_unused_initplans(Plan *plan, PlannerInfo *root); diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h index d5556d66c4b..c82892d33b6 100644 --- a/src/include/nodes/pathnodes.h +++ b/src/include/nodes/pathnodes.h @@ -140,6 +140,28 @@ typedef struct ApplyShareInputContext ApplyShareInputContextPerShare *shared_inputs; /* one for each share */ Bitmapset *qdShares; /* share_ids that are referenced from QD slices */ + bool is_orca_plan; /* true when post-processing an ORCA plan */ + bool walking_subplan; /* true while walking a SubPlan tree */ + + /* + * Track already-inlined cross-slice producers so that a second consumer + * of the same CTE in the same slice can share the inlined copy instead of + * creating yet another independent scan. + * + * Each entry maps (orig_share_id, slice) -> new_share_id. + */ + int *inlined_orig_ids; /* original share_id */ + int *inlined_mot_ids; /* slice (motId) where it was inlined */ + int *inlined_new_ids; /* new share_id of the inlined producer */ + int inlined_count; + + /* + * Consumer reference counts for original producers, used to detect + * orphaned producers after inlining (those with zero remaining consumers). + */ + int *consumer_counts; /* consumer count per producer */ + int orig_shared_input_count; /* shared_input_count before inlining */ + } ApplyShareInputContext; /*---------- diff --git a/src/test/regress/expected/qp_orca_fallback.out b/src/test/regress/expected/qp_orca_fallback.out index 299d2333773..802f8267ab6 100644 --- a/src/test/regress/expected/qp_orca_fallback.out +++ b/src/test/regress/expected/qp_orca_fallback.out @@ -314,70 +314,24 @@ INSERT INTO tbl1 (iscalctrg, iscalcdetail) ANALYZE tbl1; ANALYZE tbl2; -- end_ignore --- Case 1: walker triggers fallback. With scalar subqueries on the CTE --- ORCA produces a plan whose CTE Producer is replicated and Consumers --- live on a different slice -- the walker raises ExmiExpr2DXLUnsupported --- and trace_fallback DETAIL says "CTE Consumer placed on a different --- slice than its replicated Producer". -EXPLAIN (COSTS OFF) -WITH t2 AS (SELECT id, refrcode FROM tbl2 WHERE referenceid = 101991) -SELECT p.iscalctrg, - (SELECT refrcode FROM t2 WHERE refrcode = p.iscalctrg LIMIT 1) AS r, - (SELECT refrcode FROM t2 WHERE refrcode = p.iscalcdetail LIMIT 1) AS r1 -FROM tbl1 p -LIMIT 1; - QUERY PLAN ----------------------------------------------------------------------------------------- - Gather Motion 1:1 (slice1; segments: 1) - -> Limit - -> Seq Scan on tbl1 p - SubPlan 1 - -> Limit - -> Result - Filter: ((tbl2.refrcode)::text = (p.iscalctrg)::text) - -> Materialize - -> Seq Scan on tbl2 - Filter: (referenceid = '101991'::numeric) - SubPlan 2 - -> Limit - -> Result - Filter: ((tbl2_1.refrcode)::text = (p.iscalcdetail)::text) - -> Materialize - -> Seq Scan on tbl2 tbl2_1 - Filter: (referenceid = '101991'::numeric) - Optimizer: Postgres query optimizer -(18 rows) - --- Case 2: walker correctly stays silent. The same CTE referenced from a --- JOIN: ORCA pins the Producer body to a single segment with a One-Time --- Filter (gp_execution_segment() = N), so the Producer's child --- distribution is EdtSingleton, not replicated -- the walker skips it. -EXPLAIN (COSTS OFF) -WITH t1 AS (SELECT * FROM tbl1), - t2 AS (SELECT id, refrcode FROM tbl2 WHERE referenceid = 101991) -SELECT p.* FROM t1 p - JOIN t2 r ON p.iscalctrg = r.refrcode - JOIN t2 r1 ON p.iscalcdetail = r1.refrcode -LIMIT 1; - QUERY PLAN ---------------------------------------------------------------------------------- - Gather Motion 1:1 (slice1; segments: 1) - -> Limit - -> Hash Join - Hash Cond: ((tbl1.iscalcdetail)::text = (r1.refrcode)::text) - -> Hash Join - Hash Cond: ((tbl2.refrcode)::text = (tbl1.iscalctrg)::text) - -> Seq Scan on tbl2 - Filter: (referenceid = '101991'::numeric) - -> Hash - -> Seq Scan on tbl1 - -> Hash - -> Subquery Scan on r1 - -> Seq Scan on tbl2 tbl2_1 - Filter: (referenceid = '101991'::numeric) - Optimizer: Postgres query optimizer -(15 rows) +-- The native scalar-subquery case and the silent JOIN case live in the +-- shared_scan test. A replicated CTE joined to a DISTRIBUTED table: ORCA pins +-- the producer to one segment here too, so it is handled natively (no fallback) +-- and the result is correct. +CREATE TABLE dist_t (a int, b int) DISTRIBUTED BY (a); +INSERT INTO dist_t SELECT i, i % 5 FROM generate_series(1, 50000) i; +ANALYZE dist_t; +WITH cte AS (SELECT id, refrcode FROM tbl2 WHERE referenceid = 101991) +SELECT count(*) +FROM dist_t d + JOIN cte c1 ON d.b = c1.id + JOIN cte c2 ON d.a = c2.id; + count +------- + 40000 +(1 row) +DROP TABLE dist_t; DROP TABLE tbl1, tbl2; -- start_ignore -- FIXME: gpcheckcat fails due to mismatching distribution policy if this table isn't dropped diff --git a/src/test/regress/expected/qp_orca_fallback_optimizer.out b/src/test/regress/expected/qp_orca_fallback_optimizer.out index c27a4cbff7f..0265422870f 100644 --- a/src/test/regress/expected/qp_orca_fallback_optimizer.out +++ b/src/test/regress/expected/qp_orca_fallback_optimizer.out @@ -374,83 +374,24 @@ INSERT INTO tbl1 (iscalctrg, iscalcdetail) ANALYZE tbl1; ANALYZE tbl2; -- end_ignore --- Case 1: walker triggers fallback. With scalar subqueries on the CTE --- ORCA produces a plan whose CTE Producer is replicated and Consumers --- live on a different slice -- the walker raises ExmiExpr2DXLUnsupported --- and trace_fallback DETAIL says "CTE Consumer placed on a different --- slice than its replicated Producer". -EXPLAIN (COSTS OFF) -WITH t2 AS (SELECT id, refrcode FROM tbl2 WHERE referenceid = 101991) -SELECT p.iscalctrg, - (SELECT refrcode FROM t2 WHERE refrcode = p.iscalctrg LIMIT 1) AS r, - (SELECT refrcode FROM t2 WHERE refrcode = p.iscalcdetail LIMIT 1) AS r1 -FROM tbl1 p -LIMIT 1; -INFO: GPORCA failed to produce a plan, falling back to Postgres-based planner -DETAIL: Falling back to Postgres-based planner because GPORCA does not support the following feature: CTE Consumer placed on a different slice than its replicated Producer - QUERY PLAN ----------------------------------------------------------------------------------------- - Gather Motion 1:1 (slice1; segments: 1) - -> Limit - -> Seq Scan on tbl1 p - SubPlan 1 - -> Limit - -> Result - Filter: ((tbl2.refrcode)::text = (p.iscalctrg)::text) - -> Materialize - -> Seq Scan on tbl2 - Filter: (referenceid = '101991'::numeric) - SubPlan 2 - -> Limit - -> Result - Filter: ((tbl2_1.refrcode)::text = (p.iscalcdetail)::text) - -> Materialize - -> Seq Scan on tbl2 tbl2_1 - Filter: (referenceid = '101991'::numeric) - Optimizer: Postgres query optimizer -(18 rows) - --- Case 2: walker correctly stays silent. The same CTE referenced from a --- JOIN: ORCA pins the Producer body to a single segment with a One-Time --- Filter (gp_execution_segment() = N), so the Producer's child --- distribution is EdtSingleton, not replicated -- the walker skips it. -EXPLAIN (COSTS OFF) -WITH t1 AS (SELECT * FROM tbl1), - t2 AS (SELECT id, refrcode FROM tbl2 WHERE referenceid = 101991) -SELECT p.* FROM t1 p - JOIN t2 r ON p.iscalctrg = r.refrcode - JOIN t2 r1 ON p.iscalcdetail = r1.refrcode -LIMIT 1; - QUERY PLAN -------------------------------------------------------------------------------------------------------------------- - Gather Motion 3:1 (slice1; segments: 3) - -> Sequence - -> Shared Scan (share slice:id 1:1) - -> Result - Filter: (tbl2.referenceid = '101991'::numeric) - -> Result - One-Time Filter: (gp_execution_segment() = 1) - -> Seq Scan on tbl2 - -> Redistribute Motion 1:3 (slice2) - -> Limit - -> Gather Motion 3:1 (slice3; segments: 3) - -> Limit - -> Hash Join - Hash Cond: ((tbl1.iscalctrg)::text = (share1_ref2.refrcode)::text) - -> Hash Join - Hash Cond: ((tbl1.iscalcdetail)::text = (share1_ref3.refrcode)::text) - -> Result - -> Seq Scan on tbl1 - -> Hash - -> Redistribute Motion 3:3 (slice4; segments: 3) - Hash Key: share1_ref3.refrcode - -> Shared Scan (share slice:id 4:1) - -> Hash - -> Broadcast Motion 3:3 (slice5; segments: 3) - -> Shared Scan (share slice:id 5:1) - Optimizer: GPORCA -(26 rows) +-- The native scalar-subquery case and the silent JOIN case live in the +-- shared_scan test. A replicated CTE joined to a DISTRIBUTED table: ORCA pins +-- the producer to one segment here too, so it is handled natively (no fallback) +-- and the result is correct. +CREATE TABLE dist_t (a int, b int) DISTRIBUTED BY (a); +INSERT INTO dist_t SELECT i, i % 5 FROM generate_series(1, 50000) i; +ANALYZE dist_t; +WITH cte AS (SELECT id, refrcode FROM tbl2 WHERE referenceid = 101991) +SELECT count(*) +FROM dist_t d + JOIN cte c1 ON d.b = c1.id + JOIN cte c2 ON d.a = c2.id; + count +------- + 40000 +(1 row) +DROP TABLE dist_t; DROP TABLE tbl1, tbl2; -- start_ignore -- FIXME: gpcheckcat fails due to mismatching distribution policy if this table isn't dropped diff --git a/src/test/regress/expected/shared_scan.out b/src/test/regress/expected/shared_scan.out index 83a3815fb0e..d83234ff09d 100644 --- a/src/test/regress/expected/shared_scan.out +++ b/src/test/regress/expected/shared_scan.out @@ -234,8 +234,12 @@ where Optimizer: Postgres query optimizer (37 rows) --- ORCA should fallback when a CTE over a replicated table is referenced --- from multiple scalar subqueries. +-- A CTE over a replicated table referenced from multiple scalar subqueries +-- used to hang: ORCA placed the SharedScan consumer on a different slice than +-- the producer and the cross-slice temp-file protocol cannot handle that +-- topology. ORCA now force-inlines a replicated-table CTE (the data is on +-- every segment, so a local copy per consumer is equivalent), producing a +-- correct native plan instead of a cross-slice shared scan. -- ss_t1 needs enough rows (40000) to push ORCA to the cross-slice plan; -- with fewer rows the bug does not manifest and the test would silently -- pass even without the fix. @@ -252,6 +256,42 @@ CREATE TABLE ss_t2 AS DISTRIBUTED REPLICATED; ANALYZE ss_t1; ANALYZE ss_t2; +-- Plan: the replicated CTE is materialized once into a local Shared Scan +-- co-located with its consumers, and the repeated reference reuses that copy, +-- so ss_t2 is scanned once per CTE -- no cross-slice SharedScan, no duplicates. +EXPLAIN (COSTS OFF) WITH + cte1 AS (SELECT v FROM ss_t2 WHERE id = 1), + cte2 AS (SELECT v FROM ss_t2 WHERE id = 2) + SELECT (SELECT v FROM cte1) + (SELECT v FROM cte2) + + (SELECT v FROM cte1) + (SELECT v FROM cte2) AS result + FROM ss_t1 + LIMIT 1; + QUERY PLAN +-------------------------------------------------- + Limit + InitPlan 1 (returns $0) (slice2) + -> Gather Motion 1:1 (slice3; segments: 1) + -> Seq Scan on ss_t2 + Filter: (id = 1) + InitPlan 2 (returns $1) (slice4) + -> Gather Motion 1:1 (slice5; segments: 1) + -> Seq Scan on ss_t2 ss_t2_1 + Filter: (id = 2) + InitPlan 3 (returns $2) (slice6) + -> Gather Motion 1:1 (slice7; segments: 1) + -> Seq Scan on ss_t2 ss_t2_2 + Filter: (id = 1) + InitPlan 4 (returns $3) (slice8) + -> Gather Motion 1:1 (slice9; segments: 1) + -> Seq Scan on ss_t2 ss_t2_3 + Filter: (id = 2) + -> Gather Motion 3:1 (slice1; segments: 3) + -> Limit + -> Seq Scan on ss_t1 + Optimizer: Postgres query optimizer +(21 rows) + +-- Run it under a timeout to prove it no longer hangs. SET statement_timeout = '15s'; WITH cte1 AS (SELECT v FROM ss_t2 WHERE id = 1), @@ -265,5 +305,181 @@ WITH 60 (1 row) +RESET statement_timeout; +-- Walker coverage: a single replicated CTE referenced from three scalar +-- subqueries. The first cross-slice consumer is inlined into a local Shared +-- Scan producer; the second and third reuse that inlined copy (the reuse path +-- in shareinput_inline_replicated_consumer), so ss_t2 is scanned exactly once +-- and the orphaned original producer is dropped by cleanup_orphaned_producers. +-- The exact plan text is not pinned (it depends on the optimizer); we only +-- assert that the walker produces a correct, non-hanging plan. cte1.v = 10, so +-- the result is 10 * 3 = 30. +SET statement_timeout = '15s'; +WITH + cte1 AS (SELECT v FROM ss_t2 WHERE id = 1) + SELECT (SELECT v FROM cte1) + (SELECT v FROM cte1) + (SELECT v FROM cte1) AS result + FROM ss_t1 + LIMIT 1; + result +-------- + 30 +(1 row) + +RESET statement_timeout; +-- Walker coverage: two replicated CTEs, each referenced an odd number of times, +-- interleaved in the same expression. Exercises building two independent +-- inlined producers in the same slice and reusing each. cte1.v = 10, cte2.v = 20, +-- so the result is 10 + 20 + 10 + 20 + 10 = 70. +SET statement_timeout = '15s'; +WITH + cte1 AS (SELECT v FROM ss_t2 WHERE id = 1), + cte2 AS (SELECT v FROM ss_t2 WHERE id = 2) + SELECT (SELECT v FROM cte1) + (SELECT v FROM cte2) + + (SELECT v FROM cte1) + (SELECT v FROM cte2) + + (SELECT v FROM cte1) AS result + FROM ss_t1 + LIMIT 1; + result +-------- + 70 +(1 row) + +RESET statement_timeout; +-- Walker coverage: a non-replicated CTE referenced from scalar subqueries must +-- NOT be inlined (the leaf scan is not over a replicated table), so the normal +-- cross-slice shared-scan path is taken and the result is still correct. +-- cte1.id = 1, so the result is 1 + 1 = 2. +SET statement_timeout = '15s'; +WITH + cte1 AS (SELECT id FROM ss_t1 WHERE id = 1) + SELECT (SELECT id FROM cte1) + (SELECT id FROM cte1) AS result + FROM ss_t1 + LIMIT 1; + result +-------- + 2 +(1 row) + RESET statement_timeout; DROP TABLE ss_t1, ss_t2; +-- The walker that detects a CTE Consumer on a different slice than its +-- replicated Producer. Without it ORCA would emit a plan with cross-slice +-- replicated CTE Consumers that hangs at execution. +-- start_ignore +DROP TABLE IF EXISTS tbl1, tbl2; +NOTICE: table "tbl1" does not exist, skipping +NOTICE: table "tbl2" does not exist, skipping +-- end_ignore +CREATE TABLE tbl2 (id numeric, refrcode varchar(255), referenceid numeric) +DISTRIBUTED REPLICATED; +CREATE TABLE tbl1 (id bigserial, iscalctrg varchar(15) NOT NULL, + iscalcdetail varchar(15)) +DISTRIBUTED REPLICATED; +-- start_ignore +INSERT INTO tbl2 SELECT i, 'A'||(i%5), 101991 + FROM generate_series(1, 50000) i; +INSERT INTO tbl1 (iscalctrg, iscalcdetail) + SELECT 'A'||(i%5), 'A'||(i%7) FROM generate_series(1, 50000) i; +ANALYZE tbl1; +ANALYZE tbl2; +-- end_ignore +-- Case 1: handled natively (no fallback). With scalar subqueries on the CTE, +-- ORCA produces a plan whose replicated CTE Producer and Consumers live on +-- different slices. apply_shareinput_xslice now materializes the replicated +-- CTE locally in each consumer slice (Shared Scan over a per-slice copy) +-- instead of a hanging cross-slice shared scan, so ORCA no longer falls back. +EXPLAIN (COSTS OFF) +WITH t2 AS (SELECT id, refrcode FROM tbl2 WHERE referenceid = 101991) +SELECT p.iscalctrg, + (SELECT refrcode FROM t2 WHERE refrcode = p.iscalctrg LIMIT 1) AS r, + (SELECT refrcode FROM t2 WHERE refrcode = p.iscalcdetail LIMIT 1) AS r1 +FROM tbl1 p +LIMIT 1; + QUERY PLAN +---------------------------------------------------------------------------------------- + Gather Motion 1:1 (slice1; segments: 1) + -> Limit + -> Seq Scan on tbl1 p + SubPlan 1 + -> Limit + -> Result + Filter: ((tbl2.refrcode)::text = (p.iscalctrg)::text) + -> Materialize + -> Seq Scan on tbl2 + Filter: (referenceid = '101991'::numeric) + SubPlan 2 + -> Limit + -> Result + Filter: ((tbl2_1.refrcode)::text = (p.iscalcdetail)::text) + -> Materialize + -> Seq Scan on tbl2 tbl2_1 + Filter: (referenceid = '101991'::numeric) + Optimizer: Postgres query optimizer +(18 rows) + +-- Case 2: walker correctly stays silent. The same CTE referenced from a +-- JOIN: ORCA pins the Producer body to a single segment with a One-Time +-- Filter (gp_execution_segment() = N), so the Producer's child +-- distribution is EdtSingleton, not replicated -- the walker skips it. +EXPLAIN (COSTS OFF) +WITH t1 AS (SELECT * FROM tbl1), + t2 AS (SELECT id, refrcode FROM tbl2 WHERE referenceid = 101991) +SELECT p.* FROM t1 p + JOIN t2 r ON p.iscalctrg = r.refrcode + JOIN t2 r1 ON p.iscalcdetail = r1.refrcode +LIMIT 1; + QUERY PLAN +--------------------------------------------------------------------------------- + Gather Motion 1:1 (slice1; segments: 1) + -> Limit + -> Hash Join + Hash Cond: ((tbl1.iscalcdetail)::text = (r1.refrcode)::text) + -> Hash Join + Hash Cond: ((tbl2.refrcode)::text = (tbl1.iscalctrg)::text) + -> Seq Scan on tbl2 + Filter: (referenceid = '101991'::numeric) + -> Hash + -> Seq Scan on tbl1 + -> Hash + -> Subquery Scan on r1 + -> Seq Scan on tbl2 tbl2_1 + Filter: (referenceid = '101991'::numeric) + Optimizer: Postgres query optimizer +(15 rows) + +SET statement_timeout = '30s'; +SELECT count(*) AS n FROM ( + WITH t1 AS (SELECT * FROM tbl1), + t2 AS (SELECT id, refrcode FROM tbl2 WHERE referenceid = 101991) + SELECT p.* FROM t1 p + JOIN t2 r ON p.iscalctrg = r.refrcode + JOIN t2 r1 ON p.iscalcdetail = r1.refrcode + LIMIT 1) s; + n +--- + 1 +(1 row) + +RESET statement_timeout; +-- Case 3: a replicated CTE built with UNION ALL has an Append at the root of +-- the producer subtree (children are in appendplans, not lefttree), which the +-- old single-leaf inline check missed -- the cross-slice shared scan then hung. +-- shareinput_subtree_is_replicated now recognizes the whole replicated subtree +-- and materializes it locally. Correlated subqueries keep the SubPlan shape; +-- p.id = 1 makes it deterministic (iscalctrg = iscalcdetail = 'A1'), so ok = t. +SET statement_timeout = '30s'; +WITH cte AS ( + SELECT id, refrcode FROM tbl2 WHERE referenceid = 101991 AND id < 25000 + UNION ALL + SELECT id, refrcode FROM tbl2 WHERE referenceid = 101991 AND id >= 25000 +) +SELECT (SELECT refrcode FROM cte WHERE refrcode = p.iscalctrg LIMIT 1) = 'A1' + AND (SELECT refrcode FROM cte WHERE refrcode = p.iscalcdetail LIMIT 1) = 'A1' AS ok +FROM tbl1 p WHERE p.id = 1; + ok +---- + t +(1 row) + +RESET statement_timeout; +DROP TABLE tbl1, tbl2; diff --git a/src/test/regress/expected/shared_scan_optimizer.out b/src/test/regress/expected/shared_scan_optimizer.out index e71dfafe035..0383919a88e 100644 --- a/src/test/regress/expected/shared_scan_optimizer.out +++ b/src/test/regress/expected/shared_scan_optimizer.out @@ -242,8 +242,12 @@ where Optimizer: Postgres query optimizer (37 rows) --- ORCA should fallback when a CTE over a replicated table is referenced --- from multiple scalar subqueries. +-- A CTE over a replicated table referenced from multiple scalar subqueries +-- used to hang: ORCA placed the SharedScan consumer on a different slice than +-- the producer and the cross-slice temp-file protocol cannot handle that +-- topology. ORCA now force-inlines a replicated-table CTE (the data is on +-- every segment, so a local copy per consumer is equivalent), producing a +-- correct native plan instead of a cross-slice shared scan. -- ss_t1 needs enough rows (40000) to push ORCA to the cross-slice plan; -- with fewer rows the bug does not manifest and the test would silently -- pass even without the fix. @@ -255,6 +259,41 @@ CREATE TABLE ss_t2 AS DISTRIBUTED REPLICATED; ANALYZE ss_t1; ANALYZE ss_t2; +-- Plan: the replicated CTE is materialized once into a local Shared Scan +-- co-located with its consumers, and the repeated reference reuses that copy, +-- so ss_t2 is scanned once per CTE -- no cross-slice SharedScan, no duplicates. +EXPLAIN (COSTS OFF) WITH + cte1 AS (SELECT v FROM ss_t2 WHERE id = 1), + cte2 AS (SELECT v FROM ss_t2 WHERE id = 2) + SELECT (SELECT v FROM cte1) + (SELECT v FROM cte2) + + (SELECT v FROM cte1) + (SELECT v FROM cte2) AS result + FROM ss_t1 + LIMIT 1; + QUERY PLAN +----------------------------------------------------------------------------------- + Gather Motion 1:1 (slice1; segments: 1) + -> Sequence + -> Broadcast Motion 1:1 (slice2) + -> Limit + -> Gather Motion 3:1 (slice3; segments: 3) + -> Limit + -> Seq Scan on ss_t1 + SubPlan 1 + -> Shared Scan (share slice:id 3:2) + -> Seq Scan on ss_t2 + Filter: (id = 1) + SubPlan 2 + -> Shared Scan (share slice:id 3:3) + -> Seq Scan on ss_t2 ss_t2_1 + Filter: (id = 2) + SubPlan 3 + -> Shared Scan (share slice:id 3:2) + SubPlan 4 + -> Shared Scan (share slice:id 3:3) + Optimizer: Pivotal Optimizer (GPORCA) +(20 rows) + +-- Run it under a timeout to prove it no longer hangs. SET statement_timeout = '15s'; WITH cte1 AS (SELECT v FROM ss_t2 WHERE id = 1), @@ -268,5 +307,194 @@ WITH 60 (1 row) +RESET statement_timeout; +-- Walker coverage: a single replicated CTE referenced from three scalar +-- subqueries. The first cross-slice consumer is inlined into a local Shared +-- Scan producer; the second and third reuse that inlined copy (the reuse path +-- in shareinput_inline_replicated_consumer), so ss_t2 is scanned exactly once +-- and the orphaned original producer is dropped by cleanup_orphaned_producers. +-- The exact plan text is not pinned (it depends on the optimizer); we only +-- assert that the walker produces a correct, non-hanging plan. cte1.v = 10, so +-- the result is 10 * 3 = 30. +SET statement_timeout = '15s'; +WITH + cte1 AS (SELECT v FROM ss_t2 WHERE id = 1) + SELECT (SELECT v FROM cte1) + (SELECT v FROM cte1) + (SELECT v FROM cte1) AS result + FROM ss_t1 + LIMIT 1; + result +-------- + 30 +(1 row) + +RESET statement_timeout; +-- Walker coverage: two replicated CTEs, each referenced an odd number of times, +-- interleaved in the same expression. Exercises building two independent +-- inlined producers in the same slice and reusing each. cte1.v = 10, cte2.v = 20, +-- so the result is 10 + 20 + 10 + 20 + 10 = 70. +SET statement_timeout = '15s'; +WITH + cte1 AS (SELECT v FROM ss_t2 WHERE id = 1), + cte2 AS (SELECT v FROM ss_t2 WHERE id = 2) + SELECT (SELECT v FROM cte1) + (SELECT v FROM cte2) + + (SELECT v FROM cte1) + (SELECT v FROM cte2) + + (SELECT v FROM cte1) AS result + FROM ss_t1 + LIMIT 1; + result +-------- + 70 +(1 row) + +RESET statement_timeout; +-- Walker coverage: a non-replicated CTE referenced from scalar subqueries must +-- NOT be inlined (the leaf scan is not over a replicated table), so the normal +-- cross-slice shared-scan path is taken and the result is still correct. +-- cte1.id = 1, so the result is 1 + 1 = 2. +SET statement_timeout = '15s'; +WITH + cte1 AS (SELECT id FROM ss_t1 WHERE id = 1) + SELECT (SELECT id FROM cte1) + (SELECT id FROM cte1) AS result + FROM ss_t1 + LIMIT 1; + result +-------- + 2 +(1 row) + RESET statement_timeout; DROP TABLE ss_t1, ss_t2; +-- The walker that detects a CTE Consumer on a different slice than its +-- replicated Producer. Without it ORCA would emit a plan with cross-slice +-- replicated CTE Consumers that hangs at execution. +-- start_ignore +DROP TABLE IF EXISTS tbl1, tbl2; +NOTICE: table "tbl1" does not exist, skipping +NOTICE: table "tbl2" does not exist, skipping +-- end_ignore +CREATE TABLE tbl2 (id numeric, refrcode varchar(255), referenceid numeric) +DISTRIBUTED REPLICATED; +CREATE TABLE tbl1 (id bigserial, iscalctrg varchar(15) NOT NULL, + iscalcdetail varchar(15)) +DISTRIBUTED REPLICATED; +-- start_ignore +INSERT INTO tbl2 SELECT i, 'A'||(i%5), 101991 + FROM generate_series(1, 50000) i; +INSERT INTO tbl1 (iscalctrg, iscalcdetail) + SELECT 'A'||(i%5), 'A'||(i%7) FROM generate_series(1, 50000) i; +ANALYZE tbl1; +ANALYZE tbl2; +-- end_ignore +-- Case 1: handled natively (no fallback). With scalar subqueries on the CTE, +-- ORCA produces a plan whose replicated CTE Producer and Consumers live on +-- different slices. apply_shareinput_xslice now materializes the replicated +-- CTE locally in each consumer slice (Shared Scan over a per-slice copy) +-- instead of a hanging cross-slice shared scan, so ORCA no longer falls back. +EXPLAIN (COSTS OFF) +WITH t2 AS (SELECT id, refrcode FROM tbl2 WHERE referenceid = 101991) +SELECT p.iscalctrg, + (SELECT refrcode FROM t2 WHERE refrcode = p.iscalctrg LIMIT 1) AS r, + (SELECT refrcode FROM t2 WHERE refrcode = p.iscalcdetail LIMIT 1) AS r1 +FROM tbl1 p +LIMIT 1; + QUERY PLAN +------------------------------------------------------------------------------------------------------------------------ + Gather Motion 3:1 (slice1; segments: 3) + -> Sequence + -> Redistribute Motion 1:3 (slice2) + -> Limit + -> Gather Motion 1:1 (slice3; segments: 1) + -> Limit + -> Seq Scan on tbl1 p + SubPlan 1 + -> Limit + -> Result + Filter: ((share1_ref1.refrcode)::text = (p.iscalctrg)::text) + -> Shared Scan (share slice:id 3:1) + -> Seq Scan on tbl2 + Filter: (referenceid = '101991'::numeric) + SubPlan 2 + -> Limit + -> Result + Filter: ((share1_ref2.refrcode)::text = (p.iscalcdetail)::text) + -> Shared Scan (share slice:id 3:1) + Optimizer: Pivotal Optimizer (GPORCA) +(20 rows) + +-- Case 2: walker correctly stays silent. The same CTE referenced from a +-- JOIN: ORCA pins the Producer body to a single segment with a One-Time +-- Filter (gp_execution_segment() = N), so the Producer's child +-- distribution is EdtSingleton, not replicated -- the walker skips it. +EXPLAIN (COSTS OFF) +WITH t1 AS (SELECT * FROM tbl1), + t2 AS (SELECT id, refrcode FROM tbl2 WHERE referenceid = 101991) +SELECT p.* FROM t1 p + JOIN t2 r ON p.iscalctrg = r.refrcode + JOIN t2 r1 ON p.iscalcdetail = r1.refrcode +LIMIT 1; + QUERY PLAN +------------------------------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) + -> Sequence + -> Shared Scan (share slice:id 1:1) + -> Result + Filter: (tbl2.referenceid = '101991'::numeric) + -> Result + One-Time Filter: (gp_execution_segment() = 1) + -> Seq Scan on tbl2 + -> Redistribute Motion 1:3 (slice2) + -> Limit + -> Gather Motion 3:1 (slice3; segments: 3) + -> Limit + -> Hash Join + Hash Cond: ((tbl1.iscalctrg)::text = (share1_ref2.refrcode)::text) + -> Hash Join + Hash Cond: ((tbl1.iscalcdetail)::text = (share1_ref3.refrcode)::text) + -> Result + -> Seq Scan on tbl1 + -> Hash + -> Redistribute Motion 3:3 (slice4; segments: 3) + Hash Key: share1_ref3.refrcode + -> Shared Scan (share slice:id 4:1) + -> Hash + -> Broadcast Motion 3:3 (slice5; segments: 3) + -> Shared Scan (share slice:id 5:1) + Optimizer: GPORCA +(26 rows) + +SET statement_timeout = '30s'; +SELECT count(*) AS n FROM ( + WITH t1 AS (SELECT * FROM tbl1), + t2 AS (SELECT id, refrcode FROM tbl2 WHERE referenceid = 101991) + SELECT p.* FROM t1 p + JOIN t2 r ON p.iscalctrg = r.refrcode + JOIN t2 r1 ON p.iscalcdetail = r1.refrcode + LIMIT 1) s; + n +--- + 1 +(1 row) + +RESET statement_timeout; +-- Case 3: a replicated CTE built with UNION ALL has an Append at the root of +-- the producer subtree (children are in appendplans, not lefttree), which the +-- old single-leaf inline check missed -- the cross-slice shared scan then hung. +-- shareinput_subtree_is_replicated now recognizes the whole replicated subtree +-- and materializes it locally. Correlated subqueries keep the SubPlan shape; +-- p.id = 1 makes it deterministic (iscalctrg = iscalcdetail = 'A1'), so ok = t. +SET statement_timeout = '30s'; +WITH cte AS ( + SELECT id, refrcode FROM tbl2 WHERE referenceid = 101991 AND id < 25000 + UNION ALL + SELECT id, refrcode FROM tbl2 WHERE referenceid = 101991 AND id >= 25000 +) +SELECT (SELECT refrcode FROM cte WHERE refrcode = p.iscalctrg LIMIT 1) = 'A1' + AND (SELECT refrcode FROM cte WHERE refrcode = p.iscalcdetail LIMIT 1) = 'A1' AS ok +FROM tbl1 p WHERE p.id = 1; + ok +---- + t +(1 row) + +RESET statement_timeout; +DROP TABLE tbl1, tbl2; diff --git a/src/test/regress/sql/qp_orca_fallback.sql b/src/test/regress/sql/qp_orca_fallback.sql index ecb892c2aa7..ec8c85779f8 100644 --- a/src/test/regress/sql/qp_orca_fallback.sql +++ b/src/test/regress/sql/qp_orca_fallback.sql @@ -141,31 +141,19 @@ ANALYZE tbl1; ANALYZE tbl2; -- end_ignore --- Case 1: walker triggers fallback. With scalar subqueries on the CTE --- ORCA produces a plan whose CTE Producer is replicated and Consumers --- live on a different slice -- the walker raises ExmiExpr2DXLUnsupported --- and trace_fallback DETAIL says "CTE Consumer placed on a different --- slice than its replicated Producer". -EXPLAIN (COSTS OFF) -WITH t2 AS (SELECT id, refrcode FROM tbl2 WHERE referenceid = 101991) -SELECT p.iscalctrg, - (SELECT refrcode FROM t2 WHERE refrcode = p.iscalctrg LIMIT 1) AS r, - (SELECT refrcode FROM t2 WHERE refrcode = p.iscalcdetail LIMIT 1) AS r1 -FROM tbl1 p -LIMIT 1; - --- Case 2: walker correctly stays silent. The same CTE referenced from a --- JOIN: ORCA pins the Producer body to a single segment with a One-Time --- Filter (gp_execution_segment() = N), so the Producer's child --- distribution is EdtSingleton, not replicated -- the walker skips it. -EXPLAIN (COSTS OFF) -WITH t1 AS (SELECT * FROM tbl1), - t2 AS (SELECT id, refrcode FROM tbl2 WHERE referenceid = 101991) -SELECT p.* FROM t1 p - JOIN t2 r ON p.iscalctrg = r.refrcode - JOIN t2 r1 ON p.iscalcdetail = r1.refrcode -LIMIT 1; - +-- The native scalar-subquery case and the silent JOIN case live in the +-- shared_scan test. A replicated CTE joined to a DISTRIBUTED table: ORCA pins +-- the producer to one segment here too, so it is handled natively (no fallback) +-- and the result is correct. +CREATE TABLE dist_t (a int, b int) DISTRIBUTED BY (a); +INSERT INTO dist_t SELECT i, i % 5 FROM generate_series(1, 50000) i; +ANALYZE dist_t; +WITH cte AS (SELECT id, refrcode FROM tbl2 WHERE referenceid = 101991) +SELECT count(*) +FROM dist_t d + JOIN cte c1 ON d.b = c1.id + JOIN cte c2 ON d.a = c2.id; +DROP TABLE dist_t; DROP TABLE tbl1, tbl2; -- start_ignore diff --git a/src/test/regress/sql/shared_scan.sql b/src/test/regress/sql/shared_scan.sql index 80b4a1d52c7..6a9978800b6 100644 --- a/src/test/regress/sql/shared_scan.sql +++ b/src/test/regress/sql/shared_scan.sql @@ -121,8 +121,12 @@ where and (stat.schema_name || '.' ||stat.table_name not in (select table_nm_onl_act from tbls_w_onl_actl_data)) or (stat.schema_name || '.' ||stat.table_name in (select table_nm_onl_act from tbls_w_onl_actl_data)); --- ORCA should fallback when a CTE over a replicated table is referenced --- from multiple scalar subqueries. +-- A CTE over a replicated table referenced from multiple scalar subqueries +-- used to hang: ORCA placed the SharedScan consumer on a different slice than +-- the producer and the cross-slice temp-file protocol cannot handle that +-- topology. ORCA now force-inlines a replicated-table CTE (the data is on +-- every segment, so a local copy per consumer is equivalent), producing a +-- correct native plan instead of a cross-slice shared scan. -- ss_t1 needs enough rows (40000) to push ORCA to the cross-slice plan; -- with fewer rows the bug does not manifest and the test would silently -- pass even without the fix. @@ -137,7 +141,17 @@ CREATE TABLE ss_t2 AS DISTRIBUTED REPLICATED; ANALYZE ss_t1; ANALYZE ss_t2; - +-- Plan: the replicated CTE is materialized once into a local Shared Scan +-- co-located with its consumers, and the repeated reference reuses that copy, +-- so ss_t2 is scanned once per CTE -- no cross-slice SharedScan, no duplicates. +EXPLAIN (COSTS OFF) WITH + cte1 AS (SELECT v FROM ss_t2 WHERE id = 1), + cte2 AS (SELECT v FROM ss_t2 WHERE id = 2) + SELECT (SELECT v FROM cte1) + (SELECT v FROM cte2) + + (SELECT v FROM cte1) + (SELECT v FROM cte2) AS result + FROM ss_t1 + LIMIT 1; +-- Run it under a timeout to prove it no longer hangs. SET statement_timeout = '15s'; WITH cte1 AS (SELECT v FROM ss_t2 WHERE id = 1), @@ -147,4 +161,120 @@ WITH FROM ss_t1 LIMIT 1; RESET statement_timeout; + +-- Walker coverage: a single replicated CTE referenced from three scalar +-- subqueries. The first cross-slice consumer is inlined into a local Shared +-- Scan producer; the second and third reuse that inlined copy (the reuse path +-- in shareinput_inline_replicated_consumer), so ss_t2 is scanned exactly once +-- and the orphaned original producer is dropped by cleanup_orphaned_producers. +-- The exact plan text is not pinned (it depends on the optimizer); we only +-- assert that the walker produces a correct, non-hanging plan. cte1.v = 10, so +-- the result is 10 * 3 = 30. +SET statement_timeout = '15s'; +WITH + cte1 AS (SELECT v FROM ss_t2 WHERE id = 1) + SELECT (SELECT v FROM cte1) + (SELECT v FROM cte1) + (SELECT v FROM cte1) AS result + FROM ss_t1 + LIMIT 1; +RESET statement_timeout; + +-- Walker coverage: two replicated CTEs, each referenced an odd number of times, +-- interleaved in the same expression. Exercises building two independent +-- inlined producers in the same slice and reusing each. cte1.v = 10, cte2.v = 20, +-- so the result is 10 + 20 + 10 + 20 + 10 = 70. +SET statement_timeout = '15s'; +WITH + cte1 AS (SELECT v FROM ss_t2 WHERE id = 1), + cte2 AS (SELECT v FROM ss_t2 WHERE id = 2) + SELECT (SELECT v FROM cte1) + (SELECT v FROM cte2) + + (SELECT v FROM cte1) + (SELECT v FROM cte2) + + (SELECT v FROM cte1) AS result + FROM ss_t1 + LIMIT 1; +RESET statement_timeout; + +-- Walker coverage: a non-replicated CTE referenced from scalar subqueries must +-- NOT be inlined (the leaf scan is not over a replicated table), so the normal +-- cross-slice shared-scan path is taken and the result is still correct. +-- cte1.id = 1, so the result is 1 + 1 = 2. +SET statement_timeout = '15s'; +WITH + cte1 AS (SELECT id FROM ss_t1 WHERE id = 1) + SELECT (SELECT id FROM cte1) + (SELECT id FROM cte1) AS result + FROM ss_t1 + LIMIT 1; +RESET statement_timeout; + DROP TABLE ss_t1, ss_t2; + +-- The walker that detects a CTE Consumer on a different slice than its +-- replicated Producer. Without it ORCA would emit a plan with cross-slice +-- replicated CTE Consumers that hangs at execution. +-- start_ignore +DROP TABLE IF EXISTS tbl1, tbl2; +-- end_ignore +CREATE TABLE tbl2 (id numeric, refrcode varchar(255), referenceid numeric) +DISTRIBUTED REPLICATED; +CREATE TABLE tbl1 (id bigserial, iscalctrg varchar(15) NOT NULL, + iscalcdetail varchar(15)) +DISTRIBUTED REPLICATED; +-- start_ignore +INSERT INTO tbl2 SELECT i, 'A'||(i%5), 101991 + FROM generate_series(1, 50000) i; +INSERT INTO tbl1 (iscalctrg, iscalcdetail) + SELECT 'A'||(i%5), 'A'||(i%7) FROM generate_series(1, 50000) i; +ANALYZE tbl1; +ANALYZE tbl2; +-- end_ignore + +-- Case 1: handled natively (no fallback). With scalar subqueries on the CTE, +-- ORCA produces a plan whose replicated CTE Producer and Consumers live on +-- different slices. apply_shareinput_xslice now materializes the replicated +-- CTE locally in each consumer slice (Shared Scan over a per-slice copy) +-- instead of a hanging cross-slice shared scan, so ORCA no longer falls back. +EXPLAIN (COSTS OFF) +WITH t2 AS (SELECT id, refrcode FROM tbl2 WHERE referenceid = 101991) +SELECT p.iscalctrg, + (SELECT refrcode FROM t2 WHERE refrcode = p.iscalctrg LIMIT 1) AS r, + (SELECT refrcode FROM t2 WHERE refrcode = p.iscalcdetail LIMIT 1) AS r1 +FROM tbl1 p +LIMIT 1; + +-- Case 2: walker correctly stays silent. The same CTE referenced from a +-- JOIN: ORCA pins the Producer body to a single segment with a One-Time +-- Filter (gp_execution_segment() = N), so the Producer's child +-- distribution is EdtSingleton, not replicated -- the walker skips it. +EXPLAIN (COSTS OFF) +WITH t1 AS (SELECT * FROM tbl1), + t2 AS (SELECT id, refrcode FROM tbl2 WHERE referenceid = 101991) +SELECT p.* FROM t1 p + JOIN t2 r ON p.iscalctrg = r.refrcode + JOIN t2 r1 ON p.iscalcdetail = r1.refrcode +LIMIT 1; +SET statement_timeout = '30s'; +SELECT count(*) AS n FROM ( + WITH t1 AS (SELECT * FROM tbl1), + t2 AS (SELECT id, refrcode FROM tbl2 WHERE referenceid = 101991) + SELECT p.* FROM t1 p + JOIN t2 r ON p.iscalctrg = r.refrcode + JOIN t2 r1 ON p.iscalcdetail = r1.refrcode + LIMIT 1) s; +RESET statement_timeout; + +-- Case 3: a replicated CTE built with UNION ALL has an Append at the root of +-- the producer subtree (children are in appendplans, not lefttree), which the +-- old single-leaf inline check missed -- the cross-slice shared scan then hung. +-- shareinput_subtree_is_replicated now recognizes the whole replicated subtree +-- and materializes it locally. Correlated subqueries keep the SubPlan shape; +-- p.id = 1 makes it deterministic (iscalctrg = iscalcdetail = 'A1'), so ok = t. +SET statement_timeout = '30s'; +WITH cte AS ( + SELECT id, refrcode FROM tbl2 WHERE referenceid = 101991 AND id < 25000 + UNION ALL + SELECT id, refrcode FROM tbl2 WHERE referenceid = 101991 AND id >= 25000 +) +SELECT (SELECT refrcode FROM cte WHERE refrcode = p.iscalctrg LIMIT 1) = 'A1' + AND (SELECT refrcode FROM cte WHERE refrcode = p.iscalcdetail LIMIT 1) = 'A1' AS ok +FROM tbl1 p WHERE p.id = 1; +RESET statement_timeout; +DROP TABLE tbl1, tbl2;