Skip to content
19 changes: 19 additions & 0 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,9 +274,28 @@ type MigrationContext struct {
SkipMetadataLockCheck bool
IsOpenMetadataLockInstruments bool

// MTS parallel apply configuration
NumWorkers int
BinlogHasLogicalTimestamps bool
LogicalTimestampsDetected chan struct{}
logicalTimestampsDetectOnce sync.Once

Log Logger
}

// NotifyLogicalTimestampsDetection publishes the result of logical-timestamp
// detection and releases anything waiting on LogicalTimestampsDetected.
//
// Pass found=true when the binlog carries logical timestamps (MySQL 5.7+).
// Only the first call does any work; every later call is a no-op, so the
// channel is closed at most once and a later found=true cannot change the
// outcome of an earlier found=false. BinlogHasLogicalTimestamps is only ever
// raised here, never cleared, and it is written before the channel is closed.
// A nil LogicalTimestampsDetected channel is simply left alone.
func (mctx *MigrationContext) NotifyLogicalTimestampsDetection(found bool) {
	publish := func() {
		if found {
			mctx.BinlogHasLogicalTimestamps = true
		}
		detected := mctx.LogicalTimestampsDetected
		if detected == nil {
			return
		}
		close(detected)
	}
	mctx.logicalTimestampsDetectOnce.Do(publish)
}

type Logger interface {
Debug(args ...interface{})
Debugf(format string, args ...interface{})
Expand Down
6 changes: 4 additions & 2 deletions go/binlog/binlog_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ import (

// BinlogEntry describes an entry in the binary log
type BinlogEntry struct {
	// Coordinates is the binlog position this entry was read at.
	Coordinates mysql.BinlogCoordinates
	// DmlEvent is the row change carried by this entry.
	DmlEvent *BinlogDMLEvent
	// LastCommitted is the logical timestamp of the commit parent
	// (0 = SEQ_UNINIT, unavailable).
	LastCommitted int64
	// SequenceNumber is the monotonically increasing logical timestamp of the
	// transaction (0 = SEQ_UNINIT).
	SequenceNumber int64
}

// NewBinlogEntryAt creates an empty, ready to go BinlogEntry object
Expand Down
19 changes: 17 additions & 2 deletions go/binlog/gomysql_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (gmr *GoMySQLReader) GetCurrentBinlogCoordinates() mysql.BinlogCoordinates
return gmr.currentCoordinates.Clone()
}

func (gmr *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent *replication.RowsEvent, entriesChannel chan<- *BinlogEntry) error {
func (gmr *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent *replication.RowsEvent, entriesChannel chan<- *BinlogEntry, lastCommitted int64, sequenceNumber int64) error {
currentCoords := gmr.GetCurrentBinlogCoordinates()
dml := ToEventDML(ev.Header.EventType.String())
if dml == NotDML {
Expand All @@ -98,6 +98,8 @@ func (gmr *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent
continue
}
binlogEntry := NewBinlogEntryAt(currentCoords)
binlogEntry.LastCommitted = lastCommitted
binlogEntry.SequenceNumber = sequenceNumber
binlogEntry.DmlEvent = NewBinlogDMLEvent(
string(rowsEvent.Table.Schema),
string(rowsEvent.Table.Table),
Expand Down Expand Up @@ -130,6 +132,10 @@ func (gmr *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent

// StreamEvents
func (gmr *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry) error {
var currentLastCommitted int64
var currentSequenceNumber int64
logicalTimestampsDetected := false

for !canStopStreaming() {
ev, err := gmr.binlogStreamer.GetEvent(context.Background())
if err != nil {
Expand All @@ -156,6 +162,14 @@ func (gmr *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesChan
if !gmr.migrationContext.UseGTIDs {
continue
}
// Capture logical timestamps for MTS dependency tracking
currentLastCommitted = event.LastCommitted
currentSequenceNumber = event.SequenceNumber
// Detect whether binlog contains logical timestamps (MySQL 5.7+)
if !logicalTimestampsDetected && (event.LastCommitted > 0 || event.SequenceNumber > 0) {
logicalTimestampsDetected = true
gmr.migrationContext.NotifyLogicalTimestampsDetection(true)
}
sid, err := uuid.FromBytes(event.SID)
if err != nil {
return err
Expand Down Expand Up @@ -184,12 +198,13 @@ func (gmr *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesChan
gmr.LastTrxCoords = gmr.currentCoordinates.Clone()
}
case *replication.RowsEvent:
if err := gmr.handleRowsEvent(ev, event, entriesChannel); err != nil {
if err := gmr.handleRowsEvent(ev, event, entriesChannel, currentLastCommitted, currentSequenceNumber); err != nil {
return err
}
}
}
gmr.migrationContext.Log.Debugf("done streaming events")
gmr.migrationContext.NotifyLogicalTimestampsDetection(logicalTimestampsDetected)

return nil
}
Expand Down
9 changes: 9 additions & 0 deletions go/cmd/gh-ost/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ func main() {
exponentialBackoffMaxInterval := flag.Int64("exponential-backoff-max-interval", 64, "Maximum number of seconds to wait between attempts when performing various operations with exponential backoff.")
chunkSize := flag.Int64("chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 10-100,000)")
dmlBatchSize := flag.Int64("dml-batch-size", 10, "batch size for DML events to apply in a single transaction (range 1-1000)")
numWorkers := flag.Int("num-workers", 1, "number of parallel DML apply workers (MTS mode). Requires MySQL 5.7+ with binlog logical timestamps. Default: 1 (single-threaded, backward compatible)")
defaultRetries := flag.Int64("default-retries", 60, "Default number of retries for various operations before panicking")
flag.BoolVar(&migrationContext.PanicOnWarnings, "panic-on-warnings", false, "Panic when SQL warnings are encountered when copying a batch indicating data loss")
cutOverLockTimeoutSeconds := flag.Int64("cut-over-lock-timeout-seconds", 3, "Max number of seconds to hold locks on tables while attempting to cut-over (retry attempted when lock exceeds timeout) or attempting instant DDL")
Expand Down Expand Up @@ -377,6 +378,14 @@ func main() {
migrationContext.SetChunkSize(*chunkSize)
migrationContext.SetDMLBatchSize(*dmlBatchSize)
migrationContext.SetMaxLagMillisecondsThrottleThreshold(*maxLagMillis)
if *numWorkers < 1 {
migrationContext.Log.Warningf("invalid --num-workers=%d; using 1", *numWorkers)
*numWorkers = 1
}
migrationContext.NumWorkers = *numWorkers
if migrationContext.NumWorkers > 1 {
migrationContext.LogicalTimestampsDetected = make(chan struct{})
}
migrationContext.SetThrottleQuery(*throttleQuery)
migrationContext.SetThrottleHTTP(*throttleHTTP)
migrationContext.SetIgnoreHTTPErrors(*ignoreHTTPErrors)
Expand Down
56 changes: 48 additions & 8 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,18 @@ func (apl *Applier) releaseMigrationLock() {
apl.migrationLockConn = nil
}

// adoptDMLQueryBuildersFrom points this applier's DELETE, INSERT and UPDATE
// query builders at the instances already held by source, so both appliers
// share the same builders. The primary applier must have called
// prepareQueries() before MTS workers start.
//
// An error is returned, and apl is left untouched, when source is nil or any
// of its three builders is missing. All three are checked rather than only
// the INSERT builder: a source whose preparation stopped part-way would
// otherwise hand over a nil builder.
func (apl *Applier) adoptDMLQueryBuildersFrom(source *Applier) error {
	if source == nil ||
		source.dmlDeleteQueryBuilder == nil ||
		source.dmlInsertQueryBuilder == nil ||
		source.dmlUpdateQueryBuilder == nil {
		return fmt.Errorf("primary applier DML query builders are not prepared")
	}
	apl.dmlDeleteQueryBuilder = source.dmlDeleteQueryBuilder
	apl.dmlInsertQueryBuilder = source.dmlInsertQueryBuilder
	apl.dmlUpdateQueryBuilder = source.dmlUpdateQueryBuilder
	return nil
}

func (apl *Applier) prepareQueries() (err error) {
if apl.dmlDeleteQueryBuilder, err = sql.NewDMLDeleteQueryBuilder(
apl.migrationContext.DatabaseName,
Expand Down Expand Up @@ -462,6 +474,31 @@ func (apl *Applier) AttemptInstantDDL() error {
}, apl.migrationContext.MaxRetries(), apl.migrationContext.Log)
}

// isDeadlockError reports whether err, or any error it wraps, is a MySQL
// driver error carrying errno 1213 (InnoDB deadlock). Any other error,
// including nil, yields false.
func isDeadlockError(err error) bool {
	var driverErr *drivermysql.MySQLError
	if !errors.As(err, &driverErr) {
		return false
	}
	return driverErr.Number == 1213
}

// isRetryableApplyError tells the caller whether a failed DML apply hit a
// transient concurrency error and is therefore worth retrying.
//
// With several MTS workers applying concurrently, three MySQL errors are
// expected: lock-wait timeouts (1205), InnoDB deadlocks (1213) and NOWAIT
// lock failures (3572). Gap locks on the ghost table's secondary indexes
// cause contention between parallel statements. Each of these is resolved by
// retrying the whole transaction, mirroring MySQL's slave_transaction_retries
// behaviour.
//
// Errors that are not (and do not wrap) a MySQL driver error, including nil,
// are never retryable.
func isRetryableApplyError(err error) bool {
	const (
		errLockWaitTimeout = 1205
		errDeadlock        = 1213
		errLockNowait      = 3572
	)

	var driverErr *drivermysql.MySQLError
	if errors.As(err, &driverErr) {
		code := driverErr.Number
		return code == errLockWaitTimeout || code == errDeadlock || code == errLockNowait
	}
	return false
}

// retryOnLockWaitTimeout retries the given operation on MySQL lock wait timeout
// (errno 1205). Non-timeout errors return immediately. This is used for instant
// DDL attempts where the operation may be blocked by a long-running transaction.
Expand Down Expand Up @@ -1660,12 +1697,16 @@ func (apl *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) []*dmlBu
case binlog.UpdateDML:
{
if _, isModified := apl.updateModifiesUniqueKeyColumns(dmlEvent); isModified {
results := make([]*dmlBuildResult, 0, 2)
dmlEvent.DML = binlog.DeleteDML
results = append(results, apl.buildDMLEventQuery(dmlEvent)...)
dmlEvent.DML = binlog.InsertDML
results = append(results, apl.buildDMLEventQuery(dmlEvent)...)
return results
// A unique-key-modifying UPDATE is split into DELETE + INSERT.
// We must NOT mutate dmlEvent.DML here: the same event may be
// re-applied (e.g. MTS deadlock retry), and a mutated DML would
// be rebuilt as the wrong statement, corrupting the ghost table.
deleteQuery, deleteArgs, deleteErr := apl.dmlDeleteQueryBuilder.BuildQuery(dmlEvent.WhereColumnValues.AbstractValues())
insertQuery, insertArgs, insertErr := apl.dmlInsertQueryBuilder.BuildQuery(dmlEvent.NewColumnValues.AbstractValues())
return []*dmlBuildResult{
newDmlBuildResult(deleteQuery, deleteArgs, -1, deleteErr),
newDmlBuildResult(insertQuery, insertArgs, 1, insertErr),
}
}
query, updateArgs, err := apl.dmlUpdateQueryBuilder.BuildQuery(dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues())
args := sqlutils.Args()
Expand Down Expand Up @@ -1778,9 +1819,8 @@ func (apl *Applier) executeBatchWithWarningChecking(ctx context.Context, tx *gos
}

// ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table
func (apl *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) error {
func (apl *Applier) ApplyDMLEventQueries(ctx context.Context, dmlEvents [](*binlog.BinlogDMLEvent)) error {
var totalDelta int64
ctx := context.Background()

err := func() error {
conn, err := apl.db.Conn(ctx)
Expand Down
63 changes: 52 additions & 11 deletions go/logic/applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,47 @@ func TestApplierBuildDMLEventQuery(t *testing.T) {
require.Equal(t, 123456, res[0].args[2])
require.Equal(t, 42, res[0].args[3])
})

t.Run("update modifying unique key does not mutate event and is retry-safe", func(t *testing.T) {
// An UPDATE that changes the unique key splits into DELETE + INSERT.
// The event must NOT be mutated, so a re-apply (e.g. MTS deadlock retry)
// produces the identical DELETE + INSERT pair rather than a bare INSERT.
whereValues := sql.ToColumnValues([]interface{}{123456, 42})
newValues := sql.ToColumnValues([]interface{}{123456, 99})
binlogEvent := &binlog.BinlogDMLEvent{
DatabaseName: "test",
DML: binlog.UpdateDML,
NewColumnValues: newValues,
WhereColumnValues: whereValues,
}

first := applier.buildDMLEventQuery(binlogEvent)
require.Len(t, first, 2)
require.NoError(t, first[0].err)
require.NoError(t, first[1].err)
require.Equal(t, int64(-1), first[0].rowsDelta)
require.Equal(t, int64(1), first[1].rowsDelta)
require.Contains(t, first[0].query, "delete")
require.Contains(t, first[1].query, "insert")

// Event type is untouched after building the query.
require.Equal(t, binlog.UpdateDML, binlogEvent.DML)

// Re-applying the same event (retry) yields the same DELETE + INSERT.
second := applier.buildDMLEventQuery(binlogEvent)
require.Len(t, second, 2)
require.Equal(t, first[0].query, second[0].query)
require.Equal(t, first[1].query, second[1].query)
require.Equal(t, binlog.UpdateDML, binlogEvent.DML)
})
}

func TestIsRetryableApplyError(t *testing.T) {
require.True(t, isRetryableApplyError(&drivermysql.MySQLError{Number: 1213, Message: "Deadlock found"}))
require.True(t, isRetryableApplyError(&drivermysql.MySQLError{Number: 1205, Message: "Lock wait timeout exceeded"}))
require.False(t, isRetryableApplyError(&drivermysql.MySQLError{Number: 1146, Message: "Table doesn't exist"}))
require.False(t, isRetryableApplyError(errors.New("generic error")))
require.False(t, isRetryableApplyError(nil))
}

func TestApplierInstantDDL(t *testing.T) {
Expand Down Expand Up @@ -385,7 +426,7 @@ func (suite *ApplierTestSuite) TestApplyDMLEventQueries() {
NewColumnValues: sql.ToColumnValues([]interface{}{123456, 42}),
},
}
err = applier.ApplyDMLEventQueries(dmlEvents)
err = applier.ApplyDMLEventQueries(context.Background(), dmlEvents)
suite.Require().NoError(err)

// Check that the row was inserted
Expand Down Expand Up @@ -674,7 +715,7 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQuerySuc
NewColumnValues: sql.ToColumnValues([]interface{}{123456, 42}),
},
}
err = applier.ApplyDMLEventQueries(dmlEvents)
err = applier.ApplyDMLEventQueries(context.Background(), dmlEvents)
suite.Require().NoError(err)

err = applier.CreateChangelogTable()
Expand Down Expand Up @@ -918,7 +959,7 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsWithDuplicateKeyOnNonMigration
}

// This should return an error when PanicOnWarnings is enabled
err = applier.ApplyDMLEventQueries(dmlEvents)
err = applier.ApplyDMLEventQueries(context.Background(), dmlEvents)
suite.Require().Error(err)
suite.Require().Contains(err.Error(), "Duplicate entry")

Expand Down Expand Up @@ -1008,7 +1049,7 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsWithDuplicateCompositeUniqueKe
}

// This should return an error when PanicOnWarnings is enabled
err = applier.ApplyDMLEventQueries(dmlEvents)
err = applier.ApplyDMLEventQueries(context.Background(), dmlEvents)
suite.Require().Error(err)
suite.Require().Contains(err.Error(), "Duplicate entry")

Expand Down Expand Up @@ -1120,7 +1161,7 @@ func (suite *ApplierTestSuite) TestUpdateModifyingUniqueKeyWithDuplicateOnOtherI
suite.Require().Len(buildResults, 2, "UPDATE modifying unique key should be converted to DELETE+INSERT")

// Apply the event - this should FAIL because INSERT will have duplicate email warning
err = applier.ApplyDMLEventQueries(dmlEvents)
err = applier.ApplyDMLEventQueries(context.Background(), dmlEvents)
suite.Require().Error(err, "Should fail when DELETE+INSERT causes duplicate on non-migration unique key")
suite.Require().Contains(err.Error(), "Duplicate entry", "Error should mention duplicate entry")

Expand Down Expand Up @@ -1209,7 +1250,7 @@ func (suite *ApplierTestSuite) TestNormalUpdateWithPanicOnWarnings() {
suite.Require().Len(buildResults, 1, "Normal UPDATE should generate single UPDATE query")

// Apply the event - should succeed
err = applier.ApplyDMLEventQueries(dmlEvents)
err = applier.ApplyDMLEventQueries(context.Background(), dmlEvents)
suite.Require().NoError(err)

// Verify the update was applied correctly
Expand Down Expand Up @@ -1284,7 +1325,7 @@ func (suite *ApplierTestSuite) TestDuplicateOnMigrationKeyAllowedInBinlogReplay(
}

// This should succeed - duplicate on migration unique key is expected and should be filtered out
err = applier.ApplyDMLEventQueries(dmlEvents)
err = applier.ApplyDMLEventQueries(context.Background(), dmlEvents)
suite.Require().NoError(err)

// Verify that the ghost table still has only the original 2 rows with correct data
Expand Down Expand Up @@ -1375,7 +1416,7 @@ func (suite *ApplierTestSuite) TestRegexMetacharactersInIndexName() {
},
}

err = applier.ApplyDMLEventQueries(dmlEvents)
err = applier.ApplyDMLEventQueries(context.Background(), dmlEvents)
suite.Require().NoError(err, "Duplicate on idx+email (migration key) should be allowed with PanicOnWarnings enabled")

// Test: duplicate on PRIMARY (not the migration key) should fail
Expand All @@ -1388,7 +1429,7 @@ func (suite *ApplierTestSuite) TestRegexMetacharactersInIndexName() {
},
}

err = applier.ApplyDMLEventQueries(dmlEvents)
err = applier.ApplyDMLEventQueries(context.Background(), dmlEvents)
suite.Require().Error(err, "Duplicate on PRIMARY (not migration key) should fail with PanicOnWarnings enabled")
suite.Require().Contains(err.Error(), "Duplicate entry")

Expand Down Expand Up @@ -1477,7 +1518,7 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsDisabled() {
}

// Should succeed because PanicOnWarnings is disabled
err = applier.ApplyDMLEventQueries(dmlEvents)
err = applier.ApplyDMLEventQueries(context.Background(), dmlEvents)
suite.Require().NoError(err)

// Verify that only 2 original rows exist with correct data (the duplicate was silently ignored)
Expand Down Expand Up @@ -1583,7 +1624,7 @@ func (suite *ApplierTestSuite) TestMultipleDMLEventsInBatch() {
}

// Should fail due to the second event
err = applier.ApplyDMLEventQueries(dmlEvents)
err = applier.ApplyDMLEventQueries(context.Background(), dmlEvents)
suite.Require().Error(err)
suite.Require().Contains(err.Error(), "Duplicate entry")

Expand Down
Loading