From 1d9b4effa92fa927880331e748a3955908402d2f Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Tue, 24 Mar 2026 15:32:39 -0700 Subject: [PATCH] Convert heavy queries from 5xx to 4xx Signed-off-by: Essam Eldaly --- pkg/api/handlers.go | 6 +- pkg/api/queryapi/query_api.go | 133 +++++++++++++-- pkg/frontend/transport/handler.go | 13 ++ pkg/querier/querier.go | 28 ++++ pkg/querier/stats/stats.go | 191 ++++++++++++++++++++++ pkg/querier/stats/timeout_decision.go | 47 ++++++ pkg/querier/worker/frontend_processor.go | 3 + pkg/querier/worker/scheduler_processor.go | 3 + pkg/scheduler/scheduler.go | 10 ++ 9 files changed, 419 insertions(+), 15 deletions(-) create mode 100644 pkg/querier/stats/timeout_decision.go diff --git a/pkg/api/handlers.go b/pkg/api/handlers.go index 7f219896b7d..5852a65cbb1 100644 --- a/pkg/api/handlers.go +++ b/pkg/api/handlers.go @@ -286,7 +286,11 @@ func NewQuerierHandler( legacyPromRouter := route.New().WithPrefix(path.Join(legacyPrefix, "/api/v1")) api.Register(legacyPromRouter) - queryAPI := queryapi.NewQueryAPI(engine, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin) + queryAPI := queryapi.NewQueryAPI(engine, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin, stats.PhaseTrackerConfig{ + TotalTimeout: querierCfg.TimeoutClassificationDeadline, + EvalTimeThreshold: querierCfg.TimeoutClassificationEvalThreshold, + Enabled: querierCfg.TimeoutClassificationEnabled, + }) requestTracker := request_tracker.NewRequestTracker(querierCfg.ActiveQueryTrackerDir, "apis.active", querierCfg.MaxConcurrent, util_log.GoKitLogToSlog(logger)) var apiHandler http.Handler diff --git a/pkg/api/queryapi/query_api.go b/pkg/api/queryapi/query_api.go index 83eed69ec8b..ede00d471b0 100644 --- a/pkg/api/queryapi/query_api.go +++ b/pkg/api/queryapi/query_api.go @@ -21,18 +21,21 @@ import ( "github.com/cortexproject/cortex/pkg/distributed_execution" "github.com/cortexproject/cortex/pkg/engine" "github.com/cortexproject/cortex/pkg/querier" + "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/api" + "github.com/cortexproject/cortex/pkg/util/requestmeta" ) type QueryAPI struct { - queryable storage.SampleAndChunkQueryable - queryEngine engine.QueryEngine - now func() time.Time - statsRenderer v1.StatsRenderer - logger log.Logger - codecs []v1.Codec - CORSOrigin *regexp.Regexp + queryable storage.SampleAndChunkQueryable + queryEngine engine.QueryEngine + now func() time.Time + statsRenderer v1.StatsRenderer + logger log.Logger + codecs []v1.Codec + CORSOrigin *regexp.Regexp + timeoutClassification stats.PhaseTrackerConfig } func NewQueryAPI( @@ -42,15 +45,17 @@ func NewQueryAPI( logger log.Logger, codecs []v1.Codec, CORSOrigin *regexp.Regexp, + timeoutClassification stats.PhaseTrackerConfig, ) *QueryAPI { return &QueryAPI{ - queryEngine: qe, - queryable: q, - statsRenderer: statsRenderer, - logger: logger, - codecs: codecs, - CORSOrigin: CORSOrigin, - now: time.Now, + queryEngine: qe, + queryable: q, + statsRenderer: statsRenderer, + logger: logger, + codecs: codecs, + CORSOrigin: CORSOrigin, + now: time.Now, + timeoutClassification: timeoutClassification, } } @@ -84,6 +89,11 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) { } ctx := r.Context() + + // Always record query start time for phase tracking, regardless of feature flag. + queryStats := stats.FromContext(ctx) + queryStats.SetQueryStart(time.Now()) + if to := r.FormValue("timeout"); to != "" { var cancel context.CancelFunc timeout, err := util.ParseDurationMs(to) @@ -95,6 +105,15 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) { defer cancel() } + cfg := q.timeoutClassification + ctx, cancel, earlyResult := applyTimeoutClassification(ctx, queryStats, cfg) + if cancel != nil { + defer cancel() + } + if earlyResult != nil { + return *earlyResult + } + opts, err := extractQueryOpts(r) if err != nil { return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} @@ -138,6 +157,13 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) { res := qry.Exec(ctx) if res.Err != nil { + // If the context was cancelled/timed out, apply timeout classification. + if ctx.Err() != nil { + if classified := q.classifyTimeout(ctx, queryStats, cfg, res.Warnings, qry.Close); classified != nil { + return *classified + } + } + return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close} } @@ -159,6 +185,11 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) { } ctx := r.Context() + + // Always record query start time for phase tracking, regardless of feature flag. + queryStats := stats.FromContext(ctx) + queryStats.SetQueryStart(time.Now()) + if to := r.FormValue("timeout"); to != "" { var cancel context.CancelFunc timeout, err := util.ParseDurationMs(to) @@ -170,6 +201,15 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) { defer cancel() } + cfg := q.timeoutClassification + ctx, cancel, earlyResult := applyTimeoutClassification(ctx, queryStats, cfg) + if cancel != nil { + defer cancel() + } + if earlyResult != nil { + return *earlyResult + } + opts, err := extractQueryOpts(r) if err != nil { return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} @@ -211,6 +251,13 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) { res := qry.Exec(ctx) if res.Err != nil { + // If the context was cancelled/timed out, apply timeout classification. + if ctx.Err() != nil { + if classified := q.classifyTimeout(ctx, queryStats, cfg, res.Warnings, qry.Close); classified != nil { + return *classified + } + } + return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close} } @@ -281,6 +328,64 @@ func (q *QueryAPI) respond(w http.ResponseWriter, req *http.Request, data any, w } } +// applyTimeoutClassification creates a proactive context timeout that fires before +// the PromQL engine's own timeout, adjusted for queue wait time. Returns the +// (possibly wrapped) context, an optional cancel func, and an optional early-exit +// result when the entire timeout budget was already consumed in the queue. +func applyTimeoutClassification(ctx context.Context, queryStats *stats.QueryStats, cfg stats.PhaseTrackerConfig) (context.Context, context.CancelFunc, *apiFuncResult) { + if !cfg.Enabled { + return ctx, nil, nil + } + var queueWaitTime time.Duration + queueJoin := queryStats.LoadQueueJoinTime() + queueLeave := queryStats.LoadQueueLeaveTime() + if !queueJoin.IsZero() && !queueLeave.IsZero() { + queueWaitTime = queueLeave.Sub(queueJoin) + } + effectiveTimeout := cfg.TotalTimeout - queueWaitTime + if effectiveTimeout <= 0 { + return ctx, nil, &apiFuncResult{nil, &apiError{errorTimeout, httpgrpc.Errorf(http.StatusServiceUnavailable, + "query timed out: query spent too long in scheduler queue")}, nil, nil} + } + ctx, cancel := context.WithTimeout(ctx, effectiveTimeout) + return ctx, cancel, nil +} + +// classifyTimeout inspects phase timings after a context cancellation/timeout +// and returns an apiFuncResult if the timeout should be converted to a 4XX user error. +// Returns nil if no conversion applies and the caller should use the default error path. +func (q *QueryAPI) classifyTimeout(ctx context.Context, queryStats *stats.QueryStats, cfg stats.PhaseTrackerConfig, warnings annotations.Annotations, closer func()) *apiFuncResult { + decision := stats.DecideTimeoutResponse(queryStats, cfg) + + fetchTime := queryStats.LoadQueryStorageWallTime() + totalTime := time.Since(queryStats.LoadQueryStart()) + evalTime := totalTime - fetchTime + var queueWaitTime time.Duration + queueJoin := queryStats.LoadQueueJoinTime() + queueLeave := queryStats.LoadQueueLeaveTime() + if !queueJoin.IsZero() && !queueLeave.IsZero() { + queueWaitTime = queueLeave.Sub(queueJoin) + } + level.Warn(q.logger).Log( + "msg", "query timed out with classification", + "request_id", requestmeta.RequestIdFromContext(ctx), + "query_start", queryStats.LoadQueryStart(), + "queue_wait_time", queueWaitTime, + "fetch_time", fetchTime, + "eval_time", evalTime, + "total_time", totalTime, + "decision", decision, + "conversion_enabled", cfg.Enabled, + ) + + if cfg.Enabled && decision == stats.UserError4XX { + return &apiFuncResult{nil, &apiError{errorExec, httpgrpc.Errorf(http.StatusUnprocessableEntity, + "query timed out: query spent too long in evaluation - consider simplifying your query")}, warnings, closer} + } + + return nil +} + func (q *QueryAPI) negotiateCodec(req *http.Request, resp *v1.Response) (v1.Codec, error) { for _, clause := range goautoneg.ParseAccept(req.Header.Get("Accept")) { for _, codec := range q.codecs { diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go index 8fd27a11454..47ec69876b6 100644 --- a/pkg/frontend/transport/handler.go +++ b/pkg/frontend/transport/handler.go @@ -536,6 +536,19 @@ func (f *Handler) reportQueryStats(r *http.Request, source, userID string, query logMessage = append(logMessage, "query_storage_wall_time_seconds", sws) } + if maxFetch := stats.LoadMaxFetchTime(); maxFetch > 0 { + logMessage = append(logMessage, "max_fetch_time", maxFetch) + } + if maxEval := stats.LoadMaxEvalTime(); maxEval > 0 { + logMessage = append(logMessage, "max_eval_time", maxEval) + } + if maxQueue := stats.LoadMaxQueueWaitTime(); maxQueue > 0 { + logMessage = append(logMessage, "max_queue_wait_time", maxQueue) + } + if maxTotal := stats.LoadMaxTotalTime(); maxTotal > 0 { + logMessage = append(logMessage, "max_total_time", maxTotal) + } + if splitInterval > 0 { logMessage = append(logMessage, "split_interval", splitInterval.String()) } diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 8ebc66a16dc..bc865982e57 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -103,6 +103,11 @@ type Config struct { DistributedExecEnabled bool `yaml:"distributed_exec_enabled" doc:"hidden"` HonorProjectionHints bool `yaml:"honor_projection_hints"` + + // Timeout classification flags for converting 5XX to 4XX on expensive queries. + TimeoutClassificationEnabled bool `yaml:"timeout_classification_enabled"` + TimeoutClassificationDeadline time.Duration `yaml:"timeout_classification_deadline"` + TimeoutClassificationEvalThreshold time.Duration `yaml:"timeout_classification_eval_threshold"` } var ( @@ -114,6 +119,11 @@ var ( errInvalidSeriesBatchSize = errors.New("store gateway series batch size should be greater or equal than 0") errInvalidIngesterQueryMaxAttempts = errors.New("ingester query max attempts should be greater or equal than 1") errInvalidParquetQueryableDefaultBlockStore = errors.New("unsupported parquet queryable default block store. Supported options are tsdb and parquet") + + errTimeoutClassificationDeadlineNotPositive = errors.New("timeout_classification_deadline must be positive when timeout classification is enabled") + errTimeoutClassificationEvalThresholdNotPositive = errors.New("timeout_classification_eval_threshold must be positive when timeout classification is enabled") + errTimeoutClassificationEvalThresholdExceedsDeadline = errors.New("timeout_classification_eval_threshold must be less than or equal to timeout_classification_deadline") + errTimeoutClassificationDeadlineExceedsTimeout = errors.New("timeout_classification_deadline must be less than the querier timeout") ) // RegisterFlags adds the flags required to config this to the given FlagSet. @@ -158,6 +168,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.HonorProjectionHints, "querier.honor-projection-hints", false, "[Experimental] If true, querier will honor projection hints and only materialize requested labels. Today, projection is only effective when Parquet Queryable is enabled. Projection is only applied when not querying mixed block types (parquet and non-parquet) and not querying ingesters.") f.BoolVar(&cfg.DistributedExecEnabled, "querier.distributed-exec-enabled", false, "Experimental: Enables distributed execution of queries by passing logical query plan fragments to downstream components.") f.BoolVar(&cfg.ParquetQueryableFallbackDisabled, "querier.parquet-queryable-fallback-disabled", false, "[Experimental] Disable Parquet queryable to fallback queries to Store Gateway if the block is not available as Parquet files but available in TSDB. Setting this to true will disable the fallback and users can remove Store Gateway. But need to make sure Parquet files are created before it is queryable.") + f.BoolVar(&cfg.TimeoutClassificationEnabled, "querier.timeout-classification-enabled", false, "If true, classify query timeouts as 4XX (user error) or 5XX (system error) based on phase timing.") + f.DurationVar(&cfg.TimeoutClassificationDeadline, "querier.timeout-classification-deadline", 59*time.Second, "The total time before the querier proactively cancels a query for timeout classification.") + f.DurationVar(&cfg.TimeoutClassificationEvalThreshold, "querier.timeout-classification-eval-threshold", 40*time.Second, "Eval time threshold above which a timeout is classified as user error (4XX).") } // Validate the config @@ -197,6 +210,21 @@ func (cfg *Config) Validate() error { } } + if cfg.TimeoutClassificationEnabled { + if cfg.TimeoutClassificationDeadline <= 0 { + return errTimeoutClassificationDeadlineNotPositive + } + if cfg.TimeoutClassificationEvalThreshold <= 0 { + return errTimeoutClassificationEvalThresholdNotPositive + } + if cfg.TimeoutClassificationEvalThreshold > cfg.TimeoutClassificationDeadline { + return errTimeoutClassificationEvalThresholdExceedsDeadline + } + if cfg.TimeoutClassificationDeadline >= cfg.Timeout { + return errTimeoutClassificationDeadlineExceedsTimeout + } + } + if err := cfg.ThanosEngine.Validate(); err != nil { return err } diff --git a/pkg/querier/stats/stats.go b/pkg/querier/stats/stats.go index a834cd311e1..f7ad32725b3 100644 --- a/pkg/querier/stats/stats.go +++ b/pkg/querier/stats/stats.go @@ -23,6 +23,19 @@ type QueryStats struct { DataSelectMinTime int64 SplitInterval time.Duration m sync.Mutex + + // Phase tracking fields for timeout classification. + // Stored as UnixNano int64 for atomic operations. + queryStart int64 // nanosecond timestamp when query began + queueJoinTime int64 // nanosecond timestamp when request entered scheduler queue + queueLeaveTime int64 // nanosecond timestamp when request left scheduler queue + + // Max timing breakdown across sub-queries (nanoseconds). + // These use max() semantics during Merge rather than sum. + maxFetchTime int64 // max storage fetch time across sub-queries + maxEvalTime int64 // max PromQL evaluation time across sub-queries + maxQueueWaitTime int64 // max scheduler queue wait time across sub-queries + maxTotalTime int64 // max total sub-query time across sub-queries } // ContextWithEmptyStats returns a context with empty stats. @@ -306,6 +319,180 @@ func (s *QueryStats) LoadSplitInterval() time.Duration { return s.SplitInterval } +// SetQueryStart records when the query began execution. +func (s *QueryStats) SetQueryStart(t time.Time) { + if s == nil { + return + } + + atomic.StoreInt64(&s.queryStart, t.UnixNano()) +} + +// LoadQueryStart returns the query start time. +func (s *QueryStats) LoadQueryStart() time.Time { + if s == nil { + return time.Time{} + } + + ns := atomic.LoadInt64(&s.queryStart) + if ns == 0 { + return time.Time{} + } + return time.Unix(0, ns) +} + +// SetQueueJoinTime records when the request entered the scheduler queue. +func (s *QueryStats) SetQueueJoinTime(t time.Time) { + if s == nil { + return + } + + atomic.StoreInt64(&s.queueJoinTime, t.UnixNano()) +} + +// LoadQueueJoinTime returns the queue join time. +func (s *QueryStats) LoadQueueJoinTime() time.Time { + if s == nil { + return time.Time{} + } + + ns := atomic.LoadInt64(&s.queueJoinTime) + if ns == 0 { + return time.Time{} + } + return time.Unix(0, ns) +} + +// SetQueueLeaveTime records when the request left the scheduler queue. +func (s *QueryStats) SetQueueLeaveTime(t time.Time) { + if s == nil { + return + } + + atomic.StoreInt64(&s.queueLeaveTime, t.UnixNano()) +} + +// LoadQueueLeaveTime returns the queue leave time. +func (s *QueryStats) LoadQueueLeaveTime() time.Time { + if s == nil { + return time.Time{} + } + + ns := atomic.LoadInt64(&s.queueLeaveTime) + if ns == 0 { + return time.Time{} + } + return time.Unix(0, ns) +} + +// updateMaxDuration atomically updates a max duration field if the new value is larger. +func updateMaxDuration(addr *int64, val time.Duration) { + new := int64(val) + for { + old := atomic.LoadInt64(addr) + if new <= old { + return + } + if atomic.CompareAndSwapInt64(addr, old, new) { + return + } + } +} + +// UpdateMaxFetchTime updates the max fetch time if the provided value is larger. +func (s *QueryStats) UpdateMaxFetchTime(t time.Duration) { + if s == nil { + return + } + updateMaxDuration(&s.maxFetchTime, t) +} + +// LoadMaxFetchTime returns the max fetch time across sub-queries. +func (s *QueryStats) LoadMaxFetchTime() time.Duration { + if s == nil { + return 0 + } + return time.Duration(atomic.LoadInt64(&s.maxFetchTime)) +} + +// UpdateMaxEvalTime updates the max eval time if the provided value is larger. +func (s *QueryStats) UpdateMaxEvalTime(t time.Duration) { + if s == nil { + return + } + updateMaxDuration(&s.maxEvalTime, t) +} + +// LoadMaxEvalTime returns the max eval time across sub-queries. +func (s *QueryStats) LoadMaxEvalTime() time.Duration { + if s == nil { + return 0 + } + return time.Duration(atomic.LoadInt64(&s.maxEvalTime)) +} + +// UpdateMaxQueueWaitTime updates the max queue wait time if the provided value is larger. +func (s *QueryStats) UpdateMaxQueueWaitTime(t time.Duration) { + if s == nil { + return + } + updateMaxDuration(&s.maxQueueWaitTime, t) +} + +// LoadMaxQueueWaitTime returns the max queue wait time across sub-queries. +func (s *QueryStats) LoadMaxQueueWaitTime() time.Duration { + if s == nil { + return 0 + } + return time.Duration(atomic.LoadInt64(&s.maxQueueWaitTime)) +} + +// UpdateMaxTotalTime updates the max total time if the provided value is larger. +func (s *QueryStats) UpdateMaxTotalTime(t time.Duration) { + if s == nil { + return + } + updateMaxDuration(&s.maxTotalTime, t) +} + +// LoadMaxTotalTime returns the max total sub-query time across sub-queries. +func (s *QueryStats) LoadMaxTotalTime() time.Duration { + if s == nil { + return 0 + } + return time.Duration(atomic.LoadInt64(&s.maxTotalTime)) +} + +// ComputeAndStoreTimingBreakdown computes the timing breakdown from phase tracking +// fields and stores them as max values. This should be called after a sub-query +// completes, before stats are sent back to the frontend. +func (s *QueryStats) ComputeAndStoreTimingBreakdown() { + if s == nil { + return + } + + queryStart := s.LoadQueryStart() + if queryStart.IsZero() { + return + } + + fetchTime := s.LoadQueryStorageWallTime() + totalTime := time.Since(queryStart) + evalTime := totalTime - fetchTime + + var queueWaitTime time.Duration + queueJoin := s.LoadQueueJoinTime() + queueLeave := s.LoadQueueLeaveTime() + if !queueJoin.IsZero() && !queueLeave.IsZero() { + queueWaitTime = queueLeave.Sub(queueJoin) + } + + s.UpdateMaxFetchTime(fetchTime) + s.UpdateMaxEvalTime(evalTime) + s.UpdateMaxQueueWaitTime(queueWaitTime) + s.UpdateMaxTotalTime(totalTime) +} + func (s *QueryStats) AddStoreGatewayTouchedPostings(count uint64) { if s == nil { return @@ -396,6 +583,10 @@ func (s *QueryStats) Merge(other *QueryStats) { s.AddScannedSamples(other.LoadScannedSamples()) s.SetPeakSamples(max(s.LoadPeakSamples(), other.LoadPeakSamples())) s.AddExtraFields(other.LoadExtraFields()...) + s.UpdateMaxFetchTime(other.LoadMaxFetchTime()) + s.UpdateMaxEvalTime(other.LoadMaxEvalTime()) + s.UpdateMaxQueueWaitTime(other.LoadMaxQueueWaitTime()) + s.UpdateMaxTotalTime(other.LoadMaxTotalTime()) } func ShouldTrackHTTPGRPCResponse(r *httpgrpc.HTTPResponse) bool { diff --git a/pkg/querier/stats/timeout_decision.go b/pkg/querier/stats/timeout_decision.go new file mode 100644 index 00000000000..e1848fca78d --- /dev/null +++ b/pkg/querier/stats/timeout_decision.go @@ -0,0 +1,47 @@ +package stats + +import "time" + +// TimeoutDecision represents the classification of a query timeout. +type TimeoutDecision int + +const ( + // Default5XX means return 503 (current behavior). + Default5XX TimeoutDecision = iota + // UserError4XX means the query is too expensive, return 422. + UserError4XX +) + +// PhaseTrackerConfig holds configurable thresholds for timeout classification. +type PhaseTrackerConfig struct { + // TotalTimeout is the total time before the querier cancels the query context. + TotalTimeout time.Duration + + // EvalTimeThreshold is the eval time above which a timeout is classified as user error (4XX). + EvalTimeThreshold time.Duration + + // Enabled controls whether the 5XX-to-4XX conversion is active. + Enabled bool +} + +// DecideTimeoutResponse inspects QueryStats phase timings and returns a TimeoutDecision. +// It returns UserError4XX if eval time exceeds the threshold, Default5XX otherwise. +// It is a pure function that does not modify stats or cfg. +func DecideTimeoutResponse(stats *QueryStats, cfg PhaseTrackerConfig) TimeoutDecision { + if stats == nil { + return Default5XX + } + + queryStart := stats.LoadQueryStart() + if queryStart.IsZero() { + return Default5XX + } + + evalTime := time.Since(queryStart) - stats.LoadQueryStorageWallTime() + + if evalTime > cfg.EvalTimeThreshold { + return UserError4XX + } + + return Default5XX +} diff --git a/pkg/querier/worker/frontend_processor.go b/pkg/querier/worker/frontend_processor.go index 88f7f311393..923c9576fc4 100644 --- a/pkg/querier/worker/frontend_processor.go +++ b/pkg/querier/worker/frontend_processor.go @@ -156,6 +156,9 @@ func (fp *frontendProcessor) runRequest(ctx context.Context, request *httpgrpc.H level.Info(logger).Log("msg", "finished request", "status_code", response.Code, "response_size", len(response.GetBody())) } + // Compute timing breakdown before sending stats back to the frontend. + stats.ComputeAndStoreTimingBreakdown() + // Ensure responses that are too big are not retried. if len(response.Body) >= fp.maxMessageSize { errMsg := fmt.Sprintf("response larger than the max (%d vs %d)", len(response.Body), fp.maxMessageSize) diff --git a/pkg/querier/worker/scheduler_processor.go b/pkg/querier/worker/scheduler_processor.go index 3bba5980442..eb9d9b138b1 100644 --- a/pkg/querier/worker/scheduler_processor.go +++ b/pkg/querier/worker/scheduler_processor.go @@ -193,6 +193,9 @@ func (sp *schedulerProcessor) runRequest(ctx context.Context, logger log.Logger, level.Info(logger).Log("msg", "finished request", "status_code", response.Code, "response_size", len(response.GetBody())) } + // Compute timing breakdown before sending stats back to the frontend. + stats.ComputeAndStoreTimingBreakdown() + if err = ctx.Err(); err != nil { return } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 882ed166174..3ee91e80f33 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -424,6 +424,11 @@ func (s *Scheduler) enqueueRequest(frontendContext context.Context, frontendAddr req.enqueueTime = now req.ctxCancel = cancel + // Record queue join time for timeout classification phase tracking. + if qStats := stats.FromContext(ctx); qStats != nil { + qStats.SetQueueJoinTime(now) + } + // aggregate the max queriers limit in the case of a multi tenant query tenantIDs, err := users.TenantIDsFromOrgID(userID) if err != nil { @@ -527,6 +532,11 @@ func (s *Scheduler) QuerierLoop(querier schedulerpb.SchedulerForQuerier_QuerierL r := req.(*schedulerRequest) + // Record queue leave time for timeout classification phase tracking. + if qStats := stats.FromContext(r.ctx); qStats != nil { + qStats.SetQueueLeaveTime(time.Now()) + } + s.queueDuration.Observe(time.Since(r.enqueueTime).Seconds()) r.queueSpan.Finish()