downstreamadapter: shard dispatcher orchestrator queue #5052
Conversation
Skipping CI for Draft Pull Request.
[APPROVALNOTIFIER] This PR is NOT APPROVED. Approval is required from an approver for each of the affected files.
Code Review
This pull request implements sharding within the DispatcherOrchestrator so that changefeed control messages can be processed concurrently, partitioned by changefeed ID. It also enhances the MySQL sink's metadata management with more robust error handling and recovery logic for the ddl_ts_v1 table. The feedback below suggests two improvements: close all shard queues before waiting for workers to terminate, and make the shard Run method idempotent so that multiple worker goroutines cannot be started.
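For orientation, a minimal sketch of the partitioning idea the summary describes; the names `dispatcherOrchestrator`, `orchestratorShard`, and `shardFor` are illustrative assumptions, not identifiers from the PR:

```go
package orchestrator

// orchestratorShard stands in for one shard: its own pending queue
// and worker goroutine (details elided in this sketch).
type orchestratorShard struct{}

type dispatcherOrchestrator struct {
	shards []*orchestratorShard // fixed pool created at startup
}

// shardFor maps a changefeed's hash to a shard: all messages for the
// same changefeed land on the same shard, preserving per-changefeed
// order while unrelated changefeeds proceed in parallel.
func (m *dispatcherOrchestrator) shardFor(hash uint64) *orchestratorShard {
	return m.shards[hash%uint64(len(m.shards))]
}
```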
```go
for _, shard := range m.shards {
	shard.Close()
}
```
Closing shards sequentially in a loop can lead to a cumulative shutdown delay if multiple shards are processing long-running bootstrap or sink initialization requests. Consider closing all shard queues first to signal termination, and then waiting for all shard workers to finish in parallel.
```diff
-for _, shard := range m.shards {
-	shard.Close()
-}
+for _, shard := range m.shards {
+	shard.queue.Close()
+}
+for _, shard := range m.shards {
+	shard.wg.Wait()
+}
```
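For context, a sketch of a shard shape under which this suggestion works: closing the queue is the broadcast signal for phase one, and the WaitGroup is what phase two joins on. The channel-backed queue and all field names here are assumptions, not the PR's actual code:

```go
package orchestrator

import "sync"

type message struct{} // stand-in for a changefeed control message

// shardQueue is a hypothetical channel-backed queue; closeOnce makes
// Close safe to call from the orchestrator's shutdown path.
type shardQueue struct {
	ch        chan message
	closeOnce sync.Once
}

// Close signals the worker: its range loop ends once the channel is
// closed and drained.
func (q *shardQueue) Close() {
	q.closeOnce.Do(func() { close(q.ch) })
}

type orchestratorShard struct {
	queue *shardQueue
	wg    sync.WaitGroup
}

// worker returns after the queue is closed and drained, which is
// what lets shard.wg.Wait() in the second loop return.
func (s *orchestratorShard) worker() {
	defer s.wg.Done()
	for msg := range s.queue.ch {
		_ = msg // process one pending control message
	}
}
```

With this split, total shutdown time is bounded by the slowest shard instead of the sum across all shards.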
| } | ||
|
|
||
| // Run starts the shard worker loop. | ||
| func (s *orchestratorShard) Run() { |
There was a problem hiding this comment.
|
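One way to make `Run` idempotent is to guard the goroutine start with `sync.Once`; this is a sketch under that assumption, not the PR's actual fix:

```go
package orchestrator

import "sync"

type orchestratorShard struct {
	runOnce sync.Once
	wg      sync.WaitGroup
	queue   chan struct{} // simplified stand-in for the shard's queue
}

// Run starts the shard worker loop at most once; repeated calls are
// no-ops instead of spawning duplicate workers.
func (s *orchestratorShard) Run() {
	s.runOnce.Do(func() {
		s.wg.Add(1)
		go func() {
			defer s.wg.Done()
			for range s.queue { // drain until the queue is closed
			}
		}()
	})
}
```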
What problem does this PR solve?
During TiCDC bootstrap recovery, DispatcherOrchestrator currently processes all changefeed control messages through one global pending queue and one worker. When a bootstrap request blocks on sink initialization, unrelated changefeeds on the same node queue behind it and recovery lag grows quickly.
This change keeps the existing bootstrap and retry semantics intact, but removes the global head-of-line blocking at the orchestrator queue layer.
Issue Number: ref #0
What is changed and how it works?
The orchestrator's single pending queue is split into shards. Messages are routed by `changefeedID.Id.Hash(...)`, so all control messages for a given changefeed land on the same shard, and the existing `(changefeedID, msgType)` de-duplication semantics are preserved inside each shard (see the sketch below).
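A sketch of what per-shard `(changefeedID, msgType)` de-duplication could look like; the types, the `pendingSet` name, and the newest-payload-wins policy are all assumptions for illustration:

```go
package orchestrator

type changefeedID string
type msgType int

type pendingKey struct {
	cf changefeedID
	mt msgType
}

// pendingSet keeps at most one queued message per (changefeedID,
// msgType) pair within a single shard.
type pendingSet struct {
	order   []pendingKey          // FIFO order of first arrival
	pending map[pendingKey][]byte // latest payload per key
}

func newPendingSet() *pendingSet {
	return &pendingSet{pending: make(map[pendingKey][]byte)}
}

func (p *pendingSet) push(cf changefeedID, mt msgType, payload []byte) {
	k := pendingKey{cf, mt}
	if _, queued := p.pending[k]; !queued {
		p.order = append(p.order, k) // first message of this kind: enqueue
	}
	p.pending[k] = payload // de-duplicate: newer payload replaces older
}
```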
Check List

Tests
```sh
SDKROOT=$(xcrun --show-sdk-path) CC=$(xcrun --find clang) CXX=$(xcrun --find clang++) \
  CGO_CFLAGS="-isysroot $(xcrun --show-sdk-path)" \
  CGO_LDFLAGS="-isysroot $(xcrun --show-sdk-path) -L$(xcrun --show-sdk-path)/usr/lib -F$(xcrun --show-sdk-path)/System/Library/Frameworks" \
  go test ./downstreamadapter/dispatcherorchestrator
```

Questions
Will it cause performance regression or break compatibility?
No compatibility change is expected. The change only increases concurrency across shards: a given changefeed always hashes to the same shard, and each shard is drained by a single worker, so per-shard processing order and message semantics stay the same.
Do you need to update user documentation, design documentation or monitoring documentation?
No.
Release note