Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ Pre-1.0 note: while `pg_durable` is in major version `0`, minor releases may inc

- Behavior change (bug fix): `df.join` / `df.join3` results are now a proper JSON array of objects instead of an array of double-encoded JSON strings. Consumers that previously unescaped each element (e.g. `(elem #>> '{}')::jsonb`) must now read the element directly (#143)

- Monitoring: add `idx_instances_created_at_desc_id` on `df.instances(created_at DESC, id)` to support efficient chronological listing; extend `df.list_instances` to return `created_at` and `completed_at`; add `df.list_instances_paginated(status_filter, limit_count, after_cursor)` with keyset cursor pagination and `total_count` / `next_cursor`.

## v0.2.1 (Released)

- Dependency: upgrade duroxide `0.1.26→0.1.28` and duroxide-pg-opt `v0.1.23→v0.1.26`; adds cached-plan retryability, instance stats API, and error propagation fixes; switches TLS backend to `native-tls`, removing the `ring` crate entirely (#116)
Expand Down
11 changes: 10 additions & 1 deletion USER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -1391,9 +1391,17 @@ SELECT * FROM df.list_instances('Failed');

-- With limit
SELECT * FROM df.list_instances(NULL, 10);

-- Cursor-based pagination (first page)
SELECT * FROM df.list_instances_paginated(NULL, 20, NULL);

-- Cursor-based pagination (next page)
SELECT * FROM df.list_instances_paginated(NULL, 20, '2026-01-01 12:00:00+00:00|a1b2c3d4');
```

**Columns:** `instance_id`, `label`, `function_name`, `status`, `execution_count`, `output`
**df.list_instances columns:** `instance_id`, `label`, `function_name`, `status`, `execution_count`, `output`, `created_at`, `completed_at`

**df.list_instances_paginated columns:** `instance_id`, `label`, `function_name`, `status`, `execution_count`, `output`, `created_at`, `completed_at`, `total_count`, `next_cursor`

### Instance Details

Expand Down Expand Up @@ -1626,6 +1634,7 @@ GRANT EXECUTE ON FUNCTION df.cancel(text, text) TO app_role;
GRANT EXECUTE ON FUNCTION df.wait_for_completion(text, integer) TO app_role;
GRANT EXECUTE ON FUNCTION df.run(text) TO app_role;
GRANT EXECUTE ON FUNCTION df.list_instances(text, integer) TO app_role;
GRANT EXECUTE ON FUNCTION df.list_instances_paginated(text, integer, text) TO app_role;
GRANT EXECUTE ON FUNCTION df.instance_info(text) TO app_role;
GRANT EXECUTE ON FUNCTION df.instance_nodes(text, integer) TO app_role;
GRANT EXECUTE ON FUNCTION df.instance_executions(text, integer) TO app_role;
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ COMMENT ON COLUMN df.instances.submitted_by IS

-- Index for finding pending instances
CREATE INDEX IF NOT EXISTS idx_instances_status ON df.instances(status);
CREATE INDEX IF NOT EXISTS idx_instances_created_at_desc_id ON df.instances(created_at DESC, id);

-- Index for finding nodes by instance
CREATE INDEX IF NOT EXISTS idx_nodes_instance ON df.nodes(instance_id);
Expand Down Expand Up @@ -392,6 +393,7 @@ DECLARE
'df.wait_for_completion(text, integer)',
'df.run(text)',
'df.list_instances(text, integer)',
'df.list_instances_paginated(text, integer, text)',
'df.instance_info(text)',
'df.instance_nodes(text, integer)',
'df.instance_executions(text, integer)',
Expand Down
234 changes: 225 additions & 9 deletions src/monitoring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,28 @@ use pgrx::prelude::*;

use crate::types::{new_backend_provider, postgres_connection_string};

const MAX_LIST_INSTANCES_LIMIT: i32 = 10000;

fn validate_list_instances_limit(limit_count: i32) {
if limit_count < 1 {
pgrx::error!("limit_count must be at least 1");
}
if limit_count > MAX_LIST_INSTANCES_LIMIT {
pgrx::error!("limit_count must be at most {}", MAX_LIST_INSTANCES_LIMIT);
}
}

fn parse_list_instances_cursor(after_cursor: Option<&str>) -> Option<(String, String)> {
let cursor = after_cursor?;
let (created_at, id) = cursor.rsplit_once('|').unwrap_or_else(|| {
pgrx::error!("after_cursor must be in the format 'YYYY-MM-DD HH:MM:SS+00:00|instance_id'")
});
if created_at.is_empty() || id.is_empty() {
pgrx::error!("after_cursor must be in the format 'YYYY-MM-DD HH:MM:SS+00:00|instance_id'");
}
Some((created_at.to_string(), id.to_string()))
}

// ============================================================================
// Monitoring Functions
// ============================================================================
Expand All @@ -25,12 +47,11 @@ pub fn list_instances(
name!(status, String),
name!(execution_count, i64),
name!(output, Option<String>),
name!(created_at, pgrx::datum::TimestampWithTimeZone),
name!(completed_at, Option<pgrx::datum::TimestampWithTimeZone>),
),
> {
if limit_count < 1 {
pgrx::error!("limit_count must be at least 1");
}
let limit_count = limit_count.min(10000);
validate_list_instances_limit(limit_count);

let pg_conn_str = postgres_connection_string();

Expand All @@ -39,17 +60,23 @@ pub fn list_instances(
// df.list_instances(), df.instance_info()) share the same authoritative source
// for the status column, eliminating the vocabulary mismatch between
// df.instances.status ('cancelled') and duroxide executions.status ('Failed').
let user_instances: Vec<(String, Option<String>, String)> = Spi::connect(|client| {
let user_instances: Vec<(
String,
Option<String>,
String,
pgrx::datum::TimestampWithTimeZone,
Option<pgrx::datum::TimestampWithTimeZone>,
)> = Spi::connect(|client| {
use pgrx::datum::DatumWithOid;

let (sql, args): (&str, Vec<DatumWithOid>) = if let Some(status) = status_filter {
(
"SELECT id, label, status FROM df.instances WHERE status = $1 ORDER BY created_at DESC LIMIT $2",
"SELECT id, label, status, created_at, completed_at FROM df.instances WHERE status = $1 ORDER BY created_at DESC, id DESC LIMIT $2",
vec![status.into(), (limit_count as i64).into()],
)
} else {
(
"SELECT id, label, status FROM df.instances ORDER BY created_at DESC LIMIT $1",
"SELECT id, label, status, created_at, completed_at FROM df.instances ORDER BY created_at DESC, id DESC LIMIT $1",
vec![(limit_count as i64).into()],
)
};
Expand All @@ -59,7 +86,13 @@ pub fn list_instances(
if let Ok(Some(id)) = row.get::<String>(1) {
let label: Option<String> = row.get(2).ok().flatten();
let status: String = row.get(3).ok().flatten().unwrap_or_default();
instances.push((id, label, status));
let created_at: Option<pgrx::datum::TimestampWithTimeZone> =
row.get(4).ok().flatten();
let completed_at: Option<pgrx::datum::TimestampWithTimeZone> =
row.get(5).ok().flatten();
if let Some(created_at) = created_at {
instances.push((id, label, status, created_at, completed_at));
}
}
}
}
Expand Down Expand Up @@ -90,7 +123,186 @@ pub fn list_instances(
// Only query duroxide for function_name, execution_count, and output.
// Status is read from df.instances (already fetched above) to ensure all
// monitoring APIs agree on the status value.
for (id, label, df_status) in &user_instances {
for (id, label, df_status, created_at, completed_at) in &user_instances {
if let Ok(info) = client.get_instance_info(id).await {
rows.push((
info.instance_id,
label.clone(),
info.orchestration_name,
df_status.clone(),
info.current_execution_id as i64,
info.output,
*created_at,
*completed_at,
));
}
}
rows
});

TableIterator::new(results)
}

/// List durable function instances with keyset pagination.
#[pg_extern(schema = "df")]
pub fn list_instances_paginated(
status_filter: default!(Option<&str>, "NULL"),
limit_count: default!(i32, "100"),
after_cursor: default!(Option<&str>, "NULL"),
) -> TableIterator<
'static,
(
name!(instance_id, String),
name!(label, Option<String>),
name!(function_name, String),
name!(status, String),
name!(execution_count, i64),
name!(output, Option<String>),
name!(created_at, pgrx::datum::TimestampWithTimeZone),
name!(completed_at, Option<pgrx::datum::TimestampWithTimeZone>),
name!(total_count, i64),
name!(next_cursor, Option<String>),
),
> {
validate_list_instances_limit(limit_count);
let cursor = parse_list_instances_cursor(after_cursor);
let fetch_limit_plus_one = (limit_count as i64) + 1;
let pg_conn_str = postgres_connection_string();

let (total_count, mut user_instances): (
i64,
Vec<(
String,
Option<String>,
String,
pgrx::datum::TimestampWithTimeZone,
Option<pgrx::datum::TimestampWithTimeZone>,
String,
)>,
) = Spi::connect(|client| {
use pgrx::datum::DatumWithOid;

let (count_sql, count_args): (&str, Vec<DatumWithOid>) = if let Some(status) = status_filter
{
(
"SELECT COUNT(*) FROM df.instances WHERE status = $1",
vec![status.into()],
)
} else {
("SELECT COUNT(*) FROM df.instances", vec![])
};

let total_count = client
.select(count_sql, Some(1), &count_args)
.ok()
.and_then(|table| {
table
.into_iter()
.next()
.and_then(|row| row.get::<i64>(1).ok().flatten())
})
.unwrap_or(0);

let (sql, args): (&str, Vec<DatumWithOid>) = match (status_filter, cursor.as_ref()) {
(Some(status), Some((cursor_created_at, cursor_id))) => (
"SELECT id, label, status, created_at, completed_at, created_at::text || '|' || id AS next_cursor \
FROM df.instances \
WHERE status = $1 \
AND (created_at < $2::timestamptz OR (created_at = $2::timestamptz AND id < $3)) \
ORDER BY created_at DESC, id DESC \
LIMIT $4",
vec![
status.into(),
cursor_created_at.as_str().into(),
cursor_id.as_str().into(),
fetch_limit_plus_one.into(),
],
),
(Some(status), None) => (
"SELECT id, label, status, created_at, completed_at, created_at::text || '|' || id AS next_cursor \
FROM df.instances \
WHERE status = $1 \
ORDER BY created_at DESC, id DESC \
LIMIT $2",
vec![status.into(), fetch_limit_plus_one.into()],
),
(None, Some((cursor_created_at, cursor_id))) => (
"SELECT id, label, status, created_at, completed_at, created_at::text || '|' || id AS next_cursor \
FROM df.instances \
WHERE (created_at < $1::timestamptz OR (created_at = $1::timestamptz AND id < $2)) \
ORDER BY created_at DESC, id DESC \
LIMIT $3",
vec![
cursor_created_at.as_str().into(),
cursor_id.as_str().into(),
fetch_limit_plus_one.into(),
],
),
(None, None) => (
"SELECT id, label, status, created_at, completed_at, created_at::text || '|' || id AS next_cursor \
FROM df.instances \
ORDER BY created_at DESC, id DESC \
LIMIT $1",
vec![fetch_limit_plus_one.into()],
),
};

let mut instances = Vec::new();
if let Ok(table) = client.select(sql, None, &args) {
for row in table {
if let Ok(Some(id)) = row.get::<String>(1) {
let label: Option<String> = row.get(2).ok().flatten();
let status: String = row.get(3).ok().flatten().unwrap_or_default();
let created_at: Option<pgrx::datum::TimestampWithTimeZone> =
row.get(4).ok().flatten();
let completed_at: Option<pgrx::datum::TimestampWithTimeZone> =
row.get(5).ok().flatten();
let next_cursor: String = row.get(6).ok().flatten().unwrap_or_default();
if let Some(created_at) = created_at {
instances.push((id, label, status, created_at, completed_at, next_cursor));
}
}
}
}
(total_count, instances)
});

if user_instances.is_empty() {
return TableIterator::new(vec![]);
}

// Fetching one extra row (limit + 1) lets us detect whether another page exists.
let has_more = user_instances.len() > limit_count as usize;
if has_more {
user_instances.pop();
}
let next_cursor = if has_more {
// After removing the lookahead row, last() is the final visible row.
user_instances
.last()
.map(|(_, _, _, _, _, cursor)| cursor.clone())
} else {
None
};

let rt = match tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
{
Ok(rt) => rt,
Err(_) => return TableIterator::new(vec![]),
};

let results = rt.block_on(async {
let store = match new_backend_provider(&pg_conn_str).await {
Ok(s) => s,
Err(_) => return vec![],
};

let client = Client::new(store);

let mut rows = Vec::new();
for (id, label, df_status, created_at, completed_at, _) in &user_instances {
if let Ok(info) = client.get_instance_info(id).await {
rows.push((
info.instance_id,
Expand All @@ -99,6 +311,10 @@ pub fn list_instances(
df_status.clone(),
info.current_execution_id as i64,
info.output,
*created_at,
*completed_at,
total_count,
next_cursor.clone(),
));
}
}
Expand Down
Loading