Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,8 @@ size_t Connection::MessageHandle::UsedMemory() const {
}

bool Connection::MessageHandle::IsReplying() const {
return IsPipelineMsg() || IsPubMsg() || holds_alternative<MonitorMessage>(handle) ||
return IsPubMsg() || holds_alternative<MonitorMessage>(handle) ||
holds_alternative<PipelineMessagePtr>(handle) ||
(holds_alternative<MCPipelineMessagePtr>(handle) &&
!get<MCPipelineMessagePtr>(handle)->cmd.no_reply);
}
Expand Down Expand Up @@ -1606,7 +1607,8 @@ void Connection::SendAsync(MessageHandle msg) {
stats_->dispatch_queue_subscriber_bytes += used_mem;
}

if (msg.IsPipelineMsg()) {
// Squashing is only applied to redis commands
if (std::holds_alternative<PipelineMessagePtr>(msg.handle)) {
pending_pipeline_cmd_cnt_++;
}

Expand Down Expand Up @@ -1636,11 +1638,13 @@ void Connection::RecycleMessage(MessageHandle msg) {
stats_->dispatch_queue_subscriber_bytes -= used_mem;
}

// Retain pipeline message in pool.
if (auto* pipe = get_if<PipelineMessagePtr>(&msg.handle); pipe) {
if (msg.IsPipelineMsg()) {
++stats_->pipelined_cmd_cnt;
stats_->pipelined_cmd_latency += (ProactorBase::GetMonotonicTimeNs() - msg.dispatch_ts) / 1000;
}

// Retain pipeline message in pool.
if (auto* pipe = get_if<PipelineMessagePtr>(&msg.handle); pipe) {
pending_pipeline_cmd_cnt_--;
if (stats_->pipeline_cmd_cache_bytes < queue_backpressure_->pipeline_cache_limit) {
stats_->pipeline_cmd_cache_bytes += (*pipe)->StorageCapacity();
Expand Down
5 changes: 3 additions & 2 deletions src/facade/dragonfly_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ class Connection : public util::Connection {
}

bool IsPipelineMsg() const {
return std::holds_alternative<PipelineMessagePtr>(handle);
return std::holds_alternative<PipelineMessagePtr>(handle) ||
std::holds_alternative<MCPipelineMessagePtr>(handle);
}

bool IsPubMsg() const {
Expand Down Expand Up @@ -404,7 +405,7 @@ class Connection : public util::Connection {
util::fb2::CondVarAny cnd_; // dispatch queue waker
util::fb2::Fiber dispatch_fb_; // dispatch fiber (if started)

size_t pending_pipeline_cmd_cnt_ = 0; // how many queued async commands in dispatch_q
size_t pending_pipeline_cmd_cnt_ = 0; // how many queued Redis async commands in dispatch_q

io::IoBuf io_buf_; // used in io loop and parsers
std::unique_ptr<RedisParser> redis_parser_;
Expand Down
6 changes: 5 additions & 1 deletion tests/dragonfly/pymemcached_test.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from pymemcache.client.base import Client as MCClient
from . import dfly_args
from redis import Redis
from .instance import DflyInstance
import socket
import random
Expand Down Expand Up @@ -45,7 +46,7 @@ def test_basic(memcached_client: MCClient):


@dfly_args(DEFAULT_ARGS)
def test_noreply_pipeline(memcached_client: MCClient):
def test_noreply_pipeline(df_server: DflyInstance, memcached_client: MCClient):
"""
With the noreply option the python client doesn't wait for replies,
so all the commands are pipelined. Assert pipelines work correctly and the
Expand All @@ -62,6 +63,9 @@ def test_noreply_pipeline(memcached_client: MCClient):
# check all commands were executed
assert memcached_client.get_many(keys) == {k: v.encode() for k, v in zip(keys, values)}

info = Redis(port=df_server.port).info()
assert info["total_pipelined_commands"] > len(keys) - 5


@dfly_args(DEFAULT_ARGS)
def test_noreply_alternating(memcached_client: MCClient):
Expand Down
Loading