Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 51 additions & 51 deletions Cargo.lock

Large diffs are not rendered by default.

78 changes: 78 additions & 0 deletions engine/artifacts/openapi.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 36 additions & 0 deletions engine/packages/api-peer/src/actors/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,39 @@ pub async fn create(

Ok(CreateResponse { actor: res.actor })
}

#[tracing::instrument(skip_all)]
pub async fn create2(
ctx: ApiCtx,
_path: (),
query: CreateQuery,
body: CreateRequest,
) -> Result<CreateResponse> {
let namespace = ctx
.op(namespace::ops::resolve_for_name_global::Input {
name: query.namespace.clone(),
})
.await?
.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?;

let actor_id = Id::new_v1(ctx.config().dc_label());

let res = ctx
.op(pegboard::ops::actor::create::Input2 {
actor_id,
namespace_id: namespace.namespace_id,
name: body.name.clone(),
key: body.key,
pool_name: body.runner_name_selector,
input: body.input.clone(),
crash_policy: body.crash_policy,
// NOTE: This can forward if the user attempts to create an actor with a target dc and this dc
// ends up forwarding to another.
forward_request: true,
// api-peer is always creating in its own datacenter
datacenter_name: None,
})
.await?;

Ok(CreateResponse { actor: res.actor })
}
91 changes: 91 additions & 0 deletions engine/packages/api-peer/src/actors/get_or_create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,97 @@ pub async fn get_or_create(
}
}

#[tracing::instrument(skip_all)]
pub async fn get_or_create2(
ctx: ApiCtx,
_path: (),
query: GetOrCreateQuery,
body: GetOrCreateRequest,
) -> Result<GetOrCreateResponse> {
let namespace = ctx
.op(namespace::ops::resolve_for_name_global::Input {
name: query.namespace.clone(),
})
.await?
.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?;

// Check if actor already exists for the key
let existing = ctx
.op(pegboard::ops::actor::get_for_key::Input {
namespace_id: namespace.namespace_id,
name: body.name.clone(),
key: body.key.clone(),
fetch_error: true,
})
.await?;

if let Some(actor) = existing.actor {
// Actor exists, return it
return Ok(GetOrCreateResponse {
actor,
created: false,
});
}

// Actor doesn't exist, create it
let actor_id = Id::new_v1(ctx.config().dc_label());

match ctx
.op(pegboard::ops::actor::create::Input2 {
actor_id,
namespace_id: namespace.namespace_id,
name: body.name.clone(),
key: Some(body.key.clone()),
pool_name: body.runner_name_selector,
input: body.input.clone(),
crash_policy: body.crash_policy,
// NOTE: This can forward if the user attempts to create an actor with a target dc and this dc
// ends up forwarding to another.
forward_request: true,
// api-peer is always creating in its own datacenter
datacenter_name: None,
})
.await
{
Ok(res) => Ok(GetOrCreateResponse {
actor: res.actor,
created: true,
}),
Err(err) => {
// Check if this is a DuplicateKey error and extract the existing actor ID
if let Some(existing_actor_id) = extract_duplicate_key_error(&err) {
tracing::info!(
?existing_actor_id,
"received duplicate key error, fetching existing actor"
);

// Fetch the existing actor - it should be in this datacenter since
// the duplicate key error came from this datacenter
let res = ctx
.op(pegboard::ops::actor::get::Input {
actor_ids: vec![existing_actor_id],
fetch_error: true,
})
.await?;

let actor = res
.actors
.into_iter()
.next()
.ok_or_else(|| pegboard::errors::Actor::NotFound.build())?;

return Ok(GetOrCreateResponse {
actor,
created: false,
});
}

// Re-throw the original error if it's not a DuplicateKey
Err(err)
}
}
}

/// Helper function to extract the existing actor ID from a duplicate key error
///
/// Returns Some(actor_id) if the error is a duplicate key error with metadata, None otherwise
Expand Down
2 changes: 2 additions & 0 deletions engine/packages/api-peer/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ pub async fn router(
// MARK: Actors
.route("/actors", get(actors::list::list))
.route("/actors", post(actors::create::create))
.route("/actors2", post(actors::create::create2))
.route("/actors", put(actors::get_or_create::get_or_create))
.route("/actors2", put(actors::get_or_create::get_or_create2))
.route("/actors/{actor_id}", delete(actors::delete::delete))
.route("/actors/names", get(actors::list_names::list_names))
.route(
Expand Down
64 changes: 64 additions & 0 deletions engine/packages/api-public/src/actors/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,67 @@ async fn create_inner(
.await
}
}

#[utoipa::path(
post,
operation_id = "actors2_create",
path = "/actors2",
params(CreateQuery),
request_body(content = CreateRequest, content_type = "application/json"),
responses(
(status = 200, body = CreateResponse),
),
)]
pub async fn create2(
Extension(ctx): Extension<ApiCtx>,
Query(query): Query<CreateQuery>,
Json(body): Json<CreateRequest>,
) -> Response {
match create2_inner(ctx, query, body).await {
Ok(response) => Json(response).into_response(),
Err(err) => ApiError::from(err).into_response(),
}
}

#[tracing::instrument(skip_all)]
async fn create2_inner(
ctx: ApiCtx,
query: CreateQuery,
body: CreateRequest,
) -> Result<CreateResponse> {
ctx.skip_auth();

let namespace = ctx
.op(namespace::ops::resolve_for_name_global::Input {
name: query.namespace.clone(),
})
.await?
.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?;

let target_dc_label = super::utils::find_dc_for_actor_creation(
&ctx,
namespace.namespace_id,
&query.namespace,
&body.runner_name_selector,
body.datacenter.as_ref().map(String::as_str),
)
.await?;

let query = rivet_api_types::actors::create::CreateQuery {
namespace: query.namespace,
};

if target_dc_label == ctx.config().dc_label() {
rivet_api_peer::actors::create::create2(ctx.into(), (), query, body).await
} else {
request_remote_datacenter::<CreateResponse>(
ctx.config(),
target_dc_label,
"/actors2",
axum::http::Method::POST,
Some(&query),
Some(&body),
)
.await
}
}
64 changes: 64 additions & 0 deletions engine/packages/api-public/src/actors/get_or_create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,67 @@ async fn get_or_create_inner(
.await
}
}

#[utoipa::path(
put,
operation_id = "actors2_get_or_create",
path = "/actors2",
params(GetOrCreateQuery),
request_body(content = GetOrCreateRequest, content_type = "application/json"),
responses(
(status = 200, body = GetOrCreateResponse),
),
)]
pub async fn get_or_create2(
Extension(ctx): Extension<ApiCtx>,
Query(query): Query<GetOrCreateQuery>,
Json(body): Json<GetOrCreateRequest>,
) -> Response {
match get_or_create_inner2(ctx, query, body).await {
Ok(response) => Json(response).into_response(),
Err(err) => ApiError::from(err).into_response(),
}
}

#[tracing::instrument(skip_all)]
async fn get_or_create_inner2(
ctx: ApiCtx,
query: GetOrCreateQuery,
body: GetOrCreateRequest,
) -> Result<GetOrCreateResponse> {
ctx.skip_auth();

let namespace = ctx
.op(namespace::ops::resolve_for_name_global::Input {
name: query.namespace.clone(),
})
.await?
.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?;

let target_dc_label = super::utils::find_dc_for_actor_creation(
&ctx,
namespace.namespace_id,
&query.namespace,
&body.runner_name_selector,
body.datacenter.as_ref().map(String::as_str),
)
.await?;

let query = GetOrCreateQuery {
namespace: query.namespace,
};

if target_dc_label == ctx.config().dc_label() {
rivet_api_peer::actors::get_or_create::get_or_create2(ctx.into(), (), query, body).await
} else {
request_remote_datacenter::<GetOrCreateResponse>(
ctx.config(),
target_dc_label,
"/actors2",
axum::http::Method::PUT,
Some(&query),
Some(&body),
)
.await
}
}
7 changes: 7 additions & 0 deletions engine/packages/api-public/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ use crate::{actors, ctx, datacenters, health, metadata, namespaces, runner_confi
paths(
actors::list::list,
actors::create::create,
actors::create::create2,
actors::delete::delete,
actors::list_names::list_names,
actors::get_or_create::get_or_create,
actors::get_or_create::get_or_create2,
actors::kv_get::kv_get,
runners::list,
runners::list_names,
Expand Down Expand Up @@ -79,10 +81,15 @@ pub async fn router(
// MARK: Actors
.route("/actors", axum::routing::get(actors::list::list))
.route("/actors", axum::routing::post(actors::create::create))
.route("/actors2", axum::routing::post(actors::create::create2))
.route(
"/actors",
axum::routing::put(actors::get_or_create::get_or_create),
)
.route(
"/actors2",
axum::routing::put(actors::get_or_create::get_or_create2),
)
.route(
"/actors/{actor_id}",
axum::routing::delete(actors::delete::delete),
Expand Down
Loading
Loading