From e9aff5ee6f852027b3cc622721fba557a6b6b7fc Mon Sep 17 00:00:00 2001 From: Kyle Date: Mon, 23 Feb 2026 18:20:49 +0800 Subject: [PATCH 1/2] test: add diarization unit tests and clean pycache artifacts --- .gitignore | 2 + qa_system/README.md | 33 +++++++++ qa_system/__init__.py | 0 qa_system/app.py | 82 ++++++++++++++++++++++ qa_system/services/__init__.py | 0 qa_system/services/diarization.py | 60 +++++++++++++++++ qa_system/services/transcription.py | 101 ++++++++++++++++++++++++++++ qa_system/static/script.js | 42 ++++++++++++ qa_system/static/style.css | 63 +++++++++++++++++ qa_system/templates/index.html | 48 +++++++++++++ qa_system/tests/test_diarization.py | 55 +++++++++++++++ 11 files changed, 486 insertions(+) create mode 100644 .gitignore create mode 100644 qa_system/README.md create mode 100644 qa_system/__init__.py create mode 100644 qa_system/app.py create mode 100644 qa_system/services/__init__.py create mode 100644 qa_system/services/diarization.py create mode 100644 qa_system/services/transcription.py create mode 100644 qa_system/static/script.js create mode 100644 qa_system/static/style.css create mode 100644 qa_system/templates/index.html create mode 100644 qa_system/tests/test_diarization.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..43ae0e2 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +__pycache__/ +*.py[cod] diff --git a/qa_system/README.md b/qa_system/README.md new file mode 100644 index 0000000..5c1eb7b --- /dev/null +++ b/qa_system/README.md @@ -0,0 +1,33 @@ +# 金融销售电话质检系统(二次开发版) + +该目录是基于原项目核心能力(FunASR 离线转写 + 说话人分离)构建的 Web 版质检原型,面向“电话下单录音质检”场景。 + +## 功能 + +1. 支持录音文件批量上传。 +2. 支持说话人分离(默认 FunASR CAM++,并预留 pyannote 扩展位)。 +3. 支持普通离线自动转写。 +4. 输出说话人日志(speaker + 时间戳 + 文本),用于 sales/customer 角色分析。 + +## 启动 + +```bash +pip install fastapi uvicorn jinja2 python-multipart +# 以及原项目依赖 +pip install -U funasr modelscope ffmpeg-python pydub torch psutil + +uvicorn qa_system.app:app --host 0.0.0.0 --port 8000 --reload +``` + +浏览器访问:`http://127.0.0.1:8000` + +## 架构 + +- `app.py`: Web 接口、任务调度、状态轮询。 +- `services/transcription.py`: 离线转写主流程(FunASR + ffmpeg)。 +- `services/diarization.py`: 说话人分离抽象层(CAM++ / pyannote)。 +- `templates/ + static/`: 前端交互页面。 + +## pyannote 说明 + +当前代码中保留了 pyannote 接口抽象,但默认未启用完整 Pipeline 初始化。生产接入时请按你们私有环境增加 HuggingFace Token、模型与缓存策略。 diff --git a/qa_system/__init__.py b/qa_system/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/qa_system/app.py b/qa_system/app.py new file mode 100644 index 0000000..7f42b02 --- /dev/null +++ b/qa_system/app.py @@ -0,0 +1,82 @@ +from __future__ import annotations + +from concurrent.futures import ThreadPoolExecutor +from datetime import datetime +from pathlib import Path +from uuid import uuid4 + +from fastapi import FastAPI, File, Form, UploadFile +from fastapi.responses import HTMLResponse, JSONResponse +from fastapi.staticfiles import StaticFiles +from fastapi.templating import Jinja2Templates +from starlette.requests import Request + +from qa_system.services.transcription import OfflineTranscriptionService + +BASE_DIR = Path(__file__).resolve().parent +UPLOAD_DIR = BASE_DIR / "uploads" +RESULT_DIR = BASE_DIR / "results" +UPLOAD_DIR.mkdir(exist_ok=True) +RESULT_DIR.mkdir(exist_ok=True) + +app = FastAPI(title="金融销售电话质检系统") +app.mount("/static", StaticFiles(directory=str(BASE_DIR / "static")), name="static") +templates = Jinja2Templates(directory=str(BASE_DIR / "templates")) +executor = ThreadPoolExecutor(max_workers=2) +service = OfflineTranscriptionService() +jobs: dict[str, dict] = {} + + +@app.get("/", response_class=HTMLResponse) +def index(request: Request): + return templates.TemplateResponse("index.html", {"request": request}) + + +@app.post("/api/jobs") +async def create_job( + files: list[UploadFile] = File(...), + diarization: str = Form("campp"), + merge_threshold_chars: int = Form(12), + hotwords: str = Form(""), +): + job_id = uuid4().hex + job_dir = UPLOAD_DIR / job_id + job_dir.mkdir(exist_ok=True) + + source_paths: list[str] = [] + for file in files: + save_path = job_dir / file.filename + content = await file.read() + save_path.write_bytes(content) + source_paths.append(str(save_path)) + + jobs[job_id] = {"status": "running", "created_at": datetime.now().isoformat()} + + def _run() -> None: + try: + service.diarizer.strategy = diarization + transcripts = service.transcribe_batch( + source_paths=source_paths, + merge_threshold_chars=merge_threshold_chars, + hotwords=hotwords, + ) + result_file = RESULT_DIR / f"{job_id}.txt" + dialogue_logs = [service.render_dialogue_log(t) for t in transcripts] + result_file.write_text("\n\n".join(dialogue_logs), encoding="utf-8") + jobs[job_id] = { + "status": "done", + "result": result_file.read_text(encoding="utf-8"), + "files": [t.source_file for t in transcripts], + } + except Exception as exc: + jobs[job_id] = {"status": "failed", "error": str(exc)} + + executor.submit(_run) + return JSONResponse({"job_id": job_id, "status": jobs[job_id]["status"]}) + + +@app.get("/api/jobs/{job_id}") +def get_job(job_id: str): + if job_id not in jobs: + return JSONResponse({"error": "job not found"}, status_code=404) + return JSONResponse(jobs[job_id]) diff --git a/qa_system/services/__init__.py b/qa_system/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/qa_system/services/diarization.py b/qa_system/services/diarization.py new file mode 100644 index 0000000..dfb1b91 --- /dev/null +++ b/qa_system/services/diarization.py @@ -0,0 +1,60 @@ +from __future__ import annotations + +from dataclasses import dataclass +from importlib import import_module +from importlib.util import find_spec +from typing import List + + +@dataclass +class SpeakerSegment: + speaker: str + start_ms: int + end_ms: int + text: str + + +class Diarizer: + """Diarization abstraction supporting FunASR CAM++ first, then pyannote.""" + + def __init__(self, strategy: str = "campp") -> None: + self.strategy = strategy + + def diarize_with_funasr(self, sentence_info: list[dict]) -> List[SpeakerSegment]: + segments: List[SpeakerSegment] = [] + for sentence in sentence_info: + segments.append( + SpeakerSegment( + speaker=f"speaker{sentence['spk']}", + start_ms=int(sentence["start"]), + end_ms=int(sentence["end"]), + text=sentence["text"], + ) + ) + return segments + + def diarize_with_pyannote(self, audio_path: str) -> List[SpeakerSegment]: + if find_spec("pyannote.audio") is None: + raise RuntimeError("pyannote.audio 未安装,无法使用 pyannote 方案") + + pipeline_module = import_module("pyannote.audio") + Pipeline = getattr(pipeline_module, "Pipeline") + + raise NotImplementedError( + "pyannote 方案需要 HuggingFace Token 与模型配置,请在生产环境接入后启用。" + f"当前输入文件: {audio_path}" + ) + + def merge_adjacent(self, segments: List[SpeakerSegment], merge_threshold_chars: int = 12) -> List[SpeakerSegment]: + if not segments: + return [] + + merged = [segments[0]] + for current in segments[1:]: + previous = merged[-1] + if current.speaker == previous.speaker and len(previous.text) < merge_threshold_chars: + previous.text += current.text + previous.end_ms = current.end_ms + else: + merged.append(current) + return merged diff --git a/qa_system/services/transcription.py b/qa_system/services/transcription.py new file mode 100644 index 0000000..f2efdab --- /dev/null +++ b/qa_system/services/transcription.py @@ -0,0 +1,101 @@ +from __future__ import annotations + +import os +from dataclasses import dataclass +from datetime import timedelta +from pathlib import Path +from typing import Iterable, List + +import ffmpeg +import psutil +import torch +from funasr import AutoModel + +from qa_system.services.diarization import Diarizer, SpeakerSegment + + +@dataclass +class TranscriptResult: + source_file: str + full_text: str + segments: List[SpeakerSegment] + + +class OfflineTranscriptionService: + def __init__(self, model_root: Path | None = None, diarization_strategy: str = "campp") -> None: + home = Path.home() + base = model_root or home / ".cache" / "modelscope" / "hub" / "models" / "iic" + self.model = AutoModel( + model=str(base / "speech_seaco_paraformer_large_asr_nat-zh-cn-16k-common-vocab8404-pytorch"), + model_revision="v2.0.4", + vad_model=str(base / "speech_fsmn_vad_zh-cn-16k-common-pytorch"), + vad_model_revision="v2.0.4", + punc_model=str(base / "punc_ct-transformer_zh-cn-common-vocab272727-pytorch"), + punc_model_revision="v2.0.4", + spk_model=str(base / "speech_campplus_sv_zh-cn_16k-common"), + spk_model_revision="v2.0.4", + ngpu=1 if torch.cuda.is_available() else 0, + ncpu=psutil.cpu_count(), + disable_pbar=True, + disable_log=True, + disable_update=True, + ) + self.diarizer = Diarizer(strategy=diarization_strategy) + + @staticmethod + def _to_time(ms: int) -> str: + d = timedelta(milliseconds=ms) + return f"{d.seconds // 3600:02d}:{(d.seconds // 60) % 60:02d}:{d.seconds % 60:02d}.{d.microseconds // 1000:03d}" + + @staticmethod + def _iter_audio_files(paths: Iterable[str]) -> Iterable[str]: + support_ext = {".mp3", ".m4a", ".aac", ".ogg", ".wav", ".flac", ".wma", ".aif", ".mp4", ".avi", ".mov", ".mkv"} + for path in paths: + p = Path(path) + if p.is_file() and p.suffix.lower() in support_ext: + yield str(p) + if p.is_dir(): + for child in p.rglob("*"): + if child.is_file() and child.suffix.lower() in support_ext: + yield str(child) + + def transcribe_batch(self, source_paths: Iterable[str], merge_threshold_chars: int = 12, hotwords: str = "") -> List[TranscriptResult]: + results: List[TranscriptResult] = [] + for audio in self._iter_audio_files(source_paths): + audio_bytes, _ = ( + ffmpeg.input(audio, threads=0) + .output("-", format="wav", acodec="pcm_s16le", ac=1, ar=16000) + .run(cmd=["ffmpeg", "-nostdin"], capture_stdout=True, capture_stderr=True) + ) + + infer_result = self.model.generate( + input=audio_bytes, + batch_size_s=300, + is_final=True, + sentence_timestamp=True, + hotword=hotwords, + )[0] + + sentence_info = infer_result.get("sentence_info", []) + if self.diarizer.strategy == "pyannote": + segments = self.diarizer.diarize_with_pyannote(audio) + else: + segments = self.diarizer.diarize_with_funasr(sentence_info) + segments = self.diarizer.merge_adjacent(segments, merge_threshold_chars) + results.append( + TranscriptResult( + source_file=audio, + full_text=infer_result.get("text", ""), + segments=segments, + ) + ) + return results + + @classmethod + def render_dialogue_log(cls, transcript: TranscriptResult) -> str: + rows = [f"# 文件: {os.path.basename(transcript.source_file)}"] + for seg in transcript.segments: + start = cls._to_time(seg.start_ms) + end = cls._to_time(seg.end_ms) + rows.append(f"{seg.speaker} [{start} --> {end}]:{seg.text}") + return "\n".join(rows) diff --git a/qa_system/static/script.js b/qa_system/static/script.js new file mode 100644 index 0000000..5cabaff --- /dev/null +++ b/qa_system/static/script.js @@ -0,0 +1,42 @@ +const form = document.getElementById("job-form"); +const statusEl = document.getElementById("status"); +const resultEl = document.getElementById("result"); + +const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms)); + +async function pollJob(jobId) { + while (true) { + const response = await fetch(`/api/jobs/${jobId}`); + const data = await response.json(); + statusEl.textContent = JSON.stringify(data, null, 2); + + if (data.status === "done") { + resultEl.textContent = data.result || "任务完成,但没有文本输出"; + return; + } + + if (data.status === "failed") { + resultEl.textContent = `任务失败:${data.error || "未知错误"}`; + return; + } + + await sleep(1500); + } +} + +form.addEventListener("submit", async (event) => { + event.preventDefault(); + resultEl.textContent = "处理中,请稍候..."; + + const payload = new FormData(form); + const response = await fetch("/api/jobs", { method: "POST", body: payload }); + const data = await response.json(); + statusEl.textContent = JSON.stringify(data, null, 2); + + if (!data.job_id) { + resultEl.textContent = `任务创建失败:${JSON.stringify(data)}`; + return; + } + + await pollJob(data.job_id); +}); diff --git a/qa_system/static/style.css b/qa_system/static/style.css new file mode 100644 index 0000000..3d77977 --- /dev/null +++ b/qa_system/static/style.css @@ -0,0 +1,63 @@ +:root { + color-scheme: light; + font-family: "Inter", "PingFang SC", "Microsoft YaHei", sans-serif; + --bg: #f5f7fb; + --card: #ffffff; + --text: #19253a; + --primary: #2a6df5; + --border: #dbe5f4; +} + +body { + margin: 0; + background: linear-gradient(135deg, #eff4ff 0%, var(--bg) 45%, #ebfff9 100%); + color: var(--text); +} + +.container { + max-width: 980px; + margin: 24px auto; + padding: 0 16px; + display: grid; + gap: 16px; +} + +.card { + background: var(--card); + border: 1px solid var(--border); + border-radius: 14px; + padding: 20px; + box-shadow: 0 8px 24px rgba(35, 66, 132, 0.08); +} + +h1, h2 { margin-top: 0; } + +form { + display: grid; + gap: 10px; +} + +input, select, button { + border-radius: 10px; + border: 1px solid #cad8ef; + padding: 10px 12px; + font-size: 14px; +} + +button { + border: none; + background: var(--primary); + color: #fff; + cursor: pointer; +} + +pre { + margin: 0; + background: #0f1729; + color: #d8f4ff; + border-radius: 10px; + padding: 12px; + max-height: 340px; + overflow: auto; + white-space: pre-wrap; +} diff --git a/qa_system/templates/index.html b/qa_system/templates/index.html new file mode 100644 index 0000000..eb4cbf7 --- /dev/null +++ b/qa_system/templates/index.html @@ -0,0 +1,48 @@ + + + + + + 金融销售电话质检系统 + + + +
+
+

金融销售电话质检系统(离线)

+

上传录音文件后执行离线转写 + 说话人分离,输出可用于 sales/customer 对话分析的日志。

+ +
+ + + + + + + + + + + + + +
+
+ +
+

任务状态

+
尚未开始
+
+ +
+

说话人日志

+
等待结果...
+
+
+ + + + diff --git a/qa_system/tests/test_diarization.py b/qa_system/tests/test_diarization.py new file mode 100644 index 0000000..23055ed --- /dev/null +++ b/qa_system/tests/test_diarization.py @@ -0,0 +1,55 @@ +import unittest + +from qa_system.services.diarization import Diarizer, SpeakerSegment + + +class TestDiarizer(unittest.TestCase): + def test_diarize_with_funasr_maps_fields(self): + diarizer = Diarizer() + sentence_info = [ + {"spk": 1, "start": 0, "end": 1200, "text": "你好"}, + {"spk": 2, "start": 1200, "end": 2400, "text": "请问"}, + ] + + segments = diarizer.diarize_with_funasr(sentence_info) + + self.assertEqual(2, len(segments)) + self.assertEqual("speaker1", segments[0].speaker) + self.assertEqual(0, segments[0].start_ms) + self.assertEqual(1200, segments[0].end_ms) + self.assertEqual("你好", segments[0].text) + self.assertEqual("speaker2", segments[1].speaker) + + def test_merge_adjacent_merges_same_speaker_when_short(self): + diarizer = Diarizer() + merged = diarizer.merge_adjacent( + [ + SpeakerSegment("speaker1", 0, 1000, "短句"), + SpeakerSegment("speaker1", 1000, 1800, "继续"), + SpeakerSegment("speaker2", 1800, 2600, "切换"), + ], + merge_threshold_chars=10, + ) + + self.assertEqual(2, len(merged)) + self.assertEqual("短句继续", merged[0].text) + self.assertEqual(1800, merged[0].end_ms) + self.assertEqual("speaker2", merged[1].speaker) + + def test_merge_adjacent_keeps_long_text_unmerged(self): + diarizer = Diarizer() + merged = diarizer.merge_adjacent( + [ + SpeakerSegment("speaker1", 0, 1000, "这是一条比较长的句子"), + SpeakerSegment("speaker1", 1000, 1800, "后续"), + ], + merge_threshold_chars=4, + ) + + self.assertEqual(2, len(merged)) + self.assertEqual("这是一条比较长的句子", merged[0].text) + self.assertEqual("后续", merged[1].text) + + +if __name__ == "__main__": + unittest.main() From e98ef254ece4495ce2f4a9f75452b9c3df674759 Mon Sep 17 00:00:00 2001 From: Kyle Date: Mon, 23 Feb 2026 18:51:25 +0800 Subject: [PATCH 2/2] docs: add qa_system architecture and dataflow analysis --- qa_system/ARCHITECTURE_ANALYSIS.md | 84 ++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 qa_system/ARCHITECTURE_ANALYSIS.md diff --git a/qa_system/ARCHITECTURE_ANALYSIS.md b/qa_system/ARCHITECTURE_ANALYSIS.md new file mode 100644 index 0000000..a860242 --- /dev/null +++ b/qa_system/ARCHITECTURE_ANALYSIS.md @@ -0,0 +1,84 @@ +# QA System 架构与数据链路分析 + +## 1. 系统定位 + +`qa_system` 是一个本地离线语音质检 Web 原型: +- 后端用 FastAPI 提供上传、异步任务、结果轮询接口。 +- 核心能力是 FunASR 离线 ASR + 说话人分离。 +- 前端是单页表单 + 轮询状态,输出说话人日志。 + +## 2. 代码结构 + +- `qa_system/app.py`:应用入口、接口定义、任务调度、结果落盘与状态管理。 +- `qa_system/services/transcription.py`:离线转写总流程(音频预处理、模型推理、说话人分离策略选择、分段合并、日志格式化)。 +- `qa_system/services/diarization.py`:说话人分离抽象层,当前主路径为 FunASR 句级说话人标签映射,预留 pyannote 扩展。 +- `qa_system/templates/index.html`:上传与参数输入页面。 +- `qa_system/static/script.js`:前端提交任务 + 轮询 `/api/jobs/{job_id}`。 +- `qa_system/tests/test_diarization.py`:验证 diarizer 字段映射与合并逻辑。 + +## 3. 核心调用链路 + +1. 浏览器提交表单到 `POST /api/jobs`。 +2. 后端将上传文件保存到 `uploads//`。 +3. 后端在内存 `jobs[job_id]` 写入 `running` 状态,并将实际处理丢给线程池。 +4. 后台线程调用 `OfflineTranscriptionService.transcribe_batch()`: + - 扫描有效音/视频文件; + - ffmpeg 转成单声道 16k WAV 字节; + - 调用 FunASR `generate()` 返回文本与句级时间戳/说话人信息; + - 根据策略做 diarization(campp / pyannote); + - 做相邻同 speaker 短句合并; + - 产出 `TranscriptResult`。 +5. 后端把每个文件渲染为 `speaker + 时间戳 + 文本` 日志,写入 `results/.txt`。 +6. 内存状态更新为 `done`(或 `failed`)。 +7. 前端轮询 `GET /api/jobs/{job_id}`,拿到结果并展示。 + +## 4. 数据流转(输入 -> 中间态 -> 输出) + +### 4.1 输入层 + +- 文件输入:`files`(多文件) +- 参数输入: + - `diarization`(`campp` / `pyannote`) + - `merge_threshold_chars`(相邻同 speaker 合并阈值) + - `hotwords`(ASR 热词) + +### 4.2 处理中间态 + +- 文件落盘:`uploads//` +- 状态内存:`jobs[job_id] = {status, created_at, ...}` +- 音频中间表示:ffmpeg 输出 WAV bytes(单声道/16k) +- 模型中间结果: + - `infer_result["text"]` + - `infer_result["sentence_info"]`(包含 `spk/start/end/text`) +- 领域对象:`SpeakerSegment`、`TranscriptResult` + +### 4.3 输出层 + +- 文本文件:`results/.txt` +- API 输出: + - 创建任务返回 `job_id` + - 查询任务返回 `running/done/failed` + 结果或错误 +- 页面展示:任务状态 JSON + 说话人日志文本 + +## 5. qa_system 的关键设计点 + +- **离线优先**:ASR 主链路不依赖在线服务,模型从本地缓存加载。 +- **策略解耦**:说话人分离通过 `Diarizer` 统一接口封装,默认 CAM++,可切 pyannote。 +- **异步体验**:HTTP 接口快速返回 `job_id`,长任务在线程池执行,前端轮询。 +- **可解释产物**:输出按 speaker + 时间轴组织,便于人工质检或后处理。 + +## 6. 当前限制与工程风险 + +- `jobs` 使用进程内字典,重启丢失、无法多实例共享。 +- 线程任务数量固定(`max_workers=2`),高并发下排队明显。 +- `service.diarizer.strategy` 在共享单例上被请求动态修改,存在并发串扰风险。 +- pyannote 分支目前抛 `NotImplementedError`,前端可选但不可用。 +- 文件读写与任务结果没有生命周期清理策略(磁盘增长风险)。 + +## 7. 建议的演进方向 + +1. 任务状态迁移到持久化存储(SQLite/Redis + 队列)。 +2. 每个任务独立实例化转写服务或把策略作为函数参数透传,避免共享状态。 +3. 对 pyannote 选项做能力探测,不可用时在前端禁用并给出提示。 +4. 增加任务取消、超时、清理与审计日志。 +5. 增加端到端测试(上传 -> 完成 -> 结果格式断言)。