Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 34 additions & 25 deletions downstreamadapter/dispatcher/basic_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type Dispatcher interface {
SetTryRemoving()
GetHeartBeatInfo(h *HeartBeatInfo)
GetComponentStatus() heartbeatpb.ComponentState
GetBlockEventStatus() *heartbeatpb.State
GetBlockStatusesChan() chan *heartbeatpb.TableSpanBlockStatus
GetEventSizePerSecond() float32
IsTableTriggerEventDispatcher() bool
Expand Down Expand Up @@ -133,11 +134,15 @@ type BasicDispatcher struct {
skipSyncpointAtStartTs bool
// skipDMLAsStartTs indicates whether to skip DML events at startTs+1 timestamp.
// When true, the dispatcher should filter out DML events with commitTs == startTs+1, but keep DDL events.
// This flag is set to true ONLY when is_syncpoint=false AND finished=0 in ddl-ts table (non-syncpoint DDL not finished).
// In this case, we return startTs = ddlTs-1 to replay the DDL, and skip the already-written DML at ddlTs
// to avoid duplicate writes while ensuring the DDL is replayed.
// This flag is set to true in two scenarios:
// 1. when is_syncpoint=false AND finished=0 in ddl-ts table (non-syncpoint DDL not finished).
// In this case, we return startTs = ddlTs-1 to replay the DDL, and skip the already-written DML at ddlTs
// to avoid duplicate writes while ensuring the DDL is replayed.
// Note: When is_syncpoint=true AND finished=0 (DDL finished but syncpoint not finished),
// skipDMLAsStartTs is false because the DDL is already completed and DML should be processed normally.
// 2. The maintainer asks the dispatcher to perform a move operation while the dispatcher is still
// handling a DDL event and the block state for that DDL event is still waiting.
// In this case, we also return startTs = ddlTs-1 to replay the DDL, and skip the DMLs at ddlTs.
skipDMLAsStartTs bool
// The ts from pdClock when the dispatcher is created.
// when downstream is mysql-class, for dml event we need to compare the commitTs with this ts
Expand Down Expand Up @@ -443,10 +448,10 @@ func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeC
}

// Skip DML events at startTs+1 when skipDMLAsStartTs is true.
// This handles the corner case where a DDL at ts=X crashed after writing DML but before marking finished.
// We return startTs=X-1 to replay the DDL, but need to skip the already-written DML at ts=X (startTs+1).
// This flag is used when a dispatcher starts from (blockTs-1) to replay the DDL at blockTs,
// while avoiding potential duplicate DML writes at blockTs.
if d.skipDMLAsStartTs && event.GetCommitTs() == d.startTs+1 {
log.Info("skip DML event at startTs+1 due to DDL crash recovery",
log.Info("skip DML event at startTs+1 due to skipDMLAsStartTs",
zap.Stringer("dispatcher", d.id),
zap.Uint64("startTs", d.startTs),
zap.Uint64("dmlCommitTs", event.GetCommitTs()),
Expand Down Expand Up @@ -609,25 +614,7 @@ func (d *BasicDispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.D
actionCommitTs := action.CommitTs
actionIsSyncPoint := action.IsSyncPoint
d.sharedInfo.GetBlockEventExecutor().Submit(d, func() {
failpoint.Inject("BlockOrWaitBeforeWrite", nil)
err := d.AddBlockEventToSink(pendingEvent)
if err != nil {
d.HandleError(err)
return
}
failpoint.Inject("BlockOrWaitReportAfterWrite", nil)

d.sharedInfo.blockStatusesChan <- &heartbeatpb.TableSpanBlockStatus{
ID: d.id.ToPB(),
State: &heartbeatpb.State{
IsBlocked: true,
BlockTs: actionCommitTs,
IsSyncPoint: actionIsSyncPoint,
Stage: heartbeatpb.BlockStage_DONE,
},
Mode: d.GetMode(),
}
GetDispatcherStatusDynamicStream().Wake(d.id)
d.ExecuteBlockEventDDL(pendingEvent, actionCommitTs, actionIsSyncPoint)
})
return true
} else {
Expand Down Expand Up @@ -661,6 +648,28 @@ func (d *BasicDispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.D
return false
}

// ExecuteBlockEventDDL writes pendingEvent to the sink and, once the write succeeds,
// reports a DONE block status for it.
//
// If AddBlockEventToSink returns an error, the error is passed to HandleError and
// nothing is reported. Otherwise a TableSpanBlockStatus carrying actionCommitTs and
// actionIsSyncPoint with Stage=DONE is sent on the shared blockStatusesChan, and the
// dispatcher status dynamic stream is woken for this dispatcher.
func (d *BasicDispatcher) ExecuteBlockEventDDL(pendingEvent commonEvent.BlockEvent, actionCommitTs uint64, actionIsSyncPoint bool) {
	failpoint.Inject("BlockOrWaitBeforeWrite", nil)
	if err := d.AddBlockEventToSink(pendingEvent); err != nil {
		d.HandleError(err)
		return
	}
	failpoint.Inject("BlockOrWaitReportAfterWrite", nil)

	// The status is only built and reported after the sink write has succeeded.
	doneStatus := &heartbeatpb.TableSpanBlockStatus{
		ID: d.id.ToPB(),
		State: &heartbeatpb.State{
			IsBlocked:   true,
			BlockTs:     actionCommitTs,
			IsSyncPoint: actionIsSyncPoint,
			Stage:       heartbeatpb.BlockStage_DONE,
		},
		Mode: d.GetMode(),
	}
	d.sharedInfo.blockStatusesChan <- doneStatus
	GetDispatcherStatusDynamicStream().Wake(d.id)
}

// shouldBlock checks whether the event should be blocked (to wait for the maintainer's response).
// For the ddl event with more than one blockedTable, it should block.
// For the ddl event with only one blockedTable, it should block only if the table is not complete span.
Expand Down
3 changes: 2 additions & 1 deletion downstreamadapter/dispatcher/redo_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func NewRedoDispatcher(
schemaID int64,
schemaIDToDispatchers *SchemaIDToDispatchers,
skipSyncpointAtStartTs bool,
skipDMLAsStartTs bool,
sink sink.Sink,
sharedInfo *SharedInfo,
) *RedoDispatcher {
Expand All @@ -54,7 +55,7 @@ func NewRedoDispatcher(
schemaID,
schemaIDToDispatchers,
skipSyncpointAtStartTs,
false, // skipDMLAsStartTs is not needed for redo dispatcher
skipDMLAsStartTs,
0,
common.RedoMode,
sink,
Expand Down
1 change: 1 addition & 0 deletions downstreamadapter/dispatcher/redo_dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func newRedoDispatcherForTest(sink sink.Sink, tableSpan *heartbeatpb.TableSpan)
1, // schemaID
NewSchemaIDToDispatchers(),
false, // skipSyncpointAtStartTs
false, // skipDMLAsStartTs
sink,
sharedInfo,
)
Expand Down
13 changes: 10 additions & 3 deletions downstreamadapter/dispatchermanager/dispatcher_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ func (e *DispatcherManager) newEventDispatchers(infos map[common.DispatcherID]di
start := time.Now()
currentPdTs := e.pdClock.CurrentTS()

dispatcherIds, tableIds, startTsList, tableSpans, schemaIds := prepareCreateDispatcher(infos, e.dispatcherMap)
dispatcherIds, tableIds, startTsList, tableSpans, schemaIds, scheduleSkipDMLAsStartTsList := prepareCreateDispatcher(infos, e.dispatcherMap)
if len(dispatcherIds) == 0 {
return nil
}
Expand Down Expand Up @@ -423,14 +423,20 @@ func (e *DispatcherManager) newEventDispatchers(infos map[common.DispatcherID]di
}

for idx, id := range dispatcherIds {
skipDMLAsStartTs := resolveSkipDMLAsStartTs(
newStartTsList[idx],
startTsList[idx],
scheduleSkipDMLAsStartTsList[idx],
skipDMLAsStartTsList[idx],
)
d := dispatcher.NewEventDispatcher(
id,
tableSpans[idx],
uint64(newStartTsList[idx]),
schemaIds[idx],
e.schemaIDToDispatchers,
skipSyncpointAtStartTsList[idx],
skipDMLAsStartTsList[idx],
skipDMLAsStartTs,
currentPdTs,
e.sink,
e.sharedInfo,
Expand Down Expand Up @@ -467,7 +473,8 @@ func (e *DispatcherManager) newEventDispatchers(infos map[common.DispatcherID]di
zap.Stringer("changefeedID", e.changefeedID),
zap.Stringer("dispatcherID", id),
zap.String("tableSpan", common.FormatTableSpan(tableSpans[idx])),
zap.Int64("startTs", newStartTsList[idx]))
zap.Int64("startTs", newStartTsList[idx]),
zap.Bool("skipDMLAsStartTs", skipDMLAsStartTs))
}
e.metricCreateDispatcherDuration.Observe(time.Since(start).Seconds() / float64(len(dispatcherIds)))
log.Info("batch create new dispatchers",
Expand Down
16 changes: 14 additions & 2 deletions downstreamadapter/dispatchermanager/dispatcher_manager_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,14 @@ func getDispatcherStatus(id common.DispatcherID, dispatcherItem dispatcher.Dispa
}

func prepareCreateDispatcher[T dispatcher.Dispatcher](infos map[common.DispatcherID]dispatcherCreateInfo, dispatcherMap *DispatcherMap[T]) (
[]common.DispatcherID, []int64, []int64, []*heartbeatpb.TableSpan, []int64,
[]common.DispatcherID, []int64, []int64, []*heartbeatpb.TableSpan, []int64, []bool,
) {
dispatcherIds := make([]common.DispatcherID, 0, len(infos))
tableIds := make([]int64, 0, len(infos))
startTsList := make([]int64, 0, len(infos))
tableSpans := make([]*heartbeatpb.TableSpan, 0, len(infos))
schemaIds := make([]int64, 0, len(infos))
skipDMLAsStartTsList := make([]bool, 0, len(infos))
for _, info := range infos {
id := info.Id
if _, ok := dispatcherMap.Get(id); ok {
Expand All @@ -89,8 +90,19 @@ func prepareCreateDispatcher[T dispatcher.Dispatcher](infos map[common.Dispatche
startTsList = append(startTsList, int64(info.StartTs))
tableSpans = append(tableSpans, info.TableSpan)
schemaIds = append(schemaIds, info.SchemaID)
skipDMLAsStartTsList = append(skipDMLAsStartTsList, info.SkipDMLAsStartTs)
}
return dispatcherIds, tableIds, startTsList, tableSpans, schemaIds
return dispatcherIds, tableIds, startTsList, tableSpans, schemaIds, skipDMLAsStartTsList
}

// resolveSkipDMLAsStartTs decides the final skipDMLAsStartTs flag for a dispatcher being created.
//
// The sink's decision (sinkSkipDMLAsStartTs) is always honored. The decision carried by the
// scheduling request (scheduleSkipDMLAsStartTs, e.g. recreating a dispatcher during an in-flight
// DDL barrier) is honored only when the sink kept startTs unchanged, i.e. newStartTs equals
// originalStartTs. If the sink adjusted startTs (e.g. based on ddl_ts recovery), only the sink's
// decision counts.
func resolveSkipDMLAsStartTs(newStartTs, originalStartTs int64, scheduleSkipDMLAsStartTs, sinkSkipDMLAsStartTs bool) bool {
	if sinkSkipDMLAsStartTs {
		return true
	}
	startTsUnchanged := newStartTs == originalStartTs
	return startTsUnchanged && scheduleSkipDMLAsStartTs
}

func prepareMergeDispatcher[T dispatcher.Dispatcher](changefeedID common.ChangeFeedID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ type dispatcherCreateInfo struct {
TableSpan *heartbeatpb.TableSpan
StartTs uint64
SchemaID int64
// SkipDMLAsStartTs indicates whether to skip DML events at (StartTs+1).
// It is used when a dispatcher is recreated during an in-flight DDL barrier:
// we need to replay the DDL by starting from (blockTs-1), while avoiding
// potential duplicate DML writes at blockTs.
SkipDMLAsStartTs bool
}

type cleanMap struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (e *DispatcherManager) NewTableTriggerRedoDispatcher(id *heartbeatpb.Dispat
func (e *DispatcherManager) newRedoDispatchers(infos map[common.DispatcherID]dispatcherCreateInfo, removeDDLTs bool) error {
start := time.Now()

dispatcherIds, _, startTsList, tableSpans, schemaIds := prepareCreateDispatcher(infos, e.redoDispatcherMap)
dispatcherIds, _, startTsList, tableSpans, schemaIds, scheduleSkipDMLAsStartTsList := prepareCreateDispatcher(infos, e.redoDispatcherMap)
if len(dispatcherIds) == 0 {
return nil
}
Expand All @@ -145,6 +145,7 @@ func (e *DispatcherManager) newRedoDispatchers(infos map[common.DispatcherID]dis
schemaIds[idx],
e.redoSchemaIDToDispatchers,
false, // skipSyncpointAtStartTs
scheduleSkipDMLAsStartTsList[idx],
e.redoSink,
e.sharedInfo,
)
Expand Down Expand Up @@ -172,7 +173,8 @@ func (e *DispatcherManager) newRedoDispatchers(infos map[common.DispatcherID]dis
zap.Stringer("changefeedID", e.changefeedID),
zap.Stringer("dispatcherID", id),
zap.String("tableSpan", common.FormatTableSpan(tableSpans[idx])),
zap.Int64("startTs", startTsList[idx]))
zap.Int64("startTs", startTsList[idx]),
zap.Bool("skipDMLAsStartTs", scheduleSkipDMLAsStartTsList[idx]))
}
e.metricRedoCreateDispatcherDuration.Observe(time.Since(start).Seconds() / float64(len(dispatcherIds)))
log.Info("batch create new redo dispatchers",
Expand Down Expand Up @@ -207,6 +209,7 @@ func (e *DispatcherManager) mergeRedoDispatcher(dispatcherIDs []common.Dispatche
schemaID,
e.redoSchemaIDToDispatchers,
false, // skipSyncpointAtStartTs
false, // skipDMLAsStartTs
e.redoSink,
e.sharedInfo,
)
Expand Down
9 changes: 5 additions & 4 deletions downstreamadapter/dispatchermanager/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,11 @@ func (h *SchedulerDispatcherRequestHandler) Handle(dispatcherManager *Dispatcher
switch req.ScheduleAction {
case heartbeatpb.ScheduleAction_Create:
info := dispatcherCreateInfo{
Id: dispatcherID,
TableSpan: config.Span,
StartTs: config.StartTs,
SchemaID: config.SchemaID,
Id: dispatcherID,
TableSpan: config.Span,
StartTs: config.StartTs,
SchemaID: config.SchemaID,
SkipDMLAsStartTs: config.SkipDMLAsStartTs,
}
if common.IsRedoMode(config.Mode) {
redoInfos[dispatcherID] = info
Expand Down
Loading