From 02e6297ec7409682fde091c6cfcf5d17ef2d8acb Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Tue, 21 Apr 2026 23:02:34 -0700 Subject: [PATCH 1/2] Add cluster key tests - Snowflake --- .../source/database/snowflake/metadata.py | 38 ++- .../unit/topology/database/test_snowflake.py | 217 ++++++++++++++++++ 2 files changed, 252 insertions(+), 3 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py index d3688fd92528..088494ae7324 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py @@ -164,6 +164,12 @@ def __init__( logger = ingestion_logger() +# Preview length for a cluster-key expression in the fallback warning log. +# Used only when a genuinely malformed expression trips the outer exception +# handler; normal deep-but-well-formed expressions are walked iteratively +# and never hit that path. +_CLUSTER_KEY_LOG_PREVIEW = 200 + # pylint: disable=protected-access SnowflakeDialect._json_deserializer = json.loads SnowflakeDialect.get_table_names = get_table_names @@ -471,11 +477,20 @@ def __clean_append(self, token: Token, result_list: List) -> None: result_list.append(name) def __get_identifier_from_function(self, function_token: Function) -> List: + # Iterative DFS with an explicit stack. Previously this was recursive + # and a deeply-nested CLUSTERING_KEY could exhaust Python's stack, + # cascading into SQLAlchemy / urllib3 weakref finalizers and crashing + # the ingestion pod. Pushing children in reverse lets us pop them in + # left-to-right source order, preserving identifier ordering. identifiers = [] - for token in function_token.get_parameters(): + stack = list(function_token.get_parameters()) + stack.reverse() + while stack: + token = stack.pop() if isinstance(token, Function): - # get column names from nested functions - identifiers.extend(self.__get_identifier_from_function(token)) + children = list(token.get_parameters()) + children.reverse() + stack.extend(children) elif isinstance(token, Identifier): self.__clean_append(token, identifiers) return identifiers @@ -493,6 +508,23 @@ def parse_column_name_from_expr(self, cluster_key_expr: str) -> Optional[List[st elif isinstance(token, Identifier): self.__clean_append(token, result) return result + except RecursionError: + # Defence-in-depth: the iterative walker above eliminates our own + # recursion, but sqlparse.parse() or a future refactor could still + # trip the limit. Never let RecursionError escape this path — + # leaving it would poison SQLAlchemy pool / urllib3 finalizers + # and abort the process. + truncated = ( + cluster_key_expr[:_CLUSTER_KEY_LOG_PREVIEW] + "..." + if len(cluster_key_expr) > _CLUSTER_KEY_LOG_PREVIEW + else cluster_key_expr + ) + logger.warning( + "RecursionError parsing cluster key; skipping partition " + "columns for expression: %r", + truncated, + ) + return [] except Exception as err: logger.debug(traceback.format_exc()) logger.warning(f"Failed to parse cluster key - {err}") diff --git a/ingestion/tests/unit/topology/database/test_snowflake.py b/ingestion/tests/unit/topology/database/test_snowflake.py index 0e0d7674c917..d057a4f95c66 100644 --- a/ingestion/tests/unit/topology/database/test_snowflake.py +++ b/ingestion/tests/unit/topology/database/test_snowflake.py @@ -13,10 +13,13 @@ snowflake unit tests """ # pylint: disable=line-too-long +import sys from unittest import TestCase from unittest.mock import MagicMock, Mock, PropertyMock, patch +import pytest import sqlalchemy.types as sqltypes +from sqlparse.sql import Function from metadata.generated.schema.entity.data.table import TableType from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( @@ -853,3 +856,217 @@ def test_empty_tag_value_skipped_with_warning(self): source.schema_tags_map["TEST_SCHEMA"][0], {"tag_name": "TEST_TAG", "tag_value": "123"}, ) + + +class _FakeFunction(Function): + """Lightweight sqlparse.Function stand-in. + + Overrides get_parameters() so the class can be chained without needing + a real Parenthesis token tree, and avoids MagicMock's per-access + attribute allocation that distorts memory measurements. + """ + + def __init__(self, children): + super().__init__(tokens=[]) + self._children = children + + def get_parameters(self): + return self._children + + +def _build_nested_function_chain(depth: int) -> Function: + """Build a Function-token chain of the given depth, iteratively. + + Iterative construction matters: a recursive builder would hit the same + recursion limit we are trying to exercise. Each level's get_parameters() + returns the next level; the leaf returns []. + """ + current = _FakeFunction([]) + for _ in range(depth): + current = _FakeFunction([current]) + return current + + +from sqlparse.sql import Identifier as _SqlparseIdentifier # noqa: E402 + + +class _IdentifierToken(_SqlparseIdentifier): + """Minimal sqlparse.sql.Identifier stand-in. + + Subclasses Identifier so isinstance() checks pass; overrides + get_real_name() to return the injected column name. Lighter than + MagicMock and keeps test behaviour deterministic. + """ + + def __init__(self, name: str): + super().__init__(tokens=[]) + self._name = name + + def get_real_name(self): + return self._name + + +class TestSnowflakeClusterKeyDepthHandling: + """TDD tests for the cluster-key parser fix. + + Production symptom: metadata ingestion pod dies with + "RecursionError: maximum recursion depth exceeded" surfacing only through + weakref finalizers in SQLAlchemy's connection pool and snowflake-connector's + urllib3 pool. The primary RecursionError originates in + SnowflakeSource.__get_identifier_from_function while parsing a pathological + CLUSTERING_KEY expression. The public wrapper parse_column_name_from_expr + catches the error via a broad `except Exception` and silently returns None, + so the workflow continues through every bad-cluster-key table — accumulating + retained state (tracebacks, sqlparse trees, deferred finalizers) until the + pod is killed. + + Required post-fix contract (asserted below): + 1. Arbitrarily deep cluster keys parse without raising — no depth cap + that would silently truncate legitimate expressions. The recursive + walker is replaced with an iterative DFS using an explicit stack. + 2. The iterative walker returns identifiers in the same left-to-right + DFS order as the original recursive implementation (semantic + equivalence on normal cluster keys). + 3. Many deep tables in sequence process cleanly with no cumulative + state issues. + 4. Defence-in-depth: if some other layer (e.g. sqlparse itself) ever + triggers RecursionError, the wrapper catches it explicitly, logs + an identifying warning, and returns [] rather than None. + 5. Happy-path parsing of normal cluster keys is unaffected (verified by + the existing test_partition_parse_columns test). + """ + + @pytest.fixture(autouse=True) + def _setup_source(self): + self.source = get_snowflake_sources()["not_incremental"] + self._original_recursion_limit = sys.getrecursionlimit() + # Small limit: if the walker is still recursive, anything deeper + # than ~100 levels will trip it. The iterative version must handle + # depth-500+ fine at this limit. + sys.setrecursionlimit(200) + yield + sys.setrecursionlimit(self._original_recursion_limit) + + def test_very_deep_cluster_key_parses_without_raising(self): + """Depth alone must not cause a failure anymore. + + Pre-fix: recursive walker blows the stack well before depth 500 when + the recursion limit is 200. Post-fix: iterative walker handles any + depth Python has heap for. + """ + deep_function_tree = _build_nested_function_chain(depth=500) + fake_statement = MagicMock() + fake_statement.tokens = [deep_function_tree] + + with patch( + "metadata.ingestion.source.database.snowflake.metadata.sqlparse.parse", + return_value=[fake_statement], + ): + result = self.source.parse_column_name_from_expr( + "LINEAR(any_depth_should_parse)" + ) + + # A chain of pure Function tokens with no Identifier leaves yields + # an empty list — parsed successfully, just no columns to report. + assert ( + result == [] + ), f"Deep cluster key should parse successfully; got {result!r}." + + def test_deep_cluster_key_with_identifiers_returns_them_in_dfs_order(self): + """Iterative walker must be semantically equivalent on real input. + + Builds a chain that mirrors a real pathologically-deep cluster key: + each level has a Function child (going deeper) and a sibling + Identifier at that level. The expected result is the identifiers in + left-to-right depth-first order — same order the recursive + implementation produced on sensible inputs. + """ + # Construct: f(col_0, f(col_1, f(col_2, ... f(col_499) ...))) + # Recursive-equivalent DFS order: [col_0, col_1, col_2, ..., col_499] + current = _FakeFunction([_IdentifierToken("col_499")]) + for i in range(498, -1, -1): + current = _FakeFunction([_IdentifierToken(f"col_{i}"), current]) + + fake_statement = MagicMock() + fake_statement.tokens = [current] + + with patch( + "metadata.ingestion.source.database.snowflake.metadata.sqlparse.parse", + return_value=[fake_statement], + ): + result = self.source.parse_column_name_from_expr( + "LINEAR(nested_cluster_key_with_columns)" + ) + + expected = [f"col_{i}" for i in range(500)] + assert result == expected, ( + f"Iterative walker must preserve left-to-right DFS order. " + f"Expected first 3: {expected[:3]}, last 3: {expected[-3:]}. " + f"Got first 3: {result[:3] if result else result}, " + f"last 3: {result[-3:] if result and len(result) >= 3 else result}" + ) + + def test_many_deep_cluster_keys_in_sequence_all_parse_cleanly(self): + """The loop keeps running across many tables, no cumulative state. + + Mirrors the production scenario of many tables with deep cluster + keys processed back-to-back. Asserts every call completes and + returns a list (never None), and that no RecursionError leaks. + """ + deep_function_tree = _build_nested_function_chain(depth=500) + fake_statement = MagicMock() + fake_statement.tokens = [deep_function_tree] + + n_tables = 50 + + with patch( + "metadata.ingestion.source.database.snowflake.metadata.sqlparse.parse", + return_value=[fake_statement], + ): + results = [ + self.source.parse_column_name_from_expr( + f"LINEAR(cluster_key_table_{i}_deeply_nested)" + ) + for i in range(n_tables) + ] + + assert all(r == [] for r in results), ( + f"Every deep cluster key should parse to []; got distinct " + f"types: {set(type(r).__name__ for r in results)}" + ) + assert all( + r is not None for r in results + ), "No call should return None — None would indicate parse failure." + + def test_recursion_error_from_another_layer_is_caught_and_logged(self, caplog): + """Defence-in-depth for sqlparse / any future recursive caller. + + The iterative walker we added eliminates our own recursion, but + sqlparse.parse() itself can still hit Python's limit on pathological + input — and any future refactor might reintroduce recursion elsewhere + in this path. The wrapper must catch RecursionError explicitly, log + a warning that identifies the offending expression, and return [] + so the topology loop continues. + """ + import logging + + offending_expr = "LINEAR(marker_for_defence_in_depth_42)" + + with caplog.at_level(logging.WARNING, logger="metadata.Ingestion"), patch( + "metadata.ingestion.source.database.snowflake.metadata.sqlparse.parse", + side_effect=RecursionError("simulated downstream recursion"), + ): + result = self.source.parse_column_name_from_expr(offending_expr) + + assert result == [], ( + f"RecursionError from any layer must be caught and translated " + f"to [] so the topology loop continues; got {result!r}." + ) + + warnings = [ + r.getMessage() for r in caplog.records if r.levelno >= logging.WARNING + ] + assert any("marker_for_defence_in_depth_42" in msg for msg in warnings), ( + f"Warning must identify the offending expression so the " + f"affected table can be located. Got warnings: {warnings!r}" + ) From 630b75a1fd5e406b83bd33d2aa5433c467182daa Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Tue, 21 Apr 2026 23:05:04 -0700 Subject: [PATCH 2/2] remove the unnecessary limit --- .../ingestion/source/database/snowflake/metadata.py | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py index 088494ae7324..cf7bd93171e5 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py @@ -164,12 +164,6 @@ def __init__( logger = ingestion_logger() -# Preview length for a cluster-key expression in the fallback warning log. -# Used only when a genuinely malformed expression trips the outer exception -# handler; normal deep-but-well-formed expressions are walked iteratively -# and never hit that path. -_CLUSTER_KEY_LOG_PREVIEW = 200 - # pylint: disable=protected-access SnowflakeDialect._json_deserializer = json.loads SnowflakeDialect.get_table_names = get_table_names @@ -514,15 +508,10 @@ def parse_column_name_from_expr(self, cluster_key_expr: str) -> Optional[List[st # trip the limit. Never let RecursionError escape this path — # leaving it would poison SQLAlchemy pool / urllib3 finalizers # and abort the process. - truncated = ( - cluster_key_expr[:_CLUSTER_KEY_LOG_PREVIEW] + "..." - if len(cluster_key_expr) > _CLUSTER_KEY_LOG_PREVIEW - else cluster_key_expr - ) logger.warning( "RecursionError parsing cluster key; skipping partition " "columns for expression: %r", - truncated, + cluster_key_expr, ) return [] except Exception as err: