Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 5 additions & 9 deletions src/core/dash.h
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,9 @@ class DashTable<_Key, _Value, Policy>::Iterator {
public:
using iterator_category = std::forward_iterator_tag;
using difference_type = std::ptrdiff_t;
using IteratorPairType =
std::conditional_t<IsConst, detail::IteratorPair<const Key_t, const Value_t>,
detail::IteratorPair<Key_t, Value_t>>;

// Copy constructor from iterator to const_iterator.
template <bool TIsConst = IsConst, bool TIsSingleB,
Expand Down Expand Up @@ -372,16 +375,9 @@ class DashTable<_Key, _Value, Policy>::Iterator {
return *this;
}

detail::IteratorPair<Key_t, Value_t> operator->() {
IteratorPairType operator->() const {
auto* seg = owner_->segment_[seg_id_];
return detail::IteratorPair<Key_t, Value_t>{seg->Key(bucket_id_, slot_id_),
seg->Value(bucket_id_, slot_id_)};
}

const detail::IteratorPair<Key_t, Value_t> operator->() const {
auto* seg = owner_->segment_[seg_id_];
return detail::IteratorPair<Key_t, Value_t>{seg->Key(bucket_id_, slot_id_),
seg->Value(bucket_id_, slot_id_)};
return {seg->Key(bucket_id_, slot_id_), seg->Value(bucket_id_, slot_id_)};
}

// Make it self-contained. Does not need container::end().
Expand Down
10 changes: 6 additions & 4 deletions src/server/bitops_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ class ElementAccess {
};

std::optional<bool> ElementAccess::Exists(EngineShard* shard) {
auto res = shard->db_slice().Find(context_, key_, OBJ_STRING);
auto res = shard->db_slice().FindReadOnly(context_, key_, OBJ_STRING);
if (res.status() == OpStatus::WRONG_TYPE) {
return {};
}
Expand Down Expand Up @@ -458,7 +458,8 @@ OpResult<std::string> RunBitOpNot(const OpArgs& op_args, ArgSlice keys) {
EngineShard* es = op_args.shard;
// if we found the value, just return, if not found then skip, otherwise report an error
auto key = keys.front();
OpResult<PrimeIterator> find_res = es->db_slice().Find(op_args.db_cntx, key, OBJ_STRING);
OpResult<PrimeConstIterator> find_res =
es->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_STRING);
if (find_res) {
return GetString(find_res.value()->second, es);
} else {
Expand All @@ -479,7 +480,8 @@ OpResult<std::string> RunBitOpOnShard(std::string_view op, const OpArgs& op_args

// collect all the value for this shard
for (auto& key : keys) {
OpResult<PrimeIterator> find_res = es->db_slice().Find(op_args.db_cntx, key, OBJ_STRING);
OpResult<PrimeConstIterator> find_res =
es->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_STRING);
if (find_res) {
values.emplace_back(GetString(find_res.value()->second, es));
} else {
Expand Down Expand Up @@ -1261,7 +1263,7 @@ OpResult<bool> ReadValueBitsetAt(const OpArgs& op_args, std::string_view key, ui

OpResult<std::string> ReadValue(const DbContext& context, std::string_view key,
EngineShard* shard) {
OpResult<PrimeIterator> it_res = shard->db_slice().Find(context, key, OBJ_STRING);
OpResult<PrimeConstIterator> it_res = shard->db_slice().FindReadOnly(context, key, OBJ_STRING);
if (!it_res.ok()) {
return it_res.status();
}
Expand Down
78 changes: 78 additions & 0 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,83 @@ auto DbSlice::Find(const Context& cntx, string_view key, unsigned req_obj_type)
return it;
}

DbSlice::AutoUpdater::AutoUpdater() {
}

DbSlice::AutoUpdater::AutoUpdater(AutoUpdater&& o) {
*this = std::move(o);
}

DbSlice::AutoUpdater& DbSlice::AutoUpdater::operator=(AutoUpdater&& o) {
Run();
fields_ = o.fields_;
o.Cancel();
return *this;
}

DbSlice::AutoUpdater::~AutoUpdater() {
Run();
}

void DbSlice::AutoUpdater::Run() {
if (fields_.action == DestructorAction::kDoNothing) {
return;
}

// Check that AutoUpdater does not run after a key was removed.
// If this CHECK() failed for you, it probably means that you deleted a key while having an auto
// updater in scope. You'll probably want to call Run() (or Cancel() - but be careful).
DCHECK(IsValid(fields_.db_slice->db_arr_[fields_.db_ind]->prime.Find(fields_.key)))
<< "Key was removed before PostUpdate() - this is a bug!";

// Make sure that the DB has not changed in size since this object was created.
// Adding or removing elements from the DB may invalidate iterators.
CHECK_EQ(fields_.db_size, fields_.db_slice->DbSize(fields_.db_ind))
<< "Attempting to run post-update after DB was modified";

CHECK_EQ(fields_.deletion_count, fields_.db_slice->deletion_count_)
<< "Attempting to run post-update after a deletion was issued";

DCHECK(fields_.action == DestructorAction::kRun);
CHECK_NE(fields_.db_slice, nullptr);

fields_.db_slice->PostUpdate(fields_.db_ind, fields_.it, fields_.key, fields_.key_existed);
Cancel(); // Reset to not run again
}

void DbSlice::AutoUpdater::Cancel() {
this->fields_ = {};
}

DbSlice::AutoUpdater::AutoUpdater(const Fields& fields) : fields_(fields) {
DCHECK(fields_.action == DestructorAction::kRun);
fields_.db_slice->PreUpdate(fields_.db_ind, fields_.it);
fields_.db_size = fields_.db_slice->DbSize(fields_.db_ind);
fields_.deletion_count = fields_.db_slice->deletion_count_;
}

OpResult<DbSlice::ItAndUpdater> DbSlice::FindMutable(const Context& cntx, string_view key,
unsigned req_obj_type) {
// TODO(#2252): Call an internal find version that does not handle post updates
auto it = FindExt(cntx, key).first;

if (!IsValid(it))
return OpStatus::KEY_NOTFOUND;

if (it->second.ObjType() != req_obj_type) {
return OpStatus::WRONG_TYPE;
}

return {
{it, AutoUpdater({AutoUpdater::DestructorAction::kRun, this, cntx.db_index, it, key, true})}};
}

auto DbSlice::FindReadOnly(const Context& cntx, string_view key, unsigned req_obj_type) const
-> OpResult<PrimeConstIterator> {
auto res = Find(cntx, key, req_obj_type);
return res.ok() ? OpResult<PrimeConstIterator>(res.value()) : res.status();
}

pair<PrimeIterator, ExpireIterator> DbSlice::FindExt(const Context& cntx, string_view key) const {
pair<PrimeIterator, ExpireIterator> res;

Expand Down Expand Up @@ -555,6 +632,7 @@ bool DbSlice::Del(DbIndex db_ind, PrimeIterator it) {
}

PerformDeletion(it, shard_owner(), db.get());
deletion_count_++;

return true;
}
Expand Down
51 changes: 51 additions & 0 deletions src/server/db_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,44 @@ class DbSlice {
void operator=(const DbSlice&) = delete;

public:
class AutoUpdater {
public:
AutoUpdater();
AutoUpdater(AutoUpdater&& o);
AutoUpdater& operator=(AutoUpdater&& o);
~AutoUpdater();

void Run();
void Cancel();

private:
enum class DestructorAction {
kDoNothing,
kRun,
};

// Wrap members in a struct to auto generate operator=
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason you actually need this field is to clear it in the move constructor instead of dropping in an undefined state 🤔 Not handy, but I don't know any other solution instead of having a move-clearable flag

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean by a move-clearable flag?
(I don't see any issue with this implementation though..)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just an interesting thought I had... So this example

struct PostUpdater {
  PostUpdater& operator=(PostUpdater&& o) = default;

  ~PostUpdater() { cout << a << " and " << uint64_t(b.get()) << endl; }

  int a;
  unique_ptr<int> b;
};

int main() {
  PostUpdater p1(1, make_unique<int>(11));
  PostUpdater p2(2, nullptr);

  p2 = std::move(p1);
}

will print

1 and 94585575722672
1 and 0

because moving an int is just copying it.... A move-clearable flag would behave like unique_ptr (but without the heap allocation). This way to don't need the nested struct, but can rely on default move constructor / operator to store "default" values in the moved-from object that can then be checked inside the destructor

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha. Well, it's simpler currently :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha. Well, it's simpler currently :)

Not arguing, just if we or absl had this type, it'd be one line

struct Fields {
DestructorAction action = DestructorAction::kDoNothing;

DbSlice* db_slice = nullptr;
DbIndex db_ind = 0;
PrimeIterator it;
std::string_view key;
bool key_existed = false;

size_t db_size = 0;
size_t deletion_count = 0;
// TODO(#2252): Add heap size here, and only update memory in d'tor
};

AutoUpdater(const Fields& fields);

friend class DbSlice;

Fields fields_ = {};
};

struct Stats {
// DbStats db;
std::vector<DbStats> db_stats;
Expand Down Expand Up @@ -147,9 +185,20 @@ class DbSlice {
return ExpirePeriod{time_ms - expire_base_[0]};
}

// TODO(#2252): Remove this in favor of FindMutable() / FindReadOnly()
OpResult<PrimeIterator> Find(const Context& cntx, std::string_view key,
unsigned req_obj_type) const;

struct ItAndUpdater {
PrimeIterator it;
AutoUpdater post_updater;
};
OpResult<ItAndUpdater> FindMutable(const Context& cntx, std::string_view key,
unsigned req_obj_type);

OpResult<PrimeConstIterator> FindReadOnly(const Context& cntx, std::string_view key,
unsigned req_obj_type) const;

// Returns (value, expire) dict entries if key exists, null if it does not exist or has expired.
std::pair<PrimeIterator, ExpireIterator> FindExt(const Context& cntx, std::string_view key) const;

Expand Down Expand Up @@ -259,6 +308,7 @@ class DbSlice {
size_t DbSize(DbIndex db_ind) const;

// Callback functions called upon writing to the existing key.
// TODO(#2252): Remove these (or make them private)
void PreUpdate(DbIndex db_ind, PrimeIterator it);
void PostUpdate(DbIndex db_ind, PrimeIterator it, std::string_view key,
bool existing_entry = true);
Expand Down Expand Up @@ -371,6 +421,7 @@ class DbSlice {
ssize_t memory_budget_ = SSIZE_MAX;
size_t bytes_per_object_ = 0;
size_t soft_budget_limit_ = 0;
size_t deletion_count_ = 0;

mutable SliceEvents events_; // we may change this even for const operations.

Expand Down
6 changes: 3 additions & 3 deletions src/server/hll_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ void PFAdd(CmdArgList args, ConnectionContext* cntx) {
OpResult<int64_t> CountHllsSingle(const OpArgs& op_args, string_view key) {
auto& db_slice = op_args.shard->db_slice();

OpResult<PrimeIterator> it = db_slice.Find(op_args.db_cntx, key, OBJ_STRING);
OpResult<PrimeConstIterator> it = db_slice.FindReadOnly(op_args.db_cntx, key, OBJ_STRING);
if (it.ok()) {
string hll;
string_view hll_view = it.value()->second.GetSlice(&hll);
Expand Down Expand Up @@ -150,8 +150,8 @@ OpResult<vector<string>> ReadValues(const OpArgs& op_args, ArgSlice keys) {
try {
vector<string> values;
for (size_t i = 0; i < keys.size(); ++i) {
OpResult<PrimeIterator> it =
op_args.shard->db_slice().Find(op_args.db_cntx, keys[i], OBJ_STRING);
OpResult<PrimeConstIterator> it =
op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, keys[i], OBJ_STRING);
if (it.ok()) {
string hll;
it.value()->second.GetString(&hll);
Expand Down
38 changes: 20 additions & 18 deletions src/server/hset_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -278,17 +278,18 @@ OpResult<StringVec> OpScan(const OpArgs& op_args, std::string_view key, uint64_t
* of returning no or very few elements. (taken from redis code at db.c line 904 */
constexpr size_t INTERATION_FACTOR = 10;

OpResult<PrimeIterator> find_res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_HASH);
OpResult<PrimeConstIterator> find_res =
op_args.shard->db_slice().FindReadOnly(op_args.db_cntx, key, OBJ_HASH);

if (!find_res) {
DVLOG(1) << "ScanOp: find failed: " << find_res << ", baling out";
return find_res.status();
}

PrimeIterator it = find_res.value();
PrimeConstIterator it = find_res.value();
StringVec res;
uint32_t count = scan_op.limit * HASH_TABLE_ENTRIES_FACTOR;
PrimeValue& pv = it->second;
const PrimeValue& pv = it->second;

if (pv.Encoding() == kEncodingListPack) {
uint8_t* lp = (uint8_t*)pv.RObjPtr();
Expand Down Expand Up @@ -342,15 +343,14 @@ OpResult<uint32_t> OpDel(const OpArgs& op_args, string_view key, CmdArgList valu
DCHECK(!values.empty());

auto& db_slice = op_args.shard->db_slice();
auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_HASH);
auto it_res = db_slice.FindMutable(op_args.db_cntx, key, OBJ_HASH);

if (!it_res)
return it_res.status();

op_args.shard->search_indices()->RemoveDoc(key, op_args.db_cntx, (*it_res)->second);
db_slice.PreUpdate(op_args.db_cntx.db_index, *it_res);
PrimeValue& pv = it_res->it->second;
op_args.shard->search_indices()->RemoveDoc(key, op_args.db_cntx, pv);

PrimeValue& pv = (*it_res)->second;
unsigned deleted = 0;
bool key_remove = false;
DbTableStats* stats = db_slice.MutableStats(op_args.db_cntx.db_index);
Expand Down Expand Up @@ -387,7 +387,7 @@ OpResult<uint32_t> OpDel(const OpArgs& op_args, string_view key, CmdArgList valu
}
}

db_slice.PostUpdate(op_args.db_cntx.db_index, *it_res, key);
it_res->post_updater.Run();

if (!key_remove)
op_args.shard->search_indices()->AddDoc(key, op_args.db_cntx, pv);
Expand All @@ -396,7 +396,7 @@ OpResult<uint32_t> OpDel(const OpArgs& op_args, string_view key, CmdArgList valu
if (enc == kEncodingListPack) {
stats->listpack_blob_cnt--;
}
db_slice.Del(op_args.db_cntx.db_index, *it_res);
db_slice.Del(op_args.db_cntx.db_index, it_res->it);
} else if (enc == kEncodingListPack) {
stats->listpack_bytes += lpBytes((uint8_t*)pv.RObjPtr());
}
Expand All @@ -408,12 +408,12 @@ OpResult<vector<OptStr>> OpHMGet(const OpArgs& op_args, std::string_view key, Cm
DCHECK(!fields.empty());

auto& db_slice = op_args.shard->db_slice();
auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_HASH);
auto it_res = db_slice.FindReadOnly(op_args.db_cntx, key, OBJ_HASH);

if (!it_res)
return it_res.status();

PrimeValue& pv = (*it_res)->second;
const PrimeValue& pv = (*it_res)->second;

std::vector<OptStr> result(fields.size());

Expand Down Expand Up @@ -466,7 +466,7 @@ OpResult<vector<OptStr>> OpHMGet(const OpArgs& op_args, std::string_view key, Cm

OpResult<uint32_t> OpLen(const OpArgs& op_args, string_view key) {
auto& db_slice = op_args.shard->db_slice();
auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_HASH);
auto it_res = db_slice.FindReadOnly(op_args.db_cntx, key, OBJ_HASH);

if (it_res) {
return HMapLength(op_args.db_cntx, (*it_res)->second);
Expand All @@ -479,7 +479,7 @@ OpResult<uint32_t> OpLen(const OpArgs& op_args, string_view key) {

OpResult<int> OpExist(const OpArgs& op_args, string_view key, string_view field) {
auto& db_slice = op_args.shard->db_slice();
auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_HASH);
auto it_res = db_slice.FindReadOnly(op_args.db_cntx, key, OBJ_HASH);

if (!it_res) {
if (it_res.status() == OpStatus::KEY_NOTFOUND)
Expand All @@ -503,7 +503,7 @@ OpResult<int> OpExist(const OpArgs& op_args, string_view key, string_view field)

OpResult<string> OpGet(const OpArgs& op_args, string_view key, string_view field) {
auto& db_slice = op_args.shard->db_slice();
auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_HASH);
auto it_res = db_slice.FindReadOnly(op_args.db_cntx, key, OBJ_HASH);
if (!it_res)
return it_res.status();

Expand Down Expand Up @@ -531,7 +531,7 @@ OpResult<string> OpGet(const OpArgs& op_args, string_view key, string_view field

OpResult<vector<string>> OpGetAll(const OpArgs& op_args, string_view key, uint8_t mask) {
auto& db_slice = op_args.shard->db_slice();
auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_HASH);
auto it_res = db_slice.FindReadOnly(op_args.db_cntx, key, OBJ_HASH);
if (!it_res) {
if (it_res.status() == OpStatus::KEY_NOTFOUND)
return vector<string>{};
Expand Down Expand Up @@ -582,7 +582,7 @@ OpResult<vector<string>> OpGetAll(const OpArgs& op_args, string_view key, uint8_

OpResult<size_t> OpStrLen(const OpArgs& op_args, string_view key, string_view field) {
auto& db_slice = op_args.shard->db_slice();
auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_HASH);
auto it_res = db_slice.FindReadOnly(op_args.db_cntx, key, OBJ_HASH);

if (!it_res) {
if (it_res.status() == OpStatus::KEY_NOTFOUND)
Expand Down Expand Up @@ -1062,7 +1062,7 @@ void HSetFamily::HRandField(CmdArgList args, ConnectionContext* cntx) {
auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<StringVec> {
auto& db_slice = shard->db_slice();
DbContext db_context = t->GetDbContext();
auto it_res = db_slice.Find(db_context, key, OBJ_HASH);
auto it_res = db_slice.FindReadOnly(db_context, key, OBJ_HASH);

if (!it_res)
return it_res.status();
Expand Down Expand Up @@ -1097,7 +1097,9 @@ void HSetFamily::HRandField(CmdArgList args, ConnectionContext* cntx) {
}

if (string_map->Empty()) {
db_slice.Del(db_context.db_index, *it_res);
auto it_mutable = db_slice.FindMutable(db_context, key, OBJ_HASH);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its a change in this PR bu I am confused here. This is readonly command, and it can remove key from db.. does this makes sense? we should replicate a del command here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the weirdness of our field expiration feature..
In Redis, an empty set is non-existing. Here, if we discover that all fields have expired, we remove the key. Yes, it could happen in a readonly command :(

We should do it in all commands I guess, or better yet, implement it as a similar mechanism to expiration. Otherwise, an attempt to, say, set a string to what was previously a hash key will fail, despite it being empty.

I'd say this is of low priority though (and also outside the scope here)

it_mutable->post_updater.Run();
db_slice.Del(db_context.db_index, it_mutable->it);
return facade::OpStatus::KEY_NOTFOUND;
}
} else if (pv.Encoding() == kEncodingListPack) {
Expand Down
Loading