Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
cb8f16b
[Feat] Add support of logical merge in Cagra
rhdong Feb 20, 2025
6c00e52
Merge branch 'branch-25.04' into rhdong/logical-merge
rhdong Feb 26, 2025
450df2a
Merge branch 'branch-25.04' into rhdong/logical-merge
rhdong Mar 3, 2025
4debc02
Merge branch 'branch-25.04' into rhdong/logical-merge
rhdong Mar 4, 2025
c5ecfe8
Merge branch 'branch-25.04' into rhdong/logical-merge
rhdong Mar 6, 2025
b3a99c6
Merge branch 'branch-25.04' into rhdong/logical-merge
rhdong Mar 7, 2025
bcfa656
Merge branch 'branch-25.04' into rhdong/logical-merge
rhdong Mar 12, 2025
186be95
fix ci fail
rhdong Mar 12, 2025
6138f92
Merge branch 'branch-25.04' into rhdong/logical-merge
rhdong Mar 17, 2025
b962830
Merge branch 'branch-25.04' into rhdong/logical-merge
rhdong Mar 18, 2025
1e61e1b
Merge branch 'branch-25.04' into rhdong/logical-merge
rhdong Mar 24, 2025
590c3a8
Merge branch 'branch-25.04' into rhdong/logical-merge
rhdong Mar 31, 2025
a986d2e
resolve conflicts
rhdong Apr 23, 2025
73b96e3
Merge remote-tracking branch 'origin/branch-25.06' into rhdong/logica…
rhdong May 11, 2025
f357c6e
fix errors when processing conflicts & supports i64 output indices.
rhdong May 12, 2025
37bbfe5
Merge branch 'rhdong/logical-merge-dev' into rhdong/logical-merge
rhdong May 12, 2025
8caa251
Merge branch 'branch-25.06' into rhdong/logical-merge
rhdong May 14, 2025
e229aee
Merge remote-tracking branch 'origin/branch-25.06' into rhdong/logica…
rhdong May 15, 2025
5076f8f
fix small dataset issue by avoid graph degree reduce to 1
rhdong May 15, 2025
7610e65
Merge branch 'branch-25.06' into rhdong/logical-merge
rhdong May 15, 2025
0f42309
Merge remote-tracking branch 'origin/branch-25.06' into rhdong/logica…
rhdong May 16, 2025
acb4704
process conflict
rhdong May 16, 2025
63e9d55
Merge branch 'rhdong/logical-merge-dev' into rhdong/logical-merge
rhdong May 16, 2025
db0c624
introduce raft::copy for 2Dcpy
rhdong May 19, 2025
c1023e1
TBR: pin the raft for temp CI verify.
rhdong May 19, 2025
39e7938
Merge branch 'branch-25.06' into rhdong/logical-merge
rhdong May 20, 2025
9e112a7
Revert "TBR: pin the raft for temp CI verify."
rhdong May 20, 2025
f5eb6e4
replace merge_top_k_gpu by select_k
rhdong May 21, 2025
da8c2e5
add benchmark for CAGRA merge
rhdong May 22, 2025
0342358
Merge branch 'branch-25.06' into rhdong/logical-merge
rhdong May 22, 2025
ddef8ce
Merge branch 'branch-25.06' into rhdong/logical-merge
rhdong May 26, 2025
147be54
Refactor composite wrappers and merge API with redesigned index system
rhdong May 27, 2025
acbcb50
add back default constructor
rhdong May 27, 2025
82d978b
Merge branch 'branch-25.06' into rhdong/logical-merge
rhdong May 27, 2025
007b9da
remove MergeStrategy reference of C header
rhdong May 27, 2025
9a69747
Merge branch 'branch-25.06' into rhdong/logical-merge
rhdong May 27, 2025
e19590d
add support multi-stream for CompositeIndex::search
rhdong May 27, 2025
eca7283
setup stream number for resource handle by using set_cuda_stream_pool
rhdong May 27, 2025
bf0bca5
Merge branch 'branch-25.06' into rhdong/logical-merge
rhdong May 27, 2025
7d49411
Merge branch 'branch-25.06' into rhdong/logical-merge
rhdong May 28, 2025
f2ec31c
set n_stream equal to the size of sub_indices
rhdong May 28, 2025
d08f458
optimize naming for split num.
rhdong May 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,9 @@ if(BUILD_SHARED_LIBS)
src/neighbors/detail/cagra/cagra_build.cpp
src/neighbors/detail/cagra/topk_for_cagra/topk.cu
src/neighbors/dynamic_batching.cu
src/neighbors/cagra_index_wrapper.cu
src/neighbors/composite/index.cu
src/neighbors/composite/merge.cpp
$<$<BOOL:${BUILD_CAGRA_HNSWLIB}>:src/neighbors/hnsw.cpp>
src/neighbors/ivf_flat_index.cpp
src/neighbors/ivf_flat/ivf_flat_build_extend_float_int64_t.cu
Expand Down
14 changes: 14 additions & 0 deletions cpp/bench/ann/src/cuvs/cuvs_ann_bench_param_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,20 @@ void parse_build_param(const nlohmann::json& conf,
parse_build_param(comp_search_conf, vpq_pams);
param.cagra_params.compression.emplace(vpq_pams);
}

if (conf.contains("num_dataset_splits")) {
param.num_dataset_splits = conf.at("num_dataset_splits");
}
if (conf.contains("merge_type")) {
std::string mt = conf.at("merge_type");
if (mt == "PHYSICAL") {
param.merge_type = cuvs::bench::CagraMergeType::kPhysical;
} else if (mt == "LOGICAL") {
param.merge_type = cuvs::bench::CagraMergeType::kLogical;
} else {
throw std::runtime_error("invalid value for merge_type");
}
}
}

cuvs::bench::AllocatorType parse_allocator(std::string mem_type)
Expand Down
187 changes: 164 additions & 23 deletions cpp/bench/ann/src/cuvs/cuvs_cagra_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <cuvs/distance/distance.hpp>
#include <cuvs/neighbors/cagra.hpp>
#include <cuvs/neighbors/common.hpp>
#include <cuvs/neighbors/composite/merge.hpp>
#include <cuvs/neighbors/dynamic_batching.hpp>
#include <cuvs/neighbors/ivf_pq.hpp>
#include <cuvs/neighbors/nn_descent.hpp>
Expand All @@ -44,14 +45,17 @@
#include <iostream>
#include <memory>
#include <optional>
#include <raft/util/integer_utils.hpp>
#include <stdexcept>
#include <string>
#include <type_traits>
#include <vector>

namespace cuvs::bench {

enum class AllocatorType { kHostPinned, kHostHugePage, kDevice };
enum class CagraBuildAlgo { kAuto, kIvfPq, kNnDescent };
// How the benchmark combines the per-split CAGRA sub-indices when `num_dataset_splits > 1`:
//  - kPhysical: build() attaches each dataset slice to an empty sub-index and calls
//    cuvs::neighbors::cagra::merge to produce the single `index_` that is searched.
//  - kLogical:  build() builds one index per slice; search_base() wraps them and searches through
//    cuvs::neighbors::composite::merge.
enum class CagraMergeType { kPhysical, kLogical };

template <typename T, typename IdxT>
class cuvs_cagra : public algo<T>, public algo_gpu {
Expand Down Expand Up @@ -80,6 +84,8 @@ class cuvs_cagra : public algo<T>, public algo_gpu {
std::optional<float> ivf_pq_refine_rate = std::nullopt;
std::optional<cuvs::neighbors::ivf_pq::index_params> ivf_pq_build_params = std::nullopt;
std::optional<cuvs::neighbors::ivf_pq::search_params> ivf_pq_search_params = std::nullopt;
size_t num_dataset_splits = 1;
CagraMergeType merge_type = CagraMergeType::kPhysical;

void prepare_build_params(const raft::extent_2d<IdxT>& dataset_extents)
{
Expand Down Expand Up @@ -188,6 +194,7 @@ class cuvs_cagra : public algo<T>, public algo_gpu {
bool dynamic_batching_conservative_dispatch_;

std::shared_ptr<cuvs::neighbors::filtering::base_filter> filter_;
std::vector<std::shared_ptr<cuvs::neighbors::cagra::index<T, IdxT>>> sub_indices_;

inline rmm::device_async_resource_ref get_mr(AllocatorType mem_type)
{
Expand All @@ -211,10 +218,57 @@ void cuvs_cagra<T, IdxT>::build(const T* dataset, size_t nrow)
auto dataset_view_device =
raft::make_mdspan<const T, IdxT, raft::row_major, false, true>(dataset, dataset_extents);
bool dataset_is_on_host = raft::get_device_for_address(dataset) == -1;
if (index_params_.num_dataset_splits <= 1) {
index_ = std::make_shared<cuvs::neighbors::cagra::index<T, IdxT>>(std::move(
dataset_is_on_host ? cuvs::neighbors::cagra::build(handle_, params, dataset_view_host)
: cuvs::neighbors::cagra::build(handle_, params, dataset_view_device)));
} else {
IdxT rows_per_split =
raft::ceildiv<IdxT>(nrow, static_cast<IdxT>(index_params_.num_dataset_splits));
for (size_t i = 0; i < index_params_.num_dataset_splits; ++i) {
IdxT start = static_cast<IdxT>(i * rows_per_split);
if (start >= nrow) break;
IdxT rows = std::min(rows_per_split, static_cast<IdxT>(nrow) - start);
const T* sub_ptr = dataset + static_cast<size_t>(start) * dimension_;
auto sub_host =
raft::make_host_matrix_view<const T, int64_t, raft::row_major>(sub_ptr, rows, dimension_);
auto sub_dev =
raft::make_device_matrix_view<const T, int64_t, raft::row_major>(sub_ptr, rows, dimension_);

auto sub_index =
cuvs::neighbors::cagra::index<T, IdxT>(handle_, index_params_.cagra_params.metric);
if (index_params_.merge_type == CagraMergeType::kPhysical) {
if (dataset_is_on_host) {
sub_index.update_dataset(handle_, sub_host);
} else {
sub_index.update_dataset(handle_, sub_dev);
}
}
if (index_params_.merge_type == CagraMergeType::kLogical) {
if (dataset_is_on_host) {
sub_index = cuvs::neighbors::cagra::build(handle_, params, sub_host);
} else {
sub_index = cuvs::neighbors::cagra::build(handle_, params, sub_dev);
}
}
auto sub_index_shared =
std::make_shared<cuvs::neighbors::cagra::index<T, IdxT>>(std::move(sub_index));
sub_indices_.push_back(std::move(sub_index_shared));
}
if (index_params_.merge_type == CagraMergeType::kPhysical) {
cuvs::neighbors::cagra::merge_params merge_params{index_params_.cagra_params};
merge_params.merge_strategy = cuvs::neighbors::MergeStrategy::MERGE_STRATEGY_PHYSICAL;

std::vector<cuvs::neighbors::cagra::index<T, IdxT>*> indices;
indices.reserve(sub_indices_.size());
for (auto& ptr : sub_indices_) {
indices.push_back(ptr.get());
}

index_ = std::make_shared<cuvs::neighbors::cagra::index<T, IdxT>>(std::move(
dataset_is_on_host ? cuvs::neighbors::cagra::build(handle_, params, dataset_view_host)
: cuvs::neighbors::cagra::build(handle_, params, dataset_view_device)));
index_ = std::make_shared<cuvs::neighbors::cagra::index<T, IdxT>>(
std::move(cuvs::neighbors::cagra::merge(handle_, merge_params, indices)));
}
}
}

inline auto allocator_to_string(AllocatorType mem_type) -> std::string
Expand All @@ -233,7 +287,7 @@ template <typename T, typename IdxT>
void cuvs_cagra<T, IdxT>::set_search_param(const search_param_base& param,
const void* filter_bitset)
{
filter_ = make_cuvs_filter(filter_bitset, index_->size());
if (index_) { filter_ = make_cuvs_filter(filter_bitset, index_->size()); }
auto sp = dynamic_cast<const search_param&>(param);
bool needs_dynamic_batcher_update =
(dynamic_batching_max_batch_size_ != sp.dynamic_batching_max_batch_size) ||
Expand Down Expand Up @@ -314,27 +368,65 @@ void cuvs_cagra<T, IdxT>::set_search_param(const search_param_base& param,
/**
 * Registers the dataset that searches should run against.
 *
 * @param dataset pointer to the first element of the dataset; rows are addressed as
 *                `dataset + row * dimension_`
 * @param nrow    number of rows in `dataset`
 */
template <typename T, typename IdxT>
void cuvs_cagra<T, IdxT>::set_search_dataset(const T* dataset, size_t nrow)
{
// NOTE(review): the statements from here down to the `need_dataset_update_ = !is_vpq;` line are
// repeated verbatim inside the `else` branch below, and the `if` opened here is never closed
// before the next `if`. This looks like the pre-change body left in by a diff rendering with its
// +/- markers stripped — confirm which copy belongs in the file.
using ds_idx_type = decltype(index_->data().n_rows());
bool is_vpq =
dynamic_cast<const cuvs::neighbors::vpq_dataset<half, ds_idx_type>*>(&index_->data()) ||
dynamic_cast<const cuvs::neighbors::vpq_dataset<float, ds_idx_type>*>(&index_->data());
// It can happen that we are re-using a previous algo object which already has
// the dataset set. Check if we need update.
if (static_cast<size_t>(input_dataset_v_->extent(0)) != nrow ||
input_dataset_v_->data_handle() != dataset) {
*input_dataset_v_ = raft::make_device_matrix_view<const T, int64_t>(dataset, nrow, this->dim_);
need_dataset_update_ = !is_vpq; // ignore update if this is a VPQ dataset.
// Split + logical merge: hand every sub-index its own slice of the dataset.
if (index_params_.num_dataset_splits > 1 &&
index_params_.merge_type == CagraMergeType::kLogical) {
// A device id of -1 is treated here as "the pointer refers to host memory".
bool dataset_is_on_host = raft::get_device_for_address(dataset) == -1;
// Slice size is rounded up, so the last slice may hold fewer rows (see `rows` below).
IdxT rows_per_split =
raft::ceildiv<IdxT>(nrow, static_cast<IdxT>(index_params_.num_dataset_splits));
for (size_t i = 0; i < sub_indices_.size(); ++i) {
IdxT start = static_cast<IdxT>(i * rows_per_split);
// Stop once the slices have covered all rows.
// NOTE(review): `start` is IdxT while `nrow` is size_t, so this compares mixed integer types.
if (start >= nrow) break;
IdxT rows = std::min(rows_per_split, static_cast<IdxT>(nrow) - start);
const T* sub_ptr = dataset + static_cast<size_t>(start) * dimension_;
// Both a host view and a device view are formed over the same pointer; only the one matching
// `dataset_is_on_host` is passed to update_dataset below.
auto sub_host =
raft::make_host_matrix_view<const T, int64_t, raft::row_major>(sub_ptr, rows, dimension_);
auto sub_dev =
raft::make_device_matrix_view<const T, int64_t, raft::row_major>(sub_ptr, rows, dimension_);
auto sub_index = sub_indices_[i].get();
// This check repeats the enclosing condition, so it is always true at this point.
if (index_params_.merge_type == CagraMergeType::kLogical) {
if (dataset_is_on_host) {
sub_index->update_dataset(handle_, sub_host);
} else {
sub_index->update_dataset(handle_, sub_dev);
}
}
}
// The sub-indices were updated directly above, so the deferred-update flag is cleared.
need_dataset_update_ = false;
} else {
// Single index (or physical merge): remember the dataset view and flag a deferred update.
using ds_idx_type = decltype(index_->data().n_rows());
bool is_vpq =
dynamic_cast<const cuvs::neighbors::vpq_dataset<half, ds_idx_type>*>(&index_->data()) ||
dynamic_cast<const cuvs::neighbors::vpq_dataset<float, ds_idx_type>*>(&index_->data());
// It can happen that we are re-using a previous algo object which already has
// the dataset set. Check if we need update.
if (static_cast<size_t>(input_dataset_v_->extent(0)) != nrow ||
input_dataset_v_->data_handle() != dataset) {
*input_dataset_v_ =
raft::make_device_matrix_view<const T, int64_t>(dataset, nrow, this->dim_);
need_dataset_update_ = !is_vpq; // ignore update if this is a VPQ dataset.
}
}
}

/**
 * Serializes the index to disk.
 *
 * With `num_dataset_splits > 1` and `CagraMergeType::kLogical`, each sub-index gets its own file:
 * sub-index 0 is written to `file`, sub-index i > 0 to `file + ".subidx.<i>"`, and the number of
 * sub-indices is written as text to `file + ".submeta"`. Otherwise the single `index_` is
 * written to `file`.
 *
 * @param file path of the primary output file
 */
template <typename T, typename IdxT>
void cuvs_cagra<T, IdxT>::save(const std::string& file) const
{
// NOTE(review): the next five lines are identical to the `else` branch at the bottom and, as
// written, run unconditionally and dereference `index_`. This looks like the pre-change body left
// in by a diff rendering with its +/- markers stripped — confirm which copy belongs in the file.
using ds_idx_type = decltype(index_->data().n_rows());
bool is_vpq =
dynamic_cast<const cuvs::neighbors::vpq_dataset<half, ds_idx_type>*>(&index_->data()) ||
dynamic_cast<const cuvs::neighbors::vpq_dataset<float, ds_idx_type>*>(&index_->data());
cuvs::neighbors::cagra::serialize(handle_, file, *index_, is_vpq);
if (index_params_.num_dataset_splits > 1 &&
index_params_.merge_type == CagraMergeType::kLogical) {
for (size_t i = 0; i < sub_indices_.size(); ++i) {
// The first sub-index reuses the plain file name; the others get a ".subidx.<i>" suffix.
std::string subfile = file + (i == 0 ? "" : ".subidx." + std::to_string(i));
// NOTE(review): the trailing `false` is presumably the include-dataset flag (the other branch
// passes `is_vpq` in that position) — confirm against the cagra::serialize signature.
cuvs::neighbors::cagra::serialize(handle_, subfile, *sub_indices_[i], false);
}
// Record how many sub-index files were written so load() knows how many to read back.
// NOTE(review): std::ofstream needs <fstream>, which is not among the includes visible in this
// hunk; the stream state is also not checked after the write.
std::ofstream f(file + ".submeta", std::ios::out);
f << sub_indices_.size();
f.close();
} else {
using ds_idx_type = decltype(index_->data().n_rows());
// The index's dataset is probed for the two vpq_dataset instantiations; the result is forwarded
// as the last argument of serialize.
bool is_vpq =
dynamic_cast<const cuvs::neighbors::vpq_dataset<half, ds_idx_type>*>(&index_->data()) ||
dynamic_cast<const cuvs::neighbors::vpq_dataset<float, ds_idx_type>*>(&index_->data());
cuvs::neighbors::cagra::serialize(handle_, file, *index_, is_vpq);
}
}

template <typename T, typename IdxT>
Expand All @@ -346,8 +438,24 @@ void cuvs_cagra<T, IdxT>::save_to_hnswlib(const std::string& file) const
/**
 * Restores the index from disk.
 *
 * If `num_dataset_splits > 1`, the merge type is `kLogical`, and `file + ".submeta"` can be
 * opened, the sub-index count is read from that file and each sub-index is deserialized into
 * `sub_indices_`, using the same naming scheme as save(): `file` for index 0 and
 * `file + ".subidx.<i>"` for i > 0. Otherwise a single index is deserialized from `file` into
 * `index_`.
 *
 * @param file path of the primary index file
 */
template <typename T, typename IdxT>
void cuvs_cagra<T, IdxT>::load(const std::string& file)
{
// NOTE(review): the next two lines are identical to the `else` branch at the bottom and, as
// written, run unconditionally. This looks like the pre-change body left in by a diff rendering
// with its +/- markers stripped — confirm which copy belongs in the file.
index_ = std::make_shared<cuvs::neighbors::cagra::index<T, IdxT>>(handle_);
cuvs::neighbors::cagra::deserialize(handle_, file, index_.get());
// The meta file is opened unconditionally; `meta.good()` below acts as the "was this index saved
// as multiple sub-indices?" test.
std::ifstream meta(file + ".submeta", std::ios::in);
if (index_params_.num_dataset_splits > 1 &&
index_params_.merge_type == CagraMergeType::kLogical && meta.good()) {
// Load multiple sub-indices for logical merge
// NOTE(review): `count` is declared without an initializer and the result of the extraction is
// not checked, so a malformed meta file is not reported as an error.
size_t count;
meta >> count;
meta.close();
// Drop any previously loaded sub-indices before repopulating the list.
sub_indices_.clear();
for (size_t i = 0; i < count; ++i) {
std::string subfile = file + (i == 0 ? "" : ".subidx." + std::to_string(i));
auto sub_index = std::make_shared<cuvs::neighbors::cagra::index<T, IdxT>>(handle_);
cuvs::neighbors::cagra::deserialize(handle_, subfile, sub_index.get());
sub_indices_.push_back(std::move(sub_index));
}
} else {
index_ = std::make_shared<cuvs::neighbors::cagra::index<T, IdxT>>(handle_);
cuvs::neighbors::cagra::deserialize(handle_, file, index_.get());
}
}

template <typename T, typename IdxT>
Expand Down Expand Up @@ -377,8 +485,41 @@ void cuvs_cagra<T, IdxT>::search_base(
neighbors_view,
distances_view);
} else {
cuvs::neighbors::cagra::search(
handle_, search_params_, *index_, queries_view, neighbors_view, distances_view, *filter_);
if (index_params_.num_dataset_splits <= 1 ||
index_params_.merge_type == CagraMergeType::kPhysical) {
cuvs::neighbors::cagra::search(
handle_, search_params_, *index_, queries_view, neighbors_view, distances_view, *filter_);
} else {
if (index_params_.merge_type == CagraMergeType::kLogical) {
cuvs::neighbors::cagra::merge_params merge_params{index_params_.cagra_params};
merge_params.merge_strategy = cuvs::neighbors::MergeStrategy::MERGE_STRATEGY_LOGICAL;

// Create wrapped indices for composite merge
std::vector<std::shared_ptr<cuvs::neighbors::IndexWrapper<T, IdxT, algo_base::index_type>>>
wrapped_indices;
wrapped_indices.reserve(sub_indices_.size());
for (auto& ptr : sub_indices_) {
auto index_wrapper =
cuvs::neighbors::cagra::make_index_wrapper<T, IdxT, algo_base::index_type>(ptr.get());
wrapped_indices.push_back(index_wrapper);
}

raft::resources composite_handle(handle_);
size_t n_streams = wrapped_indices.size();
raft::resource::set_cuda_stream_pool(composite_handle,
std::make_shared<rmm::cuda_stream_pool>(n_streams));

auto merged_index =
cuvs::neighbors::composite::merge(composite_handle, merge_params, wrapped_indices);
cuvs::neighbors::filtering::none_sample_filter empty_filter;
merged_index->search(composite_handle,
search_params_,
queries_view,
neighbors_view,
distances_view,
empty_filter);
}
}
}
}

Expand Down
18 changes: 8 additions & 10 deletions cpp/include/cuvs/neighbors/cagra.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

#include "common.hpp"
#include <cuvs/distance/distance.hpp>
#include <cuvs/neighbors/common.h>
#include <cuvs/neighbors/common.hpp>
#include <cuvs/neighbors/ivf_pq.hpp>
#include <cuvs/neighbors/nn_descent.hpp>
Expand Down Expand Up @@ -275,17 +274,10 @@ struct extend_params {
* @{
*/

/**
* @brief Determines the strategy for merging CAGRA graphs.
*
* @note Currently, only the MERGE_STRATEGY_PHYSICAL strategy is supported.
*/
using MergeStrategy = cuvsMergeStrategy;

/**
* @brief Parameters for merging CAGRA indexes.
*/
struct merge_params {
struct merge_params : cuvs::neighbors::merge_params {
merge_params() = default;

/**
Expand All @@ -298,7 +290,11 @@ struct merge_params {
cagra::index_params output_index_params;

/// Strategy for merging. Defaults to `MergeStrategy::MERGE_STRATEGY_PHYSICAL`.
MergeStrategy strategy = MergeStrategy::MERGE_STRATEGY_PHYSICAL;
cuvs::neighbors::MergeStrategy merge_strategy =
cuvs::neighbors::MergeStrategy::MERGE_STRATEGY_PHYSICAL;

/// Implementation of the polymorphic strategy() method
cuvs::neighbors::MergeStrategy strategy() const { return merge_strategy; }
};

/**
Expand Down Expand Up @@ -2602,3 +2598,5 @@ auto distribute(const raft::resources& clique, const std::string& filename)
-> cuvs::neighbors::mg_index<cagra::index<T, IdxT>, T, IdxT>;

} // namespace cuvs::neighbors::cagra

#include <cuvs/neighbors/cagra_index_wrapper.hpp>
Loading