diff --git a/pg_migrations/0001_create_inflight_activations.sql b/pg_migrations/0001_create_inflight_activations.sql index ee8b26a4..d5cacb22 100644 --- a/pg_migrations/0001_create_inflight_activations.sql +++ b/pg_migrations/0001_create_inflight_activations.sql @@ -18,3 +18,5 @@ CREATE TABLE IF NOT EXISTS inflight_taskactivations ( taskname TEXT NOT NULL, on_attempts_exceeded INTEGER NOT NULL DEFAULT 1 ); + +CREATE INDEX IF NOT EXISTS idx_activation_partition ON inflight_taskactivations (partition); diff --git a/src/kafka/consumer.rs b/src/kafka/consumer.rs index 16d9b1e8..24ee265e 100644 --- a/src/kafka/consumer.rs +++ b/src/kafka/consumer.rs @@ -1,3 +1,4 @@ +use crate::store::inflight_activation::InflightActivationStore; use anyhow::{Error, anyhow}; use futures::{ Stream, StreamExt, @@ -44,6 +45,7 @@ use tracing::{debug, error, info, instrument, warn}; pub async fn start_consumer( topics: &[&str], kafka_client_config: &ClientConfig, + activation_store: Arc, spawn_actors: impl FnMut( Arc>, &BTreeSet<(String, i32)>, @@ -68,6 +70,7 @@ pub async fn start_consumer( handle_events( consumer, event_receiver, + activation_store, client_shutdown_sender, spawn_actors, ) @@ -340,6 +343,7 @@ enum ConsumerState { pub async fn handle_events( consumer: Arc>, events: UnboundedReceiver<(Event, SyncSender<()>)>, + activation_store: Arc, shutdown_client: oneshot::Sender<()>, mut spawn_actors: impl FnMut( Arc>, @@ -372,6 +376,12 @@ pub async fn handle_events( state = match (state, event) { (ConsumerState::Ready, Event::Assign(tpl)) => { metrics::gauge!("arroyo.consumer.current_partitions").set(tpl.len() as f64); + // Note: This assumes we only process one topic per consumer.
+ let mut partitions = Vec::::new(); + for (_, partition) in tpl.iter() { + partitions.push(*partition); + } + activation_store.assign_partitions(partitions).unwrap(); ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl) } (ConsumerState::Ready, Event::Revoke(_)) => { diff --git a/src/main.rs b/src/main.rs index 7970939d..744de735 100644 --- a/src/main.rs +++ b/src/main.rs @@ -162,6 +162,7 @@ async fn main() -> Result<(), Error> { start_consumer( &[&consumer_config.kafka_topic], &consumer_config.kafka_consumer_config(), + consumer_store.clone(), processing_strategy!({ err: OsStreamWriter::new( diff --git a/src/store/inflight_activation.rs b/src/store/inflight_activation.rs index 9de6bf2b..b76392c4 100644 --- a/src/store/inflight_activation.rs +++ b/src/store/inflight_activation.rs @@ -337,21 +337,13 @@ impl InflightActivationStoreConfig { #[async_trait] pub trait InflightActivationStore: Send + Sync { - /// Trigger incremental vacuum to reclaim free pages in the database - async fn vacuum_db(&self) -> Result<(), Error>; - - /// Perform a full vacuum on the database - async fn full_vacuum_db(&self) -> Result<(), Error>; - - /// Get the size of the database in bytes - async fn db_size(&self) -> Result; - - /// Get an activation by id - async fn get_by_id(&self, id: &str) -> Result, Error>; - + /// CONSUMER OPERATIONS /// Store a batch of activations async fn store(&self, batch: Vec) -> Result; + fn assign_partitions(&self, partitions: Vec) -> Result<(), Error>; + + /// SERVER OPERATIONS /// Get a single pending activation, optionally filtered by namespace async fn get_pending_activation( &self, @@ -385,6 +377,14 @@ pub trait InflightActivationStore: Send + Sync { limit: Option, ) -> Result, Error>; + /// Update the status of a specific activation + async fn set_status( + &self, + id: &str, + status: InflightActivationStatus, + ) -> Result, Error>; + + /// COUNT OPERATIONS /// Get the age of the oldest pending activation in seconds async fn 
pending_activation_max_lag(&self, now: &DateTime) -> f64; @@ -400,12 +400,9 @@ pub trait InflightActivationStore: Send + Sync { /// Count all activations async fn count(&self) -> Result; - /// Update the status of a specific activation - async fn set_status( - &self, - id: &str, - status: InflightActivationStatus, - ) -> Result, Error>; + /// ACTIVATION OPERATIONS + /// Get an activation by id + async fn get_by_id(&self, id: &str) -> Result, Error>; /// Set the processing deadline for a specific activation async fn set_processing_deadline( @@ -417,12 +414,20 @@ pub trait InflightActivationStore: Send + Sync { /// Delete an activation by id async fn delete_activation(&self, id: &str) -> Result<(), Error>; + /// DATABASE OPERATIONS + /// Trigger incremental vacuum to reclaim free pages in the database + async fn vacuum_db(&self) -> Result<(), Error>; + + /// Perform a full vacuum on the database + async fn full_vacuum_db(&self) -> Result<(), Error>; + + /// Get the size of the database in bytes + async fn db_size(&self) -> Result; + + /// UPKEEP OPERATIONS /// Get all activations with status Retry async fn get_retry_activations(&self) -> Result, Error>; - /// Clear all activations from the store - async fn clear(&self) -> Result<(), Error>; - /// Update tasks that exceeded their processing deadline async fn handle_processing_deadline(&self) -> Result; @@ -447,6 +452,10 @@ pub trait InflightActivationStore: Send + Sync { /// Remove killswitched tasks async fn remove_killswitched(&self, killswitched_tasks: Vec) -> Result; + /// TEST OPERATIONS + /// Clear all activations from the store + async fn clear(&self) -> Result<(), Error>; + /// Remove the database, used only in tests async fn remove_db(&self) -> Result<(), Error> { Ok(()) @@ -714,6 +723,11 @@ impl InflightActivationStore for SqliteActivationStore { Ok(Some(row.into())) } + fn assign_partitions(&self, partitions: Vec) -> Result<(), Error> { + warn!("assign_partitions: {:?}", partitions); + Ok(()) + } + 
#[instrument(skip_all)] async fn store(&self, batch: Vec) -> Result { if batch.is_empty() { diff --git a/src/store/inflight_activation_tests.rs b/src/store/inflight_activation_tests.rs index e14ba1e4..151b7f44 100644 --- a/src/store/inflight_activation_tests.rs +++ b/src/store/inflight_activation_tests.rs @@ -585,6 +585,105 @@ async fn test_set_activation_status(#[case] adapter: &str) { store.remove_db().await.unwrap(); } +#[tokio::test] +#[rstest] +#[case::postgres("postgres")] +async fn test_set_activation_status_with_partitions(#[case] adapter: &str) { + let store = create_test_store(adapter).await; + + let mut batch = make_activations(2); + batch[1].partition = 1; + assert!(store.store(batch).await.is_ok()); + assert_counts( + StatusCount { + pending: 1, + ..StatusCount::default() + }, + store.as_ref(), + ) + .await; + + assert!( + store + .set_status("id_0", InflightActivationStatus::Failure) + .await + .is_ok() + ); + assert_counts( + StatusCount { + failure: 1, + ..StatusCount::default() + }, + store.as_ref(), + ) + .await; + + assert!( + store + .set_status("id_0", InflightActivationStatus::Pending) + .await + .is_ok() + ); + assert_counts( + StatusCount { + pending: 1, + ..StatusCount::default() + }, + store.as_ref(), + ) + .await; + assert!( + store + .set_status("id_0", InflightActivationStatus::Failure) + .await + .is_ok() + ); + assert!( + store + .set_status("id_1", InflightActivationStatus::Failure) + .await + .is_ok() + ); + // The broker can update the status of an activation in a different partition, but + // it still should not be counted in its upkeep. 
+ assert_counts( + StatusCount { + pending: 0, + failure: 1, + ..StatusCount::default() + }, + store.as_ref(), + ) + .await; + assert!( + store + .get_pending_activation(None, None) + .await + .unwrap() + .is_none() + ); + + let result = store + .set_status("not_there", InflightActivationStatus::Complete) + .await; + assert!(result.is_ok(), "no query error"); + + let activation = result.unwrap(); + assert!(activation.is_none(), "no activation found"); + + let result = store + .set_status("id_0", InflightActivationStatus::Complete) + .await; + assert!(result.is_ok(), "no query error"); + + let result_opt = result.unwrap(); + assert!(result_opt.is_some(), "activation should be returned"); + let inflight = result_opt.unwrap(); + assert_eq!(inflight.id, "id_0"); + assert_eq!(inflight.status, InflightActivationStatus::Complete); + store.remove_db().await.unwrap(); +} + #[tokio::test] #[rstest] #[case::sqlite("sqlite")] diff --git a/src/store/postgres_activation_store.rs b/src/store/postgres_activation_store.rs index fdfc493b..0214761b 100644 --- a/src/store/postgres_activation_store.rs +++ b/src/store/postgres_activation_store.rs @@ -7,10 +7,11 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; use sentry_protos::taskbroker::v1::OnAttemptsExceeded; use sqlx::{ - Pool, Postgres, QueryBuilder, Row, + Pool, Postgres, QueryBuilder, pool::PoolConnection, - postgres::{PgConnectOptions, PgPool, PgPoolOptions, PgRow}, + postgres::{PgConnectOptions, PgPool, PgPoolOptions}, }; +use std::sync::RwLock; use std::{str::FromStr, time::Instant}; use tracing::instrument; @@ -70,6 +71,7 @@ pub struct PostgresActivationStore { read_pool: PgPool, write_pool: PgPool, config: PostgresActivationStoreConfig, + partitions: RwLock>, } impl PostgresActivationStore { @@ -118,8 +120,29 @@ impl PostgresActivationStore { read_pool, write_pool, config, + partitions: RwLock::new(vec![]), }) } + + /// Add the partition condition to the query builder in a thread-safe manner + fn 
add_partition_condition( + &self, + query_builder: &mut QueryBuilder, + first_condition: bool, + ) { + let partitions = self.partitions.read().unwrap(); + let condition = if first_condition { "WHERE" } else { "AND" }; + if !partitions.is_empty() { + query_builder.push(" "); + query_builder.push(condition); + query_builder.push(" partition IN ("); + let mut separated = query_builder.separated(", "); + for partition in partitions.iter() { + separated.push_bind(*partition); + } + query_builder.push(")"); + } + } } #[async_trait] @@ -187,6 +210,13 @@ impl InflightActivationStore for PostgresActivationStore { Ok(Some(row.into())) } + fn assign_partitions(&self, partitions: Vec) -> Result<(), Error> { + let mut write_guard = self.partitions.write().unwrap(); + write_guard.clear(); + write_guard.extend(partitions); + Ok(()) + } + #[instrument(skip_all)] async fn store(&self, batch: Vec) -> Result { if batch.is_empty() { @@ -275,6 +305,8 @@ impl InflightActivationStore for PostgresActivationStore { query_builder.push_bind(now); query_builder.push(")"); + self.add_partition_condition(&mut query_builder, false); + // Handle application & namespace filtering if let Some(value) = application { query_builder.push(" AND application ="); @@ -324,21 +356,25 @@ impl InflightActivationStore for PostgresActivationStore { /// Tasks with delay_until set, will have their age adjusted based on their /// delay time. 
No tasks = 0 lag async fn pending_activation_max_lag(&self, now: &DateTime) -> f64 { - let result = sqlx::query( + let mut query_builder = QueryBuilder::new( "SELECT received_at, delay_until FROM inflight_taskactivations - WHERE status = $1 - AND processing_attempts = 0 - ORDER BY received_at ASC - LIMIT 1 - ", - ) - .bind(InflightActivationStatus::Pending.to_string()) - .fetch_one(&self.read_pool) - .await; + WHERE status = ", + ); + query_builder.push_bind(InflightActivationStatus::Pending.to_string()); + query_builder.push(" AND processing_attempts = 0"); + + self.add_partition_condition(&mut query_builder, false); + + query_builder.push(" ORDER BY received_at ASC LIMIT 1"); + + let result = query_builder + .build_query_as::<(DateTime, Option>)>() + .fetch_one(&self.read_pool) + .await; if let Ok(row) = result { - let received_at: DateTime = row.get("received_at"); - let delay_until: Option> = row.get("delay_until"); + let received_at: DateTime = row.0; + let delay_until: Option> = row.1; let millis = now.signed_duration_since(received_at).num_milliseconds() - delay_until.map_or(0, |delay_time| { delay_time @@ -354,19 +390,27 @@ impl InflightActivationStore for PostgresActivationStore { #[instrument(skip_all)] async fn count_by_status(&self, status: InflightActivationStatus) -> Result { - let result = - sqlx::query("SELECT COUNT(*) as count FROM inflight_taskactivations WHERE status = $1") - .bind(status.to_string()) - .fetch_one(&self.read_pool) - .await?; - Ok(result.get::("count") as usize) + let mut query_builder = QueryBuilder::new( + "SELECT COUNT(*) as count FROM inflight_taskactivations WHERE status = ", + ); + query_builder.push_bind(status.to_string()); + self.add_partition_condition(&mut query_builder, false); + let result = query_builder + .build_query_as::<(i64,)>() + .fetch_one(&self.read_pool) + .await?; + Ok(result.0 as usize) } async fn count(&self) -> Result { - let result = sqlx::query("SELECT COUNT(*) as count FROM inflight_taskactivations") 
+ let mut query_builder = + QueryBuilder::new("SELECT COUNT(*) as count FROM inflight_taskactivations"); + self.add_partition_condition(&mut query_builder, true); + let result = query_builder + .build_query_as::<(i64,)>() .fetch_one(&self.read_pool) .await?; - Ok(result.get::("count") as usize) + Ok(result.0 as usize) } /// Update the status of a specific activation @@ -421,9 +465,8 @@ impl InflightActivationStore for PostgresActivationStore { #[instrument(skip_all)] async fn get_retry_activations(&self) -> Result, Error> { - Ok(sqlx::query_as( - " - SELECT id, + let mut query_builder = QueryBuilder::new( + "SELECT id, activation, partition, kafka_offset AS offset, @@ -441,15 +484,18 @@ impl InflightActivationStore for PostgresActivationStore { taskname, on_attempts_exceeded FROM inflight_taskactivations - WHERE status = $1 - ", - ) - .bind(InflightActivationStatus::Retry.to_string()) - .fetch_all(&self.read_pool) - .await? - .into_iter() - .map(|row: TableRow| row.into()) - .collect()) + WHERE status = ", + ); + query_builder.push_bind(InflightActivationStatus::Retry.to_string()); + self.add_partition_condition(&mut query_builder, false); + + Ok(query_builder + .build_query_as::() + .fetch_all(&self.read_pool) + .await? + .into_iter() + .map(|row| row.into()) + .collect()) } // Used in tests @@ -472,16 +518,19 @@ impl InflightActivationStore for PostgresActivationStore { // At-most-once tasks that fail their processing deadlines go directly to failure // there are no retries, as the worker will reject the task due to at_most_once keys. 
- let most_once_result = sqlx::query( + let mut query_builder = QueryBuilder::new( "UPDATE inflight_taskactivations - SET processing_deadline = null, status = $1 - WHERE processing_deadline < $2 AND at_most_once = TRUE AND status = $3", - ) - .bind(InflightActivationStatus::Failure.to_string()) - .bind(now) - .bind(InflightActivationStatus::Processing.to_string()) - .execute(&mut *atomic) - .await; + SET processing_deadline = null, status = ", ); + query_builder.push_bind(InflightActivationStatus::Failure.to_string()); + query_builder.push(" WHERE processing_deadline < "); + query_builder.push_bind(now); + query_builder.push(" AND at_most_once = TRUE AND status = "); + query_builder.push_bind(InflightActivationStatus::Processing.to_string()); + + self.add_partition_condition(&mut query_builder, false); + + let most_once_result = query_builder.build().execute(&mut *atomic).await; let mut processing_deadline_modified_rows = 0; if let Ok(query_res) = most_once_result { @@ -490,16 +539,19 @@ // Update regular tasks. // Increment processing_attempts by 1 and reset processing_deadline to null.
- let result = sqlx::query( + let mut query_builder = QueryBuilder::new( "UPDATE inflight_taskactivations - SET processing_deadline = null, status = $1, processing_attempts = processing_attempts + 1 - WHERE processing_deadline < $2 AND status = $3", - ) - .bind(InflightActivationStatus::Pending.to_string()) - .bind(now) - .bind(InflightActivationStatus::Processing.to_string()) - .execute(&mut *atomic) - .await; + SET processing_deadline = null, status = ", + ); + query_builder.push_bind(InflightActivationStatus::Pending.to_string()); + query_builder.push(", processing_attempts = processing_attempts + 1"); + query_builder.push(" WHERE processing_deadline < "); + query_builder.push_bind(now); + query_builder.push(" AND status = "); + query_builder.push_bind(InflightActivationStatus::Processing.to_string()); + self.add_partition_condition(&mut query_builder, false); + + let result = query_builder.build().execute(&mut *atomic).await; atomic.commit().await?; @@ -518,16 +570,17 @@ impl InflightActivationStore for PostgresActivationStore { let mut conn = self .acquire_write_conn_metric("handle_processing_attempts") .await?; - let processing_attempts_result = sqlx::query( + let mut query_builder = QueryBuilder::new( "UPDATE inflight_taskactivations - SET status = $1 - WHERE processing_attempts >= $2 AND status = $3", - ) - .bind(InflightActivationStatus::Failure.to_string()) - .bind(self.config.max_processing_attempts as i32) - .bind(InflightActivationStatus::Pending.to_string()) - .execute(&mut *conn) - .await; + SET status = ", + ); + query_builder.push_bind(InflightActivationStatus::Failure.to_string()); + query_builder.push(" WHERE processing_attempts >= "); + query_builder.push_bind(self.config.max_processing_attempts as i32); + query_builder.push(" AND status = "); + query_builder.push_bind(InflightActivationStatus::Pending.to_string()); + self.add_partition_condition(&mut query_builder, false); + let processing_attempts_result = query_builder.build().execute(&mut 
*conn).await; if let Ok(query_res) = processing_attempts_result { return Ok(query_res.rows_affected()); @@ -546,15 +599,15 @@ impl InflightActivationStore for PostgresActivationStore { async fn handle_expires_at(&self) -> Result { let now = Utc::now(); let mut conn = self.acquire_write_conn_metric("handle_expires_at").await?; - let query = sqlx::query( - "DELETE FROM inflight_taskactivations WHERE status = $1 AND expires_at IS NOT NULL AND expires_at < $2", - ) - .bind(InflightActivationStatus::Pending.to_string()) - .bind(now) - .execute(&mut *conn) - .await?; + let mut query_builder = + QueryBuilder::new("DELETE FROM inflight_taskactivations WHERE status = "); + query_builder.push_bind(InflightActivationStatus::Pending.to_string()); + query_builder.push(" AND expires_at IS NOT NULL AND expires_at < "); + query_builder.push_bind(now); + self.add_partition_condition(&mut query_builder, false); + let result = query_builder.build().execute(&mut *conn).await?; - Ok(query.rows_affected()) + Ok(result.rows_affected()) } /// Perform upkeep work for tasks that are past delay_until deadlines @@ -567,17 +620,18 @@ impl InflightActivationStore for PostgresActivationStore { async fn handle_delay_until(&self) -> Result { let now = Utc::now(); let mut conn = self.acquire_write_conn_metric("handle_delay_until").await?; - let update_result = sqlx::query( - r#"UPDATE inflight_taskactivations - SET status = $1 - WHERE delay_until IS NOT NULL AND delay_until < $2 AND status = $3 - "#, - ) - .bind(InflightActivationStatus::Pending.to_string()) - .bind(now) - .bind(InflightActivationStatus::Delay.to_string()) - .execute(&mut *conn) - .await?; + + let mut query_builder = QueryBuilder::new( + "UPDATE inflight_taskactivations + SET status = ", + ); + query_builder.push_bind(InflightActivationStatus::Pending.to_string()); + query_builder.push(" WHERE delay_until IS NOT NULL AND delay_until < "); + query_builder.push_bind(now); + query_builder.push(" AND status = "); + 
query_builder.push_bind(InflightActivationStatus::Delay.to_string()); + self.add_partition_condition(&mut query_builder, false); + let update_result = query_builder.build().execute(&mut *conn).await?; Ok(update_result.rows_affected()) } @@ -592,13 +646,15 @@ impl InflightActivationStore for PostgresActivationStore { async fn handle_failed_tasks(&self) -> Result { let mut atomic = self.write_pool.begin().await?; - let failed_tasks: Vec = - sqlx::query("SELECT id, activation, on_attempts_exceeded FROM inflight_taskactivations WHERE status = $1") - .bind(InflightActivationStatus::Failure.to_string()) - .fetch_all(&mut *atomic) - .await? - .into_iter() - .collect(); + let mut query_builder = QueryBuilder::new( + "SELECT id, activation, on_attempts_exceeded FROM inflight_taskactivations WHERE status = ", + ); + query_builder.push_bind(InflightActivationStatus::Failure.to_string()); + self.add_partition_condition(&mut query_builder, false); + let failed_tasks = query_builder + .build_query_as::<(String, Vec, i32)>() + .fetch_all(&mut *atomic) + .await?; let mut forwarder = FailedTasksForwarder { to_discard: vec![], @@ -606,13 +662,11 @@ impl InflightActivationStore for PostgresActivationStore { }; for record in failed_tasks.iter() { - let activation_data: &[u8] = record.get("activation"); - let id: String = record.get("id"); + let activation_data: &[u8] = record.1.as_slice(); + let id: String = record.0.clone(); // We could be deadlettering because of activation.expires // when a task expires we still deadletter if configured. 
- let on_attempts_exceeded_val: i32 = record.get("on_attempts_exceeded"); - let on_attempts_exceeded: OnAttemptsExceeded = - on_attempts_exceeded_val.try_into().unwrap(); + let on_attempts_exceeded: OnAttemptsExceeded = record.2.try_into().unwrap(); if on_attempts_exceeded == OnAttemptsExceeded::Discard || on_attempts_exceeded == OnAttemptsExceeded::Unspecified { @@ -668,12 +722,13 @@ impl InflightActivationStore for PostgresActivationStore { #[instrument(skip_all)] async fn remove_completed(&self) -> Result { let mut conn = self.acquire_write_conn_metric("remove_completed").await?; - let query = sqlx::query("DELETE FROM inflight_taskactivations WHERE status = $1") - .bind(InflightActivationStatus::Complete.to_string()) - .execute(&mut *conn) - .await?; + let mut query_builder = + QueryBuilder::new("DELETE FROM inflight_taskactivations WHERE status = "); + query_builder.push_bind(InflightActivationStatus::Complete.to_string()); + self.add_partition_condition(&mut query_builder, false); + let result = query_builder.build().execute(&mut *conn).await?; - Ok(query.rows_affected()) + Ok(result.rows_affected()) } /// Remove killswitched tasks. 
@@ -686,6 +741,7 @@ impl InflightActivationStore for PostgresActivationStore { separated.push_bind(taskname); } separated.push_unseparated(")"); + self.add_partition_condition(&mut query_builder, false); let mut conn = self .acquire_write_conn_metric("remove_killswitched") .await?; diff --git a/src/test_utils.rs b/src/test_utils.rs index 392b48c1..d5bd3310 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -257,13 +257,17 @@ pub async fn create_test_store(adapter: &str) -> Arc, - "postgres" => Arc::new( - PostgresActivationStore::new(PostgresActivationStoreConfig::from_config( - &create_integration_config(), - )) - .await - .unwrap(), - ) as Arc, + "postgres" => { + let store = Arc::new( + PostgresActivationStore::new(PostgresActivationStoreConfig::from_config( + &create_integration_config(), + )) + .await + .unwrap(), + ) as Arc; + store.assign_partitions(vec![0]).unwrap(); + store + } _ => panic!("Invalid adapter: {}", adapter), } }