diff --git a/benches/store_bench.rs b/benches/store_bench.rs index 2a09b51c..fa6ff277 100644 --- a/benches/store_bench.rs +++ b/benches/store_bench.rs @@ -59,7 +59,7 @@ async fn get_pending_activations(num_activations: u32, num_workers: u32) { let mut num_activations_processed = 0; while store - .get_pending_activation(Some("sentry"), Some(std::slice::from_ref(&ns))) + .get_pending_activation(Some("sentry"), Some(std::slice::from_ref(&ns)), None) .await .unwrap() .is_some() diff --git a/migrations/0005_add_bucket.sql b/migrations/0005_add_bucket.sql new file mode 100644 index 00000000..eb5d5bab --- /dev/null +++ b/migrations/0005_add_bucket.sql @@ -0,0 +1 @@ +ALTER TABLE inflight_taskactivations ADD COLUMN bucket INTEGER NOT NULL DEFAULT 0; diff --git a/pg_migrations/0002_add_bucket.sql b/pg_migrations/0002_add_bucket.sql new file mode 100644 index 00000000..6283c667 --- /dev/null +++ b/pg_migrations/0002_add_bucket.sql @@ -0,0 +1,3 @@ +ALTER TABLE inflight_taskactivations ADD COLUMN bucket SMALLINT NOT NULL DEFAULT 0; + +CREATE INDEX idx_activation_claim ON inflight_taskactivations (status, bucket); diff --git a/src/config.rs b/src/config.rs index a547304e..a10c6490 100644 --- a/src/config.rs +++ b/src/config.rs @@ -254,7 +254,8 @@ pub struct Config { /// How to deliver tasks to workers: "push" or "pull". pub delivery_mode: DeliveryMode, - /// The number of concurrent dispatchers to run. + /// The number of concurrent fetch loops in push mode, which should be ≤ `MAX_FETCH_THREADS` and a power of two. + /// If it's not a power of two or it's too large, it is rounded down to the nearest power of two that is at most `MAX_FETCH_THREADS` (and at least 1). pub fetch_threads: usize, /// Time in milliseconds to wait between fetch attempts when no pending activation is found. 
diff --git a/src/fetch/mod.rs b/src/fetch/mod.rs index 5fb81715..91022422 100644 --- a/src/fetch/mod.rs +++ b/src/fetch/mod.rs @@ -9,7 +9,38 @@ use tracing::{debug, info, warn}; use crate::config::Config; use crate::push::{PushError, PushPool}; -use crate::store::inflight_activation::{InflightActivation, InflightActivationStore}; +use crate::store::inflight_activation::{BucketRange, InflightActivation, InflightActivationStore}; + +// This should be a power of two. +pub const MAX_FETCH_THREADS: i16 = 256; + +/// Returns the largest power of two that is ≤ `n` (with `n == 0` treated as 1) and ≤ [`MAX_FETCH_THREADS`]; the result therefore always divides [`MAX_FETCH_THREADS`]. +pub fn normalize_fetch_threads(n: usize) -> usize { + let n = n.max(1); + let mut v = MAX_FETCH_THREADS; + + while v > 1 { + if (v as usize) <= n { + return v as usize; + } + + v /= 2; + } + + 1 +} + +/// Inclusive bucket range for fetch thread `thread_index` when using `fetch_threads` concurrent fetch loops. +/// Requires `fetch_threads` to divide [`MAX_FETCH_THREADS`] (enforced via [`normalize_fetch_threads`]). +pub fn bucket_range_for_fetch_thread(thread_index: usize, fetch_threads: usize) -> BucketRange { + let maximum = MAX_FETCH_THREADS as usize; + let buckets_per_range = maximum / fetch_threads; + + let low = (thread_index * buckets_per_range) as i16; + let high = ((thread_index + 1) * buckets_per_range - 1) as i16; + + (low, high) +} /// Thin interface for the push pool. It mostly serves to enable proper unit testing, but it also decouples fetch logic from push logic even further. #[async_trait] @@ -51,14 +82,16 @@ impl FetchPool { } } - /// Spawn `config.fetch_threads` asynchronous tasks, each of which repeatedly moves pending activations from the store to the push pool until the shutdown signal is received. + /// Spawns one task per effective fetch thread ([`normalize_fetch_threads`]), each claiming pending work only in its bucket subrange. 
pub async fn start(&self) -> Result<()> { let fetch_wait_ms = self.config.fetch_wait_ms; + let fetch_threads = normalize_fetch_threads(self.config.fetch_threads); - let mut fetch_pool = crate::tokio::spawn_pool(self.config.fetch_threads, |_| { + let mut fetch_pool = crate::tokio::spawn_pool(fetch_threads, |thread_index| { let store = self.store.clone(); let pusher = self.pusher.clone(); let config = self.config.clone(); + let bucket_filter = Some(bucket_range_for_fetch_thread(thread_index, fetch_threads)); let guard = get_shutdown_guard().shutdown_on_drop(); @@ -80,7 +113,10 @@ impl FetchPool { let application = config.application.as_deref(); let namespaces = config.namespaces.as_deref(); - match store.get_pending_activation(application, namespaces).await { + match store + .get_pending_activation(application, namespaces, bucket_filter) + .await + { Ok(Some(activation)) => { let id = activation.id.clone(); diff --git a/src/fetch/tests.rs b/src/fetch/tests.rs index a77a9aac..aed8755e 100644 --- a/src/fetch/tests.rs +++ b/src/fetch/tests.rs @@ -9,8 +9,8 @@ use tonic::async_trait; use super::*; use crate::config::Config; use crate::push::PushError; -use crate::store::inflight_activation::InflightActivation; use crate::store::inflight_activation::InflightActivationStore; +use crate::store::inflight_activation::{BucketRange, InflightActivation}; use crate::store::inflight_activation::{ FailedTasksForwarder, InflightActivationStatus, QueryResult, }; @@ -74,6 +74,7 @@ impl InflightActivationStore for MockStore { &self, _application: Option<&str>, _namespaces: Option<&[String]>, + _bucket: Option, ) -> Result, Error> { if self.fail { return Err(anyhow!("mock store error")); @@ -87,6 +88,7 @@ impl InflightActivationStore for MockStore { _application: Option<&str>, _namespaces: Option<&[String]>, _limit: Option, + _bucket: Option, ) -> Result, Error> { unimplemented!() } diff --git a/src/grpc/server.rs b/src/grpc/server.rs index 03db46ba..cc97b6aa 100644 --- 
a/src/grpc/server.rs +++ b/src/grpc/server.rs @@ -39,7 +39,7 @@ impl ConsumerService for TaskbrokerServer { let inflight = self .store - .get_pending_activation(application.as_deref(), namespaces) + .get_pending_activation(application.as_deref(), namespaces, None) .await; match inflight { @@ -125,7 +125,7 @@ impl ConsumerService for TaskbrokerServer { let namespaces = namespace.as_ref().map(std::slice::from_ref); let res = match self .store - .get_pending_activation(application.as_deref(), namespaces) + .get_pending_activation(application.as_deref(), namespaces, None) .await { Err(e) => { diff --git a/src/kafka/deserialize_activation.rs b/src/kafka/deserialize_activation.rs index d0ea263e..28485606 100644 --- a/src/kafka/deserialize_activation.rs +++ b/src/kafka/deserialize_activation.rs @@ -1,6 +1,7 @@ use std::{sync::Arc, time::Duration}; use crate::config::Config; +use crate::fetch::MAX_FETCH_THREADS; use crate::store::inflight_activation::{InflightActivation, InflightActivationStatus}; use anyhow::{Error, anyhow}; use chrono::{DateTime, Utc}; @@ -8,6 +9,7 @@ use prost::Message as _; use rdkafka::{Message, message::OwnedMessage}; use sentry_protos::taskbroker::v1::OnAttemptsExceeded; use sentry_protos::taskbroker::v1::TaskActivation; +use uuid::Uuid; pub struct DeserializeActivationConfig { pub max_delayed_allowed: u64, @@ -21,6 +23,13 @@ impl DeserializeActivationConfig { } } +/// Use the UUID of an activation to determine its bucket. 
+pub fn bucket_from_id(id: &str) -> i16 { + Uuid::parse_str(id) + .map(|u| (u.as_u128() % MAX_FETCH_THREADS as u128) as i16) + .unwrap_or(0) +} + pub fn new( config: DeserializeActivationConfig, ) -> impl Fn(Arc) -> Result { @@ -78,6 +87,8 @@ pub fn new( .try_into() .unwrap(); + let bucket = bucket_from_id(&activation.id); + Ok(InflightActivation { id: activation.id.clone(), activation: payload.to_vec(), @@ -96,6 +107,7 @@ pub fn new( namespace, taskname, on_attempts_exceeded, + bucket, }) } } diff --git a/src/store/inflight_activation.rs b/src/store/inflight_activation.rs index 4cd29b12..c68702fd 100644 --- a/src/store/inflight_activation.rs +++ b/src/store/inflight_activation.rs @@ -27,6 +27,8 @@ use tracing::{instrument, warn}; use crate::config::Config; +pub type BucketRange = (i16, i16); + /// The members of this enum should be synced with the members /// of InflightActivationStatus in sentry_protos #[derive(Clone, Copy, Debug, PartialEq, Eq, Type)] @@ -173,6 +175,10 @@ pub struct InflightActivation { /// are exceeded. #[builder(default = false)] pub at_most_once: bool, + + /// Bucket derived from activation ID (UUID as number % 256). Set once on ingestion. 
+ #[builder(setter(skip), default = "0")] + pub bucket: i16, } impl InflightActivation { @@ -235,6 +241,7 @@ pub struct TableRow { pub taskname: String, #[sqlx(try_from = "i32")] pub on_attempts_exceeded: OnAttemptsExceeded, + pub bucket: i16, } impl TryFrom for TableRow { @@ -259,6 +266,7 @@ impl TryFrom for TableRow { namespace: value.namespace, taskname: value.taskname, on_attempts_exceeded: value.on_attempts_exceeded, + bucket: value.bucket, }) } } @@ -283,6 +291,7 @@ impl From for InflightActivation { namespace: value.namespace, taskname: value.taskname, on_attempts_exceeded: value.on_attempts_exceeded, + bucket: value.bucket, } } } @@ -352,11 +361,12 @@ pub trait InflightActivationStore: Send + Sync { /// Store a batch of activations async fn store(&self, batch: Vec) -> Result; - /// Get a single pending activation, optionally filtered by namespaces. + /// Get a single pending activation, optionally filtered by namespaces and bucket subrange. async fn get_pending_activation( &self, application: Option<&str>, namespaces: Option<&[String]>, + bucket: Option, ) -> Result, Error> { if namespaces.is_some() && application.is_none() { warn!( @@ -367,7 +377,7 @@ pub trait InflightActivationStore: Send + Sync { } let results = self - .get_pending_activations(application, namespaces, Some(1)) + .get_pending_activations(application, namespaces, Some(1), bucket) .await?; if results.is_empty() { @@ -383,6 +393,7 @@ pub trait InflightActivationStore: Send + Sync { application: Option<&str>, namespaces: Option<&[String]>, limit: Option, + bucket: Option, ) -> Result, Error>; /// Get the age of the oldest pending activation in seconds @@ -698,7 +709,8 @@ impl InflightActivationStore for SqliteActivationStore { application, namespace, taskname, - on_attempts_exceeded + on_attempts_exceeded, + bucket FROM inflight_taskactivations WHERE id = $1 ", @@ -739,7 +751,8 @@ impl InflightActivationStore for SqliteActivationStore { application, namespace, taskname, - 
on_attempts_exceeded + on_attempts_exceeded, + bucket ) ", ); @@ -772,6 +785,7 @@ impl InflightActivationStore for SqliteActivationStore { b.push_bind(row.namespace); b.push_bind(row.taskname); b.push_bind(row.on_attempts_exceeded as i32); + b.push_bind(row.bucket); }) .push(" ON CONFLICT(id) DO NOTHING") .build(); @@ -809,6 +823,7 @@ impl InflightActivationStore for SqliteActivationStore { application: Option<&str>, namespaces: Option<&[String]>, limit: Option, + bucket: Option, ) -> Result, Error> { let now = Utc::now(); @@ -849,6 +864,12 @@ impl InflightActivationStore for SqliteActivationStore { } query_builder.push(")"); } + if let Some((min, max)) = bucket { + query_builder.push(" AND bucket >= "); + query_builder.push_bind(min); + query_builder.push(" AND bucket <= "); + query_builder.push_bind(max); + } query_builder.push(" ORDER BY added_at"); if let Some(limit) = limit { query_builder.push(" LIMIT "); @@ -989,7 +1010,8 @@ impl InflightActivationStore for SqliteActivationStore { application, namespace, taskname, - on_attempts_exceeded + on_attempts_exceeded, + bucket FROM inflight_taskactivations WHERE status = $1 ", diff --git a/src/store/inflight_activation_tests.rs b/src/store/inflight_activation_tests.rs index 010f35ad..e9b03048 100644 --- a/src/store/inflight_activation_tests.rs +++ b/src/store/inflight_activation_tests.rs @@ -181,7 +181,7 @@ async fn test_get_pending_activation(#[case] adapter: &str) { assert!(store.store(batch.clone()).await.is_ok()); let result = store - .get_pending_activation(None, None) + .get_pending_activation(None, None, None) .await .unwrap() .unwrap(); @@ -205,6 +205,45 @@ async fn test_get_pending_activation(#[case] adapter: &str) { store.remove_db().await.unwrap(); } +#[tokio::test] +#[rstest] +#[case::sqlite("sqlite")] +#[case::postgres("postgres")] +async fn test_get_pending_activation_bucket_filter(#[case] adapter: &str) { + let store = create_test_store(adapter).await; + + let mut batch = make_activations(2); + 
batch[0].bucket = 10; + batch[1].bucket = 20; + assert!(store.store(batch).await.is_ok()); + + let first = store + .get_pending_activation(None, None, Some((15, 25))) + .await + .unwrap() + .unwrap(); + assert_eq!(first.id, "id_1"); + assert_eq!(first.bucket, 20); + + let second = store + .get_pending_activation(None, None, Some((0, 15))) + .await + .unwrap() + .unwrap(); + assert_eq!(second.id, "id_0"); + assert_eq!(second.bucket, 10); + + assert!( + store + .get_pending_activation(None, None, Some((15, 25))) + .await + .unwrap() + .is_none() + ); + + store.remove_db().await.unwrap(); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 32)] #[rstest] #[case::sqlite("sqlite")] @@ -232,7 +271,7 @@ async fn test_get_pending_activation_with_race(#[case] adapter: &str) { join_set.spawn(async move { rx.recv().await.unwrap(); store - .get_pending_activation(Some("sentry"), Some(std::slice::from_ref(&ns))) + .get_pending_activation(Some("sentry"), Some(std::slice::from_ref(&ns)), None) .await .unwrap() .unwrap() @@ -266,7 +305,11 @@ async fn test_get_pending_activation_with_namespace(#[case] adapter: &str) { let other_namespace = "other_namespace".to_string(); // Get activation from other namespace let result = store - .get_pending_activation(Some("sentry"), Some(std::slice::from_ref(&other_namespace))) + .get_pending_activation( + Some("sentry"), + Some(std::slice::from_ref(&other_namespace)), + None, + ) .await .unwrap() .unwrap(); @@ -294,7 +337,7 @@ async fn test_get_pending_activation_from_multiple_namespaces(#[case] adapter: & // Get activation from multiple namespaces (should get oldest) let namespaces = vec!["ns2".to_string(), "ns3".to_string()]; let result = store - .get_pending_activations(None, Some(&namespaces), None) + .get_pending_activations(None, Some(&namespaces), None, None) .await .unwrap(); @@ -323,7 +366,7 @@ async fn test_get_pending_activation_with_namespace_requires_application(#[case] // to fetch tasks from any application. 
let other_namespace = "other_namespace".to_string(); let opt = store - .get_pending_activation(None, Some(std::slice::from_ref(&other_namespace))) + .get_pending_activation(None, Some(std::slice::from_ref(&other_namespace)), None) .await .unwrap(); assert!(opt.is_none()); @@ -331,7 +374,7 @@ async fn test_get_pending_activation_with_namespace_requires_application(#[case] // We allow no application in this method because of usage in upkeep let namespaces = vec!["other_namespace".to_string()]; let activations = store - .get_pending_activations(None, Some(&namespaces), Some(2)) + .get_pending_activations(None, Some(&namespaces), Some(2), None) .await .unwrap(); assert_eq!( @@ -362,7 +405,7 @@ async fn test_get_pending_activation_skip_expires(#[case] adapter: &str) { batch[0].expires_at = Some(Utc::now() - Duration::from_secs(100)); assert!(store.store(batch.clone()).await.is_ok()); - let result = store.get_pending_activation(None, None).await; + let result = store.get_pending_activation(None, None, None).await; assert!(result.is_ok()); let res_option = result.unwrap(); assert!(res_option.is_none()); @@ -392,7 +435,7 @@ async fn test_get_pending_activation_earliest(#[case] adapter: &str) { assert!(ret.is_ok(), "{}", ret.err().unwrap().to_string()); let result = store - .get_pending_activation(None, None) + .get_pending_activation(None, None, None) .await .unwrap() .unwrap(); @@ -417,7 +460,7 @@ async fn test_get_pending_activation_fetches_application(#[case] adapter: &str) // Getting an activation with no application filter should // include activations with application set. 
let result = store - .get_pending_activation(None, None) + .get_pending_activation(None, None, None) .await .unwrap() .unwrap(); @@ -441,7 +484,7 @@ async fn test_get_pending_activation_with_application(#[case] adapter: &str) { // Get activation from a named application let result = store - .get_pending_activation(Some("hammers"), None) + .get_pending_activation(Some("hammers"), None, None) .await .unwrap() .unwrap(); @@ -451,7 +494,7 @@ async fn test_get_pending_activation_with_application(#[case] adapter: &str) { assert_eq!(result.application, "hammers"); let result_opt = store - .get_pending_activation(Some("hammers"), None) + .get_pending_activation(Some("hammers"), None, None) .await .unwrap(); assert!( @@ -459,7 +502,10 @@ async fn test_get_pending_activation_with_application(#[case] adapter: &str) { "no pending activations in hammers left" ); - let result_opt = store.get_pending_activation(None, None).await.unwrap(); + let result_opt = store + .get_pending_activation(None, None, None) + .await + .unwrap(); assert!(result_opt.is_some(), "one pending activation in '' left"); store.remove_db().await.unwrap(); } @@ -484,7 +530,11 @@ async fn test_get_pending_activation_with_application_and_namespace(#[case] adap let target_ns = "target".to_string(); // Get activation from a named application let result = store - .get_pending_activation(Some("hammers"), Some(std::slice::from_ref(&target_ns))) + .get_pending_activation( + Some("hammers"), + Some(std::slice::from_ref(&target_ns)), + None, + ) .await .unwrap() .unwrap(); @@ -495,7 +545,7 @@ async fn test_get_pending_activation_with_application_and_namespace(#[case] adap assert_eq!(result.namespace, "target"); let result = store - .get_pending_activation(Some("hammers"), None) + .get_pending_activation(Some("hammers"), None, None) .await .unwrap() .unwrap(); @@ -600,7 +650,7 @@ async fn test_set_activation_status(#[case] adapter: &str) { .await; assert!( store - .get_pending_activation(None, None) + 
.get_pending_activation(None, None, None) .await .unwrap() .is_none() diff --git a/src/store/postgres_activation_store.rs b/src/store/postgres_activation_store.rs index 8d472713..9b9878a6 100644 --- a/src/store/postgres_activation_store.rs +++ b/src/store/postgres_activation_store.rs @@ -1,6 +1,6 @@ use crate::store::inflight_activation::{ - FailedTasksForwarder, InflightActivation, InflightActivationStatus, InflightActivationStore, - QueryResult, TableRow, + BucketRange, FailedTasksForwarder, InflightActivation, InflightActivationStatus, + InflightActivationStore, QueryResult, TableRow, }; use anyhow::{Error, anyhow}; use async_trait::async_trait; @@ -175,7 +175,8 @@ impl InflightActivationStore for PostgresActivationStore { application, namespace, taskname, - on_attempts_exceeded + on_attempts_exceeded, + bucket FROM inflight_taskactivations WHERE id = $1 ", @@ -216,7 +217,8 @@ impl InflightActivationStore for PostgresActivationStore { application, namespace, taskname, - on_attempts_exceeded + on_attempts_exceeded, + bucket ) ", ); @@ -248,6 +250,7 @@ impl InflightActivationStore for PostgresActivationStore { b.push_bind(row.namespace); b.push_bind(row.taskname); b.push_bind(row.on_attempts_exceeded as i32); + b.push_bind(row.bucket); }) .push(" ON CONFLICT(id) DO NOTHING") .build(); @@ -264,11 +267,12 @@ impl InflightActivationStore for PostgresActivationStore { application: Option<&str>, namespaces: Option<&[String]>, limit: Option, + bucket: Option, ) -> Result, Error> { let now = Utc::now(); let grace_period = self.config.processing_deadline_grace_sec; - let mut query_builder = QueryBuilder::new( + let mut query_builder = QueryBuilder::::new( "WITH selected_activations AS ( SELECT id FROM inflight_taskactivations @@ -294,6 +298,15 @@ impl InflightActivationStore for PostgresActivationStore { } query_builder.push(")"); } + + if let Some((min, max)) = bucket { + query_builder.push(" AND bucket >= "); + query_builder.push_bind(min); + + query_builder.push(" AND 
bucket <= "); + query_builder.push_bind(max); + } + query_builder.push(" ORDER BY added_at"); if let Some(limit) = limit { query_builder.push(" LIMIT "); @@ -443,7 +456,8 @@ impl InflightActivationStore for PostgresActivationStore { application, namespace, taskname, - on_attempts_exceeded + on_attempts_exceeded, + bucket FROM inflight_taskactivations WHERE status = $1 ", diff --git a/src/upkeep.rs b/src/upkeep.rs index 3c6bfeb4..a149c9a9 100644 --- a/src/upkeep.rs +++ b/src/upkeep.rs @@ -304,7 +304,7 @@ pub async fn do_upkeep( .expect("Could not create kafka producer in upkeep"), ); if let Ok(tasks) = store - .get_pending_activations(None, Some(&demoted_namespaces), None) + .get_pending_activations(None, Some(&demoted_namespaces), None, None) .await { // Produce tasks to Kafka with updated namespace @@ -1175,7 +1175,7 @@ mod tests { ); assert_eq!( store - .get_pending_activation(None, None) + .get_pending_activation(None, None, None) .await .unwrap() .unwrap() @@ -1184,7 +1184,7 @@ mod tests { ); assert!( store - .get_pending_activation(None, None) + .get_pending_activation(None, None, None) .await .unwrap() .is_none() @@ -1210,7 +1210,7 @@ mod tests { ); assert_eq!( store - .get_pending_activation(None, None) + .get_pending_activation(None, None, None) .await .unwrap() .unwrap()