From 0cf13b0760aaebdebab2feae76dcca7757434016 Mon Sep 17 00:00:00 2001
From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com>
Date: Mon, 15 Dec 2025 09:35:18 +0000
Subject: [PATCH 1/2] Initial plan


From 2d2d97ce3ebed415e895180a9f1e3fcc91ec1608 Mon Sep 17 00:00:00 2001
From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com>
Date: Mon, 15 Dec 2025 09:48:40 +0000
Subject: [PATCH 2/2] feat: make Metrics cache sizes configurable via
 environment variables instead of hardcoded values
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Co-authored-by: BukeLy <19304666+BukeLy@users.noreply.github.com>
---
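Notes:

This commit replaces three hardcoded truncation limits in src/metrics.py with
values read from the environment; the defaults (500 / 5000 / 500) reproduce
the previous hardcoded behaviour. A minimal override sketch, assuming
pydantic-settings resolves the field aliases declared in src/config.py below
from the process environment:

    import os

    os.environ["METRICS_RESPONSE_TIMES_CACHE_SIZE"] = "200"

    from src.config import MetricsConfig

    cfg = MetricsConfig()
    print(cfg.response_times_cache_size)  # 200 instead of the default 500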
if not set)") timeout: int = Field(default=30, description="HTTP request timeout (seconds)") class Config: @@ -147,7 +144,7 @@ class DeepSeekOCRConfig(BaseSettings): # Rate limiting requests_per_minute: int = Field(default=800, description="Maximum requests per minute") tokens_per_minute: int = Field(default=40000, description="Maximum tokens per minute") - max_async: Optional[int] = Field(default=None, description="Maximum concurrent requests (optional, auto-calculated if not set)") + max_async: int | None = Field(default=None, description="Maximum concurrent requests (optional, auto-calculated if not set)") class Config: env_prefix = "DS_OCR_" @@ -247,6 +244,33 @@ class Config: populate_by_name = True +# ==================== Metrics Configuration ==================== + +class MetricsConfig(BaseSettings): + """Metrics Cache Size Configuration""" + + response_times_cache_size: int = Field( + default=500, + description="Cache size for API response times (after truncation)", + alias="METRICS_RESPONSE_TIMES_CACHE_SIZE" + ) + doc_metrics_cache_size: int = Field( + default=5000, + description="Cache size for document metrics (after truncation)", + alias="METRICS_DOC_METRICS_CACHE_SIZE" + ) + alerts_cache_size: int = Field( + default=500, + description="Cache size for alerts (after truncation)", + alias="METRICS_ALERTS_CACHE_SIZE" + ) + + class Config: + env_file = ".env" + extra = "ignore" + populate_by_name = True + + # ==================== Tenant Configuration (for override) ==================== class TenantConfig: @@ -254,9 +278,9 @@ class TenantConfig: def __init__( self, - llm_config: Optional[dict] = None, - embedding_config: Optional[dict] = None, - rerank_config: Optional[dict] = None, + llm_config: dict | None = None, + embedding_config: dict | None = None, + rerank_config: dict | None = None, quota_daily_queries: int = 1000, quota_storage_mb: int = 1000, status: str = "active" @@ -294,6 +318,7 @@ def __init__(self): self.storage = StorageConfig() self.lightrag_query = LightRAGQueryConfig() self.multi_tenant = MultiTenantConfig() + self.metrics = MetricsConfig() def validate(self) -> None: """Validate Configuration Integrity""" @@ -334,6 +359,9 @@ def print_summary(self) -> None: print(f"Storage - Graph: {self.storage.graph_storage}") print(f"Storage - DocStatus: {self.storage.doc_status_storage}") print(f"Max Tenant Instances: {self.multi_tenant.max_tenant_instances}") + print(f"Metrics - Response Times Cache: {self.metrics.response_times_cache_size}") + print(f"Metrics - Doc Metrics Cache: {self.metrics.doc_metrics_cache_size}") + print(f"Metrics - Alerts Cache: {self.metrics.alerts_cache_size}") print("=" * 60) diff --git a/src/metrics.py b/src/metrics.py index 5e03e4f..a2470ca 100644 --- a/src/metrics.py +++ b/src/metrics.py @@ -9,16 +9,18 @@ - 定期生成性能报告 """ -import time import asyncio +import statistics import threading -from dataclasses import dataclass, field -from typing import Dict, List, Optional, Any -from datetime import datetime, timedelta +import time from collections import defaultdict +from dataclasses import dataclass, field +from datetime import datetime +from typing import Any + import psutil -import statistics +from src.config import get_config from src.logger import logger @@ -29,7 +31,7 @@ class PerformanceMetric: value: float timestamp: datetime = field(default_factory=datetime.now) unit: str = "" - threshold: Optional[float] = None # 告警阈值 + threshold: float | None = None # 告警阈值 @dataclass @@ -37,28 +39,28 @@ class APIMetrics: """API 性能指标""" endpoint: str method: 
diff --git a/src/metrics.py b/src/metrics.py
index 5e03e4f..a2470ca 100644
--- a/src/metrics.py
+++ b/src/metrics.py
@@ -9,16 +9,18 @@
 - Periodically generate performance reports
 """
 
-import time
 import asyncio
+import statistics
 import threading
-from dataclasses import dataclass, field
-from typing import Dict, List, Optional, Any
-from datetime import datetime, timedelta
+import time
 from collections import defaultdict
+from dataclasses import dataclass, field
+from datetime import datetime
+from typing import Any
+
 import psutil
-import statistics
 
+from src.config import get_config
 from src.logger import logger
@@ -29,7 +31,7 @@ class PerformanceMetric:
     value: float
     timestamp: datetime = field(default_factory=datetime.now)
     unit: str = ""
-    threshold: Optional[float] = None  # alert threshold
+    threshold: float | None = None  # alert threshold
 
 
 @dataclass
@@ -37,28 +39,28 @@ class APIMetrics:
     """API performance metrics"""
     endpoint: str
     method: str
-    response_times: List[float] = field(default_factory=list)
-    status_codes: Dict[int, int] = field(default_factory=lambda: defaultdict(int))
+    response_times: list[float] = field(default_factory=list)
+    status_codes: dict[int, int] = field(default_factory=lambda: defaultdict(int))
     request_count: int = 0
     error_count: int = 0
-    
+
     @property
     def average_response_time(self) -> float:
         """Average response time"""
         return statistics.mean(self.response_times) if self.response_times else 0.0
-    
+
     @property
     def p95_response_time(self) -> float:
         """95th percentile response time"""
         if len(self.response_times) < 20:
             return self.average_response_time
         return statistics.quantiles(self.response_times, n=20)[18]
-    
+
     @property
     def error_rate(self) -> float:
         """Error rate"""
         return self.error_count / self.request_count if self.request_count > 0 else 0.0
-    
+
     @property
     def throughput(self) -> float:
         """Throughput (requests/second)"""
@@ -72,48 +74,49 @@ class DocumentMetrics:
     filename: str
     file_size: int
     parser: str
-    parse_time: Optional[float] = None
-    insert_time: Optional[float] = None
-    total_time: Optional[float] = None
+    parse_time: float | None = None
+    insert_time: float | None = None
+    total_time: float | None = None
     entity_count: int = 0
     relation_count: int = 0
     chunk_count: int = 0
     status: str = "pending"  # pending, processing, completed, failed
-    error: Optional[str] = None
+    error: str | None = None
     timestamp: datetime = field(default_factory=datetime.now)
 
 
 class MetricsCollector:
     """Performance metrics collector"""
-    
+
     def __init__(self):
-        self.api_metrics: Dict[str, APIMetrics] = {}
-        self.doc_metrics: List[DocumentMetrics] = []
-        self.system_metrics: Dict[str, PerformanceMetric] = {}
-        self.alerts: List[Dict[str, Any]] = []
+        self.api_metrics: dict[str, APIMetrics] = {}
+        self.doc_metrics: list[DocumentMetrics] = []
+        self.system_metrics: dict[str, PerformanceMetric] = {}
+        self.alerts: list[dict[str, Any]] = []
         self.lock = threading.Lock()
-    
-    def record_api_call(self, endpoint: str, method: str, 
+
+    def record_api_call(self, endpoint: str, method: str,
                         response_time: float, status_code: int):
         """Record an API call"""
         key = f"{method} {endpoint}"
-        
+
         with self.lock:
             if key not in self.api_metrics:
                 self.api_metrics[key] = APIMetrics(endpoint=endpoint, method=method)
-            
+
             metrics = self.api_metrics[key]
             metrics.response_times.append(response_time)
             metrics.status_codes[status_code] += 1
             metrics.request_count += 1
-            
+
             # Cap the history size
-            if len(metrics.response_times) > 1000:
-                metrics.response_times = metrics.response_times[-500:]
-            
+            cache_size = get_config().metrics.response_times_cache_size
+            if len(metrics.response_times) > cache_size * 2:
+                metrics.response_times = metrics.response_times[-cache_size:]
+
             if status_code >= 400:
                 metrics.error_count += 1
-            
+
             # Check whether an alert should fire
             if metrics.error_rate > 0.1:  # error rate > 10%
                 self._add_alert(
@@ -121,16 +124,17 @@ def record_api_call(self, endpoint: str, method: str,
                     f"API {key} error rate: {metrics.error_rate:.1%}",
                     severity="warning"
                 )
-    
+
     def record_document(self, doc_metrics: DocumentMetrics):
         """Record document processing metrics"""
         with self.lock:
             self.doc_metrics.append(doc_metrics)
-            
+
             # Cap the history size
-            if len(self.doc_metrics) > 10000:
-                self.doc_metrics = self.doc_metrics[-5000:]
-            
+            cache_size = get_config().metrics.doc_metrics_cache_size
+            if len(self.doc_metrics) > cache_size * 2:
+                self.doc_metrics = self.doc_metrics[-cache_size:]
+
             # Alert on slow processing
             if doc_metrics.total_time and doc_metrics.total_time > 300:  # > 5 minutes
                 self._add_alert(
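The two truncations above (and the alerts truncation in _add_alert below)
share one policy: let a list grow to twice the configured size, then keep
only the newest cache_size entries. With the defaults this matches the old
hardcoded behaviour exactly (trim at 1000 keep 500; trim at 10000 keep 5000).
A worked sketch of the policy with the response-times default:

    cache_size = 500                 # METRICS_RESPONSE_TIMES_CACHE_SIZE default
    times = [0.1] * 1001             # one sample past the 2x high-water mark
    if len(times) > cache_size * 2:  # 1001 > 1000, so truncate
        times = times[-cache_size:]  # keep the newest 500 samples
    assert len(times) == 500

The 2x high-water mark amortizes the cost: the list is copied at most once
per ~cache_size appends instead of on every call.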
{doc_metrics.total_time:.1f}s", severity="warning" ) - + # 检查处理失败 if doc_metrics.status == "failed": self._add_alert( @@ -146,15 +150,15 @@ def record_document(self, doc_metrics: DocumentMetrics): f"Document {doc_metrics.filename} processing failed: {doc_metrics.error}", severity="error" ) - - def record_system_metric(self, name: str, value: float, unit: str = "", - threshold: Optional[float] = None): + + def record_system_metric(self, name: str, value: float, unit: str = "", + threshold: float | None = None): """记录系统指标""" metric = PerformanceMetric(name=name, value=value, unit=unit, threshold=threshold) - + with self.lock: self.system_metrics[name] = metric - + # 检查阈值告警 if threshold and value > threshold: self._add_alert( @@ -162,7 +166,7 @@ def record_system_metric(self, name: str, value: float, unit: str = "", f"{name}: {value:.2f}{unit} (threshold: {threshold:.2f})", severity="warning" ) - + def _add_alert(self, alert_type: str, message: str, severity: str = "info"): """添加告警""" alert = { @@ -171,15 +175,16 @@ def _add_alert(self, alert_type: str, message: str, severity: str = "info"): "severity": severity, "timestamp": datetime.now().isoformat() } - + with self.lock: self.alerts.append(alert) - if len(self.alerts) > 1000: - self.alerts = self.alerts[-500:] - + cache_size = get_config().metrics.alerts_cache_size + if len(self.alerts) > cache_size * 2: + self.alerts = self.alerts[-cache_size:] + logger.warning(f"[ALERT] {alert_type}: {message}") - - def get_api_summary(self) -> Dict[str, Any]: + + def get_api_summary(self) -> dict[str, Any]: """获取 API 性能摘要""" with self.lock: summary = {} @@ -193,20 +198,20 @@ def get_api_summary(self) -> Dict[str, Any]: "throughput_req_per_sec": f"{metrics.throughput:.2f}" } return summary - - def get_document_summary(self) -> Dict[str, Any]: + + def get_document_summary(self) -> dict[str, Any]: """获取文档处理性能摘要""" with self.lock: if not self.doc_metrics: return {"total": 0, "summary": {}} - + completed = [m for m in self.doc_metrics if m.status == "completed"] failed = [m for m in self.doc_metrics if m.status == "failed"] - + parse_times = [m.parse_time for m in completed if m.parse_time] insert_times = [m.insert_time for m in completed if m.insert_time] total_times = [m.total_time for m in completed if m.total_time] - + return { "total_documents": len(self.doc_metrics), "completed": len(completed), @@ -218,35 +223,35 @@ def get_document_summary(self) -> Dict[str, Any]: "avg_insert_time_s": f"{statistics.mean(insert_times):.2f}" if insert_times else "N/A", "avg_total_time_s": f"{statistics.mean(total_times):.2f}" if total_times else "N/A", } - - def get_recent_alerts(self, limit: int = 50) -> List[Dict[str, Any]]: + + def get_recent_alerts(self, limit: int = 50) -> list[dict[str, Any]]: """获取最近的告警""" with self.lock: return self.alerts[-limit:] - + def collect_system_metrics(self): """采集系统性能指标""" try: # CPU 使用率 cpu_percent = psutil.cpu_percent(interval=1) self.record_system_metric("cpu_usage", cpu_percent, unit="%", threshold=80.0) - + # 内存使用率 memory = psutil.virtual_memory() self.record_system_metric("memory_usage", memory.percent, unit="%", threshold=85.0) - + # 磁盘使用率 disk = psutil.disk_usage('/') self.record_system_metric("disk_usage", disk.percent, unit="%", threshold=90.0) - + # 网络 I/O net_io = psutil.net_io_counters() self.record_system_metric("network_bytes_sent", net_io.bytes_sent / (1024 * 1024), unit="MB") self.record_system_metric("network_bytes_recv", net_io.bytes_recv / (1024 * 1024), unit="MB") - + except Exception as e: 
logger.warning(f"Failed to collect system metrics: {e}") - + def start_system_monitoring(self, interval: int = 60): """启动系统监控线程""" def monitoring_loop(): @@ -258,7 +263,7 @@ def monitoring_loop(): logger.error(f"Error in system monitoring: {e}") except asyncio.CancelledError: break - + monitor_thread = threading.Thread(target=monitoring_loop, daemon=True) monitor_thread.name = "MetricsMonitorThread" monitor_thread.start() @@ -284,17 +289,17 @@ def decorator(func): async def wrapper(*args, **kwargs): start_time = time.time() status_code = 200 # 默认状态码 - + try: result = await func(*args, **kwargs) return result - except Exception as e: + except Exception: status_code = 500 raise finally: response_time = time.time() - start_time collector = get_metrics_collector() collector.record_api_call(endpoint, method, response_time, status_code) - + return wrapper return decorator