Skip to content

Commit 841336c

Browse files
committed
fix: fixes
Signed-off-by: Vladislav Oleshko <[email protected]>
1 parent d920af0 commit 841336c

File tree

5 files changed

+34
-36
lines changed

5 files changed

+34
-36
lines changed

src/server/rdb_load.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2426,9 +2426,9 @@ void RdbLoader::FlushShardAsync(ShardId sid) {
24262426

24272427
// Block, if tiered storage is active, but can't keep up
24282428
while (EngineShard::tlocal()->ShouldThrottleForTiering()) {
2429-
this->blocked_shards_.fetch_add(memory_order_relaxed); // stop adding items to shard queue
2429+
this->blocked_shards_.fetch_add(1, memory_order_relaxed); // stop adding items to shard queue
24302430
ThisFiber::SleepFor(100us);
2431-
this->blocked_shards_.fetch_sub(memory_order_relaxed);
2431+
this->blocked_shards_.fetch_sub(1, memory_order_relaxed);
24322432
}
24332433
};
24342434

@@ -2479,8 +2479,8 @@ void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) {
24792479
LOG(WARNING) << "RDB has duplicated key '" << item->key << "' in DB " << db_ind;
24802480
}
24812481

2482-
if (auto* ts = es->tiered_storage(); ts && ts->ShouldStash(res.it->second))
2483-
ts->Stash(db_cntx.db_index, item->key, &res.it->second);
2482+
if (auto* ts = es->tiered_storage(); ts)
2483+
ts->TryStash(db_cntx.db_index, item->key, &res.it->second);
24842484
}
24852485

24862486
for (auto* item : ib) {

src/server/string_family.cc

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -664,9 +664,8 @@ void SetCmd::PostEdit(const SetParams& params, std::string_view key, std::string
664664
EngineShard* shard = op_args_.shard;
665665

666666
// Currently we always try to offload, but Stash may ignore it, if disk I/O is overloaded.
667-
if (auto* ts = shard->tiered_storage(); ts && ts->ShouldStash(*pv)) {
668-
ts->Stash(op_args_.db_cntx.db_index, key, pv);
669-
}
667+
if (auto* ts = shard->tiered_storage(); ts)
668+
ts->TryStash(op_args_.db_cntx.db_index, key, pv);
670669

671670
if (manual_journal_ && op_args_.shard->journal()) {
672671
RecordJournal(params, key, value);

src/server/tiered_storage.cc

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -271,14 +271,15 @@ template util::fb2::Future<size_t> TieredStorage::Modify(DbIndex dbid, std::stri
271271
const PrimeValue& value,
272272
std::function<size_t(std::string*)> modf);
273273

274-
void TieredStorage::Stash(DbIndex dbid, string_view key, PrimeValue* value) {
275-
CHECK(!value->IsExternal() && !value->HasIoPending());
274+
bool TieredStorage::TryStash(DbIndex dbid, string_view key, PrimeValue* value) {
275+
if (!ShouldStash(*value))
276+
return false;
276277

277278
// TODO: When we are low on memory we should introduce a back-pressure, to avoid OOMs
278279
// with a lot of underutilized disk space.
279280
if (op_manager_->GetStats().pending_stash_cnt >= write_depth_limit_) {
280281
++stats_.stash_overflow_cnt;
281-
return;
282+
return false;
282283
}
283284

284285
string buf;
@@ -298,7 +299,10 @@ void TieredStorage::Stash(DbIndex dbid, string_view key, PrimeValue* value) {
298299
if (ec) {
299300
LOG(ERROR) << "Stash failed immediately" << ec.message();
300301
visit([this](auto id) { op_manager_->ClearIoPending(id); }, id);
302+
return false;
301303
}
304+
305+
return true;
302306
}
303307

304308
void TieredStorage::Delete(DbIndex dbid, PrimeValue* value) {
@@ -318,10 +322,6 @@ void TieredStorage::CancelStash(DbIndex dbid, std::string_view key, PrimeValue*
318322
value->SetIoPending(false);
319323
}
320324

321-
bool TieredStorage::ShouldStash(const PrimeValue& pv) const {
322-
return !pv.IsExternal() && pv.ObjType() == OBJ_STRING && pv.Size() >= kMinValueSize;
323-
}
324-
325325
float TieredStorage::WriteDepthUsage() const {
326326
return 1.0f * op_manager_->GetStats().pending_stash_cnt / write_depth_limit_;
327327
}
@@ -364,30 +364,26 @@ void TieredStorage::RunOffloading(DbIndex dbid) {
364364
if (SliceSnapshot::IsSnaphotInProgress())
365365
return;
366366

367-
PrimeTable& table = op_manager_->db_slice_->GetDBTable(dbid)->prime;
368-
int stash_limit = write_depth_limit_ - op_manager_->GetStats().pending_stash_cnt;
369-
if (stash_limit <= 0)
370-
return;
371-
372-
std::string tmp;
373-
auto cb = [this, dbid, &tmp, &stash_limit](PrimeIterator it) {
374-
if (it->second.HasIoPending() || it->second.IsExternal())
375-
return;
376-
377-
if (ShouldStash(it->second)) {
378-
Stash(dbid, it->first.GetSlice(&tmp), &it->second);
379-
stash_limit--;
380-
}
367+
auto cb = [this, dbid, tmp = std::string{}](PrimeIterator it) mutable {
368+
TryStash(dbid, it->first.GetSlice(&tmp), &it->second);
381369
};
382370

371+
PrimeTable& table = op_manager_->db_slice_->GetDBTable(dbid)->prime;
383372
PrimeTable::Cursor start_cursor{};
384373

385374
// Loop while we haven't traversed all entries or reached our stash io device limit.
386375
// Keep number of iterations below resonable limit to keep datastore always responsive
387376
size_t iterations = 0;
388377
do {
378+
if (op_manager_->GetStats().pending_stash_cnt >= write_depth_limit_)
379+
break;
389380
offloading_cursor_ = table.TraverseBySegmentOrder(offloading_cursor_, cb);
390-
} while (offloading_cursor_ != start_cursor && stash_limit > 0 && iterations++ < 500);
381+
} while (offloading_cursor_ != start_cursor && iterations++ < 500);
382+
}
383+
384+
bool TieredStorage::ShouldStash(const PrimeValue& pv) const {
385+
return !pv.IsExternal() && !pv.HasIoPending() && pv.ObjType() == OBJ_STRING &&
386+
pv.Size() >= kMinValueSize;
391387
}
392388

393389
} // namespace dfly

src/server/tiered_storage.h

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,17 +56,15 @@ class TieredStorage {
5656
std::function<T(std::string*)> modf);
5757

5858
// Stash value. Sets IO_PENDING flag and unsets it on error or when finished
59-
void Stash(DbIndex dbid, std::string_view key, PrimeValue* value);
59+
// Returns true if item was scheduled for stashing.
60+
bool TryStash(DbIndex dbid, std::string_view key, PrimeValue* value);
6061

6162
// Delete value, must be offloaded (external type)
6263
void Delete(DbIndex dbid, PrimeValue* value);
6364

6465
// Cancel pending stash for value, must have IO_PENDING flag set
6566
void CancelStash(DbIndex dbid, std::string_view key, PrimeValue* value);
6667

67-
// Returns if a value should be stashed
68-
bool ShouldStash(const PrimeValue& pv) const;
69-
7068
// Percentage (0-1) of currently used storage_write_depth for ongoing stashes
7169
float WriteDepthUsage() const;
7270

@@ -75,6 +73,10 @@ class TieredStorage {
7573
// Run offloading loop until i/o device is loaded or all entries were traversed
7674
void RunOffloading(DbIndex dbid);
7775

76+
private:
77+
// Returns if a value should be stashed
78+
bool ShouldStash(const PrimeValue& pv) const;
79+
7880
private:
7981
PrimeTable::Cursor offloading_cursor_{}; // where RunOffloading left off
8082

@@ -128,7 +130,7 @@ class TieredStorage {
128130
return {};
129131
}
130132

131-
void Stash(DbIndex dbid, std::string_view key, PrimeValue* value) {
133+
void TryStash(DbIndex dbid, std::string_view key, PrimeValue* value) {
132134
}
133135

134136
void Delete(PrimeValue* value) {
@@ -137,7 +139,7 @@ class TieredStorage {
137139
void CancelStash(DbIndex dbid, std::string_view key, PrimeValue* value) {
138140
}
139141

140-
bool ShouldStash(const PrimeValue& pv) {
142+
bool ShouldStash(const PrimeValue& pv) const {
141143
return false;
142144
}
143145

tests/dragonfly/snapshot_test.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -469,7 +469,7 @@ async def test_tiered_entries(async_client: aioredis.Redis):
469469
"dbfilename": "tiered-entries",
470470
"tiered_prefix": "tiering-test-backing",
471471
"tiered_offload_threshold": "0.5", # ask to keep below 0.5 * 1G
472-
"tiered_storage_write_depth": 5000,
472+
"tiered_storage_write_depth": 50,
473473
}
474474
)
475475
async def test_tiered_entries_throttle(async_client: aioredis.Redis):
@@ -495,6 +495,7 @@ async def test_tiered_entries_throttle(async_client: aioredis.Redis):
495495

496496
while not load_task.done():
497497
info = await async_client.info("ALL")
498+
# print(info["used_memory_human"], info["used_memory_rss_human"])
498499
assert info["used_memory"] < 600e6 # less than 600mb,
499500
await asyncio.sleep(0.05)
500501

0 commit comments

Comments
 (0)