Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -471,11 +471,20 @@ def __clean_append(self, token: Token, result_list: List) -> None:
result_list.append(name)

def __get_identifier_from_function(self, function_token: Function) -> List:
# Iterative DFS with an explicit stack. Previously this was recursive
# and a deeply-nested CLUSTERING_KEY could exhaust Python's stack,
# cascading into SQLAlchemy / urllib3 weakref finalizers and crashing
# the ingestion pod. Pushing children in reverse lets us pop them in
# left-to-right source order, preserving identifier ordering.
identifiers = []
for token in function_token.get_parameters():
stack = list(function_token.get_parameters())
stack.reverse()
while stack:
token = stack.pop()
if isinstance(token, Function):
# get column names from nested functions
identifiers.extend(self.__get_identifier_from_function(token))
children = list(token.get_parameters())
children.reverse()
stack.extend(children)
elif isinstance(token, Identifier):
self.__clean_append(token, identifiers)
return identifiers
Expand All @@ -493,6 +502,18 @@ def parse_column_name_from_expr(self, cluster_key_expr: str) -> Optional[List[st
elif isinstance(token, Identifier):
self.__clean_append(token, result)
return result
except RecursionError:
# Defence-in-depth: the iterative walker above eliminates our own
# recursion, but sqlparse.parse() or a future refactor could still
# trip the limit. Never let RecursionError escape this path —
# leaving it would poison SQLAlchemy pool / urllib3 finalizers
# and abort the process.
logger.warning(
"RecursionError parsing cluster key; skipping partition "
"columns for expression: %r",
cluster_key_expr,
)
return []
except Exception as err:
logger.debug(traceback.format_exc())
logger.warning(f"Failed to parse cluster key - {err}")
Expand Down
217 changes: 217 additions & 0 deletions ingestion/tests/unit/topology/database/test_snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
snowflake unit tests
"""
# pylint: disable=line-too-long
import sys
from unittest import TestCase
from unittest.mock import MagicMock, Mock, PropertyMock, patch

import pytest
import sqlalchemy.types as sqltypes
from sqlparse.sql import Function

from metadata.generated.schema.entity.data.table import TableType
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
Expand Down Expand Up @@ -853,3 +856,217 @@ def test_empty_tag_value_skipped_with_warning(self):
source.schema_tags_map["TEST_SCHEMA"][0],
{"tag_name": "TEST_TAG", "tag_value": "123"},
)


class _FakeFunction(Function):
    """Minimal sqlparse.Function double for assembling deep token chains.

    Overriding get_parameters() to hand back an injected child list lets
    instances be chained without building a real Parenthesis token tree,
    and avoids MagicMock's per-access attribute allocation that would
    distort memory measurements.
    """

    def __init__(self, children):
        super().__init__(tokens=[])
        self._params = children

    def get_parameters(self):
        return self._params


def _build_nested_function_chain(depth: int) -> Function:
    """Create a Function-token chain nested ``depth`` levels, iteratively.

    Built with a loop on purpose: a recursive builder would trip the very
    recursion limit these tests lower. Each level's get_parameters()
    yields the next level down; the innermost leaf yields [].
    """
    node = _FakeFunction([])
    for _level in range(depth):
        node = _FakeFunction([node])
    return node


from sqlparse.sql import Identifier as _SqlparseIdentifier # noqa: E402


Comment on lines +890 to +892
Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There’s a mid-file import (from sqlparse.sql import Identifier as _SqlparseIdentifier) guarded by # noqa: E402. This will fight the repo’s Python formatting tooling (e.g., isort) and makes imports harder to reason about. Please move this import up with the other module imports (near the existing from sqlparse.sql import Function) and drop the E402 suppression.

Copilot uses AI. Check for mistakes.
class _IdentifierToken(_SqlparseIdentifier):
    """Lightweight sqlparse.sql.Identifier double.

    Inherits from Identifier so isinstance() checks in the walker succeed,
    while get_real_name() simply echoes the injected column name. Cheaper
    and more deterministic than a MagicMock.
    """

    def __init__(self, name: str):
        super().__init__(tokens=[])
        self._real_name = name

    def get_real_name(self):
        return self._real_name


class TestSnowflakeClusterKeyDepthHandling:
    """TDD tests for the cluster-key parser fix.

    Production symptom: metadata ingestion pod dies with
    "RecursionError: maximum recursion depth exceeded" surfacing only through
    weakref finalizers in SQLAlchemy's connection pool and snowflake-connector's
    urllib3 pool. The primary RecursionError originates in
    SnowflakeSource.__get_identifier_from_function while parsing a pathological
    CLUSTERING_KEY expression. The public wrapper parse_column_name_from_expr
    catches the error via a broad `except Exception` and silently returns None,
    so the workflow continues through every bad-cluster-key table — accumulating
    retained state (tracebacks, sqlparse trees, deferred finalizers) until the
    pod is killed.

    Required post-fix contract (asserted below):
    1. Arbitrarily deep cluster keys parse without raising — no depth cap
       that would silently truncate legitimate expressions. The recursive
       walker is replaced with an iterative DFS using an explicit stack.
    2. The iterative walker returns identifiers in the same left-to-right
       DFS order as the original recursive implementation (semantic
       equivalence on normal cluster keys).
    3. Many deep tables in sequence process cleanly with no cumulative
       state issues.
    4. Defence-in-depth: if some other layer (e.g. sqlparse itself) ever
       triggers RecursionError, the wrapper catches it explicitly, logs
       an identifying warning, and returns [] rather than None.
    5. Happy-path parsing of normal cluster keys is unaffected (verified by
       the existing test_partition_parse_columns test).
    """

    @pytest.fixture(autouse=True)
    def _setup_source(self):
        # Fresh source instance per test so no parser state carries over
        # between cases.
        self.source = get_snowflake_sources()["not_incremental"]
        self._original_recursion_limit = sys.getrecursionlimit()
        # Small limit: if the walker is still recursive, anything deeper
        # than ~100 levels will trip it. The iterative version must handle
        # depth-500+ fine at this limit.
        sys.setrecursionlimit(200)
        yield
        # Restore the interpreter-wide limit even if the test body failed.
        sys.setrecursionlimit(self._original_recursion_limit)

    def test_very_deep_cluster_key_parses_without_raising(self):
        """Depth alone must not cause a failure anymore.

        Pre-fix: recursive walker blows the stack well before depth 500 when
        the recursion limit is 200. Post-fix: iterative walker handles any
        depth Python has heap for.
        """
        deep_function_tree = _build_nested_function_chain(depth=500)
        fake_statement = MagicMock()
        fake_statement.tokens = [deep_function_tree]

        with patch(
            "metadata.ingestion.source.database.snowflake.metadata.sqlparse.parse",
            return_value=[fake_statement],
        ):
            result = self.source.parse_column_name_from_expr(
                "LINEAR(any_depth_should_parse)"
            )

        # A chain of pure Function tokens with no Identifier leaves yields
        # an empty list — parsed successfully, just no columns to report.
        assert (
            result == []
        ), f"Deep cluster key should parse successfully; got {result!r}."

    def test_deep_cluster_key_with_identifiers_returns_them_in_dfs_order(self):
        """Iterative walker must be semantically equivalent on real input.

        Builds a chain that mirrors a real pathologically-deep cluster key:
        each level has a Function child (going deeper) and a sibling
        Identifier at that level. The expected result is the identifiers in
        left-to-right depth-first order — same order the recursive
        implementation produced on sensible inputs.
        """
        # Construct: f(col_0, f(col_1, f(col_2, ... f(col_499) ...)))
        # Recursive-equivalent DFS order: [col_0, col_1, col_2, ..., col_499]
        current = _FakeFunction([_IdentifierToken("col_499")])
        for i in range(498, -1, -1):
            current = _FakeFunction([_IdentifierToken(f"col_{i}"), current])

        fake_statement = MagicMock()
        fake_statement.tokens = [current]

        with patch(
            "metadata.ingestion.source.database.snowflake.metadata.sqlparse.parse",
            return_value=[fake_statement],
        ):
            result = self.source.parse_column_name_from_expr(
                "LINEAR(nested_cluster_key_with_columns)"
            )

        expected = [f"col_{i}" for i in range(500)]
        assert result == expected, (
            f"Iterative walker must preserve left-to-right DFS order. "
            f"Expected first 3: {expected[:3]}, last 3: {expected[-3:]}. "
            f"Got first 3: {result[:3] if result else result}, "
            f"last 3: {result[-3:] if result and len(result) >= 3 else result}"
        )

    def test_many_deep_cluster_keys_in_sequence_all_parse_cleanly(self):
        """The loop keeps running across many tables, no cumulative state.

        Mirrors the production scenario of many tables with deep cluster
        keys processed back-to-back. Asserts every call completes and
        returns a list (never None), and that no RecursionError leaks.
        """
        deep_function_tree = _build_nested_function_chain(depth=500)
        fake_statement = MagicMock()
        fake_statement.tokens = [deep_function_tree]

        n_tables = 50

        with patch(
            "metadata.ingestion.source.database.snowflake.metadata.sqlparse.parse",
            return_value=[fake_statement],
        ):
            results = [
                self.source.parse_column_name_from_expr(
                    f"LINEAR(cluster_key_table_{i}_deeply_nested)"
                )
                for i in range(n_tables)
            ]

        assert all(r == [] for r in results), (
            f"Every deep cluster key should parse to []; got distinct "
            f"types: {set(type(r).__name__ for r in results)}"
        )
        assert all(
            r is not None for r in results
        ), "No call should return None — None would indicate parse failure."

    def test_recursion_error_from_another_layer_is_caught_and_logged(self, caplog):
        """Defence-in-depth for sqlparse / any future recursive caller.

        The iterative walker we added eliminates our own recursion, but
        sqlparse.parse() itself can still hit Python's limit on pathological
        input — and any future refactor might reintroduce recursion elsewhere
        in this path. The wrapper must catch RecursionError explicitly, log
        a warning that identifies the offending expression, and return []
        so the topology loop continues.
        """
        import logging

        offending_expr = "LINEAR(marker_for_defence_in_depth_42)"

        with caplog.at_level(logging.WARNING, logger="metadata.Ingestion"), patch(
            "metadata.ingestion.source.database.snowflake.metadata.sqlparse.parse",
            side_effect=RecursionError("simulated downstream recursion"),
        ):
            result = self.source.parse_column_name_from_expr(offending_expr)

        assert result == [], (
            f"RecursionError from any layer must be caught and translated "
            f"to [] so the topology loop continues; got {result!r}."
        )

        warnings = [
            r.getMessage() for r in caplog.records if r.levelno >= logging.WARNING
        ]
        assert any("marker_for_defence_in_depth_42" in msg for msg in warnings), (
            f"Warning must identify the offending expression so the "
            f"affected table can be located. Got warnings: {warnings!r}"
        )
Loading