Skip to content
Closed

[DNM] #3786

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions logservice/logpuller/batch_resolved_ts_event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package logpuller

// batchResolvedTsEntry is one element of a batched resolved ts update:
// a region feed state together with the resolved ts carried for it.
type batchResolvedTsEntry struct {
	// state is the feed state of the region this entry refers to.
	state *regionFeedState
	// resolvedTs is the resolved ts recorded for that region.
	resolvedTs uint64
}

// batchResolvedTsEvent bundles several resolved ts updates that all belong
// to one subscription, so they can travel as a single event.
type batchResolvedTsEvent struct {
	// subscriptionID identifies the subscription every entry belongs to.
	subscriptionID SubscriptionID
	// entries holds the per-region resolved ts updates in insertion order.
	entries []batchResolvedTsEntry
}

// newBatchResolvedTsEvent returns an empty batch bound to the subscription subID.
//
// capacity is only a pre-allocation hint for the entries slice. A negative
// value is treated as zero, because handing it to make unchanged would panic
// at runtime.
func newBatchResolvedTsEvent(subID SubscriptionID, capacity int) *batchResolvedTsEvent {
	if capacity < 0 {
		capacity = 0
	}
	return &batchResolvedTsEvent{
		subscriptionID: subID,
		entries:        make([]batchResolvedTsEntry, 0, capacity),
	}
}

// add appends a (state, resolvedTs) pair to the end of the batch.
// A nil state is ignored and leaves the batch untouched.
func (b *batchResolvedTsEvent) add(state *regionFeedState, resolvedTs uint64) {
	if state != nil {
		entry := batchResolvedTsEntry{state: state, resolvedTs: resolvedTs}
		b.entries = append(b.entries, entry)
	}
}

// len reports how many entries the batch currently holds.
func (b *batchResolvedTsEvent) len() int {
	entries := b.entries
	return len(entries)
}

// minResolvedTs returns the smallest resolvedTs among the batch entries,
// or 0 when the batch holds no entries.
func (b *batchResolvedTsEvent) minResolvedTs() uint64 {
	if len(b.entries) == 0 {
		return 0
	}
	// The accumulator is deliberately not named "min", so the predeclared
	// builtin of that name is not shadowed inside this function.
	lowest := b.entries[0].resolvedTs
	for i := 1; i < len(b.entries); i++ {
		if ts := b.entries[i].resolvedTs; ts < lowest {
			lowest = ts
		}
	}
	return lowest
}
177 changes: 127 additions & 50 deletions logservice/logpuller/region_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/pingcap/kvproto/pkg/cdcpb"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/logservice/logpuller/regionlock"
"github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/ticdc/pkg/metrics"
"github.com/pingcap/ticdc/pkg/spanz"
Expand All @@ -42,8 +43,9 @@ type regionEvent struct {
worker *regionRequestWorker // TODO: remove the field

// only one of the following fields will be set
entries *cdcpb.Event_Entries_
resolvedTs uint64
entries *cdcpb.Event_Entries_
resolvedTs uint64
batchResolvedTs *batchResolvedTsEvent
}

func (event *regionEvent) getSize() int {
Expand All @@ -61,6 +63,11 @@ func (event *regionEvent) getSize() int {
size += len(row.OldValue)
}
}
if event.batchResolvedTs != nil {
size += int(unsafe.Sizeof(*event.batchResolvedTs))
entrySize := int(unsafe.Sizeof(batchResolvedTsEntry{}))
size += len(event.batchResolvedTs.entries) * entrySize
}
return size
}

Expand All @@ -69,6 +76,12 @@ type regionEventHandler struct {
}

// Path returns the SubscriptionID the event is keyed by.
//
// A batch resolved ts event uses the subscriptionID stored in the batch.
// Any other event uses its state's requestID converted to SubscriptionID;
// an event that carries neither a batch nor a state is reported through
// log.Panic.
func (h *regionEventHandler) Path(event regionEvent) SubscriptionID {
	switch {
	case event.batchResolvedTs != nil:
		return event.batchResolvedTs.subscriptionID
	case event.state == nil:
		log.Panic("region event has no state", zap.Any("event", event))
	}
	return SubscriptionID(event.state.requestID)
}

Expand All @@ -79,23 +92,38 @@ func (h *regionEventHandler) Handle(span *subscribedSpan, events ...regionEvent)
zap.Uint64("subscriptionID", uint64(span.subID)))
}

newResolvedTs := uint64(0)
updatedStates := make([]*regionFeedState, 0)
for _, event := range events {
switch {
case event.batchResolvedTs != nil:
for _, batchEvent := range event.batchResolvedTs.entries {
if batchEvent.state.isStale() {
h.handleRegionError(batchEvent.state, event.worker)
continue
}
if handleResolvedTs(batchEvent.state, batchEvent.resolvedTs) {
updatedStates = append(updatedStates, batchEvent.state)
}
}
continue
case event.state == nil:
log.Panic("region event has no state", zap.Any("event", event))
}
if event.state.isStale() {
h.handleRegionError(event.state, event.worker)
continue
}
if event.entries != nil {
handleEventEntries(span, event.state, event.entries)
} else if event.resolvedTs != 0 {
resolvedTs := handleResolvedTs(span, event.state, event.resolvedTs)
if resolvedTs > newResolvedTs {
newResolvedTs = resolvedTs
if handleResolvedTs(event.state, event.resolvedTs) {
updatedStates = append(updatedStates, event.state)
}
} else {
log.Panic("should not reach", zap.Any("event", event), zap.Any("events", events))
}
}
newResolvedTs := h.updateSpanResolvedTs(span, updatedStates)
tryAdvanceResolvedTs := func() {
if newResolvedTs != 0 {
span.advanceResolvedTs(newResolvedTs)
Expand Down Expand Up @@ -144,33 +172,50 @@ func (h *regionEventHandler) GetTimestamp(event regionEvent) dynstream.Timestamp
}
}
return 0
} else if event.batchResolvedTs != nil {
return dynstream.Timestamp(event.batchResolvedTs.minResolvedTs())
} else {
return dynstream.Timestamp(event.resolvedTs)
}
}
func (h *regionEventHandler) IsPaused(event regionEvent) bool { return false }

func (h *regionEventHandler) GetType(event regionEvent) dynstream.EventType {
if event.entries != nil || event.resolvedTs != 0 {
if event.entries != nil || event.resolvedTs != 0 || event.batchResolvedTs != nil {
// Note: resolved ts events may come from different regions, so they are not a periodic signal
return dynstream.EventType{DataGroup: DataGroupEntriesOrResolvedTs, Property: dynstream.BatchableData}
} else if event.state.isStale() {
} else if event.state != nil && event.state.isStale() {
return dynstream.EventType{DataGroup: DataGroupError, Property: dynstream.BatchableData}
} else {
log.Panic("unknown event type",
zap.Uint64("regionID", event.state.getRegionID()),
zap.Uint64("requestID", event.state.requestID),
zap.Uint64("workerID", event.worker.workerID))
}
log.Panic("unknown event type", zap.Any("event", event))
return dynstream.DefaultEventType
}

func (h *regionEventHandler) OnDrop(event regionEvent) interface{} {
// TODO: Distinguish between drop events caused by "path not found" errors and memory control.
if event.batchResolvedTs != nil {
workerID := uint64(0)
if event.worker != nil {
workerID = event.worker.workerID
}
log.Warn("drop batch resolved ts event",
zap.Uint64("subscriptionID", uint64(event.batchResolvedTs.subscriptionID)),
zap.Int("resolvedEntries", len(event.batchResolvedTs.entries)),
zap.Uint64("workerID", workerID))
return nil
}
if event.state == nil {
log.Warn("drop region event without state", zap.Any("event", event))
return nil
}
workerID := uint64(0)
if event.worker != nil {
workerID = event.worker.workerID
}
log.Warn("drop region event",
zap.Uint64("regionID", event.state.getRegionID()),
zap.Uint64("requestID", event.state.requestID),
zap.Uint64("workerID", event.worker.workerID),
zap.Uint64("workerID", workerID),
zap.Bool("hasEntries", event.entries != nil),
zap.Bool("stateIsStale", event.state.isStale()))
return nil
Comment on lines +196 to 221

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic to get workerID is duplicated. It can be extracted to the top of the function to improve readability and avoid code duplication. Also, the workerID is not logged in the event.state == nil case, which might be useful for debugging.

	workerID := uint64(0)
	if event.worker != nil {
		workerID = event.worker.workerID
	}

	if event.batchResolvedTs != nil {
		log.Warn("drop batch resolved ts event",
			zap.Uint64("subscriptionID", uint64(event.batchResolvedTs.subscriptionID)),
			zap.Int("resolvedEntries", len(event.batchResolvedTs.entries)),
			zap.Uint64("workerID", workerID))
		return nil
	}
	if event.state == nil {
		log.Warn("drop region event without state", zap.Any("event", event), zap.Uint64("workerID", workerID))
		return nil
	}
	log.Warn("drop region event",
		zap.Uint64("regionID", event.state.getRegionID()),
		zap.Uint64("requestID", event.state.requestID),
		zap.Uint64("workerID", workerID),
		zap.Bool("hasEntries", event.entries != nil),
		zap.Bool("stateIsStale", event.state.isStale()))
	return nil

Expand Down Expand Up @@ -290,9 +335,9 @@ func handleEventEntries(span *subscribedSpan, state *regionFeedState, entries *c
}
}

func handleResolvedTs(span *subscribedSpan, state *regionFeedState, resolvedTs uint64) uint64 {
func handleResolvedTs(state *regionFeedState, resolvedTs uint64) bool {
if state.isStale() || !state.isInitialized() {
return 0
return false
}
state.matcher.tryCleanUnmatchedValue()
regionID := state.getRegionID()
Expand All @@ -303,45 +348,77 @@ func handleResolvedTs(span *subscribedSpan, state *regionFeedState, resolvedTs u
zap.Uint64("regionID", regionID),
zap.Uint64("resolvedTs", resolvedTs),
zap.Uint64("lastResolvedTs", lastResolvedTs))
return 0
return false
}
state.updateResolvedTs(resolvedTs)
span.rangeLock.UpdateLockedRangeStateHeap(state.region.lockedRangeState)
return true
}

now := time.Now().UnixMilli()
lastAdvance := span.lastAdvanceTime.Load()
if now-lastAdvance >= span.advanceInterval && span.lastAdvanceTime.CompareAndSwap(lastAdvance, now) {
ts := span.rangeLock.GetHeapMinTs()
if ts > 0 && span.initialized.CompareAndSwap(false, true) {
log.Info("subscription client is initialized",
zap.Uint64("subscriptionID", uint64(span.subID)),
zap.Uint64("regionID", regionID),
zap.Uint64("resolvedTs", ts))
func (h *regionEventHandler) updateSpanResolvedTs(span *subscribedSpan, updatedStates []*regionFeedState) uint64 {
if len(updatedStates) == 0 {
return 0
}
uniqueStates := make([]*regionFeedState, 0, len(updatedStates))
seen := make(map[*regionFeedState]struct{}, len(updatedStates))
for _, state := range updatedStates {
if state == nil {
continue
}
lastResolvedTs := span.resolvedTs.Load()
nextResolvedPhyTs := oracle.ExtractPhysical(ts)
// Generally, we don't want to send duplicate resolved ts,
// so we check whether `ts` is larger than `lastResolvedTs` before send it.
// but when `ts` == `lastResolvedTs` == `span.startTs`,
// the span may just be initialized and have not receive any resolved ts before,
// so we also send ts in this case for quick notification to downstream.
if ts > lastResolvedTs || (ts == lastResolvedTs && lastResolvedTs == span.startTs) {
resolvedPhyTs := oracle.ExtractPhysical(lastResolvedTs)
decreaseLag := float64(nextResolvedPhyTs-resolvedPhyTs) / 1e3
const largeResolvedTsAdvanceStepInSecs = 30
if decreaseLag > largeResolvedTsAdvanceStepInSecs {
log.Warn("resolved ts advance step is too large",
zap.Uint64("subID", uint64(span.subID)),
zap.Int64("tableID", span.span.TableID),
zap.Uint64("regionID", regionID),
zap.Uint64("resolvedTs", ts),
zap.Uint64("lastResolvedTs", lastResolvedTs),
zap.Float64("decreaseLag(s)", decreaseLag))
}
span.resolvedTs.Store(ts)
span.resolvedTsUpdated.Store(time.Now().Unix())
return ts
if _, ok := seen[state]; ok {
continue
}
seen[state] = struct{}{}
uniqueStates = append(uniqueStates, state)
}
if len(uniqueStates) == 0 {
return 0
}
lockStates := make([]*regionlock.LockedRangeState, 0, len(uniqueStates))
for _, state := range uniqueStates {
if state.region.lockedRangeState != nil {
lockStates = append(lockStates, state.region.lockedRangeState)
}
}
if len(lockStates) == 0 {
return 0
}
span.rangeLock.UpdateLockedRangeStateHeapBatch(lockStates)

ts := span.rangeLock.GetHeapMinTs()
if ts == 0 {
return 0
}

firstRegionID := uniqueStates[0].getRegionID()
if span.initialized.CompareAndSwap(false, true) {
log.Info("subscription client is initialized",
zap.Uint64("subscriptionID", uint64(span.subID)),
zap.Uint64("regionID", firstRegionID),
zap.Uint64("resolvedTs", ts))
}
lastResolvedTs := span.resolvedTs.Load()
nextResolvedPhyTs := oracle.ExtractPhysical(ts)
// Generally, we don't want to send duplicate resolved ts,
// so we check whether `ts` is larger than `lastResolvedTs` before sending it.
// But when `ts` == `lastResolvedTs` == `span.startTs`,
// the span may have just been initialized and not yet received any resolved ts,
// so we also send ts in this case to notify downstream quickly.
if ts > lastResolvedTs || (ts == lastResolvedTs && lastResolvedTs == span.startTs) {
resolvedPhyTs := oracle.ExtractPhysical(lastResolvedTs)
decreaseLag := float64(nextResolvedPhyTs-resolvedPhyTs) / 1e3
const largeResolvedTsAdvanceStepInSecs = 30
if decreaseLag > largeResolvedTsAdvanceStepInSecs {
log.Warn("resolved ts advance step is too large",
zap.Uint64("subID", uint64(span.subID)),
zap.Int64("tableID", span.span.TableID),
zap.Uint64("regionID", firstRegionID),
zap.Uint64("resolvedTs", ts),
zap.Uint64("lastResolvedTs", lastResolvedTs),
zap.Float64("decreaseLag(s)", decreaseLag))
}
span.resolvedTs.Store(ts)
span.resolvedTsUpdated.Store(time.Now().Unix())
return ts
}
return 0
}
Loading