Skip to content

Add scheduling policy and improve threading and synchronization#954

Merged
helloyongyang merged 2 commits intoModelTC:mainfrom
zhtshr:zht_dev
Mar 20, 2026
Merged

Add scheduling policy and improve threading and synchronization#954
helloyongyang merged 2 commits intoModelTC:mainfrom
zhtshr:zht_dev

Conversation

@zhtshr
Copy link
Contributor

@zhtshr zhtshr commented Mar 20, 2026

This pull request significantly refactors the data transfer and threading logic in the DataManager class in lightx2v/disagg/conn.py to improve thread safety, resource management, and scalability. The changes introduce a global transfer thread (instead of per-room transfer threads), per-room ZeroMQ sockets, and improved locking for shared state. The code is now better structured for concurrent operations and easier resource cleanup.

Key changes include:

Threading and Synchronization Improvements

  • Introduced a single global transfer thread managed by DataManager, replacing per-room transfer threads. This thread is controlled by new transfer_event and transfer_stop_event attributes, and is started only for relevant disaggregation phases/modes. ([lightx2v/disagg/conn.pyR87-L127](https://github.com/ModelTC/LightX2V/pull/954/files#diff-a5e5778ac7adc9b8f2c175153e932db47158abebba325b47298001bc80e89ba7R87-L127))
  • Added a pool_lock to synchronize access to shared pools (request_pool, request_status, waiting_pool), ensuring thread safety during concurrent operations. ([[1]](https://github.com/ModelTC/LightX2V/pull/954/files#diff-a5e5778ac7adc9b8f2c175153e932db47158abebba325b47298001bc80e89ba7R87-L127), [[2]](https://github.com/ModelTC/LightX2V/pull/954/files#diff-a5e5778ac7adc9b8f2c175153e932db47158abebba325b47298001bc80e89ba7R470), [[3]](https://github.com/ModelTC/LightX2V/pull/954/files#diff-a5e5778ac7adc9b8f2c175153e932db47158abebba325b47298001bc80e89ba7L464-R483), [[4]](https://github.com/ModelTC/LightX2V/pull/954/files#diff-a5e5778ac7adc9b8f2c175153e932db47158abebba325b47298001bc80e89ba7R496))

ZeroMQ Socket Management

  • Replaced a single shared server_socket with per-room sockets managed in the new room_sockets dictionary. Added helper methods to create, bind, and close sockets for each room, enabling independent communication channels per room and preventing port conflicts. ([[1]](https://github.com/ModelTC/LightX2V/pull/954/files#diff-a5e5778ac7adc9b8f2c175153e932db47158abebba325b47298001bc80e89ba7R87-L127), [[2]](https://github.com/ModelTC/LightX2V/pull/954/files#diff-a5e5778ac7adc9b8f2c175153e932db47158abebba325b47298001bc80e89ba7R253-L190))
  • Updated all socket usage in phase threads to use the correct per-room socket. ([[1]](https://github.com/ModelTC/LightX2V/pull/954/files#diff-a5e5778ac7adc9b8f2c175153e932db47158abebba325b47298001bc80e89ba7L238-R330), [[2]](https://github.com/ModelTC/LightX2V/pull/954/files#diff-a5e5778ac7adc9b8f2c175153e932db47158abebba325b47298001bc80e89ba7L251-R340), [[3]](https://github.com/ModelTC/LightX2V/pull/954/files#diff-a5e5778ac7adc9b8f2c175153e932db47158abebba325b47298001bc80e89ba7L346-R400), [[4]](https://github.com/ModelTC/LightX2V/pull/954/files#diff-a5e5778ac7adc9b8f2c175153e932db47158abebba325b47298001bc80e89ba7L359-R410), [[5]](https://github.com/ModelTC/LightX2V/pull/954/files#diff-a5e5778ac7adc9b8f2c175153e932db47158abebba325b47298001bc80e89ba7R426-R451))

Resource Cleanup and API Changes

  • Refactored the release method to clean up all threads, events, sockets, and data pointers across all rooms, ensuring a clean shutdown. The old per-room release is renamed to remove. ([[1]](https://github.com/ModelTC/LightX2V/pull/954/files#diff-a5e5778ac7adc9b8f2c175153e932db47158abebba325b47298001bc80e89ba7L136-R186), [[2]](https://github.com/ModelTC/LightX2V/pull/954/files#diff-a5e5778ac7adc9b8f2c175153e932db47158abebba325b47298001bc80e89ba7R210-R240))
  • Added socket cleanup to end_room_threads to avoid resource leaks. ([lightx2v/disagg/conn.pyR253-L190](https://github.com/ModelTC/LightX2V/pull/954/files#diff-a5e5778ac7adc9b8f2c175153e932db47158abebba325b47298001bc80e89ba7R253-L190))

Port Calculation and Addressing

  • Updated port calculation for both sender and receiver sockets to include a room * 10 offset, preventing port collisions when multiple rooms are used simultaneously. Also updated status synchronization to use the new port scheme. ([[1]](https://github.com/ModelTC/LightX2V/pull/954/files#diff-a5e5778ac7adc9b8f2c175153e932db47158abebba325b47298001bc80e89ba7L228-R318), [[2]](https://github.com/ModelTC/LightX2V/pull/954/files#diff-a5e5778ac7adc9b8f2c175153e932db47158abebba325b47298001bc80e89ba7L238-R330), [[3]](https://github.com/ModelTC/LightX2V/pull/954/files#diff-a5e5778ac7adc9b8f2c175153e932db47158abebba325b47298001bc80e89ba7L346-R400), [[4]](https://github.com/ModelTC/LightX2V/pull/954/files#diff-a5e5778ac7adc9b8f2c175153e932db47158abebba325b47298001bc80e89ba7L517-R534))

Minor API and Logic Adjustments

  • Simplified the DataSender and DataReceiver initialization and removed unused arguments. ([lightx2v/disagg/conn.pyL496-R513](https://github.com/ModelTC/LightX2V/pull/954/files#diff-a5e5778ac7adc9b8f2c175153e932db47158abebba325b47298001bc80e89ba7L496-R513))
  • Removed redundant or obsolete per-room transfer event logic in favor of the global transfer event. ([[1]](https://github.com/ModelTC/LightX2V/pull/954/files#diff-a5e5778ac7adc9b8f2c175153e932db47158abebba325b47298001bc80e89ba7R87-L127), [[2]](https://github.com/ModelTC/LightX2V/pull/954/files#diff-a5e5778ac7adc9b8f2c175153e932db47158abebba325b47298001bc80e89ba7R253-L190))

New Features

  • Use a round-robin policy to dispatch requests to instances.

These changes collectively make the data transfer subsystem more robust, scalable, and maintainable.

@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the DataManager class by refactoring its threading and synchronization mechanisms to improve stability and scalability. It moves from a per-room threading model to a single global transfer thread, introduces per-room ZeroMQ sockets to prevent port conflicts, and implements robust locking for shared data structures. Additionally, a new round-robin scheduling policy is introduced for dispatching requests, and resource cleanup procedures are made more comprehensive, ensuring a more robust and maintainable data transfer subsystem.

Highlights

  • Threading Model Refactor: The DataManager now uses a single global transfer thread, replacing the previous per-room transfer threads, improving resource management and simplifying synchronization logic.
  • ZeroMQ Socket Management: Per-room ZeroMQ sockets have been introduced, replacing a single shared socket. Helper methods were added to create, bind, and close these sockets, preventing port conflicts and enabling independent communication channels.
  • Concurrency Control: A pool_lock was added to the DataManager to ensure thread-safe access and modification of shared request pools (request_pool, request_status, waiting_pool).
  • Resource Cleanup and API Changes: The DataManager's release method was refactored for comprehensive global cleanup of threads, events, and sockets. The old per-room release method was renamed to remove.
  • Port Collision Prevention: Port calculation for sender and receiver sockets now includes a room * 10 offset, ensuring unique ports when multiple rooms are active simultaneously.
  • Scheduling Policy Introduction: A new RoundRobinPolicy was implemented and integrated into the ControllerService to dispatch requests to instances in a round-robin fashion.
  • Service Method Renaming: The exec_request method in EncoderService, DecoderService, and TransformerService was renamed to run for consistency.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@helloyongyang helloyongyang merged commit 04e5e10 into ModelTC:main Mar 20, 2026
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants