diff --git a/.gitignore b/.gitignore
index 1c3d923..83db52a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -404,4 +404,17 @@ FodyWeavers.xsd
 samples/durable-task-sdks/java/**/build/
 samples/durable-task-sdks/java/**/*.class
 samples/durable-task-sdks/java/**/.gradle/
-samples/durable-task-sdks/java/**/out/
\ No newline at end of file
+samples/durable-task-sdks/java/**/out/
+
+# Azure Functions Azurite local storage (generated files)
+**/__azurite_db_*.json
+**/AzuriteConfig
+
+# Python virtual environments and cache
+**/__pycache__/
+**/venv/
+**/.venv/
+
+# Local settings files that might contain secrets (but preserve template files in samples)
+# Note: Sample local.settings.json files are kept as templates with safe defaults
+local.settings.json
\ No newline at end of file
diff --git a/samples/durable-functions/python/async-http-api/README.md b/samples/durable-functions/python/async-http-api/README.md
new file mode 100644
index 0000000..eaf53d8
--- /dev/null
+++ b/samples/durable-functions/python/async-http-api/README.md
@@ -0,0 +1,205 @@
+# Async HTTP API Pattern
+
+## Description of the Sample
+
+This sample demonstrates the async HTTP API pattern with Azure Durable Functions in Python. This pattern is essential for handling long-running operations where clients need to periodically check the status of their requests rather than waiting for a synchronous response.
+
+In this sample:
+1. **HTTP Request**: A client submits a request to start a long-running operation
+2. **Immediate Response**: The HTTP trigger returns immediately with status URLs and a 202 Accepted status
+3. **Background Processing**: The orchestrator manages the long-running `long_running_operation` activity
+4. **Status Polling**: Clients use the provided URLs to check the operation status
+5. **Completion**: Eventually, the operation completes and returns the final result
+
+This pattern is useful for:
+- Operations that take several seconds or minutes to complete
+- Batch processing jobs where clients need to track progress
+- Integration scenarios where you need to prevent HTTP timeouts
+- APIs that need to provide immediate responses for long-running tasks
+
+## Prerequisites
+
+1. [Python 3.8+](https://www.python.org/downloads/)
+2. [Azure Functions Core Tools](https://docs.microsoft.com/en-us/azure/azure-functions/functions-run-local) v4.x
+3. [Docker](https://www.docker.com/products/docker-desktop/) installed (for running the Durable Task Scheduler emulator)
+
+## Configuring Durable Task Scheduler
+
+There are two ways to run this sample locally:
+
+### Using the Emulator (Recommended)
+
+The emulator simulates a scheduler and taskhub in a Docker container, making it ideal for development and learning.
+
+1. Pull the Docker image for the emulator:
+   ```bash
+   docker pull mcr.microsoft.com/dts/dts-emulator:latest
+   ```
+
+1. Run the emulator:
+   ```bash
+   docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
+   ```
+
+   Wait a few seconds for the container to be ready.
+
+Note: The example code automatically uses the default emulator settings (endpoint: http://localhost:8080, taskhub: default). You don't need to set any environment variables.
+
+### Using a Deployed Scheduler and Taskhub in Azure
+
+Local development with a deployed scheduler:
+
+1. Install the durable task scheduler CLI extension:

+   ```bash
+   az upgrade
+   az extension add --name durabletask --allow-preview true
+   ```
+
+1. 
Create a resource group in a region where the Durable Task Scheduler is available. First, list the supported regions:
+
+   ```bash
+   az provider show --namespace Microsoft.DurableTask --query "resourceTypes[?resourceType=='schedulers'].locations | [0]" --out table
+   ```
+
+   Then create the resource group in one of those regions:
+
+   ```bash
+   az group create --name my-resource-group --location <location>
+   ```
+
+1. Create a durable task scheduler resource:
+
+   ```bash
+   az durabletask scheduler create \
+     --resource-group my-resource-group \
+     --name my-scheduler \
+     --ip-allowlist '["0.0.0.0/0"]' \
+     --sku-name "Dedicated" \
+     --sku-capacity 1 \
+     --tags "{'myattribute':'myvalue'}"
+   ```
+
+1. Create a task hub within the scheduler resource:
+
+   ```bash
+   az durabletask taskhub create \
+     --resource-group my-resource-group \
+     --scheduler-name my-scheduler \
+     --name "my-taskhub"
+   ```
+
+1. Grant the current user permission to connect to the `my-taskhub` task hub:
+
+   ```bash
+   subscriptionId=$(az account show --query "id" -o tsv)
+   loggedInUser=$(az account show --query "user.name" -o tsv)
+
+   az role assignment create \
+     --assignee $loggedInUser \
+     --role "Durable Task Data Contributor" \
+     --scope "/subscriptions/$subscriptionId/resourceGroups/my-resource-group/providers/Microsoft.DurableTask/schedulers/my-scheduler/taskHubs/my-taskhub"
+   ```
+
+## How to Run the Sample
+
+Once you have set up the Durable Task Scheduler, follow these steps to run the sample:
+
+1. First, activate your Python virtual environment (if you're using one):
+   ```bash
+   python -m venv venv
+   source venv/bin/activate  # On Windows, use: venv\Scripts\activate
+   ```
+
+2. Install the required packages:
+   ```bash
+   pip install -r requirements.txt
+   ```
+
+3. Start the Azure Functions runtime:
+   ```bash
+   func start
+   ```
+
+   You should see output indicating the functions have loaded successfully.
+
+4. Start a long-running operation by sending a POST request:
+   ```bash
+   # Start operation with default parameters
+   curl -X POST http://localhost:7071/api/async_http_api \
+     -H "Content-Type: application/json" \
+     -d '{}'
+
+   # Start operation with custom parameters
+   curl -X POST http://localhost:7071/api/async_http_api \
+     -H "Content-Type: application/json" \
+     -d '{"operation_type": "data_processing", "duration": 45, "data": {"input": "sample"}}'
+   ```
+
+5. Poll for status using the URLs from the initial response:
+   ```bash
+   # Check status using the statusQueryGetUri from the response
+   curl http://localhost:7071/api/status/{instanceId}
+   ```
+
+## Understanding the Output
+
+When you run the sample, you'll see the following behavior:
+
+1. **Initial HTTP Response**: The operation starts immediately and returns HTTP 202 (Accepted) with management URLs:
+   ```json
+   {
+     "id": "abcd1234",
+     "statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abcd1234",
+     "sendEventPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abcd1234/raiseEvent/{eventName}",
+     "terminatePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abcd1234/terminate?reason={text}",
+     "purgeHistoryDeleteUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abcd1234"
+   }
+   ```
+
+2. **Status Polling**: While the long-running task executes, status checks will show:
+   ```json
+   {
+     "runtimeStatus": "Running",
+     "input": {"operation_type": "data_processing", "duration": 30},
+     "output": null,
+     "createdTime": "2023-12-01T10:00:00Z",
+     "lastUpdatedTime": "2023-12-01T10:00:15Z"
+   }
+   ```
+
+3. 
**Completion**: Once the operation finishes (after the specified duration), the status will show: + ```json + { + "runtimeStatus": "Completed", + "input": {"operation_type": "data_processing", "duration": 30}, + "output": { + "task": "data_processing", + "result": "Success", + "duration": 30, + "timestamp": "2023-12-01T10:00:45Z" + } + } + ``` + +4. **Key Benefits**: This pattern allows clients to: + - Get immediate confirmation that the request was accepted + - Avoid HTTP timeouts on long-running operations + - Check progress at their own pace + - Handle other tasks while waiting for completion + +## Dashboard Review + +You can monitor the orchestration execution through the Durable Task Scheduler dashboard: + +1. Navigate to `http://localhost:8082` in your browser +2. You'll see a list of task hubs - select the "default" hub +3. Click on your orchestration instance to see: + - Real-time status updates as the long-running task progresses + - Timeline showing when the operation started and how long it's been running + - Input parameters and current execution state + - The async HTTP API pattern in action with clear start/processing/completion phases + +The dashboard is particularly valuable for this pattern because it demonstrates how the orchestration continues running in the background while clients can poll for updates through the HTTP API. + +## Learn More + +- [Async HTTP APIs Pattern in Durable Functions](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-http-features) +- [Durable Task Scheduler Overview](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler) +- [Durable Functions Python Developer Guide](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-python) \ No newline at end of file diff --git a/samples/durable-functions/python/async-http-api/function_app.py b/samples/durable-functions/python/async-http-api/function_app.py new file mode 100644 index 0000000..a9a8b27 --- /dev/null +++ b/samples/durable-functions/python/async-http-api/function_app.py @@ -0,0 +1,179 @@ +import logging +import json +import time +import uuid +import azure.functions as func +import azure.durable_functions as df + +# Create the Durable Functions app with HTTP auth level set to ANONYMOUS for easier testing +app = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS) + +@app.route(route="async_http_api", methods=["POST"]) +@app.durable_client_input(client_name="client") +async def http_start_async_operation(req: func.HttpRequest, client): + """HTTP trigger that starts a long-running asynchronous operation.""" + try: + # Get input from request body or use default + req_body = req.get_json() + operation_data = { + "operation_type": req_body.get("operation_type", "data_processing") if req_body else "data_processing", + "duration": req_body.get("duration", 30) if req_body else 30, # seconds + "data": req_body.get("data", {"sample": "data"}) if req_body else {"sample": "data"} + } + + logging.info(f"Starting async operation: {operation_data}") + + # Start the orchestration + instance_id = await client.start_new("async_http_api_orchestrator", client_input=operation_data) + + # Return management URLs for the orchestration + return client.create_check_status_response(req, instance_id) + + except Exception as e: + logging.error(f"Error starting orchestration: {str(e)}") + return func.HttpResponse( + json.dumps({"error": f"Failed to start orchestration: {str(e)}"}), + status_code=500, + mimetype="application/json" + ) 
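+
+# Illustrative client flow (not part of this app): a caller saves the URLs from
+# create_check_status_response and polls until the work finishes. Sketch,
+# assuming the default local port and the `requests` package:
+#
+#   import time, requests
+#   resp = requests.post("http://localhost:7071/api/async_http_api", json={})
+#   status_url = resp.json()["statusQueryGetUri"]
+#   while requests.get(status_url).json()["runtimeStatus"] in ("Pending", "Running"):
+#       time.sleep(2)  # poll at the client's own pace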
+
+@app.orchestration_trigger(context_name="context")
+def async_http_api_orchestrator(context: df.DurableOrchestrationContext):
+    """
+    Orchestrator that demonstrates the async HTTP API pattern.
+    Starts a long-running operation and returns its result.
+    """
+    operation_data = context.get_input()
+    logging.info(f"Async HTTP API orchestration started: {operation_data}")
+
+    # Start the long-running operation. Use the instance ID as the operation ID:
+    # orchestrator code is replayed, so it must be deterministic (uuid.uuid4()
+    # here would produce a different value on every replay).
+    operation_id = context.instance_id
+    operation_request = {
+        **operation_data,
+        "operation_id": operation_id,
+        "started_at": context.current_utc_datetime.isoformat()
+    }
+
+    # Call the long-running activity
+    result = yield context.call_activity("long_running_operation", operation_request)
+
+    logging.info(f"Async HTTP API orchestration completed: {result}")
+    return result
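+
+# Note on determinism: the orchestrator above reads time from
+# context.current_utc_datetime and derives the operation ID from the instance
+# ID, because orchestrator code is replayed and must make the same decisions
+# every time. Activities such as the one below run once per invocation, so
+# they are free to sleep, read the wall clock, and generate random values.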
+
+@app.activity_trigger(input_name="operationRequest")
+def long_running_operation(operationRequest: dict) -> dict:
+    """Simulate a long-running operation with several processing phases."""
+    logging.info(f"Starting long-running operation: {operationRequest['operation_id']}")
+
+    operation_type = operationRequest["operation_type"]
+    duration = operationRequest["duration"]
+    data = operationRequest["data"]
+
+    # Simulate processing phases
+    phases = ["initializing", "processing", "validating", "finalizing"]
+    phase_duration = duration / len(phases)
+
+    start_time = time.time()
+
+    for i, phase in enumerate(phases):
+        logging.info(f"Operation {operationRequest['operation_id']} - Phase: {phase}")
+
+        # Simulate work being done in this phase
+        time.sleep(phase_duration)
+
+        progress = ((i + 1) / len(phases)) * 100
+        logging.info(f"Operation {operationRequest['operation_id']} - Progress: {progress}%")
+
+    end_time = time.time()
+    actual_duration = end_time - start_time
+
+    result = {
+        "operation_id": operationRequest["operation_id"],
+        "operation_type": operation_type,
+        "status": "completed",
+        "started_at": operationRequest["started_at"],
+        # time.strftime does not support %f; format whole seconds in UTC instead
+        "completed_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
+        "duration_seconds": round(actual_duration, 2),
+        "input_data": data,
+        "output_data": {
+            "processed": True,
+            "result_id": str(uuid.uuid4()),
+            "phases_completed": phases,
+            "total_items_processed": 1000,
+            "success_rate": 98.5
+        }
+    }
+
+    logging.info(f"Completed long-running operation: {operationRequest['operation_id']}")
+    return result
+
+@app.route(route="status/{instanceId}", methods=["GET"])
+@app.durable_client_input(client_name="client")
+async def get_orchestration_status(req: func.HttpRequest, client):
+    """Get the status of a running orchestration with detailed progress information."""
+    instance_id = req.route_params.get('instanceId')
+
+    try:
+        status = await client.get_status(instance_id)
+
+        if status:
+            # runtime_status is an enum in the Python SDK; use its name so the
+            # JSON shows "Running" rather than "OrchestrationRuntimeStatus.Running"
+            runtime_status = status.runtime_status.name if status.runtime_status else None
+
+            # Enhanced status response for async operations
+            response_data = {
+                "instanceId": status.instance_id,
+                "name": status.name,
+                "runtimeStatus": runtime_status,
+                "input": status.input_,
+                "output": status.output,
+                "createdTime": status.created_time.isoformat() if status.created_time else None,
+                "lastUpdatedTime": status.last_updated_time.isoformat() if status.last_updated_time else None,
+                "customStatus": status.custom_status
+            }
+
+            # Add operation-specific status information
+            if runtime_status == "Running":
+                response_data["message"] = "Operation is currently in progress. Check back later for results."
+            elif runtime_status == "Completed":
+                response_data["message"] = "Operation completed successfully."
+            elif runtime_status == "Failed":
+                response_data["message"] = "Operation failed. Check the output for error details."
+
+            return func.HttpResponse(
+                json.dumps(response_data, default=str),
+                mimetype="application/json"
+            )
+        else:
+            return func.HttpResponse(
+                json.dumps({"error": "Orchestration not found"}),
+                status_code=404,
+                mimetype="application/json"
+            )
+
+    except Exception as e:
+        logging.error(f"Error getting status: {str(e)}")
+        return func.HttpResponse(
+            json.dumps({"error": f"Failed to get status: {str(e)}"}),
+            status_code=500,
+            mimetype="application/json"
+        )
+
+@app.route(route="cancel/{instanceId}", methods=["POST"])
+@app.durable_client_input(client_name="client")
+async def cancel_operation(req: func.HttpRequest, client):
+    """Cancel a running orchestration."""
+    instance_id = req.route_params.get('instanceId')
+
+    try:
+        await client.terminate(instance_id, "Operation cancelled by user request")
+
+        return func.HttpResponse(
+            json.dumps({"message": f"Operation {instance_id} has been cancelled"}),
+            mimetype="application/json"
+        )
+
+    except Exception as e:
+        logging.error(f"Error cancelling operation: {str(e)}")
+        return func.HttpResponse(
+            json.dumps({"error": f"Failed to cancel operation: {str(e)}"}),
+            status_code=500,
+            mimetype="application/json"
+        )
\ No newline at end of file
diff --git a/samples/durable-functions/python/async-http-api/host.json b/samples/durable-functions/python/async-http-api/host.json
new file mode 100644
index 0000000..ea59c5d
--- /dev/null
+++ b/samples/durable-functions/python/async-http-api/host.json
@@ -0,0 +1,24 @@
+{
+  "version": "2.0",
+  "logging": {
+    "applicationInsights": {
+      "samplingSettings": {
+        "isEnabled": true,
+        "excludedTypes": "Request,Error"
+      }
+    }
+  },
+  "extensions": {
+    "durableTask": {
+      "storageProvider": {
+        "type": "azureManaged",
+        "connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING"
+      },
+      "hubName": "%TASKHUB_NAME%"
+    }
+  },
+  "extensionBundle": {
+    "id": "Microsoft.Azure.Functions.ExtensionBundle.Preview",
+    "version": "[4.34.0, 5.0.0)"
+  }
+}
\ No newline at end of file
diff --git a/samples/durable-functions/python/async-http-api/requirements.txt b/samples/durable-functions/python/async-http-api/requirements.txt
new file mode 100644
index 0000000..94cf613
--- /dev/null
+++ b/samples/durable-functions/python/async-http-api/requirements.txt
@@ -0,0 +1,2 @@
+azure-functions
+azure-functions-durable
\ No newline at end of file
diff --git a/samples/durable-functions/python/eternal-orchestrations/README.md b/samples/durable-functions/python/eternal-orchestrations/README.md
new file mode 100644
index 0000000..110a1de
--- /dev/null
+++ b/samples/durable-functions/python/eternal-orchestrations/README.md
@@ -0,0 +1,218 @@
+# Eternal Orchestrations Pattern
+
+## Description of the Sample
+
+This sample demonstrates the eternal orchestrations pattern with Azure Durable Functions in Python. This pattern is designed for creating long-running, recurring workflows that can continue indefinitely until explicitly stopped, making it ideal for periodic tasks, monitoring jobs, and scheduled processes.
+
+In this sample:
+1. **Initialization**: The orchestration starts with specified parameters (task type, interval, max iterations)
+2. **Work Execution**: The `execute_periodic_task` activity executes the recurring task
+3. **Timer Wait**: Uses a durable timer to wait for the next execution interval without consuming resources
+4. 
**Self-Continuation**: The orchestration restarts itself with `continue_as_new()` to reset the execution history and prevent memory buildup
+5. **Graceful Termination**: Can be stopped via external events or after reaching maximum iterations
+
+This pattern is useful for:
+- Periodic data synchronization or ETL jobs
+- Health monitoring and system status checks
+- Regular cleanup or maintenance tasks
+- Scheduled reporting or notifications
+- Long-running processes that need to execute at regular intervals
+
+## Prerequisites
+
+1. [Python 3.8+](https://www.python.org/downloads/)
+2. [Azure Functions Core Tools](https://docs.microsoft.com/en-us/azure/azure-functions/functions-run-local) v4.x
+3. [Docker](https://www.docker.com/products/docker-desktop/) installed (for running the Durable Task Scheduler emulator)
+
+## Configuring Durable Task Scheduler
+
+There are two ways to run this sample locally:
+
+### Using the Emulator (Recommended)
+
+The emulator simulates a scheduler and taskhub in a Docker container, making it ideal for development and learning.
+
+1. Pull the Docker image for the emulator:
+   ```bash
+   docker pull mcr.microsoft.com/dts/dts-emulator:latest
+   ```
+
+1. Run the emulator:
+   ```bash
+   docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
+   ```
+
+   Wait a few seconds for the container to be ready.
+
+Note: The example code automatically uses the default emulator settings (endpoint: http://localhost:8080, taskhub: default). You don't need to set any environment variables.
+
+### Using a Deployed Scheduler and Taskhub in Azure
+
+Local development with a deployed scheduler:
+
+1. Install the durable task scheduler CLI extension:
+
+   ```bash
+   az upgrade
+   az extension add --name durabletask --allow-preview true
+   ```
+
+1. Create a resource group in a region where the Durable Task Scheduler is available. First, list the supported regions:
+
+   ```bash
+   az provider show --namespace Microsoft.DurableTask --query "resourceTypes[?resourceType=='schedulers'].locations | [0]" --out table
+   ```
+
+   Then create the resource group in one of those regions:
+
+   ```bash
+   az group create --name my-resource-group --location <location>
+   ```
+
+1. Create a durable task scheduler resource:
+
+   ```bash
+   az durabletask scheduler create \
+     --resource-group my-resource-group \
+     --name my-scheduler \
+     --ip-allowlist '["0.0.0.0/0"]' \
+     --sku-name "Dedicated" \
+     --sku-capacity 1 \
+     --tags "{'myattribute':'myvalue'}"
+   ```
+
+1. Create a task hub within the scheduler resource:
+
+   ```bash
+   az durabletask taskhub create \
+     --resource-group my-resource-group \
+     --scheduler-name my-scheduler \
+     --name "my-taskhub"
+   ```
+
+1. Grant the current user permission to connect to the `my-taskhub` task hub:
+
+   ```bash
+   subscriptionId=$(az account show --query "id" -o tsv)
+   loggedInUser=$(az account show --query "user.name" -o tsv)
+
+   az role assignment create \
+     --assignee $loggedInUser \
+     --role "Durable Task Data Contributor" \
+     --scope "/subscriptions/$subscriptionId/resourceGroups/my-resource-group/providers/Microsoft.DurableTask/schedulers/my-scheduler/taskHubs/my-taskhub"
+   ```
+
+## How to Run the Sample
+
+Once you have set up the Durable Task Scheduler, follow these steps to run the sample:
+
+1. First, activate your Python virtual environment (if you're using one):
+   ```bash
+   python -m venv venv
+   source venv/bin/activate  # On Windows, use: venv\Scripts\activate
+   ```
+
+2. Install the required packages:
+   ```bash
+   pip install -r requirements.txt
+   ```
+
+3. 
Start the Azure Functions runtime: + ```bash + func start + ``` + + You should see output indicating the functions have loaded successfully. + +4. Start an eternal orchestration by sending a POST request: + ```bash + # Start with default parameters (2 minute intervals, 5 iterations max) + curl -X POST http://localhost:7071/api/eternal_orchestration \ + -H "Content-Type: application/json" \ + -d '{}' + + # Start with custom parameters + curl -X POST http://localhost:7071/api/eternal_orchestration \ + -H "Content-Type: application/json" \ + -d '{"task_type": "data_sync", "interval_minutes": 1, "max_iterations": 10, "target_url": "https://httpbin.org/delay/1"}' + ``` + +5. Monitor the running orchestration: + ```bash + curl http://localhost:7071/api/status/{instanceId} + ``` + +6. Stop the eternal orchestration when needed: + ```bash + curl -X POST http://localhost:7071/api/stop/{instanceId} + ``` + +## Understanding the Output + +When you run the sample, you'll see the following behavior: + +1. **Initial Response**: The HTTP trigger returns management URLs immediately: + ```json + { + "id": "abcd1234", + "statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abcd1234", + "sendEventPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abcd1234/raiseEvent/{eventName}", + "terminatePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abcd1234/terminate?reason={text}", + "purgeHistoryDeleteUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abcd1234" + } + ``` + +2. **Running Status**: While the eternal orchestration is active, status checks will show: + ```json + { + "runtimeStatus": "Running", + "input": {"task_type": "health_check", "interval_minutes": 2, "max_iterations": 5}, + "customStatus": { + "current_iteration": 3, + "max_iterations": 5, + "last_run": "2025-09-19T16:45:30Z", + "task_type": "health_check" + } + } + ``` + +3. **Completed Status**: When reaching maximum iterations or being stopped: + ```json + { + "runtimeStatus": "Completed", + "output": { + "status": "completed", + "total_iterations": 5, + "final_result": { + "iteration": 5, + "task_type": "health_check", + "success": true, + "executed_at": "2025-09-19T16:45:30Z" + } + } + } + ``` + +4. **Key Characteristics**: + - **Resource Efficient**: Uses durable timers instead of blocking waits between iterations + - **Self-Continuation**: Calls `continue_as_new()` to reset execution history and prevent memory buildup + - **Graceful Shutdown**: Responds to external stop events or completion conditions + - **Long-Term Stability**: Designed to run for extended periods (days, weeks, or months) + - **Progress Tracking**: Maintains iteration counts and execution history via custom status + +## Dashboard Review + +You can monitor the orchestration execution through the Durable Task Scheduler dashboard: + +1. Navigate to `http://localhost:8082` in your browser +2. You'll see a list of task hubs - select the "default" hub +3. 
Click on your orchestration instance to see: + - The eternal orchestration running continuously with regular activity executions + - Durable timers showing scheduled next execution times between iterations + - Custom status updates tracking current iteration count and progress + - Timeline showing the recurring pattern of work activity followed by timer wait + - How `continue_as_new()` resets the execution history while maintaining the iteration loop + +The dashboard is particularly valuable for this pattern because it demonstrates how eternal orchestrations can run indefinitely while managing resources efficiently through self-continuation and durable timers. + +## Learn More + +- [Eternal Orchestrations Pattern in Durable Functions](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-eternal-orchestrations) +- [Durable Task Scheduler Overview](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler) +- [Durable Functions Python Developer Guide](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-python) \ No newline at end of file diff --git a/samples/durable-functions/python/eternal-orchestrations/function_app.py b/samples/durable-functions/python/eternal-orchestrations/function_app.py new file mode 100644 index 0000000..8838859 --- /dev/null +++ b/samples/durable-functions/python/eternal-orchestrations/function_app.py @@ -0,0 +1,238 @@ +import logging +import json +from datetime import datetime, timedelta +import azure.functions as func +import azure.durable_functions as df + +# Create the Durable Functions app with HTTP auth level set to ANONYMOUS for easier testing +app = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS) + +@app.route(route="eternal_orchestration", methods=["POST"]) +@app.durable_client_input(client_name="client") +async def http_start_eternal_orchestration(req: func.HttpRequest, client): + """HTTP trigger that starts an eternal orchestration.""" + try: + # Get input from request body or use default + req_body = req.get_json() + config = { + "task_type": req_body.get("task_type", "health_check") if req_body else "health_check", + "interval_minutes": req_body.get("interval_minutes", 2) if req_body else 2, # Short interval for demo + "max_iterations": req_body.get("max_iterations", 5) if req_body else 5, # Limit for demo + "target_url": req_body.get("target_url", "https://httpbin.org/status/200") if req_body else "https://httpbin.org/status/200" + } + + logging.info(f"Starting eternal orchestration: {config}") + + # Start the orchestration + instance_id = await client.start_new("eternal_orchestrator", client_input=config) + + # Return management URLs for the orchestration + return client.create_check_status_response(req, instance_id) + + except Exception as e: + logging.error(f"Error starting orchestration: {str(e)}") + return func.HttpResponse( + json.dumps({"error": f"Failed to start orchestration: {str(e)}"}), + status_code=500, + mimetype="application/json" + ) + +@app.orchestration_trigger(context_name="context") +def eternal_orchestrator(context: df.DurableOrchestrationContext): + """ + Orchestrator that demonstrates the eternal orchestration pattern. + Uses continue_as_new for proper eternal orchestration behavior. 
+    """
+    config = context.get_input()
+    current_iteration = config.get("current_iteration", 1)
+    max_iterations = config.get("max_iterations", 5)
+
+    logging.info(f"Eternal orchestration iteration {current_iteration}")
+
+    # Update custom status
+    context.set_custom_status({
+        "current_iteration": current_iteration,
+        "max_iterations": max_iterations,
+        "last_run": context.current_utc_datetime.isoformat(),
+        "task_type": config["task_type"],
+        "status": "executing_task"
+    })
+
+    # Execute the periodic task
+    task_result = yield context.call_activity("execute_periodic_task", {
+        **config,
+        "iteration": current_iteration
+    })
+
+    # Check if we should continue
+    if current_iteration >= max_iterations:
+        logging.info(f"Eternal orchestration completed after {current_iteration} iterations")
+        return {
+            "status": "completed",
+            "total_iterations": current_iteration,
+            "final_result": task_result
+        }
+
+    # Update status to waiting
+    context.set_custom_status({
+        "current_iteration": current_iteration,
+        "max_iterations": max_iterations,
+        "last_run": context.current_utc_datetime.isoformat(),
+        "task_type": config["task_type"],
+        "status": "waiting_for_next_iteration"
+    })
+
+    # Wait for the next interval with a durable timer. The orchestration is
+    # unloaded while the timer is pending, so no worker is blocked (unlike a
+    # time.sleep inside an activity function).
+    next_run = context.current_utc_datetime + timedelta(minutes=config.get("interval_minutes", 2))
+    yield context.create_timer(next_run)
+
+    # Continue with next iteration
+    next_config = config.copy()
+    next_config["current_iteration"] = current_iteration + 1
+    context.continue_as_new(next_config)
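+
+# Note: continue_as_new() schedules a brand-new run of the orchestrator with
+# the supplied input and discards the accumulated history. Without it, an
+# "eternal" loop would grow its replay history on every iteration and slow
+# down over time.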
next iteration") + + start_time = time.time() + time.sleep(wait_seconds) + actual_wait = time.time() - start_time + + result = { + "requested_wait_seconds": wait_seconds, + "actual_wait_seconds": round(actual_wait, 2), + "completed_at": datetime.utcnow().isoformat() + } + + logging.info(f"Wait completed: {result}") + return result + +@app.route(route="stop/{instanceId}", methods=["POST"]) +@app.durable_client_input(client_name="client") +async def stop_eternal_orchestration(req: func.HttpRequest, client): + """Stop an eternal orchestration.""" + instance_id = req.route_params.get('instanceId') + + try: + await client.terminate(instance_id, "Stopped by user request") + + return func.HttpResponse( + json.dumps({"message": f"Eternal orchestration {instance_id} has been stopped"}), + mimetype="application/json" + ) + + except Exception as e: + logging.error(f"Error stopping orchestration: {str(e)}") + return func.HttpResponse( + json.dumps({"error": f"Failed to stop orchestration: {str(e)}"}), + status_code=500, + mimetype="application/json" + ) + +@app.route(route="status/{instanceId}", methods=["GET"]) +@app.durable_client_input(client_name="client") +async def get_orchestration_status(req: func.HttpRequest, client): + """Get the status of a running orchestration.""" + instance_id = req.route_params.get('instanceId') + + try: + status = await client.get_status(instance_id) + + if status: + return func.HttpResponse( + json.dumps({ + "instanceId": status.instance_id, + "name": status.name, + "runtimeStatus": status.runtime_status, + "input": status.input_, + "output": status.output, + "customStatus": status.custom_status, + "createdTime": status.created_time.isoformat() if status.created_time else None, + "lastUpdatedTime": status.last_updated_time.isoformat() if status.last_updated_time else None + }, default=str), + mimetype="application/json" + ) + else: + return func.HttpResponse( + json.dumps({"error": "Orchestration not found"}), + status_code=404, + mimetype="application/json" + ) + + except Exception as e: + logging.error(f"Error getting status: {str(e)}") + return func.HttpResponse( + json.dumps({"error": f"Failed to get status: {str(e)}"}), + status_code=500, + mimetype="application/json" + ) \ No newline at end of file diff --git a/samples/durable-functions/python/eternal-orchestrations/host.json b/samples/durable-functions/python/eternal-orchestrations/host.json new file mode 100644 index 0000000..ea59c5d --- /dev/null +++ b/samples/durable-functions/python/eternal-orchestrations/host.json @@ -0,0 +1,24 @@ +{ + "version": "2.0", + "logging": { + "applicationInsights": { + "samplingSettings": { + "isEnabled": true, + "excludedTypes": "Request,Error" + } + } + }, + "extensions": { + "durableTask": { + "storageProvider": { + "type": "azureManaged", + "connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING" + }, + "hubName": "%TASKHUB_NAME%" + } + }, + "extensionBundle": { + "id": "Microsoft.Azure.Functions.ExtensionBundle.Preview", + "version": "[4.34.0, 5.0.0)" + } +} \ No newline at end of file diff --git a/samples/durable-functions/python/fan-out-fan-in/README.md b/samples/durable-functions/python/fan-out-fan-in/README.md new file mode 100644 index 0000000..3205ae9 --- /dev/null +++ b/samples/durable-functions/python/fan-out-fan-in/README.md @@ -0,0 +1,191 @@ +# Fan-Out/Fan-In Pattern + +## Description of the Sample + +This sample demonstrates the fan-out/fan-in pattern with Azure Durable Functions in Python. 
The pattern executes multiple activities in parallel and then aggregates their results once all of them complete.
+
+In this sample:
+1. **Fan-Out**: The orchestrator starts multiple `process_work_item` activities in parallel, one for each work item
+2. **Parallel Processing**: Each activity processes its work item independently and concurrently
+3. **Fan-In**: The orchestrator waits for all activities to complete and collects their results
+4. **Aggregation**: An `aggregate_results` activity combines all results into a summary report
+
+This pattern is useful for:
+- Processing large datasets by breaking them into chunks
+- Performing parallel calculations or transformations
+- Distributing workload across multiple workers for better performance
+- Scenarios where independent tasks can be executed simultaneously
+
+## Prerequisites
+
+1. [Python 3.8+](https://www.python.org/downloads/)
+2. [Azure Functions Core Tools](https://docs.microsoft.com/en-us/azure/azure-functions/functions-run-local) v4.x
+3. [Docker](https://www.docker.com/products/docker-desktop/) installed (for running the Durable Task Scheduler emulator)
+
+## Configuring Durable Task Scheduler
+
+There are two ways to run this sample locally:
+
+### Using the Emulator (Recommended)
+
+The emulator simulates a scheduler and taskhub in a Docker container, making it ideal for development and learning.
+
+1. Pull the Docker image for the emulator:
+   ```bash
+   docker pull mcr.microsoft.com/dts/dts-emulator:latest
+   ```
+
+1. Run the emulator:
+   ```bash
+   docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest
+   ```
+
+   Wait a few seconds for the container to be ready.
+
+Note: The example code automatically uses the default emulator settings (endpoint: http://localhost:8080, taskhub: default). You don't need to set any environment variables.
+
+### Using a Deployed Scheduler and Taskhub in Azure
+
+Local development with a deployed scheduler:
+
+1. Install the durable task scheduler CLI extension:
+
+   ```bash
+   az upgrade
+   az extension add --name durabletask --allow-preview true
+   ```
+
+1. Create a resource group in a region where the Durable Task Scheduler is available. First, list the supported regions:
+
+   ```bash
+   az provider show --namespace Microsoft.DurableTask --query "resourceTypes[?resourceType=='schedulers'].locations | [0]" --out table
+   ```
+
+   Then create the resource group in one of those regions:
+
+   ```bash
+   az group create --name my-resource-group --location <location>
+   ```
+
+1. Create a durable task scheduler resource:
+
+   ```bash
+   az durabletask scheduler create \
+     --resource-group my-resource-group \
+     --name my-scheduler \
+     --ip-allowlist '["0.0.0.0/0"]' \
+     --sku-name "Dedicated" \
+     --sku-capacity 1 \
+     --tags "{'myattribute':'myvalue'}"
+   ```
+
+1. Create a task hub within the scheduler resource:
+
+   ```bash
+   az durabletask taskhub create \
+     --resource-group my-resource-group \
+     --scheduler-name my-scheduler \
+     --name "my-taskhub"
+   ```
+
+1. Grant the current user permission to connect to the `my-taskhub` task hub:
+
+   ```bash
+   subscriptionId=$(az account show --query "id" -o tsv)
+   loggedInUser=$(az account show --query "user.name" -o tsv)
+
+   az role assignment create \
+     --assignee $loggedInUser \
+     --role "Durable Task Data Contributor" \
+     --scope "/subscriptions/$subscriptionId/resourceGroups/my-resource-group/providers/Microsoft.DurableTask/schedulers/my-scheduler/taskHubs/my-taskhub"
+   ```
+
+## How to Run the Sample
+
+Once you have set up the Durable Task Scheduler, follow these steps to run the sample:
+
+1. 
First, activate your Python virtual environment (if you're using one): + ```bash + python -m venv venv + source venv/bin/activate # On Windows, use: venv\Scripts\activate + ``` + +2. Install the required packages: + ```bash + pip install -r requirements.txt + ``` + +3. Start the Azure Functions runtime: + ```bash + func start + ``` + + You should see output indicating the functions have loaded successfully. + +4. Test the orchestration by sending a POST request: + ```bash + # Start orchestration with default work items + curl -X POST http://localhost:7071/api/fan_out_fan_in \ + -H "Content-Type: application/json" \ + -d '{}' + + # Start orchestration with custom work items + curl -X POST http://localhost:7071/api/fan_out_fan_in \ + -H "Content-Type: application/json" \ + -d '{"workItems": ["Task1", "Task2", "Task3", "Task4", "Task5", "Task6"]}' + ``` + +5. Check orchestration status using the `statusQueryGetUri` from the response: + ```bash + curl http://localhost:7071/api/status/{instanceId} + ``` + +## Understanding the Output + +When you run the sample, you'll see the following behavior: + +1. **Initial Response**: The HTTP trigger returns a JSON response with management URLs: + ```json + { + "id": "abcd1234", + "statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abcd1234", + "sendEventPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abcd1234/raiseEvent/{eventName}", + "terminatePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abcd1234/terminate?reason={text}", + "purgeHistoryDeleteUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abcd1234" + } + ``` + +2. **Parallel Processing**: The orchestrator starts multiple `process_work_item` activities simultaneously. Each activity: + - Processes its work item independently + - Simulates work with random processing times (0.5-2.0 seconds) + - Returns processing metrics and a random value + +3. **Aggregated Results**: For input `{"workItems": ["Task1", "Task2", "Task3"]}`, the final output will be: + ```json + { + "total_items_processed": 3, + "total_value": 165, + "average_value": 55.0, + "total_processing_time": 3.47, + "processed_items": ["Processed_Task1", "Processed_Task2", "Processed_Task3"], + "success": true + } + ``` + +4. **Performance Benefits**: Because activities run in parallel rather than sequentially, the total processing time is much shorter than the sum of individual processing times. + +## Dashboard Review + +You can monitor the orchestration execution through the Durable Task Scheduler dashboard: + +1. Navigate to `http://localhost:8082` in your browser +2. You'll see a list of task hubs - select the "default" hub +3. Click on your orchestration instance to see: + - Parallel activity execution timeline showing concurrent processing + - Input and output data for each `process_work_item` activity + - Performance metrics demonstrating the fan-out/fan-in pattern + - Visual representation of how activities start simultaneously and complete at different times + +The dashboard is particularly useful for this sample because it clearly shows how the fan-out/fan-in pattern executes multiple activities concurrently, leading to significant performance improvements compared to sequential processing. 
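+
+The concurrency you see in the dashboard timeline comes down to two steps in the orchestrator. The following condensed sketch of the core of `function_app.py` makes the fan-out and fan-in explicit:
+
+```python
+# Fan-out: schedule one activity per work item; nothing is awaited yet,
+# so the activities run concurrently.
+parallel_tasks = [context.call_activity("process_work_item", item) for item in work_items]
+
+# Fan-in: task_all resumes the orchestrator only once every task has
+# completed, returning results in the order the tasks were scheduled.
+results = yield context.task_all(parallel_tasks)
+```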
+ +## Learn More + +- [Fan-Out/Fan-In Pattern in Durable Functions](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-cloud-backup) +- [Durable Task Scheduler Overview](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler) +- [Durable Functions Python Developer Guide](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-python) \ No newline at end of file diff --git a/samples/durable-functions/python/fan-out-fan-in/function_app.py b/samples/durable-functions/python/fan-out-fan-in/function_app.py new file mode 100644 index 0000000..126ff04 --- /dev/null +++ b/samples/durable-functions/python/fan-out-fan-in/function_app.py @@ -0,0 +1,139 @@ +import logging +import json +import asyncio +import azure.functions as func +import azure.durable_functions as df + +# Create the Durable Functions app with HTTP auth level set to ANONYMOUS for easier testing +app = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS) + +@app.route(route="fan_out_fan_in", methods=["POST"]) +@app.durable_client_input(client_name="client") +async def http_start_fan_out_fan_in(req: func.HttpRequest, client): + """HTTP trigger that starts the fan-out/fan-in orchestration.""" + try: + # Get input from request body or use default + req_body = req.get_json() + work_items = req_body.get("workItems", ["Item1", "Item2", "Item3", "Item4", "Item5"]) if req_body else ["Item1", "Item2", "Item3", "Item4", "Item5"] + + logging.info(f"Starting fan-out/fan-in orchestration with {len(work_items)} work items") + + # Start the orchestration + instance_id = await client.start_new("fan_out_fan_in_orchestrator", client_input=work_items) + + # Return management URLs for the orchestration + return client.create_check_status_response(req, instance_id) + + except Exception as e: + logging.error(f"Error starting orchestration: {str(e)}") + return func.HttpResponse( + json.dumps({"error": f"Failed to start orchestration: {str(e)}"}), + status_code=500, + mimetype="application/json" + ) + +@app.orchestration_trigger(context_name="context") +def fan_out_fan_in_orchestrator(context: df.DurableOrchestrationContext): + """ + Orchestrator that demonstrates the fan-out/fan-in pattern. + Processes multiple work items in parallel and aggregates the results. 
+    """
+    work_items = context.get_input()
+    logging.info(f"Fan-out/fan-in orchestration started with {len(work_items)} work items")
+
+    # Fan-out: start all work items in parallel (tasks are scheduled, not awaited one by one)
+    parallel_tasks = []
+    for item in work_items:
+        task = context.call_activity("process_work_item", item)
+        parallel_tasks.append(task)
+
+    # Fan-in: wait for all tasks to complete and collect results
+    results = yield context.task_all(parallel_tasks)
+
+    # Aggregate the results
+    aggregated_result = yield context.call_activity("aggregate_results", results)
+
+    logging.info("Fan-out/fan-in orchestration completed with aggregated result")
+    return aggregated_result
+
+@app.activity_trigger(input_name="workItem")
+def process_work_item(workItem: str) -> dict:
+    """Process a single work item."""
+    import time
+    import random
+
+    logging.info(f"Processing work item: {workItem}")
+
+    # Simulate processing time
+    processing_time = random.uniform(0.5, 2.0)
+    time.sleep(processing_time)
+
+    # Simulate different types of processing results
+    result = {
+        "item": workItem,
+        "processed": True,
+        "processing_time": round(processing_time, 2),
+        "result": f"Processed_{workItem}",
+        "value": random.randint(10, 100)
+    }
+
+    logging.info(f"Completed processing work item: {workItem}")
+    return result
+
+@app.activity_trigger(input_name="results")
+def aggregate_results(results: list) -> dict:
+    """Aggregate the results from all parallel processing tasks."""
+    logging.info(f"Aggregating {len(results)} results")
+
+    total_value = sum(result["value"] for result in results)
+    total_processing_time = sum(result["processing_time"] for result in results)
+    processed_items = [result["result"] for result in results]
+
+    aggregated = {
+        "total_items_processed": len(results),
+        "total_value": total_value,
+        "average_value": round(total_value / len(results), 2),
+        "total_processing_time": round(total_processing_time, 2),
+        "processed_items": processed_items,
+        "success": True
+    }
+
+    logging.info(f"Aggregation completed: {aggregated}")
+    return aggregated
+
+@app.route(route="status/{instanceId}", methods=["GET"])
+@app.durable_client_input(client_name="client")
+async def get_orchestration_status(req: func.HttpRequest, client):
+    """Get the status of a running orchestration."""
+    instance_id = req.route_params.get('instanceId')
+
+    try:
+        status = await client.get_status(instance_id)
+
+        if status:
+            return func.HttpResponse(
+                json.dumps({
+                    "instanceId": status.instance_id,
+                    "name": status.name,
+                    # use the enum's name so the JSON shows e.g. "Running"
+                    "runtimeStatus": status.runtime_status.name if status.runtime_status else None,
+                    "input": status.input_,
+                    "output": status.output,
+                    "createdTime": status.created_time.isoformat() if status.created_time else None,
+                    "lastUpdatedTime": status.last_updated_time.isoformat() if status.last_updated_time else None
+                }, default=str),
+                mimetype="application/json"
+            )
+        else:
+            return func.HttpResponse(
+                json.dumps({"error": "Orchestration not found"}),
+                status_code=404,
+                mimetype="application/json"
+            )
+
+    except Exception as e:
+        logging.error(f"Error getting status: {str(e)}")
+        return func.HttpResponse(
+            json.dumps({"error": f"Failed to get status: {str(e)}"}),
+            status_code=500,
+            mimetype="application/json"
+        )
\ No newline at end of file
"samplingSettings": { + "isEnabled": true, + "excludedTypes": "Request,Error" + } + } + }, + "extensions": { + "durableTask": { + "storageProvider": { + "type": "azureManaged", + "connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING" + }, + "hubName": "%TASKHUB_NAME%" + } + }, + "extensionBundle": { + "id": "Microsoft.Azure.Functions.ExtensionBundle.Preview", + "version": "[4.34.0, 5.0.0)" + } +} \ No newline at end of file diff --git a/samples/durable-functions/python/fan-out-fan-in/requirements.txt b/samples/durable-functions/python/fan-out-fan-in/requirements.txt new file mode 100644 index 0000000..94cf613 --- /dev/null +++ b/samples/durable-functions/python/fan-out-fan-in/requirements.txt @@ -0,0 +1,2 @@ +azure-functions +azure-functions-durable \ No newline at end of file diff --git a/samples/durable-functions/python/function-chaining/README.md b/samples/durable-functions/python/function-chaining/README.md new file mode 100644 index 0000000..6962d90 --- /dev/null +++ b/samples/durable-functions/python/function-chaining/README.md @@ -0,0 +1,181 @@ +# Function Chaining Pattern + +## Description of the Sample + +This sample demonstrates the function chaining pattern with Azure Durable Functions in Python. Function chaining is a fundamental workflow pattern where activities are executed in a sequence, with the output of one activity passed as the input to the next activity. + +In this sample: +1. The orchestrator calls the `say_hello` activity with a name input +2. The result is passed to the `process_greeting` activity +3. That result is passed to the `finalize_response` activity +4. The final result is returned to the client + +This pattern is useful for: +- Creating sequential workflows where steps must execute in order +- Passing data between steps with data transformations at each step +- Building pipelines where each activity adds value to the result + +## Prerequisites + +1. [Python 3.8+](https://www.python.org/downloads/) +2. [Azure Functions Core Tools](https://docs.microsoft.com/en-us/azure/azure-functions/functions-run-local) v4.x +3. [Docker](https://www.docker.com/products/docker-desktop/) (for running the Durable Task Scheduler) installed + +## Configuring Durable Task Scheduler + +There are two ways to run this sample locally: + +### Using the Emulator (Recommended) + +The emulator simulates a scheduler and taskhub in a Docker container, making it ideal for development and learning. + +1. Pull the Docker Image for the Emulator: + ```bash + docker pull mcr.microsoft.com/dts/dts-emulator:latest + ``` + +1. Run the Emulator: + ```bash + docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest + ``` +Wait a few seconds for the container to be ready. + +Note: The example code automatically uses the default emulator settings (endpoint: http://localhost:8080, taskhub: default). You don't need to set any environment variables. + +### Using a Deployed Scheduler and Taskhub in Azure + +Local development with a deployed scheduler: + +1. Install the durable task scheduler CLI extension: + + ```bash + az upgrade + az extension add --name durabletask --allow-preview true + ``` + +1. Create a resource group in a region where the Durable Task Scheduler is available: + + ```bash + az provider show --namespace Microsoft.DurableTask --query "resourceTypes[?resourceType=='schedulers'].locations | [0]" --out table + ``` + + ```bash + az group create --name my-resource-group --location + ``` +1. 
Create a durable task scheduler resource: + + ```bash + az durabletask scheduler create \ + --resource-group my-resource-group \ + --name my-scheduler \ + --ip-allowlist '["0.0.0.0/0"]' \ + --sku-name "Dedicated" \ + --sku-capacity 1 \ + --tags "{'myattribute':'myvalue'}" + ``` + +1. Create a task hub within the scheduler resource: + + ```bash + az durabletask taskhub create \ + --resource-group my-resource-group \ + --scheduler-name my-scheduler \ + --name "my-taskhub" + ``` + +1. Grant the current user permission to connect to the `my-taskhub` task hub: + + ```bash + subscriptionId=$(az account show --query "id" -o tsv) + loggedInUser=$(az account show --query "user.name" -o tsv) + + az role assignment create \ + --assignee $loggedInUser \ + --role "Durable Task Data Contributor" \ + --scope "/subscriptions/$subscriptionId/resourceGroups/my-resource-group/providers/Microsoft.DurableTask/schedulers/my-scheduler/taskHubs/my-taskhub" + ``` + +## How to Run the Sample + +Once you have set up the Durable Task Scheduler, follow these steps to run the sample: + +1. First, activate your Python virtual environment (if you're using one): + ```bash + python -m venv venv + source venv/bin/activate # On Windows, use: venv\Scripts\activate + ``` + +2. Install the required packages: + ```bash + pip install -r requirements.txt + ``` + +3. Start the Azure Functions runtime: + ```bash + func start + ``` + + You should see output indicating the functions have loaded successfully. + +4. Test the orchestration by sending a POST request: + ```bash + # Start orchestration with default input + curl -X POST http://localhost:7071/api/function_chaining \ + -H "Content-Type: application/json" \ + -d '{}' + + # Start orchestration with custom name + curl -X POST http://localhost:7071/api/function_chaining \ + -H "Content-Type: application/json" \ + -d '{"name": "Alice"}' + ``` + +5. Check orchestration status using the `statusQueryGetUri` from the response: + ```bash + curl http://localhost:7071/api/status/{instanceId} + ``` + +## Understanding the Output + +When you run the sample, you'll see the following behavior: + +1. **Initial Response**: The HTTP trigger returns a JSON response with management URLs: + ```json + { + "id": "abcd1234", + "statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abcd1234", + "sendEventPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abcd1234/raiseEvent/{eventName}", + "terminatePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abcd1234/terminate?reason={text}", + "purgeHistoryDeleteUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abcd1234" + } + ``` + +2. **Orchestration Execution**: The function chaining pattern processes the input through three activities: + - `say_hello`: Creates the initial greeting + - `process_greeting`: Adds the "How are you today?" part + - `finalize_response`: Adds the final "I hope you're doing well!" part + +3. **Final Result**: For input `{"name": "Alice"}`, the orchestration will produce: + ``` + "Hello Alice! How are you today? I hope you're doing well!" + ``` + +## Dashboard Review + +You can monitor the orchestration execution through the Durable Task Scheduler dashboard: + +1. Navigate to `http://localhost:8082` in your browser +2. You'll see a list of task hubs - select the "default" hub +3. 
Click on your orchestration instance to see: + - Orchestration timeline and execution steps + - Input and output data for each activity + - Performance metrics and timing information + - Visual representation of the function chaining pattern + +The dashboard provides real-time visibility into the orchestration execution, making it easy to understand how data flows through each step in the chain. + +## Learn More + +- [Function Chaining Pattern in Durable Functions](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-sequence) +- [Durable Task Scheduler Overview](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler) +- [Durable Functions Python Developer Guide](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-python) \ No newline at end of file diff --git a/samples/durable-functions/python/function-chaining/function_app.py b/samples/durable-functions/python/function-chaining/function_app.py new file mode 100644 index 0000000..c18d04b --- /dev/null +++ b/samples/durable-functions/python/function-chaining/function_app.py @@ -0,0 +1,108 @@ +import logging +import json +import azure.functions as func +import azure.durable_functions as df + +# Create the Durable Functions app with HTTP auth level set to ANONYMOUS for easier testing +app = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS) + +@app.route(route="function_chaining", methods=["POST"]) +@app.durable_client_input(client_name="client") +async def http_start_function_chaining(req: func.HttpRequest, client): + """HTTP trigger that starts the function chaining orchestration.""" + try: + # Get input from request body or use default + req_body = req.get_json() + name = req_body.get("name", "World") if req_body else "World" + + logging.info(f"Starting function chaining orchestration for: {name}") + + # Start the orchestration + instance_id = await client.start_new("function_chaining_orchestrator", client_input=name) + + # Return management URLs for the orchestration + return client.create_check_status_response(req, instance_id) + + except Exception as e: + logging.error(f"Error starting orchestration: {str(e)}") + return func.HttpResponse( + json.dumps({"error": f"Failed to start orchestration: {str(e)}"}), + status_code=500, + mimetype="application/json" + ) + +@app.orchestration_trigger(context_name="context") +def function_chaining_orchestrator(context: df.DurableOrchestrationContext): + """ + Orchestrator that demonstrates the function chaining pattern. + Calls activities sequentially, passing the output of each activity to the next. + """ + name = context.get_input() + logging.info(f"Function chaining orchestration started for: {name}") + + # Call first activity - passing input directly + greeting = yield context.call_activity("say_hello", name) + + # Call second activity with the result from first activity + processed_greeting = yield context.call_activity("process_greeting", greeting) + + # Call third activity with the result from second activity + final_response = yield context.call_activity("finalize_response", processed_greeting) + + logging.info(f"Function chaining orchestration completed: {final_response}") + return final_response + +@app.activity_trigger(input_name="name") +def say_hello(name: str) -> str: + """First activity that greets the user.""" + logging.info(f"Activity say_hello called with name: {name}") + return f"Hello {name}!" 
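+
+# Data flow through the chain (each activity's output feeds the next call):
+#   say_hello("Alice")            -> "Hello Alice!"
+#   process_greeting("Hello...")  -> "Hello Alice! How are you today?"
+#   finalize_response("Hello...") -> "Hello Alice! How are you today? I hope you're doing well!"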
+ +@app.activity_trigger(input_name="greeting") +def process_greeting(greeting: str) -> str: + """Second activity that processes the greeting.""" + logging.info(f"Activity process_greeting called with greeting: {greeting}") + return f"{greeting} How are you today?" + +@app.activity_trigger(input_name="response") +def finalize_response(response: str) -> str: + """Third activity that finalizes the response.""" + logging.info(f"Activity finalize_response called with response: {response}") + return f"{response} I hope you're doing well!" + +@app.route(route="status/{instanceId}", methods=["GET"]) +@app.durable_client_input(client_name="client") +async def get_orchestration_status(req: func.HttpRequest, client): + """Get the status of a running orchestration.""" + instance_id = req.route_params.get('instanceId') + + try: + status = await client.get_status(instance_id) + + if status: + return func.HttpResponse( + json.dumps({ + "instanceId": status.instance_id, + "name": status.name, + "runtimeStatus": status.runtime_status, + "input": status.input_, + "output": status.output, + "createdTime": status.created_time.isoformat() if status.created_time else None, + "lastUpdatedTime": status.last_updated_time.isoformat() if status.last_updated_time else None + }, default=str), + mimetype="application/json" + ) + else: + return func.HttpResponse( + json.dumps({"error": "Orchestration not found"}), + status_code=404, + mimetype="application/json" + ) + + except Exception as e: + logging.error(f"Error getting status: {str(e)}") + return func.HttpResponse( + json.dumps({"error": f"Failed to get status: {str(e)}"}), + status_code=500, + mimetype="application/json" + ) \ No newline at end of file diff --git a/samples/durable-functions/python/function-chaining/host.json b/samples/durable-functions/python/function-chaining/host.json new file mode 100644 index 0000000..ea59c5d --- /dev/null +++ b/samples/durable-functions/python/function-chaining/host.json @@ -0,0 +1,24 @@ +{ + "version": "2.0", + "logging": { + "applicationInsights": { + "samplingSettings": { + "isEnabled": true, + "excludedTypes": "Request,Error" + } + } + }, + "extensions": { + "durableTask": { + "storageProvider": { + "type": "azureManaged", + "connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING" + }, + "hubName": "%TASKHUB_NAME%" + } + }, + "extensionBundle": { + "id": "Microsoft.Azure.Functions.ExtensionBundle.Preview", + "version": "[4.34.0, 5.0.0)" + } +} \ No newline at end of file diff --git a/samples/durable-functions/python/function-chaining/requirements.txt b/samples/durable-functions/python/function-chaining/requirements.txt new file mode 100644 index 0000000..94cf613 --- /dev/null +++ b/samples/durable-functions/python/function-chaining/requirements.txt @@ -0,0 +1,2 @@ +azure-functions +azure-functions-durable \ No newline at end of file diff --git a/samples/durable-functions/python/human-interaction/README.md b/samples/durable-functions/python/human-interaction/README.md new file mode 100644 index 0000000..d256382 --- /dev/null +++ b/samples/durable-functions/python/human-interaction/README.md @@ -0,0 +1,231 @@ +# Human Interaction Pattern + +## Description of the Sample + +This sample demonstrates the human interaction pattern with Azure Durable Functions in Python. This pattern is essential for workflows that require human approval, input, or decision-making with built-in timeout handling to prevent indefinite waiting. + +In this sample: +1. 
**Approval Request**: The orchestrator starts by sending an approval request using the `send_approval_request` activity +2. **External Event Waiting**: The orchestration waits for an external event (approval/denial) or a timeout +3. **Race Condition**: Either the human provides input or the timeout expires - whichever happens first determines the flow +4. **Response Processing**: Approved requests are processed via `process_approval`, while timeouts trigger `escalate_approval` +5. **Final Result**: The orchestration completes with the approval status or escalation details + +This pattern is useful for: +- Approval workflows for expenses, time-off requests, or purchases +- Document review and sign-off processes +- Quality control checkpoints requiring human validation +- Any process where automated systems need human oversight with timeout handling + +## Prerequisites + +1. [Python 3.8+](https://www.python.org/downloads/) +2. [Azure Functions Core Tools](https://docs.microsoft.com/en-us/azure/azure-functions/functions-run-local) v4.x +3. [Docker](https://www.docker.com/products/docker-desktop/) (for running the Durable Task Scheduler) installed + +## Configuring Durable Task Scheduler + +There are two ways to run this sample locally: + +### Using the Emulator (Recommended) + +The emulator simulates a scheduler and taskhub in a Docker container, making it ideal for development and learning. + +1. Pull the Docker Image for the Emulator: + ```bash + docker pull mcr.microsoft.com/dts/dts-emulator:latest + ``` + +1. Run the Emulator: + ```bash + docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest + ``` +Wait a few seconds for the container to be ready. + +Note: The example code automatically uses the default emulator settings (endpoint: http://localhost:8080, taskhub: default). You don't need to set any environment variables. + +### Using a Deployed Scheduler and Taskhub in Azure + +Local development with a deployed scheduler: + +1. Install the durable task scheduler CLI extension: + + ```bash + az upgrade + az extension add --name durabletask --allow-preview true + ``` + +1. Create a resource group in a region where the Durable Task Scheduler is available: + + ```bash + az provider show --namespace Microsoft.DurableTask --query "resourceTypes[?resourceType=='schedulers'].locations | [0]" --out table + ``` + + ```bash + az group create --name my-resource-group --location + ``` +1. Create a durable task scheduler resource: + + ```bash + az durabletask scheduler create \ + --resource-group my-resource-group \ + --name my-scheduler \ + --ip-allowlist '["0.0.0.0/0"]' \ + --sku-name "Dedicated" \ + --sku-capacity 1 \ + --tags "{'myattribute':'myvalue'}" + ``` + +1. Create a task hub within the scheduler resource: + + ```bash + az durabletask taskhub create \ + --resource-group my-resource-group \ + --scheduler-name my-scheduler \ + --name "my-taskhub" + ``` + +1. Grant the current user permission to connect to the `my-taskhub` task hub: + + ```bash + subscriptionId=$(az account show --query "id" -o tsv) + loggedInUser=$(az account show --query "user.name" -o tsv) + + az role assignment create \ + --assignee $loggedInUser \ + --role "Durable Task Data Contributor" \ + --scope "/subscriptions/$subscriptionId/resourceGroups/my-resource-group/providers/Microsoft.DurableTask/schedulers/my-scheduler/taskHubs/my-taskhub" + ``` + +## How to Run the Sample + +Once you have set up the Durable Task Scheduler, follow these steps to run the sample: + +1. 
First, activate your Python virtual environment (if you're using one): + ```bash + python -m venv venv + source venv/bin/activate # On Windows, use: venv\Scripts\activate + ``` + +2. Install the required packages: + ```bash + pip install -r requirements.txt + ``` + +3. Start the Azure Functions runtime: + ```bash + func start + ``` + + You should see output indicating the functions have loaded successfully. + +4. Start an approval workflow by sending a POST request: + ```bash + # Start approval request with default timeout (10 minutes) + curl -X POST http://localhost:7071/api/human_interaction \ + -H "Content-Type: application/json" \ + -d '{}' + + # Start approval request with custom parameters + curl -X POST http://localhost:7071/api/human_interaction \ + -H "Content-Type: application/json" \ + -d '{"request_type": "expense_approval", "amount": 5000, "timeout_minutes": 15, "requester": "alice@company.com", "description": "Conference attendance expenses"}' + ``` + +5. Provide approval or denial within the timeout period: + ```bash + # Approve the request (use approvalId from the orchestration response) + curl -X POST http://localhost:7071/api/approve/{approvalId} \ + -H "Content-Type: application/json" \ + -d '{"approved": true, "comments": "Approved by manager", "approver": "manager@company.com"}' + + # Or deny the request + curl -X POST http://localhost:7071/api/approve/{approvalId} \ + -H "Content-Type: application/json" \ + -d '{"approved": false, "comments": "Insufficient justification", "approver": "manager@company.com"}' + ``` + +6. Check orchestration status: + ```bash + curl http://localhost:7071/api/status/{instanceId} + ``` + +## Understanding the Output + +When you run the sample, you'll see the following behavior based on different scenarios: + +1. **Initial Response**: The HTTP trigger returns management URLs immediately: + ```json + { + "id": "abcd1234", + "approvalId": "approval-abcd1234", + "statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abcd1234", + "sendEventPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abcd1234/raiseEvent/{eventName}", + "terminatePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abcd1234/terminate?reason={text}", + "purgeHistoryDeleteUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abcd1234" + } + ``` + +2. **Scenario 1 - Approved Within Timeout**: If approval is provided before timeout expires: + ```json + { + "requestType": "ExpenseApproval", + "status": "Approved", + "approval": { + "approved": true, + "comments": "Approved by manager", + "approver": "manager@company.com", + "timestamp": "2025-09-19T16:45:30Z" + } + } + ``` + +3. **Scenario 2 - Denied Within Timeout**: If denial is provided before timeout expires: + ```json + { + "requestType": "ExpenseApproval", + "status": "Denied", + "approval": { + "approved": false, + "comments": "Insufficient justification", + "approver": "manager@company.com", + "timestamp": "2025-09-19T16:45:30Z" + } + } + ``` + +4. **Scenario 3 - Timeout (No Response)**: If no response is received within the timeout period: + ```json + { + "requestType": "ExpenseApproval", + "status": "Escalated", + "reason": "No response within timeout period", + "escalation": "Sent to senior management", + "timeoutMinutes": 15 + } + ``` + +5. 
**Testing Timeout Scenarios**: To test timeout behavior:
+   - Start an approval request with a short timeout (e.g., `"timeout_minutes": 1`)
+   - Don't send any approval/denial response
+   - Wait for the timeout to occur and check the final escalation status
+
+## Dashboard Review
+
+You can monitor the orchestration execution through the Durable Task Scheduler dashboard:
+
+1. Navigate to `http://localhost:8082` in your browser
+2. You'll see a list of task hubs; select the "default" hub
+3. Click on your orchestration instance to see:
+   - The orchestration waiting for external events (approval/denial)
+   - Active durable timers and their remaining time until timeout
+   - Timeline showing the approval request activity and the waiting state
+   - Real-time updates when the external event is received or timeout occurs
+
+The dashboard is particularly valuable for this pattern because it clearly visualizes the "waiting" state and shows how external events can interrupt long-running waits, demonstrating the human interaction pattern in real time.
+
+## Learn More
+
+- [Human Interaction Pattern in Durable Functions](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-phone-verification)
+- [Durable Task Scheduler Overview](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler)
+- [Durable Functions Python Developer Guide](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-python)
\ No newline at end of file
diff --git a/samples/durable-functions/python/human-interaction/function_app.py b/samples/durable-functions/python/human-interaction/function_app.py
new file mode 100644
index 0000000..0ec2af6
--- /dev/null
+++ b/samples/durable-functions/python/human-interaction/function_app.py
@@ -0,0 +1,280 @@
+import logging
+import json
+import uuid
+from datetime import datetime, timedelta
+import azure.functions as func
+import azure.durable_functions as df
+
+# Create the Durable Functions app with HTTP auth level set to ANONYMOUS for easier testing
+app = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)
+
+@app.route(route="human_interaction", methods=["POST"])
+@app.durable_client_input(client_name="client")
+async def http_start_approval_process(req: func.HttpRequest, client):
+    """HTTP trigger that starts a human interaction approval process."""
+    try:
+        # Get input from request body or use defaults (tolerate an empty body)
+        try:
+            req_body = req.get_json() if req.get_body() else None
+        except ValueError:
+            req_body = None
+        request_data = {
+            "request_type": req_body.get("request_type", "expense_approval") if req_body else "expense_approval",
+            "amount": req_body.get("amount", 1500.00) if req_body else 1500.00,
+            "requester": req_body.get("requester", "john.doe@company.com") if req_body else "john.doe@company.com",
+            "description": req_body.get("description", "Business travel expenses") if req_body else "Business travel expenses",
+            "timeout_minutes": req_body.get("timeout_minutes", 10) if req_body else 10  # Short timeout for demo
+        }
+
+        logging.info(f"Starting human interaction process: {request_data}")
+
+        # Start the orchestration
+        instance_id = await client.start_new("human_interaction_orchestrator", client_input=request_data)
+
+        # Return management URLs for the orchestration
+        return client.create_check_status_response(req, instance_id)
+
+    except Exception as e:
+        logging.error(f"Error starting orchestration: {str(e)}")
+        return func.HttpResponse(
+            json.dumps({"error": f"Failed to start orchestration: {str(e)}"}),
+            status_code=500,
+            mimetype="application/json"
+        )
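+
+# Illustrative alternative (not wired to an HTTP route): the timeout race the
+# README describes, implemented with a durable timer. A sketch only; it assumes
+# the same input shape as the orchestrator below and a dict event payload, as
+# sent by the submit_approval endpoint.
+@app.orchestration_trigger(context_name="context")
+def approval_with_timeout_orchestrator(context: df.DurableOrchestrationContext):
+    request_data = context.get_input()
+    deadline = context.current_utc_datetime + timedelta(
+        minutes=request_data.get("timeout_minutes", 10))
+
+    # Race the human response against a durable timer; whichever completes
+    # first determines the outcome.
+    approval_task = context.wait_for_external_event("approval_response")
+    timeout_task = context.create_timer(deadline)
+    winner = yield context.task_any([approval_task, timeout_task])
+
+    if winner == approval_task:
+        # A human responded in time; cancel the timer so the instance can finish.
+        if not timeout_task.is_completed:
+            timeout_task.cancel()
+        response = approval_task.result
+        status = "approved" if response.get("approved") else "rejected"
+    else:
+        # The timer fired first: treat the request as timed out.
+        response = None
+        status = "timeout"
+
+    return {"status": status, "approval_response": response}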
+
+@app.orchestration_trigger(context_name="context")
+def human_interaction_orchestrator(context: df.DurableOrchestrationContext):
+    """
+    Orchestrator that demonstrates the human interaction pattern.
+    Waits for human approval; timeout handling is simplified in this demo
+    (see the timer-based sketch above for the full race).
+    """
+    request_data = context.get_input()
+    logging.info(f"Human interaction orchestration started: {request_data}")
+
+    # Generate approval request (use instance ID as approval ID for simplicity)
+    approval_request = {
+        "approval_id": context.instance_id,
+        "request_type": request_data["request_type"],
+        "amount": request_data["amount"],
+        "requester": request_data["requester"],
+        "description": request_data["description"],
+        "created_at": context.current_utc_datetime.isoformat(),
+        "timeout_minutes": request_data["timeout_minutes"]
+    }
+
+    # Send approval request notification
+    yield context.call_activity("send_approval_request", approval_request)
+
+    # Update custom status to show waiting for approval
+    context.set_custom_status({
+        "approval_id": approval_request["approval_id"],
+        "status": "waiting_for_approval",
+        "timeout_minutes": request_data["timeout_minutes"],
+        "approval_url": f"http://localhost:7071/api/approve/{approval_request['approval_id']}"
+    })
+
+    # For demo purposes, this orchestrator simply waits for the external event.
+    # In a production scenario, you'd implement proper timeout handling as in
+    # the sketch above.
+    logging.info(f"Waiting for external approval event for request: {approval_request['approval_id']}")
+
+    try:
+        # Wait for external approval event (simplified - no timeout for now)
+        approval_response = yield context.wait_for_external_event("approval_response")
+
+        logging.info(f"Approval received for request: {approval_request['approval_id']}")
+        logging.info(f"Approval response type: {type(approval_response)}, value: {approval_response}")
+
+        # Handle case where the response arrives as a string (serialization quirk);
+        # json is already imported at module level
+        if isinstance(approval_response, str):
+            try:
+                approval_response = json.loads(approval_response)
+            except ValueError:
+                # If parsing fails, fall back to a default response (demo only)
+                approval_response = {"approved": True, "approver": "unknown", "comments": "Parsed from string"}
+
+        result = yield context.call_activity("process_approval", {
+            "approval_request": approval_request,
+            "approval_response": approval_response,
+            "status": "approved" if approval_response.get("approved") else "rejected"
+        })
+
+    except Exception as e:
+        # Handle any errors
+        logging.error(f"Error waiting for approval: {str(e)}")
+        result = yield context.call_activity("process_approval", {
+            "approval_request": approval_request,
+            "approval_response": None,
+            "status": "error"
+        })
+
+    logging.info(f"Human interaction orchestration completed: {result}")
+    return result
+
+@app.activity_trigger(input_name="approvalRequest")
+def send_approval_request(approvalRequest: dict) -> dict:
+    """Send approval request notification (simulated)."""
+    logging.info(f"Sending approval request: {approvalRequest['approval_id']}")
+
+    # In a real scenario, this would send email, SMS, or push notification
+    # For demo purposes, we'll just log the approval details
+
+    notification_result = {
+        "approval_id": approvalRequest["approval_id"],
+        "notification_sent": True,
+        "notification_method": "email",  # simulated
+        "recipient": "manager@company.com",  # simulated
+        "sent_at": datetime.utcnow().isoformat(),
+        "approval_url": f"http://localhost:7071/api/approve/{approvalRequest['approval_id']}"
+    }
+
+    logging.info(f"Approval request notification sent: {notification_result}")
+    return notification_result
+
+@app.activity_trigger(input_name="approvalData")
+def process_approval(approvalData: dict) -> dict:
+    """Process the approval response and finalize the request."""
+    logging.info(f"Processing approval: {approvalData}")
+
+    approval_request = approvalData["approval_request"]
+    approval_response = approvalData["approval_response"]
+    status = approvalData["status"]
+
+    result = {
+        "approval_id": approval_request["approval_id"],
+        "request_type": approval_request["request_type"],
+        "amount": approval_request["amount"],
+        "requester": approval_request["requester"],
+        "status": status,
+        "processed_at": datetime.utcnow().isoformat(),
+        "created_at": approval_request["created_at"]
+    }
+
+    if status == "approved":
+        result.update({
+            "approved_by": approval_response.get("approver", "unknown"),
+            "approval_comments": approval_response.get("comments", ""),
+            "approved_at": approval_response.get("approved_at"),
+            "next_steps": "Request will be processed for payment"
+        })
+    elif status == "rejected":
+        result.update({
+            "rejected_by": approval_response.get("approver", "unknown"),
+            "rejection_reason": approval_response.get("comments", "No reason provided"),
+            "rejected_at": approval_response.get("approved_at"),
+            "next_steps": "Request has been denied. Contact approver for details."
+        })
+    else:  # timeout
+        result.update({
+            "timeout_reason": f"No response received within {approval_request['timeout_minutes']} minutes",
+            "next_steps": "Request will be escalated to senior management"
+        })
+
+    logging.info(f"Approval processing completed: {result}")
+    return result
+
+@app.activity_trigger(input_name="timeoutConfig")
+def wait_for_timeout(timeoutConfig: dict) -> dict:
+    """
+    Activity that waits out the timeout duration. Kept for reference; the
+    simplified orchestrator above does not call it.
+    """
+    import time
+
+    timeout_minutes = timeoutConfig.get("timeout_minutes", 10)
+    # For demo purposes, cap the timeout at 5 minutes and convert to seconds
+    timeout_seconds = min(timeout_minutes * 60, 300)  # Max 5 minutes
+
+    logging.info(f"Starting timeout wait for {timeout_seconds} seconds")
+
+    start_time = time.time()
+    time.sleep(timeout_seconds)
+    actual_wait = time.time() - start_time
+
+    result = {
+        "timeout_occurred": True,
+        "requested_timeout_seconds": timeout_seconds,
+        "actual_wait_seconds": round(actual_wait, 2),
+        "completed_at": datetime.utcnow().isoformat()
+    }
+
+    logging.info(f"Timeout completed: {result}")
+    return result
+
+@app.route(route="approve/{approvalId}", methods=["POST"])
+@app.durable_client_input(client_name="client")
+async def submit_approval(req: func.HttpRequest, client):
+    """HTTP endpoint for submitting approval decisions."""
+    approval_id = req.route_params.get('approvalId')
+
+    try:
+        # Get approval decision from request body
+        req_body = req.get_json()
+        approved = req_body.get("approved", False) if req_body else False
+        approver = req_body.get("approver", "manager@company.com") if req_body else "manager@company.com"
+        comments = req_body.get("comments", "") if req_body else ""
+
+        approval_response = {
+            "approved": approved,
+            "approver": approver,
+            "comments": comments,
+            "approved_at": datetime.utcnow().isoformat()
+        }
+
+        logging.info(f"Submitting approval decision: {approval_response}")
+
+        # For this demo, we'll use the approval_id as the instance_id
+        # In a real implementation, you'd maintain a proper mapping
+        instance_id = approval_id
+
+        # Raise the external event to the orchestration
+        await client.raise_event(instance_id, "approval_response", approval_response)
+
+        return func.HttpResponse(
+            
json.dumps({ + "message": f"Approval decision submitted for {approval_id}", + "approval_response": approval_response, + "instance_id": instance_id, + "event_raised": True + }), + mimetype="application/json" + ) + + except Exception as e: + logging.error(f"Error submitting approval: {str(e)}") + return func.HttpResponse( + json.dumps({"error": f"Failed to submit approval: {str(e)}"}), + status_code=500, + mimetype="application/json" + ) + +@app.route(route="status/{instanceId}", methods=["GET"]) +@app.durable_client_input(client_name="client") +async def get_orchestration_status(req: func.HttpRequest, client): + """Get the status of a running orchestration.""" + instance_id = req.route_params.get('instanceId') + + try: + status = await client.get_status(instance_id) + + if status: + return func.HttpResponse( + json.dumps({ + "instanceId": status.instance_id, + "name": status.name, + "runtimeStatus": status.runtime_status, + "input": status.input_, + "output": status.output, + "createdTime": status.created_time.isoformat() if status.created_time else None, + "lastUpdatedTime": status.last_updated_time.isoformat() if status.last_updated_time else None + }, default=str), + mimetype="application/json" + ) + else: + return func.HttpResponse( + json.dumps({"error": "Orchestration not found"}), + status_code=404, + mimetype="application/json" + ) + + except Exception as e: + logging.error(f"Error getting status: {str(e)}") + return func.HttpResponse( + json.dumps({"error": f"Failed to get status: {str(e)}"}), + status_code=500, + mimetype="application/json" + ) \ No newline at end of file diff --git a/samples/durable-functions/python/human-interaction/host.json b/samples/durable-functions/python/human-interaction/host.json new file mode 100644 index 0000000..ea59c5d --- /dev/null +++ b/samples/durable-functions/python/human-interaction/host.json @@ -0,0 +1,24 @@ +{ + "version": "2.0", + "logging": { + "applicationInsights": { + "samplingSettings": { + "isEnabled": true, + "excludedTypes": "Request,Error" + } + } + }, + "extensions": { + "durableTask": { + "storageProvider": { + "type": "azureManaged", + "connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING" + }, + "hubName": "%TASKHUB_NAME%" + } + }, + "extensionBundle": { + "id": "Microsoft.Azure.Functions.ExtensionBundle.Preview", + "version": "[4.34.0, 5.0.0)" + } +} \ No newline at end of file diff --git a/samples/durable-functions/python/human-interaction/requirements.txt b/samples/durable-functions/python/human-interaction/requirements.txt new file mode 100644 index 0000000..94cf613 --- /dev/null +++ b/samples/durable-functions/python/human-interaction/requirements.txt @@ -0,0 +1,2 @@ +azure-functions +azure-functions-durable \ No newline at end of file diff --git a/samples/durable-functions/python/monitoring/README.md b/samples/durable-functions/python/monitoring/README.md new file mode 100644 index 0000000..68d993d --- /dev/null +++ b/samples/durable-functions/python/monitoring/README.md @@ -0,0 +1,240 @@ +# Monitoring Pattern + +## Description of the Sample + +This sample demonstrates the monitoring pattern with Azure Durable Functions in Python. The monitoring pattern is used for periodically checking the status of a long-running operation until it completes or times out. + +In this sample: +1. The orchestrator starts monitoring a job with a specified ID +2. It periodically calls the `check_job_status` activity at defined intervals +3. The current job status is exposed via custom status, making it available to clients +4. 
Monitoring continues until either: + - The job completes successfully + - The specified timeout period is reached + +This pattern is useful for: +- Polling external services or APIs that don't support callbacks +- Checking the status of long-running operations +- Implementing timeout mechanisms for operations with unpredictable durations +- Maintaining state about progress of a workflow + +## Sample Architecture + +``` +HTTP Start → Monitoring Orchestrator + ├── Check Job Status (Activity) + ├── Wait (Timer) + ├── Check Job Status (Activity) + ├── Wait (Timer) + └── ... (repeat until completion or timeout) +``` + +## Prerequisites + +1. [Python 3.8+](https://www.python.org/downloads/) +2. [Azure Functions Core Tools](https://docs.microsoft.com/en-us/azure/azure-functions/functions-run-local) v4.x +3. [Docker](https://www.docker.com/products/docker-desktop/) (for running the Durable Task Scheduler) installed + +## Configuring Durable Task Scheduler + +There are two ways to run this sample locally: + +### Using the Emulator (Recommended) + +The emulator simulates a scheduler and taskhub in a Docker container, making it ideal for development and learning. + +1. Pull the Docker Image for the Emulator: + ```bash + docker pull mcr.microsoft.com/dts/dts-emulator:latest + ``` + +1. Run the Emulator: + ```bash + docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest + ``` +Wait a few seconds for the container to be ready. + +Note: The example code automatically uses the default emulator settings (endpoint: http://localhost:8080, taskhub: default). You don't need to set any environment variables. + +### Using a Deployed Scheduler and Taskhub in Azure + +Local development with a deployed scheduler: + +1. Install the durable task scheduler CLI extension: + + ```bash + az upgrade + az extension add --name durabletask --allow-preview true + ``` + +1. Create a resource group in a region where the Durable Task Scheduler is available: + + ```bash + az provider show --namespace Microsoft.DurableTask --query "resourceTypes[?resourceType=='schedulers'].locations | [0]" --out table + ``` + + ```bash + az group create --name my-resource-group --location + ``` +1. Create a durable task scheduler resource: + + ```bash + az durabletask scheduler create \ + --resource-group my-resource-group \ + --name my-scheduler \ + --ip-allowlist '["0.0.0.0/0"]' \ + --sku-name "Dedicated" \ + --sku-capacity 1 \ + --tags "{'myattribute':'myvalue'}" + ``` + +1. Create a task hub within the scheduler resource: + + ```bash + az durabletask taskhub create \ + --resource-group my-resource-group \ + --scheduler-name my-scheduler \ + --name "my-taskhub" + ``` + +1. Grant the current user permission to connect to the `my-taskhub` task hub: + + ```bash + subscriptionId=$(az account show --query "id" -o tsv) + loggedInUser=$(az account show --query "user.name" -o tsv) + + az role assignment create \ + --assignee $loggedInUser \ + --role "Durable Task Data Contributor" \ + --scope "/subscriptions/$subscriptionId/resourceGroups/my-resource-group/providers/Microsoft.DurableTask/schedulers/my-scheduler/taskHubs/my-taskhub" + ``` + +## How to Run the Sample + +Once you have set up the Durable Task Scheduler, follow these steps to run the sample: + +1. First, activate your Python virtual environment (if you're using one): + ```bash + python -m venv venv + source venv/bin/activate # On Windows, use: venv\Scripts\activate + ``` + +2. 
Install the required packages:
+   ```bash
+   pip install -r requirements.txt
+   ```
+
+3. Start the Azure Functions runtime:
+   ```bash
+   func start
+   ```
+
+   You should see output indicating the functions have loaded successfully.
+
+4. Start job monitoring by sending a POST request:
+   ```bash
+   # Basic job monitoring with default configuration
+   curl -X POST http://localhost:7071/api/start_monitoring_job \
+     -H "Content-Type: application/json" \
+     -d '{}'
+
+   # Job monitoring with custom parameters
+   curl -X POST http://localhost:7071/api/start_monitoring_job \
+     -H "Content-Type: application/json" \
+     -d '{
+       "job_id": "my-custom-job-123",
+       "polling_interval_seconds": 10,
+       "timeout_seconds": 60
+     }'
+   ```
+
+5. Check orchestration status using the `statusQueryGetUri` from the response:
+   ```bash
+   curl -X GET "http://localhost:7071/runtime/webhooks/durabletask/instances/{instanceId}"
+   ```
+
+6. Optionally, check job status directly (this endpoint returns simulated data in this sample):
+   ```bash
+   curl -X GET "http://localhost:7071/api/job_status/{jobId}"
+   ```
+
+## Understanding the Output
+
+When you run the sample, you'll see the following behavior:
+
+1. **Initial Response**: The HTTP trigger returns management URLs immediately:
+   ```json
+   {
+     "id": "abc123def456",
+     "statusQueryGetUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abc123def456",
+     "sendEventPostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abc123def456/raiseEvent/{eventName}",
+     "terminatePostUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abc123def456/terminate?reason={text}",
+     "purgeHistoryDeleteUri": "http://localhost:7071/runtime/webhooks/durabletask/instances/abc123def456"
+   }
+   ```
+
+2. **Orchestration Status During Monitoring**: While the job is being monitored, status checks will show:
+   ```json
+   {
+     "name": "monitoring_job_orchestrator",
+     "instanceId": "abc123def456",
+     "runtimeStatus": "Running",
+     "input": {
+       "job_id": "job-uuid-12345",
+       "polling_interval_seconds": 5,
+       "timeout_seconds": 30
+     },
+     "customStatus": {
+       "job_id": "job-uuid-12345",
+       "status": "Running",
+       "check_count": 3,
+       "last_check_time": "2025-09-19T18:05:15.123Z"
+     }
+   }
+   ```
+
+3. **Completed Job Monitoring Result**: When the job completes successfully:
+   ```json
+   {
+     "job_id": "job-uuid-12345",
+     "final_status": "Completed",
+     "checks_performed": 4,
+     "monitoring_duration_seconds": 15.6
+   }
+   ```
+
+4. **Timeout Scenario**: If the job doesn't complete within the specified timeout:
+   ```json
+   {
+     "job_id": "job-uuid-67890",
+     "final_status": "Timeout",
+     "checks_performed": 6,
+     "monitoring_duration_seconds": 30.0
+   }
+   ```
+
+5. **Monitoring Pattern Benefits**:
+   - Provides real-time visibility into job progress via custom status
+   - Prevents infinite waiting with built-in timeout handling
+   - Uses configurable polling intervals to balance responsiveness and resource usage
+   - Handles external services that don't support callbacks or webhooks
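+
+Between checks, the sample waits inside a `wait_for_interval` activity. The canonical monitor pattern sleeps with a durable timer instead; a minimal sketch of that variant, assuming the orchestrator's existing `context` and `polling_interval` variables:
+
+```python
+# Durable alternative to the wait_for_interval activity: set a timer
+# polling_interval seconds in the future and sleep durably until it fires.
+next_check = context.current_utc_datetime + timedelta(seconds=polling_interval)
+yield context.create_timer(next_check)
+```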
+
+## Dashboard Review
+
+You can monitor the orchestration execution through the Durable Task Scheduler dashboard:
+
+1. Navigate to `http://localhost:8082` in your browser
+2. You'll see a list of task hubs; select the "default" hub
+3. Click on your orchestration instance to see:
+   - Real-time custom status updates showing job monitoring progress
+   - Timeline of periodic `check_job_status` activity executions
+   - Activity-based delays between status checks (using `wait_for_interval`)
+   - How the monitoring pattern continues until job completion or timeout
+
+The dashboard is particularly useful for this pattern because it shows how the orchestration maintains state between periodic checks, demonstrating the monitoring pattern's ability to track long-running external operations over time.
+
+## Learn More
+
+- [Monitoring Pattern in Durable Functions](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-monitor)
+- [Durable Task Scheduler Overview](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler)
+- [Durable Functions Python Developer Guide](https://docs.microsoft.com/azure/azure-functions/durable/durable-functions-python)
\ No newline at end of file
diff --git a/samples/durable-functions/python/monitoring/function_app.py b/samples/durable-functions/python/monitoring/function_app.py
new file mode 100644
index 0000000..6bf0e88
--- /dev/null
+++ b/samples/durable-functions/python/monitoring/function_app.py
@@ -0,0 +1,180 @@
+import logging
+import json
+import time
+import uuid
+from datetime import datetime, timedelta
+import azure.functions as func
+import azure.durable_functions as df
+
+# Create the Durable Functions app with HTTP auth level set to ANONYMOUS for easier testing
+app = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)
+
+@app.route(route="start_monitoring_job", methods=["POST"])
+@app.durable_client_input(client_name="client")
+async def http_start_monitoring_job(req: func.HttpRequest, client):
+    """HTTP trigger that starts a job monitoring orchestration."""
+    try:
+        # Get input from request body or use default
+        try:
+            req_body = req.get_json() if req.get_body() else None
+        except ValueError:
+            req_body = None
+
+        # Generate a unique job ID or use provided one
+        job_id = req_body.get("job_id", f"job-{uuid.uuid4()}") if req_body else f"job-{uuid.uuid4()}"
+
+        job_data = {
+            "job_id": job_id,
+            "polling_interval_seconds": req_body.get("polling_interval_seconds", 5) if req_body else 5,
+            "timeout_seconds": req_body.get("timeout_seconds", 30) if req_body else 30
+        }
+
+        logging.info(f"Starting job monitoring for: {job_id}")
+        logging.info(f"Polling interval: {job_data['polling_interval_seconds']} seconds")
+        logging.info(f"Timeout: {job_data['timeout_seconds']} seconds")
+
+        # Start the orchestration
+        instance_id = await client.start_new("monitoring_job_orchestrator", client_input=job_data)
+
+        # Return management URLs for the orchestration
+        return client.create_check_status_response(req, instance_id)
+
+    except Exception as e:
+        logging.error(f"Error starting orchestration: {str(e)}")
+        return func.HttpResponse(
+            json.dumps({"error": f"Failed to start orchestration: {str(e)}"}),
+            status_code=500,
+            mimetype="application/json"
+        )
+
+@app.orchestration_trigger(context_name="context")
+def monitoring_job_orchestrator(context: df.DurableOrchestrationContext):
+    """
+    Orchestrator that demonstrates the monitoring pattern.
+
+    This orchestrator periodically checks the status of a job until it
+    completes or the monitoring window times out.
+ """ + job_data = context.get_input() + job_id = job_data.get("job_id") + polling_interval = job_data.get("polling_interval_seconds", 5) + timeout = job_data.get("timeout_seconds", 30) + + logging.info(f"Starting monitoring orchestration for job {job_id}") + logging.info(f"Polling interval: {polling_interval} seconds") + logging.info(f"Timeout: {timeout} seconds") + + # Record the start time + start_time = context.current_utc_datetime + expiration_time = start_time + timedelta(seconds=timeout) + + # Initialize monitoring state + job_status = { + "job_id": job_id, + "status": "Unknown", + "check_count": 0 + } + + # Loop until the job completes or times out + while True: + # Check current job status + check_input = {"job_id": job_id, "check_count": job_status.get("check_count", 0)} + job_status = yield context.call_activity("check_job_status", check_input) + + # Make the job status available to clients via custom status + context.set_custom_status(job_status) + + if job_status["status"] == "Completed": + logging.info(f"Job {job_id} completed after {job_status['check_count']} checks") + break + + # Check if we've hit the timeout + current_time = context.current_utc_datetime + if current_time >= expiration_time: + logging.info(f"Monitoring for job {job_id} timed out after {timeout} seconds") + job_status["status"] = "Timeout" + break + + # Wait using an activity function instead of timer (avoids timer configuration issues) + logging.info(f"Waiting {polling_interval} seconds before next check of job {job_id}") + yield context.call_activity("wait_for_interval", polling_interval) + + # Return the final status + return { + "job_id": job_id, + "final_status": job_status["status"], + "checks_performed": job_status["check_count"], + "monitoring_duration_seconds": (context.current_utc_datetime - start_time).total_seconds() + } + +@app.activity_trigger(input_name="jobData") +def check_job_status(jobData: dict) -> dict: + """ + Activity that simulates checking the status of a long-running job. + In a real application, this would call an external API or service. + """ + # Extract job_id from the job_data dictionary + job_id = jobData.get("job_id", "unknown") + check_count = jobData.get("check_count", 0) + + logging.info(f"Checking status for job: {job_id} (check #{check_count+1})") + + # Simulate job status progression + if check_count >= 3: + status = "Completed" + else: + status = "Running" + + return { + "job_id": job_id, + "status": status, + "check_count": check_count + 1, + "last_check_time": datetime.utcnow().isoformat() + } + +@app.activity_trigger(input_name="intervalSeconds") +def wait_for_interval(intervalSeconds: int) -> str: + """ + Activity that simulates waiting for a specified interval. + This is used instead of create_timer() to avoid timer configuration issues in Python SDK. + """ + logging.info(f"Waiting for {intervalSeconds} seconds...") + time.sleep(intervalSeconds) + return f"Waited for {intervalSeconds} seconds" + + + +@app.route(route="job_status/{jobId}", methods=["GET"]) +@app.durable_client_input(client_name="client") +async def get_job_status(req: func.HttpRequest, client): + """Get the current status of a job being monitored.""" + job_id = req.route_params.get('jobId') + + try: + # In a real implementation, you would: + # 1. Find the orchestration instance for this job_id + # 2. 
Get its current status + # For demo purposes, we'll return a simulated status + + # Simulate job status lookup + status_info = { + "job_id": job_id, + "status": "Running", + "progress_percent": 75, + "estimated_completion": "2025-09-19T18:15:00Z", + "last_updated": datetime.utcnow().isoformat(), + "details": "Processing batch 3 of 4" + } + + return func.HttpResponse( + json.dumps(status_info), + mimetype="application/json" + ) + + except Exception as e: + logging.error(f"Error getting job status: {str(e)}") + return func.HttpResponse( + json.dumps({"error": f"Failed to get job status: {str(e)}"}), + status_code=500, + mimetype="application/json" + ) \ No newline at end of file diff --git a/samples/durable-functions/python/monitoring/host.json b/samples/durable-functions/python/monitoring/host.json new file mode 100644 index 0000000..ea59c5d --- /dev/null +++ b/samples/durable-functions/python/monitoring/host.json @@ -0,0 +1,24 @@ +{ + "version": "2.0", + "logging": { + "applicationInsights": { + "samplingSettings": { + "isEnabled": true, + "excludedTypes": "Request,Error" + } + } + }, + "extensions": { + "durableTask": { + "storageProvider": { + "type": "azureManaged", + "connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING" + }, + "hubName": "%TASKHUB_NAME%" + } + }, + "extensionBundle": { + "id": "Microsoft.Azure.Functions.ExtensionBundle.Preview", + "version": "[4.34.0, 5.0.0)" + } +} \ No newline at end of file diff --git a/samples/durable-functions/python/monitoring/requirements.txt b/samples/durable-functions/python/monitoring/requirements.txt new file mode 100644 index 0000000..11c94ee --- /dev/null +++ b/samples/durable-functions/python/monitoring/requirements.txt @@ -0,0 +1,2 @@ +azure-functions>=1.18.0,<2.0.0 +azure-functions-durable>=1.2.9,<2.0.0 \ No newline at end of file diff --git a/samples/durable-functions/python/pdf-summarizer/host.json b/samples/durable-functions/python/pdf-summarizer/host.json index b7dd325..1d8da0b 100644 --- a/samples/durable-functions/python/pdf-summarizer/host.json +++ b/samples/durable-functions/python/pdf-summarizer/host.json @@ -14,12 +14,12 @@ "type": "azureManaged", "connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING" }, - "hubName": "TASKHUB_NAME" + "hubName": "%TASKHUB_NAME%" } }, "extensionBundle": { "id": "Microsoft.Azure.Functions.ExtensionBundle.Preview", - "version": "[4.29.0, 5.0.0)" + "version": "[4.34.0, 5.0.0)" } }