diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/ExecutorDeployment.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/ExecutorDeployment.scala index fbb5b99ce6b..39261432872 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/ExecutorDeployment.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/ExecutorDeployment.scala @@ -52,7 +52,13 @@ object ExecutorDeployment { operatorConfig.workerConfigs.foreach(workerConfig => { val workerId = workerConfig.workerId - val workerIndex = VirtualIdentityUtils.getWorkerIndex(workerId) + val workerIndex = VirtualIdentityUtils + .getWorkerIndex(workerId) + .getOrElse( + throw new IllegalStateException( + s"Expected worker actor id when deploying executor, got: ${workerId.name}" + ) + ) val locationPreference = op.locationPreference.getOrElse(RoundRobinPreference) val preferredAddress: Address = locationPreference match { case PreferController => diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala index affbd786f9b..095315590f6 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala @@ -289,7 +289,16 @@ class OutputManager( val bufferedItemWriter = DocumentFactory .openDocument(storageUri) ._1 - .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString) + .writer( + VirtualIdentityUtils + .getWorkerIndex(actorId) + .getOrElse( + throw new IllegalStateException( + s"Expected worker actor id for output storage writer, got: ${actorId.name}" + ) + ) + .toString + ) .asInstanceOf[BufferedItemWriter[Tuple]] val writerThread = new OutputPortResultWriterThread(bufferedItemWriter) this.outputPortResultWriterThreads(portId) = writerThread diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/SerializationManager.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/SerializationManager.scala index b4afe510306..3730dd91d37 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/SerializationManager.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/SerializationManager.scala @@ -39,7 +39,13 @@ class SerializationManager(val actorId: ActorVirtualIdentity) extends AmberLoggi def restoreExecutorState( chkpt: CheckpointState ): (OperatorExecutor, Iterator[(TupleLike, Option[PortIdentity])]) = { - val workerIdx = VirtualIdentityUtils.getWorkerIndex(actorId) + val workerIdx = VirtualIdentityUtils + .getWorkerIndex(actorId) + .getOrElse( + throw new IllegalStateException( + s"Expected worker actor id when restoring executor state, got: ${actorId.name}" + ) + ) val workerCount = execInitMsg.totalWorkerCount val executor = execInitMsg.opExecInitInfo match { case OpExecWithClassName(className, descString) => diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala index 212a980e5ed..969b466a1b2 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala @@ -36,7 +36,13 @@ trait InitializeExecutorHandler { ctx: AsyncRPCContext ): Future[EmptyReturn] = { dp.serializationManager.setOpInitialization(req) - val workerIdx = VirtualIdentityUtils.getWorkerIndex(actorId) + val workerIdx = VirtualIdentityUtils + .getWorkerIndex(actorId) + .getOrElse( + throw new IllegalStateException( + s"Expected worker actor id when initializing executor, got: ${actorId.name}" + ) + ) cachedTotalWorkerCount = req.totalWorkerCount setupExecutor(req.opExecInitInfo, workerIdx, cachedTotalWorkerCount) EmptyReturn() diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala index 8ed9ebdc595..2078ff892b6 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala @@ -35,7 +35,13 @@ trait UpdateExecutorHandler { request: UpdateExecutorRequest, ctx: AsyncRPCContext ): Future[EmptyReturn] = { - val workerIdx = VirtualIdentityUtils.getWorkerIndex(actorId) + val workerIdx = VirtualIdentityUtils + .getWorkerIndex(actorId) + .getOrElse( + throw new IllegalStateException( + s"Expected worker actor id when updating executor, got: ${actorId.name}" + ) + ) // Close the existing executor (if any) before replacing it to avoid resource leaks. val oldExecutor = dp.executor if (oldExecutor != null) { diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala index 031c4b8c7f9..7c1bfafb8db 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala @@ -68,10 +68,13 @@ object VirtualIdentityUtils { } } - def getWorkerIndex(workerId: ActorVirtualIdentity): Int = { + def getWorkerIndex(workerId: ActorVirtualIdentity): Option[Int] = { workerId.name match { case workerNamePattern(_, _, _, idx) => - idx.toInt + Some(idx.toInt) + case _ => + // for special actorId such as SELF, CONTROLLER + None } } diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala index 4da024dd53b..a25a0c46dac 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala @@ -96,19 +96,20 @@ class VirtualIdentityUtilsSpec extends AnyFlatSpec with Matchers { "getWorkerIndex" should "return the trailing numeric workerId from a worker actor name" in { val actor = ActorVirtualIdentity("Worker:WF7-myOp-main-42") - VirtualIdentityUtils.getWorkerIndex(actor) shouldBe 42 + VirtualIdentityUtils.getWorkerIndex(actor) shouldBe Some(42) } - it should "throw MatchError on non-worker actor names (current behavior)" in { - // getWorkerIndex pattern-matches on workerNamePattern with no fallback, - // so passing a special ActorVirtualIdentity like CONTROLLER or SELF - // yields scala.MatchError. Pinning this behavior here means a future - // change that adds a fallback (or a different exception) breaks this - // spec on purpose so the new contract is reviewed. + it should "return None for non-worker actor names" in { + // Special ActorVirtualIdentity values like CONTROLLER or SELF do not + // match workerNamePattern. getWorkerIndex returns None rather than + // throwing scala.MatchError, mirroring the graceful handling in + // getPhysicalOpId and toShorterString. Returning Option forces each + // caller to explicitly acknowledge the non-worker case rather than + // silently propagating a sentinel value. val controller = ActorVirtualIdentity("CONTROLLER") - assertThrows[scala.MatchError] { - VirtualIdentityUtils.getWorkerIndex(controller) - } + VirtualIdentityUtils.getWorkerIndex(controller) shouldBe None + val self = ActorVirtualIdentity("SELF") + VirtualIdentityUtils.getWorkerIndex(self) shouldBe None } // ----- toShorterString -----