[SPARK-55951][SQL] Add ChangelogTable schema validation and INVALID_CHANGELOG_SCHEMA error class #55507
…HANGELOG_SCHEMA error class

Validate the CDC metadata columns, row identity and row versioning returned by a `Changelog` connector at relation construction time, and introduce a dedicated error class to report the failure at analysis time rather than later at execution time with a less helpful error.

- `ChangelogTable.validateSchema`: fail-fast checks that the connector schema contains the required metadata columns (`_change_type`, `_commit_version`, `_commit_timestamp`), and that, when the connector advertises a capability requiring it, `rowId()` and `rowVersion()` are declared and the row version column is a non-nullable top-level column. Invoked from the `ChangelogTable` constructor.
- New error class `INVALID_CHANGELOG_SCHEMA` with sub-classes `MISSING_COLUMN`, `INVALID_COLUMN_TYPE`, `MISSING_ROW_ID`, `MISSING_ROW_VERSION`, `NESTED_ROW_VERSION`, `NULLABLE_ROW_VERSION`.
- `QueryCompilationErrors` helpers for each sub-class.
- Tests: `ChangelogResolutionSuite` schema-validation cases using a `TestChangelog` fixture that returns hand-crafted schemas.
5f5279e to
753bea1
johanl-db
left a comment
I don't have concerns about this change; some minor improvements suggested.
gengliangwang
left a comment
Summary
Clean, contained change: a connector-side Changelog that returns a misshapen CDC schema now gets a sharp INVALID_CHANGELOG_SCHEMA.* at analysis time, replacing the earlier opaque execution-time failure. The validator runs eagerly in ChangelogTable's constructor, which is the right boundary — everything downstream (resolution, planning, scans) then sees a schema it can trust.
A handful of things worth addressing before merging, in priority order:
- `rowId` non-nullability is not validated, even though the `Changelog.rowId()` Javadoc says "Each referenced column must be non-nullable" and the existing peer `SupportsDelta.rowId()` path (`resolveRowIdAttrs` → `NULLABLE_ROW_ID_ATTRIBUTES`) has been doing this check for years. This PR is asymmetric: `rowVersion` gets nullability + top-level-ness, `rowId` gets presence only.
- The new `NESTED_ROW_VERSION` constraint ("rowVersion must be a top-level column") is not documented on `Changelog.rowVersion()`. Right now a connector author can follow the Javadoc exactly ("non-nullable") and still trip this error. Either add the requirement to the Javadoc or drop the check.
- PR description overstates test coverage. The "How was this patch tested?" section lists tests for "row-identity-required capabilities without rowId/rowVersion" and "nested rowVersion", but the suite only exercises metadata presence/types, nullable `rowVersion`, and valid schemas. There is no `MISSING_ROW_ID`, `MISSING_ROW_VERSION`, or `NESTED_ROW_VERSION` case, and capability triggers other than `containsCarryoverRows=true` are unexercised.
Remaining inline comments are smaller (scoping, error-message specificity, a comment typo).
For the error-text wording around MISSING_ROW_ID / MISSING_ROW_VERSION I'll defer to @johanl-db's existing comments rather than duplicate.
```scala
cl.containsIntermediateChanges()
if (needsRowId && (rowIds == null || rowIds.isEmpty)) {
  throw QueryCompilationErrors.changelogMissingRowIdError(cl.name)
}
```
rowId columns are not checked for non-nullability, even though (a) the Changelog.rowId() Javadoc requires "Each referenced column must be non-nullable", and (b) the peer row-level-operations path validates this via RewriteRowLevelCommand.resolveRowIdAttrs with NULLABLE_ROW_ID_ATTRIBUTES. Consider adding a parallel NULLABLE_ROW_ID sub-class (or at least stating explicitly that rowId column validation is deferred to a later PR). As written, rowVersion gets nullability + top-level-ness but rowId gets presence only.
There was a problem hiding this comment.
Done, using your option 2 from the NULL-safety thread on #55426. Added count(rowVersion) to the carry-over Window as a third aggregate alongside min and max (no extra Window operator, no additional shuffle). The filter now requires _rv_cnt = 2 AND _min_rv = _max_rv. A NULL rowVersion on either side fails the count check and the pair falls through as raw delete+insert instead of being silently dropped. Nesting-agnostic. Implementation and regression test ("NULL rowVersion on one side is NOT silently dropped as carry-over") in #55508.
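To make the guard above concrete, here is a minimal plain-Scala sketch of the filter `_rv_cnt = 2 AND _min_rv = _max_rv`. It is not the code from #55508: `Change`, `CarryOverGuard`, and the grouping by a single string `rowId` are hypothetical stand-ins for the Window partition. The only fact it relies on is that SQL `count`, `min`, and `max` skip NULLs, so with one NULL side `min = max` holds trivially and only the count check rejects the pair.

```scala
// Hypothetical model of one change row; not a Spark type.
final case class Change(rowId: String, changeType: String, rowVersion: Option[Long])

object CarryOverGuard {
  // Mirrors `_rv_cnt = 2 AND _min_rv = _max_rv` for one rowId partition.
  // flatMap drops None, the same way SQL aggregates skip NULL inputs.
  def isCarryOver(group: Seq[Change]): Boolean = {
    val versions = group.flatMap(_.rowVersion)
    versions.size == 2 && versions.min == versions.max
  }

  // Drops carry-over pairs; every other partition falls through as raw rows.
  // Output order is unspecified because groupBy returns an unordered map.
  def dropCarryOvers(changes: Seq[Change]): Seq[Change] =
    changes.groupBy(_.rowId).values.filterNot(isCarryOver).flatten.toSeq
}
```

With only `min == max`, a pair such as `(delete, NULL)` and `(insert, 3)` would compare `3 == 3` and be dropped; the size check is what keeps it.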
On the rowId asymmetry: rowId nullability is not schema-checked. An analogous silent-drop path exists (multiple NULL-rowId rows collapse into one Window partition via SQL NULL-group semantics), but the trigger surface is narrower than for rowVersion and a count()=2-style runtime guard does not port cleanly.
A top-level-only schema check would cover id but miss, for example, Delta's nested _metadata.row_id. This asymmetric coverage feels worse than no coverage at all.
We can
- either do a full schema walk through metadata columns covering both top-level and nested (how deep do we go? I think all the way, right?),
- or leave it unenforced and trust the Javadoc contract.
Currently, the latter is implemented. I'm open to the recursive column check but would need some input here. What do you think @gengliangwang?
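For reference, the first option (a full walk) could look roughly like the sketch below. It is self-contained and does not use Spark's classes: `DataType`, `StructType`, `Field`, and `RowIdNullability` are local stand-ins, and the "all the way" rule (any nullable ancestor makes the leaf nullable) is an assumption taken from the question above, not settled behavior.

```scala
// Local stand-ins for a schema tree; these are not Spark's types.
sealed trait DataType
case object LongType extends DataType
case object StringType extends DataType
final case class StructType(fields: Seq[Field]) extends DataType
final case class Field(name: String, dataType: DataType, nullable: Boolean)

object RowIdNullability {
  // Walks a reference such as Seq("_metadata", "row_id") from the root and
  // returns the first problem found, or None if every level is non-nullable.
  def firstProblem(schema: StructType, path: Seq[String]): Option[String] = {
    @annotation.tailrec
    def loop(current: StructType, remaining: List[String], walked: Vector[String]): Option[String] =
      remaining match {
        case Nil => None
        case name :: rest =>
          val here = (walked :+ name).mkString(".")
          current.fields.find(_.name == name) match {
            case None => Some(s"missing: $here")
            case Some(f) if f.nullable => Some(s"nullable: $here")
            case Some(Field(_, nested: StructType, _)) if rest.nonEmpty =>
              loop(nested, rest, walked :+ name)
            case Some(_) if rest.nonEmpty => Some(s"not a struct: $here")
            case Some(_) => None
          }
      }
    loop(schema, path.toList, Vector.empty)
  }
}
```

The same function handles a top-level `id` and a nested `_metadata.row_id`, which is the symmetry a top-level-only check would lack.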
There was a problem hiding this comment.
For now I'd defer to a later PR. Is that fine?
```scala
// delete+insert pair would be misclassified as a real update).
rowVersionRef.foreach { ref =>
  val fieldNames = ref.fieldNames()
  if (fieldNames.length != 1) {
```
The top-level requirement is new — the Changelog.rowVersion() Javadoc only says "non-nullable". A connector that reads the contract and returns a nested NamedReference will fail with NESTED_ROW_VERSION but get no hint from the API docs. Please either (a) update the Changelog.rowVersion() Javadoc to state that the reference must be a top-level column of columns(), or (b) remove this check and accept nested references. Same applies (by extension) to rowId if top-level-ness is also intended there.
There was a problem hiding this comment.
Done (option b). Removed the top-level-only restriction. NESTED_ROW_VERSION error class, helper, and subclass are gone.
There was a problem hiding this comment.
There is no new commit since the review yesterday.
d737051 to
cee3a81
What changes were proposed in this pull request?
This is PR 1 of a split of #55426 (see the split suggestion for the full plan). The PRs can merge in any order, but merging 1 (#55507) before 2 (#55508) would be preferable. For more context, see the discussion posted to dev@spark.apache.org and the linked SPIP.
Validates the CDC metadata columns and row-identity presence returned by a `Changelog` connector at relation construction time, and introduces a dedicated error class to report the failure at analysis time rather than later at execution time with a less helpful error.

- `ChangelogTable.validateSchema`: fail-fast checks that the connector schema contains the required metadata columns (`_change_type` as StringType, `_commit_version` of connector-defined type, `_commit_timestamp` as TimestampType), and that `rowId()` returns a non-empty array when a capability requires row identity. `rowVersion()` is invoked when a capability requires it and surfaces the default `UnsupportedOperationException` directly if the connector has not overridden it. References can be top-level or nested (e.g. Delta's `_metadata.row_commit_version`). Invoked from the `ChangelogTable` constructor.
- `INVALID_CHANGELOG_SCHEMA` with sub-classes `MISSING_COLUMN`, `INVALID_COLUMN_TYPE`, `MISSING_ROW_ID`.
- `QueryCompilationErrors` helpers for each sub-class.
- … `count(rowVersion) = 2` (see the #55426 NULL-safety thread for rationale). rowId nullability is not enforced. It is covered by the `Changelog.rowId()` Javadoc contract.

Why are the changes needed?
Gives connector implementors a clear analysis-time error message for misshapen CDC schemas instead of an opaque execution-time failure. Background on the original PR and its discussion thread.
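As a rough illustration of the fail-fast checks described in this PR, the sketch below models them in plain Scala with no Spark dependency. Everything named here (`Column`, `ChangelogInfo`, `InvalidChangelogSchema`, `ChangelogSchemaCheck`) is hypothetical and is not the actual `ChangelogTable.validateSchema` code. Treating the three capability flags as an OR for "requires row identity" is an assumption pieced together from the review thread and the test list.

```scala
// Local stand-ins; these are not Spark's types.
sealed trait DataType
case object StringType extends DataType
case object TimestampType extends DataType
case object LongType extends DataType
final case class Column(name: String, dataType: DataType)

// Hypothetical summary of what a connector's Changelog reports.
final case class ChangelogInfo(
    name: String,
    columns: Seq[Column],
    rowId: Seq[String],
    containsCarryoverRows: Boolean = false,
    representsUpdateAsDeleteAndInsert: Boolean = false,
    containsIntermediateChanges: Boolean = false)

final case class InvalidChangelogSchema(subClass: String, detail: String)
    extends RuntimeException(s"INVALID_CHANGELOG_SCHEMA.$subClass: $detail")

object ChangelogSchemaCheck {
  // None means the column type is connector-defined and is not checked.
  private val required: Seq[(String, Option[DataType])] = Seq(
    "_change_type" -> Some(StringType),
    "_commit_version" -> None,
    "_commit_timestamp" -> Some(TimestampType))

  def validate(cl: ChangelogInfo): Unit = {
    required.foreach { case (colName, expected) =>
      val col = cl.columns.find(_.name == colName).getOrElse(
        throw InvalidChangelogSchema("MISSING_COLUMN", s"${cl.name} lacks $colName"))
      expected.foreach { t =>
        if (col.dataType != t) {
          throw InvalidChangelogSchema(
            "INVALID_COLUMN_TYPE", s"$colName must be $t, found ${col.dataType}")
        }
      }
    }
    // Assumption: any of these capabilities requires row identity.
    val needsRowId = cl.containsCarryoverRows ||
      cl.representsUpdateAsDeleteAndInsert ||
      cl.containsIntermediateChanges
    if (needsRowId && cl.rowId.isEmpty) {
      throw InvalidChangelogSchema("MISSING_ROW_ID", s"${cl.name} declares no rowId")
    }
  }
}
```

Calling `validate` from a constructor is what moves the failure to relation construction time, so later stages never see an unchecked schema.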
Does this PR introduce any user-facing change?
Yes, for connector implementors. A connector that returns an invalid changelog schema (missing or wrong-typed metadata column, or advertising a capability requiring row identity without declaring `rowId()`) now fails at analysis time with `INVALID_CHANGELOG_SCHEMA.*`. A connector that advertises a capability requiring `rowId()` or `rowVersion()` without implementing the method surfaces the default `UnsupportedOperationException` at analysis time.

How was this patch tested?
Added schema-validation cases to `ChangelogResolutionSuite` covering:

- … `_change_type`, `_commit_version`, `_commit_timestamp`.
- … `_change_type` non-String, `_commit_timestamp` non-Timestamp.
- … `_commit_version` type accepted (Integer, Long, String).
- … (`_metadata.row_id` / `_metadata.row_commit_version`) pass.
- `MISSING_ROW_ID` triggered by `representsUpdateAsDeleteAndInsert = true`.
- `MISSING_ROW_ID` triggered by `containsIntermediateChanges = true`.
- `UnsupportedOperationException` on `rowId()` surfaces when a capability requires it.
- `UnsupportedOperationException` on `rowVersion()` surfaces when a capability requires it.

Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.7