-
Notifications
You must be signed in to change notification settings - Fork 2.1k
[FLINK-38691] [cdc connector mysql] Support for MySQL Transaction Boundary Events in Flink CDC Connector #4170
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
...dc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java
Show resolved
Hide resolved
ruanhang1993
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tejanshrana Thanks for the PR.
LGTM totally. @lvyanquan Could you help to review and see if it could be included in 3.6 version?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This pull request adds support for MySQL transaction boundary events (BEGIN/END) in the Flink CDC MySQL connector. Previously, when Debezium's provide.transaction.metadata was enabled, transaction metadata events were logged as "unknown" and skipped, causing loss of transaction boundary information.
Changes:
- Added detection logic for transaction metadata events using the Debezium schema name pattern
- Transaction events now update binlog offsets regardless of configuration
- Added optional emission of transaction events controlled by
includeTransactionMetadataEventsflag (default: false)
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| RecordUtils.java | Added constant and detection method for transaction metadata events using keySchema pattern |
| MySqlRecordEmitter.java | Added conditional processing and emission logic for transaction metadata events |
| MySqlSourceConfig.java | Added includeTransactionMetadataEvents configuration field |
| MySqlSourceConfigFactory.java | Added builder method and Debezium property configuration for transaction metadata |
| MySqlSourceBuilder.java | Added public API method to configure transaction metadata event inclusion |
| MySqlSource.java | Updated MySqlRecordEmitter instantiation to pass transaction metadata flag |
| MySqlPipelineRecordEmitter.java | Explicitly disabled transaction metadata events for pipeline connector |
| MySqlRecordEmitterTest.java | Added comprehensive test coverage for transaction metadata event handling |
| MySqlSourceReaderTest.java | Updated test fixtures to pass new constructor parameter |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
lvyanquan
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, left one minor suggestion.
.../main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
Outdated
Show resolved
Hide resolved
|
Thanks @tejanshrana's contribution, merged. |
Transaction BEGIN/END events (when provide.transaction.metadata is enabled) are currently treated as unknown and skipped, causing loss of transaction boundaries.
Solution
Logs from before the change:
2025-11-04 14:52:28 2025-11-04 14:52:28,827 INFO org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter [] - Meet unknown element SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=4541146d-b988-11f0-87f6-0242ac140006:33, ts_sec=1762267948, file=mysql-bin.000003, pos=13381, gtids=4541146d-b988-11f0-87f6-0242ac140006:1-32, server_id=1}} ConnectRecord{topic='mysql_binlog_source.transaction', kafkaPartition=null, key=Struct{id=4541146d-b988-11f0-87f6-0242ac140006:33}, keySchema=Schema{io.debezium.connector.common.TransactionMetadataKey:STRUCT}, value=Struct{status=BEGIN,id=4541146d-b988-11f0-87f6-0242ac140006:33}, valueSchema=Schema{io.debezium.connector.common.TransactionMetadataValue:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}, just skip.After the change, the transaction start and end events look like below:
{"status":"BEGIN","id":"4541146d-b988-11f0-87f6-0242ac140006:47","event_count":null,"data_collections":null} {"status":"END","id":"4541146d-b988-11f0-87f6-0242ac140006:47","event_count":1,"data_collections":[{"data_collection":"sample.sample","event_count":1}]}