Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 16 additions & 13 deletions src/code/agent/services/gateway/handlers/prompt_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import constants
from utils.logger import log
from exceptions.exceptions import TaskError, InternalError
from services.metrics.task_event_emitter import TaskEventEmitter


class PromptHandler:
Expand Down Expand Up @@ -45,47 +46,49 @@ def handle_post_request(self):
}
}), 400


# 注入 user_id 到 extra_data
user_id = getattr(g, 'user_id', 'default')
if 'extra_data' not in request_data:
request_data['extra_data'] = {}
request_data['extra_data'][constants.HEADER_FUNART_COMFY_USERID.lower()] = user_id

# 提取 task_id(用于事件追踪,同 ServerlessHandler 的模式)
task_id = (request.headers.get(constants.HEADER_FC_ASYNC_TASK_ID) or
request.headers.get(constants.HEADER_FC_REQUEST_ID) or
'unknown')

# 入口哨兵
TaskEventEmitter.emit_submitted(task_id, "Async")

try:
# 转发给GPU
task_id, result = self.task_manager.forward_to_gpu_async(
request_body=request_data,
client_id=client_id
)

# 成功:返回ComfyUI格式
# 成功:返回ComfyUI格式(completed 由 GPU 侧闭环)
return jsonify({
"prompt_id": task_id,
"number": 1,
"node_errors": {}
})

except TaskError as e:
except (TaskError, InternalError) as e:
# 统一兜底:CPU 侧提交失败
log("ERROR", f"[PromptHandler] Task error: {e.message}")
# 返回ComfyUI格式的错误
return jsonify({
"error": {
"type": e.error_code,
"message": e.message
}
}), e.code

except InternalError as e:
log("ERROR", f"[PromptHandler] Internal error: {e.message}")
TaskEventEmitter.emit_completed(task_id, "failed", error_type="submit_failed", error_message=e.message)
return jsonify({
"error": {
"type": "internal_error",
"type": e.error_code if hasattr(e, 'error_code') else "internal_error",
"message": e.message
}
}), e.code

except Exception as e:
log("ERROR", f"[PromptHandler] Unexpected error: {str(e)}\n{traceback.format_exc()}")
TaskEventEmitter.emit_completed(task_id, "failed", error_type="submit_failed", error_message=str(e))
return jsonify({
"error": {
"type": "internal_error",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import constants
from utils.logger import log
from services.metrics.task_event_emitter import TaskEventEmitter


class ServerlessHandler:
Expand Down Expand Up @@ -79,6 +80,8 @@ def handle_post_request(self):

log("INFO", f"[ServerlessHandler][{task_id}] Forwarding {'async' if is_async else 'sync'} request")

TaskEventEmitter.emit_submitted(task_id, "Async" if is_async else "Sync")

resp = requests.post(
gpu_url,
json=body,
Expand All @@ -96,6 +99,7 @@ def handle_post_request(self):

except Exception as e:
log("ERROR", f"[ServerlessHandler][{task_id}] Internal error: {e}\n{traceback.format_exc()}")
TaskEventEmitter.emit_completed(task_id, "failed", error_type="submit_failed", error_message=str(e))
return jsonify({
"type": "error",
"error_code": constants.ERROR_CODE.INTERNAL_ERROR.value,
Expand Down
Empty file.
108 changes: 108 additions & 0 deletions src/code/agent/services/metrics/task_event_emitter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
"""
TaskEventEmitter

在任务生命周期的关键阶段输出结构化 JSON 日志到 stdout,
由 FC 平台自动采集到 SLS,用于任务追踪和性能分析。

三条事件共用 metric_type: "art_task_event",通过 event 字段区分:
- submitted: CPU 侧,任务提交(转发 GPU)时
- executing: GPU 侧,run() 入口收到请求时
- completed: GPU 侧,任务完成或失败时
"""
import json
import time
import traceback
from typing import Optional

import constants
from utils.logger import log

_METRIC_TYPE = "art_task_event"


class TaskEventEmitter:
"""任务生命周期事件上报(纯静态方法,无状态,线程安全)"""

@staticmethod
def emit_submitted(task_id: str, invocation_type: str) -> None:
"""
任务提交事件(CPU 侧)

Args:
task_id: 任务 ID
invocation_type: 调用类型,"Async" 或 "Sync"
"""
_emit({
"metric_type": _METRIC_TYPE,
"task_id": task_id,
"event": "Submitted",
"invocation_type": invocation_type,
"timestamp_ms": _now_ms(),
})

@staticmethod
def emit_executing(task_id: str) -> int:
"""
任务开始执行事件(GPU 侧)

Args:
task_id: 任务 ID

Returns:
int: 当前时间戳(毫秒),供 emit_completed 计算 duration_ms
"""
ts = _now_ms()
_emit({
"metric_type": _METRIC_TYPE,
"task_id": task_id,
"event": "Executing",
"timestamp_ms": ts,
})
return ts

@staticmethod
def emit_completed(
task_id: str,
status: str,
executing_timestamp_ms: Optional[int] = None,
error_type: Optional[str] = None,
error_message: Optional[str] = None,
) -> None:
"""
任务完成事件(GPU 侧或 CPU 侧转发失败时)

Args:
task_id: 任务 ID
status: "succeeded" 或 "failed"
executing_timestamp_ms: emit_executing 返回的时间戳,用于计算 duration_ms
error_type: 错误类型(成功时为 None)
error_message: 错误消息(成功时为 None)
"""
ts = _now_ms()
duration_ms = (ts - executing_timestamp_ms) if executing_timestamp_ms is not None else None

_emit({
"metric_type": _METRIC_TYPE,
"task_id": task_id,
"event": "Completed",
"status": status,
"timestamp_ms": ts,
"duration_ms": duration_ms,
"error_type": error_type,
"error_message": error_message,
"instance_id": constants.INSTANCE_ID,
})


def _now_ms() -> int:
return int(time.time() * 1000)


def _emit(event: dict) -> None:
"""输出单行 JSON 日志,带时间戳,绝不抛异常。"""
try:
log("INFO", json.dumps(event, ensure_ascii=False))
# 新增一行日志,用于分割ComfyUI本身的日志,避免影响日志解析
log("INFO", "")
except Exception:
log("WARNING", f"[TaskEventEmitter] Failed to emit event: {traceback.format_exc()}")
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from uuid import uuid4, UUID
from flask import request
from services.serverlessapi.input_cleaner import wake as wake_input_cleaner
from services.metrics.task_event_emitter import TaskEventEmitter


class ComfyUIException(Exception):
Expand Down Expand Up @@ -661,6 +662,9 @@ def run(
"""

try:
executing_timestamp_ms = TaskEventEmitter.emit_executing(task_id)
log("INFO", f"[ServerlessAPI] Starting execution with task_id: {task_id}")

# 验证请求体必须是 dict
if not isinstance(request_body, dict):
raise ComfyUIException(
Expand Down Expand Up @@ -884,13 +888,15 @@ def on_message(ws: websocket.WebSocket, message: str):
log("DEBUG", f"saving result to store for task_id: {task_id}")
self.put_status_to_store(task_id, json.dumps(result))
log("INFO", f"finished running prompt: {prompt_id}")
TaskEventEmitter.emit_completed(task_id, "succeeded", executing_timestamp_ms=executing_timestamp_ms)
return result
except ComfyUIException as e:
self.put_status_to_store(
task_id,
json.dumps(e.response()),
)

TaskEventEmitter.emit_completed(task_id, "failed", executing_timestamp_ms=executing_timestamp_ms, error_type=e.code, error_message=str(e))
raise e
except Exception as e:
self.put_status_to_store(
Expand All @@ -904,4 +910,5 @@ def on_message(ws: websocket.WebSocket, message: str):
),
)

TaskEventEmitter.emit_completed(task_id, "failed", executing_timestamp_ms=executing_timestamp_ms, error_type=constants.ERROR_CODE.UNCLASSIFY.value, error_message=str(e))
raise e