diff --git a/src/activities/compute_cron_wait.rs b/src/activities/compute_cron_wait.rs new file mode 100644 index 00000000..531312f9 --- /dev/null +++ b/src/activities/compute_cron_wait.rs @@ -0,0 +1,62 @@ +//! ComputeCronWait activity - computes remaining seconds until a target timestamp +//! +//! This activity is called by the WAIT_SCHEDULE orchestration node with a +//! pre-captured target timestamp (the next cron tick). It computes +//! `max(0, target - now())` at actual execution time, which correctly accounts +//! for any delay between `df.start()` and when the background worker processes +//! the WAIT_SCHEDULE node. +//! +//! Using an activity for this I/O (reading the wall clock) keeps the +//! orchestration itself deterministic, as required by duroxide replay safety. + +use chrono::{DateTime, Utc}; +use duroxide::ActivityContext; +use serde::{Deserialize, Serialize}; + +/// Activity name for registration and scheduling +pub const NAME: &str = "pg_durable::activity::compute-cron-wait"; + +/// Input for the compute_cron_wait activity +#[derive(Debug, Serialize, Deserialize)] +pub struct ComputeCronWaitInput { + /// RFC 3339 timestamp of the next cron tick, captured at DSL time + pub target_timestamp: String, + /// Original cron expression, used only for tracing + pub cron_expr: String, +} + +/// Output for the compute_cron_wait activity +#[derive(Debug, Serialize, Deserialize)] +pub struct ComputeCronWaitOutput { + /// Seconds remaining until the target timestamp (clamped to 0 if in the past) + pub wait_seconds: u64, +} + +/// Compute the number of seconds to wait until the target timestamp. +/// +/// The target was captured at DSL time; the actual remaining duration is +/// measured here, at activity-execution time, so the timer is always correct +/// even if there was a delay between `df.start()` and when the worker ran. 
+pub async fn execute(ctx: ActivityContext, input_json: String) -> Result { + let input: ComputeCronWaitInput = serde_json::from_str(&input_json) + .map_err(|e| format!("Invalid compute_cron_wait input: {e}"))?; + + let target: DateTime = input + .target_timestamp + .parse() + .map_err(|e| format!("Invalid target_timestamp '{}' (expected RFC 3339): {e}", input.target_timestamp))?; + + let now = Utc::now(); + let wait_seconds = (target - now).num_seconds().max(0) as u64; + + ctx.trace_info(format!( + "compute_cron_wait: cron='{}', target={}, now={}, wait={}s", + input.cron_expr, + input.target_timestamp, + now.to_rfc3339(), + wait_seconds + )); + + let output = ComputeCronWaitOutput { wait_seconds }; + serde_json::to_string(&output).map_err(|e| format!("Failed to serialize output: {e}")) +} diff --git a/src/activities/mod.rs b/src/activities/mod.rs index ae1a7db0..b845a8e7 100644 --- a/src/activities/mod.rs +++ b/src/activities/mod.rs @@ -3,6 +3,7 @@ //! Each activity is in its own file with a co-located NAME constant. //! This enables IDE navigation (F12 jumps to implementation). +pub mod compute_cron_wait; pub mod execute_http; pub mod execute_sql; pub mod load_function_graph; diff --git a/src/dsl.rs b/src/dsl.rs index c8011fa3..e7615e00 100644 --- a/src/dsl.rs +++ b/src/dsl.rs @@ -257,8 +257,12 @@ pub fn sleep(seconds: i64) -> String { } /// Creates a wait-for-schedule node that waits until the next cron match. -/// The wait duration is computed at DSL time (when this function is called) -/// to ensure deterministic replay in the orchestration. +/// The cron expression is validated eagerly at DSL time to fail fast on +/// invalid expressions, and the target timestamp of the next cron tick is +/// captured now so the orchestration remains deterministic. 
The actual +/// remaining wait is computed at execution time inside the `compute_cron_wait` +/// activity, which correctly accounts for any delay between `df.start()` and +/// when the background worker processes this node. #[pg_extern(schema = "df")] pub fn wait_for_schedule(cron_expr: &str) -> String { let cron_with_seconds = format!("0 {cron_expr}"); @@ -267,19 +271,22 @@ pub fn wait_for_schedule(cron_expr: &str) -> String { Err(e) => pgrx::error!("Invalid cron expression '{}': {}", cron_expr, e), }; - // Compute wait duration NOW (at DSL time) for deterministic orchestration replay - let now = Utc::now(); + // Compute the target timestamp at DSL time so it is stable across replays. + // The actual wait duration is computed at execution time by the + // compute_cron_wait activity to avoid waking too late when there is a + // delay between df.start() and when the background worker runs this node. let next = match schedule.upcoming(Utc).next() { Some(t) => t, None => pgrx::error!("No upcoming schedule found for '{}'", cron_expr), }; - let duration_secs = (next - now).num_seconds().max(0) as u64; + // RFC 3339 format is stored so the compute_cron_wait activity can parse it + // back reliably with str::parse(), which accepts RFC 3339 timestamps. 
+ let target_timestamp = next.to_rfc3339(); - // Store pre-computed seconds, not the cron expression let config = serde_json::json!({ "cron_expr": cron_expr, - "wait_seconds": duration_secs + "target_timestamp": target_timestamp }); Durofut { diff --git a/src/explain.rs b/src/explain.rs index ad974cb6..21042181 100644 --- a/src/explain.rs +++ b/src/explain.rs @@ -643,18 +643,18 @@ fn format_node_display(node: &ExplainNode) -> String { format!("SLEEP {seconds}s{name_suffix}") } "WAIT_SCHEDULE" => { - // Parse config to get cron expression and wait seconds - let (cron, secs) = node + // Parse config to get cron expression and target timestamp + let (cron, target) = node .query .as_ref() .and_then(|q| serde_json::from_str::(q).ok()) .map(|cfg| { let c = cfg["cron_expr"].as_str().unwrap_or("?").to_string(); - let s = cfg["wait_seconds"].as_u64().unwrap_or(0); - (c, s) + let t = cfg["target_timestamp"].as_str().unwrap_or("?").to_string(); + (c, t) }) - .unwrap_or_else(|| ("?".to_string(), 0)); - format!("WAIT '{cron}' ({secs}s){name_suffix}") + .unwrap_or_else(|| ("?".to_string(), "?".to_string())); + format!("WAIT '{cron}' (until {target}){name_suffix}") } "HTTP" => { // Parse config to get method and URL diff --git a/src/lib.rs b/src/lib.rs index be84c0d4..f65f8342 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -972,6 +972,22 @@ mod tests { let json = crate::dsl::wait_for_schedule("*/5 * * * *"); let fut = Durofut::from_json(&json); assert_eq!(fut.node_type, "WAIT_SCHEDULE"); + // Config must contain target_timestamp (RFC 3339), not a pre-computed wait_seconds + let config: serde_json::Value = + serde_json::from_str(fut.query.as_ref().expect("query must be set")).unwrap(); + assert!( + config.get("target_timestamp").is_some(), + "config should have target_timestamp" + ); + assert!( + config.get("wait_seconds").is_none(), + "config must not have pre-computed wait_seconds" + ); + assert_eq!( + config["cron_expr"].as_str().unwrap(), + "*/5 * * * *", + "cron_expr should be 
preserved" + ); } #[pg_test] diff --git a/src/orchestrations/execute_function_graph.rs b/src/orchestrations/execute_function_graph.rs index 67183d67..8cd7f3de 100644 --- a/src/orchestrations/execute_function_graph.rs +++ b/src/orchestrations/execute_function_graph.rs @@ -412,16 +412,37 @@ async fn execute_wait_schedule_node( .as_ref() .ok_or_else(|| format!("WAIT_SCHEDULE node {node_id} has no config"))?; - // Parse pre-computed config from DSL time let config: serde_json::Value = serde_json::from_str(config_str) .map_err(|e| format!("Invalid WAIT_SCHEDULE config: {e}"))?; - let wait_seconds = config["wait_seconds"] - .as_u64() - .ok_or_else(|| "WAIT_SCHEDULE missing wait_seconds".to_string())?; + let target_timestamp = config["target_timestamp"] + .as_str() + .ok_or_else(|| "WAIT_SCHEDULE missing target_timestamp".to_string())?; let cron_expr = config["cron_expr"].as_str().unwrap_or("?"); + // Delegate wall-clock I/O to an activity so the orchestration stays + // deterministic. The activity computes max(0, target - now()) at actual + // execution time, correctly accounting for any delay since df.start(). 
+ let activity_input = serde_json::json!({ + "target_timestamp": target_timestamp, + "cron_expr": cron_expr, + }); + + let result_json = ctx + .schedule_activity( + activities::compute_cron_wait::NAME, + activity_input.to_string(), + ) + .await?; + + let result: serde_json::Value = serde_json::from_str(&result_json) + .map_err(|e| format!("Invalid compute_cron_wait output: {e}"))?; + + let wait_seconds = result["wait_seconds"] + .as_u64() + .ok_or_else(|| "compute_cron_wait output missing wait_seconds".to_string())?; + ctx.trace_info(format!( "Waiting {wait_seconds} seconds until schedule: {cron_expr}" )); diff --git a/src/registry.rs b/src/registry.rs index bb0a57ee..04ee28eb 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -38,6 +38,9 @@ pub fn create_activity_registry(pool: Arc, semaphore: Arc) -> let pool = http_pool.clone(); async move { activities::execute_http::execute(ctx, pool, config_json).await } }) + .register(activities::compute_cron_wait::NAME, |ctx: ActivityContext, input_json: String| { + async move { activities::compute_cron_wait::execute(ctx, input_json).await } + }) .build() }