Skip to content
68 changes: 51 additions & 17 deletions kvdb-rocksdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use rocksdb::{
BlockBasedOptions, ColumnFamily, ColumnFamilyDescriptor, Options, ReadOptions, WriteBatch, WriteOptions, DB,
};

pub use rocksdb::DBRawIterator;

use kvdb::{DBKeyValue, DBOp, DBTransaction, DBValue, KeyValueDB};

#[cfg(target_os = "linux")]
Expand Down Expand Up @@ -143,25 +145,51 @@ impl CompactionProfile {
}
}

/// A source of key-ordering functions for a database column.
///
/// Implementors hand out boxed closures that order two keys given as raw byte
/// slices. The `'a` parameter bounds how long the returned closure may live.
/// The `Send + Sync` supertraits allow implementors to be shared across threads.
pub trait Comparator<'a>: Send + Sync {
/// Returns a boxed function that compares two byte-slice keys and yields
/// their relative [`cmp::Ordering`].
fn get_fn(&self) -> Box<dyn Fn(&[u8], &[u8]) -> cmp::Ordering + 'a>;
}

/// Thin adapter that owns a comparison callable of type `T`.
///
/// The wrapper itself places no bounds on `T`; the requirements are stated on
/// the trait implementations that actually need them.
pub struct ComparatorWrapper<T> {
    /// The wrapped comparison callable.
    cmp: T,
}

impl<T> ComparatorWrapper<T> {
    /// Wraps `cmp` without inspecting or transforming it.
    pub fn new(cmp: T) -> Self {
        ComparatorWrapper { cmp }
    }
}

/// Any cloneable, thread-safe, `'static` callable that orders two byte slices
/// can serve as a `Comparator<'static>` once wrapped in `ComparatorWrapper`.
impl<T> Comparator<'static> for ComparatorWrapper<T>
where
    T: Fn(&[u8], &[u8]) -> cmp::Ordering + Clone + Send + Sync + 'static,
{
    /// Returns a boxed clone of the wrapped callable, so every call yields an
    /// independently owned comparison function.
    fn get_fn(&self) -> Box<dyn Fn(&[u8], &[u8]) -> cmp::Ordering + 'static> {
        let comparison = self.cmp.clone();
        Box::new(comparison)
    }
}

/// Per-column configuration.
///
/// The derived `Default` leaves both fields as `None`: the default memory
/// budget applies and no custom comparator is installed.
#[derive(Clone, Default)]
pub struct ColumnConfig {
/// Memory budget (in MiB) used for setting block cache size and
/// write buffer size for this column.
/// If the memory budget of a column is not specified,
/// `DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB` is used for that column.
pub memory_budget: Option<MiB>,

/// Optional custom key ordering for this column.
///
/// When set, the function returned by `Comparator::get_fn` is installed on
/// the column's options under the name `column_{col}_comparator`, where
/// `{col}` is the column index.
///
/// NOTE(review): RocksDB presumably records the comparator name with the
/// column family, so reopening an existing column with a different
/// comparator may fail — confirm before changing this on a live database.
pub comparator: Option<std::sync::Arc<dyn Comparator<'static>>>,
}

/// Database configuration
#[derive(Clone)]
#[non_exhaustive]
pub struct DatabaseConfig {
/// Max number of open files.
pub max_open_files: i32,
/// Memory budget (in MiB) used for setting block cache size and
/// write buffer size for each column including the default one.
/// If the memory budget of a column is not specified,
/// `DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB` is used for that column.
pub memory_budget: HashMap<u32, MiB>,
/// Compaction profile.
pub compaction: CompactionProfile,
/// Set number of columns.
///
/// # Safety
///
/// The number of columns must not be zero.
pub columns: u32,
pub columns: Vec<ColumnConfig>,
/// Specify the maximum number of info/debug log files to be kept.
pub keep_log_file_num: i32,
/// Enable native RocksDB statistics.
Expand Down Expand Up @@ -196,22 +224,22 @@ impl DatabaseConfig {
/// # Safety
///
/// The number of `columns` must not be zero.
pub fn with_columns(columns: u32) -> Self {
assert!(columns > 0, "the number of columns must not be zero");
pub fn with_columns(columns: Vec<ColumnConfig>) -> Self {
assert!(columns.len() > 0, "the number of columns must not be zero");

Self { columns, ..Default::default() }
}

/// Returns the total memory budget in bytes.
///
/// Each column contributes its configured `memory_budget`, or
/// `DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB` when none is set, multiplied by `MB`.
pub fn memory_budget(&self) -> MiB {
    self.columns
        .iter()
        .map(|column| column.memory_budget.unwrap_or(DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB) * MB)
        .sum()
}

/// Returns the memory budget of the specified column in bytes.
///
/// Falls back to `DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB` both when the column has
/// no explicit budget and when `col` does not name a configured column, so
/// this lookup never panics on an out-of-range index.
fn memory_budget_for_col(&self, col: u32) -> MiB {
    let budget_mb = self
        .columns
        .get(col as usize)
        .and_then(|column| column.memory_budget)
        .unwrap_or(DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB);

    budget_mb * MB
}

// Get column family configuration with the given block based options.
Expand All @@ -224,6 +252,9 @@ impl DatabaseConfig {
opts.optimize_level_style_compaction(column_mem_budget);
opts.set_target_file_size_base(self.compaction.initial_file_size);
opts.set_compression_per_level(&[]);
if let Some(comparator) = &self.columns[col as usize].comparator {
opts.set_comparator(&format!("column_{col}_comparator"), comparator.get_fn());
}

opts
}
Expand All @@ -233,9 +264,8 @@ impl Default for DatabaseConfig {
fn default() -> DatabaseConfig {
DatabaseConfig {
max_open_files: 512,
memory_budget: HashMap::new(),
compaction: CompactionProfile::default(),
columns: 1,
columns: vec![ColumnConfig::default()],
keep_log_file_num: 1,
enable_statistics: false,
secondary: None,
Expand Down Expand Up @@ -334,12 +364,12 @@ impl Database {
///
/// The number of `config.columns` must not be zero.
pub fn open<P: AsRef<Path>>(config: &DatabaseConfig, path: P) -> io::Result<Database> {
assert!(config.columns > 0, "the number of columns must not be zero");
assert!(config.columns.len() > 0, "the number of columns must not be zero");

let opts = generate_options(config);
let block_opts = generate_block_based_options(config)?;

let column_names: Vec<_> = (0..config.columns).map(|c| format!("col{}", c)).collect();
let column_names: Vec<_> = (0..config.columns.len()).map(|c| format!("col{}", c)).collect();
let write_opts = WriteOptions::default();
let read_opts = generate_read_options();

Expand Down Expand Up @@ -369,8 +399,8 @@ impl Database {
column_names: &[&str],
block_opts: &BlockBasedOptions,
) -> io::Result<rocksdb::DB> {
let cf_descriptors: Vec<_> = (0..config.columns)
.map(|i| ColumnFamilyDescriptor::new(column_names[i as usize], config.column_config(&block_opts, i)))
let cf_descriptors: Vec<_> = (0..config.columns.len())
.map(|i| ColumnFamilyDescriptor::new(column_names[i as usize], config.column_config(&block_opts, i as u32)))
.collect();

let db = match DB::open_cf_descriptors(&opts, path.as_ref(), cf_descriptors) {
Expand Down Expand Up @@ -579,6 +609,10 @@ impl Database {
pub fn try_catch_up_with_primary(&self) -> io::Result<()> {
    let db = &self.inner.db;
    // Delegate to the underlying handle; its error is converted with `other_io_err`.
    db.try_catch_up_with_primary().map_err(other_io_err)
}

/// Returns a raw iterator over the column family backing column `col`.
///
/// # Errors
///
/// Propagates the error from the column-family lookup for `col`.
pub fn raw_iter(&self, col: u32) -> io::Result<DBRawIterator<'_>> {
    let column_family = self.inner.cf(col as usize)?;
    Ok(self.inner.db.raw_iterator_cf(column_family))
}
}

// duplicate declaration of methods here to avoid trait import in certain existing cases
Expand Down