[AURON #2325] Fix Auron Kafka source emitting no records without a watermark#2326
[AURON #2325] Fix Auron Kafka source emitting no records without a watermark#2326weiqingy wants to merge 1 commit into
Conversation
…t a watermark AuronKafkaSourceFunction.open() set isRunning=true only inside the watermarkStrategy != null branch, but run() collects records only while isRunning is true on both the watermark and no-watermark paths. A source configured without an event-time watermark therefore emitted nothing. Set isRunning=true unconditionally once open() completes so no-watermark sources emit records (and snapshot offsets and discover partitions) like watermarked ones, while a partial-init failure still leaves the source not-running. Add a no-watermark mock table and an end-to-end test asserting the source emits its records.
|
Hi @Tartarus0zm, could you please help review this PR when you get a chance? Thanks! |
|
The 2 failing checks are unrelated to this PR — it only touches Flink extension files (
This looks like a known master breakage on the Spark 4.0/4.1 lanes rather than anything in this PR. Could you please help re-trigger once master is green there? Thanks! |
|
The 2 failing checks are unrelated to this PR. This PR only touches
These cast test wrappers were added by [AURON #2174] and reference upstream Spark cast base suites that don't resolve under Spark 4.0/4.1. The same two jobs fail on Everything else (Flink 1.18, all Spark 3.x lanes, Rust, Style, License) is green. |
There was a problem hiding this comment.
Pull request overview
Fixes a regression in the Auron Flink Kafka source where tables configured without an event-time watermark strategy would never emit records because isRunning was only set in the watermark-initialization branch of open().
Changes:
- Set
AuronKafkaSourceFunction.isRunning = trueunconditionally after successfulopen()initialization so both watermark and no-watermark run paths can emit. - Add a no-watermark Kafka mock table (
T5) to the shared Kafka test base. - Add an end-to-end regression IT case that validates a no-watermark source emits all expected rows.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java |
Ensures the source transitions to “running” after initialization regardless of watermark strategy presence. |
auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/kafka/AuronKafkaSourceTestBase.java |
Adds a no-watermark mock table definition (T5) to exercise the non-watermark execution path. |
auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/kafka/AuronKafkaNoWatermarkITCase.java |
Introduces a regression IT validating record emission for no-watermark Kafka tables. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Which issue does this PR close?
Closes #2325
Rationale for this change
AuronKafkaSourceFunctionset itsisRunningflag totrueonly inside theif (watermarkStrategy != null)branch ofopen(), butrun()collects records only whileisRunningistrueon both the watermark and no-watermark paths. A source configured without an event-time watermark therefore never started emitting and returned an empty result set. The gap was never caught because the existingauron-kafkatest tables all declare a watermark.What changes are included in this PR?
isRunningis now set totrueunconditionally onceopen()completes, so a no-watermark source emits records (and snapshots offsets and discovers partitions) like a watermarked one, while a partial-initialization failure still leaves the source not-running.A no-watermark mock table and an end-to-end test asserting the source emits its records are added.
Are there any user-facing changes?
Yes. An
auron-kafkatable without aWATERMARK FORclause now returns its records instead of an empty result.How was this patch tested?
A new
AuronKafkaNoWatermarkITCaseruns a plainSELECTover a no-watermarkauron-kafkatable and asserts the emitted records; it fails (empty result) without the fix and passes with it. The fullauron-flink-runtimeandauron-flink-plannermodule test suites pass, including the existing watermarked-source integration tests.