Skip to content

Refactor rollout topology and rollout metadata#1933

Open
YanhuiDua wants to merge 7 commits into
InternLM:mainfrom
YanhuiDua:refine-rollout-topology
Open

Refactor rollout topology and rollout metadata#1933
YanhuiDua wants to merge 7 commits into
InternLM:mainfrom
YanhuiDua:refine-rollout-topology

Conversation

@YanhuiDua

@YanhuiDua YanhuiDua commented Jun 24, 2026

Copy link
Copy Markdown
Collaborator

Summary

这个 PR 重构了 rollout topology 和 rollout weight update metadata 的表达方式:

  1. 新增 RolloutTopology,作为启动期确定的rollout拓扑,指明有哪些rollout engine和server process;
  2. 明确 RolloutWorkerRegistry 作为 Rollout worker 运行状态的集合,包含worker url, session url, active状态等;
  3. 新增 RolloutWeightUpdateTarget 代替RolloutMetadata,它由RolloutServerProcess.weight_update_ranks 和运行时的 WorkerSnapshot.server_url/lifecycle_state 组合而成,从RolloutWorkerRegistry映射而来;
  4. 更新 RolloutWeightUpdateInfo,更新成员变量为:rollout_config、weight_update_targets、train_rank等

重构后,具体关系如下:
image

@YanhuiDua YanhuiDua force-pushed the refine-rollout-topology branch from 53c3a19 to 71a0a3a Compare June 24, 2026 09:06
@YanhuiDua

Copy link
Copy Markdown
Collaborator Author

@claude review

Comment thread xtuner/v1/rl/rollout/controller.py Outdated
Comment thread xtuner/v1/rl/rollout/rollout_topology.py Outdated
Comment thread xtuner/v1/rl/rollout/rollout_topology.py Outdated
Comment thread xtuner/v1/rl/rollout/worker_registry.py Outdated
Comment thread xtuner/v1/rl/rollout/worker_registry.py Outdated
@claude

claude Bot commented Jun 24, 2026

Copy link
Copy Markdown
Contributor

Claude: ## Review Summary

Clean refactor that converges scattered topology fields into RolloutTopology / ServerLaunchSpec / RolloutWorkerInitResult. The new data model is well-structured and the init flow (bind → init → register) is clear.

ProduceBatchResult impact: not affected
RoutedExperts impact: not affected

Issues

Critical

  • [controller.py:260-266] Removed session_url validation for request-entrypoint workers. The old code raised RuntimeError when a request-entrypoint worker returned session_url=None. This safety check should be restored.

Warning

  • [rollout_topology.py:42-50] validate_engine_launch_specs() (~90 lines of structural validation) was removed with no replacement on RolloutTopology. Invalid topologies will produce opaque downstream failures. A __post_init__ with key invariant checks would catch configuration errors early.
  • [rollout_topology.py:82-84] cached_property on frozen=True dataclass relies on CPython __dict__-bypass behavior. Will break if slots=True is ever added. Consider __post_init__ + object.__setattr__ instead.
  • [worker_registry.py:131] register_started_server does not validate that rank exists in the topology, which can cause an unguarded KeyError in _build_worker_groups()lifecycle_group_for_server_rank().

Nit

  • [worker_registry.py:76] to_legacy() return type is bare dictdict[str, Any] would be better.
  • [worker.py] Pre-existing typo self.serverl_url (extra l) carried forward — appears to be dead code.

Verdict

REQUEST_CHANGES — the removed session_url validation is a silent safety regression that should be restored.

@YanhuiDua

Copy link
Copy Markdown
Collaborator Author

@claude review

Comment thread xtuner/v1/rl/rollout/rollout_topology.py Outdated
Comment thread xtuner/v1/rl/rollout/worker_registry.py
Comment thread xtuner/v1/rl/rollout/controller.py Outdated
@claude

claude Bot commented Jun 24, 2026

Copy link
Copy Markdown
Contributor

Claude: ## Review Summary

The follow-up commit (patch 2/2) addresses all critical and major findings from the prior review:

  • ✅ Session URL validation for request-entrypoint workers restored
  • cached_property on frozen dataclass replaced with __post_init__ + object.__setattr__
  • to_legacy() return type annotated as dict[str, Any]

The refactor is well-structured. RolloutTopology cleanly separates the global topology (engine layout, lifecycle groups, request entrypoints) from worker-local state (ServerLaunchSpec). The bind → init → register controller flow is clearer than the old single-call approach.

ProduceBatchResult impact: not affected
RoutedExperts impact: not affected

Remaining Issues

Warning

  • [rollout_topology.py:57-72] __post_init__ validates duplicate server process ranks but drops most of the old validate_engine_launch_specs invariants (server rank ∈ engine_ranks, each engine has ≥1 request entrypoint, valid node_rank bounds). Suggested adding the two highest-value checks inline.

Nit

  • [worker_registry.py:180] _build_worker_groups has an unguarded lifecycle_group_for_server_rank call — a programming error in registration could surface as a confusing KeyError.
  • [controller.py:239] _dist_init_addr discard in the comprehension is non-obvious without context.

Verdict

APPROVE — the core design is sound and all prior critical findings are resolved. The remaining items are defensive suggestions, not blockers.

@YanhuiDua YanhuiDua force-pushed the refine-rollout-topology branch 3 times, most recently from 3cfb762 to 787428e Compare June 30, 2026 03:47
@YanhuiDua

Copy link
Copy Markdown
Collaborator Author

@claude review

@YanhuiDua YanhuiDua force-pushed the refine-rollout-topology branch from 787428e to bb56702 Compare June 30, 2026 04:02
@claude

claude Bot commented Jun 30, 2026

Copy link
Copy Markdown
Contributor

Claude: ## Review Summary

Third review after commits were rebased. The topology refactor is solid: RolloutTopology cleanly encodes engine layout, ServerLaunchSpec projects worker-local state, and RolloutWeightUpdateTarget replaces the legacy dict metadata for weight updates. All prior critical findings (session_url validation, cached_property on frozen dataclass) are properly addressed.

ProduceBatchResult impact: not affected
RoutedExperts impact: not affected

Issues

Nit

  • [data.py] _resolve_rollout_backend keeps (rollout_config.extra_rollout_config or dict()) while lmdeploy.py and sglang.py dropped the or dict() fallback. Both work since extra_rollout_config defaults to {}, but the inconsistency may confuse readers.

Verdict

APPROVE — the data model is clean, the validation in RolloutTopology.__post_init__ is now comprehensive (covers duplicate ranks, server ranks ⊆ engine ranks, weight_update_ranks coverage, request entrypoint requirement), and the test coverage for the new topology API is thorough.

weight_update_host: str | None = None
# Optional port used by NCCL external weight update groups.
weight_update_port: int | None = None

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Claude: [Nit] This line keeps the or dict() defensive pattern:

backend = (rollout_config.extra_rollout_config or dict()).get("lmdeploy_backend", "pytorch")

But lmdeploy.py and sglang.py in this same PR dropped the or dict():

extra_config = self.config.extra_rollout_config

Since RolloutConfig.extra_rollout_config defaults to {}, both are safe — but picking one style consistently avoids confusion about whether the field can be None.

@YanhuiDua YanhuiDua changed the title Refactor rollout topology binding Refactor rollout topology and rollout metadata Jun 30, 2026
Comment thread xtuner/v1/rl/weight_update/update_weighter.py Outdated
@jayhenry

jayhenry commented Jun 30, 2026

Copy link
Copy Markdown
Collaborator

总览

当前 RolloutHealthManager 的业务复杂度是必要的:它要同时支持周期健康检查、失败阈值、训练前 shutdown barrier、sync-step restart recovery、lifecycle group 和 proxy listener。问题不在于功能多,而在于当前 Implementation 里 stop、pause、lock 这些机制代码穿插在业务流程中,导致主流程难读。

建议保持 RolloutHealthManager 这个 Module 不变,不新增一组 helper class,只通过三个改动让实现更简洁:

  1. _exclusive_lifecycle_operation() 收敛 pause + lock + stop checkpoint。
  2. _checkpoint_not_stopping() 统一 stop check。
  3. 让高层方法按业务顺序线性表达。
flowchart LR
  Public[public Interface] --> Exclusive[_exclusive_lifecycle_operation]
  Exclusive --> Registry[RolloutWorkerRegistry]
  Exclusive --> Recovery[_restart_worker_group / _shutdown_worker_group]
  Recovery --> Checkpoint[_checkpoint_not_stopping]
  Public --> Notify[lock 外 listener notify / log]
Loading

重点问题

1. pause + lock + stop checkpoint 分散

当前 restart_inactive_workers()check_and_shutdown_inactive_workers() 都需要做同一组前置控制:

  • 暂停后台 periodic health check。
  • 获取 _operation_lock,避免并发改 registry。
  • 确认 manager 没有 stop。
  • 操作结束后恢复后台检查。

如果这些逻辑散在各个方法里,读者必须同时跟踪业务流程和并发控制。更简洁的做法是用一个 context manager 表达这个固定模式:

@contextmanager
def _exclusive_lifecycle_operation(self):
    with self._background_health_checks_paused():
        with self._operation_lock:
            self._checkpoint_not_stopping()
            yield

调用方只看到业务意图:

with self._exclusive_lifecycle_operation():
    inactive_groups = self._registry.claim_inactive_groups_for_recovery()
    ...

这个改动的核心价值是 Locality:pause、lock、stop checkpoint 的规则只在一个地方维护。外部 Interface 不变,也不新增新的 public Seam。

2. stop check 语义分散

当前实现里会反复出现 _is_stopping()self._stop_event.is_set()self._stopped。这些检查不能删,因为 recovery 中有多次 Ray 调用,任何一步都可能很慢;如果 stop 已经发生,recovery 不应该继续启动 server。

但 stop check 应该统一表达为一个 checkpoint:

def _checkpoint_not_stopping(self) -> None:
    if self._is_stopping():
        raise InterruptedError

长流程只在关键阶段前后调用它:

self._checkpoint_not_stopping()
self._shutdown_worker_group(group, wait_server_down=True, best_effort=False)

self._checkpoint_not_stopping()
init_results = self._init_worker_group(group)

self._checkpoint_not_stopping()
self._offload_worker_group(group)

这样 _restart_worker_group() 不再被多层 if self._is_stopping() 打断。停止后的 best-effort cleanup 也可以集中在 except InterruptedError 中处理:

except InterruptedError:
    self._shutdown_worker_group(group, wait_server_down=False, best_effort=True)
    return False

这个改动让 stop 控制成为一个内部 Interface,而不是散落的实现细节。

3. 高层方法应该像业务流程

当前 Implementation 的阅读成本高,是因为高层方法被 pause、lock、notify、stop 细节打断。建议把高层方法写成线性业务流程。

这个规则是几个设计原则的合流:

  • Kent Beck 在《Smalltalk Best Practice Patterns》里提出 Composed Method:一个方法应由同一抽象层级的步骤组成,读起来像一段清晰叙述。
  • Robert C. Martin 在《Clean Code》里强调 The Stepdown Rule:高层函数先表达意图,细节放到后续更低层级。
  • John Ousterhout 在《A Philosophy of Software Design》里强调 Deep Module 和降低认知负担:Interface 应该隐藏复杂 Implementation,让调用方用更少信息完成更多事。

这里不能机械理解成“每一步都抽一个函数”。结合 deletion test,只有能隐藏真实规则、提高 Locality 的私有方法才值得保留;纯转发的一行 wrapper 应该内联。

restart_inactive_workers() 应该读起来像:

def restart_inactive_workers(self) -> None:
    recovered_groups = []
    failed_groups = []

    with self._exclusive_lifecycle_operation():
        groups = self._registry.claim_inactive_groups_for_recovery()
        for group in sorted(groups, key=lambda group: group.ranks):
            recovered = self._restart_worker_group(group)
            updated_group = self._registry.set_group_recovery_result(group, recovered=recovered)
            if recovered:
                self._clear_failure_counts(group.ranks)
                recovered_groups.append(updated_group or group)
            else:
                failed_groups.append(updated_group or group)

    self._notify_worker_group_recovered(tuple(recovered_groups))
    self._log_failed_recovery_groups(failed_groups)

check_and_shutdown_inactive_workers() 应该读起来像:

def check_and_shutdown_inactive_workers(self) -> None:
    with self._exclusive_lifecycle_operation():
        newly_inactive = self._check_and_mark_failed_groups_locked(fail_fast=True)
        inactive_groups = self._registry.claim_inactive_groups_for_recovery()
        failed_shutdown_groups = self._shutdown_and_mark_inactive(inactive_groups)

    self._notify_worker_group_inactive(newly_inactive)
    self._log_failed_shutdown_groups(failed_shutdown_groups)

关键点是:锁内只做 registry 状态迁移和 worker lifecycle 操作;listener 和 log 这类外部副作用放在锁外。这样主流程保持线性,副作用位置也更明确。

pr_health_managers_v2.py

Comment thread xtuner/v1/rl/rollout/controller.py Outdated
Comment thread xtuner/v1/rl/rollout/rollout_topology.py
@YanhuiDua

Copy link
Copy Markdown
Collaborator Author

@claude review

@claude

claude Bot commented Jul 1, 2026

Copy link
Copy Markdown
Contributor

Claude: ## Review Summary

Fourth review covering the latest two commits (patches 5/6): init()/reinit() lifecycle split and weight_transport_type replacing train_rollout_mode.

Both commits are clean refinements that address prior collaborator feedback:

  • Patch 5: init(launch_spec) vs reinit() makes first-start vs recovery explicit; register_started_servers batch simplifies the controller; session server guard moved into the worker itself (where it belongs, since only request-entrypoint workers need it).
  • Patch 6: Replacing train_rollout_mode with weight_transport_type removes an unnecessary deployment-topology abstraction from what is really a transport-protocol choice.

ProduceBatchResult impact: not affected
RoutedExperts impact: not affected

Issues

Nit

  • [rollout_topology.py:post_init] Duplicate worker_rank across server_processes within a single engine is not validated. Two processes with the same worker_rank in one engine would silently overwrite in _server_process_by_rank. Very unlikely in practice (since weight_update_ranks coverage check partly prevents this from being useful), but was checked by the old validate_engine_launch_specs. Low-priority gap.
  • [data.py:89] The (rollout_config.extra_rollout_config or dict()) defensive pattern is still present in _resolve_rollout_backend while backends dropped it. Cosmetic inconsistency noted in prior review.

Verdict

APPROVE — the refactor is well-structured, test coverage is thorough (including lifecycle, topology API, and weight-update target projection tests), and all prior findings are resolved.

Comment thread xtuner/v1/rl/rollout/lmdeploy.py
Comment thread xtuner/v1/rl/rollout/sglang.py
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants