Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/ateapi/internal/controlapi/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func setupTest(t *testing.T, ns string) *testContext {
substrateInformerFactory.WaitForCacheSync(ctx.Done())

// 4. Initialize Service
wc := workercache.New(persistence)
wc := workercache.New(persistence, 5*time.Minute)
if err := wc.Start(ctx); err != nil {
cancel()
mr.Close()
Expand Down
42 changes: 27 additions & 15 deletions cmd/ateapi/internal/workercache/workercache.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,30 +36,35 @@ import (
// TODO: add metrics — at minimum a gauge for worker count, a counter for
// resync events, and a counter for failed PUBLISH operations (in ateredis).
type Cache struct {
store store.Interface
store store.Interface
relistInterval time.Duration

mu sync.RWMutex
workers map[string]*ateapipb.Worker

ready atomic.Bool
}

// New creates a Cache backed by a given store.
func New(store store.Interface) *Cache {
// New creates a Cache backed by a given store. relistInterval controls how
// often the cache performs a full ListWorkers to recover from state drifts
// caused by missing WorkerWatch events.
func New(store store.Interface, relistInterval time.Duration) *Cache {
return &Cache{
store: store,
workers: make(map[string]*ateapipb.Worker),
store: store,
relistInterval: relistInterval,
workers: make(map[string]*ateapipb.Worker),
}
}

// Start syncs the cache synchronously, then spawns a background goroutine
// that streams updates and resyncs on connection loss.
// that streams updates, relists periodically, and resyncs on connection loss.
// Returns as soon as the initial sync succeeds.
func (c *Cache) Start(ctx context.Context) error {
watch, err := c.sync(ctx)
if err != nil {
return err
}
c.ready.Store(true)
go c.watchEvents(ctx, watch)
return nil
}
Expand All @@ -86,31 +91,33 @@ func (c *Cache) sync(ctx context.Context) (*store.WorkerWatch, error) {
if err != nil {
return nil, fmt.Errorf("WatchWorkers: %w", err)
}
if err := c.relist(ctx); err != nil {
watch.Close()
return nil, err
}
return watch, nil
}

func (c *Cache) relist(ctx context.Context) error {
workers, err := c.store.ListWorkers(ctx)
if err != nil {
watch.Close()
return nil, fmt.Errorf("ListWorkers: %w", err)
return fmt.Errorf("ListWorkers: %w", err)
}

newMap := make(map[string]*ateapipb.Worker, len(workers))
for _, w := range workers {
newMap[workerKey(w)] = w
}

c.mu.Lock()
c.workers = newMap
c.mu.Unlock()
c.ready.Store(true)

slog.InfoContext(ctx, "worker cache synced", slog.Int("count", len(newMap)))
return watch, nil
return nil
}

func (c *Cache) watchEvents(ctx context.Context, watch *store.WorkerWatch) {
ticker := time.NewTicker(c.relistInterval)
defer ticker.Stop()
for {
// TODO: Add a periodic "relist" so we can recover from state drifts
// caused by missed watch events.
select {
case event, ok := <-watch.Events:
if !ok {
Expand All @@ -124,9 +131,14 @@ func (c *Cache) watchEvents(ctx context.Context, watch *store.WorkerWatch) {
if watch == nil {
return // context cancelled
}
c.ready.Store(true)
} else {
c.applyEvent(event)
}
case <-ticker.C:
if err := c.relist(ctx); err != nil {
slog.WarnContext(ctx, "worker cache: periodic relist failed", slog.Any("err", err))
}
case <-ctx.Done():
c.ready.Store(false)
watch.Close()
Expand Down
103 changes: 92 additions & 11 deletions cmd/ateapi/internal/workercache/workercache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
)

func TestCache_NotReadyBeforeStart(t *testing.T) {
c := workercache.New(newFakeStore())
c := workercache.New(newFakeStore(), time.Hour)
_, err := c.Workers()
if err == nil {
t.Fatal("expected error from Workers before Start, got nil")
Expand All @@ -42,7 +42,7 @@ func TestCache_SyncsOnStart(t *testing.T) {
w1 := makeWorker("ns", "pod1", 1)
w2 := makeWorker("ns", "pod2", 1)

c := workercache.New(newFakeStore(w1, w2))
c := workercache.New(newFakeStore(w1, w2), time.Hour)
ctx := t.Context()

if err := c.Start(ctx); err != nil {
Expand All @@ -60,7 +60,7 @@ func TestCache_SyncsOnStart(t *testing.T) {

func TestCache_CreatedEvent(t *testing.T) {
fs := newFakeStore()
c := workercache.New(fs)
c := workercache.New(fs, time.Hour)
ctx := t.Context()

if err := c.Start(ctx); err != nil {
Expand All @@ -84,7 +84,7 @@ func TestCache_CreatedEvent(t *testing.T) {
func TestCache_UpdatedEvent_NewerVersionApplied(t *testing.T) {
w := makeWorker("ns", "pod1", 1)
fs := newFakeStore(w)
c := workercache.New(fs)
c := workercache.New(fs, time.Hour)
ctx := t.Context()

if err := c.Start(ctx); err != nil {
Expand All @@ -109,7 +109,7 @@ func TestCache_UpdatedEvent_NewerVersionApplied(t *testing.T) {
func TestCache_UpdatedEvent_OlderVersionIgnored(t *testing.T) {
w := makeWorker("ns", "pod1", 5)
fs := newFakeStore(w)
c := workercache.New(fs)
c := workercache.New(fs, time.Hour)
ctx := t.Context()

if err := c.Start(ctx); err != nil {
Expand Down Expand Up @@ -139,7 +139,7 @@ func TestCache_UpdatedEvent_OlderVersionIgnored(t *testing.T) {
func TestCache_DeletedEvent(t *testing.T) {
w := makeWorker("ns", "pod1", 1)
fs := newFakeStore(w)
c := workercache.New(fs)
c := workercache.New(fs, time.Hour)
ctx := t.Context()

if err := c.Start(ctx); err != nil {
Expand All @@ -160,7 +160,7 @@ func TestCache_DeletedEvent(t *testing.T) {
func TestCache_Disconnect_ResyncsWithFreshSnapshot(t *testing.T) {
w1 := makeWorker("ns", "pod1", 1)
fs := newFakeStore(w1)
c := workercache.New(fs)
c := workercache.New(fs, time.Hour)
ctx := t.Context()

if err := c.Start(ctx); err != nil {
Expand All @@ -186,7 +186,7 @@ func TestCache_Disconnect_ResyncsWithFreshSnapshot(t *testing.T) {

func TestCache_MultipleDisconnects(t *testing.T) {
fs := newFakeStore()
c := workercache.New(fs)
c := workercache.New(fs, time.Hour)
ctx := t.Context()

if err := c.Start(ctx); err != nil {
Expand All @@ -210,7 +210,7 @@ func TestCache_MultipleDisconnects(t *testing.T) {
func TestCache_WatchClosedOnListWorkersFailure(t *testing.T) {
fs := newFakeStore()
fs.listErr = errors.New("valkey unavailable")
c := workercache.New(fs)
c := workercache.New(fs, time.Hour)

if err := c.Start(t.Context()); err == nil {
t.Fatal("expected Start to fail when ListWorkers errors")
Expand All @@ -226,7 +226,7 @@ func TestCache_WatchClosedOnListWorkersFailure(t *testing.T) {

func TestCache_WatchClosedOnShutdown(t *testing.T) {
fs := newFakeStore()
c := workercache.New(fs)
c := workercache.New(fs, time.Hour)
ctx, cancel := context.WithCancel(t.Context())

if err := c.Start(ctx); err != nil {
Expand All @@ -244,7 +244,7 @@ func TestCache_WatchClosedOnShutdown(t *testing.T) {

func TestCache_WatchClosedOnDisconnectAndShutdown(t *testing.T) {
fs := newFakeStore()
c := workercache.New(fs)
c := workercache.New(fs, time.Hour)
ctx, cancel := context.WithCancel(t.Context())

if err := c.Start(ctx); err != nil {
Expand All @@ -268,6 +268,87 @@ func TestCache_WatchClosedOnDisconnectAndShutdown(t *testing.T) {
}, 2*time.Second)
}

func TestCache_Relist_RecoversFromMissedCreate(t *testing.T) {
w1 := makeWorker("ns", "pod1", 1)
fs := newFakeStore(w1)
c := workercache.New(fs, 10*time.Millisecond)

if err := c.Start(t.Context()); err != nil {
t.Fatalf("Start: %v", err)
}

// Add a worker directly to the store without sending a watch event,
// simulating a silent PUBLISH failure.
w2 := makeWorker("ns", "pod2", 1)
fs.setWorkers(w1, w2)

eventually(t, func() bool {
workers, err := c.Workers()
return err == nil && len(workers) == 2
}, 2*time.Second)

got, _ := c.Workers()
if diff := cmp.Diff([]*ateapipb.Worker{w1, w2}, got, protocmp.Transform(), workerSortOpt); diff != "" {
t.Errorf("workers after relist (-want +got):\n%s", diff)
}
}

func TestCache_Relist_RecoversFromMissedDelete(t *testing.T) {
w1 := makeWorker("ns", "pod1", 1)
w2 := makeWorker("ns", "pod2", 1)
fs := newFakeStore(w1, w2)
c := workercache.New(fs, 10*time.Millisecond)

if err := c.Start(t.Context()); err != nil {
t.Fatalf("Start: %v", err)
}

// Remove a worker from the store without a watch event,
// simulating a silent PUBLISH failure on delete.
fs.setWorkers(w1)

eventually(t, func() bool {
workers, err := c.Workers()
return err == nil && len(workers) == 1
}, 2*time.Second)

got, _ := c.Workers()
if diff := cmp.Diff([]*ateapipb.Worker{w1}, got, protocmp.Transform(), workerSortOpt); diff != "" {
t.Errorf("workers after relist (-want +got):\n%s", diff)
}
}

func TestCache_Relist_FailureIsNonFatal(t *testing.T) {
w1 := makeWorker("ns", "pod1", 1)
fs := newFakeStore(w1)
c := workercache.New(fs, 10*time.Millisecond)

if err := c.Start(t.Context()); err != nil {
t.Fatalf("Start: %v", err)
}

// Make ListWorkers fail to simulate a transient Valkey error.
fs.mu.Lock()
fs.listErr = errors.New("valkey unavailable")
fs.mu.Unlock()

// Wait long enough for at least one relist attempt.
time.Sleep(50 * time.Millisecond)

// Clear the error; the cache should still be usable with the old snapshot.
fs.mu.Lock()
fs.listErr = nil
fs.mu.Unlock()

workers, err := c.Workers()
if err != nil {
t.Fatalf("Workers: %v", err)
}
if diff := cmp.Diff([]*ateapipb.Worker{w1}, workers, protocmp.Transform(), workerSortOpt); diff != "" {
t.Errorf("workers mismatch (-want +got):\n%s", diff)
}
}

type fakeStore struct {
store.Interface

Expand Down
2 changes: 1 addition & 1 deletion cmd/ateapi/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func main() {

redisPersistence := ateredis.NewPersistence(redisClient)

workerCache := workercache.New(redisPersistence)
workerCache := workercache.New(redisPersistence, 5*time.Minute)
if err := workerCache.Start(ctx); err != nil {
serverboot.Fatal(ctx, "Failed to seed worker cache", err)
}
Expand Down
Loading