diff --git a/cmd/ateapi/internal/controlapi/functional_test.go b/cmd/ateapi/internal/controlapi/functional_test.go index 13e0d1f59..cb6824180 100644 --- a/cmd/ateapi/internal/controlapi/functional_test.go +++ b/cmd/ateapi/internal/controlapi/functional_test.go @@ -291,7 +291,7 @@ func setupTest(t *testing.T, ns string) *testContext { substrateInformerFactory.WaitForCacheSync(ctx.Done()) // 4. Initialize Service - wc := workercache.New(persistence) + wc := workercache.New(persistence, 5*time.Minute) if err := wc.Start(ctx); err != nil { cancel() mr.Close() diff --git a/cmd/ateapi/internal/workercache/workercache.go b/cmd/ateapi/internal/workercache/workercache.go index b67eb7457..f945faa6c 100644 --- a/cmd/ateapi/internal/workercache/workercache.go +++ b/cmd/ateapi/internal/workercache/workercache.go @@ -36,7 +36,8 @@ import ( // TODO: add metrics — at minimum a gauge for worker count, a counter for // resync events, and a counter for failed PUBLISH operations (in ateredis). type Cache struct { - store store.Interface + store store.Interface + relistInterval time.Duration mu sync.RWMutex workers map[string]*ateapipb.Worker @@ -44,22 +45,26 @@ type Cache struct { ready atomic.Bool } -// New creates a Cache backed by a given store. -func New(store store.Interface) *Cache { +// New creates a Cache backed by a given store. relistInterval controls how +// often the cache performs a full ListWorkers to recover from state drifts +// caused by missing WorkerWatch events. +func New(store store.Interface, relistInterval time.Duration) *Cache { return &Cache{ - store: store, - workers: make(map[string]*ateapipb.Worker), + store: store, + relistInterval: relistInterval, + workers: make(map[string]*ateapipb.Worker), } } // Start syncs the cache synchronously, then spawns a background goroutine -// that streams updates and resyncs on connection loss. +// that streams updates, relists periodically, and resyncs on connection loss. // Returns as soon as the initial sync succeeds. func (c *Cache) Start(ctx context.Context) error { watch, err := c.sync(ctx) if err != nil { return err } + c.ready.Store(true) go c.watchEvents(ctx, watch) return nil } @@ -86,31 +91,33 @@ func (c *Cache) sync(ctx context.Context) (*store.WorkerWatch, error) { if err != nil { return nil, fmt.Errorf("WatchWorkers: %w", err) } + if err := c.relist(ctx); err != nil { + watch.Close() + return nil, err + } + return watch, nil +} +func (c *Cache) relist(ctx context.Context) error { workers, err := c.store.ListWorkers(ctx) if err != nil { - watch.Close() - return nil, fmt.Errorf("ListWorkers: %w", err) + return fmt.Errorf("ListWorkers: %w", err) } - newMap := make(map[string]*ateapipb.Worker, len(workers)) for _, w := range workers { newMap[workerKey(w)] = w } - c.mu.Lock() c.workers = newMap c.mu.Unlock() - c.ready.Store(true) - slog.InfoContext(ctx, "worker cache synced", slog.Int("count", len(newMap))) - return watch, nil + return nil } func (c *Cache) watchEvents(ctx context.Context, watch *store.WorkerWatch) { + ticker := time.NewTicker(c.relistInterval) + defer ticker.Stop() for { - // TODO: Add a periodic "relist" so we can recover from state drifts - // caused by missed watch events. select { case event, ok := <-watch.Events: if !ok { @@ -124,9 +131,14 @@ func (c *Cache) watchEvents(ctx context.Context, watch *store.WorkerWatch) { if watch == nil { return // context cancelled } + c.ready.Store(true) } else { c.applyEvent(event) } + case <-ticker.C: + if err := c.relist(ctx); err != nil { + slog.WarnContext(ctx, "worker cache: periodic relist failed", slog.Any("err", err)) + } case <-ctx.Done(): c.ready.Store(false) watch.Close() diff --git a/cmd/ateapi/internal/workercache/workercache_test.go b/cmd/ateapi/internal/workercache/workercache_test.go index 53ff56075..6f53f36ab 100644 --- a/cmd/ateapi/internal/workercache/workercache_test.go +++ b/cmd/ateapi/internal/workercache/workercache_test.go @@ -31,7 +31,7 @@ import ( ) func TestCache_NotReadyBeforeStart(t *testing.T) { - c := workercache.New(newFakeStore()) + c := workercache.New(newFakeStore(), time.Hour) _, err := c.Workers() if err == nil { t.Fatal("expected error from Workers before Start, got nil") @@ -42,7 +42,7 @@ func TestCache_SyncsOnStart(t *testing.T) { w1 := makeWorker("ns", "pod1", 1) w2 := makeWorker("ns", "pod2", 1) - c := workercache.New(newFakeStore(w1, w2)) + c := workercache.New(newFakeStore(w1, w2), time.Hour) ctx := t.Context() if err := c.Start(ctx); err != nil { @@ -60,7 +60,7 @@ func TestCache_SyncsOnStart(t *testing.T) { func TestCache_CreatedEvent(t *testing.T) { fs := newFakeStore() - c := workercache.New(fs) + c := workercache.New(fs, time.Hour) ctx := t.Context() if err := c.Start(ctx); err != nil { @@ -84,7 +84,7 @@ func TestCache_CreatedEvent(t *testing.T) { func TestCache_UpdatedEvent_NewerVersionApplied(t *testing.T) { w := makeWorker("ns", "pod1", 1) fs := newFakeStore(w) - c := workercache.New(fs) + c := workercache.New(fs, time.Hour) ctx := t.Context() if err := c.Start(ctx); err != nil { @@ -109,7 +109,7 @@ func TestCache_UpdatedEvent_NewerVersionApplied(t *testing.T) { func TestCache_UpdatedEvent_OlderVersionIgnored(t *testing.T) { w := makeWorker("ns", "pod1", 5) fs := newFakeStore(w) - c := workercache.New(fs) + c := workercache.New(fs, time.Hour) ctx := t.Context() if err := c.Start(ctx); err != nil { @@ -139,7 +139,7 @@ func TestCache_UpdatedEvent_OlderVersionIgnored(t *testing.T) { func TestCache_DeletedEvent(t *testing.T) { w := makeWorker("ns", "pod1", 1) fs := newFakeStore(w) - c := workercache.New(fs) + c := workercache.New(fs, time.Hour) ctx := t.Context() if err := c.Start(ctx); err != nil { @@ -160,7 +160,7 @@ func TestCache_DeletedEvent(t *testing.T) { func TestCache_Disconnect_ResyncsWithFreshSnapshot(t *testing.T) { w1 := makeWorker("ns", "pod1", 1) fs := newFakeStore(w1) - c := workercache.New(fs) + c := workercache.New(fs, time.Hour) ctx := t.Context() if err := c.Start(ctx); err != nil { @@ -186,7 +186,7 @@ func TestCache_Disconnect_ResyncsWithFreshSnapshot(t *testing.T) { func TestCache_MultipleDisconnects(t *testing.T) { fs := newFakeStore() - c := workercache.New(fs) + c := workercache.New(fs, time.Hour) ctx := t.Context() if err := c.Start(ctx); err != nil { @@ -210,7 +210,7 @@ func TestCache_MultipleDisconnects(t *testing.T) { func TestCache_WatchClosedOnListWorkersFailure(t *testing.T) { fs := newFakeStore() fs.listErr = errors.New("valkey unavailable") - c := workercache.New(fs) + c := workercache.New(fs, time.Hour) if err := c.Start(t.Context()); err == nil { t.Fatal("expected Start to fail when ListWorkers errors") @@ -226,7 +226,7 @@ func TestCache_WatchClosedOnListWorkersFailure(t *testing.T) { func TestCache_WatchClosedOnShutdown(t *testing.T) { fs := newFakeStore() - c := workercache.New(fs) + c := workercache.New(fs, time.Hour) ctx, cancel := context.WithCancel(t.Context()) if err := c.Start(ctx); err != nil { @@ -244,7 +244,7 @@ func TestCache_WatchClosedOnShutdown(t *testing.T) { func TestCache_WatchClosedOnDisconnectAndShutdown(t *testing.T) { fs := newFakeStore() - c := workercache.New(fs) + c := workercache.New(fs, time.Hour) ctx, cancel := context.WithCancel(t.Context()) if err := c.Start(ctx); err != nil { @@ -268,6 +268,87 @@ func TestCache_WatchClosedOnDisconnectAndShutdown(t *testing.T) { }, 2*time.Second) } +func TestCache_Relist_RecoversFromMissedCreate(t *testing.T) { + w1 := makeWorker("ns", "pod1", 1) + fs := newFakeStore(w1) + c := workercache.New(fs, 10*time.Millisecond) + + if err := c.Start(t.Context()); err != nil { + t.Fatalf("Start: %v", err) + } + + // Add a worker directly to the store without sending a watch event, + // simulating a silent PUBLISH failure. + w2 := makeWorker("ns", "pod2", 1) + fs.setWorkers(w1, w2) + + eventually(t, func() bool { + workers, err := c.Workers() + return err == nil && len(workers) == 2 + }, 2*time.Second) + + got, _ := c.Workers() + if diff := cmp.Diff([]*ateapipb.Worker{w1, w2}, got, protocmp.Transform(), workerSortOpt); diff != "" { + t.Errorf("workers after relist (-want +got):\n%s", diff) + } +} + +func TestCache_Relist_RecoversFromMissedDelete(t *testing.T) { + w1 := makeWorker("ns", "pod1", 1) + w2 := makeWorker("ns", "pod2", 1) + fs := newFakeStore(w1, w2) + c := workercache.New(fs, 10*time.Millisecond) + + if err := c.Start(t.Context()); err != nil { + t.Fatalf("Start: %v", err) + } + + // Remove a worker from the store without a watch event, + // simulating a silent PUBLISH failure on delete. + fs.setWorkers(w1) + + eventually(t, func() bool { + workers, err := c.Workers() + return err == nil && len(workers) == 1 + }, 2*time.Second) + + got, _ := c.Workers() + if diff := cmp.Diff([]*ateapipb.Worker{w1}, got, protocmp.Transform(), workerSortOpt); diff != "" { + t.Errorf("workers after relist (-want +got):\n%s", diff) + } +} + +func TestCache_Relist_FailureIsNonFatal(t *testing.T) { + w1 := makeWorker("ns", "pod1", 1) + fs := newFakeStore(w1) + c := workercache.New(fs, 10*time.Millisecond) + + if err := c.Start(t.Context()); err != nil { + t.Fatalf("Start: %v", err) + } + + // Make ListWorkers fail to simulate a transient Valkey error. + fs.mu.Lock() + fs.listErr = errors.New("valkey unavailable") + fs.mu.Unlock() + + // Wait long enough for at least one relist attempt. + time.Sleep(50 * time.Millisecond) + + // Clear the error; the cache should still be usable with the old snapshot. + fs.mu.Lock() + fs.listErr = nil + fs.mu.Unlock() + + workers, err := c.Workers() + if err != nil { + t.Fatalf("Workers: %v", err) + } + if diff := cmp.Diff([]*ateapipb.Worker{w1}, workers, protocmp.Transform(), workerSortOpt); diff != "" { + t.Errorf("workers mismatch (-want +got):\n%s", diff) + } +} + type fakeStore struct { store.Interface diff --git a/cmd/ateapi/main.go b/cmd/ateapi/main.go index af638d30d..165561617 100644 --- a/cmd/ateapi/main.go +++ b/cmd/ateapi/main.go @@ -112,7 +112,7 @@ func main() { redisPersistence := ateredis.NewPersistence(redisClient) - workerCache := workercache.New(redisPersistence) + workerCache := workercache.New(redisPersistence, 5*time.Minute) if err := workerCache.Start(ctx); err != nil { serverboot.Fatal(ctx, "Failed to seed worker cache", err) }