Skip to content

[DO NOT MERGE][SPARK-55951][SQL] Add ChangelogTable schema validation and INVALID_CHANGELOG_SCHEMA error class#55567

Open
gengliangwang wants to merge 6 commits intoapache:masterfrom
gengliangwang:55507
Open

[DO NOT MERGE][SPARK-55951][SQL] Add ChangelogTable schema validation and INVALID_CHANGELOG_SCHEMA error class#55567
gengliangwang wants to merge 6 commits intoapache:masterfrom
gengliangwang:55507

Conversation

@gengliangwang
Copy link
Copy Markdown
Member

What changes were proposed in this pull request?

This is PR 1 of a split of #55426 (see the split suggestion for the full plan). Can merge in any order, but 1 (#55507) < 2 (#55508) would be preferable. For more context, see discussion posted to dev@spark.apache.org and linked SPIP.

Validates the CDC metadata columns and row-identity presence returned by a Changelog connector at relation construction time, and introduces a dedicated error class to report the failure at analysis time rather than later at execution time with a less helpful error.

  • ChangelogTable.validateSchema: fail-fast checks that the connector schema contains the required metadata columns (_change_type as StringType, _commit_version of connector-defined type, _commit_timestamp as TimestampType), and that rowId() returns a non-empty array when a capability requires row identity. rowVersion() is invoked when a capability requires it and surfaces the default UnsupportedOperationException directly if the connector has not overridden it. References can be top-level or nested (e.g. Delta's _metadata.row_commit_version). Invoked from the ChangelogTable constructor.
  • New error class INVALID_CHANGELOG_SCHEMA with sub-classes MISSING_COLUMN, INVALID_COLUMN_TYPE, MISSING_ROW_ID.
  • Matching QueryCompilationErrors helpers for each sub-class.
  • rowVersion nullability is enforced at runtime in the carry-over filter in [SPARK-55952][SPARK-55953][SQL] Add ResolveChangelogTable analyzer rule for batch CDC post-processing #55508 via count(rowVersion) = 2 (see the #55426 NULL-safety thread for rationale). rowId nullability is not enforced. It is covered by the Changelog.rowId() Javadoc contract.

Why are the changes needed?

Gives connector implementors a clear analysis-time error message for misshapen CDC schemas instead of an opaque execution-time failure. Background on the original PR and its discussion thread.

Does this PR introduce any user-facing change?

Yes, for connector implementors. A connector that returns an invalid changelog schema (missing or wrong-typed metadata column, or advertising a capability requiring row identity without declaring rowId()) now fails at analysis time with INVALID_CHANGELOG_SCHEMA.*. A connector that advertises a capability requiring rowId() or rowVersion() without implementing the method surfaces the default UnsupportedOperationException at analysis time.

How was this patch tested?

Added schema-validation cases to ChangelogResolutionSuite covering:

  • Missing metadata column: _change_type, _commit_version, _commit_timestamp.
  • Wrong data type: _change_type non-String, _commit_timestamp non-Timestamp.
  • Connector-defined _commit_version type accepted (Integer, Long, String).
  • Valid schema with data columns passes.
  • Nested rowId and rowVersion references (Delta-style _metadata.row_id / _metadata.row_commit_version) pass.
  • MISSING_ROW_ID triggered by representsUpdateAsDeleteAndInsert = true.
  • MISSING_ROW_ID triggered by containsIntermediateChanges = true.
  • Default UnsupportedOperationException on rowId() surfaces when a capability requires it.
  • Default UnsupportedOperationException on rowVersion() surfaces when a capability requires it.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Opus 4.7

SanJSp added 5 commits April 27, 2026 11:54
…HANGELOG_SCHEMA error class

Validate the CDC metadata columns, row identity and row versioning returned
by a `Changelog` connector at relation construction time, and introduce a
dedicated error class to report the failure at analysis time rather than
later at execution time with a less helpful error.

- `ChangelogTable.validateSchema`: fail-fast checks that the connector
  schema contains the required metadata columns (`_change_type`,
  `_commit_version`, `_commit_timestamp`), and that, when the connector
  advertises a capability requiring it, `rowId()` and `rowVersion()` are
  declared and the row version column is a non-nullable top-level column.
  Invoked from the `ChangelogTable` constructor.
- New error class `INVALID_CHANGELOG_SCHEMA` with sub-classes
  `MISSING_COLUMN`, `INVALID_COLUMN_TYPE`, `MISSING_ROW_ID`,
  `MISSING_ROW_VERSION`, `NESTED_ROW_VERSION`, `NULLABLE_ROW_VERSION`.
- `QueryCompilationErrors` helpers for each sub-class.
- Tests: `ChangelogResolutionSuite` schema-validation cases using a
  `TestChangelog` fixture that returns hand-crafted schemas.
@gengliangwang gengliangwang changed the title [SPARK-55951][SQL] Add ChangelogTable schema validation and INVALID_CHANGELOG_SCHEMA error class [DO NOT MERGE][SPARK-55951][SQL] Add ChangelogTable schema validation and INVALID_CHANGELOG_SCHEMA error class Apr 27, 2026
@gengliangwang
Copy link
Copy Markdown
Member Author

This PR is to help run CI for #55507

@SanJSp
Copy link
Copy Markdown
Contributor

SanJSp commented Apr 28, 2026

I've resolved the merge conflicts on the main branch, feel free to test again 👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants