Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ dependencies = [
"aiohttp>=3.13.3",
"httpx[socks]>=0.28.1",
"typing-extensions>=4.13.0",
"logfire>=4.31.0",
]

[project.urls]
Expand All @@ -45,11 +46,6 @@ Documentation = "https://bub.build"
[project.scripts]
bub = "bub.__main__:app"

[project.optional-dependencies]
logfire = [
"logfire>=4.31.0",
]

[dependency-groups]
dev = [
"pytest>=7.2.0",
Expand Down
22 changes: 13 additions & 9 deletions src/bub/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import os
import sys

import typer
Expand All @@ -12,17 +13,20 @@
def _instrument_bub() -> None:
from loguru import logger

logger.remove()
logger.add(sys.stderr, colorize=True)
from bub.builtin import telemetry

try:
import logfire
from logfire.integrations.loguru import LogfireHandler
level = os.environ.get("BUB_LOG_LEVEL", "WARNING").upper()
logger.remove()

logfire.configure()
logger.add(LogfireHandler(), format="{message}")
except Exception as exc:
logger.debug("logfire instrumentation disabled: {}", exc)
telemetry.configure_telemetry()
logfire_handler = telemetry.loguru_handler()
logfire_handler["level"] = "INFO"
logger.configure(
handlers=[
{"sink": sys.stderr, "colorize": True, "level": level},
logfire_handler,
]
)


def create_cli_app() -> typer.Typer:
Expand Down
254 changes: 161 additions & 93 deletions src/bub/builtin/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from loguru import logger

from bub.builtin import telemetry
from bub.builtin.model_runner import (
ModelRunner,
is_context_length_error,
Expand Down Expand Up @@ -102,7 +103,21 @@ async def run_stream(
stack = AsyncExitStack()
# The fork_tape context manager must not be exited until the last chunk of the stream is consumed.
tape = await stack.enter_async_context(tape.fork_tape(merge_back=merge_back))
await tape.ensure_bootstrap_anchor()
stack.enter_context(
telemetry.bub_span(
"bub.agent.run",
tape=tape.name,
store=getattr(tape, "store", None),
attributes={
telemetry.GEN_AI_OPERATION_NAME: "invoke_agent",
telemetry.GEN_AI_AGENT_NAME: telemetry.BUB_AGENT_NAME,
telemetry.GEN_AI_REQUEST_MODEL: model or self.settings.model,
"bub.session.id": session_id,
"bub.tape.merge_back": merge_back,
},
)
)
await self._ensure_bootstrap_anchor(tape)
if isinstance(prompt, str) and prompt.strip().startswith(","):
result = await self._run_command(tape=tape, line=prompt.strip())
events = self._events_from_iterable([
Expand All @@ -129,35 +144,54 @@ async def _run_command(self, tape: Tape, *, line: str) -> str:
context = ToolContext(tape=tape, run_id="run_command", state=tape.context.state)
output = ""
status = "ok"
try:
if name not in REGISTRY:
output = await REGISTRY["bash"].run(context=context, cmd=line)
with telemetry.bub_span(
"bub.command",
tape=tape.name,
store=getattr(tape, "store", None),
attributes={
telemetry.GEN_AI_OPERATION_NAME: "command",
telemetry.BUB_RUN_ID: "run_command",
"bub.command.name": name,
},
) as span:
try:
if name not in REGISTRY:
output = await REGISTRY["bash"].run(context=context, cmd=line)
else:
args = _parse_args(arg_tokens)
if REGISTRY[name].context:
args.kwargs["context"] = context
output = REGISTRY[name].run(*args.positional, **args.kwargs)
if inspect.isawaitable(output):
output = await output
except Exception as exc:
status = "error"
output = f"{exc!s}"
raise
else:
args = _parse_args(arg_tokens)
if REGISTRY[name].context:
args.kwargs["context"] = context
output = REGISTRY[name].run(*args.positional, **args.kwargs)
if inspect.isawaitable(output):
output = await output
except Exception as exc:
status = "error"
output = f"{exc!s}"
raise
else:
return output if isinstance(output, str) else str(output)
finally:
elapsed_ms = int((time.monotonic() - start) * 1000)
output_text = output if isinstance(output, str) else str(output)

event_payload = {
"raw": line,
"name": name,
"status": status,
"elapsed_ms": elapsed_ms,
"output": output_text,
"date": datetime.now(UTC).isoformat(),
}
await tape.append_event("command", event_payload)
return output if isinstance(output, str) else str(output)
finally:
elapsed_ms = int((time.monotonic() - start) * 1000)
output_text = output if isinstance(output, str) else str(output)
span.set_attribute("bub.command.status", status)
span.set_attribute("bub.command.elapsed_ms", elapsed_ms)

event_payload = {
"raw": line,
"name": name,
"status": status,
"elapsed_ms": elapsed_ms,
"output": output_text,
"date": datetime.now(UTC).isoformat(),
}
await telemetry.record_tape_event(tape.store, tape.name, "command", event_payload)
raise RuntimeError("command execution did not produce a result")

@staticmethod
async def _ensure_bootstrap_anchor(tape: Tape) -> None:
anchors = list(await tape.store.fetch_all(tape.query().kinds("anchor")))
if not anchors:
await telemetry.record_tape_handoff(tape.store, tape.name, name="session/start", state={"owner": "human"})

async def _agent_loop(
self,
Expand All @@ -170,7 +204,9 @@ async def _agent_loop(
) -> AsyncStreamEvents:
next_prompt: str | list[dict] = prompt
display_model = model or self.settings.model
await tape.append_event(
await telemetry.record_tape_event(
tape.store,
tape.name,
"loop.start",
{
"model": display_model,
Expand Down Expand Up @@ -206,95 +242,127 @@ async def _stream_events_with_auto_handoff(
start = time.monotonic()
should_continue = False
logger.info("loop.step step={} tape={} model={}", step, tape.name, display_model)
await tape.append_event("loop.step.start", {"step": step, "prompt": next_prompt})
try:
output = await self._run_once(
tape=tape,
prompt=next_prompt,
model=model,
allowed_skills=allowed_skills,
allowed_tools=allowed_tools,
with telemetry.bub_span(
"bub.agent.step",
tape=tape.name,
store=getattr(tape, "store", None),
attributes={
telemetry.GEN_AI_OPERATION_NAME: "agent_step",
telemetry.GEN_AI_REQUEST_MODEL: display_model,
"bub.loop.step": step,
},
) as span:
await telemetry.record_tape_event(
tape.store, tape.name, "loop.step.start", {"step": step, "prompt": next_prompt}
)
async for event in output:
yield event
if event.kind == "error":
elapsed_ms = int((time.monotonic() - start) * 1000)
await tape.append_event(
try:
output = await self._run_once(
tape=tape,
prompt=next_prompt,
model=model,
allowed_skills=allowed_skills,
allowed_tools=allowed_tools,
)
async for event in output:
yield event
if event.kind == "error":
elapsed_ms = int((time.monotonic() - start) * 1000)
span.set_attribute("bub.loop.status", "error")
span.set_attribute("bub.loop.elapsed_ms", elapsed_ms)
await telemetry.record_tape_event(
tape.store,
tape.name,
"loop.step",
{
"step": step,
"elapsed_ms": elapsed_ms,
"status": "error",
"error": event.data.get("message", ""),
"date": datetime.now(UTC).isoformat(),
},
)
elif event.kind == "final":
should_continue = bool(event.data.get("tool_calls") or event.data.get("tool_results"))
except Exception as exc:
error_message = f"{exc!s}"
elapsed_ms = int((time.monotonic() - start) * 1000)
span.set_attribute("bub.loop.elapsed_ms", elapsed_ms)
if auto_handoff_remaining > 0 and is_context_length_error(error_message):
auto_handoff_remaining -= 1
span.set_attribute("bub.loop.status", "auto_handoff")
logger.warning(
"auto_handoff: context length exceeded, performing automatic handoff. tape={} step={}",
tape.name,
step,
)
await telemetry.record_tape_handoff(
tape.store,
tape.name,
name="auto_handoff/context_overflow",
state={"reason": "context_length_exceeded", "error": error_message},
)
await telemetry.record_tape_event(
tape.store,
tape.name,
"loop.step",
{
"step": step,
"elapsed_ms": elapsed_ms,
"status": "error",
"error": event.data.get("message", ""),
"status": "auto_handoff",
"error": error_message,
"date": datetime.now(UTC).isoformat(),
},
)
elif event.kind == "final":
should_continue = bool(event.data.get("tool_calls") or event.data.get("tool_results"))
except Exception as exc:
error_message = f"{exc!s}"
elapsed_ms = int((time.monotonic() - start) * 1000)
if auto_handoff_remaining > 0 and is_context_length_error(error_message):
auto_handoff_remaining -= 1
logger.warning(
"auto_handoff: context length exceeded, performing automatic handoff. tape={} step={}",
next_prompt = prompt
continue

span.set_attribute("bub.loop.status", "error")
await telemetry.record_tape_event(
tape.store,
tape.name,
step,
)
await tape.handoff(
name="auto_handoff/context_overflow",
state={"reason": "context_length_exceeded", "error": error_message},
)
await tape.append_event(
"loop.step",
{
"step": step,
"elapsed_ms": elapsed_ms,
"status": "auto_handoff",
"status": "error",
"error": error_message,
"date": datetime.now(UTC).isoformat(),
},
)
next_prompt = prompt
continue
raise

await tape.append_event(
"loop.step",
{
"step": step,
"elapsed_ms": elapsed_ms,
"status": "error",
"error": error_message,
"date": datetime.now(UTC).isoformat(),
},
)
raise
state.error = output.error
state.usage = output.usage
elapsed_ms = int((time.monotonic() - start) * 1000)
span.set_attribute("bub.loop.elapsed_ms", elapsed_ms)
if not should_continue:
span.set_attribute("bub.loop.status", "ok")
await telemetry.record_tape_event(
tape.store,
tape.name,
"loop.step",
{
"step": step,
"elapsed_ms": elapsed_ms,
"status": "ok",
"date": datetime.now(UTC).isoformat(),
},
)
return

state.error = output.error
state.usage = output.usage
elapsed_ms = int((time.monotonic() - start) * 1000)
if not should_continue:
await tape.append_event(
next_prompt = self._continue_prompt(tape)
span.set_attribute("bub.loop.status", "continue")
await telemetry.record_tape_event(
tape.store,
tape.name,
"loop.step",
{
"step": step,
"elapsed_ms": elapsed_ms,
"status": "ok",
"status": "continue",
"date": datetime.now(UTC).isoformat(),
},
)
return

next_prompt = self._continue_prompt(tape)
await tape.append_event(
"loop.step",
{
"step": step,
"elapsed_ms": elapsed_ms,
"status": "continue",
"date": datetime.now(UTC).isoformat(),
},
)

raise RuntimeError(f"max_steps_reached={self.settings.max_steps}")

Expand Down
Loading
Loading