[FLINK-35180][python] Fix embedded (thread-mode) type converters#27999
Draft
polsinello wants to merge 1 commit intoapache:release-1.20from
Draft
[FLINK-35180][python] Fix embedded (thread-mode) type converters#27999polsinello wants to merge 1 commit intoapache:release-1.20from
polsinello wants to merge 1 commit intoapache:release-1.20from
Conversation
Pemja auto-converts Python datetime/date/time to java.sql.Timestamp/Date/
Time, but Flink's ExternalSerializer and DataFormatConverters.RowConverter
expect the modern java.time.* bridge classes (LocalDateTime, LocalDate,
LocalTime, Instant). In thread mode this mismatch causes ClassCastException
at serialization boundaries for any UDF that returns or receives temporal
types. Process mode is unaffected because its Beam-based
runnerOutputTypeSerializer resolves to the correct java.time.* serializers
via LegacyTypeInfoDataTypeConverter.
This patch fixes several related type-conversion bugs in the embedded
Python execution path so all Flink-supported types work correctly in
thread mode.
- Add bridge-aware DataConverter implementations for all temporal types
(TIMESTAMP, DATE, TIME, TIMESTAMP_LTZ) on both the Table API and
DataStream paths, replacing IdentityDataConverter where it silently
passed java.sql.* objects through without conversion.
- Fix Row/Tuple buffer reuse in RowDataConverter/TupleDataConverter that
caused silent data corruption in ARRAY<ROW<...>> results.
- Widen numeric DataConverter generics from Long/Double to Number to
avoid bridge-method ClassCastException when pemja returns Integer/
Float for small values. Document the rationale once on
ByteDataConverter and reference it from the other three numeric
narrowing converters.
- Cache the original Java ExternalTypeInfo in the Python
ExternalTypeInfo wrapper to prevent lossy DECIMAL precision round-
trips through legacy TypeInfo conversion; add __getstate__/
__setstate__ so ExternalTypeInfo survives pickling.
- InstantConverter.to_external computes via integer microseconds with
divmod (floor division) to handle pre-1970 (negative-epoch)
timestamps correctly. int(delta.total_seconds()) truncates toward
zero and produces wrong (epoch_s, nano) pairs for negative values.
- Document the intentional tuple/list asymmetry in ArrayDataConverter:
to_internal returns list (matches process-mode's Beam path),
to_external returns tuple (pemja coerces tuple to Object[] but list
to ArrayList, which would break the downstream Java
ArrayDataConverter).
- Use DateLocalDateDataConverter.INSTANCE at visit(DateType) instead of
allocating a fresh instance per call.
- Remove now-unused TimeDataConverter class (replaced by
TimeLocalTimeDataConverter).
- Drop dead precision fields in TimestampLocalDateTimeDataConverter
and TimestampInstantDataConverter; the inner DataFormatConverters
LocalDateTimeConverter and InstantConverter already hold the
authoritative copies.
Collaborator
dianfu
reviewed
Apr 25, 2026
|
|
||
| @Override | ||
| Object toExternalImpl(java.time.LocalTime value) { | ||
| return value; |
Contributor
There was a problem hiding this comment.
Should we convert it to java.sql.Time?
Contributor
|
@polsinello Thanks for the PR! Just left a minor comment. Please convert this to a formal PR (removed the Draft tag) when you think it's ready for merge. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What is the purpose of the change
In embedded (thread) mode, Pemja's JNI bridge auto-converts Python
datetime/date/timeobjects tojava.sql.Timestamp/Date/Time, but Flink'sExternalSerializerandDataFormatConverters.RowConverterexpect the modernjava.time.*bridge classes (LocalDateTime,LocalDate,LocalTime,Instant). This mismatch causesClassCastExceptionat serialization boundaries for any UDF that returns or receives temporal types. Process mode is unaffected because its Beam-basedrunnerOutputTypeSerializerresolves to the correctjava.time.*serializers viaLegacyTypeInfoDataTypeConverter.This patch fixes several related type-conversion bugs in the embedded Python execution path so that all Flink-supported types work correctly in thread mode.
Brief change log
DataConverterimplementations for all temporal types (TIMESTAMP,DATE,TIME,TIMESTAMP_LTZ) on both the Table API (PythonTypeUtilsinflink-table-runtime) and DataStream (PythonTypeUtilsinflink-streaming-java) pathsIdentityDataConverterfor temporal types where it silently passedjava.sql.*objects through without conversionROW/TUPLEbuffer reuse inArrayDataConverterthat caused silent data corruption inARRAY<ROW<...>>resultsDataConvertergenerics fromLong/DoubletoNumberto avoid bridge-methodClassCastExceptionwhen Pemja returnsInteger/Floatfor small valuesExternalTypeInfoin the PythonExternalTypeInfowrapper to prevent lossyDECIMALprecision round-trips through legacyTypeInfoconversionVerifying this change
This change is already covered by existing tests — the existing PyFlink Table API and DataStream test suites exercise all affected type paths.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): nojava.time.*factory call per value).Documentation