From 0a9f11e2553fa2afc7e02c9d3e9ee0c5b9867800 Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Sat, 9 May 2026 20:25:45 -0700 Subject: [PATCH 1/3] return -1 from getWorkerIndex for non-worker actor IDs instead of throwing MatchError --- .../texera/amber/util/VirtualIdentityUtils.scala | 3 +++ .../amber/util/VirtualIdentityUtilsSpec.scala | 15 ++++++--------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala index 031c4b8c7f9..943fd9c3f3f 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala @@ -72,6 +72,9 @@ object VirtualIdentityUtils { workerId.name match { case workerNamePattern(_, _, _, idx) => idx.toInt + case _ => + // for special actorId such as SELF, CONTROLLER + -1 } } diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala index 4da024dd53b..ea22027e341 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala @@ -99,16 +99,13 @@ class VirtualIdentityUtilsSpec extends AnyFlatSpec with Matchers { VirtualIdentityUtils.getWorkerIndex(actor) shouldBe 42 } - it should "throw MatchError on non-worker actor names (current behavior)" in { - // getWorkerIndex pattern-matches on workerNamePattern with no fallback, - // so passing a special ActorVirtualIdentity like CONTROLLER or SELF - // yields scala.MatchError. Pinning this behavior here means a future - // change that adds a fallback (or a different exception) breaks this - // spec on purpose so the new contract is reviewed. + it should "fall back to -1 for non-worker actor names" in { + // Special ActorVirtualIdentity values like CONTROLLER or SELF do not + // match workerNamePattern. getWorkerIndex returns -1 as a sentinel + // rather than throwing scala.MatchError, mirroring the graceful + // handling in getPhysicalOpId and toShorterString. val controller = ActorVirtualIdentity("CONTROLLER") - assertThrows[scala.MatchError] { - VirtualIdentityUtils.getWorkerIndex(controller) - } + VirtualIdentityUtils.getWorkerIndex(controller) shouldBe -1 } // ----- toShorterString ----- From e291b1d6b761042f032d33eb3b2c4826f93b6d13 Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Tue, 12 May 2026 12:41:54 -0700 Subject: [PATCH 2/3] Force every caller to handle non-worker cases with clear message --- .../architecture/common/ExecutorDeployment.scala | 8 +++++++- .../messaginglayer/OutputManager.scala | 11 ++++++++++- .../worker/managers/SerializationManager.scala | 8 +++++++- .../InitializeExecutorHandler.scala | 8 +++++++- .../promisehandlers/UpdateExecutorHandler.scala | 8 +++++++- .../texera/amber/util/VirtualIdentityUtils.scala | 6 +++--- .../amber/util/VirtualIdentityUtilsSpec.scala | 14 ++++++++------ 7 files changed, 49 insertions(+), 14 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/ExecutorDeployment.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/ExecutorDeployment.scala index cf41297c981..e9a731c0fcd 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/ExecutorDeployment.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/ExecutorDeployment.scala @@ -52,7 +52,13 @@ object ExecutorDeployment { operatorConfig.workerConfigs.foreach(workerConfig => { val workerId = workerConfig.workerId - val workerIndex = VirtualIdentityUtils.getWorkerIndex(workerId) + val workerIndex = VirtualIdentityUtils + .getWorkerIndex(workerId) + .getOrElse( + throw new IllegalStateException( + s"Expected worker actor id when deploying executor, got: ${workerId.name}" + ) + ) val locationPreference = op.locationPreference.getOrElse(RoundRobinPreference) val preferredAddress: Address = locationPreference match { case PreferController => diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala index affbd786f9b..095315590f6 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala @@ -289,7 +289,16 @@ class OutputManager( val bufferedItemWriter = DocumentFactory .openDocument(storageUri) ._1 - .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString) + .writer( + VirtualIdentityUtils + .getWorkerIndex(actorId) + .getOrElse( + throw new IllegalStateException( + s"Expected worker actor id for output storage writer, got: ${actorId.name}" + ) + ) + .toString + ) .asInstanceOf[BufferedItemWriter[Tuple]] val writerThread = new OutputPortResultWriterThread(bufferedItemWriter) this.outputPortResultWriterThreads(portId) = writerThread diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/SerializationManager.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/SerializationManager.scala index b4afe510306..3730dd91d37 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/SerializationManager.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/SerializationManager.scala @@ -39,7 +39,13 @@ class SerializationManager(val actorId: ActorVirtualIdentity) extends AmberLoggi def restoreExecutorState( chkpt: CheckpointState ): (OperatorExecutor, Iterator[(TupleLike, Option[PortIdentity])]) = { - val workerIdx = VirtualIdentityUtils.getWorkerIndex(actorId) + val workerIdx = VirtualIdentityUtils + .getWorkerIndex(actorId) + .getOrElse( + throw new IllegalStateException( + s"Expected worker actor id when restoring executor state, got: ${actorId.name}" + ) + ) val workerCount = execInitMsg.totalWorkerCount val executor = execInitMsg.opExecInitInfo match { case OpExecWithClassName(className, descString) => diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala index 212a980e5ed..969b466a1b2 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala @@ -36,7 +36,13 @@ trait InitializeExecutorHandler { ctx: AsyncRPCContext ): Future[EmptyReturn] = { dp.serializationManager.setOpInitialization(req) - val workerIdx = VirtualIdentityUtils.getWorkerIndex(actorId) + val workerIdx = VirtualIdentityUtils + .getWorkerIndex(actorId) + .getOrElse( + throw new IllegalStateException( + s"Expected worker actor id when initializing executor, got: ${actorId.name}" + ) + ) cachedTotalWorkerCount = req.totalWorkerCount setupExecutor(req.opExecInitInfo, workerIdx, cachedTotalWorkerCount) EmptyReturn() diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala index 8ed9ebdc595..2078ff892b6 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala @@ -35,7 +35,13 @@ trait UpdateExecutorHandler { request: UpdateExecutorRequest, ctx: AsyncRPCContext ): Future[EmptyReturn] = { - val workerIdx = VirtualIdentityUtils.getWorkerIndex(actorId) + val workerIdx = VirtualIdentityUtils + .getWorkerIndex(actorId) + .getOrElse( + throw new IllegalStateException( + s"Expected worker actor id when updating executor, got: ${actorId.name}" + ) + ) // Close the existing executor (if any) before replacing it to avoid resource leaks. val oldExecutor = dp.executor if (oldExecutor != null) { diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala index 943fd9c3f3f..7c1bfafb8db 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala @@ -68,13 +68,13 @@ object VirtualIdentityUtils { } } - def getWorkerIndex(workerId: ActorVirtualIdentity): Int = { + def getWorkerIndex(workerId: ActorVirtualIdentity): Option[Int] = { workerId.name match { case workerNamePattern(_, _, _, idx) => - idx.toInt + Some(idx.toInt) case _ => // for special actorId such as SELF, CONTROLLER - -1 + None } } diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala index ea22027e341..c072ec68c46 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala @@ -96,16 +96,18 @@ class VirtualIdentityUtilsSpec extends AnyFlatSpec with Matchers { "getWorkerIndex" should "return the trailing numeric workerId from a worker actor name" in { val actor = ActorVirtualIdentity("Worker:WF7-myOp-main-42") - VirtualIdentityUtils.getWorkerIndex(actor) shouldBe 42 + VirtualIdentityUtils.getWorkerIndex(actor) shouldBe Some(42) } - it should "fall back to -1 for non-worker actor names" in { + it should "return None for non-worker actor names" in { // Special ActorVirtualIdentity values like CONTROLLER or SELF do not - // match workerNamePattern. getWorkerIndex returns -1 as a sentinel - // rather than throwing scala.MatchError, mirroring the graceful - // handling in getPhysicalOpId and toShorterString. + // match workerNamePattern. getWorkerIndex returns None rather than + // throwing scala.MatchError, mirroring the graceful handling in + // getPhysicalOpId and toShorterString. Returning Option forces each + // caller to explicitly acknowledge the non-worker case rather than + // silently propagating a sentinel value. val controller = ActorVirtualIdentity("CONTROLLER") - VirtualIdentityUtils.getWorkerIndex(controller) shouldBe -1 + VirtualIdentityUtils.getWorkerIndex(controller) shouldBe None } // ----- toShorterString ----- From c2c47f9e3f15db5697645963570bf54ce6df4ce1 Mon Sep 17 00:00:00 2001 From: Matthew Ball Date: Tue, 12 May 2026 12:43:31 -0700 Subject: [PATCH 3/3] Added a SELF assertion alongside CONTROLLER so the test matches the comment. --- .../org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala index c072ec68c46..a25a0c46dac 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala @@ -108,6 +108,8 @@ class VirtualIdentityUtilsSpec extends AnyFlatSpec with Matchers { // silently propagating a sentinel value. val controller = ActorVirtualIdentity("CONTROLLER") VirtualIdentityUtils.getWorkerIndex(controller) shouldBe None + val self = ActorVirtualIdentity("SELF") + VirtualIdentityUtils.getWorkerIndex(self) shouldBe None } // ----- toShorterString -----