106 changes: 87 additions & 19 deletions src/bub/core/model_runner.py
@@ -6,10 +6,10 @@
import re
from collections.abc import Callable
from dataclasses import dataclass, field
from typing import ClassVar
from typing import Any, ClassVar

from loguru import logger
from republic import Tool, ToolAutoResult
from republic import Tool

from bub.core.router import AssistantRouteResult, InputRouter
from bub.skills.loader import SkillMetadata
@@ -47,6 +47,7 @@ class ModelRunner:
"""Runs assistant loop over tape with command-aware follow-up handling."""

DEFAULT_HEADERS: ClassVar[dict[str, str]] = {"HTTP-Referer": "https://bub.build/", "X-Title": "Bub"}
SERIAL_TOOL_CALL_PROVIDERS: ClassVar[frozenset[str]] = frozenset({"anthropic", "vertexaianthropic"})

def __init__(
self,
@@ -166,14 +167,20 @@ async def _chat(self, prompt: str) -> _ChatResult:
system_prompt = self._render_system_prompt()
try:
async with asyncio.timeout(self._model_timeout_seconds):
output = await self._tape.tape.run_tools_async(
prompt=prompt,
system_prompt=system_prompt,
max_tokens=self._max_tokens,
tools=self._tools,
extra_headers=self.DEFAULT_HEADERS,
stream_kwargs: dict[str, Any] = {
"prompt": prompt,
"system_prompt": system_prompt,
"max_tokens": self._max_tokens,
"tools": self._tools,
"extra_headers": self.DEFAULT_HEADERS,
}
if self._needs_serial_tool_calls():
stream_kwargs["parallel_tool_calls"] = False

stream = await self._tape.tape.stream_events_async(
**stream_kwargs,
)
return _ChatResult.from_tool_auto(output)
return await self._read_stream_result(stream)
except TimeoutError:
return _ChatResult(
text="",
@@ -183,6 +190,31 @@ async def _chat(self, prompt: str) -> _ChatResult:
logger.exception("model.call.error")
return _ChatResult(text="", error=f"model_call_error: {exc!s}")

def _needs_serial_tool_calls(self) -> bool:
provider, separator, _ = self._model.partition(":")
if not separator:
return False
return provider.casefold() in self.SERIAL_TOOL_CALL_PROVIDERS
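The gate above keys off the `provider:model` id convention via `str.partition`. A minimal standalone sketch of the same check, assuming that id scheme (names mirror the diff; the snippet is illustrative, not the project's API):

```python
# Sketch of the serial-tool-call gate, assuming "provider:model" ids
# (e.g. "anthropic:claude-sonnet"). Mirrors the diff for illustration only.
SERIAL_TOOL_CALL_PROVIDERS = frozenset({"anthropic", "vertexaianthropic"})

def needs_serial_tool_calls(model: str) -> bool:
    provider, separator, _ = model.partition(":")
    if not separator:  # bare model name, no provider prefix
        return False
    return provider.casefold() in SERIAL_TOOL_CALL_PROVIDERS

assert needs_serial_tool_calls("Anthropic:claude-sonnet")  # casefold matches
assert not needs_serial_tool_calls("gpt-4o")               # no ":" separator
```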

async def _read_stream_result(self, stream: Any) -> _ChatResult:
final_event: dict[str, Any] | None = None
error_event: dict[str, Any] | None = None
async for event in stream:
event_kind = getattr(event, "kind", None)
event_data = getattr(event, "data", None)
if not isinstance(event_data, dict):
continue
if event_kind == "error":
error_event = event_data
elif event_kind == "final":
final_event = event_data

return _ChatResult.from_stream_events(
final_event=final_event,
stream_error=getattr(stream, "error", None),
error_event=error_event,
)
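The fold over the stream keeps only the most recent `error` and `final` payloads and defers the decision to `from_stream_events`. A hedged sketch with a stubbed event type — the `kind`/`data` attribute shape is inferred from this diff, not a documented republic API:

```python
import asyncio
from dataclasses import dataclass
from typing import Any

@dataclass
class _Event:  # stand-in for the stream's event objects
    kind: str
    data: dict[str, Any]

async def _fake_stream():
    yield _Event("delta", {"text": "partial"})
    yield _Event("final", {"ok": True, "text": "done"})

async def fold(stream) -> tuple[dict[str, Any] | None, dict[str, Any] | None]:
    final_event = error_event = None
    async for event in stream:
        if not isinstance(event.data, dict):
            continue
        if event.kind == "error":
            error_event = event.data  # keep only the latest error payload
        elif event.kind == "final":
            final_event = event.data
    return final_event, error_event

print(asyncio.run(fold(_fake_stream())))
# ({'ok': True, 'text': 'done'}, None)
```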

def _render_system_prompt(self) -> str:
blocks: list[str] = []
if self._base_system_prompt:
@@ -223,18 +255,54 @@ class _ChatResult:
followup_prompt: str | None = None

@classmethod
def from_tool_auto(cls, output: ToolAutoResult) -> _ChatResult:
if output.kind == "text":
return cls(text=output.text or "")
if output.kind == "tools":
return cls(text="", followup_prompt=TOOL_CONTINUE_PROMPT)

if output.tool_calls or output.tool_results:
def from_stream_events(
cls,
*,
final_event: dict[str, Any] | None,
stream_error: object | None,
error_event: dict[str, Any] | None,
) -> _ChatResult:
if stream_error is not None:
return cls(text="", error=_format_stream_error(stream_error))

if final_event is None:
if error_event is not None:
return cls(text="", error=_format_error_event(error_event))
return cls(text="", error="stream_events_error: missing final event")

if final_event.get("ok") is False or error_event is not None:
return cls(text="", error=_format_error_event(error_event))

if final_event.get("tool_calls") or final_event.get("tool_results"):
return cls(text="", followup_prompt=TOOL_CONTINUE_PROMPT)

if output.error is None:
return cls(text="", error="tool_auto_error: unknown")
return cls(text="", error=f"{output.error.kind.value}: {output.error.message}")
if isinstance(final_text := final_event.get("text"), str):
return cls(text=final_text)

return cls(text="", error="tool_auto_error: unknown")


def _format_stream_error(error: object) -> str:
kind = getattr(error, "kind", None)
message = getattr(error, "message", None)
kind_value = getattr(kind, "value", kind)
if isinstance(kind_value, str) and isinstance(message, str):
return f"{kind_value}: {message}"
if isinstance(message, str):
return message
return str(error)


def _format_error_event(error_event: dict[str, Any] | None) -> str:
if error_event is None:
return "tool_auto_error: unknown"
kind = error_event.get("kind")
message = error_event.get("message")
if isinstance(kind, str) and isinstance(message, str):
return f"{kind}: {message}"
if isinstance(message, str):
return message
return "tool_auto_error: unknown"


def _runtime_contract() -> str:
7 changes: 4 additions & 3 deletions src/bub/skills/telegram/scripts/telegram_send.py
@@ -161,15 +161,16 @@ def main():
mention_username = args.source_username

# Send messages
# Use ASCII status tags for better PowerShell rendering.
try:
send_message(bot_token, chat_id, args.message, reply_to, mention_username)
print(f" Message sent successfully to {chat_id} (MarkdownV2)")
print(f"[OK] Message sent successfully to {chat_id} (MarkdownV2)")
except requests.HTTPError as e:
print(f" HTTP Error: {e}")
print(f"[ERROR] HTTP Error: {e}")
print(f" Response: {e.response.text}")
sys.exit(1)
except Exception as e:
print(f"❌ Error: {e}")
print(f"[ERROR] {e}")
sys.exit(1)


110 changes: 106 additions & 4 deletions src/bub/tape/context.py
@@ -54,9 +54,15 @@ def _append_tool_result_entry(
entry: TapeEntry,
) -> None:
results = entry.payload.get("results")
if not isinstance(results, list):
if not isinstance(results, list) or not pending_calls:
return
for index, result in enumerate(results):
paired_count = min(len(results), len(pending_calls))
if paired_count <= 0:
return
if paired_count < len(pending_calls):
_trim_last_tool_call_message(messages, paired_count)
pending_calls = pending_calls[:paired_count]
for index, result in enumerate(results[:paired_count]):
messages.append(_build_tool_result_message(result, pending_calls, index))


@@ -82,16 +88,112 @@ def _build_tool_result_message(
return message


def _trim_last_tool_call_message(messages: list[dict[str, Any]], count: int) -> None:
if not messages:
return
candidate = messages[-1]
if candidate.get("role") != "assistant":
return
tool_calls = candidate.get("tool_calls")
if not isinstance(tool_calls, list):
return
if count <= 0:
messages.pop()
return
candidate["tool_calls"] = tool_calls[:count]


def _normalize_tool_calls(value: object) -> list[dict[str, Any]]:
if not isinstance(value, list):
return []
calls: list[dict[str, Any]] = []
for item in value:
if isinstance(item, dict):
calls.append(dict(item))
calls.extend(_normalize_tool_call(item))
return calls


def _normalize_tool_call(item: object) -> list[dict[str, Any]]:
if not isinstance(item, dict):
return []

normalized = dict(item)
function = normalized.get("function")
if not isinstance(function, dict):
return []

name = function.get("name")
if not isinstance(name, str) or not name:
return []

raw_arguments = function.get("arguments")
argument_chunks = _normalize_tool_arguments(raw_arguments)
if not argument_chunks:
return []

call_id = normalized.get("id")
calls: list[dict[str, Any]] = []
for index, arguments in enumerate(argument_chunks):
cloned = dict(normalized)
cloned_function = dict(function)
cloned_function["arguments"] = arguments
cloned["function"] = cloned_function
if isinstance(call_id, str) and call_id and index > 0:
cloned["id"] = f"{call_id}__{index + 1}"
calls.append(cloned)
return calls
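When a provider concatenates several JSON objects into one call's `arguments` string, the normalizer fans the call out into one entry per object, suffixing the duplicate ids. A usage illustration with a hypothetical payload, assuming `_normalize_tool_call` from the diff above is in scope:

```python
# Hypothetical merged call; assumes _normalize_tool_call as defined above.
merged_call = {
    "id": "call_1",
    "function": {
        "name": "read_file",
        "arguments": '{"path": "a.txt"}{"path": "b.txt"}',
    },
}
for call in _normalize_tool_call(merged_call):
    print(call["id"], call["function"]["arguments"])
# call_1 {"path": "a.txt"}
# call_1__2 {"path": "b.txt"}
```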


def _normalize_tool_arguments(value: object) -> list[str]:
if isinstance(value, dict):
return [json.dumps(value, ensure_ascii=False)]
if not isinstance(value, str):
return []

raw = value.strip()
if not raw:
return []

parsed = _parse_json_object(raw)
if parsed is not None:
return [raw]

chunks = _split_json_objects(raw)
if len(chunks) <= 1:
return []
return chunks


def _parse_json_object(raw: str) -> dict[str, Any] | None:
try:
parsed = json.loads(raw)
except json.JSONDecodeError:
return None
if not isinstance(parsed, dict):
return None
return parsed


def _split_json_objects(raw: str) -> list[str]:
decoder = json.JSONDecoder()
chunks: list[str] = []
position = 0
total = len(raw)
while position < total:
while position < total and raw[position].isspace():
position += 1
if position >= total:
break
try:
parsed, end = decoder.raw_decode(raw, position)
except json.JSONDecodeError:
return []
if not isinstance(parsed, dict):
return []
chunks.append(raw[position:end])
position = end
return chunks
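`json.JSONDecoder.raw_decode` does the heavy lifting here: it parses one value starting at a given index and returns `(value, end_index)`, which is what lets back-to-back objects be sliced apart; the manual whitespace skip is needed because `raw_decode` raises on leading whitespace. Assuming `_split_json_objects` from the diff above is in scope:

```python
# Assumes _split_json_objects as defined above.
print(_split_json_objects('{"a": 1} {"b": 2}'))
# ['{"a": 1}', '{"b": 2}']
print(_split_json_objects('[1, 2]{"a": 1}'))  # non-dict value rejects the lot
# []
```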


def _render_tool_result(result: object) -> str:
if isinstance(result, str):
return result