Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,13 @@ object ExecutorDeployment {

operatorConfig.workerConfigs.foreach(workerConfig => {
val workerId = workerConfig.workerId
val workerIndex = VirtualIdentityUtils.getWorkerIndex(workerId)
val workerIndex = VirtualIdentityUtils
.getWorkerIndex(workerId)
.getOrElse(
throw new IllegalStateException(
s"Expected worker actor id when deploying executor, got: ${workerId.name}"
)
)
val locationPreference = op.locationPreference.getOrElse(RoundRobinPreference)
val preferredAddress: Address = locationPreference match {
case PreferController =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,16 @@ class OutputManager(
val bufferedItemWriter = DocumentFactory
.openDocument(storageUri)
._1
.writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString)
.writer(
VirtualIdentityUtils
.getWorkerIndex(actorId)
.getOrElse(
throw new IllegalStateException(
s"Expected worker actor id for output storage writer, got: ${actorId.name}"
)
)
.toString
)
.asInstanceOf[BufferedItemWriter[Tuple]]
val writerThread = new OutputPortResultWriterThread(bufferedItemWriter)
this.outputPortResultWriterThreads(portId) = writerThread
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,13 @@ class SerializationManager(val actorId: ActorVirtualIdentity) extends AmberLoggi
def restoreExecutorState(
chkpt: CheckpointState
): (OperatorExecutor, Iterator[(TupleLike, Option[PortIdentity])]) = {
val workerIdx = VirtualIdentityUtils.getWorkerIndex(actorId)
val workerIdx = VirtualIdentityUtils
.getWorkerIndex(actorId)
.getOrElse(
throw new IllegalStateException(
s"Expected worker actor id when restoring executor state, got: ${actorId.name}"
)
)
val workerCount = execInitMsg.totalWorkerCount
val executor = execInitMsg.opExecInitInfo match {
case OpExecWithClassName(className, descString) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,13 @@ trait InitializeExecutorHandler {
ctx: AsyncRPCContext
): Future[EmptyReturn] = {
dp.serializationManager.setOpInitialization(req)
val workerIdx = VirtualIdentityUtils.getWorkerIndex(actorId)
val workerIdx = VirtualIdentityUtils
.getWorkerIndex(actorId)
.getOrElse(
throw new IllegalStateException(
s"Expected worker actor id when initializing executor, got: ${actorId.name}"
)
)
cachedTotalWorkerCount = req.totalWorkerCount
setupExecutor(req.opExecInitInfo, workerIdx, cachedTotalWorkerCount)
EmptyReturn()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,13 @@ trait UpdateExecutorHandler {
request: UpdateExecutorRequest,
ctx: AsyncRPCContext
): Future[EmptyReturn] = {
val workerIdx = VirtualIdentityUtils.getWorkerIndex(actorId)
val workerIdx = VirtualIdentityUtils
.getWorkerIndex(actorId)
.getOrElse(
throw new IllegalStateException(
s"Expected worker actor id when updating executor, got: ${actorId.name}"
)
)
// Close the existing executor (if any) before replacing it to avoid resource leaks.
val oldExecutor = dp.executor
if (oldExecutor != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,13 @@ object VirtualIdentityUtils {
}
}

def getWorkerIndex(workerId: ActorVirtualIdentity): Int = {
def getWorkerIndex(workerId: ActorVirtualIdentity): Option[Int] = {
workerId.name match {
case workerNamePattern(_, _, _, idx) =>
idx.toInt
Some(idx.toInt)
case _ =>
// for special actorId such as SELF, CONTROLLER
None
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,19 +96,20 @@ class VirtualIdentityUtilsSpec extends AnyFlatSpec with Matchers {

"getWorkerIndex" should "return the trailing numeric workerId from a worker actor name" in {
val actor = ActorVirtualIdentity("Worker:WF7-myOp-main-42")
VirtualIdentityUtils.getWorkerIndex(actor) shouldBe 42
VirtualIdentityUtils.getWorkerIndex(actor) shouldBe Some(42)
}

it should "throw MatchError on non-worker actor names (current behavior)" in {
// getWorkerIndex pattern-matches on workerNamePattern with no fallback,
// so passing a special ActorVirtualIdentity like CONTROLLER or SELF
// yields scala.MatchError. Pinning this behavior here means a future
// change that adds a fallback (or a different exception) breaks this
// spec on purpose so the new contract is reviewed.
it should "return None for non-worker actor names" in {
// Special ActorVirtualIdentity values like CONTROLLER or SELF do not
// match workerNamePattern. getWorkerIndex returns None rather than
// throwing scala.MatchError, mirroring the graceful handling in
// getPhysicalOpId and toShorterString. Returning Option forces each
// caller to explicitly acknowledge the non-worker case rather than
// silently propagating a sentinel value.
val controller = ActorVirtualIdentity("CONTROLLER")
assertThrows[scala.MatchError] {
VirtualIdentityUtils.getWorkerIndex(controller)
}
VirtualIdentityUtils.getWorkerIndex(controller) shouldBe None
val self = ActorVirtualIdentity("SELF")
VirtualIdentityUtils.getWorkerIndex(self) shouldBe None
}

// ----- toShorterString -----
Expand Down
Loading