diff --git a/cmd/ateapi/internal/controlapi/create_actor.go b/cmd/ateapi/internal/controlapi/create_actor.go index 54e36521d..c5bafa328 100644 --- a/cmd/ateapi/internal/controlapi/create_actor.go +++ b/cmd/ateapi/internal/controlapi/create_actor.go @@ -41,7 +41,16 @@ func (s *Service) CreateActor(ctx context.Context, req *ateapipb.CreateActorRequ return nil, fmt.Errorf("while getting ActorTemplate: %w", err) } - id := req.GetActorId() + // The atespace must already exist. + exists, err := s.persistence.AtespaceExists(ctx, req.GetRef().GetAtespace()) + if err != nil { + return nil, fmt.Errorf("while checking atespace: %w", err) + } + if !exists { + return nil, status.Errorf(codes.FailedPrecondition, "Atespace %s not found", req.GetRef().GetAtespace()) + } + + id := req.GetRef().GetName() actor := &ateapipb.Actor{ ActorId: id, Version: 1, @@ -49,6 +58,7 @@ func (s *Service) CreateActor(ctx context.Context, req *ateapipb.CreateActorRequ ActorTemplateNamespace: req.GetActorTemplateNamespace(), ActorTemplateName: req.GetActorTemplateName(), WorkerSelector: req.GetWorkerSelector(), + Atespace: req.GetRef().GetAtespace(), } err = s.persistence.CreateActor(ctx, actor) if err != nil { @@ -58,7 +68,7 @@ func (s *Service) CreateActor(ctx context.Context, req *ateapipb.CreateActorRequ return nil, fmt.Errorf("while recording actor: %w", err) } - storedActor, err := s.persistence.GetActor(ctx, id) + storedActor, err := s.persistence.GetActor(ctx, req.GetRef().GetAtespace(), id) if err != nil { return nil, fmt.Errorf("while fetching recorded actor from DB: %w", err) } @@ -75,10 +85,16 @@ func validateCreateActorRequest(req *ateapipb.CreateActorRequest) error { if req.GetActorTemplateName() == "" { return status.Error(codes.InvalidArgument, "actor_template_name is required") } - if req.GetActorId() == "" { + if req.GetRef().GetName() == "" { return status.Error(codes.InvalidArgument, "actor_id is required") } - if err := resources.ValidateActorID(req.GetActorId()); err != nil { + 
if err := resources.ValidateActorID(req.GetRef().GetName()); err != nil { + return status.Error(codes.InvalidArgument, err.Error()) + } + if req.GetRef().GetAtespace() == "" { + return status.Error(codes.InvalidArgument, "atespace is required") + } + if err := resources.ValidateAtespace(req.GetRef().GetAtespace()); err != nil { return status.Error(codes.InvalidArgument, err.Error()) } if err := validateSelector(req.GetWorkerSelector()); err != nil { diff --git a/cmd/ateapi/internal/controlapi/create_atespace.go b/cmd/ateapi/internal/controlapi/create_atespace.go new file mode 100644 index 000000000..6522c6084 --- /dev/null +++ b/cmd/ateapi/internal/controlapi/create_atespace.go @@ -0,0 +1,53 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package controlapi + +import ( + "context" + "errors" + "fmt" + + "github.com/agent-substrate/substrate/cmd/ateapi/internal/store" + "github.com/agent-substrate/substrate/internal/resources" + "github.com/agent-substrate/substrate/pkg/proto/ateapipb" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func (s *Service) CreateAtespace(ctx context.Context, req *ateapipb.CreateAtespaceRequest) (*ateapipb.CreateAtespaceResponse, error) { + if err := validateCreateAtespaceRequest(req); err != nil { + return nil, err + } + + atespace := &ateapipb.Atespace{Name: req.GetName()} + if err := s.persistence.CreateAtespace(ctx, atespace); err != nil { + if errors.Is(err, store.ErrAlreadyExists) { + return nil, status.Errorf(codes.AlreadyExists, "Atespace %s already exists", req.GetName()) + } + return nil, fmt.Errorf("while recording atespace: %w", err) + } + + return &ateapipb.CreateAtespaceResponse{Atespace: atespace}, nil +} + +func validateCreateAtespaceRequest(req *ateapipb.CreateAtespaceRequest) error { + if req.GetName() == "" { + return status.Error(codes.InvalidArgument, "name is required") + } + if err := resources.ValidateAtespace(req.GetName()); err != nil { + return status.Error(codes.InvalidArgument, err.Error()) + } + return nil +} diff --git a/cmd/ateapi/internal/controlapi/delete_actor.go b/cmd/ateapi/internal/controlapi/delete_actor.go index db70a32f3..3bbc5d151 100644 --- a/cmd/ateapi/internal/controlapi/delete_actor.go +++ b/cmd/ateapi/internal/controlapi/delete_actor.go @@ -31,16 +31,16 @@ func (s *Service) DeleteActor(ctx context.Context, req *ateapipb.DeleteActorRequ return nil, err } - if err := s.persistence.DeleteActor(ctx, req.GetActorId()); err != nil { + if err := s.persistence.DeleteActor(ctx, req.GetRef().GetAtespace(), req.GetRef().GetName()); err != nil { if errors.Is(err, store.ErrNotFound) { - return nil, status.Errorf(codes.NotFound, "Actor %s not found", req.GetActorId()) + return nil, status.Errorf(codes.NotFound, 
"Actor %s not found", req.GetRef().GetName()) } if errors.Is(err, store.ErrFailedPrecondition) { - actor, getErr := s.persistence.GetActor(ctx, req.GetActorId()) + actor, getErr := s.persistence.GetActor(ctx, req.GetRef().GetAtespace(), req.GetRef().GetName()) if getErr == nil { - return nil, status.Errorf(codes.FailedPrecondition, "Actor %s is not suspended (status: %v)", req.GetActorId(), actor.GetStatus()) + return nil, status.Errorf(codes.FailedPrecondition, "Actor %s is not suspended (status: %v)", req.GetRef().GetName(), actor.GetStatus()) } - return nil, status.Errorf(codes.FailedPrecondition, "Actor %s is not suspended", req.GetActorId()) + return nil, status.Errorf(codes.FailedPrecondition, "Actor %s is not suspended", req.GetRef().GetName()) } if errors.Is(err, store.ErrPersistenceRetry) { return nil, status.Error(codes.Aborted, "concurrent update conflict, please retry") @@ -52,11 +52,14 @@ func (s *Service) DeleteActor(ctx context.Context, req *ateapipb.DeleteActorRequ } func validateDeleteActorRequest(req *ateapipb.DeleteActorRequest) error { - if req.GetActorId() == "" { + if req.GetRef().GetName() == "" { return status.Error(codes.InvalidArgument, "actor_id is required") } - if err := resources.ValidateActorID(req.GetActorId()); err != nil { + if err := resources.ValidateActorID(req.GetRef().GetName()); err != nil { return status.Error(codes.InvalidArgument, err.Error()) } + if req.GetRef().GetAtespace() == "" { + return status.Error(codes.InvalidArgument, "atespace is required") + } return nil } diff --git a/cmd/ateapi/internal/controlapi/delete_atespace.go b/cmd/ateapi/internal/controlapi/delete_atespace.go new file mode 100644 index 000000000..736690734 --- /dev/null +++ b/cmd/ateapi/internal/controlapi/delete_atespace.go @@ -0,0 +1,55 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controlapi + +import ( + "context" + "errors" + "fmt" + + "github.com/agent-substrate/substrate/cmd/ateapi/internal/store" + "github.com/agent-substrate/substrate/internal/resources" + "github.com/agent-substrate/substrate/pkg/proto/ateapipb" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func (s *Service) DeleteAtespace(ctx context.Context, req *ateapipb.DeleteAtespaceRequest) (*ateapipb.DeleteAtespaceResponse, error) { + if err := validateDeleteAtespaceRequest(req); err != nil { + return nil, err + } + + if err := s.persistence.DeleteAtespace(ctx, req.GetName()); err != nil { + if errors.Is(err, store.ErrNotFound) { + return nil, status.Errorf(codes.NotFound, "Atespace %s not found", req.GetName()) + } + if errors.Is(err, store.ErrFailedPrecondition) { + return nil, status.Errorf(codes.FailedPrecondition, "Atespace %s is not empty", req.GetName()) + } + return nil, fmt.Errorf("while deleting atespace from DB: %w", err) + } + + return &ateapipb.DeleteAtespaceResponse{}, nil +} + +func validateDeleteAtespaceRequest(req *ateapipb.DeleteAtespaceRequest) error { + if req.GetName() == "" { + return status.Error(codes.InvalidArgument, "name is required") + } + if err := resources.ValidateAtespace(req.GetName()); err != nil { + return status.Error(codes.InvalidArgument, err.Error()) + } + return nil +} diff --git a/cmd/ateapi/internal/controlapi/functional_test.go b/cmd/ateapi/internal/controlapi/functional_test.go index 13e0d1f59..65a549d1b 100644 --- a/cmd/ateapi/internal/controlapi/functional_test.go +++ 
b/cmd/ateapi/internal/controlapi/functional_test.go @@ -61,6 +61,8 @@ var ( fakeAtelet = &FakeAteletServer{} ) +const testAtespace = "test-atespace" + func TestMain(m *testing.M) { binaryAssetsDirectory, err := envtestbins.BinaryAssetsDir() if err != nil { @@ -343,6 +345,15 @@ func setupTest(t *testing.T, ns string) *testContext { t.Fatalf("failed to create namespace %s: %v", ns, err) } + // CreateActor now requires the atespace to exist first. + if _, err := client.CreateAtespace(context.Background(), &ateapipb.CreateAtespaceRequest{Name: testAtespace}); err != nil { + conn.Close() + grpcServer.Stop() + cancel() + mr.Close() + t.Fatalf("failed to seed test atespace %q: %v", testAtespace, err) + } + cleanup := func() { conn.Close() grpcServer.Stop() @@ -388,6 +399,14 @@ func createTemplate(t *testing.T, tc *testContext, ns string) { }) } +// createAtespace creates an atespace via the API. +func createAtespace(t *testing.T, tc *testContext, name string) { + t.Helper() + if _, err := tc.client.CreateAtespace(context.Background(), &ateapipb.CreateAtespaceRequest{Name: name}); err != nil { + t.Fatalf("CreateAtespace(%s) failed: %v", name, err) + } +} + const poolLabelKey = "pool" func createTemplateWithContainers(t *testing.T, tc *testContext, ns string, containers []atev1alpha1.Container) { @@ -619,9 +638,9 @@ func TestCreateActor_Success(t *testing.T) { createTemplate(t, tc, ns) createResp, err := tc.client.CreateActor(context.Background(), &ateapipb.CreateActorRequest{ + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "id1"}, ActorTemplateNamespace: ns, ActorTemplateName: "tmpl1", - ActorId: "id1", WorkerSelector: &ateapipb.Selector{MatchLabels: map[string]string{"tier": "free"}}, }) if err != nil { @@ -636,6 +655,7 @@ func TestCreateActor_Success(t *testing.T) { ActorTemplateName: "tmpl1", Status: ateapipb.Actor_STATUS_SUSPENDED, WorkerSelector: &ateapipb.Selector{MatchLabels: map[string]string{"tier": "free"}}, + Atespace: testAtespace, }, } @@ -651,9 
+671,9 @@ func TestCreateActor_TemplateNotFound(t *testing.T) { defer tc.cleanup() _, err := tc.client.CreateActor(context.Background(), &ateapipb.CreateActorRequest{ + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "id1"}, ActorTemplateNamespace: ns, ActorTemplateName: "non-existent", - ActorId: "id1", }) assertGrpcError(t, err, codes.FailedPrecondition, fmt.Sprintf("ActorTemplate %s/non-existent not found", ns)) } @@ -667,18 +687,18 @@ func TestCreateActor_Duplicate(t *testing.T) { createTemplate(t, tc, ns) _, err := tc.client.CreateActor(context.Background(), &ateapipb.CreateActorRequest{ + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "id1"}, ActorTemplateNamespace: ns, ActorTemplateName: "tmpl1", - ActorId: "id1", }) if err != nil { t.Fatalf("first CreateActor failed: %v", err) } _, err = tc.client.CreateActor(context.Background(), &ateapipb.CreateActorRequest{ + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "id1"}, ActorTemplateNamespace: ns, ActorTemplateName: "tmpl1", - ActorId: "id1", }) assertGrpcError(t, err, codes.AlreadyExists, "Actor id1 already exists") } @@ -692,9 +712,9 @@ func TestGetActor_Found(t *testing.T) { createTemplate(t, tc, ns) createResp, err := tc.client.CreateActor(context.Background(), &ateapipb.CreateActorRequest{ + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "id1"}, ActorTemplateNamespace: ns, ActorTemplateName: "tmpl1", - ActorId: "id1", }) if err != nil { t.Fatalf("CreateActor failed: %v", err) @@ -703,7 +723,7 @@ func TestGetActor_Found(t *testing.T) { id := createResp.GetActor().GetActorId() getResp, err := tc.client.GetActor(context.Background(), &ateapipb.GetActorRequest{ - ActorId: id, + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: id}, }) if err != nil { t.Fatalf("GetActor failed: %v", err) @@ -728,7 +748,7 @@ func TestGetActor_NotFound(t *testing.T) { defer tc.cleanup() _, err := tc.client.GetActor(context.Background(), &ateapipb.GetActorRequest{ - 
ActorId: "non-existent", + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "non-existent"}, }) assertGrpcError(t, err, codes.NotFound, "Actor non-existent not found") } @@ -747,23 +767,23 @@ func TestListActors(t *testing.T) { createTemplate(t, tc, ns) resp1, err := tc.client.CreateActor(context.Background(), &ateapipb.CreateActorRequest{ + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "id1"}, ActorTemplateNamespace: ns, ActorTemplateName: "tmpl1", - ActorId: "id1", }) if err != nil { t.Fatalf("CreateActor 1 failed: %v", err) } resp2, err := tc.client.CreateActor(context.Background(), &ateapipb.CreateActorRequest{ + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "id2"}, ActorTemplateNamespace: ns, ActorTemplateName: "tmpl1", - ActorId: "id2", }) if err != nil { t.Fatalf("CreateActor 2 failed: %v", err) } - listResp, err := tc.client.ListActors(context.Background(), &ateapipb.ListActorsRequest{}) + listResp, err := tc.client.ListActors(context.Background(), &ateapipb.ListActorsRequest{Atespace: testAtespace}) if err != nil { t.Fatalf("ListActors failed: %v", err) } @@ -789,6 +809,104 @@ func TestListActors(t *testing.T) { } } +// TestListActors_ByAtespace verifies create + list are scoped by atespace end to +// end through the RPC surface: an actor created with a given atespace is only +// returned by ListActors(atespace=X) and only fetched by GetActor(atespace=X). 
+func TestListActors_ByAtespace(t *testing.T) { + ns := namespaceForTest("ns-list-by-atespace") + tc := setupTest(t, ns) + defer tc.cleanup() + + createTemplate(t, tc, ns) + createAtespace(t, tc, "team-a") + createAtespace(t, tc, "team-b") + + create := func(id, atespace string) *ateapipb.Actor { + resp, err := tc.client.CreateActor(context.Background(), &ateapipb.CreateActorRequest{ + Ref: &ateapipb.ActorReference{Atespace: atespace, Name: id}, + ActorTemplateNamespace: ns, + ActorTemplateName: "tmpl1", + }) + if err != nil { + t.Fatalf("CreateActor(%s, atespace=%q) failed: %v", id, atespace, err) + } + return resp.GetActor() + } + a1 := create("id1", "team-a") + a2 := create("id2", "team-a") + b1 := create("id3", "team-b") + + sortByID := []cmp.Option{ + protocmp.Transform(), + cmpopts.SortSlices(func(a, b *ateapipb.Actor) bool { return a.ActorId < b.ActorId }), + } + + // List scoped to team-a returns only its actors. + listA, err := tc.client.ListActors(context.Background(), &ateapipb.ListActorsRequest{Atespace: "team-a"}) + if err != nil { + t.Fatalf("ListActors(team-a) failed: %v", err) + } + if diff := cmp.Diff([]*ateapipb.Actor{a1, a2}, listA.GetActors(), sortByID...); diff != "" { + t.Errorf("ListActors(team-a) mismatch (-want +got):\n%s", diff) + } + + // List scoped to team-b returns only its actor. + listB, err := tc.client.ListActors(context.Background(), &ateapipb.ListActorsRequest{Atespace: "team-b"}) + if err != nil { + t.Fatalf("ListActors(team-b) failed: %v", err) + } + if diff := cmp.Diff([]*ateapipb.Actor{b1}, listB.GetActors(), sortByID...); diff != "" { + t.Errorf("ListActors(team-b) mismatch (-want +got):\n%s", diff) + } + + // Get is scoped: the right atespace hits, a different atespace misses (deny-across by key). 
+ if _, err := tc.client.GetActor(context.Background(), &ateapipb.GetActorRequest{Ref: &ateapipb.ActorReference{Atespace: "team-a", Name: "id1"}}); err != nil { + t.Errorf("GetActor(id1, team-a) failed: %v", err) + } + _, err = tc.client.GetActor(context.Background(), &ateapipb.GetActorRequest{Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "id1"}}) + assertGrpcError(t, err, codes.NotFound, "Actor id1 not found") +} + +// TestListActors_AllAtespaces verifies that an empty atespace lists actors across +// all atespaces (the `-A` / admin view), unlike the scoped single-tenant listing. +func TestListActors_AllAtespaces(t *testing.T) { + ns := namespaceForTest("ns-list-all-atespaces") + tc := setupTest(t, ns) + defer tc.cleanup() + + createTemplate(t, tc, ns) + createAtespace(t, tc, "team-a") + createAtespace(t, tc, "team-b") + + create := func(id, atespace string) { + if _, err := tc.client.CreateActor(context.Background(), &ateapipb.CreateActorRequest{ + Ref: &ateapipb.ActorReference{Atespace: atespace, Name: id}, + ActorTemplateNamespace: ns, + ActorTemplateName: "tmpl1", + }); err != nil { + t.Fatalf("CreateActor(%s, atespace=%q) failed: %v", id, atespace, err) + } + } + create("id1", "team-a") + create("id2", "team-b") + + // Empty atespace lists across all atespaces; returned actors carry their atespace. + resp, err := tc.client.ListActors(context.Background(), &ateapipb.ListActorsRequest{}) + if err != nil { + t.Fatalf("ListActors(all) failed: %v", err) + } + got := map[string]string{} + for _, a := range resp.GetActors() { + got[a.GetActorId()] = a.GetAtespace() + } + if got["id1"] != "team-a" { + t.Errorf("ListActors(all): got[id1]=%q, want team-a", got["id1"]) + } + if got["id2"] != "team-b" { + t.Errorf("ListActors(all): got[id2]=%q, want team-b", got["id2"]) + } +} + // TestListActors_Pagination tests that ListActors correctly paginates results. 
func TestListActors_Pagination(t *testing.T) { ns := namespaceForTest("ns-list-actors-pagination") @@ -800,9 +918,9 @@ func TestListActors_Pagination(t *testing.T) { var want []*ateapipb.Actor for i := 0; i < 5; i++ { resp, err := tc.client.CreateActor(context.Background(), &ateapipb.CreateActorRequest{ + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: fmt.Sprintf("id%d", i)}, ActorTemplateNamespace: ns, ActorTemplateName: "tmpl1", - ActorId: fmt.Sprintf("id%d", i), }) if err != nil { t.Fatalf("CreateActor %d failed: %v", i, err) @@ -815,6 +933,7 @@ func TestListActors_Pagination(t *testing.T) { for { listResp, err := tc.client.ListActors(context.Background(), &ateapipb.ListActorsRequest{ + Atespace: testAtespace, PageSize: 2, PageToken: pageToken, }) @@ -852,6 +971,7 @@ func TestListActors_PageSizeValidation(t *testing.T) { // 1. Negative page size _, err := tc.client.ListActors(context.Background(), &ateapipb.ListActorsRequest{ + Atespace: testAtespace, PageSize: -1, }) if status.Code(err) != codes.InvalidArgument { @@ -860,6 +980,7 @@ func TestListActors_PageSizeValidation(t *testing.T) { // 2. 
Page size exceeding maxPageSize (1000) _, err = tc.client.ListActors(context.Background(), &ateapipb.ListActorsRequest{ + Atespace: testAtespace, PageSize: 1001, }) if status.Code(err) != codes.InvalidArgument { @@ -927,9 +1048,9 @@ func TestResumeActor(t *testing.T) { createWorkerPod(t, tc, ns, "worker-1", "node1", "pool1") _, err := tc.client.CreateActor(context.Background(), &ateapipb.CreateActorRequest{ + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "id1"}, ActorTemplateNamespace: ns, ActorTemplateName: "tmpl1", - ActorId: "id1", }) if err != nil { t.Fatalf("CreateActor failed: %v", err) @@ -937,7 +1058,7 @@ func TestResumeActor(t *testing.T) { id := "id1" _, err = tc.client.ResumeActor(context.Background(), &ateapipb.ResumeActorRequest{ - ActorId: id, + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: id}, }) if err != nil { t.Fatalf("ResumeActor failed: %v", err) @@ -948,7 +1069,7 @@ func TestResumeActor(t *testing.T) { } getResp, err := tc.client.GetActor(context.Background(), &ateapipb.GetActorRequest{ - ActorId: id, + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: id}, }) if err != nil { t.Fatalf("GetActor failed: %v", err) @@ -956,6 +1077,7 @@ func TestResumeActor(t *testing.T) { want := &ateapipb.GetActorResponse{ Actor: &ateapipb.Actor{ ActorId: id, + Atespace: testAtespace, ActorTemplateNamespace: ns, ActorTemplateName: "tmpl1", Status: ateapipb.Actor_STATUS_RUNNING, @@ -992,6 +1114,7 @@ func TestResumeActor(t *testing.T) { ActorNamespace: ns, ActorTemplate: "tmpl1", ActorId: id, + ActorAtespace: testAtespace, Ip: "127.0.0.1", NodeName: "node1", } @@ -1044,15 +1167,15 @@ func TestResumeActorResolvesValueFromEnv(t *testing.T) { createWorkerPod(t, tc, ns, "worker-1", "node1", "pool1") _, err = tc.client.CreateActor(context.Background(), &ateapipb.CreateActorRequest{ + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "id1"}, ActorTemplateNamespace: ns, ActorTemplateName: "tmpl1", - ActorId: "id1", }) if 
err != nil { t.Fatalf("CreateActor failed: %v", err) } _, err = tc.client.ResumeActor(context.Background(), &ateapipb.ResumeActorRequest{ - ActorId: "id1", + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "id1"}, }) if err != nil { t.Fatalf("ResumeActor failed: %v", err) @@ -1098,9 +1221,9 @@ func TestResumeActor_NoWorkers(t *testing.T) { createTemplate(t, tc, ns) createResp, err := tc.client.CreateActor(context.Background(), &ateapipb.CreateActorRequest{ + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "id1"}, ActorTemplateNamespace: ns, ActorTemplateName: "tmpl1", - ActorId: "id1", }) if err != nil { t.Fatalf("CreateActor failed: %v", err) @@ -1109,7 +1232,7 @@ func TestResumeActor_NoWorkers(t *testing.T) { id := createResp.GetActor().GetActorId() _, err = tc.client.ResumeActor(context.Background(), &ateapipb.ResumeActorRequest{ - ActorId: id, + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: id}, }) assertGrpcError(t, err, codes.FailedPrecondition, "no free workers available") } @@ -1128,16 +1251,16 @@ func TestResumeActor_NoEligiblePool(t *testing.T) { }) createResp, err := tc.client.CreateActor(context.Background(), &ateapipb.CreateActorRequest{ + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "id1"}, ActorTemplateNamespace: ns, ActorTemplateName: "tmpl1", - ActorId: "id1", }) if err != nil { t.Fatalf("CreateActor failed: %v", err) } _, err = tc.client.ResumeActor(context.Background(), &ateapipb.ResumeActorRequest{ - ActorId: createResp.GetActor().GetActorId(), + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: createResp.GetActor().GetActorId()}, }) assertGrpcError(t, err, codes.FailedPrecondition, "no worker pool matches the template's sandboxClass and the template/actor selectors") } @@ -1160,9 +1283,9 @@ func TestResumeActor_MultiPoolSelector(t *testing.T) { createWorkerPod(t, tc, ns, "worker-b", "node1", "pool-b") _, err := tc.client.CreateActor(context.Background(), 
&ateapipb.CreateActorRequest{ + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "id1"}, ActorTemplateNamespace: ns, ActorTemplateName: "tmpl1", - ActorId: "id1", WorkerSelector: &ateapipb.Selector{ MatchLabels: map[string]string{"tier": "b"}, }, @@ -1171,12 +1294,12 @@ func TestResumeActor_MultiPoolSelector(t *testing.T) { t.Fatalf("CreateActor failed: %v", err) } - _, err = tc.client.ResumeActor(context.Background(), &ateapipb.ResumeActorRequest{ActorId: "id1"}) + _, err = tc.client.ResumeActor(context.Background(), &ateapipb.ResumeActorRequest{Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "id1"}}) if err != nil { t.Fatalf("ResumeActor failed: %v", err) } - getResp, err := tc.client.GetActor(context.Background(), &ateapipb.GetActorRequest{ActorId: "id1"}) + getResp, err := tc.client.GetActor(context.Background(), &ateapipb.GetActorRequest{Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "id1"}}) if err != nil { t.Fatalf("GetActor failed: %v", err) } @@ -1211,9 +1334,9 @@ func TestResumeActor_RequiresBothSelectorsToMatch(t *testing.T) { createWorkerPod(t, tc, ns, "worker-actor-only", "node1", "pool-actor-only") _, err := tc.client.CreateActor(context.Background(), &ateapipb.CreateActorRequest{ + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "id1"}, ActorTemplateNamespace: ns, ActorTemplateName: "tmpl1", - ActorId: "id1", WorkerSelector: &ateapipb.Selector{ MatchLabels: map[string]string{"tier": "b"}, }, @@ -1222,11 +1345,11 @@ func TestResumeActor_RequiresBothSelectorsToMatch(t *testing.T) { t.Fatalf("CreateActor failed: %v", err) } - if _, err := tc.client.ResumeActor(context.Background(), &ateapipb.ResumeActorRequest{ActorId: "id1"}); err != nil { + if _, err := tc.client.ResumeActor(context.Background(), &ateapipb.ResumeActorRequest{Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "id1"}}); err != nil { t.Fatalf("ResumeActor failed: %v", err) } - getResp, err := 
tc.client.GetActor(context.Background(), &ateapipb.GetActorRequest{ActorId: "id1"}) + getResp, err := tc.client.GetActor(context.Background(), &ateapipb.GetActorRequest{Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "id1"}}) if err != nil { t.Fatalf("GetActor failed: %v", err) } @@ -1256,9 +1379,9 @@ func TestResumeActor_Reentrancy(t *testing.T) { createWorkerPod(t, tc, ns, "worker-1", "node1", "pool1") _, err := tc.client.CreateActor(context.Background(), &ateapipb.CreateActorRequest{ + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "id1"}, ActorTemplateNamespace: ns, ActorTemplateName: "tmpl1", - ActorId: "id1", }) if err != nil { t.Fatalf("CreateActor failed: %v", err) @@ -1269,14 +1392,14 @@ func TestResumeActor_Reentrancy(t *testing.T) { tc.fakeAtelet.FailRestore = fmt.Errorf("mock atelet failure") _, err = tc.client.ResumeActor(context.Background(), &ateapipb.ResumeActorRequest{ - ActorId: id, + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: id}, }) if err == nil { t.Fatalf("expected ResumeActor to fail due to atelet error") } // Verify actor state is RESUMING in Redis! - actor, err := tc.persistence.GetActor(context.Background(), id) + actor, err := tc.persistence.GetActor(context.Background(), testAtespace, id) if err != nil { t.Fatalf("failed to get actor from store: %v", err) } @@ -1289,7 +1412,7 @@ func TestResumeActor_Reentrancy(t *testing.T) { tc.fakeAtelet.RestoreCalled = false // reset for verification _, err = tc.client.ResumeActor(context.Background(), &ateapipb.ResumeActorRequest{ - ActorId: id, + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: id}, }) if err != nil { t.Fatalf("ResumeActor failed on retry: %v", err) @@ -1300,7 +1423,7 @@ func TestResumeActor_Reentrancy(t *testing.T) { } // Verify actor state is RUNNING! 
- actor, err = tc.persistence.GetActor(context.Background(), id) + actor, err = tc.persistence.GetActor(context.Background(), testAtespace, id) if err != nil { t.Fatalf("failed to get actor from store: %v", err) } @@ -1329,9 +1452,9 @@ func TestSuspendActor(t *testing.T) { createWorkerPod(t, tc, ns, "worker-1", "node1", "pool1") _, err := tc.client.CreateActor(context.Background(), &ateapipb.CreateActorRequest{ + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "id1"}, ActorTemplateNamespace: ns, ActorTemplateName: "tmpl1", - ActorId: "id1", }) if err != nil { t.Fatalf("CreateActor failed: %v", err) @@ -1340,7 +1463,7 @@ func TestSuspendActor(t *testing.T) { // Resume first to make it running _, err = tc.client.ResumeActor(context.Background(), &ateapipb.ResumeActorRequest{ - ActorId: id, + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: id}, }) if err != nil { t.Fatalf("ResumeActor failed: %v", err) @@ -1348,7 +1471,7 @@ func TestSuspendActor(t *testing.T) { // Suspend _, err = tc.client.SuspendActor(context.Background(), &ateapipb.SuspendActorRequest{ - ActorId: id, + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: id}, }) if err != nil { t.Fatalf("SuspendActor failed: %v", err) @@ -1359,7 +1482,7 @@ func TestSuspendActor(t *testing.T) { } getResp, err := tc.client.GetActor(context.Background(), &ateapipb.GetActorRequest{ - ActorId: id, + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: id}, }) if err != nil { t.Fatalf("GetActor failed: %v", err) @@ -1367,6 +1490,7 @@ func TestSuspendActor(t *testing.T) { want := &ateapipb.GetActorResponse{ Actor: &ateapipb.Actor{ ActorId: id, + Atespace: testAtespace, ActorTemplateNamespace: ns, ActorTemplateName: "tmpl1", Status: ateapipb.Actor_STATUS_SUSPENDED, @@ -1413,9 +1537,9 @@ func TestPauseActor(t *testing.T) { createWorkerPod(t, tc, ns, "worker-1", "node1", "pool1") _, err := tc.client.CreateActor(context.Background(), &ateapipb.CreateActorRequest{ + Ref: 
&ateapipb.ActorReference{Atespace: testAtespace, Name: "id1"}, ActorTemplateNamespace: ns, ActorTemplateName: "tmpl1", - ActorId: "id1", }) if err != nil { t.Fatalf("CreateActor failed: %v", err) @@ -1424,7 +1548,7 @@ func TestPauseActor(t *testing.T) { // Resume first to make it running _, err = tc.client.ResumeActor(context.Background(), &ateapipb.ResumeActorRequest{ - ActorId: id, + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: id}, }) if err != nil { t.Fatalf("ResumeActor failed: %v", err) @@ -1432,7 +1556,7 @@ func TestPauseActor(t *testing.T) { // Pause _, err = tc.client.PauseActor(context.Background(), &ateapipb.PauseActorRequest{ - ActorId: id, + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: id}, }) if err != nil { t.Fatalf("PauseActor failed: %v", err) @@ -1443,7 +1567,7 @@ func TestPauseActor(t *testing.T) { } getResp, err := tc.client.GetActor(context.Background(), &ateapipb.GetActorRequest{ - ActorId: id, + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: id}, }) if err != nil { t.Fatalf("GetActor failed: %v", err) @@ -1451,6 +1575,7 @@ func TestPauseActor(t *testing.T) { want := &ateapipb.GetActorResponse{ Actor: &ateapipb.Actor{ ActorId: id, + Atespace: testAtespace, ActorTemplateNamespace: ns, ActorTemplateName: "tmpl1", Status: ateapipb.Actor_STATUS_PAUSED, @@ -1488,9 +1613,9 @@ func TestUpdateActor_Success(t *testing.T) { createTemplate(t, tc, ns) _, err := tc.client.CreateActor(context.Background(), &ateapipb.CreateActorRequest{ + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "id1"}, ActorTemplateNamespace: ns, ActorTemplateName: "tmpl1", - ActorId: "id1", WorkerSelector: &ateapipb.Selector{ MatchLabels: map[string]string{"tier": "free"}, }, @@ -1500,7 +1625,7 @@ func TestUpdateActor_Success(t *testing.T) { } updateResp, err := tc.client.UpdateActor(context.Background(), &ateapipb.UpdateActorRequest{ - ActorId: "id1", + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "id1"}, 
WorkerSelector: &ateapipb.Selector{ MatchLabels: map[string]string{"tier": "paid"}, }, @@ -1512,6 +1637,7 @@ func TestUpdateActor_Success(t *testing.T) { wantActor := &ateapipb.Actor{ ActorId: "id1", Version: 2, + Atespace: testAtespace, ActorTemplateNamespace: ns, ActorTemplateName: "tmpl1", Status: ateapipb.Actor_STATUS_SUSPENDED, @@ -1524,7 +1650,7 @@ func TestUpdateActor_Success(t *testing.T) { t.Errorf("UpdateActor response mismatch (-want +got):\n%s", diff) } - getResp, err := tc.client.GetActor(context.Background(), &ateapipb.GetActorRequest{ActorId: "id1"}) + getResp, err := tc.client.GetActor(context.Background(), &ateapipb.GetActorRequest{Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "id1"}}) if err != nil { t.Fatalf("GetActor failed: %v", err) } @@ -1539,7 +1665,7 @@ func TestUpdateActor_NotFound(t *testing.T) { tc := setupTest(t, ns) defer tc.cleanup() - _, err := tc.client.UpdateActor(context.Background(), &ateapipb.UpdateActorRequest{ActorId: "does-not-exist"}) + _, err := tc.client.UpdateActor(context.Background(), &ateapipb.UpdateActorRequest{Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "does-not-exist"}}) assertGrpcError(t, err, codes.NotFound, "Actor does-not-exist not found") } @@ -1572,9 +1698,9 @@ func TestResumeActor_ReleasesStaleWorkerWhenPoolBecomesIneligible(t *testing.T) id := "id1" _, err := tc.client.CreateActor(context.Background(), &ateapipb.CreateActorRequest{ + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: id}, ActorTemplateNamespace: ns, ActorTemplateName: "tmpl1", - ActorId: id, WorkerSelector: &ateapipb.Selector{MatchLabels: map[string]string{"tier": "a"}}, }) if err != nil { @@ -1582,24 +1708,24 @@ func TestResumeActor_ReleasesStaleWorkerWhenPoolBecomesIneligible(t *testing.T) } tc.fakeAtelet.FailRun = fmt.Errorf("mock atelet failure") - _, err = tc.client.ResumeActor(context.Background(), &ateapipb.ResumeActorRequest{ActorId: id}) + _, err = tc.client.ResumeActor(context.Background(), 
&ateapipb.ResumeActorRequest{Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: id}}) if err == nil { t.Fatalf("expected first ResumeActor (onto worker-a) to fail") } tc.fakeAtelet.FailRun = nil if _, err := tc.client.UpdateActor(context.Background(), &ateapipb.UpdateActorRequest{ - ActorId: id, + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: id}, WorkerSelector: &ateapipb.Selector{MatchLabels: map[string]string{"tier": "b"}}, }); err != nil { t.Fatalf("UpdateActor failed: %v", err) } - if _, err := tc.client.ResumeActor(context.Background(), &ateapipb.ResumeActorRequest{ActorId: id}); err != nil { + if _, err := tc.client.ResumeActor(context.Background(), &ateapipb.ResumeActorRequest{Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: id}}); err != nil { t.Fatalf("second ResumeActor failed: %v", err) } - getResp, err := tc.client.GetActor(context.Background(), &ateapipb.GetActorRequest{ActorId: id}) + getResp, err := tc.client.GetActor(context.Background(), &ateapipb.GetActorRequest{Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: id}}) if err != nil { t.Fatalf("GetActor failed: %v", err) } @@ -1660,9 +1786,9 @@ func TestUpdateActor_ReassignsPoolAcrossSuspendResume(t *testing.T) { id := "id1" _, err := tc.client.CreateActor(context.Background(), &ateapipb.CreateActorRequest{ + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: id}, ActorTemplateNamespace: ns, ActorTemplateName: "tmpl1", - ActorId: id, WorkerSelector: &ateapipb.Selector{ MatchLabels: map[string]string{"tier": "a"}, }, @@ -1671,11 +1797,11 @@ func TestUpdateActor_ReassignsPoolAcrossSuspendResume(t *testing.T) { t.Fatalf("CreateActor failed: %v", err) } - if _, err := tc.client.ResumeActor(context.Background(), &ateapipb.ResumeActorRequest{ActorId: id}); err != nil { + if _, err := tc.client.ResumeActor(context.Background(), &ateapipb.ResumeActorRequest{Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: id}}); err != nil { t.Fatalf("first 
ResumeActor failed: %v", err) } - getResp, err := tc.client.GetActor(context.Background(), &ateapipb.GetActorRequest{ActorId: id}) + getResp, err := tc.client.GetActor(context.Background(), &ateapipb.GetActorRequest{Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: id}}) if err != nil { t.Fatalf("GetActor failed: %v", err) } @@ -1687,7 +1813,7 @@ func TestUpdateActor_ReassignsPoolAcrossSuspendResume(t *testing.T) { } if _, err := tc.client.UpdateActor(context.Background(), &ateapipb.UpdateActorRequest{ - ActorId: id, + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: id}, WorkerSelector: &ateapipb.Selector{ MatchLabels: map[string]string{"tier": "b"}, }, @@ -1695,14 +1821,14 @@ func TestUpdateActor_ReassignsPoolAcrossSuspendResume(t *testing.T) { t.Fatalf("UpdateActor failed: %v", err) } - if _, err := tc.client.SuspendActor(context.Background(), &ateapipb.SuspendActorRequest{ActorId: id}); err != nil { + if _, err := tc.client.SuspendActor(context.Background(), &ateapipb.SuspendActorRequest{Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: id}}); err != nil { t.Fatalf("SuspendActor failed: %v", err) } - if _, err := tc.client.ResumeActor(context.Background(), &ateapipb.ResumeActorRequest{ActorId: id}); err != nil { + if _, err := tc.client.ResumeActor(context.Background(), &ateapipb.ResumeActorRequest{Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: id}}); err != nil { t.Fatalf("second ResumeActor failed: %v", err) } - getResp, err = tc.client.GetActor(context.Background(), &ateapipb.GetActorRequest{ActorId: id}) + getResp, err = tc.client.GetActor(context.Background(), &ateapipb.GetActorRequest{Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: id}}) if err != nil { t.Fatalf("GetActor failed: %v", err) } @@ -1735,37 +1861,37 @@ func TestValidation(t *testing.T) { }{ { "missing namespace", - &ateapipb.CreateActorRequest{ActorTemplateName: "tmpl1", ActorId: "id1"}, + &ateapipb.CreateActorRequest{Ref: 
&ateapipb.ActorReference{Atespace: testAtespace, Name: "id1"}, ActorTemplateName: "tmpl1"}, "actor_template_namespace is required"}, { "missing template name", - &ateapipb.CreateActorRequest{ActorTemplateNamespace: "ns1", ActorId: "id1"}, + &ateapipb.CreateActorRequest{Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "id1"}, ActorTemplateNamespace: "ns1"}, "actor_template_name is required"}, { "missing actor id", - &ateapipb.CreateActorRequest{ActorTemplateNamespace: "ns1", ActorTemplateName: "tmpl1"}, + &ateapipb.CreateActorRequest{Ref: &ateapipb.ActorReference{Atespace: testAtespace}, ActorTemplateNamespace: "ns1", ActorTemplateName: "tmpl1"}, "actor_id is required"}, { "invalid actor id (capitals)", - &ateapipb.CreateActorRequest{ActorTemplateNamespace: "ns1", ActorTemplateName: "tmpl1", ActorId: "ID1"}, + &ateapipb.CreateActorRequest{Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "ID1"}, ActorTemplateNamespace: "ns1", ActorTemplateName: "tmpl1"}, "invalid actor_id: must start and end with a lower case alphanumeric character, and consist only of lower case alphanumeric characters or '-'"}, { "invalid actor id (special chars)", - &ateapipb.CreateActorRequest{ActorTemplateNamespace: "ns1", ActorTemplateName: "tmpl1", ActorId: "id_1"}, + &ateapipb.CreateActorRequest{Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "id_1"}, ActorTemplateNamespace: "ns1", ActorTemplateName: "tmpl1"}, "invalid actor_id: must start and end with a lower case alphanumeric character, and consist only of lower case alphanumeric characters or '-'"}, { "invalid worker_selector label key", - &ateapipb.CreateActorRequest{ActorTemplateNamespace: "ns1", ActorTemplateName: "tmpl1", ActorId: "id1", WorkerSelector: &ateapipb.Selector{MatchLabels: map[string]string{"bad key!": "x"}}}, + &ateapipb.CreateActorRequest{Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "id1"}, ActorTemplateNamespace: "ns1", ActorTemplateName: "tmpl1", WorkerSelector: 
&ateapipb.Selector{MatchLabels: map[string]string{"bad key!": "x"}}}, `invalid worker_selector label key "bad key!": name part must consist of alphanumeric characters, '-', '_' or '.', and must start and end with an alphanumeric character (e.g. 'MyName', or 'my.name', or '123-abc', regex used for validation is '([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9]')`, }, { "invalid worker_selector label value", - &ateapipb.CreateActorRequest{ActorTemplateNamespace: "ns1", ActorTemplateName: "tmpl1", ActorId: "id1", WorkerSelector: &ateapipb.Selector{MatchLabels: map[string]string{"tier": "not valid!"}}}, + &ateapipb.CreateActorRequest{Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "id1"}, ActorTemplateNamespace: "ns1", ActorTemplateName: "tmpl1", WorkerSelector: &ateapipb.Selector{MatchLabels: map[string]string{"tier": "not valid!"}}}, `invalid worker_selector label value "not valid!" for key "tier": a valid label must be an empty string or consist of alphanumeric characters, '-', '_' or '.', and must start and end with an alphanumeric character (e.g. 
'MyValue', or 'my_value', or '12345', regex used for validation is '(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])?')`, }, { "too many worker_selector match_labels", - &ateapipb.CreateActorRequest{ActorTemplateNamespace: "ns1", ActorTemplateName: "tmpl1", ActorId: "id1", WorkerSelector: &ateapipb.Selector{MatchLabels: selectorLabelsOfSize(11)}}, + &ateapipb.CreateActorRequest{Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "id1"}, ActorTemplateNamespace: "ns1", ActorTemplateName: "tmpl1", WorkerSelector: &ateapipb.Selector{MatchLabels: selectorLabelsOfSize(11)}}, "worker_selector has 11 match_labels entries, exceeding the limit of 10", }, } @@ -1783,7 +1909,7 @@ func TestValidation(t *testing.T) { req *ateapipb.GetActorRequest wantMsg string }{ - {"missing id", &ateapipb.GetActorRequest{}, "id is required"}, + {"missing id", &ateapipb.GetActorRequest{Ref: &ateapipb.ActorReference{Atespace: testAtespace}}, "id is required"}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -1799,7 +1925,7 @@ func TestValidation(t *testing.T) { req *ateapipb.ResumeActorRequest wantMsg string }{ - {"missing id", &ateapipb.ResumeActorRequest{}, "id is required"}, + {"missing id", &ateapipb.ResumeActorRequest{Ref: &ateapipb.ActorReference{Atespace: testAtespace}}, "id is required"}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -1815,7 +1941,7 @@ func TestValidation(t *testing.T) { req *ateapipb.SuspendActorRequest wantMsg string }{ - {"missing id", &ateapipb.SuspendActorRequest{}, "id is required"}, + {"missing id", &ateapipb.SuspendActorRequest{Ref: &ateapipb.ActorReference{Atespace: testAtespace}}, "id is required"}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -1831,20 +1957,20 @@ func TestValidation(t *testing.T) { req *ateapipb.UpdateActorRequest wantMsg string }{ - {"missing id", &ateapipb.UpdateActorRequest{}, "actor_id is required"}, + {"missing id", &ateapipb.UpdateActorRequest{Ref: 
&ateapipb.ActorReference{Atespace: testAtespace}}, "actor_id is required"}, { "invalid worker_selector label key", - &ateapipb.UpdateActorRequest{ActorId: "id1", WorkerSelector: &ateapipb.Selector{MatchLabels: map[string]string{"bad key!": "x"}}}, + &ateapipb.UpdateActorRequest{Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "id1"}, WorkerSelector: &ateapipb.Selector{MatchLabels: map[string]string{"bad key!": "x"}}}, `invalid worker_selector label key "bad key!": name part must consist of alphanumeric characters, '-', '_' or '.', and must start and end with an alphanumeric character (e.g. 'MyName', or 'my.name', or '123-abc', regex used for validation is '([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9]')`, }, { "invalid worker_selector label value", - &ateapipb.UpdateActorRequest{ActorId: "id1", WorkerSelector: &ateapipb.Selector{MatchLabels: map[string]string{"tier": "not valid!"}}}, + &ateapipb.UpdateActorRequest{Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "id1"}, WorkerSelector: &ateapipb.Selector{MatchLabels: map[string]string{"tier": "not valid!"}}}, `invalid worker_selector label value "not valid!" for key "tier": a valid label must be an empty string or consist of alphanumeric characters, '-', '_' or '.', and must start and end with an alphanumeric character (e.g. 
'MyValue', or 'my_value', or '12345', regex used for validation is '(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])?')`, }, { "too many worker_selector match_labels", - &ateapipb.UpdateActorRequest{ActorId: "id1", WorkerSelector: &ateapipb.Selector{MatchLabels: selectorLabelsOfSize(11)}}, + &ateapipb.UpdateActorRequest{Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "id1"}, WorkerSelector: &ateapipb.Selector{MatchLabels: selectorLabelsOfSize(11)}}, "worker_selector has 11 match_labels entries, exceeding the limit of 10", }, } @@ -1862,9 +1988,9 @@ func TestValidation(t *testing.T) { req *ateapipb.DeleteActorRequest wantMsg string }{ - {"missing id", &ateapipb.DeleteActorRequest{}, "actor_id is required"}, - {"invalid actor id (capitals)", &ateapipb.DeleteActorRequest{ActorId: "ID1"}, "invalid actor_id: must start and end with a lower case alphanumeric character, and consist only of lower case alphanumeric characters or '-'"}, - {"invalid actor id (special chars)", &ateapipb.DeleteActorRequest{ActorId: "id_1"}, "invalid actor_id: must start and end with a lower case alphanumeric character, and consist only of lower case alphanumeric characters or '-'"}, + {"missing id", &ateapipb.DeleteActorRequest{Ref: &ateapipb.ActorReference{Atespace: testAtespace}}, "actor_id is required"}, + {"invalid actor id (capitals)", &ateapipb.DeleteActorRequest{Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "ID1"}}, "invalid actor_id: must start and end with a lower case alphanumeric character, and consist only of lower case alphanumeric characters or '-'"}, + {"invalid actor id (special chars)", &ateapipb.DeleteActorRequest{Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "id_1"}}, "invalid actor_id: must start and end with a lower case alphanumeric character, and consist only of lower case alphanumeric characters or '-'"}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -1885,9 +2011,9 @@ func TestResumeActor_LockConflict(t *testing.T) 
{ createWorkerPod(t, tc, ns, "worker-1", "node1", "pool1") _, err := tc.client.CreateActor(context.Background(), &ateapipb.CreateActorRequest{ + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "id1"}, ActorTemplateNamespace: ns, ActorTemplateName: "tmpl1", - ActorId: "id1", }) if err != nil { t.Fatalf("CreateActor failed: %v", err) @@ -1901,7 +2027,7 @@ func TestResumeActor_LockConflict(t *testing.T) { errChan := make(chan error, 1) go func() { _, err := tc.client.ResumeActor(context.Background(), &ateapipb.ResumeActorRequest{ - ActorId: id, + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: id}, }) errChan <- err }() @@ -1911,7 +2037,7 @@ func TestResumeActor_LockConflict(t *testing.T) { // Launch Request B (should fail due to lock conflict) _, err = tc.client.ResumeActor(context.Background(), &ateapipb.ResumeActorRequest{ - ActorId: id, + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: id}, }) assertGrpcError(t, err, codes.Aborted, "another operation is in progress for this actor") @@ -1932,9 +2058,9 @@ func TestResumeActor_DanglingWorker(t *testing.T) { createWorkerPod(t, tc, ns, "worker-a", "node1", "pool1") _, err := tc.client.CreateActor(context.Background(), &ateapipb.CreateActorRequest{ + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "id1"}, ActorTemplateNamespace: ns, ActorTemplateName: "tmpl1", - ActorId: "id1", }) if err != nil { t.Fatalf("CreateActor failed: %v", err) @@ -1946,7 +2072,7 @@ func TestResumeActor_DanglingWorker(t *testing.T) { // 3. 
Call ResumeActor -> Expect failure _, err = tc.client.ResumeActor(context.Background(), &ateapipb.ResumeActorRequest{ - ActorId: id, + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: id}, }) if err == nil { t.Fatalf("expected ResumeActor to fail due to atelet error") @@ -1954,7 +2080,7 @@ func TestResumeActor_DanglingWorker(t *testing.T) { // Verify actor state is RESUMING with worker A assigned getResp, err := tc.client.GetActor(context.Background(), &ateapipb.GetActorRequest{ - ActorId: id, + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: id}, }) if err != nil { t.Fatalf("GetActor failed: %v", err) @@ -1978,7 +2104,7 @@ func TestResumeActor_DanglingWorker(t *testing.T) { // 8. Call ResumeActor again -> Expect success and picking Worker B! _, err = tc.client.ResumeActor(context.Background(), &ateapipb.ResumeActorRequest{ - ActorId: id, + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: id}, }) if err != nil { t.Fatalf("ResumeActor failed on retry: %v", err) @@ -1989,7 +2115,7 @@ func TestResumeActor_DanglingWorker(t *testing.T) { } // Verify actor state is RUNNING with worker B assigned - actor, err = tc.persistence.GetActor(context.Background(), id) + actor, err = tc.persistence.GetActor(context.Background(), testAtespace, id) if err != nil { t.Fatalf("failed to get actor from store: %v", err) } @@ -2012,9 +2138,9 @@ func TestSuspendActor_DanglingWorker(t *testing.T) { createWorkerPod(t, tc, ns, "worker-1", "node1", "pool1") _, err := tc.client.CreateActor(context.Background(), &ateapipb.CreateActorRequest{ + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "id1"}, ActorTemplateNamespace: ns, ActorTemplateName: "tmpl1", - ActorId: "id1", }) if err != nil { t.Fatalf("CreateActor failed: %v", err) @@ -2023,7 +2149,7 @@ func TestSuspendActor_DanglingWorker(t *testing.T) { // Resume first to make it running _, err = tc.client.ResumeActor(context.Background(), &ateapipb.ResumeActorRequest{ - ActorId: id, + Ref: 
&ateapipb.ActorReference{Atespace: testAtespace, Name: id}, }) if err != nil { t.Fatalf("ResumeActor failed: %v", err) @@ -2032,14 +2158,14 @@ func TestSuspendActor_DanglingWorker(t *testing.T) { deleteWorkerPod(t, tc, ns, "worker-1") // 3. Call SuspendActor -> Should succeed (our fix skips missing pod execution) - actors, _, _ := tc.persistence.ListActors(context.Background(), maxPageSize, "") + actors, _, _ := tc.persistence.ListActors(context.Background(), testAtespace, maxPageSize, "") t.Logf("Actors in Redis before Suspend: %d", len(actors)) for _, a := range actors { t.Logf(" Actor: %s/%s/%s", a.GetActorTemplateNamespace(), a.GetActorTemplateName(), a.GetActorId()) } _, err = tc.client.SuspendActor(context.Background(), &ateapipb.SuspendActorRequest{ - ActorId: id, + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: id}, }) if err != nil { t.Fatalf("SuspendActor failed: %v", err) @@ -2047,7 +2173,7 @@ func TestSuspendActor_DanglingWorker(t *testing.T) { // 4. Verify it becomes SUSPENDED in Redis getResp, err := tc.client.GetActor(context.Background(), &ateapipb.GetActorRequest{ - ActorId: id, + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: id}, }) if err != nil { t.Fatalf("GetActor failed: %v", err) @@ -2068,23 +2194,23 @@ func TestDeleteActor_Success(t *testing.T) { createTemplate(t, tc, ns) _, err := tc.client.CreateActor(context.Background(), &ateapipb.CreateActorRequest{ + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "id1"}, ActorTemplateNamespace: ns, ActorTemplateName: "tmpl1", - ActorId: "id1", }) if err != nil { t.Fatalf("CreateActor failed: %v", err) } _, err = tc.client.DeleteActor(context.Background(), &ateapipb.DeleteActorRequest{ - ActorId: "id1", + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "id1"}, }) if err != nil { t.Fatalf("DeleteActor failed: %v", err) } _, err = tc.client.GetActor(context.Background(), &ateapipb.GetActorRequest{ - ActorId: "id1", + Ref: 
&ateapipb.ActorReference{Atespace: testAtespace, Name: "id1"}, }) assertGrpcError(t, err, codes.NotFound, "Actor id1 not found") } @@ -2098,23 +2224,23 @@ func TestDeleteActor_NotSuspended(t *testing.T) { createWorkerPod(t, tc, ns, "worker-1", "node1", "pool1") _, err := tc.client.CreateActor(context.Background(), &ateapipb.CreateActorRequest{ + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "id1"}, ActorTemplateNamespace: ns, ActorTemplateName: "tmpl1", - ActorId: "id1", }) if err != nil { t.Fatalf("CreateActor failed: %v", err) } _, err = tc.client.ResumeActor(context.Background(), &ateapipb.ResumeActorRequest{ - ActorId: "id1", + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "id1"}, }) if err != nil { t.Fatalf("ResumeActor failed: %v", err) } _, err = tc.client.DeleteActor(context.Background(), &ateapipb.DeleteActorRequest{ - ActorId: "id1", + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "id1"}, }) assertGrpcError(t, err, codes.FailedPrecondition, "Actor id1 is not suspended (status: STATUS_RUNNING)") } @@ -2125,7 +2251,7 @@ func TestDeleteActor_NotFound(t *testing.T) { defer tc.cleanup() _, err := tc.client.DeleteActor(context.Background(), &ateapipb.DeleteActorRequest{ - ActorId: "non-existent", + Ref: &ateapipb.ActorReference{Atespace: testAtespace, Name: "non-existent"}, }) assertGrpcError(t, err, codes.NotFound, "Actor non-existent not found") } @@ -2146,3 +2272,221 @@ func assertGrpcError(t *testing.T, err error, wantCode codes.Code, wantMsg strin t.Errorf("expected message %q, got %q", wantMsg, st.Message()) } } + +func TestCreateActor_AtespaceNotFound(t *testing.T) { + ns := namespaceForTest("ns-create-actor-no-atespace") + tc := setupTest(t, ns) + defer tc.cleanup() + createTemplate(t, tc, ns) + + // The template exists, but "missing-as" was never created. The template + // check fires first, so reaching this error proves the atespace check ran. 
+ _, err := tc.client.CreateActor(context.Background(), &ateapipb.CreateActorRequest{ + Ref: &ateapipb.ActorReference{Atespace: "missing-as", Name: "id1"}, + ActorTemplateNamespace: ns, + ActorTemplateName: "tmpl1", + }) + assertGrpcError(t, err, codes.FailedPrecondition, "Atespace missing-as not found") +} + +func TestCreateAtespace_Success(t *testing.T) { + ns := namespaceForTest("ns-create-atespace") + tc := setupTest(t, ns) + defer tc.cleanup() + createTemplate(t, tc, ns) + + resp, err := tc.client.CreateAtespace(context.Background(), &ateapipb.CreateAtespaceRequest{Name: "team-a"}) + if err != nil { + t.Fatalf("CreateAtespace failed: %v", err) + } + got := resp.GetAtespace() + if got.GetName() != "team-a" { + t.Errorf("Name = %q, want team-a", got.GetName()) + } + + // An actor can now be created into the new atespace. + if _, err := tc.client.CreateActor(context.Background(), &ateapipb.CreateActorRequest{ + Ref: &ateapipb.ActorReference{Atespace: "team-a", Name: "id1"}, + ActorTemplateNamespace: ns, + ActorTemplateName: "tmpl1", + }); err != nil { + t.Errorf("CreateActor into freshly created atespace failed: %v", err) + } +} + +func TestCreateAtespace_AlreadyExists(t *testing.T) { + ns := namespaceForTest("ns-create-atespace-dup") + tc := setupTest(t, ns) + defer tc.cleanup() + + if _, err := tc.client.CreateAtespace(context.Background(), &ateapipb.CreateAtespaceRequest{Name: "team-a"}); err != nil { + t.Fatalf("first CreateAtespace failed: %v", err) + } + _, err := tc.client.CreateAtespace(context.Background(), &ateapipb.CreateAtespaceRequest{Name: "team-a"}) + assertGrpcError(t, err, codes.AlreadyExists, "Atespace team-a already exists") +} + +func TestCreateAtespace_Validation(t *testing.T) { + ns := namespaceForTest("ns-create-atespace-validation") + tc := setupTest(t, ns) + defer tc.cleanup() + + _, err := tc.client.CreateAtespace(context.Background(), &ateapipb.CreateAtespaceRequest{Name: ""}) + assertGrpcError(t, err, codes.InvalidArgument, "name is 
required") + + // Invalid names — uppercase/underscore plus Redis-key/SCAN metacharacters — + // are rejected by ValidateAtespace before any key is built (injection guard). + for _, bad := range []string{"Team_A", "a*", "a:b", "a/b"} { + _, err := tc.client.CreateAtespace(context.Background(), &ateapipb.CreateAtespaceRequest{Name: bad}) + if status.Code(err) != codes.InvalidArgument { + t.Errorf("CreateAtespace(%q): got code %v, want InvalidArgument (err=%v)", bad, status.Code(err), err) + } + } +} + +func TestGetAtespace_Found(t *testing.T) { + ns := namespaceForTest("ns-get-atespace") + tc := setupTest(t, ns) + defer tc.cleanup() + + created, err := tc.client.CreateAtespace(context.Background(), &ateapipb.CreateAtespaceRequest{Name: "team-a"}) + if err != nil { + t.Fatalf("CreateAtespace failed: %v", err) + } + resp, err := tc.client.GetAtespace(context.Background(), &ateapipb.GetAtespaceRequest{Name: "team-a"}) + if err != nil { + t.Fatalf("GetAtespace failed: %v", err) + } + if diff := cmp.Diff(created.GetAtespace(), resp.GetAtespace(), protocmp.Transform()); diff != "" { + t.Errorf("GetAtespace mismatch (-created +got):\n%s", diff) + } +} + +func TestGetAtespace_NotFound(t *testing.T) { + ns := namespaceForTest("ns-get-atespace-missing") + tc := setupTest(t, ns) + defer tc.cleanup() + + _, err := tc.client.GetAtespace(context.Background(), &ateapipb.GetAtespaceRequest{Name: "nope"}) + assertGrpcError(t, err, codes.NotFound, "Atespace nope not found") +} + +func TestListAtespaces(t *testing.T) { + ns := namespaceForTest("ns-list-atespaces") + tc := setupTest(t, ns) + defer tc.cleanup() + + for _, n := range []string{"team-a", "team-b"} { + if _, err := tc.client.CreateAtespace(context.Background(), &ateapipb.CreateAtespaceRequest{Name: n}); err != nil { + t.Fatalf("CreateAtespace(%s) failed: %v", n, err) + } + } + resp, err := tc.client.ListAtespaces(context.Background(), &ateapipb.ListAtespacesRequest{}) + if err != nil { + t.Fatalf("ListAtespaces failed: %v", 
err) + } + got := map[string]bool{} + for _, a := range resp.GetAtespaces() { + got[a.GetName()] = true + } + // setupTest seeds testAtespace; team-a and team-b were created above. + for _, n := range []string{testAtespace, "team-a", "team-b"} { + if !got[n] { + t.Errorf("ListAtespaces missing %q; got %v", n, got) + } + } +} + +func TestDeleteAtespace_Empty_Success(t *testing.T) { + ns := namespaceForTest("ns-delete-atespace-empty") + tc := setupTest(t, ns) + defer tc.cleanup() + + if _, err := tc.client.CreateAtespace(context.Background(), &ateapipb.CreateAtespaceRequest{Name: "team-a"}); err != nil { + t.Fatalf("CreateAtespace failed: %v", err) + } + if _, err := tc.client.DeleteAtespace(context.Background(), &ateapipb.DeleteAtespaceRequest{Name: "team-a"}); err != nil { + t.Fatalf("DeleteAtespace failed: %v", err) + } + _, err := tc.client.GetAtespace(context.Background(), &ateapipb.GetAtespaceRequest{Name: "team-a"}) + assertGrpcError(t, err, codes.NotFound, "Atespace team-a not found") +} + +func TestDeleteAtespace_NonEmpty_Rejected(t *testing.T) { + ns := namespaceForTest("ns-delete-atespace-nonempty") + tc := setupTest(t, ns) + defer tc.cleanup() + createTemplate(t, tc, ns) + + if _, err := tc.client.CreateAtespace(context.Background(), &ateapipb.CreateAtespaceRequest{Name: "team-a"}); err != nil { + t.Fatalf("CreateAtespace failed: %v", err) + } + if _, err := tc.client.CreateActor(context.Background(), &ateapipb.CreateActorRequest{ + Ref: &ateapipb.ActorReference{Atespace: "team-a", Name: "id1"}, + ActorTemplateNamespace: ns, + ActorTemplateName: "tmpl1", + }); err != nil { + t.Fatalf("CreateActor failed: %v", err) + } + _, err := tc.client.DeleteAtespace(context.Background(), &ateapipb.DeleteAtespaceRequest{Name: "team-a"}) + assertGrpcError(t, err, codes.FailedPrecondition, "Atespace team-a is not empty") + // The atespace must survive a rejected delete. 
+ if _, err := tc.client.GetAtespace(context.Background(), &ateapipb.GetAtespaceRequest{Name: "team-a"}); err != nil { + t.Errorf("atespace should survive a rejected delete, got %v", err) + } +} + +// TestDeleteAtespace_ScopedToTargetAtespace pins (at the RPC layer) that the +// emptiness check is scoped to the target atespace: deleting an empty atespace +// succeeds even when a different atespace holds actors. +func TestDeleteAtespace_ScopedToTargetAtespace(t *testing.T) { + ns := namespaceForTest("ns-delete-atespace-scoped") + tc := setupTest(t, ns) + defer tc.cleanup() + createTemplate(t, tc, ns) + createAtespace(t, tc, "team-a") + createAtespace(t, tc, "team-b") + + // Actor only in team-b. + if _, err := tc.client.CreateActor(context.Background(), &ateapipb.CreateActorRequest{ + Ref: &ateapipb.ActorReference{Atespace: "team-b", Name: "id1"}, + ActorTemplateNamespace: ns, + ActorTemplateName: "tmpl1", + }); err != nil { + t.Fatalf("CreateActor failed: %v", err) + } + + // Empty team-a deletes fine despite team-b holding an actor. + if _, err := tc.client.DeleteAtespace(context.Background(), &ateapipb.DeleteAtespaceRequest{Name: "team-a"}); err != nil { + t.Errorf("DeleteAtespace(team-a, empty) failed: %v", err) + } + // team-b is still non-empty → rejected. 
+ _, err := tc.client.DeleteAtespace(context.Background(), &ateapipb.DeleteAtespaceRequest{Name: "team-b"}) + assertGrpcError(t, err, codes.FailedPrecondition, "Atespace team-b is not empty") +} + +func TestDeleteAtespace_NotFound(t *testing.T) { + ns := namespaceForTest("ns-delete-atespace-missing") + tc := setupTest(t, ns) + defer tc.cleanup() + + _, err := tc.client.DeleteAtespace(context.Background(), &ateapipb.DeleteAtespaceRequest{Name: "nope"}) + assertGrpcError(t, err, codes.NotFound, "Atespace nope not found") +} + +func TestDeleteAtespace_Validation(t *testing.T) { + ns := namespaceForTest("ns-delete-atespace-validation") + tc := setupTest(t, ns) + defer tc.cleanup() + + _, err := tc.client.DeleteAtespace(context.Background(), &ateapipb.DeleteAtespaceRequest{Name: ""}) + assertGrpcError(t, err, codes.InvalidArgument, "name is required") + + // Metacharacter names are rejected before the emptiness glob scan ever runs. + for _, bad := range []string{"a*", "a:b"} { + _, err := tc.client.DeleteAtespace(context.Background(), &ateapipb.DeleteAtespaceRequest{Name: bad}) + if status.Code(err) != codes.InvalidArgument { + t.Errorf("DeleteAtespace(%q): got code %v, want InvalidArgument", bad, status.Code(err)) + } + } +} diff --git a/cmd/ateapi/internal/controlapi/get_actor.go b/cmd/ateapi/internal/controlapi/get_actor.go index d2b5cc637..d23d4fe59 100644 --- a/cmd/ateapi/internal/controlapi/get_actor.go +++ b/cmd/ateapi/internal/controlapi/get_actor.go @@ -29,9 +29,9 @@ func (s *Service) GetActor(ctx context.Context, req *ateapipb.GetActorRequest) ( if err := validateGetActorRequest(req); err != nil { return nil, err } - actor, err := s.persistence.GetActor(ctx, req.GetActorId()) + actor, err := s.persistence.GetActor(ctx, req.GetRef().GetAtespace(), req.GetRef().GetName()) if errors.Is(err, store.ErrNotFound) { - return nil, status.Errorf(codes.NotFound, "Actor %s not found", req.GetActorId()) + return nil, status.Errorf(codes.NotFound, "Actor %s not found", 
req.GetRef().GetName()) } else if err != nil { return nil, fmt.Errorf("while getting actor from DB: %w", err) } @@ -41,8 +41,11 @@ func (s *Service) GetActor(ctx context.Context, req *ateapipb.GetActorRequest) ( } func validateGetActorRequest(req *ateapipb.GetActorRequest) error { - if req.GetActorId() == "" { + if req.GetRef().GetName() == "" { return status.Error(codes.InvalidArgument, "id is required") } + if req.GetRef().GetAtespace() == "" { + return status.Error(codes.InvalidArgument, "atespace is required") + } return nil } diff --git a/cmd/ateapi/internal/controlapi/get_atespace.go b/cmd/ateapi/internal/controlapi/get_atespace.go new file mode 100644 index 000000000..8d6c52508 --- /dev/null +++ b/cmd/ateapi/internal/controlapi/get_atespace.go @@ -0,0 +1,52 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package controlapi + +import ( + "context" + "errors" + "fmt" + + "github.com/agent-substrate/substrate/cmd/ateapi/internal/store" + "github.com/agent-substrate/substrate/internal/resources" + "github.com/agent-substrate/substrate/pkg/proto/ateapipb" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func (s *Service) GetAtespace(ctx context.Context, req *ateapipb.GetAtespaceRequest) (*ateapipb.GetAtespaceResponse, error) { + if err := validateGetAtespaceRequest(req); err != nil { + return nil, err + } + + atespace, err := s.persistence.GetAtespace(ctx, req.GetName()) + if errors.Is(err, store.ErrNotFound) { + return nil, status.Errorf(codes.NotFound, "Atespace %s not found", req.GetName()) + } else if err != nil { + return nil, fmt.Errorf("while getting atespace from DB: %w", err) + } + + return &ateapipb.GetAtespaceResponse{Atespace: atespace}, nil +} + +func validateGetAtespaceRequest(req *ateapipb.GetAtespaceRequest) error { + if req.GetName() == "" { + return status.Error(codes.InvalidArgument, "name is required") + } + if err := resources.ValidateAtespace(req.GetName()); err != nil { + return status.Error(codes.InvalidArgument, err.Error()) + } + return nil +} diff --git a/cmd/ateapi/internal/controlapi/list_actors.go b/cmd/ateapi/internal/controlapi/list_actors.go index 7f863990c..4c09e8c16 100644 --- a/cmd/ateapi/internal/controlapi/list_actors.go +++ b/cmd/ateapi/internal/controlapi/list_actors.go @@ -18,6 +18,7 @@ import ( "context" "fmt" + "github.com/agent-substrate/substrate/internal/resources" "github.com/agent-substrate/substrate/pkg/proto/ateapipb" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -34,7 +35,7 @@ func (s *Service) ListActors(ctx context.Context, req *ateapipb.ListActorsReques pageSize = maxPageSize } - actors, nextToken, err := s.persistence.ListActors(ctx, pageSize, req.GetPageToken()) + actors, nextToken, err := s.persistence.ListActors(ctx, req.GetAtespace(), pageSize, req.GetPageToken()) 
if err != nil { return nil, fmt.Errorf("while listing actors in db: %w", err) } @@ -45,6 +46,13 @@ func (s *Service) ListActors(ctx context.Context, req *ateapipb.ListActorsReques } func validateListActorsRequest(req *ateapipb.ListActorsRequest) error { + // An empty atespace is allowed here and means "all atespaces" (used by `kubectl ate get actors -A`). + // A non-empty atespace is validated and scopes the listing to that tenant. + if req.GetAtespace() != "" { + if err := resources.ValidateAtespace(req.GetAtespace()); err != nil { + return err + } + } pageSize := req.GetPageSize() if pageSize < 0 { return fmt.Errorf("page_size cannot be negative") diff --git a/cmd/ateapi/internal/controlapi/list_atespaces.go b/cmd/ateapi/internal/controlapi/list_atespaces.go new file mode 100644 index 000000000..27b15e31e --- /dev/null +++ b/cmd/ateapi/internal/controlapi/list_atespaces.go @@ -0,0 +1,30 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package controlapi + +import ( + "context" + "fmt" + + "github.com/agent-substrate/substrate/pkg/proto/ateapipb" +) + +func (s *Service) ListAtespaces(ctx context.Context, req *ateapipb.ListAtespacesRequest) (*ateapipb.ListAtespacesResponse, error) { + atespaces, err := s.persistence.ListAtespaces(ctx) + if err != nil { + return nil, fmt.Errorf("while listing atespaces in db: %w", err) + } + return &ateapipb.ListAtespacesResponse{Atespaces: atespaces}, nil +} diff --git a/cmd/ateapi/internal/controlapi/pause_actor.go b/cmd/ateapi/internal/controlapi/pause_actor.go index 74f023298..3b49167b3 100644 --- a/cmd/ateapi/internal/controlapi/pause_actor.go +++ b/cmd/ateapi/internal/controlapi/pause_actor.go @@ -29,13 +29,13 @@ func (s *Service) PauseActor(ctx context.Context, req *ateapipb.PauseActorReques return nil, err } - actor, err := s.actorWorkflow.PauseActor(ctx, req.GetActorId()) + actor, err := s.actorWorkflow.PauseActor(ctx, req.GetRef().GetAtespace(), req.GetRef().GetName()) if err != nil { if errors.Is(err, store.ErrPersistenceRetry) { return nil, status.Error(codes.Aborted, "concurrent update conflict, please retry") } if errors.Is(err, store.ErrNotFound) { - return nil, status.Errorf(codes.NotFound, "Actor %s not found", req.GetActorId()) + return nil, status.Errorf(codes.NotFound, "Actor %s not found", req.GetRef().GetName()) } return nil, err } @@ -44,8 +44,11 @@ func (s *Service) PauseActor(ctx context.Context, req *ateapipb.PauseActorReques } func validatePauseActorRequest(req *ateapipb.PauseActorRequest) error { - if req.GetActorId() == "" { + if req.GetRef().GetName() == "" { return status.Error(codes.InvalidArgument, "id is required") } + if req.GetRef().GetAtespace() == "" { + return status.Error(codes.InvalidArgument, "atespace is required") + } return nil } diff --git a/cmd/ateapi/internal/controlapi/resume_actor.go b/cmd/ateapi/internal/controlapi/resume_actor.go index 671afb6ed..d810a77c3 100644 --- 
a/cmd/ateapi/internal/controlapi/resume_actor.go +++ b/cmd/ateapi/internal/controlapi/resume_actor.go @@ -29,13 +29,13 @@ func (s *Service) ResumeActor(ctx context.Context, req *ateapipb.ResumeActorRequ return nil, err } - actor, err := s.actorWorkflow.ResumeActor(ctx, req.GetActorId(), req.GetBoot()) + actor, err := s.actorWorkflow.ResumeActor(ctx, req.GetRef().GetAtespace(), req.GetRef().GetName(), req.GetBoot()) if err != nil { if errors.Is(err, store.ErrPersistenceRetry) { return nil, status.Error(codes.Aborted, "concurrent update conflict, please retry") } if errors.Is(err, store.ErrNotFound) { - return nil, status.Errorf(codes.NotFound, "Actor %s not found", req.GetActorId()) + return nil, status.Errorf(codes.NotFound, "Actor %s not found", req.GetRef().GetName()) } return nil, err } @@ -44,8 +44,11 @@ func (s *Service) ResumeActor(ctx context.Context, req *ateapipb.ResumeActorRequ } func validateResumeActorRequest(req *ateapipb.ResumeActorRequest) error { - if req.GetActorId() == "" { + if req.GetRef().GetName() == "" { return status.Error(codes.InvalidArgument, "id is required") } + if req.GetRef().GetAtespace() == "" { + return status.Error(codes.InvalidArgument, "atespace is required") + } return nil } diff --git a/cmd/ateapi/internal/controlapi/suspend_actor.go b/cmd/ateapi/internal/controlapi/suspend_actor.go index c8a88b505..c7ee31fa9 100644 --- a/cmd/ateapi/internal/controlapi/suspend_actor.go +++ b/cmd/ateapi/internal/controlapi/suspend_actor.go @@ -29,13 +29,13 @@ func (s *Service) SuspendActor(ctx context.Context, req *ateapipb.SuspendActorRe return nil, err } - actor, err := s.actorWorkflow.SuspendActor(ctx, req.GetActorId()) + actor, err := s.actorWorkflow.SuspendActor(ctx, req.GetRef().GetAtespace(), req.GetRef().GetName()) if err != nil { if errors.Is(err, store.ErrPersistenceRetry) { return nil, status.Error(codes.Aborted, "concurrent update conflict, please retry") } if errors.Is(err, store.ErrNotFound) { - return nil, 
status.Errorf(codes.NotFound, "Actor %s not found", req.GetActorId()) + return nil, status.Errorf(codes.NotFound, "Actor %s not found", req.GetRef().GetName()) } return nil, err } @@ -44,8 +44,11 @@ func (s *Service) SuspendActor(ctx context.Context, req *ateapipb.SuspendActorRe } func validateSuspendActorRequest(req *ateapipb.SuspendActorRequest) error { - if req.GetActorId() == "" { + if req.GetRef().GetName() == "" { return status.Error(codes.InvalidArgument, "id is required") } + if req.GetRef().GetAtespace() == "" { + return status.Error(codes.InvalidArgument, "atespace is required") + } return nil } diff --git a/cmd/ateapi/internal/controlapi/syncer.go b/cmd/ateapi/internal/controlapi/syncer.go index f0501387d..c16c8dd5e 100644 --- a/cmd/ateapi/internal/controlapi/syncer.go +++ b/cmd/ateapi/internal/controlapi/syncer.go @@ -170,7 +170,7 @@ func (s *WorkerPoolSyncer) releaseActorOnDeadWorker(ctx context.Context, namespa if worker.GetActorId() == "" { return nil } - actor, err := s.persistence.GetActor(ctx, worker.GetActorId()) + actor, err := s.persistence.GetActor(ctx, worker.GetActorAtespace(), worker.GetActorId()) if err != nil { if errors.Is(err, store.ErrNotFound) { return nil diff --git a/cmd/ateapi/internal/controlapi/syncer_test.go b/cmd/ateapi/internal/controlapi/syncer_test.go index c05b6f110..2672c11a8 100644 --- a/cmd/ateapi/internal/controlapi/syncer_test.go +++ b/cmd/ateapi/internal/controlapi/syncer_test.go @@ -180,7 +180,7 @@ func TestSyncer_DeleteBoundWorker_ClearsActor(t *testing.T) { } actorID := "actor-orphan" if err := persistence.CreateActor(ctx, &ateapipb.Actor{ - ActorId: actorID, ActorTemplateNamespace: ns, ActorTemplateName: "tmpl", + ActorId: actorID, Atespace: "team-orphan", ActorTemplateNamespace: ns, ActorTemplateName: "tmpl", Status: ateapipb.Actor_STATUS_RUNNING, AteomPodNamespace: ns, AteomPodName: pod, AteomPodIp: ip, InProgressSnapshot: "gs://snapshots/partial", @@ -196,7 +196,7 @@ func 
TestSyncer_DeleteBoundWorker_ClearsActor(t *testing.T) { t.Fatalf("create actor: %v", err) } w, _ := persistence.GetWorker(ctx, ns, pool, pod) - w.ActorId, w.ActorNamespace, w.ActorTemplate = actorID, ns, "tmpl" + w.ActorId, w.ActorNamespace, w.ActorTemplate, w.ActorAtespace = actorID, ns, "tmpl", "team-orphan" if err := persistence.UpdateWorker(ctx, w, w.Version); err != nil { t.Fatalf("update worker: %v", err) } @@ -206,7 +206,7 @@ func TestSyncer_DeleteBoundWorker_ClearsActor(t *testing.T) { } var got *ateapipb.Actor if err := wait.PollUntilContextTimeout(ctx, 50*time.Millisecond, 2*time.Second, true, func(c context.Context) (bool, error) { - a, gerr := persistence.GetActor(c, actorID) + a, gerr := persistence.GetActor(c, "team-orphan", actorID) if gerr != nil { return false, gerr } diff --git a/cmd/ateapi/internal/controlapi/update_actor.go b/cmd/ateapi/internal/controlapi/update_actor.go index f45e5a338..f48babf8b 100644 --- a/cmd/ateapi/internal/controlapi/update_actor.go +++ b/cmd/ateapi/internal/controlapi/update_actor.go @@ -20,6 +20,7 @@ import ( "fmt" "github.com/agent-substrate/substrate/cmd/ateapi/internal/store" + "github.com/agent-substrate/substrate/internal/resources" "github.com/agent-substrate/substrate/pkg/proto/ateapipb" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -30,10 +31,10 @@ func (s *Service) UpdateActor(ctx context.Context, req *ateapipb.UpdateActorRequ return nil, err } - actor, err := s.persistence.GetActor(ctx, req.GetActorId()) + actor, err := s.persistence.GetActor(ctx, req.GetRef().GetAtespace(), req.GetRef().GetName()) if err != nil { if errors.Is(err, store.ErrNotFound) { - return nil, status.Errorf(codes.NotFound, "Actor %s not found", req.GetActorId()) + return nil, status.Errorf(codes.NotFound, "Actor %s not found", req.GetRef().GetName()) } return nil, fmt.Errorf("while getting actor: %w", err) } @@ -50,9 +51,15 @@ func (s *Service) UpdateActor(ctx context.Context, req *ateapipb.UpdateActorRequ } func 
validateUpdateActorRequest(req *ateapipb.UpdateActorRequest) error { - if req.GetActorId() == "" { + if req.GetRef().GetName() == "" { return status.Error(codes.InvalidArgument, "actor_id is required") } + if req.GetRef().GetAtespace() == "" { + return status.Error(codes.InvalidArgument, "atespace is required") + } + if err := resources.ValidateAtespace(req.GetRef().GetAtespace()); err != nil { + return status.Error(codes.InvalidArgument, err.Error()) + } if err := validateSelector(req.GetWorkerSelector()); err != nil { return status.Error(codes.InvalidArgument, err.Error()) } diff --git a/cmd/ateapi/internal/controlapi/workflow.go b/cmd/ateapi/internal/controlapi/workflow.go index 5066a2c96..ff873a51a 100644 --- a/cmd/ateapi/internal/controlapi/workflow.go +++ b/cmd/ateapi/internal/controlapi/workflow.go @@ -148,10 +148,11 @@ func NewActorWorkflow( } // ResumeActor executes the workflow to resume a suspended actor. Idempotent. -func (w *ActorWorkflow) ResumeActor(ctx context.Context, id string, boot bool) (*ateapipb.Actor, error) { +func (w *ActorWorkflow) ResumeActor(ctx context.Context, atespace, id string, boot bool) (*ateapipb.Actor, error) { input := &ResumeInput{ - ActorID: id, - Boot: boot, + ActorID: id, + Atespace: atespace, + Boot: boot, } state := &ResumeState{} @@ -178,9 +179,10 @@ func (w *ActorWorkflow) ResumeActor(ctx context.Context, id string, boot bool) ( } // SuspendActor executes the workflow to suspend a running actor. Idempotent. -func (w *ActorWorkflow) SuspendActor(ctx context.Context, id string) (*ateapipb.Actor, error) { +func (w *ActorWorkflow) SuspendActor(ctx context.Context, atespace, id string) (*ateapipb.Actor, error) { input := &SuspendInput{ - ActorID: id, + ActorID: id, + Atespace: atespace, } state := &SuspendState{} @@ -207,9 +209,10 @@ func (w *ActorWorkflow) SuspendActor(ctx context.Context, id string) (*ateapipb. } // PauseActor executes the workflow to pause a running actor. Idempotent. 
-func (w *ActorWorkflow) PauseActor(ctx context.Context, id string) (*ateapipb.Actor, error) { +func (w *ActorWorkflow) PauseActor(ctx context.Context, atespace, id string) (*ateapipb.Actor, error) { input := &PauseInput{ - ActorID: id, + ActorID: id, + Atespace: atespace, } state := &PauseState{} diff --git a/cmd/ateapi/internal/controlapi/workflow_pause.go b/cmd/ateapi/internal/controlapi/workflow_pause.go index a7aaa41bc..49f9c4072 100644 --- a/cmd/ateapi/internal/controlapi/workflow_pause.go +++ b/cmd/ateapi/internal/controlapi/workflow_pause.go @@ -32,7 +32,8 @@ import ( // PauseInput holds the immutable parameters requested by the client. type PauseInput struct { - ActorID string + ActorID string + Atespace string } // PauseState holds the mutable state loaded and modified during execution. @@ -52,7 +53,7 @@ func (s *LoadActorForPauseStep) IsComplete(ctx context.Context, input *PauseInpu return false, nil } func (s *LoadActorForPauseStep) Execute(ctx context.Context, input *PauseInput, state *PauseState) error { - actor, err := s.store.GetActor(ctx, input.ActorID) + actor, err := s.store.GetActor(ctx, input.Atespace, input.ActorID) if err != nil { return err } @@ -171,7 +172,7 @@ func (s *FinalizePausedStep) IsComplete(ctx context.Context, input *PauseInput, return state.Actor.GetStatus() == ateapipb.Actor_STATUS_PAUSED && state.Actor.GetAteomPodNamespace() == "", nil } func (s *FinalizePausedStep) Execute(ctx context.Context, input *PauseInput, state *PauseState) error { - latestActor, err := s.store.GetActor(ctx, input.ActorID) + latestActor, err := s.store.GetActor(ctx, input.Atespace, input.ActorID) if err != nil { return err } @@ -198,6 +199,7 @@ func (s *FinalizePausedStep) Execute(ctx context.Context, input *PauseInput, sta worker.ActorNamespace = "" worker.ActorTemplate = "" worker.ActorId = "" + worker.ActorAtespace = "" err = s.store.UpdateWorker(ctx, worker, worker.Version) if err != nil { @@ -207,7 +209,7 @@ func (s *FinalizePausedStep) 
Execute(ctx context.Context, input *PauseInput, sta } // 2. Safely clear ActiveWorker now that the worker object in DB is freed - latestActor, err = s.store.GetActor(ctx, input.ActorID) + latestActor, err = s.store.GetActor(ctx, input.Atespace, input.ActorID) if err != nil { return err } diff --git a/cmd/ateapi/internal/controlapi/workflow_resume.go b/cmd/ateapi/internal/controlapi/workflow_resume.go index ddc877f9d..ba92bc9a7 100644 --- a/cmd/ateapi/internal/controlapi/workflow_resume.go +++ b/cmd/ateapi/internal/controlapi/workflow_resume.go @@ -41,8 +41,9 @@ import ( // ResumeInput holds the immutable parameters requested by the client. type ResumeInput struct { - ActorID string - Boot bool + ActorID string + Atespace string + Boot bool } // ResumeState holds the mutable state loaded and modified during execution. @@ -62,7 +63,7 @@ func (s *LoadActorForResumeStep) IsComplete(ctx context.Context, input *ResumeIn return false, nil } func (s *LoadActorForResumeStep) Execute(ctx context.Context, input *ResumeInput, state *ResumeState) error { - actor, err := s.store.GetActor(ctx, input.ActorID) + actor, err := s.store.GetActor(ctx, input.Atespace, input.ActorID) if err != nil { if errors.Is(err, store.ErrNotFound) { return status.Errorf(codes.NotFound, "Actor %s not found", input.ActorID) @@ -183,6 +184,7 @@ func (s *AssignWorkerStep) Execute(ctx context.Context, input *ResumeInput, stat assignedWorker.ActorId = input.ActorID assignedWorker.ActorNamespace = state.Actor.GetActorTemplateNamespace() assignedWorker.ActorTemplate = state.Actor.GetActorTemplateName() + assignedWorker.ActorAtespace = state.Actor.GetAtespace() if err := s.store.UpdateWorker(ctx, assignedWorker, assignedWorker.Version); err != nil { return err @@ -354,7 +356,7 @@ func (s *FinalizeRunningStep) IsComplete(ctx context.Context, input *ResumeInput return state.Actor.GetStatus() == ateapipb.Actor_STATUS_RUNNING, nil } func (s *FinalizeRunningStep) Execute(ctx context.Context, input *ResumeInput, 
state *ResumeState) error { - latestActor, err := s.store.GetActor(ctx, input.ActorID) + latestActor, err := s.store.GetActor(ctx, input.Atespace, input.ActorID) if err != nil { return err } diff --git a/cmd/ateapi/internal/controlapi/workflow_suspend.go b/cmd/ateapi/internal/controlapi/workflow_suspend.go index 1c0bcec2d..281e958a3 100644 --- a/cmd/ateapi/internal/controlapi/workflow_suspend.go +++ b/cmd/ateapi/internal/controlapi/workflow_suspend.go @@ -33,7 +33,8 @@ import ( // SuspendInput holds the immutable parameters requested by the client. type SuspendInput struct { - ActorID string + ActorID string + Atespace string } // SuspendState holds the mutable state loaded and modified during execution. @@ -53,7 +54,7 @@ func (s *LoadActorForSuspendStep) IsComplete(ctx context.Context, input *Suspend return false, nil } func (s *LoadActorForSuspendStep) Execute(ctx context.Context, input *SuspendInput, state *SuspendState) error { - actor, err := s.store.GetActor(ctx, input.ActorID) + actor, err := s.store.GetActor(ctx, input.Atespace, input.ActorID) if err != nil { return err } @@ -173,7 +174,7 @@ func (s *FinalizeSuspendedStep) IsComplete(ctx context.Context, input *SuspendIn return state.Actor.GetStatus() == ateapipb.Actor_STATUS_SUSPENDED && state.Actor.GetAteomPodNamespace() == "", nil } func (s *FinalizeSuspendedStep) Execute(ctx context.Context, input *SuspendInput, state *SuspendState) error { - latestActor, err := s.store.GetActor(ctx, input.ActorID) + latestActor, err := s.store.GetActor(ctx, input.Atespace, input.ActorID) if err != nil { return err } @@ -197,6 +198,7 @@ func (s *FinalizeSuspendedStep) Execute(ctx context.Context, input *SuspendInput worker.ActorNamespace = "" worker.ActorTemplate = "" worker.ActorId = "" + worker.ActorAtespace = "" err = s.store.UpdateWorker(ctx, worker, worker.Version) if err != nil { @@ -206,7 +208,7 @@ func (s *FinalizeSuspendedStep) Execute(ctx context.Context, input *SuspendInput } // 2. 
Safely clear ActiveWorker now that the worker object in DB is freed - latestActor, err = s.store.GetActor(ctx, input.ActorID) + latestActor, err = s.store.GetActor(ctx, input.Atespace, input.ActorID) if err != nil { return err } diff --git a/cmd/ateapi/internal/store/ateredis/ateredis.go b/cmd/ateapi/internal/store/ateredis/ateredis.go index 025516a50..058a27785 100644 --- a/cmd/ateapi/internal/store/ateredis/ateredis.go +++ b/cmd/ateapi/internal/store/ateredis/ateredis.go @@ -15,7 +15,7 @@ // Package ateredis is an ate storage backend built on Redis. // // Actors are stored in keys of the form -// `actor:<id>`. They are +// `actor:<atespace>:<id>`. They are // stored as DBActor JSON-serialized objects, which lets us manipulate them from // Redis lua. // @@ -86,8 +86,128 @@ func NewPersistence(redisClient *redis.ClusterClient) *Persistence { } } -func actorDBKey(id string) string { - return "actor:" + id +func actorDBKey(atespace, id string) string { + return "actor:" + atespace + ":" + id +} + +// actorScanPattern returns the SCAN match pattern for listing actors. An empty +// atespace lists across all atespaces (actor:*); a non-empty atespace scopes the +// scan to that tenant (actor:<atespace>:*). 
+func actorScanPattern(atespace string) string { + if atespace == "" { + return "actor:*" + } + return "actor:" + atespace + ":*" +} + +func atespaceDBKey(name string) string { + return "atespace:" + name +} + +func (s *Persistence) CreateAtespace(ctx context.Context, atespace *ateapipb.Atespace) error { + dbKey := atespaceDBKey(atespace.GetName()) + dbBytes, err := protojson.Marshal(atespace) + if err != nil { + return fmt.Errorf("in protojson.Marshal: %w", err) + } + ok, err := s.rdb.SetNX(ctx, dbKey, dbBytes, 0).Result() + if err != nil { + return fmt.Errorf("while executing redis set: %w", err) + } + if !ok { + return store.ErrAlreadyExists + } + return nil +} + +func (s *Persistence) GetAtespace(ctx context.Context, name string) (*ateapipb.Atespace, error) { + dbKey := atespaceDBKey(name) + dbBytes, err := s.rdb.Get(ctx, dbKey).Bytes() + if err != nil { + if errors.Is(err, redis.Nil) { + return nil, store.ErrNotFound + } + return nil, fmt.Errorf("while getting atespace key %q: %w", dbKey, err) + } + atespace := &ateapipb.Atespace{} + if err := protojson.Unmarshal(dbBytes, atespace); err != nil { + return nil, fmt.Errorf("while unmarshaling atespace: %w", err) + } + if atespace.GetName() != name { + return nil, fmt.Errorf("(impossible) mismatch between stored name and key %q", dbKey) + } + return atespace, nil +} + +// AtespaceExists reports whether the atespace object exists. This is a plain +// EXISTS check and is NOT atomic with respect to a concurrent DeleteAtespace. 
+func (s *Persistence) AtespaceExists(ctx context.Context, name string) (bool, error) { + n, err := s.rdb.Exists(ctx, atespaceDBKey(name)).Result() + if err != nil { + return false, fmt.Errorf("while checking atespace existence: %w", err) + } + return n > 0, nil +} + +func (s *Persistence) ListAtespaces(ctx context.Context) ([]*ateapipb.Atespace, error) { + var result []*ateapipb.Atespace + var mu sync.Mutex + + err := s.rdb.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error { + iter := master.Scan(ctx, 0, "atespace:*", 0).Iterator() + for iter.Next(ctx) { + key := iter.Val() + getCmd := master.Get(ctx, key) + if getCmd.Err() != nil { + return fmt.Errorf("while getting atespace %q: %w", key, getCmd.Err()) + } + atespace := &ateapipb.Atespace{} + if err := protojson.Unmarshal([]byte(getCmd.Val()), atespace); err != nil { + return fmt.Errorf("in protojson.Unmarshal: %w", err) + } + mu.Lock() + result = append(result, atespace) + mu.Unlock() + } + if err := iter.Err(); err != nil { + return fmt.Errorf("error from iterator: %w", err) + } + return nil + }) + if err != nil { + return nil, fmt.Errorf("while iterating all redis master: %w", err) + } + return result, nil +} + +// DeleteAtespace deletes an empty atespace. Returns store.ErrNotFound if the +// atespace does not exist, or store.ErrFailedPrecondition if any actor still +// lives in it. +func (s *Persistence) DeleteAtespace(ctx context.Context, name string) error { + dbKey := atespaceDBKey(name) + + // Existence first, so a missing atespace returns NotFound, not a silent no-op. + exists, err := s.rdb.Exists(ctx, dbKey).Result() + if err != nil { + return fmt.Errorf("while checking atespace key %q: %w", dbKey, err) + } + if exists == 0 { + return store.ErrNotFound + } + + // Reject a non-empty atespace. 
+ actors, _, err := s.ListActors(ctx, name, 1, "") + if err != nil { + return fmt.Errorf("while checking atespace emptiness: %w", err) + } + if len(actors) > 0 { + return store.ErrFailedPrecondition + } + + if err := s.rdb.Del(ctx, dbKey).Err(); err != nil { + return fmt.Errorf("while deleting atespace key %q: %w", dbKey, err) + } + return nil } func workerDBKey(namespace, poolName, podName string) string { @@ -179,8 +299,8 @@ func (s *Persistence) DebugClearAll(ctx context.Context) error { return err } -func (s *Persistence) GetActor(ctx context.Context, id string) (*ateapipb.Actor, error) { - dbKey := actorDBKey(id) +func (s *Persistence) GetActor(ctx context.Context, atespace, id string) (*ateapipb.Actor, error) { + dbKey := actorDBKey(atespace, id) dbActorBytes, err := s.rdb.Get(ctx, dbKey).Bytes() if err != nil { @@ -195,15 +315,15 @@ func (s *Persistence) GetActor(ctx context.Context, id string) (*ateapipb.Actor, return nil, fmt.Errorf("while unmarshaling actor: %w", err) } - if actor.GetActorId() != id { - return nil, fmt.Errorf("(impossible) mismatch between stored id and key id") + if actor.GetActorId() != id || actor.GetAtespace() != atespace { + return nil, fmt.Errorf("(impossible) mismatch between stored id/atespace and key") } return actor, nil } func (s *Persistence) CreateActor(ctx context.Context, actor *ateapipb.Actor) error { - dbKey := actorDBKey(actor.GetActorId()) + dbKey := actorDBKey(actor.GetAtespace(), actor.GetActorId()) // Clone because we will update the version field, and we don't want to // stomp the caller's copy. 
@@ -347,8 +467,8 @@ func (s *Persistence) DeleteWorker(ctx context.Context, namespace, pool, pod str return nil } -func (s *Persistence) DeleteActor(ctx context.Context, id string) error { - dbKey := actorDBKey(id) +func (s *Persistence) DeleteActor(ctx context.Context, atespace, id string) error { + dbKey := actorDBKey(atespace, id) err := s.rdb.Watch(ctx, func(tx *redis.Tx) error { currentVal, err := tx.Get(ctx, dbKey).Bytes() if err != nil { @@ -385,7 +505,7 @@ func (s *Persistence) DeleteActor(ctx context.Context, id string) error { } func (s *Persistence) UpdateActor(ctx context.Context, actor *ateapipb.Actor, expectedVersion int64) error { - dbKey := actorDBKey(actor.GetActorId()) + dbKey := actorDBKey(actor.GetAtespace(), actor.GetActorId()) // Clone because we will update the version field, and we don't want to // stomp the caller's copy. @@ -412,6 +532,9 @@ func (s *Persistence) UpdateActor(ctx context.Context, actor *ateapipb.Actor, ex if currentActor.GetActorId() != dbActor.GetActorId() { return fmt.Errorf("actor_id is immutable") } + if currentActor.GetAtespace() != dbActor.GetAtespace() { + return fmt.Errorf("atespace is immutable") + } if currentActor.GetActorTemplateNamespace() != dbActor.GetActorTemplateNamespace() { return fmt.Errorf("actor_template_namespace is immutable") } @@ -510,7 +633,10 @@ func hashShardAddr(addr string) string { return hex.EncodeToString(h[:]) } -func (s *Persistence) ListActors(ctx context.Context, pageSize int32, pageTokenStr string) ([]*ateapipb.Actor, string, error) { +// ListActors lists actors, scoped to the given atespace. An empty atespace lists +// across all atespaces (SCAN actor:*); a non-empty atespace restricts the scan to +// that tenant (SCAN actor:<atespace>:*). 
+func (s *Persistence) ListActors(ctx context.Context, atespace string, pageSize int32, pageTokenStr string) ([]*ateapipb.Actor, string, error) { token, err := decodePageToken(pageTokenStr) if err != nil { return nil, "", fmt.Errorf("invalid page token: %w", err) @@ -559,7 +685,7 @@ func (s *Persistence) ListActors(ctx context.Context, pageSize int32, pageTokenS } var keys []string - keys, cursor, err = master.Scan(ctx, cursor, "actor:*", int64(remaining)).Result() + keys, cursor, err = master.Scan(ctx, cursor, actorScanPattern(atespace), int64(remaining)).Result() if err != nil { return nil, "", fmt.Errorf("while scanning shard %s: %w", shardAddr, err) } diff --git a/cmd/ateapi/internal/store/ateredis/ateredis_test.go b/cmd/ateapi/internal/store/ateredis/ateredis_test.go index 779612e6c..6b3c61837 100644 --- a/cmd/ateapi/internal/store/ateredis/ateredis_test.go +++ b/cmd/ateapi/internal/store/ateredis/ateredis_test.go @@ -49,7 +49,7 @@ func TestGetActor_NotFound(t *testing.T) { mr, s, ctx := setupTest(t) defer mr.Close() - _, err := s.GetActor(ctx, "non-existent") + _, err := s.GetActor(ctx, "", "non-existent") if !errors.Is(err, store.ErrNotFound) { t.Errorf("expected ErrNotFound, got %v", err) } @@ -71,7 +71,7 @@ func TestCreateActor_Success(t *testing.T) { t.Fatalf("CreateActor failed: %v", err) } - got, err := s.GetActor(ctx, actor.ActorId) + got, err := s.GetActor(ctx, actor.GetAtespace(), actor.ActorId) if err != nil { t.Fatalf("GetActor failed: %v", err) } @@ -134,7 +134,7 @@ func TestUpdateActor_Success(t *testing.T) { t.Errorf("expected actor.Version to be updated to 2, got %d", actor.Version) } - updated, err := s.GetActor(ctx, actor.ActorId) + updated, err := s.GetActor(ctx, actor.GetAtespace(), actor.ActorId) if err != nil { t.Fatalf("GetActor failed: %v", err) } @@ -163,13 +163,13 @@ func TestUpdateActor_Conflict(t *testing.T) { } // Fetch instance 1 - actor1, err := s.GetActor(ctx, actor.ActorId) + actor1, err := s.GetActor(ctx, actor.GetAtespace(), 
actor.ActorId) if err != nil { t.Fatalf("GetActor failed: %v", err) } // Fetch instance 2 (stale after actor1 updates) - actor2, err := s.GetActor(ctx, actor.ActorId) + actor2, err := s.GetActor(ctx, actor.GetAtespace(), actor.ActorId) if err != nil { t.Fatalf("GetActor failed: %v", err) } @@ -348,12 +348,12 @@ func TestDeleteActor(t *testing.T) { t.Fatalf("CreateActor failed: %v", err) } - err = s.DeleteActor(ctx, "session-1") + err = s.DeleteActor(ctx, "", "session-1") if err != nil { t.Fatalf("DeleteActor failed: %v", err) } - _, err = s.GetActor(ctx, "session-1") + _, err = s.GetActor(ctx, "", "session-1") if !errors.Is(err, store.ErrNotFound) { t.Errorf("expected ErrNotFound after delete, got %v", err) } @@ -375,7 +375,7 @@ func TestDeleteActor_NotSuspended(t *testing.T) { t.Fatalf("CreateActor failed: %v", err) } - err = s.DeleteActor(ctx, "session-1") + err = s.DeleteActor(ctx, "", "session-1") if !errors.Is(err, store.ErrFailedPrecondition) { t.Errorf("expected ErrFailedPrecondition deleting running actor, got %v", err) } @@ -385,7 +385,7 @@ func TestDeleteActor_NotFound(t *testing.T) { mr, s, ctx := setupTest(t) defer mr.Close() - err := s.DeleteActor(ctx, "non-existent") + err := s.DeleteActor(ctx, "", "non-existent") if !errors.Is(err, store.ErrNotFound) { t.Errorf("expected ErrNotFound deleting non-existent actor, got %v", err) } @@ -477,7 +477,7 @@ func TestListActors(t *testing.T) { t.Fatalf("failed to create actor2: %v", err) } - actors, _, err := s.ListActors(ctx, 1000, "") + actors, _, err := s.ListActors(ctx, "", 1000, "") if err != nil { t.Fatalf("ListActors failed: %v", err) } @@ -582,7 +582,7 @@ func TestListActors_Empty(t *testing.T) { mr, s, ctx := setupTest(t) defer mr.Close() - actors, _, err := s.ListActors(ctx, 1000, "") + actors, _, err := s.ListActors(ctx, "", 1000, "") if err != nil { t.Fatalf("ListActors failed: %v", err) } @@ -612,7 +612,7 @@ func TestListActors_Pagination(t *testing.T) { pageToken := "" for { - actors, nextToken, 
err := s.ListActors(ctx, 2, pageToken) + actors, nextToken, err := s.ListActors(ctx, "", 2, pageToken) if err != nil { t.Fatalf("ListActors failed: %v", err) } @@ -833,3 +833,262 @@ func TestAcquireLock_NonReentry(t *testing.T) { t.Errorf("expected second lock acquisition to fail (non-reentrant)") } } + +func TestListActors_ScopedByAtespace(t *testing.T) { + mr, s, ctx := setupTest(t) + defer mr.Close() + + mkActor := func(id, atespace string) *ateapipb.Actor { + return &ateapipb.Actor{ + ActorId: id, + Atespace: atespace, + ActorTemplateNamespace: "ns1", + ActorTemplateName: "tmpl1", + Status: ateapipb.Actor_STATUS_SUSPENDED, + } + } + for _, a := range []*ateapipb.Actor{ + mkActor("a1", "team-a"), + mkActor("a2", "team-a"), + mkActor("b1", "team-b"), + } { + if err := s.CreateActor(ctx, a); err != nil { + t.Fatalf("CreateActor(%s/%s) failed: %v", a.GetAtespace(), a.GetActorId(), err) + } + } + + // List is scoped to one atespace. + teamA, _, err := s.ListActors(ctx, "team-a", 1000, "") + if err != nil { + t.Fatalf("ListActors(team-a) failed: %v", err) + } + if got := actorIDSet(teamA); !got["a1"] || !got["a2"] || got["b1"] || len(got) != 2 { + t.Errorf("ListActors(team-a) = %v, want exactly {a1, a2}", got) + } + + teamB, _, err := s.ListActors(ctx, "team-b", 1000, "") + if err != nil { + t.Fatalf("ListActors(team-b) failed: %v", err) + } + if got := actorIDSet(teamB); !got["b1"] || got["a1"] || len(got) != 1 { + t.Errorf("ListActors(team-b) = %v, want exactly {b1}", got) + } + + // An empty atespace lists across all atespaces (the admin/dev `-A` view). + all, _, err := s.ListActors(ctx, "", 1000, "") + if err != nil { + t.Fatalf("ListActors(all) failed: %v", err) + } + if got := actorIDSet(all); !got["a1"] || !got["a2"] || !got["b1"] || len(got) != 3 { + t.Errorf("ListActors(all) = %v, want exactly {a1, a2, b1}", got) + } + + // Get is scoped too: right atespace hits, wrong/empty atespace misses. 
+ if _, err := s.GetActor(ctx, "team-a", "a1"); err != nil { + t.Errorf("GetActor(team-a, a1) failed: %v", err) + } + if _, err := s.GetActor(ctx, "team-b", "a1"); !errors.Is(err, store.ErrNotFound) { + t.Errorf("GetActor(team-b, a1) = %v, want ErrNotFound", err) + } + if _, err := s.GetActor(ctx, "", "a1"); !errors.Is(err, store.ErrNotFound) { + t.Errorf("GetActor(empty, a1) = %v, want ErrNotFound", err) + } +} + +func actorIDSet(actors []*ateapipb.Actor) map[string]bool { + set := make(map[string]bool, len(actors)) + for _, a := range actors { + set[a.GetActorId()] = true + } + return set +} + +func newTestAtespace(name string) *ateapipb.Atespace { + return &ateapipb.Atespace{Name: name} +} + +func TestCreateAtespace_Success(t *testing.T) { + mr, s, ctx := setupTest(t) + defer mr.Close() + + want := newTestAtespace("team-a") + if err := s.CreateAtespace(ctx, want); err != nil { + t.Fatalf("CreateAtespace failed: %v", err) + } + got, err := s.GetAtespace(ctx, "team-a") + if err != nil { + t.Fatalf("GetAtespace failed: %v", err) + } + if diff := cmp.Diff(want, got, protocmp.Transform()); diff != "" { + t.Errorf("round-trip mismatch (-want +got):\n%s", diff) + } +} + +func TestCreateAtespace_AlreadyExists(t *testing.T) { + mr, s, ctx := setupTest(t) + defer mr.Close() + + if err := s.CreateAtespace(ctx, newTestAtespace("team-a")); err != nil { + t.Fatalf("first CreateAtespace failed: %v", err) + } + if err := s.CreateAtespace(ctx, newTestAtespace("team-a")); !errors.Is(err, store.ErrAlreadyExists) { + t.Errorf("expected ErrAlreadyExists, got %v", err) + } +} + +func TestGetAtespace_NotFound(t *testing.T) { + mr, s, ctx := setupTest(t) + defer mr.Close() + + if _, err := s.GetAtespace(ctx, "nope"); !errors.Is(err, store.ErrNotFound) { + t.Errorf("expected ErrNotFound, got %v", err) + } +} + +func TestAtespaceExists(t *testing.T) { + mr, s, ctx := setupTest(t) + defer mr.Close() + + if ok, err := s.AtespaceExists(ctx, "team-a"); err != nil || ok { + 
t.Fatalf("AtespaceExists before create = (%v, %v), want (false, nil)", ok, err) + } + if err := s.CreateAtespace(ctx, newTestAtespace("team-a")); err != nil { + t.Fatalf("CreateAtespace failed: %v", err) + } + if ok, err := s.AtespaceExists(ctx, "team-a"); err != nil || !ok { + t.Fatalf("AtespaceExists after create = (%v, %v), want (true, nil)", ok, err) + } +} + +func TestListAtespaces(t *testing.T) { + mr, s, ctx := setupTest(t) + defer mr.Close() + + names := []string{"team-a", "team-b", "team-c"} + for _, n := range names { + if err := s.CreateAtespace(ctx, newTestAtespace(n)); err != nil { + t.Fatalf("CreateAtespace(%s) failed: %v", n, err) + } + } + got, err := s.ListAtespaces(ctx) + if err != nil { + t.Fatalf("ListAtespaces failed: %v", err) + } + if len(got) != len(names) { + t.Fatalf("ListAtespaces returned %d atespaces, want %d", len(got), len(names)) + } + gotNames := map[string]bool{} + for _, a := range got { + gotNames[a.GetName()] = true + } + for _, n := range names { + if !gotNames[n] { + t.Errorf("ListAtespaces missing %q; got %v", n, gotNames) + } + } +} + +func TestListAtespaces_Empty(t *testing.T) { + mr, s, ctx := setupTest(t) + defer mr.Close() + + got, err := s.ListAtespaces(ctx) + if err != nil { + t.Fatalf("ListAtespaces failed: %v", err) + } + if len(got) != 0 { + t.Errorf("ListAtespaces on empty store = %v, want empty", got) + } +} + +func TestDeleteAtespace_Empty(t *testing.T) { + mr, s, ctx := setupTest(t) + defer mr.Close() + + if err := s.CreateAtespace(ctx, newTestAtespace("team-a")); err != nil { + t.Fatalf("CreateAtespace failed: %v", err) + } + if err := s.DeleteAtespace(ctx, "team-a"); err != nil { + t.Fatalf("DeleteAtespace failed: %v", err) + } + if _, err := s.GetAtespace(ctx, "team-a"); !errors.Is(err, store.ErrNotFound) { + t.Errorf("after delete, GetAtespace = %v, want ErrNotFound", err) + } +} + +func TestDeleteAtespace_NotFound(t *testing.T) { + mr, s, ctx := setupTest(t) + defer mr.Close() + + if err := 
s.DeleteAtespace(ctx, "nope"); !errors.Is(err, store.ErrNotFound) { + t.Errorf("expected ErrNotFound, got %v", err) + } +} + +func TestDeleteAtespace_NonEmpty_Rejected(t *testing.T) { + mr, s, ctx := setupTest(t) + defer mr.Close() + + if err := s.CreateAtespace(ctx, newTestAtespace("team-a")); err != nil { + t.Fatalf("CreateAtespace failed: %v", err) + } + if err := s.CreateActor(ctx, &ateapipb.Actor{ActorId: "id1", Atespace: "team-a", Status: ateapipb.Actor_STATUS_SUSPENDED}); err != nil { + t.Fatalf("CreateActor failed: %v", err) + } + if err := s.DeleteAtespace(ctx, "team-a"); !errors.Is(err, store.ErrFailedPrecondition) { + t.Errorf("DeleteAtespace on non-empty = %v, want ErrFailedPrecondition", err) + } + // The atespace must survive a rejected delete. + if _, err := s.GetAtespace(ctx, "team-a"); err != nil { + t.Errorf("atespace should still exist after rejected delete, got %v", err) + } +} + +func TestDeleteAtespace_EmptyAfterActorsRemoved(t *testing.T) { + mr, s, ctx := setupTest(t) + defer mr.Close() + + if err := s.CreateAtespace(ctx, newTestAtespace("team-a")); err != nil { + t.Fatalf("CreateAtespace failed: %v", err) + } + if err := s.CreateActor(ctx, &ateapipb.Actor{ActorId: "id1", Atespace: "team-a", Status: ateapipb.Actor_STATUS_SUSPENDED}); err != nil { + t.Fatalf("CreateActor failed: %v", err) + } + if err := s.DeleteAtespace(ctx, "team-a"); !errors.Is(err, store.ErrFailedPrecondition) { + t.Fatalf("expected rejection while non-empty, got %v", err) + } + if err := s.DeleteActor(ctx, "team-a", "id1"); err != nil { + t.Fatalf("DeleteActor failed: %v", err) + } + if err := s.DeleteAtespace(ctx, "team-a"); err != nil { + t.Errorf("DeleteAtespace after actor removed = %v, want nil (re-scan should find it empty)", err) + } +} + +func TestDeleteAtespace_EmptyWhileOtherTenantNonEmpty(t *testing.T) { + mr, s, ctx := setupTest(t) + defer mr.Close() + + if err := s.CreateAtespace(ctx, newTestAtespace("team-a")); err != nil { + 
t.Fatalf("CreateAtespace(team-a) failed: %v", err) + } + if err := s.CreateAtespace(ctx, newTestAtespace("team-b")); err != nil { + t.Fatalf("CreateAtespace(team-b) failed: %v", err) + } + // Actor lives ONLY in team-b. + if err := s.CreateActor(ctx, &ateapipb.Actor{ActorId: "id1", Atespace: "team-b", Status: ateapipb.Actor_STATUS_SUSPENDED}); err != nil { + t.Fatalf("CreateActor failed: %v", err) + } + + // team-a is empty → delete must succeed. + if err := s.DeleteAtespace(ctx, "team-a"); err != nil { + t.Errorf("DeleteAtespace(team-a, empty) = %v, want nil (must not be blocked by team-b's actor)", err) + } + if _, err := s.GetAtespace(ctx, "team-a"); !errors.Is(err, store.ErrNotFound) { + t.Errorf("after delete, GetAtespace(team-a) = %v, want ErrNotFound", err) + } + // team-b is still non-empty → still rejected. + if err := s.DeleteAtespace(ctx, "team-b"); !errors.Is(err, store.ErrFailedPrecondition) { + t.Errorf("DeleteAtespace(team-b, non-empty) = %v, want ErrFailedPrecondition", err) + } +} diff --git a/cmd/ateapi/internal/store/store.go b/cmd/ateapi/internal/store/store.go index 11ed7b11d..5de4bdc30 100644 --- a/cmd/ateapi/internal/store/store.go +++ b/cmd/ateapi/internal/store/store.go @@ -39,8 +39,8 @@ var ( // Interface defines the contract for the persistence layer storing actor state. type Interface interface { - // Fetches an actor by id. Returns ErrNotFound if missing. - GetActor(ctx context.Context, id string) (*ateapipb.Actor, error) + // Fetches an actor by (atespace, id). Returns ErrNotFound if missing. + GetActor(ctx context.Context, atespace, id string) (*ateapipb.Actor, error) // Stores a new actor in suspended state. Returns ErrAlreadyExists if key is taken. CreateActor(ctx context.Context, actor *ateapipb.Actor) error @@ -49,10 +49,27 @@ type Interface interface { UpdateActor(ctx context.Context, actor *ateapipb.Actor, expectedVersion int64) error // Removes an actor. Returns ErrNotFound if missing, or ErrFailedPrecondition if not suspended. 
- DeleteActor(ctx context.Context, id string) error + DeleteActor(ctx context.Context, atespace, id string) error - // Lists all known actors. Returns a page of actors and a next page token. - ListActors(ctx context.Context, pageSize int32, pageToken string) ([]*ateapipb.Actor, string, error) + // Lists actors in the given atespace (scoped scan), or across ALL atespaces if atespace is + // empty. Returns a page of actors and a next page token. + ListActors(ctx context.Context, atespace string, pageSize int32, pageToken string) ([]*ateapipb.Actor, string, error) + + // Stores a new atespace. Returns ErrAlreadyExists if the name is taken. + CreateAtespace(ctx context.Context, atespace *ateapipb.Atespace) error + + // Fetches an atespace by name. Returns ErrNotFound if missing. + GetAtespace(ctx context.Context, name string) (*ateapipb.Atespace, error) + + // Lists all atespaces. Returns nil if none found. + ListAtespaces(ctx context.Context) ([]*ateapipb.Atespace, error) + + // AtespaceExists reports whether the atespace object exists. + AtespaceExists(ctx context.Context, name string) (bool, error) + + // Removes an empty atespace. Returns ErrNotFound if missing, or + // ErrFailedPrecondition if any actor:<name>:* key still exists. + DeleteAtespace(ctx context.Context, name string) error // Fetches worker state by namespace, pool, and pod name. Returns ErrNotFound if missing. 
GetWorker(ctx context.Context, namespace, pool, pod string) (*ateapipb.Worker, error) diff --git a/cmd/atecontroller/internal/controllers/actortemplate_controller.go b/cmd/atecontroller/internal/controllers/actortemplate_controller.go index 372890116..085700b0c 100644 --- a/cmd/atecontroller/internal/controllers/actortemplate_controller.go +++ b/cmd/atecontroller/internal/controllers/actortemplate_controller.go @@ -19,9 +19,12 @@ import ( "fmt" "time" + "github.com/agent-substrate/substrate/internal/resources" atev1alpha1 "github.com/agent-substrate/substrate/pkg/api/v1alpha1" "github.com/agent-substrate/substrate/pkg/proto/ateapipb" "github.com/google/uuid" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" k8errors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -71,12 +74,18 @@ func (r *ActorTemplateReconciler) Reconcile(ctx context.Context, req ctrl.Reques case atev1alpha1.PhaseInitial: actorID := uuid.NewString() + // Golden actors live in the reserved ate-golden system atespace. 
+ _, err := r.AteClient.CreateAtespace(ctx, &ateapipb.CreateAtespaceRequest{Name: resources.GoldenActorAtespace}) + if err != nil && status.Code(err) != codes.AlreadyExists { + return ctrl.Result{}, fmt.Errorf("while ensuring atespace %q: %w", resources.GoldenActorAtespace, err) + } + createReq := &ateapipb.CreateActorRequest{ - ActorId: actorID, + Ref: &ateapipb.ActorReference{Atespace: resources.GoldenActorAtespace, Name: actorID}, ActorTemplateNamespace: at.ObjectMeta.Namespace, ActorTemplateName: at.ObjectMeta.Name, } - _, err := r.AteClient.CreateActor(ctx, createReq) + _, err = r.AteClient.CreateActor(ctx, createReq) if err != nil { return ctrl.Result{}, fmt.Errorf("while creating golden actor: %w", err) } @@ -101,7 +110,7 @@ func (r *ActorTemplateReconciler) Reconcile(ctx context.Context, req ctrl.Reques // TODO: Maybe this should go through a different RPC dedicated to // booting an actor from scratch. resumeReq := &ateapipb.ResumeActorRequest{ - ActorId: at.Status.GoldenActorID, + Ref: &ateapipb.ActorReference{Atespace: resources.GoldenActorAtespace, Name: at.Status.GoldenActorID}, } _, err := r.AteClient.ResumeActor(ctx, resumeReq) if err != nil { @@ -127,7 +136,7 @@ func (r *ActorTemplateReconciler) Reconcile(ctx context.Context, req ctrl.Reques // from it. req := &ateapipb.SuspendActorRequest{ - ActorId: at.Status.GoldenActorID, + Ref: &ateapipb.ActorReference{Atespace: resources.GoldenActorAtespace, Name: at.Status.GoldenActorID}, } resp, err := r.AteClient.SuspendActor(ctx, req) if err != nil { diff --git a/cmd/atenet/internal/dns/corefile.go b/cmd/atenet/internal/dns/corefile.go index 7e243f93c..618248cf1 100644 --- a/cmd/atenet/internal/dns/corefile.go +++ b/cmd/atenet/internal/dns/corefile.go @@ -39,11 +39,12 @@ func buildTemplate() string { directives = append(directives, "ready :8181") directives = append(directives, "reload") - // Construct match pattern for <actor-id>.<suffix>. + // Construct match pattern for <actor-id>.<atespace>.<suffix>. 
Both the + // actor id and the atespace are DNS-1123 labels (same regex). directives = append(directives, fmt.Sprintf("template IN A %s {", resources.ActorDNSSuffix)) - dnsDomainParts := strings.Split("."+resources.ActorDNSSuffix+".", ".") - dnsDomainRef := strings.Join(dnsDomainParts, `\.`) - directives = append(directives, fmt.Sprintf(` match "^%s%s$"`, resources.ActorIDRegexPattern, dnsDomainRef)) + // Escape the suffix's dots so they match literally; the final \. matches the FQDN's trailing dot. + escapedSuffix := strings.ReplaceAll(resources.ActorDNSSuffix, ".", `\.`) + directives = append(directives, fmt.Sprintf(` match "^%s\.%s\.%s\.$"`, resources.ActorIDRegexPattern, resources.ActorIDRegexPattern, escapedSuffix)) // Note the %s -- this will be filled with the router IP. directives = append(directives, ` answer "{{ .Name }} 60 IN A %s"`) directives = append(directives, "}") diff --git a/cmd/atenet/internal/dns/corefile_test.go b/cmd/atenet/internal/dns/corefile_test.go index df565ad06..a8cffcf27 100644 --- a/cmd/atenet/internal/dns/corefile_test.go +++ b/cmd/atenet/internal/dns/corefile_test.go @@ -38,7 +38,7 @@ func TestMakeCoreFile(t *testing.T) { "ready :8181", "reload", "template IN A actors.resources.substrate.ate.dev {", - `match "^` + resources.ActorIDRegexPattern + `\.actors\.resources\.substrate\.ate\.dev\.$"`, + `match "^` + resources.ActorIDRegexPattern + `\.` + resources.ActorIDRegexPattern + `\.actors\.resources\.substrate\.ate\.dev\.$"`, `answer "{{ .Name }} 60 IN A 10.240.0.10"`, }, }, diff --git a/cmd/atenet/internal/router/extproc.go b/cmd/atenet/internal/router/extproc.go index 3b3664774..ac5b5d5b8 100644 --- a/cmd/atenet/internal/router/extproc.go +++ b/cmd/atenet/internal/router/extproc.go @@ -142,14 +142,14 @@ func (s *ExtProcServer) handleRequestHeaders( ctx, span := otel.Tracer(routerServiceName).Start(ctx, "ExtProc.RequestHeaders") defer span.End() - actorID, err := parseActorID(metadata.host) + atespace, actorID, err := 
parseActorRef(metadata.host) if err != nil { // Host is invalid, respond with 404. return nil, metadata, "", "", "", invalidHostErr(metadata.host, err) } - slog.InfoContext(ctx, "ResumeActor", slog.String("actorID", actorID)) - actor, err := s.resumer.ResumeActor(ctx, actorID) + slog.InfoContext(ctx, "ResumeActor", slog.String("atespace", atespace), slog.String("actorID", actorID)) + actor, err := s.resumer.ResumeActor(ctx, atespace, actorID) if err != nil { return nil, metadata, "", "", "", mapResumeError(actorID, err) } @@ -161,6 +161,7 @@ func (s *ExtProcServer) handleRequestHeaders( workerIP := actor.GetAteomPodIp() slog.InfoContext(ctx, "ResumeActor result", + slog.String("atespace", atespace), slog.String("actorID", actorID), slog.String("status", actor.GetStatus().String()), slog.String("workerIP", workerIP)) diff --git a/cmd/atenet/internal/router/extproc_in.go b/cmd/atenet/internal/router/extproc_in.go index ae3e1b4d2..aa15684d3 100644 --- a/cmd/atenet/internal/router/extproc_in.go +++ b/cmd/atenet/internal/router/extproc_in.go @@ -15,7 +15,6 @@ package router import ( - "fmt" "net" "strings" @@ -57,21 +56,17 @@ func newRequestMetadata(headers []*corev3.HeaderValue) *requestMetadata { } } -func parseActorID(host string) (string, error) { - var err error +// parseActorRef extracts the (atespace, actor id) an incoming request is +// addressed to from its Host/:authority, which has the form +// "<actor-id>.<atespace>.actors.resources.substrate.ate.dev" (optionally with a +// port). The atespace is required because an actor id is only unique within its +// atespace. 
+func parseActorRef(host string) (atespace, actorID string, err error) { if strings.Contains(host, ":") { host, _, err = net.SplitHostPort(host) + if err != nil { + return "", "", err + } } - if err != nil { - return "", err - } - actorID, found := strings.CutSuffix(strings.TrimSuffix(host, "."), "."+resources.ActorDNSSuffix) - if !found { - return "", fmt.Errorf("invalid actor_id: must end with %s, got %q", resources.ActorDNSSuffix, host) - } - if err := resources.ValidateActorID(actorID); err != nil { - return "", err - } - - return actorID, nil + return resources.ParseActorDNSName(host) } diff --git a/cmd/atenet/internal/router/extproc_in_test.go b/cmd/atenet/internal/router/extproc_in_test.go index 901d98976..a23d47590 100644 --- a/cmd/atenet/internal/router/extproc_in_test.go +++ b/cmd/atenet/internal/router/extproc_in_test.go @@ -119,60 +119,68 @@ func TestExtractMetadata(t *testing.T) { } } -func TestParseActorID(t *testing.T) { +func TestParseActorRef(t *testing.T) { tests := []struct { - name string - host string - wantID string - wantErr bool + name string + host string + wantAtespace string + wantID string + wantErr bool }{ { - name: "valid host without port", - host: "my-actor.actors.resources.substrate.ate.dev", - wantID: "my-actor", - wantErr: false, + name: "valid host without port", + host: "my-actor.team-a.actors.resources.substrate.ate.dev", + wantAtespace: "team-a", + wantID: "my-actor", + wantErr: false, + }, + { + name: "valid host with port", + host: "my-actor.team-a.actors.resources.substrate.ate.dev:8443", + wantAtespace: "team-a", + wantID: "my-actor", + wantErr: false, }, { - name: "valid host with port", - host: "my-actor.actors.resources.substrate.ate.dev:8443", - wantID: "my-actor", - wantErr: false, + name: "valid host with trailing dot", + host: "my-actor.team-a.actors.resources.substrate.ate.dev.", + wantAtespace: "team-a", + wantID: "my-actor", + wantErr: false, }, { - name: "valid host with trailing dot", - host: 
"my-actor.actors.resources.substrate.ate.dev.", - wantID: "my-actor", - wantErr: false, + name: "valid host with trailing dot and port", + host: "my-actor.team-a.actors.resources.substrate.ate.dev.:8080", + wantAtespace: "team-a", + wantID: "my-actor", + wantErr: false, }, { - name: "valid host with trailing dot and port", - host: "my-actor.actors.resources.substrate.ate.dev.:8080", - wantID: "my-actor", - wantErr: false, + name: "missing atespace label", + host: "my-actor.actors.resources.substrate.ate.dev", + wantErr: true, }, { name: "invalid suffix", - host: "my-actor.example.com", - wantID: "", + host: "my-actor.team-a.example.com", wantErr: true, }, { name: "invalid host port format", - host: "my-actor.actors.resources.substrate.ate.dev:invalid:port", - wantID: "", + host: "my-actor.team-a.actors.resources.substrate.ate.dev:invalid:port", wantErr: true, }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - gotID, err := parseActorID(tc.host) + gotAtespace, gotID, err := parseActorRef(tc.host) if (err != nil) != tc.wantErr { - t.Errorf("parseActorID(%q) error = %v, wantErr %v", tc.host, err, tc.wantErr) + t.Errorf("parseActorRef(%q) error = %v, wantErr %v", tc.host, err, tc.wantErr) return } - if gotID != tc.wantID { - t.Errorf("parseActorID(%q) gotID = %v, want %v", tc.host, gotID, tc.wantID) + if gotAtespace != tc.wantAtespace || gotID != tc.wantID { + t.Errorf("parseActorRef(%q) = (%q, %q), want (%q, %q)", tc.host, gotAtespace, gotID, tc.wantAtespace, tc.wantID) } }) } diff --git a/cmd/atenet/internal/router/extproc_test.go b/cmd/atenet/internal/router/extproc_test.go index 52f345035..3f28d27ef 100644 --- a/cmd/atenet/internal/router/extproc_test.go +++ b/cmd/atenet/internal/router/extproc_test.go @@ -61,7 +61,7 @@ func TestHandleRequestHeadersDoesNotLogSensitiveData(t *testing.T) { Headers: &corev3.HeaderMap{ Headers: []*corev3.HeaderValue{ {Key: ":path", Value: "/api/v1/reset?token=" + secret}, - {Key: ":authority", Value: testUUID + 
".actors.resources.substrate.ate.dev"}, + {Key: ":authority", Value: testUUID + ".team-a.actors.resources.substrate.ate.dev"}, {Key: ":method", Value: "POST"}, {Key: "authorization", Value: "Bearer " + secret}, {Key: "cookie", Value: "session=" + secret}, @@ -107,12 +107,12 @@ func TestExtProcHeadersEvaluation(t *testing.T) { name: "invalid host returns 404 identifying the host", authority: "invalid-host.com", expectErr: true, - expectedErrStr: `invalid host "invalid-host.com": invalid actor_id: must end with actors.resources.substrate.ate.dev, got "invalid-host.com"`, + expectedErrStr: `invalid host "invalid-host.com": invalid actor DNS name: must end with actors.resources.substrate.ate.dev, got "invalid-host.com"`, expectedStatus: envoy_type.StatusCode_NotFound, }, { name: "non-gRPC resume error collapses to 500 without leaking detail", - authority: testUUID + ".actors.resources.substrate.ate.dev", + authority: testUUID + ".team-a.actors.resources.substrate.ate.dev", resumeErr: errors.New("resume failed with sensitive detail"), expectErr: true, expectedErrStr: `error resuming actor "123e4567-e89b-12d3-a456-426614174000"`, @@ -120,7 +120,7 @@ func TestExtProcHeadersEvaluation(t *testing.T) { }, { name: "FailedPrecondition maps to 503 with preserved desc", - authority: testUUID + ".actors.resources.substrate.ate.dev", + authority: testUUID + ".team-a.actors.resources.substrate.ate.dev", resumeErr: status.Error(codes.FailedPrecondition, "no free workers available"), expectErr: true, expectedErrStr: `actor "123e4567-e89b-12d3-a456-426614174000" unavailable: no free workers available`, @@ -128,7 +128,7 @@ func TestExtProcHeadersEvaluation(t *testing.T) { }, { name: "NotFound maps to 404", - authority: testUUID + ".actors.resources.substrate.ate.dev", + authority: testUUID + ".team-a.actors.resources.substrate.ate.dev", resumeErr: status.Error(codes.NotFound, "actor missing"), expectErr: true, expectedErrStr: `actor "123e4567-e89b-12d3-a456-426614174000" not found`, @@ 
-136,7 +136,7 @@ func TestExtProcHeadersEvaluation(t *testing.T) { }, { name: "Unavailable maps to 503", - authority: testUUID + ".actors.resources.substrate.ate.dev", + authority: testUUID + ".team-a.actors.resources.substrate.ate.dev", resumeErr: status.Error(codes.Unavailable, "control-plane down"), expectErr: true, expectedErrStr: `actor "123e4567-e89b-12d3-a456-426614174000" unavailable`, @@ -144,7 +144,7 @@ func TestExtProcHeadersEvaluation(t *testing.T) { }, { name: "DeadlineExceeded maps to 504", - authority: testUUID + ".actors.resources.substrate.ate.dev", + authority: testUUID + ".team-a.actors.resources.substrate.ate.dev", resumeErr: status.Error(codes.DeadlineExceeded, "deadline"), expectErr: true, expectedErrStr: `actor "123e4567-e89b-12d3-a456-426614174000" request timed out`, @@ -152,7 +152,7 @@ func TestExtProcHeadersEvaluation(t *testing.T) { }, { name: "Bad Actor IP from resume returns 500 without leaking IP", - authority: testUUID + ".actors.resources.substrate.ate.dev", + authority: testUUID + ".team-a.actors.resources.substrate.ate.dev", resumeResp: &ateapipb.ResumeActorResponse{ Actor: &ateapipb.Actor{ AteomPodIp: "invalid-ip", @@ -164,7 +164,7 @@ func TestExtProcHeadersEvaluation(t *testing.T) { }, { name: "Successful resume", - authority: testUUID + ".actors.resources.substrate.ate.dev", + authority: testUUID + ".team-a.actors.resources.substrate.ate.dev", resumeResp: &ateapipb.ResumeActorResponse{ Actor: &ateapipb.Actor{ AteomPodIp: "10.0.0.52", @@ -179,8 +179,8 @@ func TestExtProcHeadersEvaluation(t *testing.T) { t.Run(tc.name, func(t *testing.T) { clientMock := &mockClient{ resumeFn: func(ctx context.Context, in *ateapipb.ResumeActorRequest, opts ...grpc.CallOption) (*ateapipb.ResumeActorResponse, error) { - if in.GetActorId() != testUUID { - t.Errorf("unexpected identifier parsed in test context: %s", in.GetActorId()) + if in.GetRef().GetName() != testUUID { + t.Errorf("unexpected identifier parsed in test context: %s", 
in.GetRef().GetName()) } if tc.resumeErr != nil { return nil, tc.resumeErr diff --git a/cmd/atenet/internal/router/resumer.go b/cmd/atenet/internal/router/resumer.go index 5ed2ad040..872927ba9 100644 --- a/cmd/atenet/internal/router/resumer.go +++ b/cmd/atenet/internal/router/resumer.go @@ -41,13 +41,17 @@ func NewActorResumer(apiClient ateapipb.ControlClient) *ActorResumer { } // ResumeActor ensures the requested actor is running. It deduplicates concurrent -// requests within the process and retries when needed. -func (r *ActorResumer) ResumeActor(ctx context.Context, actorID string) (*ateapipb.Actor, error) { +// requests within the process and retries when needed. The actor is addressed by +// (atespace, actorID) since an actor id is only unique within its atespace. +func (r *ActorResumer) ResumeActor(ctx context.Context, atespace, actorID string) (*ateapipb.Actor, error) { ctx, span := otel.Tracer(routerServiceName).Start(ctx, "ResumeActor", - trace.WithAttributes(attribute.String("actor_id", actorID))) + trace.WithAttributes( + attribute.String("atespace", atespace), + attribute.String("actor_id", actorID), + )) defer span.End() - ch := r.flight.DoChan(actorID, func() (interface{}, error) { + ch := r.flight.DoChan(atespace+"/"+actorID, func() (interface{}, error) { // We detach the context from the first caller using a fixed background timeout. // This guarantees that if Caller 1 disconnects or times out, the underlying // resume operation continues running for Caller 2 and Caller 3 without failing. 
@@ -66,7 +70,7 @@ func (r *ActorResumer) ResumeActor(ctx context.Context, actorID string) (*ateapi err := wait.ExponentialBackoffWithContext(bgCtx, backoff, func(ctx context.Context) (bool, error) { var err error resumeResp, err = r.apiClient.ResumeActor(ctx, &ateapipb.ResumeActorRequest{ - ActorId: actorID, + Ref: &ateapipb.ActorReference{Atespace: atespace, Name: actorID}, }) if err == nil { return true, nil diff --git a/cmd/atenet/internal/router/resumer_test.go b/cmd/atenet/internal/router/resumer_test.go index 8e10f7f57..061023d58 100644 --- a/cmd/atenet/internal/router/resumer_test.go +++ b/cmd/atenet/internal/router/resumer_test.go @@ -40,6 +40,7 @@ func (m *resumerMockClient) ResumeActor(ctx context.Context, in *ateapipb.Resume func TestActorResumer_ResumeActor(t *testing.T) { const testActorID = "actor-a" + const testAtespace = "team-a" const expectedIP = "10.0.0.52" t.Run("SuspendedResumedSuccessfully", func(t *testing.T) { @@ -58,7 +59,7 @@ func TestActorResumer_ResumeActor(t *testing.T) { } resumer := NewActorResumer(mock) - actor, err := resumer.ResumeActor(context.Background(), testActorID) + actor, err := resumer.ResumeActor(context.Background(), testAtespace, testActorID) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -89,7 +90,7 @@ func TestActorResumer_ResumeActor(t *testing.T) { } resumer := NewActorResumer(mock) - actor, err := resumer.ResumeActor(context.Background(), testActorID) + actor, err := resumer.ResumeActor(context.Background(), testAtespace, testActorID) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -109,7 +110,7 @@ func TestActorResumer_ResumeActor(t *testing.T) { } resumer := NewActorResumer(mock) - _, err := resumer.ResumeActor(context.Background(), testActorID) + _, err := resumer.ResumeActor(context.Background(), testAtespace, testActorID) if got := status.Code(err); got != codes.NotFound { t.Errorf("expected gRPC code NotFound, got %v (err=%v)", got, err) } @@ -146,7 +147,7 @@ func 
TestActorResumer_ResumeActor(t *testing.T) { for i := 0; i < concurrentRequests; i++ { go func(idx int) { defer wg.Done() - results[idx], errs[idx] = resumer.ResumeActor(context.Background(), testActorID) + results[idx], errs[idx] = resumer.ResumeActor(context.Background(), testAtespace, testActorID) }(i) } wg.Wait() diff --git a/cmd/kubectl-ate/README.md b/cmd/kubectl-ate/README.md index b45679ae2..557650511 100644 --- a/cmd/kubectl-ate/README.md +++ b/cmd/kubectl-ate/README.md @@ -74,23 +74,30 @@ These flags can be appended to any command: List and inspect the state of actors and workers across the cluster. ```bash -# List all actors in a clean table format -kubectl ate get actors +# List actors in one atespace (tenant); -a is shorthand for --atespace +kubectl ate get actors --atespace <atespace> +kubectl ate get actors -a <atespace> + +# List actors across all atespaces +kubectl ate get actors -A # Get a specific actor by ID and output as raw YAML -kubectl ate get actor <actor-id> -o yaml +kubectl ate get actor <actor-id> --atespace <atespace> -o yaml # List all physical workers and see which actors are assigned to them kubectl ate get workers ``` +> **Note:** `get actors` requires either `--atespace <atespace>` / `-a <atespace>` (one tenant) or `-A`/`--all-atespaces` (all tenants) — there is no default atespace. Getting a single actor always requires `--atespace`/`-a`, since an actor is addressed by `(atespace, id)`. `-a` (lower-case) scopes to one atespace; `-A` (upper-case) spans all. + > **Note:** Actors and workers are not Kubernetes CRDs — they live in the Substrate control plane (valkey/redis), not `etcd`. `kubectl get actor` and `kubectl get worker` will not return anything; only `kubectl ate get …` queries the control plane. `kubectl get actortemplate` and `kubectl get workerpool` *do* work, because those are CRDs. #### `kubectl ate get actor` output columns | Column | Meaning | |---|---| -| `NAMESPACE` | The namespace of the `ActorTemplate` the actor was created from. 
| +| `ATESPACE` | The atespace (tenant boundary) the actor belongs to. Part of the actor's identity; folded into the storage key as `actor:<atespace>:<id>`. | +| `TEMPLATE NS` | The namespace of the `ActorTemplate` the actor was created from (distinct from `ATESPACE`). | | `TEMPLATE` | The `ActorTemplate` name. | | `ID` | Actor ID. User-provided for application actors; UUID for the golden actor that each template materialises during `ResumeGoldenActor`. | | `STATUS` | One of `STATUS_RESUMING`, `STATUS_RUNNING`, `STATUS_SUSPENDING`, `STATUS_SUSPENDED`. | @@ -108,22 +115,44 @@ kubectl ate get workers | `STATUS` | `FREE` (idle, ready to receive an actor) or `ASSIGNED` (currently hosting an actor). | | `ASSIGNED ACTOR` | If `STATUS=ASSIGNED`, the actor reference `/