-
Notifications
You must be signed in to change notification settings - Fork 165
feat(pyamber): support Python UDF UI parameters #5603
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
d7fb00c
413b28d
486da33
a00eb4e
e528fb1
65c23da
e6014f5
50aa1aa
609026b
3e72072
d54fb6c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,12 +16,170 @@ | |
| # under the License. | ||
|
|
||
| from abc import abstractmethod | ||
| from typing import Iterator, Optional, Union | ||
| from dataclasses import dataclass | ||
| import functools | ||
| from typing import Any, Dict, Iterator, Optional, Set, Union | ||
|
|
||
| from pyamber import * | ||
| from core.models.schema.attribute_type import AttributeType, FROM_STRING_PARSER_MAPPING | ||
| from loguru import logger | ||
|
|
||
|
|
||
| class UDFOperatorV2(TupleOperatorV2): | ||
| @dataclass(frozen=True) | ||
| class _UiParameterValue: | ||
| name: str | ||
| type: AttributeType | ||
| value: Any | ||
|
|
||
|
|
||
| class _UiParameterSupport: | ||
| _ui_parameter_injected_values: Dict[str, Any] | ||
| _ui_parameter_name_types: Dict[str, AttributeType] | ||
| _ui_parameter_used_names: Set[str] | ||
| _unsupported_ui_parameter_types = { | ||
| AttributeType.BINARY, | ||
| AttributeType.LARGE_BINARY, | ||
| } | ||
|
|
||
| # Reserved hook name. Backend injector will generate this in the user's class. | ||
| def _texera_injected_ui_parameters(self) -> Dict[str, Any]: | ||
| return {} | ||
|
|
||
| def _ensure_ui_parameter_state(self) -> None: | ||
| if "_ui_parameter_injected_values" not in self.__dict__: | ||
| self._ui_parameter_injected_values = {} | ||
| if "_ui_parameter_name_types" not in self.__dict__: | ||
| self._ui_parameter_name_types = {} | ||
| if "_ui_parameter_used_names" not in self.__dict__: | ||
| self._ui_parameter_used_names = set() | ||
|
|
||
| def _texera_apply_injected_ui_parameters(self) -> None: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If a user's
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. The wrapped open() now applies injected UI parameters only once for the outermost open() call, with tests for super().open() and duplicate tracking. |
||
| self._ensure_ui_parameter_state() | ||
| values = self._texera_injected_ui_parameters() | ||
| self._ui_parameter_injected_values = dict(values or {}) | ||
| self._ui_parameter_name_types = {} | ||
| self._ui_parameter_used_names = set() | ||
|
|
||
| def _warn_unused_injected_ui_parameters(self) -> None: | ||
| unused_names = sorted( | ||
| set(self._ui_parameter_injected_values) - self._ui_parameter_used_names | ||
| ) | ||
| if unused_names: | ||
| logger.warning( | ||
| "Injected UI parameter value(s) were not used: {}.", | ||
| ", ".join(unused_names), | ||
| ) | ||
|
|
||
| def __init_subclass__(cls, **kwargs): | ||
| super().__init_subclass__(**kwargs) | ||
|
|
||
| original_open = getattr(cls, "open", None) | ||
| if original_open is None: | ||
| return | ||
|
|
||
| if getattr(original_open, "__texera_ui_params_wrapped__", False): | ||
| return | ||
|
|
||
| @functools.wraps(original_open) | ||
| def wrapped_open(self, *args, **kwargs): | ||
| """Apply injected UI parameters once before the outermost open().""" | ||
| if getattr(self, "_ui_parameter_open_in_progress", False): | ||
| return original_open(self, *args, **kwargs) | ||
|
|
||
| self._ui_parameter_open_in_progress = True | ||
| try: | ||
| self._texera_apply_injected_ui_parameters() | ||
| result = original_open(self, *args, **kwargs) | ||
| self._warn_unused_injected_ui_parameters() | ||
| return result | ||
| finally: | ||
| self._ui_parameter_open_in_progress = False | ||
|
|
||
| setattr(wrapped_open, "__texera_ui_params_wrapped__", True) | ||
| cls.open = wrapped_open | ||
|
|
||
| def UiParameter( | ||
|
Xiao-zhen-Liu marked this conversation as resolved.
|
||
| self, name: str, attr_type: Optional[AttributeType] = None, **kwargs: Any | ||
| ) -> _UiParameterValue: | ||
| """ | ||
| Return the current UI parameter value parsed as attr_type. | ||
|
|
||
| Re-reading the same name with the same type is idempotent. Reusing a | ||
| name with a different type is rejected because the parsed value would be | ||
| ambiguous. | ||
| """ | ||
| if "type" in kwargs: | ||
| if attr_type is not None: | ||
| raise TypeError("UiParameter.type was provided multiple times.") | ||
| attr_type = kwargs.pop("type") | ||
|
|
||
| if kwargs: | ||
| unexpected_arguments = ", ".join(sorted(kwargs)) | ||
| raise TypeError( | ||
| f"UiParameter got unexpected keyword argument(s): " | ||
| f"{unexpected_arguments}." | ||
| ) | ||
|
|
||
| if attr_type is None: | ||
| raise TypeError("UiParameter.type is required.") | ||
|
|
||
| if not isinstance(attr_type, AttributeType): | ||
| raise TypeError( | ||
| f"UiParameter.type must be an AttributeType, got {attr_type!r}." | ||
| ) | ||
|
|
||
| self._ensure_ui_parameter_state() | ||
| existing_type = self._ui_parameter_name_types.get(name) | ||
| if existing_type is not None and existing_type != attr_type: | ||
|
Xiao-zhen-Liu marked this conversation as resolved.
|
||
| raise ValueError( | ||
| f"Duplicate UiParameter name '{name}' with conflicting types: " | ||
| f"{existing_type.name} vs {attr_type.name}." | ||
| ) | ||
|
|
||
| self._ui_parameter_name_types[name] = attr_type | ||
| if name in self._ui_parameter_injected_values: | ||
| self._ui_parameter_used_names.add(name) | ||
| raw_value = self._ui_parameter_injected_values[name] | ||
| else: | ||
| logger.warning( | ||
| "No injected UI parameter value found for name '{}'.", | ||
| name, | ||
| ) | ||
| raw_value = None | ||
|
|
||
| return _UiParameterValue( | ||
| name=name, | ||
| type=attr_type, | ||
| value=self._parse(raw_value, attr_type), | ||
| ) | ||
|
|
||
| @staticmethod | ||
| def _parse(value: Any, attr_type: AttributeType) -> Any: | ||
| if attr_type in _UiParameterSupport._unsupported_ui_parameter_types: | ||
| raise ValueError( | ||
| f"UiParameter does not support {attr_type.name} values. " | ||
| "Use a supported type instead." | ||
| ) | ||
|
|
||
| parser = FROM_STRING_PARSER_MAPPING.get(attr_type) | ||
| if parser is None: | ||
| raise TypeError( | ||
| f"UiParameter.type {attr_type!r} is not supported for parsing." | ||
| ) | ||
|
|
||
| if value is None: | ||
| return None | ||
|
|
||
| try: | ||
| return parser(value) | ||
| except Exception as e: | ||
| raise ValueError( | ||
| f"Failed to parse UiParameter value {value!r} as {attr_type.name}. " | ||
| f"Please provide a valid {attr_type.name.lower()} value." | ||
| ) from e | ||
|
|
||
|
|
||
| class UDFOperatorV2(_UiParameterSupport, TupleOperatorV2): | ||
| """ | ||
| Base class for tuple-oriented user-defined operators. A concrete implementation must | ||
| be provided upon using. | ||
|
|
@@ -65,7 +223,7 @@ def close(self) -> None: | |
| pass | ||
|
|
||
|
|
||
| class UDFSourceOperator(SourceOperator): | ||
| class UDFSourceOperator(_UiParameterSupport, SourceOperator): | ||
| def open(self) -> None: | ||
| """ | ||
| Open a context of the operator. Usually can be used for loading/initiating some | ||
|
|
@@ -90,7 +248,7 @@ def close(self) -> None: | |
| pass | ||
|
|
||
|
|
||
| class UDFTableOperator(TableOperator): | ||
| class UDFTableOperator(_UiParameterSupport, TableOperator): | ||
| """ | ||
| Base class for table-oriented user-defined operators. A concrete implementation must | ||
| be provided upon using. | ||
|
|
@@ -123,7 +281,7 @@ def close(self) -> None: | |
| pass | ||
|
|
||
|
|
||
| class UDFBatchOperator(BatchOperator): | ||
| class UDFBatchOperator(_UiParameterSupport, BatchOperator): | ||
| """ | ||
| Base class for batch-oriented user-defined operators. A concrete implementation must | ||
| be provided upon using. | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Works correctly. But there are now two places mapping Java names to a type (these aliases and
RAW_TYPE_MAPPINGbelow) — a one-line comment on why both exist would help.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. I added a short comment explaining that enum attribute access needs Java-style names, while RAW_TYPE_MAPPING handles raw schema strings.