3 changes: 2 additions & 1 deletion paddle/fluid/distributed/collective/process_group_bkcl.cc
@@ -277,7 +277,8 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupBKCL::Reduce(
const phi::DenseTensor& input,
BKCLContext_t comm,
const XPUStream& stream) {
-  phi::DenseTensor output_t(*output);
+  phi::DenseTensor output_t;
+  paddle::framework::TensorCopy(*output, platform::XPUPlace(), &output_t);
Contributor:

TensorCopy allocates its own memory, right?
Later on we could consider implementing a dedicated Reduce directly; Reduce is used quite heavily in model-parallel sharding.

Contributor Author:

Yes, it allocates its own memory. I'll update this once the xccl kernels such as reduce are implemented.
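For reference, a minimal sketch of the behavioral difference, assuming the usual shallow-copy semantics of phi::DenseTensor (its copy constructor shares the underlying allocation rather than copying data):

// Old: output_t aliases output's storage, so using it as a scratch
// buffer for the BKCL reduce also clobbers *output.
phi::DenseTensor output_t(*output);

// New: TensorCopy allocates a fresh buffer for output_t on the XPU place
// and copies the data over, so the reduce result written into the scratch
// buffer no longer leaks back into *output.
phi::DenseTensor output_t;
paddle::framework::TensorCopy(*output, platform::XPUPlace(), &output_t);

This is what the new assertion in process_group_bkcl.py below verifies: tensor_y still equals old_tensor_y after the reduce.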

const auto& place = input.place();
auto* calc_ctx = static_cast<phi::XPUContext*>(
platform::DeviceContextPool::Instance().Get(place));
64 changes: 64 additions & 0 deletions paddle/fluid/pybind/process_group_utils.h
@@ -129,6 +129,27 @@ void ConcatDenseTensorWithType(const DeviceContext &dev_ctx,
}
}

#ifdef PADDLE_WITH_XPU
template <>
void ConcatDenseTensorWithType(const phi::XPUContext &dev_ctx,
const std::vector<phi::DenseTensor> &t_list,
phi::DenseTensor *p_out,
phi::DataType type) {
switch (type) {
case phi::DataType::FLOAT16:
ConcatDenseTensor<phi::XPUContext, phi::dtype::float16>()(
dev_ctx, t_list, p_out);
break;
case phi::DataType::FLOAT32:
ConcatDenseTensor<phi::XPUContext, float>()(dev_ctx, t_list, p_out);
break;
default:
PADDLE_THROW(platform::errors::Unimplemented(
"Data type (%s) is not supported when it concats tensors.", type));
}
}
#endif

template <typename DeviceContext>
void SplitDenseTensorWithType(const DeviceContext &dev_ctx,
const phi::DenseTensor &t_in,
@@ -170,6 +191,27 @@ void SplitDenseTensorWithType(const DeviceContext &dev_ctx,
}
}

#ifdef PADDLE_WITH_XPU
template <>
void SplitDenseTensorWithType(const phi::XPUContext &dev_ctx,
const phi::DenseTensor &t_in,
std::vector<phi::DenseTensor *> *p_list,
phi::DataType type) {
switch (type) {
case phi::DataType::FLOAT16:
SplitDenseTensor<phi::XPUContext, phi::dtype::float16>()(
dev_ctx, t_in, p_list);
break;
case phi::DataType::FLOAT32:
SplitDenseTensor<phi::XPUContext, float>()(dev_ctx, t_in, p_list);
break;
default:
PADDLE_THROW(platform::errors::Unimplemented(
"Data type (%s) is not supported when it splits tensors.", type));
}
}
#endif
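Both XPU specializations handle only FLOAT16 and FLOAT32 for now; anything else hits the Unimplemented error. If more dtypes gain XPU kernels later, the switch extends case by case. A hypothetical sketch (whether an XPU split kernel for INT32 actually exists is an assumption, not something this diff establishes):

case phi::DataType::INT32:
  // hypothetical: requires an XPU split kernel registered for int32
  SplitDenseTensor<phi::XPUContext, int32_t>()(dev_ctx, t_in, p_list);
  break;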

void ConcatTensor(const phi::DeviceContext &dev_ctx,
const std::vector<phi::DenseTensor> &tensor_list,
const experimental::Tensor *tensor) {
@@ -187,6 +229,17 @@ void ConcatTensor(const phi::DeviceContext &dev_ctx,
PADDLE_THROW(platform::errors::PermissionDenied(
"Paddle can't concat tensor since it's not support GPU, please "
"recompile or reinstall Paddle with GPU support."));
#endif
} else if (platform::is_xpu_place(place)) {
#ifdef PADDLE_WITH_XPU
ConcatDenseTensorWithType(static_cast<const phi::XPUContext &>(dev_ctx),
tensor_list,
dense_tensor,
tensor->dtype());
#else
PADDLE_THROW(platform::errors::PermissionDenied(
"Paddle can't concat tensor since it's not support XPU, please "
"recompile or reinstall Paddle with XPU support."));
#endif
} else if (platform::is_custom_place(place)) {
#ifdef PADDLE_WITH_CUSTOM_DEVICE
@@ -233,6 +286,17 @@ void SplitTensor(const phi::DeviceContext &dev_ctx,
PADDLE_THROW(platform::errors::PermissionDenied(
"Paddle can't split tensor since it's not support GPU, please "
"recompile or reinstall Paddle with GPU support."));
#endif
} else if (platform::is_xpu_place(place)) {
#ifdef PADDLE_WITH_XPU
SplitDenseTensorWithType(static_cast<const phi::XPUContext &>(dev_ctx),
tensor,
&dense_list,
tensor.dtype());
#else
PADDLE_THROW(platform::errors::PermissionDenied(
"Paddle can't split tensor since it's not compiled with XPU, "
"please recompile or reinstall Paddle with XPU support."));
#endif
} else if (platform::is_custom_place(place)) {
#ifdef PADDLE_WITH_CUSTOM_DEVICE
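With both dispatch branches in place, a device context obtained for an XPUPlace now reaches the XPU specializations instead of the PermissionDenied path. A rough call sketch; the variable names are illustrative, and fetching the context from the pool mirrors process_group_bkcl.cc above rather than code shown in this file:

// place is an XPUPlace; the pool hands back the matching phi::XPUContext.
auto *dev_ctx = platform::DeviceContextPool::Instance().Get(place);
ConcatTensor(*dev_ctx, in_tensor_list, &out_tensor);    // takes the is_xpu_place branch
SplitTensor(*dev_ctx, in_tensor, &out_tensor_ptr_list); // likewise for split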
2 changes: 2 additions & 0 deletions python/paddle/fluid/tests/unittests/xpu/process_group_bkcl.py
@@ -163,6 +163,7 @@ def test_create_process_group_bkcl(self):
y = np.random.random(self.shape).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
tensor_y = paddle.to_tensor(y)
old_tensor_y = paddle.to_tensor(y)
sum_result = tensor_x + tensor_y
if pg.rank() == 0:
task = dist.reduce(tensor_x, 0, sync_op=True)
@@ -174,6 +175,7 @@
paddle.device.xpu.synchronize()
if pg.rank() == 0:
assert np.array_equal(tensor_x, sum_result)
assert np.array_equal(tensor_y, old_tensor_y)
sys.stdout.write("rank {}: test reduce sum api ok\n".format(pg.rank()))

