diff --git a/openkb/agent/chat.py b/openkb/agent/chat.py index 91455525..9b1afc2f 100644 --- a/openkb/agent/chat.py +++ b/openkb/agent/chat.py @@ -80,6 +80,16 @@ def _build_style(use_color: bool) -> Style: def _fmt(style: Style, *fragments: tuple[str, str]) -> None: + # prompt_toolkit's print_formatted_text constructs a Win32Output on + # Windows that requires a real console handle, raising + # NoConsoleScreenBufferError when stdout is a pipe, file, or captured + # subprocess stream. Fall back to plain text when the output isn't a + # usable console. + if not _use_color(force_off=False): + for _, text in fragments: + sys.stdout.write(text) + sys.stdout.flush() + return print_formatted_text(FormattedText(list(fragments)), style=style, end="") diff --git a/openkb/cli.py b/openkb/cli.py index 7352fa71..8b08d520 100644 --- a/openkb/cli.py +++ b/openkb/cli.py @@ -10,6 +10,7 @@ import asyncio import json import logging +import sys import time from pathlib import Path @@ -357,6 +358,17 @@ def add(ctx, path): add_single_file(target, kb_dir) +def _stream_to_tty() -> bool: + """Return True when stdout is a real terminal. + + Used to auto-disable streaming output when ``openkb query`` is piped, + redirected to a file, or run as a subprocess — streaming output emits + interleaved tool-call lines that are noisy for non-interactive callers, + and the non-streaming branch returns just the final answer string. + """ + return sys.stdout.isatty() + + @cli.command() @click.argument("question") @click.option("--save", is_flag=True, default=False, help="Save the answer to wiki/explorations/.") @@ -380,8 +392,11 @@ def query(ctx, question, save, raw): _setup_llm_key(kb_dir) model: str = config.get("model", DEFAULT_CONFIG["model"]) + stream = _stream_to_tty() try: - answer = asyncio.run(run_query(question, kb_dir, model, stream=True, raw=raw)) + answer = asyncio.run(run_query(question, kb_dir, model, stream=stream, raw=raw)) + if not stream and answer: + click.echo(answer) except Exception as exc: click.echo(f"[ERROR] Query failed: {exc}") return diff --git a/tests/test_cli.py b/tests/test_cli.py index afb961dc..83c3415b 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -65,3 +65,51 @@ def test_init_already_exists(tmp_path): result = runner.invoke(cli, ["init"]) assert result.exit_code == 0 assert "already initialized" in result.output + + +class TestQueryStreamGate: + """Regression tests for issue #34. + + `openkb query` should auto-disable streaming when stdout isn't a TTY + (pipes, redirects, captured subprocess streams, MCP stdio transport), + so non-interactive callers get the clean final answer instead of an + interleave of tool-call telemetry and answer tokens. + """ + + @staticmethod + def _capture_run_query(captured): + async def fake(*_args, **kwargs): + captured.update(kwargs) + return "the answer" + return fake + + def test_query_disables_stream_when_stdout_is_not_tty(self, kb_dir): + captured: dict = {} + with patch("openkb.cli._stream_to_tty", return_value=False), \ + patch("openkb.agent.query.run_query", side_effect=self._capture_run_query(captured)), \ + patch("openkb.cli._setup_llm_key"), \ + patch("openkb.cli.append_log"): + result = CliRunner().invoke( + cli, ["--kb-dir", str(kb_dir), "query", "what is X?"] + ) + + assert result.exit_code == 0, result.output + assert captured["stream"] is False + # Non-stream branch must still print the answer + assert "the answer" in result.output + + def test_query_enables_stream_when_stdout_is_tty(self, kb_dir): + captured: dict = {} + with patch("openkb.cli._stream_to_tty", return_value=True), \ + patch("openkb.agent.query.run_query", side_effect=self._capture_run_query(captured)), \ + patch("openkb.cli._setup_llm_key"), \ + patch("openkb.cli.append_log"): + result = CliRunner().invoke( + cli, ["--kb-dir", str(kb_dir), "query", "what is X?"] + ) + + assert result.exit_code == 0, result.output + assert captured["stream"] is True + # Stream branch should NOT echo the answer again — run_query already + # wrote tokens to stdout as they arrived. + assert "the answer" not in result.output diff --git a/tests/test_query.py b/tests/test_query.py index d39fcaaa..4f1f0e0f 100644 --- a/tests/test_query.py +++ b/tests/test_query.py @@ -1,6 +1,8 @@ """Tests for openkb.agent.query (Task 11).""" from __future__ import annotations +import io +import sys from pathlib import Path from unittest.mock import AsyncMock, MagicMock, patch @@ -70,3 +72,68 @@ async def fake_run(agent, message, **kwargs): await run_query("How does attention work?", tmp_path, "gpt-4o-mini") assert "How does attention work?" in captured["message"] + + +class TestFmtFallback: + """Regression tests for issue #34. + + `_fmt` must not invoke prompt_toolkit's `print_formatted_text` when the + output stream cannot drive a console (non-TTY stdout, or `NO_COLOR=1`). + On Windows, `print_formatted_text` constructs a `Win32Output` that + requires a real console handle and crashes with `NoConsoleScreenBufferError` + when stdout is a pipe, file, or captured subprocess stream. + """ + + @staticmethod + def _boom(*_args, **_kwargs): + raise AssertionError( + "print_formatted_text must not run when output is not a TTY" + ) + + def test_fmt_falls_back_when_stdout_is_not_tty(self, monkeypatch): + from openkb.agent import chat + + monkeypatch.setattr(chat, "print_formatted_text", self._boom) + buf = io.StringIO() # StringIO.isatty() returns False + monkeypatch.setattr(sys, "stdout", buf) + + style = chat._build_style(use_color=False) + chat._fmt(style, ("class:tool", "hello"), ("class:tool", " world\n")) + + assert buf.getvalue() == "hello world\n" + + def test_fmt_falls_back_when_no_color_env(self, monkeypatch): + from openkb.agent import chat + + monkeypatch.setattr(chat, "print_formatted_text", self._boom) + + fake_tty = io.StringIO() + fake_tty.isatty = lambda: True # type: ignore[method-assign] + monkeypatch.setattr(sys, "stdout", fake_tty) + monkeypatch.setenv("NO_COLOR", "1") + + style = chat._build_style(use_color=False) + chat._fmt(style, ("class:error", "boom\n")) + + assert fake_tty.getvalue() == "boom\n" + + def test_fmt_uses_prompt_toolkit_on_real_tty(self, monkeypatch): + from openkb.agent import chat + + called = {"count": 0} + + def fake_print(*_args, **_kwargs): + called["count"] += 1 + + monkeypatch.setattr(chat, "print_formatted_text", fake_print) + + fake_tty = io.StringIO() + fake_tty.isatty = lambda: True # type: ignore[method-assign] + monkeypatch.setattr(sys, "stdout", fake_tty) + monkeypatch.delenv("NO_COLOR", raising=False) + + style = chat._build_style(use_color=True) + chat._fmt(style, ("class:header", "hi\n")) + + assert called["count"] == 1 + assert fake_tty.getvalue() == ""