wip-checksum #3782
base: master
[APPROVALNOTIFIER] This PR is NOT APPROVED.
Summary of Changes
Hello @hongyunyan, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request implements a dispatcher set checksum mechanism to make watermark advancement more reliable. With this verification in place, the system prevents checkpoint progression when the expected and actual dispatcher sets on an individual capture disagree. This protects data consistency, particularly in dynamic environments where dispatchers are added, removed, or moved. The changes span the dispatcher and maintainer components and add new communication protocols, state management, and monitoring.
Code Review
This pull request introduces a checksum mechanism to ensure the consistency of the dispatcher set between the maintainer and dispatcher managers. This is a significant improvement for data integrity and safety against scheduling anomalies. The implementation is thorough, covering initialization, incremental updates, and safety checks to halt checkpoint advancement on mismatches. The changes are well-integrated across various components. I have a few suggestions to refactor some parts of the new code to improve readability and reduce duplication.
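For readers who have not opened the diff, a minimal sketch of an order-independent set fingerprint may help. The field names (count, xorHigh, xorLow, sumHigh, sumLow) are taken from the log fields in this PR; the `id128` type and the update rules below are assumptions for illustration, not the PR's actual `dispatcherSetFingerprint` implementation.

```go
package main

import "fmt"

// id128 is a hypothetical 128-bit dispatcher ID split into two 64-bit halves.
type id128 struct{ high, low uint64 }

// setFingerprint summarizes a set of IDs independently of insertion order.
// The update rules are an assumption; the PR's real type is not shown here.
type setFingerprint struct {
	count   uint64
	xorHigh uint64
	xorLow  uint64
	sumHigh uint64
	sumLow  uint64
}

// add folds one ID into the fingerprint. Sums wrap modulo 2^64.
func (f *setFingerprint) add(id id128) {
	f.count++
	f.xorHigh ^= id.high
	f.xorLow ^= id.low
	f.sumHigh += id.high
	f.sumLow += id.low
}

// remove undoes a previous add of the same ID.
func (f *setFingerprint) remove(id id128) {
	f.count--
	f.xorHigh ^= id.high
	f.xorLow ^= id.low
	f.sumHigh -= id.high
	f.sumLow -= id.low
}

// equal compares all five fields.
func (f setFingerprint) equal(o setFingerprint) bool { return f == o }

func main() {
	a, b := id128{1, 2}, id128{3, 4}
	var x, y setFingerprint
	x.add(a)
	x.add(b)
	y.add(b)
	y.add(a)
	fmt.Println("same set, different order:", x.equal(y))
	y.remove(a)
	fmt.Println("after removing one ID:", x.equal(y))
}
```

Because XOR and wrapping addition are both commutative and invertible, each side can maintain its fingerprint incrementally and compare five integers instead of exchanging the full ID list.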
    for _, state := range []string{"mismatch", "uninitialized"} {
        metrics.DispatcherManagerDispatcherSetChecksumNotOKGauge.DeleteLabelValues(keyspace, changefeed, capture, "default", state)
        metrics.DispatcherManagerDispatcherSetChecksumNotOKTotal.DeleteLabelValues(keyspace, changefeed, capture, "default", state)
    }
    if e.RedoEnable {
        for _, state := range []string{"mismatch", "uninitialized"} {
            metrics.DispatcherManagerDispatcherSetChecksumNotOKGauge.DeleteLabelValues(keyspace, changefeed, capture, "redo", state)
            metrics.DispatcherManagerDispatcherSetChecksumNotOKTotal.DeleteLabelValues(keyspace, changefeed, capture, "redo", state)
        }
    }
The logic for deleting metrics for 'default' and 'redo' modes is duplicated. This can be refactored to improve maintainability by iterating over a list of modes. For example:
    modes := []string{"default"}
    if e.RedoEnable {
        modes = append(modes, "redo")
    }
    for _, mode := range modes {
        for _, state := range []string{"mismatch", "uninitialized"} {
            metrics.DispatcherManagerDispatcherSetChecksumNotOKGauge.DeleteLabelValues(keyspace, changefeed, capture, mode, state)
            metrics.DispatcherManagerDispatcherSetChecksumNotOKTotal.DeleteLabelValues(keyspace, changefeed, capture, mode, state)
        }
    }

    func (e *DispatcherManager) verifyDispatcherSetChecksum(mode int64, actual dispatcherSetFingerprint) heartbeatpb.ChecksumState {
        now := time.Now()
        capture := appcontext.GetID()
        modeLabel := common.StringMode(mode)
        keyspace := e.changefeedID.Keyspace()
        changefeed := e.changefeedID.Name()

        var (
            state           heartbeatpb.ChecksumState
            expectedSeq     uint64
            expectedInit    bool
            expectedFP      dispatcherSetFingerprint
            oldState        heartbeatpb.ChecksumState
            nonOKSince      time.Time
            needGaugeUpdate bool
            logRecovered    bool
            logNotOKWarn    bool
            logNotOKError   bool
            recoveredFor    time.Duration
            notOKFor        time.Duration
        )

        e.dispatcherSetChecksum.mu.Lock()
        expected := &e.dispatcherSetChecksum.defaultExpected
        runtime := &e.dispatcherSetChecksum.defaultRuntime
        if common.IsRedoMode(mode) {
            expected = &e.dispatcherSetChecksum.redoExpected
            runtime = &e.dispatcherSetChecksum.redoRuntime
        }

        expectedSeq = expected.seq
        expectedInit = expected.initialized
        expectedFP = expected.fingerprint

        if !expected.initialized {
            state = heartbeatpb.ChecksumState_UNINITIALIZED
        } else if !actual.equal(expected.fingerprint) {
            state = heartbeatpb.ChecksumState_MISMATCH
        } else {
            state = heartbeatpb.ChecksumState_OK
        }

        oldState = runtime.state
        nonOKSince = runtime.nonOKSince
        needGaugeUpdate = !runtime.gaugeInitialized || oldState != state

        const (
            errorAfter    = 30 * time.Second
            errorInterval = 30 * time.Second
        )

        if state == heartbeatpb.ChecksumState_OK {
            if oldState != heartbeatpb.ChecksumState_OK && !runtime.nonOKSince.IsZero() {
                logRecovered = true
                recoveredFor = now.Sub(runtime.nonOKSince)
            }
            runtime.state = state
            runtime.nonOKSince = time.Time{}
            runtime.lastErrorLogTime = time.Time{}
        } else {
            needResetTimer := oldState == heartbeatpb.ChecksumState_OK || oldState != state
            if needResetTimer || runtime.nonOKSince.IsZero() {
                runtime.nonOKSince = now
                runtime.lastErrorLogTime = time.Time{}
                logNotOKWarn = true
            } else {
                notOKFor = now.Sub(runtime.nonOKSince)
                if notOKFor >= errorAfter && now.Sub(runtime.lastErrorLogTime) >= errorInterval {
                    runtime.lastErrorLogTime = now
                    logNotOKError = true
                }
            }
            runtime.state = state
            nonOKSince = runtime.nonOKSince
        }
        runtime.gaugeInitialized = true
        e.dispatcherSetChecksum.mu.Unlock()

        if needGaugeUpdate {
            setGauge := func(stateLabel string, value float64) {
                metrics.DispatcherManagerDispatcherSetChecksumNotOKGauge.WithLabelValues(
                    keyspace, changefeed, capture, modeLabel, stateLabel,
                ).Set(value)
            }

            setGauge("mismatch", 0)
            setGauge("uninitialized", 0)

            if state != heartbeatpb.ChecksumState_OK {
                stateLabel := "mismatch"
                if state == heartbeatpb.ChecksumState_UNINITIALIZED {
                    stateLabel = "uninitialized"
                }
                setGauge(stateLabel, 1)
            }
        }

        if logRecovered {
            log.Info("dispatcher set checksum recovered",
                zap.Stringer("changefeedID", e.changefeedID),
                zap.String("capture", capture),
                zap.String("mode", modeLabel),
                zap.Duration("duration", recoveredFor),
                zap.Uint64("expectedSeq", expectedSeq),
            )
        }

        if logNotOKWarn || logNotOKError {
            level := "warn"
            if logNotOKError {
                level = "error"
            }
            stateStr := "mismatch"
            if state == heartbeatpb.ChecksumState_UNINITIALIZED {
                stateStr = "uninitialized"
            }
            notOKFor = now.Sub(nonOKSince)
            fields := []zap.Field{
                zap.Stringer("changefeedID", e.changefeedID),
                zap.String("capture", capture),
                zap.String("mode", modeLabel),
                zap.String("state", stateStr),
                zap.Duration("duration", notOKFor),
                zap.Uint64("expectedSeq", expectedSeq),
                zap.Bool("expectedInitialized", expectedInit),
                zap.Uint64("actualCount", actual.count),
                zap.Uint64("actualXorHigh", actual.xorHigh),
                zap.Uint64("actualXorLow", actual.xorLow),
                zap.Uint64("actualSumHigh", actual.sumHigh),
                zap.Uint64("actualSumLow", actual.sumLow),
                zap.Uint64("expectedCount", expectedFP.count),
                zap.Uint64("expectedXorHigh", expectedFP.xorHigh),
                zap.Uint64("expectedXorLow", expectedFP.xorLow),
                zap.Uint64("expectedSumHigh", expectedFP.sumHigh),
                zap.Uint64("expectedSumLow", expectedFP.sumLow),
                zap.String("prevState", oldState.String()),
            }
            if level == "error" {
                log.Error("dispatcher set checksum not ok, skip watermark reporting", fields...)
            } else {
                log.Warn("dispatcher set checksum not ok, skip watermark reporting", fields...)
            }
        }

        return state
    }
The verifyDispatcherSetChecksum function is quite long and complex, which can make it difficult to understand and maintain. Consider refactoring it by extracting parts of the logic into smaller, well-named helper functions. For example, the metric update logic (lines 285-302) and the logging logic (lines 304-349) could be moved to separate functions.
    for capture, ids := range defaultExpected {
        state, ok := m.defaultNodes[capture]
        if !ok {
            continue
        }
        for _, id := range ids {
            oldCapture, exists := m.defaultDispatcherToNode[id]
            if exists {
                if oldCapture == capture {
                    log.Warn("dispatcher already exists in expected set, ignore it",
                        zap.Stringer("changefeedID", m.changefeedID),
                        zap.String("dispatcherID", id.String()),
                        zap.String("capture", capture.String()),
                        zap.String("mode", common.StringMode(common.DefaultMode)),
                    )
                    continue
                }
                log.Warn("dispatcher exists in another capture, override expected node",
                    zap.Stringer("changefeedID", m.changefeedID),
                    zap.String("dispatcherID", id.String()),
                    zap.String("oldCapture", oldCapture.String()),
                    zap.String("newCapture", capture.String()),
                    zap.String("mode", common.StringMode(common.DefaultMode)),
                )
                if oldState, ok := m.defaultNodes[oldCapture]; ok {
                    oldState.fingerprint.remove(id)
                }
            }
            m.defaultDispatcherToNode[id] = capture
            state.fingerprint.add(id)
        }
    }

    if m.redoEnabled {
        for capture, ids := range redoExpected {
            state, ok := m.redoNodes[capture]
            if !ok {
                continue
            }
            for _, id := range ids {
                oldCapture, exists := m.redoDispatcherToNode[id]
                if exists {
                    if oldCapture == capture {
                        log.Warn("dispatcher already exists in expected set, ignore it",
                            zap.Stringer("changefeedID", m.changefeedID),
                            zap.String("dispatcherID", id.String()),
                            zap.String("capture", capture.String()),
                            zap.String("mode", common.StringMode(common.RedoMode)),
                        )
                        continue
                    }
                    log.Warn("dispatcher exists in another capture, override expected node",
                        zap.Stringer("changefeedID", m.changefeedID),
                        zap.String("dispatcherID", id.String()),
                        zap.String("oldCapture", oldCapture.String()),
                        zap.String("newCapture", capture.String()),
                        zap.String("mode", common.StringMode(common.RedoMode)),
                    )
                    if oldState, ok := m.redoNodes[oldCapture]; ok {
                        oldState.fingerprint.remove(id)
                    }
                }
                m.redoDispatcherToNode[id] = capture
                state.fingerprint.add(id)
            }
        }
    }
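The default and redo loops above differ only in the maps they touch and the mode label they log, so they could plausibly share one helper. The sketch below is an illustration under assumptions: `applyExpected`, `nodeState`, and the toy `fingerprint` type are hypothetical stand-ins, and logging is omitted.

```go
package main

import "fmt"

type nodeID string
type dispatcherID string

// fingerprint is a toy placeholder that only tracks membership.
type fingerprint struct{ members map[dispatcherID]struct{} }

func (f *fingerprint) add(id dispatcherID) {
	if f.members == nil {
		f.members = make(map[dispatcherID]struct{})
	}
	f.members[id] = struct{}{}
}

// remove is safe on an empty fingerprint: delete on a nil map is a no-op.
func (f *fingerprint) remove(id dispatcherID) { delete(f.members, id) }

type nodeState struct{ fp fingerprint }

// applyExpected is a hypothetical helper that both modes could call with
// their own maps. It moves each expected dispatcher to its capture and
// keeps the per-capture fingerprints in sync.
func applyExpected(
	nodes map[nodeID]*nodeState,
	owner map[dispatcherID]nodeID,
	expected map[nodeID][]dispatcherID,
) {
	for capture, ids := range expected {
		state, ok := nodes[capture]
		if !ok {
			continue // unknown capture: skip, as the original loops do
		}
		for _, id := range ids {
			if old, exists := owner[id]; exists {
				if old == capture {
					continue // already expected on this capture
				}
				if oldState, ok := nodes[old]; ok {
					oldState.fp.remove(id)
				}
			}
			owner[id] = capture
			state.fp.add(id)
		}
	}
}

func main() {
	nodes := map[nodeID]*nodeState{"a": {}, "b": {}}
	owner := map[dispatcherID]nodeID{"d1": "a"}
	nodes["a"].fp.add("d1")

	applyExpected(nodes, owner, map[nodeID][]dispatcherID{"b": {"d1", "d2"}})
	fmt.Println(owner["d1"], len(nodes["a"].fp.members), len(nodes["b"].fp.members))
}
```

The call sites would then reduce to one call for the default maps and, when redo is enabled, one for the redo maps, with the mode label passed in for logging.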
maintainer/maintainer.go
Outdated
    defaultExpected := make(map[node.ID][]common.DispatcherID, len(nodes))
    defaultExpected[m.selfNode.ID] = append(defaultExpected[m.selfNode.ID], m.ddlSpan.ID)
    for _, span := range m.controller.spanController.GetReplicating() {
        capture := span.GetNodeID()
        defaultExpected[capture] = append(defaultExpected[capture], span.ID)
    }

    var redoExpected map[node.ID][]common.DispatcherID
    if m.enableRedo {
        redoExpected = make(map[node.ID][]common.DispatcherID, len(nodes))
        redoExpected[m.selfNode.ID] = append(redoExpected[m.selfNode.ID], m.redoDDLSpan.ID)
        for _, span := range m.controller.redoSpanController.GetReplicating() {
            capture := span.GetNodeID()
            redoExpected[capture] = append(redoExpected[capture], span.ID)
        }
    }
/gemini review
/gemini summary
Code Review
This pull request introduces a checksum mechanism to verify the consistency of the dispatcher set between the maintainer and dispatcher managers. This is a crucial feature for ensuring data integrity and preventing incorrect checkpoint advancement, especially during dispatcher lifecycle changes. The implementation is comprehensive, covering both the maintainer and dispatcher manager sides, with new protocol messages, metrics, and tests. The logic for incremental updates, acknowledgements, and resends appears robust. Operators are correctly updated to maintain the expected dispatcher set.
One minor suggestion for code improvement is to refactor a small piece of duplicated code in the cleanMetrics function to improve maintainability.
    for _, state := range []string{"mismatch", "uninitialized"} {
        metrics.DispatcherManagerDispatcherSetChecksumNotOKGauge.DeleteLabelValues(keyspace, changefeed, capture, "default", state)
        metrics.DispatcherManagerDispatcherSetChecksumNotOKTotal.DeleteLabelValues(keyspace, changefeed, capture, "default", state)
    }
    if e.RedoEnable {
        for _, state := range []string{"mismatch", "uninitialized"} {
            metrics.DispatcherManagerDispatcherSetChecksumNotOKGauge.DeleteLabelValues(keyspace, changefeed, capture, "redo", state)
            metrics.DispatcherManagerDispatcherSetChecksumNotOKTotal.DeleteLabelValues(keyspace, changefeed, capture, "redo", state)
        }
    }
The logic for deleting checksum-related metrics for default and redo modes is duplicated. This can be refactored into a loop over the modes to improve readability and maintainability.
Suggested change:

    modes := []string{"default"}
    if e.RedoEnable {
        modes = append(modes, "redo")
    }
    for _, mode := range modes {
        for _, state := range []string{"mismatch", "uninitialized"} {
            metrics.DispatcherManagerDispatcherSetChecksumNotOKGauge.DeleteLabelValues(keyspace, changefeed, capture, mode, state)
            metrics.DispatcherManagerDispatcherSetChecksumNotOKTotal.DeleteLabelValues(keyspace, changefeed, capture, mode, state)
        }
    }
/test all
Summary of Changes
This pull request introduces a dispatcher set checksum mechanism to make watermark advancement more reliable. By verifying the dispatcher set on each capture against the maintainer's expectations, it prevents checkpoint progression when the two disagree. This protects data integrity and system stability, especially in dynamic environments where dispatcher assignments change. The changes involve new communication protocols, state management, and monitoring across both dispatcher and maintainer components.
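The gating described in this summary can be pictured with a small sketch. It assumes a hypothetical `heartbeat` message and `buildHeartbeat` function; the real protocol messages in the PR are not reproduced here. The only behavior taken from the source is that a non-OK checksum state means the watermark is not reported.

```go
package main

import "fmt"

type checksumState int

const (
	stateOK checksumState = iota
	stateMismatch
	stateUninitialized
)

// heartbeat is a hypothetical message; watermark is nil when withheld.
type heartbeat struct {
	state     checksumState
	watermark *uint64
}

// buildHeartbeat attaches the watermark only when the local dispatcher set
// matches what the maintainer expects, so the checkpoint cannot advance on
// the strength of a capture whose dispatcher set is in doubt.
func buildHeartbeat(state checksumState, checkpointTs uint64) heartbeat {
	hb := heartbeat{state: state}
	if state == stateOK {
		hb.watermark = &checkpointTs
	}
	return hb
}

func main() {
	for _, s := range []checksumState{stateOK, stateMismatch, stateUninitialized} {
		hb := buildHeartbeat(s, 100)
		fmt.Println(s, hb.watermark != nil)
	}
}
```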
/test pull-cdc-mysql-integration-light
/test all
/retest
@hongyunyan: The following test failed.
What problem does this PR solve?
Issue Number: close #xxx
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note