3 changes: 1 addition & 2 deletions paddle/fluid/framework/CMakeLists.txt
@@ -889,8 +889,7 @@ cc_library(
eager_deletion_pass
buffer_shared_inplace_op_pass
buffer_shared_cross_op_memory_reuse_pass
inplace_addto_op_pass
set_reader_device_info_utils)
inplace_addto_op_pass)

cc_library(
executor_cache
30 changes: 28 additions & 2 deletions paddle/fluid/framework/compiled_program.cc
@@ -29,7 +29,7 @@ limitations under the License. */
#include "paddle/fluid/framework/ir/graph_helper.h"
#include "paddle/fluid/framework/ir/memory_optimize_pass/memory_optimization_var_info.h"
#include "paddle/fluid/framework/ir/memory_optimize_pass/reference_count_pass_helper.h"
#include "paddle/fluid/framework/ir/multi_devices_graph_pass/set_reader_device_info_utils.h"
#include "paddle/fluid/operators/reader/lod_tensor_blocking_queue.h"

#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
#include "paddle/fluid/platform/cuda_device_guard.h"
@@ -49,6 +49,10 @@ namespace framework {
std::once_flag p2p_init_flag;
#endif

static std::unordered_set<std::string> ReaderOpSet() {
return {"create_py_reader"};
}

class CompiledProgramPrivate {
public:
CompiledProgramPrivate(const std::vector<platform::Place> &places,
@@ -559,7 +563,7 @@ CompiledProgram::CompiledProgram(const std::vector<platform::Place> &places,
platform::errors::Unavailable(
"NPU is not supported in CompiledProgram."));
InitP2P(places);
ir::InitReaderQueueDeviceCount(
InitReaderQueueDeviceCount(
graph, *(member_->global_scope_), member_->places_.size());
// Initialize necessary info of member_ with strategy.
InitProgramPrivateMemberInfo(build_strategy, places.size());
@@ -807,6 +811,28 @@ void CompiledProgram::InitProgramPrivateMemberInfo(
"please use CPU/CUDA/XPU backend."));
}
}

void CompiledProgram::InitReaderQueueDeviceCount(ir::Graph *graph,
const Scope &scope,
size_t dev_cnt) {
using QueueHolder =
operators::reader::OrderedMultiDeviceLoDTensorBlockingQueueHolder;

auto reader_ops = ReaderOpSet();
for (auto &node : graph->Nodes()) {
if (node->IsOp() && node->Op() &&
reader_ops.count(node->Op()->Type()) != 0) {
auto queue_name = node->Op()->Input("blocking_queue")[0];
auto var = scope.FindVar(queue_name);
if (var && var->IsType<QueueHolder>()) {
VLOG(10) << "Set device count of " << queue_name << " to be "
<< dev_cnt;
var->GetMutable<QueueHolder>()->GetQueue()->SetDeviceCount(dev_cnt);
}
}
}
}

void CompiledProgram::CreateLocalScopes(
Scope *global_scope,
const std::vector<Scope *> &local_scopes,
4 changes: 4 additions & 0 deletions paddle/fluid/framework/compiled_program.h
@@ -66,6 +66,10 @@ class CompiledProgram {
void InitProgramPrivateMemberInfo(const BuildStrategy &build_strategy,
size_t device_count);

void InitReaderQueueDeviceCount(ir::Graph *graph,
const Scope &scope,
size_t dev_cnt);

void CreateLocalScopes(Scope *global_scope,
const std::vector<Scope *> &local_scopes,
bool create_new);
8 changes: 0 additions & 8 deletions paddle/fluid/framework/details/CMakeLists.txt
@@ -13,15 +13,12 @@ set(op_handle_srcs
nan_inf_utils_detail.cc
all_reduce_op_handle.cc
fused_all_reduce_op_handle.cc
grad_merge_all_reduce_op_handle.cc
reduce_op_handle.cc
broadcast_op_handle.cc
fused_broadcast_op_handle.cc
var_handle.cc
op_handle_base.cc
scale_loss_grad_op_handle.cc
fetch_op_handle.cc
fetch_async_op_handle.cc
share_tensor_buffer_functor.cc
computation_op_handle.cc
share_tensor_buffer_op_handle.cc
@@ -78,21 +75,16 @@ add_dependencies(detail_op_handle framework_proto auto_parallel_proto xxhash)
set(IR_PASS_DEPS
graph_viz_pass
multi_devices_graph_pass
multi_devices_graph_print_pass
multi_devices_graph_check_pass
fuse_elewise_add_act_pass
fuse_bn_act_pass
fuse_bn_add_act_pass
multi_batch_merge_pass
fuse_relu_depthwise_conv_pass
lock_free_optimize_pass
sequential_execution_pass
all_reduce_deps_pass
add_reader_dependency_pass
modify_op_lock_and_record_event_pass
coalesce_grad_tensor_pass
fuse_all_reduce_op_pass
backward_optimizer_op_deps_pass
fuse_adam_op_pass
fuse_sgd_op_pass
fuse_momentum_op_pass
92 changes: 0 additions & 92 deletions paddle/fluid/framework/details/build_strategy.cc
100644 → 100755
@@ -71,8 +71,6 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {

AppendAddReaderDependencyPass();
AppendMultiDevPass();
AppendMultiGraphOptPasses();

AppendPassToSetMkldnnAttr("onednn_placement_pass");
// runtime_context_cache pass should be the last pass to enable the attr of
// all original and fused operators. But no operators can be enabled this
@@ -81,8 +79,6 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
"runtime_context_cache_pass");
AppendPassWithCheck(strategy_.remove_unnecessary_lock_,
"modify_op_lock_and_record_event_pass");
// Note: This pass is used to check whether the multi_device_graph is right.
AppendPass("multi_devices_check_pass");

SetCollectiveContext();
}
@@ -144,29 +140,6 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
}
}

void AppendMultiGraphOptPasses() {
// NOTE: fuse_all_reduce_ops will count the number of all_reduce operator
// first, if the number is zero, fuse_all_reduce_ops will do nothing.
AppendPassWithCheck(strategy_.fuse_all_reduce_ops_,
"fuse_all_reduce_op_pass");
AppendPrintGraphPass("multi_devices_print_pass", "_multi_devices_graph");

// experimental shows that the program will be faster if append
// all_reduce_deps_pass here.
bool append_all_reduce_deps_pass =
!strategy_.enable_parallel_graph_ &&
(SeqOnlyAllReduceOps(strategy_) ||
strategy_.reduce_ == BuildStrategy::ReduceStrategy::kAllReduce);
AppendPassWithCheck(append_all_reduce_deps_pass, "all_reduce_deps_pass");

bool append_backward_optimizer_op_deps_pass =
strategy_.num_trainers_ > 1 && !strategy_.async_mode_ &&
!strategy_.is_distribution_ &&
strategy_.enable_backward_optimizer_op_deps_;
AppendPassWithCheck(append_backward_optimizer_op_deps_pass,
"backward_optimizer_op_deps_pass");
}

void AppendOpFusePasses() {
// 1. infernce pass if enabled.
AppendPassWithCheck(
@@ -279,7 +252,6 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
multi_devices_pass->SetNotOwned<const BuildStrategy>("strategy",
&strategy_);
}

void AppendPrintGraphPass(const std::string &pass_name,
const std::string &debug_file_suffix) {
if (!strategy_.debug_graphviz_path_.empty()) {
@@ -391,66 +363,13 @@ ir::Graph *BuildStrategy::Apply(ir::Graph *graph,
(use_device == p::kXPU) ? bkcl_ctxs : nullptr;
pass->Erase(kBKCLCtxs);
pass->SetNotOwned<platform::BKCLCommunicator>(kBKCLCtxs, bkcl_ctx);
#endif
} else if (pass->Type() == "fuse_all_reduce_op_pass") {
pass->Erase(kNRanks);
pass->Set<size_t>(kNRanks, new size_t(nranks));
pass->Erase(kPlaces);
pass->SetNotOwned<const std::vector<platform::Place>>(kPlaces, &places);
pass->Erase(kLocalScopes);
pass->SetNotOwned<const std::vector<Scope *>>(kLocalScopes,
&local_scopes);
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
platform::NCCLCommunicator *nctx =
(use_device == p::kCUDA) ? nccl_ctxs : nullptr;
pass->Erase(kNCCLCtxs);
pass->SetNotOwned<platform::NCCLCommunicator>(kNCCLCtxs, nctx);
pass->Erase(kUseHierarchicalAllReduce);
pass->Set<bool>(kUseHierarchicalAllReduce,
new bool(use_hierarchical_allreduce_));
#elif defined(PADDLE_WITH_XPU) && defined(PADDLE_WITH_XPU_BKCL)
platform::BKCLCommunicator *nctx =
(use_device == p::kXPU) ? bkcl_ctxs : nullptr;
pass->Erase(kBKCLCtxs);
pass->SetNotOwned<platform::BKCLCommunicator>(kBKCLCtxs, nctx);
pass->Erase(kUseHierarchicalAllReduce);
PADDLE_ENFORCE_EQ(use_hierarchical_allreduce_,
false,
platform::errors::Unimplemented(
"xpu doesn't support hierarchical_allreduce"));
pass->Set<bool>(kUseHierarchicalAllReduce,
new bool(use_hierarchical_allreduce_));
#endif
} else if (pass->Type() == "coalesce_grad_tensor_pass") {
pass->Erase(kNRanks);
pass->Set<size_t>(kNRanks, new size_t(nranks));
} else if (pass->Type() == "sequential_execution_pass") {
LOG(INFO) << "set enable_sequential_execution:"
<< enable_sequential_execution_;
} else if (pass->Type() == "all_reduce_deps_pass") {
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
platform::NCCLCommunicator *nctx =
(use_device == p::kCUDA) ? nccl_ctxs : nullptr;
pass->Erase(kNCCLCtxs);
pass->SetNotOwned<platform::NCCLCommunicator>(kNCCLCtxs, nctx);
pass->Erase(kUseHierarchicalAllReduce);
pass->Set<bool>(kUseHierarchicalAllReduce,
new bool(use_hierarchical_allreduce_));
#elif defined(PADDLE_WITH_XPU) && defined(PADDLE_WITH_XPU_BKCL)
platform::BKCLCommunicator *nctx =
(use_device == p::kXPU) ? bkcl_ctxs : nullptr;
pass->Erase(kBKCLCtxs);
pass->SetNotOwned<platform::BKCLCommunicator>(kBKCLCtxs, nctx);
pass->Erase(kUseHierarchicalAllReduce);
PADDLE_ENFORCE_EQ(use_hierarchical_allreduce_,
false,
platform::errors::Unimplemented(
"xpu doesn't support hierarchical_allreduce"));
pass->Set<bool>(kUseHierarchicalAllReduce,
new bool(use_hierarchical_allreduce_));
#endif
VLOG(1) << "SeqOnlyAllReduceOps:" << SeqOnlyAllReduceOps(*this)
<< ", num_trainers:" << num_trainers_;
} else if (pass->Type() == "fuse_relu_depthwise_conv_pass") {
if (use_device != p::kCUDA) {
VLOG(1) << "fuse_relu_depthwise_conv_pass is only supported on "
@@ -478,12 +397,6 @@ ir::Graph *BuildStrategy::Apply(ir::Graph *graph,
} else if (pass->Type() == "onednn_placement_pass") {
pass->Set("mkldnn_enabled_op_types",
new std::unordered_set<std::string>(mkldnn_enabled_op_types_));
} else if (pass->Type() == "backward_optimizer_op_deps_pass") {
if (use_device != p::kCUDA) {
VLOG(1) << "backward_optimizer_op_deps_pass is only supported on "
"GPU, skipped.";
continue;
}
}
VLOG(1) << "Start Apply Pass " << pass->Type();
if (FLAGS_convert_all_blocks) {
@@ -513,19 +426,14 @@ USE_PASS(no_reduce_multi_devices_pass);
USE_PASS(reduce_mode_multi_devices_pass);
USE_PASS(all_reduce_mode_multi_devices_pass);
USE_PASS(dist_multi_devices_pass);
USE_PASS(multi_devices_check_pass);
USE_PASS(multi_devices_print_pass);
USE_PASS(sequential_execution_pass);
USE_PASS(all_reduce_deps_pass);
USE_PASS(backward_optimizer_op_deps_pass);
USE_PASS(modify_op_lock_and_record_event_pass);
USE_PASS(lock_free_optimize_pass);
USE_PASS(coalesce_grad_tensor_pass);
USE_PASS(graph_to_program_pass);
USE_PASS(fuse_adam_op_pass);
USE_PASS(fuse_sgd_op_pass);
USE_PASS(fuse_momentum_op_pass);
USE_PASS(fuse_all_reduce_op_pass);
USE_PASS(runtime_context_cache_pass);
USE_PASS(add_reader_dependency_pass);
USE_PASS(delete_dropout_op_x_pass);