fix: use tcp store_based_barrier to control p2p update synchronization #51
Conversation
Pull Request Overview
This PR refactors the process group management in the ParameterServer.update method by introducing a TCP store-based barrier and using PyTorch's subgroup functionality instead of custom process group initialization for rank subsets.
Key changes:
- Replaced custom init_process_group_for_ranks with dist.new_group() for creating rank subgroups
- Added a store_based_barrier method to synchronize all ranks using the TCP store
- Removed the _get_bcast_rank_map function, now using global ranks directly with subgroups
- All collective operations now use the ranks_group parameter
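As a rough illustration of what a store-based barrier can look like on top of torch.distributed's TCPStore, here is a minimal sketch; the function name, key, and timeout below are illustrative assumptions, not necessarily the PR's exact implementation:

```python
import time

import torch.distributed as dist


def store_based_barrier(store: dist.TCPStore, key: str, world_size: int,
                        timeout_s: float = 300.0) -> None:
    """Block until `world_size` ranks have checked in on `key`."""
    # Each rank atomically increments the shared counter to announce arrival.
    store.add(key, 1)
    start = time.time()
    # add(key, 0) returns the current counter value without modifying it.
    while store.add(key, 0) < world_size:
        if time.time() - start > timeout_s:
            raise RuntimeError(f"store_based_barrier timed out waiting on '{key}'")
        time.sleep(0.01)
```

Because the barrier only needs the TCP store, every rank can reach it regardless of which subgroup it belongs to, which is what makes it usable as a final synchronization point in update.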
```python
if ranks_group:
    dist.destroy_process_group(ranks_group)
if self._auto_pg and dist.is_initialized():
    dist.destroy_process_group()
```
Is this necessary? I think the GPU memory from NCCL may already be released after dist.destroy_process_group(ranks_group), so dist.destroy_process_group() may not be necessary. Please test and check whether my view is correct.
No. If only dist.destroy_process_group(ranks_group) is called, 1306 MB of GPU memory remains, versus 980 MB when both are called.
But we could call only dist.destroy_process_group(). When no argument is given, it destroys all process groups, including ranks_group.
For the case where auto_pg == False, I think we'd better not leave ranks_group undestroyed.
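For illustration, a minimal sketch of the teardown order this thread converges on, written as a hypothetical helper rather than the PR's verbatim code (auto_pg and ranks_group mirror the names in the diff):

```python
import torch.distributed as dist


def destroy_groups(ranks_group, auto_pg: bool) -> None:
    if auto_pg and dist.is_initialized():
        # With no argument, destroy_process_group() tears down every process
        # group, ranks_group included, and de-initializes the default group
        # that was created automatically.
        dist.destroy_process_group()
    elif ranks_group is not None:
        # The default group belongs to the user, so only release the subgroup
        # created inside update(); leaving it alive would leak NCCL resources.
        dist.destroy_process_group(ranks_group)
```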
```python
self.init_process_group()

# if both ranks is None or [], it will use fully broadcast to update to all ranks
ranks_group = dist.new_group(ranks if ranks else None)
```
This will cause a compatibility problem. If a user does not use auto pg and initializes the process group only on a subset of ranks (using the same logic as init_process_group_for_ranks), this will break.
But whether we should stay compatible with that situation may need discussion.
I would assume that if the user initializes the PG themselves, the ranks param should also correspond to that PG? In which case it should be OK?
Hmm, maybe I was wrong.
Is there any documentation about which ranks should form a PG, which ranks should call update, and the meaning of ranks in the non-_auto_pg case?
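One torch.distributed property relevant to this question (a general behavior, not something specific to this PR): dist.new_group is itself a collective over the default group, so every rank of that group must call it with the same ranks list, and the list is interpreted as global ranks of the default group. A small sketch of the assumption this code path makes:

```python
import torch.distributed as dist


def make_subgroup(ranks):
    # Collective over the default group: every rank that joined
    # init_process_group() must make this call with the same `ranks` list,
    # even ranks that will not be members of the resulting subgroup.
    # `ranks` are global ranks of the default group; None (or an empty
    # list, mapped to None here) yields a group spanning all ranks.
    return dist.new_group(ranks if ranks else None)
```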
Add a TCP Store based barrier in the ParameterServer.update method. Rewrite the logic of process management. Deprecate the logic if self._rank not in ranks: return. Do a _store_based_barrier among all ranks to make sure they synchronize before leaving the update method. All communication is also done in a subgroup, deprecating the _get_bcast_rank_map and init_process_group_for_ranks methods.
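Putting the description together, a hedged sketch of the resulting update flow; the helper names self._store, self._rank, _store_based_barrier, and the choice of broadcast source are assumptions for illustration, not lifted from the diff:

```python
import torch.distributed as dist


def update(self, named_tensors: dict, ranks=None):
    # 1. Every rank creates the subgroup; new_group() is a collective call.
    ranks_group = dist.new_group(ranks if ranks else None)
    member_ranks = ranks if ranks else list(range(dist.get_world_size()))

    # 2. Only subgroup members take part in broadcasting the updated weights.
    if self._rank in member_ranks:
        for tensor in named_tensors.values():
            dist.broadcast(tensor, src=member_ranks[0], group=ranks_group)

    # 3. All ranks, members or not, meet at the TCP-store barrier so nobody
    #    leaves update() before the broadcast has completed everywhere.
    self._store_based_barrier(self._store, "ps_update", dist.get_world_size())

    # 4. Release the subgroup.
    dist.destroy_process_group(ranks_group)
```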