Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions amber/src/main/python/core/models/schema/attribute_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,11 @@ class AttributeType(Enum):

STRING = 1
INT = 2
# Attribute access needs Java names; RAW_TYPE_MAPPING handles raw schema strings.
INTEGER = 2
LONG = 3
BOOL = 4
BOOLEAN = 4
DOUBLE = 5
TIMESTAMP = 6
BINARY = 7
Expand Down Expand Up @@ -78,6 +81,47 @@ class AttributeType(Enum):
}


def _is_empty_value(v):
return v is None or (isinstance(v, str) and v.strip() == "")


def _parse_bool(v):
if _is_empty_value(v):
return False

normalized_value = str(v).strip().lower()
if normalized_value == "true":
return True
if normalized_value == "false":
return False
return float(normalized_value) != 0


def _parse_timestamp(v):
if _is_empty_value(v):
return datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)

normalized_value = str(v)
if normalized_value.endswith("Z"):
normalized_value = normalized_value[:-1] + "+00:00"
parsed_value = datetime.datetime.fromisoformat(normalized_value)
if (
parsed_value.tzinfo is None
or parsed_value.tzinfo.utcoffset(parsed_value) is None
):
return parsed_value.replace(tzinfo=datetime.timezone.utc)
return parsed_value


FROM_STRING_PARSER_MAPPING = {
AttributeType.STRING: str,
AttributeType.INT: lambda v: 0 if _is_empty_value(v) else int(v),
AttributeType.LONG: lambda v: 0 if _is_empty_value(v) else int(v),
AttributeType.DOUBLE: lambda v: 0.0 if _is_empty_value(v) else float(v),
AttributeType.BOOL: _parse_bool,
AttributeType.TIMESTAMP: _parse_timestamp,
}

# Only single-directional mapping.
TO_PYOBJECT_MAPPING = {
AttributeType.STRING: str,
Expand Down
6 changes: 5 additions & 1 deletion amber/src/main/python/pytexera/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from loguru import logger
from overrides import overrides
from typing import Iterator, Optional, Union
from typing import Iterator, Optional, Union, Dict, Any

from pyamber import *
from .storage.dataset_file_document import DatasetFileDocument
Expand All @@ -30,6 +30,7 @@
UDFSourceOperator,
)
from core.models.type.large_binary import largebinary
from core.models.schema.attribute_type import AttributeType

__all__ = [
"State",
Expand All @@ -53,4 +54,7 @@
"Iterator",
"Optional",
"Union",
"Dict",
"Any",
"AttributeType",
]
168 changes: 163 additions & 5 deletions amber/src/main/python/pytexera/udf/udf_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,170 @@
# under the License.

from abc import abstractmethod
from typing import Iterator, Optional, Union
from dataclasses import dataclass
import functools
from typing import Any, Dict, Iterator, Optional, Set, Union

from pyamber import *
from core.models.schema.attribute_type import AttributeType, FROM_STRING_PARSER_MAPPING
from loguru import logger


class UDFOperatorV2(TupleOperatorV2):
@dataclass(frozen=True)
class _UiParameterValue:
name: str
type: AttributeType
value: Any


class _UiParameterSupport:
_ui_parameter_injected_values: Dict[str, Any]
_ui_parameter_name_types: Dict[str, AttributeType]
_ui_parameter_used_names: Set[str]
_unsupported_ui_parameter_types = {
AttributeType.BINARY,
AttributeType.LARGE_BINARY,
}

# Reserved hook name. Backend injector will generate this in the user's class.
def _texera_injected_ui_parameters(self) -> Dict[str, Any]:
return {}

def _ensure_ui_parameter_state(self) -> None:
if "_ui_parameter_injected_values" not in self.__dict__:
self._ui_parameter_injected_values = {}
if "_ui_parameter_name_types" not in self.__dict__:
self._ui_parameter_name_types = {}
if "_ui_parameter_used_names" not in self.__dict__:
self._ui_parameter_used_names = set()

def _texera_apply_injected_ui_parameters(self) -> None:
self._ensure_ui_parameter_state()
values = self._texera_injected_ui_parameters()
self._ui_parameter_injected_values = dict(values or {})
self._ui_parameter_name_types = {}
self._ui_parameter_used_names = set()

def _warn_unused_injected_ui_parameters(self) -> None:
unused_names = sorted(
set(self._ui_parameter_injected_values) - self._ui_parameter_used_names
)
if unused_names:
logger.warning(
"Injected UI parameter value(s) were not used: {}.",
", ".join(unused_names),
)

def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)

original_open = getattr(cls, "open", None)
if original_open is None:
return

if getattr(original_open, "__texera_ui_params_wrapped__", False):
return

@functools.wraps(original_open)
def wrapped_open(self, *args, **kwargs):
"""Apply injected UI parameters once before the outermost open()."""
if getattr(self, "_ui_parameter_open_in_progress", False):
return original_open(self, *args, **kwargs)

self._ui_parameter_open_in_progress = True
try:
self._texera_apply_injected_ui_parameters()
result = original_open(self, *args, **kwargs)
self._warn_unused_injected_ui_parameters()
return result
finally:
self._ui_parameter_open_in_progress = False

setattr(wrapped_open, "__texera_ui_params_wrapped__", True)
cls.open = wrapped_open

def UiParameter(
self, name: str, attr_type: Optional[AttributeType] = None, **kwargs: Any
) -> _UiParameterValue:
"""
Return the current UI parameter value parsed as attr_type.

Re-reading the same name with the same type is idempotent. Reusing a
name with a different type is rejected because the parsed value would be
ambiguous.
"""
if "type" in kwargs:
if attr_type is not None:
raise TypeError("UiParameter.type was provided multiple times.")
attr_type = kwargs.pop("type")

if kwargs:
unexpected_arguments = ", ".join(sorted(kwargs))
raise TypeError(
f"UiParameter got unexpected keyword argument(s): "
f"{unexpected_arguments}."
)

if attr_type is None:
raise TypeError("UiParameter.type is required.")

if not isinstance(attr_type, AttributeType):
raise TypeError(
f"UiParameter.type must be an AttributeType, got {attr_type!r}."
)

self._ensure_ui_parameter_state()
existing_type = self._ui_parameter_name_types.get(name)
if existing_type is not None and existing_type != attr_type:
raise ValueError(
f"Duplicate UiParameter name '{name}' with conflicting types: "
f"{existing_type.name} vs {attr_type.name}."
)

self._ui_parameter_name_types[name] = attr_type
if name in self._ui_parameter_injected_values:
self._ui_parameter_used_names.add(name)
raw_value = self._ui_parameter_injected_values[name]
else:
logger.warning(
"No injected UI parameter value found for name '{}'.",
name,
)
raw_value = None

return _UiParameterValue(
name=name,
type=attr_type,
value=self._parse(raw_value, attr_type),
)

@staticmethod
def _parse(value: Any, attr_type: AttributeType) -> Any:
if attr_type in _UiParameterSupport._unsupported_ui_parameter_types:
raise ValueError(
f"UiParameter does not support {attr_type.name} values. "
"Use a supported type instead."
)

parser = FROM_STRING_PARSER_MAPPING.get(attr_type)
if parser is None:
raise TypeError(
f"UiParameter.type {attr_type!r} is not supported for parsing."
)

if value is None:
return None

try:
return parser(value)
except Exception as e:
raise ValueError(
f"Failed to parse UiParameter value {value!r} as {attr_type.name}. "
f"Please provide a valid {attr_type.name.lower()} value."
) from e


class UDFOperatorV2(_UiParameterSupport, TupleOperatorV2):
"""
Base class for tuple-oriented user-defined operators. A concrete implementation must
be provided upon using.
Expand Down Expand Up @@ -65,7 +223,7 @@ def close(self) -> None:
pass


class UDFSourceOperator(SourceOperator):
class UDFSourceOperator(_UiParameterSupport, SourceOperator):
def open(self) -> None:
"""
Open a context of the operator. Usually can be used for loading/initiating some
Expand All @@ -90,7 +248,7 @@ def close(self) -> None:
pass


class UDFTableOperator(TableOperator):
class UDFTableOperator(_UiParameterSupport, TableOperator):
"""
Base class for table-oriented user-defined operators. A concrete implementation must
be provided upon using.
Expand Down Expand Up @@ -123,7 +281,7 @@ def close(self) -> None:
pass


class UDFBatchOperator(BatchOperator):
class UDFBatchOperator(_UiParameterSupport, BatchOperator):
"""
Base class for batch-oriented user-defined operators. A concrete implementation must
be provided upon using.
Expand Down
Loading
Loading