diff --git a/go/base/context.go b/go/base/context.go index 26d13fe07..182b51576 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -274,9 +274,28 @@ type MigrationContext struct { SkipMetadataLockCheck bool IsOpenMetadataLockInstruments bool + // MTS parallel apply configuration + NumWorkers int + BinlogHasLogicalTimestamps bool + LogicalTimestampsDetected chan struct{} + logicalTimestampsDetectOnce sync.Once + Log Logger } +// NotifyLogicalTimestampsDetection closes LogicalTimestampsDetected once so MTS +// startup can proceed. found=true when binlog carries logical timestamps (MySQL 5.7+). +func (mctx *MigrationContext) NotifyLogicalTimestampsDetection(found bool) { + mctx.logicalTimestampsDetectOnce.Do(func() { + if found { + mctx.BinlogHasLogicalTimestamps = true + } + if mctx.LogicalTimestampsDetected != nil { + close(mctx.LogicalTimestampsDetected) + } + }) +} + type Logger interface { Debug(args ...interface{}) Debugf(format string, args ...interface{}) diff --git a/go/binlog/binlog_entry.go b/go/binlog/binlog_entry.go index 7620281d2..a346305bd 100644 --- a/go/binlog/binlog_entry.go +++ b/go/binlog/binlog_entry.go @@ -13,8 +13,10 @@ import ( // BinlogEntry describes an entry in the binary log type BinlogEntry struct { - Coordinates mysql.BinlogCoordinates - DmlEvent *BinlogDMLEvent + Coordinates mysql.BinlogCoordinates + DmlEvent *BinlogDMLEvent + LastCommitted int64 // logical timestamp of commit parent (0 = SEQ_UNINIT, unavailable) + SequenceNumber int64 // monotonically increasing logical timestamp (0 = SEQ_UNINIT) } // NewBinlogEntryAt creates an empty, ready to go BinlogEntry object diff --git a/go/binlog/gomysql_reader.go b/go/binlog/gomysql_reader.go index 189a5f399..03796272d 100644 --- a/go/binlog/gomysql_reader.go +++ b/go/binlog/gomysql_reader.go @@ -85,7 +85,7 @@ func (gmr *GoMySQLReader) GetCurrentBinlogCoordinates() mysql.BinlogCoordinates return gmr.currentCoordinates.Clone() } -func (gmr *GoMySQLReader) handleRowsEvent(ev 
*replication.BinlogEvent, rowsEvent *replication.RowsEvent, entriesChannel chan<- *BinlogEntry) error { +func (gmr *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent *replication.RowsEvent, entriesChannel chan<- *BinlogEntry, lastCommitted int64, sequenceNumber int64) error { currentCoords := gmr.GetCurrentBinlogCoordinates() dml := ToEventDML(ev.Header.EventType.String()) if dml == NotDML { @@ -98,6 +98,8 @@ func (gmr *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent continue } binlogEntry := NewBinlogEntryAt(currentCoords) + binlogEntry.LastCommitted = lastCommitted + binlogEntry.SequenceNumber = sequenceNumber binlogEntry.DmlEvent = NewBinlogDMLEvent( string(rowsEvent.Table.Schema), string(rowsEvent.Table.Table), @@ -130,6 +132,10 @@ func (gmr *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent // StreamEvents func (gmr *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry) error { + var currentLastCommitted int64 + var currentSequenceNumber int64 + logicalTimestampsDetected := false + for !canStopStreaming() { ev, err := gmr.binlogStreamer.GetEvent(context.Background()) if err != nil { @@ -156,6 +162,14 @@ func (gmr *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesChan if !gmr.migrationContext.UseGTIDs { continue } + // Capture logical timestamps for MTS dependency tracking + currentLastCommitted = event.LastCommitted + currentSequenceNumber = event.SequenceNumber + // Detect whether binlog contains logical timestamps (MySQL 5.7+) + if !logicalTimestampsDetected && (event.LastCommitted > 0 || event.SequenceNumber > 0) { + logicalTimestampsDetected = true + gmr.migrationContext.NotifyLogicalTimestampsDetection(true) + } sid, err := uuid.FromBytes(event.SID) if err != nil { return err @@ -184,12 +198,13 @@ func (gmr *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesChan gmr.LastTrxCoords = gmr.currentCoordinates.Clone() } case 
*replication.RowsEvent: - if err := gmr.handleRowsEvent(ev, event, entriesChannel); err != nil { + if err := gmr.handleRowsEvent(ev, event, entriesChannel, currentLastCommitted, currentSequenceNumber); err != nil { return err } } } gmr.migrationContext.Log.Debugf("done streaming events") + gmr.migrationContext.NotifyLogicalTimestampsDetection(logicalTimestampsDetected) return nil } diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index d77046231..13583c64e 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -124,6 +124,7 @@ func main() { exponentialBackoffMaxInterval := flag.Int64("exponential-backoff-max-interval", 64, "Maximum number of seconds to wait between attempts when performing various operations with exponential backoff.") chunkSize := flag.Int64("chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 10-100,000)") dmlBatchSize := flag.Int64("dml-batch-size", 10, "batch size for DML events to apply in a single transaction (range 1-1000)") + numWorkers := flag.Int("num-workers", 1, "number of parallel DML apply workers (MTS mode). Requires MySQL 5.7+ with binlog logical timestamps. 
Default: 1 (single-threaded, backward compatible)") defaultRetries := flag.Int64("default-retries", 60, "Default number of retries for various operations before panicking") flag.BoolVar(&migrationContext.PanicOnWarnings, "panic-on-warnings", false, "Panic when SQL warnings are encountered when copying a batch indicating data loss") cutOverLockTimeoutSeconds := flag.Int64("cut-over-lock-timeout-seconds", 3, "Max number of seconds to hold locks on tables while attempting to cut-over (retry attempted when lock exceeds timeout) or attempting instant DDL") @@ -377,6 +378,14 @@ func main() { migrationContext.SetChunkSize(*chunkSize) migrationContext.SetDMLBatchSize(*dmlBatchSize) migrationContext.SetMaxLagMillisecondsThrottleThreshold(*maxLagMillis) + if *numWorkers < 1 { + migrationContext.Log.Warningf("invalid --num-workers=%d; using 1", *numWorkers) + *numWorkers = 1 + } + migrationContext.NumWorkers = *numWorkers + if migrationContext.NumWorkers > 1 { + migrationContext.LogicalTimestampsDetected = make(chan struct{}) + } migrationContext.SetThrottleQuery(*throttleQuery) migrationContext.SetThrottleHTTP(*throttleHTTP) migrationContext.SetIgnoreHTTPErrors(*ignoreHTTPErrors) diff --git a/go/logic/applier.go b/go/logic/applier.go index f3474b3ef..bfb2782c5 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -296,6 +296,18 @@ func (apl *Applier) releaseMigrationLock() { apl.migrationLockConn = nil } +// adoptDMLQueryBuildersFrom shares read-only DML query builders from the primary applier. +// The primary applier must have called prepareQueries() before MTS workers start. 
+func (apl *Applier) adoptDMLQueryBuildersFrom(source *Applier) error { + if source.dmlInsertQueryBuilder == nil { + return fmt.Errorf("primary applier DML query builders are not prepared") + } + apl.dmlDeleteQueryBuilder = source.dmlDeleteQueryBuilder + apl.dmlInsertQueryBuilder = source.dmlInsertQueryBuilder + apl.dmlUpdateQueryBuilder = source.dmlUpdateQueryBuilder + return nil +} + func (apl *Applier) prepareQueries() (err error) { if apl.dmlDeleteQueryBuilder, err = sql.NewDMLDeleteQueryBuilder( apl.migrationContext.DatabaseName, @@ -462,6 +474,31 @@ func (apl *Applier) AttemptInstantDDL() error { }, apl.migrationContext.MaxRetries(), apl.migrationContext.Log) } +// isDeadlockError checks whether the given error is a MySQL InnoDB deadlock (errno 1213). +func isDeadlockError(err error) bool { + var mysqlErr *drivermysql.MySQLError + return errors.As(err, &mysqlErr) && mysqlErr.Number == 1213 +} + +// isRetryableApplyError reports whether a failed DML apply is a transient +// concurrency error that should be retried. Under concurrent MTS workers the +// following errors are expected: InnoDB deadlocks (1213), lock-wait timeouts +// (1205), and NOWAIT lock failures (3572). Gap locks on the ghost table's +// secondary indexes cause contention between parallel statements. All are +// resolved by retrying the whole transaction, mirroring MySQL's +// slave_transaction_retries behaviour. +func isRetryableApplyError(err error) bool { + var mysqlErr *drivermysql.MySQLError + if !errors.As(err, &mysqlErr) { + return false + } + switch mysqlErr.Number { + case 1205, 1213, 3572: + return true + } + return false +} + // retryOnLockWaitTimeout retries the given operation on MySQL lock wait timeout // (errno 1205). Non-timeout errors return immediately. This is used for instant // DDL attempts where the operation may be blocked by a long-running transaction. 
@@ -1660,12 +1697,16 @@ func (apl *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) []*dmlBu case binlog.UpdateDML: { if _, isModified := apl.updateModifiesUniqueKeyColumns(dmlEvent); isModified { - results := make([]*dmlBuildResult, 0, 2) - dmlEvent.DML = binlog.DeleteDML - results = append(results, apl.buildDMLEventQuery(dmlEvent)...) - dmlEvent.DML = binlog.InsertDML - results = append(results, apl.buildDMLEventQuery(dmlEvent)...) - return results + // A unique-key-modifying UPDATE is split into DELETE + INSERT. + // We must NOT mutate dmlEvent.DML here: the same event may be + // re-applied (e.g. MTS deadlock retry), and a mutated DML would + // be rebuilt as the wrong statement, corrupting the ghost table. + deleteQuery, deleteArgs, deleteErr := apl.dmlDeleteQueryBuilder.BuildQuery(dmlEvent.WhereColumnValues.AbstractValues()) + insertQuery, insertArgs, insertErr := apl.dmlInsertQueryBuilder.BuildQuery(dmlEvent.NewColumnValues.AbstractValues()) + return []*dmlBuildResult{ + newDmlBuildResult(deleteQuery, deleteArgs, -1, deleteErr), + newDmlBuildResult(insertQuery, insertArgs, 1, insertErr), + } } query, updateArgs, err := apl.dmlUpdateQueryBuilder.BuildQuery(dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues()) args := sqlutils.Args() @@ -1778,9 +1819,8 @@ func (apl *Applier) executeBatchWithWarningChecking(ctx context.Context, tx *gos } // ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table -func (apl *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) error { +func (apl *Applier) ApplyDMLEventQueries(ctx context.Context, dmlEvents [](*binlog.BinlogDMLEvent)) error { var totalDelta int64 - ctx := context.Background() err := func() error { conn, err := apl.db.Conn(ctx) diff --git a/go/logic/applier_test.go b/go/logic/applier_test.go index 85a5a01d3..219123e0f 100644 --- a/go/logic/applier_test.go +++ b/go/logic/applier_test.go @@ -185,6 +185,47 @@ func 
TestApplierBuildDMLEventQuery(t *testing.T) { require.Equal(t, 123456, res[0].args[2]) require.Equal(t, 42, res[0].args[3]) }) + + t.Run("update modifying unique key does not mutate event and is retry-safe", func(t *testing.T) { + // A UPDATE that changes the unique key splits into DELETE + INSERT. + // The event must NOT be mutated, so a re-apply (e.g. MTS deadlock retry) + // produces the identical DELETE + INSERT pair rather than a bare INSERT. + whereValues := sql.ToColumnValues([]interface{}{123456, 42}) + newValues := sql.ToColumnValues([]interface{}{123456, 99}) + binlogEvent := &binlog.BinlogDMLEvent{ + DatabaseName: "test", + DML: binlog.UpdateDML, + NewColumnValues: newValues, + WhereColumnValues: whereValues, + } + + first := applier.buildDMLEventQuery(binlogEvent) + require.Len(t, first, 2) + require.NoError(t, first[0].err) + require.NoError(t, first[1].err) + require.Equal(t, int64(-1), first[0].rowsDelta) + require.Equal(t, int64(1), first[1].rowsDelta) + require.Contains(t, first[0].query, "delete") + require.Contains(t, first[1].query, "insert") + + // Event type is untouched after building the query. + require.Equal(t, binlog.UpdateDML, binlogEvent.DML) + + // Re-applying the same event (retry) yields the same DELETE + INSERT. 
+ second := applier.buildDMLEventQuery(binlogEvent) + require.Len(t, second, 2) + require.Equal(t, first[0].query, second[0].query) + require.Equal(t, first[1].query, second[1].query) + require.Equal(t, binlog.UpdateDML, binlogEvent.DML) + }) +} + +func TestIsRetryableApplyError(t *testing.T) { + require.True(t, isRetryableApplyError(&drivermysql.MySQLError{Number: 1213, Message: "Deadlock found"})) + require.True(t, isRetryableApplyError(&drivermysql.MySQLError{Number: 1205, Message: "Lock wait timeout exceeded"})) + require.False(t, isRetryableApplyError(&drivermysql.MySQLError{Number: 1146, Message: "Table doesn't exist"})) + require.False(t, isRetryableApplyError(errors.New("generic error"))) + require.False(t, isRetryableApplyError(nil)) } func TestApplierInstantDDL(t *testing.T) { @@ -385,7 +426,7 @@ func (suite *ApplierTestSuite) TestApplyDMLEventQueries() { NewColumnValues: sql.ToColumnValues([]interface{}{123456, 42}), }, } - err = applier.ApplyDMLEventQueries(dmlEvents) + err = applier.ApplyDMLEventQueries(context.Background(), dmlEvents) suite.Require().NoError(err) // Check that the row was inserted @@ -674,7 +715,7 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQuerySuc NewColumnValues: sql.ToColumnValues([]interface{}{123456, 42}), }, } - err = applier.ApplyDMLEventQueries(dmlEvents) + err = applier.ApplyDMLEventQueries(context.Background(), dmlEvents) suite.Require().NoError(err) err = applier.CreateChangelogTable() @@ -918,7 +959,7 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsWithDuplicateKeyOnNonMigration } // This should return an error when PanicOnWarnings is enabled - err = applier.ApplyDMLEventQueries(dmlEvents) + err = applier.ApplyDMLEventQueries(context.Background(), dmlEvents) suite.Require().Error(err) suite.Require().Contains(err.Error(), "Duplicate entry") @@ -1008,7 +1049,7 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsWithDuplicateCompositeUniqueKe } // This should return an error when 
PanicOnWarnings is enabled - err = applier.ApplyDMLEventQueries(dmlEvents) + err = applier.ApplyDMLEventQueries(context.Background(), dmlEvents) suite.Require().Error(err) suite.Require().Contains(err.Error(), "Duplicate entry") @@ -1120,7 +1161,7 @@ func (suite *ApplierTestSuite) TestUpdateModifyingUniqueKeyWithDuplicateOnOtherI suite.Require().Len(buildResults, 2, "UPDATE modifying unique key should be converted to DELETE+INSERT") // Apply the event - this should FAIL because INSERT will have duplicate email warning - err = applier.ApplyDMLEventQueries(dmlEvents) + err = applier.ApplyDMLEventQueries(context.Background(), dmlEvents) suite.Require().Error(err, "Should fail when DELETE+INSERT causes duplicate on non-migration unique key") suite.Require().Contains(err.Error(), "Duplicate entry", "Error should mention duplicate entry") @@ -1209,7 +1250,7 @@ func (suite *ApplierTestSuite) TestNormalUpdateWithPanicOnWarnings() { suite.Require().Len(buildResults, 1, "Normal UPDATE should generate single UPDATE query") // Apply the event - should succeed - err = applier.ApplyDMLEventQueries(dmlEvents) + err = applier.ApplyDMLEventQueries(context.Background(), dmlEvents) suite.Require().NoError(err) // Verify the update was applied correctly @@ -1284,7 +1325,7 @@ func (suite *ApplierTestSuite) TestDuplicateOnMigrationKeyAllowedInBinlogReplay( } // This should succeed - duplicate on migration unique key is expected and should be filtered out - err = applier.ApplyDMLEventQueries(dmlEvents) + err = applier.ApplyDMLEventQueries(context.Background(), dmlEvents) suite.Require().NoError(err) // Verify that the ghost table still has only the original 2 rows with correct data @@ -1375,7 +1416,7 @@ func (suite *ApplierTestSuite) TestRegexMetacharactersInIndexName() { }, } - err = applier.ApplyDMLEventQueries(dmlEvents) + err = applier.ApplyDMLEventQueries(context.Background(), dmlEvents) suite.Require().NoError(err, "Duplicate on idx+email (migration key) should be allowed with 
PanicOnWarnings enabled") // Test: duplicate on PRIMARY (not the migration key) should fail @@ -1388,7 +1429,7 @@ func (suite *ApplierTestSuite) TestRegexMetacharactersInIndexName() { }, } - err = applier.ApplyDMLEventQueries(dmlEvents) + err = applier.ApplyDMLEventQueries(context.Background(), dmlEvents) suite.Require().Error(err, "Duplicate on PRIMARY (not migration key) should fail with PanicOnWarnings enabled") suite.Require().Contains(err.Error(), "Duplicate entry") @@ -1477,7 +1518,7 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsDisabled() { } // Should succeed because PanicOnWarnings is disabled - err = applier.ApplyDMLEventQueries(dmlEvents) + err = applier.ApplyDMLEventQueries(context.Background(), dmlEvents) suite.Require().NoError(err) // Verify that only 2 original rows exist with correct data (the duplicate was silently ignored) @@ -1583,7 +1624,7 @@ func (suite *ApplierTestSuite) TestMultipleDMLEventsInBatch() { } // Should fail due to the second event - err = applier.ApplyDMLEventQueries(dmlEvents) + err = applier.ApplyDMLEventQueries(context.Background(), dmlEvents) suite.Require().Error(err) suite.Require().Contains(err.Error(), "Duplicate entry") diff --git a/go/logic/commit_barrier.go b/go/logic/commit_barrier.go new file mode 100644 index 000000000..9c855ca36 --- /dev/null +++ b/go/logic/commit_barrier.go @@ -0,0 +1,213 @@ +/* + Copyright 2025 GitHub Inc. + See https://github.com/github/gh-ost/blob/master/LICENSE +*/ + +package logic + +import ( + "context" + "sort" + "sync" +) + +const seqUninit int64 = 0 + +// commitBarrier implements MTS LOGICAL_CLOCK dependency tracking for gh-ost. +// +// It mirrors MySQL 8.0's GAQ low-water-mark (LWM) scheduling, with one crucial +// adaptation: gh-ost only streams the binlog rows of the single migrated table, +// so the GTID sequence_number values it observes are SPARSE. 
The server assigns +// sequence_number across all transactions on all tables, so consecutive +// transactions on our table might be numbered 5, 9, 14, ... with large gaps for +// other tables' transactions that we never see. +// +// A naive gap-free LWM over the global sequence space (advance only lwm+1) would +// stall permanently at the first gap, because the missing sequence numbers +// belong to other tables and are never committed in our barrier. Instead, the +// LWM advances over the *dispatched* (seen) subsequence: the coordinator records +// every transaction it dispatches via addDelegatedJob(seq) in binlog order, and +// the LWM advances to the highest dispatched sequence whose predecessors in that +// subsequence have all committed. +// +// Invariant: all dispatched (seen) transactions with sequence_number <= lwm have +// been applied. This is exactly what wait_for_last_committed_trx needs, restricted +// to the transactions gh-ost actually applies. +// +// Concurrency: every field is guarded by mu. delegatedJobs MUST be mutated under +// mu so its change and the corresponding cond.Broadcast() are serialized with +// waiters in waitForAllWorkers; otherwise a decrement-to-zero racing with a +// waiter's predicate check would lose the wakeup and deadlock the coordinator. +type commitBarrier struct { + mu sync.Mutex + dispatched []int64 // seen sequence numbers in dispatch (increasing) order + head int // index into dispatched: entries before head are folded into lwm + committed map[int64]bool // committed sequences not yet folded into lwm + lwm int64 // all dispatched seq <= lwm are committed + delegatedJobs int64 // jobs dispatched but not yet completed + cond *sync.Cond +} + +func newCommitBarrier() *commitBarrier { + cb := &commitBarrier{ + lwm: seqUninit, + committed: make(map[int64]bool), + } + cb.cond = sync.NewCond(&cb.mu) + return cb +} + +// addDelegatedJob records a dispatched transaction. 
seq must be the transaction's +// sequence_number, supplied in binlog (increasing) order. A zero seq (logical +// timestamps unavailable) is accepted for job accounting but not tracked for +// dependency ordering. +func (cb *commitBarrier) addDelegatedJob(seq int64) { + cb.mu.Lock() + if seq != seqUninit { + cb.dispatched = append(cb.dispatched, seq) + } + cb.delegatedJobs++ + cb.mu.Unlock() +} + +// commit records a transaction as applied and advances the LWM over the +// dispatched subsequence as far as consecutively-committed entries allow. +func (cb *commitBarrier) commit(sequenceNumber int64) { + if sequenceNumber == seqUninit { + return + } + cb.mu.Lock() + cb.committed[sequenceNumber] = true + for cb.head < len(cb.dispatched) && cb.committed[cb.dispatched[cb.head]] { + seq := cb.dispatched[cb.head] + delete(cb.committed, seq) + cb.lwm = seq + cb.head++ + } + cb.compactLocked() + cb.cond.Broadcast() + cb.mu.Unlock() +} + +// compactLocked reclaims the folded prefix of the dispatched slice so it does +// not grow unbounded over a long migration. Caller must hold mu. +func (cb *commitBarrier) compactLocked() { + const compactThreshold = 4096 + if cb.head < compactThreshold || cb.head < len(cb.dispatched)/2 { + return + } + remaining := len(cb.dispatched) - cb.head + copy(cb.dispatched, cb.dispatched[cb.head:]) + cb.dispatched = cb.dispatched[:remaining] + cb.head = 0 +} + +// isPendingDispatchedLocked reports whether seq was dispatched on this table's +// stream and has not yet been folded into the LWM. dispatched[head:] is sorted +// ascending, so we binary-search it. Caller must hold mu. +func (cb *commitBarrier) isPendingDispatchedLocked(seq int64) bool { + tail := cb.dispatched[cb.head:] + i := sort.Search(len(tail), func(i int) bool { return tail[i] >= seq }) + return i < len(tail) && tail[i] == seq +} + +// waitForDependency blocks until every seen transaction with sequence_number <= +// lastCommitted has been applied (lwm >= lastCommitted). 
+// +// "Seen" is determined from the dispatched set itself, atomically under the lock: +// - lastCommitted <= lwm: the parent (and all earlier seen transactions) have +// already completed. Return immediately. +// - lastCommitted is a dispatched-but-not-yet-folded sequence: wait until the +// LWM advances past it. +// - lastCommitted was never dispatched on this table's stream: it is a +// cross-table dependency (the parent did not touch the migrated table), so +// gh-ost never applies it and must not wait — return immediately. +// +// Because the coordinator processes transactions in binlog order and a child's +// lastCommitted is always strictly less than its own sequence_number, any parent +// that lives on this stream has already been dispatched by the time we get here; +// "not dispatched" therefore reliably means a cross-table parent. This avoids the +// observe/dispatch ordering hazard of tracking "seen" separately from the barrier. +func (cb *commitBarrier) waitForDependency(ctx context.Context, lastCommitted int64) { + if lastCommitted == seqUninit { + return + } + cb.mu.Lock() + defer cb.mu.Unlock() + if lastCommitted <= cb.lwm { + return + } + if !cb.isPendingDispatchedLocked(lastCommitted) { + return + } + cb.waitWhileLocked(ctx, func() bool { return cb.lwm < lastCommitted }) +} + +// waitForAllWorkers blocks until all delegated jobs have completed +// (delegatedJobs == 0). Corresponds to MySQL: wait_for_workers_to_finish() +func (cb *commitBarrier) waitForAllWorkers(ctx context.Context) { + cb.mu.Lock() + defer cb.mu.Unlock() + cb.waitWhileLocked(ctx, func() bool { return cb.delegatedJobs > 0 }) +} + +// waitWhileLocked repeatedly waits on cond until either cond is met or ctx is cancelled. +// It spawns a short-lived goroutine per Wait() call that broadcasts on context cancellation, +// ensuring the caller never blocks past ctx cancellation. +// Caller must hold mu. 
+func (cb *commitBarrier) waitWhileLocked(ctx context.Context, condMet func() bool) { + for condMet() { + if ctx.Err() != nil { + return + } + done := make(chan struct{}) + go func() { + select { + case <-ctx.Done(): + cb.cond.Broadcast() + case <-done: + } + }() + cb.cond.Wait() + close(done) + } +} + +// completeDelegatedJob decrements the delegated job counter and wakes any +// waiter. The decrement and broadcast happen under mu so the wakeup cannot be +// lost against a concurrent waitForAllWorkers predicate check. +func (cb *commitBarrier) completeDelegatedJob() { + cb.mu.Lock() + cb.delegatedJobs-- + cb.cond.Broadcast() + cb.mu.Unlock() +} + +// reset clears the barrier state for a new logical-clock epoch (e.g. when +// sequence numbers restart after binlog rotation). Callers MUST ensure all +// delegated jobs have drained (via waitForAllWorkers) before resetting; this is +// done in-place to avoid racing with workers holding a reference to the barrier. +func (cb *commitBarrier) reset() { + cb.mu.Lock() + cb.lwm = seqUninit + cb.dispatched = cb.dispatched[:0] + cb.head = 0 + cb.committed = make(map[int64]bool) + cb.delegatedJobs = 0 + cb.mu.Unlock() +} + +// getLWM returns the current low water mark over the dispatched subsequence +// (thread-safe). +func (cb *commitBarrier) getLWM() int64 { + cb.mu.Lock() + defer cb.mu.Unlock() + return cb.lwm +} + +// delegatedJobCount returns the number of in-flight delegated jobs (thread-safe). +func (cb *commitBarrier) delegatedJobCount() int64 { + cb.mu.Lock() + defer cb.mu.Unlock() + return cb.delegatedJobs +} diff --git a/go/logic/commit_barrier_test.go b/go/logic/commit_barrier_test.go new file mode 100644 index 000000000..470e83eec --- /dev/null +++ b/go/logic/commit_barrier_test.go @@ -0,0 +1,509 @@ +/* + Copyright 2025 GitHub Inc. 
+ See https://github.com/github/gh-ost/blob/master/LICENSE +*/ + +package logic + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +// dispatch records a set of sequence numbers as dispatched (seen) jobs, in the +// order given. The LWM only advances over dispatched sequences, mirroring how +// the coordinator registers every transaction via addDelegatedJob before a +// worker commits it. +func dispatch(cb *commitBarrier, seqs ...int64) { + for _, s := range seqs { + cb.addDelegatedJob(s) + } +} + +// --- LWM advancement tests --- + +func TestCommitBarrier_LWMAdvancesConsecutively(t *testing.T) { + cb := newCommitBarrier() + require.Equal(t, int64(0), cb.getLWM()) + dispatch(cb, 1, 2, 3) + + cb.commit(1) + require.Equal(t, int64(1), cb.getLWM()) + + cb.commit(2) + require.Equal(t, int64(2), cb.getLWM()) + + cb.commit(3) + require.Equal(t, int64(3), cb.getLWM()) +} + +func TestCommitBarrier_LWMStopsAtGap(t *testing.T) { + cb := newCommitBarrier() + dispatch(cb, 1, 2, 3) + + cb.commit(1) + require.Equal(t, int64(1), cb.getLWM()) + + // Commit 3 before 2: LWM stays at 1 because 2 (earlier in dispatch order) + // is not yet committed. + cb.commit(3) + require.Equal(t, int64(1), cb.getLWM()) + + // Now commit 2 — LWM should jump to 3. + cb.commit(2) + require.Equal(t, int64(3), cb.getLWM()) +} + +func TestCommitBarrier_OutOfOrderCommitFillsGap(t *testing.T) { + cb := newCommitBarrier() + dispatch(cb, 1, 2, 3, 4, 5) + + cb.commit(5) + require.Equal(t, int64(0), cb.getLWM()) + cb.commit(4) + require.Equal(t, int64(0), cb.getLWM()) + cb.commit(3) + require.Equal(t, int64(0), cb.getLWM()) + cb.commit(2) + require.Equal(t, int64(0), cb.getLWM()) + + // Filling the front: commit 1 should advance LWM all the way to 5. 
+ cb.commit(1) + require.Equal(t, int64(5), cb.getLWM()) +} + +// TestCommitBarrier_SparseSequencesAdvance is the regression test for the +// production deadlock: gh-ost only sees its own table's binlog rows, so the +// observed sequence numbers are sparse (other tables' transactions create +// gaps). A gap-free LWM over the global sequence space would stall at the first +// gap; the dispatched-subsequence LWM must advance normally. +func TestCommitBarrier_SparseSequencesAdvance(t *testing.T) { + cb := newCommitBarrier() + // Sequences observed for our table, with large gaps from other tables. + seen := []int64{5, 9, 14, 22, 100, 537} + dispatch(cb, seen...) + + for _, s := range seen { + cb.commit(s) + } + require.Equal(t, int64(537), cb.getLWM()) + + // A later transaction depends on a seen parent (lc=100). It must not block, + // because every seen sequence <= 100 has committed even though global + // sequences 1..99 (other tables) were never observed. + done := make(chan struct{}) + go func() { + cb.waitForDependency(context.Background(), 100) + close(done) + }() + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("waitForDependency stalled on sparse sequence space") + } +} + +// TestCommitBarrier_SparseDependencyBlocksUntilSeenParent verifies the ordering +// guarantee still holds with sparse sequences: a child depending on a seen +// parent waits until that parent (and all earlier seen transactions) commit. +func TestCommitBarrier_SparseDependencyBlocksUntilSeenParent(t *testing.T) { + cb := newCommitBarrier() + dispatch(cb, 5, 9, 14) + ctx := context.Background() + + // Commit 5 and 14, but NOT 9 (earlier in dispatch order than 14). + cb.commit(5) + cb.commit(14) + require.Equal(t, int64(5), cb.getLWM()) + + done := make(chan struct{}) + go func() { + // Child depends on seen parent seq=9. 
+ cb.waitForDependency(ctx, 9) + close(done) + }() + require.Eventually(t, func() bool { + select { + case <-done: + return false + default: + return true + } + }, 200*time.Millisecond, 10*time.Millisecond) + + cb.commit(9) + require.Equal(t, int64(14), cb.getLWM()) + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("waitForDependency should unblock once seen parent committed") + } +} + +func TestCommitBarrier_CommitZeroIsNoop(t *testing.T) { + cb := newCommitBarrier() + cb.commit(0) + require.Equal(t, int64(0), cb.getLWM()) +} + +// --- waitForDependency tests --- + +func TestCommitBarrier_WaitForDependencyAlreadySatisfied(t *testing.T) { + cb := newCommitBarrier() + dispatch(cb, 1, 2, 3, 4, 5) + for i := int64(1); i <= 5; i++ { + cb.commit(i) + } + require.Equal(t, int64(5), cb.getLWM()) + + // LWM is 5, lastCommitted=5 should return immediately. + cb.waitForDependency(context.Background(), 5) +} + +func TestCommitBarrier_WaitForDependencyBlocksUntilLWMAdvances(t *testing.T) { + cb := newCommitBarrier() + dispatch(cb, 1, 2, 3) + ctx := context.Background() + + done := make(chan struct{}) + go func() { + cb.waitForDependency(ctx, 3) + close(done) + }() + + require.Eventually(t, func() bool { + select { + case <-done: + return false + default: + return true + } + }, 200*time.Millisecond, 10*time.Millisecond) + + cb.commit(1) + cb.commit(2) + cb.commit(3) + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("waitForDependency did not unblock after LWM advanced") + } +} + +func TestCommitBarrier_WaitForDependencyZeroIsNoop(t *testing.T) { + cb := newCommitBarrier() + cb.waitForDependency(context.Background(), 0) +} + +func TestCommitBarrier_WaitForDependencyCrossTableIsNoop(t *testing.T) { + cb := newCommitBarrier() + ctx := context.Background() + + // Dispatch only sequences on this table's stream; 99 is never dispatched, so + // it is a cross-table parent and must be treated as satisfied immediately. 
+ dispatch(cb, 5, 9, 14) + cb.commit(5) + cb.commit(9) + cb.commit(14) + + done := make(chan struct{}) + go func() { + cb.waitForDependency(ctx, 99) + close(done) + }() + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("waitForDependency blocked on unseen cross-table parent") + } +} + +// TestCommitBarrier_WaitForDependencyUndispatchedGapIsNoop is the regression +// guard for the coordinator hang: a lastCommitted value that lies BETWEEN two +// dispatched sequences but was itself never dispatched (its parent transaction +// touched another table) must be treated as a satisfied cross-table dependency, +// never blocking. Previously a separate "seen" set could disagree with the +// barrier's dispatched set and deadlock the coordinator here. +func TestCommitBarrier_WaitForDependencyUndispatchedGapIsNoop(t *testing.T) { + cb := newCommitBarrier() + ctx := context.Background() + + // We saw transactions 5 and 10 on our stream; 7 was on another table. + dispatch(cb, 5, 10) + cb.commit(5) // lwm = 5; 10 still pending + + done := make(chan struct{}) + go func() { + // 7 > lwm(5) and 7 is not a pending dispatched seq -> cross-table -> noop. 
+ cb.waitForDependency(ctx, 7) + close(done) + }() + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("waitForDependency deadlocked on an undispatched gap sequence") + } +} + +// --- waitForAllWorkers tests --- + +func TestCommitBarrier_WaitForAllWorkers(t *testing.T) { + cb := newCommitBarrier() + ctx := context.Background() + + cb.addDelegatedJob(1) + cb.addDelegatedJob(2) + + done := make(chan struct{}) + go func() { + cb.waitForAllWorkers(ctx) + close(done) + }() + + require.Eventually(t, func() bool { + select { + case <-done: + return false + default: + return true + } + }, 200*time.Millisecond, 10*time.Millisecond) + + cb.completeDelegatedJob() + cb.completeDelegatedJob() + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("waitForAllWorkers did not unblock") + } +} + +// TestCommitBarrier_WaitForAllWorkersNoLostWakeup stresses the window between a +// waiter's predicate check and its park inside cond.Wait(). Before the fix, +// delegatedJobs was mutated outside the lock, so a decrement-to-zero racing +// with the waiter could lose the broadcast and deadlock the coordinator at a +// group boundary. Each iteration must terminate; a hang means the regression +// is back. 
+func TestCommitBarrier_WaitForAllWorkersNoLostWakeup(t *testing.T) { + ctx := context.Background() + for iter := 0; iter < 2000; iter++ { + cb := newCommitBarrier() + cb.addDelegatedJob(1) + + go cb.completeDelegatedJob() + + done := make(chan struct{}) + go func() { + cb.waitForAllWorkers(ctx) + close(done) + }() + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatalf("waitForAllWorkers lost wakeup on iteration %d", iter) + } + } +} + +// --- Concurrent tests --- + +func TestCommitBarrier_ConcurrentCommitsAdvanceLWM(t *testing.T) { + cb := newCommitBarrier() + var wg sync.WaitGroup + for i := int64(1); i <= 20; i++ { + wg.Add(1) + cb.addDelegatedJob(i) + go func(seq int64) { + defer wg.Done() + cb.commit(seq) + cb.completeDelegatedJob() + }(i) + } + wg.Wait() + require.Equal(t, int64(20), cb.getLWM()) +} + +// TestCommitBarrier_Reset verifies epoch reset clears all state. +func TestCommitBarrier_Reset(t *testing.T) { + cb := newCommitBarrier() + dispatch(cb, 1, 2, 3) + cb.commit(1) + cb.commit(2) + require.Equal(t, int64(2), cb.getLWM()) + + cb.reset() + require.Equal(t, int64(0), cb.getLWM()) + require.Equal(t, int64(0), cb.delegatedJobCount()) + + // New epoch starts numbering from 1 again. + dispatch(cb, 1, 2) + cb.commit(1) + require.Equal(t, int64(1), cb.getLWM()) +} + +// --- PR #1454 regression tests --- +// Scenarios from meiji163's and dnovitski's investigation: +// https://github.com/github/gh-ost/pull/1454#issuecomment-2798932454 +// https://github.com/github/gh-ost/pull/1454#issuecomment-4340103322 + +func TestCommitBarrier_PR1454_OutOfOrderDoesNotShortCircuit(t *testing.T) { + // Scenario: + // trx A: seq=89058, lc=89053 (modifies row id=5025) + // trx B: seq=89065, lc=89062 (modifies row id=5025) + // + // B must wait until LWM >= 89062, which guarantees A (89058 <= 89062) is + // also complete. Per-sequence membership of just lc would be insufficient. 
+ cb := newCommitBarrier() + + for i := int64(1); i <= 89062; i++ { + cb.addDelegatedJob(i) + } + for i := int64(1); i <= 89053; i++ { + cb.commit(i) + } + require.Equal(t, int64(89053), cb.getLWM()) + + // Commit A (89058) out of order, then fill the gap up to 89062. + cb.commit(89058) + require.Equal(t, int64(89053), cb.getLWM()) + for _, s := range []int64{89054, 89055, 89056, 89057, 89059, 89060, 89061, 89062} { + cb.commit(s) + } + require.Equal(t, int64(89062), cb.getLWM()) + + // trx B (lc=89062) can proceed: 89058 has completed (it's <= LWM). + cb.waitForDependency(context.Background(), 89062) +} + +func TestCommitBarrier_PR1454_WaitBlocksWhenLWMBelowLastCommitted(t *testing.T) { + cb := newCommitBarrier() + ctx := context.Background() + + for i := int64(1); i <= 15; i++ { + cb.addDelegatedJob(i) + } + for i := int64(1); i <= 10; i++ { + cb.commit(i) + } + require.Equal(t, int64(10), cb.getLWM()) + + done := make(chan struct{}) + go func() { + cb.waitForDependency(ctx, 15) + close(done) + }() + + require.Eventually(t, func() bool { + select { + case <-done: + return false + default: + return true + } + }, 200*time.Millisecond, 10*time.Millisecond) + + for i := int64(11); i <= 15; i++ { + cb.commit(i) + } + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("waitForDependency should have unblocked once LWM reached 15") + } + require.Equal(t, int64(15), cb.getLWM()) +} + +func TestCommitBarrier_PR1454_BinlogRotationResetsLWM(t *testing.T) { + // After binlog rotation, sequence numbers restart from 1. The coordinator + // drains workers and calls reset() so the LWM restarts at 0. 
+ cb := newCommitBarrier() + for i := int64(1); i <= 65553; i++ { + cb.addDelegatedJob(i) + cb.commit(i) + } + require.Equal(t, int64(65553), cb.getLWM()) + + cb.reset() + require.Equal(t, int64(0), cb.getLWM()) + + dispatch(cb, 1, 2, 3, 4, 5) + cb.commit(1) + require.Equal(t, int64(1), cb.getLWM()) + + ctx := context.Background() + done := make(chan struct{}) + go func() { + cb.waitForDependency(ctx, 5) + close(done) + }() + + require.Eventually(t, func() bool { + select { + case <-done: + return false + default: + return true + } + }, 200*time.Millisecond, 10*time.Millisecond) + + for i := int64(2); i <= 5; i++ { + cb.commit(i) + } + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("should unblock after LWM advances in new epoch") + } +} + +func TestCommitBarrier_PR1454_ErrorDoesNotMarkCommitted(t *testing.T) { + // If a worker fails, its sequence should NOT advance the LWM past it, so a + // dependent transaction keeps waiting until the failed one is retried. + cb := newCommitBarrier() + dispatch(cb, 1, 2, 3, 4) + + cb.commit(1) + cb.commit(2) + require.Equal(t, int64(2), cb.getLWM()) + + // seq=4 commits but can't advance LWM past 2 (seq=3 still pending). + cb.commit(4) + require.Equal(t, int64(2), cb.getLWM()) + + ctx := context.Background() + done := make(chan struct{}) + go func() { + cb.waitForDependency(ctx, 3) + close(done) + }() + + require.Eventually(t, func() bool { + select { + case <-done: + return false + default: + return true + } + }, 200*time.Millisecond, 10*time.Millisecond) + + // Retry seq=3 successfully. 
+ cb.commit(3) + require.Equal(t, int64(4), cb.getLWM()) + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("waiter should unblock once failed transaction is retried") + } +} diff --git a/go/logic/migrator.go b/go/logic/migrator.go index bec13e594..b42c79960 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -11,6 +11,7 @@ import ( "fmt" "io" "math" + "math/rand" "os" "strings" "sync/atomic" @@ -50,9 +51,11 @@ type lockProcessedStruct struct { } type applyEventStruct struct { - writeFunc *tableWriteFunc - dmlEvent *binlog.BinlogDMLEvent - coords mysql.BinlogCoordinates + writeFunc *tableWriteFunc + dmlEvent *binlog.BinlogDMLEvent + coords mysql.BinlogCoordinates + lastCommitted int64 + sequenceNum int64 } func newApplyEventStructByFunc(writeFunc *tableWriteFunc) *applyEventStruct { @@ -61,10 +64,22 @@ func newApplyEventStructByFunc(writeFunc *tableWriteFunc) *applyEventStruct { } func newApplyEventStructByDML(dmlEntry *binlog.BinlogEntry) *applyEventStruct { - result := &applyEventStruct{dmlEvent: dmlEntry.DmlEvent, coords: dmlEntry.Coordinates} + result := &applyEventStruct{ + dmlEvent: dmlEntry.DmlEvent, + coords: dmlEntry.Coordinates, + lastCommitted: dmlEntry.LastCommitted, + sequenceNum: dmlEntry.SequenceNumber, + } return result } +// mtsWorkerJob is a full transaction batch dispatched to one MTS worker. 
+type mtsWorkerJob struct { + dmlEvents []*binlog.BinlogDMLEvent + sequenceNum int64 + coords mysql.BinlogCoordinates +} + type PrintStatusRule int const ( @@ -100,6 +115,15 @@ type Migrator struct { applyEventsQueue chan *applyEventStruct finishedMigrating int64 + + // MTS parallel apply (LOGICAL_CLOCK mode) + numWorkers int + mtsActive int64 + workerQueues []chan *mtsWorkerJob + commitBarrier *commitBarrier + coordinatorQueue chan *applyEventStruct + pendingCoordinatorEvent *applyEventStruct + workerNextIdx int } func NewMigrator(context *base.MigrationContext, appVersion string) *Migrator { @@ -124,9 +148,29 @@ func NewMigrator(context *base.MigrationContext, appVersion string) *Migrator { applyEventsQueue: make(chan *applyEventStruct, base.MaxEventsBatchSize), finishedMigrating: 0, } + + // MTS initialization + migrator.numWorkers = context.NumWorkers + if migrator.numWorkers > 1 { + migrator.coordinatorQueue = make(chan *applyEventStruct, base.MaxEventsBatchSize) + migrator.workerQueues = make([]chan *mtsWorkerJob, migrator.numWorkers) + for i := range migrator.workerQueues { + migrator.workerQueues[i] = make(chan *mtsWorkerJob, base.MaxEventsBatchSize) + } + migrator.commitBarrier = newCommitBarrier() + } + return migrator } +// enqueueApplyEvent routes control-plane and DML apply tasks to the active consumer. 
+func (mgtr *Migrator) enqueueApplyEvent(eventStruct *applyEventStruct) error { + if atomic.LoadInt64(&mgtr.mtsActive) > 0 { + return base.SendWithContext(mgtr.migrationContext.GetContext(), mgtr.coordinatorQueue, eventStruct) + } + return base.SendWithContext(mgtr.migrationContext.GetContext(), mgtr.applyEventsQueue, eventStruct) +} + // sleepWhileTrue sleeps indefinitely until the given function returns 'false' // (or fails with error) func (mgtr *Migrator) sleepWhileTrue(operation func() (bool, error)) error { @@ -161,12 +205,23 @@ func (mgtr *Migrator) retryBatchCopyWithHooks(operation func() error, notFatalHi // retryOperation attempts up to `count` attempts at running given function, // exiting as soon as it returns with non-error. func (mgtr *Migrator) retryOperation(operation func() error, notFatalHint ...bool) (err error) { + return mgtr.retryOperationCtx(mgtr.migrationContext.GetContext(), operation, notFatalHint...) +} + +// retryOperationCtx is like retryOperation but uses the caller's context for +// cancellation checks and PanicAbort delivery (required by MTS goroutines). 
+func (mgtr *Migrator) retryOperationCtx(ctx context.Context, operation func() error, notFatalHint ...bool) (err error) { maxRetries := int(mgtr.migrationContext.MaxRetries()) for i := 0; i < maxRetries; i++ { if i != 0 { // sleep after previous iteration RetrySleepFn(1 * time.Second) } + if ctx != nil { + if err := ctx.Err(); err != nil { + return err + } + } // Check for abort/context cancellation before each retry if abortErr := mgtr.checkAbort(); abortErr != nil { return abortErr @@ -178,7 +233,7 @@ func (mgtr *Migrator) retryOperation(operation func() error, notFatalHint ...boo // Check if this is an unrecoverable error (data consistency issues won't resolve on retry) if strings.Contains(err.Error(), "warnings detected") { if len(notFatalHint) == 0 { - _ = base.SendWithContext(mgtr.migrationContext.GetContext(), mgtr.migrationContext.PanicAbort, err) + _ = base.SendWithContext(ctx, mgtr.migrationContext.PanicAbort, err) } return err } @@ -186,7 +241,7 @@ func (mgtr *Migrator) retryOperation(operation func() error, notFatalHint ...boo } if len(notFatalHint) == 0 { // Use helper to prevent deadlock if listenOnPanicAbort already exited - _ = base.SendWithContext(mgtr.migrationContext.GetContext(), mgtr.migrationContext.PanicAbort, err) + _ = base.SendWithContext(ctx, mgtr.migrationContext.PanicAbort, err) } return err } @@ -292,6 +347,9 @@ func (mgtr *Migrator) onChangelogStateEvent(dmlEntry *binlog.BinlogEntry) (err e coords: dmlEntry.Coordinates.Clone(), } var applyEventFunc tableWriteFunc = func() error { + if atomic.LoadInt64(&mgtr.mtsActive) > 0 && mgtr.commitBarrier != nil { + mgtr.commitBarrier.waitForAllWorkers(mgtr.migrationContext.GetContext()) + } // Non-blocking send with overwrite-oldest semantics: if the buffer is // full (receiver timed out on a previous attempt), drain the stale // message first so the current sentinel is always delivered. 
This @@ -316,12 +374,10 @@ func (mgtr *Migrator) onChangelogStateEvent(dmlEntry *binlog.BinlogEntry) (err e } // at this point we know all events up to lock have been read from the streamer, // because the streamer works sequentially. So those events are either already handled, - // or have event functions in applyEventsQueue. - // So as not to create a potential deadlock, we write this func to applyEventsQueue - // asynchronously, understanding it doesn't really matter. + // or have event functions in the apply queue. + // So as not to create a potential deadlock, we enqueue this func asynchronously. go func() { - // Use helper to prevent deadlock if buffer fills and executeWriteFuncs exits - _ = base.SendWithContext(mgtr.migrationContext.GetContext(), mgtr.applyEventsQueue, newApplyEventStructByFunc(&applyEventFunc)) + _ = mgtr.enqueueApplyEvent(newApplyEventStructByFunc(&applyEventFunc)) }() default: return fmt.Errorf("unknown changelog state: %+v", changelogState) @@ -339,7 +395,7 @@ func (mgtr *Migrator) onChangelogHeartbeatEvent(dmlEntry *binlog.BinlogEntry) (e } mgtr.migrationContext.SetLastHeartbeatOnChangelogTime(heartbeatTime) - // Route the coords bump through applyEventsQueue so it is ordered after + // Route the coords bump through the apply queue so it is ordered after // any DMLs the streamer enqueued before this heartbeat. 
coords := dmlEntry.Coordinates var writeFunc tableWriteFunc = func() error { @@ -348,11 +404,7 @@ func (mgtr *Migrator) onChangelogHeartbeatEvent(dmlEntry *binlog.BinlogEntry) (e mgtr.applier.CurrentCoordinatesMutex.Unlock() return nil } - if err := base.SendWithContext( - mgtr.migrationContext.GetContext(), - mgtr.applyEventsQueue, - newApplyEventStructByFunc(&writeFunc), - ); err != nil { + if err := mgtr.enqueueApplyEvent(newApplyEventStructByFunc(&writeFunc)); err != nil { return mgtr.migrationContext.Log.Errore(err) } return nil @@ -614,9 +666,31 @@ func (mgtr *Migrator) Migrate() (err error) { return err } go func() { - if err := mgtr.executeWriteFuncs(); err != nil { - // Send error to PanicAbort to trigger abort - _ = base.SendWithContext(mgtr.migrationContext.GetContext(), mgtr.migrationContext.PanicAbort, err) + if mgtr.numWorkers > 1 { + // Wait for logical timestamps detection before starting MTS + if mgtr.migrationContext.LogicalTimestampsDetected != nil { + <-mgtr.migrationContext.LogicalTimestampsDetected + } + if !mgtr.migrationContext.BinlogHasLogicalTimestamps { + mgtr.migrationContext.Log.Infof("Multi-worker mode requires binlog logical timestamps (MySQL 5.7+). Falling back to single worker.") + if err := mgtr.executeWriteFuncs(); err != nil { + _ = base.SendWithContext(mgtr.migrationContext.GetContext(), mgtr.migrationContext.PanicAbort, err) + } + return + } + // MTS mode: workers handle DML in parallel; executeWriteFuncs() still + // runs row copy (copyRowsQueue) and drains pre-MTS backlog on applyEventsQueue. 
+ if err := mgtr.startMTSWorkers(mgtr.migrationContext.GetContext()); err != nil { + _ = base.SendWithContext(mgtr.migrationContext.GetContext(), mgtr.migrationContext.PanicAbort, err) + return + } + if err := mgtr.executeWriteFuncs(); err != nil { + _ = base.SendWithContext(mgtr.migrationContext.GetContext(), mgtr.migrationContext.PanicAbort, err) + } + } else { + if err := mgtr.executeWriteFuncs(); err != nil { + _ = base.SendWithContext(mgtr.migrationContext.GetContext(), mgtr.migrationContext.PanicAbort, err) + } } }() go mgtr.iterateChunks() @@ -760,9 +834,29 @@ func (mgtr *Migrator) Revert() error { mgtr.initiateThrottler() go mgtr.initiateStatus() go func() { - if err := mgtr.executeDMLWriteFuncs(); err != nil { - // Send error to PanicAbort to trigger abort - _ = base.SendWithContext(mgtr.migrationContext.GetContext(), mgtr.migrationContext.PanicAbort, err) + if mgtr.numWorkers > 1 { + if mgtr.migrationContext.LogicalTimestampsDetected != nil { + <-mgtr.migrationContext.LogicalTimestampsDetected + } + if !mgtr.migrationContext.BinlogHasLogicalTimestamps { + mgtr.migrationContext.Log.Infof("Multi-worker mode requires binlog logical timestamps (MySQL 5.7+). 
Falling back to single worker.") + if err := mgtr.executeDMLWriteFuncs(); err != nil { + _ = base.SendWithContext(mgtr.migrationContext.GetContext(), mgtr.migrationContext.PanicAbort, err) + } + return + } + if err := mgtr.startMTSWorkers(mgtr.migrationContext.GetContext()); err != nil { + _ = base.SendWithContext(mgtr.migrationContext.GetContext(), mgtr.migrationContext.PanicAbort, err) + return + } + // executeDMLWriteFuncs still needed for non-DML control events + if err := mgtr.executeDMLWriteFuncs(); err != nil { + _ = base.SendWithContext(mgtr.migrationContext.GetContext(), mgtr.migrationContext.PanicAbort, err) + } + } else { + if err := mgtr.executeDMLWriteFuncs(); err != nil { + _ = base.SendWithContext(mgtr.migrationContext.GetContext(), mgtr.migrationContext.PanicAbort, err) + } } }() @@ -1407,10 +1501,17 @@ func (mgtr *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) { currentBinlogCoordinates := mgtr.eventsStreamer.GetCurrentBinlogCoordinates() + backlogLen := len(mgtr.applyEventsQueue) + backlogCap := cap(mgtr.applyEventsQueue) + if atomic.LoadInt64(&mgtr.mtsActive) > 0 && mgtr.coordinatorQueue != nil { + backlogLen = len(mgtr.coordinatorQueue) + backlogCap = cap(mgtr.coordinatorQueue) + } + status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; Lag: %.2fs, HeartbeatLag: %.2fs, State: %s; ETA: %s", totalRowsCopied, rowsEstimate, progressPct, atomic.LoadInt64(&mgtr.migrationContext.TotalDMLEventsApplied), - len(mgtr.applyEventsQueue), cap(mgtr.applyEventsQueue), + backlogLen, backlogCap, base.PrettifyDurationOutput(elapsedTime), base.PrettifyDurationOutput(mgtr.migrationContext.ElapsedRowCopyTime()), currentBinlogCoordinates.DisplayString(), mgtr.migrationContext.GetCurrentLagDuration().Seconds(), @@ -1485,9 +1586,9 @@ func (mgtr *Migrator) addDMLEventsListener() error { mgtr.migrationContext.DatabaseName, mgtr.migrationContext.OriginalTableName, func(dmlEntry 
*binlog.BinlogEntry) error { - // Use helper to prevent deadlock if buffer fills and executeWriteFuncs exits + // Use helper to prevent deadlock if buffer fills and consumer exits // This is critical because this callback blocks the event streamer - return base.SendWithContext(mgtr.migrationContext.GetContext(), mgtr.applyEventsQueue, newApplyEventStructByDML(dmlEntry)) + return mgtr.enqueueApplyEvent(newApplyEventStructByDML(dmlEntry)) }, ) return err @@ -1701,7 +1802,7 @@ func (mgtr *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { } // Create a task to apply the DML event; this will be execute by executeWriteFuncs() var applyEventFunc tableWriteFunc = func() error { - return mgtr.applier.ApplyDMLEventQueries(dmlEvents) + return mgtr.applier.ApplyDMLEventQueries(mgtr.migrationContext.GetContext(), dmlEvents) } if err := mgtr.retryOperation(applyEventFunc); err != nil { return mgtr.migrationContext.Log.Errore(err) @@ -1818,6 +1919,238 @@ func (mgtr *Migrator) checkpointLoop() { } } +// startMTSWorkers starts the MTS Coordinator and N Worker goroutines. +// Corresponds to MySQL MTS: coordinator + worker threads. +func (mgtr *Migrator) startMTSWorkers(ctx context.Context) error { + mgtr.migrationContext.Log.Infof("Starting MTS mode with %d workers", mgtr.numWorkers) + atomic.StoreInt64(&mgtr.mtsActive, 1) + + // Start workers, each with its own Applier (independent DB connection). + // DML query builders are shared from the primary applier (immutable after prepareQueries). 
+ for i := 0; i < mgtr.numWorkers; i++ { + workerApplier := NewApplier(mgtr.migrationContext) + if err := workerApplier.InitDBConnections(); err != nil { + return fmt.Errorf("MTS worker %d init failed: %w", i, err) + } + if err := workerApplier.adoptDMLQueryBuildersFrom(mgtr.applier); err != nil { + return fmt.Errorf("MTS worker %d: %w", i, err) + } + go mgtr.dmlWorker(ctx, i, workerApplier) + } + + // Start coordinator + go mgtr.dmlCoordinator(ctx) + + return nil +} + +func (mgtr *Migrator) readCoordinatorEvent(ctx context.Context) (*applyEventStruct, bool) { + if mgtr.pendingCoordinatorEvent != nil { + eventStruct := mgtr.pendingCoordinatorEvent + mgtr.pendingCoordinatorEvent = nil + return eventStruct, true + } + select { + case <-ctx.Done(): + return nil, false + case eventStruct := <-mgtr.coordinatorQueue: + return eventStruct, true + } +} + +func (mgtr *Migrator) handleCoordinatorNonDML(ctx context.Context, eventStruct *applyEventStruct) error { + if eventStruct.writeFunc == nil { + return nil + } + return mgtr.retryOperationCtx(ctx, *eventStruct.writeFunc) +} + +// collectTransactionEvents groups consecutive binlog rows belonging to the same GTID transaction. 
+func (mgtr *Migrator) collectTransactionEvents(ctx context.Context, first *applyEventStruct) ([]*applyEventStruct, bool) { + trxEvents := []*applyEventStruct{first} + seqNum := first.sequenceNum + lastCommitted := first.lastCommitted + + for { + select { + case <-ctx.Done(): + return trxEvents, false + case next := <-mgtr.coordinatorQueue: + if next.dmlEvent == nil { + if err := mgtr.handleCoordinatorNonDML(ctx, next); err != nil { + mgtr.migrationContext.PanicAbort <- err + return nil, false + } + continue + } + if next.sequenceNum == seqNum && next.lastCommitted == lastCommitted { + trxEvents = append(trxEvents, next) + continue + } + mgtr.pendingCoordinatorEvent = next + default: + } + break + } + return trxEvents, true +} + +// dmlCoordinator reads DML events from coordinatorQueue, checks dependencies +// using last_committed/sequence_number (LOGICAL_CLOCK), and dispatches to workers. +// +// Corresponds to MySQL: Mts_submode_logical_clock::schedule_next_event() + wait_for_last_committed_trx() +func (mgtr *Migrator) dmlCoordinator(ctx context.Context) { + scheduleState := newMTSScheduleState() + + for { + eventStruct, ok := mgtr.readCoordinatorEvent(ctx) + if !ok { + return + } + if eventStruct.dmlEvent == nil { + if err := mgtr.handleCoordinatorNonDML(ctx, eventStruct); err != nil { + mgtr.migrationContext.PanicAbort <- err + return + } + continue + } + + trxEvents, ok := mgtr.collectTransactionEvents(ctx, eventStruct) + if !ok { + return + } + if len(trxEvents) == 0 { + continue + } + + currentSeqNum := trxEvents[0].sequenceNum + currentLastCommitted := trxEvents[0].lastCommitted + + isNewGroup, err := scheduleState.evaluateTransaction(currentSeqNum, currentLastCommitted) + if err != nil { + mgtr.migrationContext.PanicAbort <- err + return + } + if scheduleState.consumeEpochReset() { + mgtr.commitBarrier.waitForAllWorkers(ctx) + mgtr.commitBarrier.reset() + } + + mgtr.throttler.throttle(nil) + + dmlEvents := make([]*binlog.BinlogDMLEvent, 0, len(trxEvents)) 
+ for _, e := range trxEvents { + dmlEvents = append(dmlEvents, e.dmlEvent) + } + job := &mtsWorkerJob{ + dmlEvents: dmlEvents, + sequenceNum: currentSeqNum, + coords: trxEvents[len(trxEvents)-1].coords, + } + + var workerID int + if isNewGroup { + mgtr.commitBarrier.waitForAllWorkers(ctx) + workerID = 0 + } else { + mgtr.commitBarrier.waitForDependency(ctx, currentLastCommitted) + workerID = mgtr.workerNextIdx % mgtr.numWorkers + mgtr.workerNextIdx++ + } + + mgtr.commitBarrier.addDelegatedJob(job.sequenceNum) + select { + case mgtr.workerQueues[workerID] <- job: + case <-ctx.Done(): + mgtr.commitBarrier.commit(job.sequenceNum) + mgtr.commitBarrier.completeDelegatedJob() + return + } + } +} + +// dmlWorker reads transaction jobs from its worker queue and applies them to the ghost table. +// Each worker owns an independent Applier with its own DB connection. +func (mgtr *Migrator) dmlWorker(ctx context.Context, workerID int, applier *Applier) { + for { + select { + case <-ctx.Done(): + return + case job := <-mgtr.workerQueues[workerID]: + if job == nil || len(job.dmlEvents) == 0 { + continue + } + mgtr.applyMTSWorkerJob(ctx, workerID, applier, job) + } + } +} + +// mtsApplyMaxRetries bounds retries of a single transaction on transient +// concurrency errors, matching MySQL's default slave_transaction_retries (10) +// generous headroom for contention between parallel workers on the ghost table. +const mtsApplyMaxRetries = 100 + +// retryMTSApply retries a DML apply operation on transient concurrency errors. +// Deadlocks (1213) and lock-wait timeouts (1205) are expected under concurrent +// MTS workers and are resolved by re-running the whole transaction. Non-retryable +// errors (bad query, warnings, etc.) are returned immediately and propagated to +// the coordinator so the migration aborts rather than silently dropping a +// transaction and corrupting dependency tracking. 
+func (mgtr *Migrator) retryMTSApply(ctx context.Context, operation func() error) error { + var lastErr error + for i := 0; i < mtsApplyMaxRetries; i++ { + if err := ctx.Err(); err != nil { + return err + } + if abortErr := mgtr.checkAbort(); abortErr != nil { + return abortErr + } + err := operation() + if err == nil { + return nil + } + if !isRetryableApplyError(err) { + return err + } + lastErr = err + // Jittered backoff (0-5ms) avoids a thundering herd of workers + // retrying the same contended rows in lockstep. + RetrySleepFn(time.Duration(rand.Int63n(int64(5 * time.Millisecond)))) + } + return fmt.Errorf("MTS apply exhausted %d retries on transient error: %w", mtsApplyMaxRetries, lastErr) +} + +// applyMTSWorkerJob applies one delegated transaction job and always releases the commit barrier. +func (mgtr *Migrator) applyMTSWorkerJob(ctx context.Context, workerID int, applier *Applier, job *mtsWorkerJob) { + var applyErr error + defer func() { + if r := recover(); r != nil { + applyErr = fmt.Errorf("MTS worker %d panic: %v", workerID, r) + } + if applyErr != nil { + mgtr.commitBarrier.commit(job.sequenceNum) + mgtr.commitBarrier.completeDelegatedJob() + _ = base.SendWithContext(ctx, mgtr.migrationContext.PanicAbort, applyErr) + } + }() + + if err := mgtr.retryMTSApply(ctx, func() error { + return applier.ApplyDMLEventQueries(ctx, job.dmlEvents) + }); err != nil { + applyErr = err + return + } + + mgtr.commitBarrier.commit(job.sequenceNum) + mgtr.commitBarrier.completeDelegatedJob() + + mgtr.applier.CurrentCoordinatesMutex.Lock() + if mgtr.applier.CurrentCoordinates.SmallerThan(job.coords) { + mgtr.applier.CurrentCoordinates = job.coords + } + mgtr.applier.CurrentCoordinatesMutex.Unlock() +} + // executeWriteFuncs writes data via applier: both the rowcopy and the events backlog. // This is where the ghost table gets the data. The function fills the data single-threaded. // Both event backlog and rowcopy events are polled; the backlog events have precedence. 
@@ -1841,6 +2174,7 @@ func (mgtr *Migrator) executeWriteFuncs() error { select { case eventStruct := <-mgtr.applyEventsQueue: { + // After mtsActive, new DML goes to coordinatorQueue; entries already here are pre-MTS backlog. if err := mgtr.onApplyEventStruct(eventStruct); err != nil { return err } diff --git a/go/logic/migrator_mts_test.go b/go/logic/migrator_mts_test.go new file mode 100644 index 000000000..4ba85fb96 --- /dev/null +++ b/go/logic/migrator_mts_test.go @@ -0,0 +1,291 @@ +/* + Copyright 2025 GitHub Inc. + See https://github.com/github/gh-ost/blob/master/LICENSE +*/ + +package logic + +import ( + "context" + "errors" + "sync/atomic" + "testing" + "time" + + drivermysql "github.com/go-sql-driver/mysql" + + "github.com/github/gh-ost/go/base" + "github.com/github/gh-ost/go/binlog" + "github.com/github/gh-ost/go/mysql" + "github.com/github/gh-ost/go/sql" + "github.com/stretchr/testify/require" +) + +func TestCollectTransactionEvents_GroupsSameLogicalTimestamp(t *testing.T) { + mgtr := NewMigrator(base.NewMigrationContext(), "test") + mgtr.coordinatorQueue = make(chan *applyEventStruct, 8) + + dml := &binlog.BinlogDMLEvent{DML: binlog.InsertDML} + row1 := newApplyEventStructByDML(&binlog.BinlogEntry{ + LastCommitted: 4, + SequenceNumber: 5, + DmlEvent: dml, + }) + row2 := newApplyEventStructByDML(&binlog.BinlogEntry{ + LastCommitted: 4, + SequenceNumber: 5, + DmlEvent: dml, + }) + row3 := newApplyEventStructByDML(&binlog.BinlogEntry{ + LastCommitted: 5, + SequenceNumber: 6, + DmlEvent: &binlog.BinlogDMLEvent{DML: binlog.InsertDML}, + }) + + mgtr.coordinatorQueue <- row2 + mgtr.coordinatorQueue <- row3 + + ctx := context.Background() + trx, ok := mgtr.collectTransactionEvents(ctx, row1) + require.True(t, ok) + require.Len(t, trx, 2) + require.Equal(t, int64(5), trx[0].sequenceNum) + require.NotNil(t, mgtr.pendingCoordinatorEvent) + require.Equal(t, int64(6), mgtr.pendingCoordinatorEvent.sequenceNum) +} + +func 
TestCollectTransactionEvents_HandlesNonDMLBetweenRows(t *testing.T) { + mgtr := NewMigrator(base.NewMigrationContext(), "test") + mgtr.coordinatorQueue = make(chan *applyEventStruct, 8) + + called := false + var writeFunc tableWriteFunc = func() error { + called = true + return nil + } + mgtr.coordinatorQueue <- newApplyEventStructByFunc(&writeFunc) + + rowDML := &binlog.BinlogDMLEvent{DML: binlog.InsertDML} + row1 := newApplyEventStructByDML(&binlog.BinlogEntry{ + LastCommitted: 1, + SequenceNumber: 2, + DmlEvent: rowDML, + }) + row2 := newApplyEventStructByDML(&binlog.BinlogEntry{ + LastCommitted: 1, + SequenceNumber: 2, + DmlEvent: rowDML, + }) + mgtr.coordinatorQueue <- row2 + + ctx := context.Background() + trx, ok := mgtr.collectTransactionEvents(ctx, row1) + require.True(t, ok) + require.Len(t, trx, 2) + require.True(t, called) +} + +func TestNotifyLogicalTimestampsDetection_ClosesOnce(t *testing.T) { + ctx := base.NewMigrationContext() + ctx.LogicalTimestampsDetected = make(chan struct{}) + + ctx.NotifyLogicalTimestampsDetection(true) + require.True(t, ctx.BinlogHasLogicalTimestamps) + + select { + case <-ctx.LogicalTimestampsDetected: + default: + t.Fatal("expected channel closed") + } + + ctx.NotifyLogicalTimestampsDetection(false) + require.True(t, ctx.BinlogHasLogicalTimestamps) +} + +func TestEnqueueApplyEvent_RoutesToCoordinatorWhenMTSActive(t *testing.T) { + mgtr := NewMigrator(base.NewMigrationContext(), "test") + mgtr.numWorkers = 4 + mgtr.coordinatorQueue = make(chan *applyEventStruct, 1) + atomic.StoreInt64(&mgtr.mtsActive, 1) + + var noop tableWriteFunc = func() error { return nil } + err := mgtr.enqueueApplyEvent(newApplyEventStructByFunc(&noop)) + require.NoError(t, err) + require.Len(t, mgtr.coordinatorQueue, 1) + require.Len(t, mgtr.applyEventsQueue, 0) +} + +func TestEnqueueApplyEvent_RoutesToApplyQueueWhenMTSInactive(t *testing.T) { + mgtr := NewMigrator(base.NewMigrationContext(), "test") + mgtr.numWorkers = 4 + mgtr.coordinatorQueue = 
make(chan *applyEventStruct, 1) + + var noop tableWriteFunc = func() error { return nil } + err := mgtr.enqueueApplyEvent(newApplyEventStructByFunc(&noop)) + require.NoError(t, err) + require.Len(t, mgtr.applyEventsQueue, 1) + require.Len(t, mgtr.coordinatorQueue, 0) +} + +func TestMigrator_NumWorkersInitializesMTSChannels(t *testing.T) { + mctx := base.NewMigrationContext() + mctx.NumWorkers = 4 + mgtr := NewMigrator(mctx, "test") + require.Equal(t, 4, mgtr.numWorkers) + require.Len(t, mgtr.workerQueues, 4) + require.NotNil(t, mgtr.commitBarrier) + require.NotNil(t, mgtr.coordinatorQueue) +} + +func TestMigrator_NumWorkersOneSkipsMTSChannels(t *testing.T) { + mctx := base.NewMigrationContext() + mctx.NumWorkers = 1 + mgtr := NewMigrator(mctx, "test") + require.Nil(t, mgtr.commitBarrier) + require.Nil(t, mgtr.coordinatorQueue) +} + +func TestMTSWorkerJobHoldsCoordinates(t *testing.T) { + job := &mtsWorkerJob{coords: &mysql.FileBinlogCoordinates{LogFile: "binlog.000001", LogPos: 4}} + require.NotNil(t, job.coords) +} + +func TestAdoptDMLQueryBuildersFrom_RequiresPreparedPrimary(t *testing.T) { + columns := sql.NewColumnList([]string{"id", "item_id"}) + mctx := base.NewMigrationContext() + mctx.DatabaseName = "test" + mctx.OriginalTableName = "test" + mctx.OriginalTableColumns = columns + mctx.SharedColumns = columns + mctx.MappedSharedColumns = columns + mctx.UniqueKey = &sql.UniqueKey{ + Name: t.Name(), + Columns: *columns, + } + + primary := NewApplier(mctx) + worker := NewApplier(mctx) + + err := worker.adoptDMLQueryBuildersFrom(primary) + require.Error(t, err) + + require.NoError(t, primary.prepareQueries()) + require.NoError(t, worker.adoptDMLQueryBuildersFrom(primary)) + require.NotNil(t, worker.dmlInsertQueryBuilder) + require.Same(t, primary.dmlInsertQueryBuilder, worker.dmlInsertQueryBuilder) +} + +func TestApplyMTSWorkerJob_ReleasesBarrierOnError(t *testing.T) { + mctx := base.NewMigrationContext() + mctx.PanicAbort = make(chan error, 1) + mgtr := 
NewMigrator(mctx, "test") + mgtr.numWorkers = 2 + mgtr.commitBarrier = newCommitBarrier() + mgtr.commitBarrier.addDelegatedJob(0) + + job := &mtsWorkerJob{ + dmlEvents: []*binlog.BinlogDMLEvent{{DML: binlog.InsertDML}}, + } + // Applier without DB connection or query builders: apply fails, barrier must release. + mgtr.applyMTSWorkerJob(context.Background(), 0, NewApplier(mctx), job) + require.Equal(t, int64(0), mgtr.commitBarrier.delegatedJobCount()) + require.Error(t, <-mctx.PanicAbort) +} + +func TestIsDeadlockError(t *testing.T) { + require.True(t, isDeadlockError(&drivermysql.MySQLError{Number: 1213, Message: "Deadlock found"})) + require.False(t, isDeadlockError(&drivermysql.MySQLError{Number: 1205, Message: "Lock wait timeout"})) + require.False(t, isDeadlockError(errors.New("generic error"))) + require.False(t, isDeadlockError(nil)) +} + +func TestRetryMTSApply_RetriesImmediatelyOnDeadlock(t *testing.T) { + mctx := base.NewMigrationContext() + mctx.PanicAbort = make(chan error, 1) + mgtr := NewMigrator(mctx, "test") + + deadlockErr := &drivermysql.MySQLError{Number: 1213, Message: "Deadlock found when trying to get lock"} + attempts := 0 + start := time.Now() + err := mgtr.retryMTSApply(context.Background(), func() error { + attempts++ + if attempts < 3 { + return deadlockErr + } + return nil + }) + elapsed := time.Since(start) + + require.NoError(t, err) + require.Equal(t, 3, attempts) + // Deadlock retries should be immediate (< 100ms for 2 retries). + // Normal retryOperation would sleep 2 seconds (1s * 2 retries). 
+ require.Less(t, elapsed, 500*time.Millisecond, "deadlock retries should be fast") +} + +func TestRetryMTSApply_RetriesOnLockWaitTimeout(t *testing.T) { + mctx := base.NewMigrationContext() + mctx.PanicAbort = make(chan error, 1) + mgtr := NewMigrator(mctx, "test") + + lockWaitErr := &drivermysql.MySQLError{Number: 1205, Message: "Lock wait timeout exceeded"} + attempts := 0 + err := mgtr.retryMTSApply(context.Background(), func() error { + attempts++ + if attempts < 3 { + return lockWaitErr + } + return nil + }) + require.NoError(t, err) + require.Equal(t, 3, attempts) +} + +func TestRetryMTSApply_PropagatesNonRetryableImmediately(t *testing.T) { + mctx := base.NewMigrationContext() + mctx.SetDefaultNumRetries(3) + mctx.PanicAbort = make(chan error, 1) + mgtr := NewMigrator(mctx, "test") + + // A non-retryable error (not a deadlock / lock-wait timeout) must be + // returned immediately and unchanged, so the coordinator aborts the + // migration rather than silently dropping the transaction. + fatalErr := &drivermysql.MySQLError{Number: 1146, Message: "Table doesn't exist"} + attempts := 0 + err := mgtr.retryMTSApply(context.Background(), func() error { + attempts++ + return fatalErr + }) + require.ErrorIs(t, err, fatalErr) + require.Equal(t, 1, attempts) +} + +func TestCoordinateUpdateMonotonic(t *testing.T) { + mctx := base.NewMigrationContext() + mctx.PanicAbort = make(chan error, 1) + mgtr := NewMigrator(mctx, "test") + mgtr.applier = NewApplier(mctx) + + // Simulate coordinate regression: worker with seq=10 finishes first. 
+ mgtr.applier.CurrentCoordinates = &mysql.FileBinlogCoordinates{LogFile: "binlog.000001", LogPos: 200} + + job := &mtsWorkerJob{ + dmlEvents: []*binlog.BinlogDMLEvent{{DML: binlog.InsertDML}}, + sequenceNum: 10, + coords: &mysql.FileBinlogCoordinates{LogFile: "binlog.000001", LogPos: 100}, + } + mgtr.commitBarrier = newCommitBarrier() + mgtr.commitBarrier.addDelegatedJob(job.sequenceNum) + + // applyMTSWorkerJob will fail (no DB), but the deferred handler runs first. + // Test coordinate update logic directly instead. + mgtr.applier.CurrentCoordinatesMutex.Lock() + if mgtr.applier.CurrentCoordinates.SmallerThan(job.coords) { + mgtr.applier.CurrentCoordinates = job.coords + } + mgtr.applier.CurrentCoordinatesMutex.Unlock() + + // Coordinates should NOT regress to 100. + coords := mgtr.applier.CurrentCoordinates.(*mysql.FileBinlogCoordinates) + require.Equal(t, int64(200), coords.LogPos) +} diff --git a/go/logic/mts_schedule.go b/go/logic/mts_schedule.go new file mode 100644 index 000000000..0843b43bb --- /dev/null +++ b/go/logic/mts_schedule.go @@ -0,0 +1,65 @@ +/* + Copyright 2025 GitHub Inc. + See https://github.com/github/gh-ost/blob/master/LICENSE +*/ + +package logic + +import "fmt" + +// mtsScheduleState tracks LOGICAL_CLOCK scheduling state across transactions. +// MySQL validates and schedules per GTID transaction, not per row. +type mtsScheduleState struct { + lastTrxSeqNum int64 + lastSeqNum int64 + firstEvent bool + epochReset bool +} + +func newMTSScheduleState() *mtsScheduleState { + return &mtsScheduleState{ + firstEvent: true, + } +} + +func (s *mtsScheduleState) consumeEpochReset() bool { + if !s.epochReset { + return false + } + s.epochReset = false + return true +} + +func (s *mtsScheduleState) resetEpoch() { + s.lastTrxSeqNum = 0 + s.lastSeqNum = 0 + s.firstEvent = true + s.epochReset = true +} + +// evaluateTransaction returns whether the transaction starts a new serialized group +// and updates internal state. 
Call once per transaction (after row grouping).
+//
+// It returns an error when a non-zero currentSeqNum is not strictly greater
+// than a non-zero currentLastCommitted. isNewGroup is true for the first
+// evaluated transaction, when either timestamp is 0, when the sequence number
+// skips ahead by more than one, or when the previously recorded sequence
+// number was 0.
+func (s *mtsScheduleState) evaluateTransaction(currentSeqNum, currentLastCommitted int64) (isNewGroup bool, err error) {
+	// Validation is skipped when the sequence number is 0 or repeats the one
+	// recorded for the previous transaction.
+	if currentSeqNum != 0 && currentSeqNum != s.lastTrxSeqNum {
+		if currentLastCommitted != 0 && currentSeqNum <= currentLastCommitted {
+			return false, fmt.Errorf("inconsistent timestamps: seq=%d <= lc=%d", currentSeqNum, currentLastCommitted)
+		}
+		if s.lastTrxSeqNum != 0 && currentSeqNum <= s.lastTrxSeqNum {
+			// Logical clock counter wrapped or restarted; begin a new epoch.
+			// NOTE(review): this branch does not record currentSeqNum in
+			// lastTrxSeqNum; confirm resetEpoch leaving it at 0 is intended.
+			s.resetEpoch()
+		} else {
+			s.lastTrxSeqNum = currentSeqNum
+		}
+	}
+
+	// gapSuccessor: the sequence number jumped more than one past the previous
+	// one. The `!=` comparison is already implied by the `>` comparison.
+	gapSuccessor := s.lastSeqNum != 0 && currentSeqNum != s.lastSeqNum && currentSeqNum > s.lastSeqNum+1
+	isNewGroup = s.firstEvent ||
+		currentSeqNum == 0 ||
+		currentLastCommitted == 0 ||
+		gapSuccessor ||
+		(s.lastSeqNum == 0 && currentSeqNum != 0)
+
+	// Record state for the next call; lastSeqNum is updated even when it is 0.
+	s.firstEvent = false
+	s.lastSeqNum = currentSeqNum
+	return isNewGroup, nil
+}
diff --git a/go/logic/mts_schedule_test.go b/go/logic/mts_schedule_test.go
new file mode 100644
index 000000000..c60accd69
--- /dev/null
+++ b/go/logic/mts_schedule_test.go
@@ -0,0 +1,78 @@
+/*
+   Copyright 2025 GitHub Inc.
+ See https://github.com/github/gh-ost/blob/master/LICENSE +*/ + +package logic + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestMTSSchedule_FirstEventIsNewGroup(t *testing.T) { + s := newMTSScheduleState() + isNew, err := s.evaluateTransaction(6, 5) + require.NoError(t, err) + require.True(t, isNew) +} + +func TestMTSSchedule_SequenceUninitIsNewGroup(t *testing.T) { + s := newMTSScheduleState() + _, _ = s.evaluateTransaction(1, 0) + isNew, err := s.evaluateTransaction(0, 0) + require.NoError(t, err) + require.True(t, isNew) +} + +func TestMTSSchedule_LastCommittedUninitIsNewGroup(t *testing.T) { + s := newMTSScheduleState() + _, _ = s.evaluateTransaction(1, 0) + isNew, err := s.evaluateTransaction(2, 0) + require.NoError(t, err) + require.True(t, isNew) +} + +func TestMTSSchedule_GapSuccessorIsNewGroup(t *testing.T) { + s := newMTSScheduleState() + _, _ = s.evaluateTransaction(5, 4) + isNew, err := s.evaluateTransaction(8, 7) + require.NoError(t, err) + require.True(t, isNew) +} + +func TestMTSSchedule_NormalParallelNotNewGroup(t *testing.T) { + s := newMTSScheduleState() + _, _ = s.evaluateTransaction(5, 4) + isNew, err := s.evaluateTransaction(6, 5) + require.NoError(t, err) + require.False(t, isNew) +} + +func TestMTSSchedule_SameSequenceNumberTwiceNoError(t *testing.T) { + s := newMTSScheduleState() + _, err := s.evaluateTransaction(5, 4) + require.NoError(t, err) + // Simulates mis-invocation per row within one transaction; must not trip seq check. 
+ _, err = s.evaluateTransaction(5, 4) + require.NoError(t, err) +} + +func TestMTSSchedule_SequenceLeqLastCommittedErrors(t *testing.T) { + s := newMTSScheduleState() + _, err := s.evaluateTransaction(5, 6) + require.Error(t, err) + require.Contains(t, err.Error(), "inconsistent timestamps") +} + +func TestMTSSchedule_SequenceResetStartsNewEpoch(t *testing.T) { + s := newMTSScheduleState() + _, err := s.evaluateTransaction(534998, 534997) + require.NoError(t, err) + + isNew, err := s.evaluateTransaction(1, 0) + require.NoError(t, err) + require.True(t, isNew) + require.True(t, s.consumeEpochReset()) +} diff --git a/localtests/mts-sysbench/create.sql b/localtests/mts-sysbench/create.sql new file mode 100644 index 000000000..a73038283 --- /dev/null +++ b/localtests/mts-sysbench/create.sql @@ -0,0 +1,2 @@ +/* sbtest1 is created by sysbench prepare in test.sh */ +drop table if exists sbtest1; diff --git a/localtests/mts-sysbench/extra_args b/localtests/mts-sysbench/extra_args new file mode 100644 index 000000000..94877dcee --- /dev/null +++ b/localtests/mts-sysbench/extra_args @@ -0,0 +1 @@ +--gtid --num-workers=4 diff --git a/localtests/mts-sysbench/gtid_mode b/localtests/mts-sysbench/gtid_mode new file mode 100644 index 000000000..76371f28f --- /dev/null +++ b/localtests/mts-sysbench/gtid_mode @@ -0,0 +1 @@ +ON diff --git a/localtests/mts-sysbench/test.sh b/localtests/mts-sysbench/test.sh new file mode 100755 index 000000000..61a45174c --- /dev/null +++ b/localtests/mts-sysbench/test.sh @@ -0,0 +1,172 @@ +#!/bin/bash +# MTS parallel apply under sysbench write load (requires --gtid from test.sh -g). + +set -euo pipefail + +if ! command -v sysbench &>/dev/null; then + echo "sysbench not installed; skip" + exit 0 +fi + +table_name="sbtest1" +ghost_table_name="_${table_name}_gho" +# --test-on-replica (from test.sh) cut-over then swap-reverts: migrated data stays on +# _gho, original name sbtest1 is restored. Compare like standard localtests, not _del. 
+
+# Create and populate sbtest1 on the master (connection settings come from test.sh).
+sysbench_prepare() {
+    sysbench oltp_write_only \
+        --mysql-host="$master_host" \
+        --mysql-port="$master_port" \
+        --mysql-user=root \
+        --mysql-password=opensesame \
+        --mysql-db=test \
+        --tables=1 \
+        --table-size=5000 \
+        prepare
+}
+
+# Load tuned for CI: 8 threads, 45s, rate=1200 trx/s (PR #1454 scale). Unbounded
+# 16-thread load pegs the MTS backlog and exceeds the docker test timeout.
+# Prints the command line; the caller runs it with eval.
+sysbench_run_cmd() {
+    echo "sysbench oltp_write_only \
+        --mysql-host=$master_host \
+        --mysql-port=$master_port \
+        --mysql-user=root \
+        --mysql-password=opensesame \
+        --mysql-db=test \
+        --rand-seed=42 \
+        --tables=1 \
+        --threads=8 \
+        --time=45 \
+        --rate=1200 \
+        run"
+}
+
+echo "Preparing sysbench sbtest1..."
+sysbench_prepare "$master_host" "$master_port"
+
+build_ghost_command
+
+echo_dot
+echo >"$test_logfile"
+
+# --- Binlog rotation detection (PR #1454 root cause) ---
+binlogs_before=$(gh-ost-test-mysql-master -Nse "SHOW BINARY LOGS" 2>/dev/null | wc -l | tr -d ' ')
+
+# Start sysbench write load FIRST so writes happen during the entire migration
+eval "$(sysbench_run_cmd)" &
+sysbench_pid=$!
+echo "Started sysbench PID=$sysbench_pid"
+sleep 2
+
+bash -c "$cmd" >>"$test_logfile" 2>&1 &
+ghost_pid=$!
+
+# Allow time for sysbench (45s) plus backlog drain and cut-over under MTS.
+for _ in $(seq 1 300); do
+    if ! ps -p "$ghost_pid" >/dev/null 2>&1; then
+        break
+    fi
+    if grep -q "Done migrating" "$test_logfile" 2>/dev/null; then
+        break
+    fi
+    sleep 1
+    echo_dot
+done
+
+if ps -p "$sysbench_pid" >/dev/null 2>&1; then
+    kill "$sysbench_pid" 2>/dev/null || true
+fi
+# Capture gh-ost's real exit status. "wait ... || true" followed by "$?" would
+# always record 0; the "||" form also keeps "set -e" from aborting the script
+# before the failure is reported below.
+execution_result=0
+wait "$ghost_pid" 2>/dev/null || execution_result=$?
+
+if [ "$execution_result" -ne 0 ]; then
+    echo
+    echo "ERROR mts-sysbench gh-ost failed"
+    tail -n 80 "$test_logfile"
+    exit 1
+fi
+
+if ! grep -q "Done migrating" "$test_logfile"; then
+    echo
+    echo "ERROR mts-sysbench: migration did not complete"
+    tail -n 80 "$test_logfile"
+    exit 1
+fi
+
+if ! 
grep -q "Starting MTS mode with 4 workers" "$test_logfile"; then + echo + echo "ERROR mts-sysbench: expected MTS mode in log (need --gtid and MySQL 5.7+ logical timestamps)" + tail -n 80 "$test_logfile" + exit 1 +fi + +# --- Binlog rotation report --- +binlogs_after=$(gh-ost-test-mysql-master -Nse "SHOW BINARY LOGS" 2>/dev/null | wc -l | tr -d ' ') +binlog_rotations=$((binlogs_after - binlogs_before)) +if [ "$binlog_rotations" -gt 0 ]; then + echo " [binlog] ${binlog_rotations} rotation(s) detected during migration" +else + echo " [binlog] no rotation detected (reduce max_binlog_size for stricter test)" +fi + +# --- Triple verification: row count + CHECKSUM TABLE + md5sum --- +# Replica after --test-on-replica: compare original table vs ghost table (see test.sh). + +# 1. Row count +orig_count=$(gh-ost-test-mysql-replica --default-character-set=utf8mb4 test \ + -Nse "SELECT COUNT(*) FROM ${table_name}" 2>/dev/null || echo "MISSING") +ghost_count=$(gh-ost-test-mysql-replica --default-character-set=utf8mb4 test \ + -Nse "SELECT COUNT(*) FROM ${ghost_table_name}" 2>/dev/null || echo "MISSING") + +if [ "$orig_count" = "MISSING" ] || [ "$ghost_count" = "MISSING" ]; then + echo "ERROR mts-sysbench: table missing orig=${orig_count} ghost=${ghost_count}" + exit 1 +fi +if [ "$orig_count" != "$ghost_count" ]; then + echo "ERROR mts-sysbench: row count mismatch orig=${orig_count} ghost=${ghost_count}" + exit 1 +fi +echo " [verify] row count match: ${orig_count}" + +# 2. 
CHECKSUM TABLE +orig_checksum=$(gh-ost-test-mysql-replica --default-character-set=utf8mb4 test \ + -Nse "CHECKSUM TABLE ${table_name}" 2>/dev/null | awk '{print $NF}') +ghost_checksum=$(gh-ost-test-mysql-replica --default-character-set=utf8mb4 test \ + -Nse "CHECKSUM TABLE ${ghost_table_name}" 2>/dev/null | awk '{print $NF}') +if [ "$orig_checksum" != "$ghost_checksum" ]; then + echo "ERROR mts-sysbench: CHECKSUM TABLE mismatch orig=${orig_checksum} ghost=${ghost_checksum}" + exit 1 +fi +echo " [verify] CHECKSUM TABLE match: ${orig_checksum}" + +# 3. md5sum sorted content +orig_columns="id,k,c,pad" +ghost_columns="$orig_columns" +order_by="order by id" + +gh-ost-test-mysql-replica --default-character-set=utf8mb4 test \ + -e "select ${orig_columns} from ${table_name} ${order_by}" -ss >"$orig_content_output_file" +gh-ost-test-mysql-replica --default-character-set=utf8mb4 test \ + -e "select ${ghost_columns} from ${ghost_table_name} ${order_by}" -ss >"$ghost_content_output_file" + +orig_checksum=$(md5sum <"$orig_content_output_file") +ghost_checksum=$(md5sum <"$ghost_content_output_file") + +if [ "$orig_checksum" != "$ghost_checksum" ]; then + echo "ERROR mts-sysbench: md5sum mismatch" + diff "$orig_content_output_file" "$ghost_content_output_file" | head -n 40 + exit 1 +fi +echo " [verify] md5sum match: ${orig_checksum%% *}" + +sysbench oltp_write_only \ + --mysql-host="$master_host" \ + --mysql-port="$master_port" \ + --mysql-user=root \ + --mysql-password=opensesame \ + --mysql-db=test \ + --tables=1 \ + cleanup >/dev/null 2>&1 || true + +exit 0 diff --git a/localtests/mts-sysbench/timeout b/localtests/mts-sysbench/timeout new file mode 100644 index 000000000..697cb3a26 --- /dev/null +++ b/localtests/mts-sysbench/timeout @@ -0,0 +1 @@ +300 diff --git a/localtests/test.sh b/localtests/test.sh index d918d473b..bb96650ee 100755 --- a/localtests/test.sh +++ b/localtests/test.sh @@ -329,6 +329,12 @@ test_single() { table_name="gh_ost_test" 
ghost_table_name="_gh_ost_test_gho" + # Per-test timeout override (e.g. mts-sysbench needs more than 120s). + local test_timeout_for_run=$test_timeout + if [ -f $tests_path/$test_name/timeout ]; then + test_timeout_for_run=$(cat $tests_path/$test_name/timeout) + fi + # Check for custom test script if [ -f $tests_path/$test_name/test.sh ]; then # Run the custom test script in a subshell with timeout monitoring @@ -339,7 +345,7 @@ test_single() { # Monitor the test with timeout timeout_counter=0 while kill -0 $test_pid 2>/dev/null; do - if [ $timeout_counter -ge $test_timeout ]; then + if [ $timeout_counter -ge $test_timeout_for_run ]; then kill -TERM $test_pid 2>/dev/null sleep 1 kill -KILL $test_pid 2>/dev/null diff --git a/script/mts-analyze-log.sh b/script/mts-analyze-log.sh new file mode 100755 index 000000000..c7b66f45e --- /dev/null +++ b/script/mts-analyze-log.sh @@ -0,0 +1,56 @@ +#!/bin/bash +# Parse gh-ost MTS test log for timeline, throughput, and stall detection. +# Usage: ./script/mts-analyze-log.sh [/tmp/gh-ost-mts-sysbench.log] + +set -euo pipefail + +LOG="${1:-/tmp/gh-ost-mts-sysbench.log}" +if [ ! 
-f "$LOG" ]; then
+    echo "Log not found: $LOG"
+    exit 1
+fi
+
+# NOTE: the script runs under "set -euo pipefail", so every grep that may
+# legitimately find nothing is followed by "|| true"; otherwise a log without
+# the searched-for lines would abort the analysis silently.
+
+echo "=== gh-ost MTS log analysis ==="
+echo "file: $LOG ($(wc -l <"$LOG") lines)"
+echo ""
+
+echo "--- Milestones ---"
+grep -E 'Starting MTS|Row copy complete|Done migrating|cut-over|Cut-over|AllEventsUpToLock|ERROR|aborted|falling back' "$LOG" || true
+echo ""
+
+echo "--- Applied progression (unique values) ---"
+grep -oE 'Applied: [0-9]+' "$LOG" | awk '{print $2}' | sort -n | uniq | tr '\n' ' ' || true
+echo ""
+echo ""
+
+echo "--- Backlog distribution (top) ---"
+grep -oE 'Backlog: [0-9]+' "$LOG" | awk '{print $2}' | sort -n | uniq -c | sort -rn | head -10 || true
+echo ""
+
+echo "--- Timing ---"
+start=$(grep -m1 'Migration started at' "$LOG" | sed 's/# //' || true)
+rowcopy=$(grep -m1 'Row copy complete' "$LOG" || true)
+done=$(grep -m1 'Done migrating' "$LOG" || true)
+echo "start: ${start:-n/a}"
+echo "row copy: ${rowcopy:-NOT REACHED}"
+echo "done: ${done:-NOT REACHED}"
+echo ""
+
+if grep -q 'Row copy complete' "$LOG"; then
+    last_applied=$(grep 'Applied:' "$LOG" | tail -1 || true)
+    echo "first status after copy: $(grep -m1 'Row copy complete' -A1 "$LOG" | grep Copy || true)"
+    echo "last status: $last_applied"
+fi
+
+max_backlog=$(grep -oE 'Backlog: [0-9]+' "$LOG" | awk '{print $2}' | sort -n | tail -1 || true)
+final_applied=$(grep -oE 'Applied: [0-9]+' "$LOG" | tail -1 | awk '{print $2}' || true)
+echo ""
+echo "max backlog: ${max_backlog:-0}"
+echo "final applied counter: ${final_applied:-0}"
+echo ""
+
+if ! grep -q 'Done migrating' "$LOG"; then
+    echo "WARN: migration did not finish — consistency check invalid until cut-over completes."
+fi diff --git a/script/mts-benchmark-compare.sh b/script/mts-benchmark-compare.sh new file mode 100755 index 000000000..f3ebeb72b --- /dev/null +++ b/script/mts-benchmark-compare.sh @@ -0,0 +1,355 @@ +#!/bin/bash +# Compare gh-ost under heavy DML load: --num-workers=1 vs --num-workers=4. +# +# - 50000 rows + 16 sysbench threads to overwhelm single-threaded apply +# - Preserves _old table and creates pre-migration snapshot for data verification +# - Multi-checkpoint consistency checks (pre-cutover + post-cutover + snapshot) +# - Reports throughput ratio, stall count, backlog distribution +# +# Usage: SYSBENCH_CATCHUP_SEC=45 ./script/mts-benchmark-compare.sh + +set -euo pipefail + +ROOT="$(cd "$(dirname "$0")/.." && pwd)" +cd "$ROOT" + +MYSQL_HOST="${MYSQL_HOST:-127.0.0.1}" +MYSQL_PORT="${MYSQL_PORT:-3306}" +MYSQL_USER="${MYSQL_USER:-root}" +MYSQL_DB="${MYSQL_DB:-test}" +SYSBENCH_TABLE_SIZE="${SYSBENCH_TABLE_SIZE:-50000}" +SYSBENCH_THREADS="${SYSBENCH_THREADS:-16}" +SYSBENCH_CATCHUP_SEC="${SYSBENCH_CATCHUP_SEC:-60}" +GHOST_BIN="${GHOST_BIN:-/tmp/gh-ost-test}" +TABLE=sbtest1 +GHOST="_${TABLE}_gho" +OLD="_${TABLE}_old" +SNAP="${TABLE}_snapshot" +POSTPONE_FILE="${POSTPONE_FILE:-/tmp/gh-ost.mts.postpone}" +REPORT="/tmp/gh-ost-bench-report.txt" + +mysql_cli() { mysql -h"$MYSQL_HOST" -P"$MYSQL_PORT" -u"$MYSQL_USER" "$@"; } +sysbench_cli() { sysbench "$@" --mysql-host="$MYSQL_HOST" --mysql-port="$MYSQL_PORT" --mysql-user="$MYSQL_USER" --mysql-db="$MYSQL_DB"; } + +[ -x "$GHOST_BIN" ] || go build -o "$GHOST_BIN" ./go/cmd/gh-ost +command -v sysbench >/dev/null || { echo "brew install sysbench"; exit 1; } + +# ── helpers ────────────────────────────────────────────────────────────── + +prepare_table() { + rm -f /tmp/gh-ost.bench-nw*.sock "$POSTPONE_FILE" 2>/dev/null || true + mysql_cli -e "DROP TABLE IF EXISTS \`${MYSQL_DB}\`.\`${SNAP}\`" 2>/dev/null || true + mysql_cli -e "DROP TABLE IF EXISTS \`${MYSQL_DB}\`.\`${OLD}\`" 2>/dev/null || true + sysbench_cli oltp_write_only 
--tables=1 --table-size="$SYSBENCH_TABLE_SIZE" cleanup 2>/dev/null || true + sysbench_cli oltp_write_only --tables=1 --table-size="$SYSBENCH_TABLE_SIZE" prepare +} + +# Snapshot the original table before migration for post-cutover verification. +create_snapshot() { + mysql_cli -e " + DROP TABLE IF EXISTS \`${MYSQL_DB}\`.\`${SNAP}\`; + CREATE TABLE \`${MYSQL_DB}\`.\`${SNAP}\` LIKE \`${MYSQL_DB}\`.\`${TABLE}\`; + INSERT INTO \`${MYSQL_DB}\`.\`${SNAP}\` SELECT * FROM \`${MYSQL_DB}\`.\`${TABLE}\`; + " + echo "Snapshot created: ${SNAP} ($(mysql_cli -Nse "SELECT COUNT(*) FROM \`${MYSQL_DB}\`.\`${SNAP}\`") rows)" +} + +# Compare original vs ghost during postpone (catch-up verification). +checksum_before_cutover() { + mysql_cli -Nse " + SELECT CONCAT( + (SELECT COUNT(*) FROM ${MYSQL_DB}.${TABLE}), + ':', + (SELECT COUNT(*) FROM ${MYSQL_DB}.${GHOST}), + ':', + (SELECT COUNT(*) FROM ${MYSQL_DB}.${TABLE} o + LEFT JOIN ${MYSQL_DB}.${GHOST} g ON o.id=g.id WHERE g.id IS NULL), + ':', + (SELECT COUNT(*) FROM ${MYSQL_DB}.${GHOST} g + LEFT JOIN ${MYSQL_DB}.${TABLE} o ON o.id=g.id WHERE o.id IS NULL) + )" 2>/dev/null || echo "n/a" +} + +# After cutover: compare _old (frozen original) vs new table + snapshot. +verify_post_cutover() { + local nw=$1 log=$2 + echo "--- Post-cutover verification (nw=$nw) ---" + + # Check _old table exists + if ! 
mysql_cli -Nse "SELECT 1 FROM \`${MYSQL_DB}\`.\`${OLD}\` LIMIT 1" &>/dev/null; then + echo "WARN: ${OLD} table not found (cutover may have dropped it)" + return + fi + + local old_count new_count snap_count + old_count=$(mysql_cli -Nse "SELECT COUNT(*) FROM \`${MYSQL_DB}\`.\`${OLD}\`") + new_count=$(mysql_cli -Nse "SELECT COUNT(*) FROM \`${MYSQL_DB}\`.\`${TABLE}\`") + snap_count=$(mysql_cli -Nse "SELECT COUNT(*) FROM \`${MYSQL_DB}\`.\`${SNAP}\`" 2>/dev/null || echo "n/a") + + echo " rows: old=${old_count} new=${new_count} snapshot=${snap_count}" + + # _old should match snapshot (original was frozen at lock time; snapshot taken + # before sysbench load, so _old may have more rows from load before lock). + if [ "$snap_count" != "n/a" ]; then + local snap_only old_only + snap_only=$(mysql_cli -Nse " + SELECT COUNT(*) FROM \`${MYSQL_DB}\`.\`${SNAP}\` s + LEFT JOIN \`${MYSQL_DB}\`.\`${OLD}\` o ON s.id=o.id WHERE o.id IS NULL" 2>/dev/null || echo "?") + old_only=$(mysql_cli -Nse " + SELECT COUNT(*) FROM \`${MYSQL_DB}\`.\`${OLD}\` o + LEFT JOIN \`${MYSQL_DB}\`.\`${SNAP}\` s ON o.id=s.id WHERE s.id IS NULL" 2>/dev/null || echo "?") + echo " snapshot_vs_old: only_in_snapshot=${snap_only} only_in_old=${old_only}" + fi + + # New table must have every row from _old (no data loss). + local missing_from_new + missing_from_new=$(mysql_cli -Nse " + SELECT COUNT(*) FROM \`${MYSQL_DB}\`.\`${OLD}\` o + LEFT JOIN \`${MYSQL_DB}\`.\`${TABLE}\` n ON o.id=n.id WHERE n.id IS NULL" 2>/dev/null || echo "?") + echo " old_vs_new: missing_from_new=${missing_from_new}" + if [ "$missing_from_new" != "0" ] && [ "$missing_from_new" != "?" ]; then + echo " ERROR: ${missing_from_new} rows in _old but missing from new table — data loss!" + fi + + # Value-level check on a sample: compare k,c,pad for matching IDs. 
+ local value_mismatch + value_mismatch=$(mysql_cli -Nse " + SELECT COUNT(*) FROM ( + SELECT o.id FROM \`${MYSQL_DB}\`.\`${OLD}\` o + JOIN \`${MYSQL_DB}\`.\`${TABLE}\` n ON o.id=n.id + WHERE o.k != n.k OR o.c != n.c OR o.pad != n.pad + ) t" 2>/dev/null || echo "?") + echo " value_mismatch_on_matching_ids: ${value_mismatch}" + if [ "$value_mismatch" != "0" ] && [ "$value_mismatch" != "?" ]; then + echo " NOTE: value mismatches are expected if UPDATEs occurred after snapshot but before lock" + fi + + echo " Post-cutover verification done" +} + +# Parse log for metrics. +parse_log_metrics() { + local log=$1 + local final_applied max_backlog max_hblag stall_count + final_applied=$(grep -oE 'Applied: [0-9]+' "$log" | tail -1 | awk '{print $2}') + max_backlog=$(grep -oE 'Backlog: [0-9]+' "$log" | awk '{print $2}' | sort -n | tail -1) + + # Heartbeat lag during catch-up + local catchup_log + catchup_log=$(mktemp) + if grep -q 'Row copy complete' "$log"; then + awk '/Row copy complete/,/Done migrating/' "$log" >"$catchup_log" || cp "$log" "$catchup_log" + else + cp "$log" "$catchup_log" + fi + max_hblag=$(grep -oE 'HeartbeatLag: [0-9.]+s' "$catchup_log" | sed 's/HeartbeatLag: //;s/s//' | sort -n | tail -1) + + # Stall detection: count status lines where Applied stayed at same value + stall_count=$(grep -oE 'Applied: [0-9]+' "$catchup_log" | awk '{print $2}' | uniq -d | wc -l | tr -d ' ') + + rm -f "$catchup_log" + echo "${final_applied:-0} ${max_backlog:-0} ${max_hblag:-0} ${stall_count:-0}" +} + +# Analyze log for MTS feature evidence. 
+# analyze_features LOG NW
+# Prints PASS/FAIL/INFO lines describing which migration milestones appear in
+# LOG for a run with NW workers.
+analyze_features() {
+    local log=$1 nw=$2
+    echo "--- Feature coverage (nw=$nw) ---"
+
+    if [ "$nw" -gt 1 ]; then
+        if grep -q 'Starting MTS mode' "$log"; then
+            echo " [PASS] MTS mode started"
+        else
+            echo " [FAIL] MTS mode NOT started (missing logical timestamps?)"
+        fi
+    else
+        if grep -q 'Starting MTS mode' "$log"; then
+            echo " [WARN] MTS mode started for nw=1 (unexpected)"
+        else
+            echo " [PASS] Single-threaded mode (no MTS)"
+        fi
+    fi
+
+    if grep -q 'Done migrating' "$log"; then
+        echo " [PASS] Migration completed"
+    else
+        echo " [FAIL] Migration did NOT complete"
+    fi
+
+    if grep -qi 'cut-over' "$log"; then
+        echo " [PASS] Cutover phase reached"
+    else
+        echo " [FAIL] Cutover not reached"
+    fi
+
+    # Deadlock evidence (only meaningful for nw>1)
+    if [ "$nw" -gt 1 ]; then
+        local deadlock_count
+        # grep -c already prints "0" (and exits 1) when nothing matches, so the
+        # fallback must not echo a second "0"; the default below covers the
+        # case where grep printed nothing at all (e.g. unreadable log).
+        deadlock_count=$(grep -ci 'deadlock\|1213' "$log" 2>/dev/null || true)
+        echo " [INFO] Deadlock mentions in log: ${deadlock_count:-0}"
+    fi
+
+    # Backlog pressure
+    local bl_above_500
+    bl_above_500=$(grep -oE 'Backlog: [0-9]+' "$log" | awk '{print $2}' | awk '$1>500' | wc -l | tr -d ' ')
+    echo " [INFO] Status lines with backlog>500: ${bl_above_500}"
+
+    echo " Feature analysis done"
+}
+
+# ── main run ─────────────────────────────────────────────────────────────
+
+run_one() {
+    local nw=$1
+    local log="/tmp/gh-ost-bench-nw${nw}.log"
+    local t0 t_load_start t_load_end
+
+    prepare_table
+    create_snapshot
+    touch "$POSTPONE_FILE"
+    : >"$log"
+    t0=$(date +%s)
+
+    echo "Starting gh-ost nw=$nw (table_size=$SYSBENCH_TABLE_SIZE, catchup=${SYSBENCH_CATCHUP_SEC}s)..."
+ GOTRACEBACK=crash "$GHOST_BIN" \ + --host="$MYSQL_HOST" --port="$MYSQL_PORT" --user="$MYSQL_USER" \ + --password="${MYSQL_PWD:-}" \ + --allow-on-master --database="$MYSQL_DB" --table="$TABLE" \ + --alter="engine=InnoDB" --gtid --num-workers="$nw" \ + --assume-rbr --skip-metadata-lock-check \ + --initially-drop-ghost-table --initially-drop-old-table \ + --chunk-size=100 --dml-batch-size=10 \ + --max-lag-millis=5000 --default-retries=10 \ + --postpone-cut-over-flag-file="$POSTPONE_FILE" \ + --serve-socket-file=/tmp/gh-ost.bench-nw${nw}.sock \ + --initially-drop-socket-file --verbose --execute \ + >>"$log" 2>&1 & + local pid=$! + + # Wait for row copy to complete + for _ in $(seq 1 300); do + grep -q 'Row copy complete' "$log" 2>/dev/null && break + ps -p "$pid" >/dev/null || break + sleep 1 + done + + if ! grep -q 'Row copy complete' "$log" 2>/dev/null; then + echo "ERROR: Row copy did not complete for nw=$nw" + kill "$pid" 2>/dev/null || true + wait "$pid" 2>/dev/null || true + tail -n 30 "$log" + return 1 + fi + + # Start sysbench DML load + t_load_start=$(date +%s) + sysbench_cli oltp_write_only --tables=1 --threads="$SYSBENCH_THREADS" \ + --time="$SYSBENCH_CATCHUP_SEC" --rate=0 --rand-seed=42 run & + local sb_pid=$! + + # Monitor applied progress during load + sleep "$SYSBENCH_CATCHUP_SEC" + kill "$sb_pid" 2>/dev/null || true + wait "$sb_pid" 2>/dev/null || true + t_load_end=$(date +%s) + + # Checksum while still in postpone (original vs ghost) + local cs + cs=$(checksum_before_cutover) + local applied_at_load_end + applied_at_load_end=$(grep -oE 'Applied: [0-9]+' "$log" | tail -1 | awk '{print $2}') + + # Release postpone → cutover + rm -f "$POSTPONE_FILE" + echo "Released postpone-cut-over (nw=$nw); waiting for cut-over..." 
+ + for _ in $(seq 1 120); do + grep -q 'Done migrating' "$log" 2>/dev/null && break + ps -p "$pid" >/dev/null || break + sleep 1 + done + wait "$pid" 2>/dev/null || true + local t_done + t_done=$(date +%s) + + # Parse metrics + read -r final_applied max_backlog max_hblag stall_count <<<"$(parse_log_metrics "$log")" + local load_sec=$((t_load_end - t_load_start)) + local total_sec=$((t_done - t0)) + local throughput=0 + if [ "$load_sec" -gt 0 ] && [ "${applied_at_load_end:-0}" -gt 0 ]; then + throughput=$((applied_at_load_end / load_sec)) + fi + + # Post-cutover verification + verify_post_cutover "$nw" "$log" + analyze_features "$log" "$nw" + + # Cleanup snapshot for next run + mysql_cli -e "DROP TABLE IF EXISTS \`${MYSQL_DB}\`.\`${SNAP}\`" 2>/dev/null || true + + # Report + echo "" + echo "=== nw=$nw RESULT ===" + echo " total_sec=$total_sec load_sec=$load_sec" + echo " applied_at_load_end=$applied_at_load_end final_applied=$final_applied" + echo " throughput=${throughput}/s max_backlog=$max_backlog max_heartbeat_lag_s=$max_hblag stall_count=$stall_count" + echo " checksum(orig:ghost:only_orig:only_ghost)=$cs" + echo " log=$log" + echo "" + + echo "nw=$nw total_sec=$total_sec load_sec=$load_sec applied_at_load_end=$applied_at_load_end final_applied=$final_applied throughput=${throughput}/s max_backlog=$max_backlog max_hblag=${max_hblag}s stalls=$stall_count checksum=$cs" >>"$REPORT" +} + +# ── entry point ────────────────────────────────────────────────────────── + +: >"$REPORT" +echo "=== gh-ost MTS benchmark $(date) ===" | tee -a "$REPORT" +echo "table_size=$SYSBENCH_TABLE_SIZE threads=$SYSBENCH_THREADS catchup_sec=$SYSBENCH_CATCHUP_SEC" | tee -a "$REPORT" +echo "" + +NW1_RESULT="" +NW4_RESULT="" + +echo ">>> Running nw=1 (single-threaded baseline)..." +if run_one 1; then + NW1_RESULT="ok" +else + NW1_RESULT="FAIL" + echo "nw=1 FAILED, skipping nw=4 comparison" +fi + +echo "" +echo ">>> Running nw=4 (MTS parallel apply)..." 
+if run_one 4; then
+  NW4_RESULT="ok"
+else
+  NW4_RESULT="FAIL"
+fi
+
+# ── summary ──────────────────────────────────────────────────────────────
+
+echo ""
+echo "============================================"
+echo " BENCHMARK SUMMARY"
+echo "============================================"
+cat "$REPORT"
+
+# The nw=4 vs nw=1 comparison is only meaningful when both runs succeeded.
+if [ "$NW1_RESULT" = "ok" ] && [ "$NW4_RESULT" = "ok" ]; then
+  # Extract applied counts for ratio (last report line of each configuration)
+  nw1_applied=$(grep '^nw=1' "$REPORT" | tail -1 | grep -oE 'applied_at_load_end=[0-9]+' | head -1 | cut -d= -f2)
+  nw4_applied=$(grep '^nw=4' "$REPORT" | tail -1 | grep -oE 'applied_at_load_end=[0-9]+' | head -1 | cut -d= -f2)
+  nw1_bl=$(grep '^nw=1' "$REPORT" | tail -1 | grep -oE 'max_backlog=[0-9]+' | head -1 | cut -d= -f2)
+  nw4_bl=$(grep '^nw=4' "$REPORT" | tail -1 | grep -oE 'max_backlog=[0-9]+' | head -1 | cut -d= -f2)
+
+  echo ""
+  echo "nw=1 applied during load: ${nw1_applied:-?}, max_backlog: ${nw1_bl:-?}"
+  echo "nw=4 applied during load: ${nw4_applied:-?}, max_backlog: ${nw4_bl:-?}"
+
+  if [ -n "$nw1_applied" ] && [ -n "$nw4_applied" ] && [ "$nw1_applied" -gt 0 ]; then
+    # Shell arithmetic is integer-only and would print e.g. 1.8x as "1x";
+    # compute the ratio in awk to keep two decimals.
+    echo "nw=4/nw=1 throughput ratio: $(awk -v a="$nw4_applied" -v b="$nw1_applied" 'BEGIN { printf "%.2f", a / b }')x"
+  fi
+fi
+
+echo ""
+echo "Full report: $REPORT"
diff --git a/script/mts-consistency-test.sh b/script/mts-consistency-test.sh
new file mode 100755
index 000000000..8f64a75d2
--- /dev/null
+++ b/script/mts-consistency-test.sh
@@ -0,0 +1,509 @@
+#!/bin/bash
+# MTS data consistency proof test for gh-ost PR #1692.
+#
+# Proves that multi-threaded DML apply preserves data consistency by:
+# 1. Running sysbench write load at high TPS to trigger binlog rotation
+# 2. Verifying binlog rotation actually occurred (the root cause of PR #1454's bug)
+# 3. Running N iterations per worker count (WORKER_COUNTS; default: 2 and 4)
+# 4.
Triple verification: row count + CHECKSUM TABLE + md5sum sorted content +# +# Based on findings from PR #1454 (meiji163, dnovitski): +# https://github.com/github/gh-ost/pull/1454 +# +# Prerequisites (macOS): +# brew install mysql sysbench +# MySQL must have: binlog_format=ROW, log_bin=ON, gtid_mode=ON +# +# Usage: +# export MYSQL_PWD='your_root_password' +# ./script/mts-consistency-test.sh +# +# Optional env: +# MYSQL_HOST=127.0.0.1 MYSQL_PORT=3306 MYSQL_USER=root MYSQL_DB=test +# ITERATIONS=5 WORKER_COUNTS="2 4 8" SYSBENCH_THREADS=8 +# SYSBENCH_TIME=120 GHOST_BIN=/tmp/gh-ost-test +# SKIP_BUILD=1 SKIP_UNIT_TESTS=1 + +set -euo pipefail + +ROOT="$(cd "$(dirname "$0")/.." && pwd)" +cd "$ROOT" + +# --- Configurable parameters --- +MYSQL_HOST="${MYSQL_HOST:-127.0.0.1}" +MYSQL_PORT="${MYSQL_PORT:-3306}" +MYSQL_USER="${MYSQL_USER:-root}" +MYSQL_DB="${MYSQL_DB:-test}" +NUM_WORKERS="${NUM_WORKERS:-4}" +ITERATIONS="${ITERATIONS:-5}" +WORKER_COUNTS="${WORKER_COUNTS:-"2 4"}" +# Default 8 threads: enough write concurrency to exercise MTS without pegging the +# coordinator backlog at capacity for the whole run (16 threads + rate=0 often +# outpaces 4 workers on a single local MySQL). 
Override for stress: SYSBENCH_THREADS=16
+SYSBENCH_THREADS="${SYSBENCH_THREADS:-8}"
+SYSBENCH_TABLE_SIZE="${SYSBENCH_TABLE_SIZE:-50000}"
+SYSBENCH_TIME="${SYSBENCH_TIME:-90}"
+MIGRATION_TIMEOUT="${MIGRATION_TIMEOUT:-600}"
+GHOST_BIN="${GHOST_BIN:-/tmp/gh-ost-test}"
+TABLE_NAME="${TABLE_NAME:-sbtest1}"
+GHOST_TABLE="_${TABLE_NAME}_gho"
+# gh-ost renames original to _del when --ok-to-drop-table is false (default)
+OLD_TABLE="_${TABLE_NAME}_del"
+TEST_LOG="/tmp/gh-ost-mts-consistency.log"
+REPORT_FILE="/tmp/gh-ost-mts-consistency-report.txt"
+POSTPONE_FILE="/tmp/gh-ost-mts-consistency.postpone"
+
+# --- Colors ---
+RED='\033[0;31m'
+GREEN='\033[0;32m'
+YELLOW='\033[1;33m'
+CYAN='\033[0;36m'
+NC='\033[0m' # No Color
+
+# --- Summary tracking ---
+TOTAL_RUNS=0
+TOTAL_PASS=0
+TOTAL_FAIL=0
+FAILED_CONFIGS=""
+
+# --- Helper functions ---
+
+# Runs the mysql client against the configured server, forwarding all
+# arguments. stderr is discarded, so callers only see the exit status.
+mysql_cli() {
+  mysql -h"$MYSQL_HOST" -P"$MYSQL_PORT" -u"$MYSQL_USER" "$@" 2>/dev/null
+}
+
+# Runs sysbench with the given arguments plus the configured connection options.
+sysbench_cli() {
+  sysbench "$@" \
+    --mysql-host="$MYSQL_HOST" \
+    --mysql-port="$MYSQL_PORT" \
+    --mysql-user="$MYSQL_USER" \
+    --mysql-db="$MYSQL_DB"
+}
+
+# Colored, tagged log lines.
+log_info() {
+  echo -e "${CYAN}[INFO]${NC} $*"
+}
+
+log_pass() {
+  echo -e "${GREEN}[PASS]${NC} $*"
+}
+
+log_fail() {
+  echo -e "${RED}[FAIL]${NC} $*"
+}
+
+log_warn() {
+  echo -e "${YELLOW}[WARN]${NC} $*"
+}
+
+# Prints a banner with the given title.
+section() {
+  echo ""
+  echo -e "${CYAN}======================================================================${NC}"
+  echo -e "${CYAN} $*${NC}"
+  echo -e "${CYAN}======================================================================${NC}"
+}
+
+# --- Pre-flight checks ---
+
+# Verifies the environment before any migration is attempted and exits 1 on
+# the first unmet requirement: sysbench installed, gh-ost binary built (unless
+# SKIP_BUILD=1) and executable, MySQL reachable, log_bin enabled,
+# binlog_format=ROW, gtid_mode=ON. Unless SKIP_UNIT_TESTS=1 it also runs the
+# MTS unit tests.
+preflight() {
+  section "Pre-flight checks"
+
+  if ! command -v sysbench &>/dev/null; then
+    log_fail "sysbench not found. Install: brew install sysbench"
+    exit 1
+  fi
+  log_pass "sysbench: $(sysbench --version 2>&1)"
+
+  if [ "${SKIP_BUILD:-0}" != "1" ]; then
+    log_info "Building gh-ost -> $GHOST_BIN"
+    go build -o "$GHOST_BIN" ./go/cmd/gh-ost
+  fi
+  # With SKIP_BUILD=1 nothing guarantees the binary exists; fail here rather
+  # than in the middle of the first iteration.
+  if [ ! -x "$GHOST_BIN" ]; then
+    log_fail "gh-ost binary not found or not executable: $GHOST_BIN"
+    exit 1
+  fi
+  log_pass "gh-ost binary: $GHOST_BIN"
+
+  if ! mysql_cli -e "SELECT 1" &>/dev/null; then
+    log_fail "Cannot connect to MySQL at ${MYSQL_HOST}:${MYSQL_PORT}"
+    exit 1
+  fi
+  log_pass "MySQL connection: ${MYSQL_HOST}:${MYSQL_PORT}"
+
+  local version gtid_mode binlog_format log_bin
+  read -r version gtid_mode binlog_format log_bin <<<"$(
+    mysql_cli -Nse "
+      SELECT @@version,
+             @@global.gtid_mode,
+             @@global.binlog_format,
+             @@global.log_bin
+    " 2>/dev/null | tr '\t' ' '
+  )"
+  echo " version=$version gtid_mode=$gtid_mode binlog_format=$binlog_format log_bin=$log_bin"
+
+  # log_bin is selected as a boolean, so an enabled binary log reads as 1.
+  if [ "$log_bin" != "1" ] && [ "$log_bin" != "ON" ]; then
+    log_fail "log_bin must be ON (current: $log_bin)"
+    exit 1
+  fi
+  if [ "$binlog_format" != "ROW" ]; then
+    log_fail "binlog_format must be ROW (current: $binlog_format)"
+    exit 1
+  fi
+  if [ "$gtid_mode" != "ON" ]; then
+    log_fail "gtid_mode must be ON for MTS (current: $gtid_mode)"
+    exit 1
+  fi
+
+  local max_binlog_size
+  max_binlog_size=$(mysql_cli -Nse "SELECT @@global.max_binlog_size")
+  # numfmt may be missing (e.g. stock macOS); fall back to the raw byte count.
+  log_info "max_binlog_size=${max_binlog_size} ($(numfmt --to=iec "$max_binlog_size" 2>/dev/null || echo "${max_binlog_size} bytes"))"
+
+  if [ "${SKIP_UNIT_TESTS:-0}" != "1" ]; then
+    log_info "Running MTS unit tests..."
+    go test ./go/logic/ -run 'TestCommitBarrier|TestMTSSchedule|TestCollectTransaction|TestNotifyLogical|TestEnqueueApply|TestMigrator_Num|TestAdoptDML|TestApplyMTS|TestIsDeadlock|TestRetryMTS|TestCoordinate' -count=1 -timeout 60s
+    log_pass "Unit tests passed"
+  fi
+}
+
+# --- Setup ---
+
+# Recreates the sysbench table from scratch: ensures the database exists,
+# drops any previous sysbench table (best effort) and prepares a fresh one
+# with SYSBENCH_TABLE_SIZE rows.
+setup_database() {
+  log_info "Preparing sysbench table ${TABLE_NAME} (${SYSBENCH_TABLE_SIZE} rows)..."
+  mysql_cli -e "CREATE DATABASE IF NOT EXISTS \`${MYSQL_DB}\`"
+  sysbench_cli oltp_write_only --tables=1 --table-size="$SYSBENCH_TABLE_SIZE" cleanup 2>/dev/null || true
+  sysbench_cli oltp_write_only --tables=1 --table-size="$SYSBENCH_TABLE_SIZE" prepare
+}
+
+# --- Checksum verification ---
+# Triple verification: row count + CHECKSUM TABLE + md5sum sorted content
+verify_consistency() {
+  local run_id=$1
+  local workers=$2
+  local iteration=$3
+  local errors=0
+
+  # 1.
Row count comparison + local old_count new_count + old_count=$(mysql_cli -Nse "SELECT COUNT(*) FROM \`${MYSQL_DB}\`.\`${OLD_TABLE}\`" 2>/dev/null || echo "MISSING") + new_count=$(mysql_cli -Nse "SELECT COUNT(*) FROM \`${MYSQL_DB}\`.\`${TABLE_NAME}\`" 2>/dev/null || echo "MISSING") + + if [ "$old_count" = "MISSING" ] || [ "$new_count" = "MISSING" ]; then + log_fail " [verify] Table missing: _old=${old_count} new=${new_count}" + return 1 + fi + + if [ "$old_count" != "$new_count" ]; then + log_fail " [verify] Row count mismatch: _old=${old_count} vs new=${new_count}" + errors=$((errors + 1)) + else + log_pass " [verify] Row count match: ${old_count}" + fi + + # 2. CHECKSUM TABLE comparison. CHECKSUM TABLE returns "