Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benches/store_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ async fn get_pending_activations(num_activations: u32, num_workers: u32) {
let mut num_activations_processed = 0;

while store
.get_pending_activation(Some("sentry"), Some(std::slice::from_ref(&ns)))
.get_pending_activation(Some("sentry"), Some(std::slice::from_ref(&ns)), None)
.await
.unwrap()
.is_some()
Expand Down
1 change: 1 addition & 0 deletions migrations/0005_add_bucket.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Adds the per-activation bucket column that pending-activation queries can filter on by range.
-- Rows that already exist when this migration runs get bucket 0.
ALTER TABLE inflight_taskactivations ADD COLUMN bucket INTEGER NOT NULL DEFAULT 0;
3 changes: 3 additions & 0 deletions pg_migrations/0002_add_bucket.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Adds the per-activation bucket column. Rows that already exist when this migration runs get bucket 0.
ALTER TABLE inflight_taskactivations ADD COLUMN bucket SMALLINT NOT NULL DEFAULT 0;

-- Composite index on (status, bucket).
-- NOTE(review): presumably supports claiming pending activations within a bucket range; confirm against the Postgres claim query.
CREATE INDEX idx_activation_claim ON inflight_taskactivations (status, bucket);
3 changes: 2 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,8 @@ pub struct Config {
/// How to deliver tasks to workers: "push" or "pull".
pub delivery_mode: DeliveryMode,

/// The number of concurrent dispatchers to run.
/// The number of concurrent fetch loops in push mode. Should be a power of two no larger than `MAX_FETCH_THREADS`.
/// Any other value is rounded down to the largest power of two that is ≤ the configured value and ≤ `MAX_FETCH_THREADS` (`0` is treated as `1`).
pub fetch_threads: usize,

/// Time in milliseconds to wait between fetch attempts when no pending activation is found.
Expand Down
44 changes: 40 additions & 4 deletions src/fetch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,38 @@ use tracing::{debug, info, warn};

use crate::config::Config;
use crate::push::{PushError, PushPool};
use crate::store::inflight_activation::{InflightActivation, InflightActivationStore};
use crate::store::inflight_activation::{BucketRange, InflightActivation, InflightActivationStore};

/// Total number of activation buckets, and therefore the upper bound on concurrent fetch loops.
///
/// Must be a positive power of two so that every power-of-two thread count divides it evenly.
pub const MAX_FETCH_THREADS: i16 = 256;

// Enforce the power-of-two invariant at compile time instead of relying on a comment.
const _: () = assert!(
    MAX_FETCH_THREADS > 0 && (MAX_FETCH_THREADS as u16).is_power_of_two(),
    "MAX_FETCH_THREADS must be a positive power of two"
);

/// Rounds a configured fetch-thread count down to a usable value.
///
/// Returns the largest power of two that is less than or equal to `n`, capped at
/// [`MAX_FETCH_THREADS`]. A request for `0` threads is treated as `1`, so the result is
/// always in `1..=MAX_FETCH_THREADS`.
pub fn normalize_fetch_threads(n: usize) -> usize {
    let requested = n.max(1);
    let mut candidate = MAX_FETCH_THREADS as usize;

    // `candidate` walks down the powers of two and bottoms out at 1; because
    // `requested >= 1` the loop always stops on a value that fits the request.
    while candidate > requested {
        candidate /= 2;
    }

    candidate
}

/// Inclusive bucket range `(low, high)` owned by fetch thread `thread_index` when
/// `fetch_threads` concurrent fetch loops are running.
///
/// The [`MAX_FETCH_THREADS`] buckets are split proportionally, so the ranges for thread
/// indices `0..fetch_threads` are contiguous, non-overlapping and together cover every
/// bucket. When `fetch_threads` divides [`MAX_FETCH_THREADS`] (which
/// [`normalize_fetch_threads`] guarantees) every thread gets an equally sized range.
///
/// `fetch_threads` is clamped to `1..=MAX_FETCH_THREADS`: a count of `0` would otherwise
/// divide by zero, and a count above the bucket total would produce zero-width ranges
/// whose upper bound underflows.
pub fn bucket_range_for_fetch_thread(thread_index: usize, fetch_threads: usize) -> BucketRange {
    let total_buckets = MAX_FETCH_THREADS as usize;
    let threads = fetch_threads.clamp(1, total_buckets);

    // Multiply before dividing so that a thread count that does not divide the bucket
    // total still hands out every bucket instead of dropping the remainder.
    let low = thread_index * total_buckets / threads;
    // `threads <= total_buckets` makes the quotient at least 1, so `- 1` cannot underflow.
    let high = (thread_index + 1) * total_buckets / threads - 1;

    (low as i16, high as i16)
}

/// Thin interface for the push pool. It mostly serves to enable proper unit testing, but it also decouples fetch logic from push logic even further.
#[async_trait]
Expand Down Expand Up @@ -51,14 +82,16 @@ impl<T: TaskPusher + Send + Sync + 'static> FetchPool<T> {
}
}

/// Spawn `config.fetch_threads` asynchronous tasks, each of which repeatedly moves pending activations from the store to the push pool until the shutdown signal is received.
/// Spawns one task per effective fetch thread ([`normalize_fetch_threads`]), each claiming pending work only in its bucket subrange.
pub async fn start(&self) -> Result<()> {
let fetch_wait_ms = self.config.fetch_wait_ms;
let fetch_threads = normalize_fetch_threads(self.config.fetch_threads);

let mut fetch_pool = crate::tokio::spawn_pool(self.config.fetch_threads, |_| {
let mut fetch_pool = crate::tokio::spawn_pool(fetch_threads, |thread_index| {
let store = self.store.clone();
let pusher = self.pusher.clone();
let config = self.config.clone();
let bucket_filter = Some(bucket_range_for_fetch_thread(thread_index, fetch_threads));

let guard = get_shutdown_guard().shutdown_on_drop();

Expand All @@ -80,7 +113,10 @@ impl<T: TaskPusher + Send + Sync + 'static> FetchPool<T> {
let application = config.application.as_deref();
let namespaces = config.namespaces.as_deref();

match store.get_pending_activation(application, namespaces).await {
match store
.get_pending_activation(application, namespaces, bucket_filter)
.await
{
Ok(Some(activation)) => {
let id = activation.id.clone();

Expand Down
4 changes: 3 additions & 1 deletion src/fetch/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use tonic::async_trait;
use super::*;
use crate::config::Config;
use crate::push::PushError;
use crate::store::inflight_activation::InflightActivation;
use crate::store::inflight_activation::InflightActivationStore;
use crate::store::inflight_activation::{BucketRange, InflightActivation};
use crate::store::inflight_activation::{
FailedTasksForwarder, InflightActivationStatus, QueryResult,
};
Expand Down Expand Up @@ -74,6 +74,7 @@ impl InflightActivationStore for MockStore {
&self,
_application: Option<&str>,
_namespaces: Option<&[String]>,
_bucket: Option<BucketRange>,
) -> Result<Option<InflightActivation>, Error> {
if self.fail {
return Err(anyhow!("mock store error"));
Expand All @@ -87,6 +88,7 @@ impl InflightActivationStore for MockStore {
_application: Option<&str>,
_namespaces: Option<&[String]>,
_limit: Option<i32>,
_bucket: Option<BucketRange>,
) -> Result<Vec<InflightActivation>, Error> {
unimplemented!()
}
Expand Down
4 changes: 2 additions & 2 deletions src/grpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl ConsumerService for TaskbrokerServer {

let inflight = self
.store
.get_pending_activation(application.as_deref(), namespaces)
.get_pending_activation(application.as_deref(), namespaces, None)
.await;

match inflight {
Expand Down Expand Up @@ -125,7 +125,7 @@ impl ConsumerService for TaskbrokerServer {
let namespaces = namespace.as_ref().map(std::slice::from_ref);
let res = match self
.store
.get_pending_activation(application.as_deref(), namespaces)
.get_pending_activation(application.as_deref(), namespaces, None)
.await
{
Err(e) => {
Expand Down
12 changes: 12 additions & 0 deletions src/kafka/deserialize_activation.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use std::{sync::Arc, time::Duration};

use crate::config::Config;
use crate::fetch::MAX_FETCH_THREADS;
use crate::store::inflight_activation::{InflightActivation, InflightActivationStatus};
use anyhow::{Error, anyhow};
use chrono::{DateTime, Utc};
use prost::Message as _;
use rdkafka::{Message, message::OwnedMessage};
use sentry_protos::taskbroker::v1::OnAttemptsExceeded;
use sentry_protos::taskbroker::v1::TaskActivation;
use uuid::Uuid;

pub struct DeserializeActivationConfig {
pub max_delayed_allowed: u64,
Expand All @@ -21,6 +23,13 @@ impl DeserializeActivationConfig {
}
}

/// Derive an activation's bucket from its ID.
///
/// The ID is parsed as a UUID and its 128-bit value is reduced modulo
/// [`MAX_FETCH_THREADS`]. IDs that do not parse as a UUID fall back to bucket `0`.
pub fn bucket_from_id(id: &str) -> i16 {
    let bucket_count = MAX_FETCH_THREADS as u128;

    match Uuid::parse_str(id) {
        Ok(uuid) => (uuid.as_u128() % bucket_count) as i16,
        Err(_) => 0,
    }
}

pub fn new(
config: DeserializeActivationConfig,
) -> impl Fn(Arc<OwnedMessage>) -> Result<InflightActivation, Error> {
Expand Down Expand Up @@ -78,6 +87,8 @@ pub fn new(
.try_into()
.unwrap();

let bucket = bucket_from_id(&activation.id);

Ok(InflightActivation {
id: activation.id.clone(),
activation: payload.to_vec(),
Expand All @@ -96,6 +107,7 @@ pub fn new(
namespace,
taskname,
on_attempts_exceeded,
bucket,
})
}
}
Expand Down
32 changes: 27 additions & 5 deletions src/store/inflight_activation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ use tracing::{instrument, warn};

use crate::config::Config;

pub type BucketRange = (i16, i16);

/// The members of this enum should be synced with the members
/// of InflightActivationStatus in sentry_protos
#[derive(Clone, Copy, Debug, PartialEq, Eq, Type)]
Expand Down Expand Up @@ -173,6 +175,10 @@ pub struct InflightActivation {
/// are exceeded.
#[builder(default = false)]
pub at_most_once: bool,

/// Bucket derived from activation ID (UUID as number % 256). Set once on ingestion.
#[builder(setter(skip), default = "0")]
pub bucket: i16,
}

impl InflightActivation {
Expand Down Expand Up @@ -235,6 +241,7 @@ pub struct TableRow {
pub taskname: String,
#[sqlx(try_from = "i32")]
pub on_attempts_exceeded: OnAttemptsExceeded,
pub bucket: i16,
}

impl TryFrom<InflightActivation> for TableRow {
Expand All @@ -259,6 +266,7 @@ impl TryFrom<InflightActivation> for TableRow {
namespace: value.namespace,
taskname: value.taskname,
on_attempts_exceeded: value.on_attempts_exceeded,
bucket: value.bucket,
})
}
}
Expand All @@ -283,6 +291,7 @@ impl From<TableRow> for InflightActivation {
namespace: value.namespace,
taskname: value.taskname,
on_attempts_exceeded: value.on_attempts_exceeded,
bucket: value.bucket,
}
}
}
Expand Down Expand Up @@ -352,11 +361,12 @@ pub trait InflightActivationStore: Send + Sync {
/// Store a batch of activations
async fn store(&self, batch: Vec<InflightActivation>) -> Result<QueryResult, Error>;

/// Get a single pending activation, optionally filtered by namespaces.
/// Get a single pending activation, optionally filtered by namespaces and bucket subrange.
async fn get_pending_activation(
&self,
application: Option<&str>,
namespaces: Option<&[String]>,
bucket: Option<BucketRange>,
) -> Result<Option<InflightActivation>, Error> {
if namespaces.is_some() && application.is_none() {
warn!(
Expand All @@ -367,7 +377,7 @@ pub trait InflightActivationStore: Send + Sync {
}

let results = self
.get_pending_activations(application, namespaces, Some(1))
.get_pending_activations(application, namespaces, Some(1), bucket)
.await?;

if results.is_empty() {
Expand All @@ -383,6 +393,7 @@ pub trait InflightActivationStore: Send + Sync {
application: Option<&str>,
namespaces: Option<&[String]>,
limit: Option<i32>,
bucket: Option<BucketRange>,
) -> Result<Vec<InflightActivation>, Error>;

/// Get the age of the oldest pending activation in seconds
Expand Down Expand Up @@ -698,7 +709,8 @@ impl InflightActivationStore for SqliteActivationStore {
application,
namespace,
taskname,
on_attempts_exceeded
on_attempts_exceeded,
bucket
FROM inflight_taskactivations
WHERE id = $1
",
Expand Down Expand Up @@ -739,7 +751,8 @@ impl InflightActivationStore for SqliteActivationStore {
application,
namespace,
taskname,
on_attempts_exceeded
on_attempts_exceeded,
bucket
)
",
);
Expand Down Expand Up @@ -772,6 +785,7 @@ impl InflightActivationStore for SqliteActivationStore {
b.push_bind(row.namespace);
b.push_bind(row.taskname);
b.push_bind(row.on_attempts_exceeded as i32);
b.push_bind(row.bucket);
})
.push(" ON CONFLICT(id) DO NOTHING")
.build();
Expand Down Expand Up @@ -809,6 +823,7 @@ impl InflightActivationStore for SqliteActivationStore {
application: Option<&str>,
namespaces: Option<&[String]>,
limit: Option<i32>,
bucket: Option<BucketRange>,
) -> Result<Vec<InflightActivation>, Error> {
let now = Utc::now();

Expand Down Expand Up @@ -849,6 +864,12 @@ impl InflightActivationStore for SqliteActivationStore {
}
query_builder.push(")");
}
if let Some((min, max)) = bucket {
query_builder.push(" AND bucket >= ");
query_builder.push_bind(min);
query_builder.push(" AND bucket <= ");
query_builder.push_bind(max);
}
query_builder.push(" ORDER BY added_at");
if let Some(limit) = limit {
query_builder.push(" LIMIT ");
Expand Down Expand Up @@ -989,7 +1010,8 @@ impl InflightActivationStore for SqliteActivationStore {
application,
namespace,
taskname,
on_attempts_exceeded
on_attempts_exceeded,
bucket
FROM inflight_taskactivations
WHERE status = $1
",
Expand Down
Loading
Loading