
Commit fb8dbd7

ueshin authored and dongjoon-hyun committed
[SPARK-54323][PYTHON] Change the way to access logs to TVF instead of system view
### What changes were proposed in this pull request?

Changes the way to access logs to a TVF instead of a system view.

```sql
SELECT * FROM python_worker_logs()
```

```py
spark.tvf.python_worker_logs()
```

Also blocks the TVF when Python worker logging is disabled.

```py
>>> spark.conf.get('spark.sql.pyspark.worker.logging.enabled')
'false'
>>> spark.tvf.python_worker_logs().show()
Traceback (most recent call last):
...
pyspark.errors.exceptions.captured.AnalysisException: [FEATURE_NOT_ENABLED] The feature Python Worker Logging is not enabled. Consider setting the config spark.sql.pyspark.worker.logging.enabled to true to enable this capability. SQLSTATE: 56038
>>> spark.conf.set('spark.sql.pyspark.worker.logging.enabled', True)
>>> spark.tvf.python_worker_logs().show()
+---+-----+---+-------+---------+------+
| ts|level|msg|context|exception|logger|
+---+-----+---+-------+---------+------+
+---+-----+---+-------+---------+------+
```

### Why are the changes needed?

There may be namespace conflicts with other system tables/views. For example, SQL variables share the same `system.session.varname` namespace, which could potentially cause an issue.

### Does this PR introduce _any_ user-facing change?

Yes, the way to access Python worker logs changes.

### How was this patch tested?

Modified the related tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #53021 from ueshin/issues/SPARK-54323/tvf.

Authored-by: Takuya Ueshin <ueshin@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
1 parent 05b0543 commit fb8dbd7
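
For quick reference, a minimal sketch of the new access path described above (illustrative only; it assumes a build that already contains this change and an active `SparkSession` named `spark`):

```py
# Illustrative sketch of the new TVF-based access path.
spark.conf.set("spark.sql.pyspark.worker.logging.enabled", True)

# DataFrame API: the TVF is exposed under spark.tvf.
logs = spark.tvf.python_worker_logs()
logs.select("ts", "level", "msg", "context", "exception", "logger").show(truncate=False)

# SQL: the same logs through the table-valued function.
spark.sql("SELECT * FROM python_worker_logs()").show(truncate=False)

# While the config is 'false', calling the TVF raises AnalysisException with
# error class FEATURE_NOT_ENABLED (SQLSTATE 56038), as shown in the description above.
```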

File tree

28 files changed: +642 -523 lines changed

project/MimaExcludes.scala

Lines changed: 4 additions & 1 deletion
```diff
@@ -52,7 +52,10 @@ object MimaExcludes {
     ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.Dataset.repartitionById"),

     // [SPARK-54001][CONNECT] Replace block copying with ref-counting in ArtifactManager cloning
-    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.artifact.ArtifactManager.cachedBlockIdList")
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.artifact.ArtifactManager.cachedBlockIdList"),
+
+    // [SPARK-54323][PYTHON] Change the way to access logs to TVF instead of system view
+    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.TableValuedFunction.python_worker_logs")
   )

   // Default exclude rules
```

python/docs/source/user_guide/bugbusting.ipynb

Lines changed: 6 additions & 6 deletions
```diff
@@ -945,7 +945,7 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "logs = spark.table(\"system.session.python_worker_logs\")"
+    "logs = spark.tvf.python_worker_logs()"
    ]
   },
   {
@@ -1021,7 +1021,7 @@
    "df.select(my_udf(\"text\")).show()\n",
    "\n",
    "# Query the logs\n",
-    "logs = spark.table(\"system.session.python_worker_logs\")\n",
+    "logs = spark.tvf.python_worker_logs()\n",
    "logs.select(\"level\", \"msg\", \"logger\", \"context\").show(truncate=False)"
    ]
   },
@@ -1076,7 +1076,7 @@
    "spark.conf.set(\"spark.sql.pyspark.worker.logging.enabled\", \"true\")\n",
    "spark.range(1).select(contextual_udf(lit(\"test\"))).show()\n",
    "\n",
-    "logs = spark.table(\"system.session.python_worker_logs\")\n",
+    "logs = spark.tvf.python_worker_logs()\n",
    "logs.filter(\"logger = 'contextual'\").select(\"msg\", \"context\").show(truncate=False)"
    ]
   },
@@ -1135,7 +1135,7 @@
    "spark.conf.set(\"spark.sql.pyspark.worker.logging.enabled\", \"true\")\n",
    "spark.createDataFrame([(0,), (5,)], [\"value\"]).select(failing_udf(\"value\")).show()\n",
    "\n",
-    "logs = spark.table(\"system.session.python_worker_logs\")\n",
+    "logs = spark.tvf.python_worker_logs()\n",
    "logs.filter(\"logger = 'error_handler'\").select(\"msg\", \"exception\").show(truncate=False)"
    ]
   },
@@ -1193,7 +1193,7 @@
    "df = spark.createDataFrame([(\"hello world\",)], [\"text\"])\n",
    "df.lateralJoin(WordSplitter(col(\"text\").outer())).show()\n",
    "\n",
-    "logs = spark.table(\"system.session.python_worker_logs\")\n",
+    "logs = spark.tvf.python_worker_logs()\n",
    "logs.filter(\"logger = 'udtf_logger'\").select(\"msg\", \"context\").show(truncate=False)"
    ]
   },
@@ -1231,7 +1231,7 @@
    }
   ],
   "source": [
-    "logs = spark.table(\"system.session.python_worker_logs\")\n",
+    "logs = spark.tvf.python_worker_logs()\n",
    "\n",
    "# Count logs by level\n",
    "logs.groupBy(\"level\").count().show()\n",
```

python/pyspark/sql/connect/tvf.py

Lines changed: 5 additions & 0 deletions
```diff
@@ -109,6 +109,11 @@ def variant_explode_outer(self, input: "Column") -> "DataFrame":

     variant_explode_outer.__doc__ = PySparkTableValuedFunction.variant_explode_outer.__doc__

+    def python_worker_logs(self) -> "DataFrame":
+        return self._fn("python_worker_logs")
+
+    python_worker_logs.__doc__ = PySparkTableValuedFunction.python_worker_logs.__doc__
+
     def _fn(self, name: str, *args: "Column") -> "DataFrame":
         from pyspark.sql.connect.dataframe import DataFrame
         from pyspark.sql.connect.plan import UnresolvedTableValuedFunction
```
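
The Connect-side addition above is a thin wrapper: `python_worker_logs` delegates to the generic `_fn` helper, which builds an unresolved call to the `python_worker_logs()` table-valued function, so callers get back an ordinary DataFrame. A hedged usage sketch (assumes an active session `spark` with worker logging enabled):

```py
# Illustrative only: the TVF result is a regular DataFrame, so the usual
# column operations apply (column names taken from the schema in the description).
logs = spark.tvf.python_worker_logs()
(
    logs.filter("level = 'WARNING'")
    .select("ts", "logger", "msg", "context")
    .orderBy("ts")
    .show(truncate=False)
)
```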

python/pyspark/sql/tests/arrow/test_arrow_cogrouped_map.py

Lines changed: 14 additions & 14 deletions
```diff
@@ -396,20 +396,20 @@ def func_with_logging(left, right):
             + [Row(id=2, v1=20, v2=200)],
         )

-        logs = self.spark.table("system.session.python_worker_logs")
-
-        assertDataFrameEqual(
-            logs.select("level", "msg", "context", "logger"),
-            [
-                Row(
-                    level="WARNING",
-                    msg=f"arrow cogrouped map: {dict(v1=v1, v2=v2)}",
-                    context={"func_name": func_with_logging.__name__},
-                    logger="test_arrow_cogrouped_map",
-                )
-                for v1, v2 in [([10, 30], [100, 300]), ([20], [200])]
-            ],
-        )
+        logs = self.spark.tvf.python_worker_logs()
+
+        assertDataFrameEqual(
+            logs.select("level", "msg", "context", "logger"),
+            [
+                Row(
+                    level="WARNING",
+                    msg=f"arrow cogrouped map: {dict(v1=v1, v2=v2)}",
+                    context={"func_name": func_with_logging.__name__},
+                    logger="test_arrow_cogrouped_map",
+                )
+                for v1, v2 in [([10, 30], [100, 300]), ([20], [200])]
+            ],
+        )


 class CogroupedMapInArrowTests(CogroupedMapInArrowTestsMixin, ReusedSQLTestCase):
```
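
The same mechanical substitution, replacing `spark.table("system.session.python_worker_logs")` with `spark.tvf.python_worker_logs()`, repeats in the remaining Arrow test suites below; only the expected messages, loggers, and row groupings differ. A condensed sketch of the shared verification step (the helper name and the commented example rows are illustrative, not part of this diff):

```py
# Hypothetical helper condensing the verification step shared by these tests;
# the function name and the commented example rows are illustrative only.
from pyspark.sql import Row
from pyspark.testing import assertDataFrameEqual


def assert_worker_logs(spark, expected_rows):
    """Compare the worker-log TVF output against a list of expected Rows."""
    logs = spark.tvf.python_worker_logs()
    assertDataFrameEqual(
        logs.select("level", "msg", "context", "logger"),
        expected_rows,
    )


# Example shape of the expectations each test builds:
# assert_worker_logs(
#     spark,
#     [Row(level="WARNING", msg="...", context={"func_name": "..."}, logger="...")],
# )
```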

python/pyspark/sql/tests/arrow/test_arrow_grouped_map.py

Lines changed: 27 additions & 27 deletions
```diff
@@ -416,20 +416,20 @@ def func_with_logging(group):
             df,
         )

-        logs = self.spark.table("system.session.python_worker_logs")
-
-        assertDataFrameEqual(
-            logs.select("level", "msg", "context", "logger"),
-            [
-                Row(
-                    level="WARNING",
-                    msg=f"arrow grouped map: {dict(id=lst, value=[v*10 for v in lst])}",
-                    context={"func_name": func_with_logging.__name__},
-                    logger="test_arrow_grouped_map",
-                )
-                for lst in [[0, 2, 4, 6, 8], [1, 3, 5, 7]]
-            ],
-        )
+        logs = self.spark.tvf.python_worker_logs()
+
+        assertDataFrameEqual(
+            logs.select("level", "msg", "context", "logger"),
+            [
+                Row(
+                    level="WARNING",
+                    msg=f"arrow grouped map: {dict(id=lst, value=[v*10 for v in lst])}",
+                    context={"func_name": func_with_logging.__name__},
+                    logger="test_arrow_grouped_map",
+                )
+                for lst in [[0, 2, 4, 6, 8], [1, 3, 5, 7]]
+            ],
+        )

     @unittest.skipIf(is_remote_only(), "Requires JVM access")
     def test_apply_in_arrow_iter_with_logging(self):
@@ -456,20 +456,20 @@ def func_with_logging(group: Iterator[pa.RecordBatch]) -> Iterator[pa.RecordBatc
             df,
         )

-        logs = self.spark.table("system.session.python_worker_logs")
+        logs = self.spark.tvf.python_worker_logs()

-        assertDataFrameEqual(
-            logs.select("level", "msg", "context", "logger"),
-            [
-                Row(
-                    level="WARNING",
-                    msg=f"arrow grouped map: {dict(id=lst, value=[v*10 for v in lst])}",
-                    context={"func_name": func_with_logging.__name__},
-                    logger="test_arrow_grouped_map",
-                )
-                for lst in [[0, 2, 4], [6, 8], [1, 3, 5], [7]]
-            ],
-        )
+        assertDataFrameEqual(
+            logs.select("level", "msg", "context", "logger"),
+            [
+                Row(
+                    level="WARNING",
+                    msg=f"arrow grouped map: {dict(id=lst, value=[v*10 for v in lst])}",
+                    context={"func_name": func_with_logging.__name__},
+                    logger="test_arrow_grouped_map",
+                )
+                for lst in [[0, 2, 4], [6, 8], [1, 3, 5], [7]]
+            ],
+        )


 class ApplyInArrowTests(ApplyInArrowTestsMixin, ReusedSQLTestCase):
```

python/pyspark/sql/tests/arrow/test_arrow_map.py

Lines changed: 5 additions & 5 deletions
```diff
@@ -247,12 +247,12 @@ def func_with_logging(iterator):
             [Row(id=i) for i in range(9)],
         )

-        logs = self.spark.table("system.session.python_worker_logs")
+        logs = self.spark.tvf.python_worker_logs()

-        assertDataFrameEqual(
-            logs.select("level", "msg", "context", "logger"),
-            self._expected_logs_for_test_map_in_arrow_with_logging(func_with_logging.__name__),
-        )
+        assertDataFrameEqual(
+            logs.select("level", "msg", "context", "logger"),
+            self._expected_logs_for_test_map_in_arrow_with_logging(func_with_logging.__name__),
+        )

     def _expected_logs_for_test_map_in_arrow_with_logging(self, func_name):
         return [
```

python/pyspark/sql/tests/arrow/test_arrow_udf_grouped_agg.py

Lines changed: 13 additions & 13 deletions
```diff
@@ -1044,20 +1044,20 @@ def my_grouped_agg_arrow_udf(x):
             [Row(id=1, result=3.0), Row(id=2, result=18.0)],
         )

-        logs = self.spark.table("system.session.python_worker_logs")
+        logs = self.spark.tvf.python_worker_logs()

-        assertDataFrameEqual(
-            logs.select("level", "msg", "context", "logger"),
-            [
-                Row(
-                    level="WARNING",
-                    msg=f"grouped agg arrow udf: {n}",
-                    context={"func_name": my_grouped_agg_arrow_udf.__name__},
-                    logger="test_grouped_agg_arrow",
-                )
-                for n in [2, 3]
-            ],
-        )
+        assertDataFrameEqual(
+            logs.select("level", "msg", "context", "logger"),
+            [
+                Row(
+                    level="WARNING",
+                    msg=f"grouped agg arrow udf: {n}",
+                    context={"func_name": my_grouped_agg_arrow_udf.__name__},
+                    logger="test_grouped_agg_arrow",
+                )
+                for n in [2, 3]
+            ],
+        )


 class GroupedAggArrowUDFTests(GroupedAggArrowUDFTestsMixin, ReusedSQLTestCase):
```

python/pyspark/sql/tests/arrow/test_arrow_udf_scalar.py

Lines changed: 26 additions & 26 deletions
```diff
@@ -1201,20 +1201,20 @@ def my_scalar_arrow_udf(x):
             [Row(result=f"scalar_arrow_{i}") for i in range(3)],
         )

-        logs = self.spark.table("system.session.python_worker_logs")
+        logs = self.spark.tvf.python_worker_logs()

-        assertDataFrameEqual(
-            logs.select("level", "msg", "context", "logger"),
-            [
-                Row(
-                    level="WARNING",
-                    msg=f"scalar arrow udf: {lst}",
-                    context={"func_name": my_scalar_arrow_udf.__name__},
-                    logger="test_scalar_arrow",
-                )
-                for lst in [[0], [1, 2]]
-            ],
-        )
+        assertDataFrameEqual(
+            logs.select("level", "msg", "context", "logger"),
+            [
+                Row(
+                    level="WARNING",
+                    msg=f"scalar arrow udf: {lst}",
+                    context={"func_name": my_scalar_arrow_udf.__name__},
+                    logger="test_scalar_arrow",
+                )
+                for lst in [[0], [1, 2]]
+            ],
+        )

     @unittest.skipIf(is_remote_only(), "Requires JVM access")
     def test_scalar_iter_arrow_udf_with_logging(self):
@@ -1241,20 +1241,20 @@ def my_scalar_iter_arrow_udf(it):
             [Row(result=f"scalar_iter_arrow_{i}") for i in range(9)],
         )

-        logs = self.spark.table("system.session.python_worker_logs")
+        logs = self.spark.tvf.python_worker_logs()

-        assertDataFrameEqual(
-            logs.select("level", "msg", "context", "logger"),
-            [
-                Row(
-                    level="WARNING",
-                    msg=f"scalar iter arrow udf: {lst}",
-                    context={"func_name": my_scalar_iter_arrow_udf.__name__},
-                    logger="test_scalar_iter_arrow",
-                )
-                for lst in [[0, 1, 2], [3], [4, 5, 6], [7, 8]]
-            ],
-        )
+        assertDataFrameEqual(
+            logs.select("level", "msg", "context", "logger"),
+            [
+                Row(
+                    level="WARNING",
+                    msg=f"scalar iter arrow udf: {lst}",
+                    context={"func_name": my_scalar_iter_arrow_udf.__name__},
+                    logger="test_scalar_iter_arrow",
+                )
+                for lst in [[0, 1, 2], [3], [4, 5, 6], [7, 8]]
+            ],
+        )


 class ScalarArrowUDFTests(ScalarArrowUDFTestsMixin, ReusedSQLTestCase):
```

python/pyspark/sql/tests/arrow/test_arrow_udf_window.py

Lines changed: 14 additions & 14 deletions
```diff
@@ -834,20 +834,20 @@ def my_window_arrow_udf(x):
             ],
         )

-        logs = self.spark.table("system.session.python_worker_logs")
-
-        assertDataFrameEqual(
-            logs.select("level", "msg", "context", "logger"),
-            [
-                Row(
-                    level="WARNING",
-                    msg=f"window arrow udf: {lst}",
-                    context={"func_name": my_window_arrow_udf.__name__},
-                    logger="test_window_arrow",
-                )
-                for lst in [[1.0], [1.0, 2.0], [3.0], [3.0, 5.0], [3.0, 5.0, 10.0]]
-            ],
-        )
+        logs = self.spark.tvf.python_worker_logs()
+
+        assertDataFrameEqual(
+            logs.select("level", "msg", "context", "logger"),
+            [
+                Row(
+                    level="WARNING",
+                    msg=f"window arrow udf: {lst}",
+                    context={"func_name": my_window_arrow_udf.__name__},
+                    logger="test_window_arrow",
+                )
+                for lst in [[1.0], [1.0, 2.0], [3.0], [3.0, 5.0], [3.0, 5.0, 10.0]]
+            ],
+        )


 class WindowArrowUDFTests(WindowArrowUDFTestsMixin, ReusedSQLTestCase):
```

python/pyspark/sql/tests/arrow/test_arrow_udtf.py

Lines changed: 13 additions & 13 deletions
```diff
@@ -1721,20 +1721,20 @@ def eval(self, table_data: "pa.RecordBatch") -> Iterator["pa.Table"]:
             [Row(id=i, doubled=i * 2) for i in range(9)],
         )

-        logs = self.spark.table("system.session.python_worker_logs")
+        logs = self.spark.tvf.python_worker_logs()

-        assertDataFrameEqual(
-            logs.select("level", "msg", "context", "logger"),
-            [
-                Row(
-                    level="WARNING",
-                    msg=f"arrow udtf: {dict(id=lst)}",
-                    context={"class_name": "TestArrowUDTFWithLogging", "func_name": "eval"},
-                    logger="test_arrow_udtf",
-                )
-                for lst in [[0, 1, 2], [3], [4, 5, 6], [7, 8]]
-            ],
-        )
+        assertDataFrameEqual(
+            logs.select("level", "msg", "context", "logger"),
+            [
+                Row(
+                    level="WARNING",
+                    msg=f"arrow udtf: {dict(id=lst)}",
+                    context={"class_name": "TestArrowUDTFWithLogging", "func_name": "eval"},
+                    logger="test_arrow_udtf",
+                )
+                for lst in [[0, 1, 2], [3], [4, 5, 6], [7, 8]]
+            ],
+        )


 class ArrowUDTFTests(ArrowUDTFTestsMixin, ReusedSQLTestCase):
```
