From 65c766a0dcefe30934bf953b8ce075634259cb58 Mon Sep 17 00:00:00 2001 From: "tom.bonfert" Date: Fri, 29 May 2026 15:59:31 +0200 Subject: [PATCH 1/7] added channel mapping resolution table to gold layer. --- .../metadata/time_series_expression.py | 44 +++++- .../analyze/query/query_builder.py | 36 +---- .../query/solvers/key_value_store_solver.py | 41 ++++- .../analyze/query/solvers/query_solver.py | 8 +- src/impulse_reporting/core/report.py | 47 +++++- .../meta/container_dimensions.py | 88 +++++++++++ .../persist/report_storage.py | 83 ++++++++++ .../delta_solver_column_mapping_test.py | 9 +- .../solvers/key_value_store_alias_test.py | 83 +++++++++- .../key_value_store_solver_wide_only_test.py | 5 +- ...annel_mapping_resolution_dimension_test.py | 146 ++++++++++++++++++ 11 files changed, 540 insertions(+), 50 deletions(-) create mode 100644 tests/impulse_reporting/unit/meta/channel_mapping_resolution_dimension_test.py diff --git a/src/impulse_query_engine/analyze/metadata/time_series_expression.py b/src/impulse_query_engine/analyze/metadata/time_series_expression.py index 7df22c1..ee49a70 100644 --- a/src/impulse_query_engine/analyze/metadata/time_series_expression.py +++ b/src/impulse_query_engine/analyze/metadata/time_series_expression.py @@ -3,7 +3,7 @@ import abc import operator import zlib -from collections.abc import Callable +from collections.abc import Callable, Iterable from typing import TYPE_CHECKING, Any import pyspark.sql.types as T @@ -197,6 +197,48 @@ def get_selectors(self) -> list["TimeSeriesSelector"]: """ pass + @staticmethod + def collect_selectors( + expressions: Iterable[Any], + uses_alias: bool | None = None, + ) -> list["TimeSeriesSelector"]: + """Collect deduplicated leaf selectors from a list of expressions. + + Iterates each item, skips anything that isn't a + :class:`TimeSeriesExpression`, walks ``get_selectors()``, applies + an optional ``uses_alias`` filter, and deduplicates by + ``selector_id`` preserving discovery order. + + Parameters + ---------- + expressions : Iterable[Any] + Items to walk; non-``TimeSeriesExpression`` entries are + silently skipped (e.g. the ``selections`` list on a + ``QueryBuilder`` may carry other selector kinds). + uses_alias : bool or None, optional + When ``True``, keep only alias selectors; when ``False``, + keep only direct selectors; when ``None`` (default), keep + all. + + Returns + ------- + list of TimeSeriesSelector + Deduplicated selectors in discovery order. + """ + selectors: list["TimeSeriesSelector"] = [] + seen_ids: set = set() + for expression in expressions: + if not isinstance(expression, TimeSeriesExpression): + continue + for selector in expression.get_selectors(): + if uses_alias is not None and selector.uses_alias != uses_alias: + continue + if selector.selector_id in seen_ids: + continue + seen_ids.add(selector.selector_id) + selectors.append(selector) + return selectors + @abc.abstractmethod def __str__(self) -> str: """ diff --git a/src/impulse_query_engine/analyze/query/query_builder.py b/src/impulse_query_engine/analyze/query/query_builder.py index 24f5d46..ba54910 100644 --- a/src/impulse_query_engine/analyze/query/query_builder.py +++ b/src/impulse_query_engine/analyze/query/query_builder.py @@ -174,34 +174,6 @@ def select(self, *args) -> Self: self.selections = list(args) return self - def _collect_time_series_selectors(self, uses_alias=None) -> list[TimeSeriesSelector]: - """Collect deduplicated leaf selectors from this query's selections. - - Parameters - ---------- - uses_alias : bool or None, optional - When ``True``, keep only alias selectors; when ``False``, keep - only direct selectors; when ``None`` (default), keep all. - - Returns - ------- - list of TimeSeriesSelector - Deduplicated selectors in discovery order. - """ - selectors = [] - seen_selector_ids = set() - for expression in self.selections: - if not isinstance(expression, TimeSeriesExpression): - continue - for selector in expression.get_selectors(): - if uses_alias is not None and selector.uses_alias != uses_alias: - continue - if selector.selector_id in seen_selector_ids: - continue - seen_selector_ids.add(selector.selector_id) - selectors.append(selector) - return selectors - def _determine_result_objects_dtypes(self, default_dtype: T = T.DoubleType()): """ Determine result objects and their data types for the selections. @@ -261,8 +233,12 @@ def solve( ) = self._determine_result_objects_dtypes() # extract selectors upfront - direct_selectors = self._collect_time_series_selectors(uses_alias=False) - aliased_selectors = self._collect_time_series_selectors(uses_alias=True) + direct_selectors = TimeSeriesExpression.collect_selectors( + self.selections, uses_alias=False + ) + aliased_selectors = TimeSeriesExpression.collect_selectors( + self.selections, uses_alias=True + ) # create Query tags_df = solver.filter_container_tags(spark, self) diff --git a/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py b/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py index d5e5e5c..7c5eba1 100644 --- a/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py +++ b/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py @@ -389,10 +389,15 @@ def filter_aliased_channel_metrics( Returns ------- pyspark.sql.DataFrame - DataFrame with ``(container_id, channel_id, selector_ids)`` - where ``selector_ids`` is an array column. When unit conversion - is active (see above), also carries ``source_unit`` and - ``target_unit`` columns. + DataFrame with + ``(container_id, channel_id, , + channel_alias, alias_priority, selector_ids)`` where + ``selector_ids`` is an array column. The metrics-side join key + columns come from ``effective_alias_join_keys`` (default: + ``channel_name``, ``data_key``) and are deduplicated in case the + same physical column appears on both sides of a join-key tuple. + When unit conversion is active (see above), also carries + ``source_unit`` and ``target_unit`` columns. """ container_id_col = self.config.container_id_col channel_id_col = self.config.channel_id_col @@ -488,7 +493,17 @@ def filter_aliased_channel_metrics( resolved = resolved.withColumn( "selector_ids", F.array(self._build_selector_id_expr(selectors)) ) - out_cols = [container_id_col, channel_id_col, "selector_ids"] + join_key_metrics_cols = list( + dict.fromkeys(metrics_col for _, metrics_col in join_keys) + ) + out_cols = [ + container_id_col, + channel_id_col, + *join_key_metrics_cols, + channel_alias_col, + alias_priority_col, + "selector_ids", + ] if has_unit_cols: out_cols.extend([source_unit_col, target_unit_col]) return resolved.select(*out_cols) @@ -543,8 +558,22 @@ def resolve_channel_selections( and target_unit_col in aliased_channel_metrics_df.columns ) + # ``filter_aliased_channel_metrics`` emits extra columns + # (metrics-side join keys, channel_alias, alias_priority) for the + # channel mapping resolution dimension; the solve pipeline only + # consumes (container_id, channel_id, selector_ids[, source_unit, + # target_unit]) and unionByName requires matching schemas. + aliased_solve_cols = [ + self.config.container_id_col, + self.config.channel_id_col, + "selector_ids", + ] + if has_unit_cols: + aliased_solve_cols.extend([source_unit_col, target_unit_col]) + aliased_for_union = aliased_channel_metrics_df.select(*aliased_solve_cols) + merged = channel_metrics_df.unionByName( - aliased_channel_metrics_df, allowMissingColumns=has_unit_cols + aliased_for_union, allowMissingColumns=has_unit_cols ) agg_exprs = [F.flatten(F.collect_list("selector_ids")).alias("selector_ids")] diff --git a/src/impulse_query_engine/analyze/query/solvers/query_solver.py b/src/impulse_query_engine/analyze/query/solvers/query_solver.py index 1cdce84..22091a1 100644 --- a/src/impulse_query_engine/analyze/query/solvers/query_solver.py +++ b/src/impulse_query_engine/analyze/query/solvers/query_solver.py @@ -238,8 +238,12 @@ def filter_aliased_channel_metrics( Returns ------- pyspark.sql.DataFrame - DataFrame with ``(container_id, channel_id, selector_ids)`` - where ``selector_ids`` is an array column. + DataFrame with + ``(container_id, channel_id, , + channel_alias, alias_priority, selector_ids)`` where + ``selector_ids`` is an array column. Implementations that + support unit conversion additionally include ``source_unit`` + and ``target_unit`` columns. """ raise NotImplementedError( f"{self.__class__.__name__} does not support aliased channel resolution" diff --git a/src/impulse_reporting/core/report.py b/src/impulse_reporting/core/report.py index 7b73278..eff6619 100644 --- a/src/impulse_reporting/core/report.py +++ b/src/impulse_reporting/core/report.py @@ -38,7 +38,10 @@ from impulse_reporting.incremental.definition_hash_comparator import ( DefinitionHashComparator, ) -from impulse_reporting.meta.container_dimensions import ContainerDimension +from impulse_reporting.meta.container_dimensions import ( + ChannelMappingResolutionDimension, + ContainerDimension, +) from impulse_reporting.persist.report_storage import ( ReportEntityTransformer, Sink, @@ -96,6 +99,7 @@ def __init__( self.aggregation_dfs = {} self.aggregation_metadata_dfs = {} self.container_dimension_df = None + self.channel_mapping_resolution_dimension_df = None self._is_incremental = None if config: @@ -591,6 +595,12 @@ def _persist_full(self): uri = writer.get_output_uri() writer.write(self.container_dimension_df, uri=uri) + # persist channel mapping resolution dimension + if self.channel_mapping_resolution_dimension_df is not None: + writer = storage_factory.create_channel_mapping_resolution_dimension_writer() + uri = writer.get_output_uri() + writer.write(self.channel_mapping_resolution_dimension_df, uri=uri) + @telemetry_logger("report", "determine_report") def _persist_incremental( self, @@ -738,6 +748,25 @@ def _persist_incremental( df_enriched = self.container_dimension_df.transform(transformer.add_meta_information) self.sink.upsert(df_enriched, uri, ["container_id"]) + # Persist channel mapping resolution dimension + # (upsert by container_id, channel_id, channel_alias) + if self.channel_mapping_resolution_dimension_df is not None: + writer = storage_factory.create_channel_mapping_resolution_dimension_writer() + uri = writer.get_output_uri() + df_enriched = self.channel_mapping_resolution_dimension_df.transform( + transformer.add_meta_information + ) + solver_cfg = self.solver.config + self.sink.upsert( + df_enriched, + uri, + [ + solver_cfg.container_id_col, + solver_cfg.channel_id_col, + solver_cfg.channel_alias_col, + ], + ) + def _transform_for_persistence( self, df: DataFrame, @@ -1001,6 +1030,22 @@ def determine_report(self, is_incremental: bool = None): pre_filtered_containers_df=pre_filtered_containers_df, ) + # Determine channel mapping resolution dimension + aliased_selectors = TimeSeriesExpression.collect_selectors( + all_changed_expressions + all_unchanged_expressions, + uses_alias=True, + ) + self.channel_mapping_resolution_dimension_df = ( + ChannelMappingResolutionDimension.get_dimension( + spark=self.spark, + query=self.query, + solver=self.solver, + config=self.config, + aliased_selectors=aliased_selectors, + pre_filtered_containers_df=pre_filtered_containers_df, + ) + ) + def _resolve_is_incremental(self, is_incremental: bool = None) -> bool: """ Resolve the processing mode considering signature, config, and gold layer. diff --git a/src/impulse_reporting/meta/container_dimensions.py b/src/impulse_reporting/meta/container_dimensions.py index 7e254fe..18fb1bd 100644 --- a/src/impulse_reporting/meta/container_dimensions.py +++ b/src/impulse_reporting/meta/container_dimensions.py @@ -3,6 +3,9 @@ import pyspark.sql.functions as F from pyspark.sql import DataFrame, SparkSession +from impulse_query_engine.analyze.metadata.time_series_expression import ( + TimeSeriesSelector, +) from impulse_query_engine.analyze.query.query_builder import QueryBuilder from impulse_query_engine.analyze.query.solvers.query_solver import QuerySolver from impulse_reporting.config.config_parser import ImpulseConfig @@ -102,3 +105,88 @@ def _(df: DataFrame) -> DataFrame: return df.withColumn("config_hash", F.hash(F.lit(config_hash))) return _ + + +class ChannelMappingResolutionDimension: + """Helper class to handle the channel mapping resolution dimension. + + Persists the result of + :meth:`QuerySolver.filter_aliased_channel_metrics` so downstream BI + consumers can join on ``(container_id, channel_id, channel_alias)`` + to recover the physical join keys, alias priority, and resolved unit + pair. + """ + + @staticmethod + def get_dimension( + spark: SparkSession, + query: QueryBuilder, + solver: QuerySolver, + config: ImpulseConfig, + aliased_selectors: list[TimeSeriesSelector], + pre_filtered_containers_df: DataFrame = None, + ) -> DataFrame | None: + """ + Compute the channel mapping resolution dimension for the report. + + Returns ``None`` when the dimension cannot be built — either the + solver does not override ``filter_aliased_channel_metrics`` (e.g. + ``DeltaSolver``, ``BlobSolver``, ``InMemorySolver``) or the report + has no aliased selectors. In both cases the persist step should be + a no-op. + + When buildable, runs the solver's container-side filter pipeline + (``filter_container_tags`` → ``filter_container_metrics``) so the + result honors ``pre_filtered_containers_df`` for incremental mode, + then calls ``filter_aliased_channel_metrics`` with the aliased + selectors collected from the report's events and aggregations. + The internal ``selector_ids`` column is dropped since it is a + runtime artifact and not part of the dimension contract. + + Parameters + ---------- + spark : SparkSession + Spark session for data processing. + query : QueryBuilder + The query builder used for the report. + solver : QuerySolver + The solver instance to use for query execution. + config : ImpulseConfig + The configuration object; used to attach ``config_hash``. + aliased_selectors : list[TimeSeriesSelector] + Aliased selectors (``uses_alias=True``) collected from the + report's events and aggregations. May be empty. + pre_filtered_containers_df : DataFrame, optional + Pre-filtered containers for incremental processing. + + Returns + ------- + DataFrame or None + DataFrame with columns + ``(container_id, channel_id, , + channel_alias, alias_priority[, source_unit, target_unit], + config_hash)``, or ``None`` if the dimension is not + applicable. + """ + if ( + type(solver).filter_aliased_channel_metrics + is QuerySolver.filter_aliased_channel_metrics + ): + return None + + if not aliased_selectors: + return None + + container_tags_df = solver.filter_container_tags(spark, query) + container_df = solver.filter_container_metrics( + spark, query, container_tags_df, pre_filtered_containers_df + ) + + resolved = solver.filter_aliased_channel_metrics( + spark, query.db, container_df, aliased_selectors + ) + + if "selector_ids" in resolved.columns: + resolved = resolved.drop("selector_ids") + + return resolved.transform(ContainerDimension._add_config_hash(config)) diff --git a/src/impulse_reporting/persist/report_storage.py b/src/impulse_reporting/persist/report_storage.py index 9181012..fc4fb1a 100644 --- a/src/impulse_reporting/persist/report_storage.py +++ b/src/impulse_reporting/persist/report_storage.py @@ -71,6 +71,18 @@ def get_output_uri_measurement_dimensions_table(self) -> str: """ pass + @abstractmethod + def get_output_uri_channel_mapping_resolution_dimension_table(self) -> str: + """ + Get the output URI for the channel mapping resolution dimension table. + + Returns + ------- + str + The output URI for the channel mapping resolution dimension table. + """ + pass + @dataclass() class UnitySinkConfig(SinkConfig): @@ -148,6 +160,29 @@ def get_output_uri_measurement_dimensions_table(self) -> str: uri = f"{self.catalog_name}.{self.schema_name}.measurement_dimension" return uri + def get_output_uri_channel_mapping_resolution_dimension_table(self) -> str: + """ + Get the output URI for the channel mapping resolution dimension table + in Unity Catalog format. + + Returns + ------- + str + The Unity Catalog URI for the channel mapping resolution dimension + table. + """ + if self.table_prefix: + uri = ( + f"{self.catalog_name}.{self.schema_name}." + f"{self.table_prefix}_channel_mapping_resolution_dimension" + ) + else: + uri = ( + f"{self.catalog_name}.{self.schema_name}." + "channel_mapping_resolution_dimension" + ) + return uri + class Sink(ABC): """ @@ -600,6 +635,39 @@ def get_output_uri(self) -> str: return self.sink.config.get_output_uri_measurement_dimensions_table() +class ChannelMappingResolutionDimensionWriter: + """Writer for the channel mapping resolution dimension.""" + + def __init__(self, sink: Sink, transformer: ReportEntityTransformer): + self.sink = sink + self.transformer = transformer + + def write(self, df: DataFrame, uri: str): + """ + Write channel mapping resolution dimension to the sink. + + Parameters + ---------- + df : DataFrame + DataFrame containing the channel mapping resolution dimension. + uri : str + The destination URI. + """ + df_enriched = df.transform(self.transformer.add_meta_information) + self.sink.store(df_enriched, uri) + + def get_output_uri(self) -> str: + """ + Get the output URI for the channel mapping resolution dimension table. + + Returns + ------- + str + The output URI for the channel mapping resolution dimension table. + """ + return self.sink.config.get_output_uri_channel_mapping_resolution_dimension_table() + + class WriterFactory: """ Factory class to create report entity writers. @@ -654,3 +722,18 @@ def create_container_dimension_writer(self) -> ContainerDimensionWriter: A writer configured for measurement dimensions. """ return ContainerDimensionWriter(self.sink, self._default_transformer) + + def create_channel_mapping_resolution_dimension_writer( + self, + ) -> ChannelMappingResolutionDimensionWriter: + """ + Create a writer for the channel mapping resolution dimension. + + Returns + ------- + ChannelMappingResolutionDimensionWriter + A writer configured for the channel mapping resolution dimension. + """ + return ChannelMappingResolutionDimensionWriter( + self.sink, self._default_transformer + ) diff --git a/tests/impulse_query_engine/unit/analyze/query/solvers/delta_solver_column_mapping_test.py b/tests/impulse_query_engine/unit/analyze/query/solvers/delta_solver_column_mapping_test.py index c1c7e00..5a90931 100644 --- a/tests/impulse_query_engine/unit/analyze/query/solvers/delta_solver_column_mapping_test.py +++ b/tests/impulse_query_engine/unit/analyze/query/solvers/delta_solver_column_mapping_test.py @@ -25,6 +25,9 @@ from pyspark.sql import SparkSession from impulse_query_engine.analyze.metadata.tag_expression import TagSelector +from impulse_query_engine.analyze.metadata.time_series_expression import ( + TimeSeriesExpression, +) from impulse_query_engine.analyze.query.solvers.delta_solver import DeltaSolver from impulse_query_engine.analyze.query.solvers.solver_config import ( SolverConfig, @@ -351,7 +354,7 @@ def test_filter_channel_tags_uses_renames(self, spark, db_custom_channel_tags): query.select(query.channel(channel_name="Engine RPM")) tags_df = solver.filter_container_tags(spark, query) container_df = solver.filter_container_metrics(spark, query, tags_df) - selectors = query._collect_time_series_selectors(uses_alias=False) + selectors = TimeSeriesExpression.collect_selectors(query.selections, uses_alias=False) result = solver.filter_channel_tags(spark, db_custom_channel_tags, container_df, selectors) assert {"container_id", "channel_id", "selector_id"}.issubset(set(result.columns)) # Three channels named "Engine RPM" across containers 1, 2, 3 @@ -364,7 +367,7 @@ def test_default_config_fails_on_renamed_channel_tags(self, spark, db_custom_cha query.select(query.channel(channel_name="Engine RPM")) tags_df = solver.filter_container_tags(spark, query) container_df = solver.filter_container_metrics(spark, query, tags_df) - selectors = query._collect_time_series_selectors(uses_alias=False) + selectors = TimeSeriesExpression.collect_selectors(query.selections, uses_alias=False) with pytest.raises(AnalysisException): solver.filter_channel_tags( spark, db_custom_channel_tags, container_df, selectors @@ -398,7 +401,7 @@ def test_filter_channel_metrics_uses_renames(self, spark, db_custom_channel_metr query.select(query.channel(channel_name="Engine RPM")) tags_df = solver.filter_container_tags(spark, query) container_df = solver.filter_container_metrics(spark, query, tags_df) - selectors = query._collect_time_series_selectors(uses_alias=False) + selectors = TimeSeriesExpression.collect_selectors(query.selections, uses_alias=False) ch_tags_df = solver.filter_channel_tags( spark, db_custom_channel_metrics, container_df, selectors ) diff --git a/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_alias_test.py b/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_alias_test.py index e229b4f..4c76378 100644 --- a/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_alias_test.py +++ b/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_alias_test.py @@ -7,6 +7,9 @@ import pytest from pyspark.sql import SparkSession +from impulse_query_engine.analyze.metadata.time_series_expression import ( + TimeSeriesExpression, +) from impulse_query_engine.analyze.query.solvers.key_value_store_solver import ( KeyValueStoreSolver, ) @@ -40,7 +43,7 @@ def test_no_aliased_selections_returns_empty( query.select(query.channel(channel_name="Engine RPM", data_key="TM")) container_df = _filtered_containers(spark, key_value_store_alias_db, solver, query) - selectors = query._collect_time_series_selectors(uses_alias=True) + selectors = TimeSeriesExpression.collect_selectors(query.selections, uses_alias=True) result = solver.filter_aliased_channel_metrics(spark, query.db, container_df, selectors) assert result.columns == ["container_id", "channel_id", "selector_ids"] @@ -62,7 +65,7 @@ def test_alias_resolves_to_correct_channels( query.select(engine_speed) container_df = _filtered_containers(spark, key_value_store_alias_db, solver, query) - selectors = query._collect_time_series_selectors(uses_alias=True) + selectors = TimeSeriesExpression.collect_selectors(query.selections, uses_alias=True) result = solver.filter_aliased_channel_metrics(spark, query.db, container_df, selectors) rows = { @@ -89,7 +92,7 @@ def test_alias_scoped_by_project_id( query.select(query.channel_with_alias(channel_alias="engine_speed")) container_df = _filtered_containers(spark, key_value_store_alias_db, solver, query) - selectors = query._collect_time_series_selectors(uses_alias=True) + selectors = TimeSeriesExpression.collect_selectors(query.selections, uses_alias=True) result = solver.filter_aliased_channel_metrics(spark, query.db, container_df, selectors) assert result.count() == 0 @@ -111,7 +114,7 @@ def test_alias_scoped_by_toolbox_id( query.select(query.channel_with_alias(channel_alias="engine_speed")) container_df = _filtered_containers(spark, key_value_store_alias_db, solver, query) - selectors = query._collect_time_series_selectors(uses_alias=True) + selectors = TimeSeriesExpression.collect_selectors(query.selections, uses_alias=True) result = solver.filter_aliased_channel_metrics(spark, query.db, container_df, selectors) assert result.count() == 0 @@ -132,12 +135,80 @@ def test_selector_id_consistent_for_same_expression( query.select(engine_speed) container_df = _filtered_containers(spark, key_value_store_alias_db, solver, query) - selectors = query._collect_time_series_selectors(uses_alias=True) + selectors = TimeSeriesExpression.collect_selectors(query.selections, uses_alias=True) result = solver.filter_aliased_channel_metrics(spark, query.db, container_df, selectors) selector_ids = {row.selector_ids[0] for row in result.collect()} assert selector_ids == {engine_speed.selector_id} + def test_output_columns_default_join_keys( + self, spark: SparkSession, key_value_store_alias_db: MeasurementDB + ): + """Default join keys → output carries channel_name, data_key, channel_alias, priority.""" + solver = KeyValueStoreSolver( + spark, + config=SolverConfig( + project_id="SAMPLE_PROJECT", + container_metrics=TableConfig(column_name_mapping={"project": "project_id"}), + channel_mapping=ChannelMappingConfig(filters={"toolbox_id": "container_concept"}), + ), + ) + query = key_value_store_alias_db.query + query.select(query.channel_with_alias(channel_alias="engine_speed")) + + container_df = _filtered_containers(spark, key_value_store_alias_db, solver, query) + selectors = TimeSeriesExpression.collect_selectors(query.selections, uses_alias=True) + result = solver.filter_aliased_channel_metrics(spark, query.db, container_df, selectors) + + assert result.columns == [ + "container_id", + "channel_id", + "channel_name", + "data_key", + "channel_alias", + "priority", + "selector_ids", + ] + rows = result.collect() + assert len(rows) > 0 + row = rows[0] + assert row.channel_alias == "engine_speed" + assert row.channel_name in {"Engine RPM", "EngSpd"} + assert row.data_key in {"TM", "ProjSpecREC_10Hz"} + + def test_output_columns_single_join_key( + self, spark: SparkSession, key_value_store_alias_db: MeasurementDB + ): + """Single-column join_keys override → only that metrics column surfaces.""" + solver = KeyValueStoreSolver( + spark, + config=SolverConfig( + project_id="SAMPLE_PROJECT", + container_metrics=TableConfig(column_name_mapping={"project": "project_id"}), + channel_mapping=ChannelMappingConfig( + filters={"toolbox_id": "container_concept"}, + join_keys=[ + JoinKey(mapping_col="source_channel", metrics_col="channel_name"), + ], + ), + ), + ) + query = key_value_store_alias_db.query + query.select(query.channel_with_alias(channel_alias="engine_speed")) + + container_df = _filtered_containers(spark, key_value_store_alias_db, solver, query) + selectors = TimeSeriesExpression.collect_selectors(query.selections, uses_alias=True) + result = solver.filter_aliased_channel_metrics(spark, query.db, container_df, selectors) + + assert result.columns == [ + "container_id", + "channel_id", + "channel_name", + "channel_alias", + "priority", + "selector_ids", + ] + def test_multiple_aliases(self, spark: SparkSession, key_value_store_alias_db: MeasurementDB): solver = KeyValueStoreSolver( spark, @@ -153,7 +224,7 @@ def test_multiple_aliases(self, spark: SparkSession, key_value_store_alias_db: M query.select(engine_speed, vehicle_speed) container_df = _filtered_containers(spark, key_value_store_alias_db, solver, query) - selectors = query._collect_time_series_selectors(uses_alias=True) + selectors = TimeSeriesExpression.collect_selectors(query.selections, uses_alias=True) result = solver.filter_aliased_channel_metrics(spark, query.db, container_df, selectors) selector_ids = {row.selector_ids[0] for row in result.collect()} diff --git a/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_solver_wide_only_test.py b/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_solver_wide_only_test.py index f4d98b4..257836a 100644 --- a/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_solver_wide_only_test.py +++ b/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_solver_wide_only_test.py @@ -14,6 +14,9 @@ import pandas as pd from pyspark.sql import SparkSession +from impulse_query_engine.analyze.metadata.time_series_expression import ( + TimeSeriesExpression, +) from impulse_query_engine.analyze.query.solvers.key_value_store_solver import ( KeyValueStoreSolver, KVSTimeSeriesCache, @@ -202,7 +205,7 @@ def test_filter_channel_metrics_uses_config_cols( ) tags_df = solver.filter_container_tags(spark, query) container_df = solver.filter_container_metrics(spark, query, tags_df) - selectors = query._collect_time_series_selectors(uses_alias=False) + selectors = TimeSeriesExpression.collect_selectors(query.selections, uses_alias=False) result = solver.filter_channel_metrics(spark, basic_narrow_db, container_df, selectors) assert "container_id" in result.columns assert "channel_id" in result.columns diff --git a/tests/impulse_reporting/unit/meta/channel_mapping_resolution_dimension_test.py b/tests/impulse_reporting/unit/meta/channel_mapping_resolution_dimension_test.py new file mode 100644 index 0000000..e6c4c45 --- /dev/null +++ b/tests/impulse_reporting/unit/meta/channel_mapping_resolution_dimension_test.py @@ -0,0 +1,146 @@ +# pylint: disable=missing-function-docstring + +from pyspark.sql import SparkSession + +from impulse_query_engine.analyze.query.solvers.in_memory_solver import InMemorySolver +from impulse_query_engine.analyze.query.solvers.key_value_store_solver import ( + KeyValueStoreSolver, +) +from impulse_query_engine.analyze.query.solvers.solver_config import ( + ChannelMappingConfig, + SolverConfig, + TableConfig, +) +from impulse_query_engine.measurement_db import MeasurementDB +from impulse_reporting.config.config_parser import ImpulseConfig +from impulse_reporting.meta.container_dimensions import ChannelMappingResolutionDimension + + +def _impulse_config() -> ImpulseConfig: + return ImpulseConfig.model_validate( + { + "source": { + "container_metrics_table": "c.s.container_metrics", + "channel_metrics_table": "c.s.channel_metrics", + "channels_uri": "c.s.channels", + "channel_mapping_table": "c.s.channel_mapping", + }, + } + ) + + +def _kvs_solver(spark: SparkSession) -> KeyValueStoreSolver: + return KeyValueStoreSolver( + spark, + config=SolverConfig( + project_id="SAMPLE_PROJECT", + container_metrics=TableConfig(column_name_mapping={"project": "project_id"}), + channel_mapping=ChannelMappingConfig(filters={"toolbox_id": "container_concept"}), + ), + ) + + +def test_returns_none_when_no_aliased_selectors( + spark: SparkSession, key_value_store_alias_db: MeasurementDB +): + solver = _kvs_solver(spark) + query = key_value_store_alias_db.query + + result = ChannelMappingResolutionDimension.get_dimension( + spark=spark, + query=query, + solver=solver, + config=_impulse_config(), + aliased_selectors=[], + ) + + assert result is None + + +def test_returns_none_when_solver_does_not_support_aliasing( + spark: SparkSession, key_value_store_alias_db: MeasurementDB +): + """InMemorySolver inherits the base-class NotImplementedError default; dimension must skip.""" + solver = InMemorySolver() + query = key_value_store_alias_db.query + # Even with an aliased selector in the query, the dimension must skip when + # the solver doesn't override filter_aliased_channel_metrics. + aliased = query.channel_with_alias(channel_alias="engine_speed") + + result = ChannelMappingResolutionDimension.get_dimension( + spark=spark, + query=query, + solver=solver, + config=_impulse_config(), + aliased_selectors=[aliased], + ) + + assert result is None + + +def test_returns_resolution_with_expected_schema( + spark: SparkSession, key_value_store_alias_db: MeasurementDB +): + solver = _kvs_solver(spark) + query = key_value_store_alias_db.query + aliased = query.channel_with_alias(channel_alias="engine_speed") + + result = ChannelMappingResolutionDimension.get_dimension( + spark=spark, + query=query, + solver=solver, + config=_impulse_config(), + aliased_selectors=[aliased], + ) + + assert result is not None + # selector_ids must be dropped; config_hash must be appended. + assert result.columns == [ + "container_id", + "channel_id", + "channel_name", + "data_key", + "channel_alias", + "priority", + "config_hash", + ] + rows = result.collect() + assert len(rows) > 0 + aliases = {row.channel_alias for row in rows} + assert aliases == {"engine_speed"} + # Every row resolved to a known physical channel for engine_speed. + for row in rows: + assert row.channel_name in {"Engine RPM", "EngSpd"} + assert row.data_key in {"TM", "ProjSpecREC_10Hz"} + assert row.config_hash is not None + + +def test_dimension_honors_pre_filtered_containers( + spark: SparkSession, key_value_store_alias_db: MeasurementDB +): + """When pre_filtered_containers_df is supplied, the result is restricted to those containers.""" + import pyspark.sql.functions as F + + solver = _kvs_solver(spark) + query = key_value_store_alias_db.query + aliased = query.channel_with_alias(channel_alias="engine_speed") + + # Pre-filtered containers must carry the same columns as silver + # container_metrics so the solver's downstream project_id filter still + # applies (matches the contract used by the incremental container + # detector in production). + pre_filtered = key_value_store_alias_db.container_metrics(spark).where( + F.col("container_id") == 1 + ) + + result = ChannelMappingResolutionDimension.get_dimension( + spark=spark, + query=query, + solver=solver, + config=_impulse_config(), + aliased_selectors=[aliased], + pre_filtered_containers_df=pre_filtered, + ) + + container_ids = {row.container_id for row in result.collect()} + assert container_ids == {1} From 378e8249a102cd7e0d4769409d04c6fbe23ec633 Mon Sep 17 00:00:00 2001 From: "tom.bonfert" Date: Fri, 29 May 2026 17:10:46 +0200 Subject: [PATCH 2/7] Add integration test for channel mapping resolution dimension --- ...annel_mapping_resolution_dimension_test.py | 85 +++++++++++++++++++ .../integration/test_helpers.py | 71 ++++++++++++++++ 2 files changed, 156 insertions(+) create mode 100644 tests/impulse_reporting/integration/channel_mapping_resolution_dimension_test.py diff --git a/tests/impulse_reporting/integration/channel_mapping_resolution_dimension_test.py b/tests/impulse_reporting/integration/channel_mapping_resolution_dimension_test.py new file mode 100644 index 0000000..4b544b8 --- /dev/null +++ b/tests/impulse_reporting/integration/channel_mapping_resolution_dimension_test.py @@ -0,0 +1,85 @@ +"""Integration test for the channel_mapping_resolution_dimension gold table. + +Exercises the end-to-end flow: a Report configured with +``channel_mapping_table``, aggregations that use ``channel_with_alias``, +``determine_report()`` and ``persist_results()`` writing the new +``channel_mapping_resolution_dimension`` Delta table to the gold schema. +""" + +from tests.conftest import spark # noqa: F401 pytest fixture +from tests.impulse_reporting.integration.test_helpers import ( + add_histograms_aggregations, + create_alias_report, +) + + +def test_alias_report_writes_channel_mapping_resolution_dimension( + spark, setup_key_value_store_alias_db +): + report, channels = create_alias_report(spark, table_prefix="alias_int") + add_histograms_aggregations( + report, + engine_rpm=channels["engine_speed"], + vehicle_speed=channels["vehicle_speed"], + weights=channels["weights"], + ) + + report.determine_report() + report.persist_results() + + gold = spark.read.table( + "spark_catalog.gold.alias_int_channel_mapping_resolution_dimension" + ) + + # Schema: exact column set as written by ChannelMappingResolutionDimension + # + meta columns from ContainerDimensionWriter / persist pipeline. + assert set(gold.columns) == { + "container_id", + "channel_id", + "channel_name", + "data_key", + "channel_alias", + "priority", + "config_hash", + "_created_at", + } + + rows = gold.collect() + + # Six resolutions: 3 containers x 2 aliases. Containers 1 and 2 carry + # the (Engine RPM/TM) + (Vehicle Speed Sensor/TM) physical channels; + # container 3 carries (EngSpd/ProjSpecREC_10Hz) + (Spd_Vhcl/ProjSpecREC_10Hz). + resolutions = { + (r.container_id, r.channel_id, r.channel_name, r.data_key, r.channel_alias) + for r in rows + } + assert resolutions == { + (1, 5, "Engine RPM", "TM", "engine_speed"), + (1, 7, "Vehicle Speed Sensor", "TM", "vehicle_speed"), + (2, 5, "Engine RPM", "TM", "engine_speed"), + (2, 7, "Vehicle Speed Sensor", "TM", "vehicle_speed"), + (3, 5, "EngSpd", "ProjSpecREC_10Hz", "engine_speed"), + (3, 7, "Spd_Vhcl", "ProjSpecREC_10Hz", "vehicle_speed"), + } + + # The dimension contract dedupes by (container_id, channel_alias); + # each alias resolves to exactly one physical channel per container. + assert len(rows) == len( + {(r.container_id, r.channel_alias) for r in rows} + ) == 6 + + # The alias CSV leaves `priority` empty (NULL) for every mapping row; + # those NULLs propagate verbatim through the resolution. + assert all(r.priority is None for r in rows) + + # config_hash is deterministic per-config and applied uniformly to every + # row in the dimension df via ContainerDimension._add_config_hash. + config_hashes = {r.config_hash for r in rows} + assert len(config_hashes) == 1 + assert next(iter(config_hashes)) is not None + + # _created_at is stamped once via F.current_timestamp() inside + # ReportEntityTransformer.add_meta_information, so all rows share it. + created_ats = {r._created_at for r in rows} + assert len(created_ats) == 1 + assert next(iter(created_ats)) is not None diff --git a/tests/impulse_reporting/integration/test_helpers.py b/tests/impulse_reporting/integration/test_helpers.py index 337577c..490c544 100644 --- a/tests/impulse_reporting/integration/test_helpers.py +++ b/tests/impulse_reporting/integration/test_helpers.py @@ -5,6 +5,11 @@ from databricks.sdk import WorkspaceClient from pyspark.sql import SparkSession +from impulse_query_engine.analyze.query.solvers.solver_config import ( + ChannelMappingConfig, + SolverConfig, + TableConfig, +) from impulse_reporting.aggregations.histogram import ( HistogramCustomWeights, HistogramDistance, @@ -18,6 +23,8 @@ from impulse_reporting.aggregations.stats_aggregator import StatsAggregator from impulse_reporting.config.config_parser import ( ImpulseConfig, + QueryEngine, + Solvers, Source, UnitySink, ) @@ -87,6 +94,70 @@ def create_default_report( return report, channels +def create_alias_report( + spark: SparkSession, + report_name: str = "alias_report", + table_prefix: str = "alias_test", +) -> tuple[Report, dict]: + """ + Create a Report pointed at the ``silver_key_value_store_alias`` schema + with ``channel_mapping_table`` configured, plus the matching + ``SolverConfig`` (project_id, ``project → project_id`` rename, and a + ``toolbox_id`` filter on the channel mapping) so the alias-CSV data + resolves cleanly. + + Returns + ------- + tuple + - report: Report wired up against the alias silver schema + - channels: dict with aliased expressions: + - 'engine_speed': aliased Engine RPM channel + - 'vehicle_speed': aliased Vehicle Speed Sensor channel + - 'weights': vehicle_speed (reused as weights handle) + - 'veh_spd_event_expr': vehicle_speed > 1 expression + """ + config = ImpulseConfig( + source=Source( + container_metrics_table="spark_catalog.silver_key_value_store_alias.container_metrics", + channel_metrics_table="spark_catalog.silver_key_value_store_alias.channel_metrics", + channels_uri="spark_catalog.silver_key_value_store_alias.channels", + channel_mapping_table="spark_catalog.silver_key_value_store_alias.channel_mapping", + ), + unity_sink=UnitySink( + catalog="spark_catalog", + schema="gold", + table_prefix=table_prefix, + ), + query_engine=QueryEngine( + solver=Solvers.KEY_VALUE_STORE_SOLVER, + solver_config=SolverConfig( + project_id="SAMPLE_PROJECT", + container_metrics=TableConfig(column_name_mapping={"project": "project_id"}), + channel_mapping=ChannelMappingConfig(filters={"toolbox_id": "container_concept"}), + ), + ), + ) + + report = Report( + name=report_name, + spark=spark, + workspace_client=create_autospec(WorkspaceClient), + config=dict(config), + ) + + query = report.get_db().query + engine_speed = query.channel_with_alias(channel_alias="engine_speed") + vehicle_speed = query.channel_with_alias(channel_alias="vehicle_speed") + + channels = { + "engine_speed": engine_speed, + "vehicle_speed": vehicle_speed, + "weights": vehicle_speed, + "veh_spd_event_expr": vehicle_speed > 1, + } + return report, channels + + def add_histograms_aggregations( report: Report, engine_rpm, From e9af40a68f394fc8d7c2b0616f799cc336485945 Mon Sep 17 00:00:00 2001 From: "tom.bonfert" Date: Fri, 29 May 2026 17:17:45 +0200 Subject: [PATCH 3/7] formatting --- .../analyze/query/solvers/key_value_store_solver.py | 4 +--- src/impulse_reporting/persist/report_storage.py | 9 ++------- .../channel_mapping_resolution_dimension_test.py | 11 +++-------- 3 files changed, 6 insertions(+), 18 deletions(-) diff --git a/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py b/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py index 7c5eba1..2b330af 100644 --- a/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py +++ b/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py @@ -493,9 +493,7 @@ def filter_aliased_channel_metrics( resolved = resolved.withColumn( "selector_ids", F.array(self._build_selector_id_expr(selectors)) ) - join_key_metrics_cols = list( - dict.fromkeys(metrics_col for _, metrics_col in join_keys) - ) + join_key_metrics_cols = list(dict.fromkeys(metrics_col for _, metrics_col in join_keys)) out_cols = [ container_id_col, channel_id_col, diff --git a/src/impulse_reporting/persist/report_storage.py b/src/impulse_reporting/persist/report_storage.py index fc4fb1a..0174a7c 100644 --- a/src/impulse_reporting/persist/report_storage.py +++ b/src/impulse_reporting/persist/report_storage.py @@ -177,10 +177,7 @@ def get_output_uri_channel_mapping_resolution_dimension_table(self) -> str: f"{self.table_prefix}_channel_mapping_resolution_dimension" ) else: - uri = ( - f"{self.catalog_name}.{self.schema_name}." - "channel_mapping_resolution_dimension" - ) + uri = f"{self.catalog_name}.{self.schema_name}." "channel_mapping_resolution_dimension" return uri @@ -734,6 +731,4 @@ def create_channel_mapping_resolution_dimension_writer( ChannelMappingResolutionDimensionWriter A writer configured for the channel mapping resolution dimension. """ - return ChannelMappingResolutionDimensionWriter( - self.sink, self._default_transformer - ) + return ChannelMappingResolutionDimensionWriter(self.sink, self._default_transformer) diff --git a/tests/impulse_reporting/integration/channel_mapping_resolution_dimension_test.py b/tests/impulse_reporting/integration/channel_mapping_resolution_dimension_test.py index 4b544b8..9aa4b13 100644 --- a/tests/impulse_reporting/integration/channel_mapping_resolution_dimension_test.py +++ b/tests/impulse_reporting/integration/channel_mapping_resolution_dimension_test.py @@ -27,9 +27,7 @@ def test_alias_report_writes_channel_mapping_resolution_dimension( report.determine_report() report.persist_results() - gold = spark.read.table( - "spark_catalog.gold.alias_int_channel_mapping_resolution_dimension" - ) + gold = spark.read.table("spark_catalog.gold.alias_int_channel_mapping_resolution_dimension") # Schema: exact column set as written by ChannelMappingResolutionDimension # + meta columns from ContainerDimensionWriter / persist pipeline. @@ -50,8 +48,7 @@ def test_alias_report_writes_channel_mapping_resolution_dimension( # the (Engine RPM/TM) + (Vehicle Speed Sensor/TM) physical channels; # container 3 carries (EngSpd/ProjSpecREC_10Hz) + (Spd_Vhcl/ProjSpecREC_10Hz). resolutions = { - (r.container_id, r.channel_id, r.channel_name, r.data_key, r.channel_alias) - for r in rows + (r.container_id, r.channel_id, r.channel_name, r.data_key, r.channel_alias) for r in rows } assert resolutions == { (1, 5, "Engine RPM", "TM", "engine_speed"), @@ -64,9 +61,7 @@ def test_alias_report_writes_channel_mapping_resolution_dimension( # The dimension contract dedupes by (container_id, channel_alias); # each alias resolves to exactly one physical channel per container. - assert len(rows) == len( - {(r.container_id, r.channel_alias) for r in rows} - ) == 6 + assert len(rows) == len({(r.container_id, r.channel_alias) for r in rows}) == 6 # The alias CSV leaves `priority` empty (NULL) for every mapping row; # those NULLs propagate verbatim through the resolution. From 109957d683f47c70f09f4f228633ef9672aca053 Mon Sep 17 00:00:00 2001 From: "tom.bonfert" Date: Fri, 29 May 2026 18:39:11 +0200 Subject: [PATCH 4/7] Refactor channel mapping resolution logic to clarify behavior when no aliased selectors are present. --- .../meta/container_dimensions.py | 27 +++++++++---------- ...annel_mapping_resolution_dimension_test.py | 22 --------------- 2 files changed, 13 insertions(+), 36 deletions(-) diff --git a/src/impulse_reporting/meta/container_dimensions.py b/src/impulse_reporting/meta/container_dimensions.py index 18fb1bd..2cd1a39 100644 --- a/src/impulse_reporting/meta/container_dimensions.py +++ b/src/impulse_reporting/meta/container_dimensions.py @@ -129,13 +129,10 @@ def get_dimension( """ Compute the channel mapping resolution dimension for the report. - Returns ``None`` when the dimension cannot be built — either the - solver does not override ``filter_aliased_channel_metrics`` (e.g. - ``DeltaSolver``, ``BlobSolver``, ``InMemorySolver``) or the report - has no aliased selectors. In both cases the persist step should be - a no-op. + Returns ``None`` when the report has no aliased selectors — there + is nothing to resolve, and the persist step is a no-op. - When buildable, runs the solver's container-side filter pipeline + Otherwise runs the solver's container-side filter pipeline (``filter_container_tags`` → ``filter_container_metrics``) so the result honors ``pre_filtered_containers_df`` for incremental mode, then calls ``filter_aliased_channel_metrics`` with the aliased @@ -143,6 +140,14 @@ def get_dimension( The internal ``selector_ids`` column is dropped since it is a runtime artifact and not part of the dimension contract. + Notes + ----- + Solver-capability is *not* checked here. If a report has aliased + selectors, the configured solver must support alias resolution — + otherwise ``QueryBuilder.solve`` upstream of this call has + already raised ``NotImplementedError``. We rely on that invariant + instead of introspecting the solver class. + Parameters ---------- spark : SparkSession @@ -165,15 +170,9 @@ def get_dimension( DataFrame with columns ``(container_id, channel_id, , channel_alias, alias_priority[, source_unit, target_unit], - config_hash)``, or ``None`` if the dimension is not - applicable. + config_hash)``, or ``None`` if the report has no aliased + selectors. """ - if ( - type(solver).filter_aliased_channel_metrics - is QuerySolver.filter_aliased_channel_metrics - ): - return None - if not aliased_selectors: return None diff --git a/tests/impulse_reporting/unit/meta/channel_mapping_resolution_dimension_test.py b/tests/impulse_reporting/unit/meta/channel_mapping_resolution_dimension_test.py index e6c4c45..e8d2e08 100644 --- a/tests/impulse_reporting/unit/meta/channel_mapping_resolution_dimension_test.py +++ b/tests/impulse_reporting/unit/meta/channel_mapping_resolution_dimension_test.py @@ -2,7 +2,6 @@ from pyspark.sql import SparkSession -from impulse_query_engine.analyze.query.solvers.in_memory_solver import InMemorySolver from impulse_query_engine.analyze.query.solvers.key_value_store_solver import ( KeyValueStoreSolver, ) @@ -57,27 +56,6 @@ def test_returns_none_when_no_aliased_selectors( assert result is None -def test_returns_none_when_solver_does_not_support_aliasing( - spark: SparkSession, key_value_store_alias_db: MeasurementDB -): - """InMemorySolver inherits the base-class NotImplementedError default; dimension must skip.""" - solver = InMemorySolver() - query = key_value_store_alias_db.query - # Even with an aliased selector in the query, the dimension must skip when - # the solver doesn't override filter_aliased_channel_metrics. - aliased = query.channel_with_alias(channel_alias="engine_speed") - - result = ChannelMappingResolutionDimension.get_dimension( - spark=spark, - query=query, - solver=solver, - config=_impulse_config(), - aliased_selectors=[aliased], - ) - - assert result is None - - def test_returns_resolution_with_expected_schema( spark: SparkSession, key_value_store_alias_db: MeasurementDB ): From 4e856d5b831af44aa449f857b440e7d55f2be858 Mon Sep 17 00:00:00 2001 From: "tom.bonfert" Date: Fri, 29 May 2026 19:11:56 +0200 Subject: [PATCH 5/7] Remove unused config parameter from ChannelMappingResolutionDimension and related tests to streamline functionality. --- src/impulse_reporting/core/report.py | 1 - .../meta/container_dimensions.py | 10 +++------ ...annel_mapping_resolution_dimension_test.py | 9 +------- ...annel_mapping_resolution_dimension_test.py | 21 +------------------ 4 files changed, 5 insertions(+), 36 deletions(-) diff --git a/src/impulse_reporting/core/report.py b/src/impulse_reporting/core/report.py index eff6619..91a5cda 100644 --- a/src/impulse_reporting/core/report.py +++ b/src/impulse_reporting/core/report.py @@ -1040,7 +1040,6 @@ def determine_report(self, is_incremental: bool = None): spark=self.spark, query=self.query, solver=self.solver, - config=self.config, aliased_selectors=aliased_selectors, pre_filtered_containers_df=pre_filtered_containers_df, ) diff --git a/src/impulse_reporting/meta/container_dimensions.py b/src/impulse_reporting/meta/container_dimensions.py index 2cd1a39..8f703b4 100644 --- a/src/impulse_reporting/meta/container_dimensions.py +++ b/src/impulse_reporting/meta/container_dimensions.py @@ -122,7 +122,6 @@ def get_dimension( spark: SparkSession, query: QueryBuilder, solver: QuerySolver, - config: ImpulseConfig, aliased_selectors: list[TimeSeriesSelector], pre_filtered_containers_df: DataFrame = None, ) -> DataFrame | None: @@ -156,8 +155,6 @@ def get_dimension( The query builder used for the report. solver : QuerySolver The solver instance to use for query execution. - config : ImpulseConfig - The configuration object; used to attach ``config_hash``. aliased_selectors : list[TimeSeriesSelector] Aliased selectors (``uses_alias=True``) collected from the report's events and aggregations. May be empty. @@ -169,9 +166,8 @@ def get_dimension( DataFrame or None DataFrame with columns ``(container_id, channel_id, , - channel_alias, alias_priority[, source_unit, target_unit], - config_hash)``, or ``None`` if the report has no aliased - selectors. + channel_alias, alias_priority[, source_unit, target_unit])``, + or ``None`` if the report has no aliased selectors. """ if not aliased_selectors: return None @@ -188,4 +184,4 @@ def get_dimension( if "selector_ids" in resolved.columns: resolved = resolved.drop("selector_ids") - return resolved.transform(ContainerDimension._add_config_hash(config)) + return resolved diff --git a/tests/impulse_reporting/integration/channel_mapping_resolution_dimension_test.py b/tests/impulse_reporting/integration/channel_mapping_resolution_dimension_test.py index 9aa4b13..791bb6b 100644 --- a/tests/impulse_reporting/integration/channel_mapping_resolution_dimension_test.py +++ b/tests/impulse_reporting/integration/channel_mapping_resolution_dimension_test.py @@ -30,7 +30,7 @@ def test_alias_report_writes_channel_mapping_resolution_dimension( gold = spark.read.table("spark_catalog.gold.alias_int_channel_mapping_resolution_dimension") # Schema: exact column set as written by ChannelMappingResolutionDimension - # + meta columns from ContainerDimensionWriter / persist pipeline. + # + the _created_at meta column from the writer. assert set(gold.columns) == { "container_id", "channel_id", @@ -38,7 +38,6 @@ def test_alias_report_writes_channel_mapping_resolution_dimension( "data_key", "channel_alias", "priority", - "config_hash", "_created_at", } @@ -67,12 +66,6 @@ def test_alias_report_writes_channel_mapping_resolution_dimension( # those NULLs propagate verbatim through the resolution. assert all(r.priority is None for r in rows) - # config_hash is deterministic per-config and applied uniformly to every - # row in the dimension df via ContainerDimension._add_config_hash. - config_hashes = {r.config_hash for r in rows} - assert len(config_hashes) == 1 - assert next(iter(config_hashes)) is not None - # _created_at is stamped once via F.current_timestamp() inside # ReportEntityTransformer.add_meta_information, so all rows share it. created_ats = {r._created_at for r in rows} diff --git a/tests/impulse_reporting/unit/meta/channel_mapping_resolution_dimension_test.py b/tests/impulse_reporting/unit/meta/channel_mapping_resolution_dimension_test.py index e8d2e08..946416c 100644 --- a/tests/impulse_reporting/unit/meta/channel_mapping_resolution_dimension_test.py +++ b/tests/impulse_reporting/unit/meta/channel_mapping_resolution_dimension_test.py @@ -11,23 +11,9 @@ TableConfig, ) from impulse_query_engine.measurement_db import MeasurementDB -from impulse_reporting.config.config_parser import ImpulseConfig from impulse_reporting.meta.container_dimensions import ChannelMappingResolutionDimension -def _impulse_config() -> ImpulseConfig: - return ImpulseConfig.model_validate( - { - "source": { - "container_metrics_table": "c.s.container_metrics", - "channel_metrics_table": "c.s.channel_metrics", - "channels_uri": "c.s.channels", - "channel_mapping_table": "c.s.channel_mapping", - }, - } - ) - - def _kvs_solver(spark: SparkSession) -> KeyValueStoreSolver: return KeyValueStoreSolver( spark, @@ -49,7 +35,6 @@ def test_returns_none_when_no_aliased_selectors( spark=spark, query=query, solver=solver, - config=_impulse_config(), aliased_selectors=[], ) @@ -67,12 +52,11 @@ def test_returns_resolution_with_expected_schema( spark=spark, query=query, solver=solver, - config=_impulse_config(), aliased_selectors=[aliased], ) assert result is not None - # selector_ids must be dropped; config_hash must be appended. + # selector_ids is dropped; no config_hash on this dimension. assert result.columns == [ "container_id", "channel_id", @@ -80,7 +64,6 @@ def test_returns_resolution_with_expected_schema( "data_key", "channel_alias", "priority", - "config_hash", ] rows = result.collect() assert len(rows) > 0 @@ -90,7 +73,6 @@ def test_returns_resolution_with_expected_schema( for row in rows: assert row.channel_name in {"Engine RPM", "EngSpd"} assert row.data_key in {"TM", "ProjSpecREC_10Hz"} - assert row.config_hash is not None def test_dimension_honors_pre_filtered_containers( @@ -115,7 +97,6 @@ def test_dimension_honors_pre_filtered_containers( spark=spark, query=query, solver=solver, - config=_impulse_config(), aliased_selectors=[aliased], pre_filtered_containers_df=pre_filtered, ) From e95ffebea4be1f18018882519a9d5ed22acbea8d Mon Sep 17 00:00:00 2001 From: "tom.bonfert" Date: Fri, 29 May 2026 19:40:15 +0200 Subject: [PATCH 6/7] added test --- .../unit/core/report_incremental_test.py | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/tests/impulse_reporting/unit/core/report_incremental_test.py b/tests/impulse_reporting/unit/core/report_incremental_test.py index 8e35a83..6613c68 100644 --- a/tests/impulse_reporting/unit/core/report_incremental_test.py +++ b/tests/impulse_reporting/unit/core/report_incremental_test.py @@ -695,6 +695,49 @@ def test_persist_incremental_measurement_dimension_upserts(self, spark): upsert_call = report.sink.upsert.call_args assert upsert_call.args[2] == ["container_id"] + def test_persist_incremental_channel_mapping_resolution_dimension_upserts(self, spark): + """Channel mapping resolution dimension upserts with merge keys read from solver config. + + Guards the SolverConfig parameterization of the merge keys — if the + list ever drifts back to hardcoded literals, the column-name + properties on solver.config would no longer flow through, and a + custom SolverConfig override would break in production. + """ + report = _build_report(spark) + mock_dim_df = MagicMock(spec=DataFrame) + mock_dim_df.transform.return_value = MagicMock(spec=DataFrame) + + # Pin solver.config.*_col to real strings so the merge keys + # resolve from the SolverConfig instead of returning child mocks. + report.solver.config.container_id_col = "container_id" + report.solver.config.channel_id_col = "channel_id" + report.solver.config.channel_alias_col = "channel_alias" + + report.aggregation_dfs = {} + report.aggregation_metadata_dfs = {} + report.event_dfs = {} + report.event_metadata_dfs = {} + report.container_dimension_df = None + report.channel_mapping_resolution_dimension_df = mock_dim_df + + with ( + patch("impulse_reporting.core.report.WriterFactory") as mock_factory_cls, + patch("impulse_reporting.core.report.ReportEntityTransformer"), + ): + mock_writer = MagicMock() + mock_writer.get_output_uri.return_value = ( + "catalog.gold.channel_mapping_resolution_dimension" + ) + mock_factory_cls.return_value.create_channel_mapping_resolution_dimension_writer.return_value = ( + mock_writer + ) + + report._persist_incremental(changed_aggregation_ids={}, changed_event_ids={}) + + report.sink.upsert.assert_called_once() + upsert_call = report.sink.upsert.call_args + assert upsert_call.args[2] == ["container_id", "channel_id", "channel_alias"] + def test_persist_incremental_cross_type_changed_events_combined(self, spark): """When multiple event types share a fact table and both have changed definitions, their DataFrames are combined into a single replace_by_ids From 36916d4084c46a0e3a1e416e5b9703b33f6987ae Mon Sep 17 00:00:00 2001 From: "tom.bonfert" Date: Tue, 9 Jun 2026 15:57:00 +0200 Subject: [PATCH 7/7] Implement channel mapping resolution dimension logic to handle changed and unchanged definitions separately, ensuring new aliases are resolved across all containers during incremental runs. Update related documentation and tests to reflect these changes. --- docs/impulse/docs/config/configuration.md | 2 +- .../data_model/gold_layer_event_normalized.md | 22 +++++ src/impulse_reporting/core/report.py | 18 ++-- .../meta/container_dimensions.py | 87 +++++++++++++++++++ ...annel_mapping_resolution_dimension_test.py | 79 +++++++++++++++++ .../integration/test_helpers.py | 19 ++++ ...annel_mapping_resolution_dimension_test.py | 82 +++++++++++++++++ 7 files changed, 303 insertions(+), 6 deletions(-) diff --git a/docs/impulse/docs/config/configuration.md b/docs/impulse/docs/config/configuration.md index 471ff5c..b806425 100644 --- a/docs/impulse/docs/config/configuration.md +++ b/docs/impulse/docs/config/configuration.md @@ -61,7 +61,7 @@ Maps the silver-layer input tables. | `channels_uri` | `str` | Yes | Full Unity Catalog path. Time-series sample data. | | `container_tags_table` | `str` | No | Full Unity Catalog path. Container EAV tags. | | `channel_tags_table` | `str` | No | Full Unity Catalog path. Channel EAV tags. | -| `channel_mapping_table` | `str` | No | Full Unity Catalog path. Logical-to-physical channel alias table. Required when using `QueryBuilder.channel_with_alias()` (currently supported by `KeyValueStoreSolver`). | +| `channel_mapping_table` | `str` | No | Full Unity Catalog path. Logical-to-physical channel alias table. Required when using `QueryBuilder.channel_with_alias()` (currently supported by `KeyValueStoreSolver`). In reporting mode the resolved alias-to-physical-channel mapping is materialized to the gold-layer [`channel_mapping_resolution_dimension`](../data_model/gold_layer_event_normalized.md#dimension-tables). | | `unit_conversion_table` | `str` | No | Full Unity Catalog path. Per-unit-family conversion factors. When configured together with a `channel_mapping_table` whose rows carry `source_unit` / `target_unit` columns, aliased selectors auto-convert values from source to target unit during `solve()` (currently supported by `KeyValueStoreSolver`). | Tag tables are required for solvers that consume tag-based filters diff --git a/docs/impulse/docs/data_model/gold_layer_event_normalized.md b/docs/impulse/docs/data_model/gold_layer_event_normalized.md index 2fabfbe..2b8f51a 100644 --- a/docs/impulse/docs/data_model/gold_layer_event_normalized.md +++ b/docs/impulse/docs/data_model/gold_layer_event_normalized.md @@ -143,6 +143,18 @@ histogram2d_fact { timestamp _created_at } +channel_mapping_resolution_dimension { + long container_id FK + long channel_id + string channel_name + string data_key + string channel_alias + string priority "nullable" + string source_unit "optional" + string target_unit "optional" + timestamp _created_at +} + histogram_fact }o--|| event_dimension: event_id histogram2d_fact }o--|| event_dimension: event_id stats_aggregator_fact }o--|| event_instance_fact: event_instance_id @@ -155,6 +167,8 @@ measurement_dimension ||--o{ histogram_fact : container_id measurement_dimension ||--o{ histogram2d_fact : container_id measurement_dimension ||--o{ stats_aggregator_fact : container_id +measurement_dimension ||--o{ channel_mapping_resolution_dimension : container_id + event_instance_fact }o--|| event_dimension: event_id ``` @@ -186,3 +200,11 @@ guaranteed. | `{prefix}_stats_aggregator_dimension` | `visual_id`, `report_id` | Statistics metadata (signals, aggregation labels). | | `{prefix}_event_dimension` | `event_id`, `report_id` | Event definitions (name, expression, required channels). | | `{prefix}_measurement_dimension` | `container_id` | Container metadata. Always carries `container_id`, `config_hash`, `_created_at`; additional columns are populated from [`config.measurement_dimensions`](../config/configuration.md#measurement_dimensions-optional). | +| `{prefix}_channel_mapping_resolution_dimension` | `container_id`, `channel_id`, `channel_alias` | Resolves each channel alias to its physical channel per container (physical join keys, alias `priority`). Written only when the report uses aliased selectors. The `source_unit` / `target_unit` columns are present only when a [`config.unit_conversion_table`](../config/configuration.md) is configured. | + +The `channel_mapping_resolution_dimension` table lets BI consumers join a fact +back to the physical channel that an alias resolved to: join on +`(container_id, channel_id, channel_alias)`. The join-key, `channel_alias`, +`priority`, and `source_unit` / `target_unit` column names follow the +[`channel_mapping` solver config](../config/configuration.md) — see the column +reference there for how each maps to the alias and metrics tables. diff --git a/src/impulse_reporting/core/report.py b/src/impulse_reporting/core/report.py index 91a5cda..601a6c2 100644 --- a/src/impulse_reporting/core/report.py +++ b/src/impulse_reporting/core/report.py @@ -1030,17 +1030,25 @@ def determine_report(self, is_incremental: bool = None): pre_filtered_containers_df=pre_filtered_containers_df, ) - # Determine channel mapping resolution dimension - aliased_selectors = TimeSeriesExpression.collect_selectors( - all_changed_expressions + all_unchanged_expressions, + # Determine channel mapping resolution dimension. + # Mirror the fact split: aliases from changed definitions resolve + # over all containers, aliases only in unchanged definitions stay + # scoped to the incrementally-detected containers. + changed_aliased_selectors = TimeSeriesExpression.collect_selectors( + all_changed_expressions, + uses_alias=True, + ) + unchanged_aliased_selectors = TimeSeriesExpression.collect_selectors( + all_unchanged_expressions, uses_alias=True, ) self.channel_mapping_resolution_dimension_df = ( - ChannelMappingResolutionDimension.get_dimension( + ChannelMappingResolutionDimension.get_dimension_for_scopes( spark=self.spark, query=self.query, solver=self.solver, - aliased_selectors=aliased_selectors, + changed_aliased_selectors=changed_aliased_selectors, + unchanged_aliased_selectors=unchanged_aliased_selectors, pre_filtered_containers_df=pre_filtered_containers_df, ) ) diff --git a/src/impulse_reporting/meta/container_dimensions.py b/src/impulse_reporting/meta/container_dimensions.py index 8f703b4..9612cee 100644 --- a/src/impulse_reporting/meta/container_dimensions.py +++ b/src/impulse_reporting/meta/container_dimensions.py @@ -185,3 +185,90 @@ def get_dimension( resolved = resolved.drop("selector_ids") return resolved + + @staticmethod + def get_dimension_for_scopes( + spark: SparkSession, + query: QueryBuilder, + solver: QuerySolver, + changed_aliased_selectors: list[TimeSeriesSelector], + unchanged_aliased_selectors: list[TimeSeriesSelector], + pre_filtered_containers_df: DataFrame = None, + ) -> DataFrame | None: + """ + Compute the channel mapping resolution dimension honoring the + report's changed/unchanged definition split. + + Mirrors the scoping the fact pipeline uses: aliases referenced by + *changed* definitions are resolved over **all** containers + (``pre_filtered_containers_df=None``), while aliases referenced + only by *unchanged* definitions stay scoped to + ``pre_filtered_containers_df``. This keeps incremental runs cheap + without leaving older containers unresolved when a new alias is + introduced by a changed definition. + + In full (non-incremental) mode ``pre_filtered_containers_df`` is + ``None``, so both scopes resolve over all containers and the union + is equivalent to resolving the combined selector set in one pass. + + Aliases already covered by the changed set are excluded from the + unchanged set (by ``selector_id``) so the two scopes resolve + disjoint aliases. A given alias maps to one stable ``selector_id``, + so disjoint ``selector_id`` sets yield disjoint ``channel_alias`` + values and therefore no ``(container_id, channel_alias)`` collision + across the two results — which also keeps the downstream Delta + ``MERGE`` from seeing duplicate source rows for a merge key. + + Parameters + ---------- + spark : SparkSession + Spark session for data processing. + query : QueryBuilder + The query builder used for the report. + solver : QuerySolver + The solver instance to use for query execution. + changed_aliased_selectors : list[TimeSeriesSelector] + Aliased selectors from changed definitions (resolved over all + containers). May be empty. + unchanged_aliased_selectors : list[TimeSeriesSelector] + Aliased selectors from unchanged definitions (resolved over + ``pre_filtered_containers_df``). May be empty. + pre_filtered_containers_df : DataFrame, optional + Pre-filtered containers for incremental processing. + + Returns + ------- + DataFrame or None + The combined resolution dimension, or ``None`` if neither + scope has aliased selectors. + """ + changed_ids = {selector.selector_id for selector in changed_aliased_selectors} + unchanged_only_selectors = [ + selector + for selector in unchanged_aliased_selectors + if selector.selector_id not in changed_ids + ] + + changed_df = ChannelMappingResolutionDimension.get_dimension( + spark=spark, + query=query, + solver=solver, + aliased_selectors=changed_aliased_selectors, + pre_filtered_containers_df=None, + ) + unchanged_df = ChannelMappingResolutionDimension.get_dimension( + spark=spark, + query=query, + solver=solver, + aliased_selectors=unchanged_only_selectors, + pre_filtered_containers_df=pre_filtered_containers_df, + ) + + if changed_df is None: + return unchanged_df + if unchanged_df is None: + return changed_df + + # ``source_unit`` / ``target_unit`` only appear for selectors whose + # mapping rows carry them, so the two scopes may differ in columns. + return changed_df.unionByName(unchanged_df, allowMissingColumns=True) diff --git a/tests/impulse_reporting/integration/channel_mapping_resolution_dimension_test.py b/tests/impulse_reporting/integration/channel_mapping_resolution_dimension_test.py index 791bb6b..4467a0b 100644 --- a/tests/impulse_reporting/integration/channel_mapping_resolution_dimension_test.py +++ b/tests/impulse_reporting/integration/channel_mapping_resolution_dimension_test.py @@ -6,6 +6,8 @@ ``channel_mapping_resolution_dimension`` Delta table to the gold schema. """ +from impulse_reporting.aggregations.histogram import HistogramDuration +from impulse_reporting.core.page import Page from tests.conftest import spark # noqa: F401 pytest fixture from tests.impulse_reporting.integration.test_helpers import ( add_histograms_aggregations, @@ -71,3 +73,80 @@ def test_alias_report_writes_channel_mapping_resolution_dimension( created_ats = {r._created_at for r in rows} assert len(created_ats) == 1 assert next(iter(created_ats)) is not None + + +def test_incremental_added_alias_resolves_over_all_containers( + spark, setup_key_value_store_alias_db +): + """A new alias introduced on an incremental run is a *changed* definition + and must be resolved across ALL containers — not only the incrementally + detected ones — mirroring how the fact tables are reprocessed. + + Scenario: run 1 (full) registers only ``engine_speed``. Run 2 runs + incrementally with silver unchanged (so no containers are upserted) and + adds a ``vehicle_speed`` aggregation. The new alias must still appear for + every container in the resolution dimension. + """ + prefix = "alias_inc" + table = f"spark_catalog.gold.{prefix}_channel_mapping_resolution_dimension" + rpm_bins = [float(i) for i in range(0, 8000, 250)] + speed_bins = [float(i) for i in range(0, 300, 1)] + + # Start from a clean gold slate so run 1 is genuinely a first/full run + # (the gold warehouse can persist across local test runs). + spark.sql("CREATE SCHEMA IF NOT EXISTS spark_catalog.gold") + for t in spark.sql("SHOW TABLES IN spark_catalog.gold").collect(): + if t.tableName.startswith(prefix): + spark.sql(f"DROP TABLE IF EXISTS spark_catalog.gold.{t.tableName} PURGE") + + # --- Run 1 (gold absent -> full): only the engine_speed alias. --- + report1, channels1 = create_alias_report(spark, table_prefix=prefix, incremental=True) + page1 = Page(page_number=1) + page1.add_aggregation( + HistogramDuration(name="rpm_hist", base_expr=channels1["engine_speed"], bins=rpm_bins) + ) + report1.add_page(page1) + report1.determine_report() + report1.persist_results() + + after_run1 = {(r.container_id, r.channel_alias) for r in spark.read.table(table).collect()} + assert after_run1 == { + (1, "engine_speed"), + (2, "engine_speed"), + (3, "engine_speed"), + } + + # --- Run 2 (incremental, silver unchanged -> no containers upserted): + # keep engine_speed (unchanged) and ADD vehicle_speed (changed/new). --- + report2, channels2 = create_alias_report(spark, table_prefix=prefix, incremental=True) + page2 = Page(page_number=1) + page2.add_aggregation( + HistogramDuration(name="rpm_hist", base_expr=channels2["engine_speed"], bins=rpm_bins) + ) + page2.add_aggregation( + HistogramDuration(name="speed_hist", base_expr=channels2["vehicle_speed"], bins=speed_bins) + ) + report2.add_page(page2) + report2.determine_report() + report2.persist_results() + + gold = spark.read.table(table) + after_run2 = {(r.container_id, r.channel_alias) for r in gold.collect()} + + # The newly-added vehicle_speed alias is resolved for ALL containers, + # even though no containers were incrementally upserted; the engine_speed + # rows written by run 1 are preserved by the upsert. + assert after_run2 == { + (1, "engine_speed"), + (2, "engine_speed"), + (3, "engine_speed"), + (1, "vehicle_speed"), + (2, "vehicle_speed"), + (3, "vehicle_speed"), + } + + # The MERGE saw no duplicate source rows for its key. + assert ( + gold.count() + == gold.dropDuplicates(["container_id", "channel_id", "channel_alias"]).count() + ) diff --git a/tests/impulse_reporting/integration/test_helpers.py b/tests/impulse_reporting/integration/test_helpers.py index 490c544..d873ce0 100644 --- a/tests/impulse_reporting/integration/test_helpers.py +++ b/tests/impulse_reporting/integration/test_helpers.py @@ -23,6 +23,7 @@ from impulse_reporting.aggregations.stats_aggregator import StatsAggregator from impulse_reporting.config.config_parser import ( ImpulseConfig, + IncrementalConfig, QueryEngine, Solvers, Source, @@ -98,6 +99,7 @@ def create_alias_report( spark: SparkSession, report_name: str = "alias_report", table_prefix: str = "alias_test", + incremental: bool = False, ) -> tuple[Report, dict]: """ Create a Report pointed at the ``silver_key_value_store_alias`` schema @@ -106,6 +108,14 @@ def create_alias_report( ``toolbox_id`` filter on the channel mapping) so the alias-CSV data resolves cleanly. + Parameters + ---------- + incremental : bool, optional + When ``True``, enable incremental processing. The alias silver + ``container_metrics`` carries no last-modified column, so update + detection is a no-op and only genuinely new containers are + upserted (default is ``False``). + Returns ------- tuple @@ -136,6 +146,15 @@ def create_alias_report( channel_mapping=ChannelMappingConfig(filters={"toolbox_id": "container_concept"}), ), ), + incremental=( + IncrementalConfig( + enabled=True, + silver_last_modified_column="timestamp", + gold_last_modified_column="_created_at", + ) + if incremental + else None + ), ) report = Report( diff --git a/tests/impulse_reporting/unit/meta/channel_mapping_resolution_dimension_test.py b/tests/impulse_reporting/unit/meta/channel_mapping_resolution_dimension_test.py index 946416c..842ae14 100644 --- a/tests/impulse_reporting/unit/meta/channel_mapping_resolution_dimension_test.py +++ b/tests/impulse_reporting/unit/meta/channel_mapping_resolution_dimension_test.py @@ -103,3 +103,85 @@ def test_dimension_honors_pre_filtered_containers( container_ids = {row.container_id for row in result.collect()} assert container_ids == {1} + + +def test_scopes_returns_none_when_both_empty( + spark: SparkSession, key_value_store_alias_db: MeasurementDB +): + solver = _kvs_solver(spark) + query = key_value_store_alias_db.query + + result = ChannelMappingResolutionDimension.get_dimension_for_scopes( + spark=spark, + query=query, + solver=solver, + changed_aliased_selectors=[], + unchanged_aliased_selectors=[], + ) + + assert result is None + + +def test_scopes_changed_alias_covers_all_containers_unchanged_alias_scoped( + spark: SparkSession, key_value_store_alias_db: MeasurementDB +): + """The fact-split contract: a changed alias resolves over ALL containers + even under a pre_filter, while an alias only in unchanged definitions + stays scoped to pre_filtered_containers_df.""" + import pyspark.sql.functions as F + + solver = _kvs_solver(spark) + query = key_value_store_alias_db.query + changed = query.channel_with_alias(channel_alias="engine_speed") + unchanged = query.channel_with_alias(channel_alias="vehicle_speed") + + pre_filtered = key_value_store_alias_db.container_metrics(spark).where( + F.col("container_id") == 1 + ) + + result = ChannelMappingResolutionDimension.get_dimension_for_scopes( + spark=spark, + query=query, + solver=solver, + changed_aliased_selectors=[changed], + unchanged_aliased_selectors=[unchanged], + pre_filtered_containers_df=pre_filtered, + ) + + rows = result.collect() + changed_containers = {r.container_id for r in rows if r.channel_alias == "engine_speed"} + unchanged_containers = {r.container_id for r in rows if r.channel_alias == "vehicle_speed"} + + # Changed alias: full coverage regardless of the pre_filter. + assert changed_containers == {1, 2, 3} + # Unchanged-only alias: restricted to the pre-filtered container. + assert unchanged_containers == {1} + + +def test_scopes_alias_in_both_lists_resolved_once( + spark: SparkSession, key_value_store_alias_db: MeasurementDB +): + """An alias present in both the changed and unchanged sets is resolved + once (via the changed/full scope), so no duplicate + (container_id, channel_alias) rows reach the downstream MERGE.""" + solver = _kvs_solver(spark) + query = key_value_store_alias_db.query + engine = query.channel_with_alias(channel_alias="engine_speed") + vehicle = query.channel_with_alias(channel_alias="vehicle_speed") + + # engine_speed appears in both lists; vehicle_speed only in unchanged. + result = ChannelMappingResolutionDimension.get_dimension_for_scopes( + spark=spark, + query=query, + solver=solver, + changed_aliased_selectors=[engine], + unchanged_aliased_selectors=[engine, vehicle], + ) + + rows = result.collect() + keys = [(r.container_id, r.channel_alias) for r in rows] + # No duplicates across the two scopes. + assert len(keys) == len(set(keys)) + # engine_speed resolved over all containers exactly once each. + engine_containers = sorted(r.container_id for r in rows if r.channel_alias == "engine_speed") + assert engine_containers == [1, 2, 3]