listen_and_serv_op support async update #10042
Conversation
std::unordered_map<std::string, int32_t> grad_to_id;
std::unordered_map<int32_t, std::string> id_to_grad;

auto grad_to_id_str = Attr<std::vector<std::string>>("grad_to_id");
grad_to_id_str could be generated by listen_and_serv_op at initialization time: read the ProgramDesc blocks and build the mapping there, so we could avoid this attribute.
I think grad_to_id_str should be created in Python by the transpiler, because the transpile logic knows how to split the operators and blocks; listen_and_serv_op can just use the result. Otherwise it would have to understand the detailed logic of the transpiler.
First, we want to make listen_and_serv_op general. That means it should not know whether its attributes and inputs are "grads" or parameters; it should simply receive the data and run a block.
In that case, for async execution, listen_and_serv_op is responsible for deciding which block to run when the data arrives. Just open for discussion.
After discussing with @typhoonzero, I got the point. I totally agree with the idea that listen_and_serv_op should be a general operator! We will find a better way to implement async update in future PRs.
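For readers following along, here is a minimal sketch of how such a string attribute could be turned into the two maps shown in the diff above. The "grad_name:block_id" entry format and the ParseGradToBlockId helper name are assumptions for illustration only, not code from this PR.

// Hypothetical helper, assuming each attribute entry looks like "w1@GRAD:1".
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

void ParseGradToBlockId(
    const std::vector<std::string>& grad_to_id_str,
    std::unordered_map<std::string, int32_t>* grad_to_id,
    std::unordered_map<int32_t, std::string>* id_to_grad) {
  for (const auto& entry : grad_to_id_str) {
    size_t pos = entry.find_last_of(':');           // split name from block id
    std::string grad_name = entry.substr(0, pos);
    int32_t block_id = std::stoi(entry.substr(pos + 1));
    (*grad_to_id)[grad_name] = block_id;
    (*id_to_grad)[block_id] = grad_name;
  }
}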
 public:
  explicit RequestBase(GrpcService::AsyncService* service,
-                      ::grpc::ServerCompletionQueue* cq,
+                      ::grpc::ServerCompletionQueue* cq, bool sync_mode,
Maybe is_sync or just sync would convey the meaning better?
I think sync_mode means it works in a particular mode, while is_sync would suggest the object itself is synchronous. So I think sync_mode is better.
I see.
auto optimize_prepared = executor->Prepare(*program, block_list);
std::unordered_map<std::string,
                   std::shared_ptr<framework::ExecutorPrepareContext>>
    grad_to_prepared;
grad_to_prepared_block
done
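As a rough sketch of what this renamed map could hold, assuming Prepare returns one context per entry of block_list in order and that id_to_grad maps a block id back to its gradient name (both are assumptions, not verified against this PR):

// Pair each gradient name with the prepared context of its optimize block.
std::unordered_map<std::string,
                   std::shared_ptr<framework::ExecutorPrepareContext>>
    grad_to_prepared_block;
for (size_t i = 0; i < block_list.size(); ++i) {
  auto it = id_to_grad.find(block_list[i]);
  if (it != id_to_grad.end()) {
    grad_to_prepared_block[it->second] = optimize_prepared[i];
  }
}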
      LOG(ERROR) << "run sub program error " << e.what();
    }
  });
  // TODO(qiao) maybe we can remove this
Removing this means an even more "async" mode: the trainer doesn't even know whether the gradient it sent has been applied to the server-side weights before it fetches the latest weights. Or do you mean letting updates to different weights run in parallel?
The current implementation will update the gradients in sequence if we keep this wait, which may hurt the training effect; I will run some tests on it.
After discussing with @typhoonzero, we think that each gradient should be put into an independent blocking queue to ensure the updates do not conflict.
> each gradient should be put into an independent blocking queue

Do you mean that for the gradients of one parameter, such as grad_w1(trainer0), grad_w1(trainer1), grad_w2(trainer0), we put grad_w1(trainer0) and grad_w1(trainer1) into one queue and grad_w2(trainer0) into another one?
According to the design doc, maybe we need multiple BlockingQueues so that each parameter owns one of them, which effectively acts as a lock for updating that parameter.
Yes, we need multiple blocking queues, each storing the gradients for one parameter. But we do not need to add a lock, because the queue will block until the optimize block has finished.
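A minimal, self-contained sketch of the idea being discussed (not PaddlePaddle code): one blocking queue per parameter with a single consumer, so gradients for the same parameter are applied strictly in arrival order and no extra lock around the optimize block is needed.

#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <string>

template <typename T>
class BlockingQueue {
 public:
  void Push(T v) {
    {
      std::lock_guard<std::mutex> g(mu_);
      q_.push_back(std::move(v));
    }
    cv_.notify_one();
  }
  T Pop() {
    std::unique_lock<std::mutex> lk(mu_);
    cv_.wait(lk, [this] { return !q_.empty(); });
    T v = std::move(q_.front());
    q_.pop_front();
    return v;
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<T> q_;
};

// One queue and one worker per parameter: the worker pops a received gradient
// and runs that parameter's optimize block before touching the next gradient,
// so updates to the same parameter never overlap.
struct Gradient { std::string serialized; };

void PerParameterWorker(BlockingQueue<Gradient>* queue,
                        const std::function<void(const Gradient&)>& optimize) {
  while (true) {
    Gradient g = queue->Pop();
    optimize(g);
  }
}

Gradients for different parameters live in different queues, so their updates can still proceed in parallel on separate workers.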
      queue_(queue),
      responder_(&ctx_) {
  if (sync_mode_) {
    request_.reset(new VariableResponse(scope, dev_ctx_, false));
request_.reset(new VariableResponse(scope, dev_ctx_,
                                    !sync_mode_ /* create_scope */));
I thought about this for a while, and I think the current code makes the intent easier for readers to understand.
  int Parse(const ::grpc::ByteBuffer& byte_buffer);

-  const framework::Scope& GetLocalScope() const { return *local_scope_; }
+  framework::Scope& GetLocalScope() const { return *local_scope_; }
Consider adding a GetMutableLocalScope that returns a pointer, to avoid removing the const?
done
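The resulting accessor pair might look roughly like this (a sketch only; it assumes local_scope_ is stored as a framework::Scope*):

// Keep the const getter untouched and add a mutable variant that returns a
// pointer, so callers that need to modify the scope use the pointer form.
const framework::Scope& GetLocalScope() const { return *local_scope_; }
framework::Scope* GetMutableLocalScope() { return local_scope_; }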
        .SetDefault("127.0.0.1:6164")
        .AddCustomChecker([](const std::string &ip) { return !ip.empty(); });
    AddAttr<std::vector<std::string>>(
        "grad_to_id",
grad_to_block_id?
done
                  trainers=1,
-                  split_method=splitter.round_robin):
+                  split_method=splitter.round_robin,
+                  sync_mode=True):
need comment
done
AsyncExecuteBlock(executor, grad_to_prepared_block[recv_var_name].get(),
                  v.second->GetMutableLocalScope());
// TODO(qiao): explain why
if (var->IsType<framework::SelectedRows>()) {
Maybe we don't need to clear the rows, because each gradient var is in a new scope.
Great suggestion! Removed.
It LGTM now. Does the Python wrapping in io.py need to be updated in this PR, or in a later one?
@typhoonzero the CI is too slow; I will open another PR to fix io.py.
LGTM++
fix: #9997