From 30b0caf3c0df8d43033cbb4aedf48dae9e10cb94 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Wed, 15 Apr 2026 12:42:16 -0400 Subject: [PATCH 1/3] Add push-metrics command for Datadog integration Reads summary CSVs and pushes custom metrics to Datadog via the v2 series API. Supports --dry-run to preview without sending. Metrics pushed: - github.issues.created/closed (monthly) - github.prs.created/closed (monthly) - github.issues.by_type / github.prs.by_type (Bug, Feature, etc.) - github.issues.by_label / github.prs.by_label (per-label monthly) - github.issues.total_open/total_closed (current snapshot) - github.prs.total_open/total_closed (current snapshot) - github.discussions.created/closed/answered (monthly) All metrics tagged with repo:{owner}/{name}. Label metrics tagged with label:{name}, type metrics with issue_type:{name}. DD_API_KEY can be passed via --dd-api-key or DD_API_KEY env var. DD_SITE for non-US1 regions via --dd-site or DD_SITE env var. Co-Authored-By: Claude Opus 4.6 (1M context) --- Cargo.toml | 2 +- src/commands/mod.rs | 1 + src/commands/push_metrics.rs | 301 +++++++++++++++++++++++++++++++++++ src/main.rs | 22 ++- 4 files changed, 324 insertions(+), 2 deletions(-) create mode 100644 src/commands/push_metrics.rs diff --git a/Cargo.toml b/Cargo.toml index 532e152..bdd702c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ path = "src/lib.rs" [dependencies] # CLI -clap = { version = "4", features = ["derive"] } +clap = { version = "4", features = ["derive", "env"] } # Error handling anyhow = "1" diff --git a/src/commands/mod.rs b/src/commands/mod.rs index 61eda99..fd54b98 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -7,5 +7,6 @@ pub mod fetch_issues; pub mod fetch_labels; pub mod generate_summaries; pub mod purge; +pub mod push_metrics; pub mod remove_legacy_label; pub mod workflows; diff --git a/src/commands/push_metrics.rs b/src/commands/push_metrics.rs new file mode 100644 index 0000000..e0473ef --- /dev/null +++ b/src/commands/push_metrics.rs @@ -0,0 +1,301 @@ +use crate::config::Config; +use anyhow::{Context, Result}; +use reqwest::blocking::Client; +use serde_json::json; +use std::path::Path; + +const DD_API_URL: &str = "https://api.datadoghq.com/api/v2/series"; +const BATCH_SIZE: usize = 200; // well under the 500KB limit + +pub fn run(config: &Config, dd_api_key: &str, dd_site: Option<&str>, dry_run: bool) -> Result<()> { + let client = Client::new(); + let api_url = match dd_site { + Some(site) => format!("https://api.{site}/api/v2/series"), + None => DD_API_URL.to_string(), + }; + + let repo_tag = format!("repo:{}/{}", config.repo_owner, config.repo_name); + let summaries_dir = Path::new("out/summaries"); + + let mut all_series = Vec::new(); + + for table in &["issues", "pull_requests"] { + let prefix = format!("{}_{}", config.repo_owner, config.repo_name); + + // Monthly summary: created/closed + label and type breakdowns + let monthly_csv = summaries_dir.join(format!("{prefix}_{table}.monthly_summary.csv")); + if monthly_csv.exists() { + let series = monthly_summary_to_series(&monthly_csv, table, &repo_tag)?; + all_series.extend(series); + } + + // Overall totals + let totals_csv = summaries_dir.join(format!("{prefix}_{table}.overall_totals.csv")); + if totals_csv.exists() { + let series = overall_totals_to_series(&totals_csv, table, &repo_tag)?; + all_series.extend(series); + } + } + + // Discussion monthly summary + let disc_csv = summaries_dir.join(format!( + "{}_{}_discussions.monthly_summary.csv", + config.repo_owner, 
config.repo_name
+    ));
+    if disc_csv.exists() {
+        let series = discussion_summary_to_series(&disc_csv, &repo_tag)?;
+        all_series.extend(series);
+    }
+
+    if all_series.is_empty() {
+        println!("No metrics to push. Run generate-summaries first.");
+        return Ok(());
+    }
+
+    if dry_run {
+        println!("[dry-run] Would push {} metric series to Datadog", all_series.len());
+        // Print a sample of unique metric names
+        let mut metric_names: Vec<String> = all_series
+            .iter()
+            .filter_map(|s| s["metric"].as_str().map(String::from))
+            .collect();
+        metric_names.sort();
+        metric_names.dedup();
+        for name in &metric_names {
+            let count = all_series
+                .iter()
+                .filter(|s| s["metric"].as_str() == Some(name))
+                .count();
+            let points: usize = all_series
+                .iter()
+                .filter(|s| s["metric"].as_str() == Some(name))
+                .filter_map(|s| s["points"].as_array().map(|a| a.len()))
+                .sum();
+            println!("  {name}: {count} series, {points} data points");
+        }
+        return Ok(());
+    }
+
+    println!("Pushing {} metric series to Datadog...", all_series.len());
+
+    for (i, chunk) in all_series.chunks(BATCH_SIZE).enumerate() {
+        let payload = json!({ "series": chunk });
+        let response = client
+            .post(&api_url)
+            .header("DD-API-KEY", dd_api_key)
+            .header("Content-Type", "application/json")
+            .json(&payload)
+            .send()
+            .context("Failed to send metrics to Datadog")?;
+
+        if response.status().is_success() {
+            println!(
+                "  Batch {}: submitted {} series",
+                i + 1,
+                chunk.len()
+            );
+        } else {
+            let status = response.status();
+            let body = response.text().unwrap_or_default();
+            eprintln!("  Batch {} failed ({status}): {body}", i + 1);
+        }
+    }
+
+    println!("Done.");
+    Ok(())
+}
+
+fn month_to_timestamp(month: &str) -> Option<i64> {
+    // "2025-04" -> unix timestamp of 2025-04-01T00:00:00Z
+    let dt = chrono::NaiveDate::parse_from_str(&format!("{month}-01"), "%Y-%m-%d").ok()?;
+    Some(dt.and_hms_opt(0, 0, 0)?.and_utc().timestamp())
+}
+
+fn monthly_summary_to_series(
+    path: &Path,
+    table: &str,
+    repo_tag: &str,
+) -> Result<Vec<serde_json::Value>> {
+    let mut rdr = csv::Reader::from_path(path)
+        .with_context(|| format!("Failed to read {}", path.display()))?;
+    let headers = rdr.headers()?.clone();
+    let mut series_map: std::collections::HashMap<String, Vec<serde_json::Value>> =
+        std::collections::HashMap::new();
+
+    // Metric name for the table (e.g. "issues" or "pull_requests")
+    let metric_table = table.replace("pull_requests", "prs");
+
+    for result in rdr.records() {
+        let record = result?;
+        let month = record.get(0).unwrap_or("");
+        let ts = match month_to_timestamp(month) {
+            Some(t) => t,
+            None => continue,
+        };
+
+        for (i, header) in headers.iter().enumerate().skip(1) {
+            let val: f64 = record.get(i).unwrap_or("0").parse().unwrap_or(0.0);
+            if val == 0.0 {
+                continue;
+            }
+
+            let (metric, tags) = classify_column(header, &metric_table, repo_tag);
+            series_map
+                .entry(format!("{metric}:{}", tags.join(",")))
+                .or_insert_with(|| vec![json!(metric), json!(tags), json!([])])
+                .get_mut(2)
+                .unwrap()
+                .as_array_mut()
+                .unwrap()
+                .push(json!({"timestamp": ts, "value": val}));
+        }
+    }
+
+    let series: Vec<serde_json::Value> = series_map
+        .into_values()
+        .map(|parts| {
+            json!({
+                "metric": parts[0],
+                "type": 0, // gauge in v2 API
+                "points": parts[2],
+                "tags": parts[1],
+            })
+        })
+        .collect();
+
+    Ok(series)
+}
+
+fn classify_column(header: &str, metric_table: &str, repo_tag: &str) -> (String, Vec<String>) {
+    let base_tags = vec![repo_tag.to_string()];
+
+    // created_issues / closed_issues / created_pull_requests / closed_pull_requests
+    if header.starts_with("created_") {
+        return (
+            format!("github.{metric_table}.created"),
+            base_tags,
+        );
+    }
+    if header.starts_with("closed_") {
+        return (
+            format!("github.{metric_table}.closed"),
+            base_tags,
+        );
+    }
+
+    // Issue types: Bug, Feature, Task (capitalized, from issue_type column)
+    if header == "Bug" || header == "Feature" || header == "Task" || header == "Enhancement" {
+        let mut tags = base_tags;
+        tags.push(format!("issue_type:{}", header.to_lowercase()));
+        return (format!("github.{metric_table}.by_type"), tags);
+    }
+
+    // type: labels (old style): "type: bug", "type: feature", etc.
+ if let Some(type_name) = header.strip_prefix("type: ") { + let mut tags = base_tags; + tags.push(format!("issue_type:{type_name}")); + return (format!("github.{metric_table}.by_type"), tags); + } + + // All other labels: emit as github.{table}.by_label with label tag + let mut tags = base_tags; + tags.push(format!("label:{header}")); + (format!("github.{metric_table}.by_label"), tags) +} + +fn overall_totals_to_series( + path: &Path, + table: &str, + repo_tag: &str, +) -> Result> { + let mut rdr = csv::Reader::from_path(path) + .with_context(|| format!("Failed to read {}", path.display()))?; + let metric_table = table.replace("pull_requests", "prs"); + let tags = vec![repo_tag.to_string()]; + let now = chrono::Utc::now().timestamp(); + + let mut series = Vec::new(); + for result in rdr.records() { + let record = result?; + let open: f64 = record.get(0).unwrap_or("0").parse().unwrap_or(0.0); + let closed: f64 = record.get(1).unwrap_or("0").parse().unwrap_or(0.0); + + series.push(json!({ + "metric": format!("github.{metric_table}.total_open"), + "type": 0, + "points": [{"timestamp": now, "value": open}], + "tags": tags, + })); + series.push(json!({ + "metric": format!("github.{metric_table}.total_closed"), + "type": 0, + "points": [{"timestamp": now, "value": closed}], + "tags": tags, + })); + } + Ok(series) +} + +fn discussion_summary_to_series( + path: &Path, + repo_tag: &str, +) -> Result> { + let mut rdr = csv::Reader::from_path(path) + .with_context(|| format!("Failed to read {}", path.display()))?; + let tags = vec![repo_tag.to_string()]; + + let mut created_points = Vec::new(); + let mut closed_points = Vec::new(); + let mut answered_points = Vec::new(); + + for result in rdr.records() { + let record = result?; + let month = record.get(0).unwrap_or(""); + let ts = match month_to_timestamp(month) { + Some(t) => t, + None => continue, + }; + + let created: f64 = record.get(1).unwrap_or("0").parse().unwrap_or(0.0); + let closed: f64 = record.get(2).unwrap_or("0").parse().unwrap_or(0.0); + let answered: f64 = record.get(3).unwrap_or("0").parse().unwrap_or(0.0); + + if created > 0.0 { + created_points.push(json!({"timestamp": ts, "value": created})); + } + if closed > 0.0 { + closed_points.push(json!({"timestamp": ts, "value": closed})); + } + if answered > 0.0 { + answered_points.push(json!({"timestamp": ts, "value": answered})); + } + } + + let mut series = Vec::new(); + if !created_points.is_empty() { + series.push(json!({ + "metric": "github.discussions.created", + "type": 0, + "points": created_points, + "tags": tags, + })); + } + if !closed_points.is_empty() { + series.push(json!({ + "metric": "github.discussions.closed", + "type": 0, + "points": closed_points, + "tags": tags, + })); + } + if !answered_points.is_empty() { + series.push(json!({ + "metric": "github.discussions.answered", + "type": 0, + "points": answered_points, + "tags": tags, + })); + } + + Ok(series) +} diff --git a/src/main.rs b/src/main.rs index 7d8a45d..a70c5b7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,7 +3,7 @@ use clap::{Parser, Subcommand}; use github_tools::{ commands::{ build_db, close_old_prs, compact, delete_stale_branches, fetch_discussions, fetch_issues, - fetch_labels, generate_summaries, purge, remove_legacy_label, workflows, + fetch_labels, generate_summaries, purge, push_metrics, remove_legacy_label, workflows, }, config::Config, }; @@ -125,6 +125,17 @@ enum Command { #[arg(long)] limit: Option, }, + /// Push summary metrics to Datadog + PushMetrics { + #[arg(long, help = "Path to .env 
file")] + env_file: Option, + #[arg(long, env = "DD_API_KEY", help = "Datadog API key (or set DD_API_KEY env var)")] + dd_api_key: String, + #[arg(long, env = "DD_SITE", help = "Datadog site (e.g. datadoghq.eu, us5.datadoghq.com)")] + dd_site: Option, + #[arg(long, help = "Print metrics summary without sending to Datadog")] + dry_run: bool, + }, /// Deduplicate JSON year files in a data directory Compact { #[arg(help = "Path to directory containing year JSON files (e.g. data/vectordotdev_vector/issues)")] @@ -194,6 +205,15 @@ fn main() -> Result<()> { let config = Config::load(env_file.as_deref())?; build_db::run(&input, &config) } + Command::PushMetrics { + env_file, + dd_api_key, + dd_site, + dry_run, + } => { + let config = Config::load(env_file.as_deref())?; + push_metrics::run(&config, &dd_api_key, dd_site.as_deref(), dry_run) + } Command::Compact { dir } => compact::run(&dir), Command::FetchAll { env_file, since } => workflows::fetch_all(&env_file, since.as_deref()), Command::GenerateAll { From 79c06a182158a570d6d879ea7076aa3bfa2fb400 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Wed, 15 Apr 2026 13:02:36 -0400 Subject: [PATCH 2/3] Rework push-metrics to per-issue model with full label tags Instead of pre-aggregated monthly summaries, each issue/PR/discussion becomes a data point at its created_at timestamp, tagged with ALL its labels. This enables cross-label queries in Datadog like: github.issues{domain:vrl,type:bug} Label tags use the label prefix as key: "domain: vrl" -> domain:vrl, "source: kafka" -> source:kafka. Unprefixed labels use label: key. Defaults to --since 2026-01-01 to keep initial volume small. 312 series / 1,158 points for Vector 2026 data. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/commands/push_metrics.rs | 464 +++++++++++++++++++---------------- src/main.rs | 7 +- 2 files changed, 252 insertions(+), 219 deletions(-) diff --git a/src/commands/push_metrics.rs b/src/commands/push_metrics.rs index e0473ef..171d323 100644 --- a/src/commands/push_metrics.rs +++ b/src/commands/push_metrics.rs @@ -1,83 +1,60 @@ use crate::config::Config; use anyhow::{Context, Result}; use reqwest::blocking::Client; +use rusqlite::Connection; use serde_json::json; -use std::path::Path; +use std::collections::HashMap; const DD_API_URL: &str = "https://api.datadoghq.com/api/v2/series"; -const BATCH_SIZE: usize = 200; // well under the 500KB limit - -pub fn run(config: &Config, dd_api_key: &str, dd_site: Option<&str>, dry_run: bool) -> Result<()> { +const BATCH_SIZE: usize = 200; + +pub fn run( + config: &Config, + dd_api_key: &str, + dd_site: Option<&str>, + since: Option<&str>, + dry_run: bool, +) -> Result<()> { let client = Client::new(); let api_url = match dd_site { Some(site) => format!("https://api.{site}/api/v2/series"), None => DD_API_URL.to_string(), }; + let db_path = format!("out/db/{}_{}.db", config.repo_owner, config.repo_name); + let conn = Connection::open(&db_path) + .with_context(|| format!("Failed to open database: {db_path}"))?; + let repo_tag = format!("repo:{}/{}", config.repo_owner, config.repo_name); - let summaries_dir = Path::new("out/summaries"); + let since_filter = since.unwrap_or("2026-01-01"); - let mut all_series = Vec::new(); + println!("Reading from {db_path} (created_at >= {since_filter})..."); - for table in &["issues", "pull_requests"] { - let prefix = format!("{}_{}", config.repo_owner, config.repo_name); + let mut all_series = Vec::new(); - // Monthly summary: created/closed + label and type breakdowns - let monthly_csv = 
summaries_dir.join(format!("{prefix}_{table}.monthly_summary.csv")); - if monthly_csv.exists() { - let series = monthly_summary_to_series(&monthly_csv, table, &repo_tag)?; - all_series.extend(series); - } + // Issues + let issue_series = items_to_series(&conn, "issues", &repo_tag, since_filter)?; + all_series.extend(issue_series); - // Overall totals - let totals_csv = summaries_dir.join(format!("{prefix}_{table}.overall_totals.csv")); - if totals_csv.exists() { - let series = overall_totals_to_series(&totals_csv, table, &repo_tag)?; - all_series.extend(series); - } - } + // Pull requests + let pr_series = items_to_series(&conn, "pull_requests", &repo_tag, since_filter)?; + all_series.extend(pr_series); - // Discussion monthly summary - let disc_csv = summaries_dir.join(format!( - "{}_{}_discussions.monthly_summary.csv", - config.repo_owner, config.repo_name - )); - if disc_csv.exists() { - let series = discussion_summary_to_series(&disc_csv, &repo_tag)?; - all_series.extend(series); - } + // Discussions + let disc_series = discussions_to_series(&conn, &repo_tag, since_filter)?; + all_series.extend(disc_series); if all_series.is_empty() { - println!("No metrics to push. Run generate-summaries first."); + println!("No metrics to push."); return Ok(()); } if dry_run { - println!("[dry-run] Would push {} metric series to Datadog", all_series.len()); - // Print a sample of unique metric names - let mut metric_names: Vec = all_series - .iter() - .filter_map(|s| s["metric"].as_str().map(String::from)) - .collect(); - metric_names.sort(); - metric_names.dedup(); - for name in &metric_names { - let count = all_series - .iter() - .filter(|s| s["metric"].as_str() == Some(name)) - .count(); - let points: usize = all_series - .iter() - .filter(|s| s["metric"].as_str() == Some(name)) - .filter_map(|s| s["points"].as_array().map(|a| a.len())) - .sum(); - println!(" {name}: {count} series, {points} data points"); - } + print_dry_run(&all_series); return Ok(()); } println!("Pushing {} metric series to Datadog...", all_series.len()); - for (i, chunk) in all_series.chunks(BATCH_SIZE).enumerate() { let payload = json!({ "series": chunk }); let response = client @@ -89,11 +66,7 @@ pub fn run(config: &Config, dd_api_key: &str, dd_site: Option<&str>, dry_run: bo .context("Failed to send metrics to Datadog")?; if response.status().is_success() { - println!( - " Batch {}: submitted {} series", - i + 1, - chunk.len() - ); + println!(" Batch {}: submitted {} series", i + 1, chunk.len()); } else { let status = response.status(); let body = response.text().unwrap_or_default(); @@ -105,197 +78,254 @@ pub fn run(config: &Config, dd_api_key: &str, dd_site: Option<&str>, dry_run: bo Ok(()) } -fn month_to_timestamp(month: &str) -> Option { - // "2025-04" -> unix timestamp of 2025-04-01T00:00:00Z - let dt = chrono::NaiveDate::parse_from_str(&format!("{month}-01"), "%Y-%m-%d").ok()?; - Some(dt.and_hms_opt(0, 0, 0)?.and_utc().timestamp()) -} - -fn monthly_summary_to_series( - path: &Path, +/// Build per-item metric series for issues or pull_requests. +/// Each item becomes a data point with value=1 at its created_at timestamp, +/// tagged with all its labels, state, and issue_type. 
+fn items_to_series(
+    conn: &Connection,
     table: &str,
     repo_tag: &str,
+    since: &str,
 ) -> Result<Vec<serde_json::Value>> {
-    let mut rdr = csv::Reader::from_path(path)
-        .with_context(|| format!("Failed to read {}", path.display()))?;
-    let headers = rdr.headers()?.clone();
-    let mut series_map: std::collections::HashMap<String, Vec<serde_json::Value>> =
-        std::collections::HashMap::new();
-
-    // Metric name for the table (e.g. "issues" or "pull_requests")
-    let metric_table = table.replace("pull_requests", "prs");
-
-    for result in rdr.records() {
-        let record = result?;
-        let month = record.get(0).unwrap_or("");
-        let ts = match month_to_timestamp(month) {
+    let metric_name = if table == "pull_requests" {
+        "github.prs"
+    } else {
+        "github.issues"
+    };
+
+    // Build a map of item_id -> list of label names
+    let label_map = build_label_map(conn, table, since)?;
+
+    // Query items
+    let issue_type_col = if table == "issues" { ", issue_type" } else { "" };
+    let extra_filter = if table == "pull_requests" {
+        " AND is_draft = 0"
+    } else {
+        ""
+    };
+    let query = format!(
+        "SELECT id, number, state, created_at, closed_at{issue_type_col}
+         FROM {table}
+         WHERE created_at >= ?{extra_filter}
+         ORDER BY created_at"
+    );
+
+    let mut stmt = conn.prepare(&query)?;
+    let mut rows = stmt.query(rusqlite::params![since])?;
+
+    // Accumulate: group by unique tag set -> series with points
+    let mut series_map: HashMap<String, (String, Vec<String>, Vec<serde_json::Value>)> =
+        HashMap::new();
+
+    while let Some(row) = rows.next()? {
+        let id: i64 = row.get(0)?;
+        let _number: i64 = row.get(1)?;
+        let state: String = row.get(2)?;
+        let created_at: String = row.get(3)?;
+        let closed_at: Option<String> = row.get(4)?;
+        let issue_type: Option<String> = if table == "issues" {
+            row.get(5)?
+        } else {
+            None
+        };
+
+        let ts = match parse_timestamp(&created_at) {
             Some(t) => t,
             None => continue,
         };

-        for (i, header) in headers.iter().enumerate().skip(1) {
-            let val: f64 = record.get(i).unwrap_or("0").parse().unwrap_or(0.0);
-            if val == 0.0 {
-                continue;
+        let mut tags = vec![repo_tag.to_string(), format!("state:{state}")];
+
+        if let Some(ref it) = issue_type {
+            tags.push(format!("issue_type:{}", it.to_lowercase()));
+        }
+
+        // Add all labels as tags
+        if let Some(labels) = label_map.get(&id) {
+            for label in labels {
+                tags.push(label_to_tag(label));
             }
+        }

-            let (metric, tags) = classify_column(header, &metric_table, repo_tag);
-            series_map
-                .entry(format!("{metric}:{}", tags.join(",")))
-                .or_insert_with(|| vec![json!(metric), json!(tags), json!([])])
-                .get_mut(2)
-                .unwrap()
-                .as_array_mut()
-                .unwrap()
-                .push(json!({"timestamp": ts, "value": val}));
+        // Sort tags for consistent grouping
+        tags.sort();
+        let key = tags.join(",");
+
+        let entry = series_map
+            .entry(key)
+            .or_insert_with(|| (metric_name.to_string(), tags.clone(), Vec::new()));
+        entry.2.push(json!({"timestamp": ts, "value": 1}));
+
+        // If closed, also emit a closed event
+        if state == "closed" {
+            if let Some(ref ca) = closed_at {
+                if let Some(closed_ts) = parse_timestamp(ca) {
+                    let closed_tags = tags.clone();
+                    // state:closed is already in the tags; give the closed event its own key and metric
+                    let closed_key = format!("closed:{}", closed_tags.join(","));
+                    let closed_metric = format!("{metric_name}.closed");
+                    let entry = series_map
+                        .entry(closed_key)
+                        .or_insert_with(|| (closed_metric, closed_tags, Vec::new()));
+                    entry.2.push(json!({"timestamp": closed_ts, "value": 1}));
+                }
+            }
         }
     }

-    let series: Vec<serde_json::Value> = series_map
+    Ok(series_map
         .into_values()
-        .map(|parts| {
+        .map(|(metric, tags, points)| {
             json!({
-                "metric": parts[0],
-                "type": 0, // gauge in v2 API
-                "points": parts[2],
-                "tags": parts[1],
+                "metric": metric,
+                "type": 1, // count
+                "points": points,
+                "tags": tags,
             })
         })
-        .collect();
-
-    Ok(series)
+        .collect())
 }

-fn classify_column(header: &str, metric_table: &str, repo_tag: &str) -> (String, Vec<String>) {
-    let base_tags = vec![repo_tag.to_string()];
-
-    // created_issues / closed_issues / created_pull_requests / closed_pull_requests
-    if header.starts_with("created_") {
-        return (
-            format!("github.{metric_table}.created"),
-            base_tags,
-        );
-    }
-    if header.starts_with("closed_") {
-        return (
-            format!("github.{metric_table}.closed"),
-            base_tags,
-        );
-    }
+fn discussions_to_series(
+    conn: &Connection,
+    repo_tag: &str,
+    since: &str,
+) -> Result<Vec<serde_json::Value>> {
+    let query =
+        "SELECT number, category, created_at, closed, is_answered
+         FROM discussions
+         WHERE created_at >= ?
+         ORDER BY created_at";
+
+    let mut stmt = conn.prepare(query)?;
+    let mut rows = stmt.query(rusqlite::params![since])?;
+
+    let mut series_map: HashMap<String, (String, Vec<String>, Vec<serde_json::Value>)> =
+        HashMap::new();
+
+    while let Some(row) = rows.next()? {
+        let _number: i64 = row.get(0)?;
+        let category: String = row.get(1)?;
+        let created_at: String = row.get(2)?;
+        let closed: bool = row.get(3)?;
+        let is_answered: Option<bool> = row.get(4)?;
+
+        let ts = match parse_timestamp(&created_at) {
+            Some(t) => t,
+            None => continue,
+        };

-    // Issue types: Bug, Feature, Task (capitalized, from issue_type column)
-    if header == "Bug" || header == "Feature" || header == "Task" || header == "Enhancement" {
-        let mut tags = base_tags;
-        tags.push(format!("issue_type:{}", header.to_lowercase()));
-        return (format!("github.{metric_table}.by_type"), tags);
-    }
+        let answered = is_answered.unwrap_or(false);
+        let state = if closed {
+            "closed"
+        } else if answered {
+            "answered"
+        } else {
+            "open"
+        };

-    // type: labels (old style): "type: bug", "type: feature", etc.
- if let Some(type_name) = header.strip_prefix("type: ") { - let mut tags = base_tags; - tags.push(format!("issue_type:{type_name}")); - return (format!("github.{metric_table}.by_type"), tags); + let tags = vec![ + repo_tag.to_string(), + format!("category:{}", category.to_lowercase()), + format!("state:{state}"), + format!("answered:{answered}"), + ]; + + let key = tags.join(","); + let entry = series_map + .entry(key) + .or_insert_with(|| ("github.discussions".to_string(), tags.clone(), Vec::new())); + entry.2.push(json!({"timestamp": ts, "value": 1})); } - // All other labels: emit as github.{table}.by_label with label tag - let mut tags = base_tags; - tags.push(format!("label:{header}")); - (format!("github.{metric_table}.by_label"), tags) + Ok(series_map + .into_values() + .map(|(metric, tags, points)| { + json!({ + "metric": metric, + "type": 1, + "points": points, + "tags": tags, + }) + }) + .collect()) } -fn overall_totals_to_series( - path: &Path, +fn build_label_map( + conn: &Connection, table: &str, - repo_tag: &str, -) -> Result> { - let mut rdr = csv::Reader::from_path(path) - .with_context(|| format!("Failed to read {}", path.display()))?; - let metric_table = table.replace("pull_requests", "prs"); - let tags = vec![repo_tag.to_string()]; - let now = chrono::Utc::now().timestamp(); - - let mut series = Vec::new(); - for result in rdr.records() { - let record = result?; - let open: f64 = record.get(0).unwrap_or("0").parse().unwrap_or(0.0); - let closed: f64 = record.get(1).unwrap_or("0").parse().unwrap_or(0.0); - - series.push(json!({ - "metric": format!("github.{metric_table}.total_open"), - "type": 0, - "points": [{"timestamp": now, "value": open}], - "tags": tags, - })); - series.push(json!({ - "metric": format!("github.{metric_table}.total_closed"), - "type": 0, - "points": [{"timestamp": now, "value": closed}], - "tags": tags, - })); + since: &str, +) -> Result>> { + let query = format!( + "SELECT il.issue_id, l.name + FROM issue_labels il + JOIN labels l ON l.id = il.label_id + JOIN {table} t ON t.id = il.issue_id + WHERE t.created_at >= ?" + ); + let mut stmt = conn.prepare(&query)?; + let mut rows = stmt.query(rusqlite::params![since])?; + let mut map: HashMap> = HashMap::new(); + while let Some(row) = rows.next()? 
{ + let id: i64 = row.get(0)?; + let name: String = row.get(1)?; + map.entry(id).or_default().push(name); } - Ok(series) + Ok(map) } -fn discussion_summary_to_series( - path: &Path, - repo_tag: &str, -) -> Result> { - let mut rdr = csv::Reader::from_path(path) - .with_context(|| format!("Failed to read {}", path.display()))?; - let tags = vec![repo_tag.to_string()]; - - let mut created_points = Vec::new(); - let mut closed_points = Vec::new(); - let mut answered_points = Vec::new(); - - for result in rdr.records() { - let record = result?; - let month = record.get(0).unwrap_or(""); - let ts = match month_to_timestamp(month) { - Some(t) => t, - None => continue, - }; - - let created: f64 = record.get(1).unwrap_or("0").parse().unwrap_or(0.0); - let closed: f64 = record.get(2).unwrap_or("0").parse().unwrap_or(0.0); - let answered: f64 = record.get(3).unwrap_or("0").parse().unwrap_or(0.0); - - if created > 0.0 { - created_points.push(json!({"timestamp": ts, "value": created})); - } - if closed > 0.0 { - closed_points.push(json!({"timestamp": ts, "value": closed})); - } - if answered > 0.0 { - answered_points.push(json!({"timestamp": ts, "value": answered})); - } +fn label_to_tag(label: &str) -> String { + // "domain: vrl" -> "domain:vrl" + // "type: bug" -> "type:bug" + // "good first issue" -> "label:good first issue" + if let Some((prefix, value)) = label.split_once(": ") { + format!("{prefix}:{value}") + } else { + format!("label:{label}") } +} - let mut series = Vec::new(); - if !created_points.is_empty() { - series.push(json!({ - "metric": "github.discussions.created", - "type": 0, - "points": created_points, - "tags": tags, - })); - } - if !closed_points.is_empty() { - series.push(json!({ - "metric": "github.discussions.closed", - "type": 0, - "points": closed_points, - "tags": tags, - })); +fn parse_timestamp(s: &str) -> Option { + chrono::DateTime::parse_from_rfc3339(s) + .ok() + .map(|dt| dt.timestamp()) +} + +fn print_dry_run(all_series: &[serde_json::Value]) { + println!( + "[dry-run] Would push {} metric series to Datadog", + all_series.len() + ); + + let total_points: usize = all_series + .iter() + .filter_map(|s| s["points"].as_array().map(|a| a.len())) + .sum(); + println!(" Total data points: {total_points}"); + + // Group by metric name + let mut by_metric: HashMap = HashMap::new(); + for s in all_series { + let name = s["metric"].as_str().unwrap_or("unknown"); + let points = s["points"].as_array().map(|a| a.len()).unwrap_or(0); + let entry = by_metric.entry(name.to_string()).or_default(); + entry.0 += 1; + entry.1 += points; } - if !answered_points.is_empty() { - series.push(json!({ - "metric": "github.discussions.answered", - "type": 0, - "points": answered_points, - "tags": tags, - })); + let mut metrics: Vec<_> = by_metric.into_iter().collect(); + metrics.sort(); + for (name, (series, points)) in &metrics { + println!(" {name}: {series} series, {points} data points"); } - Ok(series) + // Show sample tags from a few series + println!("\n Sample tags:"); + for s in all_series.iter().take(5) { + let metric = s["metric"].as_str().unwrap_or(""); + if let Some(tags) = s["tags"].as_array() { + let tags_str: Vec<_> = tags.iter().filter_map(|t| t.as_str()).collect(); + let points = s["points"].as_array().map(|a| a.len()).unwrap_or(0); + println!(" {metric} ({points} pts): {}", tags_str.join(", ")); + } + } } diff --git a/src/main.rs b/src/main.rs index a70c5b7..9dee861 100644 --- a/src/main.rs +++ b/src/main.rs @@ -125,7 +125,7 @@ enum Command { #[arg(long)] limit: Option, }, - /// 
Push summary metrics to Datadog
+    /// Push per-item metrics to Datadog (each issue/PR/discussion = data point with all labels as tags)
     PushMetrics {
         #[arg(long, help = "Path to .env file")]
         env_file: Option<String>,
@@ -133,6 +133,8 @@ enum Command {
         dd_api_key: String,
         #[arg(long, env = "DD_SITE", help = "Datadog site (e.g. datadoghq.eu, us5.datadoghq.com)")]
         dd_site: Option<String>,
+        #[arg(long, help = "Only push items created since this date (default: 2026-01-01)")]
+        since: Option<String>,
         #[arg(long, help = "Print metrics summary without sending to Datadog")]
         dry_run: bool,
     },
@@ -209,10 +211,11 @@ fn main() -> Result<()> {
             env_file,
             dd_api_key,
             dd_site,
+            since,
             dry_run,
         } => {
             let config = Config::load(env_file.as_deref())?;
-            push_metrics::run(&config, &dd_api_key, dd_site.as_deref(), dry_run)
+            push_metrics::run(&config, &dd_api_key, dd_site.as_deref(), since.as_deref(), dry_run)
         }
         Command::Compact { dir } => compact::run(&dir),
         Command::FetchAll { env_file, since } => workflows::fetch_all(&env_file, since.as_deref()),

From 6fda1f3d6f37d0ceeee4b257627f42674fe9e55a Mon Sep 17 00:00:00 2001
From: Pavlos Rontidis
Date: Wed, 15 Apr 2026 13:06:19 -0400
Subject: [PATCH 3/3] Read DD_API_KEY and DD_SITE from .env file
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Makes --dd-api-key optional — reads from DD_API_KEY in the .env file
loaded by --env-file, or from the environment. Same for DD_SITE.

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 src/main.rs | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/src/main.rs b/src/main.rs
index 9dee861..df1ac3f 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -129,9 +129,9 @@ enum Command {
     PushMetrics {
         #[arg(long, help = "Path to .env file")]
         env_file: Option<String>,
-        #[arg(long, env = "DD_API_KEY", help = "Datadog API key (or set DD_API_KEY env var)")]
-        dd_api_key: String,
-        #[arg(long, env = "DD_SITE", help = "Datadog site (e.g. datadoghq.eu, us5.datadoghq.com)")]
+        #[arg(long, help = "Datadog API key (or set DD_API_KEY in .env file)")]
+        dd_api_key: Option<String>,
+        #[arg(long, help = "Datadog site (or set DD_SITE in .env file, e.g. datadoghq.eu)")]
         dd_site: Option<String>,
         #[arg(long, help = "Only push items created since this date (default: 2026-01-01)")]
         since: Option<String>,
@@ -215,7 +215,11 @@ fn main() -> Result<()> {
             dry_run,
         } => {
             let config = Config::load(env_file.as_deref())?;
-            push_metrics::run(&config, &dd_api_key, dd_site.as_deref(), since.as_deref(), dry_run)
+            let api_key = dd_api_key
+                .or_else(|| std::env::var("DD_API_KEY").ok())
+                .ok_or_else(|| anyhow::anyhow!("DD_API_KEY not set (use --dd-api-key or add DD_API_KEY to .env file)"))?;
+            let site = dd_site.or_else(|| std::env::var("DD_SITE").ok());
+            push_metrics::run(&config, &api_key, site.as_deref(), since.as_deref(), dry_run)
         }
         Command::Compact { dir } => compact::run(&dir),
         Command::FetchAll { env_file, since } => workflows::fetch_all(&env_file, since.as_deref()),
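
Note (not part of the patches): the sketch below is a minimal illustration of the tagging model introduced in PATCH 2/3. It mirrors the label_to_tag() logic from src/commands/push_metrics.rs and shows the shape of one v2 series entry as items_to_series() builds it; the repo name, timestamp, and label values are placeholders chosen for the example.

// Illustrative sketch only; mirrors label_to_tag() from PATCH 2/3 with placeholder values.
fn label_to_tag(label: &str) -> String {
    // "domain: vrl" -> "domain:vrl"; labels without a ": " prefix get the generic "label:" key.
    match label.split_once(": ") {
        Some((prefix, value)) => format!("{prefix}:{value}"),
        None => format!("label:{label}"),
    }
}

fn main() {
    assert_eq!(label_to_tag("domain: vrl"), "domain:vrl");
    assert_eq!(label_to_tag("source: kafka"), "source:kafka");
    assert_eq!(label_to_tag("good first issue"), "label:good first issue");

    // One series entry shaped the way items_to_series() submits it
    // (tags are sorted before grouping; repo and timestamp are placeholders).
    let example = serde_json::json!({
        "series": [{
            "metric": "github.issues",
            "type": 1, // count
            "points": [{ "timestamp": 1767225600, "value": 1 }], // 2026-01-01T00:00:00Z
            "tags": ["domain:vrl", "repo:vectordotdev/vector", "state:open", "type:bug"]
        }]
    });
    println!("{}", serde_json::to_string_pretty(&example).unwrap());
}

With labels flattened into tags this way, Datadog queries can combine them, along the lines of the github.issues{domain:vrl,type:bug} example in the PATCH 2/3 commit message. Before pushing for real, the command can be exercised with --dry-run (and optionally --since) to preview the series and data point counts it would submit.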