
[SPARK-56538][CONNECT] Add per-RPC deadlines to Spark Connect client #55402

Open
pranavdev022 wants to merge 2 commits into apache:master from pranavdev022:SPARK-56538-per-rpc-deadlines

Conversation

@pranavdev022
Contributor

What changes were proposed in this pull request?

Introduce a RpcDeadlines configuration class (Scala case class, Python dataclass) that assigns per-RPC gRPC deadlines to every Spark Connect client call. Each field controls the timeout for one RPC type and can be individually set to None to disable.

Defaults:

| RPC | Default |
| --- | --- |
| reattachableExecutePlan, reattachExecute | 10 minutes |
| analyzePlan, addArtifacts | 1 hour |
| config, interrupt, releaseSession, artifactStatus, cloneSession, getStatus, fetchErrorDetails | 10 minutes |
| Non-reattachable ExecutePlan | None (no deadline) |
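
For concreteness, the Scala shape implied by these defaults looks roughly like the sketch below (field list and ordering assumed from the table, not copied from the PR):

```scala
import scala.concurrent.duration._

// Sketch only: defaults per the table above; the exact signature may differ.
case class RpcDeadlines(
    reattachableExecutePlan: Option[FiniteDuration] = Some(10.minutes),
    reattachExecute: Option[FiniteDuration] = Some(10.minutes),
    analyzePlan: Option[FiniteDuration] = Some(1.hour),
    addArtifacts: Option[FiniteDuration] = Some(1.hour),
    config: Option[FiniteDuration] = Some(10.minutes),
    interrupt: Option[FiniteDuration] = Some(10.minutes),
    releaseSession: Option[FiniteDuration] = Some(10.minutes),
    artifactStatus: Option[FiniteDuration] = Some(10.minutes),
    cloneSession: Option[FiniteDuration] = Some(10.minutes),
    getStatus: Option[FiniteDuration] = Some(10.minutes),
    fetchErrorDetails: Option[FiniteDuration] = Some(10.minutes),
    // Non-reattachable ExecutePlan intentionally carries no deadline.
    executePlan: Option[FiniteDuration] = None)
```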

Why are the changes needed?

The Spark Connect client currently has no per-RPC timeouts. If a network connection silently dies (load balancer drops an idle connection, firewall closes a stale TCP socket, server becomes unreachable), the client hangs indefinitely with no error or feedback. This is particularly problematic for long-lived streaming responses on the reattachable execute path, where the client expects a continuous stream that may go silent without any TCP-level indication of failure.

Per-RPC deadlines act as a last-resort kill mechanism: if no response arrives within the deadline window, gRPC raises DEADLINE_EXCEEDED on the client side. On the reattachable path, the client transparently opens a fresh ReattachExecute stream (the server-side operation continues running). On unary RPCs, the error surfaces to the user with a hint about how to adjust or disable deadlines.

Does this PR introduce any user-facing change?

Yes. All existing clients will get default deadlines on upgrade. Any call that previously hung indefinitely will now fail with DEADLINE_EXCEEDED after the configured timeout, accompanied by an error message explaining how to configure or disable deadlines via RpcDeadlines.

Users can:

  • Adjust individual deadlines: SparkConnectClient.builder().rpcDeadlines(RpcDeadlines(analyzePlan = Some(2.hours))).build()
  • Disable all deadlines: SparkConnectClient.builder().rpcDeadlines(RpcDeadlines.disabled).build()
  • Python equivalent: SparkConnectClient(url, rpc_deadlines=RpcDeadlines(analyze_plan=7200.0)) or rpc_deadlines=RpcDeadlines.disabled()
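
Per-field disabling composes with the defaults; a minimal Scala sketch, assuming the builder API from the bullets above:

```scala
import scala.concurrent.duration._

// Keep all defaults except: a longer analyzePlan deadline, and no deadline
// at all on addArtifacts (None disables that one RPC only).
val client = SparkConnectClient
  .builder()
  .connectionString("sc://localhost")
  .rpcDeadlines(RpcDeadlines(analyzePlan = Some(2.hours), addArtifacts = None))
  .build()
```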

How was this patch tested?

Added new tests to verify this feature in SparkConnectClientSuite, SparkConnectClientRetriesSuite, test_client.py, test_client_retries.py.

Was this patch authored or co-authored using generative AI tooling?

Yes. Co-authored with Claude Code (Anthropic).

@pranavdev022 force-pushed the SPARK-56538-per-rpc-deadlines branch 12 times, most recently from 7446657 to 1f06206 on April 20, 2026 at 20:39
Introduce RpcDeadlines configuration for Scala and Python clients with
defaults per SPARK-56538. Apply deadlines on blocking unary RPCs and
reattachable execute stream segments; omit deadline on non-reattachable
ExecutePlan. Treat DEADLINE_EXCEEDED as non-retryable in the default retry
policy; reattachable iterator recovers via RetryException. Add user-facing
hints when deadlines fire on unary RPCs. Include JVM and Python tests.
@pranavdev022 force-pushed the SPARK-56538-per-rpc-deadlines branch from 1f06206 to cae11cd on April 20, 2026 at 21:22
@pranavdev022
Contributor Author

@hvanhovell can you take a look at this PR?
cc: @vicennial

fetchErrorDetails: Option[FiniteDuration] = Some(10.minutes)) {

// Validate all fields: each must be a positive duration or None.
private lazy val namedFields: Seq[(String, Option[FiniteDuration])] =
Contributor

Since the check below always runs, you don't have to make this lazy.

Contributor Author

Yes, removed the lazy. The validation runs unconditionally in the constructor body, so lazy adds overhead for no benefit.
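
A minimal sketch of the resulting eager validation, assuming the field names from the snippet above (abridged to two fields):

```scala
import scala.concurrent.duration._

case class RpcDeadlines(
    analyzePlan: Option[FiniteDuration] = Some(1.hour),
    fetchErrorDetails: Option[FiniteDuration] = Some(10.minutes)) {

  // Plain val: the require below always runs at construction time, so lazy
  // would only defer (and slightly slow down) work that happens anyway.
  private val namedFields: Seq[(String, Option[FiniteDuration])] =
    Seq("analyzePlan" -> analyzePlan, "fetchErrorDetails" -> fetchErrorDetails)

  namedFields.foreach { case (name, deadline) =>
    require(deadline.forall(_ > Duration.Zero), s"$name must be a positive duration or None")
  }
}
```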


val rpcDeadlines: RpcDeadlines = configuration.rpcDeadlines

{
Contributor

Scoping is not needed.


private val stub = SparkConnectServiceGrpc.newBlockingStub(channel)

private def withDeadline(
Contributor

Please add a line of documentation on why we need to create a new stub every time if we have deadlines... it is IMO pretty unintuitive...

Contributor Author

Added a doc comment.
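
For context, the subtlety is that grpc-java deadlines are absolute timestamps fixed when withDeadlineAfter is called, so a deadline attached once to a long-lived stub would expire shortly after client construction. A sketch of what the documented helper might look like (stub field name assumed from the snippet above):

```scala
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.FiniteDuration

// gRPC deadlines are absolute points in time, computed when withDeadlineAfter
// is invoked, not relative per-call timeouts. Deriving a fresh stub at call
// time restarts the countdown for every RPC; the underlying channel stays
// shared, so the derived stub is cheap to create.
private def withDeadline(deadline: Option[FiniteDuration]) =
  deadline.fold(stub)(d => stub.withDeadlineAfter(d.toMillis, TimeUnit.MILLISECONDS))
```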

case ex: StatusRuntimeException if ex.getStatus.getCode == Status.Code.DEADLINE_EXCEEDED =>
// The per-RPC deadline fired. The server-side operation is still alive; we clear the
// iterator and raise RetryException so the outer retry loop opens a fresh
// ReattachExecute stream (a new per-RPC deadline countdown) to resume receiving results.
Contributor

We can reattach to an ongoing reattach, right?

Contributor Author (Apr 26, 2026)

Yes.
The client-side deadline cancels the gRPC stream, but the server keeps the operation alive (operations are only released by explicit ReleaseExecute or after the server-side release timeout). So a subsequent ReattachExecute with the same operationId and lastReturnedResponseId resumes from where the previous stream left off. If the operation was unexpectedly released, the server returns INVALID_HANDLE and the client falls back to a fresh ExecutePlan.
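
Roughly, the resume path sends the following (a sketch; builder method names assumed from the ReattachExecute proto, not copied from the PR):

```scala
// Same operationId: the server kept the operation alive after the client-side
// cancel. lastReturnedResponseId tells the server where to resume the stream.
val reattach = proto.ReattachExecuteRequest
  .newBuilder()
  .setSessionId(sessionId)
  .setOperationId(operationId)
  .setLastResponseId(lastReturnedResponseId)
  .build()
```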


class DummySparkConnectService() extends SparkConnectServiceGrpc.SparkConnectServiceImplBase {

@volatile var analyzePlanAwait: Option[CountDownLatch] = None
Contributor

What does this do???? It is not used at all.

Contributor Author

Removed. This was dead code left from an earlier iteration.

override def releaseSession(
request: proto.ReleaseSessionRequest,
responseObserver: StreamObserver[proto.ReleaseSessionResponse]): Unit = {
latch.await(5, TimeUnit.SECONDS)
Contributor

Is this just a way to wait 5 seconds? I understand that CountDownLatch is probably less sensitive to spurious wake-ups, but it seems a bit heavyweight.

Contributor Author

The latch has countDown() to release the service thread after assertions. Fixed the placement, moved all countDown() calls into finally blocks so the service thread is released even if an assertion fails. Also reduced the timeout from 5 seconds to 250ms (client deadline is 50ms, so 5x margin is plenty).
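
The resulting test shape, as a sketch (client call and assertion details illustrative, not the exact PR code):

```scala
import java.util.concurrent.CountDownLatch
import io.grpc.{Status, StatusRuntimeException}

val latch = new CountDownLatch(1) // the dummy service blocks on this to stay "slow"
try {
  // The 50ms client deadline fires while the service is still waiting on the latch.
  val e = intercept[StatusRuntimeException] { client.releaseSession() }
  assert(e.getStatus.getCode == Status.Code.DEADLINE_EXCEEDED)
} finally {
  latch.countDown() // always release the blocked service thread, even on assertion failure
}
```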

}
}

test("analyzePlan deadline fires on slow server") {
Contributor

Can you please test all one-shot RPCs in a single test... or create a method that abstracts out all of the commonalities...

Contributor Author

Consolidated the analyzePlan, config, interrupt, and releaseSession deadline tests into a single test with a shared helper. The helper creates a slow service, sets a short deadline, and verifies DEADLINE_EXCEEDED.
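
A sketch of the helper's shape (names illustrative rather than the exact PR code):

```scala
// Shared pattern: slow dummy service + short client deadline + DEADLINE_EXCEEDED check.
private def assertDeadlineFires(deadlines: RpcDeadlines)(call: SparkConnectClient => Any): Unit = {
  val client = SparkConnectClient
    .builder()
    .connectionString(s"sc://localhost:$serverPort")
    .rpcDeadlines(deadlines)
    .build()
  val e = intercept[StatusRuntimeException](call(client))
  assert(e.getStatus.getCode == Status.Code.DEADLINE_EXCEEDED)
}
```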

Contributor

In general there is quite a bit of repetition in this file.

Contributor Author

Addressed via the test consolidation. The repeated slow-service + short-deadline + assert-DEADLINE_EXCEEDED pattern is now in one helper method.

get_status: Optional[float] = 10 * 60 # 10 min
fetch_error_details: Optional[float] = 10 * 60 # 10 min

def __post_init__(self) -> None:
Contributor

NIT: While this is fancy, it is also quite complex and not really needed. Just check the individual values.

Contributor

I think this is okay. It's not difficult to read as a Python programmer. The issue is what would be the alternative. I can't think of any super clean ways to write this. The good thing about this method is that we don't need to change any code when we add new fields in the future. Single source of truth. The only thing I feel a bit unnecessary is the infinite check - that was a bit too much.

Contributor Author

Simplified the validation. Removed the isinstance type check and math.isfinite guard.
Keeping the fields() iteration so we don't need to update validation when fields are added.

@zhengruifeng
Contributor

rpc_deadlines if rpc_deadlines is not None else RpcDeadlines()
)
d = self._rpc_deadlines
configured = [
Contributor

The only reason to have this variable is for logging? That's a bit overkill. We can just print the dataclass.

Contributor Author

Replaced the manual field enumeration with:

logger.info("Spark Connect RPC deadlines: %s", self._rpc_deadlines)

The server will attempt to use this size if it is set and within the valid range
([1KB, max batch size on server]). Otherwise, the server's maximum batch size is used.
rpc_deadlines : RpcDeadlines, optional
Per-RPC gRPC call timeouts in seconds. Defaults follow SPARK-56538; use
Contributor

SPARK-56538 is this PR, so a user reading help(SparkConnectClient.__init__) has to navigate to JIRA and then back to this PR to find out what the defaults actually are.

maybe a one-line summary ("10 min for most RPCs, 1 hour for analyze/addArtifacts, none for non-reattachable execute") is self-contained and cheap.

Contributor Author

Updated.

PYSPARK_ROOT = os.path.dirname(pyspark.__file__)


@dataclass
Contributor

shall we use frozen=True here to avoid invalid assignment like:

r = RpcDeadlines()
r.config = -1

Contributor Author

Added.

Some(stubWithDeadline(reattachableExecutePlanDeadline).executePlan(initialRequest))

// When true, an empty iterator triggers a fresh ExecutePlan instead of ReattachExecute.
private var restartExecutionOnNextRetry: Boolean = false
Contributor

do we have such a flag in python?

Contributor Author

No, Python doesn't have this flag. Python uses a simpler inline pattern: INVALID_HANDLE creates a new ExecutePlan iterator directly, and DEADLINE_EXCEEDED sets the iterator to None so the next retry issues a ReattachExecute. No flag is needed.
Removed the flag from Scala to match; the deferred pattern was unnecessary complexity here.
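
The inlined Scala recovery then mirrors the Python pattern, roughly as below (a sketch assuming the iterator, stubWithDeadline, and RetryException names seen elsewhere in this PR; the INVALID_HANDLE check is illustrative):

```scala
import io.grpc.{Status, StatusRuntimeException}

try {
  iterator.get.next()
} catch {
  case e: StatusRuntimeException
      if e.getStatus.getCode == Status.Code.DEADLINE_EXCEEDED =>
    // Client-side deadline only: the server-side operation is still running.
    iterator = None // the next retry issues ReattachExecute from lastReturnedResponseId
    throw new RetryException
  case e: StatusRuntimeException if e.getMessage.contains("INVALID_HANDLE") =>
    // The operation was released server-side: fall back to a fresh ExecutePlan.
    iterator = Some(stubWithDeadline(reattachableExecutePlanDeadline).executePlan(initialRequest))
}
```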

@pranavdev022 requested a review from hvanhovell on April 26, 2026 at 18:23
@pranavdev022 force-pushed the SPARK-56538-per-rpc-deadlines branch from 8448b9c to 98b5920 on April 27, 2026 at 07:50