[SPARK-54891] Unified type coercion for Arrow-backed Python UDFs #53666
base: master
Conversation
This commit implements the STRICT policy as a no-op that lets Arrow handle type conversion natively, while the PERMISSIVE/WARN policies implement pickle-compatible coercion behavior.

Changes:
- worker.py: add conditional logic so STRICT skips coercion entirely
- types.py: update all coerce() methods to return the value unchanged for STRICT
- test_coercion.py: update unit tests to verify the STRICT no-op behavior
- test_arrow_udf_coercion.py: add integration tests comparing policies

The integration tests verify:
- PERMISSIVE matches pickle behavior exactly
- WARN produces the same results as PERMISSIVE
- STRICT produces different results (Arrow's aggressive conversion)

🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
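A rough sketch of the dispatch this commit describes. The names CoercionPolicy and the coerce helper follow the PR description, but the actual signatures in worker.py and types.py may differ, and the PERMISSIVE branch below is only a placeholder for the pickle-compatible rules:

```python
from enum import Enum

class CoercionPolicy(Enum):
    PERMISSIVE = "PERMISSIVE"
    WARN = "WARN"
    STRICT = "STRICT"

def coerce_integer(value, policy):
    # STRICT: no-op, return the value unchanged and let Arrow convert natively.
    if policy is CoercionPolicy.STRICT:
        return value
    # PERMISSIVE / WARN: apply pickle-compatible coercion. The real rules live
    # in the DataType.coerce() methods; int() here is only an illustration.
    if value is None:
        return None
    try:
        return int(value)
    except (TypeError, ValueError):
        return None
```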
JIRA Issue Information: Sub-task SPARK-54891 (this comment was automatically generated by GitHub Actions)
zhengruifeng
left a comment
@fangchenli thanks for working on this.
There are three known, different behaviors of Python UDFs:
1. vanilla Python UDF, based on pickle;
2. Arrow-optimized Python UDF (with legacy pandas conversion), via useArrow=True or spark.sql.execution.pythonUDF.arrow.enabled=True;
3. Arrow-optimized Python UDF (without legacy pandas conversion), i.e. 2 plus spark.sql.legacy.execution.pythonUDF.pandas.conversion.enabled=False.
It seems this PR targets 2. I personally think it is a good idea if we can eliminate the behavior differences in both 1 vs 2 and 1 vs 3.
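For reference, a minimal sketch of how the three modes above are toggled, assuming a running SparkSession; the config names are the ones quoted in the comment:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

spark = SparkSession.builder.getOrCreate()

# 1. Vanilla pickle-based Python UDF: the default, no extra config needed.
plus_one = udf(lambda x: x + 1, "long")

# 2. Arrow-optimized Python UDF (with legacy pandas conversion), either per UDF...
plus_one_arrow = udf(lambda x: x + 1, "long", useArrow=True)
# ...or globally:
spark.conf.set("spark.sql.execution.pythonUDF.arrow.enabled", "true")

# 3. Arrow-optimized Python UDF without the legacy pandas conversion:
spark.conf.set("spark.sql.legacy.execution.pythonUDF.pandas.conversion.enabled", "false")

spark.range(3).select(plus_one_arrow("id")).show()
```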
    return wrap_arrow_batch_udf_arrow(f, args_offsets, kwargs_offsets, return_type, runner_conf)

    ...

    def wrap_arrow_batch_udf_arrow(f, args_offsets, kwargs_offsets, return_type, runner_conf):
This code path is for Arrow-optimized Python UDFs without legacy pandas conversion; there is another path, wrap_arrow_batch_udf_legacy, for Python UDFs with legacy pandas conversion.
python/pyspark/worker.py (Outdated)
        (results, arrow_return_type, return_type).
        """
    -   return list(pool.map(lambda row: func(*row), get_args(*args)))
    +   return list(pool.map(lambda row: coerce_result(func(*row)), get_args(*args)))
I feel such coercion should happen in serializers.py
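To illustrate the suggestion in plain Python (not the actual PySpark internals; coerce_result and coerce_batch are hypothetical helpers standing in for the PR's coercion calls):

```python
def func(x):                      # stand-in for the user's UDF
    return x * 1.5

def coerce_result(v):             # hypothetical per-element coercion helper
    return int(v)

def coerce_batch(values):         # hypothetical batch-level coercion helper
    return [int(v) for v in values]

rows = [(1,), (2,), (3,)]

# Per-element coercion inside the mapper, as in the diff above:
# one extra Python-level call per row.
per_element = [coerce_result(func(*row)) for row in rows]

# Coercion deferred to serialization time (the reviewer's suggestion):
# collect raw results first, then coerce the whole batch once.
batched = coerce_batch([func(*row) for row in rows])

assert per_element == batched == [1, 3, 4]
```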
I have some detailed suggestions on the code itself, but I want to wait until we have a further discussion about this feature (just to avoid wasting our time).

The current implementation (or the concept itself) will introduce a non-trivial overhead for data conversion. For simple and common types like integers, bytes, and strings, we introduce multiple Python-level function calls per element, which could result in a perf regression relative to the default behavior. The implementation is not complete either, I think? We need to handle this in all container types, I assume?

I'd suggest that we discuss whether we want to implement this - whether the benefit trumps the overhead in both perf and maintenance - before we start reviewing the code itself.
Thanks for the feedback. I'll benchmark it to determine the overhead.
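A rough, standalone micro-benchmark sketch of the per-element overhead under discussion (pure Python, not tied to PySpark internals; coerce below is a placeholder for a per-element coercion call):

```python
import timeit

def coerce(v):
    # Placeholder for a per-element coercion function call.
    return v if isinstance(v, int) else int(v)

data = list(range(1_000_000))

plain = timeit.timeit(lambda: [v for v in data], number=10)
coerced = timeit.timeit(lambda: [coerce(v) for v in data], number=10)
print(f"plain:   {plain:.3f} s")
print(f"coerced: {coerced:.3f} s")
```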
What changes were proposed in this pull request?
This PR implements unified type coercion for Arrow-backed Python UDFs. It adds a CoercionPolicy option to the config and coerce() methods to the DataType classes to control type conversion behavior when Arrow optimization is enabled.

Why are the changes needed?
When Arrow optimization is enabled for Python UDFs, the type coercion behavior differs from that of pickle-based UDFs. We need to control the coercion behavior for backward compatibility and to ease migration.
Does this PR introduce any user-facing change?
Yes. A new configuration spark.sql.execution.pythonUDF.coercion.policy is added with three options:
- PERMISSIVE (default): pickle-compatible coercion
- WARN: produces the same results as PERMISSIVE
- STRICT: a no-op; Arrow handles type conversion natively
With PERMISSIVE (default), users can enable Arrow optimization without behavior changes.
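A minimal usage sketch, assuming the configuration key and policy names land as described in this PR:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.getOrCreate()

# Enable Arrow-backed Python UDFs and pick the coercion policy proposed here.
spark.conf.set("spark.sql.execution.pythonUDF.arrow.enabled", "true")
spark.conf.set("spark.sql.execution.pythonUDF.coercion.policy", "PERMISSIVE")

@udf(returnType=IntegerType())
def to_int(x):
    # With PERMISSIVE, results should be coerced the same way as with the
    # pickle-based UDF; with STRICT, Arrow's native conversion applies.
    return x * 2

spark.range(3).select(to_int("id")).show()
```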
How was this patch tested?
Both unit tests and integration tests were added.
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.5