Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* [ENHANCEMENT] Cache: Add per-tenant TTL configuration for query results cache to control cache expiration on a per-tenant basis with separate TTLs for regular and out-of-order data. #7357
* [ENHANCEMENT] Tenant Federation: Add a local cache to regex resolver. #7363
* [ENHANCEMENT] Query Scheduler: Add `cortex_query_scheduler_tracked_requests` metric to track the current number of requests held by the scheduler. #7355
* [BUGFIX] Alertmanager: Fix disappearing user config and state when ring is temporarily unreachable. #7372
* [BUGFIX] Fix nil when ingester_query_max_attempts > 1. #7369

## 1.21.0 in progress
Expand Down
21 changes: 14 additions & 7 deletions pkg/alertmanager/multitenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,10 @@ func (am *MultitenantAlertmanager) userIndexUpdateLoop(ctx context.Context) {
level.Error(am.logger).Log("msg", "context timeout, exit user index update loop", "err", ctx.Err())
return
case <-ticker.C:
owned := am.isUserOwned(userID)
owned, err := am.isUserOwned(userID)
if err != nil {
continue
}
if !owned {
continue
}
Expand Down Expand Up @@ -804,7 +807,11 @@ func (am *MultitenantAlertmanager) loadAlertmanagerConfigs(ctx context.Context)

// Filter out users not owned by this shard.
for _, userID := range allUserIDs {
if am.isUserOwned(userID) {
owned, err := am.isUserOwned(userID)
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to check if user %s is owned", userID)
}
if owned {
ownedUserIDs = append(ownedUserIDs, userID)
}
}
Expand All @@ -821,24 +828,24 @@ func (am *MultitenantAlertmanager) loadAlertmanagerConfigs(ctx context.Context)
return allUserIDs, configs, nil
}

func (am *MultitenantAlertmanager) isUserOwned(userID string) bool {
func (am *MultitenantAlertmanager) isUserOwned(userID string) (bool, error) {
if !am.allowedTenants.IsAllowed(userID) {
return false
return false, nil
}

// If sharding is disabled, any alertmanager instance owns all users.
if !am.cfg.ShardingEnabled {
return true
return true, nil
}

alertmanagers, err := am.ring.Get(users.ShardByUser(userID), getSyncRingOp(am.cfg.ShardingRing.DisableReplicaSetExtension), nil, nil, nil)
if err != nil {
am.ringCheckErrors.Inc()
level.Error(am.logger).Log("msg", "failed to load alertmanager configuration", "user", userID, "err", err)
return false
return false, err
}

return alertmanagers.Includes(am.ringLifecycler.GetInstanceAddr())
return alertmanagers.Includes(am.ringLifecycler.GetInstanceAddr()), nil
}

func (am *MultitenantAlertmanager) syncConfigs(cfgs map[string]alertspb.AlertConfigDesc) {
Expand Down
125 changes: 125 additions & 0 deletions pkg/alertmanager/multitenant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2501,3 +2501,128 @@ func (m *mockAlertManagerLimits) AlertmanagerMaxSilencesCount(_ string) int {
func (m *mockAlertManagerLimits) AlertmanagerMaxSilenceSizeBytes(_ string) int {
return m.maxSilencesSizeBytes
}

func TestMultitenantAlertmanager_isUserOwned(t *testing.T) {
ctx := context.Background()
_ = ctx
amConfig := mockAlertmanagerConfig(t)
amConfig.ShardingEnabled = true

ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

alertStore, err := prepareInMemoryAlertStore()
require.NoError(t, err)

am, err := createMultitenantAlertmanager(amConfig, nil, nil, alertStore, ringStore, nil, log.NewNopLogger(), nil)
require.NoError(t, err)

// We don't start the AM, so the ring is empty.
// When sharding is enabled, isUserOwned will call am.ring.Get, which should fail because the ring is empty.
owned, err := am.isUserOwned("user-1")
require.Error(t, err)
require.Equal(t, ring.ErrEmptyRing, err)
require.False(t, owned)

// If sharding is disabled, it should return true, nil
am.cfg.ShardingEnabled = false
owned, err = am.isUserOwned("user-1")
require.NoError(t, err)
require.True(t, owned)
}

// TestMultitenantAlertmanager_loadAndSyncConfigs_deletesUserFromStore verifies
// that a periodic sync prunes remote state belonging to a user that has no
// alertmanager configuration in the store.
func TestMultitenantAlertmanager_loadAndSyncConfigs_deletesUserFromStore(t *testing.T) {
	ctx := context.Background()
	amConfig := mockAlertmanagerConfig(t)
	amConfig.ShardingEnabled = true

	ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
	t.Cleanup(func() { assert.NoError(t, closer.Close()) })

	alertStore, err := prepareInMemoryAlertStore()
	require.NoError(t, err)

	am, err := createMultitenantAlertmanager(amConfig, nil, nil, alertStore, ringStore, nil, log.NewNopLogger(), nil)
	require.NoError(t, err)

	// The AM is never started: state is inserted by hand and
	// loadAndSyncConfigs is invoked directly.
	const userID = "user-1"

	// 1. Write remote state (FullState) for the user, but no alert config.
	require.NoError(t, alertStore.SetFullState(ctx, userID, alertspb.FullStateDesc{
		State: &clusterpb.FullState{},
	}))

	// Because no alert config was ever set, ListAllUsers reports nobody.
	allUsers, err := alertStore.ListAllUsers(ctx)
	require.NoError(t, err)
	require.Empty(t, allUsers)

	// Sanity check: the user's FullState really is in the store.
	_, err = alertStore.GetFullState(ctx, userID)
	require.NoError(t, err)

	// 2. Run a periodic sync. It fetches the (empty) user list and, with
	// sharding enabled, cleans up remote state for users not in that list.
	// The user has state but no config, so the state should be pruned.
	require.NoError(t, am.loadAndSyncConfigs(ctx, reasonPeriodic))

	// 3. The user's FullState must now be gone from the store.
	_, err = alertStore.GetFullState(ctx, userID)
	require.Error(t, err)
	require.Contains(t, err.Error(), "alertmanager storage object not found")
}

// TestMultitenantAlertmanager_loadAndSyncConfigs_DoesNotDeleteUserFromStoreWhenRingUnreachable
// verifies that when the ownership check fails (empty/unreachable ring), the
// sync aborts with an error before pruning any user's remote state.
func TestMultitenantAlertmanager_loadAndSyncConfigs_DoesNotDeleteUserFromStoreWhenRingUnreachable(t *testing.T) {
	ctx := context.Background()
	amConfig := mockAlertmanagerConfig(t)
	amConfig.ShardingEnabled = true

	ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
	t.Cleanup(func() { assert.NoError(t, closer.Close()) })

	alertStore, err := prepareInMemoryAlertStore()
	require.NoError(t, err)

	am, err := createMultitenantAlertmanager(amConfig, nil, nil, alertStore, ringStore, nil, log.NewNopLogger(), nil)
	require.NoError(t, err)

	const userID = "user-1"

	// 1. Seed remote state (FullState) for the user.
	require.NoError(t, alertStore.SetFullState(ctx, userID, alertspb.FullStateDesc{
		State: &clusterpb.FullState{},
	}))

	// Also set an alert config, so ListAllUsers reports this user.
	require.NoError(t, alertStore.SetAlertConfig(ctx, alertspb.AlertConfigDesc{
		User:      userID,
		RawConfig: simpleConfigOne,
		Templates: []*alertspb.TemplateDesc{},
	}))

	allUsers, err := alertStore.ListAllUsers(ctx)
	require.NoError(t, err)
	require.Len(t, allUsers, 1)

	// Sanity check: the user's FullState really is in the store.
	_, err = alertStore.GetFullState(ctx, userID)
	require.NoError(t, err)

	// 2. Run a periodic sync. Sharding is enabled but the ring is empty
	// (the AM was never started), so the ownership check fails and the
	// sync must return early with an error — before pruning any state.
	err = am.loadAndSyncConfigs(ctx, reasonPeriodic)
	require.Error(t, err)
	require.Contains(t, err.Error(), ring.ErrEmptyRing.Error())

	// 3. The user's FullState must still be present in the store.
	_, err = alertStore.GetFullState(ctx, userID)
	require.NoError(t, err)
}
Loading