Skip to content

Commit 413d6f7

Browse files
committed
Adding all changes before testing
1 parent 0120bb5 commit 413d6f7

File tree

7 files changed

+83
-35
lines changed

7 files changed

+83
-35
lines changed

examples/tutorials/10_agentic/10_temporal/000_hello_acp/project/run_worker.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from workflow import At000HelloAcpWorkflow
1010

1111

12+
1213
environment_variables = EnvironmentVariables.refresh()
1314

1415
logger = make_logger(__name__)

examples/tutorials/10_agentic/10_temporal/030_custom_activities/project/custom_activites.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,18 @@ class ProcessBatchEventsActivityParams(BaseModel):
1515
events: List[Any]
1616
batch_number: int
1717

18+
1819
REPORT_PROGRESS_ACTIVITY = "report_progress"
1920
class ReportProgressActivityParams(BaseModel):
2021
num_batches_processed: int
2122
num_batches_failed: int
2223
num_batches_running: int
24+
task_id: str
25+
26+
27+
COMPLETE_WORKFLOW_ACTIVITY = "complete_workflow"
28+
class CompleteWorkflowActivityParams(BaseModel):
29+
task_id: str
2330

2431

2532
class CustomActivities:
@@ -82,9 +89,22 @@ async def report_progress(self, params: ReportProgressActivityParams) -> None:
8289
logger.info(f"📊 Progress Update - num_batches_processed: {params.num_batches_processed}, num_batches_failed: {params.num_batches_failed}, num_batches_running: {params.num_batches_running}")
8390

8491
await adk.messages.create(
85-
task_id=task_id,
92+
task_id=params.task_id,
8693
content=TextContent(
8794
author="agent",
8895
content=f"📊 Progress Update - num_batches_processed: {params.num_batches_processed}, num_batches_failed: {params.num_batches_failed}, num_batches_running: {params.num_batches_running}",
8996
),
9097
)
98+
99+
@activity.defn(name=COMPLETE_WORKFLOW_ACTIVITY)
100+
async def complete_workflow(self, params: CompleteWorkflowActivityParams) -> None:
101+
"""
102+
This activity will complete the workflow.
103+
104+
Typically here you may do anything like:
105+
- Send a final email to the user
106+
- Send a final message to the user
107+
- Update a job status in a database to completed
108+
"""
109+
logger.info(f"🎉 Workflow Complete! Task ID: {params.task_id}")
110+

examples/tutorials/10_agentic/10_temporal/030_custom_activities/project/run_worker.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from agentex.lib.environment_variables import EnvironmentVariables
88

99
from project.workflow import At030CustomActivitiesWorkflow
10+
from project.custom_activites import CustomActivities
1011

1112

1213
environment_variables = EnvironmentVariables.refresh()
@@ -27,8 +28,17 @@ async def main():
2728
task_queue=task_queue_name,
2829
)
2930

31+
agentex_activities = get_all_activities()
32+
33+
custom_activities_use_case = CustomActivities()
34+
all_activites = [
35+
custom_activities_use_case.report_progress,
36+
custom_activities_use_case.process_batch_events,
37+
*agentex_activities,
38+
]
39+
3040
await worker.run(
31-
activities=get_all_activities(),
41+
activities=all_activites,
3242
workflow=At030CustomActivitiesWorkflow,
3343
)
3444

examples/tutorials/10_agentic/10_temporal/030_custom_activities/project/workflow.py

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22
from typing import List, Any, override
33

44
from temporalio import workflow
5-
from pydantic import BaseModel
5+
from temporalio.common import RetryPolicy
6+
from datetime import timedelta
67

78
from agentex.lib import adk
89
from agentex.lib.types.acp import CreateTaskParams, SendEventParams
@@ -15,7 +16,7 @@
1516

1617
from project.workflow_utils import BatchProcessingUtils
1718
from project.shared_models import StateModel, IncomingEventData
18-
19+
from project.custom_activites import REPORT_PROGRESS_ACTIVITY, ReportProgressActivityParams, COMPLETE_WORKFLOW_ACTIVITY, CompleteWorkflowActivityParams
1920

2021
environment_variables = EnvironmentVariables.refresh()
2122

@@ -97,7 +98,7 @@ async def on_task_create(self, params: CreateTaskParams) -> None:
9798
task_id=params.task.id,
9899
content=TextContent(
99100
author="agent",
100-
content=f"🚀 Starting batch processing! I'll collect events into batches of {self._batch_size} and process them using custom activities.",
101+
content=f"🚀 Starting batch processing! I'll collect events into batches of {self._batch_size} and process them using custom activities. I'll also report progress you as I go..",
101102
),
102103
)
103104

@@ -172,7 +173,39 @@ async def on_task_create(self, params: CreateTaskParams) -> None:
172173

173174
# Wait for all remaining tasks to complete, with real-time progress updates
174175
await BatchProcessingUtils.wait_for_remaining_tasks(self._processing_tasks, self._state, params.task.id)
175-
176-
# Final summary with complete statistics
177-
await BatchProcessingUtils.send_final_summary(self._state, params.task.id)
178-
return
176+
await workflow.execute_activity(
177+
REPORT_PROGRESS_ACTIVITY,
178+
ReportProgressActivityParams(
179+
num_batches_processed=self._state.num_batches_processed,
180+
num_batches_failed=self._state.num_batches_failed,
181+
num_batches_running=0,
182+
task_id=params.task.id
183+
),
184+
start_to_close_timeout=timedelta(minutes=1),
185+
retry_policy=RetryPolicy(maximum_attempts=3)
186+
)
187+
188+
final_summary = f"✅ Workflow Complete! Final Summary:\n"
189+
f"• Batches completed successfully: {self._state.num_batches_processed}\n"
190+
f"• Batches failed: {self._state.num_batches_failed}\n"
191+
f"• Total events processed: {self._state.total_events_processed}\n"
192+
f"• Events dropped (queue full): {self._state.total_events_dropped}\n"
193+
f"📝 Tutorial completed - you learned how to use asyncio.create_task() with Temporal custom activities!"
194+
195+
await adk.messages.create(
196+
task_id=task_id,
197+
content=TextContent(
198+
author="agent",
199+
content=final_summary
200+
),
201+
)
202+
203+
await workflow.execute_activity(
204+
COMPLETE_WORKFLOW_ACTIVITY,
205+
CompleteWorkflowActivityParams(
206+
task_id=params.task.id
207+
),
208+
start_to_close_timeout=timedelta(minutes=1),
209+
retry_policy=RetryPolicy(maximum_attempts=3)
210+
)
211+

examples/tutorials/10_agentic/10_temporal/030_custom_activities/project/workflow_utils.py

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,8 @@ async def update_progress(processing_tasks: List[asyncio.Task[Any]], state: Stat
123123
ReportProgressActivityParams(
124124
num_batches_processed=state.num_batches_processed,
125125
num_batches_failed=state.num_batches_failed,
126-
num_batches_running=len(still_running)
126+
num_batches_running=len(still_running),
127+
task_id=task_id,
127128
),
128129
start_to_close_timeout=timedelta(minutes=1),
129130
retry_policy=RetryPolicy(maximum_attempts=3)
@@ -197,22 +198,4 @@ async def wait_for_remaining_tasks(processing_tasks: List[asyncio.Task[Any]], st
197198
except asyncio.TimeoutError:
198199
# Some tasks still running, update progress and continue waiting
199200
processing_tasks[:] = await BatchProcessingUtils.update_progress(processing_tasks, state, task_id)
200-
continue
201-
202-
@staticmethod
203-
async def send_final_summary(state: Any, task_id: str) -> None:
204-
"""
205-
Send the final workflow completion summary with statistics.
206-
"""
207-
await adk.messages.create(
208-
task_id=task_id,
209-
content=TextContent(
210-
author="agent",
211-
content=f"✅ Workflow Complete! Final Summary:\n"
212-
f"• Batches completed successfully: {state.num_batches_processed}\n"
213-
f"• Batches failed: {state.num_batches_failed}\n"
214-
f"• Total events processed: {state.total_events_processed}\n"
215-
f"• Events dropped (queue full): {state.total_events_dropped}\n"
216-
f"📝 Tutorial completed - you learned how to use asyncio.create_task() with Temporal custom activities!",
217-
),
218-
)
201+
continue

examples/tutorials/10_agentic/10_temporal/030_custom_activities/pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ dependencies = [
1515

1616
[project.optional-dependencies]
1717
dev = [
18+
"jupyter",
1819
"pytest",
1920
"black",
2021
"isort",
@@ -31,4 +32,4 @@ target-version = ['py312']
3132

3233
[tool.isort]
3334
profile = "black"
34-
line_length = 88
35+
line_length = 88

src/agentex/lib/cli/handlers/run_handlers.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ async def cleanup_processes(self):
8686

8787

8888
async def start_temporal_worker_with_reload(
89-
worker_path: Path, env: dict[str, str], process_manager: ProcessManager
89+
worker_path: Path, env: dict[str, str], process_manager: ProcessManager, manifest_dir: Path
9090
) -> asyncio.Task[None]:
9191
"""Start temporal worker with auto-reload using watchfiles"""
9292

@@ -143,7 +143,7 @@ async def start_worker() -> asyncio.subprocess.Process:
143143
except asyncio.CancelledError:
144144
pass
145145

146-
current_process = await start_temporal_worker(worker_path, env)
146+
current_process = await start_temporal_worker(worker_path, env, manifest_dir)
147147
process_manager.add_process(current_process)
148148
console.print("[green]Temporal worker started[/green]")
149149
return current_process
@@ -222,7 +222,7 @@ async def start_acp_server(
222222

223223

224224
async def start_temporal_worker(
225-
worker_path: Path, env: dict[str, str]
225+
worker_path: Path, env: dict[str, str], manifest_dir: Path
226226
) -> asyncio.subprocess.Process:
227227
"""Start the temporal worker process"""
228228
cmd = [sys.executable, "-m", "run_worker"]
@@ -231,7 +231,7 @@ async def start_temporal_worker(
231231

232232
return await asyncio.create_subprocess_exec(
233233
*cmd,
234-
cwd=worker_path.parent, # Use worker directory as CWD for imports to work
234+
cwd=manifest_dir, # Use worker directory as CWD for imports to work
235235
env=env,
236236
stdout=asyncio.subprocess.PIPE,
237237
stderr=asyncio.subprocess.STDOUT,
@@ -320,7 +320,7 @@ async def run_agent(manifest_path: str, debug_config: "DebugConfig | None" = Non
320320
worker_task = asyncio.create_task(stream_process_output(worker_process, "WORKER"))
321321
else:
322322
# Normal mode with auto-reload
323-
worker_task = await start_temporal_worker_with_reload(file_paths["worker"], agent_env, process_manager)
323+
worker_task = await start_temporal_worker_with_reload(file_paths["worker"], agent_env, process_manager, manifest_dir)
324324
tasks.append(worker_task)
325325

326326
console.print(

0 commit comments

Comments
 (0)