
Conversation

@anyadontfly (Contributor) commented Mar 3, 2025

Why are these changes needed?

Currently, only one DAG node per actor can be bound to allreduce. When users want to schedule output tensors from several DAG nodes into one allreduce call, they have to collect the tensors into a tuple and return that tuple from a single DAG node. For example,

```python
@ray.remote
class Actor:
    def comp1(self, _):
        return torch.ones(1,)

    def comp2(self, _):
        return torch.ones(1,) * 2

    def get_results(self, *args):
        return tuple(args)
```

To launch allreduce on the results of comp1 and comp2, users need an additional method get_results to gather the results of comp1 and comp2.

```python
with InputNode() as inp:
    res_comp1 = [actor.comp1.bind(inp) for actor in actors]
    res_comp2 = [actor.comp2.bind(inp) for actor in actors]
    res_tuple = [actor.get_results.bind(res_comp1[i], res_comp2[i]) for i, actor in enumerate(actors)]
    res_ar = allreduce.bind(res_tuple)
```

With this PR, users can simply put the results of comp1 and comp2 in a list and launch allreduce on the list of outputs, which no longer requires the intermediate method get_results.

```python
with InputNode() as inp:
    res_comp1 = [actor.comp1.bind(inp) for actor in actors]
    res_comp2 = [actor.comp2.bind(inp) for actor in actors]
    res_ar = allreduce.bind([res_comp1, res_comp2])
```

Related issue number

Meta-issue: #47983

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@jcotant1 jcotant1 added the core Issues that should be addressed in Ray Core label Mar 4, 2025
Signed-off-by: Puyuan Yao <[email protected]>
@jjyao (Collaborator) commented Apr 29, 2025

@anyadontfly could you rebase, there is a merge conflict.

@anyadontfly anyadontfly changed the title [WIP][core][compiled graphs] Supporting allreduce on tuple of tensors [core][compiled graphs] Supporting allreduce on list of input nodes May 22, 2025
@anyadontfly (Contributor Author) commented:
Hi Stephanie, the PR is ready to run more tests. Thanks! @stephanie-wang

Signed-off-by: Puyuan Yao <[email protected]>
Comment on lines 101 to 113
```python
for i in range(len(input_node_list)):
    output_node = ClassMethodNode(
        f"return_idx_{i}",
        (collective_output_node, i),
        dict(),
        dict(),
        {
            BIND_INDEX_KEY: collective_output_node._get_bind_index(),
            IS_CLASS_METHOD_OUTPUT_KEY: True,
            PARENT_CLASS_NODE_KEY: actor_handle,
        },
    )
    output_nodes.append(output_node)
```
Contributor:
Actually I don't quite understand this logic. I thought we should always return the same type of node, just sometimes it will be a list and sometimes it will be a nested list. Is this method now returning either a list of ClassMethodNodes or a list of CollectiveOutputNodes? If so, please update it so that we only return one type of node.

Also, please update the type signature accordingly.

Contributor Author:

I used the same logic as ClassMethodNode with multiple return nodes, so that we can execute the NCCL operation once with multiple return nodes.

If no nested list is passed in (normal allreduce), _bind will return a CollectiveNode, which is equivalent to a ClassMethodNode with _is_class_method_output=False.

If a nested list is passed in (bucketed allreduce), _bind will return ClassMethodNodes with _is_class_method_output=True and set _class_method_output to the original CollectiveNode, so that the NCCL operation is executed only once at runtime instead of once per object.

Contributor:

Can we modify this so that we always use the same logic for len(input_node_list) == 1 or N? Conceptually, it would be better if we always think of a CollectiveNode as taking in a nested list, and the only difference here should be whether we return a flat or nested list.
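The normalization suggested above could look something like this helper (a hypothetical sketch, not Ray's actual code; `normalize_input` and its return convention are assumptions for illustration):

```python
# Sketch of the reviewer's suggestion: always treat the input as a nested
# list (a list of buckets), and remember whether the caller passed a flat
# list so the output can be flattened back. Hypothetical helper, not Ray code.
def normalize_input(input_nodes):
    """Return (nested_list, was_flat)."""
    if input_nodes and not isinstance(input_nodes[0], list):
        return [input_nodes], True   # flat list -> single-bucket nested list
    return input_nodes, False

nested, was_flat = normalize_input(["n1", "n2"])
print(nested, was_flat)    # -> [['n1', 'n2']] True
nested2, was_flat2 = normalize_input([["a"], ["b"]])
print(nested2, was_flat2)  # -> [['a'], ['b']] False
```

With this shape, the collective's internal logic is identical for 1 or N buckets; only the final flatten step differs.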

Contributor Author:

I changed the returned node for N input_node_lists to CollectiveNode.

I'm trying to mirror the ClassMethodNode implementation for num_returns == 1 or N.

For allreducing 1 object, we can perform the allreduce and return the result from the CollectiveNode directly. But for allreducing N objects, we still want to perform the allreduce only once on the CollectiveNode, so we need intermediate nodes to distribute the results to the different output nodes.

If I used the same logic for 1 object and N objects, I could return an intermediate node from .bind as in the N-object case, but I think that would result in a redundant node.
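The one-execution-with-N-output-nodes idea described above can be illustrated with a minimal sketch (plain Python stand-ins, not Ray's actual CollectiveNode/ClassMethodNode classes):

```python
# Hypothetical sketch: one "collective" node that executes once, plus N
# lightweight index nodes that each return one element of its cached result.
# This mirrors the multiple-return-nodes pattern described in the comment.
class CollectiveNode:
    def __init__(self, compute):
        self._compute = compute   # runs the (all)reduce
        self._result = None
        self.executions = 0

    def execute(self):
        if self._result is None:  # cache so the collective runs only once
            self.executions += 1
            self._result = self._compute()
        return self._result

class IndexNode:
    """Distributes element i of the parent's cached result."""
    def __init__(self, parent, i):
        self._parent, self._i = parent, i

    def execute(self):
        return self._parent.execute()[self._i]

# "Allreduce" three tensors at once; plain lists simulate the tensors.
collective = CollectiveNode(lambda: [sum(x) for x in ([1, 2], [3, 4], [5, 6])])
outputs = [IndexNode(collective, i) for i in range(3)]
results = [n.execute() for n in outputs]
print(results, collective.executions)  # -> [3, 7, 11] 1
```

All three output nodes are consumed independently, yet the underlying collective runs exactly once.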

Comment on lines +146 to +147
collective operation. The output tensors have the same length and order
as the input node list of the actor of this operation.
Contributor:

This comment is confusing. There is no input node list in this function?

Contributor Author:

Yes, I think so. The inputs and outputs are tensors in execute.

Signed-off-by: Puyuan Yao <[email protected]>
```python
    recv_buf = torch.empty_like(t)
    communicator.allreduce(t, recv_buf, self._op.reduceOp)
else:
    recv_buf = tuple(torch.empty_like(t) for t in send_buf)
```
Contributor:

Why do we allocate a separate torch tensor for each input instead of one flat tensor?

Contributor Author:

I changed the implementation here so that recv_buf points into flat_buf, to avoid unnecessary memory allocation.
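The flat-buffer approach discussed above can be sketched as follows (a simplified illustration using numpy in place of torch/NCCL; `bucketed_allreduce` and the simulated sum-reduction are assumptions for illustration, not the PR's code):

```python
# Sketch of the flat-buffer idea: instead of allocating a recv buffer per
# input tensor, copy all inputs into one flat buffer, run a single reduction
# over it, then hand back views into the result. numpy stands in for NCCL.
import numpy as np

def bucketed_allreduce(per_rank_tensors):
    """per_rank_tensors: list (one entry per rank) of lists of 1-D arrays."""
    sizes = [t.size for t in per_rank_tensors[0]]
    # One flat send buffer per rank.
    flats = [np.concatenate(ts) for ts in per_rank_tensors]
    # Simulated sum-allreduce: every rank ends up with the elementwise sum.
    reduced = np.sum(flats, axis=0)
    # Slice the single result buffer back into per-tensor views (no copies).
    out, offset = [], 0
    for size in sizes:
        out.append(reduced[offset:offset + size])
        offset += size
    return out

rank0 = [np.array([1.0, 2.0]), np.array([3.0])]
rank1 = [np.array([10.0, 20.0]), np.array([30.0])]
result = bucketed_allreduce([rank0, rank1])
print([r.tolist() for r in result])  # -> [[11.0, 22.0], [33.0]]
```

One communicator call and one receive allocation serve all of the bucketed tensors; the per-tensor outputs are just slices of that buffer.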

@stephanie-wang (Contributor) left a comment:

Thanks, this looks good! Please just address the comment about dtypes.

Signed-off-by: Puyuan Yao <[email protected]>
@@ -212,21 +211,25 @@ def execute(
```python
    recv_buf = torch.empty_like(t)
    communicator.allreduce(t, recv_buf, self._op.reduceOp)
else:
    if not all(t.dtype == send_buf[0].dtype for t in send_buf):
```
Contributor:

You can support this case by using torch.view. But it's fine to do it in a follow-up PR.

Contributor Author:

got it, thanks!
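The dtype check in the hunk above rejects mixed-dtype buckets. One possible follow-up (a sketch under assumptions; this is not necessarily the torch.view approach the reviewer had in mind, and `allreduce_mixed_dtypes` is a hypothetical name): group the inputs by dtype, reduce each group as one flat buffer, and reassemble the results in the original order. numpy stands in for torch here.

```python
# Sketch: support mixed dtypes by bucketing inputs per dtype, doing one
# flat-buffer reduction per dtype group, and restoring the original order.
import numpy as np
from collections import defaultdict

def allreduce_mixed_dtypes(per_rank_tensors):
    """per_rank_tensors: list (one per rank) of 1-D arrays, same dtypes per slot."""
    dtypes = [t.dtype for t in per_rank_tensors[0]]
    groups = defaultdict(list)              # dtype -> original positions
    for i, dt in enumerate(dtypes):
        groups[dt].append(i)

    out = [None] * len(dtypes)
    for dt, idxs in groups.items():
        # One flat buffer per dtype group, summed across ranks.
        flats = [np.concatenate([ts[i] for i in idxs]) for ts in per_rank_tensors]
        reduced = np.sum(flats, axis=0)
        offset = 0
        for i in idxs:
            n = per_rank_tensors[0][i].size
            out[i] = reduced[offset:offset + n]
            offset += n
    return out

rank0 = [np.array([1, 2], dtype=np.int64), np.array([0.5], dtype=np.float32)]
rank1 = [np.array([3, 4], dtype=np.int64), np.array([1.5], dtype=np.float32)]
res = allreduce_mixed_dtypes([rank0, rank1])
print([r.tolist() for r in res])  # -> [[4, 6], [2.0]]
```

This keeps each reduction dtype-homogeneous (so sum semantics stay correct) at the cost of one collective call per distinct dtype.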

@stephanie-wang stephanie-wang enabled auto-merge (squash) July 8, 2025 18:24
@github-actions github-actions bot added the go add ONLY when ready to merge, run all tests label Jul 8, 2025
Signed-off-by: Puyuan Yao <[email protected]>
Signed-off-by: Puyuan Yao <[email protected]>
auto-merge was automatically disabled July 8, 2025 20:41

Head branch was pushed to by a user without write access

@stephanie-wang stephanie-wang merged commit 8daa431 into ray-project:master Jul 9, 2025
5 checks passed
Labels
  • community-contribution — Contributed by the community
  • core — Issues that should be addressed in Ray Core
  • go — add ONLY when ready to merge, run all tests