Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion paddle/fluid/distributed/fleet_executor/carrier.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ class Carrier final {
Interceptor* SetInterceptor(int64_t interceptor_id,
std::unique_ptr<Interceptor>);

void SetCreatingFlag(bool flag) {}
void SetMsgBus(const std::shared_ptr<MessageBus>& msg_bus) {
msg_bus_ = msg_bus;
}
Expand Down
3 changes: 3 additions & 0 deletions paddle/fluid/distributed/fleet_executor/task_loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <vector>

#include "paddle/fluid/framework/blocking_queue.h"
#include "paddle/fluid/platform/macros.h"

namespace paddle {
namespace distributed {
Expand Down Expand Up @@ -66,6 +67,8 @@ class TaskLoop {
}

private:
DISABLE_COPY_AND_ASSIGN(TaskLoop);

void AbortNotInLoopThread();

static thread_local TaskLoop* thread_local_loop_;
Expand Down
4 changes: 4 additions & 0 deletions paddle/fluid/distributed/fleet_executor/task_loop_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include <mutex>
#include <thread>

#include "paddle/fluid/platform/macros.h"

namespace paddle {
namespace distributed {

Expand All @@ -31,6 +33,8 @@ class TaskLoopThread {
TaskLoop* StartLoop();

private:
DISABLE_COPY_AND_ASSIGN(TaskLoopThread);

void Loop();

bool start_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#include <memory>
#include <vector>

#include "paddle/fluid/platform/macros.h"

namespace paddle {
namespace distributed {

Expand All @@ -37,6 +39,8 @@ class TaskLoopThreadPool {
std::vector<TaskLoop*> GetAllLoops();

private:
DISABLE_COPY_AND_ASSIGN(TaskLoopThreadPool);

bool start_;
int thread_num_;
std::vector<std::unique_ptr<TaskLoopThread>> threads_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,6 @@ TEST(ComputeInterceptor, Compute) {
a->SetPlace(place);
a->SetMicroBatchScope(scopes);

carrier.SetCreatingFlag(false);

// start
InterceptorMessage msg;
msg.set_message_type(DATA_IS_READY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ TEST(ComputeInterceptor, Compute) {
carrier.SetInterceptor(1, InterceptorFactory::Create("Compute", 1, node_b));
carrier.SetInterceptor(2, InterceptorFactory::Create("Compute", 2, node_c));

carrier.SetCreatingFlag(false);

InterceptorMessage msg;
msg.set_message_type(DATA_IS_READY);
// test run three times
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ TEST(InterceptorTest, PingPong) {
0, InterceptorFactory::Create("PingPong", 0, nullptr));

carrier.SetInterceptor(1, std::make_unique<PingPongInterceptor>(1, nullptr));
carrier.SetCreatingFlag(false);

InterceptorMessage msg;
a->Send(1, msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ TEST(InterceptorTest, PingPong) {
if (pid == 0) {
Carrier* carrier =
FleetExecutor::CreateCarrier(0, interceptor_id_to_rank);
carrier->SetCreatingFlag(false);
auto msg_bus = std::make_shared<MessageBus>();
carrier->SetMsgBus(msg_bus);
// NOTE: need Init msg_bus after carrier SetMsgBus
Expand All @@ -128,7 +127,6 @@ TEST(InterceptorTest, PingPong) {
} else {
Carrier* carrier =
FleetExecutor::CreateCarrier(1, interceptor_id_to_rank);
carrier->SetCreatingFlag(false);
auto msg_bus = std::make_shared<MessageBus>();
carrier->SetMsgBus(msg_bus);
msg_bus->Init(1, {{0, ip0}, {1, ip1}}, ip1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,6 @@ TEST(AmplifierInterceptor, Amplifier) {
carrier.SetInterceptor(4, InterceptorFactory::Create("Amplifier", 4, node_e));
carrier.SetInterceptor(5, InterceptorFactory::Create("Compute", 5, node_f));

carrier.SetCreatingFlag(false);

// start
InterceptorMessage msg;
msg.set_message_type(DATA_IS_READY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,6 @@ TEST(AmplifierInterceptor, Amplifier) {
carrier.SetInterceptor(2, InterceptorFactory::Create("Compute", 2, node_c));
carrier.SetInterceptor(3, InterceptorFactory::Create("Amplifier", 3, node_d));

carrier.SetCreatingFlag(false);

// start
InterceptorMessage msg;
msg.set_message_type(DATA_IS_READY);
Expand Down