
Commit e462fc0

fix(server): use compression for non big values (#4331)
* fix server: use compression for non big values

Signed-off-by: adi_holden <[email protected]>
1 parent 904d21d commit e462fc0

13 files changed (+91, -108 lines)


src/core/interpreter.cc

Lines changed: 15 additions & 2 deletions

@@ -386,13 +386,26 @@ int DragonflyRandstrCommand(lua_State* state) {
   lua_remove(state, 1);
 
   std::string buf(dsize, ' ');
+
   auto push_str = [dsize, state, &buf]() {
     static const char alphanum[] =
         "0123456789"
        "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
        "abcdefghijklmnopqrstuvwxyz";
-    for (int i = 0; i < dsize; ++i)
-      buf[i] = alphanum[rand() % (sizeof(alphanum) - 1)];
+
+    static const char pattern[] = "DRAGONFLY";
+    constexpr int pattern_len = sizeof(pattern) - 1;
+    constexpr int pattern_interval = 53;
+    for (int i = 0; i < dsize; ++i) {
+      if (i % pattern_interval == 0 && i + pattern_len <= dsize) {
+        // Insert the repeating pattern for better compression of random string.
+        buf.replace(i, pattern_len, pattern, pattern_len);
+        i += pattern_len - 1;  // Adjust index to skip the pattern
+      } else {
+        // Fill the rest with semi-random characters for variation
+        buf[i] = alphanum[rand() % (sizeof(alphanum) - 1)];
+      }
+    }
     lua_pushlstring(state, buf.c_str(), buf.length());
   };
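
The change above makes the strings produced by DragonflyRandstrCommand only partially random: a fixed "DRAGONFLY" marker is woven in every 53 bytes so that generic compressors can find repeated sequences. A minimal standalone sketch of that technique follows; the marker, interval, and alphabet come from the diff, while the function name and the main() driver are illustrative assumptions only.

#include <cstdlib>
#include <iostream>
#include <string>

// Sketch: build a mostly-random string with a fixed marker repeated at a
// constant interval, so ZSTD/LZ4-style compressors have matches to exploit.
std::string SemiRandomString(size_t dsize) {
  static const char alphanum[] =
      "0123456789"
      "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
      "abcdefghijklmnopqrstuvwxyz";
  static const char pattern[] = "DRAGONFLY";
  constexpr size_t kPatternLen = sizeof(pattern) - 1;
  constexpr size_t kPatternInterval = 53;  // same interval as in the diff above

  std::string buf(dsize, ' ');
  for (size_t i = 0; i < dsize; ++i) {
    if (i % kPatternInterval == 0 && i + kPatternLen <= dsize) {
      buf.replace(i, kPatternLen, pattern, kPatternLen);
      i += kPatternLen - 1;  // skip past the inserted marker
    } else {
      buf[i] = alphanum[rand() % (sizeof(alphanum) - 1)];
    }
  }
  return buf;
}

int main() {
  // Prints something like "DRAGONFLYx7Qm...DRAGONFLY...": random filler with
  // the marker recurring at a fixed stride.
  std::cout << SemiRandomString(128) << "\n";
  return 0;
}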

src/server/rdb_save.cc

Lines changed: 5 additions & 9 deletions

@@ -162,15 +162,7 @@ std::string AbslUnparseFlag(dfly::CompressionMode flag) {
 }
 
 dfly::CompressionMode GetDefaultCompressionMode() {
-  const auto flag = absl::GetFlag(FLAGS_compression_mode);
-  if (ServerState::tlocal()->serialization_max_chunk_size == 0) {
-    return flag;
-  }
-
-  static bool once = flag != dfly::CompressionMode::NONE;
-  LOG_IF(WARNING, once) << "Setting CompressionMode to NONE because big value serialization is on";
-  once = false;
-  return dfly::CompressionMode::NONE;
+  return absl::GetFlag(FLAGS_compression_mode);
 }
 
 uint8_t RdbObjectType(const PrimeValue& pv) {

@@ -944,6 +936,7 @@ io::Bytes SerializerBase::PrepareFlush(SerializerBase::FlushState flush_state) {
     return mem_buf_.InputBuffer();
 
   bool is_last_chunk = flush_state == FlushState::kFlushEndEntry;
+  VLOG(2) << "PrepareFlush:" << is_last_chunk << " " << number_of_chunks_;
   if (is_last_chunk && number_of_chunks_ == 0) {
     if (compression_mode_ == CompressionMode::MULTI_ENTRY_ZSTD ||
         compression_mode_ == CompressionMode::MULTI_ENTRY_LZ4) {

@@ -1603,6 +1596,7 @@ void SerializerBase::CompressBlob() {
     compression_stats_.emplace(CompressionStats{});
   }
   Bytes blob_to_compress = mem_buf_.InputBuffer();
+  VLOG(2) << "CompressBlob size " << blob_to_compress.size();
   size_t blob_size = blob_to_compress.size();
   if (blob_size < kMinStrSizeToCompress) {
     ++compression_stats_->small_str_count;

@@ -1644,6 +1638,8 @@ void SerializerBase::CompressBlob() {
   memcpy(dest.data(), compressed_blob.data(), compressed_blob.length());
   mem_buf_.CommitWrite(compressed_blob.length());
   ++compression_stats_->compressed_blobs;
+  auto& stats = ServerState::tlocal()->stats;
+  ++stats.compressed_blobs;
 }
 
 size_t RdbSerializer::GetTempBufferSize() const {
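
With GetDefaultCompressionMode() no longer forcing CompressionMode::NONE when big value serialization is enabled, the configured multi-entry compression now also applies to regular (non big) values, and the new compressed_blobs counter makes that visible in metrics. As a rough standalone illustration of why compressing such semi-random payloads still pays off, here is a small check against the zstd C API; this is not Dragonfly code, and the 16 KiB patterned payload is only an assumption meant to resemble the strings generated above.

// Build (assumption): g++ -O2 zstd_check.cc -lzstd
#include <zstd.h>

#include <cstdlib>
#include <iostream>
#include <string>
#include <vector>

int main() {
  // Hypothetical payload: repeated "DRAGONFLY" marker roughly every 53 bytes,
  // with random lowercase filler in between.
  std::string payload;
  while (payload.size() < 16 * 1024) {
    payload += "DRAGONFLY";
    for (int i = 0; i < 44; ++i)
      payload += static_cast<char>('a' + rand() % 26);
  }

  std::vector<char> dst(ZSTD_compressBound(payload.size()));
  size_t csize = ZSTD_compress(dst.data(), dst.size(), payload.data(), payload.size(),
                               /*compressionLevel=*/1);
  if (ZSTD_isError(csize)) {
    std::cerr << "zstd error: " << ZSTD_getErrorName(csize) << "\n";
    return 1;
  }
  std::cout << "original=" << payload.size() << " compressed=" << csize << " bytes\n";
  return 0;
}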

src/server/rdb_test.cc

Lines changed: 4 additions & 0 deletions

@@ -172,6 +172,10 @@ TEST_F(RdbTest, ComressionModeSaveDragonflyAndReload) {
   RespExpr resp = Run({"save", "df"});
   ASSERT_EQ(resp, "OK");
 
+  if (mode == CompressionMode::MULTI_ENTRY_ZSTD || mode == CompressionMode::MULTI_ENTRY_LZ4) {
+    EXPECT_GE(GetMetrics().coordinator_stats.compressed_blobs, 1);
+  }
+
   auto save_info = service_->server_family().GetLastSaveInfo();
   resp = Run({"dfly", "load", save_info.file_name});
   ASSERT_EQ(resp, "OK");

src/server/server_family.cc

Lines changed: 1 addition & 0 deletions

@@ -2374,6 +2374,7 @@ void ServerFamily::Info(CmdArgList args, const CommandContext& cmd_cntx) {
   append("rdb_save_usec", m.coordinator_stats.rdb_save_usec);
   append("rdb_save_count", m.coordinator_stats.rdb_save_count);
   append("big_value_preemptions", m.coordinator_stats.big_value_preemptions);
+  append("compressed_blobs", m.coordinator_stats.compressed_blobs);
   append("instantaneous_input_kbps", -1);
   append("instantaneous_output_kbps", -1);
   append("rejected_connections", -1);

src/server/server_state.cc

Lines changed: 2 additions & 1 deletion

@@ -27,7 +27,7 @@ ServerState::Stats::Stats(unsigned num_shards) : tx_width_freq_arr(num_shards) {
 }
 
 ServerState::Stats& ServerState::Stats::Add(const ServerState::Stats& other) {
-  static_assert(sizeof(Stats) == 18 * 8, "Stats size mismatch");
+  static_assert(sizeof(Stats) == 19 * 8, "Stats size mismatch");
 
 #define ADD(x) this->x += (other.x)
 

@@ -51,6 +51,7 @@ ServerState::Stats& ServerState::Stats::Add(const ServerState::Stats& other) {
   ADD(rdb_save_count);
 
   ADD(big_value_preemptions);
+  ADD(compressed_blobs);
 
   ADD(oom_error_cmd_cnt);

src/server/server_state.h

Lines changed: 1 addition & 0 deletions

@@ -123,6 +123,7 @@ class ServerState { // public struct - to allow initialization.
   uint64_t rdb_save_count = 0;
 
   uint64_t big_value_preemptions = 0;
+  uint64_t compressed_blobs = 0;
 
   // Number of times we rejected command dispatch due to OOM condition.
   uint64_t oom_error_cmd_cnt = 0;

src/server/snapshot.cc

Lines changed: 2 additions & 2 deletions

@@ -364,7 +364,7 @@ bool SliceSnapshot::PushSerialized(bool force) {
     return false;
 
   // Flush any of the leftovers to avoid interleavings
-  size_t serialized = FlushSerialized(FlushState::kFlushMidEntry);
+  size_t serialized = FlushSerialized(FlushState::kFlushEndEntry);
 
   if (!delayed_entries_.empty()) {
     // Async bucket serialization might have accumulated some delayed values.

@@ -377,7 +377,7 @@ bool SliceSnapshot::PushSerialized(bool force) {
     } while (!delayed_entries_.empty());
 
     // blocking point.
-    serialized += FlushSerialized(FlushState::kFlushMidEntry);
+    serialized += FlushSerialized(FlushState::kFlushEndEntry);
   }
   return serialized > 0;
 }
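
Switching these flushes from FlushState::kFlushMidEntry to FlushState::kFlushEndEntry matters because of the gate visible in the PrepareFlush() hunk above: only a flush marked as the last chunk of an entry, with no prior chunks, is considered for MULTI_ENTRY_ZSTD or MULTI_ENTRY_LZ4 compression. Below is a self-contained simplified sketch of that gate; the enum values and helper function are local stand-ins reconstructed from the conditions shown in the hunks, not the real Dragonfly types, and the assumption that the guarded branch ends up compressing the buffer is an inference, not something the diff states explicitly.

#include <cstdint>

// Local stand-ins mirroring names that appear in the diff above.
enum class FlushState { kFlushMidEntry, kFlushEndEntry };
enum class CompressionMode { NONE, MULTI_ENTRY_ZSTD, MULTI_ENTRY_LZ4 };

// Returns true when a flush is eligible for multi-entry compression,
// following the conditions visible in PrepareFlush().
bool ShouldCompressOnFlush(FlushState flush_state, uint32_t number_of_chunks,
                           CompressionMode compression_mode) {
  bool is_last_chunk = flush_state == FlushState::kFlushEndEntry;
  if (is_last_chunk && number_of_chunks == 0) {
    return compression_mode == CompressionMode::MULTI_ENTRY_ZSTD ||
           compression_mode == CompressionMode::MULTI_ENTRY_LZ4;
  }
  return false;
}

int main() {
  // Before the change, PushSerialized() flushed with kFlushMidEntry, so this
  // check failed and the blob was written uncompressed; with kFlushEndEntry
  // the same flush becomes eligible for compression.
  return ShouldCompressOnFlush(FlushState::kFlushEndEntry, 0,
                               CompressionMode::MULTI_ENTRY_ZSTD) ? 0 : 1;
}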

tests/dragonfly/instance.py

Lines changed: 2 additions & 2 deletions

@@ -416,7 +416,7 @@ def create(self, existing_port=None, path=None, version=100, **kwargs) -> DflyIn
         args.setdefault("noversion_check", None)
         # MacOs does not set it automatically, so we need to set it manually
         args.setdefault("maxmemory", "8G")
-        vmod = "dragonfly_connection=1,accept_server=1,listener_interface=1,main_service=1,rdb_save=1,replica=1,cluster_family=1,proactor_pool=1,dflycmd=1"
+        vmod = "dragonfly_connection=1,accept_server=1,listener_interface=1,main_service=1,rdb_save=1,replica=1,cluster_family=1,proactor_pool=1,dflycmd=1,snapshot=1"
         args.setdefault("vmodule", vmod)
         args.setdefault("jsonpathv2")
 

@@ -426,7 +426,7 @@ def create(self, existing_port=None, path=None, version=100, **kwargs) -> DflyIn
         args.setdefault("log_dir", self.params.log_dir)
 
         if version >= 1.21 and "serialization_max_chunk_size" not in args:
-            args.setdefault("serialization_max_chunk_size", 16384)
+            args.setdefault("serialization_max_chunk_size", 300000)
 
         for k, v in args.items():
            args[k] = v.format(**self.params.env) if isinstance(v, str) else v

tests/dragonfly/replication_test.py

Lines changed: 16 additions & 13 deletions

@@ -49,14 +49,16 @@ async def wait_for_replicas_state(*clients, state="online", node_role="slave", t
         # Quick general test that replication is working
         (1, 3 * [1], dict(key_target=1_000), 500),
         # A lot of huge values
-        (2, 2 * [1], dict(key_target=1_000, huge_value_percentage=2), 500),
+        (2, 2 * [1], dict(key_target=1_000, huge_value_target=30), 500),
         (4, [4, 4], dict(key_target=10_000), 1_000),
         pytest.param(6, [6, 6, 6], dict(key_target=100_000), 20_000, marks=M_OPT),
         # Skewed tests with different thread ratio
         pytest.param(8, 6 * [1], dict(key_target=5_000), 2_000, marks=M_SLOW),
         pytest.param(2, [8, 8], dict(key_target=10_000), 2_000, marks=M_SLOW),
         # Everything is big because data size is 10k
-        pytest.param(2, [2], dict(key_target=1_000, data_size=10_000), 100, marks=M_SLOW),
+        pytest.param(
+            2, [2], dict(key_target=1_000, data_size=10_000, huge_value_target=0), 100, marks=M_SLOW
+        ),
         # Stress test
         pytest.param(8, [8, 8], dict(key_target=1_000_000, units=16), 50_000, marks=M_STRESS),
     ],

@@ -128,22 +130,22 @@ async def check():
 
     info = await c_master.info()
     preemptions = info["big_value_preemptions"]
-    key_target = seeder_config["key_target"]
-    # Rough estimate
-    estimated_preemptions = key_target * (0.01)
-    assert preemptions > estimated_preemptions
+    total_buckets = info["num_buckets"]
+    compressed_blobs = info["compressed_blobs"]
+    logging.debug(
+        f"Compressed blobs {compressed_blobs} .Buckets {total_buckets}. Preemptions {preemptions}"
+    )
 
+    assert preemptions >= seeder.huge_value_target * 0.5
+    assert compressed_blobs > 0
     # Because data size could be 10k and for that case there will be almost a preemption
     # per bucket.
-    if "data_size" not in seeder_config.keys():
-        total_buckets = info["num_buckets"]
+    if seeder.data_size < 1000:
         # We care that we preempt less times than the total buckets such that we can be
         # sure that we test both flows (with and without preemptions). Preemptions on 30%
         # of buckets seems like a big number but that depends on a few parameters like
         # the size of the hug value and the serialization max chunk size. For the test cases here,
         # it's usually close to 10% but there are some that are close to 30.
-        total_buckets = info["num_buckets"]
-        logging.debug(f"Buckets {total_buckets}. Preemptions {preemptions}")
         assert preemptions <= (total_buckets * 0.3)
 
 

@@ -2282,7 +2284,7 @@ async def test_replication_timeout_on_full_sync(df_factory: DflyInstanceFactory,
     c_master = master.client()
     c_replica = replica.client()
 
-    await c_master.execute_command("debug", "populate", "200000", "foo", "5000")
+    await c_master.execute_command("debug", "populate", "200000", "foo", "5000", "RAND")
     seeder = df_seeder_factory.create(port=master.port)
     seeder_task = asyncio.create_task(seeder.run())
 

@@ -2310,6 +2312,7 @@ async def test_replication_timeout_on_full_sync(df_factory: DflyInstanceFactory,
     await assert_replica_reconnections(replica, 0)
 
 
+@dfly_args({"proactor_threads": 1})
 async def test_master_stalled_disconnect(df_factory: DflyInstanceFactory):
     # disconnect after 1 second of being blocked
     master = df_factory.create(replication_timeout=1000)

@@ -2320,7 +2323,7 @@ async def test_master_stalled_disconnect(df_factory: DflyInstanceFactory):
     c_master = master.client()
     c_replica = replica.client()
 
-    await c_master.execute_command("debug", "populate", "200000", "foo", "500")
+    await c_master.execute_command("debug", "populate", "200000", "foo", "500", "RAND")
     await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
 
     @assert_eventually

@@ -2624,7 +2627,7 @@ async def test_replication_timeout_on_full_sync_heartbeat_expiry(
     c_master = master.client()
     c_replica = replica.client()
 
-    await c_master.execute_command("debug", "populate", "100000", "foo", "5000")
+    await c_master.execute_command("debug", "populate", "100000", "foo", "5000", "RAND")
 
     c_master = master.client()
     c_replica = replica.client()

tests/dragonfly/seeder/__init__.py

Lines changed: 7 additions & 12 deletions

@@ -138,10 +138,8 @@ def __init__(
         data_size=100,
         collection_size=None,
         types: typing.Optional[typing.List[str]] = None,
-        huge_value_percentage=0,
-        huge_value_size=10000,
-        # 2 huge entries per container/key as default
-        huge_value_csize=2,
+        huge_value_target=5,
+        huge_value_size=100000,
     ):
         SeederBase.__init__(self, types)
         self.key_target = key_target

@@ -151,9 +149,8 @@ def __init__(
         else:
             self.collection_size = collection_size
 
-        self.huge_value_percentage = huge_value_percentage
+        self.huge_value_target = huge_value_target
         self.huge_value_size = huge_value_size
-        self.huge_value_csize = huge_value_csize
 
         self.units = [
             Seeder.Unit(

@@ -175,9 +172,8 @@ async def run(self, client: aioredis.Redis, target_ops=None, target_deviation=No
             target_deviation if target_deviation is not None else -1,
             self.data_size,
             self.collection_size,
-            self.huge_value_percentage,
+            self.huge_value_target / len(self.units),
             self.huge_value_size,
-            self.huge_value_csize,
         ]
 
         sha = await client.script_load(Seeder._load_script("generate"))

@@ -211,11 +207,10 @@ async def _run_unit(client: aioredis.Redis, sha: str, unit: Unit, using_stopkey,
         result = await client.evalsha(sha, 0, *args)
         result = result.split()
         unit.counter = int(result[0])
-        huge_keys = int(result[1])
-        huge_entries = int(result[2])
+        huge_entries = int(result[1])
 
         msg = f"running unit {unit.prefix}/{unit.type} took {time.time() - s}, target {args[4+0]}"
-        if huge_keys > 0:
-            msg = f"{msg}. Total huge keys added {huge_keys} with {args[11]} elements each. Total huge entries {huge_entries}."
+        if huge_entries > 0:
+            msg = f"{msg}. Total huge entries {huge_entries} added."
 
         logging.debug(msg)
