diff --git a/amber/src/main/python/core/models/schema/attribute_type.py b/amber/src/main/python/core/models/schema/attribute_type.py index 24d0745f41e..a5c9ad2b6ff 100644 --- a/amber/src/main/python/core/models/schema/attribute_type.py +++ b/amber/src/main/python/core/models/schema/attribute_type.py @@ -33,8 +33,11 @@ class AttributeType(Enum): STRING = 1 INT = 2 + # Attribute access needs Java names; RAW_TYPE_MAPPING handles raw schema strings. + INTEGER = 2 LONG = 3 BOOL = 4 + BOOLEAN = 4 DOUBLE = 5 TIMESTAMP = 6 BINARY = 7 @@ -78,6 +81,47 @@ class AttributeType(Enum): } +def _is_empty_value(v): + return v is None or (isinstance(v, str) and v.strip() == "") + + +def _parse_bool(v): + if _is_empty_value(v): + return False + + normalized_value = str(v).strip().lower() + if normalized_value == "true": + return True + if normalized_value == "false": + return False + return float(normalized_value) != 0 + + +def _parse_timestamp(v): + if _is_empty_value(v): + return datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc) + + normalized_value = str(v) + if normalized_value.endswith("Z"): + normalized_value = normalized_value[:-1] + "+00:00" + parsed_value = datetime.datetime.fromisoformat(normalized_value) + if ( + parsed_value.tzinfo is None + or parsed_value.tzinfo.utcoffset(parsed_value) is None + ): + return parsed_value.replace(tzinfo=datetime.timezone.utc) + return parsed_value + + +FROM_STRING_PARSER_MAPPING = { + AttributeType.STRING: str, + AttributeType.INT: lambda v: 0 if _is_empty_value(v) else int(v), + AttributeType.LONG: lambda v: 0 if _is_empty_value(v) else int(v), + AttributeType.DOUBLE: lambda v: 0.0 if _is_empty_value(v) else float(v), + AttributeType.BOOL: _parse_bool, + AttributeType.TIMESTAMP: _parse_timestamp, +} + # Only single-directional mapping. TO_PYOBJECT_MAPPING = { AttributeType.STRING: str, diff --git a/amber/src/main/python/pytexera/__init__.py b/amber/src/main/python/pytexera/__init__.py index e40d1a43fe0..421d4c7e490 100644 --- a/amber/src/main/python/pytexera/__init__.py +++ b/amber/src/main/python/pytexera/__init__.py @@ -17,7 +17,7 @@ from loguru import logger from overrides import overrides -from typing import Iterator, Optional, Union +from typing import Iterator, Optional, Union, Dict, Any from pyamber import * from .storage.dataset_file_document import DatasetFileDocument @@ -30,6 +30,7 @@ UDFSourceOperator, ) from core.models.type.large_binary import largebinary +from core.models.schema.attribute_type import AttributeType __all__ = [ "State", @@ -53,4 +54,7 @@ "Iterator", "Optional", "Union", + "Dict", + "Any", + "AttributeType", ] diff --git a/amber/src/main/python/pytexera/udf/udf_operator.py b/amber/src/main/python/pytexera/udf/udf_operator.py index 003225c75c3..7c0fd239c7e 100644 --- a/amber/src/main/python/pytexera/udf/udf_operator.py +++ b/amber/src/main/python/pytexera/udf/udf_operator.py @@ -16,12 +16,170 @@ # under the License. from abc import abstractmethod -from typing import Iterator, Optional, Union +from dataclasses import dataclass +import functools +from typing import Any, Dict, Iterator, Optional, Set, Union from pyamber import * +from core.models.schema.attribute_type import AttributeType, FROM_STRING_PARSER_MAPPING +from loguru import logger -class UDFOperatorV2(TupleOperatorV2): +@dataclass(frozen=True) +class _UiParameterValue: + name: str + type: AttributeType + value: Any + + +class _UiParameterSupport: + _ui_parameter_injected_values: Dict[str, Any] + _ui_parameter_name_types: Dict[str, AttributeType] + _ui_parameter_used_names: Set[str] + _unsupported_ui_parameter_types = { + AttributeType.BINARY, + AttributeType.LARGE_BINARY, + } + + # Reserved hook name. Backend injector will generate this in the user's class. + def _texera_injected_ui_parameters(self) -> Dict[str, Any]: + return {} + + def _ensure_ui_parameter_state(self) -> None: + if "_ui_parameter_injected_values" not in self.__dict__: + self._ui_parameter_injected_values = {} + if "_ui_parameter_name_types" not in self.__dict__: + self._ui_parameter_name_types = {} + if "_ui_parameter_used_names" not in self.__dict__: + self._ui_parameter_used_names = set() + + def _texera_apply_injected_ui_parameters(self) -> None: + self._ensure_ui_parameter_state() + values = self._texera_injected_ui_parameters() + self._ui_parameter_injected_values = dict(values or {}) + self._ui_parameter_name_types = {} + self._ui_parameter_used_names = set() + + def _warn_unused_injected_ui_parameters(self) -> None: + unused_names = sorted( + set(self._ui_parameter_injected_values) - self._ui_parameter_used_names + ) + if unused_names: + logger.warning( + "Injected UI parameter value(s) were not used: {}.", + ", ".join(unused_names), + ) + + def __init_subclass__(cls, **kwargs): + super().__init_subclass__(**kwargs) + + original_open = getattr(cls, "open", None) + if original_open is None: + return + + if getattr(original_open, "__texera_ui_params_wrapped__", False): + return + + @functools.wraps(original_open) + def wrapped_open(self, *args, **kwargs): + """Apply injected UI parameters once before the outermost open().""" + if getattr(self, "_ui_parameter_open_in_progress", False): + return original_open(self, *args, **kwargs) + + self._ui_parameter_open_in_progress = True + try: + self._texera_apply_injected_ui_parameters() + result = original_open(self, *args, **kwargs) + self._warn_unused_injected_ui_parameters() + return result + finally: + self._ui_parameter_open_in_progress = False + + setattr(wrapped_open, "__texera_ui_params_wrapped__", True) + cls.open = wrapped_open + + def UiParameter( + self, name: str, attr_type: Optional[AttributeType] = None, **kwargs: Any + ) -> _UiParameterValue: + """ + Return the current UI parameter value parsed as attr_type. + + Re-reading the same name with the same type is idempotent. Reusing a + name with a different type is rejected because the parsed value would be + ambiguous. + """ + if "type" in kwargs: + if attr_type is not None: + raise TypeError("UiParameter.type was provided multiple times.") + attr_type = kwargs.pop("type") + + if kwargs: + unexpected_arguments = ", ".join(sorted(kwargs)) + raise TypeError( + f"UiParameter got unexpected keyword argument(s): " + f"{unexpected_arguments}." + ) + + if attr_type is None: + raise TypeError("UiParameter.type is required.") + + if not isinstance(attr_type, AttributeType): + raise TypeError( + f"UiParameter.type must be an AttributeType, got {attr_type!r}." + ) + + self._ensure_ui_parameter_state() + existing_type = self._ui_parameter_name_types.get(name) + if existing_type is not None and existing_type != attr_type: + raise ValueError( + f"Duplicate UiParameter name '{name}' with conflicting types: " + f"{existing_type.name} vs {attr_type.name}." + ) + + self._ui_parameter_name_types[name] = attr_type + if name in self._ui_parameter_injected_values: + self._ui_parameter_used_names.add(name) + raw_value = self._ui_parameter_injected_values[name] + else: + logger.warning( + "No injected UI parameter value found for name '{}'.", + name, + ) + raw_value = None + + return _UiParameterValue( + name=name, + type=attr_type, + value=self._parse(raw_value, attr_type), + ) + + @staticmethod + def _parse(value: Any, attr_type: AttributeType) -> Any: + if attr_type in _UiParameterSupport._unsupported_ui_parameter_types: + raise ValueError( + f"UiParameter does not support {attr_type.name} values. " + "Use a supported type instead." + ) + + parser = FROM_STRING_PARSER_MAPPING.get(attr_type) + if parser is None: + raise TypeError( + f"UiParameter.type {attr_type!r} is not supported for parsing." + ) + + if value is None: + return None + + try: + return parser(value) + except Exception as e: + raise ValueError( + f"Failed to parse UiParameter value {value!r} as {attr_type.name}. " + f"Please provide a valid {attr_type.name.lower()} value." + ) from e + + +class UDFOperatorV2(_UiParameterSupport, TupleOperatorV2): """ Base class for tuple-oriented user-defined operators. A concrete implementation must be provided upon using. @@ -65,7 +223,7 @@ def close(self) -> None: pass -class UDFSourceOperator(SourceOperator): +class UDFSourceOperator(_UiParameterSupport, SourceOperator): def open(self) -> None: """ Open a context of the operator. Usually can be used for loading/initiating some @@ -90,7 +248,7 @@ def close(self) -> None: pass -class UDFTableOperator(TableOperator): +class UDFTableOperator(_UiParameterSupport, TableOperator): """ Base class for table-oriented user-defined operators. A concrete implementation must be provided upon using. @@ -123,7 +281,7 @@ def close(self) -> None: pass -class UDFBatchOperator(BatchOperator): +class UDFBatchOperator(_UiParameterSupport, BatchOperator): """ Base class for batch-oriented user-defined operators. A concrete implementation must be provided upon using. diff --git a/amber/src/test/python/pytexera/udf/test_udf_operator.py b/amber/src/test/python/pytexera/udf/test_udf_operator.py new file mode 100644 index 00000000000..b96199d450c --- /dev/null +++ b/amber/src/test/python/pytexera/udf/test_udf_operator.py @@ -0,0 +1,353 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import datetime +from typing import Iterator, Optional + +import pytest +import pytexera.udf.udf_operator as udf_operator + +from pytexera import AttributeType, Tuple, TupleLike, UDFOperatorV2 +from pytexera.udf.udf_operator import _UiParameterSupport + + +class InjectedParametersOperator(UDFOperatorV2): + def _texera_injected_ui_parameters(self): + return { + "count": "7", + "enabled": "1", + "created_at": "2024-01-01T00:00:00", + } + + def open(self): + self.count_parameter = self.UiParameter("count", AttributeType.INT) + self.enabled_parameter = self.UiParameter( + name="enabled", type=AttributeType.BOOL + ) + self.created_at_parameter = self.UiParameter( + "created_at", type=AttributeType.TIMESTAMP + ) + + def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]: + yield tuple_ + + +class ConflictingParameterOperator(UDFOperatorV2): + def _texera_injected_ui_parameters(self): + return {"duplicate": "1"} + + def open(self): + self.UiParameter("duplicate", AttributeType.INT) + self.UiParameter("duplicate", AttributeType.STRING) + + def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]: + yield tuple_ + + +class RepeatedParameterOperator(UDFOperatorV2): + def _texera_injected_ui_parameters(self): + return {"duplicate": "1"} + + def open(self): + self.first_parameter = self.UiParameter("duplicate", AttributeType.INT) + self.second_parameter = self.UiParameter("duplicate", AttributeType.INT) + + def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]: + yield tuple_ + + +class FirstIndependentParameterOperator(UDFOperatorV2): + def _texera_injected_ui_parameters(self): + return {"count": "1"} + + def open(self): + self.count_parameter = self.UiParameter("count", AttributeType.INT) + + def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]: + yield tuple_ + + +class SecondIndependentParameterOperator(UDFOperatorV2): + def _texera_injected_ui_parameters(self): + return {"count": "2"} + + def open(self): + self.count_parameter = self.UiParameter("count", AttributeType.INT) + + def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]: + yield tuple_ + + +class MissingParameterOperator(UDFOperatorV2): + def _texera_injected_ui_parameters(self): + return {"sent": "1"} + + def open(self): + self.missing_parameter = self.UiParameter("missing", AttributeType.INT) + + def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]: + yield tuple_ + + +class UnusedParameterOperator(UDFOperatorV2): + def _texera_injected_ui_parameters(self): + return {"used": "1", "unused": "2"} + + def open(self): + self.used_parameter = self.UiParameter("used", AttributeType.INT) + + def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]: + yield tuple_ + + +class SuperOpenParameterOperator(UDFOperatorV2): + def __init__(self): + self.hook_call_count = 0 + + def _texera_injected_ui_parameters(self): + self.hook_call_count += 1 + return {"count": "3"} + + def open(self): + super().open() + self.count_parameter = self.UiParameter("count", AttributeType.INT) + + def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]: + yield tuple_ + + +class SuperOpenConflictingParameterOperator(UDFOperatorV2): + def _texera_injected_ui_parameters(self): + return {"duplicate": "1"} + + def open(self): + self.UiParameter("duplicate", AttributeType.INT) + super().open() + self.UiParameter("duplicate", AttributeType.STRING) + + def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]: + yield tuple_ + + +class TestUiParameterSupport: + def test_injected_values_are_applied_before_open(self): + operator = InjectedParametersOperator() + + operator.open() + + assert operator.count_parameter.value == 7 + assert operator.enabled_parameter.value is True + assert operator.created_at_parameter.value == datetime.datetime( + 2024, 1, 1, 0, 0, tzinfo=datetime.timezone.utc + ) + + def test_duplicate_parameter_names_with_conflicting_types_raise(self): + operator = ConflictingParameterOperator() + + with pytest.raises(ValueError) as exc_info: + operator.open() + + assert "Duplicate UiParameter name 'duplicate'" in str(exc_info.value) + + def test_duplicate_parameter_names_with_same_type_succeed(self): + operator = RepeatedParameterOperator() + + operator.open() + + assert operator.first_parameter.value == 1 + assert operator.second_parameter.value == 1 + assert operator.first_parameter.type is AttributeType.INT + assert operator.second_parameter.type is AttributeType.INT + + @pytest.mark.parametrize( + ("raw_value", "attr_type", "expected"), + [ + ("hello", AttributeType.STRING, "hello"), + ("7", AttributeType.INT, 7), + ("99", AttributeType.LONG, 99), + ("3.14", AttributeType.DOUBLE, 3.14), + ("1", AttributeType.BOOL, True), + ( + "2024-01-01T00:00:00", + AttributeType.TIMESTAMP, + datetime.datetime(2024, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), + ), + ( + "2024-01-01T00:00:00Z", + AttributeType.TIMESTAMP, + datetime.datetime(2024, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), + ), + ], + ) + def test_parse_supported_types(self, raw_value, attr_type, expected): + assert _UiParameterSupport._parse(raw_value, attr_type) == expected + + @pytest.mark.parametrize( + ("raw_value", "attr_type", "expected"), + [ + ("", AttributeType.INT, 0), + (" ", AttributeType.LONG, 0), + ("", AttributeType.DOUBLE, 0.0), + ( + "", + AttributeType.TIMESTAMP, + datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc), + ), + ], + ) + def test_parse_empty_values(self, raw_value, attr_type, expected): + assert _UiParameterSupport._parse(raw_value, attr_type) == expected + + def test_java_attribute_type_aliases_parse_like_python_names(self): + assert AttributeType.INTEGER is AttributeType.INT + assert AttributeType.BOOLEAN is AttributeType.BOOL + assert _UiParameterSupport._parse("7", AttributeType.INTEGER) == 7 + assert _UiParameterSupport._parse("false", AttributeType.BOOLEAN) is False + + @pytest.mark.parametrize( + ("raw_value", "expected"), + [ + ("", False), + (" ", False), + ("True", True), + ("true", True), + ("1", True), + ("1.0", True), + ("2", True), + ("-1", True), + ("False", False), + ("false", False), + ("0", False), + ("0.0", False), + ], + ) + def test_parse_bool_string_values(self, raw_value, expected): + assert _UiParameterSupport._parse(raw_value, AttributeType.BOOL) is expected + + @pytest.mark.parametrize( + ("raw_value", "attr_type", "expected_message"), + [ + ( + "payload", + AttributeType.BINARY, + "UiParameter does not support BINARY values", + ), + ( + "s3://bucket/path/to/object", + AttributeType.LARGE_BINARY, + "UiParameter does not support LARGE_BINARY values", + ), + ( + None, + AttributeType.BINARY, + "UiParameter does not support BINARY values", + ), + ], + ) + def test_parse_binary_types_raise_helpful_error( + self, raw_value, attr_type, expected_message + ): + with pytest.raises(ValueError, match=expected_message): + _UiParameterSupport._parse(raw_value, attr_type) + + def test_parse_unsupported_type_raises_helpful_error(self): + with pytest.raises(TypeError, match="UiParameter.type .* is not supported"): + _UiParameterSupport._parse("value", object()) + + def test_missing_injected_name_returns_none_and_warns(self, monkeypatch): + operator = MissingParameterOperator() + warning_calls = [] + monkeypatch.setattr( + udf_operator.logger, + "warning", + lambda msg, *args, **kwargs: warning_calls.append(msg.format(*args)), + ) + + operator.open() + + assert operator.missing_parameter.value is None + assert any( + "No injected UI parameter value found for name 'missing'" in call + for call in warning_calls + ) + + def test_unused_injected_name_warns(self, monkeypatch): + operator = UnusedParameterOperator() + warning_calls = [] + monkeypatch.setattr( + udf_operator.logger, + "warning", + lambda msg, *args, **kwargs: warning_calls.append(msg.format(*args)), + ) + + operator.open() + + assert operator.used_parameter.value == 1 + assert warning_calls == [ + "Injected UI parameter value(s) were not used: unused." + ] + + def test_ui_parameter_argument_errors(self): + operator = MissingParameterOperator() + + with pytest.raises(TypeError, match="provided multiple times"): + operator.UiParameter("count", AttributeType.INT, type=AttributeType.INT) + with pytest.raises(TypeError, match="unexpected keyword argument"): + operator.UiParameter("count", AttributeType.INT, value="1") + with pytest.raises(TypeError, match="UiParameter.type is required"): + operator.UiParameter("count") + with pytest.raises(TypeError, match="must be an AttributeType"): + operator.UiParameter("count", object()) + + def test_super_open_applies_injected_values_once(self): + operator = SuperOpenParameterOperator() + + operator.open() + + assert operator.hook_call_count == 1 + assert operator.count_parameter.value == 3 + + def test_super_open_does_not_reset_duplicate_tracking(self): + operator = SuperOpenConflictingParameterOperator() + + with pytest.raises(ValueError, match="Duplicate UiParameter name"): + operator.open() + + def test_wrapped_open_uses_instance_local_state(self): + assert ( + getattr( + FirstIndependentParameterOperator.open, + "__texera_ui_params_wrapped__", + False, + ) + is True + ) + + first_operator = FirstIndependentParameterOperator() + second_operator = SecondIndependentParameterOperator() + + first_operator.open() + second_operator.open() + + assert first_operator.count_parameter.value == 1 + assert second_operator.count_parameter.value == 2 + assert first_operator._ui_parameter_injected_values == {"count": "1"} + assert second_operator._ui_parameter_injected_values == {"count": "2"} + assert ( + first_operator._ui_parameter_injected_values + is not second_operator._ui_parameter_injected_values + ) diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjector.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjector.scala index 13c25a436ec..7b26761290e 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjector.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjector.scala @@ -85,8 +85,7 @@ object PythonUdfUiParameterInjector { private def buildInjectedHookMethod(uiParameters: List[UiUDFParameter]): String = { val injectedParametersMap = buildInjectedParametersMap(uiParameters) - (pyb"""|# Follow-up runtime support exports Dict/Any and defines the base hook that @overrides targets. - |@overrides + (pyb"""|@overrides |$InjectedUiParametersHookMethodHeader | return {""" + injectedParametersMap + diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjectorSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjectorSpec.scala index d5a2534758f..a741dfbf037 100644 --- a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjectorSpec.scala +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjectorSpec.scala @@ -104,9 +104,6 @@ class PythonUdfUiParameterInjectorSpec extends AnyFlatSpec with Matchers { val injectedCode = inject(uiParameter("date", AttributeType.TIMESTAMP, "2024-01-01T00:00:00Z")) injectedCode should include("class ProcessTupleOperator(UDFOperatorV2):") - injectedCode should include( - "# Follow-up runtime support exports Dict/Any and defines the base hook that @overrides targets." - ) injectedCode should include("def _texera_injected_ui_parameters(self) -> Dict[str, Any]:") injectedCode should include("return {") injectedCode should include("self.decode_python_template")