diff --git a/METRICS.md b/METRICS.md index 5d8c5643..2201d1c0 100644 --- a/METRICS.md +++ b/METRICS.md @@ -33,6 +33,7 @@ The Conductor Python SDK includes built-in metrics collection using Prometheus t | `task_result_size` | Gauge | `taskType` | Size of task result payload (bytes) | | `task_execution_queue_full_total` | Counter | `taskType` | Number of times execution queue was full | | `task_paused_total` | Counter | `taskType` | Number of polls while worker paused | +| `worker_restart_total` | Counter | `taskType` | Number of times TaskHandler restarted a worker subprocess | | `external_payload_used_total` | Counter | `taskType`, `payloadType` | External payload storage usage count | | `workflow_input_size` | Gauge | `workflowType`, `version` | Workflow input payload size (bytes) | | `workflow_start_error_total` | Counter | `workflowType`, `exception` | Workflow start error count | diff --git a/README.md b/README.md index b6e1f592..3468e73a 100644 --- a/README.md +++ b/README.md @@ -1,71 +1,142 @@ -# Conductor Python SDK +# Python SDK for Conductor -[![CI Status](https://github.com/conductor-oss/python-sdk/actions/workflows/pull_request.yml/badge.svg)](https://github.com/conductor-oss/python-sdk/actions/workflows/pull_request.yml) +[![CI](https://github.com/conductor-sdk/conductor-python/actions/workflows/pull_request.yml/badge.svg)](https://github.com/conductor-sdk/conductor-python/actions/workflows/pull_request.yml) +[![PyPI](https://img.shields.io/pypi/v/conductor-python.svg)](https://pypi.org/project/conductor-python/) +[![Python Versions](https://img.shields.io/pypi/pyversions/conductor-python.svg)](https://pypi.org/project/conductor-python/) +[![License](https://img.shields.io/pypi/l/conductor-python.svg)](LICENSE) -Python SDK for [Conductor](https://www.conductor-oss.org/) — the leading open-source orchestration platform for building distributed applications, AI agents, and workflow-driven microservices. Define workflows as code, run workers anywhere, and let Conductor handle retries, state management, and observability. +Python SDK for [Conductor](https://www.conductor-oss.org/) (OSS and Orkes Conductor) — an orchestration platform for building distributed applications, AI agents, and workflow-driven microservices. Define workflows as code, run workers anywhere, and let Conductor handle retries, state management, and observability. If you find [Conductor](https://github.com/conductor-oss/conductor) useful, please consider giving it a star on GitHub -- it helps the project grow. 
[![GitHub stars](https://img.shields.io/github/stars/conductor-oss/conductor.svg?style=social&label=Star&maxAge=)](https://GitHub.com/conductor-oss/conductor/)
-## 60-Second Quickstart
-Install the SDK and create a single file `quickstart.py`:
-```shell
-pip install conductor-python
-```
+
+* [Python SDK for Conductor](#python-sdk-for-conductor)
+  * [Start Conductor server](#start-conductor-server)
+  * [Install the SDK](#install-the-sdk)
+  * [60-Second Quickstart](#60-second-quickstart)
+  * [Comprehensive worker example](#comprehensive-worker-example)
+  * [Workers](#workers)
+  * [Monitoring Workers](#monitoring-workers)
+  * [Workflows](#workflows)
+  * [Troubleshooting](#troubleshooting)
+  * [AI & LLM Workflows](#ai--llm-workflows)
+  * [Examples](#examples)
+  * [API Journey Examples](#api-journey-examples)
+  * [Documentation](#documentation)
+  * [Support](#support)
+  * [Frequently Asked Questions](#frequently-asked-questions)
+  * [License](#license)
+
+
-## Setting Up Conductor
+## Start Conductor server
-If you don't already have a Conductor server running:
+If you don't already have a Conductor server running, pick one:
+
+**Docker (recommended, includes UI):**
+
+```shell
+docker run -p 8080:8080 conductoross/conductor:latest
+```
+The UI will be available at `http://localhost:8080` and the API at `http://localhost:8080/api`.
-**macOS / Linux:**
+**macOS / Linux (one-liner):** (If you don't want to use Docker, you can install and run the binary directly.)
```shell
curl -sSL https://raw.githubusercontent.com/conductor-oss/conductor/main/conductor_server.sh | sh
```
-**Docker:**
+**Conductor CLI:**
```shell
-docker run -p 8080:8080 conductoross/conductor:latest
+# Install the Conductor CLI
+npm install -g @conductor-oss/conductor-cli
+
+# Start the open-source Conductor server
+conductor server start
+# See conductor server --help for all available commands
```
-The UI will be available at `http://localhost:8080`.
-## Run your first workflow app
+## Install the SDK
+
+```shell
+pip install conductor-python
+```
+
+## 60-Second Quickstart
+
+**Step 1: Create a workflow**
+
+Workflows are definitions that reference task types (e.g., a SIMPLE task called `greet`). We'll build a workflow called
+`greetings` that runs one task and returns its output.
+
+Assuming you have a `WorkflowExecutor` (`executor`) and a worker task (`greet`):
+
+```python
+from conductor.client.workflow.conductor_workflow import ConductorWorkflow
+
+workflow = ConductorWorkflow(name='greetings', version=1, executor=executor)
+greet_task = greet(task_ref_name='greet_ref', name=workflow.input('name'))
+workflow >> greet_task
+workflow.output_parameters({'result': greet_task.output('result')})
+workflow.register(overwrite=True)
+```
+
+**Step 2: Write a worker**
+
+Workers are just Python functions decorated with `@worker_task` that poll Conductor for tasks and execute them.
+
+```python
+from conductor.client.worker.worker_task import worker_task
+
+# register_task_def=True is convenient for local dev quickstarts; in production, manage task definitions separately.
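+# The function's parameters map to the task's input keys; a simple return value is exposed as the task output's
+# 'result' field (which Step 1 reads via greet_task.output('result')).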
+@worker_task(task_definition_name='greet', register_task_def=True) +def greet(name: str) -> str: + return f'Hello {name}' +``` + +**Step 3: Run your first workflow app** + +Create a `quickstart.py` with the following: + ```python from conductor.client.automator.task_handler import TaskHandler from conductor.client.configuration.configuration import Configuration +from conductor.client.orkes_clients import OrkesClients from conductor.client.workflow.conductor_workflow import ConductorWorkflow -from conductor.client.workflow.executor.workflow_executor import WorkflowExecutor from conductor.client.worker.worker_task import worker_task -# Step 1: Define a worker — any Python function -@worker_task(task_definition_name='greet') +# A worker is any Python function. +@worker_task(task_definition_name='greet', register_task_def=True) def greet(name: str) -> str: return f'Hello {name}' def main(): - # Step 2: Configure the SDK (reads CONDUCTOR_SERVER_URL from env) + # Configure the SDK (reads CONDUCTOR_SERVER_URL / CONDUCTOR_AUTH_* from env). config = Configuration() - # Step 3: Build a workflow with the >> operator - executor = WorkflowExecutor(configuration=config) - workflow = ConductorWorkflow(name='greetings', version=1, executor=executor) - workflow >> greet(task_ref_name='greet_ref', name=workflow.input('name')) - workflow.register(True) + clients = OrkesClients(configuration=config) + executor = clients.get_workflow_executor() - # Step 4: Start polling for tasks - task_handler = TaskHandler(configuration=config) - task_handler.start_processes() + # Build a workflow with the >> operator. + workflow = ConductorWorkflow(name='greetings', version=1, executor=executor) + greet_task = greet(task_ref_name='greet_ref', name=workflow.input('name')) + workflow >> greet_task + workflow.output_parameters({'result': greet_task.output('result')}) + workflow.register(overwrite=True) - # Step 5: Run the workflow and get the result - result = executor.execute(name='greetings', version=1, workflow_input={'name': 'Conductor'}) - print(f'result: {result.output["result"]}') - print(f'execution: {config.ui_host}/execution/{result.workflow_id}') + # Start polling for tasks (one worker subprocess per worker function). + with TaskHandler(configuration=config, scan_for_annotated_workers=True) as task_handler: + task_handler.start_processes() - task_handler.stop_processes() + # Run the workflow and get the result. + run = executor.execute(name='greetings', version=1, workflow_input={'name': 'Conductor'}) + print(f'result: {run.output["result"]}') + print(f'execution: {config.ui_host}/execution/{run.workflow_id}') if __name__ == '__main__': @@ -75,42 +146,42 @@ if __name__ == '__main__': Run it: ```shell -export CONDUCTOR_SERVER_URL="http://localhost:8080/api" python quickstart.py ``` -> **Using Orkes Conductor?** Export your authentication credentials as well: +> ### Using Orkes Conductor / Remote Server? +> Export your authentication credentials as well: +> > ```shell > export CONDUCTOR_SERVER_URL="https://your-cluster.orkesconductor.io/api" +> +> # If using Orkes Conductor that requires auth key/secret > export CONDUCTOR_AUTH_KEY="your-key" > export CONDUCTOR_AUTH_SECRET="your-secret" +> +> # Optional — set to false to force HTTP/1.1 if your network environment has unstable long-lived HTTP/2 connections (default: true) +> # export CONDUCTOR_HTTP2_ENABLED=false > ``` > See [Configuration](#configuration) for details. -That's it -- you just defined a worker, built a workflow, and executed it. 
Open [http://localhost:8080](http://localhost:8080) to see the execution in the Conductor UI.
+That's it -- you just defined a worker, built a workflow, and executed it. Open the Conductor UI (the execution link
+printed above, or [http://localhost:8080](http://localhost:8080) by default) to see the execution.
-### Comprehensive example with sync + async workers, metrics, and long-running tasks
+## Comprehensive worker example
+The example includes sync + async workers, metrics, and long-running tasks.
See [examples/workers_e2e.py](examples/workers_e2e.py)
-### Configuration
-
-The SDK reads configuration from environment variables:
-
-```shell
-# Required — Conductor server endpoint
-export CONDUCTOR_SERVER_URL="http://localhost:8080/api"
-
-# Optional — Authentication (required for Orkes Conductor)
-export CONDUCTOR_AUTH_KEY="your-key"
-export CONDUCTOR_AUTH_SECRET="your-secret"
-```
---
## Workers
-Workers are Python functions that execute tasks. Decorate any function with `@worker_task` to make it a distributed worker:
-Note: Workers can be used by LLMs for tool calling.
+Workers are Python functions that execute Conductor tasks. Decorate any function with `@worker_task` to:
+
+- register it as a worker (auto-discovered by `TaskHandler`)
+- use it as a workflow task (call it with `task_ref_name=...`)
+
+Note: Workers can also be used by LLMs for tool calling (see [AI & LLM Workflows](#ai--llm-workflows)).
```python
from conductor.client.worker.worker_task import worker_task
@@ -123,6 +194,8 @@ def greet(name: str) -> str:
**Async workers** for I/O-bound tasks — the SDK automatically uses `AsyncTaskRunner` (event loop, no thread overhead):
```python
+import httpx
+
@worker_task(task_definition_name='fetch_data')
async def fetch_data(url: str) -> dict:
    async with httpx.AsyncClient() as client:
@@ -132,6 +205,9 @@ async def fetch_data(url: str) -> dict:
**Start workers** with `TaskHandler`:
+Note: `@worker_task` functions are discovered only after their modules are imported. Either import your worker modules
+explicitly, or pass `import_modules=[...]` when constructing `TaskHandler`.
+
```python
from conductor.client.automator.task_handler import TaskHandler
from conductor.client.configuration.configuration import Configuration
@@ -140,13 +216,76 @@ api_config = Configuration()
task_handler = TaskHandler(
    workers=[],
    configuration=api_config,
-    scan_for_annotated_workers=True,
+    scan_for_annotated_workers=True,  # auto-discover @worker_task functions
+    # monitor_processes=True and restart_on_failure=True by default
)
task_handler.start_processes()
+try:
+    task_handler.join_processes()  # blocks (workers poll forever)
+finally:
+    task_handler.stop_processes()
```
Workers support complex inputs (dataclasses), long-running tasks (`TaskInProgress`), and hierarchical configuration via environment variables.
+**Resilience: auto-restart and health checks**
+
+Workers are typically long-lived services. By default, `TaskHandler` monitors worker subprocesses and restarts them if
+they exit unexpectedly.
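+
+The supervision behavior can be tuned through the `TaskHandler` constructor. A minimal sketch using the parameter names
+and defaults added in this change (see `task_handler.py`); `api_config` and the imports come from the snippet above:
+
+```python
+task_handler = TaskHandler(
+    workers=[],
+    configuration=api_config,
+    scan_for_annotated_workers=True,
+    monitor_processes=True,            # watch worker subprocesses (default)
+    restart_on_failure=True,           # restart a worker that exits unexpectedly (default)
+    monitor_interval_seconds=5.0,      # how often the monitor thread checks process health
+    restart_backoff_seconds=1.0,       # initial backoff; doubles per restart attempt
+    restart_backoff_max_seconds=60.0,  # cap on the backoff
+    restart_max_attempts=0,            # 0 = unlimited restart attempts
+)
+```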
+ +For a `/healthcheck` endpoint, you can use: + +```python +task_handler.is_healthy() +task_handler.get_worker_process_status() +``` + +To disable monitoring/restarts (e.g., local debugging): + +```python +TaskHandler(..., monitor_processes=False, restart_on_failure=False) +``` + +**Worker Configuration** + +Workers support hierarchical environment variable configuration — global settings that can be overridden per worker: + +```shell +# Global (all workers) +export CONDUCTOR_WORKER_ALL_POLL_INTERVAL_MILLIS=250 +export CONDUCTOR_WORKER_ALL_THREAD_COUNT=20 +export CONDUCTOR_WORKER_ALL_DOMAIN=production + +# Per-worker override +export CONDUCTOR_WORKER_GREETINGS_THREAD_COUNT=50 +``` + +See [WORKER_CONFIGURATION.md](WORKER_CONFIGURATION.md) for all options. + +## Monitoring Workers + +Enable Prometheus metrics: + +```python +from conductor.client.automator.task_handler import TaskHandler +from conductor.client.configuration.configuration import Configuration +from conductor.client.configuration.settings.metrics_settings import MetricsSettings + +api_config = Configuration() +metrics_settings = MetricsSettings(directory='/tmp/conductor-metrics', http_port=8000) + +task_handler = TaskHandler(configuration=api_config, metrics_settings=metrics_settings, scan_for_annotated_workers=True) +task_handler.start_processes() +# Metrics at http://localhost:8000/metrics +try: + task_handler.join_processes() # blocks (workers poll forever) +finally: + task_handler.stop_processes() +``` + +See [METRICS.md](METRICS.md) for details. + **Learn more:** - [Worker Design & Architecture](docs/design/WORKER_DESIGN.md) — AsyncTaskRunner vs TaskRunner, discovery, lifecycle - [Worker Configuration](WORKER_CONFIGURATION.md) — Environment variable configuration system @@ -157,13 +296,19 @@ Workers support complex inputs (dataclasses), long-running tasks (`TaskInProgres Define workflows in Python using the `>>` operator to chain tasks: ```python +from conductor.client.configuration.configuration import Configuration +from conductor.client.orkes_clients import OrkesClients from conductor.client.workflow.conductor_workflow import ConductorWorkflow -from conductor.client.workflow.executor.workflow_executor import WorkflowExecutor -workflow_executor = WorkflowExecutor(configuration=api_config) +api_config = Configuration() +clients = OrkesClients(configuration=api_config) +workflow_executor = clients.get_workflow_executor() + workflow = ConductorWorkflow(name='greetings', version=1, executor=workflow_executor) +# Assuming greet is defined (see Workers section above). workflow >> greet(task_ref_name='greet_ref', name=workflow.input('name')) -workflow.register(True) +# Registering is required if you want to start/execute by name+version; optional if you only execute inline. 
+workflow.register(overwrite=True) ``` **Execute workflows:** @@ -176,10 +321,14 @@ print(result.output) # Asynchronous (returns workflow ID immediately) from conductor.client.http.models import StartWorkflowRequest request = StartWorkflowRequest(name='greetings', version=1, input={'name': 'Orkes'}) -workflow_id = workflow_client.start_workflow(request) +workflow_id = workflow_executor.start_workflow(request) + +# Inline (sends the workflow definition with the request; no prior register required) +run = workflow.execute(workflow_input={'name': 'Orkes'}, wait_for_seconds=10) +print(run.output) ``` -**Manage running workflows:** +**Manage running workflows and send signals:** ```python from conductor.client.orkes_clients import OrkesClients @@ -199,21 +348,22 @@ workflow_client.restart_workflow(workflow_id) - [Workflow Testing](docs/WORKFLOW_TESTING.md) — Unit testing with mock task outputs - [Metadata Management](docs/METADATA.md) — Task & workflow definitions -## Hello World - -The complete Hello World example lives in [`examples/helloworld/`](examples/helloworld/): - -```shell -python examples/helloworld/helloworld.py -``` - -It creates a `greetings` workflow with one worker task, runs the worker, executes the workflow, and prints the result. See the [Hello World source](examples/helloworld/helloworld.py) for the full code. +## Troubleshooting +- Worker stops polling or crashes: `TaskHandler` monitors and restarts worker subprocesses by default. Consider exposing + a `/healthcheck` endpoint using `task_handler.is_healthy()` + `task_handler.get_worker_process_status()`. If you + enable metrics, alert on `worker_restart_total`. +- `httpcore.RemoteProtocolError: `: the SDK recreates the underlying HTTP client and retries + once for idempotent requests. If your environment is still unstable with HTTP/2, set + `CONDUCTOR_HTTP2_ENABLED=false` (forces HTTP/1.1) — see `docs/WORKER.md`. +- FastAPI/Uvicorn: avoid running `uvicorn` with multiple web workers unless you explicitly want multiple independent + `TaskHandler`s polling Conductor (see `examples/fastapi_worker_service.py`). +--- ## AI & LLM Workflows Conductor supports AI-native workflows including agentic tool calling, RAG pipelines, and multi-agent orchestration. -### Agentic Workflows +**Agentic Workflows** Build AI agents where LLMs dynamically select and call Python workers as tools. See [examples/agentic_workflows/](examples/agentic_workflows/) for all examples. @@ -225,7 +375,7 @@ Build AI agents where LLMs dynamically select and call Python workers as tools. | [function_calling_example.py](examples/agentic_workflows/function_calling_example.py) | LLM picks which Python function to call based on user queries | | [mcp_weather_agent.py](examples/agentic_workflows/mcp_weather_agent.py) | AI agent using MCP tools for weather queries | -### LLM and RAG Workflows +**LLM and RAG Workflows** | Example | Description | |---------|-------------| @@ -244,36 +394,6 @@ pip install "markitdown[pdf]" python examples/rag_workflow.py document.pdf "What are the key findings?" 
``` -### Worker Configuration - -Workers support hierarchical environment variable configuration — global settings that can be overridden per worker: - -```shell -# Global (all workers) -export CONDUCTOR_WORKER_ALL_POLL_INTERVAL_MILLIS=250 -export CONDUCTOR_WORKER_ALL_THREAD_COUNT=20 -export CONDUCTOR_WORKER_ALL_DOMAIN=production - -# Per-worker override -export CONDUCTOR_WORKER_GREETINGS_THREAD_COUNT=50 -``` - -See [WORKER_CONFIGURATION.md](WORKER_CONFIGURATION.md) for all options. - -### Monitoring - -Enable Prometheus metrics: - -```python -from conductor.client.configuration.settings.metrics_settings import MetricsSettings - -metrics_settings = MetricsSettings(directory='/tmp/conductor-metrics', http_port=8000) -task_handler = TaskHandler(configuration=api_config, metrics_settings=metrics_settings, scan_for_annotated_workers=True) -# Metrics at http://localhost:8000/metrics -``` - -See [METRICS.md](METRICS.md) for details. - ## Examples See the [Examples Guide](examples/README.md) for the full catalog. Key examples: @@ -281,6 +401,7 @@ See the [Examples Guide](examples/README.md) for the full catalog. Key examples: | Example | Description | Run | |---------|-------------|-----| | [workers_e2e.py](examples/workers_e2e.py) | End-to-end: sync + async workers, metrics | `python examples/workers_e2e.py` | +| [fastapi_worker_service.py](examples/fastapi_worker_service.py) | FastAPI: expose a workflow as an API (+ workers) (deps: fastapi, uvicorn) | `uvicorn examples.fastapi_worker_service:app --port 8081 --workers 1` | | [helloworld.py](examples/helloworld/helloworld.py) | Minimal hello world | `python examples/helloworld/helloworld.py` | | [dynamic_workflow.py](examples/dynamic_workflow.py) | Build workflows programmatically | `python examples/dynamic_workflow.py` | | [llm_chat.py](examples/agentic_workflows/llm_chat.py) | AI multi-turn chat | `python examples/agentic_workflows/llm_chat.py` | @@ -290,7 +411,7 @@ See the [Examples Guide](examples/README.md) for the full catalog. Key examples: | [test_workflows.py](examples/test_workflows.py) | Unit testing workflows | `python -m unittest examples.test_workflows` | | [kitchensink.py](examples/kitchensink.py) | All task types (HTTP, JS, JQ, Switch) | `python examples/kitchensink.py` | -### API Journey Examples +## API Journey Examples End-to-end examples covering all APIs for each domain: @@ -322,7 +443,8 @@ End-to-end examples covering all APIs for each domain: ## Support -- [Open an issue](https://github.com/conductor-oss/conductor/issues) for bugs, questions, and feature requests +- [Open an issue (SDK)](https://github.com/conductor-sdk/conductor-python/issues) for SDK bugs, questions, and feature requests +- [Open an issue (Conductor server)](https://github.com/conductor-oss/conductor/issues) for Conductor OSS server issues - [Join the Conductor Slack](https://join.slack.com/t/orkes-conductor/shared_invite/zt-2vdbx239s-Eacdyqya9giNLHfrCavfaA) for community discussion and help - [Orkes Community Forum](https://community.orkes.io/) for Q&A diff --git a/docs/WORKER.md b/docs/WORKER.md index 42e6a4d4..372d3e48 100644 --- a/docs/WORKER.md +++ b/docs/WORKER.md @@ -259,12 +259,64 @@ workers = [ ) ] -# If there are decorated workers in your application, scan_for_annotated_workers should be set -# default value of scan_for_annotated_workers is False +# TaskHandler scans for @worker_task decorated workers by default. +# Set scan_for_annotated_workers=False if you want to disable auto-discovery. 
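+# start_processes() returns after spawning the worker subprocesses; call task_handler.join_processes() to block.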
with TaskHandler(workers, configuration, scan_for_annotated_workers=True) as task_handler:
    task_handler.start_processes()
```
+### Resilience: auto-restart and health checks
+
+If you run workers as a long-lived service (e.g., alongside FastAPI/Uvicorn), process supervision is enabled by
+default: the `TaskHandler` monitors worker processes and restarts them if they exit unexpectedly. You can make this
+explicit (or disable it) via the constructor:
+
+```python
+with TaskHandler(
+    workers,
+    configuration,
+    scan_for_annotated_workers=True,
+    # Enabled by default. Set to False to disable supervision.
+    monitor_processes=True,
+    restart_on_failure=True,
+) as task_handler:
+    task_handler.start_processes()
+```
+
+For a `/healthcheck` endpoint, you can use:
+
+```python
+task_handler.is_healthy()
+task_handler.get_worker_process_status()
+```
+
+To disable restarts/monitoring (e.g., for local debugging), set:
+
+```python
+TaskHandler(..., monitor_processes=False, restart_on_failure=False)
+```
+
+### Mitigation for intermittent HTTP/2 connection termination
+
+The SDK uses `httpx` for outbound calls to the Conductor/Orkes server. By default, it enables HTTP/2 for these calls.
+In some environments (certain proxies/load balancers/NATs), long-lived HTTP/2 connections may be terminated, which can
+surface as errors like `httpcore.RemoteProtocolError`.
+
+The SDK automatically attempts to recover by recreating the underlying HTTP client and retrying the request once
+(only idempotent methods are retried automatically: GET, HEAD, OPTIONS, PUT, DELETE). If your environment is still
+unstable with HTTP/2, you can force the SDK to use HTTP/1.1 instead via an environment variable.
+
+#### `CONDUCTOR_HTTP2_ENABLED`
+
+- **What it does**: Controls whether the Conductor Python SDK uses HTTP/2 for outbound requests to the Conductor server.
+- **Default**: `true` (HTTP/2 enabled).
+- **Scope**: Affects all SDK clients (workers, `OrkesClients`, sync + async). It does *not* change your FastAPI/Uvicorn
+  server behavior; it only changes how the SDK talks to Conductor.
+- **Values**: `false|0|no|off` disables HTTP/2. Anything else enables it.
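+
+If you prefer to control this from Python (for example in a test harness), setting the variable before any SDK client
+is created has the same effect. A sketch; the flag is read when the SDK constructs its `httpx` client:
+
+```python
+import os
+
+# Must run before the first Configuration / client is created.
+os.environ["CONDUCTOR_HTTP2_ENABLED"] = "false"
+
+from conductor.client.configuration.configuration import Configuration
+from conductor.client.orkes_clients import OrkesClients
+
+clients = OrkesClients(configuration=Configuration())  # outbound calls now use HTTP/1.1
+```
+
+Or from the shell: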
+ +```shell +export CONDUCTOR_HTTP2_ENABLED=false +``` + If you paste the above code in a file called main.py, you can launch the workers by running: ```shell python3 main.py diff --git a/examples/README.md b/examples/README.md index 034f26ee..9dfe81d9 100644 --- a/examples/README.md +++ b/examples/README.md @@ -25,6 +25,7 @@ python examples/workers_e2e.py |------|-------------|-----| | **workers_e2e.py** | ⭐ Start here - sync + async workers | `python examples/workers_e2e.py` | | **worker_example.py** | Comprehensive patterns (None returns, TaskInProgress) | `python examples/worker_example.py` | +| **fastapi_worker_service.py** | FastAPI exposing a workflow as an API (+ workers) | `uvicorn examples.fastapi_worker_service:app --port 8081 --workers 1` | | **worker_configuration_example.py** | Hierarchical configuration (env vars) | `python examples/worker_configuration_example.py` | | **task_context_example.py** | Task context (logs, poll_count, task_id) | `python examples/task_context_example.py` | | **task_workers.py** | Task worker patterns with dataclasses | `python examples/task_workers.py` | @@ -416,4 +417,4 @@ export conductor.worker.all.thread_count=20 --- **Repository**: https://github.com/conductor-oss/conductor-python -**License**: Apache 2.0 \ No newline at end of file +**License**: Apache 2.0 diff --git a/examples/fastapi_worker_service.py b/examples/fastapi_worker_service.py new file mode 100644 index 00000000..136cea92 --- /dev/null +++ b/examples/fastapi_worker_service.py @@ -0,0 +1,213 @@ +""" +FastAPI + Conductor workers in one process. + +Install (example-only deps): + pip install fastapi uvicorn + +Run (single web worker; TaskHandler will spawn one process per Conductor worker): + export CONDUCTOR_SERVER_URL="http://localhost:8080/api" + export CONDUCTOR_AUTH_KEY="..." + export CONDUCTOR_AUTH_SECRET="..." + uvicorn examples.fastapi_worker_service:app --host 0.0.0.0 --port 8081 --workers 1 + +Trigger the workflow via API (waits up to 10s for completion): + curl -s -X POST http://localhost:8081/v1/hello \\ + -H 'content-type: application/json' \\ + -d '{"name":"Ada","a":2,"b":3}' | jq . + +Notes: + - Do NOT run uvicorn with multiple web workers unless you explicitly want multiple independent TaskHandlers polling. + - TaskHandler supervision is enabled by default (monitor + restart worker subprocesses). +""" + +from __future__ import annotations + +import os +from contextlib import asynccontextmanager +from typing import Optional + +from fastapi import FastAPI +from fastapi.responses import JSONResponse +from pydantic import BaseModel, Field + +from conductor.client.automator.task_handler import TaskHandler +from conductor.client.configuration.configuration import Configuration +from conductor.client.context.task_context import get_task_context +from conductor.client.orkes_clients import OrkesClients +from conductor.client.worker.worker_task import worker_task +from conductor.client.workflow.conductor_workflow import ConductorWorkflow +from conductor.client.workflow.executor.workflow_executor import WorkflowExecutor + + +# --------------------------------------------------------------------------- +# Example worker(s) +# --------------------------------------------------------------------------- + +@worker_task( + task_definition_name="fastapi_normalize_name", + poll_interval_millis=100, + register_task_def=True, + overwrite_task_def=False, +) +def normalize_name(name: str) -> str: + # This shows how to access task context safely. 
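+    # The context exposes the current task id, workflow instance id, poll count, and log helpers
+    # (see examples/task_context_example.py).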
+ _ = get_task_context() + return name.strip().title() + + +@worker_task( + task_definition_name="fastapi_add_numbers", + poll_interval_millis=100, + register_task_def=True, + overwrite_task_def=False, +) +def add_numbers(a: int, b: int) -> int: + _ = get_task_context() + return a + b + + +@worker_task( + task_definition_name="fastapi_build_message", + poll_interval_millis=100, + register_task_def=True, + overwrite_task_def=False, +) +def build_message(normalized_name: str, total: int) -> dict: + ctx = get_task_context() + return { + "message": f"Hello {normalized_name}! {total=}", + "normalized_name": normalized_name, + "total": total, + "task_id": ctx.get_task_id(), + "workflow_id": ctx.get_workflow_instance_id(), + } + + +def _build_hello_workflow(executor: WorkflowExecutor) -> ConductorWorkflow: + workflow = ConductorWorkflow(executor=executor, name="fastapi_hello_workflow", version=1) + + t1 = normalize_name(task_ref_name="normalize_name_ref", name=workflow.input("name")) + t2 = add_numbers(task_ref_name="add_numbers_ref", a=workflow.input("a"), b=workflow.input("b")) + t3 = build_message( + task_ref_name="build_message_ref", + normalized_name=t1.output("result"), + total=t2.output("result"), + ) + + workflow >> t1 >> t2 >> t3 + + workflow.output_parameters( + output_parameters={ + "message": t3.output("message"), + "normalized_name": t3.output("normalized_name"), + "total": t3.output("total"), + } + ) + + return workflow + + +class HelloRequest(BaseModel): + name: str = Field(default="World", description="Name to greet") + a: int = Field(default=1, description="First number") + b: int = Field(default=2, description="Second number") + + +# --------------------------------------------------------------------------- +# FastAPI app + TaskHandler lifecycle +# --------------------------------------------------------------------------- + +task_handler: Optional[TaskHandler] = None +workflow_executor: Optional[WorkflowExecutor] = None +api_config: Optional[Configuration] = None + + +@asynccontextmanager +async def lifespan(app: FastAPI): + global task_handler, workflow_executor, api_config + + api_config = Configuration() + clients = OrkesClients(configuration=api_config) + workflow_executor = clients.get_workflow_executor() + + # scan_for_annotated_workers=True will pick up @worker_task functions in this module. + task_handler = TaskHandler( + workers=[], + configuration=api_config, + scan_for_annotated_workers=True, + # Defaults are already True, but keeping these explicit in the example: + monitor_processes=True, + restart_on_failure=True, + ) + task_handler.start_processes() + + try: + yield + finally: + if task_handler is not None: + task_handler.stop_processes() + task_handler = None + workflow_executor = None + api_config = None + + +app = FastAPI(lifespan=lifespan) + + +@app.get("/healthcheck") +def healthcheck(): + # 503 if worker processes aren't healthy; useful for container orchestrators. 
+ if task_handler is None: + return JSONResponse({"ok": False, "detail": "workers_not_started"}, status_code=503) + + ok = task_handler.is_healthy() + payload = { + "ok": ok, + "workers": task_handler.get_worker_process_status(), + } + return JSONResponse(payload, status_code=200 if ok else 503) + + +@app.post("/v1/hello") +def hello(req: HelloRequest): + """ + Expose a Conductor workflow as an API: + - Builds an inline workflow definition with 3 SIMPLE tasks + - Starts it and waits up to 10 seconds for completion + - Returns workflow output as the HTTP response + """ + if task_handler is None or workflow_executor is None or api_config is None: + return JSONResponse({"ok": False, "detail": "service_not_ready"}, status_code=503) + if not task_handler.is_healthy(): + return JSONResponse( + {"ok": False, "detail": "workers_unhealthy", "workers": task_handler.get_worker_process_status()}, + status_code=503, + ) + + workflow = _build_hello_workflow(executor=workflow_executor) + payload = req.model_dump() if hasattr(req, "model_dump") else req.dict() # pydantic v2/v1 + + try: + run = workflow.execute(workflow_input=payload, wait_for_seconds=10) + except Exception as e: + return JSONResponse({"ok": False, "detail": "workflow_start_failed", "error": str(e)}, status_code=502) + + response = { + "ok": run.status == "COMPLETED", + "workflow_id": run.workflow_id, + "status": run.status, + "output": run.output, + "ui_url": f"{api_config.ui_host}/execution/{run.workflow_id}", + } + return JSONResponse(response, status_code=200 if run.status == "COMPLETED" else 202) + + +if __name__ == "__main__": + import uvicorn + + uvicorn.run( + "examples.fastapi_worker_service:app", + host="0.0.0.0", + port=int(os.getenv("PORT", "8081")), + workers=1, + ) diff --git a/src/conductor/client/automator/json_schema_generator.py b/src/conductor/client/automator/json_schema_generator.py index 536b4254..d93e457c 100644 --- a/src/conductor/client/automator/json_schema_generator.py +++ b/src/conductor/client/automator/json_schema_generator.py @@ -188,11 +188,13 @@ def _type_to_json_schema(type_hint, strict_schema: bool = False) -> Optional[Dic if len(non_none_args) == 1: # Optional[T] case inner_schema = _type_to_json_schema(non_none_args[0], strict_schema) - if inner_schema: - # For optional, we could use oneOf or just mark as nullable - # Using nullable for simplicity - inner_schema['nullable'] = True - return inner_schema + if inner_schema is not None: + # Draft-07 JSON Schema does not support OpenAPI's `nullable`. + # Represent Optional[T] as a union with null. + if inner_schema == {}: + # "Any" already includes null, so keep it minimal. 
+ return inner_schema + return {"anyOf": [inner_schema, {"type": "null"}]} # Multiple non-None types in Union - too complex return None diff --git a/src/conductor/client/automator/task_handler.py b/src/conductor/client/automator/task_handler.py index 3185b4ae..342ec321 100644 --- a/src/conductor/client/automator/task_handler.py +++ b/src/conductor/client/automator/task_handler.py @@ -4,9 +4,11 @@ import inspect import logging import os +import threading +import time from multiprocessing import Process, freeze_support, Queue, set_start_method from sys import platform -from typing import List, Optional +from typing import List, Optional, Any, Dict from conductor.client.automator.task_runner import TaskRunner from conductor.client.automator.async_task_runner import AsyncTaskRunner @@ -16,6 +18,9 @@ from conductor.client.event.sync_event_dispatcher import SyncEventDispatcher from conductor.client.event.sync_listener_register import register_task_runner_listener from conductor.client.telemetry.metrics_collector import MetricsCollector +from conductor.client.telemetry.model.metric_documentation import MetricDocumentation +from conductor.client.telemetry.model.metric_label import MetricLabel +from conductor.client.telemetry.model.metric_name import MetricName from conductor.client.worker.worker import Worker from conductor.client.worker.worker_interface import WorkerInterface from conductor.client.worker.worker_config import resolve_worker_config @@ -37,9 +42,31 @@ _mp_fork_set = True except Exception as e: logger.info("error when setting multiprocessing.set_start_method - maybe the context is set %s", e.args) - if platform == "darwin": +if platform == "darwin": os.environ["no_proxy"] = "*" +def _run_sync_worker_process( + worker: WorkerInterface, + configuration: Optional[Configuration], + metrics_settings: Optional[MetricsSettings], + event_listeners: Optional[List[Any]], +) -> None: + """Process target: construct TaskRunner after fork/spawn and run forever.""" + task_runner = TaskRunner(worker, configuration, metrics_settings, event_listeners) + task_runner.run() + + +def _run_async_worker_process( + worker: WorkerInterface, + configuration: Optional[Configuration], + metrics_settings: Optional[MetricsSettings], + event_listeners: Optional[List[Any]], +) -> None: + """Process target: construct AsyncTaskRunner after fork/spawn and run forever.""" + async_task_runner = AsyncTaskRunner(worker, configuration, metrics_settings, event_listeners) + asyncio.run(async_task_runner.run()) + + def register_decorated_fn(name: str, poll_interval: int, domain: str, worker_id: str, func, thread_count: int = 1, register_task_def: bool = False, poll_timeout: int = 100, lease_extend_enabled: bool = False, task_def: Optional['TaskDef'] = None, @@ -152,10 +179,18 @@ def __init__( metrics_settings: Optional[MetricsSettings] = None, scan_for_annotated_workers: bool = True, import_modules: Optional[List[str]] = None, - event_listeners: Optional[List] = None + event_listeners: Optional[List] = None, + monitor_processes: bool = True, + restart_on_failure: bool = True, + monitor_interval_seconds: float = 5.0, + restart_backoff_seconds: float = 1.0, + restart_backoff_max_seconds: float = 60.0, + restart_max_attempts: int = 0 ): workers = workers or [] self.logger_process, self.queue = _setup_logging_queue(configuration) + self._configuration = configuration + self._metrics_settings = metrics_settings # Set prometheus multiprocess directory BEFORE any worker processes start # This must be done before prometheus_client is 
imported in worker processes @@ -220,6 +255,19 @@ def __init__( self.__create_task_runner_processes(workers, configuration, metrics_settings) self.__create_metrics_provider_process(metrics_settings) + self._worker_restart_counter = None + + # Optional supervision: monitor worker processes and (optionally) restart on failure. + self.monitor_processes = monitor_processes + self.restart_on_failure = restart_on_failure + self.monitor_interval_seconds = monitor_interval_seconds + self.restart_backoff_seconds = restart_backoff_seconds + self.restart_backoff_max_seconds = restart_backoff_max_seconds + self.restart_max_attempts = restart_max_attempts + self._monitor_stop_event = threading.Event() + self._monitor_thread: Optional[threading.Thread] = None + self._restart_counts: List[int] = [0 for _ in self.workers] + self._next_restart_at: List[float] = [0.0 for _ in self.workers] logger.info("TaskHandler initialized") def __enter__(self): @@ -229,6 +277,9 @@ def __exit__(self, exc_type, exc_value, traceback): self.stop_processes() def stop_processes(self) -> None: + self._monitor_stop_event.set() + if self._monitor_thread is not None and self._monitor_thread.is_alive(): + self._monitor_thread.join(timeout=2.0) self.__stop_task_runner_processes() self.__stop_metrics_provider_process() logger.info("Stopped worker processes...") @@ -238,8 +289,10 @@ def stop_processes(self) -> None: def start_processes(self) -> None: logger.info("Starting worker processes...") freeze_support() + self._monitor_stop_event.clear() self.__start_task_runner_processes() self.__start_metrics_provider_process() + self.__start_monitor_thread() logger.info("Started all processes") def join_processes(self) -> None: @@ -289,22 +342,175 @@ def __create_task_runner_process( is_async_worker = inspect.iscoroutinefunction(worker.execute) if is_async_worker: - # Use AsyncTaskRunner for async def workers - async_task_runner = AsyncTaskRunner(worker, configuration, metrics_settings, self.event_listeners) - # Wrap async runner in a sync function for multiprocessing - process = Process(target=self.__run_async_runner, args=(async_task_runner,)) - logger.debug(f"Created AsyncTaskRunner for async worker: {worker.get_task_definition_name()}") + process = Process( + target=_run_async_worker_process, + args=(worker, configuration, metrics_settings, self.event_listeners) + ) + logger.debug(f"Created AsyncTaskRunner process for async worker: {worker.get_task_definition_name()}") else: - # Use TaskRunner for sync def workers - task_runner = TaskRunner(worker, configuration, metrics_settings, self.event_listeners) - process = Process(target=task_runner.run) - logger.debug(f"Created TaskRunner for sync worker: {worker.get_task_definition_name()}") + process = Process( + target=_run_sync_worker_process, + args=(worker, configuration, metrics_settings, self.event_listeners) + ) + logger.debug(f"Created TaskRunner process for sync worker: {worker.get_task_definition_name()}") self.task_runner_processes.append(process) - def __run_async_runner(self, async_task_runner: AsyncTaskRunner) -> None: - """Helper method to run AsyncTaskRunner in event loop within multiprocessing context.""" - asyncio.run(async_task_runner.run()) + def __start_monitor_thread(self) -> None: + if not self.monitor_processes: + return + if self._monitor_thread is not None and self._monitor_thread.is_alive(): + return + self._monitor_thread = threading.Thread(target=self.__monitor_loop, name="TaskHandlerMonitor", daemon=True) + self._monitor_thread.start() + logger.info( + 
"TaskHandler monitor started (restart_on_failure=%s, interval=%ss)", + self.restart_on_failure, + self.monitor_interval_seconds + ) + + def __monitor_loop(self) -> None: + while not self._monitor_stop_event.is_set(): + try: + self.__check_and_restart_processes() + except Exception as e: + logger.debug("TaskHandler monitor loop error: %s", e) + # Use Event.wait() so stop_processes can wake the thread immediately. + self._monitor_stop_event.wait(self.monitor_interval_seconds) + + def __check_and_restart_processes(self) -> None: + if self._monitor_stop_event.is_set(): + return + for i, process in enumerate(list(self.task_runner_processes)): + if process is None: + continue + if process.is_alive(): + continue + exitcode = process.exitcode + if exitcode is None: + continue + worker = self.workers[i] if i < len(self.workers) else None + worker_name = worker.get_task_definition_name() if worker is not None else f"worker[{i}]" + logger.warning("Worker process exited (worker=%s, pid=%s, exitcode=%s)", worker_name, process.pid, exitcode) + if not self.restart_on_failure: + continue + self.__restart_worker_process(i) + + def __restart_worker_process(self, index: int) -> None: + if self._monitor_stop_event.is_set(): + return + if index >= len(self.workers) or index >= len(self.task_runner_processes): + return + + # Enforce max attempts if configured (0 = unlimited) + if self.restart_max_attempts > 0 and self._restart_counts[index] >= self.restart_max_attempts: + worker = self.workers[index] + logger.error( + "Not restarting worker process: max restart attempts reached (worker=%s, attempts=%s)", + worker.get_task_definition_name(), + self._restart_counts[index] + ) + return + + now = time.time() + if now < self._next_restart_at[index]: + return + + worker = self.workers[index] + attempt = self._restart_counts[index] + 1 + + # Exponential backoff per-worker to avoid tight crash loops + backoff = min(self.restart_backoff_seconds * (2 ** max(self._restart_counts[index], 0)), self.restart_backoff_max_seconds) + self._next_restart_at[index] = now + backoff + + try: + # Reap the old process (avoid accumulating zombies on repeated restarts). + old_process = self.task_runner_processes[index] + try: + if old_process is not None and old_process.exitcode is not None: + old_process.join(timeout=0.0) + try: + old_process.close() + except Exception: + pass + except Exception: + pass + + new_process = self.__build_process_for_worker(worker) + self.task_runner_processes[index] = new_process + new_process.start() + self._restart_counts[index] = attempt + self.__inc_worker_restart_metric(worker.get_task_definition_name()) + logger.info( + "Restarted worker process (worker=%s, attempt=%s, pid=%s, next_backoff=%ss)", + worker.get_task_definition_name(), + attempt, + new_process.pid, + backoff + ) + except Exception as e: + logger.error("Failed to restart worker process (worker=%s): %s", worker.get_task_definition_name(), e) + + def __inc_worker_restart_metric(self, task_type: str) -> None: + """Best-effort counter increment for worker subprocess restarts (requires metrics_settings).""" + if self._metrics_settings is None: + return + + try: + # Avoid instantiating MetricsCollector here: it keeps a global registry which can be problematic + # when multiple TaskHandlers/tests use different PROMETHEUS_MULTIPROC_DIR values in one process. 
+ from conductor.client.telemetry import metrics_collector as mc + + mc._ensure_prometheus_imported() + if self._worker_restart_counter is None: + # Use a dedicated registry to avoid duplicate metric registration errors in the default registry. + registry = mc.CollectorRegistry() + self._worker_restart_counter = mc.Counter( + name=MetricName.WORKER_RESTART, + documentation=MetricDocumentation.WORKER_RESTART, + labelnames=[MetricLabel.TASK_TYPE.value], + registry=registry, + ) + self._worker_restart_counter.labels(task_type).inc() + except Exception as e: + # Metrics should never break worker supervision. + logger.debug("Failed to increment worker_restart metric: %s", e) + + def __build_process_for_worker(self, worker: WorkerInterface) -> Process: + """Create a new worker process for the given worker (used for initial start + restarts).""" + # Detect if worker function is async + if hasattr(worker, 'execute_function'): + is_async_worker = inspect.iscoroutinefunction(worker.execute_function) + else: + is_async_worker = inspect.iscoroutinefunction(worker.execute) + + if is_async_worker: + return Process( + target=_run_async_worker_process, + args=(worker, self._configuration, self._metrics_settings, self.event_listeners) + ) + return Process( + target=_run_sync_worker_process, + args=(worker, self._configuration, self._metrics_settings, self.event_listeners) + ) + + def get_worker_process_status(self) -> List[Dict[str, Any]]: + """Return basic worker process status for health checks / observability.""" + statuses: List[Dict[str, Any]] = [] + for i, worker in enumerate(self.workers): + process = self.task_runner_processes[i] if i < len(self.task_runner_processes) else None + statuses.append({ + "worker": worker.get_task_definition_name(), + "pid": getattr(process, "pid", None), + "alive": process.is_alive() if process is not None else False, + "exitcode": getattr(process, "exitcode", None), + "restart_count": self._restart_counts[i] if i < len(self._restart_counts) else 0, + }) + return statuses + + def is_healthy(self) -> bool: + """True if all worker processes are alive.""" + return all(p is not None and p.is_alive() for p in self.task_runner_processes) def __start_metrics_provider_process(self): if self.metrics_provider_process is None: @@ -318,7 +524,7 @@ def __start_task_runner_processes(self): task_runner_process.start() print(f'task runner process {task_runner_process.name} started') worker = self.workers[i] - paused_status = "PAUSED" if worker.paused else "ACTIVE" + paused_status = "PAUSED" if getattr(worker, "paused", False) else "ACTIVE" logger.debug("Started worker '%s' [%s]", worker.get_task_definition_name(), paused_status) n = n + 1 logger.info("Started %s TaskRunner process(es)", n) diff --git a/src/conductor/client/http/async_rest.py b/src/conductor/client/http/async_rest.py index 48e8276e..9fb948ef 100644 --- a/src/conductor/client/http/async_rest.py +++ b/src/conductor/client/http/async_rest.py @@ -1,5 +1,6 @@ import io import json +import os import re import httpx @@ -46,35 +47,47 @@ def getheaders(self): class AsyncRESTClientObject(object): def __init__(self, connection=None): if connection is None: - # Create httpx async client with HTTP/2 support and connection pooling - # HTTP/2 provides: - # - Request/response multiplexing (multiple requests over single connection) - # - Header compression (HPACK) - # - Server push capability - # - Binary protocol (more efficient than HTTP/1.1 text) - limits = httpx.Limits( - max_connections=100, # Total connections across all hosts - 
max_keepalive_connections=50, # Persistent connections to keep alive - keepalive_expiry=30.0 # Keep connections alive for 30 seconds - ) - - # Retry configuration for transient failures - transport = httpx.AsyncHTTPTransport( - retries=3, # Retry up to 3 times - http2=True # Enable HTTP/2 support - ) - - self.connection = httpx.AsyncClient( - limits=limits, - transport=transport, - timeout=httpx.Timeout(120.0, connect=10.0), # 120s total, 10s connect - follow_redirects=True, - http2=True # Enable HTTP/2 globally - ) + self._http2_enabled = self._is_http2_enabled() + self.connection = self._create_default_httpx_client() self._owns_connection = True else: self.connection = connection self._owns_connection = False + self._http2_enabled = None + + def _is_http2_enabled(self) -> bool: + val = os.getenv("CONDUCTOR_HTTP2_ENABLED", "true").strip().lower() + return val not in ("0", "false", "no", "off") + + def _create_default_httpx_client(self) -> httpx.AsyncClient: + limits = httpx.Limits( + max_connections=100, # Total connections across all hosts + max_keepalive_connections=50, # Persistent connections to keep alive + keepalive_expiry=30.0 # Keep connections alive for 30 seconds + ) + + transport = httpx.AsyncHTTPTransport( + retries=3, + http2=bool(self._http2_enabled) + ) + + return httpx.AsyncClient( + limits=limits, + transport=transport, + timeout=httpx.Timeout(120.0, connect=10.0), + follow_redirects=True, + http2=bool(self._http2_enabled) + ) + + async def _reset_connection(self) -> None: + if not getattr(self, "_owns_connection", False): + return + try: + if getattr(self, "connection", None) is not None: + await self.connection.aclose() + except Exception: + pass + self.connection = self._create_default_httpx_client() async def __aenter__(self): """Async context manager entry.""" @@ -113,6 +126,7 @@ async def request(self, method, url, query_params=None, headers=None, method = method.upper() assert method in ['GET', 'HEAD', 'DELETE', 'POST', 'PUT', 'PATCH', 'OPTIONS'] + idempotent_methods = {'GET', 'HEAD', 'OPTIONS', 'PUT', 'DELETE'} if post_params and body: raise ValueError( @@ -134,46 +148,56 @@ async def request(self, method, url, query_params=None, headers=None, if 'Content-Type' not in headers: headers['Content-Type'] = 'application/json' - try: - # For `POST`, `PUT`, `PATCH`, `OPTIONS`, `DELETE` - if method in ['POST', 'PUT', 'PATCH', 'OPTIONS', 'DELETE']: - if query_params: - url += '?' + urlencode(query_params) - if re.search('json', headers['Content-Type'], re.IGNORECASE) or isinstance(body, str): - request_body = '{}' - if body is not None: - request_body = json.dumps(body) - if isinstance(body, str): - request_body = request_body.strip('"') + for attempt in range(2): + try: + # For `POST`, `PUT`, `PATCH`, `OPTIONS`, `DELETE` + if method in ['POST', 'PUT', 'PATCH', 'OPTIONS', 'DELETE']: + request_url = url + if query_params: + request_url += '?' + urlencode(query_params) + if re.search('json', headers['Content-Type'], re.IGNORECASE) or isinstance(body, str): + request_body = '{}' + if body is not None: + request_body = json.dumps(body) + if isinstance(body, str): + request_body = request_body.strip('"') + r = await self.connection.request( + method, request_url, + content=request_body, + timeout=timeout, + headers=headers + ) + else: + # Cannot generate the request from given parameters + msg = """Cannot prepare a request message for provided + arguments. 
Please check that your arguments match + declared content type.""" + raise ApiException(status=0, reason=msg) + # For `GET`, `HEAD` + else: r = await self.connection.request( method, url, - content=request_body, + params=query_params, timeout=timeout, headers=headers ) - else: - # Cannot generate the request from given parameters - msg = """Cannot prepare a request message for provided - arguments. Please check that your arguments match - declared content type.""" - raise ApiException(status=0, reason=msg) - # For `GET`, `HEAD` - else: - r = await self.connection.request( - method, url, - params=query_params, - timeout=timeout, - headers=headers - ) - except httpx.TimeoutException as e: - msg = f"Request timeout: {e}" - raise ApiException(status=0, reason=msg) - except httpx.ConnectError as e: - msg = f"Connection error: {e}" - raise ApiException(status=0, reason=msg) - except Exception as e: - msg = "{0}\n{1}".format(type(e).__name__, str(e)) - raise ApiException(status=0, reason=msg) + break + except (httpx.ProtocolError, httpx.ReadError, httpx.WriteError) as e: + if attempt == 0 and self._owns_connection: + await self._reset_connection() + if method in idempotent_methods: + continue + msg = f"Protocol error: {e}" + raise ApiException(status=0, reason=msg) + except httpx.TimeoutException as e: + msg = f"Request timeout: {e}" + raise ApiException(status=0, reason=msg) + except httpx.ConnectError as e: + msg = f"Connection error: {e}" + raise ApiException(status=0, reason=msg) + except Exception as e: + msg = "{0}\n{1}".format(type(e).__name__, str(e)) + raise ApiException(status=0, reason=msg) if _preload_content: r = RESTResponse(r) @@ -293,6 +317,10 @@ def __str__(self): def is_not_found(self) -> bool: return self.code == 404 + +# Backwards compatible alias for older SDK versions/tests. 
+RestException = ApiException + class AuthorizationException(ApiException): def __init__(self, status=None, reason=None, http_resp=None, body=None): try: diff --git a/src/conductor/client/http/rest.py b/src/conductor/client/http/rest.py index 2e57e1a3..aedcbc95 100644 --- a/src/conductor/client/http/rest.py +++ b/src/conductor/client/http/rest.py @@ -1,5 +1,6 @@ import io import json +import os import re import httpx @@ -46,35 +47,50 @@ def getheaders(self): class RESTClientObject(object): def __init__(self, connection=None): if connection is None: - # Create httpx client with HTTP/2 support and connection pooling - # HTTP/2 provides: - # - Request/response multiplexing (multiple requests over single connection) - # - Header compression (HPACK) - # - Server push capability - # - Binary protocol (more efficient than HTTP/1.1 text) - limits = httpx.Limits( - max_connections=100, # Total connections across all hosts - max_keepalive_connections=50, # Persistent connections to keep alive - keepalive_expiry=30.0 # Keep connections alive for 30 seconds - ) - - # Retry configuration for transient failures - transport = httpx.HTTPTransport( - retries=3, # Retry up to 3 times - http2=True # Enable HTTP/2 support - ) - - self.connection = httpx.Client( - limits=limits, - transport=transport, - timeout=httpx.Timeout(120.0, connect=10.0), # 120s total, 10s connect - follow_redirects=True, - http2=True # Enable HTTP/2 globally - ) + self._http2_enabled = self._is_http2_enabled() + self.connection = self._create_default_httpx_client() self._owns_connection = True else: self.connection = connection self._owns_connection = False + self._http2_enabled = None + + def _is_http2_enabled(self) -> bool: + # Default to HTTP/2 enabled. Some environments (proxies/LBs) may be unstable with long-lived HTTP/2. + val = os.getenv("CONDUCTOR_HTTP2_ENABLED", "true").strip().lower() + return val not in ("0", "false", "no", "off") + + def _create_default_httpx_client(self) -> httpx.Client: + # Create httpx client with connection pooling. + # Use HTTP/2 when enabled (default), but allow opting out via CONDUCTOR_HTTP2_ENABLED. 
+ limits = httpx.Limits( + max_connections=100, # Total connections across all hosts + max_keepalive_connections=50, # Persistent connections to keep alive + keepalive_expiry=30.0 # Keep connections alive for 30 seconds + ) + + transport = httpx.HTTPTransport( + retries=3, # Retry up to 3 times for eligible transport errors + http2=bool(self._http2_enabled) + ) + + return httpx.Client( + limits=limits, + transport=transport, + timeout=httpx.Timeout(120.0, connect=10.0), # 120s total, 10s connect + follow_redirects=True, + http2=bool(self._http2_enabled) + ) + + def _reset_connection(self) -> None: + if not getattr(self, "_owns_connection", False): + return + try: + if getattr(self, "connection", None) is not None: + self.connection.close() + except Exception: + pass + self.connection = self._create_default_httpx_client() def __del__(self): """Cleanup httpx client on object destruction.""" @@ -114,6 +130,7 @@ def request(self, method, url, query_params=None, headers=None, method = method.upper() assert method in ['GET', 'HEAD', 'DELETE', 'POST', 'PUT', 'PATCH', 'OPTIONS'] + idempotent_methods = {'GET', 'HEAD', 'OPTIONS', 'PUT', 'DELETE'} if post_params and body: raise ValueError( @@ -135,46 +152,59 @@ def request(self, method, url, query_params=None, headers=None, if 'Content-Type' not in headers: headers['Content-Type'] = 'application/json' - try: - # For `POST`, `PUT`, `PATCH`, `OPTIONS`, `DELETE` - if method in ['POST', 'PUT', 'PATCH', 'OPTIONS', 'DELETE']: - if query_params: - url += '?' + urlencode(query_params) - if re.search('json', headers['Content-Type'], re.IGNORECASE) or isinstance(body, str): - request_body = '{}' - if body is not None: - request_body = json.dumps(body) - if isinstance(body, str): - request_body = request_body.strip('"') + for attempt in range(2): + try: + # For `POST`, `PUT`, `PATCH`, `OPTIONS`, `DELETE` + if method in ['POST', 'PUT', 'PATCH', 'OPTIONS', 'DELETE']: + request_url = url + if query_params: + request_url += '?' + urlencode(query_params) + if re.search('json', headers['Content-Type'], re.IGNORECASE) or isinstance(body, str): + request_body = '{}' + if body is not None: + request_body = json.dumps(body) + if isinstance(body, str): + request_body = request_body.strip('"') + r = self.connection.request( + method, request_url, + content=request_body, + timeout=timeout, + headers=headers + ) + else: + # Cannot generate the request from given parameters + msg = """Cannot prepare a request message for provided + arguments. Please check that your arguments match + declared content type.""" + raise ApiException(status=0, reason=msg) + # For `GET`, `HEAD` + else: r = self.connection.request( method, url, - content=request_body, + params=query_params, timeout=timeout, headers=headers ) - else: - # Cannot generate the request from given parameters - msg = """Cannot prepare a request message for provided - arguments. 
Please check that your arguments match - declared content type.""" - raise ApiException(status=0, reason=msg) - # For `GET`, `HEAD` - else: - r = self.connection.request( - method, url, - params=query_params, - timeout=timeout, - headers=headers - ) - except httpx.TimeoutException as e: - msg = f"Request timeout: {e}" - raise ApiException(status=0, reason=msg) - except httpx.ConnectError as e: - msg = f"Connection error: {e}" - raise ApiException(status=0, reason=msg) - except Exception as e: - msg = "{0}\n{1}".format(type(e).__name__, str(e)) - raise ApiException(status=0, reason=msg) + break + except (httpx.ProtocolError, httpx.ReadError, httpx.WriteError) as e: + # A stale/broken keep-alive connection can cause protocol errors (esp. with HTTP/2). + # Reset the client to recover without requiring process restart. + # Only auto-retry idempotent methods to avoid duplicating side effects. + if attempt == 0 and self._owns_connection: + self._reset_connection() + if method in idempotent_methods: + continue + msg = f"Protocol error: {e}" + raise ApiException(status=0, reason=msg) + except httpx.TimeoutException as e: + msg = f"Request timeout: {e}" + raise ApiException(status=0, reason=msg) + except httpx.ConnectError as e: + msg = f"Connection error: {e}" + raise ApiException(status=0, reason=msg) + except Exception as e: + msg = "{0}\n{1}".format(type(e).__name__, str(e)) + raise ApiException(status=0, reason=msg) if _preload_content: r = RESTResponse(r) @@ -294,6 +324,11 @@ def __str__(self): def is_not_found(self) -> bool: return self.code == 404 + +# Backwards compatible alias for older SDK versions/tests. +# Historically callers imported `RestException` from this module. +RestException = ApiException + class AuthorizationException(ApiException): def __init__(self, status=None, reason=None, http_resp=None, body=None): try: diff --git a/src/conductor/client/telemetry/metrics_collector.py b/src/conductor/client/telemetry/metrics_collector.py index 54bfc648..93677edb 100644 --- a/src/conductor/client/telemetry/metrics_collector.py +++ b/src/conductor/client/telemetry/metrics_collector.py @@ -87,31 +87,37 @@ class MetricsCollector: Note: Uses Python's Protocol for structural subtyping rather than explicit inheritance to avoid circular imports and maintain backward compatibility. """ - counters: ClassVar[Dict[str, Counter]] = {} - gauges: ClassVar[Dict[str, Gauge]] = {} - histograms: ClassVar[Dict[str, Histogram]] = {} - summaries: ClassVar[Dict[str, Summary]] = {} - quantile_metrics: ClassVar[Dict[str, Gauge]] = {} # metric_name -> Gauge with quantile label (used as summary) - quantile_data: ClassVar[Dict[str, deque]] = {} # metric_name+labels -> deque of values - registry = None # Lazy initialization - created when first MetricsCollector instance is created - must_collect_metrics = False QUANTILE_WINDOW_SIZE = 1000 # Keep last 1000 observations for quantile calculation def __init__(self, settings: MetricsSettings): - if settings is not None: - os.environ["PROMETHEUS_MULTIPROC_DIR"] = settings.directory + # Instance state (avoid cross-test/dir interference from class-level caches). 
+ self.counters: Dict[str, Counter] = {} + self.gauges: Dict[str, Gauge] = {} + self.histograms: Dict[str, Histogram] = {} + self.summaries: Dict[str, Summary] = {} + self.quantile_metrics: Dict[str, Gauge] = {} # metric_name -> Gauge with quantile label (used as summary) + self.quantile_data: Dict[str, deque] = {} # metric_name+labels -> deque of values + self.registry = None + self.must_collect_metrics = False - # Import prometheus_client NOW (after PROMETHEUS_MULTIPROC_DIR is set) - _ensure_prometheus_imported() + if settings is None: + return - # Initialize registry on first use (after PROMETHEUS_MULTIPROC_DIR is set) - if MetricsCollector.registry is None: - MetricsCollector.registry = CollectorRegistry() - MultiProcessCollector(MetricsCollector.registry) - logger.debug(f"Created CollectorRegistry with multiprocess support") + os.environ["PROMETHEUS_MULTIPROC_DIR"] = settings.directory - self.must_collect_metrics = True - logger.debug(f"MetricsCollector initialized with directory={settings.directory}, must_collect={self.must_collect_metrics}") + # Import prometheus_client NOW (after PROMETHEUS_MULTIPROC_DIR is set). + _ensure_prometheus_imported() + + # Each MetricsCollector instance gets its own registry so callers/tests can + # safely use different PROMETHEUS_MULTIPROC_DIR values in the same process. + self.registry = CollectorRegistry() + + self.must_collect_metrics = True + logger.debug( + "MetricsCollector initialized with directory=%s, must_collect=%s", + settings.directory, + self.must_collect_metrics, + ) @staticmethod def provide_metrics(settings: MetricsSettings) -> None: @@ -297,6 +303,16 @@ def increment_uncaught_exception(self): labels={} ) + def increment_worker_restart(self, task_type: str) -> None: + """Incremented each time TaskHandler restarts a worker subprocess.""" + self.__increment_counter( + name=MetricName.WORKER_RESTART, + documentation=MetricDocumentation.WORKER_RESTART, + labels={ + MetricLabel.TASK_TYPE: task_type + } + ) + def increment_task_poll_error(self, task_type: str, exception: Exception) -> None: # No-op: Poll errors are already tracked via task_poll_time_seconds_count with status=FAILURE pass diff --git a/src/conductor/client/telemetry/model/metric_documentation.py b/src/conductor/client/telemetry/model/metric_documentation.py index cdcd56e1..d76ad063 100644 --- a/src/conductor/client/telemetry/model/metric_documentation.py +++ b/src/conductor/client/telemetry/model/metric_documentation.py @@ -18,5 +18,6 @@ class MetricDocumentation(str, Enum): TASK_UPDATE_ERROR = "Task status cannot be updated back to server" TASK_UPDATE_TIME_HISTOGRAM = "Task update duration in seconds with quantiles" THREAD_UNCAUGHT_EXCEPTION = "thread_uncaught_exceptions" + WORKER_RESTART = "Worker subprocess restarted" WORKFLOW_START_ERROR = "Counter for workflow start errors" WORKFLOW_INPUT_SIZE = "Records input payload size of a workflow" diff --git a/src/conductor/client/telemetry/model/metric_name.py b/src/conductor/client/telemetry/model/metric_name.py index 72651019..89810865 100644 --- a/src/conductor/client/telemetry/model/metric_name.py +++ b/src/conductor/client/telemetry/model/metric_name.py @@ -18,5 +18,6 @@ class MetricName(str, Enum): TASK_UPDATE_ERROR = "task_update_error" TASK_UPDATE_TIME_HISTOGRAM = "task_update_time_seconds" THREAD_UNCAUGHT_EXCEPTION = "thread_uncaught_exceptions" + WORKER_RESTART = "worker_restart" WORKFLOW_INPUT_SIZE = "workflow_input_size" WORKFLOW_START_ERROR = "workflow_start_error" diff --git a/tests/chaos/README.md 
b/tests/chaos/README.md new file mode 100644 index 00000000..f8328fa8 --- /dev/null +++ b/tests/chaos/README.md @@ -0,0 +1,30 @@ +# Chaos Tests (TaskHandler) + +These tests are intentionally heavier than unit tests. They run `TaskHandler` with real multiprocessing worker +processes and inject failures (protocol errors, forced worker kills) to validate that: + +- Workers keep polling under transient transport failures +- `TaskHandler` detects dead worker processes and restarts them + +## Run + +Chaos tests are opt-in and skipped by default. + +```bash +RUN_CHAOS_TESTS=1 pytest -q tests/chaos -q +``` + +Default timeout is 60s (set `CHAOS_TEST_TIMEOUT_SECONDS` to override) so runs cannot hang CI. + +## What is simulated + +- A transport-level stub for `httpx.Client` (used by the SDK REST layer) that: + - returns `[]` for `GET /tasks/poll/batch/{tasktype}` + - optionally raises `httpx.ProtocolError` on the first poll call + - returns `OK` for `POST /tasks` + +## Notes + +- These tests do not require a real Conductor/Orkes server. +- They do not validate "real" HTTP/2 GOAWAY semantics (that requires an HTTP/2-capable proxy/server), but they do + validate the SDK code paths that handle protocol errors and reset the underlying client. diff --git a/tests/chaos/__init__.py b/tests/chaos/__init__.py new file mode 100644 index 00000000..14efa1fd --- /dev/null +++ b/tests/chaos/__init__.py @@ -0,0 +1,2 @@ +# Package marker for chaos tests. + diff --git a/tests/chaos/_fake_conductor_server.py b/tests/chaos/_fake_conductor_server.py new file mode 100644 index 00000000..705e3fe5 --- /dev/null +++ b/tests/chaos/_fake_conductor_server.py @@ -0,0 +1,143 @@ +import json +import socket +import time +from http.server import BaseHTTPRequestHandler, HTTPServer +from multiprocessing import Value +from typing import Optional + + +class FakeConductorSharedState: + """ + Shared state for the fake server. + + Uses multiprocessing.Value (shared memory) to avoid requiring a local manager server, + which may be blocked in some sandboxed environments. + """ + def __init__(self): + self.drop_first_n = Value("i", 0) + self.requests_total = Value("i", 0) + self.drops_total = Value("i", 0) + self.poll_requests = Value("i", 0) + self.poll_responses = Value("i", 0) + self.update_requests = Value("i", 0) + self.update_responses = Value("i", 0) + + def _inc(self, counter: Value, n: int = 1) -> None: + with counter.get_lock(): + counter.value += n + + def _get(self, counter: Value) -> int: + with counter.get_lock(): + return int(counter.value) + + def _set(self, counter: Value, value: int) -> None: + with counter.get_lock(): + counter.value = int(value) + + +class FakeConductorHTTPServer(HTTPServer): + def __init__(self, server_address, RequestHandlerClass, state: FakeConductorSharedState): + super().__init__(server_address, RequestHandlerClass) + self.state = state + + +class FakeConductorHandler(BaseHTTPRequestHandler): + # Keep the handler quiet; chaos harness can print summaries itself. + def log_message(self, format, *args): + return + + def _should_drop(self) -> bool: + # Deterministic: drop first N requests. 
+ remaining = self.server.state._get(self.server.state.drop_first_n) + if remaining > 0: + self.server.state._set(self.server.state.drop_first_n, remaining - 1) + return True + return False + + def _drop_connection(self) -> None: + self.server.state._inc(self.server.state.drops_total) + try: + self.connection.shutdown(socket.SHUT_RDWR) + except Exception: + pass + try: + self.connection.close() + except Exception: + pass + + def do_GET(self) -> None: + self.server.state._inc(self.server.state.requests_total) + + if self._should_drop(): + self._drop_connection() + return + + # Task poll used by TaskRunner: GET /api/tasks/poll/batch/{tasktype} + if self.path.startswith("/api/tasks/poll/batch/"): + self.server.state._inc(self.server.state.poll_requests) + payload = "[]" + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(payload))) + self.end_headers() + self.wfile.write(payload.encode("utf-8")) + self.server.state._inc(self.server.state.poll_responses) + return + + if self.path == "/api/health": + payload = json.dumps({"ok": True, "ts": time.time()}) + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(payload))) + self.end_headers() + self.wfile.write(payload.encode("utf-8")) + return + + self.send_response(404) + self.end_headers() + + def do_POST(self) -> None: + self.server.state._inc(self.server.state.requests_total) + + if self._should_drop(): + self._drop_connection() + return + + # Task update used by TaskRunner: POST /api/tasks + if self.path == "/api/tasks": + content_length = int(self.headers.get("Content-Length", "0")) + if content_length > 0: + _ = self.rfile.read(content_length) + self.server.state._inc(self.server.state.update_requests) + payload = "OK" + self.send_response(200) + self.send_header("Content-Type", "text/plain") + self.send_header("Content-Length", str(len(payload))) + self.end_headers() + self.wfile.write(payload.encode("utf-8")) + self.server.state._inc(self.server.state.update_responses) + return + + self.send_response(404) + self.end_headers() + + +def serve_fake_conductor(shared_state: FakeConductorSharedState, port_queue, stop_event) -> None: + """ + Run a minimal HTTP server that implements just enough Conductor endpoints for TaskRunner: + - GET /api/tasks/poll/batch/{tasktype} + - POST /api/tasks + + Supports request dropping via shared_state.drop_first_n. + """ + server = FakeConductorHTTPServer(("127.0.0.1", 0), FakeConductorHandler, state=shared_state) + server.timeout = 0.2 + port_queue.put(server.server_address[1]) + + while not stop_event.is_set(): + server.handle_request() + + try: + server.server_close() + except Exception: + pass diff --git a/tests/chaos/test_task_handler_chaos.py b/tests/chaos/test_task_handler_chaos.py new file mode 100644 index 00000000..15b6a1b9 --- /dev/null +++ b/tests/chaos/test_task_handler_chaos.py @@ -0,0 +1,176 @@ +import os +import signal +import time +import unittest +from multiprocessing import Value +from unittest.mock import patch + +import httpx + +from conductor.client.automator.task_handler import TaskHandler +from conductor.client.configuration.configuration import Configuration +from tests.unit.resources.workers import ClassWorker + + +# These are shared across forked worker processes (multiprocessing.Value uses shared memory). 
+_client_creations = Value("i", 0) +_requests_total = Value("i", 0) +_poll_requests = Value("i", 0) +_fail_next_poll = Value("i", 0) + + +class _FlakyHttpxClient: + """ + Stand-in for httpx.Client used by RESTClientObject during chaos tests. + + This avoids binding to local TCP sockets (which may be disallowed in some sandboxes) + while still exercising: + - RESTClientObject protocol-error handling + connection reset + retry + - TaskRunner polling loop in a real worker subprocess + """ + + def __init__(self, *args, **kwargs): + with _client_creations.get_lock(): + _client_creations.value += 1 + + def close(self): + return + + def request(self, method, url, params=None, content=None, timeout=None, headers=None): + with _requests_total.get_lock(): + _requests_total.value += 1 + + # TaskRunner polling endpoint: + # GET {host}/tasks/poll/batch/{tasktype} + if "/tasks/poll/batch/" in url: + with _poll_requests.get_lock(): + _poll_requests.value += 1 + + with _fail_next_poll.get_lock(): + should_fail = _fail_next_poll.value > 0 + if should_fail: + _fail_next_poll.value -= 1 + + if should_fail: + raise httpx.ProtocolError("chaos: simulated ConnectionTerminated/protocol error") + + return httpx.Response( + 200, + content=b"[]", + headers={"Content-Type": "application/json"}, + ) + + # TaskRunner update endpoint: + # POST {host}/tasks + if url.endswith("/tasks"): + return httpx.Response( + 200, + content=b"OK", + headers={"Content-Type": "text/plain"}, + ) + + return httpx.Response(404, content=b"") + + +@unittest.skipUnless(os.getenv("RUN_CHAOS_TESTS") == "1", "Set RUN_CHAOS_TESTS=1 to run chaos tests") +@unittest.skipIf(os.name == "nt", "Chaos tests rely on fork semantics on POSIX") +class TestTaskHandlerChaos(unittest.TestCase): + def setUp(self) -> None: + # Hard stop to prevent runaway chaos runs (default 60s, override via CHAOS_TEST_TIMEOUT_SECONDS) + timeout = int(os.getenv("CHAOS_TEST_TIMEOUT_SECONDS", "60")) + try: + signal.alarm(timeout) + except Exception: + # On platforms without alarm (should be skipped anyway) + pass + + with _client_creations.get_lock(): + _client_creations.value = 0 + with _requests_total.get_lock(): + _requests_total.value = 0 + with _poll_requests.get_lock(): + _poll_requests.value = 0 + with _fail_next_poll.get_lock(): + _fail_next_poll.value = 0 + + # Patch httpx.Client in the SDK REST layer so worker subprocesses inherit it via fork. + import conductor.client.http.rest as rest_module + self._patcher = patch.object(rest_module.httpx, "Client", _FlakyHttpxClient) + self._patcher.start() + + def tearDown(self) -> None: + try: + signal.alarm(0) + except Exception: + pass + try: + self._patcher.stop() + except Exception: + pass + + def test_protocol_error_triggers_client_reset_and_polling_continues(self): + # Fail the first poll request at the transport layer (simulates a terminated keep-alive connection). + with _fail_next_poll.get_lock(): + _fail_next_poll.value = 1 + + worker = ClassWorker("chaos_task") + worker.poll_interval = 1.0 # ms + config = Configuration(server_api_url="http://chaos/api", debug=False) + + with TaskHandler( + workers=[worker], + configuration=config, + scan_for_annotated_workers=False, + monitor_interval_seconds=0.1, + restart_backoff_seconds=0.0, + restart_backoff_max_seconds=0.0, + ) as handler: + handler.start_processes() + + deadline = time.time() + 5 + while time.time() < deadline and int(_poll_requests.value) < 10: + time.sleep(0.05) + + # At least one poll happened, and the worker is still alive. 
+ self.assertGreaterEqual(int(_poll_requests.value), 1) + self.assertTrue(handler.is_healthy()) + + # We should have created at least two httpx clients in the worker due to the reset+retry path. + self.assertGreaterEqual(int(_client_creations.value), 2) + + def test_worker_process_restarts_after_kill(self): + worker = ClassWorker("chaos_task") + worker.poll_interval = 1.0 # ms + config = Configuration(server_api_url="http://chaos/api", debug=False) + + with TaskHandler( + workers=[worker], + configuration=config, + scan_for_annotated_workers=False, + monitor_interval_seconds=0.1, + restart_backoff_seconds=0.0, + restart_backoff_max_seconds=0.0, + ) as handler: + handler.start_processes() + + deadline = time.time() + 5 + while time.time() < deadline and int(_poll_requests.value) < 5: + time.sleep(0.05) + + status0 = handler.get_worker_process_status()[0] + old_pid = status0["pid"] + self.assertIsNotNone(old_pid) + + handler.task_runner_processes[0].kill() + + deadline = time.time() + 10 + while time.time() < deadline: + status = handler.get_worker_process_status()[0] + if status["restart_count"] >= 1 and status["pid"] != old_pid and status["alive"]: + break + time.sleep(0.05) + + status1 = handler.get_worker_process_status()[0] + self.assertGreaterEqual(status1["restart_count"], 1) + self.assertNotEqual(status1["pid"], old_pid) + self.assertTrue(status1["alive"]) diff --git a/tests/unit/api_client/test_rest_client.py b/tests/unit/api_client/test_rest_client.py new file mode 100644 index 00000000..3b205287 --- /dev/null +++ b/tests/unit/api_client/test_rest_client.py @@ -0,0 +1,31 @@ +import unittest +from unittest.mock import MagicMock, patch + +import httpx + +from conductor.client.http import rest + + +class TestRESTClientObject(unittest.TestCase): + @patch.object(rest.RESTClientObject, "_create_default_httpx_client") + def test_resets_and_retries_on_remote_protocol_error(self, mock_create_client): + first_client = MagicMock() + second_client = MagicMock() + mock_create_client.side_effect = [first_client, second_client] + + first_client.request.side_effect = httpx.RemoteProtocolError("ConnectionTerminated") + + response = MagicMock() + response.status_code = 200 + response.reason_phrase = "OK" + response.headers = {} + response.text = "" + second_client.request.return_value = response + + client = rest.RESTClientObject(connection=None) + result = client.request("GET", "http://example", query_params={"a": "b"}) + + self.assertEqual(mock_create_client.call_count, 2) + self.assertTrue(first_client.close.called) + self.assertEqual(result.status, 200) + diff --git a/tests/unit/automator/test_json_schema_generator.py b/tests/unit/automator/test_json_schema_generator.py index bbdd3a9a..adc5bd65 100644 --- a/tests/unit/automator/test_json_schema_generator.py +++ b/tests/unit/automator/test_json_schema_generator.py @@ -69,15 +69,15 @@ class TestOptionalTypes(unittest.TestCase): def test_optional_string(self): schema = _type_to_json_schema(Optional[str]) - self.assertEqual(schema, {"type": "string", "nullable": True}) + self.assertEqual(schema, {"anyOf": [{"type": "string"}, {"type": "null"}]}) def test_optional_int(self): schema = _type_to_json_schema(Optional[int]) - self.assertEqual(schema, {"type": "integer", "nullable": True}) + self.assertEqual(schema, {"anyOf": [{"type": "integer"}, {"type": "null"}]}) def test_optional_dict(self): schema = _type_to_json_schema(Optional[dict]) - self.assertEqual(schema, {"type": "object", "nullable": True}) + self.assertEqual(schema, {"anyOf": [{"type": 
"object"}, {"type": "null"}]}) def test_optional_parameter_not_required(self): """Optional[T] parameters should not be in required array.""" @@ -95,8 +95,10 @@ def worker(required_param: str, optional_param: Optional[str]) -> dict: self.assertIn('required_param', input_schema['properties']) self.assertIn('optional_param', input_schema['properties']) - # optional_param should be nullable - self.assertTrue(input_schema['properties']['optional_param']['nullable']) + # optional_param should allow null + optional_schema = input_schema['properties']['optional_param'] + self.assertIn('anyOf', optional_schema) + self.assertIn({"type": "null"}, optional_schema['anyOf']) def test_optional_with_default_still_not_required(self): """Optional[T] with default value should not be required.""" @@ -226,7 +228,7 @@ class UserProfile: "type": "object", "properties": { "user_id": {"type": "string"}, - "email": {"type": "string", "nullable": True} + "email": {"anyOf": [{"type": "string"}, {"type": "null"}]} }, "required": ["user_id"], "additionalProperties": True @@ -316,7 +318,7 @@ def test_union_with_task_in_progress(self): def test_optional_is_union(self): # Optional[str] is Union[str, None] schema = _type_to_json_schema(Optional[str]) - self.assertEqual(schema, {"type": "string", "nullable": True}) + self.assertEqual(schema, {"anyOf": [{"type": "string"}, {"type": "null"}]}) class TestUnsupportedTypes(unittest.TestCase): @@ -456,7 +458,7 @@ def register_user( contact_schema = input_schema['properties']['contact'] self.assertEqual(contact_schema['type'], "object") self.assertEqual(contact_schema['properties']['email'], {"type": "string"}) - self.assertEqual(contact_schema['properties']['phone'], {"type": "string", "nullable": True}) + self.assertEqual(contact_schema['properties']['phone'], {"anyOf": [{"type": "string"}, {"type": "null"}]}) # Only email is required in contact (phone is optional) self.assertEqual(contact_schema['required'], ['email']) diff --git a/tests/unit/automator/test_task_handler_coverage.py b/tests/unit/automator/test_task_handler_coverage.py index ecb6bac7..ae728483 100644 --- a/tests/unit/automator/test_task_handler_coverage.py +++ b/tests/unit/automator/test_task_handler_coverage.py @@ -13,6 +13,7 @@ """ import multiprocessing import os +import tempfile import unittest from unittest.mock import Mock, patch, MagicMock, PropertyMock, call from conductor.client.automator.task_handler import ( @@ -59,6 +60,81 @@ def tearDown(self): except Exception: pass + +class _FakeProcess: + def __init__(self, *, alive: bool, exitcode: int, pid: int = 123): + self._alive = alive + self.exitcode = exitcode + self.pid = pid + self.started = False + + def is_alive(self): + return self._alive + + def start(self): + self.started = True + self._alive = True + # Simulate OS-assigned PID on start + self.pid = 999 + + +class TestTaskHandlerSupervision(unittest.TestCase): + @patch('conductor.client.automator.task_handler._setup_logging_queue') + @patch('conductor.client.automator.task_handler.importlib.import_module') + def test_restart_dead_worker_process(self, mock_import, mock_logging): + mock_logging.return_value = (Mock(), Mock()) + + handler = TaskHandler( + workers=[ClassWorker('test_task')], + configuration=Configuration(), + scan_for_annotated_workers=False, + monitor_processes=True, + restart_on_failure=True, + monitor_interval_seconds=0.01, + restart_backoff_seconds=0.0, + restart_backoff_max_seconds=0.0 + ) + + # Replace the real Process with a dead one to trigger restart logic. 
+ handler.task_runner_processes[0] = _FakeProcess(alive=False, exitcode=1, pid=111) + new_proc = _FakeProcess(alive=False, exitcode=None, pid=None) + handler._TaskHandler__build_process_for_worker = Mock(return_value=new_proc) + + handler._TaskHandler__check_and_restart_processes() + + self.assertTrue(new_proc.started) + self.assertEqual(handler.get_worker_process_status()[0]["restart_count"], 1) + + @patch('conductor.client.automator.task_handler._setup_logging_queue') + @patch('conductor.client.automator.task_handler.importlib.import_module') + def test_restart_increments_worker_restart_metric(self, mock_import, mock_logging): + mock_logging.return_value = (Mock(), Mock()) + + with tempfile.TemporaryDirectory(prefix="conductor-metrics-") as metrics_dir: + metrics_settings = MetricsSettings(directory=metrics_dir, update_interval=0.5) + handler = TaskHandler( + workers=[ClassWorker('test_task')], + configuration=Configuration(), + metrics_settings=metrics_settings, + scan_for_annotated_workers=False, + monitor_processes=True, + restart_on_failure=True, + monitor_interval_seconds=0.01, + restart_backoff_seconds=0.0, + restart_backoff_max_seconds=0.0 + ) + + handler.task_runner_processes[0] = _FakeProcess(alive=False, exitcode=1, pid=111) + new_proc = _FakeProcess(alive=False, exitcode=None, pid=None) + handler._TaskHandler__build_process_for_worker = Mock(return_value=new_proc) + + with patch.object(handler, "_TaskHandler__inc_worker_restart_metric") as mock_inc: + handler._TaskHandler__check_and_restart_processes() + + self.assertTrue(new_proc.started) + self.assertEqual(handler.get_worker_process_status()[0]["restart_count"], 1) + mock_inc.assert_called_once_with('test_task') + @patch('conductor.client.automator.task_handler._setup_logging_queue') def test_initialization_with_no_workers(self, mock_logging): """Test initialization with no workers provided.""" @@ -330,20 +406,7 @@ def tearDown(self): # Clean up any started processes for handler in self.handlers: try: - # Terminate all task runner processes - for process in handler.task_runner_processes: - if process.is_alive(): - process.terminate() - process.join(timeout=1) - if process.is_alive(): - process.kill() - # Terminate metrics process if it exists - if hasattr(handler, 'metrics_provider_process') and handler.metrics_provider_process: - if handler.metrics_provider_process.is_alive(): - handler.metrics_provider_process.terminate() - handler.metrics_provider_process.join(timeout=1) - if handler.metrics_provider_process.is_alive(): - handler.metrics_provider_process.kill() + handler.stop_processes() except Exception: pass @@ -930,13 +993,7 @@ def tearDown(self): # Clean up any started processes for handler in self.handlers: try: - # Terminate all task runner processes - for process in handler.task_runner_processes: - if process.is_alive(): - process.terminate() - process.join(timeout=1) - if process.is_alive(): - process.kill() + handler.stop_processes() except Exception: pass # Restore original environment @@ -997,13 +1054,7 @@ def tearDown(self): # Clean up any started processes for handler in self.handlers: try: - # Terminate all task runner processes - for process in handler.task_runner_processes: - if process.is_alive(): - process.terminate() - process.join(timeout=1) - if process.is_alive(): - process.kill() + handler.stop_processes() except Exception: pass diff --git a/tests/unit/automator/test_task_runner_coverage.py b/tests/unit/automator/test_task_runner_coverage.py index 19b07261..766427da 100644 --- 
a/tests/unit/automator/test_task_runner_coverage.py
+++ b/tests/unit/automator/test_task_runner_coverage.py
@@ -294,7 +294,9 @@ def test_poll_task_when_worker_paused(self, mock_sleep):
     def test_poll_task_with_auth_failure_backoff(self, mock_sleep):
         """Test exponential backoff on authorization failures"""
         worker = MockWorker('test_task')
-        task_runner = TaskRunner(worker=worker)
+        # Ensure auth env vars do not cause ApiClient token refresh (which can call time.sleep internally).
+        with patch.dict(os.environ, {}, clear=True):
+            task_runner = TaskRunner(worker=worker)
 
         # Simulate auth failure
         task_runner._auth_failures = 2
@@ -305,7 +307,7 @@ def test_poll_task_with_auth_failure_backoff(self, mock_sleep):
 
         # Should skip polling and return None due to backoff
         self.assertIsNone(task)
-        mock_sleep.assert_called_once()
+        mock_sleep.assert_called_once_with(0.1)
 
     @patch('time.sleep')
     def test_poll_task_auth_failure_with_invalid_token(self, mock_sleep):
diff --git a/workers.md b/workers.md
index 6fd2e222..2c7122c3 100644
--- a/workers.md
+++ b/workers.md
@@ -100,6 +100,12 @@ if __name__ == '__main__':
 
 ```
 
+Notes:
+- TaskHandler enables supervision by default. For local debugging you can disable it with `monitor_processes=False` and `restart_on_failure=False`.
+- `CONDUCTOR_HTTP2_ENABLED` controls whether the SDK uses HTTP/2 for outbound requests to Conductor (default: `true`).
+  If you see intermittent `httpcore.RemoteProtocolError` failures, try `CONDUCTOR_HTTP2_ENABLED=false`
+  to force HTTP/1.1 (a workaround for some proxies/load balancers).
+
 ## Design Principles for Workers
 
 Each worker embodies the design pattern and follows certain basic principles:
@@ -253,4 +259,4 @@ There are several advantages to this approach:
 
 ## Deploying Workers in Production
 
-Conductor workers can run in the cloud-native environment or on-prem and can easily be deployed like any other Python application. Workers can run a containerized environment, VMs, or bare metal like you would deploy your other Python applications.
\ No newline at end of file
+Conductor workers can run in cloud-native environments or on-prem and can easily be deployed like any other Python application. Workers can run in containerized environments, on VMs, or on bare metal, just like your other Python applications.
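
A minimal usage sketch tying together the supervision flags and the `CONDUCTOR_HTTP2_ENABLED` toggle described in the workers.md notes above. The worker list and server URL are placeholders and the interval/backoff values are illustrative; it only uses constructor arguments and methods exercised by the tests in this change (`monitor_processes`, `restart_on_failure`, `monitor_interval_seconds`, `restart_backoff_seconds`, `restart_backoff_max_seconds`, `start_processes`, `get_worker_process_status`).

```python
import os

from conductor.client.automator.task_handler import TaskHandler
from conductor.client.configuration.configuration import Configuration

# Opt out of HTTP/2 before the SDK builds its httpx client
# (workaround for proxies/load balancers that mishandle long-lived HTTP/2 connections).
os.environ.setdefault("CONDUCTOR_HTTP2_ENABLED", "false")

config = Configuration(server_api_url="http://localhost:8080/api")

# Placeholder: replace with your Worker instances.
my_workers = []

# Context-manager usage mirrors the chaos tests: worker subprocesses are started
# explicitly and cleaned up when the block exits.
with TaskHandler(
    workers=my_workers,
    configuration=config,
    monitor_processes=True,        # supervision on (the default); set False for local debugging
    restart_on_failure=True,       # restart dead worker subprocesses
    monitor_interval_seconds=1.0,  # illustrative values, not recommended defaults
    restart_backoff_seconds=1.0,
    restart_backoff_max_seconds=30.0,
) as handler:
    handler.start_processes()

    # Per-worker supervision status: pid, alive, restart_count, ...
    for status in handler.get_worker_process_status():
        print(status)
```

Exporting `CONDUCTOR_HTTP2_ENABLED=false` in the environment that launches the workers has the same effect, since the SDK reads the variable when it creates its HTTP client.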