From c4de977ee9eeba5573f4b7c1c40b4050a1184a57 Mon Sep 17 00:00:00 2001 From: Azure SRE Agent Date: Sun, 26 Apr 2026 06:34:30 +0000 Subject: [PATCH] Fix CWE-862: Enforce approval_mode in FunctionTool.invoke() FunctionTool.invoke() previously ignored approval_mode='always_require', allowing direct callers (e.g., Claude/Copilot integrations) to bypass the human approval gate that only existed in _try_execute_function_calls(). Changes: - Add ToolApprovalRequiredException to exceptions.py - Add approval_mode check at the top of FunctionTool.invoke() - Add _approved parameter (default False) to invoke() signature and overloads - Pass _approved=True from _auto_invoke_function() call sites (both direct and middleware pipeline paths) - Export ToolApprovalRequiredException from agent_framework package - Add comprehensive tests for the new guard Based on upstream microsoft/agent-framework main branch. Fixes MSRC Case 109288 (VULN-176822) Co-authored-by: Azure SRE Agent --- .../packages/core/agent_framework/__init__.py | 2 + .../packages/core/agent_framework/_tools.py | 18 ++++ .../core/agent_framework/exceptions.py | 13 +++ python/packages/core/tests/core/test_tools.py | 83 +++++++++++++++++++ 4 files changed, 116 insertions(+) diff --git a/python/packages/core/agent_framework/__init__.py b/python/packages/core/agent_framework/__init__.py index 364f62eae1..02ac7d1211 100644 --- a/python/packages/core/agent_framework/__init__.py +++ b/python/packages/core/agent_framework/__init__.py @@ -248,6 +248,7 @@ ) from .exceptions import ( MiddlewareException, + ToolApprovalRequiredException, UserInputRequiredException, WorkflowCheckpointException, WorkflowConvergenceException, @@ -403,6 +404,7 @@ "TypeCompatibilityError", "UpdateT", "UsageDetails", + "ToolApprovalRequiredException", "UserInputRequiredException", "ValidationTypeEnum", "Workflow", diff --git a/python/packages/core/agent_framework/_tools.py b/python/packages/core/agent_framework/_tools.py index 3f15472a5a..68969ab624 100644 --- a/python/packages/core/agent_framework/_tools.py +++ b/python/packages/core/agent_framework/_tools.py @@ -545,6 +545,7 @@ async def invoke( context: FunctionInvocationContext | None = None, tool_call_id: str | None = None, skip_parsing: Literal[True], + _approved: bool = False, **kwargs: Any, ) -> Any: ... @@ -556,6 +557,7 @@ async def invoke( context: FunctionInvocationContext | None = None, tool_call_id: str | None = None, skip_parsing: Literal[False] = False, + _approved: bool = False, **kwargs: Any, ) -> list[Content]: ... @@ -566,6 +568,7 @@ async def invoke( context: FunctionInvocationContext | None = None, tool_call_id: str | None = None, skip_parsing: bool = False, + _approved: bool = False, **kwargs: Any, ) -> list[Content] | Any: """Run the AI function with the provided arguments as a Pydantic model. @@ -588,6 +591,10 @@ async def invoke( tool_call_id: Optional tool call identifier used for telemetry and tracing. skip_parsing: When ``True``, bypass parsing and return the wrapped function's raw value instead of a ``list[Content]``. Defaults to ``False``. + _approved: Internal flag set by the auto-invocation pipeline after the + approval gate in ``_try_execute_function_calls`` has been satisfied. + Callers outside the pipeline must obtain human approval and pass + ``_approved=True`` to execute tools with ``approval_mode='always_require'``. kwargs: Direct function argument values. When provided, every keyword must match a declared tool parameter. Runtime data must be passed via ``context``. @@ -599,9 +606,18 @@ async def invoke( Raises: TypeError: If arguments is not mapping-like or fails schema checks. + ToolApprovalRequiredException: If the tool requires approval and ``_approved`` is False. """ if self.declaration_only: raise ToolException(f"Function '{self.name}' is declaration only and cannot be invoked.") + if self.approval_mode == "always_require" and not _approved: + from .exceptions import ToolApprovalRequiredException + + raise ToolApprovalRequiredException( + f"Function '{self.name}' requires human approval (approval_mode='always_require'). " + "The auto-invocation pipeline handles this automatically. If calling invoke() " + "directly, obtain human approval first and pass _approved=True." + ) global OBSERVABILITY_SETTINGS from ._middleware import FunctionInvocationContext from ._types import Content @@ -1520,6 +1536,7 @@ async def _auto_invoke_function( arguments=args, context=direct_context, tool_call_id=function_call_content.call_id, + _approved=True, ) return Content.from_function_result( call_id=function_call_content.call_id, # type: ignore[arg-type] @@ -1551,6 +1568,7 @@ async def final_function_handler(context_obj: Any) -> Any: arguments=context_obj.arguments, context=context_obj, tool_call_id=function_call_content.call_id, + _approved=True, ) from ._middleware import MiddlewareTermination diff --git a/python/packages/core/agent_framework/exceptions.py b/python/packages/core/agent_framework/exceptions.py index 4f56c34b5c..10f0c93696 100644 --- a/python/packages/core/agent_framework/exceptions.py +++ b/python/packages/core/agent_framework/exceptions.py @@ -180,6 +180,19 @@ class ToolExecutionException(ToolException): pass +class ToolApprovalRequiredException(ToolException): + """Raised when a tool with approval_mode='always_require' is invoked without approval. + + Direct calls to ``FunctionTool.invoke()`` on tools that require human approval + must pass ``_approved=True`` to certify that the approval gate has been satisfied. + The auto-invocation pipeline in ``_try_execute_function_calls`` handles this + automatically; this exception guards against bypasses from alternative call paths + (e.g., Claude, Copilot Studio, or custom integrations). + """ + + pass + + class UserInputRequiredException(ToolException): """Raised when a tool wrapping a sub-agent requires user input to proceed. diff --git a/python/packages/core/tests/core/test_tools.py b/python/packages/core/tests/core/test_tools.py index b3762bf4ef..73485e5aa6 100644 --- a/python/packages/core/tests/core/test_tools.py +++ b/python/packages/core/tests/core/test_tools.py @@ -1085,6 +1085,89 @@ def invalid_tool(x, ctx, runtime: FunctionInvocationContext) -> str: return f"{x}-{ctx.kwargs}-{runtime.kwargs}" +# region approval_mode enforcement tests + + +async def test_invoke_blocked_when_approval_required(): + """Test that direct invoke() raises ToolApprovalRequiredException for always_require tools.""" + from agent_framework.exceptions import ToolApprovalRequiredException + + @tool(name="dangerous_tool", description="Requires approval", approval_mode="always_require") + def dangerous_action(path: str) -> str: + """A tool that requires human approval.""" + return f"deleted {path}" + + # Arrange / Act / Assert + with pytest.raises(ToolApprovalRequiredException, match="requires human approval"): + await dangerous_action.invoke(arguments={"path": "/critical"}) + + +async def test_invoke_succeeds_with_approved_flag(): + """Test that invoke() succeeds when _approved=True is passed for always_require tools.""" + + @tool(name="approved_tool", description="Requires approval", approval_mode="always_require") + def guarded_action(x: int) -> str: + """A tool that requires human approval.""" + return f"result={x}" + + # Arrange / Act + result = await guarded_action.invoke(arguments={"x": 42}, _approved=True) + + # Assert + assert len(result) == 1 + assert result[0].text == "result=42" + + +async def test_invoke_allowed_when_approval_not_required(): + """Test that invoke() works normally for tools without approval_mode='always_require'.""" + + @tool(name="safe_tool", description="No approval needed") + def safe_action(x: int) -> str: + """A tool that does not require approval.""" + return f"safe={x}" + + # Arrange / Act + result = await safe_action.invoke(arguments={"x": 7}) + + # Assert + assert len(result) == 1 + assert result[0].text == "safe=7" + assert safe_action.approval_mode == "never_require" + + +async def test_invoke_allowed_for_auto_approval_mode(): + """Test that invoke() works for tools with approval_mode other than 'always_require'.""" + + @tool(name="auto_tool", description="Auto approval", approval_mode="auto") + def auto_action(x: int) -> str: + """A tool with auto approval mode.""" + return f"auto={x}" + + # Arrange / Act + result = await auto_action.invoke(arguments={"x": 3}) + + # Assert + assert len(result) == 1 + assert result[0].text == "auto=3" + + +async def test_call_still_works_for_approval_required_tools(): + """Test that __call__ (the raw function) is not gated — only invoke() is gated.""" + + @tool(name="callable_tool", description="Requires approval", approval_mode="always_require") + def callable_action(x: int) -> int: + """A tool that requires human approval.""" + return x * 2 + + # Arrange / Act — __call__ is the low-level execution, not the security boundary + result = callable_action(5) + + # Assert + assert result == 10 + + +# endregion + # region _parse_annotation tests