From 9d1b50bc2f8c64833833c805f5ecd80e7b7e5981 Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Fri, 22 May 2026 15:16:04 +0200 Subject: [PATCH 01/23] add aws-sdk-sqs --- Cargo.lock | 230 +++++++++++++++++++++----- crates/bin/docs_rs_watcher/Cargo.toml | 1 + 2 files changed, 189 insertions(+), 42 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5c049e144..b1ceb81dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -486,7 +486,7 @@ dependencies = [ "aws-sdk-sts", "aws-smithy-async", "aws-smithy-http 0.63.6", - "aws-smithy-json 0.62.5", + "aws-smithy-json 0.62.6", "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", @@ -536,9 +536,9 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.7.3" +version = "1.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5dcd93c82209ac7413532388067dce79be5a8780c1786e5fae3df22e4dee2864" +checksum = "77ed8e8c52d2dc2390ad9f15647fe663f71e9780b4262c190fbb823a32721566" dependencies = [ "aws-credential-types", "aws-sigv4", @@ -596,6 +596,30 @@ dependencies = [ "url", ] +[[package]] +name = "aws-sdk-sqs" +version = "1.99.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ce61cf7e451891862a315dc96e1dbeb5e6a6f3740b354b5243217602b7e437b" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http 0.63.6", + "aws-smithy-json 0.62.6", + "aws-smithy-observability", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "http 0.2.12", + "http 1.4.0", + "regex-lite", + "tracing", +] + [[package]] name = "aws-sdk-sts" version = "1.103.0" @@ -606,7 +630,7 @@ dependencies = [ "aws-runtime", "aws-smithy-async", "aws-smithy-http 0.63.6", - "aws-smithy-json 0.62.5", + "aws-smithy-json 0.62.6", "aws-smithy-observability", "aws-smithy-query", "aws-smithy-runtime", @@ -623,9 +647,9 @@ dependencies = [ [[package]] name = "aws-sigv4" -version = "1.4.3" +version = 
"1.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68dc0b907359b120170613b5c09ccc61304eac3998ff6274b97d93ee6490115a" +checksum = "b7083fb918b38474ac65ffbf8a69fc8792d36879f4ac5f1667b43aec61efe9a5" dependencies = [ "aws-credential-types", "aws-smithy-eventstream", @@ -738,17 +762,23 @@ dependencies = [ "aws-smithy-async", "aws-smithy-runtime-api", "aws-smithy-types", - "h2", + "h2 0.3.27", + "h2 0.4.14", + "http 0.2.12", "http 1.4.0", - "hyper", - "hyper-rustls", + "http-body 0.4.6", + "hyper 0.14.32", + "hyper 1.9.0", + "hyper-rustls 0.24.2", + "hyper-rustls 0.27.9", "hyper-util", "pin-project-lite", - "rustls", + "rustls 0.21.12", + "rustls 0.23.40", "rustls-native-certs", "rustls-pki-types", "tokio", - "tokio-rustls", + "tokio-rustls 0.26.4", "tower", "tracing", ] @@ -764,10 +794,12 @@ dependencies = [ [[package]] name = "aws-smithy-json" -version = "0.62.5" +version = "0.62.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9648b0bb82a2eedd844052c6ad2a1a822d1f8e3adee5fbf668366717e428856a" +checksum = "517089205f18ab4adc5a3e02888cb139bbbbb2e168eac9f396216925d1fbeaf5" dependencies = [ + "aws-smithy-runtime-api", + "aws-smithy-schema", "aws-smithy-types", ] @@ -792,15 +824,16 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.11.1" +version = "1.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0504b1ab12debb5959e5165ee5fe97dd387e7aa7ea6a477bfd7635dfe769a4f5" +checksum = "b8e6f5caf6fea86f8c2206541ab5857cfcda9013426cdbe8fa0098b9e2d32182" dependencies = [ "aws-smithy-async", "aws-smithy-http 0.63.6", "aws-smithy-http-client", "aws-smithy-observability", "aws-smithy-runtime-api", + "aws-smithy-schema", "aws-smithy-types", "bytes", "fastrand", @@ -817,9 +850,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime-api" -version = "1.12.0" +version = "1.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"b71a13df6ada0aafbf21a73bdfcdf9324cfa9df77d96b8446045be3cde61b42e" +checksum = "dc117c179ecf39a62a0a3f49f600e9ac26a7ad7dd172177999f83933af776c32" dependencies = [ "aws-smithy-async", "aws-smithy-runtime-api-macros", @@ -844,11 +877,22 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "aws-smithy-schema" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7442cb268338f0eb8278140a107c046756aa01093d8ef5e99628d34ae09c94f5" +dependencies = [ + "aws-smithy-runtime-api", + "aws-smithy-types", + "http 1.4.0", +] + [[package]] name = "aws-smithy-types" -version = "1.4.7" +version = "1.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d73dbfbaa8e4bc57b9045137680b958d274823509a360abfd8e1d514d40c95c" +checksum = "056b66dbce2f81cc0c1e2b05bb402eb58f8a3530479d650efadd5bbae9a4050b" dependencies = [ "base64-simd", "bytes", @@ -891,13 +935,14 @@ dependencies = [ [[package]] name = "aws-types" -version = "1.3.15" +version = "1.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f4bbcaa9304ea40902d3d5f42a0428d1bd895a2b0f6999436fb279ffddc58ac" +checksum = "d16bf10b03a3c01e6b3b7d47cd964e873ffe9e7d4e80fad16bd4c077cb068531" dependencies = [ "aws-credential-types", "aws-smithy-async", "aws-smithy-runtime-api", + "aws-smithy-schema", "aws-smithy-types", "rustc_version", "tracing", @@ -917,7 +962,7 @@ dependencies = [ "http 1.4.0", "http-body 1.0.1", "http-body-util", - "hyper", + "hyper 1.9.0", "hyper-util", "itoa 1.0.18", "matchit", @@ -2445,6 +2490,7 @@ name = "docs_rs_watcher" version = "0.6.0" dependencies = [ "anyhow", + "aws-sdk-sqs", "clap", "crates-index", "crates-index-diff", @@ -3946,6 +3992,25 @@ dependencies = [ "phf 0.11.3", ] +[[package]] +name = "h2" +version = "0.3.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0beca50380b1fc32983fc1cb4587bfa4bb9e78fc259aad4a0032d2080309222d" +dependencies = [ + "bytes", + "fnv", + 
"futures-core", + "futures-sink", + "futures-util", + "http 0.2.12", + "indexmap 2.14.0", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "h2" version = "0.4.14" @@ -4241,6 +4306,30 @@ dependencies = [ "typenum", ] +[[package]] +name = "hyper" +version = "0.14.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41dfc780fdec9373c01bae43289ea34c972e40ee3c9f6b3c8801a35f35586ce7" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2 0.3.27", + "http 0.2.12", + "http-body 0.4.6", + "httparse", + "httpdate", + "itoa 1.0.18", + "pin-project-lite", + "socket2 0.5.10", + "tokio", + "tower-service", + "tracing", + "want", +] + [[package]] name = "hyper" version = "1.9.0" @@ -4251,7 +4340,7 @@ dependencies = [ "bytes", "futures-channel", "futures-core", - "h2", + "h2 0.4.14", "http 1.4.0", "http-body 1.0.1", "httparse", @@ -4263,6 +4352,21 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-rustls" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" +dependencies = [ + "futures-util", + "http 0.2.12", + "hyper 0.14.32", + "log", + "rustls 0.21.12", + "tokio", + "tokio-rustls 0.24.1", +] + [[package]] name = "hyper-rustls" version = "0.27.9" @@ -4270,12 +4374,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33ca68d021ef39cf6463ab54c1d0f5daf03377b70561305bb89a8f83aab66e0f" dependencies = [ "http 1.4.0", - "hyper", + "hyper 1.9.0", "hyper-util", - "rustls", + "rustls 0.23.40", "rustls-native-certs", "tokio", - "tokio-rustls", + "tokio-rustls 0.26.4", "tower-service", ] @@ -4285,7 +4389,7 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" dependencies = [ - "hyper", + "hyper 1.9.0", "hyper-util", "pin-project-lite", "tokio", 
@@ -4300,7 +4404,7 @@ checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" dependencies = [ "bytes", "http-body-util", - "hyper", + "hyper 1.9.0", "hyper-util", "native-tls", "tokio", @@ -4320,7 +4424,7 @@ dependencies = [ "futures-util", "http 1.4.0", "http-body 1.0.1", - "hyper", + "hyper 1.9.0", "ipnet", "libc", "percent-encoding", @@ -4980,7 +5084,7 @@ dependencies = [ "http 1.4.0", "http-body 1.0.1", "http-body-util", - "hyper", + "hyper 1.9.0", "hyper-util", "log", "pin-project-lite", @@ -6025,7 +6129,7 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustc-hash", - "rustls", + "rustls 0.23.40", "socket2 0.6.3", "thiserror", "tokio", @@ -6046,7 +6150,7 @@ dependencies = [ "rand 0.9.4", "ring", "rustc-hash", - "rustls", + "rustls 0.23.40", "rustls-pki-types", "slab", "thiserror", @@ -6336,12 +6440,12 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", + "h2 0.4.14", "http 1.4.0", "http-body 1.0.1", "http-body-util", - "hyper", - "hyper-rustls", + "hyper 1.9.0", + "hyper-rustls 0.27.9", "hyper-tls", "hyper-util", "js-sys", @@ -6351,7 +6455,7 @@ dependencies = [ "percent-encoding", "pin-project-lite", "quinn", - "rustls", + "rustls 0.23.40", "rustls-pki-types", "rustls-platform-verifier", "serde", @@ -6359,7 +6463,7 @@ dependencies = [ "sync_wrapper", "tokio", "tokio-native-tls", - "tokio-rustls", + "tokio-rustls 0.26.4", "tokio-util", "tower", "tower-http", @@ -6445,6 +6549,18 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "rustls" +version = "0.21.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" +dependencies = [ + "log", + "ring", + "rustls-webpki 0.101.7", + "sct", +] + [[package]] name = "rustls" version = "0.23.40" @@ -6454,7 +6570,7 @@ dependencies = [ "aws-lc-rs", "once_cell", "rustls-pki-types", - "rustls-webpki", + "rustls-webpki 0.103.13", "subtle", "zeroize", ] @@ -6492,10 +6608,10 
@@ dependencies = [ "jni", "log", "once_cell", - "rustls", + "rustls 0.23.40", "rustls-native-certs", "rustls-platform-verifier-android", - "rustls-webpki", + "rustls-webpki 0.103.13", "security-framework", "security-framework-sys", "webpki-root-certs", @@ -6508,6 +6624,16 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f87165f0995f63a9fbeea62b64d10b4d9d8e78ec6d7d51fb2125fda7bb36788f" +[[package]] +name = "rustls-webpki" +version = "0.101.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "rustls-webpki" version = "0.103.13" @@ -6611,6 +6737,16 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "sct" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "security-framework" version = "3.7.0" @@ -7738,13 +7874,23 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-rustls" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" +dependencies = [ + "rustls 0.21.12", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.26.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1729aa945f29d91ba541258c8df89027d5792d85a8841fb65e8bf0f4ede4ef61" dependencies = [ - "rustls", + "rustls 0.23.40", "tokio", ] @@ -7823,7 +7969,7 @@ dependencies = [ "http 1.4.0", "http-body 1.0.1", "http-body-util", - "hyper", + "hyper 1.9.0", "hyper-timeout", "hyper-util", "percent-encoding", diff --git 
a/crates/bin/docs_rs_watcher/Cargo.toml b/crates/bin/docs_rs_watcher/Cargo.toml index 0b0f9ba6a..e0f80e245 100644 --- a/crates/bin/docs_rs_watcher/Cargo.toml +++ b/crates/bin/docs_rs_watcher/Cargo.toml @@ -8,6 +8,7 @@ edition.workspace = true [dependencies] anyhow = { workspace = true } +aws-sdk-sqs = "1.99.0" clap = { workspace = true } # NOTE: on the new infra, switch back from `git-https-reqwest` to `git-https` (curl) once the curl version is new enough crates-index = { version = "3.0.0", default-features = false, features = ["git", "git-https-reqwest", "git-performance", "parallel"] } From fddeb9a511f039ab0412e83738d25d253cd5f1f5 Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Fri, 22 May 2026 15:40:36 +0200 Subject: [PATCH 02/23] add shared subcrate for event types --- Cargo.lock | 13 +- crates/lib/docs_rs_crates_io/Cargo.toml | 19 +++ crates/lib/docs_rs_crates_io/src/events.rs | 184 +++++++++++++++++++++ crates/lib/docs_rs_crates_io/src/lib.rs | 1 + 4 files changed, 215 insertions(+), 2 deletions(-) create mode 100644 crates/lib/docs_rs_crates_io/Cargo.toml create mode 100644 crates/lib/docs_rs_crates_io/src/events.rs create mode 100644 crates/lib/docs_rs_crates_io/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index b1ceb81dd..7dc2708d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2152,6 +2152,15 @@ dependencies = [ "tokio", ] +[[package]] +name = "docs_rs_crates_io" +version = "0.1.0" +dependencies = [ + "semver", + "serde", + "serde_json", +] + [[package]] name = "docs_rs_database" version = "0.0.0" @@ -6988,9 +6997,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.149" +version = "1.0.150" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" +checksum = "e8014e44b4736ed0538adeecded0fce2a272f22dc9578a7eb6b2d9993c74cfb9" dependencies = [ "itoa 1.0.18", "memchr", diff --git a/crates/lib/docs_rs_crates_io/Cargo.toml 
b/crates/lib/docs_rs_crates_io/Cargo.toml new file mode 100644 index 000000000..d10606ee6 --- /dev/null +++ b/crates/lib/docs_rs_crates_io/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "docs_rs_crates_io" +version = "0.1.0" +description = "types & logic for the direct integration between docs.rs & crates.io" + +authors.workspace = true +license.workspace = true +repository.workspace = true +edition.workspace = true + +[dependencies] +serde = { version = "1.0.228", features = ["derive"] } +semver = { version = "1.0.28", features = ["serde"] } + +[dev-dependencies] +serde_json = "1.0.150" + +[lints] +workspace = true diff --git a/crates/lib/docs_rs_crates_io/src/events.rs b/crates/lib/docs_rs_crates_io/src/events.rs new file mode 100644 index 000000000..41a81dd17 --- /dev/null +++ b/crates/lib/docs_rs_crates_io/src/events.rs @@ -0,0 +1,184 @@ +#![allow(clippy::disallowed_types)] + +use std::fmt; + +/// Identify a kind of change that occurred to a crate +#[derive(Clone, serde::Serialize, serde::Deserialize, Eq, PartialEq, Debug)] +#[serde(tag = "type", content = "payload", rename_all = "snake_case")] +pub enum Change { + /// A crate version was added. + Added(CrateVersion), + /// A crate version was unyanked. + Unyanked(CrateVersion), + /// A crate version was yanked. + Yanked(CrateVersion), + /// The name of the crate whose file was deleted, which implies all versions were deleted as well. + CrateDeleted { name: String }, + /// A crate version was deleted. + VersionDeleted(CrateVersion), +} + +impl Change { + /// Return the added crate, if this is this kind of change. + pub fn added(&self) -> Option<&CrateVersion> { + match self { + Change::Added(v) => Some(v), + _ => None, + } + } + + /// Return the yanked crate, if this is this kind of change. + pub fn yanked(&self) -> Option<&CrateVersion> { + match self { + Change::Yanked(v) => Some(v), + _ => None, + } + } + + /// Return the unyanked crate, if this is this kind of change. 
+ pub fn unyanked(&self) -> Option<&CrateVersion> { + match self { + Change::Unyanked(v) => Some(v), + _ => None, + } + } + + /// Return the deleted crate, if this is this kind of change. + pub fn crate_deleted(&self) -> Option<&str> { + match self { + Change::CrateDeleted { name, .. } => Some(name.as_str()), + _ => None, + } + } + + /// Return the deleted version crate, if this is this kind of change. + pub fn version_deleted(&self) -> Option<&CrateVersion> { + match self { + Change::VersionDeleted(v) => Some(v), + _ => None, + } + } +} + +impl fmt::Display for Change { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "{}", + match *self { + Change::Added(_) => "added", + Change::Yanked(_) => "yanked", + Change::CrateDeleted { .. } => "crate deleted", + Change::VersionDeleted(_) => "version deleted", + Change::Unyanked(_) => "unyanked", + } + ) + } +} + +/// Pack all information we know about a change made to a version of a crate. +#[derive(Clone, serde::Serialize, serde::Deserialize, Eq, PartialEq, Debug)] +pub struct CrateVersion { + /// The crate name, i.e. `clap`. + pub name: String, + /// is the release yanked? + pub yanked: bool, + /// The semantic version of the crate. 
+ #[serde(rename = "vers")] + pub version: semver::Version, +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + fn crate_version() -> CrateVersion { + CrateVersion { + name: "clap".into(), + yanked: false, + version: semver::Version::new(4, 5, 0), + } + } + + #[test] + fn crate_version_serializes_with_vers_field() { + let event = crate_version(); + + assert_eq!( + serde_json::to_value(&event).unwrap(), + json!({ + "name": "clap", + "yanked": false, + "vers": "4.5.0", + }) + ); + } + + #[test] + fn change_serializes_with_expected_variant_shapes() { + let crate_version = crate_version(); + + let cases = [ + ( + Change::Added(crate_version.clone()), + json!({ + "type": "added", + "payload": { + "name": "clap", + "yanked": false, + "vers": "4.5.0", + } + }), + ), + ( + Change::Unyanked(crate_version.clone()), + json!({ + "type": "unyanked", + "payload": { + "name": "clap", + "yanked": false, + "vers": "4.5.0", + } + }), + ), + ( + Change::Yanked(crate_version.clone()), + json!({ + "type": "yanked", + "payload": { + "name": "clap", + "yanked": false, + "vers": "4.5.0", + } + }), + ), + ( + Change::CrateDeleted { + name: "old-crate".into(), + }, + json!({ + "type": "crate_deleted", + "payload": { + "name": "old-crate" + } + }), + ), + ( + Change::VersionDeleted(crate_version), + json!({ + "type": "version_deleted", + "payload": { + "name": "clap", + "yanked": false, + "vers": "4.5.0", + } + }), + ), + ]; + + for (event, expected) in cases { + assert_eq!(serde_json::to_value(&event).unwrap(), expected); + } + } +} diff --git a/crates/lib/docs_rs_crates_io/src/lib.rs b/crates/lib/docs_rs_crates_io/src/lib.rs new file mode 100644 index 000000000..a9970c28f --- /dev/null +++ b/crates/lib/docs_rs_crates_io/src/lib.rs @@ -0,0 +1 @@ +pub mod events; From 87c438435e9996c40292844b35e68f542eb2488f Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Fri, 22 May 2026 16:14:48 +0200 Subject: [PATCH 03/23] feat(events): add event envelope metadata Wrap typed 
change payloads in a conventional event envelope with id, occurred_at, source, and schema_version. --- crates/lib/docs_rs_crates_io/src/events.rs | 47 ++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/crates/lib/docs_rs_crates_io/src/events.rs b/crates/lib/docs_rs_crates_io/src/events.rs index 41a81dd17..d8eb10074 100644 --- a/crates/lib/docs_rs_crates_io/src/events.rs +++ b/crates/lib/docs_rs_crates_io/src/events.rs @@ -76,6 +76,22 @@ impl fmt::Display for Change { } } +/// A conventional event envelope for crate index changes. +#[derive(Clone, serde::Serialize, serde::Deserialize, Eq, PartialEq, Debug)] +pub struct Event { + /// Unique event identifier for deduplication and tracing. + pub id: String, + /// Timestamp when the underlying change occurred, as an RFC 3339 string. + pub occurred_at: String, + /// System that emitted the event. + pub source: String, + /// Version of the serialized event schema. + pub schema_version: u32, + /// The typed change payload. + #[serde(flatten)] + pub change: Change, +} + /// Pack all information we know about a change made to a version of a crate. 
#[derive(Clone, serde::Serialize, serde::Deserialize, Eq, PartialEq, Debug)] pub struct CrateVersion { @@ -101,6 +117,16 @@ mod tests { } } + fn event(change: Change) -> Event { + Event { + id: "evt_123".into(), + occurred_at: "2026-05-22T12:34:56Z".into(), + source: "crates-index".into(), + schema_version: 1, + change, + } + } + #[test] fn crate_version_serializes_with_vers_field() { let event = crate_version(); @@ -181,4 +207,25 @@ mod tests { assert_eq!(serde_json::to_value(&event).unwrap(), expected); } } + + #[test] + fn event_serializes_with_minimum_metadata() { + let event = event(Change::CrateDeleted { + name: "old-crate".into(), + }); + + assert_eq!( + serde_json::to_value(&event).unwrap(), + json!({ + "id": "evt_123", + "occurred_at": "2026-05-22T12:34:56Z", + "source": "crates-index", + "schema_version": 1, + "type": "crate_deleted", + "payload": { + "name": "old-crate" + } + }) + ); + } } From b7403da2686a3c44caf5ce4e0927f8bc2b63d8b8 Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Fri, 22 May 2026 16:25:14 +0200 Subject: [PATCH 04/23] refactor(events): version event payload types Rename the current wire payload to ChangeV1 and make the event envelope generic for future schema versions. --- crates/lib/docs_rs_crates_io/src/events.rs | 49 ++++++++++++---------- 1 file changed, 26 insertions(+), 23 deletions(-) diff --git a/crates/lib/docs_rs_crates_io/src/events.rs b/crates/lib/docs_rs_crates_io/src/events.rs index d8eb10074..d2cd795d2 100644 --- a/crates/lib/docs_rs_crates_io/src/events.rs +++ b/crates/lib/docs_rs_crates_io/src/events.rs @@ -5,7 +5,7 @@ use std::fmt; /// Identify a kind of change that occurred to a crate #[derive(Clone, serde::Serialize, serde::Deserialize, Eq, PartialEq, Debug)] #[serde(tag = "type", content = "payload", rename_all = "snake_case")] -pub enum Change { +pub enum ChangeV1 { /// A crate version was added. Added(CrateVersion), /// A crate version was unyanked. 
@@ -18,11 +18,11 @@ pub enum Change { VersionDeleted(CrateVersion), } -impl Change { +impl ChangeV1 { /// Return the added crate, if this is this kind of change. pub fn added(&self) -> Option<&CrateVersion> { match self { - Change::Added(v) => Some(v), + ChangeV1::Added(v) => Some(v), _ => None, } } @@ -30,7 +30,7 @@ impl Change { /// Return the yanked crate, if this is this kind of change. pub fn yanked(&self) -> Option<&CrateVersion> { match self { - Change::Yanked(v) => Some(v), + ChangeV1::Yanked(v) => Some(v), _ => None, } } @@ -38,7 +38,7 @@ impl Change { /// Return the unyanked crate, if this is this kind of change. pub fn unyanked(&self) -> Option<&CrateVersion> { match self { - Change::Unyanked(v) => Some(v), + ChangeV1::Unyanked(v) => Some(v), _ => None, } } @@ -46,7 +46,7 @@ impl Change { /// Return the deleted crate, if this is this kind of change. pub fn crate_deleted(&self) -> Option<&str> { match self { - Change::CrateDeleted { name, .. } => Some(name.as_str()), + ChangeV1::CrateDeleted { name, .. } => Some(name.as_str()), _ => None, } } @@ -54,23 +54,23 @@ impl Change { /// Return the deleted version crate, if this is this kind of change. pub fn version_deleted(&self) -> Option<&CrateVersion> { match self { - Change::VersionDeleted(v) => Some(v), + ChangeV1::VersionDeleted(v) => Some(v), _ => None, } } } -impl fmt::Display for Change { +impl fmt::Display for ChangeV1 { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( f, "{}", match *self { - Change::Added(_) => "added", - Change::Yanked(_) => "yanked", - Change::CrateDeleted { .. } => "crate deleted", - Change::VersionDeleted(_) => "version deleted", - Change::Unyanked(_) => "unyanked", + ChangeV1::Added(_) => "added", + ChangeV1::Yanked(_) => "yanked", + ChangeV1::CrateDeleted { .. 
} => "crate deleted", + ChangeV1::VersionDeleted(_) => "version deleted", + ChangeV1::Unyanked(_) => "unyanked", } ) } @@ -78,7 +78,7 @@ impl fmt::Display for Change { /// A conventional event envelope for crate index changes. #[derive(Clone, serde::Serialize, serde::Deserialize, Eq, PartialEq, Debug)] -pub struct Event { +pub struct Event<T> { /// Unique event identifier for deduplication and tracing. pub id: String, /// Timestamp when the underlying change occurred, as an RFC 3339 string. @@ -89,9 +89,12 @@ pub struct Event { pub schema_version: u32, /// The typed change payload. #[serde(flatten)] - pub change: Change, + pub change: T, } +/// The first version of the public event wire format. +pub type EventV1 = Event<ChangeV1>; + /// Pack all information we know about a change made to a version of a crate. #[derive(Clone, serde::Serialize, serde::Deserialize, Eq, PartialEq, Debug)] pub struct CrateVersion { @@ -117,8 +120,8 @@ mod tests { } } - fn event(change: Change) -> Event { - Event { + fn event(change: ChangeV1) -> EventV1 { + EventV1 { id: "evt_123".into(), occurred_at: "2026-05-22T12:34:56Z".into(), source: "crates-index".into(), @@ -147,7 +150,7 @@ mod tests { let cases = [ ( - Change::Added(crate_version.clone()), + ChangeV1::Added(crate_version.clone()), json!({ "type": "added", "payload": { @@ -158,7 +161,7 @@ mod tests { }), ), ( - Change::Unyanked(crate_version.clone()), + ChangeV1::Unyanked(crate_version.clone()), json!({ "type": "unyanked", "payload": { @@ -169,7 +172,7 @@ mod tests { }), ), ( - Change::Yanked(crate_version.clone()), + ChangeV1::Yanked(crate_version.clone()), json!({ "type": "yanked", "payload": { @@ -180,7 +183,7 @@ mod tests { }), ), ( - Change::CrateDeleted { + ChangeV1::CrateDeleted { name: "old-crate".into(), }, json!({ @@ -191,7 +194,7 @@ mod tests { }), ), ( - Change::VersionDeleted(crate_version), + ChangeV1::VersionDeleted(crate_version), json!({ "type": "version_deleted", "payload": { @@ -210,7 +213,7 @@ mod tests { 
#[test] fn
event_serializes_with_minimum_metadata() { - let event = event(Change::CrateDeleted { + let event = event(ChangeV1::CrateDeleted { name: "old-crate".into(), }); From e52cae891fa7d5f1349f613d7b50faf81b5add6f Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Fri, 22 May 2026 16:32:57 +0200 Subject: [PATCH 05/23] refactor(events): use typed event timestamps Remove event source metadata and store occurred_at as an RFC 3339 OffsetDateTime. --- Cargo.lock | 1 + crates/lib/docs_rs_crates_io/Cargo.toml | 1 + crates/lib/docs_rs_crates_io/src/events.rs | 39 ++++++++++++++++++---- 3 files changed, 34 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7dc2708d6..be33873ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2159,6 +2159,7 @@ dependencies = [ "semver", "serde", "serde_json", + "time", ] [[package]] diff --git a/crates/lib/docs_rs_crates_io/Cargo.toml b/crates/lib/docs_rs_crates_io/Cargo.toml index d10606ee6..df404c935 100644 --- a/crates/lib/docs_rs_crates_io/Cargo.toml +++ b/crates/lib/docs_rs_crates_io/Cargo.toml @@ -11,6 +11,7 @@ edition.workspace = true [dependencies] serde = { version = "1.0.228", features = ["derive"] } semver = { version = "1.0.28", features = ["serde"] } +time = { version = "0.3.44", features = ["formatting", "parsing", "serde"] } [dev-dependencies] serde_json = "1.0.150" diff --git a/crates/lib/docs_rs_crates_io/src/events.rs b/crates/lib/docs_rs_crates_io/src/events.rs index d2cd795d2..34abfe92e 100644 --- a/crates/lib/docs_rs_crates_io/src/events.rs +++ b/crates/lib/docs_rs_crates_io/src/events.rs @@ -1,6 +1,7 @@ #![allow(clippy::disallowed_types)] use std::fmt; +use time::OffsetDateTime; /// Identify a kind of change that occurred to a crate #[derive(Clone, serde::Serialize, serde::Deserialize, Eq, PartialEq, Debug)] @@ -81,10 +82,9 @@ impl fmt::Display for ChangeV1 { pub struct Event { /// Unique event identifier for deduplication and tracing. 
pub id: String, - /// Timestamp when the underlying change occurred, as an RFC 3339 string. - pub occurred_at: String, - /// System that emitted the event. - pub source: String, + /// Timestamp when the underlying change occurred. + #[serde(with = "time::serde::rfc3339")] + pub occurred_at: OffsetDateTime, /// Version of the serialized event schema. pub schema_version: u32, /// The typed change payload. @@ -123,8 +123,11 @@ mod tests { fn event(change: ChangeV1) -> EventV1 { EventV1 { id: "evt_123".into(), - occurred_at: "2026-05-22T12:34:56Z".into(), - source: "crates-index".into(), + occurred_at: OffsetDateTime::parse( + "2026-05-22T12:34:56Z", + &time::format_description::well_known::Rfc3339, + ) + .unwrap(), schema_version: 1, change, } @@ -222,7 +225,6 @@ mod tests { json!({ "id": "evt_123", "occurred_at": "2026-05-22T12:34:56Z", - "source": "crates-index", "schema_version": 1, "type": "crate_deleted", "payload": { @@ -231,4 +233,27 @@ mod tests { }) ); } + + #[test] + fn event_deserializes_rfc3339_occurred_at() { + let event: EventV1 = serde_json::from_value(json!({ + "id": "evt_123", + "occurred_at": "2026-05-22T12:34:56Z", + "schema_version": 1, + "type": "crate_deleted", + "payload": { + "name": "old-crate" + } + })) + .unwrap(); + + assert_eq!( + event.occurred_at, + OffsetDateTime::parse( + "2026-05-22T12:34:56Z", + &time::format_description::well_known::Rfc3339, + ) + .unwrap() + ); + } } From 6b053a37c239139f875a6f8a64dcc3e62947597d Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Fri, 22 May 2026 16:35:11 +0200 Subject: [PATCH 06/23] refactor(events): use chrono timestamps Replace time::OffsetDateTime with chrono::DateTime for RFC 3339 event timestamps. 
--- Cargo.lock | 4 +++- crates/lib/docs_rs_crates_io/Cargo.toml | 2 +- crates/lib/docs_rs_crates_io/src/events.rs | 21 ++++++++------------- 3 files changed, 12 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index be33873ff..d696e0ebd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1290,8 +1290,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c673075a2e0e5f4a1dde27ce9dee1ea4558c7ffe648f576438a20ca1d2acc4b0" dependencies = [ "iana-time-zone", + "js-sys", "num-traits", "serde", + "wasm-bindgen", "windows-link", ] @@ -2156,10 +2158,10 @@ dependencies = [ name = "docs_rs_crates_io" version = "0.1.0" dependencies = [ + "chrono", "semver", "serde", "serde_json", - "time", ] [[package]] diff --git a/crates/lib/docs_rs_crates_io/Cargo.toml b/crates/lib/docs_rs_crates_io/Cargo.toml index df404c935..a513e14ac 100644 --- a/crates/lib/docs_rs_crates_io/Cargo.toml +++ b/crates/lib/docs_rs_crates_io/Cargo.toml @@ -9,9 +9,9 @@ repository.workspace = true edition.workspace = true [dependencies] +chrono = { version = "0.4.42", features = ["serde"] } serde = { version = "1.0.228", features = ["derive"] } semver = { version = "1.0.28", features = ["serde"] } -time = { version = "0.3.44", features = ["formatting", "parsing", "serde"] } [dev-dependencies] serde_json = "1.0.150" diff --git a/crates/lib/docs_rs_crates_io/src/events.rs b/crates/lib/docs_rs_crates_io/src/events.rs index 34abfe92e..3dcf86760 100644 --- a/crates/lib/docs_rs_crates_io/src/events.rs +++ b/crates/lib/docs_rs_crates_io/src/events.rs @@ -1,7 +1,7 @@ #![allow(clippy::disallowed_types)] +use chrono::{DateTime, Utc}; use std::fmt; -use time::OffsetDateTime; /// Identify a kind of change that occurred to a crate #[derive(Clone, serde::Serialize, serde::Deserialize, Eq, PartialEq, Debug)] @@ -83,8 +83,7 @@ pub struct Event { /// Unique event identifier for deduplication and tracing. pub id: String, /// Timestamp when the underlying change occurred. 
- #[serde(with = "time::serde::rfc3339")] - pub occurred_at: OffsetDateTime, + pub occurred_at: DateTime<Utc>, /// Version of the serialized event schema. pub schema_version: u32, /// The typed change payload. @@ -123,11 +122,9 @@ mod tests { fn event(change: ChangeV1) -> EventV1 { EventV1 { id: "evt_123".into(), - occurred_at: OffsetDateTime::parse( - "2026-05-22T12:34:56Z", - &time::format_description::well_known::Rfc3339, - ) - .unwrap(), + occurred_at: DateTime::parse_from_rfc3339("2026-05-22T12:34:56Z") + .unwrap() + .with_timezone(&Utc), schema_version: 1, change, } @@ -249,11 +246,9 @@ mod tests { assert_eq!( event.occurred_at, - OffsetDateTime::parse( - "2026-05-22T12:34:56Z", - &time::format_description::well_known::Rfc3339, - ) - .unwrap() + DateTime::parse_from_rfc3339("2026-05-22T12:34:56Z") + .unwrap() + .with_timezone(&Utc) ); } } From 0b7dd8da0419b32bbc47c840b9802582e7dfd2df Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Fri, 22 May 2026 17:03:56 +0200 Subject: [PATCH 07/23] wider deps --- crates/lib/docs_rs_crates_io/Cargo.toml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/lib/docs_rs_crates_io/Cargo.toml b/crates/lib/docs_rs_crates_io/Cargo.toml index a513e14ac..96b373c89 100644 --- a/crates/lib/docs_rs_crates_io/Cargo.toml +++ b/crates/lib/docs_rs_crates_io/Cargo.toml @@ -9,12 +9,12 @@ repository.workspace = true edition.workspace = true [dependencies] -chrono = { version = "0.4.42", features = ["serde"] } -serde = { version = "1.0.228", features = ["derive"] } -semver = { version = "1.0.28", features = ["serde"] } +chrono = { version = "0.4", features = ["serde"] } +serde = { version = "1", features = ["derive"] } +semver = { version = "1", features = ["serde"] } [dev-dependencies] -serde_json = "1.0.150" +serde_json = "1.0" [lints] workspace = true From 277ea22f63faa20ff8cf8e141da148913effdf24 Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Fri, 22 May 2026 18:33:43 +0200 Subject: [PATCH 08/23] 
fix(watcher):
make version delete idempotent Treat duplicate version deletion events as a no-op so temporary event-based handling can safely replay them. --- crates/bin/docs_rs_watcher/src/db/delete.rs | 36 +++++++++++++++++++-- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/crates/bin/docs_rs_watcher/src/db/delete.rs b/crates/bin/docs_rs_watcher/src/db/delete.rs index dbfa0e58e..742bbb634 100644 --- a/crates/bin/docs_rs_watcher/src/db/delete.rs +++ b/crates/bin/docs_rs_watcher/src/db/delete.rs @@ -170,14 +170,18 @@ async fn delete_version_from_database( format!("DELETE FROM {table} WHERE {column} IN (SELECT id FROM releases WHERE crate_id = $1 AND version = $2)").as_str()) .bind(crate_id).bind(version).execute(&mut *transaction).await?; } - let is_library: bool = sqlx::query_scalar!( + let Some(is_library) = sqlx::query_scalar!( "DELETE FROM releases WHERE crate_id = $1 AND version = $2 RETURNING is_library", crate_id.0, version as _, ) - .fetch_one(&mut *transaction) + .fetch_optional(&mut *transaction) .await? 
- .unwrap_or(false); + else { + transaction.commit().await?; + return Ok(false); + }; + let is_library = is_library.unwrap_or(false); sqlx::query!( "DELETE FROM queue WHERE name = $1 AND version = $2;", @@ -690,6 +694,32 @@ mod tests { Ok(()) } + #[tokio::test(flavor = "multi_thread")] + async fn test_delete_already_deleted_version_doesnt_error() -> Result<()> { + let env = TestEnvironment::new().await?; + let mut conn = env.async_conn().await?; + + env.fake_release() + .await + .name(&KRATE) + .version(V1) + .create() + .await?; + env.fake_release() + .await + .name(&KRATE) + .version(V2) + .create() + .await?; + + delete_version(&mut conn, env.storage()?, env.config(), &KRATE, &V1).await?; + delete_version(&mut conn, env.storage()?, env.config(), &KRATE, &V1).await?; + + assert!(crate_exists(&mut conn, &KRATE).await?); + + Ok(()) + } + #[tokio::test(flavor = "multi_thread")] async fn test_delete_version_waits_for_locked_queue_rows() -> Result<()> { let env = TestEnvironment::new().await?; From cb7453f3948d00a19834c4e3f9ac2d25605ea2bc Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Fri, 22 May 2026 18:56:01 +0200 Subject: [PATCH 09/23] feat(watcher): add SQS config Add watcher config fields for an SQS queue URL and region to support an event-based path. 
--- crates/bin/docs_rs_watcher/src/config.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/bin/docs_rs_watcher/src/config.rs b/crates/bin/docs_rs_watcher/src/config.rs index 7b5f17976..597549f29 100644 --- a/crates/bin/docs_rs_watcher/src/config.rs +++ b/crates/bin/docs_rs_watcher/src/config.rs @@ -7,6 +7,8 @@ use std::{path::PathBuf, time::Duration}; pub struct Config { pub registry_index_path: PathBuf, pub registry_url: Option, + pub sqs_queue_url: Option, + pub sqs_region: Option, /// How long to wait between registry checks pub delay_between_registry_fetches: Duration, @@ -29,6 +31,8 @@ impl AppConfig for Config { Ok(Self { registry_index_path: env("REGISTRY_INDEX_PATH", prefix.join("crates.io-index"))?, registry_url: maybe_env("REGISTRY_URL")?, + sqs_queue_url: maybe_env("DOCSRS_SQS_QUEUE_URL")?, + sqs_region: maybe_env("DOCSRS_SQS_REGION")?, delay_between_registry_fetches: Duration::from_secs(env::( "DOCSRS_DELAY_BETWEEN_REGISTRY_FETCHES", 60, From de207e532af1fea82d5d78718a3e4723cba48930 Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Fri, 22 May 2026 18:56:48 +0200 Subject: [PATCH 10/23] refactor(watcher): parse SQS queue URL Use url::Url for the watcher SQS queue URL config so invalid values fail during config loading. 
--- crates/bin/docs_rs_watcher/Cargo.toml | 1 + crates/bin/docs_rs_watcher/src/config.rs | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/bin/docs_rs_watcher/Cargo.toml b/crates/bin/docs_rs_watcher/Cargo.toml index e0f80e245..38eb11199 100644 --- a/crates/bin/docs_rs_watcher/Cargo.toml +++ b/crates/bin/docs_rs_watcher/Cargo.toml @@ -33,6 +33,7 @@ rayon = "1.6.1" sqlx = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } +url = { workspace = true } [dev-dependencies] docs_rs_config = { path = "../../lib/docs_rs_config", features = ["testing"] } diff --git a/crates/bin/docs_rs_watcher/src/config.rs b/crates/bin/docs_rs_watcher/src/config.rs index 597549f29..404ade8f5 100644 --- a/crates/bin/docs_rs_watcher/src/config.rs +++ b/crates/bin/docs_rs_watcher/src/config.rs @@ -2,12 +2,13 @@ use anyhow::Result; use docs_rs_config::AppConfig; use docs_rs_env_vars::{env, maybe_env, require_env}; use std::{path::PathBuf, time::Duration}; +use url::Url; #[derive(Debug)] pub struct Config { pub registry_index_path: PathBuf, pub registry_url: Option, - pub sqs_queue_url: Option, + pub sqs_queue_url: Option, pub sqs_region: Option, /// How long to wait between registry checks From b311a599d391230dc6d882de28b9089ec9b5bd6b Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Fri, 22 May 2026 18:57:05 +0200 Subject: [PATCH 11/23] chore(lockfile): record watcher url dep Update Cargo.lock after making docs_rs_watcher depend directly on url. 
--- Cargo.lock | 1 + 1 file changed, 1 insertion(+) diff --git a/Cargo.lock b/Cargo.lock index d696e0ebd..dfa12b8ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2529,6 +2529,7 @@ dependencies = [ "test-case", "tokio", "tracing", + "url", ] [[package]] From e84c343e18c6b8d7c9aaebe9c93d18800f44572c Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Sat, 23 May 2026 08:15:59 +0200 Subject: [PATCH 12/23] refactor(events): drop schema version Remove the redundant schema_version field from the crates.io event envelope and keep versioning in the typed payloads. --- crates/lib/docs_rs_crates_io/src/events.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/crates/lib/docs_rs_crates_io/src/events.rs b/crates/lib/docs_rs_crates_io/src/events.rs index 3dcf86760..5ba9b4abc 100644 --- a/crates/lib/docs_rs_crates_io/src/events.rs +++ b/crates/lib/docs_rs_crates_io/src/events.rs @@ -84,8 +84,6 @@ pub struct Event { pub id: String, /// Timestamp when the underlying change occurred. pub occurred_at: DateTime, - /// Version of the serialized event schema. - pub schema_version: u32, /// The typed change payload. 
#[serde(flatten)] pub change: T, @@ -125,7 +123,6 @@ mod tests { occurred_at: DateTime::parse_from_rfc3339("2026-05-22T12:34:56Z") .unwrap() .with_timezone(&Utc), - schema_version: 1, change, } } @@ -222,7 +219,6 @@ mod tests { json!({ "id": "evt_123", "occurred_at": "2026-05-22T12:34:56Z", - "schema_version": 1, "type": "crate_deleted", "payload": { "name": "old-crate" @@ -236,7 +232,6 @@ mod tests { let event: EventV1 = serde_json::from_value(json!({ "id": "evt_123", "occurred_at": "2026-05-22T12:34:56Z", - "schema_version": 1, "type": "crate_deleted", "payload": { "name": "old-crate" From 1e374cf4bfcfd5bdaf51ef8c8bc91e6e063f2b8b Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Sat, 23 May 2026 08:17:50 +0200 Subject: [PATCH 13/23] renames --- crates/lib/docs_rs_crates_io/src/events.rs | 52 +++++++++++----------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/crates/lib/docs_rs_crates_io/src/events.rs b/crates/lib/docs_rs_crates_io/src/events.rs index 5ba9b4abc..46f43b11a 100644 --- a/crates/lib/docs_rs_crates_io/src/events.rs +++ b/crates/lib/docs_rs_crates_io/src/events.rs @@ -6,7 +6,7 @@ use std::fmt; /// Identify a kind of change that occurred to a crate #[derive(Clone, serde::Serialize, serde::Deserialize, Eq, PartialEq, Debug)] #[serde(tag = "type", content = "payload", rename_all = "snake_case")] -pub enum ChangeV1 { +pub enum IndexChangeV1 { /// A crate version was added. Added(CrateVersion), /// A crate version was unyanked. @@ -19,11 +19,11 @@ pub enum ChangeV1 { VersionDeleted(CrateVersion), } -impl ChangeV1 { +impl IndexChangeV1 { /// Return the added crate, if this is this kind of change. pub fn added(&self) -> Option<&CrateVersion> { match self { - ChangeV1::Added(v) => Some(v), + IndexChangeV1::Added(v) => Some(v), _ => None, } } @@ -31,7 +31,7 @@ impl ChangeV1 { /// Return the yanked crate, if this is this kind of change. 
pub fn yanked(&self) -> Option<&CrateVersion> { match self { - ChangeV1::Yanked(v) => Some(v), + IndexChangeV1::Yanked(v) => Some(v), _ => None, } } @@ -39,7 +39,7 @@ impl ChangeV1 { /// Return the unyanked crate, if this is this kind of change. pub fn unyanked(&self) -> Option<&CrateVersion> { match self { - ChangeV1::Unyanked(v) => Some(v), + IndexChangeV1::Unyanked(v) => Some(v), _ => None, } } @@ -47,7 +47,7 @@ impl ChangeV1 { /// Return the deleted crate, if this is this kind of change. pub fn crate_deleted(&self) -> Option<&str> { match self { - ChangeV1::CrateDeleted { name, .. } => Some(name.as_str()), + IndexChangeV1::CrateDeleted { name, .. } => Some(name.as_str()), _ => None, } } @@ -55,42 +55,42 @@ impl ChangeV1 { /// Return the deleted version crate, if this is this kind of change. pub fn version_deleted(&self) -> Option<&CrateVersion> { match self { - ChangeV1::VersionDeleted(v) => Some(v), + IndexChangeV1::VersionDeleted(v) => Some(v), _ => None, } } } -impl fmt::Display for ChangeV1 { +impl fmt::Display for IndexChangeV1 { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( f, "{}", match *self { - ChangeV1::Added(_) => "added", - ChangeV1::Yanked(_) => "yanked", - ChangeV1::CrateDeleted { .. } => "crate deleted", - ChangeV1::VersionDeleted(_) => "version deleted", - ChangeV1::Unyanked(_) => "unyanked", + IndexChangeV1::Added(_) => "added", + IndexChangeV1::Yanked(_) => "yanked", + IndexChangeV1::CrateDeleted { .. } => "crate deleted", + IndexChangeV1::VersionDeleted(_) => "version deleted", + IndexChangeV1::Unyanked(_) => "unyanked", } ) } } -/// A conventional event envelope for crate index changes. +/// A conventional event envelope for our events between crates.io & docs.rs #[derive(Clone, serde::Serialize, serde::Deserialize, Eq, PartialEq, Debug)] pub struct Event { /// Unique event identifier for deduplication and tracing. pub id: String, - /// Timestamp when the underlying change occurred. 
+ /// Timestamp when the event occurred pub occurred_at: DateTime, - /// The typed change payload. + /// The typed payload. #[serde(flatten)] pub change: T, } /// The first version of the public event wire format. -pub type EventV1 = Event; +pub type IndexChangeEventV1 = Event; /// Pack all information we know about a change made to a version of a crate. #[derive(Clone, serde::Serialize, serde::Deserialize, Eq, PartialEq, Debug)] @@ -117,8 +117,8 @@ mod tests { } } - fn event(change: ChangeV1) -> EventV1 { - EventV1 { + fn event(change: IndexChangeV1) -> IndexChangeEventV1 { + IndexChangeEventV1 { id: "evt_123".into(), occurred_at: DateTime::parse_from_rfc3339("2026-05-22T12:34:56Z") .unwrap() @@ -147,7 +147,7 @@ mod tests { let cases = [ ( - ChangeV1::Added(crate_version.clone()), + IndexChangeV1::Added(crate_version.clone()), json!({ "type": "added", "payload": { @@ -158,7 +158,7 @@ mod tests { }), ), ( - ChangeV1::Unyanked(crate_version.clone()), + IndexChangeV1::Unyanked(crate_version.clone()), json!({ "type": "unyanked", "payload": { @@ -169,7 +169,7 @@ mod tests { }), ), ( - ChangeV1::Yanked(crate_version.clone()), + IndexChangeV1::Yanked(crate_version.clone()), json!({ "type": "yanked", "payload": { @@ -180,7 +180,7 @@ mod tests { }), ), ( - ChangeV1::CrateDeleted { + IndexChangeV1::CrateDeleted { name: "old-crate".into(), }, json!({ @@ -191,7 +191,7 @@ mod tests { }), ), ( - ChangeV1::VersionDeleted(crate_version), + IndexChangeV1::VersionDeleted(crate_version), json!({ "type": "version_deleted", "payload": { @@ -210,7 +210,7 @@ mod tests { #[test] fn event_serializes_with_minimum_metadata() { - let event = event(ChangeV1::CrateDeleted { + let event = event(IndexChangeV1::CrateDeleted { name: "old-crate".into(), }); @@ -229,7 +229,7 @@ mod tests { #[test] fn event_deserializes_rfc3339_occurred_at() { - let event: EventV1 = serde_json::from_value(json!({ + let event: IndexChangeEventV1 = serde_json::from_value(json!({ "id": "evt_123", "occurred_at":
"2026-05-22T12:34:56Z", "type": "crate_deleted", From bb4d8456ea0a98fa7151c98809d9b18f5bd89112 Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Sat, 23 May 2026 08:22:02 +0200 Subject: [PATCH 14/23] some cleanup --- crates/lib/docs_rs_crates_io/src/events.rs | 44 +--------------------- 1 file changed, 1 insertion(+), 43 deletions(-) diff --git a/crates/lib/docs_rs_crates_io/src/events.rs b/crates/lib/docs_rs_crates_io/src/events.rs index 46f43b11a..f01933db6 100644 --- a/crates/lib/docs_rs_crates_io/src/events.rs +++ b/crates/lib/docs_rs_crates_io/src/events.rs @@ -3,7 +3,7 @@ use chrono::{DateTime, Utc}; use std::fmt; -/// Identify a kind of change that occurred to a crate +/// A change that can happen to a crate on our index. #[derive(Clone, serde::Serialize, serde::Deserialize, Eq, PartialEq, Debug)] #[serde(tag = "type", content = "payload", rename_all = "snake_case")] pub enum IndexChangeV1 { @@ -19,48 +19,6 @@ pub enum IndexChangeV1 { VersionDeleted(CrateVersion), } -impl IndexChangeV1 { - /// Return the added crate, if this is this kind of change. - pub fn added(&self) -> Option<&CrateVersion> { - match self { - IndexChangeV1::Added(v) => Some(v), - _ => None, - } - } - - /// Return the yanked crate, if this is this kind of change. - pub fn yanked(&self) -> Option<&CrateVersion> { - match self { - IndexChangeV1::Yanked(v) => Some(v), - _ => None, - } - } - - /// Return the unyanked crate, if this is this kind of change. - pub fn unyanked(&self) -> Option<&CrateVersion> { - match self { - IndexChangeV1::Unyanked(v) => Some(v), - _ => None, - } - } - - /// Return the deleted crate, if this is this kind of change. - pub fn crate_deleted(&self) -> Option<&str> { - match self { - IndexChangeV1::CrateDeleted { name, .. } => Some(name.as_str()), - _ => None, - } - } - - /// Return the deleted version crate, if this is this kind of change. 
- pub fn version_deleted(&self) -> Option<&CrateVersion> { - match self { - IndexChangeV1::VersionDeleted(v) => Some(v), - _ => None, - } - } -} - impl fmt::Display for IndexChangeV1 { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( From c0436b166914cf1326d6e7104961ea9a477e0378 Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Sat, 23 May 2026 09:22:13 +0200 Subject: [PATCH 15/23] no rustls --- Cargo.lock | 158 +++++--------------------- crates/bin/docs_rs_watcher/Cargo.toml | 2 +- 2 files changed, 27 insertions(+), 133 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dfa12b8ed..cf62a1150 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -762,23 +762,17 @@ dependencies = [ "aws-smithy-async", "aws-smithy-runtime-api", "aws-smithy-types", - "h2 0.3.27", - "h2 0.4.14", - "http 0.2.12", + "h2", "http 1.4.0", - "http-body 0.4.6", - "hyper 0.14.32", - "hyper 1.9.0", - "hyper-rustls 0.24.2", - "hyper-rustls 0.27.9", + "hyper", + "hyper-rustls", "hyper-util", "pin-project-lite", - "rustls 0.21.12", - "rustls 0.23.40", + "rustls", "rustls-native-certs", "rustls-pki-types", "tokio", - "tokio-rustls 0.26.4", + "tokio-rustls", "tower", "tracing", ] @@ -962,7 +956,7 @@ dependencies = [ "http 1.4.0", "http-body 1.0.1", "http-body-util", - "hyper 1.9.0", + "hyper", "hyper-util", "itoa 1.0.18", "matchit", @@ -4005,25 +3999,6 @@ dependencies = [ "phf 0.11.3", ] -[[package]] -name = "h2" -version = "0.3.27" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0beca50380b1fc32983fc1cb4587bfa4bb9e78fc259aad4a0032d2080309222d" -dependencies = [ - "bytes", - "fnv", - "futures-core", - "futures-sink", - "futures-util", - "http 0.2.12", - "indexmap 2.14.0", - "slab", - "tokio", - "tokio-util", - "tracing", -] - [[package]] name = "h2" version = "0.4.14" @@ -4319,30 +4294,6 @@ dependencies = [ "typenum", ] -[[package]] -name = "hyper" -version = "0.14.32" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"41dfc780fdec9373c01bae43289ea34c972e40ee3c9f6b3c8801a35f35586ce7" -dependencies = [ - "bytes", - "futures-channel", - "futures-core", - "futures-util", - "h2 0.3.27", - "http 0.2.12", - "http-body 0.4.6", - "httparse", - "httpdate", - "itoa 1.0.18", - "pin-project-lite", - "socket2 0.5.10", - "tokio", - "tower-service", - "tracing", - "want", -] - [[package]] name = "hyper" version = "1.9.0" @@ -4353,7 +4304,7 @@ dependencies = [ "bytes", "futures-channel", "futures-core", - "h2 0.4.14", + "h2", "http 1.4.0", "http-body 1.0.1", "httparse", @@ -4365,21 +4316,6 @@ dependencies = [ "want", ] -[[package]] -name = "hyper-rustls" -version = "0.24.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" -dependencies = [ - "futures-util", - "http 0.2.12", - "hyper 0.14.32", - "log", - "rustls 0.21.12", - "tokio", - "tokio-rustls 0.24.1", -] - [[package]] name = "hyper-rustls" version = "0.27.9" @@ -4387,12 +4323,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33ca68d021ef39cf6463ab54c1d0f5daf03377b70561305bb89a8f83aab66e0f" dependencies = [ "http 1.4.0", - "hyper 1.9.0", + "hyper", "hyper-util", - "rustls 0.23.40", + "rustls", "rustls-native-certs", "tokio", - "tokio-rustls 0.26.4", + "tokio-rustls", "tower-service", ] @@ -4402,7 +4338,7 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" dependencies = [ - "hyper 1.9.0", + "hyper", "hyper-util", "pin-project-lite", "tokio", @@ -4417,7 +4353,7 @@ checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" dependencies = [ "bytes", "http-body-util", - "hyper 1.9.0", + "hyper", "hyper-util", "native-tls", "tokio", @@ -4437,7 +4373,7 @@ dependencies = [ "futures-util", "http 1.4.0", "http-body 1.0.1", - "hyper 1.9.0", + "hyper", "ipnet", "libc", "percent-encoding", @@ 
-5097,7 +5033,7 @@ dependencies = [ "http 1.4.0", "http-body 1.0.1", "http-body-util", - "hyper 1.9.0", + "hyper", "hyper-util", "log", "pin-project-lite", @@ -6142,7 +6078,7 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustc-hash", - "rustls 0.23.40", + "rustls", "socket2 0.6.3", "thiserror", "tokio", @@ -6163,7 +6099,7 @@ dependencies = [ "rand 0.9.4", "ring", "rustc-hash", - "rustls 0.23.40", + "rustls", "rustls-pki-types", "slab", "thiserror", @@ -6453,12 +6389,12 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2 0.4.14", + "h2", "http 1.4.0", "http-body 1.0.1", "http-body-util", - "hyper 1.9.0", - "hyper-rustls 0.27.9", + "hyper", + "hyper-rustls", "hyper-tls", "hyper-util", "js-sys", @@ -6468,7 +6404,7 @@ dependencies = [ "percent-encoding", "pin-project-lite", "quinn", - "rustls 0.23.40", + "rustls", "rustls-pki-types", "rustls-platform-verifier", "serde", @@ -6476,7 +6412,7 @@ dependencies = [ "sync_wrapper", "tokio", "tokio-native-tls", - "tokio-rustls 0.26.4", + "tokio-rustls", "tokio-util", "tower", "tower-http", @@ -6562,18 +6498,6 @@ dependencies = [ "windows-sys 0.61.2", ] -[[package]] -name = "rustls" -version = "0.21.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" -dependencies = [ - "log", - "ring", - "rustls-webpki 0.101.7", - "sct", -] - [[package]] name = "rustls" version = "0.23.40" @@ -6583,7 +6507,7 @@ dependencies = [ "aws-lc-rs", "once_cell", "rustls-pki-types", - "rustls-webpki 0.103.13", + "rustls-webpki", "subtle", "zeroize", ] @@ -6621,10 +6545,10 @@ dependencies = [ "jni", "log", "once_cell", - "rustls 0.23.40", + "rustls", "rustls-native-certs", "rustls-platform-verifier-android", - "rustls-webpki 0.103.13", + "rustls-webpki", "security-framework", "security-framework-sys", "webpki-root-certs", @@ -6637,16 +6561,6 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" 
checksum = "f87165f0995f63a9fbeea62b64d10b4d9d8e78ec6d7d51fb2125fda7bb36788f" -[[package]] -name = "rustls-webpki" -version = "0.101.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "rustls-webpki" version = "0.103.13" @@ -6750,16 +6664,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" -[[package]] -name = "sct" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "security-framework" version = "3.7.0" @@ -7887,23 +7791,13 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-rustls" -version = "0.24.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" -dependencies = [ - "rustls 0.21.12", - "tokio", -] - [[package]] name = "tokio-rustls" version = "0.26.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1729aa945f29d91ba541258c8df89027d5792d85a8841fb65e8bf0f4ede4ef61" dependencies = [ - "rustls 0.23.40", + "rustls", "tokio", ] @@ -7982,7 +7876,7 @@ dependencies = [ "http 1.4.0", "http-body 1.0.1", "http-body-util", - "hyper 1.9.0", + "hyper", "hyper-timeout", "hyper-util", "percent-encoding", diff --git a/crates/bin/docs_rs_watcher/Cargo.toml b/crates/bin/docs_rs_watcher/Cargo.toml index 38eb11199..caeaefa97 100644 --- a/crates/bin/docs_rs_watcher/Cargo.toml +++ b/crates/bin/docs_rs_watcher/Cargo.toml @@ -8,7 +8,7 @@ edition.workspace = true [dependencies] anyhow = { workspace = true } -aws-sdk-sqs = "1.99.0" +aws-sdk-sqs = { version = "1.99.0", default-features = false, features = 
["default-https-client", "rt-tokio"] } clap = { workspace = true } # NOTE: on the new infra, switch back from `git-https-reqwest` to `git-https` (curl) once the curl version is new enough crates-index = { version = "3.0.0", default-features = false, features = ["git", "git-https-reqwest", "git-performance", "parallel"] } From 65af70e4b5519015b60215c419174f746f0f9699 Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Wed, 3 Jun 2026 22:36:11 +0200 Subject: [PATCH 16/23] add docs_rs_crates_io subcrate for interaction / shared types --- Cargo.lock | 12 + crates/lib/docs_rs_crates_io/Cargo.toml | 20 ++ crates/lib/docs_rs_crates_io/src/events.rs | 249 +++++++++++++++++++++ crates/lib/docs_rs_crates_io/src/lib.rs | 1 + 4 files changed, 282 insertions(+) create mode 100644 crates/lib/docs_rs_crates_io/Cargo.toml create mode 100644 crates/lib/docs_rs_crates_io/src/events.rs create mode 100644 crates/lib/docs_rs_crates_io/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index b65c61ca8..a46048b74 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1235,8 +1235,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aa79e62e7697b8e29b513a68abacf485adcd1fe8284a4316c5ae868e6633327" dependencies = [ "iana-time-zone", + "js-sys", "num-traits", "serde", + "wasm-bindgen", "windows-link", ] @@ -2077,6 +2079,16 @@ dependencies = [ "tokio", ] +[[package]] +name = "docs_rs_crates_io" +version = "0.1.0" +dependencies = [ + "chrono", + "semver", + "serde", + "serde_json", +] + [[package]] name = "docs_rs_database" version = "0.0.0" diff --git a/crates/lib/docs_rs_crates_io/Cargo.toml b/crates/lib/docs_rs_crates_io/Cargo.toml new file mode 100644 index 000000000..c6f9224b3 --- /dev/null +++ b/crates/lib/docs_rs_crates_io/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "docs_rs_crates_io" +version = "0.1.0" +description = "types & logic for the direct integration between docs.rs & crates.io" + +authors.workspace = true +license.workspace = true +repository.workspace = 
true +edition.workspace = true + +[dependencies] +chrono = { version = "0.4", features = ["serde"] } +semver = { version = "1", features = ["serde"] } +serde = { version = "1", features = ["derive"] } + +[dev-dependencies] +serde_json = "1.0" + +[lints] +workspace = true diff --git a/crates/lib/docs_rs_crates_io/src/events.rs b/crates/lib/docs_rs_crates_io/src/events.rs new file mode 100644 index 000000000..12cc5170b --- /dev/null +++ b/crates/lib/docs_rs_crates_io/src/events.rs @@ -0,0 +1,249 @@ +#![allow(clippy::disallowed_types)] + +use chrono::{DateTime, Utc}; +use std::fmt; + +/// A change that can happen to a crate on our index. +#[derive(Clone, serde::Serialize, serde::Deserialize, Eq, PartialEq, Debug)] +#[serde(tag = "type", content = "payload", rename_all = "snake_case")] +pub enum IndexChangeV1 { + /// A crate version was added. + Added(CrateVersion), + /// A crate version was unyanked. + Unyanked(CrateVersion), + /// A crate version was yanked. + Yanked(CrateVersion), + /// The name of the crate whose file was deleted, which implies all versions were deleted as well. + CrateDeleted { name: String }, + /// A crate version was deleted. + VersionDeleted(CrateVersion), +} + +impl IndexChangeV1 { + /// Return the added crate, if this is this kind of change. + pub fn added(&self) -> Option<&CrateVersion> { + match self { + IndexChangeV1::Added(v) => Some(v), + _ => None, + } + } + + /// Return the yanked crate, if this is this kind of change. + pub fn yanked(&self) -> Option<&CrateVersion> { + match self { + IndexChangeV1::Yanked(v) => Some(v), + _ => None, + } + } + + /// Return the unyanked crate, if this is this kind of change. + pub fn unyanked(&self) -> Option<&CrateVersion> { + match self { + IndexChangeV1::Unyanked(v) => Some(v), + _ => None, + } + } + + /// Return the deleted crate, if this is this kind of change. 
+ pub fn crate_deleted(&self) -> Option<&str> { + match self { + IndexChangeV1::CrateDeleted { name } => Some(name.as_str()), + _ => None, + } + } + + /// Return the deleted version crate, if this is this kind of change. + pub fn version_deleted(&self) -> Option<&CrateVersion> { + match self { + IndexChangeV1::VersionDeleted(v) => Some(v), + _ => None, + } + } +} + +impl fmt::Display for IndexChangeV1 { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "{}", + match *self { + IndexChangeV1::Added(_) => "added", + IndexChangeV1::Yanked(_) => "yanked", + IndexChangeV1::CrateDeleted { .. } => "crate deleted", + IndexChangeV1::VersionDeleted(_) => "version deleted", + IndexChangeV1::Unyanked(_) => "unyanked", + } + ) + } +} + +/// A conventional event envelope for our events between crates.io & docs.rs +#[derive(Clone, serde::Serialize, serde::Deserialize, Eq, PartialEq, Debug)] +pub struct Event { + /// Unique event identifier for deduplication and tracing. + pub id: String, + /// Timestamp when the event occurred + pub occurred_at: DateTime, + /// The typed payload. + #[serde(flatten)] + pub change: T, +} + +/// The first version of the public event wire format. +pub type IndexChangeEventV1 = Event; + +/// Pack all information we know about a change made to a version of a crate. +#[derive(Clone, serde::Serialize, serde::Deserialize, Eq, PartialEq, Debug)] +pub struct CrateVersion { + /// The crate name, i.e. `clap`. + pub name: String, + /// is the release yanked? + pub yanked: bool, + /// The semantic version of the crate.
+ #[serde(rename = "vers")] + pub version: semver::Version, +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + fn crate_version() -> CrateVersion { + CrateVersion { + name: "clap".into(), + yanked: false, + version: semver::Version::new(4, 5, 0), + } + } + + fn event(change: IndexChangeV1) -> IndexChangeEventV1 { + IndexChangeEventV1 { + id: "evt_123".into(), + occurred_at: DateTime::parse_from_rfc3339("2026-05-22T12:34:56Z") + .unwrap() + .with_timezone(&Utc), + change, + } + } + + #[test] + fn crate_version_serializes_with_vers_field() { + let event = crate_version(); + + assert_eq!( + serde_json::to_value(&event).unwrap(), + json!({ + "name": "clap", + "yanked": false, + "vers": "4.5.0", + }) + ); + } + + #[test] + fn change_serializes_with_expected_variant_shapes() { + let crate_version = crate_version(); + + let cases = [ + ( + IndexChangeV1::Added(crate_version.clone()), + json!({ + "type": "added", + "payload": { + "name": "clap", + "yanked": false, + "vers": "4.5.0", + } + }), + ), + ( + IndexChangeV1::Unyanked(crate_version.clone()), + json!({ + "type": "unyanked", + "payload": { + "name": "clap", + "yanked": false, + "vers": "4.5.0", + } + }), + ), + ( + IndexChangeV1::Yanked(crate_version.clone()), + json!({ + "type": "yanked", + "payload": { + "name": "clap", + "yanked": false, + "vers": "4.5.0", + } + }), + ), + ( + IndexChangeV1::CrateDeleted { + name: "old-crate".into(), + }, + json!({ + "type": "crate_deleted", + "payload": { + "name": "old-crate" + } + }), + ), + ( + IndexChangeV1::VersionDeleted(crate_version), + json!({ + "type": "version_deleted", + "payload": { + "name": "clap", + "yanked": false, + "vers": "4.5.0", + } + }), + ), + ]; + + for (event, expected) in cases { + assert_eq!(serde_json::to_value(&event).unwrap(), expected); + } + } + + #[test] + fn event_serializes_with_minimum_metadata() { + let event = event(IndexChangeV1::CrateDeleted { + name: "old-crate".into(), + }); + + assert_eq!( + 
serde_json::to_value(&event).unwrap(), + json!({ + "id": "evt_123", + "occurred_at": "2026-05-22T12:34:56Z", + "type": "crate_deleted", + "payload": { + "name": "old-crate" + } + }) + ); + } + + #[test] + fn event_deserializes_rfc3339_occurred_at() { + let event: IndexChangeEventV1 = serde_json::from_value(json!({ + "id": "evt_123", + "occurred_at": "2026-05-22T12:34:56Z", + "type": "crate_deleted", + "payload": { + "name": "old-crate" + } + })) + .unwrap(); + + assert_eq!( + event.occurred_at, + DateTime::parse_from_rfc3339("2026-05-22T12:34:56Z") + .unwrap() + .with_timezone(&Utc) + ); + } +} diff --git a/crates/lib/docs_rs_crates_io/src/lib.rs b/crates/lib/docs_rs_crates_io/src/lib.rs new file mode 100644 index 000000000..a9970c28f --- /dev/null +++ b/crates/lib/docs_rs_crates_io/src/lib.rs @@ -0,0 +1 @@ +pub mod events; From fed546f2f27c1e98e47d03e25b22934a9a01b261 Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Sat, 13 Jun 2026 03:55:39 +0200 Subject: [PATCH 17/23] sort --- crates/lib/docs_rs_crates_io/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/lib/docs_rs_crates_io/Cargo.toml b/crates/lib/docs_rs_crates_io/Cargo.toml index 96b373c89..c6f9224b3 100644 --- a/crates/lib/docs_rs_crates_io/Cargo.toml +++ b/crates/lib/docs_rs_crates_io/Cargo.toml @@ -10,8 +10,8 @@ edition.workspace = true [dependencies] chrono = { version = "0.4", features = ["serde"] } -serde = { version = "1", features = ["derive"] } semver = { version = "1", features = ["serde"] } +serde = { version = "1", features = ["derive"] } [dev-dependencies] serde_json = "1.0" From f056b427b9ac842fe577fbe8df2d2372ad4a86a6 Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Sat, 13 Jun 2026 04:03:26 +0200 Subject: [PATCH 18/23] save --- Cargo.lock | 39 +++++++++++++++++---- crates/bin/docs_rs_watcher/src/db/delete.rs | 14 ++++++++ 2 files changed, 47 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a46048b74..2ef05ee9a 100644 --- 
a/Cargo.lock +++ b/Cargo.lock @@ -537,9 +537,9 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.7.4" +version = "1.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77ed8e8c52d2dc2390ad9f15647fe663f71e9780b4262c190fbb823a32721566" +checksum = "6c9b9de216a988dd54b754a82a7660cfe14cee4f6782ae4524470972fa0ccb39" dependencies = [ "aws-credential-types", "aws-sigv4", @@ -599,6 +599,31 @@ dependencies = [ "url", ] +[[package]] +name = "aws-sdk-sqs" +version = "1.102.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0246bf049cfc003ce44599dff955b9353758de3afa68a053da9b2c7de20a07d8" +dependencies = [ + "arc-swap", + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-observability", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "http 0.2.12", + "http 1.4.1", + "regex-lite", + "tracing", +] + [[package]] name = "aws-sdk-sts" version = "1.106.0" @@ -682,9 +707,9 @@ dependencies = [ [[package]] name = "aws-smithy-eventstream" -version = "0.60.20" +version = "0.60.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "faf09d74e5e32f76b8762da505a3cd59303e367a664ca67295387baa8c1d7548" +checksum = "78d8391e65fcea47c586a22e1a41f173b38615b112b2c6b7a44e80cec3e6b706" dependencies = [ "aws-smithy-types", "bytes", @@ -835,9 +860,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.4.9" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53f93074121a1be41317b9aa607143ae17900631f7f59a99f2b905d519d6783b" +checksum = "32b42fcf341259d85ca10fac9a2f6448a8ec691c6955a18e45bc3b71a85fab85" dependencies = [ "base64-simd", "bytes", @@ -2428,6 +2453,7 @@ name = "docs_rs_watcher" version = "0.6.0" dependencies = [ "anyhow", + "aws-sdk-sqs", "clap", "crates-index", "crates-index-diff", @@ -2454,6 
+2480,7 @@ dependencies = [ "test-case", "tokio", "tracing", + "url", ] [[package]] diff --git a/crates/bin/docs_rs_watcher/src/db/delete.rs b/crates/bin/docs_rs_watcher/src/db/delete.rs index c65fa7a6b..40cd19341 100644 --- a/crates/bin/docs_rs_watcher/src/db/delete.rs +++ b/crates/bin/docs_rs_watcher/src/db/delete.rs @@ -453,6 +453,13 @@ mod tests { ); } + // running delete-crate again doesn't error. + assert!( + delete_crate(&mut conn, storage, env.config(), &FOO) + .await + .is_ok() + ); + Ok(()) } @@ -617,6 +624,13 @@ mod tests { vec!["Peter Rabbit".to_string()] ); + // running delete-version again doesn't fail. + assert!( + delete_version(&mut conn, storage, env.config(), &KRATE, &V1) + .await + .is_ok() + ); + // FIXME: remove for now until test frontend is async // let web = env.frontend(); // assert_success("/a/2.0.0/a/", web)?; From d0417990a4f10986d2ee4c04fa2e88240bc34dea Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Sat, 13 Jun 2026 04:12:16 +0200 Subject: [PATCH 19/23] make deletes repeatable --- crates/bin/docs_rs_watcher/src/db/delete.rs | 53 +++++++++++++-------- 1 file changed, 33 insertions(+), 20 deletions(-) diff --git a/crates/bin/docs_rs_watcher/src/db/delete.rs b/crates/bin/docs_rs_watcher/src/db/delete.rs index 40cd19341..4b97be297 100644 --- a/crates/bin/docs_rs_watcher/src/db/delete.rs +++ b/crates/bin/docs_rs_watcher/src/db/delete.rs @@ -67,7 +67,13 @@ pub async fn delete_version( return Ok(()); }; - let is_library = delete_version_from_database(conn, config, name, crate_id, version).await?; + let Some(is_library) = + delete_version_from_database(conn, config, name, crate_id, version).await? 
+ else { + // release doesn't exist + return Ok(()); + }; + let paths = if is_library { LIBRARY_STORAGE_PATHS_TO_DELETE } else { @@ -133,7 +139,18 @@ async fn delete_version_from_database( name: &KrateName, crate_id: CrateId, version: &Version, -) -> Result { +) -> Result> { + let Some(release_id) = sqlx::query_scalar!( + "SELECT id FROM releases WHERE crate_id = $1 AND version = $2", + crate_id as _, + version as _ + ) + .fetch_optional(&mut *conn) + .await? + else { + return Ok(None); + }; + let mut transaction = conn.begin().await?; let delete_lock_timeout = format!("{}ms", config.delete_lock_timeout.as_millis()); @@ -157,31 +174,27 @@ async fn delete_version_from_database( sqlx::query!( "DELETE FROM builds_logs bl USING builds b - JOIN releases r ON b.rid = r.id - WHERE bl.build_id = b.id AND r.crate_id = $1 AND r.version = $2;", - crate_id as _, - version as _ + WHERE bl.build_id = b.id AND b.rid = $1;", + release_id as _, ) .execute(&mut *transaction) .await?; for &(table, column) in METADATA { - sqlx::query(sqlx::AssertSqlSafe( - format!("DELETE FROM {table} WHERE {column} IN (SELECT id FROM releases WHERE crate_id = $1 AND version = $2)"))) - .bind(crate_id).bind(version).execute(&mut *transaction).await?; + sqlx::query(sqlx::AssertSqlSafe(format!( + "DELETE FROM {table} WHERE {column} = $1" + ))) + .bind(release_id) + .execute(&mut *transaction) + .await?; } - let Some(is_library) = sqlx::query_scalar!( - "DELETE FROM releases WHERE crate_id = $1 AND version = $2 RETURNING is_library", - crate_id.0, - version as _, + let is_library: bool = sqlx::query_scalar!( + "DELETE FROM releases WHERE id = $1 RETURNING is_library", + release_id as _, ) - .fetch_optional(&mut *transaction) + .fetch_one(&mut *transaction) .await? 
- else { - transaction.commit().await?; - return Ok(false); - }; - let is_library = is_library.unwrap_or(false); + .unwrap_or(false); sqlx::query!( "DELETE FROM queue WHERE name = $1 AND version = $2;", @@ -194,7 +207,7 @@ async fn delete_version_from_database( update_latest_version_id(&mut transaction, crate_id).await?; transaction.commit().await?; - Ok(is_library) + Ok(Some(is_library)) } /// Returns whether any release in this crate was a library From e1f4467e1a3eace5b2e0006983d6d34bec98a421 Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Sat, 13 Jun 2026 05:31:14 +0200 Subject: [PATCH 20/23] first version --- Cargo.lock | 3 + Cargo.toml | 5 + crates/bin/docs_rs_watcher/Cargo.toml | 3 + crates/bin/docs_rs_watcher/src/config.rs | 4 + .../bin/docs_rs_watcher/src/index_watcher.rs | 57 +++++- crates/bin/docs_rs_watcher/src/lib.rs | 10 +- crates/bin/docs_rs_watcher/src/main.rs | 8 +- crates/bin/docs_rs_watcher/src/subscriber.rs | 192 ++++++++++++++++++ .../docs_rs_watcher/src/synchronization.rs | 27 +++ crates/lib/docs_rs_crates_io/src/events.rs | 10 + crates/lib/docs_rs_storage/Cargo.toml | 6 +- 11 files changed, 307 insertions(+), 18 deletions(-) create mode 100644 crates/bin/docs_rs_watcher/src/subscriber.rs create mode 100644 crates/bin/docs_rs_watcher/src/synchronization.rs diff --git a/Cargo.lock b/Cargo.lock index 2ef05ee9a..da131d79d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2453,6 +2453,7 @@ name = "docs_rs_watcher" version = "0.6.0" dependencies = [ "anyhow", + "aws-config", "aws-sdk-sqs", "clap", "crates-index", @@ -2460,6 +2461,7 @@ dependencies = [ "docs_rs_build_queue", "docs_rs_config", "docs_rs_context", + "docs_rs_crates_io", "docs_rs_database", "docs_rs_env_vars", "docs_rs_fastly", @@ -2476,6 +2478,7 @@ dependencies = [ "opentelemetry", "pretty_assertions", "rayon", + "serde_json", "sqlx", "test-case", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 1d178d433..3f7d1a58b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,11 @@ edition = 
"2024" anyhow = { version = "1.0.42", features = ["backtrace"] } askama = "0.16.0" async-stream = "0.3.5" +# The default `rustls` feature pulls in the legacy hyper 0.14 + rustls 0.21 +# stack via `aws-smithy-runtime/tls-rustls`, which includes the vulnerable +# `rustls-webpki` v0.101.x. Using only `default-https-client` avoids this by +# using the modern rustls 0.23 + hyper 1.x stack instead. +aws-config = { version = "1.0.0", default-features = false, features = ["default-https-client", "rt-tokio"] } axum-extra = { version = "0.12.0", features = ["middleware", "routing", "typed-header"] } base64 = "0.22" bon = { version = "3.8.1", features = ["experimental-overwritable"] } diff --git a/crates/bin/docs_rs_watcher/Cargo.toml b/crates/bin/docs_rs_watcher/Cargo.toml index caeaefa97..4b24a2d55 100644 --- a/crates/bin/docs_rs_watcher/Cargo.toml +++ b/crates/bin/docs_rs_watcher/Cargo.toml @@ -8,6 +8,7 @@ edition.workspace = true [dependencies] anyhow = { workspace = true } +aws-config = { workspace = true } aws-sdk-sqs = { version = "1.99.0", default-features = false, features = ["default-https-client", "rt-tokio"] } clap = { workspace = true } # NOTE: on the new infra, switch back from `git-https-reqwest` to `git-https` (curl) once the curl version is new enough @@ -16,6 +17,7 @@ crates-index = { version = "3.0.0", default-features = false, features = ["git", crates-index-diff = { version = "30.0.0", default-features = false, features = ["http-reqwest", "max-performance", "semver"] } docs_rs_build_queue = { path = "../../lib/docs_rs_build_queue" } docs_rs_config = { path = "../../lib/docs_rs_config" } +docs_rs_crates_io = { path = "../../lib/docs_rs_crates_io" } docs_rs_context = { path = "../../lib/docs_rs_context" } docs_rs_database = { path = "../../lib/docs_rs_database" } docs_rs_env_vars = { path = "../../lib/docs_rs_env_vars" } @@ -28,6 +30,7 @@ docs_rs_types = { path = "../../lib/docs_rs_types" } docs_rs_utils = { path = "../../lib/docs_rs_utils" } futures-util = 
{ workspace = true } itertools = { workspace = true } +serde_json = { workspace = true } opentelemetry = { workspace = true } rayon = "1.6.1" sqlx = { workspace = true } diff --git a/crates/bin/docs_rs_watcher/src/config.rs b/crates/bin/docs_rs_watcher/src/config.rs index 404ade8f5..388eaa5db 100644 --- a/crates/bin/docs_rs_watcher/src/config.rs +++ b/crates/bin/docs_rs_watcher/src/config.rs @@ -10,6 +10,7 @@ pub struct Config { pub registry_url: Option, pub sqs_queue_url: Option, pub sqs_region: Option, + pub aws_sdk_max_retries: u32, /// How long to wait between registry checks pub delay_between_registry_fetches: Duration, @@ -32,8 +33,11 @@ impl AppConfig for Config { Ok(Self { registry_index_path: env("REGISTRY_INDEX_PATH", prefix.join("crates.io-index"))?, registry_url: maybe_env("REGISTRY_URL")?, + sqs_queue_url: maybe_env("DOCSRS_SQS_QUEUE_URL")?, sqs_region: maybe_env("DOCSRS_SQS_REGION")?, + aws_sdk_max_retries: env("DOCSRS_AWS_SDK_MAX_RETRIES", 6u32)?, + delay_between_registry_fetches: Duration::from_secs(env::( "DOCSRS_DELAY_BETWEEN_REGISTRY_FETCHES", 60, diff --git a/crates/bin/docs_rs_watcher/src/index_watcher.rs b/crates/bin/docs_rs_watcher/src/index_watcher.rs index 68b964d79..871572be0 100644 --- a/crates/bin/docs_rs_watcher/src/index_watcher.rs +++ b/crates/bin/docs_rs_watcher/src/index_watcher.rs @@ -2,6 +2,7 @@ use crate::{ Config, db::{delete_crate, delete_version}, index::Index, + synchronization::CrateLocks, }; use anyhow::{Context as _, Result}; use crates_index_diff::Change; @@ -45,6 +46,30 @@ impl TryFrom for CrateVersion { } } +impl TryFrom<&docs_rs_crates_io::events::CrateVersion> for CrateVersion { + type Error = anyhow::Error; + + fn try_from(value: &docs_rs_crates_io::events::CrateVersion) -> Result { + Ok(Self { + name: value.name.parse()?, + version: value.version.clone().into(), + yanked: value.yanked, + }) + } +} + +impl TryFrom for CrateVersion { + type Error = anyhow::Error; + + fn try_from(value: 
docs_rs_crates_io::events::CrateVersion) -> Result { + Ok(Self { + name: value.name.parse()?, + version: value.version.into(), + yanked: value.yanked, + }) + } +} + #[cfg(test)] impl From for crates_index_diff::CrateVersion { fn from(value: CrateVersion) -> Self { @@ -92,6 +117,7 @@ async fn queue_crate_invalidation(krate: &KrateName, cdn: Option<&Cdn>) { /// Returns the number of crates added pub(crate) async fn get_new_crates( context: &Context, + locks: &CrateLocks, index: &Index, config: &Config, ) -> Result { @@ -115,7 +141,7 @@ pub(crate) async fn get_new_crates( debug!(last_seen_reference=%last_seen_reference, new_reference=%new_reference, "queueing changes"); - let crates_added = process_changes(context, &changes, config).await; + let crates_added = process_changes(context, &locks, &changes, config).await; if let Err(err) = context.build_queue()?.deprioritize_workspaces().await { error!(?err, "error deprioritizing workspaces"); @@ -129,11 +155,16 @@ pub(crate) async fn get_new_crates( Ok(crates_added) } -async fn process_changes(context: &Context, changes: &Vec, config: &Config) -> usize { +async fn process_changes( + context: &Context, + locks: &CrateLocks, + changes: &Vec, + config: &Config, +) -> usize { let mut crates_added = 0; for change in changes { - match process_change(context, change, config).await { + match process_change(context, locks, change, config).await { Ok(added) => { if added { crates_added += 1; @@ -148,7 +179,12 @@ async fn process_changes(context: &Context, changes: &Vec, config: &Conf } /// Process a crate change, returning whether the change was a crate addition or not. 
-async fn process_change(context: &Context, change: &Change, config: &Config) -> Result { +pub(crate) async fn process_change( + context: &Context, + locks: &CrateLocks, + change: &Change, + config: &Config, +) -> Result { let crate_version: CrateVersion = change .versions() .first() @@ -156,6 +192,8 @@ async fn process_change(context: &Context, change: &Change, config: &Config) -> .clone() .try_into()?; + let _guard = locks.lock(crate_version.name.to_string()).await; + match change { Change::Added(_release) => process_version_added(context, &crate_version).await?, Change::AddedAndYanked(_release) => { @@ -177,7 +215,10 @@ async fn process_change(context: &Context, change: &Change, config: &Config) -> } /// Processes crate changes, whether they got yanked or unyanked. -async fn process_version_yank_status(context: &Context, release: &CrateVersion) -> Result<()> { +pub(crate) async fn process_version_yank_status( + context: &Context, + release: &CrateVersion, +) -> Result<()> { // FIXME: delay yanks of crates that have not yet finished building // https://github.com/rust-lang/docs.rs/issues/1934 set_yanked(context, &release.name, &release.version, release.yanked).await?; @@ -185,7 +226,7 @@ async fn process_version_yank_status(context: &Context, release: &CrateVersion) Ok(()) } -async fn process_version_added(context: &Context, release: &CrateVersion) -> Result<()> { +pub(crate) async fn process_version_added(context: &Context, release: &CrateVersion) -> Result<()> { let mut conn = context.pool()?.get_async().await?; let priority = get_crate_priority(&mut conn, &release.name).await?; context @@ -216,7 +257,7 @@ async fn process_version_added(context: &Context, release: &CrateVersion) -> Res Ok(()) } -async fn process_version_deleted( +pub(crate) async fn process_version_deleted( context: &Context, config: &Config, release: &CrateVersion, @@ -250,7 +291,7 @@ async fn process_version_deleted( Ok(()) } -async fn process_crate_deleted( +pub(crate) async fn 
process_crate_deleted( context: &Context, config: &Config, krate: &KrateName, diff --git a/crates/bin/docs_rs_watcher/src/lib.rs b/crates/bin/docs_rs_watcher/src/lib.rs index 833a6c688..05cf5cc68 100644 --- a/crates/bin/docs_rs_watcher/src/lib.rs +++ b/crates/bin/docs_rs_watcher/src/lib.rs @@ -5,6 +5,8 @@ mod index; pub mod index_watcher; mod rebuilds; mod service_metrics; +pub mod subscriber; +pub mod synchronization; #[cfg(test)] mod testing; @@ -13,7 +15,9 @@ pub use db::{delete_crate, delete_version}; pub use index::Index; pub use rebuilds::queue_rebuilds; -use crate::{index_watcher::get_new_crates, service_metrics::OtelServiceMetrics}; +use crate::{ + index_watcher::get_new_crates, service_metrics::OtelServiceMetrics, synchronization::CrateLocks, +}; use anyhow::Result; use docs_rs_context::Context; use docs_rs_utils::start_async_cron; @@ -24,7 +28,7 @@ use tracing::{debug, error, info, trace}; /// Run the registry watcher /// NOTE: this should only be run once, otherwise crates would be added /// to the queue multiple times. 
-pub async fn watch_registry(config: &Config, context: &Context) -> Result<()> { +pub async fn watch_registry(config: &Config, context: &Context, locks: &CrateLocks) -> Result<()> { let mut last_gc = Instant::now(); let queue = context.build_queue()?; @@ -36,7 +40,7 @@ pub async fn watch_registry(config: &Config, context: &Context) -> Result<()> { debug!("Checking new crates"); let index = Index::from_config(config).await?; - match get_new_crates(context, &index, config).await { + match get_new_crates(context, locks, &index, config).await { Ok(n) => debug!("{} crates added to queue", n), Err(e) => { error!(?e, "Failed to get new crates"); diff --git a/crates/bin/docs_rs_watcher/src/main.rs b/crates/bin/docs_rs_watcher/src/main.rs index ba133b8e1..3a7ad4d34 100644 --- a/crates/bin/docs_rs_watcher/src/main.rs +++ b/crates/bin/docs_rs_watcher/src/main.rs @@ -3,7 +3,7 @@ use clap::{Parser, Subcommand}; use docs_rs_config::AppConfig as _; use docs_rs_context::Context; use docs_rs_types::{KrateName, Version}; -use docs_rs_watcher::{Config, Index, index_watcher}; +use docs_rs_watcher::{Config, Index, index_watcher, synchronization::CrateLocks}; use std::sync::Arc; #[tokio::main] @@ -81,7 +81,11 @@ impl CommandLine { // which should only run once, and all the time. 
docs_rs_watcher::start_background_service_metric_collector(&ctx).await?; - docs_rs_watcher::watch_registry(&config, &ctx).await?; + let locks = CrateLocks::new(); + tokio::try_join!( + docs_rs_watcher::watch_registry(&config, &ctx, &locks), + docs_rs_watcher::subscriber::listen(&config, &ctx, &locks), + )?; } Self::Queue { subcommand } => subcommand.handle_args(config, ctx).await?, Self::Database { subcommand } => subcommand.handle_args(config, ctx).await?, diff --git a/crates/bin/docs_rs_watcher/src/subscriber.rs b/crates/bin/docs_rs_watcher/src/subscriber.rs new file mode 100644 index 000000000..78e76df5f --- /dev/null +++ b/crates/bin/docs_rs_watcher/src/subscriber.rs @@ -0,0 +1,192 @@ +use crate::{ + Config, + index_watcher::{ + process_crate_deleted, process_version_added, process_version_deleted, + process_version_yank_status, + }, + synchronization::CrateLocks, +}; +use anyhow::{Context as _, Result, bail}; +use aws_config::{BehaviorVersion, Region, retry::RetryConfig}; +use aws_sdk_sqs::Client; +use docs_rs_context::Context; +use docs_rs_crates_io::events::{IndexChangeEventV1, IndexChangeV1}; +use docs_rs_types::KrateName; +use docs_rs_utils::retry_async; +use std::time::Duration; +use tokio::time; +use tracing::{debug, error, instrument, warn}; + +/// visibility timeout: +/// should be longer than the longest time our server takes to handle a message. +/// +/// if we fetch a message, and don't delete it in this time, it will be redelivered. +const VISIBILITY_TIMEOUT: Duration = Duration::from_secs(60); + +/// wait-time (long polling): +/// +/// How long should the request be kept open when there are no messages. +const WAIT_TIME: Duration = Duration::from_secs(30); + +/// when one long-polling request is finished, how long to sleep before starting the next? +const SLEEP_BETWEEN_REQUESTS: Duration = Duration::from_secs(1); + +/// when we have an error handling a message, how long should SQS wait until +/// it redelivers this message. 
+const RETRY_DELAY: Duration = Duration::from_secs(30); + +pub async fn listen(config: &Config, context: &Context, locks: &CrateLocks) -> Result<()> { + let (Some(region), Some(queue_url)) = (&config.sqs_region, &config.sqs_queue_url) else { + bail!("missing sqs region or url, disabling crates.io subscriber"); + }; + let queue_url = queue_url.to_string(); + + let shared_config = aws_config::load_defaults(BehaviorVersion::latest()).await; + let client = Client::from_conf( + aws_sdk_sqs::config::Builder::from(&shared_config) + .retry_config(RetryConfig::standard().with_max_attempts(config.aws_sdk_max_retries)) + .region(Region::new(region.clone())) + .build(), + ); + + let queue = context.build_queue()?; + + loop { + if queue.is_locked().await? { + debug!("Queue is locked, skipping checking new crates"); + time::sleep(WAIT_TIME).await; + continue; + } + + let response = match client + .receive_message() + .queue_url(queue_url.clone()) + .max_number_of_messages(10) + .wait_time_seconds(WAIT_TIME.as_secs() as i32) + .visibility_timeout(VISIBILITY_TIMEOUT.as_secs() as i32) + .send() + .await + { + Ok(response) => response, + Err(err) => { + error!( + ?err, + queue_url, "error receiving messages from sqs, retrying" + ); + time::sleep(WAIT_TIME).await; + continue; + } + }; + + for message in response.messages() { + let Some(body) = message.body() else { + continue; + }; + + match retry_async( + || async move { process_message(context, config, locks, body).await }, + 3, + ) + .await + { + Ok(_) => { + if let Some(receipt_handle) = message.receipt_handle() { + // mark the message as "done" + if let Err(err) = client + .delete_message() + .queue_url(queue_url.clone()) + .receipt_handle(receipt_handle) + .send() + .await + { + // sqs will redeliver the message after the visibility timeout passed + error!( + ?err, + receipt_handle, queue_url, "error deleting message from queue" + ); + } + } + } + Err(err) => { + error!( + ?err, + ?message, + ?RETRY_DELAY, + body, + "error 
handling message. Retrying." + ); + + if let Some(receipt_handle) = message.receipt_handle() { + // Don't delete the message. + // It will become visible again after the visibility timeout. + if let Err(err) = client + .change_message_visibility() + .queue_url(queue_url.clone()) + .receipt_handle(receipt_handle) + // retry after some time + .visibility_timeout(RETRY_DELAY.as_secs() as i32) // retry + .send() + .await + { + // this error doesn't really matter, without the changed visibility + // timeout sqs will redeliver after the default visibility timeout. + warn!( + ?err, + receipt_handle, + queue_url, + "error setting visibility_timeout for retry" + ); + } + } + } + } + } + + time::sleep(SLEEP_BETWEEN_REQUESTS).await; + } +} + +#[instrument(skip(context, config, locks))] +async fn process_message( + context: &Context, + config: &Config, + locks: &CrateLocks, + body: &str, +) -> Result<()> { + let event: IndexChangeEventV1 = + serde_json::from_str(body).context("error parsing event from json")?; + + debug!(?event, "received event from sqs"); + + let _guard = locks.lock(event.change.name()).await; + + process_change(context, &event.change, config) + .await + .context("error processing change")?; + + Ok(()) +} + +/// Process a crate change, returning whether the change was a crate addition or not. +pub(crate) async fn process_change( + context: &Context, + change: &IndexChangeV1, + config: &Config, +) -> Result { + match change { + IndexChangeV1::Added(crate_version) => { + process_version_added(context, &crate_version.try_into().unwrap()).await? + } + IndexChangeV1::Unyanked(crate_version) | IndexChangeV1::Yanked(crate_version) => { + process_version_yank_status(context, &crate_version.try_into().unwrap()).await? + } + IndexChangeV1::CrateDeleted { name, .. } => { + let name: KrateName = name.parse()?; + process_crate_deleted(context, config, &name).await? 
+ } + IndexChangeV1::VersionDeleted(crate_version) => { + process_version_deleted(context, config, &crate_version.try_into().unwrap()).await? + } + }; + Ok(change.added().is_some()) +} diff --git a/crates/bin/docs_rs_watcher/src/synchronization.rs b/crates/bin/docs_rs_watcher/src/synchronization.rs new file mode 100644 index 000000000..d80be40c6 --- /dev/null +++ b/crates/bin/docs_rs_watcher/src/synchronization.rs @@ -0,0 +1,27 @@ +use std::{collections::HashMap, sync::Arc}; +use tokio::sync::{Mutex, OwnedMutexGuard}; + +#[derive(Clone, Default)] +pub struct CrateLocks { + locks: Arc>>>>, +} + +impl CrateLocks { + pub fn new() -> Self { + Self { + locks: Arc::new(Mutex::new(HashMap::new())), + } + } + + pub async fn lock(&self, crate_name: impl Into) -> OwnedMutexGuard<()> { + let lock = { + let mut locks = self.locks.lock().await; + locks + .entry(crate_name.into()) + .or_insert_with(|| Arc::new(Mutex::new(()))) + .clone() + }; + + lock.lock_owned().await + } +} diff --git a/crates/lib/docs_rs_crates_io/src/events.rs b/crates/lib/docs_rs_crates_io/src/events.rs index 12cc5170b..986e814cb 100644 --- a/crates/lib/docs_rs_crates_io/src/events.rs +++ b/crates/lib/docs_rs_crates_io/src/events.rs @@ -59,6 +59,16 @@ impl IndexChangeV1 { _ => None, } } + + pub fn name(&self) -> &str { + match self { + IndexChangeV1::Added(crate_version) => &crate_version.name, + IndexChangeV1::Unyanked(crate_version) => &crate_version.name, + IndexChangeV1::Yanked(crate_version) => &crate_version.name, + IndexChangeV1::CrateDeleted { name } => &name, + IndexChangeV1::VersionDeleted(crate_version) => &crate_version.name, + } + } } impl fmt::Display for IndexChangeV1 { diff --git a/crates/lib/docs_rs_storage/Cargo.toml b/crates/lib/docs_rs_storage/Cargo.toml index 153de2489..6b0fb85ea 100644 --- a/crates/lib/docs_rs_storage/Cargo.toml +++ b/crates/lib/docs_rs_storage/Cargo.toml @@ -16,11 +16,7 @@ testing = [ anyhow = { workspace = true } async-compression = { version = "0.4.32", features = 
["bzip2", "deflate", "gzip", "tokio", "zstd"] } async-stream = { workspace = true } -# The default `rustls` feature pulls in the legacy hyper 0.14 + rustls 0.21 -# stack via `aws-smithy-runtime/tls-rustls`, which includes the vulnerable -# `rustls-webpki` v0.101.x. Using only `default-https-client` avoids this by -# using the modern rustls 0.23 + hyper 1.x stack instead. -aws-config = { version = "1.0.0", default-features = false, features = ["default-https-client", "rt-tokio"] } +aws-config = { workspace = true } aws-sdk-s3 = { version = "1.3.0", default-features = false, features = ["default-https-client", "rt-tokio"] } aws-smithy-types-convert = { version = "0.60.0", features = ["convert-chrono"] } base64 = { workspace = true } From bc3b16e3c067f4cb469e8fa08dfce1242906ca73 Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Sat, 13 Jun 2026 05:33:48 +0200 Subject: [PATCH 21/23] errs --- crates/bin/docs_rs_watcher/src/index_watcher.rs | 4 +++- crates/lib/docs_rs_crates_io/src/events.rs | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/crates/bin/docs_rs_watcher/src/index_watcher.rs b/crates/bin/docs_rs_watcher/src/index_watcher.rs index 871572be0..87e00cd50 100644 --- a/crates/bin/docs_rs_watcher/src/index_watcher.rs +++ b/crates/bin/docs_rs_watcher/src/index_watcher.rs @@ -141,7 +141,7 @@ pub(crate) async fn get_new_crates( debug!(last_seen_reference=%last_seen_reference, new_reference=%new_reference, "queueing changes"); - let crates_added = process_changes(context, &locks, &changes, config).await; + let crates_added = process_changes(context, locks, &changes, config).await; if let Err(err) = context.build_queue()?.deprioritize_workspaces().await { error!(?err, "error deprioritizing workspaces"); @@ -558,8 +558,10 @@ mod tests { version: V2, ..Default::default() }; + let locks = CrateLocks::new(); let added = process_changes( &env, + &locks, &vec![ // Should be added correctly Change::Added(krate1.into()), diff --git 
a/crates/lib/docs_rs_crates_io/src/events.rs b/crates/lib/docs_rs_crates_io/src/events.rs index 986e814cb..4b9dc4ea3 100644 --- a/crates/lib/docs_rs_crates_io/src/events.rs +++ b/crates/lib/docs_rs_crates_io/src/events.rs @@ -65,7 +65,7 @@ impl IndexChangeV1 { IndexChangeV1::Added(crate_version) => &crate_version.name, IndexChangeV1::Unyanked(crate_version) => &crate_version.name, IndexChangeV1::Yanked(crate_version) => &crate_version.name, - IndexChangeV1::CrateDeleted { name } => &name, + IndexChangeV1::CrateDeleted { name } => name, IndexChangeV1::VersionDeleted(crate_version) => &crate_version.name, } } From 8f49598882325c8f25ac1b01fe1af83ff1149d7d Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Sat, 13 Jun 2026 05:34:30 +0200 Subject: [PATCH 22/23] todo --- crates/bin/docs_rs_watcher/src/main.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/bin/docs_rs_watcher/src/main.rs b/crates/bin/docs_rs_watcher/src/main.rs index 3a7ad4d34..9ee93819c 100644 --- a/crates/bin/docs_rs_watcher/src/main.rs +++ b/crates/bin/docs_rs_watcher/src/main.rs @@ -82,6 +82,7 @@ impl CommandLine { docs_rs_watcher::start_background_service_metric_collector(&ctx).await?; let locks = CrateLocks::new(); + // FIXME: we don't want to exit in error case, do we? 
tokio::try_join!( docs_rs_watcher::watch_registry(&config, &ctx, &locks), docs_rs_watcher::subscriber::listen(&config, &ctx, &locks), From d927bc5e56e3b1e8565f467ba246541ef93544e5 Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Sun, 14 Jun 2026 14:59:54 +0200 Subject: [PATCH 23/23] prio --- crates/bin/docs_rs_watcher/src/subscriber.rs | 3 +++ crates/bin/docs_rs_watcher/src/synchronization.rs | 3 +++ 2 files changed, 6 insertions(+) diff --git a/crates/bin/docs_rs_watcher/src/subscriber.rs b/crates/bin/docs_rs_watcher/src/subscriber.rs index 78e76df5f..2dce449e9 100644 --- a/crates/bin/docs_rs_watcher/src/subscriber.rs +++ b/crates/bin/docs_rs_watcher/src/subscriber.rs @@ -17,6 +17,9 @@ use std::time::Duration; use tokio::time; use tracing::{debug, error, instrument, warn}; +// TODO: +// * when should we run deprioritize_workspaces ? + /// visibility timeout: /// should be longer than the longest time our server takes to handle a message. /// diff --git a/crates/bin/docs_rs_watcher/src/synchronization.rs b/crates/bin/docs_rs_watcher/src/synchronization.rs index d80be40c6..b1720fe91 100644 --- a/crates/bin/docs_rs_watcher/src/synchronization.rs +++ b/crates/bin/docs_rs_watcher/src/synchronization.rs @@ -1,6 +1,9 @@ use std::{collections::HashMap, sync::Arc}; use tokio::sync::{Mutex, OwnedMutexGuard}; +/// shared locks so we can serialize changes to the same crate, +/// for the transition phase where we might get input from both +/// the git index and the sqs queue. #[derive(Clone, Default)] pub struct CrateLocks { locks: Arc>>>>,