sink: add debezium-avro protocol#5475
Conversation
|
Skipping CI for Draft Pull Request. |
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughAdds ChangesDebezium Avro Protocol Support
Sequence Diagram(s)Encoding and decoding flow sequenceDiagram
participant NewEventEncoder
participant NewAvroBatchEncoder
participant SchemaRegistry
participant BatchEncoder
participant NewEventDecoder
participant NewAvroDecoder
participant avroDecoder
participant decoder
NewEventEncoder->>NewAvroBatchEncoder: build encoder for ProtocolDebeziumAvro
NewAvroBatchEncoder->>SchemaRegistry: create Confluent schema manager
BatchEncoder->>SchemaRegistry: register and encode Avro key/value messages
NewEventDecoder->>NewAvroDecoder: build decoder for ProtocolDebeziumAvro
NewAvroDecoder->>avroDecoder: initialize schema cache and HTTP client
avroDecoder->>SchemaRegistry: GET schema by id
avroDecoder->>decoder: forward Debezium JSON envelope
decoder-->>avroDecoder: streamed events
Estimated code review effort🎯 5 (Critical) | ⏱️ ~120 minutes Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request introduces support for Confluent Avro encoding within the Debezium protocol, allowing MQ downstreams to utilize the Confluent Avro Schema Registry. Key changes include the implementation of a Debezium Avro encoder, schema conversion logic, configuration validation, and comprehensive integration tests. Feedback on the changes highlights two critical bugs: a namespace sanitization and union branch name mismatch that could reject valid schemas, and an array optional items schema mismatch causing runtime encoding failures. Additionally, a performance concern was raised regarding double serialization/deserialization round-trips during row event encoding.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| func avroFullName(connectName string, fallbackName string) string { | ||
| if connectName != "" { | ||
| return connectName | ||
| } | ||
| if fallbackName != "" { | ||
| return avroFieldName(fallbackName) | ||
| } | ||
| return "ConnectDefault" | ||
| } | ||
|
|
||
| func splitAvroFullName(fullName string) (name string, namespace string) { | ||
| idx := strings.LastIndex(fullName, ".") | ||
| if idx < 0 { | ||
| return avroFieldName(fullName), "" | ||
| } | ||
| return avroFieldName(fullName[idx+1:]), fullName[:idx] | ||
| } | ||
|
|
||
| func avroFieldName(field string) string { | ||
| return common.SanitizeName(field) | ||
| } |
There was a problem hiding this comment.
Critical Bug: Namespace Sanitization and Union Branch Name Mismatch
Currently, splitAvroFullName only sanitizes the name part of the full name, leaving the namespace part unsanitized. If a database name, table name, or cluster ID contains invalid Avro characters (such as hyphens -), the generated Avro schema will be invalid and rejected by the Schema Registry.
Furthermore, avroUnionBranchName uses avroFullName directly, which returns the unsanitized name. This causes a mismatch between the union branch name (unsanitized) and the record name in the Avro schema (partially sanitized), leading to runtime encoding failures (e.g., unknown union member errors).
We can resolve both issues cleanly by introducing a sanitizeAvroFullName helper and ensuring avroFullName always returns a fully sanitized name.
func avroFullName(connectName string, fallbackName string) string {
if connectName != "" {
return sanitizeAvroFullName(connectName)
}
if fallbackName != "" {
return avroFieldName(fallbackName)
}
return "ConnectDefault"
}
func sanitizeAvroFullName(fullName string) string {
idx := strings.LastIndex(fullName, ".")
if idx < 0 {
return avroFieldName(fullName)
}
name := avroFieldName(fullName[idx+1:])
parts := strings.Split(fullName[:idx], ".")
for i, part := range parts {
parts[i] = avroFieldName(part)
}
return strings.Join(parts, ".") + "." + name
}
func splitAvroFullName(fullName string) (name string, namespace string) {
idx := strings.LastIndex(fullName, ".")
if idx < 0 {
return fullName, ""
}
return fullName[idx+1:], fullName[:idx]
}
func avroFieldName(field string) string {
return common.SanitizeName(field)
}| items, err := c.toAvroSchema(schema.Items, fallbackName+"Item") | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| arraySchema := map[string]any{ | ||
| "type": "array", | ||
| "items": items, | ||
| } |
There was a problem hiding this comment.
Critical Bug: Array Optional Items Schema Mismatch
In the toAvroSchema case for "array", if schema.Items.Optional is true, the items schema is not wrapped in a union ["null", items]. However, in toNative, the item values are wrapped in goavro.Union if they are optional. This mismatch between the Avro schema and the native value structure will cause runtime encoding failures.
We should wrap items in a union schema in toAvroSchema when schema.Items.Optional is true.
| items, err := c.toAvroSchema(schema.Items, fallbackName+"Item") | |
| if err != nil { | |
| return nil, err | |
| } | |
| arraySchema := map[string]any{ | |
| "type": "array", | |
| "items": items, | |
| } | |
| items, err := c.toAvroSchema(schema.Items, fallbackName+"Item") | |
| if err != nil { | |
| return nil, err | |
| } | |
| if schema.Items.Optional { | |
| items = []any{"null", items} | |
| } | |
| arraySchema := map[string]any{ | |
| "type": "array", | |
| "items": items, | |
| } |
| func (d *BatchEncoder) appendAvroRowChangedEvent( | ||
| ctx context.Context, | ||
| topic string, | ||
| e *commonEvent.RowEvent, | ||
| ) error { | ||
| keyBuf := bytes.Buffer{} | ||
| if err := d.codec.EncodeKey(e, &keyBuf); err != nil { | ||
| return errors.Trace(err) | ||
| } | ||
|
|
||
| valueBuf := bytes.Buffer{} | ||
| if err := d.codec.EncodeValue(e, &valueBuf); err != nil { | ||
| return errors.Trace(err) | ||
| } |
There was a problem hiding this comment.
Performance Overhead: Double Serialization/Deserialization Round-Trip
Currently, appendAvroRowChangedEvent serializes the row event to JSON using d.codec.EncodeKey and d.codec.EncodeValue, then immediately deserializes it back into Go structs in d.encodeAvroMessage to convert it to Avro. This double serialization/deserialization round-trip (RowEvent -> JSON -> Go Structs -> Avro) introduces significant CPU and memory overhead.
While this approach allows maximum reuse of the existing dbzCodec, please consider refactoring this in the future to directly encode RowEvent to Avro without the intermediate JSON step to improve performance.
|
/test kafka |
There was a problem hiding this comment.
Actionable comments posted: 8
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
pkg/sink/codec/debezium/codec.go (1)
462-480: 🗄️ Data Integrity & Integration | 🟠 Major | ⚡ Quick winWrite unsigned BIGINT defaults as strings in string mode.
Line 465 declares the field schema as
string, but Lines 476-480 still parse and emit the default as a numeric value. ABIGINT UNSIGNED DEFAULT ...column underAvroBigintUnsignedHandlingModeStringcan produce an Avro schema/default type mismatch.Proposed fix
case mysql.TypeLonglong: // BIGINT if c.isDebeziumAvro() && mysql.HasUnsignedFlag(ft.GetFlag()) && c.config.AvroBigintUnsignedHandlingMode == common.BigintUnsignedHandlingModeString { writer.WriteStringField("type", "string") } else { writer.WriteStringField("type", "int64") } writer.WriteBoolField("optional", c.columnOptional(ft)) writer.WriteStringField("field", colName) if col.GetDefaultValue() != nil { v, ok := col.GetDefaultValue().(string) if !ok { return } + if c.isDebeziumAvro() && + mysql.HasUnsignedFlag(ft.GetFlag()) && + c.config.AvroBigintUnsignedHandlingMode == common.BigintUnsignedHandlingModeString { + writer.WriteStringField("default", v) + return + } floatV, err := strconv.ParseFloat(v, 64) if err != nil { return } writer.WriteFloat64Field("default", floatV) }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@pkg/sink/codec/debezium/codec.go` around lines 462 - 480, The unsigned BIGINT default handling in the Debezium schema writer is inconsistent with string mode: in the code path inside the column schema builder, `isDebeziumAvro`, `HasUnsignedFlag`, and `AvroBigintUnsignedHandlingModeString` set the field type to string, but the `col.GetDefaultValue()` branch still parses and writes a numeric default. Update this default serialization logic so that when the same string-mode condition applies, the default is emitted as a string rather than being parsed with `strconv.ParseFloat`; keep the existing numeric behavior only for non-string modes. Ensure the change is applied in the schema-writing flow around the `writer.WriteStringField("type", ...)` and `writer.Write...Field("default", ...)` calls.
🧹 Nitpick comments (1)
pkg/config/sink.go (1)
147-148: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick winName
debezium-avroexplicitly in this comment.Line 148 currently reads like plain
protocol=debeziumcan use Schema Registry, butpkg/sink/codec/common/config.gorejects that combination and tells users to switch to"debezium-avro". This comment will send readers toward a config that fails validation.Suggested wording
- // SchemaRegistry is only available when the downstream is MQ using avro protocol - // or debezium protocol with Confluent Avro encoding. + // SchemaRegistry is only available when the downstream is MQ using avro protocol + // or debezium-avro protocol with Confluent Avro encoding.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@pkg/config/sink.go` around lines 147 - 148, Update the SchemaRegistry comment in sink config to explicitly mention that it is available for MQ with avro protocol or debezium with the debezium-avro protocol, not plain debezium; keep the wording aligned with the validation in common config so readers are guided to the supported configuration. Refer to the SchemaRegistry field and the protocol handling in sink config to make the supported combination unambiguous.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In @.golangci.yml:
- Around line 87-91: The repo-wide gosec exclusion is disabling G115 for
production code, which should be narrowed instead of applied globally. Remove
G115 from the top-level gosec excludes in the golangci config and keep
integer-overflow suppression limited to the already excluded test paths or
specific call sites. If a few production conversions are intentional, annotate
those exact spots with nolint rather than weakening the entire codebase. Use the
existing gosec and tests exclusions in the config to keep the scope constrained.
In `@cmd/kafka-consumer/consumer.go`:
- Around line 65-75: The topic lookup in consumer metadata handling is treating
any present entry in resp.Topics as a valid success, which can hide
broker-reported topic-level failures. Update the logic around topicDetail in
consumer.go to inspect topicDetail.Err before marking found=true or using the
partition count, and only proceed when it equals kafka.ErrNoError; otherwise
continue retrying and do not treat the topic as usable.
In `@pkg/config/changefeed.go`:
- Around line 493-496: Normalize the sink protocol using the existing parsing
path before checking whether to clear SchemaRegistry. In changefeed.go, the
logic around util.GetOrZero(info.Config.Sink.Protocol) should compare the parsed
protocol value returned by ParseSinkProtocolFromString (or an equivalent
normalized enum/string) rather than the raw config string, so aliases like
flat-avro are treated as Avro-family and preserved. Update the conditional in
the schema-registry removal block to use the normalized protocol consistently
with ProtocolAvro and ProtocolDebeziumAvro.
In `@pkg/sink/codec/debezium/avro_decoder.go`:
- Around line 69-73: The avroDecoder currently uses http.DefaultClient, which
has no timeout and can cause schema-registry requests to hang indefinitely;
update the HTTP client used in the avroDecoder initialization path to a client
with a bounded timeout. Make the change in the constructor that builds
avroDecoder and apply the same fix anywhere else in this file that creates
Schema Registry HTTP clients so AddKeyValue cannot block Kafka consumption.
- Around line 139-142: The Avro decoder currently ignores the unconsumed bytes
returned by NativeFromBinary, which can let malformed Confluent payloads pass as
valid. In the decode path that calls registeredSchema.codec.NativeFromBinary,
capture the trailing bytes result and reject the message when any bytes remain
after decoding. Keep the existing error wrapping behavior for invalid payloads,
and make sure the check is applied in the same avro decoder flow that handles
the Confluent header.
- Around line 684-698: The missing-field handling in avroMissingFieldValue and
avroFieldAllowsMissing is discarding non-null Avro defaults by returning nil
whenever a default exists. Update the logic in avroMissingFieldValue to preserve
the field’s declared default value from the field map instead of treating all
defaults as null, while keeping the existing null/array fallbacks for schemas
that allow them. Use avroFieldAllowsMissing, avroMissingFieldValue, and the
field["default"] lookup to locate and adjust the behavior.
In `@pkg/sink/codec/debezium/avro.go`:
- Around line 402-423: The numeric coercion in the Debezium Avro conversion
logic is too permissive and can silently truncate invalid payloads. Update the
integer handling in the conversion routine that covers the int32 and int64
schema branches so it validates bounds and rejects non-integral values before
casting. Use the existing error path with ErrDebeziumInvalidMessage in the
conversion helper to return an error for out-of-range or fractional json.Number,
uint64, float64, int64, and similar inputs instead of narrowing them.
- Around line 262-268: Optional array item schemas are being registered without
a null union, so nullable elements can be encoded against the wrong Avro schema.
Update the array handling in toAvroSchema and the corresponding item encoding
path used around goavro.Union to represent optional items as ["null", itemType]
when the item schema is nullable, while keeping non-optional items unchanged.
---
Outside diff comments:
In `@pkg/sink/codec/debezium/codec.go`:
- Around line 462-480: The unsigned BIGINT default handling in the Debezium
schema writer is inconsistent with string mode: in the code path inside the
column schema builder, `isDebeziumAvro`, `HasUnsignedFlag`, and
`AvroBigintUnsignedHandlingModeString` set the field type to string, but the
`col.GetDefaultValue()` branch still parses and writes a numeric default. Update
this default serialization logic so that when the same string-mode condition
applies, the default is emitted as a string rather than being parsed with
`strconv.ParseFloat`; keep the existing numeric behavior only for non-string
modes. Ensure the change is applied in the schema-writing flow around the
`writer.WriteStringField("type", ...)` and `writer.Write...Field("default",
...)` calls.
---
Nitpick comments:
In `@pkg/config/sink.go`:
- Around line 147-148: Update the SchemaRegistry comment in sink config to
explicitly mention that it is available for MQ with avro protocol or debezium
with the debezium-avro protocol, not plain debezium; keep the wording aligned
with the validation in common config so readers are guided to the supported
configuration. Refer to the SchemaRegistry field and the protocol handling in
sink config to make the supported combination unambiguous.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 3b570d05-8c53-4ca8-b121-b8d18ffda25c
📒 Files selected for processing (28)
.golangci.ymlapi/v2/changefeed.gocmd/kafka-consumer/consumer.gocmd/kafka-consumer/option.gocmd/kafka-consumer/writer.godownstreamadapter/sink/helper/helper.godownstreamadapter/sink/kafka/helper.gopkg/config/changefeed.gopkg/config/changefeed_test.gopkg/config/sink.gopkg/config/sink_protocol.gopkg/config/sink_protocol_test.gopkg/sink/codec/builder.gopkg/sink/codec/common/config.gopkg/sink/codec/common/config_test.gopkg/sink/codec/debezium/avro.gopkg/sink/codec/debezium/avro_decoder.gopkg/sink/codec/debezium/avro_test.gopkg/sink/codec/debezium/codec.gopkg/sink/codec/debezium/decoder.gopkg/sink/codec/debezium/encoder.gotests/integration_tests/debezium_avro/conf/diff_config.tomltests/integration_tests/debezium_avro/data/ddl.sqltests/integration_tests/debezium_avro/data/post_ddl_workload.sqltests/integration_tests/debezium_avro/data/prepare.sqltests/integration_tests/debezium_avro/data/workload.sqltests/integration_tests/debezium_avro/run.shtests/integration_tests/run_light_it_in_ci.sh
| # G115: integer overflow conversions are too noisy for this codebase. | ||
| gosec: | ||
| excludes: | ||
| - G104 | ||
| - G115 |
There was a problem hiding this comment.
🔒 Security & Privacy | 🟠 Major
🧩 Analysis chain
🏁 Script executed:
cat -n .golangci.yml | sed -n '80,130p'Repository: pingcap/ticdc
Length of output: 1886
Avoid disabling G115 repo-wide.
_test.go files and the tests/ directory are already fully excluded from gosec (Lines 114–129), so this change specifically removes integer overflow checks (G115) from production code. This weakens security posture without justification, as the "noise" is likely confined to test files which are already ignored.
Scope this exclusion to specific paths or use //nolint:gosec annotations at the specific call sites instead of disabling the rule globally.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In @.golangci.yml around lines 87 - 91, The repo-wide gosec exclusion is
disabling G115 for production code, which should be narrowed instead of applied
globally. Remove G115 from the top-level gosec excludes in the golangci config
and keep integer-overflow suppression limited to the already excluded test paths
or specific call sites. If a few production conversions are intentional,
annotate those exact spots with nolint rather than weakening the entire
codebase. Use the existing gosec and tests exclusions in the config to keep the
scope constrained.
| if topicDetail, ok := resp.Topics[topic]; ok { | ||
| numPartitions := int32(len(topicDetail.Partitions)) | ||
| log.Info("get partition number of topic", | ||
| zap.String("topic", topic), | ||
| zap.Int32("partitionNum", numPartitions)) | ||
| if numPartitions > maxPartitionNum { | ||
| maxPartitionNum = numPartitions | ||
| } | ||
| found = true | ||
| break | ||
| } |
There was a problem hiding this comment.
🎯 Functional Correctness | 🟠 Major
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Verify existing metadata error handling and the Kafka client package in use.
rg -n 'confluent-kafka-go|IBM/sarama' go.mod go.sum
rg -n 'GetMetadata\(|topicDetail\.Error|TopicMetadata' --type go .Repository: pingcap/ticdc
Length of output: 1002
🏁 Script executed:
#!/bin/bash
# Inspect the admin.go file to understand the GetMetadata implementation and return type.
cat -n pkg/sink/kafka/admin.go | head -80Repository: pingcap/ticdc
Length of output: 3079
🏁 Script executed:
#!/bin/bash
# Check the sarama TopicMetadata struct definition in vendor or go mod cache (simulated by checking if vendor exists or just looking at the struct usage pattern in the codebase).
# Since we can't access go mod cache directly in a simple script, we will search for the struct usage in the codebase.
rg -n 'type TopicMetadata struct' --type go .Repository: pingcap/ticdc
Length of output: 151
🏁 Script executed:
#!/bin/bash
# Check the usage of GetMetadata response in consumer.go around line 55 to see the type of 'resp'.
cat -n cmd/kafka-consumer/consumer.go | sed -n '50,80p'Repository: pingcap/ticdc
Length of output: 1371
🏁 Script executed:
#!/bin/bash
# Check the import in consumer.go and search for TopicMetadata struct details in the codebase or vendor logic.
rg -A 5 'type.*TopicMetadata' --type go .
rg -A 10 'type.*MetadataResponse' --type go .Repository: pingcap/ticdc
Length of output: 151
🏁 Script executed:
#!/bin/bash
# Check if there is a local vendor or if we can find the sarama package definition via grep in the mod cache simulation or just assume standard sarama if not found locally.
# Let's try to find the definition of the error field in the TopicMetadata struct by searching for its usage with .Err or .Error.
rg -n 'topicDetail\.Err|TopicMetadata.*Err' --type go .Repository: pingcap/ticdc
Length of output: 151
🏁 Script executed:
#!/bin/bash
# Check imports and admin variable declaration in consumer.go
cat -n cmd/kafka-consumer/consumer.go | head -40Repository: pingcap/ticdc
Length of output: 1632
🏁 Script executed:
#!/bin/bash
# Find where GetMetadata is defined or used to determine the return type.
rg -n 'func.*GetMetadata' --type go .
rg -n 'MetadataResponse' --type go .Repository: pingcap/ticdc
Length of output: 151
🏁 Script executed:
#!/bin/bash
# Since local src code for sarama/confuent structs might not be present, search for any existing usage of .Err on metadata in the codebase to confirm the pattern.
rg -n '\.Err' --type go cmd/kafka-consumer/Repository: pingcap/ticdc
Length of output: 3089
🏁 Script executed:
#!/bin/bash
# Check the go.mod to confirm the exact version and search for any existing error handling patterns in the consumer file.
grep -A 5 'confluent-kafka-go' go.mod
rg -n 'kafka.Error' cmd/kafka-consumer/consumer.goRepository: pingcap/ticdc
Length of output: 442
Check per-topic metadata errors before treating the topic as found.
resp.Topics[topic] can exist even when the broker reports a topic-level error (e.g., authorization denied, deleted topic). The current code treats the presence of the key as success, potentially masking errors where topicDetail.Err is set. This can lead to successful startup with an unusable topic.
The code uses confluent-kafka-go, where the error field is topicDetail.Err (not Error).
💡 Suggested fix
Check topicDetail.Err and retry if it is not kafka.ErrNoError.
if topicDetail, ok := resp.Topics[topic]; ok {
+ if topicDetail.Err != kafka.ErrNoError {
+ log.Info("retry get partition number due to topic error",
+ zap.String("topic", topic),
+ zap.Error(topicDetail.Err))
+ time.Sleep(1 * time.Second)
+ continue
+ }
numPartitions := int32(len(topicDetail.Partitions))📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| if topicDetail, ok := resp.Topics[topic]; ok { | |
| numPartitions := int32(len(topicDetail.Partitions)) | |
| log.Info("get partition number of topic", | |
| zap.String("topic", topic), | |
| zap.Int32("partitionNum", numPartitions)) | |
| if numPartitions > maxPartitionNum { | |
| maxPartitionNum = numPartitions | |
| } | |
| found = true | |
| break | |
| } | |
| if topicDetail, ok := resp.Topics[topic]; ok { | |
| if topicDetail.Err != kafka.ErrNoError { | |
| log.Info("retry get partition number due to topic error", | |
| zap.String("topic", topic), | |
| zap.Error(topicDetail.Err)) | |
| time.Sleep(1 * time.Second) | |
| continue | |
| } | |
| numPartitions := int32(len(topicDetail.Partitions)) | |
| log.Info("get partition number of topic", | |
| zap.String("topic", topic), | |
| zap.Int32("partitionNum", numPartitions)) | |
| if numPartitions > maxPartitionNum { | |
| maxPartitionNum = numPartitions | |
| } | |
| found = true | |
| break | |
| } |
🧰 Tools
🪛 ast-grep (0.44.0)
[warning] 65-65: Narrowing a non-constant integer to a smaller fixed-width type (int8/int16/int32, uint8/uint16/uint32) can silently overflow or wrap, yielding negative or truncated values that are dangerous in size, length, or index logic. Validate the source value is within the target type's range before converting (e.g. bounds-check, or use a checked helper), and avoid narrowing untrusted or len()/parsed values.
Context: int32(len(topicDetail.Partitions))
Note: [CWE-190] Integer Overflow or Wraparound.
(integer-overflow-narrowing-conversion-go)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@cmd/kafka-consumer/consumer.go` around lines 65 - 75, The topic lookup in
consumer metadata handling is treating any present entry in resp.Topics as a
valid success, which can hide broker-reported topic-level failures. Update the
logic around topicDetail in consumer.go to inspect topicDetail.Err before
marking found=true or using the partition count, and only proceed when it equals
kafka.ErrNoError; otherwise continue retrying and do not treat the topic as
usable.
| // remove schema registry for MQ downstream with | ||
| // protocol other than avro | ||
| if util.GetOrZero(info.Config.Sink.Protocol) != ProtocolAvro.String() { | ||
| // protocol other than avro or debezium-avro | ||
| protocol := util.GetOrZero(info.Config.Sink.Protocol) | ||
| if protocol != ProtocolAvro.String() && protocol != ProtocolDebeziumAvro.String() { |
There was a problem hiding this comment.
🗄️ Data Integrity & Integration | 🟠 Major | ⚡ Quick win
Normalize the protocol before deciding whether to drop SchemaRegistry.
Line 496 compares the raw sink string, so protocol=flat-avro still gets its schema registry cleared here even though ParseSinkProtocolFromString maps "flat-avro" to ProtocolAvro. That mutates a valid Avro-family config into an invalid one on save/reload.
Suggested fix
- protocol := util.GetOrZero(info.Config.Sink.Protocol)
- if protocol != ProtocolAvro.String() && protocol != ProtocolDebeziumAvro.String() {
+ protocol, err := ParseSinkProtocolFromString(util.GetOrZero(info.Config.Sink.Protocol))
+ if err != nil || (protocol != ProtocolAvro && protocol != ProtocolDebeziumAvro) {
info.Config.Sink.SchemaRegistry = nil
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| // remove schema registry for MQ downstream with | |
| // protocol other than avro | |
| if util.GetOrZero(info.Config.Sink.Protocol) != ProtocolAvro.String() { | |
| // protocol other than avro or debezium-avro | |
| protocol := util.GetOrZero(info.Config.Sink.Protocol) | |
| if protocol != ProtocolAvro.String() && protocol != ProtocolDebeziumAvro.String() { | |
| // remove schema registry for MQ downstream with | |
| // protocol other than avro or debezium-avro | |
| protocol, err := ParseSinkProtocolFromString(util.GetOrZero(info.Config.Sink.Protocol)) | |
| if err != nil || (protocol != ProtocolAvro && protocol != ProtocolDebeziumAvro) { |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@pkg/config/changefeed.go` around lines 493 - 496, Normalize the sink protocol
using the existing parsing path before checking whether to clear SchemaRegistry.
In changefeed.go, the logic around util.GetOrZero(info.Config.Sink.Protocol)
should compare the parsed protocol value returned by ParseSinkProtocolFromString
(or an equivalent normalized enum/string) rather than the raw config string, so
aliases like flat-avro are treated as Avro-family and preserved. Update the
conditional in the schema-registry removal block to use the normalized protocol
consistently with ProtocolAvro and ProtocolDebeziumAvro.
| return &avroDecoder{ | ||
| ctx: ctx, | ||
| registryURL: registryURL, | ||
| httpClient: http.DefaultClient, | ||
| inner: NewDecoder(config, idx, db).(*decoder), |
There was a problem hiding this comment.
🩺 Stability & Availability | 🟠 Major | ⚡ Quick win
Add a timeout to schema-registry HTTP calls.
http.DefaultClient has no timeout, so a stalled Schema Registry connection can block AddKeyValue indefinitely while holding up Kafka consumption.
Proposed fix
import (
"context"
"database/sql"
"encoding/binary"
"encoding/json"
"io"
"math/big"
"net/http"
"strconv"
"strings"
"sync"
+ "time"
"github.com/linkedin/goavro/v2"
"github.com/pingcap/log"
@@
const confluentAvroHeaderLen = 5
+const schemaRegistryRequestTimeout = 10 * time.Second
@@
return &avroDecoder{
ctx: ctx,
registryURL: registryURL,
- httpClient: http.DefaultClient,
+ httpClient: &http.Client{Timeout: schemaRegistryRequestTimeout},
inner: NewDecoder(config, idx, db).(*decoder),
schemas: make(map[int]*registeredDebeziumAvroSchema),
}, nilAlso applies to: 183-185
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@pkg/sink/codec/debezium/avro_decoder.go` around lines 69 - 73, The
avroDecoder currently uses http.DefaultClient, which has no timeout and can
cause schema-registry requests to hang indefinitely; update the HTTP client used
in the avroDecoder initialization path to a client with a bounded timeout. Make
the change in the constructor that builds avroDecoder and apply the same fix
anywhere else in this file that creates Schema Registry HTTP clients so
AddKeyValue cannot block Kafka consumption.
| native, _, err := registeredSchema.codec.NativeFromBinary(data[confluentAvroHeaderLen:]) | ||
| if err != nil { | ||
| return nil, nil, errors.WrapError(errors.ErrAvroInvalidMessage, err) | ||
| } |
There was a problem hiding this comment.
🗄️ Data Integrity & Integration | 🟡 Minor | ⚡ Quick win
Reject Avro payloads with trailing bytes.
NativeFromBinary returns unconsumed bytes; ignoring them lets malformed Confluent messages decode as valid events.
Proposed fix
- native, _, err := registeredSchema.codec.NativeFromBinary(data[confluentAvroHeaderLen:])
+ native, remaining, err := registeredSchema.codec.NativeFromBinary(data[confluentAvroHeaderLen:])
if err != nil {
return nil, nil, errors.WrapError(errors.ErrAvroInvalidMessage, err)
}
+ if len(remaining) != 0 {
+ return nil, nil, errors.ErrAvroInvalidMessage.GenWithStackByArgs(
+ "avro payload has trailing bytes")
+ }📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| native, _, err := registeredSchema.codec.NativeFromBinary(data[confluentAvroHeaderLen:]) | |
| if err != nil { | |
| return nil, nil, errors.WrapError(errors.ErrAvroInvalidMessage, err) | |
| } | |
| native, remaining, err := registeredSchema.codec.NativeFromBinary(data[confluentAvroHeaderLen:]) | |
| if err != nil { | |
| return nil, nil, errors.WrapError(errors.ErrAvroInvalidMessage, err) | |
| } | |
| if len(remaining) != 0 { | |
| return nil, nil, errors.ErrAvroInvalidMessage.GenWithStackByArgs( | |
| "avro payload has trailing bytes") | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@pkg/sink/codec/debezium/avro_decoder.go` around lines 139 - 142, The Avro
decoder currently ignores the unconsumed bytes returned by NativeFromBinary,
which can let malformed Confluent payloads pass as valid. In the decode path
that calls registeredSchema.codec.NativeFromBinary, capture the trailing bytes
result and reject the message when any bytes remain after decoding. Keep the
existing error wrapping behavior for invalid payloads, and make sure the check
is applied in the same avro decoder flow that handles the Confluent header.
| func avroFieldAllowsMissing(field map[string]any) bool { | ||
| if _, hasDefault := field["default"]; hasDefault { | ||
| return true | ||
| } | ||
| return avroSchemaAllowsNull(field["type"]) | ||
| } | ||
|
|
||
| func avroMissingFieldValue(field map[string]any) (any, bool) { | ||
| if avroFieldAllowsMissing(field) { | ||
| return nil, true | ||
| } | ||
| if avroSchemaIsArray(field["type"]) { | ||
| return []any{}, true | ||
| } | ||
| return nil, false |
There was a problem hiding this comment.
🗄️ Data Integrity & Integration | 🟠 Major | ⚡ Quick win
Preserve Avro field defaults instead of decoding them as null.
For a missing field with a non-null default, avroMissingFieldValue currently returns nil, which silently changes schema-evolved payload values.
Proposed fix
-func avroFieldAllowsMissing(field map[string]any) bool {
- if _, hasDefault := field["default"]; hasDefault {
- return true
- }
- return avroSchemaAllowsNull(field["type"])
-}
-
func avroMissingFieldValue(field map[string]any) (any, bool) {
- if avroFieldAllowsMissing(field) {
+ if defaultValue, hasDefault := field["default"]; hasDefault {
+ return defaultValue, true
+ }
+ if avroSchemaAllowsNull(field["type"]) {
return nil, true
}
if avroSchemaIsArray(field["type"]) {📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| func avroFieldAllowsMissing(field map[string]any) bool { | |
| if _, hasDefault := field["default"]; hasDefault { | |
| return true | |
| } | |
| return avroSchemaAllowsNull(field["type"]) | |
| } | |
| func avroMissingFieldValue(field map[string]any) (any, bool) { | |
| if avroFieldAllowsMissing(field) { | |
| return nil, true | |
| } | |
| if avroSchemaIsArray(field["type"]) { | |
| return []any{}, true | |
| } | |
| return nil, false | |
| func avroMissingFieldValue(field map[string]any) (any, bool) { | |
| if defaultValue, hasDefault := field["default"]; hasDefault { | |
| return defaultValue, true | |
| } | |
| if avroSchemaAllowsNull(field["type"]) { | |
| return nil, true | |
| } | |
| if avroSchemaIsArray(field["type"]) { | |
| return []any{}, true | |
| } | |
| return nil, false | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@pkg/sink/codec/debezium/avro_decoder.go` around lines 684 - 698, The
missing-field handling in avroMissingFieldValue and avroFieldAllowsMissing is
discarding non-null Avro defaults by returning nil whenever a default exists.
Update the logic in avroMissingFieldValue to preserve the field’s declared
default value from the field map instead of treating all defaults as null, while
keeping the existing null/array fallbacks for schemas that allow them. Use
avroFieldAllowsMissing, avroMissingFieldValue, and the field["default"] lookup
to locate and adjust the behavior.
| items, err := c.toAvroSchema(schema.Items, fallbackName+"Item") | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| arraySchema := map[string]any{ | ||
| "type": "array", | ||
| "items": items, |
There was a problem hiding this comment.
🎯 Functional Correctness | 🟠 Major | ⚡ Quick win
Wrap optional array item schemas in a null union.
Line 359 already encodes optional array items with goavro.Union(...), but Line 268 registers the item schema without ["null", itemType]. Arrays containing nullable items will fail Avro encoding or be encoded against the wrong schema.
🐛 Proposed fix
items, err := c.toAvroSchema(schema.Items, fallbackName+"Item")
if err != nil {
return nil, err
}
+ if schema.Items.Optional {
+ items = []any{"null", items}
+ }
arraySchema := map[string]any{
"type": "array",
"items": items,
}Also applies to: 359-364
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@pkg/sink/codec/debezium/avro.go` around lines 262 - 268, Optional array item
schemas are being registered without a null union, so nullable elements can be
encoded against the wrong Avro schema. Update the array handling in toAvroSchema
and the corresponding item encoding path used around goavro.Union to represent
optional items as ["null", itemType] when the item schema is nullable, while
keeping non-optional items unchanged.
| case "int8", "int16", "int32": | ||
| switch v := value.(type) { | ||
| case json.Number: | ||
| i, err := v.Int64() | ||
| if err == nil { | ||
| return int32(i), nil | ||
| } | ||
| f, err := v.Float64() | ||
| if err != nil { | ||
| return nil, errors.WrapError(errors.ErrDebeziumInvalidMessage, err) | ||
| } | ||
| return int32(f), nil | ||
| case int: | ||
| return int32(v), nil | ||
| case int32: | ||
| return v, nil | ||
| case int64: | ||
| return int32(v), nil | ||
| case uint64: | ||
| return int32(v), nil | ||
| case float64: | ||
| return int32(v), nil |
There was a problem hiding this comment.
🗄️ Data Integrity & Integration | 🟠 Major | ⚡ Quick win
Reject out-of-range integer payloads instead of truncating them.
The integer branches silently narrow json.Number, uint64, float64, and int64 values into int32/int64. Malformed payloads like 2147483648 for an int32 schema or non-integral values like 1.5 can be encoded as corrupted data instead of returning ErrDebeziumInvalidMessage.
🐛 Proposed direction
+// Add checked helpers and use them before int32/int64 conversions.
+// For integer schemas, reject non-integral float/json.Number values and values
+// outside math.MinInt32..math.MaxInt32 or math.MinInt64..math.MaxInt64.Also applies to: 427-449
🧰 Tools
🪛 ast-grep (0.44.0)
[warning] 406-406: Narrowing a non-constant integer to a smaller fixed-width type (int8/int16/int32, uint8/uint16/uint32) can silently overflow or wrap, yielding negative or truncated values that are dangerous in size, length, or index logic. Validate the source value is within the target type's range before converting (e.g. bounds-check, or use a checked helper), and avoid narrowing untrusted or len()/parsed values.
Context: int32(i)
Note: [CWE-190] Integer Overflow or Wraparound.
(integer-overflow-narrowing-conversion-go)
[warning] 412-412: Narrowing a non-constant integer to a smaller fixed-width type (int8/int16/int32, uint8/uint16/uint32) can silently overflow or wrap, yielding negative or truncated values that are dangerous in size, length, or index logic. Validate the source value is within the target type's range before converting (e.g. bounds-check, or use a checked helper), and avoid narrowing untrusted or len()/parsed values.
Context: int32(f)
Note: [CWE-190] Integer Overflow or Wraparound.
(integer-overflow-narrowing-conversion-go)
[warning] 414-414: Narrowing a non-constant integer to a smaller fixed-width type (int8/int16/int32, uint8/uint16/uint32) can silently overflow or wrap, yielding negative or truncated values that are dangerous in size, length, or index logic. Validate the source value is within the target type's range before converting (e.g. bounds-check, or use a checked helper), and avoid narrowing untrusted or len()/parsed values.
Context: int32(v)
Note: [CWE-190] Integer Overflow or Wraparound.
(integer-overflow-narrowing-conversion-go)
[warning] 418-418: Narrowing a non-constant integer to a smaller fixed-width type (int8/int16/int32, uint8/uint16/uint32) can silently overflow or wrap, yielding negative or truncated values that are dangerous in size, length, or index logic. Validate the source value is within the target type's range before converting (e.g. bounds-check, or use a checked helper), and avoid narrowing untrusted or len()/parsed values.
Context: int32(v)
Note: [CWE-190] Integer Overflow or Wraparound.
(integer-overflow-narrowing-conversion-go)
[warning] 420-420: Narrowing a non-constant integer to a smaller fixed-width type (int8/int16/int32, uint8/uint16/uint32) can silently overflow or wrap, yielding negative or truncated values that are dangerous in size, length, or index logic. Validate the source value is within the target type's range before converting (e.g. bounds-check, or use a checked helper), and avoid narrowing untrusted or len()/parsed values.
Context: int32(v)
Note: [CWE-190] Integer Overflow or Wraparound.
(integer-overflow-narrowing-conversion-go)
[warning] 422-422: Narrowing a non-constant integer to a smaller fixed-width type (int8/int16/int32, uint8/uint16/uint32) can silently overflow or wrap, yielding negative or truncated values that are dangerous in size, length, or index logic. Validate the source value is within the target type's range before converting (e.g. bounds-check, or use a checked helper), and avoid narrowing untrusted or len()/parsed values.
Context: int32(v)
Note: [CWE-190] Integer Overflow or Wraparound.
(integer-overflow-narrowing-conversion-go)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@pkg/sink/codec/debezium/avro.go` around lines 402 - 423, The numeric coercion
in the Debezium Avro conversion logic is too permissive and can silently
truncate invalid payloads. Update the integer handling in the conversion routine
that covers the int32 and int64 schema branches so it validates bounds and
rejects non-integral values before casting. Use the existing error path with
ErrDebeziumInvalidMessage in the conversion helper to return an error for
out-of-range or fractional json.Number, uint64, float64, int64, and similar
inputs instead of narrowing them.
Source: Linters/SAST tools
|
/test kafka |
|
/test kafka |
|
/test all |
|
Caution Failed to replace (edit) comment. This is likely due to insufficient permissions or the comment being deleted. Error details |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@cmd/kafka-consumer/writer_test.go`:
- Around line 328-330: The Debezium-Avro subtest is constructing the router with
the wrong Avro-like flag, so it does not match the `newWriter` path used in
production. Update the `eventrouter.NewEventRouter` call in the `writer_test.go`
subtest to pass `isAvroLike=true` for the Debezium-Avro case, matching the
`newWriter` logic for `config.ProtocolDebeziumAvro` and ensuring the test
exercises the same router configuration.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: d16129a0-501b-4c0a-9e5a-acb6348e5e05
📒 Files selected for processing (5)
cmd/kafka-consumer/writer.gocmd/kafka-consumer/writer_test.gopkg/sink/codec/debezium/avro_test.gopkg/sink/codec/debezium/encoder.gotests/integration_tests/debezium_avro/run.sh
💤 Files with no reviewable changes (1)
- tests/integration_tests/debezium_avro/run.sh
🚧 Files skipped from review as they are similar to previous changes (1)
- pkg/sink/codec/debezium/avro_test.go
| replicaCfg := config.GetDefaultReplicaConfig() | ||
| eventRouter, err := eventrouter.NewEventRouter(replicaCfg.Sink, "test-topic", false, false) | ||
| require.NoError(t, err) |
There was a problem hiding this comment.
📐 Maintainability & Code Quality | 🟡 Minor | ⚡ Quick win
Use the Avro-like router flag in the Debezium-Avro subtest.
newWriter now builds the router with isAvroLike=true for config.ProtocolDebeziumAvro, but this test still hardcodes isAvro=false. That means the new subtest is not exercising the same router configuration as production.
Suggested fix
- eventRouter, err := eventrouter.NewEventRouter(replicaCfg.Sink, "test-topic", false, false)
+ isAvroLike := protocol == config.ProtocolDebeziumAvro
+ eventRouter, err := eventrouter.NewEventRouter(replicaCfg.Sink, "test-topic", false, isAvroLike)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| replicaCfg := config.GetDefaultReplicaConfig() | |
| eventRouter, err := eventrouter.NewEventRouter(replicaCfg.Sink, "test-topic", false, false) | |
| require.NoError(t, err) | |
| replicaCfg := config.GetDefaultReplicaConfig() | |
| isAvroLike := protocol == config.ProtocolDebeziumAvro | |
| eventRouter, err := eventrouter.NewEventRouter(replicaCfg.Sink, "test-topic", false, isAvroLike) | |
| require.NoError(t, err) |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@cmd/kafka-consumer/writer_test.go` around lines 328 - 330, The Debezium-Avro
subtest is constructing the router with the wrong Avro-like flag, so it does not
match the `newWriter` path used in production. Update the
`eventrouter.NewEventRouter` call in the `writer_test.go` subtest to pass
`isAvroLike=true` for the Debezium-Avro case, matching the `newWriter` logic for
`config.ProtocolDebeziumAvro` and ensuring the test exercises the same router
configuration.
|
/test all |
|
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: 3AceShowHand, lidezhu The full list of commands accepted by this bot can be found here. The pull request process is described here DetailsNeeds approval from an approver in each of these files:
Approvers can indicate their approval by writing |
[LGTM Timeline notifier]Timeline:
|
|
/test pull-error-log-review |
Signed-off-by: wk989898 <nhsmwk@gmail.com>
|
/test next-gen |
|
/cherry-pick release-nextgen-202603 |
|
@wk989898: new pull request created to branch DetailsIn response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository. |
What problem does this PR solve?
Issue Number: close #5476
What is changed and how it works?
This PR introduces a new Kafka sink protocol:
protocol=debezium-avro.The new protocol is intended for users who need both:
Before this change, TiCDC had two separate protocols that each solved only
one side of the requirement:
protocol=debeziumemits Debezium-style key/value messages, but themessages are JSON and the schema is embedded in each message.
protocol=avroemits Confluent Avro messages and registers schemas inSchema Registry, but the payload is TiCDC's flat row format rather than a
Debezium Envelope.
protocol=debezium-avrocombines these two capabilities. It reuses theDebezium codec's event model and field extraction logic, and reuses the
existing Avro Schema Registry registration and Confluent wire-format flow.
Users can enable it with a sink URI like:
Protocol and configuration
This PR adds ProtocolDebeziumAvro to the sink protocol enum and protocol
parser, so protocol=debezium-avro can be used in Kafka sink URIs and
changefeed configs.
The protocol is treated as an Avro-like protocol where needed, including:
For the MVP, Debezium Avro supports Confluent Schema Registry through
schema-registry and AWS Glue Schema Registry.
The protocol also keeps the existing Avro type-mode options where
applicable:
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit
Summary
debezium-avrosink protocol support with Confluent Schema Registry–based encoding/decoding.debezium-avro.debezium-avro.debezium-avro, including protocol parsing and schema/encoding behaviors.