Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## master / unreleased
* [ENHANCEMENT] Cache: Add per-tenant TTL configuration for query results cache to control cache expiration on a per-tenant basis with separate TTLs for regular and out-of-order data. #7357
* [CHANGE] Querier: Make query time range configurations per-tenant: `query_ingesters_within`, `query_store_after`, and `shuffle_sharding_ingesters_lookback_period`. #7160
* [ENHANCEMENT] Tenant Federation: Add a local cache to regex resolver. #7363
* [ENHANCEMENT] Query Scheduler: Add `cortex_query_scheduler_tracked_requests` metric to track the current number of requests held by the scheduler. #7355
* [BUGFIX] Fix nil when ingester_query_max_attempts > 1. #7369
Expand Down
23 changes: 0 additions & 23 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,6 @@ querier:
# CLI flag: -querier.max-samples
[max_samples: <int> | default = 50000000]

# Maximum lookback beyond which queries are not sent to ingester. 0 means all
# queries are sent to ingester.
# CLI flag: -querier.query-ingesters-within
[query_ingesters_within: <duration> | default = 0s]

# Enable returning samples stats per steps in query response.
# CLI flag: -querier.per-step-stats-enabled
[per_step_stats_enabled: <boolean> | default = false]
Expand All @@ -131,14 +126,6 @@ querier:
# CLI flag: -querier.response-compression
[response_compression: <string> | default = "gzip"]

# The time after which a metric should be queried from storage and not just
# ingesters. 0 means all queries are sent to store. When running the blocks
# storage, if this option is enabled, the time range of the query sent to the
# store will be manipulated to ensure the query end is not more recent than
# 'now - query-store-after'.
# CLI flag: -querier.query-store-after
[query_store_after: <duration> | default = 0s]

# Maximum duration into the future you can query. 0 to disable.
# CLI flag: -querier.max-query-into-future
[max_query_into_future: <duration> | default = 10m]
Expand Down Expand Up @@ -247,16 +234,6 @@ querier:
# CLI flag: -querier.ingester-query-max-attempts
[ingester_query_max_attempts: <int> | default = 1]

# When distributor's sharding strategy is shuffle-sharding and this setting is
# > 0, queriers fetch in-memory series from the minimum set of required
# ingesters, selecting only ingesters which may have received series since
# 'now - lookback period'. The lookback period should be greater or equal than
# the configured 'query store after' and 'query ingesters within'. If this
# setting is 0, queriers always query all ingesters (ingesters shuffle
# sharding on read path is disabled).
# CLI flag: -querier.shuffle-sharding-ingesters-lookback-period
[shuffle_sharding_ingesters_lookback_period: <duration> | default = 0s]

thanos_engine:
# Experimental. Use Thanos promql engine
# https://github.com/thanos-io/promql-engine rather than the Prometheus
Expand Down
42 changes: 19 additions & 23 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -4286,6 +4286,25 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# zones are not available.
[query_partial_data: <boolean> | default = false]

# Maximum lookback duration for querying data from ingesters. Queries for data
# older than this will only query the long-term storage. This is a per-tenant
# limit that can be overridden in the runtime configuration. Should be less than
# or equal to close-idle-tsdb-timeout.
# CLI flag: -limits.query-ingesters-within
[query_ingesters_within: <duration> | default = 0s]

# Minimum age of data before querying the long-term storage. Queries for data
# younger than this will only query ingesters. This is a per-tenant limit that
# can be overridden in the runtime configuration.
# CLI flag: -limits.query-store-after
[query_store_after: <duration> | default = 0s]

# Lookback period for shuffle sharding of ingesters. This is a per-tenant limit
# that can be overridden in the runtime configuration. Should be greater than or
# equal to query-ingesters-within and query-store-after.
# CLI flag: -limits.shuffle-sharding-ingesters-lookback-period
[shuffle_sharding_ingesters_lookback_period: <duration> | default = 0s]

# The maximum number of rows that can be fetched when querying parquet storage.
# Each row maps to a series in a parquet file. This limit applies before
# materializing chunks. 0 to disable.
Expand Down Expand Up @@ -4755,11 +4774,6 @@ The `querier_config` configures the Cortex querier.
# CLI flag: -querier.max-samples
[max_samples: <int> | default = 50000000]

# Maximum lookback beyond which queries are not sent to ingester. 0 means all
# queries are sent to ingester.
# CLI flag: -querier.query-ingesters-within
[query_ingesters_within: <duration> | default = 0s]

# Enable returning samples stats per steps in query response.
# CLI flag: -querier.per-step-stats-enabled
[per_step_stats_enabled: <boolean> | default = false]
Expand All @@ -4769,14 +4783,6 @@ The `querier_config` configures the Cortex querier.
# CLI flag: -querier.response-compression
[response_compression: <string> | default = "gzip"]

# The time after which a metric should be queried from storage and not just
# ingesters. 0 means all queries are sent to store. When running the blocks
# storage, if this option is enabled, the time range of the query sent to the
# store will be manipulated to ensure the query end is not more recent than 'now
# - query-store-after'.
# CLI flag: -querier.query-store-after
[query_store_after: <duration> | default = 0s]

# Maximum duration into the future you can query. 0 to disable.
# CLI flag: -querier.max-query-into-future
[max_query_into_future: <duration> | default = 10m]
Expand Down Expand Up @@ -4885,16 +4891,6 @@ store_gateway_client:
# CLI flag: -querier.ingester-query-max-attempts
[ingester_query_max_attempts: <int> | default = 1]

# When distributor's sharding strategy is shuffle-sharding and this setting is >
# 0, queriers fetch in-memory series from the minimum set of required ingesters,
# selecting only ingesters which may have received series since 'now - lookback
# period'. The lookback period should be greater or equal than the configured
# 'query store after' and 'query ingesters within'. If this setting is 0,
# queriers always query all ingesters (ingesters shuffle sharding on read path
# is disabled).
# CLI flag: -querier.shuffle-sharding-ingesters-lookback-period
[shuffle_sharding_ingesters_lookback_period: <duration> | default = 0s]

thanos_engine:
# Experimental. Use Thanos promql engine
# https://github.com/thanos-io/promql-engine rather than the Prometheus promql
Expand Down
3 changes: 3 additions & 0 deletions pkg/cortex/cortex.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,9 @@ func (c *Config) Validate(log log.Logger) error {
if err := c.LimitsConfig.Validate(c.NameValidationScheme, c.Distributor.ShardByAllLabels, c.Ingester.ActiveSeriesMetricsEnabled); err != nil {
return errors.Wrap(err, "invalid limits config")
}
if err := c.LimitsConfig.ValidateQueryLimits("default", c.BlocksStorage.TSDB.CloseIdleTSDBTimeout); err != nil {
return errors.Wrap(err, "invalid query routing config")
}
if err := c.ResourceMonitor.Validate(); err != nil {
return errors.Wrap(err, "invalid resource-monitor config")
}
Expand Down
2 changes: 0 additions & 2 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,6 @@ func (t *Cortex) initOverridesExporter() (services.Service, error) {

func (t *Cortex) initDistributorService() (serv services.Service, err error) {
t.Cfg.Distributor.DistributorRing.ListenPort = t.Cfg.Server.GRPCListenPort
t.Cfg.Distributor.ShuffleShardingLookbackPeriod = t.Cfg.Querier.ShuffleShardingIngestersLookbackPeriod
t.Cfg.Distributor.NameValidationScheme = t.Cfg.NameValidationScheme
t.Cfg.IngesterClient.GRPCClientConfig.SignWriteRequestsEnabled = t.Cfg.Distributor.SignWriteRequestsEnabled

Expand Down Expand Up @@ -497,7 +496,6 @@ func (t *Cortex) initIngesterService() (serv services.Service, err error) {
t.Cfg.Ingester.DistributorShardingStrategy = t.Cfg.Distributor.ShardingStrategy
t.Cfg.Ingester.DistributorShardByAllLabels = t.Cfg.Distributor.ShardByAllLabels
t.Cfg.Ingester.InstanceLimitsFn = ingesterInstanceLimits(t.RuntimeConfig)
t.Cfg.Ingester.QueryIngestersWithin = t.Cfg.Querier.QueryIngestersWithin
t.tsdbIngesterConfig()

t.Ingester, err = ingester.New(t.Cfg.Ingester, t.OverridesConfig, prometheus.DefaultRegisterer, util_log.Logger, t.ResourceMonitor)
Expand Down
5 changes: 4 additions & 1 deletion pkg/cortex/runtime_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,13 @@ func (l runtimeConfigLoader) load(r io.Reader) (any, error) {
// only check if target is `all`, `distributor`, "querier", and "ruler"
// refer to https://github.com/cortexproject/cortex/issues/6741#issuecomment-3067244929
if overrides != nil {
for _, ul := range overrides.TenantLimits {
for userID, ul := range overrides.TenantLimits {
if err := ul.Validate(l.cfg.NameValidationScheme, l.cfg.Distributor.ShardByAllLabels, l.cfg.Ingester.ActiveSeriesMetricsEnabled); err != nil {
return nil, err
}
if err := ul.ValidateQueryLimits(userID, l.cfg.BlocksStorage.TSDB.CloseIdleTSDBTimeout); err != nil {
return nil, err
}
}
}
}
Expand Down
3 changes: 0 additions & 3 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,6 @@ type Config struct {
// this (and should never use it) but this feature is used by other projects built on top of it
SkipLabelNameValidation bool `yaml:"-"`

// This config is dynamically injected because defined in the querier config.
ShuffleShardingLookbackPeriod time.Duration `yaml:"-"`

// ZoneResultsQuorumMetadata enables zone results quorum when querying ingester replication set
// with metadata APIs (labels names and values for now). When zone awareness is enabled, only results
// from quorum number of zones will be included to reduce data merged and improve performance.
Expand Down
146 changes: 145 additions & 1 deletion pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3388,7 +3388,7 @@ func prepare(tb testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, []

if cfg.shuffleShardEnabled {
distributorCfg.ShardingStrategy = util.ShardingStrategyShuffle
distributorCfg.ShuffleShardingLookbackPeriod = time.Hour
cfg.limits.ShuffleShardingIngestersLookbackPeriod = model.Duration(time.Hour)

cfg.limits.IngestionTenantShardSize = cfg.shuffleShardSize
}
Expand Down Expand Up @@ -4794,3 +4794,147 @@ func TestDistributor_BatchTimeoutMetric(t *testing.T) {
cortex_distributor_ingester_push_timeouts_total 5
`), "cortex_distributor_ingester_push_timeouts_total"))
}
// TestDistributor_ShuffleShardingIngestersLookbackPeriod verifies that the
// per-tenant ShuffleShardingIngestersLookbackPeriod limit controls how many
// ingesters GetIngestersForQuery selects when shuffle sharding is enabled.
//
// NOTE(review): prepare() appears to unconditionally overwrite
// cfg.limits.ShuffleShardingIngestersLookbackPeriod with 1h whenever
// shuffleShardEnabled is true, which would clobber the per-case
// lookbackPeriod set below. Confirm the 0 and 2h cases actually exercise
// their intended lookback values rather than 1h.
func TestDistributor_ShuffleShardingIngestersLookbackPeriod(t *testing.T) {
	t.Parallel()

	tests := map[string]struct {
		lookbackPeriod   time.Duration // per-tenant lookback limit under test (0 disables it)
		shardSize        int           // per-tenant ingestion shard size (0 disables shuffle sharding)
		expectedBehavior string        // selects which assertion branch runs below
	}{
		"lookback disabled (0) should not use shuffle sharding with lookback": {
			lookbackPeriod:   0,
			shardSize:        3,
			expectedBehavior: "no_lookback",
		},
		"lookback 1h should include ingesters from past hour": {
			lookbackPeriod:   1 * time.Hour,
			shardSize:        3,
			expectedBehavior: "with_lookback",
		},
		"lookback 2h should include ingesters from past 2 hours": {
			lookbackPeriod:   2 * time.Hour,
			shardSize:        3,
			expectedBehavior: "with_lookback",
		},
		"shard size 0 should not use shuffle sharding": {
			lookbackPeriod:   1 * time.Hour,
			shardSize:        0,
			expectedBehavior: "no_shuffle_sharding",
		},
	}

	for testName, testData := range tests {
		t.Run(testName, func(t *testing.T) {
			t.Parallel()

			// Setup distributor with shuffle sharding enabled
			limits := &validation.Limits{}
			flagext.DefaultValues(limits)
			limits.IngestionTenantShardSize = testData.shardSize
			limits.ShuffleShardingIngestersLookbackPeriod = model.Duration(testData.lookbackPeriod)

			numIngesters := 10
			ds, _, _, _ := prepare(t, prepConfig{
				numIngesters:        numIngesters,
				happyIngesters:      numIngesters,
				numDistributors:     1,
				shardByAllLabels:    true,
				shuffleShardSize:    testData.shardSize,
				shuffleShardEnabled: true,
				limits:              limits,
			})

			ctx := user.InjectOrgID(context.Background(), "test-user")

			// Get ingesters for query
			replicationSet, err := ds[0].GetIngestersForQuery(ctx)
			require.NoError(t, err)

			switch testData.expectedBehavior {
			case "no_lookback":
				// When lookback is disabled, should still use shuffle sharding but without lookback
				// This means we get the current shard size
				if testData.shardSize > 0 {
					assert.LessOrEqual(t, len(replicationSet.Instances), testData.shardSize,
						"should not exceed shard size when lookback is disabled")
				}

			case "with_lookback":
				// When lookback is enabled, should use shuffle sharding with lookback
				// This means we might get more ingesters than the shard size
				assert.GreaterOrEqual(t, len(replicationSet.Instances), testData.shardSize,
					"should include at least shard size ingesters with lookback")

			case "no_shuffle_sharding":
				// When shard size is 0, shuffle sharding is disabled
				// Should query all ingesters
				assert.Equal(t, numIngesters, len(replicationSet.Instances),
					"should query all ingesters when shuffle sharding is disabled")
			}
		})
	}
}

// TestDistributor_ShuffleShardingIngestersLookbackPeriod_Validation exercises
// ValidateQueryLimits over combinations of query_store_after and
// shuffle_sharding_ingesters_lookback_period, checking that the lookback
// period is rejected only when it is shorter than a non-zero query_store_after.
func TestDistributor_ShuffleShardingIngestersLookbackPeriod_Validation(t *testing.T) {
	t.Parallel()

	type testCase struct {
		queryStoreAfter                        time.Duration
		shuffleShardingIngestersLookbackPeriod time.Duration
		shouldBeValid                          bool
		description                            string
	}

	cases := map[string]testCase{
		"valid: lookback >= queryStoreAfter": {
			queryStoreAfter:                        1 * time.Hour,
			shuffleShardingIngestersLookbackPeriod: 2 * time.Hour,
			shouldBeValid:                          true,
			description:                            "lookback period should be >= queryStoreAfter",
		},
		"valid: lookback == queryStoreAfter": {
			queryStoreAfter:                        1 * time.Hour,
			shuffleShardingIngestersLookbackPeriod: 1 * time.Hour,
			shouldBeValid:                          true,
			description:                            "lookback period can equal queryStoreAfter",
		},
		"invalid: lookback < queryStoreAfter": {
			queryStoreAfter:                        2 * time.Hour,
			shuffleShardingIngestersLookbackPeriod: 1 * time.Hour,
			shouldBeValid:                          false,
			description:                            "lookback period must be >= queryStoreAfter",
		},
		"valid: both disabled": {
			queryStoreAfter:                        0,
			shuffleShardingIngestersLookbackPeriod: 0,
			shouldBeValid:                          true,
			description:                            "both can be disabled",
		},
		"valid: queryStoreAfter disabled": {
			queryStoreAfter:                        0,
			shuffleShardingIngestersLookbackPeriod: 1 * time.Hour,
			shouldBeValid:                          true,
			description:                            "queryStoreAfter can be disabled while lookback is enabled",
		},
	}

	for name, tc := range cases {
		t.Run(name, func(t *testing.T) {
			t.Parallel()

			// Start from defaults, then apply only the two durations under test.
			tenantLimits := &validation.Limits{}
			flagext.DefaultValues(tenantLimits)
			tenantLimits.QueryStoreAfter = model.Duration(tc.queryStoreAfter)
			tenantLimits.ShuffleShardingIngestersLookbackPeriod = model.Duration(tc.shuffleShardingIngestersLookbackPeriod)

			// ValidateQueryLimits requires userID and closeIdleTSDBTimeout
			err := tenantLimits.ValidateQueryLimits("test-user", 13*time.Hour)

			// Invalid combinations must surface an error naming the offending field.
			if !tc.shouldBeValid {
				assert.Error(t, err, tc.description)
				assert.Contains(t, err.Error(), "shuffle_sharding_ingesters_lookback_period", tc.description)
				return
			}
			assert.NoError(t, err, tc.description)
		})
	}
}
4 changes: 2 additions & 2 deletions pkg/distributor/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (d *Distributor) GetIngestersForQuery(ctx context.Context, matchers ...*lab
// part of the tenant's subring.
if d.cfg.ShardingStrategy == util.ShardingStrategyShuffle {
shardSize := d.limits.IngestionTenantShardSize(userID)
lookbackPeriod := d.cfg.ShuffleShardingLookbackPeriod
lookbackPeriod := d.limits.ShuffleShardingIngestersLookbackPeriod(userID)

if shardSize > 0 && lookbackPeriod > 0 {
return d.ingestersRing.ShuffleShardWithLookback(userID, shardSize, lookbackPeriod, time.Now()).GetReplicationSetForOperation(ring.Read)
Expand Down Expand Up @@ -123,7 +123,7 @@ func (d *Distributor) GetIngestersForMetadata(ctx context.Context) (ring.Replica
// part of the tenant's subring.
if d.cfg.ShardingStrategy == util.ShardingStrategyShuffle {
shardSize := d.limits.IngestionTenantShardSize(userID)
lookbackPeriod := d.cfg.ShuffleShardingLookbackPeriod
lookbackPeriod := d.limits.ShuffleShardingIngestersLookbackPeriod(userID)

if shardSize > 0 && lookbackPeriod > 0 {
return d.ingestersRing.ShuffleShardWithLookback(userID, shardSize, lookbackPeriod, time.Now()).GetReplicationSetForOperation(ring.Read)
Expand Down
9 changes: 3 additions & 6 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,6 @@ type Config struct {
DistributorShardingStrategy string `yaml:"-"`
DistributorShardByAllLabels bool `yaml:"-"`

// Injected at runtime and read from querier config.
QueryIngestersWithin time.Duration `yaml:"-"`

DefaultLimits InstanceLimits `yaml:"instance_limits"`
InstanceLimitsFn func() *InstanceLimits `yaml:"-"`

Expand Down Expand Up @@ -1926,7 +1923,7 @@ func (i *Ingester) labelsValuesCommon(ctx context.Context, req *client.LabelValu
}
defer db.releaseReadLock()

mint, maxt, err := metadataQueryRange(startTimestampMs, endTimestampMs, db, i.cfg.QueryIngestersWithin)
mint, maxt, err := metadataQueryRange(startTimestampMs, endTimestampMs, db, i.limits.QueryIngestersWithin(userID))
if err != nil {
return nil, cleanup, err
}
Expand Down Expand Up @@ -2042,7 +2039,7 @@ func (i *Ingester) labelNamesCommon(ctx context.Context, req *client.LabelNamesR
}
defer db.releaseReadLock()

mint, maxt, err := metadataQueryRange(startTimestampMs, endTimestampMs, db, i.cfg.QueryIngestersWithin)
mint, maxt, err := metadataQueryRange(startTimestampMs, endTimestampMs, db, i.limits.QueryIngestersWithin(userID))
if err != nil {
return nil, cleanup, err
}
Expand Down Expand Up @@ -2174,7 +2171,7 @@ func (i *Ingester) metricsForLabelMatchersCommon(ctx context.Context, req *clien
return cleanup, err
}

mint, maxt, err := metadataQueryRange(req.StartTimestampMs, req.EndTimestampMs, db, i.cfg.QueryIngestersWithin)
mint, maxt, err := metadataQueryRange(req.StartTimestampMs, req.EndTimestampMs, db, i.limits.QueryIngestersWithin(userID))
if err != nil {
return cleanup, err
}
Expand Down
Loading
Loading