From a23ca752303d2b8fad9b73c48f61a126927a6209 Mon Sep 17 00:00:00 2001 From: Ken Chau Date: Wed, 6 May 2026 15:20:51 -0700 Subject: [PATCH 1/8] feat: add _sum_cost_usd helper for spawn cost bridge --- amplifier_app_cli/session_spawner.py | 33 +++++++++++++++++-- tests/test_cost_bridge.py | 49 ++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+), 2 deletions(-) create mode 100644 tests/test_cost_bridge.py diff --git a/amplifier_app_cli/session_spawner.py b/amplifier_app_cli/session_spawner.py index 190e3db..8e9dbb5 100644 --- a/amplifier_app_cli/session_spawner.py +++ b/amplifier_app_cli/session_spawner.py @@ -5,6 +5,7 @@ import logging import sys +from decimal import Decimal from pathlib import Path from amplifier_core import AmplifierSession @@ -14,6 +15,27 @@ logger = logging.getLogger(__name__) + +# Intentionally duplicated across provider-anthropic, foundation, and hooks-streaming-ui. +# These are separate repos with no shared utility dependency. The function is 10 lines — +# the coordination cost of sharing outweighs the duplication cost. +def _sum_cost_usd(contributions: list) -> Decimal | None: + """Sum cost_usd values from collect_contributions() results. + + Returns Decimal total, or None if no cost data is present. + None != 0: None means unknown cost (no rate data), 0 means known-free. + """ + total: Decimal | None = None + for c in contributions: + if c and isinstance(c, dict): + cost = c.get("cost_usd") + if cost is not None: + total = (total or Decimal("0")) + ( + cost if isinstance(cost, Decimal) else Decimal(str(cost)) + ) + return total + + # Capture default sys.path entries at import time. # Used to filter out bundle-added paths when forwarding sys_paths to subprocess children. _DEFAULT_SYS_PATHS: frozenset[str] = frozenset(sys.path) @@ -714,7 +736,11 @@ async def _capture_completion(event: str, data: dict) -> HookResult: } -async def resume_sub_session(sub_session_id: str, instruction: str, parent_session: AmplifierSession | None = None) -> dict: +async def resume_sub_session( + sub_session_id: str, + instruction: str, + parent_session: AmplifierSession | None = None, +) -> dict: """Resume existing sub-session for multi-turn engagement. Loads previously saved sub-session state, recreates the session with @@ -1016,7 +1042,10 @@ async def _capture_completion(event: str, data: dict) -> HookResult: finally: # Unregister child cancellation token before cleanup # MUST run even if execution was cancelled (CancelledError) or failed - if resume_parent_cancellation is not None and resume_child_cancellation is not None: + if ( + resume_parent_cancellation is not None + and resume_child_cancellation is not None + ): resume_parent_cancellation.unregister_child(resume_child_cancellation) logger.debug( f"Unregistered child cancellation token for resumed sub-session {sub_session_id}" diff --git a/tests/test_cost_bridge.py b/tests/test_cost_bridge.py new file mode 100644 index 0000000..0094b09 --- /dev/null +++ b/tests/test_cost_bridge.py @@ -0,0 +1,49 @@ +"""Tests for _sum_cost_usd helper in session_spawner.""" + +from decimal import Decimal + +from amplifier_app_cli.session_spawner import _sum_cost_usd + + +def test_sums_single_contribution(): + result = _sum_cost_usd([{"cost_usd": Decimal("0.05")}]) + assert result == Decimal("0.05") + + +def test_sums_multiple_contributions(): + result = _sum_cost_usd( + [ + {"cost_usd": Decimal("0.03")}, + {"cost_usd": Decimal("0.05")}, + {"cost_usd": Decimal("0.01")}, + ] + ) + assert result == Decimal("0.09") + + +def test_returns_none_for_empty_list(): + result = _sum_cost_usd([]) + assert result is None + + +def test_returns_none_when_all_none(): + result = _sum_cost_usd([{"cost_usd": None}, None, {}]) + assert result is None + + +def test_accepts_string_cost_usd(): + result = _sum_cost_usd([{"cost_usd": "0.05"}]) + assert result == Decimal("0.05") + assert isinstance(result, Decimal) + + +def test_skips_none_entries_in_mixed_list(): + result = _sum_cost_usd( + [ + {"cost_usd": Decimal("0.03")}, + None, + {"cost_usd": None}, + {"cost_usd": Decimal("0.02")}, + ] + ) + assert result == Decimal("0.05") From c66f0a3a5af215f75d063b9cd1d3d5f6c8ee200b Mon Sep 17 00:00:00 2001 From: Ken Chau Date: Wed, 6 May 2026 15:29:45 -0700 Subject: [PATCH 2/8] feat: bridge child session costs to parent in spawn_sub_session() --- amplifier_app_cli/session_spawner.py | 29 ++++++++++++++++ tests/test_cost_bridge.py | 51 ++++++++++++++++++++++++++++ tests/test_session_spawner.py | 8 +++++ 3 files changed, 88 insertions(+) diff --git a/amplifier_app_cli/session_spawner.py b/amplifier_app_cli/session_spawner.py index 8e9dbb5..e50ad4a 100644 --- a/amplifier_app_cli/session_spawner.py +++ b/amplifier_app_cli/session_spawner.py @@ -36,6 +36,28 @@ def _sum_cost_usd(contributions: list) -> Decimal | None: return total +async def _bridge_child_cost( + child_coordinator, + parent_coordinator, + child_session_id: str, +) -> None: + """Propagate child session cost to parent's session.cost channel. + + Called after child_session.execute() returns and before child_session.cleanup(). + The child coordinator is still alive in this window. + """ + child_contributions = await child_coordinator.collect_contributions("session.cost") + child_total = _sum_cost_usd(child_contributions) + + if child_total is not None: + # Freeze value in default arg — child coordinator will be torn down after this + parent_coordinator.register_contributor( + "session.cost", + f"delegate:{child_session_id}", + lambda total=child_total: {"cost_usd": total}, + ) + + # Capture default sys.path entries at import time. # Used to filter out bundle-added paths when forwarding sys_paths to subprocess children. _DEFAULT_SYS_PATHS: frozenset[str] = frozenset(sys.path) @@ -710,6 +732,13 @@ async def _capture_completion(event: str, data: dict) -> HookResult: store.save(sub_session_id, transcript, metadata) logger.debug(f"Sub-session {sub_session_id} state persisted") + # Bridge child session costs to parent coordinator before child is torn down + await _bridge_child_cost( + child_coordinator=child_session.coordinator, + parent_coordinator=parent_session.coordinator, + child_session_id=sub_session_id, + ) + finally: # Unregister child cancellation token before cleanup # MUST run even if execution was cancelled (CancelledError) or failed diff --git a/tests/test_cost_bridge.py b/tests/test_cost_bridge.py index 0094b09..8ba930d 100644 --- a/tests/test_cost_bridge.py +++ b/tests/test_cost_bridge.py @@ -1,6 +1,9 @@ """Tests for _sum_cost_usd helper in session_spawner.""" from decimal import Decimal +from unittest.mock import AsyncMock, MagicMock + +import pytest from amplifier_app_cli.session_spawner import _sum_cost_usd @@ -47,3 +50,51 @@ def test_skips_none_entries_in_mixed_list(): ] ) assert result == Decimal("0.05") + + +@pytest.mark.asyncio +async def test_spawn_bridge_registers_child_cost_on_parent(): + """After spawn_sub_session completes, parent coordinator has a delegate contributor.""" + child_coord = MagicMock() + child_coord.collect_contributions = AsyncMock( + return_value=[{"cost_usd": Decimal("0.07")}] + ) + + parent_coord = MagicMock() + registered = {} + + def capture_register(channel, name, callback): + registered[(channel, name)] = callback + + parent_coord.register_contributor = capture_register + + from amplifier_app_cli.session_spawner import _bridge_child_cost + await _bridge_child_cost( + child_coordinator=child_coord, + parent_coordinator=parent_coord, + child_session_id="test-child-123", + ) + + key = ("session.cost", "delegate:test-child-123") + assert key in registered + result = registered[key]() + assert result == {"cost_usd": Decimal("0.07")} + + +@pytest.mark.asyncio +async def test_spawn_bridge_skips_registration_when_no_cost(): + """If child has no cost data, no contributor is registered on parent.""" + child_coord = MagicMock() + child_coord.collect_contributions = AsyncMock(return_value=[]) + + parent_coord = MagicMock() + parent_coord.register_contributor = MagicMock() + + from amplifier_app_cli.session_spawner import _bridge_child_cost + await _bridge_child_cost( + child_coordinator=child_coord, + parent_coordinator=parent_coord, + child_session_id="test-child-456", + ) + + parent_coord.register_contributor.assert_not_called() diff --git a/tests/test_session_spawner.py b/tests/test_session_spawner.py index 1e5e593..4bc8e2c 100644 --- a/tests/test_session_spawner.py +++ b/tests/test_session_spawner.py @@ -659,6 +659,7 @@ def child_get(name): child_coordinator.get = child_get child_coordinator.mount = AsyncMock() + child_coordinator.collect_contributions = AsyncMock(return_value=[]) async def mock_execute(instruction): # Simulate orchestrator emitting orchestrator:complete during execute @@ -769,6 +770,7 @@ def child_get(name): child_coordinator.get = child_get child_coordinator.mount = AsyncMock() + child_coordinator.collect_contributions = AsyncMock(return_value=[]) child_session = MagicMock() child_session.coordinator = child_coordinator @@ -1052,6 +1054,7 @@ def child_get(name): child_coordinator.get = child_get child_coordinator.mount = AsyncMock() + child_coordinator.collect_contributions = AsyncMock(return_value=[]) child_session = MagicMock() child_session.coordinator = child_coordinator @@ -1217,6 +1220,7 @@ def child_get(name): child_coordinator.get = child_get child_coordinator.mount = AsyncMock() + child_coordinator.collect_contributions = AsyncMock(return_value=[]) child_session = MagicMock() child_session.coordinator = child_coordinator @@ -1335,6 +1339,7 @@ def child_get(name): child_coordinator.get = child_get child_coordinator.mount = AsyncMock() + child_coordinator.collect_contributions = AsyncMock(return_value=[]) child_session = MagicMock() child_session.coordinator = child_coordinator @@ -1445,6 +1450,7 @@ def child_get(name): child_coordinator.get = child_get child_coordinator.mount = AsyncMock() + child_coordinator.collect_contributions = AsyncMock(return_value=[]) child_session = MagicMock() child_session.coordinator = child_coordinator @@ -1561,6 +1567,7 @@ def child_get(name): child_coordinator.get = child_get child_coordinator.mount = AsyncMock() + child_coordinator.collect_contributions = AsyncMock(return_value=[]) child_session = MagicMock() child_session.coordinator = child_coordinator @@ -1657,6 +1664,7 @@ def child_get(name): child_coordinator.get = child_get child_coordinator.mount = AsyncMock() + child_coordinator.collect_contributions = AsyncMock(return_value=[]) child_session = MagicMock() child_session.coordinator = child_coordinator From 9c19733e18a9fb9b3bbf22d39553aaea95ba4fb3 Mon Sep 17 00:00:00 2001 From: Ken Chau Date: Wed, 6 May 2026 15:33:36 -0700 Subject: [PATCH 3/8] feat: bridge child session costs in resume_sub_session() --- amplifier_app_cli/session_spawner.py | 8 ++++++++ tests/test_cost_bridge.py | 26 ++++++++++++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/amplifier_app_cli/session_spawner.py b/amplifier_app_cli/session_spawner.py index e50ad4a..f3e79a5 100644 --- a/amplifier_app_cli/session_spawner.py +++ b/amplifier_app_cli/session_spawner.py @@ -1068,6 +1068,14 @@ async def _capture_completion(event: str, data: dict) -> HookResult: f"Sub-session {sub_session_id} state updated (turn {metadata['turn_count']})" ) + # Bridge child session costs to parent coordinator before child is torn down + if parent_session is not None: + await _bridge_child_cost( + child_coordinator=child_session.coordinator, + parent_coordinator=parent_session.coordinator, + child_session_id=sub_session_id, + ) + finally: # Unregister child cancellation token before cleanup # MUST run even if execution was cancelled (CancelledError) or failed diff --git a/tests/test_cost_bridge.py b/tests/test_cost_bridge.py index 8ba930d..343a148 100644 --- a/tests/test_cost_bridge.py +++ b/tests/test_cost_bridge.py @@ -98,3 +98,29 @@ async def test_spawn_bridge_skips_registration_when_no_cost(): ) parent_coord.register_contributor.assert_not_called() + + +@pytest.mark.asyncio +async def test_resume_bridge_registers_child_cost_on_parent(): + """resume_sub_session also bridges child costs after execute().""" + child_coord = MagicMock() + child_coord.collect_contributions = AsyncMock( + return_value=[{"cost_usd": Decimal("0.04")}] + ) + + parent_coord = MagicMock() + registered = {} + + def capture_register(channel, name, callback): + registered[(channel, name)] = callback + + parent_coord.register_contributor = capture_register + + from amplifier_app_cli.session_spawner import _bridge_child_cost + await _bridge_child_cost( + child_coordinator=child_coord, + parent_coordinator=parent_coord, + child_session_id="resumed-child-789", + ) + + assert ("session.cost", "delegate:resumed-child-789") in registered From 2dc6828d4d03ac72ae7ce91b270422729f75d7f7 Mon Sep 17 00:00:00 2001 From: Ken Chau Date: Wed, 6 May 2026 16:01:49 -0700 Subject: [PATCH 4/8] style: fix ruff formatting in test_cost_bridge.py --- tests/test_cost_bridge.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/test_cost_bridge.py b/tests/test_cost_bridge.py index 343a148..709e0e7 100644 --- a/tests/test_cost_bridge.py +++ b/tests/test_cost_bridge.py @@ -69,6 +69,7 @@ def capture_register(channel, name, callback): parent_coord.register_contributor = capture_register from amplifier_app_cli.session_spawner import _bridge_child_cost + await _bridge_child_cost( child_coordinator=child_coord, parent_coordinator=parent_coord, @@ -91,6 +92,7 @@ async def test_spawn_bridge_skips_registration_when_no_cost(): parent_coord.register_contributor = MagicMock() from amplifier_app_cli.session_spawner import _bridge_child_cost + await _bridge_child_cost( child_coordinator=child_coord, parent_coordinator=parent_coord, @@ -117,6 +119,7 @@ def capture_register(channel, name, callback): parent_coord.register_contributor = capture_register from amplifier_app_cli.session_spawner import _bridge_child_cost + await _bridge_child_cost( child_coordinator=child_coord, parent_coordinator=parent_coord, From 5bbe66e0fb3679da5b5964014b92afe9407b7cf8 Mon Sep 17 00:00:00 2001 From: Ken Chau Date: Wed, 6 May 2026 23:38:06 -0700 Subject: [PATCH 5/8] refactor: improve _sum_cost_usd comment to describe purpose rather than implementation note --- .../brainstorm/77298-1776142637/state/server-stopped | 1 + amplifier_app_cli/session_spawner.py | 5 ++--- 2 files changed, 3 insertions(+), 3 deletions(-) create mode 100644 .superpowers/brainstorm/77298-1776142637/state/server-stopped diff --git a/.superpowers/brainstorm/77298-1776142637/state/server-stopped b/.superpowers/brainstorm/77298-1776142637/state/server-stopped new file mode 100644 index 0000000..d59b8f7 --- /dev/null +++ b/.superpowers/brainstorm/77298-1776142637/state/server-stopped @@ -0,0 +1 @@ +{"reason":"idle timeout","timestamp":1776146178059} diff --git a/amplifier_app_cli/session_spawner.py b/amplifier_app_cli/session_spawner.py index f3e79a5..8abeaa7 100644 --- a/amplifier_app_cli/session_spawner.py +++ b/amplifier_app_cli/session_spawner.py @@ -16,9 +16,8 @@ logger = logging.getLogger(__name__) -# Intentionally duplicated across provider-anthropic, foundation, and hooks-streaming-ui. -# These are separate repos with no shared utility dependency. The function is 10 lines — -# the coordination cost of sharing outweighs the duplication cost. +# Collects cost_usd contributions from a session.cost channel and returns the +# total as Decimal, or None when no cost data is present (e.g. self-hosted models). def _sum_cost_usd(contributions: list) -> Decimal | None: """Sum cost_usd values from collect_contributions() results. From 32ea8fde50217f44dd25e2365e5611ab5432e2f0 Mon Sep 17 00:00:00 2001 From: Ken Chau Date: Thu, 7 May 2026 01:33:38 -0700 Subject: [PATCH 6/8] fix: wrap _bridge_child_cost in try/except, fix test description, remove stray artifact --- .../77298-1776142637/state/server-stopped | 1 - amplifier_app_cli/session_spawner.py | 34 ++++++--- tests/test_cost_bridge.py | 75 +++++++++++++++++++ 3 files changed, 99 insertions(+), 11 deletions(-) delete mode 100644 .superpowers/brainstorm/77298-1776142637/state/server-stopped diff --git a/.superpowers/brainstorm/77298-1776142637/state/server-stopped b/.superpowers/brainstorm/77298-1776142637/state/server-stopped deleted file mode 100644 index d59b8f7..0000000 --- a/.superpowers/brainstorm/77298-1776142637/state/server-stopped +++ /dev/null @@ -1 +0,0 @@ -{"reason":"idle timeout","timestamp":1776146178059} diff --git a/amplifier_app_cli/session_spawner.py b/amplifier_app_cli/session_spawner.py index 8abeaa7..2072def 100644 --- a/amplifier_app_cli/session_spawner.py +++ b/amplifier_app_cli/session_spawner.py @@ -732,11 +732,18 @@ async def _capture_completion(event: str, data: dict) -> HookResult: logger.debug(f"Sub-session {sub_session_id} state persisted") # Bridge child session costs to parent coordinator before child is torn down - await _bridge_child_cost( - child_coordinator=child_session.coordinator, - parent_coordinator=parent_session.coordinator, - child_session_id=sub_session_id, - ) + try: + await _bridge_child_cost( + child_coordinator=child_session.coordinator, + parent_coordinator=parent_session.coordinator, + child_session_id=sub_session_id, + ) + except Exception: + logger.warning( + "Failed to bridge child session cost for %s; skipping", + sub_session_id, + exc_info=True, + ) finally: # Unregister child cancellation token before cleanup @@ -1069,11 +1076,18 @@ async def _capture_completion(event: str, data: dict) -> HookResult: # Bridge child session costs to parent coordinator before child is torn down if parent_session is not None: - await _bridge_child_cost( - child_coordinator=child_session.coordinator, - parent_coordinator=parent_session.coordinator, - child_session_id=sub_session_id, - ) + try: + await _bridge_child_cost( + child_coordinator=child_session.coordinator, + parent_coordinator=parent_session.coordinator, + child_session_id=sub_session_id, + ) + except Exception: + logger.warning( + "Failed to bridge child session cost for %s; skipping", + sub_session_id, + exc_info=True, + ) finally: # Unregister child cancellation token before cleanup diff --git a/tests/test_cost_bridge.py b/tests/test_cost_bridge.py index 709e0e7..206a1a6 100644 --- a/tests/test_cost_bridge.py +++ b/tests/test_cost_bridge.py @@ -127,3 +127,78 @@ def capture_register(channel, name, callback): ) assert ("session.cost", "delegate:resumed-child-789") in registered + + +@pytest.mark.asyncio +async def test_resume_bridge_accumulates_incremental_costs(): + """Resuming the same session twice correctly accumulates incremental costs. + + Each resume_sub_session call creates a FRESH child coordinator. The provider + re-mounts from zero, so the child's session.cost channel only contains costs + for THAT resume's turns — not the full session history. + + _bridge_child_cost therefore passes the incremental cost for each resume. + + register_contributor in amplifier-core APPENDS (coordinator.rs: .push(entry)) — + it does NOT overwrite on duplicate name. Both entries are returned by + collect_contributions and summed correctly by _sum_cost_usd. + + Verified properties: + - Both calls use the same (channel, name) key — standard contributor identity. + - Each callback carries only the incremental cost of its resume. + - _sum_cost_usd([cb1(), cb2()]) == first_cost + second_cost (no double-count). + """ + from amplifier_app_cli.session_spawner import _bridge_child_cost, _sum_cost_usd + + parent_coord = MagicMock() + all_register_calls: list[tuple] = [] + + def capture_register(channel, name, callback): + all_register_calls.append((channel, name, callback)) + + parent_coord.register_contributor = capture_register + + # First resume: fresh child coordinator accumulated $0.04 (turn 1 only) + child_coord_1 = MagicMock() + child_coord_1.collect_contributions = AsyncMock( + return_value=[{"cost_usd": Decimal("0.04")}] + ) + await _bridge_child_cost( + child_coordinator=child_coord_1, + parent_coordinator=parent_coord, + child_session_id="test-child-xyz", + ) + + # Second resume: fresh child coordinator accumulated $0.06 (turn 2 only) + child_coord_2 = MagicMock() + child_coord_2.collect_contributions = AsyncMock( + return_value=[{"cost_usd": Decimal("0.06")}] + ) + await _bridge_child_cost( + child_coordinator=child_coord_2, + parent_coordinator=parent_coord, + child_session_id="test-child-xyz", + ) + + assert len(all_register_calls) == 2, "Expected exactly two register_contributor calls" + + channel1, name1, _ = all_register_calls[0] + channel2, name2, _ = all_register_calls[1] + + # Same channel + name: register_contributor appends both, collect_contributions + # returns both, _sum_cost_usd sums them — no key uniqueness required. + assert channel1 == channel2 == "session.cost" + assert name1 == name2 == "delegate:test-child-xyz" + + # Verify incremental values and that their sum is correct + _, _, cb1 = all_register_calls[0] + _, _, cb2 = all_register_calls[1] + assert cb1()["cost_usd"] == Decimal("0.04") + assert cb2()["cost_usd"] == Decimal("0.06") + + # Simulate what collect_contributions + _sum_cost_usd would produce: + # both entries are returned, summed to $0.10 (no double-counting) + total = _sum_cost_usd([cb1(), cb2()]) + assert total == Decimal("0.10"), ( + f"Expected $0.10 from two incremental contributions, got {total!r}" + ) From d1b6d7daf8280393729933a795e3cbfd2e59a912 Mon Sep 17 00:00:00 2001 From: Ken Chau Date: Thu, 7 May 2026 09:44:40 -0700 Subject: [PATCH 7/8] refactor: import _bridge_child_cost from foundation instead of reimplementing; remove outer try/except wrappers --- amplifier_app_cli/session_spawner.py | 81 +++++----------------------- tests/test_cost_bridge.py | 36 +++++++++---- 2 files changed, 40 insertions(+), 77 deletions(-) diff --git a/amplifier_app_cli/session_spawner.py b/amplifier_app_cli/session_spawner.py index 2072def..e91694e 100644 --- a/amplifier_app_cli/session_spawner.py +++ b/amplifier_app_cli/session_spawner.py @@ -5,58 +5,17 @@ import logging import sys -from decimal import Decimal from pathlib import Path from amplifier_core import AmplifierSession from amplifier_foundation import generate_sub_session_id +from amplifier_foundation.bundle._prepared import _bridge_child_cost from .agent_config import merge_configs logger = logging.getLogger(__name__) -# Collects cost_usd contributions from a session.cost channel and returns the -# total as Decimal, or None when no cost data is present (e.g. self-hosted models). -def _sum_cost_usd(contributions: list) -> Decimal | None: - """Sum cost_usd values from collect_contributions() results. - - Returns Decimal total, or None if no cost data is present. - None != 0: None means unknown cost (no rate data), 0 means known-free. - """ - total: Decimal | None = None - for c in contributions: - if c and isinstance(c, dict): - cost = c.get("cost_usd") - if cost is not None: - total = (total or Decimal("0")) + ( - cost if isinstance(cost, Decimal) else Decimal(str(cost)) - ) - return total - - -async def _bridge_child_cost( - child_coordinator, - parent_coordinator, - child_session_id: str, -) -> None: - """Propagate child session cost to parent's session.cost channel. - - Called after child_session.execute() returns and before child_session.cleanup(). - The child coordinator is still alive in this window. - """ - child_contributions = await child_coordinator.collect_contributions("session.cost") - child_total = _sum_cost_usd(child_contributions) - - if child_total is not None: - # Freeze value in default arg — child coordinator will be torn down after this - parent_coordinator.register_contributor( - "session.cost", - f"delegate:{child_session_id}", - lambda total=child_total: {"cost_usd": total}, - ) - - # Capture default sys.path entries at import time. # Used to filter out bundle-added paths when forwarding sys_paths to subprocess children. _DEFAULT_SYS_PATHS: frozenset[str] = frozenset(sys.path) @@ -731,19 +690,12 @@ async def _capture_completion(event: str, data: dict) -> HookResult: store.save(sub_session_id, transcript, metadata) logger.debug(f"Sub-session {sub_session_id} state persisted") - # Bridge child session costs to parent coordinator before child is torn down - try: - await _bridge_child_cost( - child_coordinator=child_session.coordinator, - parent_coordinator=parent_session.coordinator, - child_session_id=sub_session_id, - ) - except Exception: - logger.warning( - "Failed to bridge child session cost for %s; skipping", - sub_session_id, - exc_info=True, - ) + # Bridge child session costs to parent coordinator (_bridge_child_cost never raises) + await _bridge_child_cost( + child_coordinator=child_session.coordinator, + parent_coordinator=parent_session.coordinator, + child_session_id=sub_session_id, + ) finally: # Unregister child cancellation token before cleanup @@ -1074,20 +1026,13 @@ async def _capture_completion(event: str, data: dict) -> HookResult: f"Sub-session {sub_session_id} state updated (turn {metadata['turn_count']})" ) - # Bridge child session costs to parent coordinator before child is torn down + # Bridge child session costs to parent coordinator (_bridge_child_cost never raises) if parent_session is not None: - try: - await _bridge_child_cost( - child_coordinator=child_session.coordinator, - parent_coordinator=parent_session.coordinator, - child_session_id=sub_session_id, - ) - except Exception: - logger.warning( - "Failed to bridge child session cost for %s; skipping", - sub_session_id, - exc_info=True, - ) + await _bridge_child_cost( + child_coordinator=child_session.coordinator, + parent_coordinator=parent_session.coordinator, + child_session_id=sub_session_id, + ) finally: # Unregister child cancellation token before cleanup diff --git a/tests/test_cost_bridge.py b/tests/test_cost_bridge.py index 206a1a6..3a5cc0e 100644 --- a/tests/test_cost_bridge.py +++ b/tests/test_cost_bridge.py @@ -1,11 +1,15 @@ -"""Tests for _sum_cost_usd helper in session_spawner.""" +"""Tests for spawn cost bridge helpers. + +_sum_cost_usd and _bridge_child_cost live in amplifier_foundation.bundle._prepared +and are imported directly from there (app-cli delegates, not reimplements). +""" from decimal import Decimal from unittest.mock import AsyncMock, MagicMock import pytest -from amplifier_app_cli.session_spawner import _sum_cost_usd +from amplifier_foundation.bundle._prepared import _bridge_child_cost, _sum_cost_usd def test_sums_single_contribution(): @@ -68,8 +72,6 @@ def capture_register(channel, name, callback): parent_coord.register_contributor = capture_register - from amplifier_app_cli.session_spawner import _bridge_child_cost - await _bridge_child_cost( child_coordinator=child_coord, parent_coordinator=parent_coord, @@ -82,6 +84,27 @@ def capture_register(channel, name, callback): assert result == {"cost_usd": Decimal("0.07")} +@pytest.mark.asyncio +async def test_bridge_swallows_exception_and_logs(): + """_bridge_child_cost never raises — errors are logged as warnings.""" + child_coord = MagicMock() + # Simulate a failure inside collect_contributions + child_coord.collect_contributions = AsyncMock(side_effect=RuntimeError("simulated")) + + parent_coord = MagicMock() + parent_coord.register_contributor = MagicMock() + + # Must not raise + await _bridge_child_cost( + child_coordinator=child_coord, + parent_coordinator=parent_coord, + child_session_id="test-child-err", + ) + + # No contributor registered because the bridge failed before it could register + parent_coord.register_contributor.assert_not_called() + + @pytest.mark.asyncio async def test_spawn_bridge_skips_registration_when_no_cost(): """If child has no cost data, no contributor is registered on parent.""" @@ -91,8 +114,6 @@ async def test_spawn_bridge_skips_registration_when_no_cost(): parent_coord = MagicMock() parent_coord.register_contributor = MagicMock() - from amplifier_app_cli.session_spawner import _bridge_child_cost - await _bridge_child_cost( child_coordinator=child_coord, parent_coordinator=parent_coord, @@ -118,8 +139,6 @@ def capture_register(channel, name, callback): parent_coord.register_contributor = capture_register - from amplifier_app_cli.session_spawner import _bridge_child_cost - await _bridge_child_cost( child_coordinator=child_coord, parent_coordinator=parent_coord, @@ -148,7 +167,6 @@ async def test_resume_bridge_accumulates_incremental_costs(): - Each callback carries only the incremental cost of its resume. - _sum_cost_usd([cb1(), cb2()]) == first_cost + second_cost (no double-count). """ - from amplifier_app_cli.session_spawner import _bridge_child_cost, _sum_cost_usd parent_coord = MagicMock() all_register_calls: list[tuple] = [] From 397d137b1d1e081cac816d4e0decfc5e79e34dd4 Mon Sep 17 00:00:00 2001 From: Ken Chau Date: Thu, 7 May 2026 13:58:56 -0700 Subject: [PATCH 8/8] refactor: import bridge_child_cost from public amplifier_foundation API MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 🤖 Generated with [Amplifier](https://github.com/microsoft/amplifier) Co-Authored-By: Amplifier <240397093+microsoft-amplifier@users.noreply.github.com> --- amplifier_app_cli/session_spawner.py | 10 +++++----- tests/test_cost_bridge.py | 30 ++++++++++++++-------------- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/amplifier_app_cli/session_spawner.py b/amplifier_app_cli/session_spawner.py index e91694e..e17fe84 100644 --- a/amplifier_app_cli/session_spawner.py +++ b/amplifier_app_cli/session_spawner.py @@ -9,7 +9,7 @@ from amplifier_core import AmplifierSession from amplifier_foundation import generate_sub_session_id -from amplifier_foundation.bundle._prepared import _bridge_child_cost +from amplifier_foundation import bridge_child_cost from .agent_config import merge_configs @@ -690,8 +690,8 @@ async def _capture_completion(event: str, data: dict) -> HookResult: store.save(sub_session_id, transcript, metadata) logger.debug(f"Sub-session {sub_session_id} state persisted") - # Bridge child session costs to parent coordinator (_bridge_child_cost never raises) - await _bridge_child_cost( + # Bridge child session costs to parent coordinator (bridge_child_cost never raises) + await bridge_child_cost( child_coordinator=child_session.coordinator, parent_coordinator=parent_session.coordinator, child_session_id=sub_session_id, @@ -1026,9 +1026,9 @@ async def _capture_completion(event: str, data: dict) -> HookResult: f"Sub-session {sub_session_id} state updated (turn {metadata['turn_count']})" ) - # Bridge child session costs to parent coordinator (_bridge_child_cost never raises) + # Bridge child session costs to parent coordinator (bridge_child_cost never raises) if parent_session is not None: - await _bridge_child_cost( + await bridge_child_cost( child_coordinator=child_session.coordinator, parent_coordinator=parent_session.coordinator, child_session_id=sub_session_id, diff --git a/tests/test_cost_bridge.py b/tests/test_cost_bridge.py index 3a5cc0e..7f9bc77 100644 --- a/tests/test_cost_bridge.py +++ b/tests/test_cost_bridge.py @@ -9,16 +9,16 @@ import pytest -from amplifier_foundation.bundle._prepared import _bridge_child_cost, _sum_cost_usd +from amplifier_foundation import bridge_child_cost, sum_cost_usd def test_sums_single_contribution(): - result = _sum_cost_usd([{"cost_usd": Decimal("0.05")}]) + result = sum_cost_usd([{"cost_usd": Decimal("0.05")}]) assert result == Decimal("0.05") def test_sums_multiple_contributions(): - result = _sum_cost_usd( + result = sum_cost_usd( [ {"cost_usd": Decimal("0.03")}, {"cost_usd": Decimal("0.05")}, @@ -29,23 +29,23 @@ def test_sums_multiple_contributions(): def test_returns_none_for_empty_list(): - result = _sum_cost_usd([]) + result = sum_cost_usd([]) assert result is None def test_returns_none_when_all_none(): - result = _sum_cost_usd([{"cost_usd": None}, None, {}]) + result = sum_cost_usd([{"cost_usd": None}, None, {}]) assert result is None def test_accepts_string_cost_usd(): - result = _sum_cost_usd([{"cost_usd": "0.05"}]) + result = sum_cost_usd([{"cost_usd": "0.05"}]) assert result == Decimal("0.05") assert isinstance(result, Decimal) def test_skips_none_entries_in_mixed_list(): - result = _sum_cost_usd( + result = sum_cost_usd( [ {"cost_usd": Decimal("0.03")}, None, @@ -72,7 +72,7 @@ def capture_register(channel, name, callback): parent_coord.register_contributor = capture_register - await _bridge_child_cost( + await bridge_child_cost( child_coordinator=child_coord, parent_coordinator=parent_coord, child_session_id="test-child-123", @@ -95,7 +95,7 @@ async def test_bridge_swallows_exception_and_logs(): parent_coord.register_contributor = MagicMock() # Must not raise - await _bridge_child_cost( + await bridge_child_cost( child_coordinator=child_coord, parent_coordinator=parent_coord, child_session_id="test-child-err", @@ -114,7 +114,7 @@ async def test_spawn_bridge_skips_registration_when_no_cost(): parent_coord = MagicMock() parent_coord.register_contributor = MagicMock() - await _bridge_child_cost( + await bridge_child_cost( child_coordinator=child_coord, parent_coordinator=parent_coord, child_session_id="test-child-456", @@ -139,7 +139,7 @@ def capture_register(channel, name, callback): parent_coord.register_contributor = capture_register - await _bridge_child_cost( + await bridge_child_cost( child_coordinator=child_coord, parent_coordinator=parent_coord, child_session_id="resumed-child-789", @@ -165,7 +165,7 @@ async def test_resume_bridge_accumulates_incremental_costs(): Verified properties: - Both calls use the same (channel, name) key — standard contributor identity. - Each callback carries only the incremental cost of its resume. - - _sum_cost_usd([cb1(), cb2()]) == first_cost + second_cost (no double-count). + - sum_cost_usd([cb1(), cb2()]) == first_cost + second_cost (no double-count). """ parent_coord = MagicMock() @@ -181,7 +181,7 @@ def capture_register(channel, name, callback): child_coord_1.collect_contributions = AsyncMock( return_value=[{"cost_usd": Decimal("0.04")}] ) - await _bridge_child_cost( + await bridge_child_cost( child_coordinator=child_coord_1, parent_coordinator=parent_coord, child_session_id="test-child-xyz", @@ -192,7 +192,7 @@ def capture_register(channel, name, callback): child_coord_2.collect_contributions = AsyncMock( return_value=[{"cost_usd": Decimal("0.06")}] ) - await _bridge_child_cost( + await bridge_child_cost( child_coordinator=child_coord_2, parent_coordinator=parent_coord, child_session_id="test-child-xyz", @@ -216,7 +216,7 @@ def capture_register(channel, name, callback): # Simulate what collect_contributions + _sum_cost_usd would produce: # both entries are returned, summed to $0.10 (no double-counting) - total = _sum_cost_usd([cb1(), cb2()]) + total = sum_cost_usd([cb1(), cb2()]) assert total == Decimal("0.10"), ( f"Expected $0.10 from two incremental contributions, got {total!r}" )