Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions paddle/fluid/memory/allocation/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,7 @@ cc_library(virtual_memory_auto_growth_best_fit_allocator SRCS virtual_memory_aut
if(NOT WIN32)
cc_library(mmap_allocator SRCS mmap_allocator.cc DEPS allocator)
cc_test(mmap_allocator_test SRCS mmap_allocator_test.cc DEPS mmap_allocator allocator)
if (WITH_GPU)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这块需要限制成非WIN32吗? 下面cuda_ipc_allocator在define的时候限制了非WIN32

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

没有尝试,cmake里这么写,也可以减少 WIN32下的编译问题

cc_library(cuda_ipc_allocator SRCS cuda_ipc_allocator.cc DEPS allocator)
endif()
endif(NOT WIN32)
80 changes: 80 additions & 0 deletions paddle/fluid/memory/allocation/cuda_ipc_allocator.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef _WIN32

#include "paddle/fluid/memory/allocation/cuda_ipc_allocator.h"
#include "paddle/fluid/platform/cuda_device_guard.h"

#include <fcntl.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <random>
#include <string>

#include "glog/logging.h"
#include "paddle/fluid/platform/enforce.h"

namespace paddle {
namespace memory {
namespace allocation {

namespace {
std::mutex ipc_mutex_;
std::unordered_map<std::string, std::weak_ptr<void>> ipc_handle_to_baseptr_;
} // namespace

// Returns a process-wide shared base pointer for the CUDA IPC memory
// identified by `handle` (the raw bytes of a cudaIpcMemHandle_t).
// The pointer is cached in ipc_handle_to_baseptr_ so that the handle is
// opened at most once per process; the returned shared_ptr's deleter
// closes the handle and drops the cache entry when the last user is gone.
std::shared_ptr<void> GetIpcBasePtr(std::string handle) {
  std::lock_guard<std::mutex> lock(ipc_mutex_);

  // Fast path: the handle was opened before and is still alive.
  auto iter = ipc_handle_to_baseptr_.find(handle);
  if (iter != ipc_handle_to_baseptr_.end()) {
    auto baseptr = iter->second.lock();
    if (baseptr) return baseptr;
  }
  // The IpcMemHandle can only open once for the same handle,
  // so we cache it here.
  void *baseptr = nullptr;
  auto ipc_handle =
      reinterpret_cast<const cudaIpcMemHandle_t *>(handle.c_str());
  PADDLE_ENFORCE_GPU_SUCCESS(cudaIpcOpenMemHandle(
      &baseptr, *ipc_handle, cudaIpcMemLazyEnablePeerAccess));
  // Close ipc handle on the same device.
  int device_id = platform::GetCurrentDeviceId();
  // Add deleter to close ipc handle.
  // NOTE(review): the deleter locks ipc_mutex_; it must never run while the
  // current thread holds the lock (it cannot here, since `sp` is still alive
  // when this function returns) — confirm no re-entrant reset path exists.
  auto sp = std::shared_ptr<void>(baseptr, [handle, device_id](void *ptr) {
    platform::CUDADeviceGuard guard(device_id);
    std::lock_guard<std::mutex> lock(ipc_mutex_);
    PADDLE_ENFORCE_GPU_SUCCESS(cudaIpcCloseMemHandle(ptr));
    ipc_handle_to_baseptr_.erase(handle);
    VLOG(6) << "cudaIpcCloseMemHandle for ptr:"
            << "\t" << ptr;
  });
  std::weak_ptr<void> wp = sp;
  // NOTE(review): if `iter` points at an existing-but-expired entry, a hinted
  // insert will NOT overwrite it (the key already exists). The deleter erases
  // the entry under the same mutex before the weak_ptr can be observed
  // expired, which should make that state unreachable — verify.
  ipc_handle_to_baseptr_.insert(iter, {handle, wp});

  return sp;
}

// Drops this allocation's reference to the cached IPC base pointer; when the
// last reference goes away, the shared_ptr deleter installed in
// GetIpcBasePtr calls cudaIpcCloseMemHandle.
CudaIpcAllocation::~CudaIpcAllocation() {
  shared_ptr_.reset();
  VLOG(6) << "tensor deleted cudaIpcCloseMemHandle for ptr:"
          << "\t" << this->ptr();
}

} // namespace allocation
} // namespace memory
} // namespace paddle

#endif
56 changes: 56 additions & 0 deletions paddle/fluid/memory/allocation/cuda_ipc_allocator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef _WIN32
#pragma once

#include <memory>
#include <mutex> // NOLINT
#include <string>
#include <unordered_set>
#include <utility>

#include "paddle/fluid/memory/allocation/allocator.h"
#include "paddle/fluid/platform/cuda_device_guard.h"
#include "paddle/fluid/platform/device/gpu/gpu_info.h"
#include "paddle/fluid/platform/enforce.h"

namespace paddle {
namespace memory {
namespace allocation {

std::shared_ptr<void> GetIpcBasePtr(std::string handle);

// Allocation that views CUDA memory exported by another process through a
// CUDA IPC handle. `shared_ptr_` keeps the process-wide base pointer (from
// GetIpcBasePtr) alive, so the underlying cudaIpcOpenMemHandle mapping is
// closed only after every CudaIpcAllocation referencing it is destroyed.
class CudaIpcAllocation : public Allocation {
 public:
  // ptr:        address of this allocation inside the mapped region
  // size:       byte size visible to the consumer
  // device_id:  CUDA device the memory lives on
  // shared_ptr: keep-alive reference to the mapped base pointer
  explicit CudaIpcAllocation(void *ptr, size_t size, int device_id,
                             std::shared_ptr<void> shared_ptr)
      : Allocation(ptr, size, platform::CUDAPlace(device_id)),
        // `int` is trivially copyable — moving it was a no-op, so copy.
        device_id_(device_id),
        shared_ptr_(std::move(shared_ptr)) {}

  inline const int &device_id() const { return device_id_; }

  ~CudaIpcAllocation() override;

 private:
  int device_id_;
  std::shared_ptr<void> shared_ptr_;
};

} // namespace allocation
} // namespace memory
} // namespace paddle

#endif
187 changes: 169 additions & 18 deletions paddle/fluid/memory/allocation/mmap_allocator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,155 @@ namespace paddle {
namespace memory {
namespace allocation {

// Produces a unique shared-memory object name of the form
// "/paddle_<pid>_<random>" (shm_open names must begin with a slash).
std::string GetIPCName() {
  static std::random_device rd;
#ifdef _WIN32
  const auto pid = GetCurrentProcessId();
#else
  const auto pid = getpid();
#endif
  return "/paddle_" + std::to_string(pid) + "_" + std::to_string(rd());
}

// Cross-process reference count stored at the head of a shared mapping
// (in the mmap_alignment-sized slab in front of the user-visible data).
struct CountInfo {
  std::atomic<int> refcount;
};

// Opens (or creates) the POSIX shared-memory object `filename` and maps
// `size` bytes of it into this process.
//   flags:    combination of MAPPED_* bits controlling create/open semantics
//   map_ptr_: out — receives the mapped address
//   fd_:      out — receives the file descriptor when MAPPED_KEEPFD is set,
//             otherwise -1 (the descriptor is closed here)
void AllocateMemoryMap(std::string filename, int flags, size_t size,
                       void **map_ptr_, int *fd_) {
  // TODO(@ZHUI): support win32
  int file_flags = 0;
  int fd = -1;
  if (flags & MAPPED_SHAREDMEM) {
    file_flags = O_RDWR | O_CREAT;
  } else {
    file_flags = O_RDONLY;
  }
  if (flags & MAPPED_EXCLUSIVE) {
    file_flags |= O_EXCL;
  }
  if (flags & MAPPED_NOCREATE) {
    file_flags &= ~O_CREAT;
  }

  if (!(flags & MAPPED_FROMFD)) {
    if (flags & MAPPED_SHAREDMEM) {
      fd = shm_open(filename.c_str(), file_flags, (mode_t)0600);
      PADDLE_ENFORCE_NE(
          fd, -1,
          platform::errors::Unavailable(
              "File descriptor %s open failed, unable in read-write mode",
              filename.c_str()));
      VLOG(6) << "shm_open: " << filename;
    }
  } else {
    fd = -1;
  }

  // BUGFIX: only size a real descriptor — ftruncate(-1, ...) always fails,
  // which made every non-MAPPED_SHAREDMEM / MAPPED_FROMFD call abort here
  // before even reaching mmap.
  if (fd != -1) {
    PADDLE_ENFORCE_EQ(ftruncate(fd, size), 0,
                      platform::errors::Unavailable(
                          "Ftruncate a file to a specified length failed!"));
  }

  if (flags & MAPPED_SHAREDMEM) {
    *map_ptr_ = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
  } else {
    // Non-shared mappings are private copies; callers that need
    // cross-process visibility always take the MAP_SHARED branch above.
    *map_ptr_ = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
  }

  PADDLE_ENFORCE_NE(*map_ptr_, MAP_FAILED,
                    platform::errors::Unavailable(
                        "Memory map failed when create shared memory."));

  if (flags & MAPPED_KEEPFD) {
    *fd_ = fd;
  } else {
    PADDLE_ENFORCE_NE(::close(fd), -1,
                      platform::errors::Unavailable(
                          "Error closing memory mapped file <", filename, ">"));

    *fd_ = -1;
  }
}

// Maps `size` user bytes plus a leading mmap_alignment slab that stores the
// cross-process CountInfo refcount, then wraps the user-visible region in a
// RefcountedMemoryMapAllocation.
std::shared_ptr<RefcountedMemoryMapAllocation>
AllocateRefcountedMemoryMapAllocation(std::string filename, int flags,
                                      size_t size) {
  int fd = -1;
  void *base_ptr = nullptr;
  // Reserve extra space in front of the user data for the refcount header.
  AllocateMemoryMap(filename, flags, size + mmap_alignment, &base_ptr, &fd);
  void *aligned_base_ptr =
      static_cast<void *>(static_cast<char *>(base_ptr) + mmap_alignment);
  return std::make_shared<RefcountedMemoryMapAllocation>(
      aligned_base_ptr, size, filename, flags, fd);
}

// Out-of-line constructor definition. The call site
// (AllocateRefcountedMemoryMapAllocation) passes (..., flags, fd); the
// original definition named these parameters (fd, flags), silently swapping
// the two ints' meanings. Renaming the parameters AND the forwarding
// identifiers keeps the positional value flow byte-identical while making
// the names truthful.
// NOTE(review): assumes the declaration in mmap_allocator.h orders these as
// (..., flags, fd) — both are `int`, so this compiles either way; confirm.
RefcountedMemoryMapAllocation::RefcountedMemoryMapAllocation(
    void *ptr, size_t size, std::string ipc_name, int flags, int fd)
    : MemoryMapAllocation(ptr, size, ipc_name, flags, fd) {
  // must reset base ptr first.
  resetBaseptr();
  initializeRefercount();
}

// Idempotent close: only the first call flips the flag. The base class owns
// no OS resources of its own, so there is nothing else to release here.
void MemoryMapAllocation::close() {
  if (!closed_) {
    closed_ = true;
  }
}

// Guarantee close() ran even if the owner never called it explicitly.
MemoryMapAllocation::~MemoryMapAllocation() { close(); }

// Atomically bump the cross-process reference count stored at map_ptr_.
void RefcountedMemoryMapAllocation::incref() {
  auto *info = static_cast<CountInfo *>(map_ptr_);
  info->refcount.fetch_add(1);
}

// Atomically drop the reference count. Returns 1 (true) when this call
// released the last reference, 0 otherwise.
int RefcountedMemoryMapAllocation::decref() {
  auto *info = static_cast<CountInfo *>(map_ptr_);
  // fetch_sub returns the previous value; previous == 1 means new == 0.
  return info->refcount.fetch_sub(1) == 1;
}

// Rebase the mapping bookkeeping to cover the CountInfo header that
// AllocateRefcountedMemoryMapAllocation placed in front of the user data.
void RefcountedMemoryMapAllocation::resetBaseptr() {
  auto *user_ptr = static_cast<char *>(map_ptr_);
  map_ptr_ = user_ptr - mmap_alignment;
  map_size_ += mmap_alignment;
}

// Initialize or join the shared refcount at the head of the mapping.
void RefcountedMemoryMapAllocation::initializeRefercount() {
  CountInfo *info = reinterpret_cast<CountInfo *>(map_ptr_);

  if (flags_ & MAPPED_EXCLUSIVE) {
    // Creator of the segment: placement-new the atomic in place with count 1.
    new (&info->refcount) std::atomic<int>(1);
  } else {
    // Joining an existing segment: the creator already constructed the
    // atomic, so a plain atomic increment is safe here.
    info->refcount++;
  }
}

// Drop this process' reference; the last owner unlinks the shm name so the
// kernel can reclaim the segment once all mappings are unmapped. Idempotent.
void RefcountedMemoryMapAllocation::close() {
  if (closed_) {
    return;
  }
  closed_ = true;
  void *data = map_ptr_;
  CountInfo *info = reinterpret_cast<CountInfo *>(data);
  // NOTE(review): this decrements inline instead of calling decref(); the
  // semantics are the same atomic fetch-sub. Unlink only removes the name —
  // processes that already mapped the segment keep their mappings valid.
  if (--info->refcount == 0) {
    PADDLE_ENFORCE_NE(
        shm_unlink(ipc_name_.c_str()), -1,
        platform::errors::Unavailable(
            "could not unlink the shared memory file ", ipc_name_));
    VLOG(6) << "shm_unlink file: " << ipc_name_;
  }

  // Unmap the full region including the CountInfo header (map_ptr_ and
  // map_size_ were rebased by resetBaseptr()).
  PADDLE_ENFORCE_NE(
      munmap(map_ptr_, map_size_), -1,
      platform::errors::Unavailable("could not unmap the shared memory file: ",
                                    strerror(errno), " (", errno, ")"));
}

MemoryMapWriterAllocation::~MemoryMapWriterAllocation() {
PADDLE_ENFORCE_NE(
munmap(this->ptr(), this->size()), -1,
Expand All @@ -44,30 +193,30 @@ MemoryMapReaderAllocation::~MemoryMapReaderAllocation() {
/* Here we do not pay attention to the result of shm_unlink,
because the memory mapped file may have been cleared due to the
MemoryMapFdSet::Clear() */

// Code of DataLoader subprocess:
//
// core._array_to_share_memory_tensor(b)
// out_queue.put((idx, tensor_list, structure))
// core._remove_tensor_list_mmap_fds(tensor_list)

/* If the tensor in already in the send queue, the tensor will be
* deconstructed by the function. If the tensor not send yet, it
* will be cleared by MemoryMapFdSet::Clear().
* If the `_remove_tensor_list_mmap_fds` have be interrupted, the
* tensor will be cleared by both methods.
* */

shm_unlink(this->ipc_name().c_str());
MemoryMapFdSet::Instance().Remove(this->ipc_name());
VLOG(3) << "~MemoryMapReaderAllocation: " << this->ipc_name();
}

// Produces a unique shared-memory object name of the form
// "/paddle_<pid>_<random>" (shm_open names must begin with a slash).
std::string GetIPCName() {
  static std::random_device rd;
  std::string handle = "/paddle_";
#ifdef _WIN32
  handle += std::to_string(GetCurrentProcessId());
#else
  handle += std::to_string(getpid());
#endif
  handle += "_";
  handle += std::to_string(rd());
  return handle;
}

std::shared_ptr<MemoryMapWriterAllocation> AllocateMemoryMapWriterAllocation(
size_t size) {
const std::string &ipc_name = GetIPCName();
int flags = O_RDWR | O_CREAT;

int fd = shm_open(ipc_name.c_str(), flags, 0644);
int fd = shm_open(ipc_name.c_str(), flags, 0600);
PADDLE_ENFORCE_NE(
fd, -1, platform::errors::Unavailable("File descriptor %s open failed",
ipc_name.c_str()));
Expand All @@ -86,12 +235,14 @@ std::shared_ptr<MemoryMapWriterAllocation> AllocateMemoryMapWriterAllocation(

std::shared_ptr<MemoryMapReaderAllocation> RebuildMemoryMapReaderAllocation(
const std::string &ipc_name, size_t size) {
int fd = shm_open(ipc_name.c_str(), O_RDONLY, 0644);
int flags = O_RDWR | O_CREAT;
flags &= ~O_CREAT;

int fd = shm_open(ipc_name.c_str(), flags, 0600);
PADDLE_ENFORCE_NE(
fd, -1, platform::errors::Unavailable("File descriptor %s open failed",
ipc_name.c_str()));

void *ptr = mmap(NULL, size, PROT_READ, MAP_SHARED, fd, 0);
void *ptr = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
PADDLE_ENFORCE_NE(ptr, MAP_FAILED,
platform::errors::Unavailable(
"Memory map failed when rebuild shared memory."));
Expand Down
Loading