1 change: 1 addition & 0 deletions METRICS.md
@@ -33,6 +33,7 @@ The Conductor Python SDK includes built-in metrics collection using Prometheus t
| `task_result_size` | Gauge | `taskType` | Size of task result payload (bytes) |
| `task_execution_queue_full_total` | Counter | `taskType` | Number of times execution queue was full |
| `task_paused_total` | Counter | `taskType` | Number of polls while worker paused |
| `worker_restart_total` | Counter | `taskType` | Number of times TaskHandler restarted a worker subprocess |
| `external_payload_used_total` | Counter | `taskType`, `payloadType` | External payload storage usage count |
| `workflow_input_size` | Gauge | `workflowType`, `version` | Workflow input payload size (bytes) |
| `workflow_start_error_total` | Counter | `workflowType`, `exception` | Workflow start error count |
320 changes: 221 additions & 99 deletions README.md

Large diffs are not rendered by default.

56 changes: 54 additions & 2 deletions docs/WORKER.md
@@ -259,12 +259,64 @@ workers = [
)
]

# If there are decorated workers in your application, scan_for_annotated_workers should be set
# default value of scan_for_annotated_workers is False
# TaskHandler scans for @worker_task decorated workers by default.
# Set scan_for_annotated_workers=False if you want to disable auto-discovery.
with TaskHandler(workers, configuration, scan_for_annotated_workers=True) as task_handler:
task_handler.start_processes()
```

### Resilience: auto-restart and health checks

If you run workers as a long-lived service (e.g., alongside FastAPI/Uvicorn), the `TaskHandler` can supervise its
worker processes for you: it monitors them and restarts any that exit unexpectedly. Supervision is enabled by default
and can be configured explicitly:

```python
with TaskHandler(
workers,
configuration,
scan_for_annotated_workers=True,
# Enabled by default. Set to False to disable supervision.
monitor_processes=True,
restart_on_failure=True,
) as task_handler:
task_handler.start_processes()
```

For a `/healthcheck` endpoint, you can use:

```python
task_handler.is_healthy()
task_handler.get_worker_process_status()
```
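
If you expose workers alongside a web framework, these methods can back a liveness endpoint. A minimal sketch using FastAPI (FastAPI is not part of the SDK; `task_handler` is assumed to be the handler created above, and `examples/fastapi_worker_service.py` contains a complete version):

```python
from fastapi import FastAPI
from fastapi.responses import JSONResponse

app = FastAPI()

@app.get("/healthcheck")
def healthcheck():
    # Return 503 when worker processes are unhealthy so orchestrators can restart the service.
    ok = task_handler.is_healthy()
    return JSONResponse(
        {"ok": ok, "workers": task_handler.get_worker_process_status()},
        status_code=200 if ok else 503,
    )
```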

To disable restarts/monitoring (e.g., for local debugging), set:

```python
TaskHandler(..., monitor_processes=False, restart_on_failure=False)
```

### Mitigation for intermittent HTTP/2 connection termination

The SDK uses `httpx` for outbound calls to the Conductor/Orkes server. By default, it enables HTTP/2 for these calls.
In some environments (certain proxies/load balancers/NATs), long-lived HTTP/2 connections may be terminated, which can
surface as errors like `httpcore.RemoteProtocolError: <ConnectionTerminated ...>`.

The SDK automatically attempts to recover by recreating the underlying HTTP client and retrying the request once.
If your environment is still unstable with HTTP/2, you can force the SDK to use HTTP/1.1 instead via an environment
variable.

#### `CONDUCTOR_HTTP2_ENABLED`

- **What it does**: Controls whether the Conductor Python SDK uses HTTP/2 for outbound requests to the Conductor server.
- **Default**: `true` (HTTP/2 enabled).
- **Scope**: Affects all SDK clients (workers, `OrkesClients`, sync + async). It does *not* change your FastAPI/Uvicorn
server behavior; it only changes how the SDK talks to Conductor.
- **Values**: `false|0|no|off` disables HTTP/2. Anything else enables it.

```shell
export CONDUCTOR_HTTP2_ENABLED=false
```
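
If you prefer to control this from Python (for example, in a service entrypoint), setting the variable before any SDK client is constructed should be equivalent; a sketch, assuming `CONDUCTOR_HTTP2_ENABLED` is read when the SDK first builds its HTTP client:

```python
import os

# Must run before Configuration/OrkesClients/TaskHandler create their HTTP clients.
os.environ["CONDUCTOR_HTTP2_ENABLED"] = "false"

from conductor.client.configuration.configuration import Configuration

configuration = Configuration()
```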

If you paste the worker registration code above into a file called main.py, you can launch the workers by running:
```shell
python3 main.py
3 changes: 2 additions & 1 deletion examples/README.md
@@ -25,6 +25,7 @@ python examples/workers_e2e.py
|------|-------------|-----|
| **workers_e2e.py** | ⭐ Start here - sync + async workers | `python examples/workers_e2e.py` |
| **worker_example.py** | Comprehensive patterns (None returns, TaskInProgress) | `python examples/worker_example.py` |
| **fastapi_worker_service.py** | FastAPI exposing a workflow as an API (+ workers) | `uvicorn examples.fastapi_worker_service:app --port 8081 --workers 1` |
| **worker_configuration_example.py** | Hierarchical configuration (env vars) | `python examples/worker_configuration_example.py` |
| **task_context_example.py** | Task context (logs, poll_count, task_id) | `python examples/task_context_example.py` |
| **task_workers.py** | Task worker patterns with dataclasses | `python examples/task_workers.py` |
@@ -416,4 +417,4 @@ export conductor.worker.all.thread_count=20
---

**Repository**: https://github.com/conductor-oss/conductor-python
**License**: Apache 2.0
**License**: Apache 2.0
213 changes: 213 additions & 0 deletions examples/fastapi_worker_service.py
@@ -0,0 +1,213 @@
"""
FastAPI + Conductor workers in one process.

Install (example-only deps):
pip install fastapi uvicorn

Run (single web worker; TaskHandler will spawn one process per Conductor worker):
export CONDUCTOR_SERVER_URL="http://localhost:8080/api"
export CONDUCTOR_AUTH_KEY="..."
export CONDUCTOR_AUTH_SECRET="..."
uvicorn examples.fastapi_worker_service:app --host 0.0.0.0 --port 8081 --workers 1

Trigger the workflow via API (waits up to 10s for completion):
curl -s -X POST http://localhost:8081/v1/hello \\
-H 'content-type: application/json' \\
-d '{"name":"Ada","a":2,"b":3}' | jq .

Notes:
- Do NOT run uvicorn with multiple web workers unless you explicitly want multiple independent TaskHandlers polling.
- TaskHandler supervision is enabled by default (monitor + restart worker subprocesses).
"""

from __future__ import annotations

import os
from contextlib import asynccontextmanager
from typing import Optional

from fastapi import FastAPI
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field

from conductor.client.automator.task_handler import TaskHandler
from conductor.client.configuration.configuration import Configuration
from conductor.client.context.task_context import get_task_context
from conductor.client.orkes_clients import OrkesClients
from conductor.client.worker.worker_task import worker_task
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.workflow.executor.workflow_executor import WorkflowExecutor


# ---------------------------------------------------------------------------
# Example worker(s)
# ---------------------------------------------------------------------------

@worker_task(
task_definition_name="fastapi_normalize_name",
poll_interval_millis=100,
register_task_def=True,
overwrite_task_def=False,
)
def normalize_name(name: str) -> str:
# This shows how to access task context safely.
_ = get_task_context()
return name.strip().title()


@worker_task(
task_definition_name="fastapi_add_numbers",
poll_interval_millis=100,
register_task_def=True,
overwrite_task_def=False,
)
def add_numbers(a: int, b: int) -> int:
_ = get_task_context()
return a + b


@worker_task(
task_definition_name="fastapi_build_message",
poll_interval_millis=100,
register_task_def=True,
overwrite_task_def=False,
)
def build_message(normalized_name: str, total: int) -> dict:
ctx = get_task_context()
return {
"message": f"Hello {normalized_name}! {total=}",
"normalized_name": normalized_name,
"total": total,
"task_id": ctx.get_task_id(),
"workflow_id": ctx.get_workflow_instance_id(),
}


def _build_hello_workflow(executor: WorkflowExecutor) -> ConductorWorkflow:
workflow = ConductorWorkflow(executor=executor, name="fastapi_hello_workflow", version=1)

t1 = normalize_name(task_ref_name="normalize_name_ref", name=workflow.input("name"))
t2 = add_numbers(task_ref_name="add_numbers_ref", a=workflow.input("a"), b=workflow.input("b"))
t3 = build_message(
task_ref_name="build_message_ref",
normalized_name=t1.output("result"),
total=t2.output("result"),
)

workflow >> t1 >> t2 >> t3

workflow.output_parameters(
output_parameters={
"message": t3.output("message"),
"normalized_name": t3.output("normalized_name"),
"total": t3.output("total"),
}
)

return workflow


class HelloRequest(BaseModel):
name: str = Field(default="World", description="Name to greet")
a: int = Field(default=1, description="First number")
b: int = Field(default=2, description="Second number")


# ---------------------------------------------------------------------------
# FastAPI app + TaskHandler lifecycle
# ---------------------------------------------------------------------------

task_handler: Optional[TaskHandler] = None
workflow_executor: Optional[WorkflowExecutor] = None
api_config: Optional[Configuration] = None


@asynccontextmanager
async def lifespan(app: FastAPI):
global task_handler, workflow_executor, api_config

api_config = Configuration()
clients = OrkesClients(configuration=api_config)
workflow_executor = clients.get_workflow_executor()

# scan_for_annotated_workers=True will pick up @worker_task functions in this module.
task_handler = TaskHandler(
workers=[],
configuration=api_config,
scan_for_annotated_workers=True,
# Defaults are already True, but keeping these explicit in the example:
monitor_processes=True,
restart_on_failure=True,
)
task_handler.start_processes()

try:
yield
finally:
if task_handler is not None:
task_handler.stop_processes()
task_handler = None
workflow_executor = None
api_config = None


app = FastAPI(lifespan=lifespan)


@app.get("/healthcheck")
def healthcheck():
# 503 if worker processes aren't healthy; useful for container orchestrators.
if task_handler is None:
return JSONResponse({"ok": False, "detail": "workers_not_started"}, status_code=503)

ok = task_handler.is_healthy()
payload = {
"ok": ok,
"workers": task_handler.get_worker_process_status(),
}
return JSONResponse(payload, status_code=200 if ok else 503)


@app.post("/v1/hello")
def hello(req: HelloRequest):
"""
Expose a Conductor workflow as an API:
- Builds an inline workflow definition with 3 SIMPLE tasks
- Starts it and waits up to 10 seconds for completion
- Returns workflow output as the HTTP response
"""
if task_handler is None or workflow_executor is None or api_config is None:
return JSONResponse({"ok": False, "detail": "service_not_ready"}, status_code=503)
if not task_handler.is_healthy():
return JSONResponse(
{"ok": False, "detail": "workers_unhealthy", "workers": task_handler.get_worker_process_status()},
status_code=503,
)

workflow = _build_hello_workflow(executor=workflow_executor)
payload = req.model_dump() if hasattr(req, "model_dump") else req.dict() # pydantic v2/v1

try:
run = workflow.execute(workflow_input=payload, wait_for_seconds=10)
except Exception as e:
return JSONResponse({"ok": False, "detail": "workflow_start_failed", "error": str(e)}, status_code=502)

response = {
"ok": run.status == "COMPLETED",
"workflow_id": run.workflow_id,
"status": run.status,
"output": run.output,
"ui_url": f"{api_config.ui_host}/execution/{run.workflow_id}",
}
return JSONResponse(response, status_code=200 if run.status == "COMPLETED" else 202)


if __name__ == "__main__":
import uvicorn

uvicorn.run(
"examples.fastapi_worker_service:app",
host="0.0.0.0",
port=int(os.getenv("PORT", "8081")),
workers=1,
)
12 changes: 7 additions & 5 deletions src/conductor/client/automator/json_schema_generator.py
@@ -188,11 +188,13 @@ def _type_to_json_schema(type_hint, strict_schema: bool = False) -> Optional[Dic
if len(non_none_args) == 1:
# Optional[T] case
inner_schema = _type_to_json_schema(non_none_args[0], strict_schema)
if inner_schema:
# For optional, we could use oneOf or just mark as nullable
# Using nullable for simplicity
inner_schema['nullable'] = True
return inner_schema
if inner_schema is not None:
# Draft-07 JSON Schema does not support OpenAPI's `nullable`.
# Represent Optional[T] as a union with null.
if inner_schema == {}:
# "Any" already includes null, so keep it minimal.
return inner_schema
return {"anyOf": [inner_schema, {"type": "null"}]}
# Multiple non-None types in Union - too complex
return None

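
With this change, `Optional[T]` hints are rendered as a Draft-07 union instead of OpenAPI's `nullable` flag. An illustrative sketch of the expected shape, assuming the inner type `int` maps to `{"type": "integer"}`:

```python
# Illustrative only; assumes _type_to_json_schema(int) yields {"type": "integer"}.
# Previously, Optional[int] was marked with OpenAPI's `nullable`:
#   {"type": "integer", "nullable": True}
# With the union representation it becomes:
expected_optional_int_schema = {"anyOf": [{"type": "integer"}, {"type": "null"}]}
```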