
sink: add debezium-avro protocol (#5475) #5551

Open
ti-chi-bot wants to merge 2 commits into pingcap:release-nextgen-202603 from ti-chi-bot:cherry-pick-5475-to-release-nextgen-202603

Conversation

@ti-chi-bot

Member

This is an automated cherry-pick of #5475

What problem does this PR solve?

Issue Number: close #5476

What is changed and how it works?

This PR introduces a new Kafka sink protocol: protocol=debezium-avro.

The new protocol is intended for users who need both:

  1. Debezium-compatible change event semantics.
  2. Confluent Avro binary encoding with Schema Registry integration.

Before this change, TiCDC had two separate protocols that each solved only
one side of the requirement:

  • protocol=debezium emits Debezium-style key/value messages, but the
    messages are JSON and the schema is embedded in each message.
  • protocol=avro emits Confluent Avro messages and registers schemas in
    Schema Registry, but the payload is TiCDC's flat row format rather than a
    Debezium Envelope.

protocol=debezium-avro combines these two capabilities. It reuses the
Debezium codec's event model and field extraction logic, and reuses the
existing Avro Schema Registry registration and Confluent wire-format flow.
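
As an illustration of the Confluent wire format mentioned above, here is a minimal sketch of the framing: one magic byte (0), a 4-byte big-endian schema ID, then the Avro-encoded body. The function names are hypothetical and are not taken from the TiCDC codebase, and the sketch covers only Confluent-style framing, not whatever framing a Glue-backed setup uses.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// confluentMagicByte is the first byte of every Confluent-framed message.
const confluentMagicByte byte = 0x0

// frameConfluent prepends the 5-byte Confluent header (magic byte plus
// big-endian schema ID) to an already Avro-encoded payload.
func frameConfluent(schemaID uint32, avroPayload []byte) []byte {
	out := make([]byte, 5, 5+len(avroPayload))
	out[0] = confluentMagicByte
	binary.BigEndian.PutUint32(out[1:5], schemaID)
	return append(out, avroPayload...)
}

// parseConfluent splits a framed message back into schema ID and payload.
func parseConfluent(msg []byte) (uint32, []byte, error) {
	if len(msg) < 5 {
		return 0, nil, errors.New("message shorter than the 5-byte Confluent header")
	}
	if msg[0] != confluentMagicByte {
		return 0, nil, fmt.Errorf("unexpected magic byte 0x%x", msg[0])
	}
	return binary.BigEndian.Uint32(msg[1:5]), msg[5:], nil
}

func main() {
	// "payload" stands in for real Avro binary data.
	framed := frameConfluent(42, []byte("payload"))
	id, body, err := parseConfluent(framed)
	fmt.Println(id, string(body), err)
}
```

A consumer would use the recovered schema ID to fetch the writer schema from the registry before decoding the body.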

Users can enable it with a sink URI like:

  cdc cli changefeed create \
    --server=http://127.0.0.1:8300 \
    --changefeed-id=debezium-avro-demo \
    --sink-uri='kafka://127.0.0.1:9092/ticdc-debezium-avro-glue-demo' \
    --config=/path/to/changefeed-glue.toml

Protocol and configuration

This PR adds ProtocolDebeziumAvro to the sink protocol enum and protocol
parser, so protocol=debezium-avro can be used in Kafka sink URIs and
changefeed configs.
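
The enum-plus-parser change can be pictured roughly as follows. This is a sketch only: the type and function names are assumptions rather than the actual TiCDC definitions, the case-insensitive match is an assumption, and the demo topic is made up.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// Protocol is a hypothetical stand-in for the sink protocol enum.
type Protocol int

const (
	ProtocolUnknown Protocol = iota
	ProtocolAvro
	ProtocolDebezium
	ProtocolDebeziumAvro
)

// parseProtocol maps the user-facing protocol string to an enum value.
func parseProtocol(s string) (Protocol, error) {
	switch strings.ToLower(s) {
	case "avro":
		return ProtocolAvro, nil
	case "debezium":
		return ProtocolDebezium, nil
	case "debezium-avro":
		return ProtocolDebeziumAvro, nil
	default:
		return ProtocolUnknown, fmt.Errorf("unknown sink protocol %q", s)
	}
}

// protocolFromSinkURI reads the protocol query parameter from a sink URI.
func protocolFromSinkURI(sinkURI string) (Protocol, error) {
	u, err := url.Parse(sinkURI)
	if err != nil {
		return ProtocolUnknown, err
	}
	return parseProtocol(u.Query().Get("protocol"))
}

func main() {
	p, err := protocolFromSinkURI("kafka://127.0.0.1:9092/demo-topic?protocol=debezium-avro")
	fmt.Println(p == ProtocolDebeziumAvro, err)
}
```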

The protocol is treated as an Avro-like protocol where needed, including:

  • requiring Schema Registry configuration;
  • preserving Schema Registry fields in changefeed config cleanup;
  • using Kafka Avro-related validation paths;
  • creating the Debezium Avro encoder/decoder from the codec builder;
  • supporting the bundled cdc_kafka_consumer decode path.
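
A rough sketch of the "Avro-like" treatment listed above, assuming a simplified config struct. The `sinkConfig` type and `validateSchemaRegistry` function are hypothetical; only the `isAvroLike` condition mirrors the check that appears in this PR's `verifyTable4MQ`.

```go
package main

import "fmt"

// Protocol is a simplified stand-in for the sink protocol type.
type Protocol string

const (
	ProtocolAvro         Protocol = "avro"
	ProtocolDebezium     Protocol = "debezium"
	ProtocolDebeziumAvro Protocol = "debezium-avro"
)

// isAvroLike reports whether a protocol takes the Schema Registry based path.
func isAvroLike(p Protocol) bool {
	return p == ProtocolAvro || p == ProtocolDebeziumAvro
}

// sinkConfig is a hypothetical, heavily reduced sink configuration.
type sinkConfig struct {
	Protocol          Protocol
	SchemaRegistryURL string
}

// validateSchemaRegistry rejects Avro-like protocols without a registry.
func validateSchemaRegistry(c sinkConfig) error {
	if isAvroLike(c.Protocol) && c.SchemaRegistryURL == "" {
		return fmt.Errorf("protocol %q requires a schema registry", c.Protocol)
	}
	return nil
}

func main() {
	fmt.Println(validateSchemaRegistry(sinkConfig{Protocol: ProtocolDebeziumAvro}))
	fmt.Println(validateSchemaRegistry(sinkConfig{Protocol: ProtocolDebezium}))
}
```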

For the MVP, Debezium Avro supports Confluent Schema Registry (configured
through schema-registry) and AWS Glue Schema Registry.

The protocol also keeps the existing Avro type-mode options where
applicable:

  • avro-decimal-handling-mode
  • avro-bigint-unsigned-handling-mode
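
To show why a handling mode for unsigned BIGINT matters, here is a small sketch. It assumes the mode values are `long` and `string`, as recalled from the existing Avro protocol documentation (worth verifying), and it assumes the `long` mode behaves like a plain two's-complement conversion. The function name is hypothetical.

```go
package main

import (
	"fmt"
	"math"
	"strconv"
)

// encodeUnsignedBigint converts an unsigned 64-bit column value according to
// an assumed avro-bigint-unsigned-handling-mode setting.
func encodeUnsignedBigint(v uint64, mode string) (interface{}, error) {
	switch mode {
	case "long":
		// Avro long is signed 64-bit, so values above math.MaxInt64 wrap.
		return int64(v), nil
	case "string":
		// The decimal string keeps the full unsigned range.
		return strconv.FormatUint(v, 10), nil
	default:
		return nil, fmt.Errorf("unsupported handling mode %q", mode)
	}
}

func main() {
	asLong, _ := encodeUnsignedBigint(math.MaxUint64, "long")
	asString, _ := encodeUnsignedBigint(math.MaxUint64, "string")
	fmt.Println(asLong, asString)
}
```

The same trade-off between a compact numeric type and a lossless string applies to the decimal mode.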

Check List

Tests

  • Unit test
  • Integration test

Questions

Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?

Release note

support debezium-avro protocol

Summary by CodeRabbit

Summary

  • New Features
    • Added debezium-avro sink protocol support with Confluent Schema Registry–based encoding/decoding.
    • Extended Avro-like routing, DDL/DML handling, and watermark-related behavior to debezium-avro.
  • Bug Fixes
    • Improved Kafka consumer partition discovery for comma-separated topic lists with retry-based metadata lookup.
    • Retained schema registry settings for Avro-related protocols, including debezium-avro.
  • Tests
    • Expanded unit and integration coverage for debezium-avro, including protocol parsing and schema/encoding behaviors.
  • Chores
    • Updated linter configuration to exclude an additional GoSec finding.

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
@ti-chi-bot ti-chi-bot added the do-not-merge/hold, lgtm, release-note, size/XXL, and type/cherry-pick-for-release-nextgen-202603 labels Jul 1, 2026
@ti-chi-bot

Member Author

@wk989898 This PR has conflicts, so I have put it on hold.
Please resolve them or ask others to resolve them, then comment /unhold to remove the hold label.

@ti-chi-bot

ti-chi-bot Bot commented Jul 1, 2026


[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign flowbehappy for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@ti-chi-bot

ti-chi-bot Bot commented Jul 1, 2026


@ti-chi-bot: If you want to know how to resolve it, please read the guide in TiDB Dev Guide.

Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository.

@coderabbitai

coderabbitai Bot commented Jul 1, 2026

Contributor

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: fe2cf3e7-6f95-4063-9fd3-6564dc76affe

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.


@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request introduces the debezium-avro protocol to support Confluent Avro encoding for Debezium, adding the corresponding encoder, decoder, configuration options, and integration tests. However, the changes contain numerous unresolved merge conflict markers across several files, including api/v2/changefeed.go, cmd/kafka-consumer/writer.go, pkg/sink/codec/common/config.go, and pkg/sink/codec/debezium/codec.go. Additionally, the reviewer noted a logic bug in writer.go that could lead to dropped fallback rows and missing variable definitions in codec.go that will cause compilation failures.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment thread api/v2/changefeed.go Outdated
Comment on lines +1711 to +1767
<<<<<<< HEAD
=======
func verifyTable4MQ(
	replicaConfig *config.ReplicaConfig,
	scheme string,
	topic string,
	protocol config.Protocol,
	tableInfos []*common.TableInfo,
) error {
	if !config.IsMQScheme(scheme) {
		return nil
	}

	isAvroLike := protocol == config.ProtocolAvro || protocol == config.ProtocolDebeziumAvro
	eventRouter, err := eventrouter.NewEventRouter(replicaConfig.Sink, topic, config.IsPulsarScheme(scheme), isAvroLike)
	if err != nil {
		return err
	}
	if err = eventRouter.VerifyTables(tableInfos); err != nil {
		return err
	}

	selectors, err := columnselector.New(replicaConfig.Sink)
	if err != nil {
		return err
	}
	return selectors.VerifyTables(tableInfos, eventRouter)
}

func verifyRouteConflict(
	changefeedID common.ChangeFeedID,
	eligibleTables []common.TableName,
	ineligibleTables []common.TableName,
	replicaCfg *config.ReplicaConfig,
) error {
	if len(eligibleTables)+len(ineligibleTables) == 0 || replicaCfg == nil ||
		replicaCfg.Sink == nil || len(replicaCfg.Sink.DispatchRules) == 0 {
		return nil
	}
	if util.GetOrZero(replicaCfg.ForceReplicate) {
		return routing.ValidateNoStaticRouteConflict(
			changefeedID,
			util.GetOrZero(replicaCfg.CaseSensitive),
			replicaCfg.Sink.DispatchRules,
			eligibleTables,
			ineligibleTables,
		)
	}
	return routing.ValidateNoStaticRouteConflict(
		changefeedID,
		util.GetOrZero(replicaCfg.CaseSensitive),
		replicaCfg.Sink.DispatchRules,
		eligibleTables,
	)
}

>>>>>>> d220ee9b8 (sink: add debezium-avro protocol (#5475))


critical

Unresolved merge conflict markers (<<<<<<< HEAD, =======, >>>>>>>) are present in this file. Please resolve the conflict by removing the markers and keeping the correct functions.

func verifyTable4MQ(
	replicaConfig *config.ReplicaConfig,
	scheme string,
	topic string,
	protocol config.Protocol,
	tableInfos []*common.TableInfo,
) error {
	if !config.IsMQScheme(scheme) {
		return nil
	}

	isAvroLike := protocol == config.ProtocolAvro || protocol == config.ProtocolDebeziumAvro
	eventRouter, err := eventrouter.NewEventRouter(replicaConfig.Sink, topic, config.IsPulsarScheme(scheme), isAvroLike)
	if err != nil {
		return err
	}
	if err = eventRouter.VerifyTables(tableInfos); err != nil {
		return err
	}

	selectors, err := columnselector.New(replicaConfig.Sink)
	if err != nil {
		return err
	}
	return selectors.VerifyTables(tableInfos, eventRouter)
}

func verifyRouteConflict(
	changefeedID common.ChangeFeedID,
	eligibleTables []common.TableName,
	ineligibleTables []common.TableName,
	replicaCfg *config.ReplicaConfig,
) error {
	if len(eligibleTables)+len(ineligibleTables) == 0 || replicaCfg == nil ||
		replicaCfg.Sink == nil || len(replicaCfg.Sink.DispatchRules) == 0 {
		return nil
	}
	if util.GetOrZero(replicaCfg.ForceReplicate) {
		return routing.ValidateNoStaticRouteConflict(
			changefeedID,
			util.GetOrZero(replicaCfg.CaseSensitive),
			replicaCfg.Sink.DispatchRules,
			eligibleTables,
			ineligibleTables,
		)
	}
	return routing.ValidateNoStaticRouteConflict(
		changefeedID,
		util.GetOrZero(replicaCfg.CaseSensitive),
		replicaCfg.Sink.DispatchRules,
		eligibleTables,
	)
}
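
Since every finding in this review involves leftover conflict markers, a small pre-merge check can catch them mechanically. The sketch below scans source text for lines that begin with the standard Git markers; the function name is hypothetical and the check is deliberately simple.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// findConflictMarkers returns the 1-based numbers of lines that start with a
// Git conflict marker.
func findConflictMarkers(src string) []int {
	var lines []int
	sc := bufio.NewScanner(strings.NewReader(src))
	for n := 1; sc.Scan(); n++ {
		line := sc.Text()
		if strings.HasPrefix(line, "<<<<<<< ") ||
			line == "=======" ||
			strings.HasPrefix(line, ">>>>>>> ") {
			lines = append(lines, n)
		}
	}
	return lines
}

func main() {
	src := "package x\n<<<<<<< HEAD\na := 1\n=======\na := 2\n>>>>>>> d220ee9b8\n"
	fmt.Println(findConflictMarkers(src))
}
```

Running `gofmt` or `go build` on the affected files would also fail, because the markers are not valid Go.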

Comment thread cmd/kafka-consumer/writer.go Outdated
Comment on lines +542 to +547
<<<<<<< HEAD
	case config.ProtocolCanalJSON, config.ProtocolOpen, config.ProtocolAvro:
=======
	case config.ProtocolCanalJSON, config.ProtocolOpen, config.ProtocolAvro, config.ProtocolSimple,
		config.ProtocolDebezium, config.ProtocolDebeziumAvro:
>>>>>>> d220ee9b8 (sink: add debezium-avro protocol (#5475))


critical

Unresolved merge conflict markers are present here. Please resolve the conflict by keeping the incoming side which includes the new protocols.

	case config.ProtocolCanalJSON, config.ProtocolOpen, config.ProtocolAvro, config.ProtocolSimple,
		config.ProtocolDebezium, config.ProtocolDebeziumAvro:

Comment thread cmd/kafka-consumer/writer.go Outdated
Comment on lines +637 to +685
<<<<<<< HEAD
	group.Append(dml, false)
	log.Info("DML event append to the group",
		zap.Int32("partition", group.Partition), zap.Any("offset", offset),
		zap.Uint64("commitTs", commitTs), zap.Uint64("highWatermark", group.HighWatermark),
		zap.Uint64("appliedWatermark", group.AppliedWatermark),
		zap.String("schema", schema), zap.String("table", table), zap.Int64("tableID", tableID),
		zap.Stringer("eventType", dml.RowTypes[0]))
=======
	switch w.protocol {
	case config.ProtocolSimple:
		// simple protocol set the table id for all row message, it can be known which table the row message belongs to,
		// also consider the table partition.
		// open protocol set the partition table id if the table is partitioned.
		// for normal table, the table id is generated by the fake table id generator by using schema and table name.
		// so one event group for one normal table or one table partition, replayed messages can be ignored.
		log.Warn("DML event fallback row, since less than the group high watermark, ignore it",
			zap.Int32("partition", progress.partition), zap.Any("offset", offset),
			zap.Uint64("commitTs", commitTs), zap.Uint64("highWatermark", group.HighWatermark),
			zap.Any("partitionWatermark", progress.watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
			zap.String("schema", schema), zap.String("table", table), zap.Int64("tableID", tableID),
			zap.Stringer("eventType", dml.RowTypes[0]),
			// zap.Any("columns", row.Columns), zap.Any("preColumns", row.PreColumns),
			zap.Any("protocol", w.protocol), zap.Bool("IsPartition", dml.TableInfo.TableName.IsPartition))
	case config.ProtocolCanalJSON, config.ProtocolOpen, config.ProtocolAvro,
		config.ProtocolDebezium, config.ProtocolDebeziumAvro:
		// for partition table, these protocols cannot assign physical table id to each dml message,
		// we cannot distinguish whether it's a real fallback event or not, still append it.
		if w.partitionTableAccessor.IsPartitionTable(schema, table) {
			log.Warn("DML events fallback, but the table is a partition table, still append it",
				zap.Int32("partition", group.Partition), zap.Any("offset", offset),
				zap.Uint64("commitTs", commitTs), zap.Uint64("highWatermark", group.HighWatermark),
				zap.String("schema", schema), zap.String("table", table), zap.Int64("tableID", tableID),
				zap.Stringer("eventType", dml.RowTypes[0]), zap.Any("protocol", w.protocol))
			group.Append(dml, true)
			return
		}
		log.Warn("DML event fallback row, since less than the group high watermark, ignore it",
			zap.Int32("partition", progress.partition), zap.Any("offset", offset),
			zap.Uint64("commitTs", commitTs), zap.Uint64("HighWatermark", group.HighWatermark),
			zap.Any("partitionWatermark", progress.watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
			zap.String("schema", schema), zap.String("table", table), zap.Int64("tableID", tableID),
			zap.Stringer("eventType", dml.RowTypes[0]),
			// zap.Any("columns", row.Columns), zap.Any("preColumns", row.PreColumns),
			zap.Any("protocol", w.protocol), zap.Bool("IsPartition", dml.TableInfo.TableName.IsPartition))
	default:
		log.Panic("unknown protocol", zap.Any("protocol", w.protocol))
	}
>>>>>>> d220ee9b8 (sink: add debezium-avro protocol (#5475))


critical

Unresolved merge conflict markers are present here. Additionally, the switch statement from the cherry-picked commit re-introduces a bug where fallback rows are dropped for non-partition tables. HEAD's logic (which always appends fallback rows) should be kept to prevent data loss.

	group.Append(dml, false)
	log.Info("DML event append to the group",
		zap.Int32("partition", group.Partition), zap.Any("offset", offset),
		zap.Uint64("commitTs", commitTs), zap.Uint64("highWatermark", group.HighWatermark),
		zap.Uint64("appliedWatermark", group.AppliedWatermark),
		zap.String("schema", schema), zap.String("table", table), zap.Int64("tableID", tableID),
		zap.Stringer("eventType", dml.RowTypes[0]))
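
The difference between the two sides of this conflict can be reduced to a toy model. The types below are hypothetical and much simpler than the consumer's real event group; the sketch only shows how a row whose commit timestamp is below the group's high watermark is kept by one policy and dropped by the other.

```go
package main

import "fmt"

// row is a minimal stand-in for a decoded DML event.
type row struct {
	commitTs uint64
}

// eventGroup is a toy version of a per-table event buffer.
type eventGroup struct {
	highWatermark uint64
	rows          []row
}

// appendAlways keeps every row, including fallback rows, as on the HEAD side.
func (g *eventGroup) appendAlways(r row) {
	g.rows = append(g.rows, r)
	if r.commitTs > g.highWatermark {
		g.highWatermark = r.commitTs
	}
}

// appendUnlessFallback ignores rows below the high watermark, which is the
// behaviour the review warns about for non-partition tables.
func (g *eventGroup) appendUnlessFallback(r row) bool {
	if r.commitTs < g.highWatermark {
		return false
	}
	g.appendAlways(r)
	return true
}

func main() {
	keep, drop := &eventGroup{}, &eventGroup{}
	for _, ts := range []uint64{10, 20, 15} {
		keep.appendAlways(row{commitTs: ts})
		drop.appendUnlessFallback(row{commitTs: ts})
	}
	fmt.Println(len(keep.rows), len(drop.rows))
}
```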

Comment thread pkg/sink/codec/common/config.go Outdated
Comment on lines +359 to +364
<<<<<<< HEAD
		!(c.Protocol == config.ProtocolCanalJSON || c.Protocol == config.ProtocolAvro || c.Protocol == config.ProtocolDebezium) {
=======
		(c.Protocol != config.ProtocolCanalJSON && c.Protocol != config.ProtocolAvro &&
			c.Protocol != config.ProtocolDebezium && c.Protocol != config.ProtocolDebeziumAvro) {
>>>>>>> d220ee9b8 (sink: add debezium-avro protocol (#5475))


critical

Unresolved merge conflict markers are present here. Please resolve the conflict by keeping the incoming side which includes ProtocolDebeziumAvro.

		(c.Protocol != config.ProtocolCanalJSON && c.Protocol != config.ProtocolAvro &&
			c.Protocol != config.ProtocolDebezium && c.Protocol != config.ProtocolDebeziumAvro) {
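
The two sides of this conflict are the same predicate written in two forms (De Morgan's law), differing only in whether debezium-avro is on the allowed list. A minimal sketch with hypothetical function names and simplified protocol constants:

```go
package main

import "fmt"

// Protocol is a simplified stand-in for the sink protocol type.
type Protocol string

const (
	ProtocolCanalJSON    Protocol = "canal-json"
	ProtocolAvro         Protocol = "avro"
	ProtocolDebezium     Protocol = "debezium"
	ProtocolDebeziumAvro Protocol = "debezium-avro"
	ProtocolOpen         Protocol = "open-protocol"
)

// rejectedByHead is the HEAD-side condition: a negated disjunction.
func rejectedByHead(p Protocol) bool {
	return !(p == ProtocolCanalJSON || p == ProtocolAvro || p == ProtocolDebezium)
}

// rejectedByIncoming is the incoming condition: a conjunction of inequalities
// that additionally allows debezium-avro.
func rejectedByIncoming(p Protocol) bool {
	return p != ProtocolCanalJSON && p != ProtocolAvro &&
		p != ProtocolDebezium && p != ProtocolDebeziumAvro
}

func main() {
	for _, p := range []Protocol{ProtocolCanalJSON, ProtocolAvro, ProtocolDebezium, ProtocolDebeziumAvro, ProtocolOpen} {
		fmt.Printf("%-14s head=%v incoming=%v\n", p, rejectedByHead(p), rejectedByIncoming(p))
	}
}
```

Only the debezium-avro row differs between the two columns, which is why the incoming side is the one to keep.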

Comment thread pkg/sink/codec/debezium/codec.go Outdated
Comment on lines +1024 to +1029
<<<<<<< HEAD
				jWriter.WriteStringField("name",
					fmt.Sprintf("%s.Key", getSchemaTopicName(c.clusterID, e.TableInfo.GetSchemaName(), e.TableInfo.GetTableName())))
=======
				jWriter.WriteStringField("name", c.keySchemaName(schemaName, tableName))
>>>>>>> d220ee9b8 (sink: add debezium-avro protocol (#5475))


critical

Unresolved merge conflict markers are present here. Please resolve the conflict by keeping the incoming side. Note that schemaName and tableName must be defined in this scope to avoid compilation errors.

				schemaName := e.TableInfo.GetTargetSchemaName()
				tableName := e.TableInfo.GetTargetTableName()
				jWriter.WriteStringField("name", c.keySchemaName(schemaName, tableName))
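
For orientation, the naming helpers that the incoming side calls might be shaped like the sketch below. This is an assumption modelled on the HEAD-side `fmt.Sprintf` calls visible in the conflicts; `schemaTopicName` is a hypothetical stand-in for `getSchemaTopicName`, whose real implementation (and any name sanitising it performs) is not shown in this PR page.

```go
package main

import "fmt"

// codec is a reduced stand-in for the Debezium codec type.
type codec struct {
	clusterID string
}

// schemaTopicName is hypothetical; the real helper may sanitise its inputs.
func schemaTopicName(clusterID, schema, table string) string {
	return fmt.Sprintf("%s.%s.%s", clusterID, schema, table)
}

// keySchemaName names the key schema for one table.
func (c *codec) keySchemaName(schema, table string) string {
	return schemaTopicName(c.clusterID, schema, table) + ".Key"
}

// valueSchemaName names the row value schema for one table.
func (c *codec) valueSchemaName(schema, table string) string {
	return schemaTopicName(c.clusterID, schema, table) + ".Value"
}

// envelopeSchemaName names the change event envelope schema for one table.
func (c *codec) envelopeSchemaName(schema, table string) string {
	return schemaTopicName(c.clusterID, schema, table) + ".Envelope"
}

func main() {
	c := &codec{clusterID: "default"}
	// Resolving the names once keeps key, value and envelope consistent.
	schemaName, tableName := "test", "t1"
	fmt.Println(c.keySchemaName(schemaName, tableName))
	fmt.Println(c.valueSchemaName(schemaName, tableName))
	fmt.Println(c.envelopeSchemaName(schemaName, tableName))
}
```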

Comment thread pkg/sink/codec/debezium/codec.go Outdated
Comment on lines +1067 to +1079
<<<<<<< HEAD
				jWriter.WriteStringField("snapshot", "false")
				jWriter.WriteStringField("db", e.TableInfo.GetSchemaName())
				jWriter.WriteStringField("table", e.TableInfo.GetTableName())
=======
				if c.isDebeziumAvro() {
					jWriter.WriteNullField("snapshot")
				} else {
					jWriter.WriteStringField("snapshot", "false")
				}
				jWriter.WriteStringField("db", schemaName)
				jWriter.WriteStringField("table", tableName)
>>>>>>> d220ee9b8 (sink: add debezium-avro protocol (#5475))


critical

Unresolved merge conflict markers are present here. Please resolve the conflict by keeping the incoming side.

				if c.isDebeziumAvro() {
					jWriter.WriteNullField("snapshot")
				} else {
					jWriter.WriteStringField("snapshot", "false")
				}
				jWriter.WriteStringField("db", schemaName)
				jWriter.WriteStringField("table", tableName)
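
The effect of the `snapshot` branch above can be seen with the standard library alone. This sketch uses `encoding/json` instead of the codec's own JSON writer, and `renderSource` is a hypothetical helper; it shows only that the debezium-avro path emits a JSON null where the JSON path emits the string "false".

```go
package main

import (
	"encoding/json"
	"fmt"
)

// renderSource builds a reduced source block and returns it as JSON text.
func renderSource(isDebeziumAvro bool, schemaName, tableName string) string {
	var snapshot interface{} = "false"
	if isDebeziumAvro {
		// A nil interface value is marshalled as JSON null.
		snapshot = nil
	}
	out, err := json.Marshal(map[string]interface{}{
		"snapshot": snapshot,
		"db":       schemaName,
		"table":    tableName,
	})
	if err != nil {
		return ""
	}
	return string(out)
}

func main() {
	fmt.Println(renderSource(false, "test", "t1"))
	fmt.Println(renderSource(true, "test", "t1"))
}
```

`json.Marshal` sorts map keys, so the field order in this output differs from the order in which the codec writes them.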

Comment thread pkg/sink/codec/debezium/codec.go Outdated
Comment on lines +1142 to +1147
<<<<<<< HEAD
				jWriter.WriteStringField("name",
					fmt.Sprintf("%s.Envelope", getSchemaTopicName(c.clusterID, e.TableInfo.GetSchemaName(), e.TableInfo.GetTableName())))
=======
				jWriter.WriteStringField("name", c.envelopeSchemaName(schemaName, tableName))
>>>>>>> d220ee9b8 (sink: add debezium-avro protocol (#5475))


critical

Unresolved merge conflict markers are present here. Please resolve the conflict by keeping the incoming side.

				jWriter.WriteStringField("name", c.envelopeSchemaName(schemaName, tableName))

Comment thread pkg/sink/codec/debezium/codec.go Outdated
Comment on lines +1176 to +1181
<<<<<<< HEAD
						jWriter.WriteStringField("name",
							fmt.Sprintf("%s.Value", getSchemaTopicName(c.clusterID, e.TableInfo.GetSchemaName(), e.TableInfo.GetTableName())))
=======
						jWriter.WriteStringField("name", c.valueSchemaName(schemaName, tableName))
>>>>>>> d220ee9b8 (sink: add debezium-avro protocol (#5475))


critical

Unresolved merge conflict markers are present here. Please resolve the conflict by keeping the incoming side.

						jWriter.WriteStringField("name", c.valueSchemaName(schemaName, tableName))

Comment thread pkg/sink/codec/debezium/codec.go Outdated
Comment on lines +1190 to +1195
<<<<<<< HEAD
						jWriter.WriteStringField("name",
							fmt.Sprintf("%s.Value", getSchemaTopicName(c.clusterID, e.TableInfo.GetSchemaName(), e.TableInfo.GetTableName())))
=======
						jWriter.WriteStringField("name", c.valueSchemaName(schemaName, tableName))
>>>>>>> d220ee9b8 (sink: add debezium-avro protocol (#5475))


critical

Unresolved merge conflict markers are present here. Please resolve the conflict by keeping the incoming side.

						jWriter.WriteStringField("name", c.valueSchemaName(schemaName, tableName))

Signed-off-by: wk989898 <nhsmwk@gmail.com>