From 26b9e7037470b6a1328fbb9d858081644522e496 Mon Sep 17 00:00:00 2001 From: Derek Carr Date: Tue, 26 May 2026 17:23:08 -0400 Subject: [PATCH] feat(gateway): add reconciler lease for HA multi-replica deployments Introduce a database-backed reconciler lease so that only one gateway replica runs the watch and reconcile loops in Postgres-backed HA deployments. SQLite (single-replica) deployments skip the lease and run unconditionally as before. The lease is a lightweight JSON record in the objects table using CAS for cross-replica safety. A lease coordinator on each replica attempts acquisition, runs renewal while holding, and releases on shutdown for fast failover. Watch and reconcile loops now accept a cancellation channel for cooperative shutdown. Implements #1429 Signed-off-by: Derek Carr --- crates/openshell-server/src/compute/lease.rs | 408 ++++++++++++++++++ crates/openshell-server/src/compute/mod.rs | 169 ++++++-- crates/openshell-server/src/lib.rs | 5 +- .../openshell-server/src/persistence/mod.rs | 6 + 4 files changed, 561 insertions(+), 27 deletions(-) create mode 100644 crates/openshell-server/src/compute/lease.rs diff --git a/crates/openshell-server/src/compute/lease.rs b/crates/openshell-server/src/compute/lease.rs new file mode 100644 index 000000000..e4bae0ce7 --- /dev/null +++ b/crates/openshell-server/src/compute/lease.rs @@ -0,0 +1,408 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Reconciler lease for HA multi-replica gateway deployments. +//! +//! A single global lease stored in the `objects` table determines which +//! replica runs the watch and reconcile loops. All replicas continue +//! serving gRPC requests regardless of lease ownership. +//! +//! The lease payload is a small JSON blob — no protobuf definition needed. +//! CAS via `put_if` / `delete_if` provides cross-replica safety; the lease +//! is an optimization to reduce contention, not a correctness mechanism. + +use crate::persistence::{PersistenceError, Store, WriteCondition}; +use openshell_core::time::now_ms; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use std::time::Duration; +use thiserror::Error; + +const LEASE_OBJECT_TYPE: &str = "reconciler_lease"; +const LEASE_SINGLETON_ID: &str = "singleton"; +const LEASE_SINGLETON_NAME: &str = "reconciler-lease"; + +pub const LEASE_TTL: Duration = Duration::from_secs(30); +pub const LEASE_RENEWAL_INTERVAL: Duration = Duration::from_secs(10); +pub const LEASE_ACQUIRE_INTERVAL: Duration = Duration::from_secs(5); + +#[derive(Debug, Error)] +pub enum LeaseError { + #[error("lease is already held by another replica")] + AlreadyHeld, + #[error("lease not found")] + NotFound, + #[error("lease CAS conflict — another replica wrote first")] + Conflict, + #[error("persistence error: {0}")] + Store(#[from] PersistenceError), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct LeasePayload { + holder: String, + acquired_at_ms: i64, +} + +#[derive(Debug, Clone)] +pub struct LeaseRecord { + pub holder: String, + pub acquired_at_ms: i64, + pub resource_version: u64, + pub updated_at_ms: i64, +} + +#[derive(Debug)] +pub struct LeaseGuard { + resource_version: u64, + acquired_at_ms: i64, +} + +impl LeaseGuard { + pub fn resource_version(&self) -> u64 { + self.resource_version + } +} + +pub struct ReconcilerLease { + store: Arc, + replica_id: String, + ttl: Duration, +} + +impl ReconcilerLease { + pub fn new(store: Arc, replica_id: String, ttl: Duration) -> Self { + Self { + store, + replica_id, + ttl, + } + } + + pub fn replica_id(&self) -> &str { + &self.replica_id + } + + /// Attempt to create the lease record. Succeeds only if no lease exists. + pub async fn try_acquire(&self) -> Result { + let acquired_at_ms = now_ms(); + let payload = LeasePayload { + holder: self.replica_id.clone(), + acquired_at_ms, + }; + let payload_bytes = serde_json::to_vec(&payload) + .map_err(|e| PersistenceError::Encode(e.to_string()))?; + + match self + .store + .put_if( + LEASE_OBJECT_TYPE, + LEASE_SINGLETON_ID, + LEASE_SINGLETON_NAME, + &payload_bytes, + None, + WriteCondition::MustCreate, + ) + .await + { + Ok(result) => Ok(LeaseGuard { + resource_version: result.resource_version, + acquired_at_ms, + }), + Err(PersistenceError::UniqueViolation { .. }) => Err(LeaseError::AlreadyHeld), + Err(e) => Err(LeaseError::Store(e)), + } + } + + /// Steal an expired lease from another replica via CAS. + pub async fn try_steal_expired(&self) -> Result { + let record = self.read().await?.ok_or(LeaseError::NotFound)?; + + let age_ms = now_ms() - record.updated_at_ms; + if age_ms < self.ttl.as_millis() as i64 { + return Err(LeaseError::AlreadyHeld); + } + + let acquired_at_ms = now_ms(); + let payload = LeasePayload { + holder: self.replica_id.clone(), + acquired_at_ms, + }; + let payload_bytes = serde_json::to_vec(&payload) + .map_err(|e| PersistenceError::Encode(e.to_string()))?; + + match self + .store + .put_if( + LEASE_OBJECT_TYPE, + LEASE_SINGLETON_ID, + LEASE_SINGLETON_NAME, + &payload_bytes, + None, + WriteCondition::MatchResourceVersion(record.resource_version), + ) + .await + { + Ok(result) => Ok(LeaseGuard { + resource_version: result.resource_version, + acquired_at_ms, + }), + Err(PersistenceError::Conflict { .. }) => Err(LeaseError::Conflict), + Err(e) => Err(LeaseError::Store(e)), + } + } + + /// Try to acquire a fresh lease; if one already exists and is expired, + /// attempt to steal it. + pub async fn acquire_or_steal(&self) -> Result { + match self.try_acquire().await { + Ok(guard) => Ok(guard), + Err(LeaseError::AlreadyHeld) => self.try_steal_expired().await, + Err(e) => Err(e), + } + } + + /// Renew the lease by CAS-writing the same payload to bump + /// `updated_at_ms` and `resource_version`. + pub async fn renew(&self, guard: &mut LeaseGuard) -> Result<(), LeaseError> { + let payload = LeasePayload { + holder: self.replica_id.clone(), + acquired_at_ms: guard.acquired_at_ms, + }; + let payload_bytes = serde_json::to_vec(&payload) + .map_err(|e| PersistenceError::Encode(e.to_string()))?; + + match self + .store + .put_if( + LEASE_OBJECT_TYPE, + LEASE_SINGLETON_ID, + LEASE_SINGLETON_NAME, + &payload_bytes, + None, + WriteCondition::MatchResourceVersion(guard.resource_version), + ) + .await + { + Ok(result) => { + guard.resource_version = result.resource_version; + Ok(()) + } + Err(PersistenceError::Conflict { .. }) => Err(LeaseError::Conflict), + Err(e) => Err(LeaseError::Store(e)), + } + } + + /// Release the lease so a standby replica can acquire immediately + /// without waiting for TTL expiry. + pub async fn release(&self, guard: LeaseGuard) -> Result<(), LeaseError> { + match self + .store + .delete_if( + LEASE_OBJECT_TYPE, + LEASE_SINGLETON_ID, + guard.resource_version, + ) + .await + { + Ok(_) => Ok(()), + Err(PersistenceError::Conflict { .. }) => Err(LeaseError::Conflict), + Err(e) => Err(LeaseError::Store(e)), + } + } + + /// Read the current lease record, if any. + pub async fn read(&self) -> Result, LeaseError> { + let record = self + .store + .get(LEASE_OBJECT_TYPE, LEASE_SINGLETON_ID) + .await + .map_err(LeaseError::Store)?; + let Some(record) = record else { + return Ok(None); + }; + + let payload: LeasePayload = serde_json::from_slice(&record.payload) + .map_err(|e| PersistenceError::Decode(e.to_string()))?; + + Ok(Some(LeaseRecord { + holder: payload.holder, + acquired_at_ms: payload.acquired_at_ms, + resource_version: record.resource_version, + updated_at_ms: record.updated_at_ms, + })) + } +} + +/// Derive a stable replica identity for lease ownership. +/// +/// Kubernetes sets `HOSTNAME` to the pod name, Docker sets it to the +/// container ID, and systemd units inherit the machine hostname. +/// `OPENSHELL_REPLICA_ID` allows explicit override. The UUID fallback +/// handles edge cases where neither env var is set. +pub fn replica_id() -> String { + std::env::var("HOSTNAME") + .or_else(|_| std::env::var("OPENSHELL_REPLICA_ID")) + .unwrap_or_else(|_| uuid::Uuid::new_v4().to_string()) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::persistence::Store; + + async fn test_store() -> Arc { + Arc::new( + Store::connect("sqlite::memory:?cache=shared") + .await + .unwrap(), + ) + } + + fn lease(store: Arc, id: &str, ttl: Duration) -> ReconcilerLease { + ReconcilerLease::new(store, id.to_string(), ttl) + } + + #[tokio::test] + async fn acquire_succeeds_when_no_lease_exists() { + let store = test_store().await; + let l = lease(store, "replica-1", LEASE_TTL); + let guard = l.try_acquire().await.expect("should acquire"); + assert!(guard.resource_version > 0); + + let record = l.read().await.unwrap().expect("lease should exist"); + assert_eq!(record.holder, "replica-1"); + } + + #[tokio::test] + async fn acquire_fails_when_lease_held() { + let store = test_store().await; + let l1 = lease(store.clone(), "replica-1", LEASE_TTL); + let l2 = lease(store, "replica-2", LEASE_TTL); + let _guard = l1.try_acquire().await.unwrap(); + let err = l2.try_acquire().await.unwrap_err(); + assert!(matches!(err, LeaseError::AlreadyHeld)); + } + + #[tokio::test] + async fn concurrent_acquisition_exactly_one_wins() { + let store = test_store().await; + let mut tasks = Vec::new(); + for i in 0..5 { + let s = store.clone(); + tasks.push(tokio::spawn(async move { + let l = lease(s, &format!("replica-{i}"), LEASE_TTL); + l.try_acquire().await + })); + } + + let mut wins = 0; + for task in tasks { + if task.await.unwrap().is_ok() { + wins += 1; + } + } + assert_eq!(wins, 1); + } + + #[tokio::test] + async fn renew_extends_lease() { + let store = test_store().await; + let l = lease(store, "replica-1", LEASE_TTL); + let mut guard = l.try_acquire().await.unwrap(); + let v1 = guard.resource_version; + + l.renew(&mut guard).await.unwrap(); + assert!(guard.resource_version > v1); + + let record = l.read().await.unwrap().unwrap(); + assert_eq!(record.holder, "replica-1"); + assert_eq!(record.resource_version, guard.resource_version); + } + + #[tokio::test] + async fn steal_rejected_when_lease_active() { + let store = test_store().await; + let l1 = lease(store.clone(), "replica-1", LEASE_TTL); + let _guard = l1.try_acquire().await.unwrap(); + + let l2 = lease(store, "replica-2", LEASE_TTL); + let err = l2.try_steal_expired().await.unwrap_err(); + assert!(matches!(err, LeaseError::AlreadyHeld)); + } + + #[tokio::test] + async fn steal_succeeds_when_lease_expired() { + let store = test_store().await; + // Use a 0ms TTL so the lease is immediately expired + let l1 = lease(store.clone(), "replica-1", Duration::ZERO); + let _guard = l1.try_acquire().await.unwrap(); + + let l2 = lease(store, "replica-2", Duration::ZERO); + let guard = l2.try_steal_expired().await.expect("should steal expired"); + let record = l2.read().await.unwrap().unwrap(); + assert_eq!(record.holder, "replica-2"); + assert_eq!(record.resource_version, guard.resource_version); + } + + #[tokio::test] + async fn release_allows_immediate_reacquire() { + let store = test_store().await; + let l1 = lease(store.clone(), "replica-1", LEASE_TTL); + let guard = l1.try_acquire().await.unwrap(); + l1.release(guard).await.unwrap(); + + let l2 = lease(store, "replica-2", LEASE_TTL); + let guard = l2.try_acquire().await.expect("should acquire after release"); + let record = l2.read().await.unwrap().unwrap(); + assert_eq!(record.holder, "replica-2"); + assert_eq!(record.resource_version, guard.resource_version); + } + + #[tokio::test] + async fn acquire_or_steal_creates_when_none_exists() { + let store = test_store().await; + let l = lease(store, "replica-1", LEASE_TTL); + let guard = l.acquire_or_steal().await.expect("should create"); + let record = l.read().await.unwrap().unwrap(); + assert_eq!(record.holder, "replica-1"); + assert_eq!(record.resource_version, guard.resource_version); + } + + #[tokio::test] + async fn acquire_or_steal_steals_expired() { + let store = test_store().await; + let l1 = lease(store.clone(), "replica-1", Duration::ZERO); + let _guard = l1.try_acquire().await.unwrap(); + + let l2 = lease(store, "replica-2", Duration::ZERO); + let guard = l2.acquire_or_steal().await.expect("should steal"); + let record = l2.read().await.unwrap().unwrap(); + assert_eq!(record.holder, "replica-2"); + assert_eq!(record.resource_version, guard.resource_version); + } + + #[tokio::test] + async fn acquire_or_steal_fails_when_active() { + let store = test_store().await; + let l1 = lease(store.clone(), "replica-1", LEASE_TTL); + let _guard = l1.try_acquire().await.unwrap(); + + let l2 = lease(store, "replica-2", LEASE_TTL); + let err = l2.acquire_or_steal().await.unwrap_err(); + assert!(matches!(err, LeaseError::AlreadyHeld)); + } + + #[tokio::test] + async fn read_returns_none_when_no_lease() { + let store = test_store().await; + let l = lease(store, "replica-1", LEASE_TTL); + assert!(l.read().await.unwrap().is_none()); + } + + #[tokio::test] + async fn replica_id_returns_nonempty() { + let id = replica_id(); + assert!(!id.is_empty()); + } +} diff --git a/crates/openshell-server/src/compute/mod.rs b/crates/openshell-server/src/compute/mod.rs index 98dc3fd63..b0d491d81 100644 --- a/crates/openshell-server/src/compute/mod.rs +++ b/crates/openshell-server/src/compute/mod.rs @@ -3,6 +3,7 @@ //! Gateway-owned compute orchestration over a pluggable compute backend. +pub mod lease; pub mod vm; pub use openshell_driver_docker::DockerComputeConfig; @@ -40,10 +41,10 @@ use std::net::SocketAddr; use std::pin::Pin; use std::sync::Arc; use std::time::Duration; -use tokio::sync::Mutex; +use tokio::sync::{Mutex, watch}; use tonic::transport::Channel; use tonic::{Code, Request, Status}; -use tracing::{info, warn}; +use tracing::{debug, info, warn}; type DriverWatchStream = Pin> + Send>>; type SharedComputeDriver = @@ -555,15 +556,22 @@ impl ComputeRuntime { Ok(deleted) } - pub fn spawn_watchers(&self) { + pub fn spawn_watchers(&self, shutdown_rx: watch::Receiver) { let runtime = Arc::new(self.clone()); - let watch_runtime = runtime.clone(); - tokio::spawn(async move { - watch_runtime.watch_loop().await; - }); - tokio::spawn(async move { - runtime.reconcile_loop().await; - }); + if self.store.is_single_replica() { + let watch_runtime = runtime.clone(); + let watch_shutdown = shutdown_rx.clone(); + tokio::spawn(async move { + watch_runtime.watch_loop(watch_shutdown).await; + }); + tokio::spawn(async move { + runtime.reconcile_loop(shutdown_rx).await; + }); + } else { + tokio::spawn(async move { + runtime.lease_coordinator(shutdown_rx).await; + }); + } } pub async fn cleanup_on_shutdown(&self) -> Result<(), String> { @@ -712,7 +720,103 @@ impl ComputeRuntime { } } - async fn watch_loop(self: Arc) { + async fn lease_coordinator(self: Arc, mut shutdown_rx: watch::Receiver) { + use lease::{ReconcilerLease, replica_id, LEASE_ACQUIRE_INTERVAL, LEASE_TTL}; + + let lease = ReconcilerLease::new(self.store.clone(), replica_id(), LEASE_TTL); + info!(replica = %lease.replica_id(), "reconciler lease coordinator started"); + + loop { + if *shutdown_rx.borrow() { + break; + } + + match lease.acquire_or_steal().await { + Ok(guard) => { + info!(replica = %lease.replica_id(), "acquired reconciler lease"); + self.run_as_holder(&lease, guard, &mut shutdown_rx).await; + } + Err(e) => { + debug!( + replica = %lease.replica_id(), + error = %e, + "reconciler lease acquisition attempt failed" + ); + tokio::select! { + _ = tokio::time::sleep(LEASE_ACQUIRE_INTERVAL) => {} + _ = shutdown_rx.changed() => { + if *shutdown_rx.borrow() { + break; + } + } + } + } + } + } + + info!(replica = %lease.replica_id(), "reconciler lease coordinator stopped"); + } + + async fn run_as_holder( + self: &Arc, + lease: &lease::ReconcilerLease, + mut guard: lease::LeaseGuard, + shutdown_rx: &mut watch::Receiver, + ) { + use lease::LEASE_RENEWAL_INTERVAL; + + let (cancel_tx, cancel_rx) = watch::channel(false); + + let runtime = self.clone(); + let watch_cancel = cancel_rx.clone(); + let watch_handle = tokio::spawn(async move { + runtime.watch_loop(watch_cancel).await; + }); + + let runtime = self.clone(); + let reconcile_handle = tokio::spawn(async move { + runtime.reconcile_loop(cancel_rx).await; + }); + + loop { + tokio::select! { + _ = tokio::time::sleep(LEASE_RENEWAL_INTERVAL) => { + match lease.renew(&mut guard).await { + Ok(()) => { + debug!(replica = %lease.replica_id(), "renewed reconciler lease"); + } + Err(e) => { + warn!( + replica = %lease.replica_id(), + error = %e, + "reconciler lease renewal failed — releasing holder role" + ); + break; + } + } + } + _ = shutdown_rx.changed() => { + if *shutdown_rx.borrow() { + info!(replica = %lease.replica_id(), "shutdown — releasing reconciler lease"); + if let Err(e) = lease.release(guard).await { + warn!(error = %e, "failed to release reconciler lease on shutdown"); + } + let _ = cancel_tx.send(true); + let _ = watch_handle.await; + let _ = reconcile_handle.await; + return; + } + } + } + } + + let _ = cancel_tx.send(true); + let _ = watch_handle.await; + let _ = reconcile_handle.await; + info!(replica = %lease.replica_id(), "reconciler lease lost — returning to standby"); + } + + async fn watch_loop(self: Arc, mut cancel: watch::Receiver) { loop { let mut stream = match self .driver @@ -722,40 +826,55 @@ impl ComputeRuntime { Ok(response) => response.into_inner(), Err(err) => { warn!(error = %err, "Compute driver watch stream failed to start"); - tokio::time::sleep(Duration::from_secs(2)).await; + tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(2)) => {} + _ = cancel.changed() => return, + } continue; } }; let mut restart = false; - while let Some(item) = stream.next().await { - match item { - Ok(event) => { - if let Err(err) = self.apply_watch_event(event).await { - warn!(error = %err, "Failed to apply compute driver event"); + loop { + tokio::select! { + item = stream.next() => { + match item { + Some(Ok(event)) => { + if let Err(err) = self.apply_watch_event(event).await { + warn!(error = %err, "Failed to apply compute driver event"); + } + } + Some(Err(err)) => { + warn!(error = %err, "Compute driver watch stream errored"); + restart = true; + break; + } + None => break, } } - Err(err) => { - warn!(error = %err, "Compute driver watch stream errored"); - restart = true; - break; - } + _ = cancel.changed() => return, } } if !restart { warn!("Compute driver watch stream ended unexpectedly"); } - tokio::time::sleep(Duration::from_secs(2)).await; + tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(2)) => {} + _ = cancel.changed() => return, + } } } - async fn reconcile_loop(self: Arc) { + async fn reconcile_loop(self: Arc, mut cancel: watch::Receiver) { loop { if let Err(err) = self.reconcile_store_with_backend(ORPHAN_GRACE_PERIOD).await { warn!(error = %err, "Store reconciliation sweep failed"); } - tokio::time::sleep(RECONCILE_INTERVAL).await; + tokio::select! { + _ = tokio::time::sleep(RECONCILE_INTERVAL) => {} + _ = cancel.changed() => return, + } } } diff --git a/crates/openshell-server/src/lib.rs b/crates/openshell-server/src/lib.rs index b7e145bde..f7ed579ca 100644 --- a/crates/openshell-server/src/lib.rs +++ b/crates/openshell-server/src/lib.rs @@ -330,6 +330,8 @@ pub async fn run_server( let state = Arc::new(state); + let (shutdown_tx, shutdown_rx) = watch::channel(false); + // Resume sandboxes that were stopped during the previous gateway // shutdown so the running compute state matches the persisted store. // Runs before watchers spawn so the watch loop sees the post-resume @@ -338,7 +340,7 @@ pub async fn run_server( warn!(error = %err, "Failed to resume persisted sandboxes during startup"); } - state.compute.spawn_watchers(); + state.compute.spawn_watchers(shutdown_rx.clone()); ssh_sessions::spawn_session_reaper(store.clone(), Duration::from_secs(3600)); supervisor_session::spawn_relay_reaper(state.clone(), Duration::from_secs(30)); provider_refresh::spawn_refresh_worker(state.clone(), Duration::from_secs(60)); @@ -416,7 +418,6 @@ pub async fn run_server( None }; - let (shutdown_tx, shutdown_rx) = watch::channel(false); let mut listener_tasks = Vec::with_capacity(gateway_listeners.len()); let enable_loopback_service_http = config.service_routing.enable_loopback_service_http; for (listener, listen_addr) in gateway_listeners { diff --git a/crates/openshell-server/src/persistence/mod.rs b/crates/openshell-server/src/persistence/mod.rs index 32875a9f9..7a0ae7361 100644 --- a/crates/openshell-server/src/persistence/mod.rs +++ b/crates/openshell-server/src/persistence/mod.rs @@ -137,6 +137,12 @@ pub fn generate_name() -> String { } impl Store { + /// Returns `true` for single-replica backends (SQLite) where no lease + /// coordination is needed, `false` for multi-replica backends (Postgres). + pub fn is_single_replica(&self) -> bool { + matches!(self, Self::Sqlite(_)) + } + /// Connect to a persistence store based on the database URL. pub async fn connect(url: &str) -> CoreResult { if url.starts_with("postgres://") || url.starts_with("postgresql://") {