From 64f58f18584c4bdb0ab20adc052a1d9c2433ac4b Mon Sep 17 00:00:00 2001 From: v-kkhuang <420895376@qq.com> Date: Wed, 11 Mar 2026 21:49:09 +0800 Subject: [PATCH] =?UTF-8?q?#AI=20commit#=20perf:=20=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E5=BC=95=E6=93=8E=E5=88=9B=E5=BB=BA=E5=92=8C=E5=A4=8D=E7=94=A8?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E6=80=A7=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 在 AMUtils 中引入复用线程池,将线程数设置为 5 来提升指标更新效率 - 修改 DefaultEngineAskEngineService 中的条件逻辑结构,优化引擎复用节点的处理流程 - 将 DefaultEngineCreateService 中的日志级别从 info 提升到 warn 并添加异常堆栈信息 - 在 DefaultEngineCreateService 和 DefaultEngineReuseService 中添加空值检查来避免 NPE - 使用 tryCatch 包装指标更新逻辑以提高系统稳定性 - 从引擎复用过滤条件中移除节点状态解锁检查以提升复用率 --- .../DefaultEngineAskEngineService.scala | 69 +++++++++++-------- .../engine/DefaultEngineCreateService.scala | 38 ++++++---- .../engine/DefaultEngineReuseService.scala | 39 ++++++----- .../linkis/manager/am/utils/AMUtils.scala | 10 ++- 4 files changed, 96 insertions(+), 60 deletions(-) diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.scala index 06bffcc8cb..1f912b5f74 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.scala @@ -247,37 +247,50 @@ class DefaultEngineAskEngineService null } } - - val engineCreateRequest = new EngineCreateRequest - engineCreateRequest.setLabels(engineAskRequest.getLabels) - engineCreateRequest.setTimeout(engineAskRequest.getTimeOut) - engineCreateRequest.setUser(engineAskRequest.getUser) - engineCreateRequest.setProperties(engineAskRequest.getProperties) - engineCreateRequest.setCreateService(engineAskRequest.getCreateService) - - val createNode = engineCreateService.createEngine(engineCreateRequest, sender) - val timeout = - if (engineCreateRequest.getTimeout <= 0) { - AMConfiguration.ENGINE_START_MAX_TIME.getValue.toLong - } else engineCreateRequest.getTimeout - // UseEngine requires a timeout (useEngine 需要加上超时) - val createEngineNode = getEngineNodeManager.useEngine(createNode, timeout) - if (null == createEngineNode) { - throw new LinkisRetryException( - AMConstant.EM_ERROR_CODE, - s"create engine${createNode.getServiceInstance} success, but to use engine failed" - ) - } - logger.info( - s"Task: $taskId finished to ask engine for user ${engineAskRequest.getUser} by create node $createEngineNode" - ) - if (null != sender) { - sender.send(EngineCreateSuccess(engineAskAsyncId, createEngineNode)) + if (reuseNode != null) { logger.info( - s"Task: $taskId has sent EngineCreateSuccess($engineAskAsyncId, reuse=false) to Entrance." + s"Task: $taskId finished to ask engine for user ${engineAskRequest.getUser} by reuse node $reuseNode" ) + if (null != sender) { + sender.send(EngineCreateSuccess(engineAskAsyncId, reuseNode, true)) + logger.info( + s"Task: $taskId has sent EngineCreateSuccess($engineAskAsyncId, reuse=true) to Entrance." + ) + } else { + logger.warn(f"Task: $taskId will not send async using null sender.") + } } else { - logger.warn(s"Task: $taskId will not send async using null sender.") + val engineCreateRequest = new EngineCreateRequest + engineCreateRequest.setLabels(engineAskRequest.getLabels) + engineCreateRequest.setTimeout(engineAskRequest.getTimeOut) + engineCreateRequest.setUser(engineAskRequest.getUser) + engineCreateRequest.setProperties(engineAskRequest.getProperties) + engineCreateRequest.setCreateService(engineAskRequest.getCreateService) + + val createNode = engineCreateService.createEngine(engineCreateRequest, sender) + val timeout = + if (engineCreateRequest.getTimeout <= 0) { + AMConfiguration.ENGINE_START_MAX_TIME.getValue.toLong + } else engineCreateRequest.getTimeout + // UseEngine requires a timeout (useEngine 需要加上超时) + val createEngineNode = getEngineNodeManager.useEngine(createNode, timeout) + if (null == createEngineNode) { + throw new LinkisRetryException( + AMConstant.EM_ERROR_CODE, + s"create engine${createNode.getServiceInstance} success, but to use engine failed" + ) + } + logger.info( + s"Task: $taskId finished to ask engine for user ${engineAskRequest.getUser} by create node $createEngineNode" + ) + if (null != sender) { + sender.send(EngineCreateSuccess(engineAskAsyncId, createEngineNode)) + logger.info( + s"Task: $taskId has sent EngineCreateSuccess($engineAskAsyncId, reuse=false) to Entrance." + ) + } else { + logger.warn(s"Task: $taskId will not send async using null sender.") + } } } { Utils.tryAndWarn { diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala index 27a961b852..111bcb9e1c 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala @@ -236,10 +236,10 @@ class DefaultEngineCreateService val engineNode = Utils.tryCatch(getEMService().createEngine(engineBuildRequest, emNode)) { case t: Throwable => - logger.info(s"Failed to create ec($resourceTicketId) ask ecm ${emNode.getServiceInstance}") + logger.warn(s"Failed to create ec($resourceTicketId) ask ecm ${emNode.getServiceInstance}", t) val failedEcNode = getEngineNodeManager.getEngineNode(oldServiceInstance) if (null == failedEcNode) { - logger.info(s" engineConn does not exist in db: $oldServiceInstance ") + logger.warn(s" engineConn does not exist in db: $oldServiceInstance ") } else { failedEcNode.setLabels(nodeLabelService.getNodeLabels(oldServiceInstance)) failedEcNode.getLabels.addAll( @@ -289,18 +289,28 @@ class DefaultEngineCreateService s"Failed to update engineNode: ${t.getMessage}" ) } - if (Configuration.METRICS_INCREMENTAL_UPDATE_ENABLE.getValue) { - val emInstance = engineNode.getServiceInstance.getInstance - val ecmInstance = engineNode.getEMNode.getServiceInstance.getInstance - // 8. Update job history metrics after successful engine creation - 异步执行 - AMUtils.updateMetricsAsync( - taskId, - resourceTicketId, - emInstance, - ecmInstance, - null, - isReuse = false - ) + Utils.tryCatch { + if (Configuration.METRICS_INCREMENTAL_UPDATE_ENABLE.getValue) { + val emInstance = engineNode.getServiceInstance.getInstance + val ecmInstance = engineNode.getEMNode.getServiceInstance.getInstance + if ((null != emInstance) && (null != ecmInstance)) { + // 8. Update job history metrics after successful engine creation - 异步执行 + AMUtils.updateMetricsAsync( + taskId, + resourceTicketId, + emInstance, + ecmInstance, + null, + isReuse = false + ) + } else { + logger.info( + s"CreateEngine:Failed to update metrics for emInstance: $emInstance, ecmInstance: $ecmInstance" + ) + } + } + } { case e: Exception => + logger.error(s"Failed to update metrics for taskId: $taskId", e) } // 9. Add the Label of EngineConn, and add the Alias of engineConn val engineConnAliasLabel = labelBuilderFactory.createLabel(classOf[AliasServiceInstanceLabel]) diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineReuseService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineReuseService.scala index fb79c9e062..25179296ca 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineReuseService.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineReuseService.scala @@ -236,7 +236,6 @@ class DefaultEngineReuseService extends AbstractEngineService with EngineReuseSe StringUtils.isNotBlank(templateName) && AMConfiguration.EC_REUSE_WITH_TEMPLATE_RULE_ENABLE ) { engineScoreList = engineScoreList - .filter(engine => engine.getNodeStatus == NodeStatus.Unlock) .filter(engine => { val oldTemplateName: String = getValueByKeyFromProps(confTemplateNameKey, parseParamsToMap(engine.getParams)) @@ -276,7 +275,6 @@ class DefaultEngineReuseService extends AbstractEngineService with EngineReuseSe // 过滤掉资源不满足的引擎 engineScoreList = engineScoreList - .filter(engine => engine.getNodeStatus == NodeStatus.Unlock) .filter(engine => { val enginePythonVersion: String = getPythonVersion(parseParamsToMap(engine.getParams)) var pythonVersionMatch: Boolean = true @@ -383,20 +381,29 @@ class DefaultEngineReuseService extends AbstractEngineService with EngineReuseSe .toJson(engine) + " from engineLabelMap : " + AMUtils.GSON.toJson(instances) ) } - if (Configuration.METRICS_INCREMENTAL_UPDATE_ENABLE.getValue) { - val engineNode = - ecResourceInfoService.getECResourceInfoRecordByInstance( - engine.getServiceInstance.getInstance - ) - // 异步更新 metrics - AMUtils.updateMetricsAsync( - taskId, - engineNode.getTicketId, - engineNode.getServiceInstance, - engineNode.getEcmInstance, - engineNode.getLogDirSuffix, - isReuse = true - ) + Utils.tryCatch { + if (Configuration.METRICS_INCREMENTAL_UPDATE_ENABLE.getValue) { + val engineNode = + ecResourceInfoService.getECResourceInfoRecordByInstance( + engine.getServiceInstance.getInstance + ) + if (null != engineNode) { + // 异步更新 metrics + AMUtils.updateMetricsAsync( + taskId, + engineNode.getTicketId, + engineNode.getServiceInstance, + engineNode.getEcmInstance, + engineNode.getLogDirSuffix, + isReuse = true + ) + } else { + logger.info(s"ReuseEngine:Failed to update metrics for engineNode: $engineNode") + } + + } + } { case e: Exception => + logger.error(s"Failed to update metrics for taskId: $taskId", e) } engine } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/utils/AMUtils.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/utils/AMUtils.scala index 89084ebe9a..1b359c29a6 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/utils/AMUtils.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/utils/AMUtils.scala @@ -44,11 +44,16 @@ import java.io.File import java.util import scala.collection.JavaConverters._ +import scala.concurrent.ExecutionContextExecutorService import com.google.gson.JsonObject object AMUtils extends Logging { + // 优化:线程池复用,线程数设置为5 + private implicit val updateMetricsExecutor: ExecutionContextExecutorService = + Utils.newCachedExecutionContext(5, "UpdateMetrics-Thread-") + lazy val GSON = BDPJettyServerHelper.gson private val SUCCESS_FLAG = 0 @@ -409,14 +414,15 @@ object AMUtils extends Logging { import scala.concurrent.Future import scala.util.{Failure, Success} + // 优化:使用复用的线程池,线程数设置为5 Future { updateMetrics(taskId, resourceTicketId, emInstance, ecmInstance, engineLogPath, isReuse) - }(Utils.newCachedExecutionContext(1, "UpdateMetrics-Thread-")).onComplete { + }(updateMetricsExecutor).onComplete { case Success(_) => logger.debug(s"Task: $taskId metrics update completed successfully for engine: $emInstance") case Failure(t) => logger.warn(s"Task: $taskId metrics update failed for engine: $emInstance", t) - }(Utils.newCachedExecutionContext(1, "UpdateMetrics-Thread-")) + }(updateMetricsExecutor) } }