diff --git a/USER_GUIDE.md b/USER_GUIDE.md index 1d7025bf..2bc6cfe1 100644 --- a/USER_GUIDE.md +++ b/USER_GUIDE.md @@ -23,9 +23,10 @@ pg_durable is a PostgreSQL extension that brings durable, fault-tolerant functio 13. [Monitoring](#monitoring) 14. [User Isolation & Privileges](#user-isolation--privileges) 15. [Connection Limits](#connection-limits) -16. [Troubleshooting](#troubleshooting) -17. [Quick Reference Card](#quick-reference-card) -18. [Appendix: Test Data Setup](#appendix-test-data-setup) +16. [Rate Limiting](#rate-limiting) +17. [Troubleshooting](#troubleshooting) +18. [Quick Reference Card](#quick-reference-card) +19. [Appendix: Test Data Setup](#appendix-test-data-setup) --- @@ -1818,6 +1819,87 @@ pg_durable.execution_acquire_timeout = 60 --- +## Rate Limiting + +pg_durable enforces two per-user quotas on `df.start()` to prevent a single database role from exhausting disk space, background-worker pool capacity, or duroxide history storage. Both limits are checked **before** any rows are inserted or any worker capacity is consumed — a rejected `df.start()` has zero side effects. + +Superusers bypass both checks (consistent with PostgreSQL's trust model for superusers). + +### Concurrency Cap — `df.max_concurrent_per_user` + +Controls how many **pending or running** instances a single user may have simultaneously. + +| GUC | Default | Context | Who can change | +|-----|---------|---------|----------------| +| `df.max_concurrent_per_user` | `100` | `PGC_SUSET` | Superusers only | + +`0` means unlimited (not recommended in multi-tenant deployments). + +When the limit is reached, `df.start()` raises: + +``` +ERROR: df.start rejected: user "alice" has 100 active instance(s) (limit 100). + Wait for in-flight instances to complete or ask a superuser to raise + df.max_concurrent_per_user. +``` + +**Tuning guidance:** + +- Raise the limit for trusted users running bulk workflows. +- Lower it (e.g. `10`) in highly multi-tenant environments where fairness matters. 
+ +```sql +-- Superuser overrides the limit for the current session only (use ALTER SYSTEM or postgresql.conf to change it for all sessions) +SET df.max_concurrent_per_user = 50; + +-- Verify current setting +SHOW df.max_concurrent_per_user; +``` + +### Instance Quota — `df.max_instances_per_user` + +Controls the **total number of rows** in `df.instances` for a user, regardless of status. This prevents unbounded history accumulation. + +| GUC | Default | Context | Who can change | +|-----|---------|---------|----------------| +| `df.max_instances_per_user` | `10000` | `PGC_SUSET` | Superusers only | + +`0` means unlimited. + +When the limit is reached, `df.start()` raises: + +``` +ERROR: df.start rejected: user "alice" has 10000 total instance(s) (limit 10000). + Delete old instances (DELETE FROM df.instances WHERE submitted_by = + current_user::regrole AND lower(status) IN ('completed','failed','cancelled')) + or ask a superuser to raise df.max_instances_per_user. +``` + +**Quota reclamation:** Completed, failed, and cancelled instances count against the quota. Delete old rows manually to reclaim quota: + +```sql +-- As a superuser or instance owner — deletes all finished instances for alice +DELETE FROM df.instances + WHERE submitted_by = 'alice'::regrole + AND lower(status) IN ('completed', 'failed', 'cancelled'); +``` + +> **Note:** A `df.purge()` helper function is planned for a future release to automate this cleanup. + +### Setting Limits in `postgresql.conf` + +Both GUCs are `PGC_SUSET` — they can be changed at runtime by superusers (`SET df.max_concurrent_per_user = 200`, which applies only to the current session) or set persistently for all sessions: + +```ini +# postgresql.conf +df.max_concurrent_per_user = 100 # 0 = unlimited +df.max_instances_per_user = 10000 # 0 = unlimited +``` + +> **Security note:** Because these GUCs are `PGC_SUSET`, regular (non-superuser) roles **cannot** raise the limits — `SET df.max_concurrent_per_user = 999999` will be rejected with a permission error. This is intentional. 
+ +--- + ## Troubleshooting ### Extension Exists But Workflows Don't Start diff --git a/src/dsl.rs b/src/dsl.rs index e1be80dd..aa3ce204 100644 --- a/src/dsl.rs +++ b/src/dsl.rs @@ -701,6 +701,60 @@ pub fn start( } } + // Rate limiting: skip checks for superusers (consistent with I-8). + let is_superuser = crate::types::is_role_superuser_oid(current_user_oid).unwrap_or(false); + if !is_superuser { + // Check concurrency cap (pending + running instances for this user). + let max_concurrent = crate::types::get_max_concurrent_per_user(); + if max_concurrent > 0 { + let active_count: i64 = match Spi::get_one_with_args::<i64>( + "SELECT count(*) FROM df.instances \ + WHERE submitted_by = $1::oid::regrole \ + AND lower(status) IN ('pending', 'running')", + &[current_user_oid.into()], + ) { + Ok(Some(n)) => n, + Ok(None) => 0, + Err(e) => pgrx::error!("df.start: failed to count active instances: {}", e), + }; + if active_count >= max_concurrent as i64 { + pgrx::error!( + "df.start rejected: user \"{}\" has {} active instance(s) (limit {}). \ + Wait for in-flight instances to complete or ask a superuser to raise \ + df.max_concurrent_per_user.", + current_user_name, + active_count, + max_concurrent + ); + } + } + + // Check total instance quota (all statuses for this user). + let max_total = crate::types::get_max_instances_per_user(); + if max_total > 0 { + let total_count: i64 = match Spi::get_one_with_args::<i64>( + "SELECT count(*) FROM df.instances WHERE submitted_by = $1::oid::regrole", + &[current_user_oid.into()], + ) { + Ok(Some(n)) => n, + Ok(None) => 0, + Err(e) => pgrx::error!("df.start: failed to count total instances: {}", e), + }; + if total_count >= max_total as i64 { + pgrx::error!( + "df.start rejected: user \"{}\" has {} total instance(s) (limit {}). 
\ + Delete old instances (DELETE FROM df.instances WHERE submitted_by = \ + current_user::regrole AND lower(status) IN \ + ('completed','failed','cancelled')) or ask a superuser to raise \ + df.max_instances_per_user.", + current_user_name, + total_count, + max_total + ); + } + } + } + // Insert all nodes from the nested graph into df.nodes, returning root node ID fn insert_nodes( node: &Durofut, diff --git a/src/lib.rs b/src/lib.rs index 85b2e280..0f1dc27f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,6 +25,12 @@ pub static EXECUTION_ACQUIRE_TIMEOUT: GucSetting = GucSetting::::new(3 /// role is a PostgreSQL superuser. Set to `true` only when superuser durable /// functions are explicitly desired. See docs/superuser_guc.md. pub static ENABLE_SUPERUSER_INSTANCES: GucSetting<bool> = GucSetting::<bool>::new(false); +/// Maximum number of concurrently active (pending + running) instances per user. +/// 0 means unlimited. Only superusers can change this GUC (PGC_SUSET). +pub static MAX_CONCURRENT_PER_USER: GucSetting<i32> = GucSetting::<i32>::new(100); +/// Maximum total instances (all statuses) per user. 0 means unlimited. +/// Only superusers can change this GUC (PGC_SUSET). +pub static MAX_INSTANCES_PER_USER: GucSetting<i32> = GucSetting::<i32>::new(10000); // Module declarations pub mod activities; @@ -132,6 +138,28 @@ pub extern "C-unwind" fn _PG_init() { GucFlags::SUPERUSER_ONLY, ); + GucRegistry::define_int_guc( + c"df.max_concurrent_per_user", + c"Maximum number of concurrently active (pending + running) df.start() instances per user", + c"0 = unlimited. Only superusers can change this setting.", + &MAX_CONCURRENT_PER_USER, + 0, + i32::MAX, + GucContext::Suset, + GucFlags::default(), + ); + + GucRegistry::define_int_guc( + c"df.max_instances_per_user", + c"Maximum total df.instances rows (all statuses) per user", + c"0 = unlimited. Delete old instances to reclaim quota. 
Only superusers can change this setting.", + &MAX_INSTANCES_PER_USER, + 0, + i32::MAX, + GucContext::Suset, + GucFlags::default(), + ); + worker::register_background_worker(); } diff --git a/src/types.rs b/src/types.rs index 43a762db..55c7b9c3 100644 --- a/src/types.rs +++ b/src/types.rs @@ -58,6 +58,16 @@ pub fn superuser_instances_enabled() -> bool { crate::ENABLE_SUPERUSER_INSTANCES.get() } +/// Returns the maximum number of concurrently active instances per user (0 = unlimited). +pub fn get_max_concurrent_per_user() -> i32 { + crate::MAX_CONCURRENT_PER_USER.get() +} + +/// Returns the maximum total instances per user across all statuses (0 = unlimited). +pub fn get_max_instances_per_user() -> i32 { + crate::MAX_INSTANCES_PER_USER.get() +} + /// Returns `true` if the role identified by `role_oid` is a PostgreSQL superuser. /// Runs a SPI query against `pg_catalog.pg_roles`. Must be called from a /// backend context (not the background worker). diff --git a/tests/e2e/sql/22_rate_limit.sql b/tests/e2e/sql/22_rate_limit.sql new file mode 100644 index 00000000..2389bce5 --- /dev/null +++ b/tests/e2e/sql/22_rate_limit.sql @@ -0,0 +1,236 @@ +-- 22_rate_limit.sql +-- Tests: df.max_concurrent_per_user and df.max_instances_per_user GUC enforcement +-- +-- Cases covered: +-- 1. Concurrency cap: 3rd df.start() is rejected when limit = 2. +-- 2. After one instance is cancelled, a new df.start() succeeds. +-- 3. Instance quota: df.start() is rejected when total count reaches limit. +-- 4. Superuser bypasses both limits. +-- +-- Runs as postgres throughout; identity switching is explicit. 
+ +-- ============================================================ +-- Setup +-- ============================================================ +DO $setup$ +BEGIN + PERFORM pg_terminate_backend(pid) + FROM pg_stat_activity + WHERE usename = 'rl_test_user' AND pid <> pg_backend_pid(); + + BEGIN DROP OWNED BY rl_test_user; EXCEPTION WHEN undefined_object THEN NULL; END; + BEGIN DROP ROLE rl_test_user; EXCEPTION WHEN undefined_object THEN NULL; END; +END $setup$; + +CREATE ROLE rl_test_user LOGIN; +SELECT df.grant_usage('rl_test_user'); +GRANT TEMPORARY ON DATABASE postgres TO rl_test_user; + +-- Persistent table so superuser can cancel the long-running instances. +DROP TABLE IF EXISTS _rl_long_instances; +CREATE TABLE _rl_long_instances (instance_id TEXT); +GRANT INSERT ON _rl_long_instances TO rl_test_user; + +-- ============================================================ +-- Test 1: Concurrency cap enforcement +-- ============================================================ + +SET df.max_concurrent_per_user = 2; + +-- Start 2 long-running instances as rl_test_user so they stay active. +SET SESSION AUTHORIZATION rl_test_user; +INSERT INTO _rl_long_instances SELECT df.start(df.sleep(60), 'rl-concurrent-1'); +INSERT INTO _rl_long_instances SELECT df.start(df.sleep(60), 'rl-concurrent-2'); +RESET SESSION AUTHORIZATION; + +-- Third start must be rejected. 
+DO $$ +DECLARE + caught BOOLEAN := false; + msg TEXT; +BEGIN + BEGIN + SET SESSION AUTHORIZATION rl_test_user; + PERFORM df.start(df.sleep(60), 'rl-concurrent-3-should-fail'); + RESET SESSION AUTHORIZATION; + EXCEPTION WHEN OTHERS THEN + caught := true; + msg := SQLERRM; + RESET SESSION AUTHORIZATION; + END; + + IF NOT caught THEN + RAISE EXCEPTION 'TEST 1 FAILED: expected df.start() to be rejected at concurrency limit'; + END IF; + + IF msg NOT LIKE '%max_concurrent_per_user%' THEN + RAISE EXCEPTION 'TEST 1 FAILED: error message does not mention max_concurrent_per_user, got: %', msg; + END IF; + + RAISE NOTICE 'TEST 1 PASSED: concurrency cap enforced (error: %)', msg; +END $$; + +-- ============================================================ +-- Test 2: After one instance is cancelled, a new df.start() succeeds +-- ============================================================ + +-- Cancel one of the running instances (superuser can cancel any instance). +DO $$ +DECLARE + inst_id TEXT; +BEGIN + SELECT instance_id INTO inst_id FROM _rl_long_instances LIMIT 1; + PERFORM df.cancel(inst_id, 'Test 2: freeing slot'); +END $$; + +-- Now rl_test_user has 1 active instance — below the limit of 2. +-- Create temp table as the user so they can INSERT into it. +SET SESSION AUTHORIZATION rl_test_user; +CREATE TEMP TABLE _rl_t2 (instance_id TEXT); +INSERT INTO _rl_t2 SELECT df.start('SELECT 1', 'rl-after-cancel'); +RESET SESSION AUTHORIZATION; + +DO $$ +DECLARE + inst_id TEXT; + status TEXT; +BEGIN + SELECT instance_id INTO inst_id FROM _rl_t2; + SELECT df.wait_for_completion(inst_id, 30) INTO status; + IF lower(status) != 'completed' THEN + RAISE EXCEPTION 'TEST 2 FAILED: expected completed after slot freed, got %', status; + END IF; + RAISE NOTICE 'TEST 2 PASSED: new df.start() succeeded after a slot was freed'; +END $$; + +DROP TABLE _rl_t2; + +-- Cancel the remaining long-running instance before Test 3. 
+DO $$ +DECLARE + inst_id TEXT; +BEGIN + FOR inst_id IN SELECT instance_id FROM _rl_long_instances LOOP + BEGIN + PERFORM df.cancel(inst_id, 'Test cleanup'); + EXCEPTION WHEN OTHERS THEN NULL; + END; + END LOOP; +END $$; + +-- ============================================================ +-- Test 3: Instance total quota enforcement +-- ============================================================ + +-- Lift concurrency limit so it doesn't interfere. +SET df.max_concurrent_per_user = 1000; + +-- Set total quota to current count + 1 so one more succeeds, then blocks. +DO $$ +DECLARE + current_count BIGINT; +BEGIN + SELECT COUNT(*) INTO current_count + FROM df.instances + WHERE submitted_by = 'rl_test_user'::regrole; + + -- Use explicit bigint-to-text cast to avoid any injection risk. + EXECUTE 'SET df.max_instances_per_user = ' || (current_count + 1)::bigint::text; +END $$; + +-- This start should succeed (fills the quota exactly). +SET SESSION AUTHORIZATION rl_test_user; +CREATE TEMP TABLE _rl_t3a (instance_id TEXT); +INSERT INTO _rl_t3a SELECT df.start('SELECT 1', 'rl-quota-fill'); +RESET SESSION AUTHORIZATION; + +DO $$ +DECLARE + inst_id TEXT; + status TEXT; +BEGIN + SELECT instance_id INTO inst_id FROM _rl_t3a; + SELECT df.wait_for_completion(inst_id, 30) INTO status; + IF lower(status) != 'completed' THEN + RAISE EXCEPTION 'TEST 3a FAILED: expected completed, got %', status; + END IF; + RAISE NOTICE 'TEST 3a PASSED: last allowed instance completed'; +END $$; + +DROP TABLE _rl_t3a; + +-- This start must be rejected (quota exceeded). 
+DO $$ +DECLARE + caught BOOLEAN := false; + msg TEXT; +BEGIN + BEGIN + SET SESSION AUTHORIZATION rl_test_user; + PERFORM df.start('SELECT 1', 'rl-quota-exceed'); + RESET SESSION AUTHORIZATION; + EXCEPTION WHEN OTHERS THEN + caught := true; + msg := SQLERRM; + RESET SESSION AUTHORIZATION; + END; + + IF NOT caught THEN + RAISE EXCEPTION 'TEST 3b FAILED: expected df.start() to be rejected at quota limit'; + END IF; + + IF msg NOT LIKE '%max_instances_per_user%' THEN + RAISE EXCEPTION 'TEST 3b FAILED: error message does not mention max_instances_per_user, got: %', msg; + END IF; + + RAISE NOTICE 'TEST 3b PASSED: instance quota enforced (error: %)', msg; +END $$; + +-- ============================================================ +-- Test 4: Superuser bypasses both limits +-- ============================================================ + +-- Keep limits very tight. +SET df.max_concurrent_per_user = 1; +SET df.max_instances_per_user = 1; + +-- Superuser (postgres) should not be blocked. +CREATE TEMP TABLE _rl_t4 (instance_id TEXT); +INSERT INTO _rl_t4 SELECT df.start('SELECT 1', 'rl-superuser-bypass'); + +DO $$ +DECLARE + inst_id TEXT; + status TEXT; +BEGIN + SELECT instance_id INTO inst_id FROM _rl_t4; + SELECT df.wait_for_completion(inst_id, 30) INTO status; + IF lower(status) != 'completed' THEN + RAISE EXCEPTION 'TEST 4 FAILED: superuser df.start() was blocked, got status %', status; + END IF; + RAISE NOTICE 'TEST 4 PASSED: superuser bypasses rate limits'; +END $$; + +DROP TABLE _rl_t4; + +-- ============================================================ +-- Cleanup +-- ============================================================ + +-- Restore defaults. 
+SET df.max_concurrent_per_user = 100; +SET df.max_instances_per_user = 10000; + +DROP TABLE IF EXISTS _rl_long_instances; + +DO $teardown$ +BEGIN + PERFORM pg_terminate_backend(pid) + FROM pg_stat_activity + WHERE usename = 'rl_test_user' AND pid <> pg_backend_pid(); + + BEGIN DROP OWNED BY rl_test_user; EXCEPTION WHEN undefined_object THEN NULL; END; + BEGIN DROP ROLE rl_test_user; EXCEPTION WHEN undefined_object THEN NULL; END; +END $teardown$; + +SELECT 'TEST PASSED' AS result;