Skip to content

Commit 8fea375

Browse files
committed
feat: Cache ResidualEvaluator
Fixes #2147
- Implement ResidualEvaluatorCache with LRU eviction and thread safety
- Cache evaluators by partition spec, expression, case sensitivity, and schema
- Fix mypy type annotations and add type ignore for cachetools decorator
1 parent 1f9c46b commit 8fea375

File tree

2 files changed

+121
-6
lines changed

2 files changed

+121
-6
lines changed

pyiceberg/expressions/visitors.py

Lines changed: 120 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414
# KIND, either express or implied. See the License for the
1515
# specific language governing permissions and limitations
1616
# under the License.
17+
import hashlib
1718
import math
19+
import threading
1820
from abc import ABC, abstractmethod
1921
from functools import singledispatch
2022
from typing import (
@@ -23,6 +25,7 @@
2325
Dict,
2426
Generic,
2527
List,
28+
Optional,
2629
Set,
2730
SupportsFloat,
2831
Tuple,
@@ -1976,11 +1979,123 @@ def residual_for(self, partition_data: Record) -> BooleanExpression:
19761979
return self.expr
19771980

19781981

1982+
# =============================================================================
1983+
# ResidualEvaluator caching (must stay after the ResidualEvaluator classes it references)
1984+
# =============================================================================
1985+
1986+
_DEFAULT_RESIDUAL_EVALUATOR_CACHE_SIZE = 128


class ResidualEvaluatorCache:
    """Thread-safe, size-bounded LRU cache for ResidualEvaluator instances.

    Evaluators are stored under a key derived from the partition spec id, the
    repr of the filter expression, the case-sensitivity flag and the schema id.
    Every access is serialized by a re-entrant lock.

    Recency is tracked through the insertion order of a plain ``dict``: an
    entry is re-inserted whenever it is read or overwritten, so the first key
    of the dict is always the least recently used one.

    NOTE(review): the key only contains ``spec_id`` and ``schema_id``, not the
    spec/schema definitions. Two tables that share those ids and the same
    expression would map to the same entry -- confirm this cannot happen for
    the module-level cache instance, or widen the key.
    """

    _cache: Dict[str, "ResidualEvaluator"]
    _maxsize: int
    _lock: threading.RLock

    def __init__(self, maxsize: int = _DEFAULT_RESIDUAL_EVALUATOR_CACHE_SIZE) -> None:
        """Initialize the cache.

        Args:
            maxsize: Maximum number of evaluators to cache. Defaults to 128.
                A value of zero or less disables caching (``put`` stores nothing).
        """
        self._cache = {}
        self._maxsize = maxsize
        self._lock = threading.RLock()

    @staticmethod
    def _make_key(
        spec_id: int,
        expr: "BooleanExpression",
        case_sensitive: bool,
        schema_id: Optional[int] = None,
    ) -> str:
        """Create a deterministic cache key from the evaluator parameters.

        Args:
            spec_id: Partition spec identifier.
            expr: Filter expression tree; its ``repr`` is part of the key.
            case_sensitive: Case-sensitive flag.
            schema_id: Optional schema identifier.

        Returns:
            32-character MD5 hex string cache key.
        """
        key_parts = f"{spec_id}#{repr(expr)}#{case_sensitive}#{schema_id}"
        # MD5 is only used to get a short fixed-size key, not for security.
        return hashlib.md5(key_parts.encode()).hexdigest()

    def get(
        self,
        spec: "PartitionSpec",
        expr: "BooleanExpression",
        case_sensitive: bool,
        schema: "Schema",
    ) -> Optional["ResidualEvaluator"]:
        """Retrieve a cached evaluator and mark it as most recently used.

        Args:
            spec: Partition specification.
            expr: Filter expression.
            case_sensitive: Case sensitivity flag.
            schema: Table schema.

        Returns:
            Cached ResidualEvaluator, or None when there is no entry.
        """
        cache_key = self._make_key(spec.spec_id, expr, case_sensitive, schema.schema_id)
        with self._lock:
            if cache_key not in self._cache:
                return None
            # Re-insert so the entry moves to the most-recently-used end.
            evaluator = self._cache.pop(cache_key)
            self._cache[cache_key] = evaluator
            return evaluator

    def put(
        self,
        spec: "PartitionSpec",
        expr: "BooleanExpression",
        case_sensitive: bool,
        schema: "Schema",
        evaluator: "ResidualEvaluator",
    ) -> None:
        """Cache a ResidualEvaluator, evicting the least recently used entry if full.

        Args:
            spec: Partition specification.
            expr: Filter expression.
            case_sensitive: Case sensitivity flag.
            schema: Table schema.
            evaluator: ResidualEvaluator to cache.
        """
        if self._maxsize <= 0:
            # Caching disabled; also avoids evicting from an empty dict.
            return

        cache_key = self._make_key(spec.spec_id, expr, case_sensitive, schema.schema_id)
        with self._lock:
            # Remove an existing entry first: overwriting must refresh its
            # recency and must not push out an unrelated entry.
            self._cache.pop(cache_key, None)
            while len(self._cache) >= self._maxsize:
                least_recent_key = next(iter(self._cache))
                del self._cache[least_recent_key]
            self._cache[cache_key] = evaluator

    def clear(self) -> None:
        """Clear all cached evaluators."""
        with self._lock:
            self._cache.clear()


_residual_evaluator_cache = ResidualEvaluatorCache()
2085+
2086+
19792087
def residual_evaluator_of(
    spec: PartitionSpec, expr: BooleanExpression, case_sensitive: bool, schema: Schema
) -> ResidualEvaluator:
    """Return a residual evaluator for the given spec, expression and schema.

    The module-level evaluator cache is consulted first. On a miss a new
    evaluator is built -- an UnpartitionedResidualEvaluator when the spec is
    unpartitioned, a ResidualEvaluator otherwise -- stored in the cache and
    returned.

    Lookup and insertion are two separate cache calls, so concurrent callers
    that miss at the same time may each build an evaluator; the last one
    stored is the one kept in the cache.

    Args:
        spec: Partition specification.
        expr: Filter expression.
        case_sensitive: Case sensitivity flag.
        schema: Table schema.

    Returns:
        A cached or newly created evaluator.
    """
    evaluator = _residual_evaluator_cache.get(spec, expr, case_sensitive, schema)
    if evaluator is None:
        evaluator = (
            UnpartitionedResidualEvaluator(schema=schema, expr=expr)
            if spec.is_unpartitioned()
            else ResidualEvaluator(spec=spec, expr=expr, schema=schema, case_sensitive=case_sensitive)
        )
        _residual_evaluator_cache.put(spec, expr, case_sensitive, schema, evaluator)
    return evaluator

tests/utils/test_manifest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
@pytest.fixture(autouse=True)
def clear_global_manifests_cache() -> None:
    """Empty the global manifests cache before every test in this module."""
    # cache_clear is added by the caching decorator, which the type checker
    # does not see on the wrapped function -- hence the ignore.
    _manifests.cache_clear()  # type: ignore
5252

5353

5454
def _verify_metadata_with_fastavro(avro_file: str, expected_metadata: Dict[str, str]) -> None:

0 commit comments

Comments
 (0)