-
Notifications
You must be signed in to change notification settings - Fork 395
[Fix] replace ray.get to await in async func and add timeout in ray.get #1397
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
|
||
| # TODO: Move DEVICE to `xtuner.utils.device` | ||
| PG_READY_TIMEOUT = 30 | ||
| TRAINER_RAY_GET_TIMEOUT = 5 * 3600 # 5 hour |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Both PG_READY_TIMEOUT and TRAINER_RAY_GET_TIMEOUT should be configurable by environment variable. Consider create a env.py to manager cross-file env that have the same meaning.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
下个pr汇合下所有的env,统一管理,需要讨论下env.py放在什么目录下
| self.logging_replaybuffer_state() | ||
| self.logger.info(ray.get(self.env_controller.get_rollout_stats.remote())) # type: ignore[attr-defined] | ||
| replay_buffer_stats = await self.replay_buffer.print.remote() # type: ignore[attr-defined] | ||
| rollout_stats = await self.env_controller.get_rollout_stats.remote() # type: ignore[attr-defined] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not using asyncio.wait here? Since it's at the end of this method, not inside a task while loop.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ray 的 ObjectRef(如 actor.method.remote() 返回的对象)不能直接用于 asyncio.wait,需要经过一些转换,see at: https://docs.ray.io/en/latest/ray-core/actors/async_api.html#objectrefs-as-asyncio-futures
| await asyncio.wait_for(asyncio.gather(*waiting_tasks, return_exceptions=True), timeout=10) | ||
|
|
||
| self.logger.info(ray.get(self.env_controller.get_rollout_stats.remote())) # type: ignore[attr-defined] | ||
| rollout_stats = await self.env_controller.get_rollout_stats.remote() # type: ignore[attr-defined] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not using asyncio.wait here? Since it's at the end of this method, not inside a task while loop.
No description provided.