55 changes: 55 additions & 0 deletions PLAN.md
@@ -0,0 +1,55 @@
# Plan: 0dt read-only support for logging persist sinks

## Problem

Logging persist sinks are not tracked as compute collections by the controller, so they cannot use the per-collection `AllowWrites` mechanism.
During 0dt deploys, a read-only replica should not write to persist shards until promoted.

An earlier attempt piggybacked on the first `AllowWrites` command for any collection, but this broke on clusters with no materialized views (MVs), where no `AllowWrites` was ever sent.

## Approach: add `read_only` to `InstanceConfig`

The compute controller already knows its `read_only` state (instance-level `read_only: bool`).
`CreateInstance(InstanceConfig)` is sent to all replicas and tracked in the command history.
Adding a `read_only: bool` field to `InstanceConfig` lets the controller communicate its read-only state to all replicas at instance creation time.

On the replica, `handle_create_instance` sets a `watch::Sender<bool>` on `ComputeState` before initializing logging, so logging persist sinks pick up the correct value.
The signal also flips to writable on the first `AllowWrites` command for any collection, providing a fallback for 0dt promotion.

This handles all cases:
* Read-only controller (0dt catchup): sends `read_only: true`, sinks stay read-only.
* Writable controller: sends `read_only: false`, sinks start writable.
* Writable controller with no MVs: still sends `read_only: false`, sinks start writable.
* Command history replay: the `CreateInstance` includes the `read_only` field, so reconnecting replicas get the right state.
* 0dt promotion: `AllowWrites` for any collection flips the signal from `true` to `false`.
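The replica-side state machine implied by the cases above can be sketched as follows. This is a minimal std-only stand-in for the `tokio::sync::watch` channel the real code uses; the type and method names here are illustrative, not the actual Materialize API.

```rust
// Illustrative sketch: replica-side read-only state machine.
#[derive(Debug, PartialEq)]
enum SinkMode {
    ReadOnly,
    Writable,
}

struct ReplicaSignal {
    read_only: bool,
}

impl ReplicaSignal {
    fn new() -> Self {
        // Safe default for 0dt: read-only until CreateInstance says otherwise.
        ReplicaSignal { read_only: true }
    }

    fn handle_create_instance(&mut self, config_read_only: bool) {
        // Mirrors send_replace(config.read_only), applied before logging init.
        self.read_only = config_read_only;
    }

    fn handle_allow_writes(&mut self) {
        // Promotion/fallback path: any AllowWrites makes the replica writable.
        self.read_only = false;
    }

    fn sink_mode(&self) -> SinkMode {
        if self.read_only {
            SinkMode::ReadOnly
        } else {
            SinkMode::Writable
        }
    }
}

fn main() {
    // 0dt catchup: read-only controller, then promotion via AllowWrites.
    let mut r = ReplicaSignal::new();
    r.handle_create_instance(true);
    assert_eq!(r.sink_mode(), SinkMode::ReadOnly);
    r.handle_allow_writes();
    assert_eq!(r.sink_mode(), SinkMode::Writable);

    // Writable controller (with or without MVs): writable from the start.
    let mut w = ReplicaSignal::new();
    w.handle_create_instance(false);
    assert_eq!(w.sink_mode(), SinkMode::Writable);
}
```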

## Implementation (done)

### 1. Add `read_only` field to `InstanceConfig`

**File:** `src/compute-client/src/protocol/command.rs`

* Added `pub read_only: bool` to `InstanceConfig`.
* Updated the `compatible_with()` destructuring to include `read_only: _` (read-only transitions are handled via `AllowWrites`, so no compatibility check is needed).
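The destructuring trick is worth spelling out: exhaustively destructuring the struct means the compiler forces `compatible_with()` to be revisited whenever a field is added, while `read_only: _` explicitly exempts that field from the check. A simplified sketch (the real `InstanceConfig` has more fields than shown here):

```rust
// Simplified stand-in for InstanceConfig; `logging_interval_ms` is a
// hypothetical placeholder for the other fields.
#[derive(Debug)]
struct InstanceConfig {
    read_only: bool,
    logging_interval_ms: u64,
}

impl InstanceConfig {
    fn compatible_with(&self, other: &InstanceConfig) -> bool {
        // Exhaustive destructure: adding a field to InstanceConfig makes
        // this line a compile error until the new field is considered.
        // `read_only: _` deliberately opts that field out of the check.
        let InstanceConfig {
            read_only: _,
            logging_interval_ms,
        } = other;
        self.logging_interval_ms == *logging_interval_ms
    }
}

fn main() {
    let a = InstanceConfig { read_only: true, logging_interval_ms: 1000 };
    let b = InstanceConfig { read_only: false, logging_interval_ms: 1000 };
    // Configs differing only in read_only remain compatible.
    assert!(a.compatible_with(&b));
}
```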

### 2. Set `read_only` from the compute controller

**File:** `src/compute-client/src/controller/instance.rs`

* In `Instance::run()`, set `read_only: self.read_only` when constructing `InstanceConfig`.

### 3. Add replica-level read-only signal to `ComputeState`

**File:** `src/compute/src/compute_state.rs`

* Added `read_only_tx: watch::Sender<bool>` and `pub read_only_rx: watch::Receiver<bool>` fields.
* Initialized as `watch::channel(true)` (read-only by default, safe default for 0dt).
* In `handle_create_instance`, call `send_replace(config.read_only)` before `initialize_logging`.
* In `handle_allow_writes`, flip `read_only_tx` to `false` via `send_if_modified`.
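The `send_if_modified` step matters because repeated `AllowWrites` commands should not re-notify watchers: the closure only reports a change when the value actually flips from `true` to `false`. A std-only sketch of that semantics (the real code uses `tokio::sync::watch::Sender::send_if_modified`; names here are illustrative):

```rust
use std::sync::Mutex;

struct Signal {
    value: Mutex<bool>,
}

impl Signal {
    fn new() -> Self {
        // Read-only by default, matching watch::channel(true).
        Signal { value: Mutex::new(true) }
    }

    // Stand-in for watch::Sender::send_if_modified: run the closure on
    // the current value and report whether it modified it.
    fn send_if_modified(&self, modify: impl FnOnce(&mut bool) -> bool) -> bool {
        let mut v = self.value.lock().unwrap();
        modify(&mut *v)
    }

    // handle_allow_writes: flip to writable only if currently read-only,
    // so repeated AllowWrites commands are no-ops.
    fn allow_writes(&self) -> bool {
        self.send_if_modified(|read_only| {
            if *read_only {
                *read_only = false;
                true
            } else {
                false
            }
        })
    }
}

fn main() {
    let s = Signal::new();
    assert!(s.allow_writes());  // first AllowWrites flips true -> false
    assert!(!s.allow_writes()); // later ones do not modify the value
}
```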

### 4. Use the replica-level signal in logging persist sinks

**File:** `src/compute/src/logging/persist.rs`

* Replaced `let (_tx, read_only_rx) = watch::channel(false)` with `compute_state.read_only_rx.clone()`.
* Removed `use tokio::sync::watch` import.
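Why cloning fixes the bug: the old per-sink `watch::channel(false)` created a fresh channel that started writable and never observed replica state, while a cloned receiver shares the replica-level signal and sees the promotion flip. A std-only analogue (illustrative, not the actual Materialize types):

```rust
use std::sync::{Arc, Mutex};

// Shared replica-level signal, standing in for the watch channel.
type ReadOnlySignal = Arc<Mutex<bool>>;

fn main() {
    let replica_signal: ReadOnlySignal = Arc::new(Mutex::new(true));

    // Old behavior: each sink fabricated its own channel seeded with
    // `false`, so it started writable, disconnected from replica state.
    let private_rx: ReadOnlySignal = Arc::new(Mutex::new(false));
    assert!(!*private_rx.lock().unwrap());

    // New behavior: clone the shared handle, as read_only_rx.clone() does.
    let sink_rx = Arc::clone(&replica_signal);
    assert!(*sink_rx.lock().unwrap()); // starts read-only

    // Promotion flips the shared signal and the sink observes it.
    *replica_signal.lock().unwrap() = false;
    assert!(!*sink_rx.lock().unwrap());
}
```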
11 changes: 11 additions & 0 deletions ci/test/pipeline.template.yml
@@ -669,6 +669,17 @@ steps:
agents:
queue: hetzner-aarch64-4cpu-8gb

- id: persisted-introspection
label: "Persisted Introspection"
depends_on: build-aarch64
timeout_in_minutes: 30
inputs: [test/persisted-introspection]
plugins:
- ./ci/plugins/mzcompose:
composition: persisted-introspection
agents:
queue: hetzner-aarch64-4cpu-8gb

- id: secrets-logging
label: "Secrets Logging"
depends_on: build-aarch64
369 changes: 369 additions & 0 deletions doc/developer/design/20250225_persist_backed_introspection.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions misc/python/materialize/mzcompose/__init__.py
@@ -610,6 +610,7 @@ def get_default_system_parameters(
"kafka_reconnect_backoff_max",
"oidc_issuer",
"oidc_audience",
"enable_persist_introspection",
]


1 change: 1 addition & 0 deletions misc/python/materialize/parallel_workload/action.py
@@ -1520,6 +1520,7 @@ def __init__(
"false",
]
self.flags_with_values["enable_cast_elimination"] = BOOLEAN_FLAG_VALUES
self.flags_with_values["enable_persist_introspection"] = BOOLEAN_FLAG_VALUES

# If you are adding a new config flag in Materialize, consider using it
# here instead of just marking it as uninteresting to silence the
163 changes: 157 additions & 6 deletions src/adapter/src/catalog/apply.rs
@@ -45,15 +45,17 @@ use mz_ore::{assert_none, instrument, soft_assert_no_log};
use mz_pgrepr::oid::INVALID_OID;
use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
use mz_repr::role_id::RoleId;
use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion, Timestamp, VersionedRelationDesc};
use mz_repr::{
CatalogItemId, Diff, GlobalId, RelationDesc, RelationVersion, Timestamp, VersionedRelationDesc,
};
use mz_sql::catalog::CatalogError as SqlCatalogError;
use mz_sql::catalog::{CatalogItem as SqlCatalogItem, CatalogItemType, CatalogSchema, CatalogType};
use mz_sql::names::{
FullItemName, ItemQualifiers, QualifiedItemName, RawDatabaseSpecifier,
ResolvedDatabaseSpecifier, ResolvedIds, SchemaSpecifier,
ResolvedDatabaseSpecifier, ResolvedIds, SchemaId, SchemaSpecifier,
};
use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
use mz_sql::session::vars::{VarError, VarInput};
use mz_sql::session::vars::{DEFAULT_DATABASE_NAME, VarError, VarInput};
use mz_sql::{plan, rbac};
use mz_sql_parser::ast::Expr;
use mz_storage_types::sources::Timeline;
@@ -87,6 +89,7 @@ struct InProgressRetractions {
items: BTreeMap<ItemKey, CatalogEntry>,
temp_items: BTreeMap<CatalogItemId, CatalogEntry>,
introspection_source_indexes: BTreeMap<CatalogItemId, CatalogEntry>,
persisted_introspection_sources: BTreeMap<CatalogItemId, CatalogEntry>,
system_object_mappings: BTreeMap<CatalogItemId, CatalogEntry>,
}

@@ -307,6 +310,9 @@ impl CatalogState {
retractions,
);
}
StateUpdateKind::PersistedIntrospectionSource(source) => {
self.apply_persisted_introspection_source_update(source, diff, retractions);
}
StateUpdateKind::ClusterReplica(cluster_replica) => {
self.apply_cluster_replica_update(cluster_replica, diff, retractions);
}
@@ -588,6 +594,49 @@ impl CatalogState {
}
}

#[instrument(level = "debug")]
fn apply_persisted_introspection_source_update(
&mut self,
source: mz_catalog::durable::PersistedIntrospectionSource,
diff: StateDiff,
retractions: &mut InProgressRetractions,
) {
match diff {
StateDiff::Addition => {
if let Some(mut entry) = retractions
.persisted_introspection_sources
.remove(&source.item_id)
{
// This should only happen during startup as a result of builtin migrations.
let (name, catalog_item) = self.create_persisted_introspection_source(
&source.name,
source.schema_id,
source.global_id,
);
assert_eq!(entry.id, source.item_id);
assert_eq!(entry.oid, source.oid);
assert_eq!(entry.name, name);
entry.item = catalog_item;
self.insert_entry(entry);
} else {
self.insert_persisted_introspection_source(
&source.name,
source.schema_id,
source.item_id,
source.global_id,
source.oid,
);
}
}
StateDiff::Retraction => {
let entry = self.drop_item(source.item_id);
retractions
.persisted_introspection_sources
.insert(entry.id, entry);
}
}
}

#[instrument(level = "debug")]
fn apply_cluster_replica_update(
&mut self,
@@ -1367,6 +1416,9 @@ impl CatalogState {
StateUpdateKind::IntrospectionSourceIndex(introspection_source_index) => {
self.pack_item_update(introspection_source_index.item_id, diff)
}
StateUpdateKind::PersistedIntrospectionSource(source) => {
self.pack_item_update(source.item_id, diff)
}
StateUpdateKind::ClusterReplica(cluster_replica) => self.pack_cluster_replica_update(
cluster_replica.cluster_id,
&cluster_replica.name,
@@ -1905,6 +1957,77 @@ impl CatalogState {
(index_name, index)
}

fn insert_persisted_introspection_source(
&mut self,
log_name: &str,
schema_id: SchemaId,
item_id: CatalogItemId,
global_id: GlobalId,
oid: u32,
) {
let (name, catalog_item) =
self.create_persisted_introspection_source(log_name, schema_id, global_id);
self.insert_item(
item_id,
oid,
name,
catalog_item,
MZ_SYSTEM_ROLE_ID,
PrivilegeMap::from_mz_acl_items(vec![
rbac::default_builtin_object_privilege(mz_sql::catalog::ObjectType::Source),
rbac::owner_privilege(mz_sql::catalog::ObjectType::Source, MZ_SYSTEM_ROLE_ID),
]),
);
}

fn create_persisted_introspection_source(
&self,
log_name: &str,
schema_id: SchemaId,
global_id: GlobalId,
) -> (QualifiedItemName, CatalogItem) {
let log = BUILTIN_LOG_LOOKUP
.get(log_name)
.expect("missing builtin log");
let log_variant = log.variant;

let database_id = *self
.database_by_name
.get(DEFAULT_DATABASE_NAME)
.expect("materialize database must exist");
let database = &self.database_by_id[&database_id];
let schema = &database.schemas_by_id[&schema_id];

let name = QualifiedItemName {
qualifiers: ItemQualifiers {
database_spec: ResolvedDatabaseSpecifier::Id(database_id),
schema_spec: SchemaSpecifier::Id(schema_id),
},
item: log_name.to_string(),
};

let full_name = FullItemName {
database: RawDatabaseSpecifier::Name(database.name.clone()),
schema: schema.name.schema.clone(),
item: log_name.to_string(),
};
let create_sql =
persisted_introspection_source_sql(full_name, log_name, &log_variant.desc());

let catalog_item = CatalogItem::Source(Source {
create_sql: Some(create_sql),
global_id,
data_source: DataSourceDesc::PersistedIntrospection(log_variant),
desc: log_variant.desc(),
timeline: Timeline::EpochMilliseconds,
resolved_ids: ResolvedIds::empty(),
custom_logical_compaction_window: None,
is_retained_metrics_object: false,
});

(name, catalog_item)
}

/// Insert system configuration `name` with `value`.
///
/// Return a `bool` value indicating whether the configuration was modified
@@ -1922,6 +2045,31 @@
}
}

/// Generate a valid `CREATE MATERIALIZED VIEW` SQL string for a persisted
/// introspection source. The SQL is used for the `create_sql` field so
/// the catalog consistency checker can verify the item name.
fn persisted_introspection_source_sql(
name: FullItemName,
log_name: &str,
desc: &RelationDesc,
) -> String {
use mz_sql_parser::ast::Ident;
use mz_sql_parser::ast::display::AstDisplay;

let unresolved = mz_sql::normalize::unresolve(name);
let columns = desc
.iter_names()
.map(|col| Ident::new_unchecked(col.as_str()).to_ast_string_stable())
.collect::<Vec<_>>()
.join(", ");
format!(
"CREATE MATERIALIZED VIEW {} AS SELECT {} FROM mz_introspection.{}",
unresolved.to_ast_string_stable(),
columns,
Ident::new_unchecked(log_name).to_ast_string_stable(),
)
}

/// Sort [`StateUpdate`]s in dependency order.
///
/// # Panics
@@ -1984,6 +2132,7 @@ fn sort_updates(updates: Vec&lt;StateUpdate&gt;) -&gt; Vec&lt;StateUpdate&gt; {
),
StateUpdateKind::Cluster(_)
| StateUpdateKind::IntrospectionSourceIndex(_)
| StateUpdateKind::PersistedIntrospectionSource(_)
| StateUpdateKind::ClusterReplica(_) => push_update(
update,
diff,
@@ -2386,9 +2535,11 @@ impl ApplyState {
Self::BuiltinViewAdditions(vec![view_addition])
}

IntrospectionSourceIndex(_) | SystemObjectMapping(_) | TemporaryItem(_) | Item(_) => {
Self::Items(vec![update])
}
IntrospectionSourceIndex(_)
| PersistedIntrospectionSource(_)
| SystemObjectMapping(_)
| TemporaryItem(_)
| Item(_) => Self::Items(vec![update]),

Role(_)
| RoleAuth(_)
2 changes: 2 additions & 0 deletions src/adapter/src/catalog/open.rs
@@ -284,6 +284,7 @@ impl Catalog {
})
}
BootstrapStateUpdateKind::IntrospectionSourceIndex(_)
| BootstrapStateUpdateKind::PersistedIntrospectionSource(_)
| BootstrapStateUpdateKind::SystemObjectMapping(_) => {
system_item_updates.push(StateUpdate {
kind: kind.into(),
@@ -981,6 +982,7 @@ fn add_new_remove_old_builtin_clusters_migration(
logging: default_logging_config(),
optimizer_feature_overrides: Default::default(),
schedule: Default::default(),
persist_introspection: false,
}),
workload_class: None,
},