diff --git a/crates/stackable-operator/CHANGELOG.md b/crates/stackable-operator/CHANGELOG.md index 7d7346a6b..55fa43aec 100644 --- a/crates/stackable-operator/CHANGELOG.md +++ b/crates/stackable-operator/CHANGELOG.md @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file. ## [Unreleased] +### Added + +- Add CRD established signal/helper ([#1167]). + +[#1167]: https://github.com/stackabletech/operator-rs/pull/1167 + ## [0.107.0] - 2026-03-09 ### Added diff --git a/crates/stackable-operator/src/utils/signal.rs b/crates/stackable-operator/src/utils/signal.rs index 59c68e17f..135952426 100644 --- a/crates/stackable-operator/src/utils/signal.rs +++ b/crates/stackable-operator/src/utils/signal.rs @@ -1,9 +1,14 @@ +use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition; +use kube::runtime::wait; use snafu::{ResultExt, Snafu}; +use stackable_shared::time::Duration; use tokio::{ signal::unix::{SignalKind, signal}, sync::watch, }; +use crate::client::Client; + #[derive(Debug, Snafu)] #[snafu(display("failed to construct signal watcher"))] pub struct SignalError { @@ -71,3 +76,46 @@ impl SignalWatcher<()> { Ok(Self { watch_rx }) } } + +pub const DEFAULT_CRD_ESTABLISHED_TIMEOUT: Duration = Duration::from_secs(5); + +#[derive(Debug, Snafu)] +pub enum CrdEstablishedError { + #[snafu(display("failed to meet CRD established condition before the timeout elapsed"))] + TimeoutElapsed { source: tokio::time::error::Elapsed }, + + #[snafu(display("failed to await CRD established condition due to api error"))] + Api { source: kube::runtime::wait::Error }, +} + +/// Waits for a CRD named `crd_name` to be established before `timeout_duration` (or by default +/// [`DEFAULT_CRD_ESTABLISHED_TIMEOUT`]) is elapsed. +/// +/// The same caveats from [`conditions::is_crd_established`](wait::conditions::is_crd_established) +/// apply here as well. +/// +/// ### Errors +/// +/// This function returns errors either if the timeout elapsed without the condition being met or +/// when the underlying API returned errors (CRD is unknown to the Kubernetes API server or due to +/// missing permissions). +pub async fn crd_established( + client: &Client, + crd_name: &str, + timeout_duration: impl Into>, +) -> Result<(), CrdEstablishedError> { + let api: kube::Api = client.get_api(&()); + let crd_established = + wait::await_condition(api, crd_name, wait::conditions::is_crd_established()); + let _ = tokio::time::timeout( + *timeout_duration + .into() + .unwrap_or(DEFAULT_CRD_ESTABLISHED_TIMEOUT), + crd_established, + ) + .await + .context(TimeoutElapsedSnafu)? + .context(ApiSnafu)?; + + Ok(()) +}