[SPARK-56632][SQL][CONNECT] Fix AMBIGUOUS_COLUMN_REFERENCE regression for reused DataFrame in natural join #55556

Draft
zhengruifeng wants to merge 1 commit into apache:master from zhengruifeng:pick-metadata-col-resolution


@zhengruifeng zhengruifeng commented Apr 27, 2026

What changes were proposed in this pull request?

Fix an AMBIGUOUS_COLUMN_REFERENCE regression introduced by SPARK-55070 that occurs when a DataFrame is both referenced directly in a join and nested under a natural/USING join elsewhere in the same plan.

Replace the single broadened ancestor walk in resolveDataFrameColumn with a two-walk pattern, mirroring the outputAttributes.resolve(...) orElse outputMetadataAttributes.resolve(...) precedence used by LogicalPlan.resolve / LogicalPlan.resolveChildren:

  • Metadata access (df["_metadata"], IS_METADATA_COL tagged): a single walk filtered by p.metadataOutput.
  • Regular access (df["col"]): walk first with the strict filter p.outputSet, which drops candidates hidden at an ancestor (e.g. the right side's join key after a natural/USING join). If the strict walk resolves, use its result. Otherwise retry with the broad filter p.output ++ p.metadataOutput to handle the SPARK-55070 rhs["join_key"] case, where the only valid resolution is via p.metadataOutput.

The filter choice is threaded as a getAllowed: LogicalPlan => AttributeSet argument through resolveDataFrameColumnByPlanId / resolveDataFrameColumnRecursively; no change to the foldLeft merge logic.
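
The two-walk pattern described above can be sketched with a toy model. This is a simplified Python illustration, not the Scala code in ColumnResolutionHelper: `Plan`, `walk`, `resolve`, `strict`, and `broad` are hypothetical names, attributes are plain strings, and the merge step is reduced to "raise if two children both yield a candidate".

```python
from dataclasses import dataclass
from typing import FrozenSet, Optional, Tuple


@dataclass(frozen=True)
class Plan:
    """Toy stand-in for a logical plan node (assumption, not Spark's LogicalPlan)."""
    output: FrozenSet[str]
    metadata_output: FrozenSet[str] = frozenset()
    children: Tuple["Plan", ...] = ()
    plan_id: Optional[int] = None


class AmbiguousColumnReference(Exception):
    pass


def strict(p):
    # Analogue of the strict filter p.outputSet.
    return p.output


def broad(p):
    # Analogue of the broad filter p.output ++ p.metadataOutput.
    return p.output | p.metadata_output


def walk(p, plan_id, attr, get_allowed):
    """Resolve attr under p, filtering the candidate at every node on the way up."""
    if p.plan_id == plan_id:
        found = attr if attr in p.output else None
    else:
        found = None
        for child in p.children:
            r = walk(child, plan_id, attr, get_allowed)
            if r is not None and found is not None:
                raise AmbiguousColumnReference(attr)
            if r is not None:
                found = r
    if found is not None and found in get_allowed(p):
        return found
    return None


def resolve(root, plan_id, attr):
    """Strict walk first; retry with the broad filter only if nothing resolved."""
    hit = walk(root, plan_id, attr, strict)
    return hit if hit is not None else walk(root, plan_id, attr, broad)


DIM = 1


def dim():
    return Plan(frozenset({"dim.dim_id", "dim.name"}), plan_id=DIM)


fact = Plan(frozenset({"fact.txn_id", "fact.fk"}))
events = Plan(frozenset({"events.txn_id", "events.dim_id"}))
# USING join on dim_id: dim's key is hidden into metadata_output.
enriched = Plan(
    output=frozenset({"events.txn_id", "events.dim_id", "dim.name"}),
    metadata_output=frozenset({"dim.dim_id"}),
    children=(events, dim()),
)
# Direct use of dim in an ordinary join.
direct = Plan(output=fact.output | dim().output, children=(fact, dim()))
top = Plan(
    output=frozenset({"txn_id", "fact.fk", "dim.dim_id", "dim.name", "events.dim_id"}),
    metadata_output=frozenset({"fact.txn_id", "events.txn_id"}),
    children=(direct, enriched),
)
```

In this model, `walk(top, DIM, "dim.dim_id", broad)` raises because both occurrences of `dim` survive to the top join, whereas `resolve(top, DIM, "dim.dim_id")` picks the direct-usage candidate on the strict walk. `resolve(enriched, DIM, "dim.dim_id")` still succeeds through the broad fallback, which corresponds to the SPARK-55070 rhs["join_key"] case.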

Why are the changes needed?

SPARK-55070 broadened the ancestor filter in resolveDataFrameColumnRecursively from p.outputSet to p.output ++ p.metadataOutput so that rhs["join_key"] works after a natural/USING join (where one join key is hidden in Project.hiddenOutputTag). But when the same DataFrame is both used directly in a join and also nested under a natural/USING-join wrapper elsewhere in the plan, the broadened filter lets both candidates through resolveDataFrameColumnByPlanId's merge, tripping throw ambiguousColumnReferences(u).

For example, queries like:

```python
enriched = events.join(dim, "dim_id", "left")   # USING join hides dim's dim_id
result = (fact
  .join(dim, fact["fk"] == dim["dim_id"], "left")  # direct use of dim
  .join(enriched, "txn_id", "full_outer")
  .select(dim["dim_id"]))                          # previously AMBIGUOUS
```

now resolve dim["dim_id"] to the direct-usage output candidate.

Does this PR introduce any user-facing change?

Yes — bug fix. Queries that referenced a DataFrame both directly in a join and nested under a natural/USING join (where the wrapper hides one of the columns into metadataOutput) previously raised AMBIGUOUS_COLUMN_REFERENCE. They now resolve to the direct-usage candidate.

How was this patch tested?

  • New test_select_regular_column_with_reused_dataframe_hidden_in_natural_join added to ColumnTestsMixin in python/pyspark/sql/tests/test_column.py.
  • Existing pyspark column-resolution tests should keep passing, including test_self_join, test_self_join_II/III/IV, and test_select_join_keys.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Opus 4.7)

@zhengruifeng zhengruifeng changed the title from "[SPARK-XXXXX][SQL][CONNECT] Prefer output match over hidden-column match in DataFrame column resolution" to "[SPARK-56632][SQL][CONNECT] Prefer output match over hidden-column match in DataFrame column resolution" Apr 27, 2026
@zhengruifeng zhengruifeng force-pushed the pick-metadata-col-resolution branch 2 times, most recently from b086d2d to f365869 Compare April 27, 2026 08:24
@zhengruifeng zhengruifeng marked this pull request as ready for review April 27, 2026 10:17
@zhengruifeng zhengruifeng requested a review from cloud-fan April 27, 2026 10:17
@zhengruifeng zhengruifeng force-pushed the pick-metadata-col-resolution branch 2 times, most recently from 3162326 to ddf46ff Compare April 28, 2026 01:36
@zhengruifeng zhengruifeng changed the title from "[SPARK-56632][SQL][CONNECT] Prefer output match over hidden-column match in DataFrame column resolution" to "[SPARK-56632][SQL][CONNECT] Fix AMBIGUOUS_COLUMN_REFERENCE regression for reused DataFrame in natural join" Apr 28, 2026
@zhengruifeng zhengruifeng force-pushed the pick-metadata-col-resolution branch from ddf46ff to c34827a Compare April 28, 2026 01:38

@cloud-fan cloud-fan left a comment

Summary

Prior state and problem. SPARK-55070 fixed a regression introduced by SPARK-53503 by broadening the ancestor filter in resolveDataFrameColumnRecursively from p.outputSet to p.output ++ p.metadataOutput for regular plan-id-tagged column resolution. The broad filter is needed for rhs["join_key"] after a natural/USING join, where the right-side join key is hidden into Project.hiddenOutputTag (metadataOutput). But the broad filter also accepts candidates that should be rejected: when the same DataFrame appears both directly in a join and nested inside a natural/USING-join wrapper that hides its columns, both candidates pass at every ancestor and resolveDataFrameColumnByPlanId's foldLeft throws AMBIGUOUS_COLUMN_REFERENCE.

Design approach. Replace the single broadened walk with a strict-then-broad two-pass pattern, mirroring the output-then-metadata precedence in LogicalPlan.resolve (outputAttributes.resolve orElse outputMetadataAttributes.resolve):

  • Regular access: walk first with strict p.outputSet. That drops candidates hidden at an ancestor (the reused-DataFrame case). If strict resolves, use it. Otherwise retry with broad p.output ++ p.metadataOutput to handle the SPARK-55070 rhs["join_key"] case where the only valid resolution is via metadata.
  • Metadata access (IS_METADATA_COL): a single walk filtered by p.metadataOutput.

The filter is threaded as a getAllowed: LogicalPlan => AttributeSet through resolveDataFrameColumnByPlanId and resolveDataFrameColumnRecursively; the foldLeft merge logic is unchanged.

Key design decisions.

  • Strict-first precedence: when both a directly-visible and a metadata-only candidate exist for the same plan-id, prefer the directly-visible one. This matches LogicalPlan.resolve's output-before-metadata preference.
  • Fallback uses union, not metadata-only: the broad fallback uses p.output ++ p.metadataOutput rather than p.metadataOutput. Necessary because each candidate is filtered at every ancestor in the walk; metadata-only at the fallback would still reject the SPARK-55070 candidate at the matched node.
  • Metadata-access filter narrowed from p.output ++ p.metadataOutput (post-SPARK-55070) to p.metadataOutput. This is a quiet behavior change — see inline.
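
The "union, not metadata-only" point can be illustrated with a small Python sketch. It rests on a simplifying assumption: the path from the matched node up to the root is modelled as a list of hypothetical `(name, output, metadata_output)` tuples, and a candidate survives only if every node on that path allows it.

```python
# Path from the matched node up to the root, as (name, output, metadata_output).
# The attribute names are illustrative only.
chain = [
    ("dim leaf (matched node)", {"dim.dim_id", "dim.name"}, set()),
    ("USING-join wrapper", {"events.dim_id", "dim.name"}, {"dim.dim_id"}),
]


def survives(attr, path, allowed):
    """True if attr passes the filter at every node on the path."""
    return all(attr in allowed(out, meta) for _name, out, meta in path)


def strict(out, meta):
    return out


def metadata_only(out, meta):
    return meta


def union(out, meta):
    return out | meta


results = {
    "strict": survives("dim.dim_id", chain, strict),
    "metadata_only": survives("dim.dim_id", chain, metadata_only),
    "union": survives("dim.dim_id", chain, union),
}
print(results)
```

Here the strict filter rejects the candidate at the wrapper, the metadata-only filter rejects it at the matched leaf (where the attribute is in output, not metadata), and only the union lets it through at both levels.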

Implementation sketch. All changes are in ColumnResolutionHelper. resolveDataFrameColumn becomes the strategy split (metadata vs. regular; regular runs strict-then-broad). resolveDataFrameColumnByPlanId and resolveDataFrameColumnRecursively gain a getAllowed parameter. Star resolution (resolveDataFrameStarRecursively) is left unchanged — see general comment below.

General comment

The same regression pattern appears reachable on the star path. resolveDataFrameStarRecursively (lines 672–685, unchanged) still uses the single broad filter p.output ++ p.metadataOutput. If the test from this PR is repeated with dim["*"] instead of dim["dim_id"], the direct-dim and nested-under-USING-dim star candidates both pass the broad filter at every ancestor, and resolveDataFrameStarByPlanId's if (r1.nonEmpty && r2.nonEmpty) throw ambiguous would fire. Could you either thread the same strict-then-broad pattern through the star path, or, if the star case isn't user-reachable, document why?

u, planId, isMetadataAccess, q, 0)
val (resolved, matched) = if (u.containsTag(LogicalPlan.IS_METADATA_COL)) {
// Metadata access (e.g. `df["_metadata"]`): the resolved attribute lives
// in `p.metadataOutput`, so filter ancestors by `p.metadataOutput`.

The comment claims "the resolved attribute lives in p.metadataOutput," but getMetadataAttributeByNameOpt (LogicalPlan.scala:56) explicitly looks in (metadataOutput ++ output).collectFirst with the note "An already-referenced column might appear in output instead of metadataOutput." The resolved attribute can be in p.output, not p.metadataOutput.

This matters because the filter is now narrower than pre-SPARK-55070 (p.output ++ p.metadataOutput). At any ancestor that clears metadataOutput to Nil (Aggregate, Limit, Sort, Window, set ops — basicLogicalOperators.scala:404, 448, 510, 853, 885, 1228, 1513, 1573, 1684) but carries the metadata attribute through output, the strict-metadata filter would reject a candidate the previous code accepted. Is the narrowing intentional? If so, can you spell that out in the comment and add a test for the metadata-access path? The PR description doesn't mention this behavior change.
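
A toy Python sketch of the concern, under stated assumptions: the ancestor chain is a list of hypothetical `(name, output, metadata_output)` tuples, and the second node stands for any operator that, per the comment above, reports an empty metadataOutput while carrying the already-referenced metadata attribute in its output.

```python
# Ancestor chain from the matched node upward; names and attributes are illustrative.
chain = [
    ("relation", {"id"}, {"_metadata"}),
    ("ancestor with empty metadataOutput", {"id", "_metadata"}, set()),
]


def first_rejecting_node(attr, path, allowed):
    """Return the name of the first node whose filter drops attr, or None."""
    for name, out, meta in path:
        if attr not in allowed(out, meta):
            return name
    return None


def metadata_only(out, meta):
    # Analogue of the narrowed filter p.metadataOutput.
    return meta


def output_and_metadata(out, meta):
    # Analogue of the post-SPARK-55070 filter p.output ++ p.metadataOutput.
    return out | meta


narrow = first_rejecting_node("_metadata", chain, metadata_only)
wide = first_rejecting_node("_metadata", chain, output_and_metadata)
print(narrow, wide)
```

In this model the narrowed filter drops the candidate at the ancestor, while the wider filter accepts it at every level, which is the difference in behavior the question is about.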

resolveDataFrameColumnByPlanId(
u, planId, false, q, 0, plan => plan.outputSet) match {
case (Some(r), m) => (Some(r), m)
case _ => resolveDataFrameColumnByPlanId(u, planId, false, q, 0,

The fallback re-walks the tree from scratch — re-descending, re-running p.resolve(u.nameParts, conf.resolver) at every matched node, and re-merging — only to swap the filter set. Resolution at matched nodes and the descent are identical between the two passes; only getAllowed(p) differs.

Consider collapsing to a single walk by exposing the two filter components per level (e.g. (p.outputSet, AttributeSet(p.metadataOutput))) and tracking pass-states on each candidate as it flows up. Concretely: drop getAllowed, return candidates as (NamedExpression, depth, passesStrict); at every ancestor, use r.references.subsetOf(AttributeSet(p.output ++ p.metadataOutput)) as the survival gate (matches today's broad filter) and AND-in r.references.subsetOf(p.outputSet) to update passesStrict. At the top of resolveDataFrameColumn, prefer the passesStrict subset and fall back to all survivors. That preserves the foldLeft merge and the strict-then-broad precedence, but pays one walk instead of two.

Not a blocker — just feels like the two passes are doing the same descent twice when the only difference is the filter.
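
One possible reading of the single-walk suggestion, sketched in Python on a toy model. `Plan`, `walk`, and `resolve` are hypothetical names, candidates are reduced to a `passes_strict` flag, and deferring the ambiguity check to the top of the walk is an assumption of this sketch; it is not claimed to match the two-pass behavior in every corner case.

```python
from dataclasses import dataclass
from typing import FrozenSet, Optional, Tuple


@dataclass(frozen=True)
class Plan:
    """Toy stand-in for a logical plan node (assumption, not Spark's LogicalPlan)."""
    output: FrozenSet[str]
    metadata_output: FrozenSet[str] = frozenset()
    children: Tuple["Plan", ...] = ()
    plan_id: Optional[int] = None


class AmbiguousColumnReference(Exception):
    pass


def walk(p, plan_id, attr):
    """Return one passes_strict flag per candidate that survives the broad gate at p."""
    if p.plan_id == plan_id:
        flags = [True] if attr in p.output else []
    else:
        flags = [f for child in p.children for f in walk(child, plan_id, attr)]
    survivors = []
    for passes_strict in flags:
        if attr in p.output | p.metadata_output:  # broad survival gate
            # AND-in the strict check so the flag records every ancestor so far.
            survivors.append(passes_strict and attr in p.output)
    return survivors


def resolve(root, plan_id, attr):
    """Prefer candidates that passed strict everywhere; else fall back to all survivors."""
    flags = walk(root, plan_id, attr)
    strict_only = [f for f in flags if f]
    chosen = strict_only if strict_only else flags
    if len(chosen) > 1:
        raise AmbiguousColumnReference(attr)
    return attr if chosen else None


DIM = 1


def dim():
    return Plan(frozenset({"dim.dim_id", "dim.name"}), plan_id=DIM)


fact = Plan(frozenset({"fact.txn_id", "fact.fk"}))
events = Plan(frozenset({"events.txn_id", "events.dim_id"}))
enriched = Plan(
    output=frozenset({"events.txn_id", "events.dim_id", "dim.name"}),
    metadata_output=frozenset({"dim.dim_id"}),  # dim's key hidden by the USING join
    children=(events, dim()),
)
direct = Plan(output=fact.output | dim().output, children=(fact, dim()))
top = Plan(
    output=frozenset({"txn_id", "fact.fk", "dim.dim_id", "dim.name", "events.dim_id"}),
    children=(direct, enriched),
)
two_direct = Plan(output=dim().output, children=(dim(), dim()))
```

With these plans, `walk(top, DIM, "dim.dim_id")` yields one strict and one non-strict survivor, so `resolve` returns the direct-usage candidate after a single descent. `resolve(enriched, DIM, "dim.dim_id")` falls back to the lone non-strict survivor, and `resolve(two_direct, DIM, "dim.dim_id")` still raises because both candidates pass the strict check.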

Comment thread python/pyspark/sql/tests/test_column.py Outdated
@zhengruifeng zhengruifeng marked this pull request as draft April 28, 2026 10:38
… for reused DataFrame in natural join

Fix an AMBIGUOUS_COLUMN_REFERENCE regression introduced by SPARK-55070
when a DataFrame is referenced both directly in a join and also
nested under a natural/USING join elsewhere in the same plan.

Replace the single broadened ancestor walk in `resolveDataFrameColumn`
with a two-walk pattern, mirroring the
`outputAttributes.resolve orElse outputMetadataAttributes.resolve`
precedence in `LogicalPlan.resolve`. Regular access walks first with
the strict `p.outputSet` filter; only on no match does it retry with
`p.output ++ p.metadataOutput`. Metadata access keeps a single walk
filtered by `p.metadataOutput`.

Co-authored-by: Isaac
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
@zhengruifeng zhengruifeng force-pushed the pick-metadata-col-resolution branch from 1e69359 to 79a3074 Compare April 28, 2026 10:40

3 participants