Skip to content

Commit 07eb897

Browse files
committed
preparatory cleanup before enabling scheduled procedures
1 parent 8c593b0 commit 07eb897

File tree

1 file changed

+62
-53
lines changed

1 file changed

+62
-53
lines changed

crates/core/src/host/scheduler.rs

Lines changed: 62 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,11 @@ use rustc_hash::FxHashMap;
77
use spacetimedb_client_api_messages::energy::EnergyQuanta;
88
use spacetimedb_lib::scheduler::ScheduleAt;
99
use spacetimedb_lib::Timestamp;
10+
use spacetimedb_primitives::ReducerId;
1011
use spacetimedb_primitives::{ColId, TableId};
1112
use spacetimedb_sats::{bsatn::ToBsatn as _, AlgebraicValue};
13+
use spacetimedb_schema::def::deserialize::ArgsSeed;
14+
use spacetimedb_schema::def::ReducerDef;
1215
use spacetimedb_table::table::RowRef;
1316
use tokio::sync::mpsc;
1417
use tokio::time::Instant;
@@ -28,11 +31,11 @@ use spacetimedb_datastore::system_tables::{StFields, StScheduledFields, ST_SCHED
2831
use spacetimedb_datastore::traits::IsolationLevel;
2932

3033
#[derive(Copy, Clone, Eq, PartialEq, Hash)]
31-
pub struct ScheduledReducerId {
32-
/// The ID of the table whose rows hold the scheduled reducers.
33-
/// This table should have a entry in `ST_SCHEDULED`.
34+
pub struct ScheduledFunctionId {
35+
/// The ID of the table whose rows hold the scheduled reducers or procedures.
36+
/// This table should have an entry in `ST_SCHEDULED`.
3437
table_id: TableId,
35-
/// The particular schedule row in the reducer scheduling table referred to by `self.table_id`.
38+
/// The particular schedule row in the scheduling table referred to by `self.table_id`.
3639
schedule_id: u64,
3740
// These may seem redundant, but they're actually free - they fit in the struct padding.
3841
// `scheduled_id: u64, table_id: u32, id_column: u16, at_column: u16` == 16 bytes, same as
@@ -43,7 +46,7 @@ pub struct ScheduledReducerId {
4346
at_column: ColId,
4447
}
4548

46-
spacetimedb_table::static_assert_size!(ScheduledReducerId, 16);
49+
spacetimedb_table::static_assert_size!(ScheduledFunctionId, 16);
4750

4851
enum MsgOrExit<T> {
4952
Msg(T),
@@ -52,20 +55,20 @@ enum MsgOrExit<T> {
5255

5356
enum SchedulerMessage {
5457
Schedule {
55-
id: ScheduledReducerId,
58+
id: ScheduledFunctionId,
5659
/// The timestamp we'll tell the reducer it is.
5760
effective_at: Timestamp,
5861
/// The actual instant we're scheduling for.
5962
real_at: Instant,
6063
},
6164
ScheduleImmediate {
62-
reducer_name: String,
65+
function_name: String,
6366
args: FunctionArgs,
6467
},
6568
}
6669

67-
pub struct ScheduledReducer {
68-
reducer: Box<str>,
70+
pub struct ScheduledFunction {
71+
function: Box<str>,
6972
bsatn_args: Vec<u8>,
7073
}
7174

@@ -121,7 +124,7 @@ impl SchedulerStarter {
121124
// calculate duration left to call the scheduled reducer
122125
let duration = schedule_at.to_duration_from(now_ts);
123126
let at = schedule_at.to_timestamp_from(now_ts);
124-
let id = ScheduledReducerId {
127+
let id = ScheduledFunctionId {
125128
table_id,
126129
schedule_id,
127130
id_column,
@@ -130,7 +133,7 @@ impl SchedulerStarter {
130133
let key = queue.insert_at(QueueItem::Id { id, at }, now_instant + duration);
131134

132135
// This should never happen as duplicate entries should be gated by unique
133-
// constraint voilation in scheduled tables.
136+
// constraint violation in scheduled tables.
134137
if key_map.insert(id, key).is_some() {
135138
return Err(anyhow!(
136139
"Duplicate key found in scheduler queue: table_id {}, schedule_id {}",
@@ -195,21 +198,24 @@ pub enum ScheduleError {
195198
}
196199

197200
impl Scheduler {
198-
/// Schedule a reducer to run from a scheduled table.
201+
/// Schedule a reducer/procedure to run from a scheduled table.
199202
///
200-
/// `reducer_start` is the timestamp of the start of the current reducer.
203+
/// `fn_start` is the timestamp of the start of the current reducer/procedure.
201204
pub(super) fn schedule(
202205
&self,
203206
table_id: TableId,
204207
schedule_id: u64,
205208
schedule_at: ScheduleAt,
206209
id_column: ColId,
207210
at_column: ColId,
208-
reducer_start: Timestamp,
211+
fn_start: Timestamp,
209212
) -> Result<(), ScheduleError> {
210213
// if `Timestamp::now()` is properly monotonic, use it; otherwise, use
211214
// the start of the reducer run as "now" for purposes of scheduling
212-
let now = reducer_start.max(Timestamp::now());
215+
// TODO(procedure-tx): when we do `with_tx` in a procedure,
216+
// it inherits the timestamp of the procedure,
217+
// which could become a problem here for long running procedures.
218+
let now = fn_start.max(Timestamp::now());
213219

214220
// Check that `at` is within `tokio_utils::time::DelayQueue`'s
215221
// accepted time-range.
@@ -229,7 +235,7 @@ impl Scheduler {
229235
// if the actor has exited, it's fine to ignore; it means that the host actor calling
230236
// schedule will exit soon as well, and it'll be scheduled to run when the module host restarts
231237
let _ = self.tx.send(MsgOrExit::Msg(SchedulerMessage::Schedule {
232-
id: ScheduledReducerId {
238+
id: ScheduledFunctionId {
233239
table_id,
234240
schedule_id,
235241
id_column,
@@ -242,9 +248,9 @@ impl Scheduler {
242248
Ok(())
243249
}
244250

245-
pub fn volatile_nonatomic_schedule_immediate(&self, reducer_name: String, args: FunctionArgs) {
251+
pub fn volatile_nonatomic_schedule_immediate(&self, function_name: String, args: FunctionArgs) {
246252
let _ = self.tx.send(MsgOrExit::Msg(SchedulerMessage::ScheduleImmediate {
247-
reducer_name,
253+
function_name,
248254
args,
249255
}));
250256
}
@@ -261,13 +267,13 @@ impl Scheduler {
261267
struct SchedulerActor {
262268
rx: mpsc::UnboundedReceiver<MsgOrExit<SchedulerMessage>>,
263269
queue: DelayQueue<QueueItem>,
264-
key_map: FxHashMap<ScheduledReducerId, delay_queue::Key>,
270+
key_map: FxHashMap<ScheduledFunctionId, delay_queue::Key>,
265271
module_host: WeakModuleHost,
266272
}
267273

268274
pub(crate) enum QueueItem {
269-
Id { id: ScheduledReducerId, at: Timestamp },
270-
VolatileNonatomicImmediate { reducer_name: String, args: FunctionArgs },
275+
Id { id: ScheduledFunctionId, at: Timestamp },
276+
VolatileNonatomicImmediate { function_name: String, args: FunctionArgs },
271277
}
272278

273279
#[cfg(target_pointer_width = "64")]
@@ -304,9 +310,9 @@ impl SchedulerActor {
304310
let key = self.queue.insert_at(QueueItem::Id { id, at: effective_at }, real_at);
305311
self.key_map.insert(id, key);
306312
}
307-
SchedulerMessage::ScheduleImmediate { reducer_name, args } => {
313+
SchedulerMessage::ScheduleImmediate { function_name, args } => {
308314
self.queue.insert(
309-
QueueItem::VolatileNonatomicImmediate { reducer_name, args },
315+
QueueItem::VolatileNonatomicImmediate { function_name, args },
310316
Duration::ZERO,
311317
);
312318
}
@@ -332,43 +338,44 @@ impl SchedulerActor {
332338
let res = tokio::spawn(async move { module_host.call_scheduled_reducer(item).await }).await;
333339

334340
match res {
335-
// if we didn't actually call the reducer because the module exited or it was already deleted, leave
336-
// the ScheduledReducer in the database for when the module restarts
341+
// If we didn't actually call the function
342+
// because the module exited or it was already deleted,
343+
// leave the `ScheduledFunction` in the database for when the module restarts.
337344
Ok(Err(ReducerCallError::NoSuchModule(_)) | Err(ReducerCallError::ScheduleReducerNotFound)) => {}
338345

339346
Ok(Ok((_, ts))) => {
340347
if let Some(id) = id {
341-
let _ = self.delete_scheduled_reducer_row(&db, id, module_host_clone, ts).await;
348+
let _ = self.delete_scheduled_function_row(&db, id, module_host_clone, ts).await;
342349
}
343350
}
344351

345-
// delete the scheduled reducer row if its not repeated reducer
352+
// Delete the scheduled function row if its not repeated function.
346353
Ok(_) | Err(_) => {
347354
if let Some(id) = id {
348355
// TODO: Handle errors here?
349356
let _ = self
350-
.delete_scheduled_reducer_row(&db, id, module_host_clone, Timestamp::now())
357+
.delete_scheduled_function_row(&db, id, module_host_clone, Timestamp::now())
351358
.await;
352359
}
353360
}
354361
}
355362

356363
if let Err(e) = res {
357-
log::error!("invoking scheduled reducer failed: {e:#}");
364+
log::error!("invoking scheduled function failed: {e:#}");
358365
};
359366
}
360367

361-
async fn delete_scheduled_reducer_row(
368+
async fn delete_scheduled_function_row(
362369
&mut self,
363370
db: &RelationalDB,
364-
id: ScheduledReducerId,
371+
id: ScheduledFunctionId,
365372
module_host: ModuleHost,
366373
ts: Timestamp,
367374
) -> anyhow::Result<()> {
368375
let host_clone = module_host.clone();
369376
let db = db.clone();
370377
let schedule_at = host_clone
371-
.on_module_thread("delete_scheduled_reducer_row", move || {
378+
.on_module_thread("delete_scheduled_function_row", move || {
372379
let mut tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal);
373380

374381
match get_schedule_row_mut(&tx, &db, id) {
@@ -426,19 +433,16 @@ pub(crate) fn handle_queued_call_reducer_params(
426433
let Ok(schedule_row) = get_schedule_row_mut(tx, db, id) else {
427434
// if the row is not found, it means the schedule is cancelled by the user
428435
log::debug!(
429-
"table row corresponding to yield scheduler id not found: tableid {}, schedulerId {}",
436+
"table row corresponding to yield scheduler id not found: table_id {}, scheduler_id {}",
430437
id.table_id,
431438
id.schedule_id
432439
);
433440
return Ok(None);
434441
};
435442

436-
let ScheduledReducer { reducer, bsatn_args } = process_schedule(tx, db, id.table_id, &schedule_row)?;
443+
let ScheduledFunction { function, bsatn_args } = process_schedule(tx, db, id.table_id, &schedule_row)?;
437444

438-
let (reducer_id, reducer_seed) = module_info
439-
.module_def
440-
.reducer_arg_deserialize_seed(&reducer[..])
441-
.ok_or_else(|| anyhow!("Reducer not found: {reducer}"))?;
445+
let (reducer_id, reducer_seed) = find_reducer(module_info, &function)?;
442446

443447
let reducer_args = FunctionArgs::Bsatn(bsatn_args.into()).into_tuple(reducer_seed)?;
444448

@@ -453,11 +457,8 @@ pub(crate) fn handle_queued_call_reducer_params(
453457
reducer_args,
454458
)))
455459
}
456-
QueueItem::VolatileNonatomicImmediate { reducer_name, args } => {
457-
let (reducer_id, reducer_seed) = module_info
458-
.module_def
459-
.reducer_arg_deserialize_seed(&reducer_name[..])
460-
.ok_or_else(|| anyhow!("Reducer not found: {reducer_name}"))?;
460+
QueueItem::VolatileNonatomicImmediate { function_name, args } => {
461+
let (reducer_id, reducer_seed) = find_reducer(module_info, &function_name)?;
461462
let reducer_args = args.into_tuple(reducer_seed)?;
462463

463464
Ok(Some(CallReducerParams::from_system(
@@ -470,6 +471,13 @@ pub(crate) fn handle_queued_call_reducer_params(
470471
}
471472
}
472473

474+
fn find_reducer<'a>(module_info: &'a ModuleInfo, name: &str) -> anyhow::Result<(ReducerId, ArgsSeed<'a, ReducerDef>)> {
475+
module_info
476+
.module_def
477+
.reducer_arg_deserialize_seed(name)
478+
.ok_or_else(|| anyhow!("Reducer not found: {name}"))
479+
}
480+
473481
fn commit_and_broadcast_deletion_event(tx: MutTxId, module_host: ModuleHost) {
474482
let caller_identity = module_host.info().database_identity;
475483

@@ -495,40 +503,41 @@ fn commit_and_broadcast_deletion_event(tx: MutTxId, module_host: ModuleHost) {
495503
}
496504
}
497505

498-
/// Generate `ScheduledReducer` for given `ScheduledReducerId`
506+
/// Generate [`ScheduledFunction`] for given [`ScheduledFunctionId`].
499507
fn process_schedule(
500508
tx: &MutTxId,
501509
db: &RelationalDB,
502510
table_id: TableId,
503511
schedule_row: &RowRef<'_>,
504-
) -> Result<ScheduledReducer, anyhow::Error> {
505-
// get reducer name from `ST_SCHEDULED` table
512+
) -> Result<ScheduledFunction, anyhow::Error> {
513+
// Get reducer name from `ST_SCHEDULED` table.
506514
let table_id_col = StScheduledFields::TableId.col_id();
507-
let reducer_name_col = StScheduledFields::ReducerName.col_id();
515+
let function_name_col = StScheduledFields::ReducerName.col_id();
508516
let st_scheduled_row = db
509517
.iter_by_col_eq_mut(tx, ST_SCHEDULED_ID, table_id_col, &table_id.into())?
510518
.next()
511519
.ok_or_else(|| anyhow!("Scheduled table with id {table_id} entry does not exist in `st_scheduled`"))?;
512-
let reducer = st_scheduled_row.read_col::<Box<str>>(reducer_name_col)?;
520+
let function = st_scheduled_row.read_col::<Box<str>>(function_name_col)?;
513521

514-
Ok(ScheduledReducer {
515-
reducer,
522+
Ok(ScheduledFunction {
523+
function,
516524
bsatn_args: schedule_row.to_bsatn_vec()?,
517525
})
518526
}
519527

520-
/// Helper to get schedule_row with `MutTxId`
528+
/// Helper to get `schedule_row` with `MutTxId`.
521529
fn get_schedule_row_mut<'a>(
522530
tx: &'a MutTxId,
523531
db: &'a RelationalDB,
524-
id: ScheduledReducerId,
532+
id: ScheduledFunctionId,
525533
) -> anyhow::Result<RowRef<'a>> {
526534
db.iter_by_col_eq_mut(tx, id.table_id, id.id_column, &id.schedule_id.into())?
527535
.next()
528536
.ok_or_else(|| anyhow!("Schedule with ID {} not found in table {}", id.schedule_id, id.table_id))
529537
}
530538

531-
/// Helper to get schedule_id and schedule_at from schedule_row product value
539+
/// Helper to get `schedule_id` and `schedule_at`
540+
/// from `schedule_row` product value.
532541
pub fn get_schedule_from_row(
533542
row: &RowRef<'_>,
534543
id_column: ColId,

0 commit comments

Comments
 (0)