diff --git a/Cargo.toml b/Cargo.toml
index 532e152..bdd702c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -12,7 +12,7 @@ path = "src/lib.rs"
 
 [dependencies]
 # CLI
-clap = { version = "4", features = ["derive"] }
+clap = { version = "4", features = ["derive", "env"] }
 
 # Error handling
 anyhow = "1"
diff --git a/src/commands/mod.rs b/src/commands/mod.rs
index 61eda99..fd54b98 100644
--- a/src/commands/mod.rs
+++ b/src/commands/mod.rs
@@ -7,5 +7,6 @@ pub mod fetch_issues;
 pub mod fetch_labels;
 pub mod generate_summaries;
 pub mod purge;
+pub mod push_metrics;
 pub mod remove_legacy_label;
 pub mod workflows;
diff --git a/src/commands/push_metrics.rs b/src/commands/push_metrics.rs
new file mode 100644
index 0000000..171d323
--- /dev/null
+++ b/src/commands/push_metrics.rs
@@ -0,0 +1,331 @@
+use crate::config::Config;
+use anyhow::{Context, Result};
+use reqwest::blocking::Client;
+use rusqlite::Connection;
+use serde_json::json;
+use std::collections::HashMap;
+
+const DD_API_URL: &str = "https://api.datadoghq.com/api/v2/series";
+const BATCH_SIZE: usize = 200;
+
+pub fn run(
+    config: &Config,
+    dd_api_key: &str,
+    dd_site: Option<&str>,
+    since: Option<&str>,
+    dry_run: bool,
+) -> Result<()> {
+    let client = Client::new();
+    let api_url = match dd_site {
+        Some(site) => format!("https://api.{site}/api/v2/series"),
+        None => DD_API_URL.to_string(),
+    };
+
+    let db_path = format!("out/db/{}_{}.db", config.repo_owner, config.repo_name);
+    let conn = Connection::open(&db_path)
+        .with_context(|| format!("Failed to open database: {db_path}"))?;
+
+    let repo_tag = format!("repo:{}/{}", config.repo_owner, config.repo_name);
+    let since_filter = since.unwrap_or("2026-01-01");
+
+    println!("Reading from {db_path} (created_at >= {since_filter})...");
+
+    let mut all_series = Vec::new();
+
+    // Issues
+    let issue_series = items_to_series(&conn, "issues", &repo_tag, since_filter)?;
+    all_series.extend(issue_series);
+
+    // Pull requests
+    let pr_series = items_to_series(&conn, "pull_requests", &repo_tag, since_filter)?;
+    all_series.extend(pr_series);
+
+    // Discussions
+    let disc_series = discussions_to_series(&conn, &repo_tag, since_filter)?;
+    all_series.extend(disc_series);
+
+    if all_series.is_empty() {
+        println!("No metrics to push.");
+        return Ok(());
+    }
+
+    if dry_run {
+        print_dry_run(&all_series);
+        return Ok(());
+    }
+
+    println!("Pushing {} metric series to Datadog...", all_series.len());
+    for (i, chunk) in all_series.chunks(BATCH_SIZE).enumerate() {
+        let payload = json!({ "series": chunk });
+        let response = client
+            .post(&api_url)
+            .header("DD-API-KEY", dd_api_key)
+            .header("Content-Type", "application/json")
+            .json(&payload)
+            .send()
+            .context("Failed to send metrics to Datadog")?;
+
+        if response.status().is_success() {
+            println!("  Batch {}: submitted {} series", i + 1, chunk.len());
+        } else {
+            let status = response.status();
+            let body = response.text().unwrap_or_default();
+            eprintln!("  Batch {} failed ({status}): {body}", i + 1);
+        }
+    }
+
+    println!("Done.");
+    Ok(())
+}
+
+/// Build per-item metric series for issues or pull_requests.
+/// Each item becomes a data point with value=1 at its created_at timestamp,
+/// tagged with all its labels, state, and issue_type.
+fn items_to_series(
+    conn: &Connection,
+    table: &str,
+    repo_tag: &str,
+    since: &str,
+) -> Result<Vec<serde_json::Value>> {
+    let metric_name = if table == "pull_requests" {
+        "github.prs"
+    } else {
+        "github.issues"
+    };
+
+    // Build a map of item_id -> list of label names
+    let label_map = build_label_map(conn, table, since)?;
+
+    // Query items (issue_type only exists on issues; drafts are excluded for PRs)
+    let issue_type_col = if table == "issues" { ", issue_type" } else { "" };
+    let extra_filter = if table == "pull_requests" {
+        " AND is_draft = 0"
+    } else {
+        ""
+    };
+    let query = format!(
+        "SELECT id, number, state, created_at, closed_at{issue_type_col}
+         FROM {table}
+         WHERE created_at >= ?{extra_filter}
+         ORDER BY created_at"
+    );
+
+    let mut stmt = conn.prepare(&query)?;
+    let mut rows = stmt.query(rusqlite::params![since])?;
+
+    // Accumulate: group by unique tag set -> (metric name, tags, points)
+    let mut series_map: HashMap<String, (String, Vec<String>, Vec<serde_json::Value>)> =
+        HashMap::new();
+
+    while let Some(row) = rows.next()? {
+        let id: i64 = row.get(0)?;
+        let _number: i64 = row.get(1)?;
+        let state: String = row.get(2)?;
+        let created_at: String = row.get(3)?;
+        let closed_at: Option<String> = row.get(4)?;
+        let issue_type: Option<String> = if table == "issues" {
+            row.get(5)?
+        } else {
+            None
+        };
+
+        let ts = match parse_timestamp(&created_at) {
+            Some(t) => t,
+            None => continue,
+        };
+
+        let mut tags = vec![repo_tag.to_string(), format!("state:{state}")];
+
+        if let Some(ref it) = issue_type {
+            tags.push(format!("issue_type:{}", it.to_lowercase()));
+        }
+
+        // Add all labels as tags
+        if let Some(labels) = label_map.get(&id) {
+            for label in labels {
+                tags.push(label_to_tag(label));
+            }
+        }
+
+        // Sort tags for consistent grouping
+        tags.sort();
+        let key = tags.join(",");
+
+        let entry = series_map
+            .entry(key)
+            .or_insert_with(|| (metric_name.to_string(), tags.clone(), Vec::new()));
+        entry.2.push(json!({"timestamp": ts, "value": 1}));
+
+        // If closed, also emit a closed event
+        if state == "closed" {
+            if let Some(ref ca) = closed_at {
+                if let Some(closed_ts) = parse_timestamp(ca) {
+                    // Tags already include state:closed, so reuse them as-is;
+                    // prefix the map key so closed series don't collide with created series.
+                    let closed_tags = tags.clone();
+                    let closed_key = format!("closed:{}", closed_tags.join(","));
+                    let closed_metric = format!("{metric_name}.closed");
+                    let entry = series_map
+                        .entry(closed_key)
+                        .or_insert_with(|| (closed_metric, closed_tags, Vec::new()));
+                    entry.2.push(json!({"timestamp": closed_ts, "value": 1}));
+                }
+            }
+        }
+    }
+
+    Ok(series_map
+        .into_values()
+        .map(|(metric, tags, points)| {
+            json!({
+                "metric": metric,
+                "type": 1, // count
+                "points": points,
+                "tags": tags,
+            })
+        })
+        .collect())
+}
+
+fn discussions_to_series(
+    conn: &Connection,
+    repo_tag: &str,
+    since: &str,
+) -> Result<Vec<serde_json::Value>> {
+    let query =
+        "SELECT number, category, created_at, closed, is_answered
+         FROM discussions
+         WHERE created_at >= ?
+         ORDER BY created_at";
+
+    let mut stmt = conn.prepare(query)?;
+    let mut rows = stmt.query(rusqlite::params![since])?;
+
+    let mut series_map: HashMap<String, (String, Vec<String>, Vec<serde_json::Value>)> =
+        HashMap::new();
+
+    while let Some(row) = rows.next()? {
+        let _number: i64 = row.get(0)?;
+        let category: String = row.get(1)?;
+        let created_at: String = row.get(2)?;
+        let closed: bool = row.get(3)?;
+        let is_answered: Option<bool> = row.get(4)?;
+
+        let ts = match parse_timestamp(&created_at) {
+            Some(t) => t,
+            None => continue,
+        };
+
+        let answered = is_answered.unwrap_or(false);
+        let state = if closed {
+            "closed"
+        } else if answered {
+            "answered"
+        } else {
+            "open"
+        };
+
+        let tags = vec![
+            repo_tag.to_string(),
+            format!("category:{}", category.to_lowercase()),
+            format!("state:{state}"),
+            format!("answered:{answered}"),
+        ];
+
+        let key = tags.join(",");
+        let entry = series_map
+            .entry(key)
+            .or_insert_with(|| ("github.discussions".to_string(), tags.clone(), Vec::new()));
+        entry.2.push(json!({"timestamp": ts, "value": 1}));
+    }
+
+    Ok(series_map
+        .into_values()
+        .map(|(metric, tags, points)| {
+            json!({
+                "metric": metric,
+                "type": 1,
+                "points": points,
+                "tags": tags,
+            })
+        })
+        .collect())
+}
+
+fn build_label_map(
+    conn: &Connection,
+    table: &str,
+    since: &str,
+) -> Result<HashMap<i64, Vec<String>>> {
+    let query = format!(
+        "SELECT il.issue_id, l.name
+         FROM issue_labels il
+         JOIN labels l ON l.id = il.label_id
+         JOIN {table} t ON t.id = il.issue_id
+         WHERE t.created_at >= ?"
+    );
+    let mut stmt = conn.prepare(&query)?;
+    let mut rows = stmt.query(rusqlite::params![since])?;
+    let mut map: HashMap<i64, Vec<String>> = HashMap::new();
+    while let Some(row) = rows.next()? {
+        let id: i64 = row.get(0)?;
+        let name: String = row.get(1)?;
+        map.entry(id).or_default().push(name);
+    }
+    Ok(map)
+}
+
+fn label_to_tag(label: &str) -> String {
+    // "domain: vrl" -> "domain:vrl"
+    // "type: bug" -> "type:bug"
+    // "good first issue" -> "label:good first issue"
+    if let Some((prefix, value)) = label.split_once(": ") {
+        format!("{prefix}:{value}")
+    } else {
+        format!("label:{label}")
+    }
+}
+
+fn parse_timestamp(s: &str) -> Option<i64> {
+    chrono::DateTime::parse_from_rfc3339(s)
+        .ok()
+        .map(|dt| dt.timestamp())
+}
+
+fn print_dry_run(all_series: &[serde_json::Value]) {
+    println!(
+        "[dry-run] Would push {} metric series to Datadog",
+        all_series.len()
+    );
+
+    let total_points: usize = all_series
+        .iter()
+        .filter_map(|s| s["points"].as_array().map(|a| a.len()))
+        .sum();
+    println!("  Total data points: {total_points}");
+
+    // Group by metric name
+    let mut by_metric: HashMap<String, (usize, usize)> = HashMap::new();
+    for s in all_series {
+        let name = s["metric"].as_str().unwrap_or("unknown");
+        let points = s["points"].as_array().map(|a| a.len()).unwrap_or(0);
+        let entry = by_metric.entry(name.to_string()).or_default();
+        entry.0 += 1;
+        entry.1 += points;
+    }
+    let mut metrics: Vec<_> = by_metric.into_iter().collect();
+    metrics.sort();
+    for (name, (series, points)) in &metrics {
+        println!("  {name}: {series} series, {points} data points");
+    }
+
+    // Show sample tags from a few series
+    println!("\n  Sample tags:");
+    for s in all_series.iter().take(5) {
+        let metric = s["metric"].as_str().unwrap_or("");
+        if let Some(tags) = s["tags"].as_array() {
+            let tags_str: Vec<_> = tags.iter().filter_map(|t| t.as_str()).collect();
+            let points = s["points"].as_array().map(|a| a.len()).unwrap_or(0);
+            println!("    {metric} ({points} pts): {}", tags_str.join(", "));
+        }
+    }
+}
diff --git a/src/main.rs b/src/main.rs
index 7d8a45d..df1ac3f 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -3,7 +3,7 @@ use clap::{Parser, Subcommand};
 use github_tools::{
     commands::{
         build_db, close_old_prs, compact, delete_stale_branches, fetch_discussions, fetch_issues,
-        fetch_labels, generate_summaries, purge, remove_legacy_label, workflows,
+        fetch_labels, generate_summaries, purge, push_metrics, remove_legacy_label, workflows,
     },
     config::Config,
 };
@@ -125,6 +125,19 @@ enum Command {
         #[arg(long)]
         limit: Option<usize>,
     },
+    /// Push per-item metrics to Datadog (each issue/PR/discussion = data point with all labels as tags)
+    PushMetrics {
+        #[arg(long, help = "Path to .env file")]
+        env_file: Option<String>,
+        #[arg(long, help = "Datadog API key (or set DD_API_KEY in .env file)")]
+        dd_api_key: Option<String>,
+        #[arg(long, help = "Datadog site (or set DD_SITE in .env file, e.g. datadoghq.eu)")]
+        dd_site: Option<String>,
+        #[arg(long, help = "Only push items created since this date (default: 2026-01-01)")]
+        since: Option<String>,
+        #[arg(long, help = "Print metrics summary without sending to Datadog")]
+        dry_run: bool,
+    },
     /// Deduplicate JSON year files in a data directory
     Compact {
         #[arg(help = "Path to directory containing year JSON files (e.g. data/vectordotdev_vector/issues)")]
@@ -194,6 +207,20 @@ fn main() -> Result<()> {
             let config = Config::load(env_file.as_deref())?;
             build_db::run(&input, &config)
         }
+        Command::PushMetrics {
+            env_file,
+            dd_api_key,
+            dd_site,
+            since,
+            dry_run,
+        } => {
+            let config = Config::load(env_file.as_deref())?;
+            let api_key = dd_api_key
+                .or_else(|| std::env::var("DD_API_KEY").ok())
+                .ok_or_else(|| anyhow::anyhow!("DD_API_KEY not set (use --dd-api-key or add DD_API_KEY to .env file)"))?;
+            let site = dd_site.or_else(|| std::env::var("DD_SITE").ok());
+            push_metrics::run(&config, &api_key, site.as_deref(), since.as_deref(), dry_run)
+        }
         Command::Compact { dir } => compact::run(&dir),
         Command::FetchAll { env_file, since } => workflows::fetch_all(&env_file, since.as_deref()),
         Command::GenerateAll {
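Example invocation (a sketch, not part of the diff: it assumes the compiled binary is named `github-tools` after the `github_tools` crate, and relies on clap's derive exposing the `PushMetrics` variant as the `push-metrics` subcommand):

    # Preview series, point counts, and sample tag sets without calling the Datadog API
    github-tools push-metrics --env-file .env --dry-run

    # Push for real; DD_API_KEY (and optionally DD_SITE) come from .env or the flags
    github-tools push-metrics --env-file .env --since 2026-01-01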