Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
629 changes: 620 additions & 9 deletions integration/test_global_metrics.py

Large diffs are not rendered by default.

18 changes: 12 additions & 6 deletions integration/test_reclaimable_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def test_reclaimable_memory_multiple_indexes(self):
info_data = client.info("SEARCH")
initial_reclaimable = int(info_data["search_index_reclaimable_memory"])
assert initial_reclaimable == 0

# Add vectors to both indexes
for idx_name in indexes:
for i in range(5):
Expand All @@ -141,9 +141,15 @@ def test_reclaimable_memory_multiple_indexes(self):

# Check reclaimable memory (should be global across all indexes)
info_data = client.info("SEARCH")
final_reclaimable = int(info_data["search_index_reclaimable_memory"])

# Each vector is 2 float32 values = 2 * 4 = 8 bytes
# We deleted 3 vectors, so reclaimable memory should increase by 3 * 8 = 24 bytes
expected_reclaimable = after_insert_reclaimable + (3 * 2 * 4) # 3 vectors * 2 dimensions * 4 bytes per float32
assert final_reclaimable == expected_reclaimable
# The vectors are still in use
assert int(info_data["search_index_reclaimable_memory"]) == 0

# Delete vectors from second index
for i in range(3):
client.delete(f"multi_idx_2:{i}")

# Check reclaimable memory
info_data = client.info("SEARCH")
# The vectors are no longer in use
assert int(info_data["search_index_reclaimable_memory"]) == 24
4 changes: 2 additions & 2 deletions src/index_schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ void IndexSchema::ProcessKeyspaceNotification(ValkeyModuleCtx *ctx,
}
MutatedAttributes mutated_attributes;
bool added = false;
auto interned_key = StringInternStore::Intern(key_cstr, nullptr, indexes::MetricType::kKeysMemory);
auto interned_key = StringInternStore::Intern(key_cstr, StringType::KEY);
for (const auto &attribute_itr : attributes_) {
auto &attribute = attribute_itr.second;
if (!key_obj) {
Expand Down Expand Up @@ -956,7 +956,7 @@ void IndexSchema::OnLoadingEnded(ValkeyModuleCtx *ctx) {
<< " stale entries for {Index: " << name_ << "}";

for (auto &[key, attributes] : deletion_attributes) {
auto interned_key = std::make_shared<InternedString>(key);
auto interned_key = std::make_shared<InternedString>(key, StringType::KEY);
ProcessMutation(ctx, attributes, interned_key, true);
}
VMSDK_LOG(NOTICE, ctx) << "Scanned index schema " << name_
Expand Down
177 changes: 53 additions & 124 deletions src/indexes/global_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,43 +12,31 @@
#include <cassert>
#include <cstdint>

#include "absl/container/flat_hash_map.h"
#include "absl/strings/string_view.h"
#include "src/utils/string_interning.h"
#include "src/indexes/metric_types.h"
#include "vmsdk/src/info.h"

namespace valkey_search::indexes {

// Packed metadata structure for InternedString (32 bits total)
struct MetaData {
uint32_t metric_type : 7; // 7 bits for metric type
uint32_t use_count : 16; // 16 bits for use count
uint32_t ever_used : 1; // 1 bit to track if ever been used
uint32_t reserved : 8; // 8 bits reserved for future use
};

struct MetricData {
std::atomic<uint64_t> count{0};
};

const absl::flat_hash_map<MetricType, absl::string_view> kMetricTypeToStr = {
{MetricType::kVectorsMemory, "vectors_memory"},
{MetricType::kVectorsMemoryMarkedDeleted, "vectors_memory_marked_deleted"},
{MetricType::kHnswNodes, "hnsw_nodes"},
{MetricType::kHnswNodesMarkedDeleted, "hnsw_nodes_marked_deleted"},
{MetricType::kHnswEdges, "hnsw_edges"},
{MetricType::kHnswEdgesMarkedDeleted, "hnsw_edges_marked_deleted"},
{MetricType::kFlatNodes, "flat_nodes"},
{MetricType::kTags, "tags"},
{MetricType::kTagsMemory, "tags_memory"},
{MetricType::kNumericRecords, "numeric_records"},
{MetricType::kInternedStrings, "interned_strings"},
{MetricType::kInternedStringsMarkedDeleted, "interned_strings_marked_deleted"},
{MetricType::kInternedStringsMemory, "interned_strings_memory"},
{MetricType::kKeysMemory, "keys_memory"}
constexpr absl::string_view kMetricTypeStrings[] = {
#define METRIC_ENTRY(name, str) str,
METRIC_TYPES_TABLE
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This kind of macro with-in a macro is a very powerful thing. But I believe it would be clearer if you passed in the "METRIC_ENTRY" as a parameter to METRIC_TYPES_TABLE rather than having an externally known name that's not obvious from the usage. By passing in the "macro to invoke" you eliminate odd-ball global dependencies.

#undef METRIC_ENTRY
};

inline absl::string_view GetMetricTypeString(MetricType metric_type) {
const auto index = static_cast<size_t>(metric_type);
if (index >= static_cast<size_t>(MetricType::kMetricTypeCount)) {
return ""; // Invalid metric type
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feels like this should be an assert.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, feels like the body of this and the table should be in .cc file, isn't "kMetricStringTypes" really supposed to be hidden behind "GetMetricTypeString" ??

return kMetricTypeStrings[index];
}

class GlobalIndexStats {
public:
static GlobalIndexStats& Instance() {
Expand All @@ -57,125 +45,66 @@ class GlobalIndexStats {
}

void Incr(MetricType metric_type, uint64_t value = 1) {
GetOrCreateMetric(metric_type).count.fetch_add(value, std::memory_order_relaxed);
GetMetric(metric_type).count.fetch_add(value, std::memory_order_relaxed);
}

void Decr(MetricType metric_type, uint64_t value = 1) {
GetOrCreateMetric(metric_type).count.fetch_sub(value, std::memory_order_relaxed);
GetMetric(metric_type).count.fetch_sub(value, std::memory_order_relaxed);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be some insulation against going negative (large positive). This is a common failure in these kinds of tracking operations. When you detect a negative counter, you should clamp it to zero and put out a log message -- be sure to use the LOG_EVERY_N .... macro to rate-limit the log messages, because you could get a lot of them here. Optionally, we might want to have a configurable to make this be a hard crash in order to help debug any account errors.

}

uint64_t GetCount(MetricType metric_type) const {
auto it = metrics_.find(metric_type);
return it != metrics_.end() ? it->second->count.load(std::memory_order_relaxed) : 0;
}

absl::flat_hash_map<MetricType, uint64_t> GetAllMetrics() const {
absl::flat_hash_map<MetricType, uint64_t> result;
for (const auto& [type, metric] : metrics_) {
result[type] = metric->count.load(std::memory_order_relaxed);
auto& store = ::valkey_search::StringInternStore::Instance();
uint64_t total = 0;
if (metric_type == MetricType::kInternedStrings) {
for (size_t j = 0; j < static_cast<size_t>(::valkey_search::StringType::kStringTypeCount); ++j) {
Comment on lines +57 to +59
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switch statement please.

total += store.GetCounters(static_cast<::valkey_search::StringType>(j)).object_count;
}
return total;
} else if (metric_type == MetricType::kInternedStringsMemory) {
for (size_t j = 0; j < static_cast<size_t>(::valkey_search::StringType::kStringTypeCount); ++j) {
total += store.GetCounters(static_cast<::valkey_search::StringType>(j)).memory_bytes;
}
return total;
} else if (metric_type == MetricType::kVectorsMemory) {
return store.GetCounters(::valkey_search::StringType::VECTOR).memory_bytes;
} else if (metric_type == MetricType::kVectorsMemoryMarkedDeleted) {
return store.GetMarkedDeletedCounters().memory_bytes;
} else if (metric_type == MetricType::kVectorsMarkedDeleted) {
return store.GetMarkedDeletedCounters().object_count;
} else if (metric_type == MetricType::kTagsMemory) {
return store.GetCounters(::valkey_search::StringType::TAG).memory_bytes;
} else if (metric_type == MetricType::kKeysMemory) {
return store.GetCounters(::valkey_search::StringType::KEY).memory_bytes;
} else {
const size_t index = static_cast<size_t>(metric_type);
if (index >= static_cast<size_t>(MetricType::kMetricTypeCount)) {
return 0; // Invalid metric type
}
Comment on lines +80 to +82
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

feels like an assert would be more appropriate here on the "default" label on the switch.

return metrics_[index].count.load(std::memory_order_relaxed);
}
return result;
}

private:
GlobalIndexStats() = default;

MetricData& GetOrCreateMetric(MetricType metric_type) {
auto [it, inserted] = metrics_.try_emplace(metric_type, nullptr);
if (inserted) {
it->second = std::make_unique<MetricData>();
}
return *it->second;
MetricData& GetMetric(MetricType metric_type) {
const size_t index = static_cast<size_t>(metric_type);
assert(index < static_cast<size_t>(MetricType::kMetricTypeCount));
return metrics_[index];
}

absl::flat_hash_map<MetricType, std::unique_ptr<MetricData>> metrics_;
MetricData metrics_[static_cast<size_t>(MetricType::kMetricTypeCount)];
};

inline void OnInternedStringAlloc(::valkey_search::InternedString* interned_str, MetricType metric_type) {
if (!interned_str) return;
size_t bytes = interned_str->Str().length();
GlobalIndexStats::Instance().Incr(MetricType::kInternedStringsMemory, bytes);
GlobalIndexStats::Instance().Incr(MetricType::kInternedStrings, 1);

if (metric_type == MetricType::kNone) return;

MetaData metadata{};
metadata.metric_type = static_cast<uint8_t>(metric_type);
metadata.use_count = 0; // Start with 0
metadata.ever_used = 0; // Not used yet
metadata.reserved = 0;
interned_str->SetMetadataFlags(*reinterpret_cast<uint32_t*>(&metadata));

GlobalIndexStats::Instance().Incr(metric_type, bytes);
}

inline void OnInternedStringDealloc(::valkey_search::InternedString* interned_str) {
size_t bytes = interned_str->Str().length();
GlobalIndexStats::Instance().Decr(MetricType::kInternedStringsMemory, bytes);
GlobalIndexStats::Instance().Decr(MetricType::kInternedStrings, 1);
uint32_t metadata_flags = interned_str->GetMetadataFlags();
if (metadata_flags == 0) return;

MetaData* metadata = reinterpret_cast<MetaData*>(&metadata_flags);
assert(metadata->use_count == 0); // Assert use_count is 0 when deallocating
MetricType type = static_cast<MetricType>(metadata->metric_type);
if (type != MetricType::kNone) {
GlobalIndexStats::Instance().Decr(type, bytes);
}
}

inline bool OnInternedStringMarkUnused(const std::shared_ptr<::valkey_search::InternedString>& interned_str) {
if (!interned_str) return false;

uint32_t* flags_ptr = interned_str->GetMetadataFlagsPtr();
MetaData* metadata = reinterpret_cast<MetaData*>(flags_ptr);
assert(metadata->use_count > 0);
metadata->use_count--;

if (metadata->use_count == 0 && metadata->ever_used) {
// Only mark as deleted if it was actually used and now back to 0
MetricType type = static_cast<MetricType>(metadata->metric_type);
// Only intern strings that may be marked as deleted are kVectorsMemory
assert(type == MetricType::kVectorsMemory);
GlobalIndexStats::Instance().Incr(MetricType::kVectorsMemoryMarkedDeleted, interned_str->Str().length());
GlobalIndexStats::Instance().Incr(MetricType::kInternedStringsMarkedDeleted, 1);
}

return true;
}

inline bool OnInternedStringIncrUsed(const std::shared_ptr<::valkey_search::InternedString>& interned_str) {
if (!interned_str) return false;

uint32_t* flags_ptr = interned_str->GetMetadataFlagsPtr();
if (*flags_ptr == 0) return false;

MetaData* metadata = reinterpret_cast<MetaData*>(flags_ptr);

if (metadata->use_count == 0 && metadata->ever_used) {
// This was marked as deleted, now used again
MetricType type = static_cast<MetricType>(metadata->metric_type);
assert(type == MetricType::kVectorsMemory);
GlobalIndexStats::Instance().Decr(MetricType::kVectorsMemoryMarkedDeleted, interned_str->Str().length());
GlobalIndexStats::Instance().Decr(MetricType::kInternedStringsMarkedDeleted, 1);
}

// Mark as ever used on first increment
if (!metadata->ever_used) {
metadata->ever_used = 1;
}

metadata->use_count++;

return true;
}

template<typename InfoFieldInteger>
inline void CreateGlobalMetricsInfoFields() {
static auto global_metrics_fields = []() {
std::vector<std::unique_ptr<InfoFieldInteger>> fields;

for (const auto& [metric_type, metric_name] : kMetricTypeToStr) {

for (size_t i = 0; i < static_cast<size_t>(MetricType::kMetricTypeCount); ++i) {
const auto metric_type = static_cast<MetricType>(i);
const absl::string_view metric_name = GetMetricTypeString(metric_type);
if (metric_name.empty()) continue;
auto count_field = std::make_unique<InfoFieldInteger>(
"global_metrics", std::string(metric_name),
vmsdk::info_field::IntegerBuilder()
Expand Down
43 changes: 22 additions & 21 deletions src/indexes/metric_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,30 @@

namespace valkey_search::indexes {

#define METRIC_TYPES_TABLE \
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as describe above make METRIC_ENTRY be a parameter to this macro

METRIC_ENTRY(kNone, "") \
METRIC_ENTRY(kVectorsMemory, "vectors_memory") \
METRIC_ENTRY(kVectorsMemoryMarkedDeleted, "vectors_memory_marked_deleted") \
METRIC_ENTRY(kVectorsMarkedDeleted, "vectors_marked_deleted") \
METRIC_ENTRY(kHnswNodes, "hnsw_nodes") \
METRIC_ENTRY(kHnswNodesMarkedDeleted, "hnsw_nodes_marked_deleted") \
METRIC_ENTRY(kHnswEdges, "hnsw_edges") \
METRIC_ENTRY(kHnswEdgesMarkedDeleted, "hnsw_edges_marked_deleted") \
METRIC_ENTRY(kFlatNodes, "flat_nodes") \
METRIC_ENTRY(kTags, "tags") \
METRIC_ENTRY(kTagsMemory, "tags_memory") \
METRIC_ENTRY(kNumericRecords, "numeric_records") \
METRIC_ENTRY(kInternedStrings, "interned_strings") \
METRIC_ENTRY(kInternedStringsMemory, "interned_strings_memory") \
METRIC_ENTRY(kKeysMemory, "keys_memory")

// Generate the enum from the table above
enum class MetricType {
kNone, // Indicates no metric type set
kVectorsMemory,
kVectorsMemoryMarkedDeleted,

kHnswNodes,
kHnswNodesMarkedDeleted,
kHnswEdges,
kHnswEdgesMarkedDeleted,

kFlatNodes,

kTags,
kTagsMemory,

kNumericRecords,

kInternedStrings,
kInternedStringsMarkedDeleted,
kInternedStringsMemory,

kKeysMemory,
#define METRIC_ENTRY(name, str) name,
METRIC_TYPES_TABLE
#undef METRIC_ENTRY

// Sentinel value for array bounds checking
kMetricTypeCount
};

Expand Down
4 changes: 2 additions & 2 deletions src/indexes/tag.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ Tag::~Tag() {

absl::StatusOr<bool> Tag::AddRecord(const InternedStringPtr& key,
absl::string_view data) {
auto interned_data = StringInternStore::Intern(data, nullptr, MetricType::kTagsMemory);
auto interned_data = StringInternStore::Intern(data, StringType::TAG);
auto parsed_tags = ParseRecordTags(*interned_data, separator_);
absl::MutexLock lock(&index_mutex_);
if (parsed_tags.empty()) {
Expand Down Expand Up @@ -119,7 +119,7 @@ absl::flat_hash_set<absl::string_view> Tag::ParseRecordTags(
absl::StatusOr<bool> Tag::ModifyRecord(const InternedStringPtr& key,
absl::string_view data) {
// TODO: implement operator [] in patricia_tree.
auto interned_data = StringInternStore::Intern(data);
auto interned_data = StringInternStore::Intern(data, StringType::TAG);
auto new_parsed_tags = ParseRecordTags(*interned_data, separator_);
if (new_parsed_tags.empty()) {
[[maybe_unused]] auto res =
Expand Down
10 changes: 5 additions & 5 deletions src/indexes/vector_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,9 @@ std::shared_ptr<InternedString> VectorBase::InternVector(
NormalizeEmbedding(record, GetDataTypeSize(), &magnitude.value());
return StringInternStore::Intern(
absl::string_view((const char *)norm_record.data(), norm_record.size()),
vector_allocator_.get(), MetricType::kVectorsMemory);
StringType::VECTOR, vector_allocator_.get());
}
return StringInternStore::Intern(record, vector_allocator_.get(), MetricType::kVectorsMemory);
return StringInternStore::Intern(record, StringType::VECTOR, vector_allocator_.get());
}

absl::StatusOr<bool> VectorBase::AddRecord(const InternedStringPtr &key,
Expand Down Expand Up @@ -320,7 +320,7 @@ absl::StatusOr<std::optional<uint64_t>> VectorBase::UnTrackKey(

char *VectorBase::TrackVector(uint64_t internal_id, char *vector, size_t len) {
auto interned_vector = StringInternStore::Intern(
absl::string_view(vector, len), vector_allocator_.get());
absl::string_view(vector, len), StringType::VECTOR, vector_allocator_.get());
TrackVector(internal_id, interned_vector);
return (char *)interned_vector->Str().data();
}
Expand Down Expand Up @@ -434,7 +434,7 @@ void VectorBase::ExternalizeVector(ValkeyModuleCtx *ctx,
is_module_owned);
CHECK(!is_module_owned);
std::optional<float> magnitude;
auto interned_key = StringInternStore::Intern(key_cstr, nullptr, indexes::MetricType::kKeysMemory);
auto interned_key = StringInternStore::Intern(key_cstr, StringType::KEY);
auto interned_vector =
InternVector(vmsdk::ToStringView(record.get()), magnitude);
if (interned_vector) {
Expand All @@ -455,7 +455,7 @@ absl::Status VectorBase::LoadTrackedKeys(
if (!tracked_key_metadata.ParseFromString(metadata_str->binary_content())) {
return absl::InvalidArgumentError("Error parsing metadata from proto");
}
auto interned_key = StringInternStore::Intern(tracked_key_metadata.key(), nullptr, indexes::MetricType::kKeysMemory);
auto interned_key = StringInternStore::Intern(tracked_key_metadata.key(), StringType::KEY);
tracked_metadata_by_key_.insert(
{interned_key,
{.internal_id = tracked_key_metadata.internal_id(),
Expand Down
Loading