From 001bcb866062f61351233dc70fe3f8d3bca1dcba Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 27 May 2026 14:02:51 +0000 Subject: [PATCH 1/2] Initial plan From cffae03afb61b370762edfa6fe8cdb5a4e9b674a Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 27 May 2026 14:22:54 +0000 Subject: [PATCH 2/2] Add indexed paginated instance listing APIs Co-authored-by: pinodeca <32303022+pinodeca@users.noreply.github.com> --- CHANGELOG.md | 2 + USER_GUIDE.md | 11 ++- src/lib.rs | 2 + src/monitoring.rs | 234 ++++++++++++++++++++++++++++++++++++++++++++-- 4 files changed, 239 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cdbef3ec..0bf43d24 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,8 @@ Pre-1.0 note: while `pg_durable` is in major version `0`, minor releases may inc - Behavior change (bug fix): `df.join` / `df.join3` results are now a proper JSON array of objects instead of an array of double-encoded JSON strings. Consumers that previously unescaped each element (e.g. `(elem #>> '{}')::jsonb`) must now read the element directly (#143) +- Monitoring: add `idx_instances_created_at_desc_id` on `df.instances(created_at DESC, id)` to support efficient chronological listing; extend `df.list_instances` to return `created_at` and `completed_at`; add `df.list_instances_paginated(status_filter, limit_count, after_cursor)` with keyset cursor pagination and `total_count` / `next_cursor`. + ## v0.2.1 (Released) - Dependency: upgrade duroxide `0.1.26→0.1.28` and duroxide-pg-opt `v0.1.23→v0.1.26`; adds cached-plan retryability, instance stats API, and error propagation fixes; switches TLS backend to `native-tls`, removing the `ring` crate entirely (#116) diff --git a/USER_GUIDE.md b/USER_GUIDE.md index 45941e98..033dc1cf 100644 --- a/USER_GUIDE.md +++ b/USER_GUIDE.md @@ -1391,9 +1391,17 @@ SELECT * FROM df.list_instances('Failed'); -- With limit SELECT * FROM df.list_instances(NULL, 10); + +-- Cursor-based pagination (first page) +SELECT * FROM df.list_instances_paginated(NULL, 20, NULL); + +-- Cursor-based pagination (next page) +SELECT * FROM df.list_instances_paginated(NULL, 20, '2026-01-01 12:00:00+00:00|a1b2c3d4'); ``` -**Columns:** `instance_id`, `label`, `function_name`, `status`, `execution_count`, `output` +**df.list_instances columns:** `instance_id`, `label`, `function_name`, `status`, `execution_count`, `output`, `created_at`, `completed_at` + +**df.list_instances_paginated columns:** `instance_id`, `label`, `function_name`, `status`, `execution_count`, `output`, `created_at`, `completed_at`, `total_count`, `next_cursor` ### Instance Details @@ -1626,6 +1634,7 @@ GRANT EXECUTE ON FUNCTION df.cancel(text, text) TO app_role; GRANT EXECUTE ON FUNCTION df.wait_for_completion(text, integer) TO app_role; GRANT EXECUTE ON FUNCTION df.run(text) TO app_role; GRANT EXECUTE ON FUNCTION df.list_instances(text, integer) TO app_role; +GRANT EXECUTE ON FUNCTION df.list_instances_paginated(text, integer, text) TO app_role; GRANT EXECUTE ON FUNCTION df.instance_info(text) TO app_role; GRANT EXECUTE ON FUNCTION df.instance_nodes(text, integer) TO app_role; GRANT EXECUTE ON FUNCTION df.instance_executions(text, integer) TO app_role; diff --git a/src/lib.rs b/src/lib.rs index be84c0d4..fc45272d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -188,6 +188,7 @@ COMMENT ON COLUMN df.instances.submitted_by IS -- Index for finding pending instances CREATE INDEX IF NOT EXISTS idx_instances_status ON df.instances(status); +CREATE INDEX IF NOT EXISTS idx_instances_created_at_desc_id ON df.instances(created_at DESC, id); -- Index for finding nodes by instance CREATE INDEX IF NOT EXISTS idx_nodes_instance ON df.nodes(instance_id); @@ -392,6 +393,7 @@ DECLARE 'df.wait_for_completion(text, integer)', 'df.run(text)', 'df.list_instances(text, integer)', + 'df.list_instances_paginated(text, integer, text)', 'df.instance_info(text)', 'df.instance_nodes(text, integer)', 'df.instance_executions(text, integer)', diff --git a/src/monitoring.rs b/src/monitoring.rs index fabd7599..6bfebd8e 100644 --- a/src/monitoring.rs +++ b/src/monitoring.rs @@ -7,6 +7,28 @@ use pgrx::prelude::*; use crate::types::{new_backend_provider, postgres_connection_string}; +const MAX_LIST_INSTANCES_LIMIT: i32 = 10000; + +fn validate_list_instances_limit(limit_count: i32) { + if limit_count < 1 { + pgrx::error!("limit_count must be at least 1"); + } + if limit_count > MAX_LIST_INSTANCES_LIMIT { + pgrx::error!("limit_count must be at most {}", MAX_LIST_INSTANCES_LIMIT); + } +} + +fn parse_list_instances_cursor(after_cursor: Option<&str>) -> Option<(String, String)> { + let cursor = after_cursor?; + let (created_at, id) = cursor.rsplit_once('|').unwrap_or_else(|| { + pgrx::error!("after_cursor must be in the format 'YYYY-MM-DD HH:MM:SS+00:00|instance_id'") + }); + if created_at.is_empty() || id.is_empty() { + pgrx::error!("after_cursor must be in the format 'YYYY-MM-DD HH:MM:SS+00:00|instance_id'"); + } + Some((created_at.to_string(), id.to_string())) +} + // ============================================================================ // Monitoring Functions // ============================================================================ @@ -25,12 +47,11 @@ pub fn list_instances( name!(status, String), name!(execution_count, i64), name!(output, Option), + name!(created_at, pgrx::datum::TimestampWithTimeZone), + name!(completed_at, Option), ), > { - if limit_count < 1 { - pgrx::error!("limit_count must be at least 1"); - } - let limit_count = limit_count.min(10000); + validate_list_instances_limit(limit_count); let pg_conn_str = postgres_connection_string(); @@ -39,17 +60,23 @@ pub fn list_instances( // df.list_instances(), df.instance_info()) share the same authoritative source // for the status column, eliminating the vocabulary mismatch between // df.instances.status ('cancelled') and duroxide executions.status ('Failed'). - let user_instances: Vec<(String, Option, String)> = Spi::connect(|client| { + let user_instances: Vec<( + String, + Option, + String, + pgrx::datum::TimestampWithTimeZone, + Option, + )> = Spi::connect(|client| { use pgrx::datum::DatumWithOid; let (sql, args): (&str, Vec) = if let Some(status) = status_filter { ( - "SELECT id, label, status FROM df.instances WHERE status = $1 ORDER BY created_at DESC LIMIT $2", + "SELECT id, label, status, created_at, completed_at FROM df.instances WHERE status = $1 ORDER BY created_at DESC, id DESC LIMIT $2", vec![status.into(), (limit_count as i64).into()], ) } else { ( - "SELECT id, label, status FROM df.instances ORDER BY created_at DESC LIMIT $1", + "SELECT id, label, status, created_at, completed_at FROM df.instances ORDER BY created_at DESC, id DESC LIMIT $1", vec![(limit_count as i64).into()], ) }; @@ -59,7 +86,13 @@ pub fn list_instances( if let Ok(Some(id)) = row.get::(1) { let label: Option = row.get(2).ok().flatten(); let status: String = row.get(3).ok().flatten().unwrap_or_default(); - instances.push((id, label, status)); + let created_at: Option = + row.get(4).ok().flatten(); + let completed_at: Option = + row.get(5).ok().flatten(); + if let Some(created_at) = created_at { + instances.push((id, label, status, created_at, completed_at)); + } } } } @@ -90,7 +123,186 @@ pub fn list_instances( // Only query duroxide for function_name, execution_count, and output. // Status is read from df.instances (already fetched above) to ensure all // monitoring APIs agree on the status value. - for (id, label, df_status) in &user_instances { + for (id, label, df_status, created_at, completed_at) in &user_instances { + if let Ok(info) = client.get_instance_info(id).await { + rows.push(( + info.instance_id, + label.clone(), + info.orchestration_name, + df_status.clone(), + info.current_execution_id as i64, + info.output, + *created_at, + *completed_at, + )); + } + } + rows + }); + + TableIterator::new(results) +} + +/// List durable function instances with keyset pagination. +#[pg_extern(schema = "df")] +pub fn list_instances_paginated( + status_filter: default!(Option<&str>, "NULL"), + limit_count: default!(i32, "100"), + after_cursor: default!(Option<&str>, "NULL"), +) -> TableIterator< + 'static, + ( + name!(instance_id, String), + name!(label, Option), + name!(function_name, String), + name!(status, String), + name!(execution_count, i64), + name!(output, Option), + name!(created_at, pgrx::datum::TimestampWithTimeZone), + name!(completed_at, Option), + name!(total_count, i64), + name!(next_cursor, Option), + ), +> { + validate_list_instances_limit(limit_count); + let cursor = parse_list_instances_cursor(after_cursor); + let fetch_limit_plus_one = (limit_count as i64) + 1; + let pg_conn_str = postgres_connection_string(); + + let (total_count, mut user_instances): ( + i64, + Vec<( + String, + Option, + String, + pgrx::datum::TimestampWithTimeZone, + Option, + String, + )>, + ) = Spi::connect(|client| { + use pgrx::datum::DatumWithOid; + + let (count_sql, count_args): (&str, Vec) = if let Some(status) = status_filter + { + ( + "SELECT COUNT(*) FROM df.instances WHERE status = $1", + vec![status.into()], + ) + } else { + ("SELECT COUNT(*) FROM df.instances", vec![]) + }; + + let total_count = client + .select(count_sql, Some(1), &count_args) + .ok() + .and_then(|table| { + table + .into_iter() + .next() + .and_then(|row| row.get::(1).ok().flatten()) + }) + .unwrap_or(0); + + let (sql, args): (&str, Vec) = match (status_filter, cursor.as_ref()) { + (Some(status), Some((cursor_created_at, cursor_id))) => ( + "SELECT id, label, status, created_at, completed_at, created_at::text || '|' || id AS next_cursor \ + FROM df.instances \ + WHERE status = $1 \ + AND (created_at < $2::timestamptz OR (created_at = $2::timestamptz AND id < $3)) \ + ORDER BY created_at DESC, id DESC \ + LIMIT $4", + vec![ + status.into(), + cursor_created_at.as_str().into(), + cursor_id.as_str().into(), + fetch_limit_plus_one.into(), + ], + ), + (Some(status), None) => ( + "SELECT id, label, status, created_at, completed_at, created_at::text || '|' || id AS next_cursor \ + FROM df.instances \ + WHERE status = $1 \ + ORDER BY created_at DESC, id DESC \ + LIMIT $2", + vec![status.into(), fetch_limit_plus_one.into()], + ), + (None, Some((cursor_created_at, cursor_id))) => ( + "SELECT id, label, status, created_at, completed_at, created_at::text || '|' || id AS next_cursor \ + FROM df.instances \ + WHERE (created_at < $1::timestamptz OR (created_at = $1::timestamptz AND id < $2)) \ + ORDER BY created_at DESC, id DESC \ + LIMIT $3", + vec![ + cursor_created_at.as_str().into(), + cursor_id.as_str().into(), + fetch_limit_plus_one.into(), + ], + ), + (None, None) => ( + "SELECT id, label, status, created_at, completed_at, created_at::text || '|' || id AS next_cursor \ + FROM df.instances \ + ORDER BY created_at DESC, id DESC \ + LIMIT $1", + vec![fetch_limit_plus_one.into()], + ), + }; + + let mut instances = Vec::new(); + if let Ok(table) = client.select(sql, None, &args) { + for row in table { + if let Ok(Some(id)) = row.get::(1) { + let label: Option = row.get(2).ok().flatten(); + let status: String = row.get(3).ok().flatten().unwrap_or_default(); + let created_at: Option = + row.get(4).ok().flatten(); + let completed_at: Option = + row.get(5).ok().flatten(); + let next_cursor: String = row.get(6).ok().flatten().unwrap_or_default(); + if let Some(created_at) = created_at { + instances.push((id, label, status, created_at, completed_at, next_cursor)); + } + } + } + } + (total_count, instances) + }); + + if user_instances.is_empty() { + return TableIterator::new(vec![]); + } + + // Fetching one extra row (limit + 1) lets us detect whether another page exists. + let has_more = user_instances.len() > limit_count as usize; + if has_more { + user_instances.pop(); + } + let next_cursor = if has_more { + // After removing the lookahead row, last() is the final visible row. + user_instances + .last() + .map(|(_, _, _, _, _, cursor)| cursor.clone()) + } else { + None + }; + + let rt = match tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + { + Ok(rt) => rt, + Err(_) => return TableIterator::new(vec![]), + }; + + let results = rt.block_on(async { + let store = match new_backend_provider(&pg_conn_str).await { + Ok(s) => s, + Err(_) => return vec![], + }; + + let client = Client::new(store); + + let mut rows = Vec::new(); + for (id, label, df_status, created_at, completed_at, _) in &user_instances { if let Ok(info) = client.get_instance_info(id).await { rows.push(( info.instance_id, @@ -99,6 +311,10 @@ pub fn list_instances( df_status.clone(), info.current_execution_id as i64, info.output, + *created_at, + *completed_at, + total_count, + next_cursor.clone(), )); } }