diff --git a/src/main/kotlin/com/redhat/devtools/gateway/DevSpacesConnection.kt b/src/main/kotlin/com/redhat/devtools/gateway/DevSpacesConnection.kt index f6cb7fb6..9ae38bc7 100644 --- a/src/main/kotlin/com/redhat/devtools/gateway/DevSpacesConnection.kt +++ b/src/main/kotlin/com/redhat/devtools/gateway/DevSpacesConnection.kt @@ -11,35 +11,63 @@ */ package com.redhat.devtools.gateway -import com.intellij.openapi.application.ApplicationManager -import com.intellij.openapi.application.ModalityState import com.intellij.openapi.diagnostic.thisLogger -import com.intellij.openapi.project.ex.ProjectManagerEx import com.jetbrains.gateway.thinClientLink.LinkedClientManager import com.jetbrains.gateway.thinClientLink.ThinClientHandle import com.jetbrains.rd.util.lifetime.Lifetime +import com.redhat.devtools.gateway.devworkspace.DevWorkspace import com.redhat.devtools.gateway.devworkspace.DevWorkspacePatch import com.redhat.devtools.gateway.devworkspace.DevWorkspaceRestart import com.redhat.devtools.gateway.devworkspace.DevWorkspaces import com.redhat.devtools.gateway.devworkspace.RestartDevWorkspaceAnnotationWatch import com.redhat.devtools.gateway.openshift.DevWorkspacePods +import com.redhat.devtools.gateway.openshift.PodForwardResolution import com.redhat.devtools.gateway.server.RemoteIDEServer import com.redhat.devtools.gateway.server.RemoteIDEServerStatus import com.redhat.devtools.gateway.util.ProgressCountdown +import com.redhat.devtools.gateway.util.closeAllProjects +import com.redhat.devtools.gateway.util.findFreePort import com.redhat.devtools.gateway.util.isCancellationException import com.redhat.devtools.gateway.view.ui.Dialogs -import io.kubernetes.client.openapi.ApiClient +import io.kubernetes.client.openapi.models.V1Pod import kotlinx.coroutines.* import java.io.Closeable import java.io.IOException -import java.net.ServerSocket import java.net.URI import java.util.concurrent.CancellationException import java.util.concurrent.atomic.AtomicBoolean import kotlin.time.Duration.Companion.milliseconds import kotlin.time.Duration.Companion.seconds +/** + * Connects Gateway to a workspace IDE via port-forward and thin client. + * + * ## Session recovery routing + * + * Two independent paths recover from pod or workspace changes. They must not run at the same time. + * + * **Unplanned pod roll** (pod UID changes, no restart annotation): + * [WorkspacePodTracker] detects the roll and invokes [ThinClientReconnect] ("Reconnecting to workspace"). + * [PortForwardPodResolver] waits for a running pod when needed, then re-establishes port-forward + * in parallel with session reconnect. The existing local port and forwarder are reused. + * + * **User-initiated restart** (Remote IDE sets [DevWorkspacePatch.RESTART_KEY] on the DevWorkspace): + * [RestartDevWorkspaceAnnotationWatch] triggers [DevWorkspaceRestart] ("Restart workspace"), which + * stops and starts the DevWorkspace and opens a new connection. While the annotation is present, + * [WorkspacePodTracker] skips pod-roll reconnect so the two handlers do not compete. + * + * Wiring: [setupThinClientReconnect] registers the tracker callback; [watchRestartAnnotation] starts + * the annotation watch. [isWorkspaceRestartInProgress] is the guard passed into the tracker. + */ class DevSpacesConnection(private val devSpacesContext: DevSpacesContext) { + + companion object { + val THIN_CLIENT_TIMEOUT: kotlin.time.Duration = 60.seconds + val THIN_CLIENT_POLL_DELAY: kotlin.time.Duration = 200.milliseconds + } + + private val thinClientReconnectScope = CoroutineScope(SupervisorJob() + Dispatchers.IO) + @Throws(Exception::class) @Suppress("UnstableApiUsage") suspend fun connect( @@ -53,138 +81,334 @@ class DevSpacesConnection(private val devSpacesContext: DevSpacesContext) { val workspace = devSpacesContext.devWorkspace devSpacesContext.addWorkspace(workspace) - var remoteIdeServer: RemoteIDEServer? = null - var forwarder: Closeable? = null - var client: ThinClientHandle? = null + var thinClient: ThinClientHandle? = null + var sessionCtx: ThinClientSessionContext? = null return try { - var remoteIdeServerStatus: RemoteIDEServerStatus = RemoteIDEServerStatus.empty() - while (!remoteIdeServerStatus.isReady) { - checkCancelled?.invoke() - onProgress?.invoke(ProgressCountdown.ProgressEvent( - message = "Waiting for the workspace to get ready...", - countdownSeconds = DevWorkspaces.RUNNING_TIMEOUT)) - - DevWorkspaces(devSpacesContext.client) - .startAndWait( - devSpacesContext.devWorkspace.namespace, - devSpacesContext.devWorkspace.name, - checkCancelled = checkCancelled) - - checkCancelled?.invoke() - onProgress?.invoke(ProgressCountdown.ProgressEvent( - message = "Waiting for the workspace to get ready...", - countdownSeconds = RemoteIDEServer.readyTimeout)) - - remoteIdeServer = RemoteIDEServer(devSpacesContext) - remoteIdeServerStatus = runCatching { - remoteIdeServer.apply { waitServerReady(checkCancelled) }.getStatus() - }.getOrElse { e -> - if (e.isCancellationException()) throw e - RemoteIDEServerStatus.empty() - } - - checkCancelled?.invoke() - if (!remoteIdeServerStatus.isReady) { - val restartWorkspace = Dialogs.ideNotResponding() - - if (restartWorkspace) { - // User chose "Restart Pod": stop the Pod and try starting from scratch - DevWorkspaces(devSpacesContext.client).stopAndWait( - devSpacesContext.devWorkspace.namespace, - devSpacesContext.devWorkspace.name, - checkCancelled = checkCancelled - ) - continue - } else { - // User chose "Cancel Connection" - throw CancellationException("User cancelled the operation") - } - } - } + val (remoteIdeServer, remoteIdeServerStatus) = waitForWorkspaceReady( + onProgress = onProgress, + checkCancelled = checkCancelled, + ) - checkCancelled?.invoke() val joinLink = remoteIdeServerStatus.joinLink ?: throw IOException("Could not connect, workspace IDE is not ready. No join link present.") - check(remoteIdeServer != null) checkCancelled?.invoke() onProgress?.invoke(ProgressCountdown.ProgressEvent( - message = "Waiting for the workspace IDE client to start...")) + message = "Waiting for the workspace IDE client to start...") + ) - val pods = DevWorkspacePods(devSpacesContext.client) val localPort = findFreePort() - forwarder = pods.forward(remoteIdeServer.pod, localPort, 5990) - pods.waitForForwardReady(localPort) - val effectiveJoinLink = joinLink.replace(":5990", ":$localPort") + sessionCtx = ThinClientSessionContext( + localPort = localPort, + remoteIdeServer = remoteIdeServer, + forwarder = null, + onConnected = onConnected, + onDisconnected = onDisconnected, + onDevWorkspaceStopped = onDevWorkspaceStopped, + checkCancelled = checkCancelled, + ) - val lifetimeDef = Lifetime.Eternal.createNested() - lifetimeDef.lifetime.onTermination { onClientClosed( client, onDisconnected, onDevWorkspaceStopped, remoteIdeServer, forwarder) } + val forwardRecoveryProgress = ForwardRecoveryProgress( + scope = thinClientReconnectScope, + sessionCtx = sessionCtx, + isWorkspaceRestartInProgress = ::isWorkspaceRestartInProgress, + onCanceled = { + sessionCtx.intentionalDisconnect.set(true) + onClientClosed(thinClient, sessionCtx) + }, + ) - val finished = AtomicBoolean(false) + val tracker = WorkspacePodTracker( + remoteIdeServer, + ::isWorkspaceRestartInProgress, + ) + tracker.seed(remoteIdeServer.getPod()) - checkCancelled?.invoke() - client = LinkedClientManager - .getInstance() - .startNewClient( - Lifetime.Eternal, - URI(effectiveJoinLink), - "", - onConnected, // Triggers enableButtons() via view - false - ) - - client.onClientPresenceChanged.advise(client.lifetime) { finished.set(true) } - client.clientClosed.advise(client.lifetime) { - onClientClosed(client, onDisconnected, onDevWorkspaceStopped, remoteIdeServer, forwarder) - finished.set(true) - } - client.clientFailedToOpenProject.advise(client.lifetime) { - onClientClosed(client, onDisconnected, onDevWorkspaceStopped, remoteIdeServer, forwarder) - finished.set(true) - } + setupThinClientReconnect( + remoteIdeServer = remoteIdeServer, + sessionCtx = sessionCtx, + forwardRecoveryProgress = forwardRecoveryProgress, + getCurrentClient = { thinClient }, + startThinClient = ::startThinClient, + onClientHandleReplaced = { newClient -> + thinClient = newClient + if (registerRestartWatcher == true) { + watchRestartAnnotation(newClient, workspace) + } + }, + onClientClosed = this@DevSpacesConnection::onClientClosed, + tracker = tracker, + ) - val success = withTimeoutOrNull(60.seconds) { - while (!finished.get()) { - checkCancelled?.invoke() - delay(200.milliseconds) - } - true - } ?: false + val portForwardResolver = PortForwardPodResolver( + tracker = tracker, + sessionCtx = sessionCtx, + forwardRecovery = forwardRecoveryProgress, + ) - // Check if the thin client has opened - check(success && client.clientPresent) { - "Could not connect, workspace IDE is not ready." - } + sessionCtx.forwarder = startForwarding( + podResolver = portForwardResolver::resolve, + pods = DevWorkspacePods(devSpacesContext.client), + localPort = localPort + ) + + thinClient = startThinClient(joinLink, sessionCtx) - // Watch for restart annotation on the DevWorkspace if (registerRestartWatcher == true) { - watchRestartAnnotation( - workspace.namespace, - workspace.name, - devSpacesContext.client, - client - ) + watchRestartAnnotation(thinClient, workspace) } onConnected() - client + thinClient } catch (e: Exception) { - runCatching { client?.close() } - onClientClosed(client, onDisconnected, onDevWorkspaceStopped, remoteIdeServer, forwarder) + cleanupOnFailure( + client = thinClient, + sessionCtx = sessionCtx, + forwarder = sessionCtx?.forwarder, + workspace = workspace, + onDisconnected = onDisconnected, + ) throw e } } + @Throws(Exception::class) + private suspend fun waitForWorkspaceReady( + onProgress: ((value: ProgressCountdown.ProgressEvent) -> Unit)?, + checkCancelled: (() -> Unit)?, + ): Pair { + var remoteIdeServer: RemoteIDEServer? = null + var remoteIdeServerStatus: RemoteIDEServerStatus = RemoteIDEServerStatus.empty() + + while (!remoteIdeServerStatus.isReady) { + checkCancelled?.invoke() + onProgress?.invoke(ProgressCountdown.ProgressEvent( + message = "Waiting for the workspace to get ready...", + countdownSeconds = DevWorkspaces.RUNNING_TIMEOUT)) + + DevWorkspaces(devSpacesContext.client) + .startAndWait( + devSpacesContext.devWorkspace.namespace, + devSpacesContext.devWorkspace.name, + checkCancelled = checkCancelled) + + checkCancelled?.invoke() + onProgress?.invoke(ProgressCountdown.ProgressEvent( + message = "Waiting for the workspace to get ready...", + countdownSeconds = RemoteIDEServer.READY_TIMEOUT)) + + remoteIdeServer = RemoteIDEServer(devSpacesContext) + remoteIdeServerStatus = fetchServerStatus(remoteIdeServer, checkCancelled) + + checkCancelled?.invoke() + if (!remoteIdeServerStatus.isReady) { + if (Dialogs.ideNotResponding()) { + // User chose "Restart Pod": stop the Pod and try starting from scratch + DevWorkspaces(devSpacesContext.client).stopAndWait( + devSpacesContext.devWorkspace.namespace, + devSpacesContext.devWorkspace.name, + checkCancelled = checkCancelled + ) + continue + } else { + // User chose "Cancel Connection" + throw CancellationException("User cancelled the operation") + } + } + } + + return remoteIdeServer!! to remoteIdeServerStatus + } + + private suspend fun fetchServerStatus( + remoteIdeServer: RemoteIDEServer, + checkCancelled: (() -> Unit)? + ): RemoteIDEServerStatus = runCatching { + remoteIdeServer.waitServerReady(checkCancelled) + remoteIdeServer.fetchStatus(checkCancelled) + }.getOrElse { e -> + if (e.isCancellationException()) throw e + RemoteIDEServerStatus.empty() + } + + private fun cleanupOnFailure( + client: ThinClientHandle?, + sessionCtx: ThinClientSessionContext?, + forwarder: Closeable?, + workspace: DevWorkspace, + onDisconnected: () -> Unit, + ) { + runCatching { client?.close() } + if (sessionCtx != null) { + onClientClosed(client, sessionCtx) + } else { + runCatching { forwarder?.close() } + devSpacesContext.removeWorkspace(workspace) + runCatching { onDisconnected() } + } + } + + private suspend fun startForwarding( + podResolver: suspend () -> PodForwardResolution, + pods: DevWorkspacePods, + localPort: Int, + ): Closeable { + val forwarder = pods.forward( + podResolver, + localPort, + 5990, + RemoteIDEServer.READY_TIMEOUT + ) + pods.waitForForwardReady(localPort) + return forwarder + } + + private fun createSessionContext( + localPort: Int, + remoteIdeServer: RemoteIDEServer, + onConnected: () -> Unit, + onDisconnected: () -> Unit, + onDevWorkspaceStopped: () -> Unit, + checkCancelled: (() -> Unit)?, + ): Pair { + val ctx = ThinClientSessionContext( + localPort = localPort, + remoteIdeServer = remoteIdeServer, + forwarder = null, + onConnected = onConnected, + onDisconnected = onDisconnected, + onDevWorkspaceStopped = onDevWorkspaceStopped, + checkCancelled = checkCancelled, + ) + + val tracker = WorkspacePodTracker( + remoteIdeServer, + ::isWorkspaceRestartInProgress, + ) + + return ctx to tracker + } + + /** + * Wires [WorkspacePodTracker.onPodRoll] to [ThinClientReconnect.execute]. + * + * Pod-roll routing (annotation absent) is decided in the tracker; this method only connects + * the callback after [ThinClientReconnect] is constructed for the session. + */ + private fun setupThinClientReconnect( + remoteIdeServer: RemoteIDEServer, + sessionCtx: ThinClientSessionContext, + forwardRecoveryProgress: ForwardRecoveryProgress, + getCurrentClient: () -> ThinClientHandle?, + startThinClient: suspend (String, ThinClientSessionContext) -> ThinClientHandle, + onClientHandleReplaced: (ThinClientHandle) -> Unit, + onClientClosed: (ThinClientHandle?, ThinClientSessionContext) -> Unit, + tracker: WorkspacePodTracker + ) { + val thinClientReconnect = ThinClientReconnect( + remoteIdeServer = remoteIdeServer, + sessionCtx = sessionCtx, + getCurrentClient = getCurrentClient, + startThinClient = startThinClient, + onClientHandleReplaced = onClientHandleReplaced, + onClientClosed = onClientClosed, + ) + tracker.onPodRoll = { pod -> + forwardRecoveryProgress.dismiss() + thinClientReconnect.execute(pod) + } + } + + /** + * Returns true when the DevWorkspace carries [DevWorkspacePatch.RESTART_KEY]. + * + * Used by [WorkspacePodTracker] to suppress pod-roll reconnect while + * [DevWorkspaceRestart] owns the session recovery. + */ + private fun isWorkspaceRestartInProgress(): Boolean = + runCatching { + val workspace = devSpacesContext.devWorkspace + DevWorkspaces(devSpacesContext.client).isRestarting( + workspace.namespace, + workspace.name, + ) + }.getOrDefault(false) + + @Suppress("UnstableApiUsage") + private suspend fun startThinClient(joinLink: String, ctx: ThinClientSessionContext): ThinClientHandle { + val effectiveJoinLink = joinLink.replace(":5990", ":${ctx.localPort}") + val client = LinkedClientManager + .getInstance() + .startNewClient( + Lifetime.Eternal, + URI(effectiveJoinLink), + "", + ctx.onConnected, + false + ) + + val finished = AtomicBoolean(false) + + ctx.checkCancelled?.invoke() + attachClientListeners(client, finished, ctx) { + onClientClosed(client, ctx) + } + + client.onClientPresenceChanged.advise(client.lifetime) { finished.set(true) } + + thisLogger().warn( + "Thin client: waiting for connection on local port ${ctx.localPort} (join link host redirected from :5990)" + ) + val success = withTimeoutOrNull(THIN_CLIENT_TIMEOUT) { + while (!finished.get()) { + ctx.checkCancelled?.invoke() + delay(THIN_CLIENT_POLL_DELAY) + } + true + } ?: false + + check(success && client.clientPresent) { + "Could not connect, workspace IDE is not ready (local port ${ctx.localPort}, clientPresent=${client.clientPresent})." + } + + return client + } + @Suppress("UnstableApiUsage") - private fun watchRestartAnnotation(namespace: String, workspaceName: String, kubeClient: ApiClient, thinClient: ThinClientHandle) { + private fun attachClientListeners( + client: ThinClientHandle, + finished: AtomicBoolean, + session: ThinClientSessionContext, + onClosed: () -> Unit, + ) { + client.clientClosed.advise(client.lifetime) { + if (!session.reconnecting.get()) { + onClosed() + } + finished.set(true) + } + client.clientFailedToOpenProject.advise(client.lifetime) { + if (!session.reconnecting.get()) { + onClosed() + } + finished.set(true) + } + } + + /** + * Starts the **user-initiated restart** watch ([RestartDevWorkspaceAnnotationWatch] → [DevWorkspaceRestart]). + * Complements pod-roll detection in [WorkspacePodTracker] / [ThinClientReconnect]. + */ + @Suppress("UnstableApiUsage") + private fun watchRestartAnnotation(thinClient: ThinClientHandle, workspace: DevWorkspace) { val restartWatchScope = CoroutineScope(SupervisorJob() + Dispatchers.IO) RestartDevWorkspaceAnnotationWatch( onRestartAnnotated(thinClient), - kubeClient, - namespace, - workspaceName + devSpacesContext.client, + workspace.namespace, + workspace.name ).start(restartWatchScope) thinClient.lifetime.onTermination { @@ -193,9 +417,7 @@ class DevSpacesConnection(private val devSpacesContext: DevSpacesContext) { } @Suppress("UnstableApiUsage") - private fun onRestartAnnotated( - thinClient: ThinClientHandle - ): () -> Job { + private fun onRestartAnnotated(thinClient: ThinClientHandle): () -> Job { return { CoroutineScope(Dispatchers.IO).launch { DevWorkspaceRestart(devSpacesContext).execute(thinClient) @@ -204,14 +426,13 @@ class DevSpacesConnection(private val devSpacesContext: DevSpacesContext) { } @Suppress("UnstableApiUsage") - private fun onClientClosed( - client: ThinClientHandle? = null, - onDisconnected: () -> Unit, - onDevWorkspaceStopped: () -> Unit, - remoteIdeServer: RemoteIDEServer?, - forwarder: Closeable? - ) { + internal fun onClientClosed(client: ThinClientHandle?, session: ThinClientSessionContext) { CoroutineScope(Dispatchers.IO).launch { + if (session.reconnecting.get()) { + thisLogger().debug("Skipping disconnect cleanup during pod-roll reconnect") + return@launch + } + thinClientReconnectScope.cancel() runCatching { client?.close() } val workspace = devSpacesContext.devWorkspace val workspacePatch = DevWorkspacePatch( @@ -224,50 +445,26 @@ class DevSpacesConnection(private val devSpacesContext: DevSpacesContext) { ) try { if (workspacePatch.hasRestartAnnotation()) { - /** - * user triggered restart - * logic to restart workspace is in [onRestartAnnotated] - * The annotation will be cleaned up by DevWorkspaceRestart - */ closeAllProjects() - } else if (true == remoteIdeServer?.waitServerTerminated()) { - /** - * user closed IDE and clicked "Close and Stop" - */ + } else if ( + session.intentionalDisconnect.get() + || session.remoteIdeServer.waitServerTerminated() + ) { DevWorkspaces(devSpacesContext.client) .stop(workspace.namespace, workspace.name) - .also { onDevWorkspaceStopped() } + .also { session.onDevWorkspaceStopped() } } } finally { - runCatching { - forwarder?.close() - }.onFailure { e -> - thisLogger().debug("Failed to close port forwarder", e) - } + closeForwarder(session.forwarder) devSpacesContext.removeWorkspace(workspace) - runCatching { onDisconnected() } + runCatching { session.onDisconnected() } } } } - private fun closeAllProjects() { - ApplicationManager.getApplication().invokeLater( - { - val pm = ProjectManagerEx.getInstanceEx() - for (project in pm.openProjects.toList()) { - if (!project.isDisposed) { - pm.closeAndDispose(project) - } - } - }, - ModalityState.nonModal() - ) + private fun closeForwarder(forwarder: Closeable?) { + runCatching { forwarder?.close() } + .onFailure { e -> thisLogger().debug("Failed to close port forwarder", e) } } - private fun findFreePort(): Int { - ServerSocket(0).use { socket -> - socket.reuseAddress = true - return socket.localPort - } - } } diff --git a/src/main/kotlin/com/redhat/devtools/gateway/ForwardRecoveryProgress.kt b/src/main/kotlin/com/redhat/devtools/gateway/ForwardRecoveryProgress.kt new file mode 100644 index 00000000..15551c3e --- /dev/null +++ b/src/main/kotlin/com/redhat/devtools/gateway/ForwardRecoveryProgress.kt @@ -0,0 +1,118 @@ +/* + * Copyright (c) 2026 Red Hat, Inc. + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + * + * Contributors: + * Red Hat, Inc. - initial API and implementation + */ +package com.redhat.devtools.gateway + +import com.intellij.openapi.progress.ProcessCanceledException +import com.intellij.openapi.progress.ProgressIndicator +import com.intellij.openapi.progress.ProgressManager +import com.intellij.openapi.progress.Task +import com.redhat.devtools.gateway.util.WorkspaceSessionProgress +import com.redhat.devtools.gateway.util.checkProgressCanceled +import com.redhat.devtools.gateway.util.clearProgressText2Safely +import com.redhat.devtools.gateway.util.delayRespectingProgress +import com.redhat.devtools.gateway.util.updateProgress +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicLong +import kotlin.time.Duration +import kotlin.time.Duration.Companion.milliseconds + +/** + * Shows delayed progress when port-forward cannot resolve a workspace pod for an extended period. + * + * Progress appears only after [showAfter] of sustained unavailability so brief glitches stay silent. + * Suppressed while pod-roll reconnect or annotated restart handlers own recovery. + */ +internal class ForwardRecoveryProgress( + private val scope: CoroutineScope, + private val sessionCtx: ThinClientSessionContext, + private val isWorkspaceRestartInProgress: () -> Boolean, + private val onCanceled: () -> Unit, + private val showAfter: Duration = DEFAULT_SHOW_AFTER, +) { + private val waitingSinceMillis = AtomicLong(0) + private val progressActive = AtomicBoolean(false) + private var showProgressJob: Job? = null + + companion object { + private val DEFAULT_SHOW_AFTER: Duration = 20_000.milliseconds + private val PROGRESS_POLL_DELAY: Duration = 500.milliseconds + const val PROGRESS_TITLE: String = "Reconnecting to workspace" + } + + /** Called when no workspace pod is available yet. */ + fun onPodUnavailable() { + if (shouldSuppress()) { + reset() + return + } + if (waitingSinceMillis.compareAndSet(0, System.currentTimeMillis())) { + scheduleShowProgress() + } + } + + /** Called when a pod is resolved for port-forwarding. */ + fun onPodResolved() { + reset() + } + + /** Called when a dedicated recovery handler (pod-roll reconnect) takes over. */ + fun dismiss() { + reset() + } + + private fun shouldSuppress(): Boolean = + isWorkspaceRestartInProgress() || sessionCtx.reconnecting.get() + + private fun scheduleShowProgress() { + showProgressJob?.cancel() + showProgressJob = scope.launch { + delay(showAfter) + if (waitingSinceMillis.get() != 0L && !shouldSuppress()) { + maybeShowProgress() + } + } + } + + private fun maybeShowProgress() { + if (!progressActive.compareAndSet(false, true)) return + ProgressManager.getInstance().run(object : Task.Backgroundable(null, PROGRESS_TITLE, true) { + override fun run(indicator: ProgressIndicator) { + indicator.isIndeterminate = true + runBlocking { + try { + indicator.updateProgress(WorkspaceSessionProgress.WAITING_FOR_POD, 0.0) + while (waitingSinceMillis.get() != 0L && !shouldSuppress()) { + delayRespectingProgress(indicator, PROGRESS_POLL_DELAY) + } + } catch (e: ProcessCanceledException) { + onCanceled() + throw e + } finally { + indicator.clearProgressText2Safely() + progressActive.set(false) + } + } + } + }) + } + + private fun reset() { + showProgressJob?.cancel() + showProgressJob = null + waitingSinceMillis.set(0) + } +} diff --git a/src/main/kotlin/com/redhat/devtools/gateway/PodResolution.kt b/src/main/kotlin/com/redhat/devtools/gateway/PodResolution.kt new file mode 100644 index 00000000..a366cf76 --- /dev/null +++ b/src/main/kotlin/com/redhat/devtools/gateway/PodResolution.kt @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2026 Red Hat, Inc. + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + * + * Contributors: + * Red Hat, Inc. - initial API and implementation + */ +package com.redhat.devtools.gateway + +import io.kubernetes.client.openapi.models.V1Pod + +/** + * Outcome of resolving the workspace pod for session recovery routing. + * + * Domain vocabulary only — no port-forward or progress semantics. + */ +sealed class PodResolution { + data class Ready(val pod: V1Pod) : PodResolution() + data object Unavailable : PodResolution() + data class RollDelegated(val pod: V1Pod) : PodResolution() + data object RestartSuppressed : PodResolution() +} diff --git a/src/main/kotlin/com/redhat/devtools/gateway/PortForwardPodResolver.kt b/src/main/kotlin/com/redhat/devtools/gateway/PortForwardPodResolver.kt new file mode 100644 index 00000000..1d1dc22c --- /dev/null +++ b/src/main/kotlin/com/redhat/devtools/gateway/PortForwardPodResolver.kt @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2026 Red Hat, Inc. + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + * + * Contributors: + * Red Hat, Inc. - initial API and implementation + */ +package com.redhat.devtools.gateway + +import com.intellij.openapi.diagnostic.thisLogger +import com.redhat.devtools.gateway.openshift.DevWorkspacePods +import com.redhat.devtools.gateway.openshift.PodForwardResolution +import com.redhat.devtools.gateway.util.podLogIdentity +import io.kubernetes.client.openapi.models.V1Pod +import kotlinx.coroutines.delay +import kotlin.time.Duration.Companion.seconds + +/** + * Adapts [WorkspacePodTracker] outcomes to port-forward transport and recovery progress. + * + * While no pod is available, [resolve] polls until a running pod appears before returning. + * On pod roll, the rolled pod is returned immediately so port-forward can re-establish in + * parallel with [ThinClientReconnect] (IDE wait via exec, then thin client via local port). + */ +internal class PortForwardPodResolver( + private val tracker: WorkspacePodTracker, + private val sessionCtx: ThinClientSessionContext, + private val forwardRecovery: ForwardRecoveryProgress, +) { + companion object { + private val POD_POLL_DELAY = DevWorkspacePods.DEFAULT_RECONNECT_DELAY_SECONDS.seconds + } + + suspend fun resolve(): PodForwardResolution { + return when (val result = tracker.resolvePod()) { + is PodResolution.Ready -> { + forwardRecovery.onPodResolved() + PodForwardResolution(result.pod) + } + is PodResolution.Unavailable -> { + PodForwardResolution(waitForRunningPod()) + } + is PodResolution.RollDelegated -> { + forwardRecovery.dismiss() + thisLogger().info( + "Port forward: pod roll to ${podLogIdentity(result.pod)}, " + + "re-establishing forward on local port ${sessionCtx.localPort}" + ) + PodForwardResolution(result.pod) + } + is PodResolution.RestartSuppressed -> { + forwardRecovery.dismiss() + PodForwardResolution(waitForRunningPod()) + } + } + } + + /** Polls until a running pod is available. */ + private suspend fun waitForRunningPod(): V1Pod { + forwardRecovery.onPodUnavailable() + thisLogger().info( + "Port forward: waiting for workspace pod on local port ${sessionCtx.localPort}" + ) + while (true) { + delay(POD_POLL_DELAY) + when (val result = tracker.resolvePod()) { + is PodResolution.Ready -> { + forwardRecovery.onPodResolved() + return result.pod + } + is PodResolution.RollDelegated -> { + forwardRecovery.dismiss() + thisLogger().info( + "Port forward: pod roll to ${podLogIdentity(result.pod)} during wait, " + + "re-establishing forward on local port ${sessionCtx.localPort}" + ) + return result.pod + } + is PodResolution.RestartSuppressed -> forwardRecovery.dismiss() + is PodResolution.Unavailable -> Unit + } + } + } +} diff --git a/src/main/kotlin/com/redhat/devtools/gateway/ThinClientReconnect.kt b/src/main/kotlin/com/redhat/devtools/gateway/ThinClientReconnect.kt new file mode 100644 index 00000000..3739ae69 --- /dev/null +++ b/src/main/kotlin/com/redhat/devtools/gateway/ThinClientReconnect.kt @@ -0,0 +1,241 @@ +/* + * Copyright (c) 2024-2025 Red Hat, Inc. + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + * + * Contributors: + * Red Hat, Inc. - initial API and implementation + */ +package com.redhat.devtools.gateway + +import com.intellij.openapi.application.ApplicationManager +import com.intellij.openapi.diagnostic.thisLogger +import com.intellij.openapi.progress.ProgressIndicator +import com.intellij.openapi.progress.ProgressManager +import com.intellij.openapi.progress.Task +import com.jetbrains.gateway.thinClientLink.ThinClientHandle +import com.redhat.devtools.gateway.server.RemoteIDEServer +import com.redhat.devtools.gateway.util.WorkspaceSessionProgress +import com.redhat.devtools.gateway.util.checkProgressCanceled +import com.redhat.devtools.gateway.util.clearProgressText2Safely +import com.redhat.devtools.gateway.util.delayRespectingProgress +import com.redhat.devtools.gateway.util.isCancellationException +import com.redhat.devtools.gateway.util.isTimeoutException +import com.redhat.devtools.gateway.util.podLogIdentity +import com.redhat.devtools.gateway.util.setProgressText2 +import com.redhat.devtools.gateway.util.updateProgress +import com.redhat.devtools.gateway.view.ui.Dialogs +import io.kubernetes.client.openapi.ApiException +import io.kubernetes.client.openapi.models.V1Pod +import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking +import java.io.IOException +import kotlin.time.Duration +import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.Duration.Companion.seconds + +/** + * Reconnects the thin client after an **unplanned** pod roll (CRW-11119). + * + * ## Session recovery routing + * + * Invoked only from [WorkspacePodTracker.onPodRoll] when the pod UID changes and + * [DevWorkspacePatch.RESTART_KEY] is **not** set. User-initiated restarts are routed to + * [com.redhat.devtools.gateway.devworkspace.DevWorkspaceRestart] instead; the tracker suppresses + * this handler while that annotation is present. + * + * Keeps the existing session: same local port, port forwarder, and [ThinClientSessionContext]. + * Progress title: "Reconnecting to workspace" ([PROGRESS_TITLE]). + * + * Closes the old thin client, waits for the new pod's IDE via [RemoteIDEServer.awaitJoinLink], + * and starts a new thin client on the same forwarder. + * + * Transient failures are retried up to [MAX_RETRIES] times. Permanent failures, exhausted retries, + * or user cancellation tear down the session via [onClientClosed]. + */ +internal class ThinClientReconnect( + private val remoteIdeServer: RemoteIDEServer, + private val sessionCtx: ThinClientSessionContext, + private val getCurrentClient: () -> ThinClientHandle?, + private val startThinClient: suspend (String, ThinClientSessionContext) -> ThinClientHandle, + private val onClientHandleReplaced: (ThinClientHandle) -> Unit, + private val onClientClosed: (ThinClientHandle?, ThinClientSessionContext) -> Unit, +) { + + companion object { + private const val MAX_RETRIES: Int = 3 + private const val PROGRESS_TITLE: String = "Reconnecting to workspace" + private val RETRY_DELAY: Duration = 10.seconds + private val RECONNECT_CLEANUP_DRAIN_DELAY: Duration = 500.milliseconds + } + + fun execute(pod: V1Pod) { + ProgressManager.getInstance().run(object : Task.Backgroundable( + null, PROGRESS_TITLE, true + ) { + override fun run(indicator: ProgressIndicator) { + indicator.isIndeterminate = false + runBlocking { + onPodRoll(pod, indicator) + } + } + }) + } + + suspend fun onPodRoll(pod: V1Pod, indicator: ProgressIndicator? = null) { + if (!sessionCtx.reconnecting.compareAndSet(false, true)) { + thisLogger().warn( + "Pod roll: reconnect already in progress, skipping duplicate for ${podLogIdentity(pod)}" + ) + return + } + + thisLogger().info( + "Pod roll: reconnecting IDE session for ${podLogIdentity(pod)} via local port ${sessionCtx.localPort}" + ) + + try { + indicator?.updateProgress(WorkspaceSessionProgress.CLOSING_IDE, 0.0) + closeOldThinClient() + indicator?.let { checkProgressCanceled(it) } + reconnectWithRetries(pod, indicator) + } catch (e: Exception) { + if (e.isCancellationException()) { + abortReconnect() + throw e + } + sessionCtx.reconnecting.set(false) + } finally { + indicator.clearProgressText2Safely() + } + } + + /** + * Attempts to reconnect up to [MAX_RETRIES] times. + * Returns on success (reconnecting cleared inside), calls [onClientClosed] on permanent + * failure or exhausted retries (reconnecting cleared before the call so cleanup runs). + */ + private suspend fun reconnectWithRetries(pod: V1Pod, indicator: ProgressIndicator?) { + var lastError: Exception? = null + + for (attempt in 0 until MAX_RETRIES) { + try { + thisLogger().info( + "Pod roll: reconnect attempt ${attempt + 1}/$MAX_RETRIES — waiting for IDE on ${podLogIdentity(pod)}" + ) + indicator?.updateProgress(WorkspaceSessionProgress.WAITING_FOR_IDE, 0.5) + indicator?.setProgressText2("Attempt ${attempt + 1}/$MAX_RETRIES") + val joinLink = fetchJoinLink(pod, indicator) + thisLogger().info( + "Pod roll: IDE ready on attempt ${attempt + 1}, starting thin client on local port ${sessionCtx.localPort}" + ) + indicator?.updateProgress(WorkspaceSessionProgress.CONNECTING_TO_IDE, 1.0) + indicator?.setProgressText2(null) + val newClient = startThinClient(joinLink, sessionCtx) + onClientHandleReplaced(newClient) + + delay(RECONNECT_CLEANUP_DRAIN_DELAY) + thisLogger().info("Pod roll: IDE session reconnected successfully after ${attempt + 1} attempt(s)") + sessionCtx.reconnecting.set(false) + return + } catch (e: Exception) { + when (val decision = classifyError(e)) { + null -> throw e + else -> { + lastError = e + logReconnectAttemptFailure(attempt, e, decision.permanent) + if (decision.permanent) break + if (decision.willRetry && attempt < MAX_RETRIES - 1) { + delayRespectingProgress(indicator, RETRY_DELAY) + } + } + } + } + } + + failReconnect(lastError ?: IOException("Reconnect failed after $MAX_RETRIES attempts")) + } + + private fun abortReconnect() { + sessionCtx.reconnecting.set(false) + onClientClosed(getCurrentClient(), sessionCtx) + } + + private fun failReconnect(error: Exception) { + sessionCtx.reconnecting.set(false) + logReconnectFailure(error) + onClientClosed(getCurrentClient(), sessionCtx) + } + + private suspend fun fetchJoinLink(pod: V1Pod, indicator: ProgressIndicator?): String { + indicator?.let { checkProgressCanceled(it) } + val checkCancelled = indicator?.let { ind -> { checkProgressCanceled(ind) } } + try { + remoteIdeServer.refreshPod() + } catch (e: Exception) { + if (e.isCancellationException()) throw e + thisLogger().info( + "Pod roll: refresh failed, using rolled pod ${podLogIdentity(pod)}: ${e.message}" + ) + remoteIdeServer.setPod(pod) + } + return remoteIdeServer.awaitJoinLink(checkCancelled = checkCancelled) + } + + private data class RetryDecision( + val permanent: Boolean, + val willRetry: Boolean + ) + + private fun classifyError(e: Exception): RetryDecision? = when { + e.isCancellationException() -> null + e.isTimeoutException() -> RetryDecision(permanent = false, willRetry = true) + else -> { + val permanent = when { + e is IOException && e.message?.contains("no join link", ignoreCase = true) == true -> true + e is IllegalStateException && e.message?.contains("not ready", ignoreCase = true) == true -> true + e is ApiException && e.code in setOf(401, 403, 404) -> true + else -> false + } + RetryDecision(permanent = permanent, willRetry = !permanent) + } + } + + private suspend fun closeOldThinClient() { + try { + getCurrentClient()?.close() + } catch (e: Exception) { + if (e.isCancellationException()) throw e + thisLogger().debug("Failed to close old thin client during reconnect", e) + } + } + + private fun logReconnectAttemptFailure(attempt: Int, e: Exception, permanent: Boolean) { + val kind = when { + e.isTimeoutException() -> "timeout" + permanent -> "permanent" + else -> "transient" + } + val retryHint = if (!permanent && attempt < MAX_RETRIES - 1) ", will retry" else "" + thisLogger().warn( + "Pod roll: reconnect attempt ${attempt + 1}/$MAX_RETRIES failed ($kind): " + + "${e.javaClass.simpleName}: ${e.message}$retryHint" + ) + } + + private fun logReconnectFailure(e: Exception) { + val message = e.message ?: "Unknown error during reconnect" + if (e.isTimeoutException()) { + thisLogger().warn("Reconnect failed after $MAX_RETRIES attempts: $message") + } else { + thisLogger().warn("Reconnect failed after $MAX_RETRIES attempts: $message", e) + } + val application = ApplicationManager.getApplication() ?: return + application.invokeLater { + Dialogs.error(message, "Connection Error") + } + } +} diff --git a/src/main/kotlin/com/redhat/devtools/gateway/ThinClientSessionContext.kt b/src/main/kotlin/com/redhat/devtools/gateway/ThinClientSessionContext.kt new file mode 100644 index 00000000..2ced9cb4 --- /dev/null +++ b/src/main/kotlin/com/redhat/devtools/gateway/ThinClientSessionContext.kt @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2024-2025 Red Hat, Inc. + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + * + * Contributors: + * Red Hat, Inc. - initial API and implementation + */ +package com.redhat.devtools.gateway + +import com.redhat.devtools.gateway.server.RemoteIDEServer +import java.io.Closeable +import java.util.concurrent.atomic.AtomicBoolean + +internal data class ThinClientSessionContext( + val localPort: Int, + val remoteIdeServer: RemoteIDEServer, + var forwarder: Closeable?, + val onConnected: () -> Unit, + val onDisconnected: () -> Unit, + val onDevWorkspaceStopped: () -> Unit, + val checkCancelled: (() -> Unit)?, + val reconnecting: AtomicBoolean = AtomicBoolean(false), + /** Set when the user explicitly disconnects from Gateway; see [DevSpacesConnection.onClientClosed]. */ + val intentionalDisconnect: AtomicBoolean = AtomicBoolean(false), +) diff --git a/src/main/kotlin/com/redhat/devtools/gateway/WorkspacePodTracker.kt b/src/main/kotlin/com/redhat/devtools/gateway/WorkspacePodTracker.kt new file mode 100644 index 00000000..210c5eea --- /dev/null +++ b/src/main/kotlin/com/redhat/devtools/gateway/WorkspacePodTracker.kt @@ -0,0 +1,100 @@ +/* + * Copyright (c) 2024-2025 Red Hat, Inc. + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + * + * Contributors: + * Red Hat, Inc. - initial API and implementation + */ +package com.redhat.devtools.gateway + +import com.intellij.openapi.diagnostic.thisLogger +import com.redhat.devtools.gateway.devworkspace.DevWorkspacePatch +import com.redhat.devtools.gateway.server.RemoteIDEServer +import com.redhat.devtools.gateway.util.podIdentity +import com.redhat.devtools.gateway.util.podLogIdentity +import io.kubernetes.client.openapi.ApiException +import io.kubernetes.client.openapi.models.V1Pod +import java.io.IOException + +/** + * Tracks the workspace pod identity (UID) and detects pod rolls. + * + * ## Session recovery routing + * + * When the pod UID changes: + * + * 1. If [isWorkspaceRestartInProgress] is true ([DevWorkspacePatch.RESTART_KEY] on the DevWorkspace), + * return [PodResolution.RestartSuppressed] and do **not** invoke [onPodRoll]. + * [com.redhat.devtools.gateway.devworkspace.DevWorkspaceRestart] handles recovery. + * 2. Otherwise invoke [onPodRoll] (typically [ThinClientReconnect]) and return [PodResolution.RollDelegated]. + * + * Port-forward timing and progress are handled by [PortForwardPodResolver], not here. + * + * @param isWorkspaceRestartInProgress whether a user-initiated restart annotation is set; when true, + * pod-roll callbacks are suppressed. + */ +class WorkspacePodTracker( + private val remoteIdeServer: RemoteIDEServer, + private val isWorkspaceRestartInProgress: () -> Boolean = { false }, +) { + private var connectedPod: String? = null + + /** + * Invoked when a pod roll is detected and [isWorkspaceRestartInProgress] is false. + * Wired by [DevSpacesConnection.setupThinClientReconnect] to [ThinClientReconnect.execute]. + */ + var onPodRoll: suspend (V1Pod) -> Unit = {} + + fun seed(pod: V1Pod) { connectedPod = podIdentity(pod) } + + /** + * Refreshes the workspace pod and applies pod-roll routing (see class KDoc). + */ + suspend fun resolvePod(): PodResolution { + val pod = runCatching { remoteIdeServer.refreshPod() } + .onFailure { e -> + logPodRefreshFailure(e) + thisLogger().warn("resolvePod: workspace pod not available: ${e.message}") + } + .getOrNull() + ?: return PodResolution.Unavailable + + val identity = podIdentity(pod) + val previousIdentity = connectedPod + connectedPod = identity + + if (previousIdentity != null + && identity != null + && identity != previousIdentity) { + thisLogger().info( + "resolvePod: pod roll $previousIdentity -> $identity (${podLogIdentity(pod)}), starting reconnect" + ) + if (isWorkspaceRestartInProgress()) { + thisLogger().info( + "resolvePod: ${DevWorkspacePatch.RESTART_KEY} is set; " + + "skipping pod-roll reconnect (DevWorkspaceRestart handles this)" + ) + return PodResolution.RestartSuppressed + } + onPodRoll(pod) + return PodResolution.RollDelegated(pod) + } + + return PodResolution.Ready(pod) + } + + private fun logPodRefreshFailure(e: Throwable) { + when (e) { + is ApiException -> + thisLogger().warn("Failed to refresh workspace pod: ${e.message}", e) + is IOException if e.message?.contains("not running", ignoreCase = true) == true -> + thisLogger().info("Workspace pod not ready yet, will retry: ${e.message}") + else -> + thisLogger().info("Failed to refresh workspace pod, will retry: ${e.message}", e) + } + } +} diff --git a/src/main/kotlin/com/redhat/devtools/gateway/auth/sandbox/SandboxClusterAuthProvider.kt b/src/main/kotlin/com/redhat/devtools/gateway/auth/sandbox/SandboxClusterAuthProvider.kt index 012e9af6..99062a49 100644 --- a/src/main/kotlin/com/redhat/devtools/gateway/auth/sandbox/SandboxClusterAuthProvider.kt +++ b/src/main/kotlin/com/redhat/devtools/gateway/auth/sandbox/SandboxClusterAuthProvider.kt @@ -101,16 +101,21 @@ class SandboxClusterAuthProvider( } private suspend fun requestSecret(secretName: String, namespace: String, api: CoreV1Api): V1Secret? { - repeat(30) { + repeat(SECRET_POLL_RETRY_COUNT) { val secret = api.readNamespacedSecret(secretName, namespace).execute() if (secret.data?.containsKey("token") == true) { return secret } - delay(1000.milliseconds) + delay(SECRET_POLL_RETRY_DELAY) } return null } + companion object { + const val SECRET_POLL_RETRY_COUNT: Int = 30 + val SECRET_POLL_RETRY_DELAY: kotlin.time.Duration = 1000.milliseconds + } + private fun extractToken(secret: V1Secret): String { val tokenBytes = secret.data?.get("token") ?: error("Token missing in secret") diff --git a/src/main/kotlin/com/redhat/devtools/gateway/devworkspace/DevWorkspacePatch.kt b/src/main/kotlin/com/redhat/devtools/gateway/devworkspace/DevWorkspacePatch.kt index 4424258a..6c45912c 100644 --- a/src/main/kotlin/com/redhat/devtools/gateway/devworkspace/DevWorkspacePatch.kt +++ b/src/main/kotlin/com/redhat/devtools/gateway/devworkspace/DevWorkspacePatch.kt @@ -21,8 +21,13 @@ import io.kubernetes.client.util.PatchUtils /** * Patches a specific DevWorkspace resource. * - * * annotation `che.eclipse.org/restart-in-progress`: signals that a restart of the workspace should happen - * * resource value`spec.started`: starts/stops a workspace + * ## Session recovery routing + * + * [RESTART_KEY] selects the **user-initiated restart** path ([DevWorkspaceRestart] via + * [RestartDevWorkspaceAnnotationWatch]). While set, [com.redhat.devtools.gateway.WorkspacePodTracker] + * suppresses pod-roll reconnect ([com.redhat.devtools.gateway.ThinClientReconnect]). + * + * Other annotations and `spec.started` control workspace start/stop. * * @param namespace The namespace of the DevWorkspace. * @param name The name of the DevWorkspace. @@ -44,6 +49,8 @@ class DevWorkspacePatch( companion object { private const val ANNOTATIONS_PATH = "/metadata/annotations" + + /** DevWorkspace annotation that routes recovery to [DevWorkspaceRestart] instead of pod-roll reconnect. */ const val RESTART_KEY = "che.eclipse.org/restart-in-progress" const val RESTART_VALUE = "true" } diff --git a/src/main/kotlin/com/redhat/devtools/gateway/devworkspace/DevWorkspaceRestart.kt b/src/main/kotlin/com/redhat/devtools/gateway/devworkspace/DevWorkspaceRestart.kt index dd7e8fba..7eebe13d 100644 --- a/src/main/kotlin/com/redhat/devtools/gateway/devworkspace/DevWorkspaceRestart.kt +++ b/src/main/kotlin/com/redhat/devtools/gateway/devworkspace/DevWorkspaceRestart.kt @@ -23,6 +23,9 @@ import com.redhat.devtools.gateway.DevSpacesContext import com.redhat.devtools.gateway.openshift.DevWorkspacePods import com.redhat.devtools.gateway.server.RemoteIDEServer import com.redhat.devtools.gateway.util.messageWithoutPrefix +import com.redhat.devtools.gateway.util.updateProgress +import com.redhat.devtools.gateway.util.checkProgressCanceled +import com.redhat.devtools.gateway.util.WorkspaceSessionProgress import com.redhat.devtools.gateway.view.ui.Dialogs import io.kubernetes.client.openapi.models.V1Pod import kotlinx.coroutines.Dispatchers @@ -35,15 +38,27 @@ import kotlin.time.Duration.Companion.seconds private const val TIMEOUT_SECONDS_DELETE_PODS = 120 /** - * Handles the restart of a DevWorkspace when triggered by the restart annotation. + * Restarts a DevWorkspace after a **user-initiated** restart from the Remote IDE. * - * The restart process: + * ## Session recovery routing + * + * Entry point: [RestartDevWorkspaceAnnotationWatch] when [DevWorkspacePatch.RESTART_KEY] appears + * on the DevWorkspace CR. This is the annotated-restart path; it does **not** use + * [com.redhat.devtools.gateway.WorkspacePodTracker] or [com.redhat.devtools.gateway.ThinClientReconnect]. + * + * While the annotation is set, [WorkspacePodTracker] skips pod-roll reconnect so both handlers + * never run concurrently (pod rolls during stop/start are expected but ignored by the tracker). + * + * Progress title: "Restart workspace". Steps use [com.redhat.devtools.gateway.util.WorkspaceSessionProgress]. + * + * Restart sequence: * 1. Close the thin client connection * 2. Stop the workspace (spec.started = false) * 3. Wait for all pods to be deleted * 4. Start the workspace (spec.started = true) - * 5. Reconnect the IDE; then [execute] removes the restart annotation in a `finally` block - * (once per attempt, success or failure). + * 5. Reconnect the IDE via a new [DevSpacesConnection.connect] + * + * [execute] removes the restart annotation in a `finally` block (success or failure). */ class DevWorkspaceRestart( private val devSpacesContext: DevSpacesContext, @@ -86,28 +101,29 @@ class DevWorkspaceRestart( thinClient: ThinClientHandle, indicator: ProgressIndicator? ) { - indicator?.update("Closing IDE connection...", 0.0) + indicator?.updateProgress(WorkspaceSessionProgress.CLOSING_IDE, 0.0) close(thinClient) - indicator?.update("Stopping workspace...", 0.2) + indicator?.updateProgress(WorkspaceSessionProgress.STOPPING_WORKSPACE, 0.2) stopWorkspaceAndWait() - indicator?.update("Waiting for pods to terminate...", 0.4) + indicator?.updateProgress(WorkspaceSessionProgress.WAITING_PODS_TERMINATE, 0.4) waitForPodsDeleted(indicator) - indicator?.update("Starting workspace...", 0.6) + indicator?.updateProgress(WorkspaceSessionProgress.STARTING_WORKSPACE, 0.6) startWorkspaceAndWait() - indicator?.update("Waiting for IDE to be ready...", 0.8) - waitForIDEReady() + indicator?.updateProgress(WorkspaceSessionProgress.WAITING_FOR_IDE, 0.8) + waitForIDEReady(indicator) - indicator?.update("Connecting to IDE...", 1.0) + indicator?.updateProgress(WorkspaceSessionProgress.CONNECTING_TO_IDE, 1.0) connectIDE() } - private suspend fun waitForIDEReady() { + private suspend fun waitForIDEReady(indicator: ProgressIndicator?) { thisLogger().debug("Waiting for IDE server to be ready...") - createRemoteIDEServer().waitServerReady() + val checkCancelled = indicator?.let { ind -> { checkProgressCanceled(ind) } } + createRemoteIDEServer().awaitJoinLink(checkCancelled = checkCancelled) thisLogger().debug("IDE server is ready") } @@ -154,7 +170,7 @@ class DevWorkspaceRestart( private suspend fun close(thinClient: ThinClientHandle) { thisLogger().debug("Closing thin client for $namespace/$workspaceName") thinClient.close() - delay(1.seconds) // Give time for port forwarder cleanup + delay(PORT_FORWARDER_CLEANUP_WAIT) } private fun stopWorkspaceAndWait() { @@ -172,8 +188,7 @@ class DevWorkspaceRestart( thisLogger().debug("Starting workspace and waiting...") workspaces.startAndWait( namespace, - workspaceName, - 20 + workspaceName ) thisLogger().debug("Workspace started and running") } @@ -184,10 +199,10 @@ class DevWorkspaceRestart( try { withTimeout(TIMEOUT_SECONDS_DELETE_PODS.seconds) { while (true) { - indicator?.checkCanceled() + indicator?.let { checkProgressCanceled(it) } val pods = fetchPodsWithRetry(labelSelector) if (pods.isEmpty()) break - delay(2.seconds) + delay(POD_DELETION_POLL_DELAY) } } } catch (e: TimeoutCancellationException) { @@ -201,7 +216,7 @@ class DevWorkspaceRestart( } catch (e: Exception) { if (e is kotlinx.coroutines.CancellationException) throw e thisLogger().warn("Failed to list pods, retrying...", e) - delay(1.seconds) + delay(POD_FETCH_RETRY_DELAY) fetchPodsWithRetry(labelSelector) } } @@ -227,9 +242,9 @@ class DevWorkspaceRestart( } } - private fun ProgressIndicator.update(text: String, fraction: Double) { - this.fraction = fraction - this.text = text - + companion object { + val POD_DELETION_POLL_DELAY: kotlin.time.Duration = 2.seconds + val POD_FETCH_RETRY_DELAY: kotlin.time.Duration = 1.seconds + val PORT_FORWARDER_CLEANUP_WAIT: kotlin.time.Duration = 1.seconds } } diff --git a/src/main/kotlin/com/redhat/devtools/gateway/devworkspace/DevWorkspaces.kt b/src/main/kotlin/com/redhat/devtools/gateway/devworkspace/DevWorkspaces.kt index 3680221b..0a861a38 100644 --- a/src/main/kotlin/com/redhat/devtools/gateway/devworkspace/DevWorkspaces.kt +++ b/src/main/kotlin/com/redhat/devtools/gateway/devworkspace/DevWorkspaces.kt @@ -36,6 +36,12 @@ val DevWorkspace.cheEditor: String return Utils.getValue(this.annotations, arrayOf("che.eclipse.org/che-editor")) as? String ?: "unknown" } +val DevWorkspace.displayName: String + get() { + val label = Utils.getValue(this.labels, arrayOf("kubernetes.io/metadata.name")) as String? + return label ?: this.name + } + class DevWorkspaces(private val client: ApiClient) { private val customApi = CustomObjectsApi(client) @@ -49,6 +55,7 @@ class DevWorkspaces(private val client: ApiClient) { const val STOPPING: String = "Stopping" const val RUNNING_TIMEOUT: Long = 300 + val PHASE_WAIT_DELAY: kotlin.time.Duration = 1.seconds } @Throws(ApiException::class) @@ -241,7 +248,7 @@ class DevWorkspaces(private val client: ApiClient) { val devWorkspace = try { DevWorkspaces(client).get(namespace, name) } catch (_: Exception) { - delay(1.seconds) + delay(PHASE_WAIT_DELAY) continue } @@ -253,7 +260,7 @@ class DevWorkspaces(private val client: ApiClient) { -> return@withTimeoutOrNull false } - delay(1.seconds) + delay(PHASE_WAIT_DELAY) } @Suppress("UNREACHABLE_CODE") @@ -277,7 +284,7 @@ class DevWorkspaces(private val client: ApiClient) { val devWorkspace = try { DevWorkspaces(client).get(namespace, name) } catch (e: Exception) { - delay(1.seconds) + delay(PHASE_WAIT_DELAY) continue } @@ -286,7 +293,7 @@ class DevWorkspaces(private val client: ApiClient) { return@withTimeoutOrNull true // phase changed out of the given set } - delay(1.seconds) + delay(PHASE_WAIT_DELAY) } @Suppress("UNREACHABLE_CODE") diff --git a/src/main/kotlin/com/redhat/devtools/gateway/devworkspace/RestartDevWorkspaceAnnotationWatch.kt b/src/main/kotlin/com/redhat/devtools/gateway/devworkspace/RestartDevWorkspaceAnnotationWatch.kt index eedcf3b9..f9840d3a 100644 --- a/src/main/kotlin/com/redhat/devtools/gateway/devworkspace/RestartDevWorkspaceAnnotationWatch.kt +++ b/src/main/kotlin/com/redhat/devtools/gateway/devworkspace/RestartDevWorkspaceAnnotationWatch.kt @@ -25,8 +25,16 @@ import java.util.concurrent.atomic.AtomicBoolean import kotlin.time.Duration.Companion.milliseconds /** - * Watches the DevWorkspace CR for [DevWorkspacePatch.RESTART_KEY] (set from the remote IDE). - * When the annotation appears, deletes workspace pod(s) and closes the thin client so the user can reconnect. + * Watches the DevWorkspace CR for [DevWorkspacePatch.RESTART_KEY] (set from the Remote IDE). + * + * ## Session recovery routing + * + * When the annotation appears, invokes the registered callback (typically + * [DevWorkspaceRestart.execute]). This starts the **user-initiated restart** path. + * + * Unplanned pod rolls without this annotation are handled separately by + * [com.redhat.devtools.gateway.WorkspacePodTracker] and + * [com.redhat.devtools.gateway.ThinClientReconnect]. */ class RestartDevWorkspaceAnnotationWatch( private val onIsAnnotated: () -> Job, @@ -35,6 +43,12 @@ class RestartDevWorkspaceAnnotationWatch( private val workspaceName: String, private val dispatcher: CoroutineDispatcher = Dispatchers.IO ) { + + companion object { + val WATCHER_RECONNECT_DELAY: kotlin.time.Duration = 100.milliseconds + val WATCHER_CREATE_FAIL_DELAY: kotlin.time.Duration = 1000.milliseconds + } + private val devWorkspaces = DevWorkspaces(client) private val restartAnnotationPending = AtomicBoolean(false) @@ -43,7 +57,7 @@ class RestartDevWorkspaceAnnotationWatch( return scope.launch(dispatcher) { while (isActive) { val watcher = createWatcher(namespace, fieldSelector) ?: run { - delay(1000.milliseconds) + delay(WATCHER_CREATE_FAIL_DELAY) continue } try { @@ -78,7 +92,7 @@ class RestartDevWorkspaceAnnotationWatch( } finally { runCatching { watcher.close() } } - delay(100.milliseconds) + delay(WATCHER_RECONNECT_DELAY) } } } diff --git a/src/main/kotlin/com/redhat/devtools/gateway/openshift/DevWorkspacePods.kt b/src/main/kotlin/com/redhat/devtools/gateway/openshift/DevWorkspacePods.kt index 1c2a026b..3da1be62 100644 --- a/src/main/kotlin/com/redhat/devtools/gateway/openshift/DevWorkspacePods.kt +++ b/src/main/kotlin/com/redhat/devtools/gateway/openshift/DevWorkspacePods.kt @@ -13,28 +13,45 @@ package com.redhat.devtools.gateway.openshift import com.intellij.openapi.diagnostic.logger import com.redhat.devtools.gateway.util.isCancellationException +import com.redhat.devtools.gateway.util.isTimeoutException +import com.redhat.devtools.gateway.util.podLogIdentity import io.kubernetes.client.PortForward import io.kubernetes.client.openapi.ApiClient import io.kubernetes.client.openapi.ApiException import io.kubernetes.client.openapi.apis.CoreV1Api import io.kubernetes.client.openapi.models.V1Pod import io.kubernetes.client.openapi.models.V1PodList +import kotlinx.coroutines.TimeoutCancellationException import kotlinx.coroutines.* import java.io.Closeable import java.io.IOException import java.io.InputStream import java.io.OutputStream import java.net.* +import java.time.OffsetDateTime import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException import kotlin.time.Duration.Companion.seconds +/** + * Result of resolving the workspace pod for port-forwarding. + * + * @param pod the pod to forward to, or null while waiting + * @param retryDelaySeconds delay before the next resolve attempt when [pod] is null + */ +data class PodForwardResolution( + val pod: V1Pod?, + val retryDelaySeconds: Long = DevWorkspacePods.DEFAULT_RECONNECT_DELAY_SECONDS, +) + class DevWorkspacePods(private val client: ApiClient) { companion object { const val WORKSPACE_LABEL_KEY = "controller.devfile.io/devworkspace_name" - private const val CONNECT_ATTEMPTS = 5 - private const val RECONNECT_DELAY: Long = 1000 + const val DEFAULT_RECONNECT_DELAY_SECONDS: Long = 3 + private const val FORWARD_READY_RETRY_COUNT: Int = 30 + private const val FORWARD_READY_RETRY_DELAY: Long = 100L // milliseconds + private val POD_DELETION_WAIT_DELAY: kotlin.time.Duration = 4.seconds } private val logger = logger() @@ -135,13 +152,26 @@ class DevWorkspacePods(private val client: ApiClient) { ApiClientUtils.cloneForExec(base) @Throws(IOException::class) - fun forward(pod: V1Pod, localPort: Int, remotePort: Int): Closeable { + fun forward( + resolvePod: suspend () -> PodForwardResolution, + localPort: Int, + remotePort: Int, + reconnectTimeoutSeconds: Long, + reconnectDelaySeconds: Long = DEFAULT_RECONNECT_DELAY_SECONDS, + ): Closeable { val serverSocket = ServerSocket(localPort, 50, InetAddress.getLoopbackAddress()) val scope = CoroutineScope( // dont cancel if child coroutine fails + use blocking I/O scope SupervisorJob() + Dispatchers.IO ) - scope.acceptConnections(serverSocket, pod, localPort, remotePort) + scope.acceptConnections( + serverSocket, + resolvePod, + localPort, + remotePort, + reconnectTimeoutSeconds, + reconnectDelaySeconds, + ) return Closeable { runCatching { serverSocket.close() } scope.cancel() @@ -150,9 +180,11 @@ class DevWorkspacePods(private val client: ApiClient) { private fun CoroutineScope.acceptConnections( serverSocket: ServerSocket, - pod: V1Pod, + resolvePod: suspend () -> PodForwardResolution, localPort: Int, - remotePort: Int + remotePort: Int, + reconnectTimeout: Long, + reconnectDelaySeconds: Long, ) { launch { logger.info("Starting port forward on local port $localPort...") @@ -163,9 +195,11 @@ class DevWorkspacePods(private val client: ApiClient) { launch { handleConnection( clientSocket, - pod, + resolvePod, localPort, - remotePort + remotePort, + reconnectTimeout, + reconnectDelaySeconds, ) } } @@ -186,39 +220,73 @@ class DevWorkspacePods(private val client: ApiClient) { private suspend fun CoroutineScope.handleConnection( clientSocket: Socket, - pod: V1Pod, + resolvePod: suspend () -> PodForwardResolution, localPort: Int, - remotePort: Int + remotePort: Int, + reconnectTimeout: Long, + reconnectDelaySeconds: Long, ) { + var lastPod: V1Pod? = null + var lastForwardError: String? = null try { - repeat(CONNECT_ATTEMPTS) { attempt -> - if (!isActive) return@repeat + withTimeout(reconnectTimeout.seconds) { + while (isActive) { + if (!clientSocket.isConnected + || clientSocket.isClosed) { + return@withTimeout + } - var forwardResult: PortForward.PortForwardResult? = null - try { - logger.info("Attempt #${attempt + 1}: Connecting $localPort -> $remotePort...") - val portForward = PortForward(client) - forwardResult = portForward.forward(pod, listOf(remotePort)) - logger.info("forward successful: $localPort -> $remotePort...") - copyStreams(clientSocket, forwardResult, remotePort) - return - } catch (e: Exception) { - if (e.isCancellationException()) throw e - logger.info( - "Could not port forward $localPort -> $remotePort: ${e.message}. Retrying in ${RECONNECT_DELAY}ms..." - ) - if (isActive) { - delay(RECONNECT_DELAY) + val resolution = resolvePod() + val pod = resolution.pod + if (pod == null) { + val delaySeconds = resolution.retryDelaySeconds + logger.warn( + "Port forward $localPort -> $remotePort: no workspace pod yet, " + + "retrying in ${delaySeconds}s" + ) + delay(delaySeconds.seconds) + continue + } + lastPod = pod + + var forwardResult: PortForward.PortForwardResult? = null + try { + logger.info( + "Port forward $localPort -> $remotePort: connecting to ${podLogIdentity(pod)}" + ) + val portForward = PortForward(client) + forwardResult = portForward.forward(pod, listOf(remotePort)) + logger.info("Port forward $localPort -> $remotePort: established to ${podLogIdentity(pod)}") + copyStreams(clientSocket, forwardResult, remotePort) + return@withTimeout + } catch (e: Exception) { + if (e.isCancellationException() + || e.isTimeoutException()) { + throw e + } + lastForwardError = "${e.javaClass.simpleName}: ${e.message}" + logger.info( + "Port forward $localPort -> $remotePort: connecting to ${podLogIdentity(pod)}" + ) + delay(reconnectDelaySeconds.seconds) + } finally { + closeStreams(remotePort, forwardResult) } - } finally { - closeStreams(remotePort, forwardResult) } } - } catch(e: Exception) { + } catch (_: TimeoutCancellationException) { + val podInfo = lastPod?.let { podLogIdentity(it) } ?: "no pod resolved" + val errorInfo = lastForwardError?.let { ", last error: $it" } ?: "" + logger.warn( + "Port forward $localPort -> $remotePort timed out after ${reconnectTimeout}s " + + "(last pod: $podInfo$errorInfo)" + ) + } catch (e: Exception) { if (e.isCancellationException()) throw e logger.warn( - "Could not port forward to pod ${pod.metadata?.name} using port $localPort -> $remotePort", - e) + "Could not port forward using port $localPort -> $remotePort", + e + ) } finally { runCatching { clientSocket.close() } } @@ -268,34 +336,43 @@ class DevWorkspacePods(private val client: ApiClient) { @Throws(IOException::class) fun waitForForwardReady(port: Int) { - val maxRetries = 30 - val retryDelay = 100L - - repeat(maxRetries) { attempt -> + repeat(FORWARD_READY_RETRY_COUNT) { attempt -> try { val testSocket = ServerSocket() testSocket.bind(InetSocketAddress("127.0.0.1", port)) testSocket.close() // If we can bind to the port, it means port forwarding is not ready yet - Thread.sleep(retryDelay) + Thread.sleep(FORWARD_READY_RETRY_DELAY) } catch (_: BindException) { // Port is already in use, which means port forwarding is ready return } catch (e: Exception) { - if (attempt == maxRetries - 1) { - throw IOException("Port forwarding to local port $port is not ready after ${maxRetries * retryDelay}ms", e) + if (attempt == FORWARD_READY_RETRY_COUNT - 1) { + throw IOException("Port forwarding to local port $port is not ready after ${FORWARD_READY_RETRY_COUNT * FORWARD_READY_RETRY_DELAY}ms", e) } - Thread.sleep(retryDelay) + Thread.sleep(FORWARD_READY_RETRY_DELAY) } } - throw IOException("Port forwarding to local port $port is not ready after ${maxRetries * retryDelay}ms") + throw IOException("Port forwarding to local port $port is not ready after ${FORWARD_READY_RETRY_COUNT * FORWARD_READY_RETRY_DELAY}ms") } @Throws(ApiException::class) - fun findFirst(namespace: String, labelSelector: String): V1Pod? { + fun findFirstRunning(namespace: String, labelSelector: String): V1Pod? { val pods = list(namespace, labelSelector) - return pods.items[0] + return pods.items + .filter { isRunningAndReady(it) } + .maxByOrNull { it.metadata?.creationTimestamp ?: OffsetDateTime.MIN } + } + + private fun isRunningAndReady(pod: V1Pod): Boolean { + if (pod.metadata?.deletionTimestamp != null + || pod.status?.phase != "Running") { + return false + } + return pod.status?.conditions?.any { condition -> + condition.type == "Ready" && condition.status == "True" + } == true } @Throws(ApiException::class) @@ -333,7 +410,7 @@ class DevWorkspacePods(private val client: ApiClient) { } catch (e: Exception) { if (e.isCancellationException()) throw e logger.info("Error listing pods for $namespace/$workspaceName: ${e.message}") - delay(1.seconds) + delay(POD_DELETION_WAIT_DELAY) continue } @@ -344,7 +421,7 @@ class DevWorkspacePods(private val client: ApiClient) { } logger.debug("Still waiting for ${pods.items.size} pod(s) to be deleted for $namespace/$workspaceName") - delay(1.seconds) + delay(POD_DELETION_WAIT_DELAY) } @Suppress("UNREACHABLE_CODE") diff --git a/src/main/kotlin/com/redhat/devtools/gateway/server/RemoteIDEServer.kt b/src/main/kotlin/com/redhat/devtools/gateway/server/RemoteIDEServer.kt index d66ad3fc..6f0ddd7a 100644 --- a/src/main/kotlin/com/redhat/devtools/gateway/server/RemoteIDEServer.kt +++ b/src/main/kotlin/com/redhat/devtools/gateway/server/RemoteIDEServer.kt @@ -28,25 +28,61 @@ import kotlin.time.Duration.Companion.seconds * Represent an IDE server running in a CDE. */ class RemoteIDEServer(private val devSpacesContext: DevSpacesContext) { - var pod: V1Pod - private var container: V1Container + + private var cachedPod: V1Pod? = null companion object { - var readyTimeout: Long = 60 // seconds + const val READY_TIMEOUT: Long = 120 // seconds + /** Per-probe exec budget while polling; must be well below [READY_TIMEOUT]. */ + const val STATUS_EXEC_TIMEOUT: Long = 15 // seconds + val DRAIN_DELAY: kotlin.time.Duration = 500.milliseconds } - init { - pod = findPod() - container = findContainer() + /** + * Returns the cached workspace pod, or fetches it from the cluster if not yet cached. + */ + @Throws(IOException::class) + fun getPod(): V1Pod { + cachedPod?.let { return it } + return refreshPod() } /** - * Asks the CDE for the remote IDE server status. + * Sets the given pod. + * + * @see [getPod] + * @see [refreshPod] */ - @Throws(CancellationException::class) - suspend fun getStatus(checkCancelled: (() -> Unit)? = null): RemoteIDEServerStatus = + fun setPod(pod: V1Pod) { + cachedPod = pod + } + + /** + * Re-queries the cluster for the workspace pod and updates the cache. + */ + @Throws(IOException::class) + fun refreshPod(): V1Pod = fetchPod().also { cachedPod = it } + + /** + * Returns the IDE container from the workspace pod. + */ + @Throws(IOException::class) + fun getContainer(): V1Container = findContainer(getPod()) + + /** + * Fetches the workspace pod and IDE container, then asks the CDE for the remote IDE server status. + * + * @param execTimeoutSeconds max seconds to wait for a single status exec probe + */ + @Throws(CancellationException::class, IOException::class) + suspend fun fetchStatus( + checkCancelled: (() -> Unit)? = null, + execTimeoutSeconds: Long = STATUS_EXEC_TIMEOUT, + ): RemoteIDEServerStatus = withContext(Dispatchers.IO) { checkCancelled?.invoke() + val pod = getPod() + val container = findContainer(pod) val output = DevWorkspacePods(devSpacesContext.client).exec( pod = pod, container = container.name, @@ -61,7 +97,7 @@ class RemoteIDEServer(private val devSpacesContext: DevSpacesContext) { "-c", "/idea-server/bin/remote-dev-server.sh status \$PROJECT_SOURCE | awk '/STATUS:/{p=1; next} p'" ), - timeout = readyTimeout + timeout = execTimeoutSeconds, ).trim() checkCancelled?.invoke() @@ -78,22 +114,52 @@ class RemoteIDEServer(private val devSpacesContext: DevSpacesContext) { * @return True if the server is ready within the timeout, False otherwise. */ @Throws(IOException::class) - suspend fun waitServerReady(checkCancelled: (() -> Unit)? = null, timeout: Long = readyTimeout): Boolean { + suspend fun waitServerReady(checkCancelled: (() -> Unit)? = null, timeout: Long = READY_TIMEOUT): Boolean { return doWaitServerState(true, timeout, checkCancelled) .also { if (!it) throw IOException( - "Workspace IDE is not ready after $readyTimeout seconds.", + "Workspace IDE is not ready after $timeout seconds.", ) } } + /** + * Waits for the IDE server to become ready and returns its join link. + * + * Shared by pod-roll reconnect ([com.redhat.devtools.gateway.ThinClientReconnect]) and + * annotated restart ([com.redhat.devtools.gateway.devworkspace.DevWorkspaceRestart]). + * + * @param pod when non-null, seeds the pod cache before waiting (pod-roll path passes the rolled pod) + * @param checkCancelled optional progress cancellation check + * @param timeout maximum wait for IDE readiness in seconds + */ + @Throws(IOException::class, CancellationException::class) + suspend fun awaitJoinLink( + pod: V1Pod? = null, + checkCancelled: (() -> Unit)? = null, + timeout: Long = READY_TIMEOUT, + ): String { + pod?.let { setPod(it) } + waitServerReady(checkCancelled = checkCancelled, timeout = timeout) + return fetchStatus(checkCancelled = checkCancelled).joinLink + ?: throw IOException("no join link") + } + @Throws(CancellationException::class) private suspend fun isServerState( isReadyState: Boolean, - checkCancelled: (() -> Unit)? = null + checkCancelled: (() -> Unit)? = null, + refreshPodBeforeCheck: Boolean = false, ): Boolean { return try { - getStatus(checkCancelled).isReady == isReadyState + if (refreshPodBeforeCheck) { + runCatching { refreshPod() }.onFailure { e -> + if (e.isCancellationException()) throw e + thisLogger().debug("Failed to refresh workspace pod during IDE state check", e) + return false + } + } + fetchStatus(checkCancelled).isReady == isReadyState } catch (e: Exception) { if (e.isCancellationException()) throw e thisLogger().debug("Failed to check workspace IDE state.", e) @@ -116,39 +182,44 @@ class RemoteIDEServer(private val devSpacesContext: DevSpacesContext) { @Throws(IOException::class, CancellationException::class) private suspend fun doWaitServerState( isReadyState: Boolean, - timeout: Long = readyTimeout, + timeout: Long = READY_TIMEOUT, checkCancelled: (() -> Unit)? = null ): Boolean = - withTimeoutOrNull(timeout.seconds) { // seconds → ms + withTimeoutOrNull(timeout.seconds) { while (true) { checkCancelled?.invoke() - if (isServerState(isReadyState, checkCancelled)) { + if (isServerState( + isReadyState, + checkCancelled, + refreshPodBeforeCheck = isReadyState, + )) { return@withTimeoutOrNull true } yield() - delay(500.milliseconds) + delay(DRAIN_DELAY) } @Suppress("UNREACHABLE_CODE") false } ?: false - @Throws(IOException::class) - private fun findPod(): V1Pod { - val selector = "controller.devfile.io/devworkspace_name=${devSpacesContext.devWorkspace.name}" + private fun labelSelector(): String = + "${DevWorkspacePods.WORKSPACE_LABEL_KEY}=${devSpacesContext.devWorkspace.name}" + @Throws(IOException::class) + private fun fetchPod(): V1Pod { return DevWorkspacePods(devSpacesContext.client) - .findFirst( + .findFirstRunning( devSpacesContext.devWorkspace.namespace, - selector + labelSelector() ) ?: throw IOException( "DevWorkspace '${devSpacesContext.devWorkspace.name}' is not running.", ) } @Throws(IOException::class) - private fun findContainer(): V1Container { + private fun findContainer(pod: V1Pod): V1Container { return pod.spec!!.containers.find { container -> container.ports?.any { port -> port.name == "idea-server" } != null } diff --git a/src/main/kotlin/com/redhat/devtools/gateway/util/ExceptionUtils.kt b/src/main/kotlin/com/redhat/devtools/gateway/util/ExceptionUtils.kt index cf371292..6e326845 100644 --- a/src/main/kotlin/com/redhat/devtools/gateway/util/ExceptionUtils.kt +++ b/src/main/kotlin/com/redhat/devtools/gateway/util/ExceptionUtils.kt @@ -11,6 +11,7 @@ */ package com.redhat.devtools.gateway.util +import com.intellij.openapi.progress.ProcessCanceledException import com.redhat.devtools.gateway.auth.session.SsoLoginException import kotlinx.coroutines.TimeoutCancellationException import java.util.concurrent.CancellationException @@ -23,7 +24,9 @@ fun Throwable.messageWithoutPrefix(): String? { fun Throwable.isTimeoutException(): Boolean = (this is TimeoutCancellationException || this is TimeoutException ) -fun Throwable.isCancellationException(): Boolean = (this is CancellationException && !isTimeoutException() ) +fun Throwable.isCancellationException(): Boolean = + this is ProcessCanceledException + || (this is CancellationException && !isTimeoutException()) fun Throwable.isLoginUserCancelled(): Boolean = generateSequence(this) { it.cause }.any { it is SsoLoginException.Cancelled } diff --git a/src/main/kotlin/com/redhat/devtools/gateway/util/NetworkUtils.kt b/src/main/kotlin/com/redhat/devtools/gateway/util/NetworkUtils.kt new file mode 100644 index 00000000..91b72945 --- /dev/null +++ b/src/main/kotlin/com/redhat/devtools/gateway/util/NetworkUtils.kt @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2026 Red Hat, Inc. + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + * + * Contributors: + * Red Hat, Inc. - initial API and implementation + */ +package com.redhat.devtools.gateway.util + +import java.net.ServerSocket + +fun findFreePort(): Int { + ServerSocket(0).use { socket -> + socket.reuseAddress = true + return socket.localPort + } +} diff --git a/src/main/kotlin/com/redhat/devtools/gateway/util/PodIdentity.kt b/src/main/kotlin/com/redhat/devtools/gateway/util/PodIdentity.kt new file mode 100644 index 00000000..b7e058e9 --- /dev/null +++ b/src/main/kotlin/com/redhat/devtools/gateway/util/PodIdentity.kt @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2026 Red Hat, Inc. + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + * + * Contributors: + * Red Hat, Inc. - initial API and implementation + */ +package com.redhat.devtools.gateway.util + +import io.kubernetes.client.openapi.models.V1Pod + +/** + * Returns a human-readable description of a pod for log messages. + * + * Format: ` (uid=[, terminating])` + * + * @param pod the pod to describe + * @return a formatted string like `"my-pod (uid=abc-123)"` or `"my-pod (uid=abc-123, terminating)"` + * if the pod has a deletion timestamp. Falls back to `"unknown"` for name or uid if missing. + */ +fun podLogIdentity(pod: V1Pod): String { + val namespace = pod.metadata?.namespace + val name = pod.metadata?.name ?: "unknown" + val label = if (namespace != null) "$namespace/$name" else name + val uid = pod.metadata?.uid ?: "unknown" + val terminating = if (pod.metadata?.deletionTimestamp != null) ", terminating" else "" + return "$label (uid=$uid$terminating)" +} + +/** + * Returns a stable identity key for a pod, used to detect pod rolls. + * + * Prefers the pod's UID as the identity, falling back to the pod's name if + * the UID is null. Returns null only if both UID and name are null. + * + * @param pod the pod to derive an identity from + * @return the pod's UID, its name if UID is null, or null if both are absent + */ +fun podIdentity(pod: V1Pod): String? { + val uid = pod.metadata?.uid + if (uid != null) return uid + val namespace = pod.metadata?.namespace + val name = pod.metadata?.name + return if (name != null) { + if (namespace != null) "$namespace/$name" else name + } else null +} diff --git a/src/main/kotlin/com/redhat/devtools/gateway/util/ProgressCancellation.kt b/src/main/kotlin/com/redhat/devtools/gateway/util/ProgressCancellation.kt new file mode 100644 index 00000000..f24aa655 --- /dev/null +++ b/src/main/kotlin/com/redhat/devtools/gateway/util/ProgressCancellation.kt @@ -0,0 +1,135 @@ +/* + * Copyright (c) 2026 Red Hat, Inc. + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + * + * Contributors: + * Red Hat, Inc. - initial API and implementation + */ +package com.redhat.devtools.gateway.util + +import com.intellij.openapi.progress.ProcessCanceledException +import com.intellij.openapi.progress.ProgressIndicator +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.cancelAndJoin +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext +import kotlin.time.Duration +import kotlin.time.Duration.Companion.milliseconds + +private val INDICATOR_POLL_DELAY = 500.milliseconds + +/** + * Polls [indicator] and runs [cancelAction] when the user cancels the operation. + */ +fun CoroutineScope.launchProgressCancelWatcher( + indicator: ProgressIndicator, + cancelAction: suspend () -> Unit, +): Job = launch(Dispatchers.Default) { + while (isActive) { + if (indicator.isCanceled) { + cancelAction() + return@launch + } + delay(INDICATOR_POLL_DELAY) + } +} + +/** + * Runs [block] while polling [indicator] for cancellation. + * The watcher is stopped when [block] completes so the caller's [coroutineScope] does not hang. + */ +suspend fun withProgressCancelWatcher( + indicator: ProgressIndicator, + cancelAction: suspend () -> Unit, + block: suspend CoroutineScope.() -> T, +): T = coroutineScope { + val cancelJob = launchProgressCancelWatcher(indicator, cancelAction) + try { + block() + } finally { + cancelJob.cancelAndJoin() + } +} + +fun checkProgressCanceled(indicator: ProgressIndicator) { + if (indicator.isCanceled || !indicator.isRunning) { + throw ProcessCanceledException() + } +} + +/** + * Updates progress text and fraction. + * Throws [ProcessCanceledException] when the indicator is canceled, stopped, or disposed. + */ +fun ProgressIndicator.updateProgress(text: String, fraction: Double) { + checkProgressCanceled(this) + writeToIndicator { + this.fraction = fraction + this.text = text + } +} + +/** + * Updates secondary progress text. + * Throws [ProcessCanceledException] when the indicator is canceled, stopped, or disposed. + */ +fun ProgressIndicator.setProgressText2(text: String?) { + checkProgressCanceled(this) + writeToIndicator { + this.text2 = text + } +} + +private inline fun ProgressIndicator.writeToIndicator(block: ProgressIndicator.() -> Unit) { + try { + block() + } catch (e: IllegalArgumentException) { + throw ProcessCanceledException(e) + } catch (e: IllegalStateException) { + throw ProcessCanceledException(e) + } +} + +/** Clears [ProgressIndicator.text2]; ignores errors when the indicator is already disposed. */ +fun ProgressIndicator?.clearProgressText2Safely() { + runCatching { this?.text2 = null } +} + +/** + * Sleeps for [duration], polling [indicator] for cancellation or disposal. + */ +suspend fun delayRespectingProgress(indicator: ProgressIndicator?, duration: Duration) { + if (indicator == null) { + delay(duration) + return + } + var remaining = duration + while (remaining > Duration.ZERO) { + checkProgressCanceled(indicator) + val step = minOf(INDICATOR_POLL_DELAY, remaining) + delay(step) + remaining -= step + } +} + +/** + * Runs work on IO and honors progress cancellation before and after [block]. + */ +suspend fun withProgressCancellation( + indicator: ProgressIndicator, + block: suspend () -> T, +): T { + checkProgressCanceled(indicator) + return withContext(Dispatchers.IO) { + block() + }.also { checkProgressCanceled(indicator) } +} diff --git a/src/main/kotlin/com/redhat/devtools/gateway/util/ProgressCountdown.kt b/src/main/kotlin/com/redhat/devtools/gateway/util/ProgressCountdown.kt index aa6ddcdd..a546fa84 100644 --- a/src/main/kotlin/com/redhat/devtools/gateway/util/ProgressCountdown.kt +++ b/src/main/kotlin/com/redhat/devtools/gateway/util/ProgressCountdown.kt @@ -23,6 +23,7 @@ class ProgressCountdown(private val delegate: ProgressIndicator) : ProgressIndic companion object { const val EN_DASH = "\u2013" + private val POLLING_DELAY: kotlin.time.Duration = 1.seconds } fun update( @@ -46,7 +47,7 @@ class ProgressCountdown(private val delegate: ProgressIndicator) : ProgressIndic job = scope.launch { while (remaining > 0 && isActive && !delegate.isCanceled) { delegate.text2 = buildText2WithSuffix(remaining) - delay(1.seconds) + delay(POLLING_DELAY) remaining-- } delegate.text2 = baseText2 diff --git a/src/main/kotlin/com/redhat/devtools/gateway/util/ProjectUtils.kt b/src/main/kotlin/com/redhat/devtools/gateway/util/ProjectUtils.kt new file mode 100644 index 00000000..1896b452 --- /dev/null +++ b/src/main/kotlin/com/redhat/devtools/gateway/util/ProjectUtils.kt @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2026 Red Hat, Inc. + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + * + * Contributors: + * Red Hat, Inc. - initial API and implementation + */ +package com.redhat.devtools.gateway.util + +import com.intellij.openapi.application.ApplicationManager +import com.intellij.openapi.application.ModalityState +import com.intellij.openapi.project.ex.ProjectManagerEx + +fun closeAllProjects() { + ApplicationManager.getApplication().invokeLater( + { + val pm = ProjectManagerEx.getInstanceEx() + for (project in pm.openProjects.toList()) { + if (!project.isDisposed) { + pm.closeAndDispose(project) + } + } + }, + ModalityState.nonModal() + ) +} diff --git a/src/main/kotlin/com/redhat/devtools/gateway/util/WorkspaceSessionProgress.kt b/src/main/kotlin/com/redhat/devtools/gateway/util/WorkspaceSessionProgress.kt new file mode 100644 index 00000000..b1ecadb5 --- /dev/null +++ b/src/main/kotlin/com/redhat/devtools/gateway/util/WorkspaceSessionProgress.kt @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2026 Red Hat, Inc. + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + * + * Contributors: + * Red Hat, Inc. - initial API and implementation + */ +package com.redhat.devtools.gateway.util + +/** + * Shared progress step labels for workspace session recovery. + * + * Used by both recovery paths: + * - **Pod-roll reconnect** ([com.redhat.devtools.gateway.ThinClientReconnect]): [CLOSING_IDE], + * [WAITING_FOR_IDE], [CONNECTING_TO_IDE] + * - **Annotated restart** ([com.redhat.devtools.gateway.devworkspace.DevWorkspaceRestart]): all + * constants including [STOPPING_WORKSPACE], [WAITING_PODS_TERMINATE], [STARTING_WORKSPACE] + * - **Delayed port-forward recovery** ([ForwardRecoveryProgress] via [PortForwardPodResolver]): + * [WAITING_FOR_POD] + */ +internal object WorkspaceSessionProgress { + const val WAITING_FOR_POD: String = "Waiting for workspace pod..." + const val CLOSING_IDE: String = "Closing IDE connection..." + const val WAITING_FOR_IDE: String = "Waiting for IDE to be ready..." + const val CONNECTING_TO_IDE: String = "Connecting to IDE..." + const val STOPPING_WORKSPACE: String = "Stopping workspace..." + const val WAITING_PODS_TERMINATE: String = "Waiting for pods to terminate..." + const val STARTING_WORKSPACE: String = "Starting workspace..." +} diff --git a/src/main/kotlin/com/redhat/devtools/gateway/view/steps/DevSpacesServerStepView.kt b/src/main/kotlin/com/redhat/devtools/gateway/view/steps/DevSpacesServerStepView.kt index 8397f9e3..a7632834 100644 --- a/src/main/kotlin/com/redhat/devtools/gateway/view/steps/DevSpacesServerStepView.kt +++ b/src/main/kotlin/com/redhat/devtools/gateway/view/steps/DevSpacesServerStepView.kt @@ -45,6 +45,7 @@ import com.redhat.devtools.gateway.view.ui.PasteClipboardMenu import com.redhat.devtools.gateway.view.ui.requestInitialFocus import com.redhat.devtools.gateway.util.isLoginUserCancelled import com.redhat.devtools.gateway.util.stripScheme +import com.redhat.devtools.gateway.util.withProgressCancellation import kotlinx.coroutines.* import java.awt.event.ItemEvent import java.awt.event.KeyAdapter @@ -516,7 +517,7 @@ class DevSpacesServerStepView( try { indicator.text = "Updating Kube config..." - withContext(Dispatchers.IO) { + withProgressCancellation(indicator) { KubeConfigUpdate .create( cluster.name.trim(), @@ -524,7 +525,6 @@ class DevSpacesServerStepView( token.trim()) .apply() } - } catch (e: Exception) { thisLogger().warn(e.message ?: "Could not save configuration file", e) withContext(Dispatchers.Main) { @@ -541,7 +541,7 @@ class DevSpacesServerStepView( try { indicator.text = "Updating Kube config..." - withContext(Dispatchers.IO) { + withProgressCancellation(indicator) { KubeConfigUpdate .create( cluster.name.trim(), diff --git a/src/main/kotlin/com/redhat/devtools/gateway/view/steps/DevSpacesWorkspacesStepView.kt b/src/main/kotlin/com/redhat/devtools/gateway/view/steps/DevSpacesWorkspacesStepView.kt index 98990592..aa95b3d1 100644 --- a/src/main/kotlin/com/redhat/devtools/gateway/view/steps/DevSpacesWorkspacesStepView.kt +++ b/src/main/kotlin/com/redhat/devtools/gateway/view/steps/DevSpacesWorkspacesStepView.kt @@ -312,7 +312,7 @@ class DevSpacesWorkspacesStepView( val remoteIdeServer = RemoteIDEServer(devSpacesContext) status = runBlocking { remoteIdeServer.waitServerReady(checkCancelled) - remoteIdeServer.getStatus() + remoteIdeServer.fetchStatus(checkCancelled) } } catch (e: Exception) { if (e.isCancellationException()) { diff --git a/src/main/kotlin/com/redhat/devtools/gateway/view/steps/auth/AbstractAuthenticationStrategy.kt b/src/main/kotlin/com/redhat/devtools/gateway/view/steps/auth/AbstractAuthenticationStrategy.kt index 48b7fa65..7b164a54 100644 --- a/src/main/kotlin/com/redhat/devtools/gateway/view/steps/auth/AbstractAuthenticationStrategy.kt +++ b/src/main/kotlin/com/redhat/devtools/gateway/view/steps/auth/AbstractAuthenticationStrategy.kt @@ -21,13 +21,6 @@ import com.redhat.devtools.gateway.openshift.Projects import com.redhat.devtools.gateway.openshift.codeToReasonPhrase import io.kubernetes.client.openapi.ApiClient import io.kubernetes.client.openapi.ApiException -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.Job -import kotlinx.coroutines.delay -import kotlinx.coroutines.isActive -import kotlinx.coroutines.launch -import kotlin.time.Duration.Companion.milliseconds /** * Abstract base class for authentication strategies. @@ -47,23 +40,6 @@ abstract class AbstractAuthenticationStrategy( override fun isDirty(saved: Cluster): Boolean = false - /** - * Starts a cancellation watcher that polls the progress indicator - * and cancels the given action when the user cancels the operation. - */ - protected fun CoroutineScope.launchCancelWatcher( - indicator: ProgressIndicator, - cancelAction: suspend () -> Unit - ): Job = launch(Dispatchers.Default) { - while (isActive) { - if (indicator.isCanceled) { - cancelAction() - return@launch - } - delay(500.milliseconds) - } - } - /** * Creates a validated API client. */ diff --git a/src/main/kotlin/com/redhat/devtools/gateway/view/steps/auth/ClientCertificateAuthenticationStrategy.kt b/src/main/kotlin/com/redhat/devtools/gateway/view/steps/auth/ClientCertificateAuthenticationStrategy.kt index 450589d2..99f5d3d1 100644 --- a/src/main/kotlin/com/redhat/devtools/gateway/view/steps/auth/ClientCertificateAuthenticationStrategy.kt +++ b/src/main/kotlin/com/redhat/devtools/gateway/view/steps/auth/ClientCertificateAuthenticationStrategy.kt @@ -21,6 +21,7 @@ import com.redhat.devtools.gateway.view.ui.PasteClipboardMenu import com.redhat.devtools.gateway.auth.tls.browseCertificate import com.redhat.devtools.gateway.DevSpacesContext import com.redhat.devtools.gateway.auth.tls.TlsContext +import com.redhat.devtools.gateway.util.withProgressCancellation import com.redhat.devtools.gateway.openshift.Cluster import javax.swing.JPanel @@ -86,15 +87,17 @@ class ClientCertificateAuthenticationStrategy( val clientCert = tfClientCert.text val clientKey = tfClientKey.text - val client = createValidatedApiClient( - server, - certAuthority, - null, - clientCert, - clientKey, - tlsContext, - "Authentication failed: invalid client certificate or key." - ) + val client = withProgressCancellation(indicator) { + createValidatedApiClient( + server, + certAuthority, + null, + clientCert, + clientKey, + tlsContext, + "Authentication failed: invalid client certificate or key." + ) + } saveKubeconfigWithCert(selectedCluster, clientCert, clientKey, indicator) devSpacesContext.client = client diff --git a/src/main/kotlin/com/redhat/devtools/gateway/view/steps/auth/OpenShiftCredentialsAuthenticationStrategy.kt b/src/main/kotlin/com/redhat/devtools/gateway/view/steps/auth/OpenShiftCredentialsAuthenticationStrategy.kt index 0228c517..c1f197aa 100644 --- a/src/main/kotlin/com/redhat/devtools/gateway/view/steps/auth/OpenShiftCredentialsAuthenticationStrategy.kt +++ b/src/main/kotlin/com/redhat/devtools/gateway/view/steps/auth/OpenShiftCredentialsAuthenticationStrategy.kt @@ -27,6 +27,7 @@ import com.redhat.devtools.gateway.auth.code.AuthTokenKind import com.redhat.devtools.gateway.auth.code.TokenModel import com.redhat.devtools.gateway.auth.session.OpenShiftAuthSessionManager import com.redhat.devtools.gateway.auth.tls.TlsContext +import com.redhat.devtools.gateway.util.withProgressCancellation import com.redhat.devtools.gateway.openshift.Cluster import com.redhat.devtools.gateway.view.ui.PasswordFieldWithToggle import javax.swing.JComponent @@ -116,12 +117,14 @@ class OpenShiftCredentialsAuthenticationStrategy( val sessionManager = OpenShiftAuthSessionManager() - val osToken = sessionManager.loginWithCredentials( - apiServerUrl = selectedCluster.url, - username = username, - password = password, - tlsContext.sslContext - ) + val osToken = withProgressCancellation(indicator) { + sessionManager.loginWithCredentials( + apiServerUrl = selectedCluster.url, + username = username, + password = password, + tlsContext.sslContext, + ) + } val finalToken = TokenModel( accessToken = osToken.accessToken, @@ -133,15 +136,17 @@ class OpenShiftCredentialsAuthenticationStrategy( indicator.text = "Validating cluster access..." - val client = createValidatedApiClient( - server, - certAuthority, - finalToken.accessToken, - null, - null, - tlsContext, - "Authentication failed: invalid OpenShift credentials." - ) + val client = withProgressCancellation(indicator) { + createValidatedApiClient( + server, + certAuthority, + finalToken.accessToken, + null, + null, + tlsContext, + "Authentication failed: invalid OpenShift credentials." + ) + } setTokenDisplay(finalToken.accessToken) saveKubeconfig(selectedCluster, finalToken.accessToken, indicator) diff --git a/src/main/kotlin/com/redhat/devtools/gateway/view/steps/auth/OpenShiftOAuthAuthenticationStrategy.kt b/src/main/kotlin/com/redhat/devtools/gateway/view/steps/auth/OpenShiftOAuthAuthenticationStrategy.kt index eeab88a4..ec4ef148 100644 --- a/src/main/kotlin/com/redhat/devtools/gateway/view/steps/auth/OpenShiftOAuthAuthenticationStrategy.kt +++ b/src/main/kotlin/com/redhat/devtools/gateway/view/steps/auth/OpenShiftOAuthAuthenticationStrategy.kt @@ -22,7 +22,12 @@ import com.redhat.devtools.gateway.auth.session.AbstractAuthSessionManager import com.redhat.devtools.gateway.auth.session.OpenShiftAuthSessionManager import com.redhat.devtools.gateway.auth.tls.TlsContext import com.redhat.devtools.gateway.openshift.Cluster -import kotlinx.coroutines.* +import com.redhat.devtools.gateway.util.withProgressCancellation +import com.redhat.devtools.gateway.util.withProgressCancelWatcher +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.currentCoroutineContext +import kotlinx.coroutines.ensureActive +import kotlinx.coroutines.withContext import javax.swing.JPanel /** @@ -70,9 +75,10 @@ class OpenShiftOAuthAuthenticationStrategy( indicator.text = "Waiting for you to complete login in your browser..." currentCoroutineContext().ensureActive() - coroutineScope { - launchCancelWatcher(indicator) { login.cancel() } - + withProgressCancelWatcher( + indicator, + cancelAction = { login.cancel() }, + ) { indicator.text = "Obtaining OpenShift access..." val osToken = login.awaitResult(AbstractAuthSessionManager.LOGIN_TIMEOUT_MS) @@ -85,15 +91,17 @@ class OpenShiftOAuthAuthenticationStrategy( ) indicator.text = "Validating cluster access..." - val client = createValidatedApiClient( - server, - certAuthority, - finalToken.accessToken, - null, - null, - tlsContext, - "Authentication failed: token received from OpenShift Authenticator is invalid or expired." - ) + val client = withProgressCancellation(indicator) { + createValidatedApiClient( + server, + certAuthority, + finalToken.accessToken, + null, + null, + tlsContext, + "Authentication failed: token received from OpenShift Authenticator is invalid or expired." + ) + } setTokenDisplay(finalToken.accessToken) saveKubeconfig(selectedCluster, finalToken.accessToken, indicator) diff --git a/src/main/kotlin/com/redhat/devtools/gateway/view/steps/auth/RedHatSSOAuthenticationStrategy.kt b/src/main/kotlin/com/redhat/devtools/gateway/view/steps/auth/RedHatSSOAuthenticationStrategy.kt index 94a91dbf..f2c33d28 100644 --- a/src/main/kotlin/com/redhat/devtools/gateway/view/steps/auth/RedHatSSOAuthenticationStrategy.kt +++ b/src/main/kotlin/com/redhat/devtools/gateway/view/steps/auth/RedHatSSOAuthenticationStrategy.kt @@ -22,7 +22,12 @@ import com.redhat.devtools.gateway.auth.session.AbstractAuthSessionManager import com.redhat.devtools.gateway.auth.session.RedHatAuthSessionManager import com.redhat.devtools.gateway.auth.tls.TlsContext import com.redhat.devtools.gateway.openshift.Cluster -import kotlinx.coroutines.* +import com.redhat.devtools.gateway.util.withProgressCancellation +import com.redhat.devtools.gateway.util.withProgressCancelWatcher +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.currentCoroutineContext +import kotlinx.coroutines.ensureActive +import kotlinx.coroutines.withContext import javax.swing.JPanel /** @@ -66,26 +71,31 @@ class RedHatSSOAuthenticationStrategy( indicator.text = "Waiting for you to complete login in your browser..." currentCoroutineContext().ensureActive() - coroutineScope { - launchCancelWatcher(indicator) { login.cancel() } - + withProgressCancelWatcher( + indicator, + cancelAction = { login.cancel() }, + ) { val ssoToken = login.awaitResult(AbstractAuthSessionManager.LOGIN_TIMEOUT_MS) indicator.text = "Obtaining OpenShift access..." val sandboxAuth = SandboxClusterAuthProvider() - val finalToken = sandboxAuth.authenticate(ssoToken) + val finalToken = withProgressCancellation(indicator) { + sandboxAuth.authenticate(ssoToken) + } indicator.text = "Validating cluster access..." try { - val client = createValidatedApiClient( - server, certAuthority, - finalToken.accessToken, - null, - null, - tlsContext, - "Authentication failed: Red Hat SSO token is invalid or unauthorized for this cluster." - ) + val client = withProgressCancellation(indicator) { + createValidatedApiClient( + server, certAuthority, + finalToken.accessToken, + null, + null, + tlsContext, + "Authentication failed: Red Hat SSO token is invalid or unauthorized for this cluster." + ) + } // Do not save SSO tokens if (finalToken.kind == AuthTokenKind.PIPELINE) { diff --git a/src/main/kotlin/com/redhat/devtools/gateway/view/steps/auth/TokenAuthenticationStrategy.kt b/src/main/kotlin/com/redhat/devtools/gateway/view/steps/auth/TokenAuthenticationStrategy.kt index 68fc6980..dfbe0b9c 100644 --- a/src/main/kotlin/com/redhat/devtools/gateway/view/steps/auth/TokenAuthenticationStrategy.kt +++ b/src/main/kotlin/com/redhat/devtools/gateway/view/steps/auth/TokenAuthenticationStrategy.kt @@ -29,6 +29,7 @@ import com.redhat.devtools.gateway.view.ui.PasteClipboardMenu import com.redhat.devtools.gateway.view.ui.PasswordFieldWithToggle import com.redhat.devtools.gateway.DevSpacesContext import com.redhat.devtools.gateway.auth.tls.TlsContext +import com.redhat.devtools.gateway.util.withProgressCancellation import com.redhat.devtools.gateway.openshift.Cluster import com.redhat.devtools.gateway.util.ClipboardTokenMonitor import javax.swing.JComponent @@ -94,15 +95,17 @@ class TokenAuthenticationStrategy( val token = String(tfToken.password) - val client = createValidatedApiClient( - server, - certAuthority, - token, - null, - null, - tlsContext, - "Authentication failed: invalid server URL or token." - ) + val client = withProgressCancellation(indicator) { + createValidatedApiClient( + server, + certAuthority, + token, + null, + null, + tlsContext, + "Authentication failed: invalid server URL or token." + ) + } saveKubeconfig.invoke(selectedCluster, token, indicator) devSpacesContext.client = client diff --git a/src/test/kotlin/com/redhat/devtools/gateway/DevSpacesConnectionReconnectTest.kt b/src/test/kotlin/com/redhat/devtools/gateway/DevSpacesConnectionReconnectTest.kt new file mode 100644 index 00000000..2e46a556 --- /dev/null +++ b/src/test/kotlin/com/redhat/devtools/gateway/DevSpacesConnectionReconnectTest.kt @@ -0,0 +1,117 @@ +/* + * Copyright (c) 2025 Red Hat, Inc. + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + * + * Contributors: + * Red Hat, Inc. - initial API and implementation + */ +package com.redhat.devtools.gateway + +import com.redhat.devtools.gateway.devworkspace.DevWorkspace +import com.redhat.devtools.gateway.devworkspace.DevWorkspaces +import com.redhat.devtools.gateway.server.RemoteIDEServer +import io.kubernetes.client.openapi.ApiClient +import io.mockk.coEvery +import io.mockk.every +import io.mockk.mockk +import io.mockk.mockkConstructor +import io.mockk.unmockkConstructor +import io.mockk.verify +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withTimeout +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import java.io.Closeable +import java.util.concurrent.atomic.AtomicBoolean +import kotlin.time.Duration.Companion.seconds + +class DevSpacesConnectionReconnectTest { + + private lateinit var devSpacesContext: DevSpacesContext + private lateinit var connection: DevSpacesConnection + private lateinit var forwarder: Closeable + private lateinit var apiClient: ApiClient + + @BeforeEach + fun beforeEach() { + val devWorkspace = mockk(relaxed = true) { + every { name } returns "test-workspace" + every { namespace } returns "test-namespace" + every { annotations } returns emptyMap() + } + apiClient = mockk(relaxed = true) + devSpacesContext = mockk(relaxed = true) { + every { this@mockk.devWorkspace } returns devWorkspace + every { client } returns apiClient + } + forwarder = mockk(relaxed = true) + connection = DevSpacesConnection(devSpacesContext) + mockkConstructor(DevWorkspaces::class) + every { anyConstructed().get(any(), any()) } returns devWorkspace + } + + @AfterEach + fun afterEach() { + unmockkConstructor(DevWorkspaces::class) + } + + @Test + fun `#onClientClosed does not tear down session when reconnecting`() { + runBlocking { + connection.onClientClosed(client = null, session = session(reconnecting = true)) + + delay(200) + + verify(exactly = 0) { devSpacesContext.removeWorkspace(any()) } + verify(exactly = 0) { forwarder.close() } + } + } + + @Test + fun `#onClientClosed tears down session when not reconnecting`() { + runBlocking { + val teardownDone = CompletableDeferred() + val remoteIdeServer = mockk(relaxed = true) { + coEvery { waitServerTerminated() } returns false + } + val session = ThinClientSessionContext( + localPort = 12_345, + remoteIdeServer = remoteIdeServer, + forwarder = forwarder, + onConnected = {}, + onDisconnected = { teardownDone.complete(Unit) }, + onDevWorkspaceStopped = {}, + checkCancelled = null, + reconnecting = AtomicBoolean(false), + ) + + connection.onClientClosed(client = null, session = session) + + withTimeout(2.seconds) { + teardownDone.await() + } + + verify(exactly = 1) { devSpacesContext.removeWorkspace(any()) } + verify(exactly = 1) { forwarder.close() } + } + } + + private fun session(reconnecting: Boolean): ThinClientSessionContext = + ThinClientSessionContext( + localPort = 12_345, + remoteIdeServer = mockk(relaxed = true), + forwarder = forwarder, + onConnected = {}, + onDisconnected = {}, + onDevWorkspaceStopped = {}, + checkCancelled = null, + reconnecting = AtomicBoolean(reconnecting), + ) +} diff --git a/src/test/kotlin/com/redhat/devtools/gateway/ForwardRecoveryProgressTest.kt b/src/test/kotlin/com/redhat/devtools/gateway/ForwardRecoveryProgressTest.kt new file mode 100644 index 00000000..ceeaeceb --- /dev/null +++ b/src/test/kotlin/com/redhat/devtools/gateway/ForwardRecoveryProgressTest.kt @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2026 Red Hat, Inc. + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + * + * Contributors: + * Red Hat, Inc. - initial API and implementation + */ +package com.redhat.devtools.gateway + +import com.redhat.devtools.gateway.server.RemoteIDEServer +import io.kubernetes.client.openapi.models.V1ObjectMeta +import io.kubernetes.client.openapi.models.V1Pod +import io.mockk.mockk +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancel +import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Test +import java.util.concurrent.atomic.AtomicBoolean +import kotlin.time.Duration.Companion.milliseconds + +class ForwardRecoveryProgressTest { + + private lateinit var scope: CoroutineScope + + @AfterEach + fun tearDown() { + scope.cancel() + } + + @Test + fun `#onPodUnavailable is suppressed while reconnecting`() = runBlocking { + scope = CoroutineScope(SupervisorJob()) + val canceled = AtomicBoolean(false) + val progress = ForwardRecoveryProgress( + scope = scope, + sessionCtx = sessionContext(reconnecting = true), + isWorkspaceRestartInProgress = { false }, + onCanceled = { canceled.set(true) }, + showAfter = 50.milliseconds, + ) + + progress.onPodUnavailable() + delay(100.milliseconds) + + assertThat(canceled).isFalse() + } + + @Test + fun `#onPodUnavailable is suppressed while workspace restart is in progress`() = runBlocking { + scope = CoroutineScope(SupervisorJob()) + val canceled = AtomicBoolean(false) + val progress = ForwardRecoveryProgress( + scope = scope, + sessionCtx = sessionContext(), + isWorkspaceRestartInProgress = { true }, + onCanceled = { canceled.set(true) }, + showAfter = 50.milliseconds, + ) + + progress.onPodUnavailable() + delay(100.milliseconds) + + assertThat(canceled).isFalse() + } + + @Test + fun `#onPodResolved cancels scheduled progress`() = runBlocking { + scope = CoroutineScope(SupervisorJob()) + val canceled = AtomicBoolean(false) + val progress = ForwardRecoveryProgress( + scope = scope, + sessionCtx = sessionContext(), + isWorkspaceRestartInProgress = { false }, + onCanceled = { canceled.set(true) }, + showAfter = 100.milliseconds, + ) + + progress.onPodUnavailable() + progress.onPodResolved() + delay(150.milliseconds) + + assertThat(canceled).isFalse() + } + + private fun sessionContext(reconnecting: Boolean = false): ThinClientSessionContext = + ThinClientSessionContext( + localPort = 42_000, + remoteIdeServer = mockk(relaxed = true), + forwarder = null, + onConnected = {}, + onDisconnected = {}, + onDevWorkspaceStopped = {}, + checkCancelled = null, + reconnecting = AtomicBoolean(reconnecting), + ) +} diff --git a/src/test/kotlin/com/redhat/devtools/gateway/PortForwardPodResolverTest.kt b/src/test/kotlin/com/redhat/devtools/gateway/PortForwardPodResolverTest.kt new file mode 100644 index 00000000..06006032 --- /dev/null +++ b/src/test/kotlin/com/redhat/devtools/gateway/PortForwardPodResolverTest.kt @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2026 Red Hat, Inc. + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + * + * Contributors: + * Red Hat, Inc. - initial API and implementation + */ +package com.redhat.devtools.gateway + +import com.redhat.devtools.gateway.server.RemoteIDEServer +import io.kubernetes.client.openapi.models.V1ObjectMeta +import io.kubernetes.client.openapi.models.V1Pod +import io.mockk.coEvery +import io.mockk.mockk +import io.mockk.spyk +import io.mockk.verify +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancel +import kotlinx.coroutines.runBlocking +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Test +import java.io.IOException +import java.util.concurrent.atomic.AtomicBoolean +import kotlin.time.Duration.Companion.milliseconds + +class PortForwardPodResolverTest { + + private val scope = CoroutineScope(SupervisorJob()) + + @AfterEach + fun tearDown() { + scope.cancel() + } + + @Test + fun `#resolve returns pod immediately when tracker is Ready`() = runBlocking { + val pod = podWithUid("pod-a", "uid-1") + val tracker = trackerReturning(PodResolution.Ready(pod)) + val recovery = spyk(forwardRecovery()) + val resolver = PortForwardPodResolver(tracker, sessionContext(), recovery) + + val result = resolver.resolve() + + assertThat(result.pod).isSameAs(pod) + verify(exactly = 1) { recovery.onPodResolved() } + } + + @Test + fun `#resolve waits for pod when tracker is Unavailable`() = runBlocking { + val pod = podWithUid("pod-a", "uid-1") + val remoteIdeServer = mockk() + var calls = 0 + coEvery { remoteIdeServer.refreshPod() } answers { + if (++calls < 2) throw IOException("not running") else pod + } + val tracker = WorkspacePodTracker(remoteIdeServer) + tracker.seed(podWithUid("pod-a", "uid-1")) + val recovery = spyk(forwardRecovery()) + val resolver = PortForwardPodResolver(tracker, sessionContext(), recovery) + + val result = resolver.resolve() + + assertThat(result.pod).isSameAs(pod) + verify(atLeast = 1) { recovery.onPodUnavailable() } + verify(atLeast = 1) { recovery.onPodResolved() } + } + + @Test + fun `#resolve returns rolled pod immediately without waiting for reconnect`() = runBlocking { + val pod = podWithUid("pod-b", "uid-2") + val reconnecting = AtomicBoolean(true) + val tracker = trackerReturning(PodResolution.RollDelegated(pod)) + val recovery = spyk(forwardRecovery()) + val resolver = PortForwardPodResolver(tracker, sessionContext(reconnecting), recovery) + + val result = resolver.resolve() + + assertThat(result.pod).isSameAs(pod) + assertThat(reconnecting.get()).isTrue() + verify(exactly = 1) { recovery.dismiss() } + verify(exactly = 0) { recovery.onPodResolved() } + } + + private fun trackerReturning(vararg outcomes: PodResolution): WorkspacePodTracker { + val remoteIdeServer = mockk(relaxed = true) + val tracker = spyk(WorkspacePodTracker(remoteIdeServer)) + var index = 0 + coEvery { tracker.resolvePod() } answers { + outcomes[minOf(index++, outcomes.size - 1)] + } + return tracker + } + + private fun forwardRecovery(): ForwardRecoveryProgress = + ForwardRecoveryProgress( + scope = scope, + sessionCtx = sessionContext(), + isWorkspaceRestartInProgress = { false }, + onCanceled = {}, + showAfter = 60_000.milliseconds, + ) + + private fun sessionContext(reconnecting: AtomicBoolean = AtomicBoolean(false)): ThinClientSessionContext = + ThinClientSessionContext( + localPort = 42_000, + remoteIdeServer = mockk(relaxed = true), + forwarder = null, + onConnected = {}, + onDisconnected = {}, + onDevWorkspaceStopped = {}, + checkCancelled = null, + reconnecting = reconnecting, + ) + + private fun podWithUid(name: String, uid: String): V1Pod = + V1Pod().metadata(V1ObjectMeta().name(name).uid(uid)) +} diff --git a/src/test/kotlin/com/redhat/devtools/gateway/ThinClientReconnectTest.kt b/src/test/kotlin/com/redhat/devtools/gateway/ThinClientReconnectTest.kt new file mode 100644 index 00000000..7e72b71c --- /dev/null +++ b/src/test/kotlin/com/redhat/devtools/gateway/ThinClientReconnectTest.kt @@ -0,0 +1,510 @@ +/* + * Copyright (c) 2025 Red Hat, Inc. + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + * + * Contributors: + * Red Hat, Inc. - initial API and implementation + */ +package com.redhat.devtools.gateway + +import com.intellij.openapi.progress.ProgressIndicator +import com.intellij.openapi.progress.ProgressManager +import com.intellij.openapi.progress.ProcessCanceledException +import com.jetbrains.gateway.thinClientLink.ThinClientHandle +import com.redhat.devtools.gateway.server.RemoteIDEServer +import io.kubernetes.client.openapi.models.V1ObjectMeta +import io.kubernetes.client.openapi.models.V1Pod +import io.mockk.coEvery +import io.mockk.coVerify +import io.mockk.every +import io.mockk.just +import io.mockk.mockk +import io.mockk.runs +import io.mockk.mockkStatic +import io.mockk.unmockkStatic +import io.mockk.verify +import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withTimeout +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.BeforeEach +import io.kubernetes.client.openapi.ApiException +import java.io.IOException +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.TimeoutCancellationException +import kotlin.time.Duration +import java.util.concurrent.atomic.AtomicBoolean + +class ThinClientReconnectTest { + + private lateinit var remoteIdeServer: RemoteIDEServer + private lateinit var oldClient: ThinClientHandle + private lateinit var newClient: ThinClientHandle + private lateinit var sessionCtx: ThinClientSessionContext + private lateinit var reconnecting: AtomicBoolean + + @BeforeEach + fun beforeEach() { + remoteIdeServer = mockk() + oldClient = mockk(relaxed = true) + newClient = mockk(relaxed = true) + reconnecting = AtomicBoolean(false) + sessionCtx = createSessionContext() + every { remoteIdeServer.refreshPod() } returns podNamed("new-pod") + } + + @Test + fun `#onPodRoll fetches new joinLink and calls startThinClient`() { + runBlocking { + stubServerReady(joinLink = "jetbrains://gateway?fp=NEW") + var receivedJoinLink: String? = null + + val thinClientReconnect = createThinClientReconnect( + getCurrentClient = { oldClient }, + onClientHandleReplaced = {}, + onClientClosed = { _, _ -> }, + startThinClient = { joinLink, _ -> + receivedJoinLink = joinLink + newClient + }, + ) + + thinClientReconnect.onPodRoll(podNamed("new-pod")) + + coVerify(exactly = 1) { remoteIdeServer.refreshPod() } + coVerify(exactly = 1) { + remoteIdeServer.awaitJoinLink(checkCancelled = any()) + } + assertThat(receivedJoinLink).isEqualTo("jetbrains://gateway?fp=NEW") + assertThat(reconnecting.get()).isFalse() + } + } + + @Test + fun `#onPodRoll closes existing thin client before reconnect`() { + runBlocking { + stubServerReady(joinLink = "jetbrains://gateway?fp=NEW") + val closeOrder = mutableListOf() + + val thinClientReconnect = createThinClientReconnect( + getCurrentClient = { oldClient }, + onClientHandleReplaced = { closeOrder.add("replaced") }, + onClientClosed = { _, _ -> }, + startThinClient = { _, _ -> + closeOrder.add("started") + newClient + }, + ) + + thinClientReconnect.onPodRoll(podNamed("new-pod")) + + verify { oldClient.close() } + assertThat(closeOrder).containsExactly("started", "replaced") + assertThat(reconnecting.get()).isFalse() + } + } + + @Test + fun `#onPodRoll skips duplicate work when reconnecting flag is set`() { + runBlocking { + reconnecting.set(true) + var startCalled = false + + val thinClientReconnect = createThinClientReconnect( + getCurrentClient = { oldClient }, + onClientHandleReplaced = {}, + onClientClosed = { _, _ -> }, + startThinClient = { _, _ -> + startCalled = true + newClient + }, + ) + + thinClientReconnect.onPodRoll(podNamed("new-pod")) + + verify(exactly = 0) { oldClient.close() } + assertThat(startCalled).isFalse() + } + } + + @Test + fun `#onPodRoll tears down session when transient retries exhausted`() { + runBlocking { + coEvery { remoteIdeServer.awaitJoinLink(checkCancelled = any()) } throws + IOException("server unavailable") + + var clientClosedCalled = false + val thinClientReconnect = createThinClientReconnect( + getCurrentClient = { oldClient }, + onClientHandleReplaced = {}, + onClientClosed = { _, _ -> clientClosedCalled = true }, + startThinClient = { _, _ -> newClient }, + ) + + thinClientReconnect.onPodRoll(podNamed("new-pod")) + + verify { oldClient.close() } + assertThat(clientClosedCalled).isTrue() + assertThat(reconnecting.get()).isFalse() + } + } + + @Test + fun `#onPodRoll recovers from transient failure on retry`() { + runBlocking { + val attemptCount = mutableListOf() + coEvery { remoteIdeServer.awaitJoinLink(checkCancelled = any()) } answers { + attemptCount.add(attemptCount.size + 1) + if (attemptCount.size < 2) throw IOException("not ready yet") + "jetbrains://gateway?fp=RECOVERED" + } + + var clientClosedCalled = false + val thinClientReconnect = createThinClientReconnect( + getCurrentClient = { oldClient }, + onClientHandleReplaced = {}, + onClientClosed = { _, _ -> clientClosedCalled = true }, + startThinClient = { _, _ -> newClient }, + ) + + thinClientReconnect.onPodRoll(podNamed("new-pod")) + + assertThat(attemptCount).hasSize(2) + verify { oldClient.close() } + assertThat(clientClosedCalled).isFalse() + assertThat(reconnecting.get()).isFalse() + } + } + + @Test + fun `#onPodRoll recovers from TimeoutCancellationException on retry`() { + runBlocking { + val attemptCount = mutableListOf() + coEvery { remoteIdeServer.awaitJoinLink(checkCancelled = any()) } answers { + attemptCount.add(attemptCount.size + 1) + if (attemptCount.size < 2) throw timeoutCancellationException() + "jetbrains://gateway?fp=RECOVERED" + } + + var clientClosedCalled = false + val thinClientReconnect = createThinClientReconnect( + getCurrentClient = { oldClient }, + onClientHandleReplaced = {}, + onClientClosed = { _, _ -> clientClosedCalled = true }, + startThinClient = { _, _ -> newClient }, + ) + + thinClientReconnect.onPodRoll(podNamed("new-pod")) + + assertThat(attemptCount).hasSize(2) + verify { oldClient.close() } + assertThat(clientClosedCalled).isFalse() + assertThat(reconnecting.get()).isFalse() + } + } + + @Test + fun `#onPodRoll tears down session when timeout retries exhausted`() { + runBlocking { + coEvery { remoteIdeServer.awaitJoinLink(checkCancelled = any()) } throws + timeoutCancellationException() + + var clientClosedCalled = false + val thinClientReconnect = createThinClientReconnect( + getCurrentClient = { oldClient }, + onClientHandleReplaced = {}, + onClientClosed = { _, _ -> clientClosedCalled = true }, + startThinClient = { _, _ -> newClient }, + ) + + thinClientReconnect.onPodRoll(podNamed("new-pod")) + + verify { oldClient.close() } + assertThat(clientClosedCalled).isTrue() + assertThat(reconnecting.get()).isFalse() + } + } + + @Test + fun `#onPodRoll tears down session on permanent failure with no retry`() { + runBlocking { + coEvery { remoteIdeServer.awaitJoinLink(checkCancelled = any()) } throws + IOException("no join link") + + var clientClosedCalled = false + val thinClientReconnect = createThinClientReconnect( + getCurrentClient = { oldClient }, + onClientHandleReplaced = {}, + onClientClosed = { _, _ -> clientClosedCalled = true }, + startThinClient = { _, _ -> newClient }, + ) + + thinClientReconnect.onPodRoll(podNamed("new-pod")) + + verify { oldClient.close() } + coVerify(exactly = 1) { remoteIdeServer.awaitJoinLink(checkCancelled = any()) } + assertThat(clientClosedCalled).isTrue() + assertThat(reconnecting.get()).isFalse() + } + } + + @Test + fun `#onPodRoll treats startThinClient failure as permanent and tears down`() { + runBlocking { + stubServerReady(joinLink = "jetbrains://gateway?fp=IRRELEVANT") + + var clientClosedCalled = false + val thinClientReconnect = createThinClientReconnect( + getCurrentClient = { oldClient }, + onClientHandleReplaced = {}, + onClientClosed = { _, _ -> clientClosedCalled = true }, + startThinClient = { _, _ -> throw IllegalStateException("Could not connect, workspace IDE is not ready.") }, + ) + + thinClientReconnect.onPodRoll(podNamed("new-pod")) + + verify { oldClient.close() } + coVerify(exactly = 1) { remoteIdeServer.awaitJoinLink(checkCancelled = any()) } + assertThat(clientClosedCalled).isTrue() + assertThat(reconnecting.get()).isFalse() + } + } + + @Test + fun `#onPodRoll treats ApiException as permanent and tears down`() { + runBlocking { + coEvery { remoteIdeServer.awaitJoinLink(checkCancelled = any()) } throws + ApiException(403, "Forbidden") + + var clientClosedCalled = false + val thinClientReconnect = createThinClientReconnect( + getCurrentClient = { oldClient }, + onClientHandleReplaced = {}, + onClientClosed = { _, _ -> clientClosedCalled = true }, + startThinClient = { _, _ -> newClient }, + ) + + thinClientReconnect.onPodRoll(podNamed("new-pod")) + + verify { oldClient.close() } + coVerify(exactly = 1) { remoteIdeServer.awaitJoinLink(checkCancelled = any()) } + assertThat(clientClosedCalled).isTrue() + assertThat(reconnecting.get()).isFalse() + } + } + + @Test + fun `#onPodRoll clears reconnecting flag before rethrowing cancellation exception`() { + runBlocking { + coEvery { remoteIdeServer.awaitJoinLink(checkCancelled = any()) } throws + CancellationException("cancelled") + + var clientClosedCalled = false + val thinClientReconnect = createThinClientReconnect( + getCurrentClient = { oldClient }, + onClientHandleReplaced = {}, + onClientClosed = { _, _ -> clientClosedCalled = true }, + startThinClient = { _, _ -> newClient }, + ) + + var caughtCancellation: CancellationException? = null + try { + thinClientReconnect.onPodRoll(podNamed("new-pod")) + } catch (e: CancellationException) { + caughtCancellation = e + } + + assertThat(caughtCancellation).isNotNull + verify { oldClient.close() } + assertThat(clientClosedCalled).isTrue() + assertThat(reconnecting.get()).isFalse() + } + } + + @Test + fun `#onPodRoll updates progress indicator at each reconnect phase`() { + runBlocking { + stubServerReady(joinLink = "jetbrains://gateway?fp=NEW") + val indicator = mockk(relaxed = true) + stubActiveProgressIndicator(indicator) + + val thinClientReconnect = createThinClientReconnect( + getCurrentClient = { oldClient }, + onClientHandleReplaced = {}, + onClientClosed = { _, _ -> }, + startThinClient = { _, _ -> newClient }, + ) + + thinClientReconnect.onPodRoll(podNamed("new-pod"), indicator) + + verify { indicator.fraction = 0.0 } + verify { indicator.text = "Closing IDE connection..." } + verify { indicator.fraction = 0.5 } + verify { indicator.text = "Waiting for IDE to be ready..." } + verify { indicator.text2 = "Attempt 1/3" } + verify { indicator.fraction = 1.0 } + verify { indicator.text = "Connecting to IDE..." } + verify { indicator.text2 = null } + } + } + + @Test + fun `#execute runs Task Backgroundable and passes indicator to onPodRoll`() { + runBlocking { + stubServerReady(joinLink = "jetbrains://gateway?fp=NEW") + val indicator = mockk(relaxed = true) + stubActiveProgressIndicator(indicator) + + val thinClientReconnect = createThinClientReconnect( + getCurrentClient = { oldClient }, + onClientHandleReplaced = {}, + onClientClosed = { _, _ -> }, + startThinClient = { _, _ -> newClient }, + ) + + runExecuteWithProgressMock(indicator) { + thinClientReconnect.execute(podNamed("new-pod")) + } + + verify { indicator.fraction = 0.0 } + verify { indicator.text = "Closing IDE connection..." } + verify { indicator.fraction = 1.0 } + verify { indicator.text = "Connecting to IDE..." } + assertThat(reconnecting.get()).isFalse() + } + } + + @Test + fun `#onPodRoll tears down session when progress is canceled`() { + runBlocking { + val indicator = mockk(relaxed = true) + every { indicator.isRunning } returns true + every { indicator.isCanceled } returnsMany listOf(false, true) + + var clientClosedCalled = false + val thinClientReconnect = createThinClientReconnect( + getCurrentClient = { oldClient }, + onClientHandleReplaced = {}, + onClientClosed = { _, _ -> clientClosedCalled = true }, + startThinClient = { _, _ -> newClient }, + ) + + var caught: ProcessCanceledException? = null + try { + thinClientReconnect.onPodRoll(podNamed("new-pod"), indicator) + } catch (e: ProcessCanceledException) { + caught = e + } + + assertThat(caught).isNotNull + verify { oldClient.close() } + verify { indicator.text2 = null } + assertThat(clientClosedCalled).isTrue() + assertThat(reconnecting.get()).isFalse() + } + } + + @Test + fun `#onPodRoll tears down session when canceled during retry delay`() { + runBlocking { + var cancelOnNextProgressCheck = false + val indicator = mockk(relaxed = true) + every { indicator.isRunning } returns true + every { indicator.isCanceled } answers { cancelOnNextProgressCheck } + + coEvery { remoteIdeServer.awaitJoinLink(checkCancelled = any()) } answers { + cancelOnNextProgressCheck = true + throw IOException("not ready yet") + } + + var clientClosedCalled = false + val thinClientReconnect = createThinClientReconnect( + getCurrentClient = { oldClient }, + onClientHandleReplaced = {}, + onClientClosed = { _, _ -> clientClosedCalled = true }, + startThinClient = { _, _ -> newClient }, + ) + + var caught: ProcessCanceledException? = null + try { + thinClientReconnect.onPodRoll(podNamed("new-pod"), indicator) + } catch (e: ProcessCanceledException) { + caught = e + } + + assertThat(caught).isNotNull + verify { oldClient.close() } + verify { indicator.text2 = "Attempt 1/3" } + verify { indicator.text2 = null } + assertThat(clientClosedCalled).isTrue() + assertThat(reconnecting.get()).isFalse() + } + } + + private fun stubActiveProgressIndicator(indicator: ProgressIndicator) { + every { indicator.isRunning } returns true + every { indicator.isCanceled } returns false + } + + private fun createSessionContext(): ThinClientSessionContext = + ThinClientSessionContext( + localPort = 12_345, + remoteIdeServer = remoteIdeServer, + forwarder = mockk(relaxed = true), + onConnected = {}, + onDisconnected = {}, + onDevWorkspaceStopped = {}, + checkCancelled = null, + reconnecting = reconnecting, + ) + + private fun stubServerReady(joinLink: String) { + coEvery { remoteIdeServer.awaitJoinLink(checkCancelled = any()) } returns joinLink + } + + private fun createThinClientReconnect( + getCurrentClient: () -> ThinClientHandle?, + onClientHandleReplaced: (ThinClientHandle) -> Unit, + onClientClosed: (ThinClientHandle?, ThinClientSessionContext) -> Unit, + startThinClient: suspend (String, ThinClientSessionContext) -> ThinClientHandle, + ): ThinClientReconnect = + ThinClientReconnect( + remoteIdeServer = remoteIdeServer, + sessionCtx = sessionCtx, + getCurrentClient = getCurrentClient, + startThinClient = startThinClient, + onClientHandleReplaced = onClientHandleReplaced, + onClientClosed = onClientClosed, + ) + + private fun podNamed(name: String): V1Pod = + V1Pod().metadata(V1ObjectMeta().name(name).uid("uid-$name")) + + private fun timeoutCancellationException(): TimeoutCancellationException = runBlocking { + try { + withTimeout(0L) { delay(Long.MAX_VALUE) } + error("unreachable") + } catch (e: TimeoutCancellationException) { + e + } + } + + private fun runExecuteWithProgressMock(indicator: ProgressIndicator, body: () -> Unit) { + mockkStatic(ProgressManager::class) + try { + val pm = mockk() + every { ProgressManager.getInstance() } returns pm + every { pm.run(any()) } answers { + firstArg().run(indicator) + } + body() + } finally { + unmockkStatic(ProgressManager::class) + } + } +} diff --git a/src/test/kotlin/com/redhat/devtools/gateway/WorkspacePodTrackerTest.kt b/src/test/kotlin/com/redhat/devtools/gateway/WorkspacePodTrackerTest.kt new file mode 100644 index 00000000..fb0e7803 --- /dev/null +++ b/src/test/kotlin/com/redhat/devtools/gateway/WorkspacePodTrackerTest.kt @@ -0,0 +1,153 @@ +/* + * Copyright (c) 2025 Red Hat, Inc. + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + * + * Contributors: + * Red Hat, Inc. - initial API and implementation + */ +package com.redhat.devtools.gateway + +import com.redhat.devtools.gateway.server.RemoteIDEServer +import io.kubernetes.client.openapi.models.V1ObjectMeta +import io.kubernetes.client.openapi.models.V1Pod +import io.mockk.coEvery +import io.mockk.mockk +import kotlinx.coroutines.runBlocking +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import java.io.IOException +import java.util.concurrent.atomic.AtomicInteger + +class WorkspacePodTrackerTest { + + private lateinit var remoteIdeServer: RemoteIDEServer + private lateinit var tracker: WorkspacePodTracker + + @BeforeEach + fun beforeEach() { + remoteIdeServer = mockk() + tracker = WorkspacePodTracker(remoteIdeServer) + } + + @Test + fun `#seed and first resolvePod with same UID does not fire onPodRoll`() { + runBlocking { + val pod = podWithUid("pod-a", "uid-1") + tracker.seed(pod) + coEvery { remoteIdeServer.refreshPod() } returns pod + + val rollCount = AtomicInteger(0) + tracker.onPodRoll = { rollCount.incrementAndGet() } + + val result = tracker.resolvePod() + + assertThat(result).isInstanceOf(PodResolution.Ready::class.java) + assertThat(rollCount.get()).isZero() + } + } + + @Test + fun `#resolvePod fires onPodRoll when UID changes`() { + runBlocking { + val podA = podWithUid("pod-a", "uid-1") + val podB = podWithUid("pod-b", "uid-2") + tracker.seed(podA) + coEvery { remoteIdeServer.refreshPod() } returns podB + + var rolledPod: V1Pod? = null + tracker.onPodRoll = { rolledPod = it } + + val result = tracker.resolvePod() + + assertThat(rolledPod).isSameAs(podB) + assertThat(result).isEqualTo(PodResolution.RollDelegated(podB)) + } + } + + @Test + fun `#resolvePod returns Unavailable and does not fire onPodRoll when refreshPod throws`() { + runBlocking { + tracker.seed(podWithUid("pod-a", "uid-1")) + coEvery { remoteIdeServer.refreshPod() } throws IOException("not running") + + val rollCount = AtomicInteger(0) + tracker.onPodRoll = { rollCount.incrementAndGet() } + + val result = tracker.resolvePod() + + assertThat(result).isEqualTo(PodResolution.Unavailable) + assertThat(rollCount.get()).isZero() + } + } + + @Test + fun `#resolvePod uses pod name as fallback when uid is null`() { + runBlocking { + val podA = podWithNameOnly("pod-a") + val podB = podWithNameOnly("pod-b") + tracker.seed(podA) + coEvery { remoteIdeServer.refreshPod() } returns podB + + var rolled = false + tracker.onPodRoll = { rolled = true } + + tracker.resolvePod() + + assertThat(rolled).isTrue() + } + } + + @Test + fun `#resolvePod does not fire onPodRoll when workspace restart is in progress`() { + runBlocking { + val podA = podWithUid("pod-a", "uid-1") + val podB = podWithUid("pod-b", "uid-2") + tracker.seed(podA) + coEvery { remoteIdeServer.refreshPod() } returns podB + + val rollCount = AtomicInteger(0) + val trackingTracker = WorkspacePodTracker( + remoteIdeServer, + isWorkspaceRestartInProgress = { true }, + ) + trackingTracker.seed(podA) + trackingTracker.onPodRoll = { rollCount.incrementAndGet() } + + val result = trackingTracker.resolvePod() + + assertThat(result).isEqualTo(PodResolution.RestartSuppressed) + assertThat(rollCount.get()).isZero() + } + } + + @Test + fun `#resolvePod does not fire onPodRoll twice during re-entrant resolvePod`() { + runBlocking { + val podA = podWithUid("pod-a", "uid-1") + val podB = podWithUid("pod-b", "uid-2") + tracker.seed(podA) + coEvery { remoteIdeServer.refreshPod() } returns podB + + val rollCount = AtomicInteger(0) + tracker.onPodRoll = { + rollCount.incrementAndGet() + tracker.resolvePod() + } + + tracker.resolvePod() + + assertThat(rollCount.get()).isEqualTo(1) + } + } + + private fun podWithUid(name: String, uid: String): V1Pod = + V1Pod().metadata(V1ObjectMeta().name(name).uid(uid)) + + private fun podWithNameOnly(name: String): V1Pod = + V1Pod().metadata(V1ObjectMeta().name(name)) +} diff --git a/src/test/kotlin/com/redhat/devtools/gateway/devworkspace/DevWorkspaceRestartTest.kt b/src/test/kotlin/com/redhat/devtools/gateway/devworkspace/DevWorkspaceRestartTest.kt index e27fe702..bdd2fede 100644 --- a/src/test/kotlin/com/redhat/devtools/gateway/devworkspace/DevWorkspaceRestartTest.kt +++ b/src/test/kotlin/com/redhat/devtools/gateway/devworkspace/DevWorkspaceRestartTest.kt @@ -52,6 +52,7 @@ class DevWorkspaceRestartTest { pods = mockk(relaxed = true) thinClient = mockk(relaxed = true) indicator = mockk(relaxed = true) + every { indicator.isRunning } returns true remoteIDEServer = mockk(relaxed = true) devSpacesConnection = mockk(relaxed = true) @@ -66,7 +67,7 @@ class DevWorkspaceRestartTest { // Default: no pods remaining every { pods.list(any(), any()) } returns V1PodList().items(emptyList()) // Default: IDE ready and connection succeed (`just Awaits` never completes — hangs runTest) - coJustRun { remoteIDEServer.waitServerReady() } + coJustRun { remoteIDEServer.awaitJoinLink(pod = any(), checkCancelled = any()) } coEvery { devSpacesConnection.connect(any(), any(), any(), any(), any(), any()) } returns mockk(relaxed = true) } @@ -178,7 +179,7 @@ class DevWorkspaceRestartTest { // when restart.restart(thinClient, indicator) // then - coVerify { remoteIDEServer.waitServerReady() } + coVerify { remoteIDEServer.awaitJoinLink(pod = any(), checkCancelled = any()) } } @Test @@ -195,7 +196,7 @@ class DevWorkspaceRestartTest { restart.restart(thinClient, indicator) // then coVerifyOrder { - remoteIDEServer.waitServerReady() + remoteIDEServer.awaitJoinLink(pod = any(), checkCancelled = any()) devSpacesConnection.connect(any(), any(), any(), any(), any(), any()) } } @@ -240,7 +241,7 @@ class DevWorkspaceRestartTest { workspaces.startAndWait(namespace, workspaceName, any()) } coVerifyOrder { - remoteIDEServer.waitServerReady() + remoteIDEServer.awaitJoinLink(pod = any(), checkCancelled = any()) devSpacesConnection.connect(any(), any(), any(), any(), any(), any()) } } @@ -285,7 +286,7 @@ class DevWorkspaceRestartTest { fun `#doRestart fails when IDE ready fails`() = runTest { // given val exception = RuntimeException("IDE not ready") - coEvery { remoteIDEServer.waitServerReady() } throws exception + coEvery { remoteIDEServer.awaitJoinLink(pod = any(), checkCancelled = any()) } throws exception // when val result = runCatching { restart.restart(thinClient, indicator) } // then diff --git a/src/test/kotlin/com/redhat/devtools/gateway/openshift/DevWorkspacePodsTest.kt b/src/test/kotlin/com/redhat/devtools/gateway/openshift/DevWorkspacePodsTest.kt index 7f48f2b2..1417ae9a 100644 --- a/src/test/kotlin/com/redhat/devtools/gateway/openshift/DevWorkspacePodsTest.kt +++ b/src/test/kotlin/com/redhat/devtools/gateway/openshift/DevWorkspacePodsTest.kt @@ -11,6 +11,7 @@ */ package com.redhat.devtools.gateway.openshift +import com.redhat.devtools.gateway.openshift.PodForwardResolution import io.kubernetes.client.PortForward import io.kubernetes.client.openapi.ApiClient import io.kubernetes.client.openapi.ApiException @@ -18,16 +19,21 @@ import io.kubernetes.client.openapi.apis.CoreV1Api import io.kubernetes.client.openapi.models.V1ListMeta import io.kubernetes.client.openapi.models.V1ObjectMeta import io.kubernetes.client.openapi.models.V1Pod +import io.kubernetes.client.openapi.models.V1PodCondition import io.kubernetes.client.openapi.models.V1PodList +import io.kubernetes.client.openapi.models.V1PodStatus import io.mockk.every import io.mockk.mockk import io.mockk.mockkConstructor import io.mockk.slot import io.mockk.unmockkConstructor import io.mockk.verify +import java.time.OffsetDateTime import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking +import java.util.concurrent.atomic.AtomicInteger import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatThrownBy import org.junit.jupiter.api.AfterEach @@ -35,6 +41,7 @@ import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import java.io.ByteArrayInputStream import java.io.ByteArrayOutputStream +import java.io.Closeable import java.io.IOException import java.net.ServerSocket import java.net.Socket @@ -76,9 +83,13 @@ class DevWorkspacePodsTest { fun `#forward copies from server to client`() { // given val portForwardResult = mockk(relaxed = true) + val connectionReady = CompletableDeferred() every { anyConstructed().forward(pod, listOf(remotePort)) - } returns portForwardResult + } answers { + connectionReady.complete(Unit) + portForwardResult + } val serverIn = ByteArrayInputStream(serverData.toByteArray()) every { portForwardResult.getInputStream(remotePort) @@ -89,44 +100,232 @@ class DevWorkspacePodsTest { } returns serverOut // when - val closeable = pods.forward(pod, localPort, remotePort) - - // then - // wait for the server to start - runBlocking { delay(100.milliseconds) } - - closeable.use { closeable -> - // Verify that data from server input stream is received by client - val bytesRead = sendClientData("ping") // Send data to trigger server response + val closeable = forwardPod() + + // then — keep socket open for data exchange, then close + runBlocking { + val socket = Socket("127.0.0.1", localPort) + connectionReady.await() + socket.outputStream.write("ping".toByteArray()) + socket.outputStream.flush() + val bytesRead = socket.inputStream.read(buffer) assertThat(String(buffer, 0, bytesRead)).isEqualTo(serverData) + socket.close() } + + closeable.use {} } @Test fun `#forward tries several times if connecting fails`() { // given + val forwardCount = AtomicInteger(0) + val handlerFinished = CompletableDeferred() every { anyConstructed().forward(pod, listOf(remotePort)) - } throws mockk(relaxed = true) + } answers { + forwardCount.incrementAndGet() + if (forwardCount.get() >= 2) handlerFinished.complete(Unit) + throw IOException("unavailable") + } // when - val closeable = pods.forward(pod, localPort, remotePort) + val closeable = pods.forward( + resolvePod = { + if (forwardCount.get() >= 2) throw SignalExit() else PodForwardResolution(pod) + }, + localPort = localPort, + remotePort = remotePort, + reconnectTimeoutSeconds = 30, + reconnectDelaySeconds = 1, + ) + + runBlocking { + delay(100.milliseconds) + Socket("127.0.0.1", localPort).use { it.close() } + handlerFinished.await() + } // then - // wait for the server to start - runBlocking { delay(100.milliseconds) } - Socket("127.0.0.1", localPort).apply { - close() // trigger retry + closeable.use { + verify(atLeast = 2) { + anyConstructed().forward(pod, listOf(remotePort)) + } } - runBlocking { delay(6000.milliseconds) } // 5 attempts * 1 second + } - closeable.use { closeable -> - verify(atLeast = 2) { // 2+ retries - anyConstructed().forward(pod, listOf(remotePort)) + @Test + fun `#forward reconnects to new pod after stream failure`() { + // given + val podA = podNamed("pod-a") + val podB = podNamed("pod-b") + var resolveCall = 0 + val connectionReady = CompletableDeferred() + val portForwardResult = mockk(relaxed = true) + every { + portForwardResult.getInputStream(remotePort) + } answers { + connectionReady.complete(Unit) // signal that successful forward is established + ByteArrayInputStream(serverData.toByteArray()) + } + every { + portForwardResult.getOutboundStream(remotePort) + } returns ByteArrayOutputStream() + every { + anyConstructed().forward(any(), listOf(remotePort)) + } answers { + when ((args[0] as V1Pod).metadata?.name) { + "pod-a" -> throw IOException("pod unavailable") + else -> portForwardResult + } + } + + // when — cap resolveCall so all connections after first get podB + val closeable = pods.forward( + resolvePod = { + when (resolveCall++) { + 0 -> PodForwardResolution(podA) + else -> PodForwardResolution(podB) + } + }, + localPort = localPort, + remotePort = remotePort, + reconnectTimeoutSeconds = 10, + reconnectDelaySeconds = 1, + ) + + // Connect client, wait for forward to succeed, verify data flow, then close + runBlocking { + val socket = Socket("127.0.0.1", localPort) + connectionReady.await() // waits until forward succeeds (podB) + // Send data and read response + socket.outputStream.write("ping".toByteArray()) + socket.outputStream.flush() + val bytesRead = socket.inputStream.read(buffer) + assertThat(String(buffer, 0, bytesRead)).isEqualTo(serverData) + socket.close() + } + + // then + closeable.use { + verify(atLeast = 2) { + anyConstructed().forward(any(), listOf(remotePort)) } } } + @Test + fun `#forward stops retrying after reconnectTimeoutSeconds`() { + // given — forward mock signals after 2 calls; fetchPod exits on signal + val forwardCount = AtomicInteger(0) + val stopRetry = CompletableDeferred() + val handlerFinished = CompletableDeferred() + + every { + anyConstructed().forward(any(), listOf(remotePort)) + } answers { + val count = forwardCount.incrementAndGet() + if (count >= 2 && !handlerFinished.isCompleted) handlerFinished.complete(Unit) + throw IOException("unavailable") + } + + val closeable = pods.forward( + resolvePod = { + if (stopRetry.isCompleted) throw SignalExit() else PodForwardResolution(pod) + }, + localPort = localPort, + remotePort = remotePort, + reconnectTimeoutSeconds = 4, + reconnectDelaySeconds = 1, + ) + + runBlocking { + delay(100.milliseconds) + Socket("127.0.0.1", localPort).use { it.close() } + handlerFinished.await() // wait for 2 forward calls + stopRetry.complete(Unit) // signal fetchPod to exit on next retry + delay(2500.milliseconds) // let retry loop process the signal (delay + fetchPod) + } + + // then — retries bounded by timeout (2 attempts with 4s timeout / 2s delay) + closeable.use { + assertThat(forwardCount.get()).isLessThanOrEqualTo(2) + verify(atMost = 2) { + anyConstructed().forward(any(), listOf(remotePort)) + } + } + } + + @Test + fun `#forward stops retrying when client disconnects`() { + // given — signal after first forward call, then fetchPod exits on next retry + val firstForwardDone = CompletableDeferred() + every { + anyConstructed().forward(any(), listOf(remotePort)) + } answers { + if (!firstForwardDone.isCompleted) firstForwardDone.complete(Unit) + throw IOException("unavailable") + } + + val closeable = pods.forward( + resolvePod = { + if (firstForwardDone.isCompleted) throw SignalExit() else PodForwardResolution(pod) + }, + localPort = localPort, + remotePort = remotePort, + reconnectTimeoutSeconds = 30, + reconnectDelaySeconds = 1, + ) + + runBlocking { + Socket("127.0.0.1", localPort).use { it.close() } + firstForwardDone.await() // wait for first forward call + delay(3000.milliseconds) // let retry loop process signal (delay + fetchPod) + } + + // then + closeable.use { + verify(exactly = 1) { + anyConstructed().forward(any(), listOf(remotePort)) + } + } + } + + @Test + fun `#forward uses retry delay from PodForwardResolution when pod is unavailable`() { + val resolveCount = AtomicInteger(0) + val closeable = pods.forward( + resolvePod = { + resolveCount.incrementAndGet() + PodForwardResolution(null, retryDelaySeconds = 1) + }, + localPort = localPort, + remotePort = remotePort, + reconnectTimeoutSeconds = 3, + reconnectDelaySeconds = 5, + ) + + runBlocking { + delay(100.milliseconds) + Socket("127.0.0.1", localPort).use { it.close() } + delay(2500.milliseconds) + } + + closeable.use { + assertThat(resolveCount.get()).isGreaterThanOrEqualTo(2) + } + } + + private fun forwardPod(reconnectTimeoutSeconds: Long = 30L): Closeable = + pods.forward({ PodForwardResolution(pod) }, localPort, remotePort, reconnectTimeoutSeconds, reconnectDelaySeconds = 1) + + /** Exception used by tests to signal the retry loop to exit deterministically. */ + private class SignalExit : RuntimeException() + + private fun podNamed(name: String): V1Pod = V1Pod().apply { + metadata = V1ObjectMeta().apply { this.name = name } + } + private fun sendClientData(data: String): Int { Socket("127.0.0.1", localPort).use { // client to server @@ -140,6 +339,22 @@ class DevWorkspacePodsTest { } } + private fun runningReadyPod(name: String, creationTimestamp: String): V1Pod = V1Pod().apply { + metadata = V1ObjectMeta().apply { + this.name = name + this.creationTimestamp = OffsetDateTime.parse(creationTimestamp) + } + status = V1PodStatus().apply { + phase = "Running" + conditions = listOf( + V1PodCondition().apply { + type = "Ready" + status = "True" + } + ) + } + } + private fun findFreePort(): Int { return ServerSocket(0).use { it.localPort } } @@ -375,4 +590,77 @@ class DevWorkspacePodsTest { assertThat(callCount).isGreaterThanOrEqualTo(3) // Had pods, error, then success unmockkConstructor(CoreV1Api::class) } + + @Test + fun `#findFirstRunning returns newest pod when multiple are running and ready`() { + // given + mockkConstructor(CoreV1Api::class) + val oldPod = runningReadyPod("pod-old", "2024-01-01T00:00:00Z") + val newPod = runningReadyPod("pod-new", "2024-01-02T00:00:00Z") + every { + anyConstructed().listNamespacedPod(any()) + } returns mockk { + every { labelSelector(any()) } returns mockk { + every { execute() } returns V1PodList().apply { items = listOf(oldPod, newPod) } + } + } + + // when + val result = pods.findFirstRunning("ns", "label=foo") + + // then + assertThat(result?.metadata?.name).isEqualTo("pod-new") + unmockkConstructor(CoreV1Api::class) + } + + @Test + fun `#findFirstRunning excludes terminating pods`() { + // given + mockkConstructor(CoreV1Api::class) + val terminating = runningReadyPod("terminating", "2024-01-02T00:00:00Z").apply { + metadata.deletionTimestamp = OffsetDateTime.parse("2024-01-03T00:00:00Z") + } + val healthy = runningReadyPod("healthy", "2024-01-01T00:00:00Z") + every { + anyConstructed().listNamespacedPod(any()) + } returns mockk { + every { labelSelector(any()) } returns mockk { + every { execute() } returns V1PodList().apply { items = listOf(terminating, healthy) } + } + } + + // when + val result = pods.findFirstRunning("ns", "label=foo") + + // then + assertThat(result?.metadata?.name).isEqualTo("healthy") + unmockkConstructor(CoreV1Api::class) + } + + @Test + fun `#findFirstRunning returns null when no pod is running and ready`() { + // given + mockkConstructor(CoreV1Api::class) + val notReady = V1Pod().apply { + metadata = V1ObjectMeta().apply { + name = "not-ready" + creationTimestamp = OffsetDateTime.parse("2024-01-01T00:00:00Z") + } + status = V1PodStatus().apply { phase = "Running" } + } + every { + anyConstructed().listNamespacedPod(any()) + } returns mockk { + every { labelSelector(any()) } returns mockk { + every { execute() } returns V1PodList().apply { items = listOf(notReady) } + } + } + + // when + val result = pods.findFirstRunning("ns", "label=foo") + + // then + assertThat(result).isNull() + unmockkConstructor(CoreV1Api::class) + } } \ No newline at end of file diff --git a/src/test/kotlin/com/redhat/devtools/gateway/server/RemoteIDEServerTest.kt b/src/test/kotlin/com/redhat/devtools/gateway/server/RemoteIDEServerTest.kt index 76215d3c..48274d4b 100644 --- a/src/test/kotlin/com/redhat/devtools/gateway/server/RemoteIDEServerTest.kt +++ b/src/test/kotlin/com/redhat/devtools/gateway/server/RemoteIDEServerTest.kt @@ -16,7 +16,9 @@ import com.redhat.devtools.gateway.openshift.DevWorkspacePods import io.kubernetes.client.openapi.models.V1Container import io.kubernetes.client.openapi.models.V1ObjectMeta import io.kubernetes.client.openapi.models.V1Pod +import io.kubernetes.client.openapi.models.V1PodCondition import io.kubernetes.client.openapi.models.V1PodSpec +import io.kubernetes.client.openapi.models.V1PodStatus import io.mockk.* import kotlinx.coroutines.runBlocking import org.assertj.core.api.Assertions.assertThat @@ -31,31 +33,16 @@ class RemoteIDEServerTest { private lateinit var devSpacesContext: DevSpacesContext private lateinit var remoteIDEServer: RemoteIDEServer + private lateinit var mockPod: V1Pod @BeforeEach fun beforeEach() { devSpacesContext = mockk(relaxed = true) mockkConstructor(DevWorkspacePods::class) - val mockPod = V1Pod().apply { - metadata = V1ObjectMeta().apply { - name = "test-pod" - } - spec = V1PodSpec().apply { - containers = listOf( - V1Container().apply { - name = "test-container" - ports = listOf( - mockk(relaxed = true) { - every { name } returns "idea-server" - } - ) - } - ) - } - } - coEvery { - anyConstructed().findFirst(any(), any()) + mockPod = runningPod("test-pod") + every { + anyConstructed().findFirstRunning(any(), any()) } returns mockPod remoteIDEServer = spyk(RemoteIDEServer(devSpacesContext), recordPrivateCalls = true) @@ -66,6 +53,177 @@ class RemoteIDEServerTest { unmockkAll() } + @Test + fun `#getPod queries cluster and returns cached pod on subsequent calls`() { + // given + // when + val first = remoteIDEServer.getPod() + val second = remoteIDEServer.getPod() + // then + assertThat(first.metadata?.name).isEqualTo("test-pod") + assertThat(second).isSameAs(first) + verify(exactly = 1) { + anyConstructed().findFirstRunning(any(), any()) + } + } + + @Test + fun `#getContainer returns IDE container from workspace pod`() { + // given + // when + val container = remoteIDEServer.getContainer() + // then + assertThat(container.name).isEqualTo("test-container") + verify(exactly = 1) { + anyConstructed().findFirstRunning(any(), any()) + } + } + + @Test + fun `#refreshPod re-queries cluster and returns new pod when it changes`() { + // given + val firstPod = runningPod("pod-v1") + val secondPod = runningPod("pod-v2") + every { + anyConstructed().findFirstRunning(any(), any()) + } returnsMany listOf(firstPod, secondPod) + // when + val refreshedPod1 = remoteIDEServer.refreshPod() + val refreshedPod2 = remoteIDEServer.refreshPod() + // then + assertThat(refreshedPod1.metadata?.name).isEqualTo(firstPod.metadata?.name) + assertThat(refreshedPod2.metadata?.name).isEqualTo(secondPod.metadata?.name) + } + + @Test + fun `#refreshPod throws when no running pod exists`() { + // given + every { + anyConstructed().findFirstRunning(any(), any()) + } returns null + // then + assertThrows { + remoteIDEServer.refreshPod() + } + } + + @Test + fun `#setPod seeds cache without querying cluster`() { + // when + val pod = runningPod("cached-pod") + remoteIDEServer.setPod(pod) + // when + val result = remoteIDEServer.getPod() + // then + assertThat(result.metadata?.name).isEqualTo("cached-pod") + assertThat(result).isSameAs(pod) + verify(exactly = 0) { + anyConstructed().findFirstRunning(any(), any()) + } + } + + @Test + fun `#fetchStatus uses short exec timeout for status probes`() { + val cachedPod = runningPod("cached-pod") + remoteIDEServer.setPod(cachedPod) + val readyJson = """{"joinLink":"https://ready","httpLink":"","gatewayLink":"","appVersion":"","runtimeVersion":"","projects":[]}""" + val execTimeout = slot() + coEvery { + anyConstructed().exec(any(), any(), any(), capture(execTimeout), any()) + } returns readyJson + + runBlocking { + remoteIDEServer.fetchStatus() + } + + assertThat(execTimeout.captured).isEqualTo(RemoteIDEServer.STATUS_EXEC_TIMEOUT) + } + + @Test + fun `#waitServerReady error message uses configured timeout`() { + coEvery { + remoteIDEServer.fetchStatus(checkCancelled = any()) + } returns remoteIDEServerStatus(null, arrayOf(projectInfo("death star"))) + + val error = assertThrows { + runBlocking { + remoteIDEServer.waitServerReady(timeout = 5) + } + } + + assertThat(error.message).isEqualTo("Workspace IDE is not ready after 5 seconds.") + } + + @Test + fun `#fetchStatus uses cached pod after setPod`() { + // given + val cachedPod = runningPod("cached-pod") + remoteIDEServer.setPod(cachedPod) + val readyJson = """{"joinLink":"https://ready","httpLink":"","gatewayLink":"","appVersion":"","runtimeVersion":"","projects":[]}""" + var execPod: V1Pod? = null + coEvery { + anyConstructed().exec(any(), any(), any(), any(), any()) + } answers { + execPod = firstArg() + readyJson + } + // when + runBlocking { + remoteIDEServer.fetchStatus() + } + + verify(exactly = 0) { + anyConstructed().findFirstRunning(any(), any()) + } + // then + assertThat(execPod).isSameAs(cachedPod) + } + + @Test + fun `#fetchStatus does not call refreshPod after setPod`() { + // given + val cachedPod = runningPod("cached-pod") + remoteIDEServer.setPod(cachedPod) + val readyJson = """{"joinLink":"https://ready","httpLink":"","gatewayLink":"","appVersion":"","runtimeVersion":"","projects":[]}""" + var execPod: V1Pod? = null + coEvery { + anyConstructed().exec(any(), any(), any(), any(), any()) + } answers { + execPod = firstArg() + readyJson + } + // when + runBlocking { + remoteIDEServer.fetchStatus() + } + + verify(exactly = 0) { + anyConstructed().findFirstRunning(any(), any()) + } + // then + assertThat(execPod).isSameAs(cachedPod) + } + + @Test + fun `#awaitJoinLink seeds pod waits for ready server and returns join link`() = runBlocking { + val pod = runningPod("rolled-pod") + coEvery { remoteIDEServer.waitServerReady(checkCancelled = any(), timeout = any()) } returns true + coEvery { remoteIDEServer.fetchStatus(checkCancelled = any()) } returns RemoteIDEServerStatus( + "https://join", + "", + "", + "", + "", + null, + ) + + val link = remoteIDEServer.awaitJoinLink(pod) + + assertThat(link).isEqualTo("https://join") + verify { remoteIDEServer.setPod(pod) } + coVerify { remoteIDEServer.waitServerReady(checkCancelled = any(), timeout = any()) } + } + @Test fun `#waitServerReady should reach timeout and throw if server status has no join link`() { // given @@ -76,7 +234,7 @@ class RemoteIDEServerTest { ) ) coEvery { - remoteIDEServer.getStatus() + remoteIDEServer.fetchStatus() } returns withoutJoinLink // when, then @@ -95,7 +253,7 @@ class RemoteIDEServerTest { null ) coEvery { - remoteIDEServer.getStatus() + remoteIDEServer.fetchStatus() } returns withoutProjects // when, then @@ -116,7 +274,7 @@ class RemoteIDEServerTest { ) ) coEvery { - remoteIDEServer.getStatus() + remoteIDEServer.fetchStatus() } returns withoutJoinLink // when @@ -136,7 +294,7 @@ class RemoteIDEServerTest { null ) coEvery { - remoteIDEServer.getStatus() + remoteIDEServer.fetchStatus() } returns withoutProjects // when @@ -152,7 +310,7 @@ class RemoteIDEServerTest { fun `#waitServerTerminated should return false on timeout`() { // given coEvery { - remoteIDEServer.getStatus() + remoteIDEServer.fetchStatus() } returns remoteIDEServerStatus( // running server has join link and projects "https://starwars.galaxy?peridea", @@ -174,7 +332,7 @@ class RemoteIDEServerTest { fun `#waitServerTerminated should return false on exception`() { // given coEvery { - remoteIDEServer.getStatus() + remoteIDEServer.fetchStatus() } throws IOException("error") // when @@ -186,6 +344,32 @@ class RemoteIDEServerTest { assertThat(result).isFalse } + private fun runningPod(name: String): V1Pod { + return V1Pod().apply { + metadata = V1ObjectMeta().apply { + this.name = name + uid = name + } + spec = V1PodSpec().apply { + containers = listOf( + V1Container().apply { + this.name = "test-container" + ports = listOf(mockk(relaxed = true)) + } + ) + } + status = V1PodStatus().apply { + phase = "Running" + conditions = listOf( + V1PodCondition().apply { + type = "Ready" + status = "True" + } + ) + } + } + } + private fun remoteIDEServerStatus(joinLink: String? = null, projects: Array?): RemoteIDEServerStatus { return RemoteIDEServerStatus( joinLink, diff --git a/src/test/kotlin/com/redhat/devtools/gateway/util/PodIdentityTest.kt b/src/test/kotlin/com/redhat/devtools/gateway/util/PodIdentityTest.kt new file mode 100644 index 00000000..b19f2d61 --- /dev/null +++ b/src/test/kotlin/com/redhat/devtools/gateway/util/PodIdentityTest.kt @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2024-2025 Red Hat, Inc. + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + * + * Contributors: + * Red Hat, Inc. - initial API and implementation + */ +package com.redhat.devtools.gateway.util + +import io.kubernetes.client.openapi.models.V1ObjectMeta +import io.kubernetes.client.openapi.models.V1Pod +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Test +import java.time.OffsetDateTime + +class PodIdentityTest { + + @Test + fun `returns name and uid when both are present`() { + // given + val pod = V1Pod().metadata(V1ObjectMeta().name("workspace-pod-abc123").uid("a1b2c3d4-e5f6-7890")) + + // when + val result = podLogIdentity(pod) + + // then + assertThat(result).isEqualTo("workspace-pod-abc123 (uid=a1b2c3d4-e5f6-7890)") + } + + @Test + fun `returns unknown name when metadata name is null`() { + // given + val pod = V1Pod().metadata(V1ObjectMeta().uid("a1b2c3d4-e5f6-7890")) + + // when + val result = podLogIdentity(pod) + + // then + assertThat(result).isEqualTo("unknown (uid=a1b2c3d4-e5f6-7890)") + } + + @Test + fun `returns unknown uid when metadata uid is null`() { + // given + val pod = V1Pod().metadata(V1ObjectMeta().name("workspace-pod-xyz789")) + + // when + val result = podLogIdentity(pod) + + // then + assertThat(result).isEqualTo("workspace-pod-xyz789 (uid=unknown)") + } + + @Test + fun `returns unknown for both when metadata is null`() { + // given + val pod = V1Pod() + + // when + val result = podLogIdentity(pod) + + // then + assertThat(result).isEqualTo("unknown (uid=unknown)") + } + + @Test + fun `includes terminating when deletionTimestamp is present`() { + // given + val pod = V1Pod().metadata(V1ObjectMeta().name("terminating-pod").uid("e5f6a7b8-c9d0-1234").deletionTimestamp(OffsetDateTime.now())) + + // when + val result = podLogIdentity(pod) + + // then + assertThat(result).isEqualTo("terminating-pod (uid=e5f6a7b8-c9d0-1234, terminating)") + } + + @Test + fun `does not include terminating when deletionTimestamp is null`() { + // given + val pod = V1Pod().metadata(V1ObjectMeta().name("running-pod").uid("e5f6a7b8-c9d0-1234")) + + // when + val result = podLogIdentity(pod) + + // then + assertThat(result).isEqualTo("running-pod (uid=e5f6a7b8-c9d0-1234)") + } +} diff --git a/src/test/kotlin/com/redhat/devtools/gateway/util/ProgressCancellationTest.kt b/src/test/kotlin/com/redhat/devtools/gateway/util/ProgressCancellationTest.kt new file mode 100644 index 00000000..f6842c1e --- /dev/null +++ b/src/test/kotlin/com/redhat/devtools/gateway/util/ProgressCancellationTest.kt @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2026 Red Hat, Inc. + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + * + * Contributors: + * Red Hat, Inc. - initial API and implementation + */ +package com.redhat.devtools.gateway.util + +import com.intellij.openapi.progress.ProcessCanceledException +import com.intellij.openapi.progress.ProgressIndicator +import io.mockk.every +import io.mockk.mockk +import io.mockk.verify +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Test + +class ProgressCancellationTest { + + @Test + fun `#checkProgressCanceled throws when indicator is stopped`() { + val indicator = mockk(relaxed = true) + every { indicator.isCanceled } returns false + every { indicator.isRunning } returns false + + var caught: ProcessCanceledException? = null + try { + checkProgressCanceled(indicator) + } catch (e: ProcessCanceledException) { + caught = e + } + + assertThat(caught).isNotNull + } + + @Test + fun `#updateProgress throws when indicator is canceled`() { + val indicator = mockk(relaxed = true) + every { indicator.isRunning } returns true + every { indicator.isCanceled } returns true + + var caught: ProcessCanceledException? = null + try { + indicator.updateProgress("Waiting for IDE to be ready...", 0.5) + } catch (e: ProcessCanceledException) { + caught = e + } + + assertThat(caught).isNotNull + verify(exactly = 0) { indicator.fraction = any() } + } + + @Test + fun `#updateProgress treats disposed indicator write failure as cancellation`() { + val indicator = mockk(relaxed = true) + every { indicator.isRunning } returns true + every { indicator.isCanceled } returns false + every { indicator.fraction = any() } throws IllegalArgumentException("indicator is disposed") + + var caught: ProcessCanceledException? = null + try { + indicator.updateProgress("Waiting for IDE to be ready...", 0.5) + } catch (e: ProcessCanceledException) { + caught = e + } + + assertThat(caught).isNotNull + } +}