Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions apps/labrinth/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::database::{PgPool, ReadOnlyPgPool};
use crate::env::ENV;
use crate::queue::billing::{index_billing, index_subscriptions};
use crate::queue::moderation::AutomatedModerationQueue;
use crate::routes::internal::delphi::rescan::rescan_projects_in_queue;
use crate::util::anrok;
use crate::util::archon::ArchonClient;
use crate::util::ratelimit::{AsyncRateLimiter, GCRAParameters};
Expand Down Expand Up @@ -102,6 +103,15 @@ pub fn app_setup(

let scheduler = scheduler::Scheduler::new();

{
let pool_ref = pool.clone();
actix_rt::spawn(async move {
if let Err(err) = rescan_projects_in_queue(&pool_ref).await {
warn!("Delphi rescan failed: {err:#}");
}
});
}

let limiter = web::Data::new(AsyncRateLimiter::new(
redis_pool.clone(),
GCRAParameters::new(300, 300),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ use crate::{
util::{error::Context, guards::admin_key_guard},
};

pub mod rescan;

pub fn config(cfg: &mut web::ServiceConfig) {
cfg.service(
web::scope("delphi")
Expand Down Expand Up @@ -197,7 +199,10 @@ async fn ingest_report_deserialized(

report.send_to_slack(&pool, &redis).await.ok();

let mut transaction = pool.begin().await?;
let mut transaction = pool
.begin()
.await
.wrap_internal_err("failed to begin Delphi ingest transaction")?;

let report_id = DBDelphiReport {
id: DelphiReportId(0), // This will be set by the database
Expand All @@ -208,7 +213,8 @@ async fn ingest_report_deserialized(
severity: report.severity,
}
.upsert(&mut transaction)
.await?;
.await
.wrap_internal_err("failed to upsert Delphi report")?;

info!(
num_issues = %report.issues.len(),
Expand Down Expand Up @@ -293,7 +299,8 @@ async fn ingest_report_deserialized(
issue_type: "__dummy".into(),
}
.insert(&mut transaction)
.await?;
.await
.wrap_internal_err("failed to insert dummy Delphi report issue")?;

ReportIssueDetail {
id: DelphiReportIssueDetailsId(0), // This will be set by the database
Expand All @@ -307,7 +314,10 @@ async fn ingest_report_deserialized(
status: DelphiStatus::Pending,
}
.insert(&mut transaction)
.await?;
.await
.wrap_internal_err(
"failed to insert dummy Delphi report issue detail",
)?;
}

for (issue_type, issue_details) in report.issues {
Expand All @@ -317,11 +327,13 @@ async fn ingest_report_deserialized(
issue_type,
}
.insert(&mut transaction)
.await?;
.await
.wrap_internal_err("failed to insert Delphi report issue")?;

// This is required to handle the case where the same Delphi version is re-run on the same file
ReportIssueDetail::remove_all_by_issue_id(issue_id, &mut transaction)
.await?;
.await
.wrap_internal_err("failed to remove old Delphi issue details")?;

for issue_detail in issue_details {
let decompiled_source =
Expand All @@ -339,11 +351,15 @@ async fn ingest_report_deserialized(
status: DelphiStatus::Pending,
}
.insert(&mut transaction)
.await?;
.await
.wrap_internal_err("failed to insert Delphi issue detail")?;
}
}

transaction.commit().await?;
transaction
.commit()
.await
.wrap_internal_err("failed to commit Delphi ingest transaction")?;

Ok(())
}
Expand Down
168 changes: 168 additions & 0 deletions apps/labrinth/src/routes/internal/delphi/rescan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
use eyre::{Result, WrapErr, eyre};
use futures::future::try_join_all;
use tracing::info;

use super::{DELPHI_CLIENT, DelphiRunParameters};
use crate::{database::PgPool, env::ENV, models::ids::FileId};

pub async fn rescan_projects_in_queue(pool: &PgPool) -> Result<()> {
let delphi_version = fetch_delphi_version().await?;
let old_delphi_version = fetch_stored_delphi_version(pool).await?;

if old_delphi_version == Some(delphi_version) {
info!(
?delphi_version,
"Delphi version unchanged; skipping startup tech review rescan"
);
return Ok(());
}

info!(
?old_delphi_version,
?delphi_version,
delphi_version,
"Delphi version changed; rescanning tech review queue"
);

let project_ids = fetch_unreviewed_tech_review_project_ids(pool).await?;
if project_ids.is_empty() {
info!("No fully unreviewed tech review projects found to rescan");
return Ok(());
}

let file_ids = fetch_project_file_ids(pool, &project_ids).await?;
if file_ids.is_empty() {
info!(
project_count = project_ids.len(),
"No files found for tech review projects selected for rescan"
);
return Ok(());
}

let file_ids = file_ids
.into_iter()
.map(|file_id| FileId(file_id.cast_unsigned()));

// if we don't delete reports now, when we insert the new reports,
// they will conflict with the existing ones
delete_project_reports(pool, &project_ids).await?;

try_join_all(file_ids.map(|file_id| async move {
super::run(pool, DelphiRunParameters { file_id })
.await
.wrap_err_with(|| {
eyre!("failed to submit Delphi rescan for `{file_id:?}`")
})
}))
.await?;

info!(
project_count = project_ids.len(),
"Submitted Delphi rescans for all unreviewed tech review project files"
);

Ok(())
}

async fn fetch_delphi_version() -> Result<i32> {
let response = DELPHI_CLIENT
.get(format!("{}/version", ENV.DELPHI_URL))
.send()
.await
.and_then(|res| res.error_for_status())
.wrap_err("failed to fetch Delphi version")?;

let version = response
.text()
.await
.wrap_err("failed to read Delphi version response body")?;
let version = version.trim().parse::<i32>().wrap_err_with(|| {
eyre!("invalid Delphi version response body: {version}")
})?;
Ok(version)
}

async fn fetch_stored_delphi_version(pool: &PgPool) -> Result<Option<i32>> {
sqlx::query_scalar::<_, Option<i32>>(
"SELECT MAX(delphi_version) FROM delphi_reports",
)
.fetch_one(pool)
.await
.wrap_err("failed to fetch latest stored Delphi version")
}

async fn fetch_unreviewed_tech_review_project_ids(
pool: &PgPool,
) -> Result<Vec<i64>> {
sqlx::query_scalar::<_, i64>(
r#"
SELECT DISTINCT m.id
FROM mods m
WHERE
EXISTS(
SELECT 1
FROM delphi_issue_details_with_statuses didws
INNER JOIN delphi_report_issues dri ON dri.id = didws.issue_id
WHERE
didws.project_id = m.id
AND didws.status = 'pending'
-- see delphi.rs todo comment
AND dri.issue_type != '__dummy'
)
AND NOT EXISTS(
SELECT 1
FROM delphi_issue_details_with_statuses didws
INNER JOIN delphi_report_issues dri ON dri.id = didws.issue_id
WHERE
didws.project_id = m.id
AND didws.status IN ('safe', 'unsafe')
-- see delphi.rs todo comment
AND dri.issue_type != '__dummy'
)
"#,
)
.fetch_all(pool)
.await
.wrap_err("failed to fetch fully unreviewed tech review project ids")
}

async fn fetch_project_file_ids(
pool: &PgPool,
project_ids: &[i64],
) -> Result<Vec<i64>> {
sqlx::query_scalar::<_, i64>(
r#"
SELECT DISTINCT dr.file_id
FROM delphi_reports dr
INNER JOIN files f ON f.id = dr.file_id
INNER JOIN versions v ON v.id = f.version_id
WHERE v.mod_id = ANY($1::bigint[])
"#,
)
.bind(project_ids)
.fetch_all(pool)
.await
.wrap_err("failed to fetch file ids for tech review Delphi rescan")
}

async fn delete_project_reports(
pool: &PgPool,
project_ids: &[i64],
) -> Result<()> {
sqlx::query(
r#"
DELETE FROM delphi_reports dr
USING files f, versions v
WHERE
dr.file_id = f.id
AND f.version_id = v.id
AND v.mod_id = ANY($1::bigint[])
"#,
)
.bind(project_ids)
.execute(pool)
.await
.wrap_err("failed to delete existing Delphi reports before rescan")?;

Ok(())
}