From 06c59541bfda2078b7e9098c920f29e9741d7b80 Mon Sep 17 00:00:00 2001 From: linuxuser Date: Thu, 7 May 2026 15:28:54 +0800 Subject: [PATCH] Incrementally resume long-PDF ingestion via cached PageIndex doc_id --- openkb/cli.py | 100 +++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 83 insertions(+), 17 deletions(-) diff --git a/openkb/cli.py b/openkb/cli.py index 7352fa71..b4d39cbb 100644 --- a/openkb/cli.py +++ b/openkb/cli.py @@ -128,7 +128,7 @@ def _find_kb_dir(override: Path | None = None) -> Path | None: return None -def add_single_file(file_path: Path, kb_dir: Path) -> None: +def add_single_file(file_path: Path, kb_dir: Path) -> bool: """Convert, index, and compile a single document into the knowledge base. Steps: @@ -136,10 +136,13 @@ def add_single_file(file_path: Path, kb_dir: Path) -> None: 2. Convert the document (hash-check; skip if already known). 3. If long doc: run PageIndex then compile_long_doc. 4. Else: compile_short_doc. + + Returns: + bool: True when processing completed or was skipped as already-known, + False on conversion/indexing/compilation error. """ from openkb.agent.compiler import compile_long_doc, compile_short_doc from openkb.state import HashRegistry - logger = logging.getLogger(__name__) openkb_dir = kb_dir / ".openkb" config = load_config(openkb_dir / "config.yaml") @@ -154,32 +157,60 @@ def add_single_file(file_path: Path, kb_dir: Path) -> None: except Exception as exc: click.echo(f" [ERROR] Conversion failed: {exc}") logger.debug("Conversion traceback:", exc_info=True) - return + return False if result.skipped: click.echo(f" [SKIP] Already in knowledge base: {file_path.name}") - return + return True doc_name = file_path.stem # 3/4. Index and compile if result.is_long_doc: - click.echo(f" Long document detected — indexing with PageIndex...") - try: - from openkb.indexer import index_long_document - index_result = index_long_document(result.raw_path, kb_dir) - except Exception as exc: - click.echo(f" [ERROR] Indexing failed: {exc}") - logger.debug("Indexing traceback:", exc_info=True) - return + jobs_path = openkb_dir / "long_pdf_jobs.json" + jobs = _load_long_pdf_jobs(jobs_path) + job_key = result.file_hash or str(result.raw_path) + job = jobs.get(job_key, {}) + reused_index = False + + if job.get("status") == "indexed" and job.get("doc_id"): + reused_index = True + index_doc_id = job["doc_id"] + index_description = job.get("description", "") + click.echo(f" Long document detected — reusing indexed doc_id={index_doc_id}.") + else: + click.echo(f" Long document detected — indexing with PageIndex...") + try: + from openkb.indexer import index_long_document + index_result = index_long_document(result.raw_path, kb_dir) + except Exception as exc: + click.echo(f" [ERROR] Indexing failed: {exc}") + logger.debug("Indexing traceback:", exc_info=True) + job["status"] = "index_failed" + job["doc_name"] = doc_name + job["updated_at"] = int(time.time()) + job["last_error"] = str(exc) + jobs[job_key] = job + _save_long_pdf_jobs(jobs_path, jobs) + return False + index_doc_id = index_result.doc_id + index_description = index_result.description + job["status"] = "indexed" + job["doc_id"] = index_doc_id + job["description"] = index_description + job["doc_name"] = doc_name + job["updated_at"] = int(time.time()) + job["last_error"] = "" + jobs[job_key] = job + _save_long_pdf_jobs(jobs_path, jobs) summary_path = kb_dir / "wiki" / "summaries" / f"{doc_name}.md" - click.echo(f" Compiling long doc (doc_id={index_result.doc_id})...") + click.echo(f" Compiling long doc (doc_id={index_doc_id})...") for attempt in range(2): try: asyncio.run( - compile_long_doc(doc_name, summary_path, index_result.doc_id, kb_dir, model, - doc_description=index_result.description) + compile_long_doc(doc_name, summary_path, index_doc_id, kb_dir, model, + doc_description=index_description) ) break except Exception as exc: @@ -189,7 +220,17 @@ def add_single_file(file_path: Path, kb_dir: Path) -> None: else: click.echo(f" [ERROR] Compilation failed: {exc}") logger.debug("Compilation traceback:", exc_info=True) - return + job["status"] = "compile_failed" + job["doc_name"] = doc_name + job["doc_id"] = index_doc_id + job["description"] = index_description + job["updated_at"] = int(time.time()) + job["last_error"] = str(exc) + jobs[job_key] = job + _save_long_pdf_jobs(jobs_path, jobs) + return False + if reused_index: + click.echo(" [OK] Reused existing PDF index.") else: click.echo(f" Compiling short doc...") for attempt in range(2): @@ -203,15 +244,40 @@ def add_single_file(file_path: Path, kb_dir: Path) -> None: else: click.echo(f" [ERROR] Compilation failed: {exc}") logger.debug("Compilation traceback:", exc_info=True) - return + return False # Register hash only after successful compilation if result.file_hash: doc_type = "long_pdf" if result.is_long_doc else file_path.suffix.lstrip(".") registry.add(result.file_hash, {"name": file_path.name, "type": doc_type}) + if result.is_long_doc: + jobs_path = openkb_dir / "long_pdf_jobs.json" + jobs = _load_long_pdf_jobs(jobs_path) + job = jobs.get(result.file_hash, {}) + job["status"] = "done" + job["doc_name"] = doc_name + job["updated_at"] = int(time.time()) + job["last_error"] = "" + jobs[result.file_hash] = job + _save_long_pdf_jobs(jobs_path, jobs) append_log(kb_dir / "wiki", "ingest", file_path.name) click.echo(f" [OK] {file_path.name} added to knowledge base.") + return True + + +def _load_long_pdf_jobs(path: Path) -> dict: + if path.exists(): + try: + return json.loads(path.read_text(encoding="utf-8")) + except Exception: + pass + return {} + + +def _save_long_pdf_jobs(path: Path, data: dict) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps(data, indent=2), encoding="utf-8") # ---------------------------------------------------------------------------