From ce60e43e757d25cba69da698db42733a87395282 Mon Sep 17 00:00:00 2001 From: Evan Date: Mon, 22 Jun 2026 17:59:53 +0800 Subject: [PATCH] feat: Add image cache #0 --- .../controlapi/report_node_image_cache.go | 62 ++++ .../report_node_image_cache_test.go | 58 ++++ .../internal/controlapi/workflow_resume.go | 96 +++++- .../controlapi/workflow_resume_test.go | 85 +++++ .../internal/store/ateredis/ateredis.go | 43 +++ .../internal/store/ateredis/ateredis_test.go | 29 ++ cmd/ateapi/internal/store/store.go | 8 + cmd/atelet/cache_reporter.go | 71 ++++ cmd/atelet/cache_reporter_test.go | 57 +++ .../memorypullcache/memorypullcache.go | 48 ++- .../memorypullcache/memorypullcache_test.go | 28 ++ cmd/atelet/main.go | 29 ++ pkg/proto/ateapipb/ateapi.pb.go | 326 +++++++++++++----- pkg/proto/ateapipb/ateapi.proto | 16 + pkg/proto/ateapipb/ateapi_grpc.pb.go | 62 +++- 15 files changed, 918 insertions(+), 100 deletions(-) create mode 100644 cmd/ateapi/internal/controlapi/report_node_image_cache.go create mode 100644 cmd/ateapi/internal/controlapi/report_node_image_cache_test.go create mode 100644 cmd/atelet/cache_reporter.go create mode 100644 cmd/atelet/cache_reporter_test.go diff --git a/cmd/ateapi/internal/controlapi/report_node_image_cache.go b/cmd/ateapi/internal/controlapi/report_node_image_cache.go new file mode 100644 index 000000000..623b849d2 --- /dev/null +++ b/cmd/ateapi/internal/controlapi/report_node_image_cache.go @@ -0,0 +1,62 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controlapi + +import ( + "context" + "sort" + "time" + + "github.com/agent-substrate/substrate/pkg/proto/ateapipb" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" +) + +// nodeImageCacheTTL allows several reports to be missed without immediately +// discarding useful affinity information. Stale data affects performance only: +// atelet still pulls an image normally if the scheduler predicted a cache hit. +const nodeImageCacheTTL = 2 * time.Minute + +func (s *Service) ReportNodeImageCache(ctx context.Context, req *ateapipb.ReportNodeImageCacheRequest) (*ateapipb.ReportNodeImageCacheResponse, error) { + cache := req.GetCache() + if cache.GetNodeName() == "" { + return nil, status.Error(codes.InvalidArgument, "cache.node_name is required") + } + if cache.GetAteletPodUid() == "" { + return nil, status.Error(codes.InvalidArgument, "cache.atelet_pod_uid is required") + } + + cache = proto.Clone(cache).(*ateapipb.NodeImageCache) + // Reports are authoritative snapshots. Normalize them to keep storage and + // scheduler comparisons deterministic and avoid duplicate digest entries. + dedup := make(map[string]struct{}, len(cache.GetImageDigests())) + for _, digest := range cache.GetImageDigests() { + if digest == "" { + return nil, status.Error(codes.InvalidArgument, "cache.image_digests must not contain an empty digest") + } + dedup[digest] = struct{}{} + } + cache.ImageDigests = cache.ImageDigests[:0] + for digest := range dedup { + cache.ImageDigests = append(cache.ImageDigests, digest) + } + sort.Strings(cache.ImageDigests) + + if err := s.persistence.SetNodeImageCache(ctx, cache, nodeImageCacheTTL); err != nil { + return nil, err + } + return &ateapipb.ReportNodeImageCacheResponse{}, nil +} diff --git a/cmd/ateapi/internal/controlapi/report_node_image_cache_test.go b/cmd/ateapi/internal/controlapi/report_node_image_cache_test.go new file mode 100644 index 000000000..9a4b026d0 --- /dev/null +++ b/cmd/ateapi/internal/controlapi/report_node_image_cache_test.go @@ -0,0 +1,58 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controlapi + +import ( + "context" + "testing" + + "github.com/agent-substrate/substrate/cmd/ateapi/internal/store/storetest" + "github.com/agent-substrate/substrate/pkg/proto/ateapipb" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func TestReportNodeImageCache(t *testing.T) { + persistence, cleanup := storetest.SetupTestStore(t) + defer cleanup() + service := &Service{persistence: persistence} + + _, err := service.ReportNodeImageCache(context.Background(), &ateapipb.ReportNodeImageCacheRequest{ + Cache: &ateapipb.NodeImageCache{ + NodeName: "node-1", + AteletPodUid: "atelet-uid", + ImageDigests: []string{"sha256:b", "sha256:a", "sha256:a"}, + }, + }) + if err != nil { + t.Fatalf("ReportNodeImageCache failed: %v", err) + } + + got, err := persistence.GetNodeImageCache(context.Background(), "node-1") + if err != nil { + t.Fatalf("GetNodeImageCache failed: %v", err) + } + if len(got.GetImageDigests()) != 2 || got.GetImageDigests()[0] != "sha256:a" || got.GetImageDigests()[1] != "sha256:b" { + t.Fatalf("stored digests = %v, want [sha256:a sha256:b]", got.GetImageDigests()) + } +} + +func TestReportNodeImageCacheValidatesIdentity(t *testing.T) { + service := &Service{} + _, err := service.ReportNodeImageCache(context.Background(), &ateapipb.ReportNodeImageCacheRequest{}) + if status.Code(err) != codes.InvalidArgument { + t.Fatalf("ReportNodeImageCache status = %v, want InvalidArgument", status.Code(err)) + } +} diff --git a/cmd/ateapi/internal/controlapi/workflow_resume.go b/cmd/ateapi/internal/controlapi/workflow_resume.go index a604e1709..65975c263 100644 --- a/cmd/ateapi/internal/controlapi/workflow_resume.go +++ b/cmd/ateapi/internal/controlapi/workflow_resume.go @@ -21,6 +21,7 @@ import ( "log/slog" "math/rand" "slices" + "strings" "time" "github.com/agent-substrate/substrate/cmd/ateapi/internal/store" @@ -155,7 +156,18 @@ func (s *AssignWorkerStep) Execute(ctx context.Context, input *ResumeInput, stat // If not, find a free one using randomized shuffling if assignedWorker == nil { - pickedWorker := s.findFreeWorker(workers, eligible, state.Actor.GetLatestSnapshotInfo().GetLocal().GetNodeVmsWithLocalSnapshots()) + requiredDigests, err := actorTemplateImageDigests(state.ActorTemplate) + if err != nil { + return fmt.Errorf("while resolving actor image digests: %w", err) + } + nodeCaches := s.loadNodeImageCaches(ctx, workers) + pickedWorker := s.findFreeWorker( + workers, + eligible, + state.Actor.GetLatestSnapshotInfo().GetLocal().GetNodeVmsWithLocalSnapshots(), + requiredDigests, + nodeCaches, + ) if pickedWorker == nil { return status.Errorf(codes.FailedPrecondition, "no free workers available") } @@ -194,8 +206,79 @@ func (s *AssignWorkerStep) RetryBackoff() *wait.Backoff { } } -func (s *AssignWorkerStep) findFreeWorker(workers []*ateapipb.Worker, eligible map[types.NamespacedName]struct{}, nodesRestrictions []string) *ateapipb.Worker { +func actorTemplateImageDigests(actorTemplate *atev1alpha1.ActorTemplate) ([]string, error) { + refs := make([]string, 0, len(actorTemplate.Spec.Containers)+1) + refs = append(refs, actorTemplate.Spec.PauseImage) + for _, container := range actorTemplate.Spec.Containers { + refs = append(refs, container.Image) + } + + digestSet := make(map[string]struct{}, len(refs)) + for _, ref := range refs { + at := strings.LastIndexByte(ref, '@') + if at < 0 || at == len(ref)-1 { + return nil, fmt.Errorf("image reference %q is not digest-pinned", ref) + } + digestSet[ref[at+1:]] = struct{}{} + } + + digests := make([]string, 0, len(digestSet)) + for digest := range digestSet { + digests = append(digests, digest) + } + slices.Sort(digests) + return digests, nil +} + +func (s *AssignWorkerStep) loadNodeImageCaches(ctx context.Context, workers []*ateapipb.Worker) map[string]*ateapipb.NodeImageCache { + caches := make(map[string]*ateapipb.NodeImageCache) + seen := make(map[string]struct{}) + for _, worker := range workers { + nodeName := worker.GetNodeName() + if nodeName == "" { + continue + } + if _, ok := seen[nodeName]; ok { + continue + } + seen[nodeName] = struct{}{} + cache, err := s.store.GetNodeImageCache(ctx, nodeName) + if err != nil { + if !errors.Is(err, store.ErrNotFound) { + slog.WarnContext(ctx, "Ignoring unavailable node image-cache report", "node", nodeName, "err", err) + } + continue + } + caches[nodeName] = cache + } + return caches +} + +func hasAllImageDigests(cache *ateapipb.NodeImageCache, requiredDigests []string) bool { + if cache == nil || len(requiredDigests) == 0 { + return false + } + cached := make(map[string]struct{}, len(cache.GetImageDigests())) + for _, digest := range cache.GetImageDigests() { + cached[digest] = struct{}{} + } + for _, digest := range requiredDigests { + if _, ok := cached[digest]; !ok { + return false + } + } + return true +} + +func (s *AssignWorkerStep) findFreeWorker( + workers []*ateapipb.Worker, + eligible map[types.NamespacedName]struct{}, + nodesRestrictions []string, + requiredDigests []string, + nodeCaches map[string]*ateapipb.NodeImageCache, +) *ateapipb.Worker { var freeWorkers []*ateapipb.Worker + var cacheHitWorkers []*ateapipb.Worker for _, worker := range workers { if worker.GetActorId() != "" { continue @@ -205,9 +288,18 @@ func (s *AssignWorkerStep) findFreeWorker(workers []*ateapipb.Worker, eligible m } if len(nodesRestrictions) == 0 || slices.Contains(nodesRestrictions, worker.GetNodeName()) { freeWorkers = append(freeWorkers, worker) + if hasAllImageDigests(nodeCaches[worker.GetNodeName()], requiredDigests) { + cacheHitWorkers = append(cacheHitWorkers, worker) + } } } + if len(cacheHitWorkers) > 0 { + rand.Shuffle(len(cacheHitWorkers), func(i, j int) { + cacheHitWorkers[i], cacheHitWorkers[j] = cacheHitWorkers[j], cacheHitWorkers[i] + }) + return cacheHitWorkers[0] + } if len(freeWorkers) > 0 { rand.Shuffle(len(freeWorkers), func(i, j int) { freeWorkers[i], freeWorkers[j] = freeWorkers[j], freeWorkers[i] diff --git a/cmd/ateapi/internal/controlapi/workflow_resume_test.go b/cmd/ateapi/internal/controlapi/workflow_resume_test.go index c4c0cca4a..bac8d1a76 100644 --- a/cmd/ateapi/internal/controlapi/workflow_resume_test.go +++ b/cmd/ateapi/internal/controlapi/workflow_resume_test.go @@ -15,8 +15,11 @@ package controlapi import ( + "context" "testing" + "time" + "github.com/agent-substrate/substrate/cmd/ateapi/internal/store/storetest" atev1alpha1 "github.com/agent-substrate/substrate/pkg/api/v1alpha1" "github.com/agent-substrate/substrate/pkg/proto/ateapipb" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -33,6 +36,88 @@ func pool(namespace, name string, labels map[string]string) *atev1alpha1.WorkerP } } +const ( + testDigestA = "sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + testDigestB = "sha256:bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb" +) + +func TestActorTemplateImageDigests(t *testing.T) { + actorTemplate := &atev1alpha1.ActorTemplate{ + Spec: atev1alpha1.ActorTemplateSpec{ + PauseImage: "example.com/pause@" + testDigestA, + Containers: []atev1alpha1.Container{ + {Image: "example.com/app@" + testDigestB}, + {Image: "example.com/duplicate@" + testDigestA}, + }, + }, + } + got, err := actorTemplateImageDigests(actorTemplate) + if err != nil { + t.Fatalf("actorTemplateImageDigests failed: %v", err) + } + if len(got) != 2 || got[0] != testDigestA || got[1] != testDigestB { + t.Fatalf("actorTemplateImageDigests = %v, want [%s %s]", got, testDigestA, testDigestB) + } +} + +func TestFindFreeWorkerPrefersCompleteImageCacheHit(t *testing.T) { + step := &AssignWorkerStep{} + eligible := map[types.NamespacedName]struct{}{{Namespace: "ns", Name: "pool"}: {}} + workers := []*ateapipb.Worker{ + {WorkerNamespace: "ns", WorkerPool: "pool", WorkerPod: "cold", NodeName: "node-cold"}, + {WorkerNamespace: "ns", WorkerPool: "pool", WorkerPod: "warm", NodeName: "node-warm"}, + } + caches := map[string]*ateapipb.NodeImageCache{ + "node-warm": {NodeName: "node-warm", ImageDigests: []string{testDigestA, testDigestB}}, + } + + got := step.findFreeWorker(workers, eligible, nil, []string{testDigestA, testDigestB}, caches) + if got.GetWorkerPod() != "warm" { + t.Fatalf("findFreeWorker selected %q, want warm", got.GetWorkerPod()) + } +} + +func TestFindFreeWorkerSnapshotRestrictionPrecedesImageAffinity(t *testing.T) { + step := &AssignWorkerStep{} + eligible := map[types.NamespacedName]struct{}{{Namespace: "ns", Name: "pool"}: {}} + workers := []*ateapipb.Worker{ + {WorkerNamespace: "ns", WorkerPool: "pool", WorkerPod: "snapshot-local", NodeName: "node-snapshot"}, + {WorkerNamespace: "ns", WorkerPool: "pool", WorkerPod: "warm", NodeName: "node-warm"}, + } + caches := map[string]*ateapipb.NodeImageCache{ + "node-warm": {NodeName: "node-warm", ImageDigests: []string{testDigestA}}, + } + + got := step.findFreeWorker(workers, eligible, []string{"node-snapshot"}, []string{testDigestA}, caches) + if got.GetWorkerPod() != "snapshot-local" { + t.Fatalf("findFreeWorker selected %q, want snapshot-local", got.GetWorkerPod()) + } +} + +func TestHasAllImageDigestsRejectsPartialHit(t *testing.T) { + cache := &ateapipb.NodeImageCache{ImageDigests: []string{testDigestA}} + if hasAllImageDigests(cache, []string{testDigestA, testDigestB}) { + t.Fatal("partial image-cache hit was treated as a complete hit") + } +} + +func TestLoadNodeImageCachesDeduplicatesNodes(t *testing.T) { + store, cleanup := storetest.SetupTestStore(t) + defer cleanup() + if err := store.SetNodeImageCache(context.Background(), &ateapipb.NodeImageCache{ + NodeName: "node-1", ImageDigests: []string{testDigestA}, + }, time.Minute); err != nil { + t.Fatalf("SetNodeImageCache failed: %v", err) + } + step := &AssignWorkerStep{store: store} + got := step.loadNodeImageCaches(context.Background(), []*ateapipb.Worker{ + {NodeName: "node-1"}, {NodeName: "node-1"}, {NodeName: "node-missing"}, + }) + if len(got) != 1 || got["node-1"] == nil { + t.Fatalf("loadNodeImageCaches = %v, want only node-1", got) + } +} + func TestEligibleWorkerPools(t *testing.T) { tests := []struct { name string diff --git a/cmd/ateapi/internal/store/ateredis/ateredis.go b/cmd/ateapi/internal/store/ateredis/ateredis.go index 206395c97..48915669f 100644 --- a/cmd/ateapi/internal/store/ateredis/ateredis.go +++ b/cmd/ateapi/internal/store/ateredis/ateredis.go @@ -87,6 +87,49 @@ func workerDBKey(namespace, poolName, podName string) string { return "worker:" + namespace + ":" + poolName + ":" + podName } +func nodeImageCacheDBKey(nodeName string) string { + return "node-image-cache:" + nodeName +} + +func (s *Persistence) SetNodeImageCache(ctx context.Context, cache *ateapipb.NodeImageCache, ttl time.Duration) error { + if cache.GetNodeName() == "" { + return fmt.Errorf("node_name is required") + } + if ttl <= 0 { + return fmt.Errorf("ttl must be positive") + } + + dbCache := proto.Clone(cache).(*ateapipb.NodeImageCache) + sort.Strings(dbCache.ImageDigests) + b, err := protojson.Marshal(dbCache) + if err != nil { + return fmt.Errorf("while marshaling node image cache: %w", err) + } + if err := s.rdb.Set(ctx, nodeImageCacheDBKey(cache.GetNodeName()), b, ttl).Err(); err != nil { + return fmt.Errorf("while storing node image cache: %w", err) + } + return nil +} + +func (s *Persistence) GetNodeImageCache(ctx context.Context, nodeName string) (*ateapipb.NodeImageCache, error) { + b, err := s.rdb.Get(ctx, nodeImageCacheDBKey(nodeName)).Bytes() + if err != nil { + if errors.Is(err, redis.Nil) { + return nil, store.ErrNotFound + } + return nil, fmt.Errorf("while getting node image cache: %w", err) + } + + cache := &ateapipb.NodeImageCache{} + if err := protojson.Unmarshal(b, cache); err != nil { + return nil, fmt.Errorf("while unmarshaling node image cache: %w", err) + } + if cache.GetNodeName() != nodeName { + return nil, fmt.Errorf("(impossible) mismatch between stored node name and key") + } + return cache, nil +} + // DebugClearAll flushes all data from Redis. func (s *Persistence) DebugClearAll(ctx context.Context) error { // Iterate through every Primary (Master) node in the cluster diff --git a/cmd/ateapi/internal/store/ateredis/ateredis_test.go b/cmd/ateapi/internal/store/ateredis/ateredis_test.go index eb41fc6d4..938c1d1ba 100644 --- a/cmd/ateapi/internal/store/ateredis/ateredis_test.go +++ b/cmd/ateapi/internal/store/ateredis/ateredis_test.go @@ -55,6 +55,35 @@ func TestGetActor_NotFound(t *testing.T) { } } +func TestNodeImageCache(t *testing.T) { + mr, s, ctx := setupTest(t) + defer mr.Close() + + cache := &ateapipb.NodeImageCache{ + NodeName: "node-1", + AteletPodUid: "atelet-uid", + ImageDigests: []string{"sha256:b", "sha256:a"}, + } + if err := s.SetNodeImageCache(ctx, cache, time.Minute); err != nil { + t.Fatalf("SetNodeImageCache failed: %v", err) + } + + got, err := s.GetNodeImageCache(ctx, "node-1") + if err != nil { + t.Fatalf("GetNodeImageCache failed: %v", err) + } + want := proto.Clone(cache).(*ateapipb.NodeImageCache) + want.ImageDigests = []string{"sha256:a", "sha256:b"} + if diff := cmp.Diff(want, got, protocmp.Transform()); diff != "" { + t.Errorf("node image cache mismatch (-want +got):\n%s", diff) + } + + mr.FastForward(time.Minute) + if _, err := s.GetNodeImageCache(ctx, "node-1"); !errors.Is(err, store.ErrNotFound) { + t.Fatalf("GetNodeImageCache after TTL = %v, want ErrNotFound", err) + } +} + func TestCreateActor_Success(t *testing.T) { mr, s, ctx := setupTest(t) defer mr.Close() diff --git a/cmd/ateapi/internal/store/store.go b/cmd/ateapi/internal/store/store.go index 638e7f0ee..fe3d4d65d 100644 --- a/cmd/ateapi/internal/store/store.go +++ b/cmd/ateapi/internal/store/store.go @@ -69,6 +69,14 @@ type Interface interface { // Lists all known workers. Returns nil if none found. ListWorkers(ctx context.Context) ([]*ateapipb.Worker, error) + // Stores the latest full image-cache snapshot reported by the atelet on a + // node. The record expires unless it is refreshed before ttl elapses. + SetNodeImageCache(ctx context.Context, cache *ateapipb.NodeImageCache, ttl time.Duration) error + + // Fetches the image-cache snapshot for a node. Returns ErrNotFound when no + // report exists or the last report has expired. + GetNodeImageCache(ctx context.Context, nodeName string) (*ateapipb.NodeImageCache, error) + // AcquireLock attempts to acquire a distributed lock with a TTL. // Returns true if the lock was successfully acquired. // Returns false if the lock is already held by another client (conflict). diff --git a/cmd/atelet/cache_reporter.go b/cmd/atelet/cache_reporter.go new file mode 100644 index 000000000..0a4b80d75 --- /dev/null +++ b/cmd/atelet/cache_reporter.go @@ -0,0 +1,71 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "log/slog" + "time" + + "github.com/agent-substrate/substrate/pkg/proto/ateapipb" + "google.golang.org/grpc" +) + +const imageCacheReportTimeout = 10 * time.Second + +type nodeImageCacheReportClient interface { + ReportNodeImageCache(context.Context, *ateapipb.ReportNodeImageCacheRequest, ...grpc.CallOption) (*ateapipb.ReportNodeImageCacheResponse, error) +} + +type imageDigestSource interface { + Digests() []string +} + +type nodeImageCacheReporter struct { + client nodeImageCacheReportClient + cache imageDigestSource + nodeName string + ateletPodUID string + interval time.Duration +} + +func (r *nodeImageCacheReporter) run(ctx context.Context) { + r.report(ctx) + ticker := time.NewTicker(r.interval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + r.report(ctx) + } + } +} + +func (r *nodeImageCacheReporter) report(ctx context.Context) { + reportCtx, cancel := context.WithTimeout(ctx, imageCacheReportTimeout) + defer cancel() + _, err := r.client.ReportNodeImageCache(reportCtx, &ateapipb.ReportNodeImageCacheRequest{ + Cache: &ateapipb.NodeImageCache{ + NodeName: r.nodeName, + AteletPodUid: r.ateletPodUID, + ImageDigests: r.cache.Digests(), + }, + }) + if err != nil { + slog.WarnContext(ctx, "Failed to report node image cache", "node", r.nodeName, "err", err) + } +} diff --git a/cmd/atelet/cache_reporter_test.go b/cmd/atelet/cache_reporter_test.go new file mode 100644 index 000000000..1f92500f2 --- /dev/null +++ b/cmd/atelet/cache_reporter_test.go @@ -0,0 +1,57 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "testing" + "time" + + "github.com/agent-substrate/substrate/pkg/proto/ateapipb" + "google.golang.org/grpc" +) + +type fakeDigestSource struct{ digests []string } + +func (f fakeDigestSource) Digests() []string { return f.digests } + +type fakeNodeImageCacheReportClient struct { + req *ateapipb.ReportNodeImageCacheRequest +} + +func (f *fakeNodeImageCacheReportClient) ReportNodeImageCache(_ context.Context, req *ateapipb.ReportNodeImageCacheRequest, _ ...grpc.CallOption) (*ateapipb.ReportNodeImageCacheResponse, error) { + f.req = req + return &ateapipb.ReportNodeImageCacheResponse{}, nil +} + +func TestNodeImageCacheReporter(t *testing.T) { + client := &fakeNodeImageCacheReportClient{} + reporter := &nodeImageCacheReporter{ + client: client, + cache: fakeDigestSource{digests: []string{"sha256:a", "sha256:b"}}, + nodeName: "node-1", + ateletPodUID: "atelet-uid", + interval: time.Minute, + } + reporter.report(context.Background()) + + cache := client.req.GetCache() + if cache.GetNodeName() != "node-1" || cache.GetAteletPodUid() != "atelet-uid" { + t.Fatalf("unexpected cache identity: %v", cache) + } + if got := cache.GetImageDigests(); len(got) != 2 || got[0] != "sha256:a" || got[1] != "sha256:b" { + t.Fatalf("reported digests = %v, want [sha256:a sha256:b]", got) + } +} diff --git a/cmd/atelet/internal/memorypullcache/memorypullcache.go b/cmd/atelet/internal/memorypullcache/memorypullcache.go index 481db59fb..e29cb329a 100644 --- a/cmd/atelet/internal/memorypullcache/memorypullcache.go +++ b/cmd/atelet/internal/memorypullcache/memorypullcache.go @@ -22,7 +22,9 @@ import ( "log/slog" "net" "runtime" + "sort" "strings" + "sync" "github.com/google/go-containerregistry/pkg/authn" "github.com/google/go-containerregistry/pkg/name" @@ -40,6 +42,10 @@ type MemoryPullCache struct { // Map from hexadecimal sha256 hash of image to byte contents of composed // tarball cache *lru.Cache + + cacheMutationMu sync.Mutex + digestsMu sync.RWMutex + digests map[string]struct{} } func NewMemoryPullCache(ctx context.Context, gcpAuthenticator authn.Authenticator, localhostRegistryReplacement string) (*MemoryPullCache, error) { @@ -56,9 +62,18 @@ func NewMemoryPullCache(ctx context.Context, gcpAuthenticator authn.Authenticato // caches that could fill up or have weird behavior, it might be better // to just have two levels. Store some images in ateom memory, and the // rest are kept in a shared GCS cache. - cache: lru.New(256), localhostRegistryReplacement: localhostRegistryReplacement, + digests: make(map[string]struct{}), } + c.cache = lru.NewWithEvictionFunc(256, func(key lru.Key, _ interface{}) { + digest, ok := key.(string) + if !ok { + return + } + c.digestsMu.Lock() + delete(c.digests, digest) + c.digestsMu.Unlock() + }) c.gcpAuthenticator = gcpAuthenticator @@ -171,7 +186,7 @@ func (c *MemoryPullCache) Fetch(ctx context.Context, ref string) (io.ReadCloser, // not be the same as the digest of the image we actually downloaded // from the registry. We need to place the cache entry under the digest // they requested. - c.cache.Add(requestedDigest.DigestStr(), memData) + c.add(requestedDigest.DigestStr(), memData) slog.InfoContext( ctx, "Populated image cache", @@ -183,6 +198,35 @@ func (c *MemoryPullCache) Fetch(ctx context.Context, ref string) (io.ReadCloser, return io.NopCloser(bytes.NewReader(memData)), nil } +func (c *MemoryPullCache) add(digest string, contents []byte) { + // Serialize membership changes with Digests. The LRU's eviction callback is + // synchronous, so it updates the reporting index before Add returns. + c.cacheMutationMu.Lock() + defer c.cacheMutationMu.Unlock() + + c.cache.Add(digest, contents) + c.digestsMu.Lock() + c.digests[digest] = struct{}{} + c.digestsMu.Unlock() +} + +// Digests returns a sorted snapshot of the image digests currently held in +// the in-memory pull cache. +func (c *MemoryPullCache) Digests() []string { + c.cacheMutationMu.Lock() + defer c.cacheMutationMu.Unlock() + + c.digestsMu.RLock() + digests := make([]string, 0, len(c.digests)) + for digest := range c.digests { + digests = append(digests, digest) + } + c.digestsMu.RUnlock() + + sort.Strings(digests) + return digests +} + func registryHost(ref string) string { parts := strings.SplitN(ref, "/", 2) reg, err := name.NewRegistry(parts[0], name.Insecure) diff --git a/cmd/atelet/internal/memorypullcache/memorypullcache_test.go b/cmd/atelet/internal/memorypullcache/memorypullcache_test.go index 5cae666c5..35735d903 100644 --- a/cmd/atelet/internal/memorypullcache/memorypullcache_test.go +++ b/cmd/atelet/internal/memorypullcache/memorypullcache_test.go @@ -15,9 +15,37 @@ package memorypullcache import ( + "context" + "fmt" + "slices" "testing" ) +func addCachedDigest(c *MemoryPullCache, digest string) { + c.add(digest, []byte(digest)) +} + +func TestDigestsTracksLRUContents(t *testing.T) { + c, err := NewMemoryPullCache(context.Background(), nil, "") + if err != nil { + t.Fatalf("NewMemoryPullCache failed: %v", err) + } + for i := 0; i < 257; i++ { + addCachedDigest(c, fmt.Sprintf("sha256:%03d", i)) + } + + digests := c.Digests() + if len(digests) != 256 { + t.Fatalf("Digests returned %d entries, want 256", len(digests)) + } + if slices.Contains(digests, "sha256:000") { + t.Error("Digests contains the evicted oldest entry") + } + if !slices.IsSorted(digests) { + t.Errorf("Digests is not sorted: %v", digests) + } +} + func TestIsLocalRegistry(t *testing.T) { tests := []struct { ref string diff --git a/cmd/atelet/main.go b/cmd/atelet/main.go index 7094f7315..74b9047e1 100644 --- a/cmd/atelet/main.go +++ b/cmd/atelet/main.go @@ -16,6 +16,7 @@ package main import ( "context" + "crypto/tls" "encoding/json" "errors" "fmt" @@ -26,6 +27,7 @@ import ( "path/filepath" "strconv" "strings" + "time" "cloud.google.com/go/storage" "github.com/agent-substrate/substrate/cmd/atelet/internal/ategcs" @@ -37,6 +39,7 @@ import ( "github.com/agent-substrate/substrate/internal/resources" "github.com/agent-substrate/substrate/internal/serverboot" "github.com/agent-substrate/substrate/internal/version" + "github.com/agent-substrate/substrate/pkg/proto/ateapipb" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/google/go-containerregistry/pkg/authn" @@ -51,6 +54,7 @@ import ( "google.golang.org/api/option" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/reflection" "google.golang.org/grpc/status" @@ -63,6 +67,8 @@ var ( gcpAuthForImagePulls = pflag.Bool("gcp-auth-for-image-pulls", true, "Use GCP application default credentials mechanism.") localhostRegistryReplacement = pflag.String("localhost-registry-replacement", "", "The replacement registry endpoint for localhost and/or loopback IP addresses, useful for local development. for example kind-registry:5000") + ateapiAddress = pflag.String("ateapi-address", "api.ate-system.svc:443", "gRPC address of the ateapi Control service used for image-cache reports.") + imageCacheReportInterval = pflag.Duration("image-cache-report-interval", 30*time.Second, "How often to report the full node image-cache contents to ateapi.") showVersion = pflag.Bool("version", false, "Print version and exit.") ) @@ -113,6 +119,29 @@ func main() { if err != nil { serverboot.Fatal(ctx, "Failed to create pull cache", err) } + if nodeName, ateletPodUID := os.Getenv("MY_NODE_NAME"), os.Getenv("POD_UID"); nodeName == "" || ateletPodUID == "" { + slog.WarnContext(ctx, "Image-cache reporting disabled because MY_NODE_NAME or POD_UID is unset") + } else if *imageCacheReportInterval <= 0 { + slog.WarnContext(ctx, "Image-cache reporting disabled because report interval is not positive") + } else { + apiConn, err := grpc.NewClient( + *ateapiAddress, + grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})), + grpc.WithStatsHandler(otelgrpc.NewClientHandler()), + ) + if err != nil { + serverboot.Fatal(ctx, "Failed to create ateapi client for image-cache reporting", err) + } + defer apiConn.Close() + reporter := &nodeImageCacheReporter{ + client: ateapipb.NewControlClient(apiConn), + cache: pullCache, + nodeName: nodeName, + ateletPodUID: ateletPodUID, + interval: *imageCacheReportInterval, + } + go reporter.run(ctx) + } anonGCSClient, err := storage.NewClient(ctx, option.WithoutAuthentication()) if err != nil { diff --git a/pkg/proto/ateapipb/ateapi.pb.go b/pkg/proto/ateapipb/ateapi.pb.go index f9040f2d1..7be8d70fb 100644 --- a/pkg/proto/ateapipb/ateapi.pb.go +++ b/pkg/proto/ateapipb/ateapi.pb.go @@ -1501,6 +1501,146 @@ func (x *Worker) GetNodeName() string { return "" } +type NodeImageCache struct { + state protoimpl.MessageState `protogen:"open.v1"` + NodeName string `protobuf:"bytes,1,opt,name=node_name,json=nodeName,proto3" json:"node_name,omitempty"` + AteletPodUid string `protobuf:"bytes,2,opt,name=atelet_pod_uid,json=ateletPodUid,proto3" json:"atelet_pod_uid,omitempty"` + ImageDigests []string `protobuf:"bytes,3,rep,name=image_digests,json=imageDigests,proto3" json:"image_digests,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *NodeImageCache) Reset() { + *x = NodeImageCache{} + mi := &file_ateapi_proto_msgTypes[24] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *NodeImageCache) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*NodeImageCache) ProtoMessage() {} + +func (x *NodeImageCache) ProtoReflect() protoreflect.Message { + mi := &file_ateapi_proto_msgTypes[24] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use NodeImageCache.ProtoReflect.Descriptor instead. +func (*NodeImageCache) Descriptor() ([]byte, []int) { + return file_ateapi_proto_rawDescGZIP(), []int{24} +} + +func (x *NodeImageCache) GetNodeName() string { + if x != nil { + return x.NodeName + } + return "" +} + +func (x *NodeImageCache) GetAteletPodUid() string { + if x != nil { + return x.AteletPodUid + } + return "" +} + +func (x *NodeImageCache) GetImageDigests() []string { + if x != nil { + return x.ImageDigests + } + return nil +} + +type ReportNodeImageCacheRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + Cache *NodeImageCache `protobuf:"bytes,1,opt,name=cache,proto3" json:"cache,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ReportNodeImageCacheRequest) Reset() { + *x = ReportNodeImageCacheRequest{} + mi := &file_ateapi_proto_msgTypes[25] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ReportNodeImageCacheRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReportNodeImageCacheRequest) ProtoMessage() {} + +func (x *ReportNodeImageCacheRequest) ProtoReflect() protoreflect.Message { + mi := &file_ateapi_proto_msgTypes[25] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ReportNodeImageCacheRequest.ProtoReflect.Descriptor instead. +func (*ReportNodeImageCacheRequest) Descriptor() ([]byte, []int) { + return file_ateapi_proto_rawDescGZIP(), []int{25} +} + +func (x *ReportNodeImageCacheRequest) GetCache() *NodeImageCache { + if x != nil { + return x.Cache + } + return nil +} + +type ReportNodeImageCacheResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ReportNodeImageCacheResponse) Reset() { + *x = ReportNodeImageCacheResponse{} + mi := &file_ateapi_proto_msgTypes[26] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ReportNodeImageCacheResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReportNodeImageCacheResponse) ProtoMessage() {} + +func (x *ReportNodeImageCacheResponse) ProtoReflect() protoreflect.Message { + mi := &file_ateapi_proto_msgTypes[26] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ReportNodeImageCacheResponse.ProtoReflect.Descriptor instead. +func (*ReportNodeImageCacheResponse) Descriptor() ([]byte, []int) { + return file_ateapi_proto_rawDescGZIP(), []int{26} +} + type DebugClearRequest struct { state protoimpl.MessageState `protogen:"open.v1"` unknownFields protoimpl.UnknownFields @@ -1509,7 +1649,7 @@ type DebugClearRequest struct { func (x *DebugClearRequest) Reset() { *x = DebugClearRequest{} - mi := &file_ateapi_proto_msgTypes[24] + mi := &file_ateapi_proto_msgTypes[27] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1521,7 +1661,7 @@ func (x *DebugClearRequest) String() string { func (*DebugClearRequest) ProtoMessage() {} func (x *DebugClearRequest) ProtoReflect() protoreflect.Message { - mi := &file_ateapi_proto_msgTypes[24] + mi := &file_ateapi_proto_msgTypes[27] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1534,7 +1674,7 @@ func (x *DebugClearRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use DebugClearRequest.ProtoReflect.Descriptor instead. func (*DebugClearRequest) Descriptor() ([]byte, []int) { - return file_ateapi_proto_rawDescGZIP(), []int{24} + return file_ateapi_proto_rawDescGZIP(), []int{27} } type DebugClearResponse struct { @@ -1545,7 +1685,7 @@ type DebugClearResponse struct { func (x *DebugClearResponse) Reset() { *x = DebugClearResponse{} - mi := &file_ateapi_proto_msgTypes[25] + mi := &file_ateapi_proto_msgTypes[28] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1557,7 +1697,7 @@ func (x *DebugClearResponse) String() string { func (*DebugClearResponse) ProtoMessage() {} func (x *DebugClearResponse) ProtoReflect() protoreflect.Message { - mi := &file_ateapi_proto_msgTypes[25] + mi := &file_ateapi_proto_msgTypes[28] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1570,7 +1710,7 @@ func (x *DebugClearResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use DebugClearResponse.ProtoReflect.Descriptor instead. func (*DebugClearResponse) Descriptor() ([]byte, []int) { - return file_ateapi_proto_rawDescGZIP(), []int{25} + return file_ateapi_proto_rawDescGZIP(), []int{28} } type MintJWTRequest struct { @@ -1585,7 +1725,7 @@ type MintJWTRequest struct { func (x *MintJWTRequest) Reset() { *x = MintJWTRequest{} - mi := &file_ateapi_proto_msgTypes[26] + mi := &file_ateapi_proto_msgTypes[29] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1597,7 +1737,7 @@ func (x *MintJWTRequest) String() string { func (*MintJWTRequest) ProtoMessage() {} func (x *MintJWTRequest) ProtoReflect() protoreflect.Message { - mi := &file_ateapi_proto_msgTypes[26] + mi := &file_ateapi_proto_msgTypes[29] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1610,7 +1750,7 @@ func (x *MintJWTRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use MintJWTRequest.ProtoReflect.Descriptor instead. func (*MintJWTRequest) Descriptor() ([]byte, []int) { - return file_ateapi_proto_rawDescGZIP(), []int{26} + return file_ateapi_proto_rawDescGZIP(), []int{29} } func (x *MintJWTRequest) GetAudience() []string { @@ -1670,7 +1810,7 @@ type MintJWTResponse struct { func (x *MintJWTResponse) Reset() { *x = MintJWTResponse{} - mi := &file_ateapi_proto_msgTypes[27] + mi := &file_ateapi_proto_msgTypes[30] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1682,7 +1822,7 @@ func (x *MintJWTResponse) String() string { func (*MintJWTResponse) ProtoMessage() {} func (x *MintJWTResponse) ProtoReflect() protoreflect.Message { - mi := &file_ateapi_proto_msgTypes[27] + mi := &file_ateapi_proto_msgTypes[30] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1695,7 +1835,7 @@ func (x *MintJWTResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use MintJWTResponse.ProtoReflect.Descriptor instead. func (*MintJWTResponse) Descriptor() ([]byte, []int) { - return file_ateapi_proto_rawDescGZIP(), []int{27} + return file_ateapi_proto_rawDescGZIP(), []int{30} } func (x *MintJWTResponse) GetSessionJwt() string { @@ -1720,7 +1860,7 @@ type MintCertRequest struct { func (x *MintCertRequest) Reset() { *x = MintCertRequest{} - mi := &file_ateapi_proto_msgTypes[28] + mi := &file_ateapi_proto_msgTypes[31] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1732,7 +1872,7 @@ func (x *MintCertRequest) String() string { func (*MintCertRequest) ProtoMessage() {} func (x *MintCertRequest) ProtoReflect() protoreflect.Message { - mi := &file_ateapi_proto_msgTypes[28] + mi := &file_ateapi_proto_msgTypes[31] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1745,7 +1885,7 @@ func (x *MintCertRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use MintCertRequest.ProtoReflect.Descriptor instead. func (*MintCertRequest) Descriptor() ([]byte, []int) { - return file_ateapi_proto_rawDescGZIP(), []int{28} + return file_ateapi_proto_rawDescGZIP(), []int{31} } func (x *MintCertRequest) GetAppId() string { @@ -1788,7 +1928,7 @@ type MintCertResponse struct { func (x *MintCertResponse) Reset() { *x = MintCertResponse{} - mi := &file_ateapi_proto_msgTypes[29] + mi := &file_ateapi_proto_msgTypes[32] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1800,7 +1940,7 @@ func (x *MintCertResponse) String() string { func (*MintCertResponse) ProtoMessage() {} func (x *MintCertResponse) ProtoReflect() protoreflect.Message { - mi := &file_ateapi_proto_msgTypes[29] + mi := &file_ateapi_proto_msgTypes[32] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1813,7 +1953,7 @@ func (x *MintCertResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use MintCertResponse.ProtoReflect.Descriptor instead. func (*MintCertResponse) Descriptor() ([]byte, []int) { - return file_ateapi_proto_rawDescGZIP(), []int{29} + return file_ateapi_proto_rawDescGZIP(), []int{32} } func (x *MintCertResponse) GetSessionCertificates() [][]byte { @@ -1923,7 +2063,14 @@ const file_ateapi_proto_rawDesc = "" + "\aversion\x18\b \x01(\x03R\aversion\x12$\n" + "\x0eworker_pod_uid\x18\t \x01(\tR\fworkerPodUid\x12\x1b\n" + "\tnode_name\x18\n" + - " \x01(\tR\bnodeName\"\x13\n" + + " \x01(\tR\bnodeName\"x\n" + + "\x0eNodeImageCache\x12\x1b\n" + + "\tnode_name\x18\x01 \x01(\tR\bnodeName\x12$\n" + + "\x0eatelet_pod_uid\x18\x02 \x01(\tR\fateletPodUid\x12#\n" + + "\rimage_digests\x18\x03 \x03(\tR\fimageDigests\"K\n" + + "\x1bReportNodeImageCacheRequest\x12,\n" + + "\x05cache\x18\x01 \x01(\v2\x16.ateapi.NodeImageCacheR\x05cache\"\x1e\n" + + "\x1cReportNodeImageCacheResponse\"\x13\n" + "\x11DebugClearRequest\"\x14\n" + "\x12DebugClearResponse\"{\n" + "\x0eMintJWTRequest\x12\x1a\n" + @@ -1946,7 +2093,7 @@ const file_ateapi_proto_rawDesc = "" + "\fSnapshotType\x12\x1d\n" + "\x19SNAPSHOT_TYPE_UNSPECIFIED\x10\x00\x12\x17\n" + "\x13SNAPSHOT_TYPE_LOCAL\x10\x01\x12\x1a\n" + - "\x16SNAPSHOT_TYPE_EXTERNAL\x10\x022\xde\x05\n" + + "\x16SNAPSHOT_TYPE_EXTERNAL\x10\x022\xc3\x06\n" + "\aControl\x12?\n" + "\bGetActor\x12\x17.ateapi.GetActorRequest\x1a\x18.ateapi.GetActorResponse\"\x00\x12H\n" + "\vCreateActor\x12\x1a.ateapi.CreateActorRequest\x1a\x1b.ateapi.CreateActorResponse\"\x00\x12H\n" + @@ -1958,7 +2105,8 @@ const file_ateapi_proto_rawDesc = "" + "\vDeleteActor\x12\x1a.ateapi.DeleteActorRequest\x1a\x1b.ateapi.DeleteActorResponse\"\x00\x12H\n" + "\vListWorkers\x12\x1a.ateapi.ListWorkersRequest\x1a\x1b.ateapi.ListWorkersResponse\"\x00\x12E\n" + "\n" + - "ListActors\x12\x19.ateapi.ListActorsRequest\x1a\x1a.ateapi.ListActorsResponse\"\x00\x12E\n" + + "ListActors\x12\x19.ateapi.ListActorsRequest\x1a\x1a.ateapi.ListActorsResponse\"\x00\x12c\n" + + "\x14ReportNodeImageCache\x12#.ateapi.ReportNodeImageCacheRequest\x1a$.ateapi.ReportNodeImageCacheResponse\"\x00\x12E\n" + "\n" + "DebugClear\x12\x19.ateapi.DebugClearRequest\x1a\x1a.ateapi.DebugClearResponse\"\x002\x8c\x01\n" + "\x0fSessionIdentity\x12:\n" + @@ -1978,47 +2126,50 @@ func file_ateapi_proto_rawDescGZIP() []byte { } var file_ateapi_proto_enumTypes = make([]protoimpl.EnumInfo, 2) -var file_ateapi_proto_msgTypes = make([]protoimpl.MessageInfo, 31) +var file_ateapi_proto_msgTypes = make([]protoimpl.MessageInfo, 34) var file_ateapi_proto_goTypes = []any{ - (SnapshotType)(0), // 0: ateapi.SnapshotType - (Actor_Status)(0), // 1: ateapi.Actor.Status - (*ExternalSnapshotInfo)(nil), // 2: ateapi.ExternalSnapshotInfo - (*LocalSnapshotInfo)(nil), // 3: ateapi.LocalSnapshotInfo - (*SnapshotInfo)(nil), // 4: ateapi.SnapshotInfo - (*Selector)(nil), // 5: ateapi.Selector - (*Actor)(nil), // 6: ateapi.Actor - (*GetActorRequest)(nil), // 7: ateapi.GetActorRequest - (*GetActorResponse)(nil), // 8: ateapi.GetActorResponse - (*CreateActorRequest)(nil), // 9: ateapi.CreateActorRequest - (*CreateActorResponse)(nil), // 10: ateapi.CreateActorResponse - (*UpdateActorRequest)(nil), // 11: ateapi.UpdateActorRequest - (*UpdateActorResponse)(nil), // 12: ateapi.UpdateActorResponse - (*SuspendActorRequest)(nil), // 13: ateapi.SuspendActorRequest - (*SuspendActorResponse)(nil), // 14: ateapi.SuspendActorResponse - (*PauseActorRequest)(nil), // 15: ateapi.PauseActorRequest - (*PauseActorResponse)(nil), // 16: ateapi.PauseActorResponse - (*ResumeActorRequest)(nil), // 17: ateapi.ResumeActorRequest - (*ResumeActorResponse)(nil), // 18: ateapi.ResumeActorResponse - (*DeleteActorRequest)(nil), // 19: ateapi.DeleteActorRequest - (*DeleteActorResponse)(nil), // 20: ateapi.DeleteActorResponse - (*ListWorkersRequest)(nil), // 21: ateapi.ListWorkersRequest - (*ListWorkersResponse)(nil), // 22: ateapi.ListWorkersResponse - (*ListActorsRequest)(nil), // 23: ateapi.ListActorsRequest - (*ListActorsResponse)(nil), // 24: ateapi.ListActorsResponse - (*Worker)(nil), // 25: ateapi.Worker - (*DebugClearRequest)(nil), // 26: ateapi.DebugClearRequest - (*DebugClearResponse)(nil), // 27: ateapi.DebugClearResponse - (*MintJWTRequest)(nil), // 28: ateapi.MintJWTRequest - (*MintJWTResponse)(nil), // 29: ateapi.MintJWTResponse - (*MintCertRequest)(nil), // 30: ateapi.MintCertRequest - (*MintCertResponse)(nil), // 31: ateapi.MintCertResponse - nil, // 32: ateapi.Selector.MatchLabelsEntry + (SnapshotType)(0), // 0: ateapi.SnapshotType + (Actor_Status)(0), // 1: ateapi.Actor.Status + (*ExternalSnapshotInfo)(nil), // 2: ateapi.ExternalSnapshotInfo + (*LocalSnapshotInfo)(nil), // 3: ateapi.LocalSnapshotInfo + (*SnapshotInfo)(nil), // 4: ateapi.SnapshotInfo + (*Selector)(nil), // 5: ateapi.Selector + (*Actor)(nil), // 6: ateapi.Actor + (*GetActorRequest)(nil), // 7: ateapi.GetActorRequest + (*GetActorResponse)(nil), // 8: ateapi.GetActorResponse + (*CreateActorRequest)(nil), // 9: ateapi.CreateActorRequest + (*CreateActorResponse)(nil), // 10: ateapi.CreateActorResponse + (*UpdateActorRequest)(nil), // 11: ateapi.UpdateActorRequest + (*UpdateActorResponse)(nil), // 12: ateapi.UpdateActorResponse + (*SuspendActorRequest)(nil), // 13: ateapi.SuspendActorRequest + (*SuspendActorResponse)(nil), // 14: ateapi.SuspendActorResponse + (*PauseActorRequest)(nil), // 15: ateapi.PauseActorRequest + (*PauseActorResponse)(nil), // 16: ateapi.PauseActorResponse + (*ResumeActorRequest)(nil), // 17: ateapi.ResumeActorRequest + (*ResumeActorResponse)(nil), // 18: ateapi.ResumeActorResponse + (*DeleteActorRequest)(nil), // 19: ateapi.DeleteActorRequest + (*DeleteActorResponse)(nil), // 20: ateapi.DeleteActorResponse + (*ListWorkersRequest)(nil), // 21: ateapi.ListWorkersRequest + (*ListWorkersResponse)(nil), // 22: ateapi.ListWorkersResponse + (*ListActorsRequest)(nil), // 23: ateapi.ListActorsRequest + (*ListActorsResponse)(nil), // 24: ateapi.ListActorsResponse + (*Worker)(nil), // 25: ateapi.Worker + (*NodeImageCache)(nil), // 26: ateapi.NodeImageCache + (*ReportNodeImageCacheRequest)(nil), // 27: ateapi.ReportNodeImageCacheRequest + (*ReportNodeImageCacheResponse)(nil), // 28: ateapi.ReportNodeImageCacheResponse + (*DebugClearRequest)(nil), // 29: ateapi.DebugClearRequest + (*DebugClearResponse)(nil), // 30: ateapi.DebugClearResponse + (*MintJWTRequest)(nil), // 31: ateapi.MintJWTRequest + (*MintJWTResponse)(nil), // 32: ateapi.MintJWTResponse + (*MintCertRequest)(nil), // 33: ateapi.MintCertRequest + (*MintCertResponse)(nil), // 34: ateapi.MintCertResponse + nil, // 35: ateapi.Selector.MatchLabelsEntry } var file_ateapi_proto_depIdxs = []int32{ 0, // 0: ateapi.SnapshotInfo.type:type_name -> ateapi.SnapshotType 2, // 1: ateapi.SnapshotInfo.external:type_name -> ateapi.ExternalSnapshotInfo 3, // 2: ateapi.SnapshotInfo.local:type_name -> ateapi.LocalSnapshotInfo - 32, // 3: ateapi.Selector.match_labels:type_name -> ateapi.Selector.MatchLabelsEntry + 35, // 3: ateapi.Selector.match_labels:type_name -> ateapi.Selector.MatchLabelsEntry 1, // 4: ateapi.Actor.status:type_name -> ateapi.Actor.Status 4, // 5: ateapi.Actor.latest_snapshot_info:type_name -> ateapi.SnapshotInfo 5, // 6: ateapi.Actor.worker_selector:type_name -> ateapi.Selector @@ -2032,35 +2183,38 @@ var file_ateapi_proto_depIdxs = []int32{ 6, // 14: ateapi.ResumeActorResponse.actor:type_name -> ateapi.Actor 25, // 15: ateapi.ListWorkersResponse.workers:type_name -> ateapi.Worker 6, // 16: ateapi.ListActorsResponse.actors:type_name -> ateapi.Actor - 7, // 17: ateapi.Control.GetActor:input_type -> ateapi.GetActorRequest - 9, // 18: ateapi.Control.CreateActor:input_type -> ateapi.CreateActorRequest - 11, // 19: ateapi.Control.UpdateActor:input_type -> ateapi.UpdateActorRequest - 13, // 20: ateapi.Control.SuspendActor:input_type -> ateapi.SuspendActorRequest - 15, // 21: ateapi.Control.PauseActor:input_type -> ateapi.PauseActorRequest - 17, // 22: ateapi.Control.ResumeActor:input_type -> ateapi.ResumeActorRequest - 19, // 23: ateapi.Control.DeleteActor:input_type -> ateapi.DeleteActorRequest - 21, // 24: ateapi.Control.ListWorkers:input_type -> ateapi.ListWorkersRequest - 23, // 25: ateapi.Control.ListActors:input_type -> ateapi.ListActorsRequest - 26, // 26: ateapi.Control.DebugClear:input_type -> ateapi.DebugClearRequest - 28, // 27: ateapi.SessionIdentity.MintJWT:input_type -> ateapi.MintJWTRequest - 30, // 28: ateapi.SessionIdentity.MintCert:input_type -> ateapi.MintCertRequest - 8, // 29: ateapi.Control.GetActor:output_type -> ateapi.GetActorResponse - 10, // 30: ateapi.Control.CreateActor:output_type -> ateapi.CreateActorResponse - 12, // 31: ateapi.Control.UpdateActor:output_type -> ateapi.UpdateActorResponse - 14, // 32: ateapi.Control.SuspendActor:output_type -> ateapi.SuspendActorResponse - 16, // 33: ateapi.Control.PauseActor:output_type -> ateapi.PauseActorResponse - 18, // 34: ateapi.Control.ResumeActor:output_type -> ateapi.ResumeActorResponse - 20, // 35: ateapi.Control.DeleteActor:output_type -> ateapi.DeleteActorResponse - 22, // 36: ateapi.Control.ListWorkers:output_type -> ateapi.ListWorkersResponse - 24, // 37: ateapi.Control.ListActors:output_type -> ateapi.ListActorsResponse - 27, // 38: ateapi.Control.DebugClear:output_type -> ateapi.DebugClearResponse - 29, // 39: ateapi.SessionIdentity.MintJWT:output_type -> ateapi.MintJWTResponse - 31, // 40: ateapi.SessionIdentity.MintCert:output_type -> ateapi.MintCertResponse - 29, // [29:41] is the sub-list for method output_type - 17, // [17:29] is the sub-list for method input_type - 17, // [17:17] is the sub-list for extension type_name - 17, // [17:17] is the sub-list for extension extendee - 0, // [0:17] is the sub-list for field type_name + 26, // 17: ateapi.ReportNodeImageCacheRequest.cache:type_name -> ateapi.NodeImageCache + 7, // 18: ateapi.Control.GetActor:input_type -> ateapi.GetActorRequest + 9, // 19: ateapi.Control.CreateActor:input_type -> ateapi.CreateActorRequest + 11, // 20: ateapi.Control.UpdateActor:input_type -> ateapi.UpdateActorRequest + 13, // 21: ateapi.Control.SuspendActor:input_type -> ateapi.SuspendActorRequest + 15, // 22: ateapi.Control.PauseActor:input_type -> ateapi.PauseActorRequest + 17, // 23: ateapi.Control.ResumeActor:input_type -> ateapi.ResumeActorRequest + 19, // 24: ateapi.Control.DeleteActor:input_type -> ateapi.DeleteActorRequest + 21, // 25: ateapi.Control.ListWorkers:input_type -> ateapi.ListWorkersRequest + 23, // 26: ateapi.Control.ListActors:input_type -> ateapi.ListActorsRequest + 27, // 27: ateapi.Control.ReportNodeImageCache:input_type -> ateapi.ReportNodeImageCacheRequest + 29, // 28: ateapi.Control.DebugClear:input_type -> ateapi.DebugClearRequest + 31, // 29: ateapi.SessionIdentity.MintJWT:input_type -> ateapi.MintJWTRequest + 33, // 30: ateapi.SessionIdentity.MintCert:input_type -> ateapi.MintCertRequest + 8, // 31: ateapi.Control.GetActor:output_type -> ateapi.GetActorResponse + 10, // 32: ateapi.Control.CreateActor:output_type -> ateapi.CreateActorResponse + 12, // 33: ateapi.Control.UpdateActor:output_type -> ateapi.UpdateActorResponse + 14, // 34: ateapi.Control.SuspendActor:output_type -> ateapi.SuspendActorResponse + 16, // 35: ateapi.Control.PauseActor:output_type -> ateapi.PauseActorResponse + 18, // 36: ateapi.Control.ResumeActor:output_type -> ateapi.ResumeActorResponse + 20, // 37: ateapi.Control.DeleteActor:output_type -> ateapi.DeleteActorResponse + 22, // 38: ateapi.Control.ListWorkers:output_type -> ateapi.ListWorkersResponse + 24, // 39: ateapi.Control.ListActors:output_type -> ateapi.ListActorsResponse + 28, // 40: ateapi.Control.ReportNodeImageCache:output_type -> ateapi.ReportNodeImageCacheResponse + 30, // 41: ateapi.Control.DebugClear:output_type -> ateapi.DebugClearResponse + 32, // 42: ateapi.SessionIdentity.MintJWT:output_type -> ateapi.MintJWTResponse + 34, // 43: ateapi.SessionIdentity.MintCert:output_type -> ateapi.MintCertResponse + 31, // [31:44] is the sub-list for method output_type + 18, // [18:31] is the sub-list for method input_type + 18, // [18:18] is the sub-list for extension type_name + 18, // [18:18] is the sub-list for extension extendee + 0, // [0:18] is the sub-list for field type_name } func init() { file_ateapi_proto_init() } @@ -2078,7 +2232,7 @@ func file_ateapi_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_ateapi_proto_rawDesc), len(file_ateapi_proto_rawDesc)), NumEnums: 2, - NumMessages: 31, + NumMessages: 34, NumExtensions: 0, NumServices: 2, }, diff --git a/pkg/proto/ateapipb/ateapi.proto b/pkg/proto/ateapipb/ateapi.proto index 20e80e5ca..04e0c574f 100644 --- a/pkg/proto/ateapipb/ateapi.proto +++ b/pkg/proto/ateapipb/ateapi.proto @@ -50,6 +50,10 @@ service Control { // List all actors currently reflected in redis. rpc ListActors(ListActorsRequest) returns (ListActorsResponse) {} + // Report the image digests cached by the atelet on a node. Reports are full + // snapshots and expire unless the atelet refreshes them periodically. + rpc ReportNodeImageCache(ReportNodeImageCacheRequest) returns (ReportNodeImageCacheResponse) {} + // Debugging: drop all data from the ate database. rpc DebugClear(DebugClearRequest) returns (DebugClearResponse) {} } @@ -246,6 +250,18 @@ message Worker { string node_name = 10; } +message NodeImageCache { + string node_name = 1; + string atelet_pod_uid = 2; + repeated string image_digests = 3; +} + +message ReportNodeImageCacheRequest { + NodeImageCache cache = 1; +} + +message ReportNodeImageCacheResponse {} + message DebugClearRequest {} message DebugClearResponse {} diff --git a/pkg/proto/ateapipb/ateapi_grpc.pb.go b/pkg/proto/ateapipb/ateapi_grpc.pb.go index 1f4c47c48..f55c3efe3 100644 --- a/pkg/proto/ateapipb/ateapi_grpc.pb.go +++ b/pkg/proto/ateapipb/ateapi_grpc.pb.go @@ -35,16 +35,17 @@ import ( const _ = grpc.SupportPackageIsVersion9 const ( - Control_GetActor_FullMethodName = "/ateapi.Control/GetActor" - Control_CreateActor_FullMethodName = "/ateapi.Control/CreateActor" - Control_UpdateActor_FullMethodName = "/ateapi.Control/UpdateActor" - Control_SuspendActor_FullMethodName = "/ateapi.Control/SuspendActor" - Control_PauseActor_FullMethodName = "/ateapi.Control/PauseActor" - Control_ResumeActor_FullMethodName = "/ateapi.Control/ResumeActor" - Control_DeleteActor_FullMethodName = "/ateapi.Control/DeleteActor" - Control_ListWorkers_FullMethodName = "/ateapi.Control/ListWorkers" - Control_ListActors_FullMethodName = "/ateapi.Control/ListActors" - Control_DebugClear_FullMethodName = "/ateapi.Control/DebugClear" + Control_GetActor_FullMethodName = "/ateapi.Control/GetActor" + Control_CreateActor_FullMethodName = "/ateapi.Control/CreateActor" + Control_UpdateActor_FullMethodName = "/ateapi.Control/UpdateActor" + Control_SuspendActor_FullMethodName = "/ateapi.Control/SuspendActor" + Control_PauseActor_FullMethodName = "/ateapi.Control/PauseActor" + Control_ResumeActor_FullMethodName = "/ateapi.Control/ResumeActor" + Control_DeleteActor_FullMethodName = "/ateapi.Control/DeleteActor" + Control_ListWorkers_FullMethodName = "/ateapi.Control/ListWorkers" + Control_ListActors_FullMethodName = "/ateapi.Control/ListActors" + Control_ReportNodeImageCache_FullMethodName = "/ateapi.Control/ReportNodeImageCache" + Control_DebugClear_FullMethodName = "/ateapi.Control/DebugClear" ) // ControlClient is the client API for Control service. @@ -71,6 +72,9 @@ type ControlClient interface { ListWorkers(ctx context.Context, in *ListWorkersRequest, opts ...grpc.CallOption) (*ListWorkersResponse, error) // List all actors currently reflected in redis. ListActors(ctx context.Context, in *ListActorsRequest, opts ...grpc.CallOption) (*ListActorsResponse, error) + // Report the image digests cached by the atelet on a node. Reports are full + // snapshots and expire unless the atelet refreshes them periodically. + ReportNodeImageCache(ctx context.Context, in *ReportNodeImageCacheRequest, opts ...grpc.CallOption) (*ReportNodeImageCacheResponse, error) // Debugging: drop all data from the ate database. DebugClear(ctx context.Context, in *DebugClearRequest, opts ...grpc.CallOption) (*DebugClearResponse, error) } @@ -173,6 +177,16 @@ func (c *controlClient) ListActors(ctx context.Context, in *ListActorsRequest, o return out, nil } +func (c *controlClient) ReportNodeImageCache(ctx context.Context, in *ReportNodeImageCacheRequest, opts ...grpc.CallOption) (*ReportNodeImageCacheResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(ReportNodeImageCacheResponse) + err := c.cc.Invoke(ctx, Control_ReportNodeImageCache_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *controlClient) DebugClear(ctx context.Context, in *DebugClearRequest, opts ...grpc.CallOption) (*DebugClearResponse, error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(DebugClearResponse) @@ -207,6 +221,9 @@ type ControlServer interface { ListWorkers(context.Context, *ListWorkersRequest) (*ListWorkersResponse, error) // List all actors currently reflected in redis. ListActors(context.Context, *ListActorsRequest) (*ListActorsResponse, error) + // Report the image digests cached by the atelet on a node. Reports are full + // snapshots and expire unless the atelet refreshes them periodically. + ReportNodeImageCache(context.Context, *ReportNodeImageCacheRequest) (*ReportNodeImageCacheResponse, error) // Debugging: drop all data from the ate database. DebugClear(context.Context, *DebugClearRequest) (*DebugClearResponse, error) mustEmbedUnimplementedControlServer() @@ -246,6 +263,9 @@ func (UnimplementedControlServer) ListWorkers(context.Context, *ListWorkersReque func (UnimplementedControlServer) ListActors(context.Context, *ListActorsRequest) (*ListActorsResponse, error) { return nil, status.Error(codes.Unimplemented, "method ListActors not implemented") } +func (UnimplementedControlServer) ReportNodeImageCache(context.Context, *ReportNodeImageCacheRequest) (*ReportNodeImageCacheResponse, error) { + return nil, status.Error(codes.Unimplemented, "method ReportNodeImageCache not implemented") +} func (UnimplementedControlServer) DebugClear(context.Context, *DebugClearRequest) (*DebugClearResponse, error) { return nil, status.Error(codes.Unimplemented, "method DebugClear not implemented") } @@ -432,6 +452,24 @@ func _Control_ListActors_Handler(srv interface{}, ctx context.Context, dec func( return interceptor(ctx, in, info, handler) } +func _Control_ReportNodeImageCache_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ReportNodeImageCacheRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ControlServer).ReportNodeImageCache(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Control_ReportNodeImageCache_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ControlServer).ReportNodeImageCache(ctx, req.(*ReportNodeImageCacheRequest)) + } + return interceptor(ctx, in, info, handler) +} + func _Control_DebugClear_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(DebugClearRequest) if err := dec(in); err != nil { @@ -493,6 +531,10 @@ var Control_ServiceDesc = grpc.ServiceDesc{ MethodName: "ListActors", Handler: _Control_ListActors_Handler, }, + { + MethodName: "ReportNodeImageCache", + Handler: _Control_ReportNodeImageCache_Handler, + }, { MethodName: "DebugClear", Handler: _Control_DebugClear_Handler,