From a523af13d303779d4249219b9cda7a71c03f69d1 Mon Sep 17 00:00:00 2001
From: Tian Gao
Date: Wed, 22 Apr 2026 11:47:19 -0700
Subject: [PATCH 1/3] Revert "[SPARK-56387][PYTHON][FOLLOWUP] Use namedtuple instead of dict to have be backward compatible"

This reverts commit c0deffe0188bb4ba252126d491b5e390435c9e48.
---
 python/pyspark/sql/_typing.pyi         | 10 ++++++
 python/pyspark/sql/connect/profiler.py |  2 +-
 python/pyspark/sql/profiler.py         | 50 ++++++++++++--------------
 3 files changed, 34 insertions(+), 28 deletions(-)

diff --git a/python/pyspark/sql/_typing.pyi b/python/pyspark/sql/_typing.pyi
index b3b31962ad45b..ed62d85dc29a3 100644
--- a/python/pyspark/sql/_typing.pyi
+++ b/python/pyspark/sql/_typing.pyi
@@ -19,9 +19,11 @@
 from typing import (
     Any,
     Callable,
+    Dict,
     List,
     Optional,
     Tuple,
+    TypedDict,
     TypeVar,
     Union,
 )
@@ -29,8 +31,10 @@ from typing_extensions import Literal, Protocol
 
 import datetime
 import decimal
+import pstats
 
 from pyspark._typing import PrimitiveType
+from pyspark.profiler import CodeMapDict
 import pyspark.sql.types
 from pyspark.sql.column import Column
 from pyspark.sql.tvf_argument import TableValuedFunctionArgument
@@ -81,3 +85,9 @@ class UserDefinedFunctionLike(Protocol):
     def returnType(self) -> pyspark.sql.types.DataType: ...
     def __call__(self, *args: ColumnOrName) -> Column: ...
     def asNondeterministic(self) -> UserDefinedFunctionLike: ...
+
+class ProfileResult(TypedDict, total=False):
+    perf: pstats.Stats
+    memory: CodeMapDict
+
+ProfileResults = Dict[Union[int, str], ProfileResult]
diff --git a/python/pyspark/sql/connect/profiler.py b/python/pyspark/sql/connect/profiler.py
index 73993aa128fb4..681287c40afc0 100644
--- a/python/pyspark/sql/connect/profiler.py
+++ b/python/pyspark/sql/connect/profiler.py
@@ -19,7 +19,7 @@
 from pyspark.sql.profiler import ProfilerCollector, ProfileResultsParam
 
 if TYPE_CHECKING:
-    from pyspark.sql.profiler import ProfileResults
+    from pyspark.sql._typing import ProfileResults
 
 
 class ConnectProfilerCollector(ProfilerCollector):
diff --git a/python/pyspark/sql/profiler.py b/python/pyspark/sql/profiler.py
index 6bbfdcc1db51c..0b0c038c8f72e 100644
--- a/python/pyspark/sql/profiler.py
+++ b/python/pyspark/sql/profiler.py
@@ -27,10 +27,10 @@
     Dict,
     Iterable,
     Literal,
-    NamedTuple,
     Optional,
     Tuple,
     Union,
+    TYPE_CHECKING,
     overload,
 )
 import warnings
@@ -50,19 +50,8 @@
     PStatsParam,
 )
 
-
-class ProfileResult(NamedTuple):
-    perf: Optional[pstats.Stats] = None
-    memory: Optional[CodeMapDict] = None
-
-    def __bool__(self) -> bool:
-        return self.perf is not None or self.memory is not None
-
-    def replace(self, **kwargs: Any) -> "ProfileResult":
-        return self._replace(**kwargs)
-
-
-ProfileResults = Dict[Union[int, str], ProfileResult]
+if TYPE_CHECKING:
+    from pyspark.sql._typing import ProfileResults
 
 
 class _ProfileResultsParam(AccumulatorParam["ProfileResults"]):
@@ -80,9 +69,16 @@ def addInPlace(value1: "ProfileResults", value2: "ProfileResults") -> "ProfileRe
         if key not in value1:
             value1[key] = result
         else:
-            perf = PStatsParam.addInPlace(value1[key].perf, result.perf)
-            memory = MemUsageParam.addInPlace(value1[key].memory, result.memory)
-            value1[key] = ProfileResult(perf=perf, memory=memory)
+            perf = PStatsParam.addInPlace(
+                value1[key].get("perf", None), result.get("perf", None)
+            )
+            if perf is not None:
+                value1[key]["perf"] = perf
+            memory = MemUsageParam.addInPlace(
+                value1[key].get("memory", None), result.get("memory", None)
+            )
+            if memory is not None:
+                value1[key]["memory"] = memory
         return value1
 
 
@@ -112,7 +108,7 @@ def save(self) -> None:
         # make it picklable
         st.stream = None  # type: ignore[attr-defined]
         st.strip_dirs()
-        self._accumulator.add({self._result_key: ProfileResult(perf=st)})
+        self._accumulator.add({self._result_key: {"perf": st}})
 
     def __enter__(self) -> "WorkerPerfProfiler":
         self.start()
@@ -160,7 +156,7 @@ def save(self) -> None:
             filename: list(line_iterator)
             for filename, line_iterator in self._profiler.code_map.items()
         }
-        self._accumulator.add({self._result_key: ProfileResult(memory=codemap_dict)})
+        self._accumulator.add({self._result_key: {"memory": codemap_dict}})
 
     def __enter__(self) -> "WorkerMemoryProfiler":
         self.start()
@@ -227,9 +223,9 @@ def show(id: Union[int, str]) -> None:
     def _perf_profile_results(self) -> Dict[Union[int, str], pstats.Stats]:
         with self._lock:
             return {
-                result_id: result.perf
+                result_id: result["perf"]
                 for result_id, result in self._profile_results.items()
-                if result.perf is not None
+                if result.get("perf", None) is not None
             }
 
     def show_memory_profiles(self, id: Optional[Union[int, str]] = None) -> None:
@@ -273,9 +269,9 @@ def show(id: Union[int, str]) -> None:
     def _memory_profile_results(self) -> Dict[Union[int, str], CodeMapDict]:
         with self._lock:
             return {
-                result_id: result.memory
+                result_id: result["memory"]
                 for result_id, result in self._profile_results.items()
-                if result.memory is not None
+                if result.get("memory", None) is not None
             }
 
     @property
@@ -369,12 +365,12 @@ def clear_perf_profiles(self, id: Optional[Union[int, str]] = None) -> None:
         with self._lock:
             if id is not None:
                 if id in self._profile_results:
-                    self._profile_results[id] = self._profile_results[id].replace(perf=None)
+                    self._profile_results[id].pop("perf", None)
                     if not self._profile_results[id]:
                         self._profile_results.pop(id)
             else:
                 for id in list(self._profile_results.keys()):
-                    self._profile_results[id] = self._profile_results[id].replace(perf=None)
+                    self._profile_results[id].pop("perf", None)
                     if not self._profile_results[id]:
                         self._profile_results.pop(id)
 
@@ -393,12 +389,12 @@ def clear_memory_profiles(self, id: Optional[Union[int, str]] = None) -> None:
         with self._lock:
             if id is not None:
                 if id in self._profile_results:
-                    self._profile_results[id] = self._profile_results[id].replace(memory=None)
+                    self._profile_results[id].pop("memory", None)
                     if not self._profile_results[id]:
                         self._profile_results.pop(id)
             else:
                 for id in list(self._profile_results.keys()):
-                    self._profile_results[id] = self._profile_results[id].replace(memory=None)
+                    self._profile_results[id].pop("memory", None)
                     if not self._profile_results[id]:
                         self._profile_results.pop(id)

From a4a35e80864d28fe59454ddcf68c5ae56843ac14 Mon Sep 17 00:00:00 2001
From: Tian Gao
Date: Wed, 22 Apr 2026 14:56:32 -0700
Subject: [PATCH 2/3] Use a separate accumulator for profiling

---
 python/pyspark/accumulators.py            |  4 +-
 python/pyspark/sql/_typing.pyi            |  4 +-
 python/pyspark/sql/connect/client/core.py |  2 +-
 python/pyspark/sql/connect/profiler.py    |  6 +-
 python/pyspark/sql/profiler.py            | 70 +++++++++++++++++++----
 python/pyspark/sql/worker/utils.py        | 17 ++++--
 python/pyspark/worker.py                  | 27 ++++++---
 7 files changed, 101 insertions(+), 29 deletions(-)

diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py
index cd2eea5258c84..0bce2934cb81f 100644
--- a/python/pyspark/accumulators.py
+++ b/python/pyspark/accumulators.py
@@ -60,6 +60,7 @@ def _deserialize_accumulator(
 
 class SpecialAccumulatorIds:
     SQL_UDF_PROFIER = -1
+    SQL_UDF_PROFIER_V2 = -2
 
 
 class Accumulator(Generic[T]):
@@ -298,7 +299,8 @@ def accum_updates() -> bool:
             num_updates = read_int(self.rfile)
             for _ in range(num_updates):
                 aid, update = pickleSer._read_with_length(self.rfile)
-                _accumulatorRegistry[aid] += update
+                if aid in _accumulatorRegistry:
+                    _accumulatorRegistry[aid] += update
             # Write a byte in acknowledgement
             self.wfile.write(struct.pack("!b", 1))
             return False
diff --git a/python/pyspark/sql/_typing.pyi b/python/pyspark/sql/_typing.pyi
index ed62d85dc29a3..94e3ccf770939 100644
--- a/python/pyspark/sql/_typing.pyi
+++ b/python/pyspark/sql/_typing.pyi
@@ -86,8 +86,10 @@ class UserDefinedFunctionLike(Protocol):
     def __call__(self, *args: ColumnOrName) -> Column: ...
     def asNondeterministic(self) -> UserDefinedFunctionLike: ...
 
+ProfileResults = Dict[Union[int, str], Tuple[Optional[pstats.Stats], Optional[CodeMapDict]]]
+
 class ProfileResult(TypedDict, total=False):
     perf: pstats.Stats
     memory: CodeMapDict
 
-ProfileResults = Dict[Union[int, str], ProfileResult]
+ProfileResultsV2 = Dict[Union[int, str], ProfileResult]
diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py
index bbc3452571976..925b114d070dd 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -1561,7 +1561,7 @@ def handle_response(
                 if observed_metrics.name == "__python_accumulator__":
                     for metric in observed_metrics.metrics:
                         aid, update = pickleSer.loads(LiteralExpression._to_value(metric))
-                        if aid == SpecialAccumulatorIds.SQL_UDF_PROFIER:
+                        if aid == SpecialAccumulatorIds.SQL_UDF_PROFIER_V2:
                             self._profiler_collector._update(update)
                 elif observed_metrics.name in observations:
                     observation_result = observations[observed_metrics.name]._result
diff --git a/python/pyspark/sql/connect/profiler.py b/python/pyspark/sql/connect/profiler.py
index 681287c40afc0..d746316bb63b1 100644
--- a/python/pyspark/sql/connect/profiler.py
+++ b/python/pyspark/sql/connect/profiler.py
@@ -16,7 +16,7 @@
 #
 from typing import TYPE_CHECKING
 
-from pyspark.sql.profiler import ProfilerCollector, ProfileResultsParam
+from pyspark.sql.profiler import ProfilerCollector, ProfileResultsParamV2
 
 if TYPE_CHECKING:
     from pyspark.sql._typing import ProfileResults
@@ -29,7 +29,7 @@ class ConnectProfilerCollector(ProfilerCollector):
 
     def __init__(self) -> None:
         super().__init__()
-        self._value = ProfileResultsParam.zero({})
+        self._value = ProfileResultsParamV2.zero({})
 
     @property
     def _profile_results(self) -> "ProfileResults":
@@ -38,4 +38,4 @@ def _profile_results(self) -> "ProfileResults":
 
     def _update(self, update: "ProfileResults") -> None:
         with self._lock:
-            self._value = ProfileResultsParam.addInPlace(self._profile_results, update)
+            self._value = ProfileResultsParamV2.addInPlace(self._profile_results, update)
diff --git a/python/pyspark/sql/profiler.py b/python/pyspark/sql/profiler.py
index 0b0c038c8f72e..d6094449e3ff5 100644
--- a/python/pyspark/sql/profiler.py
+++ b/python/pyspark/sql/profiler.py
@@ -51,20 +51,54 @@
 )
 
 if TYPE_CHECKING:
-    from pyspark.sql._typing import ProfileResults
+    from pyspark.sql._typing import ProfileResults, ProfileResultsV2
 
 
-class _ProfileResultsParam(AccumulatorParam["ProfileResults"]):
+class _ProfileResultsParam(AccumulatorParam[Optional["ProfileResults"]]):
     """
     AccumulatorParam for profilers.
     """
 
     @staticmethod
-    def zero(value: "ProfileResults") -> "ProfileResults":
+    def zero(value: Optional["ProfileResults"]) -> Optional["ProfileResults"]:
+        return value
+
+    @staticmethod
+    def addInPlace(
+        value1: Optional["ProfileResults"], value2: Optional["ProfileResults"]
+    ) -> Optional["ProfileResults"]:
+        if value1 is None or len(value1) == 0:
+            value1 = {}
+        if value2 is None or len(value2) == 0:
+            value2 = {}
+
+        value = value1.copy()
+        for key, (perf, mem, *_) in value2.items():
+            if key in value1:
+                orig_perf, orig_mem, *_ = value1[key]
+            else:
+                orig_perf, orig_mem = (PStatsParam.zero(None), MemUsageParam.zero(None))
+            value[key] = (
+                PStatsParam.addInPlace(orig_perf, perf),
+                MemUsageParam.addInPlace(orig_mem, mem),
+            )
+        return value
+
+
+ProfileResultsParam = _ProfileResultsParam()
+
+
+class _ProfileResultsParamV2(AccumulatorParam["ProfileResultsV2"]):
+    """
+    AccumulatorParam for profilers.
+    """
+
+    @staticmethod
+    def zero(value: "ProfileResultsV2") -> "ProfileResultsV2":
         return {}
 
     @staticmethod
-    def addInPlace(value1: "ProfileResults", value2: "ProfileResults") -> "ProfileResults":
+    def addInPlace(value1: "ProfileResultsV2", value2: "ProfileResultsV2") -> "ProfileResultsV2":
         for key, result in value2.items():
             if key not in value1:
                 value1[key] = result
@@ -82,7 +116,7 @@ def addInPlace(value1: "ProfileResults", value2: "ProfileResults") -> "ProfileRe
         return value1
 
 
-ProfileResultsParam = _ProfileResultsParam()
+ProfileResultsParamV2 = _ProfileResultsParamV2()
 
 
 class WorkerPerfProfiler:
@@ -91,9 +125,13 @@ class WorkerPerfProfiler:
     """
 
     def __init__(
-        self, accumulator: Accumulator["ProfileResults"], result_key: Union[int, str]
+        self,
+        accumulator: Accumulator["ProfileResults"],
+        accumulator_v2: Accumulator["ProfileResultsV2"],
+        result_key: Union[int, str],
     ) -> None:
         self._accumulator = accumulator
+        self._accumulator_v2 = accumulator_v2
         self._profiler = cProfile.Profile()
         self._result_key = result_key
 
@@ -108,7 +146,13 @@ def save(self) -> None:
         # make it picklable
         st.stream = None  # type: ignore[attr-defined]
         st.strip_dirs()
-        self._accumulator.add({self._result_key: {"perf": st}})
+        self._accumulator.add({self._result_key: (st, None)})
+
+        st = pstats.Stats(self._profiler, stream=None)
+        # make it picklable
+        st.stream = None  # type: ignore[attr-defined]
+        st.strip_dirs()
+        self._accumulator_v2.add({self._result_key: {"perf": st}})
 
     def __enter__(self) -> "WorkerPerfProfiler":
         self.start()
@@ -132,12 +176,14 @@ class WorkerMemoryProfiler:
     def __init__(
         self,
         accumulator: Accumulator["ProfileResults"],
+        accumulator_v2: Accumulator["ProfileResultsV2"],
         result_key: Union[int, str],
         func_or_code: Union[Callable, CodeType],
    ) -> None:
         from pyspark.memory_profiler_ext import UDFLineProfilerV2
 
         self._accumulator = accumulator
+        self._accumulator_v2 = accumulator_v2
         self._profiler = UDFLineProfilerV2()
         if isinstance(func_or_code, CodeType):
             self._profiler.add_code(func_or_code)
@@ -156,7 +202,8 @@ def save(self) -> None:
             filename: list(line_iterator)
             for filename, line_iterator in self._profiler.code_map.items()
         }
-        self._accumulator.add({self._result_key: {"memory": codemap_dict}})
+        self._accumulator.add({self._result_key: (None, codemap_dict)})
+        self._accumulator_v2.add({self._result_key: {"memory": codemap_dict}})
 
     def __enter__(self) -> "WorkerMemoryProfiler":
         self.start()
@@ -402,11 +449,12 @@ def clear_memory_profiles(self, id: Optional[Union[int, str]] = None) -> None:
 
 class AccumulatorProfilerCollector(ProfilerCollector):
     def __init__(self) -> None:
         super().__init__()
-        if SpecialAccumulatorIds.SQL_UDF_PROFIER in _accumulatorRegistry:
-            self._accumulator = _accumulatorRegistry[SpecialAccumulatorIds.SQL_UDF_PROFIER]
+
+        if SpecialAccumulatorIds.SQL_UDF_PROFIER_V2 in _accumulatorRegistry:
+            self._accumulator = _accumulatorRegistry[SpecialAccumulatorIds.SQL_UDF_PROFIER_V2]
         else:
             self._accumulator = Accumulator(
-                SpecialAccumulatorIds.SQL_UDF_PROFIER, {}, ProfileResultsParam
+                SpecialAccumulatorIds.SQL_UDF_PROFIER_V2, {}, ProfileResultsParamV2
             )
 
     @property
diff --git a/python/pyspark/sql/worker/utils.py b/python/pyspark/sql/worker/utils.py
index 58f2dbb67f648..ecb54857655c2 100644
--- a/python/pyspark/sql/worker/utils.py
+++ b/python/pyspark/sql/worker/utils.py
@@ -24,7 +24,12 @@
     _deserialize_accumulator,
     SpecialAccumulatorIds,
 )
-from pyspark.sql.profiler import ProfileResultsParam, WorkerPerfProfiler, WorkerMemoryProfiler
+from pyspark.sql.profiler import (
+    ProfileResultsParam,
+    ProfileResultsParamV2,
+    WorkerPerfProfiler,
+    WorkerMemoryProfiler,
+)
 from pyspark.serializers import (
     read_int,
     write_int,
@@ -67,7 +72,11 @@ def worker_run(main: Callable, infile: IO, outfile: IO) -> None:
     _accumulatorRegistry.clear()
 
     accumulator = _deserialize_accumulator(
-        SpecialAccumulatorIds.SQL_UDF_PROFIER, {}, ProfileResultsParam
+        SpecialAccumulatorIds.SQL_UDF_PROFIER, None, ProfileResultsParam
+    )
+
+    accumulator_v2 = _deserialize_accumulator(
+        SpecialAccumulatorIds.SQL_UDF_PROFIER_V2, {}, ProfileResultsParamV2
     )
 
     if main.__module__ == "__main__":
@@ -80,10 +89,10 @@ def worker_run(main: Callable, infile: IO, outfile: IO) -> None:
     worker_module = worker_module.split(".")[-1]
 
     if conf.profiler == "perf":
-        with WorkerPerfProfiler(accumulator, worker_module):
+        with WorkerPerfProfiler(accumulator, accumulator_v2, worker_module):
             main(infile, outfile)
     elif conf.profiler == "memory":
-        with WorkerMemoryProfiler(accumulator, worker_module, main):
+        with WorkerMemoryProfiler(accumulator, accumulator_v2, worker_module, main):
             main(infile, outfile)
     else:
         main(infile, outfile)
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 4bb81ae044ea6..4d7f6e90488a5 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -1013,10 +1013,13 @@ def _is_iter_based(eval_type: int) -> bool:
 
 def wrap_perf_profiler(f, eval_type, result_id):
-    from pyspark.sql.profiler import ProfileResultsParam, WorkerPerfProfiler
+    from pyspark.sql.profiler import ProfileResultsParam, ProfileResultsParamV2, WorkerPerfProfiler
 
     accumulator = _deserialize_accumulator(
-        SpecialAccumulatorIds.SQL_UDF_PROFIER, {}, ProfileResultsParam
+        SpecialAccumulatorIds.SQL_UDF_PROFIER, None, ProfileResultsParam
+    )
+    accumulator_v2 = _deserialize_accumulator(
+        SpecialAccumulatorIds.SQL_UDF_PROFIER_V2, {}, ProfileResultsParamV2
     )
 
     if _is_iter_based(eval_type):
@@ -1025,7 +1028,7 @@ def profiling_func(*args, **kwargs):
             iterator = iter(f(*args, **kwargs))
             while True:
                 try:
-                    with WorkerPerfProfiler(accumulator, result_id):
+                    with WorkerPerfProfiler(accumulator, accumulator_v2, result_id):
                         item = next(iterator)
                     yield item
                 except StopIteration:
@@ -1034,7 +1037,7 @@ def profiling_func(*args, **kwargs):
     else:
 
         def profiling_func(*args, **kwargs):
-            with WorkerPerfProfiler(accumulator, result_id):
+            with WorkerPerfProfiler(accumulator, accumulator_v2, result_id):
                 ret = f(*args, **kwargs)
             return ret
 
@@ -1042,7 +1045,11 @@ def profiling_func(*args, **kwargs):
 
 def wrap_memory_profiler(f, eval_type, result_id):
-    from pyspark.sql.profiler import ProfileResultsParam, WorkerMemoryProfiler
+    from pyspark.sql.profiler import (
+        ProfileResultsParam,
+        ProfileResultsParamV2,
+        WorkerMemoryProfiler,
+    )
 
     import pyspark.memory_profiler_ext
 
@@ -1050,7 +1057,11 @@ def wrap_memory_profiler(f, eval_type, result_id):
         return f
 
     accumulator = _deserialize_accumulator(
-        SpecialAccumulatorIds.SQL_UDF_PROFIER, {}, ProfileResultsParam
+        SpecialAccumulatorIds.SQL_UDF_PROFIER, None, ProfileResultsParam
+    )
+
+    accumulator_v2 = _deserialize_accumulator(
+        SpecialAccumulatorIds.SQL_UDF_PROFIER_V2, {}, ProfileResultsParamV2
     )
 
     if _is_iter_based(eval_type):
@@ -1061,7 +1072,7 @@ def profiling_func(*args, **kwargs):
 
             while True:
                 try:
-                    with WorkerMemoryProfiler(accumulator, result_id, g.gi_code):
+                    with WorkerMemoryProfiler(accumulator, accumulator_v2, result_id, g.gi_code):
                         item = next(iterator)
                     yield item
                 except StopIteration:
@@ -1070,7 +1081,7 @@ def profiling_func(*args, **kwargs):
     else:
 
         def profiling_func(*args, **kwargs):
-            with WorkerMemoryProfiler(accumulator, result_id, f):
+            with WorkerMemoryProfiler(accumulator, accumulator_v2, result_id, f):
                 ret = f(*args, **kwargs)
             return ret
 

From d51090274c8923230985ef957785232e2ef07f9b Mon Sep 17 00:00:00 2001
From: Tian Gao
Date: Thu, 23 Apr 2026 15:44:32 -0700
Subject: [PATCH 3/3] Add comments

---
 python/pyspark/sql/profiler.py | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/python/pyspark/sql/profiler.py b/python/pyspark/sql/profiler.py
index d6094449e3ff5..1e3a25e9c2cae 100644
--- a/python/pyspark/sql/profiler.py
+++ b/python/pyspark/sql/profiler.py
@@ -88,6 +88,12 @@
 ProfileResultsParam = _ProfileResultsParam()
 
 
+# _ProfileResultsParam uses a (perf, memory) tuple, which is difficult to
+# extend. However, this code is shared between the server and the client for
+# Spark Connect, so to migrate gradually to the dict implementation we add a
+# new AccumulatorParam on a separate channel, SQL_UDF_PROFIER_V2. The
+# migration starts in 4.2.0; once support for all versions before 4.2.0 is
+# dropped, _ProfileResultsParam and the other legacy code can be removed.
 class _ProfileResultsParamV2(AccumulatorParam["ProfileResultsV2"]):
     """
     AccumulatorParam for profilers.
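
Note on the V2 merge semantics (a minimal sketch, not part of the patches):
_ProfileResultsParamV2.addInPlace merges the per-UDF entries field by field
and only writes a field back when the combined value is not None, so dict
entries never carry None-padded slots the way the old (perf, memory) tuples
did. The snippet below mirrors that logic with plain integers standing in for
the real pstats.Stats and CodeMapDict payloads; the helper name add_in_place
and the '+' combine rule are illustrative stand-ins for
PStatsParam.addInPlace and MemUsageParam.addInPlace, not PySpark API.

    from typing import Dict, Union

    ProfileResult = Dict[str, int]  # stand-in for the TypedDict in _typing.pyi
    ProfileResultsV2 = Dict[Union[int, str], ProfileResult]

    def add_in_place(value1: ProfileResultsV2, value2: ProfileResultsV2) -> ProfileResultsV2:
        for key, result in value2.items():
            if key not in value1:
                value1[key] = result  # unseen key: adopt the whole entry
            else:
                for field in ("perf", "memory"):
                    a, b = value1[key].get(field), result.get(field)
                    # toy combine rule in place of the real merge params
                    merged = a if b is None else (b if a is None else a + b)
                    if merged is not None:  # absent fields stay absent
                        value1[key][field] = merged
        return value1

    assert add_in_place({42: {"perf": 2}}, {42: {"perf": 3, "memory": 5}}) == {
        42: {"perf": 5, "memory": 5}
    }

Publishing on a separate accumulator id (SQL_UDF_PROFIER_V2 = -2) rather than
changing the payload on the existing channel keeps mixed-version setups
working: a new worker adds each result to both accumulators, a driver from an
older release keeps consuming the tuple-shaped payload it expects on -1, and
the guard added in accumulators.py drops updates for any id the receiving
process never registered.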