Skip to content

Conversation

theyueli
Copy link
Contributor

@theyueli theyueli commented Dec 9, 2023

This is part two that implements the logic which notifies tracking clients by sending invalidation messages.

Once part 1 is merged, this PR will gets smaller (it includes part 1 currently)

Functional tests have also been included, which includes a fix for #2304

The client tracking state is set by CLIENT TRACKING subcommand as well
as upon client disconnection.
Track the keys of a readonly command by maintaining mapping that maps
keys to the sets of tracking clients.
Send invalidation messages to clients when their tracked keys are
updated.
@theyueli theyueli added the enhancement New feature or request label Dec 9, 2023
@theyueli theyueli self-assigned this Dec 9, 2023
@theyueli theyueli requested a review from adiholden December 9, 2023 00:34
auto& client_set = client_tracking_map_[key];
DVLOG(2) << "Garbage collect clients that are no longer tracking... ";
auto is_closed_or_not_tracking = [](const facade::Connection::WeakRef& p) {
return (p.IsExpired() || (!p.Get()->IsTrackingOn()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically, this Get here is unsafe. In practice the the other thread can never be so fast to delete the connection faster than reading a variable...

Better do it from cb, as you drop them below either way

DVLOG(2) << "Number of clients left: " << client_set.size();

if (!client_set.empty()) {
auto cb = [key, client_set](unsigned idx, util::ProactorBase*) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better use [client_set = std::move(client_set)]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok


void DbSlice::SendInvalidationTrackingMessage(const std::string_view& key) {
if (client_tracking_map_.find(key) != client_tracking_map_.end()) {
// notify all the clients.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some nits:
You can pass string_view by value
You can store the it = find(key) to avoid reading again with [key]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Comment on lines 1392 to 1393
absl::erase_if(client_set, is_closed_or_not_tracking);
DVLOG(2) << "Number of clients left: " << client_set.size();
Copy link
Contributor

@dranikpg dranikpg Dec 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought you do this to reduce the number of traversals (because each thread currently iterates over the whole set), but I assume this optimization is mostly not worth it

PerformDeletion(it, shard_owner(), db_ptr.get());
SendInvalidationTrackingMessage(key);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why dont you make PerformDeletion function of DbSlice? I dont like it that we need to remember to call SendInvalidationTrackingMessage whenever we call PerformDeletion

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, will do.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why dont you make PerformDeletion function of DbSlice? I dont like it that we need to remember to call SendInvalidationTrackingMessage whenever we call PerformDeletion

done, please take another look. I like this change, saved a lot of code duplication.

@@ -116,6 +116,11 @@ class Connection : public util::Connection {
util::fb2::BlockingCounter bc; // Decremented counter when processed
};

struct InvalidationMessage {
std::string_view key;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe you should use a string here and not string_view.
Who is the owner of this string and how do you know it is alive until you send the message?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed.

@@ -338,6 +336,12 @@ class DbSlice {
// Track keys for the client represented by the the weak reference to its connection.
void TrackKeys(const facade::Connection::WeakRef&, const ArgSlice&);

// Send invalidation message to the clients that are tracking the change to a key.
void SendInvalidationTrackingMessage(std::string_view key);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, fixed.

@@ -1233,6 +1172,8 @@ size_t DbSlice::EvictObjects(size_t memory_to_free, PrimeIterator it, DbTable* t
return current < used_memory_start ? used_memory_start - current : 0;
};

string_view key;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

redundant

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

RecordExpiry(cntx.db_index, tmp_key);
}

auto obj_type = it->second.ObjType();
if (doc_del_cb_ && (obj_type == OBJ_JSON || obj_type == OBJ_HASH)) {
if (tmp_key.empty())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe you can revert the changes in this function

- mock the function for sending invalidation message to avoid test
crash due to lack of real listener in the testing framework.
Copy link
Contributor

@dranikpg dranikpg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The testing approach is the same as I had in mind. Please note there are still some unresolved comments from Adi

Comment on lines 238 to 242
// case 7. test multi command
Run({"MGET", "X", "Y", "Z"});
pp_->at(1)->Await([&] { return Run({"MSET", "X", "1", "Y", "2"}); });
EXPECT_EQ(GetInvalidationMessage("IO0", 3).key, "X");
EXPECT_EQ(GetInvalidationMessage("IO0", 4).key, "Y");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd make the test a little more sophisticated. Let's try multiple keys on all shards, so that we don't even know the order

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

even with this test, the order of arrival is different now, btw, is there a convenient test function that checks the completeness disregarding the order?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added a few more keys, how does this look?

Comment on lines 303 to 304
std::vector<string_view> keys{msg.key};
rbuilder->SendStringArr(keys);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

string_view[] keys = {msg.key}
SendStringArr(keys)

does the same without allocations

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't compile

/home/yli/git/dragonfly-rebase/src/facade/dragonfly_connection.cc:303:21: error: structured binding declaration cannot have type ‘std::string_view’ {aka ‘std::basic_string_view<char>’}
  303 |     std::string_view[] keys = {msg.key};
      |                     ^~
/home/yli/git/dragonfly-rebase/src/facade/dragonfly_connection.cc:303:21: note: type must be cv-qualified ‘auto’ or reference to cv-qualified ‘auto’
/home/yli/git/dragonfly-rebase/src/facade/dragonfly_connection.cc:303:21: error: empty structured binding declaration
/home/yli/git/dragonfly-rebase/src/facade/dragonfly_connection.cc:303:24: error: expected initializer before ‘keys’
  303 |     std::string_view[] keys = {msg.key};
      |                        ^~~~
/home/yli/git/dragonfly-rebase/src/facade/dragonfly_connection.cc:304:29: error: ‘keys’ was not declared in this scope
  304 |     rbuilder->SendStringArr(keys);
      |                             ^~~~

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

try string_view keys[] = {msg.key};
It thinks you're using structured binding, using the brackets on the left side is C#/Java syntax

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this works well, thanks!

@theyueli
Copy link
Contributor Author

The testing approach is the same as I had in mind. Please note there are still some unresolved comments from Adi

yes, i plan to add more tests now after verifying the mock methods with you guys.

@@ -200,4 +200,52 @@ TEST_F(ServerFamilyTest, ClientPause) {
EXPECT_GT((absl::Now() - start), absl::Milliseconds(50));
}

TEST_F(ServerFamilyTest, ClientTracking) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would create different unit test for different test cases.
I think that it is great that we have this in unit tests with mocking but we must add pytest to make sure this command works well to check the functionality without the mocking

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's fine with me too. Currently i see different styles mixed in our code: sometimes all cases went into one function (e.g. ClientPause here, and many other redis commands' tests), sometimes separated smaller cases (e.g. SlowLog here)

@theyueli
Copy link
Contributor Author

all the comments have been addressed. Please take another look. @dranikpg @adiholden @chakaz

@@ -1049,18 +1051,15 @@ DbSlice::ItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterator it)
return {it, expire_it};

string tmp_key_buf;
string_view tmp_key;
string_view tmp_key = it->first.GetSlice(&tmp_key_buf);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please revert the changes here to be as before. you did not changed this function

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

listener->TraverseConnections(cb);
}
}

void ServerFamily::FlushDb(CmdArgList args, ConnectionContext* cntx) {
DCHECK(cntx->transaction);
Drakarys(cntx->transaction, cntx->transaction->GetDbIndex());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should clear client_tracking_map_ in flush

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not needed as Drakarys will finally end up calling PerformDeletion for each key, which will get the tracking table removed at the end.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FushDb calls PerformDeletion only for external keys (ssd).
If we would have called PerformDeletion for all keys in flushdb the client would get the invalidation message twice, one for all keys and one for each key in the database.
If we will have client tracking with tiering I believe the behaviour of flushall will be wrong. This is why I wrote above to get the invalidation message out of PerformDeletion that should just free the data and update table counters

Copy link
Contributor Author

@theyueli theyueli Dec 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i see what you meant, I just added another commit which clear client_tracking_map in FlushDB of DbSlice:

287c3f5

This shall fix both problems:

  1. FlushDb and FlushAll of ServerFamily will end up clearing the client tracking map
  2. For SSD tiering, since the map has been cleared before PerformDeletion gets called, notification will not be sent out twice.

SendInvalidationTrackingMessage(key);
}

void DbSlice::PerformDeletion(PrimeIterator del_it, EngineShard* shard, DbTable* table) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After reviewing this more times and reviewing other PR in this file I now understand that actually PerformDeletion should not be a member of DbSlice as I asked before. Rather we should have in DbSlice and function DeleteInternal which will write to journal, do the doc_del_cb_ call, SendInvalidationTrackingMessage
This function will be called from all the flows except the flush flows in which we dont need to do this changes and there we will call PerformDeletion which will be static function.

I believe this will fix several bugs we have that we forget to call the doc_del_cb_ and the journal change writes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This kind of refactoring is out of the scope for this PR. We will do it in another one once this gets merged.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes the refactoring can be done in a seprate PR. But as I wrote in another comment I believe that sending Invalidation message from PerformDeletion is incorrect for the flow of flush all.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, please see if my reply above will resolve this issue as well.

@adiholden
Copy link
Contributor

@theyueli you are going to commit the pytests to this PR correct?

@theyueli
Copy link
Contributor Author

@theyueli you are going to commit the pytests to this PR correct?

Probably in a separate PR.

@theyueli theyueli merged commit 6905389 into dragonflydb:main Dec 21, 2023
@theyueli theyueli deleted the ct-invalidate branch December 21, 2023 12:40
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants