diff --git a/src/code/agent/services/gateway/handlers/prompt_handler.py b/src/code/agent/services/gateway/handlers/prompt_handler.py
index 0ca51be4..06f6caaa 100644
--- a/src/code/agent/services/gateway/handlers/prompt_handler.py
+++ b/src/code/agent/services/gateway/handlers/prompt_handler.py
@@ -8,6 +8,7 @@
 import constants
 from utils.logger import log
 from exceptions.exceptions import TaskError, InternalError
+from services.metrics.task_event_emitter import TaskEventEmitter
 
 
 class PromptHandler:
@@ -45,12 +46,23 @@ def handle_post_request(self):
                 }
             }), 400
 
+        # Inject user_id into extra_data
         user_id = getattr(g, 'user_id', 'default')
         if 'extra_data' not in request_data:
             request_data['extra_data'] = {}
 
         request_data['extra_data'][constants.HEADER_FUNART_COMFY_USERID.lower()] = user_id
 
+        # Extract task_id (for event tracing; same pattern as ServerlessHandler)
+        # NOTE(review): task_id is rebound by forward_to_gpu_async() below; confirm both ids
+        # match, otherwise the Submitted event cannot be joined with the GPU-side events.
+        task_id = (request.headers.get(constants.HEADER_FC_ASYNC_TASK_ID) or
+                   request.headers.get(constants.HEADER_FC_REQUEST_ID) or
+                   'unknown')
+
+        # Entry sentinel event
+        TaskEventEmitter.emit_submitted(task_id, "Async")
+
         try:
             # 转发给GPU
             task_id, result = self.task_manager.forward_to_gpu_async(
@@ -58,34 +70,27 @@ def handle_post_request(self):
                 client_id=client_id
             )
 
-            # 成功:返回ComfyUI格式
+            # Success: return the ComfyUI-format response (the Completed event is emitted on the GPU side)
             return jsonify({
                 "prompt_id": task_id,
                 "number": 1,
                 "node_errors": {}
             })
 
-        except TaskError as e:
+        except (TaskError, InternalError) as e:
+            # Unified fallback: submission failed on the CPU side
             log("ERROR", f"[PromptHandler] Task error: {e.message}")
-            # 返回ComfyUI格式的错误
-            return jsonify({
-                "error": {
-                    "type": e.error_code,
-                    "message": e.message
-                }
-            }), e.code
-
-        except InternalError as e:
-            log("ERROR", f"[PromptHandler] Internal error: {e.message}")
+            TaskEventEmitter.emit_completed(task_id, "failed", error_type="submit_failed", error_message=e.message)
             return jsonify({
                 "error": {
-                    "type": "internal_error",
+                    "type": e.error_code if isinstance(e, TaskError) else "internal_error",
                     "message": e.message
                 }
             }), e.code
 
         except Exception as e:
             log("ERROR", f"[PromptHandler] Unexpected error: {str(e)}\n{traceback.format_exc()}")
+            TaskEventEmitter.emit_completed(task_id, "failed", error_type="submit_failed", error_message=str(e))
             return jsonify({
                 "error": {
                     "type": "internal_error",
diff --git a/src/code/agent/services/gateway/handlers/serverless_handler.py b/src/code/agent/services/gateway/handlers/serverless_handler.py
index b595abd7..6b84ed71 100644
--- a/src/code/agent/services/gateway/handlers/serverless_handler.py
+++ b/src/code/agent/services/gateway/handlers/serverless_handler.py
@@ -8,6 +8,7 @@
 
 import constants
 from utils.logger import log
+from services.metrics.task_event_emitter import TaskEventEmitter
 
 
 class ServerlessHandler:
@@ -79,6 +80,8 @@ def handle_post_request(self):
 
             log("INFO", f"[ServerlessHandler][{task_id}] Forwarding {'async' if is_async else 'sync'} request")
 
+            TaskEventEmitter.emit_submitted(task_id, "Async" if is_async else "Sync")
+
             resp = requests.post(
                 gpu_url,
                 json=body,
@@ -96,6 +99,7 @@ def handle_post_request(self):
 
         except Exception as e:
             log("ERROR", f"[ServerlessHandler][{task_id}] Internal error: {e}\n{traceback.format_exc()}")
+            TaskEventEmitter.emit_completed(task_id, "failed", error_type="submit_failed", error_message=str(e))
             return jsonify({
                 "type": "error",
                 "error_code": constants.ERROR_CODE.INTERNAL_ERROR.value,
diff --git a/src/code/agent/services/metrics/__init__.py b/src/code/agent/services/metrics/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/src/code/agent/services/metrics/task_event_emitter.py b/src/code/agent/services/metrics/task_event_emitter.py
new file mode 100644
index 00000000..171991d2
--- /dev/null
+++ b/src/code/agent/services/metrics/task_event_emitter.py
@@ -0,0 +1,114 @@
+"""
+TaskEventEmitter
+
+Emits structured JSON log lines at key stages of the task lifecycle (via log()).
+Per the original design note, FC collects stdout into SLS for tracing/analysis.
+
+All events share metric_type "art_task_event" and are told apart by "event":
+  - Submitted: CPU side, when the task is submitted (forwarded to the GPU)
+  - Executing: GPU side, when run() receives the request
+  - Completed: GPU side on success/failure, or CPU side when submission fails
+"""
+import json
+import time
+import traceback
+from typing import Optional
+
+import constants
+from utils.logger import log
+
+_METRIC_TYPE = "art_task_event"
+
+
+class TaskEventEmitter:
+    """Task lifecycle event reporting (static methods only, holds no state)."""
+
+    @staticmethod
+    def emit_submitted(task_id: str, invocation_type: str) -> None:
+        """
+        Emit the task-submitted event (CPU side).
+
+        Args:
+            task_id: Task ID.
+            invocation_type: Invocation type, "Async" or "Sync".
+        """
+        _emit({
+            "metric_type": _METRIC_TYPE,
+            "task_id": task_id,
+            "event": "Submitted",
+            "invocation_type": invocation_type,
+            "timestamp_ms": _now_ms(),
+        })
+
+    @staticmethod
+    def emit_executing(task_id: str) -> int:
+        """
+        Emit the task-started event (GPU side).
+
+        Args:
+            task_id: Task ID.
+
+        Returns:
+            int: Current timestamp (ms), passed to emit_completed to compute duration_ms.
+        """
+        ts = _now_ms()
+        _emit({
+            "metric_type": _METRIC_TYPE,
+            "task_id": task_id,
+            "event": "Executing",
+            "timestamp_ms": ts,
+        })
+        return ts
+
+    @staticmethod
+    def emit_completed(
+        task_id: str,
+        status: str,
+        executing_timestamp_ms: Optional[int] = None,
+        error_type: Optional[str] = None,
+        error_message: Optional[str] = None,
+    ) -> None:
+        """
+        Emit the task-completed event (GPU side, or CPU side when forwarding fails).
+
+        Args:
+            task_id: Task ID.
+            status: "succeeded" or "failed".
+            executing_timestamp_ms: Timestamp returned by emit_executing, used to compute duration_ms.
+            error_type: Error type (None on success).
+            error_message: Error message (None on success).
+        """
+        ts = _now_ms()
+        duration_ms = (ts - executing_timestamp_ms) if executing_timestamp_ms is not None else None
+
+        _emit({
+            "metric_type": _METRIC_TYPE,
+            "task_id": task_id,
+            "event": "Completed",
+            "status": status,
+            "timestamp_ms": ts,
+            "duration_ms": duration_ms,
+            "error_type": error_type,
+            "error_message": error_message,
+            "instance_id": constants.INSTANCE_ID,
+        })
+
+
+def _now_ms() -> int:
+    """Return the current wall-clock time in milliseconds since the epoch."""
+    return int(time.time() * 1000)
+
+
+def _emit(event: dict) -> None:
+    """Log the event as single-line JSON; best-effort, never raises."""
+    try:
+        # default=str keeps events whose fields are not JSON-native (e.g. an enum error code).
+        log("INFO", json.dumps(event, ensure_ascii=False, default=str))
+        # Extra empty log line separates the event from ComfyUI's own logs for parsing.
+        log("INFO", "")
+    except Exception:
+        try:
+            log("WARNING", f"[TaskEventEmitter] Failed to emit event: {traceback.format_exc()}")
+        except Exception:
+            # The logger itself is failing; telemetry must never break the task.
+            pass
diff --git a/src/code/agent/services/serverlessapi/serverless_api_service.py b/src/code/agent/services/serverlessapi/serverless_api_service.py
index 717c6ff6..10254c43 100644
--- a/src/code/agent/services/serverlessapi/serverless_api_service.py
+++ b/src/code/agent/services/serverlessapi/serverless_api_service.py
@@ -19,6 +19,7 @@
 from uuid import uuid4, UUID
 from flask import request
 from services.serverlessapi.input_cleaner import wake as wake_input_cleaner
+from services.metrics.task_event_emitter import TaskEventEmitter
 
 
 class ComfyUIException(Exception):
@@ -661,6 +662,9 @@ def run(
         """
 
         try:
+            executing_timestamp_ms = TaskEventEmitter.emit_executing(task_id)
+            log("INFO", f"[ServerlessAPI] Starting execution with task_id: {task_id}")
+
             # 验证请求体必须是 dict
             if not isinstance(request_body, dict):
                 raise ComfyUIException(
@@ -884,6 +888,7 @@ def on_message(ws: websocket.WebSocket, message: str):
             log("DEBUG", f"saving result to store for task_id: {task_id}")
             self.put_status_to_store(task_id, json.dumps(result))
             log("INFO", f"finished running prompt: {prompt_id}")
+            TaskEventEmitter.emit_completed(task_id, "succeeded", executing_timestamp_ms=executing_timestamp_ms)
             return result
         except ComfyUIException as e:
             self.put_status_to_store(
@@ -891,6 +896,7 @@ def on_message(ws: websocket.WebSocket, message: str):
                 json.dumps(e.response()),
             )
 
+            TaskEventEmitter.emit_completed(task_id, "failed", executing_timestamp_ms=executing_timestamp_ms, error_type=e.code, error_message=str(e))
             raise e
         except Exception as e:
             self.put_status_to_store(
@@ -904,4 +910,5 @@ def on_message(ws: websocket.WebSocket, message: str):
                 ),
             )
 
+            TaskEventEmitter.emit_completed(task_id, "failed", executing_timestamp_ms=executing_timestamp_ms, error_type=constants.ERROR_CODE.UNCLASSIFY.value, error_message=str(e))
             raise e