diff --git a/.claude-trace/log-2026-02-27-10-27-47.jsonl b/.claude-trace/log-2026-02-27-10-27-47.jsonl new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/.claude/skills/mz-pr-review/SKILL.md b/.claude/skills/mz-pr-review/SKILL.md deleted file mode 100644 index b036a60c75c54..0000000000000 --- a/.claude/skills/mz-pr-review/SKILL.md +++ /dev/null @@ -1,74 +0,0 @@ ---- -description: > -Local PR review against Materialize standards. -This skill should be used when the user asks for a review of changes. -argument-hint: [base-branch] -allowed-tools: [Bash, Read, Grep, Glob, Task] ---- - -Perform a local code review of the current branch's changes against Materialize project standards. - -## Steps - -1. Parse arguments from: $ARGUMENTS — a PR number, base branch, or nothing. -2. Get the diff using the first method that works: - - **PR number given** (e.g. `123`): `gh pr diff 123` - - **git available**: `git diff ...HEAD` (default base: `main`) - - **jj available**: `jj diff -r ` (default: diff from trunk) -3. Get the file list from the same diff (add `--stat` for git, `--stat` for jj, or `gh pr diff 123 --stat` for PR). -4. Review the diff against the checklists below. -5. Present findings organized as: **Blocking**, **Strong suggestions**, **Nits**. - -## Review checklist - -The overall developer guide for reviewing changes is defined in `doc/developer/guide-changes.md`, always read and follow its guidance. - -### Tests -- Every behavior change has at least one new or modified test. -- SQL/query behavior → look for `.slt` in `test/sqllogictest/`. -- Wire/protocol behavior → look for `.pt` in `test/pgtest/`. -- Rust logic/types/APIs → add or extend a Rust unit test in the crate (e.g. `#[cfg(test)]` or `tests/`); run with `cargo test -p mz-`. -- Prefer testing observable behavior (SQL results, wire protocol) over implementation details. -- Red flag: behavior change with no test changes. -- For more testing guidelines, read `doc/developer/guide-testing.md` - -### Code style (Rust) -- **Imports:** `std` → external crates → `crate::`; one `use` per module; prefer `crate::` over `super::` in non-test code. -- **Errors:** Structured with `thiserror`; no bare `anyhow!("...")`. `Display` should not print full error chain. -- **Async:** Use `ore::task::spawn` / `spawn_blocking`, not raw `tokio::spawn`. -- **Tests:** `#[mz_ore::test]`; panic in tests rather than returning `Result`. - -### Code style (SQL) -- Keywords capitalized (`SELECT`, `FROM`); identifiers lowercase. -- No space between function name and `(`. - -### Error messages -- Primary: short, factual, lowercase first letter, no trailing punctuation. -- Detail/hint: complete sentences, capitalized, period. -- No "unable", "bad", "illegal", "unknown"; say what kind of object. - -### Architecture -- **Simplicity:** No incidental complexity; simplify redundant logic. -- **No special casing:** Prefer composable design over extra booleans/branches. -- **Encapsulation:** sql-parser = grammar only (no semantic validation); sql = planning + semantics. -- **Dependencies:** New crates must be justified. -- For more design guidelines read: `doc/developer/best-practices.md` - -### Polish -- No leftover `// XXX`, `// FIXME`, `dbg!`, `println!`, or commented-out code. -- No unrelated formatting changes in untouched code. -- New public items should have doc comments. - -### Release notes -- Any user-visible change to stable APIs needs a release note (imperative, "This release will…"). 
- -## One semantic change rule - -The PR should do one thing. If it spans multiple CODEOWNERS areas (e.g. sql-parser + sql planner), consider suggesting a split. - -## Rules - -- Review the code, not the author. Explain the *why* behind suggestions. -- Use **nit:** for preferences where reasonable people could disagree. -- If the PR improves overall codebase health and blocking items are addressed, say so. -- Do NOT make any changes — this is read-only review. diff --git a/doc/developer/design/20260210_incremental_occ_read_then_write.md b/doc/developer/design/20260210_incremental_occ_read_then_write.md index db6286fb2f210..4534b7fbd3871 100644 --- a/doc/developer/design/20260210_incremental_occ_read_then_write.md +++ b/doc/developer/design/20260210_incremental_occ_read_then_write.md @@ -128,7 +128,7 @@ Session Task Coordinator | | |-- acquire OCC semaphore | | | - |-- CreateReadThenWriteSubscribe ----> | + |-- CreateInternalSubscribe ---------> | | <------------ subscribe channel -----| | | | +-- OCC Loop ------------------+ | @@ -141,7 +141,7 @@ Session Task Coordinator | | if Success: break | | | +------------------------------+ | | | - |-- DropReadThenWriteSubscribe ------> | + |-- DropInternalSubscribe -----------> | | | ``` @@ -193,7 +193,7 @@ subscribe. The subscribes created for read-then-write are internal: they do not appear in `mz_subscriptions` or other introspection tables, and they don't increment the active subscribes metric. They are created and dropped via dedicated `Command` -variants (`CreateReadThenWriteSubscribe`, `DropReadThenWriteSubscribe`). +variants (`CreateInternalSubscribe`, `DropInternalSubscribe`). ## Correctness diff --git a/misc/python/materialize/mzcompose/__init__.py b/misc/python/materialize/mzcompose/__init__.py index 22c322247039d..ab63088cc27cd 100644 --- a/misc/python/materialize/mzcompose/__init__.py +++ b/misc/python/materialize/mzcompose/__init__.py @@ -185,6 +185,11 @@ def get_variable_system_parameters( "true", ["true", "false"], ), + VariableSystemParameter( + "enable_adapter_frontend_occ_read_then_write", + "true", + ["true", "false"], + ), VariableSystemParameter( "enable_cast_elimination", "false", diff --git a/misc/python/materialize/parallel_workload/action.py b/misc/python/materialize/parallel_workload/action.py index f9790a653af3d..730752553caba 100644 --- a/misc/python/materialize/parallel_workload/action.py +++ b/misc/python/materialize/parallel_workload/action.py @@ -1520,6 +1520,9 @@ def __init__( "false", ] self.flags_with_values["enable_cast_elimination"] = BOOLEAN_FLAG_VALUES + self.flags_with_values["enable_adapter_frontend_occ_read_then_write"] = ( + BOOLEAN_FLAG_VALUES + ) # If you are adding a new config flag in Materialize, consider using it # here instead of just marking it as uninteresting to silence the diff --git a/src/adapter-types/src/dyncfgs.rs b/src/adapter-types/src/dyncfgs.rs index c1c1e1296772f..da7dfd2f529e3 100644 --- a/src/adapter-types/src/dyncfgs.rs +++ b/src/adapter-types/src/dyncfgs.rs @@ -155,6 +155,13 @@ pub const ENABLE_S3_TABLES_REGION_CHECK: Config = Config::new( "Whether to enforce that S3 Tables connections are in the same region as the environment.", ); +pub const FRONTEND_READ_THEN_WRITE: Config = Config::new( + "enable_adapter_frontend_occ_read_then_write", + // WIP: true for testing in ci, Should be false before merging. + true, + "Use frontend sequencing for DELETE/UPDATE operations.", +); + /// Adds the full set of all adapter `Config`s. 
pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet { configs @@ -178,4 +185,5 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet { .add(&CONSTRAINT_BASED_TIMESTAMP_SELECTION) .add(&PERSIST_FAST_PATH_ORDER) .add(&ENABLE_S3_TABLES_REGION_CHECK) + .add(&FRONTEND_READ_THEN_WRITE) } diff --git a/src/adapter/src/active_compute_sink.rs b/src/adapter/src/active_compute_sink.rs index e6251e75a6022..3e0b6357105c7 100644 --- a/src/adapter/src/active_compute_sink.rs +++ b/src/adapter/src/active_compute_sink.rs @@ -116,6 +116,9 @@ pub struct ActiveSubscribe { pub start_time: EpochMillis, /// How to present the subscribe's output. pub output: SubscribeOutput, + /// If true, this is an internal subscribe that should not appear in + /// introspection tables like mz_subscriptions. + pub internal: bool, } impl ActiveSubscribe { diff --git a/src/adapter/src/client.rs b/src/adapter/src/client.rs index 716e383ad8ef5..55c9ca12f7786 100644 --- a/src/adapter/src/client.rs +++ b/src/adapter/src/client.rs @@ -18,7 +18,7 @@ use std::time::{Duration, Instant}; use anyhow::bail; use chrono::{DateTime, Utc}; use derivative::Derivative; -use futures::{Stream, StreamExt}; +use futures::{FutureExt, Stream, StreamExt}; use itertools::Itertools; use mz_adapter_types::connection::{ConnectionId, ConnectionIdType}; use mz_auth::Authenticated; @@ -58,13 +58,16 @@ use crate::command::{ use crate::coord::{Coordinator, ExecuteContextGuard}; use crate::error::AdapterError; use crate::metrics::Metrics; +use crate::optimize::Optimize; use crate::session::{ EndTransactionAction, PreparedStatement, Session, SessionConfig, StateRevision, TransactionId, }; use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy}; use crate::telemetry::{self, EventDetails, SegmentClientExt, StatementFailureType}; use crate::webhook::AppendWebhookResponse; -use crate::{AdapterNotice, AppendWebhookError, PeekClient, PeekResponseUnary, StartupResponse}; +use crate::{ + AdapterNotice, AppendWebhookError, PeekClient, PeekResponseUnary, StartupResponse, optimize, +}; /// A handle to a running coordinator. /// @@ -268,6 +271,9 @@ impl Client { persist_client, statement_logging_frontend, superuser_attribute, + occ_write_semaphore, + frontend_read_then_write_enabled, + read_only, } = response; let peek_client = PeekClient::new( @@ -277,6 +283,9 @@ impl Client { optimizer_metrics, persist_client, statement_logging_frontend, + occ_write_semaphore, + frontend_read_then_write_enabled, + read_only, ); let mut client = SessionClient { @@ -568,6 +577,23 @@ pub struct SessionClient { pub enable_frontend_peek_sequencing: bool, } +/// Keeps a connection cancel watch installed in the coordinator for the +/// duration of a frontend read-then-write attempt. +struct FrontendConnectionCancelWatchGuard { + conn_id: ConnectionId, + client: Option, +} + +impl Drop for FrontendConnectionCancelWatchGuard { + fn drop(&mut self) { + if let Some(client) = self.client.take() { + client.send(Command::UnregisterConnectionCancelWatch { + conn_id: self.conn_id.clone(), + }); + } + } +} + impl SessionClient { /// Parses a SQL expression, reporting failures as a telemetry event if /// possible. @@ -717,6 +743,7 @@ impl SessionClient { outer_ctx_extra: Option, ) -> Result<(ExecuteResponse, Instant), AdapterError> { let execute_started = Instant::now(); + let cancel_future = cancel_future.map(|_| ()).shared(); // Attempt peek sequencing in the session task. // If unsupported, fall back to the Coordinator path. 
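The `cancel_future.map(|_| ()).shared()` conversion above exists because the same cancel signal now has two consumers: the frontend read-then-write attempt and the fallback `Command::Execute` path. `futures::FutureExt::shared` turns a future into a cloneable handle whose clones all resolve once the underlying future completes. A minimal, self-contained sketch of that pattern (not part of the patch; all names here are illustrative):

```rust
use futures::FutureExt;

#[tokio::main]
async fn main() {
    // A one-shot signal, e.g. "the client connection closed".
    let (trigger, cancelled) = tokio::sync::oneshot::channel::<()>();

    // `shared()` turns the future into a cloneable handle; every clone
    // completes when (and only when) the underlying future completes.
    let cancelled = cancelled.map(|_| ()).shared();
    let for_rtw_path = cancelled.clone();
    let for_fallback_path = cancelled.clone();

    let _ = trigger.send(());

    // Both clones observe the same completion.
    for_rtw_path.await;
    for_fallback_path.await;
}
```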
@@ -731,11 +758,24 @@ impl SessionClient { // No additional work needed here. return Ok((resp, execute_started)); } else { - debug!("frontend peek did not happen, falling back to `Command::Execute`"); + debug!("frontend peek did not happen, trying frontend read-then-write"); + } + + // Attempt read-then-write sequencing in the session task. + if let Some(resp) = self + .try_frontend_read_then_write_with_cancel( + &portal_name, + &mut outer_ctx_extra, + cancel_future.clone(), + ) + .await? + { + debug!("frontend read-then-write succeeded"); + return Ok((resp, execute_started)); + } else { + debug!("frontend read-then-write did not happen, falling back to `Command::Execute`"); // If we bailed out, outer_ctx_extra is still present (if it was originally). // `Command::Execute` will handle it. - // (This is not true if we bailed out _after_ the frontend peek sequencing has already - // begun its own statement logging. That case would be a bug.) } let response = self @@ -746,7 +786,9 @@ impl SessionClient { tx, outer_ctx_extra, }, - cancel_future, + cancel_future + .clone() + .map(|_| std::io::Error::new(std::io::ErrorKind::Other, "connection closed")), ) .await?; Ok((response, execute_started)) @@ -1052,7 +1094,12 @@ impl SessionClient { | Command::RegisterFrontendPeek { .. } | Command::UnregisterFrontendPeek { .. } | Command::ExplainTimestamp { .. } - | Command::FrontendStatementLogging(..) => {} + | Command::FrontendStatementLogging(..) + | Command::RegisterConnectionCancelWatch { .. } + | Command::UnregisterConnectionCancelWatch { .. } + | Command::CreateInternalSubscribe { .. } + | Command::AttemptTimestampedWrite { .. } + | Command::DropInternalSubscribe { .. } => {} }; cmd }); @@ -1156,6 +1203,290 @@ impl SessionClient { Ok(None) } } + + /// Runs frontend read-then-write while reacting to both local/session + /// cancellation and coordinator-issued connection cancellation. + async fn try_frontend_read_then_write_with_cancel( + &mut self, + portal_name: &str, + outer_ctx_extra: &mut Option, + cancel_future: impl Future + Send, + ) -> Result, AdapterError> { + let conn_id = self.session().conn_id().clone(); + + let inner_client = self.inner().clone(); + let mut cancel_future = pin::pin!(cancel_future); + + // Cancellation can arrive via two independent paths: + // + // 1) `cancel_future`: local/session-side cancellation (e.g. client + // connection closes). For this path we must still forward a privileged + // cancel to the coordinator so in-flight work owned there is canceled. + // + // 2) `connection_cancel`: coordinator-side cancellation (e.g. + // `pg_cancel_backend`) reflected through the connection cancel watch. + // This can trigger while frontend RTW is still planning or optimizing, + // before coordinator-owned subscribe/write steps are installed. + // + // We select on both so frontend RTW exits promptly regardless of where + // cancellation originated. 
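The comment above describes the two cancellation paths that the code below races against the read-then-write attempt. A distilled sketch of that select pattern, with every name a hypothetical stand-in rather than the real futures from this patch:

```rust
use tokio::sync::{oneshot, watch};

/// Illustrative only: run `work` to completion unless a cancellation source
/// fires first. The local cancel keeps polling `work` (mirroring how the
/// patch forwards a privileged cancel and waits for the result), while the
/// coordinator-side watch aborts immediately.
async fn run_with_cancel<T>(
    work: impl std::future::Future<Output = T>,
    mut local_cancel: oneshot::Receiver<()>,
    mut coord_cancel: watch::Receiver<bool>,
) -> Option<T> {
    tokio::pin!(work);
    let mut local_cancelled = false;
    loop {
        tokio::select! {
            result = &mut work => return Some(result),
            _ = &mut local_cancel, if !local_cancelled => {
                // In the real code, this is where a privileged cancel request
                // would be forwarded to the coordinator.
                local_cancelled = true;
            }
            _ = coord_cancel.wait_for(|cancelled| *cancelled) => return None,
        }
    }
}
```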
+ + let mut connection_cancel_rx = self + .peek_client + .call_coordinator(|tx| Command::RegisterConnectionCancelWatch { + conn_id: conn_id.clone(), + tx, + }) + .await; + let _connection_cancel_guard = FrontendConnectionCancelWatchGuard { + conn_id: conn_id.clone(), + client: Some(inner_client.clone()), + }; + if *connection_cancel_rx.borrow() { + return Err(AdapterError::Canceled); + } + let connection_cancel = async move { + let _ = connection_cancel_rx.wait_for(|v| *v).await; + }; + tokio::pin!(connection_cancel); + + let mut frontend_read_then_write = + pin::pin!(self.try_frontend_read_then_write(portal_name, outer_ctx_extra)); + + let mut cancelled = false; + + loop { + tokio::select! { + response = &mut frontend_read_then_write => return response, + _ = &mut cancel_future, if !cancelled => { + cancelled = true; + inner_client.send(Command::PrivilegedCancelRequest { + conn_id: conn_id.clone(), + }); + } + _ = &mut connection_cancel => { + return Err(AdapterError::Canceled); + } + } + } + } + + /// Attempt to sequence a read-then-write (DELETE/UPDATE/INSERT INTO .. + /// SELECT .. FROM) from the session task. + /// + /// Returns `Ok(Some(response))` if we handled the operation, or `Ok(None)` + /// to fall back to the Coordinator's sequencing. If it returns an error, it + /// should be returned to the user. + pub(crate) async fn try_frontend_read_then_write( + &mut self, + portal_name: &str, + outer_ctx_extra: &mut Option, + ) -> Result, AdapterError> { + use mz_expr::RowSetFinishing; + use mz_sql::plan::{MutationKind, Plan, ReadThenWritePlan}; + use mz_sql_parser::ast::Statement; + + // Check if frontend read-then-write is enabled (determined once at process startup + // to avoid a mixed-mode window where both the old lock-based path and the new OCC + // path are active concurrently). + if !self.peek_client.frontend_read_then_write_enabled { + return Ok(None); + } + + let catalog = self.catalog_snapshot("try_frontend_read_then_write").await; + + // Extract statement, params, and logging info from portal + let (stmt, params, logging, lifecycle_timestamps) = { + let session = self.session.as_ref().expect("SessionClient invariant"); + let portal = match session.get_portal_unverified(portal_name) { + Some(portal) => portal, + None => return Ok(None), // Portal doesn't exist, fall back + }; + ( + portal.stmt.clone(), + portal.parameters.clone(), + Arc::clone(&portal.logging), + portal.lifecycle_timestamps.clone(), + ) + }; + + // Check if it's a DELETE, UPDATE, or INSERT statement + let stmt = match stmt { + Some(stmt) + if matches!( + &*stmt, + Statement::Delete(_) | Statement::Update(_) | Statement::Insert(_) + ) => + { + stmt + } + Some(_stmt) => { + return Ok(None); + } + None => { + return Ok(None); + } + }; + + // Needed later, for an error message. 
+ let stmt_string = stmt.to_string(); + + // Plan the statement + let (plan, target_cluster, resolved_ids) = { + let session = self.session.as_mut().expect("SessionClient invariant"); + let conn_catalog = catalog.for_session(session); + let (stmt, resolved_ids) = mz_sql::names::resolve(&conn_catalog, (*stmt).clone())?; + let pcx = session.pcx(); + let plan = mz_sql::plan::plan(Some(pcx), &conn_catalog, stmt, ¶ms, &resolved_ids)?; + + let target_cluster = match session.transaction().cluster() { + Some(cluster_id) => crate::coord::TargetCluster::Transaction(cluster_id), + None => crate::coord::catalog_serving::auto_run_on_catalog_server( + &conn_catalog, + session, + &plan, + ), + }; + + (plan, target_cluster, resolved_ids) + }; + + // Check RBAC permissions, mirroring the coordinator's check in sequencer.rs. + { + let session = self.session.as_ref().expect("SessionClient invariant"); + let conn_catalog = catalog.for_session(session); + let target_cluster_id = catalog + .resolve_target_cluster(target_cluster.clone(), session) + .ok() + .map(|cluster| cluster.id); + if let Err(e) = mz_sql::rbac::check_plan( + &conn_catalog, + None:: Option>, + session, + &plan, + target_cluster_id, + &resolved_ids, + ) { + return Err(e.into()); + } + } + + // Handle ReadThenWrite plans or Insert plans. + let rtw_plan = match plan { + Plan::ReadThenWrite(rtw_plan) => rtw_plan, + Plan::Insert(insert_plan) => { + // For INSERT, we need to check if it's a constant insert + // without RETURNING. Constant inserts use a fast path in the + // coordinator, so we fall back. + // + // We need to lower HIR to MIR to check for constants because + // VALUES statements are planned as Wrap calls at the HIR level. + let optimized_mir = if insert_plan.values.as_const().is_some() { + // Already constant at HIR level - just lower without optimization + let expr = insert_plan + .values + .clone() + .lower(catalog.system_config(), None)?; + mz_expr::OptimizedMirRelationExpr(expr) + } else { + // Need to optimize to check if it becomes constant + let optimizer_config = optimize::OptimizerConfig::from(catalog.system_config()); + let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None); + optimizer.optimize(insert_plan.values.clone())? + }; + + // Constant INSERT without RETURNING are blind-writes. Add to + // the transaction and let those code paths handle it. 
+ let inner_mir = optimized_mir.into_inner(); + if inner_mir.as_const().is_some() && insert_plan.returning.is_empty() { + let session = self.session.as_mut().expect("SessionClient invariant"); + + let statement_logging_id = self.peek_client.begin_statement_logging( + session, + ¶ms, + &logging, + &catalog, + lifecycle_timestamps, + outer_ctx_extra, + ); + + let result = + Coordinator::insert_constant(&catalog, session, insert_plan.id, inner_mir); + + if let Some(logging_id) = statement_logging_id { + self.peek_client.end_statement_logging(logging_id, &result); + } + + return Ok(Some(result?)); + } + + // Get table descriptor arity for the finishing projection + let desc_arity = match catalog.try_get_entry(&insert_plan.id) { + Some(table) => { + let desc = table + .relation_desc_latest() + .ok_or_else(|| AdapterError::Internal("table has no desc".into()))?; + desc.arity() + } + None => { + return Err(AdapterError::Catalog(mz_catalog::memory::error::Error { + kind: mz_catalog::memory::error::ErrorKind::Sql( + mz_sql::catalog::CatalogError::UnknownItem( + insert_plan.id.to_string(), + ), + ), + })); + } + }; + + let finishing = RowSetFinishing { + order_by: vec![], + limit: None, + offset: 0, + project: (0..desc_arity).collect(), + }; + + // Convert InsertPlan to ReadThenWritePlan + ReadThenWritePlan { + id: insert_plan.id, + selection: insert_plan.values, + finishing, + assignments: BTreeMap::new(), + kind: MutationKind::Insert, + returning: insert_plan.returning, + } + } + _ => { + return Err(AdapterError::Internal( + "unexpected plan type for mutation".into(), + )); + } + }; + + // The OCC path commits writes immediately and they cannot be rolled + // back, so reject explicit transaction blocks. (Constant INSERTs are + // handled above and don't go through OCC.) 
+ { + let session = self.session.as_ref().expect("SessionClient invariant"); + if !session.transaction().is_implicit() { + return Err(AdapterError::OperationProhibitsTransaction(stmt_string)); + } + } + + let session = self.session.as_mut().expect("SessionClient invariant"); + self.peek_client + .frontend_read_then_write( + session, + rtw_plan, + target_cluster, + ¶ms, + &logging, + lifecycle_timestamps, + outer_ctx_extra, + ) + .await + .map(Some) + } } impl Drop for SessionClient { diff --git a/src/adapter/src/command.rs b/src/adapter/src/command.rs index 405018dde0473..2f7ffdfa55ccb 100644 --- a/src/adapter/src/command.rs +++ b/src/adapter/src/command.rs @@ -31,7 +31,7 @@ use mz_persist_client::PersistClient; use mz_pgcopy::CopyFormatParams; use mz_repr::global_id::TransientIdGen; use mz_repr::role_id::RoleId; -use mz_repr::{CatalogItemId, ColumnIndex, GlobalId, RowIterator, SqlRelationType}; +use mz_repr::{CatalogItemId, ColumnIndex, Diff, GlobalId, Row, RowIterator, SqlRelationType}; use mz_sql::ast::{FetchDirection, Raw, Statement}; use mz_sql::catalog::ObjectType; use mz_sql::optimizer_metrics::OptimizerMetrics; @@ -41,16 +41,17 @@ use mz_sql::session::vars::{OwnedVarInput, SystemVars}; use mz_sql_parser::ast::{AlterObjectRenameStatement, AlterOwnerStatement, DropObjectsStatement}; use mz_storage_types::sources::Timeline; use mz_timestamp_oracle::TimestampOracle; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::{Semaphore, mpsc, oneshot, watch}; use uuid::Uuid; use crate::catalog::Catalog; -use crate::coord::appends::BuiltinTableAppendNotify; +use crate::coord::appends::{BuiltinTableAppendNotify, TimestampedWriteResult}; use crate::coord::consistency::CoordinatorInconsistencies; use crate::coord::peek::{PeekDataflowPlan, PeekResponseUnary}; use crate::coord::timestamp_selection::TimestampDetermination; use crate::coord::{ExecuteContextExtra, ExecuteContextGuard}; use crate::error::AdapterError; +use crate::optimize::LirDataflowDescription; use crate::session::{EndTransactionAction, RowBatchStream, Session}; use crate::statement_logging::WatchSetCreation; use crate::statement_logging::{ @@ -324,6 +325,56 @@ pub enum Command { /// Statement logging event from frontend peek sequencing. /// No response channel needed - this is fire-and-forget. FrontendStatementLogging(FrontendStatementLoggingEvent), + + /// Registers a connection-scoped cancellation watch and returns a receiver + /// that becomes `true` when cancellation is requested for the connection. + /// + /// This is shared by coordinator staged sequencing and frontend + /// read-then-write execution. + RegisterConnectionCancelWatch { + conn_id: ConnectionId, + tx: oneshot::Sender>, + }, + + /// Unregisters a previously registered connection-scoped cancellation watch. + UnregisterConnectionCancelWatch { + conn_id: ConnectionId, + }, + + /// Creates an internal subscribe (not visible in introspection) and returns + /// the response channel. Initially used for frontend-sequenced + /// read-then-write (DELETE/UPDATE/INSERT...SELECT) operations via OCC. + CreateInternalSubscribe { + df_desc: Box, + cluster_id: ComputeInstanceId, + replica_id: Option, + depends_on: BTreeSet, + as_of: mz_repr::Timestamp, + arity: usize, + sink_id: GlobalId, + conn_id: ConnectionId, + session_uuid: Uuid, + start_time: mz_ore::now::EpochMillis, + read_holds: ReadHolds, + tx: oneshot::Sender, AdapterError>>, + }, + + /// Attempts a timestamped write for OCC (optimistic concurrency control). 
+ /// Used by frontend read-then-write to submit accumulated diffs. + AttemptTimestampedWrite { + target_id: CatalogItemId, + diffs: Vec<(Row, Diff)>, + write_ts: mz_repr::Timestamp, + tx: oneshot::Sender, + }, + + /// Drops an internal subscribe. + /// + /// Used for cleanup after the subscribe's purpose is fulfilled or on error. + DropInternalSubscribe { + sink_id: GlobalId, + tx: oneshot::Sender<()>, + }, } impl Command { @@ -358,7 +409,12 @@ impl Command { | Command::RegisterFrontendPeek { .. } | Command::UnregisterFrontendPeek { .. } | Command::ExplainTimestamp { .. } - | Command::FrontendStatementLogging(..) => None, + | Command::FrontendStatementLogging(..) + | Command::RegisterConnectionCancelWatch { .. } + | Command::UnregisterConnectionCancelWatch { .. } + | Command::CreateInternalSubscribe { .. } + | Command::AttemptTimestampedWrite { .. } + | Command::DropInternalSubscribe { .. } => None, } } @@ -393,7 +449,12 @@ impl Command { | Command::RegisterFrontendPeek { .. } | Command::UnregisterFrontendPeek { .. } | Command::ExplainTimestamp { .. } - | Command::FrontendStatementLogging(..) => None, + | Command::FrontendStatementLogging(..) + | Command::RegisterConnectionCancelWatch { .. } + | Command::UnregisterConnectionCancelWatch { .. } + | Command::CreateInternalSubscribe { .. } + | Command::AttemptTimestampedWrite { .. } + | Command::DropInternalSubscribe { .. } => None, } } } @@ -435,6 +496,15 @@ pub struct StartupResponse { pub optimizer_metrics: OptimizerMetrics, pub persist_client: PersistClient, pub statement_logging_frontend: StatementLoggingFrontend, + /// Semaphore for limiting concurrent OCC (optimistic concurrency control) + /// write operations. + pub occ_write_semaphore: Arc, + /// Whether frontend OCC read-then-write is enabled (determined once at + /// process startup). + pub frontend_read_then_write_enabled: bool, + /// Whether the coordinator is in read-only mode (e.g. during 0dt upgrades). + /// The frontend path must reject mutations when this is true. + pub read_only: bool, } #[derive(Derivative)] diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index fca7673b350c0..a1819c32fef52 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -168,7 +168,7 @@ use thiserror::Error; use timely::progress::{Antichain, Timestamp as _}; use tokio::runtime::Handle as TokioHandle; use tokio::select; -use tokio::sync::{OwnedMutexGuard, mpsc, oneshot, watch}; +use tokio::sync::{OwnedMutexGuard, Semaphore, mpsc, oneshot, watch}; use tokio::time::{Interval, MissedTickBehavior}; use tracing::{Instrument, Level, Span, debug, info, info_span, span, warn}; use tracing_opentelemetry::OpenTelemetrySpanExt; @@ -228,6 +228,7 @@ mod indexes; mod introspection; mod message_handler; mod privatelink_status; +mod read_then_write; mod sql; mod validity; @@ -379,6 +380,13 @@ impl Message { Command::ExplainTimestamp { .. } => "explain-timestamp", Command::FrontendStatementLogging(..) => "frontend-statement-logging", Command::StartCopyFromStdin { .. } => "start-copy-from-stdin", + Command::RegisterConnectionCancelWatch { .. } => "register-connection-cancel-watch", + Command::UnregisterConnectionCancelWatch { .. } => { + "unregister-connection-cancel-watch" + } + Command::CreateInternalSubscribe { .. } => "create-internal-subscribe", + Command::AttemptTimestampedWrite { .. } => "attempt-timestamped-write", + Command::DropInternalSubscribe { .. 
} => "drop-internal-subscribe", }, Message::ControllerReady { controller: ControllerReadiness::Compute, @@ -1797,9 +1805,15 @@ pub struct Coordinator { /// to stage Batches in Persist that we will then link into the shard. active_copies: BTreeMap, - /// A map from connection ids to a watch channel that is set to `true` if the connection - /// received a cancel request. - staged_cancellation: BTreeMap, watch::Receiver)>, + /// Connection-scoped cancellation watches. + /// + /// Each entry is a watch channel whose value is `false` until cancellation + /// is requested for that connection, at which point it is set to `true`. + /// + /// Consumers install/remove these watches while they have cancellable work + /// in flight. This is used both by coordinator staged sequencing and by + /// frontend read-then-write sequencing. + connection_cancel_watches: BTreeMap, watch::Receiver)>, /// Active introspection subscribes. introspection_subscribes: BTreeMap, @@ -1811,6 +1825,36 @@ pub struct Coordinator { /// Pending writes waiting for a group commit. pending_writes: Vec, + /// Semaphore to limit concurrent OCC (optimistic concurrency control) + /// read-then-write operations. + /// + /// When multiple read-then-write operations run concurrently, each one + /// maintains a subscribe that continually receives updates and must + /// consolidate data. Allowing too many concurrent operations causes wasted + /// work as all subscribes receive and process updates even though only one + /// can write at a given timestamp. Worst case, with N concurrent occ loops, + /// whenever one loop succeeds N-1 loops have to update their state, so we + /// do `O(n^2)` work. + /// + /// This semaphore limits concurrency at the operation level. Each + /// read-then-write operation acquires a permit before starting its + /// subscribe, ensuring bounded resource usage. Additional operations wait + /// until a permit becomes available. + /// + /// NOTE: The number of permits is read from `max_concurrent_occ_writes` at + /// coordinator startup and is **not** updated if the system variable + /// changes at runtime. A restart of `environmentd` is required for changes + /// to take effect. + occ_write_semaphore: Arc, + + /// Whether frontend OCC read-then-write is enabled. Read once at startup + /// from the `FRONTEND_READ_THEN_WRITE` dyncfg and fixed for the lifetime of + /// this process. This avoids a mixed-mode window where both the old + /// lock-based path and the new OCC path are active concurrently, which + /// could allow an OCC write to slip between an old-path reader's read and + /// write phases. + frontend_read_then_write_enabled: bool, + /// For the realtime timeline, an explicit SELECT or INSERT on a table will bump the /// table's timestamps, but there are cases where timestamps are not bumped but /// we expect the closed timestamps to advance (`AS OF X`, SUBSCRIBing views over @@ -3551,6 +3595,7 @@ impl Coordinator { // writes. let user_write_spans = self.pending_writes.iter().flat_map(|x| match x { PendingWriteTxn::User{span, ..} => Some(span), + PendingWriteTxn::InternalTimestamped{span, ..} => Some(span), PendingWriteTxn::System{..} => None, }); let span = match user_write_spans.exactly_one() { @@ -4463,6 +4508,15 @@ pub fn serve( } let catalog = Arc::new(catalog); + // Read once at startup; changing this system variable requires + // an environmentd restart to take effect (see field doc on + // `occ_write_semaphore`). 
+ let max_concurrent_occ_writes = + usize::cast_from(catalog.system_config().max_concurrent_occ_writes()); + let frontend_read_then_write_enabled = { + use mz_adapter_types::dyncfgs::FRONTEND_READ_THEN_WRITE; + FRONTEND_READ_THEN_WRITE.get(catalog.system_config().dyncfgs()) + }; let caching_secrets_reader = CachingSecretsReader::new(secrets_controller.reader()); let mut coord = Coordinator { @@ -4482,11 +4536,13 @@ pub fn serve( active_compute_sinks: BTreeMap::new(), active_webhooks: BTreeMap::new(), active_copies: BTreeMap::new(), - staged_cancellation: BTreeMap::new(), + connection_cancel_watches: BTreeMap::new(), introspection_subscribes: BTreeMap::new(), write_locks: BTreeMap::new(), deferred_write_ops: BTreeMap::new(), pending_writes: Vec::new(), + occ_write_semaphore: Arc::new(Semaphore::new(max_concurrent_occ_writes)), + frontend_read_then_write_enabled, advance_timelines_interval, secrets_controller, caching_secrets_reader, diff --git a/src/adapter/src/coord/appends.rs b/src/adapter/src/coord/appends.rs index 4f682979eb385..0a4d78f8ec633 100644 --- a/src/adapter/src/coord/appends.rs +++ b/src/adapter/src/coord/appends.rs @@ -31,6 +31,7 @@ use mz_sql::session::metadata::SessionMetadata; use mz_storage_client::client::TableData; use mz_timestamp_oracle::WriteTimestamp; use smallvec::SmallVec; +use timely::PartialOrder; use tokio::sync::{Notify, OwnedMutexGuard, OwnedSemaphorePermit, Semaphore, oneshot}; use tracing::{Instrument, Span, debug_span, info, warn}; @@ -131,6 +132,29 @@ pub(crate) enum BuiltinTableUpdateSource { Background(oneshot::Sender<()>), } +/// Result of a timestamped write attempt. +/// +/// A timestamped write is a write that must occur at a specific timestamp. This +/// is used for read-then-write patterns where a read happens at timestamp T and +/// the write must happen at exactly T+1 to maintain serializability. +#[derive(Debug, Clone)] +pub enum TimestampedWriteResult { + /// The write was committed at the target timestamp. + Success { + /// The timestamp at which the write was committed. + timestamp: Timestamp, + }, + /// The target timestamp was already past - the write cannot proceed. + TimestampPassed { + /// The timestamp the write was targeting. + target_timestamp: Timestamp, + /// The current write timestamp from the oracle. + current_write_ts: Timestamp, + }, + /// The write was cancelled (e.g., connection closed, coordinator shutdown). + Cancelled, +} + /// A pending write transaction that will be committing during the next group commit. #[derive(Debug)] pub(crate) enum PendingWriteTxn { @@ -149,6 +173,31 @@ pub(crate) enum PendingWriteTxn { updates: Vec, source: BuiltinTableUpdateSource, }, + /// Internal timestamped write (e.g., from read-then-write subscribe OCC). + /// + /// Unlike User writes, this doesn't have an ExecuteContext - the caller + /// handles responses via the `result_tx` channel. + /// + /// **Important**: This variant supports writes to multiple tables in a + /// single entry. It is the caller's responsibility to ensure that: + /// 1. All writes are consistent (e.g., computed from the same read + /// timestamp) + /// 2. Dependencies between tables have been properly resolved + /// + /// **Concurrency semantics**: Only ONE `InternalTimestamped` write is + /// processed per group commit round. If multiple are submitted at the same + /// `target_timestamp`, only one is processed and others fail with + /// `TimestampPassed`. 
This is required for correctness because each + /// timestamped write may not have resolved dependencies with other + /// concurrent timestamped writes - they were computed independently and + /// could be inconsistent if applied together. The failing writes will retry + /// with a new timestamp. + InternalTimestamped { + span: Span, + writes: BTreeMap>, + target_timestamp: Timestamp, + result_tx: oneshot::Sender, + }, } impl PendingWriteTxn { @@ -339,6 +388,8 @@ impl Coordinator { match pending_write { // We always allow system writes to proceed. PendingWriteTxn::System { .. } => validated_writes.push(pending_write), + // Internal timestamped writes don't need locks (OCC handles conflicts). + PendingWriteTxn::InternalTimestamped { .. } => validated_writes.push(pending_write), // We have a set of locks! Validate they're correct (expected). PendingWriteTxn::User { span, @@ -433,17 +484,62 @@ impl Coordinator { self.defer_op(acquire_future, DeferredOp::Write(write)); } - // The value returned here still might be ahead of `now()` if `now()` has gone backwards at - // any point during this method or if this was triggered from DDL. We will still commit the - // write without waiting for `now()` to advance. This is ok because the next batch of writes - // will trigger the wait loop in `try_group_commit()` if `now()` hasn't advanced past the - // global timeline, preventing an unbounded advancing of the global timeline ahead of - // `now()`. Additionally DDL is infrequent enough and takes long enough that we don't think - // it's practical for continuous DDL to advance the global timestamp in an unbounded manner. - let WriteTimestamp { - timestamp, - advance_to, - } = self.get_local_write_ts().await; + // Separate timestamped (OCC) writes from regular writes in a single pass. + let mut regular_writes = Vec::new(); + let mut timestamped_writes = Vec::new(); + for write in validated_writes { + match write { + PendingWriteTxn::InternalTimestamped { .. } => timestamped_writes.push(write), + other => regular_writes.push(other), + } + } + + // Determine the write timestamp, resolving any timestamped (OCC) writes. + // + // We can only process ONE InternalTimestamped write per group commit round because: + // 1. Each timestamped write may not have resolved dependencies with other timestamped writes + // 2. They were computed independently and could be inconsistent if applied together + // 3. After committing at timestamp T, the oracle advances past T, so other writes at T + // would fail anyway — we just fail them early here + // + // When a timestamped write is selected, we apply ONLY that write in this round and + // defer all regular writes to the next round. This avoids any subtle interactions + // between the OCC write (which targets a specific timestamp) and regular writes. + let resolved = if !timestamped_writes.is_empty() { + self.resolve_timestamped_writes(timestamped_writes).await + } else { + None + }; + + let (timestamp, advance_to, validated_writes) = match resolved { + Some((ts, advance_to, selected_write)) => { + // Defer all regular writes to the next round. + self.pending_writes.extend(regular_writes); + if !self.pending_writes.is_empty() { + self.trigger_group_commit(); + } + (ts, advance_to, vec![selected_write]) + } + None => { + // Normal flow (no eligible timestamped writes): get timestamp + // from oracle. The value returned here still might be ahead of + // `now()` if `now()` has gone backwards at any point during + // this method or if this was triggered from DDL. 
We will still + // commit the write without waiting for `now()` to advance. This + // is ok because the next batch of writes will trigger the wait + // loop in `try_group_commit()` if `now()` hasn't advanced past + // the global timeline, preventing an unbounded advancing of the + // global timeline ahead of `now()`. Additionally DDL is + // infrequent enough and takes long enough that we don't think + // it's practical for continuous DDL to advance the global + // timestamp in an unbounded manner. + let WriteTimestamp { + timestamp, + advance_to, + } = self.get_local_write_ts().await; + (timestamp, advance_to, regular_writes) + } + }; // While we're flipping on the feature flags for txn-wal tables and // the separated Postgres timestamp oracle, we also need to confirm @@ -465,6 +561,7 @@ impl Coordinator { let mut appends: BTreeMap> = BTreeMap::new(); let mut responses = Vec::with_capacity(validated_writes.len()); let mut notifies = Vec::new(); + let mut timestamped_result_txs = Vec::new(); for validated_write_txn in validated_writes { match validated_write_txn { @@ -506,6 +603,19 @@ impl Coordinator { | BuiltinTableUpdateSource::Background(tx) => notifies.push(tx), } } + PendingWriteTxn::InternalTimestamped { + span: _, + writes, + target_timestamp: _, + result_tx, + } => { + for (id, table_data) in writes { + if self.catalog().try_get_entry(&id).is_some() { + appends.entry(id).or_default().extend(table_data); + } + } + timestamped_result_txs.push(result_tx); + } } } @@ -585,9 +695,10 @@ impl Coordinator { .instrument(debug_span!("group_commit_apply::append_fut")) .await { - Ok(append_result) => { - append_result.unwrap_or_terminate("cannot fail to apply appends") - } + Ok(append_result) => append_result.unwrap_or_terminate(&format!( + "cannot fail to apply appends at {} ({:?})", + timestamp, permit + )), Err(_) => warn!("Writer terminated with writes in indefinite state"), }; @@ -618,6 +729,11 @@ impl Coordinator { // We don't care if the listeners have gone away. let _ = notify.send(()); } + + // Notify timestamped write callers of success. + for result_tx in timestamped_result_txs { + let _ = result_tx.send(TimestampedWriteResult::Success { timestamp }); + } } .instrument(span), ); @@ -625,6 +741,124 @@ impl Coordinator { timestamp } + /// Resolve timestamped (OCC) writes for a group commit round. + /// + /// Given a set of `InternalTimestamped` writes, this method: + /// 1. Checks which writes are still eligible (target timestamp not yet + /// passed) + /// 2. Selects ONE write at the lowest eligible timestamp for this round + /// 3. Fails writes whose timestamp has passed or that lost the selection + /// 4. Defers writes at higher timestamps to the next round + /// + /// Returns `Some((timestamp, advance_to, selected_write))` if a write was + /// selected, or `None` if all timestamped writes had already passed (in + /// which case the caller should fall back to the normal oracle). + async fn resolve_timestamped_writes( + &mut self, + timestamped_writes: Vec, + ) -> Option<(Timestamp, Timestamp, PendingWriteTxn)> { + debug_assert!(!timestamped_writes.is_empty()); + + let next_eligible_write_ts = self.peek_local_write_ts().await.step_forward(); + + // Find the lowest eligible target timestamp in a single scan. + let lowest_eligible_ts = timestamped_writes + .iter() + .filter_map(|w| match w { + PendingWriteTxn::InternalTimestamped { + target_timestamp, .. 
+ } if !target_timestamp.less_than(&next_eligible_write_ts) => { + Some(*target_timestamp) + } + _ => None, + }) + .min(); + + let Some(target_ts) = lowest_eligible_ts else { + // All timestamped writes have passed. Fail them all; the caller + // will fall back to the normal oracle for regular writes. + for write in timestamped_writes { + if let PendingWriteTxn::InternalTimestamped { + target_timestamp, + result_tx, + .. + } = write + { + let _ = result_tx.send(TimestampedWriteResult::TimestampPassed { + target_timestamp, + current_write_ts: next_eligible_write_ts, + }); + } + } + return None; + }; + + // Dispatch each timestamped write in a single pass: + // - Passed (target < next_eligible_write_ts): fail with TimestampPassed + // - At target_ts, first one seen: select for this round + // - At target_ts, subsequent: fail (only one per round) + // - Above target_ts: defer to next round + let mut selected: Option = None; + for write in timestamped_writes { + match &write { + PendingWriteTxn::InternalTimestamped { + target_timestamp, .. + } if target_timestamp.less_than(&next_eligible_write_ts) => { + // Timestamp has passed. + if let PendingWriteTxn::InternalTimestamped { + target_timestamp, + result_tx, + .. + } = write + { + let _ = result_tx.send(TimestampedWriteResult::TimestampPassed { + target_timestamp, + current_write_ts: next_eligible_write_ts, + }); + } + } + PendingWriteTxn::InternalTimestamped { + target_timestamp, .. + } if *target_timestamp == target_ts && selected.is_none() => { + // Winner: include in this round. + selected = Some(write); + } + PendingWriteTxn::InternalTimestamped { + target_timestamp, .. + } if *target_timestamp == target_ts => { + // Loser at same timestamp — only one can write per round. + if let PendingWriteTxn::InternalTimestamped { + target_timestamp, + result_tx, + .. + } = write + { + let _ = result_tx.send(TimestampedWriteResult::TimestampPassed { + target_timestamp, + current_write_ts: target_ts.step_forward(), + }); + } + } + PendingWriteTxn::InternalTimestamped { .. } => { + // Higher timestamp — defer to next round. + self.pending_writes.push(write); + } + _ => unreachable!("timestamped_writes only contains InternalTimestamped"), + } + } + + // If there are more timestamped writes waiting, trigger another group commit. + if self + .pending_writes + .iter() + .any(|w| matches!(w, PendingWriteTxn::InternalTimestamped { .. })) + { + self.trigger_group_commit(); + } + + selected.map(|write| (target_ts, target_ts.step_forward(), write)) + } + /// Submit a write to be executed during the next group commit and trigger a group commit. 
pub(crate) fn submit_write(&mut self, pending_write_txn: PendingWriteTxn) { if self.controller.read_only() { diff --git a/src/adapter/src/coord/command_handler.rs b/src/adapter/src/coord/command_handler.rs index 3307e9b8fdccd..c6dd06945b39d 100644 --- a/src/adapter/src/coord/command_handler.rs +++ b/src/adapter/src/coord/command_handler.rs @@ -57,7 +57,7 @@ use mz_sql_parser::ast::{ }; use mz_storage_types::sources::Timeline; use opentelemetry::trace::TraceContextExt; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::{mpsc, oneshot, watch}; use tracing::{Instrument, debug_span, info, warn}; use tracing_opentelemetry::OpenTelemetrySpanExt; use uuid::Uuid; @@ -489,6 +489,61 @@ impl Coordinator { Command::FrontendStatementLogging(event) => { self.handle_frontend_statement_logging_event(event); } + Command::RegisterConnectionCancelWatch { conn_id, tx } => { + let rx = self + .connection_cancel_watches + .entry(conn_id) + .or_insert_with(|| watch::channel(false)) + .1 + .clone(); + let _ = tx.send(rx); + } + Command::UnregisterConnectionCancelWatch { conn_id } => { + self.connection_cancel_watches.remove(&conn_id); + } + Command::CreateInternalSubscribe { + df_desc, + cluster_id, + replica_id, + depends_on, + as_of, + arity, + sink_id, + conn_id, + session_uuid, + start_time, + read_holds, + tx, + } => { + self.handle_create_internal_subscribe( + *df_desc, + cluster_id, + replica_id, + depends_on, + as_of, + arity, + sink_id, + conn_id, + session_uuid, + start_time, + read_holds, + tx, + ) + .await; + } + Command::AttemptTimestampedWrite { + target_id, + diffs, + write_ts, + tx, + } => { + self.handle_attempt_timestamped_write(target_id, diffs, write_ts, tx) + .await; + } + Command::DropInternalSubscribe { sink_id, tx } => { + self.drop_internal_subscribe(sink_id).await; + let _ = tx.send(()); + } } } .instrument(debug_span!("handle_command")) @@ -761,6 +816,9 @@ impl Coordinator { persist_client: self.persist_client.clone(), statement_logging_frontend, superuser_attribute, + occ_write_semaphore: Arc::clone(&self.occ_write_semaphore), + frontend_read_then_write_enabled: self.frontend_read_then_write_enabled, + read_only: self.controller.read_only(), }); if tx.send(resp).is_err() { // Failed to send to adapter, but everything is setup so we can terminate @@ -1752,7 +1810,7 @@ impl Coordinator { self.cancel_cluster_reconfigurations_for_conn(&conn_id) .await; self.cancel_pending_copy(&conn_id); - if let Some((tx, _rx)) = self.staged_cancellation.get_mut(&conn_id) { + if let Some((tx, _rx)) = self.connection_cancel_watches.get_mut(&conn_id) { let _ = tx.send(true); } } diff --git a/src/adapter/src/coord/ddl.rs b/src/adapter/src/coord/ddl.rs index 480d123fe826e..8bb21e4f88a3f 100644 --- a/src/adapter/src/coord/ddl.rs +++ b/src/adapter/src/coord/ddl.rs @@ -641,7 +641,11 @@ impl Coordinator { for sink_id in sink_ids { let sink = match self.remove_active_compute_sink(sink_id).await { None => { - tracing::error!(%sink_id, "drop_compute_sinks called on nonexistent sink"); + // This can happen due to a race condition: an internal + // subscribe may be cleaned up via its own message while + // session disconnect cleanup is in progress. This is + // benign. 
+ tracing::debug!(%sink_id, "drop_compute_sinks: sink already removed"); continue; } Some(sink) => sink, diff --git a/src/adapter/src/coord/read_then_write.rs b/src/adapter/src/coord/read_then_write.rs new file mode 100644 index 0000000000000..511afa266dbd8 --- /dev/null +++ b/src/adapter/src/coord/read_then_write.rs @@ -0,0 +1,162 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//! Coordinator-side support machinery for frontend occ-based read-then write +//! sequencing in `frontend_read_then_write.rs`. +//! +//! N.B. It's a bit annoying that we still have the write submission go through +//! the coordinator. We can imagine in the long run we want a group-commit task +//! that runs independently, and we can directly submit write requests there. + +use std::collections::BTreeSet; + +use mz_repr::{Diff, GlobalId, Row, Timestamp}; +use mz_sql::plan::SubscribeOutput; +use tokio::sync::mpsc; + +use crate::PeekResponseUnary; +use crate::active_compute_sink::{ActiveComputeSink, ActiveSubscribe}; +use crate::coord::Coordinator; +use crate::coord::appends::TimestampedWriteResult; +use crate::error::AdapterError; + +impl Coordinator { + /// Handle a Command to create an internal subscribe. + /// + /// Internal subscribes are not visible in introspection collections. They + /// are initially used for frontend-sequenced read-then-write + /// (DELETE/UPDATE/INSERT ...SELECT) via OCC. + /// + /// This is called from the frontend OCC implementation after it has + /// acquired the semaphore permit. We create the subscribe here (on the + /// coordinator) and return the channel to the caller. + /// + /// The `read_holds` parameter contains the read holds for this specific + /// operation. They are passed directly through the stages (not via the + /// connection-keyed txn_read_holds map) to avoid issues where multiple + /// operations on the same connection could interfere with each other's + /// holds. + #[allow(clippy::too_many_arguments)] + pub(crate) async fn handle_create_internal_subscribe( + &mut self, + df_desc: crate::optimize::LirDataflowDescription, + cluster_id: mz_compute_types::ComputeInstanceId, + replica_id: Option, + depends_on: BTreeSet, + as_of: Timestamp, + arity: usize, + sink_id: GlobalId, + conn_id: mz_adapter_types::connection::ConnectionId, + session_uuid: uuid::Uuid, + start_time: mz_ore::now::EpochMillis, + read_holds: crate::ReadHolds, + response_tx: tokio::sync::oneshot::Sender< + Result, AdapterError>, + >, + ) { + // Check if connection still exists. If not, the client disconnected + // while waiting for the semaphore - the operation is effectively cancelled. 
+ if !self.active_conns.contains_key(&conn_id) { + // Send error to indicate cancellation + let _ = response_tx.send(Err(AdapterError::Canceled)); + return; + } + + // Create the channel for subscribe responses + let (tx, rx) = mpsc::unbounded_channel(); + + let active_subscribe = ActiveSubscribe { + conn_id: conn_id.clone(), + session_uuid, + channel: tx, + emit_progress: true, // We need progress updates for OCC + as_of, + arity, + cluster_id, + depends_on, + start_time, + output: SubscribeOutput::Diffs, // Output format for diffs + internal: true, // Internal subscribe - skip builtin table updates + }; + active_subscribe.initialize(); + + // Add metadata for the subscribe + let write_notify_fut = self + .add_active_compute_sink(sink_id, ActiveComputeSink::Subscribe(active_subscribe)) + .await; + + // Ship dataflow + let ship_dataflow_fut = self.ship_dataflow(df_desc, cluster_id, replica_id); + + let ((), ()) = futures::future::join(write_notify_fut, ship_dataflow_fut).await; + + // Send the receiver back to the frontend + let _ = response_tx.send(Ok(rx)); + + // The read_holds are dropped here at the end of the function, after ship_dataflow + // has completed. This ensures the since doesn't advance past our as_of timestamp + // until the dataflow is running. + drop(read_holds); + } + + /// Handle the write attempt from the OCC loop. + pub(crate) async fn handle_attempt_timestamped_write( + &mut self, + target_id: mz_repr::CatalogItemId, + diffs: Vec<(Row, Diff)>, + write_ts: Timestamp, + result_tx: tokio::sync::oneshot::Sender, + ) { + use crate::coord::appends::PendingWriteTxn; + use mz_storage_client::client::TableData; + use smallvec::smallvec; + use std::collections::BTreeMap; + use tracing::Span; + + if self.controller.read_only() { + panic!( + "attempting OCC read-then-write in read-only mode: write_ts={}, target_id={:?}", + write_ts, target_id + ); + } + + // Early check if timestamp already passed + let next_eligible_write_ts = self.peek_local_write_ts().await.step_forward(); + if write_ts < next_eligible_write_ts { + let _ = result_tx.send(TimestampedWriteResult::TimestampPassed { + target_timestamp: write_ts, + current_write_ts: next_eligible_write_ts, + }); + return; + } + + // Create TableData from accumulated diffs + let table_data = TableData::Rows(diffs); + let writes = BTreeMap::from([(target_id, smallvec![table_data])]); + + tracing::trace!(?writes, ?write_ts, "about to attempt read-then-write"); + + // Push internal timestamped write directly to pending_writes + self.pending_writes + .push(PendingWriteTxn::InternalTimestamped { + span: Span::current(), + writes, + target_timestamp: write_ts, + result_tx, + }); + self.trigger_group_commit(); + } + + /// Drop an internal subscribe. + pub(crate) async fn drop_internal_subscribe(&mut self, sink_id: GlobalId) { + // Use drop_compute_sink instead of remove_active_compute_sink to also + // cancel the dataflow on the compute side, not just remove bookkeeping. + let _ = self.drop_compute_sink(sink_id).await; + } +} diff --git a/src/adapter/src/coord/sequencer.rs b/src/adapter/src/coord/sequencer.rs index 7753afb5db5ca..b252446a446c6 100644 --- a/src/adapter/src/coord/sequencer.rs +++ b/src/adapter/src/coord/sequencer.rs @@ -96,6 +96,8 @@ use crate::util::ClientTransmitter; mod inner; +pub(crate) use inner::validate_read_dependencies; + impl Coordinator { /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 34KB. 
This would /// get stored on the stack which is bad for runtime performance, and blow up our stack usage. @@ -884,6 +886,13 @@ impl Coordinator { // Consolidate rows. This is useful e.g. for an UPDATE where the row // doesn't change, and we need to reflect that in the number of // affected rows. + // + // NOTE: This behavior differs from PostgreSQL. In PostgreSQL, + // `UPDATE t SET x = x` reports the number of rows matching the WHERE + // clause, even if no values actually change. In Materialize, because + // we use differential dataflow, the +1 and -1 diffs for unchanged rows + // cancel out during consolidation, resulting in 0 affected rows. + // This has been Materialize's behavior since early versions. differential_dataflow::consolidation::consolidate(&mut plan.updates); affected_rows = Diff::ZERO; diff --git a/src/adapter/src/coord/sequencer/inner.rs b/src/adapter/src/coord/sequencer/inner.rs index 873fd46ed87e5..23690170ba434 100644 --- a/src/adapter/src/coord/sequencer/inner.rs +++ b/src/adapter/src/coord/sequencer/inner.rs @@ -143,6 +143,94 @@ macro_rules! return_if_err { pub(super) use return_if_err; +/// Validates that all dependencies are valid for read-then-write operations. +/// +/// Ensures all objects the selection depends on are valid for `ReadThenWrite` operations: +/// +/// - They do not refer to any objects whose notion of time moves differently than that of +/// user tables. This limitation is meant to ensure no writes occur between this read and the +/// subsequent write. +/// - They do not use mz_now(), whose time produced during read will differ from the write +/// timestamp. +pub(crate) fn validate_read_dependencies( + catalog: &Catalog, + id: &CatalogItemId, +) -> Result<(), AdapterError> { + use CatalogItemType::*; + use mz_catalog::memory::objects; + let mut ids_to_check = Vec::new(); + let valid = match catalog.try_get_entry(id) { + Some(entry) => { + if let CatalogItem::View(objects::View { optimized_expr, .. }) + | CatalogItem::MaterializedView(objects::MaterializedView { + optimized_expr, .. + }) = entry.item() + { + if optimized_expr.contains_temporal() { + return Err(AdapterError::Unsupported( + "calls to mz_now in write statements", + )); + } + } + match entry.item().typ() { + typ @ (Func | View | MaterializedView | ContinualTask) => { + ids_to_check.extend(entry.uses()); + let valid_id = id.is_user() || matches!(typ, Func); + valid_id + } + Source | Secret | Connection => false, + // Cannot select from sinks or indexes. + Sink | Index => unreachable!(), + Table => { + if !id.is_user() { + // We can't read from non-user tables + false + } else { + // We can't read from tables that are source-exports + entry.source_export_details().is_none() + } + } + Type => true, + } + } + None => false, + }; + if !valid { + let (object_name, object_type) = match catalog.try_get_entry(id) { + Some(entry) => { + let object_name = catalog.resolve_full_name(entry.name(), None).to_string(); + let object_type = match entry.item().typ() { + // We only need the disallowed types here; the allowed types are handled above. 
+ Source => "source", + Secret => "secret", + Connection => "connection", + Table => { + if !id.is_user() { + "system table" + } else { + "source-export table" + } + } + View => "system view", + MaterializedView => "system materialized view", + ContinualTask => "system task", + _ => "invalid dependency", + }; + (object_name, object_type.to_string()) + } + None => (id.to_string(), "unknown".to_string()), + }; + return Err(AdapterError::InvalidTableMutationSelection { + object_name, + object_type, + }); + } + for id in ids_to_check { + validate_read_dependencies(catalog, &id)?; + } + Ok(()) +} + struct DropOps { ops: Vec, dropped_active_db: bool, @@ -158,10 +246,16 @@ struct CreateSourceInner { } impl Coordinator { - /// Sequences the next staged of a [Staged] plan. This is designed for use with plans that - /// execute both on and off of the coordinator thread. Stages can either produce another stage - /// to execute or a final response. An explicit [Span] is passed to allow for convenient - /// tracing. + /// Sequences a [Staged] plan. + /// + /// This is designed for plans that execute both on and off the coordinator + /// thread. Stages can either produce another stage to execute or a final + /// response. + /// + /// While a stage is cancelable, this method installs a + /// connection-scoped cancel watch entry so background work can observe + /// cancellation requests consistently with other frontend/coordinator + /// sequencing paths. pub(crate) async fn sequence_staged( &mut self, mut ctx: S::Ctx, @@ -179,7 +273,7 @@ impl Coordinator { // Channel to await cancellation. Insert a new channel, but check if the previous one // was already canceled. if let Some((_prev_tx, prev_rx)) = self - .staged_cancellation + .connection_cancel_watches .insert(session.conn_id().clone(), watch::channel(false)) { let was_canceled = *prev_rx.borrow(); @@ -191,7 +285,7 @@ impl Coordinator { } else { // If no cancel allowed, remove it so handle_spawn doesn't observe any previous value // when cancel_enabled may have been true on an earlier stage. - self.staged_cancellation.remove(session.conn_id()); + self.connection_cancel_watches.remove(session.conn_id()); } } else { cancel_enabled = false @@ -224,6 +318,8 @@ impl Coordinator { } } + /// Waits for either the spawned stage work to complete or cancellation to + /// be signaled through the connection-scoped cancel watch. fn handle_spawn( &self, ctx: C, @@ -237,7 +333,7 @@ impl Coordinator { { let rx: BoxFuture<()> = if let Some((_tx, rx)) = ctx .session() - .and_then(|session| self.staged_cancellation.get(session.conn_id())) + .and_then(|session| self.connection_cancel_watches.get(session.conn_id())) { let mut rx = rx.clone(); Box::pin(async move { @@ -2682,93 +2778,7 @@ impl Coordinator { return; } - // Ensure all objects `selection` depends on are valid for `ReadThenWrite` operations: - // - // - They do not refer to any objects whose notion of time moves differently than that of - // user tables. This limitation is meant to ensure no writes occur between this read and the - // subsequent write. - // - They do not use mz_now(), whose time produced during read will differ from the write - // timestamp. - fn validate_read_dependencies( - catalog: &Catalog, - id: &CatalogItemId, - ) -> Result<(), AdapterError> { - use CatalogItemType::*; - use mz_catalog::memory::objects; - let mut ids_to_check = Vec::new(); - let valid = match catalog.try_get_entry(id) { - Some(entry) => { - if let CatalogItem::View(objects::View { optimized_expr, .. 
}) - | CatalogItem::MaterializedView(objects::MaterializedView { - optimized_expr, - .. - }) = entry.item() - { - if optimized_expr.contains_temporal() { - return Err(AdapterError::Unsupported( - "calls to mz_now in write statements", - )); - } - } - match entry.item().typ() { - typ @ (Func | View | MaterializedView | ContinualTask) => { - ids_to_check.extend(entry.uses()); - let valid_id = id.is_user() || matches!(typ, Func); - valid_id - } - Source | Secret | Connection => false, - // Cannot select from sinks or indexes. - Sink | Index => unreachable!(), - Table => { - if !id.is_user() { - // We can't read from non-user tables - false - } else { - // We can't read from tables that are source-exports - entry.source_export_details().is_none() - } - } - Type => true, - } - } - None => false, - }; - if !valid { - let (object_name, object_type) = match catalog.try_get_entry(id) { - Some(entry) => { - let object_name = catalog.resolve_full_name(entry.name(), None).to_string(); - let object_type = match entry.item().typ() { - // We only need the disallowed types here; the allowed types are handled above. - Source => "source", - Secret => "secret", - Connection => "connection", - Table => { - if !id.is_user() { - "system table" - } else { - "source-export table" - } - } - View => "system view", - MaterializedView => "system materialized view", - ContinualTask => "system task", - _ => "invalid dependency", - }; - (object_name, object_type.to_string()) - } - None => (id.to_string(), "unknown".to_string()), - }; - return Err(AdapterError::InvalidTableMutationSelection { - object_name, - object_type, - }); - } - for id in ids_to_check { - validate_read_dependencies(catalog, &id)?; - } - Ok(()) - } - + // Ensure all objects `selection` depends on are valid for `ReadThenWrite` operations. for gid in selection.depends_on() { let item_id = self.catalog().resolve_item_id(&gid); if let Err(err) = validate_read_dependencies(self.catalog(), &item_id) { diff --git a/src/adapter/src/coord/sequencer/inner/subscribe.rs b/src/adapter/src/coord/sequencer/inner/subscribe.rs index 429ee32b14067..0a392832ba2dd 100644 --- a/src/adapter/src/coord/sequencer/inner/subscribe.rs +++ b/src/adapter/src/coord/sequencer/inner/subscribe.rs @@ -456,6 +456,8 @@ impl Coordinator { depends_on: dependency_ids, start_time: self.now(), output, + internal: false, // User-facing subscribe - appears in + // mz_subscriptions. }; active_subscribe.initialize(); diff --git a/src/adapter/src/coord/sql.rs b/src/adapter/src/coord/sql.rs index feec9e1384e30..0ab2720bf2fae 100644 --- a/src/adapter/src/coord/sql.rs +++ b/src/adapter/src/coord/sql.rs @@ -216,7 +216,7 @@ impl Coordinator { /// Clears coordinator state for a connection. 
pub(crate) async fn clear_connection(&mut self, conn_id: &ConnectionId) { - self.staged_cancellation.remove(conn_id); + self.connection_cancel_watches.remove(conn_id); self.retire_compute_sinks_for_conn(conn_id, ActiveComputeSinkRetireReason::Finished) .await; self.retire_cluster_reconfigurations_for_conn(conn_id).await; @@ -264,18 +264,27 @@ impl Coordinator { let ret_fut = match &active_sink { ActiveComputeSink::Subscribe(active_subscribe) => { - let update = - self.catalog() - .state() - .pack_subscribe_update(id, active_subscribe, Diff::ONE); - let update = self.catalog().state().resolve_builtin_table_update(update); + // Skip builtin table update for internal subscribes + if active_subscribe.internal { + #[allow(clippy::as_conversions)] + { + Box::pin(std::future::ready(())) as BuiltinTableAppendNotify + } + } else { + let update = self.catalog().state().pack_subscribe_update( + id, + active_subscribe, + Diff::ONE, + ); + let update = self.catalog().state().resolve_builtin_table_update(update); - self.metrics - .active_subscribes - .with_label_values(&[session_type]) - .inc(); + self.metrics + .active_subscribes + .with_label_values(&[session_type]) + .inc(); - self.builtin_table_update().execute(vec![update]).await.0 + self.builtin_table_update().execute(vec![update]).await.0 + } } ActiveComputeSink::CopyTo(_) => { self.metrics @@ -311,18 +320,21 @@ impl Coordinator { match &sink { ActiveComputeSink::Subscribe(active_subscribe) => { - let update = self.catalog().state().pack_subscribe_update( - id, - active_subscribe, - Diff::MINUS_ONE, - ); - let update = self.catalog().state().resolve_builtin_table_update(update); - self.builtin_table_update().blocking(vec![update]).await; + // Skip builtin table update for internal subscribes + if !active_subscribe.internal { + let update = self.catalog().state().pack_subscribe_update( + id, + active_subscribe, + Diff::MINUS_ONE, + ); + let update = self.catalog().state().resolve_builtin_table_update(update); + self.builtin_table_update().blocking(vec![update]).await; - self.metrics - .active_subscribes - .with_label_values(&[session_type]) - .dec(); + self.metrics + .active_subscribes + .with_label_values(&[session_type]) + .dec(); + } } ActiveComputeSink::CopyTo(_) => { self.metrics diff --git a/src/adapter/src/frontend_peek.rs b/src/adapter/src/frontend_peek.rs index 6f3e2e9f78ee5..98f5a39dfcd89 100644 --- a/src/adapter/src/frontend_peek.rs +++ b/src/adapter/src/frontend_peek.rs @@ -192,33 +192,14 @@ impl PeekClient { // Set up statement logging, and log the beginning of execution. // (But only if we're not executing in the context of another statement.) - let statement_logging_id = if outer_ctx_extra.is_none() { - // This is a new statement, so begin statement logging - let result = self.statement_logging_frontend.begin_statement_execution( - session, - ¶ms, - &logging, - catalog.system_config(), - lifecycle_timestamps, - ); - - if let Some((logging_id, began_execution, mseh_update, prepared_statement)) = result { - self.log_began_execution(began_execution, mseh_update, prepared_statement); - Some(logging_id) - } else { - None - } - } else { - // We're executing in the context of another statement (e.g., FETCH), - // so extract the statement logging ID from the outer context if present. - // We take ownership and retire the outer context here. The end of execution will be - // logged in one of the following ways: - // - At the end of this function, if the execution is finished by then. 
- // - Later by the Coordinator, either due to RegisterFrontendPeek or ExecuteSlowPathPeek. - outer_ctx_extra - .take() - .and_then(|guard| guard.defuse().retire()) - }; + let statement_logging_id = self.begin_statement_logging( + session, + ¶ms, + &logging, + &catalog, + lifecycle_timestamps, + outer_ctx_extra, + ); let result = self .try_frontend_peek_inner(session, catalog, stmt, params, statement_logging_id) diff --git a/src/adapter/src/frontend_read_then_write.rs b/src/adapter/src/frontend_read_then_write.rs new file mode 100644 index 0000000000000..d57a072b996b1 --- /dev/null +++ b/src/adapter/src/frontend_read_then_write.rs @@ -0,0 +1,1085 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//! Frontend sequencing for read-then-write operations. +//! +//! This module implements INSERT [...] SELECT FROM [...], DELETE and UPDATE +//! operations using a subscribe with optimistic concurrency control (OCC), +//! sequenced from the session task rather than the Coordinator. This reduces +//! coordinator bottlenecking. +//! +//! The approach is: +//! 1. Validate and optimize MIR locally +//! 2. Determine timestamp via coordinator +//! 3. Optimize LIR locally +//! 4. Acquire OCC semaphore +//! 5. Create subscribe via Coordinator Command +//! 6. Run OCC loop (receive diffs, attempt write, retry on conflict) +//! 7. Return result +//! +//! ## Rollout note +//! +//! The `FRONTEND_READ_THEN_WRITE` dyncfg is read once at process startup and +//! fixed for the lifetime of the `environmentd` process. This avoids a +//! mixed-mode window where both the old lock-based coordinator path and this +//! OCC path are active concurrently — the old path acquires write locks to +//! prevent concurrent writes between its read and write phases, but this OCC +//! path does not use write locks, so concurrent operation of both paths could +//! allow an OCC write to slip between an old-path reader's read and write. 
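The numbered steps above can be illustrated with a minimal, self-contained sketch of the OCC loop shape. The types below (`DiffSource`, `WritePort`, `WriteAttempt`, and the toy implementations) are hypothetical stand-ins for the internal subscribe channel and the `AttemptTimestampedWrite` command, not the module's actual API:

```rust
// Illustrative sketch, not part of this patch: the shape of the OCC loop described
// above, with the subscribe channel and coordinator commands replaced by
// hypothetical stand-ins.

/// Hypothetical outcome of attempting a write at a chosen timestamp.
enum WriteAttempt {
    Success,
    /// Another writer got there first; re-read and retry.
    TimestampPassed { current_write_ts: u64 },
}

/// Hypothetical stand-in for the internal subscribe: yields the consolidated
/// diffs of the selection together with the frontier they are valid at.
trait DiffSource {
    fn next_consistent_state(&mut self) -> (Vec<(String, i64)>, u64);
}

/// Hypothetical stand-in for the timestamped-write command.
trait WritePort {
    fn attempt_write(&mut self, diffs: &[(String, i64)], write_ts: u64) -> WriteAttempt;
}

fn occ_loop(
    source: &mut dyn DiffSource,
    writer: &mut dyn WritePort,
    max_retries: usize,
) -> Result<usize, String> {
    let mut retries = 0;
    loop {
        // Read the current state of the selection as of an observed frontier.
        let (diffs, write_ts) = source.next_consistent_state();
        if diffs.is_empty() {
            // Nothing matched; no write is needed.
            return Ok(0);
        }
        // Try to apply the diffs at exactly that timestamp.
        match writer.attempt_write(&diffs, write_ts) {
            WriteAttempt::Success => return Ok(diffs.len()),
            WriteAttempt::TimestampPassed { .. } => {
                // Someone else wrote first: wait for fresher diffs and retry.
                retries += 1;
                if retries >= max_retries {
                    return Err("exceeded maximum OCC retries".into());
                }
            }
        }
    }
}

/// Toy source: pretends the frontier advances on every read.
struct ToySource {
    frontier: u64,
}

impl DiffSource for ToySource {
    fn next_consistent_state(&mut self) -> (Vec<(String, i64)>, u64) {
        self.frontier += 1;
        (vec![("hello".to_string(), 1)], self.frontier)
    }
}

/// Toy writer: rejects the first attempt to force one retry, then succeeds.
struct ToyWriter {
    attempts: usize,
}

impl WritePort for ToyWriter {
    fn attempt_write(&mut self, _diffs: &[(String, i64)], write_ts: u64) -> WriteAttempt {
        self.attempts += 1;
        if self.attempts == 1 {
            WriteAttempt::TimestampPassed {
                current_write_ts: write_ts + 1,
            }
        } else {
            WriteAttempt::Success
        }
    }
}

fn main() {
    let mut source = ToySource { frontier: 0 };
    let mut writer = ToyWriter { attempts: 0 };
    let written = occ_loop(&mut source, &mut writer, 10).expect("write eventually succeeds");
    assert_eq!(written, 1);
}
```

The real implementation below differs in the important details (it consolidates diffs on subscribe progress, enforces result-size and statement-timeout limits, and reports retries as a metric), but the retry structure is the same.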
+ +use std::collections::BTreeMap; +use std::collections::BTreeSet; +use std::num::{NonZeroI64, NonZeroUsize}; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use differential_dataflow::consolidation; +use itertools::Itertools; +use mz_cluster_client::ReplicaId; +use mz_compute_types::ComputeInstanceId; +use mz_expr::{CollectionPlan, Id, LocalId, MirRelationExpr, MirScalarExpr}; +use mz_ore::cast::CastFrom; +use mz_repr::optimize::OverrideFrom; +use mz_repr::{ + CatalogItemId, Diff, GlobalId, IntoRowIterator, RelationDesc, Row, RowArena, Timestamp, +}; +use mz_sql::plan::{self, MutationKind, Params, QueryWhen}; +use mz_sql::session::metadata::SessionMetadata; +use qcell::QCell; +use timely::progress::Antichain; +use tokio::sync::mpsc; +use uuid::Uuid; + +use crate::catalog::Catalog; +use crate::command::{Command, ExecuteResponse}; +use crate::coord::appends::TimestampedWriteResult; +use crate::coord::sequencer::validate_read_dependencies; +use crate::coord::{ExecuteContextGuard, TargetCluster}; +use crate::error::AdapterError; +use crate::optimize::Optimize; +use crate::optimize::dataflows::{ComputeInstanceSnapshot, EvalTime, ExprPrep, ExprPrepOneShot}; +use crate::session::{LifecycleTimestamps, Session, TransactionOps}; +use crate::statement_logging::{ + PreparedStatementLoggingInfo, StatementEndedExecutionReason, StatementLifecycleEvent, + StatementLoggingId, +}; +use crate::{PeekClient, PeekResponseUnary, TimelineContext, optimize}; + +/// A handle to an internal subscribe (not visible in introspection collections +/// like `mz_subscriptions`). A `Drop` impl ensures the subscribe's dataflow is +/// cleaned up when dropped. +pub(crate) struct SubscribeHandle { + rx: mpsc::UnboundedReceiver, + sink_id: GlobalId, + /// Wrapped in `Option` so we can move it out in `Drop`. + client: Option, +} + +impl SubscribeHandle { + /// Receive the next message from the subscribe, waiting if necessary. + pub async fn recv(&mut self) -> Option { + self.rx.recv().await + } + + /// Try to receive a message without waiting. + pub fn try_recv(&mut self) -> Result { + self.rx.try_recv() + } +} + +impl Drop for SubscribeHandle { + fn drop(&mut self) { + if let Some(client) = self.client.take() { + let sink_id = self.sink_id; + + // Fire-and-forget: if the coordinator is gone, the subscribe will + // be cleaned up when the process exits anyway. + let (tx, _rx) = tokio::sync::oneshot::channel(); + client.send(Command::DropInternalSubscribe { sink_id, tx }); + } + } +} + +impl PeekClient { + /// Execute a read-then-write operation using frontend sequencing. + /// + /// Called by session code when the frontend_read_then_write dyncfg is + /// enabled. + pub(crate) async fn frontend_read_then_write( + &mut self, + session: &mut Session, + plan: plan::ReadThenWritePlan, + target_cluster: TargetCluster, + params: &Params, + logging: &Arc>, + lifecycle_timestamps: Option, + outer_ctx_extra: &mut Option, + ) -> Result { + // Get catalog snapshot + let catalog = self.catalog_snapshot("frontend_read_then_write").await; + + // Set up statement logging. If this execution happens as part of an outer statement + // (e.g. EXECUTE/FETCH), reuse and retire that existing context. + let statement_logging_id = self.begin_statement_logging( + session, + params, + logging, + &catalog, + lifecycle_timestamps, + outer_ctx_extra, + ); + + // Reject mutations in read-only mode (e.g. during 0dt upgrades). 
+ if self.read_only { + let result = Err(AdapterError::ReadOnly); + if let Some(logging_id) = statement_logging_id { + self.log_ended_execution( + logging_id, + StatementEndedExecutionReason::Errored { + error: AdapterError::ReadOnly.to_string(), + }, + ); + } + return result; + } + + // From this point on, we need to ensure we log the end of execution on + // any exit path. We use an inner function to capture all the logic that + // may fail. + let result = self + .frontend_read_then_write_inner( + session, + plan, + target_cluster, + &catalog, + statement_logging_id, + ) + .await; + + // Log the end of execution + if let Some(logging_id) = statement_logging_id { + self.end_statement_logging(logging_id, &result); + } + + result + } + + /// Inner implementation of frontend_read_then_write that handles the actual + /// logic. + /// + /// This is separated to ensure statement logging end is always called from + /// the outer function. + async fn frontend_read_then_write_inner( + &mut self, + session: &mut Session, + mut plan: plan::ReadThenWritePlan, + target_cluster: TargetCluster, + catalog: &Arc, + statement_logging_id: Option, + ) -> Result { + // Validate the operation + let validation_result = + self.validate_read_then_write(catalog, session, &plan, target_cluster)?; + + let ValidationResult { + cluster_id, + replica_id, + timeline, + depends_on, + table_desc, + } = validation_result; + + // Log cluster selection + if let Some(logging_id) = statement_logging_id { + self.log_set_cluster(logging_id, cluster_id); + } + + // Read-then-write is rejected in explicit transaction blocks (checked + // in SessionClient::try_frontend_read_then_write), so we're always in + // an implicit (autocommit) transaction here. The actual data is written + // directly via AttemptTimestampedWrite / InternalTimestamped, bypassing + // session transaction ops. The empty Writes(vec![]) just marks this as + // a write transaction in the session state machine so auto-commit + // handles it correctly. This is safe because there's no ROLLBACK + // opportunity in an implicit transaction. + debug_assert!( + session.transaction().is_implicit(), + "read-then-write should be rejected in explicit transactions" + ); + session.add_transaction_ops(TransactionOps::Writes(vec![]))?; + + // Prepare expressions (resolve unmaterializable functions like + // current_user()) + let style = ExprPrepOneShot { + logical_time: EvalTime::NotAvailable, // We already errored out on mz_now above. 
+ session, + catalog_state: catalog.state(), + }; + for expr in plan + .assignments + .values_mut() + .chain(plan.returning.iter_mut()) + { + style.prep_scalar_expr(expr)?; + } + + // Optimize MIR + let (optimizer, global_mir_plan) = + self.optimize_mir_read_then_write(catalog, session, &plan, cluster_id)?; + + // Determine timestamp and acquire read holds + let oracle_read_ts = self.oracle_read_ts(&timeline).await?; + let bundle = &global_mir_plan.id_bundle(cluster_id); + let (determination, read_holds) = self + .frontend_determine_timestamp( + catalog.state(), + session, + bundle, + &QueryWhen::FreshestTableWrite, + cluster_id, + &timeline, + oracle_read_ts, + None, + ) + .await?; + + let as_of = determination.timestamp_context.timestamp_or_default(); + + // Optimize LIR + let global_lir_plan = + self.optimize_lir_read_then_write(optimizer, global_mir_plan, as_of)?; + + // Log optimization finished + if let Some(logging_id) = statement_logging_id { + self.log_lifecycle_event(logging_id, StatementLifecycleEvent::OptimizationFinished); + } + + let sink_id = global_lir_plan.sink_id(); + let target_id = plan.id; + let kind = plan.kind.clone(); + let returning = plan.returning.clone(); + + let (df_desc, _df_meta) = global_lir_plan.unapply(); + + let arity = df_desc + .sink_exports + .values() + .next() + .expect("has sink") + .from_desc + .arity(); + + let conn_id = session.conn_id().clone(); + let session_uuid = session.uuid(); + let start_time = (self.statement_logging_frontend.now)(); + let max_result_size = catalog.system_config().max_result_size(); + let max_occ_retries = usize::cast_from(catalog.system_config().max_occ_retries()); + let statement_timeout = *session.vars().statement_timeout(); + + // Linearize the read: wait for the oracle to advance past `as_of` + // before creating the subscribe or attempting any writes. This + // mirrors the old coordinator path's `strict_serializable_reads_tx` + // step and prevents writes at far-future timestamps (e.g. from a + // REFRESH MV with a far-future since) from bumping the oracle into + // the future. + self.ensure_read_linearized(&timeline, as_of).await?; + + // Acquire OCC semaphore permit to limit concurrent write operations + let _permit = Arc::clone(&self.occ_write_semaphore) + .acquire_owned() + .await + .expect("semaphore closed"); + + let subscribe_handle = self + .create_internal_subscribe( + Box::new(df_desc), + cluster_id, + replica_id, + depends_on.clone(), + as_of, + arity, + sink_id, + conn_id.clone(), + session_uuid, + start_time, + read_holds, + ) + .await?; + + let (retry_count, result) = self + .run_occ_loop( + subscribe_handle, + target_id, + kind, + returning, + max_result_size, + max_occ_retries, + table_desc, + statement_timeout, + ) + .await; + + self.coordinator_client() + .metrics() + .occ_retry_count + .observe(f64::from(u32::try_from(retry_count).unwrap_or(u32::MAX))); + + let result = result?; + + Ok(result) + } + + /// Validate a read-then-write operation. + fn validate_read_then_write( + &self, + catalog: &Arc, + session: &Session, + plan: &plan::ReadThenWritePlan, + target_cluster: TargetCluster, + ) -> Result { + // Disallow mz_now in any position because read time and write time differ. + let contains_temporal = plan.selection.contains_temporal()? 
+ || plan.assignments.values().any(|e| e.contains_temporal()) + || plan.returning.iter().any(|e| e.contains_temporal()); + if contains_temporal { + return Err(AdapterError::Unsupported( + "calls to mz_now in write statements", + )); + } + + // Validate read dependencies + for gid in plan.selection.depends_on() { + let item_id = catalog.resolve_item_id(&gid); + validate_read_dependencies(catalog, &item_id)?; + } + + let cluster = catalog.resolve_target_cluster(target_cluster, session)?; + let cluster_id = cluster.id; + + if cluster.replicas().next().is_none() { + return Err(AdapterError::NoClusterReplicasAvailable { + name: cluster.name.clone(), + is_managed: cluster.is_managed(), + }); + } + + let replica_id = session + .vars() + .cluster_replica() + .map(|name| { + cluster + .replica_id(name) + .ok_or(AdapterError::UnknownClusterReplica { + cluster_name: cluster.name.clone(), + replica_name: name.to_string(), + }) + }) + .transpose()?; + + // Determine timeline + let depends_on = plan.selection.depends_on(); + let timeline = catalog.validate_timeline_context(depends_on.iter().copied())?; + + // Get the table descriptor for constraint validation + let table_desc = catalog + .get_entry(&plan.id) + .relation_desc_latest() + .expect("table has desc") + .into_owned(); + + Ok(ValidationResult { + cluster_id, + replica_id, + timeline, + depends_on, + table_desc, + }) + } + + /// Optimize MIR for a read-then-write operation. + fn optimize_mir_read_then_write( + &self, + catalog: &Arc, + session: &dyn SessionMetadata, + plan: &plan::ReadThenWritePlan, + cluster_id: ComputeInstanceId, + ) -> Result< + ( + optimize::subscribe::Optimizer, + optimize::subscribe::GlobalMirPlan, + ), + AdapterError, + > { + let plan::ReadThenWritePlan { + id: _, + selection, + // The finishing (row ordering/limit/projection) from the plan is not used here. + // For the subscribe-based OCC path, we emit raw diffs and the mutation + // transformation (apply_mutation_to_mir) handles column projection for updates. + // The selection expression already produces rows matching the target table schema. + finishing: _, + assignments, + kind, + returning: _, + } = plan; + + // Lower HIR to MIR + let expr = selection.clone().lower(catalog.system_config(), None)?; + + // Transform for subscribe-based diffs + let mut expr = apply_mutation_to_mir(expr, kind, assignments); + + // Resolve unmaterializable functions (now(), current_user, etc.) + // before the subscribe optimizer sees them. The subscribe uses + // ExprPrepMaintained which rejects these, but our subscribe is a + // one-shot read so we can safely resolve them to constants. + // mz_now() is already rejected by validate_read_then_write(). 
+ let style = ExprPrepOneShot { + logical_time: EvalTime::NotAvailable, + session, + catalog_state: catalog.state(), + }; + expr.try_visit_scalars_mut(&mut |s| style.prep_scalar_expr(s))?; + + // Create optimizer + let compute_instance = ComputeInstanceSnapshot::new_without_collections(cluster_id); + let (_, view_id) = self.transient_id_gen.allocate_id(); + let (_, sink_id) = self.transient_id_gen.allocate_id(); + let debug_name = format!("frontend-read-then-write-subscribe-{}", sink_id); + let optimizer_config = optimize::OptimizerConfig::from(catalog.system_config()) + .override_from(&catalog.get_cluster(cluster_id).config.features()); + + let mut optimizer = optimize::subscribe::Optimizer::new( + Arc::::clone(catalog), + compute_instance, + view_id, + sink_id, + true, // with_snapshot + None, // up_to + debug_name, + optimizer_config, + self.optimizer_metrics.clone(), + ); + + // Get the relation desc for the expression + let expr_typ = expr.typ(); + let sql_typ = mz_repr::SqlRelationType::from_repr(&expr_typ); + let column_names: Vec = (0..sql_typ.column_types.len()) + .map(|i| format!("column{}", i)) + .collect(); + let relation_desc = RelationDesc::new(sql_typ, column_names.iter().map(|s| s.as_str())); + + // Create the subscribe from the query + let from = mz_sql::plan::SubscribeFrom::Query { + expr, + desc: relation_desc, + }; + + // MIR => MIR optimization (global) + let global_mir_plan = optimizer.optimize(from)?; + + Ok((optimizer, global_mir_plan)) + } + + /// Optimize LIR for a read-then-write operation. + fn optimize_lir_read_then_write( + &self, + mut optimizer: optimize::subscribe::Optimizer, + global_mir_plan: optimize::subscribe::GlobalMirPlan, + as_of: Timestamp, + ) -> Result { + let global_mir_plan = global_mir_plan.resolve(Antichain::from_elem(as_of)); + let global_lir_plan = optimizer.optimize(global_mir_plan)?; + Ok(global_lir_plan) + } + + /// Get the oracle read timestamp for the timeline. + async fn oracle_read_ts( + &mut self, + timeline: &TimelineContext, + ) -> Result, AdapterError> { + match timeline.timeline() { + Some(timeline) => { + let oracle = self.ensure_oracle(timeline.clone()).await?; + Ok(Some(oracle.read_ts().await)) + } + None => Ok(None), + } + } + + /// Ensure the oracle timestamp has advanced past the given timestamp. + /// + /// For read-then-write operations, when no write is submitted (because no + /// rows matched), we must still linearize the read to prevent subsequent + /// reads from seeing an earlier state. When a write WAS submitted, the + /// group commit bumps the oracle past `as_of`, so this returns immediately. + /// + /// This is analogous to the coordinator's `strict_serializable_reads_tx` + /// mechanism used by the regular SELECT/PEEK path, which waits for the + /// oracle to advance past the chosen read timestamp before returning + /// results. + async fn ensure_read_linearized( + &mut self, + timeline: &TimelineContext, + as_of: Timestamp, + ) -> Result<(), AdapterError> { + let tl = match timeline.timeline() { + Some(tl) => tl, + None => return Ok(()), + }; + + let oracle = self.ensure_oracle(tl.clone()).await?; + + loop { + let oracle_ts = oracle.read_ts().await; + if as_of <= oracle_ts { + return Ok(()); + } + + // Sleep for roughly the difference between as_of and the current + // oracle timestamp. Since timestamps are epoch milliseconds, the + // difference is the approximate wall-clock time we need to wait. + // Cap at 1s to avoid very long sleeps if clocks are skewed, + // matching the cap in `message_linearize_reads`. 
+ let wait_ms = u64::from(as_of.saturating_sub(oracle_ts)); + let wait = Duration::from_millis(wait_ms).min(Duration::from_secs(1)); + tokio::time::sleep(wait).await; + } + } + + /// Creates an internal subscribe that does not appear in introspection + /// tables. Returns a [`SubscribeHandle`] that ensures cleanup on drop. + async fn create_internal_subscribe( + &self, + df_desc: Box, + cluster_id: ComputeInstanceId, + replica_id: Option, + depends_on: BTreeSet, + as_of: Timestamp, + arity: usize, + sink_id: GlobalId, + conn_id: mz_adapter_types::connection::ConnectionId, + session_uuid: Uuid, + start_time: mz_ore::now::EpochMillis, + read_holds: crate::ReadHolds, + ) -> Result { + let rx: mpsc::UnboundedReceiver = self + .call_coordinator(|tx| Command::CreateInternalSubscribe { + df_desc, + cluster_id, + replica_id, + depends_on, + as_of, + arity, + sink_id, + conn_id, + session_uuid, + start_time, + read_holds, + tx, + }) + .await?; + + Ok(SubscribeHandle { + rx, + sink_id, + client: Some(self.coordinator_client().clone()), + }) + } + + /// Run the OCC loop that receives subscribe rows and tries to write them. + /// + /// NOTE on correctness: semantically, a read-then-write is a SELECT + /// followed by an INSERT. And normally we have to linearize reads, that is + /// we make sure that the oracle timestamp is at least the timestamp that is + /// chosen for a PEEK, to ensure that results can't "go backwards". With our + /// subscribe loop we might get data that is timestamped beyond the current + /// oracle timestamp BUT then actually applying the write will bump the + /// oracle read timestamp to the timestamp of the write so at that time it + /// will be true that `chosen_ts <= oracle_ts`. Returns `(retry_count, + /// write_submitted, result)` so the caller can record OCC retry metrics + /// regardless of whether the operation succeeded or failed. + async fn run_occ_loop( + &self, + mut subscribe_handle: SubscribeHandle, + target_id: CatalogItemId, + kind: MutationKind, + returning: Vec, + max_result_size: u64, + max_occ_retries: usize, + table_desc: RelationDesc, + statement_timeout: Duration, + ) -> (usize, Result) { + // Timeout of 0 is equivalent to "off", meaning we will wait "forever." + let effective_timeout = if statement_timeout == Duration::ZERO { + Duration::MAX + } else { + statement_timeout + }; + let start_time = Instant::now(); + + /// State for the OCC loop + struct OccState { + all_diffs: Vec<(Row, Timestamp, Diff)>, + current_upper: Option, + initial_progress_seen: bool, + retry_count: usize, + byte_size: u64, + } + + impl OccState { + fn new() -> Self { + Self { + all_diffs: Vec::new(), + current_upper: None, + initial_progress_seen: false, + retry_count: 0, + byte_size: 0, + } + } + + /// Forward all timestamps to `current_upper` and consolidate diffs. + /// + /// After consolidation, `all_diffs` represents the net state of the + /// query as of the latest progress timestamp. Rows that were + /// retracted by newer updates cancel out, and `byte_size` is + /// recomputed to reflect the consolidated data. 
+ fn consolidate(&mut self) { + let target_ts = self.current_upper.unwrap_or(Timestamp::MIN); + for (_, ts, _) in self.all_diffs.iter_mut() { + *ts = target_ts; + } + consolidation::consolidate_updates(&mut self.all_diffs); + self.byte_size = self + .all_diffs + .iter() + .map(|(row, _, _)| u64::cast_from(row.byte_len())) + .sum(); + } + } + + /// Result of processing a subscribe message + enum ProcessResult { + Continue { ready_to_write: bool }, + NoRowsMatched, + Error(AdapterError), + } + + fn process_message( + response: PeekResponseUnary, + state: &mut OccState, + max_result_size: u64, + table_desc: &RelationDesc, + ) -> ProcessResult { + match response { + PeekResponseUnary::Rows(mut rows) => { + let mut saw_progress = false; + + while let Some(row) = rows.next() { + let mut datums = row.iter(); + + // Extract mz_timestamp (SubscribeOutput::Diffs format: + // mz_timestamp, mz_progressed, mz_diff, ...data columns...) + let ts_datum = datums.next().expect("missing mz_timestamp"); + let ts = match ts_datum { + mz_repr::Datum::Numeric(n) => match n.0.try_into() { + Ok(ts_u64) => Timestamp::new(ts_u64), + Err(_) => { + return ProcessResult::Error(AdapterError::Internal(format!( + "mz_timestamp in subscribe output is not a valid u64: {n}" + ))); + } + }, + other => { + return ProcessResult::Error(AdapterError::Internal(format!( + "unexpected mz_timestamp datum: {other:?}" + ))); + } + }; + + // Extract mz_progressed + let progressed_datum = datums.next().expect("missing mz_progressed"); + let is_progress = matches!(progressed_datum, mz_repr::Datum::True); + + if is_progress { + state.current_upper = Some(ts); + saw_progress = true; + + // Consolidate incrementally on each progress + // message. This keeps memory bounded by the + // consolidated size and makes the byte_size check + // below accurate (except for rows received between + // two progress messages, which is a small window). 
+ state.consolidate(); + + if !state.initial_progress_seen { + state.initial_progress_seen = true; + } else if state.all_diffs.is_empty() { + return ProcessResult::NoRowsMatched; + } + } else { + // Data row - extract mz_diff + let diff_datum = datums.next().expect("missing mz_diff"); + let diff = match diff_datum { + mz_repr::Datum::Int64(d) => Diff::from(d), + _ => continue, + }; + + let data_row = Row::pack(datums); + + // Validate constraints for rows being added (positive diff) + if diff.is_positive() { + for (idx, datum) in data_row.iter().enumerate() { + if let Err(e) = table_desc.constraints_met(idx, &datum) { + return ProcessResult::Error(e.into()); + } + } + } + + state.byte_size = state + .byte_size + .saturating_add(u64::cast_from(data_row.byte_len())); + if state.byte_size > max_result_size { + return ProcessResult::Error(AdapterError::ResultSize(format!( + "result exceeds max size of {}", + max_result_size + ))); + } + state.all_diffs.push((data_row, ts, diff)); + } + } + + let ready_to_write = + saw_progress && state.initial_progress_seen && !state.all_diffs.is_empty(); + ProcessResult::Continue { ready_to_write } + } + PeekResponseUnary::Error(e) => ProcessResult::Error(AdapterError::Internal(e)), + PeekResponseUnary::Canceled => ProcessResult::Error(AdapterError::Canceled), + } + } + + let build_no_rows_response = |kind: &MutationKind, returning: &[MirScalarExpr]| { + if !returning.is_empty() { + let rows: Vec = vec![]; + return Ok(ExecuteResponse::SendingRowsImmediate { + rows: Box::new(rows.into_row_iter()), + }); + } + Ok(match kind { + MutationKind::Delete => ExecuteResponse::Deleted(0), + MutationKind::Update => ExecuteResponse::Updated(0), + MutationKind::Insert => ExecuteResponse::Inserted(0), + }) + }; + + let mut state = OccState::new(); + + // Correctness invariant for retries: + // + // `all_diffs` accumulates *all* rows ever received from the subscribe, + // across retries. The subscribe emits a snapshot (at the as_of + // timestamp) followed by incremental updates. We consolidate on every + // progress message (flattening timestamps to MIN first), so after + // consolidation `all_diffs` always represents "what the query returns + // as of the latest progress timestamp" — old snapshot rows that were + // retracted by newer updates cancel out, and new rows appear. This is + // exactly the set of diffs we want to write. + // + // Consolidating on every progress also means the NoRowsMatched check + // works correctly across retries: if the consolidated result becomes + // logically empty (all diffs cancel out), `all_diffs` will be empty + // and we early-return without attempting a write. + let result = loop { + // Check for timeout + let remaining = effective_timeout.saturating_sub(start_time.elapsed()); + if remaining.is_zero() { + // Guard handles cleanup on drop. + break Err(AdapterError::StatementTimeout); + } + + let msg = match tokio::time::timeout(remaining, subscribe_handle.recv()).await { + Ok(Some(msg)) => msg, + Ok(None) => { + // Channel closed — the subscribe was dropped, most likely + // because the connection was canceled. 
+ if state.initial_progress_seen && state.all_diffs.is_empty() { + break build_no_rows_response(&kind, &returning); + } + break Err(AdapterError::Canceled); + } + Err(_) => { + // Timed out + break Err(AdapterError::StatementTimeout); + } + }; + + match process_message(msg, &mut state, max_result_size, &table_desc) { + ProcessResult::Continue { ready_to_write } => { + if !ready_to_write { + continue; + } + + // Drain pending messages before attempting write + let drain_err = loop { + match subscribe_handle.try_recv() { + Ok(msg) => { + match process_message(msg, &mut state, max_result_size, &table_desc) + { + ProcessResult::Continue { .. } => {} + ProcessResult::NoRowsMatched => { + break Some(build_no_rows_response(&kind, &returning)); + } + ProcessResult::Error(e) => { + break Some(Err(e)); + } + } + } + Err(mpsc::error::TryRecvError::Empty) => break None, + Err(mpsc::error::TryRecvError::Disconnected) => { + break Some(Err(AdapterError::Canceled)); + } + } + }; + if let Some(result) = drain_err { + break result; + } + + let write_ts = state + .current_upper + .expect("must have seen progress to be ready to write"); + + // Consolidate any rows received during the drain + // (the bulk was already consolidated on the last progress). + state.consolidate(); + + // Submit write + let result = self + .call_coordinator(|tx| Command::AttemptTimestampedWrite { + target_id, + diffs: state + .all_diffs + .iter() + .map(|(row, _ts, diff)| (row.clone(), *diff)) + .collect_vec(), + write_ts, + tx, + }) + .await; + + match result { + TimestampedWriteResult::Success { .. } => { + // N.B. subscribe_handle is dropped here, which + // fires off the cleanup message. + break self.build_success_response(kind, returning, &state.all_diffs); + } + TimestampedWriteResult::TimestampPassed { + current_write_ts, .. + } => { + // Do not advance `state.current_upper` (and + // therefore `write_ts`) from `current_write_ts`. + // The diffs in `all_diffs` are only known to be + // correct as of subscribe progress we have actually + // observed. Retrying at a newer oracle timestamp + // before subscribe progress catches up would risk + // applying stale diffs at the wrong timestamp. So + // on `TimestampPassed` we wait for the subscribe to + // progress and retry using that observed frontier. + state.retry_count += 1; + if state.retry_count >= max_occ_retries { + break Err(AdapterError::Internal( + "read-then-write exceeded maximum retry attempts (high contention?)".into(), + )); + } + tracing::debug!( + retry_count = state.retry_count, + write_ts = %write_ts, + current_write_ts = %current_write_ts, + "OCC write conflict, retrying" + ); + continue; + } + TimestampedWriteResult::Cancelled => { + break Err(AdapterError::Canceled); + } + } + } + ProcessResult::NoRowsMatched => { + break build_no_rows_response(&kind, &returning); + } + ProcessResult::Error(e) => { + break Err(e); + } + } + }; + + (state.retry_count, result) + } + + /// Build the success response after a successful write. + fn build_success_response( + &self, + kind: MutationKind, + returning: Vec, + all_diffs: &[(Row, Timestamp, Diff)], + ) -> Result { + if returning.is_empty() { + // No RETURNING clause: return row count. + // Sum the absolute value of all diffs. For UPDATE, each changed row + // produces a retraction (-1) and an insertion (+1), so we divide by 2. 
+ let row_count = all_diffs + .iter() + .map(|(_, _, diff)| diff.into_inner().unsigned_abs()) + .sum::(); + let row_count = + usize::try_from(row_count).expect("positive row count must fit in usize"); + + return Ok(match kind { + MutationKind::Delete => ExecuteResponse::Deleted(row_count), + MutationKind::Update => ExecuteResponse::Updated(row_count / 2), + MutationKind::Insert => ExecuteResponse::Inserted(row_count), + }); + } + + // RETURNING: evaluate expressions against rows + let mut returning_rows = Vec::new(); + let arena = RowArena::new(); + + for (row, _ts, diff) in all_diffs { + // For DELETE: evaluate against rows with negative diff + // For UPDATE: evaluate against rows with positive diff (new rows) + // For INSERT: evaluate against rows with positive diff + let include = match kind { + MutationKind::Delete => diff.is_negative(), + MutationKind::Update | MutationKind::Insert => diff.is_positive(), + }; + + if !include { + continue; + } + + let mut returning_row = Row::with_capacity(returning.len()); + let mut packer = returning_row.packer(); + let datums: Vec<_> = row.iter().collect(); + + for expr in &returning { + match expr.eval(&datums, &arena) { + Ok(datum) => packer.push(datum), + Err(err) => return Err(err.into()), + } + } + + let multiplicity = NonZeroUsize::try_from( + NonZeroI64::try_from(diff.into_inner().abs()).expect("diff is non-zero"), + ) + .map_err(AdapterError::from)?; + + returning_rows.push((returning_row, multiplicity)); + } + + let rows: Vec = returning_rows + .into_iter() + .flat_map(|(row, count)| std::iter::repeat(row).take(count.get())) + .collect(); + Ok(ExecuteResponse::SendingRowsImmediate { + rows: Box::new(rows.into_row_iter()), + }) + } +} + +/// Result of validating a read-then-write operation. +struct ValidationResult { + cluster_id: ComputeInstanceId, + replica_id: Option, + timeline: TimelineContext, + depends_on: BTreeSet, + /// The table descriptor, used for constraint validation. + table_desc: RelationDesc, +} + +/// Transform a MIR expression to produce the appropriate diffs for a mutation. +/// +/// - DELETE: Negates the expression to produce `(row, -1)` diffs +/// - UPDATE: Unions negated old rows with mapped new rows to produce both +/// `(old_row, -1)` and `(new_row, +1)` diffs +fn apply_mutation_to_mir( + expr: MirRelationExpr, + kind: &MutationKind, + assignments: &BTreeMap, +) -> MirRelationExpr { + match kind { + MutationKind::Delete => MirRelationExpr::Negate { + input: Box::new(expr), + }, + MutationKind::Update => { + let arity = expr.arity(); + + // Find a fresh LocalId that won't conflict with any in the expression. + // We scan all Let/LetRec bindings (the only MIR nodes that introduce LocalIds) + // and pick max + 1. + let mut max_id = 0_u64; + expr.visit_pre(|e| match e { + MirRelationExpr::Let { id, .. } => { + max_id = std::cmp::max(max_id, id.into()); + } + MirRelationExpr::LetRec { ids, .. 
} => { + for id in ids { + max_id = std::cmp::max(max_id, id.into()); + } + } + _ => {} + }); + let binding_id = LocalId::new(max_id + 1); + + let get_binding = MirRelationExpr::Get { + id: Id::Local(binding_id), + typ: expr.typ(), + access_strategy: mz_expr::AccessStrategy::UnknownOrLocal, + }; + + // Build map expressions + let map_scalars: Vec = (0..arity) + .map(|i| { + assignments + .get(&i) + .cloned() + .unwrap_or_else(|| MirScalarExpr::column(i)) + }) + .collect(); + + let new_rows = get_binding + .clone() + .map(map_scalars) + .project((arity..2 * arity).collect()); + + let old_rows = MirRelationExpr::Negate { + input: Box::new(get_binding), + }; + + let body = new_rows.union(old_rows); + + MirRelationExpr::Let { + id: binding_id, + value: Box::new(expr), + body: Box::new(body), + } + } + MutationKind::Insert => { + // INSERT: rows pass through unchanged with positive diffs. The + // subscribe will naturally emit each row with diff +1. + // + // Note: Unlike the old PEEK-based path, if the write fails and + // retries, the subscribe reflects the current state of the source + // expression. For INSERT...SELECT where the SELECT references the + // target table, a retry will see the effects of concurrent writes. + // This is correct OCC behavior (the retry operates on the latest + // state) but differs from the old snapshot-once semantics. + expr + } + } +} diff --git a/src/adapter/src/lib.rs b/src/adapter/src/lib.rs index f28cdf717e215..8d05f1972aaa3 100644 --- a/src/adapter/src/lib.rs +++ b/src/adapter/src/lib.rs @@ -44,6 +44,7 @@ mod coord; mod error; mod explain; mod frontend_peek; +mod frontend_read_then_write; mod notice; mod optimize; mod util; diff --git a/src/adapter/src/metrics.rs b/src/adapter/src/metrics.rs index 2f258410e9382..2290aca028e92 100644 --- a/src/adapter/src/metrics.rs +++ b/src/adapter/src/metrics.rs @@ -54,6 +54,7 @@ pub struct Metrics { pub apply_catalog_implications_seconds: Histogram, pub group_commit_confirm_leadership_seconds: Histogram, pub group_commit_table_advancement_seconds: Histogram, + pub occ_retry_count: Histogram, } impl Metrics { @@ -246,6 +247,11 @@ impl Metrics { name: "mz_group_commit_table_advancement_seconds", help: "The time it takes to iterate over all catalog entries to find tables during group commit.", buckets: histogram_seconds_buckets(0.001, 32.0), + )), + occ_retry_count: registry.register(metric!( + name: "mz_occ_read_then_write_retry_count", + help: "Number of OCC retries per read-then-write operation.", + buckets: vec![0., 1., 2., 3., 5., 10., 25., 50., 100., 500., 1000.], )) } } diff --git a/src/adapter/src/optimize.rs b/src/adapter/src/optimize.rs index 13315f45e2eed..c870bf5200bf1 100644 --- a/src/adapter/src/optimize.rs +++ b/src/adapter/src/optimize.rs @@ -86,8 +86,8 @@ use mz_transform::{MaybeShouldPanic, TransformCtx, TransformError}; /// by the optimizer implementations. type MirDataflowDescription = DataflowDescription; /// A type for a [`DataflowDescription`] backed by `Lir~` plans. Used internally -/// by the optimizer implementations. -type LirDataflowDescription = DataflowDescription; +/// by the optimizer implementations and for read-then-write subscribes. 
+pub type LirDataflowDescription = DataflowDescription; // Core API // -------- diff --git a/src/adapter/src/peek_client.rs b/src/adapter/src/peek_client.rs index 165a125ecb29a..9f1d9a3b5c956 100644 --- a/src/adapter/src/peek_client.rs +++ b/src/adapter/src/peek_client.rs @@ -29,20 +29,23 @@ use mz_timestamp_oracle::TimestampOracle; use prometheus::Histogram; use thiserror::Error; use timely::progress::Antichain; -use tokio::sync::oneshot; +use tokio::sync::{Semaphore, oneshot}; use uuid::Uuid; use crate::catalog::Catalog; -use crate::command::{CatalogSnapshot, Command}; -use crate::coord::Coordinator; +use crate::command::{CatalogSnapshot, Command, ExecuteResponse}; use crate::coord::peek::FastPathPlan; -use crate::statement_logging::WatchSetCreation; +use crate::coord::{Coordinator, ExecuteContextGuard}; +use crate::session::{LifecycleTimestamps, Session}; use crate::statement_logging::{ - FrontendStatementLoggingEvent, PreparedStatementEvent, StatementLoggingFrontend, - StatementLoggingId, + FrontendStatementLoggingEvent, PreparedStatementEvent, PreparedStatementLoggingInfo, + StatementLoggingFrontend, StatementLoggingId, WatchSetCreation, }; use crate::{AdapterError, Client, CollectionIdBundle, ReadHolds, statement_logging}; +use mz_sql::plan::Params; +use qcell::QCell; + /// Storage collections trait alias we need to consult for since/frontiers. pub type StorageCollectionsHandle = Arc< dyn mz_storage_client::storage_collections::StorageCollections @@ -69,6 +72,12 @@ pub struct PeekClient { persist_client: PersistClient, /// Statement logging state for frontend peek sequencing. pub statement_logging_frontend: StatementLoggingFrontend, + /// Semaphore for limiting concurrent OCC (optimistic concurrency control) write operations. + pub occ_write_semaphore: Arc, + /// Whether frontend OCC read-then-write is enabled (determined once at process startup). + pub frontend_read_then_write_enabled: bool, + /// Whether the coordinator is in read-only mode. Mutations must be rejected. + pub read_only: bool, } impl PeekClient { @@ -80,6 +89,9 @@ impl PeekClient { optimizer_metrics: OptimizerMetrics, persist_client: PersistClient, statement_logging_frontend: StatementLoggingFrontend, + occ_write_semaphore: Arc, + frontend_read_then_write_enabled: bool, + read_only: bool, ) -> Self { Self { coordinator_client, @@ -90,6 +102,9 @@ impl PeekClient { statement_logging_frontend, oracles: Default::default(), // lazily populated persist_client, + occ_write_semaphore, + frontend_read_then_write_enabled, + read_only, } } @@ -154,6 +169,12 @@ impl PeekClient { .expect("if the coordinator is still alive, it shouldn't have dropped our call") } + /// Returns a clone of the coordinator client, for use in cleanup guards + /// that need to send fire-and-forget commands. + pub(crate) fn coordinator_client(&self) -> &crate::Client { + &self.coordinator_client + } + /// Acquire read holds on the given compute/storage collections, and /// determine the smallest common valid write frontier among the specified collections. /// @@ -419,7 +440,59 @@ impl PeekClient { }) } - // Statement logging helper methods + /// Set up statement logging for a frontend-sequenced operation. + /// + /// If `outer_ctx_extra` is `None`, begins a new statement execution log + /// entry. If `outer_ctx_extra` is `Some` (e.g. EXECUTE/FETCH), reuses and + /// retires the existing logging context. + /// + /// Returns the logging ID if this statement is being logged, or `None` if + /// it was not sampled. 
+ pub(crate) fn begin_statement_logging( + &self, + session: &mut Session, + params: &Params, + logging: &Arc>, + catalog: &Catalog, + lifecycle_timestamps: Option, + outer_ctx_extra: &mut Option, + ) -> Option { + if outer_ctx_extra.is_none() { + let result = self.statement_logging_frontend.begin_statement_execution( + session, + params, + logging, + catalog.system_config(), + lifecycle_timestamps, + ); + + if let Some((logging_id, began_execution, mseh_update, prepared_statement)) = result { + self.log_began_execution(began_execution, mseh_update, prepared_statement); + Some(logging_id) + } else { + None + } + } else { + outer_ctx_extra + .take() + .and_then(|guard| guard.defuse().retire()) + } + } + + /// Log the end of a frontend-sequenced statement execution. + pub(crate) fn end_statement_logging( + &self, + logging_id: StatementLoggingId, + result: &Result, + ) { + let reason = match result { + Ok(resp) => resp.into(), + Err(e) => statement_logging::StatementEndedExecutionReason::Errored { + error: e.to_string(), + }, + }; + self.log_ended_execution(logging_id, reason); + } /// Log the beginning of statement execution. pub(crate) fn log_began_execution( diff --git a/src/environmentd/tests/server.rs b/src/environmentd/tests/server.rs index bf6e92e085e7a..8adbede644d27 100644 --- a/src/environmentd/tests/server.rs +++ b/src/environmentd/tests/server.rs @@ -1206,6 +1206,102 @@ fn test_cancel_long_running_query() { .expect("simple query succeeds after cancellation"); } +// Test that frontend-sequenced read-then-write statements honor pgwire cancel +// requests and do not run to completion after cancellation. +#[mz_ore::test] +fn test_cancel_frontend_read_then_write_long_running_query() { + let server = test_util::TestHarness::default() + .unsafe_mode() + .with_system_parameter_default( + "enable_adapter_frontend_occ_read_then_write".to_string(), + "true".to_string(), + ) + .start_blocking(); + server.enable_feature_flags(&["unsafe_enable_unsafe_functions"]); + + let mut client = server.connect(postgres::NoTls).unwrap(); + let cancel_token = client.cancel_token(); + + client + .batch_execute("CREATE TABLE t (a TEXT, ts INT)") + .unwrap(); + client + .batch_execute("INSERT INTO t VALUES ('hello', 10)") + .unwrap(); + + let (shutdown_tx, shutdown_rx) = std::sync::mpsc::channel(); + let cancel_thread = thread::spawn(move || { + loop { + thread::sleep(Duration::from_millis(200)); + match shutdown_rx.try_recv() { + Ok(()) => return, + Err(std::sync::mpsc::TryRecvError::Empty) => { + let _ = cancel_token.cancel_query(postgres::NoTls); + } + Err(std::sync::mpsc::TryRecvError::Disconnected) => return, + } + } + }); + + match client.batch_execute( + "INSERT INTO t SELECT a, CASE WHEN mz_unsafe.mz_sleep(ts) > 0 THEN 0 END AS ts FROM t", + ) { + Err(e) if e.code() == Some(&SqlState::QUERY_CANCELED) => {} + Err(e) => panic!("expected error SqlState::QUERY_CANCELED, but got {e:?}"), + Ok(_) => panic!("expected error SqlState::QUERY_CANCELED, but query succeeded"), + } + + shutdown_tx.send(()).unwrap(); + cancel_thread.join().unwrap(); + + let rows = client + .query_one("SELECT count(*) FROM t", &[]) + .unwrap() + .get::<_, i64>(0); + assert_eq!( + rows, 1, + "cancelled statement should not have committed writes" + ); + + // NOTE: mz_sleep with a constant ts get's evaluated differently. This gives + // us additional coverage for cancelling at different moments in the + // processing pipeline. 
+ let cancel_token = client.cancel_token(); + let (shutdown_tx, shutdown_rx) = std::sync::mpsc::channel(); + let cancel_thread = thread::spawn(move || { + loop { + thread::sleep(Duration::from_millis(200)); + match shutdown_rx.try_recv() { + Ok(()) => return, + Err(std::sync::mpsc::TryRecvError::Empty) => { + let _ = cancel_token.cancel_query(postgres::NoTls); + } + Err(std::sync::mpsc::TryRecvError::Disconnected) => return, + } + } + }); + + match client.batch_execute( + "INSERT INTO t SELECT a, CASE WHEN mz_unsafe.mz_sleep(10) > 0 THEN 0 END AS ts FROM t", + ) { + Err(e) if e.code() == Some(&SqlState::QUERY_CANCELED) => {} + Err(e) => panic!("expected error SqlState::QUERY_CANCELED, but got {e:?}"), + Ok(_) => panic!("expected error SqlState::QUERY_CANCELED, but query succeeded"), + } + + shutdown_tx.send(()).unwrap(); + cancel_thread.join().unwrap(); + + let rows = client + .query_one("SELECT count(*) FROM t", &[]) + .unwrap() + .get::<_, i64>(0); + assert_eq!( + rows, 1, + "cancelled statement should not have committed writes" + ); +} + fn test_cancellation_cancels_dataflows(query: &str) { // Query that returns how many dataflows are currently installed. // Accounts for the presence of introspection subscribe dataflows by ignoring those. @@ -3253,11 +3349,11 @@ fn test_github_20262() { } } -// Test that the server properly handles cancellation requests of read-then-write queries. +// Test that a timed-out read-then-write query does not commit its writes. // See database-issues#6134. #[mz_ore::test] #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `epoll_wait` on OS `linux` -fn test_cancel_read_then_write() { +fn test_timeout_read_then_write() { let server = test_util::TestHarness::default() .unsafe_mode() .start_blocking(); @@ -3267,53 +3363,27 @@ fn test_cancel_read_then_write() { client .batch_execute("CREATE TABLE foo (a TEXT, ts INT)") .unwrap(); + client + .batch_execute("INSERT INTO foo VALUES ('hello', 10)") + .unwrap(); - // Lots of races here, so try this whole thing in a loop. 
- Retry::default() - .clamp_backoff(Duration::ZERO) - .retry(|_state| { - let mut client1 = server.connect(postgres::NoTls).unwrap(); - let mut client2 = server.connect(postgres::NoTls).unwrap(); - let cancel_token = client2.cancel_token(); + client + .batch_execute("SET statement_timeout = '5s'") + .unwrap(); - client1.batch_execute("DELETE FROM foo").unwrap(); - client1.batch_execute("SET statement_timeout = '5s'").unwrap(); - client1 - .batch_execute("INSERT INTO foo VALUES ('hello', 10)") - .unwrap(); + let err = client + .batch_execute("INSERT INTO foo SELECT a, CASE WHEN mz_unsafe.mz_sleep(ts) > 0 THEN 0 END AS ts FROM foo") + .unwrap_err(); + assert_contains!(err.to_string_with_causes(), "statement timeout"); - let handle1 = thread::spawn(move || { - let err = client1 - .batch_execute("insert into foo select a, case when mz_unsafe.mz_sleep(ts) > 0 then 0 end as ts from foo") - .unwrap_err(); - assert_contains!( - err.to_string_with_causes(), - "statement timeout" - ); - client1 - }); - std::thread::sleep(Duration::from_millis(100)); - let handle2 = thread::spawn(move || { - let err = client2 - .batch_execute("insert into foo values ('blah', 1);") - .unwrap_err(); - assert_contains!( - err.to_string_with_causes(), - "canceling statement" - ); - }); - std::thread::sleep(Duration::from_millis(100)); - cancel_token.cancel_query(postgres::NoTls)?; - let mut client1 = handle1.join().unwrap(); - handle2.join().unwrap(); - let rows:i64 = client1.query_one ("SELECT count(*) FROM foo", &[]).unwrap().get(0); - // We ran 3 inserts. First succeeded. Second timedout. Third cancelled. - if rows !=1 { - anyhow::bail!("unexpected row count: {rows}"); - } - Ok::<_, anyhow::Error>(()) - }) - .unwrap(); + let rows: i64 = client + .query_one("SELECT count(*) FROM foo", &[]) + .unwrap() + .get(0); + assert_eq!( + rows, 1, + "timed-out statement should not have committed writes" + ); } #[mz_ore::test(tokio::test(flavor = "multi_thread", worker_threads = 1))] diff --git a/src/environmentd/tests/sql.rs b/src/environmentd/tests/sql.rs index 0d466e1596750..63eda50100a72 100644 --- a/src/environmentd/tests/sql.rs +++ b/src/environmentd/tests/sql.rs @@ -1680,8 +1680,12 @@ fn test_subscribe_outlive_cluster() { .batch_execute("CREATE CLUSTER newcluster REPLICAS (r1 (size 'scale=1,workers=1'))") .unwrap(); client2_cancel.cancel_query(postgres::NoTls).unwrap(); - client2 - .batch_execute("ROLLBACK; SET CLUSTER = default") + // The cancel is asynchronous and might race with subsequent commands. + // Retry ROLLBACK in a loop in case it gets canceled. + Retry::default() + .max_tries(5) + .clamp_backoff(Duration::from_millis(100)) + .retry(|_| client2.batch_execute("ROLLBACK; SET CLUSTER = default")) .unwrap(); assert_eq!( client2 diff --git a/src/sql/src/session/vars.rs b/src/sql/src/session/vars.rs index 19fb87d574db6..dc4f758fd1ef9 100644 --- a/src/sql/src/session/vars.rs +++ b/src/sql/src/session/vars.rs @@ -1104,6 +1104,8 @@ impl SystemVars { &MAX_RESULT_SIZE, &MAX_COPY_FROM_ROW_SIZE, &ALLOWED_CLUSTER_REPLICA_SIZES, + &MAX_CONCURRENT_OCC_WRITES, + &MAX_OCC_RETRIES, &upsert_rocksdb::UPSERT_ROCKSDB_COMPACTION_STYLE, &upsert_rocksdb::UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET, &upsert_rocksdb::UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES, @@ -1658,6 +1660,16 @@ impl SystemVars { .collect() } + /// Returns the value of the `max_concurrent_occ_writes` configuration parameter. 
+ pub fn max_concurrent_occ_writes(&self) -> u32 { + *self.expect_value(&MAX_CONCURRENT_OCC_WRITES) + } + + /// Returns the value of the `max_occ_retries` configuration parameter. + pub fn max_occ_retries(&self) -> u32 { + *self.expect_value(&MAX_OCC_RETRIES) + } + /// Returns the value of the `default_cluster_replication_factor` configuration parameter. pub fn default_cluster_replication_factor(&self) -> u32 { *self.expect_value::(&DEFAULT_CLUSTER_REPLICATION_FACTOR) diff --git a/src/sql/src/session/vars/definitions.rs b/src/sql/src/session/vars/definitions.rs index 7e8ce95c5fac7..3cc15b530e68d 100644 --- a/src/sql/src/session/vars/definitions.rs +++ b/src/sql/src/session/vars/definitions.rs @@ -633,6 +633,20 @@ pub static ALLOWED_CLUSTER_REPLICA_SIZES: VarDefinition = VarDefinition::new( true, ); +pub static MAX_CONCURRENT_OCC_WRITES: VarDefinition = VarDefinition::new( + "max_concurrent_occ_writes", + value!(u32; 4), + "Maximum number of concurrent read-then-write (DELETE/UPDATE) operations using OCC. Read at startup; changes require an environmentd restart (Materialize).", + false, +); + +pub static MAX_OCC_RETRIES: VarDefinition = VarDefinition::new( + "max_occ_retries", + value!(u32; 1000), + "Maximum number of OCC retry attempts per read-then-write operation before giving up (Materialize).", + false, +); + pub static PERSIST_FAST_PATH_LIMIT: VarDefinition = VarDefinition::new( "persist_fast_path_limit", value!(usize; 25),
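For reference, here is a minimal sketch of how a limit like `max_concurrent_occ_writes`, read once at startup, can bound concurrent OCC writes with a semaphore, in the spirit of the `occ_write_semaphore` added to `PeekClient`. The helper name and wiring below are hypothetical:

```rust
// Illustrative sketch, not part of this patch: bounding concurrent OCC writes
// with a semaphore sized once at startup.
use std::sync::Arc;

use tokio::sync::Semaphore;

/// Hypothetical helper: run one read-then-write while holding an OCC permit.
async fn run_with_occ_permit<T>(
    semaphore: Arc<Semaphore>,
    work: impl std::future::Future<Output = T>,
) -> T {
    // Each operation holds one permit for its whole OCC loop, so at most
    // `max_concurrent_occ_writes` operations contend for write timestamps at once.
    let _permit = semaphore
        .acquire_owned()
        .await
        .expect("semaphore is never closed");
    work.await
}

#[tokio::main]
async fn main() {
    // Hypothetical startup wiring: the limit is read once and then fixed,
    // like the vars and dyncfg introduced in this patch.
    let max_concurrent_occ_writes: u32 = 4;
    let semaphore = Arc::new(Semaphore::new(
        usize::try_from(max_concurrent_occ_writes).expect("u32 fits in usize"),
    ));

    let result = run_with_occ_permit(Arc::clone(&semaphore), async { 42 }).await;
    assert_eq!(result, 42);
}
```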