From 71a93610d40e2215fbeea11e141116daa67cfeeb Mon Sep 17 00:00:00 2001 From: GatewayJ <18332154+GatewayJ@users.noreply.github.com> Date: Thu, 25 Jun 2026 15:05:14 +0800 Subject: [PATCH] fix(provisioning): preserve policy document semantics --- src/reconcile/provisioning.rs | 530 +++++++++++++++++++++++++--------- 1 file changed, 389 insertions(+), 141 deletions(-) diff --git a/src/reconcile/provisioning.rs b/src/reconcile/provisioning.rs index 293c3f8..cafdf84 100644 --- a/src/reconcile/provisioning.rs +++ b/src/reconcile/provisioning.rs @@ -55,6 +55,31 @@ struct UserCredentials { resource_version: Option, } +struct PolicyDocument { + raw: String, + normalized: String, +} + +#[derive(Debug, PartialEq, Eq)] +enum PolicyReconcileAction { + Ready(&'static str), + Apply(&'static str), + Failed(Reason, &'static str), +} + +impl PolicyDocument { + fn parse(raw: &str) -> Result { + Ok(Self { + raw: raw.to_string(), + normalized: normalize_policy_document(raw)?, + }) + } + + fn hash(&self) -> String { + hash_document(&self.normalized) + } +} + impl ProvisioningRun<'_> { fn previous_policy(&self, name: &str) -> Option<&ProvisioningItemStatus> { self.previous.policies.iter().find(|item| item.name == name) @@ -410,99 +435,105 @@ async fn reconcile_policy( } }; - let desired_hash = hash_document(&document); - let mut item = match live_policies.get(&policy.name) { - Some(live_document) => { - let live_hash = hash_document(live_document); - match previous.and_then(|item| item.last_applied_hash.as_deref()) { - None if live_hash == desired_hash => run.item( - previous, - &policy.name, - ProvisioningItemState::Ready, - Reason::ProvisioningConfigured, - "Existing RustFS policy matches spec and was adopted", - ), - None => run.item( - previous, - &policy.name, - ProvisioningItemState::Failed, - Reason::PolicyConflict, - "Live RustFS policy differs from spec and is not owned by this status", - ), - Some(last_applied_hash) if last_applied_hash == live_hash => { - if live_hash == desired_hash { - run.item( - previous, - &policy.name, - ProvisioningItemState::Ready, - Reason::ProvisioningConfigured, - "RustFS policy already matches spec", - ) - } else { - match apply_policy(client, live_policies, &policy.name, &document).await { - Ok(applied_hash) => { - let mut item = run.item( - previous, - &policy.name, - ProvisioningItemState::Ready, - Reason::ProvisioningConfigured, - "RustFS policy was applied", - ); - item.last_applied_hash = Some(applied_hash); - item - } - Err(message) => run.item( - previous, - &policy.name, - ProvisioningItemState::Failed, - Reason::PolicyApplyFailed, - message, - ), - } - } + let desired_hash = document.hash(); + let live_hash = live_policies + .get(&policy.name) + .map(|live_document| hash_document(live_document)); + let item = match policy_reconcile_action(previous, live_hash.as_deref(), &desired_hash) { + PolicyReconcileAction::Ready(message) => run.item( + previous, + &policy.name, + ProvisioningItemState::Ready, + Reason::ProvisioningConfigured, + message, + ), + PolicyReconcileAction::Apply(message) => { + match apply_policy(client, live_policies, &policy.name, &document.raw).await { + Ok(applied_hash) => { + let mut item = run.item( + previous, + &policy.name, + ProvisioningItemState::Ready, + Reason::ProvisioningConfigured, + message, + ); + item.last_applied_hash = Some(applied_hash); + item } - Some(_) if live_hash == desired_hash => run.item( - previous, - &policy.name, - ProvisioningItemState::Ready, - Reason::ProvisioningConfigured, - "RustFS policy matches spec", - ), - Some(_) => run.item( + Err(message) => run.item( previous, &policy.name, ProvisioningItemState::Failed, - Reason::PolicyConflict, - "Live RustFS policy changed since the operator last applied it", + Reason::PolicyApplyFailed, + message, ), } } - None => match apply_policy(client, live_policies, &policy.name, &document).await { - Ok(applied_hash) => { - let mut item = run.item( - previous, - &policy.name, - ProvisioningItemState::Ready, - Reason::ProvisioningConfigured, - "RustFS policy was created", - ); - item.last_applied_hash = Some(applied_hash); - item - } - Err(message) => run.item( - previous, - &policy.name, - ProvisioningItemState::Failed, - Reason::PolicyApplyFailed, - message, - ), - }, + PolicyReconcileAction::Failed(reason, message) => run.item( + previous, + &policy.name, + ProvisioningItemState::Failed, + reason, + message, + ), }; + finalize_policy_item_status( + item, + previous, + &policy.name, + desired_hash, + live_policies, + run.tenant.metadata.generation, + ) +} + +fn policy_reconcile_action( + previous: Option<&ProvisioningItemStatus>, + live_hash: Option<&str>, + desired_hash: &str, +) -> PolicyReconcileAction { + let Some(live_hash) = live_hash else { + return PolicyReconcileAction::Apply("RustFS policy was created"); + }; + + match previous.and_then(|item| item.last_applied_hash.as_deref()) { + None if live_hash == desired_hash => { + PolicyReconcileAction::Ready("Existing RustFS policy matches spec and was adopted") + } + None => PolicyReconcileAction::Failed( + Reason::PolicyConflict, + "Live RustFS policy differs from spec and is not owned by this status", + ), + Some(last_applied_hash) if last_applied_hash == live_hash => { + if live_hash == desired_hash { + PolicyReconcileAction::Ready("RustFS policy already matches spec") + } else { + PolicyReconcileAction::Apply("RustFS policy was applied") + } + } + Some(_) if live_hash == desired_hash => { + PolicyReconcileAction::Ready("RustFS policy matches spec") + } + Some(_) => PolicyReconcileAction::Failed( + Reason::PolicyConflict, + "Live RustFS policy changed since the operator last applied it", + ), + } +} + +fn finalize_policy_item_status( + mut item: ProvisioningItemStatus, + previous: Option<&ProvisioningItemStatus>, + policy_name: &str, + desired_hash: String, + live_policies: &BTreeMap, + generation: Option, +) -> ProvisioningItemStatus { item.desired_hash = Some(desired_hash); if item.last_applied_hash.is_none() && item.state == ProvisioningItemState::Ready.as_str() { item.last_applied_hash = live_policies - .get(&policy.name) + .get(policy_name) .map(|live_document| hash_document(live_document)) .or_else(|| item.desired_hash.clone()); } @@ -516,9 +547,7 @@ async fn reconcile_policy( (Some(current), Some(previous_hash)) if current == previous_hash => { previous.and_then(|item| item.last_applied_generation) } - (Some(_), _) if item.state == ProvisioningItemState::Ready.as_str() => { - run.tenant.metadata.generation - } + (Some(_), _) if item.state == ProvisioningItemState::Ready.as_str() => generation, _ => previous.and_then(|item| item.last_applied_generation), }; item @@ -527,7 +556,7 @@ async fn reconcile_policy( async fn load_policy_document( run: &ProvisioningRun<'_>, policy: &ProvisioningPolicy, -) -> Result { +) -> Result { let reference = &policy.document.config_map_key_ref; let config_map: ConfigMap = run.ctx @@ -564,7 +593,7 @@ async fn load_policy_document( ) })?; - normalize_policy_document(raw).map_err(|message| (Reason::PolicyApplyFailed, message)) + PolicyDocument::parse(raw).map_err(|message| (Reason::PolicyApplyFailed, message)) } async fn apply_policy( @@ -1057,66 +1086,66 @@ fn normalize_policy_document(document: &str) -> Result { } fn normalize_policy_value(value: Value) -> Value { - let Some(object) = value.as_object() else { - return value; - }; + match value { + Value::Object(mut object) => { + if object + .get("ID") + .and_then(Value::as_str) + .is_some_and(|id| id.is_empty()) + { + object.remove("ID"); + } - if !object.contains_key("Statement") { - return value; - } + match object.get("Statement").cloned() { + Some(Value::Array(statements)) => { + let mut normalized_statements = statements + .iter() + .map(normalize_policy_statement) + .collect::>(); + normalized_statements.sort_by_key(statement_sort_key); + object.insert("Statement".to_string(), Value::Array(normalized_statements)); + } + Some(statement) => { + object.insert( + "Statement".to_string(), + normalize_policy_statement(&statement), + ); + } + None => {} + } - let mut normalized = serde_json::Map::new(); - if let Some(version) = object.get("Version") { - normalized.insert("Version".to_string(), version.clone()); - } - if let Some(statements) = object.get("Statement").and_then(Value::as_array) { - let mut normalized_statements = statements - .iter() - .map(normalize_policy_statement) - .collect::>(); - normalized_statements.sort_by_key(statement_sort_key); - normalized.insert("Statement".to_string(), Value::Array(normalized_statements)); + Value::Object(object) + } + value => value, } - - Value::Object(normalized) } fn normalize_policy_statement(statement: &Value) -> Value { - let Some(object) = statement.as_object() else { - return statement.clone(); - }; - - let mut normalized = serde_json::Map::new(); - if let Some(effect) = object.get("Effect") { - normalized.insert("Effect".to_string(), effect.clone()); - } - if let Some(action) = object.get("Action") { - normalized.insert( - "Action".to_string(), - normalize_string_or_string_array(action), - ); - } - if let Some(resource) = object.get("Resource") { - normalized.insert( - "Resource".to_string(), - normalize_string_or_string_array(resource), - ); - } - if let Some(sid) = object - .get("Sid") - .and_then(Value::as_str) - .filter(|sid| !sid.is_empty()) - { - normalized.insert("Sid".to_string(), Value::String(sid.to_string())); - } - if let Some(condition) = object - .get("Condition") - .filter(|condition| is_non_empty_json_object(condition)) - { - normalized.insert("Condition".to_string(), condition.clone()); + match statement { + Value::Object(object) => { + let mut normalized = object.clone(); + for key in ["Action", "NotAction", "Resource", "NotResource"] { + if let Some(value) = normalized.get(key).cloned() { + normalized.insert(key.to_string(), normalize_string_or_string_array(&value)); + } + } + if normalized + .get("Sid") + .and_then(Value::as_str) + .is_some_and(|sid| sid.is_empty()) + { + normalized.remove("Sid"); + } + if normalized + .get("Condition") + .is_some_and(is_empty_json_object) + { + normalized.remove("Condition"); + } + Value::Object(normalized) + } + statement => statement.clone(), } - - Value::Object(normalized) } fn normalize_string_or_string_array(value: &Value) -> Value { @@ -1135,8 +1164,8 @@ fn normalize_string_or_string_array(value: &Value) -> Value { } } -fn is_non_empty_json_object(value: &Value) -> bool { - value.as_object().is_some_and(|object| !object.is_empty()) +fn is_empty_json_object(value: &Value) -> bool { + value.as_object().is_some_and(|object| object.is_empty()) } fn statement_sort_key(statement: &Value) -> String { @@ -1175,7 +1204,21 @@ fn reason_from_str(reason: &str) -> Reason { #[cfg(test)] mod tests { use super::*; + use axum::{ + Router, + body::Body, + extract::State, + http::{Request, StatusCode}, + routing::{get, put}, + }; use k8s_openapi::ByteString; + use std::sync::Arc; + use tokio::sync::Mutex; + + #[derive(Clone, Default)] + struct PolicyApplyCapture { + body: Arc>, + } #[test] fn compatible_secret_values_are_trimmed_and_must_match() { @@ -1238,6 +1281,211 @@ mod tests { assert!(hash_document(&normalized).starts_with("sha256:")); } + #[test] + fn policy_document_preserves_raw_write_document() { + let raw = r#"{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": ["s3:PutObject", "s3:GetObject"], + "Resource": "arn:aws:s3:::app-data/*" + } + ] + }"#; + + let document = PolicyDocument::parse(raw).expect("policy should parse"); + let normalized: Value = + serde_json::from_str(&document.normalized).expect("normalized policy should be JSON"); + + assert_eq!(document.raw, raw); + assert_eq!( + normalized["Statement"][0]["Action"], + serde_json::json!(["s3:GetObject", "s3:PutObject"]) + ); + } + + #[tokio::test] + async fn apply_policy_sends_raw_document_to_rustfs() { + let raw = r#"{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Deny", + "NotAction": ["s3:PutObject", "s3:GetObject"], + "NotResource": [ + "arn:aws:s3:::app-data/public/*", + "arn:aws:s3:::app-data/private/*" + ] + } + ] + }"#; + let capture = PolicyApplyCapture::default(); + let route_capture = capture.clone(); + let live_document = raw.to_string(); + let router = Router::new() + .route( + "/rustfs/admin/v3/add-canned-policy", + put( + move |State(c): State, req: Request| async move { + let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX) + .await + .expect("request body should be readable"); + *c.body.lock().await = + String::from_utf8(body_bytes.to_vec()).expect("body should be UTF-8"); + + StatusCode::OK + }, + ), + ) + .route( + "/rustfs/admin/v3/info-canned-policy", + get(move || { + let live_document = live_document.clone(); + async move { live_document } + }), + ) + .with_state(route_capture); + let listener = tokio::net::TcpListener::bind(("127.0.0.1", 0)) + .await + .expect("test server should bind"); + let addr = listener.local_addr().expect("listener should have address"); + let server = tokio::spawn(async move { + axum::serve(listener, router) + .await + .expect("test server should serve") + }); + let client = + RustfsAdminClient::new_with_base_url(format!("http://{addr}"), "access", "secret"); + let mut live_policies = BTreeMap::new(); + + let applied_hash = apply_policy(&client, &mut live_policies, "tenant-policy", raw) + .await + .expect("policy should apply"); + + assert_eq!(&*capture.body.lock().await, raw); + assert_eq!( + applied_hash, + hash_document( + live_policies + .get("tenant-policy") + .expect("live policy should be cached") + ) + ); + server.abort(); + } + + #[test] + fn policy_document_normalization_preserves_deny_fields() { + let normalized = normalize_policy_document( + r#"{ + "ID": "policy-id", + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "deny-selected", + "Effect": "Deny", + "NotAction": ["s3:PutObject", "s3:GetObject"], + "NotResource": [ + "arn:aws:s3:::app-data/public/*", + "arn:aws:s3:::app-data/private/*" + ], + "Condition": { + "StringLike": { + "s3:prefix": ["private/*"] + } + }, + "Principal": "*" + } + ] + }"#, + ) + .expect("policy should normalize"); + let normalized: Value = + serde_json::from_str(&normalized).expect("normalized policy should be JSON"); + let statement = &normalized["Statement"][0]; + + assert_eq!(normalized["ID"], "policy-id"); + assert_eq!(statement["Sid"], "deny-selected"); + assert_eq!( + statement["NotAction"], + serde_json::json!(["s3:GetObject", "s3:PutObject"]) + ); + assert_eq!( + statement["NotResource"], + serde_json::json!([ + "arn:aws:s3:::app-data/private/*", + "arn:aws:s3:::app-data/public/*" + ]) + ); + assert_eq!(statement["Principal"], "*"); + assert!(statement["Condition"].is_object()); + } + + #[test] + fn stale_policy_status_is_ready_when_live_policy_matches_desired() { + let mut previous = ProvisioningItemStatus::new( + "app-policy", + ProvisioningItemState::Ready, + Reason::ProvisioningConfigured.as_str(), + ); + previous.last_applied_hash = Some("sha256:old".to_string()); + + let action = policy_reconcile_action(Some(&previous), Some("sha256:new"), "sha256:new"); + + assert_eq!( + action, + PolicyReconcileAction::Ready("RustFS policy matches spec") + ); + } + + #[test] + fn stale_policy_status_updates_last_applied_metadata_when_live_matches_desired() { + let live_document = normalize_policy_document( + r#"{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": ["s3:GetObject"], + "Resource": ["arn:aws:s3:::app-data/*"] + } + ] + }"#, + ) + .expect("policy should normalize"); + let desired_hash = hash_document(&live_document); + let live_policies = BTreeMap::from([("app-policy".to_string(), live_document)]); + let mut previous = ProvisioningItemStatus::new( + "app-policy", + ProvisioningItemState::Ready, + Reason::ProvisioningConfigured.as_str(), + ); + previous.last_applied_hash = Some("sha256:old".to_string()); + previous.last_applied_generation = Some(7); + let item = ProvisioningItemStatus::new( + "app-policy", + ProvisioningItemState::Ready, + Reason::ProvisioningConfigured.as_str(), + ); + + let item = finalize_policy_item_status( + item, + Some(&previous), + "app-policy", + desired_hash.clone(), + &live_policies, + Some(8), + ); + + assert_eq!(item.desired_hash.as_deref(), Some(desired_hash.as_str())); + assert_eq!( + item.last_applied_hash.as_deref(), + Some(desired_hash.as_str()) + ); + assert_eq!(item.last_applied_generation, Some(8)); + } + #[test] fn rustfs_server_policy_matches_configmap_spec() { let spec = r#"{