diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/LinkConfig.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/LinkConfig.scala index ef0117834cc..ef92afef593 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/LinkConfig.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/LinkConfig.scala @@ -72,10 +72,9 @@ case object LinkConfig { case BroadcastPartition() => BroadcastPartitioning( dataTransferBatchSize, - fromWorkerIds.zip(toWorkerIds).map { - case (fromWorkerId, toWorkerId) => - ChannelIdentity(fromWorkerId, toWorkerId, isControl = false) - } + fromWorkerIds.flatMap(fromId => + toWorkerIds.map(toId => ChannelIdentity(fromId, toId, isControl = false)) + ) ) case UnknownPartition() => diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/LinkConfigSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/LinkConfigSpec.scala index 8a5f2b86520..7dd8a3de08c 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/LinkConfigSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/LinkConfigSpec.scala @@ -144,13 +144,7 @@ class LinkConfigSpec extends AnyFlatSpec with Matchers { // ----- BroadcastPartition ----- - "BroadcastPartition" should "produce a BroadcastPartitioning whose channels follow zip pairing today (current behavior)" in { - // Pin: BroadcastPartition currently uses `fromWorkerIds.zip(toWorkerIds)` - // — the SAME 1:1 pairing as OneToOnePartition. ChannelConfig in the same - // package emits a full cross product for the BroadcastPartition arm, - // which matches broadcast semantics ("each sender targets every - // receiver"). The two helpers diverge today; pinning this so a fix that - // realigns the contract surfaces here. Filed as a Bug. + "BroadcastPartition" should "produce a BroadcastPartitioning with the full sender x receiver cross product" in { val out = LinkConfig.toPartitioning( List(w1, w2, w3), List(u1, u2, u3), @@ -160,10 +154,20 @@ class LinkConfigSpec extends AnyFlatSpec with Matchers { out shouldBe a[BroadcastPartitioning] val bp = out.asInstanceOf[BroadcastPartitioning] bp.batchSize shouldBe batch - endpoints(bp.channels) shouldBe Seq(("w1", "u1"), ("w2", "u2"), ("w3", "u3")) + endpoints(bp.channels) shouldBe Seq( + ("w1", "u1"), + ("w1", "u2"), + ("w1", "u3"), + ("w2", "u1"), + ("w2", "u2"), + ("w2", "u3"), + ("w3", "u1"), + ("w3", "u2"), + ("w3", "u3") + ) } - it should "silently truncate broadcast pairings when sides differ in length (current behavior)" in { + it should "emit the full cross product even when sender and receiver counts differ" in { val out = LinkConfig.toPartitioning( List(w1, w2, w3), List(u1, u2), @@ -171,7 +175,14 @@ class LinkConfigSpec extends AnyFlatSpec with Matchers { batch ) val bp = out.asInstanceOf[BroadcastPartitioning] - endpoints(bp.channels) shouldBe Seq(("w1", "u1"), ("w2", "u2")) + endpoints(bp.channels) shouldBe Seq( + ("w1", "u1"), + ("w1", "u2"), + ("w2", "u1"), + ("w2", "u2"), + ("w3", "u1"), + ("w3", "u2") + ) } // ----- UnknownPartition -----