@@ -56,29 +56,35 @@ ProcessGroupHeter::ProcessGroupHeter(const std::shared_ptr<Store>& store,
       local_size_(local_size),
       gloo_rank_(gloo_rank),
       gloo_size_(gloo_size),
-      with_switch_(with_switch) {
+      with_switch_(with_switch),
+      switch_endpoint_(switch_endpoint) {
 #if defined(PADDLE_WITH_NCCL)
   inner_pg_ = std::make_shared<ProcessGroupNCCL>(store, local_rank, local_size,
                                                  IGNORE_ID);
 #elif defined(PADDLE_WITH_ASCEND_CL)
   inner_pg_ = std::make_shared<ProcessGroupHCCL>(store, local_rank, local_size,
                                                  IGNORE_ID);
 #else
-  PADDLE_THROW(platform::errors::InvalidArgument(
+  PADDLE_THROW(platform::errors::Fatal(
       "ProcessGroupHeter only supports NCCL and HCCL now."));
 #endif
-  if (with_switch_) {
-    // TODO(sandyhouse) starts a client to connect the cloud switch module
-    // std::shared_ptr<HeterClient> client_ =
-    //     HeterClient::GetInstance({switch_endpoint}, {}, 0);
-  } else if (local_rank_ == 0) {
+  if (local_rank_ == 0 && !with_switch_) {
     auto opts = ProcessGroupGloo::GlooOptions::create();
     opts->device = ProcessGroupGloo::createDefaultDevice();
     inter_pg_ = std::make_shared<ProcessGroupGloo>(store, gloo_rank_,
                                                    gloo_size_, IGNORE_ID, opts);
   }
 }
 
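+// Element-wise accumulation helper: adds `size` values from `src` into `dst`.
+// Used below to fold the tensor received from the switch module into the
+// locally reduced result during the inter-cluster allreduce.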
+template <typename T>
+static void _do_add(T* dst, T* src, size_t size) {
+  for (size_t i = 0; i < size; i++) {
+    *dst += *src;
+    dst++;
+    src++;
+  }
+}
+
 std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::AllReduce(
     std::vector<Tensor>& tensors, const AllreduceOptions& opts) {
 #if defined(PADDLE_WITH_NCCL)
@@ -93,33 +99,92 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::AllReduce(
 
   // Step2: copy tensors to CPU
   if (local_rank_ == 0) {
-    std::vector<Tensor> cpu_tensors(tensors.size());
+    std::vector<Tensor> cpu_tensors;
+    cpu_tensors.reserve(tensors.size());
     for (size_t i = 0; i < tensors.size(); i++) {
       auto dense_gpu_tensor =
           std::dynamic_pointer_cast<phi::DenseTensor>(tensors[i].impl());
-      auto dense_cpu_tensor =
-          std::dynamic_pointer_cast<phi::DenseTensor>(cpu_tensors[i].impl());
-      dense_cpu_tensor->Resize(tensors[i].dims());
+      phi::DenseTensorMeta meta = phi::DenseTensorMeta(
+          dense_gpu_tensor->dtype(), dense_gpu_tensor->dims());
+      std::shared_ptr<phi::DenseTensor> dense_cpu_tensor =
+          std::make_shared<phi::DenseTensor>(
+              std::make_unique<paddle::experimental::DefaultAllocator>(
+                  paddle::platform::CPUPlace())
+                  .get(),
+              meta);
+      dense_cpu_tensor->ResizeAndAllocate(dense_gpu_tensor->dims());
+      // reserve() does not create elements, so append rather than index.
+      cpu_tensors.emplace_back(paddle::experimental::Tensor(dense_cpu_tensor));
       framework::TensorCopySync(*dense_gpu_tensor, platform::CPUPlace(),
                                 dense_cpu_tensor.get());
     }
     // Step3: do inter cluster allreduce
     if (with_switch_) {
-      // TODO(sandyhouse) send to and recv from switch, and do add
+      if (local_rank_ == 0) {
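+        // Send the locally reduced tensor to the cloud switch module and
+        // receive a tensor of the same shape back; the two are summed below.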
+        HeterClient* client_ =
+            HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
+        auto dense_cpu_tensor =
+            std::dynamic_pointer_cast<phi::DenseTensor>(cpu_tensors[0].impl());
+        std::vector<int> send_size;
+        send_size.push_back(dense_cpu_tensor->numel());
+        int ret = client_->Send(
+            gid_, {dense_cpu_tensor->name()}, send_size,
+            dense_cpu_tensor->data(),
+            dense_cpu_tensor->numel() *
+                framework::DataTypeSize(dense_cpu_tensor->dtype()));
+        PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
+                                      "Send to the switch module error."));
+        phi::DenseTensorMeta meta = phi::DenseTensorMeta(
+            dense_cpu_tensor->dtype(), dense_cpu_tensor->dims());
+        std::shared_ptr<phi::DenseTensor> dense_cpu_tensor2 =
+            std::make_shared<phi::DenseTensor>(
+                std::make_unique<paddle::experimental::DefaultAllocator>(
+                    paddle::platform::CPUPlace())
+                    .get(),
+                meta);
+        dense_cpu_tensor2->ResizeAndAllocate(dense_cpu_tensor->dims());
+        Tensor cpu_tensor_temp =
+            paddle::experimental::Tensor(dense_cpu_tensor2);
+        ret = client_->Recv(
+            gid_, {dense_cpu_tensor->name()}, dense_cpu_tensor2->data(),
+            dense_cpu_tensor2->numel() *
+                framework::DataTypeSize(dense_cpu_tensor2->dtype()));
+        PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
+                                      "Recv from the switch module error."));
+
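+        // Element-wise add of the received tensor into the local result for
+        // the supported dtypes.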
+        switch (dense_cpu_tensor->dtype()) {
+          case DataType::FLOAT32:
+            _do_add<float>(reinterpret_cast<float*>(dense_cpu_tensor->data()),
+                           reinterpret_cast<float*>(dense_cpu_tensor2->data()),
+                           dense_cpu_tensor->numel());
+            break;
+          case DataType::FLOAT64:
+            _do_add<double>(
+                reinterpret_cast<double*>(dense_cpu_tensor->data()),
+                reinterpret_cast<double*>(dense_cpu_tensor2->data()),
+                dense_cpu_tensor->numel());
+            break;
+          case DataType::INT32:
+            _do_add<int>(reinterpret_cast<int*>(dense_cpu_tensor->data()),
+                         reinterpret_cast<int*>(dense_cpu_tensor2->data()),
+                         dense_cpu_tensor->numel());
+            break;
+          default:
+            PADDLE_THROW(platform::errors::PreconditionNotMet(
+                "Unsupported data type (%s) to do add.",
+                framework::DataType2String(dense_cpu_tensor->dtype())));
+        }
+      }
     } else {
       auto gloo_task = inter_pg_->AllReduce(cpu_tensors, opts);
       gloo_task->Wait();
     }
     // Step4: copy cpu tensors to gpu
-    // TODO(sandyhouse)
     // copy cpu tensors to gpu
     for (size_t i = 0; i < tensors.size(); i++) {
       auto dense_gpu_tensor =
           std::dynamic_pointer_cast<phi::DenseTensor>(tensors[i].impl());
       auto dense_cpu_tensor =
           std::dynamic_pointer_cast<phi::DenseTensor>(cpu_tensors[i].impl());
-      // framework::TensorCopySync(*dense_cpu_tensor, tensors[i].place(),
-      //                           dense_gpu_tensor.get());
       framework::TensorCopySync(*dense_cpu_tensor, dense_cpu_tensor->place(),
                                 dense_gpu_tensor.get());
     }
@@ -147,18 +212,57 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Broadcast(
   inner_pg_->Broadcast(tensors, b_opts);
 
   if (local_rank_ == 0) {
-    std::vector<Tensor> cpu_tensors(tensors.size());
+    std::vector<Tensor> cpu_tensors;
+    cpu_tensors.reserve(tensors.size());
     for (size_t i = 0; i < tensors.size(); i++) {
       auto dense_gpu_tensor =
           std::dynamic_pointer_cast<phi::DenseTensor>(tensors[i].impl());
-      auto dense_cpu_tensor =
-          std::dynamic_pointer_cast<phi::DenseTensor>(cpu_tensors[i].impl());
-      dense_cpu_tensor->Resize(tensors[i].dims());
+      phi::DenseTensorMeta meta = phi::DenseTensorMeta(
+          dense_gpu_tensor->dtype(), dense_gpu_tensor->dims());
+      std::shared_ptr<phi::DenseTensor> dense_cpu_tensor =
+          std::make_shared<phi::DenseTensor>(
+              std::make_unique<paddle::experimental::DefaultAllocator>(
+                  paddle::platform::CPUPlace())
+                  .get(),
+              meta);
+      dense_cpu_tensor->ResizeAndAllocate(dense_gpu_tensor->dims());
+      cpu_tensors.emplace_back(paddle::experimental::Tensor(dense_cpu_tensor));
       framework::TensorCopySync(*dense_gpu_tensor, platform::CPUPlace(),
                                 dense_cpu_tensor.get());
     }
     if (with_switch_) {
-      // TODO(sandyhouse) send to and recv
+      if (local_rank_ == 0) {
+        HeterClient* client_ =
+            HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
+        auto dense_cpu_tensor =
+            std::dynamic_pointer_cast<phi::DenseTensor>(cpu_tensors[0].impl());
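+        // Rank 0 of the gloo group pushes the tensor to the switch module;
+        // the other clusters pull it from the switch instead.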
+        if (gloo_rank_ == 0) {
+          std::vector<int> send_size;
+          send_size.push_back(dense_cpu_tensor->numel());
+          int ret = client_->Send(
+              gid_, {dense_cpu_tensor->name()}, send_size,
+              dense_cpu_tensor->data(),
+              dense_cpu_tensor->numel() *
+                  framework::DataTypeSize(dense_cpu_tensor->dtype()));
+          PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
+                                        "Send to the switch module error."));
+        } else {
+          int ret = client_->Recv(
+              gid_, {dense_cpu_tensor->name()}, dense_cpu_tensor->data(),
+              dense_cpu_tensor->numel() *
+                  framework::DataTypeSize(dense_cpu_tensor->dtype()));
+          PADDLE_ENFORCE_EQ(ret, 0,
+                            platform::errors::PreconditionNotMet(
+                                "Receive from the switch module error."));
+          ret = client_->Recv(
+              gid_, {dense_cpu_tensor->name()}, dense_cpu_tensor->data(),
+              dense_cpu_tensor->numel() *
+                  framework::DataTypeSize(dense_cpu_tensor->dtype()));
+          PADDLE_ENFORCE_EQ(ret, 0,
+                            platform::errors::PreconditionNotMet(
+                                "Receive from the switch module error."));
+        }
+      }
     } else {
       auto gloo_task = inter_pg_->Broadcast(cpu_tensors, opts);
       gloo_task->Wait();
@@ -168,8 +272,6 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Broadcast(
           std::dynamic_pointer_cast<phi::DenseTensor>(tensors[i].impl());
       auto dense_cpu_tensor =
           std::dynamic_pointer_cast<phi::DenseTensor>(cpu_tensors[i].impl());
-      // framework::TensorCopySync(*dense_cpu_tensor, tensors[i].place(),
-      //                           dense_gpu_tensor.get());
       framework::TensorCopySync(*dense_cpu_tensor, dense_cpu_tensor->place(),
                                 dense_gpu_tensor.get());
     }
@@ -185,22 +287,44 @@ void ProcessGroupHeter::Broadcast(const phi::DenseTensor* in,
   inner_pg_->Broadcast(in, out);
 
   if (local_rank_ == 0) {
-    Tensor cpu_tensor;
-    auto dense_cpu_tensor =
-        std::dynamic_pointer_cast<phi::DenseTensor>(cpu_tensor.impl());
-    dense_cpu_tensor->Resize(in->dims());
+    phi::DenseTensorMeta meta = phi::DenseTensorMeta(in->dtype(), in->dims());
+    std::shared_ptr<phi::DenseTensor> dense_cpu_tensor =
+        std::make_shared<phi::DenseTensor>(
+            std::make_unique<paddle::experimental::DefaultAllocator>(
+                paddle::platform::CPUPlace())
+                .get(),
+            meta);
+    dense_cpu_tensor->ResizeAndAllocate(in->dims());
+    Tensor cpu_tensor = paddle::experimental::Tensor(dense_cpu_tensor);
     framework::TensorCopySync(*in, platform::CPUPlace(),
                               dense_cpu_tensor.get());
     if (with_switch_) {
-      // TODO(sandyhouse) send to and recv
+      if (local_rank_ == 0) {
+        HeterClient* client_ =
+            HeterClient::GetInstance({switch_endpoint_}, {}, 0).get();
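+        // Same switch-based exchange as above: gloo_rank_ 0 sends the CPU
+        // copy to the switch, the remaining clusters receive it.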
+        if (gloo_rank_ == 0) {
+          std::vector<int> send_size;
+          send_size.push_back(in->numel());
+          int ret = client_->Send(
+              gid_, {in->name()}, send_size, dense_cpu_tensor->data(),
+              in->numel() * framework::DataTypeSize(in->dtype()));
+          PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
+                                        "Send to the switch module error."));
+        } else {
+          int ret =
+              client_->Recv(gid_, {in->name()}, dense_cpu_tensor->data(),
+                            in->numel() * framework::DataTypeSize(in->dtype()));
+          PADDLE_ENFORCE_EQ(ret, 0,
+                            platform::errors::PreconditionNotMet(
+                                "Receive from the switch module error."));
+        }
+      }
     } else {
       std::vector<Tensor> cpu_tensors = {cpu_tensor};
-      // auto gloo_task = inter_pg_->Broadcast(cpu_tensors);
-      // gloo_task->Wait();
-      inter_pg_->Broadcast(cpu_tensors);
+      auto gloo_task = inter_pg_->Broadcast(cpu_tensors);
+      gloo_task->Wait();
     }
-    framework::TensorCopySync(*dense_cpu_tensor, dense_cpu_tensor->place(),
-                              out);
+    framework::TensorCopySync(*dense_cpu_tensor, out->place(), out);
   }
   inner_pg_->Broadcast(out, out);
 }