27 changes: 20 additions & 7 deletions env.example
@@ -27,9 +27,12 @@ LLM_REQUESTS_PER_MINUTE=800 # Maximum requests per minute (default 800)
LLM_TOKENS_PER_MINUTE=40000 # Maximum tokens per minute (input + output, default 40000)

# LLM_MAX_ASYNC=8 # [Optional, expert mode] Global concurrency
# # When unset, auto-calculated: min(RPM, TPM / 3500)
# # When unset, auto-calculated: min(RPM, TPM / avg_tokens)
# # Recommended: leave unset so auto-calculation keeps usage within the TPM/RPM limits
# # Worked example: min(800, 40000/3500) = min(800, 11) = 11 concurrent

# LLM_AVG_TOKENS_PER_REQUEST=3500 # [Optional] Average tokens per request (used for concurrency calculation)
# # Default 3500 (a conservative value given Insert: 2840, Query: 3000-5000)
# # Adjust per workload: 2500 for insert-heavy scenarios, 4000 for query-heavy scenarios

# ====== Embedding Configuration ======
# Vectorizes text to support semantic retrieval
@@ -54,9 +57,13 @@ EMBEDDING_REQUESTS_PER_MINUTE=1600 # Maximum requests per minute (default 1600)
EMBEDDING_TOKENS_PER_MINUTE=400000 # Maximum tokens per minute (default 400000)

# EMBEDDING_MAX_ASYNC=32 # [Optional, expert mode] Global concurrency
# # When unset, auto-calculated: min(RPM, TPM / 500)
# # When unset, auto-calculated: min(RPM, TPM / avg_tokens)
# # Recommended: leave unset and let the system auto-calculate
# # Worked example: min(1600, 400000/500) = min(1600, 800) = 800 concurrent

# EMBEDDING_AVG_TOKENS_PER_REQUEST=20000 # [Optional] Average tokens per request (used for concurrency calculation)
# # Default 20000 (observed ~17181 for large-batch document embedding)
# # For small-document batches, set 5000-10000 to raise concurrency

EMBEDDING_TIMEOUT=30 # HTTP request timeout (seconds, default 30)

# ====== Rerank Configuration (reranking model) ======
@@ -75,9 +82,12 @@ RERANK_REQUESTS_PER_MINUTE=1600 # Maximum requests per minute (default 1600)
RERANK_TOKENS_PER_MINUTE=400000 # Maximum tokens per minute (default 400000)

# RERANK_MAX_ASYNC=16 # [Optional, expert mode] Global concurrency
# # When unset, auto-calculated: min(RPM, TPM / 500)
# # When unset, auto-calculated: min(RPM, TPM / avg_tokens)
# # Recommended: leave unset and let the system auto-calculate
# # Worked example: min(1600, 400000/500) = min(1600, 800) = 800 concurrent

# RERANK_AVG_TOKENS_PER_REQUEST=500 # [Optional] Average tokens per request (used for concurrency calculation)
# # Default 500 (average for document scoring)

RERANK_TIMEOUT=30 # HTTP request timeout (seconds, default 30)

# ====== MinerU Configuration ======
@@ -140,7 +150,10 @@ DEEPSEEK_OCR_DPI=200 # PDF-to-image DPI (150 = may hallucinate, 200 = stable
# DeepSeek-OCR is an LLM-type API and follows the same limit rules as the LLM
DS_OCR_REQUESTS_PER_MINUTE=800 # Maximum requests per minute (default 800)
DS_OCR_TOKENS_PER_MINUTE=40000 # Maximum tokens per minute (default 40000)
# DS_OCR_MAX_ASYNC=8 # [Optional] Global default concurrency (falls back to a hardcoded default of 8 when unset)
# DS_OCR_MAX_ASYNC=8 # [Optional] Global default concurrency (auto-calculated when unset)

# DS_OCR_AVG_TOKENS_PER_REQUEST=3500 # [Optional] Average tokens per request (used for concurrency calculation)
# # Default 3500 (similar to the LLM: OCR + description generation)

# ====== Smart Parser Selector Configuration (v2.0) ======
# Automatically selects the optimal Parser and mode based on document complexity
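To make the documented auto-calculation concrete, here is a small sketch reproducing the min(RPM, TPM / avg_tokens) formula quoted in the comments above; it illustrates the documented behavior and is not code from this repository.

# Sketch: the concurrency estimate documented in env.example.
def estimate_concurrency(rpm: int, tpm: int, avg_tokens: int) -> int:
    """Estimate a safe concurrency ceiling from rate limits."""
    return max(1, min(rpm, tpm // avg_tokens))

# With the defaults documented above:
print(estimate_concurrency(800, 40_000, 3_500))      # LLM:       min(800, 11)   -> 11
print(estimate_concurrency(1_600, 400_000, 20_000))  # Embedding: min(1600, 20)  -> 20
print(estimate_concurrency(1_600, 400_000, 500))     # Rerank:    min(1600, 800) -> 800

Note how raising the embedding average from 500 to the new default of 20000 drops the estimated embedding concurrency from 800 to 20; that sensitivity is what the new *_AVG_TOKENS_PER_REQUEST variables let you control.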
4 changes: 4 additions & 0 deletions src/config.py
@@ -29,6 +29,7 @@ class LLMConfig(BaseSettings):
requests_per_minute: int = Field(default=800, description="Maximum requests per minute")
tokens_per_minute: int = Field(default=40000, description="Maximum tokens per minute (input + output)")
max_async: Optional[int] = Field(default=None, description="Maximum concurrent requests (optional, auto-calculated if not set)")
avg_tokens_per_request: int = Field(default=3500, description="Average tokens per request for concurrency calculation (Insert: 2840, Query: 3000-5000)")

class Config:
env_prefix = "LLM_"
@@ -56,6 +57,7 @@ class EmbeddingConfig(BaseSettings):
requests_per_minute: int = Field(default=1600, description="Maximum requests per minute")
tokens_per_minute: int = Field(default=400000, description="Maximum tokens per minute")
max_async: Optional[int] = Field(default=None, description="Maximum concurrent requests (optional, auto-calculated if not set)")
avg_tokens_per_request: int = Field(default=20000, description="Average tokens per request for concurrency calculation (Large batch: observed 17181 tokens/request)")
timeout: int = Field(default=30, description="HTTP request timeout (seconds)")

class Config:
@@ -80,6 +82,7 @@ class RerankConfig(BaseSettings):
requests_per_minute: int = Field(default=1600, description="Maximum requests per minute")
tokens_per_minute: int = Field(default=400000, description="Maximum tokens per minute")
max_async: Optional[int] = Field(default=None, description="Maximum concurrent requests (optional, auto-calculated if not set)")
avg_tokens_per_request: int = Field(default=500, description="Average tokens per request for concurrency calculation (Document scoring average)")
timeout: int = Field(default=30, description="HTTP request timeout (seconds)")

class Config:
@@ -148,6 +151,7 @@ class DeepSeekOCRConfig(BaseSettings):
requests_per_minute: int = Field(default=800, description="Maximum requests per minute")
tokens_per_minute: int = Field(default=40000, description="Maximum tokens per minute")
max_async: Optional[int] = Field(default=None, description="Maximum concurrent requests (optional, auto-calculated if not set)")
avg_tokens_per_request: int = Field(default=3500, description="Average tokens per request for concurrency calculation (Similar to LLM: OCR + description)")

class Config:
env_prefix = "DS_OCR_"
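The env_prefix in each nested Config class is what ties these fields to the variables in env.example. Below is a minimal sketch of the pattern, assuming pydantic v1-style BaseSettings to match the class Config idiom in the diff; the project's actual imports are not shown here.

from typing import Optional
from pydantic import BaseSettings, Field  # v1-style import, matching the class Config pattern above

class LLMConfig(BaseSettings):
    requests_per_minute: int = Field(default=800)
    tokens_per_minute: int = Field(default=40000)
    max_async: Optional[int] = Field(default=None)
    avg_tokens_per_request: int = Field(default=3500)

    class Config:
        env_prefix = "LLM_"  # LLM_AVG_TOKENS_PER_REQUEST=2500 -> avg_tokens_per_request == 2500

config = LLMConfig()  # reads prefixed environment variables at construction time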
3 changes: 2 additions & 1 deletion src/deepseek_ocr_client.py
@@ -80,7 +80,8 @@ def __init__(self, config: Optional[DSSeekConfig] = None):
service="ds_ocr",
max_concurrent=getattr(self.config, 'max_async', config.ds_ocr.max_async),
requests_per_minute=getattr(self.config, 'requests_per_minute', config.ds_ocr.requests_per_minute),
tokens_per_minute=getattr(self.config, 'tokens_per_minute', config.ds_ocr.tokens_per_minute)
tokens_per_minute=getattr(self.config, 'tokens_per_minute', config.ds_ocr.tokens_per_minute),
avg_tokens_per_request=getattr(self.config, 'avg_tokens_per_request', config.ds_ocr.avg_tokens_per_request)
Comment on lines -83 to +84
Owner:
ds_ocr.avg_tokens_per_request
ds_ocr.tokens_per_minute
What is the purpose of these two parameters? Are they for estimation when TPM/RPM are not set? Also, I recall I introduced the Transformers tokenizer to count tokens, so why are these two parameters still needed? @copilot

Contributor Author:
These two parameters serve different purposes:

  1. tokens_per_minute - the API provider's TPM limit (e.g., SiliconFlow's 40000). The RateLimiter uses it to cap token consumption per minute and prevent 429 errors.

  2. avg_tokens_per_request - used at startup to estimate a safe concurrency level. calculate_optimal_concurrent() computes: concurrent = TPM / avg_tokens / max_in_flight

Although count_tokens() uses tiktoken to measure each request's actual tokens precisely (deducting from the TPM quota at runtime), avg_tokens_per_request estimates at initialization how many concurrent requests should be allowed. These are two distinct uses:

  • Runtime: count_tokens() measures actual consumption precisely
  • Initialization: avg_tokens_per_request estimates the concurrency ceiling

Average tokens differ widely across scenarios (Insert vs Query), so this estimate is now configurable to tune concurrency.

)

logger.info(f"DeepSeek-OCR Client initialized: {self.config.base_url} (with rate limiting)")
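calculate_optimal_concurrent() itself is not part of this diff, and the two descriptions of it differ slightly (the reply above says TPM / avg_tokens / max_in_flight, while the docstring below says min(RPM, TPM / avg_tokens) with a 30-request in-flight cap). The following is a hedged reconstruction that treats max_in_flight as a hard cap, which reproduces the worked examples in env.example; the real implementation may differ.

def calculate_optimal_concurrent(
    requests_per_minute: int,
    tokens_per_minute: int,
    avg_tokens_per_request: int,
    max_in_flight: int = 30,
) -> int:
    # Hypothetical reconstruction; the actual function is not shown in this PR.
    by_rpm = requests_per_minute
    by_tpm = tokens_per_minute // max(1, avg_tokens_per_request)
    return min(by_rpm, by_tpm, max_in_flight)

# LLM defaults: min(800, 40000 // 3500, 30) = min(800, 11, 30) = 11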
16 changes: 12 additions & 4 deletions src/multi_tenant.py
@@ -94,13 +94,15 @@ def _create_llm_func(self, llm_config: Dict):
requests_per_minute = llm_config.get("requests_per_minute", config.llm.requests_per_minute)
tokens_per_minute = llm_config.get("tokens_per_minute", config.llm.tokens_per_minute)
max_concurrent = llm_config.get("max_async", None)  # RateLimiter concurrency (optional)
avg_tokens_per_request = llm_config.get("avg_tokens_per_request", config.llm.avg_tokens_per_request)

# Create the rate limiter (max_concurrent is auto-calculated unless explicitly provided)
rate_limiter = get_rate_limiter(
service="llm",
max_concurrent=max_concurrent,  # tenant's RateLimiter config
requests_per_minute=requests_per_minute,
tokens_per_minute=tokens_per_minute
tokens_per_minute=tokens_per_minute,
avg_tokens_per_request=avg_tokens_per_request
)

# Get the concurrency the rate_limiter actually uses (it will be passed to LightRAG)
@@ -159,12 +161,14 @@ def _create_embedding_func(self, embedding_config: Dict):
requests_per_minute = embedding_config.get("requests_per_minute", config.embedding.requests_per_minute)
tokens_per_minute = embedding_config.get("tokens_per_minute", config.embedding.tokens_per_minute)
max_concurrent = embedding_config.get("max_async", config.embedding.max_async)
avg_tokens_per_request = embedding_config.get("avg_tokens_per_request", config.embedding.avg_tokens_per_request)

rate_limiter = get_rate_limiter(
service="embedding",
max_concurrent=max_concurrent,
requests_per_minute=requests_per_minute,
tokens_per_minute=tokens_per_minute
tokens_per_minute=tokens_per_minute,
avg_tokens_per_request=avg_tokens_per_request
)

# Get the concurrency the rate_limiter actually uses (it will be passed to LightRAG)
@@ -224,12 +228,14 @@ def _create_rerank_func(self, rerank_config: Dict):
requests_per_minute = rerank_config.get("requests_per_minute", config.rerank.requests_per_minute)
tokens_per_minute = rerank_config.get("tokens_per_minute", config.rerank.tokens_per_minute)
max_concurrent = rerank_config.get("max_async", config.rerank.max_async)
avg_tokens_per_request = rerank_config.get("avg_tokens_per_request", config.rerank.avg_tokens_per_request)

rate_limiter = get_rate_limiter(
service="rerank",
max_concurrent=max_concurrent,
requests_per_minute=requests_per_minute,
tokens_per_minute=tokens_per_minute
tokens_per_minute=tokens_per_minute,
avg_tokens_per_request=avg_tokens_per_request
)

def rerank_func_with_rate_limit(query, documents, top_n=None, **kwargs):
@@ -287,12 +293,14 @@ def _create_vision_model_func(self, llm_config: Dict):
requests_per_minute = llm_config.get("requests_per_minute", config.llm.requests_per_minute)
tokens_per_minute = llm_config.get("tokens_per_minute", config.llm.tokens_per_minute)
max_concurrent = llm_config.get("max_async", config.llm.max_async)
avg_tokens_per_request = llm_config.get("avg_tokens_per_request", config.llm.avg_tokens_per_request)

rate_limiter = get_rate_limiter(
service="llm", # VLM 共享 LLM 的速率限制
max_concurrent=max_concurrent,
requests_per_minute=requests_per_minute,
tokens_per_minute=tokens_per_minute
tokens_per_minute=tokens_per_minute,
avg_tokens_per_request=avg_tokens_per_request
)

async def seed_vision_model_func(prompt: str, image_data: str, system_prompt: str) -> str:
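All four factory methods in this file follow the same pattern: read the per-tenant override, fall back to the global config, and hand the result to get_rate_limiter. A sketch of a hypothetical tenant override follows; the dict keys mirror the .get() calls above, but the values are illustrative.

from src.rate_limiter import get_rate_limiter

# Hypothetical tenant override for a query-heavy workload: a larger average
# token estimate makes the auto-calculated concurrency more conservative.
tenant_llm_config = {
    "requests_per_minute": 800,
    "tokens_per_minute": 40000,
    "avg_tokens_per_request": 4000,  # illustrative value for query-heavy tenants
    # "max_async" omitted -> the limiter auto-calculates concurrency
}

rate_limiter = get_rate_limiter(
    service="llm",
    max_concurrent=tenant_llm_config.get("max_async"),
    requests_per_minute=tenant_llm_config["requests_per_minute"],
    tokens_per_minute=tenant_llm_config["tokens_per_minute"],
    avg_tokens_per_request=tenant_llm_config["avg_tokens_per_request"],
)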
84 changes: 43 additions & 41 deletions src/rate_limiter.py
@@ -379,7 +379,8 @@ def get_rate_limiter(
service: str,
max_concurrent: Optional[int] = None,
requests_per_minute: Optional[int] = None,
tokens_per_minute: Optional[int] = None
tokens_per_minute: Optional[int] = None,
avg_tokens_per_request: Optional[int] = None
) -> AsyncSemaphoreWithRateLimit:
"""
Get or create a rate limiter for a specific service.
@@ -394,16 +395,18 @@
Auto-calculation Formula:
concurrent = min(RPM, TPM / avg_tokens_per_request)

Average Token Estimation:
Average Token Estimation (configurable per tenant/service):
- LLM: 3500 tokens/request (insert + query scenarios)
- Embedding: 500 tokens/request (batch encoding)
- Embedding: 20000 tokens/request (large batch encoding)
- Rerank: 500 tokens/request (document scoring)
- DS_OCR: 3500 tokens/request (OCR + description)

Args:
service: Service name (e.g., "llm", "embedding", "rerank")
max_concurrent: Override max concurrent (tenant RateLimiter config)
requests_per_minute: Override RPM limit (tenant config)
tokens_per_minute: Override TPM limit (tenant config)
avg_tokens_per_request: Override average tokens per request (tenant config)

Returns:
AsyncSemaphoreWithRateLimit instance
@@ -415,28 +418,39 @@
# Import config for global defaults
from src.config import config

# Token estimation per service (based on production observation)
avg_tokens_map = {
"llm": 3500, # Insert: 2840, Query: 3000-5000, Conservative: 3500
"embedding": 20000, # Large batch: observed 17181 tokens/request (2.4MB doc)
"rerank": 500, # Document scoring average
"ds_ocr": 3500 # Similar to LLM (OCR + description)
# Service-specific default configurations (fallback values)
SERVICE_DEFAULTS = {
"llm": {"rpm": 800, "tpm": 40000, "avg_tokens": 3500},
"embedding": {"rpm": 1600, "tpm": 400000, "avg_tokens": 20000},
"rerank": {"rpm": 1600, "tpm": 400000, "avg_tokens": 500},
"ds_ocr": {"rpm": 800, "tpm": 40000, "avg_tokens": 3500},
}

# Default RPM/TPM values (used if not provided)
default_rpm_tpm = {
"llm": (800, 40000),
"embedding": (1600, 400000),
"rerank": (1600, 400000),
"ds_ocr": (800, 40000)
}

default_rpm, default_tpm = default_rpm_tpm.get(service, (1000, 50000))
avg_tokens = avg_tokens_map.get(service, 3500)

# Get effective RPM/TPM (tenant config > global config)
effective_rpm = requests_per_minute or default_rpm
effective_tpm = tokens_per_minute or default_tpm
# Get service-specific config object and defaults
defaults = SERVICE_DEFAULTS.get(service, {"rpm": 1000, "tpm": 50000, "avg_tokens": 3500})

if service == "llm":
service_config = config.llm
elif service == "embedding":
service_config = config.embedding
elif service == "rerank":
service_config = config.rerank
elif service == "ds_ocr":
service_config = config.ds_ocr if hasattr(config, 'ds_ocr') else None
else:
service_config = None

# Get effective values with priority: param > config > default
if service_config:
effective_rpm = requests_per_minute or getattr(service_config, 'requests_per_minute', defaults["rpm"])
effective_tpm = tokens_per_minute or getattr(service_config, 'tokens_per_minute', defaults["tpm"])
effective_avg_tokens = avg_tokens_per_request or getattr(service_config, 'avg_tokens_per_request', defaults["avg_tokens"])
env_max_async = getattr(service_config, 'max_async', None)
else:
effective_rpm = requests_per_minute or defaults["rpm"]
effective_tpm = tokens_per_minute or defaults["tpm"]
effective_avg_tokens = avg_tokens_per_request or defaults["avg_tokens"]
env_max_async = None

# Determine final concurrent value
final_concurrent = None
@@ -448,28 +462,16 @@ def get_rate_limiter(
config_source = "tenant"

# Priority 2: Environment variable (expert mode)
if final_concurrent is None:
if service == "llm":
env_max_async = getattr(config.llm, 'max_async', None)
elif service == "embedding":
env_max_async = getattr(config.embedding, 'max_async', None)
elif service == "rerank":
env_max_async = getattr(config.rerank, 'max_async', None)
elif service == "ds_ocr":
env_max_async = getattr(config.ds_ocr, 'max_async', None) if hasattr(config, 'ds_ocr') else None
else:
env_max_async = None

if env_max_async is not None:
final_concurrent = env_max_async
config_source = "env"
if final_concurrent is None and env_max_async is not None:
final_concurrent = env_max_async
config_source = "env"

# Priority 3: Auto-calculate (default behavior)
if final_concurrent is None:
final_concurrent = calculate_optimal_concurrent(
requests_per_minute=effective_rpm,
tokens_per_minute=effective_tpm,
avg_tokens_per_request=avg_tokens,
avg_tokens_per_request=effective_avg_tokens,
max_in_flight=30 # Conservative: max 30 requests in-flight
)
config_source = "auto"
@@ -478,7 +480,7 @@
if final_concurrent < 2:
logger.warning(
f"[{service.upper()}] Calculated concurrent < 2 "
f"(RPM={effective_rpm}, TPM={effective_tpm}, avg_tokens={avg_tokens}, max_in_flight=30). "
f"(RPM={effective_rpm}, TPM={effective_tpm}, avg_tokens={effective_avg_tokens}, max_in_flight=30). "
f"Setting to 2. Consider increasing TPM/RPM limits."
)
final_concurrent = 2
@@ -495,7 +497,7 @@
if config_source == "auto":
logger.info(
f"Created rate limiter for {service.upper()}: "
f"concurrent={final_concurrent} (auto: TPM={effective_tpm} / avg_tokens={avg_tokens} / max_in_flight=30), "
f"concurrent={final_concurrent} (auto: TPM={effective_tpm} / avg_tokens={effective_avg_tokens} / max_in_flight=30), "
f"RPM={effective_rpm}, TPM={effective_tpm}"
)
else:
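Putting the three priority levels together, usage resolves as in the sketch below, assuming no *_MAX_ASYNC environment variables are set; the numbers follow SERVICE_DEFAULTS above.

from src.rate_limiter import get_rate_limiter

# Priority 1: an explicit tenant value wins outright.
limiter = get_rate_limiter(service="llm", max_concurrent=16)
# -> concurrent=16, config_source="tenant"

# Priority 2 would apply LLM_MAX_ASYNC from the environment (expert mode).

# Priority 3: nothing set, so the value is auto-calculated.
limiter = get_rate_limiter(service="embedding")
# -> concurrent = calculate_optimal_concurrent(1600, 400000, 20000, max_in_flight=30)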
4 changes: 4 additions & 0 deletions src/tenant_config.py
@@ -309,6 +309,7 @@ def _merge_llm_config(self, tenant_config: Optional[TenantConfigModel]) -> Dict[
# Rate limit configuration
"requests_per_minute": config.llm.requests_per_minute,
"tokens_per_minute": config.llm.tokens_per_minute,
"avg_tokens_per_request": config.llm.avg_tokens_per_request, # 用于并发计算的平均 token 数
}

if tenant_config and tenant_config.llm_config:
@@ -330,6 +331,7 @@ def _merge_embedding_config(self, tenant_config: Optional[TenantConfigModel]) ->
"requests_per_minute": config.embedding.requests_per_minute,
"tokens_per_minute": config.embedding.tokens_per_minute,
"max_async": config.embedding.max_async, # RateLimiter 的并发数(可选)
"avg_tokens_per_request": config.embedding.avg_tokens_per_request, # 用于并发计算的平均 token 数
"timeout": config.embedding.timeout,
}

@@ -351,6 +353,7 @@ def _merge_rerank_config(self, tenant_config: Optional[TenantConfigModel]) -> Di
"requests_per_minute": config.rerank.requests_per_minute,
"tokens_per_minute": config.rerank.tokens_per_minute,
"max_async": config.rerank.max_async, # RateLimiter 的并发数(可选)
"avg_tokens_per_request": config.rerank.avg_tokens_per_request, # 用于并发计算的平均 token 数
"timeout": config.rerank.timeout,
}

@@ -376,6 +379,7 @@ def _merge_ds_ocr_config(self, tenant_config: Optional[TenantConfigModel]) -> Di
"requests_per_minute": config.ds_ocr.requests_per_minute,
"tokens_per_minute": config.ds_ocr.tokens_per_minute,
"max_async": config.ds_ocr.max_async,
"avg_tokens_per_request": config.ds_ocr.avg_tokens_per_request, # 用于并发计算的平均 token 数
}

if tenant_config and tenant_config.ds_ocr_config:
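The bodies of the if tenant_config ... branches are truncated in this view. The merge pattern they imply is sketched below, with dict.update as an assumption about how tenant keys override the global defaults.

from typing import Optional

def merge_config(defaults: dict, tenant_overrides: Optional[dict]) -> dict:
    # Global config supplies every key; tenant values override where present.
    merged = dict(defaults)
    if tenant_overrides:
        merged.update(tenant_overrides)
    return merged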