[SPARK-56614][SQL][CONNECT] Add config for strict DataFrame column resolution #55531
Open
zhengruifeng wants to merge 17 commits into apache:master from
### What changes were proposed in this pull request?
Adds a new internal SQL config `spark.sql.analyzer.strictDataFrameColumnResolution`
(default `false`) that controls how `UnresolvedAttribute` instances carrying a
`PLAN_ID_TAG` (i.e. DataFrame columns coming through Spark Connect) are resolved
in `ColumnResolutionHelper`:
- When `true`:
- In `resolveDataFrameColumnRecursively`, if the target plan node is found
but the column cannot be resolved on it, fail immediately with
`CANNOT_RESOLVE_DATAFRAME_COLUMN` instead of delaying the failure.
- In the main `resolveExpression` path, allow name-based resolution as a
fallback for tagged attributes as well.
- When `false` (default): keep the current behavior of delaying the failure so
that later analyzer iterations/rules still have a chance to resolve the
column, and keep skipping name-based resolution for tagged attributes.
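The resolution flow described above can be sketched as a toy model. This is illustrative only: the names `resolve_tagged_attribute` and `resolve_by_name` are hypothetical stand-ins, the real logic operates on Catalyst expression trees, and the delayed-vs-immediate failure distinction is collapsed into a single raise here.

```python
# Toy model of tagged-column resolution in ColumnResolutionHelper
# (illustrative only; not the actual Catalyst APIs).

def resolve_tagged_attribute(name, plan_columns, strict, resolve_by_name):
    """Resolve a plan-id-tagged column name.

    plan_columns: columns available on the plan node matched via PLAN_ID_TAG.
    strict: models spark.sql.analyzer.strictDataFrameColumnResolution.
    resolve_by_name: name-based fallback, tried only in non-strict mode.
    """
    if name in plan_columns:
        return ("plan_id", name)          # resolved on the tagged plan node
    if not strict:
        fallback = resolve_by_name(name)  # lenient: try name-based resolution
        if fallback is not None:
            return ("name", fallback)
    # Strict mode (or no fallback hit): surface the failure. In the real
    # analyzer this may be delayed across iterations rather than thrown here.
    raise ValueError(f"CANNOT_RESOLVE_DATAFRAME_COLUMN: {name}")
```

In this sketch, a column that exists on the tagged plan always resolves by plan id; a column missing from that plan (e.g. shadowed by a later `withColumn`) only resolves when the lenient fallback is enabled.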
### Why are the changes needed?
Gives users an opt-in mode that surfaces DataFrame column resolution failures
earlier, matching the semantics used internally in Databricks Runtime
(databricks-eng/runtime@4452ee5). The default preserves existing Apache Spark
behavior.
### Does this PR introduce _any_ user-facing change?
No behavior change by default. A new internal config is added.
### How was this patch tested?
- New unit test in `SparkSessionExtensionSuite` that enables the config and
verifies `CANNOT_RESOLVE_DATAFRAME_COLUMN` is raised immediately, alongside
the existing "inject analyzer rule - hidden column" test which still exercises
the default delayed-failure path.
- `sbt 'catalyst/testOnly *AnalysisSuite'`
- `sbt 'sql/testOnly *SparkSessionExtensionSuite *ColumnNodeToExpressionConverterSuite *ResolverGuardSuite *PlanResolutionSuite'`
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Anthropic), claude-opus-4-7
Flip the default to match the semantics of ES_1636208_FIX_DELAY_DF_COL_FALURE = false (the Databricks default): tagged DataFrame columns that cannot be resolved on their matched plan node now fail immediately with CANNOT_RESOLVE_DATAFRAME_COLUMN, and name-based fallback is allowed. Setting the config to false restores the previous delayed-failure behavior. Update the existing "inject analyzer rule - hidden column" test to set the config to false locally, since it exercises the delayed-failure path.
Remove the early-throw in resolveDataFrameColumnRecursively; the config now only controls whether name-based resolution is attempted as a fallback for tagged UnresolvedAttributes. Failures from plan-id-based resolution continue to be delayed so that later analyzer iterations can still resolve the column. Update the SQLConf doc accordingly and revert the hidden-column test back to its original form (no longer needs to toggle the config).
Reorder the guard on the UnresolvedAttribute case so the cheap `!u.containsTag(PLAN_ID_TAG)` check runs first. Most attributes have no PLAN_ID_TAG, so this avoids the SQLConf lookup on the common path; the config is only consulted for tagged (Spark Connect) attributes.
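The ordering rationale can be illustrated with a short-circuit sketch. The `strict_resolution_enabled` function and the `conf_lookups` counter are hypothetical stand-ins for the SQLConf read that the reordering avoids on the untagged hot path.

```python
# Sketch of why the cheap tag check should run before the SQLConf lookup.
conf_lookups = 0

def strict_resolution_enabled():
    # Stand-in for the SQLConf read we want to skip on the common path.
    global conf_lookups
    conf_lookups += 1
    return True

def guard(attr_has_plan_id_tag):
    # Cheap tag check first: `and` short-circuits, so the conf is only
    # consulted for tagged (Spark Connect) attributes.
    return attr_has_plan_id_tag and strict_resolution_enabled()

guard(False)                 # common path: untagged attribute
assert conf_lookups == 0     # no conf lookup happened
guard(True)                  # tagged attribute
assert conf_lookups == 1     # conf consulted exactly once
```

The actual guard is negated (`!u.containsTag(PLAN_ID_TAG)` short-circuits the case), but the cost argument is the same.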
The previous default (true) gave the more lenient behavior (name-based fallback allowed), which is the opposite of what "strict" implies. Flip the polarity so that `true` now means strict (plan-id-only resolution for tagged attributes) and `false` (default) keeps the lenient fallback behavior. Net user-facing behavior on the default path is unchanged.
Flip the default to true so strict plan-id-based resolution of tagged DataFrame columns is on by default. Drop the name-fallback wording from the doc string.
…est_column Move `test_join_with_cte`, `test_self_join`, `test_self_join_II`, `test_self_join_III`, `test_self_join_IV`, `test_drop_notexistent_col`, and `test_select_join_keys` into `ColumnTestsMixin` in `test_column.py`. These all exercise DataFrame column resolution behavior, so they are a better fit there and will pick up coverage on both classic (`ColumnTests`) and Connect (`ColumnParityTests`) via the existing mixin plumbing. Drop the Connect-specific parity variant of `test_join_with_cte` since the mixin already covers both sessions, and remove now-unused `when`/`concat` imports from `test_dataframe.py`.
…solution disabled Add ColumnParityWithNonStrictDataFrameColumnResolutionTests that re-runs the Column parity suite with `spark.sql.analyzer.strictDataFrameColumnResolution=false`, so the name-based fallback path for tagged UnresolvedAttributes is exercised under Spark Connect.
…class Shorter name: ColumnParityTestsWithLenientDFColResolution.
…hNonStrictDFColResolution
Revert the move of test_join_with_cte; it stays in test_connect_basic.py where it exercises cross-session parity between classic and Connect.
Note the config is specific to Spark Connect (only Connect attaches plan-id tags) and describe the `false` behavior.
Drop the leading 'Specific to Spark Connect.' sentence and qualify 'DataFrame columns' as 'Spark Connect DataFrame columns'.
…both parity classes Add `test_df_col_resolution_mode` to ColumnParityTests (expects "true") and to ColumnParityTestsWithNonStrictDFColResolution (expects "false") to confirm each class is running under the intended config.
…ss config for parity
* Add test_select_column_replaced_by_withcolumn in test_connect_column.py,
covering the case where `df.withColumn("c", ...).select(df["c"])` must
resolve the tagged reference to the overwritten alias. Wrapped in
`self.connect_conf(...)` so it exercises the non-strict DataFrame column
resolution path.
* In test_parity_column.py, set the strict DataFrame column resolution
config via setUpClass/tearDownClass instead of conf(), so both parity
classes explicitly pin the value on the live session.
…onStrictDFColResolution Mirror the setUpClass override with an explicit tearDownClass so each parity class pairs its own set/unset and does not rely on the parent's teardown to do it.
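The paired set/unset pattern can be sketched with plain `unittest` and a stand-in conf store. The `FAKE_CONF` dict and class names below are illustrative, not the real PySpark parity classes, which set the config on the live Spark session.

```python
import unittest

# Stand-in for the live session's runtime conf.
FAKE_CONF = {}
KEY = "spark.sql.analyzer.strictDataFrameColumnResolution"

class ColumnParityBase(unittest.TestCase):
    expected = "true"

    @classmethod
    def setUpClass(cls):
        FAKE_CONF[KEY] = "true"

    @classmethod
    def tearDownClass(cls):
        FAKE_CONF.pop(KEY, None)

    def test_df_col_resolution_mode(self):
        # Sanity check that the class runs under the intended config.
        self.assertEqual(FAKE_CONF[KEY], self.expected)

class NonStrictParity(ColumnParityBase):
    expected = "false"

    @classmethod
    def setUpClass(cls):
        # Each subclass pins its own value...
        FAKE_CONF[KEY] = "false"

    @classmethod
    def tearDownClass(cls):
        # ...and pairs it with an explicit unset, rather than relying on
        # the parent's tearDownClass.
        FAKE_CONF.pop(KEY, None)
```

Running `NonStrictParity` leaves `FAKE_CONF` clean afterwards, which is the point of mirroring `setUpClass` with an explicit `tearDownClass`.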
…ed_by_withcolumn Add a second case with spark.sql.analyzer.strictDataFrameColumnResolution set to true, asserting that the tagged reference to a column shadowed by withColumn fails with CANNOT_RESOLVE_DATAFRAME_COLUMN under strict mode.
### What changes were proposed in this pull request?
Add a new internal SQL config `spark.sql.analyzer.strictDataFrameColumnResolution` (default `true`) that controls how `UnresolvedAttribute`s carrying a `PLAN_ID_TAG` (Spark Connect DataFrame columns) are resolved in `ColumnResolutionHelper`:
- `true` (default): strict plan-id-based resolution only; no name-based fallback.
- `false`: if plan-id-based resolution does not resolve the tagged attribute, also try name-based resolution as a fallback.

Additional test hygiene in this PR:
- Move `test_self_join`, `test_self_join_II`, `test_self_join_III`, `test_self_join_IV`, `test_drop_notexistent_col`, and `test_select_join_keys` from `test_dataframe.py` to `ColumnTestsMixin` in `test_column.py`; they exercise DataFrame column resolution and belong with the Column tests.
- In `test_parity_column.py`, add `ColumnParityTestsWithNonStrictDFColResolution`, which re-runs the Column parity suite with the new config set to `false`. Both parity classes pin the config value via `setUpClass`/`tearDownClass` and include a `test_df_col_resolution_mode` sanity check.
- In `test_connect_column.py`, add `test_select_column_replaced_by_withcolumn`: under `strictDataFrameColumnResolution=false` the plan-id-tagged reference to a `withColumn`-shadowed column resolves via name fallback; under `strictDataFrameColumnResolution=true` the same query fails with `CANNOT_RESOLVE_DATAFRAME_COLUMN`.

### Why are the changes needed?
Keep Spark Connect DataFrame column references strictly anchored to their plan of origin by default, while giving users an escape hatch to opt into the more permissive name-based fallback path.

### Does this PR introduce _any_ user-facing change?
A new internal config is added. Default behavior is equivalent to current Apache Spark master for tagged attributes; setting `spark.sql.analyzer.strictDataFrameColumnResolution=false` opts into a slightly more permissive path where tagged attributes can also be resolved by name.

### How was this patch tested?
- `build/sbt 'catalyst/testOnly *AnalysisSuite'`
- `build/sbt 'sql/testOnly *SparkSessionExtensionSuite *ColumnNodeToExpressionConverterSuite *ResolverGuardSuite *PlanResolutionSuite *DataFrameSuite *DataFrameColumnarSuite'`
- `python/run-tests --testnames "pyspark.sql.tests.test_column,pyspark.sql.tests.connect.test_parity_column,pyspark.sql.tests.connect.test_connect_column SparkConnectColumnTests.test_select_column_replaced_by_withcolumn"`

### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Anthropic), claude-opus-4-7