Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions benches/store_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,11 @@ async fn get_pending_activations(num_activations: u32, num_workers: u32) {
join_set.spawn(async move {
let mut num_activations_processed = 0;

while !store
.get_pending_activations(
Some("sentry"),
Some(std::slice::from_ref(&ns)),
Some(1),
None,
)
while store
.claim_activation_for_pull(Some("sentry"), Some(&ns))
.await
.unwrap()
.is_empty()
.is_some()
{
num_activations_processed += 1;
}
Expand Down
1 change: 1 addition & 0 deletions migrations/0006_add_sent.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE inflight_taskactivations ADD COLUMN sent INTEGER NOT NULL DEFAULT 0;
4 changes: 4 additions & 0 deletions pg_migrations/0003_add_sent.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
ALTER TABLE inflight_taskactivations ADD COLUMN sent BOOLEAN NOT NULL DEFAULT FALSE;

DROP INDEX IF EXISTS idx_activation_claim;
CREATE INDEX idx_activation_claim ON inflight_taskactivations (status, bucket, sent);
2 changes: 1 addition & 1 deletion src/fetch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl<T: TaskPusher + Send + Sync + 'static> FetchPool<T> {
let namespaces = config.namespaces.as_deref();

match store
.get_pending_activations(application, namespaces, limit, bucket)
.claim_activations_for_push(application, namespaces, limit, bucket)
.await
{
Ok(activations) if activations.is_empty() => {
Expand Down
22 changes: 10 additions & 12 deletions src/fetch/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ use tonic::async_trait;
use super::*;
use crate::config::Config;
use crate::push::PushError;
use crate::store::inflight_activation::InflightActivationStore;
use crate::store::inflight_activation::{BucketRange, InflightActivation};
use crate::store::inflight_activation::{
FailedTasksForwarder, InflightActivationStatus, QueryResult,
};
use crate::store::inflight_activation::{InflightActivationStore, ProcessingDeadlineCounts};
use crate::test_utils::make_activations;

/// Store stub that returns one activation once OR is always empty OR always fails.
Expand Down Expand Up @@ -70,31 +70,29 @@ impl InflightActivationStore for MockStore {
unimplemented!()
}

async fn get_pending_activations(
async fn claim_activations(
&self,
_application: Option<&str>,
_namespaces: Option<&[String]>,
_limit: Option<i32>,
_bucket: Option<BucketRange>,
mark_sent: bool,
) -> Result<Vec<InflightActivation>, Error> {
if self.fail {
return Err(anyhow!("mock store error"));
}

Ok(match self.pending.lock().await.take() {
Some(a) => vec![a],
Some(mut a) => {
a.sent = mark_sent;
vec![a]
}
None => vec![],
})
}

async fn get_pending_activations_from_namespaces(
&self,
_application: Option<&str>,
_namespaces: Option<&[String]>,
_limit: Option<i32>,
_bucket: Option<BucketRange>,
) -> Result<Vec<InflightActivation>, Error> {
unimplemented!()
async fn mark_activation_sent(&self, _id: &str) -> Result<(), Error> {
Ok(())
}

async fn pending_activation_max_lag(&self, _now: &DateTime<Utc>) -> f64 {
Expand Down Expand Up @@ -137,7 +135,7 @@ impl InflightActivationStore for MockStore {
unimplemented!()
}

async fn handle_processing_deadline(&self) -> Result<u64, Error> {
async fn handle_processing_deadline(&self) -> Result<ProcessingDeadlineCounts, Error> {
unimplemented!()
}

Expand Down
45 changes: 8 additions & 37 deletions src/grpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,31 +35,16 @@ impl ConsumerService for TaskbrokerServer {

let application = &request.get_ref().application;
let namespace = &request.get_ref().namespace;
let namespaces = namespace.as_ref().map(std::slice::from_ref);

let inflight = self
.store
.get_pending_activations(application.as_deref(), namespaces, Some(1), None)
.claim_activation_for_pull(application.as_deref(), namespace.as_deref())
.await;

match inflight {
Ok(activations) if activations.is_empty() => {
Err(Status::not_found("No pending activation"))
}

Ok(activations) if activations.len() > 1 => {
error!(
count = activations.len(),
application = ?application,
namespace = ?namespace,
"get_pending_activations returned more than one row despite limit of 1",
);

Err(Status::internal("Unable to retrieve pending activation"))
}
Ok(None) => Err(Status::not_found("No pending activation")),

Ok(activations) => {
let inflight = &activations[0];
Ok(Some(inflight)) => {
let now = Utc::now();

if inflight.processing_attempts < 1 {
Expand Down Expand Up @@ -141,35 +126,21 @@ impl ConsumerService for TaskbrokerServer {
};

let start_time = Instant::now();
let namespaces = namespace.as_ref().map(std::slice::from_ref);

let res = match self
.store
.get_pending_activations(application.as_deref(), namespaces, Some(1), None)
.claim_activation_for_pull(application.as_deref(), namespace.as_deref())
.await
{
Err(e) => {
error!("Unable to fetch next task: {:?}", e);
Err(Status::internal("Unable to fetch next task"))
}

Ok(activations) if activations.is_empty() => {
Err(Status::not_found("No pending activation"))
}

Ok(activations) if activations.len() > 1 => {
error!(
count = activations.len(),
application = ?application,
namespace = ?namespace,
"get_pending_activations returned more than one row despite limit of 1",
);

Err(Status::internal("Unable to fetch next task"))
}

Ok(activations) => {
let inflight = &activations[0];
// If we return an error, the worker will place the result back in its internal queue and send the update again in the future, which is not desired
Ok(None) => Ok(Response::new(SetTaskStatusResponse { task: None })),

Ok(Some(inflight)) => {
if inflight.processing_attempts < 1 {
let now = Utc::now();
let received_to_gettask_latency = inflight.received_latency(now);
Expand Down
26 changes: 15 additions & 11 deletions src/grpc/server_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::sync::Arc;

use crate::config::{Config, DeliveryMode};
use crate::grpc::server::TaskbrokerServer;
use crate::store::inflight_activation::InflightActivationStatus;
use prost::Message;
use rstest::rstest;
use sentry_protos::taskbroker::v1::consumer_service_server::ConsumerService;
Expand Down Expand Up @@ -111,7 +112,10 @@ async fn test_get_task_success(#[case] adapter: &str) {
let activations = make_activations(1);
store.store(activations).await.unwrap();

let service = TaskbrokerServer { store, config };
let service = TaskbrokerServer {
store: store.clone(),
config,
};
let request = GetTaskRequest {
namespace: None,
application: None,
Expand All @@ -122,6 +126,10 @@ async fn test_get_task_success(#[case] adapter: &str) {
assert!(resp.get_ref().task.is_some());
let task = resp.get_ref().task.as_ref().unwrap();
assert!(task.id == "id_0");

let row = store.get_by_id("id_0").await.unwrap().expect("claimed row");
assert!(row.sent);
assert_eq!(row.status, InflightActivationStatus::Processing);
}

#[tokio::test]
Expand Down Expand Up @@ -291,9 +299,8 @@ async fn test_set_task_status_with_application_no_match(#[case] adapter: &str) {
}),
};
let response = service.set_task_status(Request::new(request)).await;
assert!(response.is_err());
let e = response.unwrap_err();
assert_eq!(e.code(), Code::NotFound);
assert!(response.is_ok());
assert!(response.unwrap().get_ref().task.is_none());
}

#[tokio::test]
Expand All @@ -320,12 +327,9 @@ async fn test_set_task_status_with_namespace_requires_application(#[case] adapte
}),
};
let response = service.set_task_status(Request::new(request)).await;
assert!(response.is_err());

let resp = response.unwrap_err();
assert_eq!(
resp.code(),
Code::NotFound,
"No task found as namespace without filter is invalid."
assert!(response.is_ok());
assert!(
response.unwrap().get_ref().task.is_none(),
"namespace without application yields no next task in response"
);
}
1 change: 1 addition & 0 deletions src/kafka/deserialize_activation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ pub fn new(
taskname,
on_attempts_exceeded,
bucket,
sent: false,
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ async fn main() -> Result<(), Error> {
});

// Initialize push and fetch pools
let push_pool = Arc::new(PushPool::new(config.clone()));
let push_pool = Arc::new(PushPool::new(config.clone(), store.clone()));
let fetch_pool = FetchPool::new(store.clone(), config.clone(), push_pool.clone());

// Initialize push threads
Expand Down
34 changes: 30 additions & 4 deletions src/push/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use tonic::transport::Channel;
use tracing::{debug, error, info};

use crate::config::Config;
use crate::store::inflight_activation::InflightActivation;
use crate::store::inflight_activation::{InflightActivation, InflightActivationStore};

type HmacSha256 = Hmac<Sha256>;

Expand Down Expand Up @@ -88,26 +88,32 @@ pub struct PushPool {

/// Taskbroker configuration.
config: Arc<Config>,

/// Activation store, which we need for marking tasks as sent.
store: Arc<dyn InflightActivationStore>,
}

impl PushPool {
/// Initialize a new push pool.
pub fn new(config: Arc<Config>) -> Self {
pub fn new(config: Arc<Config>, store: Arc<dyn InflightActivationStore>) -> Self {
let (sender, receiver) = flume::bounded(config.push_queue_size);

Self {
sender,
receiver,
config,
store,
}
}

/// Spawn `config.push_threads` asynchronous tasks, each of which repeatedly moves pending activations from the channel to the worker service until the shutdown signal is received.
pub async fn start(&self) -> Result<()> {
let store = self.store.clone();
let mut push_pool: JoinSet<Result<()>> =
crate::tokio::spawn_pool(self.config.push_threads, |_| {
let endpoint = self.config.worker_endpoint.clone();
let receiver = self.receiver.clone();
let store = store.clone();

let guard = get_shutdown_guard().shutdown_on_drop();

Expand Down Expand Up @@ -158,7 +164,17 @@ impl PushPool {
)
.await
{
Ok(_) => debug!(task_id = %id, "Activation sent to worker"),
Ok(_) => {
debug!(task_id = %id, "Activation sent to worker");

if let Err(e) = store.mark_activation_sent(&id).await {
error!(
task_id = %id,
error = ?e,
"Failed to mark activation as sent after push"
);
}
}

// Once processing deadline expires, status will be set back to pending
Err(e) => error!(
Expand All @@ -185,7 +201,17 @@ impl PushPool {
)
.await
{
Ok(_) => debug!(task_id = %id, "Activation sent to worker"),
Ok(_) => {
debug!(task_id = %id, "Activation sent to worker");

if let Err(e) = store.mark_activation_sent(&id).await {
error!(
task_id = %id,
error = ?e,
"Failed to mark activation as sent after push"
);
}
}

// Once processing deadline expires, status will be set back to pending
Err(e) => error!(
Expand Down
8 changes: 5 additions & 3 deletions src/push/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tonic::async_trait;

use super::*;
use crate::config::Config;
use crate::test_utils::make_activations;
use crate::test_utils::{create_test_store, make_activations};

/// Fake worker client for unit testing.
struct MockWorkerClient {
Expand Down Expand Up @@ -117,7 +117,8 @@ async fn push_pool_submit_enqueues_item() {
..Config::default()
});

let pool = PushPool::new(config);
let store = create_test_store("sqlite").await;
let pool = PushPool::new(config, store);
let activation = make_activations(1).remove(0);

let result = pool.submit(activation).await;
Expand All @@ -131,7 +132,8 @@ async fn push_pool_submit_backpressures_when_queue_full() {
..Config::default()
});

let pool = PushPool::new(config);
let store = create_test_store("sqlite").await;
let pool = PushPool::new(config, store);

let first = make_activations(1).remove(0);
let second = make_activations(1).remove(0);
Expand Down
Loading
Loading