Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion apps/api/routers/content.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def start_topic_wiki_task(
fn=_run_topic_wiki_task,
keyword=keyword,
limit=limit,
category="generation",
)
return {"task_id": task_id, "status": "pending"}

Expand Down Expand Up @@ -181,13 +182,19 @@ def daily_brief(req: DailyBriefRequest) -> dict:

def _generate_fn(progress_callback=None):
    """Publish the daily brief, reporting coarse progress along the way.

    Args:
        progress_callback: Optional callable invoked as
            ``progress_callback(message, current, total)``. Skipped when falsy.

    Returns:
        The value returned by ``brief_service.publish``.
    """
    # publish() already writes to the generated_content table internally,
    # so the result is not persisted a second time here.
    if progress_callback:
        progress_callback("正在生成每日简报...", 20, 100)
    result = brief_service.publish(recipient=recipient)
    if progress_callback:
        progress_callback("简报生成完成", 95, 100)
    return result

task_id = global_tracker.submit(
task_type="daily_brief",
title="📰 生成每日简报",
fn=_generate_fn,
total=100,
category="generation",
)
return {
"task_id": task_id,
Expand Down
18 changes: 17 additions & 1 deletion apps/api/routers/cs_feeds.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,19 +151,35 @@ def _fetch_fn(progress_callback=None):
from packages.storage.db import session_scope
from packages.storage.repositories import PaperRepository

if progress_callback:
progress_callback("正在获取论文列表...", 10, 100)
client = ArxivClient()
papers = client.fetch_latest(
query=f"cat:{category_code}",
max_results=sub.daily_limit,
days_back=7,
)

total_papers = len(papers)
if progress_callback:
progress_callback(f"开始入库 ({total_papers} 篇)...", 50, 100)

count = 0
with session_scope() as session:
paper_repo = PaperRepository(session)
for p in papers:
for i, p in enumerate(papers):
paper_repo.upsert_paper(p)
count += 1
if progress_callback:
progress_callback(
f"入库中 ({i + 1}/{total_papers})...",
50 + int((i + 1) / total_papers * 40),
100,
)
repo.update_run_status(category_code, count)

if progress_callback:
progress_callback("抓取完成", 95, 100)
return {"fetched": count}

task_id = global_tracker.submit(
Expand Down
33 changes: 26 additions & 7 deletions apps/api/routers/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,17 @@ def sync_citations_incremental(
"""增量同步引用(后台执行)"""

def _fn(progress_callback=None):
    """Run the incremental citation sync, reporting coarse progress.

    Args:
        progress_callback: Optional callable invoked as
            ``progress_callback(message, current, total)``. Skipped when falsy.

    Returns:
        The value returned by ``graph_service.sync_incremental``.
    """
    if progress_callback:
        progress_callback("正在同步增量引用...", 20, 100)
    # paper_limit / edge_limit_per_paper come from the enclosing endpoint.
    result = graph_service.sync_incremental(
        paper_limit=paper_limit,
        edge_limit_per_paper=edge_limit_per_paper,
    )
    if progress_callback:
        progress_callback("增量引用同步完成", 90, 100)
    return result

task_id = global_tracker.submit("citation_sync", "📊 增量引用同步", _fn)
task_id = global_tracker.submit("citation_sync", "📊 增量引用同步", _fn, category="sync")
return {"task_id": task_id, "message": "增量引用同步已启动", "status": "running"}


Expand All @@ -52,13 +57,20 @@ def sync_citations_for_topic(
pass

def _fn(progress_callback=None):
    """Run the citation sync for one topic, reporting coarse progress.

    Args:
        progress_callback: Optional callable invoked as
            ``progress_callback(message, current, total)``. Skipped when falsy.

    Returns:
        The value returned by ``graph_service.sync_citations_for_topic``.
    """
    if progress_callback:
        progress_callback("正在同步主题引用...", 20, 100)
    # topic_id and both limits come from the enclosing endpoint.
    result = graph_service.sync_citations_for_topic(
        topic_id=topic_id,
        paper_limit=paper_limit,
        edge_limit_per_paper=edge_limit_per_paper,
    )
    if progress_callback:
        progress_callback("主题引用同步完成", 90, 100)
    return result

task_id = global_tracker.submit("citation_sync", f"📊 主题引用同步: {topic_name}", _fn)
task_id = global_tracker.submit(
"citation_sync", f"📊 主题引用同步:{topic_name}", _fn, category="sync"
)
return {"task_id": task_id, "message": f"主题引用同步已启动: {topic_name}", "status": "running"}


Expand All @@ -71,9 +83,16 @@ def sync_citations(
paper_title = get_paper_title(UUID(paper_id)) or paper_id[:8]

def _fn(progress_callback=None):
    """Run the citation sync for a single paper, reporting coarse progress.

    Args:
        progress_callback: Optional callable invoked as
            ``progress_callback(message, current, total)``. Skipped when falsy.

    Returns:
        The value returned by ``graph_service.sync_citations_for_paper``.
    """
    if progress_callback:
        progress_callback("正在同步论文引用...", 20, 100)
    # paper_id and limit come from the enclosing endpoint.
    result = graph_service.sync_citations_for_paper(paper_id=paper_id, limit=limit)
    if progress_callback:
        progress_callback("论文引用同步完成", 90, 100)
    return result

task_id = global_tracker.submit(
"citation_sync", f"📄 引用同步:{paper_title[:30]}", _fn, category="sync"
)
return {"task_id": task_id, "message": "论文引用同步已启动", "status": "running"}


Expand Down
67 changes: 60 additions & 7 deletions apps/api/routers/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from fastapi import APIRouter, BackgroundTasks, HTTPException, Query

from packages.ai.daily_runner import run_daily_brief, run_daily_ingest, run_weekly_graph_maintenance
from packages.ai.daily_runner import run_daily_brief, run_daily_ingest
from packages.domain.enums import ReadStatus
from packages.domain.task_tracker import global_tracker
from packages.storage.db import session_scope
Expand All @@ -31,7 +31,7 @@ def _fn(progress_callback=None):
brief = run_daily_brief()
return {"ingest": ingest, "brief": brief}

task_id = global_tracker.submit("daily_job", "📅 每日任务执行", _fn)
task_id = global_tracker.submit("daily_job", "📅 每日任务执行", _fn, category="report")
return {"task_id": task_id, "message": "每日任务已启动", "status": "running"}


Expand All @@ -40,9 +40,54 @@ def run_weekly_graph_once() -> dict:
"""每周图维护任务 - 后台执行"""

def _fn(progress_callback=None):
    """Run the weekly graph maintenance: per-topic citation sync, then incremental sync.

    Args:
        progress_callback: Optional callable invoked as
            ``progress_callback(message, current, total)``. Skipped when falsy.

    Returns:
        A dict with ``"topic_sync"`` (list of per-topic results for the topics
        that synced without raising) and ``"incremental"`` (the result of
        ``GraphService.sync_incremental``).
    """
    from packages.ai.graph_service import GraphService
    from packages.storage.db import session_scope
    from packages.storage.repositories import TopicRepository

    if progress_callback:
        progress_callback("正在获取主题列表...", 10, 100)

    # Copy the plain id/name values while the session is still open, so the
    # loop below never touches ORM objects after the scope has exited.
    # NOTE(review): presumably session_scope() closes the session on exit,
    # which would leave the topic objects detached - confirm.
    with session_scope() as session:
        topic_refs = [
            (t.id, t.name)
            for t in TopicRepository(session).list_topics(enabled_only=True)
        ]

    total_topics = len(topic_refs)
    graph = GraphService()
    topic_results = []

    for i, (topic_id, topic_name) in enumerate(topic_refs):
        if progress_callback:
            # The topic phase occupies the 20-60 range of the progress bar.
            progress_callback(
                f"处理主题 {i + 1}/{total_topics}: {topic_name[:20]}...",
                20 + int((i + 1) / total_topics * 40),
                100,
            )
        try:
            topic_results.append(
                graph.sync_citations_for_topic(
                    topic_id=topic_id,
                    paper_limit=20,
                    edge_limit_per_paper=6,
                )
            )
        except Exception:
            # Best effort: one failing topic must not abort the whole run.
            # NOTE(review): `logger` is expected to be defined at module
            # level; it is not visible in this excerpt - confirm.
            logger.exception(
                "Failed to sync citations for topic %s",
                topic_id,
            )

    if progress_callback:
        progress_callback("正在执行增量同步...", 70, 100)
    incremental = graph.sync_incremental(paper_limit=50, edge_limit_per_paper=6)

    if progress_callback:
        progress_callback("图维护完成", 95, 100)
    return {
        "topic_sync": topic_results,
        "incremental": incremental,
    }

task_id = global_tracker.submit("weekly_maintenance", "🔄 每周图维护", _fn, category="sync")
return {"task_id": task_id, "message": "每周图维护已启动", "status": "running"}


Expand Down Expand Up @@ -79,7 +124,11 @@ def _run_batch():
failed = 0
try:
global_tracker.start(
task_id, "batch_process", f"📚 批量处理未读论文 ({total} 篇)", total=total
task_id,
"batch_process",
f"📚 批量处理未读论文 ({total} 篇)",
total=total,
category="analysis",
)

with ThreadPoolExecutor(max_workers=PAPER_CONCURRENCY) as pool:
Expand Down Expand Up @@ -204,7 +253,9 @@ async def run_daily_report_once(background_tasks: BackgroundTasks):

def _run_workflow_bg():
task_id = f"daily_report_{_uuid.uuid4().hex[:8]}"
global_tracker.start(task_id, "daily_report", "📊 每日报告工作流", total=100)
global_tracker.start(
task_id, "daily_report", "📊 每日报告工作流", total=100, category="report"
)

def _progress(msg: str, cur: int, tot: int):
global_tracker.update(task_id, cur, msg, total=100)
Expand Down Expand Up @@ -235,7 +286,9 @@ async def run_daily_report_send_only(

def _run_send_only_bg():
task_id = f"report_send_{_uuid.uuid4().hex[:8]}"
global_tracker.start(task_id, "report_send", "📧 快速发送简报", total=100)
global_tracker.start(
task_id, "report_send", "📧 快速发送简报", total=100, category="report"
)

def _progress(msg: str, cur: int, tot: int):
global_tracker.update(task_id, cur, msg, total=100)
Expand Down
76 changes: 51 additions & 25 deletions apps/api/routers/papers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from pathlib import Path
from uuid import UUID

import httpx
from fastapi import APIRouter, HTTPException, Query
from fastapi.responses import FileResponse

Expand All @@ -14,6 +15,18 @@
from packages.storage.db import session_scope
from packages.storage.repositories import PaperRepository

# Shared module-level HTTP client, reused so that a new client is not created for every request
_http_client: httpx.AsyncClient | None = None


def _get_http_client() -> httpx.AsyncClient:
    """Return the shared module-level HTTP client, creating it on demand.

    A new ``httpx.AsyncClient`` (60-second timeout, redirects followed) is
    built when no client exists yet or when the existing one has been closed.
    """
    global _http_client
    existing = _http_client
    # Fast path: a live client is already available.
    if existing is not None and not existing.is_closed:
        return existing
    _http_client = httpx.AsyncClient(timeout=60.0, follow_redirects=True)
    return _http_client


router = APIRouter()


Expand Down Expand Up @@ -78,37 +91,34 @@ def recommended_papers(top_k: int = Query(default=10, ge=1, le=50)) -> dict:
@router.get("/papers/proxy-arxiv-pdf/{arxiv_id:path}")
async def proxy_arxiv_pdf(arxiv_id: str):
"""代理访问 arXiv PDF(解决 CORS 问题)"""
import httpx

# 清理 arxiv_id(移除版本号)
clean_id = arxiv_id.split("v")[0]
arxiv_url = f"https://arxiv.org/pdf/{clean_id}.pdf"

try:
# 使用后端服务器访问 arXiv(绕过 CORS)
async with httpx.AsyncClient(timeout=60.0) as client:
response = await client.get(arxiv_url, follow_redirects=True)

if response.status_code == 404:
raise HTTPException(status_code=404, detail=f"arXiv 论文不存在:{clean_id}")

if response.status_code != 200:
raise HTTPException(
status_code=500, detail=f"arXiv 访问失败:{response.status_code}"
)

# 返回 PDF 内容
from fastapi.responses import Response

return Response(
content=response.content,
media_type="application/pdf",
headers={
"Access-Control-Allow-Origin": "*",
"Content-Disposition": f'inline; filename="{clean_id}.pdf"',
"Cache-Control": "public, max-age=3600",
},
)
client = _get_http_client()
response = await client.get(arxiv_url, follow_redirects=True)

if response.status_code == 404:
raise HTTPException(status_code=404, detail=f"arXiv 论文不存在:{clean_id}")

if response.status_code != 200:
raise HTTPException(status_code=500, detail=f"arXiv 访问失败:{response.status_code}")

# 返回 PDF 内容
from fastapi.responses import Response

return Response(
content=response.content,
media_type="application/pdf",
headers={
"Access-Control-Allow-Origin": "*",
"Content-Disposition": f'inline; filename="{clean_id}.pdf"',
"Cache-Control": "public, max-age=3600",
},
)
except httpx.TimeoutException:
raise HTTPException(status_code=504, detail="arXiv 请求超时")
except httpx.RequestError as exc:
Expand Down Expand Up @@ -328,17 +338,33 @@ def analyze_paper_figures(
def _analyze_fn(progress_callback=None):
    """Analyze a paper's figures and return the stored analyses with image URLs.

    Args:
        progress_callback: Optional callable invoked as
            ``progress_callback(message, current, total)``. Skipped when falsy.

    Returns:
        A dict with ``"paper_id"`` (as a string), ``"count"`` (number of
        stored analyses) and ``"items"`` (the analyses, each with an
        ``"image_url"`` key that is ``None`` when no image is available).
    """
    from packages.ai.figure_service import FigureService

    if progress_callback:
        progress_callback("正在提取图表...", 10, 100)
    svc = FigureService()
    # paper_id, pdf_path and max_figures come from the enclosing endpoint.
    results = svc.analyze_paper_figures(paper_id, pdf_path, max_figures)

    total_figures = len(results)
    if progress_callback and total_figures > 0:
        progress_callback(f"正在生成解读 ({total_figures} 个图表)...", 50, 100)

    # After the analysis, reload the full records (including their ids) from the DB.
    items = FigureService.get_paper_analyses(paper_id)

    # Progress for this loop is measured against the list actually iterated.
    # The stored list may differ in length from `results`, so dividing by
    # len(results) could raise ZeroDivisionError or overshoot the range.
    total_items = len(items)
    for i, item in enumerate(items):
        if item.get("has_image"):
            item["image_url"] = f"/papers/{paper_id}/figures/{item['id']}/image"
        else:
            item["image_url"] = None
        if progress_callback:
            # This phase occupies the 50-95 range of the progress bar.
            progress_callback(
                f"解读中 ({i + 1}/{total_items})...",
                50 + int((i + 1) / total_items * 45),
                100,
            )

    if progress_callback:
        progress_callback("图表分析完成", 95, 100)
    return {"paper_id": str(paper_id), "count": len(items), "items": items}

task_id = global_tracker.submit(
Expand Down
6 changes: 3 additions & 3 deletions apps/api/routers/pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
def run_skim(paper_id: UUID) -> dict:
tid = f"skim_{paper_id.hex[:8]}"
title = get_paper_title(paper_id) or str(paper_id)[:8]
global_tracker.start(tid, "skim", f"粗读: {title[:30]}", total=1)
global_tracker.start(tid, "skim", f"粗读{title[:30]}", total=1, category="analysis")
try:
skim = pipelines.skim(paper_id)
global_tracker.finish(tid, success=True)
Expand All @@ -40,7 +40,7 @@ def run_skim(paper_id: UUID) -> dict:
def run_deep(paper_id: UUID) -> dict:
tid = f"deep_{paper_id.hex[:8]}"
title = get_paper_title(paper_id) or str(paper_id)[:8]
global_tracker.start(tid, "deep_read", f"精读: {title[:30]}", total=1)
global_tracker.start(tid, "deep_read", f"精读{title[:30]}", total=1, category="analysis")
try:
deep = pipelines.deep_dive(paper_id)
global_tracker.finish(tid, success=True)
Expand All @@ -54,7 +54,7 @@ def run_deep(paper_id: UUID) -> dict:
def run_embed(paper_id: UUID) -> dict:
tid = f"embed_{paper_id.hex[:8]}"
title = get_paper_title(paper_id) or str(paper_id)[:8]
global_tracker.start(tid, "embed", f"嵌入: {title[:30]}", total=1)
global_tracker.start(tid, "embed", f"嵌入{title[:30]}", total=1, category="analysis")
try:
pipelines.embed_paper(paper_id)
global_tracker.finish(tid, success=True)
Expand Down
Loading