3 changes: 3 additions & 0 deletions cpp/include/cuvs/neighbors/cagra.hpp
@@ -1975,6 +1975,9 @@ void serialize_to_hnswlib(
* @note: When device memory is sufficient, the dataset attached to the returned index is allocated
* in device memory by default; otherwise, host memory is used automatically.
*
* @note: This API only supports physical merge (`merge_strategy = MERGE_STRATEGY_PHYSICAL`), and
[Review comment (Member)]: Based on our discussion yesterday, we should just remove the "merge strategy" concept altogether and support only physical merges in the merge() APIs. We can still provide a composite index that allows multiple indexes to be searched concurrently, but it doesn't need to be part of the merge() APIs.

* attempting a logical merge here will throw an error.
*
* Usage example:
* @code{.cpp}
* using namespace raft::neighbors;
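A minimal sketch of the physical merge call described in the note above, assuming `idx0` and `idx1` are previously built CAGRA indices over float32 data:

```cpp
#include <cuvs/neighbors/cagra.hpp>

#include <vector>

raft::device_resources res;

// Only the physical strategy is accepted here; a logical merge throws.
cuvs::neighbors::cagra::merge_params params;
params.merge_strategy = cuvs::neighbors::MergeStrategy::MERGE_STRATEGY_PHYSICAL;

std::vector<cuvs::neighbors::cagra::index<float, uint32_t>*> indices{&idx0, &idx1};
auto merged = cuvs::neighbors::cagra::merge(res, params, indices);
```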
24 changes: 24 additions & 0 deletions cpp/include/cuvs/neighbors/tiered_index.h
@@ -246,6 +246,30 @@ cuvsError_t cuvsTieredIndexSearch(cuvsResources_t res,
cuvsError_t cuvsTieredIndexExtend(cuvsResources_t res,
DLManagedTensor* new_vectors,
cuvsTieredIndex_t index);
/**
* @}
*/

/**
* @defgroup tiered_c_index_merge Tiered index merge
* @{
*/
/**
* @brief Merge multiple indices together into a single index
[Review comment (Member)]: Can we flesh out this description a little more to provide more detail for the user on exactly how this performs the merge? We should do the same in the C++ description as well. Maybe add details about what happens to the vectors in tier 0, what happens to the indexes in tier 1, and what some of the memory considerations are (I know there are copies; what are some of the assumptions a user will need to make?).

*
* @param[in] res cuvsResources_t opaque C handle
* @param[in] index_params Index parameters to use when merging
* @param[in] indices pointers to indices to merge together
* @param[in] num_indices the number of indices to merge
* @param[out] output_index the merged index
* @return cuvsError_t
*/
cuvsError_t cuvsTieredIndexMerge(cuvsResources_t res,
cuvsTieredIndexParams_t index_params,
cuvsTieredIndex_t* indices,
size_t num_indices,
cuvsTieredIndex_t output_index);

/**
* @}
*/
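A hedged sketch of the C call sequence, assuming `index_a` and `index_b` are previously built tiered indices sharing the same dtype and algorithm. `cuvsTieredIndexParamsCreate` and `cuvsTieredIndexCreate` are assumed to follow this header's usual create/destroy pattern:

```cpp
cuvsResources_t res;
cuvsResourcesCreate(&res);

cuvsTieredIndexParams_t params;
cuvsTieredIndexParamsCreate(&params);              // assumed params constructor
params->algo = CUVS_TIERED_INDEX_ALGO_CAGRA;       // must match the inputs' algorithm

cuvsTieredIndex_t inputs[2] = {index_a, index_b};  // handles to the indices being merged
cuvsTieredIndex_t merged;
cuvsTieredIndexCreate(&merged);                    // assumed index constructor

cuvsError_t status = cuvsTieredIndexMerge(res, params, inputs, 2, merged);
if (status != CUVS_SUCCESS) { /* handle the error */ }
```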
30 changes: 30 additions & 0 deletions cpp/include/cuvs/neighbors/tiered_index.hpp
@@ -197,4 +197,34 @@ void search(raft::resources const& res,
raft::device_matrix_view<float, int64_t, raft::row_major> distances,
const cuvs::neighbors::filtering::base_filter& sample_filter =
cuvs::neighbors::filtering::none_sample_filter{});

/** @brief Merge multiple tiered indices into a single index.
*
* This function merges multiple tiered indices into one: the datasets of all inputs are
* copied into a single new storage buffer, and the combined brute-force (tier-0) rows are
* compacted into a rebuilt upstream ANN index once they exceed `min_ann_rows`. Because the
* datasets are copied, peak memory temporarily holds both the source indices and the merged copy.
*
* @param[in] res raft resources
* @param[in] index_params parameters used to configure building the merged index
* @param[in] indices A vector of pointers to the indices to merge. All indices must
* be of the same type and have datasets with the same dimensionality
*
* @return A new tiered index containing the merged indices
*/
auto merge(raft::resources const& res,
const index_params<cagra::index_params>& index_params,
const std::vector<tiered_index::index<cagra::index<float, uint32_t>>*>& indices)
-> tiered_index::index<cagra::index<float, uint32_t>>;

/** @copydoc merge */
auto merge(raft::resources const& res,
const index_params<ivf_flat::index_params>& index_params,
const std::vector<tiered_index::index<ivf_flat::index<float, int64_t>>*>& indices)
-> tiered_index::index<ivf_flat::index<float, int64_t>>;

/** @copydoc merge */
auto merge(raft::resources const& res,
const index_params<ivf_pq::index_params>& index_params,
const std::vector<tiered_index::index<ivf_pq::typed_index<float, int64_t>>*>& indices)
-> tiered_index::index<ivf_pq::typed_index<float, int64_t>>;

} // namespace cuvs::neighbors::tiered_index
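A minimal usage sketch for the overloads above, assuming `idx_a` and `idx_b` are tiered CAGRA-backed indices built earlier with `tiered_index::build` over float32 data of equal dimensionality:

```cpp
#include <cuvs/neighbors/tiered_index.hpp>

#include <vector>

raft::device_resources res;
cuvs::neighbors::tiered_index::index_params<cuvs::neighbors::cagra::index_params> params;

std::vector<cuvs::neighbors::tiered_index::index<cuvs::neighbors::cagra::index<float, uint32_t>>*>
  merge_inputs{&idx_a, &idx_b};
auto merged = cuvs::neighbors::tiered_index::merge(res, params, merge_inputs);
```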
6 changes: 5 additions & 1 deletion cpp/src/neighbors/detail/cagra/cagra_merge.cuh
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2023-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2023-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -44,6 +44,10 @@ index<T, IdxT> merge(raft::resources const& handle,
const cagra::merge_params& params,
std::vector<cuvs::neighbors::cagra::index<T, IdxT>*>& indices)
{
// we're doing a physical merge here, make sure that this matches the merge_params
RAFT_EXPECTS(params.merge_strategy == cuvs::neighbors::MergeStrategy::MERGE_STRATEGY_PHYSICAL,
"cagra::merge only supports merge_strategy=MERGE_STRATEGY_PHYSICAL");

using cagra_index_t = cuvs::neighbors::cagra::index<T, IdxT>;
using ds_idx_type = typename cagra_index_t::dataset_index_type;

73 changes: 73 additions & 0 deletions cpp/src/neighbors/detail/tiered_index.cuh
@@ -297,6 +297,79 @@ auto build(
return std::shared_ptr<index_state<UpstreamT>>(ret);
}

/**
* @brief merge multiple indices together
*/
template <typename UpstreamT>
auto merge(raft::resources const& res,
[Review comment (Member)]: I wonder: if we are going to rebuild the CAGRA index anyway to consolidate tiered indices, should we just go ahead and add all the vectors in tier 0 to that CAGRA index too (i.e. should we also do a compaction)? Disregard if this is already what we're doing.

const index_params<typename UpstreamT::index_params_type>& index_params,
const std::vector<tiered_index::index<UpstreamT>*>& indices)
-> std::shared_ptr<index_state<UpstreamT>>
{
using value_type = typename UpstreamT::value_type;

RAFT_EXPECTS(indices.size() > 0, "Must pass at least one index to merge");

for (auto index : indices) {
RAFT_EXPECTS(index != nullptr,
"Null pointer detected in 'indices'. Ensure all elements are valid before usage.");
}

// handle simple case of only one index being merged
if (indices.size() == 1) { return indices[0]->state; }

auto dim = indices[0]->state->dim();
auto include_norms = indices[0]->state->storage->include_norms;

// validate data and check what needs to be merged
size_t bfknn_rows = 0, ann_rows = 0;
for (auto index : indices) {
RAFT_EXPECTS(dim == index->state->dim(), "Each index must have the same dimensionality");
bfknn_rows += index->state->bfknn_rows();
ann_rows += index->state->ann_rows();
}

// degenerate case - all indices are empty, just re-use the state from the first index
if (!bfknn_rows && !ann_rows) { return indices[0]->state; }

// concatenate all the storages together
auto to_allocate = bfknn_rows + ann_rows;
auto new_storage =
std::make_shared<brute_force_storage<value_type>>(res, to_allocate, dim, include_norms);

for (auto index : indices) {
auto storage = index->state->storage;

// copy over dataset to new storage
raft::copy(res,
raft::make_device_matrix_view<value_type, int64_t, raft::row_major>(
new_storage->dataset.data() + new_storage->num_rows_used * dim,
storage->num_rows_used,
dim),
raft::make_device_matrix_view<const value_type, int64_t, raft::row_major>(
storage->dataset.data(), storage->num_rows_used, dim));

// copy over precalculated norms
if (include_norms) {
raft::copy(res,
raft::make_device_vector_view<value_type, int64_t, raft::row_major>(
new_storage->norms->data() + new_storage->num_rows_used, storage->num_rows_used),
raft::make_device_vector_view<const value_type, int64_t, raft::row_major>(
storage->norms->data(), storage->num_rows_used));
}
new_storage->num_rows_used += storage->num_rows_used;
}

auto next_state = std::make_shared<index_state<UpstreamT>>(*indices[0]->state);
next_state->storage = new_storage;
next_state->build_params = index_params;

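// If the merged brute-force (tier-0) rows now exceed min_ann_rows, compact them into a
// rebuilt upstream ANN index, absorbing tier 0 into tier 1.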
if (next_state->bfknn_rows() > static_cast<size_t>(next_state->build_params.min_ann_rows)) {
next_state = compact(res, *next_state);
}
return next_state;
}

template <typename UpstreamT>
auto extend(raft::resources const& res,
const index_state<UpstreamT>& current,
27 changes: 27 additions & 0 deletions cpp/src/neighbors/tiered_index.cu
@@ -175,6 +175,33 @@ void search(raft::resources const& res,
res, search_params, ivf_pq::typed_search, queries, neighbors, distances, sample_filter);
}

auto merge(raft::resources const& res,
const index_params<cagra::index_params>& index_params,
const std::vector<tiered_index::index<cagra::index<float, uint32_t>>*>& indices)
-> tiered_index::index<cagra::index<float, uint32_t>>
[Review comment (Member)]: I think ideally we would have merge() return an index and, as you have done, use the PIMPL idiom to allow returning either the same index type that was input, or a composite_index.

{
auto state = detail::merge(res, index_params, indices);
return cuvs::neighbors::tiered_index::index<cagra::index<float, uint32_t>>(state);
}

auto merge(raft::resources const& res,
const index_params<ivf_flat::index_params>& index_params,
const std::vector<tiered_index::index<ivf_flat::index<float, int64_t>>*>& indices)
-> tiered_index::index<ivf_flat::index<float, int64_t>>
{
auto state = detail::merge(res, index_params, indices);
return cuvs::neighbors::tiered_index::index<ivf_flat::index<float, int64_t>>(state);
}

auto merge(raft::resources const& res,
const index_params<ivf_pq::index_params>& index_params,
const std::vector<tiered_index::index<ivf_pq::typed_index<float, int64_t>>*>& indices)
-> tiered_index::index<ivf_pq::typed_index<float, int64_t>>
{
auto state = detail::merge(res, index_params, indices);
return cuvs::neighbors::tiered_index::index<ivf_pq::typed_index<float, int64_t>>(state);
}
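A hypothetical sketch of the PIMPL suggestion in the review comment above (not part of this PR; all names are illustrative): a type-erased wrapper would let merge() hand back either the merged concrete index or a composite over several indices.

```cpp
#include <cstdint>
#include <memory>
#include <utility>

// Type-erased index handle: holds any concrete index type behind a stable interface.
class any_index {
 public:
  template <typename Impl>
  explicit any_index(Impl impl) : self_{std::make_unique<model<Impl>>(std::move(impl))} {}

  std::int64_t size() const { return self_->size(); }  // forwards to the wrapped index

 private:
  struct concept_t {
    virtual ~concept_t() = default;
    virtual std::int64_t size() const = 0;
  };
  template <typename Impl>
  struct model final : concept_t {
    explicit model(Impl impl) : impl_{std::move(impl)} {}
    std::int64_t size() const override { return impl_.size(); }
    Impl impl_;
  };
  std::unique_ptr<concept_t> self_;
};
```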

template <typename UpstreamT>
int64_t index<UpstreamT>::size() const noexcept
{
126 changes: 104 additions & 22 deletions cpp/src/neighbors/tiered_index_c.cpp
@@ -37,47 +37,60 @@
namespace {
using namespace cuvs::neighbors;

template <typename T>
void convert_c_index_params(cuvsTieredIndexParams params,
int64_t n_rows,
int64_t dim,
tiered_index::index_params<T>* out)
{
out->min_ann_rows = params.min_ann_rows;
out->create_ann_index_on_extend = params.create_ann_index_on_extend;
out->metric = params.metric;

if constexpr (std::is_same_v<T, cagra::index_params>) {
if (params.cagra_params != NULL) {
cagra::convert_c_index_params(*params.cagra_params, n_rows, dim, out);
}
} else if constexpr (std::is_same_v<T, ivf_flat::index_params>) {
if (params.ivf_flat_params != NULL) {
ivf_flat::convert_c_index_params(*params.ivf_flat_params, out);
}
} else if constexpr (std::is_same_v<T, ivf_pq::index_params>) {
if (params.ivf_pq_params != NULL) {
ivf_pq::convert_c_index_params(*params.ivf_pq_params, out);
}
} else {
RAFT_FAIL("unhandled index params type");
}
}

template <typename T>
void* _build(cuvsResources_t res, cuvsTieredIndexParams params, DLManagedTensor* dataset_tensor)
{
auto res_ptr = reinterpret_cast<raft::resources*>(res);
using mdspan_type = raft::device_matrix_view<const T, int64_t, raft::row_major>;
auto mds = cuvs::core::from_dlpack<mdspan_type>(dataset_tensor);

auto dataset = dataset_tensor->dl_tensor;
RAFT_EXPECTS(dataset.ndim == 2, "dataset should be a 2-dimensional tensor");
RAFT_EXPECTS(dataset.shape != nullptr, "dataset should have an initialized shape");

switch (params.algo) {
case CUVS_TIERED_INDEX_ALGO_CAGRA: {
auto build_params = tiered_index::index_params<cagra::index_params>();
- if (params.cagra_params != NULL) {
-   auto dataset = dataset_tensor->dl_tensor;
-   cagra::convert_c_index_params(
-     *params.cagra_params, dataset.shape[0], dataset.shape[1], &build_params);
- }
- build_params.min_ann_rows = params.min_ann_rows;
- build_params.create_ann_index_on_extend = params.create_ann_index_on_extend;
- build_params.metric = params.metric;
+ convert_c_index_params(params, dataset.shape[0], dataset.shape[1], &build_params);
return new tiered_index::index<cagra::index<T, uint32_t>>(
tiered_index::build(*res_ptr, build_params, mds));
}
case CUVS_TIERED_INDEX_ALGO_IVF_FLAT: {
auto build_params = tiered_index::index_params<ivf_flat::index_params>();
- if (params.ivf_flat_params != NULL) {
-   ivf_flat::convert_c_index_params(*params.ivf_flat_params, &build_params);
- }
- build_params.metric = params.metric;
- build_params.min_ann_rows = params.min_ann_rows;
- build_params.create_ann_index_on_extend = params.create_ann_index_on_extend;
+ convert_c_index_params(params, dataset.shape[0], dataset.shape[1], &build_params);
return new tiered_index::index<ivf_flat::index<T, int64_t>>(
tiered_index::build(*res_ptr, build_params, mds));
}
case CUVS_TIERED_INDEX_ALGO_IVF_PQ: {
- auto build_params = tiered_index::index_params<ivf_pq::index_params>();
- build_params.metric = params.metric;
- if (params.ivf_pq_params != NULL) {
-   ivf_pq::convert_c_index_params(*params.ivf_pq_params, &build_params);
- }
- build_params.metric = params.metric;
- build_params.min_ann_rows = params.min_ann_rows;
- build_params.create_ann_index_on_extend = params.create_ann_index_on_extend;
+ auto build_params = tiered_index::index_params<ivf_pq::index_params>();
+ convert_c_index_params(params, dataset.shape[0], dataset.shape[1], &build_params);
return new tiered_index::index<ivf_pq::typed_index<T, int64_t>>(
tiered_index::build(*res_ptr, build_params, mds));
}
@@ -157,6 +170,47 @@ void _extend(cuvsResources_t res, DLManagedTensor* new_vectors, cuvsTieredIndex

tiered_index::extend(*res_ptr, vectors_mds, index_ptr);
}
template <typename UpstreamT>
void _merge(cuvsResources_t res,
cuvsTieredIndexParams params,
cuvsTieredIndex_t* indices,
size_t num_indices,
cuvsTieredIndex_t output_index)
{
auto res_ptr = reinterpret_cast<raft::resources*>(res);

std::vector<cuvs::neighbors::tiered_index::index<UpstreamT>*> cpp_indices;

int64_t n_rows = 0, dim = 0;
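// All inputs must agree on dtype and algorithm; accumulate the total row count and check
// dimensionality so the C-level parameters can be converted once below.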
for (size_t i = 0; i < num_indices; ++i) {
RAFT_EXPECTS(indices[i]->dtype.code == indices[0]->dtype.code,
"indices must all have the same dtype");
RAFT_EXPECTS(indices[i]->dtype.bits == indices[0]->dtype.bits,
"indices must all have the same dtype");
RAFT_EXPECTS(indices[i]->algo == indices[0]->algo,
"indices must all have the same index algorithm");

auto idx_ptr =
reinterpret_cast<cuvs::neighbors::tiered_index::index<UpstreamT>*>(indices[i]->addr);
n_rows += idx_ptr->size();
if (dim) {
RAFT_EXPECTS(dim == idx_ptr->dim(), "indices must all have the same dimensionality");
} else {
dim = idx_ptr->dim();
}
cpp_indices.push_back(idx_ptr);
}

auto build_params = tiered_index::index_params<typename UpstreamT::index_params_type>();
convert_c_index_params(params, n_rows, dim, &build_params);

auto ptr = new cuvs::neighbors::tiered_index::index<UpstreamT>(
cuvs::neighbors::tiered_index::merge(*res_ptr, build_params, cpp_indices));

output_index->addr = reinterpret_cast<uintptr_t>(ptr);
output_index->dtype = indices[0]->dtype;
output_index->algo = indices[0]->algo;
}

} // namespace

@@ -305,3 +359,31 @@ extern "C" cuvsError_t cuvsTieredIndexExtend(cuvsResources_t res,
}
});
}

extern "C" cuvsError_t cuvsTieredIndexMerge(cuvsResources_t res,
cuvsTieredIndexParams_t params,
cuvsTieredIndex_t* indices,
size_t num_indices,
cuvsTieredIndex_t output_index)
{
return cuvs::core::translate_exceptions([=] {
RAFT_EXPECTS(num_indices >= 1, "must have at least one index to merge");

switch (indices[0]->algo) {
case CUVS_TIERED_INDEX_ALGO_CAGRA: {
_merge<cagra::index<float, uint32_t>>(res, *params, indices, num_indices, output_index);
break;
}
case CUVS_TIERED_INDEX_ALGO_IVF_FLAT: {
_merge<ivf_flat::index<float, int64_t>>(res, *params, indices, num_indices, output_index);
break;
}
case CUVS_TIERED_INDEX_ALGO_IVF_PQ: {
_merge<ivf_pq::typed_index<float, int64_t>>(
res, *params, indices, num_indices, output_index);
break;
}
default: RAFT_FAIL("unsupported tiered index algorithm");
}
});
}