Skip to content

Commit c8a98fd

Browse files
authored
chore: small fixes around tiering (#3368)
There are no changes in functionality here.

Signed-off-by: Roman Gershman <[email protected]>
1 parent cd1f9d3 commit c8a98fd

File tree

11 files changed

+44 -28 lines changed

src/core/compact_object.cc

Lines changed: 11 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -950,21 +950,23 @@ void CompactObj::SetExternal(size_t offset, uint32_t sz) {
950950
u_.ext_ptr.offload.page_index = offset / 4096;
951951
}
952952

953-
void CompactObj::SetCold(size_t offset, uint32_t sz, detail::TieredColdRecord* record) {
954-
SetMeta(EXTERNAL_TAG, mask_);
953+
void CompactObj::SetCool(size_t offset, uint32_t sz, detail::TieredColdRecord* record) {
954+
// We copy the mask of the "cooled" referenced object because it contains the encoding info.
955+
SetMeta(EXTERNAL_TAG, record->value.mask_);
956+
955957
u_.ext_ptr.is_cool = 1;
956958
u_.ext_ptr.page_offset = offset % 4096;
957959
u_.ext_ptr.serialized_size = sz;
958-
u_.ext_ptr.cold_record = record;
960+
u_.ext_ptr.cool_record = record;
959961
}
960962

961-
auto CompactObj::GetCold() const -> ColdItem {
963+
auto CompactObj::GetCool() const -> CoolItem {
962964
DCHECK(IsExternal() && u_.ext_ptr.is_cool);
963965

964-
ColdItem res;
966+
CoolItem res;
965967
res.page_offset = u_.ext_ptr.page_offset;
966968
res.serialized_size = u_.ext_ptr.serialized_size;
967-
res.record = u_.ext_ptr.cold_record;
969+
res.record = u_.ext_ptr.cool_record;
968970
return res;
969971
}
970972

@@ -976,9 +978,9 @@ void CompactObj::ImportExternal(const CompactObj& src) {
976978

977979
std::pair<size_t, size_t> CompactObj::GetExternalSlice() const {
978980
DCHECK_EQ(EXTERNAL_TAG, taglen_);
979-
DCHECK_EQ(u_.ext_ptr.is_cool, 0);
980-
981-
size_t offset = size_t(u_.ext_ptr.offload.page_index) * 4096 + u_.ext_ptr.page_offset;
981+
auto& ext = u_.ext_ptr;
982+
size_t offset = ext.page_offset;
983+
offset += size_t(ext.is_cool ? ext.cool_record->page_index : ext.offload.page_index) * 4096;
982984
return pair<size_t, size_t>(offset, size_t(u_.ext_ptr.serialized_size));
983985
}
984986

src/core/compact_object.h

Lines changed: 6 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -331,14 +331,14 @@ class CompactObj {
331331
}
332332

333333
void SetExternal(size_t offset, uint32_t sz);
334-
void SetCold(size_t offset, uint32_t serialized_size, detail::TieredColdRecord* record);
334+
void SetCool(size_t offset, uint32_t serialized_size, detail::TieredColdRecord* record);
335335

336-
struct ColdItem {
336+
struct CoolItem {
337337
uint16_t page_offset;
338338
size_t serialized_size;
339339
detail::TieredColdRecord* record;
340340
};
341-
ColdItem GetCold() const;
341+
CoolItem GetCool() const;
342342

343343
void ImportExternal(const CompactObj& src);
344344

@@ -433,14 +433,16 @@ class CompactObj {
433433
uint16_t is_cool : 1;
434434
uint16_t is_reserved : 15;
435435

436+
// We do not have enough space in the common area to store page_index together with
437+
// cool_record pointer. Therefore, we moved this field into TieredColdRecord itself.
436438
struct Offload {
437439
uint32_t page_index;
438440
uint32_t reserved;
439441
};
440442

441443
union {
442444
Offload offload;
443-
detail::TieredColdRecord* cold_record;
445+
detail::TieredColdRecord* cool_record;
444446
};
445447
} __attribute__((packed));
446448

src/core/cool_queue.h

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -27,7 +27,9 @@ class CoolQueue {
2727

2828
CompactObj Erase(detail::TieredColdRecord* record);
2929

30-
size_t UsedMemory() const;
30+
size_t UsedMemory() const {
31+
return used_memory_;
32+
}
3133

3234
private:
3335
detail::TieredColdRecord* head_ = nullptr;

src/server/common.cc

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -257,7 +257,7 @@ bool ParseDouble(string_view src, double* value) {
257257
#define ADD(x) (x) += o.x
258258

259259
TieredStats& TieredStats::operator+=(const TieredStats& o) {
260-
static_assert(sizeof(TieredStats) == 120);
260+
static_assert(sizeof(TieredStats) == 128);
261261

262262
ADD(total_stashes);
263263
ADD(total_fetches);
@@ -278,6 +278,7 @@ TieredStats& TieredStats::operator+=(const TieredStats& o) {
278278
ADD(small_bins_entries_cnt);
279279
ADD(small_bins_filling_bytes);
280280
ADD(total_stash_overflows);
281+
ADD(cold_storage_bytes);
281282

282283
return *this;
283284
}

src/server/common.h

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -82,6 +82,7 @@ struct TieredStats {
8282
uint64_t small_bins_cnt = 0;
8383
uint64_t small_bins_entries_cnt = 0;
8484
size_t small_bins_filling_bytes = 0;
85+
size_t cold_storage_bytes = 0;
8586

8687
TieredStats& operator+=(const TieredStats&);
8788
};

src/server/engine_shard_set.cc

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -419,7 +419,7 @@ void EngineShard::StartPeriodicFiber(util::ProactorBase* pb) {
419419
});
420420
}
421421

422-
void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time, size_t max_file_size) {
422+
void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time) {
423423
CHECK(shard_ == nullptr) << pb->GetPoolIndex();
424424

425425
mi_heap_t* data_heap = ServerState::tlocal()->data_heap();
@@ -447,7 +447,7 @@ void EngineShard::InitTieredStorage(ProactorBase* pb, size_t max_file_size) {
447447
// TODO: enable tiered storage on non-default namespace
448448
DbSlice& db_slice = namespaces.GetDefaultNamespace().GetDbSlice(shard_id());
449449
auto* shard = EngineShard::tlocal();
450-
shard->tiered_storage_ = make_unique<TieredStorage>(&db_slice, max_file_size);
450+
shard->tiered_storage_ = make_unique<TieredStorage>(max_file_size, &db_slice);
451451
error_code ec = shard->tiered_storage_->Open(backing_prefix);
452452
CHECK(!ec) << ec.message();
453453
}
@@ -888,7 +888,7 @@ void EngineShardSet::Init(uint32_t sz, bool update_db_time) {
888888
size_t max_shard_file_size = GetTieredFileLimit(sz);
889889
pp_->AwaitFiberOnAll([&](uint32_t index, ProactorBase* pb) {
890890
if (index < shard_queue_.size()) {
891-
InitThreadLocal(pb, update_db_time, max_shard_file_size);
891+
InitThreadLocal(pb, update_db_time);
892892
}
893893
});
894894

@@ -909,8 +909,8 @@ void EngineShardSet::Shutdown() {
909909
RunBlockingInParallel([](EngineShard*) { EngineShard::DestroyThreadLocal(); });
910910
}
911911

912-
void EngineShardSet::InitThreadLocal(ProactorBase* pb, bool update_db_time, size_t max_file_size) {
913-
EngineShard::InitThreadLocal(pb, update_db_time, max_file_size);
912+
void EngineShardSet::InitThreadLocal(ProactorBase* pb, bool update_db_time) {
913+
EngineShard::InitThreadLocal(pb, update_db_time);
914914
EngineShard* es = EngineShard::tlocal();
915915
shard_queue_[es->shard_id()] = es->GetFiberQueue();
916916
}

src/server/engine_shard_set.h

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -48,7 +48,7 @@ class EngineShard {
4848

4949
// Sets up a new EngineShard in the thread.
5050
// If update_db_time is true, initializes periodic time update for its db_slice.
51-
static void InitThreadLocal(util::ProactorBase* pb, bool update_db_time, size_t max_file_size);
51+
static void InitThreadLocal(util::ProactorBase* pb, bool update_db_time);
5252

5353
// Must be called after all InitThreadLocal() have finished
5454
void InitTieredStorage(util::ProactorBase* pb, size_t max_file_size);
@@ -342,7 +342,7 @@ class EngineShardSet {
342342
void TEST_EnableCacheMode();
343343

344344
private:
345-
void InitThreadLocal(util::ProactorBase* pb, bool update_db_time, size_t max_file_size);
345+
void InitThreadLocal(util::ProactorBase* pb, bool update_db_time);
346346

347347
util::ProactorPool* pp_;
348348
std::vector<TaskQueue*> shard_queue_;

src/server/server_family.cc

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2190,6 +2190,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
21902190
append("tiered_small_bins_cnt", m.tiered_stats.small_bins_cnt);
21912191
append("tiered_small_bins_entries_cnt", m.tiered_stats.small_bins_entries_cnt);
21922192
append("tiered_small_bins_filling_bytes", m.tiered_stats.small_bins_filling_bytes);
2193+
append("tiered_cold_storage_bytes", m.tiered_stats.cold_storage_bytes);
21932194
}
21942195

21952196
if (should_enter("PERSISTENCE", true)) {

src/server/tiered_storage.cc

Lines changed: 7 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,7 @@
1717
#include "base/logging.h"
1818
#include "server/common.h"
1919
#include "server/db_slice.h"
20+
#include "server/engine_shard_set.h"
2021
#include "server/snapshot.h"
2122
#include "server/table.h"
2223
#include "server/tiering/common.h"
@@ -26,8 +27,8 @@
2627

2728
using namespace facade;
2829

29-
ABSL_FLAG(uint32_t, tiered_storage_memory_margin, 1_MB,
30-
"In bytes. If memory budget on a shard goes blow this limit, tiering stops "
30+
ABSL_FLAG(uint32_t, tiered_storage_memory_margin, 10_MB,
31+
"In bytes. If memory budget on a shard goes below this limit, tiering stops "
3132
"hot-loading values into ram.");
3233

3334
ABSL_FLAG(unsigned, tiered_storage_write_depth, 50,
@@ -250,7 +251,7 @@ bool TieredStorage::ShardOpManager::NotifyDelete(tiering::DiskSegment segment) {
250251
return false;
251252
}
252253

253-
TieredStorage::TieredStorage(DbSlice* db_slice, size_t max_size)
254+
TieredStorage::TieredStorage(size_t max_size, DbSlice* db_slice)
254255
: op_manager_{make_unique<ShardOpManager>(this, db_slice, max_size)},
255256
bins_{make_unique<tiering::SmallBins>()} {
256257
write_depth_limit_ = absl::GetFlag(FLAGS_tiered_storage_write_depth);
@@ -306,6 +307,8 @@ util::fb2::Future<T> TieredStorage::Modify(DbIndex dbid, std::string_view key,
306307
const PrimeValue& value,
307308
std::function<T(std::string*)> modf) {
308309
DCHECK(value.IsExternal());
310+
DCHECK(!value.IsCool()); // TBD
311+
309312
util::fb2::Future<T> future;
310313
PrimeValue decoder;
311314
decoder.ImportExternal(value);
@@ -419,6 +422,7 @@ TieredStats TieredStorage::GetStats() const {
419422

420423
{ // Own stats
421424
stats.total_stash_overflows = stats_.stash_overflow_cnt;
425+
stats.cold_storage_bytes = cool_queue_.UsedMemory();
422426
}
423427
return stats;
424428
}

src/server/tiered_storage.h

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -13,6 +13,7 @@
1313

1414
#include <absl/container/flat_hash_map.h>
1515

16+
#include "core/cool_queue.h"
1617
#include "server/common.h"
1718
#include "server/table.h"
1819

@@ -34,7 +35,7 @@ class TieredStorage {
3435
// Min sizes of values taking up full page on their own
3536
const static size_t kMinOccupancySize = tiering::kPageSize / 2;
3637

37-
explicit TieredStorage(DbSlice* db_slice, size_t max_size);
38+
explicit TieredStorage(size_t max_file_size, DbSlice* db_slice);
3839
~TieredStorage(); // drop forward declared unique_ptrs
3940

4041
TieredStorage(TieredStorage&& other) = delete;
@@ -79,11 +80,12 @@ class TieredStorage {
7980
// Returns if a value should be stashed
8081
bool ShouldStash(const PrimeValue& pv) const;
8182

82-
private:
8383
PrimeTable::Cursor offloading_cursor_{}; // where RunOffloading left off
8484

8585
std::unique_ptr<ShardOpManager> op_manager_;
8686
std::unique_ptr<tiering::SmallBins> bins_;
87+
CoolQueue cool_queue_;
88+
8789
unsigned write_depth_limit_ = 10;
8890
struct {
8991
uint64_t stash_overflow_cnt = 0;

0 commit comments

Comments
 (0)