consumer: change message acknowledgment to cumulative for pulsar consumer#5537
consumer: change message acknowledgment to cumulative for pulsar consumer#5537wk989898 wants to merge 1 commit into
Conversation
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:Approvers can indicate their approval by writing |
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (1)
📝 WalkthroughWalkthroughIn ChangesPulsar Consumer Ack Semantics
Estimated code review effort🎯 2 (Simple) | ⏱️ ~5 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request replaces individual message acknowledgment (AckID) with cumulative acknowledgment (AckIDCumulative) in the Pulsar consumer. The reviewer identified a critical bug in this change: using cumulative acknowledgment for all messages, including DDL messages, can lead to silent data loss if the consumer crashes before buffered DMLs of other tables are flushed. The reviewer recommends using cumulative acknowledgment only for ResolvedTS messages and individual acknowledgment for DDL messages, and has provided a detailed code suggestion to implement this strategy.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| continue | ||
| } | ||
| err := c.pulsarConsumer.AckID(consumerMsg.ID()) | ||
| err := c.pulsarConsumer.AckIDCumulative(consumerMsg.ID()) |
There was a problem hiding this comment.
Critical Bug: Potential Silent Data Loss with Cumulative Acknowledgment on DDLs
Changing the acknowledgment to AckIDCumulative for all committed messages introduces a critical data loss vulnerability when processing DDL messages.
Why this happens:
- When a DDL message is received,
WriteMessageonly flushes the DMLs of the tables blocked by this DDL (viaflushDDLEvent). - DMLs of other (non-blocked) tables remain buffered in memory (
progress.eventsGroup) and are not flushed to the downstream database. - Since the DDL is executed successfully,
WriteMessagereturnsneedCommit = true. - The consumer then calls
AckIDCumulative(DDL_Msg.ID()), which cumulatively acknowledges all messages up to the DDL message, including the unflushed DMLs of other tables. - If the consumer crashes or restarts before those other DMLs are flushed (which only happens when a subsequent
ResolvedTSis processed), those DMLs will never be redelivered because they were already acknowledged in Pulsar. This results in silent data loss.
Recommended Solution:
- ResolvedTS Messages: Use
AckIDCumulativebecause a ResolvedTS flush guarantees that all DMLs of all tables up to the watermark are fully flushed downstream. - DDL Messages: Use individual acknowledgment (
AckID) so that only the DDL message itself is acknowledged, leaving any unflushed DMLs of other tables unacknowledged and safe for redelivery upon restart.
To implement this, you can modify WriteMessage to return an acknowledgment strategy or type, and apply the appropriate ack method in consumer.go:
// In writer.go
type AckStrategy int
const (
AckStrategyNone AckStrategy = iota
AckStrategyIndividual
AckStrategyCumulative
)
func (w *writer) WriteMessage(ctx context.Context, message pulsar.Message) AckStrategy {
// ...
if needFlush {
if w.Write(ctx, messageType) {
if messageType == common.MessageTypeResolved {
return AckStrategyCumulative
}
return AckStrategyIndividual
}
}
return AckStrategyNone
}And in consumer.go:
ackStrategy := c.writer.WriteMessage(ctx, consumerMsg)
switch ackStrategy {
case AckStrategyCumulative:
err := c.pulsarConsumer.AckIDCumulative(consumerMsg.ID())
if err != nil {
log.Panic("Error ack message cumulatively", zap.Error(err))
}
case AckStrategyIndividual:
err := c.pulsarConsumer.AckID(consumerMsg.ID())
if err != nil {
log.Panic("Error ack message individually", zap.Error(err))
}
}|
[FORMAT CHECKER NOTIFICATION] Notice: To remove the 📖 For more info, you can check the "Contribute Code" section in the development guide. |
What problem does this PR solve?
Issue Number: close #xxx
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit