Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ path = "src/lib.rs"

[dependencies]
# CLI
clap = { version = "4", features = ["derive"] }
clap = { version = "4", features = ["derive", "env"] }

# Error handling
anyhow = "1"
Expand Down
1 change: 1 addition & 0 deletions src/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ pub mod fetch_issues;
pub mod fetch_labels;
pub mod generate_summaries;
pub mod purge;
pub mod push_metrics;
pub mod remove_legacy_label;
pub mod workflows;
331 changes: 331 additions & 0 deletions src/commands/push_metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,331 @@
use crate::config::Config;
use anyhow::{Context, Result};
use reqwest::blocking::Client;
use rusqlite::Connection;
use serde_json::json;
use std::collections::HashMap;

const DD_API_URL: &str = "https://api.datadoghq.com/api/v2/series";
const BATCH_SIZE: usize = 200;

pub fn run(
config: &Config,
dd_api_key: &str,
dd_site: Option<&str>,
since: Option<&str>,
dry_run: bool,
) -> Result<()> {
let client = Client::new();
let api_url = match dd_site {
Some(site) => format!("https://api.{site}/api/v2/series"),
None => DD_API_URL.to_string(),
};

let db_path = format!("out/db/{}_{}.db", config.repo_owner, config.repo_name);
let conn = Connection::open(&db_path)
.with_context(|| format!("Failed to open database: {db_path}"))?;

let repo_tag = format!("repo:{}/{}", config.repo_owner, config.repo_name);
let since_filter = since.unwrap_or("2026-01-01");

println!("Reading from {db_path} (created_at >= {since_filter})...");

let mut all_series = Vec::new();

// Issues
let issue_series = items_to_series(&conn, "issues", &repo_tag, since_filter)?;
all_series.extend(issue_series);

// Pull requests
let pr_series = items_to_series(&conn, "pull_requests", &repo_tag, since_filter)?;
all_series.extend(pr_series);

// Discussions
let disc_series = discussions_to_series(&conn, &repo_tag, since_filter)?;
all_series.extend(disc_series);

if all_series.is_empty() {
println!("No metrics to push.");
return Ok(());
}

if dry_run {
print_dry_run(&all_series);
return Ok(());
}

println!("Pushing {} metric series to Datadog...", all_series.len());
for (i, chunk) in all_series.chunks(BATCH_SIZE).enumerate() {
let payload = json!({ "series": chunk });
let response = client
.post(&api_url)
.header("DD-API-KEY", dd_api_key)
.header("Content-Type", "application/json")
.json(&payload)
.send()
.context("Failed to send metrics to Datadog")?;

if response.status().is_success() {
println!(" Batch {}: submitted {} series", i + 1, chunk.len());
} else {
let status = response.status();
let body = response.text().unwrap_or_default();
eprintln!(" Batch {} failed ({status}): {body}", i + 1);
}
}

println!("Done.");
Ok(())
}

/// Build per-item metric series for issues or pull_requests.
/// Each item becomes a data point with value=1 at its created_at timestamp,
/// tagged with all its labels, state, and issue_type.
///
/// Series are grouped by their full sorted tag set, so items sharing the
/// same tags become multiple points on one series. Items whose state is
/// "closed" additionally emit a point on "<metric>.closed" at closed_at.
fn items_to_series(
    conn: &Connection,
    table: &str,
    repo_tag: &str,
    since: &str,
) -> Result<Vec<serde_json::Value>> {
    // `table` is an internal constant ("issues" / "pull_requests"), never
    // user input, so interpolating it into SQL below is safe.
    let metric_name = if table == "pull_requests" {
        "github.prs"
    } else {
        "github.issues"
    };

    // Build a map of item_id -> list of label names
    let label_map = build_label_map(conn, table, since)?;

    // Query items. Only the issues table has an issue_type column, and
    // draft PRs are excluded from the metrics entirely.
    let issue_type_col = if table == "issues" { ", issue_type" } else { "" };
    let extra_filter = if table == "pull_requests" {
        " AND is_draft = 0"
    } else {
        ""
    };
    let query = format!(
        "SELECT id, number, state, created_at, closed_at{issue_type_col}
         FROM {table}
         WHERE created_at >= ?{extra_filter}
         ORDER BY created_at"
    );

    let mut stmt = conn.prepare(&query)?;
    let mut rows = stmt.query(rusqlite::params![since])?;

    // Accumulate: group by unique tag set -> (metric name, tags, points)
    let mut series_map: HashMap<String, (String, Vec<String>, Vec<serde_json::Value>)> =
        HashMap::new();

    while let Some(row) = rows.next()? {
        let id: i64 = row.get(0)?;
        let _number: i64 = row.get(1)?;
        let state: String = row.get(2)?;
        let created_at: String = row.get(3)?;
        let closed_at: Option<String> = row.get(4)?;
        let issue_type: Option<String> = if table == "issues" {
            row.get(5)?
        } else {
            None
        };

        // Rows with an unparseable created_at are silently skipped.
        let ts = match parse_timestamp(&created_at) {
            Some(t) => t,
            None => continue,
        };

        // NOTE: tags reflect the item's *current* state, so the creation
        // point of a now-closed item is tagged state:closed.
        let mut tags = vec![repo_tag.to_string(), format!("state:{state}")];

        if let Some(ref it) = issue_type {
            tags.push(format!("issue_type:{}", it.to_lowercase()));
        }

        // Add all labels as tags
        if let Some(labels) = label_map.get(&id) {
            for label in labels {
                tags.push(label_to_tag(label));
            }
        }

        // Sort tags for consistent grouping
        tags.sort();
        let key = tags.join(",");

        let entry = series_map
            .entry(key)
            .or_insert_with(|| (metric_name.to_string(), tags.clone(), Vec::new()));
        entry.2.push(json!({"timestamp": ts, "value": 1}));

        // If closed, also emit a closed event
        if state == "closed" {
            if let Some(ref ca) = closed_at {
                if let Some(closed_ts) = parse_timestamp(ca) {
                    let closed_tags = tags.clone();
                    // Same tag set as the creation series; the "closed:" key
                    // prefix keeps it from colliding with that series' entry.
                    let closed_key = format!("closed:{}", closed_tags.join(","));
                    let closed_metric = format!("{metric_name}.closed");
                    let entry = series_map
                        .entry(closed_key)
                        .or_insert_with(|| (closed_metric, closed_tags, Vec::new()));
                    entry.2.push(json!({"timestamp": closed_ts, "value": 1}));
                }
            }
        }
    }

    Ok(series_map
        .into_values()
        .map(|(metric, tags, points)| {
            json!({
                "metric": metric,
                "type": 1, // count
                "points": points,
                "tags": tags,
            })
        })
        .collect())
}

/// Convert discussion rows into Datadog count series, one series per
/// unique (category, state, answered) tag combination, with one point per
/// discussion at its creation time.
fn discussions_to_series(
    conn: &Connection,
    repo_tag: &str,
    since: &str,
) -> Result<Vec<serde_json::Value>> {
    let query =
        "SELECT number, category, created_at, closed, is_answered
         FROM discussions
         WHERE created_at >= ?
         ORDER BY created_at";

    let mut stmt = conn.prepare(query)?;
    let mut rows = stmt.query(rusqlite::params![since])?;

    // joined tag list -> (metric name, tags, data points)
    let mut grouped: HashMap<String, (String, Vec<String>, Vec<serde_json::Value>)> =
        HashMap::new();

    while let Some(row) = rows.next()? {
        let _number: i64 = row.get(0)?;
        let category: String = row.get(1)?;
        let created_at: String = row.get(2)?;
        let closed: bool = row.get(3)?;
        let is_answered: Option<bool> = row.get(4)?;

        // Skip rows whose created_at cannot be parsed.
        let Some(ts) = parse_timestamp(&created_at) else {
            continue;
        };

        let answered = is_answered.unwrap_or(false);
        // Closed wins over answered; otherwise answered wins over open.
        let state = match (closed, answered) {
            (true, _) => "closed",
            (false, true) => "answered",
            (false, false) => "open",
        };

        let tags = vec![
            repo_tag.to_string(),
            format!("category:{}", category.to_lowercase()),
            format!("state:{state}"),
            format!("answered:{answered}"),
        ];

        let bucket = grouped
            .entry(tags.join(","))
            .or_insert_with(|| ("github.discussions".to_string(), tags.clone(), Vec::new()));
        bucket.2.push(json!({"timestamp": ts, "value": 1}));
    }

    let series = grouped
        .into_values()
        .map(|(metric, tags, points)| {
            json!({
                "metric": metric,
                "type": 1, // Datadog metric type 1 = count
                "points": points,
                "tags": tags,
            })
        })
        .collect();
    Ok(series)
}

/// Map each item id in `table` (created on/after `since`) to the list of
/// label names attached to it, via the issue_labels join table.
fn build_label_map(
    conn: &Connection,
    table: &str,
    since: &str,
) -> Result<HashMap<i64, Vec<String>>> {
    let query = format!(
        "SELECT il.issue_id, l.name
         FROM issue_labels il
         JOIN labels l ON l.id = il.label_id
         JOIN {table} t ON t.id = il.issue_id
         WHERE t.created_at >= ?"
    );

    let mut stmt = conn.prepare(&query)?;
    let mut rows = stmt.query(rusqlite::params![since])?;

    let mut labels_by_id: HashMap<i64, Vec<String>> = HashMap::new();
    while let Some(row) = rows.next()? {
        let item_id: i64 = row.get(0)?;
        let label_name: String = row.get(1)?;
        labels_by_id.entry(item_id).or_default().push(label_name);
    }
    Ok(labels_by_id)
}

/// Turn a GitHub label into a Datadog tag.
///
/// Labels of the form "prefix: value" collapse into "prefix:value"
/// ("domain: vrl" -> "domain:vrl"); anything else is namespaced under
/// "label:" ("good first issue" -> "label:good first issue").
fn label_to_tag(label: &str) -> String {
    match label.split_once(": ") {
        Some((namespace, rest)) => format!("{namespace}:{rest}"),
        None => format!("label:{label}"),
    }
}

/// Parse an RFC 3339 timestamp (e.g. "2024-01-02T03:04:05Z") into Unix
/// seconds; returns `None` if the string does not parse.
fn parse_timestamp(s: &str) -> Option<i64> {
    let parsed = chrono::DateTime::parse_from_rfc3339(s).ok()?;
    Some(parsed.timestamp())
}

fn print_dry_run(all_series: &[serde_json::Value]) {
println!(
"[dry-run] Would push {} metric series to Datadog",
all_series.len()
);

let total_points: usize = all_series
.iter()
.filter_map(|s| s["points"].as_array().map(|a| a.len()))
.sum();
println!(" Total data points: {total_points}");

// Group by metric name
let mut by_metric: HashMap<String, (usize, usize)> = HashMap::new();
for s in all_series {
let name = s["metric"].as_str().unwrap_or("unknown");
let points = s["points"].as_array().map(|a| a.len()).unwrap_or(0);
let entry = by_metric.entry(name.to_string()).or_default();
entry.0 += 1;
entry.1 += points;
}
let mut metrics: Vec<_> = by_metric.into_iter().collect();
metrics.sort();
for (name, (series, points)) in &metrics {
println!(" {name}: {series} series, {points} data points");
}

// Show sample tags from a few series
println!("\n Sample tags:");
for s in all_series.iter().take(5) {
let metric = s["metric"].as_str().unwrap_or("");
if let Some(tags) = s["tags"].as_array() {
let tags_str: Vec<_> = tags.iter().filter_map(|t| t.as_str()).collect();
let points = s["points"].as_array().map(|a| a.len()).unwrap_or(0);
println!(" {metric} ({points} pts): {}", tags_str.join(", "));
}
}
}
29 changes: 28 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use clap::{Parser, Subcommand};
use github_tools::{
commands::{
build_db, close_old_prs, compact, delete_stale_branches, fetch_discussions, fetch_issues,
fetch_labels, generate_summaries, purge, remove_legacy_label, workflows,
fetch_labels, generate_summaries, purge, push_metrics, remove_legacy_label, workflows,
},
config::Config,
};
Expand Down Expand Up @@ -125,6 +125,19 @@ enum Command {
#[arg(long)]
limit: Option<usize>,
},
/// Push per-item metrics to Datadog (each issue/PR/discussion = data point with all labels as tags)
PushMetrics {
#[arg(long, help = "Path to .env file")]
env_file: Option<String>,
#[arg(long, help = "Datadog API key (or set DD_API_KEY in .env file)")]
dd_api_key: Option<String>,
#[arg(long, help = "Datadog site (or set DD_SITE in .env file, e.g. datadoghq.eu)")]
dd_site: Option<String>,
#[arg(long, help = "Only push items created since this date (default: 2026-01-01)")]
since: Option<String>,
#[arg(long, help = "Print metrics summary without sending to Datadog")]
dry_run: bool,
},
/// Deduplicate JSON year files in a data directory
Compact {
#[arg(help = "Path to directory containing year JSON files (e.g. data/vectordotdev_vector/issues)")]
Expand Down Expand Up @@ -194,6 +207,20 @@ fn main() -> Result<()> {
let config = Config::load(env_file.as_deref())?;
build_db::run(&input, &config)
}
Command::PushMetrics {
env_file,
dd_api_key,
dd_site,
since,
dry_run,
} => {
let config = Config::load(env_file.as_deref())?;
let api_key = dd_api_key
.or_else(|| std::env::var("DD_API_KEY").ok())
.ok_or_else(|| anyhow::anyhow!("DD_API_KEY not set (use --dd-api-key or add DD_API_KEY to .env file)"))?;
let site = dd_site.or_else(|| std::env::var("DD_SITE").ok());
push_metrics::run(&config, &api_key, site.as_deref(), since.as_deref(), dry_run)
}
Command::Compact { dir } => compact::run(&dir),
Command::FetchAll { env_file, since } => workflows::fetch_all(&env_file, since.as_deref()),
Command::GenerateAll {
Expand Down