Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 21 additions & 7 deletions src/core/compact_object.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,11 @@ extern "C" {
#include "base/pod_array.h"
#include "core/bloom.h"
#include "core/detail/bitpacking.h"
#include "core/qlist.h"
#include "core/sorted_map.h"
#include "core/string_map.h"
#include "core/string_set.h"

ABSL_RETIRED_FLAG(bool, use_set2, true, "If true use DenseSet for an optimized set data structure");

ABSL_FLAG(bool, experimental_flat_json, false, "If true uses flat json implementation.");

namespace dfly {
Expand Down Expand Up @@ -67,6 +66,19 @@ inline void FreeObjSet(unsigned encoding, void* ptr, MemoryResource* mr) {
}
}

void FreeList(unsigned encoding, void* ptr, MemoryResource* mr) {
switch (encoding) {
case OBJ_ENCODING_QUICKLIST:
quicklistRelease((quicklist*)ptr);
break;
case kEncodingQL2:
CompactObj::DeleteMR<QList>(ptr);
break;
default:
LOG(FATAL) << "Unknown list encoding type";
}
}

size_t MallocUsedSet(unsigned encoding, void* ptr) {
switch (encoding) {
case kEncodingStrMap2: {
Expand Down Expand Up @@ -288,8 +300,9 @@ size_t RobjWrapper::MallocUsed() const {
CHECK_EQ(OBJ_ENCODING_RAW, encoding_);
return InnerObjMallocUsed();
case OBJ_LIST:
DCHECK_EQ(encoding_, OBJ_ENCODING_QUICKLIST);
return QlMAllocSize((quicklist*)inner_obj_);
if (encoding_ == OBJ_ENCODING_QUICKLIST)
return QlMAllocSize((quicklist*)inner_obj_);
return ((QList*)inner_obj_)->MallocUsed();
case OBJ_SET:
return MallocUsedSet(encoding_, inner_obj_);
case OBJ_HASH:
Expand All @@ -312,7 +325,9 @@ size_t RobjWrapper::Size() const {
DCHECK_EQ(OBJ_ENCODING_RAW, encoding_);
return sz_;
case OBJ_LIST:
return quicklistCount((quicklist*)inner_obj_);
if (encoding_ == OBJ_ENCODING_QUICKLIST)
return quicklistCount((quicklist*)inner_obj_);
return ((QList*)inner_obj_)->Size();
case OBJ_ZSET: {
switch (encoding_) {
case OBJ_ENCODING_SKIPLIST: {
Expand Down Expand Up @@ -367,8 +382,7 @@ void RobjWrapper::Free(MemoryResource* mr) {
mr->deallocate(inner_obj_, 0, 8); // we do not keep the allocated size.
break;
case OBJ_LIST:
CHECK_EQ(encoding_, OBJ_ENCODING_QUICKLIST);
quicklistRelease((quicklist*)inner_obj_);
FreeList(encoding_, inner_obj_, mr);
break;
case OBJ_SET:
FreeObjSet(encoding_, inner_obj_, mr);
Expand Down
2 changes: 1 addition & 1 deletion src/core/compact_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
namespace dfly {

constexpr unsigned kEncodingIntSet = 0;
constexpr unsigned kEncodingStrMap = 1; // for set/map encodings of strings
constexpr unsigned kEncodingStrMap2 = 2; // for set/map encodings of strings using DenseSet
constexpr unsigned kEncodingQL2 = 1;
constexpr unsigned kEncodingListPack = 3;
constexpr unsigned kEncodingJsonCons = 0;
constexpr unsigned kEncodingJsonFlat = 1;
Expand Down
113 changes: 76 additions & 37 deletions src/server/list_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ extern "C" {

#include "base/flags.h"
#include "base/logging.h"
#include "core/qlist.h"
#include "server/blocking_controller.h"
#include "server/command_registry.h"
#include "server/conn_context.h"
Expand Down Expand Up @@ -59,6 +60,8 @@ ABSL_FLAG(int32_t, list_max_listpack_size, -2, "Maximum listpack size, default i
*/

ABSL_FLAG(int32_t, list_compress_depth, 0, "Compress depth of the list. Default is no compression");
ABSL_FLAG(bool, list_experimental_v2, false,
"Compress depth of the list. Default is no compression");

namespace dfly {

Expand All @@ -73,6 +76,10 @@ quicklist* GetQL(const PrimeValue& mv) {
return (quicklist*)mv.RObjPtr();
}

QList* GetQLV2(const PrimeValue& mv) {
return (QList*)mv.RObjPtr();
}

void* listPopSaver(unsigned char* data, size_t sz) {
return new string((char*)data, sz);
}
Expand Down Expand Up @@ -265,26 +272,46 @@ OpResult<uint32_t> OpPush(const OpArgs& op_args, std::string_view key, ListDir d
res = std::move(*op_res);
}

quicklist* ql = nullptr;
size_t len = 0;
DVLOG(1) << "OpPush " << key << " new_key " << res.is_new;
quicklist* ql = nullptr;
QList* ql_v2 = nullptr;

if (res.is_new) {
ql = quicklistCreate();
quicklistSetOptions(ql, GetFlag(FLAGS_list_max_listpack_size),
GetFlag(FLAGS_list_compress_depth));
res.it->second.InitRobj(OBJ_LIST, OBJ_ENCODING_QUICKLIST, ql);
if (absl::GetFlag(FLAGS_list_experimental_v2)) {
ql_v2 = CompactObj::AllocateMR<QList>(GetFlag(FLAGS_list_max_listpack_size),
GetFlag(FLAGS_list_compress_depth));
res.it->second.InitRobj(OBJ_LIST, kEncodingQL2, ql_v2);
} else {
ql = quicklistCreate();
quicklistSetOptions(ql, GetFlag(FLAGS_list_max_listpack_size),
GetFlag(FLAGS_list_compress_depth));
res.it->second.InitRobj(OBJ_LIST, OBJ_ENCODING_QUICKLIST, ql);
}
} else {
if (res.it->second.ObjType() != OBJ_LIST)
return OpStatus::WRONG_TYPE;
ql = GetQL(res.it->second);
if (res.it->second.Encoding() == kEncodingQL2) {
ql_v2 = GetQLV2(res.it->second);
} else {
ql = GetQL(res.it->second);
}
}

// Left push is LIST_HEAD.
int pos = (dir == ListDir::LEFT) ? QUICKLIST_HEAD : QUICKLIST_TAIL;

for (string_view v : vals) {
auto vsds = WrapSds(v);
quicklistPush(ql, vsds, sdslen(vsds), pos);
if (ql) {
// Left push is LIST_HEAD.
int pos = (dir == ListDir::LEFT) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
for (string_view v : vals) {
auto vsds = WrapSds(v);
quicklistPush(ql, vsds, sdslen(vsds), pos);
}
len = quicklistCount(ql);
} else {
QList::Where where = (dir == ListDir::LEFT) ? QList::HEAD : QList::TAIL;
for (string_view v : vals) {
ql_v2->Push(v, where);
}
len = ql_v2->Size();
}

if (res.is_new) {
Expand All @@ -305,7 +332,7 @@ OpResult<uint32_t> OpPush(const OpArgs& op_args, std::string_view key, ListDir d
RecordJournal(op_args, command, mapped, 2);
}

return quicklistCount(ql);
return len;
}

OpResult<StringVec> OpPop(const OpArgs& op_args, string_view key, ListDir dir, uint32_t count,
Expand All @@ -320,26 +347,26 @@ OpResult<StringVec> OpPop(const OpArgs& op_args, string_view key, ListDir dir, u

auto it = it_res->it;
quicklist* ql = GetQL(it->second);

auto prev_len = quicklistCount(ql);
StringVec res;
if (quicklistCount(ql) < count) {
count = quicklistCount(ql);
if (prev_len < count) {
count = prev_len;
}
res.reserve(count);

if (return_results) {
for (unsigned i = 0; i < count; ++i) {
res.push_back(ListPop(dir, ql));
}
} else {
for (unsigned i = 0; i < count; ++i) {
ListPop(dir, ql);
res.reserve(count);
}

for (unsigned i = 0; i < count; ++i) {
string val = ListPop(dir, ql);
if (return_results) {
res.push_back(std::move(val));
}
}

it_res->post_updater.Run();

if (quicklistCount(ql) == 0) {
if (count == prev_len) {
CHECK(db_slice.Del(op_args.db_cntx, it));
}

Expand Down Expand Up @@ -418,31 +445,43 @@ OpResult<uint32_t> OpLen(const OpArgs& op_args, std::string_view key) {
if (!res)
return res.status();

quicklist* ql = GetQL(res.value()->second);
if (res.value()->second.Encoding() == kEncodingQL2) {
QList* ql = GetQLV2(res.value()->second);
return ql->Size();
}

quicklist* ql = GetQL(res.value()->second);
return quicklistCount(ql);
}

OpResult<string> OpIndex(const OpArgs& op_args, std::string_view key, long index) {
auto res = op_args.GetDbSlice().FindReadOnly(op_args.db_cntx, key, OBJ_LIST);
if (!res)
return res.status();
quicklist* ql = GetQL(res.value()->second);
quicklistEntry entry = container_utils::QLEntry();
quicklistIter* iter = quicklistGetIteratorAtIdx(ql, AL_START_TAIL, index);
if (!iter)
return OpStatus::KEY_NOTFOUND;

quicklistNext(iter, &entry);
string str;

if (entry.value) {
str.assign(reinterpret_cast<char*>(entry.value), entry.sz);
if (res.value()->second.Encoding() == kEncodingQL2) {
QList* ql = GetQLV2(res.value()->second);
auto it = ql->GetIterator(index);
if (!it.Next())
return OpStatus::KEY_NOTFOUND;
str = it.Get().to_string();
} else {
str = absl::StrCat(entry.longval);
}
quicklistReleaseIterator(iter);
quicklist* ql = GetQL(res.value()->second);
quicklistEntry entry = container_utils::QLEntry();
quicklistIter* iter = quicklistGetIteratorAtIdx(ql, AL_START_TAIL, index);
if (!iter)
return OpStatus::KEY_NOTFOUND;

quicklistNext(iter, &entry);

if (entry.value) {
str.assign(reinterpret_cast<char*>(entry.value), entry.sz);
} else {
str = absl::StrCat(entry.longval);
}
quicklistReleaseIterator(iter);
}
return str;
}

Expand Down
2 changes: 1 addition & 1 deletion src/server/rdb_save.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ uint8_t RdbObjectType(const PrimeValue& pv) {
case OBJ_SET:
if (compact_enc == kEncodingIntSet)
return RDB_TYPE_SET_INTSET;
else if (compact_enc == kEncodingStrMap || compact_enc == kEncodingStrMap2) {
else if (compact_enc == kEncodingStrMap2) {
if (((StringSet*)pv.RObjPtr())->ExpirationUsed())
return RDB_TYPE_SET_WITH_EXPIRY;
else
Expand Down
2 changes: 0 additions & 2 deletions src/server/set_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ extern "C" {
#include "server/journal/journal.h"
#include "server/transaction.h"

ABSL_DECLARE_FLAG(bool, use_set2);

namespace dfly {

using namespace facade;
Expand Down
Loading