diff --git a/rust/api/src/kv_store_tests.rs b/rust/api/src/kv_store_tests.rs index 3e3ad1d..5f8ed44 100644 --- a/rust/api/src/kv_store_tests.rs +++ b/rust/api/src/kv_store_tests.rs @@ -53,6 +53,8 @@ macro_rules! define_kv_store_tests { create_test!(list_should_honour_page_size_and_key_prefix_if_provided); create_test!(list_should_return_zero_global_version_when_global_versioning_not_enabled); create_test!(list_should_limit_max_page_size); + create_test!(list_should_return_results_ordered_by_creation_time); + create_test!(list_should_paginate_by_creation_time_with_prefix); }; } @@ -506,6 +508,59 @@ pub trait KvStoreTestSuite { Ok(()) } + async fn list_should_return_results_ordered_by_creation_time() -> Result<(), VssError> { + let kv_store = Self::create_store().await; + let ctx = TestContext::new(&kv_store); + + // Insert keys in reverse-alphabetical order so creation order != key order. + ctx.put_objects(Some(0), vec![kv("z_first", "v1", 0)]).await?; + ctx.put_objects(Some(1), vec![kv("m_second", "v1", 0)]).await?; + ctx.put_objects(Some(2), vec![kv("a_third", "v1", 0)]).await?; + + let page = ctx.list(None, None, None).await?; + let keys: Vec<&str> = page.key_versions.iter().map(|kv| kv.key.as_str()).collect(); + + // Results should be in creation order, not alphabetical. + assert_eq!(keys, vec!["z_first", "m_second", "a_third"]); + + Ok(()) + } + + async fn list_should_paginate_by_creation_time_with_prefix() -> Result<(), VssError> { + let kv_store = Self::create_store().await; + let ctx = TestContext::new(&kv_store); + + // Insert prefixed keys in reverse-alphabetical order with a page_size of 1 + // to force multiple pages and verify cross-page ordering. + ctx.put_objects(Some(0), vec![kv("pfx_z", "v1", 0)]).await?; + ctx.put_objects(Some(1), vec![kv("pfx_a", "v1", 0)]).await?; + ctx.put_objects(Some(2), vec![kv("other", "v1", 0)]).await?; + ctx.put_objects(Some(3), vec![kv("pfx_m", "v1", 0)]).await?; + + let mut next_page_token: Option = None; + let mut all_keys: Vec = Vec::new(); + + loop { + let current_page = match next_page_token.take() { + None => ctx.list(None, Some(1), Some("pfx_".to_string())).await?, + Some(token) => ctx.list(Some(token), Some(1), Some("pfx_".to_string())).await?, + }; + + if current_page.key_versions.is_empty() { + break; + } + + assert!(current_page.key_versions.len() <= 1); + all_keys.extend(current_page.key_versions.into_iter().map(|kv| kv.key)); + next_page_token = current_page.next_page_token; + } + + // Should get prefixed keys in creation order, excluding "other". + assert_eq!(all_keys, vec!["pfx_z", "pfx_a", "pfx_m"]); + + Ok(()) + } + async fn list_should_limit_max_page_size() -> Result<(), VssError> { let kv_store = Self::create_store().await; let ctx = TestContext::new(&kv_store); diff --git a/rust/impls/src/migrations.rs b/rust/impls/src/migrations.rs index a39981b..dbbb6b9 100644 --- a/rust/impls/src/migrations.rs +++ b/rust/impls/src/migrations.rs @@ -35,6 +35,9 @@ pub(crate) const MIGRATIONS: &[&str] = &[ PRIMARY KEY (user_token, store_id, key) );", "ALTER TABLE vss_db DROP CONSTRAINT IF EXISTS vss_db_store_id_check;", + "UPDATE vss_db SET created_at = COALESCE(last_updated_at, NOW()) WHERE created_at IS NULL;", + "ALTER TABLE vss_db ALTER COLUMN created_at SET NOT NULL;", + "CREATE INDEX idx_vss_db_created_at ON vss_db (user_token, store_id, created_at, key);", ]; #[cfg(test)] pub(crate) const DUMMY_MIGRATION: &str = "SELECT 1 WHERE FALSE;"; diff --git a/rust/impls/src/postgres_store.rs b/rust/impls/src/postgres_store.rs index daaf437..89eb4e3 100644 --- a/rust/impls/src/postgres_store.rs +++ b/rust/impls/src/postgres_store.rs @@ -33,6 +33,25 @@ pub(crate) struct VssDbRecord { const KEY_COLUMN: &str = "key"; const VALUE_COLUMN: &str = "value"; const VERSION_COLUMN: &str = "version"; +const CREATED_AT_COLUMN: &str = "created_at"; + +/// Page token is encoded as 20-char zero-padded epoch microseconds followed by the key. +fn encode_page_token(created_at: &chrono::DateTime, key: &str) -> String { + format!("{:020}{}", created_at.timestamp_micros(), key) +} + +fn decode_page_token(token: &str) -> Result<(chrono::DateTime, String), VssError> { + if token.len() < 20 { + return Err(VssError::InvalidRequestError("Invalid page token".to_string())); + } + let micros: i64 = token[..20] + .parse() + .map_err(|_| VssError::InvalidRequestError("Invalid page token".to_string()))?; + let created_at = chrono::DateTime::from_timestamp_micros(micros) + .ok_or_else(|| VssError::InvalidRequestError("Invalid page token".to_string()))?; + let key = token[20..].to_string(); + Ok((created_at, key)) +} /// The maximum number of key versions that can be returned in a single page. /// @@ -663,17 +682,24 @@ where let conn = self.pool.get().await?; - let stmt = "SELECT key, version FROM vss_db WHERE user_token = $1 AND store_id = $2 AND key > $3 AND key LIKE $4 ORDER BY key LIMIT $5"; - let key_like = format!("{}%", key_prefix.as_deref().unwrap_or_default()); - let page_token_param = page_token.as_deref().unwrap_or_default(); - let params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = - vec![&user_token, &store_id, &page_token_param, &key_like, &limit]; - let rows = conn - .query(stmt, ¶ms) - .await - .map_err(|e| Error::new(ErrorKind::Other, format!("Query error: {}", e)))?; + let rows = if let Some(ref token) = page_token { + let (page_created_at, page_key) = decode_page_token(token)?; + let stmt = "SELECT key, version, created_at FROM vss_db WHERE user_token = $1 AND store_id = $2 AND (created_at, key) > ($3, $4) AND key LIKE $5 ORDER BY created_at, key LIMIT $6"; + let params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = + vec![&user_token, &store_id, &page_created_at, &page_key, &key_like, &limit]; + conn.query(stmt, ¶ms) + .await + .map_err(|e| Error::new(ErrorKind::Other, format!("Query error: {}", e)))? + } else { + let stmt = "SELECT key, version, created_at FROM vss_db WHERE user_token = $1 AND store_id = $2 AND key LIKE $3 ORDER BY created_at, key LIMIT $4"; + let params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = + vec![&user_token, &store_id, &key_like, &limit]; + conn.query(stmt, ¶ms) + .await + .map_err(|e| Error::new(ErrorKind::Other, format!("Query error: {}", e)))? + }; let key_versions: Vec<_> = rows .iter() @@ -686,8 +712,10 @@ where .collect(); let mut next_page_token = Some("".to_string()); - if !key_versions.is_empty() { - next_page_token = key_versions.get(key_versions.len() - 1).map(|kv| kv.key.to_string()); + if let Some(last_kv) = key_versions.last() { + let last_created_at = + rows[rows.len() - 1].get::<&str, chrono::DateTime>(CREATED_AT_COLUMN); + next_page_token = Some(encode_page_token(&last_created_at, &last_kv.key)); } Ok(ListKeyVersionsResponse { key_versions, next_page_token, global_version }) @@ -696,11 +724,12 @@ where #[cfg(test)] mod tests { - use super::{drop_database, DUMMY_MIGRATION, MIGRATIONS}; + use super::{decode_page_token, drop_database, encode_page_token, DUMMY_MIGRATION, MIGRATIONS}; use crate::postgres_store::PostgresPlaintextBackend; use api::define_kv_store_tests; use api::kv_store::KvStore; use api::types::{DeleteObjectRequest, GetObjectRequest, KeyValue, PutObjectRequest}; + use chrono::{TimeZone, Utc as ChronoUtc}; use bytes::Bytes; use tokio::sync::OnceCell; @@ -884,4 +913,27 @@ mod tests { drop_database(POSTGRES_ENDPOINT, DEFAULT_DB, vss_db, NoTls).await.unwrap(); } + + #[test] + fn page_token_roundtrips() { + let created_at = ChronoUtc.with_ymd_and_hms(2026, 3, 15, 12, 30, 45).unwrap(); + let key = "some/test/key"; + + let token = encode_page_token(&created_at, key); + let (decoded_time, decoded_key) = decode_page_token(&token).unwrap(); + + assert_eq!(decoded_time, created_at); + assert_eq!(decoded_key, key); + } + + #[test] + fn page_token_rejects_short_input() { + assert!(decode_page_token("tooshort").is_err()); + assert!(decode_page_token("").is_err()); + } + + #[test] + fn page_token_rejects_non_numeric_prefix() { + assert!(decode_page_token("abcdefghijklmnopqrstkey").is_err()); + } }