Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
__pycache__/
*.py[cod]
101 changes: 101 additions & 0 deletions qa_system/ARCHITECTURE_ANALYSIS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# qa_system 代码结构与链路分析

## 1. UNDERSTAND(理解)
项目 `qa_system` 的核心问题是:
- 在**本地离线**条件下,把上传的录音文件进行 ASR 转写;
- 结合说话人分离能力,把文本按说话人切分;
- 以可质检的日志格式输出,支持后续销售/客户话术分析。

它本质上是一个“Web 任务壳 + 离线语音处理引擎”的组合系统。

## 2. ANALYZE(分析)

### 2.1 分层结构
- 接入层(HTTP + 页面):`qa_system/app.py`、`templates/index.html`、`static/script.js`
- 领域服务层(转写编排):`services/transcription.py`
- 说话人策略层(可替换实现):`services/diarization.py`
- 测试层:`tests/test_diarization.py`

### 2.2 关键对象
- `OfflineTranscriptionService`
- 负责模型初始化(FunASR `AutoModel`)
- 批处理音频、调用 ffmpeg 统一采样、执行转写
- 调用 `Diarizer` 进行 speaker 归因与段落合并
- `Diarizer`
- `campp`:基于 FunASR 返回的 `sentence_info` 映射
- `pyannote`:保留扩展入口,当前抛出 `NotImplementedError`
- `jobs`(内存字典)
- job 状态机:`running -> done/failed`
- 结果缓存与轮询查询

### 2.3 外部依赖角色
- `FastAPI`:接口与模板渲染
- `ffmpeg-python`:音频预处理(转 16k 单声道 wav)
- `funasr` + `torch`:离线转写与 speaker 信息生成
- `psutil`:CPU 核数探测

## 3. REASON(推理)

### 3.1 端到端调用链(主链路)
1. 前端提交表单(多文件 + diarization + merge_threshold + hotwords)到 `POST /api/jobs`。
2. 后端保存上传文件到 `uploads/<job_id>/`,生成 job 记录并异步投递线程池。
3. 后台线程调用 `service.transcribe_batch(...)`:
- 遍历可识别音/视频文件;
- 每个文件先经 ffmpeg 统一解码;
- 调用 FunASR `generate` 获取 `text` 和 `sentence_info`;
- 根据策略分支:
- `campp`:从 `sentence_info` 直接映射 speaker 段;
- `pyannote`:当前未落地,会报错;
- 对短句相邻同 speaker 段做合并;
- 产出 `TranscriptResult`。
4. 后端将每个文件转成可读日志(`speaker + 时间戳 + 文本`),写入 `results/<job_id>.txt`。
5. 前端轮询 `GET /api/jobs/{job_id}`,直到 `done`/`failed`,渲染结果文本。

### 3.2 数据流(字段级)
- 输入:浏览器 `multipart/form-data`
- `files[]`
- `diarization`
- `merge_threshold_chars`
- `hotwords`
- 中间:`jobs[job_id]`
- 初始:`status=running`
- 成功:`status=done, result, files`
- 失败:`status=failed, error`
- 模型中间态:
- `infer_result.text`
- `infer_result.sentence_info[]`(含 `spk/start/end/text`)
- 输出:
- API JSON(轮询态)
- 落盘文本日志(质检可读)

### 3.3 当前工程权衡
- 优点:
- 结构清晰、接口简洁、离线可运行;
- speaker 策略做了抽象,有后续扩展位;
- 前后端耦合低(纯 JSON 轮询)。
- 风险/限制:
- `jobs` 在内存中,进程重启丢失任务;
- 线程池仅 2 worker,吞吐受限;
- `service.diarizer.strategy` 在并发下会被全局覆写(竞态风险);
- `pyannote` 分支尚未实现;
- 缺少端到端集成测试与异常可观测性(日志/指标)。

## 4. SYNTHESIZE(综合)

### 4.1 当前应用结构可以理解为
- **控制平面**:FastAPI + job 状态管理 + 页面轮询
- **数据平面**:ffmpeg 预处理 + FunASR 推理 + diarizer 后处理
- **输出平面**:文本日志与 JSON 状态

### 4.2 建议的工程化增强路径(按优先级)
1. 并发安全:将 `diarization` 作为 `transcribe_batch` 的入参,而不是修改共享 service 实例属性。
2. 任务持久化:把 `jobs` 落到 SQLite/Redis,支持重启恢复。
3. 任务队列化:引入后台队列(RQ/Celery)替代进程内线程池。
4. 可观测性:增加结构化日志、任务耗时、失败原因分类。
5. pyannote 落地:补齐 token、模型缓存与 pipeline 初始化。

## 5. CONCLUDE(总结)
`qa_system` 已具备一个可用的离线语音质检原型骨架:
- 上传 -> 异步转写 -> speaker 分离 -> 日志输出闭环完整;
- 代码边界(Web / 转写 / diarization)基本合理;
- 后续应优先补“并发安全 + 持久化 + pyannote 实现”,即可从原型走向稳定可运维版本。
46 changes: 46 additions & 0 deletions qa_system/MCP_INTEGRATION_DESIGN.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# qa_system 封装为 MCP Tool 的设计方案

## 1. 目标
把现有离线 ASR + 说话人分离服务封装成一个 MCP Tool,供外部 Agent/LLM 通过自然语言触发:

- 输入示例:`帮我转录一下录音文件“/data/audios/call_001.wav”`
- 输出:带 speaker 标签与时间戳的说话人日志文本

## 2. 工具设计

### 2.1 Tool 名称
- `transcribe_audio_to_speaker_log`

### 2.2 输入契约
- `prompt: str`(必填):自然语言指令,内部自动提取音频路径
- `diarization: str = "campp"`
- `merge_threshold_chars: int = 12`
- `hotwords: str = ""`

### 2.3 输出契约
- `str`:
- `# 文件: xxx.wav`
- `speaker1 [00:00:00.000 --> 00:00:01.230]:...`

## 3. 处理链路
1. MCP Agent 调用 tool,传入自然语言 prompt。
2. Tool 从 prompt 中提取文件路径(优先引号内容,其次绝对/相对路径 token)。
3. 校验路径有效性。
4. 调用 `OfflineTranscriptionService.transcribe_batch([path], ...)`。
5. 用 `render_dialogue_log` 生成最终文本并返回给 Agent/LLM。

## 4. 当前实现说明
- 代码文件:`qa_system/mcp_tool_server.py`
- 使用 `FastMCP` 注册 tool。
- 采用单例 `OfflineTranscriptionService` + 互斥锁,避免并发下策略切换冲突。

## 5. 工程化建议
1. 安全:新增路径白名单(例如仅允许 `/data/uploads`)。
2. 稳定性:把 diarization 策略改为调用级参数,避免修改共享对象状态。
3. 可扩展:新增批量工具 `transcribe_batch_to_speaker_logs(paths: list[str])`。
4. 结果结构化:返回 JSON(文件、段落数组、耗时)并附带纯文本版本。

## 6. 与现有 Web 服务关系
- Web 版(FastAPI)用于人工上传与可视化轮询。
- MCP 版用于程序化调用(Agent/LLM)。
- 两者复用同一套核心服务层(`services/transcription.py` + `services/diarization.py`)。
61 changes: 61 additions & 0 deletions qa_system/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# 金融销售电话质检系统(二次开发版)

该目录是基于原项目核心能力(FunASR 离线转写 + 说话人分离)构建的 Web 版质检原型,面向“电话下单录音质检”场景。

## 功能

1. 支持录音文件批量上传。
2. 支持说话人分离(默认 FunASR CAM++,并预留 pyannote 扩展位)。
3. 支持普通离线自动转写。
4. 输出说话人日志(speaker + 时间戳 + 文本),用于 sales/customer 角色分析。

## 启动

```bash
pip install fastapi uvicorn jinja2 python-multipart
# 以及原项目依赖
pip install -U funasr modelscope ffmpeg-python pydub torch psutil

uvicorn qa_system.app:app --host 0.0.0.0 --port 8000 --reload
```

浏览器访问:`http://127.0.0.1:8000`

## 架构

- `app.py`: Web 接口、任务调度、状态轮询。
- `services/transcription.py`: 离线转写主流程(FunASR + ffmpeg)。
- `services/diarization.py`: 说话人分离抽象层(CAM++ / pyannote)。
- `templates/ + static/`: 前端交互页面。

## pyannote 说明

当前代码中保留了 pyannote 接口抽象,但默认未启用完整 Pipeline 初始化。生产接入时请按你们私有环境增加 HuggingFace Token、模型与缓存策略。


## MCP Tool 封装(供外部 Agent/LLM 调用)

新增 `qa_system/mcp_tool_server.py`,可将离线转写能力直接暴露为 MCP Tool:

- Tool 名称:`transcribe_audio_to_speaker_log`
- 典型调用:`帮我转录一下录音文件“/path/to/audio.wav”`
- 返回:说话人区分的日志文本(`speaker + 时间戳 + 文本`)

### 运行

```bash
pip install mcp
python -m qa_system.mcp_tool_server
```

### Tool 入参

- `prompt: str`:自然语言指令,支持从引号中提取路径。
- `diarization: str = "campp"`:`campp` 或 `pyannote`。
- `merge_threshold_chars: int = 12`:相邻同 speaker 合并阈值。
- `hotwords: str = ""`:热词,空格分隔。

### 使用建议

- 生产场景建议仅开放白名单目录,避免任意路径读取。
- 当前 `pyannote` 仍需你们私有环境配置后启用。
Empty file added qa_system/__init__.py
Empty file.
82 changes: 82 additions & 0 deletions qa_system/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from pathlib import Path
from uuid import uuid4

from fastapi import FastAPI, File, Form, UploadFile
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from starlette.requests import Request

from qa_system.services.transcription import OfflineTranscriptionService

BASE_DIR = Path(__file__).resolve().parent
UPLOAD_DIR = BASE_DIR / "uploads"
RESULT_DIR = BASE_DIR / "results"
UPLOAD_DIR.mkdir(exist_ok=True)
RESULT_DIR.mkdir(exist_ok=True)

app = FastAPI(title="金融销售电话质检系统")
app.mount("/static", StaticFiles(directory=str(BASE_DIR / "static")), name="static")
templates = Jinja2Templates(directory=str(BASE_DIR / "templates"))
executor = ThreadPoolExecutor(max_workers=2)
service = OfflineTranscriptionService()
jobs: dict[str, dict] = {}


@app.get("/", response_class=HTMLResponse)
def index(request: Request):
return templates.TemplateResponse("index.html", {"request": request})


@app.post("/api/jobs")
async def create_job(
files: list[UploadFile] = File(...),
diarization: str = Form("campp"),
merge_threshold_chars: int = Form(12),
hotwords: str = Form(""),
):
job_id = uuid4().hex
job_dir = UPLOAD_DIR / job_id
job_dir.mkdir(exist_ok=True)

source_paths: list[str] = []
for file in files:
save_path = job_dir / file.filename
content = await file.read()
save_path.write_bytes(content)
source_paths.append(str(save_path))

jobs[job_id] = {"status": "running", "created_at": datetime.now().isoformat()}

def _run() -> None:
try:
service.diarizer.strategy = diarization
transcripts = service.transcribe_batch(
source_paths=source_paths,
merge_threshold_chars=merge_threshold_chars,
hotwords=hotwords,
)
result_file = RESULT_DIR / f"{job_id}.txt"
dialogue_logs = [service.render_dialogue_log(t) for t in transcripts]
result_file.write_text("\n\n".join(dialogue_logs), encoding="utf-8")
jobs[job_id] = {
"status": "done",
"result": result_file.read_text(encoding="utf-8"),
"files": [t.source_file for t in transcripts],
}
except Exception as exc:
jobs[job_id] = {"status": "failed", "error": str(exc)}

executor.submit(_run)
return JSONResponse({"job_id": job_id, "status": jobs[job_id]["status"]})


@app.get("/api/jobs/{job_id}")
def get_job(job_id: str):
if job_id not in jobs:
return JSONResponse({"error": "job not found"}, status_code=404)
return JSONResponse(jobs[job_id])
58 changes: 58 additions & 0 deletions qa_system/mcp_tool_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from __future__ import annotations

import threading
from pathlib import Path

from mcp.server.fastmcp import FastMCP

from qa_system.services.transcription import OfflineTranscriptionService

mcp = FastMCP("qa-system-asr")
_service_lock = threading.Lock()
_service = OfflineTranscriptionService()


def _extract_path_from_prompt(prompt: str) -> str:
quote_chars = ['"', '“', '”', "'", '‘', '’']
for q in quote_chars:
if q in prompt:
parts = prompt.split(q)
if len(parts) >= 3 and parts[1].strip():
return parts[1].strip()

for token in prompt.split():
if token.startswith(("/", "./", "../")):
return token.strip(",。,.!!")

return prompt.strip()


@mcp.tool()
def transcribe_audio_to_speaker_log(
prompt: str,
diarization: str = "campp",
merge_threshold_chars: int = 12,
hotwords: str = "",
) -> str:
"""根据自然语言 prompt 转写本地录音,并返回说话人日志文本。"""
audio_path = _extract_path_from_prompt(prompt)
path = Path(audio_path).expanduser().resolve()
if not path.exists() or not path.is_file():
raise ValueError(f"音频文件不存在或不是文件: {path}")

with _service_lock:
_service.diarizer.strategy = diarization
results = _service.transcribe_batch(
source_paths=[str(path)],
merge_threshold_chars=merge_threshold_chars,
hotwords=hotwords,
)

if not results:
raise RuntimeError(f"未识别到可处理的音频文件: {path}")

return _service.render_dialogue_log(results[0])


if __name__ == "__main__":
mcp.run()
Empty file added qa_system/services/__init__.py
Empty file.
Loading