From c6cc6eda70d5eb2b1f0d17db7d4cea24ab1fb3a2 Mon Sep 17 00:00:00 2001 From: Yuqing Bai Date: Sat, 7 Feb 2026 17:41:18 +0800 Subject: [PATCH 01/11] proxy,metrics: add query interaction latency observability Comments: - Add per-interaction latency histogram, slow interaction logging, and backend metric label GC with TTL. - Add dynamic runtime settings via config hot reload for thresholds and GC interval/TTL. - Add design doc and usage manual under docs/ for rollout, tuning, and capacity planning guidance. - Keep backward compatibility by preserving existing query duration semantics and defaulting new feature off. --- README.md | 43 +++++++ conf/proxy.toml | 15 +++ docs/query-interaction-latency-design.md | 131 ++++++++++++++++++++ docs/query-interaction-latency-usage.md | 146 +++++++++++++++++++++++ lib/config/proxy.go | 18 ++- lib/config/proxy_test.go | 24 +++- pkg/metrics/interaction.go | 60 ++++++++++ pkg/metrics/interaction_test.go | 30 +++++ pkg/metrics/metrics.go | 15 +++ pkg/metrics/session.go | 9 ++ pkg/proxy/backend/cmd_processor_exec.go | 98 ++++++++++++--- pkg/proxy/backend/metrics.go | 82 ++++++++++++- pkg/proxy/backend/metrics_test.go | 37 ++++++ pkg/proxy/proxy.go | 5 + pkg/proxy/proxy_test.go | 12 +- pkg/server/api/config_test.go | 5 +- 16 files changed, 706 insertions(+), 24 deletions(-) create mode 100644 docs/query-interaction-latency-design.md create mode 100644 docs/query-interaction-latency-usage.md create mode 100644 pkg/metrics/interaction.go create mode 100644 pkg/metrics/interaction_test.go diff --git a/README.md b/README.md index c56efe46..e450563b 100644 --- a/README.md +++ b/README.md @@ -115,6 +115,49 @@ bin/tiproxy --config=conf/proxy.toml mysql -h127.0.0.1 -uroot -P6000 ``` +## Interaction Latency Observability + +TiProxy can expose per-interaction latency: + +- Interaction latency: from forwarding one MySQL command to TiDB, until receiving the first response packet from TiDB. +- Command duration (`tiproxy_session_query_duration_seconds`) still exists and keeps the original meaning. + +Configure it in `proxy.toml`: + +```toml +[advance] +query-interaction-metrics = true +query-interaction-slow-log-threshold-ms = 200 +backend-metrics-gc-interval-seconds = 300 +backend-metrics-gc-idle-seconds = 3600 +``` + +- `query-interaction-slow-log-threshold-ms`: + - `0` disables slow interaction logs. + - positive values log interactions slower than threshold. +- `backend-metrics-gc-idle-seconds`: + - removes idle backend label series to control in-memory metric cache growth. + - `0` disables metric GC. +- `backend-metrics-gc-interval-seconds`: + - controls GC sweep frequency. + - `0` disables metric GC. + +These options support dynamic update through `PUT /api/admin/config`, so no restart is required. + +Detailed docs: + +- Design: [`docs/query-interaction-latency-design.md`](docs/query-interaction-latency-design.md) +- Usage: [`docs/query-interaction-latency-usage.md`](docs/query-interaction-latency-usage.md) + +### Resource Sizing Notes + +Enabling interaction latency metrics increases CPU and memory usage because each interaction adds extra histogram observation and optional slow-log checks. + +- Recommended initial production reservation after enabling: + - CPU: +15% + - Memory: +10% +- Re-check and tune by workload. Use your own benchmark and metrics data before full rollout. + ## Code of Conduct This project is for everyone. We ask that our users and contributors take a few minutes to review our [Code of Conduct](code-of-conduct.md). diff --git a/conf/proxy.toml b/conf/proxy.toml index 57c4b837..189dfd07 100644 --- a/conf/proxy.toml +++ b/conf/proxy.toml @@ -112,3 +112,18 @@ graceful-close-conn-timeout = 15 [advance] # ignore-wrong-namespace = true + +# enable per-interaction latency metrics: +# request sent to TiDB -> first response packet received from TiDB +# query-interaction-metrics = false + +# slow interaction log threshold in milliseconds: +# 0 => disable slow interaction logs +# 200 => log interactions slower than 200ms +# query-interaction-slow-log-threshold-ms = 200 + +# backend metrics GC: +# backend labels not updated for `backend-metrics-gc-idle-seconds` will be removed from in-memory caches and metric vectors +# set `backend-metrics-gc-interval-seconds = 0` or `backend-metrics-gc-idle-seconds = 0` to disable GC +# backend-metrics-gc-interval-seconds = 300 +# backend-metrics-gc-idle-seconds = 3600 diff --git a/docs/query-interaction-latency-design.md b/docs/query-interaction-latency-design.md new file mode 100644 index 00000000..ae57fe35 --- /dev/null +++ b/docs/query-interaction-latency-design.md @@ -0,0 +1,131 @@ +# TiProxy Query Interaction Latency Design + +## 1. 背景 + +TiProxy 作为 TiDB gateway,原有指标可以看到命令级别的总耗时,但无法直接回答以下问题: + +- 单次 MySQL 交互(request -> backend first response)的延迟是否异常。 +- 出现慢请求时,是 TiDB 后端慢,还是 TiProxy 自身引入额外时延。 +- backend 数量或地址变化后,metrics label 是否会长期累积占用内存。 + +为此,本设计新增交互延迟观测、慢交互日志、以及 backend metrics label GC。 + +## 2. 目标与非目标 + +### 2.1 目标 + +- 提供每次交互的聚合延迟指标,支持按 `backend`、`cmd_type` 维度分析。 +- 支持慢交互日志阈值动态修改,无需重启 TiProxy。 +- 控制 metrics label 内存增长,支持 TTL 回收。 +- 保持线上稳定:默认关闭交互指标,开关与阈值可热更新。 + +### 2.2 非目标 + +- 不提供完整 SQL tracing/span。 +- 不改变已有 `query_duration_seconds` 指标语义。 +- 不对外暴露每一条请求的全量明细存储。 + +## 3. 术语定义 + +- Interaction:一轮 MySQL 命令交互,从 TiProxy 把 request 转发到 TiDB 后开始,直到收到 TiDB 的第一个 response packet 结束。 +- Command Duration:现有命令完整处理耗时(已有指标,不变)。 +- Interaction Duration:本次新增的首包响应延迟。 + +## 4. 配置设计 + +新增 `[advance]` 配置项: + +- `query-interaction-metrics` (bool) + - 是否开启交互延迟观测。 + - 默认:`false`。 +- `query-interaction-slow-log-threshold-ms` (int) + - 慢交互日志阈值,单位毫秒。 + - `0` 表示关闭慢日志。 + - 默认:`200`。 +- `backend-metrics-gc-interval-seconds` (int) + - backend metrics GC 扫描周期。 + - `0` 表示关闭 GC。 + - 默认:`300`。 +- `backend-metrics-gc-idle-seconds` (int) + - backend idle TTL,超过 TTL 未更新则回收其 metrics labels。 + - `0` 表示关闭 GC。 + - 默认:`3600`。 + +所有配置支持通过 `PUT /api/admin/config` 动态更新。 + +## 5. 指标与日志设计 + +### 5.1 新增指标 + +- `tiproxy_session_query_interaction_duration_seconds` (HistogramVec) + - Labels: `backend`, `cmd_type` + - Bucket:与 `query_duration_seconds` 对齐。 + +### 5.2 慢交互日志 + +当 `interaction_duration >= threshold` 时记录 `Warn` 日志: + +- 固定字段:`interaction_duration`, `cmd`, `backend_addr` +- 条件字段: + - `query`:仅 `COM_QUERY`,经过 normalize 并截断 + - `stmt_id`:`COM_STMT_*` 且包体含 statement id 时 + +## 6. 数据路径与埋点位置 + +埋点位于 `CmdProcessor` 转发路径,按“每一轮交互”采集: + +- 单响应命令:收到首包时采集一次。 +- `COM_QUERY`/`COM_STMT_EXECUTE` 多结果:每轮结果单独采集。 +- `LOAD DATA LOCAL INFILE`:包含本地文件阶段后的最终返回轮次。 +- `COM_CHANGE_USER`:包含 auth switch 多轮交互。 +- 无响应命令(如 `COM_QUIT`)不采集。 + +## 7. Backend Metrics GC 设计 + +在 backend metrics cache 维护 `lastSeen`: + +1. 每次 metrics 更新刷新 `lastSeen`。 +2. 到达 GC interval 时触发 sweep。 +3. 若 `now - lastSeen > idleTTL`: + - 删除缓存节点。 + - 调用 Prometheus `DeleteLabelValues` 回收对应 labels。 + +回收范围: + +- `query_total` +- `query_duration_seconds` +- `query_interaction_duration_seconds` +- `handshake_duration_seconds` +- traffic counters + +## 8. 性能与稳定性考虑 + +- 默认关闭交互指标,避免默认增量开销。 +- 开启后增量主要是: + - monotonic time 计算 + - histogram observe + - 慢日志阈值判断 +- GC 采用“低频 + TTL”策略,避免每次请求都做全量扫描。 + +建议上线预留资源: + +- CPU: +15% +- Memory: +10% + +实际值需结合业务流量压测复核。 + +## 9. 测试策略 + +- 配置测试:新字段默认值、序列化、负值校验。 +- 热更新测试:`WatchConfig` 下参数动态生效。 +- 指标测试: + - interaction histogram sample count 增长 + - TTL GC 可删除 stale backend labels +- 协议路径回归:`pkg/proxy/backend` 全量测试通过。 + +## 10. 兼容性 + +- 原有指标与语义保持不变。 +- 新配置均有默认值,升级兼容旧配置文件。 +- 若需完全关闭回收,将 `backend-metrics-gc-interval-seconds=0` 或 `backend-metrics-gc-idle-seconds=0`。 + diff --git a/docs/query-interaction-latency-usage.md b/docs/query-interaction-latency-usage.md new file mode 100644 index 00000000..fc9907a2 --- /dev/null +++ b/docs/query-interaction-latency-usage.md @@ -0,0 +1,146 @@ +# TiProxy Query Interaction Latency 使用手册 + +## 1. 功能说明 + +本功能用于观测 TiProxy 到 TiDB 的“首包响应延迟”,并支持: + +- Prometheus 聚合指标分析 +- 慢交互日志定位 +- backend metrics label 自动 GC +- 运行中动态调参,无需重启 + +## 2. 快速启用 + +在 `proxy.toml` 中添加: + +```toml +[advance] +query-interaction-metrics = true +query-interaction-slow-log-threshold-ms = 200 +backend-metrics-gc-interval-seconds = 300 +backend-metrics-gc-idle-seconds = 3600 +``` + +说明: + +- `query-interaction-metrics=false`:关闭交互延迟指标(默认)。 +- `query-interaction-slow-log-threshold-ms=0`:关闭慢交互日志。 +- `backend-metrics-gc-interval-seconds=0` 或 `backend-metrics-gc-idle-seconds=0`:关闭 metrics GC。 + +## 3. 动态修改(不中断服务) + +通过管理接口: + +```bash +curl -X PUT http://127.0.0.1:3080/api/admin/config -d ' +advance.query-interaction-metrics = true +advance.query-interaction-slow-log-threshold-ms = 1000 +advance.backend-metrics-gc-interval-seconds = 120 +advance.backend-metrics-gc-idle-seconds = 1800 +' +``` + +常见动态操作: + +- 临时排障:将阈值从 `200` 调到 `50` 或 `20`,快速捕获慢交互日志。 +- 排障完成:将阈值恢复,或直接关掉慢日志(设为 `0`)。 + +## 4. 指标查看 + +新增指标: + +- `tiproxy_session_query_interaction_duration_seconds` + +关键标签: + +- `backend` +- `cmd_type` + +PromQL 示例: + +```promql +# 全局 P99 interaction latency +histogram_quantile( + 0.99, + sum(rate(tiproxy_session_query_interaction_duration_seconds_bucket[1m])) by (le) +) +``` + +```promql +# 按 backend 看 P99 interaction latency +histogram_quantile( + 0.99, + sum(rate(tiproxy_session_query_interaction_duration_seconds_bucket[1m])) by (le, backend) +) +``` + +```promql +# 对比 command duration 与 interaction duration 的均值差 +( + sum(rate(tiproxy_session_query_duration_seconds_sum[1m])) / + sum(rate(tiproxy_session_query_duration_seconds_count[1m])) +) +- +( + sum(rate(tiproxy_session_query_interaction_duration_seconds_sum[1m])) / + sum(rate(tiproxy_session_query_interaction_duration_seconds_count[1m])) +) +``` + +## 5. 慢交互日志 + +日志名: + +- `slow mysql interaction` + +字段: + +- `interaction_duration` +- `cmd` +- `backend_addr` +- `query`(仅 `COM_QUERY`) +- `stmt_id`(适用 `COM_STMT_*`) + +建议: + +- 生产默认 `200ms` 或更高。 +- 高峰期若日志量偏大,可临时提高阈值。 + +## 6. 内存与 CPU 容量建议 + +启用交互指标后,建议初始资源预留: + +- CPU:+15% +- 内存:+10% + +原因: + +- 每次交互多一次 histogram observe。 +- 慢日志阈值判断和可能的日志写入。 +- backend metrics label cache 维护与周期性 GC。 + +上线前建议: + +1. 在预发环境以真实流量压测 30~60 分钟。 +2. 观察 `process_resident_memory_bytes`、`go_gc_duration_seconds`、CPU 使用率。 +3. 调整阈值与 GC 参数到稳定区间后再全量上线。 + +## 7. 故障排查 + +### 7.1 看不到 interaction 指标 + +- 确认 `advance.query-interaction-metrics = true`。 +- 确认 `/api/admin/config` 返回已生效值。 +- 确认有真实 SQL 流量经过 TiProxy。 + +### 7.2 backend labels 增长过快 + +- 缩短 `backend-metrics-gc-idle-seconds`。 +- 缩短 `backend-metrics-gc-interval-seconds`。 +- 检查 backend 地址是否高频变化(例如短周期扩缩容)。 + +### 7.3 慢日志太多 + +- 提高 `query-interaction-slow-log-threshold-ms`。 +- 排障窗口之外可设置为 `0` 关闭慢日志。 + diff --git a/lib/config/proxy.go b/lib/config/proxy.go index 45379f13..dccfdaf7 100644 --- a/lib/config/proxy.go +++ b/lib/config/proxy.go @@ -70,7 +70,11 @@ type API struct { } type Advance struct { - IgnoreWrongNamespace bool `yaml:"ignore-wrong-namespace,omitempty" toml:"ignore-wrong-namespace,omitempty" json:"ignore-wrong-namespace,omitempty"` + IgnoreWrongNamespace bool `yaml:"ignore-wrong-namespace,omitempty" toml:"ignore-wrong-namespace,omitempty" json:"ignore-wrong-namespace,omitempty"` + QueryInteractionMetrics bool `yaml:"query-interaction-metrics,omitempty" toml:"query-interaction-metrics,omitempty" json:"query-interaction-metrics,omitempty"` + QueryInteractionSlowLogThreshold int `yaml:"query-interaction-slow-log-threshold-ms,omitempty" toml:"query-interaction-slow-log-threshold-ms,omitempty" json:"query-interaction-slow-log-threshold-ms,omitempty"` + BackendMetricsGCInterval int `yaml:"backend-metrics-gc-interval-seconds,omitempty" toml:"backend-metrics-gc-interval-seconds,omitempty" json:"backend-metrics-gc-interval-seconds,omitempty"` + BackendMetricsGCIdle int `yaml:"backend-metrics-gc-idle-seconds,omitempty" toml:"backend-metrics-gc-idle-seconds,omitempty" json:"backend-metrics-gc-idle-seconds,omitempty"` } type LogOnline struct { @@ -149,6 +153,9 @@ func NewConfig() *Config { cfg.Log.LogFile.MaxBackups = 3 cfg.Advance.IgnoreWrongNamespace = true + cfg.Advance.QueryInteractionSlowLogThreshold = 200 + cfg.Advance.BackendMetricsGCInterval = 300 + cfg.Advance.BackendMetricsGCIdle = 3600 cfg.Security.SQLTLS.MinTLSVersion = "1.2" cfg.Security.ServerSQLTLS.MinTLSVersion = "1.2" cfg.Security.ServerHTTPTLS.MinTLSVersion = "1.2" @@ -181,6 +188,15 @@ func (cfg *Config) Check() error { if cfg.Proxy.ConnBufferSize > 0 && (cfg.Proxy.ConnBufferSize > 16*1024*1024 || cfg.Proxy.ConnBufferSize < 1024) { return errors.Wrapf(ErrInvalidConfigValue, "conn-buffer-size must be between 1K and 16M") } + if cfg.Advance.QueryInteractionSlowLogThreshold < 0 { + return errors.Wrapf(ErrInvalidConfigValue, "query-interaction-slow-log-threshold-ms cannot be negative") + } + if cfg.Advance.BackendMetricsGCInterval < 0 { + return errors.Wrapf(ErrInvalidConfigValue, "backend-metrics-gc-interval-seconds cannot be negative") + } + if cfg.Advance.BackendMetricsGCIdle < 0 { + return errors.Wrapf(ErrInvalidConfigValue, "backend-metrics-gc-idle-seconds cannot be negative") + } return nil } diff --git a/lib/config/proxy_test.go b/lib/config/proxy_test.go index 7f49be50..79de3540 100644 --- a/lib/config/proxy_test.go +++ b/lib/config/proxy_test.go @@ -15,7 +15,11 @@ import ( var testProxyConfig = Config{ Workdir: "./wd", Advance: Advance{ - IgnoreWrongNamespace: true, + IgnoreWrongNamespace: true, + QueryInteractionMetrics: true, + QueryInteractionSlowLogThreshold: 500, + BackendMetricsGCInterval: 60, + BackendMetricsGCIdle: 300, }, Proxy: ProxyServer{ Addr: "0.0.0.0:4000", @@ -113,6 +117,24 @@ func TestProxyCheck(t *testing.T) { }, err: ErrInvalidConfigValue, }, + { + pre: func(t *testing.T, c *Config) { + c.Advance.QueryInteractionSlowLogThreshold = -1 + }, + err: ErrInvalidConfigValue, + }, + { + pre: func(t *testing.T, c *Config) { + c.Advance.BackendMetricsGCInterval = -1 + }, + err: ErrInvalidConfigValue, + }, + { + pre: func(t *testing.T, c *Config) { + c.Advance.BackendMetricsGCIdle = -1 + }, + err: ErrInvalidConfigValue, + }, } for _, tc := range testcases { cfg := testProxyConfig diff --git a/pkg/metrics/interaction.go b/pkg/metrics/interaction.go new file mode 100644 index 00000000..00778de6 --- /dev/null +++ b/pkg/metrics/interaction.go @@ -0,0 +1,60 @@ +// Copyright 2024 PingCAP, Inc. +// SPDX-License-Identifier: Apache-2.0 + +package metrics + +import ( + "sync/atomic" + "time" +) + +var queryInteractionEnabled atomic.Bool +var queryInteractionSlowLogThreshold atomic.Int64 +var backendMetricsGCInterval atomic.Int64 +var backendMetricsGCIdleTTL atomic.Int64 + +func init() { + queryInteractionSlowLogThreshold.Store(int64(200 * time.Millisecond)) + backendMetricsGCInterval.Store(int64(5 * time.Minute)) + backendMetricsGCIdleTTL.Store(int64(time.Hour)) +} + +// SetQueryInteractionEnabled updates whether per-interaction query latency metrics are emitted. +func SetQueryInteractionEnabled(enabled bool) { + queryInteractionEnabled.Store(enabled) +} + +// QueryInteractionEnabled returns whether per-interaction query latency metrics are enabled. +func QueryInteractionEnabled() bool { + return queryInteractionEnabled.Load() +} + +// SetQueryInteractionSlowLogThreshold updates the slow log threshold of per-interaction latency. +func SetQueryInteractionSlowLogThreshold(threshold time.Duration) { + queryInteractionSlowLogThreshold.Store(int64(threshold)) +} + +// QueryInteractionSlowLogThreshold returns the slow log threshold of per-interaction latency. +func QueryInteractionSlowLogThreshold() time.Duration { + return time.Duration(queryInteractionSlowLogThreshold.Load()) +} + +// SetBackendMetricsGCInterval updates how often backend metric labels are GC'ed. +func SetBackendMetricsGCInterval(interval time.Duration) { + backendMetricsGCInterval.Store(int64(interval)) +} + +// BackendMetricsGCInterval returns the GC interval for backend metric labels. +func BackendMetricsGCInterval() time.Duration { + return time.Duration(backendMetricsGCInterval.Load()) +} + +// SetBackendMetricsGCIdleTTL updates the idle TTL for backend metric labels. +func SetBackendMetricsGCIdleTTL(ttl time.Duration) { + backendMetricsGCIdleTTL.Store(int64(ttl)) +} + +// BackendMetricsGCIdleTTL returns the idle TTL for backend metric labels. +func BackendMetricsGCIdleTTL() time.Duration { + return time.Duration(backendMetricsGCIdleTTL.Load()) +} diff --git a/pkg/metrics/interaction_test.go b/pkg/metrics/interaction_test.go new file mode 100644 index 00000000..39b8dc63 --- /dev/null +++ b/pkg/metrics/interaction_test.go @@ -0,0 +1,30 @@ +// Copyright 2024 PingCAP, Inc. +// SPDX-License-Identifier: Apache-2.0 + +package metrics + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestInteractionSettings(t *testing.T) { + SetQueryInteractionEnabled(false) + require.False(t, QueryInteractionEnabled()) + SetQueryInteractionEnabled(true) + require.True(t, QueryInteractionEnabled()) + + threshold := 321 * time.Millisecond + SetQueryInteractionSlowLogThreshold(threshold) + require.Equal(t, threshold, QueryInteractionSlowLogThreshold()) + + interval := 123 * time.Second + SetBackendMetricsGCInterval(interval) + require.Equal(t, interval, BackendMetricsGCInterval()) + + ttl := 456 * time.Second + SetBackendMetricsGCIdleTTL(ttl) + require.Equal(t, ttl, BackendMetricsGCIdleTTL()) +} diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 2b65b985..ddfbbc1a 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -8,6 +8,7 @@ package metrics import ( "context" + "fmt" "regexp" "runtime" "sync" @@ -100,6 +101,7 @@ func registerProxyMetrics() { prometheus.MustRegister(KeepAliveCounter) prometheus.MustRegister(QueryTotalCounter) prometheus.MustRegister(QueryDurationHistogram) + prometheus.MustRegister(QueryInteractionDurationHistogram) prometheus.MustRegister(HandshakeDurationHistogram) prometheus.MustRegister(BackendStatusGauge) prometheus.MustRegister(GetBackendHistogram) @@ -132,3 +134,16 @@ func ReadGauge(gauge prometheus.Gauge) (float64, error) { } return metric.Gauge.GetValue(), nil } + +// ReadHistogramSampleCount reads the sample count from the observer. It is only used for testing. +func ReadHistogramSampleCount(observer prometheus.Observer) (uint64, error) { + metric, ok := observer.(prometheus.Metric) + if !ok { + return 0, fmt.Errorf("observer %T does not implement prometheus.Metric", observer) + } + var dtoMetric dto.Metric + if err := metric.Write(&dtoMetric); err != nil { + return 0, err + } + return dtoMetric.Histogram.GetSampleCount(), nil +} diff --git a/pkg/metrics/session.go b/pkg/metrics/session.go index 62dff305..f4c575e9 100644 --- a/pkg/metrics/session.go +++ b/pkg/metrics/session.go @@ -31,6 +31,15 @@ var ( Buckets: prometheus.ExponentialBuckets(0.0005, 2, 29), // 0.5ms ~ 1.5days }, []string{LblBackend, LblCmdType}) + QueryInteractionDurationHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: ModuleProxy, + Subsystem: LabelSession, + Name: "query_interaction_duration_seconds", + Help: "Bucketed histogram of request to first response latency (s) for handled commands.", + Buckets: prometheus.ExponentialBuckets(0.0005, 2, 29), // 0.5ms ~ 1.5days + }, []string{LblBackend, LblCmdType}) + HandshakeDurationHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: ModuleProxy, diff --git a/pkg/proxy/backend/cmd_processor_exec.go b/pkg/proxy/backend/cmd_processor_exec.go index 402fa4ed..24704c5d 100644 --- a/pkg/proxy/backend/cmd_processor_exec.go +++ b/pkg/proxy/backend/cmd_processor_exec.go @@ -10,7 +10,9 @@ import ( "github.com/go-mysql-org/go-mysql/mysql" "github.com/pingcap/tidb/pkg/parser" "github.com/pingcap/tiproxy/lib/util/errors" + "github.com/pingcap/tiproxy/pkg/metrics" pnet "github.com/pingcap/tiproxy/pkg/proxy/net" + "github.com/pingcap/tiproxy/pkg/util/monotime" "github.com/siddontang/go/hack" "go.uber.org/zap" ) @@ -39,6 +41,7 @@ func (cp *CmdProcessor) executeCmd(request []byte, clientIO, backendIO *pnet.Pac func (cp *CmdProcessor) forwardCommand(clientIO, backendIO *pnet.PacketIO, request []byte) error { cmd := pnet.Command(request[0]) + interactionStart := monotime.Now() // ComChangeUser is special: we need to modify the packet before forwarding. if cmd != pnet.ComChangeUser { if err := backendIO.WritePacket(request, true); err != nil { @@ -47,11 +50,11 @@ func (cp *CmdProcessor) forwardCommand(clientIO, backendIO *pnet.PacketIO, reque } switch cmd { case pnet.ComStmtPrepare: - return cp.forwardPrepareCmd(clientIO, backendIO) + return cp.forwardPrepareCmd(clientIO, backendIO, request, interactionStart) case pnet.ComStmtFetch: - return cp.forwardFetchCmd(clientIO, backendIO, request) + return cp.forwardFetchCmd(clientIO, backendIO, request, interactionStart) case pnet.ComQuery, pnet.ComStmtExecute, pnet.ComProcessInfo: - return cp.forwardQueryCmd(clientIO, backendIO, request) + return cp.forwardQueryCmd(clientIO, backendIO, request, interactionStart) case pnet.ComStmtClose: return cp.forwardCloseCmd(request) case pnet.ComStmtSendLongData: @@ -59,9 +62,9 @@ func (cp *CmdProcessor) forwardCommand(clientIO, backendIO *pnet.PacketIO, reque case pnet.ComChangeUser: return cp.forwardChangeUserCmd(clientIO, backendIO, request) case pnet.ComStatistics: - return cp.forwardStatisticsCmd(clientIO, backendIO) + return cp.forwardStatisticsCmd(clientIO, backendIO, request, interactionStart) case pnet.ComFieldList: - return cp.forwardFieldListCmd(clientIO, backendIO, request) + return cp.forwardFieldListCmd(clientIO, backendIO, request, interactionStart) case pnet.ComQuit: return cp.forwardQuitCmd() } @@ -71,6 +74,7 @@ func (cp *CmdProcessor) forwardCommand(clientIO, backendIO *pnet.PacketIO, reque if err != nil { return err } + cp.observeInteraction(request, backendIO, interactionStart) switch response[0] { case pnet.OKHeader.Byte(): cp.handleOKPacket(request, response) @@ -96,9 +100,14 @@ func forwardOnePacket(destIO, srcIO *pnet.PacketIO, flush bool) (data []byte, er return data, destIO.WritePacket(data, flush) } -func (cp *CmdProcessor) forwardUntilResultEnd(clientIO, backendIO *pnet.PacketIO, request []byte) (uint16, error) { +func (cp *CmdProcessor) forwardUntilResultEnd(clientIO, backendIO *pnet.PacketIO, request []byte, interactionStart monotime.Time, observeFirstPacket bool) (uint16, error) { var serverStatus uint16 + observed := !observeFirstPacket err := backendIO.ForwardUntil(clientIO, func(firstByte byte, length int) (end, needData bool) { + if !observed { + observed = true + cp.observeInteraction(request, backendIO, interactionStart) + } switch { case pnet.IsErrorPacket(firstByte): return true, true @@ -125,11 +134,12 @@ func (cp *CmdProcessor) forwardUntilResultEnd(clientIO, backendIO *pnet.PacketIO return serverStatus, err } -func (cp *CmdProcessor) forwardPrepareCmd(clientIO, backendIO *pnet.PacketIO) error { +func (cp *CmdProcessor) forwardPrepareCmd(clientIO, backendIO *pnet.PacketIO, request []byte, interactionStart monotime.Time) error { response, err := forwardOnePacket(clientIO, backendIO, false) if err != nil { return err } + cp.observeInteraction(request, backendIO, interactionStart) switch response[0] { case pnet.OKHeader.Byte(): // The OK packet doesn't contain a server status. @@ -167,22 +177,27 @@ func (cp *CmdProcessor) forwardPrepareCmd(clientIO, backendIO *pnet.PacketIO) er return errors.Errorf("unexpected response, cmd:%d resp:%d", pnet.ComStmtPrepare, response[0]) } -func (cp *CmdProcessor) forwardFetchCmd(clientIO, backendIO *pnet.PacketIO, request []byte) error { - _, err := cp.forwardUntilResultEnd(clientIO, backendIO, request) +func (cp *CmdProcessor) forwardFetchCmd(clientIO, backendIO *pnet.PacketIO, request []byte, interactionStart monotime.Time) error { + _, err := cp.forwardUntilResultEnd(clientIO, backendIO, request, interactionStart, true) return err } -func (cp *CmdProcessor) forwardFieldListCmd(clientIO, backendIO *pnet.PacketIO, request []byte) error { - _, err := cp.forwardUntilResultEnd(clientIO, backendIO, request) +func (cp *CmdProcessor) forwardFieldListCmd(clientIO, backendIO *pnet.PacketIO, request []byte, interactionStart monotime.Time) error { + _, err := cp.forwardUntilResultEnd(clientIO, backendIO, request, interactionStart, true) return err } -func (cp *CmdProcessor) forwardQueryCmd(clientIO, backendIO *pnet.PacketIO, request []byte) error { +func (cp *CmdProcessor) forwardQueryCmd(clientIO, backendIO *pnet.PacketIO, request []byte, interactionStart monotime.Time) error { for { var serverStatus uint16 var first byte + observed := false err := backendIO.ForwardUntil(clientIO, func(firstByte byte, _ int) (end, needData bool) { first = firstByte + if !observed { + observed = true + cp.observeInteraction(request, backendIO, interactionStart) + } switch firstByte { case pnet.OKHeader.Byte(), pnet.ErrHeader.Byte(): return true, true @@ -215,6 +230,7 @@ func (cp *CmdProcessor) forwardQueryCmd(clientIO, backendIO *pnet.PacketIO, requ if serverStatus&pnet.ServerMoreResultsExists == 0 { break } + interactionStart = monotime.Now() } return nil } @@ -238,9 +254,11 @@ func (cp *CmdProcessor) forwardLoadInFile(clientIO, backendIO *pnet.PacketIO, re } } var response []byte + interactionStart := monotime.Now() if response, err = forwardOnePacket(clientIO, backendIO, true); err != nil { return } + cp.observeInteraction(request, backendIO, interactionStart) switch response[0] { case pnet.OKHeader.Byte(): return cp.handleOKPacket(request, response), nil @@ -272,7 +290,7 @@ func (cp *CmdProcessor) forwardResultSet(clientIO, backendIO *pnet.PacketIO, req } } // Deprecate EOF or no cursor. - return cp.forwardUntilResultEnd(clientIO, backendIO, request) + return cp.forwardUntilResultEnd(clientIO, backendIO, request, 0, false) } func (cp *CmdProcessor) forwardCloseCmd(request []byte) error { @@ -301,6 +319,7 @@ func (cp *CmdProcessor) forwardChangeUserCmd(clientIO, backendIO *pnet.PacketIO, // See https://github.com/pingcap/tiproxy/issues/127. req.AuthPlugin = unknownAuthPlugin req.AuthData = nil + interactionStart := monotime.Now() if err := backendIO.WritePacket(pnet.MakeChangeUser(req, cp.capability), true); err != nil { return err } @@ -310,6 +329,7 @@ func (cp *CmdProcessor) forwardChangeUserCmd(clientIO, backendIO *pnet.PacketIO, if err != nil { return err } + cp.observeInteraction(request, backendIO, interactionStart) switch response[0] { case pnet.OKHeader.Byte(): cp.handleOKPacket(request, response) @@ -321,13 +341,17 @@ func (cp *CmdProcessor) forwardChangeUserCmd(clientIO, backendIO *pnet.PacketIO, if _, err = forwardOnePacket(backendIO, clientIO, true); err != nil { return err } + interactionStart = monotime.Now() } } } -func (cp *CmdProcessor) forwardStatisticsCmd(clientIO, backendIO *pnet.PacketIO) error { +func (cp *CmdProcessor) forwardStatisticsCmd(clientIO, backendIO *pnet.PacketIO, request []byte, interactionStart monotime.Time) error { // It just sends a string. _, err := forwardOnePacket(clientIO, backendIO, true) + if err == nil { + cp.observeInteraction(request, backendIO, interactionStart) + } return err } @@ -368,3 +392,49 @@ func isBeginStmt(query string) bool { normalized := parser.Normalize(query, "ON") return strings.HasPrefix(normalized, "begin") || strings.HasPrefix(normalized, "start transaction") } + +func (cp *CmdProcessor) observeInteraction(request []byte, backendIO *pnet.PacketIO, start monotime.Time) { + if !metrics.QueryInteractionEnabled() { + return + } + if start == 0 || len(request) == 0 { + return + } + cmd := pnet.Command(request[0]) + if cmd >= pnet.ComEnd { + return + } + duration := monotime.Since(start) + addr := backendIO.RemoteAddr().String() + addCmdInteractionMetrics(cmd, addr, duration) + + threshold := metrics.QueryInteractionSlowLogThreshold() + if threshold <= 0 || duration < threshold { + return + } + fields := []zap.Field{ + zap.Duration("interaction_duration", duration), + zap.Stringer("cmd", cmd), + zap.String("backend_addr", addr), + } + if cmd == pnet.ComQuery { + query := parser.Normalize(pnet.ParseQueryPacket(request[1:]), "ON") + if len(query) > 256 { + query = query[:256] + } + fields = append(fields, zap.String("query", query)) + } + if isStmtCmd(cmd) && len(request) >= 5 { + fields = append(fields, zap.Uint32("stmt_id", binary.LittleEndian.Uint32(request[1:5]))) + } + cp.logger.Warn("slow mysql interaction", fields...) +} + +func isStmtCmd(cmd pnet.Command) bool { + switch cmd { + case pnet.ComStmtExecute, pnet.ComStmtSendLongData, pnet.ComStmtClose, pnet.ComStmtReset, pnet.ComStmtFetch: + return true + default: + return false + } +} diff --git a/pkg/proxy/backend/metrics.go b/pkg/proxy/backend/metrics.go index 37a9afbd..650acefc 100644 --- a/pkg/proxy/backend/metrics.go +++ b/pkg/proxy/backend/metrics.go @@ -14,8 +14,9 @@ import ( ) type mcPerCmd struct { - counter prometheus.Counter - observer prometheus.Observer + counter prometheus.Counter + observer prometheus.Observer + interactionObserver prometheus.Observer } type mcPerBackend struct { @@ -27,11 +28,13 @@ type mcPerBackend struct { outBytes prometheus.Counter outPackets prometheus.Counter handshakeDuration prometheus.Observer + lastSeen monotime.Time } type cmdMetricsCache struct { sync.Mutex backendMetrics map[string]*mcPerBackend + lastGCTime monotime.Time } func newCmdMetricsCache() cmdMetricsCache { @@ -43,9 +46,13 @@ func newCmdMetricsCache() cmdMetricsCache { var cache = newCmdMetricsCache() func addCmdMetrics(cmd pnet.Command, addr string, startTime monotime.Time) { + if cmd >= pnet.ComEnd { + return + } + now := monotime.Now() cache.Lock() defer cache.Unlock() - backendMetrics := ensureBackendMetrics(addr) + backendMetrics := ensureBackendMetrics(addr, now) mc := &backendMetrics.cmds[cmd] if mc.counter == nil { label := cmd.String() @@ -55,20 +62,40 @@ func addCmdMetrics(cmd pnet.Command, addr string, startTime monotime.Time) { mc.counter.Inc() cost := monotime.Since(startTime) mc.observer.Observe(cost.Seconds()) + cache.maybeRunGC(now) +} + +func addCmdInteractionMetrics(cmd pnet.Command, addr string, duration time.Duration) { + if cmd >= pnet.ComEnd { + return + } + now := monotime.Now() + cache.Lock() + defer cache.Unlock() + backendMetrics := ensureBackendMetrics(addr, now) + mc := &backendMetrics.cmds[cmd] + if mc.interactionObserver == nil { + label := cmd.String() + mc.interactionObserver = metrics.QueryInteractionDurationHistogram.WithLabelValues(addr, label) + } + mc.interactionObserver.Observe(duration.Seconds()) + cache.maybeRunGC(now) } func addTraffic(addr string, inBytes, inPackets, outBytes, outPackets uint64) { + now := monotime.Now() cache.Lock() defer cache.Unlock() // Updating traffic per IO costs too much CPU, so update it per command. - backendMetrics := ensureBackendMetrics(addr) + backendMetrics := ensureBackendMetrics(addr, now) backendMetrics.inBytes.Add(float64(inBytes)) backendMetrics.inPackets.Add(float64(inPackets)) backendMetrics.outBytes.Add(float64(outBytes)) backendMetrics.outPackets.Add(float64(outPackets)) + cache.maybeRunGC(now) } -func ensureBackendMetrics(addr string) *mcPerBackend { +func ensureBackendMetrics(addr string, now monotime.Time) *mcPerBackend { backendMetrics, ok := cache.backendMetrics[addr] if !ok { backendMetrics = &mcPerBackend{ @@ -81,15 +108,56 @@ func ensureBackendMetrics(addr string) *mcPerBackend { } cache.backendMetrics[addr] = backendMetrics } + backendMetrics.lastSeen = now return backendMetrics } +func (cache *cmdMetricsCache) maybeRunGC(now monotime.Time) { + ttl := metrics.BackendMetricsGCIdleTTL() + interval := metrics.BackendMetricsGCInterval() + if ttl <= 0 || interval <= 0 { + return + } + if cache.lastGCTime > 0 && now.Before(cache.lastGCTime.Add(interval)) { + return + } + cache.lastGCTime = now + expireBefore := now.Sub(ttl) + for addr, backendMetrics := range cache.backendMetrics { + if backendMetrics.lastSeen.After(expireBefore) { + continue + } + deleteBackendMetricLabels(addr) + delete(cache.backendMetrics, addr) + } +} + +func deleteBackendMetricLabels(addr string) { + metrics.InboundBytesCounter.DeleteLabelValues(addr) + metrics.InboundPacketsCounter.DeleteLabelValues(addr) + metrics.OutboundBytesCounter.DeleteLabelValues(addr) + metrics.OutboundPacketsCounter.DeleteLabelValues(addr) + metrics.HandshakeDurationHistogram.DeleteLabelValues(addr) + for cmd := pnet.Command(0); cmd < pnet.ComEnd; cmd++ { + label := cmd.String() + metrics.QueryTotalCounter.DeleteLabelValues(addr, label) + metrics.QueryDurationHistogram.DeleteLabelValues(addr, label) + metrics.QueryInteractionDurationHistogram.DeleteLabelValues(addr, label) + } +} + // Only used for testing, no need to optimize. func readCmdCounter(cmd pnet.Command, addr string) (int, error) { label := cmd.String() return metrics.ReadCounter(metrics.QueryTotalCounter.WithLabelValues(addr, label)) } +// Only used for testing, no need to optimize. +func readCmdInteractionCounter(cmd pnet.Command, addr string) (uint64, error) { + label := cmd.String() + return metrics.ReadHistogramSampleCount(metrics.QueryInteractionDurationHistogram.WithLabelValues(addr, label)) +} + // Only used for testing, no need to optimize. func readTraffic(addr string) (inBytes, inPackets, outBytes, outPackets int, err error) { if inBytes, err = metrics.ReadCounter(metrics.InboundBytesCounter.WithLabelValues(addr)); err != nil { @@ -106,10 +174,12 @@ func readTraffic(addr string) (inBytes, inPackets, outBytes, outPackets int, err } func addHandshakeMetrics(addr string, duration time.Duration) { + now := monotime.Now() cache.Lock() defer cache.Unlock() - backendMetrics := ensureBackendMetrics(addr) + backendMetrics := ensureBackendMetrics(addr, now) backendMetrics.handshakeDuration.Observe(duration.Seconds()) + cache.maybeRunGC(now) } func addGetBackendMetrics(duration time.Duration, succeed bool) { diff --git a/pkg/proxy/backend/metrics_test.go b/pkg/proxy/backend/metrics_test.go index 88ee199e..dc541266 100644 --- a/pkg/proxy/backend/metrics_test.go +++ b/pkg/proxy/backend/metrics_test.go @@ -4,15 +4,52 @@ package backend import ( + "fmt" "sync" "testing" + "time" "github.com/pingcap/tiproxy/pkg/metrics" pnet "github.com/pingcap/tiproxy/pkg/proxy/net" "github.com/pingcap/tiproxy/pkg/util/monotime" "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" ) +func TestAddCmdInteractionMetrics(t *testing.T) { + addr := fmt.Sprintf("127.0.0.1:%d", time.Now().UnixNano()%100000+10000) + cmd := pnet.ComQuery + prev, err := readCmdInteractionCounter(cmd, addr) + require.NoError(t, err) + addCmdInteractionMetrics(cmd, addr, time.Millisecond) + cur, err := readCmdInteractionCounter(cmd, addr) + require.NoError(t, err) + require.Equal(t, prev+1, cur) +} + +func TestBackendMetricsGC(t *testing.T) { + originInterval := metrics.BackendMetricsGCInterval() + originTTL := metrics.BackendMetricsGCIdleTTL() + defer metrics.SetBackendMetricsGCInterval(originInterval) + defer metrics.SetBackendMetricsGCIdleTTL(originTTL) + + metrics.SetBackendMetricsGCInterval(time.Nanosecond) + metrics.SetBackendMetricsGCIdleTTL(time.Millisecond) + + addr1 := fmt.Sprintf("127.0.0.1:%d", time.Now().UnixNano()%100000+10000) + addr2 := fmt.Sprintf("127.0.0.1:%d", time.Now().UnixNano()%100000+11000) + addCmdMetrics(pnet.ComQuery, addr1, monotime.Now()) + time.Sleep(5 * time.Millisecond) + addCmdMetrics(pnet.ComQuery, addr2, monotime.Now()) + + cache.Lock() + _, ok1 := cache.backendMetrics[addr1] + _, ok2 := cache.backendMetrics[addr2] + cache.Unlock() + require.False(t, ok1) + require.True(t, ok2) +} + func BenchmarkAddCmdMetrics(b *testing.B) { cmd := pnet.ComQuery addr := "127.0.0.1:4000" diff --git a/pkg/proxy/proxy.go b/pkg/proxy/proxy.go index 069ed00c..aa250298 100644 --- a/pkg/proxy/proxy.go +++ b/pkg/proxy/proxy.go @@ -105,6 +105,11 @@ func (s *SQLServer) reset(cfg *config.Config) { s.mu.frontendReadTimeout = cfg.Proxy.FrontendReadTimeout s.mu.connectTimeout = cfg.Proxy.ConnectTimeout s.mu.Unlock() + + metrics.SetQueryInteractionEnabled(cfg.Advance.QueryInteractionMetrics) + metrics.SetQueryInteractionSlowLogThreshold(time.Duration(cfg.Advance.QueryInteractionSlowLogThreshold) * time.Millisecond) + metrics.SetBackendMetricsGCInterval(time.Duration(cfg.Advance.BackendMetricsGCInterval) * time.Second) + metrics.SetBackendMetricsGCIdleTTL(time.Duration(cfg.Advance.BackendMetricsGCIdle) * time.Second) } func (s *SQLServer) Run(ctx context.Context, cfgch <-chan *config.Config) { diff --git a/pkg/proxy/proxy_test.go b/pkg/proxy/proxy_test.go index 9c97fa47..c7db6db3 100644 --- a/pkg/proxy/proxy_test.go +++ b/pkg/proxy/proxy_test.go @@ -227,6 +227,12 @@ func TestWatchCfg(t *testing.T) { GracefulCloseConnTimeout: 100, }, }, + Advance: config.Advance{ + QueryInteractionMetrics: true, + QueryInteractionSlowLogThreshold: 1234, + BackendMetricsGCInterval: 321, + BackendMetricsGCIdle: 654, + }, Security: config.Security{ RequireBackendTLS: true, }, @@ -239,7 +245,11 @@ func TestWatchCfg(t *testing.T) { server.mu.maxConnections == cfg.Proxy.MaxConnections && server.mu.connBufferSize == cfg.Proxy.ConnBufferSize && server.mu.proxyProtocol == (cfg.Proxy.ProxyProtocol != "") && - server.mu.gracefulWait == cfg.Proxy.GracefulWaitBeforeShutdown + server.mu.gracefulWait == cfg.Proxy.GracefulWaitBeforeShutdown && + metrics.QueryInteractionEnabled() == cfg.Advance.QueryInteractionMetrics && + metrics.QueryInteractionSlowLogThreshold() == time.Duration(cfg.Advance.QueryInteractionSlowLogThreshold)*time.Millisecond && + metrics.BackendMetricsGCInterval() == time.Duration(cfg.Advance.BackendMetricsGCInterval)*time.Second && + metrics.BackendMetricsGCIdleTTL() == time.Duration(cfg.Advance.BackendMetricsGCIdle)*time.Second }, 3*time.Second, 10*time.Millisecond) require.NoError(t, server.Close()) } diff --git a/pkg/server/api/config_test.go b/pkg/server/api/config_test.go index 358a7e81..30faca24 100644 --- a/pkg/server/api/config_test.go +++ b/pkg/server/api/config_test.go @@ -50,6 +50,9 @@ addr = '0.0.0.0:3080' [advance] ignore-wrong-namespace = true +query-interaction-slow-log-threshold-ms = 200 +backend-metrics-gc-interval-seconds = 300 +backend-metrics-gc-idle-seconds = 3600 [security] [security.server-tls] @@ -78,7 +81,7 @@ max-backups = 3 doHTTP(t, http.MethodGet, "/api/admin/config?format=json", nil, nil, func(t *testing.T, r *http.Response) { all, err := io.ReadAll(r.Body) require.NoError(t, err) - require.Equal(t, `{"proxy":{"addr":"0.0.0.0:6000","pd-addrs":"127.0.0.1:2379","frontend-keepalive":{"enabled":true},"backend-healthy-keepalive":{"enabled":true,"idle":60000000000,"cnt":5,"intvl":3000000000,"timeout":15000000000},"backend-unhealthy-keepalive":{"enabled":true,"idle":10000000000,"cnt":5,"intvl":1000000000,"timeout":5000000000},"graceful-close-conn-timeout":15},"api":{"addr":"0.0.0.0:3080"},"advance":{"ignore-wrong-namespace":true},"security":{"server-tls":{"min-tls-version":"1.2"},"server-http-tls":{"min-tls-version":"1.2"},"cluster-tls":{"min-tls-version":"1.2"},"sql-tls":{"min-tls-version":"1.2"}},"log":{"encoder":"tidb","level":"info","log-file":{"max-size":300,"max-days":3,"max-backups":3}}}`, + require.Equal(t, `{"proxy":{"addr":"0.0.0.0:6000","pd-addrs":"127.0.0.1:2379","frontend-keepalive":{"enabled":true},"backend-healthy-keepalive":{"enabled":true,"idle":60000000000,"cnt":5,"intvl":3000000000,"timeout":15000000000},"backend-unhealthy-keepalive":{"enabled":true,"idle":10000000000,"cnt":5,"intvl":1000000000,"timeout":5000000000},"graceful-close-conn-timeout":15},"api":{"addr":"0.0.0.0:3080"},"advance":{"ignore-wrong-namespace":true,"query-interaction-slow-log-threshold-ms":200,"backend-metrics-gc-interval-seconds":300,"backend-metrics-gc-idle-seconds":3600},"security":{"server-tls":{"min-tls-version":"1.2"},"server-http-tls":{"min-tls-version":"1.2"},"cluster-tls":{"min-tls-version":"1.2"},"sql-tls":{"min-tls-version":"1.2"}},"log":{"encoder":"tidb","level":"info","log-file":{"max-size":300,"max-days":3,"max-backups":3}}}`, string(regexp.MustCompile(`"workdir":"[^"]+",`).ReplaceAll(all, nil))) require.Equal(t, http.StatusOK, r.StatusCode) }) From c061058a35a1dbad7db854c2056bb6b9c6967d3b Mon Sep 17 00:00:00 2001 From: Yuqing Bai Date: Sat, 7 Feb 2026 17:53:51 +0800 Subject: [PATCH 02/11] backend,metrics: add dynamic interaction metrics username filters - add advance.query-interaction-user-patterns with glob syntax validation - hot-reload user patterns and filter only query interaction histogram collection - use handshake/change-user usernames in command execution path, including COM_CHANGE_USER - extend docs, config examples, and add tests for config/runtime/filter behavior --- README.md | 4 ++ conf/proxy.toml | 5 ++ docs/query-interaction-latency-design.md | 9 +++- docs/query-interaction-latency-usage.md | 6 ++- lib/config/proxy.go | 29 ++++++++++-- lib/config/proxy_test.go | 7 +++ pkg/metrics/interaction.go | 42 +++++++++++++++++ pkg/metrics/interaction_test.go | 10 ++++ pkg/proxy/backend/backend_conn_mgr.go | 4 +- pkg/proxy/backend/cmd_processor.go | 4 +- pkg/proxy/backend/cmd_processor_exec.go | 16 +++++-- pkg/proxy/backend/metrics_test.go | 59 ++++++++++++++++++++++++ pkg/proxy/backend/mock_proxy_test.go | 4 +- pkg/proxy/proxy.go | 1 + pkg/proxy/proxy_test.go | 4 ++ 15 files changed, 189 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index e450563b..a742ab2f 100644 --- a/README.md +++ b/README.md @@ -128,6 +128,7 @@ Configure it in `proxy.toml`: [advance] query-interaction-metrics = true query-interaction-slow-log-threshold-ms = 200 +query-interaction-user-patterns = "app_*,readonly" backend-metrics-gc-interval-seconds = 300 backend-metrics-gc-idle-seconds = 3600 ``` @@ -135,6 +136,9 @@ backend-metrics-gc-idle-seconds = 3600 - `query-interaction-slow-log-threshold-ms`: - `0` disables slow interaction logs. - positive values log interactions slower than threshold. +- `query-interaction-user-patterns`: + - comma-separated username glob patterns (`*`, `?`), case-sensitive. + - empty value means collecting interaction metrics for all users. - `backend-metrics-gc-idle-seconds`: - removes idle backend label series to control in-memory metric cache growth. - `0` disables metric GC. diff --git a/conf/proxy.toml b/conf/proxy.toml index 189dfd07..a1c793b1 100644 --- a/conf/proxy.toml +++ b/conf/proxy.toml @@ -122,6 +122,11 @@ graceful-close-conn-timeout = 15 # 200 => log interactions slower than 200ms # query-interaction-slow-log-threshold-ms = 200 +# optional username glob filter for interaction metrics: +# comma-separated patterns, supports `*` and `?`, case-sensitive +# empty means collect all users +# query-interaction-user-patterns = "" + # backend metrics GC: # backend labels not updated for `backend-metrics-gc-idle-seconds` will be removed from in-memory caches and metric vectors # set `backend-metrics-gc-interval-seconds = 0` or `backend-metrics-gc-idle-seconds = 0` to disable GC diff --git a/docs/query-interaction-latency-design.md b/docs/query-interaction-latency-design.md index ae57fe35..ba1f5df8 100644 --- a/docs/query-interaction-latency-design.md +++ b/docs/query-interaction-latency-design.md @@ -16,6 +16,7 @@ TiProxy 作为 TiDB gateway,原有指标可以看到命令级别的总耗时 - 提供每次交互的聚合延迟指标,支持按 `backend`、`cmd_type` 维度分析。 - 支持慢交互日志阈值动态修改,无需重启 TiProxy。 +- 支持按 MySQL `username` 模式过滤交互指标,降低排障期指标压力。 - 控制 metrics label 内存增长,支持 TTL 回收。 - 保持线上稳定:默认关闭交互指标,开关与阈值可热更新。 @@ -42,6 +43,11 @@ TiProxy 作为 TiDB gateway,原有指标可以看到命令级别的总耗时 - 慢交互日志阈值,单位毫秒。 - `0` 表示关闭慢日志。 - 默认:`200`。 +- `query-interaction-user-patterns` (string) + - 交互指标按用户名过滤(glob 模式,逗号分隔,大小写敏感)。 + - 例如:`app_*`, `readonly`, `tenant_??`。 + - 空字符串表示不过滤(采集所有用户)。 + - 默认:`""`。 - `backend-metrics-gc-interval-seconds` (int) - backend metrics GC 扫描周期。 - `0` 表示关闭 GC。 @@ -60,6 +66,7 @@ TiProxy 作为 TiDB gateway,原有指标可以看到命令级别的总耗时 - `tiproxy_session_query_interaction_duration_seconds` (HistogramVec) - Labels: `backend`, `cmd_type` - Bucket:与 `query_duration_seconds` 对齐。 + - 仅对匹配 `query-interaction-user-patterns` 的连接采集。 ### 5.2 慢交互日志 @@ -106,6 +113,7 @@ TiProxy 作为 TiDB gateway,原有指标可以看到命令级别的总耗时 - histogram observe - 慢日志阈值判断 - GC 采用“低频 + TTL”策略,避免每次请求都做全量扫描。 +- username 过滤采用预解析 glob 列表匹配,开销与 pattern 数量线性相关。 建议上线预留资源: @@ -128,4 +136,3 @@ TiProxy 作为 TiDB gateway,原有指标可以看到命令级别的总耗时 - 原有指标与语义保持不变。 - 新配置均有默认值,升级兼容旧配置文件。 - 若需完全关闭回收,将 `backend-metrics-gc-interval-seconds=0` 或 `backend-metrics-gc-idle-seconds=0`。 - diff --git a/docs/query-interaction-latency-usage.md b/docs/query-interaction-latency-usage.md index fc9907a2..2263e3c3 100644 --- a/docs/query-interaction-latency-usage.md +++ b/docs/query-interaction-latency-usage.md @@ -17,6 +17,7 @@ [advance] query-interaction-metrics = true query-interaction-slow-log-threshold-ms = 200 +query-interaction-user-patterns = "app_*,readonly" backend-metrics-gc-interval-seconds = 300 backend-metrics-gc-idle-seconds = 3600 ``` @@ -25,6 +26,7 @@ backend-metrics-gc-idle-seconds = 3600 - `query-interaction-metrics=false`:关闭交互延迟指标(默认)。 - `query-interaction-slow-log-threshold-ms=0`:关闭慢交互日志。 +- `query-interaction-user-patterns=""`:采集所有用户名;设置后仅采集匹配 username 的交互指标。 - `backend-metrics-gc-interval-seconds=0` 或 `backend-metrics-gc-idle-seconds=0`:关闭 metrics GC。 ## 3. 动态修改(不中断服务) @@ -35,6 +37,7 @@ backend-metrics-gc-idle-seconds = 3600 curl -X PUT http://127.0.0.1:3080/api/admin/config -d ' advance.query-interaction-metrics = true advance.query-interaction-slow-log-threshold-ms = 1000 +advance.query-interaction-user-patterns = "app_*" advance.backend-metrics-gc-interval-seconds = 120 advance.backend-metrics-gc-idle-seconds = 1800 ' @@ -116,6 +119,7 @@ histogram_quantile( 原因: - 每次交互多一次 histogram observe。 +- 若设置 `query-interaction-user-patterns`,每次交互会做一次用户名 glob 匹配(与 pattern 数量线性相关)。 - 慢日志阈值判断和可能的日志写入。 - backend metrics label cache 维护与周期性 GC。 @@ -130,6 +134,7 @@ histogram_quantile( ### 7.1 看不到 interaction 指标 - 确认 `advance.query-interaction-metrics = true`。 +- 若配置了 `advance.query-interaction-user-patterns`,确认当前连接 username 能匹配其中至少一个 pattern。 - 确认 `/api/admin/config` 返回已生效值。 - 确认有真实 SQL 流量经过 TiProxy。 @@ -143,4 +148,3 @@ histogram_quantile( - 提高 `query-interaction-slow-log-threshold-ms`。 - 排障窗口之外可设置为 `0` 关闭慢日志。 - diff --git a/lib/config/proxy.go b/lib/config/proxy.go index dccfdaf7..f97d3ebd 100644 --- a/lib/config/proxy.go +++ b/lib/config/proxy.go @@ -6,7 +6,9 @@ package config import ( "bytes" "os" + "path" "path/filepath" + "strings" "time" "github.com/BurntSushi/toml" @@ -70,11 +72,12 @@ type API struct { } type Advance struct { - IgnoreWrongNamespace bool `yaml:"ignore-wrong-namespace,omitempty" toml:"ignore-wrong-namespace,omitempty" json:"ignore-wrong-namespace,omitempty"` - QueryInteractionMetrics bool `yaml:"query-interaction-metrics,omitempty" toml:"query-interaction-metrics,omitempty" json:"query-interaction-metrics,omitempty"` - QueryInteractionSlowLogThreshold int `yaml:"query-interaction-slow-log-threshold-ms,omitempty" toml:"query-interaction-slow-log-threshold-ms,omitempty" json:"query-interaction-slow-log-threshold-ms,omitempty"` - BackendMetricsGCInterval int `yaml:"backend-metrics-gc-interval-seconds,omitempty" toml:"backend-metrics-gc-interval-seconds,omitempty" json:"backend-metrics-gc-interval-seconds,omitempty"` - BackendMetricsGCIdle int `yaml:"backend-metrics-gc-idle-seconds,omitempty" toml:"backend-metrics-gc-idle-seconds,omitempty" json:"backend-metrics-gc-idle-seconds,omitempty"` + IgnoreWrongNamespace bool `yaml:"ignore-wrong-namespace,omitempty" toml:"ignore-wrong-namespace,omitempty" json:"ignore-wrong-namespace,omitempty"` + QueryInteractionMetrics bool `yaml:"query-interaction-metrics,omitempty" toml:"query-interaction-metrics,omitempty" json:"query-interaction-metrics,omitempty"` + QueryInteractionSlowLogThreshold int `yaml:"query-interaction-slow-log-threshold-ms,omitempty" toml:"query-interaction-slow-log-threshold-ms,omitempty" json:"query-interaction-slow-log-threshold-ms,omitempty"` + QueryInteractionUserPatterns string `yaml:"query-interaction-user-patterns,omitempty" toml:"query-interaction-user-patterns,omitempty" json:"query-interaction-user-patterns,omitempty"` + BackendMetricsGCInterval int `yaml:"backend-metrics-gc-interval-seconds,omitempty" toml:"backend-metrics-gc-interval-seconds,omitempty" json:"backend-metrics-gc-interval-seconds,omitempty"` + BackendMetricsGCIdle int `yaml:"backend-metrics-gc-idle-seconds,omitempty" toml:"backend-metrics-gc-idle-seconds,omitempty" json:"backend-metrics-gc-idle-seconds,omitempty"` } type LogOnline struct { @@ -197,7 +200,23 @@ func (cfg *Config) Check() error { if cfg.Advance.BackendMetricsGCIdle < 0 { return errors.Wrapf(ErrInvalidConfigValue, "backend-metrics-gc-idle-seconds cannot be negative") } + if err := checkQueryInteractionUserPatterns(cfg.Advance.QueryInteractionUserPatterns); err != nil { + return err + } + + return nil +} +func checkQueryInteractionUserPatterns(patterns string) error { + for _, pattern := range strings.Split(patterns, ",") { + pattern = strings.TrimSpace(pattern) + if pattern == "" { + continue + } + if _, err := path.Match(pattern, ""); err != nil { + return errors.Wrapf(ErrInvalidConfigValue, "invalid query-interaction-user-patterns pattern %q: %v", pattern, err) + } + } return nil } diff --git a/lib/config/proxy_test.go b/lib/config/proxy_test.go index 79de3540..ffac9426 100644 --- a/lib/config/proxy_test.go +++ b/lib/config/proxy_test.go @@ -18,6 +18,7 @@ var testProxyConfig = Config{ IgnoreWrongNamespace: true, QueryInteractionMetrics: true, QueryInteractionSlowLogThreshold: 500, + QueryInteractionUserPatterns: "app_*,readonly", BackendMetricsGCInterval: 60, BackendMetricsGCIdle: 300, }, @@ -135,6 +136,12 @@ func TestProxyCheck(t *testing.T) { }, err: ErrInvalidConfigValue, }, + { + pre: func(t *testing.T, c *Config) { + c.Advance.QueryInteractionUserPatterns = "[" + }, + err: ErrInvalidConfigValue, + }, } for _, tc := range testcases { cfg := testProxyConfig diff --git a/pkg/metrics/interaction.go b/pkg/metrics/interaction.go index 00778de6..55038a8b 100644 --- a/pkg/metrics/interaction.go +++ b/pkg/metrics/interaction.go @@ -4,6 +4,8 @@ package metrics import ( + "path" + "strings" "sync/atomic" "time" ) @@ -12,11 +14,17 @@ var queryInteractionEnabled atomic.Bool var queryInteractionSlowLogThreshold atomic.Int64 var backendMetricsGCInterval atomic.Int64 var backendMetricsGCIdleTTL atomic.Int64 +var queryInteractionUserMatcherPtr atomic.Pointer[queryInteractionUserMatcher] + +type queryInteractionUserMatcher struct { + patterns []string +} func init() { queryInteractionSlowLogThreshold.Store(int64(200 * time.Millisecond)) backendMetricsGCInterval.Store(int64(5 * time.Minute)) backendMetricsGCIdleTTL.Store(int64(time.Hour)) + SetQueryInteractionUserPatterns("") } // SetQueryInteractionEnabled updates whether per-interaction query latency metrics are emitted. @@ -39,6 +47,40 @@ func QueryInteractionSlowLogThreshold() time.Duration { return time.Duration(queryInteractionSlowLogThreshold.Load()) } +// SetQueryInteractionUserPatterns updates username glob filters for per-interaction metrics. +// Comma separates multiple patterns; empty means allowing all users. +func SetQueryInteractionUserPatterns(patterns string) { + filterPatterns := make([]string, 0, 4) + for _, pattern := range strings.Split(patterns, ",") { + pattern = strings.TrimSpace(pattern) + if pattern == "" { + continue + } + filterPatterns = append(filterPatterns, pattern) + } + queryInteractionUserMatcherPtr.Store(&queryInteractionUserMatcher{ + patterns: filterPatterns, + }) +} + +// ShouldCollectQueryInteractionForUser reports whether per-interaction metrics should be emitted for this user. +func ShouldCollectQueryInteractionForUser(user string) bool { + matcher := queryInteractionUserMatcherPtr.Load() + if matcher == nil || len(matcher.patterns) == 0 { + return true + } + for _, pattern := range matcher.patterns { + matched, err := path.Match(pattern, user) + if err != nil { + continue + } + if matched { + return true + } + } + return false +} + // SetBackendMetricsGCInterval updates how often backend metric labels are GC'ed. func SetBackendMetricsGCInterval(interval time.Duration) { backendMetricsGCInterval.Store(int64(interval)) diff --git a/pkg/metrics/interaction_test.go b/pkg/metrics/interaction_test.go index 39b8dc63..29bcf748 100644 --- a/pkg/metrics/interaction_test.go +++ b/pkg/metrics/interaction_test.go @@ -11,6 +11,8 @@ import ( ) func TestInteractionSettings(t *testing.T) { + defer SetQueryInteractionUserPatterns("") + SetQueryInteractionEnabled(false) require.False(t, QueryInteractionEnabled()) SetQueryInteractionEnabled(true) @@ -27,4 +29,12 @@ func TestInteractionSettings(t *testing.T) { ttl := 456 * time.Second SetBackendMetricsGCIdleTTL(ttl) require.Equal(t, ttl, BackendMetricsGCIdleTTL()) + + SetQueryInteractionUserPatterns("app_*, readonly") + require.True(t, ShouldCollectQueryInteractionForUser("app_0")) + require.True(t, ShouldCollectQueryInteractionForUser("readonly")) + require.False(t, ShouldCollectQueryInteractionForUser("root")) + + SetQueryInteractionUserPatterns("") + require.True(t, ShouldCollectQueryInteractionForUser("any-user")) } diff --git a/pkg/proxy/backend/backend_conn_mgr.go b/pkg/proxy/backend/backend_conn_mgr.go index 42542f68..b05ddc91 100644 --- a/pkg/proxy/backend/backend_conn_mgr.go +++ b/pkg/proxy/backend/backend_conn_mgr.go @@ -359,7 +359,7 @@ func (mgr *BackendConnManager) ExecuteCmd(ctx context.Context, request []byte) ( waitingRedirect := mgr.redirectInfo.Load() != nil var holdRequest bool backendIO := mgr.backendIO.Load() - holdRequest, err = mgr.cmdProcessor.executeCmd(request, mgr.clientIO, backendIO, waitingRedirect) + holdRequest, err = mgr.cmdProcessor.executeCmd(request, mgr.clientIO, backendIO, waitingRedirect, mgr.authenticator.user) if !holdRequest { addCmdMetrics(cmd, backendIO.RemoteAddr().String(), startTime) mgr.updateTraffic(backendIO) @@ -405,7 +405,7 @@ func (mgr *BackendConnManager) ExecuteCmd(ctx context.Context, request []byte) ( // Execute the held request no matter redirection succeeds or not. if holdRequest && mgr.closeStatus.Load() < statusNotifyClose { backendIO = mgr.backendIO.Load() - _, err = mgr.cmdProcessor.executeCmd(request, mgr.clientIO, backendIO, false) + _, err = mgr.cmdProcessor.executeCmd(request, mgr.clientIO, backendIO, false, mgr.authenticator.user) addCmdMetrics(cmd, backendIO.RemoteAddr().String(), startTime) mgr.updateTraffic(backendIO) if err != nil && !pnet.IsMySQLError(err) { diff --git a/pkg/proxy/backend/cmd_processor.go b/pkg/proxy/backend/cmd_processor.go index c3ebeb58..b6d42d58 100644 --- a/pkg/proxy/backend/cmd_processor.go +++ b/pkg/proxy/backend/cmd_processor.go @@ -24,7 +24,9 @@ type CmdProcessor struct { capability pnet.Capability // Only includes in_trans or quit status. serverStatus uint32 - logger *zap.Logger + // currentUser is used by interaction metrics filters. + currentUser string + logger *zap.Logger } func NewCmdProcessor(logger *zap.Logger) *CmdProcessor { diff --git a/pkg/proxy/backend/cmd_processor_exec.go b/pkg/proxy/backend/cmd_processor_exec.go index 24704c5d..632c01f1 100644 --- a/pkg/proxy/backend/cmd_processor_exec.go +++ b/pkg/proxy/backend/cmd_processor_exec.go @@ -20,7 +20,8 @@ import ( // executeCmd forwards requests and responses between the client and the backend. // holdRequest: should the proxy send the request to the new backend. // err: unexpected errors or MySQL errors. -func (cp *CmdProcessor) executeCmd(request []byte, clientIO, backendIO *pnet.PacketIO, waitingRedirect bool) (holdRequest bool, err error) { +func (cp *CmdProcessor) executeCmd(request []byte, clientIO, backendIO *pnet.PacketIO, waitingRedirect bool, currentUser string) (holdRequest bool, err error) { + cp.currentUser = currentUser backendIO.ResetSequence() if waitingRedirect && cp.needHoldRequest(request) { var response []byte @@ -306,6 +307,7 @@ func (cp *CmdProcessor) forwardSendLongDataCmd(request []byte) error { } func (cp *CmdProcessor) forwardChangeUserCmd(clientIO, backendIO *pnet.PacketIO, request []byte) error { + interactionUser := cp.currentUser req, err := pnet.ParseChangeUser(request, cp.capability) if err != nil { cp.logger.Warn("parse COM_CHANGE_USER packet encounters error", zap.Error(err)) @@ -313,6 +315,8 @@ func (cp *CmdProcessor) forwardChangeUserCmd(clientIO, backendIO *pnet.PacketIO, if !errors.As(err, &warning) { return mysql.ErrMalformPacket } + } else { + interactionUser = req.User } // The client may use the TiProxy salt to generate the auth data instead of using the TiDB salt, // so we need another switch-auth request to pass the TiDB salt to the client. @@ -329,7 +333,7 @@ func (cp *CmdProcessor) forwardChangeUserCmd(clientIO, backendIO *pnet.PacketIO, if err != nil { return err } - cp.observeInteraction(request, backendIO, interactionStart) + cp.observeInteractionByUser(request, backendIO, interactionStart, interactionUser) switch response[0] { case pnet.OKHeader.Byte(): cp.handleOKPacket(request, response) @@ -394,6 +398,10 @@ func isBeginStmt(query string) bool { } func (cp *CmdProcessor) observeInteraction(request []byte, backendIO *pnet.PacketIO, start monotime.Time) { + cp.observeInteractionByUser(request, backendIO, start, cp.currentUser) +} + +func (cp *CmdProcessor) observeInteractionByUser(request []byte, backendIO *pnet.PacketIO, start monotime.Time, interactionUser string) { if !metrics.QueryInteractionEnabled() { return } @@ -406,7 +414,9 @@ func (cp *CmdProcessor) observeInteraction(request []byte, backendIO *pnet.Packe } duration := monotime.Since(start) addr := backendIO.RemoteAddr().String() - addCmdInteractionMetrics(cmd, addr, duration) + if metrics.ShouldCollectQueryInteractionForUser(interactionUser) { + addCmdInteractionMetrics(cmd, addr, duration) + } threshold := metrics.QueryInteractionSlowLogThreshold() if threshold <= 0 || duration < threshold { diff --git a/pkg/proxy/backend/metrics_test.go b/pkg/proxy/backend/metrics_test.go index dc541266..604d0bc8 100644 --- a/pkg/proxy/backend/metrics_test.go +++ b/pkg/proxy/backend/metrics_test.go @@ -50,6 +50,65 @@ func TestBackendMetricsGC(t *testing.T) { require.True(t, ok2) } +func TestInteractionMetricsUserPatternFilter(t *testing.T) { + originEnabled := metrics.QueryInteractionEnabled() + defer metrics.SetQueryInteractionEnabled(originEnabled) + defer metrics.SetQueryInteractionUserPatterns("") + + metrics.SetQueryInteractionEnabled(true) + metrics.SetQueryInteractionUserPatterns("app_*") + + tc := newTCPConnSuite(t) + ts, clean := newTestSuite(t, tc, func(cfg *testConfig) { + cfg.clientConfig.cmd = pnet.ComQuery + cfg.backendConfig.respondType = responseTypeOK + }) + defer clean() + ts.authenticateFirstTime(t, nil) + + addr := ts.tc.proxyBIO.RemoteAddr().String() + prev, err := readCmdInteractionCounter(pnet.ComQuery, addr) + require.NoError(t, err) + + ts.executeCmd(t, nil) + cur, err := readCmdInteractionCounter(pnet.ComQuery, addr) + require.NoError(t, err) + require.Equal(t, prev, cur) + + ts.changeUser("app_reader", mockDBName) + ts.executeCmd(t, nil) + cur, err = readCmdInteractionCounter(pnet.ComQuery, addr) + require.NoError(t, err) + require.Equal(t, prev+1, cur) +} + +func TestInteractionMetricsUserPatternOnChangeUser(t *testing.T) { + originEnabled := metrics.QueryInteractionEnabled() + defer metrics.SetQueryInteractionEnabled(originEnabled) + defer metrics.SetQueryInteractionUserPatterns("") + + metrics.SetQueryInteractionEnabled(true) + metrics.SetQueryInteractionUserPatterns("app_*") + + tc := newTCPConnSuite(t) + ts, clean := newTestSuite(t, tc, func(cfg *testConfig) { + cfg.clientConfig.cmd = pnet.ComChangeUser + cfg.clientConfig.username = "app_switch" + cfg.backendConfig.respondType = responseTypeOK + }) + defer clean() + ts.authenticateFirstTime(t, nil) + + addr := ts.tc.proxyBIO.RemoteAddr().String() + prev, err := readCmdInteractionCounter(pnet.ComChangeUser, addr) + require.NoError(t, err) + + ts.executeCmd(t, nil) + cur, err := readCmdInteractionCounter(pnet.ComChangeUser, addr) + require.NoError(t, err) + require.Equal(t, prev+1, cur) +} + func BenchmarkAddCmdMetrics(b *testing.B) { cmd := pnet.ComQuery addr := "127.0.0.1:4000" diff --git a/pkg/proxy/backend/mock_proxy_test.go b/pkg/proxy/backend/mock_proxy_test.go index 49192058..0d204fc8 100644 --- a/pkg/proxy/backend/mock_proxy_test.go +++ b/pkg/proxy/backend/mock_proxy_test.go @@ -81,12 +81,12 @@ func (mp *mockProxy) processCmd(clientIO, backendIO *pnet.PacketIO) error { if err != nil { return err } - if mp.holdRequest, err = mp.cmdProcessor.executeCmd(request, clientIO, backendIO, mp.waitRedirect); err != nil { + if mp.holdRequest, err = mp.cmdProcessor.executeCmd(request, clientIO, backendIO, mp.waitRedirect, mp.authenticator.user); err != nil { return err } // Pretend to redirect the held request to the new backend. The backend must respond for another loop. if mp.holdRequest { - _, err = mp.cmdProcessor.executeCmd(request, clientIO, backendIO, false) + _, err = mp.cmdProcessor.executeCmd(request, clientIO, backendIO, false, mp.authenticator.user) } return err } diff --git a/pkg/proxy/proxy.go b/pkg/proxy/proxy.go index aa250298..42773574 100644 --- a/pkg/proxy/proxy.go +++ b/pkg/proxy/proxy.go @@ -108,6 +108,7 @@ func (s *SQLServer) reset(cfg *config.Config) { metrics.SetQueryInteractionEnabled(cfg.Advance.QueryInteractionMetrics) metrics.SetQueryInteractionSlowLogThreshold(time.Duration(cfg.Advance.QueryInteractionSlowLogThreshold) * time.Millisecond) + metrics.SetQueryInteractionUserPatterns(cfg.Advance.QueryInteractionUserPatterns) metrics.SetBackendMetricsGCInterval(time.Duration(cfg.Advance.BackendMetricsGCInterval) * time.Second) metrics.SetBackendMetricsGCIdleTTL(time.Duration(cfg.Advance.BackendMetricsGCIdle) * time.Second) } diff --git a/pkg/proxy/proxy_test.go b/pkg/proxy/proxy_test.go index c7db6db3..78d028ac 100644 --- a/pkg/proxy/proxy_test.go +++ b/pkg/proxy/proxy_test.go @@ -230,6 +230,7 @@ func TestWatchCfg(t *testing.T) { Advance: config.Advance{ QueryInteractionMetrics: true, QueryInteractionSlowLogThreshold: 1234, + QueryInteractionUserPatterns: "app_*,readonly", BackendMetricsGCInterval: 321, BackendMetricsGCIdle: 654, }, @@ -248,6 +249,9 @@ func TestWatchCfg(t *testing.T) { server.mu.gracefulWait == cfg.Proxy.GracefulWaitBeforeShutdown && metrics.QueryInteractionEnabled() == cfg.Advance.QueryInteractionMetrics && metrics.QueryInteractionSlowLogThreshold() == time.Duration(cfg.Advance.QueryInteractionSlowLogThreshold)*time.Millisecond && + metrics.ShouldCollectQueryInteractionForUser("app_0") && + metrics.ShouldCollectQueryInteractionForUser("readonly") && + !metrics.ShouldCollectQueryInteractionForUser("root") && metrics.BackendMetricsGCInterval() == time.Duration(cfg.Advance.BackendMetricsGCInterval)*time.Second && metrics.BackendMetricsGCIdleTTL() == time.Duration(cfg.Advance.BackendMetricsGCIdle)*time.Second }, 3*time.Second, 10*time.Millisecond) From 5988cbe1bd5a3005eb8bb57615e01f43e239adca Mon Sep 17 00:00:00 2001 From: Yuqing Bai Date: Sat, 7 Feb 2026 20:14:55 +0800 Subject: [PATCH 03/11] backend,metrics: add sql_type for interaction latency - add sql_type label to query_interaction_duration_seconds - classify COM_QUERY into select/update/begin/commit/... with other fallback - keep non-COM_QUERY commands on sql_type=other - update docs and tests for new interaction metric granularity --- README.md | 2 + docs/query-interaction-latency-design.md | 6 +- docs/query-interaction-latency-usage.md | 9 ++ pkg/metrics/session.go | 5 +- pkg/proxy/backend/cmd_processor_exec.go | 6 +- pkg/proxy/backend/metrics.go | 40 ++++--- pkg/proxy/backend/metrics_test.go | 54 ++++++++-- pkg/proxy/backend/sql_type.go | 129 +++++++++++++++++++++++ pkg/proxy/backend/sql_type_test.go | 38 +++++++ 9 files changed, 261 insertions(+), 28 deletions(-) create mode 100644 pkg/proxy/backend/sql_type.go create mode 100644 pkg/proxy/backend/sql_type_test.go diff --git a/README.md b/README.md index a742ab2f..c70e2d05 100644 --- a/README.md +++ b/README.md @@ -121,6 +121,8 @@ TiProxy can expose per-interaction latency: - Interaction latency: from forwarding one MySQL command to TiDB, until receiving the first response packet from TiDB. - Command duration (`tiproxy_session_query_duration_seconds`) still exists and keeps the original meaning. +- Interaction metric (`tiproxy_session_query_interaction_duration_seconds`) includes labels: `backend`, `cmd_type`, `sql_type`. + - `sql_type` is fine-grained only for `COM_QUERY` (for example `select`, `update`, `begin`, `commit`); other commands use `other`. Configure it in `proxy.toml`: diff --git a/docs/query-interaction-latency-design.md b/docs/query-interaction-latency-design.md index ba1f5df8..db359efe 100644 --- a/docs/query-interaction-latency-design.md +++ b/docs/query-interaction-latency-design.md @@ -14,7 +14,7 @@ TiProxy 作为 TiDB gateway,原有指标可以看到命令级别的总耗时 ### 2.1 目标 -- 提供每次交互的聚合延迟指标,支持按 `backend`、`cmd_type` 维度分析。 +- 提供每次交互的聚合延迟指标,支持按 `backend`、`cmd_type`、`sql_type` 维度分析。 - 支持慢交互日志阈值动态修改,无需重启 TiProxy。 - 支持按 MySQL `username` 模式过滤交互指标,降低排障期指标压力。 - 控制 metrics label 内存增长,支持 TTL 回收。 @@ -64,9 +64,10 @@ TiProxy 作为 TiDB gateway,原有指标可以看到命令级别的总耗时 ### 5.1 新增指标 - `tiproxy_session_query_interaction_duration_seconds` (HistogramVec) - - Labels: `backend`, `cmd_type` + - Labels: `backend`, `cmd_type`, `sql_type` - Bucket:与 `query_duration_seconds` 对齐。 - 仅对匹配 `query-interaction-user-patterns` 的连接采集。 + - `sql_type` 取值固定:`select|insert|update|delete|replace|begin|commit|rollback|set|use|other`。 ### 5.2 慢交互日志 @@ -83,6 +84,7 @@ TiProxy 作为 TiDB gateway,原有指标可以看到命令级别的总耗时 - 单响应命令:收到首包时采集一次。 - `COM_QUERY`/`COM_STMT_EXECUTE` 多结果:每轮结果单独采集。 +- `COM_QUERY` 额外做轻量首关键字分类,写入固定集合 `sql_type`,未识别归类为 `other`。 - `LOAD DATA LOCAL INFILE`:包含本地文件阶段后的最终返回轮次。 - `COM_CHANGE_USER`:包含 auth switch 多轮交互。 - 无响应命令(如 `COM_QUIT`)不采集。 diff --git a/docs/query-interaction-latency-usage.md b/docs/query-interaction-latency-usage.md index 2263e3c3..1b157599 100644 --- a/docs/query-interaction-latency-usage.md +++ b/docs/query-interaction-latency-usage.md @@ -58,6 +58,7 @@ advance.backend-metrics-gc-idle-seconds = 1800 - `backend` - `cmd_type` +- `sql_type`(仅 `COM_QUERY` 会细分到 `select/update/begin/commit/...`,其他命令为 `other`) PromQL 示例: @@ -77,6 +78,14 @@ histogram_quantile( ) ``` +```promql +# 按 sql_type 看 P99 interaction latency(例如 select/update/commit/begin) +histogram_quantile( + 0.99, + sum(rate(tiproxy_session_query_interaction_duration_seconds_bucket[1m])) by (le, sql_type) +) +``` + ```promql # 对比 command duration 与 interaction duration 的均值差 ( diff --git a/pkg/metrics/session.go b/pkg/metrics/session.go index f4c575e9..a67f0e98 100644 --- a/pkg/metrics/session.go +++ b/pkg/metrics/session.go @@ -8,9 +8,10 @@ package metrics import "github.com/prometheus/client_golang/prometheus" -// LblCmdType is the label constant. +// LblCmdType and LblSQLType are label constants. const ( LblCmdType = "cmd_type" + LblSQLType = "sql_type" ) var ( @@ -38,7 +39,7 @@ var ( Name: "query_interaction_duration_seconds", Help: "Bucketed histogram of request to first response latency (s) for handled commands.", Buckets: prometheus.ExponentialBuckets(0.0005, 2, 29), // 0.5ms ~ 1.5days - }, []string{LblBackend, LblCmdType}) + }, []string{LblBackend, LblCmdType, LblSQLType}) HandshakeDurationHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ diff --git a/pkg/proxy/backend/cmd_processor_exec.go b/pkg/proxy/backend/cmd_processor_exec.go index 632c01f1..d1562964 100644 --- a/pkg/proxy/backend/cmd_processor_exec.go +++ b/pkg/proxy/backend/cmd_processor_exec.go @@ -415,7 +415,11 @@ func (cp *CmdProcessor) observeInteractionByUser(request []byte, backendIO *pnet duration := monotime.Since(start) addr := backendIO.RemoteAddr().String() if metrics.ShouldCollectQueryInteractionForUser(interactionUser) { - addCmdInteractionMetrics(cmd, addr, duration) + sqlType := sqlTypeOther + if cmd == pnet.ComQuery && len(request) > 1 { + sqlType = classifyComQuerySQLType(request[1:]) + } + addCmdInteractionMetrics(cmd, addr, sqlType, duration) } threshold := metrics.QueryInteractionSlowLogThreshold() diff --git a/pkg/proxy/backend/metrics.go b/pkg/proxy/backend/metrics.go index 650acefc..f73a809b 100644 --- a/pkg/proxy/backend/metrics.go +++ b/pkg/proxy/backend/metrics.go @@ -14,9 +14,9 @@ import ( ) type mcPerCmd struct { - counter prometheus.Counter - observer prometheus.Observer - interactionObserver prometheus.Observer + counter prometheus.Counter + observer prometheus.Observer + interactionObservers map[string]prometheus.Observer } type mcPerBackend struct { @@ -65,20 +65,28 @@ func addCmdMetrics(cmd pnet.Command, addr string, startTime monotime.Time) { cache.maybeRunGC(now) } -func addCmdInteractionMetrics(cmd pnet.Command, addr string, duration time.Duration) { +func addCmdInteractionMetrics(cmd pnet.Command, addr, sqlType string, duration time.Duration) { if cmd >= pnet.ComEnd { return } + if sqlType == "" { + sqlType = sqlTypeOther + } now := monotime.Now() cache.Lock() defer cache.Unlock() backendMetrics := ensureBackendMetrics(addr, now) mc := &backendMetrics.cmds[cmd] - if mc.interactionObserver == nil { - label := cmd.String() - mc.interactionObserver = metrics.QueryInteractionDurationHistogram.WithLabelValues(addr, label) + if mc.interactionObservers == nil { + mc.interactionObservers = make(map[string]prometheus.Observer, len(interactionSQLTypes)) + } + observer, ok := mc.interactionObservers[sqlType] + if !ok { + cmdType := cmd.String() + observer = metrics.QueryInteractionDurationHistogram.WithLabelValues(addr, cmdType, sqlType) + mc.interactionObservers[sqlType] = observer } - mc.interactionObserver.Observe(duration.Seconds()) + observer.Observe(duration.Seconds()) cache.maybeRunGC(now) } @@ -139,10 +147,12 @@ func deleteBackendMetricLabels(addr string) { metrics.OutboundPacketsCounter.DeleteLabelValues(addr) metrics.HandshakeDurationHistogram.DeleteLabelValues(addr) for cmd := pnet.Command(0); cmd < pnet.ComEnd; cmd++ { - label := cmd.String() - metrics.QueryTotalCounter.DeleteLabelValues(addr, label) - metrics.QueryDurationHistogram.DeleteLabelValues(addr, label) - metrics.QueryInteractionDurationHistogram.DeleteLabelValues(addr, label) + cmdType := cmd.String() + metrics.QueryTotalCounter.DeleteLabelValues(addr, cmdType) + metrics.QueryDurationHistogram.DeleteLabelValues(addr, cmdType) + for _, sqlType := range interactionSQLTypes { + metrics.QueryInteractionDurationHistogram.DeleteLabelValues(addr, cmdType, sqlType) + } } } @@ -153,9 +163,9 @@ func readCmdCounter(cmd pnet.Command, addr string) (int, error) { } // Only used for testing, no need to optimize. -func readCmdInteractionCounter(cmd pnet.Command, addr string) (uint64, error) { - label := cmd.String() - return metrics.ReadHistogramSampleCount(metrics.QueryInteractionDurationHistogram.WithLabelValues(addr, label)) +func readCmdInteractionCounter(cmd pnet.Command, addr, sqlType string) (uint64, error) { + cmdType := cmd.String() + return metrics.ReadHistogramSampleCount(metrics.QueryInteractionDurationHistogram.WithLabelValues(addr, cmdType, sqlType)) } // Only used for testing, no need to optimize. diff --git a/pkg/proxy/backend/metrics_test.go b/pkg/proxy/backend/metrics_test.go index 604d0bc8..34f409cd 100644 --- a/pkg/proxy/backend/metrics_test.go +++ b/pkg/proxy/backend/metrics_test.go @@ -19,10 +19,10 @@ import ( func TestAddCmdInteractionMetrics(t *testing.T) { addr := fmt.Sprintf("127.0.0.1:%d", time.Now().UnixNano()%100000+10000) cmd := pnet.ComQuery - prev, err := readCmdInteractionCounter(cmd, addr) + prev, err := readCmdInteractionCounter(cmd, addr, sqlTypeOther) require.NoError(t, err) - addCmdInteractionMetrics(cmd, addr, time.Millisecond) - cur, err := readCmdInteractionCounter(cmd, addr) + addCmdInteractionMetrics(cmd, addr, sqlTypeOther, time.Millisecond) + cur, err := readCmdInteractionCounter(cmd, addr, sqlTypeOther) require.NoError(t, err) require.Equal(t, prev+1, cur) } @@ -67,17 +67,17 @@ func TestInteractionMetricsUserPatternFilter(t *testing.T) { ts.authenticateFirstTime(t, nil) addr := ts.tc.proxyBIO.RemoteAddr().String() - prev, err := readCmdInteractionCounter(pnet.ComQuery, addr) + prev, err := readCmdInteractionCounter(pnet.ComQuery, addr, sqlTypeOther) require.NoError(t, err) ts.executeCmd(t, nil) - cur, err := readCmdInteractionCounter(pnet.ComQuery, addr) + cur, err := readCmdInteractionCounter(pnet.ComQuery, addr, sqlTypeOther) require.NoError(t, err) require.Equal(t, prev, cur) ts.changeUser("app_reader", mockDBName) ts.executeCmd(t, nil) - cur, err = readCmdInteractionCounter(pnet.ComQuery, addr) + cur, err = readCmdInteractionCounter(pnet.ComQuery, addr, sqlTypeOther) require.NoError(t, err) require.Equal(t, prev+1, cur) } @@ -100,15 +100,53 @@ func TestInteractionMetricsUserPatternOnChangeUser(t *testing.T) { ts.authenticateFirstTime(t, nil) addr := ts.tc.proxyBIO.RemoteAddr().String() - prev, err := readCmdInteractionCounter(pnet.ComChangeUser, addr) + prev, err := readCmdInteractionCounter(pnet.ComChangeUser, addr, sqlTypeOther) require.NoError(t, err) ts.executeCmd(t, nil) - cur, err := readCmdInteractionCounter(pnet.ComChangeUser, addr) + cur, err := readCmdInteractionCounter(pnet.ComChangeUser, addr, sqlTypeOther) require.NoError(t, err) require.Equal(t, prev+1, cur) } +func TestInteractionMetricsSQLTypeLabels(t *testing.T) { + originEnabled := metrics.QueryInteractionEnabled() + defer metrics.SetQueryInteractionEnabled(originEnabled) + defer metrics.SetQueryInteractionUserPatterns("") + + metrics.SetQueryInteractionEnabled(true) + metrics.SetQueryInteractionUserPatterns("") + + tc := newTCPConnSuite(t) + ts, clean := newTestSuite(t, tc, func(cfg *testConfig) { + cfg.clientConfig.cmd = pnet.ComQuery + cfg.backendConfig.respondType = responseTypeOK + }) + defer clean() + ts.authenticateFirstTime(t, nil) + + addr := ts.tc.proxyBIO.RemoteAddr().String() + tests := []struct { + sql string + sqlType string + }{ + {sql: "select 1", sqlType: sqlTypeSelect}, + {sql: "update t set a=1", sqlType: sqlTypeUpdate}, + {sql: "begin", sqlType: sqlTypeBegin}, + {sql: "commit", sqlType: sqlTypeCommit}, + } + + for _, test := range tests { + prev, err := readCmdInteractionCounter(pnet.ComQuery, addr, test.sqlType) + require.NoError(t, err) + ts.mc.sql = test.sql + ts.executeCmd(t, nil) + cur, err := readCmdInteractionCounter(pnet.ComQuery, addr, test.sqlType) + require.NoError(t, err) + require.Equal(t, prev+1, cur, test.sql) + } +} + func BenchmarkAddCmdMetrics(b *testing.B) { cmd := pnet.ComQuery addr := "127.0.0.1:4000" diff --git a/pkg/proxy/backend/sql_type.go b/pkg/proxy/backend/sql_type.go new file mode 100644 index 00000000..a824347b --- /dev/null +++ b/pkg/proxy/backend/sql_type.go @@ -0,0 +1,129 @@ +// Copyright 2024 PingCAP, Inc. +// SPDX-License-Identifier: Apache-2.0 + +package backend + +import ( + "bytes" + "strings" +) + +const ( + sqlTypeSelect = "select" + sqlTypeInsert = "insert" + sqlTypeUpdate = "update" + sqlTypeDelete = "delete" + sqlTypeReplace = "replace" + sqlTypeBegin = "begin" + sqlTypeCommit = "commit" + sqlTypeRollback = "rollback" + sqlTypeSet = "set" + sqlTypeUse = "use" + sqlTypeOther = "other" +) + +var interactionSQLTypes = []string{ + sqlTypeSelect, + sqlTypeInsert, + sqlTypeUpdate, + sqlTypeDelete, + sqlTypeReplace, + sqlTypeBegin, + sqlTypeCommit, + sqlTypeRollback, + sqlTypeSet, + sqlTypeUse, + sqlTypeOther, +} + +func classifyComQuerySQLType(query []byte) string { + pos := skipLeadingSQLTokens(query, 0, true) + if pos >= len(query) { + return sqlTypeOther + } + first, pos := readSQLKeyword(query, pos) + if first == "" { + return sqlTypeOther + } + switch first { + case sqlTypeSelect: + return sqlTypeSelect + case sqlTypeInsert: + return sqlTypeInsert + case sqlTypeUpdate: + return sqlTypeUpdate + case sqlTypeDelete: + return sqlTypeDelete + case sqlTypeReplace: + return sqlTypeReplace + case sqlTypeBegin: + return sqlTypeBegin + case sqlTypeCommit: + return sqlTypeCommit + case sqlTypeRollback: + return sqlTypeRollback + case sqlTypeSet: + return sqlTypeSet + case sqlTypeUse: + return sqlTypeUse + case "start": + second, _ := readSQLKeyword(query, skipLeadingSQLTokens(query, pos, false)) + if second == "transaction" { + return sqlTypeBegin + } + } + return sqlTypeOther +} + +func skipLeadingSQLTokens(query []byte, pos int, skipSemicolon bool) int { + for pos < len(query) { + switch query[pos] { + case ' ', '\t', '\n', '\r': + pos++ + case ';': + if !skipSemicolon { + return pos + } + pos++ + case '#': + pos = skipLineComment(query, pos+1) + default: + if pos+1 < len(query) && query[pos] == '-' && query[pos+1] == '-' { + pos = skipLineComment(query, pos+2) + continue + } + if pos+1 < len(query) && query[pos] == '/' && query[pos+1] == '*' { + end := bytes.Index(query[pos+2:], []byte("*/")) + if end < 0 { + return len(query) + } + pos += end + 4 + continue + } + return pos + } + } + return pos +} + +func skipLineComment(query []byte, pos int) int { + for pos < len(query) && query[pos] != '\n' { + pos++ + } + return pos +} + +func readSQLKeyword(query []byte, pos int) (string, int) { + start := pos + for pos < len(query) { + ch := query[pos] + if (ch < 'a' || ch > 'z') && (ch < 'A' || ch > 'Z') { + break + } + pos++ + } + if pos == start { + return "", pos + } + return strings.ToLower(string(query[start:pos])), pos +} diff --git a/pkg/proxy/backend/sql_type_test.go b/pkg/proxy/backend/sql_type_test.go new file mode 100644 index 00000000..c753402a --- /dev/null +++ b/pkg/proxy/backend/sql_type_test.go @@ -0,0 +1,38 @@ +// Copyright 2024 PingCAP, Inc. +// SPDX-License-Identifier: Apache-2.0 + +package backend + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestClassifyComQuerySQLType(t *testing.T) { + tests := []struct { + query string + expected string + }{ + {query: "select 1", expected: sqlTypeSelect}, + {query: "UPDATE t SET a=1", expected: sqlTypeUpdate}, + {query: "insert into t values (1)", expected: sqlTypeInsert}, + {query: "delete from t", expected: sqlTypeDelete}, + {query: "replace into t values (1)", expected: sqlTypeReplace}, + {query: "begin", expected: sqlTypeBegin}, + {query: "START TRANSACTION", expected: sqlTypeBegin}, + {query: "commit", expected: sqlTypeCommit}, + {query: "rollback", expected: sqlTypeRollback}, + {query: "set autocommit=0", expected: sqlTypeSet}, + {query: "use test", expected: sqlTypeUse}, + {query: " # c1\n -- c2\n /* c3 */ select * from t", expected: sqlTypeSelect}, + {query: "", expected: sqlTypeOther}, + {query: "; ", expected: sqlTypeOther}, + {query: "show tables", expected: sqlTypeOther}, + {query: "/* unterminated", expected: sqlTypeOther}, + } + + for _, test := range tests { + require.Equal(t, test.expected, classifyComQuerySQLType([]byte(test.query)), test.query) + } +} From 217b4b65321f6a7772050faa24481b1c6e39192d Mon Sep 17 00:00:00 2001 From: Yuqing Bai Date: Sat, 7 Feb 2026 20:30:28 +0800 Subject: [PATCH 04/11] backend: add interaction threshold and pattern hit logs - enrich slow interaction logs with connection_id, interaction_time, username and sql_type - add username pattern match fields and dedicated matched-pattern warning log - expose matcher API returning matched pattern and wire connection id into cmd processor - update docs for new log events and fields --- README.md | 1 + docs/query-interaction-latency-design.md | 7 ++++++- docs/query-interaction-latency-usage.md | 7 +++++++ pkg/metrics/interaction.go | 13 ++++++++++--- pkg/metrics/interaction_test.go | 9 +++++++++ pkg/proxy/backend/backend_conn_mgr.go | 1 + pkg/proxy/backend/cmd_processor.go | 4 +++- pkg/proxy/backend/cmd_processor_exec.go | 21 ++++++++++++++++----- 8 files changed, 53 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index c70e2d05..9b3d55b3 100644 --- a/README.md +++ b/README.md @@ -138,6 +138,7 @@ backend-metrics-gc-idle-seconds = 3600 - `query-interaction-slow-log-threshold-ms`: - `0` disables slow interaction logs. - positive values log interactions slower than threshold. + - slow logs include `interaction_time`, `connection_id`, `username`, `sql_type`, and username-pattern match fields. - `query-interaction-user-patterns`: - comma-separated username glob patterns (`*`, `?`), case-sensitive. - empty value means collecting interaction metrics for all users. diff --git a/docs/query-interaction-latency-design.md b/docs/query-interaction-latency-design.md index db359efe..0636567b 100644 --- a/docs/query-interaction-latency-design.md +++ b/docs/query-interaction-latency-design.md @@ -73,11 +73,16 @@ TiProxy 作为 TiDB gateway,原有指标可以看到命令级别的总耗时 当 `interaction_duration >= threshold` 时记录 `Warn` 日志: -- 固定字段:`interaction_duration`, `cmd`, `backend_addr` +- 固定字段:`interaction_time`, `interaction_duration`, `connection_id`, `cmd`, `sql_type`, `username`, `backend_addr` +- 过滤字段:`username_pattern_matched`, `username_matched_pattern` - 条件字段: - `query`:仅 `COM_QUERY`,经过 normalize 并截断 - `stmt_id`:`COM_STMT_*` 且包体含 statement id 时 +当慢交互同时命中 username pattern 时,额外输出: + +- `slow mysql interaction matched username pattern` + ## 6. 数据路径与埋点位置 埋点位于 `CmdProcessor` 转发路径,按“每一轮交互”采集: diff --git a/docs/query-interaction-latency-usage.md b/docs/query-interaction-latency-usage.md index 1b157599..1347f889 100644 --- a/docs/query-interaction-latency-usage.md +++ b/docs/query-interaction-latency-usage.md @@ -104,11 +104,18 @@ histogram_quantile( 日志名: - `slow mysql interaction` +- `slow mysql interaction matched username pattern`(当慢交互且命中 username pattern) 字段: +- `interaction_time` - `interaction_duration` +- `connection_id` - `cmd` +- `sql_type` +- `username` +- `username_pattern_matched` +- `username_matched_pattern` - `backend_addr` - `query`(仅 `COM_QUERY`) - `stmt_id`(适用 `COM_STMT_*`) diff --git a/pkg/metrics/interaction.go b/pkg/metrics/interaction.go index 55038a8b..bbc38550 100644 --- a/pkg/metrics/interaction.go +++ b/pkg/metrics/interaction.go @@ -65,9 +65,16 @@ func SetQueryInteractionUserPatterns(patterns string) { // ShouldCollectQueryInteractionForUser reports whether per-interaction metrics should be emitted for this user. func ShouldCollectQueryInteractionForUser(user string) bool { + matched, _ := MatchQueryInteractionUserPattern(user) + return matched +} + +// MatchQueryInteractionUserPattern reports whether the user matches configured patterns +// and returns the matched pattern when available. +func MatchQueryInteractionUserPattern(user string) (matched bool, pattern string) { matcher := queryInteractionUserMatcherPtr.Load() if matcher == nil || len(matcher.patterns) == 0 { - return true + return true, "" } for _, pattern := range matcher.patterns { matched, err := path.Match(pattern, user) @@ -75,10 +82,10 @@ func ShouldCollectQueryInteractionForUser(user string) bool { continue } if matched { - return true + return true, pattern } } - return false + return false, "" } // SetBackendMetricsGCInterval updates how often backend metric labels are GC'ed. diff --git a/pkg/metrics/interaction_test.go b/pkg/metrics/interaction_test.go index 29bcf748..f54a846e 100644 --- a/pkg/metrics/interaction_test.go +++ b/pkg/metrics/interaction_test.go @@ -34,7 +34,16 @@ func TestInteractionSettings(t *testing.T) { require.True(t, ShouldCollectQueryInteractionForUser("app_0")) require.True(t, ShouldCollectQueryInteractionForUser("readonly")) require.False(t, ShouldCollectQueryInteractionForUser("root")) + matched, pattern := MatchQueryInteractionUserPattern("app_0") + require.True(t, matched) + require.Equal(t, "app_*", pattern) + matched, pattern = MatchQueryInteractionUserPattern("root") + require.False(t, matched) + require.Equal(t, "", pattern) SetQueryInteractionUserPatterns("") require.True(t, ShouldCollectQueryInteractionForUser("any-user")) + matched, pattern = MatchQueryInteractionUserPattern("any-user") + require.True(t, matched) + require.Equal(t, "", pattern) } diff --git a/pkg/proxy/backend/backend_conn_mgr.go b/pkg/proxy/backend/backend_conn_mgr.go index b05ddc91..f6573640 100644 --- a/pkg/proxy/backend/backend_conn_mgr.go +++ b/pkg/proxy/backend/backend_conn_mgr.go @@ -182,6 +182,7 @@ func NewBackendConnManager(logger *zap.Logger, handshakeHandler HandshakeHandler } mgr.ctxmap.m = make(map[any]any) mgr.SetValue(ConnContextKeyConnID, connectionID) + mgr.cmdProcessor.connectionID = connectionID return mgr } diff --git a/pkg/proxy/backend/cmd_processor.go b/pkg/proxy/backend/cmd_processor.go index b6d42d58..ba49882e 100644 --- a/pkg/proxy/backend/cmd_processor.go +++ b/pkg/proxy/backend/cmd_processor.go @@ -26,7 +26,9 @@ type CmdProcessor struct { serverStatus uint32 // currentUser is used by interaction metrics filters. currentUser string - logger *zap.Logger + // connectionID is used in interaction logs. + connectionID uint64 + logger *zap.Logger } func NewCmdProcessor(logger *zap.Logger) *CmdProcessor { diff --git a/pkg/proxy/backend/cmd_processor_exec.go b/pkg/proxy/backend/cmd_processor_exec.go index d1562964..85ea4715 100644 --- a/pkg/proxy/backend/cmd_processor_exec.go +++ b/pkg/proxy/backend/cmd_processor_exec.go @@ -6,6 +6,7 @@ package backend import ( "encoding/binary" "strings" + "time" "github.com/go-mysql-org/go-mysql/mysql" "github.com/pingcap/tidb/pkg/parser" @@ -414,11 +415,12 @@ func (cp *CmdProcessor) observeInteractionByUser(request []byte, backendIO *pnet } duration := monotime.Since(start) addr := backendIO.RemoteAddr().String() - if metrics.ShouldCollectQueryInteractionForUser(interactionUser) { - sqlType := sqlTypeOther - if cmd == pnet.ComQuery && len(request) > 1 { - sqlType = classifyComQuerySQLType(request[1:]) - } + sqlType := sqlTypeOther + if cmd == pnet.ComQuery && len(request) > 1 { + sqlType = classifyComQuerySQLType(request[1:]) + } + patternMatched, matchedPattern := metrics.MatchQueryInteractionUserPattern(interactionUser) + if patternMatched { addCmdInteractionMetrics(cmd, addr, sqlType, duration) } @@ -427,8 +429,14 @@ func (cp *CmdProcessor) observeInteractionByUser(request []byte, backendIO *pnet return } fields := []zap.Field{ + zap.Time("interaction_time", time.Now()), zap.Duration("interaction_duration", duration), + zap.Uint64("connection_id", cp.connectionID), zap.Stringer("cmd", cmd), + zap.String("sql_type", sqlType), + zap.String("username", interactionUser), + zap.Bool("username_pattern_matched", patternMatched), + zap.String("username_matched_pattern", matchedPattern), zap.String("backend_addr", addr), } if cmd == pnet.ComQuery { @@ -442,6 +450,9 @@ func (cp *CmdProcessor) observeInteractionByUser(request []byte, backendIO *pnet fields = append(fields, zap.Uint32("stmt_id", binary.LittleEndian.Uint32(request[1:5]))) } cp.logger.Warn("slow mysql interaction", fields...) + if patternMatched { + cp.logger.Warn("slow mysql interaction matched username pattern", fields...) + } } func isStmtCmd(cmd pnet.Command) bool { From 8584ae8d0a27c1804b11b13a9481248bdcc51a79 Mon Sep 17 00:00:00 2001 From: Yuqing Bai Date: Sat, 7 Feb 2026 20:39:52 +0800 Subject: [PATCH 05/11] docs: add english and chinese interaction latency design docs - rewrite query-interaction-latency-design.md in English - add query-interaction-latency-design-zh.md as standalone Chinese version - add README link to Chinese design doc --- README.md | 1 + docs/query-interaction-latency-design-zh.md | 154 ++++++++++++++++ docs/query-interaction-latency-design.md | 193 ++++++++++---------- 3 files changed, 256 insertions(+), 92 deletions(-) create mode 100644 docs/query-interaction-latency-design-zh.md diff --git a/README.md b/README.md index 9b3d55b3..a46dbbea 100644 --- a/README.md +++ b/README.md @@ -154,6 +154,7 @@ These options support dynamic update through `PUT /api/admin/config`, so no rest Detailed docs: - Design: [`docs/query-interaction-latency-design.md`](docs/query-interaction-latency-design.md) +- Design (Chinese): [`docs/query-interaction-latency-design-zh.md`](docs/query-interaction-latency-design-zh.md) - Usage: [`docs/query-interaction-latency-usage.md`](docs/query-interaction-latency-usage.md) ### Resource Sizing Notes diff --git a/docs/query-interaction-latency-design-zh.md b/docs/query-interaction-latency-design-zh.md new file mode 100644 index 00000000..525713c4 --- /dev/null +++ b/docs/query-interaction-latency-design-zh.md @@ -0,0 +1,154 @@ +# TiProxy Query Interaction Latency 设计文档(中文) + +## 1. 背景 + +TiProxy 作为 TiDB gateway,原有指标可以看到命令级别的总耗时,但无法直接回答以下问题: + +- 单次 MySQL 交互(request -> backend first response)的延迟是否异常。 +- 出现慢请求时,是 TiDB 后端慢,还是 TiProxy 自身引入额外时延。 +- backend 数量或地址变化后,metrics label 是否会长期累积占用内存。 + +为此,本设计新增交互延迟观测、慢交互日志、以及 backend metrics label GC。 + +## 2. 目标与非目标 + +### 2.1 目标 + +- 提供每次交互的聚合延迟指标,支持按 `backend`、`cmd_type`、`sql_type` 维度分析。 +- 支持慢交互日志阈值动态修改,无需重启 TiProxy。 +- 支持按 MySQL `username` 模式过滤交互指标,降低排障期指标压力。 +- 控制 metrics label 内存增长,支持 TTL 回收。 +- 保持线上稳定:默认关闭交互指标,开关与阈值可热更新。 + +### 2.2 非目标 + +- 不提供完整 SQL tracing/span。 +- 不改变已有 `query_duration_seconds` 指标语义。 +- 不对外暴露每一条请求的全量明细存储。 + +## 3. 术语定义 + +- Interaction:一轮 MySQL 命令交互,从 TiProxy 把 request 转发到 TiDB 后开始,直到收到 TiDB 的第一个 response packet 结束。 +- Command Duration:现有命令完整处理耗时(已有指标,不变)。 +- Interaction Duration:本次新增的首包响应延迟。 + +## 4. 配置设计 + +新增 `[advance]` 配置项: + +- `query-interaction-metrics` (bool) + - 是否开启交互延迟观测。 + - 默认:`false`。 +- `query-interaction-slow-log-threshold-ms` (int) + - 慢交互日志阈值,单位毫秒。 + - `0` 表示关闭慢日志。 + - 默认:`200`。 +- `query-interaction-user-patterns` (string) + - 交互指标按用户名过滤(glob 模式,逗号分隔,大小写敏感)。 + - 例如:`app_*`, `readonly`, `tenant_??`。 + - 空字符串表示不过滤(采集所有用户)。 + - 默认:`""`。 +- `backend-metrics-gc-interval-seconds` (int) + - backend metrics GC 扫描周期。 + - `0` 表示关闭 GC。 + - 默认:`300`。 +- `backend-metrics-gc-idle-seconds` (int) + - backend idle TTL,超过 TTL 未更新则回收其 metrics labels。 + - `0` 表示关闭 GC。 + - 默认:`3600`。 + +所有配置支持通过 `PUT /api/admin/config` 动态更新。 + +## 5. 指标与日志设计 + +### 5.1 新增指标 + +- `tiproxy_session_query_interaction_duration_seconds` (HistogramVec) + - Labels: `backend`, `cmd_type`, `sql_type` + - Bucket:与 `query_duration_seconds` 对齐。 + - 仅对匹配 `query-interaction-user-patterns` 的连接采集。 + - `sql_type` 取值固定:`select|insert|update|delete|replace|begin|commit|rollback|set|use|other`。 + +### 5.2 慢交互日志 + +当 `interaction_duration >= threshold` 时记录 `Warn` 日志: + +- 固定字段: + - `interaction_time` + - `interaction_duration` + - `connection_id` + - `cmd` + - `sql_type` + - `username` + - `backend_addr` +- 过滤字段: + - `username_pattern_matched` + - `username_matched_pattern` +- 条件字段: + - `query`:仅 `COM_QUERY`,经过 normalize 并截断 + - `stmt_id`:`COM_STMT_*` 且包体含 statement id 时 + +当慢交互同时命中 username pattern 时,额外输出: + +- `slow mysql interaction matched username pattern` + +## 6. 数据路径与埋点位置 + +埋点位于 `CmdProcessor` 转发路径,按“每一轮交互”采集: + +- 单响应命令:收到首包时采集一次。 +- `COM_QUERY`/`COM_STMT_EXECUTE` 多结果:每轮结果单独采集。 +- `COM_QUERY` 额外做轻量首关键字分类,写入固定集合 `sql_type`,未识别归类为 `other`。 +- `LOAD DATA LOCAL INFILE`:包含本地文件阶段后的最终返回轮次。 +- `COM_CHANGE_USER`:包含 auth switch 多轮交互。 +- 无响应命令(如 `COM_QUIT`)不采集。 + +## 7. Backend Metrics GC 设计 + +在 backend metrics cache 维护 `lastSeen`: + +1. 每次 metrics 更新刷新 `lastSeen`。 +2. 到达 GC interval 时触发 sweep。 +3. 若 `now - lastSeen > idleTTL`: + - 删除缓存节点。 + - 调用 Prometheus `DeleteLabelValues` 回收对应 labels。 + +回收范围: + +- `query_total` +- `query_duration_seconds` +- `query_interaction_duration_seconds` +- `handshake_duration_seconds` +- traffic counters + +## 8. 性能与稳定性考虑 + +- 默认关闭交互指标,避免默认增量开销。 +- 开启后增量主要是: + - monotonic time 计算 + - histogram observe + - 慢日志阈值判断 +- GC 采用“低频 + TTL”策略,避免每次请求都做全量扫描。 +- username 过滤采用预解析 glob 列表匹配,开销与 pattern 数量线性相关。 + +建议上线预留资源: + +- CPU: +15% +- Memory: +10% + +实际值需结合业务流量压测复核。 + +## 9. 测试策略 + +- 配置测试:新字段默认值、序列化、负值校验。 +- 热更新测试:`WatchConfig` 下参数动态生效。 +- 指标测试: + - interaction histogram sample count 增长 + - TTL GC 可删除 stale backend labels +- 协议路径回归:`pkg/proxy/backend` 全量测试通过。 + +## 10. 兼容性 + +- 原有指标与语义保持不变。 +- 新配置均有默认值,升级兼容旧配置文件。 +- 若需完全关闭回收,将 `backend-metrics-gc-interval-seconds=0` 或 `backend-metrics-gc-idle-seconds=0`。 diff --git a/docs/query-interaction-latency-design.md b/docs/query-interaction-latency-design.md index 0636567b..5efbdffb 100644 --- a/docs/query-interaction-latency-design.md +++ b/docs/query-interaction-latency-design.md @@ -1,110 +1,119 @@ # TiProxy Query Interaction Latency Design -## 1. 背景 +## 1. Background -TiProxy 作为 TiDB gateway,原有指标可以看到命令级别的总耗时,但无法直接回答以下问题: +As a TiDB gateway, TiProxy already exposes command-level total duration metrics. However, those metrics cannot directly answer: -- 单次 MySQL 交互(request -> backend first response)的延迟是否异常。 -- 出现慢请求时,是 TiDB 后端慢,还是 TiProxy 自身引入额外时延。 -- backend 数量或地址变化后,metrics label 是否会长期累积占用内存。 +- Whether one MySQL interaction (`request -> first backend response packet`) is slow. +- Whether a latency spike comes from TiDB backend latency or TiProxy overhead. +- Whether backend label cardinality can grow indefinitely when backend addresses change. -为此,本设计新增交互延迟观测、慢交互日志、以及 backend metrics label GC。 +To solve this, we add interaction latency observability, slow interaction logs, and backend metrics label GC. -## 2. 目标与非目标 +## 2. Goals and Non-goals -### 2.1 目标 +### 2.1 Goals -- 提供每次交互的聚合延迟指标,支持按 `backend`、`cmd_type`、`sql_type` 维度分析。 -- 支持慢交互日志阈值动态修改,无需重启 TiProxy。 -- 支持按 MySQL `username` 模式过滤交互指标,降低排障期指标压力。 -- 控制 metrics label 内存增长,支持 TTL 回收。 -- 保持线上稳定:默认关闭交互指标,开关与阈值可热更新。 +- Provide per-interaction latency metrics with dimensions `backend`, `cmd_type`, and `sql_type`. +- Support dynamic updates for slow interaction threshold without restarting TiProxy. +- Support username-pattern filtering for interaction metrics to reduce troubleshooting-time metric pressure. +- Control metrics-label memory growth with TTL-based GC. +- Keep production stable: interaction metrics remain disabled by default; options are dynamically reloadable. -### 2.2 非目标 +### 2.2 Non-goals -- 不提供完整 SQL tracing/span。 -- 不改变已有 `query_duration_seconds` 指标语义。 -- 不对外暴露每一条请求的全量明细存储。 +- Full SQL tracing/span storage. +- Changing semantics of existing `query_duration_seconds`. +- Storing full per-request raw details externally. -## 3. 术语定义 +## 3. Terminology -- Interaction:一轮 MySQL 命令交互,从 TiProxy 把 request 转发到 TiDB 后开始,直到收到 TiDB 的第一个 response packet 结束。 -- Command Duration:现有命令完整处理耗时(已有指标,不变)。 -- Interaction Duration:本次新增的首包响应延迟。 +- Interaction: one MySQL command round from forwarding request to TiDB until receiving the first response packet. +- Command Duration: existing full command processing duration metric. +- Interaction Duration: first-response latency introduced by this feature. -## 4. 配置设计 +## 4. Configuration -新增 `[advance]` 配置项: +New `[advance]` options: - `query-interaction-metrics` (bool) - - 是否开启交互延迟观测。 - - 默认:`false`。 + - Enable interaction latency metrics. + - Default: `false`. - `query-interaction-slow-log-threshold-ms` (int) - - 慢交互日志阈值,单位毫秒。 - - `0` 表示关闭慢日志。 - - 默认:`200`。 + - Slow interaction log threshold in milliseconds. + - `0` disables slow interaction logs. + - Default: `200`. - `query-interaction-user-patterns` (string) - - 交互指标按用户名过滤(glob 模式,逗号分隔,大小写敏感)。 - - 例如:`app_*`, `readonly`, `tenant_??`。 - - 空字符串表示不过滤(采集所有用户)。 - - 默认:`""`。 + - Comma-separated, case-sensitive glob patterns for usernames. + - Examples: `app_*`, `readonly`, `tenant_??`. + - Empty means no filtering (collect all usernames). + - Default: `""`. - `backend-metrics-gc-interval-seconds` (int) - - backend metrics GC 扫描周期。 - - `0` 表示关闭 GC。 - - 默认:`300`。 + - Backend metrics GC sweep interval. + - `0` disables GC. + - Default: `300`. - `backend-metrics-gc-idle-seconds` (int) - - backend idle TTL,超过 TTL 未更新则回收其 metrics labels。 - - `0` 表示关闭 GC。 - - 默认:`3600`。 + - Idle TTL for backend labels; stale labels are removed after TTL. + - `0` disables GC. + - Default: `3600`. -所有配置支持通过 `PUT /api/admin/config` 动态更新。 +All options support dynamic updates through `PUT /api/admin/config`. -## 5. 指标与日志设计 +## 5. Metrics and Logs -### 5.1 新增指标 +### 5.1 New metric - `tiproxy_session_query_interaction_duration_seconds` (HistogramVec) - Labels: `backend`, `cmd_type`, `sql_type` - - Bucket:与 `query_duration_seconds` 对齐。 - - 仅对匹配 `query-interaction-user-patterns` 的连接采集。 - - `sql_type` 取值固定:`select|insert|update|delete|replace|begin|commit|rollback|set|use|other`。 - -### 5.2 慢交互日志 - -当 `interaction_duration >= threshold` 时记录 `Warn` 日志: - -- 固定字段:`interaction_time`, `interaction_duration`, `connection_id`, `cmd`, `sql_type`, `username`, `backend_addr` -- 过滤字段:`username_pattern_matched`, `username_matched_pattern` -- 条件字段: - - `query`:仅 `COM_QUERY`,经过 normalize 并截断 - - `stmt_id`:`COM_STMT_*` 且包体含 statement id 时 - -当慢交互同时命中 username pattern 时,额外输出: + - Buckets aligned with `query_duration_seconds` + - Collected only when username matches `query-interaction-user-patterns` + - `sql_type` is fixed: `select|insert|update|delete|replace|begin|commit|rollback|set|use|other` + +### 5.2 Slow interaction logs + +When `interaction_duration >= threshold`, TiProxy logs `Warn`: + +- Base fields: + - `interaction_time` + - `interaction_duration` + - `connection_id` + - `cmd` + - `sql_type` + - `username` + - `backend_addr` +- Filter fields: + - `username_pattern_matched` + - `username_matched_pattern` +- Conditional fields: + - `query` for `COM_QUERY` (normalized and truncated) + - `stmt_id` for `COM_STMT_*` when statement id exists + +When slow interaction also matches username pattern, TiProxy emits an extra `Warn` log: - `slow mysql interaction matched username pattern` -## 6. 数据路径与埋点位置 +## 6. Data path and instrumentation -埋点位于 `CmdProcessor` 转发路径,按“每一轮交互”采集: +Instrumentation is in the `CmdProcessor` forwarding path, sampled per interaction: -- 单响应命令:收到首包时采集一次。 -- `COM_QUERY`/`COM_STMT_EXECUTE` 多结果:每轮结果单独采集。 -- `COM_QUERY` 额外做轻量首关键字分类,写入固定集合 `sql_type`,未识别归类为 `other`。 -- `LOAD DATA LOCAL INFILE`:包含本地文件阶段后的最终返回轮次。 -- `COM_CHANGE_USER`:包含 auth switch 多轮交互。 -- 无响应命令(如 `COM_QUIT`)不采集。 +- Single-response commands: observed once when first response packet arrives. +- `COM_QUERY`/`COM_STMT_EXECUTE` with multiple results: observed per result round. +- `COM_QUERY`: lightweight first-keyword classification fills fixed `sql_type`; unrecognized SQL is `other`. +- `LOAD DATA LOCAL INFILE`: includes the final response after file transfer phase. +- `COM_CHANGE_USER`: includes multi-round auth switch interactions. +- No-response commands (for example `COM_QUIT`) are not observed. -## 7. Backend Metrics GC 设计 +## 7. Backend metrics GC -在 backend metrics cache 维护 `lastSeen`: +`backend metrics cache` tracks `lastSeen`: -1. 每次 metrics 更新刷新 `lastSeen`。 -2. 到达 GC interval 时触发 sweep。 -3. 若 `now - lastSeen > idleTTL`: - - 删除缓存节点。 - - 调用 Prometheus `DeleteLabelValues` 回收对应 labels。 +1. Refresh `lastSeen` on each metric update. +2. Run sweep when GC interval arrives. +3. If `now - lastSeen > idleTTL`: + - Delete cache entry. + - Call Prometheus `DeleteLabelValues` to remove metric labels. -回收范围: +GC coverage: - `query_total` - `query_duration_seconds` @@ -112,34 +121,34 @@ TiProxy 作为 TiDB gateway,原有指标可以看到命令级别的总耗时 - `handshake_duration_seconds` - traffic counters -## 8. 性能与稳定性考虑 +## 8. Performance and stability -- 默认关闭交互指标,避免默认增量开销。 -- 开启后增量主要是: - - monotonic time 计算 +- Interaction metrics are disabled by default to avoid default overhead. +- Main incremental cost when enabled: + - monotonic time calculation - histogram observe - - 慢日志阈值判断 -- GC 采用“低频 + TTL”策略,避免每次请求都做全量扫描。 -- username 过滤采用预解析 glob 列表匹配,开销与 pattern 数量线性相关。 + - slow-threshold evaluation and logging +- GC uses low-frequency + TTL sweep to avoid per-request full scans. +- Username filter uses pre-parsed glob patterns; cost is linear to pattern count. -建议上线预留资源: +Recommended initial reservation before rollout: - CPU: +15% - Memory: +10% -实际值需结合业务流量压测复核。 +Validate with workload-specific benchmarks. -## 9. 测试策略 +## 9. Testing strategy -- 配置测试:新字段默认值、序列化、负值校验。 -- 热更新测试:`WatchConfig` 下参数动态生效。 -- 指标测试: - - interaction histogram sample count 增长 - - TTL GC 可删除 stale backend labels -- 协议路径回归:`pkg/proxy/backend` 全量测试通过。 +- Config tests: defaults, serialization, invalid value checks. +- Dynamic update tests: runtime updates effective under `WatchConfig`. +- Metrics tests: + - interaction histogram sample count increments + - TTL GC removes stale backend labels +- Protocol-path regression: full `pkg/proxy/backend` tests pass. -## 10. 兼容性 +## 10. Compatibility -- 原有指标与语义保持不变。 -- 新配置均有默认值,升级兼容旧配置文件。 -- 若需完全关闭回收,将 `backend-metrics-gc-interval-seconds=0` 或 `backend-metrics-gc-idle-seconds=0`。 +- Existing metrics and semantics stay unchanged. +- New options have defaults, so upgrades are backward-compatible. +- To fully disable GC, set `backend-metrics-gc-interval-seconds=0` or `backend-metrics-gc-idle-seconds=0`. From 2cc22665496000b31cdedf86b2266419ee99a214 Mon Sep 17 00:00:00 2001 From: Yuqing Bai Date: Sat, 7 Feb 2026 22:04:02 +0800 Subject: [PATCH 06/11] tests: stabilize under proxy env --- pkg/manager/infosync/info_test.go | 15 +++++++++++++ pkg/proxy/backend/backend_conn_mgr_test.go | 25 +++++++++++++++------- pkg/server/api/server_test.go | 6 +++++- 3 files changed, 37 insertions(+), 9 deletions(-) diff --git a/pkg/manager/infosync/info_test.go b/pkg/manager/infosync/info_test.go index 557dae7e..1ecb5b08 100644 --- a/pkg/manager/infosync/info_test.go +++ b/pkg/manager/infosync/info_test.go @@ -27,6 +27,19 @@ import ( "go.uber.org/zap" ) +func disableProxyEnvForTest(t *testing.T) { + // Some environments (dev machines / CI) set HTTP(S)/ALL proxy variables. + // These can interfere with etcd/grpc connections in tests (e.g. redirecting to 127.0.0.1:7890). + for _, k := range []string{ + "HTTP_PROXY", "http_proxy", + "HTTPS_PROXY", "https_proxy", + "ALL_PROXY", "all_proxy", + "NO_PROXY", "no_proxy", + } { + t.Setenv(k, "") + } +} + // TTL is refreshed periodically and info stays the same. func TestTTLRefresh(t *testing.T) { ts := newEtcdTestSuite(t) @@ -244,6 +257,8 @@ type etcdTestSuite struct { } func newEtcdTestSuite(t *testing.T) *etcdTestSuite { + disableProxyEnvForTest(t) + lg, _ := logger.CreateLoggerForTest(t) ts := &etcdTestSuite{ t: t, diff --git a/pkg/proxy/backend/backend_conn_mgr_test.go b/pkg/proxy/backend/backend_conn_mgr_test.go index 13f9212c..fdfbaf04 100644 --- a/pkg/proxy/backend/backend_conn_mgr_test.go +++ b/pkg/proxy/backend/backend_conn_mgr_test.go @@ -899,24 +899,33 @@ func TestGetBackendIO(t *testing.T) { } lg, _ := logger.CreateLoggerForTest(t) mgr := NewBackendConnManager(lg, handler, 0, &BCConfig{ConnectTimeout: time.Second}) - var wg waitgroup.WaitGroup for i := 0; i <= len(listeners); i++ { + idx := i + var wg waitgroup.WaitGroup wg.Run(func() { - if i < len(listeners) { - cn, err := listeners[i].Accept() + if idx >= len(listeners) { + return + } + cn, err := listeners[idx].Accept() + if err != nil { + // It's possible that the listener is closed before Accept returns (scheduler timing). + // Treat close-related errors as expected; other errors should fail the test. + if errors.Is(err, net.ErrClosed) || strings.Contains(err.Error(), "use of closed network connection") { + return + } require.NoError(t, err) - require.NoError(t, cn.Close()) + return } + require.NoError(t, cn.Close()) }) io, err := mgr.getBackendIO(context.Background(), mgr, nil) if err == nil { require.NoError(t, io.Close()) } - message := fmt.Sprintf("%d: %s, %+v\n", i, badAddrs, err) - if i < len(listeners) { - require.NoError(t, err, message) - err = listeners[i].Close() + message := fmt.Sprintf("%d: %s, %+v\n", idx, badAddrs, err) + if idx < len(listeners) { require.NoError(t, err, message) + require.NoError(t, listeners[idx].Close(), message) } else { require.Error(t, err, message) } diff --git a/pkg/server/api/server_test.go b/pkg/server/api/server_test.go index 774c3a1a..98d85939 100644 --- a/pkg/server/api/server_test.go +++ b/pkg/server/api/server_test.go @@ -43,6 +43,10 @@ func createServer(t *testing.T, closing *bool) (*Server, func(t *testing.T, meth }) addr := fmt.Sprintf("http://%s", srv.listener.Addr().String()) + // Avoid using HTTP proxy env vars in tests. The server is always local. + tr := http.DefaultTransport.(*http.Transport).Clone() + tr.Proxy = nil + httpClient := &http.Client{Transport: tr} return srv, func(t *testing.T, method, pa string, rd io.Reader, header map[string]string, f func(*testing.T, *http.Response)) { if pa[0] != '/' { pa = "/" + pa @@ -52,7 +56,7 @@ func createServer(t *testing.T, closing *bool) (*Server, func(t *testing.T, meth for key, value := range header { req.Header.Set(key, value) } - resp, err := http.DefaultClient.Do(req) + resp, err := httpClient.Do(req) require.NoError(t, err) f(t, resp) require.NoError(t, resp.Body.Close()) From 9fa469bea5f7486f796397be385fc8067bc267ff Mon Sep 17 00:00:00 2001 From: Yuqing Bai Date: Sat, 7 Feb 2026 22:04:07 +0800 Subject: [PATCH 07/11] backend,metrics: add sql_digest to slow interaction logs --- README.md | 2 + conf/proxy.toml | 3 + docs/query-interaction-latency-design-zh.md | 6 +- docs/query-interaction-latency-design.md | 6 +- docs/query-interaction-latency-usage.md | 6 +- lib/config/proxy.go | 13 ++-- pkg/metrics/interaction.go | 12 ++++ pkg/metrics/interaction_test.go | 5 ++ pkg/proxy/backend/cmd_processor_exec.go | 14 ++-- pkg/proxy/backend/slow_log_digest_test.go | 74 +++++++++++++++++++++ pkg/proxy/proxy.go | 1 + 11 files changed, 129 insertions(+), 13 deletions(-) create mode 100644 pkg/proxy/backend/slow_log_digest_test.go diff --git a/README.md b/README.md index a46dbbea..93cc661a 100644 --- a/README.md +++ b/README.md @@ -139,6 +139,8 @@ backend-metrics-gc-idle-seconds = 3600 - `0` disables slow interaction logs. - positive values log interactions slower than threshold. - slow logs include `interaction_time`, `connection_id`, `username`, `sql_type`, and username-pattern match fields. +- `query-interaction-slow-log-only-digest`: + - when `true`, slow interaction logs print only `sql_digest` for `COM_QUERY` and omit the normalized `query` text. - `query-interaction-user-patterns`: - comma-separated username glob patterns (`*`, `?`), case-sensitive. - empty value means collecting interaction metrics for all users. diff --git a/conf/proxy.toml b/conf/proxy.toml index a1c793b1..16003a21 100644 --- a/conf/proxy.toml +++ b/conf/proxy.toml @@ -122,6 +122,9 @@ graceful-close-conn-timeout = 15 # 200 => log interactions slower than 200ms # query-interaction-slow-log-threshold-ms = 200 +# only print sql_digest in slow interaction logs, omit normalized query text: +# query-interaction-slow-log-only-digest = false + # optional username glob filter for interaction metrics: # comma-separated patterns, supports `*` and `?`, case-sensitive # empty means collect all users diff --git a/docs/query-interaction-latency-design-zh.md b/docs/query-interaction-latency-design-zh.md index 525713c4..83a9f44f 100644 --- a/docs/query-interaction-latency-design-zh.md +++ b/docs/query-interaction-latency-design-zh.md @@ -43,6 +43,9 @@ TiProxy 作为 TiDB gateway,原有指标可以看到命令级别的总耗时 - 慢交互日志阈值,单位毫秒。 - `0` 表示关闭慢日志。 - 默认:`200`。 +- `query-interaction-slow-log-only-digest` (bool) + - 为 `true` 时,慢交互日志仅输出 `sql_digest`,不输出规范化后的 `query` 文本。 + - 默认:`false`。 - `query-interaction-user-patterns` (string) - 交互指标按用户名过滤(glob 模式,逗号分隔,大小写敏感)。 - 例如:`app_*`, `readonly`, `tenant_??`。 @@ -81,11 +84,12 @@ TiProxy 作为 TiDB gateway,原有指标可以看到命令级别的总耗时 - `sql_type` - `username` - `backend_addr` +- `sql_digest`(仅 `COM_QUERY`) - 过滤字段: - `username_pattern_matched` - `username_matched_pattern` - 条件字段: - - `query`:仅 `COM_QUERY`,经过 normalize 并截断 + - `query`:仅 `COM_QUERY`,经过 normalize 并截断;当 `query-interaction-slow-log-only-digest=true` 时不输出 - `stmt_id`:`COM_STMT_*` 且包体含 statement id 时 当慢交互同时命中 username pattern 时,额外输出: diff --git a/docs/query-interaction-latency-design.md b/docs/query-interaction-latency-design.md index 5efbdffb..021ca74f 100644 --- a/docs/query-interaction-latency-design.md +++ b/docs/query-interaction-latency-design.md @@ -43,6 +43,9 @@ New `[advance]` options: - Slow interaction log threshold in milliseconds. - `0` disables slow interaction logs. - Default: `200`. +- `query-interaction-slow-log-only-digest` (bool) + - When `true`, slow interaction logs print `sql_digest` and omit the normalized `query` text. + - Default: `false`. - `query-interaction-user-patterns` (string) - Comma-separated, case-sensitive glob patterns for usernames. - Examples: `app_*`, `readonly`, `tenant_??`. @@ -81,11 +84,12 @@ When `interaction_duration >= threshold`, TiProxy logs `Warn`: - `sql_type` - `username` - `backend_addr` + - `sql_digest` (only for `COM_QUERY`) - Filter fields: - `username_pattern_matched` - `username_matched_pattern` - Conditional fields: - - `query` for `COM_QUERY` (normalized and truncated) + - `query` for `COM_QUERY` (normalized and truncated), unless `query-interaction-slow-log-only-digest=true` - `stmt_id` for `COM_STMT_*` when statement id exists When slow interaction also matches username pattern, TiProxy emits an extra `Warn` log: diff --git a/docs/query-interaction-latency-usage.md b/docs/query-interaction-latency-usage.md index 1347f889..d3a35b2a 100644 --- a/docs/query-interaction-latency-usage.md +++ b/docs/query-interaction-latency-usage.md @@ -17,6 +17,7 @@ [advance] query-interaction-metrics = true query-interaction-slow-log-threshold-ms = 200 +query-interaction-slow-log-only-digest = false query-interaction-user-patterns = "app_*,readonly" backend-metrics-gc-interval-seconds = 300 backend-metrics-gc-idle-seconds = 3600 @@ -26,6 +27,7 @@ backend-metrics-gc-idle-seconds = 3600 - `query-interaction-metrics=false`:关闭交互延迟指标(默认)。 - `query-interaction-slow-log-threshold-ms=0`:关闭慢交互日志。 +- `query-interaction-slow-log-only-digest=true`:慢交互日志仅输出 `sql_digest`,不输出 `query`。 - `query-interaction-user-patterns=""`:采集所有用户名;设置后仅采集匹配 username 的交互指标。 - `backend-metrics-gc-interval-seconds=0` 或 `backend-metrics-gc-idle-seconds=0`:关闭 metrics GC。 @@ -37,6 +39,7 @@ backend-metrics-gc-idle-seconds = 3600 curl -X PUT http://127.0.0.1:3080/api/admin/config -d ' advance.query-interaction-metrics = true advance.query-interaction-slow-log-threshold-ms = 1000 +advance.query-interaction-slow-log-only-digest = true advance.query-interaction-user-patterns = "app_*" advance.backend-metrics-gc-interval-seconds = 120 advance.backend-metrics-gc-idle-seconds = 1800 @@ -117,7 +120,8 @@ histogram_quantile( - `username_pattern_matched` - `username_matched_pattern` - `backend_addr` -- `query`(仅 `COM_QUERY`) +- `sql_digest`(仅 `COM_QUERY`) +- `query`(仅 `COM_QUERY`;当 `advance.query-interaction-slow-log-only-digest=true` 时不输出) - `stmt_id`(适用 `COM_STMT_*`) 建议: diff --git a/lib/config/proxy.go b/lib/config/proxy.go index f97d3ebd..b73ebbce 100644 --- a/lib/config/proxy.go +++ b/lib/config/proxy.go @@ -72,12 +72,13 @@ type API struct { } type Advance struct { - IgnoreWrongNamespace bool `yaml:"ignore-wrong-namespace,omitempty" toml:"ignore-wrong-namespace,omitempty" json:"ignore-wrong-namespace,omitempty"` - QueryInteractionMetrics bool `yaml:"query-interaction-metrics,omitempty" toml:"query-interaction-metrics,omitempty" json:"query-interaction-metrics,omitempty"` - QueryInteractionSlowLogThreshold int `yaml:"query-interaction-slow-log-threshold-ms,omitempty" toml:"query-interaction-slow-log-threshold-ms,omitempty" json:"query-interaction-slow-log-threshold-ms,omitempty"` - QueryInteractionUserPatterns string `yaml:"query-interaction-user-patterns,omitempty" toml:"query-interaction-user-patterns,omitempty" json:"query-interaction-user-patterns,omitempty"` - BackendMetricsGCInterval int `yaml:"backend-metrics-gc-interval-seconds,omitempty" toml:"backend-metrics-gc-interval-seconds,omitempty" json:"backend-metrics-gc-interval-seconds,omitempty"` - BackendMetricsGCIdle int `yaml:"backend-metrics-gc-idle-seconds,omitempty" toml:"backend-metrics-gc-idle-seconds,omitempty" json:"backend-metrics-gc-idle-seconds,omitempty"` + IgnoreWrongNamespace bool `yaml:"ignore-wrong-namespace,omitempty" toml:"ignore-wrong-namespace,omitempty" json:"ignore-wrong-namespace,omitempty"` + QueryInteractionMetrics bool `yaml:"query-interaction-metrics,omitempty" toml:"query-interaction-metrics,omitempty" json:"query-interaction-metrics,omitempty"` + QueryInteractionSlowLogThreshold int `yaml:"query-interaction-slow-log-threshold-ms,omitempty" toml:"query-interaction-slow-log-threshold-ms,omitempty" json:"query-interaction-slow-log-threshold-ms,omitempty"` + QueryInteractionSlowLogOnlyDigest bool `yaml:"query-interaction-slow-log-only-digest,omitempty" toml:"query-interaction-slow-log-only-digest,omitempty" json:"query-interaction-slow-log-only-digest,omitempty"` + QueryInteractionUserPatterns string `yaml:"query-interaction-user-patterns,omitempty" toml:"query-interaction-user-patterns,omitempty" json:"query-interaction-user-patterns,omitempty"` + BackendMetricsGCInterval int `yaml:"backend-metrics-gc-interval-seconds,omitempty" toml:"backend-metrics-gc-interval-seconds,omitempty" json:"backend-metrics-gc-interval-seconds,omitempty"` + BackendMetricsGCIdle int `yaml:"backend-metrics-gc-idle-seconds,omitempty" toml:"backend-metrics-gc-idle-seconds,omitempty" json:"backend-metrics-gc-idle-seconds,omitempty"` } type LogOnline struct { diff --git a/pkg/metrics/interaction.go b/pkg/metrics/interaction.go index bbc38550..30c2f749 100644 --- a/pkg/metrics/interaction.go +++ b/pkg/metrics/interaction.go @@ -12,6 +12,7 @@ import ( var queryInteractionEnabled atomic.Bool var queryInteractionSlowLogThreshold atomic.Int64 +var queryInteractionSlowLogOnlyDigest atomic.Bool var backendMetricsGCInterval atomic.Int64 var backendMetricsGCIdleTTL atomic.Int64 var queryInteractionUserMatcherPtr atomic.Pointer[queryInteractionUserMatcher] @@ -47,6 +48,17 @@ func QueryInteractionSlowLogThreshold() time.Duration { return time.Duration(queryInteractionSlowLogThreshold.Load()) } +// SetQueryInteractionSlowLogOnlyDigest updates whether slow interaction logs should only print SQL digest +// and omit the normalized query text. +func SetQueryInteractionSlowLogOnlyDigest(onlyDigest bool) { + queryInteractionSlowLogOnlyDigest.Store(onlyDigest) +} + +// QueryInteractionSlowLogOnlyDigest returns whether slow interaction logs should only print SQL digest. +func QueryInteractionSlowLogOnlyDigest() bool { + return queryInteractionSlowLogOnlyDigest.Load() +} + // SetQueryInteractionUserPatterns updates username glob filters for per-interaction metrics. // Comma separates multiple patterns; empty means allowing all users. func SetQueryInteractionUserPatterns(patterns string) { diff --git a/pkg/metrics/interaction_test.go b/pkg/metrics/interaction_test.go index f54a846e..cebd43c0 100644 --- a/pkg/metrics/interaction_test.go +++ b/pkg/metrics/interaction_test.go @@ -22,6 +22,11 @@ func TestInteractionSettings(t *testing.T) { SetQueryInteractionSlowLogThreshold(threshold) require.Equal(t, threshold, QueryInteractionSlowLogThreshold()) + SetQueryInteractionSlowLogOnlyDigest(false) + require.False(t, QueryInteractionSlowLogOnlyDigest()) + SetQueryInteractionSlowLogOnlyDigest(true) + require.True(t, QueryInteractionSlowLogOnlyDigest()) + interval := 123 * time.Second SetBackendMetricsGCInterval(interval) require.Equal(t, interval, BackendMetricsGCInterval()) diff --git a/pkg/proxy/backend/cmd_processor_exec.go b/pkg/proxy/backend/cmd_processor_exec.go index 85ea4715..dd9fa7ae 100644 --- a/pkg/proxy/backend/cmd_processor_exec.go +++ b/pkg/proxy/backend/cmd_processor_exec.go @@ -440,11 +440,17 @@ func (cp *CmdProcessor) observeInteractionByUser(request []byte, backendIO *pnet zap.String("backend_addr", addr), } if cmd == pnet.ComQuery { - query := parser.Normalize(pnet.ParseQueryPacket(request[1:]), "ON") - if len(query) > 256 { - query = query[:256] + sqlText := pnet.ParseQueryPacket(request[1:]) + normalized, digest := parser.NormalizeDigest(sqlText) + if digest != nil { + fields = append(fields, zap.String("sql_digest", digest.String())) + } + if !metrics.QueryInteractionSlowLogOnlyDigest() { + if len(normalized) > 256 { + normalized = normalized[:256] + } + fields = append(fields, zap.String("query", normalized)) } - fields = append(fields, zap.String("query", query)) } if isStmtCmd(cmd) && len(request) >= 5 { fields = append(fields, zap.Uint32("stmt_id", binary.LittleEndian.Uint32(request[1:5]))) diff --git a/pkg/proxy/backend/slow_log_digest_test.go b/pkg/proxy/backend/slow_log_digest_test.go new file mode 100644 index 00000000..18337435 --- /dev/null +++ b/pkg/proxy/backend/slow_log_digest_test.go @@ -0,0 +1,74 @@ +// Copyright 2024 PingCAP, Inc. +// SPDX-License-Identifier: Apache-2.0 + +package backend + +import ( + "strings" + "testing" + "time" + + "github.com/pingcap/tiproxy/pkg/metrics" + pnet "github.com/pingcap/tiproxy/pkg/proxy/net" + "github.com/stretchr/testify/require" +) + +func TestSlowInteractionLogSQLDigestOnly(t *testing.T) { + originEnabled := metrics.QueryInteractionEnabled() + originThreshold := metrics.QueryInteractionSlowLogThreshold() + originOnlyDigest := metrics.QueryInteractionSlowLogOnlyDigest() + defer metrics.SetQueryInteractionEnabled(originEnabled) + defer metrics.SetQueryInteractionSlowLogThreshold(originThreshold) + defer metrics.SetQueryInteractionSlowLogOnlyDigest(originOnlyDigest) + defer metrics.SetQueryInteractionUserPatterns("") + + metrics.SetQueryInteractionEnabled(true) + metrics.SetQueryInteractionSlowLogThreshold(time.Nanosecond) + metrics.SetQueryInteractionSlowLogOnlyDigest(true) + metrics.SetQueryInteractionUserPatterns("") + + tc := newTCPConnSuite(t) + ts, clean := newTestSuite(t, tc, func(cfg *testConfig) { + cfg.clientConfig.cmd = pnet.ComQuery + cfg.clientConfig.sql = "select 1" + cfg.backendConfig.respondType = responseTypeOK + }) + defer clean() + ts.authenticateFirstTime(t, nil) + ts.executeCmd(t, nil) + + logText := ts.mp.text.String() + require.Contains(t, logText, "slow mysql interaction") + require.Contains(t, logText, "sql_digest") + require.False(t, strings.Contains(logText, "select ?"), logText) +} + +func TestSlowInteractionLogSQLDigestAndQuery(t *testing.T) { + originEnabled := metrics.QueryInteractionEnabled() + originThreshold := metrics.QueryInteractionSlowLogThreshold() + originOnlyDigest := metrics.QueryInteractionSlowLogOnlyDigest() + defer metrics.SetQueryInteractionEnabled(originEnabled) + defer metrics.SetQueryInteractionSlowLogThreshold(originThreshold) + defer metrics.SetQueryInteractionSlowLogOnlyDigest(originOnlyDigest) + defer metrics.SetQueryInteractionUserPatterns("") + + metrics.SetQueryInteractionEnabled(true) + metrics.SetQueryInteractionSlowLogThreshold(time.Nanosecond) + metrics.SetQueryInteractionSlowLogOnlyDigest(false) + metrics.SetQueryInteractionUserPatterns("") + + tc := newTCPConnSuite(t) + ts, clean := newTestSuite(t, tc, func(cfg *testConfig) { + cfg.clientConfig.cmd = pnet.ComQuery + cfg.clientConfig.sql = "select 1" + cfg.backendConfig.respondType = responseTypeOK + }) + defer clean() + ts.authenticateFirstTime(t, nil) + ts.executeCmd(t, nil) + + logText := ts.mp.text.String() + require.Contains(t, logText, "slow mysql interaction") + require.Contains(t, logText, "sql_digest") + require.True(t, strings.Contains(logText, "select ?"), logText) +} diff --git a/pkg/proxy/proxy.go b/pkg/proxy/proxy.go index 42773574..91a30b9c 100644 --- a/pkg/proxy/proxy.go +++ b/pkg/proxy/proxy.go @@ -108,6 +108,7 @@ func (s *SQLServer) reset(cfg *config.Config) { metrics.SetQueryInteractionEnabled(cfg.Advance.QueryInteractionMetrics) metrics.SetQueryInteractionSlowLogThreshold(time.Duration(cfg.Advance.QueryInteractionSlowLogThreshold) * time.Millisecond) + metrics.SetQueryInteractionSlowLogOnlyDigest(cfg.Advance.QueryInteractionSlowLogOnlyDigest) metrics.SetQueryInteractionUserPatterns(cfg.Advance.QueryInteractionUserPatterns) metrics.SetBackendMetricsGCInterval(time.Duration(cfg.Advance.BackendMetricsGCInterval) * time.Second) metrics.SetBackendMetricsGCIdleTTL(time.Duration(cfg.Advance.BackendMetricsGCIdle) * time.Second) From f576b6014cf7f8dcc92479dcb4a9af626d4f64f9 Mon Sep 17 00:00:00 2001 From: Yuqing Bai Date: Sat, 7 Feb 2026 23:01:57 +0800 Subject: [PATCH 08/11] cmd: add query interaction metrics flag --- cmd/tiproxy/main.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cmd/tiproxy/main.go b/cmd/tiproxy/main.go index 1a7c4445..cd0db9a4 100644 --- a/cmd/tiproxy/main.go +++ b/cmd/tiproxy/main.go @@ -31,6 +31,9 @@ func main() { rootCmd.PersistentFlags().StringVar(&sctx.ConfigFile, "config", "", "proxy config file path") rootCmd.PersistentFlags().StringVar(&sctx.Overlay.Log.Encoder, "log_encoder", "", "log in format of tidb, console, or json") rootCmd.PersistentFlags().StringVar(&sctx.Overlay.Log.Level, "log_level", "", "log level") + rootCmd.PersistentFlags().BoolVar(&sctx.Overlay.Advance.QueryInteractionMetrics, "query-interaction-metrics", false, "enable query interaction latency metrics (advance.query-interaction-metrics). Note: CLI flag is an overlay and will override config reloads") + // Keep underscore alias for compatibility with existing flag naming style. + rootCmd.PersistentFlags().BoolVar(&sctx.Overlay.Advance.QueryInteractionMetrics, "query_interaction_metrics", false, "alias of --query-interaction-metrics") metrics.MaxProcsGauge.Set(float64(runtime.GOMAXPROCS(0))) From 35ed57ddc1ad60831dcfa86c2d5eec0bbdb5d018 Mon Sep 17 00:00:00 2001 From: Yuqing Bai Date: Sun, 8 Feb 2026 08:09:07 +0800 Subject: [PATCH 09/11] backend: guard sql_digest normalization --- pkg/proxy/backend/cmd_processor_exec.go | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/pkg/proxy/backend/cmd_processor_exec.go b/pkg/proxy/backend/cmd_processor_exec.go index dd9fa7ae..4f234aa2 100644 --- a/pkg/proxy/backend/cmd_processor_exec.go +++ b/pkg/proxy/backend/cmd_processor_exec.go @@ -441,11 +441,12 @@ func (cp *CmdProcessor) observeInteractionByUser(request []byte, backendIO *pnet } if cmd == pnet.ComQuery { sqlText := pnet.ParseQueryPacket(request[1:]) - normalized, digest := parser.NormalizeDigest(sqlText) - if digest != nil { - fields = append(fields, zap.String("sql_digest", digest.String())) + normalized, digestStr, ok := normalizeDigestSafe(sqlText) + fields = append(fields, zap.Bool("sql_digest_ok", ok)) + if digestStr != "" { + fields = append(fields, zap.String("sql_digest", digestStr)) } - if !metrics.QueryInteractionSlowLogOnlyDigest() { + if ok && !metrics.QueryInteractionSlowLogOnlyDigest() { if len(normalized) > 256 { normalized = normalized[:256] } @@ -461,6 +462,22 @@ func (cp *CmdProcessor) observeInteractionByUser(request []byte, backendIO *pnet } } +func normalizeDigestSafe(sqlText string) (normalized string, digestStr string, ok bool) { + ok = true + defer func() { + if r := recover(); r != nil { + ok = false + normalized = "" + digestStr = "" + } + }() + normalized, digest := parser.NormalizeDigest(sqlText) + if digest != nil { + digestStr = digest.String() + } + return normalized, digestStr, ok +} + func isStmtCmd(cmd pnet.Command) bool { switch cmd { case pnet.ComStmtExecute, pnet.ComStmtSendLongData, pnet.ComStmtClose, pnet.ComStmtReset, pnet.ComStmtFetch: From 3ae99d8aa8d55362e6ab7a49d2545458226b4573 Mon Sep 17 00:00:00 2001 From: Yuqing Bai Date: Sun, 8 Feb 2026 08:14:47 +0800 Subject: [PATCH 10/11] backend: guard query normalization --- pkg/proxy/backend/backend_conn_mgr.go | 9 ++++--- pkg/proxy/backend/cmd_processor_exec.go | 5 +++- pkg/proxy/backend/sql_normalize_safe.go | 21 +++++++++++++++ pkg/proxy/backend/sql_normalize_safe_test.go | 28 ++++++++++++++++++++ 4 files changed, 58 insertions(+), 5 deletions(-) create mode 100644 pkg/proxy/backend/sql_normalize_safe.go create mode 100644 pkg/proxy/backend/sql_normalize_safe_test.go diff --git a/pkg/proxy/backend/backend_conn_mgr.go b/pkg/proxy/backend/backend_conn_mgr.go index f6573640..0269aeb9 100644 --- a/pkg/proxy/backend/backend_conn_mgr.go +++ b/pkg/proxy/backend/backend_conn_mgr.go @@ -19,7 +19,6 @@ import ( "github.com/cenkalti/backoff/v4" "github.com/go-mysql-org/go-mysql/mysql" - "github.com/pingcap/tidb/pkg/parser" "github.com/pingcap/tiproxy/lib/config" "github.com/pingcap/tiproxy/lib/util/errors" "github.com/pingcap/tiproxy/lib/util/waitgroup" @@ -326,16 +325,18 @@ func (mgr *BackendConnManager) ExecuteCmd(ctx context.Context, request []byte) ( if err != nil && errors.Is(err, ErrBackendConn) { cmd, data := pnet.Command(request[0]), request[1:] var query string + queryNormalizeOK := true if cmd == pnet.ComQuery { - query = parser.Normalize(pnet.ParseQueryPacket(data), "ON") - if len(query) > 256 { + query, queryNormalizeOK = normalizeSQLSafe(pnet.ParseQueryPacket(data)) + if queryNormalizeOK && len(query) > 256 { query = query[:256] } } // idle_time: maybe the idle time exceeds wait_timeout? // execute_time and query: maybe this query causes TiDB OOM? mgr.logger.Info("backend disconnects", zap.Duration("idle_time", time.Duration(now-mgr.lastActiveTime)), - zap.Duration("execute_time", time.Duration(now-startTime)), zap.Stringer("cmd", cmd), zap.String("query", query)) + zap.Duration("execute_time", time.Duration(now-startTime)), zap.Stringer("cmd", cmd), + zap.Bool("query_normalize_ok", queryNormalizeOK), zap.String("query", query)) } mgr.lastActiveTime = now mgr.processLock.Unlock() diff --git a/pkg/proxy/backend/cmd_processor_exec.go b/pkg/proxy/backend/cmd_processor_exec.go index 4f234aa2..8659595f 100644 --- a/pkg/proxy/backend/cmd_processor_exec.go +++ b/pkg/proxy/backend/cmd_processor_exec.go @@ -394,7 +394,10 @@ func (cp *CmdProcessor) needHoldRequest(request []byte) bool { } func isBeginStmt(query string) bool { - normalized := parser.Normalize(query, "ON") + normalized, ok := normalizeSQLSafe(query) + if !ok { + return false + } return strings.HasPrefix(normalized, "begin") || strings.HasPrefix(normalized, "start transaction") } diff --git a/pkg/proxy/backend/sql_normalize_safe.go b/pkg/proxy/backend/sql_normalize_safe.go new file mode 100644 index 00000000..ec77aa10 --- /dev/null +++ b/pkg/proxy/backend/sql_normalize_safe.go @@ -0,0 +1,21 @@ +// Copyright 2026 PingCAP, Inc. +// SPDX-License-Identifier: Apache-2.0 + +package backend + +import "github.com/pingcap/tidb/pkg/parser" + +var normalizeSQLFn = func(sqlText string) string { + return parser.Normalize(sqlText, "ON") +} + +func normalizeSQLSafe(sqlText string) (normalized string, ok bool) { + ok = true + defer func() { + if r := recover(); r != nil { + ok = false + normalized = "" + } + }() + return normalizeSQLFn(sqlText), ok +} diff --git a/pkg/proxy/backend/sql_normalize_safe_test.go b/pkg/proxy/backend/sql_normalize_safe_test.go new file mode 100644 index 00000000..a59b0ccf --- /dev/null +++ b/pkg/proxy/backend/sql_normalize_safe_test.go @@ -0,0 +1,28 @@ +// Copyright 2026 PingCAP, Inc. +// SPDX-License-Identifier: Apache-2.0 + +package backend + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestNormalizeSQLSafeCatchesPanic(t *testing.T) { + orig := normalizeSQLFn + t.Cleanup(func() { normalizeSQLFn = orig }) + normalizeSQLFn = func(string) string { panic("boom") } + + normalized, ok := normalizeSQLSafe("select 1") + require.False(t, ok) + require.Equal(t, "", normalized) +} + +func TestIsBeginStmtNoPanicOnNormalizePanic(t *testing.T) { + orig := normalizeSQLFn + t.Cleanup(func() { normalizeSQLFn = orig }) + normalizeSQLFn = func(string) string { panic("boom") } + + require.False(t, isBeginStmt("begin")) +} From 7b6a5b8790b7d9a93e02a724399f4080aff7ff2b Mon Sep 17 00:00:00 2001 From: Yuqing Bai Date: Sun, 8 Feb 2026 08:59:50 +0800 Subject: [PATCH 11/11] cmd: accept single-dash long flags --- cmd/tiproxy/main.go | 38 ++++++++++++++++++++++++++++++++++++++ cmd/tiproxy/main_test.go | 26 ++++++++++++++++++++++++++ 2 files changed, 64 insertions(+) create mode 100644 cmd/tiproxy/main_test.go diff --git a/cmd/tiproxy/main.go b/cmd/tiproxy/main.go index cd0db9a4..43f5bc27 100644 --- a/cmd/tiproxy/main.go +++ b/cmd/tiproxy/main.go @@ -7,6 +7,7 @@ import ( "fmt" "os" "runtime" + "strings" "github.com/pingcap/tiproxy/lib/util/cmd" "github.com/pingcap/tiproxy/lib/util/errors" @@ -17,6 +18,41 @@ import ( "github.com/spf13/cobra" ) +func rewriteSingleDashLongFlags(args []string) []string { + // Cobra/pflag uses: + // - short flags: -v + // - long flags: --config=/path or --config /path + // + // Some users may accidentally pass "-config" / "-query-interaction-metrics" (single dash). + // pflag treats that as a shorthand bundle and errors. We rewrite a small allow-list to be lenient. + allow := map[string]string{ + "config": "config", + "log_encoder": "log_encoder", + "log_level": "log_level", + "query-interaction-metrics": "query-interaction-metrics", + "query_interaction_metrics": "query-interaction-metrics", + } + out := make([]string, 0, len(args)) + for _, a := range args { + if len(a) < 3 || !strings.HasPrefix(a, "-") || strings.HasPrefix(a, "--") { + out = append(out, a) + continue + } + name := a[1:] + suffix := "" + if i := strings.IndexByte(name, '='); i >= 0 { + suffix = name[i:] + name = name[:i] + } + if canonical, ok := allow[name]; ok { + out = append(out, "--"+canonical+suffix) + continue + } + out = append(out, a) + } + return out +} + func main() { rootCmd := &cobra.Command{ Use: os.Args[0], @@ -35,6 +71,8 @@ func main() { // Keep underscore alias for compatibility with existing flag naming style. rootCmd.PersistentFlags().BoolVar(&sctx.Overlay.Advance.QueryInteractionMetrics, "query_interaction_metrics", false, "alias of --query-interaction-metrics") + rootCmd.SetArgs(rewriteSingleDashLongFlags(os.Args[1:])) + metrics.MaxProcsGauge.Set(float64(runtime.GOMAXPROCS(0))) rootCmd.RunE = func(cmd *cobra.Command, _ []string) error { diff --git a/cmd/tiproxy/main_test.go b/cmd/tiproxy/main_test.go new file mode 100644 index 00000000..ffbaa05f --- /dev/null +++ b/cmd/tiproxy/main_test.go @@ -0,0 +1,26 @@ +// Copyright 2026 PingCAP, Inc. +// SPDX-License-Identifier: Apache-2.0 + +package main + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestRewriteSingleDashLongFlags(t *testing.T) { + require.Equal(t, + []string{"--query-interaction-metrics", "--config", "a.toml"}, + rewriteSingleDashLongFlags([]string{"-query-interaction-metrics", "-config", "a.toml"}), + ) + require.Equal(t, + []string{"--query-interaction-metrics=true"}, + rewriteSingleDashLongFlags([]string{"-query_interaction_metrics=true"}), + ) + // Unknown flags and real short flags should be left untouched. + require.Equal(t, + []string{"-v", "-x", "--log_level", "info"}, + rewriteSingleDashLongFlags([]string{"-v", "-x", "--log_level", "info"}), + ) +}