Conversation
MultiDBSource connectors (Snowflake, BigQuery, Redshift, ...) call `CommonDbSourceService.set_inspector()` once per database to re-point the Inspector. The previous engine was simply reassigned without `.dispose()`, leaking its QueuePool, dialect metadata, Inspector `info_cache`, and any per-engine `event.listens_for(engine, "connect")` closures that capture `self` (e.g. `SnowflakeSource.set_session_query_tag`). Across N databases this accumulates linearly — a significant contributor to long-running-pipeline memory growth.

`kill_active_connections()` was already in place but only disposes when the pool has active/idle checkouts, which misses the idle-between-switches case.

Now `set_inspector` unconditionally disposes the old engine, guarded for `None` (first call) and for dispose failures (a teardown error must not block the new engine assignment).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
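The guarded-dispose pattern described in this commit can be sketched as follows. This is an illustrative stand-in, not the upstream `CommonDbSourceService` code: `EngineSwitcher` and `FakeEngine` are hypothetical names, and the real method also re-creates connection and inspector maps.

```python
import logging

logger = logging.getLogger(__name__)


class EngineSwitcher:
    """Sketch: dispose the old engine before re-pointing to the new one."""

    def __init__(self):
        self.engine = None

    def set_engine(self, new_engine):
        if self.engine is not None:  # first call: nothing to dispose
            try:
                # release the pool, dialect state, and event listeners
                self.engine.dispose()
            except Exception as exc:
                # a teardown error must not block the new engine assignment
                logger.warning("Failed to dispose previous engine: %s", exc)
        self.engine = new_engine
```

The `None` guard covers the first call, and the broad `except` ensures a failing teardown still lets the switch proceed.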
`self.context.get_global().deleted_tables` is a process-wide list extended per-schema by `_get_table_names_and_types` and `_get_stream_names_and_types`. In incremental mode, `ACCOUNT_USAGE.TABLES` returns soft-deleted rows historically, so the list grows across every schema of every database and is consumed only at the end of each DB's topology by `mark_tables_as_deleted`. Two consequences this fix addresses:

1. Unbounded memory growth. In deployments with ~39 databases and tens of thousands of ghost tables, the list accumulates hundreds of MB of FQN strings, held for the whole run.
2. Cross-database delete replay. Entries from database N linger in the list while database N+1 is being processed, so delete-by-name calls at the end of DB N+1 receive FQNs belonging to DB N — potentially marking tables as deleted that still exist.

Adds `_reset_per_database_state()`, called at the start of each DB cycle in both the configured-DB and multi-DB branches of `get_database_names`. Uses `list.clear()` in place (rather than reassignment) so any future code that caches a local reference to the list sees the cleared state consistently. Thread safety is preserved by the topology runner's ThreadPoolExecutor context manager, which joins all schema-level workers before returning to the producer generator.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
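The reset pattern and the `clear()`-vs-reassignment point can be sketched as below. The class and attribute names here are simplified stand-ins for the real topology context, assumed from the commit description:

```python
class GlobalContext:
    """Stand-in for the topology's process-wide context (assumed shape)."""

    def __init__(self):
        self.deleted_tables = []


class SnowflakeLikeSource:
    def __init__(self):
        self.global_ctx = GlobalContext()

    def _reset_per_database_state(self):
        # clear() mutates in place, so any code holding a cached reference
        # to the list observes the reset; `deleted_tables = []` would not.
        self.global_ctx.deleted_tables.clear()

    def get_database_names(self, databases):
        for db in databases:
            self._reset_per_database_state()  # start of each DB cycle
            yield db
```

With reassignment instead of `clear()`, a consumer holding the old list object would keep seeing database N's ghost-table FQNs while database N+1 runs.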
```diff
@@ -153,6 +153,16 @@ def set_inspector(self, database_name: str) -> None:
         """

         kill_active_connections(self.engine)
```
💡 Performance: Engine may be disposed twice per database switch

`kill_active_connections()` already calls `engine.dispose()` when there are active/idle connections (see `connections_utils.py`). The new code at line 163 then calls `self.engine.dispose()` unconditionally, leading to a double dispose on the same engine. SQLAlchemy's `dispose()` is idempotent, so this is safe, but the redundant pool teardown is unnecessary work.

Suggested fix:

Move `kill_active_connections` into the `if self.engine is not None` guard and call `dispose()` only once, or remove the conditional dispose from `kill_active_connections` since the caller now handles it:
```python
if self.engine is not None:
    kill_active_connections(self.engine)
    try:
        self.engine.dispose()
    except Exception as exc:
        logger.warning(...)
```
Pull request overview
This PR targets long-running, multi-database ingestion memory growth by (1) scoping Snowflake incremental deleted_tables tracking to the current database and (2) disposing SQLAlchemy engines when switching databases, with unit tests to prevent regressions.
Changes:
- Reset Snowflake per-database state (`deleted_tables`) before yielding each database from `get_database_names()`.
- Dispose the previous SQLAlchemy engine during `CommonDbSourceService.set_inspector()` DB switches.
- Add unit tests covering deleted-table resets (Snowflake) and engine disposal behavior (common DB source).
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| `ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py` | Adds a per-database reset hook to clear `deleted_tables` before each DB is processed. |
| `ingestion/src/metadata/ingestion/source/database/common_db_source.py` | Disposes the previous engine on DB switch to reduce resource retention across multi-DB runs. |
| `ingestion/tests/unit/topology/database/test_snowflake.py` | Adds regression tests asserting `deleted_tables` is cleared between DB yields (configured DB and discovery paths). |
| `ingestion/tests/unit/topology/database/test_common_db_source.py` | Adds tests verifying engine disposal (and resilience to dispose failures) on DB switch. |
Comments suppressed due to low confidence (1)
ingestion/src/metadata/ingestion/source/database/common_db_source.py:173
`set_inspector` replaces `self.engine` but does not rebind/replace `self.session` (created in `__init__` via `create_and_bind_thread_safe_session(self.engine)`). This keeps a strong reference to the old Engine (and its dialect/event listeners), undermining the intent of disposing the old engine and potentially causing any session usage to target the wrong DB. Consider calling `self.session.remove()` (if present) and recreating `self.session` bound to the new engine after `get_connection(...)`.
```python
new_service_connection = deepcopy(self.service_connection)
new_service_connection.database = database_name
self.engine = get_connection(new_service_connection)
self._connection_map = {}  # Lazy init as well
self._inspector_map = {}
```
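The session-rebind the reviewer suggests could be sketched with plain SQLAlchemy as below. The helper name `rebind_session` is hypothetical, and `create_and_bind_thread_safe_session` is assumed to wrap `scoped_session`; this is a sketch of the idea, not the project's implementation:

```python
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker


def rebind_session(old_session, new_engine):
    """Drop thread-local sessions pinned to the previous engine and
    return a new scoped session bound to the replacement engine."""
    if old_session is not None:
        # close and discard any per-thread Session objects so they no
        # longer hold a strong reference to the old Engine
        old_session.remove()
    return scoped_session(sessionmaker(bind=new_engine))
```

In `set_inspector`, this would run right after `self.engine = get_connection(...)`, so `self.session` stops pinning the disposed engine and subsequent session usage targets the new database.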
```python
kill_active_connections(self.engine)
# Release the previous engine's pool, dialect state, Inspector
# info_cache, and any per-engine event listeners that capture `self`
# (e.g. SnowflakeSource.set_session_query_tag). Without this, each
# database switch leaks ~tens of MB → linear growth on multi-DB
# pipelines.
if self.engine is not None:
    try:
```
`kill_active_connections(self.engine)` is executed unconditionally. If `set_inspector` is ever called before an engine is initialized (or after it has been cleared), `kill_active_connections` will log a warning because the engine is `None`. Consider guarding this call with `if self.engine is not None:` (similar to the new dispose guard) to avoid noisy logs and unnecessary exception handling in normal flows.
```python
try:
    self.engine.dispose()
except Exception as exc:
    logger.warning(f"Failed to dispose previous engine on DB switch: {exc}")
```
The warning on dispose failure drops the traceback, which will make diagnosing pool teardown issues harder in production. Consider logging with `exc_info=True` (or `logger.exception(...)`) so the stack trace is captured while still proceeding with the DB switch.

```diff
-logger.warning(f"Failed to dispose previous engine on DB switch: {exc}")
+logger.warning(
+    f"Failed to dispose previous engine on DB switch: {exc}",
+    exc_info=True,
+)
```
SQLAlchemy's `Inspector.info_cache` is an unbounded dict populated by every `@reflection.cache`-decorated method (`get_schema_columns`, `get_pk_constraint`, `get_foreign_keys`, `get_unique_constraints`, `get_columns`, `get_view_definition`, `get_table_ddl`, ...). Inspectors are retained per-thread in `_inspector_map` until `close()`, so the cache accumulates every schema's entries for the full duration of each database.

On Snowflake specifically, `get_schema_columns` materializes the full column catalog of a schema into a single cache entry — measured at ~50 MB per wide schema. In the 52k-tables / 50-cols-per-table worst case observed in production, one inspector's cache crosses 1 GB before any DB-boundary cleanup can run, OOM-killing the pod mid-database. Multi-threaded ingestion multiplies this by the worker count.

Adds `_clear_thread_reflection_cache()`, which clears `info_cache` for the CURRENT thread's inspector only — other threads may be mid-reflection, and clearing their cache would race with in-flight reads. Wired into `get_tables_name_and_type()`, the per-schema producer hook in `CommonDbSourceService`, so prior-schema entries are reclaimed before the next schema's entries are populated. The only runtime cost is re-querying `_current_database_schema` once per schema, which is a cheap single-row query.

Benefits every SQL-based connector (Snowflake, BigQuery, Redshift, ...); Snowflake sees the biggest improvement given its dense reflection path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
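The thread-scoped clearing described above can be sketched as follows. This is a simplified stand-in: the real `_inspector_map` and inspector objects belong to the connector, and `FakeInspector` is illustrative only.

```python
import threading


class ReflectionCacheOwner:
    """Sketch: per-thread inspector map whose info_cache is cleared
    at schema boundaries, for the calling thread only."""

    def __init__(self):
        self._inspector_map = {}  # thread ident -> inspector-like object

    def _clear_thread_reflection_cache(self):
        inspector = self._inspector_map.get(threading.get_ident())
        if inspector is not None:
            # only the calling thread's cache is cleared: other threads
            # may be mid-reflection, and clearing theirs would race with
            # in-flight @reflection.cache reads
            inspector.info_cache.clear()
```

Calling this at the top of each schema's producer releases the prior schema's reflection entries before the next schema populates the cache.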
Code Review: 👍 Approved with suggestions (0 resolved / 1 finding)

Database ingestion logic has been improved to streamline resource handling. Ensure the redundant engine disposal in the finding below is addressed:

💡 Performance: Engine may be disposed twice per database switch
📄 ingestion/src/metadata/ingestion/source/database/common_db_source.py:155
📄 ingestion/src/metadata/ingestion/source/database/common_db_source.py:161-163
🟡 Playwright Results — all passed (13 flaky)

✅ 3032 passed · ❌ 0 failed · 🟡 13 flaky · ⏭️ 88 skipped

🟡 13 flaky test(s) (passed on retry)

How to debug locally:

```shell
# Download the playwright-test-results-<shard> artifact and unzip
npx playwright show-trace path/to/trace.zip  # view trace
```



Summary by Gitar
- Adds `_clear_thread_reflection_cache` to `CommonDbSourceService` to flush SQLAlchemy's unbounded `info_cache` between schema boundaries.
- Adds `TestSchemaBoundaryReflectionCacheClear` to verify cache-clearing logic and ensure thread-safe operation by isolating inspector cache purges.

This will update automatically on new commits.