From c61de5873580bd1787deb1c035f237e1ba04917c Mon Sep 17 00:00:00 2001 From: Anton Nekipelov <226657+anton-107@users.noreply.github.com> Date: Fri, 5 Jun 2026 17:05:15 +0200 Subject: [PATCH 1/3] experimental/ssh: surface connect failures instead of hanging Improve diagnostics when `databricks ssh connect` fails. - Surface bootstrap job-run errors: when the SSH server bootstrap job reaches a terminal/failed state, fetch the run's state message, notebook error/trace, and run-page URL and show them, instead of the generic "server metadata error / metadata.json doesn't exist". - Guard against hangs when the server is up but the handshake never completes (e.g. the container image has no OpenSSH server, so the server can't launch /usr/sbin/sshd and holds the websocket open). The client now aborts after a handshake timeout with an actionable hint, and exits promptly when the server closes the connection, instead of hanging until ssh's ConnectTimeout. - Add an openssh-server hint when ssh exits with its connection-failure code (255). Tests cover the failed-run message formatting, the fast exit on server close, and the handshake timeout. WIP: the missing-sshd path still incurs a handshake-timeout wait; a server-side pre-flight sshd check (tracked separately) would turn it into an immediate, clear job failure. Co-authored-by: Isaac --- experimental/ssh/internal/client/client.go | 135 ++++++++++++++++-- .../internal/client/client_internal_test.go | 117 +++++++++++++++ experimental/ssh/internal/proxy/client.go | 95 +++++++++--- .../ssh/internal/proxy/client_server_test.go | 83 +++++++++++ 4 files changed, 401 insertions(+), 29 deletions(-) create mode 100644 experimental/ssh/internal/client/client_internal_test.go diff --git a/experimental/ssh/internal/client/client.go b/experimental/ssh/internal/client/client.go index ff9d87d61b9..00c1e05d0d0 100644 --- a/experimental/ssh/internal/client/client.go +++ b/experimental/ssh/internal/client/client.go @@ -489,16 +489,19 @@ func getServerMetadata(ctx context.Context, client *databricks.WorkspaceClient, return wsMetadata.Port, string(bodyBytes), effectiveClusterID, nil } -func submitSSHTunnelJob(ctx context.Context, client *databricks.WorkspaceClient, version, secretScopeName string, opts ClientOptions) error { +// submitSSHTunnelJob submits the bootstrap job and waits for the SSH server task to start. +// It returns the job run ID (when known) so callers can fetch and surface the run's error +// details if the server never comes up. +func submitSSHTunnelJob(ctx context.Context, client *databricks.WorkspaceClient, version, secretScopeName string, opts ClientOptions) (int64, error) { sessionID := opts.SessionIdentifier() contentDir, err := sshWorkspace.GetWorkspaceContentDir(ctx, client, version, sessionID) if err != nil { - return fmt.Errorf("failed to get workspace content directory: %w", err) + return 0, fmt.Errorf("failed to get workspace content directory: %w", err) } err = client.Workspace.MkdirsByPath(ctx, contentDir) //nolint:staticcheck // Deprecated in SDK v0.127.0. Migration to WorkspaceHierarchyService tracked separately. if err != nil { - return fmt.Errorf("failed to create directory in the remote workspace: %w", err) + return 0, fmt.Errorf("failed to create directory in the remote workspace: %w", err) } sshTunnelJobName := "ssh-server-bootstrap-" + sessionID @@ -514,7 +517,7 @@ func submitSSHTunnelJob(ctx context.Context, client *databricks.WorkspaceClient, Overwrite: true, }) if err != nil { - return fmt.Errorf("failed to create ssh-tunnel notebook: %w", err) + return 0, fmt.Errorf("failed to create ssh-tunnel notebook: %w", err) } baseParams := map[string]string{ @@ -569,12 +572,13 @@ func submitSSHTunnelJob(ctx context.Context, client *databricks.WorkspaceClient, waiter, err := client.Jobs.Submit(ctx, submitRequest) if err != nil { - return fmt.Errorf("failed to submit job: %w", err) + return 0, fmt.Errorf("failed to submit job: %w", err) } cmdio.LogString(ctx, fmt.Sprintf("Job submitted successfully with run ID: %d", waiter.RunId)) - return waitForJobToStart(ctx, client, waiter.RunId, opts.TaskStartupTimeout) + // Return the run ID even on error so callers can fetch the run's failure details. + return waiter.RunId, waitForJobToStart(ctx, client, waiter.RunId, opts.TaskStartupTimeout) } func spawnSSHClient(ctx context.Context, userName, privateKeyPath string, serverPort int, clusterID string, opts ClientOptions) error { @@ -610,7 +614,18 @@ func spawnSSHClient(ctx context.Context, userName, privateKeyPath string, server sshCmd.Stdout = os.Stdout sshCmd.Stderr = os.Stderr - return sshCmd.Run() + err = sshCmd.Run() + // ssh reserves exit code 255 for its own connection-level failures (a remote command's exit + // code is passed through as-is, 0-254). The most common cause here is the cluster's container + // image missing an OpenSSH server, so the server can't launch sshd once we connect — the + // connection then drops right after "Connected!". Surface an actionable hint rather than + // leaving the user with ssh's opaque "Connection closed" message. + if exitErr, ok := errors.AsType[*exec.ExitError](err); ok && exitErr.ExitCode() == 255 { + cmdio.LogString(ctx, cmdio.Yellow(ctx, "The SSH connection closed unexpectedly. If it dropped right after connecting, "+ + "the cluster's container image is likely missing an OpenSSH server: ensure 'openssh-server' "+ + "is installed (it provides /usr/sbin/sshd), then check the SSH server job run logs.")) + } + return err } func runSSHProxy(ctx context.Context, client *databricks.WorkspaceClient, serverPort int, clusterID string, opts ClientOptions) error { @@ -691,9 +706,10 @@ func waitForJobToStart(ctx context.Context, client *databricks.WorkspaceClient, return sshTask, nil } - // Check for terminal failure states + // Check for terminal failure states. Surface the run's actual error (e.g. a notebook + // traceback or "Could not reach driver") instead of a generic message. if currentState == jobs.RunLifecycleStateV2StateTerminated { - return nil, retries.Halt(errors.New("task terminated before reaching running state")) + return nil, retries.Halt(fmt.Errorf("ssh server bootstrap job failed:\n%s", describeRunFailure(ctx, client, runID))) } // Continue polling for other states @@ -703,6 +719,94 @@ func waitForJobToStart(ctx context.Context, client *databricks.WorkspaceClient, return err } +// maxRunFailureTraceBytes bounds how much of a failed run's error trace we print to the +// terminal; the full output is always available via the run page URL. +const maxRunFailureTraceBytes = 2000 + +// describeRunFailure fetches a failed bootstrap run's error details and formats them for the +// terminal. It is best-effort: any API error is folded into the returned text rather than +// propagated, so callers can always embed the result in their own error. +func describeRunFailure(ctx context.Context, client *databricks.WorkspaceClient, runID int64) string { + if runID == 0 { + return " (no job run ID available)" + } + + run, err := client.Jobs.GetRun(ctx, jobs.GetRunRequest{RunId: runID}) + if err != nil { + return fmt.Sprintf(" could not fetch job run %d: %v", runID, err) + } + + var b strings.Builder + + // Locate the SSH server task to read its termination reason and per-task run output. + var sshTask *jobs.RunTask + for i := range run.Tasks { + if run.Tasks[i].TaskKey == sshServerTaskKey { + sshTask = &run.Tasks[i] + break + } + } + + if sshTask != nil && sshTask.Status != nil && sshTask.Status.TerminationDetails != nil { + if msg := strings.TrimSpace(sshTask.Status.TerminationDetails.Message); msg != "" { + fmt.Fprintf(&b, " %s\n", msg) + } + } + + // The notebook error/traceback carries the real cause (e.g. a Python exception). + outputRunID := runID + if sshTask != nil && sshTask.RunId != 0 { + outputRunID = sshTask.RunId + } + if output, err := client.Jobs.GetRunOutput(ctx, jobs.GetRunOutputRequest{RunId: outputRunID}); err == nil && output != nil { + if e := strings.TrimSpace(output.Error); e != "" { + fmt.Fprintf(&b, " %s\n", e) + } + if trace := strings.TrimSpace(output.ErrorTrace); trace != "" { + fmt.Fprintf(&b, "%s\n", truncateTail(trace, maxRunFailureTraceBytes)) + } + } + + if run.RunPageUrl != "" { + fmt.Fprintf(&b, " See the full job logs: %s", run.RunPageUrl) + } + + if b.Len() == 0 { + return fmt.Sprintf(" job run %d failed; see run details in the workspace", runID) + } + return strings.TrimRight(b.String(), "\n") +} + +// runFailureIfTerminated reports whether the bootstrap run has reached a terminal state (so the +// SSH server will never come up), returning a formatted failure description when it has. +func runFailureIfTerminated(ctx context.Context, client *databricks.WorkspaceClient, runID int64) (string, bool) { + if runID == 0 { + return "", false + } + run, err := client.Jobs.GetRun(ctx, jobs.GetRunRequest{RunId: runID}) + if err != nil { + return "", false + } + for i := range run.Tasks { + if run.Tasks[i].TaskKey != sshServerTaskKey { + continue + } + if run.Tasks[i].Status != nil && run.Tasks[i].Status.State == jobs.RunLifecycleStateV2StateTerminated { + return describeRunFailure(ctx, client, runID), true + } + return "", false + } + return "", false +} + +// truncateTail returns the last maxBytes of s, marking the cut when truncated. +func truncateTail(s string, maxBytes int) string { + if len(s) <= maxBytes { + return s + } + return " ...\n" + s[len(s)-maxBytes:] +} + func ensureSSHServerIsRunning(ctx context.Context, client *databricks.WorkspaceClient, version, secretScopeName string, opts ClientOptions) (string, int, string, error) { sessionID := opts.SessionIdentifier() // For dedicated clusters, use clusterID; for serverless, it will be read from metadata @@ -712,7 +816,7 @@ func ensureSSHServerIsRunning(ctx context.Context, client *databricks.WorkspaceC if errors.Is(err, errServerMetadata) { cmdio.LogString(ctx, "Starting SSH server...") - err := submitSSHTunnelJob(ctx, client, version, secretScopeName, opts) + runID, err := submitSSHTunnelJob(ctx, client, version, secretScopeName, opts) if err != nil { return "", 0, "", fmt.Errorf("failed to submit and start ssh server job: %w", err) } @@ -729,10 +833,17 @@ func ensureSSHServerIsRunning(ctx context.Context, client *databricks.WorkspaceC if err == nil { cmdio.LogString(ctx, "Health check successful, starting ssh WebSocket connection...") break - } else if retries < maxRetries-1 { + } + // The metadata never appears if the bootstrap job dies after reaching RUNNING. + // Surface the job's actual error instead of waiting out the full timeout with a + // generic "metadata.json doesn't exist" message. + if failure, terminated := runFailureIfTerminated(ctx, client, runID); terminated { + return "", 0, "", fmt.Errorf("ssh server bootstrap job failed:\n%s", failure) + } + if retries < maxRetries-1 { time.Sleep(2 * time.Second) } else { - return "", 0, "", fmt.Errorf("failed to start the ssh server: %w", err) + return "", 0, "", fmt.Errorf("failed to start the ssh server: %w\n%s", err, describeRunFailure(ctx, client, runID)) } } } else if err != nil { diff --git a/experimental/ssh/internal/client/client_internal_test.go b/experimental/ssh/internal/client/client_internal_test.go new file mode 100644 index 00000000000..35740d8cee7 --- /dev/null +++ b/experimental/ssh/internal/client/client_internal_test.go @@ -0,0 +1,117 @@ +package client + +import ( + "strings" + "testing" + "time" + + "github.com/databricks/cli/libs/cmdio" + "github.com/databricks/databricks-sdk-go/experimental/mocks" + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +// terminatedRun builds a job run whose SSH server task has terminated, for the failure-surfacing tests. +func terminatedRun(runID, taskRunID int64, message, pageURL string) *jobs.Run { + return &jobs.Run{ + RunId: runID, + RunPageUrl: pageURL, + Tasks: []jobs.RunTask{{ + TaskKey: sshServerTaskKey, + RunId: taskRunID, + Status: &jobs.RunStatus{ + State: jobs.RunLifecycleStateV2StateTerminated, + TerminationDetails: &jobs.TerminationDetails{Message: message}, + }, + }}, + } +} + +func TestDescribeRunFailureIncludesMessageTraceAndURL(t *testing.T) { + ctx := cmdio.MockDiscard(t.Context()) + m := mocks.NewMockWorkspaceClient(t) + api := m.GetMockJobsAPI() + api.EXPECT().GetRun(mock.Anything, jobs.GetRunRequest{RunId: 1}).Return( + terminatedRun(1, 99, "Could not reach driver of cluster 0605-x.", "https://example.test/run/1"), nil) + api.EXPECT().GetRunOutput(mock.Anything, jobs.GetRunOutputRequest{RunId: 99}).Return( + &jobs.RunOutput{Error: "Run failed with error message", ErrorTrace: "Traceback (most recent call last): boom"}, nil) + + out := describeRunFailure(ctx, m.WorkspaceClient, 1) + assert.Contains(t, out, "Could not reach driver of cluster 0605-x.") + assert.Contains(t, out, "Run failed with error message") + assert.Contains(t, out, "Traceback (most recent call last): boom") + assert.Contains(t, out, "https://example.test/run/1") +} + +func TestDescribeRunFailureTruncatesLongTrace(t *testing.T) { + ctx := cmdio.MockDiscard(t.Context()) + m := mocks.NewMockWorkspaceClient(t) + api := m.GetMockJobsAPI() + longTrace := strings.Repeat("x", maxRunFailureTraceBytes+500) + "TAIL_MARKER" + api.EXPECT().GetRun(mock.Anything, jobs.GetRunRequest{RunId: 1}).Return( + terminatedRun(1, 99, "", "https://example.test/run/1"), nil) + api.EXPECT().GetRunOutput(mock.Anything, jobs.GetRunOutputRequest{RunId: 99}).Return( + &jobs.RunOutput{ErrorTrace: longTrace}, nil) + + out := describeRunFailure(ctx, m.WorkspaceClient, 1) + assert.Contains(t, out, "...") + assert.Contains(t, out, "TAIL_MARKER") + // The leading run of 'x' is dropped by truncation. + assert.NotContains(t, out, strings.Repeat("x", maxRunFailureTraceBytes+1)) +} + +func TestDescribeRunFailureNoRunID(t *testing.T) { + ctx := cmdio.MockDiscard(t.Context()) + m := mocks.NewMockWorkspaceClient(t) + out := describeRunFailure(ctx, m.WorkspaceClient, 0) + assert.Contains(t, out, "no job run ID") +} + +func TestRunFailureIfTerminated(t *testing.T) { + ctx := cmdio.MockDiscard(t.Context()) + + t.Run("terminated", func(t *testing.T) { + m := mocks.NewMockWorkspaceClient(t) + api := m.GetMockJobsAPI() + api.EXPECT().GetRun(mock.Anything, jobs.GetRunRequest{RunId: 1}).Return( + terminatedRun(1, 99, "boom", "https://example.test/run/1"), nil) + api.EXPECT().GetRunOutput(mock.Anything, jobs.GetRunOutputRequest{RunId: 99}).Return( + &jobs.RunOutput{}, nil) + + desc, terminated := runFailureIfTerminated(ctx, m.WorkspaceClient, 1) + assert.True(t, terminated) + assert.Contains(t, desc, "boom") + }) + + t.Run("still running", func(t *testing.T) { + m := mocks.NewMockWorkspaceClient(t) + api := m.GetMockJobsAPI() + api.EXPECT().GetRun(mock.Anything, jobs.GetRunRequest{RunId: 1}).Return(&jobs.Run{ + RunId: 1, + Tasks: []jobs.RunTask{{ + TaskKey: sshServerTaskKey, + Status: &jobs.RunStatus{State: jobs.RunLifecycleStateV2StateRunning}, + }}, + }, nil) + + _, terminated := runFailureIfTerminated(ctx, m.WorkspaceClient, 1) + assert.False(t, terminated) + }) +} + +func TestWaitForJobToStartSurfacesFailure(t *testing.T) { + ctx := cmdio.MockDiscard(t.Context()) + m := mocks.NewMockWorkspaceClient(t) + api := m.GetMockJobsAPI() + api.EXPECT().GetRun(mock.Anything, jobs.GetRunRequest{RunId: 1}).Return( + terminatedRun(1, 99, "Could not reach driver of cluster 0605-x.", "https://example.test/run/1"), nil) + api.EXPECT().GetRunOutput(mock.Anything, jobs.GetRunOutputRequest{RunId: 99}).Return( + &jobs.RunOutput{}, nil) + + err := waitForJobToStart(ctx, m.WorkspaceClient, 1, 30*time.Second) + require.Error(t, err) + assert.Contains(t, err.Error(), "ssh server bootstrap job failed") + assert.Contains(t, err.Error(), "Could not reach driver of cluster 0605-x.") +} diff --git a/experimental/ssh/internal/proxy/client.go b/experimental/ssh/internal/proxy/client.go index 89be5967c9b..a1e8389e7ff 100644 --- a/experimental/ssh/internal/proxy/client.go +++ b/experimental/ssh/internal/proxy/client.go @@ -2,41 +2,102 @@ package proxy import ( "context" + "errors" "fmt" "io" + "sync/atomic" "time" "github.com/databricks/cli/libs/log" "golang.org/x/sync/errgroup" ) +// clientHandshakeTimeout bounds how long the client waits for the first byte from the SSH server +// after the proxy websocket is established. A healthy sshd sends its identification string +// immediately (RFC 4253 §4.2), so if nothing arrives the server most likely failed to launch +// sshd — e.g. the cluster's container image has no OpenSSH server. The server can hold the +// websocket open in that state, leaving the proxy loops blocked forever, so we bail out instead +// of letting the ssh client hang until its ConnectTimeout. It is a var so tests can shorten it. +var clientHandshakeTimeout = 30 * time.Second + +var errHandshakeTimeout = errors.New("no response from the SSH server: the cluster's container image may be missing an OpenSSH server (sshd) — ensure 'openssh-server' is installed and check the SSH server job run logs") + +// firstByteWriter signals (once) the first time any data is written through it, then forwards +// transparently. The client uses it to detect that the SSH server has started responding. +type firstByteWriter struct { + w io.Writer + signaled atomic.Bool + firstByte chan struct{} +} + +func (f *firstByteWriter) Write(p []byte) (int, error) { + if len(p) > 0 && f.signaled.CompareAndSwap(false, true) { + close(f.firstByte) + } + return f.w.Write(p) +} + func RunClientProxy(ctx context.Context, src io.ReadCloser, dst io.Writer, requestHandoverTick func() <-chan time.Time, createConn createWebsocketConnectionFunc) error { proxy := newProxyConnection(createConn) log.Infof(ctx, "Establishing SSH proxy connection...") - g, gCtx := errgroup.WithContext(ctx) - if err := proxy.connect(gCtx); err != nil { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + if err := proxy.connect(ctx); err != nil { return fmt.Errorf("failed to connect to proxy: %w", err) } defer proxy.close() log.Infof(ctx, "SSH proxy connection established") - g.Go(func() error { - for { - select { - case <-gCtx.Done(): - return gCtx.Err() - case <-requestHandoverTick(): - err := proxy.initiateHandover(gCtx) - if err != nil { - return err + wrappedDst := &firstByteWriter{w: dst, firstByte: make(chan struct{})} + + // Run the proxy loops in the background. We don't wait on them directly: if the server holds + // the websocket open without ever launching sshd, both loops can block forever (the sending + // loop on os.Stdin, the receiving loop on ReadMessage), so g.Wait would never return. + done := make(chan error, 1) + go func() { + g, gCtx := errgroup.WithContext(ctx) + g.Go(func() error { + for { + select { + case <-gCtx.Done(): + return gCtx.Err() + case <-requestHandoverTick(): + if err := proxy.initiateHandover(gCtx); err != nil { + return err + } } } - } - }) + }) + g.Go(func() error { + // When proxy.start returns (EOF from ssh, or the server closing the connection), + // cancel so the handover goroutine stops too and g.Wait can return. + defer cancel() + return proxy.start(gCtx, src, wrappedDst) + }) + done <- g.Wait() + }() - g.Go(func() error { - return proxy.start(gCtx, src, dst) - }) + select { + case err := <-done: + // Session ended before the handshake even started (e.g. the server closed the connection). + return normalizeProxyError(err) + case <-wrappedDst.firstByte: + // The server is responding; the handshake is underway. Wait for the session to finish. + return normalizeProxyError(<-done) + case <-time.After(clientHandshakeTimeout): + // cancel() (deferred) unblocks what it can; the process exits and reclaims any goroutine + // still stuck on os.Stdin. ssh then fails fast instead of hanging on its ConnectTimeout. + return errHandshakeTimeout + case <-ctx.Done(): + return nil + } +} - return g.Wait() +// normalizeProxyError treats a clean finish or a context cancellation (our own exit signal, or the +// user interrupting) as success; anything else is a real proxy error. +func normalizeProxyError(err error) error { + if err == nil || errors.Is(err, context.Canceled) { + return nil + } + return err } diff --git a/experimental/ssh/internal/proxy/client_server_test.go b/experimental/ssh/internal/proxy/client_server_test.go index 186cf6b6579..1915cc07c88 100644 --- a/experimental/ssh/internal/proxy/client_server_test.go +++ b/experimental/ssh/internal/proxy/client_server_test.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "io" + "net/http" "net/http/httptest" "os/exec" "sync" @@ -230,3 +231,85 @@ func TestQuickHandover(t *testing.T) { assert.Equal(t, string(expectedOutput), client.Output.String()) } + +// TestClientExitsWhenServerCommandFails reproduces the missing-sshd case: the server accepts the +// websocket but can't launch its command, so it closes the connection immediately. The client +// proxy must exit promptly instead of hanging on the handover goroutine (which would leave the +// ssh client waiting until its ConnectTimeout). +func TestClientExitsWhenServerCommandFails(t *testing.T) { + ctx := cmdio.MockDiscard(t.Context()) + connections := NewConnectionsManager(2, time.Hour) + server := httptest.NewServer(NewProxyServer(ctx, connections, func(ctx context.Context) *exec.Cmd { + // A binary that does not exist: serverCmd.Start() fails, mirroring a missing /usr/sbin/sshd. + return exec.CommandContext(ctx, "databricks-ssh-nonexistent-binary") + })) + defer server.Close() + + wsURL := "ws" + server.URL[4:] + createConn := func(ctx context.Context, connID string) (*websocket.Conn, error) { + conn, _, err := websocket.DefaultDialer.Dial(fmt.Sprintf("%s?id=%s", wsURL, connID), nil) // nolint:bodyclose + return conn, err + } + // Source is never closed by the test; only the server-side close must drive the client to exit. + src, _ := io.Pipe() + requestHandoverTick := func() <-chan time.Time { return time.After(time.Hour) } + + done := make(chan error, 1) + go func() { + done <- RunClientProxy(ctx, src, io.Discard, requestHandoverTick, createConn) + }() + + select { + case <-done: + // Returned promptly after the server closed the connection — no hang. + case <-time.After(10 * time.Second): + t.Fatal("RunClientProxy hung after the server closed the connection") + } +} + +// TestClientTimesOutWhenServerSendsNothing reproduces the harder missing-sshd case: the server +// accepts the websocket but holds it open without ever sending the SSH banner (the real server +// does this after failing to launch sshd). The client must abort on the handshake timeout rather +// than block forever on its read loops. +func TestClientTimesOutWhenServerSendsNothing(t *testing.T) { + original := clientHandshakeTimeout + clientHandshakeTimeout = 300 * time.Millisecond + defer func() { clientHandshakeTimeout = original }() + + ctx := cmdio.MockDiscard(t.Context()) + upgrader := websocket.Upgrader{} + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + return + } + defer conn.Close() + // Hold the connection open, sending nothing, until the client goes away. + for { + if _, _, err := conn.ReadMessage(); err != nil { + return + } + } + })) + defer server.Close() + + wsURL := "ws" + server.URL[4:] + createConn := func(ctx context.Context, connID string) (*websocket.Conn, error) { + conn, _, err := websocket.DefaultDialer.Dial(fmt.Sprintf("%s?id=%s", wsURL, connID), nil) // nolint:bodyclose + return conn, err + } + src, _ := io.Pipe() + requestHandoverTick := func() <-chan time.Time { return time.After(time.Hour) } + + done := make(chan error, 1) + go func() { + done <- RunClientProxy(ctx, src, io.Discard, requestHandoverTick, createConn) + }() + + select { + case err := <-done: + require.ErrorIs(t, err, errHandshakeTimeout) + case <-time.After(10 * time.Second): + t.Fatal("RunClientProxy did not abort on the handshake timeout") + } +} From 4d5f5328a707bb252529a217fee9cbb8d8ee1ee9 Mon Sep 17 00:00:00 2001 From: Anton Nekipelov <226657+anton-107@users.noreply.github.com> Date: Mon, 8 Jun 2026 09:44:10 +0200 Subject: [PATCH 2/3] experimental/ssh: document how to reproduce connect failure modes Add FAILURE_MODES.md describing how to reproduce, and what the user sees for, the two `databricks ssh connect` container failure modes: a missing OpenSSH server (sshd launched lazily, connection drops after "Connected!") and a container that can't run the Python bootstrap ("Could not reach driver" before sshd matters). Includes example Dockerfiles, a cluster spec, a working control image, how to read the bootstrap job logs, and the local unit tests that cover the same paths. Linked from README. Co-authored-by: Isaac --- experimental/ssh/FAILURE_MODES.md | 149 ++++++++++++++++++++++++++++++ experimental/ssh/README.md | 3 + 2 files changed, 152 insertions(+) create mode 100644 experimental/ssh/FAILURE_MODES.md diff --git a/experimental/ssh/FAILURE_MODES.md b/experimental/ssh/FAILURE_MODES.md new file mode 100644 index 00000000000..18249cc0a5d --- /dev/null +++ b/experimental/ssh/FAILURE_MODES.md @@ -0,0 +1,149 @@ +# Reproducing `databricks ssh connect` failure modes + +This guide documents container/cluster misconfigurations that make `databricks ssh connect` +fail, how to reproduce each one, the symptom the user sees, and where the real error lives. It +is primarily a testing aid for the SSH feature's error-handling paths. + +For the connection flow and architecture, see [README.md](./README.md). + +## Background: where failures surface + +The bootstrap is a **Python notebook job** that starts `databricks ssh server` on the cluster. +The server publishes its port to the workspace (`metadata.json`), the client reads it, prints +`Connected!`, and spawns `ssh`. The SSH daemon (`/usr/sbin/sshd`) is launched **lazily, per +client connection** (see `internal/server/sshd.go` and `internal/proxy/server.go`). Because of +this ordering, different misconfigurations fail at different stages: + +| Stage | Needs | Failure mode if missing | +| --- | --- | --- | +| Bootstrap job runs | a working Databricks **Python** runtime in the image | [Mode 2](#mode-2-container-cant-run-the-python-bootstrap) | +| Per-connection SSH | **`/usr/sbin/sshd`** (OpenSSH server) in the image | [Mode 1](#mode-1-container-missing-the-openssh-server-sshd) | + +## Prerequisites + +- A workspace with **Databricks Container Services** (custom Docker images) enabled. +- Permission to create a **dedicated (single-user)** cluster. +- A dev build of the CLI. See the *Development* section of [README.md](./README.md): + ```shell + ./task build snapshot-release + ./cli ssh connect --cluster= --releases-dir=./dist --debug + ``` +- A container registry the workspace can pull from (e.g. a public Docker Hub repo) to host the + test images below. Build them for the cluster's architecture (`linux/amd64` on most clouds): + ```shell + docker buildx build --platform linux/amd64 -t /: --push . + ``` + +The cluster specs below use a single-node dedicated cluster. Adjust `node_type_id` and +`spark_version` for your cloud and DBR version: + +```json +{ + "cluster_name": "ssh-failure-repro", + "spark_version": "16.4.x-scala2.12", + "node_type_id": "", + "num_workers": 0, + "data_security_mode": "SINGLE_USER", + "single_user_name": "", + "spark_conf": { "spark.databricks.cluster.profile": "singleNode", "spark.master": "local[*, 4]" }, + "custom_tags": { "ResourceClass": "SingleNode" }, + "autotermination_minutes": 60, + "docker_image": { "url": "/:" } +} +``` + +Create it with `databricks clusters create --json @cluster.json --no-wait` and wait for the +`RUNNING` state (a custom-container pull can take several minutes). + +## Mode 1: container missing the OpenSSH server (`sshd`) + +A notebook-capable image that does **not** ship `openssh-server`. Build it by removing the SSH +server from an image that otherwise works: + +```dockerfile +FROM databricksruntime/standard:16.4-LTS +RUN (apt-get remove -y openssh-server || true) \ + && rm -f /usr/sbin/sshd /usr/bin/sshd +``` + +Create a cluster on this image, then: + +```shell +./cli ssh connect --cluster= --releases-dir=./dist +``` + +**Symptom.** The bootstrap job succeeds and publishes metadata, so the client prints +`Connected!` — and then the connection drops. The server can't launch `/usr/sbin/sshd` for the +incoming connection and holds the websocket open, so historically the `ssh` client **hung** +until its `ConnectTimeout`. The real error, +`failed to start SSHD process: ... /usr/sbin/sshd: no such file or directory`, is only written +to the bootstrap job's **stdout logs** while the job is still `RUNNING` — it is never a failed +job state. + +**With the error-handling improvements** the client aborts after a handshake timeout (no SSH +banner from the server) with an actionable hint to install `openssh-server`, and exits +promptly instead of hanging. + +**Fix.** Install `openssh-server` in the image (`apt-get install -y openssh-server`). + +## Mode 2: container can't run the Python bootstrap + +A bare/minimal base that lacks a working Databricks Python runtime. The simplest example is +`databricksruntime/rbase:16.4-LTS` used directly as the cluster image (it is an R *base* layer; +notably it has no functioning `/databricks/python` notebook-execution environment). + +Create a cluster on `databricksruntime/rbase:16.4-LTS`, then: + +```shell +./cli ssh connect --cluster= --releases-dir=./dist +``` + +**Symptom.** The bootstrap is a Python notebook, but the image can't execute notebook commands, +so the job fails with `Could not reach driver of cluster `. The SSH server never starts and +never publishes metadata, so the client fails with +`server metadata error / ... metadata.json doesn't exist` — **before** the `sshd` step is ever +reached. (A trivial `print(...)` notebook job submitted to the same cluster fails the same way, +which is a quick way to confirm the image, not the SSH feature, is at fault.) + +**With the error-handling improvements** the client fetches the failed run's state message, +notebook error/trace, and run-page URL and shows them instead of the generic metadata error. + +**Fix.** Build on a notebook-capable base (e.g. `databricksruntime/standard:...`) or otherwise +provide a working Databricks Python environment, in addition to `openssh-server`. + +## Working control + +`databricksruntime/standard:16.4-LTS` ships **both** a working Python runtime **and** `sshd`, +so `ssh connect` to a cluster on it succeeds end to end. Use it as a baseline to confirm your +workspace, cluster spec, and dev build are healthy before reproducing a failure mode. + +## Inspecting the bootstrap job logs + +`ssh connect` prints `Job submitted successfully with run ID: `. Inspect it with: + +```shell +databricks jobs get-run # open run_page_url in the UI +databricks jobs get-run-output # task-run-id = .tasks[0].run_id of the run +``` + +Caveat: for a **running** server task, `get-run-output`'s `logs`/`error` are not populated — +the `sshd` error from [Mode 1](#mode-1-container-missing-the-openssh-server-sshd) lives in the +live notebook cell stdout / driver logs, not the Jobs run-output API. A failed run from +[Mode 2](#mode-2-container-cant-run-the-python-bootstrap) does populate the run's state message +and error. + +## Reproducing locally, without a workspace + +The proxy-layer behaviors have unit tests that don't need a cluster: + +- `internal/proxy/client_server_test.go` + - `TestClientExitsWhenServerCommandFails` — server can't launch its command and closes the + connection; the client exits promptly. + - `TestClientTimesOutWhenServerSendsNothing` — server holds the connection open and sends + nothing (the Mode 1 shape); the client aborts on the handshake timeout. +- `internal/client/client_internal_test.go` — formatting of a failed bootstrap run's error + (state message, error trace, run-page URL) using SDK mocks. + +```shell +go test ./experimental/ssh/... +``` diff --git a/experimental/ssh/README.md b/experimental/ssh/README.md index 40b553d4bff..769006ef2df 100644 --- a/experimental/ssh/README.md +++ b/experimental/ssh/README.md @@ -23,6 +23,9 @@ databricks ssh connect --cluster=id ./cli ssh connect --cluster= --releases-dir=./dist --debug # or modify ssh config accordingly ``` +To reproduce and test the known `ssh connect` failure modes (container missing `sshd`, or a +container that can't run the Python bootstrap), see [FAILURE_MODES.md](./FAILURE_MODES.md). + ## Design High level: From b573464ed43b3759bfe860b3454e37b04b396afa Mon Sep 17 00:00:00 2001 From: Anton Nekipelov <226657+anton-107@users.noreply.github.com> Date: Mon, 8 Jun 2026 12:12:21 +0200 Subject: [PATCH 3/3] ci: re-trigger checks after transient CPython download outage Co-authored-by: Isaac