[FLINK-39582][postgres] Allow logical messages #4387

Open
eskabetxe wants to merge 1 commit into apache:master from eskabetxe:FLINK-39582

@eskabetxe
Member

@eskabetxe eskabetxe commented Apr 29, 2026

What is the purpose of the change

Enable Postgres logical decoding messages produced by pg_logical_emit_message(transactional, prefix, content) to flow through to the user's DebeziumDeserializationSchema. Today these records are silently dropped before reaching deserialization. See [FLINK-39582] for the problem statement and root cause.

Brief change log

Base extension hooks (no domain logic; pure extensibility):

  • DataSourceDialect: new default createStreamFetcher(FetchTask.Context, int) factory returning the standard IncrementalSourceStreamFetcher. Existing dialects inherit unchanged behavior.
  • IncrementalSourceSplitReader.getStreamFetcher(): now obtains the fetcher via dataSourceDialect.createStreamFetcher(...) instead of direct instantiation.
  • IncrementalSourceStreamFetcher.shouldEmit(): visibility private → protected so dialect subclasses can override.
  • IncrementalSourceRecordEmitter.updateStreamSplitState(): visibility private → protected for the same reason.
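
The factory hook described above can be sketched as follows. This is a minimal illustration, not the code in this PR: `StreamFetcher`, `Dialect`, `MessageAwareFetcher`, `MessageAwareDialect` and `SplitReader` are hypothetical simplified stand-ins for the real flink-cdc-base types, and the base `shouldEmit` rule is a placeholder.

```java
// Hypothetical stand-ins for the flink-cdc-base types named in the change log.
// Signatures are simplified assumptions, not the real API.
class StreamFetcher {
    // Placeholder base rule: only row-level operations pass.
    // The real shouldEmit() applies more logic than this.
    protected boolean shouldEmit(String op) {
        return "c".equals(op) || "u".equals(op) || "d".equals(op) || "r".equals(op);
    }
}

interface Dialect {
    // Default factory: dialects that do not override it keep the standard fetcher.
    default StreamFetcher createStreamFetcher() {
        return new StreamFetcher();
    }
}

// An existing dialect that inherits the default and so behaves as before.
class DefaultDialect implements Dialect {}

// Dialect-specific fetcher: widens the filter only when the flag is on.
class MessageAwareFetcher extends StreamFetcher {
    private final boolean includeLogicalMessages;

    MessageAwareFetcher(boolean includeLogicalMessages) {
        this.includeLogicalMessages = includeLogicalMessages;
    }

    @Override
    protected boolean shouldEmit(String op) {
        if (includeLogicalMessages && "m".equals(op)) {
            return true;
        }
        return super.shouldEmit(op);
    }
}

class MessageAwareDialect implements Dialect {
    private final boolean includeLogicalMessages;

    MessageAwareDialect(boolean includeLogicalMessages) {
        this.includeLogicalMessages = includeLogicalMessages;
    }

    @Override
    public StreamFetcher createStreamFetcher() {
        return new MessageAwareFetcher(includeLogicalMessages);
    }
}

// The reader asks the dialect for a fetcher instead of instantiating one itself.
class SplitReader {
    static boolean emits(Dialect dialect, String op) {
        return dialect.createStreamFetcher().shouldEmit(op);
    }
}
```

In this sketch the shared reader never mentions `op="m"`; only the dialect-specific subclass does, which is the separation the hooks are meant to allow.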

Postgres connector:

  • New PostgresSourceStreamFetcher extends the base; overrides shouldEmit() to early-return true for op="m" records when the feature flag is enabled, otherwise delegates to super. Also exposes a package-private isLogicalMessage(SourceRecord) helper.
  • PostgresDialect.createStreamFetcher() returns PostgresSourceStreamFetcher with the flag read from PostgresSourceConfig.
  • PostgresSourceRecordEmitter.processElement() adds a branch for logical messages: updates the stream split state and emits to deserialization. Falls through to super otherwise.
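
A null-safe check along the lines of the `isLogicalMessage` helper might look like the sketch below. To keep it self-contained it uses a plain `Map` as a stand-in for the Kafka Connect `Struct` carried by a `SourceRecord`; the class name `LogicalMessageCheck` and the `Object`-typed parameter are assumptions for illustration, not the PR's actual signature.

```java
import java.util.Map;

// Illustrative stand-in: a Map plays the role of the Kafka Connect Struct.
class LogicalMessageCheck {
    static final String OP_FIELD = "op";
    static final String MESSAGE_OP = "m";

    // Returns true only when the value is struct-like and its "op" field equals "m".
    // A null value, a non-struct value, a missing field and a null field all yield false.
    static boolean isLogicalMessage(Object value) {
        if (!(value instanceof Map)) {
            return false;
        }
        Object op = ((Map<?, ?>) value).get(OP_FIELD);
        return MESSAGE_OP.equals(op);
    }
}
```

Putting the constant on the left of `equals` avoids a separate null check on the field value.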

Configuration:

  • New option scan.logical-message.enabled (boolean, default false) on PostgresSourceOptions. Default preserves existing behavior.
  • Plumbed through PostgresSourceConfig (field + getter), PostgresSourceConfigFactory (setter), and PostgresSourceBuilder.includeLogicalMessages(boolean) (DataStream API).

Design notes

  • Why a dialect factory hook instead of an isLogicalMessage() guard in the base classes? op="m" is only produced by the Postgres connector in this repo. Putting the check in shared code would leak Postgres-specific semantics into flink-cdc-base. The factory hook is a generic extensibility point; the actual logic stays in the Postgres module.
  • Why not override isDataChangeRecord() in the Postgres FetchTask.Context? It's semantically wrong: a logical message is not a data change. It would also affect SourceReaderMetrics and other call sites, hiding logical messages from telemetry.
  • Content is emitted as-is. pg_logical_emit_message accepts bytea — arbitrary binary content. The content field is delivered as raw byte[] (Kafka Connect Schema.BYTES). The base64-looking output you'll see in JSON-converted records is the standard JsonConverter encoding for BYTES; Avro/Protobuf converters carry raw bytes. Interpretation belongs in the user's deserializer (they know what their prefix means), not the connector.
  • Default is false. Existing pipelines see no behavior change. Opt-in feature.
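
To illustrate the note on content encoding: a user deserializer that receives the JSON-converted form could recover the original bytes with the JDK's Base64 decoder, as in this minimal sketch. The class and method names are hypothetical, and treating the payload as UTF-8 text is an assumption that holds only if the producer emitted text.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper for a user-side deserializer; not part of the connector.
class LogicalMessageContent {
    // Decodes the base64 string that JsonConverter produces for a BYTES field.
    static byte[] fromJsonEncoded(String base64) {
        return Base64.getDecoder().decode(base64);
    }

    // Interprets the raw bytes as UTF-8 text. Only valid if the producer sent text;
    // binary payloads need their own, prefix-specific decoding.
    static String asUtf8(byte[] content) {
        return new String(content, StandardCharsets.UTF_8);
    }
}
```

For example, the content `'hello'` appears as `aGVsbG8=` in JSON-converted output, and `asUtf8(fromJsonEncoded("aGVsbG8="))` yields `hello` again.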

Verifying this change

Added tests under flink-connector-postgres-cdc/src/test/java/.../source/reader/:

  • PostgresSourceStreamFetcherTest — unit tests for isLogicalMessage() covering: op="m" (true); op="c"/"u"/"d"/"r"/"t" (false); null value; Struct without op field; Struct with null op value; non-Struct value.
  • PostgresSourceStreamFetcherITCase — testcontainer-based integration test (PG 14, pgoutput, Debezium 1.9.8):
  1. logicalMessagesAreEmittedWhenEnabled: with flag on, calls pg_logical_emit_message(false, 'cdc-test', 'hello') and polls until an op="m" record appears.
  2. logicalMessagesAreDroppedWhenDisabled: with flag off, calls the same followed by a marker INSERT. Polls until the marker arrives (proving the WAL has advanced past the message), then asserts no op="m" record was seen.
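
The marker technique in the second test can be sketched in isolation. Absence of a record can only be asserted once a later event has been observed, so the test reads until the marker arrives and then inspects what came before it. The sketch below models records as plain strings and is an illustration only; `MarkerDrain` and its method are hypothetical and are not the test code in this PR.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical illustration of "poll until the marker, then assert on what was seen".
class MarkerDrain {
    // Reads records in order until the marker appears and returns everything read
    // before it. Fails if the marker is not observed within maxRecords records,
    // so a missing marker cannot be mistaken for "nothing was emitted".
    static List<String> drainUntilMarker(Iterator<String> records, String marker, int maxRecords) {
        List<String> seen = new ArrayList<>();
        for (int i = 0; i < maxRecords && records.hasNext(); i++) {
            String record = records.next();
            if (marker.equals(record)) {
                return seen;
            }
            seen.add(record);
        }
        throw new IllegalStateException("marker not observed within " + maxRecords + " records");
    }
}
```

A caller would then assert that none of the returned records represents an `op="m"` event.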

@eskabetxe
Member Author

@GOODBOY008 can you help review this?

@leonardBang leonardBang requested a review from loserwang1024 May 6, 2026 02:09
@loserwang1024
Contributor

@eskabetxe , pg_logical_emit_message is more tightly coupled with the user's business logic rather than being a CDC log generated by PostgreSQL based on DML operations. Personally, I am skeptical about whether it should be introduced at all. @leonardBang , WDYT?

If it is introduced, what happens if the messages emitted via pg_logical_emit_message do not align with the table schema? If you insist on introducing it, please add a configuration option to explicitly enable it. Do not alter the existing logic, so that currently running jobs that work as expected are not disrupted.

@eskabetxe
Copy link
Copy Markdown
Member Author

  • scan.logical-message.enabled (boolean, default false)

Hi @loserwang1024,
it's opt-in via scan.logical-message.enabled (boolean, default false).
