Commit fc1d21a

【Hackathon 6th Fundable Projects 4 No.5】remove OpHandle (#65340)
* test rm multi_devices_graph_pass
* test rm fuse_all_reduce_op_pass
* remove grad_merge_all_reduce_op_handle
1 parent e3d2a66 · commit fc1d21a

23 files changed (+35 −2247 lines)

paddle/fluid/framework/CMakeLists.txt

Lines changed: 1 addition & 2 deletions
@@ -889,8 +889,7 @@ cc_library(
     eager_deletion_pass
     buffer_shared_inplace_op_pass
     buffer_shared_cross_op_memory_reuse_pass
-    inplace_addto_op_pass
-    set_reader_device_info_utils)
+    inplace_addto_op_pass)
 
 cc_library(
   executor_cache

paddle/fluid/framework/compiled_program.cc

Lines changed: 28 additions & 2 deletions
@@ -29,7 +29,7 @@ limitations under the License. */
 #include "paddle/fluid/framework/ir/graph_helper.h"
 #include "paddle/fluid/framework/ir/memory_optimize_pass/memory_optimization_var_info.h"
 #include "paddle/fluid/framework/ir/memory_optimize_pass/reference_count_pass_helper.h"
-#include "paddle/fluid/framework/ir/multi_devices_graph_pass/set_reader_device_info_utils.h"
+#include "paddle/fluid/operators/reader/lod_tensor_blocking_queue.h"
 
 #if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
 #include "paddle/fluid/platform/cuda_device_guard.h"
@@ -49,6 +49,10 @@ namespace framework {
 std::once_flag p2p_init_flag;
 #endif
 
+static std::unordered_set<std::string> ReaderOpSet() {
+  return {"create_py_reader"};
+}
+
 class CompiledProgramPrivate {
  public:
   CompiledProgramPrivate(const std::vector<platform::Place> &places,
@@ -559,7 +563,7 @@ CompiledProgram::CompiledProgram(const std::vector<platform::Place> &places,
                     platform::errors::Unavailable(
                         "NPU is not supported in CompiledProgram."));
   InitP2P(places);
-  ir::InitReaderQueueDeviceCount(
+  InitReaderQueueDeviceCount(
       graph, *(member_->global_scope_), member_->places_.size());
   // Initialize necessary info of member_ with strategy.
   InitProgramPrivateMemberInfo(build_strategy, places.size());
@@ -807,6 +811,28 @@ void CompiledProgram::InitProgramPrivateMemberInfo(
                         "please use CPU/CUDA/XPU backend."));
   }
 }
+
+void CompiledProgram::InitReaderQueueDeviceCount(ir::Graph *graph,
+                                                 const Scope &scope,
+                                                 size_t dev_cnt) {
+  using QueueHolder =
+      operators::reader::OrderedMultiDeviceLoDTensorBlockingQueueHolder;
+
+  auto reader_ops = ReaderOpSet();
+  for (auto &node : graph->Nodes()) {
+    if (node->IsOp() && node->Op() &&
+        reader_ops.count(node->Op()->Type()) != 0) {
+      auto queue_name = node->Op()->Input("blocking_queue")[0];
+      auto var = scope.FindVar(queue_name);
+      if (var && var->IsType<QueueHolder>()) {
+        VLOG(10) << "Set device count of " << queue_name << " to be "
+                 << dev_cnt;
+        var->GetMutable<QueueHolder>()->GetQueue()->SetDeviceCount(dev_cnt);
+      }
+    }
+  }
+}
+
 void CompiledProgram::CreateLocalScopes(
     Scope *global_scope,
     const std::vector<Scope *> &local_scopes,

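The new `CompiledProgram::InitReaderQueueDeviceCount` above replaces the `ir::InitReaderQueueDeviceCount` call removed in the same hunk (previously provided by `set_reader_device_info_utils`): it walks the graph, and for every `create_py_reader` op it looks up the op's `blocking_queue` variable in the scope and sets that queue's device count. The following standalone sketch mirrors only that control flow; `Node`, `Queue`, and the map-based scope are simplified, hypothetical stand-ins, not the Paddle framework types used in the diff.

```cpp
// Simplified stand-ins for the framework types (hypothetical, illustration only).
#include <iostream>
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

struct Queue {
  size_t dev_cnt = 1;
  void SetDeviceCount(size_t n) { dev_cnt = n; }
};

struct Node {
  bool is_op;
  std::string op_type;
  std::string blocking_queue_name;  // stands in for Op()->Input("blocking_queue")[0]
};

// Mirrors the control flow of InitReaderQueueDeviceCount: every reader op's
// blocking queue that can be found in the scope gets its device count set.
void InitReaderQueueDeviceCountSketch(
    const std::vector<Node> &nodes,
    std::unordered_map<std::string, std::shared_ptr<Queue>> &scope,
    size_t dev_cnt) {
  const std::unordered_set<std::string> reader_ops = {"create_py_reader"};
  for (const auto &node : nodes) {
    if (node.is_op && reader_ops.count(node.op_type) != 0) {
      auto it = scope.find(node.blocking_queue_name);
      if (it != scope.end()) {
        it->second->SetDeviceCount(dev_cnt);
      }
    }
  }
}

int main() {
  std::unordered_map<std::string, std::shared_ptr<Queue>> scope = {
      {"queue_0", std::make_shared<Queue>()}};
  std::vector<Node> nodes = {{true, "create_py_reader", "queue_0"}};
  InitReaderQueueDeviceCountSketch(nodes, scope, /*dev_cnt=*/4);
  std::cout << scope["queue_0"]->dev_cnt << std::endl;  // prints 4
  return 0;
}
```

Running the sketch prints 4: the queue tied to the single reader op picks up the device count, which is the behavior the moved code preserves.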
paddle/fluid/framework/compiled_program.h

Lines changed: 4 additions & 0 deletions
@@ -66,6 +66,10 @@ class CompiledProgram {
   void InitProgramPrivateMemberInfo(const BuildStrategy &build_strategy,
                                     size_t device_count);
 
+  void InitReaderQueueDeviceCount(ir::Graph *graph,
+                                  const Scope &scope,
+                                  size_t dev_cnt);
+
   void CreateLocalScopes(Scope *global_scope,
                          const std::vector<Scope *> &local_scopes,
                          bool create_new);

paddle/fluid/framework/details/CMakeLists.txt

Lines changed: 0 additions & 8 deletions
@@ -13,15 +13,12 @@ set(op_handle_srcs
     nan_inf_utils_detail.cc
     all_reduce_op_handle.cc
     fused_all_reduce_op_handle.cc
-    grad_merge_all_reduce_op_handle.cc
     reduce_op_handle.cc
     broadcast_op_handle.cc
     fused_broadcast_op_handle.cc
     var_handle.cc
     op_handle_base.cc
     scale_loss_grad_op_handle.cc
-    fetch_op_handle.cc
-    fetch_async_op_handle.cc
     share_tensor_buffer_functor.cc
     computation_op_handle.cc
     share_tensor_buffer_op_handle.cc
@@ -78,21 +75,16 @@ add_dependencies(detail_op_handle framework_proto auto_parallel_proto xxhash)
 set(IR_PASS_DEPS
     graph_viz_pass
     multi_devices_graph_pass
-    multi_devices_graph_print_pass
-    multi_devices_graph_check_pass
     fuse_elewise_add_act_pass
     fuse_bn_act_pass
     fuse_bn_add_act_pass
     multi_batch_merge_pass
     fuse_relu_depthwise_conv_pass
     lock_free_optimize_pass
     sequential_execution_pass
-    all_reduce_deps_pass
     add_reader_dependency_pass
     modify_op_lock_and_record_event_pass
     coalesce_grad_tensor_pass
-    fuse_all_reduce_op_pass
-    backward_optimizer_op_deps_pass
     fuse_adam_op_pass
     fuse_sgd_op_pass
     fuse_momentum_op_pass

paddle/fluid/framework/details/build_strategy.cc

File mode changed from 100644 to 100755
Lines changed: 0 additions & 92 deletions
@@ -71,8 +71,6 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
 
     AppendAddReaderDependencyPass();
     AppendMultiDevPass();
-    AppendMultiGraphOptPasses();
-
     AppendPassToSetMkldnnAttr("onednn_placement_pass");
     // runtime_context_cache pass should be the last pass to enable the attr of
    // all original and fused operators. But no operators can be enabled this
@@ -81,8 +79,6 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
                         "runtime_context_cache_pass");
     AppendPassWithCheck(strategy_.remove_unnecessary_lock_,
                         "modify_op_lock_and_record_event_pass");
-    // Note: This pass is used to check whether the multi_device_graph is right.
-    AppendPass("multi_devices_check_pass");
 
     SetCollectiveContext();
   }
@@ -144,29 +140,6 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
     }
   }
 
-  void AppendMultiGraphOptPasses() {
-    // NOTE: fuse_all_reduce_ops will count the number of all_reduce operator
-    // first, if the number is zero, fuse_all_reduce_ops will do nothing.
-    AppendPassWithCheck(strategy_.fuse_all_reduce_ops_,
-                        "fuse_all_reduce_op_pass");
-    AppendPrintGraphPass("multi_devices_print_pass", "_multi_devices_graph");
-
-    // experimental shows that the program will be faster if append
-    // all_reduce_deps_pass here.
-    bool append_all_reduce_deps_pass =
-        !strategy_.enable_parallel_graph_ &&
-        (SeqOnlyAllReduceOps(strategy_) ||
-         strategy_.reduce_ == BuildStrategy::ReduceStrategy::kAllReduce);
-    AppendPassWithCheck(append_all_reduce_deps_pass, "all_reduce_deps_pass");
-
-    bool append_backward_optimizer_op_deps_pass =
-        strategy_.num_trainers_ > 1 && !strategy_.async_mode_ &&
-        !strategy_.is_distribution_ &&
-        strategy_.enable_backward_optimizer_op_deps_;
-    AppendPassWithCheck(append_backward_optimizer_op_deps_pass,
-                        "backward_optimizer_op_deps_pass");
-  }
-
   void AppendOpFusePasses() {
     // 1. infernce pass if enabled.
     AppendPassWithCheck(
@@ -279,7 +252,6 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
     multi_devices_pass->SetNotOwned<const BuildStrategy>("strategy",
                                                          &strategy_);
   }
-
   void AppendPrintGraphPass(const std::string &pass_name,
                             const std::string &debug_file_suffix) {
     if (!strategy_.debug_graphviz_path_.empty()) {
@@ -391,66 +363,13 @@ ir::Graph *BuildStrategy::Apply(ir::Graph *graph,
           (use_device == p::kXPU) ? bkcl_ctxs : nullptr;
       pass->Erase(kBKCLCtxs);
       pass->SetNotOwned<platform::BKCLCommunicator>(kBKCLCtxs, bkcl_ctx);
-#endif
-    } else if (pass->Type() == "fuse_all_reduce_op_pass") {
-      pass->Erase(kNRanks);
-      pass->Set<size_t>(kNRanks, new size_t(nranks));
-      pass->Erase(kPlaces);
-      pass->SetNotOwned<const std::vector<platform::Place>>(kPlaces, &places);
-      pass->Erase(kLocalScopes);
-      pass->SetNotOwned<const std::vector<Scope *>>(kLocalScopes,
-                                                    &local_scopes);
-#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
-      platform::NCCLCommunicator *nctx =
-          (use_device == p::kCUDA) ? nccl_ctxs : nullptr;
-      pass->Erase(kNCCLCtxs);
-      pass->SetNotOwned<platform::NCCLCommunicator>(kNCCLCtxs, nctx);
-      pass->Erase(kUseHierarchicalAllReduce);
-      pass->Set<bool>(kUseHierarchicalAllReduce,
-                      new bool(use_hierarchical_allreduce_));
-#elif defined(PADDLE_WITH_XPU) && defined(PADDLE_WITH_XPU_BKCL)
-      platform::BKCLCommunicator *nctx =
-          (use_device == p::kXPU) ? bkcl_ctxs : nullptr;
-      pass->Erase(kBKCLCtxs);
-      pass->SetNotOwned<platform::BKCLCommunicator>(kBKCLCtxs, nctx);
-      pass->Erase(kUseHierarchicalAllReduce);
-      PADDLE_ENFORCE_EQ(use_hierarchical_allreduce_,
-                        false,
-                        platform::errors::Unimplemented(
-                            "xpu doesn't support hierarchical_allreduce"));
-      pass->Set<bool>(kUseHierarchicalAllReduce,
-                      new bool(use_hierarchical_allreduce_));
 #endif
     } else if (pass->Type() == "coalesce_grad_tensor_pass") {
       pass->Erase(kNRanks);
       pass->Set<size_t>(kNRanks, new size_t(nranks));
     } else if (pass->Type() == "sequential_execution_pass") {
       LOG(INFO) << "set enable_sequential_execution:"
                 << enable_sequential_execution_;
-    } else if (pass->Type() == "all_reduce_deps_pass") {
-#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
-      platform::NCCLCommunicator *nctx =
-          (use_device == p::kCUDA) ? nccl_ctxs : nullptr;
-      pass->Erase(kNCCLCtxs);
-      pass->SetNotOwned<platform::NCCLCommunicator>(kNCCLCtxs, nctx);
-      pass->Erase(kUseHierarchicalAllReduce);
-      pass->Set<bool>(kUseHierarchicalAllReduce,
-                      new bool(use_hierarchical_allreduce_));
-#elif defined(PADDLE_WITH_XPU) && defined(PADDLE_WITH_XPU_BKCL)
-      platform::BKCLCommunicator *nctx =
-          (use_device == p::kXPU) ? bkcl_ctxs : nullptr;
-      pass->Erase(kBKCLCtxs);
-      pass->SetNotOwned<platform::BKCLCommunicator>(kBKCLCtxs, nctx);
-      pass->Erase(kUseHierarchicalAllReduce);
-      PADDLE_ENFORCE_EQ(use_hierarchical_allreduce_,
-                        false,
-                        platform::errors::Unimplemented(
-                            "xpu doesn't support hierarchical_allreduce"));
-      pass->Set<bool>(kUseHierarchicalAllReduce,
-                      new bool(use_hierarchical_allreduce_));
-#endif
-      VLOG(1) << "SeqOnlyAllReduceOps:" << SeqOnlyAllReduceOps(*this)
-              << ", num_trainers:" << num_trainers_;
     } else if (pass->Type() == "fuse_relu_depthwise_conv_pass") {
       if (use_device != p::kCUDA) {
         VLOG(1) << "fuse_relu_depthwise_conv_pass is only supported on "
@@ -478,12 +397,6 @@ ir::Graph *BuildStrategy::Apply(ir::Graph *graph,
     } else if (pass->Type() == "onednn_placement_pass") {
       pass->Set("mkldnn_enabled_op_types",
                 new std::unordered_set<std::string>(mkldnn_enabled_op_types_));
-    } else if (pass->Type() == "backward_optimizer_op_deps_pass") {
-      if (use_device != p::kCUDA) {
-        VLOG(1) << "backward_optimizer_op_deps_pass is only supported on "
-                   "GPU, skipped.";
-        continue;
-      }
     }
     VLOG(1) << "Start Apply Pass " << pass->Type();
     if (FLAGS_convert_all_blocks) {
@@ -513,19 +426,14 @@ USE_PASS(no_reduce_multi_devices_pass);
 USE_PASS(reduce_mode_multi_devices_pass);
 USE_PASS(all_reduce_mode_multi_devices_pass);
 USE_PASS(dist_multi_devices_pass);
-USE_PASS(multi_devices_check_pass);
-USE_PASS(multi_devices_print_pass);
 USE_PASS(sequential_execution_pass);
-USE_PASS(all_reduce_deps_pass);
-USE_PASS(backward_optimizer_op_deps_pass);
 USE_PASS(modify_op_lock_and_record_event_pass);
 USE_PASS(lock_free_optimize_pass);
 USE_PASS(coalesce_grad_tensor_pass);
 USE_PASS(graph_to_program_pass);
 USE_PASS(fuse_adam_op_pass);
 USE_PASS(fuse_sgd_op_pass);
 USE_PASS(fuse_momentum_op_pass);
-USE_PASS(fuse_all_reduce_op_pass);
 USE_PASS(runtime_context_cache_pass);
 USE_PASS(add_reader_dependency_pass);
 USE_PASS(delete_dropout_op_x_pass);
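With `AppendMultiGraphOptPasses()` and the related `Apply` branches deleted, `fuse_all_reduce_op_pass`, `all_reduce_deps_pass`, `backward_optimizer_op_deps_pass`, `multi_devices_print_pass`, and `multi_devices_check_pass` are no longer appended or registered; the passes that remain still go through the conditional-append pattern visible in the retained context (`AppendPassWithCheck(flag, pass_name)`). The snippet below is a minimal, self-contained sketch of that gating idea; `PassBuilderSketch` is a hypothetical stand-in, not the real `ir::PassBuilder` API.

```cpp
#include <iostream>
#include <string>
#include <vector>

// Hypothetical stand-in for ir::PassBuilder, for illustration only.
class PassBuilderSketch {
 public:
  // Mirrors the AppendPassWithCheck pattern: the pass is only registered
  // when its build-strategy flag is enabled.
  void AppendPassWithCheck(bool enabled, const std::string &pass_name) {
    if (enabled) passes_.push_back(pass_name);
  }
  const std::vector<std::string> &passes() const { return passes_; }

 private:
  std::vector<std::string> passes_;
};

int main() {
  PassBuilderSketch builder;
  // Still appended after this commit (gated on remove_unnecessary_lock_).
  builder.AppendPassWithCheck(true, "modify_op_lock_and_record_event_pass");
  // The calls that used to gate fuse_all_reduce_op_pass, all_reduce_deps_pass,
  // and backward_optimizer_op_deps_pass lived in AppendMultiGraphOptPasses(),
  // which no longer exists, so those passes are simply never appended.
  for (const auto &name : builder.passes()) std::cout << name << "\n";
  return 0;
}
```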
