diff --git a/macros/edr/data_monitoring/data_monitors_configuration/get_column_monitors.sql b/macros/edr/data_monitoring/data_monitors_configuration/get_column_monitors.sql index c32dcf520..ed5efe49f 100644 --- a/macros/edr/data_monitoring/data_monitors_configuration/get_column_monitors.sql +++ b/macros/edr/data_monitoring/data_monitors_configuration/get_column_monitors.sql @@ -1,13 +1,35 @@ {% macro get_column_obj_and_monitors(model_relation, column_name, monitors=none) %} - {% set column_obj_and_monitors = [] %} {% set column_objects = adapter.get_columns_in_relation(model_relation) %} + + {% if target.type == 'bigquery' %} + {% set expanded = [] %} + {% for col in column_objects %} + {% do expanded.append(col) %} + {% if col.fields | length > 0 %} + {# `BigQueryColumn.flatten()` discards ancestor modes, so a + NULLABLE leaf under a REPEATED ancestor still satisfies + `leaf.mode != 'REPEATED'`. Build the set of safe leaf names + via an ancestor-aware walker and filter `flatten()` against + it. #} + {% set safe_names = elementary.bq_safe_leaf_names(col) %} + {% for leaf in col.flatten() %} + {% if leaf.name in safe_names %} + {% do expanded.append(leaf) %} + {% endif %} + {% endfor %} + {% endif %} + {% endfor %} + {% set column_objects = expanded %} + {% endif %} + {% for column_obj in column_objects %} {% if column_obj.name.strip('"') | lower == column_name.strip('"') | lower %} + {% set wrapped = elementary.wrap_column_for_struct_support(column_obj) %} {% set column_monitors = elementary.column_monitors_by_type( - elementary.get_column_data_type(column_obj), monitors + elementary.get_column_data_type(wrapped), monitors ) %} - {% set column_item = {"column": column_obj, "monitors": column_monitors} %} + {% set column_item = {"column": wrapped, "monitors": column_monitors} %} {{ return(column_item) }} {% endif %} {% endfor %} @@ -22,10 +44,11 @@ {% set column_objects = adapter.get_columns_in_relation(model_relation) %} {% for column_obj in column_objects %} + {% set wrapped = elementary.wrap_column_for_struct_support(column_obj) %} {% set column_monitors = elementary.column_monitors_by_type( - elementary.get_column_data_type(column_obj), monitors + elementary.get_column_data_type(wrapped), monitors ) %} - {% set column_item = {"column": column_obj, "monitors": column_monitors} %} + {% set column_item = {"column": wrapped, "monitors": column_monitors} %} {% do column_obj_and_monitors.append(column_item) %} {% endfor %} diff --git a/macros/edr/data_monitoring/monitors_query/column_monitoring_query.sql b/macros/edr/data_monitoring/monitors_query/column_monitoring_query.sql index ecd1d3564..acc4fa7f8 100644 --- a/macros/edr/data_monitoring/monitors_query/column_monitoring_query.sql +++ b/macros/edr/data_monitoring/monitors_query/column_monitoring_query.sql @@ -15,7 +15,13 @@ {%- set timestamp_column = metric_properties.timestamp_column %} {% set prefixed_dimensions = [] %} {% for dimension_column in dimensions %} - {% do prefixed_dimensions.append("dimension_" ~ dimension_column) %} + {% if elementary.bq_is_nested_identifier(dimension_column) %} + {% do prefixed_dimensions.append( + "dimension_" ~ elementary.bq_safe_alias(dimension_column) + ) %} + {% else %} + {% do prefixed_dimensions.append("dimension_" ~ dimension_column) %} + {% endif %} {% endfor %} {% set metric_types = [] %} @@ -53,7 +59,7 @@ ), filtered_monitored_table as ( select - {{ column_obj.quoted }}, + {{ column_obj.quoted }}{% if column_obj.is_nested %} as {{ adapter.quote(column_obj.safe_alias) }}{% endif %}, {%- if dimensions -%} {{ elementary.select_dimensions_columns( @@ -78,7 +84,7 @@ {%- else %} filtered_monitored_table as ( select - {{ column_obj.quoted }}, + {{ column_obj.quoted }}{% if column_obj.is_nested %} as {{ adapter.quote(column_obj.safe_alias) }}{% endif %}, {%- if dimensions -%} {{ elementary.select_dimensions_columns( @@ -94,7 +100,7 @@ column_metrics as ( {%- if column_metrics %} - {%- set column = column_obj.quoted -%} + {%- set column = adapter.quote(column_obj.safe_alias) if column_obj.is_nested else column_obj.quoted -%} select {%- if timestamp_column %} edr_bucket_start as bucket_start, edr_bucket_end as bucket_end, @@ -341,17 +347,135 @@ {% endif %} {% endmacro %} +{# Segment-quotes nested dimensions on BigQuery and sanitises the alias suffix. + Backward compatible for non-nested columns and non-BQ adapters. #} {% macro select_dimensions_columns(dimension_columns, as_prefix="") %} {% set select_statements %} {%- for column in dimension_columns -%} - {{ column }} {%- if as_prefix -%} - {{ " as " ~ as_prefix ~ "_" ~ column }} - {%- endif -%} - {%- if not loop.last -%} - {{ ", " }} + {%- set _is_nested_bq = elementary.bq_is_nested_identifier(column) -%} + {%- set _source = elementary.bq_segment_quote(column) if _is_nested_bq else column -%} + {%- set _alias_suffix = elementary.bq_safe_alias(column) if _is_nested_bq else column -%} + {{ _source }}{{ " as " ~ as_prefix ~ "_" ~ _alias_suffix }} + {%- else -%} + {{ column }} {%- endif -%} + {%- if not loop.last -%}{{ ", " }}{%- endif -%} {%- endfor -%} {% endset %} {{ return(select_statements) }} {% endmacro %} + + +{# ---------------------------------------------------------------------- #} +{# BigQuery STRUCT nested-field helpers. #} +{# ---------------------------------------------------------------------- #} + +{# True only on BigQuery and only when `name` is a plain dotted identifier path + (e.g. user.address.city) — i.e. an actual nested STRUCT reference. Returns + false for plain identifiers, SQL expressions (dimensions are documented as + accepting arbitrary expressions, which must pass through untouched) and + non-BigQuery adapters. #} +{% macro bq_is_nested_identifier(name) %} + {%- if target.type != 'bigquery' or name is not string -%} + {{ return(false) }} + {%- endif -%} + {{ return(modules.re.match('^\\w+(\\.\\w+)+$', name) is not none) }} +{% endmacro %} + +{# Segment-quote a nested identifier path for BigQuery: + user.address.city -> `user`.`address`.`city`. + Anything that is not a nested identifier path (plain identifiers, SQL + expressions, non-BigQuery adapters) is returned unchanged, preserving + existing behaviour at all callsites. #} +{% macro bq_segment_quote(name) %} + {%- if elementary.bq_is_nested_identifier(name) -%} + {%- set parts = [] -%} + {%- for seg in name.split('.') -%} + {%- do parts.append('`' ~ seg ~ '`') -%} + {%- endfor -%} + {{ parts | join('.') }} + {%- else -%} + {{ name }} + {%- endif -%} +{% endmacro %} + +{# Convert a (possibly dotted) identifier into a dot-free alias safe to use + as a SQL identifier. No-op for names without dots. #} +{% macro bq_safe_alias(name) %} + {{- name | replace('.', '__') -}} +{% endmacro %} + +{# Wrap a Column / BigQueryColumn with a dict carrying the SQL identifier + representation (.quoted, segment-quoted for nested), a CTE-projection-safe + alias (.safe_alias, dot-free) and an .is_nested flag. For non-nested columns + and non-BigQuery adapters the wrapper mirrors the original Column's values + (safe_alias falls back to .quoted so identifier quoting is never lost), so + downstream consumers see no behavioural difference. #} +{% macro wrap_column_for_struct_support(column_obj) %} + {%- set name = column_obj.name -%} + {%- set is_nested = elementary.bq_is_nested_identifier(name) -%} + {%- if is_nested -%} + {%- set quoted_segments = [] -%} + {%- for seg in name.split('.') -%} + {%- do quoted_segments.append('`' ~ seg ~ '`') -%} + {%- endfor -%} + {%- set quoted = quoted_segments | join('.') -%} + {%- set safe_alias = name | replace('.', '__') -%} + {%- else -%} + {%- set quoted = column_obj.quoted -%} + {%- set safe_alias = column_obj.quoted -%} + {%- endif -%} + {# `fields` only exists on BigQueryColumn; guard so non-BigQuery + adapters (Snowflake, Postgres, Redshift, ...) don't trip on the + attribute access. #} + {%- set fields = column_obj.fields if column_obj.fields is defined else [] -%} + {{ return({ + 'name': name, + 'column': column_obj.column, + 'quoted': quoted, + 'safe_alias': safe_alias, + 'is_nested': is_nested, + 'dtype': column_obj.dtype, + 'data_type': column_obj.data_type, + 'fields': fields, + }) }} +{% endmacro %} + +{# Walk a BigQuery STRUCT tree and collect dotted leaf names that are safe to + monitor without UNNEST — i.e. no REPEATED ancestor anywhere in the path, + and the leaf itself is not REPEATED. `BigQueryColumn.flatten()` returns leaf + columns with the leaf's own mode but discards ancestor modes, so this walker + is the source of truth for "which leaves can we project directly?". #} +{% macro bq_safe_leaf_names(column_obj) %} + {%- set safe_names = [] -%} + {%- if column_obj.mode != 'REPEATED' + and column_obj.fields is defined + and column_obj.fields | length > 0 -%} + {%- for child in column_obj.fields -%} + {%- do elementary._bq_walk_collect( + child, [column_obj.column], false, safe_names + ) -%} + {%- endfor -%} + {%- endif -%} + {{ return(safe_names) }} +{% endmacro %} + +{# Recursive helper: walks a google.cloud.bigquery.SchemaField subtree, + propagating whether any ancestor was REPEATED. Append safe leaf names to + `safe_names`. #} +{% macro _bq_walk_collect(field, prefix, has_repeated_ancestor, safe_names) %} + {%- set new_prefix = prefix + [field.name] -%} + {%- if field.fields | length == 0 -%} + {%- if not has_repeated_ancestor and field.mode != 'REPEATED' -%} + {%- do safe_names.append(new_prefix | join('.')) -%} + {%- endif -%} + {%- else -%} + {%- set new_has_repeated = has_repeated_ancestor or (field.mode == 'REPEATED') -%} + {%- for child in field.fields -%} + {%- do elementary._bq_walk_collect( + child, new_prefix, new_has_repeated, safe_names + ) -%} + {%- endfor -%} + {%- endif -%} +{% endmacro %} diff --git a/macros/edr/data_monitoring/monitors_query/dimension_monitoring_query.sql b/macros/edr/data_monitoring/monitors_query/dimension_monitoring_query.sql index 3aa20fe19..30119655a 100644 --- a/macros/edr/data_monitoring/monitors_query/dimension_monitoring_query.sql +++ b/macros/edr/data_monitoring/monitors_query/dimension_monitoring_query.sql @@ -12,8 +12,16 @@ elementary.relation_to_full_name(monitored_table_relation) ) %} {% set dimensions_string = elementary.join_list(dimensions, "; ") %} + + {# Segment-quote nested struct paths (e.g. user.address.city) for BigQuery so + they compile correctly. Plain identifiers, expressions and non-BigQuery + adapters pass through unchanged. #} + {% set sql_dimensions = [] %} + {% for dimension in dimensions %} + {% do sql_dimensions.append(elementary.bq_segment_quote(dimension)) %} + {% endfor %} {% set concat_dimensions_sql_expression = elementary.list_concat_with_separator( - dimensions, "; " + sql_dimensions, "; " ) %} {% set timestamp_column = metric_properties.timestamp_column %} {%- set data_monitoring_metrics_relation = elementary.get_elementary_relation(