Commit 6c0dd3f

SCHJonathan authored and huangxiaopingRD committed
[SPARK-54452] Fix empty response from SparkConnect server for spark.sql(...) inside FlowFunction
### What changes were proposed in this pull request?

In PR apache#53024, we added SDP support for `spark.sql(...)` inside a FlowFunction. For these calls, instead of eagerly executing the SQL, the Spark Connect server should return the raw logical plan to the client and defer execution to the flow function. However, in that PR we constructed the response object but forgot to actually return it to the Spark Connect client, so the client received an empty response. This went unnoticed in tests because, when the client sees an empty `spark.sql(...)` response, [it falls back to creating an empty DataFrame holding the raw logical plan](https://github.com/apache/spark/blob/master/python/pyspark/sql/connect/session.py#L829-L835), which happens to match the desired behavior. This PR fixes the bug by returning the proper response instead of relying on that implicit fallback.

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

This PR fixes a bug introduced in apache#53024 where the server did not return the constructed `spark.sql(...)` response to the client.

### How was this patch tested?

New tests

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#53156 from SCHJonathan/jonathan-chang_data/fix-spark-sql-bug.

Authored-by: Yuheng Chang <jonathanyuheng@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
1 parent a5e2199 commit 6c0dd3f
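To make the failure mode concrete, here is a minimal sketch of the server-side control flow before and after this change, condensed from the hunk below (the surrounding method plumbing in `SparkConnectPlanner` is omitted):

```scala
// Sketch only, condensed from the diff below; not the full method.

// Before the fix: the SqlCommandResult was built and the raw relation
// attached, but nothing was written to the gRPC response stream, so the
// client saw an empty response and silently fell back to wrapping the
// raw plan itself.
if (insidePipelineFlowFunction) {
  result.setRelation(relation)
  return // bug: `result` is never sent to the client
}

// After the fix: the result is wrapped in an ExecutePlanResponse and pushed
// onto the response stream before returning, and the execution is marked
// finished via the events manager.
if (insidePipelineFlowFunction) {
  result.setRelation(relation)
  executeHolder.eventsManager.postFinished()
  responseObserver.onNext(
    ExecutePlanResponse
      .newBuilder()
      .setSessionId(sessionHolder.sessionId)
      .setServerSideSessionId(sessionHolder.serverSessionId)
      .setSqlCommandResult(result)
      .build)
  return
}
```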

2 files changed: +102 −0 lines changed

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala

Lines changed: 8 additions & 0 deletions

```diff
@@ -2992,6 +2992,14 @@ class SparkConnectPlanner(
     // the SQL command and defer the actual analysis and execution to the flow function.
     if (insidePipelineFlowFunction) {
       result.setRelation(relation)
+      executeHolder.eventsManager.postFinished()
+      responseObserver.onNext(
+        ExecutePlanResponse
+          .newBuilder()
+          .setSessionId(sessionHolder.sessionId)
+          .setServerSideSessionId(sessionHolder.serverSessionId)
+          .setSqlCommandResult(result)
+          .build)
       return
     }
```

sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/SparkDeclarativePipelinesServerSuite.scala

Lines changed: 94 additions & 0 deletions

```diff
@@ -850,4 +850,98 @@ class SparkDeclarativePipelinesServerSuite
       }
     }
   }
+
+  test(
+    "SPARK-54452: spark.sql() inside a pipeline flow function should return a sql_command_result") {
+    withRawBlockingStub { implicit stub =>
+      val graphId = createDataflowGraph
+      val pipelineAnalysisContext = proto.PipelineAnalysisContext
+        .newBuilder()
+        .setDataflowGraphId(graphId)
+        .setFlowName("flow1")
+        .build()
+      val userContext = proto.UserContext
+        .newBuilder()
+        .addExtensions(com.google.protobuf.Any.pack(pipelineAnalysisContext))
+        .setUserId("test_user")
+        .build()
+
+      val relation = proto.Plan
+        .newBuilder()
+        .setCommand(
+          proto.Command
+            .newBuilder()
+            .setSqlCommand(
+              proto.SqlCommand
+                .newBuilder()
+                .setInput(
+                  proto.Relation
+                    .newBuilder()
+                    .setRead(proto.Read
+                      .newBuilder()
+                      .setNamedTable(
+                        proto.Read.NamedTable.newBuilder().setUnparsedIdentifier("table"))
+                      .build())
+                    .build()))
+            .build())
+        .build()
+
+      val sparkSqlRequest = proto.ExecutePlanRequest
+        .newBuilder()
+        .setUserContext(userContext)
+        .setPlan(relation)
+        .setSessionId(UUID.randomUUID().toString)
+        .build()
+      val sparkSqlResponse = stub.executePlan(sparkSqlRequest).next()
+      assert(sparkSqlResponse.hasSqlCommandResult)
+      assert(
+        sparkSqlResponse.getSqlCommandResult.getRelation ==
+          relation.getCommand.getSqlCommand.getInput)
+    }
+  }
+
+  test(
+    "SPARK-54452: spark.sql() outside a pipeline flow function should return a " +
+      "sql_command_result") {
+    withRawBlockingStub { implicit stub =>
+      val graphId = createDataflowGraph
+      val pipelineAnalysisContext = proto.PipelineAnalysisContext
+        .newBuilder()
+        .setDataflowGraphId(graphId)
+        .build()
+      val userContext = proto.UserContext
+        .newBuilder()
+        .addExtensions(com.google.protobuf.Any.pack(pipelineAnalysisContext))
+        .setUserId("test_user")
+        .build()
+
+      val relation = proto.Plan
+        .newBuilder()
+        .setCommand(
+          proto.Command
+            .newBuilder()
+            .setSqlCommand(
+              proto.SqlCommand
+                .newBuilder()
+                .setInput(proto.Relation
+                  .newBuilder()
+                  .setSql(proto.SQL.newBuilder().setQuery("SELECT * FROM RANGE(5)"))
+                  .build())
+                .build())
+            .build())
+        .build()
+
+      val sparkSqlRequest = proto.ExecutePlanRequest
+        .newBuilder()
+        .setUserContext(userContext)
+        .setPlan(relation)
+        .setSessionId(UUID.randomUUID().toString)
+        .build()
+      val sparkSqlResponse = stub.executePlan(sparkSqlRequest).next()
+      assert(sparkSqlResponse.hasSqlCommandResult)
+      assert(
+        sparkSqlResponse.getSqlCommandResult.getRelation ==
+          relation.getCommand.getSqlCommand.getInput)
+    }
+  }
 }
```
