Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion pkg/api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,11 @@ func NewQuerierHandler(
legacyPromRouter := route.New().WithPrefix(path.Join(legacyPrefix, "/api/v1"))
api.Register(legacyPromRouter)

queryAPI := queryapi.NewQueryAPI(engine, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin)
queryAPI := queryapi.NewQueryAPI(engine, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin, stats.PhaseTrackerConfig{
TotalTimeout: querierCfg.TimeoutClassificationDeadline,
EvalTimeThreshold: querierCfg.TimeoutClassificationEvalThreshold,
Enabled: querierCfg.TimeoutClassificationEnabled,
})

requestTracker := request_tracker.NewRequestTracker(querierCfg.ActiveQueryTrackerDir, "apis.active", querierCfg.MaxConcurrent, util_log.GoKitLogToSlog(logger))
var apiHandler http.Handler
Expand Down
133 changes: 119 additions & 14 deletions pkg/api/queryapi/query_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,21 @@ import (
"github.com/cortexproject/cortex/pkg/distributed_execution"
"github.com/cortexproject/cortex/pkg/engine"
"github.com/cortexproject/cortex/pkg/querier"
"github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/api"
"github.com/cortexproject/cortex/pkg/util/requestmeta"
)

// QueryAPI serves the Prometheus-compatible instant and range query
// endpoints on top of a Cortex queryable and query engine.
type QueryAPI struct {
	queryable     storage.SampleAndChunkQueryable
	queryEngine   engine.QueryEngine
	now           func() time.Time // injectable clock for tests
	statsRenderer v1.StatsRenderer
	logger        log.Logger
	codecs        []v1.Codec
	CORSOrigin    *regexp.Regexp
	// timeoutClassification controls proactive query-timeout classification
	// (converting evaluation-bound timeouts into 4XX user errors).
	timeoutClassification stats.PhaseTrackerConfig
}

func NewQueryAPI(
Expand All @@ -42,15 +45,17 @@ func NewQueryAPI(
logger log.Logger,
codecs []v1.Codec,
CORSOrigin *regexp.Regexp,
timeoutClassification stats.PhaseTrackerConfig,
) *QueryAPI {
return &QueryAPI{
queryEngine: qe,
queryable: q,
statsRenderer: statsRenderer,
logger: logger,
codecs: codecs,
CORSOrigin: CORSOrigin,
now: time.Now,
queryEngine: qe,
queryable: q,
statsRenderer: statsRenderer,
logger: logger,
codecs: codecs,
CORSOrigin: CORSOrigin,
now: time.Now,
timeoutClassification: timeoutClassification,
}
}

Expand Down Expand Up @@ -84,6 +89,11 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {
}

ctx := r.Context()

// Always record query start time for phase tracking, regardless of feature flag.
queryStats := stats.FromContext(ctx)
queryStats.SetQueryStart(time.Now())

if to := r.FormValue("timeout"); to != "" {
var cancel context.CancelFunc
timeout, err := util.ParseDurationMs(to)
Expand All @@ -95,6 +105,15 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {
defer cancel()
}

cfg := q.timeoutClassification
ctx, cancel, earlyResult := applyTimeoutClassification(ctx, queryStats, cfg)
if cancel != nil {
defer cancel()
}
if earlyResult != nil {
return *earlyResult
}

opts, err := extractQueryOpts(r)
if err != nil {
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
Expand Down Expand Up @@ -138,6 +157,13 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {

res := qry.Exec(ctx)
if res.Err != nil {
// If the context was cancelled/timed out, apply timeout classification.
if ctx.Err() != nil {
if classified := q.classifyTimeout(ctx, queryStats, cfg, res.Warnings, qry.Close); classified != nil {
return *classified
}
}

return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close}
}

Expand All @@ -159,6 +185,11 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) {
}

ctx := r.Context()

// Always record query start time for phase tracking, regardless of feature flag.
queryStats := stats.FromContext(ctx)
queryStats.SetQueryStart(time.Now())

if to := r.FormValue("timeout"); to != "" {
var cancel context.CancelFunc
timeout, err := util.ParseDurationMs(to)
Expand All @@ -170,6 +201,15 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) {
defer cancel()
}

cfg := q.timeoutClassification
ctx, cancel, earlyResult := applyTimeoutClassification(ctx, queryStats, cfg)
if cancel != nil {
defer cancel()
}
if earlyResult != nil {
return *earlyResult
}

opts, err := extractQueryOpts(r)
if err != nil {
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
Expand Down Expand Up @@ -211,6 +251,13 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) {

res := qry.Exec(ctx)
if res.Err != nil {
// If the context was cancelled/timed out, apply timeout classification.
if ctx.Err() != nil {
if classified := q.classifyTimeout(ctx, queryStats, cfg, res.Warnings, qry.Close); classified != nil {
return *classified
}
}

return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close}
}

Expand Down Expand Up @@ -281,6 +328,64 @@ func (q *QueryAPI) respond(w http.ResponseWriter, req *http.Request, data any, w
}
}

// applyTimeoutClassification creates a proactive context timeout that fires before
// the PromQL engine's own timeout, adjusted for queue wait time. Returns the
// (possibly wrapped) context, an optional cancel func, and an optional early-exit
// result when the entire timeout budget was already consumed in the queue.
// applyTimeoutClassification wraps ctx with a deadline that fires before the
// PromQL engine's own timeout so a later cancellation can be classified by
// phase. The configured total timeout is reduced by the time the query has
// already spent waiting in the scheduler queue. It returns the (possibly
// wrapped) context, a cancel func the caller must defer when non-nil, and a
// non-nil early-exit result when the whole budget was consumed in the queue.
func applyTimeoutClassification(ctx context.Context, queryStats *stats.QueryStats, cfg stats.PhaseTrackerConfig) (context.Context, context.CancelFunc, *apiFuncResult) {
	if !cfg.Enabled {
		return ctx, nil, nil
	}

	// Queue wait is only known when both timestamps were recorded.
	queued := time.Duration(0)
	if join, leave := queryStats.LoadQueueJoinTime(), queryStats.LoadQueueLeaveTime(); !join.IsZero() && !leave.IsZero() {
		queued = leave.Sub(join)
	}

	remaining := cfg.TotalTimeout - queued
	if remaining <= 0 {
		// The entire budget was spent queueing: fail fast as a system-side timeout.
		res := apiFuncResult{nil, &apiError{errorTimeout, httpgrpc.Errorf(http.StatusServiceUnavailable,
			"query timed out: query spent too long in scheduler queue")}, nil, nil}
		return ctx, nil, &res
	}

	wrapped, cancel := context.WithTimeout(ctx, remaining)
	return wrapped, cancel, nil
}

// classifyTimeout inspects phase timings after a context cancellation/timeout
// and returns an apiFuncResult if the timeout should be converted to a 4XX user error.
// Returns nil if no conversion applies and the caller should use the default error path.
// classifyTimeout inspects the recorded phase timings after a context
// cancellation/timeout, always logs the per-phase breakdown, and returns a
// non-nil result when the timeout should be surfaced as a 4XX user error.
// A nil return tells the caller to keep its default error path.
func (q *QueryAPI) classifyTimeout(ctx context.Context, queryStats *stats.QueryStats, cfg stats.PhaseTrackerConfig, warnings annotations.Annotations, closer func()) *apiFuncResult {
	decision := stats.DecideTimeoutResponse(queryStats, cfg)

	// Reconstruct the phase breakdown for the diagnostic log below.
	fetch := queryStats.LoadQueryStorageWallTime()
	total := time.Since(queryStats.LoadQueryStart())
	queued := time.Duration(0)
	if join, leave := queryStats.LoadQueueJoinTime(), queryStats.LoadQueueLeaveTime(); !join.IsZero() && !leave.IsZero() {
		queued = leave.Sub(join)
	}

	level.Warn(q.logger).Log(
		"msg", "query timed out with classification",
		"request_id", requestmeta.RequestIdFromContext(ctx),
		"query_start", queryStats.LoadQueryStart(),
		"queue_wait_time", queued,
		"fetch_time", fetch,
		"eval_time", total-fetch,
		"total_time", total,
		"decision", decision,
		"conversion_enabled", cfg.Enabled,
	)

	// Only convert to a user error when the feature is on and the decision
	// points at evaluation time dominating the budget.
	if !cfg.Enabled || decision != stats.UserError4XX {
		return nil
	}

	return &apiFuncResult{nil, &apiError{errorExec, httpgrpc.Errorf(http.StatusUnprocessableEntity,
		"query timed out: query spent too long in evaluation - consider simplifying your query")}, warnings, closer}
}

func (q *QueryAPI) negotiateCodec(req *http.Request, resp *v1.Response) (v1.Codec, error) {
for _, clause := range goautoneg.ParseAccept(req.Header.Get("Accept")) {
for _, codec := range q.codecs {
Expand Down
13 changes: 13 additions & 0 deletions pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,19 @@ func (f *Handler) reportQueryStats(r *http.Request, source, userID string, query
logMessage = append(logMessage, "query_storage_wall_time_seconds", sws)
}

if maxFetch := stats.LoadMaxFetchTime(); maxFetch > 0 {
logMessage = append(logMessage, "max_fetch_time", maxFetch)
}
if maxEval := stats.LoadMaxEvalTime(); maxEval > 0 {
logMessage = append(logMessage, "max_eval_time", maxEval)
}
if maxQueue := stats.LoadMaxQueueWaitTime(); maxQueue > 0 {
logMessage = append(logMessage, "max_queue_wait_time", maxQueue)
}
if maxTotal := stats.LoadMaxTotalTime(); maxTotal > 0 {
logMessage = append(logMessage, "max_total_time", maxTotal)
}

if splitInterval > 0 {
logMessage = append(logMessage, "split_interval", splitInterval.String())
}
Expand Down
28 changes: 28 additions & 0 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ type Config struct {
DistributedExecEnabled bool `yaml:"distributed_exec_enabled" doc:"hidden"`

HonorProjectionHints bool `yaml:"honor_projection_hints"`

// Timeout classification flags for converting 5XX to 4XX on expensive queries.
TimeoutClassificationEnabled bool `yaml:"timeout_classification_enabled"`
TimeoutClassificationDeadline time.Duration `yaml:"timeout_classification_deadline"`
TimeoutClassificationEvalThreshold time.Duration `yaml:"timeout_classification_eval_threshold"`
}

var (
Expand All @@ -114,6 +119,11 @@ var (
errInvalidSeriesBatchSize = errors.New("store gateway series batch size should be greater or equal than 0")
errInvalidIngesterQueryMaxAttempts = errors.New("ingester query max attempts should be greater or equal than 1")
errInvalidParquetQueryableDefaultBlockStore = errors.New("unsupported parquet queryable default block store. Supported options are tsdb and parquet")

errTimeoutClassificationDeadlineNotPositive = errors.New("timeout_classification_deadline must be positive when timeout classification is enabled")
errTimeoutClassificationEvalThresholdNotPositive = errors.New("timeout_classification_eval_threshold must be positive when timeout classification is enabled")
errTimeoutClassificationEvalThresholdExceedsDeadline = errors.New("timeout_classification_eval_threshold must be less than or equal to timeout_classification_deadline")
errTimeoutClassificationDeadlineExceedsTimeout = errors.New("timeout_classification_deadline must be less than the querier timeout")
)

// RegisterFlags adds the flags required to config this to the given FlagSet.
Expand Down Expand Up @@ -158,6 +168,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.HonorProjectionHints, "querier.honor-projection-hints", false, "[Experimental] If true, querier will honor projection hints and only materialize requested labels. Today, projection is only effective when Parquet Queryable is enabled. Projection is only applied when not querying mixed block types (parquet and non-parquet) and not querying ingesters.")
f.BoolVar(&cfg.DistributedExecEnabled, "querier.distributed-exec-enabled", false, "Experimental: Enables distributed execution of queries by passing logical query plan fragments to downstream components.")
f.BoolVar(&cfg.ParquetQueryableFallbackDisabled, "querier.parquet-queryable-fallback-disabled", false, "[Experimental] Disable Parquet queryable to fallback queries to Store Gateway if the block is not available as Parquet files but available in TSDB. Setting this to true will disable the fallback and users can remove Store Gateway. But need to make sure Parquet files are created before it is queryable.")
f.BoolVar(&cfg.TimeoutClassificationEnabled, "querier.timeout-classification-enabled", false, "If true, classify query timeouts as 4XX (user error) or 5XX (system error) based on phase timing.")
f.DurationVar(&cfg.TimeoutClassificationDeadline, "querier.timeout-classification-deadline", 59*time.Second, "The total time before the querier proactively cancels a query for timeout classification.")
f.DurationVar(&cfg.TimeoutClassificationEvalThreshold, "querier.timeout-classification-eval-threshold", 40*time.Second, "Eval time threshold above which a timeout is classified as user error (4XX).")
}

// Validate the config
Expand Down Expand Up @@ -197,6 +210,21 @@ func (cfg *Config) Validate() error {
}
}

if cfg.TimeoutClassificationEnabled {
if cfg.TimeoutClassificationDeadline <= 0 {
return errTimeoutClassificationDeadlineNotPositive
}
if cfg.TimeoutClassificationEvalThreshold <= 0 {
return errTimeoutClassificationEvalThresholdNotPositive
}
if cfg.TimeoutClassificationEvalThreshold > cfg.TimeoutClassificationDeadline {
return errTimeoutClassificationEvalThresholdExceedsDeadline
}
if cfg.TimeoutClassificationDeadline >= cfg.Timeout {
return errTimeoutClassificationDeadlineExceedsTimeout
}
}

if err := cfg.ThanosEngine.Validate(); err != nil {
return err
}
Expand Down
Loading
Loading