Commit 48747ba: Add support for paddle.multiprocessing
1 parent: d21074c

File tree: 12 files changed, +1083 -37 lines

paddle/fluid/memory/allocation/CMakeLists.txt

Lines changed: 3 additions & 1 deletion

@@ -126,7 +126,9 @@ if(NOT WIN32)
   cc_test(mmap_allocator_test SRCS mmap_allocator_test.cc DEPS mmap_allocator allocator)
 endif(NOT WIN32)
 
-if(WITH_GPU AND WITH_TESTING AND NOT "$ENV{CI_SKIP_CPP_TEST}" STREQUAL "ON")
+cc_library(cuda_ipc_allocator SRCS cuda_ipc_allocator.cc DEPS allocator)
+
+if(WITH_GPU AND WITH_TESTING AND NOT "$ENV{CI_SKIP_CPP_TEST}" STREQUAL "ON")
   nv_test(base_ptr_test SRCS base_ptr_test.cu DEPS malloc gpu_info)
   set_tests_properties(base_ptr_test PROPERTIES
     ENVIRONMENT "FLAGS_allocator_strategy=auto_growth;

paddle/fluid/memory/allocation/cuda_ipc_allocator.cc (new file)

Lines changed: 76 additions & 0 deletions

// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/memory/allocation/cuda_ipc_allocator.h"
#include "paddle/fluid/platform/cuda_device_guard.h"

#include <fcntl.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <random>
#include <string>
#include <unordered_map>

#include "glog/logging.h"
#include "paddle/fluid/platform/enforce.h"

namespace paddle {
namespace memory {
namespace allocation {

namespace {
std::mutex ipc_mutex_;
std::unordered_map<std::string, std::weak_ptr<void>> ipc_handle_to_baseptr_;
}  // namespace

std::shared_ptr<void> GetIpcBasePtr(std::string handle) {
  std::lock_guard<std::mutex> lock(ipc_mutex_);

  auto iter = ipc_handle_to_baseptr_.find(handle);
  if (iter != ipc_handle_to_baseptr_.end()) {
    auto baseptr = iter->second.lock();
    if (baseptr) return baseptr;
  }
  // An IpcMemHandle can only be opened once per process for the same
  // handle, so we cache the base pointer here.
  void *baseptr = nullptr;
  auto ipc_handle =
      reinterpret_cast<const cudaIpcMemHandle_t *>(handle.c_str());
  PADDLE_ENFORCE_GPU_SUCCESS(cudaIpcOpenMemHandle(
      &baseptr, *ipc_handle, cudaIpcMemLazyEnablePeerAccess));
  // The ipc handle must be closed on the same device it was opened on.
  int device_id = platform::GetCurrentDeviceId();
  // Add a deleter that closes the ipc handle.
  auto sp = std::shared_ptr<void>(baseptr, [handle, device_id](void *ptr) {
    platform::CUDADeviceGuard guard(device_id);
    std::lock_guard<std::mutex> lock(ipc_mutex_);
    PADDLE_ENFORCE_GPU_SUCCESS(cudaIpcCloseMemHandle(ptr));
    ipc_handle_to_baseptr_.erase(handle);
    VLOG(6) << "cudaIpcCloseMemHandle for ptr:"
            << "\t" << ptr;
  });
  std::weak_ptr<void> wp = sp;
  ipc_handle_to_baseptr_.insert(iter, {handle, wp});

  return sp;
}

CudaIpcAllocation::~CudaIpcAllocation() {
  shared_ptr_.reset();
  VLOG(6) << "tensor deleted cudaIpcCloseMemHandle for ptr:"
          << "\t" << this->ptr();
}

}  // namespace allocation
}  // namespace memory
}  // namespace paddle
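
For context, GetIpcBasePtr() above is the consumer half of CUDA's IPC handshake: a producer process must first export a cudaIpcMemHandle_t for a device allocation and ship its bytes to the consumer. Below is a minimal sketch of that round trip using the plain CUDA runtime API; the Export/Import function names and the transport of the handle bytes are illustrative, not part of this commit.

// Sketch of the CUDA IPC round trip that GetIpcBasePtr() serves. Plain
// CUDA runtime API; how the handle bytes travel between processes
// (pipe, queue, shared memory) is up to the caller.
#include <cuda_runtime.h>
#include <cstring>
#include <string>

// Producer process: export a handle to an existing device allocation.
std::string ExportDeviceBuffer(void *d_ptr) {
  cudaIpcMemHandle_t handle;
  cudaIpcGetMemHandle(&handle, d_ptr);
  // Paddle ships the handle around as an opaque byte string, which is
  // why GetIpcBasePtr() takes a std::string.
  return std::string(reinterpret_cast<char *>(&handle), sizeof(handle));
}

// Consumer process: open the handle to obtain a device pointer into the
// producer's allocation. This is the uncached core of GetIpcBasePtr();
// a handle can be opened only once per process, hence Paddle's cache.
void *ImportDeviceBuffer(const std::string &bytes) {
  cudaIpcMemHandle_t handle;
  std::memcpy(&handle, bytes.data(), sizeof(handle));
  void *d_ptr = nullptr;
  cudaIpcOpenMemHandle(&d_ptr, handle, cudaIpcMemLazyEnablePeerAccess);
  return d_ptr;  // later released with cudaIpcCloseMemHandle(d_ptr)
}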

paddle/fluid/memory/allocation/cuda_ipc_allocator.h (new file)

Lines changed: 53 additions & 0 deletions

// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <memory>
#include <mutex>  // NOLINT
#include <string>
#include <unordered_set>
#include <utility>

#include "paddle/fluid/memory/allocation/allocator.h"
#include "paddle/fluid/platform/cuda_device_guard.h"
#include "paddle/fluid/platform/device/gpu/gpu_info.h"
#include "paddle/fluid/platform/enforce.h"

namespace paddle {
namespace memory {
namespace allocation {

std::shared_ptr<void> GetIpcBasePtr(std::string handle);

class CudaIpcAllocation : public Allocation {
 public:
  explicit CudaIpcAllocation(void *ptr, size_t size, int device_id,
                             std::shared_ptr<void> shared_ptr)
      : Allocation(ptr, size, platform::CUDAPlace(device_id)),
        device_id_(std::move(device_id)),
        shared_ptr_(std::move(shared_ptr)) {}

  inline const int &device_id() const { return device_id_; }

  ~CudaIpcAllocation() override;

 private:
  int device_id_;
  std::shared_ptr<void> shared_ptr_;
};

}  // namespace allocation
}  // namespace memory
}  // namespace paddle
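
CudaIpcAllocation ties one tensor's pointer to the shared base mapping through its shared_ptr member, so the cudaIpcCloseMemHandle deleter cannot run while any tensor still uses the block. A hedged sketch of how a consumer might wrap a received buffer follows; WrapSharedBuffer and the offset parameter are illustrative, and the real wiring lives in this commit's Python-binding code, which is not shown on this page.

// Illustrative only: assemble a CudaIpcAllocation for one tensor that
// lives at `offset` inside a producer's exported block. WrapSharedBuffer
// is a hypothetical helper, not part of this commit.
#include <memory>
#include <string>
#include <utility>

#include "paddle/fluid/memory/allocation/cuda_ipc_allocator.h"

namespace alloc = paddle::memory::allocation;

std::shared_ptr<alloc::CudaIpcAllocation> WrapSharedBuffer(
    const std::string &handle_bytes, size_t offset, size_t size,
    int device_id) {
  // GetIpcBasePtr() opens (or reuses) the mapping; the returned
  // shared_ptr keeps cudaIpcCloseMemHandle from running while any
  // wrapped tensor is alive.
  std::shared_ptr<void> base = alloc::GetIpcBasePtr(handle_bytes);
  void *ptr = static_cast<char *>(base.get()) + offset;
  return std::make_shared<alloc::CudaIpcAllocation>(ptr, size, device_id,
                                                    std::move(base));
}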

paddle/fluid/memory/allocation/mmap_allocator.cc

Lines changed: 204 additions & 18 deletions

@@ -29,6 +29,188 @@ namespace paddle {
 namespace memory {
 namespace allocation {
 
+std::string GetIPCName() {
+  static std::random_device rd;
+  std::string handle = "/paddle_";
+#ifdef _WIN32
+  handle += std::to_string(GetCurrentProcessId());
+#else
+  handle += std::to_string(getpid());
+#endif
+  handle += "_";
+  handle += std::to_string(rd());
+  return handle;
+}
+
+struct CountInfo {
+  std::atomic<int> refcount;
+};
+
+void AllocateMemoryMap(std::string filename, int flags, size_t size,
+                       void **map_ptr_, int *fd_) {
+  // TODO(@ZHUI): support win32
+  int file_flags = 0;
+  int fd = -1;
+  if (flags & MAPPED_SHAREDMEM) {
+    file_flags = O_RDWR | O_CREAT;
+  } else {
+    file_flags = O_RDONLY;
+  }
+  if (flags & MAPPED_EXCLUSIVE) {
+    file_flags |= O_EXCL;
+  }
+  if (flags & MAPPED_NOCREATE) {
+    file_flags &= ~O_CREAT;
+  }
+
+  if (!(flags & MAPPED_FROMFD)) {
+    if (flags & MAPPED_SHAREDMEM) {
+      fd = shm_open(filename.c_str(), file_flags, (mode_t)0600);
+      PADDLE_ENFORCE_NE(
+          fd, -1,
+          platform::errors::Unavailable(
+              "File %s open failed, unable to open in read-write mode",
+              filename.c_str()));
+      VLOG(6) << "shm_open: " << filename;
+    }
+  } else {
+    fd = -1;
+  }
+
+  PADDLE_ENFORCE_EQ(ftruncate(fd, size), 0,
+                    platform::errors::Unavailable(
+                        "Truncating the file to the specified length failed!"));
+
+  if (flags & MAPPED_SHAREDMEM) {
+    *map_ptr_ = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+  } else {
+    *map_ptr_ = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
+  }
+
+  PADDLE_ENFORCE_NE(*map_ptr_, MAP_FAILED,
+                    platform::errors::Unavailable(
+                        "Memory map failed when create shared memory."));
+
+  if (flags & MAPPED_KEEPFD) {
+    *fd_ = fd;
+  } else {
+    PADDLE_ENFORCE_NE(::close(fd), -1,
+                      platform::errors::Unavailable(
+                          "Error closing file <", filename, ">"));
+
+    *fd_ = -1;
+  }
+}
+
+std::shared_ptr<MemoryMapAllocation> AllocateMemoryMapAllocation(
+    std::string filename, int flags, size_t size) {
+  int fd = -1;
+  void *base_ptr = nullptr;
+  AllocateMemoryMap(filename, flags, size, &base_ptr, &fd);
+  return std::make_shared<MemoryMapAllocation>(base_ptr, size, filename, flags,
+                                               fd);
+}
+
+std::shared_ptr<RefcountedMemoryMapAllocation>
+AllocateRefcountedMemoryMapAllocation(std::string filename, int flags,
+                                      size_t size) {
+  int fd = -1;
+  void *base_ptr = nullptr;
+  AllocateMemoryMap(filename, flags, size + mmap_alignment, &base_ptr, &fd);
+  void *aligned_base_ptr =
+      static_cast<void *>(static_cast<char *>(base_ptr) + mmap_alignment);
+  return std::make_shared<RefcountedMemoryMapAllocation>(aligned_base_ptr,
+                                                         size, filename, flags,
+                                                         fd);
+}
+
+RefcountedMemoryMapAllocation::RefcountedMemoryMapAllocation(
+    void *ptr, size_t size, std::string ipc_name, int flags, int fd)
+    : MemoryMapAllocation(ptr, size, ipc_name, flags, fd) {
+  // Must reset the base ptr first.
+  resetBaseptr();
+  initializeRefercount();
+}
+
+void MemoryMapAllocation::close() {
+  if (closed_) {
+    return;
+  }
+  closed_ = true;
+  if (map_ptr_ == nullptr) {
+    return;
+  }
+  if (flags_ & MAPPED_KEEPFD) {
+    PADDLE_ENFORCE_NE(
+        ::close(fd_), -1,
+        platform::errors::Unavailable("could not close file descriptor ", fd_,
+                                      " : ", strerror(errno), " (", errno,
+                                      ")"));
+  }
+
+  PADDLE_ENFORCE_NE(
+      munmap(map_ptr_, map_size_), -1,
+      platform::errors::Unavailable("could not unmap the shared memory file: ",
+                                    strerror(errno), " (", errno, ")"));
+
+  if (!(flags_ & (MAPPED_FROMFD | MAPPED_UNLINK))) {
+    if (flags_ & MAPPED_SHAREDMEM) {
+      PADDLE_ENFORCE_NE(
+          shm_unlink(ipc_name_.c_str()), -1,
+          platform::errors::Unavailable(
+              "could not unlink the shared memory file ", ipc_name_, " : ",
+              strerror(errno), " (", errno, ")"));
+    }
+  }
+}
+
+MemoryMapAllocation::~MemoryMapAllocation() { close(); }
+
+void RefcountedMemoryMapAllocation::incref() {
+  CountInfo *info = static_cast<CountInfo *>(map_ptr_);
+  ++info->refcount;
+}
+
+int RefcountedMemoryMapAllocation::decref() {
+  CountInfo *info = static_cast<CountInfo *>(map_ptr_);
+  return --info->refcount == 0;
+}
+
+void RefcountedMemoryMapAllocation::resetBaseptr() {
+  map_ptr_ =
+      static_cast<void *>(static_cast<char *>(map_ptr_) - mmap_alignment);
+  map_size_ = map_size_ + mmap_alignment;
+}
+
+void RefcountedMemoryMapAllocation::initializeRefercount() {
+  CountInfo *info = reinterpret_cast<CountInfo *>(map_ptr_);
+
+  if (flags_ & MAPPED_EXCLUSIVE) {
+    new (&info->refcount) std::atomic<int>(1);
+  } else {
+    info->refcount++;
+  }
+}
+
+void RefcountedMemoryMapAllocation::close() {
+  if (closed_) {
+    return;
+  }
+  closed_ = true;
+  void *data = map_ptr_;
+  CountInfo *info = reinterpret_cast<CountInfo *>(data);
+  if (--info->refcount == 0) {
+    PADDLE_ENFORCE_NE(
+        shm_unlink(ipc_name_.c_str()), -1,
+        platform::errors::Unavailable(
+            "could not unlink the shared memory file ", ipc_name_));
+    VLOG(6) << "shm_unlink file: " << ipc_name_;
+  }
+
+  PADDLE_ENFORCE_NE(
+      munmap(map_ptr_, map_size_), -1,
+      platform::errors::Unavailable("could not unmap the shared memory file: ",
+                                    strerror(errno), " (", errno, ")"));
+}
+
 MemoryMapWriterAllocation::~MemoryMapWriterAllocation() {
   PADDLE_ENFORCE_NE(
       munmap(this->ptr(), this->size()), -1,
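
The RefcountedMemoryMapAllocation added above stores a CountInfo header, a single std::atomic<int>, at the front of the mapping and hands callers an aligned pointer just past it, so every process attached to the same shm file shares one reference count and the last close() unlinks the file. Here is a standalone sketch of that protocol under the same POSIX assumptions (shm_open, mmap); the create_region/attach_region/release_region names are illustrative, not part of this commit.

// Minimal sketch of a cross-process refcount kept at the head of a
// POSIX shared-memory mapping. Illustrative only; not Paddle code.
#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>
#include <atomic>
#include <cstddef>
#include <new>

struct Header {
  std::atomic<int> refcount;
};

constexpr size_t kAlign = 4096;  // plays the role of mmap_alignment

// Creator: O_EXCL guarantees we are first, so constructing the atomic
// in place with placement new (count = 1) is safe.
void *create_region(const char *name, size_t payload_size) {
  int fd = shm_open(name, O_RDWR | O_CREAT | O_EXCL, 0600);
  if (fd == -1) return nullptr;
  if (ftruncate(fd, kAlign + payload_size) != 0) {
    ::close(fd);
    return nullptr;
  }
  void *base = mmap(nullptr, kAlign + payload_size, PROT_READ | PROT_WRITE,
                    MAP_SHARED, fd, 0);
  ::close(fd);
  if (base == MAP_FAILED) return nullptr;
  new (&static_cast<Header *>(base)->refcount) std::atomic<int>(1);
  return static_cast<char *>(base) + kAlign;  // callers see only the payload
}

// Attacher: the header already exists, so only bump the count.
void *attach_region(const char *name, size_t payload_size) {
  int fd = shm_open(name, O_RDWR, 0600);
  if (fd == -1) return nullptr;
  void *base = mmap(nullptr, kAlign + payload_size, PROT_READ | PROT_WRITE,
                    MAP_SHARED, fd, 0);
  ::close(fd);
  if (base == MAP_FAILED) return nullptr;
  static_cast<Header *>(base)->refcount.fetch_add(1);
  return static_cast<char *>(base) + kAlign;
}

// Last process out unlinks the file; everyone unmaps their own view.
void release_region(const char *name, void *payload, size_t payload_size) {
  void *base = static_cast<char *>(payload) - kAlign;
  if (static_cast<Header *>(base)->refcount.fetch_sub(1) == 1) {
    shm_unlink(name);
  }
  munmap(base, kAlign + payload_size);
}

fetch_sub(1) returning 1 means the caller held the last reference, mirroring what decref() above signals with its return value.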
@@ -44,30 +226,30 @@ MemoryMapReaderAllocation::~MemoryMapReaderAllocation() {
   /* Here we do not pay attention to the result of shm_unlink,
      because the memory mapped file may have been cleared due to
      MemoryMapFdSet::Clear() */
+
+  // Code of the DataLoader subprocess:
+  //
+  //    core._array_to_share_memory_tensor(b)
+  //    out_queue.put((idx, tensor_list, structure))
+  //    core._remove_tensor_list_mmap_fds(tensor_list)
+
+  /* If the tensor is already in the send queue, it will be destructed
+   * by that function. If the tensor has not been sent yet, it will be
+   * cleared by MemoryMapFdSet::Clear().
+   * If `_remove_tensor_list_mmap_fds` was interrupted, the tensor will
+   * be cleared by both methods.
+   * */
+
   shm_unlink(this->ipc_name().c_str());
   MemoryMapFdSet::Instance().Remove(this->ipc_name());
   VLOG(3) << "~MemoryMapReaderAllocation: " << this->ipc_name();
 }
 
-std::string GetIPCName() {
-  static std::random_device rd;
-  std::string handle = "/paddle_";
-#ifdef _WIN32
-  handle += std::to_string(GetCurrentProcessId());
-#else
-  handle += std::to_string(getpid());
-#endif
-  handle += "_";
-  handle += std::to_string(rd());
-  return handle;
-}
-
 std::shared_ptr<MemoryMapWriterAllocation> AllocateMemoryMapWriterAllocation(
     size_t size) {
   const std::string &ipc_name = GetIPCName();
   int flags = O_RDWR | O_CREAT;
-
-  int fd = shm_open(ipc_name.c_str(), flags, 0644);
+  int fd = shm_open(ipc_name.c_str(), flags, 0600);
   PADDLE_ENFORCE_NE(
       fd, -1, platform::errors::Unavailable("File descriptor %s open failed",
                                             ipc_name.c_str()));
@@ -86,12 +268,16 @@ std::shared_ptr<MemoryMapWriterAllocation> AllocateMemoryMapWriterAllocation(
 
 std::shared_ptr<MemoryMapReaderAllocation> RebuildMemoryMapReaderAllocation(
     const std::string &ipc_name, size_t size) {
-  int fd = shm_open(ipc_name.c_str(), O_RDONLY, 0644);
+  // TODO(@ZHUI): support win32
+  int flags = O_RDWR | O_CREAT;
+  flags &= ~O_CREAT;
+
+  int fd = shm_open(ipc_name.c_str(), flags, 0600);
   PADDLE_ENFORCE_NE(
       fd, -1, platform::errors::Unavailable("File descriptor %s open failed",
                                             ipc_name.c_str()));
-
-  void *ptr = mmap(NULL, size, PROT_READ, MAP_SHARED, fd, 0);
+  void *ptr = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
   PADDLE_ENFORCE_NE(ptr, MAP_FAILED,
                     platform::errors::Unavailable(
                         "Memory map failed when rebuilding shared memory."));

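Taken together, the writer/reader pair gives DataLoader workers a way to move tensors through shared memory by name. Below is a minimal sketch of the intended pairing, assuming only the declarations in mmap_allocator.h; the sender/receiver split and the size handshake are illustrative, not code from this commit.

// Sketch: one buffer shared between a sender and a receiver process via
// the allocators above. Assumes mmap_allocator.h declarations; error
// handling and the IPC channel that carries the name are omitted.
#include <cstring>
#include <memory>
#include <string>

#include "paddle/fluid/memory/allocation/mmap_allocator.h"

using paddle::memory::allocation::AllocateMemoryMapWriterAllocation;
using paddle::memory::allocation::MemoryMapWriterAllocation;
using paddle::memory::allocation::RebuildMemoryMapReaderAllocation;

std::shared_ptr<MemoryMapWriterAllocation> sender_side(size_t size) {
  // Producer: create a fresh shm file (GetIPCName() picks the name) and
  // fill it with payload.
  auto writer = AllocateMemoryMapWriterAllocation(size);
  std::memset(writer->ptr(), 0, writer->size());
  // writer->ipc_name() is what travels to the other process, e.g. via
  // the DataLoader out_queue shown in the comment above; keep `writer`
  // alive until the receiver has rebuilt its mapping.
  return writer;
}

void receiver_side(const std::string &name, size_t size) {
  // Consumer: re-open the same shm file by name. After this commit the
  // mapping is PROT_READ | PROT_WRITE, so in-place writes are possible.
  auto reader = RebuildMemoryMapReaderAllocation(name, size);
  // ... consume reader->ptr() ...
}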