What happened + What you expected to happen
PR [#52938](https://github.com//pull/52938) added the ability to transfer GPU-stored objects between actors using Gloo and NCCL. I'm trying to figure out how exactly to perform this with NCCL, but the test I'm running fails with `RuntimeError: No backend type associated with device type cpu`. Is this because I'm calling it wrong, and if so, what would the correct method be?
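For context, the pattern I assumed would work is sketched below, based on my reading of the RDT docs and tests. The `create_collective_group` helper from `ray.experimental.collective` and its `backend="nccl"` argument are my assumptions and may not match the actual API:

```python
# Minimal sketch of the usage I assumed; create_collective_group and
# backend="nccl" reflect my reading of the API and may not be exact.
import torch
import ray
from ray.experimental.collective import create_collective_group


@ray.remote(num_gpus=1)
class GPUActor:
    @ray.method(tensor_transport="nccl")
    def produce(self):
        # Create the tensor on the GPU so the returned ref can travel over NCCL.
        return torch.ones((500, 5000), device="cuda")

    def consume(self, tensor):
        # As I understand it, the tensor should arrive directly on this actor's GPU.
        return tensor.sum().item()


ray.init()
sender, receiver = GPUActor.remote(), GPUActor.remote()
# Let Ray set up the NCCL communicator between the actors instead of
# calling torch.distributed.init_process_group manually.
create_collective_group([sender, receiver], backend="nccl")
ref = sender.produce.remote()
print(ray.get(receiver.consume.remote(ref)))
```

The repro below instead sets up the process group manually through `torch.distributed` and registers the actors in the `ChannelContext`, which may be where I'm going wrong.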
Versions / Dependencies
Installed from ray-3.0.0.dev0-cp312-cp312-manylinux2014_x86_64.whl
Reproduction Script
Paste and run:

```python
import torch
import torch.distributed as dist
import sys
import pytest
import ray
from ray.experimental.channel.torch_tensor_type import TorchTensorType
from ray.experimental.channel import ChannelContext
from contextlib import contextmanager

torch.manual_seed(0)
NUM_TESTS = 1


def get_default_fixure_system_config():
    system_config = {
        "object_timeout_milliseconds": 200,
        "health_check_initial_delay_ms": 0,
        "health_check_failure_threshold": 10,
        "object_store_full_delay_ms": 100,
        "local_gc_min_interval_s": 1,
    }
    return system_config


def get_default_fixture_ray_kwargs():
    system_config = get_default_fixure_system_config()
    ray_kwargs = {
        "num_cpus": 2,
        "object_store_memory": 150 * 1024 * 1024,
        "dashboard_port": None,
        "namespace": "default_test_namespace",
        "_system_config": system_config,
    }
    return ray_kwargs


@contextmanager
def _ray_start(**kwargs):
    init_kwargs = get_default_fixture_ray_kwargs()
    init_kwargs.update(kwargs)
    # Start the Ray processes.
    address_info = ray.init("local", **init_kwargs)
    yield address_info
    # The code after the yield will run as teardown code.
    ray.shutdown()
    # Delete the cluster address just in case.
    ray._common.utils.reset_ray_address()


@pytest.fixture
def ray_start_regular(request):
    param = getattr(request, "param", {})
    with _ray_start(**param) as res:
        yield res


@ray.remote(num_gpus=1)
class TestActorNCCL:
    def tensor_to_cuda(self, tensor):
        self.tensor = tensor.cuda()

    @ray.method(tensor_transport="nccl")
    def get_tensor_ref(self):
        return self.tensor

    def register_custom_serializer(self):
        TorchTensorType().register_custom_serializer()

    def setup(self, world_size, rank):
        init_method = "tcp://localhost:8890"
        dist.init_process_group(
            backend="nccl", world_size=world_size, rank=rank, init_method=init_method
        )

    @ray.method(tensor_transport="nccl")
    def echo(self, data):
        return data

    def double(self, data):
        if isinstance(data, list):
            return [d * 2 for d in data].cpu()
        return data * 2

    def get_gpu_object(self, obj_id: str):
        gpu_object_manager = ray._private.worker.global_worker.gpu_object_manager
        if gpu_object_manager.has_gpu_object(obj_id):
            gpu_object = gpu_object_manager.get_gpu_object(obj_id)
            print(f"gpu_object: {gpu_object}")
            return gpu_object
        return None


def init_process_group(actors):
    world_size = len(actors)
    ray.get([actor.setup.remote(world_size, i) for i, actor in enumerate(actors)])
    ctx = ChannelContext.get_current()
    ctx.communicators[0] = actors
    ray.get([actor.register_custom_serializer.remote() for actor in actors])


def test_inter_actor_gpu_tensor_transfer(ray_start_regular):
    tensor = torch.randn((500, 5000))
    world_size = 2
    actors = [TestActorNCCL.remote() for _ in range(world_size)]
    init_process_group(actors)
    sender = actors[0]
    receiver = actors[1]
    ray.get(sender.tensor_to_cuda.remote(tensor))
    ref = sender.get_tensor_ref.remote()
    res = receiver.double.remote(ref)
    assert ray.get(res) == pytest.approx(tensor * 2)


if __name__ == "__main__":
    sys.exit(pytest.main(["-sv", __file__, "--durations=0"]))
```