diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index a237c85..c515331 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -63,6 +63,12 @@ jobs: test-ps-all-missing needs_non_pg_snapshot: false + - name: redaction + namespaces: >- + test-redaction + needs_non_pg_snapshot: false + needs_pg18_snapshot: true + steps: - uses: actions/checkout@v6 @@ -164,6 +170,11 @@ jobs: load_image postgres:16-alpine load_image alpine:latest + if [ "${{ matrix.needs_pg18_snapshot }}" = "true" ]; then + load_image postgres:18 + load_image nginx:alpine + fi + - name: Deploy MinIO run: | kubectl apply -f tests/fixtures/minio.yaml @@ -201,6 +212,34 @@ jobs: kubectl logs job/setup-kopia-repo --all-containers --prefix echo "--- Kopia repository ready ---" + - name: Set up PG-18 kopia snapshot + if: matrix.needs_pg18_snapshot + run: | + kubectl apply -f tests/fixtures/setup-kopia-repo-pg18.yaml + for i in $(seq 1 60); do + STATUS=$(kubectl get job/setup-kopia-repo-pg18 -o jsonpath='{.status.conditions[?(@.type=="Complete")].status}' 2>/dev/null) + FAILED=$(kubectl get job/setup-kopia-repo-pg18 -o jsonpath='{.status.conditions[?(@.type=="Failed")].status}' 2>/dev/null) + if [ "$STATUS" = "True" ]; then + echo "PG-18 setup job completed successfully" + break + fi + if [ "$FAILED" = "True" ]; then + echo "PG-18 setup job failed!" 
+ kubectl describe job/setup-kopia-repo-pg18 + kubectl logs job/setup-kopia-repo-pg18 --all-containers --prefix + exit 1 + fi + if [ "$i" = "60" ]; then + echo "PG-18 setup job timed out after 300s" + kubectl describe job/setup-kopia-repo-pg18 + kubectl logs job/setup-kopia-repo-pg18 --all-containers --prefix 2>/dev/null || true + exit 1 + fi + sleep 5 + done + echo "--- PG-18 setup job logs ---" + kubectl logs job/setup-kopia-repo-pg18 --all-containers --prefix + - name: Set up non-postgres kopia snapshot if: matrix.needs_non_pg_snapshot run: | diff --git a/README.md b/README.md index cbd8bb2..d8d8dbb 100644 --- a/README.md +++ b/README.md @@ -103,6 +103,7 @@ Defines a continuously-refreshed replica of a PostgreSQL database restored from | `postgresExtraConfig` | `string` | No | — | Extra lines appended to `postgresql.conf` (e.g. `shared_preload_libraries`). | | `notifications` | `[]NotificationConfig` | No | `[]` | Notification targets called on restore events. | | `persistentSchemas` | `[]string` | No | — | List of schema names to migrate from the previous restore to the new restore on each switchover. | +| `redaction` | `RedactionSpec` | No | — | If set, apply a Tamanu/dbt-shaped masking manifest to the restored data via the `postgresql_anonymizer` extension before switchover. Requires PostgreSQL 18+. | The cron expression is parsed using the [cronexpr](https://docs.rs/cronexpr) crate. It has two interesting features: @@ -114,6 +115,35 @@ The jitter is a random duration between -time/2 and +time/2. For example, `10m` will result in a jitter between -5m and 5m. When using `H` in the cron expression, you might want to set the jitter to zero to properly take advantage of the spread-but-stable behaviour. +#### RedactionSpec + +Configures applying a column-masking manifest to the restored data using the [postgresql_anonymizer](https://gitlab.com/dalibo/postgresql_anonymizer) extension. 
+The manifest follows the [Tamanu masking spec](https://github.com/beyondessential/tamanu/tree/main/database#masking) — any dbt project that publishes the same `meta.masking` annotation shape can be used as the manifest source. + +| Field | Type | Required | Default | Description | +|-------|------|----------|---------|-------------| +| `manifestUrl` | `string` | Yes | — | HTTP(S) URL of the masking manifest. May contain a literal `{version}` placeholder. | +| `version` | `string` | No | — | Pinned version substituted into `{version}`. Mutually exclusive with `versionQuery`. | +| `versionQuery` | `string` | No | — | SQL query whose first result row's first column is the version string (text). Run as the operator's superuser against the restore. Mutually exclusive with `version`. | +| `versionFallbackToBase` | `bool` | No | `false` | If the manifest URL with the discovered/pinned version returns HTTP 404, retry with the `major.minor.0` base version. The fallback is only attempted when the version has the form `MAJOR.MINOR.PATCH` with a non-zero patch. | + +Example (Tamanu): + +```yaml +spec: + redaction: + manifestUrl: "https://docs.data.bes.au/tamanu/v{version}/manifest.json" + versionQuery: "SELECT value FROM local_system_facts WHERE key = 'currentVersion'" + versionFallbackToBase: true +``` + +Notes: + +- Works on any PostgreSQL major the operator otherwise supports. There's no PG-version gate because the prelude apt-installs `postgresql_anonymizer_$N` from [Dalibo Labs](https://apt.dalibo.org/labs/) per the running restore's PG version and copies the files into the standard system extension dirs (`/usr/share/postgresql/$N/extension`, `/usr/lib/postgresql/$N/lib`). +- The download is cached on the restore PVC at `/pgdata/.anon-cache/`, so a pod restart doesn't re-fetch the package — it just re-copies the cached files into the (fresh) container writable layer. +- The postgres container runs as root for the prelude (to apt-install and write to system paths) then drops back to UID 999 via `gosu` before exec'ing `postgres`. `gosu` is preinstalled in the official `postgres` image. 
+- During redaction the database is writable; once anonymisation completes, the operator sets `default_transaction_read_only = on` at the database level and demotes the analytics user back to non-superuser when `spec.readOnly` is true. + #### SnapshotFilter | Field | Type | Required | Description | @@ -165,6 +195,9 @@ Additional fields for `target: graphQL`: | `schemaMigrationJob` | `string` | Name of the active schema migration Job (set while migration is in progress). | | `schemaMigrationPhase` | `string` | Phase of the schema migration (`active`, `complete`, or `failed: `). | | `persistentSchemaDataSize` | `Quantity` | Measured size of persistent schema data from the last successful migration. Used to size the next restore PVC. | +| `redactionPhase` | `string` | Phase of the current restore's redaction (`active`, `complete`, `partial`, or `failed: `). `partial` means anonymisation ran but some per-column SECURITY LABEL statements were tolerated as errors (e.g. column missing on this DB version). `failed:` is sticky — it doesn't auto-retry; the next scheduled restore clears it. | +| `redactionVersion` | `string` | The manifest version resolved during the last redaction run (when `manifestUrl` is version-templated). | +| `redactionColumnsApplied` | `uint32` | Number of columns the last redaction run attempted to mask. | | `consecutiveRestoreFailures` | `uint32` | Number of consecutive restore failures. Reset to 0 on success. After 3 consecutive failures the operator stops scheduling new restores until the counter is reset (automatically on next successful restore, or manually via `kubectl patch --subresource=status`). 
| --- diff --git a/src/controllers/replica.rs b/src/controllers/replica.rs index 8bf9a8c..b42a576 100644 --- a/src/controllers/replica.rs +++ b/src/controllers/replica.rs @@ -36,6 +36,7 @@ use crate::{ }; use scheduling::ScheduleDecision; +mod redaction; mod resources; mod scheduling; mod schema_migration; @@ -162,6 +163,19 @@ pub async fn reconcile(replica: Arc, ctx: Arc) ) }); + // Handle redaction before schema migration: if redaction is set, + // it rewrites the data in place, and any persistent_schemas migration + // pulls from the (already-redacted) source tables. + if replica.spec.redaction.is_some() + && let Some(switching) = switching_restore + { + let redaction_settled = + redaction::reconcile_redaction_step(&ctx, &replica, switching).await?; + if !redaction_settled { + return Ok(Action::requeue(Duration::from_secs(30))); + } + } + // Handle schema migration for persistent_schemas configuration if replica.spec.persistent_schemas.is_some() && let Some(switching) = switching_restore @@ -306,6 +320,16 @@ pub async fn reconcile(replica: Arc, ctx: Arc) true }; + let redaction_settled = if replica.spec.redaction.is_some() { + let phase = replica + .status + .as_ref() + .and_then(|s| s.redaction_phase.as_deref()); + matches!(phase, None | Some("complete") | Some("partial")) + } else { + true + }; + let grace_period = SignedDuration::try_from(replica.spec.switchover_grace_period.0).unwrap_or_default(); let last_completed = replica @@ -325,6 +349,7 @@ pub async fn reconcile(replica: Arc, ctx: Arc) }); if migration_complete + && redaction_settled && has_matching_current && let Some(completed_at) = last_completed && now.duration_since(completed_at.0) > grace_period @@ -368,6 +393,9 @@ pub async fn reconcile(replica: Arc, ctx: Arc) "previousRestore": null, "schemaMigrationJob": null, "schemaMigrationPhase": null, + "redactionPhase": null, + "redactionVersion": null, + "redactionColumnsApplied": null, } }); replicas diff --git a/src/controllers/replica/redaction.rs 
b/src/controllers/replica/redaction.rs new file mode 100644 index 0000000..b77d922 --- /dev/null +++ b/src/controllers/replica/redaction.rs @@ -0,0 +1,377 @@ +//! Replica redaction: fetch a Tamanu/dbt masking manifest and apply it to +//! a freshly-restored Postgres database using the `postgresql_anonymizer` +//! extension. +//! +//! See `docs/plans/replica-redaction.md` for the full design. + +use k8s_openapi::api::core::v1::Secret; +use kube::{ + Api, ResourceExt as _, + api::{Patch, PatchParams}, +}; +use tracing::{debug, info, warn}; + +use crate::context::Context; +use crate::controllers::postgres::{ + self, PgConnection, discover_restore_database, read_secret_field, +}; +use crate::error::{Error, Result}; +use crate::types::{PostgresPhysicalReplica, PostgresPhysicalRestore, RedactionSpec}; + +use self::manifest::{Manifest, base_version, parse_manifest}; + +pub use self::apply::Outcome; + +mod apply; +pub mod manifest; +pub mod mask; + +const VERSION_PLACEHOLDER: &str = "{version}"; + +/// Reconciler entry point: runs redaction against `switching` if the +/// replica has a redaction spec and the current `redactionPhase` is not +/// already `complete` / `partial` / `failed: …`. Returns `true` when the +/// redaction is settled (complete, partial, or failed — anything that +/// won't change on the next reconcile), `false` when more work is +/// pending and the controller should requeue. +pub async fn reconcile_redaction_step( + ctx: &Context, + replica: &PostgresPhysicalReplica, + switching: &PostgresPhysicalRestore, +) -> Result { + let replica_name = replica.name_any(); + let namespace = replica.namespace().expect("replica is namespaced"); + let phase = replica + .status + .as_ref() + .and_then(|s| s.redaction_phase.as_deref()); + + match phase { + Some("complete") | Some("partial") => return Ok(true), + // `failed: …` is sticky: don't auto-retry. The user clears the + // phase by triggering a new restore (the sweep resets it) or + // editing status manually. 
Treat it as settled so the + // switchover branch can run if the operator decides to proceed + // without redaction — but `false` here means "redaction is not + // healthy, do not let the switchover proceed". + Some(p) if p.starts_with("failed:") => return Ok(false), + _ => {} + } + + if phase != Some("active") { + patch_phase_only(ctx, &replica_name, &namespace, "active").await?; + } + + let switching_name = switching.name_any(); + match reconcile_redaction(ctx, replica, &switching_name).await { + Ok((version, outcome)) => { + let phase = if outcome.is_partial() { + "partial" + } else { + "complete" + }; + info!( + replica = %replica_name, + restore = %switching_name, + phase, + columns_attempted = outcome.columns_attempted, + columns_failed = outcome.columns_failed, + tables_attempted = outcome.tables_attempted, + tables_failed = outcome.tables_failed, + "redaction finished" + ); + patch_settled( + ctx, + &replica_name, + &namespace, + phase, + version.as_deref(), + outcome.columns_attempted, + ) + .await?; + Ok(true) + } + Err(e) => { + let msg = format!("failed: {e}"); + warn!(replica = %replica_name, error = %e, "redaction failed"); + patch_phase_only(ctx, &replica_name, &namespace, &msg).await?; + Ok(false) + } + } +} + +async fn patch_phase_only( + ctx: &Context, + replica_name: &str, + namespace: &str, + phase: &str, +) -> Result<()> { + let replicas: Api = Api::namespaced(ctx.client.clone(), namespace); + let patch = serde_json::json!({ "status": { "redactionPhase": phase } }); + replicas + .patch_status( + replica_name, + &PatchParams::apply("postgres-restore-operator"), + &Patch::Merge(&patch), + ) + .await?; + Ok(()) +} + +async fn patch_settled( + ctx: &Context, + replica_name: &str, + namespace: &str, + phase: &str, + version: Option<&str>, + columns_applied: u32, +) -> Result<()> { + let replicas: Api = Api::namespaced(ctx.client.clone(), namespace); + let patch = serde_json::json!({ + "status": { + "redactionPhase": phase, + "redactionVersion": 
version, + "redactionColumnsApplied": columns_applied, + } + }); + replicas + .patch_status( + replica_name, + &PatchParams::apply("postgres-restore-operator"), + &Patch::Merge(&patch), + ) + .await?; + Ok(()) +} + +/// Run the full redaction step against the given restore. +/// +/// Returns the resolved manifest version (if any) and the apply +/// [`Outcome`]. Errors abort the step and let the reconciler retry on +/// the next pass; per-statement issues during apply are tolerated and +/// surface as `outcome.is_partial()`. +pub async fn reconcile_redaction( + ctx: &Context, + replica: &PostgresPhysicalReplica, + restore_name: &str, +) -> Result<(Option, Outcome)> { + let spec = replica + .spec + .redaction + .as_ref() + .expect("reconcile_redaction called with no spec.redaction"); + + validate_spec(spec)?; + + let namespace = replica.namespace().expect("replica is namespaced"); + + let creds_name = replica.creds_secret_name(); + let secrets: Api = Api::namespaced(ctx.client.clone(), &namespace); + let creds = secrets.get(&creds_name).await?; + let user = read_secret_field(&creds, "username")?; + let password = read_secret_field(&creds, "password")?; + + let dbname = discover_restore_database( + &ctx.client, + &namespace, + restore_name, + &user, + &password, + ctx.use_port_forward(), + ) + .await?; + + let conn = postgres::connect_to_restore( + &ctx.client, + &namespace, + restore_name, + &dbname, + &user, + &password, + ctx.use_port_forward(), + ) + .await?; + + let version = resolve_version(spec, &conn).await?; + let resolved_url = resolve_url(spec, version.as_deref())?; + + info!( + replica = %replica.name_any(), + restore = %restore_name, + url = %resolved_url, + "fetching redaction manifest" + ); + + let manifest = fetch_manifest(ctx, spec, version.as_deref(), &resolved_url).await?; + + info!( + columns = manifest.columns.len(), + tables = manifest.tables.len(), + "manifest parsed" + ); + + let outcome = apply::apply(&conn, &manifest).await?; + + if 
replica.spec.read_only { + debug!( + replica = %replica.name_any(), + "re-enabling read-only on redacted database" + ); + apply::enforce_read_only(&conn, &dbname, &replica.spec.analytics_username).await?; + } + + Ok((version, outcome)) +} + +fn validate_spec(spec: &RedactionSpec) -> Result<()> { + let templated = spec.manifest_url.contains(VERSION_PLACEHOLDER); + let has_literal = spec.version.is_some(); + let has_query = spec.version_query.is_some(); + + if has_literal && has_query { + return Err(Error::Redaction( + "redaction spec: `version` and `versionQuery` are mutually exclusive".into(), + )); + } + if templated && !(has_literal || has_query) { + return Err(Error::Redaction( + "redaction spec: `manifestUrl` contains `{version}` but no `version` or `versionQuery` provided".into(), + )); + } + if !templated && (has_literal || has_query) { + return Err(Error::Redaction( + "redaction spec: `version`/`versionQuery` set but `manifestUrl` has no `{version}` placeholder".into(), + )); + } + Ok(()) +} + +async fn resolve_version(spec: &RedactionSpec, conn: &PgConnection) -> Result> { + if let Some(v) = spec.version.clone() { + return Ok(Some(v)); + } + let Some(query) = spec.version_query.as_deref() else { + return Ok(None); + }; + + let rows = conn + .client + .simple_query(query) + .await + .map_err(|e| Error::Redaction(format!("versionQuery failed: {e}")))?; + + for msg in rows { + if let tokio_postgres::SimpleQueryMessage::Row(row) = msg { + let value = row + .get(0) + .ok_or_else(|| Error::Redaction("versionQuery returned no columns".into()))?; + return Ok(Some(value.to_string())); + } + } + Err(Error::Redaction("versionQuery returned no rows".into())) +} + +fn resolve_url(spec: &RedactionSpec, version: Option<&str>) -> Result { + match version { + Some(v) => Ok(spec.manifest_url.replace(VERSION_PLACEHOLDER, v)), + None => Ok(spec.manifest_url.clone()), + } +} + +async fn fetch_manifest( + ctx: &Context, + spec: &RedactionSpec, + version: Option<&str>, + url: 
&str, +) -> Result { + let resp = ctx.http_client.get(url).send().await?; + let status = resp.status(); + + if status == reqwest::StatusCode::NOT_FOUND + && spec.version_fallback_to_base + && let Some(v) = version + && let Some(base) = base_version(v) + { + let base_url = spec.manifest_url.replace(VERSION_PLACEHOLDER, &base); + warn!( + version = %v, + base = %base, + "manifest 404, retrying with base version" + ); + debug!(url = %base_url, "fetching redaction manifest (base)"); + let base_resp = ctx.http_client.get(&base_url).send().await?; + let base_resp = base_resp.error_for_status()?; + let body = base_resp.text().await?; + return parse_manifest(&body).map_err(Into::into); + } + + let resp = resp.error_for_status()?; + let body = resp.text().await?; + parse_manifest(&body).map_err(Into::into) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn spec(url: &str, ver: Option<&str>, vq: Option<&str>) -> RedactionSpec { + RedactionSpec { + manifest_url: url.into(), + version: ver.map(str::to_string), + version_query: vq.map(str::to_string), + version_fallback_to_base: false, + } + } + + #[test] + fn validate_accepts_static_url() { + assert!(validate_spec(&spec("https://x/m.json", None, None)).is_ok()); + } + + #[test] + fn validate_accepts_templated_with_literal_version() { + assert!(validate_spec(&spec("https://x/v{version}.json", Some("1.0.0"), None)).is_ok()); + } + + #[test] + fn validate_accepts_templated_with_query() { + assert!(validate_spec(&spec("https://x/v{version}.json", None, Some("SELECT 1"))).is_ok()); + } + + #[test] + fn validate_rejects_templated_without_version() { + assert!(validate_spec(&spec("https://x/v{version}.json", None, None)).is_err()); + } + + #[test] + fn validate_rejects_static_with_version() { + assert!(validate_spec(&spec("https://x/m.json", Some("1.0.0"), None)).is_err()); + } + + #[test] + fn validate_rejects_both_version_and_query() { + assert!( + validate_spec(&spec( + "https://x/v{version}.json", + Some("1.0.0"), + 
/// Counters describing one run of the manifest apply phase.
///
/// "Failed" entries are ones that were skipped or whose statement errored
/// but were tolerated; they do not abort the run.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct Outcome {
    /// Column masks the run tried to apply.
    pub columns_attempted: u32,
    /// Column masks that were skipped or whose statement failed.
    pub columns_failed: u32,
    /// Table-level masks the run tried to apply.
    pub tables_attempted: u32,
    /// Table-level masks that were skipped or whose statement failed.
    pub tables_failed: u32,
}

impl Outcome {
    /// `true` when at least one column or table mask failed.
    #[must_use]
    pub fn is_partial(&self) -> bool {
        self.columns_failed > 0 || self.tables_failed > 0
    }
}
+pub async fn apply(conn: &PgConnection, manifest: &Manifest) -> Result { + let mut outcome = Outcome::default(); + + conn.client + .simple_query("CREATE EXTENSION IF NOT EXISTS anon CASCADE") + .await + .map_err(|e| Error::Redaction(format!("CREATE EXTENSION anon failed: {e}")))?; + + conn.client + .simple_query("SELECT anon.init()") + .await + .map_err(|e| Error::Redaction(format!("anon.init() failed: {e}")))?; + + for table in &manifest.tables { + outcome.tables_attempted += 1; + if table.kind != "truncate" { + warn!( + schema = %table.schema, + table = %table.table, + kind = %table.kind, + "unsupported table-level mask kind, skipping" + ); + outcome.tables_failed += 1; + continue; + } + let stmt = format!( + "TRUNCATE TABLE {}.{} CASCADE", + quote_ident(&table.schema), + quote_ident(&table.table) + ); + if let Err(e) = conn.client.simple_query(&stmt).await { + warn!( + schema = %table.schema, + table = %table.table, + error = %e, + "table truncate failed, continuing" + ); + outcome.tables_failed += 1; + } + } + + let column_infos = lookup_column_infos(conn, &manifest.columns).await?; + + for mask in &manifest.columns { + outcome.columns_attempted += 1; + let info = column_infos.get(&col_key(mask)); + if info.is_none() { + warn!( + schema = %mask.schema, + table = %mask.table, + column = %mask.column, + "column not present in restore, skipping" + ); + outcome.columns_failed += 1; + continue; + } + let fragment = match fragment_for(mask, info) { + Ok(f) => f, + Err(reason) => { + warn!( + schema = %mask.schema, + table = %mask.table, + column = %mask.column, + kind = %mask.kind, + %reason, + "could not build mask fragment, skipping" + ); + outcome.columns_failed += 1; + continue; + } + }; + + let label = format!( + "SECURITY LABEL FOR anon ON COLUMN {}.{}.{} IS {}", + quote_ident(&mask.schema), + quote_ident(&mask.table), + quote_ident(&mask.column), + quote_sql_literal(&fragment.render()), + ); + if let Err(e) = conn.client.simple_query(&label).await { + warn!( 
+ schema = %mask.schema, + table = %mask.table, + column = %mask.column, + error = %e, + "SECURITY LABEL failed, continuing" + ); + outcome.columns_failed += 1; + } + } + + info!( + columns = manifest.columns.len(), + tables = manifest.tables.len(), + failed_columns = outcome.columns_failed, + failed_tables = outcome.tables_failed, + "running anon.anonymize_database()" + ); + + conn.client + .simple_query("SELECT anon.anonymize_database()") + .await + .map_err(|e| Error::Redaction(format!("anon.anonymize_database() failed: {e}")))?; + + Ok(outcome) +} + +/// Lock the freshly-redacted database back to read-only by: +/// - setting the DB-level `default_transaction_read_only` GUC, and +/// - demoting the analytics user back to NOSUPERUSER + granting +/// `pg_read_all_data` (matching the role posture the restore init +/// script applies when `effective_read_only` is true). +pub async fn enforce_read_only( + conn: &PgConnection, + dbname: &str, + analytics_user: &str, +) -> Result<()> { + let alter_db = format!( + "ALTER DATABASE {} SET default_transaction_read_only = on", + quote_ident(dbname), + ); + conn.client + .simple_query(&alter_db) + .await + .map_err(|e| Error::Redaction(format!("ALTER DATABASE for read-only failed: {e}")))?; + + let demote = format!( + "ALTER ROLE {user} WITH NOSUPERUSER", + user = quote_ident(analytics_user), + ); + conn.client + .simple_query(&demote) + .await + .map_err(|e| Error::Redaction(format!("demoting analytics user failed: {e}")))?; + + let grant = format!( + "GRANT pg_read_all_data TO {user}", + user = quote_ident(analytics_user), + ); + conn.client + .simple_query(&grant) + .await + .map_err(|e| Error::Redaction(format!("granting pg_read_all_data failed: {e}")))?; + + Ok(()) +} + +/// Key used to join `ColumnMask` with the `information_schema` results. 
+fn col_key(m: &ColumnMask) -> (String, String, String) { + (m.schema.clone(), m.table.clone(), m.column.clone()) +} + +/// Look up `data_type`, `is_nullable`, and `column_default` for every +/// masked column in a single batch query. Columns absent from the +/// restore's schema simply don't appear in the returned map. +async fn lookup_column_infos( + conn: &PgConnection, + masks: &[ColumnMask], +) -> Result> { + let mut out = HashMap::new(); + if masks.is_empty() { + return Ok(out); + } + + let schemas: Vec = masks.iter().map(|m| m.schema.clone()).collect(); + let tables: Vec = masks.iter().map(|m| m.table.clone()).collect(); + let columns: Vec = masks.iter().map(|m| m.column.clone()).collect(); + + let stmt = " + SELECT c.table_schema, c.table_name, c.column_name, + c.data_type, c.is_nullable, + pg_get_expr(d.adbin, d.adrelid) AS column_default + FROM information_schema.columns c + LEFT JOIN pg_catalog.pg_attribute a + ON a.attrelid = (quote_ident(c.table_schema) || '.' || quote_ident(c.table_name))::regclass + AND a.attname = c.column_name + AND NOT a.attisdropped + LEFT JOIN pg_catalog.pg_attrdef d + ON d.adrelid = a.attrelid AND d.adnum = a.attnum + WHERE (c.table_schema, c.table_name, c.column_name) + IN (SELECT s, t, col + FROM UNNEST($1::text[], $2::text[], $3::text[]) + AS u(s, t, col)) + "; + + let rows = conn + .client + .query_typed( + stmt, + &[ + (&schemas, Type::TEXT_ARRAY), + (&tables, Type::TEXT_ARRAY), + (&columns, Type::TEXT_ARRAY), + ], + ) + .await?; + + for row in rows { + let schema: String = row.get("table_schema"); + let table: String = row.get("table_name"); + let column: String = row.get("column_name"); + let data_type: String = row.get("data_type"); + let nullable: String = row.get("is_nullable"); + let default: Option = row.get("column_default"); + + out.insert( + (schema, table, column), + ColumnInfo { + data_type, + is_nullable: nullable == "YES", + column_default: default, + }, + ); + } + + Ok(out) +} + +/// Quote a string for 
/// Render `s` as a single-quoted SQL string literal, doubling every
/// embedded single quote.
fn quote_sql_literal(s: &str) -> String {
    // Two delimiters plus the payload; doubled quotes may grow it further.
    let mut quoted = String::with_capacity(s.len() + 2);
    quoted.push('\'');
    for ch in s.chars() {
        if ch == '\'' {
            quoted.push('\'');
        }
        quoted.push(ch);
    }
    quoted.push('\'');
    quoted
}
+pub fn parse_manifest(json: &str) -> Result { + let doc: Value = serde_json::from_str(json)?; + let mut out = Manifest::default(); + + let Some(sources) = doc.get("sources").and_then(Value::as_object) else { + return Ok(out); + }; + + for (source_id, source) in sources { + let Some(schema) = source.get("schema").and_then(Value::as_str) else { + warn!( + source = source_id, + "manifest source has no `schema`, skipping" + ); + continue; + }; + let Some(name) = source.get("name").and_then(Value::as_str) else { + warn!( + source = source_id, + "manifest source has no `name`, skipping" + ); + continue; + }; + + if let Some(mask) = meta_masking(source) + && let Some(kind) = mask_kind(&mask) + { + out.tables.push(TableMask { + schema: schema.into(), + table: name.into(), + kind: kind.into(), + }); + } + + let Some(columns) = source.get("columns").and_then(Value::as_object) else { + continue; + }; + + for (col_name, col) in columns { + let Some(mask) = meta_masking(col) else { + continue; + }; + let Some(kind) = mask_kind(&mask) else { + warn!( + source = source_id, + column = col_name, + "manifest masking has no `kind`, skipping" + ); + continue; + }; + + let range = mask + .as_object() + .and_then(|o| o.get("range")) + .and_then(Value::as_str) + .and_then(parse_range); + + out.columns.push(ColumnMask { + schema: schema.into(), + table: name.into(), + column: col_name.into(), + kind: kind.into(), + range, + }); + } + } + + Ok(out) +} + +/// Read `.config.meta.masking`, falling back to `.meta.masking`. +fn meta_masking(node: &Value) -> Option { + if let Some(v) = node + .get("config") + .and_then(|c| c.get("meta")) + .and_then(|m| m.get("masking")) + { + return Some(v.clone()); + } + node.get("meta").and_then(|m| m.get("masking")).cloned() +} + +/// Short-form (`"name"`) vs extended-form (`{"kind":"name", …}`) both +/// reduce to a single kind string. 
/// Derive a base version (`major.minor.0`) from a `MAJOR.MINOR.PATCH`
/// version.
///
/// Returns `None` if the input doesn't match that shape — exactly three
/// dot-separated numeric components — or if the patch is already `0`
/// (the base would be the version itself, so there is nothing to fall
/// back to).
pub fn base_version(v: &str) -> Option<String> {
    let mut parts = v.split('.');
    let (Some(major), Some(minor), Some(patch), None) =
        (parts.next(), parts.next(), parts.next(), parts.next())
    else {
        return None;
    };

    // The major component is echoed back verbatim, so it has to be checked
    // explicitly; minor and patch are validated by parsing them.
    if major.is_empty() || !major.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }
    let minor: u32 = minor.parse().ok()?;
    let patch: u32 = patch.parse().ok()?;
    if patch == 0 {
        return None;
    }
    Some(format!("{major}.{minor}.0"))
}
parses_table_level_truncate() { + let m = parse_manifest( + r#"{ + "sources": { + "x": { + "schema": "public", + "name": "sync_lookup", + "config": {"meta": {"masking": "truncate"}}, + "columns": {} + } + } + }"#, + ) + .unwrap(); + + assert_eq!(m.tables.len(), 1); + assert_eq!(m.tables[0].schema, "public"); + assert_eq!(m.tables[0].table, "sync_lookup"); + assert_eq!(m.tables[0].kind, "truncate"); + } + + #[test] + fn parses_table_level_truncate_via_meta_fallback() { + let m = parse_manifest( + r#"{ + "sources": { + "x": { + "schema": "public", + "name": "t", + "meta": {"masking": "truncate"}, + "columns": {} + } + } + }"#, + ) + .unwrap(); + + assert_eq!(m.tables.len(), 1); + assert_eq!(m.tables[0].table, "t"); + } + + #[test] + fn skips_source_missing_schema_or_name() { + let m = parse_manifest( + r#"{ + "sources": { + "a": {"name": "t", "columns": {"c": {"config":{"meta":{"masking":"email"}}}}}, + "b": {"schema": "s", "columns": {"c": {"config":{"meta":{"masking":"email"}}}}} + } + }"#, + ) + .unwrap(); + + assert_eq!(m.columns.len(), 0); + } + + #[test] + fn keeps_unknown_kind_verbatim() { + let m = parse_manifest( + r#"{ + "sources": { + "x": { + "schema": "public", + "name": "t", + "columns": { + "c": {"config":{"meta":{"masking":"brand_new"}}} + } + } + } + }"#, + ) + .unwrap(); + + assert_eq!(m.columns[0].kind, "brand_new"); + } + + #[test] + fn base_version_strips_patch() { + assert_eq!(base_version("2.41.7"), Some("2.41.0".to_string())); + } + + #[test] + fn base_version_returns_none_when_patch_is_zero() { + assert_eq!(base_version("2.41.0"), None); + } + + #[test] + fn base_version_returns_none_on_bad_shape() { + assert_eq!(base_version("2.41"), None); + assert_eq!(base_version("not-a-version"), None); + assert_eq!(base_version("2.x.7"), None); + } +} diff --git a/src/controllers/replica/redaction/mask.rs b/src/controllers/replica/redaction/mask.rs new file mode 100644 index 0000000..8644c99 --- /dev/null +++ b/src/controllers/replica/redaction/mask.rs 
@@ -0,0 +1,364 @@ +//! Mask types parsed out of a Tamanu/dbt manifest, and the registry that +//! turns them into `SECURITY LABEL` fragments for postgresql_anonymizer. +//! +//! The canonical contract for `meta.masking` is documented at +//! . + +use crate::controllers::postgres::quote_ident; + +#[derive(Debug, Clone, PartialEq)] +pub struct ColumnMask { + pub schema: String, + pub table: String, + pub column: String, + pub kind: String, + pub range: Option<(f64, f64)>, +} + +#[derive(Debug, Clone, PartialEq)] +pub struct TableMask { + pub schema: String, + pub table: String, + pub kind: String, +} + +/// Resolved column metadata used by the type-dispatched kinds +/// (`zero`, `empty`, `default`, `nil`). +#[derive(Debug, Clone)] +pub struct ColumnInfo { + pub data_type: String, + pub is_nullable: bool, + pub column_default: Option, +} + +/// The SQL right-hand side of a `SECURITY LABEL … IS ''`. +#[derive(Debug, Clone, PartialEq)] +pub enum Fragment { + Function(String), + Value(String), +} + +impl Fragment { + pub fn render(&self) -> String { + match self { + Self::Function(expr) => format!("MASKED WITH FUNCTION {expr}"), + Self::Value(expr) => format!("MASKED WITH VALUE {expr}"), + } + } +} + +/// Parse `"L-H"` (e.g. `"20-50"`, `"1.001-1.03"`) into a pair of `f64`s, +/// splitting on the **last** `-` so floats decompose correctly. Returns +/// `None` on parse failure. +pub fn parse_range(s: &str) -> Option<(f64, f64)> { + let (lo, hi) = s.rsplit_once('-')?; + let lo: f64 = lo.parse().ok()?; + let hi: f64 = hi.parse().ok()?; + Some((lo, hi)) +} + +/// Build the `Fragment` for a column mask. `info` is only consulted for +/// kinds that need column-type knowledge (`zero`, `empty`, `default`, +/// `nil`); for other kinds it can be `None` (used by unit tests). +/// +/// Returns `Err` with a short diagnostic when the kind is unsupported or +/// when type-dependent kinds are missing required `info`. 
+pub fn fragment_for(mask: &ColumnMask, info: Option<&ColumnInfo>) -> Result { + let col = quote_ident(&mask.column); + + match mask.kind.as_str() { + "date" => Ok(Fragment::Function(null_pres( + &col, + "anon.random_date()".into(), + ))), + + "datetime" => Ok(Fragment::Function(null_pres( + &col, + format!("date_trunc('day', {col}) + (floor(random() * 86400) || ' seconds')::interval"), + ))), + + "text" => Ok(Fragment::Function(null_pres( + &col, + format!("anon.lorem_ipsum(characters := length({col}))"), + ))), + + "string" => Ok(Fragment::Function(null_pres( + &col, + format!("anon.random_string(length({col}))"), + ))), + + "email" => Ok(Fragment::Function(null_pres( + &col, + "anon.fake_email()".into(), + ))), + + "name" => Ok(Fragment::Function(null_pres( + &col, + format!( + "CASE WHEN {col} LIKE '% %' \ + THEN anon.fake_first_name() || ' ' || anon.fake_last_name() \ + ELSE anon.fake_first_name() END" + ), + ))), + + "phone" => Ok(Fragment::Function(null_pres( + &col, + format!("anon.partial({col}, 2, '****', 2)"), + ))), + + "place" => Ok(Fragment::Function(null_pres( + &col, + "anon.fake_city()".into(), + ))), + + "url" => Ok(Fragment::Function(null_pres( + &col, + "'https://example.invalid/' || anon.random_string(8)".into(), + ))), + + "integer" => { + let (lo, hi) = mask.range.unwrap_or((i32::MIN as f64, i32::MAX as f64)); + Ok(Fragment::Function(null_pres( + &col, + format!("(floor(random() * ({hi} - {lo} + 1)) + {lo})::int"), + ))) + } + + "float" => { + let (lo, hi) = mask.range.unwrap_or((0.0, 1.0)); + Ok(Fragment::Function(null_pres( + &col, + format!("(random() * ({hi} - {lo}) + {lo})::numeric"), + ))) + } + + "money" => { + let (lo, hi) = mask.range.unwrap_or((0.0, 10_000.0)); + Ok(Fragment::Function(null_pres( + &col, + format!("round((random() * ({hi} - {lo}) + {lo})::numeric, 2)"), + ))) + } + + "zero" => { + let info = info.ok_or_else(|| "zero mask needs column type".to_string())?; + match data_type_family(&info.data_type) { + 
DataTypeFamily::Bytea => Ok(Fragment::Function(format!( + "repeat(E'\\x00'::bytea, length({col}))" + ))), + DataTypeFamily::Text => { + Ok(Fragment::Function(format!("repeat('0', length({col}))"))) + } + DataTypeFamily::Numeric => Ok(Fragment::Value("0".into())), + DataTypeFamily::Other => { + Err(format!("zero mask unsupported for type {}", info.data_type)) + } + } + } + + "empty" => { + let info = info.ok_or_else(|| "empty mask needs column type".to_string())?; + match data_type_family(&info.data_type) { + DataTypeFamily::Numeric => Ok(Fragment::Value("0".into())), + DataTypeFamily::Text => Ok(Fragment::Value("''".into())), + DataTypeFamily::Bytea => Ok(Fragment::Value("E'\\\\x'::bytea".into())), + DataTypeFamily::Other => match info.data_type.as_str() { + "json" | "jsonb" => Ok(Fragment::Value(format!("'{{}}'::{}", info.data_type))), + "ARRAY" => Ok(Fragment::Value("'{}'".into())), + _ => Err(format!( + "empty mask unsupported for type {}", + info.data_type + )), + }, + } + } + + "nil" => { + let info = info.ok_or_else(|| "nil mask needs column type".to_string())?; + if !info.is_nullable { + return Err("nil mask on non-nullable column".into()); + } + Ok(Fragment::Value("NULL".into())) + } + + "default" => { + let info = info.ok_or_else(|| "default mask needs column type".to_string())?; + match info.column_default.as_deref() { + Some(d) => Ok(Fragment::Value(d.into())), + None => Err("default mask on column without default".into()), + } + } + + other => Err(format!("unknown mask kind: {other}")), + } +} + +/// Wrap an expression in a null-preserving CASE. +fn null_pres(col: &str, expr: String) -> String { + format!("CASE WHEN {col} IS NULL THEN NULL ELSE {expr} END") +} + +enum DataTypeFamily { + Numeric, + Text, + Bytea, + Other, +} + +/// Group `information_schema.columns.data_type` strings into the families +/// that determine how `zero`/`empty` are realised. 
+fn data_type_family(s: &str) -> DataTypeFamily { + match s { + "smallint" | "integer" | "bigint" | "real" | "double precision" | "numeric" | "decimal" => { + DataTypeFamily::Numeric + } + "character varying" | "character" | "text" | "citext" => DataTypeFamily::Text, + "bytea" => DataTypeFamily::Bytea, + _ => DataTypeFamily::Other, + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn cm(kind: &str, range: Option<(f64, f64)>) -> ColumnMask { + ColumnMask { + schema: "public".into(), + table: "t".into(), + column: "c".into(), + kind: kind.into(), + range, + } + } + + fn info(data_type: &str, nullable: bool, default: Option<&str>) -> ColumnInfo { + ColumnInfo { + data_type: data_type.into(), + is_nullable: nullable, + column_default: default.map(str::to_string), + } + } + + #[test] + fn range_splits_on_last_dash_for_floats() { + assert_eq!(parse_range("1.001-1.03"), Some((1.001, 1.03))); + assert_eq!(parse_range("20-50"), Some((20.0, 50.0))); + assert_eq!(parse_range("0-10.5"), Some((0.0, 10.5))); + } + + #[test] + fn range_handles_negative_lo() { + assert_eq!(parse_range("-5-5"), Some((-5.0, 5.0))); + } + + #[test] + fn range_returns_none_on_garbage() { + assert!(parse_range("nope").is_none()); + assert!(parse_range("1-x").is_none()); + assert!(parse_range("1.2").is_none()); + } + + #[test] + fn fragment_email_is_null_preserving() { + let f = fragment_for(&cm("email", None), None).unwrap(); + let rendered = f.render(); + assert!(rendered.contains("MASKED WITH FUNCTION")); + assert!(rendered.contains("CASE WHEN")); + assert!(rendered.contains("anon.fake_email()")); + } + + #[test] + fn fragment_name_detects_space() { + let f = fragment_for(&cm("name", None), None).unwrap(); + let rendered = f.render(); + assert!(rendered.contains("LIKE '% %'")); + // anon doesn't ship fake_name(); compose first + last for the + // with-space branch. 
+ assert!(rendered.contains("fake_first_name() || ' ' || anon.fake_last_name()")); + assert!(rendered.contains("ELSE anon.fake_first_name()")); + } + + #[test] + fn fragment_integer_uses_range() { + let f = fragment_for(&cm("integer", Some((20.0, 50.0))), None).unwrap(); + let rendered = f.render(); + assert!(rendered.contains("50 - 20")); + assert!(rendered.contains("::int")); + } + + #[test] + fn fragment_money_rounds_to_two_decimals() { + let f = fragment_for(&cm("money", Some((0.0, 100.0))), None).unwrap(); + assert!(f.render().contains("round(")); + } + + #[test] + fn fragment_zero_for_bytea_repeats() { + let f = fragment_for(&cm("zero", None), Some(&info("bytea", true, None))).unwrap(); + assert!(matches!(f, Fragment::Function(ref s) if s.contains("repeat(E'\\x00'::bytea"))); + } + + #[test] + fn fragment_zero_for_text_repeats_digit() { + let f = fragment_for(&cm("zero", None), Some(&info("text", true, None))).unwrap(); + assert!(matches!(f, Fragment::Function(ref s) if s.contains("repeat('0',"))); + } + + #[test] + fn fragment_zero_for_numeric_is_value_zero() { + let f = fragment_for(&cm("zero", None), Some(&info("integer", true, None))).unwrap(); + assert_eq!(f, Fragment::Value("0".into())); + } + + #[test] + fn fragment_empty_dispatches_on_type() { + assert_eq!( + fragment_for(&cm("empty", None), Some(&info("integer", true, None))).unwrap(), + Fragment::Value("0".into()) + ); + assert_eq!( + fragment_for(&cm("empty", None), Some(&info("text", true, None))).unwrap(), + Fragment::Value("''".into()) + ); + assert_eq!( + fragment_for(&cm("empty", None), Some(&info("jsonb", true, None))).unwrap(), + Fragment::Value("'{}'::jsonb".into()) + ); + } + + #[test] + fn fragment_nil_requires_nullable() { + assert!(fragment_for(&cm("nil", None), Some(&info("text", false, None))).is_err()); + assert_eq!( + fragment_for(&cm("nil", None), Some(&info("text", true, None))).unwrap(), + Fragment::Value("NULL".into()) + ); + } + + #[test] + fn 
fragment_default_requires_default_expression() { + assert!(fragment_for(&cm("default", None), Some(&info("text", true, None))).is_err()); + assert_eq!( + fragment_for( + &cm("default", None), + Some(&info("text", true, Some("'hello'::text"))), + ) + .unwrap(), + Fragment::Value("'hello'::text".into()) + ); + } + + #[test] + fn fragment_unknown_kind_errors() { + assert!(fragment_for(&cm("brand_new_kind", None), None).is_err()); + } + + #[test] + fn rendered_value_keeps_null_marker() { + assert_eq!( + Fragment::Value("NULL".into()).render(), + "MASKED WITH VALUE NULL" + ); + } +} diff --git a/src/controllers/replica/scheduling.rs b/src/controllers/replica/scheduling.rs index db0beb1..9bb82ca 100644 --- a/src/controllers/replica/scheduling.rs +++ b/src/controllers/replica/scheduling.rs @@ -188,6 +188,7 @@ mod tests { ), persistent_schemas: None, + redaction: None, }, status: Some(PostgresPhysicalReplicaStatus { next_scheduled_restore: next_scheduled.map(Time), diff --git a/src/controllers/replica/schema_migration.rs b/src/controllers/replica/schema_migration.rs index a0b4f87..568852f 100644 --- a/src/controllers/replica/schema_migration.rs +++ b/src/controllers/replica/schema_migration.rs @@ -285,6 +285,7 @@ mod tests { ), persistent_schemas: Some(schemas.into_iter().map(String::from).collect()), + redaction: None, }, status: None, } diff --git a/src/controllers/restore/builders.rs b/src/controllers/restore/builders.rs index ef06f89..43882e7 100644 --- a/src/controllers/restore/builders.rs +++ b/src/controllers/restore/builders.rs @@ -682,19 +682,29 @@ cp -a /usr/lib/locale/* /locale-data/ "# .to_string(); - // persistent_schemas needs write access to receive the migrated data - let effective_read_only = replica.spec.read_only && replica.spec.persistent_schemas.is_none(); + // persistent_schemas and redaction both need write access during their + // post-restore step. 
Redaction re-enables `default_transaction_read_only` + // at the database level itself after it's done. + let effective_read_only = replica.spec.read_only + && replica.spec.persistent_schemas.is_none() + && replica.spec.redaction.is_none(); let read_only = effective_read_only.to_string(); - let extra_config_block = if let Some(ref extra) = replica.spec.postgres_extra_config { + let extra_config = replica + .spec + .postgres_extra_config + .clone() + .map(|s| format!("{s}\n")) + .unwrap_or_default(); + + let extra_config_block = if extra_config.is_empty() { + String::new() + } else { format!( r#"echo "Appending extra postgresql.conf settings..." cat >> "$PGDATA/postgresql.conf" << 'EXTRACONFEOF' -{extra} -EXTRACONFEOF"# +{extra_config}EXTRACONFEOF"# ) - } else { - String::new() }; let init_script = format!( @@ -989,13 +999,11 @@ echo "Auth setup complete" image: Some(pg_image.clone()), command: Some(vec!["/bin/sh".to_string(), "-c".to_string()]), args: Some(vec![locale_script]), - security_context: Some( - k8s_openapi::api::core::v1::SecurityContext { - run_as_user: Some(0), - run_as_group: Some(0), - ..Default::default() - }, - ), + security_context: Some(k8s_openapi::api::core::v1::SecurityContext { + run_as_user: Some(0), + run_as_group: Some(0), + ..Default::default() + }), volume_mounts: Some(vec![VolumeMount { name: "locale-data".to_string(), mount_path: "/locale-data".to_string(), @@ -1046,7 +1054,42 @@ echo "Auth setup complete" name: "postgres".to_string(), image: Some(pg_image), command: Some(vec!["/bin/sh".to_string(), "-c".to_string()]), - args: Some(vec![r#" + args: Some(vec![format!( + r#" +PG_MAJOR={pg_version} + +# When spec.redaction is configured, the operator sets REDACTION_ENABLED=1 +# so that we apt-install postgresql_anonymizer_$PG_MAJOR and drop the +# files into the standard system extension dirs of this container's +# (fresh) writable filesystem layer. The PVC-backed cache at +# /pgdata/.anon-cache avoids re-downloading on every pod restart. 
+if [ "${{REDACTION_ENABLED:-0}}" = "1" ]; then + if [ ! -f /pgdata/.anon-cache/anon.so ] || [ ! -f /pgdata/.anon-cache/anon.control ]; then + echo "Installing postgresql_anonymizer_${{PG_MAJOR}} from Dalibo Labs..." + export DEBIAN_FRONTEND=noninteractive + apt-get update + apt-get install -y --no-install-recommends curl ca-certificates gnupg lsb-release + curl -fsSL https://apt.dalibo.org/labs/debian-dalibo.gpg \ + -o /etc/apt/trusted.gpg.d/dalibo-labs.gpg + echo "deb http://apt.dalibo.org/labs $(lsb_release -cs)-dalibo main" \ + > /etc/apt/sources.list.d/dalibo-labs.list + apt-get update + apt-get install -y --no-install-recommends "postgresql_anonymizer_${{PG_MAJOR}}" + mkdir -p /pgdata/.anon-cache + cp -a "/usr/share/postgresql/${{PG_MAJOR}}/extension/anon"* /pgdata/.anon-cache/ + cp -a "/usr/lib/postgresql/${{PG_MAJOR}}/lib/anon.so" /pgdata/.anon-cache/ + chown -R 999:999 /pgdata/.anon-cache + else + echo "anon already cached on PVC, skipping install" + fi + + # Drop the files into this container's writable layer at the standard + # system paths. Cheap (<1s) and has to happen every pod start because + # the writable layer doesn't persist across restarts. + cp -a /pgdata/.anon-cache/anon* "/usr/share/postgresql/${{PG_MAJOR}}/extension/" + cp -a /pgdata/.anon-cache/anon.so "/usr/lib/postgresql/${{PG_MAJOR}}/lib/" +fi + if [ -f /pgdata/needs-reindex ]; then PG_MAJOR=$(cat /pgdata/pgdata/PG_VERSION) ( @@ -1078,20 +1121,49 @@ if [ -f /pgdata/needs-reindex ]; then echo "Background reindex complete" ) & fi -exec postgres -D /pgdata/pgdata ${PGRO_LOG_LEVEL:+-c log_min_messages=$PGRO_LOG_LEVEL} -"#.to_string()]), - env: Some(vec![ - EnvVar { - name: "PGDATA".to_string(), - value: Some("/pgdata/pgdata".to_string()), - ..Default::default() - }, - EnvVar { - name: "POSTGRES_HOST_AUTH_METHOD".to_string(), - value: Some("scram-sha-256".to_string()), + +# Drop privileges to UID 999 (postgres) before launching the server. 
+# The pod's PodSecurityContext requests UID 999 for the entire pod, but +# this container overrides to root via runAsUser=0 above so we can do +# the apt install + cp prelude. gosu hands off cleanly without a +# trampoline shell. +exec gosu postgres postgres -D /pgdata/pgdata ${{PGRO_LOG_LEVEL:+-c log_min_messages=$PGRO_LOG_LEVEL}} +"# + )]), + env: Some({ + let mut env = vec![ + EnvVar { + name: "PGDATA".to_string(), + value: Some("/pgdata/pgdata".to_string()), + ..Default::default() + }, + EnvVar { + name: "POSTGRES_HOST_AUTH_METHOD".to_string(), + value: Some("scram-sha-256".to_string()), + ..Default::default() + }, + ]; + if replica.spec.redaction.is_some() { + env.push(EnvVar { + name: "REDACTION_ENABLED".to_string(), + value: Some("1".to_string()), + ..Default::default() + }); + } + env + }), + security_context: replica.spec.redaction.is_some().then(|| { + // When redaction is set, the postgres container's + // prelude apt-installs anon and copies files into + // /usr/{share,lib}/postgresql/$N/... — both root- + // only operations. gosu drops back to UID 999 + // before exec'ing postgres itself. 
+ k8s_openapi::api::core::v1::SecurityContext { + run_as_user: Some(0), + run_as_group: Some(0), ..Default::default() - }, - ]), + } + }), ports: Some(vec![ContainerPort { name: Some("postgres".to_string()), container_port: 5432, @@ -1168,12 +1240,10 @@ exec postgres -D /pgdata/pgdata ${PGRO_LOG_LEVEL:+-c log_min_messages=$PGRO_LOG_ }, Volume { name: "dshm".to_string(), - empty_dir: Some( - k8s_openapi::api::core::v1::EmptyDirVolumeSource { - medium: Some("Memory".to_string()), - size_limit: Some(shm_size), - }, - ), + empty_dir: Some(k8s_openapi::api::core::v1::EmptyDirVolumeSource { + medium: Some("Memory".to_string()), + size_limit: Some(shm_size), + }), ..Default::default() }, ]), diff --git a/src/controllers/restore/tests.rs b/src/controllers/restore/tests.rs index a710b60..17bf443 100644 --- a/src/controllers/restore/tests.rs +++ b/src/controllers/restore/tests.rs @@ -39,6 +39,7 @@ fn deployment_uses_affinity_not_node_selector() { notifications: vec![], persistent_schemas: None, + redaction: None, storage_size_maximum: Quantity("2Ti".to_string()), }, ); @@ -123,6 +124,7 @@ fn test_restore_and_replica() -> (PostgresPhysicalRestore, PostgresPhysicalRepli notifications: vec![], persistent_schemas: None, + redaction: None, storage_size_maximum: Quantity("2Ti".to_string()), }, ); @@ -644,3 +646,180 @@ fn deployment_shared_buffers_with_custom_resources() { "init script must set shared_buffers for 2Gi request" ); } + +#[test] +fn deployment_with_redaction_runs_postgres_as_root_and_sets_redaction_env() { + let (mut restore, mut replica) = test_restore_and_replica(); + restore.status = Some(PostgresPhysicalRestoreStatus { + postgres_version: Some("18".to_string()), + ..Default::default() + }); + replica.spec.redaction = Some(RedactionSpec { + manifest_url: "https://example.com/m.json".into(), + version: None, + version_query: None, + version_fallback_to_base: false, + }); + + let deploy = build_deployment(&restore, "test-restore", "default", &replica).unwrap(); + let 
pod = deploy + .spec + .as_ref() + .unwrap() + .template + .spec + .as_ref() + .unwrap(); + + let postgres = pod + .containers + .iter() + .find(|c| c.name == "postgres") + .expect("postgres container must be present"); + + let sec = postgres + .security_context + .as_ref() + .expect("postgres container must override securityContext when redaction is set"); + assert_eq!(sec.run_as_user, Some(0), "postgres must run as root"); + + let env = postgres.env.as_ref().unwrap(); + assert!( + env.iter() + .any(|e| e.name == "REDACTION_ENABLED" && e.value.as_deref() == Some("1")), + "REDACTION_ENABLED=1 must be set so the prelude installs anon" + ); + + let script = &postgres.args.as_ref().unwrap()[0]; + assert!( + script.contains("PG_MAJOR=18"), + "prelude must pin PG_MAJOR to the restore's PG version, got: {script}" + ); + assert!( + script.contains("postgresql_anonymizer_${PG_MAJOR}"), + "prelude must apt-install the PG-major-specific anon package" + ); + assert!( + script.contains("exec gosu postgres postgres"), + "prelude must drop privileges via gosu before exec'ing postgres" + ); +} + +#[test] +fn deployment_without_redaction_keeps_default_securitycontext() { + let (mut restore, replica) = test_restore_and_replica(); + restore.status = Some(PostgresPhysicalRestoreStatus { + postgres_version: Some("18".to_string()), + ..Default::default() + }); + + let deploy = build_deployment(&restore, "test-restore", "default", &replica).unwrap(); + let pod = deploy + .spec + .as_ref() + .unwrap() + .template + .spec + .as_ref() + .unwrap(); + let postgres = pod + .containers + .iter() + .find(|c| c.name == "postgres") + .unwrap(); + assert!( + postgres.security_context.is_none(), + "postgres container must inherit the pod-level UID 999 when redaction is off" + ); + let env = postgres.env.as_ref().unwrap(); + assert!( + !env.iter().any(|e| e.name == "REDACTION_ENABLED"), + "REDACTION_ENABLED must not be set when redaction is off" + ); +} + +#[test] +fn 
deployment_with_redaction_builds_for_pg16() { + // Redaction used to be gated to PG 18+ when we relied on the + // extension_control_path GUC. Now the postgres container's prelude + // drops the files into /usr/share/postgresql/$N/extension and + // /usr/lib/postgresql/$N/lib of its own writable layer, so any PG + // major works. + let (mut restore, mut replica) = test_restore_and_replica(); + restore.status = Some(PostgresPhysicalRestoreStatus { + postgres_version: Some("16".to_string()), + ..Default::default() + }); + replica.spec.redaction = Some(RedactionSpec { + manifest_url: "https://example.com/m.json".into(), + version: None, + version_query: None, + version_fallback_to_base: false, + }); + + let deploy = build_deployment(&restore, "test-restore", "default", &replica) + .expect("redaction should build on PG 16"); + let pod = deploy + .spec + .as_ref() + .unwrap() + .template + .spec + .as_ref() + .unwrap(); + let postgres = pod + .containers + .iter() + .find(|c| c.name == "postgres") + .unwrap(); + let script = &postgres.args.as_ref().unwrap()[0]; + assert!( + script.contains("PG_MAJOR=16"), + "prelude must use the restore's PG major (16), got: {script}" + ); +} + +#[test] +fn deployment_with_redaction_forces_writable() { + let (mut restore, mut replica) = test_restore_and_replica(); + restore.status = Some(PostgresPhysicalRestoreStatus { + postgres_version: Some("18".to_string()), + ..Default::default() + }); + replica.spec.read_only = true; + replica.spec.redaction = Some(RedactionSpec { + manifest_url: "https://example.com/m.json".into(), + version: None, + version_query: None, + version_fallback_to_base: false, + }); + + let deploy = build_deployment(&restore, "test-restore", "default", &replica).unwrap(); + let script = deploy_init_setup_auth_script(&deploy); + // The init script uses `if [ "{read_only}" = "true" ]` and we want + // that variable substituted to "false" when redaction is set so the + // conditional doesn't fire at runtime.
+ assert!( + script.contains("if [ \"false\" = \"true\" ]"), + "redaction must defer read-only by substituting read_only=false into the init script" + ); +} + +fn deploy_init_setup_auth_script(deploy: &k8s_openapi::api::apps::v1::Deployment) -> String { + let pod = deploy + .spec + .as_ref() + .unwrap() + .template + .spec + .as_ref() + .unwrap(); + let setup_auth = pod + .init_containers + .as_ref() + .unwrap() + .iter() + .find(|c| c.name == "setup-auth") + .unwrap(); + setup_auth.args.as_ref().unwrap()[0].clone() +} diff --git a/src/error.rs b/src/error.rs index bcbfb7e..32e05ca 100644 --- a/src/error.rs +++ b/src/error.rs @@ -43,6 +43,9 @@ pub enum Error { #[error("Database connection error: {0}")] Postgres(#[from] tokio_postgres::Error), + + #[error("Redaction error: {0}")] + Redaction(String), } pub type Result = std::result::Result; diff --git a/src/types/replica.rs b/src/types/replica.rs index 3886a6e..6fc3869 100644 --- a/src/types/replica.rs +++ b/src/types/replica.rs @@ -101,6 +101,41 @@ pub struct PostgresPhysicalReplicaSpec { /// computed size exceeds this limit. Defaults to 2Ti. #[serde(default = "default_storage_size_maximum")] pub storage_size_maximum: Quantity, + + /// If set, apply a redaction manifest to the restored data before the + /// replica becomes eligible for switchover. The postgresql_anonymizer + /// extension is installed by the restore Pod's postgres container at + /// startup for the restore's PostgreSQL major version. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub redaction: Option, +} + +#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct RedactionSpec { + /// HTTP(S) URL of the dbt-style masking manifest. May contain a + /// `{version}` placeholder, in which case `version` or `versionQuery` + /// must be set. + pub manifest_url: String, + + /// Pinned version to substitute into `{version}`. Mutually exclusive + /// with `versionQuery`.
+ #[serde(default, skip_serializing_if = "Option::is_none")] + pub version: Option, + + /// SQL query that returns a single text column with the version string. + /// Run against the restore's main database as the operator's superuser. + /// Mutually exclusive with `version`. + /// + /// Example (Tamanu): + /// `SELECT value FROM local_system_facts WHERE key = 'currentVersion'` + #[serde(default, skip_serializing_if = "Option::is_none")] + pub version_query: Option, + + /// If the manifest URL with the discovered/pinned version 404s, retry + /// with the major.minor.0 base version. + #[serde(default)] + pub version_fallback_to_base: bool, } fn default_storage_size_maximum() -> Quantity { @@ -296,6 +331,19 @@ pub struct PostgresPhysicalReplicaStatus { /// cleared (e.g. by a spec change or manual intervention). #[serde(default, skip_serializing_if = "Option::is_none")] pub consecutive_restore_failures: Option, + + /// Phase of redaction for the current restore: + /// pending, active, complete, partial, failed. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub redaction_phase: Option, + + /// Resolved manifest version used by the last redaction run. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub redaction_version: Option, + + /// Number of columns redacted in the last run. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub redaction_columns_applied: Option, } #[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, PartialEq)] diff --git a/tests/fixtures/manifest-server.yaml b/tests/fixtures/manifest-server.yaml new file mode 100644 index 0000000..3d22d6c --- /dev/null +++ b/tests/fixtures/manifest-server.yaml @@ -0,0 +1,75 @@ +--- +# Static-manifest HTTP server for the redaction integration test. 
+# +# The harness `kubectl apply`s this into the test namespace; the redacted +# replica's spec.redaction.manifestUrl points at +# http://manifest-server..svc/manifest.json +apiVersion: v1 +kind: ConfigMap +metadata: + name: manifest-server-content +data: + manifest.json: | + { + "sources": { + "source.tamanu.tamanu.users": { + "schema": "public", + "name": "users", + "columns": { + "email": { "config": { "meta": { "masking": "email" } } }, + "full_name": { "config": { "meta": { "masking": "name" } } }, + "single_name": { "config": { "meta": { "masking": "name" } } }, + "dob": { "config": { "meta": { "masking": "date" } } }, + "phone": { "config": { "meta": { "masking": "phone" } } }, + "heart_rate": { "config": { "meta": { "masking": { "kind": "integer", "range": "50-100" } } } } + } + }, + "source.tamanu.tamanu.sync_lookup": { + "schema": "public", + "name": "sync_lookup", + "config": { "meta": { "masking": "truncate" } }, + "columns": {} + } + } + } +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: manifest-server + labels: + app: manifest-server +spec: + replicas: 1 + selector: + matchLabels: + app: manifest-server + template: + metadata: + labels: + app: manifest-server + spec: + containers: + - name: nginx + image: nginx:alpine + imagePullPolicy: IfNotPresent + ports: + - containerPort: 80 + volumeMounts: + - name: content + mountPath: /usr/share/nginx/html + volumes: + - name: content + configMap: + name: manifest-server-content +--- +apiVersion: v1 +kind: Service +metadata: + name: manifest-server +spec: + selector: + app: manifest-server + ports: + - port: 80 + targetPort: 80 diff --git a/tests/fixtures/setup-kopia-repo-pg18.yaml b/tests/fixtures/setup-kopia-repo-pg18.yaml new file mode 100644 index 0000000..f0faa16 --- /dev/null +++ b/tests/fixtures/setup-kopia-repo-pg18.yaml @@ -0,0 +1,138 @@ +--- +apiVersion: batch/v1 +kind: Job +metadata: + name: setup-kopia-repo-pg18 + namespace: default +spec: + backoffLimit: 3 + template: + spec: + 
securityContext: + fsGroup: 999 + initContainers: + - name: create-bucket + image: minio/mc:latest + imagePullPolicy: IfNotPresent + command: ["/bin/sh", "-c"] + args: + - | + set -e + mc alias set minio http://minio.minio.svc:9000 minioadmin minioadmin + mc mb minio/test-bucket-pg18 --ignore-existing + echo "Bucket created" + - name: init-pgdata + image: postgres:18 + imagePullPolicy: IfNotPresent + securityContext: + runAsUser: 0 + command: ["/bin/sh", "-c"] + args: + - | + set -e + echo "Initializing PostgreSQL 18 data directory..." + mkdir -p /pgdata/18/main + chown postgres:postgres /pgdata/18/main + gosu postgres initdb -D /pgdata/18/main --no-locale --encoding=UTF8 --auth=trust + + echo "Starting temporary PostgreSQL to create application database..." + gosu postgres pg_ctl -D /pgdata/18/main -l /tmp/pg.log start -w + + gosu postgres psql -d postgres -c "CREATE DATABASE myapp" + + # Tamanu-shaped local_system_facts table so the operator's + # versionQuery has something to read. The value is the + # manifest version the test harness will serve. + gosu postgres psql -d myapp <<'SQL' + CREATE TABLE local_system_facts ( + key text PRIMARY KEY, + value text + ); + INSERT INTO local_system_facts (key, value) VALUES ('currentVersion', '1.0.0'); + + -- A users table mirroring the dbt-masking contract: + -- email, name, dob (date) and a non-redacted column + -- to make sure unmarked columns are preserved. 
+ CREATE TABLE users ( + id serial PRIMARY KEY, + email text NOT NULL, + full_name text NOT NULL, + single_name text NOT NULL, + dob date, + phone text, + heart_rate int, + unmasked text NOT NULL + ); + INSERT INTO users (email, full_name, single_name, dob, phone, heart_rate, unmasked) VALUES + ('a@example.com', 'Alice Apple', 'Alice', '1980-01-15', '+64211234567', 70, 'keep-1'), + ('b@example.com', 'Bob Banana', 'Bob', '1975-05-22', '+64211234568', 80, 'keep-2'), + ('c@example.com', 'Carol Cherry', 'Carol', '1990-11-30', '+64211234569', 90, 'keep-3'), + ('d@example.com', 'Dave Date', 'Dave', '1985-07-04', '+64211234570', 60, 'keep-4'), + ('e@example.com', 'Eve Elderberry','Eve', '2000-12-25', '+64211234571', 75, 'keep-5'); + + -- A throwaway table to verify the table-level + -- "truncate" mask works. + CREATE TABLE sync_lookup ( + id serial PRIMARY KEY, + data text NOT NULL + ); + INSERT INTO sync_lookup (data) + SELECT 'row-' || i FROM generate_series(1, 50) AS i; + SQL + echo "Application database 'myapp' created with test data" + + gosu postgres pg_ctl -D /pgdata/18/main stop -w + + echo "PostgreSQL data directory initialized" + ls -la /pgdata/18/main/ + cat /pgdata/18/main/PG_VERSION + volumeMounts: + - name: pgdata + mountPath: /pgdata + containers: + - name: create-snapshot + image: kopia/kopia:0.22.3 + imagePullPolicy: IfNotPresent + command: ["/bin/sh", "-c"] + args: + - | + set -e + + mkdir -p /tmp/kopia/config /tmp/kopia/logs /tmp/kopia/cache + + echo "Creating kopia repository..." + kopia repository create s3 \ + --bucket=test-bucket-pg18 \ + --endpoint=minio.minio.svc:9000 \ + --region=us-east-1 \ + --access-key=minioadmin \ + --secret-access-key=minioadmin \ + --password=test-repo-password \ + --disable-tls \ + --disable-tls-verification + + echo "Creating snapshot..." + kopia snapshot create /pgdata + + echo "Verifying snapshot..." 
+ kopia snapshot list --json --all + + echo "Done" + env: + - name: KOPIA_CONFIG_PATH + value: /tmp/kopia/config/repository.config + - name: KOPIA_LOG_DIR + value: /tmp/kopia/logs + - name: KOPIA_CACHE_DIRECTORY + value: /tmp/kopia/cache + - name: KOPIA_PASSWORD + value: test-repo-password + - name: USER + value: kopia + volumeMounts: + - name: pgdata + mountPath: /pgdata + volumes: + - name: pgdata + emptyDir: {} + restartPolicy: Never diff --git a/tests/helpers.rs b/tests/helpers.rs index 2c919cb..b5f680e 100644 --- a/tests/helpers.rs +++ b/tests/helpers.rs @@ -159,6 +159,7 @@ pub fn build_replica(name: &str, secret_ref: &str, opts: ReplicaOpts) -> Postgre postgres_extra_config: None, notifications: vec![], persistent_schemas: None, + redaction: None, storage_size_maximum: Quantity("2Ti".to_string()), }, ) diff --git a/tests/redaction.rs b/tests/redaction.rs new file mode 100644 index 0000000..07d118a --- /dev/null +++ b/tests/redaction.rs @@ -0,0 +1,321 @@ +//! End-to-end redaction integration test. +//! +//! Requires a PG 18 kopia snapshot (see `setup-kopia-repo-pg18.yaml`) +//! and the in-namespace static manifest server (see +//! `manifest-server.yaml`). Both are deployed by the workflow before this +//! test runs. 
+ +use std::{collections::BTreeMap, time::Duration}; + +use k8s_openapi::{ + ByteString, + api::core::v1::{Secret, SecretReference}, + apimachinery::pkg::api::resource::Quantity, +}; +use kube::{ + Api, + api::{ObjectMeta, PostParams}, +}; +use postgres_restore_operator::{ + types::{ + PostgresPhysicalReplica, PostgresPhysicalReplicaSpec, PostgresPhysicalRestore, + RedactionSpec, ReplicaPhase, RestorePhase, + }, + util::TimeSpan, +}; + +use helpers::*; + +mod helpers; + +const NS: &str = "test-redaction"; +const REPLICA_NAME: &str = "redaction-replica"; + +#[tokio::test] +#[ignore = "requires a running Kubernetes cluster with MinIO, PG-18 kopia snapshot and the manifest server"] +async fn redaction_applies_masks_to_restored_data() { + let client = make_client().await; + + setup_namespace(&client, NS).await; + cleanup_namespace(&client, NS, &[REPLICA_NAME]).await; + + println!("--- deploying in-namespace static manifest server"); + deploy_manifest_server(NS).await; + + let secrets: Api = Api::namespaced(client.clone(), NS); + let replicas: Api = Api::namespaced(client.clone(), NS); + let restores: Api = Api::namespaced(client.clone(), NS); + + println!("--- creating kopia secret (PG-18 bucket)"); + secrets + .create( + &PostParams::default(), + &build_pg18_kopia_secret(NS, "redaction-kopia-creds"), + ) + .await + .expect("failed to create kopia secret"); + + println!("--- creating PostgresPhysicalReplica with redaction config"); + let replica = build_redaction_replica(REPLICA_NAME, "redaction-kopia-creds"); + replicas + .create(&PostParams::default(), &replica) + .await + .expect("failed to create replica"); + + println!("--- waiting for first restore to become Active"); + let restore_name = wait_for_restore_phase( + &restores, + REPLICA_NAME, + RestorePhase::Active, + LONG_PHASE_TIMEOUT, + ) + .await; + wait_for_replica_phase( + &replicas, + REPLICA_NAME, + ReplicaPhase::Ready, + LONG_PHASE_TIMEOUT, + ) + .await; + println!("--- restore {restore_name} active"); + + 
// At this point the operator has already applied redaction (because + // the restore wouldn't transition Switching -> Active otherwise). + let final_replica = replicas.get(REPLICA_NAME).await.unwrap(); + let status = final_replica.status.as_ref().expect("status set"); + let phase = status.redaction_phase.as_deref(); + assert!( + matches!(phase, Some("complete") | Some("partial")), + "redactionPhase should be complete or partial, got {phase:?}" + ); + let version = status.redaction_version.as_deref(); + assert_eq!( + version, + Some("1.0.0"), + "manifest version should be read from local_system_facts" + ); + let cols = status.redaction_columns_applied.unwrap_or(0); + assert!( + cols >= 6, + "expected at least 6 columns redacted, got {cols}" + ); + + let deploy = format!("deployment/{restore_name}"); + + println!("--- verifying truncate mask emptied sync_lookup"); + let count = query_one_value(NS, &deploy, "SELECT count(*) FROM sync_lookup").await; + assert_eq!(count.trim(), "0", "sync_lookup should be truncated"); + + println!("--- verifying unmasked column kept original values"); + let unmasked = query_one_value( + NS, + &deploy, + "SELECT string_agg(unmasked, ',' ORDER BY id) FROM users", + ) + .await; + assert_eq!(unmasked.trim(), "keep-1,keep-2,keep-3,keep-4,keep-5"); + + println!("--- verifying email column was changed"); + let emails = query_one_value( + NS, + &deploy, + "SELECT string_agg(email, ',' ORDER BY id) FROM users", + ) + .await; + assert!( + !emails.contains("a@example.com"), + "original email should be masked, got: {emails}" + ); + assert!( + emails.contains('@'), + "masked email should still look like an email, got: {emails}" + ); + + println!("--- verifying name masks: full names (with space) and single names"); + // full_name has spaces in the original; mask preserves the space-pattern via + // CASE-WHEN: full names get fake_name(), single names get fake_first_name(). 
+ let full_names = query_one_value( + NS, + &deploy, + "SELECT string_agg(full_name, '|' ORDER BY id) FROM users", + ) + .await; + assert!( + !full_names.contains("Alice Apple"), + "full_name should be masked, got: {full_names}" + ); + let single_names = query_one_value( + NS, + &deploy, + "SELECT string_agg(single_name, '|' ORDER BY id) FROM users", + ) + .await; + assert!( + !single_names.contains("Alice"), + "single_name should be masked, got: {single_names}" + ); + + println!("--- verifying date mask changed dob"); + let dobs = query_one_value( + NS, + &deploy, + "SELECT string_agg(dob::text, ',' ORDER BY id) FROM users", + ) + .await; + assert!( + !dobs.contains("1980-01-15"), + "dob should be masked, got: {dobs}" + ); + + println!("--- verifying phone mask preserves prefix/suffix"); + let phones = query_one_value( + NS, + &deploy, + "SELECT string_agg(phone, ',' ORDER BY id) FROM users", + ) + .await; + // anon.partial(phone, 2, '****', 2) keeps first 2 and last 2 chars + assert!( + phones.contains("+6") && phones.contains("****"), + "phone should be partial-masked with ****, got: {phones}" + ); + + println!("--- verifying integer-range mask kept values in [50, 100]"); + let out_of_range = query_one_value( + NS, + &deploy, + "SELECT count(*) FROM users WHERE heart_rate < 50 OR heart_rate > 100", + ) + .await; + assert_eq!( + out_of_range.trim(), + "0", + "heart_rate should stay in 50..100" + ); + + println!("--- verifying read-only was re-enabled"); + let setting = query_one_value(NS, &deploy, "SHOW default_transaction_read_only").await; + assert_eq!( + setting.trim(), + "on", + "default_transaction_read_only should be on" + ); + + println!("--- verifying analytics role was demoted from SUPERUSER"); + let rolsuper = query_one_value( + NS, + &deploy, + "SELECT rolsuper::text FROM pg_roles WHERE rolname = 'analytics'", + ) + .await; + assert_eq!( + rolsuper.trim(), + "false", + "analytics user should no longer be SUPERUSER" + ); + + println!("--- all redaction 
assertions passed"); +} + +async fn deploy_manifest_server(ns: &str) { + let status = tokio::process::Command::new("kubectl") + .args([ + "apply", + "-n", + ns, + "-f", + "tests/fixtures/manifest-server.yaml", + ]) + .status() + .await + .expect("failed to run kubectl apply"); + assert!(status.success(), "kubectl apply for manifest server failed"); + + // Wait briefly for the Service endpoint to come up. Best-effort — + // the in-cluster DNS resolves the Service name even before the + // backing pod is Ready, and the operator's reqwest call retries + // on transient connection failures via the redaction failed:* path. + tokio::time::sleep(Duration::from_secs(5)).await; +} + +fn build_pg18_kopia_secret(ns: &str, name: &str) -> Secret { + Secret { + metadata: ObjectMeta { + name: Some(name.into()), + namespace: Some(ns.into()), + ..Default::default() + }, + data: Some(BTreeMap::from([ + ("bucket".into(), ByteString("test-bucket-pg18".into())), + ("region".into(), ByteString("us-east-1".into())), + ("accessKeyId".into(), ByteString("minioadmin".into())), + ("secretAccessKey".into(), ByteString("minioadmin".into())), + ( + "repositoryPassword".into(), + ByteString("test-repo-password".into()), + ), + ("endpoint".into(), ByteString("minio.minio.svc:9000".into())), + ("disableTls".into(), ByteString("true".into())), + ])), + ..Default::default() + } +} + +fn build_redaction_replica(name: &str, secret_ref: &str) -> PostgresPhysicalReplica { + let mut replica = PostgresPhysicalReplica::new( + name, + PostgresPhysicalReplicaSpec { + kopia_secret_ref: SecretReference { + name: Some(secret_ref.into()), + namespace: None, + }, + snapshot_filter: None, + schedule: "0 */6 * * *".into(), + schedule_jitter: Default::default(), + minimum_ttl: None, + switchover_grace_period: TimeSpan(jiff::Span::new().seconds(10)), + analytics_username: "analytics".into(), + storage_class: None, + storage_size_override: None, + resources: None, + service_annotations: None, + pod_annotations: None, 
+ affinity: None, + tolerations: vec![], + read_only: true, + postgres_extra_config: None, + notifications: vec![], + persistent_schemas: None, + redaction: Some(RedactionSpec { + manifest_url: format!("http://manifest-server.{NS}.svc/manifest.json"), + version: None, + version_query: Some( + "SELECT value FROM local_system_facts WHERE key = 'currentVersion'".into(), + ), + version_fallback_to_base: false, + }), + storage_size_maximum: Quantity("2Ti".into()), + }, + ); + replica.metadata.namespace = Some(NS.into()); + replica +} + +async fn query_one_value(ns: &str, deploy: &str, sql: &str) -> String { + kubectl_exec( + ns, + deploy, + &[ + "psql", + "-U", + "analytics", + "-d", + "myapp", + "-t", + "-A", + "-c", + sql, + ], + ) + .await +}