From 358edc150c2730c4a3286ebc753768c871c6c5be Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Thu, 2 Apr 2026 14:52:07 -0700 Subject: [PATCH 1/4] Add Sent Flag to Prevent Dropping Tasks on Push Failure --- benches/store_bench.rs | 11 +- migrations/0006_add_sent.sql | 1 + .../0001_create_inflight_activations.sql | 8 +- pg_migrations/0002_add_bucket.sql | 3 - src/fetch/mod.rs | 2 +- src/fetch/tests.rs | 18 +- src/grpc/server.rs | 44 +---- src/grpc/server_tests.rs | 26 +-- src/kafka/deserialize_activation.rs | 1 + src/main.rs | 2 +- src/push/mod.rs | 34 +++- src/push/tests.rs | 8 +- src/store/inflight_activation.rs | 125 +++++++++--- src/store/inflight_activation_tests.rs | 186 ++++++++++-------- src/store/postgres_activation_store.rs | 64 ++++-- src/upkeep.rs | 8 +- 16 files changed, 323 insertions(+), 218 deletions(-) create mode 100644 migrations/0006_add_sent.sql delete mode 100644 pg_migrations/0002_add_bucket.sql diff --git a/benches/store_bench.rs b/benches/store_bench.rs index 2b29f38a..f2ca2f03 100644 --- a/benches/store_bench.rs +++ b/benches/store_bench.rs @@ -58,16 +58,11 @@ async fn get_pending_activations(num_activations: u32, num_workers: u32) { join_set.spawn(async move { let mut num_activations_processed = 0; - while !store - .get_pending_activations( - Some("sentry"), - Some(std::slice::from_ref(&ns)), - Some(1), - None, - ) + while store + .claim_activation_for_pull(Some("sentry"), Some(&ns)) .await .unwrap() - .is_empty() + .is_none() { num_activations_processed += 1; } diff --git a/migrations/0006_add_sent.sql b/migrations/0006_add_sent.sql new file mode 100644 index 00000000..f0acabc0 --- /dev/null +++ b/migrations/0006_add_sent.sql @@ -0,0 +1 @@ +ALTER TABLE inflight_taskactivations ADD COLUMN sent INTEGER NOT NULL DEFAULT 0; diff --git a/pg_migrations/0001_create_inflight_activations.sql b/pg_migrations/0001_create_inflight_activations.sql index ee8b26a4..7b31baa1 100644 --- a/pg_migrations/0001_create_inflight_activations.sql +++ 
b/pg_migrations/0001_create_inflight_activations.sql @@ -16,5 +16,11 @@ CREATE TABLE IF NOT EXISTS inflight_taskactivations ( application TEXT NOT NULL, namespace TEXT NOT NULL, taskname TEXT NOT NULL, - on_attempts_exceeded INTEGER NOT NULL DEFAULT 1 + on_attempts_exceeded INTEGER NOT NULL DEFAULT 1, + bucket SMALLINT NOT NULL DEFAULT 0, + sent BOOLEAN NOT NULL DEFAULT FALSE ); + +-- Supports pending claim queries (status, filters, ordering) including sent +CREATE INDEX IF NOT EXISTS idx_inflight_taskactivations_claim +ON inflight_taskactivations (status, bucket, sent); diff --git a/pg_migrations/0002_add_bucket.sql b/pg_migrations/0002_add_bucket.sql deleted file mode 100644 index 6283c667..00000000 --- a/pg_migrations/0002_add_bucket.sql +++ /dev/null @@ -1,3 +0,0 @@ -ALTER TABLE inflight_taskactivations ADD COLUMN bucket SMALLINT NOT NULL DEFAULT 0; - -CREATE INDEX idx_activation_claim ON inflight_taskactivations (status, bucket); diff --git a/src/fetch/mod.rs b/src/fetch/mod.rs index e104fefc..d53a5519 100644 --- a/src/fetch/mod.rs +++ b/src/fetch/mod.rs @@ -117,7 +117,7 @@ impl FetchPool { let namespaces = config.namespaces.as_deref(); match store - .get_pending_activations(application, namespaces, limit, bucket) + .claim_activations_for_push(application, namespaces, limit, bucket) .await { Ok(activations) if activations.is_empty() => { diff --git a/src/fetch/tests.rs b/src/fetch/tests.rs index aa101fcb..86cdb25f 100644 --- a/src/fetch/tests.rs +++ b/src/fetch/tests.rs @@ -70,31 +70,29 @@ impl InflightActivationStore for MockStore { unimplemented!() } - async fn get_pending_activations( + async fn claim_activations( &self, _application: Option<&str>, _namespaces: Option<&[String]>, _limit: Option, _bucket: Option, + mark_sent: bool, ) -> Result, Error> { if self.fail { return Err(anyhow!("mock store error")); } Ok(match self.pending.lock().await.take() { - Some(a) => vec![a], + Some(mut a) => { + a.sent = mark_sent; + vec![a] + } None => vec![], }) } - async 
fn get_pending_activations_from_namespaces( - &self, - _application: Option<&str>, - _namespaces: Option<&[String]>, - _limit: Option, - _bucket: Option, - ) -> Result, Error> { - unimplemented!() + async fn mark_activation_sent(&self, _id: &str) -> Result<(), Error> { + Ok(()) } async fn pending_activation_max_lag(&self, _now: &DateTime) -> f64 { diff --git a/src/grpc/server.rs b/src/grpc/server.rs index 5d6fb3e4..bd2db13d 100644 --- a/src/grpc/server.rs +++ b/src/grpc/server.rs @@ -35,31 +35,16 @@ impl ConsumerService for TaskbrokerServer { let application = &request.get_ref().application; let namespace = &request.get_ref().namespace; - let namespaces = namespace.as_ref().map(std::slice::from_ref); let inflight = self .store - .get_pending_activations(application.as_deref(), namespaces, Some(1), None) + .claim_activation_for_pull(application.as_deref(), namespace.as_deref()) .await; match inflight { - Ok(activations) if activations.is_empty() => { - Err(Status::not_found("No pending activation")) - } - - Ok(activations) if activations.len() > 1 => { - error!( - count = activations.len(), - application = ?application, - namespace = ?namespace, - "get_pending_activations returned more than one row despite limit of 1", - ); - - Err(Status::internal("Unable to retrieve pending activation")) - } + Ok(None) => Err(Status::not_found("No pending activation")), - Ok(activations) => { - let inflight = &activations[0]; + Ok(Some(inflight)) => { let now = Utc::now(); if inflight.processing_attempts < 1 { @@ -141,10 +126,10 @@ impl ConsumerService for TaskbrokerServer { }; let start_time = Instant::now(); - let namespaces = namespace.as_ref().map(std::slice::from_ref); + let res = match self .store - .get_pending_activations(application.as_deref(), namespaces, Some(1), None) + .claim_activation_for_pull(application.as_deref(), namespace.as_deref()) .await { Err(e) => { @@ -152,24 +137,9 @@ impl ConsumerService for TaskbrokerServer { Err(Status::internal("Unable to fetch next 
task")) } - Ok(activations) if activations.is_empty() => { - Err(Status::not_found("No pending activation")) - } - - Ok(activations) if activations.len() > 1 => { - error!( - count = activations.len(), - application = ?application, - namespace = ?namespace, - "get_pending_activations returned more than one row despite limit of 1", - ); - - Err(Status::internal("Unable to fetch next task")) - } - - Ok(activations) => { - let inflight = &activations[0]; + Ok(None) => Ok(Response::new(SetTaskStatusResponse { task: None })), + Ok(Some(inflight)) => { if inflight.processing_attempts < 1 { let now = Utc::now(); let received_to_gettask_latency = inflight.received_latency(now); diff --git a/src/grpc/server_tests.rs b/src/grpc/server_tests.rs index f0834a95..4dd6be36 100644 --- a/src/grpc/server_tests.rs +++ b/src/grpc/server_tests.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use crate::config::{Config, DeliveryMode}; use crate::grpc::server::TaskbrokerServer; +use crate::store::inflight_activation::InflightActivationStatus; use prost::Message; use rstest::rstest; use sentry_protos::taskbroker::v1::consumer_service_server::ConsumerService; @@ -111,7 +112,10 @@ async fn test_get_task_success(#[case] adapter: &str) { let activations = make_activations(1); store.store(activations).await.unwrap(); - let service = TaskbrokerServer { store, config }; + let service = TaskbrokerServer { + store: store.clone(), + config, + }; let request = GetTaskRequest { namespace: None, application: None, @@ -122,6 +126,10 @@ async fn test_get_task_success(#[case] adapter: &str) { assert!(resp.get_ref().task.is_some()); let task = resp.get_ref().task.as_ref().unwrap(); assert!(task.id == "id_0"); + + let row = store.get_by_id("id_0").await.unwrap().expect("claimed row"); + assert!(row.sent); + assert_eq!(row.status, InflightActivationStatus::Processing); } #[tokio::test] @@ -291,9 +299,8 @@ async fn test_set_task_status_with_application_no_match(#[case] adapter: &str) { }), }; let response = 
service.set_task_status(Request::new(request)).await; - assert!(response.is_err()); - let e = response.unwrap_err(); - assert_eq!(e.code(), Code::NotFound); + assert!(response.is_ok()); + assert!(response.unwrap().get_ref().task.is_none()); } #[tokio::test] @@ -320,12 +327,9 @@ async fn test_set_task_status_with_namespace_requires_application(#[case] adapte }), }; let response = service.set_task_status(Request::new(request)).await; - assert!(response.is_err()); - - let resp = response.unwrap_err(); - assert_eq!( - resp.code(), - Code::NotFound, - "No task found as namespace without filter is invalid." + assert!(response.is_ok()); + assert!( + response.unwrap().get_ref().task.is_none(), + "namespace without application yields no next task in response" ); } diff --git a/src/kafka/deserialize_activation.rs b/src/kafka/deserialize_activation.rs index 28485606..1949f573 100644 --- a/src/kafka/deserialize_activation.rs +++ b/src/kafka/deserialize_activation.rs @@ -108,6 +108,7 @@ pub fn new( taskname, on_attempts_exceeded, bucket, + sent: false, }) } } diff --git a/src/main.rs b/src/main.rs index db0ebd84..53e3160f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -241,7 +241,7 @@ async fn main() -> Result<(), Error> { }); // Initialize push and fetch pools - let push_pool = Arc::new(PushPool::new(config.clone())); + let push_pool = Arc::new(PushPool::new(config.clone(), store.clone())); let fetch_pool = FetchPool::new(store.clone(), config.clone(), push_pool.clone()); // Initialize push threads diff --git a/src/push/mod.rs b/src/push/mod.rs index 55bae746..45ba3a51 100644 --- a/src/push/mod.rs +++ b/src/push/mod.rs @@ -16,7 +16,7 @@ use tonic::transport::Channel; use tracing::{debug, error, info}; use crate::config::Config; -use crate::store::inflight_activation::InflightActivation; +use crate::store::inflight_activation::{InflightActivation, InflightActivationStore}; type HmacSha256 = Hmac; @@ -88,26 +88,32 @@ pub struct PushPool { /// Taskbroker configuration. 
config: Arc, + + /// Activation store, which we need for marking tasks as sent. + store: Arc, } impl PushPool { /// Initialize a new push pool. - pub fn new(config: Arc) -> Self { + pub fn new(config: Arc, store: Arc) -> Self { let (sender, receiver) = flume::bounded(config.push_queue_size); Self { sender, receiver, config, + store, } } /// Spawn `config.push_threads` asynchronous tasks, each of which repeatedly moves pending activations from the channel to the worker service until the shutdown signal is received. pub async fn start(&self) -> Result<()> { + let store = self.store.clone(); let mut push_pool: JoinSet> = crate::tokio::spawn_pool(self.config.push_threads, |_| { let endpoint = self.config.worker_endpoint.clone(); let receiver = self.receiver.clone(); + let store = store.clone(); let guard = get_shutdown_guard().shutdown_on_drop(); @@ -158,7 +164,17 @@ impl PushPool { ) .await { - Ok(_) => debug!(task_id = %id, "Activation sent to worker"), + Ok(_) => { + debug!(task_id = %id, "Activation sent to worker"); + + if let Err(e) = store.mark_activation_sent(&id).await { + error!( + task_id = %id, + error = ?e, + "Failed to mark activation as sent after push" + ); + } + } // Once processing deadline expires, status will be set back to pending Err(e) => error!( @@ -185,7 +201,17 @@ impl PushPool { ) .await { - Ok(_) => debug!(task_id = %id, "Activation sent to worker"), + Ok(_) => { + debug!(task_id = %id, "Activation sent to worker"); + + if let Err(e) = store.mark_activation_sent(&id).await { + error!( + task_id = %id, + error = ?e, + "Failed to mark activation as sent after push" + ); + } + } // Once processing deadline expires, status will be set back to pending Err(e) => error!( diff --git a/src/push/tests.rs b/src/push/tests.rs index 5b79b95b..6ea99c9d 100644 --- a/src/push/tests.rs +++ b/src/push/tests.rs @@ -7,7 +7,7 @@ use tonic::async_trait; use super::*; use crate::config::Config; -use crate::test_utils::make_activations; +use 
crate::test_utils::{create_test_store, make_activations}; /// Fake worker client for unit testing. struct MockWorkerClient { @@ -117,7 +117,8 @@ async fn push_pool_submit_enqueues_item() { ..Config::default() }); - let pool = PushPool::new(config); + let store = create_test_store("sqlite").await; + let pool = PushPool::new(config, store); let activation = make_activations(1).remove(0); let result = pool.submit(activation).await; @@ -131,7 +132,8 @@ async fn push_pool_submit_backpressures_when_queue_full() { ..Config::default() }); - let pool = PushPool::new(config); + let store = create_test_store("sqlite").await; + let pool = PushPool::new(config, store); let first = make_activations(1).remove(0); let second = make_activations(1).remove(0); diff --git a/src/store/inflight_activation.rs b/src/store/inflight_activation.rs index d4c8b4a0..12f5a2da 100644 --- a/src/store/inflight_activation.rs +++ b/src/store/inflight_activation.rs @@ -180,6 +180,10 @@ pub struct InflightActivation { /// Bucket derived from activation ID (UUID as number % 256). Set once on ingestion. #[builder(setter(skip), default = "0")] pub bucket: i16, + + /// True after successful push. + #[builder(default = false)] + pub sent: bool, } impl InflightActivation { @@ -243,6 +247,7 @@ pub struct TableRow { #[sqlx(try_from = "i32")] pub on_attempts_exceeded: OnAttemptsExceeded, pub bucket: i16, + pub sent: bool, } impl TryFrom for TableRow { @@ -268,6 +273,7 @@ impl TryFrom for TableRow { taskname: value.taskname, on_attempts_exceeded: value.on_attempts_exceeded, bucket: value.bucket, + sent: value.sent, }) } } @@ -293,6 +299,7 @@ impl From for InflightActivation { taskname: value.taskname, on_attempts_exceeded: value.on_attempts_exceeded, bucket: value.bucket, + sent: value.sent, } } } @@ -376,13 +383,24 @@ pub trait InflightActivationStore: Send + Sync { /// Get `limit` pending activations, optionally filtered by namespaces and bucket subrange. 
/// If no limit is provided, all matching activations will be returned. - async fn get_pending_activations( + async fn claim_activations( + &self, + application: Option<&str>, + namespaces: Option<&[String]>, + limit: Option, + bucket: Option, + sent: bool, + ) -> Result, Error>; + + /// Claims `limit` activations within the `bucket` range. Column `sent` remains false until `mark_activation_sent` is called. + async fn claim_activations_for_push( &self, application: Option<&str>, namespaces: Option<&[String]>, limit: Option, bucket: Option, ) -> Result, Error> { + // If a namespace filter is used, an application must also be used if namespaces.is_some() && application.is_none() { warn!( ?namespaces, @@ -392,21 +410,41 @@ pub trait InflightActivationStore: Send + Sync { return Ok(vec![]); } - let results = self - .get_pending_activations_from_namespaces(application, namespaces, limit, bucket) - .await?; - - Ok(results) + self.claim_activations(application, namespaces, limit, bucket, false) + .await } - /// Claim pending activations (moves them to processing), optionally filtered by application and namespaces. - async fn get_pending_activations_from_namespaces( + /// Claims `limit` activations with application `application` and namespace `namespace`. 
+ async fn claim_activation_for_pull( &self, application: Option<&str>, - namespaces: Option<&[String]>, - limit: Option, - bucket: Option, - ) -> Result, Error>; + namespace: Option<&str>, + ) -> Result, Error> { + // Convert single namespace to vector for internal use + let namespaces = namespace.map(|ns| vec![ns.to_string()]); + + // If a namespace filter is used, an application must also be used + if namespaces.is_some() && application.is_none() { + warn!( + ?namespaces, + "Received request for namespaced task without application" + ); + + return Ok(None); + } + + let mut rows = self + .claim_activations(application, namespaces.as_deref(), Some(1), None, true) + .await?; + + // If we are getting more than one task here, something is broken + assert!(rows.len() <= 1); + + Ok(rows.pop()) + } + + /// Record successful push. + async fn mark_activation_sent(&self, id: &str) -> Result<(), Error>; /// Get the age of the oldest pending activation in seconds async fn pending_activation_max_lag(&self, now: &DateTime) -> f64; @@ -738,7 +776,8 @@ impl InflightActivationStore for SqliteActivationStore { namespace, taskname, on_attempts_exceeded, - bucket + bucket, + sent FROM inflight_taskactivations WHERE id = $1 ", @@ -780,7 +819,8 @@ impl InflightActivationStore for SqliteActivationStore { namespace, taskname, on_attempts_exceeded, - bucket + bucket, + sent ) ", ); @@ -814,6 +854,7 @@ impl InflightActivationStore for SqliteActivationStore { b.push_bind(row.taskname); b.push_bind(row.on_attempts_exceeded as i32); b.push_bind(row.bucket); + b.push_bind(row.sent); }) .push(" ON CONFLICT(id) DO NOTHING") .build(); @@ -842,16 +883,14 @@ impl InflightActivationStore for SqliteActivationStore { meta_result } - /// Claim pending activations from specified namespaces (moves them to processing). - /// If namespaces is `None`, gets from any namespace. - /// If namespaces is `Some(...)`, restricts to those namespaces. 
#[instrument(skip_all)] - async fn get_pending_activations_from_namespaces( + async fn claim_activations( &self, application: Option<&str>, namespaces: Option<&[String]>, limit: Option, bucket: Option, + sent: bool, ) -> Result, Error> { let now = Utc::now(); @@ -865,6 +904,8 @@ impl InflightActivationStore for SqliteActivationStore { status = " )); query_builder.push_bind(InflightActivationStatus::Processing); + query_builder.push(", sent = "); + query_builder.push_bind(sent); query_builder.push( " WHERE id IN ( @@ -905,9 +946,7 @@ impl InflightActivationStore for SqliteActivationStore { } query_builder.push(") RETURNING *"); - let mut conn = self - .acquire_write_conn_metric("get_pending_activation") - .await?; + let mut conn = self.acquire_write_conn_metric("claim_activations").await?; let rows: Vec = query_builder .build_query_as::() .fetch_all(&mut *conn) @@ -916,6 +955,19 @@ impl InflightActivationStore for SqliteActivationStore { Ok(rows.into_iter().map(|row| row.into()).collect()) } + #[instrument(skip_all)] + async fn mark_activation_sent(&self, id: &str) -> Result<(), Error> { + let mut conn = self + .acquire_write_conn_metric("mark_activation_sent") + .await?; + sqlx::query("UPDATE inflight_taskactivations SET sent = 1 WHERE id = $1 AND status = $2") + .bind(id) + .bind(InflightActivationStatus::Processing) + .execute(&mut *conn) + .await?; + Ok(()) + } + /// Get the age of the oldest pending activation in seconds. /// Only activations with status=pending and processing_attempts=0 are considered /// as we are interested in latency to the *first* attempt. 
@@ -1039,7 +1091,8 @@ impl InflightActivationStore for SqliteActivationStore { namespace, taskname, on_attempts_exceeded, - bucket + bucket, + sent FROM inflight_taskactivations WHERE status = $1 ", @@ -1060,16 +1113,13 @@ impl InflightActivationStore for SqliteActivationStore { Ok(()) } - /// Update tasks that are in processing and have exceeded their processing deadline - /// Exceeding a processing deadline does not consume a retry as we don't know - /// if a worker took the task and was killed, or failed. + /// Update tasks that are in processing and have exceeded their processing deadline. #[instrument(skip_all)] async fn handle_processing_deadline(&self) -> Result { let now = Utc::now(); let mut atomic = self.write_pool.begin().await?; // Idempotent tasks that fail their processing deadlines go directly to failure - // there are no retries, as the worker will reject the task due to idempotency keys. let most_once_result = sqlx::query( "UPDATE inflight_taskactivations SET processing_deadline = null, status = $1 @@ -1086,12 +1136,27 @@ impl InflightActivationStore for SqliteActivationStore { processing_deadline_modified_rows = query_res.rows_affected(); } - // Update non-idempotent tasks. - // Increment processing_attempts by 1 and reset processing_deadline to null. 
+ // Revert non-AMO activations that weren't delivered back to pending without consuming an attempt + let unsent = sqlx::query( + "UPDATE inflight_taskactivations + SET processing_deadline = null, status = $1, sent = 0 + WHERE processing_deadline < $2 AND sent = 0 AND status = $3", + ) + .bind(InflightActivationStatus::Pending) + .bind(now.timestamp()) + .bind(InflightActivationStatus::Processing) + .execute(&mut *atomic) + .await; + + if let Ok(query_res) = unsent { + processing_deadline_modified_rows += query_res.rows_affected(); + } + + // Revert activations that were delivered back to 'pending' AND consume an attempt let result = sqlx::query( "UPDATE inflight_taskactivations - SET processing_deadline = null, status = $1, processing_attempts = processing_attempts + 1 - WHERE processing_deadline < $2 AND status = $3", + SET processing_deadline = null, status = $1, processing_attempts = processing_attempts + 1, sent = 0 + WHERE processing_deadline < $2 AND sent = 1 AND status = $3", ) .bind(InflightActivationStatus::Pending) .bind(now.timestamp()) diff --git a/src/store/inflight_activation_tests.rs b/src/store/inflight_activation_tests.rs index 363e6cef..0b46a96b 100644 --- a/src/store/inflight_activation_tests.rs +++ b/src/store/inflight_activation_tests.rs @@ -249,12 +249,11 @@ async fn test_get_pending_activation(#[case] adapter: &str) { let batch = make_activations(2); assert!(store.store(batch.clone()).await.is_ok()); - let mut got = store - .get_pending_activations(None, None, Some(1), None) + let result = store + .claim_activation_for_pull(None, None) .await - .unwrap(); - assert_eq!(got.len(), 1); - let result = got.pop().unwrap(); + .unwrap() + .expect("expected one activation"); assert_eq!(result.id, "id_0"); assert_eq!(result.status, InflightActivationStatus::Processing); @@ -288,7 +287,7 @@ async fn test_get_pending_activation_bucket_filter(#[case] adapter: &str) { assert!(store.store(batch).await.is_ok()); let mut first = store - 
.get_pending_activations(None, None, Some(1), Some((15, 25))) + .claim_activations_for_push(None, None, Some(1), Some((15, 25))) .await .unwrap(); assert_eq!(first.len(), 1); @@ -297,7 +296,7 @@ async fn test_get_pending_activation_bucket_filter(#[case] adapter: &str) { assert_eq!(first.bucket, 20); let mut second = store - .get_pending_activations(None, None, Some(1), Some((0, 15))) + .claim_activations_for_push(None, None, Some(1), Some((0, 15))) .await .unwrap(); assert_eq!(second.len(), 1); @@ -307,7 +306,7 @@ async fn test_get_pending_activation_bucket_filter(#[case] adapter: &str) { assert!( store - .get_pending_activations(None, None, Some(1), Some((15, 25))) + .claim_activations_for_push(None, None, Some(1), Some((15, 25))) .await .unwrap() .is_empty() @@ -343,17 +342,12 @@ async fn test_get_pending_activation_with_race(#[case] adapter: &str) { join_set.spawn(async move { rx.recv().await.unwrap(); { - let mut v = store - .get_pending_activations( - Some("sentry"), - Some(std::slice::from_ref(&ns)), - Some(1), - None, - ) + let v = store + .claim_activation_for_pull(Some("sentry"), Some(ns.as_str())) .await .unwrap(); - assert_eq!(v.len(), 1); - v.pop().unwrap() + assert!(v.is_some()); + v.unwrap() } }); } @@ -384,17 +378,11 @@ async fn test_get_pending_activation_with_namespace(#[case] adapter: &str) { let other_namespace = "other_namespace".to_string(); // Get activation from other namespace - let mut got = store - .get_pending_activations( - Some("sentry"), - Some(std::slice::from_ref(&other_namespace)), - Some(1), - None, - ) + let result = store + .claim_activation_for_pull(Some("sentry"), Some(other_namespace.as_str())) .await - .unwrap(); - assert_eq!(got.len(), 1); - let result = got.pop().unwrap(); + .unwrap() + .expect("expected one activation"); assert_eq!(result.id, "id_1"); assert_eq!(result.status, InflightActivationStatus::Processing); assert!(result.processing_deadline.unwrap() > Utc::now()); @@ -416,10 +404,11 @@ async fn 
test_get_pending_activation_from_multiple_namespaces(#[case] adapter: & batch[3].namespace = "ns4".into(); assert!(store.store(batch.clone()).await.is_ok()); - // Get activation from multiple namespaces (should get oldest) + // Get activation from multiple namespaces (should get oldest). + // Use `claim_activations` so upkeep-style `None` application + namespaces is allowed (not `claim_activations_for_push`). let namespaces = vec!["ns2".to_string(), "ns3".to_string()]; let result = store - .get_pending_activations_from_namespaces(None, Some(&namespaces), None, None) + .claim_activations(None, Some(&namespaces), None, None, false) .await .unwrap(); @@ -444,24 +433,18 @@ async fn test_get_pending_activation_with_namespace_requires_application(#[case] batch[1].namespace = "other_namespace".into(); assert!(store.store(batch.clone()).await.is_ok()); - // This is an invalid query as we don't want to allow clients - // to fetch tasks from any application. + // This is an invalid query as we don't want to allow clients to fetch tasks from any application let other_namespace = "other_namespace".to_string(); let got = store - .get_pending_activations( - None, - Some(std::slice::from_ref(&other_namespace)), - Some(1), - None, - ) + .claim_activation_for_pull(None, Some(other_namespace.as_str())) .await .unwrap(); - assert!(got.is_empty()); + assert!(got.is_none()); // We allow no application in this method because of usage in upkeep let namespaces = vec!["other_namespace".to_string()]; let activations = store - .get_pending_activations_from_namespaces(None, Some(&namespaces), Some(2), None) + .claim_activations(None, Some(&namespaces), Some(2), None, false) .await .unwrap(); assert_eq!( @@ -492,12 +475,9 @@ async fn test_get_pending_activation_skip_expires(#[case] adapter: &str) { batch[0].expires_at = Some(Utc::now() - Duration::from_secs(100)); assert!(store.store(batch.clone()).await.is_ok()); - let result = store - .get_pending_activations(None, None, Some(1), None) - 
.await; + let result = store.claim_activation_for_pull(None, None).await; assert!(result.is_ok()); - let res_vec = result.unwrap(); - assert!(res_vec.is_empty()); + assert!(result.unwrap().is_none()); assert_counts( StatusCount { @@ -523,12 +503,11 @@ async fn test_get_pending_activation_earliest(#[case] adapter: &str) { let ret = store.store(batch.clone()).await; assert!(ret.is_ok(), "{}", ret.err().unwrap().to_string()); - let mut got = store - .get_pending_activations(None, None, Some(1), None) + let result = store + .claim_activation_for_pull(None, None) .await - .unwrap(); - assert_eq!(got.len(), 1); - let result = got.pop().unwrap(); + .unwrap() + .expect("expected one activation"); assert_eq!( result.added_at, Utc.with_ymd_and_hms(1998, 6, 24, 0, 0, 0).unwrap() @@ -549,12 +528,11 @@ async fn test_get_pending_activation_fetches_application(#[case] adapter: &str) // Getting an activation with no application filter should // include activations with application set. - let mut got = store - .get_pending_activations(None, None, Some(1), None) + let result = store + .claim_activation_for_pull(None, None) .await - .unwrap(); - assert_eq!(got.len(), 1); - let result = got.pop().unwrap(); + .unwrap() + .expect("expected one activation"); assert_eq!(result.id, "id_0"); assert_eq!(result.status, InflightActivationStatus::Processing); assert!(result.processing_deadline.unwrap() > Utc::now()); @@ -574,31 +552,27 @@ async fn test_get_pending_activation_with_application(#[case] adapter: &str) { assert!(store.store(batch.clone()).await.is_ok()); // Get activation from a named application - let mut got = store - .get_pending_activations(Some("hammers"), None, Some(1), None) + let result = store + .claim_activation_for_pull(Some("hammers"), None) .await - .unwrap(); - assert_eq!(got.len(), 1); - let result = got.pop().unwrap(); + .unwrap() + .expect("expected one activation"); assert_eq!(result.id, "id_1"); assert_eq!(result.status, InflightActivationStatus::Processing); 
assert!(result.processing_deadline.unwrap() > Utc::now()); assert_eq!(result.application, "hammers"); let result_opt = store - .get_pending_activations(Some("hammers"), None, Some(1), None) + .claim_activation_for_pull(Some("hammers"), None) .await .unwrap(); assert!( - result_opt.is_empty(), + result_opt.is_none(), "no pending activations in hammers left" ); - let remaining = store - .get_pending_activations(None, None, Some(1), None) - .await - .unwrap(); - assert_eq!(remaining.len(), 1, "one pending activation in '' left"); + let remaining = store.claim_activation_for_pull(None, None).await.unwrap(); + assert!(remaining.is_some(), "one pending activation in '' left"); store.remove_db().await.unwrap(); } @@ -621,29 +595,22 @@ async fn test_get_pending_activation_with_application_and_namespace(#[case] adap let target_ns = "target".to_string(); // Get activation from a named application - let mut got = store - .get_pending_activations( - Some("hammers"), - Some(std::slice::from_ref(&target_ns)), - Some(1), - None, - ) + let result = store + .claim_activation_for_pull(Some("hammers"), Some(target_ns.as_str())) .await - .unwrap(); - assert_eq!(got.len(), 1); - let result = got.pop().unwrap(); + .unwrap() + .expect("expected one activation"); assert_eq!(result.id, "id_1"); assert_eq!(result.status, InflightActivationStatus::Processing); assert!(result.processing_deadline.unwrap() > Utc::now()); assert_eq!(result.application, "hammers"); assert_eq!(result.namespace, "target"); - let mut got = store - .get_pending_activations(Some("hammers"), None, Some(1), None) + let result = store + .claim_activation_for_pull(Some("hammers"), None) .await - .unwrap(); - assert_eq!(got.len(), 1); - let result = got.pop().unwrap(); + .unwrap() + .expect("expected one activation"); assert_eq!(result.id, "id_2"); assert_eq!(result.application, "hammers"); assert_eq!(result.namespace, "not-target"); @@ -662,7 +629,7 @@ async fn test_get_pending_activations_no_limit(#[case] adapter: &str) 
{ assert!(store.store(batch).await.is_ok()); let got = store - .get_pending_activations(None, None, None, None) + .claim_activations_for_push(None, None, None, None) .await .unwrap(); assert_eq!(got.len(), N); @@ -696,7 +663,7 @@ async fn test_get_pending_activations_limit_below_pending(#[case] adapter: &str) assert!(store.store(batch).await.is_ok()); let got = store - .get_pending_activations(None, None, Some(X), None) + .claim_activations_for_push(None, None, Some(X), None) .await .unwrap(); assert_eq!(got.len(), X as usize); @@ -733,7 +700,7 @@ async fn test_get_pending_activations_limit_above_pending(#[case] adapter: &str) assert!(store.store(batch).await.is_ok()); let got = store - .get_pending_activations(None, None, Some(X), None) + .claim_activations_for_push(None, None, Some(X), None) .await .unwrap(); assert_eq!(got.len(), Y); @@ -849,10 +816,10 @@ async fn test_set_activation_status(#[case] adapter: &str) { .await; assert!( store - .get_pending_activations(None, None, Some(1), None) + .claim_activation_for_pull(None, None) .await .unwrap() - .is_empty() + .is_none() ); let result = store @@ -983,6 +950,7 @@ async fn test_handle_processing_deadline(#[case] adapter: &str) { let mut batch = make_activations(2); batch[1].status = InflightActivationStatus::Processing; + batch[1].sent = true; batch[1].processing_deadline = Some(Utc.with_ymd_and_hms(2024, 11, 14, 21, 22, 23).unwrap()); assert!(store.store(batch.clone()).await.is_ok()); @@ -1027,6 +995,7 @@ async fn test_handle_processing_deadline_multiple_tasks(#[case] adapter: &str) { let mut batch = make_activations(2); batch[0].status = InflightActivationStatus::Processing; + batch[0].sent = true; batch[0].processing_deadline = Some(Utc.with_ymd_and_hms(2020, 1, 1, 1, 1, 1).unwrap()); batch[1].status = InflightActivationStatus::Processing; batch[1].processing_deadline = Some(Utc::now() + chrono::Duration::days(30)); @@ -1065,9 +1034,11 @@ async fn test_handle_processing_at_most_once(#[case] adapter: &str) { 
// Both records are past processing deadlines let mut batch = make_activations(2); batch[0].status = InflightActivationStatus::Processing; + batch[0].sent = true; batch[0].processing_deadline = Some(Utc.with_ymd_and_hms(2024, 11, 14, 21, 22, 23).unwrap()); batch[1].status = InflightActivationStatus::Processing; + batch[1].sent = true; replace_retry_state( &mut batch[1], Some(RetryState { @@ -1118,6 +1089,7 @@ async fn test_handle_processing_deadline_discard_after(#[case] adapter: &str) { let mut batch = make_activations(2); batch[1].status = InflightActivationStatus::Processing; + batch[1].sent = true; batch[1].processing_deadline = Some(Utc.with_ymd_and_hms(2024, 11, 14, 21, 22, 23).unwrap()); replace_retry_state( &mut batch[1], @@ -1164,6 +1136,7 @@ async fn test_handle_processing_deadline_deadletter_after(#[case] adapter: &str) let mut batch = make_activations(2); batch[1].status = InflightActivationStatus::Processing; + batch[1].sent = true; batch[1].processing_deadline = Some(Utc.with_ymd_and_hms(2024, 11, 14, 21, 22, 23).unwrap()); replace_retry_state( &mut batch[1], @@ -1210,6 +1183,7 @@ async fn test_handle_processing_deadline_no_retries_remaining(#[case] adapter: & let mut batch = make_activations(2); batch[1].status = InflightActivationStatus::Processing; + batch[1].sent = true; batch[1].processing_deadline = Some(Utc.with_ymd_and_hms(2024, 11, 14, 21, 22, 23).unwrap()); replace_retry_state( &mut batch[1], @@ -1247,6 +1221,46 @@ async fn test_handle_processing_deadline_no_retries_remaining(#[case] adapter: & store.remove_db().await.unwrap(); } +#[tokio::test] +#[rstest] +#[case::sqlite("sqlite")] +#[case::postgres("postgres")] +async fn test_handle_processing_deadline_unsent_no_attempt_increment(#[case] adapter: &str) { + let store = create_test_store(adapter).await; + let mut batch = make_activations(1); + batch[0].status = InflightActivationStatus::Processing; + batch[0].sent = false; + batch[0].processing_deadline = Some(Utc.with_ymd_and_hms(2020, 1, 
1, 1, 1, 1).unwrap()); + assert!(store.store(batch.clone()).await.is_ok()); + let count = store.handle_processing_deadline().await.unwrap(); + assert_eq!(count, 1); + let task = store.get_by_id(&batch[0].id).await.unwrap().unwrap(); + assert_eq!(task.status, InflightActivationStatus::Pending); + assert_eq!(task.processing_attempts, 0); + assert!(!task.sent); + store.remove_db().await.unwrap(); +} + +#[tokio::test] +#[rstest] +#[case::sqlite("sqlite")] +#[case::postgres("postgres")] +async fn test_handle_processing_deadline_at_most_once_unsent_failure(#[case] adapter: &str) { + let store = create_test_store(adapter).await; + let mut batch = make_activations(1); + batch[0].status = InflightActivationStatus::Processing; + batch[0].sent = false; + batch[0].at_most_once = true; + batch[0].processing_deadline = Some(Utc.with_ymd_and_hms(2020, 1, 1, 1, 1, 1).unwrap()); + assert!(store.store(batch.clone()).await.is_ok()); + let count = store.handle_processing_deadline().await.unwrap(); + assert_eq!(count, 1); + let task = store.get_by_id(&batch[0].id).await.unwrap().unwrap(); + assert_eq!(task.status, InflightActivationStatus::Failure); + assert!(!task.sent); + store.remove_db().await.unwrap(); +} + #[tokio::test] #[rstest] #[case::sqlite("sqlite")] diff --git a/src/store/postgres_activation_store.rs b/src/store/postgres_activation_store.rs index 5b7e0077..e10e67c1 100644 --- a/src/store/postgres_activation_store.rs +++ b/src/store/postgres_activation_store.rs @@ -176,7 +176,8 @@ impl InflightActivationStore for PostgresActivationStore { namespace, taskname, on_attempts_exceeded, - bucket + bucket, + sent FROM inflight_taskactivations WHERE id = $1 ", @@ -218,7 +219,8 @@ impl InflightActivationStore for PostgresActivationStore { namespace, taskname, on_attempts_exceeded, - bucket + bucket, + sent ) ", ); @@ -251,6 +253,7 @@ impl InflightActivationStore for PostgresActivationStore { b.push_bind(row.taskname); b.push_bind(row.on_attempts_exceeded as i32); 
b.push_bind(row.bucket); + b.push_bind(row.sent); }) .push(" ON CONFLICT(id) DO NOTHING") .build(); @@ -258,16 +261,14 @@ impl InflightActivationStore for PostgresActivationStore { Ok(query.execute(&mut *conn).await?.into()) } - /// Claim pending activations from specified namespaces (moves them to processing). - /// If namespaces is `None`, gets from any namespace. - /// If namespaces is `Some(...)`, restricts to those namespaces. #[instrument(skip_all)] - async fn get_pending_activations_from_namespaces( + async fn claim_activations( &self, application: Option<&str>, namespaces: Option<&[String]>, limit: Option, bucket: Option, + sent: bool, ) -> Result, Error> { let now = Utc::now(); @@ -320,13 +321,13 @@ impl InflightActivationStore for PostgresActivationStore { status = " )); query_builder.push_bind(InflightActivationStatus::Processing.to_string()); + query_builder.push(", sent = "); + query_builder.push_bind(sent); query_builder.push(" FROM selected_activations "); query_builder.push(" WHERE inflight_taskactivations.id = selected_activations.id"); query_builder.push(" RETURNING *, kafka_offset AS offset"); - let mut conn = self - .acquire_write_conn_metric("get_pending_activation") - .await?; + let mut conn = self.acquire_write_conn_metric("claim_activations").await?; let rows: Vec = query_builder .build_query_as::() .fetch_all(&mut *conn) @@ -335,6 +336,21 @@ impl InflightActivationStore for PostgresActivationStore { Ok(rows.into_iter().map(|row| row.into()).collect()) } + #[instrument(skip_all)] + async fn mark_activation_sent(&self, id: &str) -> Result<(), Error> { + let mut conn = self + .acquire_write_conn_metric("mark_activation_sent") + .await?; + sqlx::query( + "UPDATE inflight_taskactivations SET sent = TRUE WHERE id = $1 AND status = $2", + ) + .bind(id) + .bind(InflightActivationStatus::Processing.to_string()) + .execute(&mut *conn) + .await?; + Ok(()) + } + /// Get the age of the oldest pending activation in seconds. 
/// Only activations with status=pending and processing_attempts=0 are considered /// as we are interested in latency to the *first* attempt. @@ -479,7 +495,8 @@ impl InflightActivationStore for PostgresActivationStore { namespace, taskname, on_attempts_exceeded, - bucket + bucket, + sent FROM inflight_taskactivations WHERE status = $1 ", @@ -502,16 +519,12 @@ impl InflightActivationStore for PostgresActivationStore { Ok(()) } - /// Update tasks that are in processing and have exceeded their processing deadline - /// Exceeding a processing deadline does not consume a retry as we don't know - /// if a worker took the task and was killed, or failed. + /// Update tasks that are in processing and have exceeded their processing deadline. #[instrument(skip_all)] async fn handle_processing_deadline(&self) -> Result { let now = Utc::now(); let mut atomic = self.write_pool.begin().await?; - // At-most-once tasks that fail their processing deadlines go directly to failure - // there are no retries, as the worker will reject the task due to at_most_once keys. let most_once_result = sqlx::query( "UPDATE inflight_taskactivations SET processing_deadline = null, status = $1 @@ -528,12 +541,25 @@ impl InflightActivationStore for PostgresActivationStore { processing_deadline_modified_rows = query_res.rows_affected(); } - // Update regular tasks. - // Increment processing_attempts by 1 and reset processing_deadline to null. 
+ let unsent = sqlx::query( + "UPDATE inflight_taskactivations + SET processing_deadline = null, status = $1, sent = FALSE + WHERE processing_deadline < $2 AND sent = FALSE AND status = $3", + ) + .bind(InflightActivationStatus::Pending.to_string()) + .bind(now) + .bind(InflightActivationStatus::Processing.to_string()) + .execute(&mut *atomic) + .await; + + if let Ok(query_res) = unsent { + processing_deadline_modified_rows += query_res.rows_affected(); + } + let result = sqlx::query( "UPDATE inflight_taskactivations - SET processing_deadline = null, status = $1, processing_attempts = processing_attempts + 1 - WHERE processing_deadline < $2 AND status = $3", + SET processing_deadline = null, status = $1, processing_attempts = processing_attempts + 1, sent = FALSE + WHERE processing_deadline < $2 AND sent = TRUE AND status = $3", ) .bind(InflightActivationStatus::Pending.to_string()) .bind(now) diff --git a/src/upkeep.rs b/src/upkeep.rs index 82f0da52..cdd19b29 100644 --- a/src/upkeep.rs +++ b/src/upkeep.rs @@ -299,7 +299,7 @@ pub async fn do_upkeep( .expect("Could not create kafka producer in upkeep"), ); if let Ok(tasks) = store - .get_pending_activations_from_namespaces(None, Some(&demoted_namespaces), None, None) + .claim_activations(None, Some(&demoted_namespaces), None, None, true) .await { // Produce tasks to Kafka with updated namespace @@ -1163,14 +1163,14 @@ mod tests { 1 ); let pending = store - .get_pending_activations(None, None, Some(1), None) + .claim_activations(None, None, Some(1), None, true) .await .unwrap(); assert_eq!(pending.len(), 1); assert_eq!(pending[0].id, "id_0"); assert!( store - .get_pending_activations(None, None, Some(1), None) + .claim_activations(None, None, Some(1), None, true) .await .unwrap() .is_empty() @@ -1195,7 +1195,7 @@ mod tests { 1 ); let pending = store - .get_pending_activations(None, None, Some(1), None) + .claim_activations(None, None, Some(1), None, true) .await .unwrap(); assert_eq!(pending.len(), 1); From 
7084a24646b0fa0eb20604a715d531bebdfa9c81 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Fri, 3 Apr 2026 13:08:33 -0700 Subject: [PATCH 2/4] Add Metrics for Processing Deadline Resets, Fix AI Reviewer Bugs --- benches/store_bench.rs | 2 +- src/fetch/tests.rs | 4 +- src/store/inflight_activation.rs | 82 +++++++++++++++++--------- src/store/inflight_activation_tests.rs | 77 ++++++++++++++++++++---- src/store/postgres_activation_store.rs | 63 +++++++++++--------- src/upkeep.rs | 50 ++++++++++++---- 6 files changed, 197 insertions(+), 81 deletions(-) diff --git a/benches/store_bench.rs b/benches/store_bench.rs index f2ca2f03..2463776c 100644 --- a/benches/store_bench.rs +++ b/benches/store_bench.rs @@ -62,7 +62,7 @@ async fn get_pending_activations(num_activations: u32, num_workers: u32) { .claim_activation_for_pull(Some("sentry"), Some(&ns)) .await .unwrap() - .is_none() + .is_some() { num_activations_processed += 1; } diff --git a/src/fetch/tests.rs b/src/fetch/tests.rs index 86cdb25f..2fb34f4c 100644 --- a/src/fetch/tests.rs +++ b/src/fetch/tests.rs @@ -9,11 +9,11 @@ use tonic::async_trait; use super::*; use crate::config::Config; use crate::push::PushError; -use crate::store::inflight_activation::InflightActivationStore; use crate::store::inflight_activation::{BucketRange, InflightActivation}; use crate::store::inflight_activation::{ FailedTasksForwarder, InflightActivationStatus, QueryResult, }; +use crate::store::inflight_activation::{InflightActivationStore, ProcessingDeadlineCounts}; use crate::test_utils::make_activations; /// Store stub that returns one activation once OR is always empty OR always fails. 
@@ -135,7 +135,7 @@ impl InflightActivationStore for MockStore { unimplemented!() } - async fn handle_processing_deadline(&self) -> Result { + async fn handle_processing_deadline(&self) -> Result { unimplemented!() } diff --git a/src/store/inflight_activation.rs b/src/store/inflight_activation.rs index 12f5a2da..7600c3ad 100644 --- a/src/store/inflight_activation.rs +++ b/src/store/inflight_activation.rs @@ -186,6 +186,26 @@ pub struct InflightActivation { pub sent: bool, } +/// Counts of tasks moved out of 'processing' by `handle_processing_deadline`: at-most-once tasks go to 'failure', the rest back to 'pending'. +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] +pub struct ProcessingDeadlineCounts { + /// The number of AMO tasks marked as failure. + pub at_most_once: u64, + + /// The number of regular, unsent tasks reverted. + pub non_amo_unsent: u64, + + /// The number of regular, sent tasks reverted. + pub non_amo_sent: u64, +} + +impl ProcessingDeadlineCounts { + /// Count the total number of tasks that exceeded their processing deadline and were moved out of 'processing'. + pub fn total(&self) -> u64 { + self.at_most_once + self.non_amo_unsent + self.non_amo_sent + } +} + impl InflightActivation { /// The number of milliseconds between an activation's received timestamp /// and the provided datetime @@ -501,7 +521,7 @@ pub trait InflightActivationStore: Send + Sync { async fn clear(&self) -> Result<(), Error>; /// Update tasks that exceeded their processing deadline - async fn handle_processing_deadline(&self) -> Result; + async fn handle_processing_deadline(&self) -> Result; /// Update tasks that exceeded max processing attempts async fn handle_processing_attempts(&self) -> Result; @@ -1115,63 +1135,67 @@ impl InflightActivationStore for SqliteActivationStore { /// Update tasks that are in processing and have exceeded their processing deadline. 
#[instrument(skip_all)] - async fn handle_processing_deadline(&self) -> Result { + async fn handle_processing_deadline(&self) -> Result { let now = Utc::now(); let mut atomic = self.write_pool.begin().await?; // Idempotent tasks that fail their processing deadlines go directly to failure - let most_once_result = sqlx::query( + let amo = sqlx::query( "UPDATE inflight_taskactivations - SET processing_deadline = null, status = $1 - WHERE processing_deadline < $2 AND at_most_once = TRUE AND status = $3", + SET processing_deadline = null, + status = $1 + WHERE processing_deadline < $2 + AND at_most_once = TRUE + AND status = $3", ) .bind(InflightActivationStatus::Failure) .bind(now.timestamp()) .bind(InflightActivationStatus::Processing) .execute(&mut *atomic) - .await; - - let mut processing_deadline_modified_rows = 0; - if let Ok(query_res) = most_once_result { - processing_deadline_modified_rows = query_res.rows_affected(); - } + .await?; - // Revert non-AMO activations that weren't delivered back to pending without consuming an attempt + // Revert activations that weren't delivered back to 'pending' without consuming an attempt let unsent = sqlx::query( "UPDATE inflight_taskactivations - SET processing_deadline = null, status = $1, sent = 0 - WHERE processing_deadline < $2 AND sent = 0 AND status = $3", + SET processing_deadline = null, + status = $1, + sent = FALSE + WHERE processing_deadline < $2 + AND sent = FALSE + AND at_most_once = FALSE + AND status = $3", ) .bind(InflightActivationStatus::Pending) .bind(now.timestamp()) .bind(InflightActivationStatus::Processing) .execute(&mut *atomic) - .await; - - if let Ok(query_res) = unsent { - processing_deadline_modified_rows += query_res.rows_affected(); - } + .await?; // Revert activations that were delivered back to 'pending' AND consume an attempt - let result = sqlx::query( + let sent = sqlx::query( "UPDATE inflight_taskactivations - SET processing_deadline = null, status = $1, processing_attempts = 
processing_attempts + 1, sent = 0 - WHERE processing_deadline < $2 AND sent = 1 AND status = $3", + SET processing_deadline = null, + status = $1, + processing_attempts = processing_attempts + 1, + sent = FALSE + WHERE processing_deadline < $2 + AND sent = TRUE + AND at_most_once = FALSE + AND status = $3", ) .bind(InflightActivationStatus::Pending) .bind(now.timestamp()) .bind(InflightActivationStatus::Processing) .execute(&mut *atomic) - .await; + .await?; atomic.commit().await?; - if let Ok(query_res) = result { - processing_deadline_modified_rows += query_res.rows_affected(); - return Ok(processing_deadline_modified_rows); - } - - Err(anyhow!("Could not update tasks past processing_deadline")) + Ok(ProcessingDeadlineCounts { + at_most_once: amo.rows_affected(), + non_amo_unsent: unsent.rows_affected(), + non_amo_sent: sent.rows_affected(), + }) } /// Update tasks that have exceeded their max processing attempts. diff --git a/src/store/inflight_activation_tests.rs b/src/store/inflight_activation_tests.rs index 0b46a96b..02248ff9 100644 --- a/src/store/inflight_activation_tests.rs +++ b/src/store/inflight_activation_tests.rs @@ -10,7 +10,8 @@ use crate::{ config::Config, store::inflight_activation::{ InflightActivationBuilder, InflightActivationStatus, InflightActivationStore, - InflightActivationStoreConfig, QueryResult, SqliteActivationStore, create_sqlite_pool, + InflightActivationStoreConfig, ProcessingDeadlineCounts, QueryResult, + SqliteActivationStore, create_sqlite_pool, }, store::postgres_activation_store::build_pg_connect_options, test_utils::{ @@ -966,7 +967,14 @@ async fn test_handle_processing_deadline(#[case] adapter: &str) { let count = store.handle_processing_deadline().await; assert!(count.is_ok()); - assert_eq!(count.unwrap(), 1); + assert_eq!( + count.unwrap(), + ProcessingDeadlineCounts { + at_most_once: 0, + non_amo_unsent: 0, + non_amo_sent: 1, + } + ); assert_counts( StatusCount { pending: 2, @@ -982,7 +990,7 @@ async fn 
test_handle_processing_deadline(#[case] adapter: &str) { // Run again to check early return let count = store.handle_processing_deadline().await; assert!(count.is_ok()); - assert_eq!(count.unwrap(), 0); + assert_eq!(count.unwrap(), ProcessingDeadlineCounts::default()); store.remove_db().await.unwrap(); } @@ -1011,7 +1019,14 @@ async fn test_handle_processing_deadline_multiple_tasks(#[case] adapter: &str) { let count = store.handle_processing_deadline().await; assert!(count.is_ok()); - assert_eq!(count.unwrap(), 1); + assert_eq!( + count.unwrap(), + ProcessingDeadlineCounts { + at_most_once: 0, + non_amo_unsent: 0, + non_amo_sent: 1, + } + ); assert_counts( StatusCount { pending: 1, @@ -1064,7 +1079,14 @@ async fn test_handle_processing_at_most_once(#[case] adapter: &str) { let count = store.handle_processing_deadline().await; assert!(count.is_ok()); - assert_eq!(count.unwrap(), 2); + assert_eq!( + count.unwrap(), + ProcessingDeadlineCounts { + at_most_once: 1, + non_amo_unsent: 0, + non_amo_sent: 1, + } + ); assert_counts( StatusCount { pending: 1, @@ -1115,7 +1137,14 @@ async fn test_handle_processing_deadline_discard_after(#[case] adapter: &str) { let count = store.handle_processing_deadline().await; assert!(count.is_ok()); - assert_eq!(count.unwrap(), 1); + assert_eq!( + count.unwrap(), + ProcessingDeadlineCounts { + at_most_once: 0, + non_amo_unsent: 0, + non_amo_sent: 1, + } + ); assert_counts( StatusCount { pending: 2, @@ -1162,7 +1191,14 @@ async fn test_handle_processing_deadline_deadletter_after(#[case] adapter: &str) let count = store.handle_processing_deadline().await; assert!(count.is_ok()); - assert_eq!(count.unwrap(), 1); + assert_eq!( + count.unwrap(), + ProcessingDeadlineCounts { + at_most_once: 0, + non_amo_unsent: 0, + non_amo_sent: 1, + } + ); assert_counts( StatusCount { pending: 2, @@ -1208,7 +1244,14 @@ async fn test_handle_processing_deadline_no_retries_remaining(#[case] adapter: & let count = store.handle_processing_deadline().await; 
assert!(count.is_ok()); - assert_eq!(count.unwrap(), 1); + assert_eq!( + count.unwrap(), + ProcessingDeadlineCounts { + at_most_once: 0, + non_amo_unsent: 0, + non_amo_sent: 1, + } + ); assert_counts( StatusCount { processing: 0, @@ -1233,7 +1276,14 @@ async fn test_handle_processing_deadline_unsent_no_attempt_increment(#[case] ada batch[0].processing_deadline = Some(Utc.with_ymd_and_hms(2020, 1, 1, 1, 1, 1).unwrap()); assert!(store.store(batch.clone()).await.is_ok()); let count = store.handle_processing_deadline().await.unwrap(); - assert_eq!(count, 1); + assert_eq!( + count, + ProcessingDeadlineCounts { + at_most_once: 0, + non_amo_unsent: 1, + non_amo_sent: 0, + } + ); let task = store.get_by_id(&batch[0].id).await.unwrap().unwrap(); assert_eq!(task.status, InflightActivationStatus::Pending); assert_eq!(task.processing_attempts, 0); @@ -1254,7 +1304,14 @@ async fn test_handle_processing_deadline_at_most_once_unsent_failure(#[case] ada batch[0].processing_deadline = Some(Utc.with_ymd_and_hms(2020, 1, 1, 1, 1, 1).unwrap()); assert!(store.store(batch.clone()).await.is_ok()); let count = store.handle_processing_deadline().await.unwrap(); - assert_eq!(count, 1); + assert_eq!( + count, + ProcessingDeadlineCounts { + at_most_once: 1, + non_amo_unsent: 0, + non_amo_sent: 0, + } + ); let task = store.get_by_id(&batch[0].id).await.unwrap().unwrap(); assert_eq!(task.status, InflightActivationStatus::Failure); assert!(!task.sent); diff --git a/src/store/postgres_activation_store.rs b/src/store/postgres_activation_store.rs index e10e67c1..c8625f16 100644 --- a/src/store/postgres_activation_store.rs +++ b/src/store/postgres_activation_store.rs @@ -1,6 +1,6 @@ use crate::store::inflight_activation::{ BucketRange, DepthCounts, FailedTasksForwarder, InflightActivation, InflightActivationStatus, - InflightActivationStore, QueryResult, TableRow, + InflightActivationStore, ProcessingDeadlineCounts, QueryResult, TableRow, }; use anyhow::{Error, anyhow}; use async_trait::async_trait; 
@@ -521,60 +521,67 @@ impl InflightActivationStore for PostgresActivationStore { /// Update tasks that are in processing and have exceeded their processing deadline. #[instrument(skip_all)] - async fn handle_processing_deadline(&self) -> Result { + async fn handle_processing_deadline(&self) -> Result { let now = Utc::now(); let mut atomic = self.write_pool.begin().await?; - let most_once_result = sqlx::query( + // At-most-once tasks that fail their processing deadlines go directly to failure + let amo = sqlx::query( "UPDATE inflight_taskactivations - SET processing_deadline = null, status = $1 - WHERE processing_deadline < $2 AND at_most_once = TRUE AND status = $3", + SET processing_deadline = null, + status = $1 + WHERE processing_deadline < $2 + AND at_most_once = TRUE + AND status = $3", ) .bind(InflightActivationStatus::Failure.to_string()) .bind(now) .bind(InflightActivationStatus::Processing.to_string()) .execute(&mut *atomic) - .await; - - let mut processing_deadline_modified_rows = 0; - if let Ok(query_res) = most_once_result { - processing_deadline_modified_rows = query_res.rows_affected(); - } + .await?; + // Revert activations that weren't delivered back to 'pending' without consuming an attempt let unsent = sqlx::query( "UPDATE inflight_taskactivations - SET processing_deadline = null, status = $1, sent = FALSE - WHERE processing_deadline < $2 AND sent = FALSE AND status = $3", + SET processing_deadline = null, + status = $1, + sent = FALSE + WHERE processing_deadline < $2 + AND sent = FALSE + AND at_most_once = FALSE + AND status = $3", ) .bind(InflightActivationStatus::Pending.to_string()) .bind(now) .bind(InflightActivationStatus::Processing.to_string()) .execute(&mut *atomic) - .await; - - if let Ok(query_res) = unsent { - processing_deadline_modified_rows += query_res.rows_affected(); - } + .await?; - let result = sqlx::query( + // Revert activations that were delivered back to 'pending' AND consume an attempt + let sent = sqlx::query( "UPDATE 
inflight_taskactivations - SET processing_deadline = null, status = $1, processing_attempts = processing_attempts + 1, sent = FALSE - WHERE processing_deadline < $2 AND sent = TRUE AND status = $3", + SET processing_deadline = null, + status = $1, + processing_attempts = processing_attempts + 1, + sent = FALSE + WHERE processing_deadline < $2 + AND sent = TRUE + AND at_most_once = FALSE + AND status = $3", ) .bind(InflightActivationStatus::Pending.to_string()) .bind(now) .bind(InflightActivationStatus::Processing.to_string()) .execute(&mut *atomic) - .await; + .await?; atomic.commit().await?; - if let Ok(query_res) = result { - processing_deadline_modified_rows += query_res.rows_affected(); - return Ok(processing_deadline_modified_rows); - } - - Err(anyhow!("Could not update tasks past processing_deadline")) + Ok(ProcessingDeadlineCounts { + at_most_once: amo.rows_affected(), + non_amo_unsent: unsent.rows_affected(), + non_amo_sent: sent.rows_affected(), + }) } /// Update tasks that have exceeded their max processing attempts. 
diff --git a/src/upkeep.rs b/src/upkeep.rs index cdd19b29..abc87515 100644 --- a/src/upkeep.rs +++ b/src/upkeep.rs @@ -18,7 +18,7 @@ use uuid::Uuid; use crate::SERVICE_NAME; use crate::config::Config; use crate::runtime_config::RuntimeConfigManager; -use crate::store::inflight_activation::InflightActivationStore; +use crate::store::inflight_activation::{InflightActivationStore, ProcessingDeadlineCounts}; /// The upkeep task that periodically performs upkeep /// on the inflight store @@ -67,7 +67,7 @@ pub async fn upkeep( #[derive(Debug)] pub struct UpkeepResults { retried: u64, - processing_deadline_reset: u64, + processing_deadline_reset: ProcessingDeadlineCounts, processing_attempts_exceeded: u64, delay_elapsed: u64, expired: u64, @@ -85,7 +85,7 @@ pub struct UpkeepResults { impl UpkeepResults { fn empty(&self) -> bool { self.retried == 0 - && self.processing_deadline_reset == 0 + && self.processing_deadline_reset.total() == 0 && self.processing_attempts_exceeded == 0 && self.expired == 0 && self.completed == 0 @@ -115,7 +115,7 @@ pub async fn do_upkeep( let upkeep_start = Instant::now(); let mut result_context = UpkeepResults { retried: 0, - processing_deadline_reset: 0, + processing_deadline_reset: ProcessingDeadlineCounts::default(), processing_attempts_exceeded: 0, delay_elapsed: 0, expired: 0, @@ -299,7 +299,7 @@ pub async fn do_upkeep( .expect("Could not create kafka producer in upkeep"), ); if let Ok(tasks) = store - .claim_activations(None, Some(&demoted_namespaces), None, None, true) + .claim_activations(None, Some(&demoted_namespaces), None, None, false) .await { // Produce tasks to Kafka with updated namespace @@ -379,11 +379,15 @@ pub async fn do_upkeep( } if !result_context.empty() { + let processing_deadline_reset = result_context.processing_deadline_reset; + debug!( result_context.completed, result_context.deadlettered, result_context.discarded, - result_context.processing_deadline_reset, + processing_deadline_amo = 
processing_deadline_reset.at_most_once, + processing_deadline_non_amo_unsent = processing_deadline_reset.non_amo_unsent, + processing_deadline_non_amo_sent = processing_deadline_reset.non_amo_sent, result_context.processing_attempts_exceeded, result_context.expired, result_context.retried, @@ -413,11 +417,28 @@ pub async fn do_upkeep( .increment(result_context.discarded); metrics::counter!("upkeep.cleanup_action", "kind" => "mark_processing_attempts_exceeded_as_failure") .increment(result_context.processing_attempts_exceeded); - metrics::counter!("upkeep.cleanup_action", "kind" => "mark_processing_deadline_exceeded_as_failure") - .increment(result_context.processing_deadline_reset); metrics::counter!("upkeep.cleanup_action", "kind" => "mark_delay_elapsed_as_pending") .increment(result_context.delay_elapsed); + // Processing deadlines + metrics::counter!( + "upkeep.cleanup_action", + "kind" => "mark_processing_deadline_amo_failure" + ) + .increment(result_context.processing_deadline_reset.at_most_once); + + metrics::counter!( + "upkeep.cleanup_action", + "kind" => "mark_processing_deadline_non_amo_unsent" + ) + .increment(result_context.processing_deadline_reset.non_amo_unsent); + + metrics::counter!( + "upkeep.cleanup_action", + "kind" => "mark_processing_deadline_non_amo_sent" + ) + .increment(result_context.processing_deadline_reset.non_amo_sent); + // Forwarded tasks metrics::counter!("upkeep.forwarded_tasks").increment(result_context.forwarded); @@ -510,7 +531,7 @@ mod tests { use crate::{ config::Config, runtime_config::RuntimeConfigManager, - store::inflight_activation::InflightActivationStatus, + store::inflight_activation::{InflightActivationStatus, ProcessingDeadlineCounts}, test_utils::{ StatusCount, assert_counts, consume_topic, create_config, create_integration_config, create_integration_config_with_topic, create_producer, create_test_store, @@ -773,7 +794,7 @@ mod tests { store.as_ref(), ) .await; - assert_eq!(result_context.processing_deadline_reset, 
0); + assert_eq!(result_context.processing_deadline_reset.total(), 0); } #[tokio::test] @@ -823,7 +844,14 @@ mod tests { .await; // 0 processing, 2 pending now - assert_eq!(result_context.processing_deadline_reset, 1); + assert_eq!( + result_context.processing_deadline_reset, + ProcessingDeadlineCounts { + at_most_once: 0, + non_amo_unsent: 1, + non_amo_sent: 0, + } + ); assert_counts( StatusCount { processing: 0, From 1d248a1b08d5a0608f20c63fdbf0ba920131e224 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Fri, 3 Apr 2026 13:23:48 -0700 Subject: [PATCH 3/4] Split Postgres Changes into Migrations --- pg_migrations/0001_create_inflight_activations.sql | 8 +------- pg_migrations/0002_add_bucket.sql | 3 +++ pg_migrations/0003_add_sent.sql | 4 ++++ 3 files changed, 8 insertions(+), 7 deletions(-) create mode 100644 pg_migrations/0002_add_bucket.sql create mode 100644 pg_migrations/0003_add_sent.sql diff --git a/pg_migrations/0001_create_inflight_activations.sql b/pg_migrations/0001_create_inflight_activations.sql index 7b31baa1..ee8b26a4 100644 --- a/pg_migrations/0001_create_inflight_activations.sql +++ b/pg_migrations/0001_create_inflight_activations.sql @@ -16,11 +16,5 @@ CREATE TABLE IF NOT EXISTS inflight_taskactivations ( application TEXT NOT NULL, namespace TEXT NOT NULL, taskname TEXT NOT NULL, - on_attempts_exceeded INTEGER NOT NULL DEFAULT 1, - bucket SMALLINT NOT NULL DEFAULT 0, - sent BOOLEAN NOT NULL DEFAULT FALSE + on_attempts_exceeded INTEGER NOT NULL DEFAULT 1 ); - --- Supports pending claim queries (status, filters, ordering) including sent -CREATE INDEX IF NOT EXISTS idx_inflight_taskactivations_claim -ON inflight_taskactivations (status, bucket, sent); diff --git a/pg_migrations/0002_add_bucket.sql b/pg_migrations/0002_add_bucket.sql new file mode 100644 index 00000000..6283c667 --- /dev/null +++ b/pg_migrations/0002_add_bucket.sql @@ -0,0 +1,3 @@ +ALTER TABLE inflight_taskactivations ADD COLUMN bucket SMALLINT NOT NULL DEFAULT 0; + +CREATE INDEX 
idx_activation_claim ON inflight_taskactivations (status, bucket); diff --git a/pg_migrations/0003_add_sent.sql b/pg_migrations/0003_add_sent.sql new file mode 100644 index 00000000..05fa6384 --- /dev/null +++ b/pg_migrations/0003_add_sent.sql @@ -0,0 +1,4 @@ +ALTER TABLE inflight_taskactivations ADD COLUMN sent BOOLEAN NOT NULL DEFAULT FALSE; + +DROP INDEX IF EXISTS idx_activation_claim; +CREATE INDEX idx_activation_claim ON inflight_taskactivations (status, bucket, sent); From 688dc04dc86dbafb3f39ad080e3f72305464c814 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Fri, 3 Apr 2026 13:38:51 -0700 Subject: [PATCH 4/4] Handle Claim One Invariant Gracefully --- src/grpc/server.rs | 1 + src/store/inflight_activation.rs | 8 +++++--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/grpc/server.rs b/src/grpc/server.rs index bd2db13d..727d8cfb 100644 --- a/src/grpc/server.rs +++ b/src/grpc/server.rs @@ -137,6 +137,7 @@ impl ConsumerService for TaskbrokerServer { Err(Status::internal("Unable to fetch next task")) } + // If we return an error, the worker will place the result back in its internal queue and send the update again in the future, which is not desired Ok(None) => Ok(Response::new(SetTaskStatusResponse { task: None })), Ok(Some(inflight)) => { diff --git a/src/store/inflight_activation.rs b/src/store/inflight_activation.rs index 7600c3ad..27df2100 100644 --- a/src/store/inflight_activation.rs +++ b/src/store/inflight_activation.rs @@ -458,9 +458,11 @@ pub trait InflightActivationStore: Send + Sync { .await?; // If we are getting more than one task here, something is broken - assert!(rows.len() <= 1); - - Ok(rows.pop()) + if rows.len() > 1 { + Err(anyhow!("Found more than one row despite limit of one")) + } else { + Ok(rows.pop()) + } } /// Record successful push.