From e9aff5ee6f852027b3cc622721fba557a6b6b7fc Mon Sep 17 00:00:00 2001 From: Kyle Date: Mon, 23 Feb 2026 18:20:49 +0800 Subject: [PATCH 1/2] test: add diarization unit tests and clean pycache artifacts --- .gitignore | 2 + qa_system/README.md | 33 +++++++++ qa_system/__init__.py | 0 qa_system/app.py | 82 ++++++++++++++++++++++ qa_system/services/__init__.py | 0 qa_system/services/diarization.py | 60 +++++++++++++++++ qa_system/services/transcription.py | 101 ++++++++++++++++++++++++++++ qa_system/static/script.js | 42 ++++++++++++ qa_system/static/style.css | 63 +++++++++++++++++ qa_system/templates/index.html | 48 +++++++++++++ qa_system/tests/test_diarization.py | 55 +++++++++++++++ 11 files changed, 486 insertions(+) create mode 100644 .gitignore create mode 100644 qa_system/README.md create mode 100644 qa_system/__init__.py create mode 100644 qa_system/app.py create mode 100644 qa_system/services/__init__.py create mode 100644 qa_system/services/diarization.py create mode 100644 qa_system/services/transcription.py create mode 100644 qa_system/static/script.js create mode 100644 qa_system/static/style.css create mode 100644 qa_system/templates/index.html create mode 100644 qa_system/tests/test_diarization.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..43ae0e2 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +__pycache__/ +*.py[cod] diff --git a/qa_system/README.md b/qa_system/README.md new file mode 100644 index 0000000..5c1eb7b --- /dev/null +++ b/qa_system/README.md @@ -0,0 +1,33 @@ +# 金融销售电话质检系统(二次开发版) + +该目录是基于原项目核心能力(FunASR 离线转写 + 说话人分离)构建的 Web 版质检原型,面向“电话下单录音质检”场景。 + +## 功能 + +1. 支持录音文件批量上传。 +2. 支持说话人分离(默认 FunASR CAM++,并预留 pyannote 扩展位)。 +3. 支持普通离线自动转写。 +4. 输出说话人日志(speaker + 时间戳 + 文本),用于 sales/customer 角色分析。 + +## 启动 + +```bash +pip install fastapi uvicorn jinja2 python-multipart +# 以及原项目依赖 +pip install -U funasr modelscope ffmpeg-python pydub torch psutil + +uvicorn qa_system.app:app --host 0.0.0.0 --port 8000 --reload +``` + +浏览器访问:`http://127.0.0.1:8000` + +## 架构 + +- `app.py`: Web 接口、任务调度、状态轮询。 +- `services/transcription.py`: 离线转写主流程(FunASR + ffmpeg)。 +- `services/diarization.py`: 说话人分离抽象层(CAM++ / pyannote)。 +- `templates/ + static/`: 前端交互页面。 + +## pyannote 说明 + +当前代码中保留了 pyannote 接口抽象,但默认未启用完整 Pipeline 初始化。生产接入时请按你们私有环境增加 HuggingFace Token、模型与缓存策略。 diff --git a/qa_system/__init__.py b/qa_system/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/qa_system/app.py b/qa_system/app.py new file mode 100644 index 0000000..7f42b02 --- /dev/null +++ b/qa_system/app.py @@ -0,0 +1,82 @@ +from __future__ import annotations + +from concurrent.futures import ThreadPoolExecutor +from datetime import datetime +from pathlib import Path +from uuid import uuid4 + +from fastapi import FastAPI, File, Form, UploadFile +from fastapi.responses import HTMLResponse, JSONResponse +from fastapi.staticfiles import StaticFiles +from fastapi.templating import Jinja2Templates +from starlette.requests import Request + +from qa_system.services.transcription import OfflineTranscriptionService + +BASE_DIR = Path(__file__).resolve().parent +UPLOAD_DIR = BASE_DIR / "uploads" +RESULT_DIR = BASE_DIR / "results" +UPLOAD_DIR.mkdir(exist_ok=True) +RESULT_DIR.mkdir(exist_ok=True) + +app = FastAPI(title="金融销售电话质检系统") +app.mount("/static", StaticFiles(directory=str(BASE_DIR / "static")), name="static") +templates = Jinja2Templates(directory=str(BASE_DIR / "templates")) +executor = ThreadPoolExecutor(max_workers=2) +service = OfflineTranscriptionService() +jobs: dict[str, dict] = {} + + +@app.get("/", response_class=HTMLResponse) +def index(request: Request): + return templates.TemplateResponse("index.html", {"request": request}) + + +@app.post("/api/jobs") +async def create_job( + files: list[UploadFile] = File(...), + diarization: str = Form("campp"), + merge_threshold_chars: int = Form(12), + hotwords: str = Form(""), +): + job_id = uuid4().hex + job_dir = UPLOAD_DIR / job_id + job_dir.mkdir(exist_ok=True) + + source_paths: list[str] = [] + for file in files: + save_path = job_dir / file.filename + content = await file.read() + save_path.write_bytes(content) + source_paths.append(str(save_path)) + + jobs[job_id] = {"status": "running", "created_at": datetime.now().isoformat()} + + def _run() -> None: + try: + service.diarizer.strategy = diarization + transcripts = service.transcribe_batch( + source_paths=source_paths, + merge_threshold_chars=merge_threshold_chars, + hotwords=hotwords, + ) + result_file = RESULT_DIR / f"{job_id}.txt" + dialogue_logs = [service.render_dialogue_log(t) for t in transcripts] + result_file.write_text("\n\n".join(dialogue_logs), encoding="utf-8") + jobs[job_id] = { + "status": "done", + "result": result_file.read_text(encoding="utf-8"), + "files": [t.source_file for t in transcripts], + } + except Exception as exc: + jobs[job_id] = {"status": "failed", "error": str(exc)} + + executor.submit(_run) + return JSONResponse({"job_id": job_id, "status": jobs[job_id]["status"]}) + + +@app.get("/api/jobs/{job_id}") +def get_job(job_id: str): + if job_id not in jobs: + return JSONResponse({"error": "job not found"}, status_code=404) + return JSONResponse(jobs[job_id]) diff --git a/qa_system/services/__init__.py b/qa_system/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/qa_system/services/diarization.py b/qa_system/services/diarization.py new file mode 100644 index 0000000..dfb1b91 --- /dev/null +++ b/qa_system/services/diarization.py @@ -0,0 +1,60 @@ +from __future__ import annotations + +from dataclasses import dataclass +from importlib import import_module +from importlib.util import find_spec +from typing import List + + +@dataclass +class SpeakerSegment: + speaker: str + start_ms: int + end_ms: int + text: str + + +class Diarizer: + """Diarization abstraction supporting FunASR CAM++ first, then pyannote.""" + + def __init__(self, strategy: str = "campp") -> None: + self.strategy = strategy + + def diarize_with_funasr(self, sentence_info: list[dict]) -> List[SpeakerSegment]: + segments: List[SpeakerSegment] = [] + for sentence in sentence_info: + segments.append( + SpeakerSegment( + speaker=f"speaker{sentence['spk']}", + start_ms=int(sentence["start"]), + end_ms=int(sentence["end"]), + text=sentence["text"], + ) + ) + return segments + + def diarize_with_pyannote(self, audio_path: str) -> List[SpeakerSegment]: + if find_spec("pyannote.audio") is None: + raise RuntimeError("pyannote.audio 未安装,无法使用 pyannote 方案") + + pipeline_module = import_module("pyannote.audio") + Pipeline = getattr(pipeline_module, "Pipeline") + + raise NotImplementedError( + "pyannote 方案需要 HuggingFace Token 与模型配置,请在生产环境接入后启用。" + f"当前输入文件: {audio_path}" + ) + + def merge_adjacent(self, segments: List[SpeakerSegment], merge_threshold_chars: int = 12) -> List[SpeakerSegment]: + if not segments: + return [] + + merged = [segments[0]] + for current in segments[1:]: + previous = merged[-1] + if current.speaker == previous.speaker and len(previous.text) < merge_threshold_chars: + previous.text += current.text + previous.end_ms = current.end_ms + else: + merged.append(current) + return merged diff --git a/qa_system/services/transcription.py b/qa_system/services/transcription.py new file mode 100644 index 0000000..f2efdab --- /dev/null +++ b/qa_system/services/transcription.py @@ -0,0 +1,101 @@ +from __future__ import annotations + +import os +from dataclasses import dataclass +from datetime import timedelta +from pathlib import Path +from typing import Iterable, List + +import ffmpeg +import psutil +import torch +from funasr import AutoModel + +from qa_system.services.diarization import Diarizer, SpeakerSegment + + +@dataclass +class TranscriptResult: + source_file: str + full_text: str + segments: List[SpeakerSegment] + + +class OfflineTranscriptionService: + def __init__(self, model_root: Path | None = None, diarization_strategy: str = "campp") -> None: + home = Path.home() + base = model_root or home / ".cache" / "modelscope" / "hub" / "models" / "iic" + self.model = AutoModel( + model=str(base / "speech_seaco_paraformer_large_asr_nat-zh-cn-16k-common-vocab8404-pytorch"), + model_revision="v2.0.4", + vad_model=str(base / "speech_fsmn_vad_zh-cn-16k-common-pytorch"), + vad_model_revision="v2.0.4", + punc_model=str(base / "punc_ct-transformer_zh-cn-common-vocab272727-pytorch"), + punc_model_revision="v2.0.4", + spk_model=str(base / "speech_campplus_sv_zh-cn_16k-common"), + spk_model_revision="v2.0.4", + ngpu=1 if torch.cuda.is_available() else 0, + ncpu=psutil.cpu_count(), + disable_pbar=True, + disable_log=True, + disable_update=True, + ) + self.diarizer = Diarizer(strategy=diarization_strategy) + + @staticmethod + def _to_time(ms: int) -> str: + d = timedelta(milliseconds=ms) + return f"{d.seconds // 3600:02d}:{(d.seconds // 60) % 60:02d}:{d.seconds % 60:02d}.{d.microseconds // 1000:03d}" + + @staticmethod + def _iter_audio_files(paths: Iterable[str]) -> Iterable[str]: + support_ext = {".mp3", ".m4a", ".aac", ".ogg", ".wav", ".flac", ".wma", ".aif", ".mp4", ".avi", ".mov", ".mkv"} + for path in paths: + p = Path(path) + if p.is_file() and p.suffix.lower() in support_ext: + yield str(p) + if p.is_dir(): + for child in p.rglob("*"): + if child.is_file() and child.suffix.lower() in support_ext: + yield str(child) + + def transcribe_batch(self, source_paths: Iterable[str], merge_threshold_chars: int = 12, hotwords: str = "") -> List[TranscriptResult]: + results: List[TranscriptResult] = [] + for audio in self._iter_audio_files(source_paths): + audio_bytes, _ = ( + ffmpeg.input(audio, threads=0) + .output("-", format="wav", acodec="pcm_s16le", ac=1, ar=16000) + .run(cmd=["ffmpeg", "-nostdin"], capture_stdout=True, capture_stderr=True) + ) + + infer_result = self.model.generate( + input=audio_bytes, + batch_size_s=300, + is_final=True, + sentence_timestamp=True, + hotword=hotwords, + )[0] + + sentence_info = infer_result.get("sentence_info", []) + if self.diarizer.strategy == "pyannote": + segments = self.diarizer.diarize_with_pyannote(audio) + else: + segments = self.diarizer.diarize_with_funasr(sentence_info) + segments = self.diarizer.merge_adjacent(segments, merge_threshold_chars) + results.append( + TranscriptResult( + source_file=audio, + full_text=infer_result.get("text", ""), + segments=segments, + ) + ) + return results + + @classmethod + def render_dialogue_log(cls, transcript: TranscriptResult) -> str: + rows = [f"# 文件: {os.path.basename(transcript.source_file)}"] + for seg in transcript.segments: + start = cls._to_time(seg.start_ms) + end = cls._to_time(seg.end_ms) + rows.append(f"{seg.speaker} [{start} --> {end}]:{seg.text}") + return "\n".join(rows) diff --git a/qa_system/static/script.js b/qa_system/static/script.js new file mode 100644 index 0000000..5cabaff --- /dev/null +++ b/qa_system/static/script.js @@ -0,0 +1,42 @@ +const form = document.getElementById("job-form"); +const statusEl = document.getElementById("status"); +const resultEl = document.getElementById("result"); + +const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms)); + +async function pollJob(jobId) { + while (true) { + const response = await fetch(`/api/jobs/${jobId}`); + const data = await response.json(); + statusEl.textContent = JSON.stringify(data, null, 2); + + if (data.status === "done") { + resultEl.textContent = data.result || "任务完成,但没有文本输出"; + return; + } + + if (data.status === "failed") { + resultEl.textContent = `任务失败:${data.error || "未知错误"}`; + return; + } + + await sleep(1500); + } +} + +form.addEventListener("submit", async (event) => { + event.preventDefault(); + resultEl.textContent = "处理中,请稍候..."; + + const payload = new FormData(form); + const response = await fetch("/api/jobs", { method: "POST", body: payload }); + const data = await response.json(); + statusEl.textContent = JSON.stringify(data, null, 2); + + if (!data.job_id) { + resultEl.textContent = `任务创建失败:${JSON.stringify(data)}`; + return; + } + + await pollJob(data.job_id); +}); diff --git a/qa_system/static/style.css b/qa_system/static/style.css new file mode 100644 index 0000000..3d77977 --- /dev/null +++ b/qa_system/static/style.css @@ -0,0 +1,63 @@ +:root { + color-scheme: light; + font-family: "Inter", "PingFang SC", "Microsoft YaHei", sans-serif; + --bg: #f5f7fb; + --card: #ffffff; + --text: #19253a; + --primary: #2a6df5; + --border: #dbe5f4; +} + +body { + margin: 0; + background: linear-gradient(135deg, #eff4ff 0%, var(--bg) 45%, #ebfff9 100%); + color: var(--text); +} + +.container { + max-width: 980px; + margin: 24px auto; + padding: 0 16px; + display: grid; + gap: 16px; +} + +.card { + background: var(--card); + border: 1px solid var(--border); + border-radius: 14px; + padding: 20px; + box-shadow: 0 8px 24px rgba(35, 66, 132, 0.08); +} + +h1, h2 { margin-top: 0; } + +form { + display: grid; + gap: 10px; +} + +input, select, button { + border-radius: 10px; + border: 1px solid #cad8ef; + padding: 10px 12px; + font-size: 14px; +} + +button { + border: none; + background: var(--primary); + color: #fff; + cursor: pointer; +} + +pre { + margin: 0; + background: #0f1729; + color: #d8f4ff; + border-radius: 10px; + padding: 12px; + max-height: 340px; + overflow: auto; + white-space: pre-wrap; +} diff --git a/qa_system/templates/index.html b/qa_system/templates/index.html new file mode 100644 index 0000000..eb4cbf7 --- /dev/null +++ b/qa_system/templates/index.html @@ -0,0 +1,48 @@ + + + + + + 金融销售电话质检系统 + + + +
+
+

金融销售电话质检系统(离线)

+

上传录音文件后执行离线转写 + 说话人分离,输出可用于 sales/customer 对话分析的日志。

+ +
+ + + + + + + + + + + + + +
+
+ +
+

任务状态

+
尚未开始
+
+ +
+

说话人日志

+
等待结果...
+
+
+ + + + diff --git a/qa_system/tests/test_diarization.py b/qa_system/tests/test_diarization.py new file mode 100644 index 0000000..23055ed --- /dev/null +++ b/qa_system/tests/test_diarization.py @@ -0,0 +1,55 @@ +import unittest + +from qa_system.services.diarization import Diarizer, SpeakerSegment + + +class TestDiarizer(unittest.TestCase): + def test_diarize_with_funasr_maps_fields(self): + diarizer = Diarizer() + sentence_info = [ + {"spk": 1, "start": 0, "end": 1200, "text": "你好"}, + {"spk": 2, "start": 1200, "end": 2400, "text": "请问"}, + ] + + segments = diarizer.diarize_with_funasr(sentence_info) + + self.assertEqual(2, len(segments)) + self.assertEqual("speaker1", segments[0].speaker) + self.assertEqual(0, segments[0].start_ms) + self.assertEqual(1200, segments[0].end_ms) + self.assertEqual("你好", segments[0].text) + self.assertEqual("speaker2", segments[1].speaker) + + def test_merge_adjacent_merges_same_speaker_when_short(self): + diarizer = Diarizer() + merged = diarizer.merge_adjacent( + [ + SpeakerSegment("speaker1", 0, 1000, "短句"), + SpeakerSegment("speaker1", 1000, 1800, "继续"), + SpeakerSegment("speaker2", 1800, 2600, "切换"), + ], + merge_threshold_chars=10, + ) + + self.assertEqual(2, len(merged)) + self.assertEqual("短句继续", merged[0].text) + self.assertEqual(1800, merged[0].end_ms) + self.assertEqual("speaker2", merged[1].speaker) + + def test_merge_adjacent_keeps_long_text_unmerged(self): + diarizer = Diarizer() + merged = diarizer.merge_adjacent( + [ + SpeakerSegment("speaker1", 0, 1000, "这是一条比较长的句子"), + SpeakerSegment("speaker1", 1000, 1800, "后续"), + ], + merge_threshold_chars=4, + ) + + self.assertEqual(2, len(merged)) + self.assertEqual("这是一条比较长的句子", merged[0].text) + self.assertEqual("后续", merged[1].text) + + +if __name__ == "__main__": + unittest.main() From c282281702050ba56e4d9600c08e138d23ecae0d Mon Sep 17 00:00:00 2001 From: Kyle Date: Mon, 23 Feb 2026 21:31:14 +0800 Subject: [PATCH 2/2] feat: add MCP tool server design for speaker-aware transcription --- qa_system/ARCHITECTURE_ANALYSIS.md | 101 ++++++++++++++++++++++++++++ qa_system/MCP_INTEGRATION_DESIGN.md | 46 +++++++++++++ qa_system/README.md | 28 ++++++++ qa_system/mcp_tool_server.py | 58 ++++++++++++++++ 4 files changed, 233 insertions(+) create mode 100644 qa_system/ARCHITECTURE_ANALYSIS.md create mode 100644 qa_system/MCP_INTEGRATION_DESIGN.md create mode 100644 qa_system/mcp_tool_server.py diff --git a/qa_system/ARCHITECTURE_ANALYSIS.md b/qa_system/ARCHITECTURE_ANALYSIS.md new file mode 100644 index 0000000..80da50e --- /dev/null +++ b/qa_system/ARCHITECTURE_ANALYSIS.md @@ -0,0 +1,101 @@ +# qa_system 代码结构与链路分析 + +## 1. UNDERSTAND(理解) +项目 `qa_system` 的核心问题是: +- 在**本地离线**条件下,把上传的录音文件进行 ASR 转写; +- 结合说话人分离能力,把文本按说话人切分; +- 以可质检的日志格式输出,支持后续销售/客户话术分析。 + +它本质上是一个“Web 任务壳 + 离线语音处理引擎”的组合系统。 + +## 2. ANALYZE(分析) + +### 2.1 分层结构 +- 接入层(HTTP + 页面):`qa_system/app.py`、`templates/index.html`、`static/script.js` +- 领域服务层(转写编排):`services/transcription.py` +- 说话人策略层(可替换实现):`services/diarization.py` +- 测试层:`tests/test_diarization.py` + +### 2.2 关键对象 +- `OfflineTranscriptionService` + - 负责模型初始化(FunASR `AutoModel`) + - 批处理音频、调用 ffmpeg 统一采样、执行转写 + - 调用 `Diarizer` 进行 speaker 归因与段落合并 +- `Diarizer` + - `campp`:基于 FunASR 返回的 `sentence_info` 映射 + - `pyannote`:保留扩展入口,当前抛出 `NotImplementedError` +- `jobs`(内存字典) + - job 状态机:`running -> done/failed` + - 结果缓存与轮询查询 + +### 2.3 外部依赖角色 +- `FastAPI`:接口与模板渲染 +- `ffmpeg-python`:音频预处理(转 16k 单声道 wav) +- `funasr` + `torch`:离线转写与 speaker 信息生成 +- `psutil`:CPU 核数探测 + +## 3. REASON(推理) + +### 3.1 端到端调用链(主链路) +1. 前端提交表单(多文件 + diarization + merge_threshold + hotwords)到 `POST /api/jobs`。 +2. 后端保存上传文件到 `uploads//`,生成 job 记录并异步投递线程池。 +3. 后台线程调用 `service.transcribe_batch(...)`: + - 遍历可识别音/视频文件; + - 每个文件先经 ffmpeg 统一解码; + - 调用 FunASR `generate` 获取 `text` 和 `sentence_info`; + - 根据策略分支: + - `campp`:从 `sentence_info` 直接映射 speaker 段; + - `pyannote`:当前未落地,会报错; + - 对短句相邻同 speaker 段做合并; + - 产出 `TranscriptResult`。 +4. 后端将每个文件转成可读日志(`speaker + 时间戳 + 文本`),写入 `results/.txt`。 +5. 前端轮询 `GET /api/jobs/{job_id}`,直到 `done`/`failed`,渲染结果文本。 + +### 3.2 数据流(字段级) +- 输入:浏览器 `multipart/form-data` + - `files[]` + - `diarization` + - `merge_threshold_chars` + - `hotwords` +- 中间:`jobs[job_id]` + - 初始:`status=running` + - 成功:`status=done, result, files` + - 失败:`status=failed, error` +- 模型中间态: + - `infer_result.text` + - `infer_result.sentence_info[]`(含 `spk/start/end/text`) +- 输出: + - API JSON(轮询态) + - 落盘文本日志(质检可读) + +### 3.3 当前工程权衡 +- 优点: + - 结构清晰、接口简洁、离线可运行; + - speaker 策略做了抽象,有后续扩展位; + - 前后端耦合低(纯 JSON 轮询)。 +- 风险/限制: + - `jobs` 在内存中,进程重启丢失任务; + - 线程池仅 2 worker,吞吐受限; + - `service.diarizer.strategy` 在并发下会被全局覆写(竞态风险); + - `pyannote` 分支尚未实现; + - 缺少端到端集成测试与异常可观测性(日志/指标)。 + +## 4. SYNTHESIZE(综合) + +### 4.1 当前应用结构可以理解为 +- **控制平面**:FastAPI + job 状态管理 + 页面轮询 +- **数据平面**:ffmpeg 预处理 + FunASR 推理 + diarizer 后处理 +- **输出平面**:文本日志与 JSON 状态 + +### 4.2 建议的工程化增强路径(按优先级) +1. 并发安全:将 `diarization` 作为 `transcribe_batch` 的入参,而不是修改共享 service 实例属性。 +2. 任务持久化:把 `jobs` 落到 SQLite/Redis,支持重启恢复。 +3. 任务队列化:引入后台队列(RQ/Celery)替代进程内线程池。 +4. 可观测性:增加结构化日志、任务耗时、失败原因分类。 +5. pyannote 落地:补齐 token、模型缓存与 pipeline 初始化。 + +## 5. CONCLUDE(总结) +`qa_system` 已具备一个可用的离线语音质检原型骨架: +- 上传 -> 异步转写 -> speaker 分离 -> 日志输出闭环完整; +- 代码边界(Web / 转写 / diarization)基本合理; +- 后续应优先补“并发安全 + 持久化 + pyannote 实现”,即可从原型走向稳定可运维版本。 diff --git a/qa_system/MCP_INTEGRATION_DESIGN.md b/qa_system/MCP_INTEGRATION_DESIGN.md new file mode 100644 index 0000000..8639b8b --- /dev/null +++ b/qa_system/MCP_INTEGRATION_DESIGN.md @@ -0,0 +1,46 @@ +# qa_system 封装为 MCP Tool 的设计方案 + +## 1. 目标 +把现有离线 ASR + 说话人分离服务封装成一个 MCP Tool,供外部 Agent/LLM 通过自然语言触发: + +- 输入示例:`帮我转录一下录音文件“/data/audios/call_001.wav”` +- 输出:带 speaker 标签与时间戳的说话人日志文本 + +## 2. 工具设计 + +### 2.1 Tool 名称 +- `transcribe_audio_to_speaker_log` + +### 2.2 输入契约 +- `prompt: str`(必填):自然语言指令,内部自动提取音频路径 +- `diarization: str = "campp"` +- `merge_threshold_chars: int = 12` +- `hotwords: str = ""` + +### 2.3 输出契约 +- `str`: + - `# 文件: xxx.wav` + - `speaker1 [00:00:00.000 --> 00:00:01.230]:...` + +## 3. 处理链路 +1. MCP Agent 调用 tool,传入自然语言 prompt。 +2. Tool 从 prompt 中提取文件路径(优先引号内容,其次绝对/相对路径 token)。 +3. 校验路径有效性。 +4. 调用 `OfflineTranscriptionService.transcribe_batch([path], ...)`。 +5. 用 `render_dialogue_log` 生成最终文本并返回给 Agent/LLM。 + +## 4. 当前实现说明 +- 代码文件:`qa_system/mcp_tool_server.py` +- 使用 `FastMCP` 注册 tool。 +- 采用单例 `OfflineTranscriptionService` + 互斥锁,避免并发下策略切换冲突。 + +## 5. 工程化建议 +1. 安全:新增路径白名单(例如仅允许 `/data/uploads`)。 +2. 稳定性:把 diarization 策略改为调用级参数,避免修改共享对象状态。 +3. 可扩展:新增批量工具 `transcribe_batch_to_speaker_logs(paths: list[str])`。 +4. 结果结构化:返回 JSON(文件、段落数组、耗时)并附带纯文本版本。 + +## 6. 与现有 Web 服务关系 +- Web 版(FastAPI)用于人工上传与可视化轮询。 +- MCP 版用于程序化调用(Agent/LLM)。 +- 两者复用同一套核心服务层(`services/transcription.py` + `services/diarization.py`)。 diff --git a/qa_system/README.md b/qa_system/README.md index 5c1eb7b..88df03c 100644 --- a/qa_system/README.md +++ b/qa_system/README.md @@ -31,3 +31,31 @@ uvicorn qa_system.app:app --host 0.0.0.0 --port 8000 --reload ## pyannote 说明 当前代码中保留了 pyannote 接口抽象,但默认未启用完整 Pipeline 初始化。生产接入时请按你们私有环境增加 HuggingFace Token、模型与缓存策略。 + + +## MCP Tool 封装(供外部 Agent/LLM 调用) + +新增 `qa_system/mcp_tool_server.py`,可将离线转写能力直接暴露为 MCP Tool: + +- Tool 名称:`transcribe_audio_to_speaker_log` +- 典型调用:`帮我转录一下录音文件“/path/to/audio.wav”` +- 返回:说话人区分的日志文本(`speaker + 时间戳 + 文本`) + +### 运行 + +```bash +pip install mcp +python -m qa_system.mcp_tool_server +``` + +### Tool 入参 + +- `prompt: str`:自然语言指令,支持从引号中提取路径。 +- `diarization: str = "campp"`:`campp` 或 `pyannote`。 +- `merge_threshold_chars: int = 12`:相邻同 speaker 合并阈值。 +- `hotwords: str = ""`:热词,空格分隔。 + +### 使用建议 + +- 生产场景建议仅开放白名单目录,避免任意路径读取。 +- 当前 `pyannote` 仍需你们私有环境配置后启用。 diff --git a/qa_system/mcp_tool_server.py b/qa_system/mcp_tool_server.py new file mode 100644 index 0000000..1e8b693 --- /dev/null +++ b/qa_system/mcp_tool_server.py @@ -0,0 +1,58 @@ +from __future__ import annotations + +import threading +from pathlib import Path + +from mcp.server.fastmcp import FastMCP + +from qa_system.services.transcription import OfflineTranscriptionService + +mcp = FastMCP("qa-system-asr") +_service_lock = threading.Lock() +_service = OfflineTranscriptionService() + + +def _extract_path_from_prompt(prompt: str) -> str: + quote_chars = ['"', '“', '”', "'", '‘', '’'] + for q in quote_chars: + if q in prompt: + parts = prompt.split(q) + if len(parts) >= 3 and parts[1].strip(): + return parts[1].strip() + + for token in prompt.split(): + if token.startswith(("/", "./", "../")): + return token.strip(",。,.!!") + + return prompt.strip() + + +@mcp.tool() +def transcribe_audio_to_speaker_log( + prompt: str, + diarization: str = "campp", + merge_threshold_chars: int = 12, + hotwords: str = "", +) -> str: + """根据自然语言 prompt 转写本地录音,并返回说话人日志文本。""" + audio_path = _extract_path_from_prompt(prompt) + path = Path(audio_path).expanduser().resolve() + if not path.exists() or not path.is_file(): + raise ValueError(f"音频文件不存在或不是文件: {path}") + + with _service_lock: + _service.diarizer.strategy = diarization + results = _service.transcribe_batch( + source_paths=[str(path)], + merge_threshold_chars=merge_threshold_chars, + hotwords=hotwords, + ) + + if not results: + raise RuntimeError(f"未识别到可处理的音频文件: {path}") + + return _service.render_dialogue_log(results[0]) + + +if __name__ == "__main__": + mcp.run()