
fix: make async row groups lazy #729

Open
eric-tramel wants to merge 1 commit into NVIDIA-NeMo:main from eric-tramel:codex/fix-726-lazy-row-groups

@eric-tramel
Contributor

📋 Summary

Fixes #726 by replacing eager async row-group metadata construction with a compact row-group plan. Scheduler/tracker preparation now stays proportional to the active/sparse row groups instead of materializing list and dictionary metadata for every logical row group, while preserving resume offsets for ordered seed datasets.

🔗 Related Issue

Fixes #726

🔄 Changes

  • Add compact and explicit row-group plan types for async scheduling metadata.
  • Thread row-group plans through the async builder, completion tracker, and scheduler instead of preallocating full row-group lists and lookup dictionaries.
  • Preserve original resume offsets for remaining row groups with holes, and reject corrupt resume metadata where original_target_num_records exceeds the requested target.
  • Add regression coverage for large fresh async preparation, near-complete resume sparsity, and resume offset behavior.
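
To illustrate the idea behind the compact plan, here is a minimal sketch that derives row-group sizes and offsets arithmetically instead of storing them. `SketchRowGroupPlan` and its fields are hypothetical names for this illustration; the PR's actual `CompactRowGroupPlan` also handles resume state and extended targets, which this sketch leaves out.

```python
from collections.abc import Iterator
from dataclasses import dataclass


@dataclass(frozen=True)
class SketchRowGroupPlan:
    """Hypothetical stand-in for a compact plan; not the PR's actual class."""

    num_records: int
    buffer_size: int  # assumed to be a positive integer

    @property
    def total_row_groups(self) -> int:
        # Ceiling division: the last group may be partial.
        return -(-self.num_records // self.buffer_size)

    def _check(self, row_group: int) -> None:
        if not 0 <= row_group < self.total_row_groups:
            raise KeyError(row_group)

    def row_group_start_offset(self, row_group: int) -> int:
        self._check(row_group)
        return row_group * self.buffer_size

    def row_group_size(self, row_group: int) -> int:
        self._check(row_group)
        start = row_group * self.buffer_size
        return min(self.buffer_size, self.num_records - start)

    def __iter__(self) -> Iterator[tuple[int, int]]:
        # Pairs are produced on demand; nothing is retained per group.
        for row_group in range(self.total_row_groups):
            yield row_group, self.row_group_size(row_group)


plan = SketchRowGroupPlan(num_records=5, buffer_size=2)
print(list(plan))  # [(0, 2), (1, 2), (2, 1)]
```

The object holds two integers regardless of how many logical row groups exist, which is the property the regression tests below are meant to pin.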

🧪 Testing

  • make lint-engine
  • make test-engine — 2220 passed in 36.28s
  • Unit tests added/updated
  • E2E tests: N/A — engine metadata/scheduler fix

Performance demonstration, measured locally with tracemalloc for 2,000,000 records, buffer_size=2, and 1,000,000 logical row groups:

| Scenario | Memory | Time | Notes |
| --- | --- | --- | --- |
| Simulated old list/dict metadata path | 253.3 MiB peak | 1.338s | Kept row-group list plus tracker/scheduler/offset dictionaries alive; 4,000,000 retained metadata entries. |
| New compact scheduler/tracker preparation | 0.018 MiB peak | 0.000473s | Same 1,000,000 logical row groups and 2,000,000 scheduled records. |
| New near-complete resume plan with two groups remaining | 0.001 MiB retained | 0.327970s | Remaining row groups: [(999998, 2), (999999, 2)]; avoids retaining the near-full completed-ID set in the plan. |
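
A measurement of this kind can be reproduced in outline with `tracemalloc`. The sketch below is not the script used for the numbers above: `eager_metadata` and `compact_metadata` are hypothetical stand-ins, and it uses 100,000 groups to keep the run short, so absolute figures will differ.

```python
import tracemalloc


def peak_bytes(build) -> int:
    """Return the peak traced allocation while build() runs and its result is alive."""
    tracemalloc.start()
    try:
        result = build()
        _, peak = tracemalloc.get_traced_memory()
        del result
        return peak
    finally:
        tracemalloc.stop()


def eager_metadata(num_groups: int, buffer_size: int):
    # Hypothetical model of the old path: one list plus two lookup dicts.
    groups = [(rg, buffer_size) for rg in range(num_groups)]
    sizes = dict(groups)
    offsets = {rg: rg * buffer_size for rg in range(num_groups)}
    return groups, sizes, offsets


def compact_metadata(num_groups: int, buffer_size: int):
    # Hypothetical model of the new path: keep only the parameters.
    return (num_groups, buffer_size)


eager_peak = peak_bytes(lambda: eager_metadata(100_000, 2))
compact_peak = peak_bytes(lambda: compact_metadata(100_000, 2))
print(f"eager: {eager_peak / 2**20:.3f} MiB, compact: {compact_peak / 2**20:.3f} MiB")
```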

✅ Checklist

  • Follows commit message conventions
  • Commits are signed off (DCO)
  • Architecture docs updated: N/A — internal engine scheduling metadata representation only

@eric-tramel eric-tramel requested a review from a team as a code owner June 1, 2026 23:26
@github-actions
Contributor

github-actions Bot commented Jun 1, 2026

Review: PR #729 — fix: make async row groups lazy

Summary

Replaces eager list[tuple[int,int]] + dict[int,int] row-group metadata with
a lazy plan abstraction so scheduler/tracker preparation cost no longer scales
linearly with the logical row-group count. Introduces a new module
row_group_plan.py containing:

  • RowGroupPlanLike (Protocol) — the scheduler-facing interface.
  • CompactRowGroupPlan — formula-driven plan used for fresh runs and resume.
  • ExplicitRowGroupPlan — adapter for already-materialized tuples (kept for
    test/small-caller convenience).
  • normalize_row_group_plan / RowGroupInput — input coercion.

Threads the new types through AsyncTaskScheduler, CompletionTracker, and
DatasetBuilder._prepare_async_run. Drops the now-redundant
row_group_start_offsets parameter, the _rg_size_map / _rg_start_offset_map
caches in the scheduler, and the _build_row_group_start_offsets helper.
Also adds a validation guard rejecting resume metadata where
original_target_num_records > target_num_records.

The benchmark numbers in the PR description (253 MiB → 0.018 MiB peak for 1M
logical groups) are corroborated by new tracemalloc-based regression tests.

Findings

Correctness — looks solid, with one ambiguous protocol contract

  • The size/offset formulas in CompactRowGroupPlan._row_group_size_for and
    row_group_start_offset reproduce the original closed-form computation from
    build_row_group_resume_plan. Original-group offsets remain
    row_group * buffer_size regardless of holes, preserving the resume-with-
    holes invariant the previous row_group_start_offsets dict delivered.
  • The completion-density heuristic
    (valid_completed_count > total_row_groups // 2) correctly switches between
    storing scheduled IDs vs completed IDs so the in-memory filter is always
    proportional to the smaller set. The near-complete-resume regression test
    pins this behavior.
  • has_row_group is now the canonical membership predicate.
    _get_rg_start_offset in AsyncTaskScheduler catches KeyError and returns
    None, but the RowGroupPlanLike protocol declares
    row_group_start_offset(self, row_group: int) -> int with no documented
    raise contract. A short docstring (or an int | None return type with
    explicit None semantics) on the protocol would prevent future
    implementations from silently returning 0 or similar. Minor.
  • describe_known_row_groups for the compact plan returns
    "{n} scheduled of {m} total row groups" instead of the previous
    sorted(...) of all known IDs. Resulting ValueError messages from
    CompletionTracker._validate_row_group are now less specific. Acceptable
    given the materialization cost we're avoiding, but worth flagging — anyone
    grepping logs for known IDs will see different output.
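
The completion-density heuristic described above can be sketched as follows. This is an illustration under assumptions, not the code in row_group_plan.py: `build_id_filter` and `is_scheduled` are hypothetical helpers, and the sketch scans the full ID range once when most groups are complete.

```python
def build_id_filter(
    total_row_groups: int, completed_ids: set[int]
) -> tuple[frozenset[int], bool]:
    """Return (id_filter, filter_includes_scheduled), keeping the smaller set."""
    # Drop IDs outside the logical range before counting.
    valid_completed = {rg for rg in completed_ids if 0 <= rg < total_row_groups}
    if len(valid_completed) > total_row_groups // 2:
        # Mostly complete: remember the few groups still to be scheduled.
        remaining = frozenset(
            rg for rg in range(total_row_groups) if rg not in valid_completed
        )
        return remaining, True
    # Mostly incomplete: remember the few groups already completed.
    return frozenset(valid_completed), False


def is_scheduled(
    row_group: int,
    total_row_groups: int,
    id_filter: frozenset[int],
    filter_includes_scheduled: bool,
) -> bool:
    """Membership test that works for either filter orientation."""
    if not 0 <= row_group < total_row_groups:
        return False
    in_filter = row_group in id_filter
    return in_filter if filter_includes_scheduled else not in_filter


dense_filter, dense_flag = build_id_filter(6, {0, 1, 2, 3})
sparse_filter, sparse_flag = build_id_filter(6, {0, 1, 2})
print(sorted(dense_filter), dense_flag)    # [4, 5] True
print(sorted(sparse_filter), sparse_flag)  # [0, 1, 2] False
```

With 6 groups, 3 completed is exactly `total // 2`, so the strict `>` comparison keeps the completed-ID orientation at the boundary.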

Backward compatibility

  • RowGroupResumePlan.remaining_row_groups changes type from
    list[tuple[int, int]] to CompactRowGroupPlan, and
    row_group_start_offsets is removed entirely from the dataclass. This is
    internal to data_designer.engine.dataset_builders, so the layering rule
    (interface → engine → config) protects external callers. No public surface
    appears affected.
  • precomputed_row_groups/row_groups parameters are widened to
    RowGroupInput, which still accepts Sequence[tuple[int, int]], so
    existing direct callers (and the explicit-list test) continue to work.
  • The _build_async resume completion check changed from
    len(completed_ids) >= resume_plan.total_row_groups to
    remaining_row_group_count == 0. The new form is actually more correct
    because the plan filters out-of-range completed IDs before counting.
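
The difference between the two completion checks shows up when the completed-ID set contains an out-of-range entry. The values below are hypothetical and the filtering is a simplified reading of what the plan is described as doing:

```python
total_row_groups = 3
completed_ids = {0, 1, 7}  # 7 is outside the logical range (stale or corrupt)

# Old check: counts every completed ID, including the out-of-range one.
old_check = len(completed_ids) >= total_row_groups

# New check: filter to valid IDs first, then ask whether anything remains.
valid_completed = {rg for rg in completed_ids if 0 <= rg < total_row_groups}
remaining_row_group_count = total_row_groups - len(valid_completed)
new_check = remaining_row_group_count == 0

print(old_check, new_check)  # True False
```

Under the old form the run would be treated as complete even though row group 2 was never produced.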

Validation hardening

  • The new original_target_num_records > target_num_records guard in
    _load_resume_state is reasonable defense against corrupt metadata and
    has direct test coverage
    (test_build_resume_raises_when_original_target_metadata_exceeds_target).
  • CompactRowGroupPlan.__post_init__ rejects negative inputs and
    num_records < original_target. Negative-extension test covers one path;
    the others are simple invariants.

Style / project conventions

  • from __future__ import annotations, SPDX header, absolute imports,
    modern type syntax, and type annotations on all members are all present.
  • @dataclass(frozen=True, slots=True) with object.__setattr__ in
    __post_init__ is the correct pattern for derived fields on a frozen
    dataclass. InitVar is used appropriately for completed_ids.
  • No relative imports; all imports use data_designer.engine.dataset_builders.*.
  • Lazy-heavy-imports rule: row_group_plan.py only imports stdlib, so no
    concern here.

Test coverage

  • New tests:
    • test_prepare_async_run_uses_compact_plan_for_large_fresh_runs:
      _prepare_async_run end-to-end with 1M groups, asserts
      CompactRowGroupPlan is propagated and peak memory < 5 MiB.
    • test_scheduler_preparation_memory_stays_bounded_for_million_row_groups:
      full scheduler construction at 1M groups under 5 MiB.
    • test_compact_row_group_plan_rejects_negative_extension: validation.
    • test_row_group_resume_plan_stays_sparse_when_almost_complete:
      near-complete resume optimization.
    • test_build_resume_raises_when_original_target_metadata_exceeds_target:
      corrupt-metadata guard.
  • Existing tests updated to use CompactRowGroupPlan.resume(...) and the
    iterator/method API instead of list/dict equality.
  • Coverage gap, minor: I don't see an explicit test for the
    intermediate completion-density case (some completed, but
    <= total // 2), where the plan stores id_filter = frozenset(completed).
    This branch is exercised indirectly by the existing builder tests, but a
    direct unit test pinning _filter_includes_scheduled is False would make
    the heuristic boundary explicit.

Performance

  • Behaves as advertised: near-zero-allocation fresh path (0.018 MiB peak in
    the PR's measurement), sparse-allocation resume path. The two tracemalloc
    tests will catch a regression that reintroduces eager materialization.
    The 5 MiB ceiling is generous enough to resist flakiness on shared CI
    without hiding real regressions.
  • row_group_min_size / row_group_max_size build a tiny candidate list on
    every property access; called from diagnostics so this is fine. If they
    show up in hot paths later they can be cached at construction.

Nits (non-blocking)

  • row_group_plan.py:412 — a one-line comment on the
    valid_completed_count > total_row_groups // 2 branch (e.g., "store the
    smaller set: scheduled IDs when most groups are complete, completed IDs
    otherwise") would help a future reader understand the heuristic without
    re-deriving it.
  • RowGroupPlanLike protocol could document row_group_size /
    row_group_start_offset raising KeyError for unknown groups
    (AsyncTaskScheduler._get_rg_size / _get_rg_start_offset rely on this).
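
A documented raise contract along the lines suggested could look like the sketch below. `PlanLikeSketch`, `TinyPlan`, and `get_start_offset` are hypothetical names for illustration; the actual RowGroupPlanLike protocol and scheduler helpers may differ.

```python
from __future__ import annotations

from typing import Protocol


class PlanLikeSketch(Protocol):
    """Hypothetical protocol with an explicit contract for unknown groups."""

    def has_row_group(self, row_group: int) -> bool:
        """Return True if row_group is scheduled in this plan."""
        ...

    def row_group_start_offset(self, row_group: int) -> int:
        """Return the record offset of row_group.

        Raises:
            KeyError: if row_group is not scheduled in this plan.
        """
        ...


class TinyPlan:
    """Toy implementation that honors the documented KeyError contract."""

    def __init__(self, buffer_size: int, scheduled: set[int]) -> None:
        self._buffer_size = buffer_size
        self._scheduled = frozenset(scheduled)

    def has_row_group(self, row_group: int) -> bool:
        return row_group in self._scheduled

    def row_group_start_offset(self, row_group: int) -> int:
        if not self.has_row_group(row_group):
            raise KeyError(row_group)
        return row_group * self._buffer_size


def get_start_offset(plan: PlanLikeSketch, row_group: int) -> int | None:
    # Caller-side helper: translate the documented KeyError into None.
    try:
        return plan.row_group_start_offset(row_group)
    except KeyError:
        return None


tiny = TinyPlan(buffer_size=2, scheduled={1, 3})
print(get_start_offset(tiny, 3), get_start_offset(tiny, 2))  # 6 None
```

Keeping `-> int` on the protocol and documenting the exception leaves the `None` translation in one place on the caller side, so an implementation that returned 0 for an unknown group would be a visible contract violation.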

Verdict

Approve with minor suggestions. The refactor is well-scoped, preserves the
"declare, don't orchestrate" boundary between config and engine, and has
strong regression coverage for the bug it fixes. The protocol-contract
docstring and a single intermediate-density unit test would be small
follow-ups; nothing here blocks merge.

@greptile-apps
Contributor

greptile-apps Bot commented Jun 1, 2026

Greptile Summary

This PR replaces the eager O(N) pre-allocation of row-group metadata (lists, dicts of size/offset/tracker entries) with a compact, lazy CompactRowGroupPlan that computes sizes and offsets from arithmetic at lookup time. The scheduler, completion tracker, and dataset builder now all speak the new RowGroupPlanLike protocol.

  • row_group_plan.py introduces CompactRowGroupPlan (lazy, arithmetic-derived), ExplicitRowGroupPlan (adapter for test tuples), and RowGroupPlanLike (shared protocol). The plan chooses between storing completed IDs or scheduled IDs based on which set is smaller (the >½ heuristic), bounding retained memory to the sparse side of the resume frontier.
  • dataset_builder.py threads CompactRowGroupPlan through build_row_group_resume_plan and _prepare_async_run, removes row_group_start_offsets plumbing (now folded into the plan), and adds an upfront DatasetGenerationError for corrupt resume metadata where original_target_num_records > target_num_records.
  • async_scheduler.py and completion.py drop their pre-computed dicts (_rg_size_map, _rg_start_offset_map, _row_group_sizes) and delegate all lookups to the plan protocol.

Confidence Score: 5/5

Safe to merge. All production paths use CompactRowGroupPlan with correct arithmetic offsets; the memory savings are dramatic and regression-tested with tracemalloc.

The refactor is internally consistent: every lookup that previously hit a pre-built dict now goes through a protocol method whose arithmetic matches the old accumulation logic exactly, as confirmed by the resumed-offset tests. The branching heuristic (store the smaller of completed vs. scheduled IDs) is simple and correctly bounds retained memory in both the fresh and near-complete-resume cases. The new corrupt-metadata guard prevents a class of silent bugs that previously would have produced wrong sizes in the resume plan.

No files require special attention.

Important Files Changed

| Filename | Overview |
| --- | --- |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/row_group_plan.py | New file: correct Protocol definition, clean branching logic for dense/sparse sets, arithmetic offsets are validated by tests including edge cases around the >½ frontier heuristic. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py | Removes pre-built size/offset dicts and row_group_start_offsets parameter; delegates all lookups to RowGroupPlanLike. No behavioral change on production code paths. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py | Replaces eager list/dict construction in build_row_group_resume_plan with CompactRowGroupPlan.resume; adds early DatasetGenerationError for corrupt original_target_num_records > target_num_records metadata. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/scheduling/completion.py | Replaces _row_group_sizes dict with _row_group_plan; adds _row_group_size_or_default helper for the is_column_complete path that previously used .get() with a 0 default. Semantics preserved. |
| packages/data-designer-engine/tests/engine/dataset_builders/test_async_builder_integration.py | Adds tracemalloc-based regression for large fresh-run preparation staying under 5 MiB peak. |
| packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py | Adds memory-bounded scheduler construction test for 1M row groups; migrates offset test from raw list + explicit offsets to CompactRowGroupPlan.resume. |
| packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py | Adds corrupt-metadata rejection test and sparse-resume memory test; updates existing assertions to use list(plan) rather than comparing against materialized lists. |

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A[DatasetBuilder._prepare_async_run] --> B{precomputed_row_groups?}
    B -- Fresh run --> C[CompactRowGroupPlan.fresh\nnum_records / buffer_size\narithmetic, zero allocation]
    B -- Resume --> D[build_row_group_resume_plan\nCompactRowGroupPlan.resume]
    D --> E{valid_completed_count\n> total // 2?}
    E -- Dense: store scheduled IDs --> F[_id_filter = frozenset of\nremaining IDs\n_filter_includes_scheduled=True]
    E -- Sparse: store completed IDs --> G[_id_filter = frozenset of\ncompleted IDs\n_filter_includes_scheduled=False]
    C --> H[normalize_row_group_plan]
    F --> H
    G --> H
    H --> I[RowGroupPlanLike]
    I --> J[CompletionTracker.with_graph]
    I --> K[AsyncTaskScheduler.__init__\n_scheduled_records = plan.scheduled_total_rows\nno size/offset dicts]
    K --> L[_get_rg_size → plan.row_group_size]
    K --> M[_get_rg_start_offset → plan.row_group_start_offset\narithmetic: rg_id × buffer_size]

Reviews (2): Last reviewed commit: "fix: make async row groups lazy" | Re-trigger Greptile

Avoid preallocating per-row-group list and dictionary metadata for huge async runs. The async builder now passes a compact row-group plan through the completion tracker and scheduler while preserving resume offsets and explicit small-list compatibility.

Fixes NVIDIA-NeMo#726

Signed-off-by: Eric W. Tramel <1223539+eric-tramel@users.noreply.github.com>
@eric-tramel eric-tramel force-pushed the codex/fix-726-lazy-row-groups branch from 90a2323 to ff99564 Compare June 1, 2026 23:43

Development

Successfully merging this pull request may close these issues.

Huge async jobs preallocate row-group metadata for the full logical dataset
