Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 1 addition & 19 deletions engine/packages/api-peer/src/actors/delete.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,8 @@
use anyhow::Result;
use gas::prelude::*;
use rivet_api_builder::ApiCtx;
use rivet_api_types::actors::delete::*;
use rivet_util::Id;
use serde::{Deserialize, Serialize};
use utoipa::{IntoParams, ToSchema};

#[derive(Debug, Deserialize, Serialize, IntoParams)]
#[serde(deny_unknown_fields)]
#[into_params(parameter_in = Query)]
pub struct DeleteQuery {
pub namespace: Option<String>,
}

#[derive(Serialize, ToSchema)]
#[schema(as = ActorsDeleteResponse)]
pub struct DeleteResponse {}

#[derive(Deserialize)]
#[serde(deny_unknown_fields)]
pub struct DeletePath {
pub actor_id: Id,
}

#[utoipa::path(
delete,
Expand Down
16 changes: 8 additions & 8 deletions engine/packages/api-peer/src/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub struct CachePurgeRequest {
pub keys: Vec<rivet_cache::RawCacheKey>,
}

#[derive(Serialize)]
#[derive(Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct CachePurgeResponse {}

Expand All @@ -29,7 +29,7 @@ pub async fn cache_purge(
Ok(CachePurgeResponse {})
}

#[derive(Serialize)]
#[derive(Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct BumpServerlessAutoscalerResponse {}

Expand All @@ -55,7 +55,7 @@ pub struct SetTracingConfigRequest {
pub sampler_ratio: Option<Option<f64>>,
}

#[derive(Serialize)]
#[derive(Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct SetTracingConfigResponse {}

Expand Down Expand Up @@ -83,11 +83,11 @@ pub async fn set_tracing_config(
Ok(SetTracingConfigResponse {})
}

#[derive(Deserialize)]
#[derive(Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ReplicaReconfigureRequest {}

#[derive(Serialize)]
#[derive(Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ReplicaReconfigureResponse {}

Expand All @@ -114,7 +114,7 @@ pub async fn epoxy_replica_reconfigure(
Ok(ReplicaReconfigureResponse {})
}

#[derive(Serialize)]
#[derive(Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct GetEpoxyStateResponse {
pub config: epoxy::types::ClusterConfig,
Expand Down Expand Up @@ -143,13 +143,13 @@ pub async fn get_epoxy_state(ctx: ApiCtx, _path: (), _query: ()) -> Result<GetEp
})
}

#[derive(Deserialize)]
#[derive(Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct SetEpoxyStateRequest {
pub config: epoxy::types::ClusterConfig,
}

#[derive(Serialize)]
#[derive(Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct SetEpoxyStateResponse {}

Expand Down
4 changes: 2 additions & 2 deletions engine/packages/api-peer/src/namespaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ pub async fn list(ctx: ApiCtx, _path: (), query: ListQuery) -> Result<ListRespon
#[serde(deny_unknown_fields)]
#[schema(as = NamespacesCreateRequest)]
pub struct CreateRequest {
name: String,
display_name: String,
pub name: String,
pub display_name: String,
}

#[derive(Serialize, Deserialize, ToSchema)]
Expand Down
21 changes: 1 addition & 20 deletions engine/packages/api-peer/src/runners.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use anyhow::Result;
use rivet_api_builder::ApiCtx;
use rivet_api_types::{pagination::Pagination, runners::list::*};
use serde::{Deserialize, Serialize};
use utoipa::{IntoParams, ToSchema};
use rivet_api_types::{pagination::Pagination, runners::list::*, runners::list_names::*};

#[utoipa::path(
get,
Expand Down Expand Up @@ -60,23 +58,6 @@ pub async fn list(ctx: ApiCtx, _path: (), query: ListQuery) -> Result<ListRespon
}
}

#[derive(Debug, Serialize, Deserialize, Clone, IntoParams)]
#[serde(deny_unknown_fields)]
#[into_params(parameter_in = Query)]
pub struct ListNamesQuery {
pub namespace: String,
pub limit: Option<usize>,
pub cursor: Option<String>,
}

#[derive(Serialize, Deserialize, ToSchema)]
#[serde(deny_unknown_fields)]
#[schema(as = RunnersListNamesResponse)]
pub struct ListNamesResponse {
pub names: Vec<String>,
pub pagination: Pagination,
}

#[tracing::instrument(skip_all)]
pub async fn list_names(
ctx: ApiCtx,
Expand Down
11 changes: 1 addition & 10 deletions engine/packages/api-public/src/actors/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,11 @@ use rivet_api_builder::{
ApiError,
extract::{Extension, Json, Query},
};
use rivet_api_types::actors::create::{CreateRequest, CreateResponse};
use rivet_api_types::actors::create::*;
use rivet_api_util::request_remote_datacenter;
use serde::{Deserialize, Serialize};
use utoipa::IntoParams;

use crate::ctx::ApiCtx;

#[derive(Debug, Serialize, Deserialize, IntoParams)]
#[serde(deny_unknown_fields)]
#[into_params(parameter_in = Query)]
pub struct CreateQuery {
pub namespace: String,
}

/// ## Datacenter Round Trips
///
/// **If actor is created in the current datacenter:**
Expand Down
28 changes: 2 additions & 26 deletions engine/packages/api-public/src/actors/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,12 @@ use rivet_api_builder::{
ApiError,
extract::{Extension, Json, Path, Query},
};
use rivet_api_types::actors::delete::*;
use rivet_api_util::request_remote_datacenter_raw;
use rivet_util::Id;
use serde::{Deserialize, Serialize};
use utoipa::{IntoParams, ToSchema};

use crate::ctx::ApiCtx;

#[derive(Debug, Deserialize, Serialize, IntoParams)]
#[serde(deny_unknown_fields)]
#[into_params(parameter_in = Query)]
pub struct DeleteQuery {
pub namespace: Option<String>,
}

#[derive(Deserialize)]
#[serde(deny_unknown_fields)]
pub struct DeletePath {
pub actor_id: Id,
}

#[derive(Serialize, ToSchema)]
#[schema(as = ActorsDeleteResponse)]
pub struct DeleteResponse {}

/// ## Datacenter Round Trips
///
/// 2 round trip:
Expand Down Expand Up @@ -63,13 +45,7 @@ async fn delete_inner(ctx: ApiCtx, path: DeletePath, query: DeleteQuery) -> Resu
ctx.auth().await?;

if path.actor_id.label() == ctx.config().dc_label() {
let peer_path = rivet_api_peer::actors::delete::DeletePath {
actor_id: path.actor_id,
};
let peer_query = rivet_api_peer::actors::delete::DeleteQuery {
namespace: query.namespace,
};
let res = rivet_api_peer::actors::delete::delete(ctx.into(), peer_path, peer_query).await?;
let res = rivet_api_peer::actors::delete::delete(ctx.into(), path, query).await?;

Ok(Json(res).into_response())
} else {
Expand Down
76 changes: 24 additions & 52 deletions engine/packages/api-public/src/actors/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,11 @@ use rivet_api_builder::{
ApiError,
extract::{Extension, Json, Query},
};
use rivet_api_types::pagination::Pagination;
use rivet_api_types::{actors::list::*, pagination::Pagination};
use rivet_api_util::fanout_to_datacenters;
use serde::{Deserialize, Serialize};
use utoipa::{IntoParams, ToSchema};

use crate::{actors::utils::fetch_actors_by_ids, ctx::ApiCtx, errors};

#[derive(Debug, Serialize, Deserialize, Clone, IntoParams)]
#[serde(deny_unknown_fields)]
#[into_params(parameter_in = Query)]
pub struct ListQuery {
pub namespace: String,
pub name: Option<String>,
pub key: Option<String>,
pub actor_ids: Option<String>,
pub include_destroyed: Option<bool>,
pub limit: Option<usize>,
pub cursor: Option<String>,
}

#[derive(Serialize, Deserialize, ToSchema)]
#[serde(deny_unknown_fields)]
#[schema(as = ActorsListResponse)]
pub struct ListResponse {
pub actors: Vec<rivet_types::actors::Actor>,
pub pagination: Pagination,
}

/// ## Datacenter Round Trips
///
/// **If key is some & `include_destroyed` is false**
Expand Down Expand Up @@ -123,15 +100,25 @@ async fn list_inner(ctx: ApiCtx, query: ListQuery) -> Result<ListResponse> {
.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?;

// Fetch actors
let actors = fetch_actors_by_ids(
let mut actors = fetch_actors_by_ids(
&ctx,
actor_ids,
query.namespace.clone(),
query.include_destroyed,
query.limit,
None, // Don't apply limit in fetch, we'll apply it after cursor filtering
)
.await?;

// Apply cursor filtering if provided
if let Some(cursor_str) = &query.cursor {
let cursor_ts: i64 = cursor_str.parse().context("invalid cursor format")?;
actors.retain(|actor| actor.create_ts < cursor_ts);
}

// Apply limit after cursor filtering
let limit = query.limit.unwrap_or(100);
actors.truncate(limit);

let cursor = actors.last().map(|x| x.create_ts.to_string());

Ok(ListResponse {
Expand Down Expand Up @@ -196,40 +183,25 @@ async fn list_inner(ctx: ApiCtx, query: ListQuery) -> Result<ListResponse> {
.build());
}

// Prepare peer query for local handler
let peer_query = rivet_api_types::actors::list::ListQuery {
namespace: query.namespace.clone(),
name: Some(query.name.as_ref().unwrap().clone()),
key: query.key.clone(),
actor_ids: None,
include_destroyed: query.include_destroyed,
limit: query.limit,
cursor: query.cursor.clone(),
};
let limit = query.limit.unwrap_or(100);

// Fanout to all datacenters
let mut actors = fanout_to_datacenters::<
rivet_api_types::actors::list::ListResponse,
_,
_,
_,
_,
Vec<rivet_types::actors::Actor>,
>(
ctx.into(),
"/actors",
peer_query,
|ctx, query| async move { rivet_api_peer::actors::list::list(ctx, (), query).await },
|_, res, agg| agg.extend(res.actors),
)
.await?;
let mut actors =
fanout_to_datacenters::<ListResponse, _, _, _, _, Vec<rivet_types::actors::Actor>>(
ctx.into(),
"/actors",
query,
|ctx, query| async move { rivet_api_peer::actors::list::list(ctx, (), query).await },
|_, res, agg| agg.extend(res.actors),
)
.await?;

// Sort by create ts desc
actors.sort_by_cached_key(|x| std::cmp::Reverse(x.create_ts));

// Shorten array since returning all actors from all regions could end up returning `regions *
// limit` results, which is a lot.
actors.truncate(query.limit.unwrap_or(100));
actors.truncate(limit);

let cursor = actors.last().map(|x| x.create_ts.to_string());

Expand Down
Loading
Loading