diff --git a/apps/labrinth/src/lib.rs b/apps/labrinth/src/lib.rs
index 4b6f484a95..776ffa9255 100644
--- a/apps/labrinth/src/lib.rs
+++ b/apps/labrinth/src/lib.rs
@@ -19,6 +19,7 @@ use crate::database::{PgPool, ReadOnlyPgPool};
 use crate::env::ENV;
 use crate::queue::billing::{index_billing, index_subscriptions};
 use crate::queue::moderation::AutomatedModerationQueue;
+use crate::routes::internal::delphi::rescan::rescan_projects_in_queue;
 use crate::util::anrok;
 use crate::util::archon::ArchonClient;
 use crate::util::ratelimit::{AsyncRateLimiter, GCRAParameters};
@@ -102,6 +103,15 @@ pub fn app_setup(
 
     let scheduler = scheduler::Scheduler::new();
 
+    {
+        let pool_ref = pool.clone();
+        actix_rt::spawn(async move {
+            if let Err(err) = rescan_projects_in_queue(&pool_ref).await {
+                warn!("Delphi rescan failed: {err:#}");
+            }
+        });
+    }
+
     let limiter = web::Data::new(AsyncRateLimiter::new(
         redis_pool.clone(),
         GCRAParameters::new(300, 300),
diff --git a/apps/labrinth/src/routes/internal/delphi.rs b/apps/labrinth/src/routes/internal/delphi/mod.rs
similarity index 95%
rename from apps/labrinth/src/routes/internal/delphi.rs
rename to apps/labrinth/src/routes/internal/delphi/mod.rs
index 20c051ca60..579a1b9d14 100644
--- a/apps/labrinth/src/routes/internal/delphi.rs
+++ b/apps/labrinth/src/routes/internal/delphi/mod.rs
@@ -34,6 +34,8 @@ use crate::{
     util::{error::Context, guards::admin_key_guard},
 };
 
+pub mod rescan;
+
 pub fn config(cfg: &mut web::ServiceConfig) {
     cfg.service(
         web::scope("delphi")
@@ -197,7 +199,10 @@ async fn ingest_report_deserialized(
 
     report.send_to_slack(&pool, &redis).await.ok();
 
-    let mut transaction = pool.begin().await?;
+    let mut transaction = pool
+        .begin()
+        .await
+        .wrap_internal_err("failed to begin Delphi ingest transaction")?;
 
     let report_id = DBDelphiReport {
         id: DelphiReportId(0), // This will be set by the database
@@ -208,7 +213,8 @@ async fn ingest_report_deserialized(
         severity: report.severity,
     }
     .upsert(&mut transaction)
-    .await?;
+    .await
+    .wrap_internal_err("failed to upsert Delphi report")?;
 
     info!(
         num_issues = %report.issues.len(),
@@ -293,7 +299,8 @@ async fn ingest_report_deserialized(
             issue_type: "__dummy".into(),
         }
         .insert(&mut transaction)
-        .await?;
+        .await
+        .wrap_internal_err("failed to insert dummy Delphi report issue")?;
 
         ReportIssueDetail {
             id: DelphiReportIssueDetailsId(0), // This will be set by the database
@@ -307,7 +314,10 @@ async fn ingest_report_deserialized(
             status: DelphiStatus::Pending,
         }
         .insert(&mut transaction)
-        .await?;
+        .await
+        .wrap_internal_err(
+            "failed to insert dummy Delphi report issue detail",
+        )?;
     }
 
     for (issue_type, issue_details) in report.issues {
@@ -317,11 +327,13 @@ async fn ingest_report_deserialized(
             issue_type,
         }
         .insert(&mut transaction)
-        .await?;
+        .await
+        .wrap_internal_err("failed to insert Delphi report issue")?;
 
         // This is required to handle the case where the same Delphi version is re-run on the same file
         ReportIssueDetail::remove_all_by_issue_id(issue_id, &mut transaction)
-            .await?;
+            .await
+            .wrap_internal_err("failed to remove old Delphi issue details")?;
 
         for issue_detail in issue_details {
             let decompiled_source =
@@ -339,11 +351,15 @@ async fn ingest_report_deserialized(
                 status: DelphiStatus::Pending,
             }
             .insert(&mut transaction)
-            .await?;
+            .await
+            .wrap_internal_err("failed to insert Delphi issue detail")?;
         }
     }
 
-    transaction.commit().await?;
+    transaction
+        .commit()
+        .await
+        .wrap_internal_err("failed to commit Delphi ingest transaction")?;
 
     Ok(())
 }
diff --git a/apps/labrinth/src/routes/internal/delphi/rescan.rs b/apps/labrinth/src/routes/internal/delphi/rescan.rs
new file mode 100644
index 0000000000..d14e51dfc4
--- /dev/null
+++ b/apps/labrinth/src/routes/internal/delphi/rescan.rs
@@ -0,0 +1,167 @@
+use eyre::{Result, WrapErr, eyre};
+use futures::future::try_join_all;
+use tracing::info;
+
+use super::{DELPHI_CLIENT, DelphiRunParameters};
+use crate::{database::PgPool, env::ENV, models::ids::FileId};
+
+/// On startup, re-submits every file of fully-unreviewed tech review
+/// projects to Delphi when the Delphi version has changed since the
+/// last stored report. No-op if the version is unchanged or the queue
+/// is empty.
+pub async fn rescan_projects_in_queue(pool: &PgPool) -> Result<()> {
+    let delphi_version = fetch_delphi_version().await?;
+    let old_delphi_version = fetch_stored_delphi_version(pool).await?;
+
+    if old_delphi_version == Some(delphi_version) {
+        info!(
+            ?delphi_version,
+            "Delphi version unchanged; skipping startup tech review rescan"
+        );
+        return Ok(());
+    }
+
+    info!(
+        ?old_delphi_version,
+        ?delphi_version,
+        "Delphi version changed; rescanning tech review queue"
+    );
+
+    let project_ids = fetch_unreviewed_tech_review_project_ids(pool).await?;
+    if project_ids.is_empty() {
+        info!("No fully unreviewed tech review projects found to rescan");
+        return Ok(());
+    }
+
+    let file_ids = fetch_project_file_ids(pool, &project_ids).await?;
+    if file_ids.is_empty() {
+        info!(
+            project_count = project_ids.len(),
+            "No files found for tech review projects selected for rescan"
+        );
+        return Ok(());
+    }
+
+    let file_ids = file_ids
+        .into_iter()
+        .map(|file_id| FileId(file_id.cast_unsigned()));
+
+    // if we don't delete reports now, when we insert the new reports,
+    // they will conflict with the existing ones
+    delete_project_reports(pool, &project_ids).await?;
+
+    try_join_all(file_ids.map(|file_id| async move {
+        super::run(pool, DelphiRunParameters { file_id })
+            .await
+            .wrap_err_with(|| {
+                eyre!("failed to submit Delphi rescan for `{file_id:?}`")
+            })
+    }))
+    .await?;
+
+    info!(
+        project_count = project_ids.len(),
+        "Submitted Delphi rescans for all unreviewed tech review project files"
+    );
+
+    Ok(())
+}
+
+/// Queries the running Delphi service for its current version number.
+async fn fetch_delphi_version() -> Result<i32> {
+    let response = DELPHI_CLIENT
+        .get(format!("{}/version", ENV.DELPHI_URL))
+        .send()
+        .await
+        .and_then(|res| res.error_for_status())
+        .wrap_err("failed to fetch Delphi version")?;
+
+    let version = response
+        .text()
+        .await
+        .wrap_err("failed to read Delphi version response body")?;
+    let version = version.trim().parse::<i32>().wrap_err_with(|| {
+        eyre!("invalid Delphi version response body: {version}")
+    })?;
+
+    Ok(version)
+}
+
+/// Returns the newest Delphi version present in stored reports, if any.
+async fn fetch_stored_delphi_version(pool: &PgPool) -> Result<Option<i32>> {
+    sqlx::query_scalar::<_, Option<i32>>(
+        "SELECT MAX(delphi_version) FROM delphi_reports",
+    )
+    .fetch_one(pool)
+    .await
+    .wrap_err("failed to fetch latest stored Delphi version")
+}
+
+/// Projects with at least one pending Delphi issue and no reviewed
+/// (safe/unsafe) issues, i.e. projects whose tech review has not started.
+async fn fetch_unreviewed_tech_review_project_ids(
+    pool: &PgPool,
+) -> Result<Vec<i64>> {
+    sqlx::query_scalar::<_, i64>(
+        r#"
+        SELECT DISTINCT m.id
+        FROM mods m
+        WHERE
+            EXISTS(
+                SELECT 1
+                FROM delphi_issue_details_with_statuses didws
+                INNER JOIN delphi_report_issues dri ON dri.id = didws.issue_id
+                WHERE
+                    didws.project_id = m.id
+                    AND didws.status = 'pending'
+                    -- see delphi.rs todo comment
+                    AND dri.issue_type != '__dummy'
+            )
+            AND NOT EXISTS(
+                SELECT 1
+                FROM delphi_issue_details_with_statuses didws
+                INNER JOIN delphi_report_issues dri ON dri.id = didws.issue_id
+                WHERE
+                    didws.project_id = m.id
+                    AND didws.status IN ('safe', 'unsafe')
+                    -- see delphi.rs todo comment
+                    AND dri.issue_type != '__dummy'
+            )
+        "#,
+    )
+    .fetch_all(pool)
+    .await
+    .wrap_err("failed to fetch fully unreviewed tech review project ids")
+}
+
+/// File ids of previously-reported files belonging to the given projects.
+async fn fetch_project_file_ids(
+    pool: &PgPool,
+    project_ids: &[i64],
+) -> Result<Vec<i64>> {
+    sqlx::query_scalar::<_, i64>(
+        r#"
+        SELECT DISTINCT dr.file_id
+        FROM delphi_reports dr
+        INNER JOIN files f ON f.id = dr.file_id
+        INNER JOIN versions v ON v.id = f.version_id
+        WHERE v.mod_id = ANY($1::bigint[])
+        "#,
+    )
+    .bind(project_ids)
+    .fetch_all(pool)
+    .await
+    .wrap_err("failed to fetch file ids for tech review Delphi rescan")
+}
+
+/// Drops existing Delphi reports for the given projects so re-ingested
+/// reports do not conflict with stale rows.
+async fn delete_project_reports(
+    pool: &PgPool,
+    project_ids: &[i64],
+) -> Result<()> {
+    sqlx::query(
+        r#"
+        DELETE FROM delphi_reports dr
+        USING files f, versions v
+        WHERE
+            dr.file_id = f.id
+            AND f.version_id = v.id
+            AND v.mod_id = ANY($1::bigint[])
+        "#,
+    )
+    .bind(project_ids)
+    .execute(pool)
+    .await
+    .wrap_err("failed to delete existing Delphi reports before rescan")?;
+
+    Ok(())
+}