Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions verl/experimental/transfer_queue/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@
# Module logger; its level is read from the VERL_LOGGING_LEVEL environment
# variable and falls back to "WARN" when the variable is unset.
logger = logging.getLogger(__file__)
logger.setLevel(os.getenv("VERL_LOGGING_LEVEL", "WARN"))

# Process-wide TransferQueue server infos. Both start as None, are written by
# set_transferqueue_server_info() and read by get_transferqueue_server_info().
_TRANSFER_QUEUE_CONTROLLER_INFOS = None
_TRANSFER_QUEUE_STORAGE_INFOS = None


class AsyncTransferQueueClient:
def __init__(
Expand Down Expand Up @@ -660,3 +663,18 @@ def process_zmq_server_info(handlers: dict[Any, Union[TransferQueueController, T
for name, handler in handlers.items():
server_info[name] = ray.get(handler.get_zmq_server_info.remote()) # type: ignore[attr-defined]
return server_info


def set_transferqueue_server_info(controller_infos: dict[Any, ZMQServerInfo], storage_infos: dict[Any, ZMQServerInfo]):
    """Record the TransferQueue server infos in the module-level globals.

    The values are stored only while at least one of the two globals is still
    ``None``; once both are populated, further calls leave them untouched.

    Args:
        controller_infos: Mapping of controller identifiers to their ZMQ server info.
        storage_infos: Mapping of storage identifiers to their ZMQ server info.
    """
    global _TRANSFER_QUEUE_CONTROLLER_INFOS, _TRANSFER_QUEUE_STORAGE_INFOS

    still_unset = _TRANSFER_QUEUE_CONTROLLER_INFOS is None or _TRANSFER_QUEUE_STORAGE_INFOS is None
    if still_unset:
        # Both globals are (re)written together, even if only one was missing.
        _TRANSFER_QUEUE_CONTROLLER_INFOS, _TRANSFER_QUEUE_STORAGE_INFOS = controller_infos, storage_infos


def get_transferqueue_server_info():
    """Return the TransferQueue server infos stored in the module-level globals.

    Returns:
        tuple: ``(controller_infos, storage_infos)`` exactly as they were stored
        by ``set_transferqueue_server_info``.

    Raises:
        AssertionError: If either of the two globals is still ``None``.
    """
    if _TRANSFER_QUEUE_CONTROLLER_INFOS is None or _TRANSFER_QUEUE_STORAGE_INFOS is None:
        # Raised explicitly rather than via ``assert`` so the check is not
        # stripped under ``python -O``; AssertionError is kept so callers see
        # the same exception type and message as before.
        raise AssertionError("TransferQueue server infos have not been set yet.")
    return _TRANSFER_QUEUE_CONTROLLER_INFOS, _TRANSFER_QUEUE_STORAGE_INFOS
16 changes: 16 additions & 0 deletions verl/single_controller/base/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,22 @@ def _query_collect_info(self, mesh_name: str):
assert mesh_name in self.__collect_dp_rank, f"{mesh_name} is not registered in {self.__class__.__name__}"
return self.__collect_dp_rank[mesh_name]

@register(dispatch_mode=Dispatch.ONE_TO_ALL, blocking=False)
def set_transferqueue_server_info(self, controller_infos, storage_infos):
    """Forward the transfer queue server information to the client module.

    Registered with ``Dispatch.ONE_TO_ALL`` and ``blocking=False``.

    Args:
        controller_infos (dict):
            Controller server information, passed through unchanged.
        storage_infos (dict):
            Storage unit server information, passed through unchanged.
    """
    # Imported inside the method, as in the original implementation.
    import verl.experimental.transfer_queue.client as tq_client

    tq_client.set_transferqueue_server_info(controller_infos, storage_infos)

@classmethod
def env_keys(cls):
"""The keys of the environment variables that are used to configure the Worker."""
Expand Down
Loading