Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions crates/stackable-operator/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.

## [Unreleased]

### Added

- Add CRD established signal/helper ([#1167]).

[#1167]: https://github.com/stackabletech/operator-rs/pull/1167

## [0.107.0] - 2026-03-09

### Added
Expand Down
48 changes: 48 additions & 0 deletions crates/stackable-operator/src/utils/signal.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
use kube::runtime::wait;
use snafu::{ResultExt, Snafu};
use stackable_shared::time::Duration;
use tokio::{
signal::unix::{SignalKind, signal},
sync::watch,
};

use crate::client::Client;

#[derive(Debug, Snafu)]
#[snafu(display("failed to construct signal watcher"))]
pub struct SignalError {
Expand Down Expand Up @@ -71,3 +76,46 @@ impl SignalWatcher<()> {
Ok(Self { watch_rx })
}
}

pub const DEFAULT_CRD_ESTABLISHED_TIMEOUT: Duration = Duration::from_secs(5);

#[derive(Debug, Snafu)]
pub enum CrdEstablishedError {
#[snafu(display("failed to meet CRD established condition before the timeout elapsed"))]
TimeoutElapsed { source: tokio::time::error::Elapsed },

#[snafu(display("failed to await CRD established condition due to api error"))]
Api { source: kube::runtime::wait::Error },
}

/// Waits for a CRD named `crd_name` to be established before `timeout_duration` (or by default
/// [`DEFAULT_CRD_ESTABLISHED_TIMEOUT`]) is elapsed.
///
/// The same caveats from [`conditions::is_crd_established`](wait::conditions::is_crd_established)
/// apply here as well.
///
/// ### Errors
///
/// This function returns errors either if the timeout elapsed without the condition being met or
/// when the underlying API returned errors (CRD is unknown to the Kubernetes API server or due to
/// missing permissions).
pub async fn crd_established(
client: &Client,
crd_name: &str,
timeout_duration: impl Into<Option<Duration>>,
) -> Result<(), CrdEstablishedError> {
let api: kube::Api<CustomResourceDefinition> = client.get_api(&());
let crd_established =
wait::await_condition(api, crd_name, wait::conditions::is_crd_established());
let _ = tokio::time::timeout(
*timeout_duration
.into()
.unwrap_or(DEFAULT_CRD_ESTABLISHED_TIMEOUT),
crd_established,
)
.await
.context(TimeoutElapsedSnafu)?
.context(ApiSnafu)?;

Ok(())
}
Loading