diff --git a/env.example b/env.example
index db7a4f2..797b97c 100644
--- a/env.example
+++ b/env.example
@@ -27,9 +27,12 @@ LLM_REQUESTS_PER_MINUTE=800          # Maximum requests per minute (default 800)
 LLM_TOKENS_PER_MINUTE=40000          # Maximum tokens per minute (input + output, default 40000)
 
 # LLM_MAX_ASYNC=8                    # [Optional, expert mode] Global concurrency
-# # If not set, the system auto-calculates: min(RPM, TPM / 3500)
+# # If not set, the system auto-calculates: min(RPM, TPM / avg_tokens)
 # # Recommended: leave this unset and let the system auto-calculate to stay within the TPM/RPM limits
-# # Example: min(800, 40000/3500) = min(800, 11) = 11 concurrent
+
+# LLM_AVG_TOKENS_PER_REQUEST=3500    # [Optional] Average tokens per request (used for concurrency calculation)
+# # Default 3500 (conservative value across Insert: 2840 and Query: 3000-5000)
+# # Tune per workload: 2500 for insert-heavy scenarios, 4000 for query-heavy scenarios
 
 # ====== Embedding Configuration ======
 # Vectorizes text to support semantic retrieval
@@ -54,9 +57,13 @@ EMBEDDING_REQUESTS_PER_MINUTE=1600   # Maximum requests per minute (default 1600)
 EMBEDDING_TOKENS_PER_MINUTE=400000   # Maximum tokens per minute (default 400000)
 
 # EMBEDDING_MAX_ASYNC=32             # [Optional, expert mode] Global concurrency
-# # If not set, the system auto-calculates: min(RPM, TPM / 500)
+# # If not set, the system auto-calculates: min(RPM, TPM / avg_tokens)
 # # Recommended: leave this unset and let the system auto-calculate
-# # Example: min(1600, 400000/500) = min(1600, 800) = 800 concurrent
+
+# EMBEDDING_AVG_TOKENS_PER_REQUEST=20000 # [Optional] Average tokens per request (used for concurrency calculation)
+# # Default 20000 (observed ~17181 for large-batch document embedding)
+# # For small-document batches, set 5000-10000 to raise concurrency
+
 EMBEDDING_TIMEOUT=30                 # HTTP request timeout (seconds, default 30)
 
 # ====== Rerank Configuration (reranking model) ======
@@ -75,9 +82,12 @@ RERANK_REQUESTS_PER_MINUTE=1600      # Maximum requests per minute (default 1600)
 RERANK_TOKENS_PER_MINUTE=400000      # Maximum tokens per minute (default 400000)
 
 # RERANK_MAX_ASYNC=16                # [Optional, expert mode] Global concurrency
-# # If not set, the system auto-calculates: min(RPM, TPM / 500)
+# # If not set, the system auto-calculates: min(RPM, TPM / avg_tokens)
 # # Recommended: leave this unset and let the system auto-calculate
-# # Example: min(1600, 400000/500) = min(1600, 800) = 800 concurrent
+
+# RERANK_AVG_TOKENS_PER_REQUEST=500  # [Optional] Average tokens per request (used for concurrency calculation)
+# # Default 500 (document scoring average)
+
 RERANK_TIMEOUT=30                    # HTTP request timeout (seconds, default 30)
 
 # ====== MinerU Configuration ======
@@ -140,7 +150,10 @@ DEEPSEEK_OCR_DPI=200 # PDF-to-image DPI (150 = hallucination risk, 200 = stable)
 # DeepSeek-OCR is an LLM-type API and follows the same limit rules as LLM
 DS_OCR_REQUESTS_PER_MINUTE=800       # Maximum requests per minute (default 800)
 DS_OCR_TOKENS_PER_MINUTE=40000       # Maximum tokens per minute (default 40000)
-# DS_OCR_MAX_ASYNC=8                 # [Optional] Global default concurrency (hardcoded default of 8 when unset)
+# DS_OCR_MAX_ASYNC=8                 # [Optional] Global default concurrency (auto-calculated when unset)
+
+# DS_OCR_AVG_TOKENS_PER_REQUEST=3500 # [Optional] Average tokens per request (used for concurrency calculation)
+# # Default 3500 (similar to LLM: OCR + description generation)
 
 # ====== Smart Parser Selector Configuration (v2.0) ======
 # Automatically selects the optimal parser and mode based on document complexity
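A quick sanity check of the auto-calculation these comments describe. This is a reviewer's sketch of the documented formula, not the project's implementation (the real logic is `calculate_optimal_concurrent` in `src/rate_limiter.py`); the `max_in_flight=30` cap comes from the `get_rate_limiter` changes later in this patch:

```python
# Sketch: re-derive the documented defaults from min(RPM, TPM / avg_tokens, max_in_flight).
def estimate_concurrent(rpm: int, tpm: int, avg_tokens: int, max_in_flight: int = 30) -> int:
    return max(1, min(rpm, tpm // avg_tokens, max_in_flight))

print(estimate_concurrent(800, 40000, 3500))     # LLM defaults: min(800, 11, 30) -> 11
print(estimate_concurrent(1600, 400000, 20000))  # Embedding defaults: min(1600, 20, 30) -> 20
print(estimate_concurrent(1600, 400000, 500))    # Rerank defaults: min(1600, 800, 30) -> 30 (capped)
```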
diff --git a/src/config.py b/src/config.py
index 0d1f8f5..5414fb8 100644
--- a/src/config.py
+++ b/src/config.py
@@ -29,6 +29,7 @@ class LLMConfig(BaseSettings):
     requests_per_minute: int = Field(default=800, description="Maximum requests per minute")
     tokens_per_minute: int = Field(default=40000, description="Maximum tokens per minute (input + output)")
     max_async: Optional[int] = Field(default=None, description="Maximum concurrent requests (optional, auto-calculated if not set)")
+    avg_tokens_per_request: int = Field(default=3500, description="Average tokens per request for concurrency calculation (Insert: 2840, Query: 3000-5000)")
 
     class Config:
         env_prefix = "LLM_"
@@ -56,6 +57,7 @@ class EmbeddingConfig(BaseSettings):
     requests_per_minute: int = Field(default=1600, description="Maximum requests per minute")
     tokens_per_minute: int = Field(default=400000, description="Maximum tokens per minute")
     max_async: Optional[int] = Field(default=None, description="Maximum concurrent requests (optional, auto-calculated if not set)")
+    avg_tokens_per_request: int = Field(default=20000, description="Average tokens per request for concurrency calculation (Large batch: observed 17181 tokens/request)")
     timeout: int = Field(default=30, description="HTTP request timeout (seconds)")
 
     class Config:
@@ -80,6 +82,7 @@ class RerankConfig(BaseSettings):
     requests_per_minute: int = Field(default=1600, description="Maximum requests per minute")
     tokens_per_minute: int = Field(default=400000, description="Maximum tokens per minute")
     max_async: Optional[int] = Field(default=None, description="Maximum concurrent requests (optional, auto-calculated if not set)")
+    avg_tokens_per_request: int = Field(default=500, description="Average tokens per request for concurrency calculation (Document scoring average)")
     timeout: int = Field(default=30, description="HTTP request timeout (seconds)")
 
     class Config:
@@ -148,6 +151,7 @@ class DeepSeekOCRConfig(BaseSettings):
     requests_per_minute: int = Field(default=800, description="Maximum requests per minute")
     tokens_per_minute: int = Field(default=40000, description="Maximum tokens per minute")
     max_async: Optional[int] = Field(default=None, description="Maximum concurrent requests (optional, auto-calculated if not set)")
+    avg_tokens_per_request: int = Field(default=3500, description="Average tokens per request for concurrency calculation (Similar to LLM: OCR + description)")
 
     class Config:
         env_prefix = "DS_OCR_"
diff --git a/src/deepseek_ocr_client.py b/src/deepseek_ocr_client.py
index 1cfcc76..d2cf379 100644
--- a/src/deepseek_ocr_client.py
+++ b/src/deepseek_ocr_client.py
@@ -80,7 +80,8 @@ def __init__(self, config: Optional[DeepSeekOCRConfig] = None):
             service="ds_ocr",
             max_concurrent=getattr(self.config, 'max_async', config.ds_ocr.max_async),
             requests_per_minute=getattr(self.config, 'requests_per_minute', config.ds_ocr.requests_per_minute),
-            tokens_per_minute=getattr(self.config, 'tokens_per_minute', config.ds_ocr.tokens_per_minute)
+            tokens_per_minute=getattr(self.config, 'tokens_per_minute', config.ds_ocr.tokens_per_minute),
+            avg_tokens_per_request=getattr(self.config, 'avg_tokens_per_request', config.ds_ocr.avg_tokens_per_request)
         )
 
         logger.info(f"DeepSeek-OCR Client initialized: {self.config.base_url} (with rate limiting)")
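Because each settings class declares an `env_prefix`, the new field is picked up from the environment with no extra wiring. A minimal standalone sketch of that mechanism, assuming the repo's pydantic v1-style `BaseSettings` (the class and env var below mirror the real ones but are simplified):

```python
import os
from pydantic import BaseSettings, Field  # pydantic v1-style, matching src/config.py

class LLMConfig(BaseSettings):
    avg_tokens_per_request: int = Field(default=3500)

    class Config:
        env_prefix = "LLM_"  # LLM_AVG_TOKENS_PER_REQUEST maps onto avg_tokens_per_request

os.environ["LLM_AVG_TOKENS_PER_REQUEST"] = "2500"  # e.g. an insert-heavy deployment
print(LLMConfig().avg_tokens_per_request)  # 2500: the env var overrides the default
```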
diff --git a/src/multi_tenant.py b/src/multi_tenant.py
index d4cb0b1..d5173b6 100644
--- a/src/multi_tenant.py
+++ b/src/multi_tenant.py
@@ -94,13 +94,15 @@ def _create_llm_func(self, llm_config: Dict):
         requests_per_minute = llm_config.get("requests_per_minute", config.llm.requests_per_minute)
         tokens_per_minute = llm_config.get("tokens_per_minute", config.llm.tokens_per_minute)
         max_concurrent = llm_config.get("max_async", None)  # RateLimiter concurrency (optional)
+        avg_tokens_per_request = llm_config.get("avg_tokens_per_request", config.llm.avg_tokens_per_request)
 
         # Create the rate limiter (max_concurrent is auto-calculated unless explicitly provided)
         rate_limiter = get_rate_limiter(
             service="llm",
             max_concurrent=max_concurrent,  # Tenant's RateLimiter setting
             requests_per_minute=requests_per_minute,
-            tokens_per_minute=tokens_per_minute
+            tokens_per_minute=tokens_per_minute,
+            avg_tokens_per_request=avg_tokens_per_request
         )
 
         # Get the concurrency the rate_limiter actually uses (passed on to LightRAG)
@@ -159,12 +161,14 @@ def _create_embedding_func(self, embedding_config: Dict):
         requests_per_minute = embedding_config.get("requests_per_minute", config.embedding.requests_per_minute)
         tokens_per_minute = embedding_config.get("tokens_per_minute", config.embedding.tokens_per_minute)
         max_concurrent = embedding_config.get("max_async", config.embedding.max_async)
+        avg_tokens_per_request = embedding_config.get("avg_tokens_per_request", config.embedding.avg_tokens_per_request)
 
         rate_limiter = get_rate_limiter(
             service="embedding",
             max_concurrent=max_concurrent,
             requests_per_minute=requests_per_minute,
-            tokens_per_minute=tokens_per_minute
+            tokens_per_minute=tokens_per_minute,
+            avg_tokens_per_request=avg_tokens_per_request
         )
 
         # Get the concurrency the rate_limiter actually uses (passed on to LightRAG)
@@ -224,12 +228,14 @@ def _create_rerank_func(self, rerank_config: Dict):
         requests_per_minute = rerank_config.get("requests_per_minute", config.rerank.requests_per_minute)
         tokens_per_minute = rerank_config.get("tokens_per_minute", config.rerank.tokens_per_minute)
         max_concurrent = rerank_config.get("max_async", config.rerank.max_async)
+        avg_tokens_per_request = rerank_config.get("avg_tokens_per_request", config.rerank.avg_tokens_per_request)
 
         rate_limiter = get_rate_limiter(
             service="rerank",
             max_concurrent=max_concurrent,
             requests_per_minute=requests_per_minute,
-            tokens_per_minute=tokens_per_minute
+            tokens_per_minute=tokens_per_minute,
+            avg_tokens_per_request=avg_tokens_per_request
         )
 
         def rerank_func_with_rate_limit(query, documents, top_n=None, **kwargs):
@@ -287,12 +293,14 @@ def _create_vision_model_func(self, llm_config: Dict):
         requests_per_minute = llm_config.get("requests_per_minute", config.llm.requests_per_minute)
         tokens_per_minute = llm_config.get("tokens_per_minute", config.llm.tokens_per_minute)
         max_concurrent = llm_config.get("max_async", config.llm.max_async)
+        avg_tokens_per_request = llm_config.get("avg_tokens_per_request", config.llm.avg_tokens_per_request)
 
         rate_limiter = get_rate_limiter(
             service="llm",  # VLM shares the LLM rate limits
             max_concurrent=max_concurrent,
             requests_per_minute=requests_per_minute,
-            tokens_per_minute=tokens_per_minute
+            tokens_per_minute=tokens_per_minute,
+            avg_tokens_per_request=avg_tokens_per_request
         )
 
         async def seed_vision_model_func(prompt: str, image_data: str, system_prompt: str) -> str:
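All four factory methods follow the same lookup chain: the tenant dict value wins if present, otherwise the global config default applies, and the result is forwarded to `get_rate_limiter`. A condensed sketch of that chain (the dict and default below are illustrative stand-ins):

```python
GLOBAL_DEFAULT = 3500  # stands in for config.llm.avg_tokens_per_request

def resolve_avg_tokens(llm_config: dict) -> int:
    # Tenant-level value wins when present; otherwise fall back to the global default.
    return llm_config.get("avg_tokens_per_request", GLOBAL_DEFAULT)

print(resolve_avg_tokens({"avg_tokens_per_request": 2500}))  # 2500 (tenant override)
print(resolve_avg_tokens({}))                                # 3500 (global default)
```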
diff --git a/src/rate_limiter.py b/src/rate_limiter.py
index aa926e9..9b0e8f7 100644
--- a/src/rate_limiter.py
+++ b/src/rate_limiter.py
@@ -379,7 +379,8 @@ def get_rate_limiter(
     service: str,
     max_concurrent: Optional[int] = None,
     requests_per_minute: Optional[int] = None,
-    tokens_per_minute: Optional[int] = None
+    tokens_per_minute: Optional[int] = None,
+    avg_tokens_per_request: Optional[int] = None
 ) -> AsyncSemaphoreWithRateLimit:
     """
     Get or create a rate limiter for a specific service.
@@ -394,16 +395,18 @@ def get_rate_limiter(
     Auto-calculation Formula:
         concurrent = min(RPM, TPM / avg_tokens_per_request)
 
-    Average Token Estimation:
+    Average Token Estimation (configurable per tenant/service):
         - LLM: 3500 tokens/request (insert + query scenarios)
-        - Embedding: 500 tokens/request (batch encoding)
+        - Embedding: 20000 tokens/request (large batch encoding)
         - Rerank: 500 tokens/request (document scoring)
+        - DS_OCR: 3500 tokens/request (OCR + description)
 
     Args:
         service: Service name (e.g., "llm", "embedding", "rerank")
         max_concurrent: Override max concurrent (tenant RateLimiter config)
         requests_per_minute: Override RPM limit (tenant config)
         tokens_per_minute: Override TPM limit (tenant config)
+        avg_tokens_per_request: Override average tokens per request (tenant config)
 
     Returns:
         AsyncSemaphoreWithRateLimit instance
@@ -415,28 +418,39 @@ def get_rate_limiter(
     # Import config for global defaults
     from src.config import config
 
-    # Token estimation per service (based on production observation)
-    avg_tokens_map = {
-        "llm": 3500,        # Insert: 2840, Query: 3000-5000, Conservative: 3500
-        "embedding": 20000, # Large batch: observed 17181 tokens/request (2.4MB doc)
-        "rerank": 500,      # Document scoring average
-        "ds_ocr": 3500      # Similar to LLM (OCR + description)
+    # Service-specific default configurations (fallback values)
+    SERVICE_DEFAULTS = {
+        "llm": {"rpm": 800, "tpm": 40000, "avg_tokens": 3500},
+        "embedding": {"rpm": 1600, "tpm": 400000, "avg_tokens": 20000},
+        "rerank": {"rpm": 1600, "tpm": 400000, "avg_tokens": 500},
+        "ds_ocr": {"rpm": 800, "tpm": 40000, "avg_tokens": 3500},
     }
 
-    # Default RPM/TPM values (used if not provided)
-    default_rpm_tpm = {
-        "llm": (800, 40000),
-        "embedding": (1600, 400000),
-        "rerank": (1600, 400000),
-        "ds_ocr": (800, 40000)
-    }
-
-    default_rpm, default_tpm = default_rpm_tpm.get(service, (1000, 50000))
-    avg_tokens = avg_tokens_map.get(service, 3500)
-
-    # Get effective RPM/TPM (tenant config > global config)
-    effective_rpm = requests_per_minute or default_rpm
-    effective_tpm = tokens_per_minute or default_tpm
+    # Get service-specific config object and defaults
+    defaults = SERVICE_DEFAULTS.get(service, {"rpm": 1000, "tpm": 50000, "avg_tokens": 3500})
+
+    if service == "llm":
+        service_config = config.llm
+    elif service == "embedding":
+        service_config = config.embedding
+    elif service == "rerank":
+        service_config = config.rerank
+    elif service == "ds_ocr":
+        service_config = config.ds_ocr if hasattr(config, 'ds_ocr') else None
+    else:
+        service_config = None
+
+    # Get effective values with priority: param > config > default
+    if service_config:
+        effective_rpm = requests_per_minute or getattr(service_config, 'requests_per_minute', defaults["rpm"])
+        effective_tpm = tokens_per_minute or getattr(service_config, 'tokens_per_minute', defaults["tpm"])
+        effective_avg_tokens = avg_tokens_per_request or getattr(service_config, 'avg_tokens_per_request', defaults["avg_tokens"])
+        env_max_async = getattr(service_config, 'max_async', None)
+    else:
+        effective_rpm = requests_per_minute or defaults["rpm"]
+        effective_tpm = tokens_per_minute or defaults["tpm"]
+        effective_avg_tokens = avg_tokens_per_request or defaults["avg_tokens"]
+        env_max_async = None
 
     # Determine final concurrent value
     final_concurrent = None
@@ -448,28 +462,16 @@ def get_rate_limiter(
         config_source = "tenant"
 
     # Priority 2: Environment variable (expert mode)
-    if final_concurrent is None:
-        if service == "llm":
-            env_max_async = getattr(config.llm, 'max_async', None)
-        elif service == "embedding":
-            env_max_async = getattr(config.embedding, 'max_async', None)
-        elif service == "rerank":
-            env_max_async = getattr(config.rerank, 'max_async', None)
-        elif service == "ds_ocr":
-            env_max_async = getattr(config.ds_ocr, 'max_async', None) if hasattr(config, 'ds_ocr') else None
-        else:
-            env_max_async = None
-
-        if env_max_async is not None:
-            final_concurrent = env_max_async
-            config_source = "env"
+    if final_concurrent is None and env_max_async is not None:
+        final_concurrent = env_max_async
+        config_source = "env"
 
     # Priority 3: Auto-calculate (default behavior)
     if final_concurrent is None:
         final_concurrent = calculate_optimal_concurrent(
             requests_per_minute=effective_rpm,
             tokens_per_minute=effective_tpm,
-            avg_tokens_per_request=avg_tokens,
+            avg_tokens_per_request=effective_avg_tokens,
             max_in_flight=30  # Conservative: max 30 requests in-flight
         )
         config_source = "auto"
@@ -478,7 +480,7 @@ def get_rate_limiter(
     if final_concurrent < 2:
         logger.warning(
             f"[{service.upper()}] Calculated concurrent < 2 "
-            f"(RPM={effective_rpm}, TPM={effective_tpm}, avg_tokens={avg_tokens}, max_in_flight=30). "
+            f"(RPM={effective_rpm}, TPM={effective_tpm}, avg_tokens={effective_avg_tokens}, max_in_flight=30). "
            f"Setting to 2. Consider increasing TPM/RPM limits."
         )
         final_concurrent = 2
@@ -495,7 +497,7 @@ def get_rate_limiter(
     if config_source == "auto":
         logger.info(
             f"Created rate limiter for {service.upper()}: "
-            f"concurrent={final_concurrent} (auto: TPM={effective_tpm} / avg_tokens={avg_tokens} / max_in_flight=30), "
+            f"concurrent={final_concurrent} (auto: TPM={effective_tpm} / avg_tokens={effective_avg_tokens} / max_in_flight=30), "
             f"RPM={effective_rpm}, TPM={effective_tpm}"
         )
     else:
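The refactor reduces the concurrency decision to three priorities: explicit tenant setting, then the `*_MAX_ASYNC` environment variable, then auto-calculation. A self-contained sketch of that resolution order, approximating `calculate_optimal_concurrent` with the documented formula and the patch's safety floor of 2:

```python
from typing import Optional

def resolve_concurrent(tenant_max: Optional[int], env_max_async: Optional[int],
                       rpm: int, tpm: int, avg_tokens: int) -> tuple:
    if tenant_max is not None:          # Priority 1: explicit tenant config
        return tenant_max, "tenant"
    if env_max_async is not None:       # Priority 2: env var (expert mode)
        return env_max_async, "env"
    auto = min(rpm, tpm // avg_tokens, 30)  # Priority 3: auto (30 = max_in_flight)
    return max(auto, 2), "auto"             # safety floor of 2, as in the patch

print(resolve_concurrent(None, None, 800, 40000, 3500))  # (11, 'auto')
print(resolve_concurrent(None, 8, 800, 40000, 3500))     # (8, 'env')
print(resolve_concurrent(16, 8, 800, 40000, 3500))       # (16, 'tenant')
```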
"embedding": - env_max_async = getattr(config.embedding, 'max_async', None) - elif service == "rerank": - env_max_async = getattr(config.rerank, 'max_async', None) - elif service == "ds_ocr": - env_max_async = getattr(config.ds_ocr, 'max_async', None) if hasattr(config, 'ds_ocr') else None - else: - env_max_async = None - - if env_max_async is not None: - final_concurrent = env_max_async - config_source = "env" + if final_concurrent is None and env_max_async is not None: + final_concurrent = env_max_async + config_source = "env" # Priority 3: Auto-calculate (default behavior) if final_concurrent is None: final_concurrent = calculate_optimal_concurrent( requests_per_minute=effective_rpm, tokens_per_minute=effective_tpm, - avg_tokens_per_request=avg_tokens, + avg_tokens_per_request=effective_avg_tokens, max_in_flight=30 # Conservative: max 30 requests in-flight ) config_source = "auto" @@ -478,7 +480,7 @@ def get_rate_limiter( if final_concurrent < 2: logger.warning( f"[{service.upper()}] Calculated concurrent < 2 " - f"(RPM={effective_rpm}, TPM={effective_tpm}, avg_tokens={avg_tokens}, max_in_flight=30). " + f"(RPM={effective_rpm}, TPM={effective_tpm}, avg_tokens={effective_avg_tokens}, max_in_flight=30). " f"Setting to 2. Consider increasing TPM/RPM limits." ) final_concurrent = 2 @@ -495,7 +497,7 @@ def get_rate_limiter( if config_source == "auto": logger.info( f"Created rate limiter for {service.upper()}: " - f"concurrent={final_concurrent} (auto: TPM={effective_tpm} / avg_tokens={avg_tokens} / max_in_flight=30), " + f"concurrent={final_concurrent} (auto: TPM={effective_tpm} / avg_tokens={effective_avg_tokens} / max_in_flight=30), " f"RPM={effective_rpm}, TPM={effective_tpm}" ) else: diff --git a/src/tenant_config.py b/src/tenant_config.py index 23a3f2f..68dc037 100644 --- a/src/tenant_config.py +++ b/src/tenant_config.py @@ -309,6 +309,7 @@ def _merge_llm_config(self, tenant_config: Optional[TenantConfigModel]) -> Dict[ # 速率限制配置 "requests_per_minute": config.llm.requests_per_minute, "tokens_per_minute": config.llm.tokens_per_minute, + "avg_tokens_per_request": config.llm.avg_tokens_per_request, # 用于并发计算的平均 token 数 } if tenant_config and tenant_config.llm_config: @@ -330,6 +331,7 @@ def _merge_embedding_config(self, tenant_config: Optional[TenantConfigModel]) -> "requests_per_minute": config.embedding.requests_per_minute, "tokens_per_minute": config.embedding.tokens_per_minute, "max_async": config.embedding.max_async, # RateLimiter 的并发数(可选) + "avg_tokens_per_request": config.embedding.avg_tokens_per_request, # 用于并发计算的平均 token 数 "timeout": config.embedding.timeout, } @@ -351,6 +353,7 @@ def _merge_rerank_config(self, tenant_config: Optional[TenantConfigModel]) -> Di "requests_per_minute": config.rerank.requests_per_minute, "tokens_per_minute": config.rerank.tokens_per_minute, "max_async": config.rerank.max_async, # RateLimiter 的并发数(可选) + "avg_tokens_per_request": config.rerank.avg_tokens_per_request, # 用于并发计算的平均 token 数 "timeout": config.rerank.timeout, } @@ -376,6 +379,7 @@ def _merge_ds_ocr_config(self, tenant_config: Optional[TenantConfigModel]) -> Di "requests_per_minute": config.ds_ocr.requests_per_minute, "tokens_per_minute": config.ds_ocr.tokens_per_minute, "max_async": config.ds_ocr.max_async, + "avg_tokens_per_request": config.ds_ocr.avg_tokens_per_request, # 用于并发计算的平均 token 数 } if tenant_config and tenant_config.ds_ocr_config: