Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion python/pyspark/accumulators.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def _deserialize_accumulator(

class SpecialAccumulatorIds:
    # Reserved sentinel accumulator ids used internally for SQL/UDF profiling.
    # NOTE(review): "PROFIER" is the spelling used consistently across the
    # codebase (see references elsewhere in this diff) — do not "fix" it here
    # alone, or lookups against these constants will break.
    SQL_UDF_PROFIER = -1  # legacy profiler channel: results as (perf, memory) tuples
    SQL_UDF_PROFIER_V2 = -2  # v2 profiler channel: results as dicts (easier to extend)


class Accumulator(Generic[T]):
Expand Down Expand Up @@ -298,7 +299,8 @@ def accum_updates() -> bool:
num_updates = read_int(self.rfile)
for _ in range(num_updates):
aid, update = pickleSer._read_with_length(self.rfile)
_accumulatorRegistry[aid] += update
if aid in _accumulatorRegistry:
_accumulatorRegistry[aid] += update
# Write a byte in acknowledgement
self.wfile.write(struct.pack("!b", 1))
return False
Expand Down
12 changes: 12 additions & 0 deletions python/pyspark/sql/_typing.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,22 @@
from typing import (
Any,
Callable,
Dict,
List,
Optional,
Tuple,
TypedDict,
TypeVar,
Union,
)
from typing_extensions import Literal, Protocol

import datetime
import decimal
import pstats

from pyspark._typing import PrimitiveType
from pyspark.profiler import CodeMapDict
import pyspark.sql.types
from pyspark.sql.column import Column
from pyspark.sql.tvf_argument import TableValuedFunctionArgument
Expand Down Expand Up @@ -81,3 +85,11 @@ class UserDefinedFunctionLike(Protocol):
def returnType(self) -> pyspark.sql.types.DataType: ...
def __call__(self, *args: ColumnOrName) -> Column: ...
def asNondeterministic(self) -> UserDefinedFunctionLike: ...

# Legacy profile-results mapping: keyed by result id (int UDF id or str, e.g. a
# worker-module name), valued as a (perf_stats, memory_codemap) pair where
# either element may be None.
ProfileResults = Dict[Union[int, str], Tuple[Optional[pstats.Stats], Optional[CodeMapDict]]]

class ProfileResult(TypedDict, total=False):
    # V2 dict-based profile result; total=False makes both keys optional so a
    # result may carry perf stats, a memory code map, or both.
    perf: pstats.Stats  # cProfile/pstats statistics collected for the UDF
    memory: CodeMapDict  # memory-profiler per-line code map for the UDF

# V2 profile-results mapping keyed the same way as the legacy one, but valued
# with the extensible ProfileResult dict instead of a fixed-arity tuple.
ProfileResultsV2 = Dict[Union[int, str], ProfileResult]
2 changes: 1 addition & 1 deletion python/pyspark/sql/connect/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1561,7 +1561,7 @@ def handle_response(
if observed_metrics.name == "__python_accumulator__":
for metric in observed_metrics.metrics:
aid, update = pickleSer.loads(LiteralExpression._to_value(metric))
if aid == SpecialAccumulatorIds.SQL_UDF_PROFIER:
if aid == SpecialAccumulatorIds.SQL_UDF_PROFIER_V2:
self._profiler_collector._update(update)
elif observed_metrics.name in observations:
observation_result = observations[observed_metrics.name]._result
Expand Down
8 changes: 4 additions & 4 deletions python/pyspark/sql/connect/profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
#
from typing import TYPE_CHECKING

from pyspark.sql.profiler import ProfilerCollector, ProfileResultsParam
from pyspark.sql.profiler import ProfilerCollector, ProfileResultsParamV2

if TYPE_CHECKING:
from pyspark.sql.profiler import ProfileResults
from pyspark.sql._typing import ProfileResults


class ConnectProfilerCollector(ProfilerCollector):
Expand All @@ -29,7 +29,7 @@ class ConnectProfilerCollector(ProfilerCollector):

def __init__(self) -> None:
super().__init__()
self._value = ProfileResultsParam.zero({})
self._value = ProfileResultsParamV2.zero({})

@property
def _profile_results(self) -> "ProfileResults":
Expand All @@ -38,4 +38,4 @@ def _profile_results(self) -> "ProfileResults":

def _update(self, update: "ProfileResults") -> None:
with self._lock:
self._value = ProfileResultsParam.addInPlace(self._profile_results, update)
self._value = ProfileResultsParamV2.addInPlace(self._profile_results, update)
110 changes: 80 additions & 30 deletions python/pyspark/sql/profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@
Dict,
Iterable,
Literal,
NamedTuple,
Optional,
Tuple,
Union,
TYPE_CHECKING,
overload,
)
import warnings
Expand All @@ -50,43 +50,79 @@
PStatsParam,
)

if TYPE_CHECKING:
from pyspark.sql._typing import ProfileResults, ProfileResultsV2

class ProfileResult(NamedTuple):
perf: Optional[pstats.Stats] = None
memory: Optional[CodeMapDict] = None

def __bool__(self) -> bool:
return self.perf is not None or self.memory is not None
class _ProfileResultsParam(AccumulatorParam[Optional["ProfileResults"]]):
"""
AccumulatorParam for profilers.
"""

@staticmethod
def zero(value: Optional["ProfileResults"]) -> Optional["ProfileResults"]:
return value

def replace(self, **kwargs: Any) -> "ProfileResult":
return self._replace(**kwargs)
@staticmethod
def addInPlace(
value1: Optional["ProfileResults"], value2: Optional["ProfileResults"]
) -> Optional["ProfileResults"]:
if value1 is None or len(value1) == 0:
value1 = {}
if value2 is None or len(value2) == 0:
value2 = {}

value = value1.copy()
for key, (perf, mem, *_) in value2.items():
if key in value1:
orig_perf, orig_mem, *_ = value1[key]
else:
orig_perf, orig_mem = (PStatsParam.zero(None), MemUsageParam.zero(None))
value[key] = (
PStatsParam.addInPlace(orig_perf, perf),
MemUsageParam.addInPlace(orig_mem, mem),
)
return value


ProfileResults = Dict[Union[int, str], ProfileResult]
ProfileResultsParam = _ProfileResultsParam()


class _ProfileResultsParam(AccumulatorParam["ProfileResults"]):
# _ProfileResultsParam uses (perf, memory) tuple which is very difficult
# to extend. However, this is shared code between the server and the client
# for spark connect. In order to gradually migrate to dict implementation,
# we create a new AccumulatorParam on a new channel SQL_UDF_PROFIER_V2.
# We started this in 4.2.0. When we drop support for all versions before 4.2.0,
# we can remove _ProfileResultsParam and other old content.
class _ProfileResultsParamV2(AccumulatorParam["ProfileResultsV2"]):
"""
AccumulatorParam for profilers.
"""

@staticmethod
def zero(value: "ProfileResults") -> "ProfileResults":
def zero(value: "ProfileResultsV2") -> "ProfileResultsV2":
return {}

@staticmethod
def addInPlace(value1: "ProfileResults", value2: "ProfileResults") -> "ProfileResults":
def addInPlace(value1: "ProfileResultsV2", value2: "ProfileResultsV2") -> "ProfileResultsV2":
for key, result in value2.items():
if key not in value1:
value1[key] = result
else:
perf = PStatsParam.addInPlace(value1[key].perf, result.perf)
memory = MemUsageParam.addInPlace(value1[key].memory, result.memory)
value1[key] = ProfileResult(perf=perf, memory=memory)
perf = PStatsParam.addInPlace(
value1[key].get("perf", None), result.get("perf", None)
)
if perf is not None:
value1[key]["perf"] = perf
memory = MemUsageParam.addInPlace(
value1[key].get("memory", None), result.get("memory", None)
)
if memory is not None:
value1[key]["memory"] = memory
return value1


ProfileResultsParam = _ProfileResultsParam()
ProfileResultsParamV2 = _ProfileResultsParamV2()


class WorkerPerfProfiler:
Expand All @@ -95,9 +131,13 @@ class WorkerPerfProfiler:
"""

def __init__(
self, accumulator: Accumulator["ProfileResults"], result_key: Union[int, str]
self,
accumulator: Accumulator["ProfileResults"],
accumulator_v2: Accumulator["ProfileResultsV2"],
result_key: Union[int, str],
) -> None:
self._accumulator = accumulator
self._accumulator_v2 = accumulator_v2
self._profiler = cProfile.Profile()
self._result_key = result_key

Expand All @@ -112,7 +152,13 @@ def save(self) -> None:
# make it picklable
st.stream = None # type: ignore[attr-defined]
st.strip_dirs()
self._accumulator.add({self._result_key: ProfileResult(perf=st)})
self._accumulator.add({self._result_key: (st, None)})

st = pstats.Stats(self._profiler, stream=None)
# make it picklable
st.stream = None # type: ignore[attr-defined]
st.strip_dirs()
self._accumulator_v2.add({self._result_key: {"perf": st}})

def __enter__(self) -> "WorkerPerfProfiler":
self.start()
Expand All @@ -136,12 +182,14 @@ class WorkerMemoryProfiler:
def __init__(
self,
accumulator: Accumulator["ProfileResults"],
accumulator_v2: Accumulator["ProfileResultsV2"],
result_key: Union[int, str],
func_or_code: Union[Callable, CodeType],
) -> None:
from pyspark.memory_profiler_ext import UDFLineProfilerV2

self._accumulator = accumulator
self._accumulator_v2 = accumulator_v2
self._profiler = UDFLineProfilerV2()
if isinstance(func_or_code, CodeType):
self._profiler.add_code(func_or_code)
Expand All @@ -160,7 +208,8 @@ def save(self) -> None:
filename: list(line_iterator)
for filename, line_iterator in self._profiler.code_map.items()
}
self._accumulator.add({self._result_key: ProfileResult(memory=codemap_dict)})
self._accumulator.add({self._result_key: (None, codemap_dict)})
self._accumulator_v2.add({self._result_key: {"memory": codemap_dict}})

def __enter__(self) -> "WorkerMemoryProfiler":
self.start()
Expand Down Expand Up @@ -227,9 +276,9 @@ def show(id: Union[int, str]) -> None:
def _perf_profile_results(self) -> Dict[Union[int, str], pstats.Stats]:
with self._lock:
return {
result_id: result.perf
result_id: result["perf"]
for result_id, result in self._profile_results.items()
if result.perf is not None
if result.get("perf", None) is not None
}

def show_memory_profiles(self, id: Optional[Union[int, str]] = None) -> None:
Expand Down Expand Up @@ -273,9 +322,9 @@ def show(id: Union[int, str]) -> None:
def _memory_profile_results(self) -> Dict[Union[int, str], CodeMapDict]:
with self._lock:
return {
result_id: result.memory
result_id: result["memory"]
for result_id, result in self._profile_results.items()
if result.memory is not None
if result.get("memory", None) is not None
}

@property
Expand Down Expand Up @@ -369,12 +418,12 @@ def clear_perf_profiles(self, id: Optional[Union[int, str]] = None) -> None:
with self._lock:
if id is not None:
if id in self._profile_results:
self._profile_results[id] = self._profile_results[id].replace(perf=None)
self._profile_results[id].pop("perf", None)
if not self._profile_results[id]:
self._profile_results.pop(id)
else:
for id in list(self._profile_results.keys()):
self._profile_results[id] = self._profile_results[id].replace(perf=None)
self._profile_results[id].pop("perf", None)
if not self._profile_results[id]:
self._profile_results.pop(id)

Expand All @@ -393,24 +442,25 @@ def clear_memory_profiles(self, id: Optional[Union[int, str]] = None) -> None:
with self._lock:
if id is not None:
if id in self._profile_results:
self._profile_results[id] = self._profile_results[id].replace(memory=None)
self._profile_results[id].pop("memory", None)
if not self._profile_results[id]:
self._profile_results.pop(id)
else:
for id in list(self._profile_results.keys()):
self._profile_results[id] = self._profile_results[id].replace(memory=None)
self._profile_results[id].pop("memory", None)
if not self._profile_results[id]:
self._profile_results.pop(id)


class AccumulatorProfilerCollector(ProfilerCollector):
def __init__(self) -> None:
super().__init__()
if SpecialAccumulatorIds.SQL_UDF_PROFIER in _accumulatorRegistry:
self._accumulator = _accumulatorRegistry[SpecialAccumulatorIds.SQL_UDF_PROFIER]

if SpecialAccumulatorIds.SQL_UDF_PROFIER_V2 in _accumulatorRegistry:
self._accumulator = _accumulatorRegistry[SpecialAccumulatorIds.SQL_UDF_PROFIER_V2]
else:
self._accumulator = Accumulator(
SpecialAccumulatorIds.SQL_UDF_PROFIER, {}, ProfileResultsParam
SpecialAccumulatorIds.SQL_UDF_PROFIER_V2, {}, ProfileResultsParamV2
)

@property
Expand Down
17 changes: 13 additions & 4 deletions python/pyspark/sql/worker/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@
_deserialize_accumulator,
SpecialAccumulatorIds,
)
from pyspark.sql.profiler import ProfileResultsParam, WorkerPerfProfiler, WorkerMemoryProfiler
from pyspark.sql.profiler import (
ProfileResultsParam,
ProfileResultsParamV2,
WorkerPerfProfiler,
WorkerMemoryProfiler,
)
from pyspark.serializers import (
read_int,
write_int,
Expand Down Expand Up @@ -67,7 +72,11 @@ def worker_run(main: Callable, infile: IO, outfile: IO) -> None:

_accumulatorRegistry.clear()
accumulator = _deserialize_accumulator(
SpecialAccumulatorIds.SQL_UDF_PROFIER, {}, ProfileResultsParam
SpecialAccumulatorIds.SQL_UDF_PROFIER, None, ProfileResultsParam
)

accumulator_v2 = _deserialize_accumulator(
SpecialAccumulatorIds.SQL_UDF_PROFIER_V2, {}, ProfileResultsParamV2
)

if main.__module__ == "__main__":
Expand All @@ -80,10 +89,10 @@ def worker_run(main: Callable, infile: IO, outfile: IO) -> None:
worker_module = worker_module.split(".")[-1]

if conf.profiler == "perf":
with WorkerPerfProfiler(accumulator, worker_module):
with WorkerPerfProfiler(accumulator, accumulator_v2, worker_module):
main(infile, outfile)
elif conf.profiler == "memory":
with WorkerMemoryProfiler(accumulator, worker_module, main):
with WorkerMemoryProfiler(accumulator, accumulator_v2, worker_module, main):
main(infile, outfile)
else:
main(infile, outfile)
Expand Down
Loading