From d7fb00ca6c72132c84b84b9e04eeaa03aec94971 Mon Sep 17 00:00:00 2001 From: carloea2 Date: Tue, 12 May 2026 18:59:14 -0600 Subject: [PATCH 1/6] feat(pyamber): support Python UDF UI parameters --- .../core/models/schema/attribute_type.py | 50 +++++ amber/src/main/python/pytexera/__init__.py | 6 +- .../main/python/pytexera/udf/udf_operator.py | 125 ++++++++++- .../python/pytexera/udf/test_udf_operator.py | 194 ++++++++++++++++++ .../python/PythonUdfUiParameterInjector.scala | 3 +- .../PythonUdfUiParameterInjectorSpec.scala | 3 - 6 files changed, 370 insertions(+), 11 deletions(-) create mode 100644 amber/src/test/python/pytexera/udf/test_udf_operator.py diff --git a/amber/src/main/python/core/models/schema/attribute_type.py b/amber/src/main/python/core/models/schema/attribute_type.py index 24d0745f41e..9d8ec690546 100644 --- a/amber/src/main/python/core/models/schema/attribute_type.py +++ b/amber/src/main/python/core/models/schema/attribute_type.py @@ -33,8 +33,11 @@ class AttributeType(Enum): STRING = 1 INT = 2 + # Java enum-name aliases accepted by the UI parameter parser. + INTEGER = 2 LONG = 3 BOOL = 4 + BOOLEAN = 4 DOUBLE = 5 TIMESTAMP = 6 BINARY = 7 @@ -78,6 +81,53 @@ class AttributeType(Enum): } +FROM_STRING_PARSER_MAPPING = { + AttributeType.STRING: str, + AttributeType.INT: lambda v: ( + 0 if v is None or (isinstance(v, str) and v.strip() == "") else int(v) + ), + AttributeType.LONG: lambda v: ( + 0 if v is None or (isinstance(v, str) and v.strip() == "") else int(v) + ), + AttributeType.DOUBLE: lambda v: ( + 0.0 if v is None or (isinstance(v, str) and v.strip() == "") else float(v) + ), + AttributeType.BOOL: lambda v: ( + False + if v is None or (isinstance(v, str) and v.strip() == "") + else ( + True + if str(v).strip().lower() == "true" + else ( + False + if str(v).strip().lower() == "false" + else float(str(v).strip()) != 0 + ) + ) + ), + AttributeType.BINARY: lambda v: ( + (_ for _ in ()).throw( + ValueError( + "UiParameter does not support BINARY values. " + "Use a supported type instead." + ) + ) + ), + AttributeType.TIMESTAMP: lambda v: ( + datetime.datetime.fromtimestamp(0) + if v is None or (isinstance(v, str) and v.strip() == "") + else datetime.datetime.fromisoformat(v) + ), + AttributeType.LARGE_BINARY: lambda v: ( + (_ for _ in ()).throw( + ValueError( + "UiParameter does not support LARGE_BINARY values. " + "Use a supported type instead." + ) + ) + ), +} + # Only single-directional mapping. TO_PYOBJECT_MAPPING = { AttributeType.STRING: str, diff --git a/amber/src/main/python/pytexera/__init__.py b/amber/src/main/python/pytexera/__init__.py index e40d1a43fe0..b3b6f4dacfa 100644 --- a/amber/src/main/python/pytexera/__init__.py +++ b/amber/src/main/python/pytexera/__init__.py @@ -17,7 +17,7 @@ from loguru import logger from overrides import overrides -from typing import Iterator, Optional, Union +from typing import Iterator, Optional, Union, Dict, Any from pyamber import * from .storage.dataset_file_document import DatasetFileDocument @@ -30,6 +30,7 @@ UDFSourceOperator, ) from core.models.type.large_binary import largebinary +from core.models.schema.attribute_type import * __all__ = [ "State", @@ -53,4 +54,7 @@ "Iterator", "Optional", "Union", + "Dict", + "Any", + "AttributeType", ] diff --git a/amber/src/main/python/pytexera/udf/udf_operator.py b/amber/src/main/python/pytexera/udf/udf_operator.py index 003225c75c3..0d07e36fc0b 100644 --- a/amber/src/main/python/pytexera/udf/udf_operator.py +++ b/amber/src/main/python/pytexera/udf/udf_operator.py @@ -16,12 +16,127 @@ # under the License. from abc import abstractmethod -from typing import Iterator, Optional, Union +from dataclasses import dataclass +import functools +from typing import Any, Dict, Iterator, Optional, Union from pyamber import * +from core.models.schema.attribute_type import AttributeType, FROM_STRING_PARSER_MAPPING -class UDFOperatorV2(TupleOperatorV2): +@dataclass(frozen=True) +class _UiParameterValue: + name: str + type: AttributeType + value: Any + + +class _UiParameterSupport: + _ui_parameter_injected_values: Dict[str, Any] + _ui_parameter_name_types: Dict[str, AttributeType] + _unsupported_ui_parameter_types = { + AttributeType.BINARY, + AttributeType.LARGE_BINARY, + } + + # Reserved hook name. Backend injector will generate this in the user's class. + def _texera_injected_ui_parameters(self) -> Dict[str, Any]: + return {} + + def _ensure_ui_parameter_state(self) -> None: + if "_ui_parameter_injected_values" not in self.__dict__: + self._ui_parameter_injected_values = {} + if "_ui_parameter_name_types" not in self.__dict__: + self._ui_parameter_name_types = {} + + def _texera_apply_injected_ui_parameters(self) -> None: + self._ensure_ui_parameter_state() + values = self._texera_injected_ui_parameters() + self._ui_parameter_injected_values = dict(values or {}) + self._ui_parameter_name_types = {} + + def __init_subclass__(cls, **kwargs): + super().__init_subclass__(**kwargs) + + # Wrap the effective open() method once per subclass. + original_open = getattr(cls, "open", None) + if original_open is None: + return + + # Avoid double wrapping + if getattr(original_open, "__texera_ui_params_wrapped__", False): + return + + @functools.wraps(original_open) + def wrapped_open(self, *args, **kwargs): + self._texera_apply_injected_ui_parameters() + return original_open(self, *args, **kwargs) + + setattr(wrapped_open, "__texera_ui_params_wrapped__", True) + cls.open = wrapped_open + + def UiParameter( + self, name: str, attr_type: Optional[AttributeType] = None, **kwargs: Any + ) -> _UiParameterValue: + if "type" in kwargs: + if attr_type is not None: + raise TypeError("UiParameter.type was provided multiple times.") + attr_type = kwargs.pop("type") + + if kwargs: + unexpected_arguments = ", ".join(sorted(kwargs)) + raise TypeError( + f"UiParameter got unexpected keyword argument(s): " + f"{unexpected_arguments}." + ) + + if attr_type is None: + raise TypeError("UiParameter.type is required.") + + if not isinstance(attr_type, AttributeType): + raise TypeError( + f"UiParameter.type must be an AttributeType, got {attr_type!r}." + ) + + self._ensure_ui_parameter_state() + existing_type = self._ui_parameter_name_types.get(name) + if existing_type is not None and existing_type != attr_type: + raise ValueError( + f"Duplicate UiParameter name '{name}' with conflicting types: " + f"{existing_type.name} vs {attr_type.name}." + ) + + self._ui_parameter_name_types[name] = attr_type + raw_value = self._ui_parameter_injected_values.get(name) + return _UiParameterValue( + name=name, + type=attr_type, + value=self._parse(raw_value, attr_type), + ) + + @staticmethod + def _parse(value: Any, attr_type: AttributeType) -> Any: + if value is None: + return None + + py_type = FROM_STRING_PARSER_MAPPING.get(attr_type) + if py_type is None: + raise TypeError( + f"UiParameter.type {attr_type!r} is not supported for parsing." + ) + + try: + return py_type(value) + except Exception as e: + if attr_type in _UiParameterSupport._unsupported_ui_parameter_types: + raise ValueError(str(e)) from e + raise ValueError( + f"Failed to parse UiParameter value {value!r} as {attr_type.name}. " + f"Please provide a valid {attr_type.name.lower()} value." + ) from e + + +class UDFOperatorV2(_UiParameterSupport, TupleOperatorV2): """ Base class for tuple-oriented user-defined operators. A concrete implementation must be provided upon using. @@ -65,7 +180,7 @@ def close(self) -> None: pass -class UDFSourceOperator(SourceOperator): +class UDFSourceOperator(_UiParameterSupport, SourceOperator): def open(self) -> None: """ Open a context of the operator. Usually can be used for loading/initiating some @@ -90,7 +205,7 @@ def close(self) -> None: pass -class UDFTableOperator(TableOperator): +class UDFTableOperator(_UiParameterSupport, TableOperator): """ Base class for table-oriented user-defined operators. A concrete implementation must be provided upon using. @@ -123,7 +238,7 @@ def close(self) -> None: pass -class UDFBatchOperator(BatchOperator): +class UDFBatchOperator(_UiParameterSupport, BatchOperator): """ Base class for batch-oriented user-defined operators. A concrete implementation must be provided upon using. diff --git a/amber/src/test/python/pytexera/udf/test_udf_operator.py b/amber/src/test/python/pytexera/udf/test_udf_operator.py new file mode 100644 index 00000000000..85c9370aca2 --- /dev/null +++ b/amber/src/test/python/pytexera/udf/test_udf_operator.py @@ -0,0 +1,194 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import datetime +from typing import Iterator, Optional + +import pytest + +from pytexera import AttributeType, Tuple, TupleLike, UDFOperatorV2 +from pytexera.udf.udf_operator import _UiParameterSupport + + +class InjectedParametersOperator(UDFOperatorV2): + def _texera_injected_ui_parameters(self): + return { + "count": "7", + "enabled": "1", + "created_at": "2024-01-01T00:00:00", + } + + def open(self): + self.count_parameter = self.UiParameter("count", AttributeType.INT) + self.enabled_parameter = self.UiParameter( + name="enabled", type=AttributeType.BOOL + ) + self.created_at_parameter = self.UiParameter( + "created_at", type=AttributeType.TIMESTAMP + ) + + def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]: + yield tuple_ + + +class ConflictingParameterOperator(UDFOperatorV2): + def _texera_injected_ui_parameters(self): + return {"duplicate": "1"} + + def open(self): + self.UiParameter("duplicate", AttributeType.INT) + self.UiParameter("duplicate", AttributeType.STRING) + + def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]: + yield tuple_ + + +class FirstIndependentParameterOperator(UDFOperatorV2): + def _texera_injected_ui_parameters(self): + return {"count": "1"} + + def open(self): + self.count_parameter = self.UiParameter("count", AttributeType.INT) + + def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]: + yield tuple_ + + +class SecondIndependentParameterOperator(UDFOperatorV2): + def _texera_injected_ui_parameters(self): + return {"count": "2"} + + def open(self): + self.count_parameter = self.UiParameter("count", AttributeType.INT) + + def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]: + yield tuple_ + + +class TestUiParameterSupport: + def test_injected_values_are_applied_before_open(self): + operator = InjectedParametersOperator() + + operator.open() + + assert operator.count_parameter.value == 7 + assert operator.enabled_parameter.value is True + assert operator.created_at_parameter.value == datetime.datetime( + 2024, 1, 1, 0, 0 + ) + + def test_duplicate_parameter_names_with_conflicting_types_raise(self): + operator = ConflictingParameterOperator() + + with pytest.raises(ValueError) as exc_info: + operator.open() + + assert "Duplicate UiParameter name 'duplicate'" in str(exc_info.value) + + @pytest.mark.parametrize( + ("raw_value", "attr_type", "expected"), + [ + ("hello", AttributeType.STRING, "hello"), + ("7", AttributeType.INT, 7), + ("99", AttributeType.LONG, 99), + ("3.14", AttributeType.DOUBLE, 3.14), + ("1", AttributeType.BOOL, True), + ( + "2024-01-01T00:00:00", + AttributeType.TIMESTAMP, + datetime.datetime(2024, 1, 1, 0, 0), + ), + ], + ) + def test_parse_supported_types(self, raw_value, attr_type, expected): + assert _UiParameterSupport._parse(raw_value, attr_type) == expected + + def test_java_attribute_type_aliases_parse_like_python_names(self): + assert AttributeType.INTEGER is AttributeType.INT + assert AttributeType.BOOLEAN is AttributeType.BOOL + assert _UiParameterSupport._parse("7", AttributeType.INTEGER) == 7 + assert _UiParameterSupport._parse("false", AttributeType.BOOLEAN) is False + + @pytest.mark.parametrize( + ("raw_value", "expected"), + [ + ("", False), + (" ", False), + ("True", True), + ("true", True), + ("1", True), + ("1.0", True), + ("2", True), + ("-1", True), + ("False", False), + ("false", False), + ("0", False), + ("0.0", False), + ], + ) + def test_parse_bool_string_values(self, raw_value, expected): + assert _UiParameterSupport._parse(raw_value, AttributeType.BOOL) is expected + + @pytest.mark.parametrize( + ("raw_value", "attr_type", "expected_message"), + [ + ( + "payload", + AttributeType.BINARY, + "UiParameter does not support BINARY values", + ), + ( + "s3://bucket/path/to/object", + AttributeType.LARGE_BINARY, + "UiParameter does not support LARGE_BINARY values", + ), + ], + ) + def test_parse_binary_types_raise_helpful_error( + self, raw_value, attr_type, expected_message + ): + with pytest.raises(ValueError, match=expected_message): + _UiParameterSupport._parse(raw_value, attr_type) + + def test_parse_unsupported_type_raises_helpful_error(self): + with pytest.raises(TypeError, match="UiParameter.type .* is not supported"): + _UiParameterSupport._parse("value", object()) + + def test_wrapped_open_uses_instance_local_state(self): + assert ( + getattr( + FirstIndependentParameterOperator.open, + "__texera_ui_params_wrapped__", + False, + ) + is True + ) + + first_operator = FirstIndependentParameterOperator() + second_operator = SecondIndependentParameterOperator() + + first_operator.open() + second_operator.open() + + assert first_operator.count_parameter.value == 1 + assert second_operator.count_parameter.value == 2 + assert first_operator._ui_parameter_injected_values == {"count": "1"} + assert second_operator._ui_parameter_injected_values == {"count": "2"} + assert ( + first_operator._ui_parameter_injected_values + is not second_operator._ui_parameter_injected_values + ) diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjector.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjector.scala index 13c25a436ec..7b26761290e 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjector.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjector.scala @@ -85,8 +85,7 @@ object PythonUdfUiParameterInjector { private def buildInjectedHookMethod(uiParameters: List[UiUDFParameter]): String = { val injectedParametersMap = buildInjectedParametersMap(uiParameters) - (pyb"""|# Follow-up runtime support exports Dict/Any and defines the base hook that @overrides targets. - |@overrides + (pyb"""|@overrides |$InjectedUiParametersHookMethodHeader | return {""" + injectedParametersMap + diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjectorSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjectorSpec.scala index d5a2534758f..a741dfbf037 100644 --- a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjectorSpec.scala +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjectorSpec.scala @@ -104,9 +104,6 @@ class PythonUdfUiParameterInjectorSpec extends AnyFlatSpec with Matchers { val injectedCode = inject(uiParameter("date", AttributeType.TIMESTAMP, "2024-01-01T00:00:00Z")) injectedCode should include("class ProcessTupleOperator(UDFOperatorV2):") - injectedCode should include( - "# Follow-up runtime support exports Dict/Any and defines the base hook that @overrides targets." - ) injectedCode should include("def _texera_injected_ui_parameters(self) -> Dict[str, Any]:") injectedCode should include("return {") injectedCode should include("self.decode_python_template") From a00eb4e3e48add3a14f161aa6385799ce6e6d230 Mon Sep 17 00:00:00 2001 From: carloea2 Date: Sun, 14 Jun 2026 23:49:17 -0600 Subject: [PATCH 2/6] fix(pyamber): address UI parameter runtime review --- .../core/models/schema/attribute_type.py | 76 ++++++-------- amber/src/main/python/pytexera/__init__.py | 2 +- .../main/python/pytexera/udf/udf_operator.py | 32 +++--- .../python/pytexera/udf/test_udf_operator.py | 99 +++++++++++++++++++ 4 files changed, 153 insertions(+), 56 deletions(-) diff --git a/amber/src/main/python/core/models/schema/attribute_type.py b/amber/src/main/python/core/models/schema/attribute_type.py index 9d8ec690546..0faa11f420b 100644 --- a/amber/src/main/python/core/models/schema/attribute_type.py +++ b/amber/src/main/python/core/models/schema/attribute_type.py @@ -33,7 +33,7 @@ class AttributeType(Enum): STRING = 1 INT = 2 - # Java enum-name aliases accepted by the UI parameter parser. + # Attribute access needs Java names; RAW_TYPE_MAPPING handles raw schema strings. INTEGER = 2 LONG = 3 BOOL = 4 @@ -81,51 +81,39 @@ class AttributeType(Enum): } +def _is_empty_value(v): + return v is None or (isinstance(v, str) and v.strip() == "") + + +def _parse_bool(v): + if _is_empty_value(v): + return False + + normalized_value = str(v).strip().lower() + if normalized_value == "true": + return True + if normalized_value == "false": + return False + return float(normalized_value) != 0 + + +def _parse_timestamp(v): + if _is_empty_value(v): + return datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc) + + normalized_value = str(v) + if normalized_value.endswith("Z"): + normalized_value = normalized_value[:-1] + "+00:00" + return datetime.datetime.fromisoformat(normalized_value) + + FROM_STRING_PARSER_MAPPING = { AttributeType.STRING: str, - AttributeType.INT: lambda v: ( - 0 if v is None or (isinstance(v, str) and v.strip() == "") else int(v) - ), - AttributeType.LONG: lambda v: ( - 0 if v is None or (isinstance(v, str) and v.strip() == "") else int(v) - ), - AttributeType.DOUBLE: lambda v: ( - 0.0 if v is None or (isinstance(v, str) and v.strip() == "") else float(v) - ), - AttributeType.BOOL: lambda v: ( - False - if v is None or (isinstance(v, str) and v.strip() == "") - else ( - True - if str(v).strip().lower() == "true" - else ( - False - if str(v).strip().lower() == "false" - else float(str(v).strip()) != 0 - ) - ) - ), - AttributeType.BINARY: lambda v: ( - (_ for _ in ()).throw( - ValueError( - "UiParameter does not support BINARY values. " - "Use a supported type instead." - ) - ) - ), - AttributeType.TIMESTAMP: lambda v: ( - datetime.datetime.fromtimestamp(0) - if v is None or (isinstance(v, str) and v.strip() == "") - else datetime.datetime.fromisoformat(v) - ), - AttributeType.LARGE_BINARY: lambda v: ( - (_ for _ in ()).throw( - ValueError( - "UiParameter does not support LARGE_BINARY values. " - "Use a supported type instead." - ) - ) - ), + AttributeType.INT: lambda v: 0 if _is_empty_value(v) else int(v), + AttributeType.LONG: lambda v: 0 if _is_empty_value(v) else int(v), + AttributeType.DOUBLE: lambda v: 0.0 if _is_empty_value(v) else float(v), + AttributeType.BOOL: _parse_bool, + AttributeType.TIMESTAMP: _parse_timestamp, } # Only single-directional mapping. diff --git a/amber/src/main/python/pytexera/__init__.py b/amber/src/main/python/pytexera/__init__.py index b3b6f4dacfa..421d4c7e490 100644 --- a/amber/src/main/python/pytexera/__init__.py +++ b/amber/src/main/python/pytexera/__init__.py @@ -30,7 +30,7 @@ UDFSourceOperator, ) from core.models.type.large_binary import largebinary -from core.models.schema.attribute_type import * +from core.models.schema.attribute_type import AttributeType __all__ = [ "State", diff --git a/amber/src/main/python/pytexera/udf/udf_operator.py b/amber/src/main/python/pytexera/udf/udf_operator.py index 0d07e36fc0b..7a1b5def96c 100644 --- a/amber/src/main/python/pytexera/udf/udf_operator.py +++ b/amber/src/main/python/pytexera/udf/udf_operator.py @@ -58,19 +58,25 @@ def _texera_apply_injected_ui_parameters(self) -> None: def __init_subclass__(cls, **kwargs): super().__init_subclass__(**kwargs) - # Wrap the effective open() method once per subclass. original_open = getattr(cls, "open", None) if original_open is None: return - # Avoid double wrapping if getattr(original_open, "__texera_ui_params_wrapped__", False): return @functools.wraps(original_open) def wrapped_open(self, *args, **kwargs): - self._texera_apply_injected_ui_parameters() - return original_open(self, *args, **kwargs) + """Apply injected UI parameters once before the outermost open().""" + if getattr(self, "_ui_parameter_open_in_progress", False): + return original_open(self, *args, **kwargs) + + self._ui_parameter_open_in_progress = True + try: + self._texera_apply_injected_ui_parameters() + return original_open(self, *args, **kwargs) + finally: + self._ui_parameter_open_in_progress = False setattr(wrapped_open, "__texera_ui_params_wrapped__", True) cls.open = wrapped_open @@ -116,20 +122,24 @@ def UiParameter( @staticmethod def _parse(value: Any, attr_type: AttributeType) -> Any: - if value is None: - return None + if attr_type in _UiParameterSupport._unsupported_ui_parameter_types: + raise ValueError( + f"UiParameter does not support {attr_type.name} values. " + "Use a supported type instead." + ) - py_type = FROM_STRING_PARSER_MAPPING.get(attr_type) - if py_type is None: + parser = FROM_STRING_PARSER_MAPPING.get(attr_type) + if parser is None: raise TypeError( f"UiParameter.type {attr_type!r} is not supported for parsing." ) + if value is None: + return None + try: - return py_type(value) + return parser(value) except Exception as e: - if attr_type in _UiParameterSupport._unsupported_ui_parameter_types: - raise ValueError(str(e)) from e raise ValueError( f"Failed to parse UiParameter value {value!r} as {attr_type.name}. " f"Please provide a valid {attr_type.name.lower()} value." diff --git a/amber/src/test/python/pytexera/udf/test_udf_operator.py b/amber/src/test/python/pytexera/udf/test_udf_operator.py index 85c9370aca2..39bff9fac2b 100644 --- a/amber/src/test/python/pytexera/udf/test_udf_operator.py +++ b/amber/src/test/python/pytexera/udf/test_udf_operator.py @@ -79,6 +79,46 @@ def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike yield tuple_ +class MissingParameterOperator(UDFOperatorV2): + def _texera_injected_ui_parameters(self): + return {"sent": "1"} + + def open(self): + self.missing_parameter = self.UiParameter("missing", AttributeType.INT) + + def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]: + yield tuple_ + + +class SuperOpenParameterOperator(UDFOperatorV2): + def __init__(self): + self.hook_call_count = 0 + + def _texera_injected_ui_parameters(self): + self.hook_call_count += 1 + return {"count": "3"} + + def open(self): + super().open() + self.count_parameter = self.UiParameter("count", AttributeType.INT) + + def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]: + yield tuple_ + + +class SuperOpenConflictingParameterOperator(UDFOperatorV2): + def _texera_injected_ui_parameters(self): + return {"duplicate": "1"} + + def open(self): + self.UiParameter("duplicate", AttributeType.INT) + super().open() + self.UiParameter("duplicate", AttributeType.STRING) + + def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]: + yield tuple_ + + class TestUiParameterSupport: def test_injected_values_are_applied_before_open(self): operator = InjectedParametersOperator() @@ -112,11 +152,32 @@ def test_duplicate_parameter_names_with_conflicting_types_raise(self): AttributeType.TIMESTAMP, datetime.datetime(2024, 1, 1, 0, 0), ), + ( + "2024-01-01T00:00:00Z", + AttributeType.TIMESTAMP, + datetime.datetime(2024, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), + ), ], ) def test_parse_supported_types(self, raw_value, attr_type, expected): assert _UiParameterSupport._parse(raw_value, attr_type) == expected + @pytest.mark.parametrize( + ("raw_value", "attr_type", "expected"), + [ + ("", AttributeType.INT, 0), + (" ", AttributeType.LONG, 0), + ("", AttributeType.DOUBLE, 0.0), + ( + "", + AttributeType.TIMESTAMP, + datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc), + ), + ], + ) + def test_parse_empty_values(self, raw_value, attr_type, expected): + assert _UiParameterSupport._parse(raw_value, attr_type) == expected + def test_java_attribute_type_aliases_parse_like_python_names(self): assert AttributeType.INTEGER is AttributeType.INT assert AttributeType.BOOLEAN is AttributeType.BOOL @@ -156,6 +217,11 @@ def test_parse_bool_string_values(self, raw_value, expected): AttributeType.LARGE_BINARY, "UiParameter does not support LARGE_BINARY values", ), + ( + None, + AttributeType.BINARY, + "UiParameter does not support BINARY values", + ), ], ) def test_parse_binary_types_raise_helpful_error( @@ -168,6 +234,39 @@ def test_parse_unsupported_type_raises_helpful_error(self): with pytest.raises(TypeError, match="UiParameter.type .* is not supported"): _UiParameterSupport._parse("value", object()) + def test_missing_injected_name_returns_none(self): + operator = MissingParameterOperator() + + operator.open() + + assert operator.missing_parameter.value is None + + def test_ui_parameter_argument_errors(self): + operator = MissingParameterOperator() + + with pytest.raises(TypeError, match="provided multiple times"): + operator.UiParameter("count", AttributeType.INT, type=AttributeType.INT) + with pytest.raises(TypeError, match="unexpected keyword argument"): + operator.UiParameter("count", AttributeType.INT, value="1") + with pytest.raises(TypeError, match="UiParameter.type is required"): + operator.UiParameter("count") + with pytest.raises(TypeError, match="must be an AttributeType"): + operator.UiParameter("count", object()) + + def test_super_open_applies_injected_values_once(self): + operator = SuperOpenParameterOperator() + + operator.open() + + assert operator.hook_call_count == 1 + assert operator.count_parameter.value == 3 + + def test_super_open_does_not_reset_duplicate_tracking(self): + operator = SuperOpenConflictingParameterOperator() + + with pytest.raises(ValueError, match="Duplicate UiParameter name"): + operator.open() + def test_wrapped_open_uses_instance_local_state(self): assert ( getattr( From e528fb1b64e68b378ea6c2a1c10d5ed4aef8b0c6 Mon Sep 17 00:00:00 2001 From: carloea2 Date: Mon, 15 Jun 2026 21:18:13 -0600 Subject: [PATCH 3/6] test(pyamber): document repeated UI parameter reads --- .../main/python/pytexera/udf/udf_operator.py | 7 ++++++ .../python/pytexera/udf/test_udf_operator.py | 22 +++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/amber/src/main/python/pytexera/udf/udf_operator.py b/amber/src/main/python/pytexera/udf/udf_operator.py index 7a1b5def96c..248c51179bb 100644 --- a/amber/src/main/python/pytexera/udf/udf_operator.py +++ b/amber/src/main/python/pytexera/udf/udf_operator.py @@ -84,6 +84,13 @@ def wrapped_open(self, *args, **kwargs): def UiParameter( self, name: str, attr_type: Optional[AttributeType] = None, **kwargs: Any ) -> _UiParameterValue: + """ + Return the current UI parameter value parsed as attr_type. + + Re-reading the same name with the same type is idempotent. Reusing a + name with a different type is rejected because the parsed value would be + ambiguous. + """ if "type" in kwargs: if attr_type is not None: raise TypeError("UiParameter.type was provided multiple times.") diff --git a/amber/src/test/python/pytexera/udf/test_udf_operator.py b/amber/src/test/python/pytexera/udf/test_udf_operator.py index 39bff9fac2b..2a345ac6934 100644 --- a/amber/src/test/python/pytexera/udf/test_udf_operator.py +++ b/amber/src/test/python/pytexera/udf/test_udf_operator.py @@ -57,6 +57,18 @@ def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike yield tuple_ +class RepeatedParameterOperator(UDFOperatorV2): + def _texera_injected_ui_parameters(self): + return {"duplicate": "1"} + + def open(self): + self.first_parameter = self.UiParameter("duplicate", AttributeType.INT) + self.second_parameter = self.UiParameter("duplicate", AttributeType.INT) + + def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]: + yield tuple_ + + class FirstIndependentParameterOperator(UDFOperatorV2): def _texera_injected_ui_parameters(self): return {"count": "1"} @@ -139,6 +151,16 @@ def test_duplicate_parameter_names_with_conflicting_types_raise(self): assert "Duplicate UiParameter name 'duplicate'" in str(exc_info.value) + def test_duplicate_parameter_names_with_same_type_succeed(self): + operator = RepeatedParameterOperator() + + operator.open() + + assert operator.first_parameter.value == 1 + assert operator.second_parameter.value == 1 + assert operator.first_parameter.type is AttributeType.INT + assert operator.second_parameter.type is AttributeType.INT + @pytest.mark.parametrize( ("raw_value", "attr_type", "expected"), [ From 65c23da3c16203c86c24390922b4e5afc0fb93a1 Mon Sep 17 00:00:00 2001 From: carloea2 Date: Mon, 15 Jun 2026 21:28:22 -0600 Subject: [PATCH 4/6] fix(pyamber): warn on unmatched UI parameters --- .../main/python/pytexera/udf/udf_operator.py | 34 +++++++++++++++++-- .../python/pytexera/udf/test_udf_operator.py | 27 +++++++++++++-- 2 files changed, 56 insertions(+), 5 deletions(-) diff --git a/amber/src/main/python/pytexera/udf/udf_operator.py b/amber/src/main/python/pytexera/udf/udf_operator.py index 248c51179bb..80a603d6fc3 100644 --- a/amber/src/main/python/pytexera/udf/udf_operator.py +++ b/amber/src/main/python/pytexera/udf/udf_operator.py @@ -18,11 +18,14 @@ from abc import abstractmethod from dataclasses import dataclass import functools -from typing import Any, Dict, Iterator, Optional, Union +import logging +from typing import Any, Dict, Iterator, Optional, Set, Union from pyamber import * from core.models.schema.attribute_type import AttributeType, FROM_STRING_PARSER_MAPPING +logger = logging.getLogger(__name__) + @dataclass(frozen=True) class _UiParameterValue: @@ -34,6 +37,7 @@ class _UiParameterValue: class _UiParameterSupport: _ui_parameter_injected_values: Dict[str, Any] _ui_parameter_name_types: Dict[str, AttributeType] + _ui_parameter_used_names: Set[str] _unsupported_ui_parameter_types = { AttributeType.BINARY, AttributeType.LARGE_BINARY, @@ -48,12 +52,25 @@ def _ensure_ui_parameter_state(self) -> None: self._ui_parameter_injected_values = {} if "_ui_parameter_name_types" not in self.__dict__: self._ui_parameter_name_types = {} + if "_ui_parameter_used_names" not in self.__dict__: + self._ui_parameter_used_names = set() def _texera_apply_injected_ui_parameters(self) -> None: self._ensure_ui_parameter_state() values = self._texera_injected_ui_parameters() self._ui_parameter_injected_values = dict(values or {}) self._ui_parameter_name_types = {} + self._ui_parameter_used_names = set() + + def _warn_unused_injected_ui_parameters(self) -> None: + unused_names = sorted( + set(self._ui_parameter_injected_values) - self._ui_parameter_used_names + ) + if unused_names: + logger.warning( + "Injected UI parameter value(s) were not used: %s.", + ", ".join(unused_names), + ) def __init_subclass__(cls, **kwargs): super().__init_subclass__(**kwargs) @@ -74,7 +91,9 @@ def wrapped_open(self, *args, **kwargs): self._ui_parameter_open_in_progress = True try: self._texera_apply_injected_ui_parameters() - return original_open(self, *args, **kwargs) + result = original_open(self, *args, **kwargs) + self._warn_unused_injected_ui_parameters() + return result finally: self._ui_parameter_open_in_progress = False @@ -120,7 +139,16 @@ def UiParameter( ) self._ui_parameter_name_types[name] = attr_type - raw_value = self._ui_parameter_injected_values.get(name) + if name in self._ui_parameter_injected_values: + self._ui_parameter_used_names.add(name) + raw_value = self._ui_parameter_injected_values[name] + else: + logger.warning( + "No injected UI parameter value found for name '%s'.", + name, + ) + raw_value = None + return _UiParameterValue( name=name, type=attr_type, diff --git a/amber/src/test/python/pytexera/udf/test_udf_operator.py b/amber/src/test/python/pytexera/udf/test_udf_operator.py index 2a345ac6934..73d1d782a18 100644 --- a/amber/src/test/python/pytexera/udf/test_udf_operator.py +++ b/amber/src/test/python/pytexera/udf/test_udf_operator.py @@ -16,6 +16,7 @@ # under the License. import datetime +import logging from typing import Iterator, Optional import pytest @@ -102,6 +103,17 @@ def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike yield tuple_ +class UnusedParameterOperator(UDFOperatorV2): + def _texera_injected_ui_parameters(self): + return {"used": "1", "unused": "2"} + + def open(self): + self.used_parameter = self.UiParameter("used", AttributeType.INT) + + def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]: + yield tuple_ + + class SuperOpenParameterOperator(UDFOperatorV2): def __init__(self): self.hook_call_count = 0 @@ -256,12 +268,23 @@ def test_parse_unsupported_type_raises_helpful_error(self): with pytest.raises(TypeError, match="UiParameter.type .* is not supported"): _UiParameterSupport._parse("value", object()) - def test_missing_injected_name_returns_none(self): + def test_missing_injected_name_returns_none_and_warns(self, caplog): operator = MissingParameterOperator() - operator.open() + with caplog.at_level(logging.WARNING): + operator.open() assert operator.missing_parameter.value is None + assert "No injected UI parameter value found for name 'missing'" in caplog.text + + def test_unused_injected_name_warns(self, caplog): + operator = UnusedParameterOperator() + + with caplog.at_level(logging.WARNING): + operator.open() + + assert operator.used_parameter.value == 1 + assert "Injected UI parameter value(s) were not used: unused" in caplog.text def test_ui_parameter_argument_errors(self): operator = MissingParameterOperator() From e6014f57ef1c71a96a7145b0baecf93925cde320 Mon Sep 17 00:00:00 2001 From: carloea2 Date: Mon, 15 Jun 2026 21:33:44 -0600 Subject: [PATCH 5/6] chore(pyamber): use loguru for UI parameter warnings --- .../main/python/pytexera/udf/udf_operator.py | 8 ++--- .../python/pytexera/udf/test_udf_operator.py | 33 ++++++++++++++----- 2 files changed, 27 insertions(+), 14 deletions(-) diff --git a/amber/src/main/python/pytexera/udf/udf_operator.py b/amber/src/main/python/pytexera/udf/udf_operator.py index 80a603d6fc3..7c0fd239c7e 100644 --- a/amber/src/main/python/pytexera/udf/udf_operator.py +++ b/amber/src/main/python/pytexera/udf/udf_operator.py @@ -18,13 +18,11 @@ from abc import abstractmethod from dataclasses import dataclass import functools -import logging from typing import Any, Dict, Iterator, Optional, Set, Union from pyamber import * from core.models.schema.attribute_type import AttributeType, FROM_STRING_PARSER_MAPPING - -logger = logging.getLogger(__name__) +from loguru import logger @dataclass(frozen=True) @@ -68,7 +66,7 @@ def _warn_unused_injected_ui_parameters(self) -> None: ) if unused_names: logger.warning( - "Injected UI parameter value(s) were not used: %s.", + "Injected UI parameter value(s) were not used: {}.", ", ".join(unused_names), ) @@ -144,7 +142,7 @@ def UiParameter( raw_value = self._ui_parameter_injected_values[name] else: logger.warning( - "No injected UI parameter value found for name '%s'.", + "No injected UI parameter value found for name '{}'.", name, ) raw_value = None diff --git a/amber/src/test/python/pytexera/udf/test_udf_operator.py b/amber/src/test/python/pytexera/udf/test_udf_operator.py index 73d1d782a18..9c45027e639 100644 --- a/amber/src/test/python/pytexera/udf/test_udf_operator.py +++ b/amber/src/test/python/pytexera/udf/test_udf_operator.py @@ -16,10 +16,10 @@ # under the License. import datetime -import logging from typing import Iterator, Optional import pytest +import pytexera.udf.udf_operator as udf_operator from pytexera import AttributeType, Tuple, TupleLike, UDFOperatorV2 from pytexera.udf.udf_operator import _UiParameterSupport @@ -268,23 +268,38 @@ def test_parse_unsupported_type_raises_helpful_error(self): with pytest.raises(TypeError, match="UiParameter.type .* is not supported"): _UiParameterSupport._parse("value", object()) - def test_missing_injected_name_returns_none_and_warns(self, caplog): + def test_missing_injected_name_returns_none_and_warns(self, monkeypatch): operator = MissingParameterOperator() + warning_calls = [] + monkeypatch.setattr( + udf_operator.logger, + "warning", + lambda msg, *args, **kwargs: warning_calls.append(msg.format(*args)), + ) - with caplog.at_level(logging.WARNING): - operator.open() + operator.open() assert operator.missing_parameter.value is None - assert "No injected UI parameter value found for name 'missing'" in caplog.text + assert any( + "No injected UI parameter value found for name 'missing'" in call + for call in warning_calls + ) - def test_unused_injected_name_warns(self, caplog): + def test_unused_injected_name_warns(self, monkeypatch): operator = UnusedParameterOperator() + warning_calls = [] + monkeypatch.setattr( + udf_operator.logger, + "warning", + lambda msg, *args, **kwargs: warning_calls.append(msg.format(*args)), + ) - with caplog.at_level(logging.WARNING): - operator.open() + operator.open() assert operator.used_parameter.value == 1 - assert "Injected UI parameter value(s) were not used: unused" in caplog.text + assert warning_calls == [ + "Injected UI parameter value(s) were not used: unused." + ] def test_ui_parameter_argument_errors(self): operator = MissingParameterOperator() From 609026bedbfeafe4f7881a820033749de026e0d1 Mon Sep 17 00:00:00 2001 From: carloea2 Date: Tue, 23 Jun 2026 00:19:52 -0600 Subject: [PATCH 6/6] fix(pyamber): normalize parsed timestamps to UTC --- .../src/main/python/core/models/schema/attribute_type.py | 8 +++++++- amber/src/test/python/pytexera/udf/test_udf_operator.py | 4 ++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/amber/src/main/python/core/models/schema/attribute_type.py b/amber/src/main/python/core/models/schema/attribute_type.py index 0faa11f420b..a5c9ad2b6ff 100644 --- a/amber/src/main/python/core/models/schema/attribute_type.py +++ b/amber/src/main/python/core/models/schema/attribute_type.py @@ -104,7 +104,13 @@ def _parse_timestamp(v): normalized_value = str(v) if normalized_value.endswith("Z"): normalized_value = normalized_value[:-1] + "+00:00" - return datetime.datetime.fromisoformat(normalized_value) + parsed_value = datetime.datetime.fromisoformat(normalized_value) + if ( + parsed_value.tzinfo is None + or parsed_value.tzinfo.utcoffset(parsed_value) is None + ): + return parsed_value.replace(tzinfo=datetime.timezone.utc) + return parsed_value FROM_STRING_PARSER_MAPPING = { diff --git a/amber/src/test/python/pytexera/udf/test_udf_operator.py b/amber/src/test/python/pytexera/udf/test_udf_operator.py index 9c45027e639..b96199d450c 100644 --- a/amber/src/test/python/pytexera/udf/test_udf_operator.py +++ b/amber/src/test/python/pytexera/udf/test_udf_operator.py @@ -152,7 +152,7 @@ def test_injected_values_are_applied_before_open(self): assert operator.count_parameter.value == 7 assert operator.enabled_parameter.value is True assert operator.created_at_parameter.value == datetime.datetime( - 2024, 1, 1, 0, 0 + 2024, 1, 1, 0, 0, tzinfo=datetime.timezone.utc ) def test_duplicate_parameter_names_with_conflicting_types_raise(self): @@ -184,7 +184,7 @@ def test_duplicate_parameter_names_with_same_type_succeed(self): ( "2024-01-01T00:00:00", AttributeType.TIMESTAMP, - datetime.datetime(2024, 1, 1, 0, 0), + datetime.datetime(2024, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), ), ( "2024-01-01T00:00:00Z",