Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
449 changes: 350 additions & 99 deletions python/packages/azurefunctions/agent_framework_azurefunctions/_app.py

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ def prepare_activity_task(self, activity_name: str, input_json: str) -> Any:
orchestration_context: Any = self._context
return orchestration_context.call_activity(activity_name, input_json)

def call_sub_orchestrator(self, name: str, input: Any, instance_id: str | None = None) -> Any:
orchestration_context: Any = self._context
return orchestration_context.call_sub_orchestrator(name, input_=input, instance_id=instance_id)

# -- Composite tasks ------------------------------------------------------

def task_all(self, tasks: list[Any]) -> Any:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,12 @@ def _load_and_validate_env(sample_path: Path) -> None:
"DURABLE_TASK_SCHEDULER_CONNECTION_STRING",
"FUNCTIONS_WORKER_RUNTIME",
]
if sample_path.name == "11_workflow_parallel":
# Samples that host no AI agents need no model credentials (only the DTS emulator
# and Azurite). The suite-level gate still requires *some* LLM config to be present.
no_llm_samples = {"13_subworkflow_hitl"}
if sample_path.name in no_llm_samples:
pass
elif sample_path.name == "11_workflow_parallel":
required_env_vars.extend(["AZURE_OPENAI_ENDPOINT", "AZURE_OPENAI_MODEL"])
else:
required_env_vars.extend(["FOUNDRY_PROJECT_ENDPOINT", "FOUNDRY_MODEL"])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@

import pytest

# Must match the workflow name in samples/04-hosting/azure_functions/09_workflow_shared_state/function_app.py
WORKFLOW_NAME = "email_triage_shared_state"

# Module-level markers - applied to all tests in this file
pytestmark = [
pytest.mark.flaky,
Expand All @@ -45,7 +48,7 @@ def test_workflow_with_spam_email(self) -> None:
spam_content = "URGENT! You have won $1,000,000! Click here to claim your prize now before it expires!"

# Start orchestration with spam email
response = self.helper.post_text(f"{self.base_url}/api/workflow/run", spam_content)
response = self.helper.post_text(f"{self.base_url}/api/workflow/{WORKFLOW_NAME}/run", spam_content)
assert response.status_code == 202
data = response.json()
assert "instanceId" in data
Expand All @@ -64,7 +67,7 @@ def test_workflow_with_legitimate_email(self) -> None:
)

# Start orchestration with legitimate email
response = self.helper.post_text(f"{self.base_url}/api/workflow/run", legitimate_content)
response = self.helper.post_text(f"{self.base_url}/api/workflow/{WORKFLOW_NAME}/run", legitimate_content)
assert response.status_code == 202
data = response.json()
assert "instanceId" in data
Expand All @@ -83,7 +86,7 @@ def test_workflow_with_phishing_email(self) -> None:
)

# Start orchestration with phishing email
response = self.helper.post_text(f"{self.base_url}/api/workflow/run", phishing_content)
response = self.helper.post_text(f"{self.base_url}/api/workflow/{WORKFLOW_NAME}/run", phishing_content)
assert response.status_code == 202
data = response.json()
assert "instanceId" in data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@

import pytest

# Must match the workflow name in samples/04-hosting/azure_functions/10_workflow_no_shared_state/function_app.py
WORKFLOW_NAME = "email_triage"

# Module-level markers - applied to all tests in this file
pytestmark = [
pytest.mark.flaky,
Expand Down Expand Up @@ -51,7 +54,7 @@ def test_workflow_with_spam_email(self) -> None:
}

# Start orchestration
response = self.helper.post_json(f"{self.base_url}/api/workflow/run", payload)
response = self.helper.post_json(f"{self.base_url}/api/workflow/{WORKFLOW_NAME}/run", payload)
assert response.status_code == 202
data = response.json()
assert "instanceId" in data
Expand All @@ -73,7 +76,7 @@ def test_workflow_with_legitimate_email(self) -> None:
}

# Start orchestration
response = self.helper.post_json(f"{self.base_url}/api/workflow/run", payload)
response = self.helper.post_json(f"{self.base_url}/api/workflow/{WORKFLOW_NAME}/run", payload)
assert response.status_code == 202
data = response.json()
assert "instanceId" in data
Expand All @@ -92,13 +95,13 @@ def test_workflow_status_endpoint(self) -> None:
}

# Start orchestration
response = self.helper.post_json(f"{self.base_url}/api/workflow/run", payload)
response = self.helper.post_json(f"{self.base_url}/api/workflow/{WORKFLOW_NAME}/run", payload)
assert response.status_code == 202
data = response.json()
instance_id = data["instanceId"]

# Check status using the workflow status endpoint
status_response = self.helper.get(f"{self.base_url}/api/workflow/status/{instance_id}")
status_response = self.helper.get(f"{self.base_url}/api/workflow/{WORKFLOW_NAME}/status/{instance_id}")
assert status_response.status_code == 200
status = status_response.json()
assert "instanceId" in status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@

import pytest

# Must match the workflow name in samples/04-hosting/azure_functions/11_workflow_parallel/function_app.py
WORKFLOW_NAME = "parallel_review"

# Module-level markers - applied to all tests in this file
pytestmark = [
pytest.mark.flaky,
Expand Down Expand Up @@ -62,14 +65,14 @@ def test_parallel_workflow_end_to_end(self) -> None:
}

# Start the orchestration.
response = self.helper.post_json(f"{self.base_url}/api/workflow/run", payload)
response = self.helper.post_json(f"{self.base_url}/api/workflow/{WORKFLOW_NAME}/run", payload)
assert response.status_code == 202
data = response.json()
instance_id = data["instanceId"]
assert "statusQueryGetUri" in data

# The status endpoint reflects the started instance.
status_response = self.helper.get(f"{self.base_url}/api/workflow/status/{instance_id}")
status_response = self.helper.get(f"{self.base_url}/api/workflow/{WORKFLOW_NAME}/status/{instance_id}")
assert status_response.status_code == 200
assert status_response.json()["instanceId"] == instance_id

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
pytest.mark.usefixtures("function_app_for_test"),
]

# Must match the workflow name in samples/04-hosting/azure_functions/12_workflow_hitl/function_app.py
WORKFLOW_NAME = "content_moderation"


@pytest.mark.orchestration
class TestWorkflowHITL:
Expand All @@ -46,7 +49,7 @@ def _wait_for_hitl_request(self, instance_id: str, timeout: int = 40) -> dict:
"""Polls for a pending HITL request."""
start_time = time.time()
while time.time() - start_time < timeout:
status_response = self.helper.get(f"{self.base_url}/api/workflow/status/{instance_id}")
status_response = self.helper.get(f"{self.base_url}/api/workflow/{WORKFLOW_NAME}/status/{instance_id}")
if status_response.status_code == 200:
status = status_response.json()
pending_requests = status.get("pendingHumanInputRequests", [])
Expand All @@ -69,7 +72,7 @@ def test_hitl_workflow_approval(self) -> None:
}

# Start orchestration
response = self.helper.post_json(f"{self.base_url}/api/workflow/run", payload)
response = self.helper.post_json(f"{self.base_url}/api/workflow/{WORKFLOW_NAME}/run", payload)
assert response.status_code == 202
data = response.json()
assert "instanceId" in data
Expand All @@ -89,7 +92,7 @@ def test_hitl_workflow_approval(self) -> None:

# Send approval
approval_response = self.helper.post_json(
f"{self.base_url}/api/workflow/respond/{instance_id}/{request_id}",
f"{self.base_url}/api/workflow/{WORKFLOW_NAME}/respond/{instance_id}/{request_id}",
{"approved": True, "reviewer_notes": "Content is appropriate and well-written."},
)
assert approval_response.status_code == 200
Expand All @@ -112,7 +115,7 @@ def test_hitl_workflow_rejection(self) -> None:
}

# Start orchestration
response = self.helper.post_json(f"{self.base_url}/api/workflow/run", payload)
response = self.helper.post_json(f"{self.base_url}/api/workflow/{WORKFLOW_NAME}/run", payload)
assert response.status_code == 202
data = response.json()
instance_id = data["instanceId"]
Expand All @@ -127,7 +130,7 @@ def test_hitl_workflow_rejection(self) -> None:

# Send rejection
rejection_response = self.helper.post_json(
f"{self.base_url}/api/workflow/respond/{instance_id}/{request_id}",
f"{self.base_url}/api/workflow/{WORKFLOW_NAME}/respond/{instance_id}/{request_id}",
{"approved": False, "reviewer_notes": "Content appears to be spam/scam material."},
)
assert rejection_response.status_code == 200
Expand All @@ -150,7 +153,7 @@ def test_hitl_workflow_status_endpoint(self) -> None:
}

# Start orchestration
response = self.helper.post_json(f"{self.base_url}/api/workflow/run", payload)
response = self.helper.post_json(f"{self.base_url}/api/workflow/{WORKFLOW_NAME}/run", payload)
assert response.status_code == 202
data = response.json()
instance_id = data["instanceId"]
Expand All @@ -169,7 +172,7 @@ def test_hitl_workflow_status_endpoint(self) -> None:
if pending_requests:
request_id = pending_requests[0]["requestId"]
self.helper.post_json(
f"{self.base_url}/api/workflow/respond/{instance_id}/{request_id}",
f"{self.base_url}/api/workflow/{WORKFLOW_NAME}/respond/{instance_id}/{request_id}",
{"approved": True, "reviewer_notes": ""},
)

Expand All @@ -189,7 +192,7 @@ def test_hitl_workflow_with_neutral_content(self) -> None:
}

# Start orchestration
response = self.helper.post_json(f"{self.base_url}/api/workflow/run", payload)
response = self.helper.post_json(f"{self.base_url}/api/workflow/{WORKFLOW_NAME}/run", payload)
assert response.status_code == 202
data = response.json()
instance_id = data["instanceId"]
Expand All @@ -203,7 +206,7 @@ def test_hitl_workflow_with_neutral_content(self) -> None:

# Approve
self.helper.post_json(
f"{self.base_url}/api/workflow/respond/{instance_id}/{request_id}",
f"{self.base_url}/api/workflow/{WORKFLOW_NAME}/respond/{instance_id}/{request_id}",
{"approved": True, "reviewer_notes": "Approved after review."},
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
# Copyright (c) Microsoft. All rights reserved.
"""
Integration Tests for the Sub-workflow HITL Sample (13_subworkflow_hitl)

Tests nested human-in-the-loop through the Azure Functions host: the HITL pause
lives inside an inner workflow embedded via ``WorkflowExecutor``, so the pending
request surfaces at the top-level instance with a **qualified** request id
(``review_sub~0~{requestId}``). The caller responds against the top-level instance
and the host routes it to the owning child orchestration.

This sample hosts no AI agents, so it exercises the AF nested-HITL plumbing
deterministically (no model latency / variability).

The function app is automatically started by the test fixture.

Prerequisites:
- Azurite running for durable orchestrations
- Durable Task Scheduler emulator running on localhost:8080

Usage:
uv run pytest packages/azurefunctions/tests/integration_tests/test_13_workflow_subworkflow_hitl.py -v
"""

import time

import pytest

# Module-level markers - applied to all tests in this file
pytestmark = [
pytest.mark.flaky,
pytest.mark.integration,
pytest.mark.sample("13_subworkflow_hitl"),
pytest.mark.usefixtures("function_app_for_test"),
]

# Must match the outer workflow name in samples/.../13_subworkflow_hitl/function_app.py
WORKFLOW_NAME = "moderation_pipeline"
# The WorkflowExecutor node id that embeds the inner HITL workflow.
SUBWORKFLOW_NODE_ID = "review_sub"


@pytest.mark.orchestration
class TestSubworkflowHITL:
"""Tests for the 13_subworkflow_hitl sample (nested HITL behind one surface)."""

@pytest.fixture(autouse=True)
def _setup(self, base_url: str, sample_helper) -> None:
"""Provide the helper and base URL for each test."""
self.base_url = base_url
self.helper = sample_helper

def _wait_for_hitl_request(self, instance_id: str, timeout: int = 40) -> dict:
"""Poll the top-level status endpoint until a (nested) HITL request appears."""
start_time = time.time()
while time.time() - start_time < timeout:
status_response = self.helper.get(f"{self.base_url}/api/workflow/{WORKFLOW_NAME}/status/{instance_id}")
if status_response.status_code == 200:
status = status_response.json()
if status.get("pendingHumanInputRequests"):
return status
time.sleep(2)
raise AssertionError(f"Timed out waiting for a nested HITL request for instance {instance_id}")

def _start(self, payload: dict) -> dict:
"""Start the outer workflow and return the run response JSON."""
response = self.helper.post_json(f"{self.base_url}/api/workflow/{WORKFLOW_NAME}/run", payload)
assert response.status_code == 202
return response.json()

def test_nested_request_surfaces_with_qualified_id(self) -> None:
"""The nested pending request is surfaced with a ``review_sub~0~{id}`` qualified id."""
data = self._start({
"content_id": "article-100",
"title": "Quarterly Roadmap",
"body": "A summary of the upcoming features planned for the next quarter.",
})
instance_id = data["instanceId"]

status = self._wait_for_hitl_request(instance_id)
pending = status.get("pendingHumanInputRequests", [])
assert len(pending) == 1
request_id = pending[0]["requestId"]

# The qualifier carries the node id and the child's ordinal (0 for the single
# dispatch), then the inner bare request id: ``review_sub~0~{requestId}``.
expected_prefix = f"{SUBWORKFLOW_NODE_ID}~0~"
assert request_id.startswith(expected_prefix), request_id
assert request_id[len(expected_prefix) :] # non-empty inner id

# The respondUrl always targets the top-level instance.
assert f"/api/workflow/{WORKFLOW_NAME}/respond/{instance_id}/" in pending[0]["respondUrl"]

# Drain the pause so the instance does not hang.
approve = self.helper.post_json(
f"{self.base_url}/api/workflow/{WORKFLOW_NAME}/respond/{instance_id}/{request_id}",
{"approved": True, "reviewer_notes": "ok"},
)
assert approve.status_code == 200
self.helper.wait_for_orchestration(data["statusQueryGetUri"])

def test_nested_hitl_approval(self) -> None:
"""Responding 'approved' to the nested request resumes the outer workflow to APPROVED."""
data = self._start({
"content_id": "article-001",
"title": "Introduction to AI in Healthcare",
"body": (
"Artificial intelligence is improving healthcare by enabling faster diagnosis, "
"personalized treatment plans, and better patient outcomes."
),
})
instance_id = data["instanceId"]

status = self._wait_for_hitl_request(instance_id)
request_id = status["pendingHumanInputRequests"][0]["requestId"]

approval = self.helper.post_json(
f"{self.base_url}/api/workflow/{WORKFLOW_NAME}/respond/{instance_id}/{request_id}",
{"approved": True, "reviewer_notes": "Looks good."},
)
assert approval.status_code == 200

final_status = self.helper.wait_for_orchestration(data["statusQueryGetUri"])
assert final_status["runtimeStatus"] == "Completed"
assert "APPROVED" in str(final_status.get("output")).upper()

def test_nested_hitl_rejection(self) -> None:
"""Responding 'rejected' to the nested request resumes the outer workflow to REJECTED."""
data = self._start({
"content_id": "article-002",
"title": "Get Rich Quick",
"body": "Click here NOW to make $10,000 overnight! GUARANTEED! Limited time offer!",
})
instance_id = data["instanceId"]

status = self._wait_for_hitl_request(instance_id)
request_id = status["pendingHumanInputRequests"][0]["requestId"]

rejection = self.helper.post_json(
f"{self.base_url}/api/workflow/{WORKFLOW_NAME}/respond/{instance_id}/{request_id}",
{"approved": False, "reviewer_notes": "Violates content policy."},
)
assert rejection.status_code == 200

final_status = self.helper.wait_for_orchestration(data["statusQueryGetUri"])
assert final_status["runtimeStatus"] == "Completed"
assert "REJECTED" in str(final_status.get("output")).upper()


if __name__ == "__main__":
pytest.main([__file__, "-v"])
Loading
Loading