From dc01157ac929d71f2485053ead7f225ac8f3f2a8 Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Mon, 12 Jan 2026 13:37:19 +0100 Subject: [PATCH 01/10] adapter: use OCC for read-then-write, with incremental retries We now use SUBSCRIBE instead of PEEK to maintain the desired set of updates that need to be written. We also don't acquire locks on tables but instead optimistically try and write our updates at the timestamp right at our current subscribe frontier. Additionally, we take the opportunity this provides and move the sequencing code from the coordinator main loop to the frontend, similar to how we have done that for peeks in frontend_peek.rs. Work towards https://github.com/MaterializeInc/database-issues/issues/6686 Implementation of https://github.com/MaterializeInc/materialize/blob/63645b72e83ee26d2cfa99d25d773a06e6accb74/doc/developer/design/20260210_incremental_occ_read_then_write.md --- ...0260210_incremental_occ_read_then_write.md | 6 +- src/adapter-types/src/dyncfgs.rs | 8 + src/adapter/src/active_compute_sink.rs | 3 + src/adapter/src/client.rs | 211 +++- src/adapter/src/command.rs | 61 +- src/adapter/src/coord.rs | 48 +- src/adapter/src/coord/appends.rs | 262 +++- src/adapter/src/coord/command_handler.rs | 48 + src/adapter/src/coord/ddl.rs | 6 +- src/adapter/src/coord/read_then_write.rs | 162 +++ src/adapter/src/coord/sequencer.rs | 9 + src/adapter/src/coord/sequencer/inner.rs | 176 +-- .../src/coord/sequencer/inner/subscribe.rs | 2 + src/adapter/src/coord/sql.rs | 51 +- src/adapter/src/frontend_read_then_write.rs | 1098 +++++++++++++++++ src/adapter/src/lib.rs | 1 + src/adapter/src/metrics.rs | 6 + src/adapter/src/optimize.rs | 4 +- src/adapter/src/peek_client.rs | 20 +- src/sql/src/session/vars.rs | 12 + src/sql/src/session/vars/definitions.rs | 14 + 21 files changed, 2069 insertions(+), 139 deletions(-) create mode 100644 src/adapter/src/coord/read_then_write.rs create mode 100644 src/adapter/src/frontend_read_then_write.rs diff --git 
a/doc/developer/design/20260210_incremental_occ_read_then_write.md b/doc/developer/design/20260210_incremental_occ_read_then_write.md index db6286fb2f210..4534b7fbd3871 100644 --- a/doc/developer/design/20260210_incremental_occ_read_then_write.md +++ b/doc/developer/design/20260210_incremental_occ_read_then_write.md @@ -128,7 +128,7 @@ Session Task Coordinator | | |-- acquire OCC semaphore | | | - |-- CreateReadThenWriteSubscribe ----> | + |-- CreateInternalSubscribe ---------> | | <------------ subscribe channel -----| | | | +-- OCC Loop ------------------+ | @@ -141,7 +141,7 @@ Session Task Coordinator | | if Success: break | | | +------------------------------+ | | | - |-- DropReadThenWriteSubscribe ------> | + |-- DropInternalSubscribe -----------> | | | ``` @@ -193,7 +193,7 @@ subscribe. The subscribes created for read-then-write are internal: they do not appear in `mz_subscriptions` or other introspection tables, and they don't increment the active subscribes metric. They are created and dropped via dedicated `Command` -variants (`CreateReadThenWriteSubscribe`, `DropReadThenWriteSubscribe`). +variants (`CreateInternalSubscribe`, `DropInternalSubscribe`). ## Correctness diff --git a/src/adapter-types/src/dyncfgs.rs b/src/adapter-types/src/dyncfgs.rs index c1c1e1296772f..da7dfd2f529e3 100644 --- a/src/adapter-types/src/dyncfgs.rs +++ b/src/adapter-types/src/dyncfgs.rs @@ -155,6 +155,13 @@ pub const ENABLE_S3_TABLES_REGION_CHECK: Config = Config::new( "Whether to enforce that S3 Tables connections are in the same region as the environment.", ); +pub const FRONTEND_READ_THEN_WRITE: Config = Config::new( + "enable_adapter_frontend_occ_read_then_write", + // WIP: true for testing in ci, Should be false before merging. + true, + "Use frontend sequencing for DELETE/UPDATE operations.", +); + /// Adds the full set of all adapter `Config`s. 
pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet { configs @@ -178,4 +185,5 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet { .add(&CONSTRAINT_BASED_TIMESTAMP_SELECTION) .add(&PERSIST_FAST_PATH_ORDER) .add(&ENABLE_S3_TABLES_REGION_CHECK) + .add(&FRONTEND_READ_THEN_WRITE) } diff --git a/src/adapter/src/active_compute_sink.rs b/src/adapter/src/active_compute_sink.rs index e6251e75a6022..3e0b6357105c7 100644 --- a/src/adapter/src/active_compute_sink.rs +++ b/src/adapter/src/active_compute_sink.rs @@ -116,6 +116,9 @@ pub struct ActiveSubscribe { pub start_time: EpochMillis, /// How to present the subscribe's output. pub output: SubscribeOutput, + /// If true, this is an internal subscribe that should not appear in + /// introspection tables like mz_subscriptions. + pub internal: bool, } impl ActiveSubscribe { diff --git a/src/adapter/src/client.rs b/src/adapter/src/client.rs index 716e383ad8ef5..9de2580d76901 100644 --- a/src/adapter/src/client.rs +++ b/src/adapter/src/client.rs @@ -268,6 +268,9 @@ impl Client { persist_client, statement_logging_frontend, superuser_attribute, + occ_write_semaphore, + frontend_read_then_write_enabled, + read_only, } = response; let peek_client = PeekClient::new( @@ -277,6 +280,9 @@ impl Client { optimizer_metrics, persist_client, statement_logging_frontend, + occ_write_semaphore, + frontend_read_then_write_enabled, + read_only, ); let mut client = SessionClient { @@ -731,11 +737,20 @@ impl SessionClient { // No additional work needed here. return Ok((resp, execute_started)); } else { - debug!("frontend peek did not happen, falling back to `Command::Execute`"); + debug!("frontend peek did not happen, trying frontend read-then-write"); + } + + // Attempt read-then-write sequencing in the session task. + if let Some(resp) = self + .try_frontend_read_then_write(&portal_name, &mut outer_ctx_extra) + .await? 
+ { + debug!("frontend read-then-write succeeded"); + return Ok((resp, execute_started)); + } else { + debug!("frontend read-then-write did not happen, falling back to `Command::Execute`"); // If we bailed out, outer_ctx_extra is still present (if it was originally). // `Command::Execute` will handle it. - // (This is not true if we bailed out _after_ the frontend peek sequencing has already - // begun its own statement logging. That case would be a bug.) } let response = self @@ -1052,7 +1067,10 @@ impl SessionClient { | Command::RegisterFrontendPeek { .. } | Command::UnregisterFrontendPeek { .. } | Command::ExplainTimestamp { .. } - | Command::FrontendStatementLogging(..) => {} + | Command::FrontendStatementLogging(..) + | Command::CreateInternalSubscribe { .. } + | Command::AttemptTimestampedWrite { .. } + | Command::DropInternalSubscribe { .. } => {} }; cmd }); @@ -1156,6 +1174,191 @@ impl SessionClient { Ok(None) } } + + /// Attempt to sequence a read-then-write (DELETE/UPDATE/INSERT INTO .. + /// SELECT .. FROM) from the session task. + /// + /// Returns `Ok(Some(response))` if we handled the operation, or `Ok(None)` + /// to fall back to the Coordinator's sequencing. If it returns an error, it + /// should be returned to the user. + pub(crate) async fn try_frontend_read_then_write( + &mut self, + portal_name: &str, + outer_ctx_extra: &mut Option, + ) -> Result, AdapterError> { + use mz_expr::RowSetFinishing; + use mz_sql::plan::{MutationKind, Plan, ReadThenWritePlan}; + use mz_sql_parser::ast::Statement; + + // Check if frontend read-then-write is enabled (determined once at process startup + // to avoid a mixed-mode window where both the old lock-based path and the new OCC + // path are active concurrently). 
+ if !self.peek_client.frontend_read_then_write_enabled { + return Ok(None); + } + + let catalog = self.catalog_snapshot("try_frontend_read_then_write").await; + + // Extract statement, params, and logging info from portal + let (stmt, params, logging, lifecycle_timestamps) = { + let session = self.session.as_ref().expect("SessionClient invariant"); + let portal = match session.get_portal_unverified(portal_name) { + Some(portal) => portal, + None => return Ok(None), // Portal doesn't exist, fall back + }; + ( + portal.stmt.clone(), + portal.parameters.clone(), + Arc::clone(&portal.logging), + portal.lifecycle_timestamps.clone(), + ) + }; + + // Check if it's a DELETE, UPDATE, or INSERT statement + let stmt = match stmt { + Some(stmt) + if matches!( + &*stmt, + Statement::Delete(_) | Statement::Update(_) | Statement::Insert(_) + ) => + { + stmt + } + Some(_stmt) => { + return Ok(None); + } + None => { + return Ok(None); + } + }; + + // Needed later, for an error message. + let stmt_string = stmt.to_string(); + + // Plan the statement + let (plan, target_cluster) = { + let session = self.session.as_mut().expect("SessionClient invariant"); + let conn_catalog = catalog.for_session(session); + let (stmt, resolved_ids) = mz_sql::names::resolve(&conn_catalog, (*stmt).clone())?; + let pcx = session.pcx(); + let plan = mz_sql::plan::plan(Some(pcx), &conn_catalog, stmt, ¶ms, &resolved_ids)?; + + let target_cluster = match session.transaction().cluster() { + Some(cluster_id) => crate::coord::TargetCluster::Transaction(cluster_id), + None => crate::coord::catalog_serving::auto_run_on_catalog_server( + &conn_catalog, + session, + &plan, + ), + }; + + (plan, target_cluster) + }; + + // Handle ReadThenWrite plans or Insert plans. + let rtw_plan = match plan { + Plan::ReadThenWrite(rtw_plan) => rtw_plan, + Plan::Insert(insert_plan) => { + // For INSERT, we need to check if it's a constant insert + // without RETURNING. 
Constant inserts use a fast path in the + // coordinator, so we fall back. + // + // We need to lower HIR to MIR to check for constants because + // VALUES statements are planned as Wrap calls at the HIR level. + let optimized_mir = if insert_plan.values.as_const().is_some() { + // Already constant at HIR level - just lower without optimization + let expr = insert_plan + .values + .clone() + .lower(catalog.system_config(), None)?; + mz_expr::OptimizedMirRelationExpr(expr) + } else { + // Need to optimize to check if it becomes constant + let optimizer_config = optimize::OptimizerConfig::from(catalog.system_config()); + let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None); + optimizer.optimize(insert_plan.values.clone())? + }; + + // Constant INSERT without RETURNING are blind-writes. Add to + // the transaction and let those code paths handle it. + let inner_mir = optimized_mir.into_inner(); + if inner_mir.as_const().is_some() && insert_plan.returning.is_empty() { + let session = self.session.as_mut().expect("SessionClient invariant"); + return Ok(Some(Coordinator::insert_constant( + &catalog, + session, + insert_plan.id, + inner_mir, + )?)); + } + + // Get table descriptor arity for the finishing projection + let desc_arity = match catalog.try_get_entry(&insert_plan.id) { + Some(table) => { + let desc = table + .relation_desc_latest() + .ok_or_else(|| AdapterError::Internal("table has no desc".into()))?; + desc.arity() + } + None => { + return Err(AdapterError::Catalog(mz_catalog::memory::error::Error { + kind: mz_catalog::memory::error::ErrorKind::Sql( + mz_sql::catalog::CatalogError::UnknownItem( + insert_plan.id.to_string(), + ), + ), + })); + } + }; + + let finishing = RowSetFinishing { + order_by: vec![], + limit: None, + offset: 0, + project: (0..desc_arity).collect(), + }; + + // Convert InsertPlan to ReadThenWritePlan + ReadThenWritePlan { + id: insert_plan.id, + selection: insert_plan.values, + finishing, + assignments: 
BTreeMap::new(), + kind: MutationKind::Insert, + returning: insert_plan.returning, + } + } + _ => { + return Err(AdapterError::Internal( + "unexpected plan type for mutation".into(), + )); + } + }; + + // The OCC path commits writes immediately and they cannot be rolled + // back, so reject explicit transaction blocks. (Constant INSERTs are + // handled above and don't go through OCC.) + { + let session = self.session.as_ref().expect("SessionClient invariant"); + if !session.transaction().is_implicit() { + return Err(AdapterError::OperationProhibitsTransaction(stmt_string)); + } + } + + let session = self.session.as_mut().expect("SessionClient invariant"); + self.peek_client + .frontend_read_then_write( + session, + rtw_plan, + target_cluster, + ¶ms, + &logging, + lifecycle_timestamps, + outer_ctx_extra, + ) + .await + .map(Some) + } } impl Drop for SessionClient { diff --git a/src/adapter/src/command.rs b/src/adapter/src/command.rs index 405018dde0473..b9f2c48de1d67 100644 --- a/src/adapter/src/command.rs +++ b/src/adapter/src/command.rs @@ -31,7 +31,7 @@ use mz_persist_client::PersistClient; use mz_pgcopy::CopyFormatParams; use mz_repr::global_id::TransientIdGen; use mz_repr::role_id::RoleId; -use mz_repr::{CatalogItemId, ColumnIndex, GlobalId, RowIterator, SqlRelationType}; +use mz_repr::{CatalogItemId, ColumnIndex, Diff, GlobalId, Row, RowIterator, SqlRelationType}; use mz_sql::ast::{FetchDirection, Raw, Statement}; use mz_sql::catalog::ObjectType; use mz_sql::optimizer_metrics::OptimizerMetrics; @@ -41,16 +41,17 @@ use mz_sql::session::vars::{OwnedVarInput, SystemVars}; use mz_sql_parser::ast::{AlterObjectRenameStatement, AlterOwnerStatement, DropObjectsStatement}; use mz_storage_types::sources::Timeline; use mz_timestamp_oracle::TimestampOracle; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::{Semaphore, mpsc, oneshot}; use uuid::Uuid; use crate::catalog::Catalog; -use crate::coord::appends::BuiltinTableAppendNotify; +use 
crate::coord::appends::{BuiltinTableAppendNotify, TimestampedWriteResult}; use crate::coord::consistency::CoordinatorInconsistencies; use crate::coord::peek::{PeekDataflowPlan, PeekResponseUnary}; use crate::coord::timestamp_selection::TimestampDetermination; use crate::coord::{ExecuteContextExtra, ExecuteContextGuard}; use crate::error::AdapterError; +use crate::optimize::LirDataflowDescription; use crate::session::{EndTransactionAction, RowBatchStream, Session}; use crate::statement_logging::WatchSetCreation; use crate::statement_logging::{ @@ -324,6 +325,41 @@ pub enum Command { /// Statement logging event from frontend peek sequencing. /// No response channel needed - this is fire-and-forget. FrontendStatementLogging(FrontendStatementLoggingEvent), + + /// Creates an internal subscribe (not visible in introspection) and returns + /// the response channel. Initially used for frontend-sequenced + /// read-then-write (DELETE/UPDATE/INSERT...SELECT) operations via OCC. + CreateInternalSubscribe { + df_desc: Box, + cluster_id: ComputeInstanceId, + replica_id: Option, + depends_on: BTreeSet, + as_of: mz_repr::Timestamp, + arity: usize, + sink_id: GlobalId, + conn_id: ConnectionId, + session_uuid: Uuid, + start_time: mz_ore::now::EpochMillis, + read_holds: ReadHolds, + tx: oneshot::Sender, AdapterError>>, + }, + + /// Attempts a timestamped write for OCC (optimistic concurrency control). + /// Used by frontend read-then-write to submit accumulated diffs. + AttemptTimestampedWrite { + target_id: CatalogItemId, + diffs: Vec<(Row, Diff)>, + write_ts: mz_repr::Timestamp, + tx: oneshot::Sender, + }, + + /// Drops an internal subscribe. + /// + /// Used for cleanup after the subscribe's purpose is fulfilled or on error. + DropInternalSubscribe { + sink_id: GlobalId, + tx: oneshot::Sender<()>, + }, } impl Command { @@ -358,7 +394,10 @@ impl Command { | Command::RegisterFrontendPeek { .. } | Command::UnregisterFrontendPeek { .. } | Command::ExplainTimestamp { .. 
} - | Command::FrontendStatementLogging(..) => None, + | Command::FrontendStatementLogging(..) + | Command::CreateInternalSubscribe { .. } + | Command::AttemptTimestampedWrite { .. } + | Command::DropInternalSubscribe { .. } => None, } } @@ -393,7 +432,10 @@ impl Command { | Command::RegisterFrontendPeek { .. } | Command::UnregisterFrontendPeek { .. } | Command::ExplainTimestamp { .. } - | Command::FrontendStatementLogging(..) => None, + | Command::FrontendStatementLogging(..) + | Command::CreateInternalSubscribe { .. } + | Command::AttemptTimestampedWrite { .. } + | Command::DropInternalSubscribe { .. } => None, } } } @@ -435,6 +477,15 @@ pub struct StartupResponse { pub optimizer_metrics: OptimizerMetrics, pub persist_client: PersistClient, pub statement_logging_frontend: StatementLoggingFrontend, + /// Semaphore for limiting concurrent OCC (optimistic concurrency control) + /// write operations. + pub occ_write_semaphore: Arc, + /// Whether frontend OCC read-then-write is enabled (determined once at + /// process startup). + pub frontend_read_then_write_enabled: bool, + /// Whether the coordinator is in read-only mode (e.g. during 0dt upgrades). + /// The frontend path must reject mutations when this is true. 
+ pub read_only: bool, } #[derive(Derivative)] diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index fca7673b350c0..38caf8df0865d 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -168,7 +168,7 @@ use thiserror::Error; use timely::progress::{Antichain, Timestamp as _}; use tokio::runtime::Handle as TokioHandle; use tokio::select; -use tokio::sync::{OwnedMutexGuard, mpsc, oneshot, watch}; +use tokio::sync::{OwnedMutexGuard, Semaphore, mpsc, oneshot, watch}; use tokio::time::{Interval, MissedTickBehavior}; use tracing::{Instrument, Level, Span, debug, info, info_span, span, warn}; use tracing_opentelemetry::OpenTelemetrySpanExt; @@ -228,6 +228,7 @@ mod indexes; mod introspection; mod message_handler; mod privatelink_status; +mod read_then_write; mod sql; mod validity; @@ -379,6 +380,9 @@ impl Message { Command::ExplainTimestamp { .. } => "explain-timestamp", Command::FrontendStatementLogging(..) => "frontend-statement-logging", Command::StartCopyFromStdin { .. } => "start-copy-from-stdin", + Command::CreateInternalSubscribe { .. } => "create-internal-subscribe", + Command::AttemptTimestampedWrite { .. } => "attempt-timestamped-write", + Command::DropInternalSubscribe { .. } => "drop-internal-subscribe", }, Message::ControllerReady { controller: ControllerReadiness::Compute, @@ -1811,6 +1815,36 @@ pub struct Coordinator { /// Pending writes waiting for a group commit. pending_writes: Vec, + /// Semaphore to limit concurrent OCC (optimistic concurrency control) + /// read-then-write operations. + /// + /// When multiple read-then-write operations run concurrently, each one + /// maintains a subscribe that continually receives updates and must + /// consolidate data. Allowing too many concurrent operations causes wasted + /// work as all subscribes receive and process updates even though only one + /// can write at a given timestamp. 
Worst case, with N concurrent occ loops, + /// whenever one loop succeeds N-1 loops have to update their state, so we + /// do `O(n^2)` work. + /// + /// This semaphore limits concurrency at the operation level. Each + /// read-then-write operation acquires a permit before starting its + /// subscribe, ensuring bounded resource usage. Additional operations wait + /// until a permit becomes available. + /// + /// NOTE: The number of permits is read from `max_concurrent_occ_writes` at + /// coordinator startup and is **not** updated if the system variable + /// changes at runtime. A restart of `environmentd` is required for changes + /// to take effect. + occ_write_semaphore: Arc, + + /// Whether frontend OCC read-then-write is enabled. Read once at startup + /// from the `FRONTEND_READ_THEN_WRITE` dyncfg and fixed for the lifetime of + /// this process. This avoids a mixed-mode window where both the old + /// lock-based path and the new OCC path are active concurrently, which + /// could allow an OCC write to slip between an old-path reader's read and + /// write phases. + frontend_read_then_write_enabled: bool, + /// For the realtime timeline, an explicit SELECT or INSERT on a table will bump the /// table's timestamps, but there are cases where timestamps are not bumped but /// we expect the closed timestamps to advance (`AS OF X`, SUBSCRIBing views over @@ -3551,6 +3585,7 @@ impl Coordinator { // writes. let user_write_spans = self.pending_writes.iter().flat_map(|x| match x { PendingWriteTxn::User{span, ..} => Some(span), + PendingWriteTxn::InternalTimestamped{span, ..} => Some(span), PendingWriteTxn::System{..} => None, }); let span = match user_write_spans.exactly_one() { @@ -4463,6 +4498,15 @@ pub fn serve( } let catalog = Arc::new(catalog); + // Read once at startup; changing this system variable requires + // an environmentd restart to take effect (see field doc on + // `occ_write_semaphore`). 
+ let max_concurrent_occ_writes = + usize::cast_from(catalog.system_config().max_concurrent_occ_writes()); + let frontend_read_then_write_enabled = { + use mz_adapter_types::dyncfgs::FRONTEND_READ_THEN_WRITE; + FRONTEND_READ_THEN_WRITE.get(catalog.system_config().dyncfgs()) + }; let caching_secrets_reader = CachingSecretsReader::new(secrets_controller.reader()); let mut coord = Coordinator { @@ -4487,6 +4531,8 @@ pub fn serve( write_locks: BTreeMap::new(), deferred_write_ops: BTreeMap::new(), pending_writes: Vec::new(), + occ_write_semaphore: Arc::new(Semaphore::new(max_concurrent_occ_writes)), + frontend_read_then_write_enabled, advance_timelines_interval, secrets_controller, caching_secrets_reader, diff --git a/src/adapter/src/coord/appends.rs b/src/adapter/src/coord/appends.rs index 4f682979eb385..0a4d78f8ec633 100644 --- a/src/adapter/src/coord/appends.rs +++ b/src/adapter/src/coord/appends.rs @@ -31,6 +31,7 @@ use mz_sql::session::metadata::SessionMetadata; use mz_storage_client::client::TableData; use mz_timestamp_oracle::WriteTimestamp; use smallvec::SmallVec; +use timely::PartialOrder; use tokio::sync::{Notify, OwnedMutexGuard, OwnedSemaphorePermit, Semaphore, oneshot}; use tracing::{Instrument, Span, debug_span, info, warn}; @@ -131,6 +132,29 @@ pub(crate) enum BuiltinTableUpdateSource { Background(oneshot::Sender<()>), } +/// Result of a timestamped write attempt. +/// +/// A timestamped write is a write that must occur at a specific timestamp. This +/// is used for read-then-write patterns where a read happens at timestamp T and +/// the write must happen at exactly T+1 to maintain serializability. +#[derive(Debug, Clone)] +pub enum TimestampedWriteResult { + /// The write was committed at the target timestamp. + Success { + /// The timestamp at which the write was committed. + timestamp: Timestamp, + }, + /// The target timestamp was already past - the write cannot proceed. + TimestampPassed { + /// The timestamp the write was targeting. 
+ target_timestamp: Timestamp, + /// The current write timestamp from the oracle. + current_write_ts: Timestamp, + }, + /// The write was cancelled (e.g., connection closed, coordinator shutdown). + Cancelled, +} + /// A pending write transaction that will be committing during the next group commit. #[derive(Debug)] pub(crate) enum PendingWriteTxn { @@ -149,6 +173,31 @@ pub(crate) enum PendingWriteTxn { updates: Vec, source: BuiltinTableUpdateSource, }, + /// Internal timestamped write (e.g., from read-then-write subscribe OCC). + /// + /// Unlike User writes, this doesn't have an ExecuteContext - the caller + /// handles responses via the `result_tx` channel. + /// + /// **Important**: This variant supports writes to multiple tables in a + /// single entry. It is the caller's responsibility to ensure that: + /// 1. All writes are consistent (e.g., computed from the same read + /// timestamp) + /// 2. Dependencies between tables have been properly resolved + /// + /// **Concurrency semantics**: Only ONE `InternalTimestamped` write is + /// processed per group commit round. If multiple are submitted at the same + /// `target_timestamp`, only one is processed and others fail with + /// `TimestampPassed`. This is required for correctness because each + /// timestamped write may not have resolved dependencies with other + /// concurrent timestamped writes - they were computed independently and + /// could be inconsistent if applied together. The failing writes will retry + /// with a new timestamp. + InternalTimestamped { + span: Span, + writes: BTreeMap>, + target_timestamp: Timestamp, + result_tx: oneshot::Sender, + }, } impl PendingWriteTxn { @@ -339,6 +388,8 @@ impl Coordinator { match pending_write { // We always allow system writes to proceed. PendingWriteTxn::System { .. } => validated_writes.push(pending_write), + // Internal timestamped writes don't need locks (OCC handles conflicts). + PendingWriteTxn::InternalTimestamped { .. 
} => validated_writes.push(pending_write), // We have a set of locks! Validate they're correct (expected). PendingWriteTxn::User { span, @@ -433,17 +484,62 @@ impl Coordinator { self.defer_op(acquire_future, DeferredOp::Write(write)); } - // The value returned here still might be ahead of `now()` if `now()` has gone backwards at - // any point during this method or if this was triggered from DDL. We will still commit the - // write without waiting for `now()` to advance. This is ok because the next batch of writes - // will trigger the wait loop in `try_group_commit()` if `now()` hasn't advanced past the - // global timeline, preventing an unbounded advancing of the global timeline ahead of - // `now()`. Additionally DDL is infrequent enough and takes long enough that we don't think - // it's practical for continuous DDL to advance the global timestamp in an unbounded manner. - let WriteTimestamp { - timestamp, - advance_to, - } = self.get_local_write_ts().await; + // Separate timestamped (OCC) writes from regular writes in a single pass. + let mut regular_writes = Vec::new(); + let mut timestamped_writes = Vec::new(); + for write in validated_writes { + match write { + PendingWriteTxn::InternalTimestamped { .. } => timestamped_writes.push(write), + other => regular_writes.push(other), + } + } + + // Determine the write timestamp, resolving any timestamped (OCC) writes. + // + // We can only process ONE InternalTimestamped write per group commit round because: + // 1. Each timestamped write may not have resolved dependencies with other timestamped writes + // 2. They were computed independently and could be inconsistent if applied together + // 3. After committing at timestamp T, the oracle advances past T, so other writes at T + // would fail anyway — we just fail them early here + // + // When a timestamped write is selected, we apply ONLY that write in this round and + // defer all regular writes to the next round. 
This avoids any subtle interactions + // between the OCC write (which targets a specific timestamp) and regular writes. + let resolved = if !timestamped_writes.is_empty() { + self.resolve_timestamped_writes(timestamped_writes).await + } else { + None + }; + + let (timestamp, advance_to, validated_writes) = match resolved { + Some((ts, advance_to, selected_write)) => { + // Defer all regular writes to the next round. + self.pending_writes.extend(regular_writes); + if !self.pending_writes.is_empty() { + self.trigger_group_commit(); + } + (ts, advance_to, vec![selected_write]) + } + None => { + // Normal flow (no eligible timestamped writes): get timestamp + // from oracle. The value returned here still might be ahead of + // `now()` if `now()` has gone backwards at any point during + // this method or if this was triggered from DDL. We will still + // commit the write without waiting for `now()` to advance. This + // is ok because the next batch of writes will trigger the wait + // loop in `try_group_commit()` if `now()` hasn't advanced past + // the global timeline, preventing an unbounded advancing of the + // global timeline ahead of `now()`. Additionally DDL is + // infrequent enough and takes long enough that we don't think + // it's practical for continuous DDL to advance the global + // timestamp in an unbounded manner. 
+ let WriteTimestamp { + timestamp, + advance_to, + } = self.get_local_write_ts().await; + (timestamp, advance_to, regular_writes) + } + }; // While we're flipping on the feature flags for txn-wal tables and // the separated Postgres timestamp oracle, we also need to confirm @@ -465,6 +561,7 @@ impl Coordinator { let mut appends: BTreeMap> = BTreeMap::new(); let mut responses = Vec::with_capacity(validated_writes.len()); let mut notifies = Vec::new(); + let mut timestamped_result_txs = Vec::new(); for validated_write_txn in validated_writes { match validated_write_txn { @@ -506,6 +603,19 @@ impl Coordinator { | BuiltinTableUpdateSource::Background(tx) => notifies.push(tx), } } + PendingWriteTxn::InternalTimestamped { + span: _, + writes, + target_timestamp: _, + result_tx, + } => { + for (id, table_data) in writes { + if self.catalog().try_get_entry(&id).is_some() { + appends.entry(id).or_default().extend(table_data); + } + } + timestamped_result_txs.push(result_tx); + } } } @@ -585,9 +695,10 @@ impl Coordinator { .instrument(debug_span!("group_commit_apply::append_fut")) .await { - Ok(append_result) => { - append_result.unwrap_or_terminate("cannot fail to apply appends") - } + Ok(append_result) => append_result.unwrap_or_terminate(&format!( + "cannot fail to apply appends at {} ({:?})", + timestamp, permit + )), Err(_) => warn!("Writer terminated with writes in indefinite state"), }; @@ -618,6 +729,11 @@ impl Coordinator { // We don't care if the listeners have gone away. let _ = notify.send(()); } + + // Notify timestamped write callers of success. + for result_tx in timestamped_result_txs { + let _ = result_tx.send(TimestampedWriteResult::Success { timestamp }); + } } .instrument(span), ); @@ -625,6 +741,124 @@ impl Coordinator { timestamp } + /// Resolve timestamped (OCC) writes for a group commit round. + /// + /// Given a set of `InternalTimestamped` writes, this method: + /// 1. 
Checks which writes are still eligible (target timestamp not yet + /// passed) + /// 2. Selects ONE write at the lowest eligible timestamp for this round + /// 3. Fails writes whose timestamp has passed or that lost the selection + /// 4. Defers writes at higher timestamps to the next round + /// + /// Returns `Some((timestamp, advance_to, selected_write))` if a write was + /// selected, or `None` if all timestamped writes had already passed (in + /// which case the caller should fall back to the normal oracle). + async fn resolve_timestamped_writes( + &mut self, + timestamped_writes: Vec, + ) -> Option<(Timestamp, Timestamp, PendingWriteTxn)> { + debug_assert!(!timestamped_writes.is_empty()); + + let next_eligible_write_ts = self.peek_local_write_ts().await.step_forward(); + + // Find the lowest eligible target timestamp in a single scan. + let lowest_eligible_ts = timestamped_writes + .iter() + .filter_map(|w| match w { + PendingWriteTxn::InternalTimestamped { + target_timestamp, .. + } if !target_timestamp.less_than(&next_eligible_write_ts) => { + Some(*target_timestamp) + } + _ => None, + }) + .min(); + + let Some(target_ts) = lowest_eligible_ts else { + // All timestamped writes have passed. Fail them all; the caller + // will fall back to the normal oracle for regular writes. + for write in timestamped_writes { + if let PendingWriteTxn::InternalTimestamped { + target_timestamp, + result_tx, + .. 
+ } = write + { + let _ = result_tx.send(TimestampedWriteResult::TimestampPassed { + target_timestamp, + current_write_ts: next_eligible_write_ts, + }); + } + } + return None; + }; + + // Dispatch each timestamped write in a single pass: + // - Passed (target < next_eligible_write_ts): fail with TimestampPassed + // - At target_ts, first one seen: select for this round + // - At target_ts, subsequent: fail (only one per round) + // - Above target_ts: defer to next round + let mut selected: Option = None; + for write in timestamped_writes { + match &write { + PendingWriteTxn::InternalTimestamped { + target_timestamp, .. + } if target_timestamp.less_than(&next_eligible_write_ts) => { + // Timestamp has passed. + if let PendingWriteTxn::InternalTimestamped { + target_timestamp, + result_tx, + .. + } = write + { + let _ = result_tx.send(TimestampedWriteResult::TimestampPassed { + target_timestamp, + current_write_ts: next_eligible_write_ts, + }); + } + } + PendingWriteTxn::InternalTimestamped { + target_timestamp, .. + } if *target_timestamp == target_ts && selected.is_none() => { + // Winner: include in this round. + selected = Some(write); + } + PendingWriteTxn::InternalTimestamped { + target_timestamp, .. + } if *target_timestamp == target_ts => { + // Loser at same timestamp — only one can write per round. + if let PendingWriteTxn::InternalTimestamped { + target_timestamp, + result_tx, + .. + } = write + { + let _ = result_tx.send(TimestampedWriteResult::TimestampPassed { + target_timestamp, + current_write_ts: target_ts.step_forward(), + }); + } + } + PendingWriteTxn::InternalTimestamped { .. } => { + // Higher timestamp — defer to next round. + self.pending_writes.push(write); + } + _ => unreachable!("timestamped_writes only contains InternalTimestamped"), + } + } + + // If there are more timestamped writes waiting, trigger another group commit. + if self + .pending_writes + .iter() + .any(|w| matches!(w, PendingWriteTxn::InternalTimestamped { .. 
})) + { + self.trigger_group_commit(); + } + + selected.map(|write| (target_ts, target_ts.step_forward(), write)) + } + /// Submit a write to be executed during the next group commit and trigger a group commit. pub(crate) fn submit_write(&mut self, pending_write_txn: PendingWriteTxn) { if self.controller.read_only() { diff --git a/src/adapter/src/coord/command_handler.rs b/src/adapter/src/coord/command_handler.rs index 3307e9b8fdccd..7b47ebcebc0d5 100644 --- a/src/adapter/src/coord/command_handler.rs +++ b/src/adapter/src/coord/command_handler.rs @@ -489,6 +489,51 @@ impl Coordinator { Command::FrontendStatementLogging(event) => { self.handle_frontend_statement_logging_event(event); } + Command::CreateInternalSubscribe { + df_desc, + cluster_id, + replica_id, + depends_on, + as_of, + arity, + sink_id, + conn_id, + session_uuid, + start_time, + read_holds, + tx, + } => { + self.handle_create_internal_subscribe( + *df_desc, + cluster_id, + replica_id, + depends_on, + as_of, + arity, + sink_id, + conn_id, + session_uuid, + start_time, + read_holds, + tx, + ) + .await; + } + Command::AttemptTimestampedWrite { + target_id, + diffs, + write_ts, + tx, + } => { + self.handle_attempt_timestamped_write( + target_id, diffs, write_ts, tx, + ) + .await; + } + Command::DropInternalSubscribe { sink_id, tx } => { + self.drop_internal_subscribe(sink_id).await; + let _ = tx.send(()); + } } } .instrument(debug_span!("handle_command")) @@ -761,6 +806,9 @@ impl Coordinator { persist_client: self.persist_client.clone(), statement_logging_frontend, superuser_attribute, + occ_write_semaphore: Arc::clone(&self.occ_write_semaphore), + frontend_read_then_write_enabled: self.frontend_read_then_write_enabled, + read_only: self.controller.read_only(), }); if tx.send(resp).is_err() { // Failed to send to adapter, but everything is setup so we can terminate diff --git a/src/adapter/src/coord/ddl.rs b/src/adapter/src/coord/ddl.rs index 480d123fe826e..8bb21e4f88a3f 100644 --- 
a/src/adapter/src/coord/ddl.rs +++ b/src/adapter/src/coord/ddl.rs @@ -641,7 +641,11 @@ impl Coordinator { for sink_id in sink_ids { let sink = match self.remove_active_compute_sink(sink_id).await { None => { - tracing::error!(%sink_id, "drop_compute_sinks called on nonexistent sink"); + // This can happen due to a race condition: an internal + // subscribe may be cleaned up via its own message while + // session disconnect cleanup is in progress. This is + // benign. + tracing::debug!(%sink_id, "drop_compute_sinks: sink already removed"); continue; } Some(sink) => sink, diff --git a/src/adapter/src/coord/read_then_write.rs b/src/adapter/src/coord/read_then_write.rs new file mode 100644 index 0000000000000..511afa266dbd8 --- /dev/null +++ b/src/adapter/src/coord/read_then_write.rs @@ -0,0 +1,162 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//! Coordinator-side support machinery for frontend occ-based read-then write +//! sequencing in `frontend_read_then_write.rs`. +//! +//! N.B. It's a bit annoying that we still have the write submission go through +//! the coordinator. We can imagine in the long run we want a group-commit task +//! that runs independently, and we can directly submit write requests there. + +use std::collections::BTreeSet; + +use mz_repr::{Diff, GlobalId, Row, Timestamp}; +use mz_sql::plan::SubscribeOutput; +use tokio::sync::mpsc; + +use crate::PeekResponseUnary; +use crate::active_compute_sink::{ActiveComputeSink, ActiveSubscribe}; +use crate::coord::Coordinator; +use crate::coord::appends::TimestampedWriteResult; +use crate::error::AdapterError; + +impl Coordinator { + /// Handle a Command to create an internal subscribe. 
+ /// + /// Internal subscribes are not visible in introspection collections. They + /// are initially used for frontend-sequenced read-then-write + /// (DELETE/UPDATE/INSERT ...SELECT) via OCC. + /// + /// This is called from the frontend OCC implementation after it has + /// acquired the semaphore permit. We create the subscribe here (on the + /// coordinator) and return the channel to the caller. + /// + /// The `read_holds` parameter contains the read holds for this specific + /// operation. They are passed directly through the stages (not via the + /// connection-keyed txn_read_holds map) to avoid issues where multiple + /// operations on the same connection could interfere with each other's + /// holds. + #[allow(clippy::too_many_arguments)] + pub(crate) async fn handle_create_internal_subscribe( + &mut self, + df_desc: crate::optimize::LirDataflowDescription, + cluster_id: mz_compute_types::ComputeInstanceId, + replica_id: Option, + depends_on: BTreeSet, + as_of: Timestamp, + arity: usize, + sink_id: GlobalId, + conn_id: mz_adapter_types::connection::ConnectionId, + session_uuid: uuid::Uuid, + start_time: mz_ore::now::EpochMillis, + read_holds: crate::ReadHolds, + response_tx: tokio::sync::oneshot::Sender< + Result, AdapterError>, + >, + ) { + // Check if connection still exists. If not, the client disconnected + // while waiting for the semaphore - the operation is effectively cancelled. 
+ if !self.active_conns.contains_key(&conn_id) { + // Send error to indicate cancellation + let _ = response_tx.send(Err(AdapterError::Canceled)); + return; + } + + // Create the channel for subscribe responses + let (tx, rx) = mpsc::unbounded_channel(); + + let active_subscribe = ActiveSubscribe { + conn_id: conn_id.clone(), + session_uuid, + channel: tx, + emit_progress: true, // We need progress updates for OCC + as_of, + arity, + cluster_id, + depends_on, + start_time, + output: SubscribeOutput::Diffs, // Output format for diffs + internal: true, // Internal subscribe - skip builtin table updates + }; + active_subscribe.initialize(); + + // Add metadata for the subscribe + let write_notify_fut = self + .add_active_compute_sink(sink_id, ActiveComputeSink::Subscribe(active_subscribe)) + .await; + + // Ship dataflow + let ship_dataflow_fut = self.ship_dataflow(df_desc, cluster_id, replica_id); + + let ((), ()) = futures::future::join(write_notify_fut, ship_dataflow_fut).await; + + // Send the receiver back to the frontend + let _ = response_tx.send(Ok(rx)); + + // The read_holds are dropped here at the end of the function, after ship_dataflow + // has completed. This ensures the since doesn't advance past our as_of timestamp + // until the dataflow is running. + drop(read_holds); + } + + /// Handle the write attempt from the OCC loop. 
+ pub(crate) async fn handle_attempt_timestamped_write( + &mut self, + target_id: mz_repr::CatalogItemId, + diffs: Vec<(Row, Diff)>, + write_ts: Timestamp, + result_tx: tokio::sync::oneshot::Sender<TimestampedWriteResult>, + ) { + use crate::coord::appends::PendingWriteTxn; + use mz_storage_client::client::TableData; + use smallvec::smallvec; + use std::collections::BTreeMap; + use tracing::Span; + + if self.controller.read_only() { + panic!( + "attempting OCC read-then-write in read-only mode: write_ts={}, target_id={:?}", + write_ts, target_id + ); + } + + // Early check if timestamp already passed + let next_eligible_write_ts = self.peek_local_write_ts().await.step_forward(); + if write_ts < next_eligible_write_ts { + let _ = result_tx.send(TimestampedWriteResult::TimestampPassed { + target_timestamp: write_ts, + current_write_ts: next_eligible_write_ts, + }); + return; + } + + // Create TableData from accumulated diffs + let table_data = TableData::Rows(diffs); + let writes = BTreeMap::from([(target_id, smallvec![table_data])]); + + tracing::trace!(?writes, ?write_ts, "about to attempt read-then-write"); + + // Push internal timestamped write directly to pending_writes + self.pending_writes + .push(PendingWriteTxn::InternalTimestamped { + span: Span::current(), + writes, + target_timestamp: write_ts, + result_tx, + }); + self.trigger_group_commit(); + } + + /// Drop an internal subscribe. + pub(crate) async fn drop_internal_subscribe(&mut self, sink_id: GlobalId) { + // Use drop_compute_sink instead of remove_active_compute_sink to also + // cancel the dataflow on the compute side, not just remove bookkeeping.
+ let _ = self.drop_compute_sink(sink_id).await; + } +} diff --git a/src/adapter/src/coord/sequencer.rs b/src/adapter/src/coord/sequencer.rs index 7753afb5db5ca..b252446a446c6 100644 --- a/src/adapter/src/coord/sequencer.rs +++ b/src/adapter/src/coord/sequencer.rs @@ -96,6 +96,8 @@ use crate::util::ClientTransmitter; mod inner; +pub(crate) use inner::validate_read_dependencies; + impl Coordinator { /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 34KB. This would /// get stored on the stack which is bad for runtime performance, and blow up our stack usage. @@ -884,6 +886,13 @@ impl Coordinator { // Consolidate rows. This is useful e.g. for an UPDATE where the row // doesn't change, and we need to reflect that in the number of // affected rows. + // + // NOTE: This behavior differs from PostgreSQL. In PostgreSQL, + // `UPDATE t SET x = x` reports the number of rows matching the WHERE + // clause, even if no values actually change. In Materialize, because + // we use differential dataflow, the +1 and -1 diffs for unchanged rows + // cancel out during consolidation, resulting in 0 affected rows. + // This has been Materialize's behavior since early versions. differential_dataflow::consolidation::consolidate(&mut plan.updates); affected_rows = Diff::ZERO; diff --git a/src/adapter/src/coord/sequencer/inner.rs b/src/adapter/src/coord/sequencer/inner.rs index 873fd46ed87e5..e54032ca91ba6 100644 --- a/src/adapter/src/coord/sequencer/inner.rs +++ b/src/adapter/src/coord/sequencer/inner.rs @@ -143,6 +143,94 @@ macro_rules! return_if_err { pub(super) use return_if_err; +/// Validates that all dependencies are valid for read-then-write operations. +/// +/// Ensures all objects the selection depends on are valid for `ReadThenWrite` operations: +/// +/// - They do not refer to any objects whose notion of time moves differently than that of +/// user tables. 
This limitation is meant to ensure no writes occur between this read and the +/// subsequent write. +/// - They do not use mz_now(), whose time produced during read will differ from the write +/// timestamp. +pub(crate) fn validate_read_dependencies( + catalog: &Catalog, + id: &CatalogItemId, +) -> Result<(), AdapterError> { + use CatalogItemType::*; + use mz_catalog::memory::objects; + let mut ids_to_check = Vec::new(); + let valid = match catalog.try_get_entry(id) { + Some(entry) => { + if let CatalogItem::View(objects::View { optimized_expr, .. }) + | CatalogItem::MaterializedView(objects::MaterializedView { + optimized_expr, .. + }) = entry.item() + { + if optimized_expr.contains_temporal() { + return Err(AdapterError::Unsupported( + "calls to mz_now in write statements", + )); + } + } + match entry.item().typ() { + typ @ (Func | View | MaterializedView | ContinualTask) => { + ids_to_check.extend(entry.uses()); + let valid_id = id.is_user() || matches!(typ, Func); + valid_id + } + Source | Secret | Connection => false, + // Cannot select from sinks or indexes. + Sink | Index => unreachable!(), + Table => { + if !id.is_user() { + // We can't read from non-user tables + false + } else { + // We can't read from tables that are source-exports + entry.source_export_details().is_none() + } + } + Type => true, + } + } + None => false, + }; + if !valid { + let (object_name, object_type) = match catalog.try_get_entry(id) { + Some(entry) => { + let object_name = catalog.resolve_full_name(entry.name(), None).to_string(); + let object_type = match entry.item().typ() { + // We only need the disallowed types here; the allowed types are handled above. 
+ Source => "source", + Secret => "secret", + Connection => "connection", + Table => { + if !id.is_user() { + "system table" + } else { + "source-export table" + } + } + View => "system view", + MaterializedView => "system materialized view", + ContinualTask => "system task", + _ => "invalid dependency", + }; + (object_name, object_type.to_string()) + } + None => (id.to_string(), "unknown".to_string()), + }; + return Err(AdapterError::InvalidTableMutationSelection { + object_name, + object_type, + }); + } + for id in ids_to_check { + validate_read_dependencies(catalog, &id)?; + } + Ok(()) +} + struct DropOps { ops: Vec, dropped_active_db: bool, @@ -2682,93 +2770,7 @@ impl Coordinator { return; } - // Ensure all objects `selection` depends on are valid for `ReadThenWrite` operations: - // - // - They do not refer to any objects whose notion of time moves differently than that of - // user tables. This limitation is meant to ensure no writes occur between this read and the - // subsequent write. - // - They do not use mz_now(), whose time produced during read will differ from the write - // timestamp. - fn validate_read_dependencies( - catalog: &Catalog, - id: &CatalogItemId, - ) -> Result<(), AdapterError> { - use CatalogItemType::*; - use mz_catalog::memory::objects; - let mut ids_to_check = Vec::new(); - let valid = match catalog.try_get_entry(id) { - Some(entry) => { - if let CatalogItem::View(objects::View { optimized_expr, .. }) - | CatalogItem::MaterializedView(objects::MaterializedView { - optimized_expr, - .. - }) = entry.item() - { - if optimized_expr.contains_temporal() { - return Err(AdapterError::Unsupported( - "calls to mz_now in write statements", - )); - } - } - match entry.item().typ() { - typ @ (Func | View | MaterializedView | ContinualTask) => { - ids_to_check.extend(entry.uses()); - let valid_id = id.is_user() || matches!(typ, Func); - valid_id - } - Source | Secret | Connection => false, - // Cannot select from sinks or indexes. 
- Sink | Index => unreachable!(), - Table => { - if !id.is_user() { - // We can't read from non-user tables - false - } else { - // We can't read from tables that are source-exports - entry.source_export_details().is_none() - } - } - Type => true, - } - } - None => false, - }; - if !valid { - let (object_name, object_type) = match catalog.try_get_entry(id) { - Some(entry) => { - let object_name = catalog.resolve_full_name(entry.name(), None).to_string(); - let object_type = match entry.item().typ() { - // We only need the disallowed types here; the allowed types are handled above. - Source => "source", - Secret => "secret", - Connection => "connection", - Table => { - if !id.is_user() { - "system table" - } else { - "source-export table" - } - } - View => "system view", - MaterializedView => "system materialized view", - ContinualTask => "system task", - _ => "invalid dependency", - }; - (object_name, object_type.to_string()) - } - None => (id.to_string(), "unknown".to_string()), - }; - return Err(AdapterError::InvalidTableMutationSelection { - object_name, - object_type, - }); - } - for id in ids_to_check { - validate_read_dependencies(catalog, &id)?; - } - Ok(()) - } - + // Ensure all objects `selection` depends on are valid for `ReadThenWrite` operations. for gid in selection.depends_on() { let item_id = self.catalog().resolve_item_id(&gid); if let Err(err) = validate_read_dependencies(self.catalog(), &item_id) { diff --git a/src/adapter/src/coord/sequencer/inner/subscribe.rs b/src/adapter/src/coord/sequencer/inner/subscribe.rs index 429ee32b14067..0a392832ba2dd 100644 --- a/src/adapter/src/coord/sequencer/inner/subscribe.rs +++ b/src/adapter/src/coord/sequencer/inner/subscribe.rs @@ -456,6 +456,8 @@ impl Coordinator { depends_on: dependency_ids, start_time: self.now(), output, + internal: false, // User-facing subscribe - appears in + // mz_subscriptions. 
}; active_subscribe.initialize(); diff --git a/src/adapter/src/coord/sql.rs b/src/adapter/src/coord/sql.rs index feec9e1384e30..527f8ac5c938c 100644 --- a/src/adapter/src/coord/sql.rs +++ b/src/adapter/src/coord/sql.rs @@ -264,18 +264,24 @@ impl Coordinator { let ret_fut = match &active_sink { ActiveComputeSink::Subscribe(active_subscribe) => { - let update = - self.catalog() - .state() - .pack_subscribe_update(id, active_subscribe, Diff::ONE); - let update = self.catalog().state().resolve_builtin_table_update(update); + // Skip builtin table update for internal subscribes + if active_subscribe.internal { + Box::pin(std::future::ready(())) as BuiltinTableAppendNotify + } else { + let update = self.catalog().state().pack_subscribe_update( + id, + active_subscribe, + Diff::ONE, + ); + let update = self.catalog().state().resolve_builtin_table_update(update); - self.metrics - .active_subscribes - .with_label_values(&[session_type]) - .inc(); + self.metrics + .active_subscribes + .with_label_values(&[session_type]) + .inc(); - self.builtin_table_update().execute(vec![update]).await.0 + self.builtin_table_update().execute(vec![update]).await.0 + } } ActiveComputeSink::CopyTo(_) => { self.metrics @@ -311,18 +317,21 @@ impl Coordinator { match &sink { ActiveComputeSink::Subscribe(active_subscribe) => { - let update = self.catalog().state().pack_subscribe_update( - id, - active_subscribe, - Diff::MINUS_ONE, - ); - let update = self.catalog().state().resolve_builtin_table_update(update); - self.builtin_table_update().blocking(vec![update]).await; + // Skip builtin table update for internal subscribes + if !active_subscribe.internal { + let update = self.catalog().state().pack_subscribe_update( + id, + active_subscribe, + Diff::MINUS_ONE, + ); + let update = self.catalog().state().resolve_builtin_table_update(update); + self.builtin_table_update().blocking(vec![update]).await; - self.metrics - .active_subscribes - .with_label_values(&[session_type]) - .dec(); + self.metrics + 
.active_subscribes + .with_label_values(&[session_type]) + .dec(); + } } ActiveComputeSink::CopyTo(_) => { self.metrics diff --git a/src/adapter/src/frontend_read_then_write.rs b/src/adapter/src/frontend_read_then_write.rs new file mode 100644 index 0000000000000..b929ba49a5c86 --- /dev/null +++ b/src/adapter/src/frontend_read_then_write.rs @@ -0,0 +1,1098 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//! Frontend sequencing for read-then-write operations. +//! +//! This module implements INSERT [...] SELECT FROM [...], DELETE and UPDATE +//! operations using a subscribe with optimistic concurrency control (OCC), +//! sequenced from the session task rather than the Coordinator. This reduces +//! coordinator bottlenecking. +//! +//! The approach is: +//! 1. Validate and optimize MIR locally +//! 2. Determine timestamp via coordinator +//! 3. Optimize LIR locally +//! 4. Acquire OCC semaphore +//! 5. Create subscribe via Coordinator Command +//! 6. Run OCC loop (receive diffs, attempt write, retry on conflict) +//! 7. Return result +//! +//! ## Rollout note +//! +//! The `FRONTEND_READ_THEN_WRITE` dyncfg is read once at process startup and +//! fixed for the lifetime of the `environmentd` process. This avoids a +//! mixed-mode window where both the old lock-based coordinator path and this +//! OCC path are active concurrently — the old path acquires write locks to +//! prevent concurrent writes between its read and write phases, but this OCC +//! path does not use write locks, so concurrent operation of both paths could +//! allow an OCC write to slip between an old-path reader's read and write. 
+ +use std::collections::BTreeMap; +use std::collections::BTreeSet; +use std::num::{NonZeroI64, NonZeroUsize}; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use differential_dataflow::consolidation; +use itertools::Itertools; +use mz_cluster_client::ReplicaId; +use mz_compute_types::ComputeInstanceId; +use mz_expr::{CollectionPlan, Id, LocalId, MirRelationExpr, MirScalarExpr}; +use mz_ore::cast::CastFrom; +use mz_repr::optimize::OverrideFrom; +use mz_repr::{ + CatalogItemId, Diff, GlobalId, IntoRowIterator, RelationDesc, Row, RowArena, Timestamp, +}; +use mz_sql::plan::{self, MutationKind, Params, QueryWhen}; +use mz_sql::session::metadata::SessionMetadata; +use qcell::QCell; +use timely::progress::Antichain; +use tokio::sync::mpsc; +use uuid::Uuid; + +use crate::catalog::Catalog; +use crate::command::{Command, ExecuteResponse}; +use crate::coord::appends::TimestampedWriteResult; +use crate::coord::sequencer::validate_read_dependencies; +use crate::coord::{ExecuteContextGuard, TargetCluster}; +use crate::error::AdapterError; +use crate::optimize::Optimize; +use crate::optimize::dataflows::{ComputeInstanceSnapshot, EvalTime, ExprPrep, ExprPrepOneShot}; +use crate::session::{LifecycleTimestamps, Session, TransactionOps}; +use crate::statement_logging::{ + PreparedStatementLoggingInfo, StatementEndedExecutionReason, StatementLifecycleEvent, + StatementLoggingId, +}; +use crate::{PeekClient, PeekResponseUnary, TimelineContext, optimize}; + +/// A handle to an internal subscribe (not visible in introspection collections +/// like `mz_subscriptions`). A `Drop` impl ensures the subscribe's dataflow is +/// cleaned up when dropped. +pub(crate) struct SubscribeHandle { + rx: mpsc::UnboundedReceiver, + sink_id: GlobalId, + /// Wrapped in `Option` so we can move it out in `Drop`. + client: Option, +} + +impl SubscribeHandle { + /// Receive the next message from the subscribe, waiting if necessary. 
+ pub async fn recv(&mut self) -> Option { + self.rx.recv().await + } + + /// Try to receive a message without waiting. + pub fn try_recv(&mut self) -> Result { + self.rx.try_recv() + } +} + +impl Drop for SubscribeHandle { + fn drop(&mut self) { + if let Some(client) = self.client.take() { + let sink_id = self.sink_id; + + // Fire-and-forget: if the coordinator is gone, the subscribe will + // be cleaned up when the process exits anyway. + let (tx, _rx) = tokio::sync::oneshot::channel(); + client.send(Command::DropInternalSubscribe { sink_id, tx }); + } + } +} + +impl PeekClient { + /// Execute a read-then-write operation using frontend sequencing. + /// + /// Called by session code when the frontend_read_then_write dyncfg is + /// enabled. + pub(crate) async fn frontend_read_then_write( + &mut self, + session: &mut Session, + plan: plan::ReadThenWritePlan, + target_cluster: TargetCluster, + params: &Params, + logging: &Arc>, + lifecycle_timestamps: Option, + outer_ctx_extra: &mut Option, + ) -> Result { + // Get catalog snapshot + let catalog = self.catalog_snapshot("frontend_read_then_write").await; + + // Set up statement logging. If this execution happens as part of an outer statement + // (e.g. EXECUTE/FETCH), reuse and retire that existing context. + let statement_logging_id = if outer_ctx_extra.is_none() { + let result = self.statement_logging_frontend.begin_statement_execution( + session, + params, + logging, + catalog.system_config(), + lifecycle_timestamps, + ); + + if let Some((logging_id, began_execution, mseh_update, prepared_statement)) = result { + self.log_began_execution(began_execution, mseh_update, prepared_statement); + Some(logging_id) + } else { + None + } + } else { + outer_ctx_extra + .take() + .and_then(|guard| guard.defuse().retire()) + }; + + // Reject mutations in read-only mode (e.g. during 0dt upgrades). 
+ if self.read_only { + let result = Err(AdapterError::ReadOnly); + if let Some(logging_id) = statement_logging_id { + self.log_ended_execution( + logging_id, + StatementEndedExecutionReason::Errored { + error: AdapterError::ReadOnly.to_string(), + }, + ); + } + return result; + } + + // From this point on, we need to ensure we log the end of execution on + // any exit path. We use an inner function to capture all the logic that + // may fail. + let result = self + .frontend_read_then_write_inner( + session, + plan, + target_cluster, + &catalog, + statement_logging_id, + ) + .await; + + // Log the end of execution + if let Some(logging_id) = statement_logging_id { + let reason = match &result { + Ok(resp) => resp.into(), + Err(e) => StatementEndedExecutionReason::Errored { + error: e.to_string(), + }, + }; + self.log_ended_execution(logging_id, reason); + } + + result + } + + /// Inner implementation of frontend_read_then_write that handles the actual + /// logic. + /// + /// This is separated to ensure statement logging end is always called from + /// the outer function. + async fn frontend_read_then_write_inner( + &mut self, + session: &mut Session, + mut plan: plan::ReadThenWritePlan, + target_cluster: TargetCluster, + catalog: &Arc, + statement_logging_id: Option, + ) -> Result { + // Validate the operation + let validation_result = + self.validate_read_then_write(catalog, session, &plan, target_cluster)?; + + let ValidationResult { + cluster_id, + replica_id, + timeline, + depends_on, + table_desc, + } = validation_result; + + // Log cluster selection + if let Some(logging_id) = statement_logging_id { + self.log_set_cluster(logging_id, cluster_id); + } + + // Read-then-write is rejected in explicit transaction blocks (checked + // in SessionClient::try_frontend_read_then_write), so we're always in + // an implicit (autocommit) transaction here. 
The actual data is written + // directly via AttemptTimestampedWrite / InternalTimestamped, bypassing + // session transaction ops. The empty Writes(vec![]) just marks this as + // a write transaction in the session state machine so auto-commit + // handles it correctly. This is safe because there's no ROLLBACK + // opportunity in an implicit transaction. + debug_assert!( + session.transaction().is_implicit(), + "read-then-write should be rejected in explicit transactions" + ); + session.add_transaction_ops(TransactionOps::Writes(vec![]))?; + + // Prepare expressions (resolve unmaterializable functions like + // current_user()) + let style = ExprPrepOneShot { + logical_time: EvalTime::NotAvailable, // We already errored out on mz_now above. + session, + catalog_state: catalog.state(), + }; + for expr in plan + .assignments + .values_mut() + .chain(plan.returning.iter_mut()) + { + style.prep_scalar_expr(expr)?; + } + + // Optimize MIR + let (optimizer, global_mir_plan) = + self.optimize_mir_read_then_write(&catalog, &plan, cluster_id)?; + + // Determine timestamp and acquire read holds + let oracle_read_ts = self.oracle_read_ts(&timeline).await; + let bundle = &global_mir_plan.id_bundle(cluster_id); + let (determination, read_holds) = self + .frontend_determine_timestamp( + catalog.state(), + session, + bundle, + &QueryWhen::FreshestTableWrite, + cluster_id, + &timeline, + oracle_read_ts, + None, + ) + .await?; + + let as_of = determination.timestamp_context.timestamp_or_default(); + + // Optimize LIR + let global_lir_plan = + self.optimize_lir_read_then_write(optimizer, global_mir_plan, as_of)?; + + // Log optimization finished + if let Some(logging_id) = statement_logging_id { + self.log_lifecycle_event(logging_id, StatementLifecycleEvent::OptimizationFinished); + } + + let sink_id = global_lir_plan.sink_id(); + let target_id = plan.id; + let kind = plan.kind.clone(); + let returning = plan.returning.clone(); + + let (df_desc, _df_meta) = 
global_lir_plan.unapply(); + + let arity = df_desc + .sink_exports + .values() + .next() + .expect("has sink") + .from_desc + .arity(); + + let conn_id = session.conn_id().clone(); + let session_uuid = session.uuid(); + let start_time = (self.statement_logging_frontend.now)(); + let max_result_size = catalog.system_config().max_result_size(); + let max_occ_retries = usize::cast_from(catalog.system_config().max_occ_retries()); + let statement_timeout = *session.vars().statement_timeout(); + + // Acquire OCC semaphore permit to limit concurrent write operations + let _permit = self + .occ_write_semaphore + .clone() + .acquire_owned() + .await + .expect("semaphore closed"); + + let subscribe_handle = self + .create_internal_subscribe( + Box::new(df_desc), + cluster_id, + replica_id, + depends_on.clone(), + as_of, + arity, + sink_id, + conn_id.clone(), + session_uuid, + start_time, + read_holds, + ) + .await?; + + let (retry_count, write_submitted, result) = self + .run_occ_loop( + subscribe_handle, + target_id, + kind, + returning, + max_result_size, + max_occ_retries, + table_desc, + statement_timeout, + ) + .await; + + self.coordinator_client() + .metrics() + .occ_retry_count + .observe(f64::from(u32::try_from(retry_count).unwrap_or(u32::MAX))); + + let result = result?; + + // Ensure read linearization only when no write was submitted (no + // matching rows). When a write WAS submitted, the group commit already + // bumped the oracle past `as_of`, so linearization is guaranteed and we + // can skip the costly oracle read_ts call. When no write was submitted, + // we must wait for the oracle to advance past our read timestamp to + // prevent subsequent reads from seeing an earlier state. + if !write_submitted { + self.ensure_read_linearized(&timeline, as_of).await?; + } + + Ok(result) + } + + /// Validate a read-then-write operation. 
+ fn validate_read_then_write( + &self, + catalog: &Arc, + session: &Session, + plan: &plan::ReadThenWritePlan, + target_cluster: TargetCluster, + ) -> Result { + // Disallow mz_now in any position because read time and write time differ. + let contains_temporal = plan.selection.contains_temporal()? + || plan.assignments.values().any(|e| e.contains_temporal()) + || plan.returning.iter().any(|e| e.contains_temporal()); + if contains_temporal { + return Err(AdapterError::Unsupported( + "calls to mz_now in write statements", + )); + } + + // Validate read dependencies + for gid in plan.selection.depends_on() { + let item_id = catalog.resolve_item_id(&gid); + validate_read_dependencies(catalog, &item_id)?; + } + + let cluster = catalog.resolve_target_cluster(target_cluster, session)?; + let cluster_id = cluster.id; + + if cluster.replicas().next().is_none() { + return Err(AdapterError::NoClusterReplicasAvailable { + name: cluster.name.clone(), + is_managed: cluster.is_managed(), + }); + } + + let replica_id = session + .vars() + .cluster_replica() + .map(|name| { + cluster + .replica_id(name) + .ok_or(AdapterError::UnknownClusterReplica { + cluster_name: cluster.name.clone(), + replica_name: name.to_string(), + }) + }) + .transpose()?; + + // Determine timeline + let depends_on = plan.selection.depends_on(); + let timeline = catalog.validate_timeline_context(depends_on.iter().copied())?; + + // Get the table descriptor for constraint validation + let table_desc = catalog + .get_entry(&plan.id) + .relation_desc_latest() + .expect("table has desc") + .into_owned(); + + Ok(ValidationResult { + cluster_id, + replica_id, + timeline, + depends_on, + table_desc, + }) + } + + /// Optimize MIR for a read-then-write operation. 
+ fn optimize_mir_read_then_write( + &self, + catalog: &Arc, + plan: &plan::ReadThenWritePlan, + cluster_id: ComputeInstanceId, + ) -> Result< + ( + optimize::subscribe::Optimizer, + optimize::subscribe::GlobalMirPlan, + ), + AdapterError, + > { + let plan::ReadThenWritePlan { + id: _, + selection, + // The finishing (row ordering/limit/projection) from the plan is not used here. + // For the subscribe-based OCC path, we emit raw diffs and the mutation + // transformation (apply_mutation_to_mir) handles column projection for updates. + // The selection expression already produces rows matching the target table schema. + finishing: _, + assignments, + kind, + returning: _, + } = plan; + + // Lower HIR to MIR + let expr = selection.clone().lower(catalog.system_config(), None)?; + + // Transform for subscribe-based diffs + let expr = apply_mutation_to_mir(expr, kind, assignments); + + // Create optimizer + let compute_instance = ComputeInstanceSnapshot::new_without_collections(cluster_id); + let (_, view_id) = self.transient_id_gen.allocate_id(); + let (_, sink_id) = self.transient_id_gen.allocate_id(); + let debug_name = format!("frontend-read-then-write-subscribe-{}", sink_id); + let optimizer_config = optimize::OptimizerConfig::from(catalog.system_config()) + .override_from(&catalog.get_cluster(cluster_id).config.features()); + + let mut optimizer = optimize::subscribe::Optimizer::new( + catalog.clone(), + compute_instance, + view_id, + sink_id, + true, // with_snapshot + None, // up_to + debug_name, + optimizer_config, + self.optimizer_metrics.clone(), + ); + + // Get the relation desc for the expression + let expr_typ = expr.typ(); + let column_names: Vec = (0..expr_typ.column_types.len()) + .map(|i| format!("column{}", i)) + .collect(); + let relation_desc = RelationDesc::new(expr_typ, column_names.iter().map(|s| s.as_str())); + + // Create the subscribe from the query + let from = mz_sql::plan::SubscribeFrom::Query { + expr, + desc: relation_desc, + }; + + // 
MIR => MIR optimization (global) + let global_mir_plan = optimizer.optimize(from)?; + + Ok((optimizer, global_mir_plan)) + } + + /// Optimize LIR for a read-then-write operation. + fn optimize_lir_read_then_write( + &self, + mut optimizer: optimize::subscribe::Optimizer, + global_mir_plan: optimize::subscribe::GlobalMirPlan, + as_of: Timestamp, + ) -> Result { + let global_mir_plan = global_mir_plan.resolve(Antichain::from_elem(as_of)); + let global_lir_plan = optimizer.optimize(global_mir_plan)?; + Ok(global_lir_plan) + } + + /// Get the oracle read timestamp for the timeline. + async fn oracle_read_ts(&mut self, timeline: &TimelineContext) -> Option { + if matches!(timeline, TimelineContext::TimelineDependent(_)) { + let timeline = timeline.timeline().expect("timeline is set"); + let oracle = match self.ensure_oracle(timeline.clone()).await { + Ok(oracle) => oracle, + Err(_) => return None, + }; + Some(oracle.read_ts().await) + } else { + None + } + } + + /// Ensure the oracle timestamp has advanced past the given timestamp. + /// + /// For read-then-write operations, when no write is submitted (because no + /// rows matched), we must still linearize the read to prevent subsequent + /// reads from seeing an earlier state. When a write WAS submitted, the + /// group commit bumps the oracle past `as_of`, so this returns immediately. + /// + /// This is analogous to the coordinator's `strict_serializable_reads_tx` + /// mechanism used by the regular SELECT/PEEK path, which waits for the + /// oracle to advance past the chosen read timestamp before returning + /// results. 
+ async fn ensure_read_linearized( + &mut self, + timeline: &TimelineContext, + as_of: Timestamp, + ) -> Result<(), AdapterError> { + let tl = match timeline.timeline() { + Some(tl) => tl, + None => return Ok(()), + }; + + let oracle = self.ensure_oracle(tl.clone()).await?; + + loop { + let oracle_ts = oracle.read_ts().await; + if as_of <= oracle_ts { + return Ok(()); + } + + // Sleep for roughly the difference between as_of and the current + // oracle timestamp. Since timestamps are epoch milliseconds, the + // difference is the approximate wall-clock time we need to wait. + // Cap at 1s to avoid very long sleeps if clocks are skewed, + // matching the cap in `message_linearize_reads`. + let wait_ms = u64::from(as_of.saturating_sub(oracle_ts)); + let wait = Duration::from_millis(wait_ms).min(Duration::from_secs(1)); + tokio::time::sleep(wait).await; + } + } + + /// Creates an internal subscribe that does not appear in introspection + /// tables. Returns a [`SubscribeHandle`] that ensures cleanup on drop. + async fn create_internal_subscribe( + &self, + df_desc: Box, + cluster_id: ComputeInstanceId, + replica_id: Option, + depends_on: BTreeSet, + as_of: Timestamp, + arity: usize, + sink_id: GlobalId, + conn_id: mz_adapter_types::connection::ConnectionId, + session_uuid: Uuid, + start_time: mz_ore::now::EpochMillis, + read_holds: crate::ReadHolds, + ) -> Result { + let rx: mpsc::UnboundedReceiver = self + .call_coordinator(|tx| Command::CreateInternalSubscribe { + df_desc, + cluster_id, + replica_id, + depends_on, + as_of, + arity, + sink_id, + conn_id, + session_uuid, + start_time, + read_holds, + tx, + }) + .await?; + + Ok(SubscribeHandle { + rx, + sink_id, + client: Some(self.coordinator_client().clone()), + }) + } + + /// Run the OCC loop that receives subscribe rows and tries to write them. + /// + /// NOTE on correctness: semantically, a read-then-write is a SELECT + /// followed by an INSERT. 
And normally we have to linearize reads, that is + /// we make sure that the oracle timestamp is at least the timestamp that is + /// chosen for a PEEK, to ensure that results can't "go backwards". With our + /// subscribe loop we might get data that is timestamped beyond the current + /// oracle timestamp BUT then actually applying the write will bump the + /// oracle read timestamp to the timestamp of the write so at that time it + /// will be true that `chosen_ts <= oracle_ts`. Returns `(retry_count, + /// write_submitted, result)` so the caller can record OCC retry metrics + /// regardless of whether the operation succeeded or failed, and skip + /// read linearization when a write was submitted. + async fn run_occ_loop( + &self, + mut subscribe_handle: SubscribeHandle, + target_id: CatalogItemId, + kind: MutationKind, + returning: Vec, + max_result_size: u64, + max_occ_retries: usize, + table_desc: RelationDesc, + statement_timeout: Duration, + ) -> (usize, bool, Result) { + // Timeout of 0 is equivalent to "off", meaning we will wait "forever." + let effective_timeout = if statement_timeout == Duration::ZERO { + Duration::MAX + } else { + statement_timeout + }; + let start_time = Instant::now(); + + /// State for the OCC loop + struct OccState { + all_diffs: Vec<(Row, Timestamp, Diff)>, + current_upper: Option, + initial_progress_seen: bool, + retry_count: usize, + byte_size: u64, + write_submitted: bool, + } + + impl OccState { + fn new() -> Self { + Self { + all_diffs: Vec::new(), + current_upper: None, + initial_progress_seen: false, + retry_count: 0, + byte_size: 0, + write_submitted: false, + } + } + + /// Forward all timestamps to `current_upper` and consolidate diffs. + /// + /// After consolidation, `all_diffs` represents the net state of the + /// query as of the latest progress timestamp. Rows that were + /// retracted by newer updates cancel out, and `byte_size` is + /// recomputed to reflect the consolidated data. 
+ fn consolidate(&mut self) { + let target_ts = self.current_upper.unwrap_or(Timestamp::MIN); + for (_, ts, _) in self.all_diffs.iter_mut() { + *ts = target_ts; + } + consolidation::consolidate_updates(&mut self.all_diffs); + self.byte_size = self + .all_diffs + .iter() + .map(|(row, _, _)| u64::cast_from(row.byte_len())) + .sum(); + } + } + + /// Result of processing a subscribe message + enum ProcessResult { + Continue { ready_to_write: bool }, + NoRowsMatched, + Error(AdapterError), + } + + fn process_message( + response: PeekResponseUnary, + state: &mut OccState, + max_result_size: u64, + table_desc: &RelationDesc, + ) -> ProcessResult { + match response { + PeekResponseUnary::Rows(mut rows) => { + let mut saw_progress = false; + + while let Some(row) = rows.next() { + let mut datums = row.iter(); + + // Extract mz_timestamp (SubscribeOutput::Diffs format: + // mz_timestamp, mz_progressed, mz_diff, ...data columns...) + let ts_datum = datums.next().expect("missing mz_timestamp"); + let ts = match ts_datum { + mz_repr::Datum::Numeric(n) => match n.0.try_into() { + Ok(ts_u64) => Timestamp::new(ts_u64), + Err(_) => { + return ProcessResult::Error(AdapterError::Internal(format!( + "mz_timestamp in subscribe output is not a valid u64: {n}" + ))); + } + }, + other => { + return ProcessResult::Error(AdapterError::Internal(format!( + "unexpected mz_timestamp datum: {other:?}" + ))); + } + }; + + // Extract mz_progressed + let progressed_datum = datums.next().expect("missing mz_progressed"); + let is_progress = matches!(progressed_datum, mz_repr::Datum::True); + + if is_progress { + state.current_upper = Some(ts); + saw_progress = true; + + // Consolidate incrementally on each progress + // message. This keeps memory bounded by the + // consolidated size and makes the byte_size check + // below accurate (except for rows received between + // two progress messages, which is a small window). 
+ state.consolidate(); + + if !state.initial_progress_seen { + state.initial_progress_seen = true; + } else if state.all_diffs.is_empty() { + return ProcessResult::NoRowsMatched; + } + } else { + // Data row - extract mz_diff + let diff_datum = datums.next().expect("missing mz_diff"); + let diff = match diff_datum { + mz_repr::Datum::Int64(d) => Diff::from(d), + _ => continue, + }; + + let data_row = Row::pack(datums); + + // Validate constraints for rows being added (positive diff) + if diff.is_positive() { + for (idx, datum) in data_row.iter().enumerate() { + if let Err(e) = table_desc.constraints_met(idx, &datum) { + return ProcessResult::Error(e.into()); + } + } + } + + state.byte_size = state + .byte_size + .saturating_add(u64::cast_from(data_row.byte_len())); + if state.byte_size > max_result_size { + return ProcessResult::Error(AdapterError::ResultSize(format!( + "result exceeds max size of {}", + max_result_size + ))); + } + state.all_diffs.push((data_row, ts, diff)); + } + } + + let ready_to_write = + saw_progress && state.initial_progress_seen && !state.all_diffs.is_empty(); + ProcessResult::Continue { ready_to_write } + } + PeekResponseUnary::Error(e) => ProcessResult::Error(AdapterError::Internal(e)), + PeekResponseUnary::Canceled => ProcessResult::Error(AdapterError::Canceled), + } + } + + let build_no_rows_response = |kind: &MutationKind, returning: &[MirScalarExpr]| { + if !returning.is_empty() { + let rows: Vec = vec![]; + return Ok(ExecuteResponse::SendingRowsImmediate { + rows: Box::new(rows.into_row_iter()), + }); + } + Ok(match kind { + MutationKind::Delete => ExecuteResponse::Deleted(0), + MutationKind::Update => ExecuteResponse::Updated(0), + MutationKind::Insert => ExecuteResponse::Inserted(0), + }) + }; + + let mut state = OccState::new(); + + // Correctness invariant for retries: + // + // `all_diffs` accumulates *all* rows ever received from the subscribe, + // across retries. 
The subscribe emits a snapshot (at the as_of + // timestamp) followed by incremental updates. We consolidate on every + // progress message (flattening timestamps to MIN first), so after + // consolidation `all_diffs` always represents "what the query returns + // as of the latest progress timestamp" — old snapshot rows that were + // retracted by newer updates cancel out, and new rows appear. This is + // exactly the set of diffs we want to write. + // + // Consolidating on every progress also means the NoRowsMatched check + // works correctly across retries: if the consolidated result becomes + // logically empty (all diffs cancel out), `all_diffs` will be empty + // and we early-return without attempting a write. + let result = loop { + // Check for timeout + let remaining = effective_timeout.saturating_sub(start_time.elapsed()); + if remaining.is_zero() { + // Guard handles cleanup on drop. + break Err(AdapterError::StatementTimeout); + } + + let msg = match tokio::time::timeout(remaining, subscribe_handle.recv()).await { + Ok(Some(msg)) => msg, + Ok(None) => { + // Channel closed + if state.initial_progress_seen && state.all_diffs.is_empty() { + break build_no_rows_response(&kind, &returning); + } + break Err(AdapterError::Internal("subscribe channel closed".into())); + } + Err(_) => { + // Timed out + break Err(AdapterError::StatementTimeout); + } + }; + + match process_message(msg, &mut state, max_result_size, &table_desc) { + ProcessResult::Continue { ready_to_write } => { + if !ready_to_write { + continue; + } + + // Drain pending messages before attempting write + let drain_err = loop { + match subscribe_handle.try_recv() { + Ok(msg) => { + match process_message(msg, &mut state, max_result_size, &table_desc) + { + ProcessResult::Continue { .. 
} => {} + ProcessResult::NoRowsMatched => { + break Some(build_no_rows_response(&kind, &returning)); + } + ProcessResult::Error(e) => { + break Some(Err(e)); + } + } + } + Err(mpsc::error::TryRecvError::Empty) => break None, + Err(mpsc::error::TryRecvError::Disconnected) => { + break Some(Err(AdapterError::Internal( + "subscribe channel closed".into(), + ))); + } + } + }; + if let Some(result) = drain_err { + break result; + } + + let write_ts = state + .current_upper + .expect("must have seen progress to be ready to write"); + + // Consolidate any rows received during the drain + // (the bulk was already consolidated on the last progress). + state.consolidate(); + + // Submit write + let result = self + .call_coordinator(|tx| Command::AttemptTimestampedWrite { + target_id, + diffs: state + .all_diffs + .iter() + .map(|(row, _ts, diff)| (row.clone(), *diff)) + .collect_vec(), + write_ts, + tx, + }) + .await; + + match result { + TimestampedWriteResult::Success { .. } => { + // N.B. subscribe_handle is dropped here, which + // fires off the cleanup message. + state.write_submitted = true; + break self.build_success_response(kind, returning, &state.all_diffs); + } + TimestampedWriteResult::TimestampPassed { + current_write_ts, .. + } => { + // Do not advance `state.current_upper` (and + // therefore `write_ts`) from `current_write_ts`. + // The diffs in `all_diffs` are only known to be + // correct as of subscribe progress we have actually + // observed. Retrying at a newer oracle timestamp + // before subscribe progress catches up would risk + // applying stale diffs at the wrong timestamp. So + // on `TimestampPassed` we wait for the subscribe to + // progress and retry using that observed frontier. 
+ state.retry_count += 1; + if state.retry_count >= max_occ_retries { + break Err(AdapterError::Internal( + "read-then-write exceeded maximum retry attempts (high contention?)".into(), + )); + } + tracing::debug!( + retry_count = state.retry_count, + write_ts = %write_ts, + current_write_ts = %current_write_ts, + "OCC write conflict, retrying" + ); + continue; + } + TimestampedWriteResult::Cancelled => { + break Err(AdapterError::Canceled); + } + } + } + ProcessResult::NoRowsMatched => { + break build_no_rows_response(&kind, &returning); + } + ProcessResult::Error(e) => { + break Err(e); + } + } + }; + + (state.retry_count, state.write_submitted, result) + } + + /// Build the success response after a successful write. + fn build_success_response( + &self, + kind: MutationKind, + returning: Vec, + all_diffs: &[(Row, Timestamp, Diff)], + ) -> Result { + if returning.is_empty() { + // No RETURNING clause: return row count. + // Sum the absolute value of all diffs. For UPDATE, each changed row + // produces a retraction (-1) and an insertion (+1), so we divide by 2. 
+ let row_count = all_diffs + .iter() + .map(|(_, _, diff)| diff.into_inner().unsigned_abs()) + .sum::(); + let row_count = + usize::try_from(row_count).expect("positive row count must fit in usize"); + + return Ok(match kind { + MutationKind::Delete => ExecuteResponse::Deleted(row_count), + MutationKind::Update => ExecuteResponse::Updated(row_count / 2), + MutationKind::Insert => ExecuteResponse::Inserted(row_count), + }); + } + + // RETURNING: evaluate expressions against rows + let mut returning_rows = Vec::new(); + let arena = RowArena::new(); + + for (row, _ts, diff) in all_diffs { + // For DELETE: evaluate against rows with negative diff + // For UPDATE: evaluate against rows with positive diff (new rows) + // For INSERT: evaluate against rows with positive diff + let include = match kind { + MutationKind::Delete => diff.is_negative(), + MutationKind::Update | MutationKind::Insert => diff.is_positive(), + }; + + if !include { + continue; + } + + let mut returning_row = Row::with_capacity(returning.len()); + let mut packer = returning_row.packer(); + let datums: Vec<_> = row.iter().collect(); + + for expr in &returning { + match expr.eval(&datums, &arena) { + Ok(datum) => packer.push(datum), + Err(err) => return Err(err.into()), + } + } + + let multiplicity = NonZeroUsize::try_from( + NonZeroI64::try_from(diff.into_inner().abs()).expect("diff is non-zero"), + ) + .map_err(AdapterError::from)?; + + returning_rows.push((returning_row, multiplicity)); + } + + let rows: Vec = returning_rows + .into_iter() + .flat_map(|(row, count)| std::iter::repeat(row).take(count.get())) + .collect(); + Ok(ExecuteResponse::SendingRowsImmediate { + rows: Box::new(rows.into_row_iter()), + }) + } +} + +/// Result of validating a read-then-write operation. +struct ValidationResult { + cluster_id: ComputeInstanceId, + replica_id: Option, + timeline: TimelineContext, + depends_on: BTreeSet, + /// The table descriptor, used for constraint validation. 
+ table_desc: RelationDesc, +} + +/// Transform a MIR expression to produce the appropriate diffs for a mutation. +/// +/// - DELETE: Negates the expression to produce `(row, -1)` diffs +/// - UPDATE: Unions negated old rows with mapped new rows to produce both +/// `(old_row, -1)` and `(new_row, +1)` diffs +fn apply_mutation_to_mir( + expr: MirRelationExpr, + kind: &MutationKind, + assignments: &BTreeMap, +) -> MirRelationExpr { + match kind { + MutationKind::Delete => MirRelationExpr::Negate { + input: Box::new(expr), + }, + MutationKind::Update => { + let arity = expr.arity(); + + // Find a fresh LocalId that won't conflict with any in the expression. + // We scan all Let/LetRec bindings (the only MIR nodes that introduce LocalIds) + // and pick max + 1. + let mut max_id = 0_u64; + expr.visit_pre(|e| match e { + MirRelationExpr::Let { id, .. } => { + max_id = std::cmp::max(max_id, id.into()); + } + MirRelationExpr::LetRec { ids, .. } => { + for id in ids { + max_id = std::cmp::max(max_id, id.into()); + } + } + _ => {} + }); + let binding_id = LocalId::new(max_id + 1); + + let get_binding = MirRelationExpr::Get { + id: Id::Local(binding_id), + typ: expr.typ(), + access_strategy: mz_expr::AccessStrategy::UnknownOrLocal, + }; + + // Build map expressions + let map_scalars: Vec = (0..arity) + .map(|i| { + assignments + .get(&i) + .cloned() + .unwrap_or_else(|| MirScalarExpr::column(i)) + }) + .collect(); + + let new_rows = get_binding + .clone() + .map(map_scalars) + .project((arity..2 * arity).collect()); + + let old_rows = MirRelationExpr::Negate { + input: Box::new(get_binding), + }; + + let body = new_rows.union(old_rows); + + MirRelationExpr::Let { + id: binding_id, + value: Box::new(expr), + body: Box::new(body), + } + } + MutationKind::Insert => { + // INSERT: rows pass through unchanged with positive diffs. The + // subscribe will naturally emit each row with diff +1. 
+ // + // Note: Unlike the old PEEK-based path, if the write fails and + // retries, the subscribe reflects the current state of the source + // expression. For INSERT...SELECT where the SELECT references the + // target table, a retry will see the effects of concurrent writes. + // This is correct OCC behavior (the retry operates on the latest + // state) but differs from the old snapshot-once semantics. + expr + } + } +} diff --git a/src/adapter/src/lib.rs b/src/adapter/src/lib.rs index f28cdf717e215..8d05f1972aaa3 100644 --- a/src/adapter/src/lib.rs +++ b/src/adapter/src/lib.rs @@ -44,6 +44,7 @@ mod coord; mod error; mod explain; mod frontend_peek; +mod frontend_read_then_write; mod notice; mod optimize; mod util; diff --git a/src/adapter/src/metrics.rs b/src/adapter/src/metrics.rs index 2f258410e9382..2290aca028e92 100644 --- a/src/adapter/src/metrics.rs +++ b/src/adapter/src/metrics.rs @@ -54,6 +54,7 @@ pub struct Metrics { pub apply_catalog_implications_seconds: Histogram, pub group_commit_confirm_leadership_seconds: Histogram, pub group_commit_table_advancement_seconds: Histogram, + pub occ_retry_count: Histogram, } impl Metrics { @@ -246,6 +247,11 @@ impl Metrics { name: "mz_group_commit_table_advancement_seconds", help: "The time it takes to iterate over all catalog entries to find tables during group commit.", buckets: histogram_seconds_buckets(0.001, 32.0), + )), + occ_retry_count: registry.register(metric!( + name: "mz_occ_read_then_write_retry_count", + help: "Number of OCC retries per read-then-write operation.", + buckets: vec![0., 1., 2., 3., 5., 10., 25., 50., 100., 500., 1000.], )) } } diff --git a/src/adapter/src/optimize.rs b/src/adapter/src/optimize.rs index 13315f45e2eed..c870bf5200bf1 100644 --- a/src/adapter/src/optimize.rs +++ b/src/adapter/src/optimize.rs @@ -86,8 +86,8 @@ use mz_transform::{MaybeShouldPanic, TransformCtx, TransformError}; /// by the optimizer implementations. 
type MirDataflowDescription = DataflowDescription; /// A type for a [`DataflowDescription`] backed by `Lir~` plans. Used internally -/// by the optimizer implementations. -type LirDataflowDescription = DataflowDescription; +/// by the optimizer implementations and for read-then-write subscribes. +pub type LirDataflowDescription = DataflowDescription; // Core API // -------- diff --git a/src/adapter/src/peek_client.rs b/src/adapter/src/peek_client.rs index 165a125ecb29a..84f26754b96b2 100644 --- a/src/adapter/src/peek_client.rs +++ b/src/adapter/src/peek_client.rs @@ -29,7 +29,7 @@ use mz_timestamp_oracle::TimestampOracle; use prometheus::Histogram; use thiserror::Error; use timely::progress::Antichain; -use tokio::sync::oneshot; +use tokio::sync::{Semaphore, oneshot}; use uuid::Uuid; use crate::catalog::Catalog; @@ -69,6 +69,12 @@ pub struct PeekClient { persist_client: PersistClient, /// Statement logging state for frontend peek sequencing. pub statement_logging_frontend: StatementLoggingFrontend, + /// Semaphore for limiting concurrent OCC (optimistic concurrency control) write operations. + pub occ_write_semaphore: Arc, + /// Whether frontend OCC read-then-write is enabled (determined once at process startup). + pub frontend_read_then_write_enabled: bool, + /// Whether the coordinator is in read-only mode. Mutations must be rejected. 
+ pub read_only: bool, } impl PeekClient { @@ -80,6 +86,9 @@ impl PeekClient { optimizer_metrics: OptimizerMetrics, persist_client: PersistClient, statement_logging_frontend: StatementLoggingFrontend, + occ_write_semaphore: Arc, + frontend_read_then_write_enabled: bool, + read_only: bool, ) -> Self { Self { coordinator_client, @@ -90,6 +99,9 @@ impl PeekClient { statement_logging_frontend, oracles: Default::default(), // lazily populated persist_client, + occ_write_semaphore, + frontend_read_then_write_enabled, + read_only, } } @@ -154,6 +166,12 @@ impl PeekClient { .expect("if the coordinator is still alive, it shouldn't have dropped our call") } + /// Returns a clone of the coordinator client, for use in cleanup guards + /// that need to send fire-and-forget commands. + pub(crate) fn coordinator_client(&self) -> &crate::Client { + &self.coordinator_client + } + /// Acquire read holds on the given compute/storage collections, and /// determine the smallest common valid write frontier among the specified collections. /// diff --git a/src/sql/src/session/vars.rs b/src/sql/src/session/vars.rs index 19fb87d574db6..dc4f758fd1ef9 100644 --- a/src/sql/src/session/vars.rs +++ b/src/sql/src/session/vars.rs @@ -1104,6 +1104,8 @@ impl SystemVars { &MAX_RESULT_SIZE, &MAX_COPY_FROM_ROW_SIZE, &ALLOWED_CLUSTER_REPLICA_SIZES, + &MAX_CONCURRENT_OCC_WRITES, + &MAX_OCC_RETRIES, &upsert_rocksdb::UPSERT_ROCKSDB_COMPACTION_STYLE, &upsert_rocksdb::UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET, &upsert_rocksdb::UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES, @@ -1658,6 +1660,16 @@ impl SystemVars { .collect() } + /// Returns the value of the `max_concurrent_occ_writes` configuration parameter. + pub fn max_concurrent_occ_writes(&self) -> u32 { + *self.expect_value(&MAX_CONCURRENT_OCC_WRITES) + } + + /// Returns the value of the `max_occ_retries` configuration parameter. 
+ pub fn max_occ_retries(&self) -> u32 { + *self.expect_value(&MAX_OCC_RETRIES) + } + /// Returns the value of the `default_cluster_replication_factor` configuration parameter. pub fn default_cluster_replication_factor(&self) -> u32 { *self.expect_value::(&DEFAULT_CLUSTER_REPLICATION_FACTOR) diff --git a/src/sql/src/session/vars/definitions.rs b/src/sql/src/session/vars/definitions.rs index 7e8ce95c5fac7..3cc15b530e68d 100644 --- a/src/sql/src/session/vars/definitions.rs +++ b/src/sql/src/session/vars/definitions.rs @@ -633,6 +633,20 @@ pub static ALLOWED_CLUSTER_REPLICA_SIZES: VarDefinition = VarDefinition::new( true, ); +pub static MAX_CONCURRENT_OCC_WRITES: VarDefinition = VarDefinition::new( + "max_concurrent_occ_writes", + value!(u32; 4), + "Maximum number of concurrent read-then-write (DELETE/UPDATE) operations using OCC. Read at startup; changes require an environmentd restart (Materialize).", + false, +); + +pub static MAX_OCC_RETRIES: VarDefinition = VarDefinition::new( + "max_occ_retries", + value!(u32; 1000), + "Maximum number of OCC retry attempts per read-then-write operation before giving up (Materialize).", + false, +); + pub static PERSIST_FAST_PATH_LIMIT: VarDefinition = VarDefinition::new( "persist_fast_path_limit", value!(usize; 25), From cd45691e2c922196313ecf8044a2c1725c1b7b5c Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Mon, 23 Feb 2026 16:15:58 +0100 Subject: [PATCH 02/10] fix frontend cancellation bug --- src/adapter/src/client.rs | 40 ++++++++++++++++++++++-- src/environmentd/tests/server.rs | 53 ++++++++++++++++++++++++++++++++ 2 files changed, 90 insertions(+), 3 deletions(-) diff --git a/src/adapter/src/client.rs b/src/adapter/src/client.rs index 9de2580d76901..a8b2e1624ea8c 100644 --- a/src/adapter/src/client.rs +++ b/src/adapter/src/client.rs @@ -18,7 +18,7 @@ use std::time::{Duration, Instant}; use anyhow::bail; use chrono::{DateTime, Utc}; use derivative::Derivative; -use futures::{Stream, StreamExt}; +use 
futures::{FutureExt, Stream, StreamExt}; use itertools::Itertools; use mz_adapter_types::connection::{ConnectionId, ConnectionIdType}; use mz_auth::Authenticated; @@ -723,6 +723,7 @@ impl SessionClient { outer_ctx_extra: Option, ) -> Result<(ExecuteResponse, Instant), AdapterError> { let execute_started = Instant::now(); + let cancel_future = cancel_future.map(|_| ()).shared(); // Attempt peek sequencing in the session task. // If unsupported, fall back to the Coordinator path. @@ -742,7 +743,11 @@ impl SessionClient { // Attempt read-then-write sequencing in the session task. if let Some(resp) = self - .try_frontend_read_then_write(&portal_name, &mut outer_ctx_extra) + .try_frontend_read_then_write_with_cancel( + &portal_name, + &mut outer_ctx_extra, + cancel_future.clone(), + ) .await? { debug!("frontend read-then-write succeeded"); @@ -761,7 +766,9 @@ impl SessionClient { tx, outer_ctx_extra, }, - cancel_future, + cancel_future + .clone() + .map(|_| std::io::Error::new(std::io::ErrorKind::Other, "connection closed")), ) .await?; Ok((response, execute_started)) @@ -1175,6 +1182,33 @@ impl SessionClient { } } + async fn try_frontend_read_then_write_with_cancel( + &mut self, + portal_name: &str, + outer_ctx_extra: &mut Option, + cancel_future: impl Future + Send, + ) -> Result, AdapterError> { + let conn_id = self.session().conn_id().clone(); + let inner_client = self.inner().clone(); + let mut cancel_future = pin::pin!(cancel_future); + let mut frontend_read_then_write = + pin::pin!(self.try_frontend_read_then_write(portal_name, outer_ctx_extra)); + + let mut cancelled = false; + + loop { + tokio::select! { + response = &mut frontend_read_then_write => return response, + _ = &mut cancel_future, if !cancelled => { + cancelled = true; + inner_client.send(Command::PrivilegedCancelRequest { + conn_id: conn_id.clone(), + }); + } + } + } + } + /// Attempt to sequence a read-then-write (DELETE/UPDATE/INSERT INTO .. /// SELECT .. FROM) from the session task. 
/// diff --git a/src/environmentd/tests/server.rs b/src/environmentd/tests/server.rs index bf6e92e085e7a..8f7fb00e5d450 100644 --- a/src/environmentd/tests/server.rs +++ b/src/environmentd/tests/server.rs @@ -1206,6 +1206,59 @@ fn test_cancel_long_running_query() { .expect("simple query succeeds after cancellation"); } +// Test that frontend-sequenced read-then-write statements honor pgwire cancel +// requests and do not run to completion after cancellation. +#[mz_ore::test] +fn test_cancel_frontend_read_then_write_long_running_query() { + let server = test_util::TestHarness::default() + .unsafe_mode() + .with_system_parameter_default( + "enable_adapter_frontend_occ_read_then_write".to_string(), + "true".to_string(), + ) + .start_blocking(); + server.enable_feature_flags(&["unsafe_enable_unsafe_functions"]); + + let mut client = server.connect(postgres::NoTls).unwrap(); + let cancel_token = client.cancel_token(); + + client + .batch_execute("CREATE TABLE t (a TEXT, ts INT)") + .unwrap(); + client + .batch_execute("INSERT INTO t VALUES ('hello', 10)") + .unwrap(); + + let (shutdown_tx, shutdown_rx) = std::sync::mpsc::channel(); + let cancel_thread = thread::spawn(move || loop { + thread::sleep(Duration::from_millis(200)); + match shutdown_rx.try_recv() { + Ok(()) => return, + Err(std::sync::mpsc::TryRecvError::Empty) => { + let _ = cancel_token.cancel_query(postgres::NoTls); + } + Err(std::sync::mpsc::TryRecvError::Disconnected) => return, + } + }); + + match client.batch_execute( + "INSERT INTO t SELECT a, CASE WHEN mz_unsafe.mz_sleep(ts) > 0 THEN 0 END AS ts FROM t", + ) { + Err(e) if e.code() == Some(&SqlState::QUERY_CANCELED) => {} + Err(e) => panic!("expected error SqlState::QUERY_CANCELED, but got {e:?}"), + Ok(_) => panic!("expected error SqlState::QUERY_CANCELED, but query succeeded"), + } + + shutdown_tx.send(()).unwrap(); + cancel_thread.join().unwrap(); + + let rows = client + .query_one("SELECT count(*) FROM t", &[]) + .unwrap() + .get::<_, i64>(0); + 
assert_eq!(rows, 1, "cancelled statement should not have committed writes"); +} + fn test_cancellation_cancels_dataflows(query: &str) { // Query that returns how many dataflows are currently installed. // Accounts for the presence of introspection subscribe dataflows by ignoring those. From 600450f1e240ae24dc2bc6234b56abea31ee4d09 Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Mon, 23 Feb 2026 21:28:19 +0100 Subject: [PATCH 03/10] fix pg_cancel_backend --- src/adapter/src/client.rs | 59 ++++++++++++++++++++++++ src/adapter/src/command.rs | 21 ++++++++- src/adapter/src/coord.rs | 18 ++++++-- src/adapter/src/coord/command_handler.rs | 22 ++++++--- src/adapter/src/coord/sequencer/inner.rs | 22 ++++++--- src/adapter/src/coord/sql.rs | 2 +- src/environmentd/tests/server.rs | 59 ++++++++++++++++++++---- 7 files changed, 176 insertions(+), 27 deletions(-) diff --git a/src/adapter/src/client.rs b/src/adapter/src/client.rs index a8b2e1624ea8c..c01a01c31ba31 100644 --- a/src/adapter/src/client.rs +++ b/src/adapter/src/client.rs @@ -574,6 +574,23 @@ pub struct SessionClient { pub enable_frontend_peek_sequencing: bool, } +/// Keeps a connection cancel watch installed in the coordinator for the +/// duration of a frontend read-then-write attempt. +struct FrontendConnectionCancelWatchGuard { + conn_id: ConnectionId, + client: Option, +} + +impl Drop for FrontendConnectionCancelWatchGuard { + fn drop(&mut self) { + if let Some(client) = self.client.take() { + client.send(Command::UnregisterConnectionCancelWatch { + conn_id: self.conn_id.clone(), + }); + } + } +} + impl SessionClient { /// Parses a SQL expression, reporting failures as a telemetry event if /// possible. @@ -1075,6 +1092,8 @@ impl SessionClient { | Command::UnregisterFrontendPeek { .. } | Command::ExplainTimestamp { .. } | Command::FrontendStatementLogging(..) + | Command::RegisterConnectionCancelWatch { .. } + | Command::UnregisterConnectionCancelWatch { .. } | Command::CreateInternalSubscribe { .. 
} | Command::AttemptTimestampedWrite { .. } | Command::DropInternalSubscribe { .. } => {} @@ -1182,6 +1201,8 @@ impl SessionClient { } } + /// Runs frontend read-then-write while reacting to both local/session + /// cancellation and coordinator-issued connection cancellation. async fn try_frontend_read_then_write_with_cancel( &mut self, portal_name: &str, @@ -1189,8 +1210,43 @@ impl SessionClient { cancel_future: impl Future + Send, ) -> Result, AdapterError> { let conn_id = self.session().conn_id().clone(); + let inner_client = self.inner().clone(); let mut cancel_future = pin::pin!(cancel_future); + + // Cancellation can arrive via two independent paths: + // + // 1) `cancel_future`: local/session-side cancellation (e.g. client + // connection closes). For this path we must still forward a privileged + // cancel to the coordinator so in-flight work owned there is canceled. + // + // 2) `connection_cancel`: coordinator-side cancellation (e.g. + // `pg_cancel_backend`) reflected through the connection cancel watch. + // This can trigger while frontend RTW is still planning or optimizing, + // before coordinator-owned subscribe/write steps are installed. + // + // We select on both so frontend RTW exits promptly regardless of where + // cancellation originated. 
+ + let mut connection_cancel_rx = self + .peek_client + .call_coordinator(|tx| Command::RegisterConnectionCancelWatch { + conn_id: conn_id.clone(), + tx, + }) + .await; + let _connection_cancel_guard = FrontendConnectionCancelWatchGuard { + conn_id: conn_id.clone(), + client: Some(inner_client.clone()), + }; + if *connection_cancel_rx.borrow() { + return Err(AdapterError::Canceled); + } + let connection_cancel = async move { + let _ = connection_cancel_rx.wait_for(|v| *v).await; + }; + tokio::pin!(connection_cancel); + let mut frontend_read_then_write = pin::pin!(self.try_frontend_read_then_write(portal_name, outer_ctx_extra)); @@ -1205,6 +1261,9 @@ impl SessionClient { conn_id: conn_id.clone(), }); } + _ = &mut connection_cancel => { + return Err(AdapterError::Canceled); + } } } } diff --git a/src/adapter/src/command.rs b/src/adapter/src/command.rs index b9f2c48de1d67..2f7ffdfa55ccb 100644 --- a/src/adapter/src/command.rs +++ b/src/adapter/src/command.rs @@ -41,7 +41,7 @@ use mz_sql::session::vars::{OwnedVarInput, SystemVars}; use mz_sql_parser::ast::{AlterObjectRenameStatement, AlterOwnerStatement, DropObjectsStatement}; use mz_storage_types::sources::Timeline; use mz_timestamp_oracle::TimestampOracle; -use tokio::sync::{Semaphore, mpsc, oneshot}; +use tokio::sync::{Semaphore, mpsc, oneshot, watch}; use uuid::Uuid; use crate::catalog::Catalog; @@ -326,6 +326,21 @@ pub enum Command { /// No response channel needed - this is fire-and-forget. FrontendStatementLogging(FrontendStatementLoggingEvent), + /// Registers a connection-scoped cancellation watch and returns a receiver + /// that becomes `true` when cancellation is requested for the connection. + /// + /// This is shared by coordinator staged sequencing and frontend + /// read-then-write execution. + RegisterConnectionCancelWatch { + conn_id: ConnectionId, + tx: oneshot::Sender>, + }, + + /// Unregisters a previously registered connection-scoped cancellation watch. 
+ UnregisterConnectionCancelWatch { + conn_id: ConnectionId, + }, + /// Creates an internal subscribe (not visible in introspection) and returns /// the response channel. Initially used for frontend-sequenced /// read-then-write (DELETE/UPDATE/INSERT...SELECT) operations via OCC. @@ -395,6 +410,8 @@ impl Command { | Command::UnregisterFrontendPeek { .. } | Command::ExplainTimestamp { .. } | Command::FrontendStatementLogging(..) + | Command::RegisterConnectionCancelWatch { .. } + | Command::UnregisterConnectionCancelWatch { .. } | Command::CreateInternalSubscribe { .. } | Command::AttemptTimestampedWrite { .. } | Command::DropInternalSubscribe { .. } => None, @@ -433,6 +450,8 @@ impl Command { | Command::UnregisterFrontendPeek { .. } | Command::ExplainTimestamp { .. } | Command::FrontendStatementLogging(..) + | Command::RegisterConnectionCancelWatch { .. } + | Command::UnregisterConnectionCancelWatch { .. } | Command::CreateInternalSubscribe { .. } | Command::AttemptTimestampedWrite { .. } | Command::DropInternalSubscribe { .. } => None, diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index 38caf8df0865d..a1819c32fef52 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -380,6 +380,10 @@ impl Message { Command::ExplainTimestamp { .. } => "explain-timestamp", Command::FrontendStatementLogging(..) => "frontend-statement-logging", Command::StartCopyFromStdin { .. } => "start-copy-from-stdin", + Command::RegisterConnectionCancelWatch { .. } => "register-connection-cancel-watch", + Command::UnregisterConnectionCancelWatch { .. } => { + "unregister-connection-cancel-watch" + } Command::CreateInternalSubscribe { .. } => "create-internal-subscribe", Command::AttemptTimestampedWrite { .. } => "attempt-timestamped-write", Command::DropInternalSubscribe { .. } => "drop-internal-subscribe", @@ -1801,9 +1805,15 @@ pub struct Coordinator { /// to stage Batches in Persist that we will then link into the shard. 
active_copies: BTreeMap, - /// A map from connection ids to a watch channel that is set to `true` if the connection - /// received a cancel request. - staged_cancellation: BTreeMap, watch::Receiver)>, + /// Connection-scoped cancellation watches. + /// + /// Each entry is a watch channel whose value is `false` until cancellation + /// is requested for that connection, at which point it is set to `true`. + /// + /// Consumers install/remove these watches while they have cancellable work + /// in flight. This is used both by coordinator staged sequencing and by + /// frontend read-then-write sequencing. + connection_cancel_watches: BTreeMap, watch::Receiver)>, /// Active introspection subscribes. introspection_subscribes: BTreeMap, @@ -4526,7 +4536,7 @@ pub fn serve( active_compute_sinks: BTreeMap::new(), active_webhooks: BTreeMap::new(), active_copies: BTreeMap::new(), - staged_cancellation: BTreeMap::new(), + connection_cancel_watches: BTreeMap::new(), introspection_subscribes: BTreeMap::new(), write_locks: BTreeMap::new(), deferred_write_ops: BTreeMap::new(), diff --git a/src/adapter/src/coord/command_handler.rs b/src/adapter/src/coord/command_handler.rs index 7b47ebcebc0d5..c6dd06945b39d 100644 --- a/src/adapter/src/coord/command_handler.rs +++ b/src/adapter/src/coord/command_handler.rs @@ -57,7 +57,7 @@ use mz_sql_parser::ast::{ }; use mz_storage_types::sources::Timeline; use opentelemetry::trace::TraceContextExt; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::{mpsc, oneshot, watch}; use tracing::{Instrument, debug_span, info, warn}; use tracing_opentelemetry::OpenTelemetrySpanExt; use uuid::Uuid; @@ -489,6 +489,18 @@ impl Coordinator { Command::FrontendStatementLogging(event) => { self.handle_frontend_statement_logging_event(event); } + Command::RegisterConnectionCancelWatch { conn_id, tx } => { + let rx = self + .connection_cancel_watches + .entry(conn_id) + .or_insert_with(|| watch::channel(false)) + .1 + .clone(); + let _ = tx.send(rx); + } + 
Command::UnregisterConnectionCancelWatch { conn_id } => { + self.connection_cancel_watches.remove(&conn_id); + } Command::CreateInternalSubscribe { df_desc, cluster_id, @@ -525,10 +537,8 @@ impl Coordinator { write_ts, tx, } => { - self.handle_attempt_timestamped_write( - target_id, diffs, write_ts, tx, - ) - .await; + self.handle_attempt_timestamped_write(target_id, diffs, write_ts, tx) + .await; } Command::DropInternalSubscribe { sink_id, tx } => { self.drop_internal_subscribe(sink_id).await; @@ -1800,7 +1810,7 @@ impl Coordinator { self.cancel_cluster_reconfigurations_for_conn(&conn_id) .await; self.cancel_pending_copy(&conn_id); - if let Some((tx, _rx)) = self.staged_cancellation.get_mut(&conn_id) { + if let Some((tx, _rx)) = self.connection_cancel_watches.get_mut(&conn_id) { let _ = tx.send(true); } } diff --git a/src/adapter/src/coord/sequencer/inner.rs b/src/adapter/src/coord/sequencer/inner.rs index e54032ca91ba6..23690170ba434 100644 --- a/src/adapter/src/coord/sequencer/inner.rs +++ b/src/adapter/src/coord/sequencer/inner.rs @@ -246,10 +246,16 @@ struct CreateSourceInner { } impl Coordinator { - /// Sequences the next staged of a [Staged] plan. This is designed for use with plans that - /// execute both on and off of the coordinator thread. Stages can either produce another stage - /// to execute or a final response. An explicit [Span] is passed to allow for convenient - /// tracing. + /// Sequences a [Staged] plan. + /// + /// This is designed for plans that execute both on and off the coordinator + /// thread. Stages can either produce another stage to execute or a final + /// response. + /// + /// While a stage is cancelable, this method installs a + /// connection-scoped cancel watch entry so background work can observe + /// cancellation requests consistently with other frontend/coordinator + /// sequencing paths. pub(crate) async fn sequence_staged( &mut self, mut ctx: S::Ctx, @@ -267,7 +273,7 @@ impl Coordinator { // Channel to await cancellation. 
Insert a new channel, but check if the previous one // was already canceled. if let Some((_prev_tx, prev_rx)) = self - .staged_cancellation + .connection_cancel_watches .insert(session.conn_id().clone(), watch::channel(false)) { let was_canceled = *prev_rx.borrow(); @@ -279,7 +285,7 @@ impl Coordinator { } else { // If no cancel allowed, remove it so handle_spawn doesn't observe any previous value // when cancel_enabled may have been true on an earlier stage. - self.staged_cancellation.remove(session.conn_id()); + self.connection_cancel_watches.remove(session.conn_id()); } } else { cancel_enabled = false @@ -312,6 +318,8 @@ impl Coordinator { } } + /// Waits for either the spawned stage work to complete or cancellation to + /// be signaled through the connection-scoped cancel watch. fn handle_spawn( &self, ctx: C, @@ -325,7 +333,7 @@ impl Coordinator { { let rx: BoxFuture<()> = if let Some((_tx, rx)) = ctx .session() - .and_then(|session| self.staged_cancellation.get(session.conn_id())) + .and_then(|session| self.connection_cancel_watches.get(session.conn_id())) { let mut rx = rx.clone(); Box::pin(async move { diff --git a/src/adapter/src/coord/sql.rs b/src/adapter/src/coord/sql.rs index 527f8ac5c938c..6a974701d19ef 100644 --- a/src/adapter/src/coord/sql.rs +++ b/src/adapter/src/coord/sql.rs @@ -216,7 +216,7 @@ impl Coordinator { /// Clears coordinator state for a connection. 
pub(crate) async fn clear_connection(&mut self, conn_id: &ConnectionId) { - self.staged_cancellation.remove(conn_id); + self.connection_cancel_watches.remove(conn_id); self.retire_compute_sinks_for_conn(conn_id, ActiveComputeSinkRetireReason::Finished) .await; self.retire_cluster_reconfigurations_for_conn(conn_id).await; diff --git a/src/environmentd/tests/server.rs b/src/environmentd/tests/server.rs index 8f7fb00e5d450..562d4ba8a567e 100644 --- a/src/environmentd/tests/server.rs +++ b/src/environmentd/tests/server.rs @@ -1230,14 +1230,16 @@ fn test_cancel_frontend_read_then_write_long_running_query() { .unwrap(); let (shutdown_tx, shutdown_rx) = std::sync::mpsc::channel(); - let cancel_thread = thread::spawn(move || loop { - thread::sleep(Duration::from_millis(200)); - match shutdown_rx.try_recv() { - Ok(()) => return, - Err(std::sync::mpsc::TryRecvError::Empty) => { - let _ = cancel_token.cancel_query(postgres::NoTls); + let cancel_thread = thread::spawn(move || { + loop { + thread::sleep(Duration::from_millis(200)); + match shutdown_rx.try_recv() { + Ok(()) => return, + Err(std::sync::mpsc::TryRecvError::Empty) => { + let _ = cancel_token.cancel_query(postgres::NoTls); + } + Err(std::sync::mpsc::TryRecvError::Disconnected) => return, } - Err(std::sync::mpsc::TryRecvError::Disconnected) => return, } }); @@ -1256,7 +1258,48 @@ fn test_cancel_frontend_read_then_write_long_running_query() { .query_one("SELECT count(*) FROM t", &[]) .unwrap() .get::<_, i64>(0); - assert_eq!(rows, 1, "cancelled statement should not have committed writes"); + assert_eq!( + rows, 1, + "cancelled statement should not have committed writes" + ); + + // NOTE: mz_sleep with a constant ts gets evaluated differently. This gives + us additional coverage for cancelling at different moments in the + processing pipeline. 
+ let cancel_token = client.cancel_token(); + let (shutdown_tx, shutdown_rx) = std::sync::mpsc::channel(); + let cancel_thread = thread::spawn(move || { + loop { + thread::sleep(Duration::from_millis(200)); + match shutdown_rx.try_recv() { + Ok(()) => return, + Err(std::sync::mpsc::TryRecvError::Empty) => { + let _ = cancel_token.cancel_query(postgres::NoTls); + } + Err(std::sync::mpsc::TryRecvError::Disconnected) => return, + } + } + }); + + match client.batch_execute( + "INSERT INTO t SELECT a, CASE WHEN mz_unsafe.mz_sleep(10) > 0 THEN 0 END AS ts FROM t", + ) { + Err(e) if e.code() == Some(&SqlState::QUERY_CANCELED) => {} + Err(e) => panic!("expected error SqlState::QUERY_CANCELED, but got {e:?}"), + Ok(_) => panic!("expected error SqlState::QUERY_CANCELED, but query succeeded"), + } + + shutdown_tx.send(()).unwrap(); + cancel_thread.join().unwrap(); + + let rows = client + .query_one("SELECT count(*) FROM t", &[]) + .unwrap() + .get::<_, i64>(0); + assert_eq!( + rows, 1, + "cancelled statement should not have committed writes" + ); } fn test_cancellation_cancels_dataflows(query: &str) { From 648745ce1a276074b11f9da928b5eb33509126ce Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Wed, 25 Feb 2026 18:56:00 +0100 Subject: [PATCH 04/10] fix lints --- misc/python/materialize/mzcompose/__init__.py | 5 +++++ misc/python/materialize/parallel_workload/action.py | 3 +++ src/adapter/src/coord/sql.rs | 5 ++++- src/adapter/src/frontend_read_then_write.rs | 8 +++----- 4 files changed, 15 insertions(+), 6 deletions(-) diff --git a/misc/python/materialize/mzcompose/__init__.py b/misc/python/materialize/mzcompose/__init__.py index 22c322247039d..ab63088cc27cd 100644 --- a/misc/python/materialize/mzcompose/__init__.py +++ b/misc/python/materialize/mzcompose/__init__.py @@ -185,6 +185,11 @@ def get_variable_system_parameters( "true", ["true", "false"], ), + VariableSystemParameter( + "enable_adapter_frontend_occ_read_then_write", + "true", + ["true", "false"], + ), 
VariableSystemParameter( "enable_cast_elimination", "false", diff --git a/misc/python/materialize/parallel_workload/action.py b/misc/python/materialize/parallel_workload/action.py index f9790a653af3d..730752553caba 100644 --- a/misc/python/materialize/parallel_workload/action.py +++ b/misc/python/materialize/parallel_workload/action.py @@ -1520,6 +1520,9 @@ def __init__( "false", ] self.flags_with_values["enable_cast_elimination"] = BOOLEAN_FLAG_VALUES + self.flags_with_values["enable_adapter_frontend_occ_read_then_write"] = ( + BOOLEAN_FLAG_VALUES + ) # If you are adding a new config flag in Materialize, consider using it # here instead of just marking it as uninteresting to silence the diff --git a/src/adapter/src/coord/sql.rs b/src/adapter/src/coord/sql.rs index 6a974701d19ef..0ab2720bf2fae 100644 --- a/src/adapter/src/coord/sql.rs +++ b/src/adapter/src/coord/sql.rs @@ -266,7 +266,10 @@ impl Coordinator { ActiveComputeSink::Subscribe(active_subscribe) => { // Skip builtin table update for internal subscribes if active_subscribe.internal { - Box::pin(std::future::ready(())) as BuiltinTableAppendNotify + #[allow(clippy::as_conversions)] + { + Box::pin(std::future::ready(())) as BuiltinTableAppendNotify + } } else { let update = self.catalog().state().pack_subscribe_update( id, diff --git a/src/adapter/src/frontend_read_then_write.rs b/src/adapter/src/frontend_read_then_write.rs index b929ba49a5c86..f067f224b5037 100644 --- a/src/adapter/src/frontend_read_then_write.rs +++ b/src/adapter/src/frontend_read_then_write.rs @@ -249,7 +249,7 @@ impl PeekClient { // Optimize MIR let (optimizer, global_mir_plan) = - self.optimize_mir_read_then_write(&catalog, &plan, cluster_id)?; + self.optimize_mir_read_then_write(catalog, &plan, cluster_id)?; // Determine timestamp and acquire read holds let oracle_read_ts = self.oracle_read_ts(&timeline).await; @@ -301,9 +301,7 @@ impl PeekClient { let statement_timeout = *session.vars().statement_timeout(); // Acquire OCC semaphore 
permit to limit concurrent write operations - let _permit = self - .occ_write_semaphore - .clone() + let _permit = Arc::clone(&self.occ_write_semaphore) .acquire_owned() .await .expect("semaphore closed"); @@ -465,7 +463,7 @@ impl PeekClient { .override_from(&catalog.get_cluster(cluster_id).config.features()); let mut optimizer = optimize::subscribe::Optimizer::new( - catalog.clone(), + Arc::::clone(catalog), compute_instance, view_id, sink_id, From 5eb9eae458318e45a1b58f9f394d0a75a0821c91 Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Wed, 25 Feb 2026 22:11:09 +0100 Subject: [PATCH 05/10] fix bugs and tests --- .claude-trace/log-2026-02-27-10-27-47.jsonl | 0 src/adapter/src/client.rs | 46 +++++++++++-- src/adapter/src/frontend_peek.rs | 35 +++------- src/adapter/src/frontend_read_then_write.rs | 75 +++++++-------------- src/adapter/src/peek_client.rs | 67 ++++++++++++++++-- src/environmentd/tests/server.rs | 66 ++++++------------ src/environmentd/tests/sql.rs | 8 ++- 7 files changed, 159 insertions(+), 138 deletions(-) create mode 100644 .claude-trace/log-2026-02-27-10-27-47.jsonl diff --git a/.claude-trace/log-2026-02-27-10-27-47.jsonl b/.claude-trace/log-2026-02-27-10-27-47.jsonl new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/src/adapter/src/client.rs b/src/adapter/src/client.rs index c01a01c31ba31..e279416386ced 100644 --- a/src/adapter/src/client.rs +++ b/src/adapter/src/client.rs @@ -1329,7 +1329,7 @@ impl SessionClient { let stmt_string = stmt.to_string(); // Plan the statement - let (plan, target_cluster) = { + let (plan, target_cluster, resolved_ids) = { let session = self.session.as_mut().expect("SessionClient invariant"); let conn_catalog = catalog.for_session(session); let (stmt, resolved_ids) = mz_sql::names::resolve(&conn_catalog, (*stmt).clone())?; @@ -1345,9 +1345,29 @@ impl SessionClient { ), }; - (plan, target_cluster) + (plan, target_cluster, resolved_ids) }; + // Check RBAC permissions, mirroring the coordinator's 
check in sequencer.rs. + { + let session = self.session.as_ref().expect("SessionClient invariant"); + let conn_catalog = catalog.for_session(session); + let target_cluster_id = catalog + .resolve_target_cluster(target_cluster.clone(), session) + .ok() + .map(|cluster| cluster.id); + if let Err(e) = mz_sql::rbac::check_plan( + &conn_catalog, + None:: Option>, + session, + &plan, + target_cluster_id, + &resolved_ids, + ) { + return Err(e.into()); + } + } + // Handle ReadThenWrite plans or Insert plans. let rtw_plan = match plan { Plan::ReadThenWrite(rtw_plan) => rtw_plan, @@ -1377,12 +1397,24 @@ impl SessionClient { let inner_mir = optimized_mir.into_inner(); if inner_mir.as_const().is_some() && insert_plan.returning.is_empty() { let session = self.session.as_mut().expect("SessionClient invariant"); - return Ok(Some(Coordinator::insert_constant( - &catalog, + + let statement_logging_id = self.peek_client.begin_statement_logging( session, - insert_plan.id, - inner_mir, - )?)); + ¶ms, + &logging, + &catalog, + lifecycle_timestamps, + outer_ctx_extra, + ); + + let result = + Coordinator::insert_constant(&catalog, session, insert_plan.id, inner_mir); + + if let Some(logging_id) = statement_logging_id { + self.peek_client.end_statement_logging(logging_id, &result); + } + + return Ok(Some(result?)); } // Get table descriptor arity for the finishing projection diff --git a/src/adapter/src/frontend_peek.rs b/src/adapter/src/frontend_peek.rs index 6f3e2e9f78ee5..98f5a39dfcd89 100644 --- a/src/adapter/src/frontend_peek.rs +++ b/src/adapter/src/frontend_peek.rs @@ -192,33 +192,14 @@ impl PeekClient { // Set up statement logging, and log the beginning of execution. // (But only if we're not executing in the context of another statement.) 
- let statement_logging_id = if outer_ctx_extra.is_none() { - // This is a new statement, so begin statement logging - let result = self.statement_logging_frontend.begin_statement_execution( - session, - ¶ms, - &logging, - catalog.system_config(), - lifecycle_timestamps, - ); - - if let Some((logging_id, began_execution, mseh_update, prepared_statement)) = result { - self.log_began_execution(began_execution, mseh_update, prepared_statement); - Some(logging_id) - } else { - None - } - } else { - // We're executing in the context of another statement (e.g., FETCH), - // so extract the statement logging ID from the outer context if present. - // We take ownership and retire the outer context here. The end of execution will be - // logged in one of the following ways: - // - At the end of this function, if the execution is finished by then. - // - Later by the Coordinator, either due to RegisterFrontendPeek or ExecuteSlowPathPeek. - outer_ctx_extra - .take() - .and_then(|guard| guard.defuse().retire()) - }; + let statement_logging_id = self.begin_statement_logging( + session, + ¶ms, + &logging, + &catalog, + lifecycle_timestamps, + outer_ctx_extra, + ); let result = self .try_frontend_peek_inner(session, catalog, stmt, params, statement_logging_id) diff --git a/src/adapter/src/frontend_read_then_write.rs b/src/adapter/src/frontend_read_then_write.rs index f067f224b5037..718b4694af04d 100644 --- a/src/adapter/src/frontend_read_then_write.rs +++ b/src/adapter/src/frontend_read_then_write.rs @@ -126,26 +126,14 @@ impl PeekClient { // Set up statement logging. If this execution happens as part of an outer statement // (e.g. EXECUTE/FETCH), reuse and retire that existing context. 
- let statement_logging_id = if outer_ctx_extra.is_none() { - let result = self.statement_logging_frontend.begin_statement_execution( - session, - params, - logging, - catalog.system_config(), - lifecycle_timestamps, - ); - - if let Some((logging_id, began_execution, mseh_update, prepared_statement)) = result { - self.log_began_execution(began_execution, mseh_update, prepared_statement); - Some(logging_id) - } else { - None - } - } else { - outer_ctx_extra - .take() - .and_then(|guard| guard.defuse().retire()) - }; + let statement_logging_id = self.begin_statement_logging( + session, + params, + logging, + &catalog, + lifecycle_timestamps, + outer_ctx_extra, + ); // Reject mutations in read-only mode (e.g. during 0dt upgrades). if self.read_only { @@ -176,13 +164,7 @@ impl PeekClient { // Log the end of execution if let Some(logging_id) = statement_logging_id { - let reason = match &result { - Ok(resp) => resp.into(), - Err(e) => StatementEndedExecutionReason::Errored { - error: e.to_string(), - }, - }; - self.log_ended_execution(logging_id, reason); + self.end_statement_logging(logging_id, &result); } result @@ -300,6 +282,14 @@ impl PeekClient { let max_occ_retries = usize::cast_from(catalog.system_config().max_occ_retries()); let statement_timeout = *session.vars().statement_timeout(); + // Linearize the read: wait for the oracle to advance past `as_of` + // before creating the subscribe or attempting any writes. This + // mirrors the old coordinator path's `strict_serializable_reads_tx` + // step and prevents writes at far-future timestamps (e.g. from a + // REFRESH MV with a far-future since) from bumping the oracle into + // the future. 
+ self.ensure_read_linearized(&timeline, as_of).await?; + // Acquire OCC semaphore permit to limit concurrent write operations let _permit = Arc::clone(&self.occ_write_semaphore) .acquire_owned() @@ -322,7 +312,7 @@ impl PeekClient { ) .await?; - let (retry_count, write_submitted, result) = self + let (retry_count, result) = self .run_occ_loop( subscribe_handle, target_id, @@ -342,16 +332,6 @@ impl PeekClient { let result = result?; - // Ensure read linearization only when no write was submitted (no - // matching rows). When a write WAS submitted, the group commit already - // bumped the oracle past `as_of`, so linearization is guaranteed and we - // can skip the costly oracle read_ts call. When no write was submitted, - // we must wait for the oracle to advance past our read timestamp to - // prevent subsequent reads from seeing an earlier state. - if !write_submitted { - self.ensure_read_linearized(&timeline, as_of).await?; - } - Ok(result) } @@ -610,8 +590,7 @@ impl PeekClient { /// oracle read timestamp to the timestamp of the write so at that time it /// will be true that `chosen_ts <= oracle_ts`. Returns `(retry_count, /// write_submitted, result)` so the caller can record OCC retry metrics - /// regardless of whether the operation succeeded or failed, and skip - /// read linearization when a write was submitted. + /// regardless of whether the operation succeeded or failed. async fn run_occ_loop( &self, mut subscribe_handle: SubscribeHandle, @@ -622,7 +601,7 @@ impl PeekClient { max_occ_retries: usize, table_desc: RelationDesc, statement_timeout: Duration, - ) -> (usize, bool, Result) { + ) -> (usize, Result) { // Timeout of 0 is equivalent to "off", meaning we will wait "forever." 
let effective_timeout = if statement_timeout == Duration::ZERO { Duration::MAX @@ -638,7 +617,6 @@ impl PeekClient { initial_progress_seen: bool, retry_count: usize, byte_size: u64, - write_submitted: bool, } impl OccState { @@ -649,7 +627,6 @@ impl PeekClient { initial_progress_seen: false, retry_count: 0, byte_size: 0, - write_submitted: false, } } @@ -815,11 +792,12 @@ impl PeekClient { let msg = match tokio::time::timeout(remaining, subscribe_handle.recv()).await { Ok(Some(msg)) => msg, Ok(None) => { - // Channel closed + // Channel closed — the subscribe was dropped, most likely + // because the connection was canceled. if state.initial_progress_seen && state.all_diffs.is_empty() { break build_no_rows_response(&kind, &returning); } - break Err(AdapterError::Internal("subscribe channel closed".into())); + break Err(AdapterError::Canceled); } Err(_) => { // Timed out @@ -850,9 +828,7 @@ impl PeekClient { } Err(mpsc::error::TryRecvError::Empty) => break None, Err(mpsc::error::TryRecvError::Disconnected) => { - break Some(Err(AdapterError::Internal( - "subscribe channel closed".into(), - ))); + break Some(Err(AdapterError::Canceled)); } } }; @@ -886,7 +862,6 @@ impl PeekClient { TimestampedWriteResult::Success { .. } => { // N.B. subscribe_handle is dropped here, which // fires off the cleanup message. - state.write_submitted = true; break self.build_success_response(kind, returning, &state.all_diffs); } TimestampedWriteResult::TimestampPassed { @@ -929,7 +904,7 @@ impl PeekClient { } }; - (state.retry_count, state.write_submitted, result) + (state.retry_count, result) } /// Build the success response after a successful write. 
diff --git a/src/adapter/src/peek_client.rs b/src/adapter/src/peek_client.rs index 84f26754b96b2..9f1d9a3b5c956 100644 --- a/src/adapter/src/peek_client.rs +++ b/src/adapter/src/peek_client.rs @@ -33,16 +33,19 @@ use tokio::sync::{Semaphore, oneshot}; use uuid::Uuid; use crate::catalog::Catalog; -use crate::command::{CatalogSnapshot, Command}; -use crate::coord::Coordinator; +use crate::command::{CatalogSnapshot, Command, ExecuteResponse}; use crate::coord::peek::FastPathPlan; -use crate::statement_logging::WatchSetCreation; +use crate::coord::{Coordinator, ExecuteContextGuard}; +use crate::session::{LifecycleTimestamps, Session}; use crate::statement_logging::{ - FrontendStatementLoggingEvent, PreparedStatementEvent, StatementLoggingFrontend, - StatementLoggingId, + FrontendStatementLoggingEvent, PreparedStatementEvent, PreparedStatementLoggingInfo, + StatementLoggingFrontend, StatementLoggingId, WatchSetCreation, }; use crate::{AdapterError, Client, CollectionIdBundle, ReadHolds, statement_logging}; +use mz_sql::plan::Params; +use qcell::QCell; + /// Storage collections trait alias we need to consult for since/frontiers. pub type StorageCollectionsHandle = Arc< dyn mz_storage_client::storage_collections::StorageCollections @@ -437,7 +440,59 @@ impl PeekClient { }) } - // Statement logging helper methods + /// Set up statement logging for a frontend-sequenced operation. + /// + /// If `outer_ctx_extra` is `None`, begins a new statement execution log + /// entry. If `outer_ctx_extra` is `Some` (e.g. EXECUTE/FETCH), reuses and + /// retires the existing logging context. + /// + /// Returns the logging ID if this statement is being logged, or `None` if + /// it was not sampled. 
+ pub(crate) fn begin_statement_logging( + &self, + session: &mut Session, + params: &Params, + logging: &Arc>, + catalog: &Catalog, + lifecycle_timestamps: Option, + outer_ctx_extra: &mut Option, + ) -> Option { + if outer_ctx_extra.is_none() { + let result = self.statement_logging_frontend.begin_statement_execution( + session, + params, + logging, + catalog.system_config(), + lifecycle_timestamps, + ); + + if let Some((logging_id, began_execution, mseh_update, prepared_statement)) = result { + self.log_began_execution(began_execution, mseh_update, prepared_statement); + Some(logging_id) + } else { + None + } + } else { + outer_ctx_extra + .take() + .and_then(|guard| guard.defuse().retire()) + } + } + + /// Log the end of a frontend-sequenced statement execution. + pub(crate) fn end_statement_logging( + &self, + logging_id: StatementLoggingId, + result: &Result, + ) { + let reason = match result { + Ok(resp) => resp.into(), + Err(e) => statement_logging::StatementEndedExecutionReason::Errored { + error: e.to_string(), + }, + }; + self.log_ended_execution(logging_id, reason); + } /// Log the beginning of statement execution. pub(crate) fn log_began_execution( diff --git a/src/environmentd/tests/server.rs b/src/environmentd/tests/server.rs index 562d4ba8a567e..8adbede644d27 100644 --- a/src/environmentd/tests/server.rs +++ b/src/environmentd/tests/server.rs @@ -3349,11 +3349,11 @@ fn test_github_20262() { } } -// Test that the server properly handles cancellation requests of read-then-write queries. +// Test that a timed-out read-then-write query does not commit its writes. // See database-issues#6134. 
#[mz_ore::test] #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `epoll_wait` on OS `linux` -fn test_cancel_read_then_write() { +fn test_timeout_read_then_write() { let server = test_util::TestHarness::default() .unsafe_mode() .start_blocking(); @@ -3363,53 +3363,27 @@ fn test_cancel_read_then_write() { client .batch_execute("CREATE TABLE foo (a TEXT, ts INT)") .unwrap(); + client + .batch_execute("INSERT INTO foo VALUES ('hello', 10)") + .unwrap(); - // Lots of races here, so try this whole thing in a loop. - Retry::default() - .clamp_backoff(Duration::ZERO) - .retry(|_state| { - let mut client1 = server.connect(postgres::NoTls).unwrap(); - let mut client2 = server.connect(postgres::NoTls).unwrap(); - let cancel_token = client2.cancel_token(); + client + .batch_execute("SET statement_timeout = '5s'") + .unwrap(); - client1.batch_execute("DELETE FROM foo").unwrap(); - client1.batch_execute("SET statement_timeout = '5s'").unwrap(); - client1 - .batch_execute("INSERT INTO foo VALUES ('hello', 10)") - .unwrap(); + let err = client + .batch_execute("INSERT INTO foo SELECT a, CASE WHEN mz_unsafe.mz_sleep(ts) > 0 THEN 0 END AS ts FROM foo") + .unwrap_err(); + assert_contains!(err.to_string_with_causes(), "statement timeout"); - let handle1 = thread::spawn(move || { - let err = client1 - .batch_execute("insert into foo select a, case when mz_unsafe.mz_sleep(ts) > 0 then 0 end as ts from foo") - .unwrap_err(); - assert_contains!( - err.to_string_with_causes(), - "statement timeout" - ); - client1 - }); - std::thread::sleep(Duration::from_millis(100)); - let handle2 = thread::spawn(move || { - let err = client2 - .batch_execute("insert into foo values ('blah', 1);") - .unwrap_err(); - assert_contains!( - err.to_string_with_causes(), - "canceling statement" - ); - }); - std::thread::sleep(Duration::from_millis(100)); - cancel_token.cancel_query(postgres::NoTls)?; - let mut client1 = handle1.join().unwrap(); - handle2.join().unwrap(); - let 
rows:i64 = client1.query_one ("SELECT count(*) FROM foo", &[]).unwrap().get(0); - // We ran 3 inserts. First succeeded. Second timedout. Third cancelled. - if rows !=1 { - anyhow::bail!("unexpected row count: {rows}"); - } - Ok::<_, anyhow::Error>(()) - }) - .unwrap(); + let rows: i64 = client + .query_one("SELECT count(*) FROM foo", &[]) + .unwrap() + .get(0); + assert_eq!( + rows, 1, + "timed-out statement should not have committed writes" + ); } #[mz_ore::test(tokio::test(flavor = "multi_thread", worker_threads = 1))] diff --git a/src/environmentd/tests/sql.rs b/src/environmentd/tests/sql.rs index 0d466e1596750..63eda50100a72 100644 --- a/src/environmentd/tests/sql.rs +++ b/src/environmentd/tests/sql.rs @@ -1680,8 +1680,12 @@ fn test_subscribe_outlive_cluster() { .batch_execute("CREATE CLUSTER newcluster REPLICAS (r1 (size 'scale=1,workers=1'))") .unwrap(); client2_cancel.cancel_query(postgres::NoTls).unwrap(); - client2 - .batch_execute("ROLLBACK; SET CLUSTER = default") + // The cancel is asynchronous and might race with subsequent commands. + // Retry ROLLBACK in a loop in case it gets canceled. 
+    Retry::default()
+        .max_tries(5)
+        .clamp_backoff(Duration::from_millis(100))
+        .retry(|_| client2.batch_execute("ROLLBACK; SET CLUSTER = default"))
         .unwrap();
     assert_eq!(
         client2

From bf193abf636a24253975f527fa1160ed32706064 Mon Sep 17 00:00:00 2001
From: Aljoscha Krettek
Date: Fri, 27 Feb 2026 13:46:48 +0100
Subject: [PATCH 06/10] resolve unmaterializable functions

---
 src/adapter/src/frontend_read_then_write.rs | 17 +++++++++++++++--
 1 file changed, 15 insertions(+), 2 deletions(-)

diff --git a/src/adapter/src/frontend_read_then_write.rs b/src/adapter/src/frontend_read_then_write.rs
index 718b4694af04d..b6f67cd17e5bc 100644
--- a/src/adapter/src/frontend_read_then_write.rs
+++ b/src/adapter/src/frontend_read_then_write.rs
@@ -231,7 +231,7 @@ impl PeekClient {
 
         // Optimize MIR
         let (optimizer, global_mir_plan) =
-            self.optimize_mir_read_then_write(catalog, &plan, cluster_id)?;
+            self.optimize_mir_read_then_write(catalog, session, &plan, cluster_id)?;
 
         // Determine timestamp and acquire read holds
         let oracle_read_ts = self.oracle_read_ts(&timeline).await;
@@ -406,6 +406,7 @@ impl PeekClient {
     fn optimize_mir_read_then_write(
         &self,
         catalog: &Arc<Catalog>,
+        session: &dyn SessionMetadata,
         plan: &plan::ReadThenWritePlan,
         cluster_id: ComputeInstanceId,
     ) -> Result<
@@ -432,7 +433,19 @@ impl PeekClient {
         let expr = selection.clone().lower(catalog.system_config(), None)?;
 
         // Transform for subscribe-based diffs
-        let expr = apply_mutation_to_mir(expr, kind, assignments);
+        let mut expr = apply_mutation_to_mir(expr, kind, assignments);
+
+        // Resolve unmaterializable functions (now(), current_user, etc.)
+        // before the subscribe optimizer sees them. The subscribe uses
+        // ExprPrepMaintained which rejects these, but our subscribe is a
+        // one-shot read so we can safely resolve them to constants.
+        // mz_now() is already rejected by validate_read_then_write().
+ let style = ExprPrepOneShot { + logical_time: EvalTime::NotAvailable, + session, + catalog_state: catalog.state(), + }; + expr.try_visit_scalars_mut(&mut |s| style.prep_scalar_expr(s))?; // Create optimizer let compute_instance = ComputeInstanceSnapshot::new_without_collections(cluster_id); From 71f4fa828c226724faedb446ebe470abd937234f Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Fri, 27 Feb 2026 13:50:17 +0100 Subject: [PATCH 07/10] fix rebase errors --- src/adapter/src/client.rs | 5 ++++- src/adapter/src/frontend_read_then_write.rs | 5 +++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/adapter/src/client.rs b/src/adapter/src/client.rs index e279416386ced..55c9ca12f7786 100644 --- a/src/adapter/src/client.rs +++ b/src/adapter/src/client.rs @@ -58,13 +58,16 @@ use crate::command::{ use crate::coord::{Coordinator, ExecuteContextGuard}; use crate::error::AdapterError; use crate::metrics::Metrics; +use crate::optimize::Optimize; use crate::session::{ EndTransactionAction, PreparedStatement, Session, SessionConfig, StateRevision, TransactionId, }; use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy}; use crate::telemetry::{self, EventDetails, SegmentClientExt, StatementFailureType}; use crate::webhook::AppendWebhookResponse; -use crate::{AdapterNotice, AppendWebhookError, PeekClient, PeekResponseUnary, StartupResponse}; +use crate::{ + AdapterNotice, AppendWebhookError, PeekClient, PeekResponseUnary, StartupResponse, optimize, +}; /// A handle to a running coordinator. 
/// diff --git a/src/adapter/src/frontend_read_then_write.rs b/src/adapter/src/frontend_read_then_write.rs index b6f67cd17e5bc..b9dfba37700ec 100644 --- a/src/adapter/src/frontend_read_then_write.rs +++ b/src/adapter/src/frontend_read_then_write.rs @@ -469,10 +469,11 @@ impl PeekClient { // Get the relation desc for the expression let expr_typ = expr.typ(); - let column_names: Vec = (0..expr_typ.column_types.len()) + let sql_typ = mz_repr::SqlRelationType::from_repr(&expr_typ); + let column_names: Vec = (0..sql_typ.column_types.len()) .map(|i| format!("column{}", i)) .collect(); - let relation_desc = RelationDesc::new(expr_typ, column_names.iter().map(|s| s.as_str())); + let relation_desc = RelationDesc::new(sql_typ, column_names.iter().map(|s| s.as_str())); // Create the subscribe from the query let from = mz_sql::plan::SubscribeFrom::Query { From 2cb8e6d3b92ff3f18704612581c2d4960b270a05 Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Fri, 27 Feb 2026 14:14:48 +0100 Subject: [PATCH 08/10] fix timestamp stuff --- src/adapter/src/frontend_read_then_write.rs | 27 ++++++++++++--------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/src/adapter/src/frontend_read_then_write.rs b/src/adapter/src/frontend_read_then_write.rs index b9dfba37700ec..c17739eef9131 100644 --- a/src/adapter/src/frontend_read_then_write.rs +++ b/src/adapter/src/frontend_read_then_write.rs @@ -60,7 +60,8 @@ use crate::catalog::Catalog; use crate::command::{Command, ExecuteResponse}; use crate::coord::appends::TimestampedWriteResult; use crate::coord::sequencer::validate_read_dependencies; -use crate::coord::{ExecuteContextGuard, TargetCluster}; +use crate::coord::timestamp_selection::TimestampProvider; +use crate::coord::{Coordinator, ExecuteContextGuard, TargetCluster}; use crate::error::AdapterError; use crate::optimize::Optimize; use crate::optimize::dataflows::{ComputeInstanceSnapshot, EvalTime, ExprPrep, ExprPrepOneShot}; @@ -234,7 +235,7 @@ impl PeekClient { 
             self.optimize_mir_read_then_write(catalog, session, &plan, cluster_id)?;
 
         // Determine timestamp and acquire read holds
-        let oracle_read_ts = self.oracle_read_ts(&timeline).await;
+        let oracle_read_ts = self.oracle_read_ts(&timeline).await?;
         let bundle = &global_mir_plan.id_bundle(cluster_id);
         let (determination, read_holds) = self
             .frontend_determine_timestamp(
@@ -500,16 +501,18 @@ impl PeekClient {
     }
 
     /// Get the oracle read timestamp for the timeline.
-    async fn oracle_read_ts(&mut self, timeline: &TimelineContext) -> Option<Timestamp> {
-        if matches!(timeline, TimelineContext::TimelineDependent(_)) {
-            let timeline = timeline.timeline().expect("timeline is set");
-            let oracle = match self.ensure_oracle(timeline.clone()).await {
-                Ok(oracle) => oracle,
-                Err(_) => return None,
-            };
-            Some(oracle.read_ts().await)
-        } else {
-            None
+    async fn oracle_read_ts(
+        &mut self,
+        timeline: &TimelineContext,
+    ) -> Result<Option<Timestamp>, AdapterError> {
+        let timeline = <Coordinator as TimestampProvider>::get_timeline(timeline);
+
+        match timeline {
+            Some(timeline) => {
+                let oracle = self.ensure_oracle(timeline).await?;
+                Ok(Some(oracle.read_ts().await))
+            }
+            None => Ok(None),
         }
     }

From facc2ba749464adeda88ac3dabbd384f89e9f247 Mon Sep 17 00:00:00 2001
From: Aljoscha Krettek
Date: Fri, 27 Feb 2026 14:14:48 +0100
Subject: [PATCH 09/10] DNM: remove pr-review skill

---
 .claude/skills/mz-pr-review/SKILL.md | 74 ----------------------------
 1 file changed, 74 deletions(-)
 delete mode 100644 .claude/skills/mz-pr-review/SKILL.md

diff --git a/.claude/skills/mz-pr-review/SKILL.md b/.claude/skills/mz-pr-review/SKILL.md
deleted file mode 100644
index b036a60c75c54..0000000000000
--- a/.claude/skills/mz-pr-review/SKILL.md
+++ /dev/null
@@ -1,74 +0,0 @@
----
-description: >
-Local PR review against Materialize standards.
-This skill should be used when the user asks for a review of changes.
-argument-hint: [base-branch] -allowed-tools: [Bash, Read, Grep, Glob, Task] ---- - -Perform a local code review of the current branch's changes against Materialize project standards. - -## Steps - -1. Parse arguments from: $ARGUMENTS — a PR number, base branch, or nothing. -2. Get the diff using the first method that works: - - **PR number given** (e.g. `123`): `gh pr diff 123` - - **git available**: `git diff ...HEAD` (default base: `main`) - - **jj available**: `jj diff -r ` (default: diff from trunk) -3. Get the file list from the same diff (add `--stat` for git, `--stat` for jj, or `gh pr diff 123 --stat` for PR). -4. Review the diff against the checklists below. -5. Present findings organized as: **Blocking**, **Strong suggestions**, **Nits**. - -## Review checklist - -The overall developer guide for reviewing changes is defined in `doc/developer/guide-changes.md`, always read and follow its guidance. - -### Tests -- Every behavior change has at least one new or modified test. -- SQL/query behavior → look for `.slt` in `test/sqllogictest/`. -- Wire/protocol behavior → look for `.pt` in `test/pgtest/`. -- Rust logic/types/APIs → add or extend a Rust unit test in the crate (e.g. `#[cfg(test)]` or `tests/`); run with `cargo test -p mz-`. -- Prefer testing observable behavior (SQL results, wire protocol) over implementation details. -- Red flag: behavior change with no test changes. -- For more testing guidelines, read `doc/developer/guide-testing.md` - -### Code style (Rust) -- **Imports:** `std` → external crates → `crate::`; one `use` per module; prefer `crate::` over `super::` in non-test code. -- **Errors:** Structured with `thiserror`; no bare `anyhow!("...")`. `Display` should not print full error chain. -- **Async:** Use `ore::task::spawn` / `spawn_blocking`, not raw `tokio::spawn`. -- **Tests:** `#[mz_ore::test]`; panic in tests rather than returning `Result`. - -### Code style (SQL) -- Keywords capitalized (`SELECT`, `FROM`); identifiers lowercase. 
-- No space between function name and `(`. - -### Error messages -- Primary: short, factual, lowercase first letter, no trailing punctuation. -- Detail/hint: complete sentences, capitalized, period. -- No "unable", "bad", "illegal", "unknown"; say what kind of object. - -### Architecture -- **Simplicity:** No incidental complexity; simplify redundant logic. -- **No special casing:** Prefer composable design over extra booleans/branches. -- **Encapsulation:** sql-parser = grammar only (no semantic validation); sql = planning + semantics. -- **Dependencies:** New crates must be justified. -- For more design guidelines read: `doc/developer/best-practices.md` - -### Polish -- No leftover `// XXX`, `// FIXME`, `dbg!`, `println!`, or commented-out code. -- No unrelated formatting changes in untouched code. -- New public items should have doc comments. - -### Release notes -- Any user-visible change to stable APIs needs a release note (imperative, "This release will…"). - -## One semantic change rule - -The PR should do one thing. If it spans multiple CODEOWNERS areas (e.g. sql-parser + sql planner), consider suggesting a split. - -## Rules - -- Review the code, not the author. Explain the *why* behind suggestions. -- Use **nit:** for preferences where reasonable people could disagree. -- If the PR improves overall codebase health and blocking items are addressed, say so. -- Do NOT make any changes — this is read-only review. 
From 865091fd2923649bca948859764e7ff925a496da Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Fri, 27 Feb 2026 16:04:45 +0100 Subject: [PATCH 10/10] try fix --- src/adapter/src/frontend_read_then_write.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/adapter/src/frontend_read_then_write.rs b/src/adapter/src/frontend_read_then_write.rs index c17739eef9131..d57a072b996b1 100644 --- a/src/adapter/src/frontend_read_then_write.rs +++ b/src/adapter/src/frontend_read_then_write.rs @@ -60,8 +60,7 @@ use crate::catalog::Catalog; use crate::command::{Command, ExecuteResponse}; use crate::coord::appends::TimestampedWriteResult; use crate::coord::sequencer::validate_read_dependencies; -use crate::coord::timestamp_selection::TimestampProvider; -use crate::coord::{Coordinator, ExecuteContextGuard, TargetCluster}; +use crate::coord::{ExecuteContextGuard, TargetCluster}; use crate::error::AdapterError; use crate::optimize::Optimize; use crate::optimize::dataflows::{ComputeInstanceSnapshot, EvalTime, ExprPrep, ExprPrepOneShot}; @@ -505,11 +504,9 @@ impl PeekClient { &mut self, timeline: &TimelineContext, ) -> Result, AdapterError> { - let timeline = ::get_timeline(timeline); - - match timeline { + match timeline.timeline() { Some(timeline) => { - let oracle = self.ensure_oracle(timeline).await?; + let oracle = self.ensure_oracle(timeline.clone()).await?; Ok(Some(oracle.read_ts().await)) } None => Ok(None),