Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -247,37 +247,50 @@ class DefaultEngineAskEngineService
null
}
}

val engineCreateRequest = new EngineCreateRequest
engineCreateRequest.setLabels(engineAskRequest.getLabels)
engineCreateRequest.setTimeout(engineAskRequest.getTimeOut)
engineCreateRequest.setUser(engineAskRequest.getUser)
engineCreateRequest.setProperties(engineAskRequest.getProperties)
engineCreateRequest.setCreateService(engineAskRequest.getCreateService)

val createNode = engineCreateService.createEngine(engineCreateRequest, sender)
val timeout =
if (engineCreateRequest.getTimeout <= 0) {
AMConfiguration.ENGINE_START_MAX_TIME.getValue.toLong
} else engineCreateRequest.getTimeout
// UseEngine requires a timeout (useEngine 需要加上超时)
val createEngineNode = getEngineNodeManager.useEngine(createNode, timeout)
if (null == createEngineNode) {
throw new LinkisRetryException(
AMConstant.EM_ERROR_CODE,
s"create engine${createNode.getServiceInstance} success, but to use engine failed"
)
}
logger.info(
s"Task: $taskId finished to ask engine for user ${engineAskRequest.getUser} by create node $createEngineNode"
)
if (null != sender) {
sender.send(EngineCreateSuccess(engineAskAsyncId, createEngineNode))
if (reuseNode != null) {
logger.info(
s"Task: $taskId has sent EngineCreateSuccess($engineAskAsyncId, reuse=false) to Entrance."
s"Task: $taskId finished to ask engine for user ${engineAskRequest.getUser} by reuse node $reuseNode"
)
if (null != sender) {
sender.send(EngineCreateSuccess(engineAskAsyncId, reuseNode, true))
logger.info(
s"Task: $taskId has sent EngineCreateSuccess($engineAskAsyncId, reuse=true) to Entrance."
)
} else {
logger.warn(f"Task: $taskId will not send async using null sender.")
}
} else {
logger.warn(s"Task: $taskId will not send async using null sender.")
val engineCreateRequest = new EngineCreateRequest
engineCreateRequest.setLabels(engineAskRequest.getLabels)
engineCreateRequest.setTimeout(engineAskRequest.getTimeOut)
engineCreateRequest.setUser(engineAskRequest.getUser)
engineCreateRequest.setProperties(engineAskRequest.getProperties)
engineCreateRequest.setCreateService(engineAskRequest.getCreateService)

val createNode = engineCreateService.createEngine(engineCreateRequest, sender)
val timeout =
if (engineCreateRequest.getTimeout <= 0) {
AMConfiguration.ENGINE_START_MAX_TIME.getValue.toLong
} else engineCreateRequest.getTimeout
// UseEngine requires a timeout (useEngine 需要加上超时)
val createEngineNode = getEngineNodeManager.useEngine(createNode, timeout)
if (null == createEngineNode) {
throw new LinkisRetryException(
AMConstant.EM_ERROR_CODE,
s"create engine${createNode.getServiceInstance} success, but to use engine failed"
)
}
logger.info(
s"Task: $taskId finished to ask engine for user ${engineAskRequest.getUser} by create node $createEngineNode"
)
if (null != sender) {
sender.send(EngineCreateSuccess(engineAskAsyncId, createEngineNode))
logger.info(
s"Task: $taskId has sent EngineCreateSuccess($engineAskAsyncId, reuse=false) to Entrance."
)
} else {
logger.warn(s"Task: $taskId will not send async using null sender.")
}
}
} {
Utils.tryAndWarn {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,10 @@ class DefaultEngineCreateService

val engineNode = Utils.tryCatch(getEMService().createEngine(engineBuildRequest, emNode)) {
case t: Throwable =>
logger.info(s"Failed to create ec($resourceTicketId) ask ecm ${emNode.getServiceInstance}")
logger.warn(s"Failed to create ec($resourceTicketId) ask ecm ${emNode.getServiceInstance}", t)
val failedEcNode = getEngineNodeManager.getEngineNode(oldServiceInstance)
if (null == failedEcNode) {
logger.info(s" engineConn does not exist in db: $oldServiceInstance ")
logger.warn(s" engineConn does not exist in db: $oldServiceInstance ")
} else {
failedEcNode.setLabels(nodeLabelService.getNodeLabels(oldServiceInstance))
failedEcNode.getLabels.addAll(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,6 @@ class DefaultEngineReuseService extends AbstractEngineService with EngineReuseSe
StringUtils.isNotBlank(templateName) && AMConfiguration.EC_REUSE_WITH_TEMPLATE_RULE_ENABLE
) {
engineScoreList = engineScoreList
.filter(engine => engine.getNodeStatus == NodeStatus.Unlock)
.filter(engine => {
val oldTemplateName: String =
getValueByKeyFromProps(confTemplateNameKey, parseParamsToMap(engine.getParams))
Expand Down Expand Up @@ -276,7 +275,6 @@ class DefaultEngineReuseService extends AbstractEngineService with EngineReuseSe

// 过滤掉资源不满足的引擎
engineScoreList = engineScoreList
.filter(engine => engine.getNodeStatus == NodeStatus.Unlock)
.filter(engine => {
val enginePythonVersion: String = getPythonVersion(parseParamsToMap(engine.getParams))
var pythonVersionMatch: Boolean = true
Expand Down
Loading