Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 85 additions & 3 deletions USER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ pg_durable is a PostgreSQL extension that brings durable, fault-tolerant functio
13. [Monitoring](#monitoring)
14. [User Isolation & Privileges](#user-isolation--privileges)
15. [Connection Limits](#connection-limits)
16. [Troubleshooting](#troubleshooting)
17. [Quick Reference Card](#quick-reference-card)
18. [Appendix: Test Data Setup](#appendix-test-data-setup)
16. [Rate Limiting](#rate-limiting)
17. [Troubleshooting](#troubleshooting)
18. [Quick Reference Card](#quick-reference-card)
19. [Appendix: Test Data Setup](#appendix-test-data-setup)

---

Expand Down Expand Up @@ -1818,6 +1819,87 @@ pg_durable.execution_acquire_timeout = 60

---

## Rate Limiting

pg_durable enforces two per-user quotas on `df.start()` to prevent a single database role from exhausting disk space, background-worker pool capacity, or duroxide history storage. Both limits are checked **before** any rows are inserted or any worker capacity is consumed — a rejected `df.start()` has zero side effects.

Superusers bypass both checks (consistent with PostgreSQL's trust model for superusers).

### Concurrency Cap — `df.max_concurrent_per_user`

Controls how many **pending or running** instances a single user may have simultaneously.

| GUC | Default | Context | Who can change |
|-----|---------|---------|----------------|
| `df.max_concurrent_per_user` | `100` | `PGC_SUSET` | Superusers only |

`0` means unlimited (not recommended in multi-tenant deployments).

When the limit is reached, `df.start()` raises:

```
ERROR: df.start rejected: user "alice" has 100 active instance(s) (limit 100).
Wait for in-flight instances to complete or ask a superuser to raise
df.max_concurrent_per_user.
```

**Tuning guidance:**

- Raise the limit for trusted users running bulk workflows.
- Lower it (e.g. `10`) in highly multi-tenant environments where fairness matters.

```sql
-- Superuser sets limit for all users
SET df.max_concurrent_per_user = 50;

-- Verify current setting
SHOW df.max_concurrent_per_user;
```

### Instance Quota — `df.max_instances_per_user`

Controls the **total number of rows** in `df.instances` for a user, regardless of status. This prevents unbounded history accumulation.

| GUC | Default | Context | Who can change |
|-----|---------|---------|----------------|
| `df.max_instances_per_user` | `10000` | `PGC_SUSET` | Superusers only |

`0` means unlimited.

When the limit is reached, `df.start()` raises:

```
ERROR: df.start rejected: user "alice" has 10000 total instance(s) (limit 10000).
Delete old instances (DELETE FROM df.instances WHERE submitted_by =
current_user::regrole AND lower(status) IN ('completed','failed','cancelled'))
or ask a superuser to raise df.max_instances_per_user.
```

**Quota reclamation:** Completed, failed, and cancelled instances count against the quota. Delete old rows manually to reclaim quota:

```sql
-- As a superuser or instance owner — deletes all finished instances for alice
DELETE FROM df.instances
WHERE submitted_by = 'alice'::regrole
AND lower(status) IN ('completed', 'failed', 'cancelled');
```

> **Note:** A `df.purge()` helper function is planned for a future release to automate this cleanup.

### Setting Limits in `postgresql.conf`

Both GUCs are `PGC_SUSET` — they can be changed at runtime by superusers (`SET df.max_concurrent_per_user = 200`) or set persistently:

```ini
# postgresql.conf
df.max_concurrent_per_user = 100 # 0 = unlimited
df.max_instances_per_user = 10000 # 0 = unlimited
```

> **Security note:** Because these GUCs are `PGC_SUSET`, regular (non-superuser) roles **cannot** raise the limits — `SET df.max_concurrent_per_user = 999999` will be rejected with a permission error. This is intentional.

---

## Troubleshooting

### Extension Exists But Workflows Don't Start
Expand Down
54 changes: 54 additions & 0 deletions src/dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,60 @@ pub fn start(
}
}

// Rate limiting: skip checks for superusers (consistent with I-8).
let is_superuser = crate::types::is_role_superuser_oid(current_user_oid).unwrap_or(false);
if !is_superuser {
// Check concurrency cap (pending + running instances for this user).
let max_concurrent = crate::types::get_max_concurrent_per_user();
if max_concurrent > 0 {
let active_count: i64 = match Spi::get_one_with_args::<i64>(
"SELECT count(*) FROM df.instances \
WHERE submitted_by = $1::oid::regrole \
AND lower(status) IN ('pending', 'running')",
&[current_user_oid.into()],
) {
Ok(Some(n)) => n,
Ok(None) => 0,
Err(e) => pgrx::error!("df.start: failed to count active instances: {}", e),
};
if active_count >= max_concurrent as i64 {
pgrx::error!(
"df.start rejected: user \"{}\" has {} active instance(s) (limit {}). \
Wait for in-flight instances to complete or ask a superuser to raise \
df.max_concurrent_per_user.",
current_user_name,
active_count,
max_concurrent
);
}
}

// Check total instance quota (all statuses for this user).
let max_total = crate::types::get_max_instances_per_user();
if max_total > 0 {
let total_count: i64 = match Spi::get_one_with_args::<i64>(
"SELECT count(*) FROM df.instances WHERE submitted_by = $1::oid::regrole",
&[current_user_oid.into()],
) {
Ok(Some(n)) => n,
Ok(None) => 0,
Err(e) => pgrx::error!("df.start: failed to count total instances: {}", e),
};
if total_count >= max_total as i64 {
pgrx::error!(
"df.start rejected: user \"{}\" has {} total instance(s) (limit {}). \
Delete old instances (DELETE FROM df.instances WHERE submitted_by = \
current_user::regrole AND lower(status) IN \
('completed','failed','cancelled')) or ask a superuser to raise \
df.max_instances_per_user.",
current_user_name,
total_count,
max_total
);
}
}
}

// Insert all nodes from the nested graph into df.nodes, returning root node ID
fn insert_nodes(
node: &Durofut,
Expand Down
28 changes: 28 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ pub static EXECUTION_ACQUIRE_TIMEOUT: GucSetting<i32> = GucSetting::<i32>::new(3
/// role is a PostgreSQL superuser. Set to `true` only when superuser durable
/// functions are explicitly desired. See docs/superuser_guc.md.
pub static ENABLE_SUPERUSER_INSTANCES: GucSetting<bool> = GucSetting::<bool>::new(false);
/// Maximum number of concurrently active (pending + running) instances per user.
/// 0 means unlimited. Only superusers can change this GUC (PGC_SUSET).
pub static MAX_CONCURRENT_PER_USER: GucSetting<i32> = GucSetting::<i32>::new(100);
/// Maximum total instances (all statuses) per user. 0 means unlimited.
/// Only superusers can change this GUC (PGC_SUSET).
pub static MAX_INSTANCES_PER_USER: GucSetting<i32> = GucSetting::<i32>::new(10000);

// Module declarations
pub mod activities;
Expand Down Expand Up @@ -132,6 +138,28 @@ pub extern "C-unwind" fn _PG_init() {
GucFlags::SUPERUSER_ONLY,
);

GucRegistry::define_int_guc(
c"df.max_concurrent_per_user",
c"Maximum number of concurrently active (pending + running) df.start() instances per user",
c"0 = unlimited. Only superusers can change this setting.",
&MAX_CONCURRENT_PER_USER,
0,
i32::MAX,
GucContext::Suset,
GucFlags::default(),
);

GucRegistry::define_int_guc(
c"df.max_instances_per_user",
c"Maximum total df.instances rows (all statuses) per user",
c"0 = unlimited. Delete old instances to reclaim quota. Only superusers can change this setting.",
&MAX_INSTANCES_PER_USER,
0,
i32::MAX,
GucContext::Suset,
GucFlags::default(),
);

worker::register_background_worker();
}

Expand Down
10 changes: 10 additions & 0 deletions src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,16 @@ pub fn superuser_instances_enabled() -> bool {
crate::ENABLE_SUPERUSER_INSTANCES.get()
}

/// Returns the maximum number of concurrently active instances per user (0 = unlimited).
pub fn get_max_concurrent_per_user() -> i32 {
crate::MAX_CONCURRENT_PER_USER.get()
}

/// Returns the maximum total instances per user across all statuses (0 = unlimited).
pub fn get_max_instances_per_user() -> i32 {
crate::MAX_INSTANCES_PER_USER.get()
}

/// Returns `true` if the role identified by `role_oid` is a PostgreSQL superuser.
/// Runs a SPI query against `pg_catalog.pg_roles`. Must be called from a
/// backend context (not the background worker).
Expand Down
Loading
Loading