Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 47 additions & 29 deletions paddle/fluid/framework/fleet/heter_ps/hashtable_inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,45 +186,63 @@ void HashTable<KeyType, ValType>::insert(const KeyType* d_keys, size_t len,
// Copies the contents of the hash table back into the CPU-side value objects
// referenced by each entry's `cpu_ptr`.
//
// The entry array is split into `thread_num` contiguous index ranges (the
// first `num % thread_num` ranges get one extra element) and each range is
// processed by its own std::thread. Entries whose key equals
// std::numeric_limits<KeyType>::max() are skipped. All workers are joined
// before the function returns.
//
// Index arithmetic uses size_t throughout so that tables with more than
// INT_MAX entries are partitioned correctly.
//
// @param devid   Device id; currently unused because the prefetch back to
//                the device at the end of the function is commented out.
// @param stream  Stream on which the prefetch to the CPU is issued.
template <typename KeyType, typename ValType>
void HashTable<KeyType, ValType>::dump_to_cpu(int devid, cudaStream_t stream) {
  container_->prefetch(cudaCpuDeviceId, stream);

  const size_t num = container_->size();
  const KeyType unuse_key = std::numeric_limits<KeyType>::max();
  thrust::pair<KeyType, ValType>* kv = container_->data();

  const size_t thread_num = 8;
  const size_t len_per_thread = num / thread_num;
  const size_t remain = num % thread_num;

  // Processes entries in the half-open index range [left, right).
  auto dump_func = [unuse_key, kv](size_t left, size_t right) {
    for (size_t i = left; i < right; ++i) {
      if (kv[i].first == unuse_key) {
        continue;
      }
      ValType& gpu_val = kv[i].second;
#ifdef PADDLE_WITH_PSLIB
      auto* downpour_value =
          (paddle::ps::DownpourFixedFeatureValue*)(gpu_val.cpu_ptr);
      int downpour_value_size = downpour_value->size();
      // A CPU value holding only the 7 leading fields (indices 0..6) is grown
      // by mf_size so the mf entries written at offset 7 below fit.
      if (gpu_val.mf_size > 0 && downpour_value_size == 7) {
        downpour_value->resize(gpu_val.mf_size + downpour_value_size);
      }
      float* cpu_val = downpour_value->data();
      // cpu_val[0] = 0;
      cpu_val[1] = gpu_val.delta_score;
      cpu_val[2] = gpu_val.show;
      cpu_val[3] = gpu_val.clk;
      cpu_val[4] = gpu_val.lr;
      cpu_val[5] = gpu_val.lr_g2sum;
      cpu_val[6] = gpu_val.slot;
      if (gpu_val.mf_size > 0) {
        for (int x = 0; x < gpu_val.mf_size; x++) {
          cpu_val[x + 7] = gpu_val.mf[x];
        }
      }
#endif
#ifdef PADDLE_WITH_PSCORE
      auto* downpour_value = (paddle::distributed::VALUE*)(gpu_val.cpu_ptr);
      downpour_value->count_ = gpu_val.show;
      for (int x = 0; x < gpu_val.mf_size; x++) {
        downpour_value->data_[x] = gpu_val.mf[x];
      }
#endif
    }
  };

  std::vector<std::thread> threads;
  threads.reserve(thread_num);
  size_t begin = 0;
  for (size_t i = 0; i < thread_num; ++i) {
    // The first `remain` workers take one extra entry each.
    const size_t end = begin + len_per_thread + (i < remain ? 1 : 0);
    threads.emplace_back(dump_func, begin, end);
    begin = end;
  }
  for (std::thread& t : threads) {
    t.join();
  }

  // container_->prefetch(devid, stream);
}

template <typename KeyType, typename ValType>
Expand Down