Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cpp/src/plasma/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ uint8_t* PlasmaClient::lookup_or_mmap(int fd, int store_fd_val, int64_t map_size
if (result == MAP_FAILED) {
ARROW_LOG(FATAL) << "mmap failed";
}
close(fd);
close(fd); // Closing this fd has an effect on performance.

ClientMmapTableEntry& entry = mmap_table_[store_fd_val];
entry.pointer = result;
entry.length = map_size;
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/plasma/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,6 @@ Status plasma_error_status(int plasma_error) {
ARROW_EXPORT int ObjectStatusLocal = ObjectStatus_Local;
ARROW_EXPORT int ObjectStatusRemote = ObjectStatus_Remote;

const PlasmaStoreInfo* plasma_config;

} // namespace plasma
5 changes: 5 additions & 0 deletions cpp/src/plasma/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ enum ObjectRequestType {
extern int ObjectStatusLocal;
extern int ObjectStatusRemote;

/// Globally accessible reference to plasma store configuration.
/// TODO(pcm): This can be avoided with some refactoring of existing code
/// by making it possible to pass a context object through dlmalloc.
struct PlasmaStoreInfo;
extern const PlasmaStoreInfo* plasma_config;
} // namespace plasma

#endif // PLASMA_COMMON_H
82 changes: 54 additions & 28 deletions cpp/src/plasma/malloc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,13 @@
#include <sys/mman.h>
#include <unistd.h>

#include <cerrno>
#include <string>
#include <unordered_map>
#include <vector>

#include "plasma/common.h"
#include "plasma/plasma.h"

extern "C" {
void* fake_mmap(size_t);
Expand Down Expand Up @@ -60,12 +64,12 @@ struct mmap_record {

namespace {

/** Hashtable that contains one entry per segment that we got from the OS
* via mmap. Associates the address of that segment with its file descriptor
* and size. */
/// Hashtable that contains one entry per segment that we got from the OS
/// via mmap. Associates the address of that segment with its file descriptor
/// and size.
std::unordered_map<void*, mmap_record> mmap_records;

} /* namespace */
} // namespace

constexpr int GRANULARITY_MULTIPLIER = 2;

Expand All @@ -77,64 +81,84 @@ static ptrdiff_t pointer_distance(void const* pfrom, void const* pto) {
return (unsigned char const*)pto - (unsigned char const*)pfrom;
}

/* Create a buffer. This is creating a temporary file and then
* immediately unlinking it so we do not leave traces in the system. */
// Create a buffer. This is creating a temporary file and then
// immediately unlinking it so we do not leave traces in the system.
int create_buffer(int64_t size) {
int fd;
std::string file_template = plasma::plasma_config->directory;
#ifdef _WIN32
if (!CreateFileMapping(INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE,
(DWORD)((uint64_t)size >> (CHAR_BIT * sizeof(DWORD))),
(DWORD)(uint64_t)size, NULL)) {
fd = -1;
}
#else
#ifdef __linux__
constexpr char file_template[] = "/dev/shm/plasmaXXXXXX";
#else
constexpr char file_template[] = "/tmp/plasmaXXXXXX";
#endif
char file_name[32];
strncpy(file_name, file_template, 32);
fd = mkstemp(file_name);
if (fd < 0) return -1;
file_template += "/plasmaXXXXXX";
std::vector<char> file_name(file_template.begin(), file_template.end());
file_name.push_back('\0');
fd = mkstemp(&file_name[0]);
if (fd < 0) {
ARROW_LOG(FATAL) << "create_buffer failed to open file " << &file_name[0];
return -1;
}

FILE* file = fdopen(fd, "a+");
if (!file) {
close(fd);
ARROW_LOG(FATAL) << "create_buffer: fdopen failed for " << &file_name[0];
return -1;
}
if (unlink(file_name) != 0) {
ARROW_LOG(FATAL) << "unlink error";
// Immediately unlink the file so we do not leave traces in the system.
if (unlink(&file_name[0]) != 0) {
ARROW_LOG(FATAL) << "failed to unlink file " << &file_name[0];
return -1;
}
if (ftruncate(fd, (off_t)size) != 0) {
ARROW_LOG(FATAL) << "ftruncate error";
return -1;
if (!plasma::plasma_config->hugepages_enabled) {
// Increase the size of the file to the desired size. This seems not to be
// needed for files that are backed by the huge page fs, see also
// http://www.mail-archive.com/[email protected]/msg14737.html
if (ftruncate(fd, (off_t)size) != 0) {
ARROW_LOG(FATAL) << "failed to ftruncate file " << &file_name[0];
return -1;
}
}
#endif
return fd;
}

void* fake_mmap(size_t size) {
/* Add sizeof(size_t) so that the returned pointer is deliberately not
* page-aligned. This ensures that the segments of memory returned by
* fake_mmap are never contiguous. */
// Add sizeof(size_t) so that the returned pointer is deliberately not
// page-aligned. This ensures that the segments of memory returned by
// fake_mmap are never contiguous.
size += sizeof(size_t);

int fd = create_buffer(size);
ARROW_CHECK(fd >= 0) << "Failed to create buffer during mmap";
#ifdef __linux__
// MAP_POPULATE will pre-populate the page tables for this memory region
// which avoids work when accessing the pages later. Only supported on Linux.
void* pointer =
mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, 0);
#else
void* pointer = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
#endif
if (pointer == MAP_FAILED) {
ARROW_LOG(ERROR) << "mmap failed with error: " << std::strerror(errno);
if (errno == ENOMEM && plasma::plasma_config->hugepages_enabled) {
ARROW_LOG(ERROR)
<< " (this probably means you have to increase /proc/sys/vm/nr_hugepages)";
}
return pointer;
}

/* Increase dlmalloc's allocation granularity directly. */
// Increase dlmalloc's allocation granularity directly.
mparams.granularity *= GRANULARITY_MULTIPLIER;

mmap_record& record = mmap_records[pointer];
record.fd = fd;
record.size = size;

/* We lie to dlmalloc about where mapped memory actually lives. */
// We lie to dlmalloc about where mapped memory actually lives.
pointer = pointer_advance(pointer, sizeof(size_t));
ARROW_LOG(DEBUG) << pointer << " = fake_mmap(" << size << ")";
return pointer;
Expand All @@ -148,8 +172,8 @@ int fake_munmap(void* addr, int64_t size) {
auto entry = mmap_records.find(addr);

if (entry == mmap_records.end() || entry->second.size != size) {
/* Reject requests to munmap that don't directly match previous
* calls to mmap, to prevent dlmalloc from trimming. */
// Reject requests to munmap that don't directly match previous
// calls to mmap, to prevent dlmalloc from trimming.
return -1;
}

Expand All @@ -163,7 +187,7 @@ int fake_munmap(void* addr, int64_t size) {
}

void get_malloc_mapinfo(void* addr, int* fd, int64_t* map_size, ptrdiff_t* offset) {
/* TODO(rshin): Implement a more efficient search through mmap_records. */
// TODO(rshin): Implement a more efficient search through mmap_records.
for (const auto& entry : mmap_records) {
if (addr >= entry.first && addr < pointer_advance(entry.first, entry.second.size)) {
*fd = entry.second.fd;
Expand All @@ -176,3 +200,5 @@ void get_malloc_mapinfo(void* addr, int* fd, int64_t* map_size, ptrdiff_t* offse
*map_size = 0;
*offset = 0;
}

void set_malloc_granularity(int value) { change_mparam(M_GRANULARITY, value); }
2 changes: 2 additions & 0 deletions cpp/src/plasma/malloc.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,6 @@

void get_malloc_mapinfo(void* addr, int* fd, int64_t* map_length, ptrdiff_t* offset);

void set_malloc_granularity(int value);

#endif // MALLOC_H
8 changes: 8 additions & 0 deletions cpp/src/plasma/plasma.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <string.h>
#include <unistd.h> // pid_t

#include <string>
#include <unordered_map>
#include <unordered_set>

Expand Down Expand Up @@ -129,6 +130,13 @@ struct PlasmaStoreInfo {
/// The amount of memory (in bytes) that we allow to be allocated in the
/// store.
int64_t memory_capacity;
/// Boolean flag indicating whether to start the object store with hugepages
/// support enabled. Huge pages are substantially larger than normal memory
/// pages (e.g. 2MB or 1GB instead of 4KB) and using them can reduce
/// bookkeeping overhead from the OS.
bool hugepages_enabled;
/// A (platform-dependent) directory where to create the memory-backed file.
std::string directory;
};

/// Get an entry from the object table and return NULL if the object_id
Expand Down
86 changes: 61 additions & 25 deletions cpp/src/plasma/store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,12 @@ GetRequest::GetRequest(Client* client, const std::vector<ObjectID>& object_ids)

Client::Client(int fd) : fd(fd) {}

PlasmaStore::PlasmaStore(EventLoop* loop, int64_t system_memory)
PlasmaStore::PlasmaStore(EventLoop* loop, int64_t system_memory, std::string directory,
bool hugepages_enabled)
: loop_(loop), eviction_policy_(&store_info_) {
store_info_.memory_capacity = system_memory;
store_info_.directory = directory;
store_info_.hugepages_enabled = hugepages_enabled;
}

// TODO(pcm): Get rid of this destructor by using RAII to clean up data.
Expand All @@ -114,6 +117,8 @@ PlasmaStore::~PlasmaStore() {
}
}

const PlasmaStoreInfo* PlasmaStore::get_plasma_store_info() { return &store_info_; }

// If this client is not already using the object, add the client to the
// object's list of clients, otherwise do nothing.
void PlasmaStore::add_client_to_object_clients(ObjectTableEntry* entry, Client* client) {
Expand Down Expand Up @@ -633,10 +638,13 @@ class PlasmaStoreRunner {
public:
PlasmaStoreRunner() {}

void Start(char* socket_name, int64_t system_memory) {
void Start(char* socket_name, int64_t system_memory, std::string directory,
bool hugepages_enabled) {
// Create the event loop.
loop_.reset(new EventLoop);
store_.reset(new PlasmaStore(loop_.get(), system_memory));
store_.reset(
new PlasmaStore(loop_.get(), system_memory, directory, hugepages_enabled));
plasma_config = store_->get_plasma_store_info();
int socket = bind_ipc_sock(socket_name, true);
// TODO(pcm): Check return value.
ARROW_CHECK(socket >= 0);
Expand Down Expand Up @@ -670,25 +678,35 @@ void HandleSignal(int signal) {
}
}

void start_server(char* socket_name, int64_t system_memory) {
void start_server(char* socket_name, int64_t system_memory, std::string plasma_directory,
bool hugepages_enabled) {
// Ignore SIGPIPE signals. If we don't do this, then when we attempt to write
// to a client that has already died, the store could die.
signal(SIGPIPE, SIG_IGN);

PlasmaStoreRunner runner;
g_runner = &runner;
signal(SIGTERM, HandleSignal);
runner.Start(socket_name, system_memory);
runner.Start(socket_name, system_memory, plasma_directory, hugepages_enabled);
}

} // namespace plasma

int main(int argc, char* argv[]) {
char* socket_name = NULL;
// Directory where plasma memory mapped files are stored.
std::string plasma_directory;
bool hugepages_enabled = false;
int64_t system_memory = -1;
int c;
while ((c = getopt(argc, argv, "s:m:")) != -1) {
while ((c = getopt(argc, argv, "s:m:d:h")) != -1) {
switch (c) {
case 'd':
plasma_directory = std::string(optarg);
break;
case 'h':
hugepages_enabled = true;
break;
case 's':
socket_name = optarg;
break;
Expand All @@ -705,36 +723,54 @@ int main(int argc, char* argv[]) {
exit(-1);
}
}
// Sanity check command line options.
if (!socket_name) {
ARROW_LOG(FATAL) << "please specify socket for incoming connections with -s switch";
}
if (system_memory == -1) {
ARROW_LOG(FATAL) << "please specify the amount of system memory with -m switch";
}
if (hugepages_enabled && plasma_directory.empty()) {
ARROW_LOG(FATAL) << "if you want to use hugepages, please specify path to huge pages "
"filesystem with -d";
}
if (plasma_directory.empty()) {
#ifdef __linux__
// On Linux, check that the amount of memory available in /dev/shm is large
// enough to accommodate the request. If it isn't, then fail.
int shm_fd = open("/dev/shm", O_RDONLY);
struct statvfs shm_vfs_stats;
fstatvfs(shm_fd, &shm_vfs_stats);
// The value shm_vfs_stats.f_bsize is the block size, and the value
// shm_vfs_stats.f_bavail is the number of available blocks.
int64_t shm_mem_avail = shm_vfs_stats.f_bsize * shm_vfs_stats.f_bavail;
close(shm_fd);
if (system_memory > shm_mem_avail) {
ARROW_LOG(FATAL) << "System memory request exceeds memory available in /dev/shm. The "
"request is for "
<< system_memory << " bytes, and the amount available is "
<< shm_mem_avail
<< " bytes. You may be able to free up space by deleting files in "
"/dev/shm. If you are inside a Docker container, you may need to "
"pass "
"an argument with the flag '--shm-size' to 'docker run'.";
plasma_directory = "/dev/shm";
#else
plasma_directory = "/tmp";
#endif
}
ARROW_LOG(INFO) << "Starting object store with directory " << plasma_directory
<< " and huge page support "
<< (hugepages_enabled ? "enabled" : "disabled");
#ifdef __linux__
if (!hugepages_enabled) {
// On Linux, check that the amount of memory available in /dev/shm is large
// enough to accommodate the request. If it isn't, then fail.
int shm_fd = open(plasma_directory.c_str(), O_RDONLY);
struct statvfs shm_vfs_stats;
fstatvfs(shm_fd, &shm_vfs_stats);
// The value shm_vfs_stats.f_bsize is the block size, and the value
// shm_vfs_stats.f_bavail is the number of available blocks.
int64_t shm_mem_avail = shm_vfs_stats.f_bsize * shm_vfs_stats.f_bavail;
close(shm_fd);
if (system_memory > shm_mem_avail) {
ARROW_LOG(FATAL)
<< "System memory request exceeds memory available in " << plasma_directory
<< ". The request is for " << system_memory
<< " bytes, and the amount available is " << shm_mem_avail
<< " bytes. You may be able to free up space by deleting files in "
"/dev/shm. If you are inside a Docker container, you may need to "
"pass an argument with the flag '--shm-size' to 'docker run'.";
}
} else {
set_malloc_granularity(1024 * 1024 * 1024); // 1 GB
}
#endif
// Make it so dlmalloc fails if we try to request more memory than is
// available.
plasma::dlmalloc_set_footprint_limit((size_t)system_memory);
ARROW_LOG(DEBUG) << "starting server listening on " << socket_name;
plasma::start_server(socket_name, system_memory);
plasma::start_server(socket_name, system_memory, plasma_directory, hugepages_enabled);
}
7 changes: 6 additions & 1 deletion cpp/src/plasma/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#define PLASMA_STORE_H

#include <deque>
#include <string>
#include <vector>

#include "plasma/common.h"
Expand Down Expand Up @@ -47,10 +48,14 @@ struct Client {

class PlasmaStore {
public:
PlasmaStore(EventLoop* loop, int64_t system_memory);
PlasmaStore(EventLoop* loop, int64_t system_memory, std::string directory,
bool hugetlbfs_enabled);

~PlasmaStore();

/// Get a const pointer to the internal PlasmaStoreInfo object.
const PlasmaStoreInfo* get_plasma_store_info();

/// Create a new object. The client must do a call to release_object to tell
/// the store when it is done with the object.
///
Expand Down
Loading