Skip to content

test(workflow-operator): add DistributedAggregationSpec for four-function contract#5792

Open
lalalastella wants to merge 1 commit into
apache:mainfrom
lalalastella:test/distributed-aggregation-spec
Open

test(workflow-operator): add DistributedAggregationSpec for four-function contract#5792
lalalastella wants to merge 1 commit into
apache:mainfrom
lalalastella:test/distributed-aggregation-spec

Conversation

@lalalastella

Copy link
Copy Markdown

What changes were proposed in this PR?

Add DistributedAggregationSpec.scala — a dedicated unit spec for DistributedAggregation in common/workflow-operator.

DistributedAggregation[P] defines four functions (init, iterate, merge, finalAgg) for data-parallel aggregation (SOSP'09 paper). There is no dedicated spec that pins this contract; accidental refactors to the case class (field renames, reordering) would break every aggregation operator silently.

The spec uses a representative average aggregation DistributedAggregation[(Double, Long)] (partial = (sum, count)) and asserts:

Step Contract verified
init() returns identity partial (0.0, 0L)
iterate folds one tuple's value into (sum + v, count + 1)
merge additive combination; commutative and associative; merge with init() is a no-op
finalAgg computes sum / count
Distributed == single-node split across two partitions → merge → finalAgg == single-node fold
Empty partition contributes init() and leaves the other partial unchanged

Tuple/Schema/Attribute helpers follow the AggregateOpSpec pattern in the same package.

No production-code changes.

Any related issues, documentation, discussions?

Closes #5777

How was this PR tested?

sbt "WorkflowOperator/testOnly org.apache.texera.amber.operator.aggregate.DistributedAggregationSpec"

Was this PR authored or co-authored using generative AI tooling?

Generated-by: Claude Sonnet 4.6

…tion contract

Pins init/iterate/merge/finalAgg via a representative average aggregation
(partial = (sum, count)).  Covers the key contract: split input across two
partitions, merge the partials, and finalAgg must equal a single-node fold.
Also verifies commutativity and associativity of merge, and that an empty
partition leaves the other partial unchanged.

Closes apache#5777

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds a new Scala unit spec in common/workflow-operator to pin the behavioral contract of DistributedAggregation (init/iterate/merge/finalAgg) using a representative “average” aggregation, ensuring distributed (partitioned + merged) results match single-partition folding.

Changes:

  • Introduces DistributedAggregationSpec.scala with contract-focused tests for init, iterate, merge, and finalAgg.
  • Verifies distributed vs single-node equivalence, including an empty-partition merge case.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +29 to +34
private val avg: DistributedAggregation[(Double, Long)] = DistributedAggregation(
init = () => (0.0, 0L),
iterate = (p, t) => (p._1 + t.getField[Double]("value"), p._2 + 1L),
merge = (a, b) => (a._1 + b._1, a._2 + b._2),
finalAgg = p => (p._1 / p._2).asInstanceOf[Object]
)
Comment on lines +53 to +64
"DistributedAggregation.iterate" should "accumulate sum and count from each tuple" in {
var p = avg.init()
p = avg.iterate(p, tuple(3.0))
p = avg.iterate(p, tuple(7.0))
p shouldBe (10.0, 2L)
}

it should "leave the partial unchanged when called on an empty sequence" in {
val p = avg.init()
// iterating zero times is the empty-partition case
p shouldBe avg.init()
}

@aglinxinyuan aglinxinyuan left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please address the comments.

@codecov-commenter

codecov-commenter commented Jun 19, 2026

Copy link
Copy Markdown

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 53.30%. Comparing base (731d671) to head (a568f62).

Additional details and impacted files
@@            Coverage Diff            @@
##               main    #5792   +/-   ##
=========================================
  Coverage     53.30%   53.30%           
  Complexity     2668     2668           
=========================================
  Files          1098     1098           
  Lines         42532    42532           
  Branches       4575     4575           
=========================================
  Hits          22673    22673           
  Misses        18530    18530           
  Partials       1329     1329           
Flag Coverage Δ *Carryforward flag
access-control-service 70.44% <ø> (ø)
agent-service 34.36% <ø> (ø) Carriedforward from 731d671
amber 53.81% <ø> (ø) Carriedforward from 731d671
computing-unit-managing-service 1.65% <ø> (ø)
config-service 56.71% <ø> (ø)
file-service 57.06% <ø> (ø)
frontend 48.00% <ø> (ø) Carriedforward from 731d671
pyamber 90.13% <ø> (ø) Carriedforward from 731d671
python 90.80% <ø> (ø) Carriedforward from 731d671
workflow-compiling-service 58.69% <ø> (ø)

*This pull request uses carry forward flags. Click here to find out more.

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@github-actions

Copy link
Copy Markdown
Contributor

⚠️ Benchmark changes need a look

🟢 0 better · 🔴 2 worse · ⚪ 13 noise (<±5%) · 0 without baseline

Compared against main 731d671 benchmarked on this same runner, so the delta is largely free of cross-runner hardware noise. The "7d avg" column still reflects the gh-pages dashboard. Treat <±5% as noise unless repeated.

Dashboard · Run

config throughput MB/s latency max Δ latest / 7d
🔴 bs=10 sw=10 sl=64 396 0.242 23,902/38,366/38,366 us 🔴 +7.0% / 🔴 +9.7%
bs=100 sw=10 sl=64 810 0.495 122,956/142,329/142,329 us ⚪ within ±5% / 🔴 +9.5%
bs=1000 sw=10 sl=64 922 0.563 1,075,057/1,150,230/1,150,230 us ⚪ within ±5% / 🔴 +12.4%
Baseline details

Latest main 731d671 from same runner

config metric PR latest main 7d avg Δ latest Δ 7d
bs=10 sw=10 sl=64 throughput 396 tuples/sec 403 tuples/sec 410.82 tuples/sec -1.7% -3.6%
bs=10 sw=10 sl=64 MB/s 0.242 MB/s 0.246 MB/s 0.251 MB/s -1.6% -3.5%
bs=10 sw=10 sl=64 p50 23,902 us 24,904 us 23,785 us -4.0% +0.5%
bs=10 sw=10 sl=64 p95 38,366 us 35,857 us 34,980 us +7.0% +9.7%
bs=10 sw=10 sl=64 p99 38,366 us 35,857 us 34,980 us +7.0% +9.7%
bs=100 sw=10 sl=64 throughput 810 tuples/sec 817 tuples/sec 891.94 tuples/sec -0.9% -9.2%
bs=100 sw=10 sl=64 MB/s 0.495 MB/s 0.499 MB/s 0.544 MB/s -0.8% -9.1%
bs=100 sw=10 sl=64 p50 122,956 us 121,526 us 112,277 us +1.2% +9.5%
bs=100 sw=10 sl=64 p95 142,329 us 138,670 us 139,802 us +2.6% +1.8%
bs=100 sw=10 sl=64 p99 142,329 us 138,670 us 139,802 us +2.6% +1.8%
bs=1000 sw=10 sl=64 throughput 922 tuples/sec 925 tuples/sec 1,041 tuples/sec -0.3% -11.4%
bs=1000 sw=10 sl=64 MB/s 0.563 MB/s 0.564 MB/s 0.635 MB/s -0.2% -11.4%
bs=1000 sw=10 sl=64 p50 1,075,057 us 1,084,658 us 972,714 us -0.9% +10.5%
bs=1000 sw=10 sl=64 p95 1,150,230 us 1,123,828 us 1,023,057 us +2.3% +12.4%
bs=1000 sw=10 sl=64 p99 1,150,230 us 1,123,828 us 1,023,057 us +2.3% +12.4%
Raw CSV
config_idx,batch_size,schema_width,string_len,num_batches,total_ms,total_tuples,total_bytes,tuples_per_sec,mb_per_sec,lat_p50_us,lat_p95_us,lat_p99_us
0,10,10,64,20,505.18,200,128000,396,0.242,23902.26,38366.42,38366.42
1,100,10,64,20,2468.08,2000,1280000,810,0.495,122956.43,142329.26,142329.26
2,1000,10,64,20,21684.21,20000,12800000,922,0.563,1075057.21,1150230.01,1150230.01

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

add unit test coverage for DistributedAggregation

4 participants