Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@

## Bug Fixes

<!-- Here goes notable bug fixes that are worth a special mention or explanation -->
- Improved formula validation: Consistent error messages for invalid formulas and conventional span semantics.
3 changes: 0 additions & 3 deletions src/frequenz/sdk/timeseries/formulas/_base_ast_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@
class AstNode(abc.ABC, Generic[QuantityT]):
"""An abstract syntax tree node representing a formula expression."""

span: tuple[int, int] | None = None
"""The span (start, end) of the expression in the input string."""

@abc.abstractmethod
async def evaluate(self) -> Sample[QuantityT] | QuantityT | None:
"""Evaluate the expression and return its numerical value."""
Expand Down
53 changes: 53 additions & 0 deletions src/frequenz/sdk/timeseries/formulas/_exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# License: MIT
# Copyright © 2026 Frequenz Energy-as-a-Service GmbH

"""Exception classes for formulas parser and evaluator."""


class FormulaError(Exception):
"""Base class for exceptions in this module."""


class FormulaSyntaxError(FormulaError):
"""Exception raised for syntax errors in the formula."""

formula: str
span: tuple[int, int]
message: str

def __init__(
self, *, formula: str, span: tuple[int, int] | None, message: str
) -> None:
"""Initialize this instance."""
self._formula = formula
self._span = span
self._message = message

def __str__(self) -> str:
"""Return a string representation of the error.

The span (if present) will be used to highlight the error location.
For long formulas, the beginning or end will be truncated as needed.
"""
if self._span is None:
return self._message

formula_line = self._formula
span_padding = " " * self._span[0]
span_highlight = "^" * (self._span[1] - self._span[0])
error_line = f"{span_padding}{span_highlight} {self._message}"

char_limit: int = 80

if len(error_line) > char_limit:
# Remove characters from the left to fit error line within char limit
num_chars_to_truncate = len(error_line) - char_limit
error_line = error_line[num_chars_to_truncate:]
# Shift formula line by the same amount and insert leading ellipsis
formula_line = f"... {formula_line[num_chars_to_truncate+4:]}"

if len(formula_line) > char_limit:
# Truncate the formula line, insert ellipsis
formula_line = f"{formula_line[:char_limit-4]} ..."

return f"{formula_line}\n{error_line}"
21 changes: 8 additions & 13 deletions src/frequenz/sdk/timeseries/formulas/_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,19 +85,14 @@ async def unsubscribe(self) -> None:
)

@classmethod
def from_string(
cls, name: str, params: list[AstNode[QuantityT]]
) -> Function[QuantityT]:
"""Create a function instance from its name."""
match name.upper():
case "COALESCE":
return Coalesce(params)
case "MAX":
return Max(params)
case "MIN":
return Min(params)
case _:
raise ValueError(f"Unknown function name: {name}")
def function_class_by_name(cls, name: str) -> type[Function[QuantityT]] | None:
"""Return the function class corresponding to the given name."""
known_functions: dict[str, type[Function[QuantityT]]] = {
"COALESCE": Coalesce,
"MAX": Max,
"MIN": Min,
}
return known_functions.get(name.upper())


@dataclass
Expand Down
38 changes: 22 additions & 16 deletions src/frequenz/sdk/timeseries/formulas/_lexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from typing_extensions import override

from . import _token
from ._exceptions import FormulaSyntaxError
from ._peekable import Peekable


Expand Down Expand Up @@ -74,53 +75,58 @@ def __next__(self) -> _token.Token: # pylint: disable=too-many-branches
_ = next(self._iter) # consume '#'
comp_id = self._read_integer()
if not comp_id:
raise ValueError(f"Expected integer after '#' at position {pos}")
end_pos = pos + len(comp_id)
raise FormulaSyntaxError(
formula=self._formula,
span=(pos + 1, pos + 2),
message="Expected integer",
)
end_pos = pos + len(comp_id) + 1 # account for '#'
return _token.Component(
span=(
pos + 1,
end_pos + 1, # account for '#'
),
span=(pos, end_pos),
id=comp_id,
value=self._formula[pos:end_pos],
)

if char == "+":
_, char = next(self._iter) # consume operator
return _token.Plus(span=(pos + 1, pos + 1), value=char)
return _token.Plus(span=(pos, pos + 1), value=char)

if char == "-":
_, char = next(self._iter)
return _token.Minus(span=(pos + 1, pos + 1), value=char)
return _token.Minus(span=(pos, pos + 1), value=char)

if char == "*":
_, char = next(self._iter)
return _token.Mul(span=(pos + 1, pos + 1), value=char)
return _token.Mul(span=(pos, pos + 1), value=char)

if char == "/":
_, char = next(self._iter)
return _token.Div(span=(pos + 1, pos + 1), value=char)
return _token.Div(span=(pos, pos + 1), value=char)

if char == "(":
_, char = next(self._iter)
return _token.OpenParen(span=(pos + 1, pos + 1), value=char)
return _token.OpenParen(span=(pos, pos + 1), value=char)

if char == ")":
_, char = next(self._iter)
return _token.CloseParen(span=(pos + 1, pos + 1), value=char)
return _token.CloseParen(span=(pos, pos + 1), value=char)

if char == ",":
_, char = next(self._iter)
return _token.Comma(span=(pos + 1, pos + 1), value=char)
return _token.Comma(span=(pos, pos + 1), value=char)

if char.isdigit():
num = self._read_number()
end_pos = pos + len(num)
return _token.Number(span=(pos + 1, end_pos), value=num)
return _token.Number(span=(pos, end_pos), value=num)

if char.isalpha():
symbol = self._read_symbol()
end_pos = pos + len(symbol)
return _token.Symbol(span=(pos + 1, end_pos), value=symbol)
return _token.Symbol(span=(pos, end_pos), value=symbol)

raise ValueError(f"Unexpected character '{char}' at position {pos}")
raise FormulaSyntaxError(
formula=self._formula,
span=(pos, pos + 1),
message="Unexpected character",
)
103 changes: 72 additions & 31 deletions src/frequenz/sdk/timeseries/formulas/_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from . import _ast, _token
from ._base_ast_node import AstNode
from ._exceptions import FormulaSyntaxError
from ._formula import Formula
from ._functions import FunCall, Function
from ._lexer import Lexer
Expand Down Expand Up @@ -65,6 +66,7 @@ def __init__(
):
"""Initialize the parser."""
self._name: str = name
self._formula: str = formula
self._lexer: Peekable[_token.Token] = Peekable(Lexer(formula))
self._telemetry_fetcher: ResampledStreamFetcher = telemetry_fetcher
self._create_method: Callable[[float], QuantityT] = create_method
Expand All @@ -80,14 +82,16 @@ def _parse_term(self) -> AstNode[QuantityT] | None:
next_factor = self._parse_factor()

if next_factor is None:
raise ValueError(
f"Expected factor after operator at span: {token.span}"
raise FormulaSyntaxError(
formula=self._formula,
span=(token.span[1], token.span[1] + 1),
message="Expected expression",
)

if isinstance(token, _token.Plus):
factor = _ast.Add(span=token.span, left=factor, right=next_factor)
factor = _ast.Add(left=factor, right=next_factor)
elif isinstance(token, _token.Minus):
factor = _ast.Sub(span=token.span, left=factor, right=next_factor)
factor = _ast.Sub(left=factor, right=next_factor)

token = self._lexer.peek()

Expand All @@ -104,12 +108,16 @@ def _parse_factor(self) -> AstNode[QuantityT] | None:
token = next(self._lexer)
next_unary = self._parse_unary()
if next_unary is None:
raise ValueError(f"Expected unary after operator at span: {token.span}")
raise FormulaSyntaxError(
formula=self._formula,
span=(token.span[1], token.span[1] + 1),
message="Expected expression",
)

if isinstance(token, _token.Mul):
unary = _ast.Mul(span=token.span, left=unary, right=next_unary)
unary = _ast.Mul(left=unary, right=next_unary)
elif isinstance(token, _token.Div):
unary = _ast.Div(span=token.span, left=unary, right=next_unary)
unary = _ast.Div(left=unary, right=next_unary)

token = self._lexer.peek()

Expand All @@ -121,12 +129,14 @@ def _parse_unary(self) -> AstNode[QuantityT] | None:
token = next(self._lexer)
primary: AstNode[QuantityT] | None = self._parse_primary()
if primary is None:
raise ValueError(
f"Expected primary expression after unary '-' at position {token.span}"
raise FormulaSyntaxError(
formula=self._formula,
span=(token.span[1], token.span[1] + 1),
message="Expected expression",
)

zero_const = _ast.Constant(span=token.span, value=self._create_method(0.0))
return _ast.Sub(span=token.span, left=zero_const, right=primary)
zero_const = _ast.Constant(value=self._create_method(0.0))
return _ast.Sub(left=zero_const, right=primary)

return self._parse_primary()

Expand All @@ -136,50 +146,80 @@ def _parse_bracketed(self) -> AstNode[QuantityT] | None:

expr: AstNode[QuantityT] | None = self._parse_term()
if expr is None:
raise ValueError(f"Expected expression after '(' at position {oparen.span}")
raise FormulaSyntaxError(
formula=self._formula,
span=(oparen.span[1], oparen.span[1] + 1),
message="Expected expression",
)

token: _token.Token | None = self._lexer.peek()
if token is None or not isinstance(token, _token.CloseParen):
raise ValueError(f"Expected ')' after expression at position {expr.span}")
raise FormulaSyntaxError(
formula=self._formula,
span=oparen.span,
message="Unmatched parenthesis",
)

_ = next(self._lexer) # consume ')'

return expr

def _parse_function_call(self) -> AstNode[QuantityT] | None:
fn_name: _token.Token = next(self._lexer)
function_class: type[Function[QuantityT]] | None = (
Function.function_class_by_name(fn_name.value)
)

if function_class is None:
raise FormulaSyntaxError(
formula=self._formula,
span=fn_name.span,
message="Unknown function name",
)

params: list[AstNode[QuantityT]] = []

token: _token.Token | None = self._lexer.peek()
if token is None or not isinstance(token, _token.OpenParen):
raise ValueError(
f"Expected '(' after function name at position {fn_name.span}"
raise FormulaSyntaxError(
formula=self._formula,
span=(fn_name.span[1], fn_name.span[1] + 1),
message="Expected '('",
)
oparen = next(self._lexer) # consume '('

_ = next(self._lexer) # consume '('
while True:
param = self._parse_term()
if param is None:
raise ValueError(
f"Expected argument in function call at position {fn_name.span}"
raise FormulaSyntaxError(
formula=self._formula,
span=(token.span[1], token.span[1] + 1),
message="Expected argument",
)
params.append(param)

token = self._lexer.peek()
if token is not None and isinstance(token, _token.Comma):
if token is None:
raise FormulaSyntaxError(
formula=self._formula,
span=oparen.span,
message="Unmatched parenthesis",
)

if isinstance(token, _token.Comma):
_ = next(self._lexer) # consume ','
continue
if token is not None and isinstance(token, _token.CloseParen):
if isinstance(token, _token.CloseParen):
_ = next(self._lexer) # consume ')'
break
raise ValueError(
f"Expected ',' or ')' in function call at position {fn_name.span}"

raise FormulaSyntaxError(
formula=self._formula,
span=token.span,
message="Expected ',' or ')'",
)

return FunCall(
span=fn_name.span,
function=Function.from_string(fn_name.value, params),
)
return FunCall(function=function_class(params))

def _parse_primary(self) -> AstNode[QuantityT] | None:
token: _token.Token | None = self._lexer.peek()
Expand All @@ -194,7 +234,6 @@ def make_component_stream_fetcher(
if isinstance(token, _token.Component):
_ = next(self._lexer) # consume token
comp = _ast.TelemetryStream(
span=token.span,
source=f"#{token.id}",
metric_fetcher=make_component_stream_fetcher(
self._telemetry_fetcher, ComponentId(int(token.id))
Expand All @@ -205,9 +244,7 @@ def make_component_stream_fetcher(

if isinstance(token, _token.Number):
_ = next(self._lexer)
return _ast.Constant(
span=token.span, value=self._create_method(float(token.value))
)
return _ast.Constant(value=self._create_method(float(token.value)))

if isinstance(token, _token.OpenParen):
return self._parse_bracketed()
Expand All @@ -220,7 +257,11 @@ def make_component_stream_fetcher(
def parse(self) -> Formula[QuantityT]:
expr = self._parse_term()
if expr is None:
raise ValueError("Empty formula.")
raise FormulaSyntaxError(
formula=self._formula,
span=None,
message="Empty formula",
)
return Formula(
name=self._name,
root=expr,
Expand Down
Loading