From 97efdd7e207460ab842bf954bed2273face436af Mon Sep 17 00:00:00 2001 From: greenie-msft Date: Fri, 19 Sep 2025 12:21:22 -0700 Subject: [PATCH 1/2] durable functions python stateful app patterns --- .../python/async-http-api/README.md | 137 +++++++ .../python/async-http-api/function_app.py | 179 +++++++++ .../python/async-http-api/host.json | 24 ++ .../python/async-http-api/requirements.txt | 2 + .../python/eternal-orchestrations/README.md | 175 +++++++++ .../eternal-orchestrations/function_app.py | 238 ++++++++++++ .../python/eternal-orchestrations/host.json | 24 ++ .../python/fan-out-fan-in/README.md | 130 +++++++ .../python/fan-out-fan-in/function_app.py | 139 +++++++ .../python/fan-out-fan-in/host.json | 24 ++ .../python/fan-out-fan-in/requirements.txt | 2 + .../python/function-chaining/README.md | 113 ++++++ .../python/function-chaining/function_app.py | 108 ++++++ .../python/function-chaining/host.json | 24 ++ .../python/function-chaining/requirements.txt | 2 + .../python/human-interaction/README.md | 177 +++++++++ .../python/human-interaction/function_app.py | 280 ++++++++++++++ .../python/human-interaction/host.json | 24 ++ .../python/human-interaction/requirements.txt | 2 + .../python/monitoring/README.md | 344 ++++++++++++++++++ .../python/monitoring/function_app.py | 180 +++++++++ .../python/monitoring/host.json | 24 ++ .../python/monitoring/requirements.txt | 2 + .../python/pdf-summarizer/host.json | 4 +- 24 files changed, 2356 insertions(+), 2 deletions(-) create mode 100644 samples/durable-functions/python/async-http-api/README.md create mode 100644 samples/durable-functions/python/async-http-api/function_app.py create mode 100644 samples/durable-functions/python/async-http-api/host.json create mode 100644 samples/durable-functions/python/async-http-api/requirements.txt create mode 100644 samples/durable-functions/python/eternal-orchestrations/README.md create mode 100644 samples/durable-functions/python/eternal-orchestrations/function_app.py create 
mode 100644 samples/durable-functions/python/eternal-orchestrations/host.json create mode 100644 samples/durable-functions/python/fan-out-fan-in/README.md create mode 100644 samples/durable-functions/python/fan-out-fan-in/function_app.py create mode 100644 samples/durable-functions/python/fan-out-fan-in/host.json create mode 100644 samples/durable-functions/python/fan-out-fan-in/requirements.txt create mode 100644 samples/durable-functions/python/function-chaining/README.md create mode 100644 samples/durable-functions/python/function-chaining/function_app.py create mode 100644 samples/durable-functions/python/function-chaining/host.json create mode 100644 samples/durable-functions/python/function-chaining/requirements.txt create mode 100644 samples/durable-functions/python/human-interaction/README.md create mode 100644 samples/durable-functions/python/human-interaction/function_app.py create mode 100644 samples/durable-functions/python/human-interaction/host.json create mode 100644 samples/durable-functions/python/human-interaction/requirements.txt create mode 100644 samples/durable-functions/python/monitoring/README.md create mode 100644 samples/durable-functions/python/monitoring/function_app.py create mode 100644 samples/durable-functions/python/monitoring/host.json create mode 100644 samples/durable-functions/python/monitoring/requirements.txt diff --git a/samples/durable-functions/python/async-http-api/README.md b/samples/durable-functions/python/async-http-api/README.md new file mode 100644 index 0000000..ded9990 --- /dev/null +++ b/samples/durable-functions/python/async-http-api/README.md @@ -0,0 +1,137 @@ +# Async HTTP API Pattern - Durable Functions with Durable Task Scheduler + +This sample demonstrates the **Async HTTP API** orchestration pattern using Durable Functions with the Durable Task Scheduler backend. This pattern shows how to handle long-running operations with HTTP polling for status updates. 
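From the client's side, the pattern boils down to "submit, receive status URLs, poll until terminal". The loop below is a minimal, self-contained sketch of that polling logic; `fetch_status` is a hypothetical injectable callable (in practice it would issue an HTTP GET against the `statusQueryGetUri` this sample returns), kept abstract so the loop can run without a live Functions host:

```python
import time

# Terminal runtime statuses reported by Durable Functions status endpoints
TERMINAL_STATUSES = {"Completed", "Failed", "Terminated"}

def poll_until_complete(fetch_status, interval_seconds=2.0, timeout_seconds=300.0):
    """Poll a status source until the orchestration reaches a terminal state.

    fetch_status: zero-argument callable returning a dict shaped like this
    sample's /api/status/{instanceId} response, e.g.
    {"runtimeStatus": "Running", "output": None}.
    """
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        status = fetch_status()
        if status.get("runtimeStatus") in TERMINAL_STATUSES:
            return status
        time.sleep(interval_seconds)
    raise TimeoutError("orchestration did not finish within the timeout")

# Example with a fake status source that completes on the third poll:
responses = iter([
    {"runtimeStatus": "Running", "output": None},
    {"runtimeStatus": "Running", "output": None},
    {"runtimeStatus": "Completed", "output": {"result": "Success"}},
])
final = poll_until_complete(lambda: next(responses), interval_seconds=0.01)
print(final["runtimeStatus"])  # Completed
```

A real client would add jitter or exponential backoff to the interval; the fixed interval here keeps the sketch short.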
+ +## Pattern Overview + +The Async HTTP API pattern provides a way to handle long-running operations: +1. **HTTP Start**: Client submits a request and receives URLs for status checking +2. **Long-Running Process**: `long_running_operation` simulates work that takes time +3. **Status Polling**: Clients can check progress via HTTP endpoints +4. **Completion**: Eventually returns the final result + +## Architecture + +- **HTTP Trigger**: `async_http_api` - Starts the long-running orchestration +- **Orchestrator**: `async_http_api_orchestrator` - Manages the long-running process +- **Activity**: `long_running_operation` - Simulates time-consuming work +- **Status Endpoint**: `status/{instanceId}` - Provides orchestration status +- **Backend**: Durable Task Scheduler for state management + +## Prerequisites + +- [Python 3.9+](https://www.python.org/downloads/) +- [Azure Functions Core Tools](https://docs.microsoft.com/azure/azure-functions/functions-run-local#install-the-azure-functions-core-tools) +- [Durable Task Scheduler Emulator](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler) (for local development) + +## Setup + +1. **Install dependencies**: + ```bash + pip install -r requirements.txt + ``` + +2. **Start the Durable Task Scheduler Emulator**: + ```bash + docker run --name dts-emulator -p 8080:8080 -p 8082:8082 -d mcr.microsoft.com/dts/dts-emulator:latest + ``` + +3. **Configure connection** (already set in `local.settings.json`): + The sample is configured to use the local emulator by default. + +## Running the Sample + +1. **Start the Azure Functions host**: + ```bash + func start + ``` + +2. 
**Start a long-running operation**: + ```bash + # Start operation with default parameters + curl -X POST http://localhost:7071/api/async_http_api \ + -H "Content-Type: application/json" \ + -d '{}' + + # Start operation with custom parameters + curl -X POST http://localhost:7071/api/async_http_api \ + -H "Content-Type: application/json" \ + -d '{"operation_type": "data_processing", "duration": 45, "data": {"input": "sample"}}' + ``` + +3. **Poll for status** (use URLs from the initial response): + ```bash + # Check status + curl http://localhost:7071/api/status/{instanceId} + + # The response includes standard orchestration management URLs: + # - statusQueryGetUri: Check current status + # - sendEventPostUri: Send external events + # - terminatePostUri: Terminate the orchestration + ``` + +## Configuration Files + +### host.json +Configures the Durable Functions extension to use Durable Task Scheduler: +- Sets the hub name to "default" +- Configures the storage provider as "azureManaged" +- References the connection string name + +### local.settings.json +Contains local development settings: +- Durable Task Scheduler connection string for local emulator +- Function worker runtime set to "python" + +## Expected Behavior + +1. **Initial Response**: Returns HTTP 202 with management URLs +2. **Status Polling**: Shows "Running" status while processing +3. **Progress Updates**: Activity logs progress during execution +4. **Completion**: Eventually returns "Completed" with final result + +Example status progression: +```json +// Initially +{"runtimeStatus": "Running", "output": null} + +// Finally +{"runtimeStatus": "Completed", "output": {"task": "ProcessData", "result": "Success", "duration": 30}} +``` + +## How It Works + +1. **Async Start**: HTTP trigger starts orchestration and returns immediately with status URLs +2. **Background Processing**: Long-running activity executes while client can poll for updates +3. 
**Status Management**: Durable Functions manages orchestration state across the operation +4. **Client Experience**: Clients get immediate response and can check progress periodically + +## Monitoring + +- **Function Logs**: Check the Azure Functions host output for processing details +- **Dashboard**: Navigate to http://localhost:8082 to view long-running orchestrations +- **Status Endpoints**: Use the returned URLs to monitor progress programmatically + +## Using with Azure Durable Task Scheduler + +To use with an Azure-hosted Durable Task Scheduler instead of the emulator: + +1. Update `local.settings.json`: + ```json + { + "Values": { + "DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=https://your-scheduler.dts.azure.net;Authentication=DefaultAzure" + } + } + ``` + +2. Ensure you're authenticated with Azure CLI: + ```bash + az login + ``` + +## Learn More + +- [Durable Functions Overview](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-overview) +- [Durable Task Scheduler](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler) +- [Async HTTP APIs Pattern](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-http-features) \ No newline at end of file diff --git a/samples/durable-functions/python/async-http-api/function_app.py b/samples/durable-functions/python/async-http-api/function_app.py new file mode 100644 index 0000000..a9a8b27 --- /dev/null +++ b/samples/durable-functions/python/async-http-api/function_app.py @@ -0,0 +1,179 @@ +import logging +import json +import time +import uuid +import azure.functions as func +import azure.durable_functions as df + +# Create the Durable Functions app with HTTP auth level set to ANONYMOUS for easier testing +app = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS) + +@app.route(route="async_http_api", methods=["POST"]) +@app.durable_client_input(client_name="client") +async def http_start_async_operation(req: 
func.HttpRequest, client): + """HTTP trigger that starts a long-running asynchronous operation.""" + try: + # Get input from request body or use default + req_body = req.get_json() + operation_data = { + "operation_type": req_body.get("operation_type", "data_processing") if req_body else "data_processing", + "duration": req_body.get("duration", 30) if req_body else 30, # seconds + "data": req_body.get("data", {"sample": "data"}) if req_body else {"sample": "data"} + } + + logging.info(f"Starting async operation: {operation_data}") + + # Start the orchestration + instance_id = await client.start_new("async_http_api_orchestrator", client_input=operation_data) + + # Return management URLs for the orchestration + return client.create_check_status_response(req, instance_id) + + except Exception as e: + logging.error(f"Error starting orchestration: {str(e)}") + return func.HttpResponse( + json.dumps({"error": f"Failed to start orchestration: {str(e)}"}), + status_code=500, + mimetype="application/json" + ) + +@app.orchestration_trigger(context_name="context") +def async_http_api_orchestrator(context: df.DurableOrchestrationContext): + """ + Orchestrator that demonstrates the async HTTP API pattern. + Starts a long-running operation and provides status updates. 
+ """ + operation_data = context.get_input() + logging.info(f"Async HTTP API orchestration started: {operation_data}") + + # Start the long-running operation + operation_id = str(uuid.uuid4()) + operation_request = { + **operation_data, + "operation_id": operation_id, + "started_at": context.current_utc_datetime.isoformat() + } + + # Call the long-running activity + result = yield context.call_activity("long_running_operation", operation_request) + + logging.info(f"Async HTTP API orchestration completed: {result}") + return result + +@app.activity_trigger(input_name="operationRequest") +def long_running_operation(operationRequest: dict) -> dict: + """Simulate a long-running operation with status updates.""" + logging.info(f"Starting long-running operation: {operationRequest['operation_id']}") + + operation_type = operationRequest["operation_type"] + duration = operationRequest["duration"] + data = operationRequest["data"] + + # Simulate processing phases + phases = ["initializing", "processing", "validating", "finalizing"] + phase_duration = duration / len(phases) + + start_time = time.time() + + for i, phase in enumerate(phases): + logging.info(f"Operation {operationRequest['operation_id']} - Phase: {phase}") + + # Simulate work being done in this phase + time.sleep(phase_duration) + + progress = ((i + 1) / len(phases)) * 100 + logging.info(f"Operation {operationRequest['operation_id']} - Progress: {progress}%") + + end_time = time.time() + actual_duration = end_time - start_time + + result = { + "operation_id": operationRequest["operation_id"], + "operation_type": operation_type, + "status": "completed", + "started_at": operationRequest["started_at"], + "completed_at": time.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + "duration_seconds": round(actual_duration, 2), + "input_data": data, + "output_data": { + "processed": True, + "result_id": str(uuid.uuid4()), + "phases_completed": phases, + "total_items_processed": 1000, + "success_rate": 98.5 + } + } + + 
logging.info(f"Completed long-running operation: {operationRequest['operation_id']}") + return result + +@app.route(route="status/{instanceId}", methods=["GET"]) +@app.durable_client_input(client_name="client") +async def get_orchestration_status(req: func.HttpRequest, client): + """Get the status of a running orchestration with detailed progress information.""" + instance_id = req.route_params.get('instanceId') + + try: + status = await client.get_status(instance_id) + + if status: + # Enhanced status response for async operations + response_data = { + "instanceId": status.instance_id, + "name": status.name, + "runtimeStatus": status.runtime_status, + "input": status.input_, + "output": status.output, + "createdTime": status.created_time.isoformat() if status.created_time else None, + "lastUpdatedTime": status.last_updated_time.isoformat() if status.last_updated_time else None, + "customStatus": status.custom_status + } + + # Add operation-specific status information + if status.runtime_status == "Running": + response_data["message"] = "Operation is currently in progress. Check back later for results." + elif status.runtime_status == "Completed": + response_data["message"] = "Operation completed successfully." + elif status.runtime_status == "Failed": + response_data["message"] = "Operation failed. Check the output for error details." 
+ + return func.HttpResponse( + json.dumps(response_data, default=str), + mimetype="application/json" + ) + else: + return func.HttpResponse( + json.dumps({"error": "Orchestration not found"}), + status_code=404, + mimetype="application/json" + ) + + except Exception as e: + logging.error(f"Error getting status: {str(e)}") + return func.HttpResponse( + json.dumps({"error": f"Failed to get status: {str(e)}"}), + status_code=500, + mimetype="application/json" + ) + +@app.route(route="cancel/{instanceId}", methods=["POST"]) +@app.durable_client_input(client_name="client") +async def cancel_operation(req: func.HttpRequest, client): + """Cancel a running orchestration.""" + instance_id = req.route_params.get('instanceId') + + try: + await client.terminate(instance_id, "Operation cancelled by user request") + + return func.HttpResponse( + json.dumps({"message": f"Operation {instance_id} has been cancelled"}), + mimetype="application/json" + ) + + except Exception as e: + logging.error(f"Error cancelling operation: {str(e)}") + return func.HttpResponse( + json.dumps({"error": f"Failed to cancel operation: {str(e)}"}), + status_code=500, + mimetype="application/json" + ) \ No newline at end of file diff --git a/samples/durable-functions/python/async-http-api/host.json b/samples/durable-functions/python/async-http-api/host.json new file mode 100644 index 0000000..ea59c5d --- /dev/null +++ b/samples/durable-functions/python/async-http-api/host.json @@ -0,0 +1,24 @@ +{ + "version": "2.0", + "logging": { + "applicationInsights": { + "samplingSettings": { + "isEnabled": true, + "excludedTypes": "Request,Error" + } + } + }, + "extensions": { + "durableTask": { + "storageProvider": { + "type": "azureManaged", + "connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING" + }, + "hubName": "%TASKHUB_NAME%" + } + }, + "extensionBundle": { + "id": "Microsoft.Azure.Functions.ExtensionBundle.Preview", + "version": "[4.34.0, 5.0.0)" + } +} \ No newline at end of file diff --git 
a/samples/durable-functions/python/async-http-api/requirements.txt b/samples/durable-functions/python/async-http-api/requirements.txt new file mode 100644 index 0000000..94cf613 --- /dev/null +++ b/samples/durable-functions/python/async-http-api/requirements.txt @@ -0,0 +1,2 @@ +azure-functions +azure-functions-durable \ No newline at end of file diff --git a/samples/durable-functions/python/eternal-orchestrations/README.md b/samples/durable-functions/python/eternal-orchestrations/README.md new file mode 100644 index 0000000..b054a2f --- /dev/null +++ b/samples/durable-functions/python/eternal-orchestrations/README.md @@ -0,0 +1,175 @@ +# Eternal Orchestrations Pattern - Durable Functions with Durable Task Scheduler + +This sample demonstrates the **Eternal Orchestrations** pattern using Durable Functions with the Durable Task Scheduler backend. This pattern shows how to create long-running, recurring workflows that continue indefinitely until explicitly terminated. + +## Pattern Overview + +The Eternal Orchestrations pattern creates perpetual workflows: +1. **Initialization**: Sets up the recurring workflow with initial parameters +2. **Work Execution**: `execute_periodic_task` - Executes the recurring task +3. **Interval Wait**: `wait_for_interval` - Waits for the configured interval before the next run +4. **Self-Continuation**: Orchestration continues itself in a new iteration +5. 
**Graceful Termination**: Can be stopped via the stop endpoint or by reaching `max_iterations` + +## Architecture + +- **HTTP Trigger**: `eternal_orchestration` - Starts the eternal orchestration +- **Orchestrator**: `eternal_orchestrator` - Manages the recurring loop with `continue_as_new` +- **Activity**: `execute_periodic_task` - Executes the recurring work +- **Control Endpoints**: + - `stop/{instanceId}` - Gracefully stops the orchestration + - `status/{instanceId}` - Checks current status and iteration count +- **Backend**: Durable Task Scheduler for long-term state management + +## Prerequisites + +- [Python 3.9+](https://www.python.org/downloads/) +- [Azure Functions Core Tools](https://docs.microsoft.com/azure/azure-functions/functions-run-local#install-the-azure-functions-core-tools) +- [Durable Task Scheduler Emulator](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler) (for local development) + +## Setup + +1. **Install dependencies**: + ```bash + pip install -r requirements.txt + ``` + +2. **Start the Durable Task Scheduler Emulator**: + ```bash + docker run --name dts-emulator -p 8080:8080 -p 8082:8082 -d mcr.microsoft.com/dts/dts-emulator:latest + ``` + +3. **Configure connection** (already set in `local.settings.json`): + The sample is configured to use the local emulator by default. + +## Running the Sample + +1. **Start the Azure Functions host**: + ```bash + func start + ``` + +2. **Start an eternal orchestration**: + ```bash + # Start with default parameters (2 minute intervals, 5 iterations max) + curl -X POST http://localhost:7071/api/eternal_orchestration \ + -H "Content-Type: application/json" \ + -d '{}' + + # Start with custom parameters + curl -X POST http://localhost:7071/api/eternal_orchestration \ + -H "Content-Type: application/json" \ + -d '{"task_type": "data_sync", "interval_minutes": 1, "max_iterations": 10, "target_url": "https://httpbin.org/delay/1"}' + ``` + +3. 
**Monitor the running orchestration**: + ```bash + # Check current status and iteration count + curl http://localhost:7071/api/status/{instanceId} + ``` + +4. **Stop the eternal orchestration**: + ```bash + # Send stop signal + curl -X POST http://localhost:7071/api/stop/{instanceId} + ``` + +## Configuration Files + +### host.json +Configures the Durable Functions extension to use Durable Task Scheduler: +- Reads the hub name from the `TASKHUB_NAME` setting +- Configures the storage provider as "azureManaged" +- References the connection string name + +### local.settings.json
Contains local development settings: +- Durable Task Scheduler connection string for local emulator +- Function worker runtime set to "python" + +## Expected Behavior + +### Running Status +```json +{ + "runtimeStatus": "Running", + "input": {"task_type": "health_check", "interval_minutes": 2, "max_iterations": 5}, + "customStatus": { + "current_iteration": 3, + "max_iterations": 5, + "last_run": "2025-09-19T16:45:30Z", + "task_type": "health_check" + } +} +``` + +### Completed Status (after reaching max iterations) +```json +{ + "runtimeStatus": "Completed", + "output": { + "status": "completed", + "total_iterations": 5, + "final_result": { + "iteration": 5, + "task_type": "health_check", + "success": true, + "executed_at": "2025-09-19T16:45:30Z" + } + } +} +``` + +## How It Works + +1. **Continuous Loop**: Orchestration keeps iterating until it is stopped or `max_iterations` is reached +2. **Interval Wait**: A `wait_for_interval` activity pauses between iterations (for production workloads, a durable timer via `create_timer()` is the more resource-efficient option) +3. **Self-Continuation**: Each iteration calls `continue_as_new()` to reset and continue +4. **External Control**: A running instance can be terminated at any time via the `stop/{instanceId}` endpoint +5. 
**State Preservation**: Maintains iteration counters and status across continuations + +## Key Features + +- **Bounded History**: `continue_as_new` resets the orchestration history each iteration, so it never grows unbounded +- **Long-Term Stability**: Designed to run for days, weeks, or months +- **Graceful Shutdown**: Can be stopped at any time via the stop endpoint +- **Progress Tracking**: Maintains iteration counts and execution history +- **Self-Healing**: Automatically recovers from host restarts + +## Monitoring + +- **Function Logs**: Check the Azure Functions host output for iteration details +- **Dashboard**: Navigate to http://localhost:8082 to view long-running orchestrations +- **Custom Status**: Monitor iteration progress and timing via status endpoints +- **Wait Logging**: The `wait_for_interval` activity logs its requested and actual wait durations + +## Best Practices + +- **Set Reasonable Intervals**: Avoid very short intervals that could overwhelm the system +- **Implement Stop Conditions**: Always provide a way to gracefully terminate +- **Monitor Resource Usage**: Watch for memory leaks or resource accumulation +- **Handle Failures**: Include error handling and retry logic in activities + +## Using with Azure Durable Task Scheduler + +To use with an Azure-hosted Durable Task Scheduler instead of the emulator: + +1. Update `local.settings.json`: + ```json + { + "Values": { + "DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=https://your-scheduler.dts.azure.net;Authentication=DefaultAzure" + } + } + ``` + +2. 
Ensure you're authenticated with Azure CLI: + ```bash + az login + ``` + +## Learn More + +- [Durable Functions Overview](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-overview) +- [Durable Task Scheduler](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler) +- [Eternal Orchestrations Pattern](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-eternal-orchestrations) \ No newline at end of file diff --git a/samples/durable-functions/python/eternal-orchestrations/function_app.py b/samples/durable-functions/python/eternal-orchestrations/function_app.py new file mode 100644 index 0000000..8838859 --- /dev/null +++ b/samples/durable-functions/python/eternal-orchestrations/function_app.py @@ -0,0 +1,238 @@ +import logging +import json +from datetime import datetime, timedelta +import azure.functions as func +import azure.durable_functions as df + +# Create the Durable Functions app with HTTP auth level set to ANONYMOUS for easier testing +app = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS) + +@app.route(route="eternal_orchestration", methods=["POST"]) +@app.durable_client_input(client_name="client") +async def http_start_eternal_orchestration(req: func.HttpRequest, client): + """HTTP trigger that starts an eternal orchestration.""" + try: + # Get input from request body or use default + req_body = req.get_json() + config = { + "task_type": req_body.get("task_type", "health_check") if req_body else "health_check", + "interval_minutes": req_body.get("interval_minutes", 2) if req_body else 2, # Short interval for demo + "max_iterations": req_body.get("max_iterations", 5) if req_body else 5, # Limit for demo + "target_url": req_body.get("target_url", "https://httpbin.org/status/200") if req_body else "https://httpbin.org/status/200" + } + + logging.info(f"Starting eternal orchestration: {config}") + + # Start the orchestration + instance_id = await 
client.start_new("eternal_orchestrator", client_input=config) + + # Return management URLs for the orchestration + return client.create_check_status_response(req, instance_id) + + except Exception as e: + logging.error(f"Error starting orchestration: {str(e)}") + return func.HttpResponse( + json.dumps({"error": f"Failed to start orchestration: {str(e)}"}), + status_code=500, + mimetype="application/json" + ) + +@app.orchestration_trigger(context_name="context") +def eternal_orchestrator(context: df.DurableOrchestrationContext): + """ + Orchestrator that demonstrates the eternal orchestration pattern. + Uses continue_as_new for proper eternal orchestration behavior. + """ + config = context.get_input() + current_iteration = config.get("current_iteration", 1) + max_iterations = config.get("max_iterations", 5) + + logging.info(f"Eternal orchestration iteration {current_iteration}") + + # Update custom status + context.set_custom_status({ + "current_iteration": current_iteration, + "max_iterations": max_iterations, + "last_run": context.current_utc_datetime.isoformat(), + "task_type": config["task_type"], + "status": "executing_task" + }) + + # Execute the periodic task + task_result = yield context.call_activity("execute_periodic_task", { + **config, + "iteration": current_iteration + }) + + # Check if we should continue + if current_iteration >= max_iterations: + logging.info(f"Eternal orchestration completed after {current_iteration} iterations") + return { + "status": "completed", + "total_iterations": current_iteration, + "final_result": task_result + } + + # Update status to waiting + context.set_custom_status({ + "current_iteration": current_iteration, + "max_iterations": max_iterations, + "last_run": context.current_utc_datetime.isoformat(), + "task_type": config["task_type"], + "status": "waiting_for_next_iteration" + }) + + # Use activity function to handle the delay instead of orchestrator timer + yield context.call_activity("wait_for_interval", { + 
"interval_minutes": config.get("interval_minutes", 2) + }) + + # Continue with next iteration + next_config = config.copy() + next_config["current_iteration"] = current_iteration + 1 + context.continue_as_new(next_config) + +@app.activity_trigger(input_name="taskConfig") +def execute_periodic_task(taskConfig: dict) -> dict: + """Execute a periodic task (e.g., health check, data sync, cleanup).""" + import requests + import time + + task_type = taskConfig["task_type"] + iteration = taskConfig["iteration"] + target_url = taskConfig["target_url"] + + logging.info(f"Executing {task_type} task - iteration {iteration}") + + start_time = time.time() + + try: + if task_type == "health_check": + # Perform health check + response = requests.get(target_url, timeout=10) + success = response.status_code == 200 + details = { + "status_code": response.status_code, + "response_time_ms": round((time.time() - start_time) * 1000, 2), + "url": target_url + } + elif task_type == "data_sync": + # Simulate data synchronization + time.sleep(1) # Simulate work + success = True + details = { + "records_synced": 150, + "duration_ms": round((time.time() - start_time) * 1000, 2) + } + elif task_type == "cleanup": + # Simulate cleanup task + time.sleep(0.5) # Simulate work + success = True + details = { + "files_cleaned": 25, + "space_freed_mb": 128, + "duration_ms": round((time.time() - start_time) * 1000, 2) + } + else: + # Default task + success = True + details = {"message": f"Executed {task_type} task"} + + except Exception as e: + success = False + details = {"error": str(e)} + + result = { + "iteration": iteration, + "task_type": task_type, + "success": success, + "executed_at": datetime.utcnow().isoformat(), + "details": details + } + + logging.info(f"Task execution completed: {result}") + return result + +@app.activity_trigger(input_name="waitConfig") +def wait_for_interval(waitConfig: dict) -> dict: + """Activity function that handles waiting between iterations.""" + import time + + 
interval_minutes = waitConfig.get("interval_minutes", 2) + # For demo purposes, cap the wait time at 2 minutes and convert to seconds + wait_seconds = min(interval_minutes * 60, 120) + + logging.info(f"Waiting {wait_seconds} seconds before next iteration") + + start_time = time.time() + time.sleep(wait_seconds) + actual_wait = time.time() - start_time + + result = { + "requested_wait_seconds": wait_seconds, + "actual_wait_seconds": round(actual_wait, 2), + "completed_at": datetime.utcnow().isoformat() + } + + logging.info(f"Wait completed: {result}") + return result + +@app.route(route="stop/{instanceId}", methods=["POST"]) +@app.durable_client_input(client_name="client") +async def stop_eternal_orchestration(req: func.HttpRequest, client): + """Stop an eternal orchestration.""" + instance_id = req.route_params.get('instanceId') + + try: + await client.terminate(instance_id, "Stopped by user request") + + return func.HttpResponse( + json.dumps({"message": f"Eternal orchestration {instance_id} has been stopped"}), + mimetype="application/json" + ) + + except Exception as e: + logging.error(f"Error stopping orchestration: {str(e)}") + return func.HttpResponse( + json.dumps({"error": f"Failed to stop orchestration: {str(e)}"}), + status_code=500, + mimetype="application/json" + ) + +@app.route(route="status/{instanceId}", methods=["GET"]) +@app.durable_client_input(client_name="client") +async def get_orchestration_status(req: func.HttpRequest, client): + """Get the status of a running orchestration.""" + instance_id = req.route_params.get('instanceId') + + try: + status = await client.get_status(instance_id) + + if status: + return func.HttpResponse( + json.dumps({ + "instanceId": status.instance_id, + "name": status.name, + "runtimeStatus": status.runtime_status, + "input": status.input_, + "output": status.output, + "customStatus": status.custom_status, + "createdTime": status.created_time.isoformat() if status.created_time else None, + "lastUpdatedTime": 
status.last_updated_time.isoformat() if status.last_updated_time else None + }, default=str), + mimetype="application/json" + ) + else: + return func.HttpResponse( + json.dumps({"error": "Orchestration not found"}), + status_code=404, + mimetype="application/json" + ) + + except Exception as e: + logging.error(f"Error getting status: {str(e)}") + return func.HttpResponse( + json.dumps({"error": f"Failed to get status: {str(e)}"}), + status_code=500, + mimetype="application/json" + ) \ No newline at end of file diff --git a/samples/durable-functions/python/eternal-orchestrations/host.json b/samples/durable-functions/python/eternal-orchestrations/host.json new file mode 100644 index 0000000..ea59c5d --- /dev/null +++ b/samples/durable-functions/python/eternal-orchestrations/host.json @@ -0,0 +1,24 @@ +{ + "version": "2.0", + "logging": { + "applicationInsights": { + "samplingSettings": { + "isEnabled": true, + "excludedTypes": "Request,Error" + } + } + }, + "extensions": { + "durableTask": { + "storageProvider": { + "type": "azureManaged", + "connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING" + }, + "hubName": "%TASKHUB_NAME%" + } + }, + "extensionBundle": { + "id": "Microsoft.Azure.Functions.ExtensionBundle.Preview", + "version": "[4.34.0, 5.0.0)" + } +} \ No newline at end of file diff --git a/samples/durable-functions/python/fan-out-fan-in/README.md b/samples/durable-functions/python/fan-out-fan-in/README.md new file mode 100644 index 0000000..09469ce --- /dev/null +++ b/samples/durable-functions/python/fan-out-fan-in/README.md @@ -0,0 +1,130 @@ +# Fan-Out/Fan-In Pattern - Durable Functions with Durable Task Scheduler + +This sample demonstrates the **Fan-Out/Fan-In** orchestration pattern using Durable Functions with the Durable Task Scheduler backend. In this pattern, multiple activities are executed in parallel (fan-out), and their results are aggregated when all complete (fan-in). 
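The fan-in step described above combines the outputs of all parallel activities into one summary. A minimal plain-Python sketch of that aggregation is below; the summary field names follow the expected output shown later in this README, while the per-item result shape (`name`, `value`, `processing_time`, `success`) is an assumption standing in for what each `process_work_item` activity returns:

```python
def aggregate_results(results):
    """Combine the per-item results of parallel activities (fan-in) into a summary."""
    total_value = sum(r["value"] for r in results)
    total_time = sum(r["processing_time"] for r in results)
    return {
        "total_items_processed": len(results),
        "total_value": total_value,
        "average_value": round(total_value / len(results), 2) if results else 0,
        "total_processing_time": round(total_time, 2),
        "processed_items": [r["name"] for r in results],
        "success": all(r["success"] for r in results),
    }

# Simulated outputs of three parallel process_work_item activities:
parallel_results = [
    {"name": "Processed_Task1", "value": 40, "processing_time": 1.2, "success": True},
    {"name": "Processed_Task2", "value": 55, "processing_time": 0.9, "success": True},
    {"name": "Processed_Task3", "value": 70, "processing_time": 1.37, "success": True},
]
summary = aggregate_results(parallel_results)
print(summary["total_value"], summary["average_value"])  # 165 55.0
```

In the orchestrator itself, the fan-out/fan-in would be `results = yield context.task_all(tasks)` followed by a call to the aggregation activity; the function above only illustrates the aggregation logic.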
+ +## Pattern Overview + +The Fan-Out/Fan-In pattern executes multiple activities in parallel and aggregates results: +1. **Fan-Out**: `process_work_item` activities are started in parallel for each work item +2. **Fan-In**: Wait for all parallel activities to complete +3. `aggregate_results` - Combines all results into a summary + +## Architecture + +- **HTTP Trigger**: `fan_out_fan_in` - Starts the orchestration +- **Orchestrator**: `fan_out_fan_in_orchestrator` - Manages parallel execution and aggregation +- **Activities**: + - `process_work_item` - Processes individual work items (executed in parallel) + - `aggregate_results` - Combines results from all parallel activities +- **Backend**: Durable Task Scheduler for state management + +## Prerequisites + +- [Python 3.9+](https://www.python.org/downloads/) +- [Azure Functions Core Tools](https://docs.microsoft.com/azure/azure-functions/functions-run-local#install-the-azure-functions-core-tools) +- [Durable Task Scheduler Emulator](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler) (for local development) + +## Setup + +1. **Install dependencies**: + ```bash + pip install -r requirements.txt + ``` + +2. **Start the Durable Task Scheduler Emulator**: + ```bash + docker run --name dts-emulator -p 8080:8080 -p 8082:8082 -d mcr.microsoft.com/dts/dts-emulator:latest + ``` + +3. **Configure connection** (already set in `local.settings.json`): + The sample is configured to use the local emulator by default. + +## Running the Sample + +1. **Start the Azure Functions host**: + ```bash + func start + ``` + +2. 
**Test the orchestration**: + ```bash + # Start orchestration with default work items + curl -X POST http://localhost:7071/api/fan_out_fan_in \ + -H "Content-Type: application/json" \ + -d '{}' + + # Start orchestration with custom work items + curl -X POST http://localhost:7071/api/fan_out_fan_in \ + -H "Content-Type: application/json" \ + -d '{"workItems": ["Task1", "Task2", "Task3", "Task4", "Task5", "Task6"]}' + ``` + +3. **Check orchestration status**: + Use the `statusQueryGetUri` from the response to check status, or: + ```bash + curl http://localhost:7071/api/status/{instanceId} + ``` + +## Configuration Files + +### host.json +Configures the Durable Functions extension to use Durable Task Scheduler: +- Sets the hub name to "default" +- Configures the storage provider as "azureManaged" +- References the connection string name + +### local.settings.json +Contains local development settings: +- Durable Task Scheduler connection string for local emulator +- Function worker runtime set to "python" + +## Expected Output + +For input `{"workItems": ["Task1", "Task2", "Task3"]}`, the orchestration will produce: +```json +{ + "total_items_processed": 3, + "total_value": 165, + "average_value": 55.0, + "total_processing_time": 3.47, + "processed_items": ["Processed_Task1", "Processed_Task2", "Processed_Task3"], + "success": true +} +``` + +## How It Works + +1. **Parallel Execution**: Each work item is processed simultaneously using `process_work_item` +2. **Processing Simulation**: Each activity simulates work with random processing times (0.5-2.0 seconds) +3. **Result Generation**: Each activity returns processing metrics and a random value +4. 
**Aggregation**: Results are combined to show totals, averages, and processing statistics + +## Monitoring + +- **Function Logs**: Check the Azure Functions host output for detailed logging of parallel execution +- **Dashboard**: Navigate to http://localhost:8082 to view orchestrations and parallel activities in the emulator dashboard +- **Performance**: Watch how multiple activities execute concurrently and complete at different times + +## Using with Azure Durable Task Scheduler + +To use with an Azure-hosted Durable Task Scheduler instead of the emulator: + +1. Update `local.settings.json`: + ```json + { + "Values": { + "DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=https://your-scheduler.dts.azure.net;Authentication=DefaultAzure" + } + } + ``` + +2. Ensure you're authenticated with Azure CLI: + ```bash + az login + ``` + +## Learn More + +- [Durable Functions Overview](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-overview) +- [Durable Task Scheduler](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler) +- [Fan-Out/Fan-In Pattern](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-cloud-backup) \ No newline at end of file diff --git a/samples/durable-functions/python/fan-out-fan-in/function_app.py b/samples/durable-functions/python/fan-out-fan-in/function_app.py new file mode 100644 index 0000000..126ff04 --- /dev/null +++ b/samples/durable-functions/python/fan-out-fan-in/function_app.py @@ -0,0 +1,139 @@ +import logging +import json +import asyncio +import azure.functions as func +import azure.durable_functions as df + +# Create the Durable Functions app with HTTP auth level set to ANONYMOUS for easier testing +app = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS) + +@app.route(route="fan_out_fan_in", methods=["POST"]) +@app.durable_client_input(client_name="client") +async def http_start_fan_out_fan_in(req: func.HttpRequest, client): + """HTTP 
trigger that starts the fan-out/fan-in orchestration.""" + try: + # Get input from request body or use default + req_body = req.get_json() + work_items = req_body.get("workItems", ["Item1", "Item2", "Item3", "Item4", "Item5"]) if req_body else ["Item1", "Item2", "Item3", "Item4", "Item5"] + + logging.info(f"Starting fan-out/fan-in orchestration with {len(work_items)} work items") + + # Start the orchestration + instance_id = await client.start_new("fan_out_fan_in_orchestrator", client_input=work_items) + + # Return management URLs for the orchestration + return client.create_check_status_response(req, instance_id) + + except Exception as e: + logging.error(f"Error starting orchestration: {str(e)}") + return func.HttpResponse( + json.dumps({"error": f"Failed to start orchestration: {str(e)}"}), + status_code=500, + mimetype="application/json" + ) + +@app.orchestration_trigger(context_name="context") +def fan_out_fan_in_orchestrator(context: df.DurableOrchestrationContext): + """ + Orchestrator that demonstrates the fan-out/fan-in pattern. + Processes multiple work items in parallel and aggregates the results. 
+ """ + work_items = context.get_input() + logging.info(f"Fan-out/fan-in orchestration started with {len(work_items)} work items") + + # Fan-out: Start all work items in parallel + parallel_tasks = [] + for item in work_items: + task = context.call_activity("process_work_item", item) + parallel_tasks.append(task) + + # Fan-in: Wait for all tasks to complete and collect results + results = yield context.task_all(parallel_tasks) + + # Aggregate the results + aggregated_result = yield context.call_activity("aggregate_results", results) + + logging.info(f"Fan-out/fan-in orchestration completed with aggregated result") + return aggregated_result + +@app.activity_trigger(input_name="workItem") +def process_work_item(workItem: str) -> dict: + """Process a single work item.""" + import time + import random + + logging.info(f"Processing work item: {workItem}") + + # Simulate processing time + processing_time = random.uniform(0.5, 2.0) + time.sleep(processing_time) + + # Simulate different types of processing results + result = { + "item": workItem, + "processed": True, + "processing_time": round(processing_time, 2), + "result": f"Processed_{workItem}", + "value": random.randint(10, 100) + } + + logging.info(f"Completed processing work item: {workItem}") + return result + +@app.activity_trigger(input_name="results") +def aggregate_results(results: list) -> dict: + """Aggregate the results from all parallel processing tasks.""" + logging.info(f"Aggregating {len(results)} results") + + total_value = sum(result["value"] for result in results) + total_processing_time = sum(result["processing_time"] for result in results) + processed_items = [result["result"] for result in results] + + aggregated = { + "total_items_processed": len(results), + "total_value": total_value, + "average_value": round(total_value / len(results), 2), + "total_processing_time": round(total_processing_time, 2), + "processed_items": processed_items, + "success": True + } + + logging.info(f"Aggregation 
completed: {aggregated}") + return aggregated + +@app.route(route="status/{instanceId}", methods=["GET"]) +@app.durable_client_input(client_name="client") +async def get_orchestration_status(req: func.HttpRequest, client): + """Get the status of a running orchestration.""" + instance_id = req.route_params.get('instanceId') + + try: + status = await client.get_status(instance_id) + + if status: + return func.HttpResponse( + json.dumps({ + "instanceId": status.instance_id, + "name": status.name, + "runtimeStatus": status.runtime_status, + "input": status.input_, + "output": status.output, + "createdTime": status.created_time.isoformat() if status.created_time else None, + "lastUpdatedTime": status.last_updated_time.isoformat() if status.last_updated_time else None + }, default=str), + mimetype="application/json" + ) + else: + return func.HttpResponse( + json.dumps({"error": "Orchestration not found"}), + status_code=404, + mimetype="application/json" + ) + + except Exception as e: + logging.error(f"Error getting status: {str(e)}") + return func.HttpResponse( + json.dumps({"error": f"Failed to get status: {str(e)}"}), + status_code=500, + mimetype="application/json" + ) \ No newline at end of file diff --git a/samples/durable-functions/python/fan-out-fan-in/host.json b/samples/durable-functions/python/fan-out-fan-in/host.json new file mode 100644 index 0000000..ea59c5d --- /dev/null +++ b/samples/durable-functions/python/fan-out-fan-in/host.json @@ -0,0 +1,24 @@ +{ + "version": "2.0", + "logging": { + "applicationInsights": { + "samplingSettings": { + "isEnabled": true, + "excludedTypes": "Request,Error" + } + } + }, + "extensions": { + "durableTask": { + "storageProvider": { + "type": "azureManaged", + "connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING" + }, + "hubName": "%TASKHUB_NAME%" + } + }, + "extensionBundle": { + "id": "Microsoft.Azure.Functions.ExtensionBundle.Preview", + "version": "[4.34.0, 5.0.0)" + } +} \ No newline at end of file diff 
--git a/samples/durable-functions/python/fan-out-fan-in/requirements.txt b/samples/durable-functions/python/fan-out-fan-in/requirements.txt new file mode 100644 index 0000000..94cf613 --- /dev/null +++ b/samples/durable-functions/python/fan-out-fan-in/requirements.txt @@ -0,0 +1,2 @@ +azure-functions +azure-functions-durable \ No newline at end of file diff --git a/samples/durable-functions/python/function-chaining/README.md b/samples/durable-functions/python/function-chaining/README.md new file mode 100644 index 0000000..e872a80 --- /dev/null +++ b/samples/durable-functions/python/function-chaining/README.md @@ -0,0 +1,113 @@ +# Function Chaining Pattern - Durable Functions with Durable Task Scheduler + +This sample demonstrates the **Function Chaining** orchestration pattern using Durable Functions with the Durable Task Scheduler backend. In this pattern, activities are executed sequentially, with the output of each activity passed as input to the next activity. + +## Pattern Overview + +The Function Chaining pattern executes a sequence of activities in order: +1. `say_hello` - Takes a name and returns a greeting +2. `process_greeting` - Takes the greeting and adds more text +3. `finalize_response` - Takes the processed greeting and finalizes it + +## Architecture + +- **HTTP Trigger**: `function_chaining` - Starts the orchestration +- **Orchestrator**: `function_chaining_orchestrator` - Manages the sequence of activities +- **Activities**: Three sequential activities that transform the input +- **Backend**: Durable Task Scheduler for state management + +## Prerequisites + +- [Python 3.9+](https://www.python.org/downloads/) +- [Azure Functions Core Tools](https://docs.microsoft.com/azure/azure-functions/functions-run-local#install-the-azure-functions-core-tools) +- [Durable Task Scheduler Emulator](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler) (for local development) + +## Setup + +1. 
**Install dependencies**: + ```bash + pip install -r requirements.txt + ``` + +2. **Start the Durable Task Scheduler Emulator**: + ```bash + docker run --name dts-emulator -p 8080:8080 -p 8082:8082 -d mcr.microsoft.com/dts/dts-emulator:latest + ``` + +3. **Configure connection** (already set in `local.settings.json`): + The sample is configured to use the local emulator by default. + +## Running the Sample + +1. **Start the Azure Functions host**: + ```bash + func start + ``` + +2. **Test the orchestration**: + ```bash + # Start orchestration with default input + curl -X POST http://localhost:7071/api/function_chaining \ + -H "Content-Type: application/json" \ + -d '{}' + + # Start orchestration with custom name + curl -X POST http://localhost:7071/api/function_chaining \ + -H "Content-Type: application/json" \ + -d '{"name": "Alice"}' + ``` + +3. **Check orchestration status**: + Use the `statusQueryGetUri` from the response to check status, or: + ```bash + curl http://localhost:7071/api/status/{instanceId} + ``` + +## Configuration Files + +### host.json +Configures the Durable Functions extension to use Durable Task Scheduler: +- Sets the hub name to "default" +- Configures the storage provider as "AzureManaged" +- References the connection string name + +### local.settings.json +Contains local development settings: +- Durable Task Scheduler connection string for local emulator +- Function worker runtime set to "python" + +## Expected Output + +For input `{"name": "Alice"}`, the orchestration will produce: +``` +"Hello Alice! How are you today? I hope you're doing well!" +``` + +## Monitoring + +- **Function Logs**: Check the Azure Functions host output for detailed logging +- **Dashboard**: Navigate to http://localhost:8082 to view orchestrations in the emulator dashboard + +## Using with Azure Durable Task Scheduler + +To use with an Azure-hosted Durable Task Scheduler instead of the emulator: + +1. 
Update `local.settings.json` (the setting name must match the `connectionStringName` in `host.json`): + ```json + { + "Values": { + "DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=https://your-scheduler.dts.azure.net;Authentication=DefaultAzure" + } + } + ``` + +2. Ensure you're authenticated with Azure CLI: + ```bash + az login + ``` + +## Learn More + +- [Durable Functions Overview](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-overview) + - [Durable Task Scheduler](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler) + - [Function Chaining Pattern](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-sequence) \ No newline at end of file diff --git a/samples/durable-functions/python/function-chaining/function_app.py b/samples/durable-functions/python/function-chaining/function_app.py new file mode 100644 index 0000000..c18d04b --- /dev/null +++ b/samples/durable-functions/python/function-chaining/function_app.py @@ -0,0 +1,108 @@ +import logging +import json +import azure.functions as func +import azure.durable_functions as df + +# Create the Durable Functions app with HTTP auth level set to ANONYMOUS for easier testing +app = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS) + +@app.route(route="function_chaining", methods=["POST"]) +@app.durable_client_input(client_name="client") +async def http_start_function_chaining(req: func.HttpRequest, client): + """HTTP trigger that starts the function chaining orchestration.""" + try: + # Get input from request body or use default + req_body = req.get_json() + name = req_body.get("name", "World") if req_body else "World" + + logging.info(f"Starting function chaining orchestration for: {name}") + + # Start the orchestration + instance_id = await client.start_new("function_chaining_orchestrator", client_input=name) + + # Return management URLs for the orchestration + return client.create_check_status_response(req, instance_id) + + except Exception as e: + logging.error(f"Error starting 
orchestration: {str(e)}") + return func.HttpResponse( + json.dumps({"error": f"Failed to start orchestration: {str(e)}"}), + status_code=500, + mimetype="application/json" + ) + +@app.orchestration_trigger(context_name="context") +def function_chaining_orchestrator(context: df.DurableOrchestrationContext): + """ + Orchestrator that demonstrates the function chaining pattern. + Calls activities sequentially, passing the output of each activity to the next. + """ + name = context.get_input() + logging.info(f"Function chaining orchestration started for: {name}") + + # Call first activity - passing input directly + greeting = yield context.call_activity("say_hello", name) + + # Call second activity with the result from first activity + processed_greeting = yield context.call_activity("process_greeting", greeting) + + # Call third activity with the result from second activity + final_response = yield context.call_activity("finalize_response", processed_greeting) + + logging.info(f"Function chaining orchestration completed: {final_response}") + return final_response + +@app.activity_trigger(input_name="name") +def say_hello(name: str) -> str: + """First activity that greets the user.""" + logging.info(f"Activity say_hello called with name: {name}") + return f"Hello {name}!" + +@app.activity_trigger(input_name="greeting") +def process_greeting(greeting: str) -> str: + """Second activity that processes the greeting.""" + logging.info(f"Activity process_greeting called with greeting: {greeting}") + return f"{greeting} How are you today?" + +@app.activity_trigger(input_name="response") +def finalize_response(response: str) -> str: + """Third activity that finalizes the response.""" + logging.info(f"Activity finalize_response called with response: {response}") + return f"{response} I hope you're doing well!" 
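
Because the orchestrator above is an ordinary Python generator, its chaining logic can be unit-tested without the Functions host by replaying each yielded activity call against a local stand-in. The generator, fake context, and replay helper below are an illustrative test sketch, not part of the deployed app:

```python
def chaining_orchestrator(ctx):
    # Mirrors function_chaining_orchestrator: each yield hands an activity
    # request to the framework, which resumes the generator with the result.
    name = ctx.get_input()
    greeting = yield ("say_hello", name)
    processed = yield ("process_greeting", greeting)
    final = yield ("finalize_response", processed)
    return final

# Local stand-ins matching the three activities defined in this sample
ACTIVITIES = {
    "say_hello": lambda n: f"Hello {n}!",
    "process_greeting": lambda g: f"{g} How are you today?",
    "finalize_response": lambda r: f"{r} I hope you're doing well!",
}

class FakeContext:
    def __init__(self, value):
        self._value = value
    def get_input(self):
        return self._value

def replay(orchestrator, ctx):
    # Drive the generator to completion, answering each yielded activity
    # request with the local implementation's result.
    gen = orchestrator(ctx)
    try:
        request = gen.send(None)
        while True:
            activity, payload = request
            request = gen.send(ACTIVITIES[activity](payload))
    except StopIteration as stop:
        return stop.value

print(replay(chaining_orchestrator, FakeContext("Alice")))
# -> Hello Alice! How are you today? I hope you're doing well!
```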
+ +@app.route(route="status/{instanceId}", methods=["GET"]) +@app.durable_client_input(client_name="client") +async def get_orchestration_status(req: func.HttpRequest, client): + """Get the status of a running orchestration.""" + instance_id = req.route_params.get('instanceId') + + try: + status = await client.get_status(instance_id) + + if status: + return func.HttpResponse( + json.dumps({ + "instanceId": status.instance_id, + "name": status.name, + "runtimeStatus": status.runtime_status, + "input": status.input_, + "output": status.output, + "createdTime": status.created_time.isoformat() if status.created_time else None, + "lastUpdatedTime": status.last_updated_time.isoformat() if status.last_updated_time else None + }, default=str), + mimetype="application/json" + ) + else: + return func.HttpResponse( + json.dumps({"error": "Orchestration not found"}), + status_code=404, + mimetype="application/json" + ) + + except Exception as e: + logging.error(f"Error getting status: {str(e)}") + return func.HttpResponse( + json.dumps({"error": f"Failed to get status: {str(e)}"}), + status_code=500, + mimetype="application/json" + ) \ No newline at end of file diff --git a/samples/durable-functions/python/function-chaining/host.json b/samples/durable-functions/python/function-chaining/host.json new file mode 100644 index 0000000..ea59c5d --- /dev/null +++ b/samples/durable-functions/python/function-chaining/host.json @@ -0,0 +1,24 @@ +{ + "version": "2.0", + "logging": { + "applicationInsights": { + "samplingSettings": { + "isEnabled": true, + "excludedTypes": "Request,Error" + } + } + }, + "extensions": { + "durableTask": { + "storageProvider": { + "type": "azureManaged", + "connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING" + }, + "hubName": "%TASKHUB_NAME%" + } + }, + "extensionBundle": { + "id": "Microsoft.Azure.Functions.ExtensionBundle.Preview", + "version": "[4.34.0, 5.0.0)" + } +} \ No newline at end of file diff --git 
a/samples/durable-functions/python/function-chaining/requirements.txt b/samples/durable-functions/python/function-chaining/requirements.txt new file mode 100644 index 0000000..94cf613 --- /dev/null +++ b/samples/durable-functions/python/function-chaining/requirements.txt @@ -0,0 +1,2 @@ +azure-functions +azure-functions-durable \ No newline at end of file diff --git a/samples/durable-functions/python/human-interaction/README.md b/samples/durable-functions/python/human-interaction/README.md new file mode 100644 index 0000000..f10bd24 --- /dev/null +++ b/samples/durable-functions/python/human-interaction/README.md @@ -0,0 +1,177 @@ +# Human Interaction Pattern - Durable Functions with Durable Task Scheduler + +This sample demonstrates the **Human Interaction** orchestration pattern using Durable Functions with the Durable Task Scheduler backend. This pattern shows how to handle workflows that require human approval or input with timeout handling. + +## Pattern Overview + +The Human Interaction pattern waits for human input with timeout handling: +1. **Process Start**: `send_approval_request` - Sends a request for human approval +2. **Wait for Input**: Orchestration waits for external event or timeout +3. **Timeout Handling**: If no response within timeout, escalates or takes default action +4. 
**Completion**: Processes the approval/denial or timeout result + +## Architecture + +- **HTTP Trigger**: `human_interaction` - Starts the approval workflow +- **Orchestrator**: `human_interaction_orchestrator` - Manages waiting and timeout logic +- **Activities**: + - `send_approval_request` - Initiates the approval request + - `process_approval` - Processes the approval decision (approved, denied, or timed out) + - `escalate_approval` - Handles timeout scenarios +- **Event Endpoint**: `approve/{approvalId}` - Receives approval/denial +- **Backend**: Durable Task Scheduler for state management + +## Prerequisites + +- [Python 3.9+](https://www.python.org/downloads/) +- [Azure Functions Core Tools](https://docs.microsoft.com/azure/azure-functions/functions-run-local#install-the-azure-functions-core-tools) +- [Durable Task Scheduler Emulator](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler) (for local development) + +## Setup + +1. **Install dependencies**: + ```bash + pip install -r requirements.txt + ``` + +2. **Start the Durable Task Scheduler Emulator**: + ```bash + docker run --name dts-emulator -p 8080:8080 -p 8082:8082 -d mcr.microsoft.com/dts/dts-emulator:latest + ``` + +3. **Configure connection** (already set in `local.settings.json`): + The sample is configured to use the local emulator by default. + +## Running the Sample + +1. **Start the Azure Functions host**: + ```bash + func start + ``` + +2. **Start an approval workflow**: + ```bash + # Start approval request with default timeout (10 minutes) + curl -X POST http://localhost:7071/api/human_interaction \ + -H "Content-Type: application/json" \ + -d '{}' + + # Start approval request with custom parameters + curl -X POST http://localhost:7071/api/human_interaction \ + -H "Content-Type: application/json" \ + -d '{"request_type": "expense_approval", "amount": 5000, "timeout_minutes": 15, "requester": "alice@company.com", "description": "Conference attendance expenses"}' + ``` + +3. 
**Provide approval/denial** (within the timeout period): + ```bash + # Approve the request (use approvalId from the orchestration response) + curl -X POST http://localhost:7071/api/approve/{approvalId} \ + -H "Content-Type: application/json" \ + -d '{"approved": true, "comments": "Approved by manager", "approver": "manager@company.com"}' + + # Deny the request + curl -X POST http://localhost:7071/api/approve/{approvalId} \ + -H "Content-Type: application/json" \ + -d '{"approved": false, "comments": "Insufficient justification", "approver": "manager@company.com"}' + ``` + +4. **Check status**: + ```bash + curl http://localhost:7071/api/status/{instanceId} + ``` + +## Configuration Files + +### host.json +Configures the Durable Functions extension to use Durable Task Scheduler: +- Sets the hub name to "default" +- Configures the storage provider as "azureManaged" +- References the connection string name + +### local.settings.json +Contains local development settings: +- Durable Task Scheduler connection string for local emulator +- Function worker runtime set to "python" + +## Expected Behavior + +### Scenario 1: Approved Within Timeout +```json +{ + "requestType": "ExpenseApproval", + "status": "Approved", + "approval": { + "approved": true, + "comments": "Approved by manager", + "timestamp": "2025-09-19T16:45:30Z" + } +} +``` + +### Scenario 2: Denied Within Timeout +```json +{ + "requestType": "ExpenseApproval", + "status": "Denied", + "approval": { + "approved": false, + "comments": "Insufficient justification", + "timestamp": "2025-09-19T16:45:30Z" + } +} +``` + +### Scenario 3: Timeout (No Response) +```json +{ + "requestType": "ExpenseApproval", + "status": "Escalated", + "reason": "No response within timeout period", + "escalation": "Sent to senior management" +} +``` + +## How It Works + +1. **Timer Setup**: Orchestration sets up a durable timer for the timeout period +2. **Event Waiting**: Waits for either external approval event or timer completion +3. 
**Race Condition**: Whichever completes first (approval or timeout) determines the outcome +4. **State Management**: Durable Task Scheduler maintains state during the waiting period + +## Testing Timeout Scenarios + +To test timeout behavior: +1. Start an approval request with a short timeout (e.g., 10 seconds) +2. Don't send any approval/denial +3. Wait for the timeout to occur +4. Check the final status to see escalation handling + +## Monitoring + +- **Function Logs**: Check the Azure Functions host output for approval workflow details +- **Dashboard**: Navigate to http://localhost:8082 to view waiting orchestrations +- **Timers**: See active timers and their remaining durations in the dashboard + +## Using with Azure Durable Task Scheduler + +To use with an Azure-hosted Durable Task Scheduler instead of the emulator: + +1. Update `local.settings.json`: + ```json + { + "Values": { + "DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=https://your-scheduler.dts.azure.net;Authentication=DefaultAzure" + } + } + ``` + +2. 
Ensure you're authenticated with Azure CLI: + ```bash + az login + ``` + +## Learn More + +- [Durable Functions Overview](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-overview) +- [Durable Task Scheduler](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler) +- [Human Interaction Pattern](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-phone-verification) \ No newline at end of file diff --git a/samples/durable-functions/python/human-interaction/function_app.py b/samples/durable-functions/python/human-interaction/function_app.py new file mode 100644 index 0000000..0ec2af6 --- /dev/null +++ b/samples/durable-functions/python/human-interaction/function_app.py @@ -0,0 +1,280 @@ +import logging +import json +import uuid +from datetime import datetime, timedelta +import azure.functions as func +import azure.durable_functions as df + +# Create the Durable Functions app with HTTP auth level set to ANONYMOUS for easier testing +app = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS) + +@app.route(route="human_interaction", methods=["POST"]) +@app.durable_client_input(client_name="client") +async def http_start_approval_process(req: func.HttpRequest, client): + """HTTP trigger that starts a human interaction approval process.""" + try: + # Get input from request body or use default + req_body = req.get_json() + request_data = { + "request_type": req_body.get("request_type", "expense_approval") if req_body else "expense_approval", + "amount": req_body.get("amount", 1500.00) if req_body else 1500.00, + "requester": req_body.get("requester", "john.doe@company.com") if req_body else "john.doe@company.com", + "description": req_body.get("description", "Business travel expenses") if req_body else "Business travel expenses", + "timeout_minutes": req_body.get("timeout_minutes", 10) if req_body else 10 # Short timeout for demo + } + + logging.info(f"Starting human interaction 
process: {request_data}") + + # Start the orchestration + instance_id = await client.start_new("human_interaction_orchestrator", client_input=request_data) + + # Return management URLs for the orchestration + return client.create_check_status_response(req, instance_id) + + except Exception as e: + logging.error(f"Error starting orchestration: {str(e)}") + return func.HttpResponse( + json.dumps({"error": f"Failed to start orchestration: {str(e)}"}), + status_code=500, + mimetype="application/json" + ) + +@app.orchestration_trigger(context_name="context") +def human_interaction_orchestrator(context: df.DurableOrchestrationContext): + """ + Orchestrator that demonstrates the human interaction pattern. + Waits for human approval with a timeout mechanism. + """ + request_data = context.get_input() + logging.info(f"Human interaction orchestration started: {request_data}") + + # Generate approval request (use instance ID as approval ID for simplicity) + approval_request = { + "approval_id": context.instance_id, + "request_type": request_data["request_type"], + "amount": request_data["amount"], + "requester": request_data["requester"], + "description": request_data["description"], + "created_at": context.current_utc_datetime.isoformat(), + "timeout_minutes": request_data["timeout_minutes"] + } + + # Send approval request notification + yield context.call_activity("send_approval_request", approval_request) + + # Update custom status to show waiting for approval + context.set_custom_status({ + "approval_id": approval_request["approval_id"], + "status": "waiting_for_approval", + "timeout_minutes": request_data["timeout_minutes"], + "approval_url": f"http://localhost:7071/api/approve/{approval_request['approval_id']}" + }) + + # For demo purposes, let's simplify and just wait for the external event + # In a production scenario, you'd implement proper timeout handling + logging.info(f"Waiting for external approval event for request: {approval_request['approval_id']}") + + try: 
+ # Wait for external approval event (simplified - no timeout for now) + approval_response = yield context.wait_for_external_event("approval_response") + + logging.info(f"Approval received for request: {approval_request['approval_id']}") + logging.info(f"Approval response type: {type(approval_response)}, value: {approval_response}") + + # Handle case where the response arrives as a JSON string (serialization issue) + if isinstance(approval_response, str): + try: + approval_response = json.loads(approval_response) + except json.JSONDecodeError: + # If parsing fails, treat the response as a denial rather than auto-approving + approval_response = {"approved": False, "approver": "unknown", "comments": "Unparseable approval response"} + + result = yield context.call_activity("process_approval", { + "approval_request": approval_request, + "approval_response": approval_response, + "status": "approved" if approval_response.get("approved") else "rejected" + }) + + except Exception as e: + # Handle any errors + logging.error(f"Error waiting for approval: {str(e)}") + result = yield context.call_activity("process_approval", { + "approval_request": approval_request, + "approval_response": None, + "status": "error" + }) + + logging.info(f"Human interaction orchestration completed: {result}") + return result + +@app.activity_trigger(input_name="approvalRequest") +def send_approval_request(approvalRequest: dict) -> dict: + """Send approval request notification (simulated).""" + logging.info(f"Sending approval request: {approvalRequest['approval_id']}") + + # In a real scenario, this would send email, SMS, or push notification + # For demo purposes, we'll just log the approval details + + notification_result = { + "approval_id": approvalRequest["approval_id"], + "notification_sent": True, + "notification_method": "email", # simulated + "recipient": "manager@company.com", # simulated + "sent_at": datetime.utcnow().isoformat(), + "approval_url": f"http://localhost:7071/api/approve/{approvalRequest['approval_id']}" + } + + 
logging.info(f"Approval request notification sent: {notification_result}") + return notification_result + +@app.activity_trigger(input_name="approvalData") +def process_approval(approvalData: dict) -> dict: + """Process the approval response and finalize the request.""" + logging.info(f"Processing approval: {approvalData}") + + approval_request = approvalData["approval_request"] + approval_response = approvalData["approval_response"] + status = approvalData["status"] + + result = { + "approval_id": approval_request["approval_id"], + "request_type": approval_request["request_type"], + "amount": approval_request["amount"], + "requester": approval_request["requester"], + "status": status, + "processed_at": datetime.utcnow().isoformat(), + "created_at": approval_request["created_at"] + } + + if status == "approved": + result.update({ + "approved_by": approval_response.get("approver", "unknown"), + "approval_comments": approval_response.get("comments", ""), + "approved_at": approval_response.get("approved_at"), + "next_steps": "Request will be processed for payment" + }) + elif status == "rejected": + result.update({ + "rejected_by": approval_response.get("approver", "unknown"), + "rejection_reason": approval_response.get("comments", "No reason provided"), + "rejected_at": approval_response.get("approved_at"), + "next_steps": "Request has been denied. Contact approver for details." 
+ }) + else: # timeout + result.update({ + "timeout_reason": f"No response received within {approval_request['timeout_minutes']} minutes", + "next_steps": "Request will be escalated to senior management" + }) + + logging.info(f"Approval processing completed: {result}") + return result + +@app.activity_trigger(input_name="timeoutConfig") +def wait_for_timeout(timeoutConfig: dict) -> dict: + """Activity function that handles waiting for timeout duration.""" + import time + + timeout_minutes = timeoutConfig.get("timeout_minutes", 10) + # For demo purposes, cap the timeout at 5 minutes and convert to seconds + timeout_seconds = min(timeout_minutes * 60, 300) # Max 5 minutes + + logging.info(f"Starting timeout wait for {timeout_seconds} seconds") + + start_time = time.time() + time.sleep(timeout_seconds) + actual_wait = time.time() - start_time + + result = { + "timeout_occurred": True, + "requested_timeout_seconds": timeout_seconds, + "actual_wait_seconds": round(actual_wait, 2), + "completed_at": datetime.utcnow().isoformat() + } + + logging.info(f"Timeout completed: {result}") + return result + +@app.route(route="approve/{approvalId}", methods=["POST"]) +@app.durable_client_input(client_name="client") +async def submit_approval(req: func.HttpRequest, client): + """HTTP endpoint for submitting approval decisions.""" + approval_id = req.route_params.get('approvalId') + + try: + # Get approval decision from request body + req_body = req.get_json() + approved = req_body.get("approved", False) if req_body else False + approver = req_body.get("approver", "manager@company.com") if req_body else "manager@company.com" + comments = req_body.get("comments", "") if req_body else "" + + approval_response = { + "approved": approved, + "approver": approver, + "comments": comments, + "approved_at": datetime.utcnow().isoformat() + } + + logging.info(f"Submitting approval decision: {approval_response}") + + # For this demo, we'll use the approval_id as the instance_id + # In a real 
implementation, you'd maintain a proper mapping + instance_id = approval_id + + # Raise the external event to the orchestration + await client.raise_event(instance_id, "approval_response", approval_response) + + return func.HttpResponse( + json.dumps({ + "message": f"Approval decision submitted for {approval_id}", + "approval_response": approval_response, + "instance_id": instance_id, + "event_raised": True + }), + mimetype="application/json" + ) + + except Exception as e: + logging.error(f"Error submitting approval: {str(e)}") + return func.HttpResponse( + json.dumps({"error": f"Failed to submit approval: {str(e)}"}), + status_code=500, + mimetype="application/json" + ) + +@app.route(route="status/{instanceId}", methods=["GET"]) +@app.durable_client_input(client_name="client") +async def get_orchestration_status(req: func.HttpRequest, client): + """Get the status of a running orchestration.""" + instance_id = req.route_params.get('instanceId') + + try: + status = await client.get_status(instance_id) + + if status: + return func.HttpResponse( + json.dumps({ + "instanceId": status.instance_id, + "name": status.name, + "runtimeStatus": status.runtime_status, + "input": status.input_, + "output": status.output, + "createdTime": status.created_time.isoformat() if status.created_time else None, + "lastUpdatedTime": status.last_updated_time.isoformat() if status.last_updated_time else None + }, default=str), + mimetype="application/json" + ) + else: + return func.HttpResponse( + json.dumps({"error": "Orchestration not found"}), + status_code=404, + mimetype="application/json" + ) + + except Exception as e: + logging.error(f"Error getting status: {str(e)}") + return func.HttpResponse( + json.dumps({"error": f"Failed to get status: {str(e)}"}), + status_code=500, + mimetype="application/json" + ) \ No newline at end of file diff --git a/samples/durable-functions/python/human-interaction/host.json b/samples/durable-functions/python/human-interaction/host.json new file mode 
100644 index 0000000..ea59c5d --- /dev/null +++ b/samples/durable-functions/python/human-interaction/host.json @@ -0,0 +1,24 @@ +{ + "version": "2.0", + "logging": { + "applicationInsights": { + "samplingSettings": { + "isEnabled": true, + "excludedTypes": "Request,Error" + } + } + }, + "extensions": { + "durableTask": { + "storageProvider": { + "type": "azureManaged", + "connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING" + }, + "hubName": "%TASKHUB_NAME%" + } + }, + "extensionBundle": { + "id": "Microsoft.Azure.Functions.ExtensionBundle.Preview", + "version": "[4.34.0, 5.0.0)" + } +} \ No newline at end of file diff --git a/samples/durable-functions/python/human-interaction/requirements.txt b/samples/durable-functions/python/human-interaction/requirements.txt new file mode 100644 index 0000000..94cf613 --- /dev/null +++ b/samples/durable-functions/python/human-interaction/requirements.txt @@ -0,0 +1,2 @@ +azure-functions +azure-functions-durable \ No newline at end of file diff --git a/samples/durable-functions/python/monitoring/README.md b/samples/durable-functions/python/monitoring/README.md new file mode 100644 index 0000000..d095067 --- /dev/null +++ b/samples/durable-functions/python/monitoring/README.md @@ -0,0 +1,344 @@ +# Monitoring Pattern + +## Description of the Sample + +This sample demonstrates the monitoring pattern with Azure Durable Functions in Python. The monitoring pattern is used for periodically checking the status of a long-running operation until it completes or times out. + +In this sample: +1. The orchestrator starts monitoring a job with a specified ID +2. It periodically calls the `check_job_status` activity at defined intervals +3. The current job status is exposed via custom status, making it available to clients +4. 
Monitoring continues until either:
+   - The job completes successfully
+   - The specified timeout period is reached
+
+This pattern is useful for:
+- Polling external services or APIs that don't support callbacks
+- Checking the status of long-running operations
+- Implementing timeout mechanisms for operations with unpredictable durations
+- Maintaining state about the progress of a workflow
+
+## Sample Architecture
+
+```
+HTTP Start → Monitoring Orchestrator
+              ├── Check Job Status (Activity)
+              ├── Wait (Activity)
+              ├── Check Job Status (Activity)
+              ├── Wait (Activity)
+              └── ... (repeat until completion or timeout)
+```
+
+## Prerequisites
+
+1. **Docker** (for running the Durable Task Scheduler emulator)
+2. **Azure Functions Core Tools** v4.x
+3. **Python** 3.8 or higher
+
+## Setup Instructions
+
+### 1. Install Dependencies
+
+```bash
+# Navigate to the monitoring sample directory
+cd samples/durable-functions/python/monitoring
+
+# Create and activate virtual environment (recommended)
+python -m venv venv
+source venv/bin/activate  # On Windows: venv\Scripts\activate
+
+# Install required packages
+pip install -r requirements.txt
+```
+
+### 2. Start the Durable Task Scheduler Emulator
+
+This sample's `host.json` uses the Durable Task Scheduler storage provider, so start the emulator in Docker:
+```bash
+docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
+```
+
+Then check that `local.settings.json` has the correct `DURABLE_TASK_SCHEDULER_CONNECTION_STRING` and that `TASKHUB_NAME` is set to "default".
+
+### 3. Start the Function App
+
+```bash
+# Start the Azure Functions runtime
+func start
+```
+
+The function app will start on `http://localhost:7071`
+
+## Usage Examples
+
+### 1.
Start Job Monitoring (Default Configuration) + +**Basic Job Monitoring:** +```bash +curl -X POST http://localhost:7071/api/start_monitoring_job \ + -H "Content-Type: application/json" +``` + +**Expected Response:** +```json +{ + "id": "abc123def456", + "statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abc123def456?taskHub=default", + "sendEventPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abc123def456/raiseEvent/{eventName}?taskHub=default", + "terminatePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abc123def456/terminate?reason={text}&taskHub=default", + "purgeHistoryDeleteUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abc123def456?taskHub=default" +} +``` + +### 2. Start Job Monitoring with Custom Parameters + +**With Custom Job Configuration:** +```bash +curl -X POST http://localhost:7071/api/start_monitoring_job \ + -H "Content-Type: application/json" \ + -d '{ + "job_id": "my-custom-job-123", + "polling_interval_seconds": 10, + "timeout_seconds": 60 + }' +``` + +### 3. Check Orchestration Status + +```bash +curl -X GET "http://localhost:7071/runtime/webhooks/durabletask/instances/{instanceId}?taskHub=default" +``` + +**Sample Status Response (In Progress):** +```json +{ + "name": "monitoring_job_orchestrator", + "instanceId": "abc123def456", + "runtimeStatus": "Running", + "input": { + "job_id": "job-uuid-12345", + "polling_interval_seconds": 5, + "timeout_seconds": 30 + }, + "customStatus": { + "job_id": "job-uuid-12345", + "status": "Running", + "check_count": 2, + "last_check_time": "2025-09-19T18:05:15.123Z" + }, + "output": null, + "createdTime": "2025-09-19T18:05:00Z", + "lastUpdatedTime": "2025-09-19T18:05:15Z" +} +``` + +### 4. 
Get Job Status Directly + +```bash +curl -X GET "http://localhost:7071/api/job_status/{jobId}" +``` + +**Sample Job Status Response:** +```json +{ + "job_id": "job-uuid-12345", + "status": "Running", + "progress_percent": 75, + "estimated_completion": "2025-09-19T18:15:00Z", + "last_updated": "2025-09-19T18:05:30.456Z", + "details": "Processing batch 3 of 4" +} +``` + +## How the Pattern Works + +### 1. Job Status Polling +- The orchestrator periodically calls the `check_job_status` activity +- Each check simulates querying an external service or API +- Job status progresses from "Unknown" → "Running" → "Completed" + +### 2. Activity-based Delays +- Uses `wait_for_interval` activity to wait between status checks +- Configurable polling interval (default: 5 seconds) +- Note: Uses activity function instead of `create_timer()` to avoid timer configuration issues in Python SDK + +### 3. Custom Status Updates +- Current job status is exposed via `set_custom_status()` +- Clients can query orchestration status to see job progress +- Real-time visibility into monitoring state + +### 4. 
Timeout Handling
+- Monitoring stops if the timeout period is reached
+- Job status is set to "Timeout" if not completed in time
+- Prevents infinite monitoring loops
+
+## Sample Outputs
+
+### Completed Job Monitoring Result
+
+```json
+{
+  "job_id": "job-uuid-12345",
+  "final_status": "Completed",
+  "checks_performed": 4,
+  "monitoring_duration_seconds": 15.6
+}
+```
+
+### Timed Out Job Monitoring Result
+
+```json
+{
+  "job_id": "job-uuid-67890",
+  "final_status": "Timeout",
+  "checks_performed": 6,
+  "monitoring_duration_seconds": 30.0
+}
+```
+
+### Custom Status During Monitoring
+
+```json
+{
+  "job_id": "job-uuid-12345",
+  "status": "Running",
+  "check_count": 3,
+  "last_check_time": "2025-09-19T18:05:15.123Z"
+}
+```
+
+## Configuration Options
+
+### Job Monitoring Configuration
+- **job_id**: Identifier of the job to monitor (string; a `job-<uuid>` value is generated if omitted)
+- **polling_interval_seconds**: Delay between status checks (integer, default: 5)
+- **timeout_seconds**: Maximum monitoring duration before the job is marked "Timeout" (integer, default: 30)
+
+These are the fields accepted by the `start_monitoring_job` endpoint and passed as input to the orchestrator.
+
+## Monitoring Best Practices
+
+### 1. **Use Custom Status Effectively**
+```python
+# Update custom status with meaningful progress information
+context.set_custom_status({
+    "current_phase": "processing",
+    "progress": {"completed": 50, "total": 100},
+    "metrics": {"success_rate": 95.5}
+})
+```
+
+### 2. **Implement Structured Logging**
+```python
+logging.info(f"Batch {batch_num} completed", extra={
+    "workflow_id": workflow_id,
+    "batch_number": batch_num,
+    "items_processed": processed_count,
+    "processing_time": processing_time
+})
+```
+
+### 3.
**Track Key Metrics** +- Processing throughput (items/second) +- Success/failure rates +- Resource utilization +- Duration and timing metrics + +### 4. **Configure Appropriate Alerts** +- Set meaningful thresholds +- Include actionable information +- Route to appropriate teams +- Provide context for quick resolution + +## Integration Examples + +### Application Insights Integration +```python +# Add Application Insights logging +import logging +from opencensus.ext.azure.log_exporter import AzureLogHandler + +# Configure Application Insights +logging.getLogger().addHandler(AzureLogHandler( + connection_string="InstrumentationKey=your-key" +)) +``` + +### Event Grid Integration +Configure in `host.json` to publish orchestration events: +```json +{ + "extensions": { + "durableTask": { + "notifications": { + "eventGrid": { + "topicEndpoint": "https://your-topic.eventgrid.azure.net/", + "keySettingName": "EventGridKey" + } + } + } + } +} +``` + +## Troubleshooting + +### Common Issues + +1. **Timer Configuration Errors** + - Error: "replay schema version >= V3 is being used, but timer properties are not defined" + - Solution: This sample uses activity functions (`wait_for_interval`) instead of `create_timer()` to avoid known timer issues in the Python SDK + - This is a temporary workaround until the SDK timer configuration is fixed + +2. **Storage Connection Issues** + - Ensure Durable Task Scheduler is running on http://localhost:8080 + - Check `local.settings.json` has correct `DURABLE_TASK_SCHEDULER_CONNECTION_STRING` + - Verify `TASKHUB_NAME` is set to "default" + +3. 
**Job Status Not Updating** + - Verify `set_custom_status()` calls in orchestrator + - Check that `check_job_status` activity is being called successfully + - Monitor logs for activity execution + +### Debug Logs + +Enable detailed logging in `host.json`: +```json +{ + "logging": { + "logLevel": { + "DurableTask.Core": "Information", + "DurableTask.AzureStorage": "Information" + } + } +} +``` + +## Related Samples + +- **[Function Chaining](../function-chaining/)**: Basic orchestration pattern +- **[Fan-out/Fan-in](../fan-out-fan-in/)**: Parallel processing with aggregation +- **[Human Interaction](../human-interaction/)**: External event handling +- **[Eternal Orchestrations](../eternal-orchestrations/)**: Long-running workflows + +## Additional Resources + +- [Durable Functions Monitoring Documentation](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-monitor) +- [Azure Functions Python Developer Guide](https://docs.microsoft.com/azure/azure-functions/functions-reference-python) +- [Application Insights for Azure Functions](https://docs.microsoft.com/azure/azure-functions/functions-monitoring) \ No newline at end of file diff --git a/samples/durable-functions/python/monitoring/function_app.py b/samples/durable-functions/python/monitoring/function_app.py new file mode 100644 index 0000000..6bf0e88 --- /dev/null +++ b/samples/durable-functions/python/monitoring/function_app.py @@ -0,0 +1,180 @@ +import logging +import json +import time +import uuid +from datetime import datetime, timedelta +import azure.functions as func +import azure.durable_functions as df + +# Create the Durable Functions app with HTTP auth level set to ANONYMOUS for easier testing +app = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS) + +@app.route(route="start_monitoring_job", methods=["POST"]) +@app.durable_client_input(client_name="client") +async def http_start_monitoring_job(req: func.HttpRequest, client): + """HTTP trigger that starts a job monitoring 
orchestration.""" + try: + # Get input from request body or use default + try: + req_body = req.get_json() if req.get_body() else None + except ValueError: + req_body = None + + # Generate a unique job ID or use provided one + job_id = req_body.get("job_id", f"job-{uuid.uuid4()}") if req_body else f"job-{uuid.uuid4()}" + + job_data = { + "job_id": job_id, + "polling_interval_seconds": req_body.get("polling_interval_seconds", 5) if req_body else 5, + "timeout_seconds": req_body.get("timeout_seconds", 30) if req_body else 30 + } + + logging.info(f"Starting job monitoring for: {job_id}") + logging.info(f"Polling interval: {job_data['polling_interval_seconds']} seconds") + logging.info(f"Timeout: {job_data['timeout_seconds']} seconds") + + # Start the orchestration + instance_id = await client.start_new("monitoring_job_orchestrator", client_input=job_data) + + # Return management URLs for the orchestration + return client.create_check_status_response(req, instance_id) + + except Exception as e: + logging.error(f"Error starting orchestration: {str(e)}") + return func.HttpResponse( + json.dumps({"error": f"Failed to start orchestration: {str(e)}"}), + status_code=500, + mimetype="application/json" + ) + +@app.orchestration_trigger(context_name="context") +def monitoring_job_orchestrator(context: df.DurableOrchestrationContext): + """ + Orchestrator that demonstrates the monitoring pattern. + + This orchestrator periodically checks the status of a job until it + completes or reaches a maximum number of checks. 
+ """ + job_data = context.get_input() + job_id = job_data.get("job_id") + polling_interval = job_data.get("polling_interval_seconds", 5) + timeout = job_data.get("timeout_seconds", 30) + + logging.info(f"Starting monitoring orchestration for job {job_id}") + logging.info(f"Polling interval: {polling_interval} seconds") + logging.info(f"Timeout: {timeout} seconds") + + # Record the start time + start_time = context.current_utc_datetime + expiration_time = start_time + timedelta(seconds=timeout) + + # Initialize monitoring state + job_status = { + "job_id": job_id, + "status": "Unknown", + "check_count": 0 + } + + # Loop until the job completes or times out + while True: + # Check current job status + check_input = {"job_id": job_id, "check_count": job_status.get("check_count", 0)} + job_status = yield context.call_activity("check_job_status", check_input) + + # Make the job status available to clients via custom status + context.set_custom_status(job_status) + + if job_status["status"] == "Completed": + logging.info(f"Job {job_id} completed after {job_status['check_count']} checks") + break + + # Check if we've hit the timeout + current_time = context.current_utc_datetime + if current_time >= expiration_time: + logging.info(f"Monitoring for job {job_id} timed out after {timeout} seconds") + job_status["status"] = "Timeout" + break + + # Wait using an activity function instead of timer (avoids timer configuration issues) + logging.info(f"Waiting {polling_interval} seconds before next check of job {job_id}") + yield context.call_activity("wait_for_interval", polling_interval) + + # Return the final status + return { + "job_id": job_id, + "final_status": job_status["status"], + "checks_performed": job_status["check_count"], + "monitoring_duration_seconds": (context.current_utc_datetime - start_time).total_seconds() + } + +@app.activity_trigger(input_name="jobData") +def check_job_status(jobData: dict) -> dict: + """ + Activity that simulates checking the status of a 
long-running job.
+    In a real application, this would call an external API or service.
+    """
+    # Extract job_id from the job_data dictionary
+    job_id = jobData.get("job_id", "unknown")
+    check_count = jobData.get("check_count", 0)
+
+    logging.info(f"Checking status for job: {job_id} (check #{check_count + 1})")
+
+    # Simulate job status progression: report "Completed" after the third check
+    if check_count >= 3:
+        status = "Completed"
+    else:
+        status = "Running"
+
+    return {
+        "job_id": job_id,
+        "status": status,
+        "check_count": check_count + 1,
+        "last_check_time": datetime.utcnow().isoformat()
+    }
+
+@app.activity_trigger(input_name="intervalSeconds")
+def wait_for_interval(intervalSeconds: int) -> str:
+    """
+    Activity that blocks for the specified number of seconds.
+    This is used instead of create_timer() to avoid timer configuration issues in the Python SDK.
+    """
+    logging.info(f"Waiting for {intervalSeconds} seconds...")
+    time.sleep(intervalSeconds)
+    return f"Waited for {intervalSeconds} seconds"
+
+@app.route(route="job_status/{jobId}", methods=["GET"])
+@app.durable_client_input(client_name="client")
+async def get_job_status(req: func.HttpRequest, client):
+    """Get the current status of a job being monitored."""
+    job_id = req.route_params.get('jobId')
+
+    try:
+        # In a real implementation, you would:
+        # 1. Find the orchestration instance for this job_id
+        # 2.
Get its current status + # For demo purposes, we'll return a simulated status + + # Simulate job status lookup + status_info = { + "job_id": job_id, + "status": "Running", + "progress_percent": 75, + "estimated_completion": "2025-09-19T18:15:00Z", + "last_updated": datetime.utcnow().isoformat(), + "details": "Processing batch 3 of 4" + } + + return func.HttpResponse( + json.dumps(status_info), + mimetype="application/json" + ) + + except Exception as e: + logging.error(f"Error getting job status: {str(e)}") + return func.HttpResponse( + json.dumps({"error": f"Failed to get job status: {str(e)}"}), + status_code=500, + mimetype="application/json" + ) \ No newline at end of file diff --git a/samples/durable-functions/python/monitoring/host.json b/samples/durable-functions/python/monitoring/host.json new file mode 100644 index 0000000..ea59c5d --- /dev/null +++ b/samples/durable-functions/python/monitoring/host.json @@ -0,0 +1,24 @@ +{ + "version": "2.0", + "logging": { + "applicationInsights": { + "samplingSettings": { + "isEnabled": true, + "excludedTypes": "Request,Error" + } + } + }, + "extensions": { + "durableTask": { + "storageProvider": { + "type": "azureManaged", + "connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING" + }, + "hubName": "%TASKHUB_NAME%" + } + }, + "extensionBundle": { + "id": "Microsoft.Azure.Functions.ExtensionBundle.Preview", + "version": "[4.34.0, 5.0.0)" + } +} \ No newline at end of file diff --git a/samples/durable-functions/python/monitoring/requirements.txt b/samples/durable-functions/python/monitoring/requirements.txt new file mode 100644 index 0000000..11c94ee --- /dev/null +++ b/samples/durable-functions/python/monitoring/requirements.txt @@ -0,0 +1,2 @@ +azure-functions>=1.18.0,<2.0.0 +azure-functions-durable>=1.2.9,<2.0.0 \ No newline at end of file diff --git a/samples/durable-functions/python/pdf-summarizer/host.json b/samples/durable-functions/python/pdf-summarizer/host.json index b7dd325..1d8da0b 100644 --- 
a/samples/durable-functions/python/pdf-summarizer/host.json +++ b/samples/durable-functions/python/pdf-summarizer/host.json @@ -14,12 +14,12 @@ "type": "azureManaged", "connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING" }, - "hubName": "TASKHUB_NAME" + "hubName": "%TASKHUB_NAME%" } }, "extensionBundle": { "id": "Microsoft.Azure.Functions.ExtensionBundle.Preview", - "version": "[4.29.0, 5.0.0)" + "version": "[4.34.0, 5.0.0)" } } From 5d514f996055b7734deca671b572bf7f4249e60d Mon Sep 17 00:00:00 2001 From: greenie-msft Date: Fri, 19 Sep 2025 13:19:12 -0700 Subject: [PATCH 2/2] Update READMEs --- .gitignore | 15 +- .../python/async-http-api/README.md | 246 ++++++--- .../python/eternal-orchestrations/README.md | 295 +++++----- .../python/fan-out-fan-in/README.md | 227 +++++--- .../python/function-chaining/README.md | 202 ++++--- .../python/human-interaction/README.md | 296 +++++----- .../python/monitoring/README.md | 508 +++++++----------- 7 files changed, 996 insertions(+), 793 deletions(-) diff --git a/.gitignore b/.gitignore index 1c3d923..83db52a 100644 --- a/.gitignore +++ b/.gitignore @@ -404,4 +404,17 @@ FodyWeavers.xsd samples/durable-task-sdks/java/**/build/ samples/durable-task-sdks/java/**/*.class samples/durable-task-sdks/java/**/.gradle/ -samples/durable-task-sdks/java/**/out/ \ No newline at end of file +samples/durable-task-sdks/java/**/out/ + +# Azure Functions Azurite local storage (generated files) +**/__azurite_db_*.json +**/AzuriteConfig + +# Python virtual environments and cache +**/__pycache__/ +**/venv/ +**/.venv/ + +# Local settings files that might contain secrets (but preserve template files in samples) +# Note: Sample local.settings.json files are kept as templates with safe defaults +local.settings.json \ No newline at end of file diff --git a/samples/durable-functions/python/async-http-api/README.md b/samples/durable-functions/python/async-http-api/README.md index ded9990..eaf53d8 100644 --- 
a/samples/durable-functions/python/async-http-api/README.md +++ b/samples/durable-functions/python/async-http-api/README.md @@ -1,52 +1,125 @@ -# Async HTTP API Pattern - Durable Functions with Durable Task Scheduler +# Async HTTP API Pattern -This sample demonstrates the **Async HTTP API** orchestration pattern using Durable Functions with the Durable Task Scheduler backend. This pattern shows how to handle long-running operations with HTTP polling for status updates. +## Description of the Sample -## Pattern Overview +This sample demonstrates the async HTTP API pattern with Azure Durable Functions in Python. This pattern is essential for handling long-running operations where clients need to periodically check the status of their requests rather than waiting for a synchronous response. -The Async HTTP API pattern provides a way to handle long-running operations: -1. **HTTP Start**: Client submits a request and receives URLs for status checking -2. **Long-Running Process**: `process_long_running_task` simulates work that takes time -3. **Status Polling**: Clients can check progress via HTTP endpoints -4. **Completion**: Eventually returns the final result +In this sample: +1. **HTTP Request**: Client submits a request to start a long-running operation +2. **Immediate Response**: The HTTP trigger returns immediately with status URLs and a 202 Accepted status +3. **Background Processing**: The orchestrator manages a long-running `process_long_running_task` activity +4. **Status Polling**: Clients use the provided URLs to check the operation status +5. 
**Completion**: Eventually, the operation completes and returns the final result -## Architecture - -- **HTTP Trigger**: `async_operation` - Starts the long-running orchestration -- **Orchestrator**: `async_http_orchestrator` - Manages the long-running process -- **Activity**: `process_long_running_task` - Simulates time-consuming work -- **Status Endpoint**: `status/{instanceId}` - Provides orchestration status -- **Backend**: Durable Task Scheduler for state management +This pattern is useful for: +- Operations that take several seconds or minutes to complete +- Batch processing jobs where clients need to track progress +- Integration scenarios where you need to prevent HTTP timeouts +- APIs that need to provide immediate responses for long-running tasks ## Prerequisites -- [Python 3.9+](https://www.python.org/downloads/) -- [Azure Functions Core Tools](https://docs.microsoft.com/azure/azure-functions/functions-run-local#install-the-azure-functions-core-tools) -- [Durable Task Scheduler Emulator](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler) (for local development) +1. [Python 3.8+](https://www.python.org/downloads/) +2. [Azure Functions Core Tools](https://docs.microsoft.com/en-us/azure/azure-functions/functions-run-local) v4.x +3. [Docker](https://www.docker.com/products/docker-desktop/) (for running the Durable Task Scheduler) installed + +## Configuring Durable Task Scheduler + +There are two ways to run this sample locally: + +### Using the Emulator (Recommended) + +The emulator simulates a scheduler and taskhub in a Docker container, making it ideal for development and learning. + +1. Pull the Docker Image for the Emulator: + ```bash + docker pull mcr.microsoft.com/dts/dts-emulator:latest + ``` + +1. Run the Emulator: + ```bash + docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest + ``` +Wait a few seconds for the container to be ready. 
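If you want to confirm the emulator's endpoints are reachable before starting the function app, a quick TCP port check is enough. This is a minimal sketch; the `endpoint_reachable` helper is illustrative and only verifies that something is listening on the emulator's default ports, not that the scheduler or task hub is healthy:

```python
import socket

def endpoint_reachable(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

if __name__ == "__main__":
    # Default emulator ports: 8080 (scheduler endpoint), 8082 (dashboard)
    for port in (8080, 8082):
        state = "reachable" if endpoint_reachable("localhost", port) else "not reachable"
        print(f"localhost:{port} is {state}")
```

If either port reports "not reachable", give the container a few more seconds or check `docker ps` to confirm it is running.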
+
+Note: The example code automatically uses the default emulator settings (endpoint: http://localhost:8080, taskhub: default). You don't need to set any environment variables.
+
+### Using a Deployed Scheduler and Taskhub in Azure
+
+To develop locally against a scheduler and task hub deployed in Azure:
+
+1. Install the durable task scheduler CLI extension:
+
+   ```bash
+   az upgrade
+   az extension add --name durabletask --allow-preview true
+   ```
+
+1. Create a resource group in a region where the Durable Task Scheduler is available:
+
+   ```bash
+   az provider show --namespace Microsoft.DurableTask --query "resourceTypes[?resourceType=='schedulers'].locations | [0]" --out table
+   ```
+
+   ```bash
+   az group create --name my-resource-group --location <location>
+   ```
+1. Create a durable task scheduler resource:
+
+   ```bash
+   az durabletask scheduler create \
+     --resource-group my-resource-group \
+     --name my-scheduler \
+     --ip-allowlist '["0.0.0.0/0"]' \
+     --sku-name "Dedicated" \
+     --sku-capacity 1 \
+     --tags "{'myattribute':'myvalue'}"
+   ```
+
+1. Create a task hub within the scheduler resource:
+
+   ```bash
+   az durabletask taskhub create \
+     --resource-group my-resource-group \
+     --scheduler-name my-scheduler \
+     --name "my-taskhub"
+   ```
+
+1. Grant the current user permission to connect to the `my-taskhub` task hub:
+
+   ```bash
+   subscriptionId=$(az account show --query "id" -o tsv)
+   loggedInUser=$(az account show --query "user.name" -o tsv)
+
+   az role assignment create \
+     --assignee $loggedInUser \
+     --role "Durable Task Data Contributor" \
+     --scope "/subscriptions/$subscriptionId/resourceGroups/my-resource-group/providers/Microsoft.DurableTask/schedulers/my-scheduler/taskHubs/my-taskhub"
+   ```
+
+## How to Run the Sample
+
+Once you have set up the Durable Task Scheduler, follow these steps to run the sample:
+
+1.
First, activate your Python virtual environment (if you're using one): ```bash - pip install -r requirements.txt + python -m venv venv + source venv/bin/activate # On Windows, use: venv\Scripts\activate ``` -2. **Start the Durable Task Scheduler Emulator**: +2. Install the required packages: ```bash - docker run --name dts-emulator -p 8080:8080 -p 8082:8082 -d mcr.microsoft.com/dts/dts-emulator:latest + pip install -r requirements.txt ``` -3. **Configure connection** (already set in `local.settings.json`): - The sample is configured to use the local emulator by default. - -## Running the Sample - -1. **Start the Azure Functions host**: +3. Start the Azure Functions runtime: ```bash func start ``` + + You should see output indicating the functions have loaded successfully. -2. **Start a long-running operation**: +4. Start a long-running operation by sending a POST request: ```bash # Start operation with default parameters curl -X POST http://localhost:7071/api/async_http_api \ @@ -59,79 +132,74 @@ The Async HTTP API pattern provides a way to handle long-running operations: -d '{"operation_type": "data_processing", "duration": 45, "data": {"input": "sample"}}' ``` -3. **Poll for status** (use URLs from the initial response): +5. 
Poll for status using the URLs from the initial response: ```bash - # Check status + # Check status using the statusQueryGetUri from the response curl http://localhost:7071/api/status/{instanceId} - - # The response includes standard orchestration management URLs: - # - statusQueryGetUri: Check current status - # - sendEventPostUri: Send external events - # - terminatePostUri: Terminate the orchestration ``` -## Configuration Files - -### host.json -Configures the Durable Functions extension to use Durable Task Scheduler: -- Sets the hub name to "default" -- Configures the storage provider as "azureManaged" -- References the connection string name - -### local.settings.json -Contains local development settings: -- Durable Task Scheduler connection string for local emulator -- Function worker runtime set to "python" - -## Expected Behavior - -1. **Initial Response**: Returns HTTP 202 with management URLs -2. **Status Polling**: Shows "Running" status while processing -3. **Progress Updates**: Activity logs progress during execution -4. **Completion**: Eventually returns "Completed" with final result - -Example status progression: -```json -// Initially -{"runtimeStatus": "Running", "output": null} +## Understanding the Output -// Finally -{"runtimeStatus": "Completed", "output": {"task": "ProcessData", "result": "Success", "duration": 30}} -``` +When you run the sample, you'll see the following behavior: -## How It Works - -1. **Async Start**: HTTP trigger starts orchestration and returns immediately with status URLs -2. **Background Processing**: Long-running activity executes while client can poll for updates -3. **Status Management**: Durable Functions manages orchestration state across the operation -4. 
**Client Experience**: Clients get immediate response and can check progress periodically - -## Monitoring - -- **Function Logs**: Check the Azure Functions host output for processing details -- **Dashboard**: Navigate to http://localhost:8082 to view long-running orchestrations -- **Status Endpoints**: Use the returned URLs to monitor progress programmatically - -## Using with Azure Durable Task Scheduler +1. **Initial HTTP Response**: The operation starts immediately and returns HTTP 202 (Accepted) with management URLs: + ```json + { + "id": "abcd1234", + "statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abcd1234", + "sendEventPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abcd1234/raiseEvent/{eventName}", + "terminatePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abcd1234/terminate?reason={text}", + "purgeHistoryDeleteUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abcd1234" + } + ``` -To use with an Azure-hosted Durable Task Scheduler instead of the emulator: +2. **Status Polling**: While the long-running task executes, status checks will show: + ```json + { + "runtimeStatus": "Running", + "input": {"operation_type": "data_processing", "duration": 30}, + "output": null, + "createdTime": "2023-12-01T10:00:00Z", + "lastUpdatedTime": "2023-12-01T10:00:15Z" + } + ``` -1. Update `local.settings.json`: +3. **Completion**: Once the operation finishes (after the specified duration), the status will show: ```json { - "Values": { - "DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=https://your-scheduler.dts.azure.net;Authentication=DefaultAzure" + "runtimeStatus": "Completed", + "input": {"operation_type": "data_processing", "duration": 30}, + "output": { + "task": "data_processing", + "result": "Success", + "duration": 30, + "timestamp": "2023-12-01T10:00:45Z" } } ``` -2. Ensure you're authenticated with Azure CLI: - ```bash - az login - ``` +4. 
**Key Benefits**: This pattern allows clients to: + - Get immediate confirmation that the request was accepted + - Avoid HTTP timeouts on long-running operations + - Check progress at their own pace + - Handle other tasks while waiting for completion + +## Dashboard Review + +You can monitor the orchestration execution through the Durable Task Scheduler dashboard: + +1. Navigate to `http://localhost:8082` in your browser +2. You'll see a list of task hubs - select the "default" hub +3. Click on your orchestration instance to see: + - Real-time status updates as the long-running task progresses + - Timeline showing when the operation started and how long it's been running + - Input parameters and current execution state + - The async HTTP API pattern in action with clear start/processing/completion phases + +The dashboard is particularly valuable for this pattern because it demonstrates how the orchestration continues running in the background while clients can poll for updates through the HTTP API. 
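The client-side polling loop described above can be sketched in plain Python. This is an illustrative sketch, not part of the sample: `fetch_status` is a hypothetical callable standing in for an HTTP GET against the `statusQueryGetUri` returned by the trigger.

```python
import time

def poll_until_complete(fetch_status, interval_seconds=2.0, timeout_seconds=120.0):
    """Poll an orchestration status endpoint until it reaches a terminal state.

    fetch_status is any zero-argument callable returning a dict shaped like
    the statusQueryGetUri response (runtimeStatus, output, ...).
    """
    terminal_states = {"Completed", "Failed", "Terminated"}
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        status = fetch_status()
        if status["runtimeStatus"] in terminal_states:
            return status
        time.sleep(interval_seconds)  # wait before the next poll
    raise TimeoutError("orchestration did not reach a terminal state in time")

# Demo with a fake fetcher that reports "Running" twice, then "Completed"
responses = iter([
    {"runtimeStatus": "Running", "output": None},
    {"runtimeStatus": "Running", "output": None},
    {"runtimeStatus": "Completed", "output": {"result": "Success"}},
])
final_status = poll_until_complete(lambda: next(responses), interval_seconds=0.01)
print(final_status["runtimeStatus"])  # Completed
```

In a real client, `fetch_status` would wrap an actual HTTP GET of the `statusQueryGetUri` URL, and you would likely add backoff between polls.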
## Learn More -- [Durable Functions Overview](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-overview) -- [Durable Task Scheduler](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler) -- [Async HTTP APIs Pattern](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-http-features) \ No newline at end of file +- [Async HTTP APIs Pattern in Durable Functions](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-http-features) +- [Durable Task Scheduler Overview](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler) +- [Durable Functions Python Developer Guide](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-python) \ No newline at end of file diff --git a/samples/durable-functions/python/eternal-orchestrations/README.md b/samples/durable-functions/python/eternal-orchestrations/README.md index b054a2f..110a1de 100644 --- a/samples/durable-functions/python/eternal-orchestrations/README.md +++ b/samples/durable-functions/python/eternal-orchestrations/README.md @@ -1,55 +1,126 @@ -# Eternal Orchestrations Pattern - Durable Functions with Durable Task Scheduler +# Eternal Orchestrations Pattern -This sample demonstrates the **Eternal Orchestrations** pattern using Durable Functions with the Durable Task Scheduler backend. This pattern shows how to create long-running, recurring workflows that continue indefinitely until explicitly terminated. +## Description of the Sample -## Pattern Overview +This sample demonstrates the eternal orchestrations pattern with Azure Durable Functions in Python. This pattern is designed for creating long-running, recurring workflows that can continue indefinitely until explicitly stopped, making it ideal for periodic tasks, monitoring jobs, and scheduled processes. -The Eternal Orchestrations pattern creates perpetual workflows: -1. 
**Initialization**: Sets up the recurring workflow with initial parameters -2. **Work Execution**: `perform_periodic_work` - Executes the recurring task -3. **Timer Wait**: Waits for the next execution interval using durable timers -4. **Self-Continuation**: Orchestration continues itself in a new iteration -5. **Graceful Termination**: Can be stopped via external events or conditions +In this sample: +1. **Initialization**: The orchestration starts with specified parameters (task type, interval, max iterations) +2. **Work Execution**: The `perform_periodic_work` activity executes the recurring task +3. **Timer Wait**: Uses a durable timer to wait for the next execution interval without consuming resources +4. **Self-Continuation**: The orchestration continues itself with `continue_as_new()` to reset the execution history and prevent memory buildup +5. **Graceful Termination**: Can be stopped via external events or after reaching maximum iterations -## Architecture - -- **HTTP Trigger**: `start_eternal_process` - Starts the eternal orchestration -- **Orchestrator**: `eternal_orchestrator` - Manages the infinite loop with timers -- **Activity**: `perform_periodic_work` - Executes the recurring work -- **Control Endpoints**: - - `stop_eternal/{instanceId}` - Gracefully stops the orchestration - - `status/{instanceId}` - Checks current status and iteration count -- **Backend**: Durable Task Scheduler for long-term state management +This pattern is useful for: +- Periodic data synchronization or ETL jobs +- Health monitoring and system status checks +- Regular cleanup or maintenance tasks +- Scheduled reporting or notifications +- Long-running processes that need to execute at regular intervals ## Prerequisites -- [Python 3.9+](https://www.python.org/downloads/) -- [Azure Functions Core Tools](https://docs.microsoft.com/azure/azure-functions/functions-run-local#install-the-azure-functions-core-tools) -- [Durable Task Scheduler 
Emulator](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler) (for local development) +1. [Python 3.8+](https://www.python.org/downloads/) +2. [Azure Functions Core Tools](https://docs.microsoft.com/en-us/azure/azure-functions/functions-run-local) v4.x +3. [Docker](https://www.docker.com/products/docker-desktop/) (for running the Durable Task Scheduler emulator) + +## Configuring Durable Task Scheduler + +There are two ways to run this sample locally: + +### Using the Emulator (Recommended) + +The emulator simulates a scheduler and taskhub in a Docker container, making it ideal for development and learning. + +1. Pull the Docker Image for the Emulator: + ```bash + docker pull mcr.microsoft.com/dts/dts-emulator:latest + ``` + +1. Run the Emulator: + ```bash + docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest + ``` + + Wait a few seconds for the container to be ready. + +Note: The example code automatically uses the default emulator settings (endpoint: http://localhost:8080, taskhub: default). You don't need to set any environment variables. + +### Using a Deployed Scheduler and Taskhub in Azure + +To develop locally against a scheduler and task hub deployed in Azure: + +1. Install the durable task scheduler CLI extension: + + ```bash + az upgrade + az extension add --name durabletask --allow-preview true + ``` + +1. Create a resource group in a region where the Durable Task Scheduler is available: + + ```bash + az provider show --namespace Microsoft.DurableTask --query "resourceTypes[?resourceType=='schedulers'].locations | [0]" --out table + ``` + + ```bash + az group create --name my-resource-group --location <location> + ``` +1. 
Create a durable task scheduler resource: + + ```bash + az durabletask scheduler create \ + --resource-group my-resource-group \ + --name my-scheduler \ + --ip-allowlist '["0.0.0.0/0"]' \ + --sku-name "Dedicated" \ + --sku-capacity 1 \ + --tags "{'myattribute':'myvalue'}" + ``` + +1. Create a task hub within the scheduler resource: + + ```bash + az durabletask taskhub create \ + --resource-group my-resource-group \ + --scheduler-name my-scheduler \ + --name "my-taskhub" + ``` + +1. Grant the current user permission to connect to the `my-taskhub` task hub: + + ```bash + subscriptionId=$(az account show --query "id" -o tsv) + loggedInUser=$(az account show --query "user.name" -o tsv) + + az role assignment create \ + --assignee $loggedInUser \ + --role "Durable Task Data Contributor" \ + --scope "/subscriptions/$subscriptionId/resourceGroups/my-resource-group/providers/Microsoft.DurableTask/schedulers/my-scheduler/taskHubs/my-taskhub" + ``` + +## How to Run the Sample -## Setup +Once you have set up the Durable Task Scheduler, follow these steps to run the sample: -1. **Install dependencies**: +1. First, activate your Python virtual environment (if you're using one): ```bash - pip install -r requirements.txt + python -m venv venv + source venv/bin/activate # On Windows, use: venv\Scripts\activate ``` -2. **Start the Durable Task Scheduler Emulator**: +2. Install the required packages: ```bash - docker run --name dts-emulator -p 8080:8080 -p 8082:8082 -d mcr.microsoft.com/dts/dts-emulator:latest + pip install -r requirements.txt ``` -3. **Configure connection** (already set in `local.settings.json`): - The sample is configured to use the local emulator by default. - -## Running the Sample - -1. **Start the Azure Functions host**: +3. Start the Azure Functions runtime: ```bash func start ``` + + You should see output indicating the functions have loaded successfully. -2. **Start an eternal orchestration**: +4. 
Start an eternal orchestration by sending a POST request: ```bash # Start with default parameters (2 minute intervals, 5 iterations max) curl -X POST http://localhost:7071/api/eternal_orchestration \ @@ -62,114 +133,86 @@ The Eternal Orchestrations pattern creates perpetual workflows: -d '{"task_type": "data_sync", "interval_minutes": 1, "max_iterations": 10, "target_url": "https://httpbin.org/delay/1"}' ``` -3. **Monitor the running orchestration**: +5. Monitor the running orchestration: ```bash - # Check current status and iteration count curl http://localhost:7071/api/status/{instanceId} ``` -4. **Stop the eternal orchestration**: +6. Stop the eternal orchestration when needed: ```bash - # Send stop signal curl -X POST http://localhost:7071/api/stop/{instanceId} ``` -## Configuration Files - -### host.json -Configures the Durable Functions extension to use Durable Task Scheduler: -- Sets the hub name to "default" -- Configures the storage provider as "azureManaged" -- References the connection string name - -### local.settings.json -Contains local development settings: -- Durable Task Scheduler connection string for local emulator -- Function worker runtime set to "python" - -## Expected Behavior - -### Running Status -```json -{ - "runtimeStatus": "Running", - "input": {"task_type": "health_check", "interval_minutes": 2, "max_iterations": 5}, - "customStatus": { - "current_iteration": 3, - "max_iterations": 5, - "last_run": "2025-09-19T16:45:30Z", - "task_type": "health_check" - } -} -``` - -### Completed Status (after reaching max iterations) -```json -{ - "runtimeStatus": "Completed", - "output": { - "status": "completed", - "total_iterations": 5, - "final_result": { - "iteration": 5, - "task_type": "health_check", - "success": true, - "executed_at": "2025-09-19T16:45:30Z" - } - } -} -``` - -## How It Works - -1. **Continuous Loop**: Orchestration runs in an infinite loop until stopped -2. 
**Durable Timers**: Uses `create_timer()` to wait between iterations without consuming resources -3. **Self-Continuation**: Each iteration calls `continue_as_new()` to reset and continue -4. **External Events**: Listens for stop signals during timer waits -5. **State Preservation**: Maintains iteration counters and status across continuations - -## Key Features - -- **Resource Efficient**: Uses durable timers instead of blocking waits -- **Long-Term Stability**: Designed to run for days, weeks, or months -- **Graceful Shutdown**: Responds to external stop events -- **Progress Tracking**: Maintains iteration counts and execution history -- **Self-Healing**: Automatically recovers from host restarts - -## Monitoring - -- **Function Logs**: Check the Azure Functions host output for iteration details -- **Dashboard**: Navigate to http://localhost:8082 to view long-running orchestrations -- **Custom Status**: Monitor iteration progress and timing via status endpoints -- **Timers**: See active timers and their scheduled execution times - -## Best Practices - -- **Set Reasonable Intervals**: Avoid very short intervals that could overwhelm the system -- **Implement Stop Conditions**: Always provide a way to gracefully terminate -- **Monitor Resource Usage**: Watch for memory leaks or resource accumulation -- **Handle Failures**: Include error handling and retry logic in activities - -## Using with Azure Durable Task Scheduler - -To use with an Azure-hosted Durable Task Scheduler instead of the emulator: - -1. Update `local.settings.json`: +## Understanding the Output + +When you run the sample, you'll see the following behavior: + +1. 
**Initial Response**: The HTTP trigger returns management URLs immediately: + ```json + { + "id": "abcd1234", + "statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abcd1234", + "sendEventPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abcd1234/raiseEvent/{eventName}", + "terminatePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abcd1234/terminate?reason={text}", + "purgeHistoryDeleteUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abcd1234" + } + ``` + +2. **Running Status**: While the eternal orchestration is active, status checks will show: ```json { - "Values": { - "DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=https://your-scheduler.dts.azure.net;Authentication=DefaultAzure" + "runtimeStatus": "Running", + "input": {"task_type": "health_check", "interval_minutes": 2, "max_iterations": 5}, + "customStatus": { + "current_iteration": 3, + "max_iterations": 5, + "last_run": "2025-09-19T16:45:30Z", + "task_type": "health_check" } } ``` -2. Ensure you're authenticated with Azure CLI: - ```bash - az login +3. **Completed Status**: When reaching maximum iterations or being stopped: + ```json + { + "runtimeStatus": "Completed", + "output": { + "status": "completed", + "total_iterations": 5, + "final_result": { + "iteration": 5, + "task_type": "health_check", + "success": true, + "executed_at": "2025-09-19T16:45:30Z" + } + } + } ``` +4. 
**Key Characteristics**: + - **Resource Efficient**: Uses durable timers instead of blocking waits between iterations + - **Self-Continuation**: Calls `continue_as_new()` to reset execution history and prevent memory buildup + - **Graceful Shutdown**: Responds to external stop events or completion conditions + - **Long-Term Stability**: Designed to run for extended periods (days, weeks, or months) + - **Progress Tracking**: Maintains iteration counts and execution history via custom status + +## Dashboard Review + +You can monitor the orchestration execution through the Durable Task Scheduler dashboard: + +1. Navigate to `http://localhost:8082` in your browser +2. You'll see a list of task hubs - select the "default" hub +3. Click on your orchestration instance to see: + - The eternal orchestration running continuously with regular activity executions + - Durable timers showing scheduled next execution times between iterations + - Custom status updates tracking current iteration count and progress + - Timeline showing the recurring pattern of work activity followed by timer wait + - How `continue_as_new()` resets the execution history while maintaining the iteration loop + +The dashboard is particularly valuable for this pattern because it demonstrates how eternal orchestrations can run indefinitely while managing resources efficiently through self-continuation and durable timers. 
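The control flow that `continue_as_new()` enables can be illustrated with a small plain-Python simulation. This is a sketch of the loop logic only, using hypothetical names and no Durable Functions APIs: each pass stands in for one orchestration "generation" that performs its work, then either continues itself with fresh state or completes.

```python
from dataclasses import dataclass

@dataclass
class IterationState:
    """State carried from one generation to the next via continue_as_new."""
    current_iteration: int
    max_iterations: int

def next_state(state: IterationState, stop_requested: bool):
    """Decide what an eternal orchestrator does after one unit of work.

    Returns the state to pass to the next generation, or None when the
    orchestration should complete (stop event or iteration cap reached).
    """
    completed = state.current_iteration + 1
    if stop_requested or completed >= state.max_iterations:
        return None  # finish instead of continuing
    return IterationState(completed, state.max_iterations)

# Simulate a run capped at 5 iterations with no stop signal
state = IterationState(current_iteration=0, max_iterations=5)
iterations = 0
while state is not None:
    iterations += 1  # stands in for calling the perform_periodic_work activity
    state = next_state(state, stop_requested=False)
print(iterations)  # 5
```

In the actual orchestrator, the step between generations is a durable timer wait rather than a plain loop, and `stop_requested` corresponds to an external stop event raised through the `sendEventPostUri` endpoint.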
+ ## Learn More -- [Durable Functions Overview](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-overview) -- [Durable Task Scheduler](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler) -- [Eternal Orchestrations Pattern](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-eternal-orchestrations) \ No newline at end of file +- [Eternal Orchestrations Pattern in Durable Functions](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-eternal-orchestrations) +- [Durable Task Scheduler Overview](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler) +- [Durable Functions Python Developer Guide](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-python) \ No newline at end of file diff --git a/samples/durable-functions/python/fan-out-fan-in/README.md b/samples/durable-functions/python/fan-out-fan-in/README.md index 09469ce..3205ae9 100644 --- a/samples/durable-functions/python/fan-out-fan-in/README.md +++ b/samples/durable-functions/python/fan-out-fan-in/README.md @@ -1,52 +1,124 @@ -# Fan-Out/Fan-In Pattern - Durable Functions with Durable Task Scheduler +# Fan-Out/Fan-In Pattern -This sample demonstrates the **Fan-Out/Fan-In** orchestration pattern using Durable Functions with the Durable Task Scheduler backend. In this pattern, multiple activities are executed in parallel (fan-out), and their results are aggregated when all complete (fan-in). +## Description of the Sample -## Pattern Overview +This sample demonstrates the fan-out/fan-in pattern with Azure Durable Functions in Python. This pattern is useful for executing multiple activities in parallel and then aggregating their results when all activities complete. -The Fan-Out/Fan-In pattern executes multiple activities in parallel and aggregates results: -1. 
**Fan-Out**: `process_work_item` activities are started in parallel for each work item -2. **Fan-In**: Wait for all parallel activities to complete -3. `aggregate_results` - Combines all results into a summary +In this sample: +1. **Fan-Out**: The orchestrator starts multiple `process_work_item` activities in parallel, one for each work item +2. **Parallel Processing**: Each activity processes its work item independently and concurrently +3. **Fan-In**: The orchestrator waits for all activities to complete and collects their results +4. **Aggregation**: An `aggregate_results` activity combines all results into a summary report -## Architecture - -- **HTTP Trigger**: `fan_out_fan_in` - Starts the orchestration -- **Orchestrator**: `fan_out_fan_in_orchestrator` - Manages parallel execution and aggregation -- **Activities**: - - `process_work_item` - Processes individual work items (executed in parallel) - - `aggregate_results` - Combines results from all parallel activities -- **Backend**: Durable Task Scheduler for state management +This pattern is useful for: +- Processing large datasets by breaking them into chunks +- Performing parallel calculations or transformations +- Distributing workload across multiple workers for better performance +- Scenarios where independent tasks can be executed simultaneously ## Prerequisites -- [Python 3.9+](https://www.python.org/downloads/) -- [Azure Functions Core Tools](https://docs.microsoft.com/azure/azure-functions/functions-run-local#install-the-azure-functions-core-tools) -- [Durable Task Scheduler Emulator](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler) (for local development) +1. [Python 3.8+](https://www.python.org/downloads/) +2. [Azure Functions Core Tools](https://docs.microsoft.com/en-us/azure/azure-functions/functions-run-local) v4.x +3. 
[Docker](https://www.docker.com/products/docker-desktop/) (for running the Durable Task Scheduler emulator) + +## Configuring Durable Task Scheduler + +There are two ways to run this sample locally: + +### Using the Emulator (Recommended) + +The emulator simulates a scheduler and taskhub in a Docker container, making it ideal for development and learning. + +1. Pull the Docker Image for the Emulator: + ```bash + docker pull mcr.microsoft.com/dts/dts-emulator:latest + ``` + +1. Run the Emulator: + ```bash + docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest + ``` + + Wait a few seconds for the container to be ready. + +Note: The example code automatically uses the default emulator settings (endpoint: http://localhost:8080, taskhub: default). You don't need to set any environment variables. + +### Using a Deployed Scheduler and Taskhub in Azure + +To develop locally against a scheduler and task hub deployed in Azure: + +1. Install the durable task scheduler CLI extension: + + ```bash + az upgrade + az extension add --name durabletask --allow-preview true + ``` + +1. Create a resource group in a region where the Durable Task Scheduler is available: + + ```bash + az provider show --namespace Microsoft.DurableTask --query "resourceTypes[?resourceType=='schedulers'].locations | [0]" --out table + ``` + + ```bash + az group create --name my-resource-group --location <location> + ``` +1. Create a durable task scheduler resource: + + ```bash + az durabletask scheduler create \ + --resource-group my-resource-group \ + --name my-scheduler \ + --ip-allowlist '["0.0.0.0/0"]' \ + --sku-name "Dedicated" \ + --sku-capacity 1 \ + --tags "{'myattribute':'myvalue'}" + ``` + +1. Create a task hub within the scheduler resource: + + ```bash + az durabletask taskhub create \ + --resource-group my-resource-group \ + --scheduler-name my-scheduler \ + --name "my-taskhub" + ``` + +1. 
Grant the current user permission to connect to the `my-taskhub` task hub: + + ```bash + subscriptionId=$(az account show --query "id" -o tsv) + loggedInUser=$(az account show --query "user.name" -o tsv) + + az role assignment create \ + --assignee $loggedInUser \ + --role "Durable Task Data Contributor" \ + --scope "/subscriptions/$subscriptionId/resourceGroups/my-resource-group/providers/Microsoft.DurableTask/schedulers/my-scheduler/taskHubs/my-taskhub" + ``` + +## How to Run the Sample -## Setup +Once you have set up the Durable Task Scheduler, follow these steps to run the sample: -1. **Install dependencies**: +1. First, activate your Python virtual environment (if you're using one): ```bash - pip install -r requirements.txt + python -m venv venv + source venv/bin/activate # On Windows, use: venv\Scripts\activate ``` -2. **Start the Durable Task Scheduler Emulator**: +2. Install the required packages: ```bash - docker run --name dts-emulator -p 8080:8080 -p 8082:8082 -d mcr.microsoft.com/dts/dts-emulator:latest + pip install -r requirements.txt ``` -3. **Configure connection** (already set in `local.settings.json`): - The sample is configured to use the local emulator by default. - -## Running the Sample - -1. **Start the Azure Functions host**: +3. Start the Azure Functions runtime: ```bash func start ``` + + You should see output indicating the functions have loaded successfully. -2. **Test the orchestration**: +4. Test the orchestration by sending a POST request: ```bash # Start orchestration with default work items curl -X POST http://localhost:7071/api/fan_out_fan_in \ @@ -59,72 +131,61 @@ The Fan-Out/Fan-In pattern executes multiple activities in parallel and aggregat -d '{"workItems": ["Task1", "Task2", "Task3", "Task4", "Task5", "Task6"]}' ``` -3. **Check orchestration status**: - Use the `statusQueryGetUri` from the response to check status, or: +5. 
Check orchestration status using the `statusQueryGetUri` from the response: ```bash curl http://localhost:7071/api/status/{instanceId} ``` -## Configuration Files - -### host.json -Configures the Durable Functions extension to use Durable Task Scheduler: -- Sets the hub name to "default" -- Configures the storage provider as "azureManaged" -- References the connection string name - -### local.settings.json -Contains local development settings: -- Durable Task Scheduler connection string for local emulator -- Function worker runtime set to "python" - -## Expected Output - -For input `{"workItems": ["Task1", "Task2", "Task3"]}`, the orchestration will produce: -```json -{ - "total_items_processed": 3, - "total_value": 165, - "average_value": 55.0, - "total_processing_time": 3.47, - "processed_items": ["Processed_Task1", "Processed_Task2", "Processed_Task3"], - "success": true -} -``` - -## How It Works - -1. **Parallel Execution**: Each work item is processed simultaneously using `process_work_item` -2. **Processing Simulation**: Each activity simulates work with random processing times (0.5-2.0 seconds) -3. **Result Generation**: Each activity returns processing metrics and a random value -4. **Aggregation**: Results are combined to show totals, averages, and processing statistics - -## Monitoring +## Understanding the Output -- **Function Logs**: Check the Azure Functions host output for detailed logging of parallel execution -- **Dashboard**: Navigate to http://localhost:8082 to view orchestrations and parallel activities in the emulator dashboard -- **Performance**: Watch how multiple activities execute concurrently and complete at different times +When you run the sample, you'll see the following behavior: -## Using with Azure Durable Task Scheduler +1. 
**Initial Response**: The HTTP trigger returns a JSON response with management URLs: + ```json + { + "id": "abcd1234", + "statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abcd1234", + "sendEventPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abcd1234/raiseEvent/{eventName}", + "terminatePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abcd1234/terminate?reason={text}", + "purgeHistoryDeleteUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abcd1234" + } + ``` -To use with an Azure-hosted Durable Task Scheduler instead of the emulator: +2. **Parallel Processing**: The orchestrator starts multiple `process_work_item` activities simultaneously. Each activity: + - Processes its work item independently + - Simulates work with random processing times (0.5-2.0 seconds) + - Returns processing metrics and a random value -1. Update `local.settings.json`: +3. **Aggregated Results**: For input `{"workItems": ["Task1", "Task2", "Task3"]}`, the final output will be: ```json { - "Values": { - "DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=https://your-scheduler.dts.azure.net;Authentication=DefaultAzure" - } + "total_items_processed": 3, + "total_value": 165, + "average_value": 55.0, + "total_processing_time": 3.47, + "processed_items": ["Processed_Task1", "Processed_Task2", "Processed_Task3"], + "success": true } ``` -2. Ensure you're authenticated with Azure CLI: - ```bash - az login - ``` +4. **Performance Benefits**: Because activities run in parallel rather than sequentially, the total processing time is much shorter than the sum of individual processing times. + +## Dashboard Review + +You can monitor the orchestration execution through the Durable Task Scheduler dashboard: + +1. Navigate to `http://localhost:8082` in your browser +2. You'll see a list of task hubs - select the "default" hub +3. 
Click on your orchestration instance to see: + - Parallel activity execution timeline showing concurrent processing + - Input and output data for each `process_work_item` activity + - Performance metrics demonstrating the fan-out/fan-in pattern + - Visual representation of how activities start simultaneously and complete at different times + +The dashboard is particularly useful for this sample because it clearly shows how the fan-out/fan-in pattern executes multiple activities concurrently, leading to significant performance improvements compared to sequential processing. ## Learn More -- [Durable Functions Overview](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-overview) -- [Durable Task Scheduler](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler) -- [Fan-Out/Fan-In Pattern](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-cloud-backup) \ No newline at end of file +- [Fan-Out/Fan-In Pattern in Durable Functions](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-cloud-backup) +- [Durable Task Scheduler Overview](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler) +- [Durable Functions Python Developer Guide](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-python) \ No newline at end of file diff --git a/samples/durable-functions/python/function-chaining/README.md b/samples/durable-functions/python/function-chaining/README.md index e872a80..6962d90 100644 --- a/samples/durable-functions/python/function-chaining/README.md +++ b/samples/durable-functions/python/function-chaining/README.md @@ -1,50 +1,123 @@ -# Function Chaining Pattern - Durable Functions with Durable Task Scheduler +# Function Chaining Pattern -This sample demonstrates the **Function Chaining** orchestration pattern using Durable Functions with the Durable Task Scheduler backend. 
In this pattern, activities are executed sequentially, with the output of each activity passed as input to the next activity. +## Description of the Sample -## Pattern Overview +This sample demonstrates the function chaining pattern with Azure Durable Functions in Python. Function chaining is a fundamental workflow pattern where activities are executed in a sequence, with the output of one activity passed as the input to the next activity. -The Function Chaining pattern executes a sequence of activities in order: -1. `say_hello` - Takes a name and returns a greeting -2. `process_greeting` - Takes the greeting and adds more text -3. `finalize_response` - Takes the processed greeting and finalizes it +In this sample: +1. The orchestrator calls the `say_hello` activity with a name input +2. The result is passed to the `process_greeting` activity +3. That result is passed to the `finalize_response` activity +4. The final result is returned to the client -## Architecture - -- **HTTP Trigger**: `function_chaining` - Starts the orchestration -- **Orchestrator**: `function_chaining_orchestrator` - Manages the sequence of activities -- **Activities**: Three sequential activities that transform the input -- **Backend**: Durable Task Scheduler for state management +This pattern is useful for: +- Creating sequential workflows where steps must execute in order +- Passing data between steps with data transformations at each step +- Building pipelines where each activity adds value to the result ## Prerequisites -- [Python 3.9+](https://www.python.org/downloads/) -- [Azure Functions Core Tools](https://docs.microsoft.com/azure/azure-functions/functions-run-local#install-the-azure-functions-core-tools) -- [Durable Task Scheduler Emulator](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler) (for local development) +1. [Python 3.8+](https://www.python.org/downloads/) +2. 
[Azure Functions Core Tools](https://docs.microsoft.com/en-us/azure/azure-functions/functions-run-local) v4.x +3. [Docker](https://www.docker.com/products/docker-desktop/) (for running the Durable Task Scheduler emulator) + +## Configuring Durable Task Scheduler + +There are two ways to run this sample locally: + +### Using the Emulator (Recommended) + +The emulator simulates a scheduler and taskhub in a Docker container, making it ideal for development and learning. + +1. Pull the Docker image for the emulator: + ```bash + docker pull mcr.microsoft.com/dts/dts-emulator:latest + ``` + +1. Run the emulator: + ```bash + docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest + ``` +Wait a few seconds for the container to be ready. + +Note: The example code automatically uses the default emulator settings (endpoint: http://localhost:8080, taskhub: default). You don't need to set any environment variables. + +### Using a Deployed Scheduler and Taskhub in Azure + +Local development with a deployed scheduler: + +1. Install the durable task scheduler CLI extension: + + ```bash + az upgrade + az extension add --name durabletask --allow-preview true + ``` + +1. Create a resource group in a region where the Durable Task Scheduler is available. First, list the supported regions: + + ```bash + az provider show --namespace Microsoft.DurableTask --query "resourceTypes[?resourceType=='schedulers'].locations | [0]" --out table + ``` + + Then create the resource group in one of those regions: + + ```bash + az group create --name my-resource-group --location <region> + ``` + +1. Create a durable task scheduler resource: + + ```bash + az durabletask scheduler create \ + --resource-group my-resource-group \ + --name my-scheduler \ + --ip-allowlist '["0.0.0.0/0"]' \ + --sku-name "Dedicated" \ + --sku-capacity 1 \ + --tags "{'myattribute':'myvalue'}" + ``` + +1. 
Create a task hub within the scheduler resource: + + ```bash + az durabletask taskhub create \ + --resource-group my-resource-group \ + --scheduler-name my-scheduler \ + --name "my-taskhub" + ``` + +1. Grant the current user permission to connect to the `my-taskhub` task hub: + + ```bash + subscriptionId=$(az account show --query "id" -o tsv) + loggedInUser=$(az account show --query "user.name" -o tsv) -## Setup + az role assignment create \ + --assignee $loggedInUser \ + --role "Durable Task Data Contributor" \ + --scope "/subscriptions/$subscriptionId/resourceGroups/my-resource-group/providers/Microsoft.DurableTask/schedulers/my-scheduler/taskHubs/my-taskhub" + ``` -1. **Install dependencies**: +## How to Run the Sample + +Once you have set up the Durable Task Scheduler, follow these steps to run the sample: + +1. First, activate your Python virtual environment (if you're using one): ```bash - pip install -r requirements.txt + python -m venv venv + source venv/bin/activate # On Windows, use: venv\Scripts\activate ``` -2. **Start the Durable Task Scheduler Emulator**: +2. Install the required packages: ```bash - docker run --name dts-emulator -p 8080:8080 -p 8082:8082 -d mcr.microsoft.com/dts/dts-emulator:latest + pip install -r requirements.txt ``` -3. **Configure connection** (already set in `local.settings.json`): - The sample is configured to use the local emulator by default. - -## Running the Sample - -1. **Start the Azure Functions host**: +3. Start the Azure Functions runtime: ```bash func start ``` + + You should see output indicating the functions have loaded successfully. -2. **Test the orchestration**: +4. Test the orchestration by sending a POST request: ```bash # Start orchestration with default input curl -X POST http://localhost:7071/api/function_chaining \ @@ -57,57 +130,52 @@ The Function Chaining pattern executes a sequence of activities in order: -d '{"name": "Alice"}' ``` -3. 
**Check orchestration status**: - Use the `statusQueryGetUri` from the response to check status, or: +5. Check orchestration status using the `statusQueryGetUri` from the response: ```bash curl http://localhost:7071/api/status/{instanceId} ``` -## Configuration Files - -### host.json -Configures the Durable Functions extension to use Durable Task Scheduler: -- Sets the hub name to "default" -- Configures the storage provider as "AzureManaged" -- References the connection string name - -### local.settings.json -Contains local development settings: -- Durable Task Scheduler connection string for local emulator -- Function worker runtime set to "python" - -## Expected Output - -For input `{"name": "Alice"}`, the orchestration will produce: -``` -"Hello Alice! How are you today? I hope you're doing well!" -``` +## Understanding the Output -## Monitoring +When you run the sample, you'll see the following behavior: -- **Function Logs**: Check the Azure Functions host output for detailed logging -- **Dashboard**: Navigate to http://localhost:8082 to view orchestrations in the emulator dashboard - -## Using with Azure Durable Task Scheduler - -To use with an Azure-hosted Durable Task Scheduler instead of the emulator: - -1. Update `local.settings.json`: +1. **Initial Response**: The HTTP trigger returns a JSON response with management URLs: ```json { - "Values": { - "DurableTaskSchedulerConnection": "Endpoint=https://your-scheduler.dts.azure.net;Authentication=DefaultAzure" - } + "id": "abcd1234", + "statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abcd1234", + "sendEventPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abcd1234/raiseEvent/{eventName}", + "terminatePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abcd1234/terminate?reason={text}", + "purgeHistoryDeleteUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abcd1234" } ``` -2. 
Ensure you're authenticated with Azure CLI: - ```bash - az login +2. **Orchestration Execution**: The function chaining pattern processes the input through three activities: + - `say_hello`: Creates the initial greeting + - `process_greeting`: Adds the "How are you today?" part + - `finalize_response`: Adds the final "I hope you're doing well!" part + +3. **Final Result**: For input `{"name": "Alice"}`, the orchestration will produce: ``` + "Hello Alice! How are you today? I hope you're doing well!" + ``` + +## Dashboard Review + +You can monitor the orchestration execution through the Durable Task Scheduler dashboard: + +1. Navigate to `http://localhost:8082` in your browser +2. You'll see a list of task hubs - select the "default" hub +3. Click on your orchestration instance to see: + - Orchestration timeline and execution steps + - Input and output data for each activity + - Performance metrics and timing information + - Visual representation of the function chaining pattern + +The dashboard provides real-time visibility into the orchestration execution, making it easy to understand how data flows through each step in the chain. 
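The chain described above can be sketched with plain Python generators, no Azure SDK required. This is an illustrative model rather than the sample's actual `function_app.py`: the activity bodies are stand-ins chosen to reproduce the expected output shown earlier, and the driver loop plays the role of the Durable Functions runtime, which resolves each `yield context.call_activity(...)` and feeds the result back into the orchestrator.

```python
# Toy replay driver for the function chaining pattern. The activity bodies
# below are illustrative stand-ins for the real activities in function_app.py.

def say_hello(name):
    return f"Hello {name}!"

def process_greeting(greeting):
    return f"{greeting} How are you today?"

def finalize_response(text):
    return f"{text} I hope you're doing well!"

ACTIVITIES = {
    "say_hello": say_hello,
    "process_greeting": process_greeting,
    "finalize_response": finalize_response,
}

def function_chaining_orchestrator(name):
    # Each yield mirrors `result = yield context.call_activity(activity, arg)`:
    # the orchestrator pauses, the runtime runs the activity, and the result
    # becomes the value of the yield expression.
    greeting = yield ("say_hello", name)
    processed = yield ("process_greeting", greeting)
    final = yield ("finalize_response", processed)
    return final

def run_orchestrator(orchestrator, user_input):
    """Resolve each yielded (activity, input) request and feed the result
    back, the way the Durable runtime drives an orchestrator generator."""
    gen = orchestrator(user_input)
    request = gen.send(None)
    while True:
        activity_name, activity_input = request
        try:
            request = gen.send(ACTIVITIES[activity_name](activity_input))
        except StopIteration as done:
            return done.value

print(run_orchestrator(function_chaining_orchestrator, "Alice"))
# → Hello Alice! How are you today? I hope you're doing well!
```

In the real sample, the orchestrator carries an orchestration trigger decorator and each `yield` targets a registered activity function, but the replay mechanics are the same as this loop.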
## Learn More -- [Durable Functions Overview](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-overview) -- [Durable Task Scheduler](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler) -- [Function Chaining Pattern](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-sequence) \ No newline at end of file +- [Function Chaining Pattern in Durable Functions](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-sequence) +- [Durable Task Scheduler Overview](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler) +- [Durable Functions Python Developer Guide](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-python) \ No newline at end of file diff --git a/samples/durable-functions/python/human-interaction/README.md b/samples/durable-functions/python/human-interaction/README.md index f10bd24..d256382 100644 --- a/samples/durable-functions/python/human-interaction/README.md +++ b/samples/durable-functions/python/human-interaction/README.md @@ -1,55 +1,125 @@ -# Human Interaction Pattern - Durable Functions with Durable Task Scheduler +# Human Interaction Pattern -This sample demonstrates the **Human Interaction** orchestration pattern using Durable Functions with the Durable Task Scheduler backend. This pattern shows how to handle workflows that require human approval or input with timeout handling. +## Description of the Sample -## Pattern Overview +This sample demonstrates the human interaction pattern with Azure Durable Functions in Python. This pattern is essential for workflows that require human approval, input, or decision-making with built-in timeout handling to prevent indefinite waiting. -The Human Interaction pattern waits for human input with timeout handling: -1. **Process Start**: `send_approval_request` - Sends a request for human approval -2. 
**Wait for Input**: Orchestration waits for external event or timeout -3. **Timeout Handling**: If no response within timeout, escalates or takes default action -4. **Completion**: Processes the approval/denial or timeout result +In this sample: +1. **Approval Request**: The orchestrator starts by sending an approval request using the `send_approval_request` activity +2. **External Event Waiting**: The orchestration waits for an external event (approval/denial) or a timeout +3. **Race Condition**: Either the human provides input or the timeout expires - whichever happens first determines the flow +4. **Response Processing**: Approved requests are processed via `process_approval`, while timeouts trigger `escalate_approval` +5. **Final Result**: The orchestration completes with the approval status or escalation details -## Architecture - -- **HTTP Trigger**: `request_approval` - Starts the approval workflow -- **Orchestrator**: `human_interaction_orchestrator` - Manages waiting and timeout logic -- **Activities**: - - `send_approval_request` - Initiates the approval request - - `process_approval` - Handles approved requests - - `escalate_approval` - Handles timeout scenarios -- **Event Endpoint**: `approve/{instanceId}` - Receives approval/denial -- **Backend**: Durable Task Scheduler for state management +This pattern is useful for: +- Approval workflows for expenses, time-off requests, or purchases +- Document review and sign-off processes +- Quality control checkpoints requiring human validation +- Any process where automated systems need human oversight with timeout handling ## Prerequisites -- [Python 3.9+](https://www.python.org/downloads/) -- [Azure Functions Core Tools](https://docs.microsoft.com/azure/azure-functions/functions-run-local#install-the-azure-functions-core-tools) -- [Durable Task Scheduler Emulator](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler) (for local development) +1. 
[Python 3.8+](https://www.python.org/downloads/) +2. [Azure Functions Core Tools](https://docs.microsoft.com/en-us/azure/azure-functions/functions-run-local) v4.x +3. [Docker](https://www.docker.com/products/docker-desktop/) (for running the Durable Task Scheduler emulator) + +## Configuring Durable Task Scheduler + +There are two ways to run this sample locally: + +### Using the Emulator (Recommended) + +The emulator simulates a scheduler and taskhub in a Docker container, making it ideal for development and learning. + +1. Pull the Docker image for the emulator: + ```bash + docker pull mcr.microsoft.com/dts/dts-emulator:latest + ``` + +1. Run the emulator: + ```bash + docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest + ``` +Wait a few seconds for the container to be ready. + +Note: The example code automatically uses the default emulator settings (endpoint: http://localhost:8080, taskhub: default). You don't need to set any environment variables. + +### Using a Deployed Scheduler and Taskhub in Azure + +Local development with a deployed scheduler: + +1. Install the durable task scheduler CLI extension: + + ```bash + az upgrade + az extension add --name durabletask --allow-preview true + ``` -## Setup +1. Create a resource group in a region where the Durable Task Scheduler is available. First, list the supported regions: -1. **Install dependencies**: + ```bash + az provider show --namespace Microsoft.DurableTask --query "resourceTypes[?resourceType=='schedulers'].locations | [0]" --out table + ``` + + Then create the resource group in one of those regions: + + ```bash + az group create --name my-resource-group --location <region> + ``` + +1. Create a durable task scheduler resource: + + ```bash + az durabletask scheduler create \ + --resource-group my-resource-group \ + --name my-scheduler \ + --ip-allowlist '["0.0.0.0/0"]' \ + --sku-name "Dedicated" \ + --sku-capacity 1 \ + --tags "{'myattribute':'myvalue'}" + ``` + +1. 
Create a task hub within the scheduler resource: + + ```bash + az durabletask taskhub create \ + --resource-group my-resource-group \ + --scheduler-name my-scheduler \ + --name "my-taskhub" + ``` + +1. Grant the current user permission to connect to the `my-taskhub` task hub: + + ```bash + subscriptionId=$(az account show --query "id" -o tsv) + loggedInUser=$(az account show --query "user.name" -o tsv) + + az role assignment create \ + --assignee $loggedInUser \ + --role "Durable Task Data Contributor" \ + --scope "/subscriptions/$subscriptionId/resourceGroups/my-resource-group/providers/Microsoft.DurableTask/schedulers/my-scheduler/taskHubs/my-taskhub" + ``` + +## How to Run the Sample + +Once you have set up the Durable Task Scheduler, follow these steps to run the sample: + +1. First, activate your Python virtual environment (if you're using one): ```bash - pip install -r requirements.txt + python -m venv venv + source venv/bin/activate # On Windows, use: venv\Scripts\activate ``` -2. **Start the Durable Task Scheduler Emulator**: +2. Install the required packages: ```bash - docker run --name dts-emulator -p 8080:8080 -p 8082:8082 -d mcr.microsoft.com/dts/dts-emulator:latest + pip install -r requirements.txt ``` -3. **Configure connection** (already set in `local.settings.json`): - The sample is configured to use the local emulator by default. - -## Running the Sample - -1. **Start the Azure Functions host**: +3. Start the Azure Functions runtime: ```bash func start ``` + + You should see output indicating the functions have loaded successfully. -2. **Start an approval workflow**: +4. 
Start an approval workflow by sending a POST request: ```bash # Start approval request with default timeout (10 minutes) curl -X POST http://localhost:7071/api/human_interaction \ @@ -62,116 +132,100 @@ The Human Interaction pattern waits for human input with timeout handling: -d '{"request_type": "expense_approval", "amount": 5000, "timeout_minutes": 15, "requester": "alice@company.com", "description": "Conference attendance expenses"}' ``` -3. **Provide approval/denial** (within the timeout period): +5. Provide approval or denial within the timeout period: ```bash # Approve the request (use approvalId from the orchestration response) curl -X POST http://localhost:7071/api/approve/{approvalId} \ -H "Content-Type: application/json" \ -d '{"approved": true, "comments": "Approved by manager", "approver": "manager@company.com"}' - # Deny the request + # Or deny the request curl -X POST http://localhost:7071/api/approve/{approvalId} \ -H "Content-Type: application/json" \ -d '{"approved": false, "comments": "Insufficient justification", "approver": "manager@company.com"}' ``` -4. **Check status**: +6. 
Check orchestration status: ```bash curl http://localhost:7071/api/status/{instanceId} ``` -## Configuration Files - -### host.json -Configures the Durable Functions extension to use Durable Task Scheduler: -- Sets the hub name to "default" -- Configures the storage provider as "azureManaged" -- References the connection string name - -### local.settings.json -Contains local development settings: -- Durable Task Scheduler connection string for local emulator -- Function worker runtime set to "python" - -## Expected Behavior - -### Scenario 1: Approved Within Timeout -```json -{ - "requestType": "ExpenseApproval", - "status": "Approved", - "approval": { - "approved": true, - "comments": "Approved by manager", - "timestamp": "2025-09-19T16:45:30Z" - } -} -``` - -### Scenario 2: Denied Within Timeout -```json -{ - "requestType": "ExpenseApproval", - "status": "Denied", - "approval": { - "approved": false, - "comments": "Insufficient justification", - "timestamp": "2025-09-19T16:45:30Z" - } -} -``` - -### Scenario 3: Timeout (No Response) -```json -{ - "requestType": "ExpenseApproval", - "status": "Escalated", - "reason": "No response within timeout period", - "escalation": "Sent to senior management" -} -``` - -## How It Works - -1. **Timer Setup**: Orchestration sets up a durable timer for the timeout period -2. **Event Waiting**: Waits for either external approval event or timer completion -3. **Race Condition**: Whichever completes first (approval or timeout) determines the outcome -4. **State Management**: Durable Task Scheduler maintains state during the waiting period - -## Testing Timeout Scenarios - -To test timeout behavior: -1. Start an approval request with a short timeout (e.g., 10 seconds) -2. Don't send any approval/denial -3. Wait for the timeout to occur -4. 
Check the final status to see escalation handling - -## Monitoring - -- **Function Logs**: Check the Azure Functions host output for approval workflow details -- **Dashboard**: Navigate to http://localhost:8082 to view waiting orchestrations -- **Timers**: See active timers and their remaining durations in the dashboard - -## Using with Azure Durable Task Scheduler - -To use with an Azure-hosted Durable Task Scheduler instead of the emulator: - -1. Update `local.settings.json`: +## Understanding the Output + +When you run the sample, you'll see the following behavior based on different scenarios: + +1. **Initial Response**: The HTTP trigger returns management URLs immediately: + ```json + { + "id": "abcd1234", + "approvalId": "approval-abcd1234", + "statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abcd1234", + "sendEventPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abcd1234/raiseEvent/{eventName}", + "terminatePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abcd1234/terminate?reason={text}", + "purgeHistoryDeleteUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abcd1234" + } + ``` + +2. **Scenario 1 - Approved Within Timeout**: If approval is provided before timeout expires: ```json { - "Values": { - "DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=https://your-scheduler.dts.azure.net;Authentication=DefaultAzure" + "requestType": "ExpenseApproval", + "status": "Approved", + "approval": { + "approved": true, + "comments": "Approved by manager", + "approver": "manager@company.com", + "timestamp": "2025-09-19T16:45:30Z" } } ``` -2. Ensure you're authenticated with Azure CLI: - ```bash - az login +3. 
**Scenario 2 - Denied Within Timeout**: If denial is provided before timeout expires: + ```json + { + "requestType": "ExpenseApproval", + "status": "Denied", + "approval": { + "approved": false, + "comments": "Insufficient justification", + "approver": "manager@company.com", + "timestamp": "2025-09-19T16:45:30Z" + } + } + ``` + +4. **Scenario 3 - Timeout (No Response)**: If no response is received within the timeout period: + ```json + { + "requestType": "ExpenseApproval", + "status": "Escalated", + "reason": "No response within timeout period", + "escalation": "Sent to senior management", + "timeoutMinutes": 15 + } ``` +5. **Testing Timeout Scenarios**: To test timeout behavior: + - Start an approval request with a short timeout (e.g., 10 seconds) + - Don't send any approval/denial response + - Wait for the timeout to occur and check the final escalation status + +## Dashboard Review + +You can monitor the orchestration execution through the Durable Task Scheduler dashboard: + +1. Navigate to `http://localhost:8082` in your browser +2. You'll see a list of task hubs - select the "default" hub +3. Click on your orchestration instance to see: + - The orchestration waiting for external events (approval/denial) + - Active durable timers and their remaining time until timeout + - Timeline showing the approval request activity and the waiting state + - Real-time updates when the external event is received or timeout occurs + +The dashboard is particularly valuable for this pattern because it clearly visualizes the "waiting" state and shows how external events can interrupt long-running waits, demonstrating the human interaction pattern in real-time. 
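The three scenarios above all fall out of a single branch on the race between the external approval event and the durable timer. The sketch below models just that decision in plain Python; the durable calls in the comment and the `approval_response` event name are assumptions about the sample's code, while the payload fields mirror the scenario outputs shown above.

```python
# In the real orchestrator, the race is expressed roughly as:
#   approval_task = context.wait_for_external_event("approval_response")  # assumed event name
#   timeout_task = context.create_timer(context.current_utc_datetime + timedelta(minutes=n))
#   winner = yield context.task_any([approval_task, timeout_task])
# The branch on `winner` then produces one of the three payloads shown above.

def resolve_approval(winner, approval=None, timeout_minutes=15):
    """winner is 'approval' if the external event arrived first,
    'timeout' if the durable timer fired first."""
    if winner == "timeout":
        # No human response: escalate instead of waiting forever.
        return {
            "requestType": "ExpenseApproval",
            "status": "Escalated",
            "reason": "No response within timeout period",
            "escalation": "Sent to senior management",
            "timeoutMinutes": timeout_minutes,
        }
    # A response arrived before the deadline: approved or denied.
    status = "Approved" if approval.get("approved") else "Denied"
    return {
        "requestType": "ExpenseApproval",
        "status": status,
        "approval": approval,
    }

print(resolve_approval("timeout")["status"])                               # Escalated
print(resolve_approval("approval", {"approved": True})["status"])          # Approved
print(resolve_approval("approval", {"approved": False})["status"])         # Denied
```

Whichever task completes first determines the branch; the losing task is simply abandoned by the orchestrator, which is why a late approval after the timeout has no effect on the outcome.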
+ ## Learn More -- [Durable Functions Overview](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-overview) -- [Durable Task Scheduler](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler) -- [Human Interaction Pattern](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-phone-verification) \ No newline at end of file +- [Human Interaction Pattern in Durable Functions](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-phone-verification) +- [Durable Task Scheduler Overview](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler) +- [Durable Functions Python Developer Guide](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-python) \ No newline at end of file diff --git a/samples/durable-functions/python/monitoring/README.md b/samples/durable-functions/python/monitoring/README.md index d095067..68d993d 100644 --- a/samples/durable-functions/python/monitoring/README.md +++ b/samples/durable-functions/python/monitoring/README.md @@ -31,314 +31,210 @@ HTTP Start → Monitoring Orchestrator ## Prerequisites -1. **Azure Storage Emulator** (Azurite) or **Azure Storage Account** -2. **Azure Functions Core Tools** v4.x -3. **Python** 3.8 or higher +1. [Python 3.8+](https://www.python.org/downloads/) +2. [Azure Functions Core Tools](https://docs.microsoft.com/en-us/azure/azure-functions/functions-run-local) v4.x +3. [Docker](https://www.docker.com/products/docker-desktop/) (for running the Durable Task Scheduler) installed -## Setup Instructions +## Configuring Durable Task Scheduler -### 1. 
Install Dependencies +There are two ways to run this sample locally: -```bash -# Navigate to the monitoring sample directory -cd samples/durable-functions/python/monitoring +### Using the Emulator (Recommended) -# Create and activate virtual environment (recommended) -python -m venv venv -source venv/bin/activate # On Windows: venv\Scripts\activate +The emulator simulates a scheduler and taskhub in a Docker container, making it ideal for development and learning. + +1. Pull the Docker Image for the Emulator: + ```bash + docker pull mcr.microsoft.com/dts/dts-emulator:latest + ``` + +1. Run the Emulator: + ```bash + docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest + ``` +Wait a few seconds for the container to be ready. + +Note: The example code automatically uses the default emulator settings (endpoint: http://localhost:8080, taskhub: default). You don't need to set any environment variables. + +### Using a Deployed Scheduler and Taskhub in Azure + +Local development with a deployed scheduler: + +1. Install the durable task scheduler CLI extension: -# Install required packages -pip install -r requirements.txt -``` - -### 2. Start Storage Emulator - -**Option A: Using Azurite (Recommended)** -```bash -# Install Azurite globally -npm install -g azurite - -# Start Azurite -azurite --silent --location ./azurite --debug ./azurite/debug.log -``` - -**Option B: Using Azure Storage** -Update `local.settings.json` with your storage connection string: -```json -{ - "Values": { - "AzureWebJobsStorage": "DefaultEndpointsProtocol=https;AccountName=;AccountKey=;EndpointSuffix=core.windows.net" - } -} -``` - -### 3. Start the Function App - -```bash -# Start the Azure Functions runtime -func start -``` - -The function app will start on `http://localhost:7071` - -## Usage Examples - -### 1. 
Start Job Monitoring (Default Configuration) - -**Basic Job Monitoring:** -```bash -curl -X POST http://localhost:7071/api/start_monitoring_job \ - -H "Content-Type: application/json" -``` - -**Expected Response:** -```json -{ - "id": "abc123def456", - "statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abc123def456?taskHub=default", - "sendEventPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abc123def456/raiseEvent/{eventName}?taskHub=default", - "terminatePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abc123def456/terminate?reason={text}&taskHub=default", - "purgeHistoryDeleteUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abc123def456?taskHub=default" -} -``` - -### 2. Start Job Monitoring with Custom Parameters - -**With Custom Job Configuration:** -```bash -curl -X POST http://localhost:7071/api/start_monitoring_job \ - -H "Content-Type: application/json" \ - -d '{ - "job_id": "my-custom-job-123", - "polling_interval_seconds": 10, - "timeout_seconds": 60 - }' -``` - -### 3. Check Orchestration Status - -```bash -curl -X GET "http://localhost:7071/runtime/webhooks/durabletask/instances/{instanceId}?taskHub=default" -``` - -**Sample Status Response (In Progress):** -```json -{ - "name": "monitoring_job_orchestrator", - "instanceId": "abc123def456", - "runtimeStatus": "Running", - "input": { - "job_id": "job-uuid-12345", - "polling_interval_seconds": 5, - "timeout_seconds": 30 - }, - "customStatus": { - "job_id": "job-uuid-12345", - "status": "Running", - "check_count": 2, - "last_check_time": "2025-09-19T18:05:15.123Z" - }, - "output": null, - "createdTime": "2025-09-19T18:05:00Z", - "lastUpdatedTime": "2025-09-19T18:05:15Z" -} -``` - -### 4. 
Get Job Status Directly - -```bash -curl -X GET "http://localhost:7071/api/job_status/{jobId}" -``` - -**Sample Job Status Response:** -```json -{ - "job_id": "job-uuid-12345", - "status": "Running", - "progress_percent": 75, - "estimated_completion": "2025-09-19T18:15:00Z", - "last_updated": "2025-09-19T18:05:30.456Z", - "details": "Processing batch 3 of 4" -} -``` - -## How the Pattern Works - -### 1. Job Status Polling -- The orchestrator periodically calls the `check_job_status` activity -- Each check simulates querying an external service or API -- Job status progresses from "Unknown" → "Running" → "Completed" - -### 2. Activity-based Delays -- Uses `wait_for_interval` activity to wait between status checks -- Configurable polling interval (default: 5 seconds) -- Note: Uses activity function instead of `create_timer()` to avoid timer configuration issues in Python SDK - -### 3. Custom Status Updates -- Current job status is exposed via `set_custom_status()` -- Clients can query orchestration status to see job progress -- Real-time visibility into monitoring state - -### 4. 
Timeout Handling
-- Monitoring stops if timeout period is reached
-- Job status is set to "Timeout" if not completed in time
-- Prevents infinite monitoring loops
-
-## Sample Outputs
-
-### Completed Job Monitoring Result
-
-```json
-{
-  "job_id": "job-uuid-12345",
-  "final_status": "Completed",
-  "checks_performed": 4,
-  "monitoring_duration_seconds": 15.6
-}
-```
-
-### Timed Out Job Monitoring Result
-
-```json
-{
-  "job_id": "job-uuid-67890",
-  "final_status": "Timeout",
-  "checks_performed": 6,
-  "monitoring_duration_seconds": 30.0
-}
-```
-
-### Custom Status During Monitoring
-
-```json
-{
-  "job_id": "job-uuid-12345",
-  "status": "Running",
-  "check_count": 3,
-  "last_check_time": "2025-09-19T18:05:15.123Z"
-}
-```
-
-## Configuration Options
-
-### Workflow Configuration
-- **workflow_type**: Type of workflow being monitored (string)
-- **batch_size**: Total number of items to process (integer, default: 100)
-- **failure_rate**: Simulated failure rate for demo (float, 0.0-1.0, default: 0.1)
-- **enable_monitoring**: Enable/disable monitoring features (boolean, default: true)
-
-### Alert Thresholds
-- **Success Rate Alert**: Triggered when success rate < 90%
-- **Performance Alert**: Based on processing time deviations
-- **Error Rate Alert**: When error rate exceeds configured threshold
-
-## Monitoring Best Practices
-
-### 1. **Use Custom Status Effectively**
-```python
-# Update custom status with meaningful progress information
-context.set_custom_status({
-    "current_phase": "processing",
-    "progress": {"completed": 50, "total": 100},
-    "metrics": {"success_rate": 95.5}
-})
-```
-
-### 2. **Implement Structured Logging**
-```python
-logging.info(f"Batch {batch_num} completed", extra={
-    "workflow_id": workflow_id,
-    "batch_number": batch_num,
-    "items_processed": processed_count,
-    "processing_time": processing_time
-})
-```
-
-### 3. **Track Key Metrics**
-- Processing throughput (items/second)
-- Success/failure rates
-- Resource utilization
-- Duration and timing metrics
-
-### 4. **Configure Appropriate Alerts**
-- Set meaningful thresholds
-- Include actionable information
-- Route to appropriate teams
-- Provide context for quick resolution
-
-## Integration Examples
-
-### Application Insights Integration
-```python
-# Add Application Insights logging
-import logging
-from opencensus.ext.azure.log_exporter import AzureLogHandler
-
-# Configure Application Insights
-logging.getLogger().addHandler(AzureLogHandler(
-    connection_string="InstrumentationKey=your-key"
-))
-```
-
-### Event Grid Integration
-Configure in `host.json` to publish orchestration events:
-```json
-{
-  "extensions": {
-    "durableTask": {
-      "notifications": {
-        "eventGrid": {
-          "topicEndpoint": "https://your-topic.eventgrid.azure.net/",
-          "keySettingName": "EventGridKey"
-        }
-      }
-    }
-  }
-}
-```
-
-## Troubleshooting
-
-### Common Issues
-
-1. **Timer Configuration Errors**
-   - Error: "replay schema version >= V3 is being used, but timer properties are not defined"
-   - Solution: This sample uses activity functions (`wait_for_interval`) instead of `create_timer()` to avoid known timer issues in the Python SDK
-   - This is a temporary workaround until the SDK timer configuration is fixed
-
-2. **Storage Connection Issues**
-   - Ensure Durable Task Scheduler is running on http://localhost:8080
-   - Check `local.settings.json` has correct `DURABLE_TASK_SCHEDULER_CONNECTION_STRING`
-   - Verify `TASKHUB_NAME` is set to "default"
-
-3. **Job Status Not Updating**
-   - Verify `set_custom_status()` calls in orchestrator
-   - Check that `check_job_status` activity is being called successfully
-   - Monitor logs for activity execution
-
-### Debug Logs
-
-Enable detailed logging in `host.json`:
-```json
-{
-  "logging": {
-    "logLevel": {
-      "DurableTask.Core": "Information",
-      "DurableTask.AzureStorage": "Information"
-    }
-  }
-}
-```
-
-## Related Samples
-
-- **[Function Chaining](../function-chaining/)**: Basic orchestration pattern
-- **[Fan-out/Fan-in](../fan-out-fan-in/)**: Parallel processing with aggregation
-- **[Human Interaction](../human-interaction/)**: External event handling
-- **[Eternal Orchestrations](../eternal-orchestrations/)**: Long-running workflows
-
-## Additional Resources
-
-- [Durable Functions Monitoring Documentation](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-monitor)
-- [Azure Functions Python Developer Guide](https://docs.microsoft.com/azure/azure-functions/functions-reference-python)
-- [Application Insights for Azure Functions](https://docs.microsoft.com/azure/azure-functions/functions-monitoring)
\ No newline at end of file
+   ```bash
+   az upgrade
+   az extension add --name durabletask --allow-preview true
+   ```
+
+1. Create a resource group in a region where the Durable Task Scheduler is available:
+
+   ```bash
+   az provider show --namespace Microsoft.DurableTask --query "resourceTypes[?resourceType=='schedulers'].locations | [0]" --out table
+   ```
+
+   ```bash
+   az group create --name my-resource-group --location <location>
+   ```
+
+1. Create a durable task scheduler resource:
+
+   ```bash
+   az durabletask scheduler create \
+     --resource-group my-resource-group \
+     --name my-scheduler \
+     --ip-allowlist '["0.0.0.0/0"]' \
+     --sku-name "Dedicated" \
+     --sku-capacity 1 \
+     --tags "{'myattribute':'myvalue'}"
+   ```
+
+1. Create a task hub within the scheduler resource:
+
+   ```bash
+   az durabletask taskhub create \
+     --resource-group my-resource-group \
+     --scheduler-name my-scheduler \
+     --name "my-taskhub"
+   ```
+
+1. Grant the current user permission to connect to the `my-taskhub` task hub:
+
+   ```bash
+   subscriptionId=$(az account show --query "id" -o tsv)
+   loggedInUser=$(az account show --query "user.name" -o tsv)
+
+   az role assignment create \
+     --assignee $loggedInUser \
+     --role "Durable Task Data Contributor" \
+     --scope "/subscriptions/$subscriptionId/resourceGroups/my-resource-group/providers/Microsoft.DurableTask/schedulers/my-scheduler/taskHubs/my-taskhub"
+   ```
+
+## How to Run the Sample
+
+Once you have set up the Durable Task Scheduler, follow these steps to run the sample:
+
+1. Create and activate a Python virtual environment (optional but recommended):
+   ```bash
+   python -m venv venv
+   source venv/bin/activate  # On Windows, use: venv\Scripts\activate
+   ```
+
+2. Install the required packages:
+   ```bash
+   pip install -r requirements.txt
+   ```
+
+3. Start the Azure Functions runtime:
+   ```bash
+   func start
+   ```
+
+   You should see output indicating the functions have loaded successfully.
+
+4. Start job monitoring by sending a POST request:
+   ```bash
+   # Basic job monitoring with default configuration
+   curl -X POST http://localhost:7071/api/start_monitoring_job \
+     -H "Content-Type: application/json" \
+     -d '{}'
+
+   # Job monitoring with custom parameters
+   curl -X POST http://localhost:7071/api/start_monitoring_job \
+     -H "Content-Type: application/json" \
+     -d '{
+       "job_id": "my-custom-job-123",
+       "polling_interval_seconds": 10,
+       "timeout_seconds": 60
+     }'
+   ```
+
+5. Check orchestration status using the `statusQueryGetUri` from the response:
+   ```bash
+   curl -X GET "http://localhost:7071/runtime/webhooks/durabletask/instances/{instanceId}"
+   ```
+
+6. Optionally, check job status directly:
+   ```bash
+   curl -X GET "http://localhost:7071/api/job_status/{jobId}"
+   ```
+
+## Understanding the Output
+
+When you run the sample, you'll see the following behavior:
+
+1. **Initial Response**: The HTTP trigger returns management URLs immediately:
+   ```json
+   {
+     "id": "abc123def456",
+     "statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abc123def456",
+     "sendEventPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abc123def456/raiseEvent/{eventName}",
+     "terminatePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abc123def456/terminate?reason={text}",
+     "purgeHistoryDeleteUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abc123def456"
+   }
+   ```
+
+2. **Orchestration Status During Monitoring**: While the job is being monitored, status checks will show:
+   ```json
+   {
+     "name": "monitoring_job_orchestrator",
+     "instanceId": "abc123def456",
+     "runtimeStatus": "Running",
+     "input": {
+       "job_id": "job-uuid-12345",
+       "polling_interval_seconds": 5,
+       "timeout_seconds": 30
+     },
+     "customStatus": {
+       "job_id": "job-uuid-12345",
+       "status": "Running",
+       "check_count": 3,
+       "last_check_time": "2025-09-19T18:05:15.123Z"
+     }
+   }
+   ```
+
+3. **Completed Job Monitoring Result**: When the job completes successfully:
+   ```json
+   {
+     "job_id": "job-uuid-12345",
+     "final_status": "Completed",
+     "checks_performed": 4,
+     "monitoring_duration_seconds": 15.6
+   }
+   ```
+
+4. **Timeout Scenario**: If the job doesn't complete within the specified timeout:
+   ```json
+   {
+     "job_id": "job-uuid-67890",
+     "final_status": "Timeout",
+     "checks_performed": 6,
+     "monitoring_duration_seconds": 30.0
+   }
+   ```
+
+5. **Monitoring Pattern Benefits**:
+   - Provides real-time visibility into job progress via custom status
+   - Prevents infinite waiting with built-in timeout handling
+   - Uses configurable polling intervals to balance responsiveness and resource usage
+   - Handles external services that don't support callbacks or webhooks
+
+## Dashboard Review
+
+You can monitor the orchestration execution through the Durable Task Scheduler dashboard:
+
+1. Navigate to `http://localhost:8082` in your browser
+2. You'll see a list of task hubs; select the "default" hub
+3. Click on your orchestration instance to see:
+   - Real-time custom status updates showing job monitoring progress
+   - Timeline of periodic `check_job_status` activity executions
+   - Activity-based delays between status checks (using `wait_for_interval`)
+   - How the monitoring pattern continues until job completion or timeout
+
+The dashboard is particularly useful for this pattern because it shows how the orchestration maintains state between periodic checks, demonstrating the monitoring pattern's ability to track long-running external operations over time.
+
+## Learn More
+
+- [Monitoring Pattern in Durable Functions](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-monitor)
+- [Durable Task Scheduler Overview](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler)
+- [Durable Functions Python Developer Guide](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-python)
\ No newline at end of file
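The core logic of the monitor pattern in this README (poll a status check, compare against an absolute deadline, wait a fixed interval, repeat) can be sketched in plain Python, independent of the Durable Functions runtime. This is a minimal illustration, not the sample's orchestrator; `monitor_job` and its parameters are hypothetical names:

```python
import time

def monitor_job(check_status, polling_interval_seconds=5, timeout_seconds=30,
                clock=time.monotonic, sleep=time.sleep):
    """Poll check_status() until it returns a terminal status or the deadline passes.

    Mirrors the monitor pattern: compute an absolute deadline up front,
    poll at a fixed interval, and report how many checks were performed.
    """
    start = clock()
    deadline = start + timeout_seconds
    checks = 0
    while True:
        checks += 1
        status = check_status()
        if status in ("Completed", "Failed"):
            final = status  # job reached a terminal state
            break
        if clock() >= deadline:
            final = "Timeout"  # give up rather than monitor forever
            break
        sleep(polling_interval_seconds)
    return {
        "final_status": final,
        "checks_performed": checks,
        "monitoring_duration_seconds": round(clock() - start, 1),
    }

# Simulated job that completes on the third check; a fake clock avoids real waiting
statuses = iter(["Running", "Running", "Completed"])
t = {"now": 0.0}
result = monitor_job(
    lambda: next(statuses),
    polling_interval_seconds=5,
    timeout_seconds=30,
    clock=lambda: t["now"],
    sleep=lambda s: t.__setitem__("now", t["now"] + s),
)
print(result)  # {'final_status': 'Completed', 'checks_performed': 3, 'monitoring_duration_seconds': 10.0}
```

Injecting `clock` and `sleep` keeps the loop deterministic for testing; the orchestrator achieves the same determinism by delegating delays to a replay-safe mechanism (here, the `wait_for_interval` activity) instead of sleeping on the wall clock.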
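On the client side, the status check against `statusQueryGetUri` from the "How to Run the Sample" steps can be wrapped in a small polling helper. A minimal sketch, where `fetch_status` stands in for an HTTP GET of `statusQueryGetUri` (the helper name and the simulated responses are illustrative):

```python
def wait_for_completion(fetch_status, max_polls=20, pause=lambda: None):
    """Poll an orchestration status document until runtimeStatus is terminal.

    fetch_status() should return the parsed JSON status document, i.e. the
    shape shown under "Orchestration Status During Monitoring" above.
    """
    terminal = {"Completed", "Failed", "Terminated"}
    for _ in range(max_polls):
        status = fetch_status()
        if status["runtimeStatus"] in terminal:
            return status
        pause()  # in real code: time.sleep(...) between HTTP polls
    raise TimeoutError("orchestration did not reach a terminal state")

# Simulated responses: two "Running" polls, then completion with the output payload
responses = iter([
    {"runtimeStatus": "Running"},
    {"runtimeStatus": "Running"},
    {"runtimeStatus": "Completed",
     "output": {"job_id": "job-uuid-12345", "final_status": "Completed"}},
])
final = wait_for_completion(lambda: next(responses))
print(final["output"]["final_status"])  # Completed
```

This is the same poll-until-terminal shape as the orchestrator's own monitor loop, applied one level up: the orchestrator watches the job, and the client watches the orchestration.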