ddl: Fix potential data inconsistency when move dispatcher during an in-flight DDL or Syncpoint #3677
base: master
Conversation
Summary of Changes

Hello @hongyunyan, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request addresses a potential data consistency issue in TiCDC by refining how table dispatchers are initialized when they are moved or recreated, particularly in the presence of in-flight syncpoints. The core change ensures that a new dispatcher starts processing events from the correct timestamp (the syncpoint's commit timestamp).
Code Review
This pull request addresses a potential race condition when a table is moved while a syncpoint is in-flight. The fix involves adjusting the startTs for the new dispatcher to prevent re-scanning of events, which could corrupt snapshot consistency. The change is well-commented and supported by new unit tests and a comprehensive integration test. My review includes a suggestion to enhance the unit tests for better coverage.
```go
func TestSpanReplication_NewAddDispatcherMessage_UseBlockTsForInFlightSyncPoint(t *testing.T) {
	t.Parallel()

	replicaSet := NewSpanReplication(common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceNamme), common.NewDispatcherID(), 1, getTableSpanByID(4), 9, common.DefaultMode, false)
	replicaSet.UpdateBlockState(heartbeatpb.State{
		IsBlocked:   true,
		BlockTs:     10,
		IsSyncPoint: true,
		Stage:       heartbeatpb.BlockStage_WAITING,
	})

	msg := replicaSet.NewAddDispatcherMessage("node1")
	req := msg.Message[0].(*heartbeatpb.ScheduleDispatcherRequest)
	require.Equal(t, uint64(10), req.Config.StartTs)
}
```
To improve test coverage, it's good practice to use a table-driven test here to cover both BlockStage_WAITING and BlockStage_WRITING stages for an in-flight sync point. This ensures the logic is robust for all relevant states.
```go
func TestSpanReplication_NewAddDispatcherMessage_UseBlockTsForInFlightSyncPoint(t *testing.T) {
	t.Parallel()
	testCases := []struct {
		stage heartbeatpb.BlockStage
		name  string
	}{
		{stage: heartbeatpb.BlockStage_WAITING, name: "waiting"},
		{stage: heartbeatpb.BlockStage_WRITING, name: "writing"},
	}
	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()
			replicaSet := NewSpanReplication(common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceNamme), common.NewDispatcherID(), 1, getTableSpanByID(4), 9, common.DefaultMode, false)
			replicaSet.UpdateBlockState(heartbeatpb.State{
				IsBlocked:   true,
				BlockTs:     10,
				IsSyncPoint: true,
				Stage:       tc.stage,
			})
			msg := replicaSet.NewAddDispatcherMessage("node1")
			req := msg.Message[0].(*heartbeatpb.ScheduleDispatcherRequest)
			require.Equal(t, uint64(10), req.Config.StartTs)
		})
	}
}
```
/test all

/test all

/gemini summary

/gemini review
Summary of Changes

This pull request addresses a critical data consistency issue in TiCDC related to how table dispatchers are initialized when they are moved or recreated, particularly during in-flight DDL and syncpoint barrier operations. The changes ensure that dispatchers correctly determine their starting timestamp and whether to skip DML events, preventing data inconsistencies such as duplicate writes or missed DDLs. This is achieved by introducing a skipDMLAsStartTs flag, updating the dispatcher creation logic, and adding unit and integration tests that validate the behavior under complex distributed conditions.
Code Review
This pull request addresses a potential DDL loss issue when a table is moved while a DDL operation is in-flight. The fix involves introducing a skipDMLAsStartTs flag and adjusting the startTs for newly created dispatchers under specific conditions (in-flight DDL or syncpoint). The changes are propagated from the maintainer to the dispatcher manager and finally to the dispatcher. The logic appears sound and is well-supported by new unit and integration tests that cover both DDL and syncpoint scenarios.
My review includes a couple of suggestions to improve maintainability: one to fix a typo in a key comment, and another to refactor duplicated logic into a helper function. Overall, this is a solid contribution that improves the robustness of table migration.
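To make the scheduling-side rule concrete, here is a minimal sketch of the decision described above; the helper name and parameters are hypothetical, not the PR's actual API:

```go
// startTsForNewDispatcher sketches how a moved dispatcher's start position could be
// chosen from the replica's last reported block state (illustrative only).
func startTsForNewDispatcher(checkpointTs, blockTs uint64, blocked, isSyncPoint bool) (startTs uint64, skipDMLAsStartTs bool) {
	if !blocked || blockTs == 0 {
		// No in-flight barrier: resume from the checkpoint as before.
		return checkpointTs, false
	}
	if isSyncPoint {
		// In-flight syncpoint: start exactly at the syncpoint commit ts, no DML skipping.
		return blockTs, false
	}
	// In-flight DDL: replay the DDL from blockTs-1 and skip DMLs already written at blockTs.
	return blockTs - 1, true
}
```

With the unit test quoted earlier (an in-flight syncpoint with BlockTs 10), this rule yields StartTs 10, matching the assertion.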
```go
// This flag is set to true ONLY when is_syncpoint=false AND finished=0 in ddl-ts table (non-syncpoint DDL not finished).
// In this case, we return startTs = ddlTs-1 to replay the DDL, and skip the already-written DML at ddlTs
// to avoid duplicate writes while ensuring the DDL is replayed.
// This flag is set to true in two secnaios:
```
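For readers unfamiliar with the ddl_ts-based recovery, the rule quoted above boils down to something like the following sketch; the helper and its parameters are hypothetical, distilled from the comment rather than taken from the PR's code:

```go
// recoverStartPosition derives the start position from a downstream ddl_ts row.
// A non-syncpoint DDL that was recorded but not marked finished must be replayed,
// so we step back to ddlTs-1 and skip the DMLs at ddlTs that were already written.
func recoverStartPosition(ddlTs uint64, isSyncpoint, finished bool) (startTs uint64, skipDMLAsStartTs bool) {
	if !isSyncpoint && !finished {
		return ddlTs - 1, true
	}
	// Otherwise resume from ddlTs; nothing at that commit ts needs to be skipped.
	return ddlTs, false
}
```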
```go
// if the newStartTs equals to the original startTs, we need to combine the skipDMLAsStartTs flag
// otherwise, we just use the skipDMLAsStartTs flag from mysql sink
var skipDMLAsStartTs bool
if newStartTsList[idx] == startTsList[idx] {
	skipDMLAsStartTs = scheduleSkipDMLAsStartTsList[idx] || skipDMLAsStartTsList[idx]
} else {
	skipDMLAsStartTs = skipDMLAsStartTsList[idx]
}
```
This logic for determining the skipDMLAsStartTs flag is duplicated in dispatcher_manager_redo.go (lines 131-138). To improve maintainability and reduce code duplication, consider extracting this logic into a shared helper function, potentially in dispatcher_manager_helper.go.
For example:
```go
func determineSkipDMLFlag(newStartTs, originalStartTs int64, scheduleSkip, sinkSkip bool) bool {
	if newStartTs == originalStartTs {
		return scheduleSkip || sinkSkip
	}
	return sinkSkip
}
```

You could then call this helper function from both newEventDispatchers and newRedoDispatchers.
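A call site would then reduce to a single line, for example (sketch, reusing the index and slice names from the quoted snippet):

```go
skipDMLAsStartTs := determineSkipDMLFlag(newStartTsList[idx], startTsList[idx], scheduleSkipDMLAsStartTsList[idx], skipDMLAsStartTsList[idx])
```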
/test all

/test all
```go
SchemaID:         r.schemaID,
Span:             r.Span,
StartTs:          startTs,
SkipDMLAsStartTs: skipDMLAsStartTs,
```
Does the merge operator need to carry SkipDMLAsStartTs?
```sh
}

trap 'stop_tidb_cluster; collect_logs $WORK_DIR' EXIT
run $*
```
Please add more tests for the redo mode
[APPROVALNOTIFIER] This PR is APPROVED

This pull request has been approved by: wk989898. The full list of commands accepted by this bot can be found here. The pull request process is described here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment.
/gemini review

/gemini summary
Code Review
This pull request addresses a critical data consistency issue that can occur when a dispatcher is moved during in-flight DDL or syncpoint operations. The changes are comprehensive, touching the dispatcher, dispatcher manager, and maintainer components to correctly calculate the dispatcher's start timestamp and a new skipDMLAsStartTs flag. The logic for handling different barrier scenarios (DDL vs. syncpoint) and for merge operations appears sound. The addition of extensive integration tests for these specific scenarios is excellent and provides high confidence in the fix. My review identifies a couple of opportunities to improve maintainability by refactoring duplicated code and a long, complex function. Overall, this is a high-quality contribution that solves a complex problem.
```go
// if the newStartTs equals to the original startTs, we need to combine the skipDMLAsStartTs flag
// otherwise, we just use the skipDMLAsStartTs flag from mysql sink
var skipDMLAsStartTs bool
if newStartTsList[idx] == startTsList[idx] {
	skipDMLAsStartTs = scheduleSkipDMLAsStartTsList[idx] || skipDMLAsStartTsList[idx]
} else {
	skipDMLAsStartTs = skipDMLAsStartTsList[idx]
}
```
This block of logic for determining the skipDMLAsStartTs flag is duplicated from newEventDispatchers in dispatcher_manager.go. To improve maintainability and avoid potential inconsistencies in the future, consider extracting this logic into a shared helper function. This would ensure that any future changes to this logic are applied consistently for both event dispatchers and redo dispatchers.
```go
// resolveMergedDispatcherStartTs returns the effective startTs and skip flags for the merged dispatcher.
//
// Inputs:
//   - minCheckpointTs: min checkpointTs among all source dispatchers, collected after they are closed.
//   - pendingStates: per-source block state from GetBlockEventStatus() captured at close time.
//
// Algorithm:
//  1. Build a merge candidate from minCheckpointTs.
//     If all source dispatchers have a non-nil pending block state and they refer to the same (commitTs, isSyncPoint),
//     adjust the merge candidate so the merged dispatcher can replay that block event safely:
//     - DDL: startTs = commitTs - 1, skipDMLAsStartTs = true.
//     - SyncPoint: startTs = commitTs.
//     The merge candidate always uses skipSyncpointAtStartTs = false.
//  2. If the sink is MySQL, query downstream ddl_ts recovery info using the merge candidate startTs and merge the results:
//     - If recoveryStartTs > mergeStartTsCandidate: use recoveryStartTs and its skip flags.
//     - If recoveryStartTs == mergeStartTsCandidate: OR the skip flags.
//     - If recoveryStartTs < mergeStartTsCandidate: keep the merge candidate.
//     If the query fails, the error is reported via mergedDispatcher.HandleError and the merge candidate is returned.
//
// For non-MySQL and redo, the merge candidate is the final result.
func resolveMergedDispatcherStartTs(t *MergeCheckTask, minCheckpointTs uint64, pendingStates []*heartbeatpb.State) (uint64, bool, bool) {
	mergeStartTsCandidate := minCheckpointTs
	mergeSkipSyncpointAtStartTsCandidate := false
	mergeSkipDMLAsStartTsCandidate := false

	// If all source dispatchers have a pending block event and they are the same one,
	// adjust the startTs to ensure the merged dispatcher can replay it safely.
	allSamePending := true
	var pendingCommitTs uint64
	var pendingIsSyncPoint bool
	for idx, state := range pendingStates {
		if state == nil {
			allSamePending = false
			break
		}
		if idx == 0 {
			pendingCommitTs = state.BlockTs
			pendingIsSyncPoint = state.IsSyncPoint
			continue
		}
		if state.BlockTs != pendingCommitTs || state.IsSyncPoint != pendingIsSyncPoint {
			allSamePending = false
			break
		}
	}
	if allSamePending {
		if pendingIsSyncPoint {
			mergeStartTsCandidate = pendingCommitTs
		} else if pendingCommitTs > 0 {
			mergeStartTsCandidate = pendingCommitTs - 1
			mergeSkipDMLAsStartTsCandidate = true
		} else {
			log.Warn("pending ddl has zero commit ts, fallback to min checkpoint ts",
				zap.Stringer("changefeedID", t.manager.changefeedID),
				zap.Uint64("minCheckpointTs", minCheckpointTs),
				zap.Any("mergedDispatcher", t.mergedDispatcher.GetId()))
		}
		log.Info("merge dispatcher uses pending block event to calculate start ts",
			zap.Stringer("changefeedID", t.manager.changefeedID),
			zap.Any("mergedDispatcher", t.mergedDispatcher.GetId()),
			zap.Uint64("pendingCommitTs", pendingCommitTs),
			zap.Bool("pendingIsSyncPoint", pendingIsSyncPoint),
			zap.Uint64("startTs", mergeStartTsCandidate),
			zap.Bool("skipSyncpointAtStartTs", mergeSkipSyncpointAtStartTsCandidate),
			zap.Bool("skipDMLAsStartTs", mergeSkipDMLAsStartTsCandidate))
	}

	finalStartTs := mergeStartTsCandidate
	finalSkipSyncpointAtStartTs := mergeSkipSyncpointAtStartTsCandidate
	finalSkipDMLAsStartTs := mergeSkipDMLAsStartTsCandidate

	if common.IsDefaultMode(t.mergedDispatcher.GetMode()) && t.manager.sink.SinkType() == common.MysqlSinkType {
		newStartTsList, skipSyncpointAtStartTsList, skipDMLAsStartTsList, err := t.manager.sink.(*mysql.Sink).GetTableRecoveryInfo(
			[]int64{t.mergedDispatcher.GetTableSpan().TableID},
			[]int64{int64(mergeStartTsCandidate)},
			false,
		)
		if err != nil {
			log.Error("get table recovery info for merge dispatcher failed",
				zap.Stringer("dispatcherID", t.mergedDispatcher.GetId()),
				zap.Stringer("changefeedID", t.manager.changefeedID),
				zap.Error(err),
			)
			t.mergedDispatcher.HandleError(err)
			return finalStartTs, finalSkipSyncpointAtStartTs, finalSkipDMLAsStartTs
		}
		recoveryStartTs := uint64(newStartTsList[0])
		recoverySkipSyncpointAtStartTs := skipSyncpointAtStartTsList[0]
		recoverySkipDMLAsStartTs := skipDMLAsStartTsList[0]
		if recoveryStartTs > mergeStartTsCandidate {
			finalStartTs = recoveryStartTs
			finalSkipSyncpointAtStartTs = recoverySkipSyncpointAtStartTs
			finalSkipDMLAsStartTs = recoverySkipDMLAsStartTs
		} else if recoveryStartTs == mergeStartTsCandidate {
			finalSkipSyncpointAtStartTs = mergeSkipSyncpointAtStartTsCandidate || recoverySkipSyncpointAtStartTs
			finalSkipDMLAsStartTs = mergeSkipDMLAsStartTsCandidate || recoverySkipDMLAsStartTs
		}

		log.Info("get table recovery info for merge dispatcher",
			zap.Stringer("changefeedID", t.manager.changefeedID),
			zap.Uint64("mergeStartTsCandidate", mergeStartTsCandidate),
			zap.Any("recoveryStartTs", newStartTsList),
			zap.Any("recoverySkipSyncpointAtStartTsList", skipSyncpointAtStartTsList),
			zap.Any("recoverySkipDMLAsStartTsList", skipDMLAsStartTsList),
			zap.Uint64("finalStartTs", finalStartTs),
			zap.Bool("finalSkipSyncpointAtStartTs", finalSkipSyncpointAtStartTs),
			zap.Bool("finalSkipDMLAsStartTs", finalSkipDMLAsStartTs),
		)
	}

	return finalStartTs, finalSkipSyncpointAtStartTs, finalSkipDMLAsStartTs
}
```
This new function resolveMergedDispatcherStartTs is quite long and complex, spanning over 100 lines. While it is well-commented, its length makes it difficult to digest at a glance. To improve readability, testability, and maintainability, consider refactoring it into smaller, more focused helper functions. For example:

- A function to determine the merge candidate startTs and flags based on the pendingStates of source dispatchers.
- A function to apply the MySQL-specific recovery logic, taking the candidate values and refining them.

Breaking down this logic would make the merge process easier to understand and reason about.
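As an illustration of that decomposition, the two helpers might look roughly like this; the names, the simplified blockState type, and the signatures are hypothetical, distilled from the quoted function rather than taken from the PR:

```go
// blockState is a simplified stand-in for heartbeatpb.State (illustration only).
type blockState struct {
	BlockTs     uint64
	IsSyncPoint bool
}

// mergeCandidate covers step 1: derive the candidate startTs and skipDMLAsStartTs
// from the min checkpoint ts and the per-source pending block states.
func mergeCandidate(minCheckpointTs uint64, pendingStates []*blockState) (startTs uint64, skipDML bool) {
	if len(pendingStates) == 0 || pendingStates[0] == nil {
		return minCheckpointTs, false
	}
	first := pendingStates[0]
	for _, s := range pendingStates {
		if s == nil || s.BlockTs != first.BlockTs || s.IsSyncPoint != first.IsSyncPoint {
			// Not all sources share the same pending block event: keep the plain checkpoint.
			return minCheckpointTs, false
		}
	}
	if first.IsSyncPoint {
		return first.BlockTs, false // syncpoint: resume exactly at the block ts
	}
	if first.BlockTs > 0 {
		return first.BlockTs - 1, true // DDL: replay it, skip already-written DMLs at BlockTs
	}
	return minCheckpointTs, false
}

// refineWithRecovery covers step 2: merge the candidate with the downstream ddl_ts
// recovery result, preferring whichever position is further ahead and OR-ing flags on a tie.
func refineWithRecovery(candidateTs uint64, candidateSkipDML bool, recoveryTs uint64, recoverySkipDML bool) (uint64, bool) {
	switch {
	case recoveryTs > candidateTs:
		return recoveryTs, recoverySkipDML
	case recoveryTs == candidateTs:
		return candidateTs, candidateSkipDML || recoverySkipDML
	default:
		return candidateTs, candidateSkipDML
	}
}
```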
Summary of Changes

This pull request resolves a critical data consistency issue in TiCDC by enhancing how table dispatchers are initialized when they are moved or recreated, especially during in-flight DDL and syncpoint barrier operations. The core changes ensure that dispatchers accurately determine their starting timestamp and whether to skip DML events, effectively preventing data inconsistencies such as duplicate writes or missed DDLs. This is achieved through refined dispatcher creation logic and comprehensive testing under complex distributed system conditions.
/test all

/test all

/test all

/test all

/test all

/retest

/retest
/gemini review

/gemini summary
Code Review
This pull request introduces a crucial fix for potential data inconsistencies that can occur when a dispatcher is moved or merged during in-flight DDL and syncpoint operations. The changes correctly adjust the dispatcher's starting timestamp and introduce a skipDMLAsStartTs flag to prevent duplicate writes or missed DDLs, which is a solid approach. The logic is well-supported by new unit and integration tests. My feedback focuses on improving code structure for better long-term maintainability by refactoring a long function and extracting duplicated logic into a helper.
This logic for determining skipDMLAsStartTs is duplicated in newEventDispatchers and newRedoDispatchers (in dispatcher_manager_redo.go). To improve maintainability and reduce code duplication, consider extracting this logic into a small helper function.
For example, you could create a helper in dispatcher_manager_helper.go:
```go
func resolveSkipDMLAsStartTs(newStartTs, originalStartTs int64, scheduleSkip, sinkSkip bool) bool {
	// if the newStartTs equals to the original startTs, we need to combine the skipDMLAsStartTs flag
	// otherwise, we just use the skipDMLAsStartTs flag from mysql sink
	if newStartTs == originalStartTs {
		return scheduleSkip || sinkSkip
	}
	return sinkSkip
}
```

Then call it from both places.
The function resolveMergedDispatcherStartTs is quite long (over 100 lines) and handles multiple complex logical steps. To improve readability and maintainability, consider refactoring it into smaller, more focused helper functions.
For example, you could break it down into:
- A function to determine the candidate startTs and skip flags based on the pending states of the source dispatchers.
- A function that takes these candidate values and merges them with the recovery information from the MySQL sink.
This would make the overall logic easier to follow and test.
Summary of Changes

This pull request addresses a critical data consistency concern in TiCDC by refining how table dispatchers are initialized when they are moved or recreated, particularly during in-flight DDL and syncpoint barrier operations. The changes ensure that dispatchers correctly determine their starting timestamp and whether to skip DML events, preventing data inconsistencies such as duplicate writes or missed DDLs. This is achieved by introducing a skipDMLAsStartTs flag, updating the dispatcher creation logic, and adding unit and integration tests that validate the behavior under complex distributed conditions.
/gemini review
Code Review
This pull request introduces a critical fix for data consistency issues that could occur when a dispatcher is moved or recreated during in-flight DDL and syncpoint operations. The changes correctly determine the dispatcher's start timestamp and whether to skip DML events by introducing and propagating a skipDMLAsStartTs flag. The logic for handling these barrier scenarios is now more robust, especially in the dispatcher creation and merge logic. The addition of comprehensive integration and unit tests is a great step towards ensuring the correctness of these complex distributed scenarios. Overall, the changes are well-implemented and significantly improve the reliability of TiCDC. I've found one critical issue regarding an undefined variable that would cause a compilation failure, and a minor typo in a test script.
```diff
 t.mergedDispatcher.SetCurrentPDTs(t.manager.pdClock.CurrentTS())
 t.mergedDispatcher.SetComponentStatus(heartbeatpb.ComponentState_Initializing)
-appcontext.GetService[*eventcollector.EventCollector](appcontext.EventCollector).CommitAddDispatcher(t.mergedDispatcher, minCheckpointTs)
+appcontext.GetService[*eventcollector.EventCollector](appcontext.EventCollector).CommitAddDispatcher(t.mergedDispatcher, finalStartTs)
```
The variable finalStartTs is used here but it's not defined in the scope of the doMerge function. This will cause a compilation error. Based on the surrounding code and the refactoring, it seems you intended to use startTs, which is defined on line 276 and holds the correctly resolved start timestamp for the merged dispatcher. Please correct this to use startTs.
```diff
-appcontext.GetService[*eventcollector.EventCollector](appcontext.EventCollector).CommitAddDispatcher(t.mergedDispatcher, finalStartTs)
+appcontext.GetService[*eventcollector.EventCollector](appcontext.EventCollector).CommitAddDispatcher(t.mergedDispatcher, startTs)
```
```sh
'cdc move_table in_flight_ddl_during_scheduling checkpoint_race_ddl_crash'
# G04
'complex_transaction syncpoint syncpoint_check_ts random_drop_message'
'complex_transaction syncpoint in_flight_syncpoint_during_sheduling syncpoint_check_ts random_drop_message'
```
There's a typo in the test case name. It should be in_flight_syncpoint_during_scheduling instead of in_flight_syncpoint_during_sheduling to match the directory name.
```diff
-'complex_transaction syncpoint in_flight_syncpoint_during_sheduling syncpoint_check_ts random_drop_message'
+'complex_transaction syncpoint in_flight_syncpoint_during_scheduling syncpoint_check_ts random_drop_message'
```
/gemini review
Code Review
This pull request introduces a critical fix for data consistency issues that can occur when a dispatcher is moved, recreated, or merged during an in-flight DDL or syncpoint barrier. The core of the change is to make the dispatcher creation and merge logic aware of these in-flight barriers, correctly setting the startTs and a new skipDMLAsStartTs flag to ensure events are replayed correctly without data loss or duplication.
The changes are comprehensive, touching the dispatcher, dispatcher manager, maintainer, and protobuf definitions. The logic for determining the correct starting timestamp and skip flags is well-encapsulated in new helper functions, improving maintainability. The addition of two thorough integration tests (ddl_move_table and syncpoint_move_table) that use failpoints to simulate the exact race conditions is excellent and provides strong confidence in the fix. The code is well-documented with comments explaining the complex logic. Overall, this is a high-quality and important contribution to the stability of TiCDC.
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
What problem does this PR solve?
Issue Number: close #3663
What is changed and how it works?
This pull request addresses a critical data consistency concern in TiCDC related to how table dispatchers are initialized when they are moved or recreated, particularly during in-flight DDL and syncpoint barrier operations. The changes ensure that dispatchers correctly determine their starting timestamp and whether to skip DML events, preventing data inconsistencies such as duplicate writes or missed DDLs. This is achieved by introducing a skipDMLAsStartTs flag, updating the dispatcher creation logic, and adding robust unit and integration tests to validate the behavior under complex distributed system conditions.

Highlights

- In-flight DDL: a dispatcher recreated during an in-flight DDL barrier starts from blockTs-1 and sets skipDMLAsStartTs to true. This ensures the DDL is replayed without duplicating DML events that might have already been written.
- In-flight syncpoint: a dispatcher recreated during an in-flight syncpoint starts from blockTs (the syncpoint's commit timestamp) and sets skipDMLAsStartTs to false. This prevents re-scanning and re-applying events that could corrupt snapshot consistency.
- Propagation of skipDMLAsStartTs: the skipDMLAsStartTs flag is now properly propagated through the DispatcherConfig protobuf message, the dispatcherCreateInfo struct, and the dispatcher creation functions (NewEventDispatcher, NewRedoDispatcher) to ensure consistent behavior across the system.
- Integration tests: new integration tests (ddl_move_table and syncpoint_move_table) have been added to thoroughly validate the correct behavior of dispatchers when moved during DDL and syncpoint barriers, respectively, ensuring end-to-end data consistency.
- Unit tests: new unit tests for SpanReplication.NewAddDispatcherMessage and DispatcherManager cover various scenarios involving blockState, syncpoint, and DDL barriers, ensuring the new logic functions as expected.

Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note