Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions openkb/agent/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,16 @@ def _build_style(use_color: bool) -> Style:


def _fmt(style: Style, *fragments: tuple[str, str]) -> None:
# prompt_toolkit's print_formatted_text constructs a Win32Output on
# Windows that requires a real console handle, raising
# NoConsoleScreenBufferError when stdout is a pipe, file, or captured
# subprocess stream. Fall back to plain text when the output isn't a
# usable console.
if not _use_color(force_off=False):
for _, text in fragments:
sys.stdout.write(text)
sys.stdout.flush()
return
print_formatted_text(FormattedText(list(fragments)), style=style, end="")


Expand Down
17 changes: 16 additions & 1 deletion openkb/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import asyncio
import json
import logging
import sys
import time
from pathlib import Path

Expand Down Expand Up @@ -357,6 +358,17 @@ def add(ctx, path):
add_single_file(target, kb_dir)


def _stream_to_tty() -> bool:
"""Return True when stdout is a real terminal.

Used to auto-disable streaming output when ``openkb query`` is piped,
redirected to a file, or run as a subprocess — streaming output emits
interleaved tool-call lines that are noisy for non-interactive callers,
and the non-streaming branch returns just the final answer string.
"""
return sys.stdout.isatty()


@cli.command()
@click.argument("question")
@click.option("--save", is_flag=True, default=False, help="Save the answer to wiki/explorations/.")
Expand All @@ -380,8 +392,11 @@ def query(ctx, question, save, raw):
_setup_llm_key(kb_dir)
model: str = config.get("model", DEFAULT_CONFIG["model"])

stream = _stream_to_tty()
try:
answer = asyncio.run(run_query(question, kb_dir, model, stream=True, raw=raw))
answer = asyncio.run(run_query(question, kb_dir, model, stream=stream, raw=raw))
if not stream and answer:
click.echo(answer)
except Exception as exc:
click.echo(f"[ERROR] Query failed: {exc}")
return
Expand Down
48 changes: 48 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,51 @@ def test_init_already_exists(tmp_path):
result = runner.invoke(cli, ["init"])
assert result.exit_code == 0
assert "already initialized" in result.output


class TestQueryStreamGate:
"""Regression tests for issue #34.

`openkb query` should auto-disable streaming when stdout isn't a TTY
(pipes, redirects, captured subprocess streams, MCP stdio transport),
so non-interactive callers get the clean final answer instead of an
interleave of tool-call telemetry and answer tokens.
"""

@staticmethod
def _capture_run_query(captured):
async def fake(*_args, **kwargs):
captured.update(kwargs)
return "the answer"
return fake

def test_query_disables_stream_when_stdout_is_not_tty(self, kb_dir):
captured: dict = {}
with patch("openkb.cli._stream_to_tty", return_value=False), \
patch("openkb.agent.query.run_query", side_effect=self._capture_run_query(captured)), \
patch("openkb.cli._setup_llm_key"), \
patch("openkb.cli.append_log"):
result = CliRunner().invoke(
cli, ["--kb-dir", str(kb_dir), "query", "what is X?"]
)

assert result.exit_code == 0, result.output
assert captured["stream"] is False
# Non-stream branch must still print the answer
assert "the answer" in result.output

def test_query_enables_stream_when_stdout_is_tty(self, kb_dir):
captured: dict = {}
with patch("openkb.cli._stream_to_tty", return_value=True), \
patch("openkb.agent.query.run_query", side_effect=self._capture_run_query(captured)), \
patch("openkb.cli._setup_llm_key"), \
patch("openkb.cli.append_log"):
result = CliRunner().invoke(
cli, ["--kb-dir", str(kb_dir), "query", "what is X?"]
)

assert result.exit_code == 0, result.output
assert captured["stream"] is True
# Stream branch should NOT echo the answer again — run_query already
# wrote tokens to stdout as they arrived.
assert "the answer" not in result.output
67 changes: 67 additions & 0 deletions tests/test_query.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""Tests for openkb.agent.query (Task 11)."""
from __future__ import annotations

import io
import sys
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch

Expand Down Expand Up @@ -70,3 +72,68 @@ async def fake_run(agent, message, **kwargs):
await run_query("How does attention work?", tmp_path, "gpt-4o-mini")

assert "How does attention work?" in captured["message"]


class TestFmtFallback:
"""Regression tests for issue #34.

`_fmt` must not invoke prompt_toolkit's `print_formatted_text` when the
output stream cannot drive a console (non-TTY stdout, or `NO_COLOR=1`).
On Windows, `print_formatted_text` constructs a `Win32Output` that
requires a real console handle and crashes with `NoConsoleScreenBufferError`
when stdout is a pipe, file, or captured subprocess stream.
"""

@staticmethod
def _boom(*_args, **_kwargs):
raise AssertionError(
"print_formatted_text must not run when output is not a TTY"
)

def test_fmt_falls_back_when_stdout_is_not_tty(self, monkeypatch):
from openkb.agent import chat

monkeypatch.setattr(chat, "print_formatted_text", self._boom)
buf = io.StringIO() # StringIO.isatty() returns False
monkeypatch.setattr(sys, "stdout", buf)

style = chat._build_style(use_color=False)
chat._fmt(style, ("class:tool", "hello"), ("class:tool", " world\n"))

assert buf.getvalue() == "hello world\n"

def test_fmt_falls_back_when_no_color_env(self, monkeypatch):
from openkb.agent import chat

monkeypatch.setattr(chat, "print_formatted_text", self._boom)

fake_tty = io.StringIO()
fake_tty.isatty = lambda: True # type: ignore[method-assign]
monkeypatch.setattr(sys, "stdout", fake_tty)
monkeypatch.setenv("NO_COLOR", "1")

style = chat._build_style(use_color=False)
chat._fmt(style, ("class:error", "boom\n"))

assert fake_tty.getvalue() == "boom\n"

def test_fmt_uses_prompt_toolkit_on_real_tty(self, monkeypatch):
from openkb.agent import chat

called = {"count": 0}

def fake_print(*_args, **_kwargs):
called["count"] += 1

monkeypatch.setattr(chat, "print_formatted_text", fake_print)

fake_tty = io.StringIO()
fake_tty.isatty = lambda: True # type: ignore[method-assign]
monkeypatch.setattr(sys, "stdout", fake_tty)
monkeypatch.delenv("NO_COLOR", raising=False)

style = chat._build_style(use_color=True)
chat._fmt(style, ("class:header", "hi\n"))

assert called["count"] == 1
assert fake_tty.getvalue() == ""