Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions src/activities/compute_cron_wait.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
//! ComputeCronWait activity - computes remaining seconds until a target timestamp
//!
//! This activity is called by the WAIT_SCHEDULE orchestration node with a
//! pre-captured target timestamp (the next cron tick). It computes
//! `max(0, target - now())` at actual execution time, which correctly accounts
//! for any delay between `df.start()` and when the background worker processes
//! the WAIT_SCHEDULE node.
//!
//! Using an activity for this I/O (reading the wall clock) keeps the
//! orchestration itself deterministic, as required by duroxide replay safety.

use chrono::{DateTime, Utc};
use duroxide::ActivityContext;
use serde::{Deserialize, Serialize};

/// Activity name for registration and scheduling
pub const NAME: &str = "pg_durable::activity::compute-cron-wait";

/// Input for the compute_cron_wait activity
#[derive(Debug, Serialize, Deserialize)]
pub struct ComputeCronWaitInput {
/// RFC 3339 timestamp of the next cron tick, captured at DSL time
pub target_timestamp: String,
/// Original cron expression, used only for tracing
pub cron_expr: String,
}

/// Output for the compute_cron_wait activity
#[derive(Debug, Serialize, Deserialize)]
pub struct ComputeCronWaitOutput {
/// Seconds remaining until the target timestamp (clamped to 0 if in the past)
pub wait_seconds: u64,
}

/// Compute the number of seconds to wait until the target timestamp.
///
/// The target was captured at DSL time; the actual remaining duration is
/// measured here, at activity-execution time, so the timer is always correct
/// even if there was a delay between `df.start()` and when the worker ran.
pub async fn execute(ctx: ActivityContext, input_json: String) -> Result<String, String> {
let input: ComputeCronWaitInput = serde_json::from_str(&input_json)
.map_err(|e| format!("Invalid compute_cron_wait input: {e}"))?;

let target: DateTime<Utc> = input
.target_timestamp
.parse()
.map_err(|e| format!("Invalid target_timestamp '{}' (expected RFC 3339): {e}", input.target_timestamp))?;

let now = Utc::now();
let wait_seconds = (target - now).num_seconds().max(0) as u64;

ctx.trace_info(format!(
"compute_cron_wait: cron='{}', target={}, now={}, wait={}s",
input.cron_expr,
input.target_timestamp,
now.to_rfc3339(),
wait_seconds
));

let output = ComputeCronWaitOutput { wait_seconds };
serde_json::to_string(&output).map_err(|e| format!("Failed to serialize output: {e}"))
}
1 change: 1 addition & 0 deletions src/activities/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//! Each activity is in its own file with a co-located NAME constant.
//! This enables IDE navigation (F12 jumps to implementation).

pub mod compute_cron_wait;
pub mod execute_http;
pub mod execute_sql;
pub mod load_function_graph;
Expand Down
21 changes: 14 additions & 7 deletions src/dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,12 @@ pub fn sleep(seconds: i64) -> String {
}

/// Creates a wait-for-schedule node that waits until the next cron match.
/// The wait duration is computed at DSL time (when this function is called)
/// to ensure deterministic replay in the orchestration.
/// The cron expression is validated eagerly at DSL time to fail fast on
/// invalid expressions, and the target timestamp of the next cron tick is
/// captured now so the orchestration remains deterministic. The actual
/// remaining wait is computed at execution time inside the `compute_cron_wait`
/// activity, which correctly accounts for any delay between `df.start()` and
/// when the background worker processes this node.
#[pg_extern(schema = "df")]
pub fn wait_for_schedule(cron_expr: &str) -> String {
let cron_with_seconds = format!("0 {cron_expr}");
Expand All @@ -267,19 +271,22 @@ pub fn wait_for_schedule(cron_expr: &str) -> String {
Err(e) => pgrx::error!("Invalid cron expression '{}': {}", cron_expr, e),
};

// Compute wait duration NOW (at DSL time) for deterministic orchestration replay
let now = Utc::now();
// Compute the target timestamp at DSL time so it is stable across replays.
// The actual wait duration is computed at execution time by the
// compute_cron_wait activity to avoid waking too early when there is a
// delay between df.start() and when the background worker runs this node.
let next = match schedule.upcoming(Utc).next() {
Some(t) => t,
None => pgrx::error!("No upcoming schedule found for '{}'", cron_expr),
};

let duration_secs = (next - now).num_seconds().max(0) as u64;
// RFC 3339 format is stored so the compute_cron_wait activity can parse it
// back reliably with chrono::DateTime::parse_from_rfc3339().
let target_timestamp = next.to_rfc3339();

// Store pre-computed seconds, not the cron expression
let config = serde_json::json!({
"cron_expr": cron_expr,
"wait_seconds": duration_secs
"target_timestamp": target_timestamp
});

Durofut {
Expand Down
12 changes: 6 additions & 6 deletions src/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -643,18 +643,18 @@ fn format_node_display(node: &ExplainNode) -> String {
format!("SLEEP {seconds}s{name_suffix}")
}
"WAIT_SCHEDULE" => {
// Parse config to get cron expression and wait seconds
let (cron, secs) = node
// Parse config to get cron expression and target timestamp
let (cron, target) = node
.query
.as_ref()
.and_then(|q| serde_json::from_str::<serde_json::Value>(q).ok())
.map(|cfg| {
let c = cfg["cron_expr"].as_str().unwrap_or("?").to_string();
let s = cfg["wait_seconds"].as_u64().unwrap_or(0);
(c, s)
let t = cfg["target_timestamp"].as_str().unwrap_or("?").to_string();
(c, t)
})
.unwrap_or_else(|| ("?".to_string(), 0));
format!("WAIT '{cron}' ({secs}s){name_suffix}")
.unwrap_or_else(|| ("?".to_string(), "?".to_string()));
format!("WAIT '{cron}' (until {target}){name_suffix}")
}
"HTTP" => {
// Parse config to get method and URL
Expand Down
16 changes: 16 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -972,6 +972,22 @@ mod tests {
let json = crate::dsl::wait_for_schedule("*/5 * * * *");
let fut = Durofut::from_json(&json);
assert_eq!(fut.node_type, "WAIT_SCHEDULE");
// Config must contain target_timestamp (RFC 3339), not a pre-computed wait_seconds
let config: serde_json::Value =
serde_json::from_str(fut.query.as_ref().expect("query must be set")).unwrap();
assert!(
config.get("target_timestamp").is_some(),
"config should have target_timestamp"
);
assert!(
config.get("wait_seconds").is_none(),
"config must not have pre-computed wait_seconds"
);
assert_eq!(
config["cron_expr"].as_str().unwrap(),
"*/5 * * * *",
"cron_expr should be preserved"
);
}

#[pg_test]
Expand Down
29 changes: 25 additions & 4 deletions src/orchestrations/execute_function_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,16 +412,37 @@ async fn execute_wait_schedule_node(
.as_ref()
.ok_or_else(|| format!("WAIT_SCHEDULE node {node_id} has no config"))?;

// Parse pre-computed config from DSL time
let config: serde_json::Value = serde_json::from_str(config_str)
.map_err(|e| format!("Invalid WAIT_SCHEDULE config: {e}"))?;

let wait_seconds = config["wait_seconds"]
.as_u64()
.ok_or_else(|| "WAIT_SCHEDULE missing wait_seconds".to_string())?;
let target_timestamp = config["target_timestamp"]
.as_str()
.ok_or_else(|| "WAIT_SCHEDULE missing target_timestamp".to_string())?;

let cron_expr = config["cron_expr"].as_str().unwrap_or("?");

// Delegate wall-clock I/O to an activity so the orchestration stays
// deterministic. The activity computes max(0, target - now()) at actual
// execution time, correctly accounting for any delay since df.start().
let activity_input = serde_json::json!({
"target_timestamp": target_timestamp,
"cron_expr": cron_expr,
});

let result_json = ctx
.schedule_activity(
activities::compute_cron_wait::NAME,
activity_input.to_string(),
)
.await?;

let result: serde_json::Value = serde_json::from_str(&result_json)
.map_err(|e| format!("Invalid compute_cron_wait output: {e}"))?;

let wait_seconds = result["wait_seconds"]
.as_u64()
.ok_or_else(|| "compute_cron_wait output missing wait_seconds".to_string())?;

ctx.trace_info(format!(
"Waiting {wait_seconds} seconds until schedule: {cron_expr}"
));
Expand Down
3 changes: 3 additions & 0 deletions src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ pub fn create_activity_registry(pool: Arc<PgPool>, semaphore: Arc<Semaphore>) ->
let pool = http_pool.clone();
async move { activities::execute_http::execute(ctx, pool, config_json).await }
})
.register(activities::compute_cron_wait::NAME, |ctx: ActivityContext, input_json: String| {
async move { activities::compute_cron_wait::execute(ctx, input_json).await }
})
.build()
}

Expand Down
Loading