diff --git a/openkb/cli.py b/openkb/cli.py
index 7352fa71..f2ae63d1 100644
--- a/openkb/cli.py
+++ b/openkb/cli.py
@@ -128,7 +128,16 @@ def _find_kb_dir(override: Path | None = None) -> Path | None:
     return None
 
 
-def add_single_file(file_path: Path, kb_dir: Path) -> None:
+def _close_litellm_async_clients() -> None:
+    """Best-effort cleanup of cached LiteLLM async clients."""
+    try:
+        asyncio.run(litellm.close_litellm_async_clients())
+    except Exception:
+        # Cleanup must never break the main workflow.
+        pass
+
+
+def add_single_file(file_path: Path, kb_dir: Path) -> bool:
     """Convert, index, and compile a single document into the knowledge base.
 
     Steps:
@@ -136,10 +145,13 @@ def add_single_file(file_path: Path, kb_dir: Path) -> None:
     2. Convert the document (hash-check; skip if already known).
     3. If long doc: run PageIndex then compile_long_doc.
    4. Else: compile_short_doc.
+
+    Returns:
+        bool: True when processing completed or the file was skipped as
+        already known; False on a conversion, indexing, or compilation error.
     """
     from openkb.agent.compiler import compile_long_doc, compile_short_doc
     from openkb.state import HashRegistry
 
-    logger = logging.getLogger(__name__)
     openkb_dir = kb_dir / ".openkb"
     config = load_config(openkb_dir / "config.yaml")
@@ -154,33 +166,95 @@ def add_single_file(file_path: Path, kb_dir: Path) -> None:
     except Exception as exc:
         click.echo(f"  [ERROR] Conversion failed: {exc}")
         logger.debug("Conversion traceback:", exc_info=True)
-        return
+        return False
 
     if result.skipped:
         click.echo(f"  [SKIP] Already in knowledge base: {file_path.name}")
-        return
+        return True
 
-    doc_name = file_path.stem
+    doc_name = result.raw_path.stem if result.raw_path is not None else file_path.stem
 
     # 3/4. Index and compile
     if result.is_long_doc:
-        click.echo(f"  Long document detected — indexing with PageIndex...")
-        try:
-            from openkb.indexer import index_long_document
-            index_result = index_long_document(result.raw_path, kb_dir)
-        except Exception as exc:
-            click.echo(f"  [ERROR] Indexing failed: {exc}")
-            logger.debug("Indexing traceback:", exc_info=True)
-            return
+        jobs_path = openkb_dir / "long_pdf_jobs.json"
+        jobs = _load_long_pdf_jobs(jobs_path)
+        job_key = result.file_hash or str(result.raw_path)
+        job = jobs.get(job_key, {})
+        reused_index = False
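+
+        # Job lifecycle: "indexed" -> "done" on success. Failure exits record
+        # "index_failed", "fallback_short_doc", or "compile_failed", so a
+        # re-run can reuse a finished index (checked below) or inspect
+        # last_error.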
click.echo(f" [ERROR] Fallback short-doc conversion failed: {fallback_exc}") + logger.debug("Fallback conversion traceback:", exc_info=True) + job["status"] = "index_failed" + job["doc_name"] = doc_name + job["updated_at"] = int(time.time()) + job["last_error"] = f"{exc} | fallback_failed: {fallback_exc}" + jobs[job_key] = job + _save_long_pdf_jobs(jobs_path, jobs) + return False + + job["status"] = "fallback_short_doc" + job["doc_name"] = doc_name + job["updated_at"] = int(time.time()) + job["last_error"] = str(exc) + jobs[job_key] = job + _save_long_pdf_jobs(jobs_path, jobs) + click.echo(" [OK] Added via short-doc fallback.") + if result.file_hash: + registry.add(result.file_hash, {"name": result.raw_path.name, "type": "pdf"}) + return True + index_doc_id = index_result.doc_id + index_description = index_result.description + job["status"] = "indexed" + job["doc_id"] = index_doc_id + job["description"] = index_description + job["doc_name"] = doc_name + job["updated_at"] = int(time.time()) + job["last_error"] = "" + jobs[job_key] = job + _save_long_pdf_jobs(jobs_path, jobs) summary_path = kb_dir / "wiki" / "summaries" / f"{doc_name}.md" - click.echo(f" Compiling long doc (doc_id={index_result.doc_id})...") + click.echo(f" Compiling long doc (doc_id={index_doc_id})...") for attempt in range(2): try: - asyncio.run( - compile_long_doc(doc_name, summary_path, index_result.doc_id, kb_dir, model, - doc_description=index_result.description) - ) + try: + asyncio.run( + compile_long_doc(doc_name, summary_path, index_doc_id, kb_dir, model, + doc_description=index_description) + ) + finally: + _close_litellm_async_clients() break except Exception as exc: if attempt == 0: @@ -189,12 +263,25 @@ def add_single_file(file_path: Path, kb_dir: Path) -> None: else: click.echo(f" [ERROR] Compilation failed: {exc}") logger.debug("Compilation traceback:", exc_info=True) - return + job["status"] = "compile_failed" + job["doc_name"] = doc_name + job["doc_id"] = index_doc_id + job["description"] = index_description + job["updated_at"] = int(time.time()) + job["last_error"] = str(exc) + jobs[job_key] = job + _save_long_pdf_jobs(jobs_path, jobs) + return False + if reused_index: + click.echo(" [OK] Reused existing PDF index.") else: click.echo(f" Compiling short doc...") for attempt in range(2): try: - asyncio.run(compile_short_doc(doc_name, result.source_path, kb_dir, model)) + try: + asyncio.run(compile_short_doc(doc_name, result.source_path, kb_dir, model)) + finally: + _close_litellm_async_clients() break except Exception as exc: if attempt == 0: @@ -203,15 +290,40 @@ def add_single_file(file_path: Path, kb_dir: Path) -> None: else: click.echo(f" [ERROR] Compilation failed: {exc}") logger.debug("Compilation traceback:", exc_info=True) - return + return False # Register hash only after successful compilation if result.file_hash: doc_type = "long_pdf" if result.is_long_doc else file_path.suffix.lstrip(".") registry.add(result.file_hash, {"name": file_path.name, "type": doc_type}) + if result.is_long_doc: + jobs_path = openkb_dir / "long_pdf_jobs.json" + jobs = _load_long_pdf_jobs(jobs_path) + job = jobs.get(result.file_hash, {}) + job["status"] = "done" + job["doc_name"] = doc_name + job["updated_at"] = int(time.time()) + job["last_error"] = "" + jobs[result.file_hash] = job + _save_long_pdf_jobs(jobs_path, jobs) append_log(kb_dir / "wiki", "ingest", file_path.name) click.echo(f" [OK] {file_path.name} added to knowledge base.") + return True + + +def _load_long_pdf_jobs(path: Path) -> dict: + if path.exists(): + 
 
 
 # ---------------------------------------------------------------------------
diff --git a/openkb/converter.py b/openkb/converter.py
index 3f5f5299..12c804aa 100644
--- a/openkb/converter.py
+++ b/openkb/converter.py
@@ -8,6 +8,7 @@
 import pymupdf
 
 from markitdown import MarkItDown
+from openpyxl import load_workbook
 
 from openkb.config import load_config
 from openkb.images import copy_relative_images, extract_base64_images, convert_pdf_with_images
@@ -27,12 +28,70 @@ class ConvertResult:
     file_hash: str | None = None  # For deferred hash registration
 
 
+def _sanitize_filename(name: str) -> str:
+    """Normalize filename quirks that can break downstream indexing/matching."""
+    cleaned = name.strip()
+    cleaned = " ".join(cleaned.split())
+    return cleaned or name
+
+
 def get_pdf_page_count(path: Path) -> int:
     """Return the number of pages in the PDF at *path* using pymupdf."""
     with pymupdf.open(str(path)) as doc:
         return doc.page_count
 
 
+def convert_xlsx_streaming(path: Path, *, max_rows: int = 5000, max_cols: int = 64) -> str:
+    """Convert .xlsx to markdown using a memory-safe streaming reader.
+
+    This avoids large in-memory DataFrames and pathological worksheet ranges
+    that can cause extreme RAM usage in generic converters.
+    """
+    wb = load_workbook(filename=str(path), read_only=True, data_only=True)
+    lines: list[str] = []
+    try:
+        for ws in wb.worksheets:
+            lines.append(f"# Sheet: {ws.title}")
+            lines.append("")
+            rows_written = 0
+            empty_streak = 0
+
+            for row in ws.iter_rows(min_row=1, max_row=max_rows, min_col=1, max_col=max_cols, values_only=True):
+                vals = []
+                non_empty = False
+                for cell in row:
+                    if cell is None:
+                        vals.append("")
+                        continue
+                    text = str(cell).strip()
+                    vals.append(text)
+                    if text:
+                        non_empty = True
+
+                if not non_empty:
+                    empty_streak += 1
+                    if rows_written == 0:
+                        continue
+                    # Stop after a sustained empty tail to avoid scanning sparse sheets.
+                    if empty_streak >= 200:
+                        break
+                else:
+                    empty_streak = 0
+
+                if non_empty:
+                    lines.append(" | ".join(vals).rstrip())
+                    rows_written += 1
+
+            if rows_written == 0:
+                lines.append("(No non-empty cells found within scan limits.)")
+
+            lines.append("")
+    finally:
+        wb.close()
+
+    return "\n".join(lines)
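+
+
+# Minimal usage sketch (illustrative; the workbook path is made up):
+#
+#     md = convert_xlsx_streaming(Path("data/budget.xlsx"), max_rows=1000)
+#     # One "# Sheet: <title>" heading per worksheet, then one "a | b | c"
+#     # line per non-empty row, capped by max_rows/max_cols.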
+
+
 def convert_document(src: Path, kb_dir: Path) -> ConvertResult:
     """Convert a document and integrate it into the knowledge base.
@@ -65,7 +124,7 @@ def convert_document(src: Path, kb_dir: Path) -> ConvertResult:
     # ------------------------------------------------------------------
     raw_dir = kb_dir / "raw"
     raw_dir.mkdir(parents=True, exist_ok=True)
-    raw_dest = raw_dir / src.name
+    raw_dest = raw_dir / _sanitize_filename(src.name)
     if raw_dest.resolve() != src.resolve():
         shutil.copy2(src, raw_dest)
 
@@ -91,7 +150,7 @@ def convert_document(src: Path, kb_dir: Path) -> ConvertResult:
     images_dir = kb_dir / "wiki" / "sources" / "images" / src.stem
     images_dir.mkdir(parents=True, exist_ok=True)
 
-    doc_name = src.stem
+    doc_name = raw_dest.stem
 
     if src.suffix.lower() == ".md":
         markdown = src.read_text(encoding="utf-8")
@@ -99,6 +158,8 @@ def convert_document(src: Path, kb_dir: Path) -> ConvertResult:
     elif src.suffix.lower() == ".pdf":
         # Use pymupdf dict-mode for PDFs: text + images inline at correct positions
         markdown = convert_pdf_with_images(src, doc_name, images_dir)
+    elif src.suffix.lower() == ".xlsx":
+        markdown = convert_xlsx_streaming(src)
     else:
         # Non-PDF, non-MD: use markitdown (docx, pptx, html, etc.)
         mid = MarkItDown()
diff --git a/openkb/indexer.py b/openkb/indexer.py
index 6ea9d73f..e788a672 100644
--- a/openkb/indexer.py
+++ b/openkb/indexer.py
@@ -3,6 +3,7 @@
 import json as json_mod
 import logging
+import gc
 
 from dataclasses import dataclass
 from pathlib import Path
@@ -17,6 +18,21 @@
 logger = logging.getLogger(__name__)
 
 
+def _close_pageindex_client(client: PageIndexClient | None) -> None:
+    """Best-effort cleanup for PageIndex local backend resources."""
+    if client is None:
+        return
+    try:
+        backend = getattr(client, "_backend", None)
+        storage = getattr(backend, "_storage", None)
+        close_fn = getattr(storage, "close", None)
+        if callable(close_fn):
+            close_fn()
+    except Exception:
+        # Cleanup failure should never mask indexing errors.
+        pass
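+
+# NOTE: _backend and _storage are private PageIndex attributes; the getattr
+# chain above degrades to a silent no-op if those internals change.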
+
+
 @dataclass
 class IndexResult:
     """Result of indexing a long document via PageIndex."""
@@ -40,74 +56,87 @@ def index_long_document(pdf_path: Path, kb_dir: Path) -> IndexResult:
         if_add_doc_description=True,
     )
 
-    client = PageIndexClient(
-        api_key=pageindex_api_key or None,
-        model=model,
-        storage_path=str(openkb_dir),
-        index_config=index_config,
-    )
-    col = client.collection()
-
     # Add PDF (retry up to 3 times — PageIndex TOC accuracy is stochastic)
     max_retries = 3
     doc_id = None
+    client: PageIndexClient | None = None
+    col = None
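+    # The client is (re)built inside the loop and torn down in the except
+    # branch, so each retry starts from a fresh backend rather than reusing
+    # state left behind by a failed attempt.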
     for attempt in range(1, max_retries + 1):
         try:
+            client = PageIndexClient(
+                api_key=pageindex_api_key or None,
+                model=model,
+                storage_path=str(openkb_dir),
+                index_config=index_config,
+            )
+            col = client.collection()
             doc_id = col.add(str(pdf_path))
             logger.info("PageIndex added %s → doc_id=%s (attempt %d)", pdf_path.name, doc_id, attempt)
             break
         except Exception as exc:
-            logger.warning("PageIndex attempt %d/%d failed for %s: %s", attempt, max_retries, pdf_path.name, exc)
+            logger.exception(
+                "PageIndex attempt %d/%d failed for %s",
+                attempt,
+                max_retries,
+                pdf_path.name,
+            )
+            _close_pageindex_client(client)
+            client = None
+            col = None
+            gc.collect()
             if attempt == max_retries:
                 raise RuntimeError(f"Failed to index {pdf_path.name} after {max_retries} attempts: {exc}") from exc
 
-    # Fetch complete document (metadata + structure + text)
-    doc = col.get_document(doc_id, include_text=True)
-    doc_name: str = doc.get("doc_name", pdf_path.stem)
-    description: str = doc.get("doc_description", "")
-    structure: list = doc.get("structure", [])
-
-    # Debug: print doc keys and page_count to diagnose get_page_content range
-    logger.info("Doc keys: %s", list(doc.keys()))
-    logger.info("page_count from doc: %s", doc.get("page_count", "NOT PRESENT"))
-
-    tree = {
-        "doc_name": doc_name,
-        "doc_description": description,
-        "structure": structure,
-    }
-
-    # Write wiki/sources/ — per-page content
-    sources_dir = kb_dir / "wiki" / "sources"
-    sources_dir.mkdir(parents=True, exist_ok=True)
-    images_dir = sources_dir / "images" / pdf_path.stem
-
-    from openkb.images import convert_pdf_to_pages
-
-    all_pages: list = []
-    if pageindex_api_key:
-        # Cloud mode: fetch OCR'd markdown from PageIndex. get_page_content
-        # requires a page range, so pass "1-N".
-        from openkb.converter import get_pdf_page_count
-        page_count = get_pdf_page_count(pdf_path)
-        try:
-            all_pages = col.get_page_content(doc_id, f"1-{page_count}")
-        except Exception as exc:
-            logger.warning("Cloud get_page_content failed for %s: %s", pdf_path.name, exc)
+    try:
+        # Fetch complete document (metadata + structure + text)
+        doc = col.get_document(doc_id, include_text=True)
+        doc_name: str = doc.get("doc_name", pdf_path.stem)
+        description: str = doc.get("doc_description", "")
+        structure: list = doc.get("structure", [])
 
-    if not all_pages:
-        if pageindex_api_key:
-            logger.warning("Cloud returned no pages for %s; falling back to local pymupdf", pdf_path.name)
-        all_pages = convert_pdf_to_pages(pdf_path, pdf_path.stem, images_dir)
+        # Debug: print doc keys and page_count to diagnose get_page_content range
+        logger.info("Doc keys: %s", list(doc.keys()))
+        logger.info("page_count from doc: %s", doc.get("page_count", "NOT PRESENT"))
 
-    (sources_dir / f"{pdf_path.stem}.json").write_text(
-        json_mod.dumps(all_pages, ensure_ascii=False, indent=2), encoding="utf-8",
-    )
+        tree = {
+            "doc_name": doc_name,
+            "doc_description": description,
+            "structure": structure,
+        }
 
-    # Write wiki/summaries/ (no images, just summaries)
-    summaries_dir = kb_dir / "wiki" / "summaries"
-    summaries_dir.mkdir(parents=True, exist_ok=True)
-    summary_md = render_summary_md(tree, pdf_path.stem, doc_id)
-    (summaries_dir / f"{pdf_path.stem}.md").write_text(summary_md, encoding="utf-8")
+        # Write wiki/sources/ — per-page content
+        sources_dir = kb_dir / "wiki" / "sources"
+        sources_dir.mkdir(parents=True, exist_ok=True)
+        images_dir = sources_dir / "images" / pdf_path.stem
 
-    return IndexResult(doc_id=doc_id, description=description, tree=tree)
+        from openkb.images import convert_pdf_to_pages
+
+        all_pages: list = []
+        if pageindex_api_key:
+            # Cloud mode: fetch OCR'd markdown from PageIndex. get_page_content
+            # requires a page range, so pass "1-N".
+            from openkb.converter import get_pdf_page_count
+            page_count = get_pdf_page_count(pdf_path)
+            try:
+                all_pages = col.get_page_content(doc_id, f"1-{page_count}")
+            except Exception as exc:
+                logger.warning("Cloud get_page_content failed for %s: %s", pdf_path.name, exc)
+
+        if not all_pages:
+            if pageindex_api_key:
+                logger.warning("Cloud returned no pages for %s; falling back to local pymupdf", pdf_path.name)
+            all_pages = convert_pdf_to_pages(pdf_path, pdf_path.stem, images_dir)
+
+        (sources_dir / f"{pdf_path.stem}.json").write_text(
+            json_mod.dumps(all_pages, ensure_ascii=False, indent=2), encoding="utf-8",
+        )
+
+        # Write wiki/summaries/ (no images, just summaries)
+        summaries_dir = kb_dir / "wiki" / "summaries"
+        summaries_dir.mkdir(parents=True, exist_ok=True)
+        summary_md = render_summary_md(tree, pdf_path.stem, doc_id)
+        (summaries_dir / f"{pdf_path.stem}.md").write_text(summary_md, encoding="utf-8")
+
+        return IndexResult(doc_id=doc_id, description=description, tree=tree)
+    finally:
+        _close_pageindex_client(client)