Skip to content

fix(jobs): correct total_count and page fill for status-filtered job lists#263

Open
maxdubrinsky wants to merge 1 commit into
mainfrom
aircore-738-jobs-status-pagination/md
Open

fix(jobs): correct total_count and page fill for status-filtered job lists#263
maxdubrinsky wants to merge 1 commit into
mainfrom
aircore-738-jobs-status-pagination/md

Conversation

@maxdubrinsky

@maxdubrinsky maxdubrinsky commented Jun 10, 2026

Copy link
Copy Markdown
Contributor

Summary

JobDispatcher.list_jobs filters by status in memory — status lives on PlatformJobAttempt, not PlatformJob, so it cannot be pushed to the entity-store query. Previously the store query was paginated with the caller's page_size over a status-free superset of the filter, and that superset's total_results was returned as the count. Consequences for any status-filtered job list:

  1. Inflated total_count — it returned the superset count (every job matching the non-status constraints), not the count of jobs whose attempt status actually matched.
  2. Under-filled / empty pagespage_size was applied to the superset, then matches were dropped after slicing, so a page could be empty even when matching jobs existed on later store pages.

Closes AIRCORE-738.

Fix

When the filter references status, drain the full status-free superset in batches (_STORE_OVERFETCH_PAGE_SIZE), narrow it in-memory, then count + slice the matched set in Python — mirroring the existing correct over-fetch path in list_steps. Filters that don't reference status keep the cheap store-pushdown path (store paginates and counts directly), so the common case is unchanged. The in-memory join now loads current attempts concurrently via _gather_attempts, so the over-fetch isn't N serial round-trips.

Testing

pytest services/core/jobs/tests323 passed, 17 skipped. New regression tests in test_dispatcher.py:

  • test_list_jobs_status_filter_total_count_excludes_non_matchingtotal_count == matched count (was the superset count).
  • test_list_jobs_status_filter_pages_fill_to_page_size — pages fill 2/2/1 and every match surfaces exactly once; total_count exact per page.
  • test_list_jobs_status_filter_no_match_zero_count[] + total_count == 0.
  • test_list_jobs_no_status_filter_paginates_via_store — guards the unchanged no-status pushdown path.

The three status tests fail against main (6==3, 10==5, 3==0) and pass with this change.

Summary by CodeRabbit

  • Bug Fixes
    • Fixed pagination and total-count behavior when filtering jobs by status so pages are filled correctly and counts reflect only status-matching jobs.
  • Tests
    • Added regression tests to verify status-filtered counting, pagination, empty-result handling, and unchanged behavior when no status filter is provided.

@maxdubrinsky maxdubrinsky requested review from a team as code owners June 10, 2026 18:58
@coderabbitai

coderabbitai Bot commented Jun 10, 2026

Copy link
Copy Markdown

Review Change Stack

📝 Walkthrough

Walkthrough

list_jobs now detects status-field filters and either pushes exact filters to the store (non-status) or drains a status-free superset in overfetch batches and applies an in-memory join/filter/sort/slice so total_count and pagination reflect only status-matching jobs.

Changes

Status-aware job list filtering

Layer / File(s) Summary
Status-aware join and filter helper
services/core/jobs/src/nmp/core/jobs/app/dispatcher.py
Adds _STORE_OVERFETCH_PAGE_SIZE and _join_and_filter_jobs which concurrently fetches each job's current attempt, constructs virtual job objects for in-memory status-aware filter evaluation, skips jobs without current attempts, and returns ordered PlatformJobResponse results.
list_jobs pagination with branching logic
services/core/jobs/src/nmp/core/jobs/app/dispatcher.py
list_jobs branches on presence of the status field in the parsed filter: non-status filters are pushed to the store for exact pagination/count; status filters drain the superset via repeated overfetch batches, apply the in-memory join/filter/sort, then slice by offset/limit in Python so total_count equals the filtered set.
Regression tests for status filtering and pagination
services/core/jobs/tests/test_dispatcher.py
Adds four async tests asserting that status filtering (on PlatformJobAttempt.status) yields correct total_count, fully filled matching-job pages under limit/offset, empty results with total_count=0 when no matches exist, and correct pagination/count behavior when no status filter is provided.
🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Docstring Coverage ✅ Passed Docstring coverage is 90.91% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.
Title check ✅ Passed Title accurately summarizes the main change: fixing total_count and page fill logic for status-filtered job lists, which is the core purpose of the changeset.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch aircore-738-jobs-status-pagination/md

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
services/core/jobs/src/nmp/core/jobs/app/dispatcher.py (1)

343-345: ⚡ Quick win

Set iteration order is not guaranteed by Python spec.

attempt_ids is a set. _gather_attempts internally converts it to a list, but zip(attempt_ids, attempts) iterates the set again. While CPython maintains stable iteration order for unmodified sets, this isn't spec-guaranteed. Same pattern exists in list_steps.

Consider having _gather_attempts return dict[str, PlatformJobAttempt | None] directly, or convert the set to a list once and pass that to both.

♻️ Suggested fix
-        attempt_ids = {job.current_attempt_id for job in jobs if job.current_attempt_id}
-        attempts = await self._gather_attempts(attempt_ids)
-        attempt_by_id = {aid: a for aid, a in zip(attempt_ids, attempts) if a is not None}
+        aid_list = [job.current_attempt_id for job in jobs if job.current_attempt_id]
+        unique_aids = list(dict.fromkeys(aid_list))  # dedupe preserving order
+        attempts = await self._gather_attempts(set(unique_aids))
+        # Build mapping using the same iteration order as _gather_attempts internally uses
+        attempt_by_id = {}
+        for aid in unique_aids:
+            # Fetch result by index would require _gather_attempts to return dict
+            pass  # Better: refactor _gather_attempts to return dict[str, Optional[...]]

Or refactor _gather_attempts to return a dict:

async def _gather_attempts(self, attempt_ids: set[str]) -> dict[str, PlatformJobAttempt | None]:
    if not attempt_ids:
        return {}
    aid_list = list(attempt_ids)
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(self.get_attempt(aid)) for aid in aid_list]
    return {aid: t.result() for aid, t in zip(aid_list, tasks)}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@services/core/jobs/src/nmp/core/jobs/app/dispatcher.py` around lines 343 -
345, The current code builds attempt_ids as a set and then zips attempt_ids with
the results from self._gather_attempts, which relies on matching iteration order
that the Python spec doesn't guarantee; fix by making order explicit: either
convert attempt_ids to a list once (e.g., aid_list = list(attempt_ids)) and pass
aid_list to self._gather_attempts and use aid_list to zip with attempts, or
change _gather_attempts to return a dict mapping attempt_id ->
PlatformJobAttempt (so call attempt_by_id = await
self._gather_attempts(attempt_ids)); apply the same pattern for list_steps to
avoid relying on set iteration order.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Nitpick comments:
In `@services/core/jobs/src/nmp/core/jobs/app/dispatcher.py`:
- Around line 343-345: The current code builds attempt_ids as a set and then
zips attempt_ids with the results from self._gather_attempts, which relies on
matching iteration order that the Python spec doesn't guarantee; fix by making
order explicit: either convert attempt_ids to a list once (e.g., aid_list =
list(attempt_ids)) and pass aid_list to self._gather_attempts and use aid_list
to zip with attempts, or change _gather_attempts to return a dict mapping
attempt_id -> PlatformJobAttempt (so call attempt_by_id = await
self._gather_attempts(attempt_ids)); apply the same pattern for list_steps to
avoid relying on set iteration order.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Enterprise

Run ID: b4989e23-b8b8-4709-8eb6-136f2e3de5ce

📥 Commits

Reviewing files that changed from the base of the PR and between 15dc9b9 and d395c12.

📒 Files selected for processing (2)
  • services/core/jobs/src/nmp/core/jobs/app/dispatcher.py
  • services/core/jobs/tests/test_dispatcher.py

@github-actions

github-actions Bot commented Jun 10, 2026

Copy link
Copy Markdown
Contributor
Suite Lines Covered Line Rate Branch Rate
Unit Tests 19049/25113 75.8% 62.4%
Integration Tests 12058/23885 50.5% 25.7%

…ists (AIRCORE-738)

Status lives on PlatformJobAttempt, not PlatformJob, so list_jobs evaluates the
status filter in-memory after the store query. Previously the store query was
paginated with the caller's page_size over a status-free *superset*, and that
superset's total_results was returned as the count. So any status-filtered job
list reported an inflated total_count (superset count, not matched count) and
under-filled pages (the page was narrowed *after* slicing, so it could be empty
even when matches existed on later store pages).

Fix: when the filter references status, drain the full status-free superset in
batches, narrow it in-memory, then count + slice the matched set in Python
(mirroring list_steps' job/source path). Filters that don't reference status
keep the cheap store-pushdown path unchanged. Current attempts are loaded
concurrently via _gather_attempts so the over-fetch isn't N serial round-trips.

Adds regression tests asserting exact total_count, page fill (2/2/1), empty
result on no match, and that the no-status pushdown path still paginates.

Signed-off-by: Max Dubrinsky <mdubrinsky@nvidia.com>
@maxdubrinsky maxdubrinsky force-pushed the aircore-738-jobs-status-pagination/md branch from d395c12 to 782d18c Compare June 10, 2026 19:22

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@services/core/jobs/src/nmp/core/jobs/app/dispatcher.py`:
- Around line 418-435: The current loop accumulates all pages into candidates
and then calls await self._join_and_filter_jobs(candidates, parsed) which can
trigger an unbounded fan-out of attempt get_by_id calls; instead process each
overfetch page as it is read (or chunk candidates into batches of size
_STORE_OVERFETCH_PAGE_SIZE) and call await self._join_and_filter_jobs(...) per
page/batch to bound concurrency. Concretely, inside the while True after
receiving response, call _join_and_filter_jobs(response.data, parsed), extend a
local matched list with the returned results (or collect counts), and only after
the loop apply the final sort/slice; ensure you reference self.store.list,
_STORE_OVERFETCH_PAGE_SIZE, and _join_and_filter_jobs when modifying the logic
so no single gather spans the entire workspace.
- Around line 343-345: _join_and_filter_jobs builds attempt_by_id using zip over
attempt_ids (a set) and attempts which can mis-pair attempts; change
_gather_attempts to return a dict keyed by attempt_id (or build attempt_by_id by
mapping each returned attempt.aid to the attempt) and use that mapping in
attempt_by_id instead of zipping a set. Also address unbounded fan-out in
list_jobs: avoid fetching attempts for every job in candidates—apply in-memory
filtering/limit/sorting first (or cap the candidate set) and use a bounded
concurrency mechanism when awaiting _gather_attempts/TaskGroup for
current_attempt_id to prevent store/latency blowups; reference
functions/classes: _join_and_filter_jobs, _gather_attempts, list_jobs,
candidates, current_attempt_id, attempt_by_id, and the TaskGroup usage.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Enterprise

Run ID: a6c27980-8df6-4222-b5b2-97ec5d3f07b5

📥 Commits

Reviewing files that changed from the base of the PR and between d395c12 and 782d18c.

📒 Files selected for processing (2)
  • services/core/jobs/src/nmp/core/jobs/app/dispatcher.py
  • services/core/jobs/tests/test_dispatcher.py

Comment on lines +343 to +345
attempt_ids = {job.current_attempt_id for job in jobs if job.current_attempt_id}
attempts = await self._gather_attempts(attempt_ids)
attempt_by_id = {aid: a for aid, a in zip(attempt_ids, attempts) if a is not None}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Fix incorrect attempt ID → attempt pairing and bound status-filter fan-out in dispatcher

  • _join_and_filter_jobs rebuilds attempt_by_id using zip(attempt_ids, attempts) where attempt_ids is a set; pairing assumes set iteration order matches the list(attempt_ids) order used by _gather_attempts. Build attempt_by_id by keying on attempt ID (or return a dict from _gather_attempts) to avoid mis-matching jobs to attempts.
  • list_jobs status-filter drains the full status-free superset into candidates and then fetches one attempt per unique current_attempt_id via an unbounded TaskGroup; cap concurrency and/or avoid fetching attempts for jobs that will be discarded by in-memory filtering/slicing to prevent store/latency blowups.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@services/core/jobs/src/nmp/core/jobs/app/dispatcher.py` around lines 343 -
345, _join_and_filter_jobs builds attempt_by_id using zip over attempt_ids (a
set) and attempts which can mis-pair attempts; change _gather_attempts to return
a dict keyed by attempt_id (or build attempt_by_id by mapping each returned
attempt.aid to the attempt) and use that mapping in attempt_by_id instead of
zipping a set. Also address unbounded fan-out in list_jobs: avoid fetching
attempts for every job in candidates—apply in-memory filtering/limit/sorting
first (or cap the candidate set) and use a bounded concurrency mechanism when
awaiting _gather_attempts/TaskGroup for current_attempt_id to prevent
store/latency blowups; reference functions/classes: _join_and_filter_jobs,
_gather_attempts, list_jobs, candidates, current_attempt_id, attempt_by_id, and
the TaskGroup usage.

Comment on lines +418 to +435
candidates: list[PlatformJob] = []
page = 1
while True:
response = await self.store.list(
PlatformJob,
filter_operation=store_operation,
page=page,
page_size=_STORE_OVERFETCH_PAGE_SIZE,
workspace=workspace,
sort=sort_str,
)
candidates.extend(response.data)
if page >= response.pagination.total_pages:
break
page += 1

return job_outputs, response.pagination.total_results
matched = _apply_sort(await self._join_and_filter_jobs(candidates, parsed))
return matched[start : start + page_size], len(matched)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Bound the attempt-fetch fan-out in the status path.

This branch drains the full superset and then joins attempts for every candidate in one shot. On a large workspace, one status-filtered request can turn into thousands of concurrent get_by_id calls, which is enough to saturate the entity client/DB pool and make the endpoint unstable under load. Join/filter each overfetch page as it is read, or chunk the attempt fetches to the same batch size instead of doing one unbounded gather over the whole workspace.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@services/core/jobs/src/nmp/core/jobs/app/dispatcher.py` around lines 418 -
435, The current loop accumulates all pages into candidates and then calls await
self._join_and_filter_jobs(candidates, parsed) which can trigger an unbounded
fan-out of attempt get_by_id calls; instead process each overfetch page as it is
read (or chunk candidates into batches of size _STORE_OVERFETCH_PAGE_SIZE) and
call await self._join_and_filter_jobs(...) per page/batch to bound concurrency.
Concretely, inside the while True after receiving response, call
_join_and_filter_jobs(response.data, parsed), extend a local matched list with
the returned results (or collect counts), and only after the loop apply the
final sort/slice; ensure you reference self.store.list,
_STORE_OVERFETCH_PAGE_SIZE, and _join_and_filter_jobs when modifying the logic
so no single gather spans the entire workspace.

@maxdubrinsky maxdubrinsky changed the title fix(jobs): exact total_count and full pages for status-filtered job lists (AIRCORE-738) fix(jobs): correct total_count and page fill for status-filtered job lists Jun 10, 2026
@github-actions github-actions Bot added the fix label Jun 10, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant